diff --git a/.env.example b/.env.example index ae932ee5..1948fafe 100644 --- a/.env.example +++ b/.env.example @@ -128,11 +128,6 @@ MCP_DOCMOST_PASSWORD= # A slow/hung embeddings endpoint fails after this and the batch continues. # AI_EMBEDDING_TIMEOUT_MS=120000 -# Diagnostic: bypass the resilient outbound HTTP layer (custom undici RetryAgent) -# for the CHAT model, using the default global fetch instead. Use only to isolate -# a streaming/transport issue; leave unset in normal operation. -# AI_BYPASS_RESILIENT_FETCH=true - # --- Anonymous public-share AI assistant --- # Opt-in per workspace (AI settings -> "public share assistant"; off by default). # When enabled, anonymous visitors of a published share can ask an AI about that diff --git a/apps/server/src/core/ai-chat/ai-chat.controller.ts b/apps/server/src/core/ai-chat/ai-chat.controller.ts index 965e8a84..0870969e 100644 --- a/apps/server/src/core/ai-chat/ai-chat.controller.ts +++ b/apps/server/src/core/ai-chat/ai-chat.controller.ts @@ -142,9 +142,6 @@ export class AiChatController { const body = (req.body ?? {}) as AiChatStreamBody; - // Diagnostic timing baseline for this turn (see START / terminal logs below). - const startedAt = Date.now(); - // Resolve the agent role for this turn BEFORE hijack: existing chats read it // from ai_chats.role_id (authoritative), a new chat from body.roleId. The // role drives both the persona and the optional model override below. @@ -170,7 +167,7 @@ export class AiChatController { // so log it here before aborting the agent loop. if (!res.raw.writableEnded) { this.logger.warn( - `AI chat stream: client disconnected before completion after ${Date.now() - startedAt}ms; aborting turn`, + 'AI chat stream: client disconnected before completion; aborting turn', ); controller.abort(); } @@ -178,10 +175,6 @@ export class AiChatController { req.raw.once('close', onClose); res.raw.once('finish', () => req.raw.off('close', onClose)); - this.logger.log( - `AI chat stream START chat=${body.chatId ?? 'new'} ua="${req.headers['user-agent'] ?? ''}"`, - ); - // Commit to streaming: hijack so Fastify stops managing the response and // the AI SDK can write the UI-message stream directly to the Node socket. res.hijack(); diff --git a/apps/server/src/core/ai-chat/ai-chat.service.ts b/apps/server/src/core/ai-chat/ai-chat.service.ts index ed8b77cf..a96a4437 100644 --- a/apps/server/src/core/ai-chat/ai-chat.service.ts +++ b/apps/server/src/core/ai-chat/ai-chat.service.ts @@ -192,7 +192,6 @@ export class AiChatService { model, role, }: AiChatStreamArgs): Promise { - const turnStartedAt = Date.now(); // Resolve / create the chat. A new chat is created when no valid chatId is // supplied or the supplied one does not belong to this workspace. let isNewChat = false; @@ -381,10 +380,6 @@ export class AiChatService { const capturedSteps: StepLike[] = []; let inProgressText = ''; - // Log only the FIRST streamed chunk so we can see the provider's observed - // time-to-first-token without flooding the log with every delta. - let firstChunkLogged = false; - // NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous // failure here (or in pipe below) would skip the terminal callbacks, so the // catch releases the leased external clients to avoid a connection leak. @@ -409,12 +404,6 @@ export class AiChatService { prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system), abortSignal: signal, onChunk: ({ chunk }) => { - if (!firstChunkLogged) { - firstChunkLogged = true; - this.logger.log( - `AI chat stream first chunk (${chunk.type}) chat=${chatId} after ${Date.now() - turnStartedAt}ms`, - ); - } // 'text-delta' is the assistant's prose; tool-call args are separate chunk // types — so this mirrors exactly what streams to the client. if (chunk.type === 'text-delta') inProgressText += chunk.text; @@ -426,9 +415,6 @@ export class AiChatService { inProgressText = ''; }, onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => { - this.logger.log( - `AI chat stream FINISHED chat=${chatId} in ${Date.now() - turnStartedAt}ms, ${steps.length} step(s)`, - ); await persistAssistant({ text, toolCalls: serializeSteps(steps), @@ -474,9 +460,6 @@ export class AiChatService { const e = error as { stack?: string }; const errorText = describeProviderError(error, String(error)); this.logger.error(`AI chat stream error: ${errorText}`, e?.stack); - this.logger.warn( - `AI chat stream ERROR terminal chat=${chatId} after ${Date.now() - turnStartedAt}ms`, - ); // Persist the PARTIAL answer streamed before the failure (text + any // finished tool steps) WITH the error in metadata, so the turn shows what // the user already saw plus the cause — not just a bare error. @@ -499,8 +482,7 @@ export class AiChatService { // invisible in the logs. Log it (warn) so the abort is traceable. this.logger.warn( `AI chat stream aborted (chat ${chatId}) after ${steps.length} ` + - `step(s), ${partialChars} chars partial text; persisting partial turn` + - ` after ${Date.now() - turnStartedAt}ms`, + `step(s), ${partialChars} chars partial text; persisting partial turn.`, ); await persistAssistant( buildPartialAssistantRecord(capturedSteps, inProgressText, 'aborted'), diff --git a/apps/server/src/integrations/ai/ai-http.spec.ts b/apps/server/src/integrations/ai/ai-http.spec.ts deleted file mode 100644 index 4fbfab04..00000000 --- a/apps/server/src/integrations/ai/ai-http.spec.ts +++ /dev/null @@ -1,112 +0,0 @@ -import * as http from 'node:http'; -import { RetryAgent } from 'undici'; - -// A short header timeout makes the #140 "header stall" deterministic and fast. -// Must be set BEFORE importing ai-http (the undici agents read it at module load). -process.env.AI_HTTP_HEADERS_TIMEOUT_MS = '800'; - -import { aiFetch } from './ai-http'; - -/** - * Light, dependency-free unit checks for the shared AI HTTP layer. The module - * constructs its undici dispatcher eagerly at import time, so importing it here - * already exercises that construction; we make NO real network calls. - */ -describe('ai-http', () => { - it('exports aiFetch as a function', () => { - expect(typeof aiFetch).toBe('function'); - }); - - it('constructs the dispatcher eagerly without throwing at import time', () => { - // Reaching this assertion means the top-level Agent/RetryAgent construction - // in ai-http.ts did not throw when the module was imported above. - expect(aiFetch).toBeDefined(); - }); - - it('forwards the resilient RetryAgent dispatcher into the underlying fetch', async () => { - // CRITICAL regression guard: aiFetch must inject the shared undici dispatcher - // into the real fetch call, otherwise AI traffic silently falls back to the - // default global agent and the ECONNRESET production bug returns. aiFetch - // resolves `fetch` at call time, so spying on globalThis.fetch intercepts it - // and prevents any real network call. - const spy = jest - .spyOn(globalThis, 'fetch') - .mockResolvedValue(new Response(null)); - try { - await aiFetch('https://example.invalid/', { method: 'POST' }); - - expect(spy).toHaveBeenCalledTimes(1); - const init = spy.mock.calls[0][1] as { - dispatcher?: unknown; - method?: string; - }; - // The dispatcher must be the resilient RetryAgent, not the default agent. - expect(init.dispatcher).toBeInstanceOf(RetryAgent); - // `{ ...init }` spreading must preserve the caller's original options. - expect(init.method).toBe('POST'); - } finally { - // Never let the global fetch stub leak into other tests. - spy.mockRestore(); - } - }); -}); - -/** - * #140 regression: a provider that accepts the request but stalls without ever - * sending response headers must FAIL FAST (at headersTimeout — set to 800ms - * above, not undici's 300s default) and be RETRIED on a fresh connection. - * headersTimeout only bounds time-to-headers, so a healthy fast response is - * unaffected. Uses a real loopback server; makes no external network calls. - */ -describe('aiFetch header-stall resilience (#140)', () => { - function makeServer( - handler: http.RequestListener, - ): Promise<{ url: string; close: () => Promise }> { - return new Promise((resolve) => { - const server = http.createServer(handler); - server.listen(0, '127.0.0.1', () => { - const port = (server.address() as { port: number }).port; - resolve({ - url: `http://127.0.0.1:${port}/health`, - close: () => new Promise((r) => server.close(() => r())), - }); - }); - }); - } - - it('retries a header stall on a fresh connection and recovers', async () => { - let attempts = 0; - const { url, close } = await makeServer((_req, res) => { - attempts++; - // First attempt: never send headers -> UND_ERR_HEADERS_TIMEOUT -> retry. - if (attempts === 1) return; - res.writeHead(200, { 'content-type': 'application/json' }); - res.end(JSON.stringify({ ok: true, servedOnAttempt: attempts })); - }); - try { - const res = await aiFetch(url, { method: 'GET' }); - expect(res.status).toBe(200); - const body = (await res.json()) as { servedOnAttempt: number }; - expect(attempts).toBeGreaterThanOrEqual(2); // the stalled attempt was retried - expect(body.servedOnAttempt).toBeGreaterThanOrEqual(2); - } finally { - await close(); - } - }, 15000); - - it('passes a healthy fast response straight through (one attempt)', async () => { - let attempts = 0; - const { url, close } = await makeServer((_req, res) => { - attempts++; - res.writeHead(200, { 'content-type': 'application/json' }); - res.end(JSON.stringify({ ok: true })); - }); - try { - const res = await aiFetch(url, { method: 'GET' }); - expect(res.status).toBe(200); - expect(attempts).toBe(1); - } finally { - await close(); - } - }, 15000); -}); diff --git a/apps/server/src/integrations/ai/ai-http.ts b/apps/server/src/integrations/ai/ai-http.ts deleted file mode 100644 index 403b0f7d..00000000 --- a/apps/server/src/integrations/ai/ai-http.ts +++ /dev/null @@ -1,175 +0,0 @@ -import { Agent, RetryAgent, type Dispatcher } from 'undici'; -import { Logger } from '@nestjs/common'; - -/** - * Dedicated, resilient outbound HTTP layer for ALL AI provider calls. - * - * WHY THIS EXISTS - * --------------- - * Production logs showed the AI chat stream (and title generation) failing with - * `read ECONNRESET` after the AI SDK's own retries were exhausted, and - * (z.ai GLM coding endpoint, #140) intermittently stalling without ever sending - * response headers until undici's 300s default cut the request with no retry. The provider - * clients were built with NO custom `fetch`, so all outbound LLM traffic used - * Node's default global undici agent: default keep-alive pooling and NO - * transport-level reconnect on connection resets. `read ECONNRESET` is a TCP RST - * on a reused/poisoned keep-alive socket against the upstream provider/gateway. - * The AI SDK retried, but every attempt reused the same poisoned condition and - * hit the same error. - * - * WHAT THIS DOES - * -------------- - * It builds a single shared undici `RetryAgent` and exposes a `fetch`-compatible - * `aiFetch`, which is injected into every AI SDK provider factory via the - * provider `fetch` option. That covers chat stream, public-share chat, title - * generation, embeddings, STT and the test-connection probe at once. - * - * The RetryAgent retries CONNECTION-LEVEL errors (e.g. ECONNRESET) on a FRESH - * socket — opening a new connection rather than reusing the poisoned one. POST is - * explicitly opted in, because every LLM/chat/embedding/STT call is a POST and - * undici's default retry `methods` list excludes POST. HTTP-STATUS retries - * (429/5xx + Retry-After) are deliberately left to the AI SDK to avoid - * double-retry; this layer only handles transport-level reconnects. - * - * MID-STREAM NOTE - * -------------- - * This squarely fixes the production case: a reset BEFORE any response byte — - * undici reconnects on a fresh socket (no Range header). If a reset instead - * happens AFTER partial SSE bytes were already delivered, undici's RetryHandler - * attempts a Range-resume retry; LLM/SSE endpoints do not support Range and - * reject it, so the error surfaces as "server does not support the range header - * and the payload was partially consumed" instead of the raw ECONNRESET. The - * stream is NEVER corrupted (undici guards against concatenation) — only the - * error message for that rarer mid-stream case changes. - */ - -// `headersTimeout` bounds time-to-FIRST-response-headers (before any body). It -// is NOT the streaming budget: once headers arrive the SSE body streams freely, -// unaffected by this value — so it is safe to keep SHORT. Some providers (seen -// with the z.ai GLM coding endpoint, #140) intermittently accept the request but -// never send response headers; undici's 300s default then hangs the user for -// FIVE MINUTES before failing, with no retry. Cap it so a stalled request fails -// FAST and is retried on a fresh connection (the retry usually lands on a healthy -// path and responds in seconds). Env-overridable for ops tuning. -const HEADERS_TIMEOUT_MS = - Number(process.env.AI_HTTP_HEADERS_TIMEOUT_MS) || 60_000; -// `bodyTimeout` bounds the gap BETWEEN streamed body chunks (not total stream -// length). Kept generous so a legitimately slow/thinking model with sparse SSE -// chunks is never killed mid-stream. Env-overridable. -const BODY_TIMEOUT_MS = Number(process.env.AI_HTTP_BODY_TIMEOUT_MS) || 300_000; - -const baseAgent = new Agent({ - // Cap TCP/TLS connect so a stuck connect fails fast and gets retried instead - // of hanging indefinitely. - connect: { timeout: 10_000 }, - // Keep keep-alive CONSERVATIVE. A longer keep-alive widens the window in which - // a stale/half-closed socket can be reused, which is exactly the condition - // that produces `read ECONNRESET`. Do NOT raise this. - keepAliveTimeout: 4_000, - // Short time-to-headers (see HEADERS_TIMEOUT_MS) so a header stall fails fast - // and gets retried; generous per-chunk body timeout so real streams survive - // (see BODY_TIMEOUT_MS). Lowering headersTimeout does NOT truncate streams. - headersTimeout: HEADERS_TIMEOUT_MS, - bodyTimeout: BODY_TIMEOUT_MS, -}); - -const dispatcher: Dispatcher = new RetryAgent(baseAgent, { - // A poisoned keep-alive socket is almost always cured by the FIRST reconnect on - // a fresh socket, so 2 transport retries are plenty. More would only add latency - // against a genuinely-down upstream — and the AI SDK still retries on top. - maxRetries: 2, - minTimeout: 250, - maxTimeout: 2_000, - timeoutFactor: 2, - // CRITICAL: include POST — every LLM/chat/embedding/STT call is a POST, and - // undici's default `methods` list excludes POST (so without this, none of the - // AI traffic would ever be retried). - methods: ['GET', 'POST', 'PUT', 'PATCH', 'HEAD', 'OPTIONS', 'DELETE'], - // Do NOT retry on HTTP status here — leave 429/5xx + Retry-After handling to - // the AI SDK to avoid double-retry. We only want transport-level reconnects. - statusCodes: [], - // An explicit copy of undici 7.x's default connection-error code set, pinned - // here so a future undici upgrade can't silently change which transport errors - // we reconnect on. These are the errors we retry on a FRESH connection. - errorCodes: [ - 'ECONNRESET', - 'ECONNREFUSED', - 'ENOTFOUND', - 'ENETDOWN', - 'ENETUNREACH', - 'EHOSTDOWN', - 'EHOSTUNREACH', - 'UND_ERR_SOCKET', - // Added (NOT in undici's default set): a header timeout fires BEFORE any - // response body, so retrying is clean (no partially-consumed stream / Range - // problem) — and it is exactly the z.ai stall mode (#140), where a fresh - // retry usually succeeds. We deliberately do NOT retry UND_ERR_BODY_TIMEOUT - // (mid-body; partial SSE already delivered, not safe to resume). - 'UND_ERR_HEADERS_TIMEOUT', - 'EPIPE', - ], -}); - -const logger = new Logger('AiHttp'); -let requestSeq = 0; - -/** - * A `fetch`-compatible function that routes the request through the shared, - * resilient AI dispatcher. Injected into AI SDK provider factories via their - * `fetch` option. Follows the repo convention (see mcp-clients.service.ts - * `guardedFetch`). - * - * Wrapped with timing logs so provider latency is visible: for streaming - * responses `fetch` resolves when RESPONSE HEADERS arrive (the body streams - * after), so "in ms (headers received)" is exactly the provider's - * time-to-first-byte, and a rejection time pinpoints a headers/body timeout. - * Chat/Responses calls log at info; bulk embedding calls log at debug so RAG - * indexing never floods the logs. No secrets are logged — only host + pathname. - */ -export const aiFetch: typeof fetch = async (input, init) => { - const id = ++requestSeq; - const method = (init?.method ?? 'GET').toUpperCase(); - const rawUrl = - typeof input === 'string' - ? input - : input instanceof URL - ? input.href - : (input as Request).url; - let path = rawUrl; - try { - const u = new URL(rawUrl); - path = u.host + u.pathname; - } catch { - // Non-absolute / unparseable URL: keep the raw string (still no secrets). - } - const isChat = /\/(chat\/completions|responses)\b/.test(path); - const log = (msg: string): void => - isChat ? logger.log(msg) : logger.debug(msg); - const startedAt = performance.now(); - log(`provider request #${id} -> ${method} ${path}`); - try { - const res = await fetch(input, { ...init, dispatcher } as RequestInit); - const ms = Math.round(performance.now() - startedAt); - log(`provider request #${id} <- ${res.status} in ${ms}ms (headers received)`); - return res; - } catch (err) { - const ms = Math.round(performance.now() - startedAt); - // Node's fetch reports a generic "fetch failed"; the real reason (e.g. an - // undici SocketError with .code ECONNRESET / UND_ERR_SOCKET / - // UND_ERR_*TIMEOUT) lives in err.cause (sometimes nested one level deeper). - // Surface the code+message of the cause chain so the failure is actionable. - const parts: string[] = []; - let cur: unknown = err; - for (let depth = 0; cur && depth < 3; depth++) { - const e = cur as { code?: string; message?: string; cause?: unknown }; - const code = e.code ? `[${e.code}] ` : ''; - const msg = e.message ?? String(e); - parts.push(`${code}${msg}`); - cur = e.cause; - } - logger.warn( - `provider request #${id} x after ${ms}ms: ${parts.join(' <- ')}`, - ); - throw err; - } -}; diff --git a/apps/server/src/integrations/ai/ai.service.ts b/apps/server/src/integrations/ai/ai.service.ts index f5f4dca3..078de791 100644 --- a/apps/server/src/integrations/ai/ai.service.ts +++ b/apps/server/src/integrations/ai/ai.service.ts @@ -14,7 +14,6 @@ import { AiNotConfiguredException } from './ai-not-configured.exception'; import { AiEmbeddingNotConfiguredException } from './ai-embedding-not-configured.exception'; import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception'; import { describeProviderError } from './ai-error.util'; -import { aiFetch } from './ai-http'; import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo'; import { SecretBoxService } from '../crypto/secret-box'; import { AiDriver } from './ai.types'; @@ -133,19 +132,6 @@ export class AiService { throw new AiNotConfiguredException(); } - // Diagnostic toggle: when AI_BYPASS_RESILIENT_FETCH=true the chat model - // bypasses the resilient aiFetch (custom undici RetryAgent) and uses the - // default global fetch. Isolates whether the streaming chat hang comes from - // the custom transport vs the request shape. Reversible via env, no rebuild. - const bypassResilientFetch = - process.env.AI_BYPASS_RESILIENT_FETCH === 'true'; - if (bypassResilientFetch) { - this.logger.warn( - 'AI chat: resilient aiFetch BYPASSED for chat model ' + - '(AI_BYPASS_RESILIENT_FETCH=true; using default fetch)', - ); - } - switch (driver) { case 'openai': // baseURL (when set) covers openai-compatible endpoints. Use Chat @@ -154,22 +140,12 @@ export class AiService { // Responses API (/responses), which OpenAI-compatible gateways // (OpenRouter, etc.) reject on multi-turn requests (history with // assistant messages) → 400. - return createOpenAI({ - apiKey, - baseURL: baseUrl, - ...(bypassResilientFetch ? {} : { fetch: aiFetch }), - }).chat(chatModel); + return createOpenAI({ apiKey, baseURL: baseUrl }).chat(chatModel); case 'gemini': - return createGoogleGenerativeAI({ - apiKey, - ...(bypassResilientFetch ? {} : { fetch: aiFetch }), - })(chatModel); + return createGoogleGenerativeAI({ apiKey })(chatModel); case 'ollama': // Ollama needs no API key. - return createOllama({ - baseURL: baseUrl, - ...(bypassResilientFetch ? {} : { fetch: aiFetch }), - })(chatModel); + return createOllama({ baseURL: baseUrl })(chatModel); default: throw new AiNotConfiguredException(); } @@ -204,18 +180,15 @@ export class AiService { return createOpenAI({ apiKey: cfg.embeddingApiKey, baseURL: cfg.embeddingBaseUrl, - fetch: aiFetch, }).textEmbeddingModel(cfg.embeddingModel); case 'gemini': return createGoogleGenerativeAI({ apiKey: cfg.embeddingApiKey, - fetch: aiFetch, }).textEmbeddingModel(cfg.embeddingModel); case 'ollama': // Ollama needs no API key (e.g. nomic-embed-text). return createOllama({ baseURL: cfg.embeddingBaseUrl, - fetch: aiFetch, }).textEmbeddingModel(cfg.embeddingModel); default: throw new AiEmbeddingNotConfiguredException(); @@ -262,7 +235,6 @@ export class AiService { const model = createOpenAI({ apiKey: cfg.sttApiKey ?? 'unused', baseURL, - fetch: aiFetch, }).transcription(cfg.sttModel); const { text } = await transcribe({ model, @@ -296,7 +268,7 @@ export class AiService { ); } const url = `${baseURL.replace(/\/$/, '')}/audio/transcriptions`; - const res = await aiFetch(url, { + const res = await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json',