diff --git a/.env.example b/.env.example index 7407e629..08eea50a 100644 --- a/.env.example +++ b/.env.example @@ -170,6 +170,20 @@ MCP_DOCMOST_PASSWORD= # Default 900000 (15 min). # AI_MCP_CALL_TIMEOUT_MS=900000 +# --- Autonomous / detached agent runs (settings.ai.autonomousRuns) --- +# Opt-in per workspace (AI settings; off by default). When on, a chat turn becomes +# a server-side RUN that survives a browser disconnect — only an explicit Stop ends +# it, and a client reconnects/live-follows the run. +# +# DEPLOY CONSTRAINT — SINGLE-INSTANCE ONLY in phase 1: Stop and the in-process +# AbortController that backs it are process-local, so a Stop only aborts a run +# executing on the SAME replica that owns it (cross-instance pub/sub stop is phase +# 2 and not yet reliable). Do NOT enable autonomousRuns on a horizontally-scaled +# deployment (multiple replicas behind a load balancer, or Docmost cloud +# CLOUD=true) — run a single instance instead. The server logs a startup WARNING +# when it detects a multi-instance deployment (CLOUD=true) so the constraint is +# visible, and a startup sweep settles any run left dangling by a restart. + # --- Anonymous public-share AI assistant --- # Opt-in per workspace (AI settings -> "public share assistant"; off by default). # When enabled, anonymous visitors of a published share can ask an AI about that diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts index e2d40eab..dd863bbd 100644 --- a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts @@ -339,6 +339,81 @@ describe('AiChatRunService run lifecycle', () => { ); }); + it('F6: a TRANSIENT terminal-write failure is ridden out by the bounded retry — the run is settled, not stranded', async () => { + // The bug: finalizeRun used to DROP the in-memory entry BEFORE the terminal + // UPDATE, then only warn-log a failure. A single transient blip (pool + // exhaustion / deadlock / connection hiccup) on that PK UPDATE left the row + // 'running' with nothing left to recover it -> every later turn in that chat + // 409s until a restart. The fix updates FIRST and retries. + let calls = 0; + const repo = makeRepo({ + update: jest.fn(async () => { + calls += 1; + if (calls === 1) throw new Error('deadlock detected'); + return { id: 'run-1' }; + }), + }); + jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => undefined); + const svc = new AiChatRunService(repo as never, makeEnv() as never); + await svc.beginRun({ + chatId: 'chat-1', + workspaceId: 'ws-1', + userId: 'user-1', + }); + + await svc.finalizeRun('run-1', 'ws-1', 'completed'); + + // The retry landed the terminal write: the entry is dropped (slot freed) and + // the row carries the real terminal status — NOT stranded at 'running'. + expect(svc.isLocallyActive('run-1')).toBe(false); + expect(repo.update).toHaveBeenCalledTimes(2); + expect(repo.update).toHaveBeenLastCalledWith( + 'run-1', + 'ws-1', + expect.objectContaining({ status: 'succeeded' }), + ); + }); + + it('F6: if the terminal write keeps failing, the entry is RETAINED and a LATER settle completes it (chat not permanently 409d)', async () => { + // Worst case: the DB is down for the whole first finalize (all attempts fail). + // The run must NOT be silently lost — the entry stays so a subsequent settle + // (a streamText callback, requestStop -> onAbort, or a future sweep) can retry. + let healthy = false; + const repo = makeRepo({ + update: jest.fn(async () => { + if (!healthy) throw new Error('pool exhausted'); + return { id: 'run-1' }; + }), + }); + jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => undefined); + const svc = new AiChatRunService(repo as never, makeEnv() as never); + await svc.beginRun({ + chatId: 'chat-1', + workspaceId: 'ws-1', + userId: 'user-1', + }); + + // First settle: every bounded attempt fails -> entry retained, NOT settled. + await svc.finalizeRun('run-1', 'ws-1', 'completed'); + expect(svc.isLocallyActive('run-1')).toBe(true); + + // The DB recovers; a later settle now succeeds and frees the slot. + healthy = true; + await svc.finalizeRun('run-1', 'ws-1', 'completed'); + expect(svc.isLocallyActive('run-1')).toBe(false); + expect(repo.update).toHaveBeenLastCalledWith( + 'run-1', + 'ws-1', + expect.objectContaining({ status: 'succeeded' }), + ); + + // And it is now idempotent: a further settle no-ops (terminal row already + // written), so a double-settle can never clobber the real status. + const callsBefore = repo.update.mock.calls.length; + await svc.finalizeRun('run-1', 'ws-1', 'error', 'late'); + expect(repo.update).toHaveBeenCalledTimes(callsBefore); + }); + it('recordStep / linkAssistantMessage are best-effort: a repo failure is swallowed', async () => { const repo = makeRepo({ update: jest.fn(async () => { diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.ts index 16f599c2..5e7b217c 100644 --- a/apps/server/src/core/ai-chat/ai-chat-run.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat-run.service.ts @@ -91,6 +91,24 @@ export class AiChatRunService implements OnModuleInit { // restart) still records `stop_requested_at` on the row. private readonly active = new Map(); + // runIds whose TERMINAL row write has SUCCEEDED — the idempotency once-gate + // (F6). A finalize must short-circuit only AFTER the terminal write has landed, + // NOT merely after the in-memory entry was dropped: a transient UPDATE failure + // has to stay retryable, so "already settled" means "row already terminal", not + // "entry already gone". Grows by one short UUID per finished run over process + // uptime — negligible in phase 1's single process. + private readonly settled = new Set(); + + // Bounded retry for the terminal write (F6): a single PK UPDATE can fail + // transiently under many fire-and-forget writes (pool exhaustion, deadlock, a + // brief connection blip). Riding out that blip in-place matters because the + // dominant success path (streamText onFinish) settles exactly ONCE — if that + // write is dropped and never retried, the row is stranded 'running' and the + // one-active-run gate 409s every future turn in the chat until a restart (no + // periodic sweep in phase 1). + private static readonly FINALIZE_MAX_ATTEMPTS = 3; + private static readonly FINALIZE_RETRY_BASE_MS = 50; + constructor( private readonly runRepo: AiChatRunRepo, private readonly environment: EnvironmentService, @@ -244,19 +262,27 @@ export class AiChatRunService implements OnModuleInit { /** * Finalize a run to its terminal status (succeeded / failed / aborted), - * stamping finishedAt + any error, and DROP its in-memory entry. Best-effort. + * stamping finishedAt + any error. Best-effort, but ROBUST against a transient + * terminal-write failure (F6). * - * IDEMPOTENT (#184 review): the terminal write happens AT MOST ONCE per run. - * `this.active.delete(runId)` returns false when the run was already settled - * (its in-memory entry already dropped); in that case we no-op. This collapses - * a legitimate double-settle to a single write: AiChatService.stream wraps the - * turn in a safety-net catch that settles the run to 'error' on any failure - * BEFORE streamText's terminal callbacks own the lifecycle — and on the rare - * path where streamText DID attach (so a callback also settles) the two would - * otherwise both call onSettled. The first caller wins and writes the terminal - * row; the second returns early, so a late settle can never clobber the real - * terminal status or double-write. beginRun always registers the entry before - * the turn can settle, so a legitimate first finalize always finds it. + * ORDER MATTERS (F6): the terminal UPDATE happens FIRST; only once it SUCCEEDS + * do we record the run as settled and drop its in-memory entry. If the UPDATE + * fails on every bounded attempt we KEEP the in-memory entry and do NOT mark it + * settled — so a later settle (a streamText callback, a requestStop -> onAbort, + * a future sweep) can retry the terminal write. A run is therefore never + * silently stranded 'running' (which would 409 every future turn in the chat + * until a restart, since phase 1 has no periodic sweep). + * + * IDEMPOTENT on SUCCESS (#184 review): the terminal write happens AT MOST ONCE + * per run. The once-gate keys off {@link settled} (the terminal row already + * written), NOT off entry-deletion — so a dropped-then-retried write is still + * allowed, while a genuine double-settle collapses to a single write. + * AiChatService.stream wraps the turn in a safety-net catch that settles the run + * to 'error' on any failure BEFORE streamText's terminal callbacks own the + * lifecycle — and on the rare path where streamText DID attach (so a callback + * also settles) both routes call onSettled. The FIRST to write the terminal row + * wins; the second sees `settled` and returns early, so a late settle can never + * clobber the real terminal status or double-write the row. */ async finalizeRun( runId: string, @@ -264,20 +290,45 @@ export class AiChatRunService implements OnModuleInit { turnStatus: TurnTerminalStatus, error?: string, ): Promise { - if (!this.active.delete(runId)) return; - try { - await this.runRepo.update(runId, workspaceId, { - status: mapTurnStatusToRun(turnStatus), - finishedAt: new Date(), - error: error ?? null, - }); - } catch (err) { - this.logger.warn( - `Failed to finalize run ${runId}: ${ - err instanceof Error ? err.message : 'unknown error' - }`, - ); + // Already terminally written -> idempotent no-op. + if (this.settled.has(runId)) return; + + for ( + let attempt = 1; + attempt <= AiChatRunService.FINALIZE_MAX_ATTEMPTS; + attempt++ + ) { + try { + await this.runRepo.update(runId, workspaceId, { + status: mapTurnStatusToRun(turnStatus), + finishedAt: new Date(), + error: error ?? null, + }); + // Terminal write landed: arm the once-gate, then (and only then) free the + // chat's active slot by dropping the in-memory entry. + this.settled.add(runId); + this.active.delete(runId); + return; + } catch (err) { + this.logger.warn( + `Failed to finalize run ${runId} (attempt ${attempt}/${ + AiChatRunService.FINALIZE_MAX_ATTEMPTS + }): ${err instanceof Error ? err.message : 'unknown error'}`, + ); + if (attempt < AiChatRunService.FINALIZE_MAX_ATTEMPTS) { + await this.delay(AiChatRunService.FINALIZE_RETRY_BASE_MS * attempt); + } + } } + // Every attempt failed: deliberately KEEP the in-memory entry and leave the + // run UNsettled so a later finalize/requestStop/sweep can retry — the run is + // not stranded. + } + + /** Small async backoff between terminal-write retries (F6). Isolated so it is + * trivial to stub/fake-time in tests. */ + private delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); } /** diff --git a/apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts b/apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts index 12ce4b97..747130ba 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts @@ -242,4 +242,96 @@ describe('AiChatService.stream — abortSignal wiring (#184 F3)', () => { expect(streamTextMock).toHaveBeenCalledTimes(1); expect(streamTextMock.mock.calls[0][0].abortSignal).toBe(socketSignal); }); + + /** + * F9 — streamText's TERMINAL callbacks carry the #184 run lifecycle: + * onStepFinish -> runHooks.onStep(runId, stepCount) + * onFinish -> runHooks.onSettled(runId, 'completed') (dominant path) + * onAbort -> runHooks.onSettled(runId, 'aborted') + * onError -> runHooks.onSettled(runId, 'error', cause) + * makeStreamResult() ignores the streamText options, so these callbacks never + * fire on their own — a regression in this wiring (esp. the success path) would + * strand the run with NO test catching it. Here we CAPTURE the options streamText + * was handed and invoke each callback with the real wiring, asserting the run + * hooks fire with the right args. + */ + // Drive stream() to the point streamText is called, capturing the options object + // (which carries onStepFinish/onFinish/onError/onAbort) and the run hooks. + async function captureStreamCallbacks() { + const { svc } = makeService(); + let capturedOpts: any; + streamTextMock.mockImplementation((opts: any) => { + capturedOpts = opts; + return makeStreamResult(); + }); + const runHooks = { + begin: jest.fn(async () => ({ + runId: 'run-1', + signal: new AbortController().signal, + })), + onAssistantSeeded: jest.fn(), + onStep: jest.fn(), + onSettled: jest.fn(), + }; + await svc.stream({ + user: { id: 'user-1' } as never, + workspace: { id: 'ws-1' } as never, + sessionId: 'sess-1', + body: body as never, + res: makeRes() as never, + signal: new AbortController().signal, + model: {} as never, + role: null, + runHooks: runHooks as never, + }); + expect(capturedOpts).toBeDefined(); + return { capturedOpts, runHooks }; + } + + it('F9: onStepFinish bumps the run step count, onFinish settles the run "completed" (the dominant autonomous-run path)', async () => { + const { capturedOpts, runHooks } = await captureStreamCallbacks(); + + // A finished step -> onStep(runId, finishedStepCount). + capturedOpts.onStepFinish({ text: 'step one', toolCalls: [], content: [] }); + expect(runHooks.onStep).toHaveBeenCalledWith('run-1', 1); + capturedOpts.onStepFinish({ text: 'step two', toolCalls: [], content: [] }); + expect(runHooks.onStep).toHaveBeenLastCalledWith('run-1', 2); + + // The success terminal callback settles the run. + await capturedOpts.onFinish({ + text: 'done', + finishReason: 'stop', + totalUsage: {}, + usage: {}, + steps: [], + }); + expect(runHooks.onSettled).toHaveBeenCalledWith('run-1', 'completed'); + }); + + it('F9: onAbort settles the run "aborted"', async () => { + jest + .spyOn(Logger.prototype, 'warn') + .mockImplementation(() => undefined as never); + const { capturedOpts, runHooks } = await captureStreamCallbacks(); + + await capturedOpts.onAbort({ steps: [] }); + expect(runHooks.onSettled).toHaveBeenCalledWith('run-1', 'aborted'); + }); + + it('F9: onError settles the run "error" carrying the provider cause', async () => { + jest + .spyOn(Logger.prototype, 'error') + .mockImplementation(() => undefined as never); + jest + .spyOn(Logger.prototype, 'warn') + .mockImplementation(() => undefined as never); + const { capturedOpts, runHooks } = await captureStreamCallbacks(); + + await capturedOpts.onError({ error: new Error('provider exploded') }); + expect(runHooks.onSettled).toHaveBeenCalledWith( + 'run-1', + 'error', + expect.stringContaining('provider exploded'), + ); + }); }); diff --git a/apps/server/test/integration/ai-chat-run.int-spec.ts b/apps/server/test/integration/ai-chat-run.int-spec.ts index 3596ff06..f6a753a1 100644 --- a/apps/server/test/integration/ai-chat-run.int-spec.ts +++ b/apps/server/test/integration/ai-chat-run.int-spec.ts @@ -1,5 +1,8 @@ import { Kysely } from 'kysely'; -import { AiChatRunRepo } from '@docmost/db/repos/ai-chat/ai-chat-run.repo'; +import { + AiChatRunRepo, + SWEEP_RUN_STALE_MS, +} from '@docmost/db/repos/ai-chat/ai-chat-run.repo'; import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.repo'; import { AiChatRunService } from '../../src/core/ai-chat/ai-chat-run.service'; import { @@ -32,7 +35,9 @@ describe('AiChatRun durable lifecycle [integration]', () => { db = getTestDb(); runRepo = new AiChatRunRepo(db as any); messageRepo = new AiChatMessageRepo(db as any); - service = new AiChatRunService(runRepo); + // Boot-sweep isn't triggered here; the isCloud stub is all the service needs + // for these direct-call integration cases (F7). + service = new AiChatRunService(runRepo, { isCloud: () => false } as never); workspaceId = (await createWorkspace(db)).id; otherWorkspaceId = (await createWorkspace(db)).id; userId = (await createUser(db, workspaceId)).id; @@ -251,14 +256,17 @@ describe('AiChatRun durable lifecycle [integration]', () => { .where('id', '=', stale.id) .execute(); - const swept = await runRepo.sweepRunning(); + // WINDOWED sweep (phase-2 multi-instance timer path): only runs older than the + // staleness window are aborted, so a sibling replica's fresh run survives. The + // no-arg boot sweep (variant C) is unconditional — covered separately below. + const swept = await runRepo.sweepRunning({ staleMs: SWEEP_RUN_STALE_MS }); expect(swept).toBeGreaterThanOrEqual(1); expect((await runRepo.findById(stale.id, workspaceId))!.status).toBe( 'aborted', ); - // Fresh (recently-updated) running run survives — a sibling replica may still - // be executing it. + // Fresh (recently-updated) running run survives the WINDOWED sweep — a sibling + // replica may still be executing it. expect((await runRepo.findById(fresh.id, workspaceId))!.status).toBe( 'running', ); @@ -272,4 +280,25 @@ describe('AiChatRun durable lifecycle [integration]', () => { finishedAt: new Date(), }); }); + + it('sweepRunning() with NO args (boot sweep / variant C) aborts even a FRESH running run', async () => { + // F1/DECISION C at the SQL level: the unconditional boot sweep has NO + // staleness window, so a run updated just now (a fast restart) is settled too + // — otherwise it would stay 'running' forever and 409 every future turn. + const bootChat = ( + await createChat(db, { workspaceId, creatorId: userId }) + ).id; + const fresh = await runRepo.insert({ + chatId: bootChat, + workspaceId, + createdBy: userId, + status: 'running', + }); + // updatedAt = now (fresh, untouched). The no-arg sweep settles it anyway. + const swept = await runRepo.sweepRunning(); + expect(swept).toBeGreaterThanOrEqual(1); + expect((await runRepo.findById(fresh.id, workspaceId))!.status).toBe( + 'aborted', + ); + }); });