diff --git a/.changeset/turn-level-retry.md b/.changeset/turn-level-retry.md new file mode 100644 index 0000000..61fd21e --- /dev/null +++ b/.changeset/turn-level-retry.md @@ -0,0 +1,16 @@ +--- +'@openrouter/agent': minor +--- + +Add turn-level retry and hang detection to the callModel tool loop, plus a tool-level retry helper. + +**`retryTurn` option on `callModel`** — when a turn (one provider request + stream consumption) fails, the turn is re-sent with the full accumulated conversation intact instead of aborting the whole loop. Tool results gathered in prior turns are never discarded and tools are not re-executed. Covers all turn sites: the initial request (send and consume phases), follow-up requests after tool execution, the forced final response, and state resume. + +- `limit` — max retries per turn (default 2) +- `idleTimeoutMs` — converts silently-hung streams (no events, no terminal frame, connection left open) into retryable `TurnIdleTimeoutError` failures; the hung connection is cancelled +- `isRetryable` — custom retryability policy; the default (`defaultIsTurnRetryable`) retries idle timeouts, streams that ended without a terminal event, network errors, and HTTP 408/429/5xx, and does not retry `response.failed` terminal events (e.g. refusals) or other 4xx +- `backoffMs` — fixed or per-attempt delay between retries + +Mid-turn retries emit a new `turn.retry` event on `getFullResponsesStream()` (`isTurnRetryEvent` guard exported); events already received for that turn should be treated as void since the retried attempt re-streams the turn from the start. Failure classification is now typed: `TurnIdleTimeoutError`, `TurnStreamEndedError`, `TurnResponseFailedError` (messages unchanged). + +**`withToolRetry(tool, options)`** — wrap a tool so its `execute` function is automatically re-run when it throws (transient network failures inside tools no longer burn a model round trip). Supports regular and generator tools, preserves tool typing and type-guard classification, with `limit`, `backoffMs`, `isRetryable`, and `onRetry` observability hook. Only wrap idempotent tools. diff --git a/packages/agent/src/index.ts b/packages/agent/src/index.ts index 4540e07..716998c 100644 --- a/packages/agent/src/index.ts +++ b/packages/agent/src/index.ts @@ -133,6 +133,9 @@ export type { ContextInput } from './lib/tool-context.js'; export { buildToolExecuteContext, ToolContextStore } from './lib/tool-context.js'; // Real-time tool event broadcasting export { ToolEventBroadcaster } from './lib/tool-event-broadcaster.js'; +// Tool-level retry +export type { ToolRetryContext, ToolRetryOptions } from './lib/tool-retry.js'; +export { withToolRetry } from './lib/tool-retry.js'; export type { ChatStreamEvent, ClientTool, @@ -178,6 +181,7 @@ export type { ToolWithGenerator, TurnContext, TurnEndEvent, + TurnRetryEvent, TurnStartEvent, TypedToolCall, TypedToolCallUnion, @@ -198,11 +202,20 @@ export { isToolPreliminaryResultEvent, isToolResultEvent, isTurnEndEvent, + isTurnRetryEvent, isTurnStartEvent, ToolType, toolHasApprovalConfigured, } from './lib/tool-types.js'; // Turn context helpers export { buildTurnContext, normalizeInputToArray } from './lib/turn-context.js'; +// Turn-level retry +export type { RetryTurnOptions, TurnRetryContext } from './lib/turn-retry.js'; +export { + defaultIsTurnRetryable, + TurnIdleTimeoutError, + TurnResponseFailedError, + TurnStreamEndedError, +} from './lib/turn-retry.js'; export type { Hook, OpenRouterOptions, SDKOptions } from './openrouter.js'; export { OpenRouter } from './openrouter.js'; diff --git a/packages/agent/src/inner-loop/call-model.ts b/packages/agent/src/inner-loop/call-model.ts index dc42df9..f862eed 100644 --- a/packages/agent/src/inner-loop/call-model.ts +++ b/packages/agent/src/inner-loop/call-model.ts @@ -101,6 +101,7 @@ export function callModel< onTurnStart, onTurnEnd, allowFinalResponse, + retryTurn, ...apiRequest } = request; @@ -165,5 +166,8 @@ export function callModel< ...(allowFinalResponse !== undefined && { allowFinalResponse, }), + ...(retryTurn !== undefined && { + retryTurn, + }), } as GetResponseOptions); } diff --git a/packages/agent/src/lib/async-params.ts b/packages/agent/src/lib/async-params.ts index 18ba08d..b1b352a 100644 --- a/packages/agent/src/lib/async-params.ts +++ b/packages/agent/src/lib/async-params.ts @@ -10,6 +10,7 @@ import type { ToolContextMapWithShared, TurnContext, } from './tool-types.js'; +import type { RetryTurnOptions } from './turn-retry.js'; // Re-export Tool type for convenience export type { Tool } from './tool-types.js'; @@ -98,6 +99,22 @@ type BaseCallModelInput< * (HITL pause, approval pause, interruption, or natural completion). */ allowFinalResponse?: boolean | string; + /** + * Retry a failed turn (one provider request + stream consumption) instead + * of aborting the whole tool loop. The accumulated conversation — + * including all tool results gathered in prior turns — is preserved + * across retries, so a single dead stream no longer discards the loop's + * gathered context. + * + * `idleTimeoutMs` additionally converts silently-hung streams (no events, + * no terminal frame, connection left open) into retryable failures. + * + * Mid-turn retries emit a `turn.retry` event on + * `getFullResponsesStream()`; events already received for that turn + * should be treated as void since the retried attempt re-streams the + * turn from the start. + */ + retryTurn?: RetryTurnOptions; }; /** @@ -199,6 +216,7 @@ export async function resolveAsyncFunctions { @@ -308,63 +329,92 @@ export class ModelResult< } satisfies TurnStartEvent); const consumer = stream.createConsumer(); + let sawTerminalEvent = false; for await (const event of consumer) { broadcaster.push(event); + if (isResponseCompletedEvent(event) || isResponseIncompleteEvent(event)) { + sawTerminalEvent = true; + } } - broadcaster.push({ - type: 'turn.end', - turnNumber: 0, - timestamp: Date.now(), - } satisfies TurnEndEvent); + // With turn retry enabled, a stream that ended without a terminal + // event is a failed attempt that the loop path will retry — the + // retried attempt (pipeAndConsumeStream) owns turn.end for turn 0. + if (!retryEnabled || sawTerminalEvent) { + broadcaster.push({ + type: 'turn.end', + turnNumber: 0, + timestamp: Date.now(), + } satisfies TurnEndEvent); + } })().catch((error) => { - broadcaster.complete(error instanceof Error ? error : new Error(String(error))); + // With turn retry enabled the loop path owns turn-0 failures (it will + // retry or reject); completing the broadcaster here would poison every + // stream consumer before the retry gets a chance. + if (!retryEnabled) { + broadcaster.complete(error instanceof Error ? error : new Error(String(error))); + } }); } /** * Pipe a follow-up stream into the turn broadcaster and capture the completed response. * Emits turn.start / turn.end delimiters around the stream events. + * + * Retried attempts pass `emitTurnStart: false` — the `turn.retry` event + * (pushed by the retry driver) marks the restart instead of a duplicate + * `turn.start`. */ private async pipeAndConsumeStream( stream: ReusableReadableStream, turnNumber: number, + opts?: { + emitTurnStart?: boolean; + }, ): Promise { const broadcaster = this.turnBroadcaster!; - broadcaster.push({ - type: 'turn.start', - turnNumber, - timestamp: Date.now(), - } satisfies TurnStartEvent); + if (opts?.emitTurnStart !== false) { + broadcaster.push({ + type: 'turn.start', + turnNumber, + timestamp: Date.now(), + } satisfies TurnStartEvent); + } const consumer = stream.createConsumer(); + const idleTimeoutMs = this.options.retryTurn?.idleTimeoutMs; + const events = iterateWithIdleTimeout( + consumer, + idleTimeoutMs, + () => new TurnIdleTimeoutError(turnNumber, idleTimeoutMs ?? 0), + ); let completedResponse: models.OpenResponsesResult | null = null; - for await (const event of consumer) { + for await (const event of events) { broadcaster.push(event); if (isResponseCompletedEvent(event)) { completedResponse = event.response; } if (isResponseFailedEvent(event)) { const errorMsg = 'message' in event ? String(event.message) : 'Response failed'; - throw new Error(errorMsg); + throw new TurnResponseFailedError(errorMsg); } if (isResponseIncompleteEvent(event)) { completedResponse = event.response; } } + if (!completedResponse) { + throw new TurnStreamEndedError('Follow-up stream ended without a completed response'); + } + broadcaster.push({ type: 'turn.end', turnNumber, timestamp: Date.now(), } satisfies TurnEndEvent); - if (!completedResponse) { - throw new Error('Follow-up stream ended without a completed response'); - } - return completedResponse; } @@ -477,6 +527,198 @@ export class ModelResult< throw new Error('Neither stream nor response initialized'); } + // ========================================================================= + // Turn-level retry + // ========================================================================= + + /** + * Run a turn attempt with retry per the `retryTurn` option. `attemptFn` + * receives the attempt number (0 = initial attempt) and must be safe to + * call repeatedly — all conversation-state mutation happens outside it. + * Each retry pushes a `turn.retry` event to the turn broadcaster (when one + * exists) before re-attempting. + */ + private async runTurnAttempts( + turnNumber: number, + attemptFn: (attempt: number) => Promise, + ): Promise { + const config = this.options.retryTurn; + const limit = config ? (config.limit ?? 2) : 0; + let attempt = 0; + + while (true) { + try { + return await attemptFn(attempt); + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + if (!config || attempt >= limit) { + throw err; + } + + const retryContext = { + turnNumber, + attempt: attempt + 1, + }; + const retryable = config.isRetryable + ? await config.isRetryable(err, retryContext) + : defaultIsTurnRetryable(err); + if (!retryable) { + throw err; + } + + attempt++; + this.turnBroadcaster?.push({ + type: 'turn.retry', + turnNumber, + attempt, + error: err.message, + timestamp: Date.now(), + } satisfies TurnRetryEvent); + + const backoff = resolveBackoffMs(config.backoffMs, attempt); + if (backoff > 0) { + await sleep(backoff); + } + } + } + } + + /** + * Send a turn request and consume its stream to a completed response. + * This is the idempotent retry unit: it performs no conversation-state + * mutation, so a failed call can be re-issued as-is. On failure the + * stream is cancelled to release the underlying connection. + */ + private async sendAndConsumeTurn( + request: models.ResponsesRequest, + turnNumber: number, + attempt: number, + ): Promise { + const result = await betaResponsesSend( + this.options.client, + { + responsesRequest: request, + }, + this.options.options, + ); + + if (!result.ok) { + throw result.error; + } + + const value = result.value; + if (isEventStream(value)) { + const stream = new ReusableReadableStream(value); + try { + if (this.turnBroadcaster) { + return await this.pipeAndConsumeStream(stream, turnNumber, { + emitTurnStart: attempt === 0, + }); + } + return await consumeStreamForCompletion(stream, { + idleTimeoutMs: this.options.retryTurn?.idleTimeoutMs, + turnNumber, + }); + } catch (error) { + // Abort the (possibly hung) upstream connection before retrying. + void stream.cancel().catch(() => {}); + throw error; + } + } + if (this.isNonStreamingResponse(value)) { + return value; + } + throw new Error('Unexpected response type from API'); + } + + /** + * Consume the initial (turn 0) response with turn-level retry. A retry + * re-issues the already-resolved initial request — the resolved input and + * instructions are unchanged — and replaces `this.reusableStream` so + * late-attaching consumers replay the successful attempt. When the turn + * broadcaster is active, the retried attempt is piped through + * `pipeAndConsumeStream` so stream consumers receive the fresh events + * (the failed initial pipe intentionally skips its `turn.end`). + */ + private async getInitialResponseWithRetry(): Promise { + if (!this.options.retryTurn) { + return this.getInitialResponse(); + } + + return this.runTurnAttempts(0, async (attempt) => { + if (attempt === 0) { + if (this.finalResponse) { + return this.finalResponse; + } + const stream = this.reusableStream; + if (!stream) { + throw new Error('Neither stream nor response initialized'); + } + try { + return await consumeStreamForCompletion(stream, { + idleTimeoutMs: this.options.retryTurn?.idleTimeoutMs, + turnNumber: 0, + }); + } catch (error) { + void stream.cancel().catch(() => {}); + throw error; + } + } + + // Retry attempt: re-issue the resolved initial request. + if (!this.resolvedRequest) { + throw new Error('Request not initialized'); + } + const request: models.ResponsesRequest = { + ...this.resolvedRequest, + stream: true, + }; + + const result = await betaResponsesSend( + this.options.client, + { + responsesRequest: request, + }, + this.options.options, + ); + if (!result.ok) { + throw result.error; + } + + const value = result.value; + if (isEventStream(value)) { + const stream = new ReusableReadableStream(value); + this.reusableStream = stream; + try { + if (this.turnBroadcaster && this.initialStreamPipeStarted) { + return await this.pipeAndConsumeStream(stream, 0, { + emitTurnStart: false, + }); + } + return await consumeStreamForCompletion(stream, { + idleTimeoutMs: this.options.retryTurn?.idleTimeoutMs, + turnNumber: 0, + }); + } catch (error) { + void stream.cancel().catch(() => {}); + throw error; + } + } + if (this.isNonStreamingResponse(value)) { + this.finalResponse = value; + if (this.turnBroadcaster && this.initialStreamPipeStarted) { + this.turnBroadcaster.push({ + type: 'turn.end', + turnNumber: 0, + timestamp: Date.now(), + } satisfies TurnEndEvent); + } + return value; + } + throw new Error('Unexpected response type from API'); + }); + } + /** * Save response output to state. * Appends the response output to the message history and records the response ID. @@ -1133,33 +1375,11 @@ export class ModelResult< stream: true, }; - const newResult = await betaResponsesSend( - this.options.client, - { - responsesRequest: newRequest, - }, - this.options.options, + // The conversation state for this turn is fully captured in `newRequest` + // above — send + consume is the idempotent retry unit. + return this.runTurnAttempts(turnNumber, (attempt) => + this.sendAndConsumeTurn(newRequest, turnNumber, attempt), ); - - if (!newResult.ok) { - throw newResult.error; - } - - // Handle streaming or non-streaming response - const value = newResult.value; - if (isEventStream(value)) { - const followUpStream = new ReusableReadableStream(value); - - if (this.turnBroadcaster) { - return this.pipeAndConsumeStream(followUpStream, turnNumber); - } - - return consumeStreamForCompletion(followUpStream); - } - if (this.isNonStreamingResponse(value)) { - return value; - } - throw new Error('Unexpected response type from API'); } /** @@ -1224,30 +1444,11 @@ export class ModelResult< stream: true, }; - const result = await betaResponsesSend( - this.options.client, - { - responsesRequest: finalRequest, - }, - this.options.options, + // `finalRequest` carries the full conversation — send + consume is the + // idempotent retry unit. + return this.runTurnAttempts(turnNumber, (attempt) => + this.sendAndConsumeTurn(finalRequest, turnNumber, attempt), ); - - if (!result.ok) { - throw result.error; - } - - const value = result.value; - if (isEventStream(value)) { - const stream = new ReusableReadableStream(value); - if (this.turnBroadcaster) { - return this.pipeAndConsumeStream(stream, turnNumber); - } - return consumeStreamForCompletion(stream); - } - if (this.isNonStreamingResponse(value)) { - return value; - } - throw new Error('Unexpected response type from API'); } /** @@ -1579,18 +1780,22 @@ export class ModelResult< // Force stream mode for initial request const request = this.resolvedRequest; - // Make the API request - const apiResult = await betaResponsesSend( - this.options.client, - { - responsesRequest: request, - }, - this.options.options, - ); - - if (!apiResult.ok) { - throw apiResult.error; - } + // Make the API request. Send-phase failures (the request never + // produced a stream) are retried per `retryTurn`; nothing has been + // exposed to consumers yet, so the retry is invisible. + const apiResult = await this.runTurnAttempts(0, async () => { + const result = await betaResponsesSend( + this.options.client, + { + responsesRequest: request, + }, + this.options.options, + ); + if (!result.ok) { + throw result.error; + } + return result; + }); // Stash fresh user items so saveResponseToState can persist them // atomically with the assistant output. Writing them here would leave @@ -1805,18 +2010,20 @@ export class ModelResult< this.resolvedRequest = request; - // Make the API request - const apiResult = await betaResponsesSend( - this.options.client, - { - responsesRequest: request, - }, - this.options.options, - ); - - if (!apiResult.ok) { - throw apiResult.error; - } + // Make the API request (send-phase failures retried per `retryTurn`) + const apiResult = await this.runTurnAttempts(turnContext.numberOfTurns, async () => { + const result = await betaResponsesSend( + this.options.client, + { + responsesRequest: request, + }, + this.options.options, + ); + if (!result.ok) { + throw result.error; + } + return result; + }); // Handle both streaming and non-streaming responses if (isEventStream(apiResult.value)) { @@ -1852,8 +2059,8 @@ export class ModelResult< return; } - // Get initial response - let currentResponse = await this.getInitialResponse(); + // Get initial response (with turn-level retry when configured) + let currentResponse = await this.getInitialResponseWithRetry(); // Save initial response to state await this.saveResponseToState(currentResponse); diff --git a/packages/agent/src/lib/stream-transformers.ts b/packages/agent/src/lib/stream-transformers.ts index b35b56b..6a7e6b4 100644 --- a/packages/agent/src/lib/stream-transformers.ts +++ b/packages/agent/src/lib/stream-transformers.ts @@ -31,6 +31,12 @@ import { isWebSearchCallOutputItem, } from './stream-type-guards.js'; import type { ClientTool, ParsedToolCall, ServerTool, Tool } from './tool-types.js'; +import { + iterateWithIdleTimeout, + TurnIdleTimeoutError, + TurnResponseFailedError, + TurnStreamEndedError, +} from './turn-retry.js'; /** * Extract text deltas from responses stream events @@ -630,14 +636,29 @@ export async function* buildMessageStream( } /** - * Consume stream until completion and return the complete response + * Consume stream until completion and return the complete response. + * + * When `idleTimeoutMs` is set, a gap of more than that many milliseconds + * between events fails consumption with a retryable `TurnIdleTimeoutError` + * (the caller is responsible for cancelling the underlying stream). */ export async function consumeStreamForCompletion( stream: ReusableReadableStream, + options?: { + idleTimeoutMs?: number | undefined; + turnNumber?: number | undefined; + }, ): Promise { const consumer = stream.createConsumer(); + const turnNumber = options?.turnNumber ?? 0; - for await (const event of consumer) { + const events = iterateWithIdleTimeout( + consumer, + options?.idleTimeoutMs, + () => new TurnIdleTimeoutError(turnNumber, options?.idleTimeoutMs ?? 0), + ); + + for await (const event of events) { if (!('type' in event)) { continue; } @@ -648,7 +669,7 @@ export async function consumeStreamForCompletion( if (isResponseFailedEvent(event)) { // The failed event contains the full response with error information - throw new Error(`Response failed: ${JSON.stringify(event.response.error)}`); + throw new TurnResponseFailedError(`Response failed: ${JSON.stringify(event.response.error)}`); } if (isResponseIncompleteEvent(event)) { @@ -657,7 +678,7 @@ export async function consumeStreamForCompletion( } } - throw new Error('Stream ended without completion event'); + throw new TurnStreamEndedError('Stream ended without completion event'); } /** diff --git a/packages/agent/src/lib/tool-retry.ts b/packages/agent/src/lib/tool-retry.ts new file mode 100644 index 0000000..a62ee98 --- /dev/null +++ b/packages/agent/src/lib/tool-retry.ts @@ -0,0 +1,181 @@ +/** + * Tool-level retry: wrap a tool so its `execute` function is automatically + * re-run when it throws. This absorbs transient failures (e.g. a flaky + * network call inside the tool) before they reach the model — without the + * wrapper, a throwing tool is reported to the model as an error result, + * which burns a full provider round trip while the model decides to re-try. + * + * The SDK stays unopinionated about tool errors by default; retry is the + * tool author's policy, opted into per tool: + * + * ```typescript + * const fetchTool = withToolRetry( + * tool({ + * name: 'web_fetch', + * // ... + * execute: async ({ url }) => fetchPage(url), + * }), + * { limit: 2, onRetry: ({ toolName, attempt, error }) => log(...) }, + * ); + * ``` + * + * Notes: + * - Only wrap tools whose `execute` is safe to re-run (idempotent or + * side-effect free). Do not wrap tools like `send_email`. + * - Generator tools are re-run from the start on retry; preliminary results + * already emitted by the failed attempt will have been forwarded to + * consumers and may repeat. + * - HITL `onToolCalled` hooks and manual tools are not retried; the tool is + * returned unchanged if it has no `execute` function. + */ + +import type { Tool } from './tool-types.js'; +import { resolveBackoffMs, sleep } from './turn-retry.js'; + +/** + * Context passed to `isRetryable` / `onRetry` when a tool execution fails. + */ +export interface ToolRetryContext { + /** Name of the tool whose execution failed. */ + toolName: string; + /** The retry attempt about to be made (1 = first retry). */ + attempt: number; + /** The error thrown by the failed attempt. */ + error: unknown; +} + +/** + * Options for `withToolRetry`. + */ +export interface ToolRetryOptions { + /** + * Maximum number of retries (not counting the initial attempt). + * Default: 2. + */ + limit?: number; + /** + * Delay before each retry attempt, in milliseconds. A function receives + * the attempt number (1 = first retry). Default: 0. + */ + backoffMs?: number | ((attempt: number) => number); + /** + * Decide whether a thrown error should be retried. Default: retry all + * errors (the wrapper only sees `execute` throws; input validation + * happens before it and is never retried). + */ + isRetryable?: (context: ToolRetryContext) => boolean | Promise; + /** + * Observability hook invoked before each retry attempt. + */ + onRetry?: (context: ToolRetryContext) => void | Promise; +} + +/** + * Decide whether a failure should be retried and run the retry hooks. + * Returns true when the caller should re-attempt. + */ +async function shouldRetry( + options: ToolRetryOptions | undefined, + context: ToolRetryContext, +): Promise { + const limit = options?.limit ?? 2; + if (context.attempt > limit) { + return false; + } + if (options?.isRetryable && !(await options.isRetryable(context))) { + return false; + } + await options?.onRetry?.(context); + const backoff = resolveBackoffMs(options?.backoffMs, context.attempt); + if (backoff > 0) { + await sleep(backoff); + } + return true; +} + +/** + * Wrap a tool so its `execute` function is automatically re-run (up to + * `limit` times) when it throws. Regular async tools and generator tools + * are both supported; tools without an `execute` function are returned + * unchanged. The tool's type — including its schemas and inferred + * input/output types — is preserved. + */ +export function withToolRetry(tool: TTool, options?: ToolRetryOptions): TTool { + const fn = ( + tool as { + function?: { + name?: string; + execute?: unknown; + }; + } + ).function; + if (!fn || typeof fn.execute !== 'function') { + return tool; + } + + const toolName = fn.name ?? 'unknown'; + const originalExecute = fn.execute as (...args: unknown[]) => unknown; + // Match the SDK's own discriminator (isGeneratorTool): a tool is a + // generator tool iff its function declares an eventSchema. The executor + // drives generator tools via iterator.next(), so the wrapper must also be + // an async generator in that case. + const isGenerator = 'eventSchema' in fn; + + let wrappedExecute: (...args: unknown[]) => unknown; + + if (isGenerator) { + // Generator tool: a failed attempt is re-run from the start. Yields from + // the failed attempt have already been forwarded. + wrappedExecute = async function* retryingGeneratorExecute(...args: unknown[]) { + let attempt = 0; + while (true) { + try { + const iterator = originalExecute(...args) as AsyncGenerator; + let step = await iterator.next(); + while (!step.done) { + yield step.value; + step = await iterator.next(); + } + return step.value; + } catch (error) { + attempt++; + const retry = await shouldRetry(options, { + toolName, + attempt, + error, + }); + if (!retry) { + throw error; + } + } + } + }; + } else { + wrappedExecute = async function retryingExecute(...args: unknown[]) { + let attempt = 0; + while (true) { + try { + return await originalExecute(...args); + } catch (error) { + attempt++; + const retry = await shouldRetry(options, { + toolName, + attempt, + error, + }); + if (!retry) { + throw error; + } + } + } + }; + } + + return { + ...tool, + function: { + ...fn, + execute: wrappedExecute, + }, + } as TTool; +} diff --git a/packages/agent/src/lib/tool-types.ts b/packages/agent/src/lib/tool-types.ts index e768358..3622039 100644 --- a/packages/agent/src/lib/tool-types.ts +++ b/packages/agent/src/lib/tool-types.ts @@ -811,6 +811,24 @@ export type TurnEndEvent = { timestamp: number; }; +/** + * Turn retry event emitted when a turn failed and is about to be re-sent + * (requires the `retryTurn` option). Events already emitted for this turn + * before the failure should be treated as void — the retried attempt + * re-streams the turn from the start. For send-phase failures (the request + * never produced a stream) a `turn.retry` may precede the turn's + * `turn.start`. + */ +export type TurnRetryEvent = { + type: 'turn.retry'; + turnNumber: number; + /** The retry attempt about to be made (1 = first retry). */ + attempt: number; + /** Message of the error that caused the retry. */ + error: string; + timestamp: number; +}; + /** * Enhanced stream event types for getFullResponsesStream * Extends StreamEvents with tool preliminary results, tool results, @@ -824,7 +842,8 @@ export type ResponseStreamEvent = | ToolResultEvent | ToolCallOutputEvent | TurnStartEvent - | TurnEndEvent; + | TurnEndEvent + | TurnRetryEvent; /** * Type guard to check if an event is a tool preliminary result event @@ -865,6 +884,13 @@ export function isTurnEndEvent(event: ResponseStreamEvent): event is TurnEndEven return event.type === 'turn.end'; } +/** + * Type guard to check if an event is a turn retry event + */ +export function isTurnRetryEvent(event: ResponseStreamEvent): event is TurnRetryEvent { + return event.type === 'turn.retry'; +} + /** * Tool stream event types for getToolStream * Includes both argument deltas and preliminary results diff --git a/packages/agent/src/lib/turn-retry.ts b/packages/agent/src/lib/turn-retry.ts new file mode 100644 index 0000000..d27ed07 --- /dev/null +++ b/packages/agent/src/lib/turn-retry.ts @@ -0,0 +1,217 @@ +/** + * Turn-level retry support for the callModel tool loop. + * + * A "turn" is one provider request + stream consumption. The conversation + * state accumulated across turns (tool results, prior outputs) lives on the + * ModelResult instance, so a failed turn can be re-sent without losing any + * gathered context. This module holds the public option types, the typed + * errors that classify turn failures, the default retryability policy, and + * the idle-timeout iterator that converts silently-hung streams into + * retryable failures. + */ + +/** + * Context passed to `isRetryable` when deciding whether to retry a failed turn. + */ +export interface TurnRetryContext { + /** The turn that failed (0 = initial request). */ + turnNumber: number; + /** The retry attempt about to be made (1 = first retry). */ + attempt: number; +} + +/** + * Options for turn-level retry in the callModel tool loop. + * + * When a turn's provider request or stream fails, the turn is re-sent with + * the full accumulated conversation intact — tool results gathered in prior + * turns are never discarded. Without this option a single dead turn aborts + * the entire loop. + * + * Mid-turn retries emit a `turn.retry` event on the unified stream + * (`getFullResponsesStream`). Events received from the failed attempt before + * the failure remain in the stream; consumers that care about exact turn + * contents should treat events between `turn.start`/`turn.retry` and a + * subsequent `turn.retry` as void. + */ +export interface RetryTurnOptions { + /** + * Maximum number of retries per turn (not counting the initial attempt). + * Default: 2. + */ + limit?: number; + /** + * If no stream event arrives for this many milliseconds during a turn, the + * turn fails with a retryable `TurnIdleTimeoutError` and the underlying + * stream is cancelled. This is what converts silently-hung provider + * streams (no terminal event, connection left open) into recoverable + * failures. Default: no idle timeout. + */ + idleTimeoutMs?: number; + /** + * Delay before each retry attempt, in milliseconds. A function receives + * the attempt number (1 = first retry). Default: 0 (the provider round + * trip itself dominates latency). + */ + backoffMs?: number | ((attempt: number) => number); + /** + * Decide whether a failed turn should be retried. Defaults to + * `defaultIsTurnRetryable`: retries idle timeouts, streams that ended + * without a terminal event, network errors, and HTTP 408/429/5xx; does not + * retry `response.failed` events (e.g. refusals) or other HTTP 4xx. + */ + isRetryable?: (error: Error, context: TurnRetryContext) => boolean | Promise; +} + +/** + * Thrown when a turn's stream produced no events for `idleTimeoutMs` + * milliseconds. Retryable by default. + */ +export class TurnIdleTimeoutError extends Error { + readonly turnNumber: number; + readonly idleTimeoutMs: number; + + constructor(turnNumber: number, idleTimeoutMs: number) { + super( + `Turn ${turnNumber} stream produced no events for ${idleTimeoutMs}ms (idle timeout exceeded)`, + ); + this.name = 'TurnIdleTimeoutError'; + this.turnNumber = turnNumber; + this.idleTimeoutMs = idleTimeoutMs; + } +} + +/** + * Thrown when a turn's stream closed without a terminal event + * (`response.completed` / `response.incomplete`). This is the signature of + * an upstream stream dying mid-flight. Retryable by default. + */ +export class TurnStreamEndedError extends Error { + constructor(message: string) { + super(message); + this.name = 'TurnStreamEndedError'; + } +} + +/** + * Thrown when a turn's stream emitted a terminal `response.failed` event. + * The provider deliberately failed the response (which includes refusals), + * so this is NOT retryable by default — opt in via `isRetryable` if your + * gateway emits transient failures this way. + */ +export class TurnResponseFailedError extends Error { + constructor(message: string) { + super(message); + this.name = 'TurnResponseFailedError'; + } +} + +/** + * Best-effort extraction of an HTTP status code from SDK / fetch errors. + */ +function statusCodeOf(error: Error): number | undefined { + const candidate = error as Error & { + statusCode?: unknown; + status?: unknown; + httpMeta?: { + response?: { + status?: unknown; + }; + }; + }; + for (const value of [ + candidate.statusCode, + candidate.status, + candidate.httpMeta?.response?.status, + ]) { + if (typeof value === 'number') { + return value; + } + } + return undefined; +} + +/** + * Default turn retryability policy: + * - `TurnIdleTimeoutError` / `TurnStreamEndedError` — retry (dead/hung stream) + * - `TurnResponseFailedError` — don't retry (deliberate provider failure, + * e.g. a refusal; retrying re-asks a deterministic question) + * - HTTP errors — retry 408, 429, and 5xx; don't retry other 4xx + * - anything else (network/socket errors surfaced by the fetch layer) — retry + */ +export function defaultIsTurnRetryable(error: Error): boolean { + if (error instanceof TurnIdleTimeoutError || error instanceof TurnStreamEndedError) { + return true; + } + if (error instanceof TurnResponseFailedError) { + return false; + } + const status = statusCodeOf(error); + if (status !== undefined) { + return status === 408 || status === 429 || status >= 500; + } + return true; +} + +/** + * Resolve the backoff delay for a retry attempt. + */ +export function resolveBackoffMs( + backoffMs: RetryTurnOptions['backoffMs'], + attempt: number, +): number { + if (typeof backoffMs === 'function') { + return backoffMs(attempt); + } + return backoffMs ?? 0; +} + +/** + * Wrap an async iterator so that a gap of more than `idleTimeoutMs` between + * events fails the iteration with the error produced by `makeTimeoutError`. + * + * The caller is responsible for cancelling the underlying stream when the + * timeout fires — the wrapped iterator's pending `next()` is abandoned (its + * eventual settlement is swallowed to avoid unhandled rejections). + */ +export async function* iterateWithIdleTimeout( + iterator: AsyncIterableIterator, + idleTimeoutMs: number | undefined, + makeTimeoutError: () => Error, +): AsyncIterableIterator { + if (!idleTimeoutMs || idleTimeoutMs <= 0) { + yield* iterator; + return; + } + + while (true) { + let timer: ReturnType | undefined; + const nextPromise = iterator.next(); + // If the timeout wins the race, nobody awaits this promise anymore — + // swallow its eventual rejection so it can't surface as unhandled. + nextPromise.catch(() => {}); + try { + const result = await Promise.race([ + nextPromise, + new Promise((_, reject) => { + timer = setTimeout(() => reject(makeTimeoutError()), idleTimeoutMs); + }), + ]); + if (result.done) { + return; + } + yield result.value; + } finally { + if (timer !== undefined) { + clearTimeout(timer); + } + } + } +} + +/** + * Promise-based sleep used between retry attempts. + */ +export function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/packages/agent/tests/unit/tool-retry.test.ts b/packages/agent/tests/unit/tool-retry.test.ts new file mode 100644 index 0000000..de9f363 --- /dev/null +++ b/packages/agent/tests/unit/tool-retry.test.ts @@ -0,0 +1,305 @@ +/** + * Tests for `withToolRetry` — automatic re-run of a tool's execute function + * when it throws, absorbing transient failures before they reach the model. + */ +import { describe, expect, it, vi } from 'vitest'; +import { z } from 'zod/v4'; +import { withToolRetry } from '../../src/lib/tool-retry.js'; +import type { Tool } from '../../src/lib/tool-types.js'; +import { isGeneratorTool, isRegularExecuteTool, ToolType } from '../../src/lib/tool-types.js'; + +function makeRegularTool( + execute: (params: { url: string }) => Promise<{ + body: string; + }>, +) { + return { + type: ToolType.Function, + function: { + name: 'web_fetch', + description: 'Fetch a page.', + inputSchema: z.object({ + url: z.string(), + }), + outputSchema: z.object({ + body: z.string(), + }), + execute, + }, + } as const; +} + +describe('withToolRetry: regular execute tools', () => { + it('retries a throwing execute and returns the eventual success', async () => { + const execute = vi + .fn< + (params: { url: string }) => Promise<{ + body: string; + }> + >() + .mockRejectedValueOnce(new Error('ECONNRESET')) + .mockRejectedValueOnce(new Error('ETIMEDOUT')) + .mockResolvedValueOnce({ + body: 'hello', + }); + + const wrapped = withToolRetry(makeRegularTool(execute), { + limit: 2, + }); + + const result = await ( + wrapped.function.execute as (params: { url: string }) => Promise<{ + body: string; + }> + )({ + url: 'https://example.com', + }); + + expect(result).toEqual({ + body: 'hello', + }); + expect(execute).toHaveBeenCalledTimes(3); + }); + + it('throws the last error once the limit is exhausted', async () => { + const execute = vi + .fn< + (params: { url: string }) => Promise<{ + body: string; + }> + >() + .mockRejectedValue(new Error('permanently down')); + + const wrapped = withToolRetry(makeRegularTool(execute), { + limit: 1, + }); + + await expect( + ( + wrapped.function.execute as (params: { url: string }) => Promise<{ + body: string; + }> + )({ + url: 'https://example.com', + }), + ).rejects.toThrow('permanently down'); + expect(execute).toHaveBeenCalledTimes(2); + }); + + it('respects isRetryable declining a retry', async () => { + const execute = vi + .fn< + (params: { url: string }) => Promise<{ + body: string; + }> + >() + .mockRejectedValue(new Error('404 not found')); + const isRetryable = vi.fn(() => false); + + const wrapped = withToolRetry(makeRegularTool(execute), { + limit: 3, + isRetryable, + }); + + await expect( + ( + wrapped.function.execute as (params: { url: string }) => Promise<{ + body: string; + }> + )({ + url: 'https://example.com', + }), + ).rejects.toThrow('404 not found'); + expect(execute).toHaveBeenCalledTimes(1); + expect(isRetryable).toHaveBeenCalledWith( + expect.objectContaining({ + toolName: 'web_fetch', + attempt: 1, + }), + ); + }); + + it('invokes onRetry with attempt context before each retry', async () => { + const execute = vi + .fn< + (params: { url: string }) => Promise<{ + body: string; + }> + >() + .mockRejectedValueOnce(new Error('flaky')) + .mockResolvedValueOnce({ + body: 'ok', + }); + const onRetry = vi.fn(); + + const wrapped = withToolRetry(makeRegularTool(execute), { + onRetry, + }); + + await ( + wrapped.function.execute as (params: { url: string }) => Promise<{ + body: string; + }> + )({ + url: 'https://example.com', + }); + + expect(onRetry).toHaveBeenCalledTimes(1); + expect(onRetry).toHaveBeenCalledWith( + expect.objectContaining({ + toolName: 'web_fetch', + attempt: 1, + error: expect.any(Error), + }), + ); + }); + + it('preserves the tool shape and type-guard classification', () => { + const original = makeRegularTool(async () => ({ + body: 'x', + })); + const wrapped = withToolRetry(original); + + expect(isRegularExecuteTool(wrapped as unknown as Tool)).toBe(true); + expect(isGeneratorTool(wrapped as unknown as Tool)).toBe(false); + expect(wrapped.function.name).toBe('web_fetch'); + expect(wrapped.function.inputSchema).toBe(original.function.inputSchema); + expect(wrapped.function.outputSchema).toBe(original.function.outputSchema); + }); + + it('returns tools without an execute function unchanged', () => { + const manualTool = { + type: ToolType.Function, + function: { + name: 'manual_thing', + description: 'No execute.', + inputSchema: z.object({}), + outputSchema: z.object({}), + }, + } as const; + + const wrapped = withToolRetry(manualTool as unknown as Tool); + expect(wrapped).toBe(manualTool); + }); +}); + +describe('withToolRetry: generator tools', () => { + function makeGeneratorTool( + execute: (params: { query: string }) => AsyncGenerator, + ) { + return { + type: ToolType.Function, + function: { + name: 'web_search', + description: 'Search.', + inputSchema: z.object({ + query: z.string(), + }), + outputSchema: z.object({ + results: z.array(z.string()), + }), + eventSchema: z.object({ + progress: z.string(), + }), + execute, + }, + } as const; + } + + it('re-runs a generator that throws mid-iteration and yields the retried run', async () => { + let calls = 0; + const wrapped = withToolRetry( + makeGeneratorTool(async function* (_params) { + calls++; + yield { + progress: `attempt ${calls} started`, + }; + if (calls === 1) { + throw new Error('stream cut'); + } + return { + results: [ + 'found it', + ], + }; + }), + { + limit: 1, + }, + ); + + const iterator = ( + wrapped.function.execute as (params: { + query: string; + }) => AsyncGenerator + )({ + query: 'cats', + }); + + const yielded: unknown[] = []; + let step = await iterator.next(); + while (!step.done) { + yielded.push(step.value); + step = await iterator.next(); + } + + expect(calls).toBe(2); + // Yields from BOTH attempts are forwarded (documented behavior). + expect(yielded).toEqual([ + { + progress: 'attempt 1 started', + }, + { + progress: 'attempt 2 started', + }, + ]); + expect(step.value).toEqual({ + results: [ + 'found it', + ], + }); + }); + + it('keeps the generator classification so the executor drives it correctly', () => { + const wrapped = withToolRetry( + makeGeneratorTool(async function* () { + yield { + progress: 'hi', + }; + return { + results: [], + }; + }), + ); + expect(isGeneratorTool(wrapped as unknown as Tool)).toBe(true); + }); + + it('throws once generator retries are exhausted', async () => { + const wrapped = withToolRetry( + makeGeneratorTool(async function* () { + yield { + progress: 'starting', + }; + throw new Error('always fails'); + }), + { + limit: 1, + }, + ); + + const iterator = ( + wrapped.function.execute as (params: { + query: string; + }) => AsyncGenerator + )({ + query: 'cats', + }); + + await expect(async () => { + let step = await iterator.next(); + while (!step.done) { + step = await iterator.next(); + } + }).rejects.toThrow('always fails'); + }); +}); diff --git a/packages/agent/tests/unit/turn-retry.test.ts b/packages/agent/tests/unit/turn-retry.test.ts new file mode 100644 index 0000000..84d7153 --- /dev/null +++ b/packages/agent/tests/unit/turn-retry.test.ts @@ -0,0 +1,620 @@ +/** + * Tests for turn-level retry (`retryTurn` option). + * + * A "turn" is one provider request + stream consumption. These tests verify: + * - a dead follow-up stream is retried with the accumulated conversation + * intact (tool results are NOT re-executed or lost) + * - the initial (turn 0) stream is retried by re-issuing the resolved request + * - silently-hung streams are converted into retryable failures via + * `idleTimeoutMs` + * - `turn.retry` events surface on getFullResponsesStream + * - retry limits, retryability policy, and backoff hooks are honored + */ +import type { OpenRouterCore } from '@openrouter/sdk/core'; +import type * as models from '@openrouter/sdk/models'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { z } from 'zod/v4'; + +const mockBetaResponsesSend = vi.hoisted(() => vi.fn()); + +vi.mock('@openrouter/sdk/funcs/betaResponsesSend', () => ({ + betaResponsesSend: mockBetaResponsesSend, +})); + +import { callModel } from '../../src/inner-loop/call-model.js'; +import { ToolType } from '../../src/lib/tool-types.js'; +import { TurnIdleTimeoutError, TurnStreamEndedError } from '../../src/lib/turn-retry.js'; + +function toolCallResponse(): models.OpenResponsesResult { + return { + id: 'resp_tool_call', + object: 'response', + createdAt: 0, + model: 'test-model', + status: 'completed', + completedAt: 0, + output: [ + { + type: 'function_call', + id: 'fc_1', + callId: 'call_abc', + name: 'get_weather', + arguments: '{"location":"Tokyo"}', + status: 'completed', + }, + ], + error: null, + incompleteDetails: null, + temperature: null, + topP: null, + presencePenalty: null, + frequencyPenalty: null, + metadata: null, + instructions: null, + tools: [], + toolChoice: 'auto', + parallelToolCalls: false, + } as models.OpenResponsesResult; +} + +function textResponse(text: string): models.OpenResponsesResult { + return { + id: 'resp_text', + object: 'response', + createdAt: 0, + model: 'test-model', + status: 'completed', + completedAt: 0, + output: [ + { + id: 'msg_text', + type: 'message', + role: 'assistant', + status: 'completed', + content: [ + { + type: 'output_text', + text, + annotations: [], + }, + ], + }, + ], + error: null, + incompleteDetails: null, + temperature: null, + topP: null, + presencePenalty: null, + frequencyPenalty: null, + metadata: null, + instructions: null, + tools: [], + toolChoice: 'auto', + parallelToolCalls: false, + } as models.OpenResponsesResult; +} + +/** Stream that emits a completed event for the given response, then closes. */ +function completedStream(response: models.OpenResponsesResult): ReadableStream { + return new ReadableStream({ + start(controller) { + controller.enqueue({ + type: 'response.completed', + response, + }); + controller.close(); + }, + }); +} + +/** + * Stream that emits some deltas then closes WITHOUT a terminal event — the + * signature of an upstream stream dying mid-flight. + */ +function deadStream(): ReadableStream { + return new ReadableStream({ + start(controller) { + controller.enqueue({ + type: 'response.output_text.delta', + delta: 'partial...', + }); + controller.close(); + }, + }); +} + +/** Stream that emits one delta then never produces anything again (hang). */ +function hangingStream(): ReadableStream { + return new ReadableStream({ + start(controller) { + controller.enqueue({ + type: 'response.output_text.delta', + delta: 'and then silence', + }); + // Never closes, never errors — a silently-hung upstream connection. + }, + }); +} + +/** Stream whose transport errors mid-read. */ +function erroringStream(): ReadableStream { + return new ReadableStream({ + start(controller) { + controller.error(new Error('network drop mid-stream')); + }, + }); +} + +const weatherTool = { + type: ToolType.Function, + function: { + name: 'get_weather', + description: 'Get the weather for a location.', + inputSchema: z.object({ + location: z.string(), + }), + outputSchema: z.object({ + temperature: z.number(), + }), + execute: vi.fn(async (_params: { location: string }) => ({ + temperature: 22, + })), + }, +} as const; + +const client = {} as OpenRouterCore; + +function ok(value: unknown) { + return { + ok: true, + value, + }; +} + +describe('retryTurn: follow-up turn retry', () => { + beforeEach(() => { + mockBetaResponsesSend.mockReset(); + weatherTool.function.execute.mockClear(); + }); + + it('retries a dead follow-up stream with the accumulated conversation intact', async () => { + mockBetaResponsesSend + // Turn 0: model asks for the tool + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + // Turn 1, attempt 0: stream dies without a terminal event + .mockResolvedValueOnce(ok(deadStream())) + // Turn 1, attempt 1 (retry): success + .mockResolvedValueOnce(ok(completedStream(textResponse('Sunny, 22C')))); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: { + limit: 2, + }, + }); + + const text = await result.getText(); + + expect(text).toBe('Sunny, 22C'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(3); + // The tool executed exactly once — retry re-sends the request, it does + // NOT re-execute tools. + expect(weatherTool.function.execute).toHaveBeenCalledTimes(1); + + // The retried request is byte-identical to the failed one: same + // accumulated conversation including the function_call_output. + const attempt0Input = mockBetaResponsesSend.mock.calls[1]?.[1]?.responsesRequest?.input; + const attempt1Input = mockBetaResponsesSend.mock.calls[2]?.[1]?.responsesRequest?.input; + expect(attempt1Input).toEqual(attempt0Input); + const hasToolOutput = ( + attempt1Input as Array<{ + type?: string; + }> + ).some((item) => item.type === 'function_call_output'); + expect(hasToolOutput).toBe(true); + }); + + it('does not retry when retryTurn is not configured', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(deadStream())); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + }); + + await expect(result.getText()).rejects.toThrow('Stream ended without completion event'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(2); + }); + + it('gives up after the retry limit is exhausted', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + // Turn 1: initial attempt + 2 retries, all dead + .mockResolvedValueOnce(ok(deadStream())) + .mockResolvedValueOnce(ok(deadStream())) + .mockResolvedValueOnce(ok(deadStream())); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: { + limit: 2, + }, + }); + + await expect(result.getText()).rejects.toThrow(TurnStreamEndedError); + // 1 (turn 0) + 3 (turn 1 attempts) + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(4); + }); + + it('respects a custom isRetryable that declines the retry', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(deadStream())); + + const isRetryable = vi.fn(() => false); + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: { + limit: 2, + isRetryable, + }, + }); + + await expect(result.getText()).rejects.toThrow(TurnStreamEndedError); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(2); + expect(isRetryable).toHaveBeenCalledTimes(1); + const [error, context] = isRetryable.mock.calls[0] as unknown as [ + Error, + { + turnNumber: number; + attempt: number; + }, + ]; + expect(error).toBeInstanceOf(TurnStreamEndedError); + expect(context.turnNumber).toBe(1); + expect(context.attempt).toBe(1); + }); + + it('retries transport errors that surface mid-stream', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(erroringStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Recovered')))); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: {}, + }); + + await expect(result.getText()).resolves.toBe('Recovered'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(3); + }); + + it('invokes the backoff function with the attempt number', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(deadStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Done')))); + + const backoffMs = vi.fn(() => 0); + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: { + backoffMs, + }, + }); + + await result.getText(); + expect(backoffMs).toHaveBeenCalledWith(1); + }); +}); + +describe('retryTurn: idle timeout (hung streams)', () => { + beforeEach(() => { + mockBetaResponsesSend.mockReset(); + weatherTool.function.execute.mockClear(); + }); + + it('converts a silently-hung follow-up stream into a retried turn', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(hangingStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Recovered after hang')))); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: { + limit: 1, + idleTimeoutMs: 50, + }, + }); + + await expect(result.getText()).resolves.toBe('Recovered after hang'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(3); + }); + + it('fails with TurnIdleTimeoutError when retries are exhausted on hangs', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(hangingStream())) + .mockResolvedValueOnce(ok(hangingStream())); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: { + limit: 1, + idleTimeoutMs: 50, + }, + }); + + const error = await result.getText().then( + () => null, + (e: unknown) => e, + ); + expect(error).toBeInstanceOf(TurnIdleTimeoutError); + expect((error as TurnIdleTimeoutError).idleTimeoutMs).toBe(50); + }); +}); + +describe('retryTurn: initial turn (turn 0)', () => { + beforeEach(() => { + mockBetaResponsesSend.mockReset(); + weatherTool.function.execute.mockClear(); + }); + + it('re-issues the initial request when the turn-0 stream dies', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(deadStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Hello!')))); + + const result = callModel(client, { + model: 'test-model', + input: 'Hi', + tools: [ + weatherTool, + ] as const, + retryTurn: {}, + }); + + await expect(result.getText()).resolves.toBe('Hello!'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(2); + + // Retried request carries the same resolved input. + const firstInput = mockBetaResponsesSend.mock.calls[0]?.[1]?.responsesRequest?.input; + const retryInput = mockBetaResponsesSend.mock.calls[1]?.[1]?.responsesRequest?.input; + expect(retryInput).toEqual(firstInput); + }); + + it('retries a hung turn-0 stream via the idle timeout', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(hangingStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Recovered')))); + + const result = callModel(client, { + model: 'test-model', + input: 'Hi', + tools: [ + weatherTool, + ] as const, + retryTurn: { + idleTimeoutMs: 50, + }, + }); + + await expect(result.getText()).resolves.toBe('Recovered'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(2); + }); + + it('retries send-phase failures (HTTP 5xx) before any stream exists', async () => { + const serverError = Object.assign(new Error('Internal Server Error'), { + statusCode: 500, + }); + mockBetaResponsesSend + .mockResolvedValueOnce({ + ok: false, + error: serverError, + }) + .mockResolvedValueOnce(ok(completedStream(textResponse('Recovered')))); + + const result = callModel(client, { + model: 'test-model', + input: 'Hi', + tools: [ + weatherTool, + ] as const, + retryTurn: {}, + }); + + await expect(result.getText()).resolves.toBe('Recovered'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(2); + }); + + it('does not retry non-retryable HTTP errors (400) by default', async () => { + const badRequest = Object.assign(new Error('Bad Request'), { + statusCode: 400, + }); + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: false, + error: badRequest, + }); + + const result = callModel(client, { + model: 'test-model', + input: 'Hi', + tools: [ + weatherTool, + ] as const, + retryTurn: {}, + }); + + await expect(result.getText()).rejects.toThrow('Bad Request'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(1); + }); + + it('does not retry a response.failed terminal event by default', async () => { + const failedStream = new ReadableStream({ + start(controller) { + controller.enqueue({ + type: 'response.failed', + response: { + ...textResponse(''), + status: 'failed', + error: { + code: 'refusal', + message: 'Model refused', + }, + }, + }); + controller.close(); + }, + }); + mockBetaResponsesSend.mockResolvedValueOnce(ok(failedStream)); + + const result = callModel(client, { + model: 'test-model', + input: 'Hi', + tools: [ + weatherTool, + ] as const, + retryTurn: { + limit: 3, + }, + }); + + await expect(result.getText()).rejects.toThrow('Response failed'); + expect(mockBetaResponsesSend).toHaveBeenCalledTimes(1); + }); +}); + +describe('retryTurn: turn.retry events on getFullResponsesStream', () => { + beforeEach(() => { + mockBetaResponsesSend.mockReset(); + weatherTool.function.execute.mockClear(); + }); + + it('emits turn.retry between turn.start and turn.end for a retried follow-up', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(completedStream(toolCallResponse()))) + .mockResolvedValueOnce(ok(deadStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Sunny')))); + + const result = callModel(client, { + model: 'test-model', + input: 'What is the weather in Tokyo?', + tools: [ + weatherTool, + ] as const, + retryTurn: {}, + }); + + const events: Array<{ + type: string; + turnNumber?: number; + attempt?: number; + }> = []; + for await (const event of result.getFullResponsesStream()) { + if ('type' in event) { + events.push( + event as { + type: string; + turnNumber?: number; + attempt?: number; + }, + ); + } + } + + const retryEvents = events.filter((e) => e.type === 'turn.retry'); + expect(retryEvents).toHaveLength(1); + expect(retryEvents[0]).toMatchObject({ + turnNumber: 1, + attempt: 1, + }); + + // Exactly one turn.start and one turn.end for turn 1 — the retry does + // not duplicate the turn delimiters. + const turn1Starts = events.filter((e) => e.type === 'turn.start' && e.turnNumber === 1); + const turn1Ends = events.filter((e) => e.type === 'turn.end' && e.turnNumber === 1); + expect(turn1Starts).toHaveLength(1); + expect(turn1Ends).toHaveLength(1); + + // Ordering: start < retry < end + const startIdx = events.findIndex((e) => e.type === 'turn.start' && e.turnNumber === 1); + const retryIdx = events.findIndex((e) => e.type === 'turn.retry'); + const endIdx = events.findIndex((e) => e.type === 'turn.end' && e.turnNumber === 1); + expect(startIdx).toBeLessThan(retryIdx); + expect(retryIdx).toBeLessThan(endIdx); + }); + + it('recovers a dead turn-0 stream without poisoning stream consumers', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce(ok(deadStream())) + .mockResolvedValueOnce(ok(completedStream(textResponse('Hello!')))); + + const result = callModel(client, { + model: 'test-model', + input: 'Hi', + tools: [ + weatherTool, + ] as const, + retryTurn: {}, + }); + + const events: Array<{ + type: string; + turnNumber?: number; + }> = []; + for await (const event of result.getFullResponsesStream()) { + if ('type' in event) { + events.push( + event as { + type: string; + turnNumber?: number; + }, + ); + } + } + + const retryEvents = events.filter((e) => e.type === 'turn.retry'); + expect(retryEvents).toHaveLength(1); + + const turn0Ends = events.filter((e) => e.type === 'turn.end' && e.turnNumber === 0); + expect(turn0Ends).toHaveLength(1); + + // The stream completed cleanly and the text is retrievable. + await expect(result.getText()).resolves.toBe('Hello!'); + }); +});