diff --git a/plugins/codex/scripts/app-server-broker.mjs b/plugins/codex/scripts/app-server-broker.mjs index 1954274f..be078c35 100644 --- a/plugins/codex/scripts/app-server-broker.mjs +++ b/plugins/codex/scripts/app-server-broker.mjs @@ -99,7 +99,16 @@ async function main() { } } + // Tracks whether `shutdown()` is running so the upstream-exit watchdog + // below can tell intentional teardown (`broker/shutdown`, SIGTERM, SIGINT) + // apart from the upstream app-server dying on its own. Both paths resolve + // `appClient.exitPromise` via `appClient.close()`; without this flag the + // watchdog would re-enter `shutdown()` during graceful teardown and race + // `process.exit(0)` with `process.exit(1)`. + let shuttingDown = false; + async function shutdown(server) { + shuttingDown = true; for (const socket of sockets) { socket.end(); } @@ -115,6 +124,26 @@ async function main() { appClient.setNotificationHandler(routeNotification); + // Propagate upstream codex app-server exit so connected clients see a + // socket EOF instead of waiting forever for a turn/completed. Without + // this, a codex CLI crash (rate limit, OOM, internal error) leaves the + // broker idle-alive: the socket to each client stays open but no more + // notifications ever arrive, so `captureTurn` hangs. Closing the server + // triggers the client-side `exitPromise` resolution path that + // `captureTurn` now uses as a watchdog. + // + // The `shuttingDown` guard is required — `shutdown()` itself closes + // `appClient`, which resolves the same `exitPromise` this handler is + // listening on. Without the guard, every graceful termination would + // trigger this watchdog, re-enter `shutdown()`, and race the intentional + // `process.exit(0)` with `process.exit(1)`. 
+ appClient.exitPromise.then(() => { + if (shuttingDown) { + return; + } + shutdown(server).catch(() => {}).finally(() => process.exit(1)); + }); + const server = net.createServer((socket) => { sockets.add(socket); socket.setEncoding("utf8"); diff --git a/plugins/codex/scripts/lib/codex.mjs b/plugins/codex/scripts/lib/codex.mjs index f2fe88bd..055cf070 100644 --- a/plugins/codex/scripts/lib/codex.mjs +++ b/plugins/codex/scripts/lib/codex.mjs @@ -575,6 +575,51 @@ async function captureTurn(client, threadId, startRequest, options = {}) { applyTurnNotification(state, message); }); + // Resolve captureTurn when the app-server disconnects before sending a + // terminal signal. The existing resolution paths both require server + // cooperation: (a) a `turn/completed` notification, or (b) a final_answer + // phase agentMessage that arms `scheduleInferredCompletion`. Neither fires + // when the Codex CLI exits mid-turn (rate limit, OOM, internal error) — + // observed in 2026-04-18 adversarial-review runs where the CLI emitted a + // plan-phase agentMessage, ran tool calls, then exited cleanly via IPC EOF. + // Without this watchdog, `state.completion` hangs, the companion script + // exits via its own IPC close handling, and runTrackedJob's try/catch is + // never reached — leaving a zombie job and no final verdict. + // + // Success preservation: if a `final_answer` agentMessage already arrived + // AND no subagent work is outstanding, the turn is authoritatively + // complete — the disconnect is just Codex closing the door behind a + // successful response. Mirror `scheduleInferredCompletion`'s success + // semantics (same flag, same pending-work check) without waiting for + // its 250ms debounce, which can't help a dead socket. Only when no + // terminal signal was captured do we synthesize a `failed` turn. 
+ client.exitPromise.then(() => { + if (state.completed) { + return; + } + const hasFinalAnswer = + state.finalAnswerSeen && + state.pendingCollaborations.size === 0 && + state.activeSubagentTurns.size === 0; + if (hasFinalAnswer) { + completeTurn(state, null, { inferred: true }); + return; + } + state.error = state.error ?? client.exitError ?? { + message: "codex app-server disconnected before the turn completed." + }; + emitProgress( + state.onProgress, + "App-server disconnected before turn/completed; marking turn as failed.", + "failed" + ); + completeTurn( + state, + { id: state.turnId ?? "disconnected-turn", status: "failed" }, + { inferred: true } + ); + }); + try { const response = await startRequest(); options.onResponse?.(response, state); diff --git a/tests/fake-codex-fixture.mjs b/tests/fake-codex-fixture.mjs index debcadce..a093670b 100644 --- a/tests/fake-codex-fixture.mjs +++ b/tests/fake-codex-fixture.mjs @@ -533,6 +533,52 @@ rl.on("line", (line) => { interruptibleTurns.set(turnId, { threadId: thread.id, timer }); } else if (BEHAVIOR === "slow-task") { emitTurnCompletedLater(thread.id, turnId, items, 400); + } else if (BEHAVIOR === "disconnect-before-completion") { + // Reproduces the real-world Codex CLI exit path: plan-phase + // agentMessage (phase !== "final_answer") is emitted, then the + // server disconnects without ever sending turn/completed or a + // final_answer agentMessage. Neither existing captureTurn + // resolution path (turn/completed notification OR finalAnswerSeen + // inferred timer) fires → captureTurn hangs indefinitely. 
+ send({ method: "turn/started", params: { threadId: thread.id, turn: buildTurn(turnId) } }); + send({ + method: "item/completed", + params: { + threadId: thread.id, + turnId, + item: { + type: "agentMessage", + id: "msg_" + turnId, + text: "I'm going to inspect the target and return a verdict.", + phase: "plan" + } + } + }); + // Flush stdout then disconnect cleanly, matching observed upstream + // exit mode (exit 0 via IPC EOF; no protocol-level error). + process.stdout.write("", () => process.exit(0)); + } else if (BEHAVIOR === "final-answer-then-exit") { + // The captureTurn watchdog must distinguish "disconnected with + // authoritative completion signal" (success) from "disconnected + // with no terminal signal" (failure). Emit a completed + // final_answer agentMessage and then exit before sending + // turn/completed — this is the success-path race the watchdog + // must preserve instead of demoting to failed. + send({ method: "turn/started", params: { threadId: thread.id, turn: buildTurn(turnId) } }); + send({ + method: "item/completed", + params: { + threadId: thread.id, + turnId, + item: { + type: "agentMessage", + id: "msg_" + turnId, + text: payload, + phase: "final_answer" + } + } + }); + process.stdout.write("", () => process.exit(0)); } else { emitTurnCompleted(thread.id, turnId, items); } diff --git a/tests/runtime.test.mjs b/tests/runtime.test.mjs index 90408372..13288155 100644 --- a/tests/runtime.test.mjs +++ b/tests/runtime.test.mjs @@ -1,13 +1,20 @@ import fs from "node:fs"; +import os from "node:os"; import path from "node:path"; import test from "node:test"; import assert from "node:assert/strict"; -import { spawn } from "node:child_process"; +import { spawn, spawnSync } from "node:child_process"; import { fileURLToPath } from "node:url"; import { buildEnv, installFakeCodex } from "./fake-codex-fixture.mjs"; import { initGitRepo, makeTempDir, run } from "./helpers.mjs"; -import { loadBrokerSession, saveBrokerSession } from 
"../plugins/codex/scripts/lib/broker-lifecycle.mjs"; +import { + loadBrokerSession, + saveBrokerSession, + sendBrokerShutdown, + waitForBrokerEndpoint +} from "../plugins/codex/scripts/lib/broker-lifecycle.mjs"; +import { createBrokerEndpoint } from "../plugins/codex/scripts/lib/broker-endpoint.mjs"; import { resolveStateDir } from "../plugins/codex/scripts/lib/state.mjs"; const ROOT = path.resolve(path.dirname(fileURLToPath(import.meta.url)), ".."); @@ -300,6 +307,131 @@ test("adversarial review asks Codex to inspect larger diffs itself", () => { assert.doesNotMatch(state.lastTurnStart.prompt, /PROMPT_SELF_COLLECT_[ABC]/); }); +test("adversarial review resolves when app-server disconnects before turn/completed", () => { + // Regression: Codex CLI can exit cleanly (token limit / rate limit / internal + // error) after emitting a non-final_answer agentMessage but before sending + // turn/completed. Neither existing captureTurn resolution path fires — the + // Promise hangs and the companion script exits via IPC EOF without writing a + // final verdict, leaving a zombie job record. + // + // Observed in the wild on 2026-04-18 (MeeePtt favorites-fix commit). A passing + // test asserts captureTurn can handle "app-server closed unexpectedly". + const repo = makeTempDir(); + const binDir = makeTempDir(); + installFakeCodex(binDir, "disconnect-before-completion"); + initGitRepo(repo); + fs.writeFileSync(path.join(repo, "README.md"), "hello\n"); + run("git", ["add", "README.md"], { cwd: repo }); + run("git", ["commit", "-m", "init"], { cwd: repo }); + fs.writeFileSync(path.join(repo, "README.md"), "hello again\n"); + + // Explicit 10s timeout guards against the hang on unfixed code — without + // this, the test would block until node --test's outer limit and report + // "hung" instead of "disconnected". 
+ const result = spawnSync("node", [SCRIPT, "adversarial-review"], { + cwd: repo, + env: buildEnv(binDir), + encoding: "utf8", + timeout: 10_000 + }); + + assert.notEqual(result.signal, "SIGTERM", + `adversarial-review hung after app-server disconnect (stdout=${result.stdout}, stderr=${result.stderr})`); + assert.notEqual(result.status, 0, + "adversarial-review must surface disconnect as non-zero exit"); + assert.match( + `${result.stdout}\n${result.stderr}`, + /disconnect|closed|unexpectedly/i, + "rendered output should mention the disconnect" + ); +}); + +test("adversarial review preserves final_answer when app-server exits before turn/completed", () => { + // Companion to the disconnect-before-completion test: when the + // app-server emits a completed final_answer agentMessage and then exits + // cleanly (no turn/completed notification), the captureTurn watchdog + // must NOT demote the successful turn to failed. `finalAnswerSeen` is + // the authoritative "work finished" signal — the disconnect is just + // Codex closing the door, not a failure. 
+ const repo = makeTempDir(); + const binDir = makeTempDir(); + installFakeCodex(binDir, "final-answer-then-exit"); + initGitRepo(repo); + fs.mkdirSync(path.join(repo, "src")); + fs.writeFileSync(path.join(repo, "src", "app.js"), "export const value = items[0];\n"); + run("git", ["add", "src/app.js"], { cwd: repo }); + run("git", ["commit", "-m", "init"], { cwd: repo }); + fs.writeFileSync(path.join(repo, "src", "app.js"), "export const value = items[0].id;\n"); + + const result = spawnSync("node", [SCRIPT, "adversarial-review"], { + cwd: repo, + env: buildEnv(binDir), + encoding: "utf8", + timeout: 10_000 + }); + + assert.notEqual(result.signal, "SIGTERM", + `adversarial-review hung after final_answer + exit (stderr=${result.stderr})`); + assert.equal(result.status, 0, + `final_answer before disconnect must stay exit 0 (stdout=${result.stdout}, stderr=${result.stderr})`); + assert.match( + result.stdout, + /Missing empty-state guard/, + "structured findings from the final_answer payload must survive the disconnect" + ); +}); + +test("broker exits cleanly on broker/shutdown without tripping the upstream-exit watchdog", async () => { + // Guards against a regression introduced while fixing the + // "disconnect-before-completion" hang: the upstream-exit watchdog added + // to app-server-broker.mjs listens on appClient.exitPromise, which is + // resolved both by Codex CLI crashes AND by shutdown()'s own + // appClient.close(). Without the `shuttingDown` guard the watchdog + // re-enters shutdown() during broker/shutdown (or SIGTERM/SIGINT) and + // races the intentional process.exit(0) with process.exit(1). + const binDir = makeTempDir(); + installFakeCodex(binDir); + const cwd = makeTempDir(); + + // Spawn the broker directly (not detached) so the test can observe its + // real exit code — the shared-broker lifecycle normally detaches the + // process which hides exit code from supervisors. 
+ const brokerScript = path.join(PLUGIN_ROOT, "scripts", "app-server-broker.mjs"); + const sessionDir = fs.mkdtempSync(path.join(os.tmpdir(), "broker-test-")); + const endpoint = createBrokerEndpoint(sessionDir); + const pidFile = path.join(sessionDir, "broker.pid"); + + const broker = spawn( + "node", + [brokerScript, "serve", "--endpoint", endpoint, "--cwd", cwd, "--pid-file", pidFile], + { env: buildEnv(binDir), stdio: ["ignore", "pipe", "pipe"] } + ); + + const exited = new Promise((resolve) => { + broker.on("exit", (code, signal) => resolve({ code, signal })); + }); + + try { + const ready = await waitForBrokerEndpoint(endpoint, 5_000); + assert.ok(ready, "broker should become reachable"); + + await sendBrokerShutdown(endpoint); + const { code, signal } = await exited; + + assert.equal(signal, null, "broker should exit without a signal"); + assert.equal( + code, + 0, + "broker/shutdown must return exit 0 — non-zero indicates the upstream-exit watchdog tripped during graceful teardown" + ); + } finally { + if (broker.exitCode === null && broker.signalCode === null) { + broker.kill("SIGKILL"); + } + fs.rmSync(sessionDir, { recursive: true, force: true }); + } +}); + test("review includes reasoning output when the app server returns it", () => { const repo = makeTempDir(); const binDir = makeTempDir();