From 9ad3931a1c6a39fa0042512ed1bc6541991a1137 Mon Sep 17 00:00:00 2001
From: claude code agent 227
Date: Mon, 29 Jun 2026 01:34:43 +0300
Subject: [PATCH] fix(ai-chat): make finalizeRun once-gate atomic against concurrent settle (#184 round-3)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The F6 once-gate was non-atomic: `settled.has` was read BEFORE the awaited terminal UPDATE and `settled.add` only after, so two concurrent finalizeRun calls for the same run (the documented safety-net catch vs a streamText terminal callback) both passed the check and both wrote the terminal row — double-write + last-write-wins status clobber, a window the bounded retry only widened.

Restore a SYNCHRONOUS atomic claim before any await: capture the entry, then `active.delete` as a check-and-clear in one tick. The first caller claims and proceeds; a concurrent second caller finds the entry gone and returns at the claim, before any UPDATE. On a successful write we arm `settled` (post-write idempotency gate) and do not restore; on total bounded-retry failure we restore the claimed entry so a retrier can complete it — never both write and restore.

Also fix the F6(b) JSDoc/comment to not overclaim an in-process retrier on the no-streamText path: there the only settler is the safety-net, so recovery on total UPDATE failure is the unconditional boot sweep on the next restart.

Add a concurrency test firing two simultaneous finalizeRun on one run (update held on a pending promise) asserting update is called EXACTLY ONCE; existing F6 retry-rides-transient + retain-on-total-failure tests stay green.

Co-Authored-By: Claude Opus 4.8 (1M context) --- .../core/ai-chat/ai-chat-run.service.spec.ts | 49 +++++++++++++ .../src/core/ai-chat/ai-chat-run.service.ts | 68 +++++++++++++------ 2 files changed, 95 insertions(+), 22 deletions(-) diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts index dd863bbd..bf7c0825 100644 --- a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts @@ -339,6 +339,55 @@ describe('AiChatRunService run lifecycle', () => { ); }); + it('CONCURRENCY: two simultaneous finalizeRun on the same run write the terminal row EXACTLY ONCE (the 2nd caller exits synchronously at the atomic claim)', async () => { + // The CRITICAL race: AiChatService.stream's safety-net catch settles the turn + // to 'error' while a streamText terminal callback also settles it — both call + // finalizeRun for the SAME runId. The once-gate must close ATOMICALLY: a + // `settled.has` check alone is read BEFORE the awaited UPDATE, so both callers + // would pass it and BOTH write the row (last-write-wins clobber + double + // write). The fix claims the run with a SYNCHRONOUS `active.delete` before any + // await, so the second caller returns in the same tick, before the UPDATE. + // + // We force the two calls to overlap by making `update` return a promise we + // resolve only AFTER both finalizeRun calls have run their synchronous bodies. + let resolveUpdate!: (v: unknown) => void; + const updateGate = new Promise((res) => { + resolveUpdate = res; + }); + const update = jest.fn(() => updateGate); + const repo = makeRepo({ update }); + const svc = new AiChatRunService(repo as never, makeEnv() as never); + await svc.beginRun({ + chatId: 'chat-1', + workspaceId: 'ws-1', + userId: 'user-1', + }); + + // Fire both before the (pending) update resolves. 
The first synchronously + // claims the entry (active.delete) and awaits update; the second, started in + // the same macrotask, finds the entry already gone and returns at the claim + // WITHOUT ever calling update. + const p1 = svc.finalizeRun('run-1', 'ws-1', 'completed'); + const p2 = svc.finalizeRun('run-1', 'ws-1', 'error', 'safety-net'); + + // The decisive assertion: exactly one caller reached the terminal UPDATE. + expect(update).toHaveBeenCalledTimes(1); + + // Let the single in-flight update land; both calls resolve cleanly. + resolveUpdate({ id: 'run-1' }); + await Promise.all([p1, p2]); + + expect(update).toHaveBeenCalledTimes(1); + // The winner is the FIRST caller ('completed' -> 'succeeded'); the late + // 'error' settle never wrote, so it could not clobber the real status. + expect(update).toHaveBeenCalledWith( + 'run-1', + 'ws-1', + expect.objectContaining({ status: 'succeeded' }), + ); + expect(svc.isLocallyActive('run-1')).toBe(false); + }); + it('F6: a TRANSIENT terminal-write failure is ridden out by the bounded retry — the run is settled, not stranded', async () => { // The bug: finalizeRun used to DROP the in-memory entry BEFORE the terminal // UPDATE, then only warn-log a failure. A single transient blip (pool diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.ts index 5e7b217c..d0625b53 100644 --- a/apps/server/src/core/ai-chat/ai-chat-run.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat-run.service.ts @@ -263,26 +263,41 @@ export class AiChatRunService implements OnModuleInit { /** * Finalize a run to its terminal status (succeeded / failed / aborted), * stamping finishedAt + any error. Best-effort, but ROBUST against a transient - * terminal-write failure (F6). + * terminal-write failure (F6) AND atomically safe against a concurrent settle. 
* - * ORDER MATTERS (F6): the terminal UPDATE happens FIRST; only once it SUCCEEDS - * do we record the run as settled and drop its in-memory entry. If the UPDATE - * fails on every bounded attempt we KEEP the in-memory entry and do NOT mark it - * settled — so a later settle (a streamText callback, a requestStop -> onAbort, - * a future sweep) can retry the terminal write. A run is therefore never + * ATOMIC ONCE-CLAIM (the gate must close in ONE synchronous tick): two + * finalizeRun calls for the SAME run can race — the documented real path is + * AiChatService.stream's safety-net catch settling the turn to 'error' while a + * streamText terminal callback (onFinish/onAbort/onError) ALSO settles it. The + * `settled.has` check alone is NOT a gate: it is read BEFORE the awaited UPDATE, + * so two callers can both see `false` and both write the row (last-write-wins + * clobbers the real terminal status, and the bounded retry only widens that + * window). The claim therefore happens via `active.delete`, a SYNCHRONOUS + * check-and-clear with NO await between the gate and the entry removal: the + * second concurrent caller finds the entry already gone and returns in the same + * tick, before any UPDATE. The transition "nobody is finalizing" -> "I am + * finalizing" is thus a single atomic step. + * + * ORDER MATTERS (F6): once we own the claim, the terminal UPDATE happens FIRST; + * only once it SUCCEEDS do we record the run as settled. If the UPDATE fails on + * every bounded attempt we RESTORE the in-memory entry and do NOT mark it + * settled — so a later settle can retry the terminal write and the run is never * silently stranded 'running' (which would 409 every future turn in the chat - * until a restart, since phase 1 has no periodic sweep). + * until a restart). 
Who that later retrier is depends on the path: when + * streamText attached, its terminal callback (or a requestStop -> onAbort) is a + * second in-process settler that retries; on the NO-streamText path (the turn + * threw before streamText was wired, so ONLY the safety-net ever settles) there + * is no in-process retrier — recovery there is the UNCONDITIONAL boot sweep on + * the next restart (phase 1 has no periodic in-process sweep). The retained + * entry is bounded (cleared on restart) and harmless meanwhile. * * IDEMPOTENT on SUCCESS (#184 review): the terminal write happens AT MOST ONCE - * per run. The once-gate keys off {@link settled} (the terminal row already - * written), NOT off entry-deletion — so a dropped-then-retried write is still - * allowed, while a genuine double-settle collapses to a single write. - * AiChatService.stream wraps the turn in a safety-net catch that settles the run - * to 'error' on any failure BEFORE streamText's terminal callbacks own the - * lifecycle — and on the rare path where streamText DID attach (so a callback - * also settles) both routes call onSettled. The FIRST to write the terminal row - * wins; the second sees `settled` and returns early, so a late settle can never - * clobber the real terminal status or double-write the row. + * per run. After a successful write the once-gate keys off {@link settled} (the + * terminal row already written) so a settle arriving AFTER the entry was already + * dropped-and-settled returns early; a settle racing the in-flight write is + * stopped earlier still, by the `active.delete` claim. Either way a genuine + * double-settle collapses to a single write and a late settle can never clobber + * the real terminal status or double-write the row. 
*/ async finalizeRun( runId: string, @@ -290,8 +305,15 @@ export class AiChatRunService implements OnModuleInit { turnStatus: TurnTerminalStatus, error?: string, ): Promise { + // ---- Atomic once-claim (synchronous; NO await before the gate closes) ---- // Already terminally written -> idempotent no-op. if (this.settled.has(runId)) return; + // Capture the entry BEFORE the delete so a total-failure path can restore it. + const entry = this.active.get(runId); + // SYNCHRONOUS check-and-clear: the FIRST caller deletes (claims) the entry; + // any concurrent SECOND caller finds nothing to delete and returns HERE, in + // the same tick, before any await — so it can never reach the UPDATE. + if (!this.active.delete(runId)) return; for ( let attempt = 1; @@ -304,10 +326,9 @@ export class AiChatRunService implements OnModuleInit { finishedAt: new Date(), error: error ?? null, }); - // Terminal write landed: arm the once-gate, then (and only then) free the - // chat's active slot by dropping the in-memory entry. + // Terminal write landed: arm the once-gate. The entry is already gone + // (claimed above); we do NOT restore it. The slot is now free. this.settled.add(runId); - this.active.delete(runId); return; } catch (err) { this.logger.warn( @@ -320,9 +341,12 @@ export class AiChatRunService implements OnModuleInit { } } } - // Every attempt failed: deliberately KEEP the in-memory entry and leave the - // run UNsettled so a later finalize/requestStop/sweep can retry — the run is - // not stranded. + // Every attempt failed: RESTORE the claimed entry (and leave the run + // UNsettled) so a retrier can try again — the run is not stranded. Where + // streamText attached, its terminal callback / requestStop -> onAbort is that + // retrier; on the no-streamText path the retrier is the boot sweep on the + // next restart. The restored entry is bounded and cleared on restart. 
+ if (entry) this.active.set(runId, entry); } /** Small async backoff between terminal-write retries (F6). Isolated so it is