From 2a2b9763fd545346059ec823acaa6f10e98c5039 Mon Sep 17 00:00:00 2001 From: "namrata.ghadi" Date: Thu, 16 Apr 2026 15:31:44 -0700 Subject: [PATCH 1/2] add otel sink --- README.md | 18 + sdks/python/pyproject.toml | 5 + sdks/python/src/agent_control/__init__.py | 2 + .../python/src/agent_control/observability.py | 19 ++ sdks/python/src/agent_control/otel_sink.py | 317 ++++++++++++++++++ sdks/python/src/agent_control/settings.py | 16 + sdks/python/tests/test_observability.py | 21 +- sdks/python/tests/test_otel_sink.py | 300 +++++++++++++++++ 8 files changed, 697 insertions(+), 1 deletion(-) create mode 100644 sdks/python/src/agent_control/otel_sink.py create mode 100644 sdks/python/tests/test_otel_sink.py diff --git a/README.md b/README.md index 4bd967f7..8acf30cf 100644 --- a/README.md +++ b/README.md @@ -185,6 +185,24 @@ events the SDK emits through its normal event-construction flow. The default SDK sink remains the OSS path to the Agent Control server. To use registered or named custom sinks, set `observability_sink_name` explicitly. +The SDK also includes a built-in OpenTelemetry sink. Install the OTEL extra, +select the `otel` sink, and configure the OTLP exporter through Agent Control +settings or environment variables: + +```bash +uv pip install "agent-control-sdk[otel]" +export AGENT_CONTROL_OBSERVABILITY_SINK_NAME=otel +export AGENT_CONTROL_OTEL_ENABLED=true +export AGENT_CONTROL_OTEL_ENDPOINT=http://localhost:4318/v1/traces +export AGENT_CONTROL_OTEL_HEADERS='{"authorization":"Bearer demo-token"}' +export AGENT_CONTROL_OTEL_SERVICE_NAME=awesome-bot +``` + +If the `otel` sink is selected without an OTLP endpoint/exporter configured, +the OTEL path stays inert and the default OSS SDK-to-server behavior still +remains unchanged unless `observability_sink_name` is explicitly switched away +from `default`. + Next, create a control in Step 4, then run the setup and agent scripts in order to see blocking in action. diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index dd670cd6..e551ace8 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -39,6 +39,11 @@ Repository = "https://github.com/yourusername/agent-control" strands-agents = ["strands-agents>=1.26.0"] google-adk = ["google-adk>=1.0.0"] galileo = ["agent-control-evaluator-galileo>=3.0.0"] +otel = [ + "opentelemetry-api>=1.24.0", + "opentelemetry-sdk>=1.24.0", + "opentelemetry-exporter-otlp-proto-http>=1.24.0", +] [dependency-groups] dev = [ diff --git a/sdks/python/src/agent_control/__init__.py b/sdks/python/src/agent_control/__init__.py index 58ba0cb1..57748a87 100644 --- a/sdks/python/src/agent_control/__init__.py +++ b/sdks/python/src/agent_control/__init__.py @@ -112,6 +112,7 @@ async def handle_input(user_message: str) -> str: unregister_control_event_sink_factory, write_events, ) +from .otel_sink import control_event_to_otel_span from .tracing import ( get_current_span_id, get_current_trace_id, @@ -1380,6 +1381,7 @@ async def main(): "write_events", "shutdown_observability", "is_observability_enabled", + "control_event_to_otel_span", "get_event_batcher", "get_event_sink", "get_registered_control_event_sink_factory_names", diff --git a/sdks/python/src/agent_control/observability.py b/sdks/python/src/agent_control/observability.py index 6eb4145a..313e66cc 100644 --- a/sdks/python/src/agent_control/observability.py +++ b/sdks/python/src/agent_control/observability.py @@ -28,6 +28,12 @@ Configuration (Environment Variables): # Observability (event batching) AGENT_CONTROL_OBSERVABILITY_ENABLED: Enable observability (default: true) + AGENT_CONTROL_OBSERVABILITY_SINK_NAME: Selected control-event sink (default: default) + AGENT_CONTROL_OBSERVABILITY_SINK_CONFIG: JSON config for the selected sink + AGENT_CONTROL_OTEL_ENABLED: Enable the built-in OTEL sink (default: false) + AGENT_CONTROL_OTEL_ENDPOINT: OTLP HTTP endpoint for exported control-event spans + AGENT_CONTROL_OTEL_HEADERS: JSON object of OTLP exporter headers + AGENT_CONTROL_OTEL_SERVICE_NAME: OTEL service.name for emitted spans AGENT_CONTROL_BATCH_SIZE: Max events per batch (default: 100) AGENT_CONTROL_FLUSH_INTERVAL: Seconds between flushes (default: 5.0) AGENT_CONTROL_SHUTDOWN_JOIN_TIMEOUT: Seconds to wait for worker shutdown (default: 5.0) @@ -75,6 +81,8 @@ if TYPE_CHECKING: from agent_control_models import ControlExecutionEvent +from .otel_sink import OTEL_CONTROL_EVENT_SINK_NAME, create_otel_control_event_sink + # ============================================================================= # Logger Setup - Standard Library Pattern # ============================================================================= @@ -820,6 +828,17 @@ def get_stats(self) -> dict: _used_custom_event_sinks_lock = threading.Lock() +def _register_builtin_control_event_sink_factories() -> None: + """Ensure built-in named sink factories are available.""" + _named_event_sink_factories.register( + OTEL_CONTROL_EVENT_SINK_NAME, + create_otel_control_event_sink, + ) + + +_register_builtin_control_event_sink_factories() + + class _BatcherControlEventSink(BaseControlEventSink): """Default SDK sink backed by the existing queue-based EventBatcher.""" diff --git a/sdks/python/src/agent_control/otel_sink.py b/sdks/python/src/agent_control/otel_sink.py new file mode 100644 index 00000000..0413c496 --- /dev/null +++ b/sdks/python/src/agent_control/otel_sink.py @@ -0,0 +1,317 @@ +"""OpenTelemetry sink support for Agent Control control-execution events.""" + +from __future__ import annotations + +import json +import logging +import os +from collections.abc import Sequence +from dataclasses import dataclass +from datetime import UTC, datetime +from typing import Any + +from agent_control_models import ControlExecutionEvent, JSONObject +from agent_control_telemetry.sinks import BaseControlEventSink, SinkResult + +from .settings import get_settings +from .tracing import _generate_span_id, validate_span_id, validate_trace_id + +logger = logging.getLogger(__name__) + +OTEL_CONTROL_EVENT_SINK_NAME = "otel" +_OTEL_INSTRUMENTATION_SCOPE = "agent_control.observability" +_OTEL_NOOP_WARNING = ( + "OpenTelemetry sink selected but OpenTelemetry SDK/exporter dependencies are not available; " + "control events will be accepted without being exported" +) + +AttributeValue = str | bool | int | float | list[str] | list[bool] | list[int] | list[float] + + +@dataclass(frozen=True) +class OTELControlEventSpan: + """Normalized OTEL span payload derived from a control event.""" + + name: str + trace_id: str + parent_span_id: str + attributes: dict[str, AttributeValue] + start_time_unix_nano: int + end_time_unix_nano: int + error_message: str | None = None + + +@dataclass(frozen=True) +class OTELSinkConfig: + """Configuration for OTEL sink creation.""" + + enabled: bool + endpoint: str | None + headers: dict[str, str] + service_name: str + + +@dataclass(frozen=True) +class OTELSDKModules: + """References to the OTEL SDK classes/functions used by the sink.""" + + tracer_provider_cls: type[Any] + resource_cls: type[Any] + batch_span_processor_cls: type[Any] + otlp_span_exporter_cls: type[Any] + span_context_cls: type[Any] + non_recording_span_cls: type[Any] + trace_flags_cls: type[Any] + trace_state_cls: type[Any] + span_kind: Any + set_span_in_context: Any + + +def _to_unix_nano(timestamp: datetime, /) -> int: + return int(timestamp.astimezone(UTC).timestamp() * 1_000_000_000) + + +def _normalize_attribute_value(value: object) -> AttributeValue: + """Coerce arbitrary metadata into a value OTEL span attributes accept.""" + if isinstance(value, bool | int | float | str): + return value + if isinstance(value, list): + if all(isinstance(item, bool) for item in value): + return [item for item in value] + if all(isinstance(item, int) and not isinstance(item, bool) for item in value): + return [item for item in value] + if all(isinstance(item, float) for item in value): + return [item for item in value] + if all(isinstance(item, str) for item in value): + return [item for item in value] + if isinstance(value, tuple): + return _normalize_attribute_value(list(value)) + return json.dumps(value, default=str, sort_keys=True) + + +def control_event_to_otel_span(event: ControlExecutionEvent) -> OTELControlEventSpan: + """Convert a control-execution event into OTEL span data.""" + end_time_unix_nano = _to_unix_nano(event.timestamp) + if event.execution_duration_ms is not None: + start_time_unix_nano = max( + 0, + end_time_unix_nano - int(event.execution_duration_ms * 1_000_000), + ) + else: + start_time_unix_nano = end_time_unix_nano + + attributes: dict[str, AttributeValue] = { + "agent_control.control_execution_id": event.control_execution_id, + "agent_control.agent_name": event.agent_name, + "agent_control.control_id": event.control_id, + "agent_control.control_name": event.control_name, + "agent_control.check_stage": event.check_stage, + "agent_control.applies_to": event.applies_to, + "agent_control.action": event.action, + "agent_control.matched": event.matched, + "agent_control.confidence": event.confidence, + "agent_control.event_timestamp": event.timestamp.isoformat(), + } + + if event.execution_duration_ms is not None: + attributes["agent_control.execution_duration_ms"] = event.execution_duration_ms + if event.evaluator_name is not None: + attributes["agent_control.evaluator_name"] = event.evaluator_name + if event.selector_path is not None: + attributes["agent_control.selector_path"] = event.selector_path + if event.error_message is not None: + attributes["agent_control.error_message"] = event.error_message + + for key, value in sorted(event.metadata.items()): + attributes[f"agent_control.metadata.{key}"] = _normalize_attribute_value(value) + + return OTELControlEventSpan( + name="agent_control.control_execution", + trace_id=event.trace_id, + parent_span_id=event.span_id, + attributes=attributes, + start_time_unix_nano=start_time_unix_nano, + end_time_unix_nano=end_time_unix_nano, + error_message=event.error_message, + ) + + +class _NoOpControlEventSink(BaseControlEventSink): + """Sink that accepts events but intentionally emits nothing.""" + + def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + return SinkResult(accepted=len(events), dropped=0) + + +class OTELControlEventSink(BaseControlEventSink): + """Emit control-execution events as OpenTelemetry spans.""" + + def __init__( + self, + *, + tracer_provider: Any, + tracer: Any, + sdk_modules: OTELSDKModules, + ) -> None: + self._tracer_provider: Any = tracer_provider + self._tracer: Any = tracer + self._sdk_modules = sdk_modules + + def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + accepted = 0 + dropped = 0 + + for event in events: + try: + self._write_event(event) + accepted += 1 + except Exception: + logger.warning("Failed to emit control event to OTEL", exc_info=True) + dropped += 1 + + return SinkResult(accepted=accepted, dropped=dropped) + + def flush(self) -> None: + force_flush = getattr(self._tracer_provider, "force_flush", None) + if callable(force_flush): + force_flush() + + def close(self) -> None: + shutdown = getattr(self._tracer_provider, "shutdown", None) + if callable(shutdown): + shutdown() + + def _write_event(self, event: ControlExecutionEvent) -> None: + span_data = control_event_to_otel_span(event) + parent_context = self._build_parent_context(span_data) + span = self._tracer.start_span( + span_data.name, + context=parent_context, + kind=self._sdk_modules.span_kind.INTERNAL, + start_time=span_data.start_time_unix_nano, + ) + span.set_attributes(span_data.attributes) + if span_data.error_message: + record_exception = getattr(span, "record_exception", None) + if callable(record_exception): + record_exception(RuntimeError(span_data.error_message)) + span.end(end_time=span_data.end_time_unix_nano) + + def _build_parent_context(self, span_data: OTELControlEventSpan) -> object | None: + trace_id = span_data.trace_id + parent_span_id = span_data.parent_span_id + + if not validate_trace_id(trace_id) or trace_id == "0" * 32: + return None + if not validate_span_id(parent_span_id) or parent_span_id == "0" * 16: + parent_span_id = _generate_span_id() + + parent_span_context = self._sdk_modules.span_context_cls( + trace_id=int(trace_id, 16), + span_id=int(parent_span_id, 16), + is_remote=True, + trace_flags=self._sdk_modules.trace_flags_cls( + self._sdk_modules.trace_flags_cls.SAMPLED + ), + trace_state=self._sdk_modules.trace_state_cls(), + ) + return self._sdk_modules.set_span_in_context( + self._sdk_modules.non_recording_span_cls(parent_span_context) + ) + + +def _resolve_otel_sink_config(config: JSONObject) -> OTELSinkConfig: + """Resolve OTEL sink config from settings with per-sink overrides.""" + settings = get_settings() + + enabled = bool(config.get("enabled", settings.otel_enabled)) + + endpoint_value = config.get("endpoint", settings.otel_endpoint) + endpoint = str(endpoint_value) if endpoint_value else None + + headers_value = config.get("headers", settings.otel_headers) + headers: dict[str, str] = {} + if isinstance(headers_value, dict): + headers = {str(key): str(value) for key, value in headers_value.items()} + + service_name_value = config.get("service_name", settings.otel_service_name) + service_name = str(service_name_value) if service_name_value else "agent-control-sdk" + + return OTELSinkConfig( + enabled=enabled, + endpoint=endpoint, + headers=headers, + service_name=service_name, + ) + + +def _has_explicit_otel_exporter_configuration(config: OTELSinkConfig) -> bool: + """Return whether an OTLP exporter should be configured for the sink.""" + if config.endpoint: + return True + return bool( + os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") or os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT") + ) + + +def _load_otel_sdk_modules() -> OTELSDKModules: + """Import OTEL SDK modules on demand so the sink remains optional.""" + from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( # type: ignore[import-not-found] + OTLPSpanExporter, + ) + from opentelemetry.sdk.resources import Resource # type: ignore[import-not-found] + from opentelemetry.sdk.trace import TracerProvider # type: ignore[import-not-found] + from opentelemetry.sdk.trace.export import BatchSpanProcessor # type: ignore[import-not-found] + from opentelemetry.trace import ( # type: ignore[import-not-found] + NonRecordingSpan, + SpanContext, + SpanKind, + TraceFlags, + TraceState, + set_span_in_context, + ) + + return OTELSDKModules( + tracer_provider_cls=TracerProvider, + resource_cls=Resource, + batch_span_processor_cls=BatchSpanProcessor, + otlp_span_exporter_cls=OTLPSpanExporter, + span_context_cls=SpanContext, + non_recording_span_cls=NonRecordingSpan, + trace_flags_cls=TraceFlags, + trace_state_cls=TraceState, + span_kind=SpanKind, + set_span_in_context=set_span_in_context, + ) + + +def create_otel_control_event_sink(config: JSONObject) -> BaseControlEventSink: + """Create the built-in OTEL control-event sink.""" + resolved_config = _resolve_otel_sink_config(config) + if not resolved_config.enabled: + return _NoOpControlEventSink() + + try: + sdk_modules = _load_otel_sdk_modules() + except ImportError: + logger.warning(_OTEL_NOOP_WARNING) + return _NoOpControlEventSink() + + resource = sdk_modules.resource_cls.create({"service.name": resolved_config.service_name}) + tracer_provider = sdk_modules.tracer_provider_cls(resource=resource) + + if _has_explicit_otel_exporter_configuration(resolved_config): + exporter_kwargs: dict[str, object] = {} + if resolved_config.endpoint: + exporter_kwargs["endpoint"] = resolved_config.endpoint + if resolved_config.headers: + exporter_kwargs["headers"] = resolved_config.headers + exporter = sdk_modules.otlp_span_exporter_cls(**exporter_kwargs) + tracer_provider.add_span_processor(sdk_modules.batch_span_processor_cls(exporter)) + + tracer = tracer_provider.get_tracer(_OTEL_INSTRUMENTATION_SCOPE) + return OTELControlEventSink( + tracer_provider=tracer_provider, + tracer=tracer, + sdk_modules=sdk_modules, + ) diff --git a/sdks/python/src/agent_control/settings.py b/sdks/python/src/agent_control/settings.py index 982811f9..d87572d8 100644 --- a/sdks/python/src/agent_control/settings.py +++ b/sdks/python/src/agent_control/settings.py @@ -70,6 +70,22 @@ class SDKSettings(BaseSettings): default_factory=dict, description="JSON config payload passed to the selected control-event sink", ) + otel_enabled: bool = Field( + default=False, + description="Enable the built-in OpenTelemetry control-event sink", + ) + otel_endpoint: str | None = Field( + default=None, + description="OTLP HTTP endpoint for OTEL control-event export", + ) + otel_headers: dict[str, str] = Field( + default_factory=dict, + description="Headers to pass to the OTLP exporter", + ) + otel_service_name: str = Field( + default="agent-control-sdk", + description="service.name resource for OTEL control-event spans", + ) batch_size: int = Field( default=100, ge=1, diff --git a/sdks/python/tests/test_observability.py b/sdks/python/tests/test_observability.py index e20fb303..f6db817c 100644 --- a/sdks/python/tests/test_observability.py +++ b/sdks/python/tests/test_observability.py @@ -28,6 +28,7 @@ unregister_control_event_sink, write_events, ) +from agent_control.otel_sink import OTEL_CONTROL_EVENT_SINK_NAME from agent_control.settings import SDKSettings, configure_settings, get_settings from agent_control_models import ControlExecutionEvent from agent_control_telemetry import ( @@ -119,6 +120,7 @@ def reset_observability_state() -> None: obs._external_event_sinks.clear() for name in obs.get_registered_control_event_sink_factory_names(): obs.unregister_control_event_sink_factory(name) + obs._register_builtin_control_event_sink_factories() class TestEventBatcherInit: @@ -922,7 +924,10 @@ def test_named_sink_factory_is_selected_by_config(self): assert result is True assert sink.received_batches - assert get_registered_control_event_sink_factory_names() == ("custom",) + assert get_registered_control_event_sink_factory_names() == ( + "custom", + OTEL_CONTROL_EVENT_SINK_NAME, + ) def test_named_sink_factory_failure_disables_delivery_without_raising(self): register_control_event_sink_factory( @@ -1155,6 +1160,20 @@ def test_sdk_settings_parse_observability_sink_env(monkeypatch) -> None: assert settings.observability_sink_config == {"project": "demo"} +def test_sdk_settings_parse_otel_env(monkeypatch) -> None: + monkeypatch.setenv("AGENT_CONTROL_OTEL_ENABLED", "true") + monkeypatch.setenv("AGENT_CONTROL_OTEL_ENDPOINT", "http://collector:4318/v1/traces") + monkeypatch.setenv("AGENT_CONTROL_OTEL_HEADERS", '{"authorization":"Bearer demo"}') + monkeypatch.setenv("AGENT_CONTROL_OTEL_SERVICE_NAME", "agent-control-tests") + + settings = SDKSettings() + + assert settings.otel_enabled is True + assert settings.otel_endpoint == "http://collector:4318/v1/traces" + assert settings.otel_headers == {"authorization": "Bearer demo"} + assert settings.otel_service_name == "agent-control-tests" + + class TestShutdownObservability: """Tests for shutdown_observability function.""" diff --git a/sdks/python/tests/test_otel_sink.py b/sdks/python/tests/test_otel_sink.py new file mode 100644 index 00000000..4e9704c5 --- /dev/null +++ b/sdks/python/tests/test_otel_sink.py @@ -0,0 +1,300 @@ +"""Tests for the built-in OpenTelemetry control-event sink.""" + +from __future__ import annotations + +import os +from datetime import UTC, datetime +from unittest.mock import patch + +from agent_control import add_event, init_observability, sync_shutdown_observability +from agent_control.otel_sink import ( + OTEL_CONTROL_EVENT_SINK_NAME, + OTELControlEventSink, + OTELSDKModules, + control_event_to_otel_span, + create_otel_control_event_sink, +) +from agent_control.settings import configure_settings, get_settings +from agent_control_models import ControlExecutionEvent + + +def _make_event(**overrides: object) -> ControlExecutionEvent: + event = ControlExecutionEvent( + control_execution_id="ce-123", + trace_id="a" * 32, + span_id="b" * 16, + agent_name="test-agent", + control_id=7, + control_name="detect-pii", + check_stage="pre", + applies_to="llm_call", + action="observe", + matched=True, + confidence=0.85, + timestamp=datetime(2026, 4, 16, 12, 0, tzinfo=UTC), + execution_duration_ms=12.5, + evaluator_name="regex", + selector_path="input", + error_message=None, + metadata={"labels": ["security", "pii"], "threshold": 3, "nested": {"k": "v"}}, + ) + return event.model_copy(update=overrides) + + +class FakeSpan: + def __init__(self) -> None: + self.attributes: dict[str, object] = {} + self.end_time: int | None = None + self.exceptions: list[str] = [] + + def set_attributes(self, attributes: dict[str, object]) -> None: + self.attributes = dict(attributes) + + def record_exception(self, exc: BaseException) -> None: + self.exceptions.append(str(exc)) + + def end(self, end_time: int) -> None: + self.end_time = end_time + + +class FakeTracer: + def __init__(self) -> None: + self.calls: list[dict[str, object]] = [] + self.spans: list[FakeSpan] = [] + + def start_span( + self, + name: str, + *, + context: object = None, + kind: object = None, + start_time: int | None = None, + ) -> FakeSpan: + self.calls.append( + { + "name": name, + "context": context, + "kind": kind, + "start_time": start_time, + } + ) + span = FakeSpan() + self.spans.append(span) + return span + + +class FakeTracerProvider: + def __init__(self, *, resource: object) -> None: + self.resource = resource + self.processors: list[object] = [] + self.tracer = FakeTracer() + self.force_flush_calls = 0 + self.shutdown_calls = 0 + self.tracer_scope_name: str | None = None + + def add_span_processor(self, processor: object) -> None: + self.processors.append(processor) + + def get_tracer(self, name: str) -> FakeTracer: + self.tracer_scope_name = name + return self.tracer + + def force_flush(self) -> None: + self.force_flush_calls += 1 + + def shutdown(self) -> None: + self.shutdown_calls += 1 + + +class FakeResource: + @staticmethod + def create(attributes: dict[str, object]) -> dict[str, object]: + return {"attributes": attributes} + + +class FakeBatchSpanProcessor: + def __init__(self, exporter: object) -> None: + self.exporter = exporter + + +class FakeOTLPSpanExporter: + def __init__(self, **kwargs: object) -> None: + self.kwargs = kwargs + + +class FakeSpanContext: + def __init__( + self, + *, + trace_id: int, + span_id: int, + is_remote: bool, + trace_flags: object, + trace_state: object, + ) -> None: + self.trace_id = trace_id + self.span_id = span_id + self.is_remote = is_remote + self.trace_flags = trace_flags + self.trace_state = trace_state + + +class FakeNonRecordingSpan: + def __init__(self, span_context: FakeSpanContext) -> None: + self.span_context = span_context + + +class FakeTraceFlags(int): + SAMPLED = 1 + + +class FakeTraceState: + pass + + +class FakeSpanKind: + INTERNAL = "internal" + + +def _fake_set_span_in_context(span: FakeNonRecordingSpan) -> dict[str, object]: + return {"parent": span} + + +def _fake_otel_sdk_modules() -> OTELSDKModules: + return OTELSDKModules( + tracer_provider_cls=FakeTracerProvider, + resource_cls=FakeResource, + batch_span_processor_cls=FakeBatchSpanProcessor, + otlp_span_exporter_cls=FakeOTLPSpanExporter, + span_context_cls=FakeSpanContext, + non_recording_span_cls=FakeNonRecordingSpan, + trace_flags_cls=FakeTraceFlags, + trace_state_cls=FakeTraceState, + span_kind=FakeSpanKind, + set_span_in_context=_fake_set_span_in_context, + ) + + +def setup_function() -> None: + original_settings = get_settings().model_dump() + setup_function.original_settings = original_settings # type: ignore[attr-defined] + configure_settings( + observability_enabled=True, + observability_sink_name="default", + observability_sink_config={}, + otel_enabled=False, + otel_endpoint=None, + otel_headers={}, + otel_service_name="agent-control-sdk", + ) + os.environ.pop("OTEL_EXPORTER_OTLP_ENDPOINT", None) + os.environ.pop("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None) + + +def teardown_function() -> None: + sync_shutdown_observability() + configure_settings(**setup_function.original_settings) # type: ignore[attr-defined] + + +def test_control_event_to_otel_span_maps_event_fields() -> None: + event = _make_event(error_message="blocked") + + span = control_event_to_otel_span(event) + + assert span.trace_id == event.trace_id + assert span.parent_span_id == event.span_id + assert span.name == "agent_control.control_execution" + assert span.attributes["agent_control.control_name"] == "detect-pii" + assert span.attributes["agent_control.matched"] is True + assert span.attributes["agent_control.metadata.labels"] == ["security", "pii"] + assert span.attributes["agent_control.metadata.nested"] == '{"k": "v"}' + assert span.error_message == "blocked" + assert span.end_time_unix_nano >= span.start_time_unix_nano + + +def test_create_otel_control_event_sink_is_inert_when_disabled() -> None: + configure_settings(otel_enabled=False) + + sink = create_otel_control_event_sink({}) + result = sink.write_events([_make_event()]) + + assert result.accepted == 1 + assert result.dropped == 0 + + +def test_create_otel_control_event_sink_without_exporter_stays_inert() -> None: + configure_settings(otel_enabled=True, otel_endpoint=None) + + with patch("agent_control.otel_sink._load_otel_sdk_modules", return_value=_fake_otel_sdk_modules()): + sink = create_otel_control_event_sink({}) + + assert isinstance(sink, OTELControlEventSink) + tracer_provider = sink._tracer_provider + assert isinstance(tracer_provider, FakeTracerProvider) + assert tracer_provider.processors == [] + + +def test_create_otel_control_event_sink_uses_exporter_config_and_emits_spans() -> None: + configure_settings( + otel_enabled=True, + otel_endpoint="http://collector:4318/v1/traces", + otel_headers={"x-api-key": "secret"}, + otel_service_name="agent-control-tests", + ) + + with patch("agent_control.otel_sink._load_otel_sdk_modules", return_value=_fake_otel_sdk_modules()): + sink = create_otel_control_event_sink({}) + + assert isinstance(sink, OTELControlEventSink) + tracer_provider = sink._tracer_provider + assert isinstance(tracer_provider, FakeTracerProvider) + assert tracer_provider.resource == {"attributes": {"service.name": "agent-control-tests"}} + assert len(tracer_provider.processors) == 1 + + processor = tracer_provider.processors[0] + assert isinstance(processor, FakeBatchSpanProcessor) + assert isinstance(processor.exporter, FakeOTLPSpanExporter) + assert processor.exporter.kwargs == { + "endpoint": "http://collector:4318/v1/traces", + "headers": {"x-api-key": "secret"}, + } + + event = _make_event(error_message="rule failed") + result = sink.write_events([event]) + + assert result.accepted == 1 + assert result.dropped == 0 + assert len(tracer_provider.tracer.calls) == 1 + first_call = tracer_provider.tracer.calls[0] + assert first_call["name"] == "agent_control.control_execution" + assert first_call["kind"] == FakeSpanKind.INTERNAL + context = first_call["context"] + assert isinstance(context, dict) + parent_span = context["parent"] + assert isinstance(parent_span, FakeNonRecordingSpan) + assert parent_span.span_context.trace_id == int(event.trace_id, 16) + assert parent_span.span_context.span_id == int(event.span_id, 16) + span = tracer_provider.tracer.spans[0] + assert span.attributes["agent_control.agent_name"] == event.agent_name + assert span.attributes["agent_control.error_message"] == "rule failed" + assert span.exceptions == ["rule failed"] + + sink.flush() + sink.close() + assert tracer_provider.force_flush_calls == 1 + assert tracer_provider.shutdown_calls == 1 + + +def test_observability_uses_builtin_otel_sink_when_selected() -> None: + configure_settings( + observability_sink_name=OTEL_CONTROL_EVENT_SINK_NAME, + otel_enabled=True, + otel_endpoint="http://collector:4318/v1/traces", + ) + + with patch("agent_control.otel_sink._load_otel_sdk_modules", return_value=_fake_otel_sdk_modules()): + batcher = init_observability(enabled=True) + result = add_event(_make_event()) + + assert batcher is None + assert result is True From 923298869717f21bbe365467f163f615e4deac13 Mon Sep 17 00:00:00 2001 From: "namrata.ghadi" Date: Thu, 16 Apr 2026 15:35:12 -0700 Subject: [PATCH 2/2] add otel sink --- sdks/python/src/agent_control/otel_sink.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdks/python/src/agent_control/otel_sink.py b/sdks/python/src/agent_control/otel_sink.py index 0413c496..9934b44e 100644 --- a/sdks/python/src/agent_control/otel_sink.py +++ b/sdks/python/src/agent_control/otel_sink.py @@ -8,7 +8,7 @@ from collections.abc import Sequence from dataclasses import dataclass from datetime import UTC, datetime -from typing import Any +from typing import Any, cast from agent_control_models import ControlExecutionEvent, JSONObject from agent_control_telemetry.sinks import BaseControlEventSink, SinkResult @@ -215,8 +215,11 @@ def _build_parent_context(self, span_data: OTELControlEventSpan) -> object | Non ), trace_state=self._sdk_modules.trace_state_cls(), ) - return self._sdk_modules.set_span_in_context( - self._sdk_modules.non_recording_span_cls(parent_span_context) + return cast( + object, + self._sdk_modules.set_span_in_context( + self._sdk_modules.non_recording_span_cls(parent_span_context) + ), )