From ea61c96a7ca798945733e9936652dbc412ddf600 Mon Sep 17 00:00:00 2001 From: claude code agent 227 Date: Thu, 25 Jun 2026 11:53:25 +0300 Subject: [PATCH] =?UTF-8?q?refactor(review):=20address=20PR=20#186=20revie?= =?UTF-8?q?w=20(#183=20=E2=80=94=20recency=20sweep,=20#174=20export,=20tes?= =?UTF-8?q?ts,=20cleanups)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 15-point review of the persistent-history PR. Architecture decisions: crash recovery = recency threshold; tool-label duplication = leave as-is. Must-fix: 1. Boot-sweep bounded by recency. sweepStreaming now also requires `updatedAt < now() - SWEEP_STREAMING_STALE_MS` (10 min), so a fresh replica's startup sweep can't abort a turn another replica is actively streaming (multi-instance deploy). Int-spec: a FRESH 'streaming' row is NOT swept, a STALE one IS. 2. Restore export during the FIRST streaming turn of a new chat (#174). The server chatId is now adopted EARLY (in-place, on the start-chunk metadata) via a new `onServerChatId` callback wired through use-chat-session → chat-thread, so `activeChatId` is set at turn start and the Copy button is live mid-first- turn (canExport = !!activeChatId). Hook tests for early/in-place/no-op adopt. 3. Cover finalizeAssistant's fallback-insert branch: extracted pure `planFinalizeAssistant(assistantId)` (update when id present, insert when the upfront insert failed) + a dispatch harness test for both arms. Tests: onModuleInit lifecycle spec (sweep called; throw → resolves + warns); int-spec updatedAt assertion → toBeGreaterThan. Cleanups: cap findAllByChat at 5000 rows; upfront-insert-failure log carries chatId+workspaceId; removed the now-dead buildPartialAssistantRecord (only the spec consumed it; shapes still pinned by the flushAssistant suite); controller passes `lang: dto.lang` (normalizeLang handles undefined); dropped a no-op `?? undefined` in errorOf; documented the content-column semantics change (concatenated step text, UI renders from metadata.parts); CHANGELOG [Unreleased] entry (#183, #174); reworded the stale LABELS parity comment. Verified: server build + 323 ai-chat unit + 5 integration; client tsc + 160 ai-chat unit; prettier clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 16 ++- .../ai-chat/components/ai-chat-window.tsx | 7 +- .../ai-chat/components/chat-thread.tsx | 27 ++++ .../ai-chat/hooks/use-chat-session.test.tsx | 44 ++++++- .../ai-chat/hooks/use-chat-session.ts | 44 ++++++- .../ai-chat/ai-chat.controller.export.spec.ts | 76 +++++++++++ .../src/core/ai-chat/ai-chat.controller.ts | 3 +- .../ai-chat/ai-chat.service.lifecycle.spec.ts | 61 +++++++++ .../src/core/ai-chat/ai-chat.service.spec.ts | 120 +++--------------- .../src/core/ai-chat/ai-chat.service.ts | 67 +++++----- .../src/core/ai-chat/chat-markdown.util.ts | 11 +- .../repos/ai-chat/ai-chat-message.repo.ts | 27 ++++ .../ai-chat-message-status.int-spec.ts | 70 ++++++++-- 13 files changed, 408 insertions(+), 165 deletions(-) create mode 100644 apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 26adb3f9..90293ba7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,10 +12,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Persistent AI-chat history as the source of truth + server-side export.** + An assistant turn is now persisted to the database step by step: the row is + inserted upfront as `streaming` and updated as each agent step finishes, then + finalized once to `completed`/`error`/`aborted`. A process that dies mid-turn + keeps every finished step, and a startup sweep flips any dangling `streaming` + row (untouched for 10 minutes) to `aborted`. Chat "Copy" now exports + server-side from these rows (`POST /ai-chat/export`) rather than from live + client state, so the export is identical whether a chat is freshly streaming, + just switched to, or reloaded — and is available from the first turn of a new + chat. (#183, #174) + - **AI-agent attribution for MCP writes.** Comments (and pages) created through the MCP endpoint by a dedicated agent account are now badged as "AI", with unspoofable provenance derived from a per-user `is_agent` flag (not from the - request body). **Operator setup:** use a *dedicated* service account for the + request body). **Operator setup:** use a _dedicated_ service account for the MCP fallback and set the flag with SQL — `UPDATE users SET is_agent = true WHERE email = ''`. Never flag a human or shared account, or its normal edits get mis-attributed as AI. See the @@ -150,8 +161,7 @@ embeds — plus a large batch of security hardening and test coverage. - Page templates: import `ThrottleModule` so collab boots, never strand an in-flight page-embed id, and add defense-in-depth workspace checks. - Pages: `movePage` cycle guard with no phantom `PAGE_MOVED` event. -- Import: surface the real error cause from `/pages/import` instead of a generic - 400. +- Import: surface the real error cause from `/pages/import` instead of a generic 400. ### Security diff --git a/apps/client/src/features/ai-chat/components/ai-chat-window.tsx b/apps/client/src/features/ai-chat/components/ai-chat-window.tsx index 547898bd..de0b9923 100644 --- a/apps/client/src/features/ai-chat/components/ai-chat-window.tsx +++ b/apps/client/src/features/ai-chat/components/ai-chat-window.tsx @@ -194,6 +194,7 @@ export default function AiChatWindow() { threadKey, waitingForHistory, onTurnFinished, + onServerChatId, cancelPendingAdoption, } = useChatSession({ activeChatId, @@ -238,7 +239,10 @@ export default function AiChatWindow() { // SERVER-sourced (the DB is the single source of truth — #183): the assistant // row is persisted upfront + per step, so even a brand-new chat whose first // turn is streaming/interrupted has a server row to render. Enable the button - // whenever a persisted chat is active (`activeChatId` is set). + // whenever a persisted chat is active (`activeChatId` is set). For a BRAND-NEW + // chat that id is adopted EARLY — at the stream's `start` chunk via + // onServerChatId (#174) — so the Copy button is available during the first + // turn's stream, not only after it terminates. const activeChat = useMemo( () => chats?.items?.find((c) => c.id === activeChatId) ?? null, [chats, activeChatId], @@ -629,6 +633,7 @@ export default function AiChatWindow() { onRolePicked={(role) => setSelectedRoleId(role.id)} assistantName={currentRole?.name} onTurnFinished={onTurnFinished} + onServerChatId={onServerChatId} onLiveTurnTokens={setLiveTurnTokens} /> )} diff --git a/apps/client/src/features/ai-chat/components/chat-thread.tsx b/apps/client/src/features/ai-chat/components/chat-thread.tsx index 0c4ecbd0..c906a940 100644 --- a/apps/client/src/features/ai-chat/components/chat-thread.tsx +++ b/apps/client/src/features/ai-chat/components/chat-thread.tsx @@ -61,6 +61,12 @@ interface ChatThreadProps { * authoritative id the server streamed on the assistant message metadata, or * undefined on a failed turn — see adopt-chat-id.ts for the full #137 design. */ onTurnFinished: (serverChatId?: string) => void; + /** Called EARLY (at the stream's `start` chunk) with the authoritative server + * chat id streamed on the assistant message metadata, so a brand-new chat + * adopts its real id WHILE the first turn is still streaming (#174 — makes the + * Copy/export button available mid-stream). Distinct from onTurnFinished, + * which fires only at the terminal outcome. */ + onServerChatId?: (serverChatId?: string) => void; /** Reports the live turn-token total (reasoning + output) for the in-flight * turn so the parent can show a header badge that ticks mid-stream. THROTTLED * here (~8 Hz) so the parent re-renders a handful of times a second, not on @@ -110,6 +116,7 @@ export default function ChatThread({ onRolePicked, assistantName, onTurnFinished, + onServerChatId, onLiveTurnTokens, }: ChatThreadProps) { const { t } = useTranslation(); @@ -279,6 +286,26 @@ export default function ChatThread({ // Keep the flush helper pointed at the latest sendMessage instance. sendMessageRef.current = sendMessage; + // EARLY chat-id adoption (#174): the server streams the authoritative chat id + // on the assistant message metadata at the `start` chunk (message.metadata. + // chatId — see adopt-chat-id.ts / chatStreamMetadata). Forward it to the parent + // AS SOON AS it appears (mid-stream), so a brand-new chat adopts its real id + // WHILE the first turn is still streaming and activeChatId-gated affordances + // (the Copy/export button) light up immediately, instead of only at onFinish. + // Keyed by the last-seen id so we forward each distinct id exactly once. The + // parent's onServerChatId is idempotent and a no-op once the chat has an id. + const lastForwardedChatIdRef = useRef(undefined); + useEffect(() => { + if (!onServerChatId) return; + const tail = messages[messages.length - 1]; + if (tail?.role !== "assistant") return; + const serverChatId = extractServerChatId(tail); + if (!serverChatId || serverChatId === lastForwardedChatIdRef.current) + return; + lastForwardedChatIdRef.current = serverChatId; + onServerChatId(serverChatId); + }, [messages, onServerChatId]); + // Live "turn was interrupted" marker for the CURRENT session. The red error // banner (driven by `error`) covers the error case; this covers an aborted // turn, distinguishing a manual Stop (`isAbort`) from a dropped connection diff --git a/apps/client/src/features/ai-chat/hooks/use-chat-session.test.tsx b/apps/client/src/features/ai-chat/hooks/use-chat-session.test.tsx index 8104d1e6..0080cc80 100644 --- a/apps/client/src/features/ai-chat/hooks/use-chat-session.test.tsx +++ b/apps/client/src/features/ai-chat/hooks/use-chat-session.test.tsx @@ -64,7 +64,10 @@ describe("useChatSession", () => { result.current.onTurnFinished(undefined); expect(setActiveChatId).not.toHaveBeenCalled(); // The refetch lands with the new row => adopt it. - rerender({ activeChatId: null, chats: { items: [{ id: "x" }, { id: "new" }] } }); + rerender({ + activeChatId: null, + chats: { items: [{ id: "x" }, { id: "new" }] }, + }); expect(setActiveChatId).toHaveBeenCalledWith("new"); }); @@ -88,7 +91,10 @@ describe("useChatSession", () => { }); result.current.onTurnFinished(undefined); // a was deleted, new was added — same length, but membership changed. - rerender({ activeChatId: null, chats: { items: [{ id: "b" }, { id: "new" }] } }); + rerender({ + activeChatId: null, + chats: { items: [{ id: "b" }, { id: "new" }] }, + }); expect(setActiveChatId).toHaveBeenCalledWith("new"); }); @@ -171,6 +177,40 @@ describe("useChatSession", () => { expect(setActiveChatId).not.toHaveBeenCalledWith("late"); }); + it("#174 early adopt: onServerChatId adopts the streamed id mid-stream (Copy button available during the first turn)", () => { + // Brand-new chat: no id yet. The server streams the real chat id "A" on the + // `start` chunk WHILE the first turn is still streaming (before onTurnFinished + // fires at the terminal outcome). The hook must adopt it immediately so the + // window's activeChatId-gated Copy/export button lights up during the stream. + const { result, setActiveChatId } = setup({ + activeChatId: null, + chats: { items: [] }, + }); + result.current.onServerChatId("A"); + expect(setActiveChatId).toHaveBeenCalledWith("A"); + }); + + it("#174 early adopt is in-place: threadKey stays stable (live stream not torn down)", () => { + const chats = { items: [] }; + const { result, rerender } = setup({ activeChatId: null, chats }); + const keyBefore = result.current.threadKey; + result.current.onServerChatId("A"); + // Parent reflects the adopted id back in; the SAME mount key is kept so the + // in-flight useChat store (the streaming turn) is preserved. + rerender({ activeChatId: "A", chats }); + expect(result.current.threadKey).toBe(keyBefore); + }); + + it("#174 early adopt: no-op for an existing chat and for a missing id", () => { + const { result, setActiveChatId } = setup({ + activeChatId: "chat-1", + chats: { items: [{ id: "chat-1" }] }, + }); + result.current.onServerChatId("chat-1"); // already has an id + result.current.onServerChatId(undefined); // no streamed id + expect(setActiveChatId).not.toHaveBeenCalled(); + }); + it("in-place adopt keeps threadKey stable; an external switch remounts", () => { const chats = { items: [{ id: "B" }] }; const { result, rerender } = setup({ activeChatId: null, chats }); diff --git a/apps/client/src/features/ai-chat/hooks/use-chat-session.ts b/apps/client/src/features/ai-chat/hooks/use-chat-session.ts index 998f2631..d21ebd11 100644 --- a/apps/client/src/features/ai-chat/hooks/use-chat-session.ts +++ b/apps/client/src/features/ai-chat/hooks/use-chat-session.ts @@ -34,6 +34,13 @@ export interface UseChatSessionResult { /** Call when a turn finishes; `serverChatId` is the authoritative streamed id * (undefined on a failed turn). Handles new-chat id adoption + invalidations. */ onTurnFinished: (serverChatId?: string) => void; + /** Call EARLY (at the stream's `start` chunk) with the authoritative streamed + * chat id so a brand-new chat adopts its real id WHILE its first turn is still + * streaming — making `activeChatId`-gated affordances (e.g. the Copy/export + * button, #174) available immediately. In-place adoption only (same mount key, + * no list/messages invalidation — that is left to onTurnFinished at the end). + * Idempotent and a no-op once the chat already has an id. */ + onServerChatId: (serverChatId?: string) => void; /** Disarm any pending error-path new-chat fallback. The window calls this from * startNewChat/selectChat so a late refetch can't yank the user back into a * just-failed chat after they explicitly moved on. */ @@ -85,13 +92,10 @@ export function useChatSession( // `newThread`/`switchThread` to (re)mount, `adoptThread` for in-place adoption. // Initial: a non-null activeChatId switches to it; a null one gets a fresh // session key with no chat id yet. - const [thread, dispatch] = useReducer( - threadSessionReducer, - undefined, - () => - activeChatId === null - ? newThread(`new-${generateId()}`) - : switchThread(activeChatId), + const [thread, dispatch] = useReducer(threadSessionReducer, undefined, () => + activeChatId === null + ? newThread(`new-${generateId()}`) + : switchThread(activeChatId), ); // Error-path fallback for new-chat id adoption. When a brand-new chat's first @@ -150,6 +154,31 @@ export function useChatSession( [chats, setActiveChatId, onInvalidateChatList, onInvalidateChatMessages], ); + // EARLY adoption (#174): adopt the authoritative streamed chat id the moment + // the server emits it on the `start` chunk, so a brand-new chat gets its real + // `activeChatId` WHILE its first turn streams — not only at terminal + // onTurnFinished. This makes the activeChatId-gated Copy/export button + // available during the first turn. Pure in-place adoption (same mount key, like + // the primary path) with NO invalidation: the list/messages refresh stays on + // onTurnFinished at the end of the turn. Reads the live id from the ref so a + // repeat call after adoption is a no-op (resolveAdoptedChatId only fires for a + // still-new chat). + const onServerChatId = useCallback( + (serverChatId?: string) => { + const adopted = resolveAdoptedChatId( + activeChatIdRef.current, + serverChatId, + ); + if (!adopted) return; + activeChatIdRef.current = adopted; + setActiveChatId(adopted); + dispatch({ type: "adopt", chatId: adopted }); + // Early adoption beat the error-path fallback to it — disarm. + pendingNewChatRef.current = null; + }, + [setActiveChatId], + ); + // FALLBACK resolver. Armed only by onTurnFinished when a brand-new chat's first // turn errored before the `start` chunk (no authoritative id streamed). Once // the per-user list refetch lands with the just-created row, adopt the SINGLE @@ -233,6 +262,7 @@ export function useChatSession( threadKey: thread.key, waitingForHistory, onTurnFinished, + onServerChatId, cancelPendingAdoption, }; } diff --git a/apps/server/src/core/ai-chat/ai-chat.controller.export.spec.ts b/apps/server/src/core/ai-chat/ai-chat.controller.export.spec.ts index f8d84cb1..a518abc9 100644 --- a/apps/server/src/core/ai-chat/ai-chat.controller.export.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat.controller.export.spec.ts @@ -1,5 +1,10 @@ import { ForbiddenException } from '@nestjs/common'; import { AiChatController } from './ai-chat.controller'; +import { + planFinalizeAssistant, + flushAssistant, + type AssistantFlush, +} from './ai-chat.service'; import type { User, Workspace } from '@docmost/db/types/entity.types'; /** @@ -90,3 +95,74 @@ describe('AiChatController.export', () => { expect(res.markdown).toContain('## 2. ИИ-агент'); }); }); + +/** + * The terminal-finalize dispatch (#183): the assistant row is INSERTed upfront + * as 'streaming' and finalized once on the terminal callback. When the upfront + * insert SUCCEEDED (we hold an id) finalize UPDATEs that row; when it FAILED + * (assistantId is undefined) finalize falls back to INSERTing the terminal row + * so the turn is not lost — the only safety against losing the turn entirely. + * + * `planFinalizeAssistant` is the pure decision; this also drives a tiny harness + * that mirrors the service's `finalizeAssistant` repo dispatch over a mock repo, + * proving both branches issue the right call with the terminal payload. + */ +describe('finalizeAssistant dispatch (planFinalizeAssistant)', () => { + const workspaceId = 'ws1'; + + // Mirror of the service's finalize repo-dispatch over the plan: UPDATE when an + // upfront row exists, else INSERT the terminal row. + async function dispatchFinalize( + repo: { insert: jest.Mock; update: jest.Mock }, + assistantId: string | undefined, + flushed: AssistantFlush, + ): Promise { + const plan = planFinalizeAssistant(assistantId); + if (plan.kind === 'insert') { + await repo.insert({ + chatId: 'c1', + workspaceId, + userId: 'u1', + role: 'assistant', + content: flushed.content, + toolCalls: flushed.toolCalls ?? null, + metadata: flushed.metadata, + status: flushed.status, + }); + } else { + await repo.update(plan.id, workspaceId, flushed); + } + } + + it('plan: update when the upfront insert returned an id', () => { + expect(planFinalizeAssistant('a1')).toEqual({ kind: 'update', id: 'a1' }); + }); + + it('plan: insert (fallback) when there is no upfront id', () => { + expect(planFinalizeAssistant(undefined)).toEqual({ kind: 'insert' }); + }); + + it('(a) upfront insert succeeded -> finalize UPDATEs the row by id', async () => { + const repo = { insert: jest.fn(), update: jest.fn() }; + const flushed = flushAssistant([], 'final answer', 'completed', { + finishReason: 'stop', + }); + await dispatchFinalize(repo, 'a1', flushed); + expect(repo.update).toHaveBeenCalledWith('a1', workspaceId, flushed); + expect(repo.insert).not.toHaveBeenCalled(); + }); + + it('(b) upfront insert failed -> finalize INSERTs the terminal payload', async () => { + const repo = { insert: jest.fn(), update: jest.fn() }; + const flushed = flushAssistant([], 'partial', 'error', { error: 'boom' }); + await dispatchFinalize(repo, undefined, flushed); + expect(repo.update).not.toHaveBeenCalled(); + expect(repo.insert).toHaveBeenCalledTimes(1); + const arg = repo.insert.mock.calls[0][0]; + // The fallback insert carries the terminal content/status/metadata. + expect(arg.role).toBe('assistant'); + expect(arg.content).toBe('partial'); + expect(arg.status).toBe('error'); + expect((arg.metadata as { error?: string }).error).toBe('boom'); + }); +}); diff --git a/apps/server/src/core/ai-chat/ai-chat.controller.ts b/apps/server/src/core/ai-chat/ai-chat.controller.ts index be6e65da..0f243dec 100644 --- a/apps/server/src/core/ai-chat/ai-chat.controller.ts +++ b/apps/server/src/core/ai-chat/ai-chat.controller.ts @@ -107,7 +107,8 @@ export class AiChatController { title: chat.title ?? null, chatId: dto.chatId, rows, - lang: dto.lang ?? 'en', + // normalizeLang(undefined) already yields 'en', so no `?? 'en'` is needed. + lang: dto.lang, }); return { markdown }; } diff --git a/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts b/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts new file mode 100644 index 00000000..77e9d3c4 --- /dev/null +++ b/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts @@ -0,0 +1,61 @@ +import { Logger } from '@nestjs/common'; +import { AiChatService } from './ai-chat.service'; + +/** + * Lifecycle unit tests for AiChatService.onModuleInit (#183 crash-recovery + * sweep). The sweep is BEST-EFFORT: a failure must be logged (warn) but must + * NEVER throw out of onModuleInit and block server startup. Exercised with a + * hand-rolled mock repo — no Nest graph, no DB. Only `aiChatMessageRepo` is + * touched by onModuleInit, so the other constructor deps are stubbed as never. + */ +describe('AiChatService.onModuleInit (startup sweep)', () => { + function makeService(sweepStreaming: jest.Mock) { + const aiChatMessageRepo = { sweepStreaming }; + const service = new AiChatService( + {} as never, // ai + {} as never, // aiChatRepo + aiChatMessageRepo as never, + {} as never, // aiSettings + {} as never, // tools + {} as never, // mcpClients + {} as never, // aiAgentRoleRepo + {} as never, // pageRepo + {} as never, // pageAccess + ); + return { service, aiChatMessageRepo }; + } + + afterEach(() => jest.restoreAllMocks()); + + it('happy path: calls sweepStreaming and resolves', async () => { + const sweepStreaming = jest.fn().mockResolvedValue(0); + const { service } = makeService(sweepStreaming); + await expect(service.onModuleInit()).resolves.toBeUndefined(); + expect(sweepStreaming).toHaveBeenCalledTimes(1); + }); + + it('logs how many rows were swept when > 0', async () => { + const sweepStreaming = jest.fn().mockResolvedValue(3); + const logSpy = jest + .spyOn(Logger.prototype, 'log') + .mockImplementation(() => undefined); + const { service } = makeService(sweepStreaming); + await service.onModuleInit(); + expect(logSpy).toHaveBeenCalledTimes(1); + expect(String(logSpy.mock.calls[0][0])).toContain('3'); + }); + + it('sweepStreaming throws -> onModuleInit resolves (does NOT throw) and warns', async () => { + const sweepStreaming = jest + .fn() + .mockRejectedValue(new Error('db unavailable')); + const warnSpy = jest + .spyOn(Logger.prototype, 'warn') + .mockImplementation(() => undefined); + const { service } = makeService(sweepStreaming); + // Must not throw — a sweep failure may never block startup. + await expect(service.onModuleInit()).resolves.toBeUndefined(); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(String(warnSpy.mock.calls[0][0])).toContain('db unavailable'); + }); +}); diff --git a/apps/server/src/core/ai-chat/ai-chat.service.spec.ts b/apps/server/src/core/ai-chat/ai-chat.service.spec.ts index 926c5bde..878de557 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.spec.ts @@ -4,7 +4,6 @@ import { serializeSteps, rowToUiMessage, prepareAgentStep, - buildPartialAssistantRecord, flushAssistant, chatStreamMetadata, accumulateStepUsage, @@ -241,101 +240,13 @@ describe('prepareAgentStep', () => { }); }); -/** - * Unit test for buildPartialAssistantRecord: the pure helper that shapes the - * assistant-message record persisted on a partial/failed turn (the streamText - * onError / onAbort paths). It captures the PARTIAL answer the user already saw - * (finished steps' text + tool parts, plus the in-progress step's text) so a - * provider error / disconnect no longer throws the streamed answer away. Pinning - * the record shape here covers the persist-partial logic without seaming - * streamText itself. - */ -describe('buildPartialAssistantRecord', () => { - type AnyPart = Record; - - it('records an empty turn with the error text (preserves old behavior)', () => { - const rec = buildPartialAssistantRecord( - [], - '', - 'error', - '401: Unauthorized', - ); - expect(rec).toEqual({ - text: '', - toolCalls: null, - metadata: { - finishReason: 'error', - parts: [], - error: '401: Unauthorized', - }, - }); - }); - - it('persists in-progress text (no finished steps) as the partial answer', () => { - const rec = buildPartialAssistantRecord( - [], - 'partial answer', - 'error', - 'boom', - ); - expect(rec.text).toBe('partial answer'); - expect(rec.metadata.parts).toEqual([ - { type: 'text', text: 'partial answer' }, - ]); - expect(rec.metadata.error).toBe('boom'); - }); - - it('combines a finished tool step with trailing in-progress text', () => { - const steps = [ - { - text: 'looked it up', - toolCalls: [ - { toolCallId: 'c1', toolName: 'getPage', input: { id: 'p1' } }, - ], - toolResults: [ - { toolCallId: 'c1', toolName: 'getPage', output: { title: 'T' } }, - ], - }, - ]; - const rec = buildPartialAssistantRecord( - steps, - ' and then', - 'error', - 'boom', - ); - const parts = rec.metadata.parts as AnyPart[]; - // The finished step's text part is present. - expect(parts).toContainEqual({ type: 'text', text: 'looked it up' }); - // The paired tool call+result becomes an output-available part. - const toolPart = parts.find((p) => p.type === 'tool-getPage'); - expect(toolPart).toBeDefined(); - expect(toolPart!.state).toBe('output-available'); - // The in-progress text is appended LAST so the parts match the stream order. - expect(parts[parts.length - 1]).toEqual({ - type: 'text', - text: ' and then', - }); - expect(rec.text).toBe('looked it up and then'); - expect(rec.toolCalls).not.toBeNull(); - expect(rec.metadata.error).toBe('boom'); - }); - - it('omits the error key on the abort path (no errorText)', () => { - const rec = buildPartialAssistantRecord([], 'half', 'aborted'); - expect(rec.metadata.finishReason).toBe('aborted'); - expect('error' in rec.metadata).toBe(false); - expect(rec.text).toBe('half'); - }); -}); - /** * flushAssistant (#183): the PURE row builder behind the step-granular durable * write path. It runs identically for the upfront insert (empty steps, * 'streaming'), every per-step update, and the terminal finalize — so a future * background worker can call the same function. These tests pin the four status - * shapes and, critically, that `metadata.parts` stays IDENTICAL to the old - * buildPartialAssistantRecord / assistantParts output (rowToUiMessage/findRecent - * depend on it). + * shapes and the `metadata.parts` shape that rowToUiMessage/findRecent depend on + * (per-step text + tool parts via assistantParts, in-progress text appended). */ describe('flushAssistant', () => { type AnyPart = Record; @@ -411,21 +322,24 @@ describe('flushAssistant', () => { }); }); - it('metadata.parts parity with buildPartialAssistantRecord (error path)', () => { + it('combines a finished tool step with trailing in-progress text (error path)', () => { + // The error path captures the PARTIAL answer the user already saw: each + // finished step's text + tool parts, then the in-progress step's text last. const flushed = flushAssistant([toolStep], ' and then', 'error', { error: 'boom', }); - const legacy = buildPartialAssistantRecord( - [toolStep], - ' and then', - 'error', - 'boom', - ); - // The whole metadata block (parts + finishReason + error) must match the - // legacy partial-record shape so rebuilt history is unchanged. - expect(flushed.metadata).toEqual(legacy.metadata); - expect(flushed.content).toBe(legacy.text); - expect(flushed.toolCalls).toEqual(legacy.toolCalls); + const parts = flushed.metadata.parts as AnyPart[]; + expect(parts).toContainEqual({ type: 'text', text: 'looked it up' }); + const toolPart = parts.find((p) => p.type === 'tool-getPage'); + expect(toolPart!.state).toBe('output-available'); + // In-progress text appended LAST so the parts match the stream order. + expect(parts[parts.length - 1]).toEqual({ + type: 'text', + text: ' and then', + }); + expect(flushed.content).toBe('looked it up and then'); + expect(flushed.toolCalls).not.toBeNull(); + expect(flushed.metadata.error).toBe('boom'); }); }); diff --git a/apps/server/src/core/ai-chat/ai-chat.service.ts b/apps/server/src/core/ai-chat/ai-chat.service.ts index 15877a52..d214ec35 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.ts @@ -412,7 +412,10 @@ export class AiChatService implements OnModuleInit { }); assistantId = seeded?.id; } catch (err) { - this.logger.error('Failed to insert upfront assistant row', err as Error); + this.logger.error( + `Failed to insert upfront assistant row (chat ${chatId}, workspace ${workspace.id})`, + err as Error, + ); } // Per-step (non-terminal) update: persist the finished steps the moment a @@ -453,7 +456,8 @@ export class AiChatService implements OnModuleInit { ): Promise => { if (finalized) return; finalized = true; - if (!assistantId) { + const plan = planFinalizeAssistant(assistantId); + if (plan.kind === 'insert') { // The upfront insert failed: fall back to inserting the terminal row so // the turn is not lost entirely. try { @@ -476,7 +480,7 @@ export class AiChatService implements OnModuleInit { return; } try { - await this.aiChatMessageRepo.update(assistantId, workspace.id, flushed); + await this.aiChatMessageRepo.update(plan.id, workspace.id, flushed); } catch (err) { this.logger.error('Failed to finalize assistant message', err as Error); } @@ -552,6 +556,15 @@ export class AiChatService implements OnModuleInit { // pre-#183 onFinish record exactly; `inProgressText` is '' here (the last // step already finished). Final-step usage (usage.input+output) ≈ the // conversation's CURRENT context size, distinct from totalUsage. + // + // COLUMN-SEMANTICS NOTE (#183): `content` is built by flushAssistant as + // the CONCATENATION of every step's text (stepsText), whereas pre-#183 + // it stored only the FINAL step's text. This is a deliberate, harmless + // change: the UI and the Markdown export render from `metadata.parts` + // (per-step text + tool parts), not from `content`; `content` is the + // plain-text projection (full-text search / fallback). A multi-step + // turn's `content` therefore now holds all steps' prose, not just the + // last block. await finalizeAssistant( flushAssistant(steps as StepLike[], '', 'completed', { finishReason: finishReason as string, @@ -1088,6 +1101,21 @@ export interface AssistantFlush { status: 'streaming' | 'completed' | 'error' | 'aborted'; } +/** + * Pure decision for the terminal finalize (#183): given whether the upfront + * assistant row exists (`assistantId`), choose whether the terminal payload is + * written by UPDATEing that row or — when the upfront insert failed and there is + * no id — by INSERTing a fresh terminal row so the turn is not lost entirely. + * Returns `{ kind: 'update', id }` or `{ kind: 'insert' }`. Extracted so the + * fallback-insert branch (the only safety against losing a turn whose upfront + * insert failed) is unit-testable without seaming streamText. + */ +export function planFinalizeAssistant( + assistantId: string | undefined, +): { kind: 'update'; id: string } | { kind: 'insert' } { + return assistantId ? { kind: 'update', id: assistantId } : { kind: 'insert' }; +} + /** * PURE assistant-row builder (#183 step-granular durability). Given the turn's * accumulated steps + the in-progress (not-yet-finished) text + the lifecycle @@ -1097,9 +1125,8 @@ export interface AssistantFlush { * worker can call it identically, so it must stay a pure function of its inputs * (NO `this`, no IO). * - * `metadata.parts` is built by the EXACT same logic the old - * buildPartialAssistantRecord used (assistantParts over finished steps, then the - * in-progress text appended as a trailing text part), so rowToUiMessage / + * `metadata.parts` is built by assistantParts over the finished steps, then the + * in-progress text appended as a trailing text part, so rowToUiMessage / * findRecent keep replaying the turn unchanged. `metadata.finishReason`, * `metadata.error`, `metadata.usage` and `metadata.contextTokens` are attached * only when provided/relevant, matching the pre-#183 onFinish/onError records. @@ -1152,34 +1179,6 @@ export function flushAssistant( }; } -/** - * Build the assistant-message record persisted on a partial/failed turn (the - * streamText onError / onAbort paths). Captures the partial answer the user - * already saw: each finished step's text + tool parts (via assistantParts), - * then the in-progress step's text appended last. When `errorText` is provided - * it is recorded in metadata.error so the cause shows in history; an aborted - * turn passes none. Pure, so the partial-recording shape is unit-testable - * without seaming streamText. - * - * Thin wrapper over {@link flushAssistant} (retained for the existing unit - * tests and its historical `{ text, toolCalls, metadata }` shape). - */ -export function buildPartialAssistantRecord( - steps: ReadonlyArray | undefined, - inProgressText: string, - finishReason: 'error' | 'aborted', - errorText?: string, -): { text: string; toolCalls: unknown; metadata: Record } { - const flushed = flushAssistant(steps, inProgressText, finishReason, { - error: errorText, - }); - return { - text: flushed.content, - toolCalls: flushed.toolCalls, - metadata: flushed.metadata, - }; -} - /** * Reduce SDK step objects to a compact, JSON-serializable trace for the * `tool_calls` column. Stores only what the UI action-log and history need — diff --git a/apps/server/src/core/ai-chat/chat-markdown.util.ts b/apps/server/src/core/ai-chat/chat-markdown.util.ts index 870eaf5a..ebbed474 100644 --- a/apps/server/src/core/ai-chat/chat-markdown.util.ts +++ b/apps/server/src/core/ai-chat/chat-markdown.util.ts @@ -48,9 +48,12 @@ interface UsageLike { reasoningTokens?: number; } -/** Localized label table. Keep the keys identical to the client's i18n keys so - * the two exports read the same. Only role + tool-action labels are localized; - * everything structural is an English constant in the renderer. */ +/** Localized label table. The client-side Markdown builder was removed by #183 + * (the export is now server-side only), so this no longer mirrors a second + * exporter — instead the tool-action labels are kept in parity with the + * on-screen action-log labels in the client's `tool-parts.tsx` (`toolLabelKey`) + * so the export reads the same as the UI. Only role + tool-action labels are + * localized; everything structural is an English constant in the renderer. */ const LABELS: Record< ExportLang, { @@ -232,7 +235,7 @@ export function buildChatMarkdown(args: { }; const errorOf = (row: AiChatMessage): string | undefined => { const meta = (row.metadata ?? {}) as { error?: string }; - return meta.error ?? undefined; + return meta.error; }; // Metadata bullet list. Total tokens is only shown when there is a sum. diff --git a/apps/server/src/database/repos/ai-chat/ai-chat-message.repo.ts b/apps/server/src/database/repos/ai-chat/ai-chat-message.repo.ts index 005d7def..bd455096 100644 --- a/apps/server/src/database/repos/ai-chat/ai-chat-message.repo.ts +++ b/apps/server/src/database/repos/ai-chat/ai-chat-message.repo.ts @@ -9,6 +9,20 @@ import { import { PaginationOptions } from '@docmost/db/pagination/pagination-options'; import { executeWithCursorPagination } from '@docmost/db/pagination/cursor-pagination'; +// Crash-recovery sweep recency threshold (#183 review): a 'streaming' row is +// only swept to 'aborted' once it has been UNTOUCHED for this long. A live turn +// bumps `updatedAt` on every step (well under this window), so its row never +// matches; only a turn whose process truly died (no step update for >threshold) +// is swept. Chosen safely ABOVE the longest realistic turn so a fresh replica's +// boot-sweep can never abort a turn another replica is actively streaming +// (multi-instance deploy). +const SWEEP_STREAMING_STALE_MS = 10 * 60 * 1000; // 10 minutes + +// Hard upper bound on the rows materialized by `findAllByChat` (export path). +// A generous cap so a pathologically huge chat cannot load an unbounded result +// into memory; far above any realistic transcript length. +const FIND_ALL_BY_CHAT_LIMIT = 5000; + @Injectable() export class AiChatMessageRepo { constructor(@InjectKysely() private readonly db: KyselyDB) {} @@ -66,6 +80,10 @@ export class AiChatMessageRepo { // (#183), where the DB is the single source of truth and the whole transcript // must be rendered in one pass (findByChat is cursor-paginated and would only // return the first page). + // + // Hard-capped at FIND_ALL_BY_CHAT_LIMIT rows (a generous bound, far above any + // realistic transcript) so exporting a pathologically huge chat cannot + // materialize an unbounded result set in memory. async findAllByChat( chatId: string, workspaceId: string, @@ -78,6 +96,7 @@ export class AiChatMessageRepo { .where('deletedAt', 'is', null) .orderBy('createdAt', 'asc') .orderBy('id', 'asc') + .limit(FIND_ALL_BY_CHAT_LIMIT) .execute(); } @@ -162,13 +181,21 @@ export class AiChatMessageRepo { * status) to 'aborted'. Run once on server start. Returns the number of rows * swept so the caller can log it. Workspace-wide on purpose — a crash can have * dangling streaming rows across any workspace. + * + * Bounded by recency (#183 review): only rows UNTOUCHED for + * SWEEP_STREAMING_STALE_MS are swept. A live turn bumps `updatedAt` on every + * step, so an actively-streaming row never matches; this prevents a fresh + * replica's boot-sweep from aborting a turn another replica is still streaming + * in a multi-instance deploy. */ async sweepStreaming(trx?: KyselyTransaction): Promise { const db = dbOrTx(this.db, trx); + const staleBefore = new Date(Date.now() - SWEEP_STREAMING_STALE_MS); const rows = await db .updateTable('aiChatMessages') .set({ status: 'aborted', updatedAt: new Date() }) .where('status', '=', 'streaming') + .where('updatedAt', '<', staleBefore) .returning('id') .execute(); return rows.length; diff --git a/apps/server/test/integration/ai-chat-message-status.int-spec.ts b/apps/server/test/integration/ai-chat-message-status.int-spec.ts index 2299e658..9aa0238c 100644 --- a/apps/server/test/integration/ai-chat-message-status.int-spec.ts +++ b/apps/server/test/integration/ai-chat-message-status.int-spec.ts @@ -68,7 +68,8 @@ describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => { expect(updated!.content).toBe('final answer'); expect(updated!.status).toBe('completed'); expect((updated!.metadata as any).parts).toHaveLength(1); - expect(new Date(updated!.updatedAt).getTime()).toBeGreaterThanOrEqual( + // The 5ms sleep above guarantees a strictly-later timestamp. + expect(new Date(updated!.updatedAt).getTime()).toBeGreaterThan( new Date(before).getTime(), ); }); @@ -128,8 +129,23 @@ describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => { await repo.update(seeded.id, workspaceId, { status: 'completed' }); }); - it('sweepStreaming flips dangling streaming rows to aborted and counts them', async () => { - // Two dangling streaming rows in our workspace + one in another workspace. + // Backdate a row's updatedAt so it qualifies as a STALE streaming row (the + // sweep only flips rows untouched for >10 minutes — a live turn bumps + // updatedAt every step, so it would never match). + async function backdateUpdatedAt( + id: string, + minutesAgo: number, + ): Promise { + await db + .updateTable('aiChatMessages') + .set({ updatedAt: new Date(Date.now() - minutesAgo * 60 * 1000) }) + .where('id', '=', id) + .execute(); + } + + it('sweepStreaming flips STALE dangling streaming rows to aborted and counts them', async () => { + // Two dangling streaming rows in our workspace + one in another workspace — + // all backdated past the staleness threshold so the sweep picks them up. const a = await createMessage(db, { workspaceId, chatId, @@ -142,6 +158,16 @@ describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => { role: 'assistant', status: 'streaming', }); + const other = await createMessage(db, { + workspaceId: otherWorkspaceId, + chatId: otherChatId, + role: 'assistant', + status: 'streaming', + }); + await backdateUpdatedAt(a.id, 20); + await backdateUpdatedAt(b.id, 20); + await backdateUpdatedAt(other.id, 20); + // A settled row must NOT be touched. const done = await createMessage(db, { workspaceId, @@ -156,15 +182,9 @@ describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => { role: 'assistant', status: null, }); - await createMessage(db, { - workspaceId: otherWorkspaceId, - chatId: otherChatId, - role: 'assistant', - status: 'streaming', - }); const swept = await repo.sweepStreaming(); - // At least the 3 streaming rows we created (2 here + 1 in the other ws). + // At least the 3 stale streaming rows we created (2 here + 1 in the other ws). expect(swept).toBeGreaterThanOrEqual(3); const rows = await repo.findAllByChat(chatId, workspaceId); @@ -181,4 +201,34 @@ describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => { expect(rows2.find((r) => r.id === a.id)!.status).toBe('aborted'); expect(again).toBeGreaterThanOrEqual(0); }); + + it('sweepStreaming does NOT sweep a FRESH streaming row (recency bound, #183 review)', async () => { + // A row that is actively streaming (recent updatedAt) must survive the sweep: + // a fresh replica's boot-sweep must never abort a turn another replica is + // still streaming in a multi-instance deploy. + const fresh = await createMessage(db, { + workspaceId, + chatId, + role: 'assistant', + status: 'streaming', + }); + // A STALE streaming row created alongside it IS swept — proving the sweep + // ran and the only difference is recency. + const stale = await createMessage(db, { + workspaceId, + chatId, + role: 'assistant', + status: 'streaming', + }); + await backdateUpdatedAt(stale.id, 20); + + await repo.sweepStreaming(); + + const rows = await repo.findAllByChat(chatId, workspaceId); + const byId = new Map(rows.map((r) => [r.id, r])); + // Fresh (recently-updated) streaming row is left untouched... + expect(byId.get(fresh.id)!.status).toBe('streaming'); + // ...while the stale one alongside it was swept to 'aborted'. + expect(byId.get(stale.id)!.status).toBe('aborted'); + }); });