diff --git a/apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts b/apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts index 747130ba..5fea145a 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.run-race.spec.ts @@ -335,3 +335,149 @@ describe('AiChatService.stream — abortSignal wiring (#184 F3)', () => { ); }); }); + +/** + * F14 — the begin-failure RESILIENCE branch (the `else` of the run-race guard). + * + * stream() wraps runHooks.begin in try/catch with TWO branches: + * - RunAlreadyActiveError -> 409 ConflictException (pinned above). + * - ANY OTHER begin failure -> SWALLOW + continue UNTRACKED on the socket signal + * (legacy fallback): it logs "...streaming without run tracking", leaves + * `effectiveSignal = signal` (runId undefined) and serves the turn anyway. + * + * The contract: a transient beginRun failure (e.g. a non-unique DB error inserting + * the run row) must STILL serve the user's turn — it must NOT re-throw and must NOT + * be misclassified as a 409. A regression that re-threw here would break EVERY turn + * on a begin failure with nothing to catch it. This branch is otherwise undriven by + * any spec, so it is pinned here SEPARATELY from the 409 path: a plain begin error + * proceeds to streamText with the SOCKET signal and still persists the user turn. + */ +describe('AiChatService.stream — begin-failure resilience / legacy fallback (#184 F14)', () => { + const streamTextMock = streamText as unknown as jest.Mock; + + function makeStreamResult() { + return { + consumeStream: jest.fn(), + pipeUIMessageStreamToResponse: jest.fn(), + }; + } + + function makeRes() { + return { + raw: { + writeHead: jest.fn(), + write: jest.fn(), + once: jest.fn(), + on: jest.fn(), + flushHeaders: jest.fn(), + writableEnded: false, + destroyed: false, + }, + }; + } + + // Same harness as the F3 abortSignal block, but it also exposes + // aiChatMessageRepo so we can assert the user turn IS persisted (the turn really + // streamed) despite begin() blowing up. + function makeService() { + const aiChatRepo = { + findById: jest.fn(async () => ({ id: 'chat-1', workspaceId: 'ws-1' })), + insert: jest.fn(), + }; + const aiChatMessageRepo = { + insert: jest.fn(async () => ({ id: 'msg-1' })), + findAllByChat: jest.fn(async () => []), + update: jest.fn(async () => ({ id: 'msg-1' })), + }; + const aiSettings = { resolve: jest.fn(async () => ({})) }; + const tools = { forUser: jest.fn(async () => ({})) }; + const mcpClients = { + toolsFor: jest.fn(async () => ({ + tools: {}, + clients: [], + outcomes: [], + instructions: [], + })), + }; + const svc = new AiChatService( + {} as never, // ai + aiChatRepo as never, + aiChatMessageRepo as never, + aiSettings as never, + tools as never, + mcpClients as never, + {} as never, // aiAgentRoleRepo + {} as never, // pageRepo + {} as never, // pageAccess + ); + return { svc, aiChatMessageRepo }; + } + + const body = { + chatId: 'chat-1', + messages: [ + { id: 'm1', role: 'user', parts: [{ type: 'text', text: 'hi' }] }, + ], + }; + + beforeEach(() => { + streamTextMock.mockReset(); + streamTextMock.mockImplementation(() => makeStreamResult()); + jest + .spyOn(Logger.prototype, 'log') + .mockImplementation(() => undefined as never); + }); + + afterEach(() => jest.restoreAllMocks()); + + it('a PLAIN begin() failure (NOT RunAlreadyActiveError) does NOT 409 — it swallows, logs, and streams the turn UNTRACKED on the socket signal', async () => { + const errorSpy = jest + .spyOn(Logger.prototype, 'error') + .mockImplementation(() => undefined as never); + + const { svc, aiChatMessageRepo } = makeService(); + const socketSignal = new AbortController().signal; + + // A transient, NON-race begin failure (e.g. a non-unique DB error inserting + // the run row). This is the `else` branch of the begin try/catch. + const begin = jest.fn(async () => { + throw new Error('insert failed'); + }); + + const promise = svc.stream({ + user: { id: 'user-1' } as never, + workspace: { id: 'ws-1' } as never, + sessionId: 'sess-1', + body: body as never, + res: makeRes() as never, + signal: socketSignal, + model: {} as never, + role: null, + runHooks: { + begin, + onAssistantSeeded: jest.fn(), + onStep: jest.fn(), + onSettled: jest.fn(), + } as never, + }); + + // The turn proceeds: NO throw at all (in particular NOT a 409). + await expect(promise).resolves.toBeUndefined(); + + expect(begin).toHaveBeenCalledTimes(1); + + // The resilience branch logged the legacy-fallback warning. + expect(errorSpy).toHaveBeenCalledWith( + expect.stringContaining('streaming without run tracking'), + expect.anything(), + ); + + // The turn really streamed: the user message was persisted and streamText ran. + expect(aiChatMessageRepo.insert).toHaveBeenCalled(); + expect(streamTextMock).toHaveBeenCalledTimes(1); + + // The decisive wiring: with no run handle, the fallback uses the SOCKET signal + // (effectiveSignal = signal, runId undefined) — not a run-bound signal. + expect(streamTextMock.mock.calls[0][0].abortSignal).toBe(socketSignal); + }); +});