fix(ai): устойчивость pre-response ECONNRESET — бюджет ретраев, jittered backoff, keep-alive (#310) #317
+15
-3
@@ -173,9 +173,21 @@ MCP_DOCMOST_PASSWORD=
|
||||
# Keep-alive recycle window (ms) for streaming chat/agent AI + external-MCP calls.
|
||||
# A pooled connection idle longer than this is closed instead of reused, so a
|
||||
# NAT / egress firewall / reverse proxy that silently drops idle connections
|
||||
# cannot poison a reused socket into a PRE-RESPONSE `read ECONNRESET`. Lower it if
|
||||
# your egress drops idle connections faster than ~10s. Default 10000 (10 s).
|
||||
# AI_STREAM_KEEPALIVE_MS=10000
|
||||
# cannot poison a reused socket into a PRE-RESPONSE `read ECONNRESET`. Kept under
|
||||
# common ~5s upstream/middlebox idle cutoffs so undici recycles the socket before
|
||||
# the network kills it (fewer resets), while still reusing within a burst of
|
||||
# back-to-back calls. Lower it further if your egress drops idle connections even
|
||||
# faster. Default 4000 (4 s).
|
||||
# AI_STREAM_KEEPALIVE_MS=4000
|
||||
|
||||
# Number of PRE-RESPONSE connection retries for streaming chat/agent AI calls: a
|
||||
# reset/timeout BEFORE any response byte (e.g. `read ECONNRESET` on a stale pooled
|
||||
# socket) is retried on a fresh connection with jittered exponential backoff.
|
||||
# Total attempts = value + 1, so the default 4 gives 5 attempts — headroom to
|
||||
# absorb a short BURST of upstream resets without exhausting the budget. Safe to
|
||||
# retry: a started stream is never replayed, only a connect that never responded.
|
||||
# 0 disables the retry. Default 4.
|
||||
# AI_STREAM_PRE_RESPONSE_RETRIES=4
|
||||
|
||||
# Silence timeout (ms) for EXTERNAL-MCP transport ONLY (not the chat provider).
|
||||
# Tighter than AI_STREAM_TIMEOUT_MS so a byte-silent/hung MCP server is broken in
|
||||
|
||||
@@ -6,6 +6,8 @@ import {
|
||||
streamKeepAliveMs,
|
||||
streamingDispatcherOptions,
|
||||
isRetryableConnectError,
|
||||
preResponseConnectRetries,
|
||||
preResponseBackoffMs,
|
||||
} from './ai-streaming-fetch';
|
||||
|
||||
/**
|
||||
@@ -47,8 +49,8 @@ describe('streamTimeoutMs', () => {
|
||||
expect(streamingDispatcherOptions()).toEqual({
|
||||
headersTimeout: 900_000,
|
||||
bodyTimeout: 900_000,
|
||||
keepAliveTimeout: 10_000,
|
||||
keepAliveMaxTimeout: 10_000,
|
||||
keepAliveTimeout: 4_000,
|
||||
keepAliveMaxTimeout: 4_000,
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -60,21 +62,91 @@ describe('streamKeepAliveMs', () => {
|
||||
else process.env.AI_STREAM_KEEPALIVE_MS = ORIG;
|
||||
});
|
||||
|
||||
it('defaults to 10s (recycle idle sockets so a NAT/proxy drop cannot poison reuse)', () => {
|
||||
it('defaults to 4s (recycle idle sockets under common ~5s upstream idle cutoffs)', () => {
|
||||
delete process.env.AI_STREAM_KEEPALIVE_MS;
|
||||
expect(streamKeepAliveMs()).toBe(10_000);
|
||||
expect(streamKeepAliveMs()).toBe(4_000);
|
||||
});
|
||||
|
||||
it('honours a positive override and ignores invalid/non-positive', () => {
|
||||
process.env.AI_STREAM_KEEPALIVE_MS = '4000';
|
||||
expect(streamKeepAliveMs()).toBe(4000);
|
||||
process.env.AI_STREAM_KEEPALIVE_MS = '7000';
|
||||
expect(streamKeepAliveMs()).toBe(7000);
|
||||
for (const bad of ['0', '-1', 'x', '']) {
|
||||
process.env.AI_STREAM_KEEPALIVE_MS = bad;
|
||||
expect(streamKeepAliveMs()).toBe(10_000);
|
||||
expect(streamKeepAliveMs()).toBe(4_000);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* #310: the PRE-RESPONSE retry budget was raised 2 -> 4 (5 total attempts) and
|
||||
* made env-configurable so a BURST of upstream resets doesn't exhaust it.
|
||||
*/
|
||||
describe('preResponseConnectRetries', () => {
|
||||
const ORIG = process.env.AI_STREAM_PRE_RESPONSE_RETRIES;
|
||||
afterEach(() => {
|
||||
if (ORIG === undefined) delete process.env.AI_STREAM_PRE_RESPONSE_RETRIES;
|
||||
else process.env.AI_STREAM_PRE_RESPONSE_RETRIES = ORIG;
|
||||
});
|
||||
|
||||
it('defaults to 4 retries (5 total attempts)', () => {
|
||||
delete process.env.AI_STREAM_PRE_RESPONSE_RETRIES;
|
||||
expect(preResponseConnectRetries()).toBe(4);
|
||||
});
|
||||
|
||||
it('honours a non-negative override (incl. 0 = single attempt)', () => {
|
||||
process.env.AI_STREAM_PRE_RESPONSE_RETRIES = '6';
|
||||
expect(preResponseConnectRetries()).toBe(6);
|
||||
process.env.AI_STREAM_PRE_RESPONSE_RETRIES = '0';
|
||||
expect(preResponseConnectRetries()).toBe(0);
|
||||
});
|
||||
|
||||
it('ignores an invalid / negative override (falls back to default 4)', () => {
|
||||
for (const bad of ['-1', 'abc', '']) {
|
||||
process.env.AI_STREAM_PRE_RESPONSE_RETRIES = bad;
|
||||
expect(preResponseConnectRetries()).toBe(4);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* #310: linear `150 * (attempt + 1)` backoff replaced with capped exponential +
|
||||
* FULL jitter to avoid a thundering herd of lock-step reconnects. Bound-check the
|
||||
* jitter by pinning the randomness source to its extremes.
|
||||
*/
|
||||
describe('preResponseBackoffMs', () => {
|
||||
it('with rand=0 waits 0 (bottom of the full-jitter window)', () => {
|
||||
for (let attempt = 0; attempt < 6; attempt++) {
|
||||
expect(preResponseBackoffMs(attempt, () => 0)).toBe(0);
|
||||
}
|
||||
});
|
||||
|
||||
it('with rand=1 returns the capped exponential top of the window', () => {
|
||||
// base 150ms, exp = 150 * 2**attempt, capped at 2000ms.
|
||||
expect(preResponseBackoffMs(0, () => 1)).toBe(150);
|
||||
expect(preResponseBackoffMs(1, () => 1)).toBe(300);
|
||||
expect(preResponseBackoffMs(2, () => 1)).toBe(600);
|
||||
expect(preResponseBackoffMs(3, () => 1)).toBe(1200);
|
||||
// 150 * 2**4 = 2400 -> capped to 2000.
|
||||
expect(preResponseBackoffMs(4, () => 1)).toBe(2000);
|
||||
expect(preResponseBackoffMs(10, () => 1)).toBe(2000);
|
||||
});
|
||||
|
||||
it('stays within [0, cap] and is NOT the old fixed linear value', () => {
|
||||
const cap = 2000;
|
||||
for (let attempt = 0; attempt < 8; attempt++) {
|
||||
for (const r of [0, 0.5, 0.999, 1]) {
|
||||
const d = preResponseBackoffMs(attempt, () => r);
|
||||
expect(d).toBeGreaterThanOrEqual(0);
|
||||
expect(d).toBeLessThanOrEqual(cap);
|
||||
}
|
||||
}
|
||||
// The old formula gave a fixed 150*(attempt+1); the jittered one with a
|
||||
// mid-range rand does not reproduce it (e.g. attempt 0 -> 75, not 150).
|
||||
expect(preResponseBackoffMs(0, () => 0.5)).toBe(75);
|
||||
expect(preResponseBackoffMs(0, () => 0.5)).not.toBe(150);
|
||||
});
|
||||
});
|
||||
|
||||
describe('isRetryableConnectError', () => {
|
||||
it('matches connection-level codes on the error or its cause', () => {
|
||||
expect(isRetryableConnectError({ cause: { code: 'ECONNRESET' } })).toBe(true);
|
||||
@@ -156,8 +228,12 @@ describe('createStreamingFetch — against a delayed server', () => {
|
||||
describe('withPreResponseRetry', () => {
|
||||
// The retry is the OUTERMOST layer (over the dispatcher-bound streaming fetch),
|
||||
// matching ai.service's withPreResponseRetry(instrument(createStreamingFetch())).
|
||||
// PRE_RESPONSE_CONNECT_RETRIES is 2 -> at most 3 total attempts.
|
||||
const MAX_ATTEMPTS = 3;
|
||||
// The budget is env-driven (AI_STREAM_PRE_RESPONSE_RETRIES, default 4 -> 5
|
||||
// total attempts). We PIN it to 2 here so the exhaustion test is fast and
|
||||
// deterministic regardless of the default; total attempts = retries + 1 = 3.
|
||||
const RETRIES = 2;
|
||||
const MAX_ATTEMPTS = RETRIES + 1;
|
||||
const ORIG_RETRIES = process.env.AI_STREAM_PRE_RESPONSE_RETRIES;
|
||||
let server: http.Server;
|
||||
let url: string;
|
||||
let requests = 0;
|
||||
@@ -194,6 +270,13 @@ describe('withPreResponseRetry', () => {
|
||||
beforeEach(() => {
|
||||
requests = 0;
|
||||
resetMode = 'first';
|
||||
process.env.AI_STREAM_PRE_RESPONSE_RETRIES = String(RETRIES);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
if (ORIG_RETRIES === undefined)
|
||||
delete process.env.AI_STREAM_PRE_RESPONSE_RETRIES;
|
||||
else process.env.AI_STREAM_PRE_RESPONSE_RETRIES = ORIG_RETRIES;
|
||||
});
|
||||
|
||||
it('retries a pre-response reset on a fresh connection and succeeds', async () => {
|
||||
@@ -216,12 +299,28 @@ describe('withPreResponseRetry', () => {
|
||||
expect(caught).toBeDefined();
|
||||
// A retryable connection error reached the caller (not swallowed).
|
||||
expect(isRetryableConnectError(caught)).toBe(true);
|
||||
// Bounded: exactly PRE_RESPONSE_CONNECT_RETRIES + 1 attempts hit the server
|
||||
// Bounded: exactly AI_STREAM_PRE_RESPONSE_RETRIES + 1 attempts hit the server
|
||||
// (pins both the limit and that the final error propagates — guards an
|
||||
// off-by-one or an infinite loop).
|
||||
expect(requests).toBe(MAX_ATTEMPTS);
|
||||
});
|
||||
|
||||
it('honours a raised AI_STREAM_PRE_RESPONSE_RETRIES (more attempts before giving up)', async () => {
|
||||
// Env-driven budget: 4 retries -> 5 total attempts against a persistently
|
||||
// resetting connect.
|
||||
process.env.AI_STREAM_PRE_RESPONSE_RETRIES = '4';
|
||||
resetMode = 'all';
|
||||
let caught: unknown;
|
||||
try {
|
||||
await retryingFetch()(url);
|
||||
} catch (e) {
|
||||
caught = e;
|
||||
}
|
||||
expect(caught).toBeDefined();
|
||||
expect(isRetryableConnectError(caught)).toBe(true);
|
||||
expect(requests).toBe(5);
|
||||
});
|
||||
|
||||
it('does NOT retry an aborted request (no retry storm)', async () => {
|
||||
resetMode = 'all';
|
||||
const ctrl = new AbortController();
|
||||
|
||||
@@ -19,7 +19,7 @@ import { Agent } from 'undici';
|
||||
const DEFAULT_STREAM_TIMEOUT_MS = 900_000;
|
||||
|
||||
/**
|
||||
* Default keep-alive recycle window (10s). A pooled connection idle longer than
|
||||
* Default keep-alive recycle window (4s). A pooled connection idle longer than
|
||||
* this is CLOSED rather than reused.
|
||||
*
|
||||
* Long agent turns leave gaps of tens of seconds between provider calls (one
|
||||
@@ -30,17 +30,70 @@ const DEFAULT_STREAM_TIMEOUT_MS = 900_000;
|
||||
* the resets correlate with idleSincePrevCall ~42s, while a direct path to the
|
||||
* provider does NOT reset). Recycling idle sockets well below such a drop window
|
||||
* means a long-gap call opens a fresh connection instead of reusing a stale one.
|
||||
* Kept comfortably under common ~5s upstream/middlebox idle cutoffs so undici
|
||||
* recycles the socket before the network kills it, while still long enough to
|
||||
* reuse a connection within a single burst of back-to-back calls (#310).
|
||||
* `keepAliveMaxTimeout` also caps a server-advertised keep-alive so the provider
|
||||
* cannot push the reuse window back up.
|
||||
*/
|
||||
const DEFAULT_STREAM_KEEPALIVE_MS = 10_000;
|
||||
const DEFAULT_STREAM_KEEPALIVE_MS = 4_000;
|
||||
|
||||
/**
|
||||
* How many times to retry a PRE-RESPONSE connection failure (a reset/timeout
|
||||
* before ANY response byte) on a fresh connection. Safe because `fetch()` only
|
||||
* rejects before the Response resolves — a started stream is never replayed.
|
||||
* Default number of times to retry a PRE-RESPONSE connection failure (a
|
||||
* reset/timeout before ANY response byte) on a fresh connection. Safe because
|
||||
* `fetch()` only rejects before the Response resolves — a started stream is
|
||||
* never replayed.
|
||||
*
|
||||
* Raised from 2 to 4 (total 5 attempts) so a short BURST of upstream/middlebox
|
||||
* resets is absorbed without exhausting the budget: prod saw 2 of 3 attempts
|
||||
* burned on a single turn, leaving no headroom (#310). Override with
|
||||
* `AI_STREAM_PRE_RESPONSE_RETRIES`.
|
||||
*/
|
||||
const PRE_RESPONSE_CONNECT_RETRIES = 2;
|
||||
const DEFAULT_PRE_RESPONSE_CONNECT_RETRIES = 4;
|
||||
|
||||
/**
|
||||
* Configured PRE-RESPONSE retry budget. Override with
|
||||
* `AI_STREAM_PRE_RESPONSE_RETRIES`; a missing/invalid/negative value falls back
|
||||
* to {@link DEFAULT_PRE_RESPONSE_CONNECT_RETRIES}. Total attempts = value + 1.
|
||||
* 0 disables the retry (a single attempt).
|
||||
*/
|
||||
export function preResponseConnectRetries(): number {
|
||||
// Read the raw string first: an empty/whitespace value coerces to 0 via
|
||||
// Number(), which is a VALID setting here (0 = single attempt), so it must be
|
||||
// treated as "unset" rather than "disable the retry".
|
||||
const rawStr = process.env.AI_STREAM_PRE_RESPONSE_RETRIES;
|
||||
if (rawStr === undefined || rawStr.trim() === '') {
|
||||
return DEFAULT_PRE_RESPONSE_CONNECT_RETRIES;
|
||||
}
|
||||
const raw = Number(rawStr);
|
||||
return Number.isFinite(raw) && raw >= 0
|
||||
? Math.floor(raw)
|
||||
: DEFAULT_PRE_RESPONSE_CONNECT_RETRIES;
|
||||
}
|
||||
|
||||
/** Base backoff before the first PRE-RESPONSE retry (ms). */
|
||||
const PRE_RESPONSE_BACKOFF_BASE_MS = 150;
|
||||
|
||||
/** Cap on the exponential backoff window before jitter (ms). */
|
||||
const PRE_RESPONSE_BACKOFF_CAP_MS = 2_000;
|
||||
|
||||
/**
|
||||
* Backoff (ms) to wait before PRE-RESPONSE retry number `attempt` (0-based).
|
||||
*
|
||||
* Capped exponential with FULL jitter: `delay = random in [0, min(base*2^attempt,
|
||||
* cap)]`. Full jitter spreads concurrent retries across the whole window so a
|
||||
* burst of turns that all reset at once do not reconnect in lock-step and
|
||||
* hammer the upstream in a thundering herd (#310); the exponential growth backs
|
||||
* off harder as resets persist, and the cap keeps the wait bounded.
|
||||
*/
|
||||
export function preResponseBackoffMs(
|
||||
attempt: number,
|
||||
rand: () => number = Math.random,
|
||||
): number {
|
||||
const exp = PRE_RESPONSE_BACKOFF_BASE_MS * 2 ** attempt;
|
||||
const capped = Math.min(exp, PRE_RESPONSE_BACKOFF_CAP_MS);
|
||||
return rand() * capped;
|
||||
}
|
||||
|
||||
/** undici cause codes for a connection-level failure that occurred PRE-RESPONSE. */
|
||||
const RETRYABLE_CONNECT_CODES = new Set([
|
||||
@@ -177,20 +230,19 @@ export function createStreamingFetch(): typeof fetch {
|
||||
*/
|
||||
export function withPreResponseRetry(baseFetch: typeof fetch): typeof fetch {
|
||||
return (async (input: Parameters<typeof fetch>[0], init?: RequestInit) => {
|
||||
const maxRetries = preResponseConnectRetries();
|
||||
for (let attempt = 0; ; attempt++) {
|
||||
try {
|
||||
return await baseFetch(input, init);
|
||||
} catch (err) {
|
||||
const aborted = init?.signal?.aborted === true;
|
||||
if (
|
||||
aborted ||
|
||||
attempt >= PRE_RESPONSE_CONNECT_RETRIES ||
|
||||
!isRetryableConnectError(err)
|
||||
) {
|
||||
if (aborted || attempt >= maxRetries || !isRetryableConnectError(err)) {
|
||||
throw err;
|
||||
}
|
||||
// Brief backoff before the fresh-connection retry.
|
||||
await new Promise((resolve) => setTimeout(resolve, 150 * (attempt + 1)));
|
||||
// Jittered backoff before the fresh-connection retry (anti-thundering-herd).
|
||||
await new Promise((resolve) =>
|
||||
setTimeout(resolve, preResponseBackoffMs(attempt)),
|
||||
);
|
||||
}
|
||||
}
|
||||
}) as typeof fetch;
|
||||
|
||||
Reference in New Issue
Block a user