fix(ai-chat): recycle keep-alive sockets + retry pre-response resets (#175) #179
@@ -142,6 +142,13 @@ MCP_DOCMOST_PASSWORD=
|
|||||||
# provider is eventually broken instead of leaking forever. Default 900000 (15 min).
|
# provider is eventually broken instead of leaking forever. Default 900000 (15 min).
|
||||||
# AI_STREAM_TIMEOUT_MS=900000
|
# AI_STREAM_TIMEOUT_MS=900000
|
||||||
|
|
||||||
|
# Keep-alive recycle window (ms) for streaming chat/agent AI + external-MCP calls.
|
||||||
|
# A pooled connection idle longer than this is closed instead of reused, so a
|
||||||
|
# NAT / egress firewall / reverse proxy that silently drops idle connections
|
||||||
|
# cannot poison a reused socket into a PRE-RESPONSE `read ECONNRESET`. Lower it if
|
||||||
|
# your egress drops idle connections faster than ~10s. Default 10000 (10 s).
|
||||||
|
# AI_STREAM_KEEPALIVE_MS=10000
|
||||||
|
|
||||||
# --- Anonymous public-share AI assistant ---
|
# --- Anonymous public-share AI assistant ---
|
||||||
# Opt-in per workspace (AI settings -> "public share assistant"; off by default).
|
# Opt-in per workspace (AI settings -> "public share assistant"; off by default).
|
||||||
# When enabled, anonymous visitors of a published share can ask an AI about that
|
# When enabled, anonymous visitors of a published share can ask an AI about that
|
||||||
|
|||||||
@@ -1,8 +1,11 @@
|
|||||||
import * as http from 'node:http';
|
import * as http from 'node:http';
|
||||||
import {
|
import {
|
||||||
createStreamingFetch,
|
createStreamingFetch,
|
||||||
|
withPreResponseRetry,
|
||||||
streamTimeoutMs,
|
streamTimeoutMs,
|
||||||
|
streamKeepAliveMs,
|
||||||
streamingDispatcherOptions,
|
streamingDispatcherOptions,
|
||||||
|
isRetryableConnectError,
|
||||||
} from './ai-streaming-fetch';
|
} from './ai-streaming-fetch';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -38,15 +41,54 @@ describe('streamTimeoutMs', () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
it('applies the timeout to BOTH undici stream timeouts', () => {
|
it('applies the silence timeout + keep-alive recycle window to the dispatcher', () => {
|
||||||
delete process.env.AI_STREAM_TIMEOUT_MS;
|
delete process.env.AI_STREAM_TIMEOUT_MS;
|
||||||
|
delete process.env.AI_STREAM_KEEPALIVE_MS;
|
||||||
expect(streamingDispatcherOptions()).toEqual({
|
expect(streamingDispatcherOptions()).toEqual({
|
||||||
headersTimeout: 900_000,
|
headersTimeout: 900_000,
|
||||||
bodyTimeout: 900_000,
|
bodyTimeout: 900_000,
|
||||||
|
keepAliveTimeout: 10_000,
|
||||||
|
keepAliveMaxTimeout: 10_000,
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('streamKeepAliveMs', () => {
|
||||||
|
const ORIG = process.env.AI_STREAM_KEEPALIVE_MS;
|
||||||
|
afterEach(() => {
|
||||||
|
if (ORIG === undefined) delete process.env.AI_STREAM_KEEPALIVE_MS;
|
||||||
|
else process.env.AI_STREAM_KEEPALIVE_MS = ORIG;
|
||||||
|
});
|
||||||
|
|
||||||
|
it('defaults to 10s (recycle idle sockets so a NAT/proxy drop cannot poison reuse)', () => {
|
||||||
|
delete process.env.AI_STREAM_KEEPALIVE_MS;
|
||||||
|
expect(streamKeepAliveMs()).toBe(10_000);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('honours a positive override and ignores invalid/non-positive', () => {
|
||||||
|
process.env.AI_STREAM_KEEPALIVE_MS = '4000';
|
||||||
|
expect(streamKeepAliveMs()).toBe(4000);
|
||||||
|
for (const bad of ['0', '-1', 'x', '']) {
|
||||||
|
process.env.AI_STREAM_KEEPALIVE_MS = bad;
|
||||||
|
expect(streamKeepAliveMs()).toBe(10_000);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('isRetryableConnectError', () => {
|
||||||
|
it('matches connection-level codes on the error or its cause', () => {
|
||||||
|
expect(isRetryableConnectError({ cause: { code: 'ECONNRESET' } })).toBe(true);
|
||||||
|
expect(isRetryableConnectError({ cause: { code: 'UND_ERR_SOCKET' } })).toBe(true);
|
||||||
|
expect(isRetryableConnectError({ code: 'ECONNREFUSED' })).toBe(true);
|
||||||
|
});
|
||||||
|
it('does NOT match aborts / unrelated errors', () => {
|
||||||
|
expect(isRetryableConnectError({ name: 'AbortError', cause: { code: 'ABORT_ERR' } })).toBe(false);
|
||||||
|
expect(isRetryableConnectError({ cause: { code: 'UND_ERR_HEADERS_TIMEOUT' } })).toBe(false);
|
||||||
|
expect(isRetryableConnectError(new Error('plain'))).toBe(false);
|
||||||
|
expect(isRetryableConnectError(undefined)).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('createStreamingFetch — against a delayed server', () => {
|
describe('createStreamingFetch — against a delayed server', () => {
|
||||||
const ORIG = process.env.AI_STREAM_TIMEOUT_MS;
|
const ORIG = process.env.AI_STREAM_TIMEOUT_MS;
|
||||||
let server: http.Server;
|
let server: http.Server;
|
||||||
@@ -110,3 +152,84 @@ describe('createStreamingFetch — against a delayed server', () => {
|
|||||||
if (code) expect(code).toBe('UND_ERR_HEADERS_TIMEOUT');
|
if (code) expect(code).toBe('UND_ERR_HEADERS_TIMEOUT');
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('withPreResponseRetry', () => {
|
||||||
|
// The retry is the OUTERMOST layer (over the dispatcher-bound streaming fetch),
|
||||||
|
// matching ai.service's withPreResponseRetry(instrument(createStreamingFetch())).
|
||||||
|
// PRE_RESPONSE_CONNECT_RETRIES is 2 -> at most 3 total attempts.
|
||||||
|
const MAX_ATTEMPTS = 3;
|
||||||
|
let server: http.Server;
|
||||||
|
let url: string;
|
||||||
|
let requests = 0;
|
||||||
|
// 'first' resets only the first connection; 'all' resets every connection.
|
||||||
|
let resetMode: 'first' | 'all' = 'first';
|
||||||
|
|
||||||
|
const retryingFetch = () => withPreResponseRetry(createStreamingFetch());
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
server = http.createServer((req, res) => {
|
||||||
|
requests += 1;
|
||||||
|
const shouldReset = resetMode === 'all' || requests === 1;
|
||||||
|
if (shouldReset) {
|
||||||
|
// Reset before any response byte (a poisoned/stale keep-alive socket).
|
||||||
|
const sock = req.socket as import('node:net').Socket & {
|
||||||
|
resetAndDestroy?: () => void;
|
||||||
|
};
|
||||||
|
if (typeof sock.resetAndDestroy === 'function') sock.resetAndDestroy();
|
||||||
|
else sock.destroy();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
res.writeHead(200, { 'Content-Type': 'text/plain' });
|
||||||
|
res.end('ok');
|
||||||
|
});
|
||||||
|
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
|
||||||
|
const addr = server.address() as import('node:net').AddressInfo;
|
||||||
|
url = `http://127.0.0.1:${addr.port}/`;
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
await new Promise<void>((resolve) => server.close(() => resolve()));
|
||||||
|
});
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
requests = 0;
|
||||||
|
resetMode = 'first';
|
||||||
|
});
|
||||||
|
|
||||||
|
it('retries a pre-response reset on a fresh connection and succeeds', async () => {
|
||||||
|
resetMode = 'first';
|
||||||
|
const res = await retryingFetch()(url);
|
||||||
|
expect(res.status).toBe(200);
|
||||||
|
expect(await res.text()).toBe('ok');
|
||||||
|
// first request reset -> retry -> second request served.
|
||||||
|
expect(requests).toBe(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('gives up after the retry bound and rethrows the original reset', async () => {
|
||||||
|
resetMode = 'all'; // every attempt resets -> retries exhaust
|
||||||
|
let caught: unknown;
|
||||||
|
try {
|
||||||
|
await retryingFetch()(url);
|
||||||
|
} catch (e) {
|
||||||
|
caught = e;
|
||||||
|
}
|
||||||
|
expect(caught).toBeDefined();
|
||||||
|
// A retryable connection error reached the caller (not swallowed).
|
||||||
|
expect(isRetryableConnectError(caught)).toBe(true);
|
||||||
|
// Bounded: exactly PRE_RESPONSE_CONNECT_RETRIES + 1 attempts hit the server
|
||||||
|
// (pins both the limit and that the final error propagates — guards an
|
||||||
|
// off-by-one or an infinite loop).
|
||||||
|
expect(requests).toBe(MAX_ATTEMPTS);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does NOT retry an aborted request (no retry storm)', async () => {
|
||||||
|
resetMode = 'all';
|
||||||
|
const ctrl = new AbortController();
|
||||||
|
ctrl.abort();
|
||||||
|
await expect(
|
||||||
|
retryingFetch()(url, { signal: ctrl.signal }),
|
||||||
|
).rejects.toBeDefined();
|
||||||
|
// Pre-aborted: the request never reached the server, so nothing was retried.
|
||||||
|
expect(requests).toBe(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@@ -18,41 +18,139 @@ import { Agent } from 'undici';
|
|||||||
*/
|
*/
|
||||||
const DEFAULT_STREAM_TIMEOUT_MS = 900_000;
|
const DEFAULT_STREAM_TIMEOUT_MS = 900_000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default keep-alive recycle window (10s). A pooled connection idle longer than
|
||||||
|
* this is CLOSED rather than reused.
|
||||||
|
*
|
||||||
|
* Long agent turns leave gaps of tens of seconds between provider calls (one
|
||||||
|
* call per step; a crawl/search tool runs in between). A NAT / reverse proxy /
|
||||||
|
* conntrack in front of the deployment silently drops an idle connection after
|
||||||
|
* its own timeout; undici, not knowing, then reuses that dead socket and the
|
||||||
|
* next request fails PRE-RESPONSE with `read ECONNRESET` (#175 prod telemetry:
|
||||||
|
* the resets correlate with idleSincePrevCall ~42s, while a direct path to the
|
||||||
|
* provider does NOT reset). Recycling idle sockets well below such a drop window
|
||||||
|
* means a long-gap call opens a fresh connection instead of reusing a stale one.
|
||||||
|
* `keepAliveMaxTimeout` also caps a server-advertised keep-alive so the provider
|
||||||
|
* cannot push the reuse window back up.
|
||||||
|
*/
|
||||||
|
const DEFAULT_STREAM_KEEPALIVE_MS = 10_000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How many times to retry a PRE-RESPONSE connection failure (a reset/timeout
|
||||||
|
* before ANY response byte) on a fresh connection. Safe because `fetch()` only
|
||||||
|
* rejects before the Response resolves — a started stream is never replayed.
|
||||||
|
*/
|
||||||
|
const PRE_RESPONSE_CONNECT_RETRIES = 2;
|
||||||
|
|
||||||
|
/** undici cause codes for a connection-level failure that occurred PRE-RESPONSE. */
|
||||||
|
const RETRYABLE_CONNECT_CODES = new Set([
|
||||||
|
'ECONNRESET',
|
||||||
|
'ECONNREFUSED',
|
||||||
|
'EPIPE',
|
||||||
|
'ETIMEDOUT',
|
||||||
|
'UND_ERR_SOCKET',
|
||||||
|
'UND_ERR_CONNECT_TIMEOUT',
|
||||||
|
]);
|
||||||
|
|
||||||
|
function positiveEnv(name: string, fallback: number): number {
|
||||||
|
const raw = Number(process.env[name]);
|
||||||
|
return Number.isFinite(raw) && raw > 0 ? raw : fallback;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The configured silence timeout (ms). Override with `AI_STREAM_TIMEOUT_MS`; a
|
* The configured silence timeout (ms). Override with `AI_STREAM_TIMEOUT_MS`; a
|
||||||
* missing/invalid/non-positive value falls back to {@link DEFAULT_STREAM_TIMEOUT_MS}.
|
* missing/invalid/non-positive value falls back to {@link DEFAULT_STREAM_TIMEOUT_MS}.
|
||||||
*/
|
*/
|
||||||
export function streamTimeoutMs(): number {
|
export function streamTimeoutMs(): number {
|
||||||
const raw = Number(process.env.AI_STREAM_TIMEOUT_MS);
|
return positiveEnv('AI_STREAM_TIMEOUT_MS', DEFAULT_STREAM_TIMEOUT_MS);
|
||||||
return Number.isFinite(raw) && raw > 0 ? raw : DEFAULT_STREAM_TIMEOUT_MS;
|
}
|
||||||
|
|
||||||
|
/** Keep-alive recycle window (ms). Override with `AI_STREAM_KEEPALIVE_MS`. */
|
||||||
|
export function streamKeepAliveMs(): number {
|
||||||
|
return positiveEnv('AI_STREAM_KEEPALIVE_MS', DEFAULT_STREAM_KEEPALIVE_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* undici `Agent` timeout options for streaming AI traffic — both stream timeouts
|
* undici `Agent` options for streaming AI traffic — the (generous, finite)
|
||||||
* set to the (generous, finite) silence timeout. Shared by the chat provider
|
* silence timeouts plus the keep-alive recycle window. Shared by the chat
|
||||||
* fetch and the external-MCP dispatcher so they behave identically (#175).
|
* provider fetch and the external-MCP dispatcher so they behave identically.
|
||||||
*/
|
*/
|
||||||
export function streamingDispatcherOptions(): {
|
export function streamingDispatcherOptions(): {
|
||||||
headersTimeout: number;
|
headersTimeout: number;
|
||||||
bodyTimeout: number;
|
bodyTimeout: number;
|
||||||
|
keepAliveTimeout: number;
|
||||||
|
keepAliveMaxTimeout: number;
|
||||||
} {
|
} {
|
||||||
const t = streamTimeoutMs();
|
const t = streamTimeoutMs();
|
||||||
return { headersTimeout: t, bodyTimeout: t };
|
const ka = streamKeepAliveMs();
|
||||||
|
return {
|
||||||
|
headersTimeout: t,
|
||||||
|
bodyTimeout: t,
|
||||||
|
keepAliveTimeout: ka,
|
||||||
|
keepAliveMaxTimeout: ka,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/** True for a connection-level error worth retrying on a fresh connection. */
|
||||||
|
export function isRetryableConnectError(err: unknown): boolean {
|
||||||
|
const e = err as { code?: string; cause?: { code?: string } } | undefined;
|
||||||
|
const code = e?.cause?.code ?? e?.code;
|
||||||
|
return typeof code === 'string' && RETRYABLE_CONNECT_CODES.has(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build a `fetch` for long-lived streaming AI calls (the agent chat turn) backed
|
* Build a `fetch` for long-lived streaming AI calls (the agent chat turn) backed
|
||||||
* by a dedicated undici dispatcher whose stream timeouts are the generous-but-
|
* by a dedicated undici dispatcher (finite silence timeouts + keep-alive
|
||||||
* finite silence timeout above (#175). A single shared dispatcher is returned
|
* recycling, #175). A single shared dispatcher is returned (callers hold it for
|
||||||
* (callers hold it for the service lifetime) so its connection pool is reused.
|
* the service lifetime) so its connection pool is reused.
|
||||||
|
*
|
||||||
|
* This is the BASE transport — no retry. The chat path wraps it as
|
||||||
|
* `withPreResponseRetry(createInstrumentedFetch(ctx, createStreamingFetch()))`
|
||||||
|
* so the retry is the OUTERMOST layer and the instrumentation observes EVERY
|
||||||
|
* attempt (a recovered reset is still logged — see withPreResponseRetry).
|
||||||
*/
|
*/
|
||||||
export function createStreamingFetch(): typeof fetch {
|
export function createStreamingFetch(): typeof fetch {
|
||||||
const dispatcher = new Agent(streamingDispatcherOptions());
|
const dispatcher = new Agent(streamingDispatcherOptions());
|
||||||
return ((input: Parameters<typeof fetch>[0], init?: RequestInit) =>
|
return ((input: Parameters<typeof fetch>[0], init?: RequestInit) =>
|
||||||
fetch(input, {
|
fetch(input, {
|
||||||
...(init ?? {}),
|
...(init ?? {}),
|
||||||
// `dispatcher` is an undici-specific init field (not in the DOM RequestInit
|
// `dispatcher` is an undici-specific init field (not in the DOM
|
||||||
// type); Node's global fetch reads it. Cast to satisfy the type.
|
// RequestInit type); Node's global fetch reads it. Cast to satisfy it.
|
||||||
dispatcher,
|
dispatcher,
|
||||||
} as RequestInit & { dispatcher: Agent })) as typeof fetch;
|
} as RequestInit & { dispatcher: Agent })) as typeof fetch;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap a fetch so a PRE-RESPONSE connection reset (`baseFetch` rejects before the
|
||||||
|
* Response resolves — so nothing has streamed) is retried a few times on a fresh
|
||||||
|
* connection (#175). A poisoned keep-alive socket is destroyed by undici on the
|
||||||
|
* reset, so the retry lands on a new connection. An abort (client disconnect) is
|
||||||
|
* never retried.
|
||||||
|
*
|
||||||
|
* This is the OUTERMOST transport layer by design: composing it as
|
||||||
|
* `withPreResponseRetry(instrumentedFetch)` means every attempt — including the
|
||||||
|
* resets that the retry recovers from — flows through the instrumentation, so the
|
||||||
|
* "PRE-RESPONSE FAILED ... ECONNRESET ... idleSincePrevCall" telemetry stays
|
||||||
|
* visible precisely when the fix is working (and AI_STREAM_KEEPALIVE_MS can be
|
||||||
|
* tuned from real data). A retry INSIDE the transport would hide it.
|
||||||
|
*/
|
||||||
|
export function withPreResponseRetry(baseFetch: typeof fetch): typeof fetch {
|
||||||
|
return (async (input: Parameters<typeof fetch>[0], init?: RequestInit) => {
|
||||||
|
for (let attempt = 0; ; attempt++) {
|
||||||
|
try {
|
||||||
|
return await baseFetch(input, init);
|
||||||
|
} catch (err) {
|
||||||
|
const aborted = init?.signal?.aborted === true;
|
||||||
|
if (
|
||||||
|
aborted ||
|
||||||
|
attempt >= PRE_RESPONSE_CONNECT_RETRIES ||
|
||||||
|
!isRetryableConnectError(err)
|
||||||
|
) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
// Brief backoff before the fresh-connection retry.
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 150 * (attempt + 1)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}) as typeof fetch;
|
||||||
|
}
|
||||||
|
|||||||
@@ -16,7 +16,10 @@ import { AiEmbeddingNotConfiguredException } from './ai-embedding-not-configured
|
|||||||
import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception';
|
import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception';
|
||||||
import { describeProviderError } from './ai-error.util';
|
import { describeProviderError } from './ai-error.util';
|
||||||
import { createInstrumentedFetch } from './ai-provider-http';
|
import { createInstrumentedFetch } from './ai-provider-http';
|
||||||
import { createStreamingFetch } from './ai-streaming-fetch';
|
import {
|
||||||
|
createStreamingFetch,
|
||||||
|
withPreResponseRetry,
|
||||||
|
} from './ai-streaming-fetch';
|
||||||
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
|
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
|
||||||
import { SecretBoxService } from '../crypto/secret-box';
|
import { SecretBoxService } from '../crypto/secret-box';
|
||||||
import { AiDriver } from './ai.types';
|
import { AiDriver } from './ai.types';
|
||||||
@@ -46,14 +49,15 @@ export interface ChatModelOverride {
|
|||||||
export class AiService {
|
export class AiService {
|
||||||
private readonly logger = new Logger(AiService.name);
|
private readonly logger = new Logger(AiService.name);
|
||||||
|
|
||||||
// Provider HTTP fetch for the chat path: the streaming fetch — which RAISES
|
// Provider HTTP fetch for the chat path, layered so each transport concern is
|
||||||
// undici's 300s headers/body timeouts to a generous-but-finite silence timeout
|
// observed (#175). Inside-out: the streaming fetch (finite silence timeouts +
|
||||||
// so a long agent turn is not severed mid-stream (#175) — wrapped with the
|
// keep-alive recycling) → provider-HTTP instrumentation (logs every attempt) →
|
||||||
// provider-HTTP instrumentation so the logs observe that exact transport. Held
|
// pre-response connection-reset retry as the OUTERMOST layer. Retry-outer means
|
||||||
// for the service lifetime to reuse the streaming dispatcher's connection pool.
|
// a reset the retry recovers from is still logged with its idle-gap, instead of
|
||||||
private readonly aiProviderFetch = createInstrumentedFetch(
|
// collapsing into a clean "OK". Held for the service lifetime to reuse the
|
||||||
'AiService:provider-http',
|
// streaming dispatcher's connection pool.
|
||||||
createStreamingFetch(),
|
private readonly aiProviderFetch = withPreResponseRetry(
|
||||||
|
createInstrumentedFetch('AiService:provider-http', createStreamingFetch()),
|
||||||
);
|
);
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
|
|||||||
Reference in New Issue
Block a user