diff --git a/.env.example b/.env.example index 97e8dba8..834ba7d7 100644 --- a/.env.example +++ b/.env.example @@ -149,6 +149,19 @@ MCP_DOCMOST_PASSWORD= # your egress drops idle connections faster than ~10s. Default 10000 (10 s). # AI_STREAM_KEEPALIVE_MS=10000 +# Silence timeout (ms) for EXTERNAL-MCP transport ONLY (not the chat provider). +# Tighter than AI_STREAM_TIMEOUT_MS so a byte-silent/hung MCP server is broken in +# ~5 min instead of 15. Note it also cuts a legitimately long but byte-silent +# single tool call (a slow crawl that emits nothing until done) and an SSE +# transport idling >5 min BETWEEN tool calls. Default 300000 (5 min). +# AI_MCP_STREAM_TIMEOUT_MS=300000 + +# Total wall-clock cap (ms) for ONE external MCP tool call (app-level, not +# transport). Aborts a tool that keeps the socket warm (SSE heartbeats / trickle) +# but never returns a result — which the silence timeout above never breaks. +# Default 900000 (15 min). +# AI_MCP_CALL_TIMEOUT_MS=900000 + # --- Anonymous public-share AI assistant --- # Opt-in per workspace (AI settings -> "public share assistant"; off by default). # When enabled, anonymous visitors of a published share can ask an AI about that diff --git a/apps/server/src/core/ai-chat/external-mcp/mcp-call-timeout.spec.ts b/apps/server/src/core/ai-chat/external-mcp/mcp-call-timeout.spec.ts new file mode 100644 index 00000000..d5880ae0 --- /dev/null +++ b/apps/server/src/core/ai-chat/external-mcp/mcp-call-timeout.spec.ts @@ -0,0 +1,205 @@ +import { type Tool, type ToolCallOptions } from 'ai'; +import { + wrapToolWithCallTimeout, + wrapToolsWithCallTimeout, +} from './mcp-clients.service'; +import { + mcpStreamTimeoutMs, + mcpCallTimeoutMs, +} from '../../../integrations/ai/ai-streaming-fetch'; + +/** + * Per-call total-timeout guard for external MCP tools (mcp-clients.service). + * + * `@ai-sdk/mcp`'s tool execute has NO built-in per-call timeout — a tool that + * keeps the connection warm but never returns is otherwise unbounded. The + * wrapper attaches a fresh AbortController + timer per CALL and composes it with + * the turn's abortSignal via AbortSignal.any, so EITHER the per-call timeout OR a + * client disconnect aborts the in-flight call. + * + * Fake timers prove the timeout fires WITHOUT real waiting; no leaked timer keeps + * the process alive after a fast resolve. + */ +const CALL_TIMEOUT_MS = 900_000; + +/** Build a Tool around an `execute` impl, mirroring the SDK's minimal shape. */ +function toolWith( + execute: (args: unknown, options: ToolCallOptions) => unknown, +): Tool { + return { description: 'x', inputSchema: undefined, execute } as unknown as Tool; +} + +/** Invoke a (possibly wrapped) tool's execute with an optional turn signal. */ +function callExecute( + tool: Tool, + args: unknown, + abortSignal?: AbortSignal, +): unknown { + const execute = tool.execute as ( + args: unknown, + options: ToolCallOptions, + ) => unknown; + return execute(args, { abortSignal } as ToolCallOptions); +} + +describe('wrapToolWithCallTimeout', () => { + beforeEach(() => jest.useFakeTimers()); + afterEach(() => { + jest.clearAllTimers(); + jest.useRealTimers(); + }); + + it('aborts a tool that only rejects when its abortSignal fires, after ms elapses', async () => { + // The tool resolves NEVER on its own — it only settles when the abortSignal + // it is handed aborts. So a resolution proves the per-call timer fired and + // aborted the call (not the tool finishing by itself). + let received: AbortSignal | undefined; + const tool = toolWith((_args, options) => { + received = options.abortSignal; + return new Promise((_resolve, reject) => { + options.abortSignal?.addEventListener('abort', () => { + reject(options.abortSignal?.reason ?? new Error('aborted')); + }); + }); + }); + + const wrapped = wrapToolWithCallTimeout(tool, CALL_TIMEOUT_MS); + const promise = callExecute(wrapped, { q: 'x' }) as Promise; + // Attach the rejection handler synchronously so advancing timers cannot mark + // it an unhandled rejection. + const settled = promise.then( + () => ({ ok: true as const }), + (err: unknown) => ({ ok: false as const, err }), + ); + + // Nothing fired yet. + jest.advanceTimersByTime(CALL_TIMEOUT_MS - 1); + // Past the cap -> the per-call timer aborts the composed signal. + jest.advanceTimersByTime(2); + + const result = await settled; + expect(result.ok).toBe(false); + expect(received).toBeInstanceOf(AbortSignal); + // The abort reason / rejection mentions the timeout. + const message = + (result as { err: unknown }).err instanceof Error + ? ((result as { err: Error }).err.message) + : String((result as { err: unknown }).err); + expect(message).toMatch(/timed out after 900000ms/); + }); + + it('aborts a REAL-client-style tool that never settles and ignores abort (race fix)', async () => { + // Models the ACTUAL @ai-sdk/mcp semantics: its in-flight promise does NOT + // reject on abort (it only checks the signal when a response arrives), so a + // warm-but-stuck call NEVER settles on its own and does NOT listen to the + // abort signal. The wrapper must still reject after `ms` via the race — an + // implementation that merely `await original(...)` would hang here forever. + // This test FAILS against the old await-only code and PASSES with the race. + const tool = toolWith(() => new Promise(() => {})); // never settles, no abort + const wrapped = wrapToolWithCallTimeout(tool, CALL_TIMEOUT_MS); + const promise = callExecute(wrapped, { q: 'x' }) as Promise; + // Assert the rejection without hanging: drive fake time async so the timer's + // abort -> race rejection microtasks flush, then await the rejection. + const expectation = expect(promise).rejects.toThrow(/timed out after 900000ms/); + await jest.advanceTimersByTimeAsync(CALL_TIMEOUT_MS + 1); + await expectation; + }); + + it('passes a fast tool through and leaks no timer (advancing later does not throw)', async () => { + const tool = toolWith(() => Promise.resolve('fast-result')); + const wrapped = wrapToolWithCallTimeout(tool, CALL_TIMEOUT_MS); + + const value = await (callExecute(wrapped, {}) as Promise); + expect(value).toBe('fast-result'); + + // The timer was cleared in the finally — advancing past the cap aborts + // nothing and throws nothing. + expect(() => jest.advanceTimersByTime(CALL_TIMEOUT_MS * 2)).not.toThrow(); + }); + + it('aborts when the caller turn signal aborts before the timeout (disconnect path)', async () => { + // Real-client semantics: the tool never settles and does NOT listen to abort, + // so the wrapper must reject via the race when the caller's turn signal (a + // client disconnect) aborts BEFORE the per-call cap. The race propagates the + // caller's abort reason. + const tool = toolWith(() => new Promise(() => {})); // never settles, no abort + const wrapped = wrapToolWithCallTimeout(tool, CALL_TIMEOUT_MS); + const turn = new AbortController(); + const promise = callExecute(wrapped, {}, turn.signal) as Promise; + const settled = promise.then( + () => ({ ok: true as const }), + (err: unknown) => ({ ok: false as const, err }), + ); + + // Disconnect well before the cap; the per-call timer never fires here. + turn.abort(new Error('client disconnected')); + const result = await settled; + expect(result.ok).toBe(false); + const message = + (result as { err: unknown }).err instanceof Error + ? (result as { err: Error }).err.message + : String((result as { err: unknown }).err); + // The caller's abort reason propagates through the race. + expect(message).toMatch(/client disconnected/); + }); + + it('passes a tool with no execute through unchanged', () => { + const noExecute = { description: 'x', inputSchema: undefined } as unknown as Tool; + const wrapped = wrapToolWithCallTimeout(noExecute, CALL_TIMEOUT_MS); + // Same object back, execute still absent. + expect(wrapped).toBe(noExecute); + expect((wrapped as { execute?: unknown }).execute).toBeUndefined(); + }); +}); + +describe('wrapToolsWithCallTimeout', () => { + beforeEach(() => jest.useFakeTimers()); + afterEach(() => { + jest.clearAllTimers(); + jest.useRealTimers(); + }); + + it('wraps every tool in the map (each call gets its own guard)', async () => { + const tools: Record = { + a: toolWith(() => Promise.resolve('A')), + b: toolWith(() => Promise.resolve('B')), + }; + const out = wrapToolsWithCallTimeout(tools, CALL_TIMEOUT_MS); + expect(Object.keys(out)).toEqual(['a', 'b']); + expect(await (callExecute(out.a, {}) as Promise)).toBe('A'); + expect(await (callExecute(out.b, {}) as Promise)).toBe('B'); + }); +}); + +describe('mcp timeout env helpers', () => { + const ORIG_SILENCE = process.env.AI_MCP_STREAM_TIMEOUT_MS; + const ORIG_CALL = process.env.AI_MCP_CALL_TIMEOUT_MS; + afterEach(() => { + if (ORIG_SILENCE === undefined) delete process.env.AI_MCP_STREAM_TIMEOUT_MS; + else process.env.AI_MCP_STREAM_TIMEOUT_MS = ORIG_SILENCE; + if (ORIG_CALL === undefined) delete process.env.AI_MCP_CALL_TIMEOUT_MS; + else process.env.AI_MCP_CALL_TIMEOUT_MS = ORIG_CALL; + }); + + it('mcpStreamTimeoutMs defaults to 5 min and honors a positive override', () => { + delete process.env.AI_MCP_STREAM_TIMEOUT_MS; + expect(mcpStreamTimeoutMs()).toBe(300_000); + process.env.AI_MCP_STREAM_TIMEOUT_MS = '60000'; + expect(mcpStreamTimeoutMs()).toBe(60_000); + for (const bad of ['0', '-1', 'x', '']) { + process.env.AI_MCP_STREAM_TIMEOUT_MS = bad; + expect(mcpStreamTimeoutMs()).toBe(300_000); + } + }); + + it('mcpCallTimeoutMs defaults to 15 min and honors a positive override', () => { + delete process.env.AI_MCP_CALL_TIMEOUT_MS; + expect(mcpCallTimeoutMs()).toBe(900_000); + process.env.AI_MCP_CALL_TIMEOUT_MS = '120000'; + expect(mcpCallTimeoutMs()).toBe(120_000); + for (const bad of ['0', '-1', 'x', '']) { + process.env.AI_MCP_CALL_TIMEOUT_MS = bad; + expect(mcpCallTimeoutMs()).toBe(900_000); + } + }); +}); diff --git a/apps/server/src/core/ai-chat/external-mcp/mcp-clients.service.ts b/apps/server/src/core/ai-chat/external-mcp/mcp-clients.service.ts index fe83801b..88629ebe 100644 --- a/apps/server/src/core/ai-chat/external-mcp/mcp-clients.service.ts +++ b/apps/server/src/core/ai-chat/external-mcp/mcp-clients.service.ts @@ -1,12 +1,16 @@ import { isIP } from 'node:net'; import { lookup as dnsLookup, type LookupAddress } from 'node:dns'; import { Injectable, Logger } from '@nestjs/common'; -import { type Tool } from 'ai'; +import { type Tool, type ToolCallOptions } from 'ai'; import { createMCPClient } from '@ai-sdk/mcp'; import { Agent, type Dispatcher } from 'undici'; import { AiMcpServerRepo } from '@docmost/db/repos/ai-chat/ai-mcp-server.repo'; import { AiMcpServer } from '@docmost/db/types/entity.types'; -import { streamingDispatcherOptions } from '../../../integrations/ai/ai-streaming-fetch'; +import { + streamingDispatcherOptions, + mcpStreamTimeoutMs, + mcpCallTimeoutMs, +} from '../../../integrations/ai/ai-streaming-fetch'; import { SecretBoxService } from '../../../integrations/crypto/secret-box'; import { isUrlAllowed, isIpAllowed } from './ssrf-guard'; @@ -219,6 +223,8 @@ export class McpClientsService { const tools: Record = {}; const clients: McpClient[] = []; const outcomes: ServerOutcome[] = []; + // Per-call total wall-clock cap, read once for this build (env-overridable). + const callTimeoutMs = mcpCallTimeoutMs(); for (const server of servers) { try { @@ -230,10 +236,13 @@ export class McpClientsService { Array.isArray(allow) && allow.length > 0 ? pick(raw, allow) : raw; + // Bound each tool's execute with a per-call total-timeout guard before + // merging, so a single chatty-but-stuck call is aborted after the cap. + const guarded = wrapToolsWithCallTimeout(picked, callTimeoutMs); // Namespace each tool with the sanitized server name AND disambiguate // against names already merged from earlier servers, so no external // tool is silently overwritten on collision. - this.mergeNamespaced(tools, picked, server.name, server.id); + this.mergeNamespaced(tools, guarded, server.name, server.id); outcomes.push({ name: server.name, ok: true }); } catch (err) { // A failed server is skipped — the turn proceeds with the rest. Log a @@ -400,17 +409,21 @@ export function validateResolvedAddresses( * to an IP literal). */ function buildPinnedDispatcher(): Agent { + // External-MCP traffic uses a DEDICATED, shorter silence timeout + // (`AI_MCP_STREAM_TIMEOUT_MS`, default 5 min) — deliberately tighter than the + // chat provider's 15-min `streamTimeoutMs()` — so a byte-silent/hung MCP + // upstream is broken in ~5 min instead of 15. We keep the keep-alive options + // from `streamingDispatcherOptions()` but OVERRIDE headers/body timeouts. + // Accepted trade-off: a legitimately long but byte-silent single tool call, + // and an SSE transport idling >5 min BETWEEN tool calls, are also cut here; the + // per-call total cap (wrapToolsWithCallTimeout, `AI_MCP_CALL_TIMEOUT_MS`) is the + // complementary guard for chatty-but-stuck calls that keep the socket warm yet + // never return. + const mcpSilenceMs = mcpStreamTimeoutMs(); return new Agent({ - // Raise undici's default 300s headers/body timeouts on external MCP traffic - // to the same generous-but-finite silence timeout the chat fetch uses (#175). - // A long agent turn keeps an SSE transport (e.g. crawl4ai's /mcp/sse) open - // across the whole turn; that connection can idle BETWEEN tool calls longer - // than 5 min, and undici's bodyTimeout would otherwise sever it mid-task — a - // tool-call failure that aborts the streamed turn and shows the user "Lost - // connection to the AI provider". A slow single tool call (a crawl) can - // likewise exceed headersTimeout. The timeout stays FINITE so a genuinely - // hung server is still broken eventually. ...streamingDispatcherOptions(), + headersTimeout: mcpSilenceMs, + bodyTimeout: mcpSilenceMs, connect: { lookup: (hostname, _options, callback) => { // Always resolve ALL addresses ourselves; do not trust the caller's @@ -572,6 +585,78 @@ function disambiguate( return capName(`${name.slice(0, MAX_TOOL_NAME_LENGTH - 14)}_${Date.now()}`); } +/** + * Wrap every tool's execute with a per-call total-timeout guard so a single + * external MCP tool call that keeps the connection warm but never returns is + * aborted after `ms` wall-clock (complements the transport silence timeout). + */ +export function wrapToolsWithCallTimeout( + tools: Record, + ms: number, +): Record { + const out: Record = {}; + for (const [name, t] of Object.entries(tools)) { + out[name] = wrapToolWithCallTimeout(t, ms); + } + return out; +} + +/** + * Per-call total-timeout wrapper for one MCP tool. A fresh AbortController + + * timer bounds the call; it is composed with the turn's abortSignal via + * AbortSignal.any so EITHER the per-call timeout OR a client disconnect aborts + * the call. We RACE the call against the composed abort signal rather than just + * awaiting it, because @ai-sdk/mcp does NOT settle its in-flight promise on abort + * (verified in @ai-sdk/mcp@1.0.52: request() only does throwIfAborted() once + * before send and only re-checks the signal inside the response-message handler, + * which runs ONLY when a response arrives). So for a warm-but-stuck call awaiting + * `original` alone would hang forever even after the timer aborts. + */ +export function wrapToolWithCallTimeout(tool: Tool, ms: number): Tool { + const original = tool.execute; + if (typeof original !== 'function') return tool; + const execute = async (args: unknown, options: ToolCallOptions) => { + const controller = new AbortController(); + const timer = setTimeout(() => { + controller.abort(new Error(`MCP tool call timed out after ${ms}ms`)); + }, ms); + timer.unref?.(); + const abortSignal = options?.abortSignal + ? AbortSignal.any([options.abortSignal, controller.signal]) + : controller.signal; + // Reject as soon as the composed signal fires, independent of whether + // `original` ever settles. The losing `original` promise is left pending; it + // is cleaned up when the client is closed at turn end, and Promise.race + // attaches a rejection handler to BOTH inputs so a late rejection of either + // is never an unhandled rejection (do NOT add an extra .catch — it could + // swallow the real result and would break the race semantics). + const aborted = new Promise((_, reject) => { + const fail = () => reject(abortReason(abortSignal)); + if (abortSignal.aborted) fail(); + else abortSignal.addEventListener('abort', fail, { once: true }); + }); + try { + return await Promise.race([ + original(args, { ...options, abortSignal }), + aborted, + ]); + } finally { + clearTimeout(timer); + } + }; + // `Tool` is a union whose `execute` overloads conflict; cast narrowly so the + // wrapped tool keeps every other field while swapping only `execute`. + return { ...tool, execute } as unknown as Tool; +} + +/** The signal's reason as an Error (informative thrown value on abort/timeout). */ +function abortReason(signal: AbortSignal): Error { + const r = signal.reason; + return r instanceof Error + ? r + : new Error(typeof r === 'string' ? r : 'MCP tool call aborted'); +} + /** Reject a promise after `ms`, so a hung connect/tools() never stalls a turn. */ function withTimeout(promise: Promise, ms: number): Promise { return new Promise((resolve, reject) => { diff --git a/apps/server/src/integrations/ai/ai-streaming-fetch.ts b/apps/server/src/integrations/ai/ai-streaming-fetch.ts index b781df9a..f24abd39 100644 --- a/apps/server/src/integrations/ai/ai-streaming-fetch.ts +++ b/apps/server/src/integrations/ai/ai-streaming-fetch.ts @@ -70,6 +70,47 @@ export function streamKeepAliveMs(): number { return positiveEnv('AI_STREAM_KEEPALIVE_MS', DEFAULT_STREAM_KEEPALIVE_MS); } +/** Default SILENCE timeout for EXTERNAL-MCP transport (5 min). */ +const DEFAULT_MCP_STREAM_TIMEOUT_MS = 300_000; + +/** Default total wall-clock cap for ONE external MCP tool call (15 min). */ +const DEFAULT_MCP_CALL_TIMEOUT_MS = 900_000; + +/** + * SILENCE timeout (ms) for EXTERNAL-MCP transport ONLY. Override with + * `AI_MCP_STREAM_TIMEOUT_MS`; a missing/invalid/non-positive value falls back to + * {@link DEFAULT_MCP_STREAM_TIMEOUT_MS} (5 min). + * + * Deliberately tighter than the chat provider's {@link streamTimeoutMs} (15 min) + * so a byte-silent/hung MCP upstream is broken in ~5 min instead of 15. This is + * the undici `headersTimeout`/`bodyTimeout` for the external-MCP dispatcher only + * — it must NOT change the chat provider, which legitimately needs 15 min between + * reasoning chunks (#175). + * + * Trade-off: a legitimately long but byte-silent single tool call (a slow crawl + * that emits nothing until done) and an SSE transport that idles >5 min BETWEEN + * tool calls are also cut here. The per-call total cap ({@link mcpCallTimeoutMs}, + * applied in mcp-clients.service) is the complementary guard for chatty-but-stuck + * calls that keep the socket warm yet never return. + */ +export function mcpStreamTimeoutMs(): number { + return positiveEnv('AI_MCP_STREAM_TIMEOUT_MS', DEFAULT_MCP_STREAM_TIMEOUT_MS); +} + +/** + * Total wall-clock cap (ms) for ONE external MCP tool call — APP-LEVEL, not + * transport. Override with `AI_MCP_CALL_TIMEOUT_MS`; a missing/invalid/ + * non-positive value falls back to {@link DEFAULT_MCP_CALL_TIMEOUT_MS} (15 min). + * + * Catches a tool that keeps the connection warm (SSE heartbeats / trickle) but + * never returns a result — which the transport silence timeout + * ({@link mcpStreamTimeoutMs}) would never break because the socket never goes + * byte-silent. + */ +export function mcpCallTimeoutMs(): number { + return positiveEnv('AI_MCP_CALL_TIMEOUT_MS', DEFAULT_MCP_CALL_TIMEOUT_MS); +} + /** * undici `Agent` options for streaming AI traffic — the (generous, finite) * silence timeouts plus the keep-alive recycle window. Shared by the chat