Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,36 @@ if __name__ == "__main__":

Use `agent_control.shutdown()` or `await agent_control.ashutdown()` before process exit so short-lived scripts flush pending observability events cleanly.

External integrations can register a sink for the same finalized
control-event payloads:

```python
from agent_control import (
register_control_event_sink,
unregister_control_event_sink,
)
from agent_control_telemetry import BaseControlEventSink, SinkResult


class MyControlEventSink(BaseControlEventSink):
def write_events(self, events):
for event in events:
forward_to_external_system(event.model_dump(mode="json"))
return SinkResult(accepted=len(events), dropped=0)


sink = MyControlEventSink()
register_control_event_sink(sink)

# Later, when tearing down the integration:
unregister_control_event_sink(sink)
```

Registered sinks receive the same local, server, and merged control-execution
events the SDK emits through its normal event-construction flow. If no
external sink is registered, the default OSS delivery path is unchanged. If one
or more sinks are registered, they replace the default built-in delivery path.

Next, create a control in Step 4, then run the setup and agent scripts in
order to see blocking in action.

Expand Down
6 changes: 6 additions & 0 deletions sdks/python/src/agent_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,14 @@ async def handle_input(user_message: str) -> str:
get_event_sink,
get_log_config,
get_logger,
get_registered_control_event_sinks,
init_observability,
is_observability_enabled,
log_control_evaluation,
register_control_event_sink,
shutdown_observability,
sync_shutdown_observability,
unregister_control_event_sink,
write_events,
)
from .tracing import (
Expand Down Expand Up @@ -1368,6 +1371,9 @@ async def main():
"is_observability_enabled",
"get_event_batcher",
"get_event_sink",
"get_registered_control_event_sinks",
"register_control_event_sink",
"unregister_control_event_sink",
"configure_logging",
"get_log_config",
"log_control_evaluation",
Expand Down
78 changes: 70 additions & 8 deletions sdks/python/src/agent_control/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,8 @@ def get_stats(self) -> dict:
# Global batcher instance
_batcher: EventBatcher | None = None
_event_sink: ControlEventSink | None = None
_external_event_sinks: list[ControlEventSink] = []
_external_event_sinks_lock = threading.Lock()


class _BatcherControlEventSink(BaseControlEventSink):
Expand Down Expand Up @@ -828,10 +830,57 @@ def get_event_batcher() -> EventBatcher | None:


def get_event_sink() -> ControlEventSink | None:
"""Get the active global control-event sink."""
"""Get the active built-in control-event sink."""
return _event_sink


def register_control_event_sink(sink: ControlEventSink) -> None:
"""Register an external control-event sink.

Registered sinks receive the same finalized control-event payloads emitted
through the SDK's local, server, and merged event flows. When one or more
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once we expose caller-registered sinks here, we also need a lifecycle story for them. Right now shutdown still only drains the built-in batcher, so any buffered or networked external sink can still drop data on process exit. I'd rather make ownership explicit here before we merge the base API.

external sinks are registered, they replace the default built-in delivery
path. Registration is idempotent for the same sink instance.
"""
with _external_event_sinks_lock:
if sink not in _external_event_sinks:
_external_event_sinks.append(sink)


def unregister_control_event_sink(sink: ControlEventSink) -> None:
"""Unregister a previously registered external control-event sink."""
with _external_event_sinks_lock:
try:
_external_event_sinks.remove(sink)
except ValueError:
pass


def get_registered_control_event_sinks() -> tuple[ControlEventSink, ...]:
"""Return the currently registered external control-event sinks."""
with _external_event_sinks_lock:
return tuple(_external_event_sinks)


def _get_active_control_event_sinks() -> tuple[ControlEventSink, ...]:
"""Resolve the currently active sinks.

Observability must be enabled before any sink is considered. When enabled,
registered sinks override the default built-in sink. This keeps the current
OSS behavior intact when no sink is selected, while leaving a single
resolution seam for future config-driven sink selection.
"""
if not get_settings().observability_enabled:
return ()

registered_sinks = get_registered_control_event_sinks()
if registered_sinks:
return registered_sinks
if _event_sink is not None:
return (_event_sink,)
return ()


def init_observability(
server_url: str | None = None,
api_key: str | None = None,
Expand All @@ -852,8 +901,9 @@ def init_observability(
"""
global _batcher, _event_sink

# Check if enabled
is_enabled = enabled if enabled is not None else get_settings().observability_enabled
if enabled is not None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes enabled= from a per-call override into a process-wide mutation. After one init(..., observability_enabled=False), later init/re-init flows and later sink resolution stay disabled until something explicitly rewrites settings. I don't think we want an init-time override to poison the rest of the process like that.

configure_settings(observability_enabled=is_enabled)

if not is_enabled:
logger.debug("Observability disabled")
Expand All @@ -877,7 +927,7 @@ def init_observability(

def add_event(event: ControlExecutionEvent) -> bool:
"""
Add an event to the global batcher.
Add an event to the active control-event sink.

Args:
event: Control execution event to add
Expand All @@ -889,10 +939,22 @@ def add_event(event: ControlExecutionEvent) -> bool:


def write_events(events: Sequence[ControlExecutionEvent]) -> SinkResult:
"""Write events through the active global sink."""
if _event_sink is None:
"""Write events through the active sink selection."""
active_sinks = _get_active_control_event_sinks()
primary_result: SinkResult | None = None

for sink in active_sinks:
try:
result = sink.write_events(events)
except Exception:
logger.warning("Control-event sink write failed", exc_info=True)
continue
if primary_result is None:
primary_result = result

if primary_result is None:
return SinkResult(accepted=0, dropped=len(events))
return _event_sink.write_events(events)
return primary_result


def sync_shutdown_observability() -> None:
Expand All @@ -915,8 +977,8 @@ async def shutdown_observability() -> None:


def is_observability_enabled() -> bool:
"""Check if observability is enabled and initialized."""
return _event_sink is not None
"""Check if observability is enabled and an active sink is available."""
return bool(_get_active_control_event_sinks())


def log_span_start(
Expand Down
Loading
Loading