-
Notifications
You must be signed in to change notification settings - Fork 25
feat(sdk): add external sink #175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
afe9246
8b57ca7
92cee53
d8b10aa
efd93b0
946fb5a
2c623e5
6d6cced
9025e45
d93564b
d51558c
9a36ae5
c2e9f05
04729cc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -798,6 +798,8 @@ def get_stats(self) -> dict: | |
| # Global batcher instance | ||
| _batcher: EventBatcher | None = None | ||
| _event_sink: ControlEventSink | None = None | ||
| _external_event_sinks: list[ControlEventSink] = [] | ||
| _external_event_sinks_lock = threading.Lock() | ||
|
|
||
|
|
||
| class _BatcherControlEventSink(BaseControlEventSink): | ||
|
|
@@ -828,10 +830,57 @@ def get_event_batcher() -> EventBatcher | None: | |
|
|
||
|
|
||
| def get_event_sink() -> ControlEventSink | None: | ||
| """Get the active global control-event sink.""" | ||
| """Get the active built-in control-event sink.""" | ||
| return _event_sink | ||
|
|
||
|
|
||
| def register_control_event_sink(sink: ControlEventSink) -> None: | ||
| """Register an external control-event sink. | ||
|
|
||
| Registered sinks receive the same finalized control-event payloads emitted | ||
| through the SDK's local, server, and merged event flows. When one or more | ||
| external sinks are registered, they replace the default built-in delivery | ||
| path. Registration is idempotent for the same sink instance. | ||
| """ | ||
| with _external_event_sinks_lock: | ||
| if sink not in _external_event_sinks: | ||
| _external_event_sinks.append(sink) | ||
|
|
||
|
|
||
| def unregister_control_event_sink(sink: ControlEventSink) -> None: | ||
| """Unregister a previously registered external control-event sink.""" | ||
| with _external_event_sinks_lock: | ||
| try: | ||
| _external_event_sinks.remove(sink) | ||
| except ValueError: | ||
| pass | ||
|
|
||
|
|
||
| def get_registered_control_event_sinks() -> tuple[ControlEventSink, ...]: | ||
| """Return the currently registered external control-event sinks.""" | ||
| with _external_event_sinks_lock: | ||
| return tuple(_external_event_sinks) | ||
|
|
||
|
|
||
| def _get_active_control_event_sinks() -> tuple[ControlEventSink, ...]: | ||
| """Resolve the currently active sinks. | ||
|
|
||
| Observability must be enabled before any sink is considered. When enabled, | ||
| registered sinks override the default built-in sink. This keeps the current | ||
| OSS behavior intact when no sink is selected, while leaving a single | ||
| resolution seam for future config-driven sink selection. | ||
| """ | ||
| if not get_settings().observability_enabled: | ||
| return () | ||
|
|
||
| registered_sinks = get_registered_control_event_sinks() | ||
| if registered_sinks: | ||
| return registered_sinks | ||
| if _event_sink is not None: | ||
| return (_event_sink,) | ||
| return () | ||
|
|
||
|
|
||
| def init_observability( | ||
| server_url: str | None = None, | ||
| api_key: str | None = None, | ||
|
|
@@ -852,8 +901,9 @@ def init_observability( | |
| """ | ||
| global _batcher, _event_sink | ||
|
|
||
| # Check if enabled | ||
| is_enabled = enabled if enabled is not None else get_settings().observability_enabled | ||
| if enabled is not None: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This changes |
||
| configure_settings(observability_enabled=is_enabled) | ||
|
|
||
| if not is_enabled: | ||
| logger.debug("Observability disabled") | ||
|
|
@@ -877,7 +927,7 @@ def init_observability( | |
|
|
||
| def add_event(event: ControlExecutionEvent) -> bool: | ||
| """ | ||
| Add an event to the global batcher. | ||
| Add an event to the active control-event sink. | ||
|
|
||
| Args: | ||
| event: Control execution event to add | ||
|
|
@@ -889,10 +939,22 @@ def add_event(event: ControlExecutionEvent) -> bool: | |
|
|
||
|
|
||
| def write_events(events: Sequence[ControlExecutionEvent]) -> SinkResult: | ||
| """Write events through the active global sink.""" | ||
| if _event_sink is None: | ||
| """Write events through the active sink selection.""" | ||
| active_sinks = _get_active_control_event_sinks() | ||
| primary_result: SinkResult | None = None | ||
|
|
||
| for sink in active_sinks: | ||
| try: | ||
| result = sink.write_events(events) | ||
| except Exception: | ||
| logger.warning("Control-event sink write failed", exc_info=True) | ||
| continue | ||
| if primary_result is None: | ||
| primary_result = result | ||
|
|
||
| if primary_result is None: | ||
| return SinkResult(accepted=0, dropped=len(events)) | ||
| return _event_sink.write_events(events) | ||
| return primary_result | ||
|
|
||
|
|
||
| def sync_shutdown_observability() -> None: | ||
|
|
@@ -915,8 +977,8 @@ async def shutdown_observability() -> None: | |
|
|
||
|
|
||
| def is_observability_enabled() -> bool: | ||
| """Check if observability is enabled and initialized.""" | ||
| return _event_sink is not None | ||
| """Check if observability is enabled and an active sink is available.""" | ||
| return bool(_get_active_control_event_sinks()) | ||
|
|
||
|
|
||
| def log_span_start( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once we expose caller-registered sinks here, we also need a lifecycle story for them. Right now shutdown still only drains the built-in batcher, so any buffered or networked external sink can still drop data on process exit. I'd rather make ownership explicit here before we merge the base API.