From 1b4de2b42095cd6ec465c4f818ff88fa61543a0e Mon Sep 17 00:00:00 2001 From: claude_code Date: Tue, 23 Jun 2026 01:02:55 +0300 Subject: [PATCH] fix(ai-chat): keep SSE stream alive in Safari (heartbeat + strip hop-by-hop headers) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Safari/WebKit dropped the AI chat answer stream mid-turn ("Load failed", shown as "Lost connection to the server") while Chrome/Firefox were fine. Two Safari-specific causes: (1) during model think/tool gaps the UI-message SSE stream emits no bytes and WebKit aborts a non-progressing fetch far more aggressively than Chrome; (2) the AI SDK sets a hop-by-hop `Connection: keep-alive` header which is illegal on HTTP/2 — Chrome/Firefox ignore it, Safari rejects the whole response. Earlier commits only improved the error text, never the drop itself. Add apps/server/src/core/ai-chat/sse-resilience.ts with two helpers wired into both stream paths (authenticated + public share): - startSseHeartbeat: writes a `: ping` SSE comment every 15s (ignored by the client's EventSourceParserStream) so bytes keep flowing; unref'd timer, guarded writes, auto-clear on finish/close. - stripStreamingHopByHopHeaders: wraps writeHead once to drop Connection/ Keep-Alive before the head is sent, so they can never leak into an HTTP/2 response. Add sse-resilience.spec.ts (7 tests). tsc + eslint clean. --- .../src/core/ai-chat/ai-chat.service.ts | 8 + .../core/ai-chat/public-share-chat.service.ts | 8 + .../src/core/ai-chat/sse-resilience.spec.ts | 137 ++++++++++++++++++ .../server/src/core/ai-chat/sse-resilience.ts | 89 ++++++++++++ 4 files changed, 242 insertions(+) create mode 100644 apps/server/src/core/ai-chat/sse-resilience.spec.ts create mode 100644 apps/server/src/core/ai-chat/sse-resilience.ts diff --git a/apps/server/src/core/ai-chat/ai-chat.service.ts b/apps/server/src/core/ai-chat/ai-chat.service.ts index d44a9abd..457682b8 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.ts @@ -26,6 +26,10 @@ import { AiChatToolsService } from './tools/ai-chat-tools.service'; import { McpClientsService } from './external-mcp/mcp-clients.service'; import { buildSystemPrompt } from './ai-chat.prompt'; import { roleModelOverride } from './roles/role-model-config'; +import { + startSseHeartbeat, + stripStreamingHopByHopHeaders, +} from './sse-resilience'; // Max agent steps per turn. One step = one model generation; a step that calls // tools is followed by another step carrying the tool results. Raised from 8 so @@ -502,6 +506,8 @@ export class AiChatService { // `x-accel-buffering: no` header we send (and additionally set // `proxy_buffering off; proxy_cache off;` for /api/ai-chat/stream); traefik // does not buffer responses by default. + // Scrub the SDK's hop-by-hop Connection header before it writes the head (Safari/HTTP2). + stripStreamingHopByHopHeaders(res.raw); result.pipeUIMessageStreamToResponse(res.raw, { headers: { 'X-Accel-Buffering': 'no' }, onError: (error: unknown) => { @@ -517,6 +523,8 @@ export class AiChatService { // writeHead synchronously above; flushHeaders is a belt-and-braces no-op once // headers are sent, and is guarded for response-likes that lack it. res.raw.flushHeaders?.(); + // Heartbeat: keep the SSE stream progressing during silent tool/think gaps (Safari/proxy idle timeout). + startSseHeartbeat(res.raw); } catch (err) { // Synchronous failure before/while wiring the stream: the terminal // callbacks will not run, so release the leased external clients here and diff --git a/apps/server/src/core/ai-chat/public-share-chat.service.ts b/apps/server/src/core/ai-chat/public-share-chat.service.ts index a5251739..f2d8f0f8 100644 --- a/apps/server/src/core/ai-chat/public-share-chat.service.ts +++ b/apps/server/src/core/ai-chat/public-share-chat.service.ts @@ -20,6 +20,10 @@ import { createPublicShareWorkspaceLimiter, } from './public-share-workspace-limiter'; import { describeProviderError } from '../../integrations/ai/ai-error.util'; +import { + startSseHeartbeat, + stripStreamingHopByHopHeaders, +} from './sse-resilience'; /** * Loose shape of the anonymous public-share chat POST body. We do NOT bind a @@ -243,6 +247,8 @@ export class PublicShareChatService { // Stream the UI-message protocol straight to the hijacked Node response. // Surface the real provider message (AI SDK error bodies never carry the // API key, so this is safe; we never dump the resolved config). + // Scrub the SDK's hop-by-hop Connection header before it writes the head (Safari/HTTP2). + stripStreamingHopByHopHeaders(res.raw); result.pipeUIMessageStreamToResponse(res.raw, { headers: { 'X-Accel-Buffering': 'no' }, onError: (error: unknown) => { @@ -257,6 +263,8 @@ export class PublicShareChatService { // Force the status line + headers onto the socket now (before the first // token), so the proxy sees the response start immediately. res.raw.flushHeaders?.(); + // Heartbeat: keep the SSE stream progressing during silent tool/think gaps (Safari/proxy idle timeout). + startSseHeartbeat(res.raw); } catch (err) { // Synchronous failure before/while wiring the stream: re-throw for the // controller to surface on the socket. diff --git a/apps/server/src/core/ai-chat/sse-resilience.spec.ts b/apps/server/src/core/ai-chat/sse-resilience.spec.ts new file mode 100644 index 00000000..fc5590bd --- /dev/null +++ b/apps/server/src/core/ai-chat/sse-resilience.spec.ts @@ -0,0 +1,137 @@ +import type { ServerResponse } from 'node:http'; +import { + startSseHeartbeat, + stripStreamingHopByHopHeaders, +} from './sse-resilience'; + +/** + * Unit tests for the SSE streaming resilience helpers. + * + * startSseHeartbeat keeps a hijacked SSE response progressing during silent + * tool/think gaps by writing an SSE comment line on a timer (Safari/proxy idle + * timeout). stripStreamingHopByHopHeaders scrubs the hop-by-hop + * Connection/Keep-Alive headers the AI SDK adds before the response head is + * written (Safari rejects them over HTTP/2). + */ +describe('startSseHeartbeat', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.clearAllTimers(); + jest.useRealTimers(); + }); + + const makeRes = ( + overrides: Partial<{ writableEnded: boolean; destroyed: boolean }> = {}, + ) => { + const handlers: Record void> = {}; + const res = { + writableEnded: false, + destroyed: false, + write: jest.fn(), + once: jest.fn((event: string, handler: () => void) => { + handlers[event] = handler; + return res; + }), + ...overrides, + }; + return { res, handlers }; + }; + + it('writes an SSE comment ping each interval', () => { + const { res } = makeRes(); + startSseHeartbeat(res as unknown as ServerResponse, 15_000); + + jest.advanceTimersByTime(15_000); + expect(res.write).toHaveBeenCalledTimes(1); + expect(res.write).toHaveBeenLastCalledWith(': ping\n\n'); + + jest.advanceTimersByTime(15_000); + expect(res.write).toHaveBeenCalledTimes(2); + }); + + it('stops pinging after the returned stop() is called', () => { + const { res } = makeRes(); + const stop = startSseHeartbeat(res as unknown as ServerResponse, 15_000); + + jest.advanceTimersByTime(15_000); + expect(res.write).toHaveBeenCalledTimes(1); + + stop(); + jest.advanceTimersByTime(60_000); + expect(res.write).toHaveBeenCalledTimes(1); + }); + + it('stops pinging when the registered finish/close handler fires', () => { + const { res, handlers } = makeRes(); + startSseHeartbeat(res as unknown as ServerResponse, 15_000); + + jest.advanceTimersByTime(15_000); + expect(res.write).toHaveBeenCalledTimes(1); + + // Both 'close' and 'finish' are registered with the same stop handler. + expect(handlers.close).toBeDefined(); + expect(handlers.finish).toBeDefined(); + handlers.finish(); + + jest.advanceTimersByTime(60_000); + expect(res.write).toHaveBeenCalledTimes(1); + }); + + it('does not write when the response is already ended', () => { + const { res } = makeRes({ writableEnded: true }); + startSseHeartbeat(res as unknown as ServerResponse, 15_000); + + jest.advanceTimersByTime(45_000); + expect(res.write).not.toHaveBeenCalled(); + }); + + it('does not write when the socket is destroyed', () => { + const { res } = makeRes({ destroyed: true }); + startSseHeartbeat(res as unknown as ServerResponse, 15_000); + + jest.advanceTimersByTime(45_000); + expect(res.write).not.toHaveBeenCalled(); + }); +}); + +describe('stripStreamingHopByHopHeaders', () => { + it('removes connection/keep-alive headers but keeps the rest', () => { + const writeHead = jest.fn(); + const res = { writeHead } as unknown as ServerResponse; + + stripStreamingHopByHopHeaders(res); + + res.writeHead(200, { + 'content-type': 'text/event-stream', + connection: 'keep-alive', + 'Keep-Alive': 'timeout=5', + 'x-accel-buffering': 'no', + }); + + expect(writeHead).toHaveBeenCalledTimes(1); + const [statusCode, headers] = writeHead.mock.calls[0] as [ + number, + Record, + ]; + expect(statusCode).toBe(200); + expect(headers).not.toHaveProperty('connection'); + expect(headers).not.toHaveProperty('Keep-Alive'); + expect(headers).toEqual({ + 'content-type': 'text/event-stream', + 'x-accel-buffering': 'no', + }); + }); + + it('leaves a header-less writeHead(statusCode) call untouched', () => { + const writeHead = jest.fn(); + const res = { writeHead } as unknown as ServerResponse; + + stripStreamingHopByHopHeaders(res); + res.writeHead(204); + + expect(writeHead).toHaveBeenCalledWith(204); + }); +}); diff --git a/apps/server/src/core/ai-chat/sse-resilience.ts b/apps/server/src/core/ai-chat/sse-resilience.ts new file mode 100644 index 00000000..dbf3d8e4 --- /dev/null +++ b/apps/server/src/core/ai-chat/sse-resilience.ts @@ -0,0 +1,89 @@ +import type { ServerResponse } from 'node:http'; + +/** + * SSE streaming resilience helpers for the hijacked AI-chat responses. + * + * Both AI-chat stream paths (authenticated + public share) hand the AI SDK's + * UI-message stream straight to the raw Node socket via + * pipeUIMessageStreamToResponse. Two Safari/WebKit-specific failure modes break + * that stream where Chrome/Firefox are unaffected; these helpers close both. + */ + +/** + * Keep a hijacked SSE response "making progress" by periodically writing an SSE + * comment line (": ping\n\n") to the raw socket. + * + * Why: while the model is thinking or running tools the UI-message stream emits + * no bytes. WebKit/Safari aborts a streaming fetch that stops making progress + * far more aggressively than Chrome (surfaces in the browser as "Load failed"), + * and reverse proxies time out idle streams as well. A periodic heartbeat keeps + * bytes flowing so neither drops the connection. + * + * A line whose first character is ":" is an SSE comment: the client's + * EventSourceParserStream ignores it, so it never becomes a UI chunk. Each ping + * is a COMPLETE SSE record, so interleaving it with the SDK's own writes cannot + * corrupt an event frame. + * + * Returns a stop() that clears the timer; it is also cleared automatically when + * the response finishes or the socket closes. The interval is unref()'d so it + * never keeps the process alive, and writes are guarded so we never write to an + * already-ended/destroyed socket. + */ +export function startSseHeartbeat( + res: ServerResponse, + intervalMs = 15_000, +): () => void { + const timer = setInterval(() => { + if (res.writableEnded || res.destroyed) return; + try { + res.write(': ping\n\n'); + } catch { + // Socket vanished between the guard and the write; nothing to do. + } + }, intervalMs); + timer.unref?.(); + + const stop = (): void => clearInterval(timer); + res.once('close', stop); + res.once('finish', stop); + return stop; +} + +/** + * Strip the hop-by-hop `Connection` / `Keep-Alive` headers the AI SDK adds to + * its UI-message-stream response (its UI_MESSAGE_STREAM_HEADERS default sets + * `connection: keep-alive`). + * + * Those headers are valid only on an HTTP/1.1 connection. If a reverse proxy + * forwards them verbatim into an HTTP/2 response, Safari/WebKit REJECTS the + * whole response while Chrome and Firefox silently ignore it — the exact + * "works in Chrome, breaks in Safari" symptom. They are hop-by-hop headers the + * application has no business emitting, so we scrub them at the moment the SDK + * writes the response head (after which they can no longer be removed). + * + * Implemented by wrapping writeHead once for this single hijacked response: the + * SDK calls res.writeHead(statusCode, headersObject); we delete any + * connection/keep-alive keys from that object before delegating to the original. + */ +export function stripStreamingHopByHopHeaders(res: ServerResponse): void { + const originalWriteHead = res.writeHead.bind(res) as ( + ...args: unknown[] + ) => ServerResponse; + + ( + res as unknown as { writeHead: (...args: unknown[]) => ServerResponse } + ).writeHead = (...args: unknown[]): ServerResponse => { + for (const arg of args) { + if (arg && typeof arg === 'object' && !Array.isArray(arg)) { + const headers = arg as Record; + for (const key of Object.keys(headers)) { + const lower = key.toLowerCase(); + if (lower === 'connection' || lower === 'keep-alive') { + delete headers[key]; + } + } + } + } + return originalWriteHead(...args); + }; +}