refactor(ai): address PR #176 review — finite-timeout wording, env doc, tests, permanent provider-http module

- Wording: every comment now says the stream timeouts are RAISED to a
  generous-but-finite ~15-min silence timeout, not "disabled (0)" (the stale
  comments contradicted the code, which uses AI_STREAM_TIMEOUT_MS, default
  900000ms).
- Architecture (the load-bearing-temporary trap): the streaming fetch reached
  the chat provider only by riding the "temporary DIAGNOSTIC" telemetry, so
  deleting the telemetry by its own label would silently revert the timeout fix.
  Legitimize it: rename ai-http-diagnostics.ts -> ai-provider-http.ts,
  createDiagnosticFetch -> createInstrumentedFetch, field aiDiagnosticFetch ->
  aiProviderFetch, drop the "temporary" labels, and document the chat transport
  (streaming fetch + instrumentation) as one intentional construct.
- Docs: AI_STREAM_TIMEOUT_MS added to .env.example next to AI_EMBEDDING_TIMEOUT_MS.
- Tests:
  - ai-provider-http.spec: createInstrumentedFetch delegates to the injected
    baseFetch with the same input/init, returns the Response untouched, rethrows
    the error, and defaults to global fetch — covering the baseFetch seam.
  - ai-streaming-fetch.spec: the delayed-server test is now LOAD-BEARING — with
    AI_STREAM_TIMEOUT_MS set below the 1.5s server delay the call actually rejects
    (a lost dispatcher -> global 300s default would NOT), proving the configured
    dispatcher is wired; plus the default-timeout happy path.

server tsc clean; ai-streaming-fetch / ai-provider-http / ai.service / mcp-servers
/ ai-error specs green (41).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
claude code agent 227
2026-06-24 22:31:58 +03:00
parent a14560c7c9
commit da15b55786
5 changed files with 114 additions and 30 deletions

View File

@@ -136,6 +136,12 @@ MCP_DOCMOST_PASSWORD=
# A slow/hung embeddings endpoint fails after this and the batch continues. # A slow/hung embeddings endpoint fails after this and the batch continues.
# AI_EMBEDDING_TIMEOUT_MS=120000 # AI_EMBEDDING_TIMEOUT_MS=120000
# Silence timeout (ms) for streaming chat/agent AI calls AND external-MCP traffic.
# Bounds time-to-first-byte and the gap BETWEEN chunks (NOT the total turn length),
# so an arbitrarily long turn that keeps streaming is never cut. Finite so a hung
# provider is eventually broken instead of leaking forever. Default 900000 (15 min).
# AI_STREAM_TIMEOUT_MS=900000
# --- Anonymous public-share AI assistant --- # --- Anonymous public-share AI assistant ---
# Opt-in per workspace (AI settings -> "public share assistant"; off by default). # Opt-in per workspace (AI settings -> "public share assistant"; off by default).
# When enabled, anonymous visitors of a published share can ask an AI about that # When enabled, anonymous visitors of a published share can ask an AI about that

View File

@@ -0,0 +1,40 @@
import { createInstrumentedFetch } from './ai-provider-http';
/**
* createInstrumentedFetch must be behavior-neutral: it delegates to the supplied
* baseFetch with the SAME input/init, returns the Response object untouched (so
* the streamed SSE body is never read/cloned), and rethrows the same error. The
* baseFetch injection is the seam that carries the streaming fetch (#175) onto
* the chat provider, so it is tested directly.
*/
describe('createInstrumentedFetch', () => {
it('delegates to the injected baseFetch with the same input/init', async () => {
const fakeResponse = new Response('ok', { status: 200 });
const baseFetch = jest.fn().mockResolvedValue(fakeResponse);
const instrumented = createInstrumentedFetch('test', baseFetch as never);
const init = { method: 'POST', body: '{"q":1}' };
const res = await instrumented('https://example.com/v1/chat', init);
expect(baseFetch).toHaveBeenCalledTimes(1);
expect(baseFetch).toHaveBeenCalledWith('https://example.com/v1/chat', init);
// The Response is returned UNTOUCHED (same reference — never read/cloned).
expect(res).toBe(fakeResponse);
});
it('rethrows the base fetch error unchanged (pre-response failure)', async () => {
const err = Object.assign(new TypeError('fetch failed'), {
cause: { code: 'ECONNRESET' },
});
const baseFetch = jest.fn().mockRejectedValue(err);
const instrumented = createInstrumentedFetch('test', baseFetch as never);
await expect(instrumented('https://example.com/')).rejects.toBe(err);
});
it('defaults to the global fetch when no baseFetch is given', () => {
// Constructing without a baseFetch must not throw — it simply wraps global
// fetch (the non-chat default).
expect(() => createInstrumentedFetch('test')).not.toThrow();
});
});

View File

@@ -1,16 +1,22 @@
import { Logger } from '@nestjs/common'; import { Logger } from '@nestjs/common';
/** /**
* DIAGNOSTIC (provider ECONNRESET investigation) temporary. * The provider HTTP fetch used by the chat path: a thin, behavior-neutral
* instrumentation wrapper around a supplied `fetch`.
* *
* A PASSIVE, behavior-neutral wrapper around the global `fetch`, injected into * It defaults to the global `fetch`, but the chat provider passes the streaming
* the OpenAI-compatible provider client (`createOpenAI({ fetch })`, the z.ai * fetch (which RAISES undici's 300s stream timeouts to a generous-but-finite
* path). Per provider HTTP call it logs: time-to-response-headers + status + * silence timeout so a long agent turn is not severed mid-stream #175). So this
* request-body size on success; and on a pre-response rejection the failure * wrapper observes the EXACT transport a turn uses. It NEVER retries, times out,
* latency + error code/cause + request-body size + the idle gap since the * swaps the dispatcher, or reads/clones the response body the Response is
* previous provider call. It NEVER retries, times out, swaps the dispatcher, or * returned untouched (streaming unaffected) and any error is rethrown unchanged.
* reads/clones the response body the Response is returned untouched (streaming *
* unaffected) and any error is rethrown unchanged. * Per provider HTTP call it logs: time-to-response-headers + status + request
* body size on success; and on a pre-response rejection the failure latency +
* error code/cause + request body size + the idle gap since the previous call.
* This telemetry is intentional and kept (it diagnoses provider connection
* resets / mid-stream cuts), and it is load-bearing: the streaming fetch reaches
* the chat provider THROUGH this wrapper, so the two are one construct.
* *
* How to read the result (a long agentic turn makes one provider call per step): * How to read the result (a long agentic turn makes one provider call per step):
* - a failed turn whose last provider line is "PRE-RESPONSE FAILED ... ECONNRESET" * - a failed turn whose last provider line is "PRE-RESPONSE FAILED ... ECONNRESET"
@@ -23,13 +29,13 @@ import { Logger } from '@nestjs/common';
* different failure mode. * different failure mode.
* *
* The seq/last-call timestamps are module-level, so under concurrent turns the * The seq/last-call timestamps are module-level, so under concurrent turns the
* idle-gap figure is approximate (fine for single-user reproduction). * idle-gap figure is approximate (fine for single-user diagnosis).
*/ */
export function createDiagnosticFetch( export function createInstrumentedFetch(
context: string, context: string,
// The underlying fetch to instrument. Defaults to the global fetch; the chat // The underlying fetch to instrument. Defaults to the global fetch; the chat
// provider passes a streaming fetch (disabled undici stream timeouts, #175) so // provider passes the streaming fetch (raised, finite undici stream timeouts,
// the telemetry observes the SAME transport the long agent turn actually uses. // #175) so the telemetry observes the SAME transport the long agent turn uses.
baseFetch: typeof fetch = fetch, baseFetch: typeof fetch = fetch,
): typeof fetch { ): typeof fetch {
const logger = new Logger(context); const logger = new Logger(context);
@@ -56,7 +62,7 @@ export function createDiagnosticFetch(
// clone the body) so the streamed SSE response is unaffected. // clone the body) so the streamed SSE response is unaffected.
const res = await baseFetch(input, init); const res = await baseFetch(input, init);
logger.log( logger.log(
`provider HTTP DIAGNOSTIC: call#${callId} OK ` + `provider HTTP: call#${callId} OK ` +
`headersAfter=${Date.now() - startedAt}ms status=${res.status} ` + `headersAfter=${Date.now() - startedAt}ms status=${res.status} ` +
`reqBytes=${bodyBytes ?? 'n/a'} idleSincePrevCall=${idleSincePrev ?? 'n/a'}ms`, `reqBytes=${bodyBytes ?? 'n/a'} idleSincePrevCall=${idleSincePrev ?? 'n/a'}ms`,
); );
@@ -70,7 +76,7 @@ export function createDiagnosticFetch(
cause?: { code?: string; message?: string }; cause?: { code?: string; message?: string };
}; };
logger.warn( logger.warn(
`provider HTTP DIAGNOSTIC: call#${callId} PRE-RESPONSE FAILED ` + `provider HTTP: call#${callId} PRE-RESPONSE FAILED ` +
`after=${Date.now() - startedAt}ms code=${e?.cause?.code ?? 'none'} ` + `after=${Date.now() - startedAt}ms code=${e?.cause?.code ?? 'none'} ` +
`name=${e?.name ?? 'Error'} cause=${e?.cause?.message ?? e?.message ?? 'unknown'} ` + `name=${e?.name ?? 'Error'} cause=${e?.cause?.message ?? e?.message ?? 'unknown'} ` +
`reqBytes=${bodyBytes ?? 'n/a'} idleSincePrevCall=${idleSincePrev ?? 'n/a'}ms`, `reqBytes=${bodyBytes ?? 'n/a'} idleSincePrevCall=${idleSincePrev ?? 'n/a'}ms`,

View File

@@ -48,10 +48,13 @@ describe('streamTimeoutMs', () => {
}); });
describe('createStreamingFetch — against a delayed server', () => { describe('createStreamingFetch — against a delayed server', () => {
const ORIG = process.env.AI_STREAM_TIMEOUT_MS;
let server: http.Server; let server: http.Server;
let url: string; let url: string;
// The server waits before sending ANY byte (a long time-to-first-token). // The server waits before sending ANY byte (a long time-to-first-token). It is
const DELAY = 400; // > undici's ~1s timeout-timer granularity so a sub-second configured timeout
// fires deterministically in the load-bearing test below.
const DELAY = 1500;
beforeAll(async () => { beforeAll(async () => {
server = http.createServer((_req, res) => { server = http.createServer((_req, res) => {
@@ -69,10 +72,41 @@ describe('createStreamingFetch — against a delayed server', () => {
await new Promise<void>((resolve) => server.close(() => resolve())); await new Promise<void>((resolve) => server.close(() => resolve()));
}); });
it('streams the delayed response instead of timing out', async () => { afterEach(() => {
if (ORIG === undefined) delete process.env.AI_STREAM_TIMEOUT_MS;
else process.env.AI_STREAM_TIMEOUT_MS = ORIG;
});
it('streams the delayed response at the default (generous) timeout', async () => {
delete process.env.AI_STREAM_TIMEOUT_MS; // default 15 min >> DELAY
const streamingFetch = createStreamingFetch(); const streamingFetch = createStreamingFetch();
const res = await streamingFetch(url); const res = await streamingFetch(url);
expect(res.status).toBe(200); expect(res.status).toBe(200);
expect(await res.text()).toBe('ok'); expect(await res.text()).toBe('ok');
}); });
it('LOAD-BEARING: a sub-DELAY AI_STREAM_TIMEOUT_MS actually severs the response', async () => {
// Proves the configured dispatcher is wired into the fetch: with the timeout
// set below DELAY the call must reject with undici's headers-timeout. If the
// dispatcher were lost (fallback to global fetch's 300s default), the 1.5s
// response would slip through and this would NOT throw.
process.env.AI_STREAM_TIMEOUT_MS = '500';
const streamingFetch = createStreamingFetch();
let caught: unknown;
const startedAt = Date.now();
try {
await streamingFetch(url).then((r) => r.text());
} catch (e) {
caught = e;
}
// It rejected (a lost dispatcher -> global 300s default would NOT reject on a
// 1.5s response) and it did so BEFORE the response would have arrived (DELAY).
// Use `.name` (realm-safe) — undici's TypeError fails cross-realm instanceof.
expect(caught).toBeDefined();
expect((caught as Error)?.name).toBe('TypeError');
expect(Date.now() - startedAt).toBeLessThan(DELAY);
// When present, the undici cause is the headers timeout.
const code = (caught as { cause?: { code?: string } })?.cause?.code;
if (code) expect(code).toBe('UND_ERR_HEADERS_TIMEOUT');
});
}); });

View File

@@ -14,8 +14,7 @@ import { AiNotConfiguredException } from './ai-not-configured.exception';
import { AiEmbeddingNotConfiguredException } from './ai-embedding-not-configured.exception'; import { AiEmbeddingNotConfiguredException } from './ai-embedding-not-configured.exception';
import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception'; import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception';
import { describeProviderError } from './ai-error.util'; import { describeProviderError } from './ai-error.util';
// DIAGNOSTIC (provider ECONNRESET investigation) — temporary. import { createInstrumentedFetch } from './ai-provider-http';
import { createDiagnosticFetch } from './ai-http-diagnostics';
import { createStreamingFetch } from './ai-streaming-fetch'; import { createStreamingFetch } from './ai-streaming-fetch';
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo'; import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
import { SecretBoxService } from '../crypto/secret-box'; import { SecretBoxService } from '../crypto/secret-box';
@@ -46,12 +45,12 @@ export interface ChatModelOverride {
export class AiService { export class AiService {
private readonly logger = new Logger(AiService.name); private readonly logger = new Logger(AiService.name);
// Provider HTTP fetch for the chat path: a streaming fetch that DISABLES // Provider HTTP fetch for the chat path: the streaming fetch — which RAISES
// undici's 300s headers/body timeouts (#175 — long agent turns were severed // undici's 300s headers/body timeouts to a generous-but-finite silence timeout
// mid-stream), wrapped with passive ECONNRESET-investigation telemetry so the // so a long agent turn is not severed mid-stream (#175) — wrapped with the
// logs observe the exact transport the turn uses. Held for the service // provider-HTTP instrumentation so the logs observe that exact transport. Held
// lifetime to reuse the streaming dispatcher's connection pool. // for the service lifetime to reuse the streaming dispatcher's connection pool.
private readonly aiDiagnosticFetch = createDiagnosticFetch( private readonly aiProviderFetch = createInstrumentedFetch(
'AiService:provider-http', 'AiService:provider-http',
createStreamingFetch(), createStreamingFetch(),
); );
@@ -152,13 +151,12 @@ export class AiService {
// endpoint. The default callable createOpenAI(...)(model) targets the // endpoint. The default callable createOpenAI(...)(model) targets the
// Responses API (/responses), which OpenAI-compatible gateways // Responses API (/responses), which OpenAI-compatible gateways
// (OpenRouter, etc.) reject on multi-turn requests (history with // (OpenRouter, etc.) reject on multi-turn requests (history with
// assistant messages) → 400. // assistant messages) → 400. The provider fetch is the instrumented
// DIAGNOSTIC (provider ECONNRESET investigation) — temporary: pass the // streaming fetch (finite-but-generous stream timeouts, #175).
// passive instrumented fetch (logging only; no behavior change).
return createOpenAI({ return createOpenAI({
apiKey, apiKey,
baseURL: baseUrl, baseURL: baseUrl,
fetch: this.aiDiagnosticFetch, fetch: this.aiProviderFetch,
}).chat(chatModel); }).chat(chatModel);
case 'gemini': case 'gemini':
return createGoogleGenerativeAI({ apiKey })(chatModel); return createGoogleGenerativeAI({ apiKey })(chatModel);