From 6900c30935506a0e004cc2ba8a1703d9d8758c5f Mon Sep 17 00:00:00 2001 From: a Date: Sun, 28 Jun 2026 04:10:18 +0300 Subject: [PATCH] feat(ai-chat): live-follow a still-running run on chat reopen (#184) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reopening a chat whose agent run is still going showed a frozen snapshot from the moment it was opened. Add a passive-observer reconnect-poll path: when this tab did NOT start the run locally, poll POST /ai-chat/run every 2s while the run is pending/running and merge its incrementally-persisted assistant message into the thread, so new steps/tool-calls and the growing text appear live. Polling stops on terminal status (refetchInterval keyed on run.status, mirroring the reindex polling); a final messages invalidate shows the persisted end state. Observer-vs-streamer detection: ChatThread reports its local useChat streaming status up; the window only polls/merges while NOT locally streaming (the streamer's SSE owns the view — no double-render). Gated by settings.ai.autonomousRuns; the query is disabled when the feature is off so the flag-gated endpoint is never hit, and a failed fetch can't loop (retry:false -> refetchInterval(undefined)=false). Pure decisions (poll interval, observe gate, message merge) extracted to run-polling.ts and unit-tested; added query enable-gating and ChatThread observer-merge tests. Client-only change — the reconnect endpoint already returns the run plus the assistant message with its metadata.parts. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../ai-chat/components/ai-chat-window.tsx | 64 ++++++++++- .../ai-chat/components/chat-thread.test.tsx | 56 ++++++++++ .../ai-chat/components/chat-thread.tsx | 39 ++++++- .../features/ai-chat/queries/ai-chat-query.ts | 31 ++++++ .../queries/ai-chat-run-query.test.tsx | 92 ++++++++++++++++ .../ai-chat/services/ai-chat-service.ts | 16 +++ .../features/ai-chat/types/ai-chat.types.ts | 32 ++++++ .../ai-chat/utils/run-polling.test.ts | 104 ++++++++++++++++++ .../src/features/ai-chat/utils/run-polling.ts | 71 ++++++++++++ .../workspace/types/workspace.types.ts | 3 + 10 files changed, 506 insertions(+), 2 deletions(-) create mode 100644 apps/client/src/features/ai-chat/queries/ai-chat-run-query.test.tsx create mode 100644 apps/client/src/features/ai-chat/utils/run-polling.test.ts create mode 100644 apps/client/src/features/ai-chat/utils/run-polling.ts diff --git a/apps/client/src/features/ai-chat/components/ai-chat-window.tsx b/apps/client/src/features/ai-chat/components/ai-chat-window.tsx index 3df60ddb..52b425cc 100644 --- a/apps/client/src/features/ai-chat/components/ai-chat-window.tsx +++ b/apps/client/src/features/ai-chat/components/ai-chat-window.tsx @@ -17,7 +17,7 @@ import { IconPlus, IconX, } from "@tabler/icons-react"; -import { useAtom, useSetAtom } from "jotai"; +import { useAtom, useAtomValue, useSetAtom } from "jotai"; import { useMatch } from "react-router-dom"; import { useTranslation } from "react-i18next"; import { useQueryClient } from "@tanstack/react-query"; @@ -34,9 +34,12 @@ import { AI_CHATS_RQ_KEY, AI_CHAT_MESSAGES_RQ_KEY, useAiChatMessagesQuery, + useAiChatRunQuery, useAiChatsQuery, useAiRolesQuery, } from "@/features/ai-chat/queries/ai-chat-query.ts"; +import { shouldObserveRun } from "@/features/ai-chat/utils/run-polling.ts"; +import { workspaceAtom } from "@/features/user/atoms/current-user-atom"; import ConversationList from "@/features/ai-chat/components/conversation-list.tsx"; import ChatThread from "@/features/ai-chat/components/chat-thread.tsx"; import { exportAiChat } from "@/features/ai-chat/services/ai-chat-service.ts"; @@ -162,6 +165,59 @@ export default function AiChatWindow() { const { data: messageRows, isLoading: messagesLoading } = useAiChatMessagesQuery(activeChatId ?? undefined); + // #184 reconnect-and-live-follow. Whether detached agent runs are enabled for + // this workspace; the reconnect endpoint is flag-gated server-side, so we must + // not poll it when the feature is off. + const workspace = useAtomValue(workspaceAtom); + const autonomousRunsEnabled = + workspace?.settings?.ai?.autonomousRuns === true; + + // Whether THIS tab is the one actively streaming the open chat's run locally + // (it started the run here and holds the SSE). Reported up from ChatThread. We + // are the STREAMER while true and a passive OBSERVER while false — the basis of + // the observer-vs-streamer detection. Reset to false by the fresh ChatThread's + // mount effect on every chat switch. + const [localStreaming, setLocalStreaming] = useState(false); + const onStreamingChange = useCallback((streaming: boolean) => { + setLocalStreaming(streaming); + }, []); + + // Poll the latest run of the open chat ONLY when we are a passive observer: + // feature on, a chat is open, and we are NOT the local streamer (the streamer + // already has the live SSE — polling/merging too would double-render). The + // query's own status-keyed refetchInterval stops once the run is terminal. + const { data: runData } = useAiChatRunQuery( + activeChatId ?? undefined, + autonomousRunsEnabled && !localStreaming, + ); + const run = runData?.run ?? null; + // The run's incrementally-persisted assistant message to merge into the thread, + // but only while we are an observer (never when we are the streamer — guards + // against a stale poll fighting the live stream). Includes a terminal run so the + // final persisted output is shown on reopen. + const observedRow = shouldObserveRun(run, localStreaming) + ? (runData?.message ?? null) + : null; + + // When the observed run reaches a terminal status, do a final messages refetch + // so the persisted final state (token/context badge, export source) is shown, + // then the query's refetchInterval has already stopped polling. Deduped per run + // id so it fires exactly once per run, not on every subsequent poll-less render. + const finalizedRunIdRef = useRef(null); + useEffect(() => { + if (!run || !activeChatId) return; + if (run.status === "pending" || run.status === "running") { + // Active again (a new run) — re-arm so its terminal transition fires once. + finalizedRunIdRef.current = null; + return; + } + if (finalizedRunIdRef.current === run.id) return; + finalizedRunIdRef.current = run.id; + queryClient.invalidateQueries({ + queryKey: AI_CHAT_MESSAGES_RQ_KEY(activeChatId), + }); + }, [run, activeChatId, queryClient]); + // The page the user is currently viewing. AiChatWindow lives in a pathless // parent layout route, so useParams() can't see :pageSlug. Match the full // pathname against the authenticated page route instead so "the current page" @@ -636,6 +692,12 @@ export default function AiChatWindow() { assistantName={currentRole?.name} onTurnFinished={onTurnFinished} onServerChatId={onServerChatId} + // #184: live-follow a still-running run when we reopened the chat as + // a passive observer; null when there is nothing to observe or this + // tab is the streamer. onStreamingChange lets the window stop polling + // while we are the streamer. + observedRow={observedRow} + onStreamingChange={onStreamingChange} /> )} diff --git a/apps/client/src/features/ai-chat/components/chat-thread.test.tsx b/apps/client/src/features/ai-chat/components/chat-thread.test.tsx index 94499d0f..985e5c41 100644 --- a/apps/client/src/features/ai-chat/components/chat-thread.test.tsx +++ b/apps/client/src/features/ai-chat/components/chat-thread.test.tsx @@ -11,6 +11,7 @@ const h = vi.hoisted(() => ({ onFinish: null as null | ((arg: Record) => void), sendMessage: vi.fn(), stop: vi.fn(), + setMessages: vi.fn(), transport: null as null | { prepareSendMessagesRequest: (arg: { messages: unknown[]; @@ -30,6 +31,8 @@ vi.mock("@ai-sdk/react", () => ({ status: h.state.status, stop: h.state.stop, error: null, + // #184: ChatThread reads setMessages to merge a polled observer run. + setMessages: h.state.setMessages, }; }, })); @@ -140,3 +143,56 @@ describe("ChatThread — send now (#198)", () => { expect(prep({ messages: [], body: {} }).body.interrupted).toBe(false); }); }); + +// #184 passive-observer merge: when reconnecting to a still-running run, the +// parent feeds the polled run message via `observedRow`; ChatThread merges it via +// setMessages — but ONLY when this tab is NOT itself streaming (the streamer's +// SSE owns the view, so a stale observedRow must never overwrite it). +describe("ChatThread — observer run merge (#184)", () => { + beforeEach(() => { + h.state.onFinish = null; + h.state.setMessages.mockReset(); + }); + + const observedRow = { + id: "a-run", + role: "assistant", + content: "step 1\nstep 2", + metadata: { + parts: [{ type: "text", text: "step 1\nstep 2" }], + }, + createdAt: "2026-01-01T00:00:00Z", + } as const; + + function renderObserver(status: string) { + h.state.status = status; + render( + + + , + ); + } + + it("merges the polled run message when this tab is a passive observer", () => { + renderObserver("ready"); + expect(h.state.setMessages).toHaveBeenCalledTimes(1); + // The updater replaces/append the observed assistant row by id. + const updater = h.state.setMessages.mock.calls[0][0] as ( + prev: { id: string; parts: { text: string }[] }[], + ) => { id: string; parts: { text: string }[] }[]; + const merged = updater([{ id: "u1", parts: [{ text: "hi" }] }]); + expect(merged).toHaveLength(2); + expect(merged[1].id).toBe("a-run"); + expect(merged[1].parts[0].text).toBe("step 1\nstep 2"); + }); + + it("does NOT merge while THIS tab is the streamer (no double-render)", () => { + renderObserver("streaming"); + expect(h.state.setMessages).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/client/src/features/ai-chat/components/chat-thread.tsx b/apps/client/src/features/ai-chat/components/chat-thread.tsx index 875e36f6..c9a0de20 100644 --- a/apps/client/src/features/ai-chat/components/chat-thread.tsx +++ b/apps/client/src/features/ai-chat/components/chat-thread.tsx @@ -24,6 +24,7 @@ import { } from "@/features/ai-chat/utils/role-launch.ts"; import { describeChatError } from "@/features/ai-chat/utils/error-message.ts"; import { extractServerChatId } from "@/features/ai-chat/utils/adopt-chat-id.ts"; +import { mergeObservedMessage } from "@/features/ai-chat/utils/run-polling.ts"; import { dequeue, enqueueMessage, @@ -86,6 +87,19 @@ interface ChatThreadProps { * Copy/export button available mid-stream). Distinct from onTurnFinished, * which fires only at the terminal outcome. */ onServerChatId?: (serverChatId?: string) => void; + /** #184 reconnect-and-live-follow. When THIS tab reopened a chat whose agent + * run is still going (it is a PASSIVE OBSERVER — it did not start the run here), + * the parent polls the reconnect endpoint and feeds the run's incrementally- + * persisted assistant message here; we merge it into the live list so new + * steps/tool-calls appear as they are persisted. Null when there is nothing to + * observe (no run, feature off, or this tab IS the streamer). The merge is + * ADDITIONALLY guarded by our own `isStreaming`, so a stale value can never + * fight the local stream when we are the streamer. */ + observedRow?: IAiChatMessageRow | null; + /** Report this tab's live streaming status up to the parent, so it can stop + * polling the run while WE are the active streamer (the SSE owns the view) and + * resume once we go idle. Called from an effect on every transition. */ + onStreamingChange?: (streaming: boolean) => void; } /** @@ -131,6 +145,8 @@ export default function ChatThread({ assistantName, onTurnFinished, onServerChatId, + observedRow, + onStreamingChange, }: ChatThreadProps) { const { t } = useTranslation(); @@ -274,7 +290,7 @@ export default function ChatThread({ [], ); - const { messages, sendMessage, status, stop, error } = useChat({ + const { messages, sendMessage, status, stop, error, setMessages } = useChat({ // Stable per-mount key. Existing chats use their real id; new chats use a // generated client id (never `undefined`) so the store is NOT re-created on // every render mid-stream (see `chatStoreId` above). @@ -378,6 +394,27 @@ export default function ChatThread({ const isStreaming = status === "submitted" || status === "streaming"; + // #184: report our live streaming status up so the parent stops polling the run + // while WE are the streamer (the SSE owns the view) and resumes once we go idle. + // Effect (not render) so it never updates parent state during our own render; + // fires on mount with `false`, which also re-syncs the parent after a chat + // switch remounts this thread (a fresh mount is idle until the user sends). + useEffect(() => { + onStreamingChange?.(isStreaming); + }, [isStreaming, onStreamingChange]); + + // #184 passive-observer merge: when the parent feeds a polled run message (we + // reopened a chat whose run is still going and did NOT start it here), merge it + // into the live list so new steps/tool-calls appear as they are persisted. Hard- + // gated by `!isStreaming`: if THIS tab is actually the streamer, the local SSE + // owns the view and a stale observedRow must never overwrite it. `observedRow` + // is a stable per-poll object, so this runs once per poll, not per render. + useEffect(() => { + if (isStreaming || !observedRow) return; + const observed = rowToUiMessage(observedRow); + setMessages((prev) => mergeObservedMessage(prev, observed)); + }, [observedRow, isStreaming, setMessages]); + // "Send now" on a queued message: interrupt the current turn and immediately // send THIS message, keeping the agent's partial output. Other queued messages // stay queued and flush normally after the new turn. Reuses the existing diff --git a/apps/client/src/features/ai-chat/queries/ai-chat-query.ts b/apps/client/src/features/ai-chat/queries/ai-chat-query.ts index 37cf70f8..5dcf4afd 100644 --- a/apps/client/src/features/ai-chat/queries/ai-chat-query.ts +++ b/apps/client/src/features/ai-chat/queries/ai-chat-query.ts @@ -12,6 +12,7 @@ import { deleteAiChat, deleteAiRole, getAiChatMessages, + getAiChatRun, getAiChats, getAiRoleCatalog, getAiRoleCatalogBundle, @@ -24,6 +25,7 @@ import { import { IAiChat, IAiChatMessageRow, + IAiChatRunResponse, IAiRole, IAiRoleCatalog, IAiRoleCatalogBundle, @@ -34,6 +36,7 @@ import { IAiRoleUpdateFromCatalogResult, } from "@/features/ai-chat/types/ai-chat.types.ts"; import { IPagination } from "@/lib/types.ts"; +import { runPollInterval } from "@/features/ai-chat/utils/run-polling.ts"; export const AI_CHATS_RQ_KEY = ["ai-chats"]; export const AI_ROLES_RQ_KEY = ["ai-roles"]; @@ -51,6 +54,7 @@ export const AI_CHAT_MESSAGES_RQ_KEY = (chatId: string) => [ "ai-chat-messages", chatId, ]; +export const AI_CHAT_RUN_RQ_KEY = (chatId: string) => ["ai-chat-run", chatId]; /** Paginated list of the current user's chats (auto-loads further pages). */ export function useAiChatsQuery() { @@ -131,6 +135,33 @@ export function useAiChatMessagesQuery(chatId: string | undefined) { }; } +/** + * Reconnect to a chat's latest agent run and LIVE-FOLLOW it (#184). While the run + * is active the query re-polls every {@link runPollInterval} ms (driven off the + * fetched `run.status`, the same status-keyed refetchInterval pattern as the + * embeddings reindex polling); once the run reaches a terminal status — or there + * is no run — the interval returns `false` and polling stops on its own. Polling + * is thus naturally bounded by the run terminating; no separate timeout cap. + * + * `enabled` gates the whole thing: callers pass `false` when the autonomous-runs + * feature is off (the endpoint is flag-gated server-side and would 403) OR when + * THIS tab is the one actively streaming the run (the live SSE owns the view, so + * we must not also poll/merge). The global `retry: false` means a 403/anything + * leaves `data` undefined, so refetchInterval(undefined run) returns false — a + * failed fetch can never spin a tight loop. + */ +export function useAiChatRunQuery( + chatId: string | undefined, + enabled: boolean, +) { + return useQuery({ + queryKey: AI_CHAT_RUN_RQ_KEY(chatId ?? ""), + queryFn: () => getAiChatRun(chatId as string), + enabled: !!chatId && enabled, + refetchInterval: (query) => runPollInterval(query.state.data?.run), + }); +} + export function useRenameAiChatMutation() { const queryClient = useQueryClient(); const { t } = useTranslation(); diff --git a/apps/client/src/features/ai-chat/queries/ai-chat-run-query.test.tsx b/apps/client/src/features/ai-chat/queries/ai-chat-run-query.test.tsx new file mode 100644 index 00000000..db09683a --- /dev/null +++ b/apps/client/src/features/ai-chat/queries/ai-chat-run-query.test.tsx @@ -0,0 +1,92 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import React from "react"; +import { renderHook, waitFor } from "@testing-library/react"; +import { QueryClient, QueryClientProvider } from "@tanstack/react-query"; +import type { IAiChatRunResponse } from "@/features/ai-chat/types/ai-chat.types.ts"; + +// react-i18next is pulled in transitively by ai-chat-query.ts (the mutation hooks +// use it); stub it so the module imports cleanly in this hook test. +vi.mock("react-i18next", () => ({ + useTranslation: () => ({ t: (key: string) => key }), +})); + +vi.mock("@mantine/notifications", () => ({ + notifications: { show: vi.fn() }, +})); + +// Mock the whole service module; only getAiChatRun is exercised here, but the +// other named exports must exist so ai-chat-query.ts imports resolve. +vi.mock("@/features/ai-chat/services/ai-chat-service.ts", () => ({ + getAiChatRun: vi.fn(), + getAiChatMessages: vi.fn(), + getAiChats: vi.fn(), + getAiRoleCatalog: vi.fn(), + getAiRoleCatalogBundle: vi.fn(), + getAiRoles: vi.fn(), + importAiRolesFromCatalog: vi.fn(), + createAiRole: vi.fn(), + deleteAiChat: vi.fn(), + deleteAiRole: vi.fn(), + renameAiChat: vi.fn(), + updateAiRole: vi.fn(), + updateAiRoleFromCatalog: vi.fn(), +})); + +import { getAiChatRun } from "@/features/ai-chat/services/ai-chat-service.ts"; +import { useAiChatRunQuery } from "@/features/ai-chat/queries/ai-chat-query.ts"; + +function createWrapper() { + const queryClient = new QueryClient({ + defaultOptions: { queries: { retry: false } }, + }); + return function Wrapper({ children }: { children: React.ReactNode }) { + return ( + {children} + ); + }; +} + +const runningResponse: IAiChatRunResponse = { + run: { id: "run-1", chatId: "c1", status: "running" }, + message: { + id: "a1", + role: "assistant", + content: "working...", + createdAt: "2026-01-01T00:00:00Z", + }, +}; + +describe("useAiChatRunQuery — enable gating", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("fetches the run when enabled (passive observer, feature on)", async () => { + vi.mocked(getAiChatRun).mockResolvedValue(runningResponse); + const { result } = renderHook(() => useAiChatRunQuery("c1", true), { + wrapper: createWrapper(), + }); + await waitFor(() => expect(result.current.isSuccess).toBe(true)); + expect(getAiChatRun).toHaveBeenCalledWith("c1"); + expect(result.current.data?.run?.status).toBe("running"); + }); + + it("does NOT fetch when disabled (this tab is the streamer / feature off)", async () => { + vi.mocked(getAiChatRun).mockResolvedValue(runningResponse); + renderHook(() => useAiChatRunQuery("c1", false), { + wrapper: createWrapper(), + }); + // Give any errant fetch a chance to fire, then assert none did. + await new Promise((r) => setTimeout(r, 20)); + expect(getAiChatRun).not.toHaveBeenCalled(); + }); + + it("does NOT fetch when there is no chat id", async () => { + vi.mocked(getAiChatRun).mockResolvedValue(runningResponse); + renderHook(() => useAiChatRunQuery(undefined, true), { + wrapper: createWrapper(), + }); + await new Promise((r) => setTimeout(r, 20)); + expect(getAiChatRun).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/client/src/features/ai-chat/services/ai-chat-service.ts b/apps/client/src/features/ai-chat/services/ai-chat-service.ts index f8a57d0a..abd5ab8e 100644 --- a/apps/client/src/features/ai-chat/services/ai-chat-service.ts +++ b/apps/client/src/features/ai-chat/services/ai-chat-service.ts @@ -5,6 +5,7 @@ import { IAiChatListParams, IAiChatMessageRow, IAiChatMessagesParams, + IAiChatRunResponse, IAiRole, IAiRoleCatalog, IAiRoleCatalogBundle, @@ -42,6 +43,21 @@ export async function getAiChatMessages( return req.data; } +/** + * Reconnect to the latest agent run of a chat (#184). Returns the run's + * persisted lifecycle state and the assistant message it materializes (the + * partial output while the run is in-flight, the final output once it finished). + * The DB is the source of truth, so this works for an in-flight run (the browser + * dropped, the run kept going) and a finished one alike; `{ run: null }` when the + * chat has never had a run. Owner-gated and flag-gated server-side. + */ +export async function getAiChatRun( + chatId: string, +): Promise { + const req = await api.post("/ai-chat/run", { chatId }); + return req.data; +} + /** * Resolve the chat bound to a document (the current user's most-recent chat * created on that page), or null when there is none. Drives auto-open-on-page. diff --git a/apps/client/src/features/ai-chat/types/ai-chat.types.ts b/apps/client/src/features/ai-chat/types/ai-chat.types.ts index 78b049c0..8ed54ab2 100644 --- a/apps/client/src/features/ai-chat/types/ai-chat.types.ts +++ b/apps/client/src/features/ai-chat/types/ai-chat.types.ts @@ -200,6 +200,38 @@ export interface IAiChatMessageRow { createdAt: string; } +/** + * A persisted agent-run row (#184), mirroring the `ai_chat_runs` fields the + * client reads from `POST /ai-chat/run`. Only `status` is load-bearing for the + * reconnect-and-live-update UX (it drives the poll cadence); the rest are carried + * for display/diagnostics. The DB is the source of truth, so this resolves for an + * in-flight run (the browser dropped, the run kept going) and a finished one. + */ +export interface IAiChatRun { + id: string; + chatId: string; + // 'pending' | 'running' | 'succeeded' | 'failed' | 'aborted'. The first two are + // ACTIVE (keep polling); the rest are TERMINAL (stop polling). + status: "pending" | "running" | "succeeded" | "failed" | "aborted" | string; + error?: string | null; + stepCount?: number; + assistantMessageId?: string | null; + startedAt?: string | null; + finishedAt?: string | null; + createdAt?: string; + updatedAt?: string; +} + +/** + * Response of `POST /ai-chat/run` (#184): the latest run of a chat and the + * assistant message it materializes (the partial/final output, projected from the + * persisted rows). Both are `null` when the chat has never had a run. + */ +export interface IAiChatRunResponse { + run: IAiChatRun | null; + message: IAiChatMessageRow | null; +} + export interface IAiChatListParams extends QueryParams {} export interface IAiChatMessagesParams { diff --git a/apps/client/src/features/ai-chat/utils/run-polling.test.ts b/apps/client/src/features/ai-chat/utils/run-polling.test.ts new file mode 100644 index 00000000..a1a9cb26 --- /dev/null +++ b/apps/client/src/features/ai-chat/utils/run-polling.test.ts @@ -0,0 +1,104 @@ +import { describe, it, expect } from "vitest"; +import type { UIMessage } from "@ai-sdk/react"; +import type { IAiChatRun } from "@/features/ai-chat/types/ai-chat.types.ts"; +import { + RUN_POLL_INTERVAL_MS, + isRunActive, + runPollInterval, + shouldObserveRun, + mergeObservedMessage, +} from "./run-polling.ts"; + +function makeRun(status: string): IAiChatRun { + return { id: "run-1", chatId: "c1", status }; +} + +function makeMsg(id: string, text: string): UIMessage { + return { + id, + role: "assistant", + parts: [{ type: "text", text }], + } as UIMessage; +} + +describe("isRunActive", () => { + it("treats pending and running as active", () => { + expect(isRunActive(makeRun("pending"))).toBe(true); + expect(isRunActive(makeRun("running"))).toBe(true); + }); + + it("treats terminal / unknown / nullish as not active", () => { + expect(isRunActive(makeRun("succeeded"))).toBe(false); + expect(isRunActive(makeRun("failed"))).toBe(false); + expect(isRunActive(makeRun("aborted"))).toBe(false); + expect(isRunActive(makeRun("weird-future-status"))).toBe(false); + expect(isRunActive(null)).toBe(false); + expect(isRunActive(undefined)).toBe(false); + }); +}); + +describe("runPollInterval (the refetchInterval helper)", () => { + it("returns 2000ms while the run is pending/running", () => { + expect(runPollInterval(makeRun("pending"))).toBe(RUN_POLL_INTERVAL_MS); + expect(runPollInterval(makeRun("running"))).toBe(RUN_POLL_INTERVAL_MS); + expect(RUN_POLL_INTERVAL_MS).toBe(2000); + }); + + it("returns false (stop polling) once the run is terminal", () => { + expect(runPollInterval(makeRun("succeeded"))).toBe(false); + expect(runPollInterval(makeRun("failed"))).toBe(false); + expect(runPollInterval(makeRun("aborted"))).toBe(false); + }); + + it("returns false (no polling) when there is no run", () => { + expect(runPollInterval(null)).toBe(false); + expect(runPollInterval(undefined)).toBe(false); + }); +}); + +describe("shouldObserveRun (observer-vs-streamer decision)", () => { + it("observes an active run when this tab is NOT the local streamer", () => { + expect(shouldObserveRun(makeRun("running"), false)).toBe(true); + expect(shouldObserveRun(makeRun("pending"), false)).toBe(true); + }); + + it("observes a terminal run too (so the final output shows on reopen)", () => { + expect(shouldObserveRun(makeRun("succeeded"), false)).toBe(true); + }); + + it("does NOT observe when this tab IS the streamer (no double-render)", () => { + expect(shouldObserveRun(makeRun("running"), true)).toBe(false); + expect(shouldObserveRun(makeRun("succeeded"), true)).toBe(false); + }); + + it("does NOT observe when there is no run", () => { + expect(shouldObserveRun(null, false)).toBe(false); + expect(shouldObserveRun(undefined, false)).toBe(false); + }); +}); + +describe("mergeObservedMessage", () => { + it("replaces the message with the same id in place (per-step growth)", () => { + const prev = [makeMsg("u1", "hi"), makeMsg("a1", "step 1")]; + const observed = makeMsg("a1", "step 1\nstep 2"); + const next = mergeObservedMessage(prev, observed); + expect(next).toHaveLength(2); + expect(next[1]).toBe(observed); + expect(next[0]).toBe(prev[0]); // untouched + expect(next).not.toBe(prev); // new array (never mutates input) + }); + + it("appends when the observed message is not yet present", () => { + const prev = [makeMsg("u1", "hi")]; + const observed = makeMsg("a1", "first token"); + const next = mergeObservedMessage(prev, observed); + expect(next).toHaveLength(2); + expect(next[1]).toBe(observed); + }); + + it("returns the original list unchanged when there is nothing to merge", () => { + const prev = [makeMsg("u1", "hi")]; + expect(mergeObservedMessage(prev, null)).toBe(prev); + expect(mergeObservedMessage(prev, undefined)).toBe(prev); + }); +}); diff --git a/apps/client/src/features/ai-chat/utils/run-polling.ts b/apps/client/src/features/ai-chat/utils/run-polling.ts new file mode 100644 index 00000000..c6e4c006 --- /dev/null +++ b/apps/client/src/features/ai-chat/utils/run-polling.ts @@ -0,0 +1,71 @@ +import type { UIMessage } from "@ai-sdk/react"; +import type { IAiChatRun } from "@/features/ai-chat/types/ai-chat.types.ts"; + +/** + * Reconnect-and-live-follow helpers (#184). When a chat is reopened while its + * agent run is STILL going, this tab is a PASSIVE OBSERVER: it did not start the + * run here (no local SSE stream), so it catches up by POLLING the reconnect + * endpoint (`POST /ai-chat/run`) and merging the run's incrementally-persisted + * assistant message into the rendered thread. These are the small pure decisions + * that machinery hangs off, extracted so they can be unit-tested in isolation + * (mirrors how reindex polling / editor-sync-state are tested). + */ + +/** How often to re-poll the reconnect endpoint while a run is ACTIVE. */ +export const RUN_POLL_INTERVAL_MS = 2000; + +// 'pending' and 'running' are the two ACTIVE statuses; 'succeeded' | 'failed' | +// 'aborted' are TERMINAL (and any unknown future status is treated as terminal, +// so a stale/odd value never polls forever). +const ACTIVE_STATUSES = new Set(["pending", "running"]); + +/** Whether a run is still going (worth polling / merging live updates from). */ +export function isRunActive(run: IAiChatRun | null | undefined): boolean { + return !!run && ACTIVE_STATUSES.has(run.status); +} + +/** + * The TanStack Query `refetchInterval` value for the run query: poll every + * {@link RUN_POLL_INTERVAL_MS} while the run is active, and `false` (stop) once + * it is terminal or there is no run. Polling is thus naturally bounded by the run + * reaching a terminal status — no separate timeout cap is needed. + */ +export function runPollInterval( + run: IAiChatRun | null | undefined, +): number | false { + return isRunActive(run) ? RUN_POLL_INTERVAL_MS : false; +} + +/** + * Observer-vs-streamer decision. We render the polled run message (catch up + + * keep advancing) ONLY when this tab is a passive observer: there IS a run AND + * this tab is NOT the one locally streaming it (we reconnected, we didn't start + * it here). When this tab is the streamer, the live SSE stream owns the view, so + * we neither poll nor merge — avoiding a double-render fight. Terminal runs still + * merge (so the final persisted output is shown on reopen); the poll itself is + * stopped separately by {@link runPollInterval}. + */ +export function shouldObserveRun( + run: IAiChatRun | null | undefined, + localStreaming: boolean, +): boolean { + return !!run && !localStreaming; +} + +/** + * Merge an observed assistant message into the rendered list: replace the message + * with the same id in place (the in-progress assistant row is already seeded from + * history, so per-step growth replaces it), or append it when absent. Returns a + * new array; the input is never mutated. + */ +export function mergeObservedMessage( + messages: UIMessage[], + observed: UIMessage | null | undefined, +): UIMessage[] { + if (!observed) return messages; + const idx = messages.findIndex((m) => m.id === observed.id); + if (idx === -1) return [...messages, observed]; + const next = messages.slice(); + next[idx] = observed; + return next; +} diff --git a/apps/client/src/features/workspace/types/workspace.types.ts b/apps/client/src/features/workspace/types/workspace.types.ts index 99824db6..f84eb550 100644 --- a/apps/client/src/features/workspace/types/workspace.types.ts +++ b/apps/client/src/features/workspace/types/workspace.types.ts @@ -65,6 +65,9 @@ export interface IWorkspaceAiSettings { dictation?: boolean; dictationStreaming?: boolean; publicShareAssistant?: boolean; + // #184: detached agent runs (a run survives a browser disconnect and can be + // reconnected to / live-followed on reopen). Gates the run-reconnect polling. + autonomousRuns?: boolean; } export interface IWorkspaceSharingSettings {