diff --git a/apps/server/src/integrations/ai/ai-http.spec.ts b/apps/server/src/integrations/ai/ai-http.spec.ts new file mode 100644 index 00000000..044c18f9 --- /dev/null +++ b/apps/server/src/integrations/ai/ai-http.spec.ts @@ -0,0 +1,47 @@ +import { RetryAgent } from 'undici'; + +import { aiFetch } from './ai-http'; + +/** + * Light, dependency-free unit checks for the shared AI HTTP layer. The module + * constructs its undici dispatcher eagerly at import time, so importing it here + * already exercises that construction; we make NO real network calls. + */ +describe('ai-http', () => { + it('exports aiFetch as a function', () => { + expect(typeof aiFetch).toBe('function'); + }); + + it('constructs the dispatcher eagerly without throwing at import time', () => { + // Reaching this assertion means the top-level Agent/RetryAgent construction + // in ai-http.ts did not throw when the module was imported above. + expect(aiFetch).toBeDefined(); + }); + + it('forwards the resilient RetryAgent dispatcher into the underlying fetch', async () => { + // CRITICAL regression guard: aiFetch must inject the shared undici dispatcher + // into the real fetch call, otherwise AI traffic silently falls back to the + // default global agent and the ECONNRESET production bug returns. aiFetch + // resolves `fetch` at call time, so spying on globalThis.fetch intercepts it + // and prevents any real network call. + const spy = jest + .spyOn(globalThis, 'fetch') + .mockResolvedValue(new Response(null)); + try { + await aiFetch('https://example.invalid/', { method: 'POST' }); + + expect(spy).toHaveBeenCalledTimes(1); + const init = spy.mock.calls[0][1] as { + dispatcher?: unknown; + method?: string; + }; + // The dispatcher must be the resilient RetryAgent, not the default agent. + expect(init.dispatcher).toBeInstanceOf(RetryAgent); + // `{ ...init }` spreading must preserve the caller's original options. + expect(init.method).toBe('POST'); + } finally { + // Never let the global fetch stub leak into other tests. + spy.mockRestore(); + } + }); +}); diff --git a/apps/server/src/integrations/ai/ai-http.ts b/apps/server/src/integrations/ai/ai-http.ts new file mode 100644 index 00000000..5182393c --- /dev/null +++ b/apps/server/src/integrations/ai/ai-http.ts @@ -0,0 +1,93 @@ +import { Agent, RetryAgent, type Dispatcher } from 'undici'; + +/** + * Dedicated, resilient outbound HTTP layer for ALL AI provider calls. + * + * WHY THIS EXISTS + * --------------- + * Production logs showed the AI chat stream (and title generation) failing with + * `read ECONNRESET` after the AI SDK's own retries were exhausted. The provider + * clients were built with NO custom `fetch`, so all outbound LLM traffic used + * Node's default global undici agent: default keep-alive pooling and NO + * transport-level reconnect on connection resets. `read ECONNRESET` is a TCP RST + * on a reused/poisoned keep-alive socket against the upstream provider/gateway. + * The AI SDK retried, but every attempt reused the same poisoned condition and + * hit the same error. + * + * WHAT THIS DOES + * -------------- + * It builds a single shared undici `RetryAgent` and exposes a `fetch`-compatible + * `aiFetch`, which is injected into every AI SDK provider factory via the + * provider `fetch` option. That covers chat stream, public-share chat, title + * generation, embeddings, STT and the test-connection probe at once. + * + * The RetryAgent retries CONNECTION-LEVEL errors (e.g. ECONNRESET) on a FRESH + * socket — opening a new connection rather than reusing the poisoned one. POST is + * explicitly opted in, because every LLM/chat/embedding/STT call is a POST and + * undici's default retry `methods` list excludes POST. HTTP-STATUS retries + * (429/5xx + Retry-After) are deliberately left to the AI SDK to avoid + * double-retry; this layer only handles transport-level reconnects. + * + * MID-STREAM NOTE + * -------------- + * This squarely fixes the production case: a reset BEFORE any response byte — + * undici reconnects on a fresh socket (no Range header). If a reset instead + * happens AFTER partial SSE bytes were already delivered, undici's RetryHandler + * attempts a Range-resume retry; LLM/SSE endpoints do not support Range and + * reject it, so the error surfaces as "server does not support the range header + * and the payload was partially consumed" instead of the raw ECONNRESET. The + * stream is NEVER corrupted (undici guards against concatenation) — only the + * error message for that rarer mid-stream case changes. + */ + +const baseAgent = new Agent({ + // Cap TCP/TLS connect so a stuck connect fails fast and gets retried instead + // of hanging indefinitely. + connect: { timeout: 10_000 }, + // Keep keep-alive CONSERVATIVE. A longer keep-alive widens the window in which + // a stale/half-closed socket can be reused, which is exactly the condition + // that produces `read ECONNRESET`. Do NOT raise this. + keepAliveTimeout: 4_000, + // Do NOT override headersTimeout/bodyTimeout — keep undici defaults so + // long-lived SSE streaming responses are not killed mid-stream. +}); + +const dispatcher: Dispatcher = new RetryAgent(baseAgent, { + // A poisoned keep-alive socket is almost always cured by the FIRST reconnect on + // a fresh socket, so 2 transport retries are plenty. More would only add latency + // against a genuinely-down upstream — and the AI SDK still retries on top. + maxRetries: 2, + minTimeout: 250, + maxTimeout: 2_000, + timeoutFactor: 2, + // CRITICAL: include POST — every LLM/chat/embedding/STT call is a POST, and + // undici's default `methods` list excludes POST (so without this, none of the + // AI traffic would ever be retried). + methods: ['GET', 'POST', 'PUT', 'PATCH', 'HEAD', 'OPTIONS', 'DELETE'], + // Do NOT retry on HTTP status here — leave 429/5xx + Retry-After handling to + // the AI SDK to avoid double-retry. We only want transport-level reconnects. + statusCodes: [], + // An explicit copy of undici 7.x's default connection-error code set, pinned + // here so a future undici upgrade can't silently change which transport errors + // we reconnect on. These are the errors we retry on a FRESH connection. + errorCodes: [ + 'ECONNRESET', + 'ECONNREFUSED', + 'ENOTFOUND', + 'ENETDOWN', + 'ENETUNREACH', + 'EHOSTDOWN', + 'EHOSTUNREACH', + 'UND_ERR_SOCKET', + 'EPIPE', + ], +}); + +/** + * A `fetch`-compatible function that routes the request through the shared, + * resilient AI dispatcher. Injected into AI SDK provider factories via their + * `fetch` option. Follows the repo convention (see mcp-clients.service.ts + * `guardedFetch`). + */ +export const aiFetch: typeof fetch = (input, init) => + fetch(input, { ...init, dispatcher } as RequestInit); diff --git a/apps/server/src/integrations/ai/ai.service.ts b/apps/server/src/integrations/ai/ai.service.ts index 858118f0..442ebf9b 100644 --- a/apps/server/src/integrations/ai/ai.service.ts +++ b/apps/server/src/integrations/ai/ai.service.ts @@ -14,6 +14,7 @@ import { AiNotConfiguredException } from './ai-not-configured.exception'; import { AiEmbeddingNotConfiguredException } from './ai-embedding-not-configured.exception'; import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception'; import { describeProviderError } from './ai-error.util'; +import { aiFetch } from './ai-http'; import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo'; import { SecretBoxService } from '../crypto/secret-box'; import { AiDriver } from './ai.types'; @@ -140,12 +141,14 @@ export class AiService { // Responses API (/responses), which OpenAI-compatible gateways // (OpenRouter, etc.) reject on multi-turn requests (history with // assistant messages) → 400. - return createOpenAI({ apiKey, baseURL: baseUrl }).chat(chatModel); + return createOpenAI({ apiKey, baseURL: baseUrl, fetch: aiFetch }).chat( + chatModel, + ); case 'gemini': - return createGoogleGenerativeAI({ apiKey })(chatModel); + return createGoogleGenerativeAI({ apiKey, fetch: aiFetch })(chatModel); case 'ollama': // Ollama needs no API key. - return createOllama({ baseURL: baseUrl })(chatModel); + return createOllama({ baseURL: baseUrl, fetch: aiFetch })(chatModel); default: throw new AiNotConfiguredException(); } @@ -180,16 +183,19 @@ export class AiService { return createOpenAI({ apiKey: cfg.embeddingApiKey, baseURL: cfg.embeddingBaseUrl, + fetch: aiFetch, }).textEmbeddingModel(cfg.embeddingModel); case 'gemini': return createGoogleGenerativeAI({ apiKey: cfg.embeddingApiKey, + fetch: aiFetch, }).textEmbeddingModel(cfg.embeddingModel); case 'ollama': // Ollama needs no API key (e.g. nomic-embed-text). - return createOllama({ baseURL: cfg.embeddingBaseUrl }).textEmbeddingModel( - cfg.embeddingModel, - ); + return createOllama({ + baseURL: cfg.embeddingBaseUrl, + fetch: aiFetch, + }).textEmbeddingModel(cfg.embeddingModel); default: throw new AiEmbeddingNotConfiguredException(); } @@ -235,6 +241,7 @@ export class AiService { const model = createOpenAI({ apiKey: cfg.sttApiKey ?? 'unused', baseURL, + fetch: aiFetch, }).transcription(cfg.sttModel); const { text } = await transcribe({ model, @@ -268,7 +275,7 @@ export class AiService { ); } const url = `${baseURL.replace(/\/$/, '')}/audio/transcriptions`; - const res = await fetch(url, { + const res = await aiFetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json',