diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts index bf7c0825..c95a4d7e 100644 --- a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts @@ -435,6 +435,9 @@ describe('AiChatRunService run lifecycle', () => { }), }); jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => undefined); + const errorSpy = jest + .spyOn(Logger.prototype, 'error') + .mockImplementation(() => undefined); const svc = new AiChatRunService(repo as never, makeEnv() as never); await svc.beginRun({ chatId: 'chat-1', @@ -445,6 +448,16 @@ describe('AiChatRunService run lifecycle', () => { // First settle: every bounded attempt fails -> entry retained, NOT settled. await svc.finalizeRun('run-1', 'ws-1', 'completed'); expect(svc.isLocallyActive('run-1')).toBe(true); + // F12: the give-up emits ONE explicit, greppable ERROR (run + chat context) + // so an operator can tell "gave up, run held in memory" from a per-attempt + // blip — distinct from the per-attempt warns. + const gaveUp = errorSpy.mock.calls.some( + (c) => + /NON-TERMINAL/.test(String(c[0])) && + /run-1/.test(String(c[0])) && + /chat-1/.test(String(c[0])), + ); + expect(gaveUp).toBe(true); // The DB recovers; a later settle now succeeds and frees the slot. healthy = true; diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.ts index d0625b53..9ec61df6 100644 --- a/apps/server/src/core/ai-chat/ai-chat-run.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat-run.service.ts @@ -280,16 +280,20 @@ export class AiChatRunService implements OnModuleInit { * * ORDER MATTERS (F6): once we own the claim, the terminal UPDATE happens FIRST; * only once it SUCCEEDS do we record the run as settled. If the UPDATE fails on - * every bounded attempt we RESTORE the in-memory entry and do NOT mark it - * settled — so a later settle can retry the terminal write and the run is never - * silently stranded 'running' (which would 409 every future turn in the chat - * until a restart). Who that later retrier is depends on the path: when - * streamText attached, its terminal callback (or a requestStop -> onAbort) is a - * second in-process settler that retries; on the NO-streamText path (the turn - * threw before streamText was wired, so ONLY the safety-net ever settles) there - * is no in-process retrier — recovery there is the UNCONDITIONAL boot sweep on - * the next restart (phase 1 has no periodic in-process sweep). The retained - * entry is bounded (cleared on restart) and harmless meanwhile. + * every bounded attempt we RESTORE the in-memory entry, leave the run UNsettled, + * and emit an ERROR signal that the row is left non-terminal 'running' (which + * would 409 every future turn in the chat until recovery). An in-process retry + * by a LATER settle is only POSSIBLE, never guaranteed: it needs (a) the entry + * to have been restored at the give-up path AND (b) a fresh settler to arrive + * AFTER that restore. A concurrent settler that arrives DURING the retry window + * — while the entry is deleted for backoff and not yet restored — is consumed at + * the synchronous `active.delete` claim (it finds nothing to delete and returns + * a no-op), so it does NOT become an in-process retrier. The NO-streamText path + * (the turn threw before streamText was wired, so ONLY the safety-net ever + * settles) likewise has no second in-process settler at all. The UNCONDITIONAL + * backstop in every case is the boot sweep on the next restart (phase 1 has no + * periodic in-process sweep); the retained entry is bounded (cleared on restart) + * and harmless meanwhile. * * IDEMPOTENT on SUCCESS (#184 review): the terminal write happens AT MOST ONCE * per run. After a successful write the once-gate keys off {@link settled} (the @@ -315,6 +319,7 @@ export class AiChatRunService implements OnModuleInit { // the same tick, before any await — so it can never reach the UPDATE. if (!this.active.delete(runId)) return; + let lastError: unknown; for ( let attempt = 1; attempt <= AiChatRunService.FINALIZE_MAX_ATTEMPTS; @@ -331,6 +336,7 @@ export class AiChatRunService implements OnModuleInit { this.settled.add(runId); return; } catch (err) { + lastError = err; this.logger.warn( `Failed to finalize run ${runId} (attempt ${attempt}/${ AiChatRunService.FINALIZE_MAX_ATTEMPTS @@ -341,11 +347,25 @@ export class AiChatRunService implements OnModuleInit { } } } - // Every attempt failed: RESTORE the claimed entry (and leave the run - // UNsettled) so a retrier can try again — the run is not stranded. Where - // streamText attached, its terminal callback / requestStop -> onAbort is that - // retrier; on the no-streamText path the retrier is the boot sweep on the - // next restart. The restored entry is bounded and cleared on restart. + // Every attempt failed: this is a give-up, materially worse than a per-attempt + // blip — the row is left NON-TERMINAL ('running'), so emit ONE explicit, + // greppable ERROR so an operator can tell "survived a blip" from "gave up, run + // held in memory until recovery" (the last warn alone says only "attempt 3/3"). + this.logger.error( + `Run ${runId} (chat ${entry?.chatId ?? 'unknown'}) left NON-TERMINAL ` + + `('running'): terminal write failed after ${ + AiChatRunService.FINALIZE_MAX_ATTEMPTS + } attempts; entry retained in memory, recovery deferred to next settle / ` + + `boot sweep`, + lastError, + ); + // RESTORE the claimed entry (and leave the run UNsettled) so a LATER settle + // that arrives AFTER this restore MAY retry the terminal write — but that + // in-process retry is NOT guaranteed (a concurrent settler caught in the retry + // window above is consumed at the `active.delete` claim, and the no-streamText + // path has no second settler at all). The UNCONDITIONAL backstop in every case + // is the boot sweep on the next restart; the restored entry is bounded and + // cleared on restart. if (entry) this.active.set(runId, entry); }