fix(ai-chat): close the concurrent-run race in #184 (insert is the gate)
The "one active run per chat" guard was bypassable under a race. Two simultaneous POST /ai-chat/stream on the same chat both passed the controller's pre-hijack 409 check (a check-then-act TOCTOU), then the loser's INSERT into ai_chat_runs hit the partial unique index (ai_chat_runs_one_active_per_chat, 23505). That error was SWALLOWED, so the second turn streamed UNTRACKED: no runId, not targetable by /stop, and (autonomousRuns on) onClose won't abort it -> an orphan unstoppable run that also spends provider tokens. Make the unique-index INSERT the authoritative gate: - AiChatRunService.beginRun: when the run-row INSERT fails with a 23505 on ONE_ACTIVE_RUN_PER_CHAT_INDEX (via isUniqueViolation/violatedConstraint), no longer swallow it -> throw a distinct RunAlreadyActiveError. Any other error (incl. a 23505 on a different constraint) propagates unchanged. - AiChatService.stream: when begin throws RunAlreadyActiveError, reject the turn with a 409 ConflictException (code A_RUN_ALREADY_ACTIVE) BEFORE any AI/provider call -> no tokens spent, no untracked turn. Other begin failures keep the legacy best-effort fallback (stream socket-bound). - ai-chat.controller: post-hijack catch honors an HttpException's real status/body (clean 409) instead of a blanket 500, since the race 409 is raised before a byte is written. Pre-check 409 now carries the same code. The controller's cheap pre-check stays as a fast-path for the common sequential double-submit; the INSERT violation is the race-safe backstop. Tests: ai-chat-run.service.spec proves beginRun throws RunAlreadyActiveError on the active-index 23505 (and only that constraint), leaks no controller, and an integration-style two-concurrent-begins test where exactly one wins; new ai-chat.service.run-race.spec proves stream rejects with a 409 ConflictException BEFORE any streamText/generateText and never persists an untracked turn. The latter fails without the fix. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,28 @@
|
||||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||||
import { AiChatRunRepo } from '@docmost/db/repos/ai-chat/ai-chat-run.repo';
|
||||
import { AiChatRun } from '@docmost/db/types/entity.types';
|
||||
import { isUniqueViolation, violatedConstraint } from '@docmost/db/utils';
|
||||
|
||||
/** Name of the partial unique index enforcing "one active run per chat" (see the
|
||||
* ai_chat_runs migration). A 23505 on THIS constraint is the race-safe signal
|
||||
* that a concurrent turn already owns the chat — distinct from any other unique
|
||||
* collision, which must NOT be silently treated as "already active". */
|
||||
export const ONE_ACTIVE_RUN_PER_CHAT_INDEX = 'ai_chat_runs_one_active_per_chat';
|
||||
|
||||
/**
|
||||
* Thrown by {@link AiChatRunService.beginRun} when the run-row INSERT loses the
|
||||
* race for a chat's single active slot (the partial unique index rejects it with
|
||||
* a 23505). This is the AUTHORITATIVE concurrency gate: the controller's cheap
|
||||
* pre-check is only a fast-path, and a request that slips past it must NOT run
|
||||
* untracked. The caller (AiChatService.stream) translates this into a 409 and
|
||||
* aborts the turn BEFORE any AI/provider call.
|
||||
*/
|
||||
export class RunAlreadyActiveError extends Error {
|
||||
constructor(public readonly chatId: string) {
|
||||
super(`An agent run is already in progress for chat ${chatId}`);
|
||||
this.name = 'RunAlreadyActiveError';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The terminal status of a TURN (the #183 assistant-row lifecycle) maps onto the
|
||||
@@ -96,9 +118,12 @@ export class AiChatRunService implements OnModuleInit {
|
||||
/**
|
||||
* Start a run for a turn: insert the run row (status 'running', startedAt now),
|
||||
* register a fresh AbortController for it, and return a {@link RunHandle} whose
|
||||
* `signal` the agent loop uses. The DB unique index guarantees at most one
|
||||
* active run per chat — a second concurrent start on the same chat REJECTS at
|
||||
* the insert (the caller surfaces a 409). The controller is registered AFTER a
|
||||
* `signal` the agent loop uses. The DB partial unique index guarantees at most
|
||||
* one active run per chat — a second concurrent start on the same chat REJECTS
|
||||
* at the insert (a 23505 on {@link ONE_ACTIVE_RUN_PER_CHAT_INDEX}). That
|
||||
* rejection is the AUTHORITATIVE race gate: it is surfaced as a distinct
|
||||
* {@link RunAlreadyActiveError} (NOT swallowed), so the caller turns it into a
|
||||
* 409 and never streams an untracked turn. The controller is registered AFTER a
|
||||
* successful insert so a rejected start leaks nothing.
|
||||
*/
|
||||
async beginRun(args: {
|
||||
@@ -107,14 +132,29 @@ export class AiChatRunService implements OnModuleInit {
|
||||
userId: string;
|
||||
trigger?: string;
|
||||
}): Promise<RunHandle> {
|
||||
const run = await this.runRepo.insert({
|
||||
chatId: args.chatId,
|
||||
workspaceId: args.workspaceId,
|
||||
createdBy: args.userId,
|
||||
trigger: args.trigger ?? 'user',
|
||||
status: 'running',
|
||||
startedAt: new Date(),
|
||||
});
|
||||
let run: AiChatRun;
|
||||
try {
|
||||
run = await this.runRepo.insert({
|
||||
chatId: args.chatId,
|
||||
workspaceId: args.workspaceId,
|
||||
createdBy: args.userId,
|
||||
trigger: args.trigger ?? 'user',
|
||||
status: 'running',
|
||||
startedAt: new Date(),
|
||||
});
|
||||
} catch (err) {
|
||||
// The race backstop: a concurrent turn already holds this chat's single
|
||||
// active slot, so the partial unique index rejected our insert. Surface a
|
||||
// distinct signal — the caller MUST reject this turn (409), not run it
|
||||
// untracked. Any OTHER error propagates unchanged.
|
||||
if (
|
||||
isUniqueViolation(err) &&
|
||||
violatedConstraint(err) === ONE_ACTIVE_RUN_PER_CHAT_INDEX
|
||||
) {
|
||||
throw new RunAlreadyActiveError(args.chatId);
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
const controller = new AbortController();
|
||||
this.active.set(run.id, {
|
||||
controller,
|
||||
|
||||
Reference in New Issue
Block a user