From a19dc61e4028b0b816b1ec6a08e193711395460f Mon Sep 17 00:00:00 2001 From: Maksym Yezhov Date: Tue, 2 Jun 2026 18:41:37 -0700 Subject: [PATCH] refactor: v2 - ai assistant - introduce agentthread primitive --- .../components/AiChat/AiChatContent.tsx | 31 ++++- .../components/AiChat/AiChatStoreContext.tsx | 10 +- .../shared/components/AiChat/agentClient.ts | 26 ++-- .../shared/components/AiChat/agentThread.ts | 113 +++++++++++++++++ .../shared/components/AiChat/aiChatStore.ts | 120 ++++++------------ 5 files changed, 196 insertions(+), 104 deletions(-) create mode 100644 src/routes/v2/shared/components/AiChat/agentThread.ts diff --git a/src/routes/v2/shared/components/AiChat/AiChatContent.tsx b/src/routes/v2/shared/components/AiChat/AiChatContent.tsx index 6ba5a7e56..34223658d 100644 --- a/src/routes/v2/shared/components/AiChat/AiChatContent.tsx +++ b/src/routes/v2/shared/components/AiChat/AiChatContent.tsx @@ -5,7 +5,9 @@ import { useEffect, useRef, useState } from "react"; import type { RecentPipelineRun } from "@/agent/session"; import type { ToolBridgeApi } from "@/agent/toolBridgeApi"; import { useAuthLocalStorage } from "@/components/shared/Authentication/useAuthLocalStorage"; -import { BlockStack } from "@/components/ui/layout"; +import { Button } from "@/components/ui/button"; +import { Icon } from "@/components/ui/icon"; +import { BlockStack, InlineStack } from "@/components/ui/layout"; import { useAiProviderSettings } from "@/hooks/useAiProviderSettings"; import useToastNotification from "@/hooks/useToastNotification"; import { useBackend } from "@/providers/BackendProvider"; @@ -99,11 +101,14 @@ export const AiChatContent = observer(function AiChatContent({ refetchOnWindowFocus: false, }); + const thread = aiChat.activeThread; + function handleSend(prompt: string) { + if (!thread) return; const recentRuns = recentRunsData ? projectRecentRuns(recentRunsData) : undefined; - aiChat.sendMessage(prompt, { + thread.sendMessage(prompt, { onError: (msg) => notify(msg, "error"), bridge, aiConfig, @@ -115,13 +120,29 @@ export const AiChatContent = observer(function AiChatContent({ return ; } + if (!thread) return null; + return ( + + + - + ); }); diff --git a/src/routes/v2/shared/components/AiChat/AiChatStoreContext.tsx b/src/routes/v2/shared/components/AiChat/AiChatStoreContext.tsx index 190fa76ee..3778db859 100644 --- a/src/routes/v2/shared/components/AiChat/AiChatStoreContext.tsx +++ b/src/routes/v2/shared/components/AiChat/AiChatStoreContext.tsx @@ -6,7 +6,6 @@ import { useRequiredContext, } from "@/hooks/useRequiredContext"; -import { getAgentClient } from "./agentClient"; import { AiChatStore } from "./aiChatStore"; const AiChatStoreCtx = createRequiredContext("AiChatStoreContext"); @@ -14,7 +13,14 @@ const AiChatStoreCtx = createRequiredContext("AiChatStoreContext"); export function AiChatStoreProvider({ children }: { children: ReactNode }) { const [store] = useState(() => new AiChatStore()); - useEffect(() => () => getAgentClient().terminate(), []); + // Ensure a thread exists on (re)mount and tear them all down on + // unmount. The ensure-on-mount step matters under React StrictMode, + // whose mount -> unmount -> mount cycle would otherwise leave the + // store empty after the cleanup disposes the constructor-seeded thread. + useEffect(() => { + store.ensureActiveThread(); + return () => store.disposeAll(); + }, [store]); return ( {children} diff --git a/src/routes/v2/shared/components/AiChat/agentClient.ts b/src/routes/v2/shared/components/AiChat/agentClient.ts index a853f4bcf..6e9a8186e 100644 --- a/src/routes/v2/shared/components/AiChat/agentClient.ts +++ b/src/routes/v2/shared/components/AiChat/agentClient.ts @@ -1,9 +1,12 @@ /** * Main-thread client for the in-browser agent worker. * - * Spawns a single Web Worker (lazy, on first use), wires it up over - * Comlink, and exposes a typed `ask()` method that the AI Chat store - * calls. + * Each `AgentClient` instance is bound to a single `threadId` and owns + * exactly one Web Worker (spawned lazily on first use). It wires the + * worker up over Comlink and exposes a typed `ask()` method. The thread + * id is injected into every request so the worker keys its in-memory + * conversation memory by it. Lifecycle (create / terminate) is owned by + * the `AgentThread` primitive — there is no global singleton. */ import * as Comlink from "comlink"; @@ -20,16 +23,17 @@ interface InitDeps { interface AskOptions { message: string; - threadId?: string; recentRuns?: RecentPipelineRun[]; aiConfig: AiProviderConfig; } -class AgentClient { +export class AgentClient { private worker: Worker | null = null; private remote: Comlink.Remote | null = null; private initPromise: Promise | null = null; + constructor(private readonly threadId: string) {} + private async ensureInit( deps: InitDeps, ): Promise> { @@ -64,9 +68,10 @@ class AgentClient { signal?: AbortSignal, ): Promise { const remote = await this.ensureInit(deps); + const params = { ...options, threadId: this.threadId }; return signal - ? remote.ask(options, Comlink.proxy(signal)) - : remote.ask(options); + ? remote.ask(params, Comlink.proxy(signal)) + : remote.ask(params); } terminate(): void { @@ -76,10 +81,3 @@ class AgentClient { this.initPromise = null; } } - -let singleton: AgentClient | null = null; - -export function getAgentClient(): AgentClient { - if (!singleton) singleton = new AgentClient(); - return singleton; -} diff --git a/src/routes/v2/shared/components/AiChat/agentThread.ts b/src/routes/v2/shared/components/AiChat/agentThread.ts new file mode 100644 index 000000000..c0537f618 --- /dev/null +++ b/src/routes/v2/shared/components/AiChat/agentThread.ts @@ -0,0 +1,113 @@ +import { action, makeObservable, observable, runInAction } from "mobx"; + +import type { RecentPipelineRun } from "@/agent/session"; +import type { ToolBridgeApi } from "@/agent/toolBridgeApi"; +import type { AiProviderConfig } from "@/types/aiProvider"; +import { getErrorMessage } from "@/utils/string"; + +import { AgentClient } from "./agentClient"; +import type { ChatMessage } from "./types"; + +function generateMessageId(): string { + return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 7)}`; +} + +function generateThreadId(): string { + return `thread-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; +} + +interface SendMessageOptions { + onError: (message: string) => void; + bridge: ToolBridgeApi; + aiConfig: AiProviderConfig; + recentRuns?: RecentPipelineRun[]; +} + +/** + * A single AI conversation: one Web Worker (agent + in-memory session) + * plus the chat state that survives the React component tree (window + * minimize / hide / unmount). Disposing a thread terminates its worker + * and discards the conversation entirely. + */ +export class AgentThread { + readonly threadId: string; + + @observable.shallow accessor messages: ChatMessage[] = []; + @observable accessor thinkingText: string | null = null; + @observable accessor isPending = false; + + private readonly client: AgentClient; + private abortController: AbortController | null = null; + + constructor(threadId?: string) { + makeObservable(this); + this.threadId = threadId ?? generateThreadId(); + this.client = new AgentClient(this.threadId); + } + + abort() { + this.abortController?.abort(); + } + + async sendMessage(prompt: string, options: SendMessageOptions) { + const abortController = new AbortController(); + this.abortController = abortController; + + runInAction(() => { + this.messages = [ + ...this.messages, + { id: generateMessageId(), role: "user", content: prompt }, + ]; + this.isPending = true; + this.thinkingText = null; + }); + + try { + const response = await this.client.ask( + { + bridge: options.bridge, + onStatus: (status) => { + runInAction(() => { + this.thinkingText = status.text; + }); + }, + }, + { + message: prompt, + aiConfig: options.aiConfig, + ...(options.recentRuns && { recentRuns: options.recentRuns }), + }, + abortController.signal, + ); + + runInAction(() => { + this.thinkingText = null; + this.messages = [ + ...this.messages, + { + id: generateMessageId(), + role: "assistant", + content: response.answer, + }, + ]; + }); + } catch (error) { + options.onError(`AI request failed: ${getErrorMessage(error)}`); + } finally { + this.abortController = null; + runInAction(() => { + this.isPending = false; + this.thinkingText = null; + }); + } + } + + @action dispose() { + this.abortController?.abort(); + this.abortController = null; + this.client.terminate(); + } + + // future: persist() — snapshot/restore a thread across reloads. Out of + // scope for now. +} diff --git a/src/routes/v2/shared/components/AiChat/aiChatStore.ts b/src/routes/v2/shared/components/AiChat/aiChatStore.ts index cbf567e8e..95a2a8a4b 100644 --- a/src/routes/v2/shared/components/AiChat/aiChatStore.ts +++ b/src/routes/v2/shared/components/AiChat/aiChatStore.ts @@ -1,102 +1,56 @@ -import { action, makeObservable, observable, runInAction } from "mobx"; +import { action, computed, makeObservable, observable } from "mobx"; -import type { RecentPipelineRun } from "@/agent/session"; -import type { ToolBridgeApi } from "@/agent/toolBridgeApi"; -import type { AiProviderConfig } from "@/types/aiProvider"; -import { getErrorMessage } from "@/utils/string"; - -import { getAgentClient } from "./agentClient"; -import type { ChatMessage } from "./types"; - -function generateMessageId(): string { - return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 7)}`; -} - -interface SendMessageOptions { - onError: (message: string) => void; - bridge: ToolBridgeApi; - aiConfig: AiProviderConfig; - recentRuns?: RecentPipelineRun[]; -} +import { AgentThread } from "./agentThread"; /** - * Stores AI chat state (messages, thread, pending status) outside the - * React component tree so it survives window minimize / hide / unmount. + * Owns the collection of {@link AgentThread}s for one AI chat provider. + * + * Today only a single thread is effectively active: starting a new + * session (or navigating to a different pipeline / run) disposes the + * current thread and creates a fresh one in one go. The collection shape + * leaves room for multiple concurrent threads in the future. */ export class AiChatStore { - @observable.shallow accessor messages: ChatMessage[] = []; - @observable accessor threadId: string | undefined = undefined; - @observable accessor thinkingText: string | null = null; - @observable accessor isPending = false; - - private abortController: AbortController | null = null; + @observable.shallow accessor threads: AgentThread[] = []; + @observable accessor activeThreadId: string | null = null; constructor() { makeObservable(this); + this.newThread(); } - @action resetState() { - this.messages = []; - this.threadId = undefined; - this.thinkingText = null; - this.isPending = false; - this.abortController?.abort(); - this.abortController = null; + @computed get activeThread(): AgentThread | null { + return this.threads.find((t) => t.threadId === this.activeThreadId) ?? null; } - abort() { - this.abortController?.abort(); + /** Creates a thread if none is active. Idempotent. */ + @action ensureActiveThread(): AgentThread { + return this.activeThread ?? this.newThread(); } - async sendMessage(prompt: string, options: SendMessageOptions) { - runInAction(() => { - this.messages = [ - ...this.messages, - { id: generateMessageId(), role: "user", content: prompt }, - ]; - this.isPending = true; - this.thinkingText = null; - }); + /** + * Disposes the current active thread and spins up a fresh one, + * making it active. Used for both navigation resets and the + * user-triggered "new chat" action. + */ + @action newThread(): AgentThread { + const previous = this.activeThread; + if (previous) { + previous.dispose(); + this.threads = this.threads.filter((t) => t !== previous); + } - try { - const client = getAgentClient(); - const response = await client.ask( - { - bridge: options.bridge, - onStatus: (status) => { - runInAction(() => { - this.thinkingText = status.text; - }); - }, - }, - { - message: prompt, - aiConfig: options.aiConfig, - ...(this.threadId && { threadId: this.threadId }), - ...(options.recentRuns && { recentRuns: options.recentRuns }), - }, - ); + const thread = new AgentThread(); + this.threads = [...this.threads, thread]; + this.activeThreadId = thread.threadId; + return thread; + } - runInAction(() => { - this.thinkingText = null; - this.threadId = response.threadId; - this.messages = [ - ...this.messages, - { - id: generateMessageId(), - role: "assistant", - content: response.answer, - }, - ]; - }); - } catch (error) { - options.onError(`AI request failed: ${getErrorMessage(error)}`); - } finally { - this.abortController = null; - runInAction(() => { - this.isPending = false; - this.thinkingText = null; - }); + @action disposeAll() { + for (const thread of this.threads) { + thread.dispose(); } + this.threads = []; + this.activeThreadId = null; } }