Compare commits
11 Commits
dd64c2ea05
...
a670de7638
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a670de7638 | ||
|
|
9703fc2b36 | ||
|
|
86e631034c | ||
|
|
524d69d089 | ||
|
|
6c6253caac | ||
|
|
791bff419a | ||
|
|
4ba8991bca | ||
|
|
c79e7e549d | ||
|
|
443ad3a856 | ||
|
|
66225294b2 | ||
|
|
25121e269e |
13
.env.example
13
.env.example
@@ -149,19 +149,6 @@ MCP_DOCMOST_PASSWORD=
|
||||
# your egress drops idle connections faster than ~10s. Default 10000 (10 s).
|
||||
# AI_STREAM_KEEPALIVE_MS=10000
|
||||
|
||||
# Silence timeout (ms) for EXTERNAL-MCP transport ONLY (not the chat provider).
|
||||
# Tighter than AI_STREAM_TIMEOUT_MS so a byte-silent/hung MCP server is broken in
|
||||
# ~5 min instead of 15. Note it also cuts a legitimately long but byte-silent
|
||||
# single tool call (a slow crawl that emits nothing until done) and an SSE
|
||||
# transport idling >5 min BETWEEN tool calls. Default 300000 (5 min).
|
||||
# AI_MCP_STREAM_TIMEOUT_MS=300000
|
||||
|
||||
# Total wall-clock cap (ms) for ONE external MCP tool call (app-level, not
|
||||
# transport). Aborts a tool that keeps the socket warm (SSE heartbeats / trickle)
|
||||
# but never returns a result — which the silence timeout above never breaks.
|
||||
# Default 900000 (15 min).
|
||||
# AI_MCP_CALL_TIMEOUT_MS=900000
|
||||
|
||||
# --- Anonymous public-share AI assistant ---
|
||||
# Opt-in per workspace (AI settings -> "public share assistant"; off by default).
|
||||
# When enabled, anonymous visitors of a published share can ask an AI about that
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
"start": "cross-env NODE_ENV=development nest start",
|
||||
"start:dev": "cross-env NODE_ENV=development nest start --watch",
|
||||
"start:debug": "cross-env NODE_ENV=development nest start --debug --watch",
|
||||
"start:prod": "cross-env NODE_ENV=production node --heapsnapshot-near-heap-limit=2 dist/main",
|
||||
"start:prod": "cross-env NODE_ENV=production node dist/main",
|
||||
"collab:prod": "cross-env NODE_ENV=production node dist/collaboration/server/collab-main",
|
||||
"collab:dev": "cross-env NODE_ENV=development node dist/collaboration/server/collab-main",
|
||||
"email:dev": "email dev -p 5019 -d ./src/integrations/transactional/emails",
|
||||
|
||||
@@ -575,19 +575,6 @@ export class AiChatService {
|
||||
},
|
||||
});
|
||||
|
||||
// Drain the stream independently of the client socket so the turn always
|
||||
// runs to completion (or to its abort) and the terminal callbacks
|
||||
// (onFinish/onError/onAbort) fire — releasing the per-turn object graph
|
||||
// (history, the per-request toolset closures, captured steps, SDK buffers)
|
||||
// and closing leased MCP clients. WITHOUT this, a client disconnect leaves
|
||||
// the pipe's dead socket as the only reader; backpressure stalls the stream,
|
||||
// the callbacks never run, and every dropped turn stays rooted in memory —
|
||||
// the heap-OOM leak. consumeStream removes that backpressure (AI SDK v6
|
||||
// "Handling client disconnects"). NOT awaited (fire-and-forget); the stream
|
||||
// errors are already logged by the streamText `onError` callback above, so
|
||||
// swallow here to avoid an unhandledRejection.
|
||||
void result.consumeStream({ onError: () => undefined });
|
||||
|
||||
// Stream the UI-message protocol straight to the hijacked Node response.
|
||||
// Without onError the AI SDK masks the cause ('An error occurred.') and the
|
||||
// UI shows a generic failure. Surface the real provider message instead.
|
||||
|
||||
@@ -1,205 +0,0 @@
|
||||
import { type Tool, type ToolCallOptions } from 'ai';
|
||||
import {
|
||||
wrapToolWithCallTimeout,
|
||||
wrapToolsWithCallTimeout,
|
||||
} from './mcp-clients.service';
|
||||
import {
|
||||
mcpStreamTimeoutMs,
|
||||
mcpCallTimeoutMs,
|
||||
} from '../../../integrations/ai/ai-streaming-fetch';
|
||||
|
||||
/**
|
||||
* Per-call total-timeout guard for external MCP tools (mcp-clients.service).
|
||||
*
|
||||
* `@ai-sdk/mcp`'s tool execute has NO built-in per-call timeout — a tool that
|
||||
* keeps the connection warm but never returns is otherwise unbounded. The
|
||||
* wrapper attaches a fresh AbortController + timer per CALL and composes it with
|
||||
* the turn's abortSignal via AbortSignal.any, so EITHER the per-call timeout OR a
|
||||
* client disconnect aborts the in-flight call.
|
||||
*
|
||||
* Fake timers prove the timeout fires WITHOUT real waiting; no leaked timer keeps
|
||||
* the process alive after a fast resolve.
|
||||
*/
|
||||
const CALL_TIMEOUT_MS = 900_000;
|
||||
|
||||
/** Build a Tool around an `execute` impl, mirroring the SDK's minimal shape. */
|
||||
function toolWith(
|
||||
execute: (args: unknown, options: ToolCallOptions) => unknown,
|
||||
): Tool {
|
||||
return { description: 'x', inputSchema: undefined, execute } as unknown as Tool;
|
||||
}
|
||||
|
||||
/** Invoke a (possibly wrapped) tool's execute with an optional turn signal. */
|
||||
function callExecute(
|
||||
tool: Tool,
|
||||
args: unknown,
|
||||
abortSignal?: AbortSignal,
|
||||
): unknown {
|
||||
const execute = tool.execute as (
|
||||
args: unknown,
|
||||
options: ToolCallOptions,
|
||||
) => unknown;
|
||||
return execute(args, { abortSignal } as ToolCallOptions);
|
||||
}
|
||||
|
||||
describe('wrapToolWithCallTimeout', () => {
|
||||
beforeEach(() => jest.useFakeTimers());
|
||||
afterEach(() => {
|
||||
jest.clearAllTimers();
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
it('aborts a tool that only rejects when its abortSignal fires, after ms elapses', async () => {
|
||||
// The tool resolves NEVER on its own — it only settles when the abortSignal
|
||||
// it is handed aborts. So a resolution proves the per-call timer fired and
|
||||
// aborted the call (not the tool finishing by itself).
|
||||
let received: AbortSignal | undefined;
|
||||
const tool = toolWith((_args, options) => {
|
||||
received = options.abortSignal;
|
||||
return new Promise((_resolve, reject) => {
|
||||
options.abortSignal?.addEventListener('abort', () => {
|
||||
reject(options.abortSignal?.reason ?? new Error('aborted'));
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
const wrapped = wrapToolWithCallTimeout(tool, CALL_TIMEOUT_MS);
|
||||
const promise = callExecute(wrapped, { q: 'x' }) as Promise<unknown>;
|
||||
// Attach the rejection handler synchronously so advancing timers cannot mark
|
||||
// it an unhandled rejection.
|
||||
const settled = promise.then(
|
||||
() => ({ ok: true as const }),
|
||||
(err: unknown) => ({ ok: false as const, err }),
|
||||
);
|
||||
|
||||
// Nothing fired yet.
|
||||
jest.advanceTimersByTime(CALL_TIMEOUT_MS - 1);
|
||||
// Past the cap -> the per-call timer aborts the composed signal.
|
||||
jest.advanceTimersByTime(2);
|
||||
|
||||
const result = await settled;
|
||||
expect(result.ok).toBe(false);
|
||||
expect(received).toBeInstanceOf(AbortSignal);
|
||||
// The abort reason / rejection mentions the timeout.
|
||||
const message =
|
||||
(result as { err: unknown }).err instanceof Error
|
||||
? ((result as { err: Error }).err.message)
|
||||
: String((result as { err: unknown }).err);
|
||||
expect(message).toMatch(/timed out after 900000ms/);
|
||||
});
|
||||
|
||||
it('aborts a REAL-client-style tool that never settles and ignores abort (race fix)', async () => {
|
||||
// Models the ACTUAL @ai-sdk/mcp semantics: its in-flight promise does NOT
|
||||
// reject on abort (it only checks the signal when a response arrives), so a
|
||||
// warm-but-stuck call NEVER settles on its own and does NOT listen to the
|
||||
// abort signal. The wrapper must still reject after `ms` via the race — an
|
||||
// implementation that merely `await original(...)` would hang here forever.
|
||||
// This test FAILS against the old await-only code and PASSES with the race.
|
||||
const tool = toolWith(() => new Promise(() => {})); // never settles, no abort
|
||||
const wrapped = wrapToolWithCallTimeout(tool, CALL_TIMEOUT_MS);
|
||||
const promise = callExecute(wrapped, { q: 'x' }) as Promise<unknown>;
|
||||
// Assert the rejection without hanging: drive fake time async so the timer's
|
||||
// abort -> race rejection microtasks flush, then await the rejection.
|
||||
const expectation = expect(promise).rejects.toThrow(/timed out after 900000ms/);
|
||||
await jest.advanceTimersByTimeAsync(CALL_TIMEOUT_MS + 1);
|
||||
await expectation;
|
||||
});
|
||||
|
||||
it('passes a fast tool through and leaks no timer (advancing later does not throw)', async () => {
|
||||
const tool = toolWith(() => Promise.resolve('fast-result'));
|
||||
const wrapped = wrapToolWithCallTimeout(tool, CALL_TIMEOUT_MS);
|
||||
|
||||
const value = await (callExecute(wrapped, {}) as Promise<unknown>);
|
||||
expect(value).toBe('fast-result');
|
||||
|
||||
// The timer was cleared in the finally — advancing past the cap aborts
|
||||
// nothing and throws nothing.
|
||||
expect(() => jest.advanceTimersByTime(CALL_TIMEOUT_MS * 2)).not.toThrow();
|
||||
});
|
||||
|
||||
it('aborts when the caller turn signal aborts before the timeout (disconnect path)', async () => {
|
||||
// Real-client semantics: the tool never settles and does NOT listen to abort,
|
||||
// so the wrapper must reject via the race when the caller's turn signal (a
|
||||
// client disconnect) aborts BEFORE the per-call cap. The race propagates the
|
||||
// caller's abort reason.
|
||||
const tool = toolWith(() => new Promise(() => {})); // never settles, no abort
|
||||
const wrapped = wrapToolWithCallTimeout(tool, CALL_TIMEOUT_MS);
|
||||
const turn = new AbortController();
|
||||
const promise = callExecute(wrapped, {}, turn.signal) as Promise<unknown>;
|
||||
const settled = promise.then(
|
||||
() => ({ ok: true as const }),
|
||||
(err: unknown) => ({ ok: false as const, err }),
|
||||
);
|
||||
|
||||
// Disconnect well before the cap; the per-call timer never fires here.
|
||||
turn.abort(new Error('client disconnected'));
|
||||
const result = await settled;
|
||||
expect(result.ok).toBe(false);
|
||||
const message =
|
||||
(result as { err: unknown }).err instanceof Error
|
||||
? (result as { err: Error }).err.message
|
||||
: String((result as { err: unknown }).err);
|
||||
// The caller's abort reason propagates through the race.
|
||||
expect(message).toMatch(/client disconnected/);
|
||||
});
|
||||
|
||||
it('passes a tool with no execute through unchanged', () => {
|
||||
const noExecute = { description: 'x', inputSchema: undefined } as unknown as Tool;
|
||||
const wrapped = wrapToolWithCallTimeout(noExecute, CALL_TIMEOUT_MS);
|
||||
// Same object back, execute still absent.
|
||||
expect(wrapped).toBe(noExecute);
|
||||
expect((wrapped as { execute?: unknown }).execute).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('wrapToolsWithCallTimeout', () => {
|
||||
beforeEach(() => jest.useFakeTimers());
|
||||
afterEach(() => {
|
||||
jest.clearAllTimers();
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
it('wraps every tool in the map (each call gets its own guard)', async () => {
|
||||
const tools: Record<string, Tool> = {
|
||||
a: toolWith(() => Promise.resolve('A')),
|
||||
b: toolWith(() => Promise.resolve('B')),
|
||||
};
|
||||
const out = wrapToolsWithCallTimeout(tools, CALL_TIMEOUT_MS);
|
||||
expect(Object.keys(out)).toEqual(['a', 'b']);
|
||||
expect(await (callExecute(out.a, {}) as Promise<unknown>)).toBe('A');
|
||||
expect(await (callExecute(out.b, {}) as Promise<unknown>)).toBe('B');
|
||||
});
|
||||
});
|
||||
|
||||
describe('mcp timeout env helpers', () => {
|
||||
const ORIG_SILENCE = process.env.AI_MCP_STREAM_TIMEOUT_MS;
|
||||
const ORIG_CALL = process.env.AI_MCP_CALL_TIMEOUT_MS;
|
||||
afterEach(() => {
|
||||
if (ORIG_SILENCE === undefined) delete process.env.AI_MCP_STREAM_TIMEOUT_MS;
|
||||
else process.env.AI_MCP_STREAM_TIMEOUT_MS = ORIG_SILENCE;
|
||||
if (ORIG_CALL === undefined) delete process.env.AI_MCP_CALL_TIMEOUT_MS;
|
||||
else process.env.AI_MCP_CALL_TIMEOUT_MS = ORIG_CALL;
|
||||
});
|
||||
|
||||
it('mcpStreamTimeoutMs defaults to 5 min and honors a positive override', () => {
|
||||
delete process.env.AI_MCP_STREAM_TIMEOUT_MS;
|
||||
expect(mcpStreamTimeoutMs()).toBe(300_000);
|
||||
process.env.AI_MCP_STREAM_TIMEOUT_MS = '60000';
|
||||
expect(mcpStreamTimeoutMs()).toBe(60_000);
|
||||
for (const bad of ['0', '-1', 'x', '']) {
|
||||
process.env.AI_MCP_STREAM_TIMEOUT_MS = bad;
|
||||
expect(mcpStreamTimeoutMs()).toBe(300_000);
|
||||
}
|
||||
});
|
||||
|
||||
it('mcpCallTimeoutMs defaults to 15 min and honors a positive override', () => {
|
||||
delete process.env.AI_MCP_CALL_TIMEOUT_MS;
|
||||
expect(mcpCallTimeoutMs()).toBe(900_000);
|
||||
process.env.AI_MCP_CALL_TIMEOUT_MS = '120000';
|
||||
expect(mcpCallTimeoutMs()).toBe(120_000);
|
||||
for (const bad of ['0', '-1', 'x', '']) {
|
||||
process.env.AI_MCP_CALL_TIMEOUT_MS = bad;
|
||||
expect(mcpCallTimeoutMs()).toBe(900_000);
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -1,16 +1,12 @@
|
||||
import { isIP } from 'node:net';
|
||||
import { lookup as dnsLookup, type LookupAddress } from 'node:dns';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { type Tool, type ToolCallOptions } from 'ai';
|
||||
import { type Tool } from 'ai';
|
||||
import { createMCPClient } from '@ai-sdk/mcp';
|
||||
import { Agent, type Dispatcher } from 'undici';
|
||||
import { AiMcpServerRepo } from '@docmost/db/repos/ai-chat/ai-mcp-server.repo';
|
||||
import { AiMcpServer } from '@docmost/db/types/entity.types';
|
||||
import {
|
||||
streamingDispatcherOptions,
|
||||
mcpStreamTimeoutMs,
|
||||
mcpCallTimeoutMs,
|
||||
} from '../../../integrations/ai/ai-streaming-fetch';
|
||||
import { streamingDispatcherOptions } from '../../../integrations/ai/ai-streaming-fetch';
|
||||
import { SecretBoxService } from '../../../integrations/crypto/secret-box';
|
||||
import { isUrlAllowed, isIpAllowed } from './ssrf-guard';
|
||||
|
||||
@@ -251,8 +247,6 @@ export class McpClientsService {
|
||||
const tools: Record<string, Tool> = {};
|
||||
const clients: McpClient[] = [];
|
||||
const outcomes: ServerOutcome[] = [];
|
||||
// Per-call total wall-clock cap, read once for this build (env-overridable).
|
||||
const callTimeoutMs = mcpCallTimeoutMs();
|
||||
const instructions: McpServerInstruction[] = [];
|
||||
|
||||
for (const server of servers) {
|
||||
@@ -263,16 +257,12 @@ export class McpClientsService {
|
||||
const allow = server.toolAllowlist;
|
||||
const picked =
|
||||
Array.isArray(allow) && allow.length > 0 ? pick(raw, allow) : raw;
|
||||
// Bound each tool's execute with a per-call total-timeout guard before
|
||||
// merging, so a single chatty-but-stuck call is aborted after the cap.
|
||||
const guarded = wrapToolsWithCallTimeout(picked, callTimeoutMs);
|
||||
// Namespace each tool with the sanitized server name AND disambiguate
|
||||
// against names already merged from earlier servers, so no external
|
||||
// tool is silently overwritten on collision. The returned count drives
|
||||
// whether this server's prompt guidance is included (≥1 tool merged).
|
||||
// tool is silently overwritten on collision.
|
||||
const merged = this.mergeNamespaced(
|
||||
tools,
|
||||
guarded,
|
||||
picked,
|
||||
server.name,
|
||||
server.id,
|
||||
);
|
||||
@@ -459,21 +449,17 @@ export function validateResolvedAddresses(addrs: readonly LookupAddress[]): {
|
||||
* to an IP literal).
|
||||
*/
|
||||
function buildPinnedDispatcher(): Agent {
|
||||
// External-MCP traffic uses a DEDICATED, shorter silence timeout
|
||||
// (`AI_MCP_STREAM_TIMEOUT_MS`, default 5 min) — deliberately tighter than the
|
||||
// chat provider's 15-min `streamTimeoutMs()` — so a byte-silent/hung MCP
|
||||
// upstream is broken in ~5 min instead of 15. We keep the keep-alive options
|
||||
// from `streamingDispatcherOptions()` but OVERRIDE headers/body timeouts.
|
||||
// Accepted trade-off: a legitimately long but byte-silent single tool call,
|
||||
// and an SSE transport idling >5 min BETWEEN tool calls, are also cut here; the
|
||||
// per-call total cap (wrapToolsWithCallTimeout, `AI_MCP_CALL_TIMEOUT_MS`) is the
|
||||
// complementary guard for chatty-but-stuck calls that keep the socket warm yet
|
||||
// never return.
|
||||
const mcpSilenceMs = mcpStreamTimeoutMs();
|
||||
return new Agent({
|
||||
// Raise undici's default 300s headers/body timeouts on external MCP traffic
|
||||
// to the same generous-but-finite silence timeout the chat fetch uses (#175).
|
||||
// A long agent turn keeps an SSE transport (e.g. crawl4ai's /mcp/sse) open
|
||||
// across the whole turn; that connection can idle BETWEEN tool calls longer
|
||||
// than 5 min, and undici's bodyTimeout would otherwise sever it mid-task — a
|
||||
// tool-call failure that aborts the streamed turn and shows the user "Lost
|
||||
// connection to the AI provider". A slow single tool call (a crawl) can
|
||||
// likewise exceed headersTimeout. The timeout stays FINITE so a genuinely
|
||||
// hung server is still broken eventually.
|
||||
...streamingDispatcherOptions(),
|
||||
headersTimeout: mcpSilenceMs,
|
||||
bodyTimeout: mcpSilenceMs,
|
||||
connect: {
|
||||
lookup: (hostname, _options, callback) => {
|
||||
// Always resolve ALL addresses ourselves; do not trust the caller's
|
||||
@@ -644,78 +630,6 @@ function disambiguate(
|
||||
return capName(`${name.slice(0, MAX_TOOL_NAME_LENGTH - 14)}_${Date.now()}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap every tool's execute with a per-call total-timeout guard so a single
|
||||
* external MCP tool call that keeps the connection warm but never returns is
|
||||
* aborted after `ms` wall-clock (complements the transport silence timeout).
|
||||
*/
|
||||
export function wrapToolsWithCallTimeout(
|
||||
tools: Record<string, Tool>,
|
||||
ms: number,
|
||||
): Record<string, Tool> {
|
||||
const out: Record<string, Tool> = {};
|
||||
for (const [name, t] of Object.entries(tools)) {
|
||||
out[name] = wrapToolWithCallTimeout(t, ms);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Per-call total-timeout wrapper for one MCP tool. A fresh AbortController +
|
||||
* timer bounds the call; it is composed with the turn's abortSignal via
|
||||
* AbortSignal.any so EITHER the per-call timeout OR a client disconnect aborts
|
||||
* the call. We RACE the call against the composed abort signal rather than just
|
||||
* awaiting it, because @ai-sdk/mcp does NOT settle its in-flight promise on abort
|
||||
* (verified in @ai-sdk/mcp@1.0.52: request() only does throwIfAborted() once
|
||||
* before send and only re-checks the signal inside the response-message handler,
|
||||
* which runs ONLY when a response arrives). So for a warm-but-stuck call awaiting
|
||||
* `original` alone would hang forever even after the timer aborts.
|
||||
*/
|
||||
export function wrapToolWithCallTimeout(tool: Tool, ms: number): Tool {
|
||||
const original = tool.execute;
|
||||
if (typeof original !== 'function') return tool;
|
||||
const execute = async (args: unknown, options: ToolCallOptions) => {
|
||||
const controller = new AbortController();
|
||||
const timer = setTimeout(() => {
|
||||
controller.abort(new Error(`MCP tool call timed out after ${ms}ms`));
|
||||
}, ms);
|
||||
timer.unref?.();
|
||||
const abortSignal = options?.abortSignal
|
||||
? AbortSignal.any([options.abortSignal, controller.signal])
|
||||
: controller.signal;
|
||||
// Reject as soon as the composed signal fires, independent of whether
|
||||
// `original` ever settles. The losing `original` promise is left pending; it
|
||||
// is cleaned up when the client is closed at turn end, and Promise.race
|
||||
// attaches a rejection handler to BOTH inputs so a late rejection of either
|
||||
// is never an unhandled rejection (do NOT add an extra .catch — it could
|
||||
// swallow the real result and would break the race semantics).
|
||||
const aborted = new Promise<never>((_, reject) => {
|
||||
const fail = () => reject(abortReason(abortSignal));
|
||||
if (abortSignal.aborted) fail();
|
||||
else abortSignal.addEventListener('abort', fail, { once: true });
|
||||
});
|
||||
try {
|
||||
return await Promise.race([
|
||||
original(args, { ...options, abortSignal }),
|
||||
aborted,
|
||||
]);
|
||||
} finally {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
};
|
||||
// `Tool` is a union whose `execute` overloads conflict; cast narrowly so the
|
||||
// wrapped tool keeps every other field while swapping only `execute`.
|
||||
return { ...tool, execute } as unknown as Tool;
|
||||
}
|
||||
|
||||
/** The signal's reason as an Error (informative thrown value on abort/timeout). */
|
||||
function abortReason(signal: AbortSignal): Error {
|
||||
const r = signal.reason;
|
||||
return r instanceof Error
|
||||
? r
|
||||
: new Error(typeof r === 'string' ? r : 'MCP tool call aborted');
|
||||
}
|
||||
|
||||
/** Reject a promise after `ms`, so a hung connect/tools() never stalls a turn. */
|
||||
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
|
||||
@@ -244,15 +244,6 @@ export class PublicShareChatService {
|
||||
},
|
||||
});
|
||||
|
||||
// Drain the stream independently of the client socket so the turn always
|
||||
// runs to completion (or to its abort) even when the anonymous client
|
||||
// disconnects — otherwise the dead socket is the only reader, backpressure
|
||||
// stalls the stream, and the per-turn object graph stays rooted (heap-OOM
|
||||
// leak). consumeStream removes that backpressure (AI SDK v6 "Handling
|
||||
// client disconnects"). Fire-and-forget; stream errors are already logged
|
||||
// by the streamText `onError` callback above.
|
||||
void result.consumeStream({ onError: () => undefined });
|
||||
|
||||
// Stream the UI-message protocol straight to the hijacked Node response.
|
||||
// Surface the real provider message (AI SDK error bodies never carry the
|
||||
// API key, so this is safe; we never dump the resolved config).
|
||||
|
||||
@@ -70,47 +70,6 @@ export function streamKeepAliveMs(): number {
|
||||
return positiveEnv('AI_STREAM_KEEPALIVE_MS', DEFAULT_STREAM_KEEPALIVE_MS);
|
||||
}
|
||||
|
||||
/** Default SILENCE timeout for EXTERNAL-MCP transport (5 min). */
|
||||
const DEFAULT_MCP_STREAM_TIMEOUT_MS = 300_000;
|
||||
|
||||
/** Default total wall-clock cap for ONE external MCP tool call (15 min). */
|
||||
const DEFAULT_MCP_CALL_TIMEOUT_MS = 900_000;
|
||||
|
||||
/**
|
||||
* SILENCE timeout (ms) for EXTERNAL-MCP transport ONLY. Override with
|
||||
* `AI_MCP_STREAM_TIMEOUT_MS`; a missing/invalid/non-positive value falls back to
|
||||
* {@link DEFAULT_MCP_STREAM_TIMEOUT_MS} (5 min).
|
||||
*
|
||||
* Deliberately tighter than the chat provider's {@link streamTimeoutMs} (15 min)
|
||||
* so a byte-silent/hung MCP upstream is broken in ~5 min instead of 15. This is
|
||||
* the undici `headersTimeout`/`bodyTimeout` for the external-MCP dispatcher only
|
||||
* — it must NOT change the chat provider, which legitimately needs 15 min between
|
||||
* reasoning chunks (#175).
|
||||
*
|
||||
* Trade-off: a legitimately long but byte-silent single tool call (a slow crawl
|
||||
* that emits nothing until done) and an SSE transport that idles >5 min BETWEEN
|
||||
* tool calls are also cut here. The per-call total cap ({@link mcpCallTimeoutMs},
|
||||
* applied in mcp-clients.service) is the complementary guard for chatty-but-stuck
|
||||
* calls that keep the socket warm yet never return.
|
||||
*/
|
||||
export function mcpStreamTimeoutMs(): number {
|
||||
return positiveEnv('AI_MCP_STREAM_TIMEOUT_MS', DEFAULT_MCP_STREAM_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Total wall-clock cap (ms) for ONE external MCP tool call — APP-LEVEL, not
|
||||
* transport. Override with `AI_MCP_CALL_TIMEOUT_MS`; a missing/invalid/
|
||||
* non-positive value falls back to {@link DEFAULT_MCP_CALL_TIMEOUT_MS} (15 min).
|
||||
*
|
||||
* Catches a tool that keeps the connection warm (SSE heartbeats / trickle) but
|
||||
* never returns a result — which the transport silence timeout
|
||||
* ({@link mcpStreamTimeoutMs}) would never break because the socket never goes
|
||||
* byte-silent.
|
||||
*/
|
||||
export function mcpCallTimeoutMs(): number {
|
||||
return positiveEnv('AI_MCP_CALL_TIMEOUT_MS', DEFAULT_MCP_CALL_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
/**
|
||||
* undici `Agent` options for streaming AI traffic — the (generous, finite)
|
||||
* silence timeouts plus the keep-alive recycle window. Shared by the chat
|
||||
|
||||
Reference in New Issue
Block a user