diff --git a/README.md b/README.md index e9674f7d..16cbb7de 100644 --- a/README.md +++ b/README.md @@ -148,6 +148,36 @@ if __name__ == "__main__": Use `agent_control.shutdown()` or `await agent_control.ashutdown()` before process exit so short-lived scripts flush pending observability events cleanly. +External integrations can register a sink for the same finalized +control-event payloads: + +```python +from agent_control import ( + register_control_event_sink, + unregister_control_event_sink, +) +from agent_control_telemetry import BaseControlEventSink, SinkResult + + +class MyControlEventSink(BaseControlEventSink): + def write_events(self, events): + for event in events: + forward_to_external_system(event.model_dump(mode="json")) + return SinkResult(accepted=len(events), dropped=0) + + +sink = MyControlEventSink() +register_control_event_sink(sink) + +# Later, when tearing down the integration: +unregister_control_event_sink(sink) +``` + +Registered sinks receive the same local, server, and merged control-execution +events the SDK emits through its normal event-construction flow. If no +external sink is registered, the default OSS delivery path is unchanged. If one +or more sinks are registered, they replace the default built-in delivery path. + Next, create a control in Step 4, then run the setup and agent scripts in order to see blocking in action. diff --git a/sdks/python/src/agent_control/__init__.py b/sdks/python/src/agent_control/__init__.py index 1475049c..90178b46 100644 --- a/sdks/python/src/agent_control/__init__.py +++ b/sdks/python/src/agent_control/__init__.py @@ -98,11 +98,14 @@ async def handle_input(user_message: str) -> str: get_event_sink, get_log_config, get_logger, + get_registered_control_event_sinks, init_observability, is_observability_enabled, log_control_evaluation, + register_control_event_sink, shutdown_observability, sync_shutdown_observability, + unregister_control_event_sink, write_events, ) from .tracing import ( @@ -1368,6 +1371,9 @@ async def main(): "is_observability_enabled", "get_event_batcher", "get_event_sink", + "get_registered_control_event_sinks", + "register_control_event_sink", + "unregister_control_event_sink", "configure_logging", "get_log_config", "log_control_evaluation", diff --git a/sdks/python/src/agent_control/observability.py b/sdks/python/src/agent_control/observability.py index 32bb1dd1..0fa07706 100644 --- a/sdks/python/src/agent_control/observability.py +++ b/sdks/python/src/agent_control/observability.py @@ -798,6 +798,8 @@ def get_stats(self) -> dict: # Global batcher instance _batcher: EventBatcher | None = None _event_sink: ControlEventSink | None = None +_external_event_sinks: list[ControlEventSink] = [] +_external_event_sinks_lock = threading.Lock() class _BatcherControlEventSink(BaseControlEventSink): @@ -828,10 +830,57 @@ def get_event_batcher() -> EventBatcher | None: def get_event_sink() -> ControlEventSink | None: - """Get the active global control-event sink.""" + """Get the active built-in control-event sink.""" return _event_sink +def register_control_event_sink(sink: ControlEventSink) -> None: + """Register an external control-event sink. + + Registered sinks receive the same finalized control-event payloads emitted + through the SDK's local, server, and merged event flows. When one or more + external sinks are registered, they replace the default built-in delivery + path. Registration is idempotent for the same sink instance. + """ + with _external_event_sinks_lock: + if sink not in _external_event_sinks: + _external_event_sinks.append(sink) + + +def unregister_control_event_sink(sink: ControlEventSink) -> None: + """Unregister a previously registered external control-event sink.""" + with _external_event_sinks_lock: + try: + _external_event_sinks.remove(sink) + except ValueError: + pass + + +def get_registered_control_event_sinks() -> tuple[ControlEventSink, ...]: + """Return the currently registered external control-event sinks.""" + with _external_event_sinks_lock: + return tuple(_external_event_sinks) + + +def _get_active_control_event_sinks() -> tuple[ControlEventSink, ...]: + """Resolve the currently active sinks. + + Observability must be enabled before any sink is considered. When enabled, + registered sinks override the default built-in sink. This keeps the current + OSS behavior intact when no sink is selected, while leaving a single + resolution seam for future config-driven sink selection. + """ + if not get_settings().observability_enabled: + return () + + registered_sinks = get_registered_control_event_sinks() + if registered_sinks: + return registered_sinks + if _event_sink is not None: + return (_event_sink,) + return () + + def init_observability( server_url: str | None = None, api_key: str | None = None, @@ -852,8 +901,9 @@ def init_observability( """ global _batcher, _event_sink - # Check if enabled is_enabled = enabled if enabled is not None else get_settings().observability_enabled + if enabled is not None: + configure_settings(observability_enabled=is_enabled) if not is_enabled: logger.debug("Observability disabled") @@ -877,7 +927,7 @@ def init_observability( def add_event(event: ControlExecutionEvent) -> bool: """ - Add an event to the global batcher. + Add an event to the active control-event sink. Args: event: Control execution event to add @@ -889,10 +939,22 @@ def add_event(event: ControlExecutionEvent) -> bool: def write_events(events: Sequence[ControlExecutionEvent]) -> SinkResult: - """Write events through the active global sink.""" - if _event_sink is None: + """Write events through the active sink selection.""" + active_sinks = _get_active_control_event_sinks() + primary_result: SinkResult | None = None + + for sink in active_sinks: + try: + result = sink.write_events(events) + except Exception: + logger.warning("Control-event sink write failed", exc_info=True) + continue + if primary_result is None: + primary_result = result + + if primary_result is None: return SinkResult(accepted=0, dropped=len(events)) - return _event_sink.write_events(events) + return primary_result def sync_shutdown_observability() -> None: @@ -915,8 +977,8 @@ async def shutdown_observability() -> None: def is_observability_enabled() -> bool: - """Check if observability is enabled and initialized.""" - return _event_sink is not None + """Check if observability is enabled and an active sink is available.""" + return bool(_get_active_control_event_sinks()) def log_span_start( diff --git a/sdks/python/tests/test_observability.py b/sdks/python/tests/test_observability.py index 740e22f9..ed326255 100644 --- a/sdks/python/tests/test_observability.py +++ b/sdks/python/tests/test_observability.py @@ -1,26 +1,31 @@ """Tests for the observability module (EventBatcher).""" import asyncio -import os -from datetime import datetime, timezone +import logging +from collections.abc import Sequence +from datetime import UTC, datetime from unittest.mock import AsyncMock, MagicMock, patch -from uuid import uuid4 import httpx import pytest - from agent_control.observability import ( EventBatcher, add_event, get_event_batcher, get_event_sink, + get_registered_control_event_sinks, init_observability, is_observability_enabled, log_span_end, log_span_start, + register_control_event_sink, shutdown_observability, + sync_shutdown_observability, + unregister_control_event_sink, ) -from agent_control.settings import get_settings +from agent_control.settings import configure_settings, get_settings +from agent_control_models import ControlExecutionEvent +from agent_control_telemetry.sinks import BaseControlEventSink, SinkResult def create_mock_event(): @@ -29,7 +34,6 @@ def create_mock_event(): mock_event.model_dump = MagicMock(return_value={ "trace_id": "a" * 32, "span_id": "b" * 16, - "agent_name": str(uuid4()), "agent_name": "test-agent", "control_id": 1, "control_name": "test-control", @@ -38,11 +42,36 @@ def create_mock_event(): "action": "observe", "matched": False, "confidence": 0.95, - "timestamp": datetime.now(timezone.utc).isoformat(), + "timestamp": datetime.now(UTC).isoformat(), }) return mock_event +class RecordingSink(BaseControlEventSink): + """Test sink that records the exact event batches it receives.""" + + def __init__(self, *, accepted: int | None = None): + self.accepted = accepted + self.received_batches: list[list[ControlExecutionEvent]] = [] + + def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + self.received_batches.append(list(events)) + accepted = self.accepted if self.accepted is not None else len(events) + dropped = max(len(events) - accepted, 0) + return SinkResult(accepted=accepted, dropped=dropped) + + +def reset_observability_state() -> None: + """Clear global observability state between tests.""" + import agent_control.observability as obs + + obs._batcher = None + obs._event_sink = None + configure_settings(observability_enabled=True) + with obs._external_event_sinks_lock: + obs._external_event_sinks.clear() + + class TestEventBatcherInit: """Tests for EventBatcher initialization.""" @@ -461,7 +490,10 @@ def test_send_batch_sync_returns_true_on_202(self): client_context = MagicMock() client_context.__enter__.return_value = client - with patch("agent_control.observability.httpx.Client", return_value=client_context) as client_ctor: + with patch( + "agent_control.observability.httpx.Client", + return_value=client_context, + ) as client_ctor: result = batcher._send_batch_sync([create_mock_event()]) assert result is True @@ -476,7 +508,10 @@ def test_send_batch_sync_returns_false_on_401_without_retry(self): client_context = MagicMock() client_context.__enter__.return_value = client - with patch("agent_control.observability.httpx.Client", return_value=client_context) as client_ctor: + with patch( + "agent_control.observability.httpx.Client", + return_value=client_context, + ) as client_ctor: result = batcher._send_batch_sync([create_mock_event()]) assert result is False @@ -603,54 +638,60 @@ class TestGlobalBatcher: def test_get_event_batcher_not_initialized(self): """Test get_event_batcher returns None when not initialized.""" - # Reset global state import agent_control.observability as obs + old_batcher = obs._batcher old_sink = obs._event_sink - obs._batcher = None - obs._event_sink = None + old_external_sinks = obs.get_registered_control_event_sinks() + reset_observability_state() try: assert get_event_batcher() is None finally: obs._batcher = old_batcher obs._event_sink = old_sink + for sink in old_external_sinks: + register_control_event_sink(sink) def test_get_event_sink_not_initialized(self): """Test get_event_sink returns None when not initialized.""" import agent_control.observability as obs old_batcher = obs._batcher old_sink = obs._event_sink - obs._batcher = None - obs._event_sink = None + old_external_sinks = obs.get_registered_control_event_sinks() + reset_observability_state() try: assert get_event_sink() is None finally: obs._batcher = old_batcher obs._event_sink = old_sink + for sink in old_external_sinks: + register_control_event_sink(sink) def test_is_observability_enabled_false(self): """Test is_observability_enabled returns False when not initialized.""" import agent_control.observability as obs old_batcher = obs._batcher old_sink = obs._event_sink - obs._batcher = None - obs._event_sink = None + old_external_sinks = obs.get_registered_control_event_sinks() + reset_observability_state() try: assert is_observability_enabled() is False finally: obs._batcher = old_batcher obs._event_sink = old_sink + for sink in old_external_sinks: + register_control_event_sink(sink) def test_add_event_without_batcher(self): """Test add_event returns False when batcher not initialized.""" import agent_control.observability as obs old_batcher = obs._batcher old_sink = obs._event_sink - obs._batcher = None - obs._event_sink = None + old_external_sinks = obs.get_registered_control_event_sinks() + reset_observability_state() try: result = add_event(create_mock_event()) @@ -658,6 +699,112 @@ def test_add_event_without_batcher(self): finally: obs._batcher = old_batcher obs._event_sink = old_sink + for sink in old_external_sinks: + register_control_event_sink(sink) + + +class TestExternalControlEventSinks: + """Tests for vendor-neutral external control-event sink registration.""" + + def setup_method(self) -> None: + self._import_and_reset() + + def teardown_method(self) -> None: + sync_shutdown_observability() + self._import_and_reset() + + @staticmethod + def _import_and_reset() -> None: + reset_observability_state() + + def test_register_and_unregister_external_sink(self): + sink = RecordingSink() + + register_control_event_sink(sink) + register_control_event_sink(sink) + + assert get_registered_control_event_sinks() == (sink,) + assert is_observability_enabled() is True + + unregister_control_event_sink(sink) + + assert get_registered_control_event_sinks() == () + assert is_observability_enabled() is False + + def test_registered_sink_does_not_activate_when_observability_disabled(self): + sink = RecordingSink() + register_control_event_sink(sink) + configure_settings(observability_enabled=False) + + result = add_event(create_mock_event()) + + assert result is False + assert sink.received_batches == [] + assert is_observability_enabled() is False + + def test_write_events_delivers_to_external_sink_without_builtin_batcher(self): + sink = RecordingSink() + event = create_mock_event() + + register_control_event_sink(sink) + + result = add_event(event) + + assert result is True + assert sink.received_batches == [[event]] + assert get_event_sink() is None + + def test_registered_sink_overrides_builtin_sink(self): + sink = RecordingSink() + register_control_event_sink(sink) + + batcher = init_observability(enabled=True) + assert batcher is not None + batcher.add_event = MagicMock(return_value=True) + event = create_mock_event() + + result = add_event(event) + + assert result is True + batcher.add_event.assert_not_called() + assert sink.received_batches == [[event]] + + def test_external_sink_failure_does_not_fall_back_to_builtin_sink(self): + sink = RecordingSink() + sink.write_events = MagicMock(side_effect=RuntimeError("boom")) + register_control_event_sink(sink) + + batcher = init_observability(enabled=True) + assert batcher is not None + batcher.add_event = MagicMock(return_value=True) + + result = add_event(create_mock_event()) + + assert result is False + batcher.add_event.assert_not_called() + + def test_unregistering_external_sink_restores_builtin_sink(self): + sink = RecordingSink() + register_control_event_sink(sink) + + batcher = init_observability(enabled=True) + assert batcher is not None + batcher.add_event = MagicMock(return_value=True) + + unregister_control_event_sink(sink) + + result = add_event(create_mock_event()) + + assert result is True + batcher.add_event.assert_called_once() + + def test_external_only_sink_controls_write_result_when_no_builtin_sink_exists(self): + sink = RecordingSink(accepted=0) + register_control_event_sink(sink) + + result = add_event(create_mock_event()) + + assert result is False class TestInitObservability: @@ -831,8 +978,6 @@ class TestSpanLogging: def test_log_span_start(self, caplog): """Test log_span_start logs correctly.""" - import logging - from agent_control.settings import configure_settings caplog.set_level(logging.INFO) # Ensure logging is enabled @@ -845,8 +990,6 @@ def test_log_span_start(self, caplog): def test_log_span_end(self, caplog): """Test log_span_end logs correctly.""" - import logging - from agent_control.settings import configure_settings caplog.set_level(logging.INFO) # Ensure logging is enabled @@ -867,8 +1010,6 @@ def test_log_span_end(self, caplog): def test_log_span_disabled(self, caplog): """Test that logging is skipped when span logging is disabled via config.""" - import logging - from agent_control.settings import configure_settings caplog.set_level(logging.INFO) # Save original config diff --git a/sdks/python/tests/test_observability_updates.py b/sdks/python/tests/test_observability_updates.py index 6d3c84f5..f40e035a 100644 --- a/sdks/python/tests/test_observability_updates.py +++ b/sdks/python/tests/test_observability_updates.py @@ -1,12 +1,13 @@ """Tests for reconstructed control-execution events in SDK evaluation flows.""" +from collections.abc import Sequence from unittest.mock import AsyncMock, MagicMock, patch import pytest from agent_control import evaluation from agent_control.evaluation import ( - _ControlAdapter, _build_server_control_lookup, + _ControlAdapter, _has_applicable_prefiltered_server_controls, _merge_results, ) @@ -15,11 +16,27 @@ enqueue_observability_events, map_applies_to, ) +from agent_control.observability import ( + register_control_event_sink, + unregister_control_event_sink, +) +from agent_control_models import ControlDefinition, ControlExecutionEvent +from agent_control_telemetry.sinks import BaseControlEventSink, SinkResult from agent_control_telemetry.trace_context import ( clear_trace_context_provider, set_trace_context_provider, ) -from agent_control_models import ControlDefinition + + +class RecordingSink(BaseControlEventSink): + """Test sink that records delivered control events.""" + + def __init__(self) -> None: + self.received_batches: list[list[ControlExecutionEvent]] = [] + + def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + self.received_batches.append(list(events)) + return SinkResult(accepted=len(events), dropped=0) class TestMapAppliesTo: @@ -390,6 +407,69 @@ async def test_delivers_local_events_in_oss_mode(self): assert result.non_matches is not None assert len(result.non_matches) == 1 + @pytest.mark.asyncio + async def test_external_sink_receives_local_events(self): + from agent_control_models import ControlMatch, EvaluationResponse, EvaluatorResult, Step + + sink = RecordingSink() + register_control_event_sink(sink) + + try: + mock_response = EvaluationResponse( + is_safe=True, + confidence=1.0, + non_matches=[ + ControlMatch( + control_id=1, + control_name="test-ctrl", + action="observe", + result=EvaluatorResult(matched=False, confidence=0.1), + ) + ], + ) + mock_engine = MagicMock() + mock_engine.process = AsyncMock(return_value=mock_response) + + controls = [{ + "id": 1, + "name": "test-ctrl", + "control": { + "condition": { + "evaluator": {"name": "regex", "config": {"pattern": "test"}}, + "selector": {"path": "input"}, + }, + "action": {"decision": "observe"}, + "execution": "sdk", + }, + }] + + client = MagicMock() + client.http_client = AsyncMock() + step = Step(type="llm", name="test-step", input="hello") + + with patch("agent_control.evaluation.ControlEngine", return_value=mock_engine), patch( + "agent_control.evaluation.list_evaluators", return_value=["regex"] + ): + await evaluation.check_evaluation_with_local( + client=client, + agent_name="agent-000000000001", + step=step, + stage="pre", + controls=controls, + trace_id="abc123", + span_id="def456", + event_agent_name="test-agent", + ) + + assert len(sink.received_batches) == 1 + assert len(sink.received_batches[0]) == 1 + event = sink.received_batches[0][0] + assert event.control_id == 1 + assert event.trace_id == "abc123" + assert event.span_id == "def456" + finally: + unregister_control_event_sink(sink) + @pytest.mark.asyncio async def test_resolves_provider_trace_context_for_local_events(self): from agent_control_models import ControlMatch, EvaluationResponse, EvaluatorResult, Step @@ -497,9 +577,13 @@ async def test_forwards_provider_trace_headers_to_server_when_ids_omitted(self): class TestCheckEvaluation: + def teardown_method(self) -> None: + clear_trace_context_provider() @pytest.mark.asyncio - async def test_check_evaluation_enqueues_reconstructed_server_events_when_observability_enabled(self): + async def test_check_evaluation_enqueues_reconstructed_server_events_when_enabled( + self, + ): from agent_control_models import Step mock_http_response = MagicMock() @@ -526,7 +610,10 @@ async def test_check_evaluation_enqueues_reconstructed_server_events_when_observ client.http_client.post = AsyncMock(return_value=mock_http_response) step = Step(type="llm", name="test-step", input="hello") - with patch("agent_control.evaluation.is_observability_enabled", return_value=True), patch("agent_control.evaluation.enqueue_observability_events") as mock_enqueue: + with ( + patch("agent_control.evaluation.is_observability_enabled", return_value=True), + patch("agent_control.evaluation.enqueue_observability_events") as mock_enqueue, + ): result = await evaluation.check_evaluation( client=client, agent_name="agent-000000000001", @@ -540,13 +627,59 @@ async def test_check_evaluation_enqueues_reconstructed_server_events_when_observ assert result.is_safe is True assert result.confidence == 0.9 + @pytest.mark.asyncio + async def test_external_sink_receives_server_events(self): + from agent_control_models import Step + + sink = RecordingSink() + register_control_event_sink(sink) + + try: + mock_http_response = MagicMock() + mock_http_response.raise_for_status = MagicMock() + mock_http_response.json.return_value = { + "is_safe": True, + "confidence": 0.9, + "matches": None, + "errors": None, + "non_matches": [ + { + "control_id": 1, + "control_name": "ctrl-1", + "action": "observe", + "control_execution_id": "ce-1", + "result": {"matched": False, "confidence": 0.1}, + } + ], + } + + client = MagicMock() + client.base_url = "http://localhost:8000" + client.http_client = AsyncMock() + client.http_client.post = AsyncMock(return_value=mock_http_response) + step = Step(type="llm", name="test-step", input="hello") + + await evaluation.check_evaluation( + client=client, + agent_name="agent-000000000001", + step=step, + stage="pre", + ) + + assert len(sink.received_batches) == 1 + assert len(sink.received_batches[0]) == 1 + assert sink.received_batches[0][0].control_execution_id == "ce-1" + finally: + unregister_control_event_sink(sink) + @pytest.mark.asyncio async def test_skips_local_event_reconstruction_when_observability_disabled(self): from agent_control_models import EvaluationResponse, Step - controls = [{ - "id": 1, - "name": "local-ctrl", + controls = [ + { + "id": 1, + "name": "local-ctrl", "control": { "condition": { "evaluator": {"name": "regex", "config": {"pattern": "test"}}, @@ -555,7 +688,8 @@ async def test_skips_local_event_reconstruction_when_observability_disabled(self "action": {"decision": "observe"}, "execution": "sdk", }, - }] + } + ] mock_response = EvaluationResponse(is_safe=True, confidence=1.0) mock_engine = MagicMock() @@ -565,7 +699,13 @@ async def test_skips_local_event_reconstruction_when_observability_disabled(self client.http_client = AsyncMock() step = Step(type="llm", name="test-step", input="hello") - with patch("agent_control.evaluation.ControlEngine", return_value=mock_engine), patch("agent_control.evaluation.list_evaluators", return_value=["regex"]), patch("agent_control.evaluation.is_observability_enabled", return_value=False), patch("agent_control.evaluation.build_control_execution_events") as mock_build, patch("agent_control.evaluation.enqueue_observability_events") as mock_enqueue: + with ( + patch("agent_control.evaluation.ControlEngine", return_value=mock_engine), + patch("agent_control.evaluation.list_evaluators", return_value=["regex"]), + patch("agent_control.evaluation.is_observability_enabled", return_value=False), + patch("agent_control.evaluation.build_control_execution_events") as mock_build, + patch("agent_control.evaluation.enqueue_observability_events") as mock_enqueue, + ): result = await evaluation.check_evaluation_with_local( client=client, agent_name="agent-000000000001", @@ -599,7 +739,10 @@ async def test_check_evaluation_skips_enqueue_when_observability_disabled(self): client.http_client.post = AsyncMock(return_value=mock_http_response) step = Step(type="llm", name="test-step", input="hello") - with patch("agent_control.evaluation.is_observability_enabled", return_value=False), patch("agent_control.evaluation.enqueue_observability_events") as mock_enqueue: + with ( + patch("agent_control.evaluation.is_observability_enabled", return_value=False), + patch("agent_control.evaluation.enqueue_observability_events") as mock_enqueue, + ): result = await evaluation.check_evaluation( client=client, agent_name="agent-000000000001", @@ -713,6 +856,100 @@ async def test_merged_event_mode_enqueues_reconstructed_local_and_server_events_ assert result.matches is not None assert len(result.matches) == 2 + @pytest.mark.asyncio + async def test_external_sink_receives_merged_local_and_server_events(self): + from agent_control_models import ControlMatch, EvaluationResponse, EvaluatorResult, Step + + sink = RecordingSink() + register_control_event_sink(sink) + + try: + local_response = EvaluationResponse( + is_safe=True, + confidence=1.0, + matches=[ + ControlMatch( + control_id=1, + control_name="local-ctrl", + action="observe", + result=EvaluatorResult(matched=False, confidence=0.8), + ) + ], + ) + server_response = { + "is_safe": True, + "confidence": 0.9, + "matches": [ + { + "control_id": 2, + "control_name": "server-ctrl", + "action": "observe", + "control_execution_id": "ce-server", + "result": {"matched": False, "confidence": 0.4}, + } + ], + "errors": None, + "non_matches": None, + } + controls = [ + { + "id": 1, + "name": "local-ctrl", + "control": { + "condition": { + "evaluator": {"name": "regex", "config": {"pattern": "test"}}, + "selector": {"path": "input"}, + }, + "action": {"decision": "observe"}, + "execution": "sdk", + }, + }, + { + "id": 2, + "name": "server-ctrl", + "control": { + "condition": { + "evaluator": {"name": "regex", "config": {"pattern": "test"}}, + "selector": {"path": "input"}, + }, + "action": {"decision": "observe"}, + "execution": "server", + }, + }, + ] + + mock_engine = MagicMock() + mock_engine.process = AsyncMock(return_value=local_response) + mock_http_response = MagicMock() + mock_http_response.raise_for_status = MagicMock() + mock_http_response.json.return_value = server_response + + client = MagicMock() + client.http_client = AsyncMock() + client.http_client.post = AsyncMock(return_value=mock_http_response) + step = Step(type="llm", name="test-step", input="hello") + + with patch("agent_control.evaluation.ControlEngine", return_value=mock_engine), patch( + "agent_control.evaluation.list_evaluators", return_value=["regex"] + ): + await evaluation.check_evaluation_with_local( + client=client, + agent_name="agent-000000000001", + step=step, + stage="pre", + controls=controls, + trace_id="abc123", + span_id="def456", + event_agent_name="test-agent", + ) + + assert len(sink.received_batches) == 1 + merged_events = sink.received_batches[0] + assert len(merged_events) == 2 + assert {event.control_id for event in merged_events} == {1, 2} + finally: + unregister_control_event_sink(sink) + @pytest.mark.asyncio async def test_merged_event_mode_enqueues_local_events_before_reraising_server_failure(self): from agent_control_models import ControlMatch, EvaluationResponse, EvaluatorResult, Step @@ -765,7 +1002,12 @@ async def test_merged_event_mode_enqueues_local_events_before_reraising_server_f client.http_client.post = AsyncMock(side_effect=RuntimeError("server unavailable")) step = Step(type="llm", name="test-step", input="hello") - with patch("agent_control.evaluation.ControlEngine", return_value=mock_engine), patch("agent_control.evaluation.list_evaluators", return_value=["regex"]), patch("agent_control.evaluation.is_observability_enabled", return_value=True), patch("agent_control.evaluation.enqueue_observability_events") as mock_enqueue: + with ( + patch("agent_control.evaluation.ControlEngine", return_value=mock_engine), + patch("agent_control.evaluation.list_evaluators", return_value=["regex"]), + patch("agent_control.evaluation.is_observability_enabled", return_value=True), + patch("agent_control.evaluation.enqueue_observability_events") as mock_enqueue, + ): with pytest.raises(RuntimeError, match="server unavailable"): await evaluation.check_evaluation_with_local( client=client,