Compare commits

..

1 Commits

Author SHA1 Message Date
claude code agent 227
a14560c7c9 fix(ai-chat): raise undici's 300s stream timeout for long agent turns (#175)
Long research turns failed mid-task with "Lost connection to the AI provider".
Node's global fetch (undici) defaults BOTH headersTimeout and bodyTimeout to
300_000ms, and the chat provider + the external-MCP dispatcher both ran on it
with no override, so:
  - the z.ai chat stream dropped when a late step's huge accumulated context
    pushed the model's time-to-first-token past 5 min (the model reasons
    server-side with NO streamed reasoning, so the connection is silent until the
    first answer token — reproduced: even a trivial glm-5.2 query has a ~4-8s
    first-chunk gap; a long run reaches 400k+-token steps), or a reasoning model
    paused >5 min between chunks (bodyTimeout);
  - the crawl4ai SSE transport, held open across the whole turn, dropped when it
    idled >5 min between tool calls.

Fix: a dedicated undici dispatcher whose stream timeouts are raised to a
generous-but-FINITE silence timeout (default 15 min, AI_STREAM_TIMEOUT_MS) on
each path. NOT disabled (0): that would let a genuinely hung provider — with the
client still connected — hang forever, since the turn's abortSignal only fires on
client disconnect. The timeout bounds SILENCE (time-to-first-byte and the gap
BETWEEN chunks), NOT total turn duration, so an arbitrarily long turn that keeps
streaming is never cut; only a stream quiet for >15 min is treated as a hang.
  - ai-streaming-fetch.ts: createStreamingFetch() + streamTimeoutMs() /
    streamingDispatcherOptions() (the shared, configurable timeout).
  - ai.service: the chat provider fetch is createStreamingFetch(), wrapped by the
    existing passive ECONNRESET telemetry (createDiagnosticFetch gained an
    optional baseFetch) so the telemetry observes the SAME transport.
  - mcp-clients: the SSRF-pinned Agent uses streamingDispatcherOptions().

Investigation: reproduced the transport mechanism against the real z.ai endpoint
(a 1ms headersTimeout throws UND_ERR_HEADERS_TIMEOUT — the exact drop) and ran
the actual research agent to a ~428k-token context. Verified the fixed path
streams cleanly live (glm-5.2 turns finish; telemetry confirms the streaming
fetch is in use).

Tests: ai-streaming-fetch.spec (default 15m + env override + invalid fallback +
both-timeouts + streams a delayed response); ai-http-diagnostics + ai/mcp specs
green. server tsc clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-24 22:09:10 +03:00
3 changed files with 116 additions and 75 deletions

View File

@@ -6,6 +6,7 @@ import { createMCPClient } from '@ai-sdk/mcp';
import { Agent, type Dispatcher } from 'undici'; import { Agent, type Dispatcher } from 'undici';
import { AiMcpServerRepo } from '@docmost/db/repos/ai-chat/ai-mcp-server.repo'; import { AiMcpServerRepo } from '@docmost/db/repos/ai-chat/ai-mcp-server.repo';
import { AiMcpServer } from '@docmost/db/types/entity.types'; import { AiMcpServer } from '@docmost/db/types/entity.types';
import { streamingDispatcherOptions } from '../../../integrations/ai/ai-streaming-fetch';
import { SecretBoxService } from '../../../integrations/crypto/secret-box'; import { SecretBoxService } from '../../../integrations/crypto/secret-box';
import { isUrlAllowed, isIpAllowed } from './ssrf-guard'; import { isUrlAllowed, isIpAllowed } from './ssrf-guard';
@@ -400,17 +401,16 @@ export function validateResolvedAddresses(
*/ */
function buildPinnedDispatcher(): Agent { function buildPinnedDispatcher(): Agent {
return new Agent({ return new Agent({
// Disable undici's default 300s headers/body timeouts on external MCP // Raise undici's default 300s headers/body timeouts on external MCP traffic
// traffic. A long agent turn keeps an SSE transport (e.g. crawl4ai's // to the same generous-but-finite silence timeout the chat fetch uses (#175).
// /mcp/sse) open across the whole turn; that connection can idle BETWEEN // A long agent turn keeps an SSE transport (e.g. crawl4ai's /mcp/sse) open
// tool calls longer than 5 min, and undici's bodyTimeout would otherwise // across the whole turn; that connection can idle BETWEEN tool calls longer
// sever it mid-task — a tool-call failure that aborts the streamed turn and // than 5 min, and undici's bodyTimeout would otherwise sever it mid-task — a
// shows the user "Lost connection to the AI provider" (#175). A slow single // tool-call failure that aborts the streamed turn and shows the user "Lost
// tool call (a crawl) can likewise exceed headersTimeout. Connection // connection to the AI provider". A slow single tool call (a crawl) can
// lifetime is bounded by the turn (clients are closed in onFinish/onError/ // likewise exceed headersTimeout. The timeout stays FINITE so a genuinely
// onAbort), so disabling the wall-clock cap is safe. // hung server is still broken eventually.
headersTimeout: 0, ...streamingDispatcherOptions(),
bodyTimeout: 0,
connect: { connect: {
lookup: (hostname, _options, callback) => { lookup: (hostname, _options, callback) => {
// Always resolve ALL addresses ourselves; do not trust the caller's // Always resolve ALL addresses ourselves; do not trust the caller's

View File

@@ -1,50 +1,78 @@
import * as http from 'node:http'; import * as http from 'node:http';
import { import {
createStreamingFetch, createStreamingFetch,
STREAMING_DISPATCHER_OPTIONS, streamTimeoutMs,
streamingDispatcherOptions,
} from './ai-streaming-fetch'; } from './ai-streaming-fetch';
/** /**
* #175: Node's global fetch (undici) defaults headers/body timeouts to 300s and * #175: undici's default 300s headers/body timeouts severed long agent turns.
* severs a long agent turn mid-stream. createStreamingFetch must DISABLE those * The streaming fetch raises them to a generous-but-FINITE silence timeout (not
* timeouts (0). We pin the option contract deterministically AND confirm the * 0 — a true hang must still break). We pin: the configured value + env override,
* built fetch actually streams a deliberately-delayed response. * that both dispatcher timeouts use it, and that a delayed response streams.
*/ */
describe('createStreamingFetch', () => { describe('streamTimeoutMs', () => {
it('disables BOTH undici stream timeouts (the #175 contract)', () => { const ORIG = process.env.AI_STREAM_TIMEOUT_MS;
expect(STREAMING_DISPATCHER_OPTIONS.headersTimeout).toBe(0); afterEach(() => {
expect(STREAMING_DISPATCHER_OPTIONS.bodyTimeout).toBe(0); if (ORIG === undefined) delete process.env.AI_STREAM_TIMEOUT_MS;
else process.env.AI_STREAM_TIMEOUT_MS = ORIG;
}); });
describe('against a delayed server', () => { it('defaults to a generous-but-finite 15 minutes', () => {
let server: http.Server; delete process.env.AI_STREAM_TIMEOUT_MS;
let url: string; expect(streamTimeoutMs()).toBe(900_000);
// The server waits before sending ANY byte (a long time-to-first-token). // Finite — NOT disabled (0 would let a hung provider leak forever).
const DELAY = 400; expect(streamTimeoutMs()).toBeGreaterThan(0);
expect(Number.isFinite(streamTimeoutMs())).toBe(true);
});
beforeAll(async () => { it('honours a positive AI_STREAM_TIMEOUT_MS override', () => {
server = http.createServer((_req, res) => { process.env.AI_STREAM_TIMEOUT_MS = '120000';
setTimeout(() => { expect(streamTimeoutMs()).toBe(120000);
res.writeHead(200, { 'Content-Type': 'text/plain' }); });
res.end('ok');
}, DELAY);
});
await new Promise<void>((resolve) =>
server.listen(0, '127.0.0.1', resolve),
);
const addr = server.address() as import('node:net').AddressInfo;
url = `http://127.0.0.1:${addr.port}/`;
});
afterAll(async () => { it('ignores an invalid / non-positive override (falls back to default)', () => {
await new Promise<void>((resolve) => server.close(() => resolve())); for (const bad of ['0', '-5', 'abc', '']) {
}); process.env.AI_STREAM_TIMEOUT_MS = bad;
expect(streamTimeoutMs()).toBe(900_000);
}
});
it('streams the delayed response instead of timing out', async () => { it('applies the timeout to BOTH undici stream timeouts', () => {
const streamingFetch = createStreamingFetch(); delete process.env.AI_STREAM_TIMEOUT_MS;
const res = await streamingFetch(url); expect(streamingDispatcherOptions()).toEqual({
expect(res.status).toBe(200); headersTimeout: 900_000,
expect(await res.text()).toBe('ok'); bodyTimeout: 900_000,
}); });
}); });
}); });
describe('createStreamingFetch — against a delayed server', () => {
let server: http.Server;
let url: string;
// The server waits before sending ANY byte (a long time-to-first-token).
const DELAY = 400;
beforeAll(async () => {
server = http.createServer((_req, res) => {
setTimeout(() => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('ok');
}, DELAY);
});
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
const addr = server.address() as import('node:net').AddressInfo;
url = `http://127.0.0.1:${addr.port}/`;
});
afterAll(async () => {
await new Promise<void>((resolve) => server.close(() => resolve()));
});
it('streams the delayed response instead of timing out', async () => {
const streamingFetch = createStreamingFetch();
const res = await streamingFetch(url);
expect(res.status).toBe(200);
expect(await res.text()).toBe('ok');
});
});

View File

@@ -1,40 +1,53 @@
import { Agent } from 'undici'; import { Agent } from 'undici';
/** /**
* Build a `fetch` for LONG-LIVED streaming AI calls (the agent chat turn). * Default SILENCE timeout for streaming AI calls (15 min). Generous, but FINITE.
* *
* Node's global fetch (undici) defaults BOTH `headersTimeout` and `bodyTimeout` * Node's global fetch (undici) defaults headersTimeout and bodyTimeout to
* to 300_000ms. A legitimate long agent turn trips that and is severed * 300_000ms, which severed legitimate long agent turns mid-stream — surfacing as
* mid-stream — surfacing to the user as "Lost connection to the AI provider" * "Lost connection to the AI provider" (#175): a late step with a huge context
* (issue #175): * pushes the model's time-to-first-token past 5 min, or a reasoning model pauses
* - headersTimeout (time to the first response byte) is exceeded when a late * >5 min between chunks. We do NOT disable the timeout (0) — that would let a
* step sends a huge accumulated context and the model's time-to-first-token * genuinely hung provider, with the client still connected, hang forever
* grows past 5 min; * (abortSignal only fires on client disconnect). Instead we raise it well above
* - bodyTimeout (max gap BETWEEN body chunks) is exceeded when a reasoning * any realistic gap while keeping it finite so a true hang is eventually broken.
* model pauses to "think" for more than 5 min between emitted chunks.
* *
* This dispatcher disables both timeouts (0). Cancellation is NOT lost: the * This bounds SILENCE (time-to-first-byte and the gap BETWEEN chunks), NOT total
* agent turn is bound to the request's abortSignal, which fires when the client * turn duration — so an arbitrarily long turn that keeps streaming bytes is never
* disconnects (see ai-chat.controller), and the agent loop is bounded by * cut; only a stream that goes quiet for longer than this is treated as a hang.
* MAX_AGENT_STEPS — so a turn still terminates; it just no longer dies at an
* arbitrary 5-minute wall-clock. keepAlive (undici default) is kept so the
* sequential per-step calls of one turn reuse the connection.
*
* A single shared dispatcher is returned (callers hold it for the service
* lifetime) so its connection pool is reused across requests.
*/ */
/** const DEFAULT_STREAM_TIMEOUT_MS = 900_000;
* undici Agent options for streaming AI calls: both stream timeouts DISABLED (0)
* so a long turn is never severed at undici's 300s default (#175). Exported so a
* test can pin the contract without a timing-dependent assertion.
*/
export const STREAMING_DISPATCHER_OPTIONS = {
headersTimeout: 0,
bodyTimeout: 0,
} as const;
/**
* The configured silence timeout (ms). Override with `AI_STREAM_TIMEOUT_MS`; a
* missing/invalid/non-positive value falls back to {@link DEFAULT_STREAM_TIMEOUT_MS}.
*/
export function streamTimeoutMs(): number {
const raw = Number(process.env.AI_STREAM_TIMEOUT_MS);
return Number.isFinite(raw) && raw > 0 ? raw : DEFAULT_STREAM_TIMEOUT_MS;
}
/**
* undici `Agent` timeout options for streaming AI traffic — both stream timeouts
* set to the (generous, finite) silence timeout. Shared by the chat provider
* fetch and the external-MCP dispatcher so they behave identically (#175).
*/
export function streamingDispatcherOptions(): {
headersTimeout: number;
bodyTimeout: number;
} {
const t = streamTimeoutMs();
return { headersTimeout: t, bodyTimeout: t };
}
/**
* Build a `fetch` for long-lived streaming AI calls (the agent chat turn) backed
* by a dedicated undici dispatcher whose stream timeouts are the generous-but-
* finite silence timeout above (#175). A single shared dispatcher is returned
* (callers hold it for the service lifetime) so its connection pool is reused.
*/
export function createStreamingFetch(): typeof fetch { export function createStreamingFetch(): typeof fetch {
const dispatcher = new Agent({ ...STREAMING_DISPATCHER_OPTIONS }); const dispatcher = new Agent(streamingDispatcherOptions());
return ((input: Parameters<typeof fetch>[0], init?: RequestInit) => return ((input: Parameters<typeof fetch>[0], init?: RequestInit) =>
fetch(input, { fetch(input, {
...(init ?? {}), ...(init ?? {}),