From 13cac155c118b75a3eafcd1d6387c42c813c48ad Mon Sep 17 00:00:00 2001 From: claude_code Date: Wed, 24 Jun 2026 15:14:29 +0300 Subject: [PATCH] chore(ai-chat): add temporary Safari stream-drop diagnostics Investigate the Safari-only "Lost connection to the AI provider" mid-stream disconnect (Chrome unaffected). Pure instrumentation, no behavior change: the 15s heartbeat interval and all stream callbacks are unchanged. - sse-resilience.ts: startSseHeartbeat() gains an optional onBeat hook fired after each successfully written ping (beat counter). - ai-chat.service.ts: track stream start, first-chunk latency, model-silent gap and heartbeat count; log them on finish/error/abort to classify the drop (idle-gap vs hard wall-clock cap vs slow first chunk). - ai-chat.controller.ts: append elapsed-since-request to the disconnect warn. All blocks tagged "DIAGNOSTIC ... temporary" for easy removal once the Safari failure mode is identified. Co-Authored-By: Claude Opus 4.8 --- .../src/core/ai-chat/ai-chat.controller.ts | 6 ++- .../src/core/ai-chat/ai-chat.service.ts | 46 ++++++++++++++++++- .../server/src/core/ai-chat/sse-resilience.ts | 9 ++++ 3 files changed, 59 insertions(+), 2 deletions(-) diff --git a/apps/server/src/core/ai-chat/ai-chat.controller.ts b/apps/server/src/core/ai-chat/ai-chat.controller.ts index 0870969e..a8ddccb1 100644 --- a/apps/server/src/core/ai-chat/ai-chat.controller.ts +++ b/apps/server/src/core/ai-chat/ai-chat.controller.ts @@ -159,6 +159,9 @@ export class AiChatController { // we also drop it on response `finish` so it never lingers after the stream // completes normally (the AI SDK pipes the response fire-and-forget, so we // cannot simply remove it once `stream()` returns). + // DIAGNOSTIC (Safari stream-drop investigation) — temporary: wall-clock at + // which a Safari disconnect is observed, measured from request receipt. + const reqStartedAt = Date.now(); const controller = new AbortController(); const onClose = (): void => { // A genuine disconnect leaves the response unfinished (unlike a normal @@ -167,7 +170,8 @@ export class AiChatController { // so log it here before aborting the agent loop. if (!res.raw.writableEnded) { this.logger.warn( - 'AI chat stream: client disconnected before completion; aborting turn', + `AI chat stream: client disconnected before completion; aborting turn ` + + `(elapsed=${Date.now() - reqStartedAt}ms since request received)`, ); controller.abort(); } diff --git a/apps/server/src/core/ai-chat/ai-chat.service.ts b/apps/server/src/core/ai-chat/ai-chat.service.ts index 91cb64af..1cce9cf3 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.ts @@ -380,6 +380,15 @@ export class AiChatService { const capturedSteps: StepLike[] = []; let inProgressText = ''; + // DIAGNOSTIC (Safari stream-drop investigation) — temporary. Measure + // first-chunk latency, the model-silent gap right before a disconnect, and + // how many SSE heartbeats were written, so a Safari drop can be classified + // (idle-gap vs hard wall-clock cap vs slow first chunk). + const streamStartedAt = Date.now(); + let firstModelChunkAt: number | undefined; + let lastModelChunkAt = streamStartedAt; + let heartbeatsSent = 0; + // NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous // failure here (or in pipe below) would skip the terminal callbacks, so the // catch releases the leased external clients to avoid a connection leak. @@ -404,6 +413,12 @@ export class AiChatService { prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system), abortSignal: signal, onChunk: ({ chunk }) => { + // DIAGNOSTIC (Safari stream-drop investigation) — temporary. Any model + // output chunk means the stream is actively emitting bytes; track first + // + most-recent activity timestamps. + const now = Date.now(); + firstModelChunkAt ??= now; + lastModelChunkAt = now; // 'text-delta' is the assistant's prose; tool-call args are separate chunk // types — so this mirrors exactly what streams to the client. if (chunk.type === 'text-delta') inProgressText += chunk.text; @@ -415,6 +430,14 @@ export class AiChatService { inProgressText = ''; }, onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => { + // DIAGNOSTIC (Safari stream-drop investigation) — temporary: success + // baseline for Safari comparison. + const diagNow = Date.now(); + this.logger.log( + `AI chat stream DIAGNOSTIC (finish): elapsed=${diagNow - streamStartedAt}ms ` + + `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + + `heartbeatsSent=${heartbeatsSent} steps=${steps.length}`, + ); await persistAssistant({ text, toolCalls: serializeSteps(steps), @@ -464,6 +487,14 @@ export class AiChatService { const e = error as { stack?: string }; const errorText = describeProviderError(error, String(error)); this.logger.error(`AI chat stream error: ${errorText}`, e?.stack); + // DIAGNOSTIC (Safari stream-drop investigation) — temporary: timing of + // an error-terminated stream. + const diagNow = Date.now(); + this.logger.warn( + `AI chat stream DIAGNOSTIC (error): elapsed=${diagNow - streamStartedAt}ms ` + + `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + + `silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent}`, + ); // Persist the PARTIAL answer streamed before the failure (text + any // finished tool steps) WITH the error in metadata, so the turn shows what // the user already saw plus the cause — not just a bare error. @@ -488,6 +519,15 @@ export class AiChatService { `AI chat stream aborted (chat ${chatId}) after ${steps.length} ` + `step(s), ${partialChars} chars partial text; persisting partial turn.`, ); + // DIAGNOSTIC (Safari stream-drop investigation) — temporary: THE key + // line — classifies the Safari drop. + const diagNow = Date.now(); + this.logger.warn( + `AI chat stream DIAGNOSTIC (abort/disconnect): elapsed=${diagNow - streamStartedAt}ms ` + + `firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` + + `silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent} ` + + `steps=${steps.length}`, + ); await persistAssistant( buildPartialAssistantRecord(capturedSteps, inProgressText, 'aborted'), ); @@ -566,7 +606,11 @@ export class AiChatService { // headers are sent, and is guarded for response-likes that lack it. res.raw.flushHeaders?.(); // Heartbeat: keep the SSE stream progressing during silent tool/think gaps (Safari/proxy idle timeout). - startSseHeartbeat(res.raw); + // DIAGNOSTIC (Safari stream-drop investigation) — temporary: count beats so a disconnect log can show + // how many pings were written before Safari dropped. + startSseHeartbeat(res.raw, 15_000, () => { + heartbeatsSent += 1; + }); } catch (err) { // Synchronous failure before/while wiring the stream: the terminal // callbacks will not run, so release the leased external clients here and diff --git a/apps/server/src/core/ai-chat/sse-resilience.ts b/apps/server/src/core/ai-chat/sse-resilience.ts index dbf3d8e4..826aff9d 100644 --- a/apps/server/src/core/ai-chat/sse-resilience.ts +++ b/apps/server/src/core/ai-chat/sse-resilience.ts @@ -28,15 +28,24 @@ import type { ServerResponse } from 'node:http'; * the response finishes or the socket closes. The interval is unref()'d so it * never keeps the process alive, and writes are guarded so we never write to an * already-ended/destroyed socket. + * + * `onBeat` is an OPTIONAL diagnostic hook invoked once after each heartbeat that + * was actually written (only when the write did not throw). It is purely for + * telemetry/counters and never affects the heartbeat behavior. */ export function startSseHeartbeat( res: ServerResponse, intervalMs = 15_000, + onBeat?: () => void, ): () => void { const timer = setInterval(() => { if (res.writableEnded || res.destroyed) return; try { res.write(': ping\n\n'); + // DIAGNOSTIC (Safari stream-drop investigation) — temporary. Notify the + // optional hook only after a successful write, so beat counters reflect + // pings that actually reached the socket. + onBeat?.(); } catch { // Socket vanished between the guard and the write; nothing to do. }