fix(ai-chat): make finalizeRun once-gate atomic against concurrent settle (#184 round-3)
The F6 once-gate was non-atomic: `settled.has` was read BEFORE the awaited terminal UPDATE and `settled.add` only after, so two concurrent finalizeRun calls for the same run (the documented safety-net catch vs a streamText terminal callback) both passed the check and both wrote the terminal row — double-write + last-write-wins status clobber, a window the bounded retry only widened. Restore a SYNCHRONOUS atomic claim before any await: capture the entry, then `active.delete` as a check-and-clear in one tick. The first caller claims and proceeds; a concurrent second caller finds the entry gone and returns at the claim, before any UPDATE. On a successful write we arm `settled` (post-write idempotency gate) and do not restore; on total bounded-retry failure we restore the claimed entry so a retrier can complete it — never both write and restore. Also fix the F6(b) JSDoc/comment to not overclaim an in-process retrier on the no-streamText path: there the only settler is the safety-net, so recovery on total UPDATE failure is the unconditional boot sweep on the next restart. Adds a concurrency test firing two simultaneous finalizeRun on one run (update held on a pending promise) asserting update is called EXACTLY ONCE; existing F6 retry-rides-transient + retain-on-total-failure tests stay green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -339,6 +339,55 @@ describe('AiChatRunService run lifecycle', () => {
|
||||
);
|
||||
});
|
||||
|
||||
it('CONCURRENCY: two simultaneous finalizeRun on the same run write the terminal row EXACTLY ONCE (the 2nd caller exits synchronously at the atomic claim)', async () => {
|
||||
// The CRITICAL race: AiChatService.stream's safety-net catch settles the turn
|
||||
// to 'error' while a streamText terminal callback also settles it — both call
|
||||
// finalizeRun for the SAME runId. The once-gate must close ATOMICALLY: a
|
||||
// `settled.has` check alone is read BEFORE the awaited UPDATE, so both callers
|
||||
// would pass it and BOTH write the row (last-write-wins clobber + double
|
||||
// write). The fix claims the run with a SYNCHRONOUS `active.delete` before any
|
||||
// await, so the second caller returns in the same tick, before the UPDATE.
|
||||
//
|
||||
// We force the two calls to overlap by making `update` return a promise we
|
||||
// resolve only AFTER both finalizeRun calls have run their synchronous bodies.
|
||||
let resolveUpdate!: (v: unknown) => void;
|
||||
const updateGate = new Promise((res) => {
|
||||
resolveUpdate = res;
|
||||
});
|
||||
const update = jest.fn(() => updateGate);
|
||||
const repo = makeRepo({ update });
|
||||
const svc = new AiChatRunService(repo as never, makeEnv() as never);
|
||||
await svc.beginRun({
|
||||
chatId: 'chat-1',
|
||||
workspaceId: 'ws-1',
|
||||
userId: 'user-1',
|
||||
});
|
||||
|
||||
// Fire both before the (pending) update resolves. The first synchronously
|
||||
// claims the entry (active.delete) and awaits update; the second, started in
|
||||
// the same macrotask, finds the entry already gone and returns at the claim
|
||||
// WITHOUT ever calling update.
|
||||
const p1 = svc.finalizeRun('run-1', 'ws-1', 'completed');
|
||||
const p2 = svc.finalizeRun('run-1', 'ws-1', 'error', 'safety-net');
|
||||
|
||||
// The decisive assertion: exactly one caller reached the terminal UPDATE.
|
||||
expect(update).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Let the single in-flight update land; both calls resolve cleanly.
|
||||
resolveUpdate({ id: 'run-1' });
|
||||
await Promise.all([p1, p2]);
|
||||
|
||||
expect(update).toHaveBeenCalledTimes(1);
|
||||
// The winner is the FIRST caller ('completed' -> 'succeeded'); the late
|
||||
// 'error' settle never wrote, so it could not clobber the real status.
|
||||
expect(update).toHaveBeenCalledWith(
|
||||
'run-1',
|
||||
'ws-1',
|
||||
expect.objectContaining({ status: 'succeeded' }),
|
||||
);
|
||||
expect(svc.isLocallyActive('run-1')).toBe(false);
|
||||
});
|
||||
|
||||
it('F6: a TRANSIENT terminal-write failure is ridden out by the bounded retry — the run is settled, not stranded', async () => {
|
||||
// The bug: finalizeRun used to DROP the in-memory entry BEFORE the terminal
|
||||
// UPDATE, then only warn-log a failure. A single transient blip (pool
|
||||
|
||||
@@ -263,26 +263,41 @@ export class AiChatRunService implements OnModuleInit {
|
||||
/**
|
||||
* Finalize a run to its terminal status (succeeded / failed / aborted),
|
||||
* stamping finishedAt + any error. Best-effort, but ROBUST against a transient
|
||||
* terminal-write failure (F6).
|
||||
* terminal-write failure (F6) AND atomically safe against a concurrent settle.
|
||||
*
|
||||
* ORDER MATTERS (F6): the terminal UPDATE happens FIRST; only once it SUCCEEDS
|
||||
* do we record the run as settled and drop its in-memory entry. If the UPDATE
|
||||
* fails on every bounded attempt we KEEP the in-memory entry and do NOT mark it
|
||||
* settled — so a later settle (a streamText callback, a requestStop -> onAbort,
|
||||
* a future sweep) can retry the terminal write. A run is therefore never
|
||||
* ATOMIC ONCE-CLAIM (the gate must close in ONE synchronous tick): two
|
||||
* finalizeRun calls for the SAME run can race — the documented real path is
|
||||
* AiChatService.stream's safety-net catch settling the turn to 'error' while a
|
||||
* streamText terminal callback (onFinish/onAbort/onError) ALSO settles it. The
|
||||
* `settled.has` check alone is NOT a gate: it is read BEFORE the awaited UPDATE,
|
||||
* so two callers can both see `false` and both write the row (last-write-wins
|
||||
* clobbers the real terminal status, and the bounded retry only widens that
|
||||
* window). The claim therefore happens via `active.delete`, a SYNCHRONOUS
|
||||
* check-and-clear with NO await between the gate and the entry removal: the
|
||||
* second concurrent caller finds the entry already gone and returns in the same
|
||||
* tick, before any UPDATE. The transition "nobody is finalizing" -> "I am
|
||||
* finalizing" is thus a single atomic step.
|
||||
*
|
||||
* ORDER MATTERS (F6): once we own the claim, the terminal UPDATE happens FIRST;
|
||||
* only once it SUCCEEDS do we record the run as settled. If the UPDATE fails on
|
||||
* every bounded attempt we RESTORE the in-memory entry and do NOT mark it
|
||||
* settled — so a later settle can retry the terminal write and the run is never
|
||||
* silently stranded 'running' (which would 409 every future turn in the chat
|
||||
* until a restart, since phase 1 has no periodic sweep).
|
||||
* until a restart). Who that later retrier is depends on the path: when
|
||||
* streamText attached, its terminal callback (or a requestStop -> onAbort) is a
|
||||
* second in-process settler that retries; on the NO-streamText path (the turn
|
||||
* threw before streamText was wired, so ONLY the safety-net ever settles) there
|
||||
* is no in-process retrier — recovery there is the UNCONDITIONAL boot sweep on
|
||||
* the next restart (phase 1 has no periodic in-process sweep). The retained
|
||||
* entry is bounded (cleared on restart) and harmless meanwhile.
|
||||
*
|
||||
* IDEMPOTENT on SUCCESS (#184 review): the terminal write happens AT MOST ONCE
|
||||
* per run. The once-gate keys off {@link settled} (the terminal row already
|
||||
* written), NOT off entry-deletion — so a dropped-then-retried write is still
|
||||
* allowed, while a genuine double-settle collapses to a single write.
|
||||
* AiChatService.stream wraps the turn in a safety-net catch that settles the run
|
||||
* to 'error' on any failure BEFORE streamText's terminal callbacks own the
|
||||
* lifecycle — and on the rare path where streamText DID attach (so a callback
|
||||
* also settles) both routes call onSettled. The FIRST to write the terminal row
|
||||
* wins; the second sees `settled` and returns early, so a late settle can never
|
||||
* clobber the real terminal status or double-write the row.
|
||||
* per run. After a successful write the once-gate keys off {@link settled} (the
|
||||
* terminal row already written) so a settle arriving AFTER the entry was already
|
||||
* dropped-and-settled returns early; a settle racing the in-flight write is
|
||||
* stopped earlier still, by the `active.delete` claim. Either way a genuine
|
||||
* double-settle collapses to a single write and a late settle can never clobber
|
||||
* the real terminal status or double-write the row.
|
||||
*/
|
||||
async finalizeRun(
|
||||
runId: string,
|
||||
@@ -290,8 +305,15 @@ export class AiChatRunService implements OnModuleInit {
|
||||
turnStatus: TurnTerminalStatus,
|
||||
error?: string,
|
||||
): Promise<void> {
|
||||
// ---- Atomic once-claim (synchronous; NO await before the gate closes) ----
|
||||
// Already terminally written -> idempotent no-op.
|
||||
if (this.settled.has(runId)) return;
|
||||
// Capture the entry BEFORE the delete so a total-failure path can restore it.
|
||||
const entry = this.active.get(runId);
|
||||
// SYNCHRONOUS check-and-clear: the FIRST caller deletes (claims) the entry;
|
||||
// any concurrent SECOND caller finds nothing to delete and returns HERE, in
|
||||
// the same tick, before any await — so it can never reach the UPDATE.
|
||||
if (!this.active.delete(runId)) return;
|
||||
|
||||
for (
|
||||
let attempt = 1;
|
||||
@@ -304,10 +326,9 @@ export class AiChatRunService implements OnModuleInit {
|
||||
finishedAt: new Date(),
|
||||
error: error ?? null,
|
||||
});
|
||||
// Terminal write landed: arm the once-gate, then (and only then) free the
|
||||
// chat's active slot by dropping the in-memory entry.
|
||||
// Terminal write landed: arm the once-gate. The entry is already gone
|
||||
// (claimed above); we do NOT restore it. The slot is now free.
|
||||
this.settled.add(runId);
|
||||
this.active.delete(runId);
|
||||
return;
|
||||
} catch (err) {
|
||||
this.logger.warn(
|
||||
@@ -320,9 +341,12 @@ export class AiChatRunService implements OnModuleInit {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Every attempt failed: deliberately KEEP the in-memory entry and leave the
|
||||
// run UNsettled so a later finalize/requestStop/sweep can retry — the run is
|
||||
// not stranded.
|
||||
// Every attempt failed: RESTORE the claimed entry (and leave the run
|
||||
// UNsettled) so a retrier can try again — the run is not stranded. Where
|
||||
// streamText attached, its terminal callback / requestStop -> onAbort is that
|
||||
// retrier; on the no-streamText path the retrier is the boot sweep on the
|
||||
// next restart. The restored entry is bounded and cleared on restart.
|
||||
if (entry) this.active.set(runId, entry);
|
||||
}
|
||||
|
||||
/** Small async backoff between terminal-write retries (F6). Isolated so it is
|
||||
|
||||
Reference in New Issue
Block a user