fix(ai-chat): don't sever long agent turns at undici's 300s stream timeout (#175)

Long research turns failed mid-task with "Lost connection to the AI provider".
Node's global fetch (undici) defaults BOTH headersTimeout and bodyTimeout to
300_000ms, and the chat provider + the external-MCP dispatcher both ran on it
with no override, so:
  - the z.ai chat stream dropped when a late step's huge accumulated context
    pushed the model's time-to-first-token past 5 min (reproduced: even a trivial
    glm-5.2 query has a ~4-8s first-chunk latency; the live telemetry shows it
    scaling with context — and a long run reaches 400k+-token steps), or a
    reasoning model paused >5 min between chunks (bodyTimeout);
  - the crawl4ai SSE transport, held open across the whole turn, dropped when it
    idled >5 min between tool calls — a tool failure that aborts the turn and
    surfaces the same banner.

Fix: a dedicated undici dispatcher with both stream timeouts DISABLED (0) on each
path. Cancellation is unchanged — the turn is bound to the request abortSignal
(client disconnect) and capped by MAX_AGENT_STEPS, so it still terminates; it
just no longer dies at an arbitrary 5-minute wall-clock.
  - ai-streaming-fetch.ts: createStreamingFetch() (+ exported option contract).
  - ai.service: the chat provider's fetch is now createStreamingFetch(), wrapped
    by the existing passive ECONNRESET telemetry (createDiagnosticFetch gained an
    optional baseFetch) so the telemetry observes the SAME transport the turn
    uses.
  - mcp-clients: headersTimeout/bodyTimeout: 0 on the SSRF-pinned Agent.

Investigation: reproduced the transport mechanism against the real z.ai endpoint
(a 1ms headersTimeout throws UND_ERR_HEADERS_TIMEOUT — the exact drop) and ran
the actual research agent to a ~428k-token context. Verified the fixed path
streams cleanly live (glm-5.2 turns finish; telemetry confirms the streaming
fetch is in use).

Tests: ai-streaming-fetch.spec (option contract + streams a delayed response);
ai-http-diagnostics + ai/mcp specs green. server tsc clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
claude code agent 227
2026-06-24 21:50:41 +03:00
parent 4cc8df836f
commit a5aa1185b1
5 changed files with 123 additions and 7 deletions

View File

@@ -400,6 +400,17 @@ export function validateResolvedAddresses(
*/
function buildPinnedDispatcher(): Agent {
return new Agent({
// Disable undici's default 300s headers/body timeouts on external MCP
// traffic. A long agent turn keeps an SSE transport (e.g. crawl4ai's
// /mcp/sse) open across the whole turn; that connection can idle BETWEEN
// tool calls longer than 5 min, and undici's bodyTimeout would otherwise
// sever it mid-task — a tool-call failure that aborts the streamed turn and
// shows the user "Lost connection to the AI provider" (#175). A slow single
// tool call (a crawl) can likewise exceed headersTimeout. Connection
// lifetime is bounded by the turn (clients are closed in onFinish/onError/
// onAbort), so disabling the wall-clock cap is safe.
headersTimeout: 0,
bodyTimeout: 0,
connect: {
lookup: (hostname, _options, callback) => {
// Always resolve ALL addresses ourselves; do not trust the caller's

View File

@@ -25,7 +25,13 @@ import { Logger } from '@nestjs/common';
* The seq/last-call timestamps are module-level, so under concurrent turns the
* idle-gap figure is approximate (fine for single-user reproduction).
*/
export function createDiagnosticFetch(context: string): typeof fetch {
export function createDiagnosticFetch(
context: string,
// The underlying fetch to instrument. Defaults to the global fetch; the chat
// provider passes a streaming fetch (disabled undici stream timeouts, #175) so
// the telemetry observes the SAME transport the long agent turn actually uses.
baseFetch: typeof fetch = fetch,
): typeof fetch {
const logger = new Logger(context);
let callSeq = 0;
let lastCallStartedAt: number | undefined;
@@ -46,9 +52,9 @@ export function createDiagnosticFetch(context: string): typeof fetch {
? body.byteLength
: undefined;
try {
// Delegate to global fetch; return the Response UNTOUCHED (never read/clone
// the body) so the streamed SSE response is unaffected.
const res = await fetch(input, init);
// Delegate to the base fetch; return the Response UNTOUCHED (never read/
// clone the body) so the streamed SSE response is unaffected.
const res = await baseFetch(input, init);
logger.log(
`provider HTTP DIAGNOSTIC: call#${callId} OK ` +
`headersAfter=${Date.now() - startedAt}ms status=${res.status} ` +

View File

@@ -0,0 +1,50 @@
import * as http from 'node:http';
import {
createStreamingFetch,
STREAMING_DISPATCHER_OPTIONS,
} from './ai-streaming-fetch';
/**
* #175: Node's global fetch (undici) defaults headers/body timeouts to 300s and
* severs a long agent turn mid-stream. createStreamingFetch must DISABLE those
* timeouts (0). We pin the option contract deterministically AND confirm the
* built fetch actually streams a deliberately-delayed response.
*/
describe('createStreamingFetch', () => {
it('disables BOTH undici stream timeouts (the #175 contract)', () => {
expect(STREAMING_DISPATCHER_OPTIONS.headersTimeout).toBe(0);
expect(STREAMING_DISPATCHER_OPTIONS.bodyTimeout).toBe(0);
});
describe('against a delayed server', () => {
let server: http.Server;
let url: string;
// The server waits before sending ANY byte (a long time-to-first-token).
const DELAY = 400;
beforeAll(async () => {
server = http.createServer((_req, res) => {
setTimeout(() => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('ok');
}, DELAY);
});
await new Promise<void>((resolve) =>
server.listen(0, '127.0.0.1', resolve),
);
const addr = server.address() as import('node:net').AddressInfo;
url = `http://127.0.0.1:${addr.port}/`;
});
afterAll(async () => {
await new Promise<void>((resolve) => server.close(() => resolve()));
});
it('streams the delayed response instead of timing out', async () => {
const streamingFetch = createStreamingFetch();
const res = await streamingFetch(url);
expect(res.status).toBe(200);
expect(await res.text()).toBe('ok');
});
});
});

View File

@@ -0,0 +1,45 @@
import { Agent } from 'undici';
/**
* Build a `fetch` for LONG-LIVED streaming AI calls (the agent chat turn).
*
* Node's global fetch (undici) defaults BOTH `headersTimeout` and `bodyTimeout`
* to 300_000ms. A legitimate long agent turn trips that and is severed
* mid-stream — surfacing to the user as "Lost connection to the AI provider"
* (issue #175):
* - headersTimeout (time to the first response byte) is exceeded when a late
* step sends a huge accumulated context and the model's time-to-first-token
* grows past 5 min;
* - bodyTimeout (max gap BETWEEN body chunks) is exceeded when a reasoning
* model pauses to "think" for more than 5 min between emitted chunks.
*
* This dispatcher disables both timeouts (0). Cancellation is NOT lost: the
* agent turn is bound to the request's abortSignal, which fires when the client
* disconnects (see ai-chat.controller), and the agent loop is bounded by
* MAX_AGENT_STEPS — so a turn still terminates; it just no longer dies at an
* arbitrary 5-minute wall-clock. keepAlive (undici default) is kept so the
* sequential per-step calls of one turn reuse the connection.
*
* A single shared dispatcher is returned (callers hold it for the service
* lifetime) so its connection pool is reused across requests.
*/
/**
* undici Agent options for streaming AI calls: both stream timeouts DISABLED (0)
* so a long turn is never severed at undici's 300s default (#175). Exported so a
* test can pin the contract without a timing-dependent assertion.
*/
export const STREAMING_DISPATCHER_OPTIONS = {
headersTimeout: 0,
bodyTimeout: 0,
} as const;
export function createStreamingFetch(): typeof fetch {
const dispatcher = new Agent({ ...STREAMING_DISPATCHER_OPTIONS });
return ((input: Parameters<typeof fetch>[0], init?: RequestInit) =>
fetch(input, {
...(init ?? {}),
// `dispatcher` is an undici-specific init field (not in the DOM RequestInit
// type); Node's global fetch reads it. Cast to satisfy the type.
dispatcher,
} as RequestInit & { dispatcher: Agent })) as typeof fetch;
}

View File

@@ -16,6 +16,7 @@ import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception';
import { describeProviderError } from './ai-error.util';
// DIAGNOSTIC (provider ECONNRESET investigation) — temporary.
import { createDiagnosticFetch } from './ai-http-diagnostics';
import { createStreamingFetch } from './ai-streaming-fetch';
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
import { SecretBoxService } from '../crypto/secret-box';
import { AiDriver } from './ai.types';
@@ -45,11 +46,14 @@ export interface ChatModelOverride {
export class AiService {
private readonly logger = new Logger(AiService.name);
// DIAGNOSTIC (provider ECONNRESET investigation) — temporary: passive
// instrumentation of the OpenAI-compatible provider HTTP calls (z.ai).
// Logs call timing/outcome only — no behavior change.
// Provider HTTP fetch for the chat path: a streaming fetch that DISABLES
// undici's 300s headers/body timeouts (#175 — long agent turns were severed
// mid-stream), wrapped with passive ECONNRESET-investigation telemetry so the
// logs observe the exact transport the turn uses. Held for the service
// lifetime to reuse the streaming dispatcher's connection pool.
private readonly aiDiagnosticFetch = createDiagnosticFetch(
'AiService:provider-http',
createStreamingFetch(),
);
constructor(