fix(ai-chat): raise undici's 300s stream timeout for long agent turns (#175)
Long research turns failed mid-task with "Lost connection to the AI provider".
Node's global fetch (undici) defaults BOTH headersTimeout and bodyTimeout to
300_000ms, and the chat provider + the external-MCP dispatcher both ran on it
with no override, so:
- the z.ai chat stream dropped when a late step's huge accumulated context
pushed the model's time-to-first-token past 5 min (the model reasons
server-side with NO streamed reasoning, so the connection is silent until the
first answer token — reproduced: even a trivial glm-5.2 query has a ~4-8s
first-chunk gap; a long run reaches 400k+-token steps), or a reasoning model
paused >5 min between chunks (bodyTimeout);
- the crawl4ai SSE transport, held open across the whole turn, dropped when it
idled >5 min between tool calls.
Fix: a dedicated undici dispatcher whose stream timeouts are raised to a
generous-but-FINITE silence timeout (default 15 min, AI_STREAM_TIMEOUT_MS) on
each path. NOT disabled (0): that would let a genuinely hung provider — with the
client still connected — hang forever, since the turn's abortSignal only fires on
client disconnect. The timeout bounds SILENCE (time-to-first-byte and the gap
BETWEEN chunks), NOT total turn duration, so an arbitrarily long turn that keeps
streaming is never cut; only a stream quiet for >15 min is treated as a hang.
- ai-streaming-fetch.ts: createStreamingFetch() + streamTimeoutMs() /
streamingDispatcherOptions() (the shared, configurable timeout).
- ai.service: the chat provider fetch is createStreamingFetch(), wrapped by the
existing passive ECONNRESET telemetry (createDiagnosticFetch gained an
optional baseFetch) so the telemetry observes the SAME transport.
- mcp-clients: the SSRF-pinned Agent uses streamingDispatcherOptions().
Investigation: reproduced the transport mechanism against the real z.ai endpoint
(a 1ms headersTimeout throws UND_ERR_HEADERS_TIMEOUT — the exact drop) and ran
the actual research agent to a ~428k-token context. Verified the fixed path
streams cleanly live (glm-5.2 turns finish; telemetry confirms the streaming
fetch is in use).
Tests: ai-streaming-fetch.spec (default 15m + env override + invalid fallback +
both-timeouts + streams a delayed response); ai-http-diagnostics + ai/mcp specs
green. server tsc clean.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -6,6 +6,7 @@ import { createMCPClient } from '@ai-sdk/mcp';
|
||||
import { Agent, type Dispatcher } from 'undici';
|
||||
import { AiMcpServerRepo } from '@docmost/db/repos/ai-chat/ai-mcp-server.repo';
|
||||
import { AiMcpServer } from '@docmost/db/types/entity.types';
|
||||
import { streamingDispatcherOptions } from '../../../integrations/ai/ai-streaming-fetch';
|
||||
import { SecretBoxService } from '../../../integrations/crypto/secret-box';
|
||||
import { isUrlAllowed, isIpAllowed } from './ssrf-guard';
|
||||
|
||||
@@ -400,6 +401,16 @@ export function validateResolvedAddresses(
|
||||
*/
|
||||
function buildPinnedDispatcher(): Agent {
|
||||
return new Agent({
|
||||
// Raise undici's default 300s headers/body timeouts on external MCP traffic
|
||||
// to the same generous-but-finite silence timeout the chat fetch uses (#175).
|
||||
// A long agent turn keeps an SSE transport (e.g. crawl4ai's /mcp/sse) open
|
||||
// across the whole turn; that connection can idle BETWEEN tool calls longer
|
||||
// than 5 min, and undici's bodyTimeout would otherwise sever it mid-task — a
|
||||
// tool-call failure that aborts the streamed turn and shows the user "Lost
|
||||
// connection to the AI provider". A slow single tool call (a crawl) can
|
||||
// likewise exceed headersTimeout. The timeout stays FINITE so a genuinely
|
||||
// hung server is still broken eventually.
|
||||
...streamingDispatcherOptions(),
|
||||
connect: {
|
||||
lookup: (hostname, _options, callback) => {
|
||||
// Always resolve ALL addresses ourselves; do not trust the caller's
|
||||
|
||||
@@ -25,7 +25,13 @@ import { Logger } from '@nestjs/common';
|
||||
* The seq/last-call timestamps are module-level, so under concurrent turns the
|
||||
* idle-gap figure is approximate (fine for single-user reproduction).
|
||||
*/
|
||||
export function createDiagnosticFetch(context: string): typeof fetch {
|
||||
export function createDiagnosticFetch(
|
||||
context: string,
|
||||
// The underlying fetch to instrument. Defaults to the global fetch; the chat
|
||||
// provider passes a streaming fetch (disabled undici stream timeouts, #175) so
|
||||
// the telemetry observes the SAME transport the long agent turn actually uses.
|
||||
baseFetch: typeof fetch = fetch,
|
||||
): typeof fetch {
|
||||
const logger = new Logger(context);
|
||||
let callSeq = 0;
|
||||
let lastCallStartedAt: number | undefined;
|
||||
@@ -46,9 +52,9 @@ export function createDiagnosticFetch(context: string): typeof fetch {
|
||||
? body.byteLength
|
||||
: undefined;
|
||||
try {
|
||||
// Delegate to global fetch; return the Response UNTOUCHED (never read/clone
|
||||
// the body) so the streamed SSE response is unaffected.
|
||||
const res = await fetch(input, init);
|
||||
// Delegate to the base fetch; return the Response UNTOUCHED (never read/
|
||||
// clone the body) so the streamed SSE response is unaffected.
|
||||
const res = await baseFetch(input, init);
|
||||
logger.log(
|
||||
`provider HTTP DIAGNOSTIC: call#${callId} OK ` +
|
||||
`headersAfter=${Date.now() - startedAt}ms status=${res.status} ` +
|
||||
|
||||
78
apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts
Normal file
78
apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts
Normal file
@@ -0,0 +1,78 @@
|
||||
import * as http from 'node:http';
|
||||
import {
|
||||
createStreamingFetch,
|
||||
streamTimeoutMs,
|
||||
streamingDispatcherOptions,
|
||||
} from './ai-streaming-fetch';
|
||||
|
||||
/**
|
||||
* #175: undici's default 300s headers/body timeouts severed long agent turns.
|
||||
* The streaming fetch raises them to a generous-but-FINITE silence timeout (not
|
||||
* 0 — a true hang must still break). We pin: the configured value + env override,
|
||||
* that both dispatcher timeouts use it, and that a delayed response streams.
|
||||
*/
|
||||
describe('streamTimeoutMs', () => {
|
||||
const ORIG = process.env.AI_STREAM_TIMEOUT_MS;
|
||||
afterEach(() => {
|
||||
if (ORIG === undefined) delete process.env.AI_STREAM_TIMEOUT_MS;
|
||||
else process.env.AI_STREAM_TIMEOUT_MS = ORIG;
|
||||
});
|
||||
|
||||
it('defaults to a generous-but-finite 15 minutes', () => {
|
||||
delete process.env.AI_STREAM_TIMEOUT_MS;
|
||||
expect(streamTimeoutMs()).toBe(900_000);
|
||||
// Finite — NOT disabled (0 would let a hung provider leak forever).
|
||||
expect(streamTimeoutMs()).toBeGreaterThan(0);
|
||||
expect(Number.isFinite(streamTimeoutMs())).toBe(true);
|
||||
});
|
||||
|
||||
it('honours a positive AI_STREAM_TIMEOUT_MS override', () => {
|
||||
process.env.AI_STREAM_TIMEOUT_MS = '120000';
|
||||
expect(streamTimeoutMs()).toBe(120000);
|
||||
});
|
||||
|
||||
it('ignores an invalid / non-positive override (falls back to default)', () => {
|
||||
for (const bad of ['0', '-5', 'abc', '']) {
|
||||
process.env.AI_STREAM_TIMEOUT_MS = bad;
|
||||
expect(streamTimeoutMs()).toBe(900_000);
|
||||
}
|
||||
});
|
||||
|
||||
it('applies the timeout to BOTH undici stream timeouts', () => {
|
||||
delete process.env.AI_STREAM_TIMEOUT_MS;
|
||||
expect(streamingDispatcherOptions()).toEqual({
|
||||
headersTimeout: 900_000,
|
||||
bodyTimeout: 900_000,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('createStreamingFetch — against a delayed server', () => {
|
||||
let server: http.Server;
|
||||
let url: string;
|
||||
// The server waits before sending ANY byte (a long time-to-first-token).
|
||||
const DELAY = 400;
|
||||
|
||||
beforeAll(async () => {
|
||||
server = http.createServer((_req, res) => {
|
||||
setTimeout(() => {
|
||||
res.writeHead(200, { 'Content-Type': 'text/plain' });
|
||||
res.end('ok');
|
||||
}, DELAY);
|
||||
});
|
||||
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
|
||||
const addr = server.address() as import('node:net').AddressInfo;
|
||||
url = `http://127.0.0.1:${addr.port}/`;
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await new Promise<void>((resolve) => server.close(() => resolve()));
|
||||
});
|
||||
|
||||
it('streams the delayed response instead of timing out', async () => {
|
||||
const streamingFetch = createStreamingFetch();
|
||||
const res = await streamingFetch(url);
|
||||
expect(res.status).toBe(200);
|
||||
expect(await res.text()).toBe('ok');
|
||||
});
|
||||
});
|
||||
58
apps/server/src/integrations/ai/ai-streaming-fetch.ts
Normal file
58
apps/server/src/integrations/ai/ai-streaming-fetch.ts
Normal file
@@ -0,0 +1,58 @@
|
||||
import { Agent } from 'undici';
|
||||
|
||||
/**
|
||||
* Default SILENCE timeout for streaming AI calls (15 min). Generous, but FINITE.
|
||||
*
|
||||
* Node's global fetch (undici) defaults headersTimeout and bodyTimeout to
|
||||
* 300_000ms, which severed legitimate long agent turns mid-stream — surfacing as
|
||||
* "Lost connection to the AI provider" (#175): a late step with a huge context
|
||||
* pushes the model's time-to-first-token past 5 min, or a reasoning model pauses
|
||||
* >5 min between chunks. We do NOT disable the timeout (0) — that would let a
|
||||
* genuinely hung provider, with the client still connected, hang forever
|
||||
* (abortSignal only fires on client disconnect). Instead we raise it well above
|
||||
* any realistic gap while keeping it finite so a true hang is eventually broken.
|
||||
*
|
||||
* This bounds SILENCE (time-to-first-byte and the gap BETWEEN chunks), NOT total
|
||||
* turn duration — so an arbitrarily long turn that keeps streaming bytes is never
|
||||
* cut; only a stream that goes quiet for longer than this is treated as a hang.
|
||||
*/
|
||||
const DEFAULT_STREAM_TIMEOUT_MS = 900_000;
|
||||
|
||||
/**
|
||||
* The configured silence timeout (ms). Override with `AI_STREAM_TIMEOUT_MS`; a
|
||||
* missing/invalid/non-positive value falls back to {@link DEFAULT_STREAM_TIMEOUT_MS}.
|
||||
*/
|
||||
export function streamTimeoutMs(): number {
|
||||
const raw = Number(process.env.AI_STREAM_TIMEOUT_MS);
|
||||
return Number.isFinite(raw) && raw > 0 ? raw : DEFAULT_STREAM_TIMEOUT_MS;
|
||||
}
|
||||
|
||||
/**
|
||||
* undici `Agent` timeout options for streaming AI traffic — both stream timeouts
|
||||
* set to the (generous, finite) silence timeout. Shared by the chat provider
|
||||
* fetch and the external-MCP dispatcher so they behave identically (#175).
|
||||
*/
|
||||
export function streamingDispatcherOptions(): {
|
||||
headersTimeout: number;
|
||||
bodyTimeout: number;
|
||||
} {
|
||||
const t = streamTimeoutMs();
|
||||
return { headersTimeout: t, bodyTimeout: t };
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a `fetch` for long-lived streaming AI calls (the agent chat turn) backed
|
||||
* by a dedicated undici dispatcher whose stream timeouts are the generous-but-
|
||||
* finite silence timeout above (#175). A single shared dispatcher is returned
|
||||
* (callers hold it for the service lifetime) so its connection pool is reused.
|
||||
*/
|
||||
export function createStreamingFetch(): typeof fetch {
|
||||
const dispatcher = new Agent(streamingDispatcherOptions());
|
||||
return ((input: Parameters<typeof fetch>[0], init?: RequestInit) =>
|
||||
fetch(input, {
|
||||
...(init ?? {}),
|
||||
// `dispatcher` is an undici-specific init field (not in the DOM RequestInit
|
||||
// type); Node's global fetch reads it. Cast to satisfy the type.
|
||||
dispatcher,
|
||||
} as RequestInit & { dispatcher: Agent })) as typeof fetch;
|
||||
}
|
||||
@@ -16,6 +16,7 @@ import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception';
|
||||
import { describeProviderError } from './ai-error.util';
|
||||
// DIAGNOSTIC (provider ECONNRESET investigation) — temporary.
|
||||
import { createDiagnosticFetch } from './ai-http-diagnostics';
|
||||
import { createStreamingFetch } from './ai-streaming-fetch';
|
||||
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
|
||||
import { SecretBoxService } from '../crypto/secret-box';
|
||||
import { AiDriver } from './ai.types';
|
||||
@@ -45,11 +46,14 @@ export interface ChatModelOverride {
|
||||
export class AiService {
|
||||
private readonly logger = new Logger(AiService.name);
|
||||
|
||||
// DIAGNOSTIC (provider ECONNRESET investigation) — temporary: passive
|
||||
// instrumentation of the OpenAI-compatible provider HTTP calls (z.ai).
|
||||
// Logs call timing/outcome only — no behavior change.
|
||||
// Provider HTTP fetch for the chat path: a streaming fetch that DISABLES
|
||||
// undici's 300s headers/body timeouts (#175 — long agent turns were severed
|
||||
// mid-stream), wrapped with passive ECONNRESET-investigation telemetry so the
|
||||
// logs observe the exact transport the turn uses. Held for the service
|
||||
// lifetime to reuse the streaming dispatcher's connection pool.
|
||||
private readonly aiDiagnosticFetch = createDiagnosticFetch(
|
||||
'AiService:provider-http',
|
||||
createStreamingFetch(),
|
||||
);
|
||||
|
||||
constructor(
|
||||
|
||||
Reference in New Issue
Block a user