From c0844d5431e186811a18b6008c7fd052e50b9746 Mon Sep 17 00:00:00 2001 From: claude code agent 227 Date: Sun, 28 Jun 2026 23:52:32 +0300 Subject: [PATCH] fix(ai-chat): unconditional boot sweep + single-instance guard for autonomous runs (#184) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit F1 (DECISION C): make the crash-recovery boot sweep UNCONDITIONAL. A fast restart (deploy/OOM within the old 10-min window of the last step) left a run stuck `running` forever, and the one-active-run gate then 409'd every future turn in that chat. On a fresh single-process boot any pending|running run is definitionally hung, so onModuleInit now settles ALL of them to `aborted` with no staleness window. AiChatRunRepo.sweepRunning takes an optional { staleMs } window, kept ONLY for the future phase-2 multi-instance timer sweep (the boot path passes no window). Repo + service tests assert a fresh `running` run (updatedAt = now) is settled, not skipped. F2 (DECISION A): treat phase-1 autonomousRuns as SINGLE-INSTANCE-ONLY. Stop and its AbortController are process-local, so cross-instance Stop is unreliable (phase 2). AiChatRunService now logs a startup WARNING when a horizontally-scaled deployment is detected — via EnvironmentService.isCloud() (CLOUD=true), the only horizontal-scaling signal this codebase has (the socket.io Redis adapter is always wired since REDIS_URL is mandatory, so it is not a discriminator). The constraint is documented in AGENTS.md. Co-Authored-By: Claude Opus 4.8 (1M context) --- AGENTS.md | 1 + .../core/ai-chat/ai-chat-run.service.spec.ts | 79 +++++++++++++---- .../src/core/ai-chat/ai-chat-run.service.ts | 53 ++++++++++-- .../ai-chat/ai-chat.service.lifecycle.spec.ts | 2 +- .../repos/ai-chat/ai-chat-run.repo.spec.ts | 84 +++++++++++++++++++ .../repos/ai-chat/ai-chat-run.repo.ts | 54 ++++++++---- 6 files changed, 236 insertions(+), 37 deletions(-) create mode 100644 apps/server/src/database/repos/ai-chat/ai-chat-run.repo.spec.ts diff --git a/AGENTS.md b/AGENTS.md index e8eed03d..e517ee6a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -259,6 +259,7 @@ The API server is a Fastify app with a global `/api` prefix (`main.ts` excludes - `core/ai-chat/tools/` — the agent's ~40 read+write tools. Every tool runs under the **calling user's** CASL permissions via a per-user loopback access token (`docmost-client.loader.ts`), so the agent can never exceed what the user could do. Only **reversible** operations are exposed (page history + trash; no permanent delete). Agent edits get an "AI agent" provenance badge in page history (`20260616T130000-agent-provenance` migration). - `core/ai-chat/embedding/` — RAG indexer + a BullMQ consumer on `AI_QUEUE` that embeds pages into `page_embeddings` (vector search), complementing Postgres full-text search. Pages are (re)indexed on edit; `AI_EMBEDDING_TIMEOUT_MS` bounds a hung embeddings endpoint. - `core/ai-chat/external-mcp/` — admins can attach external MCP servers (e.g. Tavily) to give the agent web access. **`ssrf-guard.ts` validates outbound MCP URLs against SSRF** — keep that guard in the path when touching external-MCP connection logic. + - `core/ai-chat/ai-chat-run.service.ts` + `ai_chat_runs` — **detached/autonomous agent runs** (`#184`), behind the per-workspace `settings.ai.autonomousRuns` flag (off by default). When on, a turn becomes a server-side RUN that survives a browser disconnect; only an explicit `POST /ai-chat/stop` ends it, and a client reconnects/live-follows via `POST /ai-chat/run`. **DEPLOY CONSTRAINT — single-instance only in phase 1:** Stop and the AbortController that backs it are process-local, so a Stop only aborts a run executing on the **same** replica that owns it (cross-instance pub/sub stop is phase 2). Do **not** enable `autonomousRuns` on a horizontally-scaled deployment (multiple replicas behind a load balancer, or Docmost cloud `CLOUD=true`) — run a single instance instead. The server logs a startup WARNING when it detects a multi-instance deployment (`CLOUD=true`) so the constraint is visible. The startup sweep settles any run left dangling by a restart. ### Client structure Vite SPA. Code is organized by feature under `apps/client/src/features/*` (mirrors the server domains: `page`, `space`, `comment`, `ai-chat`, `editor`, …). Conventions: diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts index d7c8ce97..e2d40eab 100644 --- a/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat-run.service.spec.ts @@ -29,6 +29,11 @@ function uniqueViolation(constraintName: string): Error & { * abort-signal level (the signal the agent loop actually consumes). */ +/** Minimal EnvironmentService stub. Single-instance (CLOUD unset) by default. */ +function makeEnv(isCloud = false) { + return { isCloud: () => isCloud }; +} + function makeRepo(overrides: Record = {}) { return { insert: jest.fn(async (v: any) => ({ @@ -63,7 +68,7 @@ describe('AiChatRunService.onModuleInit (startup sweep)', () => { const logSpy = jest .spyOn(Logger.prototype, 'log') .mockImplementation(() => undefined); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); await expect(svc.onModuleInit()).resolves.toBeUndefined(); expect(repo.sweepRunning).toHaveBeenCalledTimes(1); expect(logSpy).toHaveBeenCalledTimes(1); @@ -79,17 +84,61 @@ describe('AiChatRunService.onModuleInit (startup sweep)', () => { const warnSpy = jest .spyOn(Logger.prototype, 'warn') .mockImplementation(() => undefined); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); await expect(svc.onModuleInit()).resolves.toBeUndefined(); - expect(warnSpy).toHaveBeenCalledTimes(1); + // The first warn is the sweep failure (the multi-instance warn never fires + // single-instance), so the message is the db error. expect(String(warnSpy.mock.calls[0][0])).toContain('db down'); }); + + it('F1 (DECISION C): the boot sweep is UNCONDITIONAL — sweepRunning is called with NO staleness window, so a fresh running run (updatedAt = now) is settled, not skipped', async () => { + // The bug: a fast restart (deploy/OOM within minutes of the last step) left a + // run stuck 'running' under the old 10-min window, 409ing every later turn in + // the chat. The fix settles ALL pending|running on boot. We assert the service + // invokes sweepRunning with no `staleMs` (the unconditional path); the repo's + // own spec proves no-window => no updatedAt filter. + const repo = makeRepo({ sweepRunning: jest.fn(async () => 1) }); + jest.spyOn(Logger.prototype, 'log').mockImplementation(() => undefined); + const svc = new AiChatRunService(repo as never, makeEnv() as never); + await svc.onModuleInit(); + expect(repo.sweepRunning).toHaveBeenCalledTimes(1); + const callArgs = repo.sweepRunning.mock.calls[0] as unknown[]; + const firstArg = callArgs[0] as { staleMs?: number } | undefined; + // Either no opts at all, or opts without a staleMs window => unconditional. + expect(firstArg?.staleMs).toBeUndefined(); + }); + + it('F2 (DECISION A): warns at startup that autonomousRuns is single-instance-only when a horizontally-scaled deployment (CLOUD) is detected', async () => { + const repo = makeRepo(); + const warnSpy = jest + .spyOn(Logger.prototype, 'warn') + .mockImplementation(() => undefined); + const svc = new AiChatRunService(repo as never, makeEnv(true) as never); + await svc.onModuleInit(); + const warned = warnSpy.mock.calls.some((c) => + /single-instance-only/i.test(String(c[0])), + ); + expect(warned).toBe(true); + }); + + it('F2: does NOT warn about multi-instance on a single-instance (CLOUD unset) deployment', async () => { + const repo = makeRepo(); + const warnSpy = jest + .spyOn(Logger.prototype, 'warn') + .mockImplementation(() => undefined); + const svc = new AiChatRunService(repo as never, makeEnv(false) as never); + await svc.onModuleInit(); + const warned = warnSpy.mock.calls.some((c) => + /single-instance-only/i.test(String(c[0])), + ); + expect(warned).toBe(false); + }); }); describe('AiChatRunService run lifecycle', () => { it('beginRun inserts a running row and registers a live abort controller', async () => { const repo = makeRepo(); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); const handle = await svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', @@ -119,7 +168,7 @@ describe('AiChatRunService run lifecycle', () => { throw uniqueViolation(ONE_ACTIVE_RUN_PER_CHAT_INDEX); }), }); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); await expect( svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }), ).rejects.toBeInstanceOf(RunAlreadyActiveError); @@ -136,7 +185,7 @@ describe('AiChatRunService run lifecycle', () => { throw other; }), }); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); await expect( svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }), ).rejects.toBe(other); @@ -149,7 +198,7 @@ describe('AiChatRunService run lifecycle', () => { throw boom; }), }); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); await expect( svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }), ).rejects.toBe(boom); @@ -166,7 +215,7 @@ describe('AiChatRunService run lifecycle', () => { return { id: 'run-win', status: v.status, chatId: v.chatId }; }), }); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); const results = await Promise.allSettled([ svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }), svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }), @@ -184,7 +233,7 @@ describe('AiChatRunService run lifecycle', () => { it('a SUBSCRIBER detaching does NOT abort the run (only an explicit stop does)', async () => { const repo = makeRepo(); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); const handle = await svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', @@ -201,7 +250,7 @@ describe('AiChatRunService run lifecycle', () => { it('requestStop aborts the live controller, marks the row, and reports true', async () => { const repo = makeRepo(); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); const handle = await svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', @@ -224,7 +273,7 @@ describe('AiChatRunService run lifecycle', () => { const repo = makeRepo({ markStopRequested: jest.fn(async () => ({ id: 'run-9' })), }); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); const result = await svc.requestStop('run-9', 'ws-1'); expect(result).toBe(true); expect(svc.isLocallyActive('run-9')).toBe(false); @@ -234,14 +283,14 @@ describe('AiChatRunService run lifecycle', () => { const repo = makeRepo({ markStopRequested: jest.fn(async () => undefined), }); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); const result = await svc.requestStop('run-done', 'ws-1'); expect(result).toBe(false); }); it('finalizeRun settles the row to the mapped status with finishedAt and drops the in-memory entry', async () => { const repo = makeRepo(); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); await svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', @@ -270,7 +319,7 @@ describe('AiChatRunService run lifecycle', () => { // terminal row; the second must no-op so a late settle can never clobber the // real terminal status or double-write the row. const repo = makeRepo(); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); await svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', @@ -297,7 +346,7 @@ describe('AiChatRunService run lifecycle', () => { }), }); jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => undefined); - const svc = new AiChatRunService(repo as never); + const svc = new AiChatRunService(repo as never, makeEnv() as never); await expect(svc.recordStep('run-1', 'ws-1', 3)).resolves.toBeUndefined(); await expect( svc.linkAssistantMessage('run-1', 'ws-1', 'msg-1'), diff --git a/apps/server/src/core/ai-chat/ai-chat-run.service.ts b/apps/server/src/core/ai-chat/ai-chat-run.service.ts index c9456420..16f599c2 100644 --- a/apps/server/src/core/ai-chat/ai-chat-run.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat-run.service.ts @@ -2,6 +2,7 @@ import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { AiChatRunRepo } from '@docmost/db/repos/ai-chat/ai-chat-run.repo'; import { AiChatRun } from '@docmost/db/types/entity.types'; import { isUniqueViolation, violatedConstraint } from '@docmost/db/utils'; +import { EnvironmentService } from '../../integrations/environment/environment.service'; /** Name of the partial unique index enforcing "one active run per chat" (see the * ai_chat_runs migration). A 23505 on THIS constraint is the race-safe signal @@ -90,16 +91,28 @@ export class AiChatRunService implements OnModuleInit { // restart) still records `stop_requested_at` on the row. private readonly active = new Map(); - constructor(private readonly runRepo: AiChatRunRepo) {} + constructor( + private readonly runRepo: AiChatRunRepo, + private readonly environment: EnvironmentService, + ) {} /** - * Crash-recovery sweep on server start: any run left pending/running that has - * been untouched past the staleness window is the relic of a process that died - * mid-turn; flip it to 'aborted'. Best-effort — a sweep failure is logged but - * MUST NOT block startup (mirrors AiChatService.onModuleInit for #183). + * Crash-recovery sweep on server start: settle EVERY run still left + * pending/running to 'aborted' (F1 / DECISION C). The boot sweep is + * UNCONDITIONAL — no staleness window — because phase 1 is single-process: on a + * fresh boot any pending|running run is definitionally hung (no live runner owns + * it), so even a fast restart (deploy/OOM within minutes of the last step) can + * no longer leave a run stuck 'running' forever (which would make the + * one-active-run gate 409 every future turn in that chat). The staleness window + * is reintroduced only for the phase-2 multi-instance timer sweep, where a + * booting replica must not abort a run another replica is actively executing. + * Best-effort — a sweep failure is logged but MUST NOT block startup (mirrors + * AiChatService.onModuleInit for #183). */ async onModuleInit(): Promise { + this.warnIfMultiInstance(); try { + // No `staleMs`: unconditional boot sweep (F1). See AiChatRunRepo.sweepRunning. const swept = await this.runRepo.sweepRunning(); if (swept > 0) { this.logger.log( @@ -115,6 +128,36 @@ export class AiChatRunService implements OnModuleInit { } } + /** + * F2 (DECISION A): autonomous runs are SINGLE-INSTANCE-ONLY in phase 1. An + * explicit Stop, and the in-memory AbortController that backs it, are + * process-local: a Stop only aborts the live turn if it lands on the SAME + * replica that owns the run (it still stamps `stop_requested_at` cross-instance, + * but nothing reads that flag during an active run yet). Cross-instance pub/sub + * stop is phase 2. So if the deployment is horizontally scaled, warn loudly at + * startup that a Stop may not reach a run executing on another replica. + * + * DETECTION: this codebase always wires the socket.io Redis adapter (REDIS_URL + * is mandatory), so the adapter alone is NOT a horizontal-scaling signal. The + * authoritative signal the codebase has is `CLOUD=true` (EnvironmentService + * .isCloud()), the Docmost-cloud multi-replica deployment. We warn whenever that + * is set, because any workspace could enable settings.ai.autonomousRuns. A + * self-hosted operator running multiple replicas behind a load balancer is also + * multi-instance; the deploy docs (.env.example / AGENTS.md) spell out the + * single-instance constraint for that case. + */ + private warnIfMultiInstance(): void { + if (this.environment.isCloud()) { + this.logger.warn( + 'Autonomous agent runs (settings.ai.autonomousRuns) are SINGLE-INSTANCE-ONLY ' + + 'in phase 1: a horizontally-scaled deployment was detected (CLOUD=true). ' + + 'An explicit Stop only aborts a run executing on the same replica that owns ' + + 'it (cross-instance Stop is not yet reliable — phase 2). Run a single ' + + 'instance if you enable autonomousRuns, or keep the flag off.', + ); + } + } + /** * Start a run for a turn: insert the run row (status 'running', startedAt now), * register a fresh AbortController for it, and return a {@link RunHandle} whose diff --git a/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts b/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts index 602773d1..8e46fd73 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.lifecycle.spec.ts @@ -88,7 +88,7 @@ describe('AiChatService.stream run-lifecycle safety net (#184)', () => { insert: jest.fn().mockResolvedValue({ id: 'run-1', status: 'running' }), update: jest.fn().mockResolvedValue({ id: 'run-1' }), }; - const runService = new AiChatRunService(runRepo as never); + const runService = new AiChatRunService(runRepo as never, { isCloud: () => false } as never); // The user-message insert (the first bare await after beginRun) throws. const aiChatMessageRepo = { diff --git a/apps/server/src/database/repos/ai-chat/ai-chat-run.repo.spec.ts b/apps/server/src/database/repos/ai-chat/ai-chat-run.repo.spec.ts new file mode 100644 index 00000000..1b5407de --- /dev/null +++ b/apps/server/src/database/repos/ai-chat/ai-chat-run.repo.spec.ts @@ -0,0 +1,84 @@ +import { AiChatRunRepo, SWEEP_RUN_STALE_MS } from './ai-chat-run.repo'; +import type { KyselyDB } from '../../types/kysely.types'; + +/** + * Unit coverage for AiChatRunRepo.sweepRunning over a chainable builder mock (no + * live DB). The F1 invariant under test (DECISION C): the BOOT sweep is + * UNCONDITIONAL — it adds NO `updatedAt <` predicate, so a fresh 'running' run + * (updatedAt = now) IS settled rather than skipped by a staleness window. The + * window is added ONLY when an explicit `staleMs` is supplied (the future phase-2 + * multi-instance timer sweep). We assert the EXACT predicates the spec mandates. + */ +describe('AiChatRunRepo.sweepRunning', () => { + type Recorded = { + table?: string; + set?: Record; + wheres: Array<[string, string, unknown]>; + returning?: string; + }; + + function makeDb(swept: Array<{ id: string }>): { + db: KyselyDB; + rec: Recorded; + } { + const rec: Recorded = { wheres: [] }; + const builder: Record = {}; + const chain = () => builder; + builder.set = (v: Record) => { + rec.set = v; + return builder; + }; + builder.where = (col: string, op: string, val: unknown) => { + rec.wheres.push([col, op, val]); + return builder; + }; + builder.returning = (col: string) => { + rec.returning = col; + return builder; + }; + builder.execute = () => Promise.resolve(swept); + void chain; + const db = { + updateTable: (table: string) => { + rec.table = table; + return builder; + }, + } as unknown as KyselyDB; + return { db, rec }; + } + + it('F1: the boot sweep (no staleMs) is UNCONDITIONAL — only a status filter, NO updatedAt window', async () => { + const { db, rec } = makeDb([{ id: 'r1' }, { id: 'r2' }]); + const repo = new AiChatRunRepo(db); + + const swept = await repo.sweepRunning(); + + expect(swept).toBe(2); + expect(rec.table).toBe('aiChatRuns'); + // The status filter is always present... + expect(rec.wheres).toContainEqual([ + 'status', + 'in', + expect.arrayContaining(['pending', 'running']), + ]); + // ...but a fresh 'running' run (updatedAt = now) must NOT be skipped: no + // updatedAt predicate at all on the boot path. + expect(rec.wheres.some(([col]) => col === 'updatedAt')).toBe(false); + // It flips to 'aborted' and stamps finishedAt. + expect(rec.set).toEqual( + expect.objectContaining({ status: 'aborted', finishedAt: expect.any(Date) }), + ); + }); + + it('phase-2 path: an explicit staleMs reintroduces the updatedAt window', async () => { + const { db, rec } = makeDb([]); + const repo = new AiChatRunRepo(db); + + await repo.sweepRunning({ staleMs: SWEEP_RUN_STALE_MS }); + + const updatedAtWhere = rec.wheres.find(([col]) => col === 'updatedAt'); + expect(updatedAtWhere).toBeDefined(); + expect(updatedAtWhere![1]).toBe('<'); + expect(updatedAtWhere![2]).toBeInstanceOf(Date); + }); +}); diff --git a/apps/server/src/database/repos/ai-chat/ai-chat-run.repo.ts b/apps/server/src/database/repos/ai-chat/ai-chat-run.repo.ts index ba3e2179..7bb6bcdb 100644 --- a/apps/server/src/database/repos/ai-chat/ai-chat-run.repo.ts +++ b/apps/server/src/database/repos/ai-chat/ai-chat-run.repo.ts @@ -14,11 +14,14 @@ import { export const ACTIVE_RUN_STATUSES = ['pending', 'running'] as const; // Crash-recovery sweep recency threshold (mirrors AiChatMessageRepo.sweepStreaming, -// #183): a 'running'/'pending' run is only swept to 'aborted' once it has been -// UNTOUCHED for this long, so a fresh replica's boot-sweep can never abort a run -// another replica is actively executing. The runner bumps `updatedAt` on every -// step, so a live run never matches. -const SWEEP_RUN_STALE_MS = 10 * 60 * 1000; // 10 minutes +// #183): when a staleness window is supplied, a 'running'/'pending' run is only +// swept to 'aborted' once it has been UNTOUCHED for this long, so a sibling +// replica's boot-sweep can never abort a run another replica is actively +// executing. The runner bumps `updatedAt` on every step, so a live run never +// matches. PHASE 1 is single-process and the boot sweep passes NO window (every +// dangling run is settled unconditionally — see sweepRunning / F1). This constant +// is the window to reintroduce for the phase-2 multi-instance timer sweep. +export const SWEEP_RUN_STALE_MS = 10 * 60 * 1000; // 10 minutes /** * Repository for `ai_chat_runs` (#184 phase 1): the agent run as a first-class, @@ -164,16 +167,30 @@ export class AiChatRunRepo { /** * Crash-recovery sweep (mirrors AiChatMessageRepo.sweepStreaming): flip every - * run still left pending/running but UNTOUCHED for SWEEP_RUN_STALE_MS — a run - * whose process died before reaching a terminal status — to 'aborted', stamping - * `finished_at`. Run once on server start. Returns the number swept. - * Workspace-wide on purpose (a crash can dangle runs in any workspace). + * run still left pending/running — a run whose process died before reaching a + * terminal status — to 'aborted', stamping `finished_at`. Returns the number + * swept. Workspace-wide on purpose (a crash can dangle runs in any workspace). + * + * F1 (DECISION C): the BOOT sweep is UNCONDITIONAL — it passes no `staleMs`, so + * EVERY dangling run is settled regardless of how recently it was touched. On a + * fresh single-process boot any pending|running run is definitionally hung (no + * runner is alive to own it), so a fast restart (deploy/OOM within minutes of + * the last step) no longer leaves a run stuck 'running' forever — which would + * make the one-active-run gate 409 every future turn in that chat. + * + * The optional `staleMs` window is reintroduced ONLY for the future phase-2 + * multi-instance timer sweep (see {@link SWEEP_RUN_STALE_MS}): there a booting + * replica must NOT abort a run another replica is actively executing, so it + * sweeps only runs UNTOUCHED past the window. Phase 1 is single-process, so the + * boot path supplies no window. */ - async sweepRunning(trx?: KyselyTransaction): Promise { + async sweepRunning( + opts: { staleMs?: number } = {}, + trx?: KyselyTransaction, + ): Promise { const db = dbOrTx(this.db, trx); const now = new Date(); - const staleBefore = new Date(now.getTime() - SWEEP_RUN_STALE_MS); - const rows = await db + let query = db .updateTable('aiChatRuns') .set({ status: 'aborted', @@ -181,10 +198,15 @@ export class AiChatRunRepo { updatedAt: now, error: sql`coalesce(error, ${'Run interrupted by a server restart.'})`, }) - .where('status', 'in', ACTIVE_RUN_STATUSES as unknown as string[]) - .where('updatedAt', '<', staleBefore) - .returning('id') - .execute(); + .where('status', 'in', ACTIVE_RUN_STATUSES as unknown as string[]); + // Multi-instance (phase 2) only: skip runs touched within the window so a + // sibling replica's live run is never aborted. Omitted on the phase-1 boot + // sweep -> unconditional. + if (typeof opts.staleMs === 'number') { + const staleBefore = new Date(now.getTime() - opts.staleMs); + query = query.where('updatedAt', '<', staleBefore); + } + const rows = await query.returning('id').execute(); return rows.length; } }