From e125ae6f3d7650f18b0dd1e0d103d8df6d37ee64 Mon Sep 17 00:00:00 2001 From: robert-j-y <212159665+robert-j-y@users.noreply.github.com> Date: Thu, 11 Jun 2026 12:53:17 -0700 Subject: [PATCH] feat(agent): turn-level retry with idle timeout, plus withToolRetry helper When a turn's response stream dies mid-loop (ends without a terminal event, fails, or hangs silently), the whole callModel promise rejected and callers could only restart from turn 0, re-running every tool call. retryTurn re-sends just the failed turn's request with the accumulated conversation state intact, and idleTimeoutMs converts silent stream hangs into retryable failures. Retries surface as turn.retry events. withToolRetry separately absorbs transient tool execute throws so they don't burn a model turn. Co-authored-by: Cursor --- .changeset/turn-level-retry.md | 16 + packages/agent/src/index.ts | 13 + packages/agent/src/inner-loop/call-model.ts | 4 + packages/agent/src/lib/async-params.ts | 18 + packages/agent/src/lib/model-result.ts | 391 ++++++++--- packages/agent/src/lib/stream-transformers.ts | 29 +- packages/agent/src/lib/tool-retry.ts | 181 +++++ packages/agent/src/lib/tool-types.ts | 28 +- packages/agent/src/lib/turn-retry.ts | 217 ++++++ packages/agent/tests/unit/tool-retry.test.ts | 305 +++++++++ packages/agent/tests/unit/turn-retry.test.ts | 620 ++++++++++++++++++ 11 files changed, 1725 insertions(+), 97 deletions(-) create mode 100644 .changeset/turn-level-retry.md create mode 100644 packages/agent/src/lib/tool-retry.ts create mode 100644 packages/agent/src/lib/turn-retry.ts create mode 100644 packages/agent/tests/unit/tool-retry.test.ts create mode 100644 packages/agent/tests/unit/turn-retry.test.ts diff --git a/.changeset/turn-level-retry.md b/.changeset/turn-level-retry.md new file mode 100644 index 0000000..61fd21e --- /dev/null +++ b/.changeset/turn-level-retry.md @@ -0,0 +1,16 @@ +--- +'@openrouter/agent': minor +--- + +Add turn-level retry and hang detection to the callModel tool 
loop, plus a tool-level retry helper. + +**`retryTurn` option on `callModel`** — when a turn (one provider request + stream consumption) fails, the turn is re-sent with the full accumulated conversation intact instead of aborting the whole loop. Tool results gathered in prior turns are never discarded and tools are not re-executed. Covers all turn sites: the initial request (send and consume phases), follow-up requests after tool execution, the forced final response, and state resume. + +- `limit` — max retries per turn (default 2); the initial turn's send phase and stream-consumption phase are retried independently, each with its own `limit` budget +- `idleTimeoutMs` — converts silently-hung streams (no events, no terminal frame, connection left open) into retryable `TurnIdleTimeoutError` failures; the hung connection is cancelled +- `isRetryable` — custom retryability policy; the default (`defaultIsTurnRetryable`) retries idle timeouts, streams that ended without a terminal event, network errors, and HTTP 408/429/5xx, and does not retry `response.failed` terminal events (e.g. refusals) or other 4xx +- `backoffMs` — fixed or per-attempt delay between retries + +Mid-turn retries emit a new `turn.retry` event on `getFullResponsesStream()` (`isTurnRetryEvent` guard exported); events already received for that turn should be treated as void since the retried attempt re-streams the turn from the start. Failure classification is now typed: `TurnIdleTimeoutError`, `TurnStreamEndedError`, `TurnResponseFailedError` (messages unchanged). + +**`withToolRetry(tool, options)`** — wrap a tool so its `execute` function is automatically re-run when it throws (transient network failures inside tools no longer burn a model round trip). Supports regular and generator tools, preserves tool typing and type-guard classification, with `limit`, `backoffMs`, `isRetryable`, and an `onRetry` observability hook. Only wrap idempotent tools. 
diff --git a/packages/agent/src/index.ts b/packages/agent/src/index.ts index 4540e07..716998c 100644 --- a/packages/agent/src/index.ts +++ b/packages/agent/src/index.ts @@ -133,6 +133,9 @@ export type { ContextInput } from './lib/tool-context.js'; export { buildToolExecuteContext, ToolContextStore } from './lib/tool-context.js'; // Real-time tool event broadcasting export { ToolEventBroadcaster } from './lib/tool-event-broadcaster.js'; +// Tool-level retry +export type { ToolRetryContext, ToolRetryOptions } from './lib/tool-retry.js'; +export { withToolRetry } from './lib/tool-retry.js'; export type { ChatStreamEvent, ClientTool, @@ -178,6 +181,7 @@ export type { ToolWithGenerator, TurnContext, TurnEndEvent, + TurnRetryEvent, TurnStartEvent, TypedToolCall, TypedToolCallUnion, @@ -198,11 +202,20 @@ export { isToolPreliminaryResultEvent, isToolResultEvent, isTurnEndEvent, + isTurnRetryEvent, isTurnStartEvent, ToolType, toolHasApprovalConfigured, } from './lib/tool-types.js'; // Turn context helpers export { buildTurnContext, normalizeInputToArray } from './lib/turn-context.js'; +// Turn-level retry +export type { RetryTurnOptions, TurnRetryContext } from './lib/turn-retry.js'; +export { + defaultIsTurnRetryable, + TurnIdleTimeoutError, + TurnResponseFailedError, + TurnStreamEndedError, +} from './lib/turn-retry.js'; export type { Hook, OpenRouterOptions, SDKOptions } from './openrouter.js'; export { OpenRouter } from './openrouter.js'; diff --git a/packages/agent/src/inner-loop/call-model.ts b/packages/agent/src/inner-loop/call-model.ts index dc42df9..f862eed 100644 --- a/packages/agent/src/inner-loop/call-model.ts +++ b/packages/agent/src/inner-loop/call-model.ts @@ -101,6 +101,7 @@ export function callModel< onTurnStart, onTurnEnd, allowFinalResponse, + retryTurn, ...apiRequest } = request; @@ -165,5 +166,8 @@ export function callModel< ...(allowFinalResponse !== undefined && { allowFinalResponse, }), + ...(retryTurn !== undefined && { + retryTurn, + }), } as 
GetResponseOptions); } diff --git a/packages/agent/src/lib/async-params.ts b/packages/agent/src/lib/async-params.ts index 18ba08d..b1b352a 100644 --- a/packages/agent/src/lib/async-params.ts +++ b/packages/agent/src/lib/async-params.ts @@ -10,6 +10,7 @@ import type { ToolContextMapWithShared, TurnContext, } from './tool-types.js'; +import type { RetryTurnOptions } from './turn-retry.js'; // Re-export Tool type for convenience export type { Tool } from './tool-types.js'; @@ -98,6 +99,22 @@ type BaseCallModelInput< * (HITL pause, approval pause, interruption, or natural completion). */ allowFinalResponse?: boolean | string; + /** + * Retry a failed turn (one provider request + stream consumption) instead + * of aborting the whole tool loop. The accumulated conversation — + * including all tool results gathered in prior turns — is preserved + * across retries, so a single dead stream no longer discards the loop's + * gathered context. + * + * `idleTimeoutMs` additionally converts silently-hung streams (no events, + * no terminal frame, connection left open) into retryable failures. + * + * Mid-turn retries emit a `turn.retry` event on + * `getFullResponsesStream()`; events already received for that turn + * should be treated as void since the retried attempt re-streams the + * turn from the start. 
+ */ + retryTurn?: RetryTurnOptions; }; /** @@ -199,6 +216,7 @@ export async function resolveAsyncFunctions { @@ -308,63 +329,92 @@ export class ModelResult< } satisfies TurnStartEvent); const consumer = stream.createConsumer(); + let sawTerminalEvent = false; for await (const event of consumer) { broadcaster.push(event); + if (isResponseCompletedEvent(event) || isResponseIncompleteEvent(event)) { + sawTerminalEvent = true; + } } - broadcaster.push({ - type: 'turn.end', - turnNumber: 0, - timestamp: Date.now(), - } satisfies TurnEndEvent); + // With turn retry enabled, a stream that ended without a terminal + // event is a failed attempt that the loop path will retry — the + // retried attempt (pipeAndConsumeStream) owns turn.end for turn 0. + if (!retryEnabled || sawTerminalEvent) { + broadcaster.push({ + type: 'turn.end', + turnNumber: 0, + timestamp: Date.now(), + } satisfies TurnEndEvent); + } })().catch((error) => { - broadcaster.complete(error instanceof Error ? error : new Error(String(error))); + // With turn retry enabled the loop path owns turn-0 failures (it will + // retry or reject); completing the broadcaster here would poison every + // stream consumer before the retry gets a chance. + if (!retryEnabled) { + broadcaster.complete(error instanceof Error ? error : new Error(String(error))); + } }); } /** * Pipe a follow-up stream into the turn broadcaster and capture the completed response. * Emits turn.start / turn.end delimiters around the stream events. + * + * Retried attempts pass `emitTurnStart: false` — the `turn.retry` event + * (pushed by the retry driver) marks the restart instead of a duplicate + * `turn.start`. 
*/ private async pipeAndConsumeStream( stream: ReusableReadableStream, turnNumber: number, + opts?: { + emitTurnStart?: boolean; + }, ): Promise { const broadcaster = this.turnBroadcaster!; - broadcaster.push({ - type: 'turn.start', - turnNumber, - timestamp: Date.now(), - } satisfies TurnStartEvent); + if (opts?.emitTurnStart !== false) { + broadcaster.push({ + type: 'turn.start', + turnNumber, + timestamp: Date.now(), + } satisfies TurnStartEvent); + } const consumer = stream.createConsumer(); + const idleTimeoutMs = this.options.retryTurn?.idleTimeoutMs; + const events = iterateWithIdleTimeout( + consumer, + idleTimeoutMs, + () => new TurnIdleTimeoutError(turnNumber, idleTimeoutMs ?? 0), + ); let completedResponse: models.OpenResponsesResult | null = null; - for await (const event of consumer) { + for await (const event of events) { broadcaster.push(event); if (isResponseCompletedEvent(event)) { completedResponse = event.response; } if (isResponseFailedEvent(event)) { const errorMsg = 'message' in event ? String(event.message) : 'Response failed'; - throw new Error(errorMsg); + throw new TurnResponseFailedError(errorMsg); } if (isResponseIncompleteEvent(event)) { completedResponse = event.response; } } + if (!completedResponse) { + throw new TurnStreamEndedError('Follow-up stream ended without a completed response'); + } + broadcaster.push({ type: 'turn.end', turnNumber, timestamp: Date.now(), } satisfies TurnEndEvent); - if (!completedResponse) { - throw new Error('Follow-up stream ended without a completed response'); - } - return completedResponse; } @@ -477,6 +527,198 @@ export class ModelResult< throw new Error('Neither stream nor response initialized'); } + // ========================================================================= + // Turn-level retry + // ========================================================================= + + /** + * Run a turn attempt with retry per the `retryTurn` option. 
`attemptFn` + * receives the attempt number (0 = initial attempt) and must be safe to + * call repeatedly — all conversation-state mutation happens outside it. + * Each retry pushes a `turn.retry` event to the turn broadcaster (when one + * exists) before re-attempting. + */ + private async runTurnAttempts( + turnNumber: number, + attemptFn: (attempt: number) => Promise, + ): Promise { + const config = this.options.retryTurn; + const limit = config ? (config.limit ?? 2) : 0; + let attempt = 0; + + while (true) { + try { + return await attemptFn(attempt); + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + if (!config || attempt >= limit) { + throw err; + } + + const retryContext = { + turnNumber, + attempt: attempt + 1, + }; + const retryable = config.isRetryable + ? await config.isRetryable(err, retryContext) + : defaultIsTurnRetryable(err); + if (!retryable) { + throw err; + } + + attempt++; + this.turnBroadcaster?.push({ + type: 'turn.retry', + turnNumber, + attempt, + error: err.message, + timestamp: Date.now(), + } satisfies TurnRetryEvent); + + const backoff = resolveBackoffMs(config.backoffMs, attempt); + if (backoff > 0) { + await sleep(backoff); + } + } + } + } + + /** + * Send a turn request and consume its stream to a completed response. + * This is the idempotent retry unit: it performs no conversation-state + * mutation, so a failed call can be re-issued as-is. On failure the + * stream is cancelled to release the underlying connection. 
+ */ + private async sendAndConsumeTurn( + request: models.ResponsesRequest, + turnNumber: number, + attempt: number, + ): Promise { + const result = await betaResponsesSend( + this.options.client, + { + responsesRequest: request, + }, + this.options.options, + ); + + if (!result.ok) { + throw result.error; + } + + const value = result.value; + if (isEventStream(value)) { + const stream = new ReusableReadableStream(value); + try { + if (this.turnBroadcaster) { + return await this.pipeAndConsumeStream(stream, turnNumber, { + emitTurnStart: attempt === 0, + }); + } + return await consumeStreamForCompletion(stream, { + idleTimeoutMs: this.options.retryTurn?.idleTimeoutMs, + turnNumber, + }); + } catch (error) { + // Abort the (possibly hung) upstream connection before retrying. + void stream.cancel().catch(() => {}); + throw error; + } + } + if (this.isNonStreamingResponse(value)) { + return value; + } + throw new Error('Unexpected response type from API'); + } + + /** + * Consume the initial (turn 0) response with turn-level retry. A retry + * re-issues the already-resolved initial request — the resolved input and + * instructions are unchanged — and replaces `this.reusableStream` so + * late-attaching consumers replay the successful attempt. When the turn + * broadcaster is active, the retried attempt is piped through + * `pipeAndConsumeStream` so stream consumers receive the fresh events + * (the failed initial pipe intentionally skips its `turn.end`). 
+ */ + private async getInitialResponseWithRetry(): Promise { + if (!this.options.retryTurn) { + return this.getInitialResponse(); + } + + return this.runTurnAttempts(0, async (attempt) => { + if (attempt === 0) { + if (this.finalResponse) { + return this.finalResponse; + } + const stream = this.reusableStream; + if (!stream) { + throw new Error('Neither stream nor response initialized'); + } + try { + return await consumeStreamForCompletion(stream, { + idleTimeoutMs: this.options.retryTurn?.idleTimeoutMs, + turnNumber: 0, + }); + } catch (error) { + void stream.cancel().catch(() => {}); + throw error; + } + } + + // Retry attempt: re-issue the resolved initial request. + if (!this.resolvedRequest) { + throw new Error('Request not initialized'); + } + const request: models.ResponsesRequest = { + ...this.resolvedRequest, + stream: true, + }; + + const result = await betaResponsesSend( + this.options.client, + { + responsesRequest: request, + }, + this.options.options, + ); + if (!result.ok) { + throw result.error; + } + + const value = result.value; + if (isEventStream(value)) { + const stream = new ReusableReadableStream(value); + this.reusableStream = stream; + try { + if (this.turnBroadcaster && this.initialStreamPipeStarted) { + return await this.pipeAndConsumeStream(stream, 0, { + emitTurnStart: false, + }); + } + return await consumeStreamForCompletion(stream, { + idleTimeoutMs: this.options.retryTurn?.idleTimeoutMs, + turnNumber: 0, + }); + } catch (error) { + void stream.cancel().catch(() => {}); + throw error; + } + } + if (this.isNonStreamingResponse(value)) { + this.finalResponse = value; + if (this.turnBroadcaster && this.initialStreamPipeStarted) { + this.turnBroadcaster.push({ + type: 'turn.end', + turnNumber: 0, + timestamp: Date.now(), + } satisfies TurnEndEvent); + } + return value; + } + throw new Error('Unexpected response type from API'); + }); + } + /** * Save response output to state. 
* Appends the response output to the message history and records the response ID. @@ -1133,33 +1375,11 @@ export class ModelResult< stream: true, }; - const newResult = await betaResponsesSend( - this.options.client, - { - responsesRequest: newRequest, - }, - this.options.options, + // The conversation state for this turn is fully captured in `newRequest` + // above — send + consume is the idempotent retry unit. + return this.runTurnAttempts(turnNumber, (attempt) => + this.sendAndConsumeTurn(newRequest, turnNumber, attempt), ); - - if (!newResult.ok) { - throw newResult.error; - } - - // Handle streaming or non-streaming response - const value = newResult.value; - if (isEventStream(value)) { - const followUpStream = new ReusableReadableStream(value); - - if (this.turnBroadcaster) { - return this.pipeAndConsumeStream(followUpStream, turnNumber); - } - - return consumeStreamForCompletion(followUpStream); - } - if (this.isNonStreamingResponse(value)) { - return value; - } - throw new Error('Unexpected response type from API'); } /** @@ -1224,30 +1444,11 @@ export class ModelResult< stream: true, }; - const result = await betaResponsesSend( - this.options.client, - { - responsesRequest: finalRequest, - }, - this.options.options, + // `finalRequest` carries the full conversation — send + consume is the + // idempotent retry unit. 
+ return this.runTurnAttempts(turnNumber, (attempt) => + this.sendAndConsumeTurn(finalRequest, turnNumber, attempt), ); - - if (!result.ok) { - throw result.error; - } - - const value = result.value; - if (isEventStream(value)) { - const stream = new ReusableReadableStream(value); - if (this.turnBroadcaster) { - return this.pipeAndConsumeStream(stream, turnNumber); - } - return consumeStreamForCompletion(stream); - } - if (this.isNonStreamingResponse(value)) { - return value; - } - throw new Error('Unexpected response type from API'); } /** @@ -1579,18 +1780,22 @@ export class ModelResult< // Force stream mode for initial request const request = this.resolvedRequest; - // Make the API request - const apiResult = await betaResponsesSend( - this.options.client, - { - responsesRequest: request, - }, - this.options.options, - ); - - if (!apiResult.ok) { - throw apiResult.error; - } + // Make the API request. Send-phase failures (the request never + // produced a stream) are retried per `retryTurn`; nothing has been + // exposed to consumers yet, so the retry is invisible. + const apiResult = await this.runTurnAttempts(0, async () => { + const result = await betaResponsesSend( + this.options.client, + { + responsesRequest: request, + }, + this.options.options, + ); + if (!result.ok) { + throw result.error; + } + return result; + }); // Stash fresh user items so saveResponseToState can persist them // atomically with the assistant output. 
Writing them here would leave @@ -1805,18 +2010,20 @@ export class ModelResult< this.resolvedRequest = request; - // Make the API request - const apiResult = await betaResponsesSend( - this.options.client, - { - responsesRequest: request, - }, - this.options.options, - ); - - if (!apiResult.ok) { - throw apiResult.error; - } + // Make the API request (send-phase failures retried per `retryTurn`) + const apiResult = await this.runTurnAttempts(turnContext.numberOfTurns, async () => { + const result = await betaResponsesSend( + this.options.client, + { + responsesRequest: request, + }, + this.options.options, + ); + if (!result.ok) { + throw result.error; + } + return result; + }); // Handle both streaming and non-streaming responses if (isEventStream(apiResult.value)) { @@ -1852,8 +2059,8 @@ export class ModelResult< return; } - // Get initial response - let currentResponse = await this.getInitialResponse(); + // Get initial response (with turn-level retry when configured) + let currentResponse = await this.getInitialResponseWithRetry(); // Save initial response to state await this.saveResponseToState(currentResponse); diff --git a/packages/agent/src/lib/stream-transformers.ts b/packages/agent/src/lib/stream-transformers.ts index b35b56b..6a7e6b4 100644 --- a/packages/agent/src/lib/stream-transformers.ts +++ b/packages/agent/src/lib/stream-transformers.ts @@ -31,6 +31,12 @@ import { isWebSearchCallOutputItem, } from './stream-type-guards.js'; import type { ClientTool, ParsedToolCall, ServerTool, Tool } from './tool-types.js'; +import { + iterateWithIdleTimeout, + TurnIdleTimeoutError, + TurnResponseFailedError, + TurnStreamEndedError, +} from './turn-retry.js'; /** * Extract text deltas from responses stream events @@ -630,14 +636,29 @@ export async function* buildMessageStream( } /** - * Consume stream until completion and return the complete response + * Consume stream until completion and return the complete response. 
+ * + * When `idleTimeoutMs` is set, a gap of more than that many milliseconds + * between events fails consumption with a retryable `TurnIdleTimeoutError` + * (the caller is responsible for cancelling the underlying stream). */ export async function consumeStreamForCompletion( stream: ReusableReadableStream, + options?: { + idleTimeoutMs?: number | undefined; + turnNumber?: number | undefined; + }, ): Promise { const consumer = stream.createConsumer(); + const turnNumber = options?.turnNumber ?? 0; - for await (const event of consumer) { + const events = iterateWithIdleTimeout( + consumer, + options?.idleTimeoutMs, + () => new TurnIdleTimeoutError(turnNumber, options?.idleTimeoutMs ?? 0), + ); + + for await (const event of events) { if (!('type' in event)) { continue; } @@ -648,7 +669,7 @@ export async function consumeStreamForCompletion( if (isResponseFailedEvent(event)) { // The failed event contains the full response with error information - throw new Error(`Response failed: ${JSON.stringify(event.response.error)}`); + throw new TurnResponseFailedError(`Response failed: ${JSON.stringify(event.response.error)}`); } if (isResponseIncompleteEvent(event)) { @@ -657,7 +678,7 @@ export async function consumeStreamForCompletion( } } - throw new Error('Stream ended without completion event'); + throw new TurnStreamEndedError('Stream ended without completion event'); } /** diff --git a/packages/agent/src/lib/tool-retry.ts b/packages/agent/src/lib/tool-retry.ts new file mode 100644 index 0000000..a62ee98 --- /dev/null +++ b/packages/agent/src/lib/tool-retry.ts @@ -0,0 +1,181 @@ +/** + * Tool-level retry: wrap a tool so its `execute` function is automatically + * re-run when it throws. This absorbs transient failures (e.g. a flaky + * network call inside the tool) before they reach the model — without the + * wrapper, a throwing tool is reported to the model as an error result, + * which burns a full provider round trip while the model decides to re-try. 
+ * + * The SDK stays unopinionated about tool errors by default; retry is the + * tool author's policy, opted into per tool: + * + * ```typescript + * const fetchTool = withToolRetry( + * tool({ + * name: 'web_fetch', + * // ... + * execute: async ({ url }) => fetchPage(url), + * }), + * { limit: 2, onRetry: ({ toolName, attempt, error }) => log(...) }, + * ); + * ``` + * + * Notes: + * - Only wrap tools whose `execute` is safe to re-run (idempotent or + * side-effect free). Do not wrap tools like `send_email`. + * - Generator tools are re-run from the start on retry; preliminary results + * already emitted by the failed attempt will have been forwarded to + * consumers and may repeat. + * - HITL `onToolCalled` hooks and manual tools are not retried; the tool is + * returned unchanged if it has no `execute` function. + */ + +import type { Tool } from './tool-types.js'; +import { resolveBackoffMs, sleep } from './turn-retry.js'; + +/** + * Context passed to `isRetryable` / `onRetry` when a tool execution fails. + */ +export interface ToolRetryContext { + /** Name of the tool whose execution failed. */ + toolName: string; + /** The retry attempt about to be made (1 = first retry). */ + attempt: number; + /** The error thrown by the failed attempt. */ + error: unknown; +} + +/** + * Options for `withToolRetry`. + */ +export interface ToolRetryOptions { + /** + * Maximum number of retries (not counting the initial attempt). + * Default: 2. + */ + limit?: number; + /** + * Delay before each retry attempt, in milliseconds. A function receives + * the attempt number (1 = first retry). Default: 0. + */ + backoffMs?: number | ((attempt: number) => number); + /** + * Decide whether a thrown error should be retried. Default: retry all + * errors (the wrapper only sees `execute` throws; input validation + * happens before it and is never retried). 
+ */ + isRetryable?: (context: ToolRetryContext) => boolean | Promise; + /** + * Observability hook invoked before each retry attempt. + */ + onRetry?: (context: ToolRetryContext) => void | Promise; +} + +/** + * Decide whether a failure should be retried and run the retry hooks. + * Returns true when the caller should re-attempt. + */ +async function shouldRetry( + options: ToolRetryOptions | undefined, + context: ToolRetryContext, +): Promise { + const limit = options?.limit ?? 2; + if (context.attempt > limit) { + return false; + } + if (options?.isRetryable && !(await options.isRetryable(context))) { + return false; + } + await options?.onRetry?.(context); + const backoff = resolveBackoffMs(options?.backoffMs, context.attempt); + if (backoff > 0) { + await sleep(backoff); + } + return true; +} + +/** + * Wrap a tool so its `execute` function is automatically re-run (up to + * `limit` times) when it throws. Regular async tools and generator tools + * are both supported; tools without an `execute` function are returned + * unchanged. The tool's type — including its schemas and inferred + * input/output types — is preserved. + */ +export function withToolRetry(tool: TTool, options?: ToolRetryOptions): TTool { + const fn = ( + tool as { + function?: { + name?: string; + execute?: unknown; + }; + } + ).function; + if (!fn || typeof fn.execute !== 'function') { + return tool; + } + + const toolName = fn.name ?? 'unknown'; + const originalExecute = fn.execute as (...args: unknown[]) => unknown; + // Match the SDK's own discriminator (isGeneratorTool): a tool is a + // generator tool iff its function declares an eventSchema. The executor + // drives generator tools via iterator.next(), so the wrapper must also be + // an async generator in that case. + const isGenerator = 'eventSchema' in fn; + + let wrappedExecute: (...args: unknown[]) => unknown; + + if (isGenerator) { + // Generator tool: a failed attempt is re-run from the start. 
Yields from + // the failed attempt have already been forwarded. + wrappedExecute = async function* retryingGeneratorExecute(...args: unknown[]) { + let attempt = 0; + while (true) { + try { + const iterator = originalExecute(...args) as AsyncGenerator; + let step = await iterator.next(); + while (!step.done) { + yield step.value; + step = await iterator.next(); + } + return step.value; + } catch (error) { + attempt++; + const retry = await shouldRetry(options, { + toolName, + attempt, + error, + }); + if (!retry) { + throw error; + } + } + } + }; + } else { + wrappedExecute = async function retryingExecute(...args: unknown[]) { + let attempt = 0; + while (true) { + try { + return await originalExecute(...args); + } catch (error) { + attempt++; + const retry = await shouldRetry(options, { + toolName, + attempt, + error, + }); + if (!retry) { + throw error; + } + } + } + }; + } + + return { + ...tool, + function: { + ...fn, + execute: wrappedExecute, + }, + } as TTool; +} diff --git a/packages/agent/src/lib/tool-types.ts b/packages/agent/src/lib/tool-types.ts index e768358..3622039 100644 --- a/packages/agent/src/lib/tool-types.ts +++ b/packages/agent/src/lib/tool-types.ts @@ -811,6 +811,24 @@ export type TurnEndEvent = { timestamp: number; }; +/** + * Turn retry event emitted when a turn failed and is about to be re-sent + * (requires the `retryTurn` option). Events already emitted for this turn + * before the failure should be treated as void — the retried attempt + * re-streams the turn from the start. For send-phase failures (the request + * never produced a stream) a `turn.retry` may precede the turn's + * `turn.start`. + */ +export type TurnRetryEvent = { + type: 'turn.retry'; + turnNumber: number; + /** The retry attempt about to be made (1 = first retry). */ + attempt: number; + /** Message of the error that caused the retry. 
*/ + error: string; + timestamp: number; +}; + /** * Enhanced stream event types for getFullResponsesStream * Extends StreamEvents with tool preliminary results, tool results, @@ -824,7 +842,8 @@ export type ResponseStreamEvent = | ToolResultEvent | ToolCallOutputEvent | TurnStartEvent - | TurnEndEvent; + | TurnEndEvent + | TurnRetryEvent; /** * Type guard to check if an event is a tool preliminary result event @@ -865,6 +884,13 @@ export function isTurnEndEvent(event: ResponseStreamEvent): event is TurnEndEven return event.type === 'turn.end'; } +/** + * Type guard to check if an event is a turn retry event + */ +export function isTurnRetryEvent(event: ResponseStreamEvent): event is TurnRetryEvent { + return event.type === 'turn.retry'; +} + /** * Tool stream event types for getToolStream * Includes both argument deltas and preliminary results diff --git a/packages/agent/src/lib/turn-retry.ts b/packages/agent/src/lib/turn-retry.ts new file mode 100644 index 0000000..d27ed07 --- /dev/null +++ b/packages/agent/src/lib/turn-retry.ts @@ -0,0 +1,217 @@ +/** + * Turn-level retry support for the callModel tool loop. + * + * A "turn" is one provider request + stream consumption. The conversation + * state accumulated across turns (tool results, prior outputs) lives on the + * ModelResult instance, so a failed turn can be re-sent without losing any + * gathered context. This module holds the public option types, the typed + * errors that classify turn failures, the default retryability policy, and + * the idle-timeout iterator that converts silently-hung streams into + * retryable failures. + */ + +/** + * Context passed to `isRetryable` when deciding whether to retry a failed turn. + */ +export interface TurnRetryContext { + /** The turn that failed (0 = initial request). */ + turnNumber: number; + /** The retry attempt about to be made (1 = first retry). */ + attempt: number; +} + +/** + * Options for turn-level retry in the callModel tool loop. 
+ * + * When a turn's provider request or stream fails, the turn is re-sent with + * the full accumulated conversation intact — tool results gathered in prior + * turns are never discarded. Without this option a single dead turn aborts + * the entire loop. + * + * Mid-turn retries emit a `turn.retry` event on the unified stream + * (`getFullResponsesStream`). Events received from the failed attempt before + * the failure remain in the stream; consumers that care about exact turn + * contents should treat events between `turn.start`/`turn.retry` and a + * subsequent `turn.retry` as void. + */ +export interface RetryTurnOptions { + /** + * Maximum number of retries per turn (not counting the initial attempt). + * Default: 2. + */ + limit?: number; + /** + * If no stream event arrives for this many milliseconds during a turn, the + * turn fails with a retryable `TurnIdleTimeoutError` and the underlying + * stream is cancelled. This is what converts silently-hung provider + * streams (no terminal event, connection left open) into recoverable + * failures. Default: no idle timeout. + */ + idleTimeoutMs?: number; + /** + * Delay before each retry attempt, in milliseconds. A function receives + * the attempt number (1 = first retry). Default: 0 (the provider round + * trip itself dominates latency). + */ + backoffMs?: number | ((attempt: number) => number); + /** + * Decide whether a failed turn should be retried. Defaults to + * `defaultIsTurnRetryable`: retries idle timeouts, streams that ended + * without a terminal event, network errors, and HTTP 408/429/5xx; does not + * retry `response.failed` events (e.g. refusals) or other HTTP 4xx. + */ + isRetryable?: (error: Error, context: TurnRetryContext) => boolean | Promise; +} + +/** + * Thrown when a turn's stream produced no events for `idleTimeoutMs` + * milliseconds. Retryable by default. 
/**
 * Thrown when a turn's stream produced no events for `idleTimeoutMs`
 * milliseconds. Retryable by default.
 */
export class TurnIdleTimeoutError extends Error {
  /** Number of the turn whose stream went idle. */
  readonly turnNumber: number;
  /** The idle window, in milliseconds, that was exceeded. */
  readonly idleTimeoutMs: number;

  constructor(turnNumber: number, idleTimeoutMs: number) {
    super(
      `Turn ${turnNumber} stream produced no events for ${idleTimeoutMs}ms (idle timeout exceeded)`,
    );
    this.name = 'TurnIdleTimeoutError';
    this.turnNumber = turnNumber;
    this.idleTimeoutMs = idleTimeoutMs;
  }
}

/**
 * Thrown when a turn's stream closed without a terminal event
 * (`response.completed` / `response.incomplete`). This is the signature of
 * an upstream stream dying mid-flight. Retryable by default.
 */
export class TurnStreamEndedError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'TurnStreamEndedError';
  }
}

/**
 * Thrown when a turn's stream emitted a terminal `response.failed` event.
 * The provider deliberately failed the response (which includes refusals),
 * so this is NOT retryable by default — opt in via `isRetryable` if your
 * gateway emits transient failures this way.
 */
export class TurnResponseFailedError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'TurnResponseFailedError';
  }
}

/**
 * Best-effort extraction of an HTTP status code from SDK / fetch errors.
 *
 * Probes `statusCode`, then `status`, then `httpMeta.response.status`, and
 * returns the first one that is a number.
 *
 * @param error - The error to inspect.
 * @returns The numeric status code, or `undefined` when none of the probed
 *   properties holds a number.
 */
function statusCodeOf(error: Error): number | undefined {
  const candidate = error as Error & {
    statusCode?: unknown;
    status?: unknown;
    httpMeta?: {
      response?: {
        status?: unknown;
      };
    };
  };
  for (const value of [
    candidate.statusCode,
    candidate.status,
    candidate.httpMeta?.response?.status,
  ]) {
    if (typeof value === 'number') {
      return value;
    }
  }
  return undefined;
}

/**
 * Default turn retryability policy:
 * - `TurnIdleTimeoutError` / `TurnStreamEndedError` — retry (dead/hung stream)
 * - `TurnResponseFailedError` — don't retry (deliberate provider failure,
 *   e.g. a refusal; retrying re-asks a deterministic question)
 * - HTTP errors — retry 408, 429, and 5xx; don't retry other 4xx
 * - anything else (network/socket errors surfaced by the fetch layer) — retry
 *
 * @param error - The error that failed the turn.
 * @returns `true` when the turn should be re-sent.
 */
export function defaultIsTurnRetryable(error: Error): boolean {
  if (error instanceof TurnIdleTimeoutError || error instanceof TurnStreamEndedError) {
    return true;
  }
  if (error instanceof TurnResponseFailedError) {
    return false;
  }
  const status = statusCodeOf(error);
  if (status !== undefined) {
    return status === 408 || status === 429 || status >= 500;
  }
  // No status code at all: treat it as a transport-level failure.
  return true;
}

/**
 * Resolve the backoff delay for a retry attempt.
 *
 * @param backoffMs - A fixed delay, a function of the attempt number, or
 *   `undefined`.
 * @param attempt - The retry attempt number, passed through to a function
 *   `backoffMs`.
 * @returns The delay in milliseconds; `0` when `backoffMs` is `undefined`.
 */
export function resolveBackoffMs(
  backoffMs: RetryTurnOptions['backoffMs'],
  attempt: number,
): number {
  if (typeof backoffMs === 'function') {
    return backoffMs(attempt);
  }
  return backoffMs ?? 0;
}

/**
 * Pull one result from `iterator`, failing if it takes longer than
 * `idleTimeoutMs`. The timer is always cleared before this function settles.
 */
async function nextWithinIdleWindow<T>(
  iterator: AsyncIterator<T>,
  idleTimeoutMs: number,
  makeTimeoutError: () => Error,
): Promise<IteratorResult<T>> {
  const pending = iterator.next();
  // If the timeout wins the race, nobody awaits this promise anymore —
  // swallow its eventual rejection so it can't surface as unhandled.
  pending.catch(() => {});

  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(makeTimeoutError()), idleTimeoutMs);
  });
  try {
    return await Promise.race([pending, deadline]);
  } finally {
    clearTimeout(timer);
  }
}

/**
 * Wrap an async iterator so that a gap of more than `idleTimeoutMs` between
 * events fails the iteration with the error produced by `makeTimeoutError`.
 *
 * The idle window only covers the time spent waiting for the next event.
 * The timer is cleared before each event is yielded, so a slow consumer
 * never triggers `makeTimeoutError`.
 *
 * The caller is responsible for cancelling the underlying stream when the
 * timeout fires — the wrapped iterator's pending `next()` is abandoned (its
 * eventual settlement is swallowed to avoid unhandled rejections). When the
 * consumer stops early instead (`break` / `return()`), the underlying
 * iterator's `return()` is invoked, matching the pass-through path.
 *
 * @param iterator - The source iterator to guard.
 * @param idleTimeoutMs - Maximum gap between events in milliseconds. When
 *   `undefined`, `0`, negative or `NaN`, the iterator is delegated to as-is.
 * @param makeTimeoutError - Factory for the error thrown on timeout; called
 *   only when the timeout actually fires.
 * @returns An iterator yielding the same values as `iterator`.
 * @throws The error from `makeTimeoutError` on an idle gap, or whatever the
 *   source iterator's `next()` rejects with.
 */
export async function* iterateWithIdleTimeout<T>(
  iterator: AsyncIterableIterator<T>,
  idleTimeoutMs: number | undefined,
  makeTimeoutError: () => Error,
): AsyncIterableIterator<T> {
  if (!idleTimeoutMs || idleTimeoutMs <= 0) {
    yield* iterator;
    return;
  }

  // True only while suspended at `yield`, i.e. the one point where the
  // consumer can abandon this generator with no source `next()` in flight.
  let suspendedAtYield = false;
  try {
    while (true) {
      const step = await nextWithinIdleWindow(iterator, idleTimeoutMs, makeTimeoutError);
      if (step.done) {
        return;
      }
      suspendedAtYield = true;
      yield step.value;
      suspendedAtYield = false;
    }
  } finally {
    // Not done on timeout or source error: a `return()` there could queue
    // behind the abandoned `next()`, and cleanup is the caller's job.
    if (suspendedAtYield) {
      await iterator.return?.();
    }
  }
}

/**
 * Promise-based sleep used between retry attempts.
 *
 * @param ms - Delay in milliseconds.
 * @returns A promise that resolves once the delay has elapsed.
 */
export function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
withToolRetry(makeRegularTool(execute), { + limit: 2, + }); + + const result = await ( + wrapped.function.execute as (params: { url: string }) => Promise<{ + body: string; + }> + )({ + url: 'https://example.com', + }); + + expect(result).toEqual({ + body: 'hello', + }); + expect(execute).toHaveBeenCalledTimes(3); + }); + + it('throws the last error once the limit is exhausted', async () => { + const execute = vi + .fn< + (params: { url: string }) => Promise<{ + body: string; + }> + >() + .mockRejectedValue(new Error('permanently down')); + + const wrapped = withToolRetry(makeRegularTool(execute), { + limit: 1, + }); + + await expect( + ( + wrapped.function.execute as (params: { url: string }) => Promise<{ + body: string; + }> + )({ + url: 'https://example.com', + }), + ).rejects.toThrow('permanently down'); + expect(execute).toHaveBeenCalledTimes(2); + }); + + it('respects isRetryable declining a retry', async () => { + const execute = vi + .fn< + (params: { url: string }) => Promise<{ + body: string; + }> + >() + .mockRejectedValue(new Error('404 not found')); + const isRetryable = vi.fn(() => false); + + const wrapped = withToolRetry(makeRegularTool(execute), { + limit: 3, + isRetryable, + }); + + await expect( + ( + wrapped.function.execute as (params: { url: string }) => Promise<{ + body: string; + }> + )({ + url: 'https://example.com', + }), + ).rejects.toThrow('404 not found'); + expect(execute).toHaveBeenCalledTimes(1); + expect(isRetryable).toHaveBeenCalledWith( + expect.objectContaining({ + toolName: 'web_fetch', + attempt: 1, + }), + ); + }); + + it('invokes onRetry with attempt context before each retry', async () => { + const execute = vi + .fn< + (params: { url: string }) => Promise<{ + body: string; + }> + >() + .mockRejectedValueOnce(new Error('flaky')) + .mockResolvedValueOnce({ + body: 'ok', + }); + const onRetry = vi.fn(); + + const wrapped = withToolRetry(makeRegularTool(execute), { + onRetry, + }); + + await ( + wrapped.function.execute as 
(params: { url: string }) => Promise<{ + body: string; + }> + )({ + url: 'https://example.com', + }); + + expect(onRetry).toHaveBeenCalledTimes(1); + expect(onRetry).toHaveBeenCalledWith( + expect.objectContaining({ + toolName: 'web_fetch', + attempt: 1, + error: expect.any(Error), + }), + ); + }); + + it('preserves the tool shape and type-guard classification', () => { + const original = makeRegularTool(async () => ({ + body: 'x', + })); + const wrapped = withToolRetry(original); + + expect(isRegularExecuteTool(wrapped as unknown as Tool)).toBe(true); + expect(isGeneratorTool(wrapped as unknown as Tool)).toBe(false); + expect(wrapped.function.name).toBe('web_fetch'); + expect(wrapped.function.inputSchema).toBe(original.function.inputSchema); + expect(wrapped.function.outputSchema).toBe(original.function.outputSchema); + }); + + it('returns tools without an execute function unchanged', () => { + const manualTool = { + type: ToolType.Function, + function: { + name: 'manual_thing', + description: 'No execute.', + inputSchema: z.object({}), + outputSchema: z.object({}), + }, + } as const; + + const wrapped = withToolRetry(manualTool as unknown as Tool); + expect(wrapped).toBe(manualTool); + }); +}); + +describe('withToolRetry: generator tools', () => { + function makeGeneratorTool( + execute: (params: { query: string }) => AsyncGenerator, + ) { + return { + type: ToolType.Function, + function: { + name: 'web_search', + description: 'Search.', + inputSchema: z.object({ + query: z.string(), + }), + outputSchema: z.object({ + results: z.array(z.string()), + }), + eventSchema: z.object({ + progress: z.string(), + }), + execute, + }, + } as const; + } + + it('re-runs a generator that throws mid-iteration and yields the retried run', async () => { + let calls = 0; + const wrapped = withToolRetry( + makeGeneratorTool(async function* (_params) { + calls++; + yield { + progress: `attempt ${calls} started`, + }; + if (calls === 1) { + throw new Error('stream cut'); + } + return 
{ + results: [ + 'found it', + ], + }; + }), + { + limit: 1, + }, + ); + + const iterator = ( + wrapped.function.execute as (params: { + query: string; + }) => AsyncGenerator + )({ + query: 'cats', + }); + + const yielded: unknown[] = []; + let step = await iterator.next(); + while (!step.done) { + yielded.push(step.value); + step = await iterator.next(); + } + + expect(calls).toBe(2); + // Yields from BOTH attempts are forwarded (documented behavior). + expect(yielded).toEqual([ + { + progress: 'attempt 1 started', + }, + { + progress: 'attempt 2 started', + }, + ]); + expect(step.value).toEqual({ + results: [ + 'found it', + ], + }); + }); + + it('keeps the generator classification so the executor drives it correctly', () => { + const wrapped = withToolRetry( + makeGeneratorTool(async function* () { + yield { + progress: 'hi', + }; + return { + results: [], + }; + }), + ); + expect(isGeneratorTool(wrapped as unknown as Tool)).toBe(true); + }); + + it('throws once generator retries are exhausted', async () => { + const wrapped = withToolRetry( + makeGeneratorTool(async function* () { + yield { + progress: 'starting', + }; + throw new Error('always fails'); + }), + { + limit: 1, + }, + ); + + const iterator = ( + wrapped.function.execute as (params: { + query: string; + }) => AsyncGenerator + )({ + query: 'cats', + }); + + await expect(async () => { + let step = await iterator.next(); + while (!step.done) { + step = await iterator.next(); + } + }).rejects.toThrow('always fails'); + }); +}); diff --git a/packages/agent/tests/unit/turn-retry.test.ts b/packages/agent/tests/unit/turn-retry.test.ts new file mode 100644 index 0000000..84d7153 --- /dev/null +++ b/packages/agent/tests/unit/turn-retry.test.ts @@ -0,0 +1,620 @@ +/** + * Tests for turn-level retry (`retryTurn` option). + * + * A "turn" is one provider request + stream consumption. 
These tests verify: + * - a dead follow-up stream is retried with the accumulated conversation + * intact (tool results are NOT re-executed or lost) + * - the initial (turn 0) stream is retried by re-issuing the resolved request + * - silently-hung streams are converted into retryable failures via + * `idleTimeoutMs` + * - `turn.retry` events surface on getFullResponsesStream + * - retry limits, retryability policy, and backoff hooks are honored + */ +import type { OpenRouterCore } from '@openrouter/sdk/core'; +import type * as models from '@openrouter/sdk/models'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { z } from 'zod/v4'; + +const mockBetaResponsesSend = vi.hoisted(() => vi.fn()); + +vi.mock('@openrouter/sdk/funcs/betaResponsesSend', () => ({ + betaResponsesSend: mockBetaResponsesSend, +})); + +import { callModel } from '../../src/inner-loop/call-model.js'; +import { ToolType } from '../../src/lib/tool-types.js'; +import { TurnIdleTimeoutError, TurnStreamEndedError } from '../../src/lib/turn-retry.js'; + +function toolCallResponse(): models.OpenResponsesResult { + return { + id: 'resp_tool_call', + object: 'response', + createdAt: 0, + model: 'test-model', + status: 'completed', + completedAt: 0, + output: [ + { + type: 'function_call', + id: 'fc_1', + callId: 'call_abc', + name: 'get_weather', + arguments: '{"location":"Tokyo"}', + status: 'completed', + }, + ], + error: null, + incompleteDetails: null, + temperature: null, + topP: null, + presencePenalty: null, + frequencyPenalty: null, + metadata: null, + instructions: null, + tools: [], + toolChoice: 'auto', + parallelToolCalls: false, + } as models.OpenResponsesResult; +} + +function textResponse(text: string): models.OpenResponsesResult { + return { + id: 'resp_text', + object: 'response', + createdAt: 0, + model: 'test-model', + status: 'completed', + completedAt: 0, + output: [ + { + id: 'msg_text', + type: 'message', + role: 'assistant', + status: 'completed', + content: 
[ + { + type: 'output_text', + text, + annotations: [], + }, + ], + }, + ], + error: null, + incompleteDetails: null, + temperature: null, + topP: null, + presencePenalty: null, + frequencyPenalty: null, + metadata: null, + instructions: null, + tools: [], + toolChoice: 'auto', + parallelToolCalls: false, + } as models.OpenResponsesResult; +} + +/** Stream that emits a completed event for the given response, then closes. */ +function completedStream(response: models.OpenResponsesResult): ReadableStream { + return new ReadableStream({ + start(controller) { + controller.enqueue({ + type: 'response.completed', + response, + }); + controller.close(); + }, + }); +} + +/** + * Stream that emits some deltas then closes WITHOUT a terminal event — the + * signature of an upstream stream dying mid-flight. + */ +function deadStream(): ReadableStream { + return new ReadableStream({ + start(controller) { + controller.enqueue({ + type: 'response.output_text.delta', + delta: 'partial...', + }); + controller.close(); + }, + }); +} + +/** Stream that emits one delta then never produces anything again (hang). */ +function hangingStream(): ReadableStream { + return new ReadableStream({ + start(controller) { + controller.enqueue({ + type: 'response.output_text.delta', + delta: 'and then silence', + }); + // Never closes, never errors — a silently-hung upstream connection. + }, + }); +} + +/** Stream whose transport errors mid-read. 
*/ +function erroringStream(): ReadableStream { + return new ReadableStream({ + start(controller) { + controller.error(new Error('network drop mid-stream')); + }, + }); +} + +const weatherTool = { + type: ToolType.Function, + function: { + name: 'get_weather', + description: 'Get the weather for a location.', + inputSchema: z.object({ + location: z.string(), + }), + outputSchema: z.object({ + temperature: z.number(), + }), + execute: vi.fn(async (_params: { location: string }) => ({ + temperature: 22, + })), + }, +} as const; + +const client = {} as OpenRouterCore; + +function ok(value: unknown) { + return { + ok: true, + value, + }; +} + +describe('retryTurn: follow-up turn retry', () => { + beforeEach(() => { + mockBetaResponsesSend.mockReset(); + weatherTool.function.execute.mockClear(); + }); + + it('retries a dead follow-up stream with the accumulated conversation intact', async () => { + mockBetaResponsesSend + // Turn 0: model asks for the tool + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + // Turn 1, attempt 0: stream dies without a terminal event + .mockResolvedValueOnce(ok(deadStream())) + // Turn 1, attempt 1 (retry): success + .mockResolvedValueOnce(ok(completedStream(textResponse('Sunny, 22C')))); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: { + limit: 2, + }, + }); + + const text = await result.getText(); + + expect(text).toBe('Sunny, 22C'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(3); + // The tool executed exactly once — retry re-sends the request, it does + // NOT re-execute tools. + expect(weatherTool.function.execute).toHaveBeenCalledTimes(1); + + // The retried request is byte-identical to the failed one: same + // accumulated conversation including the function_call_output. 
+ const attempt0Input = mockBetaResponsesSend.mock.calls[1]?.[1]?.responsesRequest?.input; + const attempt1Input = mockBetaResponsesSend.mock.calls[2]?.[1]?.responsesRequest?.input; + expect(attempt1Input).toEqual(attempt0Input); + const hasToolOutput = ( + attempt1Input as Array<{ + type?: string; + }> + ).some((item) => item.type === 'function_call_output'); + expect(hasToolOutput).toBe(true); + }); + + it('does not retry when retryTurn is not configured', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(deadStream())); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + }); + + await expect(result.getText()).rejects.toThrow('Stream ended without completion event'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(2); + }); + + it('gives up after the retry limit is exhausted', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + // Turn 1: initial attempt + 2 retries, all dead + .mockResolvedValueOnce(ok(deadStream())) + .mockResolvedValueOnce(ok(deadStream())) + .mockResolvedValueOnce(ok(deadStream())); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: { + limit: 2, + }, + }); + + await expect(result.getText()).rejects.toThrow(TurnStreamEndedError); + // 1 (turn 0) + 3 (turn 1 attempts) + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(4); + }); + + it('respects a custom isRetryable that declines the retry', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(deadStream())); + + const isRetryable = vi.fn(() => false); + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, 
+ ] as const, + retryTurn: { + limit: 2, + isRetryable, + }, + }); + + await expect(result.getText()).rejects.toThrow(TurnStreamEndedError); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(2); + expect(isRetryable).toHaveBeenCalledTimes(1); + const [error, context] = isRetryable.mock.calls[0] as unknown as [ + Error, + { + turnNumber: number; + attempt: number; + }, + ]; + expect(error).toBeInstanceOf(TurnStreamEndedError); + expect(context.turnNumber).toBe(1); + expect(context.attempt).toBe(1); + }); + + it('retries transport errors that surface mid-stream', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(erroringStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Recovered')))); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: {}, + }); + + await expect(result.getText()).resolves.toBe('Recovered'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(3); + }); + + it('invokes the backoff function with the attempt number', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(deadStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Done')))); + + const backoffMs = vi.fn(() => 0); + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: { + backoffMs, + }, + }); + + await result.getText(); + expect(backoffMs).toHaveBeenCalledWith(1); + }); +}); + +describe('retryTurn: idle timeout (hung streams)', () => { + beforeEach(() => { + mockBetaResponsesSend.mockReset(); + weatherTool.function.execute.mockClear(); + }); + + it('converts a silently-hung follow-up stream into a retried turn', async () => { + mockBetaResponsesSend + 
.mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(hangingStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Recovered after hang')))); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: { + limit: 1, + idleTimeoutMs: 50, + }, + }); + + await expect(result.getText()).resolves.toBe('Recovered after hang'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(3); + }); + + it('fails with TurnIdleTimeoutError when retries are exhausted on hangs', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(hangingStream())) + .mockResolvedValueOnce(ok(hangingStream())); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: { + limit: 1, + idleTimeoutMs: 50, + }, + }); + + const error = await result.getText().then( + () => null, + (e: unknown) => e, + ); + expect(error).toBeInstanceOf(TurnIdleTimeoutError); + expect((error as TurnIdleTimeoutError).idleTimeoutMs).toBe(50); + }); +}); + +describe('retryTurn: initial turn (turn 0)', () => { + beforeEach(() => { + mockBetaResponsesSend.mockReset(); + weatherTool.function.execute.mockClear(); + }); + + it('re-issues the initial request when the turn-0 stream dies', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(deadStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Hello!')))); + + const result = callModel(client, { + model: 'test-model', + input: 'Hi', + tools: [ + weatherTool, + ] as const, + retryTurn: {}, + }); + + await expect(result.getText()).resolves.toBe('Hello!'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(2); + + // Retried request carries the same resolved input. 
+ const firstInput = mockBetaResponsesSend.mock.calls[0]?.[1]?.responsesRequest?.input; + const retryInput = mockBetaResponsesSend.mock.calls[1]?.[1]?.responsesRequest?.input; + expect(retryInput).toEqual(firstInput); + }); + + it('retries a hung turn-0 stream via the idle timeout', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(hangingStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Recovered')))); + + const result = callModel(client, { + model: 'test-model', + input: 'Hi', + tools: [ + weatherTool, + ] as const, + retryTurn: { + idleTimeoutMs: 50, + }, + }); + + await expect(result.getText()).resolves.toBe('Recovered'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(2); + }); + + it('retries send-phase failures (HTTP 5xx) before any stream exists', async () => { + const serverError = Object.assign(new Error('Internal Server Error'), { + statusCode: 500, + }); + mockBetaResponsesSend + .mockResolvedValueOnce({ + ok: false, + error: serverError, + }) + .mockResolvedValueOnce(ok(completedStream(textResponse('Recovered')))); + + const result = callModel(client, { + model: 'test-model', + input: 'Hi', + tools: [ + weatherTool, + ] as const, + retryTurn: {}, + }); + + await expect(result.getText()).resolves.toBe('Recovered'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(2); + }); + + it('does not retry non-retryable HTTP errors (400) by default', async () => { + const badRequest = Object.assign(new Error('Bad Request'), { + statusCode: 400, + }); + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: false, + error: badRequest, + }); + + const result = callModel(client, { + model: 'test-model', + input: 'Hi', + tools: [ + weatherTool, + ] as const, + retryTurn: {}, + }); + + await expect(result.getText()).rejects.toThrow('Bad Request'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(1); + }); + + it('does not retry a response.failed terminal event by default', async () => { + const failedStream = new 
ReadableStream({ + start(controller) { + controller.enqueue({ + type: 'response.failed', + response: { + ...textResponse(''), + status: 'failed', + error: { + code: 'refusal', + message: 'Model refused', + }, + }, + }); + controller.close(); + }, + }); + mockBetaResponsesSend.mockResolvedValueOnce(ok(failedStream)); + + const result = callModel(client, { + model: 'test-model', + input: 'Hi', + tools: [ + weatherTool, + ] as const, + retryTurn: { + limit: 3, + }, + }); + + await expect(result.getText()).rejects.toThrow('Response failed'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(1); + }); +}); + +describe('retryTurn: turn.retry events on getFullResponsesStream', () => { + beforeEach(() => { + mockBetaResponsesSend.mockReset(); + weatherTool.function.execute.mockClear(); + }); + + it('emits turn.retry between turn.start and turn.end for a retried follow-up', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(deadStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Sunny')))); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: {}, + }); + + const events: Array<{ + type: string; + turnNumber?: number; + attempt?: number; + }> = []; + for await (const event of result.getFullResponsesStream()) { + if ('type' in event) { + events.push( + event as { + type: string; + turnNumber?: number; + attempt?: number; + }, + ); + } + } + + const retryEvents = events.filter((e) => e.type === 'turn.retry'); + expect(retryEvents).toHaveLength(1); + expect(retryEvents[0]).toMatchObject({ + turnNumber: 1, + attempt: 1, + }); + + // Exactly one turn.start and one turn.end for turn 1 — the retry does + // not duplicate the turn delimiters. 
+ const turn1Starts = events.filter((e) => e.type === 'turn.start' && e.turnNumber === 1); + const turn1Ends = events.filter((e) => e.type === 'turn.end' && e.turnNumber === 1); + expect(turn1Starts).toHaveLength(1); + expect(turn1Ends).toHaveLength(1); + + // Ordering: start < retry < end + const startIdx = events.findIndex((e) => e.type === 'turn.start' && e.turnNumber === 1); + const retryIdx = events.findIndex((e) => e.type === 'turn.retry'); + const endIdx = events.findIndex((e) => e.type === 'turn.end' && e.turnNumber === 1); + expect(startIdx).toBeLessThan(retryIdx); + expect(retryIdx).toBeLessThan(endIdx); + }); + + it('recovers a dead turn-0 stream without poisoning stream consumers', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(deadStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Hello!')))); + + const result = callModel(client, { + model: 'test-model', + input: 'Hi', + tools: [ + weatherTool, + ] as const, + retryTurn: {}, + }); + + const events: Array<{ + type: string; + turnNumber?: number; + }> = []; + for await (const event of result.getFullResponsesStream()) { + if ('type' in event) { + events.push( + event as { + type: string; + turnNumber?: number; + }, + ); + } + } + + const retryEvents = events.filter((e) => e.type === 'turn.retry'); + expect(retryEvents).toHaveLength(1); + + const turn0Ends = events.filter((e) => e.type === 'turn.end' && e.turnNumber === 0); + expect(turn0Ends).toHaveLength(1); + + // The stream completed cleanly and the text is retrievable. + await expect(result.getText()).resolves.toBe('Hello!'); + }); +});