feat(ai-chat): live-follow a still-running run on chat reopen (#184)

Reopening a chat whose agent run is still going showed a frozen snapshot
from the moment it was opened. Add a passive-observer reconnect-poll path:
when this tab did NOT start the run locally, poll POST /ai-chat/run every
2s while the run is pending/running and merge its incrementally-persisted
assistant message into the thread, so new steps/tool-calls and the growing
text appear live. Polling stops on terminal status (refetchInterval keyed
on run.status, mirroring the reindex polling); a final messages invalidate
shows the persisted end state.

Observer-vs-streamer detection: ChatThread reports its local useChat
streaming status up; the window only polls/merges while NOT locally
streaming (the streamer's SSE owns the view — no double-render). Gated by
settings.ai.autonomousRuns; the query is disabled when the feature is off
so the flag-gated endpoint is never hit, and a failed fetch can't loop
(retry:false -> refetchInterval(undefined)=false).

Pure decisions (poll interval, observe gate, message merge) extracted to
run-polling.ts and unit-tested; added query enable-gating and ChatThread
observer-merge tests. Client-only change — the reconnect endpoint already
returns the run plus the assistant message with its metadata.parts.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
a
2026-06-28 04:10:18 +03:00
parent 78b9905f11
commit 6900c30935
10 changed files with 506 additions and 2 deletions

View File

@@ -17,7 +17,7 @@ import {
IconPlus, IconPlus,
IconX, IconX,
} from "@tabler/icons-react"; } from "@tabler/icons-react";
import { useAtom, useSetAtom } from "jotai"; import { useAtom, useAtomValue, useSetAtom } from "jotai";
import { useMatch } from "react-router-dom"; import { useMatch } from "react-router-dom";
import { useTranslation } from "react-i18next"; import { useTranslation } from "react-i18next";
import { useQueryClient } from "@tanstack/react-query"; import { useQueryClient } from "@tanstack/react-query";
@@ -34,9 +34,12 @@ import {
AI_CHATS_RQ_KEY, AI_CHATS_RQ_KEY,
AI_CHAT_MESSAGES_RQ_KEY, AI_CHAT_MESSAGES_RQ_KEY,
useAiChatMessagesQuery, useAiChatMessagesQuery,
useAiChatRunQuery,
useAiChatsQuery, useAiChatsQuery,
useAiRolesQuery, useAiRolesQuery,
} from "@/features/ai-chat/queries/ai-chat-query.ts"; } from "@/features/ai-chat/queries/ai-chat-query.ts";
import { shouldObserveRun } from "@/features/ai-chat/utils/run-polling.ts";
import { workspaceAtom } from "@/features/user/atoms/current-user-atom";
import ConversationList from "@/features/ai-chat/components/conversation-list.tsx"; import ConversationList from "@/features/ai-chat/components/conversation-list.tsx";
import ChatThread from "@/features/ai-chat/components/chat-thread.tsx"; import ChatThread from "@/features/ai-chat/components/chat-thread.tsx";
import { exportAiChat } from "@/features/ai-chat/services/ai-chat-service.ts"; import { exportAiChat } from "@/features/ai-chat/services/ai-chat-service.ts";
@@ -162,6 +165,59 @@ export default function AiChatWindow() {
const { data: messageRows, isLoading: messagesLoading } = const { data: messageRows, isLoading: messagesLoading } =
useAiChatMessagesQuery(activeChatId ?? undefined); useAiChatMessagesQuery(activeChatId ?? undefined);
// #184 reconnect-and-live-follow. Whether detached agent runs are enabled for
// this workspace; the reconnect endpoint is flag-gated server-side, so we must
// not poll it when the feature is off.
const workspace = useAtomValue(workspaceAtom);
const autonomousRunsEnabled =
workspace?.settings?.ai?.autonomousRuns === true;
// Whether THIS tab is the one actively streaming the open chat's run locally
// (it started the run here and holds the SSE). Reported up from ChatThread. We
// are the STREAMER while true and a passive OBSERVER while false — the basis of
// the observer-vs-streamer detection. Reset to false by the fresh ChatThread's
// mount effect on every chat switch.
const [localStreaming, setLocalStreaming] = useState(false);
const onStreamingChange = useCallback((streaming: boolean) => {
setLocalStreaming(streaming);
}, []);
// Poll the latest run of the open chat ONLY when we are a passive observer:
// feature on, a chat is open, and we are NOT the local streamer (the streamer
// already has the live SSE — polling/merging too would double-render). The
// query's own status-keyed refetchInterval stops once the run is terminal.
const { data: runData } = useAiChatRunQuery(
activeChatId ?? undefined,
autonomousRunsEnabled && !localStreaming,
);
const run = runData?.run ?? null;
// The run's incrementally-persisted assistant message to merge into the thread,
// but only while we are an observer (never when we are the streamer — guards
// against a stale poll fighting the live stream). Includes a terminal run so the
// final persisted output is shown on reopen.
const observedRow = shouldObserveRun(run, localStreaming)
? (runData?.message ?? null)
: null;
// When the observed run reaches a terminal status, do a final messages refetch
// so the persisted final state (token/context badge, export source) is shown,
// then the query's refetchInterval has already stopped polling. Deduped per run
// id so it fires exactly once per run, not on every subsequent poll-less render.
const finalizedRunIdRef = useRef<string | null>(null);
useEffect(() => {
if (!run || !activeChatId) return;
if (run.status === "pending" || run.status === "running") {
// Active again (a new run) — re-arm so its terminal transition fires once.
finalizedRunIdRef.current = null;
return;
}
if (finalizedRunIdRef.current === run.id) return;
finalizedRunIdRef.current = run.id;
queryClient.invalidateQueries({
queryKey: AI_CHAT_MESSAGES_RQ_KEY(activeChatId),
});
}, [run, activeChatId, queryClient]);
// The page the user is currently viewing. AiChatWindow lives in a pathless // The page the user is currently viewing. AiChatWindow lives in a pathless
// parent layout route, so useParams() can't see :pageSlug. Match the full // parent layout route, so useParams() can't see :pageSlug. Match the full
// pathname against the authenticated page route instead so "the current page" // pathname against the authenticated page route instead so "the current page"
@@ -636,6 +692,12 @@ export default function AiChatWindow() {
assistantName={currentRole?.name} assistantName={currentRole?.name}
onTurnFinished={onTurnFinished} onTurnFinished={onTurnFinished}
onServerChatId={onServerChatId} onServerChatId={onServerChatId}
// #184: live-follow a still-running run when we reopened the chat as
// a passive observer; null when there is nothing to observe or this
// tab is the streamer. onStreamingChange lets the window stop polling
// while we are the streamer.
observedRow={observedRow}
onStreamingChange={onStreamingChange}
/> />
)} )}
</div> </div>

View File

@@ -11,6 +11,7 @@ const h = vi.hoisted(() => ({
onFinish: null as null | ((arg: Record<string, unknown>) => void), onFinish: null as null | ((arg: Record<string, unknown>) => void),
sendMessage: vi.fn(), sendMessage: vi.fn(),
stop: vi.fn(), stop: vi.fn(),
setMessages: vi.fn(),
transport: null as null | { transport: null as null | {
prepareSendMessagesRequest: (arg: { prepareSendMessagesRequest: (arg: {
messages: unknown[]; messages: unknown[];
@@ -30,6 +31,8 @@ vi.mock("@ai-sdk/react", () => ({
status: h.state.status, status: h.state.status,
stop: h.state.stop, stop: h.state.stop,
error: null, error: null,
// #184: ChatThread reads setMessages to merge a polled observer run.
setMessages: h.state.setMessages,
}; };
}, },
})); }));
@@ -140,3 +143,56 @@ describe("ChatThread — send now (#198)", () => {
expect(prep({ messages: [], body: {} }).body.interrupted).toBe(false); expect(prep({ messages: [], body: {} }).body.interrupted).toBe(false);
}); });
}); });
// #184 passive-observer merge: when reconnecting to a still-running run, the
// parent feeds the polled run message via `observedRow`; ChatThread merges it via
// setMessages — but ONLY when this tab is NOT itself streaming (the streamer's
// SSE owns the view, so a stale observedRow must never overwrite it).
describe("ChatThread — observer run merge (#184)", () => {
beforeEach(() => {
h.state.onFinish = null;
h.state.setMessages.mockReset();
});
const observedRow = {
id: "a-run",
role: "assistant",
content: "step 1\nstep 2",
metadata: {
parts: [{ type: "text", text: "step 1\nstep 2" }],
},
createdAt: "2026-01-01T00:00:00Z",
} as const;
function renderObserver(status: string) {
h.state.status = status;
render(
<MantineProvider>
<ChatThread
chatId="c1"
initialRows={[]}
onTurnFinished={vi.fn()}
observedRow={observedRow as never}
/>
</MantineProvider>,
);
}
it("merges the polled run message when this tab is a passive observer", () => {
renderObserver("ready");
expect(h.state.setMessages).toHaveBeenCalledTimes(1);
// The updater replaces/append the observed assistant row by id.
const updater = h.state.setMessages.mock.calls[0][0] as (
prev: { id: string; parts: { text: string }[] }[],
) => { id: string; parts: { text: string }[] }[];
const merged = updater([{ id: "u1", parts: [{ text: "hi" }] }]);
expect(merged).toHaveLength(2);
expect(merged[1].id).toBe("a-run");
expect(merged[1].parts[0].text).toBe("step 1\nstep 2");
});
it("does NOT merge while THIS tab is the streamer (no double-render)", () => {
renderObserver("streaming");
expect(h.state.setMessages).not.toHaveBeenCalled();
});
});

View File

@@ -24,6 +24,7 @@ import {
} from "@/features/ai-chat/utils/role-launch.ts"; } from "@/features/ai-chat/utils/role-launch.ts";
import { describeChatError } from "@/features/ai-chat/utils/error-message.ts"; import { describeChatError } from "@/features/ai-chat/utils/error-message.ts";
import { extractServerChatId } from "@/features/ai-chat/utils/adopt-chat-id.ts"; import { extractServerChatId } from "@/features/ai-chat/utils/adopt-chat-id.ts";
import { mergeObservedMessage } from "@/features/ai-chat/utils/run-polling.ts";
import { import {
dequeue, dequeue,
enqueueMessage, enqueueMessage,
@@ -86,6 +87,19 @@ interface ChatThreadProps {
* Copy/export button available mid-stream). Distinct from onTurnFinished, * Copy/export button available mid-stream). Distinct from onTurnFinished,
* which fires only at the terminal outcome. */ * which fires only at the terminal outcome. */
onServerChatId?: (serverChatId?: string) => void; onServerChatId?: (serverChatId?: string) => void;
/** #184 reconnect-and-live-follow. When THIS tab reopened a chat whose agent
* run is still going (it is a PASSIVE OBSERVER — it did not start the run here),
* the parent polls the reconnect endpoint and feeds the run's incrementally-
* persisted assistant message here; we merge it into the live list so new
* steps/tool-calls appear as they are persisted. Null when there is nothing to
* observe (no run, feature off, or this tab IS the streamer). The merge is
* ADDITIONALLY guarded by our own `isStreaming`, so a stale value can never
* fight the local stream when we are the streamer. */
observedRow?: IAiChatMessageRow | null;
/** Report this tab's live streaming status up to the parent, so it can stop
* polling the run while WE are the active streamer (the SSE owns the view) and
* resume once we go idle. Called from an effect on every transition. */
onStreamingChange?: (streaming: boolean) => void;
} }
/** /**
@@ -131,6 +145,8 @@ export default function ChatThread({
assistantName, assistantName,
onTurnFinished, onTurnFinished,
onServerChatId, onServerChatId,
observedRow,
onStreamingChange,
}: ChatThreadProps) { }: ChatThreadProps) {
const { t } = useTranslation(); const { t } = useTranslation();
@@ -274,7 +290,7 @@ export default function ChatThread({
[], [],
); );
const { messages, sendMessage, status, stop, error } = useChat({ const { messages, sendMessage, status, stop, error, setMessages } = useChat({
// Stable per-mount key. Existing chats use their real id; new chats use a // Stable per-mount key. Existing chats use their real id; new chats use a
// generated client id (never `undefined`) so the store is NOT re-created on // generated client id (never `undefined`) so the store is NOT re-created on
// every render mid-stream (see `chatStoreId` above). // every render mid-stream (see `chatStoreId` above).
@@ -378,6 +394,27 @@ export default function ChatThread({
const isStreaming = status === "submitted" || status === "streaming"; const isStreaming = status === "submitted" || status === "streaming";
// #184: report our live streaming status up so the parent stops polling the run
// while WE are the streamer (the SSE owns the view) and resumes once we go idle.
// Effect (not render) so it never updates parent state during our own render;
// fires on mount with `false`, which also re-syncs the parent after a chat
// switch remounts this thread (a fresh mount is idle until the user sends).
useEffect(() => {
onStreamingChange?.(isStreaming);
}, [isStreaming, onStreamingChange]);
// #184 passive-observer merge: when the parent feeds a polled run message (we
// reopened a chat whose run is still going and did NOT start it here), merge it
// into the live list so new steps/tool-calls appear as they are persisted. Hard-
// gated by `!isStreaming`: if THIS tab is actually the streamer, the local SSE
// owns the view and a stale observedRow must never overwrite it. `observedRow`
// is a stable per-poll object, so this runs once per poll, not per render.
useEffect(() => {
if (isStreaming || !observedRow) return;
const observed = rowToUiMessage(observedRow);
setMessages((prev) => mergeObservedMessage(prev, observed));
}, [observedRow, isStreaming, setMessages]);
// "Send now" on a queued message: interrupt the current turn and immediately // "Send now" on a queued message: interrupt the current turn and immediately
// send THIS message, keeping the agent's partial output. Other queued messages // send THIS message, keeping the agent's partial output. Other queued messages
// stay queued and flush normally after the new turn. Reuses the existing // stay queued and flush normally after the new turn. Reuses the existing

View File

@@ -12,6 +12,7 @@ import {
deleteAiChat, deleteAiChat,
deleteAiRole, deleteAiRole,
getAiChatMessages, getAiChatMessages,
getAiChatRun,
getAiChats, getAiChats,
getAiRoleCatalog, getAiRoleCatalog,
getAiRoleCatalogBundle, getAiRoleCatalogBundle,
@@ -24,6 +25,7 @@ import {
import { import {
IAiChat, IAiChat,
IAiChatMessageRow, IAiChatMessageRow,
IAiChatRunResponse,
IAiRole, IAiRole,
IAiRoleCatalog, IAiRoleCatalog,
IAiRoleCatalogBundle, IAiRoleCatalogBundle,
@@ -34,6 +36,7 @@ import {
IAiRoleUpdateFromCatalogResult, IAiRoleUpdateFromCatalogResult,
} from "@/features/ai-chat/types/ai-chat.types.ts"; } from "@/features/ai-chat/types/ai-chat.types.ts";
import { IPagination } from "@/lib/types.ts"; import { IPagination } from "@/lib/types.ts";
import { runPollInterval } from "@/features/ai-chat/utils/run-polling.ts";
export const AI_CHATS_RQ_KEY = ["ai-chats"]; export const AI_CHATS_RQ_KEY = ["ai-chats"];
export const AI_ROLES_RQ_KEY = ["ai-roles"]; export const AI_ROLES_RQ_KEY = ["ai-roles"];
@@ -51,6 +54,7 @@ export const AI_CHAT_MESSAGES_RQ_KEY = (chatId: string) => [
"ai-chat-messages", "ai-chat-messages",
chatId, chatId,
]; ];
export const AI_CHAT_RUN_RQ_KEY = (chatId: string) => ["ai-chat-run", chatId];
/** Paginated list of the current user's chats (auto-loads further pages). */ /** Paginated list of the current user's chats (auto-loads further pages). */
export function useAiChatsQuery() { export function useAiChatsQuery() {
@@ -131,6 +135,33 @@ export function useAiChatMessagesQuery(chatId: string | undefined) {
}; };
} }
/**
* Reconnect to a chat's latest agent run and LIVE-FOLLOW it (#184). While the run
* is active the query re-polls every {@link runPollInterval} ms (driven off the
* fetched `run.status`, the same status-keyed refetchInterval pattern as the
* embeddings reindex polling); once the run reaches a terminal status — or there
* is no run — the interval returns `false` and polling stops on its own. Polling
* is thus naturally bounded by the run terminating; no separate timeout cap.
*
* `enabled` gates the whole thing: callers pass `false` when the autonomous-runs
* feature is off (the endpoint is flag-gated server-side and would 403) OR when
* THIS tab is the one actively streaming the run (the live SSE owns the view, so
* we must not also poll/merge). The global `retry: false` means a 403/anything
* leaves `data` undefined, so refetchInterval(undefined run) returns false — a
* failed fetch can never spin a tight loop.
*/
export function useAiChatRunQuery(
chatId: string | undefined,
enabled: boolean,
) {
return useQuery<IAiChatRunResponse, Error>({
queryKey: AI_CHAT_RUN_RQ_KEY(chatId ?? ""),
queryFn: () => getAiChatRun(chatId as string),
enabled: !!chatId && enabled,
refetchInterval: (query) => runPollInterval(query.state.data?.run),
});
}
export function useRenameAiChatMutation() { export function useRenameAiChatMutation() {
const queryClient = useQueryClient(); const queryClient = useQueryClient();
const { t } = useTranslation(); const { t } = useTranslation();

View File

@@ -0,0 +1,92 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import React from "react";
import { renderHook, waitFor } from "@testing-library/react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import type { IAiChatRunResponse } from "@/features/ai-chat/types/ai-chat.types.ts";
// react-i18next is pulled in transitively by ai-chat-query.ts (the mutation hooks
// use it); stub it so the module imports cleanly in this hook test.
vi.mock("react-i18next", () => ({
useTranslation: () => ({ t: (key: string) => key }),
}));
vi.mock("@mantine/notifications", () => ({
notifications: { show: vi.fn() },
}));
// Mock the whole service module; only getAiChatRun is exercised here, but the
// other named exports must exist so ai-chat-query.ts imports resolve.
vi.mock("@/features/ai-chat/services/ai-chat-service.ts", () => ({
getAiChatRun: vi.fn(),
getAiChatMessages: vi.fn(),
getAiChats: vi.fn(),
getAiRoleCatalog: vi.fn(),
getAiRoleCatalogBundle: vi.fn(),
getAiRoles: vi.fn(),
importAiRolesFromCatalog: vi.fn(),
createAiRole: vi.fn(),
deleteAiChat: vi.fn(),
deleteAiRole: vi.fn(),
renameAiChat: vi.fn(),
updateAiRole: vi.fn(),
updateAiRoleFromCatalog: vi.fn(),
}));
import { getAiChatRun } from "@/features/ai-chat/services/ai-chat-service.ts";
import { useAiChatRunQuery } from "@/features/ai-chat/queries/ai-chat-query.ts";
function createWrapper() {
const queryClient = new QueryClient({
defaultOptions: { queries: { retry: false } },
});
return function Wrapper({ children }: { children: React.ReactNode }) {
return (
<QueryClientProvider client={queryClient}>{children}</QueryClientProvider>
);
};
}
const runningResponse: IAiChatRunResponse = {
run: { id: "run-1", chatId: "c1", status: "running" },
message: {
id: "a1",
role: "assistant",
content: "working...",
createdAt: "2026-01-01T00:00:00Z",
},
};
describe("useAiChatRunQuery — enable gating", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("fetches the run when enabled (passive observer, feature on)", async () => {
vi.mocked(getAiChatRun).mockResolvedValue(runningResponse);
const { result } = renderHook(() => useAiChatRunQuery("c1", true), {
wrapper: createWrapper(),
});
await waitFor(() => expect(result.current.isSuccess).toBe(true));
expect(getAiChatRun).toHaveBeenCalledWith("c1");
expect(result.current.data?.run?.status).toBe("running");
});
it("does NOT fetch when disabled (this tab is the streamer / feature off)", async () => {
vi.mocked(getAiChatRun).mockResolvedValue(runningResponse);
renderHook(() => useAiChatRunQuery("c1", false), {
wrapper: createWrapper(),
});
// Give any errant fetch a chance to fire, then assert none did.
await new Promise((r) => setTimeout(r, 20));
expect(getAiChatRun).not.toHaveBeenCalled();
});
it("does NOT fetch when there is no chat id", async () => {
vi.mocked(getAiChatRun).mockResolvedValue(runningResponse);
renderHook(() => useAiChatRunQuery(undefined, true), {
wrapper: createWrapper(),
});
await new Promise((r) => setTimeout(r, 20));
expect(getAiChatRun).not.toHaveBeenCalled();
});
});

View File

@@ -5,6 +5,7 @@ import {
IAiChatListParams, IAiChatListParams,
IAiChatMessageRow, IAiChatMessageRow,
IAiChatMessagesParams, IAiChatMessagesParams,
IAiChatRunResponse,
IAiRole, IAiRole,
IAiRoleCatalog, IAiRoleCatalog,
IAiRoleCatalogBundle, IAiRoleCatalogBundle,
@@ -42,6 +43,21 @@ export async function getAiChatMessages(
return req.data; return req.data;
} }
/**
* Reconnect to the latest agent run of a chat (#184). Returns the run's
* persisted lifecycle state and the assistant message it materializes (the
* partial output while the run is in-flight, the final output once it finished).
* The DB is the source of truth, so this works for an in-flight run (the browser
* dropped, the run kept going) and a finished one alike; `{ run: null }` when the
* chat has never had a run. Owner-gated and flag-gated server-side.
*/
export async function getAiChatRun(
chatId: string,
): Promise<IAiChatRunResponse> {
const req = await api.post<IAiChatRunResponse>("/ai-chat/run", { chatId });
return req.data;
}
/** /**
* Resolve the chat bound to a document (the current user's most-recent chat * Resolve the chat bound to a document (the current user's most-recent chat
* created on that page), or null when there is none. Drives auto-open-on-page. * created on that page), or null when there is none. Drives auto-open-on-page.

View File

@@ -200,6 +200,38 @@ export interface IAiChatMessageRow {
createdAt: string; createdAt: string;
} }
/**
* A persisted agent-run row (#184), mirroring the `ai_chat_runs` fields the
* client reads from `POST /ai-chat/run`. Only `status` is load-bearing for the
* reconnect-and-live-update UX (it drives the poll cadence); the rest are carried
* for display/diagnostics. The DB is the source of truth, so this resolves for an
* in-flight run (the browser dropped, the run kept going) and a finished one.
*/
export interface IAiChatRun {
id: string;
chatId: string;
// 'pending' | 'running' | 'succeeded' | 'failed' | 'aborted'. The first two are
// ACTIVE (keep polling); the rest are TERMINAL (stop polling).
status: "pending" | "running" | "succeeded" | "failed" | "aborted" | string;
error?: string | null;
stepCount?: number;
assistantMessageId?: string | null;
startedAt?: string | null;
finishedAt?: string | null;
createdAt?: string;
updatedAt?: string;
}
/**
* Response of `POST /ai-chat/run` (#184): the latest run of a chat and the
* assistant message it materializes (the partial/final output, projected from the
* persisted rows). Both are `null` when the chat has never had a run.
*/
export interface IAiChatRunResponse {
run: IAiChatRun | null;
message: IAiChatMessageRow | null;
}
export interface IAiChatListParams extends QueryParams {} export interface IAiChatListParams extends QueryParams {}
export interface IAiChatMessagesParams { export interface IAiChatMessagesParams {

View File

@@ -0,0 +1,104 @@
import { describe, it, expect } from "vitest";
import type { UIMessage } from "@ai-sdk/react";
import type { IAiChatRun } from "@/features/ai-chat/types/ai-chat.types.ts";
import {
RUN_POLL_INTERVAL_MS,
isRunActive,
runPollInterval,
shouldObserveRun,
mergeObservedMessage,
} from "./run-polling.ts";
function makeRun(status: string): IAiChatRun {
return { id: "run-1", chatId: "c1", status };
}
function makeMsg(id: string, text: string): UIMessage {
return {
id,
role: "assistant",
parts: [{ type: "text", text }],
} as UIMessage;
}
describe("isRunActive", () => {
it("treats pending and running as active", () => {
expect(isRunActive(makeRun("pending"))).toBe(true);
expect(isRunActive(makeRun("running"))).toBe(true);
});
it("treats terminal / unknown / nullish as not active", () => {
expect(isRunActive(makeRun("succeeded"))).toBe(false);
expect(isRunActive(makeRun("failed"))).toBe(false);
expect(isRunActive(makeRun("aborted"))).toBe(false);
expect(isRunActive(makeRun("weird-future-status"))).toBe(false);
expect(isRunActive(null)).toBe(false);
expect(isRunActive(undefined)).toBe(false);
});
});
describe("runPollInterval (the refetchInterval helper)", () => {
it("returns 2000ms while the run is pending/running", () => {
expect(runPollInterval(makeRun("pending"))).toBe(RUN_POLL_INTERVAL_MS);
expect(runPollInterval(makeRun("running"))).toBe(RUN_POLL_INTERVAL_MS);
expect(RUN_POLL_INTERVAL_MS).toBe(2000);
});
it("returns false (stop polling) once the run is terminal", () => {
expect(runPollInterval(makeRun("succeeded"))).toBe(false);
expect(runPollInterval(makeRun("failed"))).toBe(false);
expect(runPollInterval(makeRun("aborted"))).toBe(false);
});
it("returns false (no polling) when there is no run", () => {
expect(runPollInterval(null)).toBe(false);
expect(runPollInterval(undefined)).toBe(false);
});
});
describe("shouldObserveRun (observer-vs-streamer decision)", () => {
it("observes an active run when this tab is NOT the local streamer", () => {
expect(shouldObserveRun(makeRun("running"), false)).toBe(true);
expect(shouldObserveRun(makeRun("pending"), false)).toBe(true);
});
it("observes a terminal run too (so the final output shows on reopen)", () => {
expect(shouldObserveRun(makeRun("succeeded"), false)).toBe(true);
});
it("does NOT observe when this tab IS the streamer (no double-render)", () => {
expect(shouldObserveRun(makeRun("running"), true)).toBe(false);
expect(shouldObserveRun(makeRun("succeeded"), true)).toBe(false);
});
it("does NOT observe when there is no run", () => {
expect(shouldObserveRun(null, false)).toBe(false);
expect(shouldObserveRun(undefined, false)).toBe(false);
});
});
describe("mergeObservedMessage", () => {
it("replaces the message with the same id in place (per-step growth)", () => {
const prev = [makeMsg("u1", "hi"), makeMsg("a1", "step 1")];
const observed = makeMsg("a1", "step 1\nstep 2");
const next = mergeObservedMessage(prev, observed);
expect(next).toHaveLength(2);
expect(next[1]).toBe(observed);
expect(next[0]).toBe(prev[0]); // untouched
expect(next).not.toBe(prev); // new array (never mutates input)
});
it("appends when the observed message is not yet present", () => {
const prev = [makeMsg("u1", "hi")];
const observed = makeMsg("a1", "first token");
const next = mergeObservedMessage(prev, observed);
expect(next).toHaveLength(2);
expect(next[1]).toBe(observed);
});
it("returns the original list unchanged when there is nothing to merge", () => {
const prev = [makeMsg("u1", "hi")];
expect(mergeObservedMessage(prev, null)).toBe(prev);
expect(mergeObservedMessage(prev, undefined)).toBe(prev);
});
});

View File

@@ -0,0 +1,71 @@
import type { UIMessage } from "@ai-sdk/react";
import type { IAiChatRun } from "@/features/ai-chat/types/ai-chat.types.ts";
/**
* Reconnect-and-live-follow helpers (#184). When a chat is reopened while its
* agent run is STILL going, this tab is a PASSIVE OBSERVER: it did not start the
* run here (no local SSE stream), so it catches up by POLLING the reconnect
* endpoint (`POST /ai-chat/run`) and merging the run's incrementally-persisted
* assistant message into the rendered thread. These are the small pure decisions
* that machinery hangs off, extracted so they can be unit-tested in isolation
* (mirrors how reindex polling / editor-sync-state are tested).
*/
/** How often to re-poll the reconnect endpoint while a run is ACTIVE. */
export const RUN_POLL_INTERVAL_MS = 2000;
// 'pending' and 'running' are the two ACTIVE statuses; 'succeeded' | 'failed' |
// 'aborted' are TERMINAL (and any unknown future status is treated as terminal,
// so a stale/odd value never polls forever).
const ACTIVE_STATUSES = new Set(["pending", "running"]);
/** Whether a run is still going (worth polling / merging live updates from). */
export function isRunActive(run: IAiChatRun | null | undefined): boolean {
return !!run && ACTIVE_STATUSES.has(run.status);
}
/**
* The TanStack Query `refetchInterval` value for the run query: poll every
* {@link RUN_POLL_INTERVAL_MS} while the run is active, and `false` (stop) once
* it is terminal or there is no run. Polling is thus naturally bounded by the run
* reaching a terminal status — no separate timeout cap is needed.
*/
export function runPollInterval(
run: IAiChatRun | null | undefined,
): number | false {
return isRunActive(run) ? RUN_POLL_INTERVAL_MS : false;
}
/**
* Observer-vs-streamer decision. We render the polled run message (catch up +
* keep advancing) ONLY when this tab is a passive observer: there IS a run AND
* this tab is NOT the one locally streaming it (we reconnected, we didn't start
* it here). When this tab is the streamer, the live SSE stream owns the view, so
* we neither poll nor merge — avoiding a double-render fight. Terminal runs still
* merge (so the final persisted output is shown on reopen); the poll itself is
* stopped separately by {@link runPollInterval}.
*/
export function shouldObserveRun(
run: IAiChatRun | null | undefined,
localStreaming: boolean,
): boolean {
return !!run && !localStreaming;
}
/**
* Merge an observed assistant message into the rendered list: replace the message
* with the same id in place (the in-progress assistant row is already seeded from
* history, so per-step growth replaces it), or append it when absent. Returns a
* new array; the input is never mutated.
*/
export function mergeObservedMessage(
messages: UIMessage[],
observed: UIMessage | null | undefined,
): UIMessage[] {
if (!observed) return messages;
const idx = messages.findIndex((m) => m.id === observed.id);
if (idx === -1) return [...messages, observed];
const next = messages.slice();
next[idx] = observed;
return next;
}

View File

@@ -65,6 +65,9 @@ export interface IWorkspaceAiSettings {
dictation?: boolean; dictation?: boolean;
dictationStreaming?: boolean; dictationStreaming?: boolean;
publicShareAssistant?: boolean; publicShareAssistant?: boolean;
// #184: detached agent runs (a run survives a browser disconnect and can be
// reconnected to / live-followed on reopen). Gates the run-reconnect polling.
autonomousRuns?: boolean;
} }
export interface IWorkspaceSharingSettings { export interface IWorkspaceSharingSettings {