From 4c0a4eb9cc3f55dd4434f7298e982965717a53e4 Mon Sep 17 00:00:00 2001 From: claude code agent 227 Date: Sun, 28 Jun 2026 14:54:19 +0300 Subject: [PATCH] fix(ai-chat): settle detached runs on pre-stream failures + review fixes (#184) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CRITICAL: any failure between a successful beginRun and streamText's terminal callbacks taking ownership (the bare awaits: user-message insert, history load, convertToModelMessages, settings resolve; the buildSystemPrompt/forUser block; and synchronous streamText wiring) left ai_chat_runs stuck 'running' forever (sweepRunning only runs at startup), which then 409'd every future turn in the chat and made the observer tab poll forever. Wrap the body of stream() after beginRun in a safety-net try/catch that settles the run to 'error' (via onSettled) before rethrowing, and make finalizeRun idempotent (active.delete is the once-guard) so a settle here and a settle from a streamText callback collapse to a single terminal write. Also from review comment 2519: - correct three client comments that falsely claimed /ai-chat/run is "flag-gated server-side and would 403" — it is owner-gated only; with the feature off the chat simply has no runs so the endpoint returns { run: null } (ai-chat-window.tsx, ai-chat-service.ts, ai-chat-query.ts). - remove the dead UpdatableAiChatRun type (zero usages; the repo update uses an inline Partial<...>). - add controller specs for POST /ai-chat/run and /ai-chat/stop (owner-gating, run:null when no run, run+message, stop by runId and by chatId). - add tests: an exception after beginRun settles the run to 'error' and drops the in-memory entry (next turn is not 409'd); finalizeRun is idempotent. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../ai-chat/components/ai-chat-window.tsx | 6 +- .../features/ai-chat/queries/ai-chat-query.ts | 31 +- .../ai-chat/services/ai-chat-service.ts | 4 +- .../core/ai-chat/ai-chat-run.service.spec.ts | 38 +- .../src/core/ai-chat/ai-chat-run.service.ts | 24 +- .../ai-chat/ai-chat.controller.run.spec.ts | 163 +++ .../ai-chat/ai-chat.service.lifecycle.spec.ts | 98 +- .../src/core/ai-chat/ai-chat.service.ts | 1023 +++++++++-------- .../server/src/database/types/entity.types.ts | 17 +- 9 files changed, 876 insertions(+), 528 deletions(-) create mode 100644 apps/server/src/core/ai-chat/ai-chat.controller.run.spec.ts diff --git a/apps/client/src/features/ai-chat/components/ai-chat-window.tsx b/apps/client/src/features/ai-chat/components/ai-chat-window.tsx index 52b425cc..e349a8fe 100644 --- a/apps/client/src/features/ai-chat/components/ai-chat-window.tsx +++ b/apps/client/src/features/ai-chat/components/ai-chat-window.tsx @@ -166,8 +166,10 @@ export default function AiChatWindow() { useAiChatMessagesQuery(activeChatId ?? undefined); // #184 reconnect-and-live-follow. Whether detached agent runs are enabled for - // this workspace; the reconnect endpoint is flag-gated server-side, so we must - // not poll it when the feature is off. + // this workspace. The reconnect endpoint itself is NOT flag-gated server-side + // (it is only owner-gated and returns `{ run: null }` when the chat has no + // run); but when the feature is off no runs are ever created, so polling it + // would always come back empty — we gate it off here to avoid pointless polls. const workspace = useAtomValue(workspaceAtom); const autonomousRunsEnabled = workspace?.settings?.ai?.autonomousRuns === true; diff --git a/apps/client/src/features/ai-chat/queries/ai-chat-query.ts b/apps/client/src/features/ai-chat/queries/ai-chat-query.ts index 5dcf4afd..555d11b9 100644 --- a/apps/client/src/features/ai-chat/queries/ai-chat-query.ts +++ b/apps/client/src/features/ai-chat/queries/ai-chat-query.ts @@ -60,11 +60,12 @@ export const AI_CHAT_RUN_RQ_KEY = (chatId: string) => ["ai-chat-run", chatId]; export function useAiChatsQuery() { const query = useInfiniteQuery({ queryKey: AI_CHATS_RQ_KEY, - queryFn: ({ pageParam }) => - getAiChats({ cursor: pageParam, limit: 50 }), + queryFn: ({ pageParam }) => getAiChats({ cursor: pageParam, limit: 50 }), initialPageParam: undefined as string | undefined, getNextPageParam: (lastPage) => - lastPage.meta.hasNextPage ? (lastPage.meta.nextCursor ?? undefined) : undefined, + lastPage.meta.hasNextPage + ? (lastPage.meta.nextCursor ?? undefined) + : undefined, }); const data = useMemo | undefined>(() => { @@ -94,7 +95,9 @@ export function useAiChatMessagesQuery(chatId: string | undefined) { getAiChatMessages({ chatId: chatId as string, cursor: pageParam }), initialPageParam: undefined as string | undefined, getNextPageParam: (lastPage) => - lastPage.meta.hasNextPage ? (lastPage.meta.nextCursor ?? undefined) : undefined, + lastPage.meta.hasNextPage + ? (lastPage.meta.nextCursor ?? undefined) + : undefined, enabled: !!chatId, }); @@ -144,9 +147,10 @@ export function useAiChatMessagesQuery(chatId: string | undefined) { * is thus naturally bounded by the run terminating; no separate timeout cap. * * `enabled` gates the whole thing: callers pass `false` when the autonomous-runs - * feature is off (the endpoint is flag-gated server-side and would 403) OR when - * THIS tab is the one actively streaming the run (the live SSE owns the view, so - * we must not also poll/merge). The global `retry: false` means a 403/anything + * feature is off (the endpoint is NOT flag-gated server-side, but with the feature + * off the chat has no runs, so polling would only ever return `{ run: null }`) OR + * when THIS tab is the one actively streaming the run (the live SSE owns the view, + * so we must not also poll/merge). The global `retry: false` means a failed fetch * leaves `data` undefined, so refetchInterval(undefined run) returns false — a * failed fetch can never spin a tight loop. */ @@ -311,11 +315,14 @@ export function useImportAiRolesFromCatalogMutation() { mutationFn: (payload) => importAiRolesFromCatalog(payload), onSuccess: (result) => { notifications.show({ - message: t("Imported {{created}}, renamed {{renamed}}, skipped {{skipped}}", { - created: result.created, - renamed: result.renamed, - skipped: result.skipped, - }), + message: t( + "Imported {{created}}, renamed {{renamed}}, skipped {{skipped}}", + { + created: result.created, + renamed: result.renamed, + skipped: result.skipped, + }, + ), }); // Surface partial failures (e.g. unique-name races) as a red warning. if (result.errors.length > 0) { diff --git a/apps/client/src/features/ai-chat/services/ai-chat-service.ts b/apps/client/src/features/ai-chat/services/ai-chat-service.ts index abd5ab8e..fba0dfad 100644 --- a/apps/client/src/features/ai-chat/services/ai-chat-service.ts +++ b/apps/client/src/features/ai-chat/services/ai-chat-service.ts @@ -49,7 +49,9 @@ export async function getAiChatMessages( * partial output while the run is in-flight, the final output once it finished). * The DB is the source of truth, so this works for an in-flight run (the browser * dropped, the run kept going) and a finished one alike; `{ run: null }` when the - * chat has never had a run. Owner-gated and flag-gated server-side. + * chat has never had a run. Owner-gated server-side (the requesting user must own + * the chat); it is NOT flag-gated — when the feature is off the chat simply has no + * runs, so the endpoint returns `{ run: null }`. */ export async function getAiChatRun( chatId: string, diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts index 419a3bfc..d7c8ce97 100644 --- a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts @@ -12,10 +12,13 @@ function uniqueViolation(constraintName: string): Error & { code: string; constraint_name: string; } { - return Object.assign(new Error('duplicate key value violates unique constraint'), { - code: '23505', - constraint_name: constraintName, - }); + return Object.assign( + new Error('duplicate key value violates unique constraint'), + { + code: '23505', + constraint_name: constraintName, + }, + ); } /** @@ -260,6 +263,33 @@ describe('AiChatRunService run lifecycle', () => { ); }); + it('finalizeRun is IDEMPOTENT: a second settle no-ops (single terminal write)', async () => { + // The #184 review fix: AiChatService.stream wraps the turn in a safety-net + // catch that settles a failed turn AND streamText's terminal callback may + // also settle — both routes call finalizeRun. Only the FIRST may write the + // terminal row; the second must no-op so a late settle can never clobber the + // real terminal status or double-write the row. + const repo = makeRepo(); + const svc = new AiChatRunService(repo as never); + await svc.beginRun({ + chatId: 'chat-1', + workspaceId: 'ws-1', + userId: 'user-1', + }); + + await svc.finalizeRun('run-1', 'ws-1', 'error', 'first'); + expect(svc.isLocallyActive('run-1')).toBe(false); + // A second settle (e.g. a streamText callback firing after the catch) no-ops. + await svc.finalizeRun('run-1', 'ws-1', 'completed', undefined); + + expect(repo.update).toHaveBeenCalledTimes(1); + expect(repo.update).toHaveBeenCalledWith( + 'run-1', + 'ws-1', + expect.objectContaining({ status: 'failed', error: 'first' }), + ); + }); + it('recordStep / linkAssistantMessage are best-effort: a repo failure is swallowed', async () => { const repo = makeRepo({ update: jest.fn(async () => { diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.ts index 1fb8ca4e..c9456420 100644 --- a/apps/server/src/core/ai-chat/ai-chat-run.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat-run.service.ts @@ -201,10 +201,19 @@ export class AiChatRunService implements OnModuleInit { /** * Finalize a run to its terminal status (succeeded / failed / aborted), - * stamping finishedAt + any error, and DROP its in-memory entry. Idempotent - * and best-effort: the at-most-once turn terminal callbacks call it, but a - * double call (or a call after the row was swept) merely re-writes the same - * terminal row. + * stamping finishedAt + any error, and DROP its in-memory entry. Best-effort. + * + * IDEMPOTENT (#184 review): the terminal write happens AT MOST ONCE per run. + * `this.active.delete(runId)` returns false when the run was already settled + * (its in-memory entry already dropped); in that case we no-op. This collapses + * a legitimate double-settle to a single write: AiChatService.stream wraps the + * turn in a safety-net catch that settles the run to 'error' on any failure + * BEFORE streamText's terminal callbacks own the lifecycle — and on the rare + * path where streamText DID attach (so a callback also settles) the two would + * otherwise both call onSettled. The first caller wins and writes the terminal + * row; the second returns early, so a late settle can never clobber the real + * terminal status or double-write. beginRun always registers the entry before + * the turn can settle, so a legitimate first finalize always finds it. */ async finalizeRun( runId: string, @@ -212,7 +221,7 @@ export class AiChatRunService implements OnModuleInit { turnStatus: TurnTerminalStatus, error?: string, ): Promise { - this.active.delete(runId); + if (!this.active.delete(runId)) return; try { await this.runRepo.update(runId, workspaceId, { status: mapTurnStatusToRun(turnStatus), @@ -258,10 +267,7 @@ export class AiChatRunService implements OnModuleInit { /** Fetch a run by id (workspace-scoped). Used to resolve + ownership-check an * explicit stop targeting a runId. */ - getRun( - runId: string, - workspaceId: string, - ): Promise { + getRun(runId: string, workspaceId: string): Promise { return this.runRepo.findById(runId, workspaceId); } diff --git a/apps/server/src/core/ai-chat/ai-chat.controller.run.spec.ts b/apps/server/src/core/ai-chat/ai-chat.controller.run.spec.ts new file mode 100644 index 00000000..321ecda1 --- /dev/null +++ b/apps/server/src/core/ai-chat/ai-chat.controller.run.spec.ts @@ -0,0 +1,163 @@ +import { BadRequestException, ForbiddenException } from '@nestjs/common'; +import { AiChatController } from './ai-chat.controller'; +import type { User, Workspace } from '@docmost/db/types/entity.types'; + +/** + * Wiring spec for the #184 run-reconnect / run-stop endpoints + * (`POST /ai-chat/run` and `POST /ai-chat/stop`). Both are OWNER-gated via + * assertOwnedChat (the requesting user must own the chat) and NOT flag-gated. + * Exercised with hand-rolled mocks — no Nest graph, no DB. The controller's + * constructor order is (aiChatService, aiChatRunService, aiChatRepo, + * aiChatMessageRepo, aiTranscription). + */ +describe('AiChatController run endpoints (#184)', () => { + const user = { id: 'u1' } as User; + const workspace = { id: 'ws1' } as Workspace; + + function makeController(opts: { + chat?: unknown; // what aiChatRepo.findById returns (owner-gate) + run?: unknown; // getLatestForChat / getRun result + activeRun?: unknown; // getActiveForChat result + message?: unknown; // aiChatMessageRepo.findById result + stopped?: boolean; // requestStop result + }) { + const aiChatRunService = { + getLatestForChat: jest.fn().mockResolvedValue(opts.run), + getRun: jest.fn().mockResolvedValue(opts.run), + getActiveForChat: jest.fn().mockResolvedValue(opts.activeRun), + requestStop: jest.fn().mockResolvedValue(opts.stopped ?? false), + }; + const aiChatRepo = { + findById: jest.fn().mockResolvedValue(opts.chat), + }; + const aiChatMessageRepo = { + findById: jest.fn().mockResolvedValue(opts.message), + }; + const controller = new AiChatController( + {} as never, // aiChatService + aiChatRunService as never, + aiChatRepo as never, + aiChatMessageRepo as never, + {} as never, // aiTranscription + ); + return { controller, aiChatRunService, aiChatRepo, aiChatMessageRepo }; + } + + describe('POST /ai-chat/run (getRun)', () => { + it('owner-gates: a chat the user does not own throws ForbiddenException', async () => { + const { controller, aiChatRunService } = makeController({ + chat: { id: 'c1', creatorId: 'someone-else' }, + }); + await expect( + controller.getRun({ chatId: 'c1' }, user, workspace), + ).rejects.toBeInstanceOf(ForbiddenException); + // It must NOT reach the run lookup once the owner-gate fails. + expect(aiChatRunService.getLatestForChat).not.toHaveBeenCalled(); + }); + + it('returns { run: null, message: null } when the chat has never had a run', async () => { + const { controller, aiChatRunService } = makeController({ + chat: { id: 'c1', creatorId: 'u1' }, + run: undefined, + }); + const res = await controller.getRun({ chatId: 'c1' }, user, workspace); + expect(res).toEqual({ run: null, message: null }); + expect(aiChatRunService.getLatestForChat).toHaveBeenCalledWith( + 'c1', + 'ws1', + ); + }); + + it('returns the run and its projected assistant message', async () => { + const run = { id: 'run-1', chatId: 'c1', assistantMessageId: 'm1' }; + const message = { id: 'm1', role: 'assistant' }; + const { controller, aiChatMessageRepo } = makeController({ + chat: { id: 'c1', creatorId: 'u1' }, + run, + message, + }); + const res = await controller.getRun({ chatId: 'c1' }, user, workspace); + expect(res).toEqual({ run, message }); + expect(aiChatMessageRepo.findById).toHaveBeenCalledWith('m1', 'ws1'); + }); + + it('returns message: null when the run has no linked assistant message', async () => { + const run = { id: 'run-1', chatId: 'c1', assistantMessageId: null }; + const { controller, aiChatMessageRepo } = makeController({ + chat: { id: 'c1', creatorId: 'u1' }, + run, + }); + const res = await controller.getRun({ chatId: 'c1' }, user, workspace); + expect(res).toEqual({ run, message: null }); + expect(aiChatMessageRepo.findById).not.toHaveBeenCalled(); + }); + }); + + describe('POST /ai-chat/stop (stopRun)', () => { + it('throws BadRequestException when neither runId nor chatId is given', async () => { + const { controller } = makeController({}); + await expect( + controller.stopRun({}, user, workspace), + ).rejects.toBeInstanceOf(BadRequestException); + }); + + it('stops by runId: owner-gates via the run’s chat, then requests the stop', async () => { + const { controller, aiChatRunService, aiChatRepo } = makeController({ + run: { id: 'run-1', chatId: 'c1' }, + chat: { id: 'c1', creatorId: 'u1' }, + stopped: true, + }); + const res = await controller.stopRun({ runId: 'run-1' }, user, workspace); + expect(res).toEqual({ stopped: true }); + expect(aiChatRunService.getRun).toHaveBeenCalledWith('run-1', 'ws1'); + expect(aiChatRepo.findById).toHaveBeenCalledWith('c1', 'ws1'); + expect(aiChatRunService.requestStop).toHaveBeenCalledWith('run-1', 'ws1'); + }); + + it('stops by runId: a foreign run’s chat throws ForbiddenException (no stop)', async () => { + const { controller, aiChatRunService } = makeController({ + run: { id: 'run-1', chatId: 'c1' }, + chat: { id: 'c1', creatorId: 'someone-else' }, + }); + await expect( + controller.stopRun({ runId: 'run-1' }, user, workspace), + ).rejects.toBeInstanceOf(ForbiddenException); + expect(aiChatRunService.requestStop).not.toHaveBeenCalled(); + }); + + it('stops by runId: an unknown run reports { stopped: false }', async () => { + const { controller, aiChatRunService } = makeController({ + run: undefined, + }); + const res = await controller.stopRun({ runId: 'gone' }, user, workspace); + expect(res).toEqual({ stopped: false }); + expect(aiChatRunService.requestStop).not.toHaveBeenCalled(); + }); + + it('stops by chatId: owner-gates, resolves the active run, requests the stop', async () => { + const { controller, aiChatRunService, aiChatRepo } = makeController({ + chat: { id: 'c1', creatorId: 'u1' }, + activeRun: { id: 'run-9' }, + stopped: true, + }); + const res = await controller.stopRun({ chatId: 'c1' }, user, workspace); + expect(res).toEqual({ stopped: true }); + expect(aiChatRepo.findById).toHaveBeenCalledWith('c1', 'ws1'); + expect(aiChatRunService.getActiveForChat).toHaveBeenCalledWith( + 'c1', + 'ws1', + ); + expect(aiChatRunService.requestStop).toHaveBeenCalledWith('run-9', 'ws1'); + }); + + it('stops by chatId: reports { stopped: false } when no run is active', async () => { + const { controller, aiChatRunService } = makeController({ + chat: { id: 'c1', creatorId: 'u1' }, + activeRun: undefined, + }); + const res = await controller.stopRun({ chatId: 'c1' }, user, workspace); + expect(res).toEqual({ stopped: false }); + expect(aiChatRunService.requestStop).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts b/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts index 77e9d3c4..602773d1 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts @@ -1,5 +1,7 @@ import { Logger } from '@nestjs/common'; -import { AiChatService } from './ai-chat.service'; +import { AiChatService, AiChatRunHooks } from './ai-chat.service'; +import { AiChatRunService } from './ai-chat-run.service'; +import type { User, Workspace } from '@docmost/db/types/entity.types'; /** * Lifecycle unit tests for AiChatService.onModuleInit (#183 crash-recovery @@ -59,3 +61,97 @@ describe('AiChatService.onModuleInit (startup sweep)', () => { expect(String(warnSpy.mock.calls[0][0])).toContain('db unavailable'); }); }); + +/** + * #184 CRITICAL run-lifecycle safety net (review fix). A transient failure + * AFTER a successful beginRun but BEFORE streamText's terminal callbacks own the + * lifecycle must STILL settle the run — otherwise the run row is stuck 'running' + * forever (sweepRunning only runs at startup) and the partial unique index + the + * controller pre-check 409 every future turn in that chat until a restart. Here + * we model the very first bare await after beginRun (the user-message insert) + * throwing, wiring the run hooks to a REAL AiChatRunService (mock repo) exactly + * as the controller does, and assert the run is settled to 'error' and its + * in-memory entry dropped (so a follow-up turn would NOT be 409'd). + */ +describe('AiChatService.stream run-lifecycle safety net (#184)', () => { + const user = { id: 'u1' } as User; + const workspace = { id: 'ws1' } as Workspace; + + afterEach(() => jest.restoreAllMocks()); + + it('an exception after beginRun settles the run to error and drops the in-memory entry', async () => { + jest.spyOn(Logger.prototype, 'error').mockImplementation(() => undefined); + + // Real run service over a mock repo, so finalizeRun's in-memory bookkeeping + // (active.delete) is exercised for real. + const runRepo = { + insert: jest.fn().mockResolvedValue({ id: 'run-1', status: 'running' }), + update: jest.fn().mockResolvedValue({ id: 'run-1' }), + }; + const runService = new AiChatRunService(runRepo as never); + + // The user-message insert (the first bare await after beginRun) throws. + const aiChatMessageRepo = { + insert: jest.fn().mockRejectedValue(new Error('insert boom')), + }; + const aiChatRepo = { + // Existing chat -> chatId stays, no new-chat insert path. + findById: jest.fn().mockResolvedValue({ id: 'chat-1', creatorId: 'u1' }), + }; + + const service = new AiChatService( + {} as never, // ai + aiChatRepo as never, + aiChatMessageRepo as never, + {} as never, // aiSettings + {} as never, // tools + {} as never, // mcpClients + {} as never, // aiAgentRoleRepo + {} as never, // pageRepo + {} as never, // pageAccess + ); + + const runHooks: AiChatRunHooks = { + begin: (chatId) => + runService.beginRun({ + chatId, + workspaceId: workspace.id, + userId: user.id, + trigger: 'user', + }), + onSettled: (runId, status, error) => + runService.finalizeRun(runId, workspace.id, status, error), + }; + + await expect( + service.stream({ + user, + workspace, + sessionId: 'sess', + body: { + chatId: 'chat-1', + messages: [ + { id: 'm', role: 'user', parts: [{ type: 'text', text: 'hi' }] }, + ], + }, + res: {} as never, + signal: new AbortController().signal, + model: {} as never, + role: null, + runHooks, + }), + ).rejects.toThrow('insert boom'); + + // The run was begun... + expect(runRepo.insert).toHaveBeenCalledTimes(1); + // ...then settled to a terminal FAILED status by the safety net... + expect(runRepo.update).toHaveBeenCalledTimes(1); + expect(runRepo.update).toHaveBeenCalledWith( + 'run-1', + 'ws1', + expect.objectContaining({ status: 'failed' }), + ); + // ...and the in-memory entry is gone, so a follow-up turn is NOT 409'd. + expect(runService.isLocallyActive('run-1')).toBe(false); + }); +}); diff --git a/apps/server/src/core/ai-chat/ai-chat.service.ts b/apps/server/src/core/ai-chat/ai-chat.service.ts index 4ffb1cef..66a7d924 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.ts @@ -154,9 +154,7 @@ export interface AiChatRunHooks { // Called once the chat id is resolved; returns the run handle whose `signal` // drives the agent loop's abort. Returning null disables run tracking (the // turn falls back to the passed-in socket signal). - begin( - chatId: string, - ): Promise<{ runId: string; signal: AbortSignal } | null>; + begin(chatId: string): Promise<{ runId: string; signal: AbortSignal } | null>; onAssistantSeeded?( runId: string, assistantMessageId: string, @@ -418,531 +416,571 @@ export class AiChatService implements OnModuleInit { } } - // Extract the incoming user turn (the last user message from useChat). - const incoming = lastUserMessage(body.messages); - const incomingText = uiMessageText(incoming); - - // Persist the user message before contacting the model. - await this.aiChatMessageRepo.insert({ - chatId, - workspaceId: workspace.id, - userId: user.id, - role: 'user', - content: incomingText, - // jsonb column: UIMessage parts are JSON-serializable at runtime but not - // structurally `JsonValue`, so cast through unknown. - metadata: (incoming?.parts ? { parts: incoming.parts } : null) as never, - }); - - // Rebuild the conversation from persisted history (not the client payload), - // so the model always sees the authoritative server-side transcript. Load - // the FULL history in chronological order (oldest -> newest, incl. the user - // message just inserted above) so NO turns are dropped — there is no - // recent-tail window anymore. `findAllByChat` keeps a 5000-row memory-safety - // backstop (on overflow it keeps the NEWEST rows and logs a warning); that - // is a safety net far above any realistic chat, not a conversational limit. - const history = await this.aiChatMessageRepo.findAllByChat( - chatId, - workspace.id, - ); - const uiMessages = history.map(rowToUiMessage); - // convertToModelMessages is async in ai@6.0.134 (returns Promise). - const messages = await convertToModelMessages(uiMessages); - - // Interrupt-resume detection (#198): the client "send now" flag is only a - // hint — confirm it against the persisted history (the preceding assistant - // turn must really be aborted/streaming) so a spoofed flag cannot inject the - // interrupt note onto an ordinary turn. The partial output the model needs is - // already in `messages` (the aborted assistant row replays via findRecent). - const interrupted = isInterruptResume(history, body.interrupted); - - // The model is resolved by the controller before hijack (clean 503 path). - // Here we only need the admin-configured system prompt. - const resolved = await this.aiSettings.resolve(workspace.id); - - // Build the external MCP toolset FIRST so the system prompt can carry each - // connected server's admin-authored guidance (#180). Merge in admin- - // configured external MCP tools (web search, etc.; §6.8). A down/slow - // external server never crashes the turn — toolsFor skips it and records the - // outcome. The returned client handles MUST be closed in the streamText - // lifecycle (onFinish/onError/onAbort) — leaking them is a bug. Docmost - // tools take precedence on a name clash (external are namespaced, so a clash - // is not expected; the spread order makes intent explicit). - let external: Awaited> = { - tools: {}, - clients: [], - outcomes: [], - instructions: [], - }; + // #184 RUN-LIFECYCLE SAFETY NET (review fix). Everything from here until + // streamText's terminal callbacks (onFinish/onError/onAbort) take ownership of + // the run lifecycle runs under this try. Between a successful beginRun (run row + // 'running' + an in-memory AbortController) and streamText attaching there are + // bare awaits — the user-message insert, findAllByChat, convertToModelMessages, + // aiSettings.resolve — plus the buildSystemPrompt/forUser block and the + // synchronous streamText wiring, NONE of which finalize the run on failure (the + // inner catches only release MCP clients and rethrow). Without this net any + // transient failure there would leave ai_chat_runs stuck 'running' FOREVER + // (sweepRunning only runs at server startup): the partial unique index + the + // controller pre-check would then 409 every future turn in this chat until a + // restart, and an observer tab would poll forever. The catch settles the run to + // 'error' (which deletes the in-memory entry and writes a terminal row) before + // rethrowing to the controller. finalizeRun (via onSettled) is idempotent, so + // if streamText DID attach and a terminal callback also settles, exactly one + // terminal write wins — never a double-settle. try { - external = await this.mcpClients.toolsFor(workspace.id); - } catch (err) { - // Building the external toolset must never break the turn; proceed with - // Docmost-only tools. Never log URLs/headers — short message only. - this.logger.warn( - `External MCP toolset unavailable: ${ - err instanceof Error ? err.message : 'unknown error' - }`, - ); - } + // Extract the incoming user turn (the last user message from useChat). + const incoming = lastUserMessage(body.messages); + const incomingText = uiMessageText(incoming); - // Close every external client EXACTLY ONCE across the turn's terminal - // callbacks (onFinish/onError/onAbort all fire at most once collectively, - // but guard anyway). DEFINED HERE — before the prompt/toolset are built — so - // that if buildSystemPrompt or forUser throws AFTER the external lease was - // taken (toolsFor above), the lease is still released. Otherwise its refCount - // stays >= 1 forever and the external undici sockets leak until restart - // (#180 reorder moved toolsFor ahead of these; #185 review). Close errors are - // swallowed so they never break the response. - let clientsClosed = false; - const closeExternalClients = async (): Promise => { - if (clientsClosed) return; - clientsClosed = true; - await Promise.all( - external.clients.map((c) => - c.close().catch((closeErr) => { - this.logger.warn( - `Failed to close external MCP client: ${ - closeErr instanceof Error ? closeErr.message : 'unknown error' - }`, - ); - }), - ), - ); - }; - - // Build the system prompt + Docmost toolset. If either throws after the - // external MCP lease was taken above, release the lease before rethrowing so - // the leased transports are not leaked (#185 review). - let system: string; - let docmostTools: Awaited>; - try { - system = buildSystemPrompt({ - workspace, - adminPrompt: resolved?.systemPrompt, - // The role (pre-resolved by the controller) REPLACES the persona layer; - // the safety framework is still appended by buildSystemPrompt. - roleInstructions: role?.instructions, - // Server-validated open page (authoritative title), not the client value. - openedPage: openPageContext, - // Guidance only for servers that connected and yielded ≥1 callable tool. - mcpInstructions: external.instructions, - // History-confirmed interrupt-resume flag (#198): adds the interrupt note - // so the model treats the partial answer above as cut off, not finished. - interrupted, - }); - - // Pass the resolved chatId so the write tools can mint provenance tokens - // (access + collab) carrying { actor:'agent', aiChatId: chatId }, making - // agent REST/collab writes attributable and non-spoofable (§6.5/§6.6). - docmostTools = await this.tools.forUser( - user, - sessionId, - workspace.id, - chatId, - // Same server-validated open page used by the system prompt above; - // exposed to the model via getCurrentPage so page identity (and the - // AUTHORITATIVE title) survives prompt mangling / client title spoofing. - openPageContext, - ); - } catch (err) { - await closeExternalClients(); - throw err; - } - - const tools = { ...external.tools, ...docmostTools }; - - // Accumulate the turn's streamed output so a provider error / disconnect can - // persist the PARTIAL answer the user already saw — the SDK's onError/onAbort - // callbacks don't hand us the in-progress text. `capturedSteps` holds finished - // steps (tool calls + their text); `inProgressText` holds the text streamed in - // the CURRENT, not-yet-finished step, reset whenever a step finishes. - const capturedSteps: StepLike[] = []; - let inProgressText = ''; - - // Step-granular durability (#183): create the assistant row UPFRONT in the - // 'streaming' state (before any token), then UPDATE it as each step finishes - // and finalize it once on the terminal callback. If the process dies - // mid-turn the row survives with every finished step already persisted; the - // startup sweep (sweepStreaming) later flips a dangling 'streaming' row to - // 'aborted'. The DB is now the single source of truth for the turn — the - // socket is never required for the write path. A failed upfront insert is - // logged and leaves assistantId undefined; the per-step/terminal updates then - // no-op (guarded below) so the turn still streams to the user. - let assistantId: string | undefined; - try { - const seed = flushAssistant([], '', 'streaming'); - const seeded = await this.aiChatMessageRepo.insert({ + // Persist the user message before contacting the model. + await this.aiChatMessageRepo.insert({ chatId, workspaceId: workspace.id, userId: user.id, - role: 'assistant', - content: seed.content, - // jsonb columns: cast through never (same as the user insert above). - toolCalls: (seed.toolCalls ?? null) as never, - metadata: seed.metadata as never, - status: seed.status, + role: 'user', + content: incomingText, + // jsonb column: UIMessage parts are JSON-serializable at runtime but not + // structurally `JsonValue`, so cast through unknown. + metadata: (incoming?.parts ? { parts: incoming.parts } : null) as never, }); - assistantId = seeded?.id; - } catch (err) { - this.logger.error( - `Failed to insert upfront assistant row (chat ${chatId}, workspace ${workspace.id})`, - err as Error, + + // Rebuild the conversation from persisted history (not the client payload), + // so the model always sees the authoritative server-side transcript. Load + // the FULL history in chronological order (oldest -> newest, incl. the user + // message just inserted above) so NO turns are dropped — there is no + // recent-tail window anymore. `findAllByChat` keeps a 5000-row memory-safety + // backstop (on overflow it keeps the NEWEST rows and logs a warning); that + // is a safety net far above any realistic chat, not a conversational limit. + const history = await this.aiChatMessageRepo.findAllByChat( + chatId, + workspace.id, ); - } + const uiMessages = history.map(rowToUiMessage); + // convertToModelMessages is async in ai@6.0.134 (returns Promise). + const messages = await convertToModelMessages(uiMessages); - // Link the assistant message (the #183 projection) to its run (#184), so a - // reconnecting client can resolve the run's output. Best-effort. - if (runId && assistantId) { + // Interrupt-resume detection (#198): the client "send now" flag is only a + // hint — confirm it against the persisted history (the preceding assistant + // turn must really be aborted/streaming) so a spoofed flag cannot inject the + // interrupt note onto an ordinary turn. The partial output the model needs is + // already in `messages` (the aborted assistant row replays via findRecent). + const interrupted = isInterruptResume(history, body.interrupted); + + // The model is resolved by the controller before hijack (clean 503 path). + // Here we only need the admin-configured system prompt. + const resolved = await this.aiSettings.resolve(workspace.id); + + // Build the external MCP toolset FIRST so the system prompt can carry each + // connected server's admin-authored guidance (#180). Merge in admin- + // configured external MCP tools (web search, etc.; §6.8). A down/slow + // external server never crashes the turn — toolsFor skips it and records the + // outcome. The returned client handles MUST be closed in the streamText + // lifecycle (onFinish/onError/onAbort) — leaking them is a bug. Docmost + // tools take precedence on a name clash (external are namespaced, so a clash + // is not expected; the spread order makes intent explicit). + let external: Awaited> = { + tools: {}, + clients: [], + outcomes: [], + instructions: [], + }; try { - await runHooks?.onAssistantSeeded?.(runId, assistantId); + external = await this.mcpClients.toolsFor(workspace.id); } catch (err) { + // Building the external toolset must never break the turn; proceed with + // Docmost-only tools. Never log URLs/headers — short message only. this.logger.warn( - `Failed to link assistant row to run ${runId}: ${ + `External MCP toolset unavailable: ${ err instanceof Error ? err.message : 'unknown error' }`, ); } - } - // Per-step (non-terminal) update: persist the finished steps the moment a - // step ends. Tolerant — a failed update is logged and swallowed so it never - // throws into the stream. Keeps status 'streaming'. - const updateStreaming = async (): Promise => { - if (!assistantId) return; - // Cheap short-circuit once the turn is finalized (see `finalized` below). - // The AUTHORITATIVE guard is `onlyIfStreaming` on the UPDATE: a late - // fire-and-forget step update could still be in flight on another pool - // connection when finalize runs, so the SQL `WHERE status='streaming'` - // (not this flag) is what prevents it clobbering the terminal row. - if (finalized) return; + // Close every external client EXACTLY ONCE across the turn's terminal + // callbacks (onFinish/onError/onAbort all fire at most once collectively, + // but guard anyway). DEFINED HERE — before the prompt/toolset are built — so + // that if buildSystemPrompt or forUser throws AFTER the external lease was + // taken (toolsFor above), the lease is still released. Otherwise its refCount + // stays >= 1 forever and the external undici sockets leak until restart + // (#180 reorder moved toolsFor ahead of these; #185 review). Close errors are + // swallowed so they never break the response. + let clientsClosed = false; + const closeExternalClients = async (): Promise => { + if (clientsClosed) return; + clientsClosed = true; + await Promise.all( + external.clients.map((c) => + c.close().catch((closeErr) => { + this.logger.warn( + `Failed to close external MCP client: ${ + closeErr instanceof Error ? closeErr.message : 'unknown error' + }`, + ); + }), + ), + ); + }; + + // Build the system prompt + Docmost toolset. If either throws after the + // external MCP lease was taken above, release the lease before rethrowing so + // the leased transports are not leaked (#185 review). + let system: string; + let docmostTools: Awaited>; try { - await this.aiChatMessageRepo.update( - assistantId, + system = buildSystemPrompt({ + workspace, + adminPrompt: resolved?.systemPrompt, + // The role (pre-resolved by the controller) REPLACES the persona layer; + // the safety framework is still appended by buildSystemPrompt. + roleInstructions: role?.instructions, + // Server-validated open page (authoritative title), not the client value. + openedPage: openPageContext, + // Guidance only for servers that connected and yielded ≥1 callable tool. + mcpInstructions: external.instructions, + // History-confirmed interrupt-resume flag (#198): adds the interrupt note + // so the model treats the partial answer above as cut off, not finished. + interrupted, + }); + + // Pass the resolved chatId so the write tools can mint provenance tokens + // (access + collab) carrying { actor:'agent', aiChatId: chatId }, making + // agent REST/collab writes attributable and non-spoofable (§6.5/§6.6). + docmostTools = await this.tools.forUser( + user, + sessionId, workspace.id, - flushAssistant(capturedSteps, '', 'streaming'), - { onlyIfStreaming: true }, + chatId, + // Same server-validated open page used by the system prompt above; + // exposed to the model via getCurrentPage so page identity (and the + // AUTHORITATIVE title) survives prompt mangling / client title spoofing. + openPageContext, ); } catch (err) { - this.logger.warn( - `Failed to update streaming assistant row: ${ - err instanceof Error ? err.message : 'unknown error' - }`, - ); + await closeExternalClients(); + throw err; } - }; - // Serialize the per-step updates (#183 review): onStepFinish fires them - // without await, so two could otherwise commit out of order on different pool - // connections (step N landing after N+1). Chaining each onto the previous - // keeps the persisted row monotonic with step order; each link short-circuits - // on `finalized`, so a tail of late updates is cheap. - let stepUpdateChain: Promise = Promise.resolve(); + const tools = { ...external.tools, ...docmostTools }; - // Terminal finalize: write the completed/error/aborted row exactly once - // across the (mutually-exclusive, at-most-once) onFinish/onError/onAbort - // callbacks — mirroring the pre-#183 persist-at-most-once guard for the - // TERMINAL status (the row may be updated many times with 'streaming' before - // this fires once). - let finalized = false; - const finalizeAssistant = async ( - flushed: AssistantFlush, - ): Promise => { - if (finalized) return; - finalized = true; - const plan = planFinalizeAssistant(assistantId); + // Accumulate the turn's streamed output so a provider error / disconnect can + // persist the PARTIAL answer the user already saw — the SDK's onError/onAbort + // callbacks don't hand us the in-progress text. `capturedSteps` holds finished + // steps (tool calls + their text); `inProgressText` holds the text streamed in + // the CURRENT, not-yet-finished step, reset whenever a step finishes. + const capturedSteps: StepLike[] = []; + let inProgressText = ''; + + // Step-granular durability (#183): create the assistant row UPFRONT in the + // 'streaming' state (before any token), then UPDATE it as each step finishes + // and finalize it once on the terminal callback. If the process dies + // mid-turn the row survives with every finished step already persisted; the + // startup sweep (sweepStreaming) later flips a dangling 'streaming' row to + // 'aborted'. The DB is now the single source of truth for the turn — the + // socket is never required for the write path. A failed upfront insert is + // logged and leaves assistantId undefined; the per-step/terminal updates then + // no-op (guarded below) so the turn still streams to the user. + let assistantId: string | undefined; try { - // Shared dispatch (see applyFinalize): UPDATE the upfront row, or — when - // the upfront insert failed (kind 'insert') — INSERT the terminal row as - // the only safety against losing the turn entirely. - await applyFinalize( - this.aiChatMessageRepo, - plan, - { chatId, workspaceId: workspace.id, userId: user.id }, - flushed, - ); + const seed = flushAssistant([], '', 'streaming'); + const seeded = await this.aiChatMessageRepo.insert({ + chatId, + workspaceId: workspace.id, + userId: user.id, + role: 'assistant', + content: seed.content, + // jsonb columns: cast through never (same as the user insert above). + toolCalls: (seed.toolCalls ?? null) as never, + metadata: seed.metadata as never, + status: seed.status, + }); + assistantId = seeded?.id; } catch (err) { this.logger.error( - `Failed to finalize assistant message (kind=${plan.kind})`, + `Failed to insert upfront assistant row (chat ${chatId}, workspace ${workspace.id})`, err as Error, ); } - }; - // DIAGNOSTIC (Safari stream-drop investigation) — temporary. Measure - // first-chunk latency, the model-silent gap right before a disconnect, and - // how many SSE heartbeats were written, so a Safari drop can be classified - // (idle-gap vs hard wall-clock cap vs slow first chunk). - const streamStartedAt = Date.now(); - let firstModelChunkAt: number | undefined; - let lastModelChunkAt = streamStartedAt; - let heartbeatsSent = 0; - - // NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous - // failure here (or in pipe below) would skip the terminal callbacks, so the - // catch releases the leased external clients to avoid a connection leak. - let result: ReturnType; - try { - result = streamText({ - model, - system, - messages, - tools, - // No maxOutputTokens cap on the agent: tool-call arguments (e.g. a full - // page body for the write tools) are emitted as OUTPUT tokens, so a fixed - // cap would truncate complex tool calls mid-argument. Let the model use its - // natural per-step budget. (Cost/credit limits are an account concern, not - // something to enforce by silently breaking the agent.) - stopWhen: stepCountIs(MAX_AGENT_STEPS), - // Forced finalization: reserve the LAST allowed step for a text-only - // answer. Without this, a turn that spends all its steps on tool calls - // ends with no assistant text (an empty turn). prepareAgentStep forbids - // further tool calls and appends a synthesis instruction on that step, - // concatenated onto the original `system` so the persona is preserved. - prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system), - // #184: the RUN's signal (explicit-stop) when a run wraps this turn, else - // the socket-bound signal (legacy). A browser disconnect aborts only in - // the legacy path. - abortSignal: effectiveSignal, - onChunk: ({ chunk }) => { - // DIAGNOSTIC (Safari stream-drop investigation) — temporary. Any model - // output chunk means the stream is actively emitting bytes; track first - // + most-recent activity timestamps. - const now = Date.now(); - firstModelChunkAt ??= now; - lastModelChunkAt = now; - // 'text-delta' is the assistant's prose; tool-call args are separate chunk - // types — so this mirrors exactly what streams to the client. - if (chunk.type === 'text-delta') inProgressText += chunk.text; - }, - onStepFinish: (step) => { - // The finished step's full text is now in `step.text`; fold it in and reset - // the in-progress accumulator for the next step. - capturedSteps.push(step as StepLike); - inProgressText = ''; - // Step-granular durability (#183): persist this finished step (its text + - // tool calls + tool RESULTS) the moment it ends, so a process death after - // this point still recovers the step. Not awaited here (never block the - // stream), but SERIALIZED via stepUpdateChain so the writes commit in - // step order; updateStreaming is error-tolerant (logs + swallows). - stepUpdateChain = stepUpdateChain.then(() => updateStreaming()); - // #184: persist the run's progress (finished-step count). Fire-and- - // forget; the hook swallows its own errors. - if (runId) runHooks?.onStep?.(runId, capturedSteps.length); - }, - onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => { - // DIAGNOSTIC (Safari stream-drop investigation) — temporary: success - // baseline for Safari comparison. - const diagNow = Date.now(); - this.logger.log( - `AI chat stream DIAGNOSTIC (finish): elapsed=${diagNow - streamStartedAt}ms ` + - `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + - `heartbeatsSent=${heartbeatsSent} steps=${steps.length}`, + // Link the assistant message (the #183 projection) to its run (#184), so a + // reconnecting client can resolve the run's output. Best-effort. + if (runId && assistantId) { + try { + await runHooks?.onAssistantSeeded?.(runId, assistantId); + } catch (err) { + this.logger.warn( + `Failed to link assistant row to run ${runId}: ${ + err instanceof Error ? err.message : 'unknown error' + }`, ); - // Finalize the assistant row (#183): the upfront 'streaming' row is - // UPDATEd to 'completed' with the turn's final text, cumulative usage and - // full UIMessage parts. We pass the SDK `steps` (which carry the final - // step's text) as the captured steps so metadata.parts matches the - // pre-#183 onFinish record exactly; `inProgressText` is '' here (the last - // step already finished). Final-step usage (usage.input+output) ≈ the - // conversation's CURRENT context size, distinct from totalUsage. - // - // COLUMN-SEMANTICS NOTE (#183): `content` is built by flushAssistant as - // the CONCATENATION of every step's text (stepsText), whereas pre-#183 - // it stored only the FINAL step's text. This is a deliberate, harmless - // change: the UI and the Markdown export render from `metadata.parts` - // (per-step text + tool parts), not from `content`; `content` is the - // plain-text projection (full-text search / fallback). A multi-step - // turn's `content` therefore now holds all steps' prose, not just the - // last block. - await finalizeAssistant( - flushAssistant(steps as StepLike[], '', 'completed', { - finishReason: finishReason as string, - usage: totalUsage as StreamUsage, - contextTokens: - (usage?.inputTokens ?? 0) + (usage?.outputTokens ?? 0) || - undefined, - // Max context window for the chat header badge denominator; - // resolved from the admin-configured provider settings (in - // closure scope here). Omitted/0 = no limit. - maxContextTokens: resolved?.chatContextWindow, - }), - ); - // #184: settle the RUN as succeeded (best-effort, after the projection - // is finalized above). - if (runId) await runHooks?.onSettled?.(runId, 'completed'); - // Lifecycle: release the external MCP clients leased for this turn. - await closeExternalClients(); + } + } - // Generate the chat title for a freshly created chat AFTER the stream's - // provider call has completed — NOT concurrently with it. The z.ai coding - // endpoint stalls one of two concurrent requests to the same plan, which - // black-holed the chat stream (~300s headers timeout) when title - // generation raced it. Running it here (solo, fire-and-forget) avoids the - // race; never block the turn on it, swallow any error. - if (isNewChat && incomingText) { - void this.generateTitle(chatId, workspace.id, incomingText).catch( - (err) => { - this.logger.warn( - `Title generation failed: ${(err as Error)?.message ?? err}`, - ); - }, + // Per-step (non-terminal) update: persist the finished steps the moment a + // step ends. Tolerant — a failed update is logged and swallowed so it never + // throws into the stream. Keeps status 'streaming'. + const updateStreaming = async (): Promise => { + if (!assistantId) return; + // Cheap short-circuit once the turn is finalized (see `finalized` below). + // The AUTHORITATIVE guard is `onlyIfStreaming` on the UPDATE: a late + // fire-and-forget step update could still be in flight on another pool + // connection when finalize runs, so the SQL `WHERE status='streaming'` + // (not this flag) is what prevents it clobbering the terminal row. + if (finalized) return; + try { + await this.aiChatMessageRepo.update( + assistantId, + workspace.id, + flushAssistant(capturedSteps, '', 'streaming'), + { onlyIfStreaming: true }, + ); + } catch (err) { + this.logger.warn( + `Failed to update streaming assistant row: ${ + err instanceof Error ? err.message : 'unknown error' + }`, + ); + } + }; + + // Serialize the per-step updates (#183 review): onStepFinish fires them + // without await, so two could otherwise commit out of order on different pool + // connections (step N landing after N+1). Chaining each onto the previous + // keeps the persisted row monotonic with step order; each link short-circuits + // on `finalized`, so a tail of late updates is cheap. + let stepUpdateChain: Promise = Promise.resolve(); + + // Terminal finalize: write the completed/error/aborted row exactly once + // across the (mutually-exclusive, at-most-once) onFinish/onError/onAbort + // callbacks — mirroring the pre-#183 persist-at-most-once guard for the + // TERMINAL status (the row may be updated many times with 'streaming' before + // this fires once). + let finalized = false; + const finalizeAssistant = async ( + flushed: AssistantFlush, + ): Promise => { + if (finalized) return; + finalized = true; + const plan = planFinalizeAssistant(assistantId); + try { + // Shared dispatch (see applyFinalize): UPDATE the upfront row, or — when + // the upfront insert failed (kind 'insert') — INSERT the terminal row as + // the only safety against losing the turn entirely. + await applyFinalize( + this.aiChatMessageRepo, + plan, + { chatId, workspaceId: workspace.id, userId: user.id }, + flushed, + ); + } catch (err) { + this.logger.error( + `Failed to finalize assistant message (kind=${plan.kind})`, + err as Error, + ); + } + }; + + // DIAGNOSTIC (Safari stream-drop investigation) — temporary. Measure + // first-chunk latency, the model-silent gap right before a disconnect, and + // how many SSE heartbeats were written, so a Safari drop can be classified + // (idle-gap vs hard wall-clock cap vs slow first chunk). + const streamStartedAt = Date.now(); + let firstModelChunkAt: number | undefined; + let lastModelChunkAt = streamStartedAt; + let heartbeatsSent = 0; + + // NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous + // failure here (or in pipe below) would skip the terminal callbacks, so the + // catch releases the leased external clients to avoid a connection leak. + let result: ReturnType; + try { + result = streamText({ + model, + system, + messages, + tools, + // No maxOutputTokens cap on the agent: tool-call arguments (e.g. a full + // page body for the write tools) are emitted as OUTPUT tokens, so a fixed + // cap would truncate complex tool calls mid-argument. Let the model use its + // natural per-step budget. (Cost/credit limits are an account concern, not + // something to enforce by silently breaking the agent.) + stopWhen: stepCountIs(MAX_AGENT_STEPS), + // Forced finalization: reserve the LAST allowed step for a text-only + // answer. Without this, a turn that spends all its steps on tool calls + // ends with no assistant text (an empty turn). prepareAgentStep forbids + // further tool calls and appends a synthesis instruction on that step, + // concatenated onto the original `system` so the persona is preserved. + prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system), + // #184: the RUN's signal (explicit-stop) when a run wraps this turn, else + // the socket-bound signal (legacy). A browser disconnect aborts only in + // the legacy path. + abortSignal: effectiveSignal, + onChunk: ({ chunk }) => { + // DIAGNOSTIC (Safari stream-drop investigation) — temporary. Any model + // output chunk means the stream is actively emitting bytes; track first + // + most-recent activity timestamps. + const now = Date.now(); + firstModelChunkAt ??= now; + lastModelChunkAt = now; + // 'text-delta' is the assistant's prose; tool-call args are separate chunk + // types — so this mirrors exactly what streams to the client. + if (chunk.type === 'text-delta') inProgressText += chunk.text; + }, + onStepFinish: (step) => { + // The finished step's full text is now in `step.text`; fold it in and reset + // the in-progress accumulator for the next step. + capturedSteps.push(step as StepLike); + inProgressText = ''; + // Step-granular durability (#183): persist this finished step (its text + + // tool calls + tool RESULTS) the moment it ends, so a process death after + // this point still recovers the step. Not awaited here (never block the + // stream), but SERIALIZED via stepUpdateChain so the writes commit in + // step order; updateStreaming is error-tolerant (logs + swallows). + stepUpdateChain = stepUpdateChain.then(() => updateStreaming()); + // #184: persist the run's progress (finished-step count). Fire-and- + // forget; the hook swallows its own errors. + if (runId) runHooks?.onStep?.(runId, capturedSteps.length); + }, + onFinish: async ({ + text, + finishReason, + totalUsage, + usage, + steps, + }) => { + // DIAGNOSTIC (Safari stream-drop investigation) — temporary: success + // baseline for Safari comparison. + const diagNow = Date.now(); + this.logger.log( + `AI chat stream DIAGNOSTIC (finish): elapsed=${diagNow - streamStartedAt}ms ` + + `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + + `heartbeatsSent=${heartbeatsSent} steps=${steps.length}`, ); - } - }, - onError: async ({ error }) => { - // NestJS Logger.error(message, stack?, context?): pass the real message - // (with statusCode when present) + the stack string, not the Error - // object, so the actual provider cause is clearly logged. Reuse the - // shared formatter so provider error formatting stays unified. - const e = error as { stack?: string }; - const errorText = describeProviderError(error, String(error)); - this.logger.error(`AI chat stream error: ${errorText}`, e?.stack); - // DIAGNOSTIC (Safari stream-drop investigation) — temporary: timing of - // an error-terminated stream. - const diagNow = Date.now(); - this.logger.warn( - `AI chat stream DIAGNOSTIC (error): elapsed=${diagNow - streamStartedAt}ms ` + - `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + - `silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent}`, - ); - // Finalize the PARTIAL answer streamed before the failure (text + any - // finished tool steps) WITH the error in metadata, so the turn shows what - // the user already saw plus the cause — not just a bare error. Status - // 'error' (#183). - await finalizeAssistant( - flushAssistant(capturedSteps, inProgressText, 'error', { - error: errorText, - }), - ); - // #184: settle the RUN as failed, carrying the provider/transport cause. - if (runId) await runHooks?.onSettled?.(runId, 'error', errorText); - await closeExternalClients(); - }, - onAbort: async ({ steps }) => { - const partialChars = - capturedSteps.reduce((n, s) => n + (s.text?.length ?? 0), 0) + - inProgressText.length; - // Unlike onError/onFinish, this terminal path otherwise writes nothing, so - // an aborted turn (client disconnect / proxy drop / stop()) would be - // invisible in the logs. Log it (warn) so the abort is traceable. - this.logger.warn( - `AI chat stream aborted (chat ${chatId}) after ${steps.length} ` + - `step(s), ${partialChars} chars partial text; persisting partial turn.`, - ); - // DIAGNOSTIC (Safari stream-drop investigation) — temporary: THE key - // line — classifies the Safari drop. - const diagNow = Date.now(); - this.logger.warn( - `AI chat stream DIAGNOSTIC (abort/disconnect): elapsed=${diagNow - streamStartedAt}ms ` + - `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + - `silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent} ` + - `steps=${steps.length}`, - ); - await finalizeAssistant( - flushAssistant(capturedSteps, inProgressText, 'aborted'), - ); - // #184: settle the RUN as aborted (an explicit user stop reached the - // run's signal; a disconnect does not abort a run-wrapped turn). - if (runId) await runHooks?.onSettled?.(runId, 'aborted'); - await closeExternalClients(); - }, - }); + // Finalize the assistant row (#183): the upfront 'streaming' row is + // UPDATEd to 'completed' with the turn's final text, cumulative usage and + // full UIMessage parts. We pass the SDK `steps` (which carry the final + // step's text) as the captured steps so metadata.parts matches the + // pre-#183 onFinish record exactly; `inProgressText` is '' here (the last + // step already finished). Final-step usage (usage.input+output) ≈ the + // conversation's CURRENT context size, distinct from totalUsage. + // + // COLUMN-SEMANTICS NOTE (#183): `content` is built by flushAssistant as + // the CONCATENATION of every step's text (stepsText), whereas pre-#183 + // it stored only the FINAL step's text. This is a deliberate, harmless + // change: the UI and the Markdown export render from `metadata.parts` + // (per-step text + tool parts), not from `content`; `content` is the + // plain-text projection (full-text search / fallback). A multi-step + // turn's `content` therefore now holds all steps' prose, not just the + // last block. + await finalizeAssistant( + flushAssistant(steps as StepLike[], '', 'completed', { + finishReason: finishReason as string, + usage: totalUsage as StreamUsage, + contextTokens: + (usage?.inputTokens ?? 0) + (usage?.outputTokens ?? 0) || + undefined, + // Max context window for the chat header badge denominator; + // resolved from the admin-configured provider settings (in + // closure scope here). Omitted/0 = no limit. + maxContextTokens: resolved?.chatContextWindow, + }), + ); + // #184: settle the RUN as succeeded (best-effort, after the projection + // is finalized above). + if (runId) await runHooks?.onSettled?.(runId, 'completed'); + // Lifecycle: release the external MCP clients leased for this turn. + await closeExternalClients(); - // Drain the stream independently of the client socket so the turn always - // runs to completion (or to its abort) and the terminal callbacks - // (onFinish/onError/onAbort) fire — releasing the per-turn object graph - // (history, the per-request toolset closures, captured steps, SDK buffers) - // and closing leased MCP clients. WITHOUT this, a client disconnect leaves - // the pipe's dead socket as the only reader; backpressure stalls the stream, - // the callbacks never run, and every dropped turn stays rooted in memory — - // the heap-OOM leak. consumeStream removes that backpressure (AI SDK v6 - // "Handling client disconnects"). NOT awaited (fire-and-forget); the stream - // errors are already logged by the streamText `onError` callback above, so - // swallow here to avoid an unhandledRejection. - void result.consumeStream({ onError: () => undefined }); + // Generate the chat title for a freshly created chat AFTER the stream's + // provider call has completed — NOT concurrently with it. The z.ai coding + // endpoint stalls one of two concurrent requests to the same plan, which + // black-holed the chat stream (~300s headers timeout) when title + // generation raced it. Running it here (solo, fire-and-forget) avoids the + // race; never block the turn on it, swallow any error. + if (isNewChat && incomingText) { + void this.generateTitle(chatId, workspace.id, incomingText).catch( + (err) => { + this.logger.warn( + `Title generation failed: ${(err as Error)?.message ?? err}`, + ); + }, + ); + } + }, + onError: async ({ error }) => { + // NestJS Logger.error(message, stack?, context?): pass the real message + // (with statusCode when present) + the stack string, not the Error + // object, so the actual provider cause is clearly logged. Reuse the + // shared formatter so provider error formatting stays unified. + const e = error as { stack?: string }; + const errorText = describeProviderError(error, String(error)); + this.logger.error(`AI chat stream error: ${errorText}`, e?.stack); + // DIAGNOSTIC (Safari stream-drop investigation) — temporary: timing of + // an error-terminated stream. + const diagNow = Date.now(); + this.logger.warn( + `AI chat stream DIAGNOSTIC (error): elapsed=${diagNow - streamStartedAt}ms ` + + `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + + `silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent}`, + ); + // Finalize the PARTIAL answer streamed before the failure (text + any + // finished tool steps) WITH the error in metadata, so the turn shows what + // the user already saw plus the cause — not just a bare error. Status + // 'error' (#183). + await finalizeAssistant( + flushAssistant(capturedSteps, inProgressText, 'error', { + error: errorText, + }), + ); + // #184: settle the RUN as failed, carrying the provider/transport cause. + if (runId) await runHooks?.onSettled?.(runId, 'error', errorText); + await closeExternalClients(); + }, + onAbort: async ({ steps }) => { + const partialChars = + capturedSteps.reduce((n, s) => n + (s.text?.length ?? 0), 0) + + inProgressText.length; + // Unlike onError/onFinish, this terminal path otherwise writes nothing, so + // an aborted turn (client disconnect / proxy drop / stop()) would be + // invisible in the logs. Log it (warn) so the abort is traceable. + this.logger.warn( + `AI chat stream aborted (chat ${chatId}) after ${steps.length} ` + + `step(s), ${partialChars} chars partial text; persisting partial turn.`, + ); + // DIAGNOSTIC (Safari stream-drop investigation) — temporary: THE key + // line — classifies the Safari drop. + const diagNow = Date.now(); + this.logger.warn( + `AI chat stream DIAGNOSTIC (abort/disconnect): elapsed=${diagNow - streamStartedAt}ms ` + + `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + + `silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent} ` + + `steps=${steps.length}`, + ); + await finalizeAssistant( + flushAssistant(capturedSteps, inProgressText, 'aborted'), + ); + // #184: settle the RUN as aborted (an explicit user stop reached the + // run's signal; a disconnect does not abort a run-wrapped turn). + if (runId) await runHooks?.onSettled?.(runId, 'aborted'); + await closeExternalClients(); + }, + }); - // Stream the UI-message protocol straight to the hijacked Node response. - // Without onError the AI SDK masks the cause ('An error occurred.') and the - // UI shows a generic failure. Surface the real provider message instead. - // AI SDK error messages / 4xx bodies never contain the API key, so this is - // safe; we never dump the resolved config/apiKey. - // - // SSE buffering / proxy note: pipeUIMessageStreamToResponse writes the - // headers immediately (res.writeHead) and each chunk incrementally, and the - // SDK's default UI_MESSAGE_STREAM_HEADERS already include - // `x-accel-buffering: no` (disables nginx response buffering) plus - // `content-type: text/event-stream` and `cache-control: no-cache`. We pass - // `headers` explicitly anyway so the intent is visible here and survives any - // future change to the SDK defaults (prepareHeaders only fills a header when - // absent, so this never clobbers the SDK's content-type). DEPLOYMENT: the - // reverse proxy in front of this server MUST NOT buffer this route, or the - // whole response is released at once and nothing streams. nginx honours the - // `x-accel-buffering: no` header we send (and additionally set - // `proxy_buffering off; proxy_cache off;` for /api/ai-chat/stream); traefik - // does not buffer responses by default. - // Scrub the SDK's hop-by-hop Connection header before it writes the head (Safari/HTTP2). - stripStreamingHopByHopHeaders(res.raw); - // Running sum of per-step usage (v6 `finish-step.usage` is per-step). Sent - // as the cumulative authoritative usage so the client never jumps DOWN. - let cumulativeStepUsage: ChatStreamUsage | undefined; - result.pipeUIMessageStreamToResponse(res.raw, { - headers: { 'X-Accel-Buffering': 'no' }, - // Surface the authoritative chatId on the streamed assistant UI message so - // the client adopts the REAL id of the row we created, instead of guessing - // the newest chat in its list. `messageMetadata` is invoked by the AI SDK - // on the `start`, `finish-step` and `finish` stream parts (ai@6 — note the - // `finish-step` trigger relies on it being delivered as its own - // message-metadata chunk); we attach `chatId` on the `start` part so it - // reaches the client (as message.metadata.chatId) at the very first chunk — - // before any second tab can race a newer chat into the list. This fixes the - // two-tab "adoption race" (#137). + // Drain the stream independently of the client socket so the turn always + // runs to completion (or to its abort) and the terminal callbacks + // (onFinish/onError/onAbort) fire — releasing the per-turn object graph + // (history, the per-request toolset closures, captured steps, SDK buffers) + // and closing leased MCP clients. WITHOUT this, a client disconnect leaves + // the pipe's dead socket as the only reader; backpressure stalls the stream, + // the callbacks never run, and every dropped turn stays rooted in memory — + // the heap-OOM leak. consumeStream removes that backpressure (AI SDK v6 + // "Handling client disconnects"). NOT awaited (fire-and-forget); the stream + // errors are already logged by the streamText `onError` callback above, so + // swallow here to avoid an unhandledRejection. + void result.consumeStream({ onError: () => undefined }); + + // Stream the UI-message protocol straight to the hijacked Node response. + // Without onError the AI SDK masks the cause ('An error occurred.') and the + // UI shows a generic failure. Surface the real provider message instead. + // AI SDK error messages / 4xx bodies never contain the API key, so this is + // safe; we never dump the resolved config/apiKey. // - // `finish-step.usage` is PER-STEP (not cumulative) in v6, and the client - // merges each metadata.usage by replacement — so on a multi-step agent turn - // (up to MAX_AGENT_STEPS) the naive per-step value would make the live - // counter jump DOWN at each boundary. We keep a running sum here and send - // the CUMULATIVE usage, which converges to `finish.totalUsage` (#151). - messageMetadata: ({ part }) => { - const p = part as StreamMetadataPart; - if (p.type === 'finish-step') { - cumulativeStepUsage = accumulateStepUsage( - cumulativeStepUsage, - normalizeStreamUsage(p.usage), - ); - } - return chatStreamMetadata(p, chatId, cumulativeStepUsage, runId); - }, - // Stream reasoning (thinking) parts to the client so the live counter can - // estimate reasoning tokens from streamed text. v6 default is already - // true; set explicitly so the intent survives any future SDK default - // change. Providers that don't emit reasoning text still surface the - // count via the authoritative `usage.reasoningTokens` on finish-step. - sendReasoning: true, - onError: (error: unknown) => { - // Reuse the shared formatter so provider error formatting stays - // unified between the log line and the streamed error message. - return describeProviderError(error, 'AI stream error'); - }, - }); + // SSE buffering / proxy note: pipeUIMessageStreamToResponse writes the + // headers immediately (res.writeHead) and each chunk incrementally, and the + // SDK's default UI_MESSAGE_STREAM_HEADERS already include + // `x-accel-buffering: no` (disables nginx response buffering) plus + // `content-type: text/event-stream` and `cache-control: no-cache`. We pass + // `headers` explicitly anyway so the intent is visible here and survives any + // future change to the SDK defaults (prepareHeaders only fills a header when + // absent, so this never clobbers the SDK's content-type). DEPLOYMENT: the + // reverse proxy in front of this server MUST NOT buffer this route, or the + // whole response is released at once and nothing streams. nginx honours the + // `x-accel-buffering: no` header we send (and additionally set + // `proxy_buffering off; proxy_cache off;` for /api/ai-chat/stream); traefik + // does not buffer responses by default. + // Scrub the SDK's hop-by-hop Connection header before it writes the head (Safari/HTTP2). + stripStreamingHopByHopHeaders(res.raw); + // Running sum of per-step usage (v6 `finish-step.usage` is per-step). Sent + // as the cumulative authoritative usage so the client never jumps DOWN. + let cumulativeStepUsage: ChatStreamUsage | undefined; + result.pipeUIMessageStreamToResponse(res.raw, { + headers: { 'X-Accel-Buffering': 'no' }, + // Surface the authoritative chatId on the streamed assistant UI message so + // the client adopts the REAL id of the row we created, instead of guessing + // the newest chat in its list. `messageMetadata` is invoked by the AI SDK + // on the `start`, `finish-step` and `finish` stream parts (ai@6 — note the + // `finish-step` trigger relies on it being delivered as its own + // message-metadata chunk); we attach `chatId` on the `start` part so it + // reaches the client (as message.metadata.chatId) at the very first chunk — + // before any second tab can race a newer chat into the list. This fixes the + // two-tab "adoption race" (#137). + // + // `finish-step.usage` is PER-STEP (not cumulative) in v6, and the client + // merges each metadata.usage by replacement — so on a multi-step agent turn + // (up to MAX_AGENT_STEPS) the naive per-step value would make the live + // counter jump DOWN at each boundary. We keep a running sum here and send + // the CUMULATIVE usage, which converges to `finish.totalUsage` (#151). + messageMetadata: ({ part }) => { + const p = part as StreamMetadataPart; + if (p.type === 'finish-step') { + cumulativeStepUsage = accumulateStepUsage( + cumulativeStepUsage, + normalizeStreamUsage(p.usage), + ); + } + return chatStreamMetadata(p, chatId, cumulativeStepUsage, runId); + }, + // Stream reasoning (thinking) parts to the client so the live counter can + // estimate reasoning tokens from streamed text. v6 default is already + // true; set explicitly so the intent survives any future SDK default + // change. Providers that don't emit reasoning text still surface the + // count via the authoritative `usage.reasoningTokens` on finish-step. + sendReasoning: true, + onError: (error: unknown) => { + // Reuse the shared formatter so provider error formatting stays + // unified between the log line and the streamed error message. + return describeProviderError(error, 'AI stream error'); + }, + }); - // Force the status line + headers onto the socket NOW (before the model's - // first token), so the proxy sees the response start immediately even if the - // provider's first chunk is delayed. writeToServerResponse already called - // writeHead synchronously above; flushHeaders is a belt-and-braces no-op once - // headers are sent, and is guarded for response-likes that lack it. - res.raw.flushHeaders?.(); - // Heartbeat: keep the SSE stream progressing during silent tool/think gaps (Safari/proxy idle timeout). - // DIAGNOSTIC (Safari stream-drop investigation) — temporary: count beats so a disconnect log can show - // how many pings were written before Safari dropped. - startSseHeartbeat(res.raw, 15_000, () => { - heartbeatsSent += 1; - }); + // Force the status line + headers onto the socket NOW (before the model's + // first token), so the proxy sees the response start immediately even if the + // provider's first chunk is delayed. writeToServerResponse already called + // writeHead synchronously above; flushHeaders is a belt-and-braces no-op once + // headers are sent, and is guarded for response-likes that lack it. + res.raw.flushHeaders?.(); + // Heartbeat: keep the SSE stream progressing during silent tool/think gaps (Safari/proxy idle timeout). + // DIAGNOSTIC (Safari stream-drop investigation) — temporary: count beats so a disconnect log can show + // how many pings were written before Safari dropped. + startSseHeartbeat(res.raw, 15_000, () => { + heartbeatsSent += 1; + }); + } catch (err) { + // Synchronous failure before/while wiring the stream: the terminal + // callbacks will not run, so release the leased external clients here and + // re-throw for the controller to surface on the socket. + await closeExternalClients(); + throw err; + } } catch (err) { - // Synchronous failure before/while wiring the stream: the terminal - // callbacks will not run, so release the leased external clients here and - // re-throw for the controller to surface on the socket. - await closeExternalClients(); + // #184 safety net (see the opening comment): settle the run on ANY failure + // before streamText's callbacks own the lifecycle, so the run row never + // stays 'running' forever (which would 409 every later turn in this chat). + // finalizeRun (onSettled) is idempotent — a settle here and a settle from a + // streamText callback collapse to a single terminal write. + if (runId) { + await runHooks?.onSettled?.( + runId, + 'error', + err instanceof Error + ? err.message + : 'Agent run failed before streaming started', + ); + } throw err; } } @@ -955,7 +993,10 @@ export class AiChatService implements OnModuleInit { * permission). The content is truncated to keep the prompt cheap and within * context limits. Throws AiNotConfiguredException (503) if AI is unconfigured. */ - async generatePageTitle(workspaceId: string, content: string): Promise { + async generatePageTitle( + workspaceId: string, + content: string, + ): Promise { const model = await this.ai.getChatModel(workspaceId); const { text } = await generateText({ model, diff --git a/apps/server/src/database/types/entity.types.ts b/apps/server/src/database/types/entity.types.ts index 0474ab9e..54c0753d 100644 --- a/apps/server/src/database/types/entity.types.ts +++ b/apps/server/src/database/types/entity.types.ts @@ -56,16 +56,12 @@ export type UpdatableAiChat = Updateable>; // full-text search. It is omitted from the public type so it never leaks // into HTTP responses or the chat history fed to the language model. export type AiChatMessage = Omit, 'tsv'>; -export type InsertableAiChatMessage = Omit< - Insertable, - 'tsv' ->; +export type InsertableAiChatMessage = Omit, 'tsv'>; // AI Chat Run (#184 phase 1): the agent run as a first-class lifecycle object, // detached from the HTTP request / browser window. export type AiChatRun = Selectable; export type InsertableAiChatRun = Insertable; -export type UpdatableAiChatRun = Updateable>; // AI Provider Credentials // SECURITY (D9/§8.1): holds encrypted per-workspace provider API keys. @@ -211,11 +207,14 @@ export type UpdatableFavorite = Updateable>; // Page Transclusion export type PageTransclusion = Selectable; export type InsertablePageTransclusion = Insertable; -export type UpdatablePageTransclusion = Updateable>; +export type UpdatablePageTransclusion = Updateable< + Omit +>; // Page Transclusion Reference export type PageTransclusionReference = Selectable; -export type InsertablePageTransclusionReference = Insertable; +export type InsertablePageTransclusionReference = + Insertable; export type UpdatablePageTransclusionReference = Updateable< Omit >; @@ -285,7 +284,9 @@ export type UpdatablePagePermission = Updateable>; // Page Verification export type PageVerification = Selectable<_PageVerifications>; export type InsertablePageVerification = Insertable<_PageVerifications>; -export type UpdatablePageVerification = Updateable>; +export type UpdatablePageVerification = Updateable< + Omit<_PageVerifications, 'id'> +>; // Page Verifier export type PageVerifier = Selectable<_PageVerifiers>;