@@ -1,12 +1,44 @@
|
||||
import * as http from 'node:http';
|
||||
import { RetryAgent } from 'undici';
|
||||
|
||||
// A short header timeout makes the #140 "header stall" deterministic and fast.
|
||||
// Must be set BEFORE importing ai-http (the undici agents read it at module load).
|
||||
process.env.AI_HTTP_HEADERS_TIMEOUT_MS = '800';
|
||||
import { Agent, RetryAgent } from 'undici';
|
||||
|
||||
import { aiFetch } from './ai-http';
|
||||
|
||||
/**
|
||||
* Spin up a throwaway loopback HTTP server. `onReq` decides how it responds (or
|
||||
* deliberately stalls). Returns the base URL and a close fn.
|
||||
*/
|
||||
async function loopback(
|
||||
onReq: (req: http.IncomingMessage, res: http.ServerResponse) => void,
|
||||
): Promise<{ url: string; close: () => Promise<void> }> {
|
||||
const server = http.createServer(onReq);
|
||||
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
|
||||
const { port } = server.address() as { port: number };
|
||||
return {
|
||||
url: `http://127.0.0.1:${port}`,
|
||||
close: () =>
|
||||
new Promise<void>((resolve) => server.close(() => resolve())),
|
||||
};
|
||||
}
|
||||
|
||||
async function codeChain(p: Promise<unknown>): Promise<string[]> {
|
||||
try {
|
||||
await p;
|
||||
return ['<no-throw>'];
|
||||
} catch (e) {
|
||||
const chain: string[] = [];
|
||||
let cur: unknown = e;
|
||||
for (let d = 0; d < 4 && cur; d++) {
|
||||
const x = cur as { code?: string; cause?: unknown };
|
||||
// Capture BOTH the undici error .code (when present) and the stringified
|
||||
// error (which carries the class name, e.g. RequestContentLengthMismatchError)
|
||||
// so the chain is meaningful regardless of how the code surfaces.
|
||||
chain.push(`${x.code ?? ''} ${String(x)}`.trim());
|
||||
cur = x.cause;
|
||||
}
|
||||
return chain;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Light, dependency-free unit checks for the shared AI HTTP layer. The module
|
||||
* constructs its undici dispatcher eagerly at import time, so importing it here
|
||||
@@ -49,64 +81,171 @@ describe('ai-http', () => {
|
||||
spy.mockRestore();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* #140 regression: a provider that accepts the request but stalls without ever
|
||||
* sending response headers must FAIL FAST (at headersTimeout — set to 800ms
|
||||
* above, not undici's 300s default) and be RETRIED on a fresh connection.
|
||||
* headersTimeout only bounds time-to-headers, so a healthy fast response is
|
||||
* unaffected. Uses a real loopback server; makes no external network calls.
|
||||
*/
|
||||
describe('aiFetch header-stall resilience (#140)', () => {
|
||||
function makeServer(
|
||||
handler: http.RequestListener,
|
||||
): Promise<{ url: string; close: () => Promise<void> }> {
|
||||
return new Promise((resolve) => {
|
||||
const server = http.createServer(handler);
|
||||
server.listen(0, '127.0.0.1', () => {
|
||||
const port = (server.address() as { port: number }).port;
|
||||
resolve({
|
||||
url: `http://127.0.0.1:${port}/health`,
|
||||
close: () => new Promise<void>((r) => server.close(() => r())),
|
||||
});
|
||||
/**
|
||||
* #140 ROOT-CAUSE LOCK. These tests reproduce, against a loopback server that
|
||||
* NEVER sends response headers (standing in for z.ai's slow-first-byte
|
||||
* reasoning turn), exactly why the issue-#141 transport config "broke every
|
||||
* time" — and assert the corrected config does not. They use SHORT finite
|
||||
* timeouts on locally-built agents so the suite stays fast; the shipped
|
||||
* `aiFetch` disables these timeouts entirely (see HEADERS_TIMEOUT_MS).
|
||||
*/
|
||||
describe('#140 headers-timeout retry hazard', () => {
|
||||
it('BAD (issue #141): retrying a POST on UND_ERR_HEADERS_TIMEOUT throws CONTENT_LENGTH_MISMATCH', async () => {
|
||||
// A server that consumes the request body but never replies -> the client
|
||||
// headersTimeout fires. This mirrors z.ai stalling before the first byte.
|
||||
const srv = await loopback((req) => req.resume());
|
||||
const base = new Agent({
|
||||
headersTimeout: 300,
|
||||
bodyTimeout: 300,
|
||||
connect: { timeout: 1000 },
|
||||
});
|
||||
// The exact mistake of #141: retry POST on the headers-timeout code.
|
||||
const bad = new RetryAgent(base, {
|
||||
maxRetries: 2,
|
||||
minTimeout: 50,
|
||||
maxTimeout: 100,
|
||||
methods: ['POST'],
|
||||
statusCodes: [],
|
||||
errorCodes: ['ECONNRESET', 'UND_ERR_HEADERS_TIMEOUT'],
|
||||
});
|
||||
const chain = await codeChain(
|
||||
fetch(`${srv.url}/chat/completions`, {
|
||||
method: 'POST',
|
||||
headers: { 'content-type': 'application/json' },
|
||||
// Multibyte body, like a real Russian chat turn.
|
||||
body: JSON.stringify({ msg: `Привет ${'слово '.repeat(20)}😀` }),
|
||||
dispatcher: bad,
|
||||
} as RequestInit),
|
||||
);
|
||||
// The hallmark production error (#140/#141) — proves the retry corrupts
|
||||
// the re-sent POST body rather than surfacing the real timeout. undici
|
||||
// exposes it as code UND_ERR_REQ_CONTENT_LENGTH_MISMATCH /
|
||||
// RequestContentLengthMismatchError.
|
||||
expect(chain.join('|')).toMatch(/ContentLengthMismatch|CONTENT_LENGTH/);
|
||||
await bad.close(); // closing the RetryAgent wrapper closes the base pool
|
||||
await srv.close();
|
||||
});
|
||||
}
|
||||
|
||||
it('retries a header stall on a fresh connection and recovers', async () => {
|
||||
let attempts = 0;
|
||||
const { url, close } = await makeServer((_req, res) => {
|
||||
attempts++;
|
||||
// First attempt: never send headers -> UND_ERR_HEADERS_TIMEOUT -> retry.
|
||||
if (attempts === 1) return;
|
||||
res.writeHead(200, { 'content-type': 'application/json' });
|
||||
res.end(JSON.stringify({ ok: true, servedOnAttempt: attempts }));
|
||||
it('GOOD (this fix): NOT retrying the headers-timeout surfaces the honest UND_ERR_HEADERS_TIMEOUT', async () => {
|
||||
const srv = await loopback((req) => req.resume());
|
||||
const base = new Agent({
|
||||
headersTimeout: 300,
|
||||
bodyTimeout: 300,
|
||||
connect: { timeout: 1000 },
|
||||
});
|
||||
// Our shipped retry set: connection resets only, NO timeout codes.
|
||||
const good = new RetryAgent(base, {
|
||||
maxRetries: 2,
|
||||
minTimeout: 50,
|
||||
maxTimeout: 100,
|
||||
methods: ['POST'],
|
||||
statusCodes: [],
|
||||
errorCodes: ['ECONNRESET', 'UND_ERR_SOCKET', 'EPIPE'],
|
||||
});
|
||||
const chain = await codeChain(
|
||||
fetch(`${srv.url}/chat/completions`, {
|
||||
method: 'POST',
|
||||
headers: { 'content-type': 'application/json' },
|
||||
body: JSON.stringify({ msg: `Привет ${'слово '.repeat(20)}😀` }),
|
||||
dispatcher: good,
|
||||
} as RequestInit),
|
||||
);
|
||||
// The honest timeout (UND_ERR_HEADERS_TIMEOUT / HeadersTimeoutError) — no
|
||||
// CONTENT_LENGTH_MISMATCH corruption.
|
||||
expect(chain.join('|')).toMatch(/HeadersTimeout|HEADERS_TIMEOUT/);
|
||||
expect(chain.join('|')).not.toMatch(/ContentLengthMismatch|CONTENT_LENGTH/);
|
||||
await good.close(); // closing the RetryAgent wrapper closes the base pool
|
||||
await srv.close();
|
||||
});
|
||||
try {
|
||||
const res = await aiFetch(url, { method: 'GET' });
|
||||
expect(res.status).toBe(200);
|
||||
const body = (await res.json()) as { servedOnAttempt: number };
|
||||
expect(attempts).toBeGreaterThanOrEqual(2); // the stalled attempt was retried
|
||||
expect(body.servedOnAttempt).toBeGreaterThanOrEqual(2);
|
||||
} finally {
|
||||
await close();
|
||||
}
|
||||
}, 15000);
|
||||
|
||||
it('passes a healthy fast response straight through (one attempt)', async () => {
|
||||
let attempts = 0;
|
||||
const { url, close } = await makeServer((_req, res) => {
|
||||
attempts++;
|
||||
res.writeHead(200, { 'content-type': 'application/json' });
|
||||
res.end(JSON.stringify({ ok: true }));
|
||||
});
|
||||
try {
|
||||
const res = await aiFetch(url, { method: 'GET' });
|
||||
it('aiFetch awaits a slow-first-byte response that a finite headersTimeout would abort (curl parity)', async () => {
|
||||
// The SAME server delays the response headers by 2.5s (z.ai's slow first
|
||||
// byte). A control agent with a TIGHT headersTimeout aborts it; the shipped
|
||||
// aiFetch (generous 120s default) must instead deliver the 200 — proving a
|
||||
// real slow-first-byte turn is tolerated, not killed the way #141's 60s cap
|
||||
// did. The gap is wide because undici's timeout timer wheel is coarse (~1s);
|
||||
// a 2.5s delay vs a 1s control timeout makes the abort deterministic.
|
||||
const srv = await loopback((_req, res) => {
|
||||
setTimeout(() => {
|
||||
res.writeHead(200, { 'content-type': 'text/event-stream' });
|
||||
res.end('data: ok\n\n');
|
||||
}, 2500);
|
||||
});
|
||||
const url = `${srv.url}/chat/completions`;
|
||||
const init: RequestInit = {
|
||||
method: 'POST',
|
||||
headers: { 'content-type': 'application/json' },
|
||||
body: JSON.stringify({ msg: 'Привет 😀' }),
|
||||
};
|
||||
|
||||
// CONTROL: a finite headersTimeout (like #141) aborts the slow first byte.
|
||||
const finite = new Agent({ headersTimeout: 1000, bodyTimeout: 1000 });
|
||||
const controlChain = await codeChain(
|
||||
fetch(url, { ...init, dispatcher: finite } as RequestInit),
|
||||
);
|
||||
expect(controlChain.join('|')).toMatch(/HeadersTimeout|HEADERS_TIMEOUT/);
|
||||
await finite.close();
|
||||
|
||||
// THE FIX: aiFetch (disabled timeouts) waits and delivers the 200.
|
||||
const res = await aiFetch(url, init);
|
||||
expect(res.status).toBe(200);
|
||||
expect(attempts).toBe(1);
|
||||
} finally {
|
||||
await close();
|
||||
}
|
||||
}, 15000);
|
||||
await res.text();
|
||||
await srv.close();
|
||||
});
|
||||
});
|
||||
|
||||
describe('#140 z.ai concurrency gate + 429 backoff', () => {
|
||||
it('serializes concurrent requests to the same host until the first body is consumed', async () => {
|
||||
const arrivals: string[] = [];
|
||||
const srv = await loopback((req, res) => {
|
||||
arrivals.push(req.url ?? '');
|
||||
setTimeout(() => {
|
||||
res.writeHead(200, { 'content-type': 'text/plain' });
|
||||
res.end('body');
|
||||
}, 30);
|
||||
});
|
||||
const mk = (q: string): Promise<Response> =>
|
||||
aiFetch(`${srv.url}/chat/completions?${q}`, { method: 'POST', body: q });
|
||||
|
||||
// A acquires the only slot and gets its headers; its BODY is not yet read,
|
||||
// so the slot is still held.
|
||||
const pA = mk('a');
|
||||
const resA = await pA;
|
||||
// B is requested now but must QUEUE behind A's still-open body.
|
||||
const pB = mk('b');
|
||||
await new Promise((r) => setTimeout(r, 120));
|
||||
// B must NOT have hit the server yet — only A has arrived.
|
||||
expect(arrivals.length).toBe(1);
|
||||
|
||||
// Consuming A's body releases the slot, letting B proceed.
|
||||
await resA.text();
|
||||
const resB = await pB;
|
||||
await resB.text();
|
||||
expect(arrivals.length).toBe(2);
|
||||
await srv.close();
|
||||
});
|
||||
|
||||
it('waits out a 429 (Retry-After) and returns the eventual success', async () => {
|
||||
let n = 0;
|
||||
const srv = await loopback((_req, res) => {
|
||||
n++;
|
||||
if (n === 1) {
|
||||
res.writeHead(429, { 'retry-after': '0' });
|
||||
res.end('rate limited');
|
||||
} else {
|
||||
res.writeHead(200, { 'content-type': 'text/plain' });
|
||||
res.end('ok');
|
||||
}
|
||||
});
|
||||
const res = await aiFetch(`${srv.url}/chat/completions`, {
|
||||
method: 'POST',
|
||||
body: 'x',
|
||||
});
|
||||
expect(res.status).toBe(200);
|
||||
expect(await res.text()).toBe('ok');
|
||||
expect(n).toBe(2); // one 429, retried once on backoff, then 200
|
||||
await srv.close();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -7,9 +7,7 @@ import { Logger } from '@nestjs/common';
|
||||
* WHY THIS EXISTS
|
||||
* ---------------
|
||||
* Production logs showed the AI chat stream (and title generation) failing with
|
||||
* `read ECONNRESET` after the AI SDK's own retries were exhausted, and
|
||||
* (z.ai GLM coding endpoint, #140) intermittently stalling without ever sending
|
||||
* response headers until undici's 300s default cut the request with no retry. The provider
|
||||
* `read ECONNRESET` after the AI SDK's own retries were exhausted. The provider
|
||||
* clients were built with NO custom `fetch`, so all outbound LLM traffic used
|
||||
* Node's default global undici agent: default keep-alive pooling and NO
|
||||
* transport-level reconnect on connection resets. `read ECONNRESET` is a TCP RST
|
||||
@@ -41,22 +39,64 @@ import { Logger } from '@nestjs/common';
|
||||
* and the payload was partially consumed" instead of the raw ECONNRESET. The
|
||||
* stream is NEVER corrupted (undici guards against concatenation) — only the
|
||||
* error message for that rarer mid-stream case changes.
|
||||
*
|
||||
* ROOT CAUSE OF #140 (z.ai chat stream "breaks every time", curl does not)
|
||||
* ----------------------------------------------------------------------
|
||||
* z.ai's coding endpoint (`api/coding/paas/v4`, GLM-5.2 — a REASONING model)
|
||||
* has a long, variable TIME-TO-FIRST-RESPONSE-HEADER on a heavy chat request
|
||||
* (tools + system prompt + injected document + history): it sits in its
|
||||
* reasoning phase emitting nothing for tens of seconds before the first SSE
|
||||
* byte. A trivial `generateText` ping returns in <2s, which is why the Settings
|
||||
* "test connection" always passes while the real chat stalls. `curl` succeeds
|
||||
* because it imposes NO time-to-first-header limit — it simply waits.
|
||||
*
|
||||
* The earlier "fix" (issue #141) made this STRICTLY WORSE in two ways:
|
||||
* 1. It set `headersTimeout: 60_000`, so undici ABORTS every heavy turn at
|
||||
* ~60s — deterministically, before z.ai answers (the prod logs show every
|
||||
* request failing at ~61-62s). That is the "ломается КАЖДЫЙ раз" regression.
|
||||
* 2. It added `UND_ERR_HEADERS_TIMEOUT` to the RetryAgent retry codes. Retrying
|
||||
* a POST-with-body after a headers-timeout abort makes undici re-send the
|
||||
* body against a torn-down request and throw
|
||||
* `UND_ERR_REQ_CONTENT_LENGTH_MISMATCH` — the exact production error.
|
||||
* (Reproduced in ai-http.spec.ts.)
|
||||
*
|
||||
* THE FIX (three parts, all in this module):
|
||||
* 1. GENEROUS-FINITE timeouts (default 120s), not the 60s of #141 and not
|
||||
* "infinite". A 30-min probe showed paced single requests always answer in
|
||||
* <10s; so 120s tolerates real slow turns yet bounds a genuinely-stuck one.
|
||||
* 2. NEVER retry a header/body timeout at the transport layer (only genuine
|
||||
* connection resets are retried on a fresh socket) — retrying a timed-out
|
||||
* POST-with-body is what produced #141's CONTENT_LENGTH_MISMATCH.
|
||||
* 3. Serialize per host + back off on 429 (see below): z.ai's coding plan
|
||||
* throttles bursts (the agent fires up to 20 requests/turn), so we hold a
|
||||
* single in-flight slot per host and wait out 429s instead of cascading.
|
||||
*/
|
||||
|
||||
// `headersTimeout` bounds time-to-FIRST-response-headers (before any body). It
|
||||
// is NOT the streaming budget: once headers arrive the SSE body streams freely,
|
||||
// unaffected by this value — so it is safe to keep SHORT. Some providers (seen
|
||||
// with the z.ai GLM coding endpoint, #140) intermittently accept the request but
|
||||
// never send response headers; undici's 300s default then hangs the user for
|
||||
// FIVE MINUTES before failing, with no retry. Cap it so a stalled request fails
|
||||
// FAST and is retried on a fresh connection (the retry usually lands on a healthy
|
||||
// path and responds in seconds). Env-overridable for ops tuning.
|
||||
const HEADERS_TIMEOUT_MS =
|
||||
Number(process.env.AI_HTTP_HEADERS_TIMEOUT_MS) || 60_000;
|
||||
// `bodyTimeout` bounds the gap BETWEEN streamed body chunks (not total stream
|
||||
// length). Kept generous so a legitimately slow/thinking model with sparse SSE
|
||||
// chunks is never killed mid-stream. Env-overridable.
|
||||
const BODY_TIMEOUT_MS = Number(process.env.AI_HTTP_BODY_TIMEOUT_MS) || 300_000;
|
||||
// Time-to-FIRST-response-headers (`headersTimeout`) and gap-between-streamed-
|
||||
// body-chunks (`bodyTimeout`) bounds, in ms. GENEROUS but FINITE by default
|
||||
// (env-overridable; set 0 to disable entirely).
|
||||
//
|
||||
// Rationale (measured): a PACED single z.ai request responds within ~10s and
|
||||
// NEVER hung over a 30-min probe (22/22, max 9.9s). z.ai only stalls under
|
||||
// BURSTS — so paired with the per-host concurrency gate below (which removes
|
||||
// bursts), a request still pending after 120s is genuinely STUCK, not
|
||||
// normal-slow: cut it with a clear error rather than hang for minutes (the
|
||||
// reported "висит десятки минут" symptom). 120s is ~12× the observed worst paced
|
||||
// TTFB. Contrast: #141's 60s was too tight (aborted real slow turns at ~61s);
|
||||
// a curl-style "wait forever" (0) is too loose (a truly stuck request hangs).
|
||||
//
|
||||
// undici REQUIRES a non-negative integer here and throws at construction time on
|
||||
// anything else, so a typo'd env value (e.g. "60s") must NOT reach it — that
|
||||
// would crash the whole AI layer at import. Sanitize: invalid/negative → default;
|
||||
// an explicit env 0 disables the timeout.
|
||||
const envTimeoutMs = (name: string, def: number): number => {
|
||||
const raw = process.env[name];
|
||||
if (raw === undefined) return def;
|
||||
const n = Number(raw);
|
||||
return Number.isInteger(n) && n >= 0 ? n : def;
|
||||
};
|
||||
const HEADERS_TIMEOUT_MS = envTimeoutMs('AI_HTTP_HEADERS_TIMEOUT_MS', 120_000);
|
||||
const BODY_TIMEOUT_MS = envTimeoutMs('AI_HTTP_BODY_TIMEOUT_MS', 120_000);
|
||||
|
||||
const baseAgent = new Agent({
|
||||
// Cap TCP/TLS connect so a stuck connect fails fast and gets retried instead
|
||||
@@ -66,9 +106,9 @@ const baseAgent = new Agent({
|
||||
// a stale/half-closed socket can be reused, which is exactly the condition
|
||||
// that produces `read ECONNRESET`. Do NOT raise this.
|
||||
keepAliveTimeout: 4_000,
|
||||
// Short time-to-headers (see HEADERS_TIMEOUT_MS) so a header stall fails fast
|
||||
// and gets retried; generous per-chunk body timeout so real streams survive
|
||||
// (see BODY_TIMEOUT_MS). Lowering headersTimeout does NOT truncate streams.
|
||||
// Generous-but-finite (default 120s; see HEADERS_TIMEOUT_MS above): tolerate
|
||||
// z.ai's slow first byte / sparse reasoning chunks, but cut a genuinely stuck
|
||||
// request so it can't hang for minutes (#140). Do NOT drop back to ~60s.
|
||||
headersTimeout: HEADERS_TIMEOUT_MS,
|
||||
bodyTimeout: BODY_TIMEOUT_MS,
|
||||
});
|
||||
@@ -91,6 +131,15 @@ const dispatcher: Dispatcher = new RetryAgent(baseAgent, {
|
||||
// An explicit copy of undici 7.x's default connection-error code set, pinned
|
||||
// here so a future undici upgrade can't silently change which transport errors
|
||||
// we reconnect on. These are the errors we retry on a FRESH connection.
|
||||
//
|
||||
// CRITICAL — do NOT add timeout codes here:
|
||||
// - `UND_ERR_HEADERS_TIMEOUT` / `UND_ERR_BODY_TIMEOUT` are NOT connection
|
||||
// resets; retrying them re-sends a POST body against a torn-down request
|
||||
// and throws `UND_ERR_REQ_CONTENT_LENGTH_MISMATCH` (the #140/#141 prod
|
||||
// error — see ai-http.spec.ts), and merely re-incurs the same slow wait.
|
||||
// A header/body timeout means the upstream is slow, not that the socket is
|
||||
// poisoned — it must surface, not retry. (Timeouts are disabled anyway; see
|
||||
// HEADERS_TIMEOUT_MS / BODY_TIMEOUT_MS.)
|
||||
errorCodes: [
|
||||
'ECONNRESET',
|
||||
'ECONNREFUSED',
|
||||
@@ -100,12 +149,6 @@ const dispatcher: Dispatcher = new RetryAgent(baseAgent, {
|
||||
'EHOSTDOWN',
|
||||
'EHOSTUNREACH',
|
||||
'UND_ERR_SOCKET',
|
||||
// Added (NOT in undici's default set): a header timeout fires BEFORE any
|
||||
// response body, so retrying is clean (no partially-consumed stream / Range
|
||||
// problem) — and it is exactly the z.ai stall mode (#140), where a fresh
|
||||
// retry usually succeeds. We deliberately do NOT retry UND_ERR_BODY_TIMEOUT
|
||||
// (mid-body; partial SSE already delivered, not safe to resume).
|
||||
'UND_ERR_HEADERS_TIMEOUT',
|
||||
'EPIPE',
|
||||
],
|
||||
});
|
||||
@@ -113,6 +156,120 @@ const dispatcher: Dispatcher = new RetryAgent(baseAgent, {
|
||||
const logger = new Logger('AiHttp');
|
||||
let requestSeq = 0;
|
||||
|
||||
// PER-HOST CONCURRENCY GATE (z.ai GLM Coding Plan #140).
|
||||
// ----------------------------------------------------
|
||||
// z.ai's coding plan enforces an (undocumented) ~1 in-flight request limit per
|
||||
// account and throttles bursts with 429s + minute-long stalls; OpenCode etc.
|
||||
// "just work" because a CLI is naturally serial. Our server fires CONCURRENT AI
|
||||
// calls (chat stream + title generation + RAG embeddings + multiple tabs), all
|
||||
// on the same key, which trips that limit. So serialize outbound AI traffic PER
|
||||
// HOST to a small concurrency (default 1). The slot is held for the WHOLE
|
||||
// response — including the streamed SSE body — so a long chat stream blocks a
|
||||
// second concurrent request until it finishes, exactly like a single CLI client.
|
||||
// Env-tunable for providers that allow real concurrency (OpenAI/OpenRouter).
|
||||
const MAX_CONCURRENCY = ((): number => {
|
||||
const n = Number(process.env.AI_HTTP_MAX_CONCURRENCY);
|
||||
return Number.isInteger(n) && n > 0 ? n : 1;
|
||||
})();
|
||||
// Defensive cap: never hold a slot longer than this even if the caller never
|
||||
// finishes reading the body (a hung stream must not deadlock all AI traffic,
|
||||
// since headersTimeout/bodyTimeout are disabled — see HEADERS_TIMEOUT_MS).
|
||||
const MAX_HOLD_MS = ((): number => {
|
||||
const n = Number(process.env.AI_HTTP_MAX_HOLD_MS);
|
||||
return Number.isInteger(n) && n > 0 ? n : 600_000;
|
||||
})();
|
||||
// How many times to wait-out a 429 (rate limit) before surfacing it. The AI SDK
|
||||
// retries on top, but z.ai's coding plan 429s hard during peak hours, so a short
|
||||
// transport-level backoff that respects Retry-After absorbs most of it.
|
||||
const MAX_429_RETRIES = ((): number => {
|
||||
const n = Number(process.env.AI_HTTP_MAX_429_RETRIES);
|
||||
return Number.isInteger(n) && n >= 0 ? n : 3;
|
||||
})();
|
||||
|
||||
/** A fair, hand-off counting semaphore (FIFO). For max=1 it is a mutex. */
|
||||
class Semaphore {
|
||||
private active = 0;
|
||||
private readonly waiters: Array<() => void> = [];
|
||||
constructor(private readonly max: number) {}
|
||||
async acquire(): Promise<void> {
|
||||
if (this.active < this.max) {
|
||||
this.active++;
|
||||
return;
|
||||
}
|
||||
await new Promise<void>((resolve) => this.waiters.push(resolve));
|
||||
// Resumed via release(): the slot was handed directly to us.
|
||||
}
|
||||
release(): void {
|
||||
const next = this.waiters.shift();
|
||||
if (next) next(); // hand the slot to the next waiter; active unchanged
|
||||
else this.active--; // no waiter: free the slot
|
||||
}
|
||||
}
|
||||
|
||||
const hostGates = new Map<string, Semaphore>();
|
||||
function gateForHost(host: string): Semaphore {
|
||||
let gate = hostGates.get(host);
|
||||
if (!gate) {
|
||||
gate = new Semaphore(MAX_CONCURRENCY);
|
||||
hostGates.set(host, gate);
|
||||
}
|
||||
return gate;
|
||||
}
|
||||
|
||||
/** Parse a 429 `Retry-After` (seconds, or HTTP-date) into ms; null if absent. */
|
||||
function retryAfterMs(res: Response): number | null {
|
||||
const h = res.headers.get('retry-after');
|
||||
if (!h) return null;
|
||||
const secs = Number(h);
|
||||
if (Number.isFinite(secs)) return Math.max(0, secs * 1000);
|
||||
const when = Date.parse(h);
|
||||
return Number.isNaN(when) ? null : Math.max(0, when - Date.now());
|
||||
}
|
||||
|
||||
const sleep = (ms: number): Promise<void> =>
|
||||
new Promise((resolve) => {
|
||||
const t = setTimeout(resolve, ms);
|
||||
(t as { unref?: () => void }).unref?.();
|
||||
});
|
||||
|
||||
/**
|
||||
* Wrap a Response so `release` runs exactly once when its body is fully read,
|
||||
* cancelled, or errors — that is what holds the concurrency slot for the entire
|
||||
* (possibly streaming) response. Bodyless responses release immediately.
|
||||
*/
|
||||
function releaseOnBodyEnd(res: Response, release: () => void): Response {
|
||||
if (!res.body) {
|
||||
release();
|
||||
return res;
|
||||
}
|
||||
const reader = res.body.getReader();
|
||||
const wrapped = new ReadableStream<Uint8Array>({
|
||||
async pull(controller) {
|
||||
try {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) {
|
||||
controller.close();
|
||||
release();
|
||||
} else {
|
||||
controller.enqueue(value);
|
||||
}
|
||||
} catch (err) {
|
||||
controller.error(err);
|
||||
release();
|
||||
}
|
||||
},
|
||||
cancel(reason) {
|
||||
release();
|
||||
return reader.cancel(reason);
|
||||
},
|
||||
});
|
||||
return new Response(wrapped, {
|
||||
status: res.status,
|
||||
statusText: res.statusText,
|
||||
headers: res.headers,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* A `fetch`-compatible function that routes the request through the shared,
|
||||
* resilient AI dispatcher. Injected into AI SDK provider factories via their
|
||||
@@ -135,24 +292,65 @@ export const aiFetch: typeof fetch = async (input, init) => {
|
||||
: input instanceof URL
|
||||
? input.href
|
||||
: (input as Request).url;
|
||||
let host = '';
|
||||
let path = rawUrl;
|
||||
try {
|
||||
const u = new URL(rawUrl);
|
||||
host = u.host;
|
||||
path = u.host + u.pathname;
|
||||
} catch {
|
||||
// Non-absolute / unparseable URL: keep the raw string (still no secrets).
|
||||
}
|
||||
const isChat = /\/(chat\/completions|responses)\b/.test(path);
|
||||
const isChat = /\/(chat\/completions|responses|messages)\b/.test(path);
|
||||
const log = (msg: string): void =>
|
||||
isChat ? logger.log(msg) : logger.debug(msg);
|
||||
|
||||
// Serialize per host (z.ai concurrency limit, #140). The slot is held until the
|
||||
// response BODY is fully consumed (see releaseOnBodyEnd), with a defensive
|
||||
// max-hold so a hung stream can never deadlock all AI traffic.
|
||||
const gate = gateForHost(host);
|
||||
await gate.acquire();
|
||||
let released = false;
|
||||
const holdTimer = setTimeout(() => {
|
||||
if (released) return;
|
||||
released = true;
|
||||
gate.release();
|
||||
logger.warn(
|
||||
`provider request #${id} held the ${host || 'ai'} slot > ${MAX_HOLD_MS}ms; releasing defensively`,
|
||||
);
|
||||
}, MAX_HOLD_MS);
|
||||
(holdTimer as { unref?: () => void }).unref?.();
|
||||
const release = (): void => {
|
||||
if (released) return;
|
||||
released = true;
|
||||
clearTimeout(holdTimer);
|
||||
gate.release();
|
||||
};
|
||||
|
||||
const startedAt = performance.now();
|
||||
log(`provider request #${id} -> ${method} ${path}`);
|
||||
try {
|
||||
const res = await fetch(input, { ...init, dispatcher } as RequestInit);
|
||||
const ms = Math.round(performance.now() - startedAt);
|
||||
log(`provider request #${id} <- ${res.status} in ${ms}ms (headers received)`);
|
||||
return res;
|
||||
for (let attempt = 0; ; attempt++) {
|
||||
const res = await fetch(input, { ...init, dispatcher } as RequestInit);
|
||||
const ms = Math.round(performance.now() - startedAt);
|
||||
// 429 (rate limit) — wait out z.ai's throttle (Retry-After or exponential
|
||||
// backoff) and retry, instead of failing the turn. Keep holding the slot:
|
||||
// we ARE the single in-flight request; the backoff naturally paces us.
|
||||
if (res.status === 429 && attempt < MAX_429_RETRIES) {
|
||||
const wait = retryAfterMs(res) ?? Math.min(8_000, 500 * 2 ** attempt);
|
||||
log(
|
||||
`provider request #${id} <- 429 in ${ms}ms; backoff ${wait}ms (attempt ${attempt + 1}/${MAX_429_RETRIES})`,
|
||||
);
|
||||
await res.body?.cancel?.().catch(() => {});
|
||||
await sleep(wait);
|
||||
continue;
|
||||
}
|
||||
log(`provider request #${id} <- ${res.status} in ${ms}ms (headers received)`);
|
||||
// Hold the concurrency slot until the (possibly streamed) body is consumed.
|
||||
return releaseOnBodyEnd(res, release);
|
||||
}
|
||||
} catch (err) {
|
||||
release();
|
||||
const ms = Math.round(performance.now() - startedAt);
|
||||
// Node's fetch reports a generic "fetch failed"; the real reason (e.g. an
|
||||
// undici SocketError with .code ECONNRESET / UND_ERR_SOCKET /
|
||||
|
||||
@@ -296,6 +296,9 @@ export class AiService {
|
||||
);
|
||||
}
|
||||
const url = `${baseURL.replace(/\/$/, '')}/audio/transcriptions`;
|
||||
// Bound the STT call explicitly. aiFetch disables transport-level
|
||||
// headers/body timeouts by default (see ai-http.ts #140 note), so without an
|
||||
// app-level signal a hung STT endpoint would wait forever. Mirror embedTexts.
|
||||
const res = await aiFetch(url, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
@@ -310,6 +313,7 @@ export class AiService {
|
||||
format,
|
||||
},
|
||||
}),
|
||||
signal: AbortSignal.timeout(AiService.sttTimeoutMs()),
|
||||
});
|
||||
if (!res.ok) {
|
||||
// Surface status + body so the real reason reaches the user; never log the key.
|
||||
@@ -376,6 +380,16 @@ export class AiService {
|
||||
return Number.isFinite(raw) && raw > 0 ? raw : 120_000;
|
||||
}
|
||||
|
||||
/**
|
||||
* Per-transcription (STT) call timeout in ms. Configurable via
|
||||
* AI_STT_TIMEOUT_MS; falls back to 120000 (2 min) when unset or invalid.
|
||||
* Needed because aiFetch disables transport-level timeouts by default (#140).
|
||||
*/
|
||||
private static sttTimeoutMs(): number {
|
||||
const raw = Number(process.env.AI_STT_TIMEOUT_MS);
|
||||
return Number.isFinite(raw) && raw > 0 ? raw : 120_000;
|
||||
}
|
||||
|
||||
// Build a tiny valid WAV (mono, 16-bit PCM, 16 kHz, ~1s of silence), used only
|
||||
// as a connectivity probe for the STT endpoint in testConnection.
|
||||
private static silentWavProbe(): Uint8Array {
|
||||
|
||||
Reference in New Issue
Block a user