feat(ai-chat): persistent history as source of truth — step durability + server export (#183)
The chat lived in inconsistent paradigms (in-memory stream + client export vs.
DB-as-context), which made export flaky and lost the assistant answer if the
process died mid-turn. Make the DB the single source of truth.
A. STEP-GRANULAR DURABILITY (server)
- ai_chat_messages gains a nullable `status` column (migration; NULL = legacy =
completed). The assistant row is now INSERTED UPFRONT as `status:'streaming'`
and UPDATEd on every onStepFinish with all finished steps (text + tool calls +
tool RESULTS), then finalized once to completed/error/aborted on the terminal
callback. So a process death mid-turn keeps every finished step; a startup
sweep (OnModuleInit → sweepStreaming) flips any dangling 'streaming' row to
'aborted'. The write path no longer depends on a live socket.
- Pure exported `flushAssistant(steps, inProgressText, status, extra?)` builds
the persist payload (metadata.parts byte-identical to the old builder), so a
future background worker can call the same path. AiChatMessageRepo gains
`update`, `sweepStreaming`, and `findAllByChat`.
- consumeStream drain, external-MCP client close-once, SSE heartbeat preserved.
B. SERVER-SIDE EXPORT
- New pure `chat-markdown.util.ts` renders Markdown from DB rows ONLY (server
port of the client builder). Because A persists the in-progress row, the
export now includes an interrupted turn up to its last finished step (flagged
"still generating"). `POST /ai-chat/export` (owner-gated via assertOwnedChat,
workspace-scoped) returns it; `lang` accepts a full client locale tag
('en-US'/'ru-RU') and is normalized server-side (normalizeLang) — a strict
@IsIn(['en','ru']) DTO rejected the real client's i18n.language with a 400,
caught in real-browser testing.
- Client: handleCopy calls the endpoint; `canExport = !!activeChatId`. The whole
liveThreadRef/liveStateRef/onLiveContentChange/hasLiveContent hybrid (and the
client chat-markdown util + test) is removed — the server is now authoritative.
Tests: flushAssistant unit (status shapes + parts parity), chat-markdown.util
unit (incl. legacy NULL-status + interrupted note + ru + normalizeLang locale
tags), controller export wiring + owner-gate, integration update/sweepStreaming.
Verified: server build + 318 ai-chat unit + 3 integration; client tsc + 157
ai-chat unit; and END-TO-END in a real browser — a chat turn persists mid-stream
and the Copy button exports the DB-sourced markdown (showing the in-progress
row), HTTP 200 after the locale fix.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,4 +1,9 @@
|
||||
import { ForbiddenException, Injectable, Logger } from '@nestjs/common';
|
||||
import {
|
||||
ForbiddenException,
|
||||
Injectable,
|
||||
Logger,
|
||||
OnModuleInit,
|
||||
} from '@nestjs/common';
|
||||
import { FastifyReply } from 'fastify';
|
||||
import {
|
||||
streamText,
|
||||
@@ -60,7 +65,10 @@ export function prepareAgentStep(
|
||||
system: string,
|
||||
): { toolChoice: 'none'; system: string } | undefined {
|
||||
if (stepNumber >= MAX_AGENT_STEPS - 1) {
|
||||
return { toolChoice: 'none', system: `${system}\n\n${FINAL_STEP_INSTRUCTION}` };
|
||||
return {
|
||||
toolChoice: 'none',
|
||||
system: `${system}\n\n${FINAL_STEP_INSTRUCTION}`,
|
||||
};
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
@@ -121,7 +129,7 @@ export interface AiChatStreamArgs {
|
||||
* can be rebuilt for `convertToModelMessages`.
|
||||
*/
|
||||
@Injectable()
|
||||
export class AiChatService {
|
||||
export class AiChatService implements OnModuleInit {
|
||||
private readonly logger = new Logger(AiChatService.name);
|
||||
|
||||
constructor(
|
||||
@@ -136,6 +144,32 @@ export class AiChatService {
|
||||
private readonly pageAccess: PageAccessService,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Crash-recovery sweep on server start (#183): any assistant row left in the
|
||||
* 'streaming' state is the relic of a turn whose process died before it
|
||||
* reached a terminal status. Flip those to 'aborted' so history/export show
|
||||
* them settled (with whatever finished steps were already persisted) instead
|
||||
* of perpetually "streaming". Best-effort: a sweep failure is logged but must
|
||||
* never block server startup.
|
||||
*/
|
||||
async onModuleInit(): Promise<void> {
|
||||
try {
|
||||
const swept = await this.aiChatMessageRepo.sweepStreaming();
|
||||
if (swept > 0) {
|
||||
this.logger.log(
|
||||
`Startup sweep: marked ${swept} dangling 'streaming' assistant ` +
|
||||
`message(s) as 'aborted'.`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.warn(
|
||||
`Startup sweep of dangling 'streaming' messages failed: ${
|
||||
err instanceof Error ? err.message : 'unknown error'
|
||||
}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve the agent role that applies to this stream request, scoped to the
|
||||
* workspace and soft-delete aware. For an EXISTING chat the role is read from
|
||||
@@ -259,9 +293,7 @@ export class AiChatService {
|
||||
content: incomingText,
|
||||
// jsonb column: UIMessage parts are JSON-serializable at runtime but not
|
||||
// structurally `JsonValue`, so cast through unknown.
|
||||
metadata: (incoming?.parts
|
||||
? { parts: incoming.parts }
|
||||
: null) as never,
|
||||
metadata: (incoming?.parts ? { parts: incoming.parts } : null) as never,
|
||||
});
|
||||
|
||||
// Rebuild the conversation from persisted history (not the client payload),
|
||||
@@ -347,31 +379,6 @@ export class AiChatService {
|
||||
);
|
||||
};
|
||||
|
||||
// Persist the assistant message. Used by onFinish (full result) and the
|
||||
// abort/error paths (partial result). Guarded so we persist at most once.
|
||||
let persisted = false;
|
||||
const persistAssistant = async (data: {
|
||||
text: string;
|
||||
toolCalls: unknown;
|
||||
metadata: Record<string, unknown>;
|
||||
}): Promise<void> => {
|
||||
if (persisted) return;
|
||||
persisted = true;
|
||||
try {
|
||||
await this.aiChatMessageRepo.insert({
|
||||
chatId,
|
||||
workspaceId: workspace.id,
|
||||
userId: user.id,
|
||||
role: 'assistant',
|
||||
content: data.text ?? '',
|
||||
toolCalls: (data.toolCalls ?? null) as never,
|
||||
metadata: data.metadata as never,
|
||||
});
|
||||
} catch (err) {
|
||||
this.logger.error('Failed to persist assistant message', err as Error);
|
||||
}
|
||||
};
|
||||
|
||||
// Accumulate the turn's streamed output so a provider error / disconnect can
|
||||
// persist the PARTIAL answer the user already saw — the SDK's onError/onAbort
|
||||
// callbacks don't hand us the in-progress text. `capturedSteps` holds finished
|
||||
@@ -380,6 +387,94 @@ export class AiChatService {
|
||||
const capturedSteps: StepLike[] = [];
|
||||
let inProgressText = '';
|
||||
|
||||
// Step-granular durability (#183): create the assistant row UPFRONT in the
|
||||
// 'streaming' state (before any token), then UPDATE it as each step finishes
|
||||
// and finalize it once on the terminal callback. If the process dies
|
||||
// mid-turn the row survives with every finished step already persisted; the
|
||||
// startup sweep (sweepStreaming) later flips a dangling 'streaming' row to
|
||||
// 'aborted'. The DB is now the single source of truth for the turn — the
|
||||
// socket is never required for the write path. A failed upfront insert is
|
||||
// logged and leaves assistantId undefined; the per-step/terminal updates then
|
||||
// no-op (guarded below) so the turn still streams to the user.
|
||||
let assistantId: string | undefined;
|
||||
try {
|
||||
const seed = flushAssistant([], '', 'streaming');
|
||||
const seeded = await this.aiChatMessageRepo.insert({
|
||||
chatId,
|
||||
workspaceId: workspace.id,
|
||||
userId: user.id,
|
||||
role: 'assistant',
|
||||
content: seed.content,
|
||||
// jsonb columns: cast through never (same as the user insert above).
|
||||
toolCalls: (seed.toolCalls ?? null) as never,
|
||||
metadata: seed.metadata as never,
|
||||
status: seed.status,
|
||||
});
|
||||
assistantId = seeded?.id;
|
||||
} catch (err) {
|
||||
this.logger.error('Failed to insert upfront assistant row', err as Error);
|
||||
}
|
||||
|
||||
// Per-step (non-terminal) update: persist the finished steps the moment a
|
||||
// step ends. Tolerant — a failed update is logged and swallowed so it never
|
||||
// throws into the stream. Keeps status 'streaming'.
|
||||
const updateStreaming = async (): Promise<void> => {
|
||||
if (!assistantId) return;
|
||||
try {
|
||||
await this.aiChatMessageRepo.update(
|
||||
assistantId,
|
||||
workspace.id,
|
||||
flushAssistant(capturedSteps, '', 'streaming'),
|
||||
);
|
||||
} catch (err) {
|
||||
this.logger.warn(
|
||||
`Failed to update streaming assistant row: ${
|
||||
err instanceof Error ? err.message : 'unknown error'
|
||||
}`,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
// Terminal finalize: write the completed/error/aborted row exactly once
|
||||
// across the (mutually-exclusive, at-most-once) onFinish/onError/onAbort
|
||||
// callbacks — mirroring the pre-#183 persist-at-most-once guard for the
|
||||
// TERMINAL status (the row may be updated many times with 'streaming' before
|
||||
// this fires once).
|
||||
let finalized = false;
|
||||
const finalizeAssistant = async (
|
||||
flushed: AssistantFlush,
|
||||
): Promise<void> => {
|
||||
if (finalized) return;
|
||||
finalized = true;
|
||||
if (!assistantId) {
|
||||
// The upfront insert failed: fall back to inserting the terminal row so
|
||||
// the turn is not lost entirely.
|
||||
try {
|
||||
await this.aiChatMessageRepo.insert({
|
||||
chatId,
|
||||
workspaceId: workspace.id,
|
||||
userId: user.id,
|
||||
role: 'assistant',
|
||||
content: flushed.content,
|
||||
toolCalls: (flushed.toolCalls ?? null) as never,
|
||||
metadata: flushed.metadata as never,
|
||||
status: flushed.status,
|
||||
});
|
||||
} catch (err) {
|
||||
this.logger.error(
|
||||
'Failed to persist terminal assistant message',
|
||||
err as Error,
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await this.aiChatMessageRepo.update(assistantId, workspace.id, flushed);
|
||||
} catch (err) {
|
||||
this.logger.error('Failed to finalize assistant message', err as Error);
|
||||
}
|
||||
};
|
||||
|
||||
// DIAGNOSTIC (Safari stream-drop investigation) — temporary. Measure
|
||||
// first-chunk latency, the model-silent gap right before a disconnect, and
|
||||
// how many SSE heartbeats were written, so a Safari drop can be classified
|
||||
@@ -395,144 +490,141 @@ export class AiChatService {
|
||||
let result: ReturnType<typeof streamText>;
|
||||
try {
|
||||
result = streamText({
|
||||
model,
|
||||
system,
|
||||
messages,
|
||||
tools,
|
||||
// No maxOutputTokens cap on the agent: tool-call arguments (e.g. a full
|
||||
// page body for the write tools) are emitted as OUTPUT tokens, so a fixed
|
||||
// cap would truncate complex tool calls mid-argument. Let the model use its
|
||||
// natural per-step budget. (Cost/credit limits are an account concern, not
|
||||
// something to enforce by silently breaking the agent.)
|
||||
stopWhen: stepCountIs(MAX_AGENT_STEPS),
|
||||
// Forced finalization: reserve the LAST allowed step for a text-only
|
||||
// answer. Without this, a turn that spends all its steps on tool calls
|
||||
// ends with no assistant text (an empty turn). prepareAgentStep forbids
|
||||
// further tool calls and appends a synthesis instruction on that step,
|
||||
// concatenated onto the original `system` so the persona is preserved.
|
||||
prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system),
|
||||
abortSignal: signal,
|
||||
onChunk: ({ chunk }) => {
|
||||
// DIAGNOSTIC (Safari stream-drop investigation) — temporary. Any model
|
||||
// output chunk means the stream is actively emitting bytes; track first
|
||||
// + most-recent activity timestamps.
|
||||
const now = Date.now();
|
||||
firstModelChunkAt ??= now;
|
||||
lastModelChunkAt = now;
|
||||
// 'text-delta' is the assistant's prose; tool-call args are separate chunk
|
||||
// types — so this mirrors exactly what streams to the client.
|
||||
if (chunk.type === 'text-delta') inProgressText += chunk.text;
|
||||
},
|
||||
onStepFinish: (step) => {
|
||||
// The finished step's full text is now in `step.text`; fold it in and reset
|
||||
// the in-progress accumulator for the next step.
|
||||
capturedSteps.push(step as StepLike);
|
||||
inProgressText = '';
|
||||
},
|
||||
onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => {
|
||||
// DIAGNOSTIC (Safari stream-drop investigation) — temporary: success
|
||||
// baseline for Safari comparison.
|
||||
const diagNow = Date.now();
|
||||
this.logger.log(
|
||||
`AI chat stream DIAGNOSTIC (finish): elapsed=${diagNow - streamStartedAt}ms ` +
|
||||
`firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` +
|
||||
`heartbeatsSent=${heartbeatsSent} steps=${steps.length}`,
|
||||
);
|
||||
await persistAssistant({
|
||||
text,
|
||||
toolCalls: serializeSteps(steps),
|
||||
metadata: {
|
||||
finishReason,
|
||||
// Persist the turn's cumulative usage WITH reasoning tokens resolved
|
||||
// from either the new `outputTokenDetails` or the deprecated top-level
|
||||
// field, so reopened history / the Markdown export show the thinking
|
||||
// token cost too.
|
||||
usage: normalizeStreamUsage(totalUsage as StreamUsage) ?? totalUsage,
|
||||
// Final-step usage = the context actually fed to the model on the last LLM
|
||||
// call (full history + tool results) plus the answer it just generated.
|
||||
// input+output of the FINAL step ≈ the conversation's CURRENT context size,
|
||||
// distinct from totalUsage which sums every step (cumulative tokens spent).
|
||||
contextTokens:
|
||||
(usage?.inputTokens ?? 0) + (usage?.outputTokens ?? 0) || undefined,
|
||||
// Persist the FULL set of UIMessage parts for the turn (text +
|
||||
// tool-call/result), so the rebuilt history replays prior tool
|
||||
// context to the model on later turns.
|
||||
parts: assistantParts(steps, text),
|
||||
},
|
||||
});
|
||||
// Lifecycle: release the external MCP clients leased for this turn.
|
||||
await closeExternalClients();
|
||||
|
||||
// Generate the chat title for a freshly created chat AFTER the stream's
|
||||
// provider call has completed — NOT concurrently with it. The z.ai coding
|
||||
// endpoint stalls one of two concurrent requests to the same plan, which
|
||||
// black-holed the chat stream (~300s headers timeout) when title
|
||||
// generation raced it. Running it here (solo, fire-and-forget) avoids the
|
||||
// race; never block the turn on it, swallow any error.
|
||||
if (isNewChat && incomingText) {
|
||||
void this.generateTitle(chatId, workspace.id, incomingText).catch(
|
||||
(err) => {
|
||||
this.logger.warn(
|
||||
`Title generation failed: ${(err as Error)?.message ?? err}`,
|
||||
);
|
||||
},
|
||||
model,
|
||||
system,
|
||||
messages,
|
||||
tools,
|
||||
// No maxOutputTokens cap on the agent: tool-call arguments (e.g. a full
|
||||
// page body for the write tools) are emitted as OUTPUT tokens, so a fixed
|
||||
// cap would truncate complex tool calls mid-argument. Let the model use its
|
||||
// natural per-step budget. (Cost/credit limits are an account concern, not
|
||||
// something to enforce by silently breaking the agent.)
|
||||
stopWhen: stepCountIs(MAX_AGENT_STEPS),
|
||||
// Forced finalization: reserve the LAST allowed step for a text-only
|
||||
// answer. Without this, a turn that spends all its steps on tool calls
|
||||
// ends with no assistant text (an empty turn). prepareAgentStep forbids
|
||||
// further tool calls and appends a synthesis instruction on that step,
|
||||
// concatenated onto the original `system` so the persona is preserved.
|
||||
prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system),
|
||||
abortSignal: signal,
|
||||
onChunk: ({ chunk }) => {
|
||||
// DIAGNOSTIC (Safari stream-drop investigation) — temporary. Any model
|
||||
// output chunk means the stream is actively emitting bytes; track first
|
||||
// + most-recent activity timestamps.
|
||||
const now = Date.now();
|
||||
firstModelChunkAt ??= now;
|
||||
lastModelChunkAt = now;
|
||||
// 'text-delta' is the assistant's prose; tool-call args are separate chunk
|
||||
// types — so this mirrors exactly what streams to the client.
|
||||
if (chunk.type === 'text-delta') inProgressText += chunk.text;
|
||||
},
|
||||
onStepFinish: (step) => {
|
||||
// The finished step's full text is now in `step.text`; fold it in and reset
|
||||
// the in-progress accumulator for the next step.
|
||||
capturedSteps.push(step as StepLike);
|
||||
inProgressText = '';
|
||||
// Step-granular durability (#183): persist this finished step (its text +
|
||||
// tool calls + tool RESULTS) the moment it ends, so a process death after
|
||||
// this point still recovers the step. Fire-and-forget but error-tolerant
|
||||
// (updateStreaming logs + swallows) — never throw into the stream.
|
||||
void updateStreaming();
|
||||
},
|
||||
onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => {
|
||||
// DIAGNOSTIC (Safari stream-drop investigation) — temporary: success
|
||||
// baseline for Safari comparison.
|
||||
const diagNow = Date.now();
|
||||
this.logger.log(
|
||||
`AI chat stream DIAGNOSTIC (finish): elapsed=${diagNow - streamStartedAt}ms ` +
|
||||
`firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` +
|
||||
`heartbeatsSent=${heartbeatsSent} steps=${steps.length}`,
|
||||
);
|
||||
}
|
||||
},
|
||||
onError: async ({ error }) => {
|
||||
// NestJS Logger.error(message, stack?, context?): pass the real message
|
||||
// (with statusCode when present) + the stack string, not the Error
|
||||
// object, so the actual provider cause is clearly logged. Reuse the
|
||||
// shared formatter so provider error formatting stays unified.
|
||||
const e = error as { stack?: string };
|
||||
const errorText = describeProviderError(error, String(error));
|
||||
this.logger.error(`AI chat stream error: ${errorText}`, e?.stack);
|
||||
// DIAGNOSTIC (Safari stream-drop investigation) — temporary: timing of
|
||||
// an error-terminated stream.
|
||||
const diagNow = Date.now();
|
||||
this.logger.warn(
|
||||
`AI chat stream DIAGNOSTIC (error): elapsed=${diagNow - streamStartedAt}ms ` +
|
||||
`firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` +
|
||||
`silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent}`,
|
||||
);
|
||||
// Persist the PARTIAL answer streamed before the failure (text + any
|
||||
// finished tool steps) WITH the error in metadata, so the turn shows what
|
||||
// the user already saw plus the cause — not just a bare error.
|
||||
await persistAssistant(
|
||||
buildPartialAssistantRecord(
|
||||
capturedSteps,
|
||||
inProgressText,
|
||||
'error',
|
||||
errorText,
|
||||
),
|
||||
);
|
||||
await closeExternalClients();
|
||||
},
|
||||
onAbort: async ({ steps }) => {
|
||||
const partialChars =
|
||||
capturedSteps.reduce((n, s) => n + (s.text?.length ?? 0), 0) +
|
||||
inProgressText.length;
|
||||
// Unlike onError/onFinish, this terminal path otherwise writes nothing, so
|
||||
// an aborted turn (client disconnect / proxy drop / stop()) would be
|
||||
// invisible in the logs. Log it (warn) so the abort is traceable.
|
||||
this.logger.warn(
|
||||
`AI chat stream aborted (chat ${chatId}) after ${steps.length} ` +
|
||||
`step(s), ${partialChars} chars partial text; persisting partial turn.`,
|
||||
);
|
||||
// DIAGNOSTIC (Safari stream-drop investigation) — temporary: THE key
|
||||
// line — classifies the Safari drop.
|
||||
const diagNow = Date.now();
|
||||
this.logger.warn(
|
||||
`AI chat stream DIAGNOSTIC (abort/disconnect): elapsed=${diagNow - streamStartedAt}ms ` +
|
||||
`firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` +
|
||||
`silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent} ` +
|
||||
`steps=${steps.length}`,
|
||||
);
|
||||
await persistAssistant(
|
||||
buildPartialAssistantRecord(capturedSteps, inProgressText, 'aborted'),
|
||||
);
|
||||
await closeExternalClients();
|
||||
},
|
||||
// Finalize the assistant row (#183): the upfront 'streaming' row is
|
||||
// UPDATEd to 'completed' with the turn's final text, cumulative usage and
|
||||
// full UIMessage parts. We pass the SDK `steps` (which carry the final
|
||||
// step's text) as the captured steps so metadata.parts matches the
|
||||
// pre-#183 onFinish record exactly; `inProgressText` is '' here (the last
|
||||
// step already finished). Final-step usage (usage.input+output) ≈ the
|
||||
// conversation's CURRENT context size, distinct from totalUsage.
|
||||
await finalizeAssistant(
|
||||
flushAssistant(steps as StepLike[], '', 'completed', {
|
||||
finishReason: finishReason as string,
|
||||
usage: totalUsage as StreamUsage,
|
||||
contextTokens:
|
||||
(usage?.inputTokens ?? 0) + (usage?.outputTokens ?? 0) ||
|
||||
undefined,
|
||||
}),
|
||||
);
|
||||
// Lifecycle: release the external MCP clients leased for this turn.
|
||||
await closeExternalClients();
|
||||
|
||||
// Generate the chat title for a freshly created chat AFTER the stream's
|
||||
// provider call has completed — NOT concurrently with it. The z.ai coding
|
||||
// endpoint stalls one of two concurrent requests to the same plan, which
|
||||
// black-holed the chat stream (~300s headers timeout) when title
|
||||
// generation raced it. Running it here (solo, fire-and-forget) avoids the
|
||||
// race; never block the turn on it, swallow any error.
|
||||
if (isNewChat && incomingText) {
|
||||
void this.generateTitle(chatId, workspace.id, incomingText).catch(
|
||||
(err) => {
|
||||
this.logger.warn(
|
||||
`Title generation failed: ${(err as Error)?.message ?? err}`,
|
||||
);
|
||||
},
|
||||
);
|
||||
}
|
||||
},
|
||||
onError: async ({ error }) => {
|
||||
// NestJS Logger.error(message, stack?, context?): pass the real message
|
||||
// (with statusCode when present) + the stack string, not the Error
|
||||
// object, so the actual provider cause is clearly logged. Reuse the
|
||||
// shared formatter so provider error formatting stays unified.
|
||||
const e = error as { stack?: string };
|
||||
const errorText = describeProviderError(error, String(error));
|
||||
this.logger.error(`AI chat stream error: ${errorText}`, e?.stack);
|
||||
// DIAGNOSTIC (Safari stream-drop investigation) — temporary: timing of
|
||||
// an error-terminated stream.
|
||||
const diagNow = Date.now();
|
||||
this.logger.warn(
|
||||
`AI chat stream DIAGNOSTIC (error): elapsed=${diagNow - streamStartedAt}ms ` +
|
||||
`firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` +
|
||||
`silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent}`,
|
||||
);
|
||||
// Finalize the PARTIAL answer streamed before the failure (text + any
|
||||
// finished tool steps) WITH the error in metadata, so the turn shows what
|
||||
// the user already saw plus the cause — not just a bare error. Status
|
||||
// 'error' (#183).
|
||||
await finalizeAssistant(
|
||||
flushAssistant(capturedSteps, inProgressText, 'error', {
|
||||
error: errorText,
|
||||
}),
|
||||
);
|
||||
await closeExternalClients();
|
||||
},
|
||||
onAbort: async ({ steps }) => {
|
||||
const partialChars =
|
||||
capturedSteps.reduce((n, s) => n + (s.text?.length ?? 0), 0) +
|
||||
inProgressText.length;
|
||||
// Unlike onError/onFinish, this terminal path otherwise writes nothing, so
|
||||
// an aborted turn (client disconnect / proxy drop / stop()) would be
|
||||
// invisible in the logs. Log it (warn) so the abort is traceable.
|
||||
this.logger.warn(
|
||||
`AI chat stream aborted (chat ${chatId}) after ${steps.length} ` +
|
||||
`step(s), ${partialChars} chars partial text; persisting partial turn.`,
|
||||
);
|
||||
// DIAGNOSTIC (Safari stream-drop investigation) — temporary: THE key
|
||||
// line — classifies the Safari drop.
|
||||
const diagNow = Date.now();
|
||||
this.logger.warn(
|
||||
`AI chat stream DIAGNOSTIC (abort/disconnect): elapsed=${diagNow - streamStartedAt}ms ` +
|
||||
`firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` +
|
||||
`silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent} ` +
|
||||
`steps=${steps.length}`,
|
||||
);
|
||||
await finalizeAssistant(
|
||||
flushAssistant(capturedSteps, inProgressText, 'aborted'),
|
||||
);
|
||||
await closeExternalClients();
|
||||
},
|
||||
});
|
||||
|
||||
// Drain the stream independently of the client socket so the turn always
|
||||
@@ -652,7 +744,10 @@ export class AiChatService {
|
||||
'punctuation at the end.',
|
||||
prompt: firstMessage.slice(0, 2000),
|
||||
});
|
||||
const title = text.trim().replace(/^["']|["']$/g, '').slice(0, 120);
|
||||
const title = text
|
||||
.trim()
|
||||
.replace(/^["']|["']$/g, '')
|
||||
.slice(0, 120);
|
||||
if (title) {
|
||||
await this.aiChatRepo.update(chatId, { title }, workspaceId);
|
||||
}
|
||||
@@ -974,6 +1069,82 @@ export function rowToUiMessage(row: AiChatMessage): Omit<UIMessage, 'id'> & {
|
||||
return { id: row.id, role, parts: parts as UIMessage['parts'] };
|
||||
}
|
||||
|
||||
/**
|
||||
* The persisted-row patch shape produced by {@link flushAssistant}. It is the
|
||||
* SAME shape the assistant repo insert/update consume (content + toolCalls +
|
||||
* metadata) plus the lifecycle `status` column added in #183.
|
||||
*/
|
||||
export interface AssistantFlush {
|
||||
content: string;
|
||||
toolCalls: unknown;
|
||||
metadata: Record<string, unknown>;
|
||||
status: 'streaming' | 'completed' | 'error' | 'aborted';
|
||||
}
|
||||
|
||||
/**
|
||||
* PURE assistant-row builder (#183 step-granular durability). Given the turn's
|
||||
* accumulated steps + the in-progress (not-yet-finished) text + the lifecycle
|
||||
* status, it returns the row patch to persist. The SAME path runs for the
|
||||
* upfront insert (empty steps, status 'streaming'), every per-step update, and
|
||||
* the terminal finalize (completed/error/aborted) — and a future background
|
||||
* worker can call it identically, so it must stay a pure function of its inputs
|
||||
* (NO `this`, no IO).
|
||||
*
|
||||
* `metadata.parts` is built by the EXACT same logic the old
|
||||
* buildPartialAssistantRecord used (assistantParts over finished steps, then the
|
||||
* in-progress text appended as a trailing text part), so rowToUiMessage /
|
||||
* findRecent keep replaying the turn unchanged. `metadata.finishReason`,
|
||||
* `metadata.error`, `metadata.usage` and `metadata.contextTokens` are attached
|
||||
* only when provided/relevant, matching the pre-#183 onFinish/onError records.
|
||||
*/
|
||||
export function flushAssistant(
|
||||
capturedSteps: ReadonlyArray<StepLike> | undefined,
|
||||
inProgressText: string,
|
||||
status: 'streaming' | 'completed' | 'error' | 'aborted',
|
||||
extra?: {
|
||||
finishReason?: string;
|
||||
usage?: ChatStreamUsage | StreamUsage | undefined;
|
||||
contextTokens?: number;
|
||||
error?: string;
|
||||
},
|
||||
): AssistantFlush {
|
||||
const finished = capturedSteps ?? [];
|
||||
const stepsText = finished.map((s) => s.text ?? '').join('');
|
||||
const trailing = inProgressText ?? '';
|
||||
// assistantParts emits text parts only for FINISHED steps; append the
|
||||
// in-progress step's text (the partial answer cut off by an error/abort, or
|
||||
// simply not yet flushed mid-stream) as the last text part so the persisted
|
||||
// parts match what streamed to the client.
|
||||
const parts = assistantParts(finished, '') as unknown as Array<
|
||||
Record<string, unknown>
|
||||
>;
|
||||
if (trailing) parts.push({ type: 'text', text: trailing });
|
||||
|
||||
const metadata: Record<string, unknown> = {
|
||||
parts: parts as unknown as UIMessage['parts'],
|
||||
};
|
||||
// finishReason: prefer an explicit one; else derive a sensible value from the
|
||||
// terminal status (so onError/onAbort records keep their historical reason).
|
||||
if (extra?.finishReason) {
|
||||
metadata.finishReason = extra.finishReason;
|
||||
} else if (status === 'error' || status === 'aborted') {
|
||||
metadata.finishReason = status;
|
||||
}
|
||||
if (extra?.usage !== undefined) {
|
||||
metadata.usage =
|
||||
normalizeStreamUsage(extra.usage as StreamUsage) ?? extra.usage;
|
||||
}
|
||||
if (extra?.contextTokens) metadata.contextTokens = extra.contextTokens;
|
||||
if (extra?.error) metadata.error = extra.error;
|
||||
|
||||
return {
|
||||
content: stepsText + trailing,
|
||||
toolCalls: serializeSteps(finished),
|
||||
metadata,
|
||||
status,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the assistant-message record persisted on a partial/failed turn (the
|
||||
* streamText onError / onAbort paths). Captures the partial answer the user
|
||||
@@ -982,6 +1153,9 @@ export function rowToUiMessage(row: AiChatMessage): Omit<UIMessage, 'id'> & {
|
||||
* it is recorded in metadata.error so the cause shows in history; an aborted
|
||||
* turn passes none. Pure, so the partial-recording shape is unit-testable
|
||||
* without seaming streamText.
|
||||
*
|
||||
* Thin wrapper over {@link flushAssistant} (retained for the existing unit
|
||||
* tests and its historical `{ text, toolCalls, metadata }` shape).
|
||||
*/
|
||||
export function buildPartialAssistantRecord(
|
||||
steps: ReadonlyArray<StepLike> | undefined,
|
||||
@@ -989,24 +1163,13 @@ export function buildPartialAssistantRecord(
|
||||
finishReason: 'error' | 'aborted',
|
||||
errorText?: string,
|
||||
): { text: string; toolCalls: unknown; metadata: Record<string, unknown> } {
|
||||
const finished = steps ?? [];
|
||||
const stepsText = finished.map((s) => s.text ?? '').join('');
|
||||
const trailing = inProgressText ?? '';
|
||||
// assistantParts emits text parts only for FINISHED steps; append the
|
||||
// in-progress step's text (the answer cut off by the error) as the last text
|
||||
// part so the persisted parts match what streamed to the client.
|
||||
const parts = assistantParts(finished, '') as unknown as Array<
|
||||
Record<string, unknown>
|
||||
>;
|
||||
if (trailing) parts.push({ type: 'text', text: trailing });
|
||||
const flushed = flushAssistant(steps, inProgressText, finishReason, {
|
||||
error: errorText,
|
||||
});
|
||||
return {
|
||||
text: stepsText + trailing,
|
||||
toolCalls: serializeSteps(finished),
|
||||
metadata: {
|
||||
finishReason,
|
||||
parts: parts as unknown as UIMessage['parts'],
|
||||
...(errorText ? { error: errorText } : {}),
|
||||
},
|
||||
text: flushed.content,
|
||||
toolCalls: flushed.toolCalls,
|
||||
metadata: flushed.metadata,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user