diff --git a/apps/server/src/integrations/ai/ai-http.spec.ts b/apps/server/src/integrations/ai/ai-http.spec.ts index 4fbfab04..1301ff20 100644 --- a/apps/server/src/integrations/ai/ai-http.spec.ts +++ b/apps/server/src/integrations/ai/ai-http.spec.ts @@ -1,12 +1,44 @@ import * as http from 'node:http'; -import { RetryAgent } from 'undici'; - -// A short header timeout makes the #140 "header stall" deterministic and fast. -// Must be set BEFORE importing ai-http (the undici agents read it at module load). -process.env.AI_HTTP_HEADERS_TIMEOUT_MS = '800'; +import { Agent, RetryAgent } from 'undici'; import { aiFetch } from './ai-http'; +/** + * Spin up a throwaway loopback HTTP server. `onReq` decides how it responds (or + * deliberately stalls). Returns the base URL and a close fn. + */ +async function loopback( + onReq: (req: http.IncomingMessage, res: http.ServerResponse) => void, +): Promise<{ url: string; close: () => Promise }> { + const server = http.createServer(onReq); + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); + const { port } = server.address() as { port: number }; + return { + url: `http://127.0.0.1:${port}`, + close: () => + new Promise((resolve) => server.close(() => resolve())), + }; +} + +async function codeChain(p: Promise): Promise { + try { + await p; + return ['']; + } catch (e) { + const chain: string[] = []; + let cur: unknown = e; + for (let d = 0; d < 4 && cur; d++) { + const x = cur as { code?: string; cause?: unknown }; + // Capture BOTH the undici error .code (when present) and the stringified + // error (which carries the class name, e.g. RequestContentLengthMismatchError) + // so the chain is meaningful regardless of how the code surfaces. + chain.push(`${x.code ?? ''} ${String(x)}`.trim()); + cur = x.cause; + } + return chain; + } +} + /** * Light, dependency-free unit checks for the shared AI HTTP layer. The module * constructs its undici dispatcher eagerly at import time, so importing it here @@ -49,64 +81,171 @@ describe('ai-http', () => { spy.mockRestore(); } }); -}); -/** - * #140 regression: a provider that accepts the request but stalls without ever - * sending response headers must FAIL FAST (at headersTimeout — set to 800ms - * above, not undici's 300s default) and be RETRIED on a fresh connection. - * headersTimeout only bounds time-to-headers, so a healthy fast response is - * unaffected. Uses a real loopback server; makes no external network calls. - */ -describe('aiFetch header-stall resilience (#140)', () => { - function makeServer( - handler: http.RequestListener, - ): Promise<{ url: string; close: () => Promise }> { - return new Promise((resolve) => { - const server = http.createServer(handler); - server.listen(0, '127.0.0.1', () => { - const port = (server.address() as { port: number }).port; - resolve({ - url: `http://127.0.0.1:${port}/health`, - close: () => new Promise((r) => server.close(() => r())), - }); + /** + * #140 ROOT-CAUSE LOCK. These tests reproduce, against a loopback server that + * NEVER sends response headers (standing in for z.ai's slow-first-byte + * reasoning turn), exactly why the issue-#141 transport config "broke every + * time" — and assert the corrected config does not. They use SHORT finite + * timeouts on locally-built agents so the suite stays fast; the shipped + * `aiFetch` disables these timeouts entirely (see HEADERS_TIMEOUT_MS). + */ + describe('#140 headers-timeout retry hazard', () => { + it('BAD (issue #141): retrying a POST on UND_ERR_HEADERS_TIMEOUT throws CONTENT_LENGTH_MISMATCH', async () => { + // A server that consumes the request body but never replies -> the client + // headersTimeout fires. This mirrors z.ai stalling before the first byte. + const srv = await loopback((req) => req.resume()); + const base = new Agent({ + headersTimeout: 300, + bodyTimeout: 300, + connect: { timeout: 1000 }, }); + // The exact mistake of #141: retry POST on the headers-timeout code. + const bad = new RetryAgent(base, { + maxRetries: 2, + minTimeout: 50, + maxTimeout: 100, + methods: ['POST'], + statusCodes: [], + errorCodes: ['ECONNRESET', 'UND_ERR_HEADERS_TIMEOUT'], + }); + const chain = await codeChain( + fetch(`${srv.url}/chat/completions`, { + method: 'POST', + headers: { 'content-type': 'application/json' }, + // Multibyte body, like a real Russian chat turn. + body: JSON.stringify({ msg: `Привет ${'слово '.repeat(20)}😀` }), + dispatcher: bad, + } as RequestInit), + ); + // The hallmark production error (#140/#141) — proves the retry corrupts + // the re-sent POST body rather than surfacing the real timeout. undici + // exposes it as code UND_ERR_REQ_CONTENT_LENGTH_MISMATCH / + // RequestContentLengthMismatchError. + expect(chain.join('|')).toMatch(/ContentLengthMismatch|CONTENT_LENGTH/); + await bad.close(); // closing the RetryAgent wrapper closes the base pool + await srv.close(); }); - } - it('retries a header stall on a fresh connection and recovers', async () => { - let attempts = 0; - const { url, close } = await makeServer((_req, res) => { - attempts++; - // First attempt: never send headers -> UND_ERR_HEADERS_TIMEOUT -> retry. - if (attempts === 1) return; - res.writeHead(200, { 'content-type': 'application/json' }); - res.end(JSON.stringify({ ok: true, servedOnAttempt: attempts })); + it('GOOD (this fix): NOT retrying the headers-timeout surfaces the honest UND_ERR_HEADERS_TIMEOUT', async () => { + const srv = await loopback((req) => req.resume()); + const base = new Agent({ + headersTimeout: 300, + bodyTimeout: 300, + connect: { timeout: 1000 }, + }); + // Our shipped retry set: connection resets only, NO timeout codes. + const good = new RetryAgent(base, { + maxRetries: 2, + minTimeout: 50, + maxTimeout: 100, + methods: ['POST'], + statusCodes: [], + errorCodes: ['ECONNRESET', 'UND_ERR_SOCKET', 'EPIPE'], + }); + const chain = await codeChain( + fetch(`${srv.url}/chat/completions`, { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ msg: `Привет ${'слово '.repeat(20)}😀` }), + dispatcher: good, + } as RequestInit), + ); + // The honest timeout (UND_ERR_HEADERS_TIMEOUT / HeadersTimeoutError) — no + // CONTENT_LENGTH_MISMATCH corruption. + expect(chain.join('|')).toMatch(/HeadersTimeout|HEADERS_TIMEOUT/); + expect(chain.join('|')).not.toMatch(/ContentLengthMismatch|CONTENT_LENGTH/); + await good.close(); // closing the RetryAgent wrapper closes the base pool + await srv.close(); }); - try { - const res = await aiFetch(url, { method: 'GET' }); - expect(res.status).toBe(200); - const body = (await res.json()) as { servedOnAttempt: number }; - expect(attempts).toBeGreaterThanOrEqual(2); // the stalled attempt was retried - expect(body.servedOnAttempt).toBeGreaterThanOrEqual(2); - } finally { - await close(); - } - }, 15000); - it('passes a healthy fast response straight through (one attempt)', async () => { - let attempts = 0; - const { url, close } = await makeServer((_req, res) => { - attempts++; - res.writeHead(200, { 'content-type': 'application/json' }); - res.end(JSON.stringify({ ok: true })); - }); - try { - const res = await aiFetch(url, { method: 'GET' }); + it('aiFetch awaits a slow-first-byte response that a finite headersTimeout would abort (curl parity)', async () => { + // The SAME server delays the response headers by 2.5s (z.ai's slow first + // byte). A control agent with a TIGHT headersTimeout aborts it; the shipped + // aiFetch (generous 120s default) must instead deliver the 200 — proving a + // real slow-first-byte turn is tolerated, not killed the way #141's 60s cap + // did. The gap is wide because undici's timeout timer wheel is coarse (~1s); + // a 2.5s delay vs a 1s control timeout makes the abort deterministic. + const srv = await loopback((_req, res) => { + setTimeout(() => { + res.writeHead(200, { 'content-type': 'text/event-stream' }); + res.end('data: ok\n\n'); + }, 2500); + }); + const url = `${srv.url}/chat/completions`; + const init: RequestInit = { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ msg: 'Привет 😀' }), + }; + + // CONTROL: a finite headersTimeout (like #141) aborts the slow first byte. + const finite = new Agent({ headersTimeout: 1000, bodyTimeout: 1000 }); + const controlChain = await codeChain( + fetch(url, { ...init, dispatcher: finite } as RequestInit), + ); + expect(controlChain.join('|')).toMatch(/HeadersTimeout|HEADERS_TIMEOUT/); + await finite.close(); + + // THE FIX: aiFetch (disabled timeouts) waits and delivers the 200. + const res = await aiFetch(url, init); expect(res.status).toBe(200); - expect(attempts).toBe(1); - } finally { - await close(); - } - }, 15000); + await res.text(); + await srv.close(); + }); + }); + + describe('#140 z.ai concurrency gate + 429 backoff', () => { + it('serializes concurrent requests to the same host until the first body is consumed', async () => { + const arrivals: string[] = []; + const srv = await loopback((req, res) => { + arrivals.push(req.url ?? ''); + setTimeout(() => { + res.writeHead(200, { 'content-type': 'text/plain' }); + res.end('body'); + }, 30); + }); + const mk = (q: string): Promise => + aiFetch(`${srv.url}/chat/completions?${q}`, { method: 'POST', body: q }); + + // A acquires the only slot and gets its headers; its BODY is not yet read, + // so the slot is still held. + const pA = mk('a'); + const resA = await pA; + // B is requested now but must QUEUE behind A's still-open body. + const pB = mk('b'); + await new Promise((r) => setTimeout(r, 120)); + // B must NOT have hit the server yet — only A has arrived. + expect(arrivals.length).toBe(1); + + // Consuming A's body releases the slot, letting B proceed. + await resA.text(); + const resB = await pB; + await resB.text(); + expect(arrivals.length).toBe(2); + await srv.close(); + }); + + it('waits out a 429 (Retry-After) and returns the eventual success', async () => { + let n = 0; + const srv = await loopback((_req, res) => { + n++; + if (n === 1) { + res.writeHead(429, { 'retry-after': '0' }); + res.end('rate limited'); + } else { + res.writeHead(200, { 'content-type': 'text/plain' }); + res.end('ok'); + } + }); + const res = await aiFetch(`${srv.url}/chat/completions`, { + method: 'POST', + body: 'x', + }); + expect(res.status).toBe(200); + expect(await res.text()).toBe('ok'); + expect(n).toBe(2); // one 429, retried once on backoff, then 200 + await srv.close(); + }); + }); }); diff --git a/apps/server/src/integrations/ai/ai-http.ts b/apps/server/src/integrations/ai/ai-http.ts index 403b0f7d..efd0091c 100644 --- a/apps/server/src/integrations/ai/ai-http.ts +++ b/apps/server/src/integrations/ai/ai-http.ts @@ -7,9 +7,7 @@ import { Logger } from '@nestjs/common'; * WHY THIS EXISTS * --------------- * Production logs showed the AI chat stream (and title generation) failing with - * `read ECONNRESET` after the AI SDK's own retries were exhausted, and - * (z.ai GLM coding endpoint, #140) intermittently stalling without ever sending - * response headers until undici's 300s default cut the request with no retry. The provider + * `read ECONNRESET` after the AI SDK's own retries were exhausted. The provider * clients were built with NO custom `fetch`, so all outbound LLM traffic used * Node's default global undici agent: default keep-alive pooling and NO * transport-level reconnect on connection resets. `read ECONNRESET` is a TCP RST @@ -41,22 +39,64 @@ import { Logger } from '@nestjs/common'; * and the payload was partially consumed" instead of the raw ECONNRESET. The * stream is NEVER corrupted (undici guards against concatenation) — only the * error message for that rarer mid-stream case changes. + * + * ROOT CAUSE OF #140 (z.ai chat stream "breaks every time", curl does not) + * ---------------------------------------------------------------------- + * z.ai's coding endpoint (`api/coding/paas/v4`, GLM-5.2 — a REASONING model) + * has a long, variable TIME-TO-FIRST-RESPONSE-HEADER on a heavy chat request + * (tools + system prompt + injected document + history): it sits in its + * reasoning phase emitting nothing for tens of seconds before the first SSE + * byte. A trivial `generateText` ping returns in <2s, which is why the Settings + * "test connection" always passes while the real chat stalls. `curl` succeeds + * because it imposes NO time-to-first-header limit — it simply waits. + * + * The earlier "fix" (issue #141) made this STRICTLY WORSE in two ways: + * 1. It set `headersTimeout: 60_000`, so undici ABORTS every heavy turn at + * ~60s — deterministically, before z.ai answers (the prod logs show every + * request failing at ~61-62s). That is the "ломается КАЖДЫЙ раз" regression. + * 2. It added `UND_ERR_HEADERS_TIMEOUT` to the RetryAgent retry codes. Retrying + * a POST-with-body after a headers-timeout abort makes undici re-send the + * body against a torn-down request and throw + * `UND_ERR_REQ_CONTENT_LENGTH_MISMATCH` — the exact production error. + * (Reproduced in ai-http.spec.ts.) + * + * THE FIX (three parts, all in this module): + * 1. GENEROUS-FINITE timeouts (default 120s), not the 60s of #141 and not + * "infinite". A 30-min probe showed paced single requests always answer in + * <10s; so 120s tolerates real slow turns yet bounds a genuinely-stuck one. + * 2. NEVER retry a header/body timeout at the transport layer (only genuine + * connection resets are retried on a fresh socket) — retrying a timed-out + * POST-with-body is what produced #141's CONTENT_LENGTH_MISMATCH. + * 3. Serialize per host + back off on 429 (see below): z.ai's coding plan + * throttles bursts (the agent fires up to 20 requests/turn), so we hold a + * single in-flight slot per host and wait out 429s instead of cascading. */ -// `headersTimeout` bounds time-to-FIRST-response-headers (before any body). It -// is NOT the streaming budget: once headers arrive the SSE body streams freely, -// unaffected by this value — so it is safe to keep SHORT. Some providers (seen -// with the z.ai GLM coding endpoint, #140) intermittently accept the request but -// never send response headers; undici's 300s default then hangs the user for -// FIVE MINUTES before failing, with no retry. Cap it so a stalled request fails -// FAST and is retried on a fresh connection (the retry usually lands on a healthy -// path and responds in seconds). Env-overridable for ops tuning. -const HEADERS_TIMEOUT_MS = - Number(process.env.AI_HTTP_HEADERS_TIMEOUT_MS) || 60_000; -// `bodyTimeout` bounds the gap BETWEEN streamed body chunks (not total stream -// length). Kept generous so a legitimately slow/thinking model with sparse SSE -// chunks is never killed mid-stream. Env-overridable. -const BODY_TIMEOUT_MS = Number(process.env.AI_HTTP_BODY_TIMEOUT_MS) || 300_000; +// Time-to-FIRST-response-headers (`headersTimeout`) and gap-between-streamed- +// body-chunks (`bodyTimeout`) bounds, in ms. GENEROUS but FINITE by default +// (env-overridable; set 0 to disable entirely). +// +// Rationale (measured): a PACED single z.ai request responds within ~10s and +// NEVER hung over a 30-min probe (22/22, max 9.9s). z.ai only stalls under +// BURSTS — so paired with the per-host concurrency gate below (which removes +// bursts), a request still pending after 120s is genuinely STUCK, not +// normal-slow: cut it with a clear error rather than hang for minutes (the +// reported "висит десятки минут" symptom). 120s is ~12× the observed worst paced +// TTFB. Contrast: #141's 60s was too tight (aborted real slow turns at ~61s); +// a curl-style "wait forever" (0) is too loose (a truly stuck request hangs). +// +// undici REQUIRES a non-negative integer here and throws at construction time on +// anything else, so a typo'd env value (e.g. "60s") must NOT reach it — that +// would crash the whole AI layer at import. Sanitize: invalid/negative → default; +// an explicit env 0 disables the timeout. +const envTimeoutMs = (name: string, def: number): number => { + const raw = process.env[name]; + if (raw === undefined) return def; + const n = Number(raw); + return Number.isInteger(n) && n >= 0 ? n : def; +}; +const HEADERS_TIMEOUT_MS = envTimeoutMs('AI_HTTP_HEADERS_TIMEOUT_MS', 120_000); +const BODY_TIMEOUT_MS = envTimeoutMs('AI_HTTP_BODY_TIMEOUT_MS', 120_000); const baseAgent = new Agent({ // Cap TCP/TLS connect so a stuck connect fails fast and gets retried instead @@ -66,9 +106,9 @@ const baseAgent = new Agent({ // a stale/half-closed socket can be reused, which is exactly the condition // that produces `read ECONNRESET`. Do NOT raise this. keepAliveTimeout: 4_000, - // Short time-to-headers (see HEADERS_TIMEOUT_MS) so a header stall fails fast - // and gets retried; generous per-chunk body timeout so real streams survive - // (see BODY_TIMEOUT_MS). Lowering headersTimeout does NOT truncate streams. + // Generous-but-finite (default 120s; see HEADERS_TIMEOUT_MS above): tolerate + // z.ai's slow first byte / sparse reasoning chunks, but cut a genuinely stuck + // request so it can't hang for minutes (#140). Do NOT drop back to ~60s. headersTimeout: HEADERS_TIMEOUT_MS, bodyTimeout: BODY_TIMEOUT_MS, }); @@ -91,6 +131,15 @@ const dispatcher: Dispatcher = new RetryAgent(baseAgent, { // An explicit copy of undici 7.x's default connection-error code set, pinned // here so a future undici upgrade can't silently change which transport errors // we reconnect on. These are the errors we retry on a FRESH connection. + // + // CRITICAL — do NOT add timeout codes here: + // - `UND_ERR_HEADERS_TIMEOUT` / `UND_ERR_BODY_TIMEOUT` are NOT connection + // resets; retrying them re-sends a POST body against a torn-down request + // and throws `UND_ERR_REQ_CONTENT_LENGTH_MISMATCH` (the #140/#141 prod + // error — see ai-http.spec.ts), and merely re-incurs the same slow wait. + // A header/body timeout means the upstream is slow, not that the socket is + // poisoned — it must surface, not retry. (Timeouts are disabled anyway; see + // HEADERS_TIMEOUT_MS / BODY_TIMEOUT_MS.) errorCodes: [ 'ECONNRESET', 'ECONNREFUSED', @@ -100,12 +149,6 @@ const dispatcher: Dispatcher = new RetryAgent(baseAgent, { 'EHOSTDOWN', 'EHOSTUNREACH', 'UND_ERR_SOCKET', - // Added (NOT in undici's default set): a header timeout fires BEFORE any - // response body, so retrying is clean (no partially-consumed stream / Range - // problem) — and it is exactly the z.ai stall mode (#140), where a fresh - // retry usually succeeds. We deliberately do NOT retry UND_ERR_BODY_TIMEOUT - // (mid-body; partial SSE already delivered, not safe to resume). - 'UND_ERR_HEADERS_TIMEOUT', 'EPIPE', ], }); @@ -113,6 +156,120 @@ const dispatcher: Dispatcher = new RetryAgent(baseAgent, { const logger = new Logger('AiHttp'); let requestSeq = 0; +// PER-HOST CONCURRENCY GATE (z.ai GLM Coding Plan #140). +// ---------------------------------------------------- +// z.ai's coding plan enforces an (undocumented) ~1 in-flight request limit per +// account and throttles bursts with 429s + minute-long stalls; OpenCode etc. +// "just work" because a CLI is naturally serial. Our server fires CONCURRENT AI +// calls (chat stream + title generation + RAG embeddings + multiple tabs), all +// on the same key, which trips that limit. So serialize outbound AI traffic PER +// HOST to a small concurrency (default 1). The slot is held for the WHOLE +// response — including the streamed SSE body — so a long chat stream blocks a +// second concurrent request until it finishes, exactly like a single CLI client. +// Env-tunable for providers that allow real concurrency (OpenAI/OpenRouter). +const MAX_CONCURRENCY = ((): number => { + const n = Number(process.env.AI_HTTP_MAX_CONCURRENCY); + return Number.isInteger(n) && n > 0 ? n : 1; +})(); +// Defensive cap: never hold a slot longer than this even if the caller never +// finishes reading the body (a hung stream must not deadlock all AI traffic, +// since headersTimeout/bodyTimeout are disabled — see HEADERS_TIMEOUT_MS). +const MAX_HOLD_MS = ((): number => { + const n = Number(process.env.AI_HTTP_MAX_HOLD_MS); + return Number.isInteger(n) && n > 0 ? n : 600_000; +})(); +// How many times to wait-out a 429 (rate limit) before surfacing it. The AI SDK +// retries on top, but z.ai's coding plan 429s hard during peak hours, so a short +// transport-level backoff that respects Retry-After absorbs most of it. +const MAX_429_RETRIES = ((): number => { + const n = Number(process.env.AI_HTTP_MAX_429_RETRIES); + return Number.isInteger(n) && n >= 0 ? n : 3; +})(); + +/** A fair, hand-off counting semaphore (FIFO). For max=1 it is a mutex. */ +class Semaphore { + private active = 0; + private readonly waiters: Array<() => void> = []; + constructor(private readonly max: number) {} + async acquire(): Promise { + if (this.active < this.max) { + this.active++; + return; + } + await new Promise((resolve) => this.waiters.push(resolve)); + // Resumed via release(): the slot was handed directly to us. + } + release(): void { + const next = this.waiters.shift(); + if (next) next(); // hand the slot to the next waiter; active unchanged + else this.active--; // no waiter: free the slot + } +} + +const hostGates = new Map(); +function gateForHost(host: string): Semaphore { + let gate = hostGates.get(host); + if (!gate) { + gate = new Semaphore(MAX_CONCURRENCY); + hostGates.set(host, gate); + } + return gate; +} + +/** Parse a 429 `Retry-After` (seconds, or HTTP-date) into ms; null if absent. */ +function retryAfterMs(res: Response): number | null { + const h = res.headers.get('retry-after'); + if (!h) return null; + const secs = Number(h); + if (Number.isFinite(secs)) return Math.max(0, secs * 1000); + const when = Date.parse(h); + return Number.isNaN(when) ? null : Math.max(0, when - Date.now()); +} + +const sleep = (ms: number): Promise => + new Promise((resolve) => { + const t = setTimeout(resolve, ms); + (t as { unref?: () => void }).unref?.(); + }); + +/** + * Wrap a Response so `release` runs exactly once when its body is fully read, + * cancelled, or errors — that is what holds the concurrency slot for the entire + * (possibly streaming) response. Bodyless responses release immediately. + */ +function releaseOnBodyEnd(res: Response, release: () => void): Response { + if (!res.body) { + release(); + return res; + } + const reader = res.body.getReader(); + const wrapped = new ReadableStream({ + async pull(controller) { + try { + const { done, value } = await reader.read(); + if (done) { + controller.close(); + release(); + } else { + controller.enqueue(value); + } + } catch (err) { + controller.error(err); + release(); + } + }, + cancel(reason) { + release(); + return reader.cancel(reason); + }, + }); + return new Response(wrapped, { + status: res.status, + statusText: res.statusText, + headers: res.headers, + }); +} + /** * A `fetch`-compatible function that routes the request through the shared, * resilient AI dispatcher. Injected into AI SDK provider factories via their @@ -135,24 +292,65 @@ export const aiFetch: typeof fetch = async (input, init) => { : input instanceof URL ? input.href : (input as Request).url; + let host = ''; let path = rawUrl; try { const u = new URL(rawUrl); + host = u.host; path = u.host + u.pathname; } catch { // Non-absolute / unparseable URL: keep the raw string (still no secrets). } - const isChat = /\/(chat\/completions|responses)\b/.test(path); + const isChat = /\/(chat\/completions|responses|messages)\b/.test(path); const log = (msg: string): void => isChat ? logger.log(msg) : logger.debug(msg); + + // Serialize per host (z.ai concurrency limit, #140). The slot is held until the + // response BODY is fully consumed (see releaseOnBodyEnd), with a defensive + // max-hold so a hung stream can never deadlock all AI traffic. + const gate = gateForHost(host); + await gate.acquire(); + let released = false; + const holdTimer = setTimeout(() => { + if (released) return; + released = true; + gate.release(); + logger.warn( + `provider request #${id} held the ${host || 'ai'} slot > ${MAX_HOLD_MS}ms; releasing defensively`, + ); + }, MAX_HOLD_MS); + (holdTimer as { unref?: () => void }).unref?.(); + const release = (): void => { + if (released) return; + released = true; + clearTimeout(holdTimer); + gate.release(); + }; + const startedAt = performance.now(); log(`provider request #${id} -> ${method} ${path}`); try { - const res = await fetch(input, { ...init, dispatcher } as RequestInit); - const ms = Math.round(performance.now() - startedAt); - log(`provider request #${id} <- ${res.status} in ${ms}ms (headers received)`); - return res; + for (let attempt = 0; ; attempt++) { + const res = await fetch(input, { ...init, dispatcher } as RequestInit); + const ms = Math.round(performance.now() - startedAt); + // 429 (rate limit) — wait out z.ai's throttle (Retry-After or exponential + // backoff) and retry, instead of failing the turn. Keep holding the slot: + // we ARE the single in-flight request; the backoff naturally paces us. + if (res.status === 429 && attempt < MAX_429_RETRIES) { + const wait = retryAfterMs(res) ?? Math.min(8_000, 500 * 2 ** attempt); + log( + `provider request #${id} <- 429 in ${ms}ms; backoff ${wait}ms (attempt ${attempt + 1}/${MAX_429_RETRIES})`, + ); + await res.body?.cancel?.().catch(() => {}); + await sleep(wait); + continue; + } + log(`provider request #${id} <- ${res.status} in ${ms}ms (headers received)`); + // Hold the concurrency slot until the (possibly streamed) body is consumed. + return releaseOnBodyEnd(res, release); + } } catch (err) { + release(); const ms = Math.round(performance.now() - startedAt); // Node's fetch reports a generic "fetch failed"; the real reason (e.g. an // undici SocketError with .code ECONNRESET / UND_ERR_SOCKET / diff --git a/apps/server/src/integrations/ai/ai.service.ts b/apps/server/src/integrations/ai/ai.service.ts index f5f4dca3..83fa7b9f 100644 --- a/apps/server/src/integrations/ai/ai.service.ts +++ b/apps/server/src/integrations/ai/ai.service.ts @@ -296,6 +296,9 @@ export class AiService { ); } const url = `${baseURL.replace(/\/$/, '')}/audio/transcriptions`; + // Bound the STT call explicitly. aiFetch disables transport-level + // headers/body timeouts by default (see ai-http.ts #140 note), so without an + // app-level signal a hung STT endpoint would wait forever. Mirror embedTexts. const res = await aiFetch(url, { method: 'POST', headers: { @@ -310,6 +313,7 @@ export class AiService { format, }, }), + signal: AbortSignal.timeout(AiService.sttTimeoutMs()), }); if (!res.ok) { // Surface status + body so the real reason reaches the user; never log the key. @@ -376,6 +380,16 @@ export class AiService { return Number.isFinite(raw) && raw > 0 ? raw : 120_000; } + /** + * Per-transcription (STT) call timeout in ms. Configurable via + * AI_STT_TIMEOUT_MS; falls back to 120000 (2 min) when unset or invalid. + * Needed because aiFetch disables transport-level timeouts by default (#140). + */ + private static sttTimeoutMs(): number { + const raw = Number(process.env.AI_STT_TIMEOUT_MS); + return Number.isFinite(raw) && raw > 0 ? raw : 120_000; + } + // Build a tiny valid WAV (mono, 16-bit PCM, 16 kHz, ~1s of silence), used only // as a connectivity probe for the STT endpoint in testConnection. private static silentWavProbe(): Uint8Array {