fix(ai-chat): keep SSE stream alive in Safari (heartbeat + strip hop-by-hop headers)

Safari/WebKit dropped the AI chat answer stream mid-turn ("Load failed",
shown as "Lost connection to the server") while Chrome/Firefox were fine.
Two Safari-specific causes: (1) during model think/tool gaps the UI-message
SSE stream emits no bytes and WebKit aborts a non-progressing fetch far more
aggressively than Chrome; (2) the AI SDK sets a hop-by-hop `Connection:
keep-alive` header which is illegal on HTTP/2 — Chrome/Firefox ignore it,
Safari rejects the whole response. Earlier commits only improved the error
text, never the drop itself.

Add apps/server/src/core/ai-chat/sse-resilience.ts with two helpers wired into
both stream paths (authenticated + public share):
- startSseHeartbeat: writes a `: ping` SSE comment every 15s (ignored by the
  client's EventSourceParserStream) so bytes keep flowing; unref'd timer,
  guarded writes, auto-clear on finish/close.
- stripStreamingHopByHopHeaders: wraps writeHead once to drop Connection/
  Keep-Alive before the head is sent, so they can never leak into an HTTP/2
  response.
Add sse-resilience.spec.ts (7 tests). tsc + eslint clean.
This commit is contained in:
claude_code
2026-06-23 01:02:55 +03:00
parent 544355a3c8
commit 1b4de2b420
4 changed files with 242 additions and 0 deletions

View File

@@ -26,6 +26,10 @@ import { AiChatToolsService } from './tools/ai-chat-tools.service';
import { McpClientsService } from './external-mcp/mcp-clients.service';
import { buildSystemPrompt } from './ai-chat.prompt';
import { roleModelOverride } from './roles/role-model-config';
import {
startSseHeartbeat,
stripStreamingHopByHopHeaders,
} from './sse-resilience';
// Max agent steps per turn. One step = one model generation; a step that calls
// tools is followed by another step carrying the tool results. Raised from 8 so
@@ -502,6 +506,8 @@ export class AiChatService {
// `x-accel-buffering: no` header we send (and additionally set
// `proxy_buffering off; proxy_cache off;` for /api/ai-chat/stream); traefik
// does not buffer responses by default.
// Scrub the SDK's hop-by-hop Connection header before it writes the head (Safari/HTTP2).
stripStreamingHopByHopHeaders(res.raw);
result.pipeUIMessageStreamToResponse(res.raw, {
headers: { 'X-Accel-Buffering': 'no' },
onError: (error: unknown) => {
@@ -517,6 +523,8 @@ export class AiChatService {
// writeHead synchronously above; flushHeaders is a belt-and-braces no-op once
// headers are sent, and is guarded for response-likes that lack it.
res.raw.flushHeaders?.();
// Heartbeat: keep the SSE stream progressing during silent tool/think gaps (Safari/proxy idle timeout).
startSseHeartbeat(res.raw);
} catch (err) {
// Synchronous failure before/while wiring the stream: the terminal
// callbacks will not run, so release the leased external clients here and

View File

@@ -20,6 +20,10 @@ import {
createPublicShareWorkspaceLimiter,
} from './public-share-workspace-limiter';
import { describeProviderError } from '../../integrations/ai/ai-error.util';
import {
startSseHeartbeat,
stripStreamingHopByHopHeaders,
} from './sse-resilience';
/**
* Loose shape of the anonymous public-share chat POST body. We do NOT bind a
@@ -243,6 +247,8 @@ export class PublicShareChatService {
// Stream the UI-message protocol straight to the hijacked Node response.
// Surface the real provider message (AI SDK error bodies never carry the
// API key, so this is safe; we never dump the resolved config).
// Scrub the SDK's hop-by-hop Connection header before it writes the head (Safari/HTTP2).
stripStreamingHopByHopHeaders(res.raw);
result.pipeUIMessageStreamToResponse(res.raw, {
headers: { 'X-Accel-Buffering': 'no' },
onError: (error: unknown) => {
@@ -257,6 +263,8 @@ export class PublicShareChatService {
// Force the status line + headers onto the socket now (before the first
// token), so the proxy sees the response start immediately.
res.raw.flushHeaders?.();
// Heartbeat: keep the SSE stream progressing during silent tool/think gaps (Safari/proxy idle timeout).
startSseHeartbeat(res.raw);
} catch (err) {
// Synchronous failure before/while wiring the stream: re-throw for the
// controller to surface on the socket.

View File

@@ -0,0 +1,137 @@
import type { ServerResponse } from 'node:http';
import {
startSseHeartbeat,
stripStreamingHopByHopHeaders,
} from './sse-resilience';
/**
* Unit tests for the SSE streaming resilience helpers.
*
* startSseHeartbeat keeps a hijacked SSE response progressing during silent
* tool/think gaps by writing an SSE comment line on a timer (Safari/proxy idle
* timeout). stripStreamingHopByHopHeaders scrubs the hop-by-hop
* Connection/Keep-Alive headers the AI SDK adds before the response head is
* written (Safari rejects them over HTTP/2).
*/
describe('startSseHeartbeat', () => {
beforeEach(() => {
jest.useFakeTimers();
});
afterEach(() => {
jest.clearAllTimers();
jest.useRealTimers();
});
const makeRes = (
overrides: Partial<{ writableEnded: boolean; destroyed: boolean }> = {},
) => {
const handlers: Record<string, () => void> = {};
const res = {
writableEnded: false,
destroyed: false,
write: jest.fn(),
once: jest.fn((event: string, handler: () => void) => {
handlers[event] = handler;
return res;
}),
...overrides,
};
return { res, handlers };
};
it('writes an SSE comment ping each interval', () => {
const { res } = makeRes();
startSseHeartbeat(res as unknown as ServerResponse, 15_000);
jest.advanceTimersByTime(15_000);
expect(res.write).toHaveBeenCalledTimes(1);
expect(res.write).toHaveBeenLastCalledWith(': ping\n\n');
jest.advanceTimersByTime(15_000);
expect(res.write).toHaveBeenCalledTimes(2);
});
it('stops pinging after the returned stop() is called', () => {
const { res } = makeRes();
const stop = startSseHeartbeat(res as unknown as ServerResponse, 15_000);
jest.advanceTimersByTime(15_000);
expect(res.write).toHaveBeenCalledTimes(1);
stop();
jest.advanceTimersByTime(60_000);
expect(res.write).toHaveBeenCalledTimes(1);
});
it('stops pinging when the registered finish/close handler fires', () => {
const { res, handlers } = makeRes();
startSseHeartbeat(res as unknown as ServerResponse, 15_000);
jest.advanceTimersByTime(15_000);
expect(res.write).toHaveBeenCalledTimes(1);
// Both 'close' and 'finish' are registered with the same stop handler.
expect(handlers.close).toBeDefined();
expect(handlers.finish).toBeDefined();
handlers.finish();
jest.advanceTimersByTime(60_000);
expect(res.write).toHaveBeenCalledTimes(1);
});
it('does not write when the response is already ended', () => {
const { res } = makeRes({ writableEnded: true });
startSseHeartbeat(res as unknown as ServerResponse, 15_000);
jest.advanceTimersByTime(45_000);
expect(res.write).not.toHaveBeenCalled();
});
it('does not write when the socket is destroyed', () => {
const { res } = makeRes({ destroyed: true });
startSseHeartbeat(res as unknown as ServerResponse, 15_000);
jest.advanceTimersByTime(45_000);
expect(res.write).not.toHaveBeenCalled();
});
});
describe('stripStreamingHopByHopHeaders', () => {
it('removes connection/keep-alive headers but keeps the rest', () => {
const writeHead = jest.fn();
const res = { writeHead } as unknown as ServerResponse;
stripStreamingHopByHopHeaders(res);
res.writeHead(200, {
'content-type': 'text/event-stream',
connection: 'keep-alive',
'Keep-Alive': 'timeout=5',
'x-accel-buffering': 'no',
});
expect(writeHead).toHaveBeenCalledTimes(1);
const [statusCode, headers] = writeHead.mock.calls[0] as [
number,
Record<string, unknown>,
];
expect(statusCode).toBe(200);
expect(headers).not.toHaveProperty('connection');
expect(headers).not.toHaveProperty('Keep-Alive');
expect(headers).toEqual({
'content-type': 'text/event-stream',
'x-accel-buffering': 'no',
});
});
it('leaves a header-less writeHead(statusCode) call untouched', () => {
const writeHead = jest.fn();
const res = { writeHead } as unknown as ServerResponse;
stripStreamingHopByHopHeaders(res);
res.writeHead(204);
expect(writeHead).toHaveBeenCalledWith(204);
});
});

View File

@@ -0,0 +1,89 @@
import type { ServerResponse } from 'node:http';
/**
* SSE streaming resilience helpers for the hijacked AI-chat responses.
*
* Both AI-chat stream paths (authenticated + public share) hand the AI SDK's
* UI-message stream straight to the raw Node socket via
* pipeUIMessageStreamToResponse. Two Safari/WebKit-specific failure modes break
* that stream where Chrome/Firefox are unaffected; these helpers close both.
*/
/**
* Keep a hijacked SSE response "making progress" by periodically writing an SSE
* comment line (": ping\n\n") to the raw socket.
*
* Why: while the model is thinking or running tools the UI-message stream emits
* no bytes. WebKit/Safari aborts a streaming fetch that stops making progress
* far more aggressively than Chrome (surfaces in the browser as "Load failed"),
* and reverse proxies time out idle streams as well. A periodic heartbeat keeps
* bytes flowing so neither drops the connection.
*
* A line whose first character is ":" is an SSE comment: the client's
* EventSourceParserStream ignores it, so it never becomes a UI chunk. Each ping
* is a COMPLETE SSE record, so interleaving it with the SDK's own writes cannot
* corrupt an event frame.
*
* Returns a stop() that clears the timer; it is also cleared automatically when
* the response finishes or the socket closes. The interval is unref()'d so it
* never keeps the process alive, and writes are guarded so we never write to an
* already-ended/destroyed socket.
*/
export function startSseHeartbeat(
res: ServerResponse,
intervalMs = 15_000,
): () => void {
const timer = setInterval(() => {
if (res.writableEnded || res.destroyed) return;
try {
res.write(': ping\n\n');
} catch {
// Socket vanished between the guard and the write; nothing to do.
}
}, intervalMs);
timer.unref?.();
const stop = (): void => clearInterval(timer);
res.once('close', stop);
res.once('finish', stop);
return stop;
}
/**
* Strip the hop-by-hop `Connection` / `Keep-Alive` headers the AI SDK adds to
* its UI-message-stream response (its UI_MESSAGE_STREAM_HEADERS default sets
* `connection: keep-alive`).
*
* Those headers are valid only on an HTTP/1.1 connection. If a reverse proxy
* forwards them verbatim into an HTTP/2 response, Safari/WebKit REJECTS the
* whole response while Chrome and Firefox silently ignore it — the exact
* "works in Chrome, breaks in Safari" symptom. They are hop-by-hop headers the
* application has no business emitting, so we scrub them at the moment the SDK
* writes the response head (after which they can no longer be removed).
*
* Implemented by wrapping writeHead once for this single hijacked response: the
* SDK calls res.writeHead(statusCode, headersObject); we delete any
* connection/keep-alive keys from that object before delegating to the original.
*/
export function stripStreamingHopByHopHeaders(res: ServerResponse): void {
const originalWriteHead = res.writeHead.bind(res) as (
...args: unknown[]
) => ServerResponse;
(
res as unknown as { writeHead: (...args: unknown[]) => ServerResponse }
).writeHead = (...args: unknown[]): ServerResponse => {
for (const arg of args) {
if (arg && typeof arg === 'object' && !Array.isArray(arg)) {
const headers = arg as Record<string, unknown>;
for (const key of Object.keys(headers)) {
const lower = key.toLowerCase();
if (lower === 'connection' || lower === 'keep-alive') {
delete headers[key];
}
}
}
}
return originalWriteHead(...args);
};
}