feat(event): add event subscription & consume system #615

liuxinyanglxy wants to merge 2 commits into main
Introduces end-to-end Feishu event consumption via a new `lark-cli event`
command family. Users can subscribe to and consume real-time events
(IM messages, chat/member lifecycle, reactions, ...) in a forked bus
daemon architecture with orphan detection, reflected + overrideable JSON
schemas, and AI-friendly `--json` / `--jq` output.
Commands
--------
- `event list [--json]` list subscribable EventKeys
- `event schema <key>` Parameters + Output Schema + auth info
- `event consume <key>` foreground blocking consume; SIGINT/SIGTERM
/stdin-EOF shutdown; `--max-events` /
`--timeout` bounded; `--jq` projection;
`--output-dir` spool; `--param` KV inputs
- `event status [--fail-on-orphan] [--json]` bus daemon health
- `event stop [--all] [--force] [--json]` stop bus daemon(s)
- `event _bus` (hidden) forked daemon entrypoint
Architecture
------------
- Bus daemon (internal/event/bus): per-AppID forked process that holds
the Feishu long-poll connection and fans events out to 1..N local
consumers over an IPC socket. Drop-oldest backpressure, TOCTOU-safe
cleanup via AcquireCleanupLock, idle-timeout self-shutdown, graceful
SIGTERM.
- Consume client (internal/event/consume): fork+dial the daemon,
handshake, remote preflight (HTTP /open-apis/event/v1/connection),
JQ projection, sequence-gap detection, health probe. Bounded
execution (`--max-events` / `--timeout`) for AI/script usage.
- Wire protocol (internal/event/protocol): newline-delimited JSON
frames with 1 MB size cap and 5 s write deadlines. Hello / HelloAck /
PreShutdownCheck / Shutdown / StatusQuery control messages.
- Orphan detection (internal/event/busdiscover): OS process-table scan
(ps on Unix, PowerShell on Windows) with two-gate cmdline filter
(lark-cli + event _bus) that naturally rejects pid-reused unrelated
processes.
- Transport (internal/event/transport): Unix socket on darwin/linux,
Windows named pipe on windows.
- Schema system (internal/event, internal/event/schemas): SchemaDef with
mutually-exclusive Native (framework wraps V2 envelope) or Custom
(zero-touch) specs. Reflection reads `desc` / `enum` / `kind` struct
tags, with array elements diving into `items`. FieldOverrides overlay
engine addresses paths via JSON Pointer (including `/*` array
wildcard) and runs post-reflect, post-envelope. Lint guards orphan
override paths.
- IM events (events/im): 11 keys — receive / read / recalled, chat and
member lifecycle, reactions — all with per-field open_id / union_id /
user_id / chat_id / message_id / timestamp_ms format annotations.
Robustness
----------
- Bus idle-timer race fix: re-check live conn count under lock before
honoring the tick; Stop+drain before Reset per timer contract.
- Protocol frame cap: replace `br.ReadBytes('\n')` with `ReadFrame` that
rejects frames > MaxFrameBytes (1 MB). Closes a DoS path where any
local peer could grow the reader's buffer unbounded.
- Control-message writes gated by WriteTimeout (5 s) so a wedged peer
kernel buffer can't stall writers indefinitely.
- Consume signal goroutine: `signal.Stop` + `ctx.Done` select, no leak
across repeated invocations in the same process.
- JQ pre-flight compile so bad expressions fail before the bus fork and
any server-side PreConsume side effects.
- `f.NewAPIClient`'s `*core.ConfigError` now passes through unwrapped
so the actionable "run lark-cli config init" hint reaches the user.
Subprocess / AI contract
------------------------
- `event consume` emits `[event] ready event_key=<key>` on stderr once
the bus handshake completes and events will flow. Parent processes
block-read stderr until this line before reading stdout — no `sleep`
fallback needed.
- All list-like commands have `--json` for structured consumption.
- Skill docs in `skills/lark-event/` (SKILL.md + references/) brief AI
agents on the command surface, JQ against Output Schema, bounded
execution, and subprocess lifecycle.
Testing
-------
Unit tests across bus/hub, consume loop, protocol codec, dedup,
registry, transport (Unix + Windows), schema reflection, field
overrides, pointer resolver. Integration tests cover fork startup,
shutdown, orphan detection, probe, stdin EOF, preflight, bounded
execution, and Windows busdiscover PowerShell compatibility.
Change-Id: Ib69d6d8409b33b99790081e273d4b5b01b7dbf80
Stdin-EOF silently terminating `event consume` was the biggest footgun for
daemon-style callers (`< /dev/null`, `nohup`, systemd default
`StandardInput=null`): they see the process exit immediately with no obvious
cause. watchStdinEOF now writes a descriptive stderr line naming the cause and
listing the three workarounds (`--max-events` / `--timeout` for bounded runs,
keep stdin open via `< /dev/tty` or `< <(tail -f /dev/null)`, stop via SIGTERM
instead).

- cmd/event/consume.go: watchStdinEOF takes errOut; prints diagnostic before
  cancel; honors --quiet by passing io.Discard.
- cmd/event/consume_stdin_test.go: test signature updated; new
  TestWatchStdinEOF_DiagnosticMessage pins the message content so it can't rot
  unnoticed.

Docs / repo hygiene:

- SKILL.md stdin-EOF note trimmed; `kill -9` warning rewritten to focus on the
  actual server-side consequence (PreConsume unsubscribe skipped →
  subscription leak).
- lark-event-im.md reference: call out `im.message.receive_v1` as the lone
  flat shape vs the other 10 envelope-shaped native keys.
- Removed obsolete lark-event-subscribe.md (old `+subscribe` docs).
- .gitignore: drop stale `span.log` / `.home/` entries (not in use).

Change-Id: Iddf26b2c7810be90fa47e15069b2bf733ffe54cd
📝 Walkthrough

Introduces a comprehensive event-consumption system for real-time Feishu/Lark events, with CLI commands to consume, list, and manage events. Includes bus daemon infrastructure, wire protocol definitions, an IPC transport layer, schema resolution, event deduplication, and integration with IM event sources.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User as User / AI
    participant CLI as lark-cli event<br/>consume
    participant Bus as Event Bus<br/>Daemon
    participant Source as Event Source<br/>(Feishu WebSocket)
    participant OpenAPI as Feishu Open<br/>APIs
    User->>CLI: event consume im.message.receive_v1 --param=...
    CLI->>CLI: Validate EventKey, parse params
    CLI->>CLI: Check local bus socket
    alt Bus not running
        CLI->>CLI: Fork bus daemon
        CLI->>Bus: (background daemon)
    end
    CLI->>Bus: Dial IPC socket
    CLI->>Bus: Send Hello (EventKey, EventTypes)
    Bus->>Bus: Register consumer
    Bus->>Source: Start WebSocket source<br/>(if first consumer)
    Source->>OpenAPI: Authenticate with AppID/AppSecret
    Source->>OpenAPI: WebSocket subscribe to events
    Bus->>CLI: Send HelloAck
    CLI->>OpenAPI: Preflight checks<br/>(app version, scopes)
    CLI->>CLI: Run PreConsume hook (if exists)
    CLI->>User: Print [event] ready marker to stderr
    Source->>Source: Receive events from Feishu
    Source->>Bus: Emit RawEvent
    Bus->>Bus: Deduplicate by EventID
    Bus->>Bus: Route to registered consumers
    Bus->>CLI: Send protocol.Event<br/>(with ProcessFunc output)
    CLI->>CLI: Apply jq filter (optional)
    CLI->>CLI: Write to stdout/directory
    User->>CLI: Signal (SIGTERM) or stdin EOF
    CLI->>Bus: Send PreShutdownCheck
    Bus->>Bus: Check if CLI is last consumer
    Bus->>CLI: Send PreShutdownAck
    CLI->>CLI: Run cleanup hook
    CLI->>User: Exit with status 0
    Bus->>Bus: If last consumer: idle countdown<br/>→ shutdown after 30s
```

```mermaid
sequenceDiagram
    participant Cmd as cmd/event<br/>consume
    participant Startup as consume.EnsureBus
    participant Probe as probeAndDialBus
    participant Transport as IPC Transport
    participant Fork as Bus Fork
    participant Bus as Event Bus<br/>Daemon
    Cmd->>Startup: EnsureBus(ctx, tr, appID, ...)
    Startup->>Probe: Try to probe existing bus
    Probe->>Transport: tr.Address(appID)
    Transport-->>Probe: Unix socket path
    Probe->>Transport: tr.Dial(socketPath)
    alt Socket exists & responsive
        Probe->>Bus: StatusQuery
        Bus-->>Probe: StatusResponse
        Probe-->>Startup: net.Conn (success)
    else No bus or unresponsive
        Probe-->>Startup: error
        Startup->>Startup: Check remote connections<br/>(if APIClient provided)
        Startup->>Fork: forkBus(appID, profileName, domain)
        Fork->>Fork: Acquire bus.fork.lock<br/>(per-appID)
        Fork->>Fork: Spawn child: lark-cli event _bus
        Fork-->>Startup: PID
        Startup->>Startup: Announce forked daemon to stderr
        Startup->>Startup: Retry dial loop with deadline
        Startup->>Probe: Probe again (up to deadline)
        Probe->>Transport: tr.Dial(socketPath)
        Bus-->>Probe: StatusResponse
        Probe-->>Startup: net.Conn (success)
    end
    Startup-->>Cmd: net.Conn
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes

The PR introduces substantial new infrastructure spanning protocol design, concurrent data structures (hub, dedup filter, bus lifecycle), cross-platform IPC transport, event schema reflection and validation, complex CLI orchestration with preflight checks, and event consumption loops with backpressure handling. High heterogeneity across 150+ files, intricate domain logic (deduplication, jq filtering, PreShutdownCheck protocol semantics), and dense interactions between bus, protocol, transport, and CLI layers demand careful, multi-pass review of control flow, concurrency invariants, and error propagation.
Actionable comments posted: 11
Note
Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/lockfile/lockfile.go (1)
67-75: ⚠️ Potential issue | 🔴 Critical

Both `tryLockFile` implementations unconditionally wrap all errors with `ErrHeld`, masking real failures.

The Unix and Windows implementations (lock_unix.go:17 and lock_windows.go:39) use `fmt.Errorf("%w ...", ErrHeld, ...)` for any `syscall.Flock` or `LockFileEx` failure, not just lock contention. This contradicts the `ErrHeld` documentation (lockfile.go:24–26), which states callers rely on `errors.Is(err, ErrHeld)` to distinguish benign contention from real failures like `EACCES` or missing parent directories. Real failures should surface unwrapped so startup code can handle them appropriately, not be masked as lock contention.
🟡 Minor comments (16)
internal/event/source/source_test.go-28-37 (1)
28-37: ⚠️ Potential issue | 🟡 Minor

Clean up the global registry after the test.

`TestRegister` leaves the registered source behind for any later package tests. Add cleanup so test order cannot leak registry state.

🧪 Proposed test isolation fix

```diff
 func TestRegister(t *testing.T) {
 	ResetForTest()
+	t.Cleanup(ResetForTest)
 	src := &mockSource{name: "test-source"}
```
internal/event/source/source_test.go-49-59 (1)
49-59: ⚠️ Potential issue | 🟡 Minor

Avoid sleep-based async assertions.

`time.Sleep(50ms)` makes the test scheduler-dependent. Wait for the expected channel receives with a timeout instead.

🧪 Proposed deterministic wait

```diff
 	received := make(chan *event.RawEvent, 10)
-	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	go src.Start(ctx, nil, func(e *event.RawEvent) { received <- e }, nil)
-	time.Sleep(50 * time.Millisecond)
-	if len(received) != 2 {
-		t.Errorf("expected 2 events, got %d", len(received))
+	for i := 0; i < 2; i++ {
+		select {
+		case <-received:
+		case <-time.After(100 * time.Millisecond):
+			t.Fatalf("expected 2 events, got %d", i)
+		}
 	}
 }
```
cmd/event/appmeta_err.go-32-35 (1)
32-35: ⚠️ Potential issue | 🟡 Minor

Truncate by rune to keep localized errors valid UTF-8.

Line 34 slices by byte, so Chinese or other multibyte error text can be cut mid-rune and produce malformed stderr for agents/users. Prefer rune-aware truncation.

🛠️ Proposed fix

```diff
 const maxErrLen = 200
-if len(msg) > maxErrLen {
-	return msg[:maxErrLen] + "…"
+runes := []rune(msg)
+if len(runes) > maxErrLen {
+	return string(runes[:maxErrLen]) + "…"
 }
```
internal/event/bus/log.go-48-53 (1)
48-53: ⚠️ Potential issue | 🟡 Minor

Return a closer for `bus.log` and close it on daemon exit.

The file opened by `SetupBusLogger` is never explicitly closed. `log.Logger` does not own the file handle, so it remains open until process exit, preventing proper log sync during graceful shutdown. Return the closer alongside the logger and have the command handler defer it during cleanup.

♻️ Proposed signature change

```diff
-func SetupBusLogger(eventsDir string) (*log.Logger, error) {
+func SetupBusLogger(eventsDir string) (*log.Logger, func() error, error) {
```

At the call site in `cmd/event/bus.go`, defer the closer:

```diff
-logger, err := bus.SetupBusLogger(eventsDir)
-if err != nil {
-	return err
-}
+logger, closer, err := bus.SetupBusLogger(eventsDir)
+if err != nil {
+	return err
+}
+defer closer()
```

A test must verify that the closer is non-nil and can be invoked.
internal/event/transport/transport_unix.go-46-48 (1)
46-48: ⚠️ Potential issue | 🟡 Minor

Add explicit path length validation to prevent AF_UNIX socket path overflow.

The socket path `core.GetConfigDir()/events/<appID>/bus.sock` can exceed the Unix-domain socket limit (~104 bytes on macOS, ~108 on Linux) when `LARKSUITE_CLI_CONFIG_DIR` is set to a very long path. While `Bus.Run()` does handle the Listen error, this failure mode is difficult to diagnose without explicit validation. Add a length check in `Address()` that validates the path before use, returning an error with a clear message if the path exceeds the platform limit, or derive a short socket path from a hash of the config directory and app ID.
internal/event/consume/bounded_test.go-67-72 (1)
67-72: ⚠️ Potential issue | 🟡 Minor

Wait for the timeout instead of sleeping.

This test can be made deterministic by blocking on `ctx.Done()`; sleeping past the deadline still depends on timer scheduling under load.

🧪 Proposed flake-resistant timeout wait

```diff
 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
 defer cancel()
-time.Sleep(5 * time.Millisecond)
+select {
+case <-ctx.Done():
+case <-time.After(100 * time.Millisecond):
+	t.Fatal("context did not time out")
+}
 opts := Options{MaxEvents: 5, Timeout: 1 * time.Millisecond}
```
internal/event/transport/transport_test.go-88-89 (1)
88-89: ⚠️ Potential issue | 🟡 Minor

Check `Listen` before closing the listener.

Ignoring the error can turn a listen failure into a nil-pointer panic and hide the useful diagnostic.

🧪 Proposed error check

```diff
-ln, _ := tr.Listen(addr)
-ln.Close()
+ln, err := tr.Listen(addr)
+if err != nil {
+	t.Fatalf("listen: %v", err)
+}
+if err := ln.Close(); err != nil {
+	t.Fatalf("close listener: %v", err)
+}
 tr.Cleanup(addr)
```
internal/event/source/feishu_log_test.go-52-56 (1)
52-56: ⚠️ Potential issue | 🟡 Minor

Assert handler errors in these tests.

Several calls ignore the returned error, so a future regression could return an error without failing the test. Please check `err` consistently like the malformed JSON test does.

🧪 Proposed error checks

```diff
 // Missing event_id
 req := &larkevent.EventReq{Body: []byte(`{"header":{"event_type":"im.receive"}}`)}
-handler(context.Background(), req)
+if err := handler(context.Background(), req); err != nil {
+	t.Fatalf("handler returned err for missing event_id: %v", err)
+}

 // Missing event_type
 req2 := &larkevent.EventReq{Body: []byte(`{"header":{"event_id":"abc"}}`)}
-handler(context.Background(), req2)
+if err := handler(context.Background(), req2); err != nil {
+	t.Fatalf("handler returned err for missing event_type: %v", err)
+}
```

```diff
 req := &larkevent.EventReq{Body: nil}
-handler(context.Background(), req)
+if err := handler(context.Background(), req); err != nil {
+	t.Fatalf("handler returned err: %v", err)
+}
```

```diff
 body := []byte(`{"header":{"event_id":"evt-42","event_type":"im.message.receive_v1","create_time":"1700000000000"}}`)
-handler(context.Background(), &larkevent.EventReq{Body: body})
+if err := handler(context.Background(), &larkevent.EventReq{Body: body}); err != nil {
+	t.Fatalf("handler returned err: %v", err)
+}
```

```diff
 s := &FeishuSource{Logger: nil} // no logger
 handler := s.buildRawHandler(func(_ *event.RawEvent) {})
-handler(context.Background(), &larkevent.EventReq{Body: []byte("bad json")})
+if err := handler(context.Background(), &larkevent.EventReq{Body: []byte("bad json")}); err != nil {
+	t.Fatalf("handler returned err: %v", err)
+}
```

Also applies to: 76-77, 95-96, 119-120
internal/event/transport/transport_test.go-35-50 (1)
35-50: ⚠️ Potential issue | 🟡 Minor

Propagate accept errors to avoid a hung test.

If `Accept` fails, the goroutine exits without sending and Line 49 blocks until the global test timeout. Send the accept result and use a bounded wait.

🧪 Proposed accept result handling

```diff
-accepted := make(chan net.Conn, 1)
+type acceptResult struct {
+	conn net.Conn
+	err  error
+}
+accepted := make(chan acceptResult, 1)
 go func() {
 	conn, err := ln.Accept()
-	if err == nil {
-		accepted <- conn
-	}
+	accepted <- acceptResult{conn: conn, err: err}
 }()
@@
-serverConn := <-accepted
+var res acceptResult
+select {
+case res = <-accepted:
+case <-time.After(2 * time.Second):
+	t.Fatal("accept timed out")
+}
+if res.err != nil {
+	t.Fatalf("accept: %v", res.err)
+}
+serverConn := res.conn
 defer serverConn.Close()
```
internal/event/schemas/pointer.go-37-62 (1)
37-62: ⚠️ Potential issue | 🟡 Minor

Decode JSON Pointer escape sequences per RFC 6901.

Override keys are described as RFC 6901 JSON Pointer paths, but path tokens are matched without decoding `~1` → `/` and `~0` → `~`. Valid pointers for fields containing `/` or `~` will silently miss. Add a decode function and apply it during token resolution.

🐛 Proposed token decoding

```diff
 import "strings"
+
+func decodePointerToken(token string) string {
+	// RFC 6901 requires ~1 to be decoded before ~0.
+	return strings.NewReplacer("~1", "/", "~0", "~").Replace(token)
+}
@@
 for _, part := range parts {
+	part = decodePointerToken(part)
 	next := []map[string]interface{}{}
```
internal/event/consume/jq.go-43-56 (1)
43-56: ⚠️ Potential issue | 🟡 Minor

Reject multi-output jq expressions at compile time for clarity.

`applyJQ` is designed as an event filter where each event produces zero or one output (nil for filtered-out events, `json.RawMessage` for passing events). While `gojq` supports multi-output expressions like `.items[]`, the function's single-return signature enforces single-output semantics. To prevent silent data loss when users mistakenly write such expressions, add validation in `CompileJQ` that rejects any expression that produces multiple outputs, with a clear error message guiding them to use a filter that returns a single value.

Suggested validation approach

```diff
 func CompileJQ(expr string) (*gojq.Code, error) {
 	query, err := gojq.Parse(expr)
 	if err != nil {
 		return nil, fmt.Errorf("invalid jq expression: %w", err)
 	}
 	code, err := gojq.Compile(query)
 	if err != nil {
 		return nil, fmt.Errorf("jq compile error: %w", err)
 	}
+	// Validate single-output semantics by test-running on empty input.
+	// If the expression produces multiple outputs (e.g. `.items[]`),
+	// reject it early rather than silently truncating at runtime.
+	iter := code.Run(map[string]interface{}{})
+	if _, ok := iter.Next(); ok {
+		if _, ok := iter.Next(); ok {
+			return nil, fmt.Errorf("jq expression produces multiple outputs; use a filter that returns a single value (e.g., wrap in [...])")
+		}
+	}
 	return code, nil
 }
```
internal/event/busdiscover/busdiscover.go-40-61 (1)
40-61: ⚠️ Potential issue | 🟡 Minor

Regex should accept both `--profile=` and `--profile` flag forms.

`appIDPattern` requires `\s+` between `--profile` and the value. The current `buildForkArgs` in internal/event/consume/startup.go always builds the argv as separate array elements (`["--profile", profileName]`), which produces the space-separated form in the cmdline. However, the regex should defensively accept both the `--profile=cli_x` and `--profile cli_x` forms. If the fork invocation syntax ever changes or a developer manually invokes the hidden daemon with the `=` form, the process would pass the two substring gates but fail the regex match, silently breaking orphan detection for that process.

🛠️ Proposed fix

```diff
-var appIDPattern = regexp.MustCompile(`--profile\s+(cli_[a-zA-Z0-9_]+)`)
+var appIDPattern = regexp.MustCompile(`--profile[=\s]+(cli_[a-zA-Z0-9_]+)`)
```
internal/event/schemas/fromtype.go-163-185 (1)
163-185: ⚠️ Potential issue | 🟡 Minor

Anonymous-field flattening diverges from `encoding/json` when the embed has a JSON name tag.

`encoding/json` treats an anonymous struct field with an explicit JSON name tag (e.g. ``Inner Inner `json:"inner"` ``) as a named field holding a nested object, not as a promotion/flatten. `collectFields` currently flattens all anonymous struct fields unconditionally, so the generated schema will have `Inner`'s fields promoted at the parent level while the actual JSON wire form nests them under `"inner"`. For SDK event structs this is the exact case where schemas silently lie to AI agents.

Additionally, the package doc at Lines 12-14 says "embedded anonymous structs… are skipped", which matches neither the flattening code here nor the tag-aware path `encoding/json` implements. Please align the doc with whichever behavior lands.

🧰 Suggested handling

```diff
-	// Anonymous embed: recurse into its fields (after unwrapping pointer).
-	// This must come before the exported check because reflect reports
-	// an anonymous field of a lowercase type as unexported, yet its
-	// exported fields still promote through encoding/json.
-	if f.Anonymous {
+	// Anonymous embed: encoding/json promotes the inner fields ONLY when
+	// there's no explicit json name tag; a tagged anonymous field is a
+	// normal named field carrying a nested object. Mirror that here so
+	// the schema matches the wire shape.
+	if f.Anonymous && parseJSONTag(f) == f.Name {
 		embedded := f.Type
 		for embedded.Kind() == reflect.Ptr {
 			embedded = embedded.Elem()
 		}
 		if embedded.Kind() == reflect.Struct {
 			collectFields(embedded, props, visiting, cache)
 		}
 		continue
 	}
```

Note: `parseJSONTag` returns `f.Name` when there's no `json` tag at all, which is the "untagged embed" signal we want to key on.

cmd/event/status.go-349-358 (1)
349-358:⚠️ Potential issue | 🟡 Minor
--fail-on-orphanexits with no actionable error body; preferoutput.ErrWithHint.
output.ErrBare(ExitValidation)sets the exit code but carries no message or hint. For scripted/--jsonconsumers and AI agents, the failure comes through as exit 2 with nothing to act on — they'd have to re-parse the preceding JSON to find which AppID is orphaned and what to run. Sincestatusesalready has the orphan PIDs in hand, it's cheap to surface the suggested action directly in the error.As per coding guidelines in
`cmd/**/*.go`: "Use `output.Errorf` or `output.ErrWithHint` in `RunE` functions instead of bare `fmt.Errorf` to ensure structured errors that AI agents can parse" and "Make error messages structured, actionable, and specific for AI agent parsing".

🧰 Suggested shape
```diff
 func exitForOrphan(statuses []appStatus, failOnOrphan bool) error {
 	if !failOnOrphan {
 		return nil
 	}
-	for _, s := range statuses {
-		if s.State == stateOrphan {
-			return output.ErrBare(output.ExitValidation)
-		}
-	}
-	return nil
+	var orphans []appStatus
+	for _, s := range statuses {
+		if s.State == stateOrphan {
+			orphans = append(orphans, s)
+		}
+	}
+	if len(orphans) == 0 {
+		return nil
+	}
+	ids := make([]string, 0, len(orphans))
+	kills := make([]string, 0, len(orphans))
+	for _, s := range orphans {
+		ids = append(ids, s.AppID)
+		kills = append(kills, fmt.Sprintf("kill %d", s.PID))
+	}
+	return output.ErrWithHint(
+		output.ExitValidation,
+		fmt.Sprintf("orphan bus detected for %d app(s): %s", len(orphans), strings.Join(ids, ", ")),
+		fmt.Sprintf("Run: %s", strings.Join(kills, " && ")),
+	)
 }
```

(Exact
`output.ErrWithHint` signature may differ — adjust to match the helper used elsewhere in `cmd/event/*`.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/event/status.go` around lines 349 - 358, The exitForOrphan function currently returns output.ErrBare when any appStatus has State == stateOrphan; change this to return a structured error via output.ErrWithHint (or output.Errorf if ErrWithHint not available) that includes the orphan AppID(s) extracted from the statuses slice and a concrete next step hint (e.g., command to reconcile or inspect the orphaned app), so callers of exitForOrphan receive a parseable message and actionable hint; update the return path in exitForOrphan to build a message listing the orphan IDs and pass the suggested action into ErrWithHint/Errorf while leaving behavior when failOnOrphan is false unchanged.

internal/event/protocol/codec.go-62-85 (1)
62-85: ⚠️ Potential issue | 🟡 Minor
`ReadFrame` API has asymmetric lifetime guarantees — fix or document the contract.

The fast path (
`err == nil && len(buf) == 0`) returns a slice directly from `bufio.Reader`'s internal buffer, which is invalid after the next `ReadFrame` call. The slow path returns a fresh allocation via `append`. Verification confirms all five current call sites (handshake.go:27, conn.go:138, startup.go:128, busctl.go:42, bus.go:256) decode and discard the bytes immediately, so this is latent. However, the inconsistent contract invites misuse by future callers who might batch frames or pass bytes to concurrent code. Either explicitly document the "valid until next `ReadFrame`" guarantee in the doc-comment, or always return a copy for uniform semantics.

🛡️ Suggested uniform-copy fix
```diff
 	case nil:
-		if len(buf) == 0 {
-			// Fast path: whole frame fit in br's internal buffer.
-			return chunk, nil
-		}
-		if len(buf)+len(chunk) > MaxFrameBytes {
+		if len(buf)+len(chunk) > MaxFrameBytes {
 			return nil, fmt.Errorf("protocol: frame exceeds %d bytes", MaxFrameBytes)
 		}
-		return append(buf, chunk...), nil
+		// Always copy: ReadSlice's returned slice is only valid until the
+		// next read, but consumers (e.g. Decode stashing the bytes) may
+		// outlive that window.
+		out := make([]byte, len(buf)+len(chunk))
+		copy(out, buf)
+		copy(out[len(buf):], chunk)
+		return out, nil
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/protocol/codec.go` around lines 62 - 85, ReadFrame currently returns a slice directly into bufio.Reader's internal buffer on the fast path (err == nil && len(buf) == 0), which makes the returned bytes invalid after the next ReadFrame call; change the fast path to return a copy instead so the API uniformly hands callers an independent []byte (e.g. allocate a new slice and copy chunk) and keep existing overflow checks using MaxFrameBytes; update ReadFrame's implementation (function ReadFrame, symbol MaxFrameBytes) to always return a freshly allocated slice to provide stable lifetime semantics for callers.

internal/event/consume/consume.go-179-202 (1)
179-202: ⚠️ Potential issue | 🟡 Minor
`validateParams` panics when `params` is nil and a key defines defaults.

Line 182 writes into
`params` unconditionally, but nil maps panic on assignment. A library caller (or future refactor) that passes `nil` for a key whose definition has any `Default != ""` param will crash here instead of getting a clean error. The cmd layer happens to pass an initialized map today, but this is a fragile contract for an exported helper.

🛡️ Proposed guard
```diff
 func validateParams(def *event.KeyDefinition, params map[string]string) error {
+	// Callers may legitimately pass nil when no --param flags were given.
+	// We only mutate on default-fill, so lazily allocate.
 	for _, p := range def.Params {
 		if _, ok := params[p.Name]; !ok && p.Default != "" {
+			if params == nil {
+				// Note: this only fills locally; caller's nil reference stays nil.
+				// If that's undesirable, require *map[string]string instead.
+				params = map[string]string{}
+			}
 			params[p.Name] = p.Default
 		}
 	}
```

Alternatively, document that
`params` must be non-nil and validate that up front with a clear error.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/consume/consume.go` around lines 179 - 202, validateParams currently writes into the params map and will panic if callers pass nil; add an explicit nil guard at the top of validateParams (func validateParams(def *event.KeyDefinition, params map[string]string) error) that returns a clear error like "params cannot be nil" (or similar) so callers get a controlled error instead of a panic; reference validateParams, params and event.KeyDefinition when locating the change.
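As a standalone illustration of the failure mode above (generic Go, not project code): reads from a nil map are safe and just miss, but any assignment panics at runtime, which is exactly what a nil `params` would trigger on the default-fill write.

```go
package main

import "fmt"

func main() {
	var params map[string]string // zero-value map: nil

	// Lookups on a nil map are safe and simply report "missing".
	v, ok := params["mode"]
	fmt.Printf("read: %q %v\n", v, ok) // read: "" false

	// Writes are not: this panics with "assignment to entry in nil map".
	defer func() {
		fmt.Println("recovered:", recover())
	}()
	params["mode"] = "default"
}
```

This is why the proposed guard lazily allocates before the first write — or, simpler, why the helper should document and enforce a non-nil contract up front.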
🧹 Nitpick comments (21)
.gitignore (1)
38-38: Add trailing newline at end of file.

The file is missing a trailing newline, which may cause Git warnings and violates POSIX convention for text files.
📝 Proposed fix
```diff
-app.log
+app.log
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.gitignore at line 38, Add a single trailing newline to the end of the .gitignore file so the file ends with a newline character (e.g., after the "app.log" entry); this satisfies POSIX text file conventions and removes Git warnings about missing end-of-file newline.

internal/event/consume/sink_test.go (1)
108-137: LGTM; minor: also assert ns/seq segments.

Good coverage. Consider tightening
`TestDirSink_FilenameFormat` to also verify that `parts[0]` (ns) and `parts[2]` (seq, after stripping `.json`) are numeric, so a refactor that swaps the segment order or stringifies a non-numeric ns still trips the test.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/consume/sink_test.go` around lines 108 - 137, TestDirSink_FilenameFormat currently only asserts the PID segment; update the test to also validate that parts[0] (ns) and parts[2] (seq) are numeric: after splitting the trimmed filename in TestDirSink_FilenameFormat, check that parts[0] parses as an integer (e.g., via strconv.Atoi) and that parts[2] parses as an integer (ensure you strip ".json" already done), and fail the test with a clear message if either parse returns an error so swaps or non-numeric segments are caught.

internal/event/consume/loop_seq_test.go (1)
16-38: Drift risk: test harness duplicates production logic instead of exercising it.
`seqGapDetector` is a hand-written copy of the inline gap-detection block inside `consumeLoop`'s reader goroutine. The comment acknowledges this ("If this logic changes in loop.go, update here too"), but that means these tests can stay green while the real code regresses (or vice-versa). Prefer extracting the production logic into a small package-private type/function (e.g. `seqGapDetector` in loop.go) and calling it from both the reader goroutine and these tests — then remove the duplicate. This is the exact scenario the regression test at lines 78-87 is trying to prevent, so wiring it to the real code is especially valuable here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/consume/loop_seq_test.go` around lines 16 - 38, The test duplicates the gap-detection logic; extract that logic into a single package-private seqGapDetector type (or observeSeqGap function) in loop.go and have the consumeLoop reader goroutine call that implementation, then update loop_seq_test.go to construct/use that package-private seqGapDetector/observeSeqGap (instead of its local copy) so tests exercise the real production logic (keep the same fields/method names: seqGapDetector, observe, lastSeq, quiet, errOut and ensure consumeLoop uses the new shared implementation).

shortcuts/event/subscribe.go (1)
88-92: LGTM — deprecation path is appropriate.

Hiding the superseded
`event +subscribe` while keeping it executable is a clean migration approach. Consider also adding a `DEPRECATED` line to `Description` (surfaces in any tooling that lists shortcuts directly) and/or a one-line stderr notice in `Execute` pointing users at `event consume`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@shortcuts/event/subscribe.go` around lines 88 - 92, Add a clear deprecation notice to the shortcut so users and tooling see it: update the Description field in subscribe.go (the shortcut for "event +subscribe") to prepend or append a "DEPRECATED: use `event consume`" line, and in the Execute method emit a single-line stderr notice (e.g., via fmt.Fprintln(os.Stderr, ...)) that informs users the command is deprecated and points them to "event consume"; keep the existing Hidden: true behavior and ensure the message references the shortcut name ("event +subscribe") and the replacement ("event consume").

cmd/event/table.go (1)
15-44: Non-ASCII event descriptions will misalign columns.

Event descriptions like
"接收 IM 消息…" ("receive IM messages…") in the registry use non-ASCII characters. The current `tableWidths` and `printTableRow` use byte-length calculations (`len()`), which measure CJK characters as 3 bytes but display as 2 columns. This causes padding to be computed wrong and columns to misalign. Since
`github.com/mattn/go-runewidth` is already a dependency (go.mod line 51), use `runewidth.StringWidth()` for width calculation and adjust `printTableRow` to compute padding manually, since Go's `fmt.Fprintf("%-*s", ...)` always pads by byte count, not display width:

Switch to runewidth
```diff
 import (
 	"fmt"
 	"io"
+	"strings"
+
+	"github.com/mattn/go-runewidth"
 )
@@
-	for i, h := range headers {
-		widths[i] = len(h)
-	}
+	for i, h := range headers {
+		widths[i] = runewidth.StringWidth(h)
+	}
@@
-		if l := len(cell); l > widths[i] {
-			widths[i] = l
-		}
+		if l := runewidth.StringWidth(cell); l > widths[i] {
+			widths[i] = l
+		}
@@
-		fmt.Fprintf(out, "%-*s%s", widths[i], cell, gap)
+		pad := widths[i] - runewidth.StringWidth(cell)
+		if pad < 0 {
+			pad = 0
+		}
+		fmt.Fprint(out, cell, strings.Repeat(" ", pad), gap)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/event/table.go` around lines 15 - 44, tableWidths currently uses len() so CJK/multibyte text miscomputes column widths; replace len(h) and len(cell) with runewidth.StringWidth(...) to compute display widths in tableWidths. In printTableRow stop using fmt.Fprintf with "%-*s" (which pads by bytes) and instead for each non-final cell compute pad := widths[i] - runewidth.StringWidth(cell) then write cell followed by strings.Repeat(" ", max(0,pad)) and gap to out; keep the final-cell behavior (fmt.Fprintln(out, cell)). Add necessary imports (github.com/mattn/go-runewidth and strings) and ensure padding never goes negative.

internal/event/protocol/codec_test.go (2)
62-69: Check the `Encode` error and guard against empty buffer.
`Encode`'s returned error is silently dropped; if encoding ever fails the next line indexes `buf.Bytes()[buf.Len()-1]`, which would panic with an unhelpful message rather than a clear test failure.

Proposed tweak
```diff
 func TestEncodeAddsNewline(t *testing.T) {
 	msg := &Bye{Type: MsgTypeBye}
 	var buf bytes.Buffer
-	Encode(&buf, msg)
-	if buf.Bytes()[buf.Len()-1] != '\n' {
+	if err := Encode(&buf, msg); err != nil {
+		t.Fatalf("encode: %v", err)
+	}
+	if buf.Len() == 0 || buf.Bytes()[buf.Len()-1] != '\n' {
 		t.Error("encoded message should end with newline")
 	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/protocol/codec_test.go` around lines 62 - 69, The test TestEncodeAddsNewline currently discards Encode's error and directly indexes buf, which can panic on encoding failure or empty output; update the test to check the error returned by Encode(msg) and call t.Fatalf/t.Fatalf-like failure if non-nil, then guard the buffer access by asserting buf.Len() > 0 (and failing the test if it's zero) before inspecting buf.Bytes()[buf.Len()-1]; reference Encode, msg/Bye and buf in the changes.
9-34: Hello test uses an out-of-scope `mail.*` event key.

This PR is scoped to IM events per the objectives; using
`mail.user_mailbox.event.message_received_v1` here is harmless for the codec (it's treated as an opaque string) but misleading to readers. Consider aligning with an IM key (e.g. `im.message.receive_v1`) for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/protocol/codec_test.go` around lines 9 - 34, The test TestEncodeDecodeHello uses a mail event key which is out of scope for this PR; update the Hello test instance (in TestEncodeDecodeHello) to use an IM-scoped event key and types (e.g. replace "mail.user_mailbox.event.message_received_v1" in the Hello.EventKey and Hello.EventTypes with an IM key such as "im.message.receive_v1") so the test data aligns with the PR objectives while leaving Encode, Decode and Hello unchanged.

internal/event/schemas/pointer_test.go (1)
20-111: Add coverage for escaped JSON Pointer tokens.

These tests cover traversal shape well, but FieldOverrides are described as JSON Pointer keys; add a regression for
`~1` and `~0` so slash/tilde field names don't silently break.
As per coding guidelines, `**/*.go`: Every behavior change must have an accompanying test.

🧪 Suggested additional test
```diff
+func TestResolvePointer_EscapedTokens(t *testing.T) {
+	schema := parseSchema(t, `{
+		"type":"object",
+		"properties":{
+			"a/b":{"type":"string"},
+			"tilde~key":{"type":"number"}
+		}
+	}`)
+
+	if nodes := ResolvePointer(schema, "/a~1b"); len(nodes) != 1 || nodes[0]["type"] != "string" {
+		t.Fatalf("want escaped slash property, got %v", nodes)
+	}
+	if nodes := ResolvePointer(schema, "/tilde~0key"); len(nodes) != 1 || nodes[0]["type"] != "number" {
+		t.Fatalf("want escaped tilde property, got %v", nodes)
+	}
+}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/schemas/pointer_test.go` around lines 20 - 111, Add a unit test to pointer_test.go to cover JSON Pointer escaped tokens (~1 for '/' and ~0 for '~') so ResolvePointer correctly decodes and resolves field names containing slashes and tildes; locate tests around TestResolvePointer_* and add a new test (e.g., TestResolvePointer_EscapedTokens) that builds a schema with property names containing "/" and "~" (encoded in the pointer as "~1" and "~0"), calls ResolvePointer with the escaped pointer, and asserts it returns the expected node(s) (checking length and type/value) to prevent regressions in pointer decoding.

cmd/event/bus.go (1)
35-35: Return structured `RunE` errors for agent parsing.

The hidden bus is part of the subprocess contract, so config, logger, and runtime failures should be wrapped with
`output.Errorf` or `output.ErrWithHint` and include stable context/action hints instead of returning raw errors. As per coding guidelines,
As per coding guidelines,cmd/**/*.go: Useoutput.Errorforoutput.ErrWithHintinRunEfunctions instead of barefmt.Errorfto ensure structured errors that AI agents can parse.Also applies to: 42-42, 62-62
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/event/bus.go` at line 35, Replace bare error returns in the RunE handlers for the hidden bus with structured output errors: instead of returning err (and similar returns at the other occurrences), wrap the error using output.Errorf or output.ErrWithHint and include a stable context/action hint (e.g., "starting hidden bus", "loading bus config", "initializing runtime") along with the original err to preserve details; update the return sites that currently do "return err" (including the occurrences around lines shown) to return output.Errorf("hidden bus: %s: %v", "<action hint>", err) or use output.ErrWithHint(err, "<action hint>", "<suggested fix>") so AI agents can parse structured errors.

cmd/event/consume_test.go (1)
112-120: Add traversal bypass cases for output-dir sanitization.

The unsafe-path coverage should include bare and nested
`..` forms so a prefix-only rejection can't regress path safety. As per coding guidelines, "Every behavior change must have an accompanying test".

🧪 Proposed additional cases
```diff
 	{
 		name:       "parent escape rejected",
 		in:         "../outside",
 		wantSentry: errOutputDirUnsafe,
 	},
+	{
+		name:       "bare parent rejected",
+		in:         "..",
+		wantSentry: errOutputDirUnsafe,
+	},
+	{
+		name:       "nested parent escape rejected",
+		in:         "events/../../outside",
+		wantSentry: errOutputDirUnsafe,
+	},
 	{
 		name:       "absolute path rejected",
 		in:         "/tmp/events",
 		wantSentry: errOutputDirUnsafe,
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/event/consume_test.go` around lines 112 - 120, Add two table-driven test cases to the existing slice in cmd/event/consume_test.go (the same table that contains cases with fields name, in, wantSentry) to cover traversal bypass: one with name "bare parent rejected" and in set to "..", and one with name "nested parent rejected" and in set to "sub/dir/../.." (or similar nested ../ form); both should expect wantSentry to be errOutputDirUnsafe so the output-dir sanitization rejects bare and nested parent traversals and prevents prefix-only matches.

cmd/event/runtime.go (1)
34-43: Empty `Content-Type` on 4xx/5xx falls through to JSON parsing.

When a gateway returns a non-JSON error response with no
`Content-Type` header (not uncommon for minimal reverse-proxy errors), the `ct != ""` guard sends it down `ParseJSONResponse`, producing a less actionable "decode" error instead of the nice `api METHOD PATH returned N: <body>` message. Consider treating empty Content-Type on HTTP error the same as non-JSON. Also,
`body` locally shadows the `body interface{}` parameter — a rename (e.g., `snippet`) would improve readability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/event/runtime.go` around lines 34 - 43, The error branch should treat empty Content-Type as non-JSON and avoid shadowing the function's body parameter: change the guard to if resp.StatusCode >= 400 && !client.IsJSONContentType(ct) (remove the ct != "" check) so responses with no Content-Type hit the friendly error path, and rename the local variable currently declared as body (which shadows the body interface{} parameter) to snippet (or similar) when creating the preview from resp.RawBody before returning fmt.Errorf("api %s %s returned %d: %s", method, path, resp.StatusCode, snippet).

internal/event/consume/remote_preflight.go (1)
43-61: Control-char handling doesn't match the comment.

The comment says "Collapse control chars to spaces" but only
`\n`, `\r`, `\t` are replaced — other C0 control bytes (0x00-0x1F, 0x7F) still pass through, and so would a stray `\r\n` ambiguity in log forging. Either broaden the check or soften the comment.

Proposed tightening
```diff
-	for _, r := range s {
-		if r == '\n' || r == '\r' || r == '\t' {
-			out = append(out, ' ')
-			continue
-		}
-		out = append(out, string(r)...)
-	}
+	for _, r := range s {
+		if r < 0x20 || r == 0x7f {
+			out = append(out, ' ')
+			continue
+		}
+		out = utf8.AppendRune(out, r)
+	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/consume/remote_preflight.go` around lines 43 - 61, truncateForError currently only collapses '\n', '\r', '\t' but lets other C0 control bytes through; change the control-char handling in truncateForError to map all control characters to a single space (e.g. use unicode.IsControl(r) or check r < 0x20 || r == 0x7F) instead of only those three, and when emitting non-control runes append their UTF-8 bytes correctly (avoid append(out, string(r)...) inefficiency) so the output cannot contain stray control bytes that could forge logs.

cmd/event/suggestions.go (1)
32-48: Use a stable sort so suggestions are deterministic across runs.

Every substring match gets
`dist == 0`, so when more than `maxSuggestions` keys substring-match the input (e.g. `im.message` against the full `im.message.*` set), `sort.Slice` may return a different 3 out of N on each invocation. That makes the `"did you mean ..."` output flap and, in principle, could produce flaky behavior in tests that do stricter ordering checks than the ones in this PR.

♻️ Proposed fix
```diff
-	sort.Slice(hits, func(i, j int) bool { return hits[i].dist < hits[j].dist })
+	sort.SliceStable(hits, func(i, j int) bool { return hits[i].dist < hits[j].dist })
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/event/suggestions.go` around lines 32 - 48, Sort currently uses sort.Slice causing nondeterministic order when distances tie (many substring matches with dist==0); change to a stable, deterministic sort by replacing the comparator with a stable ordering on (dist, key) — e.g. use sort.SliceStable (or sort.Slice with a comparator that first compares hits[i].dist vs hits[j].dist and, if equal, compares hits[i].key vs hits[j].key) for the hits []match slice so selecting the top n (maxSuggestions) is deterministic; update the sort call in suggestions.go around the hits sort to use that stable tie-breaker.

internal/event/dedup.go (1)
42-83: Dual-layer dedup algorithm is correct; one minor efficiency quirk worth a one-liner comment.

Traced the TTL-map + ring-buffer logic against the tests in
`dedup_test.go`; the invariants hold. One minor observation that doesn't affect correctness: when `seen[eventID]` exists but is TTL-expired, you delete it and fall through to the re-record path without scanning the ring. At line 71, if `d.ring[d.pos]` happens to equal `eventID` (same event happens to sit at the current rotating slot), line 72's `delete(d.seen, old)` will undo the `seen[eventID] = now` write from line 68. Subsequent `IsDuplicate(eventID)` still returns `true` via the ring-scan fallback, so it's not a correctness bug — just an edge case where the next lookup pays the O(ring) fallback cost instead of the O(1) map hit. Likelihood is ~1/ringSize per expired re-insert, so with default ringSize=10000 this is negligible. Safe to defer, but a short `// Note:` comment here would save a future reader the trace.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/dedup.go` around lines 42 - 83, In DedupFilter.IsDuplicate, add a one-line comment near the eviction block (the code that does "if old := d.ring[d.pos]; old != "" { delete(d.seen, old) }") explaining the edge-case where an event whose TTL just expired can be re-recorded into seen but then immediately evicted if d.ring[d.pos] == eventID, causing the next lookup to hit the ring fallback instead of the O(1) map lookup; reference the DedupFilter.IsDuplicate method and the seen, ring and pos fields so future readers understand this benign efficiency quirk.

internal/event/integration_test.go (1)
253-263: Silently dropping `protocol.Encode`/`Decode` errors hides root cause when a test fails.

In
`connectConsumer` (and similarly at Lines 352/374) `protocol.Encode(conn, hello)` and `protocol.Decode(sc.Bytes())` return values are discarded. If the Hello write fails or the HelloAck is malformed, the test will fail later at `sc.Scan()` / the type assertion with a symptom (no hello_ack / expected HelloAck) that points away from the actual cause. A quick `if err != nil { t.Fatalf(...) }` here will make future regressions diagnose themselves. Same pattern applies to Line 352 in `TestIntegration_DedupFilter`.

🧰 Proposed fix
```diff
-	protocol.Encode(conn, hello)
+	if err := protocol.Encode(conn, hello); err != nil {
+		t.Fatalf("%s: encode hello: %v", name, err)
+	}
 	sc := bufio.NewScanner(conn)
 	conn.SetReadDeadline(time.Now().Add(3 * time.Second))
 	if !sc.Scan() {
 		t.Fatalf("%s: no hello_ack", name)
 	}
-	msg, _ := protocol.Decode(sc.Bytes())
+	msg, err := protocol.Decode(sc.Bytes())
+	if err != nil {
+		t.Fatalf("%s: decode hello_ack: %v", name, err)
+	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/integration_test.go` around lines 253 - 263, In connectConsumer, capture and handle errors returned by protocol.Encode(conn, hello) and protocol.Decode(sc.Bytes()) instead of discarding them: check each error and call t.Fatalf with a descriptive message and the error (e.g., "protocol.Encode hello failed: %v" / "protocol.Decode hello_ack failed: %v"); apply the same pattern in the other test locations mentioned (TestIntegration_DedupFilter) so any write/parse failures are reported immediately rather than surfacing later as "no hello_ack" or wrong type errors.

internal/appmeta/app_version.go (1)
52-60: Defensive: `url.PathEscape(appID)` before interpolation.

Feishu AppIDs are tightly constrained (
`cli_[a-z0-9]+`), so this isn't an active bug, but the path is built by string formatting without escaping — a future caller who feeds this from less-trusted input (copy-pasted values, legacy configs) could silently produce a malformed URL or unintended query-string injection (`appID` containing `?` or `/`). `url.PathEscape` is cheap insurance and keeps the failure mode as "404 from the API" rather than "unexpected query reshape".

🧰 Suggested tweak
```diff
-	path := fmt.Sprintf(
-		"/open-apis/application/v6/applications/%s/app_versions?lang=zh_cn&page_size=2",
-		appID,
-	)
+	path := fmt.Sprintf(
+		"/open-apis/application/v6/applications/%s/app_versions?lang=zh_cn&page_size=2",
+		url.PathEscape(appID),
+	)
```

(Requires adding
`"net/url"` to the import block.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/appmeta/app_version.go` around lines 52 - 60, The path is built by string formatting in FetchCurrentPublished without escaping appID, which can allow malformed URLs or query injection; update FetchCurrentPublished to call url.PathEscape(appID) when interpolating appID into the path (and add "net/url" to the imports) so the constructed path uses the escaped value and preserves safe behavior for untrusted or malformed appID inputs.

cmd/event/schema.go (1)
175-178: Use `output.Errorf` in `RunE` for structured AI-parsable errors.
`runSchema` is invoked from a Cobra `RunE`, so a bare `fmt.Errorf` here bypasses the structured error envelope AI agents rely on. Path is only reachable on an internal marshal/unmarshal failure of schemas we just produced, so it's very unlikely to fire — still, easy to align with the rest of the command.

♻️ Proposed tweak
```diff
-	resolved, _, err := resolveSchemaJSON(def)
-	if err != nil {
-		return fmt.Errorf("resolve schema: %w", err)
-	}
+	resolved, _, err := resolveSchemaJSON(def)
+	if err != nil {
+		return output.Errorf(output.ExitInternal, "internal", "resolve schema: %s", err)
+	}
```

As per coding guidelines: "Use
`output.Errorf` or `output.ErrWithHint` in `RunE` functions instead of bare `fmt.Errorf` to ensure structured errors that AI agents can parse".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/event/schema.go` around lines 175 - 178, Replace the bare fmt.Errorf returned from resolveSchemaJSON in runSchema with the structured output.Errorf (or output.ErrWithHint if you can provide a hint) so the Cobra RunE returns an AI-parsable error envelope; specifically, in the runSchema function where you call resolved, _, err := resolveSchemaJSON(def) check err and call output.Errorf("resolve schema: %v", err) (or output.ErrWithHint(..., hint)) instead of return fmt.Errorf(...) to align with RunE error handling conventions.

internal/event/schemas/fromtype_test.go (1)
246-257: Nit: `%q` applied to a pointer-to-struct value.

At lines 247 and 256 the
`t.Errorf` format string uses `%q` but passes `recipients.Items`/`states.Items`, which are `*struct{...}`. `%q` expects a string-like, so on failure the test output will look like `%!q(*struct...=0x...)` rather than a useful value. Consider formatting the fields explicitly.

♻️ Proposed tweak
```diff
 	if recipients.Items == nil || recipients.Items.Format != "email" {
-		t.Errorf("recipients.items.format = %q, want email", recipients.Items)
+		t.Errorf("recipients.items = %+v, want Items.Format=email", recipients.Items)
 	}
@@
 	if states.Items == nil || len(states.Items.Enum) != 3 {
-		t.Errorf("states.items.enum = %v, want 3 values", states.Items)
+		t.Errorf("states.items = %+v, want 3 enum values", states.Items)
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/schemas/fromtype_test.go` around lines 246 - 257, The test is using %q with pointer values (recipients.Items and states.Items) which prints ugly pointer details; change the t.Errorf calls to format the actual fields instead — for the recipients check use recipients.Items.Format (e.g. %q, recipients.Items.Format) and for the states check log the items' enum length or values (e.g. len(states.Items.Enum) or %#v, states.Items.Enum) so the failure message shows meaningful data rather than a pointer.

internal/event/types.go (1)
83-128: Use the `ProcessFunc` alias inside `KeyDefinition` to avoid drift.
`ProcessFunc` is declared at Line 84, but `KeyDefinition.Process` at Line 125 re-spells the same signature inline. If the hook signature ever evolves (e.g., adding an error-wrapping parameter), the two definitions must change in lockstep and the compiler won't help you notice. Reusing the alias removes the duplication and makes `ProcessFunc` the single source of truth.

♻️ Proposed tweak
```diff
 	// Process is the business logic function. Required when Schema.Custom
 	// carries Processed output; must be nil when Schema.Native is used.
-	Process func(ctx context.Context, rt APIClient, raw *RawEvent, params map[string]string) (json.RawMessage, error) `json:"-"`
+	Process ProcessFunc `json:"-"`
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/types.go` around lines 83 - 128, The KeyDefinition struct duplicates the ProcessFunc signature inline; update KeyDefinition.Process to use the ProcessFunc alias instead of re-declaring the func type so the signature is centralized — change the field type of KeyDefinition.Process from the inline func(ctx context.Context, rt APIClient, raw *RawEvent, params map[string]string) (json.RawMessage, error) to ProcessFunc (keeping the `json:"-"` tag intact) so future signature changes only need to be made on ProcessFunc.

internal/event/consume/startup.go (1)
92-98: Retry loop ignores context cancellation.
`EnsureBus` accepts `ctx` but the post-fork dial retry at Lines 92–98 only checks wall-clock deadline. If the caller cancels `ctx` (Ctrl-C, parent deadline, timeout wrapper in `Run`), this loop keeps sleeping/dialing for up to `dialTimeout`. Returning on `ctx.Err()` here makes the shutdown path responsive and matches how the rest of the consume pipeline treats ctx as authoritative.

♻️ Proposed change
```diff
 	deadline := time.Now().Add(dialTimeout)
 	for time.Now().Before(deadline) {
-		time.Sleep(dialRetryInterval)
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-time.After(dialRetryInterval):
+		}
 		if conn, err := tr.Dial(addr); err == nil {
 			return conn, nil
 		}
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/event/consume/startup.go` around lines 92 - 98, The retry loop in EnsureBus ignores the supplied ctx and only checks a wall-clock deadline; update the loop around tr.Dial(addr) to observe context cancellation: replace the time.Sleep+dialRetryInterval with a select that waits on either ctx.Done() or time.After(dialRetryInterval), and if ctx.Err() != nil return ctx.Err(); keep the existing deadline logic (dialTimeout) intact and still return conn,nil on successful tr.Dial(addr). Ensure the same ctx cancellation check is applied before each dial attempt so the function returns promptly when ctx is canceled.

internal/event/testutil/testutil.go (1)
117-128: `ErrStubUnconfigured` is defined but never returned.

`CallAPI` falls back to `json.RawMessage("{}")` whenever both `Body` and `Err` are unset, so the exported `ErrStubUnconfigured` sentinel (documented as "returned as a sentinel so tests can assert via errors.Is") is never actually produced by this stub. Either wire it into the `Body == "" && Err == nil` path, or drop the sentinel so tests don't rely on dead API surface.

♻️ Suggested wiring

```diff
 if s.Err != nil {
 	return nil, s.Err
 }
 if s.Body == "" {
-	return json.RawMessage("{}"), nil
+	return nil, ErrStubUnconfigured
 }
 return json.RawMessage(s.Body), nil
```

(Or keep the `{}` default and remove `ErrStubUnconfigured`.)

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@internal/event/testutil/testutil.go` around lines 117-128, the exported sentinel ErrStubUnconfigured is never returned by CallAPI because the function currently returns json.RawMessage("{}") when s.Body == "" and s.Err == nil; update CallAPI (in the StubAPIClient code) to return nil, ErrStubUnconfigured when s.Body == "" && s.Err == nil (or alternatively remove ErrStubUnconfigured and its documentation/tests if you prefer the {} default), and adjust any tests that expect the sentinel so the symbol is either actually produced or removed from the API surface.
```go
	return err
}

eventsDir := filepath.Join(core.GetConfigDir(), "events", cfg.AppID)
```
Sanitize cfg.AppID before using it as a path component.
cfg.AppID is joined into an app-managed filesystem path before logger setup. If the config is corrupted or manually edited to include separators like ../, the bus log directory can escape {configDir}/events; reuse a shared AppID/path-component sanitizer before filepath.Join.
Based on learnings, app-managed paths may skip a redundant SafeInputPath check only when the root directory is validated and filename/path components are sanitized.
🛡️ Proposed shape

```diff
-eventsDir := filepath.Join(core.GetConfigDir(), "events", cfg.AppID)
+safeAppID := sanitizeEventPathComponent(cfg.AppID)
+eventsDir := filepath.Join(core.GetConfigDir(), "events", safeAppID)
```

Consider sharing the same sanitization policy used for lockfile AppID-derived names so bus logs and locks cannot diverge on edge-case IDs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/event/bus.go` at line 38, Sanitize cfg.AppID before using it in
filepath.Join to prevent path traversal: call the shared AppID/path-component
sanitizer used for lockfile names (e.g., the existing SafeInputPath or the
project’s AppID sanitizer) on cfg.AppID, then use the sanitized value in
filepath.Join(core.GetConfigDir(), "events", sanitizedAppID) so bus log
directories cannot escape the config/events root; update the code that
constructs eventsDir (referencing cfg.AppID, filepath.Join, core.GetConfigDir())
to use the sanitizedAppID.
```go
func discoverAppIDs() []string {
	eventsDir := filepath.Join(core.GetConfigDir(), "events")
	entries, err := os.ReadDir(eventsDir)
	if err != nil {
		return nil
	}
	var ids []string
	for _, e := range entries {
		if !e.IsDir() {
			continue
		}
		sockPath := filepath.Join(eventsDir, e.Name(), "bus.sock")
		if _, statErr := os.Stat(sockPath); statErr != nil {
			continue
		}
		ids = append(ids, e.Name())
	}
	return ids
}
```
🛠️ Refactor suggestion | 🟠 Major
Use vfs.* for filesystem access in discoverAppIDs.
os.ReadDir and os.Stat bypass the project's mockable filesystem abstraction, making --all behavior (which appIDs get discovered) unmockable in unit tests. Switch to vfs.ReadDir / vfs.Stat.
♻️ Proposed fix

```diff
 eventsDir := filepath.Join(core.GetConfigDir(), "events")
-entries, err := os.ReadDir(eventsDir)
+entries, err := vfs.ReadDir(eventsDir)
 if err != nil {
 	return nil
 }
 var ids []string
 for _, e := range entries {
 	if !e.IsDir() {
 		continue
 	}
 	sockPath := filepath.Join(eventsDir, e.Name(), "bus.sock")
-	if _, statErr := os.Stat(sockPath); statErr != nil {
+	if _, statErr := vfs.Stat(sockPath); statErr != nil {
 		continue
 	}
 	ids = append(ids, e.Name())
 }
 return ids
```

Add "github.com/larksuite/cli/internal/vfs" to the imports.

As per coding guidelines: "Use vfs.* functions instead of os.* for all filesystem access to enable test mocking".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/event/stop.go` around lines 256 - 274, discoverAppIDs currently uses
os.ReadDir and os.Stat which bypass the mockable filesystem; replace those calls
with vfs.ReadDir and vfs.Stat and add the import
"github.com/larksuite/cli/internal/vfs" so the function uses the project's vfs
abstraction (keep the same logic and return values, only swap ReadDir/Stat to
vfs.ReadDir/vfs.Stat in the discoverAppIDs function).
```go
out := []event.KeyDefinition{
	{
		Key:         "im.message.receive_v1",
		DisplayName: "Receive message",
		Description: "Receive IM messages (all types: text/post/image/file/audio/media/sticker/interactive/share_chat/share_user/system)",
		EventType:   "im.message.receive_v1",
		Schema: event.SchemaDef{
			Custom: &event.SchemaSpec{Type: reflect.TypeOf(ImMessageReceiveOutput{})},
		},
		Process:               processImMessageReceive,
		AuthTypes:             []string{"bot"},
		RequiredConsoleEvents: []string{"im.message.receive_v1"},
	},
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Verify how key scopes are consumed and whether im.message.receive_v1 remains scope-less.
# Expected: preflight/status logic should depend on KeyDefinition.Scopes, and this key should have required scopes.
rg -n -C4 'im\.message\.receive_v1|RequiredConsoleEvents|\.Scopes\b|TenantScopes' events cmd internal
```

Repository: larksuite/cli
Length of output: 48893
Add required scopes to im.message.receive_v1 key definition.
The processed key omits Scopes, which causes scope preflight validation to be skipped (see cmd/event/consume.go line 273: the check returns early when Scopes is empty). Wire the required scopes to match the pattern used by native IM keys on line 45, so that scope checks are enforced uniformly across all IM event registrations.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@events/im/register.go` around lines 20 - 32, The KeyDefinition for
"im.message.receive_v1" is missing the Scopes field so scope preflight is
skipped; update the KeyDefinition in events/im/register.go (the struct literal
that sets Key: "im.message.receive_v1" and Process: processImMessageReceive) to
include the same Scopes slice used by the native IM keys (mirror the pattern
used by the other IM KeyDefinition entries), e.g. add a Scopes field with the
identical scope strings those native IM keys use so scope validation runs for
this event.
```go
ack := protocol.NewHelloAck("v1", firstForKey)
_ = protocol.EncodeWithDeadline(conn, ack, protocol.WriteTimeout)

// Quote untrusted fields — EventKey / EventTypes come straight off
// the wire from an unprivileged local process, and a value with
// embedded newlines would forge bus.log entries.
b.logger.Printf("Consumer connected: pid=%d key=%q event_types=%q first=%v",
	hello.PID, hello.EventKey, hello.EventTypes, firstForKey)

bc.Start()
```
Handle failed HelloAck writes before starting the consumer.
Line 340 ignores the handshake write error after the connection has already been registered. If the client disconnected or the write times out, the hub keeps a consumer that never received the ack, which can skew first/last bookkeeping and keep the bus non-idle until later cleanup.
Proposed fix

```diff
 ack := protocol.NewHelloAck("v1", firstForKey)
-_ = protocol.EncodeWithDeadline(conn, ack, protocol.WriteTimeout)
+if err := protocol.EncodeWithDeadline(conn, ack, protocol.WriteTimeout); err != nil {
+	b.logger.Printf("Consumer handshake failed: pid=%d key=%q err=%v", hello.PID, hello.EventKey, err)
+	bc.Close()
+	return
+}
 // Quote untrusted fields — EventKey / EventTypes come straight off
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/event/bus/bus.go` around lines 339 - 348, The handshake ack write
(protocol.EncodeWithDeadline(conn, ack, protocol.WriteTimeout)) is ignored;
check its returned error and if non-nil abort registration: log the failure
(include error and context from hello.EventKey/hello.PID), close the connection,
avoid calling bc.Start() and remove/unregister the consumer so it isn't left
tracked; ensure any resources allocated for this consumer are cleaned up before
returning. Use the existing symbols protocol.NewHelloAck,
protocol.EncodeWithDeadline, conn, hello, b.logger.Printf, and bc.Start to
locate and implement this early-return-on-error behavior.
```go
func (c *Conn) SenderLoop() {
	for {
		select {
		case <-c.closed:
			return
		case msg := <-c.sendCh:
			c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
			if err := protocol.Encode(c.conn, msg); err != nil {
				if c.logger != nil {
					c.logger.Printf("WARN: write to pid=%d failed: %v", c.pid, err)
				}
				c.shutdown()
				return
			}
		}
	}
}

// ReaderLoop monitors the connection for EOF/close and processes any
// control messages (Bye, PreShutdownCheck) the consume client sends.
// Uses the bufio.Reader handed in from handleConn so buffered bytes
// carried over from the Hello read aren't dropped.
func (c *Conn) ReaderLoop() {
	for {
		line, err := protocol.ReadFrame(c.reader)
		if err != nil {
			break
		}
		line = bytes.TrimRight(line, "\n")
		if len(line) == 0 {
			continue
		}
		msg, err := protocol.Decode(line)
		if err != nil {
			continue
		}
		c.handleControlMessage(msg)
	}
	c.shutdown()
}

func (c *Conn) handleControlMessage(msg interface{}) {
	switch m := msg.(type) {
	case *protocol.Bye:
		c.shutdown()
	case *protocol.PreShutdownCheck:
		// Query the Hub (via callback) and reply with whether this is the
		// last consumer for the given EventKey. The consume client uses this
		// to decide whether to run cleanup (e.g. unsubscribe mailbox).
		lastForKey := true
		if c.checkLastForKey != nil {
			lastForKey = c.checkLastForKey(m.EventKey)
		}
		ack := protocol.NewPreShutdownAck(lastForKey)
		c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
		protocol.Encode(c.conn, ack)
	}
}
```
Concurrent writes to c.conn from two goroutines can corrupt framing.
SenderLoop writes events via protocol.Encode(c.conn, msg) from one goroutine (Lines 120-121), while handleControlMessage writes a PreShutdownAck via protocol.Encode(c.conn, ack) from the ReaderLoop goroutine (Lines 168-169). net.Conn.Write is safe to call concurrently in the sense that it won't race the runtime, but the byte streams can interleave arbitrarily — and the protocol here is newline-delimited JSON, so any interleave produces a malformed frame that the consumer will fail to decode (or worse, decode into the wrong type). The paired SetWriteDeadline calls also clobber each other because the deadline is per-conn, not per-goroutine.
The simplest fixes:

- Route the ack through `sendCh` so all writes serialise through `SenderLoop`. `PushDropOldest` is already the supported publish path.
- Or, add a `writeMu sync.Mutex` held across `SetWriteDeadline` + `Encode` in both sites.

Option 1 is preferred since it keeps the single-writer invariant the comment at Line 177 implicitly relies on.
♻️ Sketch of option 1

```diff
 case *protocol.PreShutdownCheck:
 	lastForKey := true
 	if c.checkLastForKey != nil {
 		lastForKey = c.checkLastForKey(m.EventKey)
 	}
 	ack := protocol.NewPreShutdownAck(lastForKey)
-	c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
-	protocol.Encode(c.conn, ack)
+	// Route through sendCh so SenderLoop remains the sole writer.
+	select {
+	case c.sendCh <- ack:
+	case <-c.closed:
+	}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/event/bus/conn.go` around lines 114 - 171,
ReaderLoop/handleControlMessage currently write directly to c.conn (calling
SetWriteDeadline + protocol.Encode) while SenderLoop also writes, which can
interleave frames; instead build the PreShutdownAck in handleControlMessage and
send it into c.sendCh so all writes go through SenderLoop. Concretely, remove
the direct c.conn.SetWriteDeadline/protocol.Encode call in handleControlMessage,
create the ack via protocol.NewPreShutdownAck(lastForKey) and enqueue it with
the same push helper used for normal publishes (e.g. PushDropOldest or the
existing send enqueue path) so SenderLoop performs the single
SetWriteDeadline+Encode sequence; ensure the enqueue is non-blocking or uses the
existing drop-oldest semantics to avoid deadlocks.
```go
func (h *Hub) UnregisterAndIsLast(s Subscriber) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.subscribers, s)
	if h.keyCounts[s.EventKey()] > 0 {
		h.keyCounts[s.EventKey()]--
	}
	isLast := h.keyCounts[s.EventKey()] == 0
	if isLast {
		delete(h.keyCounts, s.EventKey())
	}
	return isLast
}

// AcquireCleanupLock is the race-free replacement for "check last then
// cleanup". Returns true iff this subscriber is still the only one
// registered for eventKey AND no other cleanup is already in progress. On
// true return, caller MUST eventually call ReleaseCleanupLock to unblock
// any waiting RegisterAndIsFirst. Both checks (count <= 1 and
// already-locked) run under the same write lock so they are atomic —
// preventing a late-arriving Hello from slipping in between the check and
// the reservation.
func (h *Hub) AcquireCleanupLock(eventKey string) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.keyCounts[eventKey] > 1 {
		return false
	}
	if _, alreadyLocked := h.cleanupInProgress[eventKey]; alreadyLocked {
		return false
	}
	h.cleanupInProgress[eventKey] = make(chan struct{})
	return true
```
Make lifecycle bookkeeping exact for absent subscribers.
UnregisterAndIsLast decrements keyCounts even when s was not registered, and AcquireCleanupLock can acquire cleanup rights when the count is 0. A duplicate/stale unregister or cleanup check can therefore corrupt first/last decisions for an active key.
Proposed fix

```diff
 func (h *Hub) UnregisterAndIsLast(s Subscriber) bool {
 	h.mu.Lock()
 	defer h.mu.Unlock()
-	delete(h.subscribers, s)
-	if h.keyCounts[s.EventKey()] > 0 {
-		h.keyCounts[s.EventKey()]--
+	eventKey := s.EventKey()
+	if _, ok := h.subscribers[s]; !ok {
+		return h.keyCounts[eventKey] == 0
 	}
-	isLast := h.keyCounts[s.EventKey()] == 0
+	delete(h.subscribers, s)
+	if h.keyCounts[eventKey] > 0 {
+		h.keyCounts[eventKey]--
+	}
+	isLast := h.keyCounts[eventKey] == 0
 	if isLast {
-		delete(h.keyCounts, s.EventKey())
+		delete(h.keyCounts, eventKey)
 	}
 	return isLast
 }
@@
 func (h *Hub) AcquireCleanupLock(eventKey string) bool {
 	h.mu.Lock()
 	defer h.mu.Unlock()
-	if h.keyCounts[eventKey] > 1 {
+	if h.keyCounts[eventKey] != 1 {
 		return false
 	}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/event/bus/hub.go` around lines 78 - 110, UnregisterAndIsLast should
only decrement keyCounts when the subscriber was actually registered: in
UnregisterAndIsLast check existence with "_, existed := h.subscribers[s]" before
deleting and only decrement h.keyCounts[s.EventKey()] if existed; if not existed
return false (not last). Also tighten AcquireCleanupLock to only grant cleanup
when keyCounts[eventKey] == 1 (not <=1): change the current >1 check to require
equality ==1 so a zero count cannot acquire the lock; keep the existing
cleanupInProgress check and channel creation. These changes reference
UnregisterAndIsLast, AcquireCleanupLock, h.subscribers, h.keyCounts, and
h.cleanupInProgress.
```go
func newPlatformScanner() Scanner {
	return &unixScanner{
		runPS: func() ([]byte, error) {
			return exec.Command("ps", "-eo", "pid,lstart,command").Output()
		},
	}
}
```
Force C locale on ps to avoid silent failure on localized systems.
ps -o lstart emits weekday/month using the caller's LC_TIME/LANG. On any non-English-locale host (e.g. zh_CN, de_DE, fr_FR), lstart looks like "周日 4月 19 03:03:40 2026" and fails to parse against the English "Mon Jan _2 15:04:05 2006" layout. Since parseOneUnixPSLine silently drops unparseable lines, ScanBusProcesses returns an empty list → orphan detection is broken for those users, with no diagnostic.
🛠️ Proposed fix

```diff
 func newPlatformScanner() Scanner {
 	return &unixScanner{
 		runPS: func() ([]byte, error) {
-			return exec.Command("ps", "-eo", "pid,lstart,command").Output()
+			cmd := exec.Command("ps", "-eo", "pid,lstart,command")
+			// Force C locale so lstart is emitted in English (Mon Jan _2 ...).
+			// Without this, localized hosts produce unparseable timestamps and
+			// every candidate line is silently dropped.
+			cmd.Env = append(os.Environ(), "LC_ALL=C", "LANG=C")
+			return cmd.Output()
 		},
 	}
 }
```

Requires adding "os" to the import block.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/event/busdiscover/busdiscover_unix.go` around lines 22 - 28,
newPlatformScanner currently runs "ps -eo pid,lstart,command" which emits
localized lstart strings that parseOneUnixPSLine expects in English; fix by
forcing the C locale for the ps invocation in
newPlatformScanner/unixScanner.runPS so output is English (e.g. set LC_ALL=C in
the command's environment via os.Environ()) and add the "os" import; ensure
runPS uses that modified env so ScanBusProcesses and parseOneUnixPSLine reliably
parse on non-English hosts.
```go
br := bufio.NewReader(conn)
line, err := protocol.ReadFrame(br)
if err != nil {
	return nil, nil, fmt.Errorf("no hello_ack received: %w", err)
}
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Verify whether protocol.ReadFrame or surrounding handshake code already sets read deadlines.
# Expected: either doHello sets a read deadline, or ReadFrame is wrapped by code that does.
rg -n -C4 'func ReadFrame|ReadFrame\s*\(|SetReadDeadline|SetDeadline' internal/event
```

Repository: larksuite/cli
Length of output: 11058
Set a read deadline for the hello_ack handshake.
The consumer-side handshake read has no deadline. If the bus accepts the connection and never sends hello_ack, the consumer hangs indefinitely, breaking bounded startup. The bus-side handshake in internal/event/bus/bus.go sets a 5-second read deadline before ReadFrame and clears it after—apply the same pattern here.
Proposed fix

```diff
 import (
 	"bufio"
 	"bytes"
 	"fmt"
 	"net"
 	"os"
+	"time"

 	"github.com/larksuite/cli/internal/event/protocol"
 )

+const helloAckTimeout = 5 * time.Second
+
 	br := bufio.NewReader(conn)
+	if err := conn.SetReadDeadline(time.Now().Add(helloAckTimeout)); err != nil {
+		return nil, nil, fmt.Errorf("set hello_ack read deadline: %w", err)
+	}
 	line, err := protocol.ReadFrame(br)
+	_ = conn.SetReadDeadline(time.Time{})
 	if err != nil {
 		return nil, nil, fmt.Errorf("no hello_ack received: %w", err)
 	}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/event/consume/handshake.go` around lines 26 - 30, The handshake
reader in internal/event/consume/handshake.go currently calls
protocol.ReadFrame(br) without a deadline; set a read deadline on conn (e.g.
conn.SetReadDeadline(time.Now().Add(5*time.Second))) immediately before calling
protocol.ReadFrame(br) and clear it afterward
(conn.SetReadDeadline(time.Time{})) to mirror the bus-side handshake behavior;
ensure you import time if missing and retain the existing error handling around
the ReadFrame call (variables: conn, br, line, err).
```go
func TestReadyMarker_SuppressedWhenQuiet(t *testing.T) {
	var buf bytes.Buffer
	writeReadyMarker(&buf, Options{EventKey: "im.message.receive_v1", Quiet: true})

	if buf.Len() != 0 {
		t.Errorf("Quiet=true must suppress ready marker; got %q", buf.String())
	}
```
Don’t suppress the AI readiness marker in quiet mode.
The PR defines `[event] ready event_key=<key>` as the subprocess readiness contract, emitted before any stdout. Quiet can suppress extra progress output, but hiding this marker leaves `event consume --quiet` callers unable to detect readiness.
🛠️ Proposed test update

```diff
-func TestReadyMarker_SuppressedWhenQuiet(t *testing.T) {
+func TestReadyMarker_StillEmittedWhenQuiet(t *testing.T) {
 	var buf bytes.Buffer
 	writeReadyMarker(&buf, Options{EventKey: "im.message.receive_v1", Quiet: true})
-	if buf.Len() != 0 {
-		t.Errorf("Quiet=true must suppress ready marker; got %q", buf.String())
+	want := "[event] ready event_key=im.message.receive_v1\n"
+	if got := buf.String(); got != want {
+		t.Errorf("got %q, want %q", got, want)
 	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/event/consume/listening_text_test.go` around lines 77 - 83, The test
TestReadyMarker_SuppressedWhenQuiet is wrong: the ready marker must still be
emitted regardless of Quiet. Update the test (and any assertions around
writeReadyMarker and Options) to assert that writeReadyMarker(&buf,
Options{EventKey:"im.message.receive_v1", Quiet:true}) writes the readiness line
(e.g. contains "[event] ready event_key=im.message.receive_v1") instead of
expecting an empty buffer; reference the TestReadyMarker_SuppressedWhenQuiet
test and the writeReadyMarker function/Options struct when making the change.
```go
scanner := bufio.NewScanner(conn)
if scanner.Scan() {
	resp, err := protocol.Decode(scanner.Bytes())
	if err == nil {
		if ack, ok := resp.(*protocol.PreShutdownAck); ok {
			return ack.LastForKey
		}
	}
}
return true
```
Wait for the PreShutdownAck instead of trusting the next frame.
Line 43 reads only one frame; if an event/source-state frame is already queued before the bus ack, this falls through to true and can run cleanup even when the bus would have returned LastForKey=false. Loop until the ack or deadline, ignoring unrelated frames.
🐛 Proposed fix

```diff
 scanner := bufio.NewScanner(conn)
-if scanner.Scan() {
+for scanner.Scan() {
 	resp, err := protocol.Decode(scanner.Bytes())
 	if err == nil {
 		if ack, ok := resp.(*protocol.PreShutdownAck); ok {
 			return ack.LastForKey
 		}
 	}
 }
 return true
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
scanner := bufio.NewScanner(conn)
for scanner.Scan() {
	resp, err := protocol.Decode(scanner.Bytes())
	if err == nil {
		if ack, ok := resp.(*protocol.PreShutdownAck); ok {
			return ack.LastForKey
		}
	}
}
return true
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/event/consume/shutdown.go` around lines 42-51: the current code reads only one frame from conn with scanner.Scan() and returns true if that frame isn't a protocol.PreShutdownAck. Change this to loop until a PreShutdownAck is received or the read deadline expires: repeatedly call scanner.Scan(), decode with protocol.Decode(scanner.Bytes()), ignore frames that are not *protocol.PreShutdownAck, and only return ack.LastForKey once the ack arrives; on scanner error or deadline expiry return the existing fallback (true). Update the read-deadline handling around conn/scanner so unrelated event or source-state frames are skipped rather than forcing an immediate shutdown decision.
Summary
Adds a new `event` top-level command for real-time Lark event subscription & consume, built for AI-agent subprocess ergonomics. Production scope this phase: IM domain only (11 keys — message / reaction / chat member / chat lifecycle).

- `event list` / `event schema` / `event consume` / `event status` / `event stop` — the full CLI surface around a local `_bus` daemon per AppID (Unix socket on macOS/Linux, named pipe on Windows), with orphan-bus detection and cleanup.
- `SchemaDef { Native | Custom }` + `FieldOverrides map[JSONPointer]FieldMeta` replaces the old `Kind`/`OutputSchema`/`OutputType`. Native schemas auto-wrap in the V2 envelope; Custom schemas are business-authored and emitted verbatim. Struct tags `desc`/`enum`/`kind` drive reflected annotations; `kind` renders to the standard JSON Schema `format` keyword with a Feishu-specific value vocabulary (`open_id`/`chat_id`/`email`/`timestamp_ms`/…).
- `[event] ready event_key=<key>` stderr marker before any stdout emission (the parent blocks on this instead of sleeping).
- stdin-EOF triggers a graceful shutdown (so `< /dev/null` / `nohup` / systemd callers aren't mystified).
- `--max-events N` / `--timeout D` for bounded runs; ideal for the "fetch one sample" pattern.
- No `--dry-run` — event subscription is a long-running lifecycle, not a single HTTP write, so `--max-events 1 --timeout 30s` is the preview path (mirrors Google Workspace CLI's choice for its events subscribe).

Non-goals this phase
- `lark-cli mail` + `watch` (unchanged). The framework's `PreConsume` / `FieldOverrides` / `ParamMulti` capabilities are complete, and mail can return in a later PR without further framework changes.

Testing
- `go test ./cmd/event/... ./events/... ./internal/event/...` all green locally
- `events/lint_test.go` guards orphan `FieldOverrides` pointers with a synthetic canary key
- `event schema im.message.receive_v1 --json` shows enum on `message_type`/`chat_type` and `format` on every ID / timestamp field
- `event schema im.chat.disbanded_v1` shows the V2 envelope wrap with FieldOverrides applied
- `event consume im.message.receive_v1 --max-events 1 --timeout 30s` exits cleanly after one event / timeout

Summary by CodeRabbit
Release Notes
New Features
- `event consume` command to subscribe to and process real-time events, with support for filtering, sampling, and output formatting.
- `event list` command to display available event types and metadata.
- `event schema` command to view detailed event schemas and field documentation.
- `event status` and `event stop` commands to manage the local event bus daemon.

Documentation