diff --git a/apps/client/src/features/ai-chat/components/ai-chat-window.tsx b/apps/client/src/features/ai-chat/components/ai-chat-window.tsx index 52b425cc..e349a8fe 100644 --- a/apps/client/src/features/ai-chat/components/ai-chat-window.tsx +++ b/apps/client/src/features/ai-chat/components/ai-chat-window.tsx @@ -166,8 +166,10 @@ export default function AiChatWindow() { useAiChatMessagesQuery(activeChatId ?? undefined); // #184 reconnect-and-live-follow. Whether detached agent runs are enabled for - // this workspace; the reconnect endpoint is flag-gated server-side, so we must - // not poll it when the feature is off. + // this workspace. The reconnect endpoint itself is NOT flag-gated server-side + // (it is only owner-gated and returns `{ run: null }` when the chat has no + // run); but when the feature is off no runs are ever created, so polling it + // would always come back empty — we gate it off here to avoid pointless polls. const workspace = useAtomValue(workspaceAtom); const autonomousRunsEnabled = workspace?.settings?.ai?.autonomousRuns === true; diff --git a/apps/client/src/features/ai-chat/queries/ai-chat-query.ts b/apps/client/src/features/ai-chat/queries/ai-chat-query.ts index 5dcf4afd..555d11b9 100644 --- a/apps/client/src/features/ai-chat/queries/ai-chat-query.ts +++ b/apps/client/src/features/ai-chat/queries/ai-chat-query.ts @@ -60,11 +60,12 @@ export const AI_CHAT_RUN_RQ_KEY = (chatId: string) => ["ai-chat-run", chatId]; export function useAiChatsQuery() { const query = useInfiniteQuery({ queryKey: AI_CHATS_RQ_KEY, - queryFn: ({ pageParam }) => - getAiChats({ cursor: pageParam, limit: 50 }), + queryFn: ({ pageParam }) => getAiChats({ cursor: pageParam, limit: 50 }), initialPageParam: undefined as string | undefined, getNextPageParam: (lastPage) => - lastPage.meta.hasNextPage ? (lastPage.meta.nextCursor ?? undefined) : undefined, + lastPage.meta.hasNextPage + ? (lastPage.meta.nextCursor ?? 
undefined) + : undefined, }); const data = useMemo | undefined>(() => { @@ -94,7 +95,9 @@ export function useAiChatMessagesQuery(chatId: string | undefined) { getAiChatMessages({ chatId: chatId as string, cursor: pageParam }), initialPageParam: undefined as string | undefined, getNextPageParam: (lastPage) => - lastPage.meta.hasNextPage ? (lastPage.meta.nextCursor ?? undefined) : undefined, + lastPage.meta.hasNextPage + ? (lastPage.meta.nextCursor ?? undefined) + : undefined, enabled: !!chatId, }); @@ -144,9 +147,10 @@ export function useAiChatMessagesQuery(chatId: string | undefined) { * is thus naturally bounded by the run terminating; no separate timeout cap. * * `enabled` gates the whole thing: callers pass `false` when the autonomous-runs - * feature is off (the endpoint is flag-gated server-side and would 403) OR when - * THIS tab is the one actively streaming the run (the live SSE owns the view, so - * we must not also poll/merge). The global `retry: false` means a 403/anything + * feature is off (the endpoint is NOT flag-gated server-side, but with the feature + * off the chat has no runs, so polling would only ever return `{ run: null }`) OR + * when THIS tab is the one actively streaming the run (the live SSE owns the view, + * so we must not also poll/merge). The global `retry: false` means a failed fetch * leaves `data` undefined, so refetchInterval(undefined run) returns false — a * failed fetch can never spin a tight loop. 
*/ @@ -311,11 +315,14 @@ export function useImportAiRolesFromCatalogMutation() { mutationFn: (payload) => importAiRolesFromCatalog(payload), onSuccess: (result) => { notifications.show({ - message: t("Imported {{created}}, renamed {{renamed}}, skipped {{skipped}}", { - created: result.created, - renamed: result.renamed, - skipped: result.skipped, - }), + message: t( + "Imported {{created}}, renamed {{renamed}}, skipped {{skipped}}", + { + created: result.created, + renamed: result.renamed, + skipped: result.skipped, + }, + ), }); // Surface partial failures (e.g. unique-name races) as a red warning. if (result.errors.length > 0) { diff --git a/apps/client/src/features/ai-chat/services/ai-chat-service.ts b/apps/client/src/features/ai-chat/services/ai-chat-service.ts index abd5ab8e..fba0dfad 100644 --- a/apps/client/src/features/ai-chat/services/ai-chat-service.ts +++ b/apps/client/src/features/ai-chat/services/ai-chat-service.ts @@ -49,7 +49,9 @@ export async function getAiChatMessages( * partial output while the run is in-flight, the final output once it finished). * The DB is the source of truth, so this works for an in-flight run (the browser * dropped, the run kept going) and a finished one alike; `{ run: null }` when the - * chat has never had a run. Owner-gated and flag-gated server-side. + * chat has never had a run. Owner-gated server-side (the requesting user must own + * the chat); it is NOT flag-gated — when the feature is off the chat simply has no + * runs, so the endpoint returns `{ run: null }`. 
*/ export async function getAiChatRun( chatId: string, diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts index 419a3bfc..d7c8ce97 100644 --- a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts @@ -12,10 +12,13 @@ function uniqueViolation(constraintName: string): Error & { code: string; constraint_name: string; } { - return Object.assign(new Error('duplicate key value violates unique constraint'), { - code: '23505', - constraint_name: constraintName, - }); + return Object.assign( + new Error('duplicate key value violates unique constraint'), + { + code: '23505', + constraint_name: constraintName, + }, + ); } /** @@ -260,6 +263,33 @@ describe('AiChatRunService run lifecycle', () => { ); }); + it('finalizeRun is IDEMPOTENT: a second settle no-ops (single terminal write)', async () => { + // The #184 review fix: AiChatService.stream wraps the turn in a safety-net + // catch that settles a failed turn AND streamText's terminal callback may + // also settle — both routes call finalizeRun. Only the FIRST may write the + // terminal row; the second must no-op so a late settle can never clobber the + // real terminal status or double-write the row. + const repo = makeRepo(); + const svc = new AiChatRunService(repo as never); + await svc.beginRun({ + chatId: 'chat-1', + workspaceId: 'ws-1', + userId: 'user-1', + }); + + await svc.finalizeRun('run-1', 'ws-1', 'error', 'first'); + expect(svc.isLocallyActive('run-1')).toBe(false); + // A second settle (e.g. a streamText callback firing after the catch) no-ops. 
+ await svc.finalizeRun('run-1', 'ws-1', 'completed', undefined); + + expect(repo.update).toHaveBeenCalledTimes(1); + expect(repo.update).toHaveBeenCalledWith( + 'run-1', + 'ws-1', + expect.objectContaining({ status: 'failed', error: 'first' }), + ); + }); + it('recordStep / linkAssistantMessage are best-effort: a repo failure is swallowed', async () => { const repo = makeRepo({ update: jest.fn(async () => { diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.ts index 1fb8ca4e..c9456420 100644 --- a/apps/server/src/core/ai-chat/ai-chat-run.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat-run.service.ts @@ -201,10 +201,19 @@ export class AiChatRunService implements OnModuleInit { /** * Finalize a run to its terminal status (succeeded / failed / aborted), - * stamping finishedAt + any error, and DROP its in-memory entry. Idempotent - * and best-effort: the at-most-once turn terminal callbacks call it, but a - * double call (or a call after the row was swept) merely re-writes the same - * terminal row. + * stamping finishedAt + any error, and DROP its in-memory entry. Best-effort. + * + * IDEMPOTENT (#184 review): the terminal write happens AT MOST ONCE per run. + * `this.active.delete(runId)` returns false when the run was already settled + * (its in-memory entry already dropped); in that case we no-op. This collapses + * a legitimate double-settle to a single write: AiChatService.stream wraps the + * turn in a safety-net catch that settles the run to 'error' on any failure + * BEFORE streamText's terminal callbacks own the lifecycle — and on the rare + * path where streamText DID attach (so a callback also settles) the two would + * otherwise both call onSettled. The first caller wins and writes the terminal + * row; the second returns early, so a late settle can never clobber the real + * terminal status or double-write. 
beginRun always registers the entry before + * the turn can settle, so a legitimate first finalize always finds it. */ async finalizeRun( runId: string, @@ -212,7 +221,7 @@ export class AiChatRunService implements OnModuleInit { turnStatus: TurnTerminalStatus, error?: string, ): Promise { - this.active.delete(runId); + if (!this.active.delete(runId)) return; try { await this.runRepo.update(runId, workspaceId, { status: mapTurnStatusToRun(turnStatus), @@ -258,10 +267,7 @@ export class AiChatRunService implements OnModuleInit { /** Fetch a run by id (workspace-scoped). Used to resolve + ownership-check an * explicit stop targeting a runId. */ - getRun( - runId: string, - workspaceId: string, - ): Promise { + getRun(runId: string, workspaceId: string): Promise { return this.runRepo.findById(runId, workspaceId); } diff --git a/apps/server/src/core/ai-chat/ai-chat.controller.run.spec.ts b/apps/server/src/core/ai-chat/ai-chat.controller.run.spec.ts new file mode 100644 index 00000000..321ecda1 --- /dev/null +++ b/apps/server/src/core/ai-chat/ai-chat.controller.run.spec.ts @@ -0,0 +1,163 @@ +import { BadRequestException, ForbiddenException } from '@nestjs/common'; +import { AiChatController } from './ai-chat.controller'; +import type { User, Workspace } from '@docmost/db/types/entity.types'; + +/** + * Wiring spec for the #184 run-reconnect / run-stop endpoints + * (`POST /ai-chat/run` and `POST /ai-chat/stop`). Both are OWNER-gated via + * assertOwnedChat (the requesting user must own the chat) and NOT flag-gated. + * Exercised with hand-rolled mocks — no Nest graph, no DB. The controller's + * constructor order is (aiChatService, aiChatRunService, aiChatRepo, + * aiChatMessageRepo, aiTranscription). 
+ */ +describe('AiChatController run endpoints (#184)', () => { + const user = { id: 'u1' } as User; + const workspace = { id: 'ws1' } as Workspace; + + function makeController(opts: { + chat?: unknown; // what aiChatRepo.findById returns (owner-gate) + run?: unknown; // getLatestForChat / getRun result + activeRun?: unknown; // getActiveForChat result + message?: unknown; // aiChatMessageRepo.findById result + stopped?: boolean; // requestStop result + }) { + const aiChatRunService = { + getLatestForChat: jest.fn().mockResolvedValue(opts.run), + getRun: jest.fn().mockResolvedValue(opts.run), + getActiveForChat: jest.fn().mockResolvedValue(opts.activeRun), + requestStop: jest.fn().mockResolvedValue(opts.stopped ?? false), + }; + const aiChatRepo = { + findById: jest.fn().mockResolvedValue(opts.chat), + }; + const aiChatMessageRepo = { + findById: jest.fn().mockResolvedValue(opts.message), + }; + const controller = new AiChatController( + {} as never, // aiChatService + aiChatRunService as never, + aiChatRepo as never, + aiChatMessageRepo as never, + {} as never, // aiTranscription + ); + return { controller, aiChatRunService, aiChatRepo, aiChatMessageRepo }; + } + + describe('POST /ai-chat/run (getRun)', () => { + it('owner-gates: a chat the user does not own throws ForbiddenException', async () => { + const { controller, aiChatRunService } = makeController({ + chat: { id: 'c1', creatorId: 'someone-else' }, + }); + await expect( + controller.getRun({ chatId: 'c1' }, user, workspace), + ).rejects.toBeInstanceOf(ForbiddenException); + // It must NOT reach the run lookup once the owner-gate fails. 
+ expect(aiChatRunService.getLatestForChat).not.toHaveBeenCalled(); + }); + + it('returns { run: null, message: null } when the chat has never had a run', async () => { + const { controller, aiChatRunService } = makeController({ + chat: { id: 'c1', creatorId: 'u1' }, + run: undefined, + }); + const res = await controller.getRun({ chatId: 'c1' }, user, workspace); + expect(res).toEqual({ run: null, message: null }); + expect(aiChatRunService.getLatestForChat).toHaveBeenCalledWith( + 'c1', + 'ws1', + ); + }); + + it('returns the run and its projected assistant message', async () => { + const run = { id: 'run-1', chatId: 'c1', assistantMessageId: 'm1' }; + const message = { id: 'm1', role: 'assistant' }; + const { controller, aiChatMessageRepo } = makeController({ + chat: { id: 'c1', creatorId: 'u1' }, + run, + message, + }); + const res = await controller.getRun({ chatId: 'c1' }, user, workspace); + expect(res).toEqual({ run, message }); + expect(aiChatMessageRepo.findById).toHaveBeenCalledWith('m1', 'ws1'); + }); + + it('returns message: null when the run has no linked assistant message', async () => { + const run = { id: 'run-1', chatId: 'c1', assistantMessageId: null }; + const { controller, aiChatMessageRepo } = makeController({ + chat: { id: 'c1', creatorId: 'u1' }, + run, + }); + const res = await controller.getRun({ chatId: 'c1' }, user, workspace); + expect(res).toEqual({ run, message: null }); + expect(aiChatMessageRepo.findById).not.toHaveBeenCalled(); + }); + }); + + describe('POST /ai-chat/stop (stopRun)', () => { + it('throws BadRequestException when neither runId nor chatId is given', async () => { + const { controller } = makeController({}); + await expect( + controller.stopRun({}, user, workspace), + ).rejects.toBeInstanceOf(BadRequestException); + }); + + it('stops by runId: owner-gates via the run’s chat, then requests the stop', async () => { + const { controller, aiChatRunService, aiChatRepo } = makeController({ + run: { id: 'run-1', chatId: 'c1' 
}, + chat: { id: 'c1', creatorId: 'u1' }, + stopped: true, + }); + const res = await controller.stopRun({ runId: 'run-1' }, user, workspace); + expect(res).toEqual({ stopped: true }); + expect(aiChatRunService.getRun).toHaveBeenCalledWith('run-1', 'ws1'); + expect(aiChatRepo.findById).toHaveBeenCalledWith('c1', 'ws1'); + expect(aiChatRunService.requestStop).toHaveBeenCalledWith('run-1', 'ws1'); + }); + + it('stops by runId: a foreign run’s chat throws ForbiddenException (no stop)', async () => { + const { controller, aiChatRunService } = makeController({ + run: { id: 'run-1', chatId: 'c1' }, + chat: { id: 'c1', creatorId: 'someone-else' }, + }); + await expect( + controller.stopRun({ runId: 'run-1' }, user, workspace), + ).rejects.toBeInstanceOf(ForbiddenException); + expect(aiChatRunService.requestStop).not.toHaveBeenCalled(); + }); + + it('stops by runId: an unknown run reports { stopped: false }', async () => { + const { controller, aiChatRunService } = makeController({ + run: undefined, + }); + const res = await controller.stopRun({ runId: 'gone' }, user, workspace); + expect(res).toEqual({ stopped: false }); + expect(aiChatRunService.requestStop).not.toHaveBeenCalled(); + }); + + it('stops by chatId: owner-gates, resolves the active run, requests the stop', async () => { + const { controller, aiChatRunService, aiChatRepo } = makeController({ + chat: { id: 'c1', creatorId: 'u1' }, + activeRun: { id: 'run-9' }, + stopped: true, + }); + const res = await controller.stopRun({ chatId: 'c1' }, user, workspace); + expect(res).toEqual({ stopped: true }); + expect(aiChatRepo.findById).toHaveBeenCalledWith('c1', 'ws1'); + expect(aiChatRunService.getActiveForChat).toHaveBeenCalledWith( + 'c1', + 'ws1', + ); + expect(aiChatRunService.requestStop).toHaveBeenCalledWith('run-9', 'ws1'); + }); + + it('stops by chatId: reports { stopped: false } when no run is active', async () => { + const { controller, aiChatRunService } = makeController({ + chat: { id: 'c1', creatorId: 'u1' 
}, + activeRun: undefined, + }); + const res = await controller.stopRun({ chatId: 'c1' }, user, workspace); + expect(res).toEqual({ stopped: false }); + expect(aiChatRunService.requestStop).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts b/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts index 77e9d3c4..602773d1 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts @@ -1,5 +1,7 @@ import { Logger } from '@nestjs/common'; -import { AiChatService } from './ai-chat.service'; +import { AiChatService, AiChatRunHooks } from './ai-chat.service'; +import { AiChatRunService } from './ai-chat-run.service'; +import type { User, Workspace } from '@docmost/db/types/entity.types'; /** * Lifecycle unit tests for AiChatService.onModuleInit (#183 crash-recovery @@ -59,3 +61,97 @@ describe('AiChatService.onModuleInit (startup sweep)', () => { expect(String(warnSpy.mock.calls[0][0])).toContain('db unavailable'); }); }); + +/** + * #184 CRITICAL run-lifecycle safety net (review fix). A transient failure + * AFTER a successful beginRun but BEFORE streamText's terminal callbacks own the + * lifecycle must STILL settle the run — otherwise the run row is stuck 'running' + * forever (sweepRunning only runs at startup) and the partial unique index + the + * controller pre-check 409 every future turn in that chat until a restart. Here + * we model the very first bare await after beginRun (the user-message insert) + * throwing, wiring the run hooks to a REAL AiChatRunService (mock repo) exactly + * as the controller does, and assert the run is settled to 'error' and its + * in-memory entry dropped (so a follow-up turn would NOT be 409'd). 
+ */ +describe('AiChatService.stream run-lifecycle safety net (#184)', () => { + const user = { id: 'u1' } as User; + const workspace = { id: 'ws1' } as Workspace; + + afterEach(() => jest.restoreAllMocks()); + + it('an exception after beginRun settles the run to error and drops the in-memory entry', async () => { + jest.spyOn(Logger.prototype, 'error').mockImplementation(() => undefined); + + // Real run service over a mock repo, so finalizeRun's in-memory bookkeeping + // (active.delete) is exercised for real. + const runRepo = { + insert: jest.fn().mockResolvedValue({ id: 'run-1', status: 'running' }), + update: jest.fn().mockResolvedValue({ id: 'run-1' }), + }; + const runService = new AiChatRunService(runRepo as never); + + // The user-message insert (the first bare await after beginRun) throws. + const aiChatMessageRepo = { + insert: jest.fn().mockRejectedValue(new Error('insert boom')), + }; + const aiChatRepo = { + // Existing chat -> chatId stays, no new-chat insert path. + findById: jest.fn().mockResolvedValue({ id: 'chat-1', creatorId: 'u1' }), + }; + + const service = new AiChatService( + {} as never, // ai + aiChatRepo as never, + aiChatMessageRepo as never, + {} as never, // aiSettings + {} as never, // tools + {} as never, // mcpClients + {} as never, // aiAgentRoleRepo + {} as never, // pageRepo + {} as never, // pageAccess + ); + + const runHooks: AiChatRunHooks = { + begin: (chatId) => + runService.beginRun({ + chatId, + workspaceId: workspace.id, + userId: user.id, + trigger: 'user', + }), + onSettled: (runId, status, error) => + runService.finalizeRun(runId, workspace.id, status, error), + }; + + await expect( + service.stream({ + user, + workspace, + sessionId: 'sess', + body: { + chatId: 'chat-1', + messages: [ + { id: 'm', role: 'user', parts: [{ type: 'text', text: 'hi' }] }, + ], + }, + res: {} as never, + signal: new AbortController().signal, + model: {} as never, + role: null, + runHooks, + }), + ).rejects.toThrow('insert boom'); + + // 
The run was begun... + expect(runRepo.insert).toHaveBeenCalledTimes(1); + // ...then settled to a terminal FAILED status by the safety net... + expect(runRepo.update).toHaveBeenCalledTimes(1); + expect(runRepo.update).toHaveBeenCalledWith( + 'run-1', + 'ws1', + expect.objectContaining({ status: 'failed' }), + ); + // ...and the in-memory entry is gone, so a follow-up turn is NOT 409'd. + expect(runService.isLocallyActive('run-1')).toBe(false); + }); +}); diff --git a/apps/server/src/core/ai-chat/ai-chat.service.ts b/apps/server/src/core/ai-chat/ai-chat.service.ts index 4ffb1cef..66a7d924 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.ts @@ -154,9 +154,7 @@ export interface AiChatRunHooks { // Called once the chat id is resolved; returns the run handle whose `signal` // drives the agent loop's abort. Returning null disables run tracking (the // turn falls back to the passed-in socket signal). - begin( - chatId: string, - ): Promise<{ runId: string; signal: AbortSignal } | null>; + begin(chatId: string): Promise<{ runId: string; signal: AbortSignal } | null>; onAssistantSeeded?( runId: string, assistantMessageId: string, @@ -418,531 +416,571 @@ export class AiChatService implements OnModuleInit { } } - // Extract the incoming user turn (the last user message from useChat). - const incoming = lastUserMessage(body.messages); - const incomingText = uiMessageText(incoming); - - // Persist the user message before contacting the model. - await this.aiChatMessageRepo.insert({ - chatId, - workspaceId: workspace.id, - userId: user.id, - role: 'user', - content: incomingText, - // jsonb column: UIMessage parts are JSON-serializable at runtime but not - // structurally `JsonValue`, so cast through unknown. - metadata: (incoming?.parts ? 
{ parts: incoming.parts } : null) as never, - }); - - // Rebuild the conversation from persisted history (not the client payload), - // so the model always sees the authoritative server-side transcript. Load - // the FULL history in chronological order (oldest -> newest, incl. the user - // message just inserted above) so NO turns are dropped — there is no - // recent-tail window anymore. `findAllByChat` keeps a 5000-row memory-safety - // backstop (on overflow it keeps the NEWEST rows and logs a warning); that - // is a safety net far above any realistic chat, not a conversational limit. - const history = await this.aiChatMessageRepo.findAllByChat( - chatId, - workspace.id, - ); - const uiMessages = history.map(rowToUiMessage); - // convertToModelMessages is async in ai@6.0.134 (returns Promise). - const messages = await convertToModelMessages(uiMessages); - - // Interrupt-resume detection (#198): the client "send now" flag is only a - // hint — confirm it against the persisted history (the preceding assistant - // turn must really be aborted/streaming) so a spoofed flag cannot inject the - // interrupt note onto an ordinary turn. The partial output the model needs is - // already in `messages` (the aborted assistant row replays via findRecent). - const interrupted = isInterruptResume(history, body.interrupted); - - // The model is resolved by the controller before hijack (clean 503 path). - // Here we only need the admin-configured system prompt. - const resolved = await this.aiSettings.resolve(workspace.id); - - // Build the external MCP toolset FIRST so the system prompt can carry each - // connected server's admin-authored guidance (#180). Merge in admin- - // configured external MCP tools (web search, etc.; §6.8). A down/slow - // external server never crashes the turn — toolsFor skips it and records the - // outcome. The returned client handles MUST be closed in the streamText - // lifecycle (onFinish/onError/onAbort) — leaking them is a bug. 
Docmost - // tools take precedence on a name clash (external are namespaced, so a clash - // is not expected; the spread order makes intent explicit). - let external: Awaited> = { - tools: {}, - clients: [], - outcomes: [], - instructions: [], - }; + // #184 RUN-LIFECYCLE SAFETY NET (review fix). Everything from here until + // streamText's terminal callbacks (onFinish/onError/onAbort) take ownership of + // the run lifecycle runs under this try. Between a successful beginRun (run row + // 'running' + an in-memory AbortController) and streamText attaching there are + // bare awaits — the user-message insert, findAllByChat, convertToModelMessages, + // aiSettings.resolve — plus the buildSystemPrompt/forUser block and the + // synchronous streamText wiring, NONE of which finalize the run on failure (the + // inner catches only release MCP clients and rethrow). Without this net any + // transient failure there would leave ai_chat_runs stuck 'running' FOREVER + // (sweepRunning only runs at server startup): the partial unique index + the + // controller pre-check would then 409 every future turn in this chat until a + // restart, and an observer tab would poll forever. The catch settles the run to + // 'error' (which deletes the in-memory entry and writes a terminal row) before + // rethrowing to the controller. finalizeRun (via onSettled) is idempotent, so + // if streamText DID attach and a terminal callback also settles, exactly one + // terminal write wins — never a double-settle. try { - external = await this.mcpClients.toolsFor(workspace.id); - } catch (err) { - // Building the external toolset must never break the turn; proceed with - // Docmost-only tools. Never log URLs/headers — short message only. - this.logger.warn( - `External MCP toolset unavailable: ${ - err instanceof Error ? err.message : 'unknown error' - }`, - ); - } + // Extract the incoming user turn (the last user message from useChat). 
+ const incoming = lastUserMessage(body.messages); + const incomingText = uiMessageText(incoming); - // Close every external client EXACTLY ONCE across the turn's terminal - // callbacks (onFinish/onError/onAbort all fire at most once collectively, - // but guard anyway). DEFINED HERE — before the prompt/toolset are built — so - // that if buildSystemPrompt or forUser throws AFTER the external lease was - // taken (toolsFor above), the lease is still released. Otherwise its refCount - // stays >= 1 forever and the external undici sockets leak until restart - // (#180 reorder moved toolsFor ahead of these; #185 review). Close errors are - // swallowed so they never break the response. - let clientsClosed = false; - const closeExternalClients = async (): Promise => { - if (clientsClosed) return; - clientsClosed = true; - await Promise.all( - external.clients.map((c) => - c.close().catch((closeErr) => { - this.logger.warn( - `Failed to close external MCP client: ${ - closeErr instanceof Error ? closeErr.message : 'unknown error' - }`, - ); - }), - ), - ); - }; - - // Build the system prompt + Docmost toolset. If either throws after the - // external MCP lease was taken above, release the lease before rethrowing so - // the leased transports are not leaked (#185 review). - let system: string; - let docmostTools: Awaited>; - try { - system = buildSystemPrompt({ - workspace, - adminPrompt: resolved?.systemPrompt, - // The role (pre-resolved by the controller) REPLACES the persona layer; - // the safety framework is still appended by buildSystemPrompt. - roleInstructions: role?.instructions, - // Server-validated open page (authoritative title), not the client value. - openedPage: openPageContext, - // Guidance only for servers that connected and yielded ≥1 callable tool. - mcpInstructions: external.instructions, - // History-confirmed interrupt-resume flag (#198): adds the interrupt note - // so the model treats the partial answer above as cut off, not finished. 
- interrupted, - }); - - // Pass the resolved chatId so the write tools can mint provenance tokens - // (access + collab) carrying { actor:'agent', aiChatId: chatId }, making - // agent REST/collab writes attributable and non-spoofable (§6.5/§6.6). - docmostTools = await this.tools.forUser( - user, - sessionId, - workspace.id, - chatId, - // Same server-validated open page used by the system prompt above; - // exposed to the model via getCurrentPage so page identity (and the - // AUTHORITATIVE title) survives prompt mangling / client title spoofing. - openPageContext, - ); - } catch (err) { - await closeExternalClients(); - throw err; - } - - const tools = { ...external.tools, ...docmostTools }; - - // Accumulate the turn's streamed output so a provider error / disconnect can - // persist the PARTIAL answer the user already saw — the SDK's onError/onAbort - // callbacks don't hand us the in-progress text. `capturedSteps` holds finished - // steps (tool calls + their text); `inProgressText` holds the text streamed in - // the CURRENT, not-yet-finished step, reset whenever a step finishes. - const capturedSteps: StepLike[] = []; - let inProgressText = ''; - - // Step-granular durability (#183): create the assistant row UPFRONT in the - // 'streaming' state (before any token), then UPDATE it as each step finishes - // and finalize it once on the terminal callback. If the process dies - // mid-turn the row survives with every finished step already persisted; the - // startup sweep (sweepStreaming) later flips a dangling 'streaming' row to - // 'aborted'. The DB is now the single source of truth for the turn — the - // socket is never required for the write path. A failed upfront insert is - // logged and leaves assistantId undefined; the per-step/terminal updates then - // no-op (guarded below) so the turn still streams to the user. 
- let assistantId: string | undefined; - try { - const seed = flushAssistant([], '', 'streaming'); - const seeded = await this.aiChatMessageRepo.insert({ + // Persist the user message before contacting the model. + await this.aiChatMessageRepo.insert({ chatId, workspaceId: workspace.id, userId: user.id, - role: 'assistant', - content: seed.content, - // jsonb columns: cast through never (same as the user insert above). - toolCalls: (seed.toolCalls ?? null) as never, - metadata: seed.metadata as never, - status: seed.status, + role: 'user', + content: incomingText, + // jsonb column: UIMessage parts are JSON-serializable at runtime but not + // structurally `JsonValue`, so cast through never. + metadata: (incoming?.parts ? { parts: incoming.parts } : null) as never, }); - assistantId = seeded?.id; - } catch (err) { - this.logger.error( - `Failed to insert upfront assistant row (chat ${chatId}, workspace ${workspace.id})`, - err as Error, + + // Rebuild the conversation from persisted history (not the client payload), + // so the model always sees the authoritative server-side transcript. Load + // the FULL history in chronological order (oldest -> newest, incl. the user + // message just inserted above) so NO turns are dropped — there is no + // recent-tail window anymore. `findAllByChat` keeps a 5000-row memory-safety + // backstop (on overflow it keeps the NEWEST rows and logs a warning); that + // is a safety net far above any realistic chat, not a conversational limit. + const history = await this.aiChatMessageRepo.findAllByChat( + chatId, + workspace.id, ); - } + const uiMessages = history.map(rowToUiMessage); + // convertToModelMessages is async in ai@6.0.134 (returns Promise). + const messages = await convertToModelMessages(uiMessages); - // Link the assistant message (the #183 projection) to its run (#184), so a - // reconnecting client can resolve the run's output. Best-effort. 
- if (runId && assistantId) { + // Interrupt-resume detection (#198): the client "send now" flag is only a + // hint — confirm it against the persisted history (the preceding assistant + // turn must really be aborted/streaming) so a spoofed flag cannot inject the + // interrupt note onto an ordinary turn. The partial output the model needs is + // already in `messages` (the aborted assistant row replays via findAllByChat). + const interrupted = isInterruptResume(history, body.interrupted); + + // The model is resolved by the controller before hijack (clean 503 path). + // Here we only need the admin-configured system prompt. + const resolved = await this.aiSettings.resolve(workspace.id); + + // Build the external MCP toolset FIRST so the system prompt can carry each + // connected server's admin-authored guidance (#180). Merge in admin- + // configured external MCP tools (web search, etc.; §6.8). A down/slow + // external server never crashes the turn — toolsFor skips it and records the + // outcome. The returned client handles MUST be closed in the streamText + // lifecycle (onFinish/onError/onAbort) — leaking them is a bug. Docmost + // tools take precedence on a name clash (external are namespaced, so a clash + // is not expected; the spread order makes intent explicit). + let external: Awaited> = { + tools: {}, + clients: [], + outcomes: [], + instructions: [], + }; try { - await runHooks?.onAssistantSeeded?.(runId, assistantId); + external = await this.mcpClients.toolsFor(workspace.id); } catch (err) { + // Building the external toolset must never break the turn; proceed with + // Docmost-only tools. Never log URLs/headers — short message only. this.logger.warn( - `Failed to link assistant row to run ${runId}: ${ + `External MCP toolset unavailable: ${ err instanceof Error ? err.message : 'unknown error' }`, ); } - } - // Per-step (non-terminal) update: persist the finished steps the moment a - // step ends. 
Tolerant — a failed update is logged and swallowed so it never - // throws into the stream. Keeps status 'streaming'. - const updateStreaming = async (): Promise => { - if (!assistantId) return; - // Cheap short-circuit once the turn is finalized (see `finalized` below). - // The AUTHORITATIVE guard is `onlyIfStreaming` on the UPDATE: a late - // fire-and-forget step update could still be in flight on another pool - // connection when finalize runs, so the SQL `WHERE status='streaming'` - // (not this flag) is what prevents it clobbering the terminal row. - if (finalized) return; + // Close every external client EXACTLY ONCE across the turn's terminal + // callbacks (onFinish/onError/onAbort all fire at most once collectively, + // but guard anyway). DEFINED HERE — before the prompt/toolset are built — so + // that if buildSystemPrompt or forUser throws AFTER the external lease was + // taken (toolsFor above), the lease is still released. Otherwise its refCount + // stays >= 1 forever and the external undici sockets leak until restart + // (#180 reorder moved toolsFor ahead of these; #185 review). Close errors are + // swallowed so they never break the response. + let clientsClosed = false; + const closeExternalClients = async (): Promise => { + if (clientsClosed) return; + clientsClosed = true; + await Promise.all( + external.clients.map((c) => + c.close().catch((closeErr) => { + this.logger.warn( + `Failed to close external MCP client: ${ + closeErr instanceof Error ? closeErr.message : 'unknown error' + }`, + ); + }), + ), + ); + }; + + // Build the system prompt + Docmost toolset. If either throws after the + // external MCP lease was taken above, release the lease before rethrowing so + // the leased transports are not leaked (#185 review). 
+ let system: string; + let docmostTools: Awaited>; try { - await this.aiChatMessageRepo.update( - assistantId, + system = buildSystemPrompt({ + workspace, + adminPrompt: resolved?.systemPrompt, + // The role (pre-resolved by the controller) REPLACES the persona layer; + // the safety framework is still appended by buildSystemPrompt. + roleInstructions: role?.instructions, + // Server-validated open page (authoritative title), not the client value. + openedPage: openPageContext, + // Guidance only for servers that connected and yielded ≥1 callable tool. + mcpInstructions: external.instructions, + // History-confirmed interrupt-resume flag (#198): adds the interrupt note + // so the model treats the partial answer above as cut off, not finished. + interrupted, + }); + + // Pass the resolved chatId so the write tools can mint provenance tokens + // (access + collab) carrying { actor:'agent', aiChatId: chatId }, making + // agent REST/collab writes attributable and non-spoofable (§6.5/§6.6). + docmostTools = await this.tools.forUser( + user, + sessionId, workspace.id, - flushAssistant(capturedSteps, '', 'streaming'), - { onlyIfStreaming: true }, + chatId, + // Same server-validated open page used by the system prompt above; + // exposed to the model via getCurrentPage so page identity (and the + // AUTHORITATIVE title) survives prompt mangling / client title spoofing. + openPageContext, ); } catch (err) { - this.logger.warn( - `Failed to update streaming assistant row: ${ - err instanceof Error ? err.message : 'unknown error' - }`, - ); + await closeExternalClients(); + throw err; } - }; - // Serialize the per-step updates (#183 review): onStepFinish fires them - // without await, so two could otherwise commit out of order on different pool - // connections (step N landing after N+1). Chaining each onto the previous - // keeps the persisted row monotonic with step order; each link short-circuits - // on `finalized`, so a tail of late updates is cheap. 
- let stepUpdateChain: Promise = Promise.resolve(); + const tools = { ...external.tools, ...docmostTools }; - // Terminal finalize: write the completed/error/aborted row exactly once - // across the (mutually-exclusive, at-most-once) onFinish/onError/onAbort - // callbacks — mirroring the pre-#183 persist-at-most-once guard for the - // TERMINAL status (the row may be updated many times with 'streaming' before - // this fires once). - let finalized = false; - const finalizeAssistant = async ( - flushed: AssistantFlush, - ): Promise => { - if (finalized) return; - finalized = true; - const plan = planFinalizeAssistant(assistantId); + // Accumulate the turn's streamed output so a provider error / disconnect can + // persist the PARTIAL answer the user already saw — the SDK's onError/onAbort + // callbacks don't hand us the in-progress text. `capturedSteps` holds finished + // steps (tool calls + their text); `inProgressText` holds the text streamed in + // the CURRENT, not-yet-finished step, reset whenever a step finishes. + const capturedSteps: StepLike[] = []; + let inProgressText = ''; + + // Step-granular durability (#183): create the assistant row UPFRONT in the + // 'streaming' state (before any token), then UPDATE it as each step finishes + // and finalize it once on the terminal callback. If the process dies + // mid-turn the row survives with every finished step already persisted; the + // startup sweep (sweepStreaming) later flips a dangling 'streaming' row to + // 'aborted'. The DB is now the single source of truth for the turn — the + // socket is never required for the write path. A failed upfront insert is + // logged and leaves assistantId undefined; the per-step/terminal updates then + // no-op (guarded below) so the turn still streams to the user. 
+ let assistantId: string | undefined; try { - // Shared dispatch (see applyFinalize): UPDATE the upfront row, or — when - // the upfront insert failed (kind 'insert') — INSERT the terminal row as - // the only safety against losing the turn entirely. - await applyFinalize( - this.aiChatMessageRepo, - plan, - { chatId, workspaceId: workspace.id, userId: user.id }, - flushed, - ); + const seed = flushAssistant([], '', 'streaming'); + const seeded = await this.aiChatMessageRepo.insert({ + chatId, + workspaceId: workspace.id, + userId: user.id, + role: 'assistant', + content: seed.content, + // jsonb columns: cast through never (same as the user insert above). + toolCalls: (seed.toolCalls ?? null) as never, + metadata: seed.metadata as never, + status: seed.status, + }); + assistantId = seeded?.id; } catch (err) { this.logger.error( - `Failed to finalize assistant message (kind=${plan.kind})`, + `Failed to insert upfront assistant row (chat ${chatId}, workspace ${workspace.id})`, err as Error, ); } - }; - // DIAGNOSTIC (Safari stream-drop investigation) — temporary. Measure - // first-chunk latency, the model-silent gap right before a disconnect, and - // how many SSE heartbeats were written, so a Safari drop can be classified - // (idle-gap vs hard wall-clock cap vs slow first chunk). - const streamStartedAt = Date.now(); - let firstModelChunkAt: number | undefined; - let lastModelChunkAt = streamStartedAt; - let heartbeatsSent = 0; - - // NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous - // failure here (or in pipe below) would skip the terminal callbacks, so the - // catch releases the leased external clients to avoid a connection leak. - let result: ReturnType; - try { - result = streamText({ - model, - system, - messages, - tools, - // No maxOutputTokens cap on the agent: tool-call arguments (e.g. a full - // page body for the write tools) are emitted as OUTPUT tokens, so a fixed - // cap would truncate complex tool calls mid-argument. 
Let the model use its - // natural per-step budget. (Cost/credit limits are an account concern, not - // something to enforce by silently breaking the agent.) - stopWhen: stepCountIs(MAX_AGENT_STEPS), - // Forced finalization: reserve the LAST allowed step for a text-only - // answer. Without this, a turn that spends all its steps on tool calls - // ends with no assistant text (an empty turn). prepareAgentStep forbids - // further tool calls and appends a synthesis instruction on that step, - // concatenated onto the original `system` so the persona is preserved. - prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system), - // #184: the RUN's signal (explicit-stop) when a run wraps this turn, else - // the socket-bound signal (legacy). A browser disconnect aborts only in - // the legacy path. - abortSignal: effectiveSignal, - onChunk: ({ chunk }) => { - // DIAGNOSTIC (Safari stream-drop investigation) — temporary. Any model - // output chunk means the stream is actively emitting bytes; track first - // + most-recent activity timestamps. - const now = Date.now(); - firstModelChunkAt ??= now; - lastModelChunkAt = now; - // 'text-delta' is the assistant's prose; tool-call args are separate chunk - // types — so this mirrors exactly what streams to the client. - if (chunk.type === 'text-delta') inProgressText += chunk.text; - }, - onStepFinish: (step) => { - // The finished step's full text is now in `step.text`; fold it in and reset - // the in-progress accumulator for the next step. - capturedSteps.push(step as StepLike); - inProgressText = ''; - // Step-granular durability (#183): persist this finished step (its text + - // tool calls + tool RESULTS) the moment it ends, so a process death after - // this point still recovers the step. Not awaited here (never block the - // stream), but SERIALIZED via stepUpdateChain so the writes commit in - // step order; updateStreaming is error-tolerant (logs + swallows). 
- stepUpdateChain = stepUpdateChain.then(() => updateStreaming()); - // #184: persist the run's progress (finished-step count). Fire-and- - // forget; the hook swallows its own errors. - if (runId) runHooks?.onStep?.(runId, capturedSteps.length); - }, - onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => { - // DIAGNOSTIC (Safari stream-drop investigation) — temporary: success - // baseline for Safari comparison. - const diagNow = Date.now(); - this.logger.log( - `AI chat stream DIAGNOSTIC (finish): elapsed=${diagNow - streamStartedAt}ms ` + - `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + - `heartbeatsSent=${heartbeatsSent} steps=${steps.length}`, + // Link the assistant message (the #183 projection) to its run (#184), so a + // reconnecting client can resolve the run's output. Best-effort. + if (runId && assistantId) { + try { + await runHooks?.onAssistantSeeded?.(runId, assistantId); + } catch (err) { + this.logger.warn( + `Failed to link assistant row to run ${runId}: ${ + err instanceof Error ? err.message : 'unknown error' + }`, ); - // Finalize the assistant row (#183): the upfront 'streaming' row is - // UPDATEd to 'completed' with the turn's final text, cumulative usage and - // full UIMessage parts. We pass the SDK `steps` (which carry the final - // step's text) as the captured steps so metadata.parts matches the - // pre-#183 onFinish record exactly; `inProgressText` is '' here (the last - // step already finished). Final-step usage (usage.input+output) ≈ the - // conversation's CURRENT context size, distinct from totalUsage. - // - // COLUMN-SEMANTICS NOTE (#183): `content` is built by flushAssistant as - // the CONCATENATION of every step's text (stepsText), whereas pre-#183 - // it stored only the FINAL step's text. 
This is a deliberate, harmless - // change: the UI and the Markdown export render from `metadata.parts` - // (per-step text + tool parts), not from `content`; `content` is the - // plain-text projection (full-text search / fallback). A multi-step - // turn's `content` therefore now holds all steps' prose, not just the - // last block. - await finalizeAssistant( - flushAssistant(steps as StepLike[], '', 'completed', { - finishReason: finishReason as string, - usage: totalUsage as StreamUsage, - contextTokens: - (usage?.inputTokens ?? 0) + (usage?.outputTokens ?? 0) || - undefined, - // Max context window for the chat header badge denominator; - // resolved from the admin-configured provider settings (in - // closure scope here). Omitted/0 = no limit. - maxContextTokens: resolved?.chatContextWindow, - }), - ); - // #184: settle the RUN as succeeded (best-effort, after the projection - // is finalized above). - if (runId) await runHooks?.onSettled?.(runId, 'completed'); - // Lifecycle: release the external MCP clients leased for this turn. - await closeExternalClients(); + } + } - // Generate the chat title for a freshly created chat AFTER the stream's - // provider call has completed — NOT concurrently with it. The z.ai coding - // endpoint stalls one of two concurrent requests to the same plan, which - // black-holed the chat stream (~300s headers timeout) when title - // generation raced it. Running it here (solo, fire-and-forget) avoids the - // race; never block the turn on it, swallow any error. - if (isNewChat && incomingText) { - void this.generateTitle(chatId, workspace.id, incomingText).catch( - (err) => { - this.logger.warn( - `Title generation failed: ${(err as Error)?.message ?? err}`, - ); - }, + // Per-step (non-terminal) update: persist the finished steps the moment a + // step ends. Tolerant — a failed update is logged and swallowed so it never + // throws into the stream. Keeps status 'streaming'. 
+ const updateStreaming = async (): Promise => { + if (!assistantId) return; + // Cheap short-circuit once the turn is finalized (see `finalized` below). + // The AUTHORITATIVE guard is `onlyIfStreaming` on the UPDATE: a late + // fire-and-forget step update could still be in flight on another pool + // connection when finalize runs, so the SQL `WHERE status='streaming'` + // (not this flag) is what prevents it clobbering the terminal row. + if (finalized) return; + try { + await this.aiChatMessageRepo.update( + assistantId, + workspace.id, + flushAssistant(capturedSteps, '', 'streaming'), + { onlyIfStreaming: true }, + ); + } catch (err) { + this.logger.warn( + `Failed to update streaming assistant row: ${ + err instanceof Error ? err.message : 'unknown error' + }`, + ); + } + }; + + // Serialize the per-step updates (#183 review): onStepFinish fires them + // without await, so two could otherwise commit out of order on different pool + // connections (step N landing after N+1). Chaining each onto the previous + // keeps the persisted row monotonic with step order; each link short-circuits + // on `finalized`, so a tail of late updates is cheap. + let stepUpdateChain: Promise = Promise.resolve(); + + // Terminal finalize: write the completed/error/aborted row exactly once + // across the (mutually-exclusive, at-most-once) onFinish/onError/onAbort + // callbacks — mirroring the pre-#183 persist-at-most-once guard for the + // TERMINAL status (the row may be updated many times with 'streaming' before + // this fires once). + let finalized = false; + const finalizeAssistant = async ( + flushed: AssistantFlush, + ): Promise => { + if (finalized) return; + finalized = true; + const plan = planFinalizeAssistant(assistantId); + try { + // Shared dispatch (see applyFinalize): UPDATE the upfront row, or — when + // the upfront insert failed (kind 'insert') — INSERT the terminal row as + // the only safety against losing the turn entirely. 
+ await applyFinalize( + this.aiChatMessageRepo, + plan, + { chatId, workspaceId: workspace.id, userId: user.id }, + flushed, + ); + } catch (err) { + this.logger.error( + `Failed to finalize assistant message (kind=${plan.kind})`, + err as Error, + ); + } + }; + + // DIAGNOSTIC (Safari stream-drop investigation) — temporary. Measure + // first-chunk latency, the model-silent gap right before a disconnect, and + // how many SSE heartbeats were written, so a Safari drop can be classified + // (idle-gap vs hard wall-clock cap vs slow first chunk). + const streamStartedAt = Date.now(); + let firstModelChunkAt: number | undefined; + let lastModelChunkAt = streamStartedAt; + let heartbeatsSent = 0; + + // NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous + // failure here (or in pipe below) would skip the terminal callbacks, so the + // catch releases the leased external clients to avoid a connection leak. + let result: ReturnType; + try { + result = streamText({ + model, + system, + messages, + tools, + // No maxOutputTokens cap on the agent: tool-call arguments (e.g. a full + // page body for the write tools) are emitted as OUTPUT tokens, so a fixed + // cap would truncate complex tool calls mid-argument. Let the model use its + // natural per-step budget. (Cost/credit limits are an account concern, not + // something to enforce by silently breaking the agent.) + stopWhen: stepCountIs(MAX_AGENT_STEPS), + // Forced finalization: reserve the LAST allowed step for a text-only + // answer. Without this, a turn that spends all its steps on tool calls + // ends with no assistant text (an empty turn). prepareAgentStep forbids + // further tool calls and appends a synthesis instruction on that step, + // concatenated onto the original `system` so the persona is preserved. 
+ prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system), + // #184: the RUN's signal (explicit-stop) when a run wraps this turn, else + // the socket-bound signal (legacy). A browser disconnect aborts only in + // the legacy path. + abortSignal: effectiveSignal, + onChunk: ({ chunk }) => { + // DIAGNOSTIC (Safari stream-drop investigation) — temporary. Any model + // output chunk means the stream is actively emitting bytes; track first + // + most-recent activity timestamps. + const now = Date.now(); + firstModelChunkAt ??= now; + lastModelChunkAt = now; + // 'text-delta' is the assistant's prose; tool-call args are separate chunk + // types — so this mirrors exactly what streams to the client. + if (chunk.type === 'text-delta') inProgressText += chunk.text; + }, + onStepFinish: (step) => { + // The finished step's full text is now in `step.text`; fold it in and reset + // the in-progress accumulator for the next step. + capturedSteps.push(step as StepLike); + inProgressText = ''; + // Step-granular durability (#183): persist this finished step (its text + + // tool calls + tool RESULTS) the moment it ends, so a process death after + // this point still recovers the step. Not awaited here (never block the + // stream), but SERIALIZED via stepUpdateChain so the writes commit in + // step order; updateStreaming is error-tolerant (logs + swallows). + stepUpdateChain = stepUpdateChain.then(() => updateStreaming()); + // #184: persist the run's progress (finished-step count). Fire-and- + // forget; the hook swallows its own errors. + if (runId) runHooks?.onStep?.(runId, capturedSteps.length); + }, + onFinish: async ({ + text, + finishReason, + totalUsage, + usage, + steps, + }) => { + // DIAGNOSTIC (Safari stream-drop investigation) — temporary: success + // baseline for Safari comparison. 
+ const diagNow = Date.now(); + this.logger.log( + `AI chat stream DIAGNOSTIC (finish): elapsed=${diagNow - streamStartedAt}ms ` + + `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + + `heartbeatsSent=${heartbeatsSent} steps=${steps.length}`, ); - } - }, - onError: async ({ error }) => { - // NestJS Logger.error(message, stack?, context?): pass the real message - // (with statusCode when present) + the stack string, not the Error - // object, so the actual provider cause is clearly logged. Reuse the - // shared formatter so provider error formatting stays unified. - const e = error as { stack?: string }; - const errorText = describeProviderError(error, String(error)); - this.logger.error(`AI chat stream error: ${errorText}`, e?.stack); - // DIAGNOSTIC (Safari stream-drop investigation) — temporary: timing of - // an error-terminated stream. - const diagNow = Date.now(); - this.logger.warn( - `AI chat stream DIAGNOSTIC (error): elapsed=${diagNow - streamStartedAt}ms ` + - `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + - `silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent}`, - ); - // Finalize the PARTIAL answer streamed before the failure (text + any - // finished tool steps) WITH the error in metadata, so the turn shows what - // the user already saw plus the cause — not just a bare error. Status - // 'error' (#183). - await finalizeAssistant( - flushAssistant(capturedSteps, inProgressText, 'error', { - error: errorText, - }), - ); - // #184: settle the RUN as failed, carrying the provider/transport cause. - if (runId) await runHooks?.onSettled?.(runId, 'error', errorText); - await closeExternalClients(); - }, - onAbort: async ({ steps }) => { - const partialChars = - capturedSteps.reduce((n, s) => n + (s.text?.length ?? 
0), 0) + - inProgressText.length; - // Unlike onError/onFinish, this terminal path otherwise writes nothing, so - // an aborted turn (client disconnect / proxy drop / stop()) would be - // invisible in the logs. Log it (warn) so the abort is traceable. - this.logger.warn( - `AI chat stream aborted (chat ${chatId}) after ${steps.length} ` + - `step(s), ${partialChars} chars partial text; persisting partial turn.`, - ); - // DIAGNOSTIC (Safari stream-drop investigation) — temporary: THE key - // line — classifies the Safari drop. - const diagNow = Date.now(); - this.logger.warn( - `AI chat stream DIAGNOSTIC (abort/disconnect): elapsed=${diagNow - streamStartedAt}ms ` + - `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + - `silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent} ` + - `steps=${steps.length}`, - ); - await finalizeAssistant( - flushAssistant(capturedSteps, inProgressText, 'aborted'), - ); - // #184: settle the RUN as aborted (an explicit user stop reached the - // run's signal; a disconnect does not abort a run-wrapped turn). - if (runId) await runHooks?.onSettled?.(runId, 'aborted'); - await closeExternalClients(); - }, - }); + // Finalize the assistant row (#183): the upfront 'streaming' row is + // UPDATEd to 'completed' with the turn's final text, cumulative usage and + // full UIMessage parts. We pass the SDK `steps` (which carry the final + // step's text) as the captured steps so metadata.parts matches the + // pre-#183 onFinish record exactly; `inProgressText` is '' here (the last + // step already finished). Final-step usage (usage.input+output) ≈ the + // conversation's CURRENT context size, distinct from totalUsage. + // + // COLUMN-SEMANTICS NOTE (#183): `content` is built by flushAssistant as + // the CONCATENATION of every step's text (stepsText), whereas pre-#183 + // it stored only the FINAL step's text. 
This is a deliberate, harmless + // change: the UI and the Markdown export render from `metadata.parts` + // (per-step text + tool parts), not from `content`; `content` is the + // plain-text projection (full-text search / fallback). A multi-step + // turn's `content` therefore now holds all steps' prose, not just the + // last block. + await finalizeAssistant( + flushAssistant(steps as StepLike[], '', 'completed', { + finishReason: finishReason as string, + usage: totalUsage as StreamUsage, + contextTokens: + (usage?.inputTokens ?? 0) + (usage?.outputTokens ?? 0) || + undefined, + // Max context window for the chat header badge denominator; + // resolved from the admin-configured provider settings (in + // closure scope here). Omitted/0 = no limit. + maxContextTokens: resolved?.chatContextWindow, + }), + ); + // #184: settle the RUN as succeeded (best-effort, after the projection + // is finalized above). + if (runId) await runHooks?.onSettled?.(runId, 'completed'); + // Lifecycle: release the external MCP clients leased for this turn. + await closeExternalClients(); - // Drain the stream independently of the client socket so the turn always - // runs to completion (or to its abort) and the terminal callbacks - // (onFinish/onError/onAbort) fire — releasing the per-turn object graph - // (history, the per-request toolset closures, captured steps, SDK buffers) - // and closing leased MCP clients. WITHOUT this, a client disconnect leaves - // the pipe's dead socket as the only reader; backpressure stalls the stream, - // the callbacks never run, and every dropped turn stays rooted in memory — - // the heap-OOM leak. consumeStream removes that backpressure (AI SDK v6 - // "Handling client disconnects"). NOT awaited (fire-and-forget); the stream - // errors are already logged by the streamText `onError` callback above, so - // swallow here to avoid an unhandledRejection. 
- void result.consumeStream({ onError: () => undefined }); + // Generate the chat title for a freshly created chat AFTER the stream's + // provider call has completed — NOT concurrently with it. The z.ai coding + // endpoint stalls one of two concurrent requests to the same plan, which + // black-holed the chat stream (~300s headers timeout) when title + // generation raced it. Running it here (solo, fire-and-forget) avoids the + // race; never block the turn on it, swallow any error. + if (isNewChat && incomingText) { + void this.generateTitle(chatId, workspace.id, incomingText).catch( + (err) => { + this.logger.warn( + `Title generation failed: ${(err as Error)?.message ?? err}`, + ); + }, + ); + } + }, + onError: async ({ error }) => { + // NestJS Logger.error(message, stack?, context?): pass the real message + // (with statusCode when present) + the stack string, not the Error + // object, so the actual provider cause is clearly logged. Reuse the + // shared formatter so provider error formatting stays unified. + const e = error as { stack?: string }; + const errorText = describeProviderError(error, String(error)); + this.logger.error(`AI chat stream error: ${errorText}`, e?.stack); + // DIAGNOSTIC (Safari stream-drop investigation) — temporary: timing of + // an error-terminated stream. + const diagNow = Date.now(); + this.logger.warn( + `AI chat stream DIAGNOSTIC (error): elapsed=${diagNow - streamStartedAt}ms ` + + `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + + `silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent}`, + ); + // Finalize the PARTIAL answer streamed before the failure (text + any + // finished tool steps) WITH the error in metadata, so the turn shows what + // the user already saw plus the cause — not just a bare error. Status + // 'error' (#183). 
+ await finalizeAssistant( + flushAssistant(capturedSteps, inProgressText, 'error', { + error: errorText, + }), + ); + // #184: settle the RUN as failed, carrying the provider/transport cause. + if (runId) await runHooks?.onSettled?.(runId, 'error', errorText); + await closeExternalClients(); + }, + onAbort: async ({ steps }) => { + const partialChars = + capturedSteps.reduce((n, s) => n + (s.text?.length ?? 0), 0) + + inProgressText.length; + // Unlike onError/onFinish, this terminal path otherwise writes nothing, so + // an aborted turn (client disconnect / proxy drop / stop()) would be + // invisible in the logs. Log it (warn) so the abort is traceable. + this.logger.warn( + `AI chat stream aborted (chat ${chatId}) after ${steps.length} ` + + `step(s), ${partialChars} chars partial text; persisting partial turn.`, + ); + // DIAGNOSTIC (Safari stream-drop investigation) — temporary: THE key + // line — classifies the Safari drop. + const diagNow = Date.now(); + this.logger.warn( + `AI chat stream DIAGNOSTIC (abort/disconnect): elapsed=${diagNow - streamStartedAt}ms ` + + `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + + `silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent} ` + + `steps=${steps.length}`, + ); + await finalizeAssistant( + flushAssistant(capturedSteps, inProgressText, 'aborted'), + ); + // #184: settle the RUN as aborted (an explicit user stop reached the + // run's signal; a disconnect does not abort a run-wrapped turn). + if (runId) await runHooks?.onSettled?.(runId, 'aborted'); + await closeExternalClients(); + }, + }); - // Stream the UI-message protocol straight to the hijacked Node response. - // Without onError the AI SDK masks the cause ('An error occurred.') and the - // UI shows a generic failure. Surface the real provider message instead. 
- // AI SDK error messages / 4xx bodies never contain the API key, so this is - // safe; we never dump the resolved config/apiKey. - // - // SSE buffering / proxy note: pipeUIMessageStreamToResponse writes the - // headers immediately (res.writeHead) and each chunk incrementally, and the - // SDK's default UI_MESSAGE_STREAM_HEADERS already include - // `x-accel-buffering: no` (disables nginx response buffering) plus - // `content-type: text/event-stream` and `cache-control: no-cache`. We pass - // `headers` explicitly anyway so the intent is visible here and survives any - // future change to the SDK defaults (prepareHeaders only fills a header when - // absent, so this never clobbers the SDK's content-type). DEPLOYMENT: the - // reverse proxy in front of this server MUST NOT buffer this route, or the - // whole response is released at once and nothing streams. nginx honours the - // `x-accel-buffering: no` header we send (and additionally set - // `proxy_buffering off; proxy_cache off;` for /api/ai-chat/stream); traefik - // does not buffer responses by default. - // Scrub the SDK's hop-by-hop Connection header before it writes the head (Safari/HTTP2). - stripStreamingHopByHopHeaders(res.raw); - // Running sum of per-step usage (v6 `finish-step.usage` is per-step). Sent - // as the cumulative authoritative usage so the client never jumps DOWN. - let cumulativeStepUsage: ChatStreamUsage | undefined; - result.pipeUIMessageStreamToResponse(res.raw, { - headers: { 'X-Accel-Buffering': 'no' }, - // Surface the authoritative chatId on the streamed assistant UI message so - // the client adopts the REAL id of the row we created, instead of guessing - // the newest chat in its list. 
`messageMetadata` is invoked by the AI SDK - // on the `start`, `finish-step` and `finish` stream parts (ai@6 — note the - // `finish-step` trigger relies on it being delivered as its own - // message-metadata chunk); we attach `chatId` on the `start` part so it - // reaches the client (as message.metadata.chatId) at the very first chunk — - // before any second tab can race a newer chat into the list. This fixes the - // two-tab "adoption race" (#137). + // Drain the stream independently of the client socket so the turn always + // runs to completion (or to its abort) and the terminal callbacks + // (onFinish/onError/onAbort) fire — releasing the per-turn object graph + // (history, the per-request toolset closures, captured steps, SDK buffers) + // and closing leased MCP clients. WITHOUT this, a client disconnect leaves + // the pipe's dead socket as the only reader; backpressure stalls the stream, + // the callbacks never run, and every dropped turn stays rooted in memory — + // the heap-OOM leak. consumeStream removes that backpressure (AI SDK v6 + // "Handling client disconnects"). NOT awaited (fire-and-forget); the stream + // errors are already logged by the streamText `onError` callback above, so + // swallow here to avoid an unhandledRejection. + void result.consumeStream({ onError: () => undefined }); + + // Stream the UI-message protocol straight to the hijacked Node response. + // Without onError the AI SDK masks the cause ('An error occurred.') and the + // UI shows a generic failure. Surface the real provider message instead. + // AI SDK error messages / 4xx bodies never contain the API key, so this is + // safe; we never dump the resolved config/apiKey. // - // `finish-step.usage` is PER-STEP (not cumulative) in v6, and the client - // merges each metadata.usage by replacement — so on a multi-step agent turn - // (up to MAX_AGENT_STEPS) the naive per-step value would make the live - // counter jump DOWN at each boundary. 
We keep a running sum here and send - // the CUMULATIVE usage, which converges to `finish.totalUsage` (#151). - messageMetadata: ({ part }) => { - const p = part as StreamMetadataPart; - if (p.type === 'finish-step') { - cumulativeStepUsage = accumulateStepUsage( - cumulativeStepUsage, - normalizeStreamUsage(p.usage), - ); - } - return chatStreamMetadata(p, chatId, cumulativeStepUsage, runId); - }, - // Stream reasoning (thinking) parts to the client so the live counter can - // estimate reasoning tokens from streamed text. v6 default is already - // true; set explicitly so the intent survives any future SDK default - // change. Providers that don't emit reasoning text still surface the - // count via the authoritative `usage.reasoningTokens` on finish-step. - sendReasoning: true, - onError: (error: unknown) => { - // Reuse the shared formatter so provider error formatting stays - // unified between the log line and the streamed error message. - return describeProviderError(error, 'AI stream error'); - }, - }); + // SSE buffering / proxy note: pipeUIMessageStreamToResponse writes the + // headers immediately (res.writeHead) and each chunk incrementally, and the + // SDK's default UI_MESSAGE_STREAM_HEADERS already include + // `x-accel-buffering: no` (disables nginx response buffering) plus + // `content-type: text/event-stream` and `cache-control: no-cache`. We pass + // `headers` explicitly anyway so the intent is visible here and survives any + // future change to the SDK defaults (prepareHeaders only fills a header when + // absent, so this never clobbers the SDK's content-type). DEPLOYMENT: the + // reverse proxy in front of this server MUST NOT buffer this route, or the + // whole response is released at once and nothing streams. nginx honours the + // `x-accel-buffering: no` header we send (and additionally set + // `proxy_buffering off; proxy_cache off;` for /api/ai-chat/stream); traefik + // does not buffer responses by default. 
+ // Scrub the SDK's hop-by-hop Connection header before it writes the head (Safari/HTTP2). + stripStreamingHopByHopHeaders(res.raw); + // Running sum of per-step usage (v6 `finish-step.usage` is per-step). Sent + // as the cumulative authoritative usage so the client never jumps DOWN. + let cumulativeStepUsage: ChatStreamUsage | undefined; + result.pipeUIMessageStreamToResponse(res.raw, { + headers: { 'X-Accel-Buffering': 'no' }, + // Surface the authoritative chatId on the streamed assistant UI message so + // the client adopts the REAL id of the row we created, instead of guessing + // the newest chat in its list. `messageMetadata` is invoked by the AI SDK + // on the `start`, `finish-step` and `finish` stream parts (ai@6 — note the + // `finish-step` trigger relies on it being delivered as its own + // message-metadata chunk); we attach `chatId` on the `start` part so it + // reaches the client (as message.metadata.chatId) at the very first chunk — + // before any second tab can race a newer chat into the list. This fixes the + // two-tab "adoption race" (#137). + // + // `finish-step.usage` is PER-STEP (not cumulative) in v6, and the client + // merges each metadata.usage by replacement — so on a multi-step agent turn + // (up to MAX_AGENT_STEPS) the naive per-step value would make the live + // counter jump DOWN at each boundary. We keep a running sum here and send + // the CUMULATIVE usage, which converges to `finish.totalUsage` (#151). + messageMetadata: ({ part }) => { + const p = part as StreamMetadataPart; + if (p.type === 'finish-step') { + cumulativeStepUsage = accumulateStepUsage( + cumulativeStepUsage, + normalizeStreamUsage(p.usage), + ); + } + return chatStreamMetadata(p, chatId, cumulativeStepUsage, runId); + }, + // Stream reasoning (thinking) parts to the client so the live counter can + // estimate reasoning tokens from streamed text. v6 default is already + // true; set explicitly so the intent survives any future SDK default + // change. 
Providers that don't emit reasoning text still surface the + // count via the authoritative `usage.reasoningTokens` on finish-step. + sendReasoning: true, + onError: (error: unknown) => { + // Reuse the shared formatter so provider error formatting stays + // unified between the log line and the streamed error message. + return describeProviderError(error, 'AI stream error'); + }, + }); - // Force the status line + headers onto the socket NOW (before the model's - // first token), so the proxy sees the response start immediately even if the - // provider's first chunk is delayed. writeToServerResponse already called - // writeHead synchronously above; flushHeaders is a belt-and-braces no-op once - // headers are sent, and is guarded for response-likes that lack it. - res.raw.flushHeaders?.(); - // Heartbeat: keep the SSE stream progressing during silent tool/think gaps (Safari/proxy idle timeout). - // DIAGNOSTIC (Safari stream-drop investigation) — temporary: count beats so a disconnect log can show - // how many pings were written before Safari dropped. - startSseHeartbeat(res.raw, 15_000, () => { - heartbeatsSent += 1; - }); + // Force the status line + headers onto the socket NOW (before the model's + // first token), so the proxy sees the response start immediately even if the + // provider's first chunk is delayed. writeToServerResponse already called + // writeHead synchronously above; flushHeaders is a belt-and-braces no-op once + // headers are sent, and is guarded for response-likes that lack it. + res.raw.flushHeaders?.(); + // Heartbeat: keep the SSE stream progressing during silent tool/think gaps (Safari/proxy idle timeout). + // DIAGNOSTIC (Safari stream-drop investigation) — temporary: count beats so a disconnect log can show + // how many pings were written before Safari dropped. 
+ startSseHeartbeat(res.raw, 15_000, () => { + heartbeatsSent += 1; + }); + } catch (err) { + // Synchronous failure before/while wiring the stream: the terminal + // callbacks will not run, so release the leased external clients here and + // re-throw for the controller to surface on the socket. + await closeExternalClients(); + throw err; + } } catch (err) { - // Synchronous failure before/while wiring the stream: the terminal - // callbacks will not run, so release the leased external clients here and - // re-throw for the controller to surface on the socket. - await closeExternalClients(); + // #184 safety net (see the opening comment): settle the run on ANY failure + // before streamText's callbacks own the lifecycle, so the run row never + // stays 'running' forever (which would 409 every later turn in this chat). + // finalizeRun (onSettled) is idempotent — a settle here and a settle from a + // streamText callback collapse to a single terminal write. + if (runId) { + await runHooks?.onSettled?.( + runId, + 'error', + err instanceof Error + ? err.message + : 'Agent run failed before streaming started', + ); + } throw err; } } @@ -955,7 +993,10 @@ export class AiChatService implements OnModuleInit { * permission). The content is truncated to keep the prompt cheap and within * context limits. Throws AiNotConfiguredException (503) if AI is unconfigured. */ - async generatePageTitle(workspaceId: string, content: string): Promise { + async generatePageTitle( + workspaceId: string, + content: string, + ): Promise { const model = await this.ai.getChatModel(workspaceId); const { text } = await generateText({ model, diff --git a/apps/server/src/database/types/entity.types.ts b/apps/server/src/database/types/entity.types.ts index 0474ab9e..54c0753d 100644 --- a/apps/server/src/database/types/entity.types.ts +++ b/apps/server/src/database/types/entity.types.ts @@ -56,16 +56,12 @@ export type UpdatableAiChat = Updateable>; // full-text search. 
It is omitted from the public type so it never leaks // into HTTP responses or the chat history fed to the language model. export type AiChatMessage = Omit, 'tsv'>; -export type InsertableAiChatMessage = Omit< - Insertable, - 'tsv' ->; +export type InsertableAiChatMessage = Omit, 'tsv'>; // AI Chat Run (#184 phase 1): the agent run as a first-class lifecycle object, // detached from the HTTP request / browser window. export type AiChatRun = Selectable; export type InsertableAiChatRun = Insertable; -export type UpdatableAiChatRun = Updateable>; // AI Provider Credentials // SECURITY (D9/§8.1): holds encrypted per-workspace provider API keys. @@ -211,11 +207,14 @@ export type UpdatableFavorite = Updateable>; // Page Transclusion export type PageTransclusion = Selectable; export type InsertablePageTransclusion = Insertable; -export type UpdatablePageTransclusion = Updateable>; +export type UpdatablePageTransclusion = Updateable< + Omit +>; // Page Transclusion Reference export type PageTransclusionReference = Selectable; -export type InsertablePageTransclusionReference = Insertable; +export type InsertablePageTransclusionReference = + Insertable; export type UpdatablePageTransclusionReference = Updateable< Omit >; @@ -285,7 +284,9 @@ export type UpdatablePagePermission = Updateable>; // Page Verification export type PageVerification = Selectable<_PageVerifications>; export type InsertablePageVerification = Insertable<_PageVerifications>; -export type UpdatablePageVerification = Updateable>; +export type UpdatablePageVerification = Updateable< + Omit<_PageVerifications, 'id'> +>; // Page Verifier export type PageVerifier = Selectable<_PageVerifiers>;