diff --git a/apps/server/src/core/ai-chat/ai-chat.service.spec.ts b/apps/server/src/core/ai-chat/ai-chat.service.spec.ts index b788646e..d6f333d0 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.spec.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.spec.ts @@ -4,7 +4,7 @@ import { serializeSteps, rowToUiMessage, prepareAgentStep, - buildErrorAssistantRecord, + buildPartialAssistantRecord, MAX_AGENT_STEPS, FINAL_STEP_INSTRUCTION, } from './ai-chat.service'; @@ -232,16 +232,19 @@ describe('prepareAgentStep', () => { }); /** - * Unit test for buildErrorAssistantRecord: the pure helper that shapes the - * assistant-message record persisted on a first-turn (or any) stream failure. - * The streamText onError callback builds the formatted error text via - * describeProviderError (tested separately) and hands it to this helper; pinning - * the record shape here covers the persist-assistant-on-error logic without - * having to seam streamText itself. + * Unit test for buildPartialAssistantRecord: the pure helper that shapes the + * assistant-message record persisted on a partial/failed turn (the streamText + * onError / onAbort paths). It captures the PARTIAL answer the user already saw + * (finished steps' text + tool parts, plus the in-progress step's text) so a + * provider error / disconnect no longer throws the streamed answer away. Pinning + * the record shape here covers the persist-partial logic without seaming + * streamText itself. */ -describe('buildErrorAssistantRecord', () => { - it('records an empty turn with the error text in metadata (finishReason=error)', () => { - const rec = buildErrorAssistantRecord('401: Unauthorized'); +describe('buildPartialAssistantRecord', () => { + type AnyPart = Record; + + it('records an empty turn with the error text (preserves old behavior)', () => { + const rec = buildPartialAssistantRecord([], '', 'error', '401: Unauthorized'); expect(rec).toEqual({ text: '', toolCalls: null, @@ -249,13 +252,46 @@ describe('buildErrorAssistantRecord', () => { }); }); - it('always produces empty text + empty parts so a failed turn is still recorded', () => { - const rec = buildErrorAssistantRecord('boom'); - // No partial text and no UI parts: the turn exists in history but renders as - // an error, with the cause preserved in metadata.error. - expect(rec.text).toBe(''); - expect(rec.metadata.parts).toEqual([]); - expect(rec.toolCalls).toBeNull(); + it('persists in-progress text (no finished steps) as the partial answer', () => { + const rec = buildPartialAssistantRecord([], 'partial answer', 'error', 'boom'); + expect(rec.text).toBe('partial answer'); + expect(rec.metadata.parts).toEqual([ + { type: 'text', text: 'partial answer' }, + ]); expect(rec.metadata.error).toBe('boom'); }); + + it('combines a finished tool step with trailing in-progress text', () => { + const steps = [ + { + text: 'looked it up', + toolCalls: [ + { toolCallId: 'c1', toolName: 'getPage', input: { id: 'p1' } }, + ], + toolResults: [ + { toolCallId: 'c1', toolName: 'getPage', output: { title: 'T' } }, + ], + }, + ]; + const rec = buildPartialAssistantRecord(steps, ' and then', 'error', 'boom'); + const parts = rec.metadata.parts as AnyPart[]; + // The finished step's text part is present. + expect(parts).toContainEqual({ type: 'text', text: 'looked it up' }); + // The paired tool call+result becomes an output-available part. + const toolPart = parts.find((p) => p.type === 'tool-getPage'); + expect(toolPart).toBeDefined(); + expect(toolPart!.state).toBe('output-available'); + // The in-progress text is appended LAST so the parts match the stream order. + expect(parts[parts.length - 1]).toEqual({ type: 'text', text: ' and then' }); + expect(rec.text).toBe('looked it up and then'); + expect(rec.toolCalls).not.toBeNull(); + expect(rec.metadata.error).toBe('boom'); + }); + + it('omits the error key on the abort path (no errorText)', () => { + const rec = buildPartialAssistantRecord([], 'half', 'aborted'); + expect(rec.metadata.finishReason).toBe('aborted'); + expect('error' in rec.metadata).toBe(false); + expect(rec.text).toBe('half'); + }); }); diff --git a/apps/server/src/core/ai-chat/ai-chat.service.ts b/apps/server/src/core/ai-chat/ai-chat.service.ts index 2f50bbcb..d44a9abd 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.ts @@ -368,6 +368,14 @@ export class AiChatService { } }; + // Accumulate the turn's streamed output so a provider error / disconnect can + // persist the PARTIAL answer the user already saw — the SDK's onError/onAbort + // callbacks don't hand us the in-progress text. `capturedSteps` holds finished + // steps (tool calls + their text); `inProgressText` holds the text streamed in + // the CURRENT, not-yet-finished step, reset whenever a step finishes. + const capturedSteps: StepLike[] = []; + let inProgressText = ''; + // NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous // failure here (or in pipe below) would skip the terminal callbacks, so the // catch releases the leased external clients to avoid a connection leak. @@ -391,6 +399,17 @@ export class AiChatService { // concatenated onto the original `system` so the persona is preserved. prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system), abortSignal: signal, + onChunk: ({ chunk }) => { + // 'text-delta' is the assistant's prose; tool-call args are separate chunk + // types — so this mirrors exactly what streams to the client. + if (chunk.type === 'text-delta') inProgressText += chunk.text; + }, + onStepFinish: (step) => { + // The finished step's full text is now in `step.text`; fold it in and reset + // the in-progress accumulator for the next step. + capturedSteps.push(step as StepLike); + inProgressText = ''; + }, onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => { await persistAssistant({ text, @@ -421,31 +440,33 @@ export class AiChatService { const e = error as { stack?: string }; const errorText = describeProviderError(error, String(error)); this.logger.error(`AI chat stream error: ${errorText}`, e?.stack); - // Persist whatever text we have (likely empty) so the turn is recorded, - // and record the error text in metadata so it is visible in history. - await persistAssistant(buildErrorAssistantRecord(errorText)); + // Persist the PARTIAL answer streamed before the failure (text + any + // finished tool steps) WITH the error in metadata, so the turn shows what + // the user already saw plus the cause — not just a bare error. + await persistAssistant( + buildPartialAssistantRecord( + capturedSteps, + inProgressText, + 'error', + errorText, + ), + ); await closeExternalClients(); }, onAbort: async ({ steps }) => { - // Client disconnected / request aborted: persist the partial answer, - // including any completed tool steps so the turn replays faithfully. - const text = steps.map((s) => s.text ?? '').join(''); - // Unlike onError/onFinish, this terminal path otherwise writes nothing, - // so an aborted turn (client disconnect / proxy drop / stop()) would be - // invisible in the logs. Log it (warn) so the abort is traceable, with - // the step count and how much partial text was produced before the cut. + const partialChars = + capturedSteps.reduce((n, s) => n + (s.text?.length ?? 0), 0) + + inProgressText.length; + // Unlike onError/onFinish, this terminal path otherwise writes nothing, so + // an aborted turn (client disconnect / proxy drop / stop()) would be + // invisible in the logs. Log it (warn) so the abort is traceable. this.logger.warn( `AI chat stream aborted (chat ${chatId}) after ${steps.length} ` + - `step(s), ${text.length} chars partial text; persisting partial turn.`, + `step(s), ${partialChars} chars partial text; persisting partial turn.`, + ); + await persistAssistant( + buildPartialAssistantRecord(capturedSteps, inProgressText, 'aborted'), ); - await persistAssistant({ - text, - toolCalls: serializeSteps(steps), - metadata: { - finishReason: 'aborted', - parts: assistantParts(steps, text), - }, - }); await closeExternalClients(); }, }); @@ -754,22 +775,38 @@ export function rowToUiMessage(row: AiChatMessage): Omit & { } /** - * Build the assistant-message record persisted when a turn fails before any text - * is produced (the streamText onError path). Pure: it takes the formatted error - * text and returns the exact `{ text, toolCalls, metadata }` payload handed to - * persistAssistant, so the first-turn-failure recording shape is unit-testable - * without seaming streamText. The empty text + empty parts mean the failed turn - * is still recorded in history, with the provider cause visible in metadata. + * Build the assistant-message record persisted on a partial/failed turn (the + * streamText onError / onAbort paths). Captures the partial answer the user + * already saw: each finished step's text + tool parts (via assistantParts), + * then the in-progress step's text appended last. When `errorText` is provided + * it is recorded in metadata.error so the cause shows in history; an aborted + * turn passes none. Pure, so the partial-recording shape is unit-testable + * without seaming streamText. */ -export function buildErrorAssistantRecord(errorText: string): { - text: string; - toolCalls: null; - metadata: { finishReason: 'error'; parts: []; error: string }; -} { +export function buildPartialAssistantRecord( + steps: ReadonlyArray | undefined, + inProgressText: string, + finishReason: 'error' | 'aborted', + errorText?: string, +): { text: string; toolCalls: unknown; metadata: Record } { + const finished = steps ?? []; + const stepsText = finished.map((s) => s.text ?? '').join(''); + const trailing = inProgressText ?? ''; + // assistantParts emits text parts only for FINISHED steps; append the + // in-progress step's text (the answer cut off by the error) as the last text + // part so the persisted parts match what streamed to the client. + const parts = assistantParts(finished, '') as unknown as Array< + Record + >; + if (trailing) parts.push({ type: 'text', text: trailing }); return { - text: '', - toolCalls: null, - metadata: { finishReason: 'error', parts: [], error: errorText }, + text: stepsText + trailing, + toolCalls: serializeSteps(finished), + metadata: { + finishReason, + parts: parts as unknown as UIMessage['parts'], + ...(errorText ? { error: errorText } : {}), + }, }; }