From 76876339a98105b868cf80e1a978d16e29e48e78 Mon Sep 17 00:00:00 2001 From: Ben Heidorn <301326+Cybourgeoisie@users.noreply.github.com> Date: Tue, 2 Jun 2026 07:46:19 -0400 Subject: [PATCH 1/3] fix(agent): update OutputInputImage re-export for SDK compat and persist user input - Replace removed `OutputInputImage` with `OutputImage` + backward-compat alias (`OutputImage as OutputInputImage`) after @openrouter/sdk dropped the old export name. - Persist fresh user input items to state only after the API call succeeds, preventing duplicates on transient retries. Co-Authored-By: Claude Opus 4 (1M context) --- packages/agent/src/index.ts | 3 +- packages/agent/src/lib/model-result.ts | 27 ++ .../tests/unit/user-input-persistence.test.ts | 369 ++++++++++++++++++ 3 files changed, 398 insertions(+), 1 deletion(-) create mode 100644 packages/agent/tests/unit/user-input-persistence.test.ts diff --git a/packages/agent/src/index.ts b/packages/agent/src/index.ts index 50abf3d..4540e07 100644 --- a/packages/agent/src/index.ts +++ b/packages/agent/src/index.ts @@ -44,8 +44,9 @@ export type { // Output item types (StreamableOutputItem members) OutputFileSearchCallItem, OutputFunctionCallItem, + OutputImage, + OutputImage as OutputInputImage, OutputImageGenerationCallItem, - OutputInputImage, OutputItems, OutputMessage, OutputReasoningItem, diff --git a/packages/agent/src/lib/model-result.ts b/packages/agent/src/lib/model-result.ts index f1087b5..a66b9f8 100644 --- a/packages/agent/src/lib/model-result.ts +++ b/packages/agent/src/lib/model-result.ts @@ -78,6 +78,7 @@ import { isServerTool, isToolCallOutputEvent, } from './tool-types.js'; +import { normalizeInputToArray } from './turn-context.js'; /** * Typeguard for plain-object records (non-null, non-array). @@ -1493,11 +1494,17 @@ export class ModelResult< // (newly supplied this turn). `onResponseReceived` must fire only for // fresh items — re-hooking historical outputs on every callModel call // would double-invoke non-idempotent hooks. + // + // Fresh items are tracked locally and persisted to state only after the + // API call succeeds, avoiding duplication when a caller retries after a + // transient API failure. const hasLoadedHistory = !!this.currentState?.messages && Array.isArray(this.currentState.messages) && this.currentState.messages.length > 0; + let freshItemsForState: models.BaseInputsUnion[] | undefined; + if (hasLoadedHistory && this.currentState) { // `currentState.messages` is InputsUnion — keep it as that union so // appendToMessages (which expects InputsUnion) accepts it directly. @@ -1527,6 +1534,8 @@ export class ModelResult< ? await this.applyHooksToFreshItems(freshItems, historicalMessages, initialContext) : undefined; + freshItemsForState = hookedFresh; + baseRequest = { ...baseRequest, input: hookedFresh @@ -1544,6 +1553,9 @@ export class ModelResult< this.contextStore ?? undefined, this.options.sharedContextSchema, ); + + freshItemsForState = normalizeInputToArray(hookedInput); + baseRequest = { ...baseRequest, input: hookedInput, @@ -1572,6 +1584,21 @@ export class ModelResult< throw apiResult.error; } + // Persist fresh user items to state now that the API accepted the + // request. This runs after the success check so a transient API + // failure does not leave orphaned user turns in state (which would + // cause duplicates when the caller retries with the same input). + if ( + freshItemsForState && + freshItemsForState.length > 0 && + this.stateAccessor && + this.currentState + ) { + await this.saveStateSafely({ + messages: appendToMessages(this.currentState.messages, freshItemsForState), + }); + } + // Handle both streaming and non-streaming responses // The API may return a non-streaming response even when stream: true is requested if (isEventStream(apiResult.value)) { diff --git a/packages/agent/tests/unit/user-input-persistence.test.ts b/packages/agent/tests/unit/user-input-persistence.test.ts new file mode 100644 index 0000000..bdf712c --- /dev/null +++ b/packages/agent/tests/unit/user-input-persistence.test.ts @@ -0,0 +1,369 @@ +import type { OpenRouterCore } from '@openrouter/sdk/core'; +import type * as models from '@openrouter/sdk/models'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { z } from 'zod/v4'; +import type { ConversationState, StateAccessor } from '../../src/index.js'; +import { callModel } from '../../src/inner-loop/call-model.js'; +import { ToolType } from '../../src/lib/tool-types.js'; + +const mockBetaResponsesSend = vi.hoisted(() => vi.fn()); + +vi.mock('@openrouter/sdk/funcs/betaResponsesSend', () => ({ + betaResponsesSend: mockBetaResponsesSend, +})); + +function textResponse(text: string): models.OpenResponsesResult { + return { + id: 'resp_text', + object: 'response', + createdAt: 0, + model: 'test-model', + status: 'completed', + completedAt: 0, + output: [ + { + id: 'msg_1', + type: 'message', + role: 'assistant', + status: 'completed', + content: [ + { + type: 'output_text', + text, + annotations: [], + }, + ], + }, + ], + error: null, + incompleteDetails: null, + temperature: null, + topP: null, + presencePenalty: null, + frequencyPenalty: null, + metadata: null, + instructions: null, + tools: [], + toolChoice: 'auto', + parallelToolCalls: false, + } as models.OpenResponsesResult; +} + +function toolCallResponse(callId: string, name: string, args: string): models.OpenResponsesResult { + return { + id: `resp_tc_${callId}`, + object: 'response', + createdAt: 0, + model: 'test-model', + status: 'completed', + completedAt: 0, + output: [ + { + type: 'function_call', + id: `fc_${callId}`, + callId, + name, + arguments: args, + status: 'completed', + }, + ], + error: null, + incompleteDetails: null, + temperature: null, + topP: null, + presencePenalty: null, + frequencyPenalty: null, + metadata: null, + instructions: null, + tools: [], + toolChoice: 'auto', + parallelToolCalls: false, + } as models.OpenResponsesResult; +} + +const echoTool = { + type: ToolType.Function, + function: { + name: 'echo', + description: 'Echo input', + inputSchema: z.object({ + message: z.string(), + }), + execute: async (params: { message: string }) => ({ + echoed: params.message, + }), + }, +} as const; + +const client = {} as OpenRouterCore; + +describe('User Input Persistence to State', () => { + let storedState: ConversationState | null; + let stateAccessor: StateAccessor; + + beforeEach(() => { + mockBetaResponsesSend.mockReset(); + storedState = null; + stateAccessor = { + load: async () => storedState, + save: async (state) => { + storedState = state; + }, + }; + }); + + it('should persist array user input items to state.messages after successful API call', async () => { + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Hello back!'), + }); + + const result = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Hello', + }, + ], + state: stateAccessor, + }); + + await result.getText(); + + expect(storedState).not.toBeNull(); + const messages = storedState!.messages as Array<{ + role?: string; + content?: string; + }>; + expect(Array.isArray(messages)).toBe(true); + + const userItems = messages.filter((m) => m.role === 'user'); + expect(userItems.length).toBe(1); + expect(userItems[0]!.content).toBe('Hello'); + }); + + it('should persist string input normalized as user message', async () => { + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Response'), + }); + + const result = callModel(client, { + model: 'test-model', + input: 'Hello string input', + state: stateAccessor, + }); + + await result.getText(); + + expect(storedState).not.toBeNull(); + const messages = storedState!.messages as Array<{ + role?: string; + content?: string; + }>; + expect(Array.isArray(messages)).toBe(true); + + const userItems = messages.filter((m) => m.role === 'user'); + expect(userItems.length).toBe(1); + expect(userItems[0]!.content).toBe('Hello string input'); + }); + + it('should persist user input alongside tool results after tool execution', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce({ + ok: true, + value: toolCallResponse('call_1', 'echo', '{"message":"hi"}'), + }) + .mockResolvedValueOnce({ + ok: true, + value: textResponse('Done echoing.'), + }); + + const result = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Echo hi', + }, + ], + tools: [ + echoTool, + ] as const, + state: stateAccessor, + }); + + await result.getText(); + + expect(storedState).not.toBeNull(); + const messages = storedState!.messages as Array<{ + role?: string; + type?: string; + }>; + expect(Array.isArray(messages)).toBe(true); + + const userItems = messages.filter((m) => m.role === 'user'); + const fnCalls = messages.filter((m) => m.type === 'function_call'); + const fnOutputs = messages.filter((m) => m.type === 'function_call_output'); + + expect(userItems.length).toBe(1); + expect(fnCalls.length).toBe(1); + expect(fnOutputs.length).toBe(1); + }); + + it('should include prior user input in API request when resuming from state', async () => { + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('First response'), + }); + + const result1 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'First message', + }, + ], + state: stateAccessor, + }); + await result1.getText(); + + const messagesAfterFirst = storedState!.messages as Array<{ + role?: string; + }>; + const firstUserItems = messagesAfterFirst.filter((m) => m.role === 'user'); + expect(firstUserItems.length).toBe(1); + + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Second response'), + }); + + const result2 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Second message', + }, + ], + state: stateAccessor, + }); + await result2.getText(); + + const secondCallRequest = mockBetaResponsesSend.mock.calls[1]?.[1]?.responsesRequest; + expect(secondCallRequest).toBeDefined(); + + const input = secondCallRequest.input as Array<{ + role?: string; + content?: string; + }>; + expect(Array.isArray(input)).toBe(true); + + const userMessages = input.filter((i) => i.role === 'user'); + expect(userMessages.length).toBe(2); + expect(userMessages[0]!.content).toBe('First message'); + expect(userMessages[1]!.content).toBe('Second message'); + }); + + it('should accumulate user input from both calls in state after two callModel invocations', async () => { + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Reply 1'), + }); + + const result1 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Question 1', + }, + ], + state: stateAccessor, + }); + await result1.getText(); + + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Reply 2'), + }); + + const result2 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Question 2', + }, + ], + state: stateAccessor, + }); + await result2.getText(); + + const messages = storedState!.messages as Array<{ + role?: string; + content?: string; + }>; + const userItems = messages.filter((m) => m.role === 'user'); + expect(userItems.length).toBe(2); + expect(userItems[0]!.content).toBe('Question 1'); + expect(userItems[1]!.content).toBe('Question 2'); + }); + + it('should not duplicate user input in state when retrying after API failure', async () => { + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: false, + error: new Error('API failure'), + }); + + const result1 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Hello', + }, + ], + state: stateAccessor, + }); + + await expect(result1.getText()).rejects.toThrow(); + + const messagesAfterFailure = storedState?.messages as + | Array<{ + role?: string; + }> + | undefined; + const userItemsAfterFailure = messagesAfterFailure?.filter((m) => m.role === 'user') ?? []; + expect(userItemsAfterFailure.length).toBe(0); + + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Hello back!'), + }); + + const result2 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Hello', + }, + ], + state: stateAccessor, + }); + + await result2.getText(); + + const messages = storedState!.messages as Array<{ + role?: string; + content?: string; + }>; + const userItems = messages.filter((m) => m.role === 'user'); + expect(userItems.length).toBe(1); + expect(userItems[0]!.content).toBe('Hello'); + }); +}); From ce00bf9bede9a68d1b38d798c906c72356cc97a2 Mon Sep 17 00:00:00 2001 From: Ben Heidorn <301326+Cybourgeoisie@users.noreply.github.com> Date: Tue, 2 Jun 2026 11:13:32 -0400 Subject: [PATCH 2/3] fix(agent): persist user input atomically with assistant output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move fresh user-item persistence from initStream (after ok:true) to saveResponseToState (after stream consumption). Previously, an ok:true response followed by a mid-stream failure would leave the user turn in state with no assistant reply — on retry the same input was appended again, producing duplicates. Now user input and assistant output land atomically in a single saveStateSafely call: if the stream fails, neither is written, so a retry starts clean. Adds a streaming-failure regression test (ok:true + stream error + retry) to user-input-persistence.test.ts. Co-Authored-By: Claude Opus 4 (1M context) --- packages/agent/src/lib/model-result.ts | 35 +++++------ .../tests/unit/user-input-persistence.test.ts | 61 +++++++++++++++++++ 2 files changed, 79 insertions(+), 17 deletions(-) diff --git a/packages/agent/src/lib/model-result.ts b/packages/agent/src/lib/model-result.ts index a66b9f8..4ca5fc9 100644 --- a/packages/agent/src/lib/model-result.ts +++ b/packages/agent/src/lib/model-result.ts @@ -219,6 +219,8 @@ export class ModelResult< }> = []; // Track resolved request after async function resolution private resolvedRequest: models.ResponsesRequest | null = null; + // Fresh user items to persist atomically with the assistant response + private pendingFreshItems: models.BaseInputsUnion[] | undefined; // State management for multi-turn conversations private stateAccessor: StateAccessor | null = null; @@ -492,11 +494,17 @@ export class ModelResult< response.output, ]; + // Persist pending fresh user items together with the assistant output + // so they land atomically — if the stream failed before reaching here + // neither the user turn nor the assistant turn is written to state. + let messages = this.currentState.messages; + if (this.pendingFreshItems && this.pendingFreshItems.length > 0) { + messages = appendToMessages(messages, this.pendingFreshItems); + this.pendingFreshItems = undefined; + } + await this.saveStateSafely({ - messages: appendToMessages( - this.currentState.messages, - outputItems as models.BaseInputsUnion[], - ), + messages: appendToMessages(messages, outputItems as models.BaseInputsUnion[]), previousResponseId: response.id, }); } @@ -1584,19 +1592,12 @@ export class ModelResult< throw apiResult.error; } - // Persist fresh user items to state now that the API accepted the - // request. This runs after the success check so a transient API - // failure does not leave orphaned user turns in state (which would - // cause duplicates when the caller retries with the same input). - if ( - freshItemsForState && - freshItemsForState.length > 0 && - this.stateAccessor && - this.currentState - ) { - await this.saveStateSafely({ - messages: appendToMessages(this.currentState.messages, freshItemsForState), - }); + // Stash fresh user items so saveResponseToState can persist them + // atomically with the assistant output. Writing them here would leave + // an orphaned user turn if the stream fails after ok:true — on retry + // the same input would be appended again, producing duplicates. + if (freshItemsForState && freshItemsForState.length > 0) { + this.pendingFreshItems = freshItemsForState; } // Handle both streaming and non-streaming responses diff --git a/packages/agent/tests/unit/user-input-persistence.test.ts b/packages/agent/tests/unit/user-input-persistence.test.ts index bdf712c..467037d 100644 --- a/packages/agent/tests/unit/user-input-persistence.test.ts +++ b/packages/agent/tests/unit/user-input-persistence.test.ts @@ -313,6 +313,67 @@ describe('User Input Persistence to State', () => { expect(userItems[1]!.content).toBe('Question 2'); }); + it('should not persist user input when stream fails mid-read (ok:true + stream error)', async () => { + const failingStream = new ReadableStream({ + start(controller) { + controller.error(new Error('stream dropped')); + }, + }); + + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: failingStream, + }); + + const result1 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Hello', + }, + ], + state: stateAccessor, + }); + + await expect(result1.getText()).rejects.toThrow(); + + const messagesAfterStreamFailure = storedState?.messages as + | Array<{ + role?: string; + }> + | undefined; + const userItemsAfterFailure = + messagesAfterStreamFailure?.filter((m) => m.role === 'user') ?? []; + expect(userItemsAfterFailure.length).toBe(0); + + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Hello back!'), + }); + + const result2 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Hello', + }, + ], + state: stateAccessor, + }); + + await result2.getText(); + + const messages = storedState!.messages as Array<{ + role?: string; + content?: string; + }>; + const userItems = messages.filter((m) => m.role === 'user'); + expect(userItems.length).toBe(1); + expect(userItems[0]!.content).toBe('Hello'); + }); + it('should not duplicate user input in state when retrying after API failure', async () => { mockBetaResponsesSend.mockResolvedValueOnce({ ok: false, From 7b6704ae792df992df15dcf56c04a0afffe412fe Mon Sep 17 00:00:00 2001 From: Ben Heidorn <301326+Cybourgeoisie@users.noreply.github.com> Date: Tue, 2 Jun 2026 11:39:31 -0400 Subject: [PATCH 3/3] test(agent): assert assistant message lands atomically with user input Add assistant-item assertion to the streaming-failure regression test, making the atomicity invariant explicit: after a successful retry, both exactly one user turn and one assistant turn are present in state. Co-Authored-By: Claude Opus 4 (1M context) --- packages/agent/tests/unit/user-input-persistence.test.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/agent/tests/unit/user-input-persistence.test.ts b/packages/agent/tests/unit/user-input-persistence.test.ts index 467037d..886b1bf 100644 --- a/packages/agent/tests/unit/user-input-persistence.test.ts +++ b/packages/agent/tests/unit/user-input-persistence.test.ts @@ -367,11 +367,15 @@ describe('User Input Persistence to State', () => { const messages = storedState!.messages as Array<{ role?: string; + type?: string; content?: string; }>; const userItems = messages.filter((m) => m.role === 'user'); expect(userItems.length).toBe(1); expect(userItems[0]!.content).toBe('Hello'); + + const assistantItems = messages.filter((m) => m.role === 'assistant' || m.type === 'message'); + expect(assistantItems.length).toBe(1); }); it('should not duplicate user input in state when retrying after API failure', async () => {