From 68899a2c2ee73955fa5b909d8b67debb3f626e39 Mon Sep 17 00:00:00 2001 From: agent_coder Date: Sat, 4 Jul 2026 23:20:21 +0300 Subject: [PATCH] =?UTF-8?q?feat(ai-chat):=20durable=20detached=20agent=20r?= =?UTF-8?q?uns=20=E2=80=94=20phase=201=20(#184/#234)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Squashed for a clean rebase onto develop (was 19 commits; the reviewer approved the net diff at fb246080). Detaches an agent run from the HTTP request/browser window: a run is a first-class lifecycle object (ai_chat_runs), a browser disconnect no longer kills it, a concurrent-run insert-gate prevents double runs, and a reopened chat live-follows a still-running run via a polled observer merge. Co-Authored-By: Claude Opus 4.8 (1M context) --- .env.example | 14 + AGENTS.md | 1 + CHANGELOG.md | 13 + .../ai-chat/components/ai-chat-window.tsx | 168 ++- .../ai-chat/components/chat-thread.test.tsx | 56 + .../ai-chat/components/chat-thread.tsx | 111 +- .../features/ai-chat/queries/ai-chat-query.ts | 56 +- .../queries/ai-chat-run-query.test.tsx | 92 ++ .../ai-chat/services/ai-chat-service.ts | 33 + .../features/ai-chat/types/ai-chat.types.ts | 32 + .../ai-chat/utils/run-polling.test.ts | 303 ++++ .../src/features/ai-chat/utils/run-polling.ts | 151 ++ .../components/ai-provider-settings.tsx | 62 + .../workspace/types/workspace.types.ts | 6 + .../core/ai-chat/ai-chat-run.service.spec.ts | 527 +++++++ .../src/core/ai-chat/ai-chat-run.service.ts | 452 ++++++ .../ai-chat.controller.bound-chat.spec.ts | 1 + .../ai-chat/ai-chat.controller.export.spec.ts | 1 + .../ai-chat/ai-chat.controller.run.spec.ts | 164 +++ .../src/core/ai-chat/ai-chat.controller.ts | 216 ++- .../ai-chat.generate-page-title.spec.ts | 1 + .../server/src/core/ai-chat/ai-chat.module.ts | 2 + .../ai-chat/ai-chat.service.lifecycle.spec.ts | 100 +- .../ai-chat/ai-chat.service.run-race.spec.ts | 489 +++++++ .../src/core/ai-chat/ai-chat.service.spec.ts | 6 + .../src/core/ai-chat/ai-chat.service.ts | 1260 +++++++++-------- .../src/core/ai-chat/dto/ai-chat.dto.ts | 24 + .../workspace/dto/update-workspace.dto.ts | 8 + .../workspace/services/workspace.service.ts | 15 + apps/server/src/database/database.module.ts | 3 + .../20260627T130000-ai-chat-runs.ts | 106 ++ .../repos/ai-chat/ai-chat-message.repo.ts | 17 + .../repos/ai-chat/ai-chat-run.repo.spec.ts | 82 ++ .../repos/ai-chat/ai-chat-run.repo.ts | 212 +++ apps/server/src/database/types/db.d.ts | 30 + .../server/src/database/types/entity.types.ts | 22 +- .../test/integration/ai-chat-run.int-spec.ts | 304 ++++ 37 files changed, 4535 insertions(+), 605 deletions(-) create mode 100644 apps/client/src/features/ai-chat/queries/ai-chat-run-query.test.tsx create mode 100644 apps/client/src/features/ai-chat/utils/run-polling.test.ts create mode 100644 apps/client/src/features/ai-chat/utils/run-polling.ts create mode 100644 apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts create mode 100644 apps/server/src/core/ai-chat/ai-chat-run.service.ts create mode 100644 apps/server/src/core/ai-chat/ai-chat.controller.run.spec.ts create mode 100644 apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts create mode 100644 apps/server/src/database/migrations/20260627T130000-ai-chat-runs.ts create mode 100644 apps/server/src/database/repos/ai-chat/ai-chat-run.repo.spec.ts create mode 100644 apps/server/src/database/repos/ai-chat/ai-chat-run.repo.ts create mode 100644 apps/server/test/integration/ai-chat-run.int-spec.ts diff --git a/.env.example b/.env.example index d3d63309..a704b6ca 100644 --- a/.env.example +++ b/.env.example @@ -209,6 +209,20 @@ MCP_DOCMOST_PASSWORD= # active" behavior. # AI_CHAT_DEFERRED_TOOLS=true +# --- Autonomous / detached agent runs (settings.ai.autonomousRuns) --- +# Opt-in per workspace (AI settings; off by default). When on, a chat turn becomes +# a server-side RUN that survives a browser disconnect — only an explicit Stop ends +# it, and a client reconnects/live-follows the run. +# +# DEPLOY CONSTRAINT — SINGLE-INSTANCE ONLY in phase 1: Stop and the in-process +# AbortController that backs it are process-local, so a Stop only aborts a run +# executing on the SAME replica that owns it (cross-instance pub/sub stop is phase +# 2 and not yet reliable). Do NOT enable autonomousRuns on a horizontally-scaled +# deployment (multiple replicas behind a load balancer, or Docmost cloud +# CLOUD=true) — run a single instance instead. The server logs a startup WARNING +# when it detects a multi-instance deployment (CLOUD=true) so the constraint is +# visible, and a startup sweep settles any run left dangling by a restart. + # --- Anonymous public-share AI assistant --- # Opt-in per workspace (AI settings -> "public share assistant"; off by default). # When enabled, anonymous visitors of a published share can ask an AI about that diff --git a/AGENTS.md b/AGENTS.md index 9bedbc39..a68d5d7a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -279,6 +279,7 @@ The API server is a Fastify app with a global `/api` prefix (`main.ts` excludes - `core/ai-chat/tools/` — the agent's ~40 read+write tools. Every tool runs under the **calling user's** CASL permissions via a per-user loopback access token (`docmost-client.loader.ts`), so the agent can never exceed what the user could do. Only **reversible** operations are exposed (page history + trash; no permanent delete). Agent edits get an "AI agent" provenance badge in page history (`20260616T130000-agent-provenance` migration). - `core/ai-chat/embedding/` — RAG indexer + a BullMQ consumer on `AI_QUEUE` that embeds pages into `page_embeddings` (vector search), complementing Postgres full-text search. Pages are (re)indexed on edit; `AI_EMBEDDING_TIMEOUT_MS` bounds a hung embeddings endpoint. - `core/ai-chat/external-mcp/` — admins can attach external MCP servers (e.g. Tavily) to give the agent web access. **`ssrf-guard.ts` validates outbound MCP URLs against SSRF** — keep that guard in the path when touching external-MCP connection logic. + - `core/ai-chat/ai-chat-run.service.ts` + `ai_chat_runs` — **detached/autonomous agent runs** (`#184`), behind the per-workspace `settings.ai.autonomousRuns` flag (off by default). When on, a turn becomes a server-side RUN that survives a browser disconnect; only an explicit `POST /ai-chat/stop` ends it, and a client reconnects/live-follows via `POST /ai-chat/run`. **DEPLOY CONSTRAINT — single-instance only in phase 1:** Stop and the AbortController that backs it are process-local, so a Stop only aborts a run executing on the **same** replica that owns it (cross-instance pub/sub stop is phase 2). Do **not** enable `autonomousRuns` on a horizontally-scaled deployment (multiple replicas behind a load balancer, or Docmost cloud `CLOUD=true`) — run a single instance instead. The server logs a startup WARNING when it detects a multi-instance deployment (`CLOUD=true`) so the constraint is visible. The startup sweep settles any run left dangling by a restart. ### Client structure Vite SPA. Code is organized by feature under `apps/client/src/features/*` (mirrors the server domains: `page`, `space`, `comment`, `ai-chat`, `editor`, …). Conventions: diff --git a/CHANGELOG.md b/CHANGELOG.md index c7003b7b..aea1854d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 append/prepend fragments, nor to COMMENT bodies — a comment may legitimately contain a standalone footnote definition, which canonicalization would drop. (#228) +- **Detached, autonomous agent runs that survive a browser disconnect.** When the + new `settings.ai.autonomousRuns` workspace flag is on (off by default), an + AI-chat turn becomes a first-class, server-side RUN tracked in a new + `ai_chat_runs` table instead of a socket-bound stream: closing the tab or + losing the connection no longer aborts the turn — it keeps executing and + persisting server-side, and only an explicit Stop ends it. A client can + reconnect and live-follow (or stop) an in-flight run via `POST /ai-chat/run` + (resolve the latest run + its assistant message for a chat) and + `POST /ai-chat/stop` (stop by `runId` or `chatId`). A partial unique index + enforces one active run per chat, and a startup sweep settles any run left + dangling by a restart. Phase 1 is single-instance-only (cross-instance Stop is + not yet reliable); the server warns at startup on a horizontally-scaled + deployment. (#184) - **Out-of-band page transfer via an in-RAM blob sandbox (`stash_page`).** A new MCP tool serializes a whole page (its full ProseMirror JSON, with every internal image/file mirrored) into an ephemeral in-RAM blob and returns only diff --git a/apps/client/src/features/ai-chat/components/ai-chat-window.tsx b/apps/client/src/features/ai-chat/components/ai-chat-window.tsx index c26bfa2d..5c67e177 100644 --- a/apps/client/src/features/ai-chat/components/ai-chat-window.tsx +++ b/apps/client/src/features/ai-chat/components/ai-chat-window.tsx @@ -19,7 +19,7 @@ import { IconPlus, IconX, } from "@tabler/icons-react"; -import { useAtom, useSetAtom } from "jotai"; +import { useAtom, useAtomValue, useSetAtom } from "jotai"; import { useLocation, useMatch } from "react-router-dom"; import { useTranslation } from "react-i18next"; import { useQueryClient } from "@tanstack/react-query"; @@ -41,13 +41,24 @@ import { extractPageSlugId } from "@/lib"; import { AI_CHATS_RQ_KEY, AI_CHAT_MESSAGES_RQ_KEY, + AI_CHAT_RUN_RQ_KEY, useAiChatMessagesQuery, + useAiChatRunQuery, useAiChatsQuery, useAiRolesQuery, } from "@/features/ai-chat/queries/ai-chat-query.ts"; +import { + shouldClearLatchOnQueryError, + shouldClearStoppingLatch, + shouldObserveRun, +} from "@/features/ai-chat/utils/run-polling.ts"; +import { workspaceAtom } from "@/features/user/atoms/current-user-atom"; import ConversationList from "@/features/ai-chat/components/conversation-list.tsx"; import ChatThread from "@/features/ai-chat/components/chat-thread.tsx"; -import { exportAiChat } from "@/features/ai-chat/services/ai-chat-service.ts"; +import { + exportAiChat, + stopRun, +} from "@/features/ai-chat/services/ai-chat-service.ts"; import { useChatSession } from "@/features/ai-chat/hooks/use-chat-session.ts"; import { shouldCollapseOnOutsidePointer, @@ -234,6 +245,147 @@ export default function AiChatWindow() { const { data: messageRows, isLoading: messagesLoading } = useAiChatMessagesQuery(activeChatId ?? undefined); + // #184 reconnect-and-live-follow. Whether detached agent runs are enabled for + // this workspace. The reconnect endpoint itself is NOT flag-gated server-side + // (it is only owner-gated and returns `{ run: null }` when the chat has no + // run); but when the feature is off no runs are ever created, so polling it + // would always come back empty — we gate it off here to avoid pointless polls. + const workspace = useAtomValue(workspaceAtom); + const autonomousRunsEnabled = + workspace?.settings?.ai?.autonomousRuns === true; + + // Whether THIS tab is the one actively streaming the open chat's run locally + // (it started the run here and holds the SSE). Reported up from ChatThread. We + // are the STREAMER while true and a passive OBSERVER while false — the basis of + // the observer-vs-streamer detection. Reset to false by the fresh ChatThread's + // mount effect on every chat switch. + const [localStreaming, setLocalStreaming] = useState(false); + const onStreamingChange = useCallback((streaming: boolean) => { + setLocalStreaming(streaming); + }, []); + + // #184 Stop wiring. While a detached run is being stopped we SUPPRESS the + // observer merge so the stopping run's still-persisting output does not + // re-stream back into view between the moment the user pressed Stop and the run + // actually settling as 'aborted' server-side. Polling itself keeps running (so + // the terminal transition is still detected) — only the visual merge is gated. + // Cleared when the run is observed terminal (below) or the chat is switched. + const [stoppingRun, setStoppingRun] = useState(false); + // Reset the stopping latch whenever the open chat changes: it is scoped to the + // run of the previously-open chat. + useEffect(() => { + setStoppingRun(false); + }, [activeChatId]); + + // Authoritative stop of the open chat's detached run (the Stop button in + // autonomous mode). Latch "stopping" first (suppresses the re-stream flash), + // then request the server stop — the ONLY thing that ends a detached run; a mere + // local SSE abort is a client disconnect the server ignores. On failure we + // release the latch so the observer resumes (better to show the live run than to + // freeze the view) and surface the error. + const handleServerStop = useCallback( + (chatId: string): void => { + setStoppingRun(true); + // #234 F4: drop the PREVIOUS turn's run from the cache so `run` becomes null + // until the CURRENT turn's run is fetched fresh. Without this, once the local + // stream aborts (localStreaming -> false) the run query re-enables and + // react-query SYNCHRONOUSLY returns the still-cached prior terminal run; the + // terminal effect would then clear the stopping latch against that STALE run + // before the current turn's (still-running, detached, growing) run is ever + // observed — re-opening the observer merge and flashing the growing output + // over the frozen row. With the cache cleared the terminal effect's + // `if (!run) return` holds the latch until the current run itself is observed + // terminal (see shouldClearStoppingLatch). + queryClient.removeQueries({ queryKey: AI_CHAT_RUN_RQ_KEY(chatId) }); + void stopRun(chatId).catch(() => { + setStoppingRun(false); + notifications.show({ + message: t("Failed to stop the run"), + color: "red", + }); + }); + }, + [t, queryClient], + ); + + // Poll the latest run of the open chat ONLY when we are a passive observer: + // feature on, a chat is open, and we are NOT the local streamer (the streamer + // already has the live SSE — polling/merging too would double-render). The + // query's own status-keyed refetchInterval stops once the run is terminal. + const { data: runData, isError: runQueryFailed } = useAiChatRunQuery( + activeChatId ?? undefined, + autonomousRunsEnabled && !localStreaming, + ); + const run = runData?.run ?? null; + + // Safety net (#234 F4 review): after handleServerStop clears the run cache, + // `run` is null until the current turn's run is fetched fresh, and the terminal + // effect below holds the latch via `if (!run) return`. If that refetch instead + // ERRORS PERMANENTLY (the GET-run keeps failing) while we are no longer the + // streamer, the run stays null, its status-keyed refetchInterval is off, and + // nothing would ever observe a terminal run — freezing the view with the + // observer merge suppressed. Release the latch on that error so the live view + // resumes rather than stays stuck (the local stopRun may already have succeeded + // independently). + // + // #234 F7: this must NOT fire on a TRANSIENT error while `run` is still an + // ACTIVE held run. In TanStack Query v5 (retry:false) the query's `data` is + // RETAINED on error, so `runQueryFailed` can be true while `run` is still + // pending/running — releasing then would re-open the observer merge and flash + // the growing detached run over the frozen row (the very flash F4 prevents). The + // decision is the pure, unit-tested `shouldClearLatchOnQueryError`, which gates + // on the run NOT being active: it cures only the genuine permanent-null-freeze + // (`run === null`) and never releases against an active run. + useEffect(() => { + if ( + shouldClearLatchOnQueryError({ + stoppingRun, + isLocalStreaming: localStreaming, + runQueryFailed, + run, + }) + ) + setStoppingRun(false); + }, [stoppingRun, localStreaming, runQueryFailed, run]); + // The run's incrementally-persisted assistant message to merge into the thread, + // but only while we are an observer (never when we are the streamer — guards + // against a stale poll fighting the live stream). Includes a terminal run so the + // final persisted output is shown on reopen. + const observedRow = + shouldObserveRun(run, localStreaming) && !stoppingRun + ? (runData?.message ?? null) + : null; + + // When the observed run reaches a terminal status, do a final messages refetch + // so the persisted final state (token/context badge, export source) is shown, + // then the query's refetchInterval has already stopped polling. Deduped per run + // id so it fires exactly once per run, not on every subsequent poll-less render. + const finalizedRunIdRef = useRef(null); + useEffect(() => { + if (!run || !activeChatId) return; + if (run.status === "pending" || run.status === "running") { + // Active again (a new run) — re-arm so its terminal transition fires once. + finalizedRunIdRef.current = null; + return; + } + // Terminal: a stop we requested has landed (or the run finished on its own), + // so release the stopping latch — the observer merge can now show the final + // persisted (aborted/finished) output without any live re-stream. The decision + // is the pure, unit-tested `shouldClearStoppingLatch` (run-polling.ts): release + // ONLY when we requested a stop, this tab is no longer the streamer, AND the + // CURRENT run is terminal. The #234 F4 cache removal in handleServerStop makes + // `run` null (this branch's `if (!run) return` above holds) until the current + // turn's run is fetched fresh, so the latch can never clear against a stale + // cached run. + if (shouldClearStoppingLatch({ stoppingRun, run, isLocalStreaming: localStreaming })) + setStoppingRun(false); + if (finalizedRunIdRef.current === run.id) return; + finalizedRunIdRef.current = run.id; + queryClient.invalidateQueries({ + queryKey: AI_CHAT_MESSAGES_RQ_KEY(activeChatId), + }); + }, [run, activeChatId, queryClient, stoppingRun, localStreaming]); + // The page the user is currently viewing. AiChatWindow lives in a pathless // parent layout route, so useParams() can't see :pageSlug. Match the full // pathname against the authenticated page route instead so "the current page" @@ -882,6 +1034,18 @@ export default function AiChatWindow() { assistantName={currentRole?.name} onTurnFinished={onTurnFinished} onServerChatId={onServerChatId} + // #184: live-follow a still-running run when we reopened the chat as + // a passive observer; null when there is nothing to observe or this + // tab is the streamer. onStreamingChange lets the window stop polling + // while we are the streamer. + observedRow={observedRow} + onStreamingChange={onStreamingChange} + // #184: in autonomous mode the Stop button must hit the authoritative + // server stop (a local SSE abort is a client disconnect the server + // ignores). onServerStop also arms the "stopping" latch above so the + // stopped run's output does not re-stream via the observer merge. + autonomousRunsEnabled={autonomousRunsEnabled} + onServerStop={handleServerStop} /> )} diff --git a/apps/client/src/features/ai-chat/components/chat-thread.test.tsx b/apps/client/src/features/ai-chat/components/chat-thread.test.tsx index 359abbd7..0e9c184e 100644 --- a/apps/client/src/features/ai-chat/components/chat-thread.test.tsx +++ b/apps/client/src/features/ai-chat/components/chat-thread.test.tsx @@ -11,6 +11,7 @@ const h = vi.hoisted(() => ({ onFinish: null as null | ((arg: Record) => void), sendMessage: vi.fn(), stop: vi.fn(), + setMessages: vi.fn(), transport: null as null | { prepareSendMessagesRequest: (arg: { messages: unknown[]; @@ -30,6 +31,8 @@ vi.mock("@ai-sdk/react", () => ({ status: h.state.status, stop: h.state.stop, error: null, + // #184: ChatThread reads setMessages to merge a polled observer run. + setMessages: h.state.setMessages, }; }, })); @@ -228,3 +231,56 @@ describe("ChatThread — turn-end decision (onFinish)", () => { } }); }); + +// #184 passive-observer merge: when reconnecting to a still-running run, the +// parent feeds the polled run message via `observedRow`; ChatThread merges it via +// setMessages — but ONLY when this tab is NOT itself streaming (the streamer's +// SSE owns the view, so a stale observedRow must never overwrite it). +describe("ChatThread — observer run merge (#184)", () => { + beforeEach(() => { + h.state.onFinish = null; + h.state.setMessages.mockReset(); + }); + + const observedRow = { + id: "a-run", + role: "assistant", + content: "step 1\nstep 2", + metadata: { + parts: [{ type: "text", text: "step 1\nstep 2" }], + }, + createdAt: "2026-01-01T00:00:00Z", + } as const; + + function renderObserver(status: string) { + h.state.status = status; + render( + + + , + ); + } + + it("merges the polled run message when this tab is a passive observer", () => { + renderObserver("ready"); + expect(h.state.setMessages).toHaveBeenCalledTimes(1); + // The updater replaces/append the observed assistant row by id. + const updater = h.state.setMessages.mock.calls[0][0] as ( + prev: { id: string; parts: { text: string }[] }[], + ) => { id: string; parts: { text: string }[] }[]; + const merged = updater([{ id: "u1", parts: [{ text: "hi" }] }]); + expect(merged).toHaveLength(2); + expect(merged[1].id).toBe("a-run"); + expect(merged[1].parts[0].text).toBe("step 1\nstep 2"); + }); + + it("does NOT merge while THIS tab is the streamer (no double-render)", () => { + renderObserver("streaming"); + expect(h.state.setMessages).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/client/src/features/ai-chat/components/chat-thread.tsx b/apps/client/src/features/ai-chat/components/chat-thread.tsx index 875e36f6..a6ec1908 100644 --- a/apps/client/src/features/ai-chat/components/chat-thread.tsx +++ b/apps/client/src/features/ai-chat/components/chat-thread.tsx @@ -24,6 +24,7 @@ import { } from "@/features/ai-chat/utils/role-launch.ts"; import { describeChatError } from "@/features/ai-chat/utils/error-message.ts"; import { extractServerChatId } from "@/features/ai-chat/utils/adopt-chat-id.ts"; +import { mergeObservedMessage } from "@/features/ai-chat/utils/run-polling.ts"; import { dequeue, enqueueMessage, @@ -86,6 +87,29 @@ interface ChatThreadProps { * Copy/export button available mid-stream). Distinct from onTurnFinished, * which fires only at the terminal outcome. */ onServerChatId?: (serverChatId?: string) => void; + /** #184 reconnect-and-live-follow. When THIS tab reopened a chat whose agent + * run is still going (it is a PASSIVE OBSERVER — it did not start the run here), + * the parent polls the reconnect endpoint and feeds the run's incrementally- + * persisted assistant message here; we merge it into the live list so new + * steps/tool-calls appear as they are persisted. Null when there is nothing to + * observe (no run, feature off, or this tab IS the streamer). The merge is + * ADDITIONALLY guarded by our own `isStreaming`, so a stale value can never + * fight the local stream when we are the streamer. */ + observedRow?: IAiChatMessageRow | null; + /** Report this tab's live streaming status up to the parent, so it can stop + * polling the run while WE are the active streamer (the SSE owns the view) and + * resume once we go idle. Called from an effect on every transition. */ + onStreamingChange?: (streaming: boolean) => void; + /** #184: whether detached/autonomous agent runs are enabled for this workspace. + * When true the Stop button must additionally hit the AUTHORITATIVE server stop + * (via onServerStop) — aborting only the local SSE is just a client disconnect, + * which the server deliberately ignores, so the detached run would keep going. */ + autonomousRunsEnabled?: boolean; + /** #184: request the server-side stop of this chat's active run (the parent owns + * the endpoint call + the "stopping" latch that keeps observer-polling from + * immediately re-streaming the stopping run's output). Called with the resolved + * chat id when the user presses Stop in autonomous mode. */ + onServerStop?: (chatId: string) => void; } /** @@ -131,6 +155,10 @@ export default function ChatThread({ assistantName, onTurnFinished, onServerChatId, + observedRow, + onStreamingChange, + autonomousRunsEnabled, + onServerStop, }: ChatThreadProps) { const { t } = useTranslation(); @@ -216,6 +244,16 @@ export default function ChatThread({ const flushOnAbortRef = useRef(false); const interruptNextSendRef = useRef(false); + // #234 F5: the user pressed Stop while streaming a BRAND-NEW chat whose server + // chat id has not been adopted yet (the `start` chunk carrying it hadn't landed + // when Stop was pressed). A local SSE abort alone does NOT stop the DETACHED + // autonomous run — it keeps burning tokens and WRITING TO PAGES — so we cannot + // just no-op. We latch the stop as PENDING and fire the authoritative server + // stop the moment onServerChatId adopts the id (below). Read-and-cleared there; + // also defused on every new turn start so it can never fire against a later, + // unrelated turn's run. + const stopPendingRef = useRef(false); + // FIFO dequeue + send the next queued message (no-op when the queue is empty). // Returns whether a message was actually sent, so callers can tell an empty // dequeue (nothing to flush) from a real send. @@ -274,7 +312,7 @@ export default function ChatThread({ [], ); - const { messages, sendMessage, status, stop, error } = useChat({ + const { messages, sendMessage, status, stop, error, setMessages } = useChat({ // Stable per-mount key. Existing chats use their real id; new chats use a // generated client id (never `undefined`) so the store is NOT re-created on // every render mid-stream (see `chatStoreId` above). @@ -365,7 +403,14 @@ export default function ChatThread({ return; lastForwardedChatIdRef.current = serverChatId; onServerChatId(serverChatId); - }, [messages, onServerChatId]); + // #234 F5: if Stop was pressed before the id was known, the authoritative + // server stop was deferred to this adoption point — fire it now with the + // just-adopted id. One-shot (read-and-clear) so it can't fire twice. + if (stopPendingRef.current) { + stopPendingRef.current = false; + onServerStop?.(serverChatId); + } + }, [messages, onServerChatId, onServerStop]); // Live "turn was interrupted" marker for the CURRENT session. The red error // banner (driven by `error`) covers the error case; this covers an aborted @@ -378,6 +423,27 @@ export default function ChatThread({ const isStreaming = status === "submitted" || status === "streaming"; + // #184: report our live streaming status up so the parent stops polling the run + // while WE are the streamer (the SSE owns the view) and resumes once we go idle. + // Effect (not render) so it never updates parent state during our own render; + // fires on mount with `false`, which also re-syncs the parent after a chat + // switch remounts this thread (a fresh mount is idle until the user sends). + useEffect(() => { + onStreamingChange?.(isStreaming); + }, [isStreaming, onStreamingChange]); + + // #184 passive-observer merge: when the parent feeds a polled run message (we + // reopened a chat whose run is still going and did NOT start it here), merge it + // into the live list so new steps/tool-calls appear as they are persisted. Hard- + // gated by `!isStreaming`: if THIS tab is actually the streamer, the local SSE + // owns the view and a stale observedRow must never overwrite it. `observedRow` + // is a stable per-poll object, so this runs once per poll, not per render. + useEffect(() => { + if (isStreaming || !observedRow) return; + const observed = rowToUiMessage(observedRow); + setMessages((prev) => mergeObservedMessage(prev, observed)); + }, [observedRow, isStreaming, setMessages]); + // "Send now" on a queued message: interrupt the current turn and immediately // send THIS message, keeping the agent's partial output. Other queued messages // stay queued and flush normally after the new turn. Reuses the existing @@ -409,6 +475,40 @@ export default function ChatThread({ [setQueue, stop], ); + // Stop the current turn. ALWAYS abort the local SSE (`stop()`) so the composer + // returns to idle immediately. In AUTONOMOUS mode the turn is a DETACHED run: + // aborting the local SSE is only a client disconnect, which the server ignores, + // so the run would keep executing — we ADDITIONALLY request the authoritative + // server-side stop (the parent owns that call + the "stopping" latch that keeps + // observer-polling from re-streaming the stopping run's output). The chat id is + // read live from chatIdRef (adopted early at the stream's `start` chunk); if it + // is not known yet — a brand-new chat in the first moment of its first turn — + // only the local abort happens (there is no server-side run handle to stop yet). + const handleStop = useCallback(() => { + stop(); + if (!autonomousRunsEnabled) return; + if (chatIdRef.current) { + onServerStop?.(chatIdRef.current); + } else { + // #234 F5: no chat id yet (brand-new chat in the first moment of its first + // turn, before the `start` chunk adopted the id). Latch the stop as pending; + // the onServerChatId adoption effect fires the deferred server stop as soon + // as the id appears, so the detached run is still authoritatively stopped + // instead of left running by a silent local-only abort. + // + // KNOWN LIMITATION (#234 F5 review): `stop()` above has already aborted the + // local SSE reader. In the rare sub-window where Stop is pressed while still + // `submitted` (request sent, not one chunk read yet), that abort can cancel + // the reader BEFORE the `start` chunk is applied to `messages`, so the + // adoption effect never runs and this pending stop never fires. The detached + // run then keeps going for that turn. This is not a regression (the pre-fix + // behavior sent no server stop at all); closing it fully would require + // deferring the local abort until adoption, which is riskier and out of scope + // for this fix. Documented so a future change can address the abort-ordering. + stopPendingRef.current = true; + } + }, [stop, autonomousRunsEnabled, onServerStop]); + // Clear the stopped marker as soon as a new turn begins streaming, and drop any // stale "Send now" interrupt flags. On the legit interrupt path both refs are // already consumed synchronously (onFinish + prepareSendMessagesRequest) before @@ -420,6 +520,11 @@ export default function ChatThread({ setStopNotice(null); flushOnAbortRef.current = false; interruptNextSendRef.current = false; + // #234 F5: a new turn is starting — drop any pending deferred-stop from a + // previous turn that never adopted an id, so it can never fire against this + // (or a later) unrelated turn's run. A deferred stop for the CURRENT turn is + // set AFTER this effect (on the Stop click), so this does not clobber it. + stopPendingRef.current = false; } }, [isStreaming]); @@ -539,7 +644,7 @@ export default function ChatThread({ sendMessage({ text })} onQueue={enqueue} - onStop={stop} + onStop={handleStop} isStreaming={isStreaming} /> diff --git a/apps/client/src/features/ai-chat/queries/ai-chat-query.ts b/apps/client/src/features/ai-chat/queries/ai-chat-query.ts index 37cf70f8..555d11b9 100644 --- a/apps/client/src/features/ai-chat/queries/ai-chat-query.ts +++ b/apps/client/src/features/ai-chat/queries/ai-chat-query.ts @@ -12,6 +12,7 @@ import { deleteAiChat, deleteAiRole, getAiChatMessages, + getAiChatRun, getAiChats, getAiRoleCatalog, getAiRoleCatalogBundle, @@ -24,6 +25,7 @@ import { import { IAiChat, IAiChatMessageRow, + IAiChatRunResponse, IAiRole, IAiRoleCatalog, IAiRoleCatalogBundle, @@ -34,6 +36,7 @@ import { IAiRoleUpdateFromCatalogResult, } from "@/features/ai-chat/types/ai-chat.types.ts"; import { IPagination } from "@/lib/types.ts"; +import { runPollInterval } from "@/features/ai-chat/utils/run-polling.ts"; export const AI_CHATS_RQ_KEY = ["ai-chats"]; export const AI_ROLES_RQ_KEY = ["ai-roles"]; @@ -51,16 +54,18 @@ export const AI_CHAT_MESSAGES_RQ_KEY = (chatId: string) => [ "ai-chat-messages", chatId, ]; +export const AI_CHAT_RUN_RQ_KEY = (chatId: string) => ["ai-chat-run", chatId]; /** Paginated list of the current user's chats (auto-loads further pages). */ export function useAiChatsQuery() { const query = useInfiniteQuery({ queryKey: AI_CHATS_RQ_KEY, - queryFn: ({ pageParam }) => - getAiChats({ cursor: pageParam, limit: 50 }), + queryFn: ({ pageParam }) => getAiChats({ cursor: pageParam, limit: 50 }), initialPageParam: undefined as string | undefined, getNextPageParam: (lastPage) => - lastPage.meta.hasNextPage ? (lastPage.meta.nextCursor ?? undefined) : undefined, + lastPage.meta.hasNextPage + ? (lastPage.meta.nextCursor ?? undefined) + : undefined, }); const data = useMemo | undefined>(() => { @@ -90,7 +95,9 @@ export function useAiChatMessagesQuery(chatId: string | undefined) { getAiChatMessages({ chatId: chatId as string, cursor: pageParam }), initialPageParam: undefined as string | undefined, getNextPageParam: (lastPage) => - lastPage.meta.hasNextPage ? (lastPage.meta.nextCursor ?? undefined) : undefined, + lastPage.meta.hasNextPage + ? (lastPage.meta.nextCursor ?? undefined) + : undefined, enabled: !!chatId, }); @@ -131,6 +138,34 @@ export function useAiChatMessagesQuery(chatId: string | undefined) { }; } +/** + * Reconnect to a chat's latest agent run and LIVE-FOLLOW it (#184). While the run + * is active the query re-polls every {@link runPollInterval} ms (driven off the + * fetched `run.status`, the same status-keyed refetchInterval pattern as the + * embeddings reindex polling); once the run reaches a terminal status — or there + * is no run — the interval returns `false` and polling stops on its own. Polling + * is thus naturally bounded by the run terminating; no separate timeout cap. + * + * `enabled` gates the whole thing: callers pass `false` when the autonomous-runs + * feature is off (the endpoint is NOT flag-gated server-side, but with the feature + * off the chat has no runs, so polling would only ever return `{ run: null }`) OR + * when THIS tab is the one actively streaming the run (the live SSE owns the view, + * so we must not also poll/merge). The global `retry: false` means a failed fetch + * leaves `data` undefined, so refetchInterval(undefined run) returns false — a + * failed fetch can never spin a tight loop. + */ +export function useAiChatRunQuery( + chatId: string | undefined, + enabled: boolean, +) { + return useQuery({ + queryKey: AI_CHAT_RUN_RQ_KEY(chatId ?? ""), + queryFn: () => getAiChatRun(chatId as string), + enabled: !!chatId && enabled, + refetchInterval: (query) => runPollInterval(query.state.data?.run), + }); +} + export function useRenameAiChatMutation() { const queryClient = useQueryClient(); const { t } = useTranslation(); @@ -280,11 +315,14 @@ export function useImportAiRolesFromCatalogMutation() { mutationFn: (payload) => importAiRolesFromCatalog(payload), onSuccess: (result) => { notifications.show({ - message: t("Imported {{created}}, renamed {{renamed}}, skipped {{skipped}}", { - created: result.created, - renamed: result.renamed, - skipped: result.skipped, - }), + message: t( + "Imported {{created}}, renamed {{renamed}}, skipped {{skipped}}", + { + created: result.created, + renamed: result.renamed, + skipped: result.skipped, + }, + ), }); // Surface partial failures (e.g. unique-name races) as a red warning. if (result.errors.length > 0) { diff --git a/apps/client/src/features/ai-chat/queries/ai-chat-run-query.test.tsx b/apps/client/src/features/ai-chat/queries/ai-chat-run-query.test.tsx new file mode 100644 index 00000000..db09683a --- /dev/null +++ b/apps/client/src/features/ai-chat/queries/ai-chat-run-query.test.tsx @@ -0,0 +1,92 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import React from "react"; +import { renderHook, waitFor } from "@testing-library/react"; +import { QueryClient, QueryClientProvider } from "@tanstack/react-query"; +import type { IAiChatRunResponse } from "@/features/ai-chat/types/ai-chat.types.ts"; + +// react-i18next is pulled in transitively by ai-chat-query.ts (the mutation hooks +// use it); stub it so the module imports cleanly in this hook test. +vi.mock("react-i18next", () => ({ + useTranslation: () => ({ t: (key: string) => key }), +})); + +vi.mock("@mantine/notifications", () => ({ + notifications: { show: vi.fn() }, +})); + +// Mock the whole service module; only getAiChatRun is exercised here, but the +// other named exports must exist so ai-chat-query.ts imports resolve. +vi.mock("@/features/ai-chat/services/ai-chat-service.ts", () => ({ + getAiChatRun: vi.fn(), + getAiChatMessages: vi.fn(), + getAiChats: vi.fn(), + getAiRoleCatalog: vi.fn(), + getAiRoleCatalogBundle: vi.fn(), + getAiRoles: vi.fn(), + importAiRolesFromCatalog: vi.fn(), + createAiRole: vi.fn(), + deleteAiChat: vi.fn(), + deleteAiRole: vi.fn(), + renameAiChat: vi.fn(), + updateAiRole: vi.fn(), + updateAiRoleFromCatalog: vi.fn(), +})); + +import { getAiChatRun } from "@/features/ai-chat/services/ai-chat-service.ts"; +import { useAiChatRunQuery } from "@/features/ai-chat/queries/ai-chat-query.ts"; + +function createWrapper() { + const queryClient = new QueryClient({ + defaultOptions: { queries: { retry: false } }, + }); + return function Wrapper({ children }: { children: React.ReactNode }) { + return ( + {children} + ); + }; +} + +const runningResponse: IAiChatRunResponse = { + run: { id: "run-1", chatId: "c1", status: "running" }, + message: { + id: "a1", + role: "assistant", + content: "working...", + createdAt: "2026-01-01T00:00:00Z", + }, +}; + +describe("useAiChatRunQuery — enable gating", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("fetches the run when enabled (passive observer, feature on)", async () => { + vi.mocked(getAiChatRun).mockResolvedValue(runningResponse); + const { result } = renderHook(() => useAiChatRunQuery("c1", true), { + wrapper: createWrapper(), + }); + await waitFor(() => expect(result.current.isSuccess).toBe(true)); + expect(getAiChatRun).toHaveBeenCalledWith("c1"); + expect(result.current.data?.run?.status).toBe("running"); + }); + + it("does NOT fetch when disabled (this tab is the streamer / feature off)", async () => { + vi.mocked(getAiChatRun).mockResolvedValue(runningResponse); + renderHook(() => useAiChatRunQuery("c1", false), { + wrapper: createWrapper(), + }); + // Give any errant fetch a chance to fire, then assert none did. + await new Promise((r) => setTimeout(r, 20)); + expect(getAiChatRun).not.toHaveBeenCalled(); + }); + + it("does NOT fetch when there is no chat id", async () => { + vi.mocked(getAiChatRun).mockResolvedValue(runningResponse); + renderHook(() => useAiChatRunQuery(undefined, true), { + wrapper: createWrapper(), + }); + await new Promise((r) => setTimeout(r, 20)); + expect(getAiChatRun).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/client/src/features/ai-chat/services/ai-chat-service.ts b/apps/client/src/features/ai-chat/services/ai-chat-service.ts index 2e1528c8..025e8069 100644 --- a/apps/client/src/features/ai-chat/services/ai-chat-service.ts +++ b/apps/client/src/features/ai-chat/services/ai-chat-service.ts @@ -5,6 +5,7 @@ import { IAiChatListParams, IAiChatMessageRow, IAiChatMessagesParams, + IAiChatRunResponse, IAiRole, IAiRoleCatalog, IAiRoleCatalogBundle, @@ -42,6 +43,38 @@ export async function getAiChatMessages( return req.data; } +/** + * Reconnect to the latest agent run of a chat (#184). Returns the run's + * persisted lifecycle state and the assistant message it materializes (the + * partial output while the run is in-flight, the final output once it finished). + * The DB is the source of truth, so this works for an in-flight run (the browser + * dropped, the run kept going) and a finished one alike; `{ run: null }` when the + * chat has never had a run. Owner-gated server-side (the requesting user must own + * the chat); it is NOT flag-gated — when the feature is off the chat simply has no + * runs, so the endpoint returns `{ run: null }`. + */ +export async function getAiChatRun( + chatId: string, +): Promise { + const req = await api.post("/ai-chat/run", { chatId }); + return req.data; +} + +/** + * Explicitly STOP the active agent run of a chat (#184). This is the ONLY thing + * that ends a DETACHED run — a mere browser disconnect (aborting the local SSE) + * is deliberately ignored server-side, so the client must call this to actually + * stop an autonomous run. Targeted by `chatId` (the server resolves whatever run + * is active on it); owner-gated server-side. Returns `{ stopped }` — false when + * there was nothing active to stop. + */ +export async function stopRun( + chatId: string, +): Promise<{ stopped: boolean }> { + const req = await api.post<{ stopped: boolean }>("/ai-chat/stop", { chatId }); + return req.data; +} + /** * Resolve the chat bound to a document (the current user's most-recent chat * created on that page), or null when there is none. Drives auto-open-on-page. diff --git a/apps/client/src/features/ai-chat/types/ai-chat.types.ts b/apps/client/src/features/ai-chat/types/ai-chat.types.ts index 78b049c0..8ed54ab2 100644 --- a/apps/client/src/features/ai-chat/types/ai-chat.types.ts +++ b/apps/client/src/features/ai-chat/types/ai-chat.types.ts @@ -200,6 +200,38 @@ export interface IAiChatMessageRow { createdAt: string; } +/** + * A persisted agent-run row (#184), mirroring the `ai_chat_runs` fields the + * client reads from `POST /ai-chat/run`. Only `status` is load-bearing for the + * reconnect-and-live-update UX (it drives the poll cadence); the rest are carried + * for display/diagnostics. The DB is the source of truth, so this resolves for an + * in-flight run (the browser dropped, the run kept going) and a finished one. + */ +export interface IAiChatRun { + id: string; + chatId: string; + // 'pending' | 'running' | 'succeeded' | 'failed' | 'aborted'. The first two are + // ACTIVE (keep polling); the rest are TERMINAL (stop polling). + status: "pending" | "running" | "succeeded" | "failed" | "aborted" | string; + error?: string | null; + stepCount?: number; + assistantMessageId?: string | null; + startedAt?: string | null; + finishedAt?: string | null; + createdAt?: string; + updatedAt?: string; +} + +/** + * Response of `POST /ai-chat/run` (#184): the latest run of a chat and the + * assistant message it materializes (the partial/final output, projected from the + * persisted rows). Both are `null` when the chat has never had a run. + */ +export interface IAiChatRunResponse { + run: IAiChatRun | null; + message: IAiChatMessageRow | null; +} + export interface IAiChatListParams extends QueryParams {} export interface IAiChatMessagesParams { diff --git a/apps/client/src/features/ai-chat/utils/run-polling.test.ts b/apps/client/src/features/ai-chat/utils/run-polling.test.ts new file mode 100644 index 00000000..b93f08fb --- /dev/null +++ b/apps/client/src/features/ai-chat/utils/run-polling.test.ts @@ -0,0 +1,303 @@ +import { describe, it, expect } from "vitest"; +import type { UIMessage } from "@ai-sdk/react"; +import type { IAiChatRun } from "@/features/ai-chat/types/ai-chat.types.ts"; +import { + RUN_POLL_INTERVAL_MS, + isRunActive, + runPollInterval, + shouldObserveRun, + shouldClearStoppingLatch, + shouldClearLatchOnQueryError, + mergeObservedMessage, +} from "./run-polling.ts"; + +function makeRun(status: string): IAiChatRun { + return { id: "run-1", chatId: "c1", status }; +} + +function makeMsg(id: string, text: string): UIMessage { + return { + id, + role: "assistant", + parts: [{ type: "text", text }], + } as UIMessage; +} + +describe("isRunActive", () => { + it("treats pending and running as active", () => { + expect(isRunActive(makeRun("pending"))).toBe(true); + expect(isRunActive(makeRun("running"))).toBe(true); + }); + + it("treats terminal / unknown / nullish as not active", () => { + expect(isRunActive(makeRun("succeeded"))).toBe(false); + expect(isRunActive(makeRun("failed"))).toBe(false); + expect(isRunActive(makeRun("aborted"))).toBe(false); + expect(isRunActive(makeRun("weird-future-status"))).toBe(false); + expect(isRunActive(null)).toBe(false); + expect(isRunActive(undefined)).toBe(false); + }); +}); + +describe("runPollInterval (the refetchInterval helper)", () => { + it("returns 2000ms while the run is pending/running", () => { + expect(runPollInterval(makeRun("pending"))).toBe(RUN_POLL_INTERVAL_MS); + expect(runPollInterval(makeRun("running"))).toBe(RUN_POLL_INTERVAL_MS); + expect(RUN_POLL_INTERVAL_MS).toBe(2000); + }); + + it("returns false (stop polling) once the run is terminal", () => { + expect(runPollInterval(makeRun("succeeded"))).toBe(false); + expect(runPollInterval(makeRun("failed"))).toBe(false); + expect(runPollInterval(makeRun("aborted"))).toBe(false); + }); + + it("returns false (no polling) when there is no run", () => { + expect(runPollInterval(null)).toBe(false); + expect(runPollInterval(undefined)).toBe(false); + }); +}); + +describe("shouldObserveRun (observer-vs-streamer decision)", () => { + it("observes an active run when this tab is NOT the local streamer", () => { + expect(shouldObserveRun(makeRun("running"), false)).toBe(true); + expect(shouldObserveRun(makeRun("pending"), false)).toBe(true); + }); + + it("observes a terminal run too (so the final output shows on reopen)", () => { + expect(shouldObserveRun(makeRun("succeeded"), false)).toBe(true); + }); + + it("does NOT observe when this tab IS the streamer (no double-render)", () => { + expect(shouldObserveRun(makeRun("running"), true)).toBe(false); + expect(shouldObserveRun(makeRun("succeeded"), true)).toBe(false); + }); + + it("does NOT observe when there is no run", () => { + expect(shouldObserveRun(null, false)).toBe(false); + expect(shouldObserveRun(undefined, false)).toBe(false); + }); +}); + +describe("shouldClearStoppingLatch (#234 latch-release decision)", () => { + // The one case the latch SHOULD clear: we requested a stop, we are the passive + // observer (not streaming), and the CURRENT run is terminal. + it("clears only when stopping, observing, and the run is terminal", () => { + expect( + shouldClearStoppingLatch({ + stoppingRun: true, + run: makeRun("aborted"), + isLocalStreaming: false, + }), + ).toBe(true); + expect( + shouldClearStoppingLatch({ + stoppingRun: true, + run: makeRun("succeeded"), + isLocalStreaming: false, + }), + ).toBe(true); + expect( + shouldClearStoppingLatch({ + stoppingRun: true, + run: makeRun("failed"), + isLocalStreaming: false, + }), + ).toBe(true); + }); + + // Round-3 regression: clearing while THIS tab is still the local streamer would + // re-open the flash for the current turn the moment we switch to observer role. + // A predicate lacking the streaming gate would (wrongly) return true here. + it("does NOT clear while this tab is the local streamer", () => { + expect( + shouldClearStoppingLatch({ + stoppingRun: true, + run: makeRun("aborted"), + isLocalStreaming: true, + }), + ).toBe(false); + expect( + shouldClearStoppingLatch({ + stoppingRun: true, + run: makeRun("succeeded"), + isLocalStreaming: true, + }), + ).toBe(false); + }); + + // The detached run keeps growing after a local abort — while it is still + // active the latch MUST hold so the observer merge stays suppressed. + it("does NOT clear while the run is still active", () => { + expect( + shouldClearStoppingLatch({ + stoppingRun: true, + run: makeRun("running"), + isLocalStreaming: false, + }), + ).toBe(false); + expect( + shouldClearStoppingLatch({ + stoppingRun: true, + run: makeRun("pending"), + isLocalStreaming: false, + }), + ).toBe(false); + }); + + // #234 F4: on Stop the stale PREVIOUS-turn run is removed from the cache, so the + // observed `run` is null until the current turn's run is fetched fresh. A null + // run HOLDS the latch — it can never clear against the just-removed stale run, + // only against the current turn's own terminal run once observed. + it("does NOT clear against a removed/absent run (F4 stale-run guard)", () => { + expect( + shouldClearStoppingLatch({ + stoppingRun: true, + run: null, + isLocalStreaming: false, + }), + ).toBe(false); + expect( + shouldClearStoppingLatch({ + stoppingRun: true, + run: undefined, + isLocalStreaming: false, + }), + ).toBe(false); + }); + + it("does NOT clear when no stop was requested", () => { + expect( + shouldClearStoppingLatch({ + stoppingRun: false, + run: makeRun("aborted"), + isLocalStreaming: false, + }), + ).toBe(false); + }); +}); + +describe("shouldClearLatchOnQueryError (#234 F7 error-safety-net decision)", () => { + // This guards the REAL anti-flash decision the component's run-query-error + // safety-net effect uses (ai-chat-window.tsx wires the effect to THIS helper, + // not a copy — so the test is non-vacuous vs the live code). + + // (b) The F7 hole: a TRANSIENT run-query error while `run` is STILL ACTIVE must + // NOT clear the latch. TanStack Query v5 retains `data` on error, so + // runQueryFailed can be true while the held run is still pending/running. + // Against the PRE-F7 condition (without `!isRunActive(run)`) this would return + // true — so this assertion fails on the buggy code (non-vacuous). + it("does NOT clear on a transient error while the run is still ACTIVE (F7)", () => { + expect( + shouldClearLatchOnQueryError({ + stoppingRun: true, + isLocalStreaming: false, + runQueryFailed: true, + run: makeRun("running"), + }), + ).toBe(false); + expect( + shouldClearLatchOnQueryError({ + stoppingRun: true, + isLocalStreaming: false, + runQueryFailed: true, + run: makeRun("pending"), + }), + ).toBe(false); + }); + + // (a) The genuine permanent-null-freeze: run cache cleared by removeQueries + + // the refetch keeps ERRORING, so `run === null`. This is the ONLY case the + // safety-net exists to cure — it MUST clear so the frozen view resumes. + it("clears on a permanent error when the run is null (permanent-null-freeze)", () => { + expect( + shouldClearLatchOnQueryError({ + stoppingRun: true, + isLocalStreaming: false, + runQueryFailed: true, + run: null, + }), + ).toBe(true); + expect( + shouldClearLatchOnQueryError({ + stoppingRun: true, + isLocalStreaming: false, + runQueryFailed: true, + run: undefined, + }), + ).toBe(true); + }); + + // A TERMINAL run also satisfies `!isRunActive`; clearing then is harmless — the + // terminal effect (shouldClearStoppingLatch) already clears for a terminal run, + // so this only ever agrees with it. Asserted so the (c) reasoning is pinned. + it("clears on an error when the run is terminal (harmless, agrees with terminal effect)", () => { + expect( + shouldClearLatchOnQueryError({ + stoppingRun: true, + isLocalStreaming: false, + runQueryFailed: true, + run: makeRun("aborted"), + }), + ).toBe(true); + }); + + it("does NOT clear without an actual query error", () => { + expect( + shouldClearLatchOnQueryError({ + stoppingRun: true, + isLocalStreaming: false, + runQueryFailed: false, + run: null, + }), + ).toBe(false); + }); + + it("does NOT clear while this tab is the local streamer", () => { + expect( + shouldClearLatchOnQueryError({ + stoppingRun: true, + isLocalStreaming: true, + runQueryFailed: true, + run: null, + }), + ).toBe(false); + }); + + it("does NOT clear when no stop was requested", () => { + expect( + shouldClearLatchOnQueryError({ + stoppingRun: false, + isLocalStreaming: false, + runQueryFailed: true, + run: null, + }), + ).toBe(false); + }); +}); + +describe("mergeObservedMessage", () => { + it("replaces the message with the same id in place (per-step growth)", () => { + const prev = [makeMsg("u1", "hi"), makeMsg("a1", "step 1")]; + const observed = makeMsg("a1", "step 1\nstep 2"); + const next = mergeObservedMessage(prev, observed); + expect(next).toHaveLength(2); + expect(next[1]).toBe(observed); + expect(next[0]).toBe(prev[0]); // untouched + expect(next).not.toBe(prev); // new array (never mutates input) + }); + + it("appends when the observed message is not yet present", () => { + const prev = [makeMsg("u1", "hi")]; + const observed = makeMsg("a1", "first token"); + const next = mergeObservedMessage(prev, observed); + expect(next).toHaveLength(2); + expect(next[1]).toBe(observed); + }); + + it("returns the original list unchanged when there is nothing to merge", () => { + const prev = [makeMsg("u1", "hi")]; + expect(mergeObservedMessage(prev, null)).toBe(prev); + expect(mergeObservedMessage(prev, undefined)).toBe(prev); + }); +}); diff --git a/apps/client/src/features/ai-chat/utils/run-polling.ts b/apps/client/src/features/ai-chat/utils/run-polling.ts new file mode 100644 index 00000000..560b675f --- /dev/null +++ b/apps/client/src/features/ai-chat/utils/run-polling.ts @@ -0,0 +1,151 @@ +import type { UIMessage } from "@ai-sdk/react"; +import type { IAiChatRun } from "@/features/ai-chat/types/ai-chat.types.ts"; + +/** + * Reconnect-and-live-follow helpers (#184). When a chat is reopened while its + * agent run is STILL going, this tab is a PASSIVE OBSERVER: it did not start the + * run here (no local SSE stream), so it catches up by POLLING the reconnect + * endpoint (`POST /ai-chat/run`) and merging the run's incrementally-persisted + * assistant message into the rendered thread. These are the small pure decisions + * that machinery hangs off, extracted so they can be unit-tested in isolation + * (mirrors how reindex polling / editor-sync-state are tested). + */ + +/** How often to re-poll the reconnect endpoint while a run is ACTIVE. */ +export const RUN_POLL_INTERVAL_MS = 2000; + +// 'pending' and 'running' are the two ACTIVE statuses; 'succeeded' | 'failed' | +// 'aborted' are TERMINAL (and any unknown future status is treated as terminal, +// so a stale/odd value never polls forever). +const ACTIVE_STATUSES = new Set(["pending", "running"]); + +/** Whether a run is still going (worth polling / merging live updates from). */ +export function isRunActive(run: IAiChatRun | null | undefined): boolean { + return !!run && ACTIVE_STATUSES.has(run.status); +} + +/** + * The TanStack Query `refetchInterval` value for the run query: poll every + * {@link RUN_POLL_INTERVAL_MS} while the run is active, and `false` (stop) once + * it is terminal or there is no run. Polling is thus naturally bounded by the run + * reaching a terminal status — no separate timeout cap is needed. + */ +export function runPollInterval( + run: IAiChatRun | null | undefined, +): number | false { + return isRunActive(run) ? RUN_POLL_INTERVAL_MS : false; +} + +/** + * Observer-vs-streamer decision. We render the polled run message (catch up + + * keep advancing) ONLY when this tab is a passive observer: there IS a run AND + * this tab is NOT the one locally streaming it (we reconnected, we didn't start + * it here). When this tab is the streamer, the live SSE stream owns the view, so + * we neither poll nor merge — avoiding a double-render fight. Terminal runs still + * merge (so the final persisted output is shown on reopen); the poll itself is + * stopped separately by {@link runPollInterval}. + */ +export function shouldObserveRun( + run: IAiChatRun | null | undefined, + localStreaming: boolean, +): boolean { + return !!run && !localStreaming; +} + +/** + * Should the "stopping" latch — which suppresses the observer re-stream flash + * after the user pressed Stop — be RELEASED now? All three must hold: + * - `stoppingRun`: we actually requested a stop (otherwise nothing to release); + * - `!isLocalStreaming`: this tab is NOT the local streamer. While we are the + * streamer the run query is disabled, so the observed `run` is not the run we + * are following — releasing the latch then would re-open the flash for the + * current turn the instant we switch to observer role; + * - the observed `run` EXISTS and has reached a TERMINAL status. + * + * The null / still-active `run` case is the #234 F4 invariant. On Stop the stale + * PREVIOUS-turn run is removed from the query cache (`removeQueries`), so `run` + * is null until the CURRENT turn's run is re-fetched fresh; a null or active run + * therefore HOLDS the latch, so it can only ever clear against the current turn's + * OWN terminal run — never a stale cached one. (The cache removal itself is + * integration-level in AiChatWindow; this predicate encodes the decision given + * whatever run is currently observed, and a stale terminal run is + * indistinguishable from a current terminal run at the predicate level — hence + * the cache removal is what guarantees only the current run is ever passed here.) + */ +export function shouldClearStoppingLatch(args: { + stoppingRun: boolean; + run: IAiChatRun | null | undefined; + isLocalStreaming: boolean; +}): boolean { + const { stoppingRun, run, isLocalStreaming } = args; + if (!stoppingRun || isLocalStreaming) return false; + return !!run && !isRunActive(run); +} + +/** + * Should the "stopping" latch be RELEASED by the run-query ERROR safety-net? + * (#234 F7 — a NEW path of the same re-stream flash the F4 latch exists to + * prevent.) After Stop, `handleServerStop` clears the run cache; the terminal + * effect then holds the latch via `if (!run) return` until the CURRENT turn's run + * is fetched fresh. If that refetch instead ERRORS permanently, `run` stays null, + * its status-keyed refetchInterval is off, and nothing would ever observe a + * terminal run — freezing the view with the observer merge suppressed. This + * safety-net cures ONLY that genuine permanent-null-freeze. + * + * All four must hold: + * - `stoppingRun`: we actually requested a stop (otherwise nothing to release); + * - `!isLocalStreaming`: this tab is NOT the local streamer (same reason as + * {@link shouldClearStoppingLatch}); + * - `runQueryFailed`: the run query is in its error state (TanStack Query v5 with + * retry:false — isError); + * - `!isRunActive(run)`: the observed `run` is NOT an active (pending/running) + * held run. This is the F7 gate. In TanStack Query v5 the query's `data` is + * RETAINED on error, so `runQueryFailed` can be true while `run` is STILL an + * ACTIVE run (a single transient GET-run failure in the window between Stop and + * settle). Without this gate a transient error would release the latch early — + * re-opening the observer merge and flashing the growing detached run over the + * frozen row (exactly the F4 flash). Gating on the run NOT being active means we + * only ever cure the permanent-null-freeze (`run === null`, so + * `isRunActive(null)` is false), never release against an active run. + * + * (A terminal `run` also satisfies `!isRunActive(run)`; clearing then is harmless + * — the terminal effect's {@link shouldClearStoppingLatch} already clears the + * latch for a terminal run, so this only ever agrees with it, never conflicts.) + * + * INVARIANT (do not break): clearing the latch on the `run === null` branch is safe + * ONLY because the run query's `refetchInterval` (see {@link runPollInterval}) stops + * polling when the data is empty — so after we clear on null+error there is no + * subsequent auto-poll that could return a still-active detached run and re-open the + * merge. If `refetchInterval` is ever changed to keep polling on `run === null`/on + * error, this null-branch clear would re-open the F7 flash through the null path. + * Do not change the run query's refetchInterval without re-checking this path. + */ +export function shouldClearLatchOnQueryError(args: { + stoppingRun: boolean; + isLocalStreaming: boolean; + runQueryFailed: boolean; + run: IAiChatRun | null | undefined; +}): boolean { + const { stoppingRun, isLocalStreaming, runQueryFailed, run } = args; + return ( + stoppingRun && !isLocalStreaming && runQueryFailed && !isRunActive(run) + ); +} + +/** + * Merge an observed assistant message into the rendered list: replace the message + * with the same id in place (the in-progress assistant row is already seeded from + * history, so per-step growth replaces it), or append it when absent. Returns a + * new array; the input is never mutated. + */ +export function mergeObservedMessage( + messages: UIMessage[], + observed: UIMessage | null | undefined, +): UIMessage[] { + if (!observed) return messages; + const idx = messages.findIndex((m) => m.id === observed.id); + if (idx === -1) return [...messages, observed]; + const next = messages.slice(); + next[idx] = observed; + return next; +} diff --git a/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.tsx b/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.tsx index 6e7cb185..1a0024ac 100644 --- a/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.tsx +++ b/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.tsx @@ -394,6 +394,10 @@ export default function AiProviderSettings() { useState( workspace?.settings?.ai?.publicShareAssistant ?? false, ); + // #184: detached/autonomous agent runs (settings.ai.autonomousRuns). + const [autonomousRunsEnabled, setAutonomousRunsEnabled] = useState( + workspace?.settings?.ai?.autonomousRuns ?? false, + ); const [chatToggleLoading, setChatToggleLoading] = useState(false); const [searchToggleLoading, setSearchToggleLoading] = useState(false); const [dictationToggleLoading, setDictationToggleLoading] = useState(false); @@ -403,6 +407,8 @@ export default function AiProviderSettings() { publicShareAssistantToggleLoading, setPublicShareAssistantToggleLoading, ] = useState(false); + const [autonomousRunsToggleLoading, setAutonomousRunsToggleLoading] = + useState(false); // Whether a key is currently stored server-side (drives the placeholder). const [hasApiKey, setHasApiKey] = useState(false); @@ -730,6 +736,37 @@ export default function AiProviderSettings() { } } + // Optimistic toggle for detached/autonomous agent runs + // (settings.ai.autonomousRuns). When on, a chat turn becomes a server-side run + // that survives a browser disconnect and can be reconnected to / live-followed; + // only an explicit Stop ends it. Off by default; single-instance-only in phase 1. + async function handleToggleAutonomousRuns(value: boolean) { + setAutonomousRunsToggleLoading(true); + const previous = autonomousRunsEnabled; + setAutonomousRunsEnabled(value); + try { + const updated = await updateWorkspace({ autonomousRuns: value }); + setWorkspace({ + ...updated, + settings: { + ...updated.settings, + ai: { ...updated.settings?.ai, autonomousRuns: value }, + }, + }); + notifications.show({ message: t("Updated successfully") }); + } catch (err) { + setAutonomousRunsEnabled(previous); + const message = (err as { response?: { data?: { message?: string } } }) + ?.response?.data?.message; + notifications.show({ + message: message ?? t("Failed to update data"), + color: "red", + }); + } finally { + setAutonomousRunsToggleLoading(false); + } + } + // Admins only — match the previous behavior. if (!isAdmin) { return ( @@ -960,6 +997,31 @@ export default function AiProviderSettings() { {...form.getInputProps("publicShareAssistantRoleId")} /> + {/* Detached/autonomous agent runs: a chat turn becomes a server-side run + that survives a browser disconnect; only an explicit Stop ends it. + Single-instance-only in phase 1. */} + + + + {t("Autonomous agent runs")} + + + {t( + "Keep an agent turn running server-side even if the browser disconnects; reconnect and follow it on reopen. Single-instance deployments only.", + )} + + + + handleToggleAutonomousRuns(e.currentTarget.checked) + } + /> + +