From c065e26d14c36cc3d11eb7cc5ae73935cd9d11b4 Mon Sep 17 00:00:00 2001 From: claude code agent 227 Date: Thu, 25 Jun 2026 00:10:40 +0300 Subject: [PATCH] refactor(ai): retry outside instrumentation + retry-exhaustion test (#179 review) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Invert the transport layers so the pre-response retry is OUTERMOST and the provider-HTTP instrumentation is INNER. Before, the retry lived inside createStreamingFetch (under the instrumentation), so a reset the retry recovered from logged only a clean "OK status=200" — the "PRE-RESPONSE FAILED ... ECONNRESET ... idleSincePrevCall" signal went blind exactly when the fix works, and AI_STREAM_KEEPALIVE_MS couldn't be tuned from prod data. Now createStreamingFetch is the dispatcher-bound BASE (no retry) and a new withPreResponseRetry() wraps it; ai.service composes withPreResponseRetry(createInstrumentedFetch('AiService:provider-http', createStreamingFetch())), so every attempt — including recovered resets — flows through the instrumentation. (Also expresses the keepAlive-config vs retry-behavior boundary structurally, per review #3.) - Add the retry-exhaustion test: a server that resets EVERY connection, asserting the call rejects with a retryable connection error AND exactly PRE_RESPONSE_CONNECT_RETRIES + 1 (= 3) requests reached the server — pinning the bound and that the final error propagates (guards an off-by-one / infinite loop / swallowed error). Existing happy-retry + abort tests moved onto withPreResponseRetry. Verified on the stand: a normal turn still streams (reasoning + finish) and the provider-HTTP telemetry still logs. server tsc + ai/mcp specs green (30). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../ai/ai-streaming-fetch.spec.ts | 47 +++++++++++++++---- .../src/integrations/ai/ai-streaming-fetch.ts | 40 +++++++++++----- apps/server/src/integrations/ai/ai.service.ts | 22 +++++---- 3 files changed, 80 insertions(+), 29 deletions(-) diff --git a/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts b/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts index 1af56a26..07c8ec40 100644 --- a/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts +++ b/apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts @@ -1,6 +1,7 @@ import * as http from 'node:http'; import { createStreamingFetch, + withPreResponseRetry, streamTimeoutMs, streamKeepAliveMs, streamingDispatcherOptions, @@ -152,17 +153,25 @@ describe('createStreamingFetch — against a delayed server', () => { }); }); -describe('createStreamingFetch — pre-response connection retry', () => { +describe('withPreResponseRetry', () => { + // The retry is the OUTERMOST layer (over the dispatcher-bound streaming fetch), + // matching ai.service's withPreResponseRetry(instrument(createStreamingFetch())). + // PRE_RESPONSE_CONNECT_RETRIES is 2 -> at most 3 total attempts. + const MAX_ATTEMPTS = 3; let server: http.Server; let url: string; let requests = 0; + // 'first' resets only the first connection; 'all' resets every connection. + let resetMode: 'first' | 'all' = 'first'; + + const retryingFetch = () => withPreResponseRetry(createStreamingFetch()); beforeAll(async () => { server = http.createServer((req, res) => { requests += 1; - if (requests === 1) { - // Reset the FIRST connection before any response byte (a poisoned/stale - // keep-alive socket). The retry must open a fresh connection. + const shouldReset = resetMode === 'all' || requests === 1; + if (shouldReset) { + // Reset before any response byte (a poisoned/stale keep-alive socket). 
const sock = req.socket as import('node:net').Socket & { resetAndDestroy?: () => void; }; @@ -184,22 +193,42 @@ describe('createStreamingFetch — pre-response connection retry', () => { beforeEach(() => { requests = 0; + resetMode = 'first'; }); it('retries a pre-response reset on a fresh connection and succeeds', async () => { - const streamingFetch = createStreamingFetch(); - const res = await streamingFetch(url); + resetMode = 'first'; + const res = await retryingFetch()(url); expect(res.status).toBe(200); expect(await res.text()).toBe('ok'); // first request reset -> retry -> second request served. - expect(requests).toBeGreaterThanOrEqual(2); + expect(requests).toBe(2); + }); + + it('gives up after the retry bound and rethrows the original reset', async () => { + resetMode = 'all'; // every attempt resets -> retries exhaust + let caught: unknown; + try { + await retryingFetch()(url); + } catch (e) { + caught = e; + } + expect(caught).toBeDefined(); + // A retryable connection error reached the caller (not swallowed). + expect(isRetryableConnectError(caught)).toBe(true); + // Bounded: exactly PRE_RESPONSE_CONNECT_RETRIES + 1 attempts hit the server + // (pins both the limit and that the final error propagates — guards an + // off-by-one or an infinite loop). + expect(requests).toBe(MAX_ATTEMPTS); }); it('does NOT retry an aborted request (no retry storm)', async () => { + resetMode = 'all'; const ctrl = new AbortController(); ctrl.abort(); - const streamingFetch = createStreamingFetch(); - await expect(streamingFetch(url, { signal: ctrl.signal })).rejects.toBeDefined(); + await expect( + retryingFetch()(url, { signal: ctrl.signal }), + ).rejects.toBeDefined(); // Pre-aborted: the request never reached the server, so nothing was retried. 
expect(requests).toBe(0); }); diff --git a/apps/server/src/integrations/ai/ai-streaming-fetch.ts b/apps/server/src/integrations/ai/ai-streaming-fetch.ts index 75a3770d..b781df9a 100644 --- a/apps/server/src/integrations/ai/ai-streaming-fetch.ts +++ b/apps/server/src/integrations/ai/ai-streaming-fetch.ts @@ -104,23 +104,41 @@ export function isRetryableConnectError(err: unknown): boolean { * recycling, #175). A single shared dispatcher is returned (callers hold it for * the service lifetime) so its connection pool is reused. * - * On a PRE-RESPONSE connection reset (`fetch()` rejects before the Response - * resolves — so nothing has streamed) it retries a few times on a fresh - * connection. A poisoned keep-alive socket is destroyed by undici on the reset, - * so the retry lands on a new connection. An abort (client disconnect) is never - * retried. + * This is the BASE transport — no retry. The chat path wraps it as + * `withPreResponseRetry(createInstrumentedFetch(ctx, createStreamingFetch()))` + * so the retry is the OUTERMOST layer and the instrumentation observes EVERY + * attempt (a recovered reset is still logged — see withPreResponseRetry). */ export function createStreamingFetch(): typeof fetch { const dispatcher = new Agent(streamingDispatcherOptions()); + return ((input: Parameters[0], init?: RequestInit) => + fetch(input, { + ...(init ?? {}), + // `dispatcher` is an undici-specific init field (not in the DOM + // RequestInit type); Node's global fetch reads it. Cast to satisfy it. + dispatcher, + } as RequestInit & { dispatcher: Agent })) as typeof fetch; +} + +/** + * Wrap a fetch so a PRE-RESPONSE connection reset (`baseFetch` rejects before the + * Response resolves — so nothing has streamed) is retried a few times on a fresh + * connection (#175). A poisoned keep-alive socket is destroyed by undici on the + * reset, so the retry lands on a new connection. An abort (client disconnect) is + * never retried. 
+ * + * This is the OUTERMOST transport layer by design: composing it as + * `withPreResponseRetry(instrumentedFetch)` means every attempt — including the + * resets that the retry recovers from — flows through the instrumentation, so the + * "PRE-RESPONSE FAILED ... ECONNRESET ... idleSincePrevCall" telemetry stays + * visible precisely when the fix is working (and AI_STREAM_KEEPALIVE_MS can be + * tuned from real data). A retry INSIDE the transport would hide it. + */ +export function withPreResponseRetry(baseFetch: typeof fetch): typeof fetch { return (async (input: Parameters[0], init?: RequestInit) => { for (let attempt = 0; ; attempt++) { try { - return await fetch(input, { - ...(init ?? {}), - // `dispatcher` is an undici-specific init field (not in the DOM - // RequestInit type); Node's global fetch reads it. Cast to satisfy it. - dispatcher, - } as RequestInit & { dispatcher: Agent }); + return await baseFetch(input, init); } catch (err) { const aborted = init?.signal?.aborted === true; if ( diff --git a/apps/server/src/integrations/ai/ai.service.ts b/apps/server/src/integrations/ai/ai.service.ts index 18f15b5d..16aa6997 100644 --- a/apps/server/src/integrations/ai/ai.service.ts +++ b/apps/server/src/integrations/ai/ai.service.ts @@ -16,7 +16,10 @@ import { AiEmbeddingNotConfiguredException } from './ai-embedding-not-configured import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception'; import { describeProviderError } from './ai-error.util'; import { createInstrumentedFetch } from './ai-provider-http'; -import { createStreamingFetch } from './ai-streaming-fetch'; +import { + createStreamingFetch, + withPreResponseRetry, +} from './ai-streaming-fetch'; import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo'; import { SecretBoxService } from '../crypto/secret-box'; import { AiDriver } from './ai.types'; @@ -46,14 +49,15 @@ export interface ChatModelOverride { export class AiService { private 
readonly logger = new Logger(AiService.name); - // Provider HTTP fetch for the chat path: the streaming fetch — which RAISES - // undici's 300s headers/body timeouts to a generous-but-finite silence timeout - // so a long agent turn is not severed mid-stream (#175) — wrapped with the - // provider-HTTP instrumentation so the logs observe that exact transport. Held - // for the service lifetime to reuse the streaming dispatcher's connection pool. - private readonly aiProviderFetch = createInstrumentedFetch( - 'AiService:provider-http', - createStreamingFetch(), + // Provider HTTP fetch for the chat path, layered so each transport concern is + // observed (#175). Inside-out: the streaming fetch (finite silence timeouts + + // keep-alive recycling) → provider-HTTP instrumentation (logs every attempt) → + // pre-response connection-reset retry as the OUTERMOST layer. Retry-outer means + // a reset the retry recovers from is still logged with its idle-gap, instead of + // collapsing into a clean "OK". Held for the service lifetime to reuse the + // streaming dispatcher's connection pool. + private readonly aiProviderFetch = withPreResponseRetry( + createInstrumentedFetch('AiService:provider-http', createStreamingFetch()), ); constructor(