chore(ai-chat): add temporary Safari stream-drop diagnostics
Investigate the Safari-only "Lost connection to the AI provider" mid-stream disconnect (Chrome is unaffected). This is pure instrumentation with no behavior change: the 15s heartbeat interval and all stream callbacks are unchanged.

- sse-resilience.ts: startSseHeartbeat() gains an optional onBeat hook that fires after each successfully written ping (used as a beat counter).
- ai-chat.service.ts: track stream start, first-chunk latency, the model-silent gap and the heartbeat count; log them on finish/error/abort to classify the drop (idle gap vs. hard wall-clock cap vs. slow first chunk).
- ai-chat.controller.ts: append the elapsed time since the request was received to the disconnect warning.

All blocks are tagged "DIAGNOSTIC ... temporary" for easy removal once the Safari failure mode is identified.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -159,6 +159,9 @@ export class AiChatController {
|
|||||||
// we also drop it on response `finish` so it never lingers after the stream
|
// we also drop it on response `finish` so it never lingers after the stream
|
||||||
// completes normally (the AI SDK pipes the response fire-and-forget, so we
|
// completes normally (the AI SDK pipes the response fire-and-forget, so we
|
||||||
// cannot simply remove it once `stream()` returns).
|
// cannot simply remove it once `stream()` returns).
|
||||||
|
// DIAGNOSTIC (Safari stream-drop investigation) — temporary: wall-clock at
|
||||||
|
// which a Safari disconnect is observed, measured from request receipt.
|
||||||
|
const reqStartedAt = Date.now();
|
||||||
const controller = new AbortController();
|
const controller = new AbortController();
|
||||||
const onClose = (): void => {
|
const onClose = (): void => {
|
||||||
// A genuine disconnect leaves the response unfinished (unlike a normal
|
// A genuine disconnect leaves the response unfinished (unlike a normal
|
||||||
@@ -167,7 +170,8 @@ export class AiChatController {
|
|||||||
// so log it here before aborting the agent loop.
|
// so log it here before aborting the agent loop.
|
||||||
if (!res.raw.writableEnded) {
|
if (!res.raw.writableEnded) {
|
||||||
this.logger.warn(
|
this.logger.warn(
|
||||||
'AI chat stream: client disconnected before completion; aborting turn',
|
`AI chat stream: client disconnected before completion; aborting turn ` +
|
||||||
|
`(elapsed=${Date.now() - reqStartedAt}ms since request received)`,
|
||||||
);
|
);
|
||||||
controller.abort();
|
controller.abort();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -380,6 +380,15 @@ export class AiChatService {
|
|||||||
const capturedSteps: StepLike[] = [];
|
const capturedSteps: StepLike[] = [];
|
||||||
let inProgressText = '';
|
let inProgressText = '';
|
||||||
|
|
||||||
|
// DIAGNOSTIC (Safari stream-drop investigation) — temporary. Measure
|
||||||
|
// first-chunk latency, the model-silent gap right before a disconnect, and
|
||||||
|
// how many SSE heartbeats were written, so a Safari drop can be classified
|
||||||
|
// (idle-gap vs hard wall-clock cap vs slow first chunk).
|
||||||
|
const streamStartedAt = Date.now();
|
||||||
|
let firstModelChunkAt: number | undefined;
|
||||||
|
let lastModelChunkAt = streamStartedAt;
|
||||||
|
let heartbeatsSent = 0;
|
||||||
|
|
||||||
// NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous
|
// NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous
|
||||||
// failure here (or in pipe below) would skip the terminal callbacks, so the
|
// failure here (or in pipe below) would skip the terminal callbacks, so the
|
||||||
// catch releases the leased external clients to avoid a connection leak.
|
// catch releases the leased external clients to avoid a connection leak.
|
||||||
@@ -404,6 +413,12 @@ export class AiChatService {
|
|||||||
prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system),
|
prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system),
|
||||||
abortSignal: signal,
|
abortSignal: signal,
|
||||||
onChunk: ({ chunk }) => {
|
onChunk: ({ chunk }) => {
|
||||||
|
// DIAGNOSTIC (Safari stream-drop investigation) — temporary. Any model
|
||||||
|
// output chunk means the stream is actively emitting bytes; track first
|
||||||
|
// + most-recent activity timestamps.
|
||||||
|
const now = Date.now();
|
||||||
|
firstModelChunkAt ??= now;
|
||||||
|
lastModelChunkAt = now;
|
||||||
// 'text-delta' is the assistant's prose; tool-call args are separate chunk
|
// 'text-delta' is the assistant's prose; tool-call args are separate chunk
|
||||||
// types — so this mirrors exactly what streams to the client.
|
// types — so this mirrors exactly what streams to the client.
|
||||||
if (chunk.type === 'text-delta') inProgressText += chunk.text;
|
if (chunk.type === 'text-delta') inProgressText += chunk.text;
|
||||||
@@ -415,6 +430,14 @@ export class AiChatService {
|
|||||||
inProgressText = '';
|
inProgressText = '';
|
||||||
},
|
},
|
||||||
onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => {
|
onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => {
|
||||||
|
// DIAGNOSTIC (Safari stream-drop investigation) — temporary: success
|
||||||
|
// baseline for Safari comparison.
|
||||||
|
const diagNow = Date.now();
|
||||||
|
this.logger.log(
|
||||||
|
`AI chat stream DIAGNOSTIC (finish): elapsed=${diagNow - streamStartedAt}ms ` +
|
||||||
|
`firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` +
|
||||||
|
`heartbeatsSent=${heartbeatsSent} steps=${steps.length}`,
|
||||||
|
);
|
||||||
await persistAssistant({
|
await persistAssistant({
|
||||||
text,
|
text,
|
||||||
toolCalls: serializeSteps(steps),
|
toolCalls: serializeSteps(steps),
|
||||||
@@ -464,6 +487,14 @@ export class AiChatService {
|
|||||||
const e = error as { stack?: string };
|
const e = error as { stack?: string };
|
||||||
const errorText = describeProviderError(error, String(error));
|
const errorText = describeProviderError(error, String(error));
|
||||||
this.logger.error(`AI chat stream error: ${errorText}`, e?.stack);
|
this.logger.error(`AI chat stream error: ${errorText}`, e?.stack);
|
||||||
|
// DIAGNOSTIC (Safari stream-drop investigation) — temporary: timing of
|
||||||
|
// an error-terminated stream.
|
||||||
|
const diagNow = Date.now();
|
||||||
|
this.logger.warn(
|
||||||
|
`AI chat stream DIAGNOSTIC (error): elapsed=${diagNow - streamStartedAt}ms ` +
|
||||||
|
`firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` +
|
||||||
|
`silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent}`,
|
||||||
|
);
|
||||||
// Persist the PARTIAL answer streamed before the failure (text + any
|
// Persist the PARTIAL answer streamed before the failure (text + any
|
||||||
// finished tool steps) WITH the error in metadata, so the turn shows what
|
// finished tool steps) WITH the error in metadata, so the turn shows what
|
||||||
// the user already saw plus the cause — not just a bare error.
|
// the user already saw plus the cause — not just a bare error.
|
||||||
@@ -488,6 +519,15 @@ export class AiChatService {
|
|||||||
`AI chat stream aborted (chat ${chatId}) after ${steps.length} ` +
|
`AI chat stream aborted (chat ${chatId}) after ${steps.length} ` +
|
||||||
`step(s), ${partialChars} chars partial text; persisting partial turn.`,
|
`step(s), ${partialChars} chars partial text; persisting partial turn.`,
|
||||||
);
|
);
|
||||||
|
// DIAGNOSTIC (Safari stream-drop investigation) — temporary: THE key
|
||||||
|
// line — classifies the Safari drop.
|
||||||
|
const diagNow = Date.now();
|
||||||
|
this.logger.warn(
|
||||||
|
`AI chat stream DIAGNOSTIC (abort/disconnect): elapsed=${diagNow - streamStartedAt}ms ` +
|
||||||
|
`firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` +
|
||||||
|
`silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent} ` +
|
||||||
|
`steps=${steps.length}`,
|
||||||
|
);
|
||||||
await persistAssistant(
|
await persistAssistant(
|
||||||
buildPartialAssistantRecord(capturedSteps, inProgressText, 'aborted'),
|
buildPartialAssistantRecord(capturedSteps, inProgressText, 'aborted'),
|
||||||
);
|
);
|
||||||
@@ -566,7 +606,11 @@ export class AiChatService {
|
|||||||
// headers are sent, and is guarded for response-likes that lack it.
|
// headers are sent, and is guarded for response-likes that lack it.
|
||||||
res.raw.flushHeaders?.();
|
res.raw.flushHeaders?.();
|
||||||
// Heartbeat: keep the SSE stream progressing during silent tool/think gaps (Safari/proxy idle timeout).
|
// Heartbeat: keep the SSE stream progressing during silent tool/think gaps (Safari/proxy idle timeout).
|
||||||
startSseHeartbeat(res.raw);
|
// DIAGNOSTIC (Safari stream-drop investigation) — temporary: count beats so a disconnect log can show
|
||||||
|
// how many pings were written before Safari dropped.
|
||||||
|
startSseHeartbeat(res.raw, 15_000, () => {
|
||||||
|
heartbeatsSent += 1;
|
||||||
|
});
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
// Synchronous failure before/while wiring the stream: the terminal
|
// Synchronous failure before/while wiring the stream: the terminal
|
||||||
// callbacks will not run, so release the leased external clients here and
|
// callbacks will not run, so release the leased external clients here and
|
||||||
|
|||||||
@@ -28,15 +28,24 @@ import type { ServerResponse } from 'node:http';
|
|||||||
* the response finishes or the socket closes. The interval is unref()'d so it
|
* the response finishes or the socket closes. The interval is unref()'d so it
|
||||||
* never keeps the process alive, and writes are guarded so we never write to an
|
* never keeps the process alive, and writes are guarded so we never write to an
|
||||||
* already-ended/destroyed socket.
|
* already-ended/destroyed socket.
|
||||||
|
*
|
||||||
|
* `onBeat` is an OPTIONAL diagnostic hook invoked once after each heartbeat that
|
||||||
|
* was actually written (only when the write did not throw). It is purely for
|
||||||
|
* telemetry/counters and never affects the heartbeat behavior.
|
||||||
*/
|
*/
|
||||||
export function startSseHeartbeat(
|
export function startSseHeartbeat(
|
||||||
res: ServerResponse,
|
res: ServerResponse,
|
||||||
intervalMs = 15_000,
|
intervalMs = 15_000,
|
||||||
|
onBeat?: () => void,
|
||||||
): () => void {
|
): () => void {
|
||||||
const timer = setInterval(() => {
|
const timer = setInterval(() => {
|
||||||
if (res.writableEnded || res.destroyed) return;
|
if (res.writableEnded || res.destroyed) return;
|
||||||
try {
|
try {
|
||||||
res.write(': ping\n\n');
|
res.write(': ping\n\n');
|
||||||
|
// DIAGNOSTIC (Safari stream-drop investigation) — temporary. Notify the
|
||||||
|
// optional hook only after a successful write, so beat counters reflect
|
||||||
|
// pings that actually reached the socket.
|
||||||
|
onBeat?.();
|
||||||
} catch {
|
} catch {
|
||||||
// Socket vanished between the guard and the write; nothing to do.
|
// Socket vanished between the guard and the write; nothing to do.
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user