diff --git a/apps/server/src/integrations/ai/ai-http.spec.ts b/apps/server/src/integrations/ai/ai-http.spec.ts index 570b1b6d..7e3b7b24 100644 --- a/apps/server/src/integrations/ai/ai-http.spec.ts +++ b/apps/server/src/integrations/ai/ai-http.spec.ts @@ -194,4 +194,58 @@ describe('ai-http', () => { await srv.close(); }); }); + + describe('#140 z.ai concurrency gate + 429 backoff', () => { + it('serializes concurrent requests to the same host until the first body is consumed', async () => { + const arrivals: string[] = []; + const srv = await loopback((req, res) => { + arrivals.push(req.url ?? ''); + setTimeout(() => { + res.writeHead(200, { 'content-type': 'text/plain' }); + res.end('body'); + }, 30); + }); + const mk = (q: string): Promise => + aiFetch(`${srv.url}/chat/completions?${q}`, { method: 'POST', body: q }); + + // A acquires the only slot and gets its headers; its BODY is not yet read, + // so the slot is still held. + const pA = mk('a'); + const resA = await pA; + // B is requested now but must QUEUE behind A's still-open body. + const pB = mk('b'); + await new Promise((r) => setTimeout(r, 120)); + // B must NOT have hit the server yet — only A has arrived. + expect(arrivals.length).toBe(1); + + // Consuming A's body releases the slot, letting B proceed. 
+ await resA.text(); + const resB = await pB; + await resB.text(); + expect(arrivals.length).toBe(2); + await srv.close(); + }); + + it('waits out a 429 (Retry-After) and returns the eventual success', async () => { + let n = 0; + const srv = await loopback((_req, res) => { + n++; + if (n === 1) { + res.writeHead(429, { 'retry-after': '0' }); + res.end('rate limited'); + } else { + res.writeHead(200, { 'content-type': 'text/plain' }); + res.end('ok'); + } + }); + const res = await aiFetch(`${srv.url}/chat/completions`, { + method: 'POST', + body: 'x', + }); + expect(res.status).toBe(200); + expect(await res.text()).toBe('ok'); + expect(n).toBe(2); // one 429, retried once on backoff, then 200 + await srv.close(); + }); + }); }); diff --git a/apps/server/src/integrations/ai/ai-http.ts b/apps/server/src/integrations/ai/ai-http.ts index 9e34fa1c..d5f99c4b 100644 --- a/apps/server/src/integrations/ai/ai-http.ts +++ b/apps/server/src/integrations/ai/ai-http.ts @@ -140,6 +140,120 @@ const dispatcher: Dispatcher = new RetryAgent(baseAgent, { const logger = new Logger('AiHttp'); let requestSeq = 0; +// PER-HOST CONCURRENCY GATE (z.ai GLM Coding Plan #140). +// ---------------------------------------------------- +// z.ai's coding plan enforces an (undocumented) ~1 in-flight request limit per +// account and throttles bursts with 429s + minute-long stalls; OpenCode etc. +// "just work" because a CLI is naturally serial. Our server fires CONCURRENT AI +// calls (chat stream + title generation + RAG embeddings + multiple tabs), all +// on the same key, which trips that limit. So serialize outbound AI traffic PER +// HOST to a small concurrency (default 1). The slot is held for the WHOLE +// response — including the streamed SSE body — so a long chat stream blocks a +// second concurrent request until it finishes, exactly like a single CLI client. +// Env-tunable for providers that allow real concurrency (OpenAI/OpenRouter). 
const MAX_CONCURRENCY = envInt('AI_HTTP_MAX_CONCURRENCY', 1, 1);

/**
 * Largest delay Node's timers accept. A larger delay does not wait longer: it
 * overflows and the timer fires after ~1ms, so every timer delay derived from
 * configuration or from a server-supplied header is clamped to this.
 */
const MAX_TIMER_MS = 2_147_483_647;

// Defensive cap: never hold a slot longer than this even if the caller never
// finishes reading the body (a hung stream must not deadlock all AI traffic,
// since headersTimeout/bodyTimeout are disabled — see HEADERS_TIMEOUT_MS).
const MAX_HOLD_MS = envInt('AI_HTTP_MAX_HOLD_MS', 600_000, 1, MAX_TIMER_MS);

// How many times to wait-out a 429 (rate limit) before surfacing it. The AI SDK
// retries on top, but z.ai's coding plan 429s hard during peak hours, so a short
// transport-level backoff that respects Retry-After absorbs most of it.
const MAX_429_RETRIES = envInt('AI_HTTP_MAX_429_RETRIES', 3, 0);

/**
 * Read an integer setting from the environment.
 *
 * @param name     Environment variable name.
 * @param fallback Value used when the variable is unset, blank, not an integer,
 *                 or below `min`.
 * @param min      Smallest accepted value (inclusive).
 * @param max      Largest returned value; bigger inputs are clamped to it.
 * @returns The parsed integer, or `fallback`.
 */
function envInt(
  name: string,
  fallback: number,
  min: number,
  max: number = Number.MAX_SAFE_INTEGER,
): number {
  const raw = process.env[name]?.trim();
  // `Number('')` is 0, so a blank value (`FOO=` in an env file) must be treated
  // as "not set" rather than as an explicit zero.
  if (!raw) return fallback;
  const n = Number(raw);
  if (!Number.isInteger(n) || n < min) return fallback;
  return Math.min(n, max);
}

/** A fair, hand-off counting semaphore (FIFO). For max=1 it is a mutex. */
class Semaphore {
  private active = 0;
  private readonly waiters: Array<() => void> = [];

  constructor(private readonly max: number) {}

  /** Resolves once a slot is owned by the caller; waiters are served in order. */
  async acquire(): Promise<void> {
    if (this.active < this.max) {
      this.active++;
      return;
    }
    await new Promise<void>((resolve) => this.waiters.push(resolve));
    // Resumed via release(): the slot was handed directly to us.
  }

  /** Gives a slot back: hands it to the oldest waiter, or frees it. */
  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next(); // hand the slot to the next waiter; active unchanged
    } else if (this.active > 0) {
      this.active--; // no waiter: free the slot
    }
    // else: unmatched release on an idle gate — ignore it so the count can
    // never go negative and admit more than `max` holders.
  }
}

const hostGates = new Map<string, Semaphore>();

/** Returns the gate for `host`, creating it on first use. */
function gateForHost(host: string): Semaphore {
  let gate = hostGates.get(host);
  if (!gate) {
    gate = new Semaphore(MAX_CONCURRENCY);
    hostGates.set(host, gate);
  }
  return gate;
}

/**
 * Parse a 429 `Retry-After` (seconds, or HTTP-date) into ms; null if absent.
 *
 * @param res   The response carrying the header.
 * @param maxMs Upper bound for the result. Defaults to `MAX_HOLD_MS`: waiting
 *              longer than the slot may be held is pointless, and an unbounded
 *              value would overflow the timer (see `MAX_TIMER_MS`).
 * @returns A delay in `[0, maxMs]`, or null when the header is missing or is
 *          neither a number nor a parseable date.
 */
function retryAfterMs(res: Response, maxMs: number = MAX_HOLD_MS): number | null {
  const h = res.headers.get('retry-after');
  if (!h) return null;
  const secs = Number(h);
  let ms: number;
  if (Number.isFinite(secs)) {
    ms = secs * 1000;
  } else {
    const when = Date.parse(h);
    if (Number.isNaN(when)) return null;
    ms = when - Date.now();
  }
  return Math.min(maxMs, Math.max(0, ms));
}

/** Resolves after `ms` (clamped to the timer range) without keeping the process alive. */
const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => {
    const t: unknown = setTimeout(resolve, Math.min(Math.max(0, ms), MAX_TIMER_MS));
    (t as { unref?: () => void }).unref?.();
  });

/**
 * Wrap a Response so `release` runs exactly once when its body is fully read,
 * cancelled, or errors — that is what holds the concurrency slot for the entire
 * (possibly streaming) response. Bodyless responses release immediately.
 *
 * @param res     The response whose body lifetime bounds the slot.
 * @param release Callback that frees the slot.
 * @returns `res` itself when it has no body, otherwise a new Response with the
 *          same status, status text, headers, `url` and `redirected`.
 */
function releaseOnBodyEnd(res: Response, release: () => void): Response {
  if (!res.body) {
    release();
    return res;
  }
  let released = false;
  const releaseOnce = (): void => {
    if (released) return;
    released = true;
    release();
  };
  const reader = res.body.getReader();
  const wrapped = new ReadableStream<Uint8Array>({
    async pull(controller) {
      let finished = false;
      try {
        const chunk = await reader.read();
        if (chunk.done) {
          finished = true;
          controller.close();
        } else {
          controller.enqueue(chunk.value);
        }
      } catch (err) {
        finished = true;
        controller.error(err);
      } finally {
        // In `finally` so the slot is freed even if close()/error() throws.
        if (finished) releaseOnce();
      }
    },
    cancel(reason) {
      releaseOnce();
      return reader.cancel(reason);
    },
  });
  const out = new Response(wrapped, {
    status: res.status,
    statusText: res.statusText,
    headers: res.headers,
  });
  // A constructed Response reports url '' and redirected false; carry the
  // originals over so callers still see where the response came from.
  Object.defineProperties(out, {
    url: { value: res.url, enumerable: true, configurable: true },
    redirected: { value: res.redirected, enumerable: true, configurable: true },
  });
  return out;
}

/**
 * A `fetch`-compatible function that routes the request through the shared,
 * resilient AI dispatcher. Injected into AI SDK provider factories via their
@@ -162,24 +276,65 @@ export const aiFetch: typeof fetch = async (input, init) => {
       : input instanceof URL
         ? input.href
         : (input as Request).url;
+  let host = '';
   let path = rawUrl;
   try {
     const u = new URL(rawUrl);
+    host = u.host;
     path = u.host + u.pathname;
   } catch {
     // Non-absolute / unparseable URL: keep the raw string (still no secrets).
*/ // ^ unchanged aiFetch hunk context, kept verbatim; its body continues below
} - const isChat = /\/(chat\/completions|responses)\b/.test(path); + const isChat = /\/(chat\/completions|responses|messages)\b/.test(path); const log = (msg: string): void => isChat ? logger.log(msg) : logger.debug(msg); + + // Serialize per host (z.ai concurrency limit, #140). The slot is held until the + // response BODY is fully consumed (see releaseOnBodyEnd), with a defensive + // max-hold so a hung stream can never deadlock all AI traffic. + const gate = gateForHost(host); + await gate.acquire(); + let released = false; + const holdTimer = setTimeout(() => { + if (released) return; + released = true; + gate.release(); + logger.warn( + `provider request #${id} held the ${host || 'ai'} slot > ${MAX_HOLD_MS}ms; releasing defensively`, + ); + }, MAX_HOLD_MS); + (holdTimer as { unref?: () => void }).unref?.(); + const release = (): void => { + if (released) return; + released = true; + clearTimeout(holdTimer); + gate.release(); + }; + const startedAt = performance.now(); log(`provider request #${id} -> ${method} ${path}`); try { - const res = await fetch(input, { ...init, dispatcher } as RequestInit); - const ms = Math.round(performance.now() - startedAt); - log(`provider request #${id} <- ${res.status} in ${ms}ms (headers received)`); - return res; + for (let attempt = 0; ; attempt++) { + const res = await fetch(input, { ...init, dispatcher } as RequestInit); + const ms = Math.round(performance.now() - startedAt); + // 429 (rate limit) — wait out z.ai's throttle (Retry-After or exponential + // backoff) and retry, instead of failing the turn. Keep holding the slot: + // we ARE the single in-flight request; the backoff naturally paces us. + if (res.status === 429 && attempt < MAX_429_RETRIES) { + const wait = retryAfterMs(res) ?? 
Math.min(8_000, 500 * 2 ** attempt); + log( + `provider request #${id} <- 429 in ${ms}ms; backoff ${wait}ms (attempt ${attempt + 1}/${MAX_429_RETRIES})`, + ); + await res.body?.cancel?.().catch(() => {}); + await sleep(wait); + continue; + } + log(`provider request #${id} <- ${res.status} in ${ms}ms (headers received)`); + // Hold the concurrency slot until the (possibly streamed) body is consumed. + return releaseOnBodyEnd(res, release); + } } catch (err) { + release(); const ms = Math.round(performance.now() - startedAt); // Node's fetch reports a generic "fetch failed"; the real reason (e.g. an // undici SocketError with .code ECONNRESET / UND_ERR_SOCKET /