fix(ai-chat): close the concurrent-run race in #184 (insert is the gate)
The "one active run per chat" guard was bypassable under a race. Two simultaneous POST /ai-chat/stream on the same chat both passed the controller's pre-hijack 409 check (a check-then-act TOCTOU), then the loser's INSERT into ai_chat_runs hit the partial unique index (ai_chat_runs_one_active_per_chat, 23505). That error was SWALLOWED, so the second turn streamed UNTRACKED: no runId, not targetable by /stop, and (autonomousRuns on) onClose won't abort it -> an orphan unstoppable run that also spends provider tokens. Make the unique-index INSERT the authoritative gate: - AiChatRunService.beginRun: when the run-row INSERT fails with a 23505 on ONE_ACTIVE_RUN_PER_CHAT_INDEX (via isUniqueViolation/violatedConstraint), no longer swallow it -> throw a distinct RunAlreadyActiveError. Any other error (incl. a 23505 on a different constraint) propagates unchanged. - AiChatService.stream: when begin throws RunAlreadyActiveError, reject the turn with a 409 ConflictException (code A_RUN_ALREADY_ACTIVE) BEFORE any AI/provider call -> no tokens spent, no untracked turn. Other begin failures keep the legacy best-effort fallback (stream socket-bound). - ai-chat.controller: post-hijack catch honors an HttpException's real status/body (clean 409) instead of a blanket 500, since the race 409 is raised before a byte is written. Pre-check 409 now carries the same code. The controller's cheap pre-check stays as a fast-path for the common sequential double-submit; the INSERT violation is the race-safe backstop. Tests: ai-chat-run.service.spec proves beginRun throws RunAlreadyActiveError on the active-index 23505 (and only that constraint), leaks no controller, and an integration-style two-concurrent-begins test where exactly one wins; new ai-chat.service.run-race.spec proves stream rejects with a 409 ConflictException BEFORE any streamText/generateText and never persists an untracked turn. The latter fails without the fix. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,9 +1,23 @@
|
||||
import { Logger } from '@nestjs/common';
|
||||
import {
|
||||
AiChatRunService,
|
||||
RunAlreadyActiveError,
|
||||
ONE_ACTIVE_RUN_PER_CHAT_INDEX,
|
||||
mapTurnStatusToRun,
|
||||
} from './ai-chat-run.service';
|
||||
|
||||
/** Shape a Postgres unique-violation the way the postgres.js driver surfaces it:
|
||||
* SQLSTATE 23505 + the offending index in `constraint_name`. */
|
||||
function uniqueViolation(constraintName: string): Error & {
|
||||
code: string;
|
||||
constraint_name: string;
|
||||
} {
|
||||
return Object.assign(new Error('duplicate key value violates unique constraint'), {
|
||||
code: '23505',
|
||||
constraint_name: constraintName,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Unit coverage for the #184 phase-1 run lifecycle (AiChatRunService) with a
|
||||
* hand-rolled mock repo — no Nest graph, no DB. The invariant under test is the
|
||||
@@ -92,6 +106,79 @@ describe('AiChatRunService run lifecycle', () => {
|
||||
expect(svc.isLocallyActive('run-1')).toBe(true);
|
||||
});
|
||||
|
||||
it('beginRun REJECTS the racer: a 23505 on the one-active-per-chat index throws RunAlreadyActiveError (not swallowed) and registers no controller', async () => {
|
||||
// The race: the controller's cheap pre-check passed for BOTH concurrent
|
||||
// turns, so the loser's INSERT hits the partial unique index. That rejection
|
||||
// is the authoritative gate — it must surface, not be swallowed into an
|
||||
// untracked turn.
|
||||
const repo = makeRepo({
|
||||
insert: jest.fn(async () => {
|
||||
throw uniqueViolation(ONE_ACTIVE_RUN_PER_CHAT_INDEX);
|
||||
}),
|
||||
});
|
||||
const svc = new AiChatRunService(repo as never);
|
||||
await expect(
|
||||
svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
|
||||
).rejects.toBeInstanceOf(RunAlreadyActiveError);
|
||||
// No controller leaked for a rejected start.
|
||||
expect(svc.isLocallyActive('run-1')).toBe(false);
|
||||
});
|
||||
|
||||
it('beginRun does NOT mask an unrelated unique violation as already-active', async () => {
|
||||
// A 23505 on some OTHER constraint is a real bug, not the race — it must
|
||||
// propagate unchanged so it is never silently treated as "already active".
|
||||
const other = uniqueViolation('ai_chat_runs_pkey');
|
||||
const repo = makeRepo({
|
||||
insert: jest.fn(async () => {
|
||||
throw other;
|
||||
}),
|
||||
});
|
||||
const svc = new AiChatRunService(repo as never);
|
||||
await expect(
|
||||
svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
|
||||
).rejects.toBe(other);
|
||||
});
|
||||
|
||||
it('beginRun propagates a non-unique insert failure unchanged', async () => {
|
||||
const boom = new Error('connection reset');
|
||||
const repo = makeRepo({
|
||||
insert: jest.fn(async () => {
|
||||
throw boom;
|
||||
}),
|
||||
});
|
||||
const svc = new AiChatRunService(repo as never);
|
||||
await expect(
|
||||
svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
|
||||
).rejects.toBe(boom);
|
||||
});
|
||||
|
||||
it('two concurrent begins on one chat: exactly one wins, the other is rejected as already-active', async () => {
|
||||
// Integration-style: model the DB partial unique index with a one-shot slot.
|
||||
// The first insert claims it; the second hits a 23505 on the active index.
|
||||
let slotTaken = false;
|
||||
const repo = makeRepo({
|
||||
insert: jest.fn(async (v: any) => {
|
||||
if (slotTaken) throw uniqueViolation(ONE_ACTIVE_RUN_PER_CHAT_INDEX);
|
||||
slotTaken = true;
|
||||
return { id: 'run-win', status: v.status, chatId: v.chatId };
|
||||
}),
|
||||
});
|
||||
const svc = new AiChatRunService(repo as never);
|
||||
const results = await Promise.allSettled([
|
||||
svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
|
||||
svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
|
||||
]);
|
||||
const fulfilled = results.filter((r) => r.status === 'fulfilled');
|
||||
const rejected = results.filter((r) => r.status === 'rejected');
|
||||
expect(fulfilled).toHaveLength(1);
|
||||
expect(rejected).toHaveLength(1);
|
||||
expect((rejected[0] as PromiseRejectedResult).reason).toBeInstanceOf(
|
||||
RunAlreadyActiveError,
|
||||
);
|
||||
// Exactly the winner is locally active.
|
||||
expect(svc.isLocallyActive('run-win')).toBe(true);
|
||||
});
|
||||
|
||||
it('a SUBSCRIBER detaching does NOT abort the run (only an explicit stop does)', async () => {
|
||||
const repo = makeRepo();
|
||||
const svc = new AiChatRunService(repo as never);
|
||||
|
||||
@@ -1,6 +1,28 @@
|
||||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||||
import { AiChatRunRepo } from '@docmost/db/repos/ai-chat/ai-chat-run.repo';
|
||||
import { AiChatRun } from '@docmost/db/types/entity.types';
|
||||
import { isUniqueViolation, violatedConstraint } from '@docmost/db/utils';
|
||||
|
||||
/** Name of the partial unique index enforcing "one active run per chat" (see the
|
||||
* ai_chat_runs migration). A 23505 on THIS constraint is the race-safe signal
|
||||
* that a concurrent turn already owns the chat — distinct from any other unique
|
||||
* collision, which must NOT be silently treated as "already active". */
|
||||
export const ONE_ACTIVE_RUN_PER_CHAT_INDEX = 'ai_chat_runs_one_active_per_chat';
|
||||
|
||||
/**
|
||||
* Thrown by {@link AiChatRunService.beginRun} when the run-row INSERT loses the
|
||||
* race for a chat's single active slot (the partial unique index rejects it with
|
||||
* a 23505). This is the AUTHORITATIVE concurrency gate: the controller's cheap
|
||||
* pre-check is only a fast-path, and a request that slips past it must NOT run
|
||||
* untracked. The caller (AiChatService.stream) translates this into a 409 and
|
||||
* aborts the turn BEFORE any AI/provider call.
|
||||
*/
|
||||
export class RunAlreadyActiveError extends Error {
|
||||
constructor(public readonly chatId: string) {
|
||||
super(`An agent run is already in progress for chat ${chatId}`);
|
||||
this.name = 'RunAlreadyActiveError';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The terminal status of a TURN (the #183 assistant-row lifecycle) maps onto the
|
||||
@@ -96,9 +118,12 @@ export class AiChatRunService implements OnModuleInit {
|
||||
/**
|
||||
* Start a run for a turn: insert the run row (status 'running', startedAt now),
|
||||
* register a fresh AbortController for it, and return a {@link RunHandle} whose
|
||||
* `signal` the agent loop uses. The DB unique index guarantees at most one
|
||||
* active run per chat — a second concurrent start on the same chat REJECTS at
|
||||
* the insert (the caller surfaces a 409). The controller is registered AFTER a
|
||||
* `signal` the agent loop uses. The DB partial unique index guarantees at most
|
||||
* one active run per chat — a second concurrent start on the same chat REJECTS
|
||||
* at the insert (a 23505 on {@link ONE_ACTIVE_RUN_PER_CHAT_INDEX}). That
|
||||
* rejection is the AUTHORITATIVE race gate: it is surfaced as a distinct
|
||||
* {@link RunAlreadyActiveError} (NOT swallowed), so the caller turns it into a
|
||||
* 409 and never streams an untracked turn. The controller is registered AFTER a
|
||||
* successful insert so a rejected start leaks nothing.
|
||||
*/
|
||||
async beginRun(args: {
|
||||
@@ -107,14 +132,29 @@ export class AiChatRunService implements OnModuleInit {
|
||||
userId: string;
|
||||
trigger?: string;
|
||||
}): Promise<RunHandle> {
|
||||
const run = await this.runRepo.insert({
|
||||
chatId: args.chatId,
|
||||
workspaceId: args.workspaceId,
|
||||
createdBy: args.userId,
|
||||
trigger: args.trigger ?? 'user',
|
||||
status: 'running',
|
||||
startedAt: new Date(),
|
||||
});
|
||||
let run: AiChatRun;
|
||||
try {
|
||||
run = await this.runRepo.insert({
|
||||
chatId: args.chatId,
|
||||
workspaceId: args.workspaceId,
|
||||
createdBy: args.userId,
|
||||
trigger: args.trigger ?? 'user',
|
||||
status: 'running',
|
||||
startedAt: new Date(),
|
||||
});
|
||||
} catch (err) {
|
||||
// The race backstop: a concurrent turn already holds this chat's single
|
||||
// active slot, so the partial unique index rejected our insert. Surface a
|
||||
// distinct signal — the caller MUST reject this turn (409), not run it
|
||||
// untracked. Any OTHER error propagates unchanged.
|
||||
if (
|
||||
isUniqueViolation(err) &&
|
||||
violatedConstraint(err) === ONE_ACTIVE_RUN_PER_CHAT_INDEX
|
||||
) {
|
||||
throw new RunAlreadyActiveError(args.chatId);
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
const controller = new AbortController();
|
||||
this.active.set(run.id, {
|
||||
controller,
|
||||
|
||||
@@ -314,9 +314,10 @@ export class AiChatController {
|
||||
workspace.id,
|
||||
);
|
||||
if (active) {
|
||||
throw new ConflictException(
|
||||
'An agent run is already in progress for this chat',
|
||||
);
|
||||
throw new ConflictException({
|
||||
message: 'An agent run is already in progress for this chat',
|
||||
code: 'A_RUN_ALREADY_ACTIVE',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -424,13 +425,28 @@ export class AiChatController {
|
||||
runHooks,
|
||||
});
|
||||
} catch (err) {
|
||||
// Any failure AFTER hijack can no longer send a clean JSON error, so emit
|
||||
// a minimal error on the raw socket if nothing has been written yet.
|
||||
this.logger.error('AI chat stream failed', err as Error);
|
||||
// Any failure AFTER hijack can no longer go through Nest's exception
|
||||
// filter, so emit the error on the raw socket if nothing has been written
|
||||
// yet. The lost-the-race 409 (RunAlreadyActiveError -> ConflictException)
|
||||
// is raised by stream() BEFORE it writes a byte, so headers are still
|
||||
// unsent here: honor the HttpException's real status + body (a clean 409),
|
||||
// not a blanket 500. Everything else stays a 500.
|
||||
const isHttp = err instanceof HttpException;
|
||||
if (!isHttp) {
|
||||
this.logger.error('AI chat stream failed', err as Error);
|
||||
}
|
||||
if (!res.raw.headersSent) {
|
||||
res.raw.statusCode = 500;
|
||||
const status = isHttp ? err.getStatus() : 500;
|
||||
const payload = isHttp
|
||||
? err.getResponse()
|
||||
: { error: 'Internal server error' };
|
||||
res.raw.statusCode = status;
|
||||
res.raw.setHeader('Content-Type', 'application/json');
|
||||
res.raw.end(JSON.stringify({ error: 'Internal server error' }));
|
||||
res.raw.end(
|
||||
JSON.stringify(
|
||||
typeof payload === 'string' ? { message: payload } : payload,
|
||||
),
|
||||
);
|
||||
} else if (!res.raw.writableEnded) {
|
||||
res.raw.end();
|
||||
}
|
||||
|
||||
100
apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts
Normal file
100
apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts
Normal file
@@ -0,0 +1,100 @@
|
||||
import { ConflictException } from '@nestjs/common';
|
||||
|
||||
// Mock the AI SDK so we can PROVE no provider call is made for the turn we are
|
||||
// about to reject. The race rejection happens at runHooks.begin(), long before
|
||||
// any streamText/generateText, so these never resolve a real model.
|
||||
jest.mock('ai', () => ({
|
||||
streamText: jest.fn(),
|
||||
generateText: jest.fn(),
|
||||
convertToModelMessages: jest.fn(() => []),
|
||||
stepCountIs: jest.fn(() => () => false),
|
||||
}));
|
||||
|
||||
import { streamText, generateText } from 'ai';
|
||||
import { AiChatService } from './ai-chat.service';
|
||||
import { RunAlreadyActiveError } from './ai-chat-run.service';
|
||||
|
||||
/**
|
||||
* Race-closure coverage for the "one active run per chat" guard (#184).
|
||||
*
|
||||
* THE BUG: two simultaneous POST /ai-chat/stream on the same chat both pass the
|
||||
* controller's cheap pre-check (TOCTOU), so the loser's run-row INSERT hits the
|
||||
* partial unique index. Previously that 23505 was SWALLOWED and the second turn
|
||||
* streamed UNTRACKED (no runId, not stoppable). THE FIX: beginRun surfaces a
|
||||
* RunAlreadyActiveError and stream() turns it into a 409 BEFORE any AI call —
|
||||
* the second turn never runs.
|
||||
*/
|
||||
describe('AiChatService.stream — concurrent-run race rejection (#184)', () => {
|
||||
const streamTextMock = streamText as unknown as jest.Mock;
|
||||
const generateTextMock = generateText as unknown as jest.Mock;
|
||||
|
||||
beforeEach(() => {
|
||||
streamTextMock.mockReset();
|
||||
generateTextMock.mockReset();
|
||||
});
|
||||
|
||||
// Minimal service whose only reachable deps before begin() are aiChatRepo
|
||||
// (resolve the existing chat) — everything past begin must remain untouched.
|
||||
function makeService(beginImpl: () => Promise<unknown>) {
|
||||
const aiChatMessageRepo = { insert: jest.fn() };
|
||||
const aiChatRepo = {
|
||||
// An existing chat: stream keeps the supplied chatId and skips creation.
|
||||
findById: jest.fn(async () => ({ id: 'chat-1', workspaceId: 'ws-1' })),
|
||||
insert: jest.fn(),
|
||||
};
|
||||
const svc = new AiChatService(
|
||||
{} as never, // ai
|
||||
aiChatRepo as never,
|
||||
aiChatMessageRepo as never,
|
||||
{} as never, // aiSettings
|
||||
{} as never, // tools
|
||||
{} as never, // mcpClients
|
||||
{} as never, // aiAgentRoleRepo
|
||||
{} as never, // pageRepo
|
||||
{} as never, // pageAccess
|
||||
);
|
||||
const begin = jest.fn(beginImpl);
|
||||
return { svc, begin, aiChatRepo, aiChatMessageRepo };
|
||||
}
|
||||
|
||||
const baseArgs = (begin: jest.Mock) => ({
|
||||
user: { id: 'user-1' } as never,
|
||||
workspace: { id: 'ws-1' } as never,
|
||||
sessionId: 'sess-1',
|
||||
body: { chatId: 'chat-1', messages: [] } as never,
|
||||
res: { raw: {} } as never,
|
||||
signal: new AbortController().signal,
|
||||
model: {} as never,
|
||||
role: null,
|
||||
runHooks: {
|
||||
begin,
|
||||
onAssistantSeeded: jest.fn(),
|
||||
onStep: jest.fn(),
|
||||
onSettled: jest.fn(),
|
||||
} as never,
|
||||
});
|
||||
|
||||
it('rejects the racer with a 409 ConflictException BEFORE any AI call, and never persists an untracked turn', async () => {
|
||||
// begin loses the unique-index race -> RunAlreadyActiveError.
|
||||
const { svc, begin, aiChatMessageRepo } = makeService(() => {
|
||||
throw new RunAlreadyActiveError('chat-1');
|
||||
});
|
||||
|
||||
const promise = svc.stream(baseArgs(begin));
|
||||
|
||||
await expect(promise).rejects.toBeInstanceOf(ConflictException);
|
||||
await promise.catch((err: ConflictException) => {
|
||||
expect(err.getStatus()).toBe(409);
|
||||
expect((err.getResponse() as { code?: string }).code).toBe(
|
||||
'A_RUN_ALREADY_ACTIVE',
|
||||
);
|
||||
});
|
||||
|
||||
// The decisive assertions: the rejected racer spent NO tokens and left NO
|
||||
// untracked turn behind.
|
||||
expect(begin).toHaveBeenCalledTimes(1);
|
||||
expect(streamTextMock).not.toHaveBeenCalled();
|
||||
expect(generateTextMock).not.toHaveBeenCalled();
|
||||
expect(aiChatMessageRepo.insert).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -1,4 +1,5 @@
|
||||
import {
|
||||
ConflictException,
|
||||
ForbiddenException,
|
||||
Injectable,
|
||||
Logger,
|
||||
@@ -30,6 +31,7 @@ import {
|
||||
import { AiChatToolsService } from './tools/ai-chat-tools.service';
|
||||
import { McpClientsService } from './external-mcp/mcp-clients.service';
|
||||
import { buildSystemPrompt } from './ai-chat.prompt';
|
||||
import { RunAlreadyActiveError } from './ai-chat-run.service';
|
||||
import { roleModelOverride } from './roles/role-model-config';
|
||||
import {
|
||||
startSseHeartbeat,
|
||||
@@ -395,8 +397,20 @@ export class AiChatService implements OnModuleInit {
|
||||
effectiveSignal = handle.signal;
|
||||
}
|
||||
} catch (err) {
|
||||
// A failed run start must not break the turn — fall back to the socket
|
||||
// signal (legacy behavior) and stream anyway.
|
||||
// RACE BACKSTOP: the run-row INSERT lost the chat's single active slot
|
||||
// (the partial unique index rejected it). This is the AUTHORITATIVE
|
||||
// concurrency gate — the controller's pre-check is only a fast-path, and a
|
||||
// request that slipped past it must NOT proceed. Reject the turn with a
|
||||
// 409 NOW, BEFORE any AI/provider call: no tokens are spent and no
|
||||
// untracked turn streams. (Matches the controller's pre-check 409.)
|
||||
if (err instanceof RunAlreadyActiveError) {
|
||||
throw new ConflictException({
|
||||
message: 'An agent run is already in progress for this chat',
|
||||
code: 'A_RUN_ALREADY_ACTIVE',
|
||||
});
|
||||
}
|
||||
// Any OTHER run-start failure must not break the turn — fall back to the
|
||||
// socket signal (legacy behavior) and stream anyway.
|
||||
this.logger.error(
|
||||
`Failed to begin agent run (chat ${chatId}); streaming without run tracking`,
|
||||
err as Error,
|
||||
|
||||
Reference in New Issue
Block a user