Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# OpenSAMPL data paths
archive/
ntp-snapshots/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
6 changes: 6 additions & 0 deletions opensampl/config/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ class BaseConfig(BaseSettings):
False, description="Allow insecure requests to be made to the backend", alias="INSECURE_REQUESTS"
)

ENABLE_GEOLOCATE: bool = Field(
False,
description="Enable geolocate features which extract a location from ip addresses",
alias="ENABLE_GEOLOCATE",
)

@field_serializer("ARCHIVE_PATH")
def convert_to_str(self, v: Path) -> str:
"""Convert archive path to a string for serialization"""
Expand Down
24 changes: 20 additions & 4 deletions opensampl/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
configuration validation, and settings management.
"""

from __future__ import annotations

import shlex
from importlib.resources import as_file, files
from pathlib import Path
from types import ModuleType
from typing import Any, Union
from typing import TYPE_CHECKING, Any

from dotenv import dotenv_values, set_key
from loguru import logger
Expand All @@ -20,8 +21,11 @@
from opensampl.config.base import BaseConfig
from opensampl.server import check_command

if TYPE_CHECKING:
from types import ModuleType


def get_resolved_resource_path(pkg: str | ModuleType, relative_path: str) -> str:
    """Return the absolute, resolved filesystem path of *relative_path* inside package *pkg*."""
    # as_file materializes the resource on disk (a no-op for regular installs,
    # a temp extraction for zipped packages) for the duration of the context.
    with as_file(files(pkg).joinpath(relative_path)) as concrete_path:
        return str(concrete_path.resolve())
Expand All @@ -35,6 +39,8 @@ class ServerConfig(BaseConfig):

COMPOSE_FILE: str = Field(default="", description="Fully resolved path to the Docker Compose file.")

# Fix: `defualt` was a typo — pydantic's Field takes `default`; with the typo the
# keyword is an unknown extra and the intended None default is never applied.
OVERRIDE_FILE: str | None = Field(default=None, description="Override for the compose file")

DOCKER_ENV_FILE: str = Field(default="", description="Fully resolved path to the Docker .env file.")

docker_env_values: dict[str, Any] = Field(default_factory=dict, init=False)
Expand All @@ -54,7 +60,7 @@ def _ignore_in_set(self) -> list[str]:
return ignored

@model_validator(mode="after")
def get_docker_values(self) -> "ServerConfig":
def get_docker_values(self) -> ServerConfig:
"""Get the values that the docker containers will use on startup"""
self.docker_env_values = dotenv_values(self.DOCKER_ENV_FILE)
return self
Expand All @@ -67,6 +73,14 @@ def resolve_compose_file(cls, v: Any) -> str:
return get_resolved_resource_path(opensampl.server, "docker-compose.yaml")
return str(Path(v).expanduser().resolve())

@field_validator("OVERRIDE_FILE", mode="before")
@classmethod
def resolve_override_file(cls, v: Any) -> str | None:
    """Resolve the override compose file to an absolute path; pass falsy values through unchanged.

    Unlike COMPOSE_FILE, there is no packaged default here — when no override is
    configured the value stays None/empty and no ``-f`` flag is added later.
    """
    if v:
        return str(Path(v).expanduser().resolve())
    return v

@field_validator("DOCKER_ENV_FILE", mode="before")
@classmethod
def resolve_docker_env_file(cls, v: Any) -> str:
Expand All @@ -89,6 +103,8 @@ def build_docker_compose_base(self):
compose_command = self.get_compose_command()
command = shlex.split(compose_command)
command.extend(["--env-file", self.DOCKER_ENV_FILE, "-f", self.COMPOSE_FILE])
if self.OVERRIDE_FILE:
command.extend(["-f", self.OVERRIDE_FILE])
return command

def set_by_name(self, name: str, value: Any):
Expand Down
23 changes: 23 additions & 0 deletions opensampl/db/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class ProbeMetadata(Base):
adva_metadata = relationship("AdvaMetadata", back_populates="probe", uselist=False)
microchip_twst_metadata = relationship("MicrochipTWSTMetadata", back_populates="probe", uselist=False)
microchip_tp4100_metadata = relationship("MicrochipTP4100Metadata", back_populates="probe", uselist=False)
ntp_metadata = relationship("NtpMetadata", back_populates="probe", uselist=False)

# --- CUSTOM PROBE METADATA RELATIONSHIP ---

Expand Down Expand Up @@ -433,8 +434,30 @@ class MicrochipTP4100Metadata(Base):
probe = relationship("ProbeMetadata", back_populates="microchip_tp4100_metadata")


class NtpMetadata(Base):
    """NTP Clock Probe specific metadata"""

    __tablename__ = "ntp_metadata"

    # One-to-one with probe_metadata: the probe's UUID doubles as this row's primary key.
    probe_uuid = Column(String, ForeignKey("probe_metadata.uuid"), primary_key=True)
    # Collection mode string as reported by the probe — TODO confirm value set against the collector.
    mode = Column(Text)
    reference = Column(Boolean, comment="Is used as a reference for other probes")
    # NTP server this probe queries (host and port stored separately).
    target_host = Column(Text)
    target_port = Column(Integer)
    # Status strings stored verbatim (presumably from chrony/ntpq output — verify).
    sync_status = Column(Text)
    leap_status = Column(Text)
    reference_id = Column(Text)
    # JSON blob describing observed upstream time sources.
    observation_sources = Column(JSONB)
    # Identity of the collecting host/session.
    collection_id = Column(Text)
    collection_ip = Column(Text)
    # Timeout value — assumed seconds; confirm against collector configuration.
    timeout = Column(Float)
    # Free-form probe-specific extras not covered by the typed columns above.
    additional_metadata = Column(JSONB)
    probe = relationship("ProbeMetadata", back_populates="ntp_metadata")


# --- CUSTOM TABLES --- !! Do not remove line, used as reference when inserting metadata table


# --- TABLE FUNCTIONS ---


Expand Down
115 changes: 115 additions & 0 deletions opensampl/helpers/geolocator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""Associate NTP probes with ``castdb.locations`` for the geospatial Grafana dashboard."""

from __future__ import annotations

import ipaddress
import json
import os
import socket
import urllib.request
from typing import TYPE_CHECKING

from loguru import logger

from opensampl.load.table_factory import TableFactory

if TYPE_CHECKING:
from sqlalchemy.orm import Session


_GEO_CACHE: dict[str, tuple[float, float, str]] = {}


def _env_bool(name: str, default: bool) -> bool:
    """Interpret environment variable *name* as a boolean, returning *default* when it is unset."""
    truthy = ("1", "true", "yes", "on")
    raw = os.getenv(name)
    return default if raw is None else raw.strip().lower() in truthy


def _default_lab_coords() -> tuple[float, float]:
    """Fallback (lat, lon) used for private/unresolvable hosts, overridable via DEFAULT_LAT/DEFAULT_LON."""
    return (
        float(os.getenv("DEFAULT_LAT", "37.4419")),
        float(os.getenv("DEFAULT_LON", "-122.1430")),
    )


def _is_private_or_loopback(ip: str) -> bool:
    """Return True for addresses that must not be sent to a public geolocation service.

    Strings that do not parse as IP addresses are treated as private so they
    never leave the host.
    """
    try:
        parsed = ipaddress.ip_address(ip)
    except ValueError:
        return True
    return any((parsed.is_private, parsed.is_loopback, parsed.is_link_local, parsed.is_reserved))


def _lookup_geo_ipapi(ip: str) -> tuple[float, float, str] | None:
    """
    Geolocate a public IP via ip-api.com, returning ``(lat, lon, label)`` or None on failure.

    Results are memoized in the module-level ``_GEO_CACHE`` so each IP is queried
    at most once per process. The request is plain HTTP with no API key.
    """
    if ip in _GEO_CACHE:
        return _GEO_CACHE[ip]
    url = f"http://ip-api.com/json/{ip}?fields=status,lat,lon,city,country"
    try:
        with urllib.request.urlopen(url, timeout=4.0) as resp:  # noqa: S310
            body = json.loads(resp.read().decode("utf-8"))
    except Exception as e:
        # Deliberately broad best-effort: any network/parse failure degrades to "no location".
        logger.warning("ip-api geolocation failed for {}: {}", ip, e)
        return None

    if body.get("status") != "success" or body.get("lat") is None or body.get("lon") is None:
        logger.warning("ip-api returned no coordinates for {}", ip)
        return None

    city = body.get("city") or ""
    country = body.get("country") or ""
    # Human-readable label ("City, Country"); fall back to the raw IP if both are empty.
    label = ", ".join(x for x in (city, country) if x)
    out = (float(body["lat"]), float(body["lon"]), label or ip)
    _GEO_CACHE[ip] = out
    return out


def create_location(session: Session, geolocate_enabled: bool, ip_address: str, geo_override: dict) -> str | None:
    """
    Find or create a ``locations`` row for a probe and return its UUID (or None).

    Args:
        session: Open SQLAlchemy session handed to the ``TableFactory`` calls.
        geolocate_enabled: When True, fall back to DNS + IP-based geolocation if
            the override does not supply coordinates.
        ip_address: Probe host; may be a hostname (best-effort DNS resolution below).
        geo_override: Optional mapping with ``lat``/``lon``/``name`` keys taken from
            the probe's metadata; values present here take precedence over lookup.

    Returns:
        UUID of the matched or newly written location, or None if nothing was written.
    """
    lat: float | None = None
    lon: float | None = None
    name: str | None = None

    # Explicit override coordinates win over any network lookup.
    if isinstance(geo_override, dict) and geo_override.get("lat") is not None and geo_override.get("lon") is not None:
        lat = float(geo_override["lat"])
        lon = float(geo_override["lon"])

    if isinstance(geo_override, dict) and geo_override.get("name") is not None:
        name = geo_override["name"]

    if geolocate_enabled and lat is None and lon is None:
        ip_for_geo = ip_address
        try:
            # ip_address may actually be a hostname; resolve it before classification.
            ip_for_geo = socket.gethostbyname(ip_address)
        except OSError as e:
            logger.debug("Could not resolve {}: {}", ip_address, e)

        if _is_private_or_loopback(ip_for_geo):
            # Private/loopback addresses cannot be publicly geolocated; use env defaults.
            lat, lon = _default_lab_coords()
        else:
            geo = _lookup_geo_ipapi(ip_for_geo)
            if geo:
                lat, lon, _name = geo
                # An override-provided name wins over the "City, Country" label.
                name = name or _name
            else:
                lat, lon = _default_lab_coords()

    loc_factory = TableFactory("locations", session=session)
    loc = None
    if name:
        loc = loc_factory.find_existing({"name": name})

    # NOTE(review): when geolocation is disabled and the override carries no values,
    # name/lat/lon may all be None here and a mostly-empty row is written — confirm intended.
    if loc is None:
        loc = loc_factory.write(
            {"name": name, "lat": lat, "lon": lon, "public": True},
            if_exists="ignore",
        )

    if loc:
        return loc.uuid
    return None
19 changes: 17 additions & 2 deletions opensampl/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from opensampl.config.base import BaseConfig
from opensampl.db.orm import Base, ProbeData
from opensampl.helpers.geolocator import create_location
from opensampl.load.routing import route
from opensampl.load.table_factory import TableFactory
from opensampl.metrics import MetricType
Expand Down Expand Up @@ -125,9 +126,9 @@ def load_time_data(
strict=strict,
session=session,
)
probe = data_definition.probe # ty: ignore[possibly-unbound-attribute]
probe_readable = (
data_definition.probe.name # ty: ignore[possibly-unbound-attribute]
or f"{data_definition.probe.ip_address} ({data_definition.probe.probe_id})" # ty: ignore[possibly-unbound-attribute]
probe.name or f"{probe.ip_address} ({probe.probe_id})" # ty: ignore[possibly-unbound-attribute]
)

if any(x is None for x in [data_definition.probe, data_definition.metric, data_definition.reference]):
Expand Down Expand Up @@ -199,6 +200,19 @@ def load_probe_metadata(

pm_cols = {col.name for col in pm_factory.inspector.columns}
probe_info = {k: data.pop(k) for k in list(data.keys()) if k in pm_cols}
location_name = probe_info.pop("location_name", None)
geolocation = ({"name": location_name} if location_name else {}) | probe_info.pop("geolocation", {})

if geolocation or _config.ENABLE_GEOLOCATE:
location_uuid = create_location(
session,
geolocate_enabled=_config.ENABLE_GEOLOCATE,
geo_override=geolocation,
ip_address=probe_key.ip_address,
)
if location_uuid:
probe_info.update({"location_uuid": location_uuid})

probe_info.update({"probe_id": probe_key.probe_id, "ip_address": probe_key.ip_address, "vendor": vendor.name})
probe = pm_factory.write(data=probe_info, if_exists="update")

Expand Down Expand Up @@ -227,6 +241,7 @@ def create_new_tables(*, _config: BaseConfig, create_schema: bool = True, sessio
session.execute(text(f"CREATE SCHEMA IF NOT EXISTS {Base.metadata.schema}"))
session.commit()
Base.metadata.create_all(session.bind)
session.commit()
except Exception as e:
session.rollback()
logger.error(f"Error writing to table: {e}")
Expand Down
57 changes: 57 additions & 0 deletions opensampl/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,62 @@ class METRICS:
unit="unknown",
value_type=object,
)
NTP_DELAY = MetricType(
name="NTP Delay",
description="Round-trip delay (RTT) to the NTP server or observed path delay in seconds",
unit="s",
value_type=float,
)
NTP_JITTER = MetricType(
name="NTP Jitter",
description=(
"Jitter or offset variation for NTP in seconds (true value from chrony/ntpq when available; "
"remote single-packet collection may use a delay/dispersion bound estimate)"
),
unit="s",
value_type=float,
)
NTP_STRATUM = MetricType(
name="NTP Stratum",
description="NTP stratum level (distance from reference clock)",
unit="level",
value_type=float,
)
NTP_REACHABILITY = MetricType(
name="NTP Reachability",
description="NTP reachability register (0-255) as a scalar for plotting",
unit="count",
value_type=float,
)
NTP_DISPERSION = MetricType(
name="NTP Dispersion",
description="Combined error budget / dispersion in seconds",
unit="s",
value_type=float,
)
NTP_ROOT_DELAY = MetricType(
name="NTP Root Delay",
description="Root delay from NTP packet or local estimate in seconds",
unit="s",
value_type=float,
)
NTP_ROOT_DISPERSION = MetricType(
name="NTP Root Dispersion",
description="Root dispersion from NTP packet or local estimate in seconds",
unit="s",
value_type=float,
)
NTP_POLL_INTERVAL = MetricType(
name="NTP Poll Interval",
description="Poll interval in seconds",
unit="s",
value_type=float,
)
NTP_SYNC_HEALTH = MetricType(
name="NTP Sync Health",
description="1.0 if synchronized/healthy, 0.0 otherwise (probe-defined)",
unit="ratio",
value_type=float,
)

# --- CUSTOM METRICS --- !! Do not remove line, used as reference when inserting metric
2 changes: 1 addition & 1 deletion opensampl/mixins/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class CollectConfig(BaseModel):

Attributes:
output_dir: When provided, will save collected data as a file to provided directory.
Filename will be automatically generated as {ip_address}_{probe_id}_{vendor}_{timestamp}.txt
Filename will be automatically generated as {vendor}_{ip_address}_{probe_id}_{timestamp}.txt
load: Whether to load collected data directly to the database
duration: Number of seconds to collect data for

Expand Down
Loading