diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts
new file mode 100644
index 00000000..7e93013b
--- /dev/null
+++ b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts
@@ -0,0 +1,194 @@
+import { Logger } from '@nestjs/common';
+import {
+  AiChatRunService,
+  mapTurnStatusToRun,
+} from './ai-chat-run.service';
+
+/**
+ * Unit coverage for the #184 phase-1 run lifecycle (AiChatRunService) with a
+ * hand-rolled mock repo — no Nest graph, no DB. The invariant under test is the
+ * one that makes a run "autonomous": a run keeps going when its SUBSCRIBER (the
+ * browser) detaches, and ONLY an explicit stop aborts it. We assert that at the
+ * abort-signal level (the signal the agent loop actually consumes).
+ */
+
+/** Hand-rolled AiChatRunRepo stand-in: every method is a jest.fn with a benign
+ * default; `overrides` replaces individual methods per test. */
+function makeRepo(overrides: Record<string, unknown> = {}) {
+  return {
+    insert: jest.fn(async (v: any) => ({
+      id: 'run-1',
+      status: v.status ?? 'running',
+      chatId: v.chatId,
+      workspaceId: v.workspaceId,
+    })),
+    update: jest.fn(async () => ({ id: 'run-1' })),
+    markStopRequested: jest.fn(async () => ({ id: 'run-1' })),
+    findActiveByChat: jest.fn(async () => undefined),
+    findLatestByChat: jest.fn(async () => undefined),
+    findById: jest.fn(async () => undefined),
+    sweepRunning: jest.fn(async () => 0),
+    ...overrides,
+  };
+}
+
+describe('mapTurnStatusToRun', () => {
+  it('maps the turn terminal status to the run terminal status', () => {
+    expect(mapTurnStatusToRun('completed')).toBe('succeeded');
+    expect(mapTurnStatusToRun('error')).toBe('failed');
+    expect(mapTurnStatusToRun('aborted')).toBe('aborted');
+  });
+});
+
+describe('AiChatRunService.onModuleInit (startup sweep)', () => {
+  afterEach(() => jest.restoreAllMocks());
+
+  it('calls sweepRunning and resolves; logs when > 0', async () => {
+    const repo = makeRepo({ sweepRunning: jest.fn(async () => 2) });
+    const logSpy = jest
+      .spyOn(Logger.prototype, 'log')
+      .mockImplementation(() => undefined);
+    const svc = new AiChatRunService(repo as never);
+    await expect(svc.onModuleInit()).resolves.toBeUndefined();
+    expect(repo.sweepRunning).toHaveBeenCalledTimes(1);
+    expect(logSpy).toHaveBeenCalledTimes(1);
+    expect(String(logSpy.mock.calls[0][0])).toContain('2');
+  });
+
+  it('a sweep failure is swallowed (never blocks startup)', async () => {
+    const repo = makeRepo({
+      sweepRunning: jest.fn(async () => {
+        throw new Error('db down');
+      }),
+    });
+    const warnSpy = jest
+      .spyOn(Logger.prototype, 'warn')
+      .mockImplementation(() => undefined);
+    const svc = new AiChatRunService(repo as never);
+    await expect(svc.onModuleInit()).resolves.toBeUndefined();
+    expect(warnSpy).toHaveBeenCalledTimes(1);
+    expect(String(warnSpy.mock.calls[0][0])).toContain('db down');
+  });
+});
+
+describe('AiChatRunService run lifecycle', () => {
+  // Undo the Logger.prototype spy installed by the best-effort test below.
+  afterEach(() => jest.restoreAllMocks());
+
+  it('beginRun inserts a running row and registers a live abort controller', async () => {
+    const repo = makeRepo();
+    const svc = new AiChatRunService(repo as never);
+    const handle = await svc.beginRun({
+      chatId: 'chat-1',
+      workspaceId: 'ws-1',
+      userId: 'user-1',
+    });
+    expect(repo.insert).toHaveBeenCalledWith(
+      expect.objectContaining({
+        chatId: 'chat-1',
+        workspaceId: 'ws-1',
+        createdBy: 'user-1',
+        status: 'running',
+        trigger: 'user',
+      }),
+    );
+    expect(handle.runId).toBe('run-1');
+    expect(handle.signal.aborted).toBe(false);
+    expect(svc.isLocallyActive('run-1')).toBe(true);
+  });
+
+  it('a SUBSCRIBER detaching does NOT abort the run (only an explicit stop does)', async () => {
+    const repo = makeRepo();
+    const svc = new AiChatRunService(repo as never);
+    const handle = await svc.beginRun({
+      chatId: 'chat-1',
+      workspaceId: 'ws-1',
+      userId: 'user-1',
+    });
+    // Model a browser disconnect: nothing in the run service is told to stop.
+    // The signal the agent loop consumes must stay un-aborted and the run stays
+    // locally active — i.e. it keeps running server-side.
+    expect(handle.signal.aborted).toBe(false);
+    expect(svc.isLocallyActive('run-1')).toBe(true);
+    // markStopRequested was never called by a mere detach.
+    expect(repo.markStopRequested).not.toHaveBeenCalled();
+  });
+
+  it('requestStop aborts the live controller, marks the row, and reports true', async () => {
+    const repo = makeRepo();
+    const svc = new AiChatRunService(repo as never);
+    const handle = await svc.beginRun({
+      chatId: 'chat-1',
+      workspaceId: 'ws-1',
+      userId: 'user-1',
+    });
+    const aborted = jest.fn();
+    handle.signal.addEventListener('abort', aborted);
+
+    const result = await svc.requestStop('run-1', 'ws-1');
+
+    expect(result).toBe(true);
+    expect(handle.signal.aborted).toBe(true);
+    expect(aborted).toHaveBeenCalledTimes(1);
+    expect(repo.markStopRequested).toHaveBeenCalledWith('run-1', 'ws-1');
+  });
+
+  it('requestStop on a run this replica does NOT hold still marks the row (true)', async () => {
+    // e.g. after a restart, or a sibling replica owns the controller. The row is
+    // marked so the owning replica/sweep settles it; we report a stop took effect.
+    const repo = makeRepo({
+      markStopRequested: jest.fn(async () => ({ id: 'run-9' })),
+    });
+    const svc = new AiChatRunService(repo as never);
+    const result = await svc.requestStop('run-9', 'ws-1');
+    expect(result).toBe(true);
+    expect(svc.isLocallyActive('run-9')).toBe(false);
+  });
+
+  it('requestStop on an already-settled run (nothing active) reports false', async () => {
+    const repo = makeRepo({
+      markStopRequested: jest.fn(async () => undefined),
+    });
+    const svc = new AiChatRunService(repo as never);
+    const result = await svc.requestStop('run-done', 'ws-1');
+    expect(result).toBe(false);
+  });
+
+  it('finalizeRun settles the row to the mapped status with finishedAt and drops the in-memory entry', async () => {
+    const repo = makeRepo();
+    const svc = new AiChatRunService(repo as never);
+    await svc.beginRun({
+      chatId: 'chat-1',
+      workspaceId: 'ws-1',
+      userId: 'user-1',
+    });
+    expect(svc.isLocallyActive('run-1')).toBe(true);
+
+    await svc.finalizeRun('run-1', 'ws-1', 'error', 'provider blew up');
+
+    expect(svc.isLocallyActive('run-1')).toBe(false);
+    expect(repo.update).toHaveBeenCalledWith(
+      'run-1',
+      'ws-1',
+      expect.objectContaining({
+        status: 'failed',
+        error: 'provider blew up',
+        finishedAt: expect.any(Date),
+      }),
+    );
+  });
+
+  it('recordStep / linkAssistantMessage are best-effort: a repo failure is swallowed', async () => {
+    const repo = makeRepo({
+      update: jest.fn(async () => {
+        throw new Error('transient');
+      }),
+    });
+    jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => undefined);
+    const svc = new AiChatRunService(repo as never);
+    await expect(svc.recordStep('run-1', 'ws-1', 3)).resolves.toBeUndefined();
+    await expect(
+      svc.linkAssistantMessage('run-1', 'ws-1', 'msg-1'),
+    ).resolves.toBeUndefined();
+  });
+});
diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.ts
new file mode 100644
index 00000000..e23668bb
--- /dev/null
+++
b/apps/server/src/core/ai-chat/ai-chat-run.service.ts
@@ -0,0 +1,263 @@
+import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
+import { AiChatRunRepo } from '@docmost/db/repos/ai-chat/ai-chat-run.repo';
+import { AiChatRun } from '@docmost/db/types/entity.types';
+
+/**
+ * The terminal status of a TURN (the #183 assistant-row lifecycle) maps onto the
+ * terminal status of a RUN (#184). A turn that completed -> the run succeeded; a
+ * turn that errored -> the run failed; a turn aborted (explicit user stop) -> the
+ * run aborted. Pure + unit-testable.
+ */
+export type TurnTerminalStatus = 'completed' | 'error' | 'aborted';
+export type RunTerminalStatus = 'succeeded' | 'failed' | 'aborted';
+
+export function mapTurnStatusToRun(
+  status: TurnTerminalStatus,
+): RunTerminalStatus {
+  switch (status) {
+    case 'completed':
+      return 'succeeded';
+    case 'error':
+      return 'failed';
+    case 'aborted':
+      return 'aborted';
+  }
+}
+
+/** An in-flight run held in process memory: its AbortController is the ONLY thing
+ * that can stop the turn (an explicit user stop), independent of the browser
+ * socket. A mere disconnect never touches it, so the run keeps going. */
+interface ActiveRun {
+  controller: AbortController;
+  chatId: string;
+  workspaceId: string;
+}
+
+/** The live handle the streaming path drives a run through (returned by
+ * {@link AiChatRunService.beginRun}). The `signal` governs the agent loop's
+ * abort — wired to the run, NOT to the HTTP socket. */
+export interface RunHandle {
+  runId: string;
+  signal: AbortSignal;
+}
+
+/**
+ * AiChatRunService (#184 phase 1) — owns the agent RUN as a first-class,
+ * server-side lifecycle object detached from the HTTP request / browser window.
+ *
+ * Responsibilities:
+ * - create a run row when a turn starts (pending -> running) and register an
+ *   in-memory AbortController for it (the explicit-stop lever);
+ * - finalize the run row (succeeded / failed / aborted) and unregister it;
+ * - service an EXPLICIT user stop (`requestStop`) — the ONLY thing that aborts a
+ *   run; a browser disconnect deliberately does NOT;
+ * - crash-recovery sweep of dangling runs on startup.
+ *
+ * The agent loop itself still runs in AiChatService.stream (reusing #183's
+ * step-granular durable write path, `consumeStream` already drains it independent
+ * of the socket); this service only wraps it in a durable lifecycle and an
+ * abort handle that outlives the subscriber.
+ */
+@Injectable()
+export class AiChatRunService implements OnModuleInit {
+  private readonly logger = new Logger(AiChatRunService.name);
+
+  // runId -> ActiveRun. Process-local on purpose (phase 1 is single-process /
+  // in-memory transport; a cross-process BullMQ runner + Redis stop-signal is
+  // deferred to phase 2). A stop for a runId not in this map (e.g. after a
+  // restart) still records `stop_requested_at` on the row.
+  private readonly active = new Map<string, ActiveRun>();
+
+  constructor(private readonly runRepo: AiChatRunRepo) {}
+
+  /**
+   * Crash-recovery sweep on server start: any run left pending/running that has
+   * been untouched past the staleness window is the relic of a process that died
+   * mid-turn; flip it to 'aborted'. Best-effort — a sweep failure is logged but
+   * MUST NOT block startup (mirrors AiChatService.onModuleInit for #183).
+   */
+  async onModuleInit(): Promise<void> {
+    try {
+      const swept = await this.runRepo.sweepRunning();
+      if (swept > 0) {
+        this.logger.log(
+          `Startup sweep: marked ${swept} dangling agent run(s) as 'aborted'.`,
+        );
+      }
+    } catch (err) {
+      this.logger.warn(
+        `Startup sweep of dangling runs failed: ${
+          err instanceof Error ? err.message : 'unknown error'
+        }`,
+      );
+    }
+  }
+
+  /**
+   * Start a run for a turn: insert the run row (status 'running', startedAt now),
+   * register a fresh AbortController for it, and return a {@link RunHandle} whose
+   * `signal` the agent loop uses. The DB unique index guarantees at most one
+   * active run per chat — a second concurrent start on the same chat REJECTS at
+   * the insert (the caller surfaces a 409). The controller is registered AFTER a
+   * successful insert so a rejected start leaks nothing.
+   */
+  async beginRun(args: {
+    chatId: string;
+    workspaceId: string;
+    userId: string;
+    trigger?: string;
+  }): Promise<RunHandle> {
+    const run = await this.runRepo.insert({
+      chatId: args.chatId,
+      workspaceId: args.workspaceId,
+      createdBy: args.userId,
+      trigger: args.trigger ?? 'user',
+      status: 'running',
+      startedAt: new Date(),
+    });
+    const controller = new AbortController();
+    this.active.set(run.id, {
+      controller,
+      chatId: args.chatId,
+      workspaceId: args.workspaceId,
+    });
+    return { runId: run.id, signal: controller.signal };
+  }
+
+  /** Link the assistant message (the #183 projection) to its run. Best-effort. */
+  async linkAssistantMessage(
+    runId: string,
+    workspaceId: string,
+    assistantMessageId: string,
+  ): Promise<void> {
+    try {
+      await this.runRepo.update(runId, workspaceId, { assistantMessageId });
+    } catch (err) {
+      this.logger.warn(
+        `Failed to link assistant message to run ${runId}: ${
+          err instanceof Error ? err.message : 'unknown error'
+        }`,
+      );
+    }
+  }
+
+  /** Persist progress: bump the run's finished-step count. Best-effort (never
+   * blocks or breaks the stream). */
+  async recordStep(
+    runId: string,
+    workspaceId: string,
+    stepCount: number,
+  ): Promise<void> {
+    try {
+      await this.runRepo.update(runId, workspaceId, { stepCount });
+    } catch (err) {
+      this.logger.warn(
+        `Failed to record step for run ${runId}: ${
+          err instanceof Error ? err.message : 'unknown error'
+        }`,
+      );
+    }
+  }
+
+  /**
+   * Finalize a run to its terminal status (succeeded / failed / aborted),
+   * stamping finishedAt + any error, and DROP its in-memory entry. Idempotent
+   * and best-effort: the at-most-once turn terminal callbacks call it, but a
+   * double call (or a call after the row was swept) merely re-writes the same
+   * terminal row.
+   */
+  async finalizeRun(
+    runId: string,
+    workspaceId: string,
+    turnStatus: TurnTerminalStatus,
+    error?: string,
+  ): Promise<void> {
+    this.active.delete(runId);
+    try {
+      await this.runRepo.update(runId, workspaceId, {
+        status: mapTurnStatusToRun(turnStatus),
+        finishedAt: new Date(),
+        error: error ?? null,
+      });
+    } catch (err) {
+      this.logger.warn(
+        `Failed to finalize run ${runId}: ${
+          err instanceof Error ? err.message : 'unknown error'
+        }`,
+      );
+    }
+  }
+
+  /**
+   * Request an EXPLICIT stop of a run (the user pressed Stop). This is the ONLY
+   * thing that aborts a run — distinct from a browser disconnect, which leaves
+   * the run going. Records `stop_requested_at` on the row (only while active) and
+   * aborts the in-process controller if this replica owns the run. Returns true
+   * when a stop took effect (row marked and/or controller aborted), false when
+   * there was nothing active to stop.
+   *
+   * A failure to mark the row never prevents aborting a run this replica owns:
+   * the live controller is still aborted and the failure is only logged. The
+   * repo error is rethrown only when there is no local run to stop.
+   */
+  async requestStop(runId: string, workspaceId: string): Promise<boolean> {
+    let marked: unknown;
+    let markFailure: { error: unknown } | undefined;
+    try {
+      marked = await this.runRepo.markStopRequested(runId, workspaceId);
+    } catch (err) {
+      markFailure = { error: err };
+    }
+    const entry = this.active.get(runId);
+    if (entry) {
+      // Abort the live turn -> streamText onAbort fires -> the partial is
+      // persisted (#183) and finalizeRun settles the row as 'aborted'.
+      entry.controller.abort();
+    }
+    if (markFailure) {
+      // Nothing local to stop and the row could not be marked: surface it.
+      if (!entry) throw markFailure.error;
+      this.logger.warn(
+        `Failed to record stop request for run ${runId}: ${
+          markFailure.error instanceof Error
+            ? markFailure.error.message
+            : 'unknown error'
+        }`,
+      );
+    }
+    return Boolean(marked) || Boolean(entry);
+  }
+
+  /** Latest persisted run for a chat — the reconnect target (an in-flight or
+   * finished run). Pure read-through to the repo. */
+  getLatestForChat(
+    chatId: string,
+    workspaceId: string,
+  ): Promise<AiChatRun | undefined> {
+    return this.runRepo.findLatestByChat(chatId, workspaceId);
+  }
+
+  /** Fetch a run by id (workspace-scoped). Used to resolve + ownership-check an
+   * explicit stop targeting a runId. */
+  getRun(
+    runId: string,
+    workspaceId: string,
+  ): Promise<AiChatRun | undefined> {
+    return this.runRepo.findById(runId, workspaceId);
+  }
+
+  /** The active run on a chat, if any (used to reject a concurrent start with a
+   * clean 409 before committing to the stream). */
+  getActiveForChat(
+    chatId: string,
+    workspaceId: string,
+  ): Promise<AiChatRun | undefined> {
+    return this.runRepo.findActiveByChat(chatId, workspaceId);
+  }
+
+  /** Test/diagnostic seam: whether this replica is holding a live controller for
+   * the run. */
+  isLocallyActive(runId: string): boolean {
+    return this.active.has(runId);
+  }
+}
diff --git a/apps/server/src/core/ai-chat/ai-chat.controller.bound-chat.spec.ts b/apps/server/src/core/ai-chat/ai-chat.controller.bound-chat.spec.ts
index 769123a8..aa5adddf 100644
--- a/apps/server/src/core/ai-chat/ai-chat.controller.bound-chat.spec.ts
+++ b/apps/server/src/core/ai-chat/ai-chat.controller.bound-chat.spec.ts
@@ -19,6 +19,7 @@ describe('AiChatController.boundChat', () => {
     };
     const controller = new AiChatController(
       {} as never,
+      {} as never, // aiChatRunService
       aiChatRepo as never,
       {} as never,
       {} as never,
diff --git a/apps/server/src/core/ai-chat/ai-chat.controller.export.spec.ts b/apps/server/src/core/ai-chat/ai-chat.controller.export.spec.ts
index f46aeaa0..4ceba306 100644
--- a/apps/server/src/core/ai-chat/ai-chat.controller.export.spec.ts
+++ b/apps/server/src/core/ai-chat/ai-chat.controller.export.spec.ts
@@ -53,6 +53,7 @@ describe('AiChatController.export', () => {
     };
     const controller = new AiChatController(
       {} as never,
+      {} as never, // aiChatRunService
       aiChatRepo as never,
       aiChatMessageRepo as never,
       {} as never,
diff --git a/apps/server/src/core/ai-chat/ai-chat.controller.ts
b/apps/server/src/core/ai-chat/ai-chat.controller.ts
index 7834d6b2..2506cfe8 100644
--- a/apps/server/src/core/ai-chat/ai-chat.controller.ts
+++ b/apps/server/src/core/ai-chat/ai-chat.controller.ts
@@ -1,6 +1,7 @@
 import {
   BadRequestException,
   Body,
+  ConflictException,
   Controller,
   ForbiddenException,
   HttpCode,
@@ -27,7 +28,12 @@ import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.rep
 import { UserThrottlerGuard } from '../../integrations/throttle/user-throttler.guard';
 import { AI_CHAT_THROTTLER } from '../../integrations/throttle/throttler-names';
 import { FileInterceptor } from '../../common/interceptors/file.interceptor';
-import { AiChatService, AiChatStreamBody } from './ai-chat.service';
+import {
+  AiChatRunHooks,
+  AiChatService,
+  AiChatStreamBody,
+} from './ai-chat.service';
+import { AiChatRunService } from './ai-chat-run.service';
 import { AiTranscriptionService } from './ai-transcription.service';
 import {
   BoundChatDto,
@@ -35,7 +41,9 @@ import {
   ExportChatDto,
   GeneratePageTitleDto,
   GetChatMessagesDto,
+  GetRunDto,
   RenameChatDto,
+  StopRunDto,
 } from './dto/ai-chat.dto';
 import { describeProviderError } from '../../integrations/ai/ai-error.util';
 import { buildChatMarkdown } from './chat-markdown.util';
@@ -52,6 +60,7 @@ export class AiChatController {
 
   constructor(
     private readonly aiChatService: AiChatService,
+    private readonly aiChatRunService: AiChatRunService,
     private readonly aiChatRepo: AiChatRepo,
     private readonly aiChatMessageRepo: AiChatMessageRepo,
     private readonly aiTranscription: AiTranscriptionService,
@@ -137,6 +146,75 @@ export class AiChatController {
     return { markdown };
   }
 
+  /**
+   * Reconnect to the latest run of a chat (#184 phase 1). Returns the run's
+   * persisted lifecycle state ({ status, error, stepCount, timings, ... }) plus
+   * the assistant message it projects (the partial/final output) — the DB is the
+   * source of truth, so this works for an in-flight run (the browser dropped, the
+   * run kept going) and a finished one alike. Owner-gated via assertOwnedChat.
+   * `{ run: null }` when the chat has never had a run.
+   */
+  @HttpCode(HttpStatus.OK)
+  @Post('run')
+  async getRun(
+    @Body() dto: GetRunDto,
+    @AuthUser() user: User,
+    @AuthWorkspace() workspace: Workspace,
+  ): Promise<{ run: unknown; message: unknown }> {
+    await this.assertOwnedChat(dto.chatId, user, workspace);
+    const run = await this.aiChatRunService.getLatestForChat(
+      dto.chatId,
+      workspace.id,
+    );
+    if (!run) return { run: null, message: null };
+    const message = run.assistantMessageId
+      ? await this.aiChatMessageRepo.findById(
+          run.assistantMessageId,
+          workspace.id,
+        )
+      : undefined;
+    return { run, message: message ?? null };
+  }
+
+  /**
+   * Explicitly STOP an agent run (#184 phase 1) — the user pressed Stop. This is
+   * the ONLY thing that ends a detached run; a browser disconnect deliberately
+   * does not. Target by `runId` (from the streamed start metadata) or by `chatId`
+   * (stop whatever run is active on it). Owner-gated. Returns
+   * `{ stopped }` — false when there was nothing active to stop.
+   */
+  @HttpCode(HttpStatus.OK)
+  @Post('stop')
+  async stopRun(
+    @Body() dto: StopRunDto,
+    @AuthUser() user: User,
+    @AuthWorkspace() workspace: Workspace,
+  ): Promise<{ stopped: boolean }> {
+    let runId = dto.runId;
+    if (!runId && !dto.chatId) {
+      throw new BadRequestException('runId or chatId is required');
+    }
+    if (runId) {
+      // Resolve the run to its chat and owner-gate via that chat.
+      const run = await this.aiChatRunService.getRun(runId, workspace.id);
+      if (!run) return { stopped: false };
+      await this.assertOwnedChat(run.chatId, user, workspace);
+    } else {
+      await this.assertOwnedChat(dto.chatId!, user, workspace);
+      const active = await this.aiChatRunService.getActiveForChat(
+        dto.chatId!,
+        workspace.id,
+      );
+      if (!active) return { stopped: false };
+      runId = active.id;
+    }
+    const stopped = await this.aiChatRunService.requestStop(
+      runId,
+      workspace.id,
+    );
+    return { stopped };
+  }
+
   /** Rename a chat. */
   @HttpCode(HttpStatus.OK)
   @Post('rename')
@@ -188,11 +266,20 @@ export class AiChatController {
     @AuthWorkspace() workspace: Workspace,
   ): Promise<void> {
     // A7 gate: the workspace must have AI chat explicitly enabled.
-    const settings = (workspace.settings ?? {}) as { ai?: { chat?: boolean } };
+    const settings = (workspace.settings ?? {}) as {
+      ai?: { chat?: boolean; autonomousRuns?: boolean };
+    };
     if (settings.ai?.chat !== true) {
       throw new ForbiddenException('AI chat is disabled');
     }
 
+    // #184 phase 1 flag: when ON, the turn becomes a detached, durable RUN — its
+    // lifecycle is tracked in ai_chat_runs, a browser disconnect no longer aborts
+    // it, and only an explicit /ai-chat/stop ends it. When OFF (the default) the
+    // turn is socket-bound exactly as before, so existing deployments are
+    // unaffected.
+    const autonomousRuns = settings.ai?.autonomousRuns === true;
+
     const sessionId = (req.raw as { sessionId?: string }).sessionId;
     if (!sessionId) {
       // The chat requires an interactive session to mint loopback tokens
@@ -216,6 +303,57 @@ export class AiChatController {
     // HttpException) instead of breaking mid-stream.
     const model = await this.aiChatService.getChatModel(workspace.id, role);
 
+    // #184: one active run per chat. For an EXISTING chat reject a concurrent
+    // start with a clean 409 BEFORE hijack (the common double-submit / second-tab
+    // case), so the user gets JSON, not a mid-stream error. A brand-new chat
+    // (no chatId) cannot have a prior run, and the DB partial unique index is the
+    // backstop against any race that slips past this check.
+    if (autonomousRuns && body.chatId) {
+      const active = await this.aiChatRunService.getActiveForChat(
+        body.chatId,
+        workspace.id,
+      );
+      if (active) {
+        throw new ConflictException(
+          'An agent run is already in progress for this chat',
+        );
+      }
+    }
+
+    // Run-lifecycle hooks (#184), only when the flag is on. They wrap the turn in
+    // a durable run whose abort is governed by the run (explicit stop), persist
+    // its progress, and settle its terminal status — see AiChatRunService.
+    const runHooks: AiChatRunHooks | undefined = autonomousRuns
+      ? {
+          begin: (chatId) =>
+            this.aiChatRunService.beginRun({
+              chatId,
+              workspaceId: workspace.id,
+              userId: user.id,
+              trigger: 'user',
+            }),
+          onAssistantSeeded: (runId, messageId) =>
+            this.aiChatRunService.linkAssistantMessage(
+              runId,
+              workspace.id,
+              messageId,
+            ),
+          onStep: (runId, stepCount) =>
+            void this.aiChatRunService.recordStep(
+              runId,
+              workspace.id,
+              stepCount,
+            ),
+          onSettled: (runId, status, error) =>
+            this.aiChatRunService.finalizeRun(
+              runId,
+              workspace.id,
+              status,
+              error,
+            ),
+        }
+      : undefined;
+
     // Abort the agent loop when the client disconnects. `close` also fires on
     // normal completion, so only abort when the response has not finished
     // writing (a genuine disconnect). `once` fires at most once and self-removes;
@@ -230,18 +368,44 @@ export class AiChatController {
       // A genuine disconnect leaves the response unfinished (unlike a normal
       // completion, which also fires `close`). Such a drop — e.g. a reverse
       // proxy cutting the SSE mid-answer — is otherwise invisible server-side,
-      // so log it here before aborting the agent loop.
+      // so log it here.
       if (!res.raw.writableEnded) {
-        this.logger.warn(
-          `AI chat stream: client disconnected before completion; aborting turn ` +
-            `(elapsed=${Date.now() - reqStartedAt}ms since request received)`,
-        );
-        controller.abort();
+        if (autonomousRuns) {
+          // #184: the turn is a DETACHED run. A disconnect must NOT abort it —
+          // the run keeps executing and persisting server-side; the client
+          // reconnects via /ai-chat/run (or re-stops via /ai-chat/stop). Log only.
+          this.logger.log(
+            `AI chat stream: client disconnected; run continues server-side ` +
+              `(elapsed=${Date.now() - reqStartedAt}ms since request received)`,
+          );
+        } else {
+          this.logger.warn(
+            `AI chat stream: client disconnected before completion; aborting turn ` +
+              `(elapsed=${Date.now() - reqStartedAt}ms since request received)`,
+          );
+          controller.abort();
+        }
       }
     };
     req.raw.once('close', onClose);
     res.raw.once('finish', () => req.raw.off('close', onClose));
 
+    // #184: in detached mode the turn is NOT aborted on disconnect, so the SDK's
+    // pipe keeps writing to a socket the client may have dropped — for the rest of
+    // the (continuing) run. A write to the dead socket can emit an 'error' on the
+    // raw response; without a listener that surfaces as an unhandled error event.
+    // Swallow it (the run continues server-side regardless). Legacy mode aborts on
+    // disconnect, so it does not need this and keeps its exact prior behavior.
+    if (autonomousRuns) {
+      res.raw.on('error', (err) => {
+        this.logger.debug(
+          `AI chat detached stream: post-disconnect socket error swallowed: ${
+            err instanceof Error ? err.message : String(err)
+          }`,
+        );
+      });
+    }
+
     // Commit to streaming: hijack so Fastify stops managing the response and
     // the AI SDK can write the UI-message stream directly to the Node socket.
     res.hijack();
@@ -256,6 +420,8 @@ export class AiChatController {
         signal: controller.signal,
         model,
         role,
+        // #184: present only when the flag is on; wraps the turn in a durable run.
+        runHooks,
       });
     } catch (err) {
       // Any failure AFTER hijack can no longer send a clean JSON error, so emit
diff --git a/apps/server/src/core/ai-chat/ai-chat.generate-page-title.spec.ts b/apps/server/src/core/ai-chat/ai-chat.generate-page-title.spec.ts
index ef242fdf..ef94e5b7 100644
--- a/apps/server/src/core/ai-chat/ai-chat.generate-page-title.spec.ts
+++ b/apps/server/src/core/ai-chat/ai-chat.generate-page-title.spec.ts
@@ -57,6 +57,7 @@ describe('AiChatController.generatePageTitle', () => {
     const aiChatService = { generatePageTitle: generate };
     const controller = new AiChatController(
       aiChatService as never,
+      {} as never, // aiChatRunService
       {} as never,
       {} as never,
       {} as never,
diff --git a/apps/server/src/core/ai-chat/ai-chat.module.ts b/apps/server/src/core/ai-chat/ai-chat.module.ts
index b8afd4c1..c0e8aa02 100644
--- a/apps/server/src/core/ai-chat/ai-chat.module.ts
+++ b/apps/server/src/core/ai-chat/ai-chat.module.ts
@@ -3,6 +3,7 @@ import { AiModule } from '../../integrations/ai/ai.module';
 import { TokenModule } from '../auth/token.module';
 import { AiChatController } from './ai-chat.controller';
 import { AiChatService } from './ai-chat.service';
+import { AiChatRunService } from './ai-chat-run.service';
 import { AiTranscriptionService } from './ai-transcription.service';
 import { AiChatToolsService } from './tools/ai-chat-tools.service';
 import { EmbeddingModule } from './embedding/embedding.module';
@@ -42,6 +43,7 @@ import { PublicShareChatToolsService } from './tools/public-share-chat-tools.ser
   controllers: [AiChatController, PublicShareChatController],
   providers: [
     AiChatService,
+    AiChatRunService,
     AiTranscriptionService,
     AiChatToolsService,
     PublicShareChatService,
diff --git a/apps/server/src/core/ai-chat/ai-chat.service.spec.ts b/apps/server/src/core/ai-chat/ai-chat.service.spec.ts
index 4e5ac72a..418ed178 100644
--- a/apps/server/src/core/ai-chat/ai-chat.service.spec.ts
+++ b/apps/server/src/core/ai-chat/ai-chat.service.spec.ts
@@ -371,6 +371,12
@@ describe('chatStreamMetadata', () => { }); }); + it('attaches the runId on the start part when a run wraps the turn (#184)', () => { + expect( + chatStreamMetadata({ type: 'start' }, 'chat-1', undefined, 'run-1'), + ).toEqual({ chatId: 'chat-1', runId: 'run-1' }); + }); + it('returns the CUMULATIVE step usage passed in for the finish-step part', () => { // finish-step usage is per-step in v6; the caller accumulates and passes the // running sum, which this just wraps. diff --git a/apps/server/src/core/ai-chat/ai-chat.service.ts b/apps/server/src/core/ai-chat/ai-chat.service.ts index e4c81584..5095b0f3 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.ts @@ -140,6 +140,33 @@ export interface AiChatStreamBody { messages?: UIMessage[]; } +/** + * Optional run-lifecycle hooks (#184 phase 1). When supplied, the turn is wrapped + * in a first-class server-side RUN: `begin` is called once the chat id is known + * and returns the run's AbortSignal (decoupled from the HTTP socket — a browser + * disconnect no longer governs the abort), and the lifecycle callbacks persist + * the run's progress and terminal status. Absent (the default) => the legacy + * socket-bound behavior is unchanged. + */ +export interface AiChatRunHooks { + // Called once the chat id is resolved; returns the run handle whose `signal` + // drives the agent loop's abort. Returning null disables run tracking (the + // turn falls back to the passed-in socket signal). 
+ begin( + chatId: string, + ): Promise<{ runId: string; signal: AbortSignal } | null>; + onAssistantSeeded?( + runId: string, + assistantMessageId: string, + ): Promise | void; + onStep?(runId: string, stepCount: number): void; + onSettled?( + runId: string, + status: 'completed' | 'error' | 'aborted', + error?: string, + ): Promise | void; +} + export interface AiChatStreamArgs { user: User; workspace: Workspace; @@ -147,6 +174,10 @@ export interface AiChatStreamArgs { body: AiChatStreamBody; res: FastifyReply; signal: AbortSignal; + // Run-lifecycle hooks (#184). When present the turn becomes a detached, + // durable RUN whose abort is governed by the run (explicit stop), not the + // socket; when absent the turn stays socket-bound (legacy behavior). + runHooks?: AiChatRunHooks; // Resolved by the controller BEFORE res.hijack(), so an unconfigured provider // (AiNotConfiguredException -> 503) surfaces as clean JSON before streaming. // For a role with a model override this already carries the override-resolved @@ -303,6 +334,7 @@ export class AiChatService implements OnModuleInit { signal, model, role, + runHooks, }: AiChatStreamArgs): Promise { // Resolve / create the chat. A new chat is created when no valid chatId is // supplied or the supplied one does not belong to this workspace. @@ -347,6 +379,31 @@ export class AiChatService implements OnModuleInit { isNewChat = true; } + // Start the durable RUN now that the chat id is known (#184 phase 1). The + // returned `runId` + `signal` make the turn a first-class server-side object + // whose abort is governed by the run (an explicit user stop), NOT by the HTTP + // socket — so a browser disconnect no longer ends the turn. With no runHooks + // (the default / flag off) the turn stays socket-bound via `signal` and + // `runId` is undefined, leaving the legacy path byte-for-byte unchanged. 
+ let runId: string | undefined; + let effectiveSignal = signal; + if (runHooks) { + try { + const handle = await runHooks.begin(chatId); + if (handle) { + runId = handle.runId; + effectiveSignal = handle.signal; + } + } catch (err) { + // A failed run start must not break the turn — fall back to the socket + // signal (legacy behavior) and stream anyway. + this.logger.error( + `Failed to begin agent run (chat ${chatId}); streaming without run tracking`, + err as Error, + ); + } + } + // Extract the incoming user turn (the last user message from useChat). const incoming = lastUserMessage(body.messages); const incomingText = uiMessageText(incoming); @@ -520,6 +577,20 @@ export class AiChatService implements OnModuleInit { ); } + // Link the assistant message (the #183 projection) to its run (#184), so a + // reconnecting client can resolve the run's output. Best-effort. + if (runId && assistantId) { + try { + await runHooks?.onAssistantSeeded?.(runId, assistantId); + } catch (err) { + this.logger.warn( + `Failed to link assistant row to run ${runId}: ${ + err instanceof Error ? err.message : 'unknown error' + }`, + ); + } + } + // Per-step (non-terminal) update: persist the finished steps the moment a // step ends. Tolerant — a failed update is logged and swallowed so it never // throws into the stream. Keeps status 'streaming'. @@ -615,7 +686,10 @@ export class AiChatService implements OnModuleInit { // further tool calls and appends a synthesis instruction on that step, // concatenated onto the original `system` so the persona is preserved. prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system), - abortSignal: signal, + // #184: the RUN's signal (explicit-stop) when a run wraps this turn, else + // the socket-bound signal (legacy). A browser disconnect aborts only in + // the legacy path. + abortSignal: effectiveSignal, onChunk: ({ chunk }) => { // DIAGNOSTIC (Safari stream-drop investigation) — temporary. 
Any model // output chunk means the stream is actively emitting bytes; track first @@ -638,6 +712,9 @@ export class AiChatService implements OnModuleInit { // stream), but SERIALIZED via stepUpdateChain so the writes commit in // step order; updateStreaming is error-tolerant (logs + swallows). stepUpdateChain = stepUpdateChain.then(() => updateStreaming()); + // #184: persist the run's progress (finished-step count). Fire-and- + // forget; the hook swallows its own errors. + if (runId) runHooks?.onStep?.(runId, capturedSteps.length); }, onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => { // DIAGNOSTIC (Safari stream-drop investigation) — temporary: success @@ -677,6 +754,9 @@ export class AiChatService implements OnModuleInit { maxContextTokens: resolved?.chatContextWindow, }), ); + // #184: settle the RUN as succeeded (best-effort, after the projection + // is finalized above). + if (runId) await runHooks?.onSettled?.(runId, 'completed'); // Lifecycle: release the external MCP clients leased for this turn. await closeExternalClients(); @@ -721,6 +801,8 @@ export class AiChatService implements OnModuleInit { error: errorText, }), ); + // #184: settle the RUN as failed, carrying the provider/transport cause. + if (runId) await runHooks?.onSettled?.(runId, 'error', errorText); await closeExternalClients(); }, onAbort: async ({ steps }) => { @@ -746,6 +828,9 @@ export class AiChatService implements OnModuleInit { await finalizeAssistant( flushAssistant(capturedSteps, inProgressText, 'aborted'), ); + // #184: settle the RUN as aborted (an explicit user stop reached the + // run's signal; a disconnect does not abort a run-wrapped turn). 
+ if (runId) await runHooks?.onSettled?.(runId, 'aborted'); await closeExternalClients(); }, }); @@ -812,7 +897,7 @@ export class AiChatService implements OnModuleInit { normalizeStreamUsage(p.usage), ); } - return chatStreamMetadata(p, chatId, cumulativeStepUsage); + return chatStreamMetadata(p, chatId, cumulativeStepUsage, runId); }, // Stream reasoning (thinking) parts to the client so the live counter can // estimate reasoning tokens from streamed text. v6 default is already @@ -979,8 +1064,12 @@ export function chatStreamMetadata( part: StreamMetadataPart, chatId: string, cumulativeStepUsage?: ChatStreamUsage, -): { chatId: string } | { usage: ChatStreamUsage } | undefined { - if (part.type === 'start') return { chatId }; + // #184: the active run's id, attached alongside `chatId` on the `start` part so + // the client learns the run it can reconnect to / stop. Omitted when the turn + // is not run-wrapped (legacy path). + runId?: string, +): { chatId: string; runId?: string } | { usage: ChatStreamUsage } | undefined { + if (part.type === 'start') return runId ? { chatId, runId } : { chatId }; if (part.type === 'finish-step') { return cumulativeStepUsage ? { usage: cumulativeStepUsage } : undefined; } diff --git a/apps/server/src/core/ai-chat/dto/ai-chat.dto.ts b/apps/server/src/core/ai-chat/dto/ai-chat.dto.ts index 5c3e97e1..1fd3c8ff 100644 --- a/apps/server/src/core/ai-chat/dto/ai-chat.dto.ts +++ b/apps/server/src/core/ai-chat/dto/ai-chat.dto.ts @@ -43,6 +43,30 @@ export class BoundChatDto { pageId: string; } +/** + * Reconnect to the latest run of a chat (#184): fetch its persisted lifecycle + * state (and the assistant message it projects) for an in-flight or finished run. + */ +export class GetRunDto { + @IsString() + chatId: string; +} + +/** + * Explicitly STOP an agent run (#184): the user pressed Stop — distinct from a + * browser disconnect, which never stops a run. 
Either the run id (preferred, from + * the streamed start metadata) or the chat id (stop whatever run is active on it). + */ +export class StopRunDto { + @IsOptional() + @IsString() + runId?: string; + + @IsOptional() + @IsString() + chatId?: string; +} + /** Export a chat to Markdown (#183). `lang` localizes the few fixed * role/tool-action labels; defaults to English server-side. */ export class ExportChatDto { diff --git a/apps/server/src/database/database.module.ts b/apps/server/src/database/database.module.ts index da90ef35..f155dff1 100644 --- a/apps/server/src/database/database.module.ts +++ b/apps/server/src/database/database.module.ts @@ -31,6 +31,7 @@ import { FavoriteRepo } from '@docmost/db/repos/favorite/favorite.repo'; import { TemplateRepo } from '@docmost/db/repos/template/template.repo'; import { AiChatRepo } from '@docmost/db/repos/ai-chat/ai-chat.repo'; import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.repo'; +import { AiChatRunRepo } from '@docmost/db/repos/ai-chat/ai-chat-run.repo'; import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo'; import { AiMcpServerRepo } from '@docmost/db/repos/ai-chat/ai-mcp-server.repo'; import { AiAgentRoleRepo } from '@docmost/db/repos/ai-agent-roles/ai-agent-roles.repo'; @@ -104,6 +105,7 @@ import { normalizePostgresUrl } from '../common/helpers'; TemplateRepo, AiChatRepo, AiChatMessageRepo, + AiChatRunRepo, AiProviderCredentialsRepo, AiMcpServerRepo, AiAgentRoleRepo, @@ -137,6 +139,7 @@ import { normalizePostgresUrl } from '../common/helpers'; TemplateRepo, AiChatRepo, AiChatMessageRepo, + AiChatRunRepo, AiProviderCredentialsRepo, AiMcpServerRepo, AiAgentRoleRepo, diff --git a/apps/server/src/database/migrations/20260627T130000-ai-chat-runs.ts b/apps/server/src/database/migrations/20260627T130000-ai-chat-runs.ts new file mode 100644 index 00000000..78b1c178 --- /dev/null +++ 
b/apps/server/src/database/migrations/20260627T130000-ai-chat-runs.ts @@ -0,0 +1,104 @@ +import { type Kysely, sql } from 'kysely'; + +/** + * `ai_chat_runs` — the agent RUN as a first-class, server-side lifecycle object + * (#184 phase 1: autonomous agent runs detached from the browser window). + * + * Until now an agent turn lived ONLY as long as the HTTP request was open + * (`res.hijack()` in ai-chat.controller.ts); a browser disconnect aborted it. + * This table makes a turn a persistent object the server owns: it is created + * when a run starts, transitions pending -> running -> succeeded|failed|aborted, + * and survives the subscriber (browser) going away. The DB is the source of + * truth — a later client reconnects/sees the result by reading this row plus the + * assistant message it projects (`assistant_message_id`). + * + * The assistant message row (#183 step-granular durability) is the PROJECTION of + * a run's output; this row is the run's LIFECYCLE. They are linked by + * `assistant_message_id` (SET NULL if the message is later pruned). + * + * `status` : 'pending' | 'running' | 'succeeded' | 'failed' | 'aborted'. + * `trigger` : 'user' | 'autostart' | 'schedule' | 'api' | 'continue' — only + * 'user' is produced in phase 1; the others are reserved for the + * autonomy triggers deferred to phase 2 so they need no later + * migration. + * + * ONE ACTIVE RUN PER CHAT is enforced by a partial unique index on `chat_id` + * WHERE status IN ('pending','running'): an autonomous run and a user run can + * never trample each other on the same chat. Settled runs (succeeded/failed/ + * aborted) are excluded from the index so a chat can accumulate any number of + * historical runs. 
+ */ +export async function up(db: Kysely): Promise { + await db.schema + .createTable('ai_chat_runs') + .ifNotExists() + .addColumn('id', 'uuid', (col) => + col.primaryKey().defaultTo(sql`gen_uuid_v7()`), + ) + .addColumn('chat_id', 'uuid', (col) => + col.references('ai_chats.id').onDelete('cascade').notNull(), + ) + .addColumn('workspace_id', 'uuid', (col) => + col.references('workspaces.id').onDelete('cascade').notNull(), + ) + // The human who triggered the run (audit). SET NULL on user deletion so the + // run history outlives its author; NULL is also the natural value for a + // future system/cron/api trigger with no human actor. + .addColumn('created_by', 'uuid', (col) => + col.references('users.id').onDelete('set null'), + ) + // The assistant message this run materializes (the #183 projection). SET NULL + // if that message row is later deleted; nullable because the run row is + // created a moment BEFORE the assistant row is seeded. + .addColumn('assistant_message_id', 'uuid', (col) => + col.references('ai_chat_messages.id').onDelete('set null'), + ) + .addColumn('trigger', 'varchar(20)', (col) => + col.notNull().defaultTo('user'), + ) + .addColumn('status', 'varchar(20)', (col) => + col.notNull().defaultTo('pending'), + ) + // Terminal error message for a failed run (provider/transport cause), + // mirroring the assistant message's metadata.error. + .addColumn('error', 'text', (col) => col) + // Number of agent steps finished so far (kept monotonic with the projection). + .addColumn('step_count', 'integer', (col) => col.notNull().defaultTo(0)) + // Set when an EXPLICIT user stop is requested (distinct from a mere browser + // disconnect, which never stops a run). The runner aborts the turn and the + // run settles as 'aborted'. 
+ .addColumn('stop_requested_at', 'timestamptz', (col) => col) + .addColumn('started_at', 'timestamptz', (col) => col) + .addColumn('finished_at', 'timestamptz', (col) => col) + .addColumn('created_at', 'timestamptz', (col) => + col.notNull().defaultTo(sql`now()`), + ) + .addColumn('updated_at', 'timestamptz', (col) => + col.notNull().defaultTo(sql`now()`), + ) + .execute(); + + // Reconnect / "latest run for this chat" reads hit chat_id first. + await db.schema + .createIndex('ai_chat_runs_chat_id_idx') + .ifNotExists() + .on('ai_chat_runs') + .column('chat_id') + .execute(); + + // One ACTIVE run per chat (enforced at the DB level): a second pending/running + // run on the same chat is rejected, so a user turn and an autonomous turn can + // never race on the same chat. Partial so settled runs do not collide. + await db.schema + .createIndex('ai_chat_runs_one_active_per_chat') + .ifNotExists() + .on('ai_chat_runs') + .column('chat_id') + .unique() + .where(sql.ref('status'), 'in', sql`('pending','running')`) + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema.dropTable('ai_chat_runs').execute(); +} diff --git a/apps/server/src/database/repos/ai-chat/ai-chat-message.repo.ts b/apps/server/src/database/repos/ai-chat/ai-chat-message.repo.ts index c9352e31..78cc7064 100644 --- a/apps/server/src/database/repos/ai-chat/ai-chat-message.repo.ts +++ b/apps/server/src/database/repos/ai-chat/ai-chat-message.repo.ts @@ -121,6 +121,23 @@ export class AiChatMessageRepo { return rows.reverse(); } + /** Fetch a single message by id + workspace (e.g. a run's projection row for + * the #184 reconnect read). Returns undefined when nothing matches.
*/ + async findById( + id: string, + workspaceId: string, + trx?: KyselyTransaction, + ): Promise { + const db = dbOrTx(this.db, trx); + return db + .selectFrom('aiChatMessages') + .select(this.baseFields) + .where('id', '=', id) + .where('workspaceId', '=', workspaceId) + .where('deletedAt', 'is', null) + .executeTakeFirst(); + } + async insert( insertable: InsertableAiChatMessage, trx?: KyselyTransaction, diff --git a/apps/server/src/database/repos/ai-chat/ai-chat-run.repo.ts b/apps/server/src/database/repos/ai-chat/ai-chat-run.repo.ts new file mode 100644 index 00000000..ba3e2179 --- /dev/null +++ b/apps/server/src/database/repos/ai-chat/ai-chat-run.repo.ts @@ -0,0 +1,190 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectKysely } from 'nestjs-kysely'; +import { sql } from 'kysely'; +import { KyselyDB, KyselyTransaction } from '../../types/kysely.types'; +import { dbOrTx } from '../../utils'; +import { + AiChatRun, + InsertableAiChatRun, +} from '@docmost/db/types/entity.types'; + +// Statuses that count as "the run is still live" (an autonomous and a user run +// must never both be live on one chat — enforced by the partial unique index and +// checked here for friendly 409s before the insert races the constraint). +export const ACTIVE_RUN_STATUSES = ['pending', 'running'] as const; + +// Crash-recovery sweep recency threshold (mirrors AiChatMessageRepo.sweepStreaming, +// #183): a 'running'/'pending' run is only swept to 'aborted' once it has been +// UNTOUCHED for this long, so a fresh replica's boot-sweep does not abort a run +// another replica is actively executing. The runner bumps `updatedAt` on every +// step. NOTE(review): a live run whose single step outlasts this window would still match the sweep — confirm steps cannot run that long. +const SWEEP_RUN_STALE_MS = 10 * 60 * 1000; // 10 minutes + +/** + * Repository for `ai_chat_runs` (#184 phase 1): the agent run as a first-class, + * server-side lifecycle object detached from the HTTP request.
The run row is the + * point a client subscribes/reconnects to (by `id` or by chat); the assistant + * message it links to (`assistantMessageId`) is the #183 projection of its output. + */ +@Injectable() +export class AiChatRunRepo { + private readonly logger = new Logger(AiChatRunRepo.name); + + private baseFields: Array = [ + 'id', + 'chatId', + 'workspaceId', + 'createdBy', + 'assistantMessageId', + 'trigger', + 'status', + 'error', + 'stepCount', + 'stopRequestedAt', + 'startedAt', + 'finishedAt', + 'createdAt', + 'updatedAt', + ]; + + constructor(@InjectKysely() private readonly db: KyselyDB) {} + + async insert( + insertable: InsertableAiChatRun, + trx?: KyselyTransaction, + ): Promise { + const db = dbOrTx(this.db, trx); + return db + .insertInto('aiChatRuns') + .values(insertable) + .returning(this.baseFields) + .executeTakeFirst(); + } + + async findById( + id: string, + workspaceId: string, + trx?: KyselyTransaction, + ): Promise { + const db = dbOrTx(this.db, trx); + return db + .selectFrom('aiChatRuns') + .select(this.baseFields) + .where('id', '=', id) + .where('workspaceId', '=', workspaceId) + .executeTakeFirst(); + } + + /** The currently-active (pending|running) run for a chat, if any. At most one + * exists thanks to the partial unique index. */ + async findActiveByChat( + chatId: string, + workspaceId: string, + trx?: KyselyTransaction, + ): Promise { + const db = dbOrTx(this.db, trx); + return db + .selectFrom('aiChatRuns') + .select(this.baseFields) + .where('chatId', '=', chatId) + .where('workspaceId', '=', workspaceId) + .where('status', 'in', ACTIVE_RUN_STATUSES as unknown as string[]) + .executeTakeFirst(); + } + + /** The most-recent run for a chat (active or settled) — the reconnect target. 
*/ + async findLatestByChat( + chatId: string, + workspaceId: string, + trx?: KyselyTransaction, + ): Promise { + const db = dbOrTx(this.db, trx); + return db + .selectFrom('aiChatRuns') + .select(this.baseFields) + .where('chatId', '=', chatId) + .where('workspaceId', '=', workspaceId) + .orderBy('createdAt', 'desc') + .orderBy('id', 'desc') + .limit(1) + .executeTakeFirst(); + } + + /** + * Patch a run by id + workspace; always bumps `updatedAt`. Used for every + * lifecycle transition (mark running, link the assistant message, bump + * step_count, finalize succeeded/failed/aborted). Returns the updated row or + * undefined when nothing matched (e.g. a foreign workspace). + */ + async update( + id: string, + workspaceId: string, + patch: Partial<{ + status: string; + error: string | null; + stepCount: number; + assistantMessageId: string | null; + stopRequestedAt: Date | null; + startedAt: Date | null; + finishedAt: Date | null; + }>, + trx?: KyselyTransaction, + ): Promise { + const db = dbOrTx(this.db, trx); + return db + .updateTable('aiChatRuns') + .set({ ...(patch as Record), updatedAt: new Date() }) + .where('id', '=', id) + .where('workspaceId', '=', workspaceId) + .returning(this.baseFields) + .executeTakeFirst(); + } + + /** + * Mark an EXPLICIT stop request on an active run (distinct from a browser + * disconnect, which never stops a run). Stamps `stop_requested_at` ONLY while + * the run is still active, so a late stop on an already-settled run is a no-op. + * Returns the row when a stop was recorded, else undefined (nothing active). 
+ */ + async markStopRequested( + id: string, + workspaceId: string, + trx?: KyselyTransaction, + ): Promise { + const db = dbOrTx(this.db, trx); + return db + .updateTable('aiChatRuns') + .set({ stopRequestedAt: new Date(), updatedAt: new Date() }) + .where('id', '=', id) + .where('workspaceId', '=', workspaceId) + .where('status', 'in', ACTIVE_RUN_STATUSES as unknown as string[]) + .returning(this.baseFields) + .executeTakeFirst(); + } + + /** + * Crash-recovery sweep (mirrors AiChatMessageRepo.sweepStreaming): flip every + * run still left pending/running but UNTOUCHED for SWEEP_RUN_STALE_MS — a run + * whose process died before reaching a terminal status — to 'aborted', stamping + * `finished_at`. Run once on server start. Returns the number swept. + * Deliberately NOT scoped to a workspace — the query has no workspaceId filter (a crash can dangle runs in any workspace). + */ + async sweepRunning(trx?: KyselyTransaction): Promise { + const db = dbOrTx(this.db, trx); + const now = new Date(); + const staleBefore = new Date(now.getTime() - SWEEP_RUN_STALE_MS); + const rows = await db + .updateTable('aiChatRuns') + .set({ + status: 'aborted', + finishedAt: now, + updatedAt: now, + error: sql`coalesce(error, ${'Run interrupted by a server restart.'})`, + }) + .where('status', 'in', ACTIVE_RUN_STATUSES as unknown as string[]) + .where('updatedAt', '<', staleBefore) + .returning('id') + .execute(); + return rows.length; + } +} diff --git a/apps/server/src/database/types/db.d.ts b/apps/server/src/database/types/db.d.ts index 462a9349..ad03eacb 100644 --- a/apps/server/src/database/types/db.d.ts +++ b/apps/server/src/database/types/db.d.ts @@ -644,6 +644,35 @@ export interface AiChatMessages { deletedAt: Timestamp | null; } +// The agent RUN as a first-class server-side lifecycle object (#184 phase 1). +// Mirrors migration 20260627T130000-ai-chat-runs.ts. A run is created when an +// agent turn starts and survives the browser disconnecting; the DB is the source +// of truth a later client reconnects to.
`assistantMessageId` links to the #183 +// projection row (the assistant message this run materializes). +export interface AiChatRuns { + id: Generated; + chatId: string; + workspaceId: string; + // SET NULL on user deletion (the run history outlives its author); also NULL + // for a future non-human trigger (cron/api). + createdBy: string | null; + // The assistant message this run materializes; SET NULL if it is pruned. + assistantMessageId: string | null; + // 'user' | 'autostart' | 'schedule' | 'api' | 'continue' (only 'user' is + // produced in phase 1; the rest are reserved for the deferred autonomy triggers). + trigger: Generated; + // 'pending' | 'running' | 'succeeded' | 'failed' | 'aborted'. + status: Generated; + error: string | null; + stepCount: Generated; + // Set when an EXPLICIT user stop is requested (distinct from a disconnect). + stopRequestedAt: Timestamp | null; + startedAt: Timestamp | null; + finishedAt: Timestamp | null; + createdAt: Generated; + updatedAt: Generated; +} + export interface UserSessions { id: Generated; userId: string; @@ -663,6 +692,7 @@ export interface DB { aiAgentRoles: AiAgentRoles; aiChats: AiChats; aiChatMessages: AiChatMessages; + aiChatRuns: AiChatRuns; apiKeys: ApiKeys; attachments: Attachments; audit: Audit; diff --git a/apps/server/src/database/types/entity.types.ts b/apps/server/src/database/types/entity.types.ts index 36f9be46..0474ab9e 100644 --- a/apps/server/src/database/types/entity.types.ts +++ b/apps/server/src/database/types/entity.types.ts @@ -3,6 +3,7 @@ import { AiAgentRoles, AiChats, AiChatMessages, + AiChatRuns, Attachments, Comments, Groups, @@ -60,6 +61,12 @@ export type InsertableAiChatMessage = Omit< 'tsv' >; +// AI Chat Run (#184 phase 1): the agent run as a first-class lifecycle object, +// detached from the HTTP request / browser window. 
+export type AiChatRun = Selectable; +export type InsertableAiChatRun = Insertable; +export type UpdatableAiChatRun = Updateable>; + // AI Provider Credentials // SECURITY (D9/§8.1): holds encrypted per-workspace provider API keys. // Never expose this table through workspace endpoints. diff --git a/apps/server/test/integration/ai-chat-run.int-spec.ts b/apps/server/test/integration/ai-chat-run.int-spec.ts new file mode 100644 index 00000000..3596ff06 --- /dev/null +++ b/apps/server/test/integration/ai-chat-run.int-spec.ts @@ -0,0 +1,275 @@ +import { Kysely } from 'kysely'; +import { AiChatRunRepo } from '@docmost/db/repos/ai-chat/ai-chat-run.repo'; +import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.repo'; +import { AiChatRunService } from '../../src/core/ai-chat/ai-chat-run.service'; +import { + getTestDb, + destroyTestDb, + createWorkspace, + createUser, + createChat, +} from './db'; + +/** + * Integration coverage for the #184 phase-1 durable agent run: real SQL against + * docmost_test. Proves the core invariant primitives — a run is a first-class + * lifecycle row, at most one is active per chat, a detached run's progress + * survives with NO subscriber, an explicit stop settles it as aborted, a + * reconnect read returns the persisted state, and a crash sweep recovers + * dangling runs. 
+ */ +describe('AiChatRun durable lifecycle [integration]', () => { + let db: Kysely; + let runRepo: AiChatRunRepo; + let messageRepo: AiChatMessageRepo; + let service: AiChatRunService; + let workspaceId: string; + let otherWorkspaceId: string; + let userId: string; + let chatId: string; + + beforeAll(async () => { + db = getTestDb(); + runRepo = new AiChatRunRepo(db as any); + messageRepo = new AiChatMessageRepo(db as any); + service = new AiChatRunService(runRepo); + workspaceId = (await createWorkspace(db)).id; + otherWorkspaceId = (await createWorkspace(db)).id; + userId = (await createUser(db, workspaceId)).id; + chatId = (await createChat(db, { workspaceId, creatorId: userId })).id; + }); + + afterAll(async () => { + await destroyTestDb(); + }); + + // Each test that creates an active run settles it (or uses its own chat) so the + // partial unique index does not bleed across tests. + + it('insert + findById round-trips a run row, defaulting status/trigger', async () => { + const run = await runRepo.insert({ + chatId, + workspaceId, + createdBy: userId, + }); + expect(run.status).toBe('pending'); + expect(run.trigger).toBe('user'); + expect(run.stepCount).toBe(0); + + const found = await runRepo.findById(run.id, workspaceId); + expect(found!.id).toBe(run.id); + // Workspace-scoped: a foreign workspace sees nothing. + expect(await runRepo.findById(run.id, otherWorkspaceId)).toBeUndefined(); + + // settle so it does not occupy the active slot + await runRepo.update(run.id, workspaceId, { + status: 'succeeded', + finishedAt: new Date(), + }); + }); + + it('enforces ONE ACTIVE run per chat (partial unique index rejects a second)', async () => { + const activeChat = ( + await createChat(db, { workspaceId, creatorId: userId }) + ).id; + const first = await runRepo.insert({ + chatId: activeChat, + workspaceId, + createdBy: userId, + status: 'running', + }); + // A second pending/running run on the SAME chat must be rejected by the DB. 
+ await expect( + runRepo.insert({ + chatId: activeChat, + workspaceId, + createdBy: userId, + status: 'running', + }), + ).rejects.toThrow(); + + // findActiveByChat returns exactly the one active run. + const active = await runRepo.findActiveByChat(activeChat, workspaceId); + expect(active!.id).toBe(first.id); + + // Once it settles, the slot frees and a new run may start. + await runRepo.update(first.id, workspaceId, { + status: 'succeeded', + finishedAt: new Date(), + }); + expect( + await runRepo.findActiveByChat(activeChat, workspaceId), + ).toBeUndefined(); + const second = await runRepo.insert({ + chatId: activeChat, + workspaceId, + createdBy: userId, + status: 'running', + }); + expect(second.id).not.toBe(first.id); + await runRepo.update(second.id, workspaceId, { + status: 'aborted', + finishedAt: new Date(), + }); + }); + + it('DETACHED run: persists + finalizes succeeded with NO subscriber, reconnect returns state', async () => { + // A dedicated chat so the active-run slot is clean. + const runChat = ( + await createChat(db, { workspaceId, creatorId: userId }) + ).id; + + // beginRun = the runner starts the turn (registers an in-memory controller). + const handle = await service.beginRun({ + chatId: runChat, + workspaceId, + userId, + }); + expect(handle.signal.aborted).toBe(false); + expect(service.isLocallyActive(handle.runId)).toBe(true); + + // The assistant projection row (#183) is seeded + linked. + const seeded = await messageRepo.insert({ + chatId: runChat, + workspaceId, + userId, + role: 'assistant', + content: '', + status: 'streaming', + metadata: { parts: [] } as never, + }); + await service.linkAssistantMessage(handle.runId, workspaceId, seeded.id); + + // Progress is persisted as steps finish — NO HTTP socket involved here at all. 
+ await service.recordStep(handle.runId, workspaceId, 1); + await messageRepo.update(seeded.id, workspaceId, { + content: 'partial work', + metadata: { parts: [{ type: 'text', text: 'partial work' }] }, + }); + + // The turn completes; finalize the projection then the run. + await messageRepo.update(seeded.id, workspaceId, { + content: 'final answer', + status: 'completed', + }); + await service.finalizeRun(handle.runId, workspaceId, 'completed'); + + expect(service.isLocallyActive(handle.runId)).toBe(false); + + // Reconnect: the latest run for the chat + its projected message, from the DB. + const run = await service.getLatestForChat(runChat, workspaceId); + expect(run!.status).toBe('succeeded'); + expect(run!.stepCount).toBe(1); + expect(run!.assistantMessageId).toBe(seeded.id); + expect(run!.finishedAt).toBeTruthy(); + const message = await messageRepo.findById(seeded.id, workspaceId); + expect(message!.status).toBe('completed'); + expect(message!.content).toBe('final answer'); + }); + + it('EXPLICIT stop aborts the run signal, marks the row, and settles as aborted', async () => { + const runChat = ( + await createChat(db, { workspaceId, creatorId: userId }) + ).id; + const handle = await service.beginRun({ + chatId: runChat, + workspaceId, + userId, + }); + + // User presses Stop. + const stopped = await service.requestStop(handle.runId, workspaceId); + expect(stopped).toBe(true); + expect(handle.signal.aborted).toBe(true); + + // The row carries the stop request (distinct from a disconnect, which would + // leave stop_requested_at NULL). + const afterStop = await runRepo.findById(handle.runId, workspaceId); + expect(afterStop!.stopRequestedAt).toBeTruthy(); + + // The terminal callback (onAbort) settles the run. 
+ await service.finalizeRun(handle.runId, workspaceId, 'aborted'); + const run = await service.getLatestForChat(runChat, workspaceId); + expect(run!.status).toBe('aborted'); + }); + + it('markStopRequested is a no-op on an already-settled run (returns undefined)', async () => { + const runChat = ( + await createChat(db, { workspaceId, creatorId: userId }) + ).id; + const run = await runRepo.insert({ + chatId: runChat, + workspaceId, + createdBy: userId, + status: 'running', + }); + await runRepo.update(run.id, workspaceId, { + status: 'succeeded', + finishedAt: new Date(), + }); + const marked = await runRepo.markStopRequested(run.id, workspaceId); + expect(marked).toBeUndefined(); + }); + + it('sweepRunning aborts STALE dangling runs but not fresh or settled ones', async () => { + const sweepChat1 = ( + await createChat(db, { workspaceId, creatorId: userId }) + ).id; + const sweepChat2 = ( + await createChat(db, { workspaceId, creatorId: userId }) + ).id; + const sweepChat3 = ( + await createChat(db, { workspaceId, creatorId: userId }) + ).id; + + const stale = await runRepo.insert({ + chatId: sweepChat1, + workspaceId, + createdBy: userId, + status: 'running', + }); + const fresh = await runRepo.insert({ + chatId: sweepChat2, + workspaceId, + createdBy: userId, + status: 'running', + }); + const settled = await runRepo.insert({ + chatId: sweepChat3, + workspaceId, + createdBy: userId, + status: 'running', + }); + await runRepo.update(settled.id, workspaceId, { + status: 'succeeded', + finishedAt: new Date(), + }); + // Backdate the stale run's updatedAt past the 10-minute staleness window. 
+ await db + .updateTable('aiChatRuns') + .set({ updatedAt: new Date(Date.now() - 20 * 60 * 1000) }) + .where('id', '=', stale.id) + .execute(); + + const swept = await runRepo.sweepRunning(); + expect(swept).toBeGreaterThanOrEqual(1); + + expect((await runRepo.findById(stale.id, workspaceId))!.status).toBe( + 'aborted', + ); + // Fresh (recently-updated) running run survives — a sibling replica may still + // be executing it. + expect((await runRepo.findById(fresh.id, workspaceId))!.status).toBe( + 'running', + ); + expect((await runRepo.findById(settled.id, workspaceId))!.status).toBe( + 'succeeded', + ); + + // cleanup active fresh run + await runRepo.update(fresh.id, workspaceId, { + status: 'aborted', + finishedAt: new Date(), + }); + }); +});