diff --git a/apps/server/src/core/ai-chat/ai-chat.service.ts b/apps/server/src/core/ai-chat/ai-chat.service.ts index f35cde1a..15877a52 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.ts @@ -420,11 +420,18 @@ export class AiChatService implements OnModuleInit { // throws into the stream. Keeps status 'streaming'. const updateStreaming = async (): Promise => { if (!assistantId) return; + // Cheap short-circuit once the turn is finalized (see `finalized` below). + // The AUTHORITATIVE guard is `onlyIfStreaming` on the UPDATE: a late + // fire-and-forget step update could still be in flight on another pool + // connection when finalize runs, so the SQL `WHERE status='streaming'` + // (not this flag) is what prevents it clobbering the terminal row. + if (finalized) return; try { await this.aiChatMessageRepo.update( assistantId, workspace.id, flushAssistant(capturedSteps, '', 'streaming'), + { onlyIfStreaming: true }, ); } catch (err) { this.logger.warn( diff --git a/apps/server/src/database/repos/ai-chat/ai-chat-message.repo.ts b/apps/server/src/database/repos/ai-chat/ai-chat-message.repo.ts index 88fe00ed..005d7def 100644 --- a/apps/server/src/database/repos/ai-chat/ai-chat-message.repo.ts +++ b/apps/server/src/database/repos/ai-chat/ai-chat-message.repo.ts @@ -134,16 +134,26 @@ export class AiChatMessageRepo { metadata: unknown; status: string | null; }>, - trx?: KyselyTransaction, + opts?: { onlyIfStreaming?: boolean; trx?: KyselyTransaction }, ): Promise { - const db = dbOrTx(this.db, trx); - return db + const db = dbOrTx(this.db, opts?.trx); + let query = db .updateTable('aiChatMessages') .set({ ...(patch as Record), updatedAt: new Date() }) .where('id', '=', id) - .where('workspaceId', '=', workspaceId) - .returning(this.baseFields) - .executeTakeFirst(); + .where('workspaceId', '=', workspaceId); + // Concurrency guard (#183 review): a per-step 'streaming' update must NEVER + // overwrite a row the terminal callback already finalized. onStepFinish + // fires the streaming update fire-and-forget, so its UPDATE can land AFTER + // finalize on a DIFFERENT pool connection (commit order is not guaranteed). + // Scoping the streaming update to rows STILL in 'streaming' makes a late + // update a no-op once the row is completed/error/aborted — regardless of + // commit order. The terminal finalize runs WITHOUT this guard so it always + // wins. + if (opts?.onlyIfStreaming) { + query = query.where('status', '=', 'streaming'); + } + return query.returning(this.baseFields).executeTakeFirst(); } /** diff --git a/apps/server/test/integration/ai-chat-message-status.int-spec.ts b/apps/server/test/integration/ai-chat-message-status.int-spec.ts index bcec6427..2299e658 100644 --- a/apps/server/test/integration/ai-chat-message-status.int-spec.ts +++ b/apps/server/test/integration/ai-chat-message-status.int-spec.ts @@ -73,6 +73,40 @@ describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => { ); }); + it('onlyIfStreaming update is a NO-OP once the row is finalized (race guard)', async () => { + // Reproduce the step-update-vs-finalize race (#183 review): the row is + // finalized to 'completed', then a LATE per-step 'streaming' update lands. + // With `onlyIfStreaming` it must match nothing and leave the finalized row + // untouched (no clobber back to 'streaming', no lost usage). + const seeded = await repo.insert({ + chatId, + workspaceId, + userId, + role: 'assistant', + content: 'partial', + status: 'streaming', + }); + // Terminal finalize (unguarded) wins. + await repo.update(seeded.id, workspaceId, { + content: 'final answer', + status: 'completed', + metadata: { usage: { totalTokens: 42 } } as never, + }); + // A straggler per-step update arrives AFTER finalize. + const late = await repo.update( + seeded.id, + workspaceId, + { content: 'partial', status: 'streaming', metadata: {} as never }, + { onlyIfStreaming: true }, + ); + expect(late).toBeUndefined(); // matched no 'streaming' row -> no-op + const rows = await repo.findAllByChat(chatId, workspaceId); + const row = rows.find((r) => r.id === seeded.id)!; + expect(row.status).toBe('completed'); // NOT clobbered back to streaming + expect(row.content).toBe('final answer'); + expect((row.metadata as any).usage.totalTokens).toBe(42); // usage preserved + }); + it('update is workspace-scoped: a foreign workspace id matches nothing', async () => { const seeded = await repo.insert({ chatId,