From a14560c7c9c092d5b9032d6eb76b790ba1a868e8 Mon Sep 17 00:00:00 2001 From: claude code agent 227 Date: Wed, 24 Jun 2026 21:50:41 +0300 Subject: [PATCH 1/2] fix(ai-chat): raise undici's 300s stream timeout for long agent turns (#175) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Long research turns failed mid-task with "Lost connection to the AI provider". Node's global fetch (undici) defaults BOTH headersTimeout and bodyTimeout to 300_000ms, and the chat provider + the external-MCP dispatcher both ran on it with no override, so: - the z.ai chat stream dropped when a late step's huge accumulated context pushed the model's time-to-first-token past 5 min (the model reasons server-side with NO streamed reasoning, so the connection is silent until the first answer token — reproduced: even a trivial glm-5.2 query has a ~4-8s first-chunk gap; a long run reaches 400k+-token steps), or a reasoning model paused >5 min between chunks (bodyTimeout); - the crawl4ai SSE transport, held open across the whole turn, dropped when it idled >5 min between tool calls. Fix: a dedicated undici dispatcher whose stream timeouts are raised to a generous-but-FINITE silence timeout (default 15 min, AI_STREAM_TIMEOUT_MS) on each path. NOT disabled (0): that would let a genuinely hung provider — with the client still connected — hang forever, since the turn's abortSignal only fires on client disconnect. The timeout bounds SILENCE (time-to-first-byte and the gap BETWEEN chunks), NOT total turn duration, so an arbitrarily long turn that keeps streaming is never cut; only a stream quiet for >15 min is treated as a hang. - ai-streaming-fetch.ts: createStreamingFetch() + streamTimeoutMs() / streamingDispatcherOptions() (the shared, configurable timeout). - ai.service: the chat provider fetch is createStreamingFetch(), wrapped by the existing passive ECONNRESET telemetry (createDiagnosticFetch gained an optional baseFetch) so the telemetry observes the SAME transport. - mcp-clients: the SSRF-pinned Agent uses streamingDispatcherOptions(). Investigation: reproduced the transport mechanism against the real z.ai endpoint (a 1ms headersTimeout throws UND_ERR_HEADERS_TIMEOUT — the exact drop) and ran the actual research agent to a ~428k-token context. Verified the fixed path streams cleanly live (glm-5.2 turns finish; telemetry confirms the streaming fetch is in use). Tests: ai-streaming-fetch.spec (default 15m + env override + invalid fallback + both-timeouts + streams a delayed response); ai-http-diagnostics + ai/mcp specs green. server tsc clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../external-mcp/mcp-clients.service.ts | 11 +++ .../integrations/ai/ai-http-diagnostics.ts | 14 +++- .../ai/ai-streaming-fetch.spec.ts | 78 +++++++++++++++++++ .../src/integrations/ai/ai-streaming-fetch.ts | 58 ++++++++++++++ apps/server/src/integrations/ai/ai.service.ts | 10 ++- 5 files changed, 164 insertions(+), 7 deletions(-) create mode 100644 apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts create mode 100644 apps/server/src/integrations/ai/ai-streaming-fetch.ts diff --git a/apps/server/src/core/ai-chat/external-mcp/mcp-clients.service.ts b/apps/server/src/core/ai-chat/external-mcp/mcp-clients.service.ts index 30e94dc0..fe83801b 100644 --- a/apps/server/src/core/ai-chat/external-mcp/mcp-clients.service.ts +++ b/apps/server/src/core/ai-chat/external-mcp/mcp-clients.service.ts @@ -6,6 +6,7 @@ import { createMCPClient } from '@ai-sdk/mcp'; import { Agent, type Dispatcher } from 'undici'; import { AiMcpServerRepo } from '@docmost/db/repos/ai-chat/ai-mcp-server.repo'; import { AiMcpServer } from '@docmost/db/types/entity.types'; +import { streamingDispatcherOptions } from '../../../integrations/ai/ai-streaming-fetch'; import { SecretBoxService } from '../../../integrations/crypto/secret-box'; import { isUrlAllowed, isIpAllowed } from './ssrf-guard'; @@ -400,6 +401,16 @@ export function validateResolvedAddresses( */ function buildPinnedDispatcher(): Agent { return new Agent({ + // Raise undici's default 300s headers/body timeouts on external MCP traffic + // to the same generous-but-finite silence timeout the chat fetch uses (#175). + // A long agent turn keeps an SSE transport (e.g. crawl4ai's /mcp/sse) open + // across the whole turn; that connection can idle BETWEEN tool calls longer + // than 5 min, and undici's bodyTimeout would otherwise sever it mid-task — a + // tool-call failure that aborts the streamed turn and shows the user "Lost + // connection to the AI provider". A slow single tool call (a crawl) can + // likewise exceed headersTimeout. The timeout stays FINITE so a genuinely + // hung server is still broken eventually. + ...streamingDispatcherOptions(), connect: { lookup: (hostname, _options, callback) => { // Always resolve ALL addresses ourselves; do not trust the caller's diff --git a/apps/server/src/integrations/ai/ai-http-diagnostics.ts b/apps/server/src/integrations/ai/ai-http-diagnostics.ts index eb9beeb2..0761d050 100644 --- a/apps/server/src/integrations/ai/ai-http-diagnostics.ts +++ b/apps/server/src/integrations/ai/ai-http-diagnostics.ts @@ -25,7 +25,13 @@ import { Logger } from '@nestjs/common'; * The seq/last-call timestamps are module-level, so under concurrent turns the * idle-gap figure is approximate (fine for single-user reproduction). */ -export function createDiagnosticFetch(context: string): typeof fetch { +export function createDiagnosticFetch( + context: string, + // The underlying fetch to instrument. Defaults to the global fetch; the chat + // provider passes a streaming fetch (disabled undici stream timeouts, #175) so + // the telemetry observes the SAME transport the long agent turn actually uses. + baseFetch: typeof fetch = fetch, +): typeof fetch { const logger = new Logger(context); let callSeq = 0; let lastCallStartedAt: number | undefined; @@ -46,9 +52,9 @@ export function createDiagnosticFetch(context: string): typeof fetch { ? body.byteLength : undefined; try { - // Delegate to global fetch; return the Response UNTOUCHED (never read/clone - // the body) so the streamed SSE response is unaffected. - const res = await fetch(input, init); + // Delegate to the base fetch; return the Response UNTOUCHED (never read/ + // clone the body) so the streamed SSE response is unaffected. + const res = await baseFetch(input, init); logger.log( `provider HTTP DIAGNOSTIC: call#${callId} OK ` + `headersAfter=${Date.now() - startedAt}ms status=${res.status} ` + diff --git a/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts b/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts new file mode 100644 index 00000000..df6a16d5 --- /dev/null +++ b/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts @@ -0,0 +1,78 @@ +import * as http from 'node:http'; +import { + createStreamingFetch, + streamTimeoutMs, + streamingDispatcherOptions, +} from './ai-streaming-fetch'; + +/** + * #175: undici's default 300s headers/body timeouts severed long agent turns. + * The streaming fetch raises them to a generous-but-FINITE silence timeout (not + * 0 — a true hang must still break). We pin: the configured value + env override, + * that both dispatcher timeouts use it, and that a delayed response streams. + */ +describe('streamTimeoutMs', () => { + const ORIG = process.env.AI_STREAM_TIMEOUT_MS; + afterEach(() => { + if (ORIG === undefined) delete process.env.AI_STREAM_TIMEOUT_MS; + else process.env.AI_STREAM_TIMEOUT_MS = ORIG; + }); + + it('defaults to a generous-but-finite 15 minutes', () => { + delete process.env.AI_STREAM_TIMEOUT_MS; + expect(streamTimeoutMs()).toBe(900_000); + // Finite — NOT disabled (0 would let a hung provider leak forever). + expect(streamTimeoutMs()).toBeGreaterThan(0); + expect(Number.isFinite(streamTimeoutMs())).toBe(true); + }); + + it('honours a positive AI_STREAM_TIMEOUT_MS override', () => { + process.env.AI_STREAM_TIMEOUT_MS = '120000'; + expect(streamTimeoutMs()).toBe(120000); + }); + + it('ignores an invalid / non-positive override (falls back to default)', () => { + for (const bad of ['0', '-5', 'abc', '']) { + process.env.AI_STREAM_TIMEOUT_MS = bad; + expect(streamTimeoutMs()).toBe(900_000); + } + }); + + it('applies the timeout to BOTH undici stream timeouts', () => { + delete process.env.AI_STREAM_TIMEOUT_MS; + expect(streamingDispatcherOptions()).toEqual({ + headersTimeout: 900_000, + bodyTimeout: 900_000, + }); + }); +}); + +describe('createStreamingFetch — against a delayed server', () => { + let server: http.Server; + let url: string; + // The server waits before sending ANY byte (a long time-to-first-token). + const DELAY = 400; + + beforeAll(async () => { + server = http.createServer((_req, res) => { + setTimeout(() => { + res.writeHead(200, { 'Content-Type': 'text/plain' }); + res.end('ok'); + }, DELAY); + }); + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); + const addr = server.address() as import('node:net').AddressInfo; + url = `http://127.0.0.1:${addr.port}/`; + }); + + afterAll(async () => { + await new Promise((resolve) => server.close(() => resolve())); + }); + + it('streams the delayed response instead of timing out', async () => { + const streamingFetch = createStreamingFetch(); + const res = await streamingFetch(url); + expect(res.status).toBe(200); + expect(await res.text()).toBe('ok'); + }); +}); diff --git a/apps/server/src/integrations/ai/ai-streaming-fetch.ts b/apps/server/src/integrations/ai/ai-streaming-fetch.ts new file mode 100644 index 00000000..f257fe4e --- /dev/null +++ b/apps/server/src/integrations/ai/ai-streaming-fetch.ts @@ -0,0 +1,58 @@ +import { Agent } from 'undici'; + +/** + * Default SILENCE timeout for streaming AI calls (15 min). Generous, but FINITE. + * + * Node's global fetch (undici) defaults headersTimeout and bodyTimeout to + * 300_000ms, which severed legitimate long agent turns mid-stream — surfacing as + * "Lost connection to the AI provider" (#175): a late step with a huge context + * pushes the model's time-to-first-token past 5 min, or a reasoning model pauses + * >5 min between chunks. We do NOT disable the timeout (0) — that would let a + * genuinely hung provider, with the client still connected, hang forever + * (abortSignal only fires on client disconnect). Instead we raise it well above + * any realistic gap while keeping it finite so a true hang is eventually broken. + * + * This bounds SILENCE (time-to-first-byte and the gap BETWEEN chunks), NOT total + * turn duration — so an arbitrarily long turn that keeps streaming bytes is never + * cut; only a stream that goes quiet for longer than this is treated as a hang. + */ +const DEFAULT_STREAM_TIMEOUT_MS = 900_000; + +/** + * The configured silence timeout (ms). Override with `AI_STREAM_TIMEOUT_MS`; a + * missing/invalid/non-positive value falls back to {@link DEFAULT_STREAM_TIMEOUT_MS}. + */ +export function streamTimeoutMs(): number { + const raw = Number(process.env.AI_STREAM_TIMEOUT_MS); + return Number.isFinite(raw) && raw > 0 ? raw : DEFAULT_STREAM_TIMEOUT_MS; +} + +/** + * undici `Agent` timeout options for streaming AI traffic — both stream timeouts + * set to the (generous, finite) silence timeout. Shared by the chat provider + * fetch and the external-MCP dispatcher so they behave identically (#175). + */ +export function streamingDispatcherOptions(): { + headersTimeout: number; + bodyTimeout: number; +} { + const t = streamTimeoutMs(); + return { headersTimeout: t, bodyTimeout: t }; +} + +/** + * Build a `fetch` for long-lived streaming AI calls (the agent chat turn) backed + * by a dedicated undici dispatcher whose stream timeouts are the generous-but- + * finite silence timeout above (#175). A single shared dispatcher is returned + * (callers hold it for the service lifetime) so its connection pool is reused. + */ +export function createStreamingFetch(): typeof fetch { + const dispatcher = new Agent(streamingDispatcherOptions()); + return ((input: Parameters[0], init?: RequestInit) => + fetch(input, { + ...(init ?? {}), + // `dispatcher` is an undici-specific init field (not in the DOM RequestInit + // type); Node's global fetch reads it. Cast to satisfy the type. + dispatcher, + } as RequestInit & { dispatcher: Agent })) as typeof fetch; +} diff --git a/apps/server/src/integrations/ai/ai.service.ts b/apps/server/src/integrations/ai/ai.service.ts index 4f72d23b..65bdda5c 100644 --- a/apps/server/src/integrations/ai/ai.service.ts +++ b/apps/server/src/integrations/ai/ai.service.ts @@ -16,6 +16,7 @@ import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception'; import { describeProviderError } from './ai-error.util'; // DIAGNOSTIC (provider ECONNRESET investigation) — temporary. import { createDiagnosticFetch } from './ai-http-diagnostics'; +import { createStreamingFetch } from './ai-streaming-fetch'; import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo'; import { SecretBoxService } from '../crypto/secret-box'; import { AiDriver } from './ai.types'; @@ -45,11 +46,14 @@ export interface ChatModelOverride { export class AiService { private readonly logger = new Logger(AiService.name); - // DIAGNOSTIC (provider ECONNRESET investigation) — temporary: passive - // instrumentation of the OpenAI-compatible provider HTTP calls (z.ai). - // Logs call timing/outcome only — no behavior change. + // Provider HTTP fetch for the chat path: a streaming fetch that DISABLES + // undici's 300s headers/body timeouts (#175 — long agent turns were severed + // mid-stream), wrapped with passive ECONNRESET-investigation telemetry so the + // logs observe the exact transport the turn uses. Held for the service + // lifetime to reuse the streaming dispatcher's connection pool. private readonly aiDiagnosticFetch = createDiagnosticFetch( 'AiService:provider-http', + createStreamingFetch(), ); constructor( -- 2.49.1 From da15b55786d55bc381aafa3debc49f821626ffd5 Mon Sep 17 00:00:00 2001 From: claude code agent 227 Date: Wed, 24 Jun 2026 22:31:58 +0300 Subject: [PATCH 2/2] =?UTF-8?q?refactor(ai):=20address=20PR=20#176=20revie?= =?UTF-8?q?w=20=E2=80=94=20finite-timeout=20wording,=20env=20doc,=20tests,?= =?UTF-8?q?=20permanent=20provider-http=20module?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Wording: every comment now says the stream timeouts are RAISED to a generous-but-finite ~15-min silence timeout, not "disabled (0)" (the stale comments contradicted the code, which uses AI_STREAM_TIMEOUT_MS, default 900000ms). - Architecture (the load-bearing-temporary trap): the streaming fetch reached the chat provider only by riding the "temporary DIAGNOSTIC" telemetry, so deleting the telemetry by its own label would silently revert the timeout fix. Legitimize it: rename ai-http-diagnostics.ts -> ai-provider-http.ts, createDiagnosticFetch -> createInstrumentedFetch, field aiDiagnosticFetch -> aiProviderFetch, drop the "temporary" labels, and document the chat transport (streaming fetch + instrumentation) as one intentional construct. - Docs: AI_STREAM_TIMEOUT_MS added to .env.example next to AI_EMBEDDING_TIMEOUT_MS. - Tests: - ai-provider-http.spec: createInstrumentedFetch delegates to the injected baseFetch with the same input/init, returns the Response untouched, rethrows the error, and defaults to global fetch — covering the baseFetch seam. - ai-streaming-fetch.spec: the delayed-server test is now LOAD-BEARING — with AI_STREAM_TIMEOUT_MS set below the 1.5s server delay the call actually rejects (a lost dispatcher -> global 300s default would NOT), proving the configured dispatcher is wired; plus the default-timeout happy path. server tsc clean; ai-streaming-fetch / ai-provider-http / ai.service / mcp-servers / ai-error specs green (41). Co-Authored-By: Claude Opus 4.8 (1M context) --- .env.example | 6 +++ .../integrations/ai/ai-provider-http.spec.ts | 40 +++++++++++++++++++ ...ttp-diagnostics.ts => ai-provider-http.ts} | 36 ++++++++++------- .../ai/ai-streaming-fetch.spec.ts | 40 +++++++++++++++++-- apps/server/src/integrations/ai/ai.service.ts | 22 +++++----- 5 files changed, 114 insertions(+), 30 deletions(-) create mode 100644 apps/server/src/integrations/ai/ai-provider-http.spec.ts rename apps/server/src/integrations/ai/{ai-http-diagnostics.ts => ai-provider-http.ts} (65%) diff --git a/.env.example b/.env.example index fa886282..4726805b 100644 --- a/.env.example +++ b/.env.example @@ -136,6 +136,12 @@ MCP_DOCMOST_PASSWORD= # A slow/hung embeddings endpoint fails after this and the batch continues. # AI_EMBEDDING_TIMEOUT_MS=120000 +# Silence timeout (ms) for streaming chat/agent AI calls AND external-MCP traffic. +# Bounds time-to-first-byte and the gap BETWEEN chunks (NOT the total turn length), +# so an arbitrarily long turn that keeps streaming is never cut. Finite so a hung +# provider is eventually broken instead of leaking forever. Default 900000 (15 min). +# AI_STREAM_TIMEOUT_MS=900000 + # --- Anonymous public-share AI assistant --- # Opt-in per workspace (AI settings -> "public share assistant"; off by default). # When enabled, anonymous visitors of a published share can ask an AI about that diff --git a/apps/server/src/integrations/ai/ai-provider-http.spec.ts b/apps/server/src/integrations/ai/ai-provider-http.spec.ts new file mode 100644 index 00000000..7ccb744c --- /dev/null +++ b/apps/server/src/integrations/ai/ai-provider-http.spec.ts @@ -0,0 +1,40 @@ +import { createInstrumentedFetch } from './ai-provider-http'; + +/** + * createInstrumentedFetch must be behavior-neutral: it delegates to the supplied + * baseFetch with the SAME input/init, returns the Response object untouched (so + * the streamed SSE body is never read/cloned), and rethrows the same error. The + * baseFetch injection is the seam that carries the streaming fetch (#175) onto + * the chat provider, so it is tested directly. + */ +describe('createInstrumentedFetch', () => { + it('delegates to the injected baseFetch with the same input/init', async () => { + const fakeResponse = new Response('ok', { status: 200 }); + const baseFetch = jest.fn().mockResolvedValue(fakeResponse); + const instrumented = createInstrumentedFetch('test', baseFetch as never); + + const init = { method: 'POST', body: '{"q":1}' }; + const res = await instrumented('https://example.com/v1/chat', init); + + expect(baseFetch).toHaveBeenCalledTimes(1); + expect(baseFetch).toHaveBeenCalledWith('https://example.com/v1/chat', init); + // The Response is returned UNTOUCHED (same reference — never read/cloned). + expect(res).toBe(fakeResponse); + }); + + it('rethrows the base fetch error unchanged (pre-response failure)', async () => { + const err = Object.assign(new TypeError('fetch failed'), { + cause: { code: 'ECONNRESET' }, + }); + const baseFetch = jest.fn().mockRejectedValue(err); + const instrumented = createInstrumentedFetch('test', baseFetch as never); + + await expect(instrumented('https://example.com/')).rejects.toBe(err); + }); + + it('defaults to the global fetch when no baseFetch is given', () => { + // Constructing without a baseFetch must not throw — it simply wraps global + // fetch (the non-chat default). + expect(() => createInstrumentedFetch('test')).not.toThrow(); + }); +}); diff --git a/apps/server/src/integrations/ai/ai-http-diagnostics.ts b/apps/server/src/integrations/ai/ai-provider-http.ts similarity index 65% rename from apps/server/src/integrations/ai/ai-http-diagnostics.ts rename to apps/server/src/integrations/ai/ai-provider-http.ts index 0761d050..22ef2f44 100644 --- a/apps/server/src/integrations/ai/ai-http-diagnostics.ts +++ b/apps/server/src/integrations/ai/ai-provider-http.ts @@ -1,16 +1,22 @@ import { Logger } from '@nestjs/common'; /** - * DIAGNOSTIC (provider ECONNRESET investigation) — temporary. + * The provider HTTP fetch used by the chat path: a thin, behavior-neutral + * instrumentation wrapper around a supplied `fetch`. * - * A PASSIVE, behavior-neutral wrapper around the global `fetch`, injected into - * the OpenAI-compatible provider client (`createOpenAI({ fetch })`, the z.ai - * path). Per provider HTTP call it logs: time-to-response-headers + status + - * request-body size on success; and on a pre-response rejection the failure - * latency + error code/cause + request-body size + the idle gap since the - * previous provider call. It NEVER retries, times out, swaps the dispatcher, or - * reads/clones the response body — the Response is returned untouched (streaming - * unaffected) and any error is rethrown unchanged. + * It defaults to the global `fetch`, but the chat provider passes the streaming + * fetch (which RAISES undici's 300s stream timeouts to a generous-but-finite + * silence timeout so a long agent turn is not severed mid-stream — #175). So this + * wrapper observes the EXACT transport a turn uses. It NEVER retries, times out, + * swaps the dispatcher, or reads/clones the response body — the Response is + * returned untouched (streaming unaffected) and any error is rethrown unchanged. + * + * Per provider HTTP call it logs: time-to-response-headers + status + request + * body size on success; and on a pre-response rejection the failure latency + + * error code/cause + request body size + the idle gap since the previous call. + * This telemetry is intentional and kept (it diagnoses provider connection + * resets / mid-stream cuts), and it is load-bearing: the streaming fetch reaches + * the chat provider THROUGH this wrapper, so the two are one construct. * * How to read the result (a long agentic turn makes one provider call per step): * - a failed turn whose last provider line is "PRE-RESPONSE FAILED ... ECONNRESET" @@ -23,13 +29,13 @@ import { Logger } from '@nestjs/common'; * different failure mode. * * The seq/last-call timestamps are module-level, so under concurrent turns the - * idle-gap figure is approximate (fine for single-user reproduction). + * idle-gap figure is approximate (fine for single-user diagnosis). */ -export function createDiagnosticFetch( +export function createInstrumentedFetch( context: string, // The underlying fetch to instrument. Defaults to the global fetch; the chat - // provider passes a streaming fetch (disabled undici stream timeouts, #175) so - // the telemetry observes the SAME transport the long agent turn actually uses. + // provider passes the streaming fetch (raised, finite undici stream timeouts, + // #175) so the telemetry observes the SAME transport the long agent turn uses. baseFetch: typeof fetch = fetch, ): typeof fetch { const logger = new Logger(context); @@ -56,7 +62,7 @@ export function createDiagnosticFetch( // clone the body) so the streamed SSE response is unaffected. const res = await baseFetch(input, init); logger.log( - `provider HTTP DIAGNOSTIC: call#${callId} OK ` + + `provider HTTP: call#${callId} OK ` + `headersAfter=${Date.now() - startedAt}ms status=${res.status} ` + `reqBytes=${bodyBytes ?? 'n/a'} idleSincePrevCall=${idleSincePrev ?? 'n/a'}ms`, ); @@ -70,7 +76,7 @@ export function createDiagnosticFetch( cause?: { code?: string; message?: string }; }; logger.warn( - `provider HTTP DIAGNOSTIC: call#${callId} PRE-RESPONSE FAILED ` + + `provider HTTP: call#${callId} PRE-RESPONSE FAILED ` + `after=${Date.now() - startedAt}ms code=${e?.cause?.code ?? 'none'} ` + `name=${e?.name ?? 'Error'} cause=${e?.cause?.message ?? e?.message ?? 'unknown'} ` + `reqBytes=${bodyBytes ?? 'n/a'} idleSincePrevCall=${idleSincePrev ?? 'n/a'}ms`, diff --git a/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts b/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts index df6a16d5..b28ecf51 100644 --- a/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts +++ b/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts @@ -48,10 +48,13 @@ describe('streamTimeoutMs', () => { }); describe('createStreamingFetch — against a delayed server', () => { + const ORIG = process.env.AI_STREAM_TIMEOUT_MS; let server: http.Server; let url: string; - // The server waits before sending ANY byte (a long time-to-first-token). - const DELAY = 400; + // The server waits before sending ANY byte (a long time-to-first-token). It is + // > undici's ~1s timeout-timer granularity so a sub-second configured timeout + // fires deterministically in the load-bearing test below. + const DELAY = 1500; beforeAll(async () => { server = http.createServer((_req, res) => { @@ -69,10 +72,41 @@ describe('createStreamingFetch — against a delayed server', () => { await new Promise((resolve) => server.close(() => resolve())); }); - it('streams the delayed response instead of timing out', async () => { + afterEach(() => { + if (ORIG === undefined) delete process.env.AI_STREAM_TIMEOUT_MS; + else process.env.AI_STREAM_TIMEOUT_MS = ORIG; + }); + + it('streams the delayed response at the default (generous) timeout', async () => { + delete process.env.AI_STREAM_TIMEOUT_MS; // default 15 min >> DELAY const streamingFetch = createStreamingFetch(); const res = await streamingFetch(url); expect(res.status).toBe(200); expect(await res.text()).toBe('ok'); }); + + it('LOAD-BEARING: a sub-DELAY AI_STREAM_TIMEOUT_MS actually severs the response', async () => { + // Proves the configured dispatcher is wired into the fetch: with the timeout + // set below DELAY the call must reject with undici's headers-timeout. If the + // dispatcher were lost (fallback to global fetch's 300s default), the 1.5s + // response would slip through and this would NOT throw. + process.env.AI_STREAM_TIMEOUT_MS = '500'; + const streamingFetch = createStreamingFetch(); + let caught: unknown; + const startedAt = Date.now(); + try { + await streamingFetch(url).then((r) => r.text()); + } catch (e) { + caught = e; + } + // It rejected (a lost dispatcher -> global 300s default would NOT reject on a + // 1.5s response) and it did so BEFORE the response would have arrived (DELAY). + // Use `.name` (realm-safe) — undici's TypeError fails cross-realm instanceof. + expect(caught).toBeDefined(); + expect((caught as Error)?.name).toBe('TypeError'); + expect(Date.now() - startedAt).toBeLessThan(DELAY); + // When present, the undici cause is the headers timeout. + const code = (caught as { cause?: { code?: string } })?.cause?.code; + if (code) expect(code).toBe('UND_ERR_HEADERS_TIMEOUT'); + }); }); diff --git a/apps/server/src/integrations/ai/ai.service.ts b/apps/server/src/integrations/ai/ai.service.ts index 65bdda5c..2a524f2c 100644 --- a/apps/server/src/integrations/ai/ai.service.ts +++ b/apps/server/src/integrations/ai/ai.service.ts @@ -14,8 +14,7 @@ import { AiNotConfiguredException } from './ai-not-configured.exception'; import { AiEmbeddingNotConfiguredException } from './ai-embedding-not-configured.exception'; import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception'; import { describeProviderError } from './ai-error.util'; -// DIAGNOSTIC (provider ECONNRESET investigation) — temporary. -import { createDiagnosticFetch } from './ai-http-diagnostics'; +import { createInstrumentedFetch } from './ai-provider-http'; import { createStreamingFetch } from './ai-streaming-fetch'; import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo'; import { SecretBoxService } from '../crypto/secret-box'; @@ -46,12 +45,12 @@ export interface ChatModelOverride { export class AiService { private readonly logger = new Logger(AiService.name); - // Provider HTTP fetch for the chat path: a streaming fetch that DISABLES - // undici's 300s headers/body timeouts (#175 — long agent turns were severed - // mid-stream), wrapped with passive ECONNRESET-investigation telemetry so the - // logs observe the exact transport the turn uses. Held for the service - // lifetime to reuse the streaming dispatcher's connection pool. - private readonly aiDiagnosticFetch = createDiagnosticFetch( + // Provider HTTP fetch for the chat path: the streaming fetch — which RAISES + // undici's 300s headers/body timeouts to a generous-but-finite silence timeout + // so a long agent turn is not severed mid-stream (#175) — wrapped with the + // provider-HTTP instrumentation so the logs observe that exact transport. Held + // for the service lifetime to reuse the streaming dispatcher's connection pool. + private readonly aiProviderFetch = createInstrumentedFetch( 'AiService:provider-http', createStreamingFetch(), ); @@ -152,13 +151,12 @@ export class AiService { // endpoint. The default callable createOpenAI(...)(model) targets the // Responses API (/responses), which OpenAI-compatible gateways // (OpenRouter, etc.) reject on multi-turn requests (history with - // assistant messages) → 400. - // DIAGNOSTIC (provider ECONNRESET investigation) — temporary: pass the - // passive instrumented fetch (logging only; no behavior change). + // assistant messages) → 400. The provider fetch is the instrumented + // streaming fetch (finite-but-generous stream timeouts, #175). return createOpenAI({ apiKey, baseURL: baseUrl, - fetch: this.aiDiagnosticFetch, + fetch: this.aiProviderFetch, }).chat(chatModel); case 'gemini': return createGoogleGenerativeAI({ apiKey })(chatModel); -- 2.49.1