Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/agent_control_demo/demo_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def initialize_demo_agent() -> bool:
agent_name=AGENT_NAME,
agent_description="Demo chatbot for testing controls",
server_url=SERVER_URL,
observability_enabled=True
)
logger.info("Agent initialized successfully")
return True
Expand Down
6 changes: 5 additions & 1 deletion sdks/python/src/agent_control/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@
from typing import TYPE_CHECKING, Any

import httpx
from agent_control_telemetry.sinks import BaseControlEventSink, ControlEventSink, SinkResult
from agent_control_telemetry.sinks import (
BaseControlEventSink,
ControlEventSink,
SinkResult,
)

from agent_control.settings import configure_settings, get_settings

Expand Down
34 changes: 23 additions & 11 deletions server/src/agent_control_server/observability/ingest/direct.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Direct event ingestor implementation.

This module provides the DirectEventIngestor, which processes events
immediately (synchronously) by storing them directly to the EventStore.
immediately by writing them to an async control-event sink. Existing
store-based callers are preserved by wrapping EventStore instances in the
default EventStoreControlEventSink internally.

For high-throughput scenarios, users can implement their own buffered
ingestor (e.g., QueuedEventIngestor, RedisEventIngestor).
Expand All @@ -11,39 +13,48 @@
import logging

from agent_control_models.observability import ControlExecutionEvent
from agent_control_telemetry.sinks import AsyncControlEventSink

from ..sinks import EventStoreControlEventSink
from ..store.base import EventStore
from .base import EventIngestor, IngestResult

logger = logging.getLogger(__name__)


class DirectEventIngestor(EventIngestor):
"""Processes events immediately by storing them to the EventStore.
"""Processes events immediately by writing them to an async control-event sink.

This is the simplest ingestor implementation. Events are stored
directly to the database, adding ~5-20ms latency per batch.
This is the simplest ingestor implementation. Events are written
directly to the configured sink, adding ~5-20ms latency per batch.

For use cases that require lower latency or higher throughput,
implement a custom buffered ingestor (e.g., QueuedEventIngestor).

Attributes:
store: The EventStore to write events to
sink: The AsyncControlEventSink used to write events
log_to_stdout: Whether to log events as structured JSON
"""

def __init__(self, store: EventStore, log_to_stdout: bool = False):
def __init__(
self,
store: EventStore | AsyncControlEventSink,
log_to_stdout: bool = False,
):
"""Initialize the ingestor.

Args:
store: The EventStore to write events to
store: Either an EventStore or an AsyncControlEventSink implementation
log_to_stdout: Whether to log events as structured JSON (default: False)
"""
self.store = store
if isinstance(store, EventStore):
self.sink: AsyncControlEventSink = EventStoreControlEventSink(store)
else:
self.sink = store
self.log_to_stdout = log_to_stdout

async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult:
"""Ingest events by storing them directly to the EventStore.
"""Ingest events by writing them directly to the configured sink.

Args:
events: List of control execution events to ingest
Expand All @@ -59,8 +70,9 @@ async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult:
dropped = 0

try:
# Store events
processed = await self.store.store(events)
sink_result = await self.sink.write_events(events)
processed = sink_result.accepted
dropped = sink_result.dropped

# Log to stdout if enabled
if self.log_to_stdout:
Expand Down
23 changes: 23 additions & 0 deletions server/src/agent_control_server/observability/sinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Server-side sink implementations for observability event delivery."""

from __future__ import annotations

from collections.abc import Sequence

from agent_control_models.observability import ControlExecutionEvent
from agent_control_telemetry.sinks import SinkResult

from .store.base import EventStore


class EventStoreControlEventSink:
"""Write events through an EventStore-backed sink."""

def __init__(self, store: EventStore):
self.store = store

async def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult:
"""Write events to the underlying store and report accepted/dropped counts."""
stored = await self.store.store(list(events))
dropped = max(len(events) - stored, 0)
return SinkResult(accepted=stored, dropped=dropped)
37 changes: 37 additions & 0 deletions server/tests/test_observability_direct_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from uuid import uuid4

from agent_control_models.observability import ControlExecutionEvent
from agent_control_telemetry.sinks import SinkResult
from agent_control_server.observability.ingest.direct import DirectEventIngestor
from agent_control_server.observability.store.base import EventStore

Expand Down Expand Up @@ -37,6 +38,15 @@ async def query_events(self, query): # pragma: no cover - not used
raise NotImplementedError


class CountingSink:
def __init__(self) -> None:
self.calls: list[list[ControlExecutionEvent]] = []

async def write_events(self, events: list[ControlExecutionEvent]) -> SinkResult:
self.calls.append(events)
return SinkResult(accepted=len(events), dropped=0)


@pytest.mark.asyncio
async def test_direct_ingestor_drops_on_store_error() -> None:
# Given: an ingestor with a failing store
Expand Down Expand Up @@ -117,3 +127,30 @@ async def test_direct_ingestor_flush_noop() -> None:

# Then: no error is raised
assert True


@pytest.mark.asyncio
async def test_direct_ingestor_accepts_control_event_sink() -> None:
sink = CountingSink()
ingestor = DirectEventIngestor(sink)
events = [
ControlExecutionEvent(
trace_id="a" * 32,
span_id="b" * 16,
agent_name="agent-test-01",
control_id=1,
control_name="c",
check_stage="pre",
applies_to="llm_call",
action="observe",
matched=True,
confidence=0.9,
)
]

result = await ingestor.ingest(events)

assert result.received == 1
assert result.processed == 1
assert result.dropped == 0
assert sink.calls == [events]
1 change: 1 addition & 0 deletions telemetry/tests/test_sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import UTC, datetime

from agent_control_models import ControlExecutionEvent

from agent_control_telemetry import (
BaseAsyncControlEventSink,
BaseControlEventSink,
Expand Down
Loading