diff --git a/packages/agent/src/index.ts b/packages/agent/src/index.ts index 50abf3d..4540e07 100644 --- a/packages/agent/src/index.ts +++ b/packages/agent/src/index.ts @@ -44,8 +44,9 @@ export type { // Output item types (StreamableOutputItem members) OutputFileSearchCallItem, OutputFunctionCallItem, + OutputImage, + OutputImage as OutputInputImage, OutputImageGenerationCallItem, - OutputInputImage, OutputItems, OutputMessage, OutputReasoningItem, diff --git a/packages/agent/src/lib/model-result.ts b/packages/agent/src/lib/model-result.ts index f1087b5..4ca5fc9 100644 --- a/packages/agent/src/lib/model-result.ts +++ b/packages/agent/src/lib/model-result.ts @@ -78,6 +78,7 @@ import { isServerTool, isToolCallOutputEvent, } from './tool-types.js'; +import { normalizeInputToArray } from './turn-context.js'; /** * Typeguard for plain-object records (non-null, non-array). @@ -218,6 +219,8 @@ export class ModelResult< }> = []; // Track resolved request after async function resolution private resolvedRequest: models.ResponsesRequest | null = null; + // Fresh user items to persist atomically with the assistant response + private pendingFreshItems: models.BaseInputsUnion[] | undefined; // State management for multi-turn conversations private stateAccessor: StateAccessor | null = null; @@ -491,11 +494,17 @@ export class ModelResult< response.output, ]; + // Persist pending fresh user items together with the assistant output + // so they land atomically — if the stream failed before reaching here + // neither the user turn nor the assistant turn is written to state. + let messages = this.currentState.messages; + if (this.pendingFreshItems && this.pendingFreshItems.length > 0) { + messages = appendToMessages(messages, this.pendingFreshItems); + this.pendingFreshItems = undefined; + } + await this.saveStateSafely({ - messages: appendToMessages( - this.currentState.messages, - outputItems as models.BaseInputsUnion[], - ), + messages: appendToMessages(messages, outputItems as models.BaseInputsUnion[]), previousResponseId: response.id, }); } @@ -1493,11 +1502,17 @@ export class ModelResult< // (newly supplied this turn). `onResponseReceived` must fire only for // fresh items — re-hooking historical outputs on every callModel call // would double-invoke non-idempotent hooks. + // + // Fresh items are tracked locally and persisted to state only after the + // API call succeeds, avoiding duplication when a caller retries after a + // transient API failure. const hasLoadedHistory = !!this.currentState?.messages && Array.isArray(this.currentState.messages) && this.currentState.messages.length > 0; + let freshItemsForState: models.BaseInputsUnion[] | undefined; + if (hasLoadedHistory && this.currentState) { // `currentState.messages` is InputsUnion — keep it as that union so // appendToMessages (which expects InputsUnion) accepts it directly. @@ -1527,6 +1542,8 @@ export class ModelResult< ? await this.applyHooksToFreshItems(freshItems, historicalMessages, initialContext) : undefined; + freshItemsForState = hookedFresh; + baseRequest = { ...baseRequest, input: hookedFresh @@ -1544,6 +1561,9 @@ export class ModelResult< this.contextStore ?? undefined, this.options.sharedContextSchema, ); + + freshItemsForState = normalizeInputToArray(hookedInput); + baseRequest = { ...baseRequest, input: hookedInput, @@ -1572,6 +1592,14 @@ export class ModelResult< throw apiResult.error; } + // Stash fresh user items so saveResponseToState can persist them + // atomically with the assistant output. Writing them here would leave + // an orphaned user turn if the stream fails after ok:true — on retry + // the same input would be appended again, producing duplicates. + if (freshItemsForState && freshItemsForState.length > 0) { + this.pendingFreshItems = freshItemsForState; + } + // Handle both streaming and non-streaming responses // The API may return a non-streaming response even when stream: true is requested if (isEventStream(apiResult.value)) { diff --git a/packages/agent/tests/unit/user-input-persistence.test.ts b/packages/agent/tests/unit/user-input-persistence.test.ts new file mode 100644 index 0000000..886b1bf --- /dev/null +++ b/packages/agent/tests/unit/user-input-persistence.test.ts @@ -0,0 +1,434 @@ +import type { OpenRouterCore } from '@openrouter/sdk/core'; +import type * as models from '@openrouter/sdk/models'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { z } from 'zod/v4'; +import type { ConversationState, StateAccessor } from '../../src/index.js'; +import { callModel } from '../../src/inner-loop/call-model.js'; +import { ToolType } from '../../src/lib/tool-types.js'; + +const mockBetaResponsesSend = vi.hoisted(() => vi.fn()); + +vi.mock('@openrouter/sdk/funcs/betaResponsesSend', () => ({ + betaResponsesSend: mockBetaResponsesSend, +})); + +function textResponse(text: string): models.OpenResponsesResult { + return { + id: 'resp_text', + object: 'response', + createdAt: 0, + model: 'test-model', + status: 'completed', + completedAt: 0, + output: [ + { + id: 'msg_1', + type: 'message', + role: 'assistant', + status: 'completed', + content: [ + { + type: 'output_text', + text, + annotations: [], + }, + ], + }, + ], + error: null, + incompleteDetails: null, + temperature: null, + topP: null, + presencePenalty: null, + frequencyPenalty: null, + metadata: null, + instructions: null, + tools: [], + toolChoice: 'auto', + parallelToolCalls: false, + } as models.OpenResponsesResult; +} + +function toolCallResponse(callId: string, name: string, args: string): models.OpenResponsesResult { + return { + id: `resp_tc_${callId}`, + object: 'response', + createdAt: 0, + model: 'test-model', + status: 'completed', + completedAt: 0, + output: [ + { + type: 'function_call', + id: `fc_${callId}`, + callId, + name, + arguments: args, + status: 'completed', + }, + ], + error: null, + incompleteDetails: null, + temperature: null, + topP: null, + presencePenalty: null, + frequencyPenalty: null, + metadata: null, + instructions: null, + tools: [], + toolChoice: 'auto', + parallelToolCalls: false, + } as models.OpenResponsesResult; +} + +const echoTool = { + type: ToolType.Function, + function: { + name: 'echo', + description: 'Echo input', + inputSchema: z.object({ + message: z.string(), + }), + execute: async (params: { message: string }) => ({ + echoed: params.message, + }), + }, +} as const; + +const client = {} as OpenRouterCore; + +describe('User Input Persistence to State', () => { + let storedState: ConversationState | null; + let stateAccessor: StateAccessor; + + beforeEach(() => { + mockBetaResponsesSend.mockReset(); + storedState = null; + stateAccessor = { + load: async () => storedState, + save: async (state) => { + storedState = state; + }, + }; + }); + + it('should persist array user input items to state.messages after successful API call', async () => { + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Hello back!'), + }); + + const result = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Hello', + }, + ], + state: stateAccessor, + }); + + await result.getText(); + + expect(storedState).not.toBeNull(); + const messages = storedState!.messages as Array<{ + role?: string; + content?: string; + }>; + expect(Array.isArray(messages)).toBe(true); + + const userItems = messages.filter((m) => m.role === 'user'); + expect(userItems.length).toBe(1); + expect(userItems[0]!.content).toBe('Hello'); + }); + + it('should persist string input normalized as user message', async () => { + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Response'), + }); + + const result = callModel(client, { + model: 'test-model', + input: 'Hello string input', + state: stateAccessor, + }); + + await result.getText(); + + expect(storedState).not.toBeNull(); + const messages = storedState!.messages as Array<{ + role?: string; + content?: string; + }>; + expect(Array.isArray(messages)).toBe(true); + + const userItems = messages.filter((m) => m.role === 'user'); + expect(userItems.length).toBe(1); + expect(userItems[0]!.content).toBe('Hello string input'); + }); + + it('should persist user input alongside tool results after tool execution', async () => { + mockBetaResponsesSend + .mockResolvedValueOnce({ + ok: true, + value: toolCallResponse('call_1', 'echo', '{"message":"hi"}'), + }) + .mockResolvedValueOnce({ + ok: true, + value: textResponse('Done echoing.'), + }); + + const result = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Echo hi', + }, + ], + tools: [ + echoTool, + ] as const, + state: stateAccessor, + }); + + await result.getText(); + + expect(storedState).not.toBeNull(); + const messages = storedState!.messages as Array<{ + role?: string; + type?: string; + }>; + expect(Array.isArray(messages)).toBe(true); + + const userItems = messages.filter((m) => m.role === 'user'); + const fnCalls = messages.filter((m) => m.type === 'function_call'); + const fnOutputs = messages.filter((m) => m.type === 'function_call_output'); + + expect(userItems.length).toBe(1); + expect(fnCalls.length).toBe(1); + expect(fnOutputs.length).toBe(1); + }); + + it('should include prior user input in API request when resuming from state', async () => { + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('First response'), + }); + + const result1 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'First message', + }, + ], + state: stateAccessor, + }); + await result1.getText(); + + const messagesAfterFirst = storedState!.messages as Array<{ + role?: string; + }>; + const firstUserItems = messagesAfterFirst.filter((m) => m.role === 'user'); + expect(firstUserItems.length).toBe(1); + + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Second response'), + }); + + const result2 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Second message', + }, + ], + state: stateAccessor, + }); + await result2.getText(); + + const secondCallRequest = mockBetaResponsesSend.mock.calls[1]?.[1]?.responsesRequest; + expect(secondCallRequest).toBeDefined(); + + const input = secondCallRequest.input as Array<{ + role?: string; + content?: string; + }>; + expect(Array.isArray(input)).toBe(true); + + const userMessages = input.filter((i) => i.role === 'user'); + expect(userMessages.length).toBe(2); + expect(userMessages[0]!.content).toBe('First message'); + expect(userMessages[1]!.content).toBe('Second message'); + }); + + it('should accumulate user input from both calls in state after two callModel invocations', async () => { + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Reply 1'), + }); + + const result1 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Question 1', + }, + ], + state: stateAccessor, + }); + await result1.getText(); + + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Reply 2'), + }); + + const result2 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Question 2', + }, + ], + state: stateAccessor, + }); + await result2.getText(); + + const messages = storedState!.messages as Array<{ + role?: string; + content?: string; + }>; + const userItems = messages.filter((m) => m.role === 'user'); + expect(userItems.length).toBe(2); + expect(userItems[0]!.content).toBe('Question 1'); + expect(userItems[1]!.content).toBe('Question 2'); + }); + + it('should not persist user input when stream fails mid-read (ok:true + stream error)', async () => { + const failingStream = new ReadableStream({ + start(controller) { + controller.error(new Error('stream dropped')); + }, + }); + + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: failingStream, + }); + + const result1 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Hello', + }, + ], + state: stateAccessor, + }); + + await expect(result1.getText()).rejects.toThrow(); + + const messagesAfterStreamFailure = storedState?.messages as + | Array<{ + role?: string; + }> + | undefined; + const userItemsAfterFailure = + messagesAfterStreamFailure?.filter((m) => m.role === 'user') ?? []; + expect(userItemsAfterFailure.length).toBe(0); + + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Hello back!'), + }); + + const result2 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Hello', + }, + ], + state: stateAccessor, + }); + + await result2.getText(); + + const messages = storedState!.messages as Array<{ + role?: string; + type?: string; + content?: string; + }>; + const userItems = messages.filter((m) => m.role === 'user'); + expect(userItems.length).toBe(1); + expect(userItems[0]!.content).toBe('Hello'); + + const assistantItems = messages.filter((m) => m.role === 'assistant' || m.type === 'message'); + expect(assistantItems.length).toBe(1); + }); + + it('should not duplicate user input in state when retrying after API failure', async () => { + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: false, + error: new Error('API failure'), + }); + + const result1 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Hello', + }, + ], + state: stateAccessor, + }); + + await expect(result1.getText()).rejects.toThrow(); + + const messagesAfterFailure = storedState?.messages as + | Array<{ + role?: string; + }> + | undefined; + const userItemsAfterFailure = messagesAfterFailure?.filter((m) => m.role === 'user') ?? []; + expect(userItemsAfterFailure.length).toBe(0); + + mockBetaResponsesSend.mockResolvedValueOnce({ + ok: true, + value: textResponse('Hello back!'), + }); + + const result2 = callModel(client, { + model: 'test-model', + input: [ + { + role: 'user', + content: 'Hello', + }, + ], + state: stateAccessor, + }); + + await result2.getText(); + + const messages = storedState!.messages as Array<{ + role?: string; + content?: string; + }>; + const userItems = messages.filter((m) => m.role === 'user'); + expect(userItems.length).toBe(1); + expect(userItems[0]!.content).toBe('Hello'); + }); +});