Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions src/routes/v2/shared/components/AiChat/AiChatContent.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import { useEffect, useRef, useState } from "react";
import type { RecentPipelineRun } from "@/agent/session";
import type { ToolBridgeApi } from "@/agent/toolBridgeApi";
import { useAuthLocalStorage } from "@/components/shared/Authentication/useAuthLocalStorage";
import { BlockStack } from "@/components/ui/layout";
import { Button } from "@/components/ui/button";
import { Icon } from "@/components/ui/icon";
import { BlockStack, InlineStack } from "@/components/ui/layout";
import { useAiProviderSettings } from "@/hooks/useAiProviderSettings";
import useToastNotification from "@/hooks/useToastNotification";
import { useBackend } from "@/providers/BackendProvider";
Expand Down Expand Up @@ -99,11 +101,14 @@ export const AiChatContent = observer(function AiChatContent({
refetchOnWindowFocus: false,
});

const thread = aiChat.activeThread;

function handleSend(prompt: string) {
if (!thread) return;
const recentRuns = recentRunsData
? projectRecentRuns(recentRunsData)
: undefined;
aiChat.sendMessage(prompt, {
thread.sendMessage(prompt, {
onError: (msg) => notify(msg, "error"),
bridge,
aiConfig,
Expand All @@ -115,13 +120,29 @@ export const AiChatContent = observer(function AiChatContent({
return <AiProviderSetup />;
}

if (!thread) return null;

return (
<BlockStack fill>
<InlineStack
className="border-b p-2 w-full"
align="end"
blockAlign="center"
>
<Button
size="sm"
variant="ghost"
onClick={() => aiChat.newThread()}
aria-label="New chat"
>
<Icon name="SquarePen" />
</Button>
</InlineStack>
<ChatMessageList
messages={aiChat.messages}
thinkingText={aiChat.thinkingText}
messages={thread.messages}
thinkingText={thread.thinkingText}
/>
<ChatInput isPending={aiChat.isPending} onSubmit={handleSend} />
<ChatInput isPending={thread.isPending} onSubmit={handleSend} />
</BlockStack>
);
});
10 changes: 8 additions & 2 deletions src/routes/v2/shared/components/AiChat/AiChatStoreContext.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@ import {
useRequiredContext,
} from "@/hooks/useRequiredContext";

import { getAgentClient } from "./agentClient";
import { AiChatStore } from "./aiChatStore";

const AiChatStoreCtx = createRequiredContext<AiChatStore>("AiChatStoreContext");

export function AiChatStoreProvider({ children }: { children: ReactNode }) {
const [store] = useState(() => new AiChatStore());

useEffect(() => () => getAgentClient().terminate(), []);
// Ensure a thread exists on (re)mount and tear them all down on
// unmount. The ensure-on-mount step matters under React StrictMode,
// whose mount -> unmount -> mount cycle would otherwise leave the
// store empty after the cleanup disposes the constructor-seeded thread.
useEffect(() => {
store.ensureActiveThread();
return () => store.disposeAll();
}, [store]);

return (
<AiChatStoreCtx.Provider value={store}>{children}</AiChatStoreCtx.Provider>
Expand Down
26 changes: 12 additions & 14 deletions src/routes/v2/shared/components/AiChat/agentClient.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
/**
* Main-thread client for the in-browser agent worker.
*
* Spawns a single Web Worker (lazy, on first use), wires it up over
* Comlink, and exposes a typed `ask()` method that the AI Chat store
* calls.
* Each `AgentClient` instance is bound to a single `threadId` and owns
* exactly one Web Worker (spawned lazily on first use). It wires the
* worker up over Comlink and exposes a typed `ask()` method. The thread
* id is injected into every request so the worker keys its in-memory
* conversation memory by it. Lifecycle (create / terminate) is owned by
* the `AgentThread` primitive — there is no global singleton.
*/
import * as Comlink from "comlink";

Expand All @@ -20,16 +23,17 @@ interface InitDeps {

interface AskOptions {
message: string;
threadId?: string;
recentRuns?: RecentPipelineRun[];
aiConfig: AiProviderConfig;
}

class AgentClient {
export class AgentClient {
private worker: Worker | null = null;
private remote: Comlink.Remote<AgentWorkerApi> | null = null;
private initPromise: Promise<void> | null = null;

constructor(private readonly threadId: string) {}

private async ensureInit(
deps: InitDeps,
): Promise<Comlink.Remote<AgentWorkerApi>> {
Expand Down Expand Up @@ -64,9 +68,10 @@ class AgentClient {
signal?: AbortSignal,
): Promise<AgentResponse> {
const remote = await this.ensureInit(deps);
const params = { ...options, threadId: this.threadId };
return signal
? remote.ask(options, Comlink.proxy(signal))
: remote.ask(options);
? remote.ask(params, Comlink.proxy(signal))
: remote.ask(params);
}

terminate(): void {
Expand All @@ -76,10 +81,3 @@ class AgentClient {
this.initPromise = null;
}
}

let singleton: AgentClient | null = null;

export function getAgentClient(): AgentClient {
if (!singleton) singleton = new AgentClient();
return singleton;
}
113 changes: 113 additions & 0 deletions src/routes/v2/shared/components/AiChat/agentThread.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import { action, makeObservable, observable, runInAction } from "mobx";

import type { RecentPipelineRun } from "@/agent/session";
import type { ToolBridgeApi } from "@/agent/toolBridgeApi";
import type { AiProviderConfig } from "@/types/aiProvider";
import { getErrorMessage } from "@/utils/string";

import { AgentClient } from "./agentClient";
import type { ChatMessage } from "./types";

function generateMessageId(): string {
return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 7)}`;
}

function generateThreadId(): string {
return `thread-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
}

interface SendMessageOptions {
onError: (message: string) => void;
bridge: ToolBridgeApi;
aiConfig: AiProviderConfig;
recentRuns?: RecentPipelineRun[];
}

/**
* A single AI conversation: one Web Worker (agent + in-memory session)
* plus the chat state that survives the React component tree (window
* minimize / hide / unmount). Disposing a thread terminates its worker
* and discards the conversation entirely.
*/
export class AgentThread {
readonly threadId: string;

@observable.shallow accessor messages: ChatMessage[] = [];
@observable accessor thinkingText: string | null = null;
@observable accessor isPending = false;

private readonly client: AgentClient;
private abortController: AbortController | null = null;

constructor(threadId?: string) {
makeObservable(this);
this.threadId = threadId ?? generateThreadId();
this.client = new AgentClient(this.threadId);
}

abort() {
this.abortController?.abort();
}

async sendMessage(prompt: string, options: SendMessageOptions) {
const abortController = new AbortController();
this.abortController = abortController;

runInAction(() => {
this.messages = [
...this.messages,
{ id: generateMessageId(), role: "user", content: prompt },
];
this.isPending = true;
this.thinkingText = null;
});

try {
const response = await this.client.ask(
{
bridge: options.bridge,
onStatus: (status) => {
runInAction(() => {
this.thinkingText = status.text;
});
},
},
{
message: prompt,
aiConfig: options.aiConfig,
...(options.recentRuns && { recentRuns: options.recentRuns }),
},
abortController.signal,
);

runInAction(() => {
this.thinkingText = null;
this.messages = [
...this.messages,
{
id: generateMessageId(),
role: "assistant",
content: response.answer,
},
];
});
} catch (error) {
options.onError(`AI request failed: ${getErrorMessage(error)}`);
} finally {
this.abortController = null;
runInAction(() => {
this.isPending = false;
this.thinkingText = null;
});
}
}

@action dispose() {
this.abortController?.abort();
this.abortController = null;
this.client.terminate();
}

// future: persist() — snapshot/restore a thread across reloads. Out of
// scope for now.
}
120 changes: 37 additions & 83 deletions src/routes/v2/shared/components/AiChat/aiChatStore.ts
Original file line number Diff line number Diff line change
@@ -1,102 +1,56 @@
import { action, makeObservable, observable, runInAction } from "mobx";
import { action, computed, makeObservable, observable } from "mobx";

import type { RecentPipelineRun } from "@/agent/session";
import type { ToolBridgeApi } from "@/agent/toolBridgeApi";
import type { AiProviderConfig } from "@/types/aiProvider";
import { getErrorMessage } from "@/utils/string";

import { getAgentClient } from "./agentClient";
import type { ChatMessage } from "./types";

function generateMessageId(): string {
return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 7)}`;
}

interface SendMessageOptions {
onError: (message: string) => void;
bridge: ToolBridgeApi;
aiConfig: AiProviderConfig;
recentRuns?: RecentPipelineRun[];
}
import { AgentThread } from "./agentThread";

/**
* Stores AI chat state (messages, thread, pending status) outside the
* React component tree so it survives window minimize / hide / unmount.
* Owns the collection of {@link AgentThread}s for one AI chat provider.
*
* Today only a single thread is effectively active: starting a new
* session (or navigating to a different pipeline / run) disposes the
* current thread and creates a fresh one in one go. The collection shape
* leaves room for multiple concurrent threads in the future.
*/
export class AiChatStore {
@observable.shallow accessor messages: ChatMessage[] = [];
@observable accessor threadId: string | undefined = undefined;
@observable accessor thinkingText: string | null = null;
@observable accessor isPending = false;

private abortController: AbortController | null = null;
@observable.shallow accessor threads: AgentThread[] = [];
@observable accessor activeThreadId: string | null = null;

constructor() {
makeObservable(this);
this.newThread();
}

@action resetState() {
this.messages = [];
this.threadId = undefined;
this.thinkingText = null;
this.isPending = false;
this.abortController?.abort();
this.abortController = null;
@computed get activeThread(): AgentThread | null {
return this.threads.find((t) => t.threadId === this.activeThreadId) ?? null;
}

abort() {
this.abortController?.abort();
/** Creates a thread if none is active. Idempotent. */
@action ensureActiveThread(): AgentThread {
return this.activeThread ?? this.newThread();
}

async sendMessage(prompt: string, options: SendMessageOptions) {
runInAction(() => {
this.messages = [
...this.messages,
{ id: generateMessageId(), role: "user", content: prompt },
];
this.isPending = true;
this.thinkingText = null;
});
/**
* Disposes the current active thread and spins up a fresh one,
* making it active. Used for both navigation resets and the
* user-triggered "new chat" action.
*/
@action newThread(): AgentThread {
const previous = this.activeThread;
if (previous) {
previous.dispose();
this.threads = this.threads.filter((t) => t !== previous);
}

try {
const client = getAgentClient();
const response = await client.ask(
{
bridge: options.bridge,
onStatus: (status) => {
runInAction(() => {
this.thinkingText = status.text;
});
},
},
{
message: prompt,
aiConfig: options.aiConfig,
...(this.threadId && { threadId: this.threadId }),
...(options.recentRuns && { recentRuns: options.recentRuns }),
},
);
const thread = new AgentThread();
this.threads = [...this.threads, thread];
this.activeThreadId = thread.threadId;
return thread;
}

runInAction(() => {
this.thinkingText = null;
this.threadId = response.threadId;
this.messages = [
...this.messages,
{
id: generateMessageId(),
role: "assistant",
content: response.answer,
},
];
});
} catch (error) {
options.onError(`AI request failed: ${getErrorMessage(error)}`);
} finally {
this.abortController = null;
runInAction(() => {
this.isPending = false;
this.thinkingText = null;
});
@action disposeAll() {
for (const thread of this.threads) {
thread.dispose();
}
this.threads = [];
this.activeThreadId = null;
}
}
Loading