revert(ai-http): drop resilient fetch/RetryAgent layer (#140)
The custom undici RetryAgent + aiFetch transport added for issue #140 did not actually heal mid-stream provider drops: undici's retry path is a Range-based download-resume that SSE/chat-completions endpoints cannot satisfy, so a reset after the first byte only swapped ECONNRESET for a "server does not support the range header" error. Its only real effect was reconnecting a poisoned keep-alive socket before the first byte, and PR #141 on top of it turned the 60s headers timeout into deterministic ~61s failures (plus CONTENT_LENGTH_MISMATCH from retrying a POST body after a timeout abort). The root cause is the z.ai coding endpoint, not our transport. Remove the whole layer and return all AI provider calls to Node's default global fetch. - delete integrations/ai/ai-http.ts and its spec - ai.service.ts: drop the aiFetch import, the AI_BYPASS_RESILIENT_FETCH diagnostic toggle, and fetch:aiFetch from every chat/embedding/STT factory; raw STT call back to global fetch - ai-chat.controller.ts: drop the stream-timing START log + startedAt - ai-chat.service.ts: drop the first-chunk/FINISHED/ERROR timing logs - .env.example: drop AI_BYPASS_RESILIENT_FETCH Reverts:1af5d34a,7c308728,b7abb7ea,35fc58ea,d6cd2754,6efb8656. Preserved (not part of the rollback): client-disconnect abort, title generation in onFinish, partial-answer persistence, Safari SSE heartbeat. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -128,11 +128,6 @@ MCP_DOCMOST_PASSWORD=
|
|||||||
# A slow/hung embeddings endpoint fails after this and the batch continues.
|
# A slow/hung embeddings endpoint fails after this and the batch continues.
|
||||||
# AI_EMBEDDING_TIMEOUT_MS=120000
|
# AI_EMBEDDING_TIMEOUT_MS=120000
|
||||||
|
|
||||||
# Diagnostic: bypass the resilient outbound HTTP layer (custom undici RetryAgent)
|
|
||||||
# for the CHAT model, using the default global fetch instead. Use only to isolate
|
|
||||||
# a streaming/transport issue; leave unset in normal operation.
|
|
||||||
# AI_BYPASS_RESILIENT_FETCH=true
|
|
||||||
|
|
||||||
# --- Anonymous public-share AI assistant ---
|
# --- Anonymous public-share AI assistant ---
|
||||||
# Opt-in per workspace (AI settings -> "public share assistant"; off by default).
|
# Opt-in per workspace (AI settings -> "public share assistant"; off by default).
|
||||||
# When enabled, anonymous visitors of a published share can ask an AI about that
|
# When enabled, anonymous visitors of a published share can ask an AI about that
|
||||||
|
|||||||
@@ -142,9 +142,6 @@ export class AiChatController {
|
|||||||
|
|
||||||
const body = (req.body ?? {}) as AiChatStreamBody;
|
const body = (req.body ?? {}) as AiChatStreamBody;
|
||||||
|
|
||||||
// Diagnostic timing baseline for this turn (see START / terminal logs below).
|
|
||||||
const startedAt = Date.now();
|
|
||||||
|
|
||||||
// Resolve the agent role for this turn BEFORE hijack: existing chats read it
|
// Resolve the agent role for this turn BEFORE hijack: existing chats read it
|
||||||
// from ai_chats.role_id (authoritative), a new chat from body.roleId. The
|
// from ai_chats.role_id (authoritative), a new chat from body.roleId. The
|
||||||
// role drives both the persona and the optional model override below.
|
// role drives both the persona and the optional model override below.
|
||||||
@@ -170,7 +167,7 @@ export class AiChatController {
|
|||||||
// so log it here before aborting the agent loop.
|
// so log it here before aborting the agent loop.
|
||||||
if (!res.raw.writableEnded) {
|
if (!res.raw.writableEnded) {
|
||||||
this.logger.warn(
|
this.logger.warn(
|
||||||
`AI chat stream: client disconnected before completion after ${Date.now() - startedAt}ms; aborting turn`,
|
'AI chat stream: client disconnected before completion; aborting turn',
|
||||||
);
|
);
|
||||||
controller.abort();
|
controller.abort();
|
||||||
}
|
}
|
||||||
@@ -178,10 +175,6 @@ export class AiChatController {
|
|||||||
req.raw.once('close', onClose);
|
req.raw.once('close', onClose);
|
||||||
res.raw.once('finish', () => req.raw.off('close', onClose));
|
res.raw.once('finish', () => req.raw.off('close', onClose));
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`AI chat stream START chat=${body.chatId ?? 'new'} ua="${req.headers['user-agent'] ?? ''}"`,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Commit to streaming: hijack so Fastify stops managing the response and
|
// Commit to streaming: hijack so Fastify stops managing the response and
|
||||||
// the AI SDK can write the UI-message stream directly to the Node socket.
|
// the AI SDK can write the UI-message stream directly to the Node socket.
|
||||||
res.hijack();
|
res.hijack();
|
||||||
|
|||||||
@@ -192,7 +192,6 @@ export class AiChatService {
|
|||||||
model,
|
model,
|
||||||
role,
|
role,
|
||||||
}: AiChatStreamArgs): Promise<void> {
|
}: AiChatStreamArgs): Promise<void> {
|
||||||
const turnStartedAt = Date.now();
|
|
||||||
// Resolve / create the chat. A new chat is created when no valid chatId is
|
// Resolve / create the chat. A new chat is created when no valid chatId is
|
||||||
// supplied or the supplied one does not belong to this workspace.
|
// supplied or the supplied one does not belong to this workspace.
|
||||||
let isNewChat = false;
|
let isNewChat = false;
|
||||||
@@ -381,10 +380,6 @@ export class AiChatService {
|
|||||||
const capturedSteps: StepLike[] = [];
|
const capturedSteps: StepLike[] = [];
|
||||||
let inProgressText = '';
|
let inProgressText = '';
|
||||||
|
|
||||||
// Log only the FIRST streamed chunk so we can see the provider's observed
|
|
||||||
// time-to-first-token without flooding the log with every delta.
|
|
||||||
let firstChunkLogged = false;
|
|
||||||
|
|
||||||
// NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous
|
// NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous
|
||||||
// failure here (or in pipe below) would skip the terminal callbacks, so the
|
// failure here (or in pipe below) would skip the terminal callbacks, so the
|
||||||
// catch releases the leased external clients to avoid a connection leak.
|
// catch releases the leased external clients to avoid a connection leak.
|
||||||
@@ -409,12 +404,6 @@ export class AiChatService {
|
|||||||
prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system),
|
prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system),
|
||||||
abortSignal: signal,
|
abortSignal: signal,
|
||||||
onChunk: ({ chunk }) => {
|
onChunk: ({ chunk }) => {
|
||||||
if (!firstChunkLogged) {
|
|
||||||
firstChunkLogged = true;
|
|
||||||
this.logger.log(
|
|
||||||
`AI chat stream first chunk (${chunk.type}) chat=${chatId} after ${Date.now() - turnStartedAt}ms`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
// 'text-delta' is the assistant's prose; tool-call args are separate chunk
|
// 'text-delta' is the assistant's prose; tool-call args are separate chunk
|
||||||
// types — so this mirrors exactly what streams to the client.
|
// types — so this mirrors exactly what streams to the client.
|
||||||
if (chunk.type === 'text-delta') inProgressText += chunk.text;
|
if (chunk.type === 'text-delta') inProgressText += chunk.text;
|
||||||
@@ -426,9 +415,6 @@ export class AiChatService {
|
|||||||
inProgressText = '';
|
inProgressText = '';
|
||||||
},
|
},
|
||||||
onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => {
|
onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => {
|
||||||
this.logger.log(
|
|
||||||
`AI chat stream FINISHED chat=${chatId} in ${Date.now() - turnStartedAt}ms, ${steps.length} step(s)`,
|
|
||||||
);
|
|
||||||
await persistAssistant({
|
await persistAssistant({
|
||||||
text,
|
text,
|
||||||
toolCalls: serializeSteps(steps),
|
toolCalls: serializeSteps(steps),
|
||||||
@@ -474,9 +460,6 @@ export class AiChatService {
|
|||||||
const e = error as { stack?: string };
|
const e = error as { stack?: string };
|
||||||
const errorText = describeProviderError(error, String(error));
|
const errorText = describeProviderError(error, String(error));
|
||||||
this.logger.error(`AI chat stream error: ${errorText}`, e?.stack);
|
this.logger.error(`AI chat stream error: ${errorText}`, e?.stack);
|
||||||
this.logger.warn(
|
|
||||||
`AI chat stream ERROR terminal chat=${chatId} after ${Date.now() - turnStartedAt}ms`,
|
|
||||||
);
|
|
||||||
// Persist the PARTIAL answer streamed before the failure (text + any
|
// Persist the PARTIAL answer streamed before the failure (text + any
|
||||||
// finished tool steps) WITH the error in metadata, so the turn shows what
|
// finished tool steps) WITH the error in metadata, so the turn shows what
|
||||||
// the user already saw plus the cause — not just a bare error.
|
// the user already saw plus the cause — not just a bare error.
|
||||||
@@ -499,8 +482,7 @@ export class AiChatService {
|
|||||||
// invisible in the logs. Log it (warn) so the abort is traceable.
|
// invisible in the logs. Log it (warn) so the abort is traceable.
|
||||||
this.logger.warn(
|
this.logger.warn(
|
||||||
`AI chat stream aborted (chat ${chatId}) after ${steps.length} ` +
|
`AI chat stream aborted (chat ${chatId}) after ${steps.length} ` +
|
||||||
`step(s), ${partialChars} chars partial text; persisting partial turn` +
|
`step(s), ${partialChars} chars partial text; persisting partial turn.`,
|
||||||
` after ${Date.now() - turnStartedAt}ms`,
|
|
||||||
);
|
);
|
||||||
await persistAssistant(
|
await persistAssistant(
|
||||||
buildPartialAssistantRecord(capturedSteps, inProgressText, 'aborted'),
|
buildPartialAssistantRecord(capturedSteps, inProgressText, 'aborted'),
|
||||||
|
|||||||
@@ -1,112 +0,0 @@
|
|||||||
import * as http from 'node:http';
|
|
||||||
import { RetryAgent } from 'undici';
|
|
||||||
|
|
||||||
// A short header timeout makes the #140 "header stall" deterministic and fast.
|
|
||||||
// Must be set BEFORE importing ai-http (the undici agents read it at module load).
|
|
||||||
process.env.AI_HTTP_HEADERS_TIMEOUT_MS = '800';
|
|
||||||
|
|
||||||
import { aiFetch } from './ai-http';
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Light, dependency-free unit checks for the shared AI HTTP layer. The module
|
|
||||||
* constructs its undici dispatcher eagerly at import time, so importing it here
|
|
||||||
* already exercises that construction; we make NO real network calls.
|
|
||||||
*/
|
|
||||||
describe('ai-http', () => {
|
|
||||||
it('exports aiFetch as a function', () => {
|
|
||||||
expect(typeof aiFetch).toBe('function');
|
|
||||||
});
|
|
||||||
|
|
||||||
it('constructs the dispatcher eagerly without throwing at import time', () => {
|
|
||||||
// Reaching this assertion means the top-level Agent/RetryAgent construction
|
|
||||||
// in ai-http.ts did not throw when the module was imported above.
|
|
||||||
expect(aiFetch).toBeDefined();
|
|
||||||
});
|
|
||||||
|
|
||||||
it('forwards the resilient RetryAgent dispatcher into the underlying fetch', async () => {
|
|
||||||
// CRITICAL regression guard: aiFetch must inject the shared undici dispatcher
|
|
||||||
// into the real fetch call, otherwise AI traffic silently falls back to the
|
|
||||||
// default global agent and the ECONNRESET production bug returns. aiFetch
|
|
||||||
// resolves `fetch` at call time, so spying on globalThis.fetch intercepts it
|
|
||||||
// and prevents any real network call.
|
|
||||||
const spy = jest
|
|
||||||
.spyOn(globalThis, 'fetch')
|
|
||||||
.mockResolvedValue(new Response(null));
|
|
||||||
try {
|
|
||||||
await aiFetch('https://example.invalid/', { method: 'POST' });
|
|
||||||
|
|
||||||
expect(spy).toHaveBeenCalledTimes(1);
|
|
||||||
const init = spy.mock.calls[0][1] as {
|
|
||||||
dispatcher?: unknown;
|
|
||||||
method?: string;
|
|
||||||
};
|
|
||||||
// The dispatcher must be the resilient RetryAgent, not the default agent.
|
|
||||||
expect(init.dispatcher).toBeInstanceOf(RetryAgent);
|
|
||||||
// `{ ...init }` spreading must preserve the caller's original options.
|
|
||||||
expect(init.method).toBe('POST');
|
|
||||||
} finally {
|
|
||||||
// Never let the global fetch stub leak into other tests.
|
|
||||||
spy.mockRestore();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
/**
|
|
||||||
* #140 regression: a provider that accepts the request but stalls without ever
|
|
||||||
* sending response headers must FAIL FAST (at headersTimeout — set to 800ms
|
|
||||||
* above, not undici's 300s default) and be RETRIED on a fresh connection.
|
|
||||||
* headersTimeout only bounds time-to-headers, so a healthy fast response is
|
|
||||||
* unaffected. Uses a real loopback server; makes no external network calls.
|
|
||||||
*/
|
|
||||||
describe('aiFetch header-stall resilience (#140)', () => {
|
|
||||||
function makeServer(
|
|
||||||
handler: http.RequestListener,
|
|
||||||
): Promise<{ url: string; close: () => Promise<void> }> {
|
|
||||||
return new Promise((resolve) => {
|
|
||||||
const server = http.createServer(handler);
|
|
||||||
server.listen(0, '127.0.0.1', () => {
|
|
||||||
const port = (server.address() as { port: number }).port;
|
|
||||||
resolve({
|
|
||||||
url: `http://127.0.0.1:${port}/health`,
|
|
||||||
close: () => new Promise<void>((r) => server.close(() => r())),
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
it('retries a header stall on a fresh connection and recovers', async () => {
|
|
||||||
let attempts = 0;
|
|
||||||
const { url, close } = await makeServer((_req, res) => {
|
|
||||||
attempts++;
|
|
||||||
// First attempt: never send headers -> UND_ERR_HEADERS_TIMEOUT -> retry.
|
|
||||||
if (attempts === 1) return;
|
|
||||||
res.writeHead(200, { 'content-type': 'application/json' });
|
|
||||||
res.end(JSON.stringify({ ok: true, servedOnAttempt: attempts }));
|
|
||||||
});
|
|
||||||
try {
|
|
||||||
const res = await aiFetch(url, { method: 'GET' });
|
|
||||||
expect(res.status).toBe(200);
|
|
||||||
const body = (await res.json()) as { servedOnAttempt: number };
|
|
||||||
expect(attempts).toBeGreaterThanOrEqual(2); // the stalled attempt was retried
|
|
||||||
expect(body.servedOnAttempt).toBeGreaterThanOrEqual(2);
|
|
||||||
} finally {
|
|
||||||
await close();
|
|
||||||
}
|
|
||||||
}, 15000);
|
|
||||||
|
|
||||||
it('passes a healthy fast response straight through (one attempt)', async () => {
|
|
||||||
let attempts = 0;
|
|
||||||
const { url, close } = await makeServer((_req, res) => {
|
|
||||||
attempts++;
|
|
||||||
res.writeHead(200, { 'content-type': 'application/json' });
|
|
||||||
res.end(JSON.stringify({ ok: true }));
|
|
||||||
});
|
|
||||||
try {
|
|
||||||
const res = await aiFetch(url, { method: 'GET' });
|
|
||||||
expect(res.status).toBe(200);
|
|
||||||
expect(attempts).toBe(1);
|
|
||||||
} finally {
|
|
||||||
await close();
|
|
||||||
}
|
|
||||||
}, 15000);
|
|
||||||
});
|
|
||||||
@@ -1,175 +0,0 @@
|
|||||||
import { Agent, RetryAgent, type Dispatcher } from 'undici';
|
|
||||||
import { Logger } from '@nestjs/common';
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Dedicated, resilient outbound HTTP layer for ALL AI provider calls.
|
|
||||||
*
|
|
||||||
* WHY THIS EXISTS
|
|
||||||
* ---------------
|
|
||||||
* Production logs showed the AI chat stream (and title generation) failing with
|
|
||||||
* `read ECONNRESET` after the AI SDK's own retries were exhausted, and
|
|
||||||
* (z.ai GLM coding endpoint, #140) intermittently stalling without ever sending
|
|
||||||
* response headers until undici's 300s default cut the request with no retry. The provider
|
|
||||||
* clients were built with NO custom `fetch`, so all outbound LLM traffic used
|
|
||||||
* Node's default global undici agent: default keep-alive pooling and NO
|
|
||||||
* transport-level reconnect on connection resets. `read ECONNRESET` is a TCP RST
|
|
||||||
* on a reused/poisoned keep-alive socket against the upstream provider/gateway.
|
|
||||||
* The AI SDK retried, but every attempt reused the same poisoned condition and
|
|
||||||
* hit the same error.
|
|
||||||
*
|
|
||||||
* WHAT THIS DOES
|
|
||||||
* --------------
|
|
||||||
* It builds a single shared undici `RetryAgent` and exposes a `fetch`-compatible
|
|
||||||
* `aiFetch`, which is injected into every AI SDK provider factory via the
|
|
||||||
* provider `fetch` option. That covers chat stream, public-share chat, title
|
|
||||||
* generation, embeddings, STT and the test-connection probe at once.
|
|
||||||
*
|
|
||||||
* The RetryAgent retries CONNECTION-LEVEL errors (e.g. ECONNRESET) on a FRESH
|
|
||||||
* socket — opening a new connection rather than reusing the poisoned one. POST is
|
|
||||||
* explicitly opted in, because every LLM/chat/embedding/STT call is a POST and
|
|
||||||
* undici's default retry `methods` list excludes POST. HTTP-STATUS retries
|
|
||||||
* (429/5xx + Retry-After) are deliberately left to the AI SDK to avoid
|
|
||||||
* double-retry; this layer only handles transport-level reconnects.
|
|
||||||
*
|
|
||||||
* MID-STREAM NOTE
|
|
||||||
* --------------
|
|
||||||
* This squarely fixes the production case: a reset BEFORE any response byte —
|
|
||||||
* undici reconnects on a fresh socket (no Range header). If a reset instead
|
|
||||||
* happens AFTER partial SSE bytes were already delivered, undici's RetryHandler
|
|
||||||
* attempts a Range-resume retry; LLM/SSE endpoints do not support Range and
|
|
||||||
* reject it, so the error surfaces as "server does not support the range header
|
|
||||||
* and the payload was partially consumed" instead of the raw ECONNRESET. The
|
|
||||||
* stream is NEVER corrupted (undici guards against concatenation) — only the
|
|
||||||
* error message for that rarer mid-stream case changes.
|
|
||||||
*/
|
|
||||||
|
|
||||||
// `headersTimeout` bounds time-to-FIRST-response-headers (before any body). It
|
|
||||||
// is NOT the streaming budget: once headers arrive the SSE body streams freely,
|
|
||||||
// unaffected by this value — so it is safe to keep SHORT. Some providers (seen
|
|
||||||
// with the z.ai GLM coding endpoint, #140) intermittently accept the request but
|
|
||||||
// never send response headers; undici's 300s default then hangs the user for
|
|
||||||
// FIVE MINUTES before failing, with no retry. Cap it so a stalled request fails
|
|
||||||
// FAST and is retried on a fresh connection (the retry usually lands on a healthy
|
|
||||||
// path and responds in seconds). Env-overridable for ops tuning.
|
|
||||||
const HEADERS_TIMEOUT_MS =
|
|
||||||
Number(process.env.AI_HTTP_HEADERS_TIMEOUT_MS) || 60_000;
|
|
||||||
// `bodyTimeout` bounds the gap BETWEEN streamed body chunks (not total stream
|
|
||||||
// length). Kept generous so a legitimately slow/thinking model with sparse SSE
|
|
||||||
// chunks is never killed mid-stream. Env-overridable.
|
|
||||||
const BODY_TIMEOUT_MS = Number(process.env.AI_HTTP_BODY_TIMEOUT_MS) || 300_000;
|
|
||||||
|
|
||||||
const baseAgent = new Agent({
|
|
||||||
// Cap TCP/TLS connect so a stuck connect fails fast and gets retried instead
|
|
||||||
// of hanging indefinitely.
|
|
||||||
connect: { timeout: 10_000 },
|
|
||||||
// Keep keep-alive CONSERVATIVE. A longer keep-alive widens the window in which
|
|
||||||
// a stale/half-closed socket can be reused, which is exactly the condition
|
|
||||||
// that produces `read ECONNRESET`. Do NOT raise this.
|
|
||||||
keepAliveTimeout: 4_000,
|
|
||||||
// Short time-to-headers (see HEADERS_TIMEOUT_MS) so a header stall fails fast
|
|
||||||
// and gets retried; generous per-chunk body timeout so real streams survive
|
|
||||||
// (see BODY_TIMEOUT_MS). Lowering headersTimeout does NOT truncate streams.
|
|
||||||
headersTimeout: HEADERS_TIMEOUT_MS,
|
|
||||||
bodyTimeout: BODY_TIMEOUT_MS,
|
|
||||||
});
|
|
||||||
|
|
||||||
const dispatcher: Dispatcher = new RetryAgent(baseAgent, {
|
|
||||||
// A poisoned keep-alive socket is almost always cured by the FIRST reconnect on
|
|
||||||
// a fresh socket, so 2 transport retries are plenty. More would only add latency
|
|
||||||
// against a genuinely-down upstream — and the AI SDK still retries on top.
|
|
||||||
maxRetries: 2,
|
|
||||||
minTimeout: 250,
|
|
||||||
maxTimeout: 2_000,
|
|
||||||
timeoutFactor: 2,
|
|
||||||
// CRITICAL: include POST — every LLM/chat/embedding/STT call is a POST, and
|
|
||||||
// undici's default `methods` list excludes POST (so without this, none of the
|
|
||||||
// AI traffic would ever be retried).
|
|
||||||
methods: ['GET', 'POST', 'PUT', 'PATCH', 'HEAD', 'OPTIONS', 'DELETE'],
|
|
||||||
// Do NOT retry on HTTP status here — leave 429/5xx + Retry-After handling to
|
|
||||||
// the AI SDK to avoid double-retry. We only want transport-level reconnects.
|
|
||||||
statusCodes: [],
|
|
||||||
// An explicit copy of undici 7.x's default connection-error code set, pinned
|
|
||||||
// here so a future undici upgrade can't silently change which transport errors
|
|
||||||
// we reconnect on. These are the errors we retry on a FRESH connection.
|
|
||||||
errorCodes: [
|
|
||||||
'ECONNRESET',
|
|
||||||
'ECONNREFUSED',
|
|
||||||
'ENOTFOUND',
|
|
||||||
'ENETDOWN',
|
|
||||||
'ENETUNREACH',
|
|
||||||
'EHOSTDOWN',
|
|
||||||
'EHOSTUNREACH',
|
|
||||||
'UND_ERR_SOCKET',
|
|
||||||
// Added (NOT in undici's default set): a header timeout fires BEFORE any
|
|
||||||
// response body, so retrying is clean (no partially-consumed stream / Range
|
|
||||||
// problem) — and it is exactly the z.ai stall mode (#140), where a fresh
|
|
||||||
// retry usually succeeds. We deliberately do NOT retry UND_ERR_BODY_TIMEOUT
|
|
||||||
// (mid-body; partial SSE already delivered, not safe to resume).
|
|
||||||
'UND_ERR_HEADERS_TIMEOUT',
|
|
||||||
'EPIPE',
|
|
||||||
],
|
|
||||||
});
|
|
||||||
|
|
||||||
const logger = new Logger('AiHttp');
|
|
||||||
let requestSeq = 0;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A `fetch`-compatible function that routes the request through the shared,
|
|
||||||
* resilient AI dispatcher. Injected into AI SDK provider factories via their
|
|
||||||
* `fetch` option. Follows the repo convention (see mcp-clients.service.ts
|
|
||||||
* `guardedFetch`).
|
|
||||||
*
|
|
||||||
* Wrapped with timing logs so provider latency is visible: for streaming
|
|
||||||
* responses `fetch` resolves when RESPONSE HEADERS arrive (the body streams
|
|
||||||
* after), so "in <ms>ms (headers received)" is exactly the provider's
|
|
||||||
* time-to-first-byte, and a rejection time pinpoints a headers/body timeout.
|
|
||||||
* Chat/Responses calls log at info; bulk embedding calls log at debug so RAG
|
|
||||||
* indexing never floods the logs. No secrets are logged — only host + pathname.
|
|
||||||
*/
|
|
||||||
export const aiFetch: typeof fetch = async (input, init) => {
|
|
||||||
const id = ++requestSeq;
|
|
||||||
const method = (init?.method ?? 'GET').toUpperCase();
|
|
||||||
const rawUrl =
|
|
||||||
typeof input === 'string'
|
|
||||||
? input
|
|
||||||
: input instanceof URL
|
|
||||||
? input.href
|
|
||||||
: (input as Request).url;
|
|
||||||
let path = rawUrl;
|
|
||||||
try {
|
|
||||||
const u = new URL(rawUrl);
|
|
||||||
path = u.host + u.pathname;
|
|
||||||
} catch {
|
|
||||||
// Non-absolute / unparseable URL: keep the raw string (still no secrets).
|
|
||||||
}
|
|
||||||
const isChat = /\/(chat\/completions|responses)\b/.test(path);
|
|
||||||
const log = (msg: string): void =>
|
|
||||||
isChat ? logger.log(msg) : logger.debug(msg);
|
|
||||||
const startedAt = performance.now();
|
|
||||||
log(`provider request #${id} -> ${method} ${path}`);
|
|
||||||
try {
|
|
||||||
const res = await fetch(input, { ...init, dispatcher } as RequestInit);
|
|
||||||
const ms = Math.round(performance.now() - startedAt);
|
|
||||||
log(`provider request #${id} <- ${res.status} in ${ms}ms (headers received)`);
|
|
||||||
return res;
|
|
||||||
} catch (err) {
|
|
||||||
const ms = Math.round(performance.now() - startedAt);
|
|
||||||
// Node's fetch reports a generic "fetch failed"; the real reason (e.g. an
|
|
||||||
// undici SocketError with .code ECONNRESET / UND_ERR_SOCKET /
|
|
||||||
// UND_ERR_*TIMEOUT) lives in err.cause (sometimes nested one level deeper).
|
|
||||||
// Surface the code+message of the cause chain so the failure is actionable.
|
|
||||||
const parts: string[] = [];
|
|
||||||
let cur: unknown = err;
|
|
||||||
for (let depth = 0; cur && depth < 3; depth++) {
|
|
||||||
const e = cur as { code?: string; message?: string; cause?: unknown };
|
|
||||||
const code = e.code ? `[${e.code}] ` : '';
|
|
||||||
const msg = e.message ?? String(e);
|
|
||||||
parts.push(`${code}${msg}`);
|
|
||||||
cur = e.cause;
|
|
||||||
}
|
|
||||||
logger.warn(
|
|
||||||
`provider request #${id} x after ${ms}ms: ${parts.join(' <- ')}`,
|
|
||||||
);
|
|
||||||
throw err;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
@@ -14,7 +14,6 @@ import { AiNotConfiguredException } from './ai-not-configured.exception';
|
|||||||
import { AiEmbeddingNotConfiguredException } from './ai-embedding-not-configured.exception';
|
import { AiEmbeddingNotConfiguredException } from './ai-embedding-not-configured.exception';
|
||||||
import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception';
|
import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception';
|
||||||
import { describeProviderError } from './ai-error.util';
|
import { describeProviderError } from './ai-error.util';
|
||||||
import { aiFetch } from './ai-http';
|
|
||||||
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
|
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
|
||||||
import { SecretBoxService } from '../crypto/secret-box';
|
import { SecretBoxService } from '../crypto/secret-box';
|
||||||
import { AiDriver } from './ai.types';
|
import { AiDriver } from './ai.types';
|
||||||
@@ -133,19 +132,6 @@ export class AiService {
|
|||||||
throw new AiNotConfiguredException();
|
throw new AiNotConfiguredException();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Diagnostic toggle: when AI_BYPASS_RESILIENT_FETCH=true the chat model
|
|
||||||
// bypasses the resilient aiFetch (custom undici RetryAgent) and uses the
|
|
||||||
// default global fetch. Isolates whether the streaming chat hang comes from
|
|
||||||
// the custom transport vs the request shape. Reversible via env, no rebuild.
|
|
||||||
const bypassResilientFetch =
|
|
||||||
process.env.AI_BYPASS_RESILIENT_FETCH === 'true';
|
|
||||||
if (bypassResilientFetch) {
|
|
||||||
this.logger.warn(
|
|
||||||
'AI chat: resilient aiFetch BYPASSED for chat model ' +
|
|
||||||
'(AI_BYPASS_RESILIENT_FETCH=true; using default fetch)',
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (driver) {
|
switch (driver) {
|
||||||
case 'openai':
|
case 'openai':
|
||||||
// baseURL (when set) covers openai-compatible endpoints. Use Chat
|
// baseURL (when set) covers openai-compatible endpoints. Use Chat
|
||||||
@@ -154,22 +140,12 @@ export class AiService {
|
|||||||
// Responses API (/responses), which OpenAI-compatible gateways
|
// Responses API (/responses), which OpenAI-compatible gateways
|
||||||
// (OpenRouter, etc.) reject on multi-turn requests (history with
|
// (OpenRouter, etc.) reject on multi-turn requests (history with
|
||||||
// assistant messages) → 400.
|
// assistant messages) → 400.
|
||||||
return createOpenAI({
|
return createOpenAI({ apiKey, baseURL: baseUrl }).chat(chatModel);
|
||||||
apiKey,
|
|
||||||
baseURL: baseUrl,
|
|
||||||
...(bypassResilientFetch ? {} : { fetch: aiFetch }),
|
|
||||||
}).chat(chatModel);
|
|
||||||
case 'gemini':
|
case 'gemini':
|
||||||
return createGoogleGenerativeAI({
|
return createGoogleGenerativeAI({ apiKey })(chatModel);
|
||||||
apiKey,
|
|
||||||
...(bypassResilientFetch ? {} : { fetch: aiFetch }),
|
|
||||||
})(chatModel);
|
|
||||||
case 'ollama':
|
case 'ollama':
|
||||||
// Ollama needs no API key.
|
// Ollama needs no API key.
|
||||||
return createOllama({
|
return createOllama({ baseURL: baseUrl })(chatModel);
|
||||||
baseURL: baseUrl,
|
|
||||||
...(bypassResilientFetch ? {} : { fetch: aiFetch }),
|
|
||||||
})(chatModel);
|
|
||||||
default:
|
default:
|
||||||
throw new AiNotConfiguredException();
|
throw new AiNotConfiguredException();
|
||||||
}
|
}
|
||||||
@@ -204,18 +180,15 @@ export class AiService {
|
|||||||
return createOpenAI({
|
return createOpenAI({
|
||||||
apiKey: cfg.embeddingApiKey,
|
apiKey: cfg.embeddingApiKey,
|
||||||
baseURL: cfg.embeddingBaseUrl,
|
baseURL: cfg.embeddingBaseUrl,
|
||||||
fetch: aiFetch,
|
|
||||||
}).textEmbeddingModel(cfg.embeddingModel);
|
}).textEmbeddingModel(cfg.embeddingModel);
|
||||||
case 'gemini':
|
case 'gemini':
|
||||||
return createGoogleGenerativeAI({
|
return createGoogleGenerativeAI({
|
||||||
apiKey: cfg.embeddingApiKey,
|
apiKey: cfg.embeddingApiKey,
|
||||||
fetch: aiFetch,
|
|
||||||
}).textEmbeddingModel(cfg.embeddingModel);
|
}).textEmbeddingModel(cfg.embeddingModel);
|
||||||
case 'ollama':
|
case 'ollama':
|
||||||
// Ollama needs no API key (e.g. nomic-embed-text).
|
// Ollama needs no API key (e.g. nomic-embed-text).
|
||||||
return createOllama({
|
return createOllama({
|
||||||
baseURL: cfg.embeddingBaseUrl,
|
baseURL: cfg.embeddingBaseUrl,
|
||||||
fetch: aiFetch,
|
|
||||||
}).textEmbeddingModel(cfg.embeddingModel);
|
}).textEmbeddingModel(cfg.embeddingModel);
|
||||||
default:
|
default:
|
||||||
throw new AiEmbeddingNotConfiguredException();
|
throw new AiEmbeddingNotConfiguredException();
|
||||||
@@ -262,7 +235,6 @@ export class AiService {
|
|||||||
const model = createOpenAI({
|
const model = createOpenAI({
|
||||||
apiKey: cfg.sttApiKey ?? 'unused',
|
apiKey: cfg.sttApiKey ?? 'unused',
|
||||||
baseURL,
|
baseURL,
|
||||||
fetch: aiFetch,
|
|
||||||
}).transcription(cfg.sttModel);
|
}).transcription(cfg.sttModel);
|
||||||
const { text } = await transcribe({
|
const { text } = await transcribe({
|
||||||
model,
|
model,
|
||||||
@@ -296,7 +268,7 @@ export class AiService {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
const url = `${baseURL.replace(/\/$/, '')}/audio/transcriptions`;
|
const url = `${baseURL.replace(/\/$/, '')}/audio/transcriptions`;
|
||||||
const res = await aiFetch(url, {
|
const res = await fetch(url, {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers: {
|
headers: {
|
||||||
'Content-Type': 'application/json',
|
'Content-Type': 'application/json',
|
||||||
|
|||||||
Reference in New Issue
Block a user