Skip to content

feat(event): add event subscription & consume system#615

Open
liuxinyanglxy wants to merge 2 commits intomainfrom
feat/event-consume
Open

feat(event): add event subscription & consume system#615
liuxinyanglxy wants to merge 2 commits intomainfrom
feat/event-consume

Conversation

@liuxinyanglxy
Copy link
Copy Markdown
Collaborator

@liuxinyanglxy liuxinyanglxy commented Apr 22, 2026

Summary

Adds a new event top-level command for real-time Lark event subscription & consume, built for AI-agent subprocess ergonomics. Production scope this phase: IM domain only (11 keys — message / reaction / chat member / chat lifecycle).

  • event list / event schema / event consume / event status / event stop — full CLI surface around a local _bus daemon per AppID (Unix Socket on macOS/Linux, Named Pipe on Windows), with orphan-bus detection and cleanup.
  • Schema system: SchemaDef { Native | Custom } + FieldOverrides map[JSONPointer]FieldMeta replaces the old Kind/OutputSchema/OutputType. Native schemas auto-wrap in the V2 envelope; Custom are business-authored and emitted verbatim. Struct tags desc / enum / kind drive reflected annotations; kind renders to the standard JSON Schema format keyword with a Feishu-specific value vocabulary (open_id / chat_id / email / timestamp_ms / …).
  • Subprocess contract for AI agents:
    • [event] ready event_key=<key> stderr marker before any stdout emission (parent blocks on this instead of sleep).
    • stdin EOF is a graceful shutdown signal (paired with a descriptive diagnostic so < /dev/null / nohup / systemd callers aren't mystified).
    • --max-events N / --timeout D for bounded runs; ideal for 'fetch one sample' pattern.
    • No --dry-run — event subscription is a long-running lifecycle, not a single HTTP write, so --max-events 1 --timeout 30s is the preview path (mirrors Google Workspace CLI's choice on their events subscribe).
  • Reliability: per-key buffer + workers, orphan-bus GC, dedup of SDK replays, SIGTERM/Ctrl-C trigger PreConsume cleanup (unsubscribe OAPI when applicable).

Non-goals this phase

  • Mail real-time subscription: out of scope for this PR. Continue using lark-cli mail +watch (unchanged). The framework's PreConsume / FieldOverrides / ParamMulti capabilities are complete and mail can return in a later PR without further framework changes.
  • Runtime schema validation: schemas are descriptive (AI + JQ ergonomics), not contracts.

Testing

  • go test ./cmd/event/... ./events/... ./internal/event/... all green locally
  • events/lint_test.go guards orphan FieldOverrides pointers with a synthetic canary key
  • Smoke: event schema im.message.receive_v1 --json shows enum on message_type / chat_type and format on every ID / timestamp field
  • Smoke: event schema im.chat.disbanded_v1 shows V2 envelope wrap with FieldOverrides applied
  • Smoke: event consume im.message.receive_v1 --max-events 1 --timeout 30s exits cleanly after one event / timeout
  • CI green

Summary by CodeRabbit

Release Notes

  • New Features

    • Added event consume command to subscribe to and process real-time events with support for filtering, sampling, and output formatting.
    • Added event list command to display available event types and metadata.
    • Added event schema command to view detailed event schemas and field documentation.
    • Added event status and event stop commands to manage the local event bus daemon.
    • Introduced IM domain events including message receive, read receipts, and chat membership changes.
    • Event bus now auto-starts on-demand with configurable idle shutdown and lifecycle management.
  • Documentation

    • Updated event consumption guides with examples, prerequisites, and common usage patterns for AI integration.

…detection

Introduces end-to-end Feishu event consumption via a new `lark-cli event`
command family. Users can subscribe to and consume real-time events
(IM messages, chat/member lifecycle, reactions, ...) in a forked bus
daemon architecture with orphan detection, reflected + overrideable JSON
schemas, and AI-friendly `--json` / `--jq` output.

Commands
--------
- `event list [--json]`      list subscribable EventKeys
- `event schema <key>`       Parameters + Output Schema + auth info
- `event consume <key>`      foreground blocking consume; SIGINT/SIGTERM
                             /stdin-EOF shutdown; `--max-events` /
                             `--timeout` bounded; `--jq` projection;
                             `--output-dir` spool; `--param` KV inputs
- `event status [--fail-on-orphan] [--json]`   bus daemon health
- `event stop [--all] [--force] [--json]`      stop bus daemon(s)
- `event _bus` (hidden)      forked daemon entrypoint

Architecture
------------
- Bus daemon (internal/event/bus): per-AppID forked process that holds
  the Feishu long-poll connection and fans events out to 1..N local
  consumers over an IPC socket. Drop-oldest backpressure, TOCTOU-safe
  cleanup via AcquireCleanupLock, idle-timeout self-shutdown, graceful
  SIGTERM.
- Consume client (internal/event/consume): fork+dial the daemon,
  handshake, remote preflight (HTTP /open-apis/event/v1/connection),
  JQ projection, sequence-gap detection, health probe. Bounded
  execution (`--max-events` / `--timeout`) for AI/script usage.
- Wire protocol (internal/event/protocol): newline-delimited JSON
  frames with 1 MB size cap and 5 s write deadlines. Hello / HelloAck /
  PreShutdownCheck / Shutdown / StatusQuery control messages.
- Orphan detection (internal/event/busdiscover): OS process-table scan
  (ps on Unix, PowerShell on Windows) with two-gate cmdline filter
  (lark-cli + event _bus) that naturally rejects pid-reused unrelated
  processes.
- Transport (internal/event/transport): Unix socket on darwin/linux,
  Windows named pipe on windows.
- Schema system (internal/event, internal/event/schemas): SchemaDef with
  mutually-exclusive Native (framework wraps V2 envelope) or Custom
  (zero-touch) specs. Reflection reads `desc` / `enum` / `kind` struct
  tags, with array elements diving into `items`. FieldOverrides overlay
  engine addresses paths via JSON Pointer (including `/*` array
  wildcard) and runs post-reflect, post-envelope. Lint guards orphan
  override paths.
- IM events (events/im): 11 keys — receive / read / recalled, chat and
  member lifecycle, reactions — all with per-field open_id / union_id /
  user_id / chat_id / message_id / timestamp_ms format annotations.

Robustness
----------
- Bus idle-timer race fix: re-check live conn count under lock before
  honoring the tick; Stop+drain before Reset per timer contract.
- Protocol frame cap: replace `br.ReadBytes('\n')` with `ReadFrame` that
  rejects frames > MaxFrameBytes (1 MB). Closes a DoS path where any
  local peer could grow the reader's buffer unbounded.
- Control-message writes gated by WriteTimeout (5 s) so a wedged peer
  kernel buffer can't stall writers indefinitely.
- Consume signal goroutine: `signal.Stop` + `ctx.Done` select, no leak
  across repeated invocations in the same process.
- JQ pre-flight compile so bad expressions fail before the bus fork and
  any server-side PreConsume side effects.
- `f.NewAPIClient`'s `*core.ConfigError` now passes through unwrapped
  so the actionable "run lark-cli config init" hint reaches the user.

Subprocess / AI contract
------------------------
- `event consume` emits `[event] ready event_key=<key>` on stderr once
  the bus handshake completes and events will flow. Parent processes
  block-read stderr until this line before reading stdout — no `sleep`
  fallback needed.
- All list-like commands have `--json` for structured consumption.
- Skill docs in `skills/lark-event/` (SKILL.md + references/) brief AI
  agents on the command surface, JQ against Output Schema, bounded
  execution, and subprocess lifecycle.

Testing
-------
Unit tests across bus/hub, consume loop, protocol codec, dedup,
registry, transport (Unix + Windows), schema reflection, field
overrides, pointer resolver. Integration tests cover fork startup,
shutdown, orphan detection, probe, stdin EOF, preflight, bounded
execution, and Windows busdiscover PowerShell compatibility.

Change-Id: Ib69d6d8409b33b99790081e273d4b5b01b7dbf80
Stdin-EOF silently terminating `event consume` was the biggest footgun
for daemon-style callers (`< /dev/null`, `nohup`, systemd default
`StandardInput=null`). They see the process exit immediately with no
obvious cause. watchStdinEOF now writes a descriptive stderr line
naming the cause and listing the three workarounds (`--max-events` /
`--timeout` for bounded runs, keep stdin open via `< /dev/tty` or
`< <(tail -f /dev/null)`, stop via SIGTERM instead).

- cmd/event/consume.go: watchStdinEOF takes errOut; prints diagnostic
  before cancel; honors --quiet by passing io.Discard.
- cmd/event/consume_stdin_test.go: test signature updated; new
  TestWatchStdinEOF_DiagnosticMessage pins the message content so it
  can't rot unnoticed.

Docs / repo hygiene:
- SKILL.md stdin-EOF note trimmed; `kill -9` warning rewritten to
  focus on the actual server-side consequence (PreConsume unsubscribe
  skipped → subscription leak).
- lark-event-im.md reference: call out `im.message.receive_v1` as
  the lone flat shape vs the other 10 envelope-shaped native keys.
- Removed obsolete lark-event-subscribe.md (old `+subscribe` docs).
- .gitignore: drop stale `span.log` / `.home/` entries (not in use).

Change-Id: Iddf26b2c7810be90fa47e15069b2bf733ffe54cd
@CLAassistant
Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@github-actions github-actions Bot added the size/XL Architecture-level or global-impact change label Apr 22, 2026
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 22, 2026

📝 Walkthrough

Walkthrough

Introduces a comprehensive event-consumption system for real-time Feishu/Lark events with CLI commands to consume, list, and manage events. Includes bus daemon infrastructure, wire protocol definitions, IPC transport layer, schema resolution, event deduplication, and integration with IM event sources.

Changes

Cohort / File(s) Summary
CLI Event Commands
cmd/event/event.go, cmd/event/consume.go, cmd/event/list.go, cmd/event/schema.go, cmd/event/status.go, cmd/event/stop.go, cmd/event/bus.go
New top-level event command group with subcommands for consuming events, listing event keys, viewing schemas, checking bus status, stopping buses, and running the bus daemon. Includes comprehensive flag parsing, validation, and error handling.
Event Command Helpers
cmd/event/appmeta_err.go, cmd/event/console_url.go, cmd/event/suggestions.go, cmd/event/runtime.go, cmd/event/table.go, cmd/event/preflight_test.go
Support utilities for error formatting, console URL generation, event-key suggestions with Levenshtein distance, API client wrapping, table rendering, and preflight validation helpers.
Event Command Tests
cmd/event/*_test.go
Comprehensive test coverage for CLI commands including parameter parsing, output directory sanitization, console URL generation, schema resolution, status/stop operations, and bus lifecycle management.
Protocol & Transport
internal/event/protocol/messages.go, internal/event/protocol/codec.go, internal/event/transport/transport.go, internal/event/transport/transport_unix.go, internal/event/transport/transport_windows.go
Wire protocol message definitions (Hello, Event, StatusQuery, PreShutdownCheck, etc.), JSON codec with frame size limits, and platform-specific IPC transport via Unix sockets or Windows named pipes.
Event Bus Core
internal/event/bus/bus.go, internal/event/bus/conn.go, internal/event/bus/hub.go, internal/event/bus/log.go
Bus daemon with connection lifecycle management, event fan-out hub with subscriber tracking, per-connection state, and buffered message handling with backpressure and deduplication.
Event Bus Tests
internal/event/bus/*_test.go
Tests for bus shutdown semantics, connection I/O, hub operations, concurrent registration/unregistration, publish race conditions, and TOCTOU cleanup locks.
Event Consumption
internal/event/consume/consume.go, internal/event/consume/loop.go, internal/event/consume/handshake.go, internal/event/consume/startup.go, internal/event/consume/sink.go, internal/event/consume/jq.go, internal/event/consume/shutdown.go
Consumer-side event loop, bus discovery/startup with forking, jq-based filtering, output sinking (stdout/directory), and graceful shutdown with PreShutdownCheck protocol.
Event Consumption Tests
internal/event/consume/*_test.go
Test coverage for parameter validation, max-events/timeout bounds checking, jq compilation/execution, listening prompts, output formatting, remote connection checking, and startup orchestration.
Event Schemas
internal/event/schemas/fromtype.go, internal/event/schemas/envelope.go, internal/event/schemas/overlay.go, internal/event/schemas/pointer.go
JSON schema generation from Go types via reflection, V2 envelope wrapping for native events, field-override application via JSON Pointer paths, and recursive schema traversal with cycle detection.
Schema Tests
internal/event/schemas/*_test.go
Coverage for type-to-schema reflection, cyclic type handling, field override resolution, wildcard pointer matching, and envelope wrapping.
Event Sources
internal/event/source/source.go, internal/event/source/feishu.go, internal/event/source/sdk_log_patterns.go
Pluggable event source abstraction with Feishu WebSocket SDK integration, event validation, payload extraction, and lifecycle status notifications via log pattern matching.
Event Source Tests
internal/event/source/*_test.go
Tests for SDK integration, malformed payload handling, connection lifecycle detection, and log pattern classification.
Event Registry & Types
internal/event/types.go, internal/event/registry.go, internal/event/dedup.go
Core event type definitions (RawEvent, KeyDefinition, ParamDef), global registry with validation and normalization, and TTL-backed deduplication filter with ring buffer overflow handling.
Registry & Dedup Tests
internal/event/registry_test.go, internal/event/dedup_test.go
Coverage for schema validation, parameter constraints, auth type validation, duplicate-event detection, TTL expiry, ring eviction, and concurrent dedup operations.
IM Event Definitions
events/im/register.go, events/im/message_receive.go, events/im/native.go
IM event key registration with im.message.receive_v1 custom processor and native event keys (reactions, chat membership, chat updates) with schema field overrides.
IM Event Tests
events/im/*_test.go, events/lint_test.go
Tests for event registration metadata, message payload processing (text/interactive), native event wiring, and field override validation across all keys.
Event Startup Infrastructure
internal/event/busctl/busctl.go, internal/event/busdiscover/busdiscover.go, internal/event/busdiscover/busdiscover_unix.go, internal/event/busdiscover/busdiscover_windows.go
Bus daemon process discovery via ps (Unix) or PowerShell (Windows), status querying, and shutdown signaling.
Discovery Tests
internal/event/busdiscover/*_test.go
Coverage for command-line parsing, process table scanning, platform-specific implementations, and error handling.
App Metadata
internal/appmeta/app_version.go
Feishu Open API wrapper for fetching published app versions with event types and tenant-scope requirements.
Metadata Tests
internal/appmeta/app_version_test.go
Tests for version selection, event type/scope extraction, and published status filtering.
Integration & Utilities
internal/event/integration_test.go, internal/event/testutil/testutil.go, internal/event/consume/remote_preflight.go
End-to-end bus/consumer wiring test, fake transport and stub API client for testing, and remote connection capacity checking.
Lockfile Updates
internal/lockfile/lockfile.go, internal/lockfile/lock_unix.go, internal/lockfile/lock_windows.go, internal/lockfile/lockfile_test.go
New ErrHeld sentinel error for benign lock contention with updated error handling in Unix/Windows variants.
Root & Shortcuts
cmd/root.go, shortcuts/common/runner.go, shortcuts/common/types.go, shortcuts/event/subscribe.go
Root command wiring for new event command, hidden shortcut field support, and deprecation of event +subscribe in favor of event consume.
Event Registry Initialization
events/register.go
Package initialization function that registers IM event keys into the global registry at startup.
Documentation
skills/lark-event/SKILL.md, skills/lark-event/references/lark-event-im.md
Updated skill documentation for event consumption workflow, removed legacy subscribe documentation, and added IM event reference covering EventKey listing and jq patterns.
Dependencies
go.mod
Added github.com/Microsoft/go-winio v0.6.2 for Windows named-pipe support.

Sequence Diagram(s)

sequenceDiagram
    participant User as User / AI
    participant CLI as lark-cli event<br/>consume
    participant Bus as Event Bus<br/>Daemon
    participant Source as Event Source<br/>(Feishu WebSocket)
    participant OpenAPI as Feishu Open<br/>APIs
    
    User->>CLI: event consume im.message.receive_v1 --param=...
    CLI->>CLI: Validate EventKey, parse params
    CLI->>CLI: Check local bus socket
    alt Bus not running
        CLI->>CLI: Fork bus daemon
        CLI->>Bus: (background daemon)
    end
    CLI->>Bus: Dial IPC socket
    CLI->>Bus: Send Hello (EventKey, EventTypes)
    Bus->>Bus: Register consumer
    Bus->>Source: Start WebSocket source<br/>(if first consumer)
    Source->>OpenAPI: Authenticate with AppID/AppSecret
    Source->>OpenAPI: WebSocket subscribe to events
    Bus->>CLI: Send HelloAck
    CLI->>OpenAPI: Preflight checks<br/>(app version, scopes)
    CLI->>CLI: Run PreConsume hook (if exists)
    CLI->>User: Print [event] ready marker to stderr
    Source->>Source: Receive events from Feishu
    Source->>Bus: Emit RawEvent
    Bus->>Bus: Deduplicate by EventID
    Bus->>Bus: Route to registered consumers
    Bus->>CLI: Send protocol.Event<br/>(with ProcessFunc output)
    CLI->>CLI: Apply jq filter (optional)
    CLI->>CLI: Write to stdout/directory
    User->>CLI: Signal (SIGTERM) or stdin EOF
    CLI->>Bus: Send PreShutdownCheck
    Bus->>Bus: Check if CLI is last consumer
    Bus->>CLI: Send PreShutdownAck
    CLI->>CLI: Run cleanup hook
    CLI->>User: Exit with status 0
    Bus->>Bus: If last consumer: idle countdown<br/>→ shutdown after 30s
Loading
sequenceDiagram
    participant Cmd as cmd/event<br/>consume
    participant Startup as consume.EnsureBus
    participant Probe as probeAndDialBus
    participant Transport as IPC Transport
    participant Fork as Bus Fork
    participant Bus as Event Bus<br/>Daemon
    
    Cmd->>Startup: EnsureBus(ctx, tr, appID, ...)
    Startup->>Probe: Try to probe existing bus
    Probe->>Transport: tr.Address(appID)
    Transport-->>Probe: Unix socket path
    Probe->>Transport: tr.Dial(socketPath)
    alt Socket exists & responsive
        Probe->>Bus: StatusQuery
        Bus-->>Probe: StatusResponse
        Probe-->>Startup: net.Conn (success)
    else No bus or unresponsive
        Probe-->>Startup: error
        Startup->>Startup: Check remote connections<br/>(if APIClient provided)
        Startup->>Fork: forkBus(appID, profileName, domain)
        Fork->>Fork: Acquire bus.fork.lock<br/>(per-appID)
        Fork->>Fork: Spawn child: lark-cli event _bus
        Fork-->>Startup: PID
        Startup->>Startup: Announce forked daemon to stderr
        Startup->>Startup: Retry dial loop with deadline
        Startup->>Probe: Probe again (up to deadline)
        Probe->>Transport: tr.Dial(socketPath)
        Bus-->>Probe: StatusResponse
        Probe-->>Startup: net.Conn (success)
    end
    Startup-->>Cmd: net.Conn
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

The PR introduces substantial new infrastructure spanning protocol design, concurrent data structures (hub, dedup filter, bus lifecycle), cross-platform IPC transport, event schema reflection and validation, complex CLI orchestration with preflight checks, and event consumption loops with backpressure handling. High heterogeneity across 150+ files, intricate domain logic (deduplication, jq filtering, PreShutdownCheck protocol semantics), and dense interactions between bus, protocol, transport, and CLI layers demand careful, multi-pass review of control flow, concurrency invariants, and error propagation.

Possibly related PRs

Suggested labels

size/XL

Suggested reviewers

  • liangshuo-1

Poem

🐰 The event bus hops to life with eager ears,
Protocol frames dance through the IPC sphere,
Consumers gather 'round to hear each tale,
While dedup's ring ensures no news will fail!
From Feishu streams to blessed stdout, we feast.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/event-consume
⚔️ Resolve merge conflicts
  • Resolve merge conflict in branch feat/event-consume

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 11

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/lockfile/lockfile.go (1)

67-75: ⚠️ Potential issue | 🔴 Critical

Both tryLockFile implementations unconditionally wrap all errors with ErrHeld, masking real failures.

The Unix and Windows implementations (lock_unix.go:17 and lock_windows.go:39) use fmt.Errorf("%w ...", ErrHeld, ...) for any syscall.Flock or LockFileEx failure, not just lock contention. This contradicts the ErrHeld documentation (lockfile.go:24–26), which states callers rely on errors.Is(err, ErrHeld) to distinguish benign contention from real failures like EACCES or missing parent directories. Real failures should surface unwrapped so startup code can handle them appropriately, not be masked as lock contention.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/lockfile/lockfile.go` around lines 67 - 75, The Unix and Windows
tryLockFile implementations currently wrap every error from
syscall.Flock/LockFileEx with ErrHeld, hiding real failures; update tryLockFile
(both implementations referenced as tryLockFile in lock_unix.go and
lock_windows.go) to inspect the underlying OS error and only return
fmt.Errorf("%w: ...", ErrHeld, ...) when the errno indicates contention (e.g.,
EWOULDBLOCK/EAGAIN on Unix or the specific lock-contention Windows error codes),
otherwise return the original error unwrapped so callers can detect real
failures (use errors.Is to check for ErrHeld where needed).
🟡 Minor comments (16)
internal/event/source/source_test.go-28-37 (1)

28-37: ⚠️ Potential issue | 🟡 Minor

Clean up the global registry after the test.

TestRegister leaves the registered source behind for any later package tests. Add cleanup so test order cannot leak registry state.

🧪 Proposed test isolation fix
 func TestRegister(t *testing.T) {
 	ResetForTest()
+	t.Cleanup(ResetForTest)
 
 	src := &mockSource{name: "test-source"}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/source/source_test.go` around lines 28 - 37, TestRegister
currently registers a mockSource and relies on ResetForTest only at start,
leaving global registry state for other tests; update the test to ensure
registry is cleaned up after the test by invoking ResetForTest in a teardown
(preferably via t.Cleanup) so any call to Register in TestRegister is
undone—refer to TestRegister, Register, ResetForTest, All and mockSource when
adding the cleanup.
internal/event/source/source_test.go-49-59 (1)

49-59: ⚠️ Potential issue | 🟡 Minor

Avoid sleep-based async assertions.

time.Sleep(50ms) makes the test scheduler-dependent. Wait for the expected channel receives with a timeout instead.

🧪 Proposed deterministic wait
 	received := make(chan *event.RawEvent, 10)
-	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
 	go src.Start(ctx, nil, func(e *event.RawEvent) {
 		received <- e
 	}, nil)
 
-	time.Sleep(50 * time.Millisecond)
-	if len(received) != 2 {
-		t.Errorf("expected 2 events, got %d", len(received))
+	for i := 0; i < 2; i++ {
+		select {
+		case <-received:
+		case <-time.After(100 * time.Millisecond):
+			t.Fatalf("expected 2 events, got %d", i)
+		}
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/source/source_test.go` around lines 49 - 59, Replace the
sleep-based assertion with a deterministic receive loop: instead of sleeping
after starting src.Start, repeatedly read from the received channel (the
make(chan *event.RawEvent, 10) named received) until you have collected the two
expected events (or until a test timeout triggers) using a select that includes
a time.After deadline, and fail the test if the timeout fires; keep the
src.Start invocation and use the channel receives to drive the assertion so the
test no longer depends on time.Sleep.
cmd/event/appmeta_err.go-32-35 (1)

32-35: ⚠️ Potential issue | 🟡 Minor

Truncate by rune to keep localized errors valid UTF-8.

Line 34 slices by byte, so Chinese or other multibyte error text can be cut mid-rune and produce malformed stderr for agents/users. Prefer rune-aware truncation.

🛠️ Proposed fix
 	const maxErrLen = 200
-	if len(msg) > maxErrLen {
-		return msg[:maxErrLen] + "…"
+	runes := []rune(msg)
+	if len(runes) > maxErrLen {
+		return string(runes[:maxErrLen]) + "…"
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/event/appmeta_err.go` around lines 32 - 35, The current truncation slices
the string by bytes (using msg[:maxErrLen]) which can split multibyte runes and
produce invalid UTF-8; change the truncation to be rune-aware by converting msg
to a rune slice (e.g. r := []rune(msg)), then if len(r) > maxErrLen return
string(r[:maxErrLen]) + "…" (using the same maxErrLen constant) so
localized/multibyte error text remains valid UTF-8; update the code around the
const maxErrLen and the truncation branch that references msg to use this
rune-based approach.
internal/event/bus/log.go-48-53 (1)

48-53: ⚠️ Potential issue | 🟡 Minor

Return a closer for bus.log and close it on daemon exit.

The file opened by SetupBusLogger is never explicitly closed. log.Logger does not own the file handle, so it remains open until process exit, preventing proper log sync during graceful shutdown. Return the closer alongside the logger and have the command handler defer it during cleanup.

♻️ Proposed signature change
-func SetupBusLogger(eventsDir string) (*log.Logger, error) {
+func SetupBusLogger(eventsDir string) (*log.Logger, func() error, error) {

At the call site in cmd/event/bus.go, defer the closer:

-logger, err := bus.SetupBusLogger(eventsDir)
-if err != nil {
-	return err
-}
+logger, closer, err := bus.SetupBusLogger(eventsDir)
+if err != nil {
+	return err
+}
+defer closer()

A test must verify that the closer is non-nil and can be invoked.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/bus/log.go` around lines 48 - 53, Modify SetupBusLogger to
return the opened file as an io.Closer alongside the *log.Logger and error
(e.g., change signature to return (*log.Logger, io.Closer, error)); keep opening
the file with vfs.OpenFile as f, return log.New(f, "", log.LstdFlags) and f as
the closer; update the caller in the command handler (e.g., where SetupBusLogger
is invoked in the bus command) to capture the closer and defer closer.Close()
during daemon shutdown; add a unit test that calls SetupBusLogger and asserts
the returned closer is non-nil and that calling Close on it succeeds.
internal/event/transport/transport_unix.go-46-48 (1)

46-48: ⚠️ Potential issue | 🟡 Minor

Add explicit path length validation to prevent AF_UNIX socket path overflow.

The socket path core.GetConfigDir()/events/<appID>/bus.sock can exceed the Unix-domain socket limit (~104 bytes on macOS, ~108 on Linux) when LARKSUITE_CLI_CONFIG_DIR is set to a very long path. While Bus.Run() does handle the Listen error, this failure mode is difficult to diagnose without explicit validation. Add a length check in Address() that validates the path before use, returning an error with a clear message if the path exceeds the platform limit, or derive a short socket path from a hash of the config directory and app ID.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/transport/transport_unix.go` around lines 46 - 48, The
Address() method in unixTransport must validate the full socket path length
built from core.GetConfigDir(), "events", sanitizeAppID(appID), and "bus.sock"
to avoid AF_UNIX path overflow; update unixTransport.Address to either return
(string, error) or otherwise ensure callers handle errors, compute the candidate
path, check its byte length against a platform limit constant (e.g. ~104/108),
and if it exceeds the limit either produce a deterministic short path (e.g. use
a hash of core.GetConfigDir()+appID and place the socket in a short base dir) or
return a descriptive error (used by Bus.Run()) explaining the path is too long
and suggesting using a shorter config dir; reference unixTransport.Address,
sanitizeAppID, core.GetConfigDir and ensure Bus.Run or its caller handles the
new error return.
internal/event/consume/bounded_test.go-67-72 (1)

67-72: ⚠️ Potential issue | 🟡 Minor

Wait for the timeout instead of sleeping.

This test can be made deterministic by blocking on ctx.Done(); sleeping past the deadline still depends on timer scheduling under load.

🧪 Proposed flake-resistant timeout wait
 	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
 	defer cancel()
-	time.Sleep(5 * time.Millisecond)
+	select {
+	case <-ctx.Done():
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("context did not time out")
+	}
 
 	opts := Options{MaxEvents: 5, Timeout: 1 * time.Millisecond}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/consume/bounded_test.go` around lines 67 - 72, The test
currently sleeps past the deadline which is flaky; replace the time.Sleep(5 *
time.Millisecond) with a deterministic wait on the context by blocking on
<-ctx.Done() so the goroutine waits exactly until the timeout/cancellation; keep
ctx, cancel and Options as-is and then call exitReason(ctx, 0, opts) to compute
the reason. Ensure you still defer cancel() and do not remove the context
timeout setup in bounded_test.go (use the existing ctx variable and
Options{MaxEvents: 5, Timeout: 1 * time.Millisecond}).
internal/event/transport/transport_test.go-88-89 (1)

88-89: ⚠️ Potential issue | 🟡 Minor

Check Listen before closing the listener.

Ignoring the error can turn a listen failure into a nil-pointer panic and hide the useful diagnostic.

🧪 Proposed error check
-	ln, _ := tr.Listen(addr)
-	ln.Close()
+	ln, err := tr.Listen(addr)
+	if err != nil {
+		t.Fatalf("listen: %v", err)
+	}
+	if err := ln.Close(); err != nil {
+		t.Fatalf("close listener: %v", err)
+	}
 	tr.Cleanup(addr)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/transport/transport_test.go` around lines 88 - 89, The test
calls tr.Listen(addr) but ignores its error and immediately calls ln.Close(),
risking a nil-pointer panic; update the test to capture and check the error
returned by tr.Listen (e.g., ln, err := tr.Listen(addr)), fail the test
(t.Fatalf or t.Fatalf-like helper) if err != nil, and only call ln.Close() when
ln is non-nil (or defer ln.Close() after the successful listen) so failures are
reported clearly instead of causing a panic.
internal/event/source/feishu_log_test.go-52-56 (1)

52-56: ⚠️ Potential issue | 🟡 Minor

Assert handler errors in these tests.

Several calls ignore the returned error, so a future regression could return an error without failing the test. Please check err consistently like the malformed JSON test does.

🧪 Proposed error checks
 	// Missing event_id
 	req := &larkevent.EventReq{Body: []byte(`{"header":{"event_type":"im.receive"}}`)}
-	handler(context.Background(), req)
+	if err := handler(context.Background(), req); err != nil {
+		t.Fatalf("handler returned err for missing event_id: %v", err)
+	}
 	// Missing event_type
 	req2 := &larkevent.EventReq{Body: []byte(`{"header":{"event_id":"abc"}}`)}
-	handler(context.Background(), req2)
+	if err := handler(context.Background(), req2); err != nil {
+		t.Fatalf("handler returned err for missing event_type: %v", err)
+	}
 	req := &larkevent.EventReq{Body: nil}
-	handler(context.Background(), req)
+	if err := handler(context.Background(), req); err != nil {
+		t.Fatalf("handler returned err: %v", err)
+	}
 	body := []byte(`{"header":{"event_id":"evt-42","event_type":"im.message.receive_v1","create_time":"1700000000000"}}`)
-	handler(context.Background(), &larkevent.EventReq{Body: body})
+	if err := handler(context.Background(), &larkevent.EventReq{Body: body}); err != nil {
+		t.Fatalf("handler returned err: %v", err)
+	}
 	s := &FeishuSource{Logger: nil} // no logger
 	handler := s.buildRawHandler(func(_ *event.RawEvent) {})
-	handler(context.Background(), &larkevent.EventReq{Body: []byte("bad json")})
+	if err := handler(context.Background(), &larkevent.EventReq{Body: []byte("bad json")}); err != nil {
+		t.Fatalf("handler returned err: %v", err)
+	}

Also applies to: 76-77, 95-96, 119-120

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/source/feishu_log_test.go` around lines 52 - 56, The tests
call handler(context.Background(), req) without checking its returned error;
update those calls (including the ones at the shown block and the other
occurrences around lines 76-77, 95-96, and 119-120) to capture the returned err
(e.g., err := handler(...)) and assert it using the same pattern as the
malformed JSON test (e.g., require.Error/require.NotNil or t.Fatalf/t.Errorf) so
any unexpected error will fail the test; target the handler symbol and each
corresponding req/req2 invocation when making the changes.
internal/event/transport/transport_test.go-35-50 (1)

35-50: ⚠️ Potential issue | 🟡 Minor

Propagate accept errors to avoid a hung test.

If Accept fails, the goroutine exits without sending and Line 49 blocks until the global test timeout. Send the accept result and use a bounded wait.

🧪 Proposed accept result handling
-	accepted := make(chan net.Conn, 1)
+	type acceptResult struct {
+		conn net.Conn
+		err  error
+	}
+	accepted := make(chan acceptResult, 1)
 	go func() {
 		conn, err := ln.Accept()
-		if err == nil {
-			accepted <- conn
-		}
+		accepted <- acceptResult{conn: conn, err: err}
 	}()
@@
-	serverConn := <-accepted
+	var res acceptResult
+	select {
+	case res = <-accepted:
+	case <-time.After(2 * time.Second):
+		t.Fatal("accept timed out")
+	}
+	if res.err != nil {
+		t.Fatalf("accept: %v", res.err)
+	}
+	serverConn := res.conn
 	defer serverConn.Close()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/transport/transport_test.go` around lines 35 - 50, The accept
goroutine can exit on error and leave the test blocked; change it to send the
accept outcome (conn and err) over a result channel (replace accepted chan
net.Conn with a chan struct{conn net.Conn; err error}) so the main test receives
both success and failure instead of blocking; after tr.Dial, wait on that result
with a bounded timeout (e.g., select with time.After) and fail the test if
accept returned an error or the timeout fires; update uses of accepted/
serverConn to handle the result struct accordingly (references: the accept
goroutine calling ln.Accept, the accepted channel, and the serverConn usage).
internal/event/schemas/pointer.go-37-62 (1)

37-62: ⚠️ Potential issue | 🟡 Minor

Decode JSON Pointer escape sequences per RFC 6901.

Override keys are described as RFC 6901 JSON Pointer paths, but path tokens are matched without decoding ~1/ and ~0~. Valid pointers for fields containing / or ~ will silently miss. Add a decode function and apply it during token resolution.

🐛 Proposed token decoding
 import "strings"
+
+func decodePointerToken(token string) string {
+	// RFC 6901 requires ~1 to be decoded before ~0.
+	return strings.NewReplacer("~1", "/", "~0", "~").Replace(token)
+}
@@
 	for _, part := range parts {
+		part = decodePointerToken(part)
 		next := []map[string]interface{}{}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/schemas/pointer.go` around lines 37 - 62, The pointer token
matching must decode RFC6901 escape sequences so tokens like "~1" and "~0" are
interpreted as "/" and "~"; add a helper decode function (e.g.,
decodeJSONPointerToken) implementing "~1"→"/" and "~0"→"~" and call it at the
start of the loop that iterates over parts (the for _, part := range parts loop)
before any comparisons (including the part == "*" check) or map lookups
(properties[part], items handling). Ensure you use the decoded token for all
subsequent operations (next item selection, props lookup, child extraction) so
fields containing '/' or '~' are matched correctly.
internal/event/consume/jq.go-43-56 (1)

43-56: ⚠️ Potential issue | 🟡 Minor

Reject multi-output jq expressions at compile time for clarity.

applyJQ is designed as an event filter where each event produces zero or one output (nil for filtered-out events, json.RawMessage for passing events). While gojq supports multi-output expressions like .items[], the function's single-return signature enforces single-output semantics. To prevent silent data loss when users mistakenly write such expressions, add validation in CompileJQ that rejects any expression that produces multiple outputs, with a clear error message guiding them to use a filter that returns a single value.

Suggested validation approach
 func CompileJQ(expr string) (*gojq.Code, error) {
 	query, err := gojq.Parse(expr)
 	if err != nil {
 		return nil, fmt.Errorf("invalid jq expression: %w", err)
 	}
 	code, err := gojq.Compile(query)
 	if err != nil {
 		return nil, fmt.Errorf("jq compile error: %w", err)
 	}
+	// Validate single-output semantics by test-running on empty input.
+	// If the expression produces multiple outputs (e.g. `.items[]`),
+	// reject it early rather than silently truncating at runtime.
+	iter := code.Run(map[string]interface{}{})
+	if _, ok := iter.Next(); ok {
+		if _, ok := iter.Next(); ok {
+			return nil, fmt.Errorf("jq expression produces multiple outputs; use a filter that returns a single value (e.g., wrap in [...])")
+		}
+	}
 	return code, nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/consume/jq.go` around lines 43 - 56, CompileJQ must reject
queries that can produce multiple outputs: after compiling the query (the value
returned by CompileJQ), run the compiled code with a placeholder input using
code.Run(nil) and inspect the iterator (iter.Next()); if a first value exists
and a second iter.Next() also returns ok==true then return a clear error from
CompileJQ stating the jq expression may produce multiple outputs and must return
a single value (this prevents applyJQ's single-return contract from silently
dropping results). Ensure the error message references the problematic
expression and that applyJQ continues to assume single-output behavior.
internal/event/busdiscover/busdiscover.go-40-61 (1)

40-61: ⚠️ Potential issue | 🟡 Minor

Regex should accept both --profile= and --profile flag forms.

appIDPattern requires \s+ between --profile and the value. The current buildForkArgs in internal/event/consume/startup.go always builds the argv as separate array elements (["--profile", profileName]), which produces the space-separated form in the cmdline. However, the regex defensively should accept both --profile=cli_x and --profile cli_x forms. If the fork invocation syntax ever changes or a developer manually invokes the hidden daemon with the = form, the process would pass the two substring gates but fail the regex match, silently breaking orphan detection for that process.

🛠️ Proposed fix
-var appIDPattern = regexp.MustCompile(`--profile\s+(cli_[a-zA-Z0-9_]+)`)
+var appIDPattern = regexp.MustCompile(`--profile[=\s]+(cli_[a-zA-Z0-9_]+)`)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/busdiscover/busdiscover.go` around lines 40 - 61, The
appIDPattern in this file only matches a space between the --profile flag and
the value, so parseAppIDFromCmdline can fail when the flag is passed as
--profile=cli_x; update the regex used by appIDPattern to accept either an
equals sign or whitespace between --profile and the AppID (e.g., use a
non-capturing group like (?:=|\s+) before the capture group for cli_...), then
keep parseAppIDFromCmdline as-is so it will correctly extract the AppID for both
"--profile cli_xyz" and "--profile=cli_xyz" forms.
internal/event/schemas/fromtype.go-163-185 (1)

163-185: ⚠️ Potential issue | 🟡 Minor

Anonymous-field flattening diverges from encoding/json when the embed has a JSON name tag.

encoding/json treats an anonymous struct field with an explicit JSON name tag (e.g. Inner Inner `json:"inner"` ) as a named field holding a nested object — not as a promotion/flatten. collectFields currently flattens all anonymous struct fields unconditionally, so the generated schema will have Inner's fields promoted at the parent level while the actual JSON wire form nests them under "inner". For SDK event structs this is the exact case where schemas silently lie to AI agents.

Additionally, the package doc at Lines 12-14 says "embedded anonymous structs… are skipped", which matches neither the flattening code here nor the tag-aware path encoding/json implements. Please align the doc with whichever behavior lands.

🧰 Suggested handling
-		// Anonymous embed: recurse into its fields (after unwrapping pointer).
-		// This must come before the exported check because reflect reports
-		// an anonymous field of a lowercase type as unexported, yet its
-		// exported fields still promote through encoding/json.
-		if f.Anonymous {
+		// Anonymous embed: encoding/json promotes the inner fields ONLY when
+		// there's no explicit json name tag; a tagged anonymous field is a
+		// normal named field carrying a nested object. Mirror that here so
+		// the schema matches the wire shape.
+		if f.Anonymous && parseJSONTag(f) == f.Name {
 			embedded := f.Type
 			for embedded.Kind() == reflect.Ptr {
 				embedded = embedded.Elem()
 			}
 			if embedded.Kind() == reflect.Struct {
 				collectFields(embedded, props, visiting, cache)
 			}
 			continue
 		}

Note: parseJSONTag returns f.Name when there's no json tag at all, which is the "untagged embed" signal we want to key on.

cmd/event/status.go-349-358 (1)

349-358: ⚠️ Potential issue | 🟡 Minor

--fail-on-orphan exits with no actionable error body; prefer output.ErrWithHint.

output.ErrBare(ExitValidation) sets the exit code but carries no message or hint. For scripted/--json consumers and AI agents, the failure comes through as exit 2 with nothing to act on — they'd have to re-parse the preceding JSON to find which AppID is orphaned and what to run. Since statuses already has the orphan PIDs in hand, it's cheap to surface the suggested action directly in the error.

As per coding guidelines in cmd/**/*.go: "Use output.Errorf or output.ErrWithHint in RunE functions instead of bare fmt.Errorf to ensure structured errors that AI agents can parse" and "Make error messages structured, actionable, and specific for AI agent parsing".

🧰 Suggested shape
 func exitForOrphan(statuses []appStatus, failOnOrphan bool) error {
 	if !failOnOrphan {
 		return nil
 	}
-	for _, s := range statuses {
-		if s.State == stateOrphan {
-			return output.ErrBare(output.ExitValidation)
-		}
-	}
-	return nil
+	var orphans []appStatus
+	for _, s := range statuses {
+		if s.State == stateOrphan {
+			orphans = append(orphans, s)
+		}
+	}
+	if len(orphans) == 0 {
+		return nil
+	}
+	ids := make([]string, 0, len(orphans))
+	kills := make([]string, 0, len(orphans))
+	for _, s := range orphans {
+		ids = append(ids, s.AppID)
+		kills = append(kills, fmt.Sprintf("kill %d", s.PID))
+	}
+	return output.ErrWithHint(
+		output.ExitValidation,
+		fmt.Sprintf("orphan bus detected for %d app(s): %s", len(orphans), strings.Join(ids, ", ")),
+		fmt.Sprintf("Run: %s", strings.Join(kills, " && ")),
+	)
 }

(Exact output.ErrWithHint signature may differ — adjust to match the helper used elsewhere in cmd/event/*.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/event/status.go` around lines 349 - 358, The exitForOrphan function
currently returns output.ErrBare when any appStatus has State == stateOrphan;
change this to return a structured error via output.ErrWithHint (or
output.Errorf if ErrWithHint not available) that includes the orphan AppID(s)
extracted from the statuses slice and a concrete next step hint (e.g., command
to reconcile or inspect the orphaned app), so callers of exitForOrphan receive a
parseable message and actionable hint; update the return path in exitForOrphan
to build a message listing the orphan IDs and pass the suggested action into
ErrWithHint/Errorf while leaving behavior when failOnOrphan is false unchanged.
internal/event/protocol/codec.go-62-85 (1)

62-85: ⚠️ Potential issue | 🟡 Minor

ReadFrame API has asymmetric lifetime guarantees — fix or document the contract.

The fast path (err == nil && len(buf) == 0) returns a slice directly from bufio.Reader's internal buffer, which is invalid after the next ReadFrame call. The slow path returns a fresh allocation via append. Verification confirms all five current call sites (handshake.go:27, conn.go:138, startup.go:128, busctl.go:42, bus.go:256) decode and discard the bytes immediately, so this is latent. However, the inconsistent contract invites misuse by future callers who might batch frames or pass bytes to concurrent code. Either explicitly document the "valid until next ReadFrame" guarantee in the doc-comment, or always return a copy for uniform semantics.

🛡️ Suggested uniform-copy fix
 	case nil:
-		if len(buf) == 0 {
-			// Fast path: whole frame fit in br's internal buffer.
-			return chunk, nil
-		}
-		if len(buf)+len(chunk) > MaxFrameBytes {
+		if len(buf)+len(chunk) > MaxFrameBytes {
 			return nil, fmt.Errorf("protocol: frame exceeds %d bytes", MaxFrameBytes)
 		}
-		return append(buf, chunk...), nil
+		// Always copy: ReadSlice's returned slice is only valid until the
+		// next read, but consumers (e.g. Decode stashing the bytes) may
+		// outlive that window.
+		out := make([]byte, len(buf)+len(chunk))
+		copy(out, buf)
+		copy(out[len(buf):], chunk)
+		return out, nil
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/protocol/codec.go` around lines 62 - 85, ReadFrame currently
returns a slice directly into bufio.Reader's internal buffer on the fast path
(err == nil && len(buf) == 0), which makes the returned bytes invalid after the
next ReadFrame call; change the fast path to return a copy instead so the API
uniformly hands callers an independent []byte (e.g. allocate a new slice and
copy chunk) and keep existing overflow checks using MaxFrameBytes; update
ReadFrame's implementation (function ReadFrame, symbol MaxFrameBytes) to always
return a freshly allocated slice to provide stable lifetime semantics for
callers.
internal/event/consume/consume.go-179-202 (1)

179-202: ⚠️ Potential issue | 🟡 Minor

validateParams panics when params is nil and a key defines defaults.

Line 182 writes into params unconditionally, but nil maps panic on assignment. A library caller (or future refactor) that passes nil for a key whose definition has any Default != "" param will crash here instead of getting a clean error. The cmd layer happens to pass an initialized map today, but this is a fragile contract for an exported helper.

🛡️ Proposed guard
 func validateParams(def *event.KeyDefinition, params map[string]string) error {
+	// Callers may legitimately pass nil when no --param flags were given.
+	// We only mutate on default-fill, so lazily allocate.
 	for _, p := range def.Params {
 		if _, ok := params[p.Name]; !ok && p.Default != "" {
+			if params == nil {
+				// Note: this only fills locally; caller's nil reference stays nil.
+				// If that's undesirable, require *map[string]string instead.
+				params = map[string]string{}
+			}
 			params[p.Name] = p.Default
 		}
 	}

Alternatively, document that params must be non-nil and validate that up front with a clear error.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/consume/consume.go` around lines 179 - 202, validateParams
currently writes into the params map and will panic if callers pass nil; add an
explicit nil guard at the top of validateParams (func validateParams(def
*event.KeyDefinition, params map[string]string) error) that returns a clear
error like "params cannot be nil" (or similar) so callers get a controlled error
instead of a panic; reference validateParams, params and event.KeyDefinition
when locating the change.
🧹 Nitpick comments (21)
.gitignore (1)

38-38: Add trailing newline at end of file.

The file is missing a trailing newline, which may cause Git warnings and violates POSIX convention for text files.

📝 Proposed fix
-app.log
+app.log
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.gitignore at line 38, Add a single trailing newline to the end of the
.gitignore file so the file ends with a newline character (e.g., after the
"app.log" entry); this satisfies POSIX text file conventions and removes Git
warnings about missing end-of-file newline.
internal/event/consume/sink_test.go (1)

108-137: LGTM; minor: also assert ns/seq segments.

Good coverage. Consider tightening TestDirSink_FilenameFormat to also verify that parts[0] (ns) and parts[2] (seq, after stripping .json) are numeric, so a refactor that swaps the segment order or stringifies a non-numeric ns still trips the test.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/consume/sink_test.go` around lines 108 - 137,
TestDirSink_FilenameFormat currently only asserts the PID segment; update the
test to also validate that parts[0] (ns) and parts[2] (seq) are numeric: after
splitting the trimmed filename in TestDirSink_FilenameFormat, check that
parts[0] parses as an integer (e.g., via strconv.Atoi) and that parts[2] parses
as an integer (ensure you strip ".json" already done), and fail the test with a
clear message if either parse returns an error so swaps or non-numeric segments
are caught.
internal/event/consume/loop_seq_test.go (1)

16-38: Drift risk: test harness duplicates production logic instead of exercising it.

seqGapDetector is a hand-written copy of the inline gap-detection block inside consumeLoop's reader goroutine. The comment acknowledges this ("If this logic changes in loop.go, update here too"), but that means these tests can stay green while the real code regresses (or vice-versa). Prefer extracting the production logic into a small package-private type/function (e.g. seqGapDetector in loop.go) and calling it from both the reader goroutine and these tests — then remove the duplicate. This is the exact scenario the regression test at lines 78-87 is trying to prevent, so wiring it to the real code is especially valuable here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/consume/loop_seq_test.go` around lines 16 - 38, The test
duplicates the gap-detection logic; extract that logic into a single
package-private seqGapDetector type (or observeSeqGap function) in loop.go and
have the consumeLoop reader goroutine call that implementation, then update
loop_seq_test.go to construct/use that package-private
seqGapDetector/observeSeqGap (instead of its local copy) so tests exercise the
real production logic (keep the same fields/method names: seqGapDetector,
observe, lastSeq, quiet, errOut and ensure consumeLoop uses the new shared
implementation).
shortcuts/event/subscribe.go (1)

88-92: LGTM — deprecation path is appropriate.

Hiding the superseded event +subscribe while keeping it executable is a clean migration approach. Consider also adding a DEPRECATED line to Description (surfaces in any tooling that lists shortcuts directly) and/or a one-line stderr notice in Execute pointing users at event consume.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@shortcuts/event/subscribe.go` around lines 88 - 92, Add a clear deprecation
notice to the shortcut so users and tooling see it: update the Description field
in subscribe.go (the shortcut for "event +subscribe") to prepend or append a
"DEPRECATED: use `event consume`" line, and in the Execute method emit a
single-line stderr notice (e.g., via fmt.Fprintln(os.Stderr, ...)) that informs
users the command is deprecated and points them to "event consume"; keep the
existing Hidden: true behavior and ensure the message references the shortcut
name ("event +subscribe") and the replacement ("event consume").
cmd/event/table.go (1)

15-44: Non-ASCII event descriptions will misalign columns.

Event descriptions like "接收 IM 消息…" in the registry use non-ASCII characters. The current tableWidths and printTableRow use byte-length calculations (len()), which measure CJK characters as 3 bytes but display as 2 columns. This causes padding to be computed wrong and columns to misalign.

Since github.com/mattn/go-runewidth is already a dependency (go.mod line 51), use runewidth.StringWidth() for both width calculation and adjust printTableRow to compute padding manually since Go's fmt.Fprintf("%-*s", ...) always pads by byte count, not display width:

Switch to runewidth
 import (
 	"fmt"
 	"io"
+	"strings"
+
+	"github.com/mattn/go-runewidth"
 )
@@
-	for i, h := range headers {
-		widths[i] = len(h)
-	}
+	for i, h := range headers {
+		widths[i] = runewidth.StringWidth(h)
+	}
@@
-			if l := len(cell); l > widths[i] {
-				widths[i] = l
-			}
+			if l := runewidth.StringWidth(cell); l > widths[i] {
+				widths[i] = l
+			}
@@
-		fmt.Fprintf(out, "%-*s%s", widths[i], cell, gap)
+		pad := widths[i] - runewidth.StringWidth(cell)
+		if pad < 0 {
+			pad = 0
+		}
+		fmt.Fprint(out, cell, strings.Repeat(" ", pad), gap)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/event/table.go` around lines 15 - 44, tableWidths currently uses len() so
CJK/multibyte text miscomputes column widths; replace len(h) and len(cell) with
runewidth.StringWidth(...) to compute display widths in tableWidths. In
printTableRow stop using fmt.Fprintf with "%-*s" (which pads by bytes) and
instead for each non-final cell compute pad := widths[i] -
runewidth.StringWidth(cell) then write cell followed by strings.Repeat(" ",
max(0,pad)) and gap to out; keep the final-cell behavior (fmt.Fprintln(out,
cell)). Add necessary imports (github.com/mattn/go-runewidth and strings) and
ensure padding never goes negative.
internal/event/protocol/codec_test.go (2)

62-69: Check the Encode error and guard against empty buffer.

Encode's returned error is silently dropped; if encoding ever fails the next line indexes buf.Bytes()[buf.Len()-1] which would panic with an unhelpful message rather than a clear test failure.

Proposed tweak
 func TestEncodeAddsNewline(t *testing.T) {
 	msg := &Bye{Type: MsgTypeBye}
 	var buf bytes.Buffer
-	Encode(&buf, msg)
-	if buf.Bytes()[buf.Len()-1] != '\n' {
+	if err := Encode(&buf, msg); err != nil {
+		t.Fatalf("encode: %v", err)
+	}
+	if buf.Len() == 0 || buf.Bytes()[buf.Len()-1] != '\n' {
 		t.Error("encoded message should end with newline")
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/protocol/codec_test.go` around lines 62 - 69, The test
TestEncodeAddsNewline currently discards Encode's error and directly indexes
buf, which can panic on encoding failure or empty output; update the test to
check the error returned by Encode(msg) and call t.Fatalf/t.Fatalf-like failure
if non-nil, then guard the buffer access by asserting buf.Len() > 0 (and failing
the test if it's zero) before inspecting buf.Bytes()[buf.Len()-1]; reference
Encode, msg/Bye and buf in the changes.

9-34: Hello test uses an out-of-scope mail.* event key.

This PR is scoped to IM events per the objectives; using mail.user_mailbox.event.message_received_v1 here is harmless for the codec (it's treated as an opaque string) but misleading to readers. Consider aligning with an IM key (e.g. im.message.receive_v1) for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/protocol/codec_test.go` around lines 9 - 34, The test
TestEncodeDecodeHello uses a mail event key which is out of scope for this PR;
update the Hello test instance (in TestEncodeDecodeHello) to use an IM-scoped
event key and types (e.g. replace "mail.user_mailbox.event.message_received_v1"
in the Hello.EventKey and Hello.EventTypes with an IM key such as
"im.message.receive_v1") so the test data aligns with the PR objectives while
leaving Encode, Decode and Hello unchanged.
internal/event/schemas/pointer_test.go (1)

20-111: Add coverage for escaped JSON Pointer tokens.

These tests cover traversal shape well, but FieldOverrides are described as JSON Pointer keys; add a regression for ~1 and ~0 so slash/tilde field names don’t silently break.
As per coding guidelines, **/*.go: Every behavior change must have an accompanying test.

🧪 Suggested additional test
+func TestResolvePointer_EscapedTokens(t *testing.T) {
+	schema := parseSchema(t, `{
+        "type":"object",
+        "properties":{
+            "a/b":{"type":"string"},
+            "tilde~key":{"type":"number"}
+        }
+    }`)
+
+	if nodes := ResolvePointer(schema, "/a~1b"); len(nodes) != 1 || nodes[0]["type"] != "string" {
+		t.Fatalf("want escaped slash property, got %v", nodes)
+	}
+	if nodes := ResolvePointer(schema, "/tilde~0key"); len(nodes) != 1 || nodes[0]["type"] != "number" {
+		t.Fatalf("want escaped tilde property, got %v", nodes)
+	}
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/schemas/pointer_test.go` around lines 20 - 111, Add a unit
test to pointer_test.go to cover JSON Pointer escaped tokens (~1 for '/' and ~0
for '~') so ResolvePointer correctly decodes and resolves field names containing
slashes and tildes; locate tests around TestResolvePointer_* and add a new test
(e.g., TestResolvePointer_EscapedTokens) that builds a schema with property
names containing "/" and "~" (encoded in the pointer as "~1" and "~0"), calls
ResolvePointer with the escaped pointer, and asserts it returns the expected
node(s) (checking length and type/value) to prevent regressions in pointer
decoding.
cmd/event/bus.go (1)

35-35: Return structured RunE errors for agent parsing.

The hidden bus is part of the subprocess contract, so config, logger, and runtime failures should be wrapped with output.Errorf or output.ErrWithHint and include stable context/action hints instead of returning raw errors.
As per coding guidelines, cmd/**/*.go: Use output.Errorf or output.ErrWithHint in RunE functions instead of bare fmt.Errorf to ensure structured errors that AI agents can parse.

Also applies to: 42-42, 62-62

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/event/bus.go` at line 35, Replace bare error returns in the RunE handlers
for the hidden bus with structured output errors: instead of returning err (and
similar returns at the other occurrences), wrap the error using output.Errorf or
output.ErrWithHint and include a stable context/action hint (e.g., "starting
hidden bus", "loading bus config", "initializing runtime") along with the
original err to preserve details; update the return sites that currently do
"return err" (including the occurrences around lines shown) to return
output.Errorf("hidden bus: %s: %v", "<action hint>", err) or use
output.ErrWithHint(err, "<action hint>", "<suggested fix>") so AI agents can
parse structured errors.
cmd/event/consume_test.go (1)

112-120: Add traversal bypass cases for output-dir sanitization.

The unsafe-path coverage should include bare and nested .. forms so a prefix-only rejection can’t regress path safety. As per coding guidelines, “Every behavior change must have an accompanying test”.

🧪 Proposed additional cases
 		{
 			name:       "parent escape rejected",
 			in:         "../outside",
 			wantSentry: errOutputDirUnsafe,
 		},
+		{
+			name:       "bare parent rejected",
+			in:         "..",
+			wantSentry: errOutputDirUnsafe,
+		},
+		{
+			name:       "nested parent escape rejected",
+			in:         "events/../../outside",
+			wantSentry: errOutputDirUnsafe,
+		},
 		{
 			name:       "absolute path rejected",
 			in:         "/tmp/events",
 			wantSentry: errOutputDirUnsafe,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/event/consume_test.go` around lines 112 - 120, Add two table-driven test
cases to the existing slice in cmd/event/consume_test.go (the same table that
contains cases with fields name, in, wantSentry) to cover traversal bypass: one
with name "bare parent rejected" and in set to "..", and one with name "nested
parent rejected" and in set to "sub/dir/../.."(or similar nested ../ form); both
should expect wantSentry to be errOutputDirUnsafe so the output-dir sanitization
rejects bare and nested parent traversals and prevents prefix-only matches.
cmd/event/runtime.go (1)

34-43: Empty Content-Type on 4xx/5xx falls through to JSON parsing.

When a gateway returns a non-JSON error response with no Content-Type header (not uncommon for minimal reverse-proxy errors), the ct != "" guard sends it down ParseJSONResponse, producing a less actionable "decode" error instead of the nice api METHOD PATH returned N: <body> message. Consider treating empty Content-Type on HTTP error the same as non-JSON.

Also, body locally shadows the body interface{} parameter — a rename (e.g., snippet) would improve readability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/event/runtime.go` around lines 34 - 43, The error branch should treat
empty Content-Type as non-JSON and avoid shadowing the function's body
parameter: change the guard to if resp.StatusCode >= 400 &&
!client.IsJSONContentType(ct) (remove the ct != "" check) so responses with no
Content-Type hit the friendly error path, and rename the local variable
currently declared as body (which shadows the body interface{} parameter) to
snippet (or similar) when creating the preview from resp.RawBody before
returning fmt.Errorf("api %s %s returned %d: %s", method, path, resp.StatusCode,
snippet).
internal/event/consume/remote_preflight.go (1)

43-61: Control-char handling doesn't match the comment.

The comment says "Collapse control chars to spaces" but only \n, \r, \t are replaced — other C0 control bytes (0x00-0x1F, 0x7F) still pass through, and so would a stray \r\n ambiguity in log forging. Either broaden the check or soften the comment.

Proposed tightening
-	for _, r := range s {
-		if r == '\n' || r == '\r' || r == '\t' {
-			out = append(out, ' ')
-			continue
-		}
-		out = append(out, string(r)...)
-	}
+	for _, r := range s {
+		if r < 0x20 || r == 0x7f {
+			out = append(out, ' ')
+			continue
+		}
+		out = utf8.AppendRune(out, r)
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/consume/remote_preflight.go` around lines 43 - 61,
truncateForError currently only collapses '\n', '\r', '\t' but lets other C0
control bytes through; change the control-char handling in truncateForError to
map all control characters to a single space (e.g. use unicode.IsControl(r) or
check r < 0x20 || r == 0x7F) instead of only those three, and when emitting
non-control runes append their UTF-8 bytes correctly (avoid append(out,
string(r)...) inefficiency) so the output cannot contain stray control bytes
that could forge logs.
cmd/event/suggestions.go (1)

32-48: Use a stable sort so suggestions are deterministic across runs.

Every substring match gets dist == 0, so when more than maxSuggestions keys substring-match the input (e.g. im.message against the full im.message.* set), sort.Slice may return a different 3 out of N on each invocation. That makes "did you mean ..." output flap and, in principle, could produce flaky behavior in tests that do stricter ordering checks than the ones in this PR.

♻️ Proposed fix
-	sort.Slice(hits, func(i, j int) bool { return hits[i].dist < hits[j].dist })
+	sort.SliceStable(hits, func(i, j int) bool { return hits[i].dist < hits[j].dist })
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/event/suggestions.go` around lines 32 - 48, Sort currently uses
sort.Slice causing nondeterministic order when distances tie (many substring
matches with dist==0); change to a stable, deterministic sort by replacing the
comparator with a stable ordering on (dist, key) — e.g. use sort.SliceStable (or
sort.Slice with a comparator that first compares hits[i].dist vs hits[j].dist
and, if equal, compares hits[i].key vs hits[j].key) for the hits []match slice
so selecting the top n (maxSuggestions) is deterministic; update the sort call
in suggestions.go around the hits sort to use that stable tie-breaker.
internal/event/dedup.go (1)

42-83: Dual-layer dedup algorithm is correct; one minor efficiency quirk worth a one-liner comment.

Traced the TTL-map + ring-buffer logic against the tests in dedup_test.go; the invariants hold. One minor observation that doesn't affect correctness: when seen[eventID] exists but is TTL-expired, you delete it and fall through to the re-record path without scanning the ring. At line 71, if d.ring[d.pos] happens to equal eventID (same event happens to sit at the current rotating slot), line 72's delete(d.seen, old) will undo the seen[eventID] = now write from line 68. Subsequent IsDuplicate(eventID) still returns true via the ring-scan fallback, so it's not a correctness bug — just an edge case where the next lookup pays the O(ring) fallback cost instead of the O(1) map hit. Likelihood is ~1/ringSize per expired re-insert, so with default ringSize=10000 this is negligible. Safe to defer, but a short // Note: comment here would save a future reader the trace.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/dedup.go` around lines 42 - 83, In DedupFilter.IsDuplicate,
add a one-line comment near the eviction block (the code that does "if old :=
d.ring[d.pos]; old != "" { delete(d.seen, old) }") explaining the edge-case
where an event whose TTL just expired can be re-recorded into seen but then
immediately evicted if d.ring[d.pos] == eventID, causing the next lookup to hit
the ring fallback instead of the O(1) map lookup; reference the
DedupFilter.IsDuplicate method and the seen, ring and pos fields so future
readers understand this benign efficiency quirk.
internal/event/integration_test.go (1)

253-263: Silently dropping protocol.Encode/Decode errors hides root cause when a test fails.

In connectConsumer (and similarly at Lines 352/374) protocol.Encode(conn, hello) and protocol.Decode(sc.Bytes()) return values are discarded. If the Hello write fails or the HelloAck is malformed, the test will fail later at sc.Scan() / the type assertion with a symptom (no hello_ack / expected HelloAck) that points away from the actual cause. A quick if err != nil { t.Fatalf(...) } here will make future regressions diagnose themselves. Same pattern applies to Line 352 in TestIntegration_DedupFilter.

🧰 Proposed fix
-		protocol.Encode(conn, hello)
+		if err := protocol.Encode(conn, hello); err != nil {
+			t.Fatalf("%s: encode hello: %v", name, err)
+		}
 		sc := bufio.NewScanner(conn)
 		conn.SetReadDeadline(time.Now().Add(3 * time.Second))
 		if !sc.Scan() {
 			t.Fatalf("%s: no hello_ack", name)
 		}
-		msg, _ := protocol.Decode(sc.Bytes())
+		msg, err := protocol.Decode(sc.Bytes())
+		if err != nil {
+			t.Fatalf("%s: decode hello_ack: %v", name, err)
+		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/integration_test.go` around lines 253 - 263, In
connectConsumer, capture and handle errors returned by protocol.Encode(conn,
hello) and protocol.Decode(sc.Bytes()) instead of discarding them: check each
error and call t.Fatalf with a descriptive message and the error (e.g.,
"protocol.Encode hello failed: %v" / "protocol.Decode hello_ack failed: %v");
apply the same pattern in the other test locations mentioned
(TestIntegration_DedupFilter) so any write/parse failures are reported
immediately rather than surfacing later as "no hello_ack" or wrong type errors.
internal/appmeta/app_version.go (1)

52-60: Defensive: url.PathEscape(appID) before interpolation.

Feishu AppIDs are tightly constrained (cli_[a-z0-9]+), so this isn't an active bug, but the path is built by string formatting without escaping — a future caller who feeds this from less-trusted input (copy-pasted values, legacy configs) could silently produce a malformed URL or unintended query-string injection (appID containing ? or /). url.PathEscape is cheap insurance and keeps the failure mode as "404 from the API" rather than "unexpected query reshape".

🧰 Suggested tweak
-	path := fmt.Sprintf(
-		"/open-apis/application/v6/applications/%s/app_versions?lang=zh_cn&page_size=2",
-		appID,
-	)
+	path := fmt.Sprintf(
+		"/open-apis/application/v6/applications/%s/app_versions?lang=zh_cn&page_size=2",
+		url.PathEscape(appID),
+	)

(Requires adding "net/url" to the import block.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/appmeta/app_version.go` around lines 52 - 60, The path is built by
string formatting in FetchCurrentPublished without escaping appID, which can
allow malformed URLs or query injection; update FetchCurrentPublished to call
url.PathEscape(appID) when interpolating appID into the path (and add "net/url"
to the imports) so the constructed path uses the escaped value and preserves
safe behavior for untrusted or malformed appID inputs.
cmd/event/schema.go (1)

175-178: Use output.Errorf in RunE for structured AI-parsable errors.

runSchema is invoked from a Cobra RunE, so a bare fmt.Errorf here bypasses the structured error envelope AI agents rely on. Path is only reachable on an internal marshal/unmarshal failure of schemas we just produced, so it's very unlikely to fire — still, easy to align with the rest of the command.

♻️ Proposed tweak
-	resolved, _, err := resolveSchemaJSON(def)
-	if err != nil {
-		return fmt.Errorf("resolve schema: %w", err)
-	}
+	resolved, _, err := resolveSchemaJSON(def)
+	if err != nil {
+		return output.Errorf(output.ExitInternal, "internal", "resolve schema: %s", err)
+	}

As per coding guidelines: "Use output.Errorf or output.ErrWithHint in RunE functions instead of bare fmt.Errorf to ensure structured errors that AI agents can parse".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/event/schema.go` around lines 175 - 178, Replace the bare fmt.Errorf
returned from resolveSchemaJSON in runSchema with the structured output.Errorf
(or output.ErrWithHint if you can provide a hint) so the Cobra RunE returns an
AI-parsable error envelope; specifically, in the runSchema function where you
call resolved, _, err := resolveSchemaJSON(def) check err and call
output.Errorf("resolve schema: %v", err) (or output.ErrWithHint(..., hint))
instead of return fmt.Errorf(...) to align with RunE error handling conventions.
internal/event/schemas/fromtype_test.go (1)

246-257: Nit: %q applied to a pointer-to-struct value.

At lines 247 and 256 the t.Errorf format string uses %q but passes recipients.Items / states.Items, which are *struct{...}. %q expects a string-like, so on failure the test output will look like %!q(*struct...=0x...) rather than a useful value. Consider formatting the fields explicitly.

♻️ Proposed tweak
 	if recipients.Items == nil || recipients.Items.Format != "email" {
-		t.Errorf("recipients.items.format = %q, want email", recipients.Items)
+		t.Errorf("recipients.items = %+v, want Items.Format=email", recipients.Items)
 	}
@@
 	if states.Items == nil || len(states.Items.Enum) != 3 {
-		t.Errorf("states.items.enum = %v, want 3 values", states.Items)
+		t.Errorf("states.items = %+v, want 3 enum values", states.Items)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/schemas/fromtype_test.go` around lines 246 - 257, The test is
using %q with pointer values (recipients.Items and states.Items) which prints
ugly pointer details; change the t.Errorf calls to format the actual fields
instead — for the recipients check use recipients.Items.Format (e.g. %q,
recipients.Items.Format) and for the states check log the items' enum length or
values (e.g. len(states.Items.Enum) or %#v, states.Items.Enum) so the failure
message shows meaningful data rather than a pointer.
internal/event/types.go (1)

83-128: Use the ProcessFunc alias inside KeyDefinition to avoid drift.

ProcessFunc is declared at Line 84, but KeyDefinition.Process at Line 125 re-spells the same signature inline. If the hook signature ever evolves (e.g., adding an error-wrapping parameter), the two definitions must change in lockstep and the compiler won't help you notice. Reusing the alias removes the duplication and makes ProcessFunc the single source of truth.

♻️ Proposed tweak
 	// Process is the business logic function. Required when Schema.Custom
 	// carries Processed output; must be nil when Schema.Native is used.
-	Process func(ctx context.Context, rt APIClient, raw *RawEvent, params map[string]string) (json.RawMessage, error) `json:"-"`
+	Process ProcessFunc `json:"-"`
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/types.go` around lines 83 - 128, The KeyDefinition struct
duplicates the ProcessFunc signature inline; update KeyDefinition.Process to use
the ProcessFunc alias instead of re-declaring the func type so the signature is
centralized — change the field type of KeyDefinition.Process from the inline
func(ctx context.Context, rt APIClient, raw *RawEvent, params map[string]string)
(json.RawMessage, error) to ProcessFunc (keeping the `json:"-"` tag intact) so
future signature changes only need to be made on ProcessFunc.
internal/event/consume/startup.go (1)

92-98: Retry loop ignores context cancellation.

EnsureBus accepts ctx but the post-fork dial retry at Lines 92–98 only checks wall-clock deadline. If the caller cancels ctx (Ctrl-C, parent deadline, timeout wrapper in Run), this loop keeps sleeping/dialing for up to dialTimeout. Returning on ctx.Err() here makes the shutdown path responsive and matches how the rest of the consume pipeline treats ctx as authoritative.

♻️ Proposed change
 	deadline := time.Now().Add(dialTimeout)
 	for time.Now().Before(deadline) {
-		time.Sleep(dialRetryInterval)
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-time.After(dialRetryInterval):
+		}
 		if conn, err := tr.Dial(addr); err == nil {
 			return conn, nil
 		}
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/consume/startup.go` around lines 92 - 98, The retry loop in
EnsureBus ignores the supplied ctx and only checks a wall-clock deadline; update
the loop around tr.Dial(addr) to observe context cancellation: replace the
time.Sleep+dialRetryInterval with a select that waits on either ctx.Done() or
time.After(dialRetryInterval), and if ctx.Err() != nil return ctx.Err(); keep
the existing deadline logic (dialTimeout) intact and still return conn,nil on
successful tr.Dial(addr). Ensure the same ctx cancellation check is applied
before each dial attempt so the function returns promptly when ctx is canceled.
internal/event/testutil/testutil.go (1)

117-128: ErrStubUnconfigured is defined but never returned.

CallAPI falls back to json.RawMessage("{}") whenever both Body and Err are unset, so the exported ErrStubUnconfigured sentinel (documented as "returned as a sentinel so tests can assert via errors.Is") is never actually produced by this stub. Either wire it into the Body == "" && Err == nil path, or drop the sentinel so tests don't rely on dead API surface.

♻️ Suggested wiring
 	if s.Err != nil {
 		return nil, s.Err
 	}
 	if s.Body == "" {
-		return json.RawMessage("{}"), nil
+		return nil, ErrStubUnconfigured
 	}
 	return json.RawMessage(s.Body), nil

(Or keep the {} default and remove ErrStubUnconfigured.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/testutil/testutil.go` around lines 117 - 128, The exported
sentinel ErrStubUnconfigured is never returned by CallAPI because the function
currently returns json.RawMessage("{}") when s.Body == "" and s.Err == nil;
update CallAPI (in the StubAPIClient/CallAPI code) to return nil,
ErrStubUnconfigured when s.Body == "" && s.Err == nil (or alternatively remove
ErrStubUnconfigured and its documentation/tests if you prefer the {} default),
and adjust any tests that expect the sentinel accordingly so the symbol is
either actually produced or removed from the API surface.

Comment thread cmd/event/bus.go
return err
}

eventsDir := filepath.Join(core.GetConfigDir(), "events", cfg.AppID)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Sanitize cfg.AppID before using it as a path component.

cfg.AppID is joined into an app-managed filesystem path before logger setup. If the config is corrupted or manually edited to include separators like ../, the bus log directory can escape {configDir}/events; reuse a shared AppID/path-component sanitizer before filepath.Join.
Based on learnings, app-managed paths do not need redundant SafeInputPath only when the root directory is validated and filename/path components are sanitized.

🛡️ Proposed shape
-			eventsDir := filepath.Join(core.GetConfigDir(), "events", cfg.AppID)
+			safeAppID := sanitizeEventPathComponent(cfg.AppID)
+			eventsDir := filepath.Join(core.GetConfigDir(), "events", safeAppID)

Consider sharing the same sanitization policy used for lockfile AppID-derived names so bus logs and locks cannot diverge on edge-case IDs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/event/bus.go` at line 38, Sanitize cfg.AppID before using it in
filepath.Join to prevent path traversal: call the shared AppID/path-component
sanitizer used for lockfile names (e.g., the existing SafeInputPath or the
project’s AppID sanitizer) on cfg.AppID, then use the sanitized value in
filepath.Join(core.GetConfigDir(), "events", sanitizedAppID) so bus log
directories cannot escape the config/events root; update the code that
constructs eventsDir (referencing cfg.AppID, filepath.Join, core.GetConfigDir())
to use the sanitizedAppID.

Comment thread cmd/event/stop.go
Comment on lines +256 to +274
func discoverAppIDs() []string {
eventsDir := filepath.Join(core.GetConfigDir(), "events")
entries, err := os.ReadDir(eventsDir)
if err != nil {
return nil
}
var ids []string
for _, e := range entries {
if !e.IsDir() {
continue
}
sockPath := filepath.Join(eventsDir, e.Name(), "bus.sock")
if _, statErr := os.Stat(sockPath); statErr != nil {
continue
}
ids = append(ids, e.Name())
}
return ids
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Use vfs.* for filesystem access in discoverAppIDs.

os.ReadDir and os.Stat bypass the project's mockable filesystem abstraction, making --all behavior (which appIDs get discovered) unmockable in unit tests. Switch to vfs.ReadDir / vfs.Stat.

♻️ Proposed fix
 	eventsDir := filepath.Join(core.GetConfigDir(), "events")
-	entries, err := os.ReadDir(eventsDir)
+	entries, err := vfs.ReadDir(eventsDir)
 	if err != nil {
 		return nil
 	}
 	var ids []string
 	for _, e := range entries {
 		if !e.IsDir() {
 			continue
 		}
 		sockPath := filepath.Join(eventsDir, e.Name(), "bus.sock")
-		if _, statErr := os.Stat(sockPath); statErr != nil {
+		if _, statErr := vfs.Stat(sockPath); statErr != nil {
 			continue
 		}
 		ids = append(ids, e.Name())
 	}
 	return ids

Add "github.com/larksuite/cli/internal/vfs" to the imports.

As per coding guidelines: "Use vfs.* functions instead of os.* for all filesystem access to enable test mocking".

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func discoverAppIDs() []string {
eventsDir := filepath.Join(core.GetConfigDir(), "events")
entries, err := os.ReadDir(eventsDir)
if err != nil {
return nil
}
var ids []string
for _, e := range entries {
if !e.IsDir() {
continue
}
sockPath := filepath.Join(eventsDir, e.Name(), "bus.sock")
if _, statErr := os.Stat(sockPath); statErr != nil {
continue
}
ids = append(ids, e.Name())
}
return ids
}
func discoverAppIDs() []string {
eventsDir := filepath.Join(core.GetConfigDir(), "events")
entries, err := vfs.ReadDir(eventsDir)
if err != nil {
return nil
}
var ids []string
for _, e := range entries {
if !e.IsDir() {
continue
}
sockPath := filepath.Join(eventsDir, e.Name(), "bus.sock")
if _, statErr := vfs.Stat(sockPath); statErr != nil {
continue
}
ids = append(ids, e.Name())
}
return ids
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/event/stop.go` around lines 256 - 274, discoverAppIDs currently uses
os.ReadDir and os.Stat which bypass the mockable filesystem; replace those calls
with vfs.ReadDir and vfs.Stat and add the import
"github.com/larksuite/cli/internal/vfs" so the function uses the project's vfs
abstraction (keep the same logic and return values, only swap ReadDir/Stat to
vfs.ReadDir/vfs.Stat in the discoverAppIDs function).

Comment thread events/im/register.go
Comment on lines +20 to +32
out := []event.KeyDefinition{
{
Key: "im.message.receive_v1",
DisplayName: "Receive message",
Description: "接收 IM 消息(text/post/image/file/audio/media/sticker/interactive/share_chat/share_user/system 所有类型)",
EventType: "im.message.receive_v1",
Schema: event.SchemaDef{
Custom: &event.SchemaSpec{Type: reflect.TypeOf(ImMessageReceiveOutput{})},
},
Process: processImMessageReceive,
AuthTypes: []string{"bot"},
RequiredConsoleEvents: []string{"im.message.receive_v1"},
},
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify how key scopes are consumed and whether im.message.receive_v1 remains scope-less.
# Expected: preflight/status logic should depend on KeyDefinition.Scopes, and this key should have required scopes.
rg -n -C4 'im\.message\.receive_v1|RequiredConsoleEvents|\.Scopes\b|TenantScopes' events cmd internal

Repository: larksuite/cli

Length of output: 48893


Add required scopes to im.message.receive_v1 key definition.

The processed key omits Scopes, which causes scope preflight validation to be skipped (see cmd/event/consume.go line 273: the check returns early when Scopes is empty). Wire the required scopes to match the pattern used by native IM keys on line 45, so that scope checks are enforced uniformly across all IM event registrations.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@events/im/register.go` around lines 20 - 32, The KeyDefinition for
"im.message.receive_v1" is missing the Scopes field so scope preflight is
skipped; update the KeyDefinition in events/im/register.go (the struct literal
that sets Key: "im.message.receive_v1" and Process: processImMessageReceive) to
include the same Scopes slice used by the native IM keys (mirror the pattern
used by the other IM KeyDefinition entries), e.g. add a Scopes field with the
identical scope strings those native IM keys use so scope validation runs for
this event.

Comment thread internal/event/bus/bus.go
Comment on lines +339 to +348
ack := protocol.NewHelloAck("v1", firstForKey)
_ = protocol.EncodeWithDeadline(conn, ack, protocol.WriteTimeout)

// Quote untrusted fields — EventKey / EventTypes come straight off
// the wire from an unprivileged local process, and a value with
// embedded newlines would forge bus.log entries.
b.logger.Printf("Consumer connected: pid=%d key=%q event_types=%q first=%v",
hello.PID, hello.EventKey, hello.EventTypes, firstForKey)

bc.Start()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle failed HelloAck writes before starting the consumer.

Line 340 ignores the handshake write error after the connection has already been registered. If the client disconnected or the write times out, the hub keeps a consumer that never received the ack, which can skew first/last bookkeeping and keep the bus non-idle until later cleanup.

Proposed fix
 	ack := protocol.NewHelloAck("v1", firstForKey)
-	_ = protocol.EncodeWithDeadline(conn, ack, protocol.WriteTimeout)
+	if err := protocol.EncodeWithDeadline(conn, ack, protocol.WriteTimeout); err != nil {
+		b.logger.Printf("Consumer handshake failed: pid=%d key=%q err=%v", hello.PID, hello.EventKey, err)
+		bc.Close()
+		return
+	}
 
 	// Quote untrusted fields — EventKey / EventTypes come straight off
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ack := protocol.NewHelloAck("v1", firstForKey)
_ = protocol.EncodeWithDeadline(conn, ack, protocol.WriteTimeout)
// Quote untrusted fields — EventKey / EventTypes come straight off
// the wire from an unprivileged local process, and a value with
// embedded newlines would forge bus.log entries.
b.logger.Printf("Consumer connected: pid=%d key=%q event_types=%q first=%v",
hello.PID, hello.EventKey, hello.EventTypes, firstForKey)
bc.Start()
ack := protocol.NewHelloAck("v1", firstForKey)
if err := protocol.EncodeWithDeadline(conn, ack, protocol.WriteTimeout); err != nil {
b.logger.Printf("Consumer handshake failed: pid=%d key=%q err=%v", hello.PID, hello.EventKey, err)
bc.Close()
return
}
// Quote untrusted fields — EventKey / EventTypes come straight off
// the wire from an unprivileged local process, and a value with
// embedded newlines would forge bus.log entries.
b.logger.Printf("Consumer connected: pid=%d key=%q event_types=%q first=%v",
hello.PID, hello.EventKey, hello.EventTypes, firstForKey)
bc.Start()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/bus/bus.go` around lines 339 - 348, The handshake ack write
(protocol.EncodeWithDeadline(conn, ack, protocol.WriteTimeout)) is ignored;
check its returned error and if non-nil abort registration: log the failure
(include error and context from hello.EventKey/hello.PID), close the connection,
avoid calling bc.Start() and remove/unregister the consumer so it isn't left
tracked; ensure any resources allocated for this consumer are cleaned up before
returning. Use the existing symbols protocol.NewHelloAck,
protocol.EncodeWithDeadline, conn, hello, b.logger.Printf, and bc.Start to
locate and implement this early-return-on-error behavior.

Comment on lines +114 to +171
func (c *Conn) SenderLoop() {
for {
select {
case <-c.closed:
return
case msg := <-c.sendCh:
c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if err := protocol.Encode(c.conn, msg); err != nil {
if c.logger != nil {
c.logger.Printf("WARN: write to pid=%d failed: %v", c.pid, err)
}
c.shutdown()
return
}
}
}
}

// ReaderLoop monitors the connection for EOF/close and processes any
// control messages (Bye, PreShutdownCheck) the consume client sends.
// Uses the bufio.Reader handed in from handleConn so buffered bytes
// carried over from the Hello read aren't dropped.
func (c *Conn) ReaderLoop() {
for {
line, err := protocol.ReadFrame(c.reader)
if err != nil {
break
}
line = bytes.TrimRight(line, "\n")
if len(line) == 0 {
continue
}
msg, err := protocol.Decode(line)
if err != nil {
continue
}
c.handleControlMessage(msg)
}
c.shutdown()
}

func (c *Conn) handleControlMessage(msg interface{}) {
switch m := msg.(type) {
case *protocol.Bye:
c.shutdown()
case *protocol.PreShutdownCheck:
// Query the Hub (via callback) and reply with whether this is the
// last consumer for the given EventKey. The consume client uses this
// to decide whether to run cleanup (e.g. unsubscribe mailbox).
lastForKey := true
if c.checkLastForKey != nil {
lastForKey = c.checkLastForKey(m.EventKey)
}
ack := protocol.NewPreShutdownAck(lastForKey)
c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
protocol.Encode(c.conn, ack)
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Concurrent writes to c.conn from two goroutines can corrupt framing.

SenderLoop writes events via protocol.Encode(c.conn, msg) from one goroutine (Lines 120-121), while handleControlMessage writes a PreShutdownAck via protocol.Encode(c.conn, ack) from the ReaderLoop goroutine (Lines 168-169). net.Conn.Write is safe to call concurrently in the sense that it won't race the runtime, but the byte streams can interleave arbitrarily — and the protocol here is newline-delimited JSON, so any interleave produces a malformed frame that the consumer will fail to decode (or worse, decode into the wrong type). The paired SetWriteDeadline calls also clobber each other because the deadline is per-conn, not per-goroutine.

The simplest fixes:

  1. Route the ack through sendCh so all writes serialise through SenderLoop. PushDropOldest is already the supported publish path.
  2. Or, add a writeMu sync.Mutex held across SetWriteDeadline+Encode in both sites.

Option 1 is preferred since it keeps the single-writer invariant the comment at Line 177 implicitly relies on.

♻️ Sketch of option 1
 	case *protocol.PreShutdownCheck:
 		lastForKey := true
 		if c.checkLastForKey != nil {
 			lastForKey = c.checkLastForKey(m.EventKey)
 		}
 		ack := protocol.NewPreShutdownAck(lastForKey)
-		c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
-		protocol.Encode(c.conn, ack)
+		// Route through sendCh so SenderLoop remains the sole writer.
+		select {
+		case c.sendCh <- ack:
+		case <-c.closed:
+		}
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (c *Conn) SenderLoop() {
for {
select {
case <-c.closed:
return
case msg := <-c.sendCh:
c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if err := protocol.Encode(c.conn, msg); err != nil {
if c.logger != nil {
c.logger.Printf("WARN: write to pid=%d failed: %v", c.pid, err)
}
c.shutdown()
return
}
}
}
}
// ReaderLoop monitors the connection for EOF/close and processes any
// control messages (Bye, PreShutdownCheck) the consume client sends.
// Uses the bufio.Reader handed in from handleConn so buffered bytes
// carried over from the Hello read aren't dropped.
func (c *Conn) ReaderLoop() {
for {
line, err := protocol.ReadFrame(c.reader)
if err != nil {
break
}
line = bytes.TrimRight(line, "\n")
if len(line) == 0 {
continue
}
msg, err := protocol.Decode(line)
if err != nil {
continue
}
c.handleControlMessage(msg)
}
c.shutdown()
}
func (c *Conn) handleControlMessage(msg interface{}) {
switch m := msg.(type) {
case *protocol.Bye:
c.shutdown()
case *protocol.PreShutdownCheck:
// Query the Hub (via callback) and reply with whether this is the
// last consumer for the given EventKey. The consume client uses this
// to decide whether to run cleanup (e.g. unsubscribe mailbox).
lastForKey := true
if c.checkLastForKey != nil {
lastForKey = c.checkLastForKey(m.EventKey)
}
ack := protocol.NewPreShutdownAck(lastForKey)
c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
protocol.Encode(c.conn, ack)
}
}
func (c *Conn) handleControlMessage(msg interface{}) {
switch m := msg.(type) {
case *protocol.Bye:
c.shutdown()
case *protocol.PreShutdownCheck:
// Query the Hub (via callback) and reply with whether this is the
// last consumer for the given EventKey. The consume client uses this
// to decide whether to run cleanup (e.g. unsubscribe mailbox).
lastForKey := true
if c.checkLastForKey != nil {
lastForKey = c.checkLastForKey(m.EventKey)
}
ack := protocol.NewPreShutdownAck(lastForKey)
// Route through sendCh so SenderLoop remains the sole writer.
select {
case c.sendCh <- ack:
case <-c.closed:
}
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/bus/conn.go` around lines 114 - 171,
ReaderLoop/handleControlMessage currently write directly to c.conn (calling
SetWriteDeadline + protocol.Encode) while SenderLoop also writes, which can
interleave frames; instead build the PreShutdownAck in handleControlMessage and
send it into c.sendCh so all writes go through SenderLoop. Concretely, remove
the direct c.conn.SetWriteDeadline/protocol.Encode call in handleControlMessage,
create the ack via protocol.NewPreShutdownAck(lastForKey) and enqueue it with
the same push helper used for normal publishes (e.g. PushDropOldest or the
existing send enqueue path) so SenderLoop performs the single
SetWriteDeadline+Encode sequence; ensure the enqueue is non-blocking or uses the
existing drop-oldest semantics to avoid deadlocks.

Comment thread internal/event/bus/hub.go
Comment on lines +78 to +110
func (h *Hub) UnregisterAndIsLast(s Subscriber) bool {
h.mu.Lock()
defer h.mu.Unlock()
delete(h.subscribers, s)
if h.keyCounts[s.EventKey()] > 0 {
h.keyCounts[s.EventKey()]--
}
isLast := h.keyCounts[s.EventKey()] == 0
if isLast {
delete(h.keyCounts, s.EventKey())
}
return isLast
}

// AcquireCleanupLock is the race-free replacement for "check last then
// cleanup". Returns true iff this subscriber is still the only one
// registered for eventKey AND no other cleanup is already in progress. On
// true return, caller MUST eventually call ReleaseCleanupLock to unblock
// any waiting RegisterAndIsFirst. Both checks (count <= 1 and
// already-locked) run under the same write lock so they are atomic —
// preventing a late-arriving Hello from slipping in between the check and
// the reservation.
func (h *Hub) AcquireCleanupLock(eventKey string) bool {
h.mu.Lock()
defer h.mu.Unlock()
if h.keyCounts[eventKey] > 1 {
return false
}
if _, alreadyLocked := h.cleanupInProgress[eventKey]; alreadyLocked {
return false
}
h.cleanupInProgress[eventKey] = make(chan struct{})
return true
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Make lifecycle bookkeeping exact for absent subscribers.

UnregisterAndIsLast decrements keyCounts even when s was not registered, and AcquireCleanupLock can acquire cleanup rights when the count is 0. A duplicate/stale unregister or cleanup check can therefore corrupt first/last decisions for an active key.

Proposed fix
 func (h *Hub) UnregisterAndIsLast(s Subscriber) bool {
 	h.mu.Lock()
 	defer h.mu.Unlock()
-	delete(h.subscribers, s)
-	if h.keyCounts[s.EventKey()] > 0 {
-		h.keyCounts[s.EventKey()]--
+	eventKey := s.EventKey()
+	if _, ok := h.subscribers[s]; !ok {
+		return h.keyCounts[eventKey] == 0
 	}
-	isLast := h.keyCounts[s.EventKey()] == 0
+	delete(h.subscribers, s)
+	if h.keyCounts[eventKey] > 0 {
+		h.keyCounts[eventKey]--
+	}
+	isLast := h.keyCounts[eventKey] == 0
 	if isLast {
-		delete(h.keyCounts, s.EventKey())
+		delete(h.keyCounts, eventKey)
 	}
 	return isLast
 }
@@
 func (h *Hub) AcquireCleanupLock(eventKey string) bool {
 	h.mu.Lock()
 	defer h.mu.Unlock()
-	if h.keyCounts[eventKey] > 1 {
+	if h.keyCounts[eventKey] != 1 {
 		return false
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (h *Hub) UnregisterAndIsLast(s Subscriber) bool {
h.mu.Lock()
defer h.mu.Unlock()
delete(h.subscribers, s)
if h.keyCounts[s.EventKey()] > 0 {
h.keyCounts[s.EventKey()]--
}
isLast := h.keyCounts[s.EventKey()] == 0
if isLast {
delete(h.keyCounts, s.EventKey())
}
return isLast
}
// AcquireCleanupLock is the race-free replacement for "check last then
// cleanup". Returns true iff this subscriber is still the only one
// registered for eventKey AND no other cleanup is already in progress. On
// true return, caller MUST eventually call ReleaseCleanupLock to unblock
// any waiting RegisterAndIsFirst. Both checks (count <= 1 and
// already-locked) run under the same write lock so they are atomic —
// preventing a late-arriving Hello from slipping in between the check and
// the reservation.
func (h *Hub) AcquireCleanupLock(eventKey string) bool {
h.mu.Lock()
defer h.mu.Unlock()
if h.keyCounts[eventKey] > 1 {
return false
}
if _, alreadyLocked := h.cleanupInProgress[eventKey]; alreadyLocked {
return false
}
h.cleanupInProgress[eventKey] = make(chan struct{})
return true
func (h *Hub) UnregisterAndIsLast(s Subscriber) bool {
h.mu.Lock()
defer h.mu.Unlock()
eventKey := s.EventKey()
if _, ok := h.subscribers[s]; !ok {
return h.keyCounts[eventKey] == 0
}
delete(h.subscribers, s)
if h.keyCounts[eventKey] > 0 {
h.keyCounts[eventKey]--
}
isLast := h.keyCounts[eventKey] == 0
if isLast {
delete(h.keyCounts, eventKey)
}
return isLast
}
// AcquireCleanupLock is the race-free replacement for "check last then
// cleanup". Returns true iff this subscriber is still the only one
// registered for eventKey AND no other cleanup is already in progress. On
// true return, caller MUST eventually call ReleaseCleanupLock to unblock
// any waiting RegisterAndIsFirst. Both checks (count <= 1 and
// already-locked) run under the same write lock so they are atomic —
// preventing a late-arriving Hello from slipping in between the check and
// the reservation.
func (h *Hub) AcquireCleanupLock(eventKey string) bool {
h.mu.Lock()
defer h.mu.Unlock()
if h.keyCounts[eventKey] != 1 {
return false
}
if _, alreadyLocked := h.cleanupInProgress[eventKey]; alreadyLocked {
return false
}
h.cleanupInProgress[eventKey] = make(chan struct{})
return true
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/bus/hub.go` around lines 78 - 110, UnregisterAndIsLast should
only decrement keyCounts when the subscriber was actually registered: in
UnregisterAndIsLast check existence with "_, existed := h.subscribers[s]" before
deleting and only decrement h.keyCounts[s.EventKey()] if existed; if not existed
return false (not last). Also tighten AcquireCleanupLock to only grant cleanup
when keyCounts[eventKey] == 1 (not <=1): change the current >1 check to require
equality ==1 so a zero count cannot acquire the lock; keep the existing
cleanupInProgress check and channel creation. These changes reference
UnregisterAndIsLast, AcquireCleanupLock, h.subscribers, h.keyCounts, and
h.cleanupInProgress.

Comment on lines +22 to +28
func newPlatformScanner() Scanner {
return &unixScanner{
runPS: func() ([]byte, error) {
return exec.Command("ps", "-eo", "pid,lstart,command").Output()
},
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Force C locale on ps to avoid silent failure on localized systems.

ps -o lstart emits weekday/month using the caller's LC_TIME/LANG. On any non-English-locale host (e.g. zh_CN, de_DE, fr_FR), lstart looks like "周日 4月 19 03:03:40 2026" and fails to parse against the English "Mon Jan _2 15:04:05 2006" layout. Since parseOneUnixPSLine silently drops unparseable lines, ScanBusProcesses returns an empty list → orphan detection is broken for those users, with no diagnostic.

🛠️ Proposed fix
 func newPlatformScanner() Scanner {
 	return &unixScanner{
 		runPS: func() ([]byte, error) {
-			return exec.Command("ps", "-eo", "pid,lstart,command").Output()
+			cmd := exec.Command("ps", "-eo", "pid,lstart,command")
+			// Force C locale so lstart is emitted in English (Mon Jan _2 ...).
+			// Without this, localized hosts produce unparseable timestamps and
+			// every candidate line is silently dropped.
+			cmd.Env = append(os.Environ(), "LC_ALL=C", "LANG=C")
+			return cmd.Output()
 		},
 	}
 }

Requires adding "os" to the import block.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/busdiscover/busdiscover_unix.go` around lines 22 - 28,
newPlatformScanner currently runs "ps -eo pid,lstart,command" which emits
localized lstart strings that parseOneUnixPSLine expects in English; fix by
forcing the C locale for the ps invocation in
newPlatformScanner/unixScanner.runPS so output is English (e.g. set LC_ALL=C in
the command's environment via os.Environ()) and add the "os" import; ensure
runPS uses that modified env so ScanBusProcesses and parseOneUnixPSLine reliably
parse on non-English hosts.

Comment on lines +26 to +30
br := bufio.NewReader(conn)
line, err := protocol.ReadFrame(br)
if err != nil {
return nil, nil, fmt.Errorf("no hello_ack received: %w", err)
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify whether protocol.ReadFrame or surrounding handshake code already sets read deadlines.
# Expected: either doHello sets a read deadline, or ReadFrame is wrapped by code that does.
rg -n -C4 'func ReadFrame|ReadFrame\s*\(|SetReadDeadline|SetDeadline' internal/event

Repository: larksuite/cli

Length of output: 11058


Set a read deadline for the hello_ack handshake.

The consumer-side handshake read has no deadline. If the bus accepts the connection and never sends hello_ack, the consumer hangs indefinitely, breaking bounded startup. The bus-side handshake in internal/event/bus/bus.go sets a 5-second read deadline before ReadFrame and clears it after—apply the same pattern here.

Proposed fix
 import (
 	"bufio"
 	"bytes"
 	"fmt"
 	"net"
 	"os"
+	"time"
 
 	"github.com/larksuite/cli/internal/event/protocol"
 )
 
+const helloAckTimeout = 5 * time.Second
+
 	br := bufio.NewReader(conn)
+	if err := conn.SetReadDeadline(time.Now().Add(helloAckTimeout)); err != nil {
+		return nil, nil, fmt.Errorf("set hello_ack read deadline: %w", err)
+	}
 	line, err := protocol.ReadFrame(br)
+	_ = conn.SetReadDeadline(time.Time{})
 	if err != nil {
 		return nil, nil, fmt.Errorf("no hello_ack received: %w", err)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
br := bufio.NewReader(conn)
line, err := protocol.ReadFrame(br)
if err != nil {
return nil, nil, fmt.Errorf("no hello_ack received: %w", err)
}
br := bufio.NewReader(conn)
if err := conn.SetReadDeadline(time.Now().Add(helloAckTimeout)); err != nil {
return nil, nil, fmt.Errorf("set hello_ack read deadline: %w", err)
}
line, err := protocol.ReadFrame(br)
_ = conn.SetReadDeadline(time.Time{})
if err != nil {
return nil, nil, fmt.Errorf("no hello_ack received: %w", err)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/consume/handshake.go` around lines 26 - 30, The handshake
reader in internal/event/consume/handshake.go currently calls
protocol.ReadFrame(br) without a deadline; set a read deadline on conn (e.g.
conn.SetReadDeadline(time.Now().Add(5*time.Second))) immediately before calling
protocol.ReadFrame(br) and clear it afterward
(conn.SetReadDeadline(time.Time{})) to mirror the bus-side handshake behavior;
ensure you import time if missing and retain the existing error handling around
the ReadFrame call (variables: conn, br, line, err).

Comment on lines +77 to +83
func TestReadyMarker_SuppressedWhenQuiet(t *testing.T) {
var buf bytes.Buffer
writeReadyMarker(&buf, Options{EventKey: "im.message.receive_v1", Quiet: true})

if buf.Len() != 0 {
t.Errorf("Quiet=true must suppress ready marker; got %q", buf.String())
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don’t suppress the AI readiness marker in quiet mode.

The PR defines [event] ready event_key=<key> as the subprocess readiness contract before stdout. Quiet can suppress extra progress, but hiding this marker can leave event consume --quiet callers unable to detect readiness.

🛠️ Proposed test update
-func TestReadyMarker_SuppressedWhenQuiet(t *testing.T) {
+func TestReadyMarker_StillEmittedWhenQuiet(t *testing.T) {
 	var buf bytes.Buffer
 	writeReadyMarker(&buf, Options{EventKey: "im.message.receive_v1", Quiet: true})
 
-	if buf.Len() != 0 {
-		t.Errorf("Quiet=true must suppress ready marker; got %q", buf.String())
+	want := "[event] ready event_key=im.message.receive_v1\n"
+	if got := buf.String(); got != want {
+		t.Errorf("got %q, want %q", got, want)
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/consume/listening_text_test.go` around lines 77 - 83, The test
TestReadyMarker_SuppressedWhenQuiet is wrong: the ready marker must still be
emitted regardless of Quiet. Update the test (and any assertions around
writeReadyMarker and Options) to assert that writeReadyMarker(&buf,
Options{EventKey:"im.message.receive_v1", Quiet:true}) writes the readiness line
(e.g. contains "[event] ready event_key=im.message.receive_v1") instead of
expecting an empty buffer; reference the TestReadyMarker_SuppressedWhenQuiet
test and the writeReadyMarker function/Options struct when making the change.

Comment on lines +42 to +51
scanner := bufio.NewScanner(conn)
if scanner.Scan() {
resp, err := protocol.Decode(scanner.Bytes())
if err == nil {
if ack, ok := resp.(*protocol.PreShutdownAck); ok {
return ack.LastForKey
}
}
}
return true
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Wait for the PreShutdownAck instead of trusting the next frame.

Line 43 reads only one frame; if an event/source-state frame is already queued before the bus ack, this falls through to true and can run cleanup even when the bus would have returned LastForKey=false. Loop until the ack or deadline, ignoring unrelated frames.

🐛 Proposed fix
 	scanner := bufio.NewScanner(conn)
-	if scanner.Scan() {
+	for scanner.Scan() {
 		resp, err := protocol.Decode(scanner.Bytes())
 		if err == nil {
 			if ack, ok := resp.(*protocol.PreShutdownAck); ok {
 				return ack.LastForKey
 			}
 		}
 	}
 	return true
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
scanner := bufio.NewScanner(conn)
if scanner.Scan() {
resp, err := protocol.Decode(scanner.Bytes())
if err == nil {
if ack, ok := resp.(*protocol.PreShutdownAck); ok {
return ack.LastForKey
}
}
}
return true
scanner := bufio.NewScanner(conn)
for scanner.Scan() {
resp, err := protocol.Decode(scanner.Bytes())
if err == nil {
if ack, ok := resp.(*protocol.PreShutdownAck); ok {
return ack.LastForKey
}
}
}
return true
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/event/consume/shutdown.go` around lines 42 - 51, The current code
reads only one frame from conn with scanner.Scan() and returns true if that
frame isn’t a protocol.PreShutdownAck; change this to loop reading frames until
a PreShutdownAck is received or the read deadline expires: repeatedly call
scanner.Scan(), decode with protocol.Decode(scanner.Bytes()), ignore
non-*protocol.PreShutdownAck frames, and only return ack.LastForKey when you see
a *protocol.PreShutdownAck; on scanner error or deadline expiry return the
existing fallback (true). Update the logic around conn/scanner (and any read
deadline handling) so unrelated event/source-state frames are skipped rather
than causing an immediate shutdown decision.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size/XL Architecture-level or global-impact change

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants