diff --git a/.env.example b/.env.example index 4726805b..97e8dba8 100644 --- a/.env.example +++ b/.env.example @@ -142,6 +142,13 @@ MCP_DOCMOST_PASSWORD= # provider is eventually broken instead of leaking forever. Default 900000 (15 min). # AI_STREAM_TIMEOUT_MS=900000 +# Keep-alive recycle window (ms) for streaming chat/agent AI + external-MCP calls. +# A pooled connection idle longer than this is closed instead of reused, so a +# NAT / egress firewall / reverse proxy that silently drops idle connections +# cannot poison a reused socket into a PRE-RESPONSE `read ECONNRESET`. Lower it if +# your egress drops idle connections faster than ~10s. Default 10000 (10 s). +# AI_STREAM_KEEPALIVE_MS=10000 + # --- Anonymous public-share AI assistant --- # Opt-in per workspace (AI settings -> "public share assistant"; off by default). # When enabled, anonymous visitors of a published share can ask an AI about that diff --git a/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts b/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts index b28ecf51..07c8ec40 100644 --- a/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts +++ b/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts @@ -1,8 +1,11 @@ import * as http from 'node:http'; import { createStreamingFetch, + withPreResponseRetry, streamTimeoutMs, + streamKeepAliveMs, streamingDispatcherOptions, + isRetryableConnectError, } from './ai-streaming-fetch'; /** @@ -38,15 +41,54 @@ describe('streamTimeoutMs', () => { } }); - it('applies the timeout to BOTH undici stream timeouts', () => { + it('applies the silence timeout + keep-alive recycle window to the dispatcher', () => { delete process.env.AI_STREAM_TIMEOUT_MS; + delete process.env.AI_STREAM_KEEPALIVE_MS; expect(streamingDispatcherOptions()).toEqual({ headersTimeout: 900_000, bodyTimeout: 900_000, + keepAliveTimeout: 10_000, + keepAliveMaxTimeout: 10_000, }); }); }); +describe('streamKeepAliveMs', () => { + const ORIG = process.env.AI_STREAM_KEEPALIVE_MS; + afterEach(() => { + if (ORIG === undefined) delete process.env.AI_STREAM_KEEPALIVE_MS; + else process.env.AI_STREAM_KEEPALIVE_MS = ORIG; + }); + + it('defaults to 10s (recycle idle sockets so a NAT/proxy drop cannot poison reuse)', () => { + delete process.env.AI_STREAM_KEEPALIVE_MS; + expect(streamKeepAliveMs()).toBe(10_000); + }); + + it('honours a positive override and ignores invalid/non-positive', () => { + process.env.AI_STREAM_KEEPALIVE_MS = '4000'; + expect(streamKeepAliveMs()).toBe(4000); + for (const bad of ['0', '-1', 'x', '']) { + process.env.AI_STREAM_KEEPALIVE_MS = bad; + expect(streamKeepAliveMs()).toBe(10_000); + } + }); +}); + +describe('isRetryableConnectError', () => { + it('matches connection-level codes on the error or its cause', () => { + expect(isRetryableConnectError({ cause: { code: 'ECONNRESET' } })).toBe(true); + expect(isRetryableConnectError({ cause: { code: 'UND_ERR_SOCKET' } })).toBe(true); + expect(isRetryableConnectError({ code: 'ECONNREFUSED' })).toBe(true); + }); + it('does NOT match aborts / unrelated errors', () => { + expect(isRetryableConnectError({ name: 'AbortError', cause: { code: 'ABORT_ERR' } })).toBe(false); + expect(isRetryableConnectError({ cause: { code: 'UND_ERR_HEADERS_TIMEOUT' } })).toBe(false); + expect(isRetryableConnectError(new Error('plain'))).toBe(false); + expect(isRetryableConnectError(undefined)).toBe(false); + }); +}); + describe('createStreamingFetch — against a delayed server', () => { const ORIG = process.env.AI_STREAM_TIMEOUT_MS; let server: http.Server; @@ -110,3 +152,84 @@ describe('createStreamingFetch — against a delayed server', () => { if (code) expect(code).toBe('UND_ERR_HEADERS_TIMEOUT'); }); }); + +describe('withPreResponseRetry', () => { + // The retry is the OUTERMOST layer (over the dispatcher-bound streaming fetch), + // matching ai.service's withPreResponseRetry(instrument(createStreamingFetch())). + // PRE_RESPONSE_CONNECT_RETRIES is 2 -> at most 3 total attempts. + const MAX_ATTEMPTS = 3; + let server: http.Server; + let url: string; + let requests = 0; + // 'first' resets only the first connection; 'all' resets every connection. + let resetMode: 'first' | 'all' = 'first'; + + const retryingFetch = () => withPreResponseRetry(createStreamingFetch()); + + beforeAll(async () => { + server = http.createServer((req, res) => { + requests += 1; + const shouldReset = resetMode === 'all' || requests === 1; + if (shouldReset) { + // Reset before any response byte (a poisoned/stale keep-alive socket). + const sock = req.socket as import('node:net').Socket & { + resetAndDestroy?: () => void; + }; + if (typeof sock.resetAndDestroy === 'function') sock.resetAndDestroy(); + else sock.destroy(); + return; + } + res.writeHead(200, { 'Content-Type': 'text/plain' }); + res.end('ok'); + }); + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); + const addr = server.address() as import('node:net').AddressInfo; + url = `http://127.0.0.1:${addr.port}/`; + }); + + afterAll(async () => { + await new Promise((resolve) => server.close(() => resolve())); + }); + + beforeEach(() => { + requests = 0; + resetMode = 'first'; + }); + + it('retries a pre-response reset on a fresh connection and succeeds', async () => { + resetMode = 'first'; + const res = await retryingFetch()(url); + expect(res.status).toBe(200); + expect(await res.text()).toBe('ok'); + // first request reset -> retry -> second request served. + expect(requests).toBe(2); + }); + + it('gives up after the retry bound and rethrows the original reset', async () => { + resetMode = 'all'; // every attempt resets -> retries exhaust + let caught: unknown; + try { + await retryingFetch()(url); + } catch (e) { + caught = e; + } + expect(caught).toBeDefined(); + // A retryable connection error reached the caller (not swallowed). + expect(isRetryableConnectError(caught)).toBe(true); + // Bounded: exactly PRE_RESPONSE_CONNECT_RETRIES + 1 attempts hit the server + // (pins both the limit and that the final error propagates — guards an + // off-by-one or an infinite loop). + expect(requests).toBe(MAX_ATTEMPTS); + }); + + it('does NOT retry an aborted request (no retry storm)', async () => { + resetMode = 'all'; + const ctrl = new AbortController(); + ctrl.abort(); + await expect( + retryingFetch()(url, { signal: ctrl.signal }), + ).rejects.toBeDefined(); + // Pre-aborted: the request never reached the server, so nothing was retried. + expect(requests).toBe(0); + }); +}); diff --git a/apps/server/src/integrations/ai/ai-streaming-fetch.ts b/apps/server/src/integrations/ai/ai-streaming-fetch.ts index f257fe4e..b781df9a 100644 --- a/apps/server/src/integrations/ai/ai-streaming-fetch.ts +++ b/apps/server/src/integrations/ai/ai-streaming-fetch.ts @@ -18,41 +18,139 @@ import { Agent } from 'undici'; */ const DEFAULT_STREAM_TIMEOUT_MS = 900_000; +/** + * Default keep-alive recycle window (10s). A pooled connection idle longer than + * this is CLOSED rather than reused. + * + * Long agent turns leave gaps of tens of seconds between provider calls (one + * call per step; a crawl/search tool runs in between). A NAT / reverse proxy / + * conntrack in front of the deployment silently drops an idle connection after + * its own timeout; undici, not knowing, then reuses that dead socket and the + * next request fails PRE-RESPONSE with `read ECONNRESET` (#175 prod telemetry: + * the resets correlate with idleSincePrevCall ~42s, while a direct path to the + * provider does NOT reset). Recycling idle sockets well below such a drop window + * means a long-gap call opens a fresh connection instead of reusing a stale one. + * `keepAliveMaxTimeout` also caps a server-advertised keep-alive so the provider + * cannot push the reuse window back up. + */ +const DEFAULT_STREAM_KEEPALIVE_MS = 10_000; + +/** + * How many times to retry a PRE-RESPONSE connection failure (a reset/timeout + * before ANY response byte) on a fresh connection. Safe because `fetch()` only + * rejects before the Response resolves — a started stream is never replayed. + */ +const PRE_RESPONSE_CONNECT_RETRIES = 2; + +/** undici cause codes for a connection-level failure that occurred PRE-RESPONSE. */ +const RETRYABLE_CONNECT_CODES = new Set([ + 'ECONNRESET', + 'ECONNREFUSED', + 'EPIPE', + 'ETIMEDOUT', + 'UND_ERR_SOCKET', + 'UND_ERR_CONNECT_TIMEOUT', +]); + +function positiveEnv(name: string, fallback: number): number { + const raw = Number(process.env[name]); + return Number.isFinite(raw) && raw > 0 ? raw : fallback; +} + /** * The configured silence timeout (ms). Override with `AI_STREAM_TIMEOUT_MS`; a * missing/invalid/non-positive value falls back to {@link DEFAULT_STREAM_TIMEOUT_MS}. */ export function streamTimeoutMs(): number { - const raw = Number(process.env.AI_STREAM_TIMEOUT_MS); - return Number.isFinite(raw) && raw > 0 ? raw : DEFAULT_STREAM_TIMEOUT_MS; + return positiveEnv('AI_STREAM_TIMEOUT_MS', DEFAULT_STREAM_TIMEOUT_MS); +} + +/** Keep-alive recycle window (ms). Override with `AI_STREAM_KEEPALIVE_MS`. */ +export function streamKeepAliveMs(): number { + return positiveEnv('AI_STREAM_KEEPALIVE_MS', DEFAULT_STREAM_KEEPALIVE_MS); } /** - * undici `Agent` timeout options for streaming AI traffic — both stream timeouts - * set to the (generous, finite) silence timeout. Shared by the chat provider - * fetch and the external-MCP dispatcher so they behave identically (#175). + * undici `Agent` options for streaming AI traffic — the (generous, finite) + * silence timeouts plus the keep-alive recycle window. Shared by the chat + * provider fetch and the external-MCP dispatcher so they behave identically. */ export function streamingDispatcherOptions(): { headersTimeout: number; bodyTimeout: number; + keepAliveTimeout: number; + keepAliveMaxTimeout: number; } { const t = streamTimeoutMs(); - return { headersTimeout: t, bodyTimeout: t }; + const ka = streamKeepAliveMs(); + return { + headersTimeout: t, + bodyTimeout: t, + keepAliveTimeout: ka, + keepAliveMaxTimeout: ka, + }; +} + +/** True for a connection-level error worth retrying on a fresh connection. */ +export function isRetryableConnectError(err: unknown): boolean { + const e = err as { code?: string; cause?: { code?: string } } | undefined; + const code = e?.cause?.code ?? e?.code; + return typeof code === 'string' && RETRYABLE_CONNECT_CODES.has(code); } /** * Build a `fetch` for long-lived streaming AI calls (the agent chat turn) backed - * by a dedicated undici dispatcher whose stream timeouts are the generous-but- - * finite silence timeout above (#175). A single shared dispatcher is returned - * (callers hold it for the service lifetime) so its connection pool is reused. + * by a dedicated undici dispatcher (finite silence timeouts + keep-alive + * recycling, #175). A single shared dispatcher is returned (callers hold it for + * the service lifetime) so its connection pool is reused. + * + * This is the BASE transport — no retry. The chat path wraps it as + * `withPreResponseRetry(createInstrumentedFetch(ctx, createStreamingFetch()))` + * so the retry is the OUTERMOST layer and the instrumentation observes EVERY + * attempt (a recovered reset is still logged — see withPreResponseRetry). */ export function createStreamingFetch(): typeof fetch { const dispatcher = new Agent(streamingDispatcherOptions()); return ((input: Parameters[0], init?: RequestInit) => fetch(input, { ...(init ?? {}), - // `dispatcher` is an undici-specific init field (not in the DOM RequestInit - // type); Node's global fetch reads it. Cast to satisfy the type. + // `dispatcher` is an undici-specific init field (not in the DOM + // RequestInit type); Node's global fetch reads it. Cast to satisfy it. dispatcher, } as RequestInit & { dispatcher: Agent })) as typeof fetch; } + +/** + * Wrap a fetch so a PRE-RESPONSE connection reset (`baseFetch` rejects before the + * Response resolves — so nothing has streamed) is retried a few times on a fresh + * connection (#175). A poisoned keep-alive socket is destroyed by undici on the + * reset, so the retry lands on a new connection. An abort (client disconnect) is + * never retried. + * + * This is the OUTERMOST transport layer by design: composing it as + * `withPreResponseRetry(instrumentedFetch)` means every attempt — including the + * resets that the retry recovers from — flows through the instrumentation, so the + * "PRE-RESPONSE FAILED ... ECONNRESET ... idleSincePrevCall" telemetry stays + * visible precisely when the fix is working (and AI_STREAM_KEEPALIVE_MS can be + * tuned from real data). A retry INSIDE the transport would hide it. + */ +export function withPreResponseRetry(baseFetch: typeof fetch): typeof fetch { + return (async (input: Parameters[0], init?: RequestInit) => { + for (let attempt = 0; ; attempt++) { + try { + return await baseFetch(input, init); + } catch (err) { + const aborted = init?.signal?.aborted === true; + if ( + aborted || + attempt >= PRE_RESPONSE_CONNECT_RETRIES || + !isRetryableConnectError(err) + ) { + throw err; + } + // Brief backoff before the fresh-connection retry. + await new Promise((resolve) => setTimeout(resolve, 150 * (attempt + 1))); + } + } + }) as typeof fetch; +} diff --git a/apps/server/src/integrations/ai/ai.service.ts b/apps/server/src/integrations/ai/ai.service.ts index 18f15b5d..16aa6997 100644 --- a/apps/server/src/integrations/ai/ai.service.ts +++ b/apps/server/src/integrations/ai/ai.service.ts @@ -16,7 +16,10 @@ import { AiEmbeddingNotConfiguredException } from './ai-embedding-not-configured import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception'; import { describeProviderError } from './ai-error.util'; import { createInstrumentedFetch } from './ai-provider-http'; -import { createStreamingFetch } from './ai-streaming-fetch'; +import { + createStreamingFetch, + withPreResponseRetry, +} from './ai-streaming-fetch'; import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo'; import { SecretBoxService } from '../crypto/secret-box'; import { AiDriver } from './ai.types'; @@ -46,14 +49,15 @@ export interface ChatModelOverride { export class AiService { private readonly logger = new Logger(AiService.name); - // Provider HTTP fetch for the chat path: the streaming fetch — which RAISES - // undici's 300s headers/body timeouts to a generous-but-finite silence timeout - // so a long agent turn is not severed mid-stream (#175) — wrapped with the - // provider-HTTP instrumentation so the logs observe that exact transport. Held - // for the service lifetime to reuse the streaming dispatcher's connection pool. - private readonly aiProviderFetch = createInstrumentedFetch( - 'AiService:provider-http', - createStreamingFetch(), + // Provider HTTP fetch for the chat path, layered so each transport concern is + // observed (#175). Inside-out: the streaming fetch (finite silence timeouts + + // keep-alive recycling) → provider-HTTP instrumentation (logs every attempt) → + // pre-response connection-reset retry as the OUTERMOST layer. Retry-outer means + // a reset the retry recovers from is still logged with its idle-gap, instead of + // collapsing into a clean "OK". Held for the service lifetime to reuse the + // streaming dispatcher's connection pool. + private readonly aiProviderFetch = withPreResponseRetry( + createInstrumentedFetch('AiService:provider-http', createStreamingFetch()), ); constructor(