Compare commits
1 Commits
a5aa1185b1
...
a14560c7c9
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a14560c7c9 |
@@ -6,6 +6,7 @@ import { createMCPClient } from '@ai-sdk/mcp';
|
|||||||
import { Agent, type Dispatcher } from 'undici';
|
import { Agent, type Dispatcher } from 'undici';
|
||||||
import { AiMcpServerRepo } from '@docmost/db/repos/ai-chat/ai-mcp-server.repo';
|
import { AiMcpServerRepo } from '@docmost/db/repos/ai-chat/ai-mcp-server.repo';
|
||||||
import { AiMcpServer } from '@docmost/db/types/entity.types';
|
import { AiMcpServer } from '@docmost/db/types/entity.types';
|
||||||
|
import { streamingDispatcherOptions } from '../../../integrations/ai/ai-streaming-fetch';
|
||||||
import { SecretBoxService } from '../../../integrations/crypto/secret-box';
|
import { SecretBoxService } from '../../../integrations/crypto/secret-box';
|
||||||
import { isUrlAllowed, isIpAllowed } from './ssrf-guard';
|
import { isUrlAllowed, isIpAllowed } from './ssrf-guard';
|
||||||
|
|
||||||
@@ -400,17 +401,16 @@ export function validateResolvedAddresses(
|
|||||||
*/
|
*/
|
||||||
function buildPinnedDispatcher(): Agent {
|
function buildPinnedDispatcher(): Agent {
|
||||||
return new Agent({
|
return new Agent({
|
||||||
// Disable undici's default 300s headers/body timeouts on external MCP
|
// Raise undici's default 300s headers/body timeouts on external MCP traffic
|
||||||
// traffic. A long agent turn keeps an SSE transport (e.g. crawl4ai's
|
// to the same generous-but-finite silence timeout the chat fetch uses (#175).
|
||||||
// /mcp/sse) open across the whole turn; that connection can idle BETWEEN
|
// A long agent turn keeps an SSE transport (e.g. crawl4ai's /mcp/sse) open
|
||||||
// tool calls longer than 5 min, and undici's bodyTimeout would otherwise
|
// across the whole turn; that connection can idle BETWEEN tool calls longer
|
||||||
// sever it mid-task — a tool-call failure that aborts the streamed turn and
|
// than 5 min, and undici's bodyTimeout would otherwise sever it mid-task — a
|
||||||
// shows the user "Lost connection to the AI provider" (#175). A slow single
|
// tool-call failure that aborts the streamed turn and shows the user "Lost
|
||||||
// tool call (a crawl) can likewise exceed headersTimeout. Connection
|
// connection to the AI provider". A slow single tool call (a crawl) can
|
||||||
// lifetime is bounded by the turn (clients are closed in onFinish/onError/
|
// likewise exceed headersTimeout. The timeout stays FINITE so a genuinely
|
||||||
// onAbort), so disabling the wall-clock cap is safe.
|
// hung server is still broken eventually.
|
||||||
headersTimeout: 0,
|
...streamingDispatcherOptions(),
|
||||||
bodyTimeout: 0,
|
|
||||||
connect: {
|
connect: {
|
||||||
lookup: (hostname, _options, callback) => {
|
lookup: (hostname, _options, callback) => {
|
||||||
// Always resolve ALL addresses ourselves; do not trust the caller's
|
// Always resolve ALL addresses ourselves; do not trust the caller's
|
||||||
|
|||||||
@@ -1,50 +1,78 @@
|
|||||||
import * as http from 'node:http';
|
import * as http from 'node:http';
|
||||||
import {
|
import {
|
||||||
createStreamingFetch,
|
createStreamingFetch,
|
||||||
STREAMING_DISPATCHER_OPTIONS,
|
streamTimeoutMs,
|
||||||
|
streamingDispatcherOptions,
|
||||||
} from './ai-streaming-fetch';
|
} from './ai-streaming-fetch';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* #175: Node's global fetch (undici) defaults headers/body timeouts to 300s and
|
* #175: undici's default 300s headers/body timeouts severed long agent turns.
|
||||||
* severs a long agent turn mid-stream. createStreamingFetch must DISABLE those
|
* The streaming fetch raises them to a generous-but-FINITE silence timeout (not
|
||||||
* timeouts (0). We pin the option contract deterministically AND confirm the
|
* 0 — a true hang must still break). We pin: the configured value + env override,
|
||||||
* built fetch actually streams a deliberately-delayed response.
|
* that both dispatcher timeouts use it, and that a delayed response streams.
|
||||||
*/
|
*/
|
||||||
describe('createStreamingFetch', () => {
|
describe('streamTimeoutMs', () => {
|
||||||
it('disables BOTH undici stream timeouts (the #175 contract)', () => {
|
const ORIG = process.env.AI_STREAM_TIMEOUT_MS;
|
||||||
expect(STREAMING_DISPATCHER_OPTIONS.headersTimeout).toBe(0);
|
afterEach(() => {
|
||||||
expect(STREAMING_DISPATCHER_OPTIONS.bodyTimeout).toBe(0);
|
if (ORIG === undefined) delete process.env.AI_STREAM_TIMEOUT_MS;
|
||||||
|
else process.env.AI_STREAM_TIMEOUT_MS = ORIG;
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('against a delayed server', () => {
|
it('defaults to a generous-but-finite 15 minutes', () => {
|
||||||
let server: http.Server;
|
delete process.env.AI_STREAM_TIMEOUT_MS;
|
||||||
let url: string;
|
expect(streamTimeoutMs()).toBe(900_000);
|
||||||
// The server waits before sending ANY byte (a long time-to-first-token).
|
// Finite — NOT disabled (0 would let a hung provider leak forever).
|
||||||
const DELAY = 400;
|
expect(streamTimeoutMs()).toBeGreaterThan(0);
|
||||||
|
expect(Number.isFinite(streamTimeoutMs())).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
beforeAll(async () => {
|
it('honours a positive AI_STREAM_TIMEOUT_MS override', () => {
|
||||||
server = http.createServer((_req, res) => {
|
process.env.AI_STREAM_TIMEOUT_MS = '120000';
|
||||||
setTimeout(() => {
|
expect(streamTimeoutMs()).toBe(120000);
|
||||||
res.writeHead(200, { 'Content-Type': 'text/plain' });
|
});
|
||||||
res.end('ok');
|
|
||||||
}, DELAY);
|
|
||||||
});
|
|
||||||
await new Promise<void>((resolve) =>
|
|
||||||
server.listen(0, '127.0.0.1', resolve),
|
|
||||||
);
|
|
||||||
const addr = server.address() as import('node:net').AddressInfo;
|
|
||||||
url = `http://127.0.0.1:${addr.port}/`;
|
|
||||||
});
|
|
||||||
|
|
||||||
afterAll(async () => {
|
it('ignores an invalid / non-positive override (falls back to default)', () => {
|
||||||
await new Promise<void>((resolve) => server.close(() => resolve()));
|
for (const bad of ['0', '-5', 'abc', '']) {
|
||||||
});
|
process.env.AI_STREAM_TIMEOUT_MS = bad;
|
||||||
|
expect(streamTimeoutMs()).toBe(900_000);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
it('streams the delayed response instead of timing out', async () => {
|
it('applies the timeout to BOTH undici stream timeouts', () => {
|
||||||
const streamingFetch = createStreamingFetch();
|
delete process.env.AI_STREAM_TIMEOUT_MS;
|
||||||
const res = await streamingFetch(url);
|
expect(streamingDispatcherOptions()).toEqual({
|
||||||
expect(res.status).toBe(200);
|
headersTimeout: 900_000,
|
||||||
expect(await res.text()).toBe('ok');
|
bodyTimeout: 900_000,
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('createStreamingFetch — against a delayed server', () => {
|
||||||
|
let server: http.Server;
|
||||||
|
let url: string;
|
||||||
|
// The server waits before sending ANY byte (a long time-to-first-token).
|
||||||
|
const DELAY = 400;
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
server = http.createServer((_req, res) => {
|
||||||
|
setTimeout(() => {
|
||||||
|
res.writeHead(200, { 'Content-Type': 'text/plain' });
|
||||||
|
res.end('ok');
|
||||||
|
}, DELAY);
|
||||||
|
});
|
||||||
|
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
|
||||||
|
const addr = server.address() as import('node:net').AddressInfo;
|
||||||
|
url = `http://127.0.0.1:${addr.port}/`;
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
await new Promise<void>((resolve) => server.close(() => resolve()));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('streams the delayed response instead of timing out', async () => {
|
||||||
|
const streamingFetch = createStreamingFetch();
|
||||||
|
const res = await streamingFetch(url);
|
||||||
|
expect(res.status).toBe(200);
|
||||||
|
expect(await res.text()).toBe('ok');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@@ -1,40 +1,53 @@
|
|||||||
import { Agent } from 'undici';
|
import { Agent } from 'undici';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build a `fetch` for LONG-LIVED streaming AI calls (the agent chat turn).
|
* Default SILENCE timeout for streaming AI calls (15 min). Generous, but FINITE.
|
||||||
*
|
*
|
||||||
* Node's global fetch (undici) defaults BOTH `headersTimeout` and `bodyTimeout`
|
* Node's global fetch (undici) defaults headersTimeout and bodyTimeout to
|
||||||
* to 300_000ms. A legitimate long agent turn trips that and is severed
|
* 300_000ms, which severed legitimate long agent turns mid-stream — surfacing as
|
||||||
* mid-stream — surfacing to the user as "Lost connection to the AI provider"
|
* "Lost connection to the AI provider" (#175): a late step with a huge context
|
||||||
* (issue #175):
|
* pushes the model's time-to-first-token past 5 min, or a reasoning model pauses
|
||||||
* - headersTimeout (time to the first response byte) is exceeded when a late
|
* >5 min between chunks. We do NOT disable the timeout (0) — that would let a
|
||||||
* step sends a huge accumulated context and the model's time-to-first-token
|
* genuinely hung provider, with the client still connected, hang forever
|
||||||
* grows past 5 min;
|
* (abortSignal only fires on client disconnect). Instead we raise it well above
|
||||||
* - bodyTimeout (max gap BETWEEN body chunks) is exceeded when a reasoning
|
* any realistic gap while keeping it finite so a true hang is eventually broken.
|
||||||
* model pauses to "think" for more than 5 min between emitted chunks.
|
|
||||||
*
|
*
|
||||||
* This dispatcher disables both timeouts (0). Cancellation is NOT lost: the
|
* This bounds SILENCE (time-to-first-byte and the gap BETWEEN chunks), NOT total
|
||||||
* agent turn is bound to the request's abortSignal, which fires when the client
|
* turn duration — so an arbitrarily long turn that keeps streaming bytes is never
|
||||||
* disconnects (see ai-chat.controller), and the agent loop is bounded by
|
* cut; only a stream that goes quiet for longer than this is treated as a hang.
|
||||||
* MAX_AGENT_STEPS — so a turn still terminates; it just no longer dies at an
|
|
||||||
* arbitrary 5-minute wall-clock. keepAlive (undici default) is kept so the
|
|
||||||
* sequential per-step calls of one turn reuse the connection.
|
|
||||||
*
|
|
||||||
* A single shared dispatcher is returned (callers hold it for the service
|
|
||||||
* lifetime) so its connection pool is reused across requests.
|
|
||||||
*/
|
*/
|
||||||
/**
|
const DEFAULT_STREAM_TIMEOUT_MS = 900_000;
|
||||||
* undici Agent options for streaming AI calls: both stream timeouts DISABLED (0)
|
|
||||||
* so a long turn is never severed at undici's 300s default (#175). Exported so a
|
|
||||||
* test can pin the contract without a timing-dependent assertion.
|
|
||||||
*/
|
|
||||||
export const STREAMING_DISPATCHER_OPTIONS = {
|
|
||||||
headersTimeout: 0,
|
|
||||||
bodyTimeout: 0,
|
|
||||||
} as const;
|
|
||||||
|
|
||||||
|
/**
 * Resolve the silence timeout (in ms) used for streaming AI traffic.
 *
 * Reads `AI_STREAM_TIMEOUT_MS` on every call, so a changed environment value is
 * picked up without a restart. A fractional override is truncated to whole
 * milliseconds. A missing, non-numeric, non-finite or non-positive value —
 * including one that truncates to 0 — falls back to
 * {@link DEFAULT_STREAM_TIMEOUT_MS}, so the result is never 0 and the timeout
 * can never be disabled by accident.
 *
 * @returns a positive whole number of milliseconds.
 */
export function streamTimeoutMs(): number {
  // Math.floor leaves NaN / Infinity untouched, so the safe-integer check below
  // still rejects them. Truncating matters because the value is handed to
  // undici as headersTimeout/bodyTimeout, which are expected to be integers
  // (NOTE(review): confirm against the pinned undici version's validation).
  const wholeMs = Math.floor(Number(process.env.AI_STREAM_TIMEOUT_MS));
  return Number.isSafeInteger(wholeMs) && wholeMs > 0
    ? wholeMs
    : DEFAULT_STREAM_TIMEOUT_MS;
}
|
||||||
|
|
||||||
|
/**
 * Timeout options to spread into an undici `Agent` that carries streaming AI
 * traffic. Both `headersTimeout` and `bodyTimeout` are set to the value that
 * `streamTimeoutMs()` returns at call time, so every dispatcher built from
 * these options uses the same silence timeout (#175).
 *
 * @returns a fresh options object; the two fields always hold the same number.
 */
export function streamingDispatcherOptions(): {
  headersTimeout: number;
  bodyTimeout: number;
} {
  // Resolve once so both fields are guaranteed to carry the same value.
  const silenceMs = streamTimeoutMs();
  return {
    headersTimeout: silenceMs,
    bodyTimeout: silenceMs,
  };
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build a `fetch` for long-lived streaming AI calls (the agent chat turn) backed
|
||||||
|
* by a dedicated undici dispatcher whose stream timeouts are the generous-but-
|
||||||
|
* finite silence timeout above (#175). A single shared dispatcher is returned
|
||||||
|
* (callers hold it for the service lifetime) so its connection pool is reused.
|
||||||
|
*/
|
||||||
export function createStreamingFetch(): typeof fetch {
|
export function createStreamingFetch(): typeof fetch {
|
||||||
const dispatcher = new Agent({ ...STREAMING_DISPATCHER_OPTIONS });
|
const dispatcher = new Agent(streamingDispatcherOptions());
|
||||||
return ((input: Parameters<typeof fetch>[0], init?: RequestInit) =>
|
return ((input: Parameters<typeof fetch>[0], init?: RequestInit) =>
|
||||||
fetch(input, {
|
fetch(input, {
|
||||||
...(init ?? {}),
|
...(init ?? {}),
|
||||||
|
|||||||
Reference in New Issue
Block a user