diff --git a/.env.example b/.env.example index 8e954433..d32b67c7 100644 --- a/.env.example +++ b/.env.example @@ -173,9 +173,21 @@ MCP_DOCMOST_PASSWORD= # Keep-alive recycle window (ms) for streaming chat/agent AI + external-MCP calls. # A pooled connection idle longer than this is closed instead of reused, so a # NAT / egress firewall / reverse proxy that silently drops idle connections -# cannot poison a reused socket into a PRE-RESPONSE `read ECONNRESET`. Lower it if -# your egress drops idle connections faster than ~10s. Default 10000 (10 s). -# AI_STREAM_KEEPALIVE_MS=10000 +# cannot poison a reused socket into a PRE-RESPONSE `read ECONNRESET`. Kept under +# common ~5s upstream/middlebox idle cutoffs so undici recycles the socket before +# the network kills it (fewer resets), while still reusing within a burst of +# back-to-back calls. Lower it further if your egress drops idle connections even +# faster. Default 4000 (4 s). +# AI_STREAM_KEEPALIVE_MS=4000 + +# Number of PRE-RESPONSE connection retries for streaming chat/agent AI calls: a +# reset/timeout BEFORE any response byte (e.g. `read ECONNRESET` on a stale pooled +# socket) is retried on a fresh connection with jittered exponential backoff. +# Total attempts = value + 1, so the default 4 gives 5 attempts — headroom to +# absorb a short BURST of upstream resets without exhausting the budget. Safe to +# retry: a started stream is never replayed, only a connect that never responded. +# 0 disables the retry. Default 4. +# AI_STREAM_PRE_RESPONSE_RETRIES=4 # Silence timeout (ms) for EXTERNAL-MCP transport ONLY (not the chat provider). # Tighter than AI_STREAM_TIMEOUT_MS so a byte-silent/hung MCP server is broken in diff --git a/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts b/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts index 07c8ec40..994c8d80 100644 --- a/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts +++ b/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts @@ -6,6 +6,8 @@ import { streamKeepAliveMs, streamingDispatcherOptions, isRetryableConnectError, + preResponseConnectRetries, + preResponseBackoffMs, } from './ai-streaming-fetch'; /** @@ -47,8 +49,8 @@ describe('streamTimeoutMs', () => { expect(streamingDispatcherOptions()).toEqual({ headersTimeout: 900_000, bodyTimeout: 900_000, - keepAliveTimeout: 10_000, - keepAliveMaxTimeout: 10_000, + keepAliveTimeout: 4_000, + keepAliveMaxTimeout: 4_000, }); }); }); @@ -60,21 +62,91 @@ describe('streamKeepAliveMs', () => { else process.env.AI_STREAM_KEEPALIVE_MS = ORIG; }); - it('defaults to 10s (recycle idle sockets so a NAT/proxy drop cannot poison reuse)', () => { + it('defaults to 4s (recycle idle sockets under common ~5s upstream idle cutoffs)', () => { delete process.env.AI_STREAM_KEEPALIVE_MS; - expect(streamKeepAliveMs()).toBe(10_000); + expect(streamKeepAliveMs()).toBe(4_000); }); it('honours a positive override and ignores invalid/non-positive', () => { - process.env.AI_STREAM_KEEPALIVE_MS = '4000'; - expect(streamKeepAliveMs()).toBe(4000); + process.env.AI_STREAM_KEEPALIVE_MS = '7000'; + expect(streamKeepAliveMs()).toBe(7000); for (const bad of ['0', '-1', 'x', '']) { process.env.AI_STREAM_KEEPALIVE_MS = bad; - expect(streamKeepAliveMs()).toBe(10_000); + expect(streamKeepAliveMs()).toBe(4_000); } }); }); +/** + * #310: the PRE-RESPONSE retry budget was raised 2 -> 4 (5 total attempts) and + * made env-configurable so a BURST of upstream resets doesn't exhaust it. + */ +describe('preResponseConnectRetries', () => { + const ORIG = process.env.AI_STREAM_PRE_RESPONSE_RETRIES; + afterEach(() => { + if (ORIG === undefined) delete process.env.AI_STREAM_PRE_RESPONSE_RETRIES; + else process.env.AI_STREAM_PRE_RESPONSE_RETRIES = ORIG; + }); + + it('defaults to 4 retries (5 total attempts)', () => { + delete process.env.AI_STREAM_PRE_RESPONSE_RETRIES; + expect(preResponseConnectRetries()).toBe(4); + }); + + it('honours a non-negative override (incl. 0 = single attempt)', () => { + process.env.AI_STREAM_PRE_RESPONSE_RETRIES = '6'; + expect(preResponseConnectRetries()).toBe(6); + process.env.AI_STREAM_PRE_RESPONSE_RETRIES = '0'; + expect(preResponseConnectRetries()).toBe(0); + }); + + it('ignores an invalid / negative override (falls back to default 4)', () => { + for (const bad of ['-1', 'abc', '']) { + process.env.AI_STREAM_PRE_RESPONSE_RETRIES = bad; + expect(preResponseConnectRetries()).toBe(4); + } + }); +}); + +/** + * #310: linear `150 * (attempt + 1)` backoff replaced with capped exponential + + * FULL jitter to avoid a thundering herd of lock-step reconnects. Bound-check the + * jitter by pinning the randomness source to its extremes. + */ +describe('preResponseBackoffMs', () => { + it('with rand=0 waits 0 (bottom of the full-jitter window)', () => { + for (let attempt = 0; attempt < 6; attempt++) { + expect(preResponseBackoffMs(attempt, () => 0)).toBe(0); + } + }); + + it('with rand=1 returns the capped exponential top of the window', () => { + // base 150ms, exp = 150 * 2**attempt, capped at 2000ms. + expect(preResponseBackoffMs(0, () => 1)).toBe(150); + expect(preResponseBackoffMs(1, () => 1)).toBe(300); + expect(preResponseBackoffMs(2, () => 1)).toBe(600); + expect(preResponseBackoffMs(3, () => 1)).toBe(1200); + // 150 * 2**4 = 2400 -> capped to 2000. + expect(preResponseBackoffMs(4, () => 1)).toBe(2000); + expect(preResponseBackoffMs(10, () => 1)).toBe(2000); + }); + + it('stays within [0, cap] and is NOT the old fixed linear value', () => { + const cap = 2000; + for (let attempt = 0; attempt < 8; attempt++) { + for (const r of [0, 0.5, 0.999, 1]) { + const d = preResponseBackoffMs(attempt, () => r); + expect(d).toBeGreaterThanOrEqual(0); + expect(d).toBeLessThanOrEqual(cap); + } + } + // The old formula gave a fixed 150*(attempt+1); the jittered one with a + // mid-range rand does not reproduce it (e.g. attempt 0 -> 75, not 150). + expect(preResponseBackoffMs(0, () => 0.5)).toBe(75); + expect(preResponseBackoffMs(0, () => 0.5)).not.toBe(150); + }); +}); + describe('isRetryableConnectError', () => { it('matches connection-level codes on the error or its cause', () => { expect(isRetryableConnectError({ cause: { code: 'ECONNRESET' } })).toBe(true); @@ -156,8 +228,12 @@ describe('createStreamingFetch — against a delayed server', () => { describe('withPreResponseRetry', () => { // The retry is the OUTERMOST layer (over the dispatcher-bound streaming fetch), // matching ai.service's withPreResponseRetry(instrument(createStreamingFetch())). - // PRE_RESPONSE_CONNECT_RETRIES is 2 -> at most 3 total attempts. - const MAX_ATTEMPTS = 3; + // The budget is env-driven (AI_STREAM_PRE_RESPONSE_RETRIES, default 4 -> 5 + // total attempts). We PIN it to 2 here so the exhaustion test is fast and + // deterministic regardless of the default; total attempts = retries + 1 = 3. + const RETRIES = 2; + const MAX_ATTEMPTS = RETRIES + 1; + const ORIG_RETRIES = process.env.AI_STREAM_PRE_RESPONSE_RETRIES; let server: http.Server; let url: string; let requests = 0; @@ -194,6 +270,13 @@ describe('withPreResponseRetry', () => { beforeEach(() => { requests = 0; resetMode = 'first'; + process.env.AI_STREAM_PRE_RESPONSE_RETRIES = String(RETRIES); + }); + + afterEach(() => { + if (ORIG_RETRIES === undefined) + delete process.env.AI_STREAM_PRE_RESPONSE_RETRIES; + else process.env.AI_STREAM_PRE_RESPONSE_RETRIES = ORIG_RETRIES; }); it('retries a pre-response reset on a fresh connection and succeeds', async () => { @@ -216,12 +299,28 @@ describe('withPreResponseRetry', () => { expect(caught).toBeDefined(); // A retryable connection error reached the caller (not swallowed). expect(isRetryableConnectError(caught)).toBe(true); - // Bounded: exactly PRE_RESPONSE_CONNECT_RETRIES + 1 attempts hit the server + // Bounded: exactly AI_STREAM_PRE_RESPONSE_RETRIES + 1 attempts hit the server // (pins both the limit and that the final error propagates — guards an // off-by-one or an infinite loop). expect(requests).toBe(MAX_ATTEMPTS); }); + it('honours a raised AI_STREAM_PRE_RESPONSE_RETRIES (more attempts before giving up)', async () => { + // Env-driven budget: 4 retries -> 5 total attempts against a persistently + // resetting connect. + process.env.AI_STREAM_PRE_RESPONSE_RETRIES = '4'; + resetMode = 'all'; + let caught: unknown; + try { + await retryingFetch()(url); + } catch (e) { + caught = e; + } + expect(caught).toBeDefined(); + expect(isRetryableConnectError(caught)).toBe(true); + expect(requests).toBe(5); + }); + it('does NOT retry an aborted request (no retry storm)', async () => { resetMode = 'all'; const ctrl = new AbortController(); diff --git a/apps/server/src/integrations/ai/ai-streaming-fetch.ts b/apps/server/src/integrations/ai/ai-streaming-fetch.ts index f24abd39..725c71f5 100644 --- a/apps/server/src/integrations/ai/ai-streaming-fetch.ts +++ b/apps/server/src/integrations/ai/ai-streaming-fetch.ts @@ -19,7 +19,7 @@ import { Agent } from 'undici'; const DEFAULT_STREAM_TIMEOUT_MS = 900_000; /** - * Default keep-alive recycle window (10s). A pooled connection idle longer than + * Default keep-alive recycle window (4s). A pooled connection idle longer than * this is CLOSED rather than reused. * * Long agent turns leave gaps of tens of seconds between provider calls (one @@ -30,17 +30,70 @@ const DEFAULT_STREAM_TIMEOUT_MS = 900_000; * the resets correlate with idleSincePrevCall ~42s, while a direct path to the * provider does NOT reset). Recycling idle sockets well below such a drop window * means a long-gap call opens a fresh connection instead of reusing a stale one. + * Kept comfortably under common ~5s upstream/middlebox idle cutoffs so undici + * recycles the socket before the network kills it, while still long enough to + * reuse a connection within a single burst of back-to-back calls (#310). * `keepAliveMaxTimeout` also caps a server-advertised keep-alive so the provider * cannot push the reuse window back up. */ -const DEFAULT_STREAM_KEEPALIVE_MS = 10_000; +const DEFAULT_STREAM_KEEPALIVE_MS = 4_000; /** - * How many times to retry a PRE-RESPONSE connection failure (a reset/timeout - * before ANY response byte) on a fresh connection. Safe because `fetch()` only - * rejects before the Response resolves — a started stream is never replayed. + * Default number of times to retry a PRE-RESPONSE connection failure (a + * reset/timeout before ANY response byte) on a fresh connection. Safe because + * `fetch()` only rejects before the Response resolves — a started stream is + * never replayed. + * + * Raised from 2 to 4 (total 5 attempts) so a short BURST of upstream/middlebox + * resets is absorbed without exhausting the budget: prod saw 2 of 3 attempts + * burned on a single turn, leaving no headroom (#310). Override with + * `AI_STREAM_PRE_RESPONSE_RETRIES`. */ -const PRE_RESPONSE_CONNECT_RETRIES = 2; +const DEFAULT_PRE_RESPONSE_CONNECT_RETRIES = 4; + +/** + * Configured PRE-RESPONSE retry budget. Override with + * `AI_STREAM_PRE_RESPONSE_RETRIES`; a missing/invalid/negative value falls back + * to {@link DEFAULT_PRE_RESPONSE_CONNECT_RETRIES}. Total attempts = value + 1. + * 0 disables the retry (a single attempt). + */ +export function preResponseConnectRetries(): number { + // Read the raw string first: an empty/whitespace value coerces to 0 via + // Number(), which is a VALID setting here (0 = single attempt), so it must be + // treated as "unset" rather than "disable the retry". + const rawStr = process.env.AI_STREAM_PRE_RESPONSE_RETRIES; + if (rawStr === undefined || rawStr.trim() === '') { + return DEFAULT_PRE_RESPONSE_CONNECT_RETRIES; + } + const raw = Number(rawStr); + return Number.isFinite(raw) && raw >= 0 + ? Math.floor(raw) + : DEFAULT_PRE_RESPONSE_CONNECT_RETRIES; +} + +/** Base backoff before the first PRE-RESPONSE retry (ms). */ +const PRE_RESPONSE_BACKOFF_BASE_MS = 150; + +/** Cap on the exponential backoff window before jitter (ms). */ +const PRE_RESPONSE_BACKOFF_CAP_MS = 2_000; + +/** + * Backoff (ms) to wait before PRE-RESPONSE retry number `attempt` (0-based). + * + * Capped exponential with FULL jitter: `delay = random in [0, min(base*2^attempt, + * cap)]`. Full jitter spreads concurrent retries across the whole window so a + * burst of turns that all reset at once do not reconnect in lock-step and + * hammer the upstream in a thundering herd (#310); the exponential growth backs + * off harder as resets persist, and the cap keeps the wait bounded. + */ +export function preResponseBackoffMs( + attempt: number, + rand: () => number = Math.random, +): number { + const exp = PRE_RESPONSE_BACKOFF_BASE_MS * 2 ** attempt; + const capped = Math.min(exp, PRE_RESPONSE_BACKOFF_CAP_MS); + return rand() * capped; +} /** undici cause codes for a connection-level failure that occurred PRE-RESPONSE. */ const RETRYABLE_CONNECT_CODES = new Set([ @@ -177,20 +230,19 @@ export function createStreamingFetch(): typeof fetch { */ export function withPreResponseRetry(baseFetch: typeof fetch): typeof fetch { return (async (input: Parameters[0], init?: RequestInit) => { + const maxRetries = preResponseConnectRetries(); for (let attempt = 0; ; attempt++) { try { return await baseFetch(input, init); } catch (err) { const aborted = init?.signal?.aborted === true; - if ( - aborted || - attempt >= PRE_RESPONSE_CONNECT_RETRIES || - !isRetryableConnectError(err) - ) { + if (aborted || attempt >= maxRetries || !isRetryableConnectError(err)) { throw err; } - // Brief backoff before the fresh-connection retry. - await new Promise((resolve) => setTimeout(resolve, 150 * (attempt + 1))); + // Jittered backoff before the fresh-connection retry (anti-thundering-herd). + await new Promise((resolve) => + setTimeout(resolve, preResponseBackoffMs(attempt)), + ); } } }) as typeof fetch;