
mcp: add wait_for_event tool + resource subscriptions to bridge #23

Open
soupat wants to merge 3 commits into main from feat/mcp-bridge-events

Conversation


soupat commented Apr 26, 2026

Summary

Two new event-delivery surfaces on the device-connect-agent-tools MCP
bridge, both backed by the same EventSubscriptionManager. Both let a
dispatcher agent learn when a remote device emits an event
(progress / work_done / work_failed / …) without polling a status
function in a loop.

wait_for_event tool

  • One-call wait for a Device Connect event. Filter by event_name
    (e.g. work_done) and/or match_params (e.g. {"task_id": "T-42"}).
    Returns the matched event payload or {"timeout": true}; a usage
    sketch follows this list.
  • Race-safe: a 32-event-per-device ring buffer holds recent events.
    If the matching event already fired before the wait call lands (the
    common dispatch→wait race for fast tasks), the tool returns
    immediately from the buffer. Otherwise it blocks up to
    timeout_seconds for a future event.
  • Pre-warm: every invoke_device call ensures a fabric subscription
    exists for the target device before the RPC reply returns, so events
    fired during the invocation always land in the ring buffer for a
    subsequent wait. Pinned subs survive waiter timeouts and resource
    unsubscribes — released only on bridge shutdown.
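
For concreteness, a minimal dispatch→wait round trip from a Python MCP
client might look like the sketch below. The tool names and the
event_name / match_params / timeout_seconds arguments come from this
PR; the invoke_device argument shape and every ID are illustrative.

    # Hypothetical dispatcher flow; `session` is an initialized
    # mcp.ClientSession connected to the bridge.
    async def dispatch_and_wait(session):
        await session.call_tool("invoke_device", {
            "device_id": "jetson-01",           # illustrative
            "function": "start_work",           # argument shape assumed
            "params": {"task_id": "T-42"},
        })
        # If work_done already fired, this resolves immediately from the
        # ring buffer; otherwise it blocks up to timeout_seconds.
        return await session.call_tool("wait_for_event", {
            "device_id": "jetson-01",
            "event_name": "work_done",
            "match_params": {"task_id": "T-42"},
            "timeout_seconds": 120,
        })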

Resource subscriptions

  • New resource template events://devices/{device_id}/latest exposes
    the most recent event payload for a given device.
  • Bridge handles resources/subscribe and pushes
    notifications/resources/updated to subscribed sessions when events
    arrive. Refcounted: one fabric subscription per device, shared across
    multiple session subscribers.
  • get_capabilities patched to advertise resources.subscribe=true
    (the underlying MCP SDK hardcodes False; FastMCP doesn't expose a
    clean override path). A client-side sketch of the subscribe path
    follows this list.
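
A client-side sketch of the subscribe path using the Python MCP SDK
(the URI template is from this PR; the device ID is illustrative and
session setup is elided):

    from pydantic import AnyUrl

    async def watch_latest(session):    # session: mcp.ClientSession
        uri = AnyUrl("events://devices/jetson-01/latest")
        # First subscriber opens the fabric subscription; later session
        # subscribers share it via the bridge's refcount.
        await session.subscribe_resource(uri)
        # notifications/resources/updated arrives via the session's
        # message handler; on each one, re-read for the newest payload.
        return await session.read_resource(uri)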

Tool description tightening

  • invoke_device: nudges agents to prefer wait_for_event over
    re-invoking a status function in a poll loop.
  • wait_for_event: spells out everyday triggers ("wait", "tell me
    when", "block until"), explains race-safety, and includes a TIP
    for waiting on terminal events.

EventSubscriptionManager (mcp/event_subscriptions.py) is the
underlying implementation — encapsulates the ring buffer, waiter
queues, fabric subscription lifecycle, and resource-subscribe state.
Reusable for clients that want the push surface without going through
the bridge.
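
For orientation, a minimal sketch of the ring-buffer + waiter core
(all names and shapes below are assumptions; the real manager also
owns fabric subscription lifecycle and resource-subscribe refcounts):

    from __future__ import annotations

    import asyncio
    from collections import deque

    class _DeviceEvents:
        """Per-device state: bounded event history plus pending waiters."""

        def __init__(self) -> None:
            self.ring: deque[dict] = deque(maxlen=32)  # recent events
            self.waiters: list[tuple] = []             # (predicate, future)

        def deliver(self, event: dict) -> None:
            # Called from the fabric subscription callback.
            self.ring.append(event)
            for predicate, fut in list(self.waiters):
                if not fut.done() and predicate(event):
                    fut.set_result(event)

        async def wait(self, predicate, timeout: float) -> dict | None:
            # Scan history first: this is what makes dispatch→wait race-safe.
            for event in self.ring:
                if predicate(event):
                    return event
            fut = asyncio.get_running_loop().create_future()
            self.waiters.append((predicate, fut))
            try:
                return await asyncio.wait_for(fut, timeout)
            except asyncio.TimeoutError:
                return None   # the bridge maps None to {"timeout": true}
            finally:
                self.waiters.remove((predicate, fut))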

Why both surfaces

CLI MCP clients (codex CLI, Claude Code) don't actively use
resources/subscribe today — they call tools. wait_for_event covers
that case with a single tool call. The resource subscription path is
useful for MCP clients that do subscribe natively (Claude Desktop) and
is future-proofing for CLI clients as their support catches up. Both
surfaces share the same fabric subscription and ring buffer, so adding
the second surface costs nothing on the wire.

Test plan

24 unit tests in tests/test_mcp_bridge_subscriptions.py cover:

  • URI parse/build (uri_for_device, device_id_from_uri) including 5 garbage-input rejections
  • Event payload parsing — JSON-RPC envelope, bare dict, malformed bytes
  • Subject-name extraction from both NATS-style (.) and Zenoh-style (/) separators
  • Subscribe → fire event → session notified, latest cached
  • Two sessions sharing one fabric subscription
  • Refcounted unsubscribe tears down fabric sub when last subscriber leaves
  • Subscribe to unrelated URI is a no-op
  • wait_for_event returns matching event
  • wait_for_event filters by event_name
  • wait_for_event filters by match_params
  • wait_for_event returns None on timeout
  • wait_for_event releases fabric sub on lone-waiter timeout
  • wait_for_event keeps fabric sub when a session subscription still holds it
  • ensure_fabric_sub pins the sub; survives waiter timeout; released on close()
  • Race fix: event already fired before wait_for_event is called → ring-buffer hit returns immediately (sketched after this list)
  • Ring-buffer scan skips non-matching events to find an older match
24 passed in 0.55s
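
As a shape reference only, the race-fix case might look roughly like
this (the manager API spelled out here is hypothetical; the real tests
live in tests/test_mcp_bridge_subscriptions.py):

    import pytest

    @pytest.mark.asyncio
    async def test_event_before_wait_hits_ring_buffer(manager):
        # Fire first, wait second: the ring buffer must satisfy the wait
        # without blocking. The `manager` fixture and both method names
        # are assumptions for illustration.
        manager.deliver("jetson-01", {"event_name": "work_done",
                                      "params": {"task_id": "T-42"}})
        event = await manager.wait_for_event(
            "jetson-01", event_name="work_done",
            match_params={"task_id": "T-42"}, timeout_seconds=0.1)
        assert event is not None
        assert event["params"]["task_id"] == "T-42"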

Live-verified end-to-end against a Jetson worker:

  • wait_for_event resolves immediately from the ring buffer for fast tasks (3/3 race-repro runs pass).
  • Resource subscriptions confirmed: notifications/resources/updated arrives at a Python MCP client subscribed to events://devices/jetson-01/latest; resources/read returns the payload.

soupat added 2 commits April 26, 2026 09:29
Two issues surfaced by PR #23 CI:

1. Lint: `device_id_from_uri` was imported into bridge.py but never
   used after refactoring the resource handler. Removed.

2. Fuzz tests: collection failed with ModuleNotFoundError on `mcp`.
   The fuzz CI job installs [dev,fuzz] but not [mcp]. Importing any
   submodule under device_connect_agent_tools.mcp triggers
   mcp/__init__.py → bridge.py → event_subscriptions.py, which
   eagerly imported `mcp.server.lowlevel.server.request_ctx` at
   module top level.

   Mirror bridge.py's existing fastmcp guard: wrap the mcp imports in
   try/except (sketched below), set an _MCP_AVAILABLE flag, and let the
   bridge's own
   fastmcp guard (which is checked before EventSubscriptionManager is
   ever instantiated) be the gatekeeper at runtime. The fuzz tests
   that exercise schema/jsonrpc parsers have no functional dependency
   on the mcp module — they just need the imports not to blow up at
   collection time.

   ServerSession references in type hints rely on
   `from __future__ import annotations` (already present), so they
   stay as forward refs and don't need their own guard.
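
A sketch of the guard (the import path and flag name come from this
commit message; surrounding details assumed):

    # Top of event_subscriptions.py: tolerate a missing [mcp] extra so
    # importing the package doesn't blow up at pytest collection time.
    from __future__ import annotations

    try:
        from mcp.server.lowlevel.server import request_ctx
        _MCP_AVAILABLE = True
    except ImportError:  # also covers ModuleNotFoundError in the fuzz job
        request_ctx = None  # type: ignore[assignment]
        _MCP_AVAILABLE = False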
soupat added a commit that referenced this pull request Apr 26, 2026
Pre-existing flake on main (run 24551372160 on 2026-04-17 hit the same
assertion) — surfaced again on PR #23. The test waited a fixed 0.5s
after spawning two D2D sensors and then called discover_devices once.
Under Zenoh CI load, both peers' presence announcements don't always
propagate within that window, so discover_devices sees only one of
the two. The miss is non-deterministic — different runs miss different
sensors.

Replace the single fixed sleep + one-shot discover with a
poll-until-both-visible loop under a 10s deadline (sketched below):
- pass refresh=True so the agent-tools 30s cache doesn't mask peers
  that show up between polls
- 0.5s between polls is plenty given Zenoh peer convergence is
  sub-second once the first announcement lands
- assertion messages now show what's actually in the device list when
  a real failure happens
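
Roughly, the new loop looks like this (sensor IDs, the import path, and
the discover_devices result shape are illustrative; refresh=True is the
real cache-bypass flag):

    import time

    from device_connect_agent_tools import discover_devices  # path assumed

    deadline = time.monotonic() + 10.0
    visible: set[str] = set()
    while time.monotonic() < deadline:
        # refresh=True bypasses the 30s discovery cache so peers that
        # announce between polls aren't masked.
        visible = {d["device_id"] for d in discover_devices(refresh=True)}
        if {"sensor-a", "sensor-b"} <= visible:
            break
        time.sleep(0.5)  # convergence is sub-second once announcements land
    assert {"sensor-a", "sensor-b"} <= visible, f"devices seen: {sorted(visible)}"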

Test scope unchanged: still asserts both sensors are eventually
visible. Just gives Zenoh enough time to converge.
