fix(ai-chat): persist partial answer when a turn errors mid-stream
A provider error (e.g. read ECONNRESET) routed the turn through the
streamText onError callback, which persisted an EMPTY assistant record
(buildErrorAssistantRecord -> text:'', parts:[]). The answer text already
streamed to and shown by the client was therefore lost from the persisted
row, the chat export, and reopened history — leaving only the error line.
The AI SDK v6 onError callback receives only { error } (no steps/text),
and the visible final answer streams in the last, not-yet-finished step,
so it is absent from every finished step.text. Accumulate it ourselves:
onChunk folds each 'text-delta' into inProgressText; onStepFinish moves a
finished step into capturedSteps and resets inProgressText. onError and
onAbort now persist the partial answer (finished steps' text + tool parts
via assistantParts, then the in-progress text appended last) through a new
shared pure helper buildPartialAssistantRecord, recording the cause in
metadata.error on the error path. Replaces buildErrorAssistantRecord; its
empty-turn shape is preserved when nothing streamed.
Complementary to the resilient-fetch reconnect: that reduces how often a
turn dies; this preserves what was produced when it dies anyway.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -4,7 +4,7 @@ import {
|
||||
serializeSteps,
|
||||
rowToUiMessage,
|
||||
prepareAgentStep,
|
||||
buildErrorAssistantRecord,
|
||||
buildPartialAssistantRecord,
|
||||
MAX_AGENT_STEPS,
|
||||
FINAL_STEP_INSTRUCTION,
|
||||
} from './ai-chat.service';
|
||||
@@ -232,16 +232,19 @@ describe('prepareAgentStep', () => {
|
||||
});
|
||||
|
||||
/**
|
||||
* Unit test for buildErrorAssistantRecord: the pure helper that shapes the
|
||||
* assistant-message record persisted on a first-turn (or any) stream failure.
|
||||
* The streamText onError callback builds the formatted error text via
|
||||
* describeProviderError (tested separately) and hands it to this helper; pinning
|
||||
* the record shape here covers the persist-assistant-on-error logic without
|
||||
* having to seam streamText itself.
|
||||
* Unit test for buildPartialAssistantRecord: the pure helper that shapes the
|
||||
* assistant-message record persisted on a partial/failed turn (the streamText
|
||||
* onError / onAbort paths). It captures the PARTIAL answer the user already saw
|
||||
* (finished steps' text + tool parts, plus the in-progress step's text) so a
|
||||
* provider error / disconnect no longer throws the streamed answer away. Pinning
|
||||
* the record shape here covers the persist-partial logic without seaming
|
||||
* streamText itself.
|
||||
*/
|
||||
describe('buildErrorAssistantRecord', () => {
|
||||
it('records an empty turn with the error text in metadata (finishReason=error)', () => {
|
||||
const rec = buildErrorAssistantRecord('401: Unauthorized');
|
||||
describe('buildPartialAssistantRecord', () => {
|
||||
type AnyPart = Record<string, unknown>;
|
||||
|
||||
it('records an empty turn with the error text (preserves old behavior)', () => {
|
||||
const rec = buildPartialAssistantRecord([], '', 'error', '401: Unauthorized');
|
||||
expect(rec).toEqual({
|
||||
text: '',
|
||||
toolCalls: null,
|
||||
@@ -249,13 +252,46 @@ describe('buildErrorAssistantRecord', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('always produces empty text + empty parts so a failed turn is still recorded', () => {
|
||||
const rec = buildErrorAssistantRecord('boom');
|
||||
// No partial text and no UI parts: the turn exists in history but renders as
|
||||
// an error, with the cause preserved in metadata.error.
|
||||
expect(rec.text).toBe('');
|
||||
expect(rec.metadata.parts).toEqual([]);
|
||||
expect(rec.toolCalls).toBeNull();
|
||||
it('persists in-progress text (no finished steps) as the partial answer', () => {
|
||||
const rec = buildPartialAssistantRecord([], 'partial answer', 'error', 'boom');
|
||||
expect(rec.text).toBe('partial answer');
|
||||
expect(rec.metadata.parts).toEqual([
|
||||
{ type: 'text', text: 'partial answer' },
|
||||
]);
|
||||
expect(rec.metadata.error).toBe('boom');
|
||||
});
|
||||
|
||||
it('combines a finished tool step with trailing in-progress text', () => {
|
||||
const steps = [
|
||||
{
|
||||
text: 'looked it up',
|
||||
toolCalls: [
|
||||
{ toolCallId: 'c1', toolName: 'getPage', input: { id: 'p1' } },
|
||||
],
|
||||
toolResults: [
|
||||
{ toolCallId: 'c1', toolName: 'getPage', output: { title: 'T' } },
|
||||
],
|
||||
},
|
||||
];
|
||||
const rec = buildPartialAssistantRecord(steps, ' and then', 'error', 'boom');
|
||||
const parts = rec.metadata.parts as AnyPart[];
|
||||
// The finished step's text part is present.
|
||||
expect(parts).toContainEqual({ type: 'text', text: 'looked it up' });
|
||||
// The paired tool call+result becomes an output-available part.
|
||||
const toolPart = parts.find((p) => p.type === 'tool-getPage');
|
||||
expect(toolPart).toBeDefined();
|
||||
expect(toolPart!.state).toBe('output-available');
|
||||
// The in-progress text is appended LAST so the parts match the stream order.
|
||||
expect(parts[parts.length - 1]).toEqual({ type: 'text', text: ' and then' });
|
||||
expect(rec.text).toBe('looked it up and then');
|
||||
expect(rec.toolCalls).not.toBeNull();
|
||||
expect(rec.metadata.error).toBe('boom');
|
||||
});
|
||||
|
||||
it('omits the error key on the abort path (no errorText)', () => {
|
||||
const rec = buildPartialAssistantRecord([], 'half', 'aborted');
|
||||
expect(rec.metadata.finishReason).toBe('aborted');
|
||||
expect('error' in rec.metadata).toBe(false);
|
||||
expect(rec.text).toBe('half');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -368,6 +368,14 @@ export class AiChatService {
|
||||
}
|
||||
};
|
||||
|
||||
// Accumulate the turn's streamed output so a provider error / disconnect can
|
||||
// persist the PARTIAL answer the user already saw — the SDK's onError/onAbort
|
||||
// callbacks don't hand us the in-progress text. `capturedSteps` holds finished
|
||||
// steps (tool calls + their text); `inProgressText` holds the text streamed in
|
||||
// the CURRENT, not-yet-finished step, reset whenever a step finishes.
|
||||
const capturedSteps: StepLike[] = [];
|
||||
let inProgressText = '';
|
||||
|
||||
// NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous
|
||||
// failure here (or in pipe below) would skip the terminal callbacks, so the
|
||||
// catch releases the leased external clients to avoid a connection leak.
|
||||
@@ -391,6 +399,17 @@ export class AiChatService {
|
||||
// concatenated onto the original `system` so the persona is preserved.
|
||||
prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system),
|
||||
abortSignal: signal,
|
||||
onChunk: ({ chunk }) => {
|
||||
// 'text-delta' is the assistant's prose; tool-call args are separate chunk
|
||||
// types — so this mirrors exactly what streams to the client.
|
||||
if (chunk.type === 'text-delta') inProgressText += chunk.text;
|
||||
},
|
||||
onStepFinish: (step) => {
|
||||
// The finished step's full text is now in `step.text`; fold it in and reset
|
||||
// the in-progress accumulator for the next step.
|
||||
capturedSteps.push(step as StepLike);
|
||||
inProgressText = '';
|
||||
},
|
||||
onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => {
|
||||
await persistAssistant({
|
||||
text,
|
||||
@@ -421,31 +440,33 @@ export class AiChatService {
|
||||
const e = error as { stack?: string };
|
||||
const errorText = describeProviderError(error, String(error));
|
||||
this.logger.error(`AI chat stream error: ${errorText}`, e?.stack);
|
||||
// Persist whatever text we have (likely empty) so the turn is recorded,
|
||||
// and record the error text in metadata so it is visible in history.
|
||||
await persistAssistant(buildErrorAssistantRecord(errorText));
|
||||
// Persist the PARTIAL answer streamed before the failure (text + any
|
||||
// finished tool steps) WITH the error in metadata, so the turn shows what
|
||||
// the user already saw plus the cause — not just a bare error.
|
||||
await persistAssistant(
|
||||
buildPartialAssistantRecord(
|
||||
capturedSteps,
|
||||
inProgressText,
|
||||
'error',
|
||||
errorText,
|
||||
),
|
||||
);
|
||||
await closeExternalClients();
|
||||
},
|
||||
onAbort: async ({ steps }) => {
|
||||
// Client disconnected / request aborted: persist the partial answer,
|
||||
// including any completed tool steps so the turn replays faithfully.
|
||||
const text = steps.map((s) => s.text ?? '').join('');
|
||||
// Unlike onError/onFinish, this terminal path otherwise writes nothing,
|
||||
// so an aborted turn (client disconnect / proxy drop / stop()) would be
|
||||
// invisible in the logs. Log it (warn) so the abort is traceable, with
|
||||
// the step count and how much partial text was produced before the cut.
|
||||
const partialChars =
|
||||
capturedSteps.reduce((n, s) => n + (s.text?.length ?? 0), 0) +
|
||||
inProgressText.length;
|
||||
// Unlike onError/onFinish, this terminal path otherwise writes nothing, so
|
||||
// an aborted turn (client disconnect / proxy drop / stop()) would be
|
||||
// invisible in the logs. Log it (warn) so the abort is traceable.
|
||||
this.logger.warn(
|
||||
`AI chat stream aborted (chat ${chatId}) after ${steps.length} ` +
|
||||
`step(s), ${text.length} chars partial text; persisting partial turn.`,
|
||||
`step(s), ${partialChars} chars partial text; persisting partial turn.`,
|
||||
);
|
||||
await persistAssistant(
|
||||
buildPartialAssistantRecord(capturedSteps, inProgressText, 'aborted'),
|
||||
);
|
||||
await persistAssistant({
|
||||
text,
|
||||
toolCalls: serializeSteps(steps),
|
||||
metadata: {
|
||||
finishReason: 'aborted',
|
||||
parts: assistantParts(steps, text),
|
||||
},
|
||||
});
|
||||
await closeExternalClients();
|
||||
},
|
||||
});
|
||||
@@ -754,22 +775,38 @@ export function rowToUiMessage(row: AiChatMessage): Omit<UIMessage, 'id'> & {
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the assistant-message record persisted when a turn fails before any text
|
||||
* is produced (the streamText onError path). Pure: it takes the formatted error
|
||||
* text and returns the exact `{ text, toolCalls, metadata }` payload handed to
|
||||
* persistAssistant, so the first-turn-failure recording shape is unit-testable
|
||||
* without seaming streamText. The empty text + empty parts mean the failed turn
|
||||
* is still recorded in history, with the provider cause visible in metadata.
|
||||
* Build the assistant-message record persisted on a partial/failed turn (the
|
||||
* streamText onError / onAbort paths). Captures the partial answer the user
|
||||
* already saw: each finished step's text + tool parts (via assistantParts),
|
||||
* then the in-progress step's text appended last. When `errorText` is provided
|
||||
* it is recorded in metadata.error so the cause shows in history; an aborted
|
||||
* turn passes none. Pure, so the partial-recording shape is unit-testable
|
||||
* without seaming streamText.
|
||||
*/
|
||||
export function buildErrorAssistantRecord(errorText: string): {
|
||||
text: string;
|
||||
toolCalls: null;
|
||||
metadata: { finishReason: 'error'; parts: []; error: string };
|
||||
} {
|
||||
export function buildPartialAssistantRecord(
|
||||
steps: ReadonlyArray<StepLike> | undefined,
|
||||
inProgressText: string,
|
||||
finishReason: 'error' | 'aborted',
|
||||
errorText?: string,
|
||||
): { text: string; toolCalls: unknown; metadata: Record<string, unknown> } {
|
||||
const finished = steps ?? [];
|
||||
const stepsText = finished.map((s) => s.text ?? '').join('');
|
||||
const trailing = inProgressText ?? '';
|
||||
// assistantParts emits text parts only for FINISHED steps; append the
|
||||
// in-progress step's text (the answer cut off by the error) as the last text
|
||||
// part so the persisted parts match what streamed to the client.
|
||||
const parts = assistantParts(finished, '') as unknown as Array<
|
||||
Record<string, unknown>
|
||||
>;
|
||||
if (trailing) parts.push({ type: 'text', text: trailing });
|
||||
return {
|
||||
text: '',
|
||||
toolCalls: null,
|
||||
metadata: { finishReason: 'error', parts: [], error: errorText },
|
||||
text: stepsText + trailing,
|
||||
toolCalls: serializeSteps(finished),
|
||||
metadata: {
|
||||
finishReason,
|
||||
parts: parts as unknown as UIMessage['parts'],
|
||||
...(errorText ? { error: errorText } : {}),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user