diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts
index 7e93013b..419a3bfc 100644
--- a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts
+++ b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts
@@ -1,9 +1,23 @@
 import { Logger } from '@nestjs/common';
 import {
   AiChatRunService,
+  RunAlreadyActiveError,
+  ONE_ACTIVE_RUN_PER_CHAT_INDEX,
   mapTurnStatusToRun,
 } from './ai-chat-run.service';
 
+/** Shape a Postgres unique-violation the way the postgres.js driver surfaces it:
+ *  SQLSTATE 23505 + the offending index in `constraint_name`. */
+function uniqueViolation(constraintName: string): Error & {
+  code: string;
+  constraint_name: string;
+} {
+  return Object.assign(new Error('duplicate key value violates unique constraint'), {
+    code: '23505',
+    constraint_name: constraintName,
+  });
+}
+
 /**
  * Unit coverage for the #184 phase-1 run lifecycle (AiChatRunService) with a
  * hand-rolled mock repo — no Nest graph, no DB. The invariant under test is the
@@ -92,6 +106,79 @@ describe('AiChatRunService run lifecycle', () => {
     expect(svc.isLocallyActive('run-1')).toBe(true);
   });
 
+  it('beginRun REJECTS the racer: a 23505 on the one-active-per-chat index throws RunAlreadyActiveError (not swallowed) and registers no controller', async () => {
+    // The race: the controller's cheap pre-check passed for BOTH concurrent
+    // turns, so the loser's INSERT hits the partial unique index. That rejection
+    // is the authoritative gate — it must surface, not be swallowed into an
+    // untracked turn.
+    const repo = makeRepo({
+      insert: jest.fn(async () => {
+        throw uniqueViolation(ONE_ACTIVE_RUN_PER_CHAT_INDEX);
+      }),
+    });
+    const svc = new AiChatRunService(repo as never);
+    await expect(
+      svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
+    ).rejects.toBeInstanceOf(RunAlreadyActiveError);
+    // No controller leaked for a rejected start.
+    expect(svc.isLocallyActive('run-1')).toBe(false);
+  });
+
+  it('beginRun does NOT mask an unrelated unique violation as already-active', async () => {
+    // A 23505 on some OTHER constraint is a real bug, not the race — it must
+    // propagate unchanged so it is never silently treated as "already active".
+    const other = uniqueViolation('ai_chat_runs_pkey');
+    const repo = makeRepo({
+      insert: jest.fn(async () => {
+        throw other;
+      }),
+    });
+    const svc = new AiChatRunService(repo as never);
+    await expect(
+      svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
+    ).rejects.toBe(other);
+  });
+
+  it('beginRun propagates a non-unique insert failure unchanged', async () => {
+    const boom = new Error('connection reset');
+    const repo = makeRepo({
+      insert: jest.fn(async () => {
+        throw boom;
+      }),
+    });
+    const svc = new AiChatRunService(repo as never);
+    await expect(
+      svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
+    ).rejects.toBe(boom);
+  });
+
+  it('two concurrent begins on one chat: exactly one wins, the other is rejected as already-active', async () => {
+    // Integration-style: model the DB partial unique index with a one-shot slot.
+    // The first insert claims it; the second hits a 23505 on the active index.
+    let slotTaken = false;
+    const repo = makeRepo({
+      insert: jest.fn(async (v: any) => {
+        if (slotTaken) throw uniqueViolation(ONE_ACTIVE_RUN_PER_CHAT_INDEX);
+        slotTaken = true;
+        return { id: 'run-win', status: v.status, chatId: v.chatId };
+      }),
+    });
+    const svc = new AiChatRunService(repo as never);
+    const results = await Promise.allSettled([
+      svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
+      svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
+    ]);
+    const fulfilled = results.filter((r) => r.status === 'fulfilled');
+    const rejected = results.filter((r) => r.status === 'rejected');
+    expect(fulfilled).toHaveLength(1);
+    expect(rejected).toHaveLength(1);
+    expect((rejected[0] as PromiseRejectedResult).reason).toBeInstanceOf(
+      RunAlreadyActiveError,
+    );
+    // Exactly the winner is locally active.
+    expect(svc.isLocallyActive('run-win')).toBe(true);
+  });
+
   it('a SUBSCRIBER detaching does NOT abort the run (only an explicit stop does)', async () => {
     const repo = makeRepo();
     const svc = new AiChatRunService(repo as never);
diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.ts
index e23668bb..1fb8ca4e 100644
--- a/apps/server/src/core/ai-chat/ai-chat-run.service.ts
+++ b/apps/server/src/core/ai-chat/ai-chat-run.service.ts
@@ -1,6 +1,28 @@
 import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
 import { AiChatRunRepo } from '@docmost/db/repos/ai-chat/ai-chat-run.repo';
 import { AiChatRun } from '@docmost/db/types/entity.types';
+import { isUniqueViolation, violatedConstraint } from '@docmost/db/utils';
+
+/** Name of the partial unique index enforcing "one active run per chat" (see the
+ *  ai_chat_runs migration). A 23505 on THIS constraint is the race-safe signal
+ *  that a concurrent turn already owns the chat — distinct from any other unique
+ *  collision, which must NOT be silently treated as "already active". */
+export const ONE_ACTIVE_RUN_PER_CHAT_INDEX = 'ai_chat_runs_one_active_per_chat';
+
+/**
+ * Thrown by {@link AiChatRunService.beginRun} when the run-row INSERT loses the
+ * race for a chat's single active slot (the partial unique index rejects it with
+ * a 23505). This is the AUTHORITATIVE concurrency gate: the controller's cheap
+ * pre-check is only a fast-path, and a request that slips past it must NOT run
+ * untracked. The caller (AiChatService.stream) translates this into a 409 and
+ * aborts the turn BEFORE any AI/provider call.
+ */
+export class RunAlreadyActiveError extends Error {
+  constructor(public readonly chatId: string) {
+    super(`An agent run is already in progress for chat ${chatId}`);
+    this.name = 'RunAlreadyActiveError';
+  }
+}
 
 /**
  * The terminal status of a TURN (the #183 assistant-row lifecycle) maps onto the
@@ -96,9 +118,12 @@ export class AiChatRunService implements OnModuleInit {
   /**
    * Start a run for a turn: insert the run row (status 'running', startedAt now),
    * register a fresh AbortController for it, and return a {@link RunHandle} whose
-   * `signal` the agent loop uses. The DB unique index guarantees at most one
-   * active run per chat — a second concurrent start on the same chat REJECTS at
-   * the insert (the caller surfaces a 409). The controller is registered AFTER a
+   * `signal` the agent loop uses. The DB partial unique index guarantees at most
+   * one active run per chat — a second concurrent start on the same chat REJECTS
+   * at the insert (a 23505 on {@link ONE_ACTIVE_RUN_PER_CHAT_INDEX}). That
+   * rejection is the AUTHORITATIVE race gate: it is surfaced as a distinct
+   * {@link RunAlreadyActiveError} (NOT swallowed), so the caller turns it into a
+   * 409 and never streams an untracked turn. The controller is registered AFTER a
    * successful insert so a rejected start leaks nothing.
    */
   async beginRun(args: {
@@ -107,14 +132,29 @@ export class AiChatRunService implements OnModuleInit {
     userId: string;
     trigger?: string;
   }): Promise<RunHandle> {
-    const run = await this.runRepo.insert({
-      chatId: args.chatId,
-      workspaceId: args.workspaceId,
-      createdBy: args.userId,
-      trigger: args.trigger ?? 'user',
-      status: 'running',
-      startedAt: new Date(),
-    });
+    let run: AiChatRun;
+    try {
+      run = await this.runRepo.insert({
+        chatId: args.chatId,
+        workspaceId: args.workspaceId,
+        createdBy: args.userId,
+        trigger: args.trigger ?? 'user',
+        status: 'running',
+        startedAt: new Date(),
+      });
+    } catch (err) {
+      // The race backstop: a concurrent turn already holds this chat's single
+      // active slot, so the partial unique index rejected our insert. Surface a
+      // distinct signal — the caller MUST reject this turn (409), not run it
+      // untracked. Any OTHER error propagates unchanged.
+      if (
+        isUniqueViolation(err) &&
+        violatedConstraint(err) === ONE_ACTIVE_RUN_PER_CHAT_INDEX
+      ) {
+        throw new RunAlreadyActiveError(args.chatId);
+      }
+      throw err;
+    }
     const controller = new AbortController();
     this.active.set(run.id, {
       controller,
diff --git a/apps/server/src/core/ai-chat/ai-chat.controller.ts b/apps/server/src/core/ai-chat/ai-chat.controller.ts
index 2506cfe8..6abaae72 100644
--- a/apps/server/src/core/ai-chat/ai-chat.controller.ts
+++ b/apps/server/src/core/ai-chat/ai-chat.controller.ts
@@ -314,9 +314,10 @@ export class AiChatController {
         workspace.id,
       );
       if (active) {
-        throw new ConflictException(
-          'An agent run is already in progress for this chat',
-        );
+        throw new ConflictException({
+          message: 'An agent run is already in progress for this chat',
+          code: 'A_RUN_ALREADY_ACTIVE',
+        });
       }
     }
 
@@ -424,13 +425,31 @@ export class AiChatController {
         runHooks,
       });
     } catch (err) {
-      // Any failure AFTER hijack can no longer send a clean JSON error, so emit
-      // a minimal error on the raw socket if nothing has been written yet.
-      this.logger.error('AI chat stream failed', err as Error);
+      // Any failure AFTER hijack can no longer go through Nest's exception
+      // filter, so emit the error on the raw socket if nothing has been written
+      // yet. The lost-the-race 409 (RunAlreadyActiveError -> ConflictException)
+      // is raised by stream() BEFORE it writes a byte, so headers are still
+      // unsent here: honor the HttpException's real status + body (a clean 409),
+      // not a blanket 500. Expected 4xx rejections are not logged as errors;
+      // non-HTTP failures and 5xx HttpExceptions are, so real faults stay visible.
+      // NOTE(review): relies on HttpException being imported from
+      // '@nestjs/common' in this file — no import hunk is visible here; confirm.
+      const isHttp = err instanceof HttpException;
+      if (!isHttp || err.getStatus() >= 500) {
+        this.logger.error('AI chat stream failed', err as Error);
+      }
       if (!res.raw.headersSent) {
-        res.raw.statusCode = 500;
+        const status = isHttp ? err.getStatus() : 500;
+        const payload = isHttp
+          ? err.getResponse()
+          : { error: 'Internal server error' };
+        res.raw.statusCode = status;
         res.raw.setHeader('Content-Type', 'application/json');
-        res.raw.end(JSON.stringify({ error: 'Internal server error' }));
+        res.raw.end(
+          JSON.stringify(
+            typeof payload === 'string' ? { message: payload } : payload,
+          ),
+        );
       } else if (!res.raw.writableEnded) {
         res.raw.end();
       }
diff --git a/apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts b/apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts
new file mode 100644
index 00000000..463f75ad
--- /dev/null
+++ b/apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts
@@ -0,0 +1,100 @@
+import { ConflictException } from '@nestjs/common';
+
+// Mock the AI SDK so we can PROVE no provider call is made for the turn we are
+// about to reject. The race rejection happens at runHooks.begin(), long before
+// any streamText/generateText, so these never resolve a real model.
+jest.mock('ai', () => ({
+  streamText: jest.fn(),
+  generateText: jest.fn(),
+  convertToModelMessages: jest.fn(() => []),
+  stepCountIs: jest.fn(() => () => false),
+}));
+
+import { streamText, generateText } from 'ai';
+import { AiChatService } from './ai-chat.service';
+import { RunAlreadyActiveError } from './ai-chat-run.service';
+
+/**
+ * Race-closure coverage for the "one active run per chat" guard (#184).
+ *
+ * THE BUG: two simultaneous POST /ai-chat/stream on the same chat both pass the
+ * controller's cheap pre-check (TOCTOU), so the loser's run-row INSERT hits the
+ * partial unique index. Previously that 23505 was SWALLOWED and the second turn
+ * streamed UNTRACKED (no runId, not stoppable). THE FIX: beginRun surfaces a
+ * RunAlreadyActiveError and stream() turns it into a 409 BEFORE any AI call —
+ * the second turn never runs.
+ */
+describe('AiChatService.stream — concurrent-run race rejection (#184)', () => {
+  const streamTextMock = streamText as unknown as jest.Mock;
+  const generateTextMock = generateText as unknown as jest.Mock;
+
+  beforeEach(() => {
+    streamTextMock.mockReset();
+    generateTextMock.mockReset();
+  });
+
+  // Minimal service whose only reachable deps before begin() are aiChatRepo
+  // (resolve the existing chat) — everything past begin must remain untouched.
+  function makeService(beginImpl: () => Promise<unknown>) {
+    const aiChatMessageRepo = { insert: jest.fn() };
+    const aiChatRepo = {
+      // An existing chat: stream keeps the supplied chatId and skips creation.
+      findById: jest.fn(async () => ({ id: 'chat-1', workspaceId: 'ws-1' })),
+      insert: jest.fn(),
+    };
+    const svc = new AiChatService(
+      {} as never, // ai
+      aiChatRepo as never,
+      aiChatMessageRepo as never,
+      {} as never, // aiSettings
+      {} as never, // tools
+      {} as never, // mcpClients
+      {} as never, // aiAgentRoleRepo
+      {} as never, // pageRepo
+      {} as never, // pageAccess
+    );
+    const begin = jest.fn(beginImpl);
+    return { svc, begin, aiChatRepo, aiChatMessageRepo };
+  }
+
+  const baseArgs = (begin: jest.Mock) => ({
+    user: { id: 'user-1' } as never,
+    workspace: { id: 'ws-1' } as never,
+    sessionId: 'sess-1',
+    body: { chatId: 'chat-1', messages: [] } as never,
+    res: { raw: {} } as never,
+    signal: new AbortController().signal,
+    model: {} as never,
+    role: null,
+    runHooks: {
+      begin,
+      onAssistantSeeded: jest.fn(),
+      onStep: jest.fn(),
+      onSettled: jest.fn(),
+    } as never,
+  });
+
+  it('rejects the racer with a 409 ConflictException BEFORE any AI call, and never persists an untracked turn', async () => {
+    // begin loses the unique-index race -> rejects with RunAlreadyActiveError.
+    const { svc, begin, aiChatMessageRepo } = makeService(async () => {
+      throw new RunAlreadyActiveError('chat-1');
+    });
+
+    const promise = svc.stream(baseArgs(begin));
+
+    await expect(promise).rejects.toBeInstanceOf(ConflictException);
+    await promise.catch((err: ConflictException) => {
+      expect(err.getStatus()).toBe(409);
+      expect((err.getResponse() as { code?: string }).code).toBe(
+        'A_RUN_ALREADY_ACTIVE',
+      );
+    });
+
+    // The decisive assertions: the rejected racer spent NO tokens and left NO
+    // untracked turn behind.
+    expect(begin).toHaveBeenCalledTimes(1);
+    expect(streamTextMock).not.toHaveBeenCalled();
+    expect(generateTextMock).not.toHaveBeenCalled();
+    expect(aiChatMessageRepo.insert).not.toHaveBeenCalled();
+  });
+});
diff --git a/apps/server/src/core/ai-chat/ai-chat.service.ts b/apps/server/src/core/ai-chat/ai-chat.service.ts
index 5095b0f3..4ffb1cef 100644
--- a/apps/server/src/core/ai-chat/ai-chat.service.ts
+++ b/apps/server/src/core/ai-chat/ai-chat.service.ts
@@ -1,4 +1,5 @@
 import {
+  ConflictException,
   ForbiddenException,
   Injectable,
   Logger,
@@ -30,6 +31,7 @@ import {
 import { AiChatToolsService } from './tools/ai-chat-tools.service';
 import { McpClientsService } from './external-mcp/mcp-clients.service';
 import { buildSystemPrompt } from './ai-chat.prompt';
+import { RunAlreadyActiveError } from './ai-chat-run.service';
 import { roleModelOverride } from './roles/role-model-config';
 import {
   startSseHeartbeat,
@@ -395,8 +397,20 @@ export class AiChatService implements OnModuleInit {
         effectiveSignal = handle.signal;
       }
     } catch (err) {
-      // A failed run start must not break the turn — fall back to the socket
-      // signal (legacy behavior) and stream anyway.
+      // RACE BACKSTOP: the run-row INSERT lost the chat's single active slot
+      // (the partial unique index rejected it). This is the AUTHORITATIVE
+      // concurrency gate — the controller's pre-check is only a fast-path, and a
+      // request that slipped past it must NOT proceed. Reject the turn with a
+      // 409 NOW, BEFORE any AI/provider call: no tokens are spent and no
+      // untracked turn streams. (Matches the controller's pre-check 409.)
+      if (err instanceof RunAlreadyActiveError) {
+        throw new ConflictException({
+          message: 'An agent run is already in progress for this chat',
+          code: 'A_RUN_ALREADY_ACTIVE',
+        });
+      }
+      // Any OTHER run-start failure must not break the turn — fall back to the
+      // socket signal (legacy behavior) and stream anyway.
       this.logger.error(
         `Failed to begin agent run (chat ${chatId}); streaming without run tracking`,
         err as Error,