fix(ai-chat): reconnect on provider ECONNRESET via a resilient fetch
Outbound LLM calls used Node's default global undici agent (default keep-alive pooling, no transport-level reconnect), so a TCP RST on a reused/poisoned keep-alive socket surfaced as "Cannot connect to API: read ECONNRESET" and failed the chat stream and title generation after the AI SDK's own retries were exhausted.

Add a dedicated resilient outbound HTTP layer (ai-http.ts): a shared undici RetryAgent over a tuned Agent, exposed as `aiFetch` and injected into every AI provider factory (createOpenAI chat/embeddings/STT, createGoogleGenerativeAI, createOllama) plus the raw JSON STT fetch. The RetryAgent reconnects on connection-level errors (ECONNRESET, ...) on a FRESH socket, opts POST into the retry methods (undici's default list excludes POST), and leaves HTTP-status retries (429/5xx + Retry-After) to the AI SDK to avoid double-retry.

- ai-http.ts: shared RetryAgent(Agent) + aiFetch (maxRetries 2, conservative keep-alive, connect timeout, streaming-safe timeouts)
- ai.service.ts: inject fetch: aiFetch into every provider factory
- ai-http.spec.ts: regression test that aiFetch injects the RetryAgent dispatcher into the underlying fetch

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
47
apps/server/src/integrations/ai/ai-http.spec.ts
Normal file
47
apps/server/src/integrations/ai/ai-http.spec.ts
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
import { RetryAgent } from 'undici';
|
||||||
|
|
||||||
|
import { aiFetch } from './ai-http';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Light, dependency-free unit checks for the shared AI HTTP layer. The module
|
||||||
|
* constructs its undici dispatcher eagerly at import time, so importing it here
|
||||||
|
* already exercises that construction; we make NO real network calls.
|
||||||
|
*/
|
||||||
|
describe('ai-http', () => {
  // Smoke check: the module's public export is callable like `fetch`.
  it('exports aiFetch as a function', () => {
    expect(typeof aiFetch).toBe('function');
  });

  it('constructs the dispatcher eagerly without throwing at import time', () => {
    // ai-http.ts builds its Agent/RetryAgent at module top level, so the import
    // at the top of this spec would already have thrown if that construction
    // failed. Getting this far is the actual check.
    expect(aiFetch).toBeDefined();
  });

  it('forwards the resilient RetryAgent dispatcher into the underlying fetch', async () => {
    // Regression guard: if aiFetch stops passing the shared undici dispatcher
    // to the real fetch, AI traffic quietly goes back to the default global
    // agent and the ECONNRESET production failure returns. aiFetch looks up
    // `fetch` when it is called, so stubbing globalThis.fetch both captures
    // the call and guarantees no real network request is made.
    const fetchSpy = jest.spyOn(globalThis, 'fetch');
    fetchSpy.mockResolvedValue(new Response(null));

    try {
      await aiFetch('https://example.invalid/', { method: 'POST' });

      expect(fetchSpy).toHaveBeenCalledTimes(1);

      const [, forwarded] = fetchSpy.mock.calls[0];
      const options = forwarded as { dispatcher?: unknown; method?: string };

      // Must be the resilient RetryAgent rather than the default agent.
      expect(options.dispatcher).toBeInstanceOf(RetryAgent);
      // The caller's own options have to survive the `{ ...init }` spread.
      expect(options.method).toBe('POST');
    } finally {
      // Restore the real global fetch so the stub cannot leak into other tests.
      fetchSpy.mockRestore();
    }
  });
});
|
||||||
93
apps/server/src/integrations/ai/ai-http.ts
Normal file
93
apps/server/src/integrations/ai/ai-http.ts
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
import { Agent, RetryAgent, type Dispatcher } from 'undici';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dedicated, resilient outbound HTTP layer for ALL AI provider calls.
|
||||||
|
*
|
||||||
|
* WHY THIS EXISTS
|
||||||
|
* ---------------
|
||||||
|
* Production logs showed the AI chat stream (and title generation) failing with
|
||||||
|
* `read ECONNRESET` after the AI SDK's own retries were exhausted. The provider
|
||||||
|
* clients were built with NO custom `fetch`, so all outbound LLM traffic used
|
||||||
|
* Node's default global undici agent: default keep-alive pooling and NO
|
||||||
|
* transport-level reconnect on connection resets. `read ECONNRESET` is a TCP RST
|
||||||
|
* on a reused/poisoned keep-alive socket against the upstream provider/gateway.
|
||||||
|
* The AI SDK retried, but every attempt reused the same poisoned condition and
|
||||||
|
* hit the same error.
|
||||||
|
*
|
||||||
|
* WHAT THIS DOES
|
||||||
|
* --------------
|
||||||
|
* It builds a single shared undici `RetryAgent` and exposes a `fetch`-compatible
|
||||||
|
* `aiFetch`, which is injected into every AI SDK provider factory via the
|
||||||
|
* provider `fetch` option. That covers chat stream, public-share chat, title
|
||||||
|
* generation, embeddings, STT and the test-connection probe at once.
|
||||||
|
*
|
||||||
|
* The RetryAgent retries CONNECTION-LEVEL errors (e.g. ECONNRESET) on a FRESH
|
||||||
|
* socket — opening a new connection rather than reusing the poisoned one. POST is
|
||||||
|
* explicitly opted in, because every LLM/chat/embedding/STT call is a POST and
|
||||||
|
* undici's default retry `methods` list excludes POST. HTTP-STATUS retries
|
||||||
|
* (429/5xx + Retry-After) are deliberately left to the AI SDK to avoid
|
||||||
|
* double-retry; this layer only handles transport-level reconnects.
|
||||||
|
*
|
||||||
|
* MID-STREAM NOTE
|
||||||
|
* --------------
|
||||||
|
* This squarely fixes the production case: a reset BEFORE any response byte —
|
||||||
|
* undici reconnects on a fresh socket (no Range header). If a reset instead
|
||||||
|
* happens AFTER partial SSE bytes were already delivered, undici's RetryHandler
|
||||||
|
* attempts a Range-resume retry; LLM/SSE endpoints do not support Range and
|
||||||
|
* reject it, so the error surfaces as "server does not support the range header
|
||||||
|
* and the payload was partially consumed" instead of the raw ECONNRESET. The
|
||||||
|
* stream is NEVER corrupted (undici guards against concatenation) — only the
|
||||||
|
* error message for that rarer mid-stream case changes.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
 * Connection pool that sits underneath the retrying dispatcher.
 *
 * Only connect and keep-alive behaviour is tuned here. headersTimeout and
 * bodyTimeout are intentionally NOT overridden, so undici's defaults apply and
 * long-lived SSE streaming responses are not killed mid-stream.
 */
const baseAgent = new Agent({
  // Cap TCP/TLS connect so a stuck connect fails fast and gets retried instead
  // of hanging indefinitely.
  connect: { timeout: 10_000 },
  // Keep keep-alive CONSERVATIVE. A longer keep-alive widens the window in which
  // a stale/half-closed socket can be reused, which is exactly the condition
  // that produces `read ECONNRESET`. Do NOT raise this.
  keepAliveTimeout: 4_000,
  // `keepAliveTimeout` alone is only the default: per the undici Client docs, a
  // `Keep-Alive: timeout=N` hint from the server overrides it, bounded by
  // `keepAliveMaxTimeout` (10 minutes by default). Pin the maximum to the same
  // value so an upstream hint cannot silently widen the idle-reuse window.
  keepAliveMaxTimeout: 4_000,
});
|
||||||
|
|
||||||
|
// Transport-level error codes that trigger a reconnect on a fresh connection.
// This is an explicit copy of undici 7.x's default connection-error set, pinned
// here so that an undici upgrade cannot silently change which errors are
// retried.
const reconnectErrorCodes: string[] = [
  'ECONNRESET',
  'ECONNREFUSED',
  'ENOTFOUND',
  'ENETDOWN',
  'ENETUNREACH',
  'EHOSTDOWN',
  'EHOSTUNREACH',
  'UND_ERR_SOCKET',
  'EPIPE',
];

/**
 * Shared retrying dispatcher used for all AI provider traffic.
 *
 * Wraps `baseAgent` in a `RetryAgent` restricted to connection-level failures;
 * HTTP-status retries are left to the AI SDK.
 *
 * NOTE(review): undici's retry handler is understood to skip the retry once the
 * request body has already been consumed. Confirm with an integration test
 * that a fetch-issued POST really is replayed after a reset on a reused socket.
 */
const dispatcher: Dispatcher = new RetryAgent(baseAgent, {
  // The first reconnect on a fresh socket almost always cures a poisoned
  // keep-alive socket, so two transport retries are enough. Going higher only
  // adds latency when the upstream is genuinely down, and the AI SDK retries on
  // top of this anyway.
  maxRetries: 2,
  // Retry delay settings (milliseconds) passed through to undici.
  minTimeout: 250,
  maxTimeout: 2_000,
  timeoutFactor: 2,
  // POST has to be listed explicitly: every LLM/chat/embedding/STT call is a
  // POST and undici's default `methods` list leaves it out, which would mean no
  // AI request is ever retried.
  methods: ['GET', 'POST', 'PUT', 'PATCH', 'HEAD', 'OPTIONS', 'DELETE'],
  // Empty on purpose: 429/5xx and Retry-After handling belongs to the AI SDK,
  // and retrying on status here as well would double-retry.
  statusCodes: [],
  errorCodes: reconnectErrorCodes,
});
|
||||||
|
|
||||||
|
/**
 * Drop-in `fetch` replacement that sends the request through the shared,
 * resilient AI dispatcher. It is handed to the AI SDK provider factories through
 * their `fetch` option, following the repo convention (see
 * mcp-clients.service.ts `guardedFetch`).
 *
 * @param input - Request target, exactly as accepted by the global `fetch`.
 * @param init - Optional request options; all of them are forwarded.
 * @returns The promise returned by the underlying global `fetch`.
 */
export const aiFetch: typeof fetch = (input, init) => {
  // Spread the caller's options first so the shared dispatcher is always the
  // one used. `dispatcher` is not part of the standard RequestInit type, hence
  // the cast.
  const requestOptions = { ...init, dispatcher } as RequestInit;
  return fetch(input, requestOptions);
};
|
||||||
@@ -14,6 +14,7 @@ import { AiNotConfiguredException } from './ai-not-configured.exception';
|
|||||||
import { AiEmbeddingNotConfiguredException } from './ai-embedding-not-configured.exception';
|
import { AiEmbeddingNotConfiguredException } from './ai-embedding-not-configured.exception';
|
||||||
import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception';
|
import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception';
|
||||||
import { describeProviderError } from './ai-error.util';
|
import { describeProviderError } from './ai-error.util';
|
||||||
|
import { aiFetch } from './ai-http';
|
||||||
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
|
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
|
||||||
import { SecretBoxService } from '../crypto/secret-box';
|
import { SecretBoxService } from '../crypto/secret-box';
|
||||||
import { AiDriver } from './ai.types';
|
import { AiDriver } from './ai.types';
|
||||||
@@ -140,12 +141,14 @@ export class AiService {
|
|||||||
// Responses API (/responses), which OpenAI-compatible gateways
|
// Responses API (/responses), which OpenAI-compatible gateways
|
||||||
// (OpenRouter, etc.) reject on multi-turn requests (history with
|
// (OpenRouter, etc.) reject on multi-turn requests (history with
|
||||||
// assistant messages) → 400.
|
// assistant messages) → 400.
|
||||||
return createOpenAI({ apiKey, baseURL: baseUrl }).chat(chatModel);
|
return createOpenAI({ apiKey, baseURL: baseUrl, fetch: aiFetch }).chat(
|
||||||
|
chatModel,
|
||||||
|
);
|
||||||
case 'gemini':
|
case 'gemini':
|
||||||
return createGoogleGenerativeAI({ apiKey })(chatModel);
|
return createGoogleGenerativeAI({ apiKey, fetch: aiFetch })(chatModel);
|
||||||
case 'ollama':
|
case 'ollama':
|
||||||
// Ollama needs no API key.
|
// Ollama needs no API key.
|
||||||
return createOllama({ baseURL: baseUrl })(chatModel);
|
return createOllama({ baseURL: baseUrl, fetch: aiFetch })(chatModel);
|
||||||
default:
|
default:
|
||||||
throw new AiNotConfiguredException();
|
throw new AiNotConfiguredException();
|
||||||
}
|
}
|
||||||
@@ -180,16 +183,19 @@ export class AiService {
|
|||||||
return createOpenAI({
|
return createOpenAI({
|
||||||
apiKey: cfg.embeddingApiKey,
|
apiKey: cfg.embeddingApiKey,
|
||||||
baseURL: cfg.embeddingBaseUrl,
|
baseURL: cfg.embeddingBaseUrl,
|
||||||
|
fetch: aiFetch,
|
||||||
}).textEmbeddingModel(cfg.embeddingModel);
|
}).textEmbeddingModel(cfg.embeddingModel);
|
||||||
case 'gemini':
|
case 'gemini':
|
||||||
return createGoogleGenerativeAI({
|
return createGoogleGenerativeAI({
|
||||||
apiKey: cfg.embeddingApiKey,
|
apiKey: cfg.embeddingApiKey,
|
||||||
|
fetch: aiFetch,
|
||||||
}).textEmbeddingModel(cfg.embeddingModel);
|
}).textEmbeddingModel(cfg.embeddingModel);
|
||||||
case 'ollama':
|
case 'ollama':
|
||||||
// Ollama needs no API key (e.g. nomic-embed-text).
|
// Ollama needs no API key (e.g. nomic-embed-text).
|
||||||
return createOllama({ baseURL: cfg.embeddingBaseUrl }).textEmbeddingModel(
|
return createOllama({
|
||||||
cfg.embeddingModel,
|
baseURL: cfg.embeddingBaseUrl,
|
||||||
);
|
fetch: aiFetch,
|
||||||
|
}).textEmbeddingModel(cfg.embeddingModel);
|
||||||
default:
|
default:
|
||||||
throw new AiEmbeddingNotConfiguredException();
|
throw new AiEmbeddingNotConfiguredException();
|
||||||
}
|
}
|
||||||
@@ -235,6 +241,7 @@ export class AiService {
|
|||||||
const model = createOpenAI({
|
const model = createOpenAI({
|
||||||
apiKey: cfg.sttApiKey ?? 'unused',
|
apiKey: cfg.sttApiKey ?? 'unused',
|
||||||
baseURL,
|
baseURL,
|
||||||
|
fetch: aiFetch,
|
||||||
}).transcription(cfg.sttModel);
|
}).transcription(cfg.sttModel);
|
||||||
const { text } = await transcribe({
|
const { text } = await transcribe({
|
||||||
model,
|
model,
|
||||||
@@ -268,7 +275,7 @@ export class AiService {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
const url = `${baseURL.replace(/\/$/, '')}/audio/transcriptions`;
|
const url = `${baseURL.replace(/\/$/, '')}/audio/transcriptions`;
|
||||||
const res = await fetch(url, {
|
const res = await aiFetch(url, {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers: {
|
headers: {
|
||||||
'Content-Type': 'application/json',
|
'Content-Type': 'application/json',
|
||||||
|
|||||||
Reference in New Issue
Block a user