fix(ai-chat): recycle keep-alive sockets + retry pre-response resets (#175)

The real cause of the long-task "Lost connection to the AI provider" — the
earlier 300s-timeout fix (#176) was the wrong layer. The provider-HTTP telemetry
on the user's deploy shows the failures are PRE-RESPONSE `read ECONNRESET` ~500ms
in (not a 300s/15min timeout), correlated with idleSincePrevCall ~42s and large
bodies; and crucially a retry of the SAME request often succeeds. A direct probe
to the real z.ai endpoint does NOT reset (113KB bodies and a 45s-idle keep-alive
reuse both succeed), and another agent (opencode) runs fine from the same infra —
so the provider is healthy and the egress network is usable. The difference is
the transport: undici's keep-alive pool REUSES a socket that the deployment's
egress (NAT / firewall / conntrack) silently dropped during a long idle gap, so
the next request resets pre-response.

Fix (brings gitmost in line with clients that don't reuse stale sockets):
- Keep-alive recycling: the streaming dispatcher (chat fetch AND the external-MCP
  dispatcher, via the shared streamingDispatcherOptions) now sets
  keepAliveTimeout + keepAliveMaxTimeout to a 10s recycle window
  (AI_STREAM_KEEPALIVE_MS), so a connection idle longer than that is closed
  instead of reused — a long-gap step opens a fresh connection. keepAliveMaxTimeout
  also caps a server-advertised keep-alive so the provider can't widen the window.
- Pre-response connection retry: createStreamingFetch retries a connection-level
  reset (ECONNRESET / UND_ERR_SOCKET / ECONNREFUSED / EPIPE / *_TIMEOUT) on a
  fresh connection up to 2 times. This is SAFE because fetch() only rejects before
  the Response resolves — a started stream is never replayed; an abort (client
  disconnect) is never retried.

Tests: ai-streaming-fetch.spec — keep-alive options, streamKeepAliveMs env,
isRetryableConnectError, and a server that resets the first connection so the
retry must land on a fresh one (+ aborted requests are not retried). Verified on
the stand that a normal turn still streams (reasoning + text + finish) through the
new transport. server tsc + ai/mcp specs green.

Note: root cause is the deployment's egress dropping idle connections (Traefik is
inbound-only); this makes the app resilient to it. AI_STREAM_KEEPALIVE_MS can be
lowered if the egress drops faster than ~10s.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
claude code agent 227
2026-06-24 23:51:17 +03:00
parent d1fbcc1bfa
commit b0faa2fe32
3 changed files with 198 additions and 17 deletions

View File

@@ -142,6 +142,13 @@ MCP_DOCMOST_PASSWORD=
# provider is eventually broken instead of leaking forever. Default 900000 (15 min).
# AI_STREAM_TIMEOUT_MS=900000
# Keep-alive recycle window (ms) for streaming chat/agent AI + external-MCP calls.
# A pooled connection idle longer than this is closed instead of reused, so a
# NAT / egress firewall / reverse proxy that silently drops idle connections
# cannot poison a reused socket into a PRE-RESPONSE `read ECONNRESET`. Lower it if
# your egress drops idle connections faster than ~10s. Default 10000 (10 s).
# AI_STREAM_KEEPALIVE_MS=10000
# --- Anonymous public-share AI assistant ---
# Opt-in per workspace (AI settings -> "public share assistant"; off by default).
# When enabled, anonymous visitors of a published share can ask an AI about that

View File

@@ -2,7 +2,9 @@ import * as http from 'node:http';
import {
createStreamingFetch,
streamTimeoutMs,
streamKeepAliveMs,
streamingDispatcherOptions,
isRetryableConnectError,
} from './ai-streaming-fetch';
/**
@@ -38,15 +40,54 @@ describe('streamTimeoutMs', () => {
}
});
it('applies the timeout to BOTH undici stream timeouts', () => {
it('applies the silence timeout + keep-alive recycle window to the dispatcher', () => {
delete process.env.AI_STREAM_TIMEOUT_MS;
delete process.env.AI_STREAM_KEEPALIVE_MS;
expect(streamingDispatcherOptions()).toEqual({
headersTimeout: 900_000,
bodyTimeout: 900_000,
keepAliveTimeout: 10_000,
keepAliveMaxTimeout: 10_000,
});
});
});
describe('streamKeepAliveMs', () => {
const ORIG = process.env.AI_STREAM_KEEPALIVE_MS;
afterEach(() => {
if (ORIG === undefined) delete process.env.AI_STREAM_KEEPALIVE_MS;
else process.env.AI_STREAM_KEEPALIVE_MS = ORIG;
});
it('defaults to 10s (recycle idle sockets so a NAT/proxy drop cannot poison reuse)', () => {
delete process.env.AI_STREAM_KEEPALIVE_MS;
expect(streamKeepAliveMs()).toBe(10_000);
});
it('honours a positive override and ignores invalid/non-positive', () => {
process.env.AI_STREAM_KEEPALIVE_MS = '4000';
expect(streamKeepAliveMs()).toBe(4000);
for (const bad of ['0', '-1', 'x', '']) {
process.env.AI_STREAM_KEEPALIVE_MS = bad;
expect(streamKeepAliveMs()).toBe(10_000);
}
});
});
describe('isRetryableConnectError', () => {
it('matches connection-level codes on the error or its cause', () => {
expect(isRetryableConnectError({ cause: { code: 'ECONNRESET' } })).toBe(true);
expect(isRetryableConnectError({ cause: { code: 'UND_ERR_SOCKET' } })).toBe(true);
expect(isRetryableConnectError({ code: 'ECONNREFUSED' })).toBe(true);
});
it('does NOT match aborts / unrelated errors', () => {
expect(isRetryableConnectError({ name: 'AbortError', cause: { code: 'ABORT_ERR' } })).toBe(false);
expect(isRetryableConnectError({ cause: { code: 'UND_ERR_HEADERS_TIMEOUT' } })).toBe(false);
expect(isRetryableConnectError(new Error('plain'))).toBe(false);
expect(isRetryableConnectError(undefined)).toBe(false);
});
});
describe('createStreamingFetch — against a delayed server', () => {
const ORIG = process.env.AI_STREAM_TIMEOUT_MS;
let server: http.Server;
@@ -110,3 +151,56 @@ describe('createStreamingFetch — against a delayed server', () => {
if (code) expect(code).toBe('UND_ERR_HEADERS_TIMEOUT');
});
});
describe('createStreamingFetch — pre-response connection retry', () => {
let server: http.Server;
let url: string;
let requests = 0;
beforeAll(async () => {
server = http.createServer((req, res) => {
requests += 1;
if (requests === 1) {
// Reset the FIRST connection before any response byte (a poisoned/stale
// keep-alive socket). The retry must open a fresh connection.
const sock = req.socket as import('node:net').Socket & {
resetAndDestroy?: () => void;
};
if (typeof sock.resetAndDestroy === 'function') sock.resetAndDestroy();
else sock.destroy();
return;
}
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('ok');
});
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
const addr = server.address() as import('node:net').AddressInfo;
url = `http://127.0.0.1:${addr.port}/`;
});
afterAll(async () => {
await new Promise<void>((resolve) => server.close(() => resolve()));
});
beforeEach(() => {
requests = 0;
});
it('retries a pre-response reset on a fresh connection and succeeds', async () => {
const streamingFetch = createStreamingFetch();
const res = await streamingFetch(url);
expect(res.status).toBe(200);
expect(await res.text()).toBe('ok');
// first request reset -> retry -> second request served.
expect(requests).toBeGreaterThanOrEqual(2);
});
it('does NOT retry an aborted request (no retry storm)', async () => {
const ctrl = new AbortController();
ctrl.abort();
const streamingFetch = createStreamingFetch();
await expect(streamingFetch(url, { signal: ctrl.signal })).rejects.toBeDefined();
// Pre-aborted: the request never reached the server, so nothing was retried.
expect(requests).toBe(0);
});
});

View File

@@ -18,41 +18,121 @@ import { Agent } from 'undici';
*/
const DEFAULT_STREAM_TIMEOUT_MS = 900_000;
/**
* Default keep-alive recycle window (10s). A pooled connection idle longer than
* this is CLOSED rather than reused.
*
* Long agent turns leave gaps of tens of seconds between provider calls (one
* call per step; a crawl/search tool runs in between). A NAT / reverse proxy /
* conntrack in front of the deployment silently drops an idle connection after
* its own timeout; undici, not knowing, then reuses that dead socket and the
* next request fails PRE-RESPONSE with `read ECONNRESET` (#175 prod telemetry:
* the resets correlate with idleSincePrevCall ~42s, while a direct path to the
* provider does NOT reset). Recycling idle sockets well below such a drop window
* means a long-gap call opens a fresh connection instead of reusing a stale one.
* `keepAliveMaxTimeout` also caps a server-advertised keep-alive so the provider
* cannot push the reuse window back up.
*/
const DEFAULT_STREAM_KEEPALIVE_MS = 10_000;
/**
* How many times to retry a PRE-RESPONSE connection failure (a reset/timeout
* before ANY response byte) on a fresh connection. Safe because `fetch()` only
* rejects before the Response resolves — a started stream is never replayed.
*/
const PRE_RESPONSE_CONNECT_RETRIES = 2;
/** undici cause codes for a connection-level failure that occurred PRE-RESPONSE. */
const RETRYABLE_CONNECT_CODES = new Set([
'ECONNRESET',
'ECONNREFUSED',
'EPIPE',
'ETIMEDOUT',
'UND_ERR_SOCKET',
'UND_ERR_CONNECT_TIMEOUT',
]);
function positiveEnv(name: string, fallback: number): number {
const raw = Number(process.env[name]);
return Number.isFinite(raw) && raw > 0 ? raw : fallback;
}
/**
* The configured silence timeout (ms). Override with `AI_STREAM_TIMEOUT_MS`; a
* missing/invalid/non-positive value falls back to {@link DEFAULT_STREAM_TIMEOUT_MS}.
*/
export function streamTimeoutMs(): number {
const raw = Number(process.env.AI_STREAM_TIMEOUT_MS);
return Number.isFinite(raw) && raw > 0 ? raw : DEFAULT_STREAM_TIMEOUT_MS;
return positiveEnv('AI_STREAM_TIMEOUT_MS', DEFAULT_STREAM_TIMEOUT_MS);
}
/** Keep-alive recycle window (ms). Override with `AI_STREAM_KEEPALIVE_MS`. */
export function streamKeepAliveMs(): number {
return positiveEnv('AI_STREAM_KEEPALIVE_MS', DEFAULT_STREAM_KEEPALIVE_MS);
}
/**
* undici `Agent` timeout options for streaming AI traffic — both stream timeouts
* set to the (generous, finite) silence timeout. Shared by the chat provider
* fetch and the external-MCP dispatcher so they behave identically (#175).
* undici `Agent` options for streaming AI traffic — the (generous, finite)
* silence timeouts plus the keep-alive recycle window. Shared by the chat
* provider fetch and the external-MCP dispatcher so they behave identically.
*/
export function streamingDispatcherOptions(): {
headersTimeout: number;
bodyTimeout: number;
keepAliveTimeout: number;
keepAliveMaxTimeout: number;
} {
const t = streamTimeoutMs();
return { headersTimeout: t, bodyTimeout: t };
const ka = streamKeepAliveMs();
return {
headersTimeout: t,
bodyTimeout: t,
keepAliveTimeout: ka,
keepAliveMaxTimeout: ka,
};
}
/** True for a connection-level error worth retrying on a fresh connection. */
export function isRetryableConnectError(err: unknown): boolean {
const e = err as { code?: string; cause?: { code?: string } } | undefined;
const code = e?.cause?.code ?? e?.code;
return typeof code === 'string' && RETRYABLE_CONNECT_CODES.has(code);
}
/**
* Build a `fetch` for long-lived streaming AI calls (the agent chat turn) backed
* by a dedicated undici dispatcher whose stream timeouts are the generous-but-
* finite silence timeout above (#175). A single shared dispatcher is returned
* (callers hold it for the service lifetime) so its connection pool is reused.
* by a dedicated undici dispatcher (finite silence timeouts + keep-alive
* recycling, #175). A single shared dispatcher is returned (callers hold it for
* the service lifetime) so its connection pool is reused.
*
* On a PRE-RESPONSE connection reset (`fetch()` rejects before the Response
* resolves — so nothing has streamed) it retries a few times on a fresh
* connection. A poisoned keep-alive socket is destroyed by undici on the reset,
* so the retry lands on a new connection. An abort (client disconnect) is never
* retried.
*/
export function createStreamingFetch(): typeof fetch {
const dispatcher = new Agent(streamingDispatcherOptions());
return ((input: Parameters<typeof fetch>[0], init?: RequestInit) =>
fetch(input, {
...(init ?? {}),
// `dispatcher` is an undici-specific init field (not in the DOM RequestInit
// type); Node's global fetch reads it. Cast to satisfy the type.
dispatcher,
} as RequestInit & { dispatcher: Agent })) as typeof fetch;
return (async (input: Parameters<typeof fetch>[0], init?: RequestInit) => {
for (let attempt = 0; ; attempt++) {
try {
return await fetch(input, {
...(init ?? {}),
// `dispatcher` is an undici-specific init field (not in the DOM
// RequestInit type); Node's global fetch reads it. Cast to satisfy it.
dispatcher,
} as RequestInit & { dispatcher: Agent });
} catch (err) {
const aborted = init?.signal?.aborted === true;
if (
aborted ||
attempt >= PRE_RESPONSE_CONNECT_RETRIES ||
!isRetryableConnectError(err)
) {
throw err;
}
// Brief backoff before the fresh-connection retry.
await new Promise((resolve) => setTimeout(resolve, 150 * (attempt + 1)));
}
}
}) as typeof fetch;
}