diff --git a/apps/server/src/core/ai-chat/external-mcp/mcp-clients.service.ts b/apps/server/src/core/ai-chat/external-mcp/mcp-clients.service.ts index 30e94dc0..cdc949e2 100644 --- a/apps/server/src/core/ai-chat/external-mcp/mcp-clients.service.ts +++ b/apps/server/src/core/ai-chat/external-mcp/mcp-clients.service.ts @@ -400,6 +400,17 @@ export function validateResolvedAddresses( */ function buildPinnedDispatcher(): Agent { return new Agent({ + // Disable undici's default 300s headers/body timeouts on external MCP + // traffic. A long agent turn keeps an SSE transport (e.g. crawl4ai's + // /mcp/sse) open across the whole turn; that connection can idle BETWEEN + // tool calls longer than 5 min, and undici's bodyTimeout would otherwise + // sever it mid-task — a tool-call failure that aborts the streamed turn and + // shows the user "Lost connection to the AI provider" (#175). A slow single + // tool call (a crawl) can likewise exceed headersTimeout. Connection + // lifetime is bounded by the turn (clients are closed in onFinish/onError/ + // onAbort), so disabling the wall-clock cap is safe. + headersTimeout: 0, + bodyTimeout: 0, connect: { lookup: (hostname, _options, callback) => { // Always resolve ALL addresses ourselves; do not trust the caller's diff --git a/apps/server/src/integrations/ai/ai-http-diagnostics.ts b/apps/server/src/integrations/ai/ai-http-diagnostics.ts index eb9beeb2..0761d050 100644 --- a/apps/server/src/integrations/ai/ai-http-diagnostics.ts +++ b/apps/server/src/integrations/ai/ai-http-diagnostics.ts @@ -25,7 +25,13 @@ import { Logger } from '@nestjs/common'; * The seq/last-call timestamps are module-level, so under concurrent turns the * idle-gap figure is approximate (fine for single-user reproduction). */ -export function createDiagnosticFetch(context: string): typeof fetch { +export function createDiagnosticFetch( + context: string, + // The underlying fetch to instrument. Defaults to the global fetch; the chat + // provider passes a streaming fetch (disabled undici stream timeouts, #175) so + // the telemetry observes the SAME transport the long agent turn actually uses. + baseFetch: typeof fetch = fetch, +): typeof fetch { const logger = new Logger(context); let callSeq = 0; let lastCallStartedAt: number | undefined; @@ -46,9 +52,9 @@ export function createDiagnosticFetch(context: string): typeof fetch { ? body.byteLength : undefined; try { - // Delegate to global fetch; return the Response UNTOUCHED (never read/clone - // the body) so the streamed SSE response is unaffected. - const res = await fetch(input, init); + // Delegate to the base fetch; return the Response UNTOUCHED (never read/ + // clone the body) so the streamed SSE response is unaffected. + const res = await baseFetch(input, init); logger.log( `provider HTTP DIAGNOSTIC: call#${callId} OK ` + `headersAfter=${Date.now() - startedAt}ms status=${res.status} ` + diff --git a/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts b/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts new file mode 100644 index 00000000..2db93cbc --- /dev/null +++ b/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts @@ -0,0 +1,50 @@ +import * as http from 'node:http'; +import { + createStreamingFetch, + STREAMING_DISPATCHER_OPTIONS, +} from './ai-streaming-fetch'; + +/** + * #175: Node's global fetch (undici) defaults headers/body timeouts to 300s and + * severs a long agent turn mid-stream. createStreamingFetch must DISABLE those + * timeouts (0). We pin the option contract deterministically AND confirm the + * built fetch actually streams a deliberately-delayed response. + */ +describe('createStreamingFetch', () => { + it('disables BOTH undici stream timeouts (the #175 contract)', () => { + expect(STREAMING_DISPATCHER_OPTIONS.headersTimeout).toBe(0); + expect(STREAMING_DISPATCHER_OPTIONS.bodyTimeout).toBe(0); + }); + + describe('against a delayed server', () => { + let server: http.Server; + let url: string; + // The server waits before sending ANY byte (a long time-to-first-token). + const DELAY = 400; + + beforeAll(async () => { + server = http.createServer((_req, res) => { + setTimeout(() => { + res.writeHead(200, { 'Content-Type': 'text/plain' }); + res.end('ok'); + }, DELAY); + }); + await new Promise((resolve) => + server.listen(0, '127.0.0.1', resolve), + ); + const addr = server.address() as import('node:net').AddressInfo; + url = `http://127.0.0.1:${addr.port}/`; + }); + + afterAll(async () => { + await new Promise((resolve) => server.close(() => resolve())); + }); + + it('streams the delayed response instead of timing out', async () => { + const streamingFetch = createStreamingFetch(); + const res = await streamingFetch(url); + expect(res.status).toBe(200); + expect(await res.text()).toBe('ok'); + }); + }); +}); diff --git a/apps/server/src/integrations/ai/ai-streaming-fetch.ts b/apps/server/src/integrations/ai/ai-streaming-fetch.ts new file mode 100644 index 00000000..a3a42d34 --- /dev/null +++ b/apps/server/src/integrations/ai/ai-streaming-fetch.ts @@ -0,0 +1,45 @@ +import { Agent } from 'undici'; + +/** + * Build a `fetch` for LONG-LIVED streaming AI calls (the agent chat turn). + * + * Node's global fetch (undici) defaults BOTH `headersTimeout` and `bodyTimeout` + * to 300_000ms. A legitimate long agent turn trips that and is severed + * mid-stream — surfacing to the user as "Lost connection to the AI provider" + * (issue #175): + * - headersTimeout (time to the first response byte) is exceeded when a late + * step sends a huge accumulated context and the model's time-to-first-token + * grows past 5 min; + * - bodyTimeout (max gap BETWEEN body chunks) is exceeded when a reasoning + * model pauses to "think" for more than 5 min between emitted chunks. + * + * This dispatcher disables both timeouts (0). Cancellation is NOT lost: the + * agent turn is bound to the request's abortSignal, which fires when the client + * disconnects (see ai-chat.controller), and the agent loop is bounded by + * MAX_AGENT_STEPS — so a turn still terminates; it just no longer dies at an + * arbitrary 5-minute wall-clock. keepAlive (undici default) is kept so the + * sequential per-step calls of one turn reuse the connection. + * + * A single shared dispatcher is returned (callers hold it for the service + * lifetime) so its connection pool is reused across requests. + */ +/** + * undici Agent options for streaming AI calls: both stream timeouts DISABLED (0) + * so a long turn is never severed at undici's 300s default (#175). Exported so a + * test can pin the contract without a timing-dependent assertion. + */ +export const STREAMING_DISPATCHER_OPTIONS = { + headersTimeout: 0, + bodyTimeout: 0, +} as const; + +export function createStreamingFetch(): typeof fetch { + const dispatcher = new Agent({ ...STREAMING_DISPATCHER_OPTIONS }); + return ((input: Parameters[0], init?: RequestInit) => + fetch(input, { + ...(init ?? {}), + // `dispatcher` is an undici-specific init field (not in the DOM RequestInit + // type); Node's global fetch reads it. Cast to satisfy the type. + dispatcher, + } as RequestInit & { dispatcher: Agent })) as typeof fetch; +} diff --git a/apps/server/src/integrations/ai/ai.service.ts b/apps/server/src/integrations/ai/ai.service.ts index 4f72d23b..65bdda5c 100644 --- a/apps/server/src/integrations/ai/ai.service.ts +++ b/apps/server/src/integrations/ai/ai.service.ts @@ -16,6 +16,7 @@ import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception'; import { describeProviderError } from './ai-error.util'; // DIAGNOSTIC (provider ECONNRESET investigation) — temporary. import { createDiagnosticFetch } from './ai-http-diagnostics'; +import { createStreamingFetch } from './ai-streaming-fetch'; import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo'; import { SecretBoxService } from '../crypto/secret-box'; import { AiDriver } from './ai.types'; @@ -45,11 +46,14 @@ export interface ChatModelOverride { export class AiService { private readonly logger = new Logger(AiService.name); - // DIAGNOSTIC (provider ECONNRESET investigation) — temporary: passive - // instrumentation of the OpenAI-compatible provider HTTP calls (z.ai). - // Logs call timing/outcome only — no behavior change. + // Provider HTTP fetch for the chat path: a streaming fetch that DISABLES + // undici's 300s headers/body timeouts (#175 — long agent turns were severed + // mid-stream), wrapped with passive ECONNRESET-investigation telemetry so the + // logs observe the exact transport the turn uses. Held for the service + // lifetime to reuse the streaming dispatcher's connection pool. private readonly aiDiagnosticFetch = createDiagnosticFetch( 'AiService:provider-http', + createStreamingFetch(), ); constructor(