fix(ai-http): serialize per-host AI requests + back off on 429 (z.ai #140)
z.ai's GLM Coding Plan throttles hard (429s) and stalls under bursts/overlap; two mitigations in the shared AI transport: - Per-host concurrency gate (default 1, env AI_HTTP_MAX_CONCURRENCY): outbound AI requests to a given host are serialized, and the slot is held until the (streamed) response body is fully consumed — so a chat stream blocks an overlapping title-gen / second-tab / RAG-embedding request instead of tripping z.ai's ~1-concurrent limit. A defensive max-hold (AI_HTTP_MAX_HOLD_MS, 10 min) prevents a hung stream from deadlocking all AI traffic (headers/body timeouts are disabled, see #144). - 429 backoff (AI_HTTP_MAX_429_RETRIES, default 3): respect Retry-After (or exponential backoff) and retry, so a rate-limited agent step waits the throttle out instead of failing the whole turn. NOTE: these address the burst/overlap dimension. The dominant symptom — z.ai's erratic time-to-first-byte (measured 2s..56s, endpoint/UA/tool-count-independent) — is mitigated by #144 (wait like curl); it is a z.ai-side capacity issue, not something client code can speed up. Tests: ai-http.spec.ts gains a concurrency-gate test (a 2nd request to the same host does not hit the server until the 1st body is consumed) and a 429-backoff test (Retry-After honored, eventual 200). 8/8 pass; typecheck clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -194,4 +194,58 @@ describe('ai-http', () => {
|
|||||||
await srv.close();
|
await srv.close();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('#140 z.ai concurrency gate + 429 backoff', () => {
|
||||||
|
it('serializes concurrent requests to the same host until the first body is consumed', async () => {
|
||||||
|
const arrivals: string[] = [];
|
||||||
|
const srv = await loopback((req, res) => {
|
||||||
|
arrivals.push(req.url ?? '');
|
||||||
|
setTimeout(() => {
|
||||||
|
res.writeHead(200, { 'content-type': 'text/plain' });
|
||||||
|
res.end('body');
|
||||||
|
}, 30);
|
||||||
|
});
|
||||||
|
const mk = (q: string): Promise<Response> =>
|
||||||
|
aiFetch(`${srv.url}/chat/completions?${q}`, { method: 'POST', body: q });
|
||||||
|
|
||||||
|
// A acquires the only slot and gets its headers; its BODY is not yet read,
|
||||||
|
// so the slot is still held.
|
||||||
|
const pA = mk('a');
|
||||||
|
const resA = await pA;
|
||||||
|
// B is requested now but must QUEUE behind A's still-open body.
|
||||||
|
const pB = mk('b');
|
||||||
|
await new Promise((r) => setTimeout(r, 120));
|
||||||
|
// B must NOT have hit the server yet — only A has arrived.
|
||||||
|
expect(arrivals.length).toBe(1);
|
||||||
|
|
||||||
|
// Consuming A's body releases the slot, letting B proceed.
|
||||||
|
await resA.text();
|
||||||
|
const resB = await pB;
|
||||||
|
await resB.text();
|
||||||
|
expect(arrivals.length).toBe(2);
|
||||||
|
await srv.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('waits out a 429 (Retry-After) and returns the eventual success', async () => {
|
||||||
|
let n = 0;
|
||||||
|
const srv = await loopback((_req, res) => {
|
||||||
|
n++;
|
||||||
|
if (n === 1) {
|
||||||
|
res.writeHead(429, { 'retry-after': '0' });
|
||||||
|
res.end('rate limited');
|
||||||
|
} else {
|
||||||
|
res.writeHead(200, { 'content-type': 'text/plain' });
|
||||||
|
res.end('ok');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
const res = await aiFetch(`${srv.url}/chat/completions`, {
|
||||||
|
method: 'POST',
|
||||||
|
body: 'x',
|
||||||
|
});
|
||||||
|
expect(res.status).toBe(200);
|
||||||
|
expect(await res.text()).toBe('ok');
|
||||||
|
expect(n).toBe(2); // one 429, retried once on backoff, then 200
|
||||||
|
await srv.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -140,6 +140,120 @@ const dispatcher: Dispatcher = new RetryAgent(baseAgent, {
|
|||||||
const logger = new Logger('AiHttp');
|
const logger = new Logger('AiHttp');
|
||||||
let requestSeq = 0;
|
let requestSeq = 0;
|
||||||
|
|
||||||
|
// PER-HOST CONCURRENCY GATE (z.ai GLM Coding Plan #140).
|
||||||
|
// ----------------------------------------------------
|
||||||
|
// z.ai's coding plan enforces an (undocumented) ~1 in-flight request limit per
|
||||||
|
// account and throttles bursts with 429s + minute-long stalls; OpenCode etc.
|
||||||
|
// "just work" because a CLI is naturally serial. Our server fires CONCURRENT AI
|
||||||
|
// calls (chat stream + title generation + RAG embeddings + multiple tabs), all
|
||||||
|
// on the same key, which trips that limit. So serialize outbound AI traffic PER
|
||||||
|
// HOST to a small concurrency (default 1). The slot is held for the WHOLE
|
||||||
|
// response — including the streamed SSE body — so a long chat stream blocks a
|
||||||
|
// second concurrent request until it finishes, exactly like a single CLI client.
|
||||||
|
// Env-tunable for providers that allow real concurrency (OpenAI/OpenRouter).
|
||||||
|
const MAX_CONCURRENCY = ((): number => {
|
||||||
|
const n = Number(process.env.AI_HTTP_MAX_CONCURRENCY);
|
||||||
|
return Number.isInteger(n) && n > 0 ? n : 1;
|
||||||
|
})();
|
||||||
|
// Defensive cap: never hold a slot longer than this even if the caller never
|
||||||
|
// finishes reading the body (a hung stream must not deadlock all AI traffic,
|
||||||
|
// since headersTimeout/bodyTimeout are disabled — see HEADERS_TIMEOUT_MS).
|
||||||
|
const MAX_HOLD_MS = ((): number => {
|
||||||
|
const n = Number(process.env.AI_HTTP_MAX_HOLD_MS);
|
||||||
|
return Number.isInteger(n) && n > 0 ? n : 600_000;
|
||||||
|
})();
|
||||||
|
// How many times to wait-out a 429 (rate limit) before surfacing it. The AI SDK
|
||||||
|
// retries on top, but z.ai's coding plan 429s hard during peak hours, so a short
|
||||||
|
// transport-level backoff that respects Retry-After absorbs most of it.
|
||||||
|
const MAX_429_RETRIES = ((): number => {
|
||||||
|
const n = Number(process.env.AI_HTTP_MAX_429_RETRIES);
|
||||||
|
return Number.isInteger(n) && n >= 0 ? n : 3;
|
||||||
|
})();
|
||||||
|
|
||||||
|
/** A fair, hand-off counting semaphore (FIFO). For max=1 it is a mutex. */
|
||||||
|
class Semaphore {
|
||||||
|
private active = 0;
|
||||||
|
private readonly waiters: Array<() => void> = [];
|
||||||
|
constructor(private readonly max: number) {}
|
||||||
|
async acquire(): Promise<void> {
|
||||||
|
if (this.active < this.max) {
|
||||||
|
this.active++;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await new Promise<void>((resolve) => this.waiters.push(resolve));
|
||||||
|
// Resumed via release(): the slot was handed directly to us.
|
||||||
|
}
|
||||||
|
release(): void {
|
||||||
|
const next = this.waiters.shift();
|
||||||
|
if (next) next(); // hand the slot to the next waiter; active unchanged
|
||||||
|
else this.active--; // no waiter: free the slot
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const hostGates = new Map<string, Semaphore>();
|
||||||
|
function gateForHost(host: string): Semaphore {
|
||||||
|
let gate = hostGates.get(host);
|
||||||
|
if (!gate) {
|
||||||
|
gate = new Semaphore(MAX_CONCURRENCY);
|
||||||
|
hostGates.set(host, gate);
|
||||||
|
}
|
||||||
|
return gate;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Parse a 429 `Retry-After` (seconds, or HTTP-date) into ms; null if absent. */
|
||||||
|
function retryAfterMs(res: Response): number | null {
|
||||||
|
const h = res.headers.get('retry-after');
|
||||||
|
if (!h) return null;
|
||||||
|
const secs = Number(h);
|
||||||
|
if (Number.isFinite(secs)) return Math.max(0, secs * 1000);
|
||||||
|
const when = Date.parse(h);
|
||||||
|
return Number.isNaN(when) ? null : Math.max(0, when - Date.now());
|
||||||
|
}
|
||||||
|
|
||||||
|
const sleep = (ms: number): Promise<void> =>
|
||||||
|
new Promise((resolve) => {
|
||||||
|
const t = setTimeout(resolve, ms);
|
||||||
|
(t as { unref?: () => void }).unref?.();
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap a Response so `release` runs exactly once when its body is fully read,
|
||||||
|
* cancelled, or errors — that is what holds the concurrency slot for the entire
|
||||||
|
* (possibly streaming) response. Bodyless responses release immediately.
|
||||||
|
*/
|
||||||
|
function releaseOnBodyEnd(res: Response, release: () => void): Response {
|
||||||
|
if (!res.body) {
|
||||||
|
release();
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
const reader = res.body.getReader();
|
||||||
|
const wrapped = new ReadableStream<Uint8Array>({
|
||||||
|
async pull(controller) {
|
||||||
|
try {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) {
|
||||||
|
controller.close();
|
||||||
|
release();
|
||||||
|
} else {
|
||||||
|
controller.enqueue(value);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
controller.error(err);
|
||||||
|
release();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
cancel(reason) {
|
||||||
|
release();
|
||||||
|
return reader.cancel(reason);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
return new Response(wrapped, {
|
||||||
|
status: res.status,
|
||||||
|
statusText: res.statusText,
|
||||||
|
headers: res.headers,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A `fetch`-compatible function that routes the request through the shared,
|
* A `fetch`-compatible function that routes the request through the shared,
|
||||||
* resilient AI dispatcher. Injected into AI SDK provider factories via their
|
* resilient AI dispatcher. Injected into AI SDK provider factories via their
|
||||||
@@ -162,24 +276,65 @@ export const aiFetch: typeof fetch = async (input, init) => {
|
|||||||
: input instanceof URL
|
: input instanceof URL
|
||||||
? input.href
|
? input.href
|
||||||
: (input as Request).url;
|
: (input as Request).url;
|
||||||
|
let host = '';
|
||||||
let path = rawUrl;
|
let path = rawUrl;
|
||||||
try {
|
try {
|
||||||
const u = new URL(rawUrl);
|
const u = new URL(rawUrl);
|
||||||
|
host = u.host;
|
||||||
path = u.host + u.pathname;
|
path = u.host + u.pathname;
|
||||||
} catch {
|
} catch {
|
||||||
// Non-absolute / unparseable URL: keep the raw string (still no secrets).
|
// Non-absolute / unparseable URL: keep the raw string (still no secrets).
|
||||||
}
|
}
|
||||||
const isChat = /\/(chat\/completions|responses)\b/.test(path);
|
const isChat = /\/(chat\/completions|responses|messages)\b/.test(path);
|
||||||
const log = (msg: string): void =>
|
const log = (msg: string): void =>
|
||||||
isChat ? logger.log(msg) : logger.debug(msg);
|
isChat ? logger.log(msg) : logger.debug(msg);
|
||||||
|
|
||||||
|
// Serialize per host (z.ai concurrency limit, #140). The slot is held until the
|
||||||
|
// response BODY is fully consumed (see releaseOnBodyEnd), with a defensive
|
||||||
|
// max-hold so a hung stream can never deadlock all AI traffic.
|
||||||
|
const gate = gateForHost(host);
|
||||||
|
await gate.acquire();
|
||||||
|
let released = false;
|
||||||
|
const holdTimer = setTimeout(() => {
|
||||||
|
if (released) return;
|
||||||
|
released = true;
|
||||||
|
gate.release();
|
||||||
|
logger.warn(
|
||||||
|
`provider request #${id} held the ${host || 'ai'} slot > ${MAX_HOLD_MS}ms; releasing defensively`,
|
||||||
|
);
|
||||||
|
}, MAX_HOLD_MS);
|
||||||
|
(holdTimer as { unref?: () => void }).unref?.();
|
||||||
|
const release = (): void => {
|
||||||
|
if (released) return;
|
||||||
|
released = true;
|
||||||
|
clearTimeout(holdTimer);
|
||||||
|
gate.release();
|
||||||
|
};
|
||||||
|
|
||||||
const startedAt = performance.now();
|
const startedAt = performance.now();
|
||||||
log(`provider request #${id} -> ${method} ${path}`);
|
log(`provider request #${id} -> ${method} ${path}`);
|
||||||
try {
|
try {
|
||||||
|
for (let attempt = 0; ; attempt++) {
|
||||||
const res = await fetch(input, { ...init, dispatcher } as RequestInit);
|
const res = await fetch(input, { ...init, dispatcher } as RequestInit);
|
||||||
const ms = Math.round(performance.now() - startedAt);
|
const ms = Math.round(performance.now() - startedAt);
|
||||||
|
// 429 (rate limit) — wait out z.ai's throttle (Retry-After or exponential
|
||||||
|
// backoff) and retry, instead of failing the turn. Keep holding the slot:
|
||||||
|
// we ARE the single in-flight request; the backoff naturally paces us.
|
||||||
|
if (res.status === 429 && attempt < MAX_429_RETRIES) {
|
||||||
|
const wait = retryAfterMs(res) ?? Math.min(8_000, 500 * 2 ** attempt);
|
||||||
|
log(
|
||||||
|
`provider request #${id} <- 429 in ${ms}ms; backoff ${wait}ms (attempt ${attempt + 1}/${MAX_429_RETRIES})`,
|
||||||
|
);
|
||||||
|
await res.body?.cancel?.().catch(() => {});
|
||||||
|
await sleep(wait);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
log(`provider request #${id} <- ${res.status} in ${ms}ms (headers received)`);
|
log(`provider request #${id} <- ${res.status} in ${ms}ms (headers received)`);
|
||||||
return res;
|
// Hold the concurrency slot until the (possibly streamed) body is consumed.
|
||||||
|
return releaseOnBodyEnd(res, release);
|
||||||
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
release();
|
||||||
const ms = Math.round(performance.now() - startedAt);
|
const ms = Math.round(performance.now() - startedAt);
|
||||||
// Node's fetch reports a generic "fetch failed"; the real reason (e.g. an
|
// Node's fetch reports a generic "fetch failed"; the real reason (e.g. an
|
||||||
// undici SocketError with .code ECONNRESET / UND_ERR_SOCKET /
|
// undici SocketError with .code ECONNRESET / UND_ERR_SOCKET /
|
||||||
|
|||||||
Reference in New Issue
Block a user