fix(ai-chat): guard step-update vs finalize race with WHERE status='streaming' (#183 review)
Review caught a real race: onStepFinish fires `updateStreaming()` fire-and-forget (not awaited), so the FINAL step's streaming UPDATE and the terminal `finalizeAssistant` UPDATE run as two concurrent statements on different pool connections — commit order is not guaranteed. If the late streaming update lands AFTER finalize, the completed row is clobbered back to status='streaming' with no usage/finishReason, and the next startup sweep then mis-marks the finished turn 'aborted'. Green unit/integration tests don't reproduce a cross-connection race. Fix: scope the per-step update with `onlyIfStreaming` → SQL `WHERE status='streaming'`. Once finalize has set a terminal status, the late update matches zero rows and no-ops, regardless of commit order; finalize runs unguarded so it always wins. A cheap `if (finalized) return` short-circuit avoids most wasted queries, but the SQL guard is the authoritative fix (the flag can be set after a query is already in flight). Integration test: finalize to 'completed', then a late `onlyIfStreaming` update is a no-op — status/content/usage preserved. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -420,11 +420,18 @@ export class AiChatService implements OnModuleInit {
|
|||||||
// throws into the stream. Keeps status 'streaming'.
|
// throws into the stream. Keeps status 'streaming'.
|
||||||
const updateStreaming = async (): Promise<void> => {
|
const updateStreaming = async (): Promise<void> => {
|
||||||
if (!assistantId) return;
|
if (!assistantId) return;
|
||||||
|
// Cheap short-circuit once the turn is finalized (see `finalized` below).
|
||||||
|
// The AUTHORITATIVE guard is `onlyIfStreaming` on the UPDATE: a late
|
||||||
|
// fire-and-forget step update could still be in flight on another pool
|
||||||
|
// connection when finalize runs, so the SQL `WHERE status='streaming'`
|
||||||
|
// (not this flag) is what prevents it clobbering the terminal row.
|
||||||
|
if (finalized) return;
|
||||||
try {
|
try {
|
||||||
await this.aiChatMessageRepo.update(
|
await this.aiChatMessageRepo.update(
|
||||||
assistantId,
|
assistantId,
|
||||||
workspace.id,
|
workspace.id,
|
||||||
flushAssistant(capturedSteps, '', 'streaming'),
|
flushAssistant(capturedSteps, '', 'streaming'),
|
||||||
|
{ onlyIfStreaming: true },
|
||||||
);
|
);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
this.logger.warn(
|
this.logger.warn(
|
||||||
|
|||||||
@@ -134,16 +134,26 @@ export class AiChatMessageRepo {
|
|||||||
metadata: unknown;
|
metadata: unknown;
|
||||||
status: string | null;
|
status: string | null;
|
||||||
}>,
|
}>,
|
||||||
trx?: KyselyTransaction,
|
opts?: { onlyIfStreaming?: boolean; trx?: KyselyTransaction },
|
||||||
): Promise<AiChatMessage | undefined> {
|
): Promise<AiChatMessage | undefined> {
|
||||||
const db = dbOrTx(this.db, trx);
|
const db = dbOrTx(this.db, opts?.trx);
|
||||||
return db
|
let query = db
|
||||||
.updateTable('aiChatMessages')
|
.updateTable('aiChatMessages')
|
||||||
.set({ ...(patch as Record<string, unknown>), updatedAt: new Date() })
|
.set({ ...(patch as Record<string, unknown>), updatedAt: new Date() })
|
||||||
.where('id', '=', id)
|
.where('id', '=', id)
|
||||||
.where('workspaceId', '=', workspaceId)
|
.where('workspaceId', '=', workspaceId);
|
||||||
.returning(this.baseFields)
|
// Concurrency guard (#183 review): a per-step 'streaming' update must NEVER
|
||||||
.executeTakeFirst();
|
// overwrite a row the terminal callback already finalized. onStepFinish
|
||||||
|
// fires the streaming update fire-and-forget, so its UPDATE can land AFTER
|
||||||
|
// finalize on a DIFFERENT pool connection (commit order is not guaranteed).
|
||||||
|
// Scoping the streaming update to rows STILL in 'streaming' makes a late
|
||||||
|
// update a no-op once the row is completed/error/aborted — regardless of
|
||||||
|
// commit order. The terminal finalize runs WITHOUT this guard so it always
|
||||||
|
// wins.
|
||||||
|
if (opts?.onlyIfStreaming) {
|
||||||
|
query = query.where('status', '=', 'streaming');
|
||||||
|
}
|
||||||
|
return query.returning(this.baseFields).executeTakeFirst();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -73,6 +73,40 @@ describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('onlyIfStreaming update is a NO-OP once the row is finalized (race guard)', async () => {
|
||||||
|
// Reproduce the step-update-vs-finalize race (#183 review): the row is
|
||||||
|
// finalized to 'completed', then a LATE per-step 'streaming' update lands.
|
||||||
|
// With `onlyIfStreaming` it must match nothing and leave the finalized row
|
||||||
|
// untouched (no clobber back to 'streaming', no lost usage).
|
||||||
|
const seeded = await repo.insert({
|
||||||
|
chatId,
|
||||||
|
workspaceId,
|
||||||
|
userId,
|
||||||
|
role: 'assistant',
|
||||||
|
content: 'partial',
|
||||||
|
status: 'streaming',
|
||||||
|
});
|
||||||
|
// Terminal finalize (unguarded) wins.
|
||||||
|
await repo.update(seeded.id, workspaceId, {
|
||||||
|
content: 'final answer',
|
||||||
|
status: 'completed',
|
||||||
|
metadata: { usage: { totalTokens: 42 } } as never,
|
||||||
|
});
|
||||||
|
// A straggler per-step update arrives AFTER finalize.
|
||||||
|
const late = await repo.update(
|
||||||
|
seeded.id,
|
||||||
|
workspaceId,
|
||||||
|
{ content: 'partial', status: 'streaming', metadata: {} as never },
|
||||||
|
{ onlyIfStreaming: true },
|
||||||
|
);
|
||||||
|
expect(late).toBeUndefined(); // matched no 'streaming' row -> no-op
|
||||||
|
const rows = await repo.findAllByChat(chatId, workspaceId);
|
||||||
|
const row = rows.find((r) => r.id === seeded.id)!;
|
||||||
|
expect(row.status).toBe('completed'); // NOT clobbered back to streaming
|
||||||
|
expect(row.content).toBe('final answer');
|
||||||
|
expect((row.metadata as any).usage.totalTokens).toBe(42); // usage preserved
|
||||||
|
});
|
||||||
|
|
||||||
it('update is workspace-scoped: a foreign workspace id matches nothing', async () => {
|
it('update is workspace-scoped: a foreign workspace id matches nothing', async () => {
|
||||||
const seeded = await repo.insert({
|
const seeded = await repo.insert({
|
||||||
chatId,
|
chatId,
|
||||||
|
|||||||
Reference in New Issue
Block a user