Compare commits
4 Commits
fe1bbbe806
...
59190148db
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
59190148db | ||
| 80a4b5a1b0 | |||
|
|
da15b55786 | ||
|
|
a14560c7c9 |
@@ -136,6 +136,12 @@ MCP_DOCMOST_PASSWORD=
|
||||
# A slow/hung embeddings endpoint fails after this and the batch continues.
|
||||
# AI_EMBEDDING_TIMEOUT_MS=120000
|
||||
|
||||
# Silence timeout (ms) for streaming chat/agent AI calls AND external-MCP traffic.
|
||||
# Bounds time-to-first-byte and the gap BETWEEN chunks (NOT the total turn length),
|
||||
# so an arbitrarily long turn that keeps streaming is never cut. Finite so a hung
|
||||
# provider is eventually broken instead of leaking forever. Default 900000 (15 min).
|
||||
# AI_STREAM_TIMEOUT_MS=900000
|
||||
|
||||
# --- Anonymous public-share AI assistant ---
|
||||
# Opt-in per workspace (AI settings -> "public share assistant"; off by default).
|
||||
# When enabled, anonymous visitors of a published share can ask an AI about that
|
||||
|
||||
@@ -1307,5 +1307,9 @@
|
||||
"Page tree (child pages, recursive)": "Page tree (child pages, recursive)",
|
||||
"Render the full nested tree of all descendant pages": "Render the full nested tree of all descendant pages",
|
||||
"Showing {{count}} subpages_one": "Showing {{count}} subpage",
|
||||
"Showing {{count}} subpages_other": "Showing {{count}} subpages"
|
||||
"Showing {{count}} subpages_other": "Showing {{count}} subpages",
|
||||
"Protocol": "Protocol",
|
||||
"How chat requests are sent and how reasoning is surfaced": "How chat requests are sent and how reasoning is surfaced",
|
||||
"OpenAI-compatible (surfaces reasoning)": "OpenAI-compatible (surfaces reasoning)",
|
||||
"OpenAI (official)": "OpenAI (official)"
|
||||
}
|
||||
|
||||
@@ -1160,5 +1160,9 @@
|
||||
"Render the full nested tree of all descendant pages": "Показать полное вложенное дерево всех дочерних страниц",
|
||||
"Showing {{count}} subpages_one": "Показано {{count}} подстраница",
|
||||
"Showing {{count}} subpages_few": "Показано {{count}} подстраницы",
|
||||
"Showing {{count}} subpages_many": "Показано {{count}} подстраниц"
|
||||
"Showing {{count}} subpages_many": "Показано {{count}} подстраниц",
|
||||
"Protocol": "Протокол",
|
||||
"How chat requests are sent and how reasoning is surfaced": "Как отправляются запросы чата и как показывается reasoning",
|
||||
"OpenAI-compatible (surfaces reasoning)": "OpenAI-совместимый (показывает reasoning)",
|
||||
"OpenAI (official)": "OpenAI (официальный)"
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@ import {
|
||||
AiTestCapability,
|
||||
IAiSettingsUpdate,
|
||||
SttApiStyle,
|
||||
ChatApiStyle,
|
||||
} from "@/features/workspace/services/ai-settings-service.ts";
|
||||
import { useAiRolesQuery } from "@/features/ai-chat/queries/ai-chat-query.ts";
|
||||
import { IAiRole } from "@/features/ai-chat/types/ai-chat.types.ts";
|
||||
@@ -82,6 +83,8 @@ const STT_LANGUAGE_OPTIONS: { value: string; label: string }[] = [
|
||||
// (empty means "leave unchanged" unless explicitly cleared).
|
||||
const formSchema = z.object({
|
||||
chatModel: z.string(),
|
||||
// Chat provider implementation (reasoning surfacing). Default openai-compatible.
|
||||
chatApiStyle: z.enum(["openai-compatible", "openai"]),
|
||||
// Cheap model id for the anonymous public-share assistant; empty = use chatModel.
|
||||
publicShareChatModel: z.string(),
|
||||
// Agent-role id whose persona the public-share assistant adopts; empty =
|
||||
@@ -308,6 +311,7 @@ export default function AiProviderSettings() {
|
||||
validate: zod4Resolver(formSchema),
|
||||
initialValues: {
|
||||
chatModel: "",
|
||||
chatApiStyle: "openai-compatible" as ChatApiStyle,
|
||||
publicShareChatModel: "",
|
||||
publicShareAssistantRoleId: "",
|
||||
embeddingModel: "",
|
||||
@@ -330,6 +334,7 @@ export default function AiProviderSettings() {
|
||||
if (!settings) return;
|
||||
form.setValues({
|
||||
chatModel: settings.chatModel ?? "",
|
||||
chatApiStyle: settings.chatApiStyle ?? "openai-compatible",
|
||||
publicShareChatModel: settings.publicShareChatModel ?? "",
|
||||
publicShareAssistantRoleId: settings.publicShareAssistantRoleId ?? "",
|
||||
embeddingModel: settings.embeddingModel ?? "",
|
||||
@@ -359,6 +364,7 @@ export default function AiProviderSettings() {
|
||||
// Everything is OpenAI-compatible.
|
||||
driver: "openai",
|
||||
chatModel: values.chatModel,
|
||||
chatApiStyle: values.chatApiStyle,
|
||||
// Cheap model id for the anonymous public-share assistant; empty falls
|
||||
// back to chatModel server-side.
|
||||
publicShareChatModel: values.publicShareChatModel,
|
||||
@@ -761,6 +767,24 @@ export default function AiProviderSettings() {
|
||||
{t("Resolves to {{url}}", { url: chatResolved })}
|
||||
</Text>
|
||||
|
||||
<Select
|
||||
mt="sm"
|
||||
label={t("Protocol")}
|
||||
description={t(
|
||||
"How chat requests are sent and how reasoning is surfaced",
|
||||
)}
|
||||
data={[
|
||||
{
|
||||
value: "openai-compatible",
|
||||
label: t("OpenAI-compatible (surfaces reasoning)"),
|
||||
},
|
||||
{ value: "openai", label: t("OpenAI (official)") },
|
||||
]}
|
||||
allowDeselect={false}
|
||||
disabled={isLoading}
|
||||
{...form.getInputProps("chatApiStyle")}
|
||||
/>
|
||||
|
||||
{/* Anonymous public-share assistant: a single master toggle + an
|
||||
optional cheaper model id. Reuses this card's driver/URL/key. */}
|
||||
<Group justify="space-between" align="center" wrap="nowrap" mt="md">
|
||||
|
||||
@@ -9,6 +9,12 @@ export type AiDriver = "openai" | "gemini" | "ollama";
|
||||
// - 'json' -> JSON body with base64-encoded audio (OpenRouter)
|
||||
export type SttApiStyle = "multipart" | "json";
|
||||
|
||||
// Chat provider implementation for the `openai` driver (chosen explicitly):
|
||||
// - 'openai-compatible' -> maps streamed reasoning_content to reasoning parts
|
||||
// (z.ai/GLM, DeepSeek, OpenRouter, ...). Default.
|
||||
// - 'openai' -> official provider; real-OpenAI reasoning-model shaping.
|
||||
export type ChatApiStyle = "openai-compatible" | "openai";
|
||||
|
||||
// Masked AI provider settings returned by the server.
|
||||
// No API key is ever returned; only `hasApiKey` / `hasEmbeddingApiKey` indicate
|
||||
// whether one is stored. `embeddingBaseUrl` is the RAW stored value (empty means
|
||||
@@ -16,6 +22,7 @@ export type SttApiStyle = "multipart" | "json";
|
||||
export interface IAiSettings {
|
||||
driver?: AiDriver;
|
||||
chatModel?: string;
|
||||
chatApiStyle?: ChatApiStyle;
|
||||
// Cheap model id for the anonymous public-share assistant; empty = chatModel.
|
||||
publicShareChatModel?: string;
|
||||
// Agent-role id whose persona the public-share assistant adopts; empty =
|
||||
@@ -49,6 +56,7 @@ export interface IAiSettings {
|
||||
export interface IAiSettingsUpdate {
|
||||
driver?: AiDriver;
|
||||
chatModel?: string;
|
||||
chatApiStyle?: ChatApiStyle;
|
||||
publicShareChatModel?: string;
|
||||
// Agent-role id whose persona the public-share assistant adopts; empty =
|
||||
// built-in locked persona.
|
||||
|
||||
@@ -6,6 +6,7 @@ import { createMCPClient } from '@ai-sdk/mcp';
|
||||
import { Agent, type Dispatcher } from 'undici';
|
||||
import { AiMcpServerRepo } from '@docmost/db/repos/ai-chat/ai-mcp-server.repo';
|
||||
import { AiMcpServer } from '@docmost/db/types/entity.types';
|
||||
import { streamingDispatcherOptions } from '../../../integrations/ai/ai-streaming-fetch';
|
||||
import { SecretBoxService } from '../../../integrations/crypto/secret-box';
|
||||
import { isUrlAllowed, isIpAllowed } from './ssrf-guard';
|
||||
|
||||
@@ -400,6 +401,16 @@ export function validateResolvedAddresses(
|
||||
*/
|
||||
function buildPinnedDispatcher(): Agent {
|
||||
return new Agent({
|
||||
// Raise undici's default 300s headers/body timeouts on external MCP traffic
|
||||
// to the same generous-but-finite silence timeout the chat fetch uses (#175).
|
||||
// A long agent turn keeps an SSE transport (e.g. crawl4ai's /mcp/sse) open
|
||||
// across the whole turn; that connection can idle BETWEEN tool calls longer
|
||||
// than 5 min, and undici's bodyTimeout would otherwise sever it mid-task — a
|
||||
// tool-call failure that aborts the streamed turn and shows the user "Lost
|
||||
// connection to the AI provider". A slow single tool call (a crawl) can
|
||||
// likewise exceed headersTimeout. The timeout stays FINITE so a genuinely
|
||||
// hung server is still broken eventually.
|
||||
...streamingDispatcherOptions(),
|
||||
connect: {
|
||||
lookup: (hostname, _options, callback) => {
|
||||
// Always resolve ALL addresses ourselves; do not trust the caller's
|
||||
|
||||
@@ -239,7 +239,7 @@ export class WorkspaceRepo {
|
||||
// is a real jsonb object, never a double-encoded string. The CASE self-heals
|
||||
// workspaces whose settings.ai.provider was previously corrupted into an
|
||||
// array/string.
|
||||
const ALLOWED = ['driver', 'chatModel', 'embeddingModel', 'baseUrl', 'embeddingBaseUrl', 'sttModel', 'sttBaseUrl', 'sttApiStyle', 'sttLanguage', 'systemPrompt', 'publicShareChatModel', 'publicShareAssistantRoleId'];
|
||||
const ALLOWED = ['driver', 'chatModel', 'chatApiStyle', 'embeddingModel', 'baseUrl', 'embeddingBaseUrl', 'sttModel', 'sttBaseUrl', 'sttApiStyle', 'sttLanguage', 'systemPrompt', 'publicShareChatModel', 'publicShareAssistantRoleId'];
|
||||
const entries = Object.entries(provider).filter(
|
||||
([k, v]) => v !== undefined && ALLOWED.includes(k),
|
||||
);
|
||||
|
||||
40
apps/server/src/integrations/ai/ai-provider-http.spec.ts
Normal file
40
apps/server/src/integrations/ai/ai-provider-http.spec.ts
Normal file
@@ -0,0 +1,40 @@
|
||||
import { createInstrumentedFetch } from './ai-provider-http';
|
||||
|
||||
/**
|
||||
* createInstrumentedFetch must be behavior-neutral: it delegates to the supplied
|
||||
* baseFetch with the SAME input/init, returns the Response object untouched (so
|
||||
* the streamed SSE body is never read/cloned), and rethrows the same error. The
|
||||
* baseFetch injection is the seam that carries the streaming fetch (#175) onto
|
||||
* the chat provider, so it is tested directly.
|
||||
*/
|
||||
describe('createInstrumentedFetch', () => {
|
||||
it('delegates to the injected baseFetch with the same input/init', async () => {
|
||||
const fakeResponse = new Response('ok', { status: 200 });
|
||||
const baseFetch = jest.fn().mockResolvedValue(fakeResponse);
|
||||
const instrumented = createInstrumentedFetch('test', baseFetch as never);
|
||||
|
||||
const init = { method: 'POST', body: '{"q":1}' };
|
||||
const res = await instrumented('https://example.com/v1/chat', init);
|
||||
|
||||
expect(baseFetch).toHaveBeenCalledTimes(1);
|
||||
expect(baseFetch).toHaveBeenCalledWith('https://example.com/v1/chat', init);
|
||||
// The Response is returned UNTOUCHED (same reference — never read/cloned).
|
||||
expect(res).toBe(fakeResponse);
|
||||
});
|
||||
|
||||
it('rethrows the base fetch error unchanged (pre-response failure)', async () => {
|
||||
const err = Object.assign(new TypeError('fetch failed'), {
|
||||
cause: { code: 'ECONNRESET' },
|
||||
});
|
||||
const baseFetch = jest.fn().mockRejectedValue(err);
|
||||
const instrumented = createInstrumentedFetch('test', baseFetch as never);
|
||||
|
||||
await expect(instrumented('https://example.com/')).rejects.toBe(err);
|
||||
});
|
||||
|
||||
it('defaults to the global fetch when no baseFetch is given', () => {
|
||||
// Constructing without a baseFetch must not throw — it simply wraps global
|
||||
// fetch (the non-chat default).
|
||||
expect(() => createInstrumentedFetch('test')).not.toThrow();
|
||||
});
|
||||
});
|
||||
@@ -1,16 +1,22 @@
|
||||
import { Logger } from '@nestjs/common';
|
||||
|
||||
/**
|
||||
* DIAGNOSTIC (provider ECONNRESET investigation) — temporary.
|
||||
* The provider HTTP fetch used by the chat path: a thin, behavior-neutral
|
||||
* instrumentation wrapper around a supplied `fetch`.
|
||||
*
|
||||
* A PASSIVE, behavior-neutral wrapper around the global `fetch`, injected into
|
||||
* the OpenAI-compatible provider client (`createOpenAI({ fetch })`, the z.ai
|
||||
* path). Per provider HTTP call it logs: time-to-response-headers + status +
|
||||
* request-body size on success; and on a pre-response rejection the failure
|
||||
* latency + error code/cause + request-body size + the idle gap since the
|
||||
* previous provider call. It NEVER retries, times out, swaps the dispatcher, or
|
||||
* reads/clones the response body — the Response is returned untouched (streaming
|
||||
* unaffected) and any error is rethrown unchanged.
|
||||
* It defaults to the global `fetch`, but the chat provider passes the streaming
|
||||
* fetch (which RAISES undici's 300s stream timeouts to a generous-but-finite
|
||||
* silence timeout so a long agent turn is not severed mid-stream — #175). So this
|
||||
* wrapper observes the EXACT transport a turn uses. It NEVER retries, times out,
|
||||
* swaps the dispatcher, or reads/clones the response body — the Response is
|
||||
* returned untouched (streaming unaffected) and any error is rethrown unchanged.
|
||||
*
|
||||
* Per provider HTTP call it logs: time-to-response-headers + status + request
|
||||
* body size on success; and on a pre-response rejection the failure latency +
|
||||
* error code/cause + request body size + the idle gap since the previous call.
|
||||
* This telemetry is intentional and kept (it diagnoses provider connection
|
||||
* resets / mid-stream cuts), and it is load-bearing: the streaming fetch reaches
|
||||
* the chat provider THROUGH this wrapper, so the two are one construct.
|
||||
*
|
||||
* How to read the result (a long agentic turn makes one provider call per step):
|
||||
* - a failed turn whose last provider line is "PRE-RESPONSE FAILED ... ECONNRESET"
|
||||
@@ -23,9 +29,15 @@ import { Logger } from '@nestjs/common';
|
||||
* different failure mode.
|
||||
*
|
||||
* The seq/last-call timestamps are module-level, so under concurrent turns the
|
||||
* idle-gap figure is approximate (fine for single-user reproduction).
|
||||
* idle-gap figure is approximate (fine for single-user diagnosis).
|
||||
*/
|
||||
export function createDiagnosticFetch(context: string): typeof fetch {
|
||||
export function createInstrumentedFetch(
|
||||
context: string,
|
||||
// The underlying fetch to instrument. Defaults to the global fetch; the chat
|
||||
// provider passes the streaming fetch (raised, finite undici stream timeouts,
|
||||
// #175) so the telemetry observes the SAME transport the long agent turn uses.
|
||||
baseFetch: typeof fetch = fetch,
|
||||
): typeof fetch {
|
||||
const logger = new Logger(context);
|
||||
let callSeq = 0;
|
||||
let lastCallStartedAt: number | undefined;
|
||||
@@ -46,11 +58,11 @@ export function createDiagnosticFetch(context: string): typeof fetch {
|
||||
? body.byteLength
|
||||
: undefined;
|
||||
try {
|
||||
// Delegate to global fetch; return the Response UNTOUCHED (never read/clone
|
||||
// the body) so the streamed SSE response is unaffected.
|
||||
const res = await fetch(input, init);
|
||||
// Delegate to the base fetch; return the Response UNTOUCHED (never read/
|
||||
// clone the body) so the streamed SSE response is unaffected.
|
||||
const res = await baseFetch(input, init);
|
||||
logger.log(
|
||||
`provider HTTP DIAGNOSTIC: call#${callId} OK ` +
|
||||
`provider HTTP: call#${callId} OK ` +
|
||||
`headersAfter=${Date.now() - startedAt}ms status=${res.status} ` +
|
||||
`reqBytes=${bodyBytes ?? 'n/a'} idleSincePrevCall=${idleSincePrev ?? 'n/a'}ms`,
|
||||
);
|
||||
@@ -64,7 +76,7 @@ export function createDiagnosticFetch(context: string): typeof fetch {
|
||||
cause?: { code?: string; message?: string };
|
||||
};
|
||||
logger.warn(
|
||||
`provider HTTP DIAGNOSTIC: call#${callId} PRE-RESPONSE FAILED ` +
|
||||
`provider HTTP: call#${callId} PRE-RESPONSE FAILED ` +
|
||||
`after=${Date.now() - startedAt}ms code=${e?.cause?.code ?? 'none'} ` +
|
||||
`name=${e?.name ?? 'Error'} cause=${e?.cause?.message ?? e?.message ?? 'unknown'} ` +
|
||||
`reqBytes=${bodyBytes ?? 'n/a'} idleSincePrevCall=${idleSincePrev ?? 'n/a'}ms`,
|
||||
@@ -14,6 +14,7 @@ import {
|
||||
MaskedAiSettings,
|
||||
ResolvedAiConfig,
|
||||
SttApiStyle,
|
||||
ChatApiStyle,
|
||||
} from './ai.types';
|
||||
|
||||
/**
|
||||
@@ -24,6 +25,7 @@ import {
|
||||
export interface UpdateAiSettingsInput {
|
||||
driver?: AiDriver;
|
||||
chatModel?: string;
|
||||
chatApiStyle?: ChatApiStyle;
|
||||
embeddingModel?: string;
|
||||
baseUrl?: string;
|
||||
embeddingBaseUrl?: string;
|
||||
@@ -157,6 +159,8 @@ export class AiSettingsService {
|
||||
const config: ResolvedAiConfig = {
|
||||
driver: provider.driver,
|
||||
chatModel: provider.chatModel,
|
||||
// Plain passthrough; getChatModel defaults unset to 'openai-compatible'.
|
||||
chatApiStyle: provider.chatApiStyle,
|
||||
// Cheap model id for the anonymous public-share assistant; reuses the chat
|
||||
// driver/baseUrl/apiKey. Empty/unset → callers fall back to chatModel.
|
||||
publicShareChatModel: provider.publicShareChatModel,
|
||||
@@ -238,6 +242,7 @@ export class AiSettingsService {
|
||||
return {
|
||||
driver: provider.driver,
|
||||
chatModel: provider.chatModel,
|
||||
chatApiStyle: provider.chatApiStyle,
|
||||
embeddingModel: provider.embeddingModel,
|
||||
baseUrl: provider.baseUrl,
|
||||
embeddingBaseUrl: provider.embeddingBaseUrl,
|
||||
@@ -278,6 +283,7 @@ export class AiSettingsService {
|
||||
for (const key of [
|
||||
'driver',
|
||||
'chatModel',
|
||||
'chatApiStyle',
|
||||
'embeddingModel',
|
||||
'baseUrl',
|
||||
'embeddingBaseUrl',
|
||||
|
||||
112
apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts
Normal file
112
apps/server/src/integrations/ai/ai-streaming-fetch.spec.ts
Normal file
@@ -0,0 +1,112 @@
|
||||
import * as http from 'node:http';
|
||||
import {
|
||||
createStreamingFetch,
|
||||
streamTimeoutMs,
|
||||
streamingDispatcherOptions,
|
||||
} from './ai-streaming-fetch';
|
||||
|
||||
/**
|
||||
* #175: undici's default 300s headers/body timeouts severed long agent turns.
|
||||
* The streaming fetch raises them to a generous-but-FINITE silence timeout (not
|
||||
* 0 — a true hang must still break). We pin: the configured value + env override,
|
||||
* that both dispatcher timeouts use it, and that a delayed response streams.
|
||||
*/
|
||||
describe('streamTimeoutMs', () => {
|
||||
const ORIG = process.env.AI_STREAM_TIMEOUT_MS;
|
||||
afterEach(() => {
|
||||
if (ORIG === undefined) delete process.env.AI_STREAM_TIMEOUT_MS;
|
||||
else process.env.AI_STREAM_TIMEOUT_MS = ORIG;
|
||||
});
|
||||
|
||||
it('defaults to a generous-but-finite 15 minutes', () => {
|
||||
delete process.env.AI_STREAM_TIMEOUT_MS;
|
||||
expect(streamTimeoutMs()).toBe(900_000);
|
||||
// Finite — NOT disabled (0 would let a hung provider leak forever).
|
||||
expect(streamTimeoutMs()).toBeGreaterThan(0);
|
||||
expect(Number.isFinite(streamTimeoutMs())).toBe(true);
|
||||
});
|
||||
|
||||
it('honours a positive AI_STREAM_TIMEOUT_MS override', () => {
|
||||
process.env.AI_STREAM_TIMEOUT_MS = '120000';
|
||||
expect(streamTimeoutMs()).toBe(120000);
|
||||
});
|
||||
|
||||
it('ignores an invalid / non-positive override (falls back to default)', () => {
|
||||
for (const bad of ['0', '-5', 'abc', '']) {
|
||||
process.env.AI_STREAM_TIMEOUT_MS = bad;
|
||||
expect(streamTimeoutMs()).toBe(900_000);
|
||||
}
|
||||
});
|
||||
|
||||
it('applies the timeout to BOTH undici stream timeouts', () => {
|
||||
delete process.env.AI_STREAM_TIMEOUT_MS;
|
||||
expect(streamingDispatcherOptions()).toEqual({
|
||||
headersTimeout: 900_000,
|
||||
bodyTimeout: 900_000,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('createStreamingFetch — against a delayed server', () => {
|
||||
const ORIG = process.env.AI_STREAM_TIMEOUT_MS;
|
||||
let server: http.Server;
|
||||
let url: string;
|
||||
// The server waits before sending ANY byte (a long time-to-first-token). It is
|
||||
// > undici's ~1s timeout-timer granularity so a sub-second configured timeout
|
||||
// fires deterministically in the load-bearing test below.
|
||||
const DELAY = 1500;
|
||||
|
||||
beforeAll(async () => {
|
||||
server = http.createServer((_req, res) => {
|
||||
setTimeout(() => {
|
||||
res.writeHead(200, { 'Content-Type': 'text/plain' });
|
||||
res.end('ok');
|
||||
}, DELAY);
|
||||
});
|
||||
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
|
||||
const addr = server.address() as import('node:net').AddressInfo;
|
||||
url = `http://127.0.0.1:${addr.port}/`;
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await new Promise<void>((resolve) => server.close(() => resolve()));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
if (ORIG === undefined) delete process.env.AI_STREAM_TIMEOUT_MS;
|
||||
else process.env.AI_STREAM_TIMEOUT_MS = ORIG;
|
||||
});
|
||||
|
||||
it('streams the delayed response at the default (generous) timeout', async () => {
|
||||
delete process.env.AI_STREAM_TIMEOUT_MS; // default 15 min >> DELAY
|
||||
const streamingFetch = createStreamingFetch();
|
||||
const res = await streamingFetch(url);
|
||||
expect(res.status).toBe(200);
|
||||
expect(await res.text()).toBe('ok');
|
||||
});
|
||||
|
||||
it('LOAD-BEARING: a sub-DELAY AI_STREAM_TIMEOUT_MS actually severs the response', async () => {
|
||||
// Proves the configured dispatcher is wired into the fetch: with the timeout
|
||||
// set below DELAY the call must reject with undici's headers-timeout. If the
|
||||
// dispatcher were lost (fallback to global fetch's 300s default), the 1.5s
|
||||
// response would slip through and this would NOT throw.
|
||||
process.env.AI_STREAM_TIMEOUT_MS = '500';
|
||||
const streamingFetch = createStreamingFetch();
|
||||
let caught: unknown;
|
||||
const startedAt = Date.now();
|
||||
try {
|
||||
await streamingFetch(url).then((r) => r.text());
|
||||
} catch (e) {
|
||||
caught = e;
|
||||
}
|
||||
// It rejected (a lost dispatcher -> global 300s default would NOT reject on a
|
||||
// 1.5s response) and it did so BEFORE the response would have arrived (DELAY).
|
||||
// Use `.name` (realm-safe) — undici's TypeError fails cross-realm instanceof.
|
||||
expect(caught).toBeDefined();
|
||||
expect((caught as Error)?.name).toBe('TypeError');
|
||||
expect(Date.now() - startedAt).toBeLessThan(DELAY);
|
||||
// When present, the undici cause is the headers timeout.
|
||||
const code = (caught as { cause?: { code?: string } })?.cause?.code;
|
||||
if (code) expect(code).toBe('UND_ERR_HEADERS_TIMEOUT');
|
||||
});
|
||||
});
|
||||
58
apps/server/src/integrations/ai/ai-streaming-fetch.ts
Normal file
58
apps/server/src/integrations/ai/ai-streaming-fetch.ts
Normal file
@@ -0,0 +1,58 @@
|
||||
import { Agent } from 'undici';
|
||||
|
||||
/**
|
||||
* Default SILENCE timeout for streaming AI calls (15 min). Generous, but FINITE.
|
||||
*
|
||||
* Node's global fetch (undici) defaults headersTimeout and bodyTimeout to
|
||||
* 300_000ms, which severed legitimate long agent turns mid-stream — surfacing as
|
||||
* "Lost connection to the AI provider" (#175): a late step with a huge context
|
||||
* pushes the model's time-to-first-token past 5 min, or a reasoning model pauses
|
||||
* >5 min between chunks. We do NOT disable the timeout (0) — that would let a
|
||||
* genuinely hung provider, with the client still connected, hang forever
|
||||
* (abortSignal only fires on client disconnect). Instead we raise it well above
|
||||
* any realistic gap while keeping it finite so a true hang is eventually broken.
|
||||
*
|
||||
* This bounds SILENCE (time-to-first-byte and the gap BETWEEN chunks), NOT total
|
||||
* turn duration — so an arbitrarily long turn that keeps streaming bytes is never
|
||||
* cut; only a stream that goes quiet for longer than this is treated as a hang.
|
||||
*/
|
||||
const DEFAULT_STREAM_TIMEOUT_MS = 900_000;
|
||||
|
||||
/**
|
||||
* The configured silence timeout (ms). Override with `AI_STREAM_TIMEOUT_MS`; a
|
||||
* missing/invalid/non-positive value falls back to {@link DEFAULT_STREAM_TIMEOUT_MS}.
|
||||
*/
|
||||
export function streamTimeoutMs(): number {
|
||||
const raw = Number(process.env.AI_STREAM_TIMEOUT_MS);
|
||||
return Number.isFinite(raw) && raw > 0 ? raw : DEFAULT_STREAM_TIMEOUT_MS;
|
||||
}
|
||||
|
||||
/**
|
||||
* undici `Agent` timeout options for streaming AI traffic — both stream timeouts
|
||||
* set to the (generous, finite) silence timeout. Shared by the chat provider
|
||||
* fetch and the external-MCP dispatcher so they behave identically (#175).
|
||||
*/
|
||||
export function streamingDispatcherOptions(): {
|
||||
headersTimeout: number;
|
||||
bodyTimeout: number;
|
||||
} {
|
||||
const t = streamTimeoutMs();
|
||||
return { headersTimeout: t, bodyTimeout: t };
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a `fetch` for long-lived streaming AI calls (the agent chat turn) backed
|
||||
* by a dedicated undici dispatcher whose stream timeouts are the generous-but-
|
||||
* finite silence timeout above (#175). A single shared dispatcher is returned
|
||||
* (callers hold it for the service lifetime) so its connection pool is reused.
|
||||
*/
|
||||
export function createStreamingFetch(): typeof fetch {
|
||||
const dispatcher = new Agent(streamingDispatcherOptions());
|
||||
return ((input: Parameters<typeof fetch>[0], init?: RequestInit) =>
|
||||
fetch(input, {
|
||||
...(init ?? {}),
|
||||
// `dispatcher` is an undici-specific init field (not in the DOM RequestInit
|
||||
// type); Node's global fetch reads it. Cast to satisfy the type.
|
||||
dispatcher,
|
||||
} as RequestInit & { dispatcher: Agent })) as typeof fetch;
|
||||
}
|
||||
@@ -287,41 +287,62 @@ describe('AiService.getChatModel role model override', () => {
|
||||
});
|
||||
|
||||
/**
|
||||
* Provider selection for the `openai` driver (reasoning surfacing). A custom
|
||||
* baseURL means an openai-COMPATIBLE third-party endpoint (z.ai/GLM, DeepSeek,
|
||||
* ...): we must use @ai-sdk/openai-compatible, which maps the streamed
|
||||
* `reasoning_content` to reasoning parts (the official @ai-sdk/openai provider
|
||||
* drops it). Real OpenAI (no baseURL) keeps the official provider. We assert via
|
||||
* the built model's `.provider` tag.
|
||||
* Chat provider selection by the EXPLICIT `chatApiStyle` (NOT inferred from
|
||||
* baseUrl): 'openai-compatible' (default) uses @ai-sdk/openai-compatible, which
|
||||
* maps streamed reasoning_content to reasoning parts; 'openai' uses the official
|
||||
* provider; and openai-compatible without a baseURL safely falls back to the
|
||||
* official provider (it has no default endpoint). Asserted via `.provider`.
|
||||
*/
|
||||
describe('AiService.getChatModel openai provider selection', () => {
|
||||
function serviceWith(baseUrl: string | undefined) {
|
||||
describe('AiService.getChatModel chatApiStyle provider selection', () => {
|
||||
function serviceWith(opts: {
|
||||
baseUrl?: string;
|
||||
chatApiStyle?: 'openai-compatible' | 'openai';
|
||||
}) {
|
||||
const aiSettings = {
|
||||
resolve: jest.fn().mockResolvedValue({
|
||||
driver: 'openai',
|
||||
chatModel: 'glm-5.2',
|
||||
apiKey: 'key',
|
||||
baseUrl,
|
||||
baseUrl: opts.baseUrl,
|
||||
chatApiStyle: opts.chatApiStyle,
|
||||
}),
|
||||
};
|
||||
return new AiService(
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
aiSettings as any,
|
||||
{ find: jest.fn() } as any,
|
||||
{ decryptSecret: jest.fn() } as any,
|
||||
{ find: jest.fn() } as never,
|
||||
{ decryptSecret: jest.fn() } as never,
|
||||
);
|
||||
}
|
||||
|
||||
it('uses the openai-compatible provider when a custom baseURL is set', async () => {
|
||||
const model = await serviceWith('https://api.z.ai/api/coding/paas/v4').getChatModel(
|
||||
'ws-1',
|
||||
);
|
||||
// openai-compatible surfaces reasoning_content; tagged "openai-compatible.*".
|
||||
expect((model as { provider: string }).provider).toContain('openai-compatible');
|
||||
const providerOf = async (svc: AiService) =>
|
||||
(
|
||||
(await svc.getChatModel('ws-1')) as { provider: string }
|
||||
).provider;
|
||||
|
||||
it("'openai-compatible' + baseURL -> openai-compatible provider", async () => {
|
||||
expect(
|
||||
await providerOf(
|
||||
serviceWith({ baseUrl: 'https://api.z.ai/v4', chatApiStyle: 'openai-compatible' }),
|
||||
),
|
||||
).toContain('openai-compatible');
|
||||
});
|
||||
|
||||
it('uses the official openai provider when there is no baseURL (real OpenAI)', async () => {
|
||||
const model = await serviceWith(undefined).getChatModel('ws-1');
|
||||
expect((model as { provider: string }).provider).toBe('openai.chat');
|
||||
it("'openai' + baseURL -> official openai provider", async () => {
|
||||
expect(
|
||||
await providerOf(serviceWith({ baseUrl: 'https://api.z.ai/v4', chatApiStyle: 'openai' })),
|
||||
).toBe('openai.chat');
|
||||
});
|
||||
|
||||
it('unset + baseURL -> defaults to openai-compatible', async () => {
|
||||
expect(
|
||||
await providerOf(serviceWith({ baseUrl: 'https://api.z.ai/v4' })),
|
||||
).toContain('openai-compatible');
|
||||
});
|
||||
|
||||
it("'openai-compatible' WITHOUT baseURL -> safe fallback to official openai", async () => {
|
||||
expect(
|
||||
await providerOf(serviceWith({ chatApiStyle: 'openai-compatible' })),
|
||||
).toBe('openai.chat');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -15,8 +15,8 @@ import { AiNotConfiguredException } from './ai-not-configured.exception';
|
||||
import { AiEmbeddingNotConfiguredException } from './ai-embedding-not-configured.exception';
|
||||
import { AiSttNotConfiguredException } from './ai-stt-not-configured.exception';
|
||||
import { describeProviderError } from './ai-error.util';
|
||||
// DIAGNOSTIC (provider ECONNRESET investigation) — temporary.
|
||||
import { createDiagnosticFetch } from './ai-http-diagnostics';
|
||||
import { createInstrumentedFetch } from './ai-provider-http';
|
||||
import { createStreamingFetch } from './ai-streaming-fetch';
|
||||
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
|
||||
import { SecretBoxService } from '../crypto/secret-box';
|
||||
import { AiDriver } from './ai.types';
|
||||
@@ -46,11 +46,14 @@ export interface ChatModelOverride {
|
||||
export class AiService {
|
||||
private readonly logger = new Logger(AiService.name);
|
||||
|
||||
// DIAGNOSTIC (provider ECONNRESET investigation) — temporary: passive
|
||||
// instrumentation of the OpenAI-compatible provider HTTP calls (z.ai).
|
||||
// Logs call timing/outcome only — no behavior change.
|
||||
private readonly aiDiagnosticFetch = createDiagnosticFetch(
|
||||
// Provider HTTP fetch for the chat path: the streaming fetch — which RAISES
|
||||
// undici's 300s headers/body timeouts to a generous-but-finite silence timeout
|
||||
// so a long agent turn is not severed mid-stream (#175) — wrapped with the
|
||||
// provider-HTTP instrumentation so the logs observe that exact transport. Held
|
||||
// for the service lifetime to reuse the streaming dispatcher's connection pool.
|
||||
private readonly aiProviderFetch = createInstrumentedFetch(
|
||||
'AiService:provider-http',
|
||||
createStreamingFetch(),
|
||||
);
|
||||
|
||||
constructor(
|
||||
@@ -93,6 +96,10 @@ export class AiService {
|
||||
|
||||
let apiKey = cfg.apiKey;
|
||||
let baseUrl = cfg.baseUrl;
|
||||
// Chat provider implementation, chosen EXPLICITLY by the admin (not inferred
|
||||
// from baseUrl). Unset → 'openai-compatible' so reasoning is surfaced by
|
||||
// default for this fork's openai+baseUrl setups.
|
||||
const chatApiStyle = cfg.chatApiStyle ?? 'openai-compatible';
|
||||
|
||||
// A driver override that differs from the workspace driver needs that
|
||||
// driver's own creds (the workspace driver's key would be wrong/absent).
|
||||
@@ -143,32 +150,41 @@ export class AiService {
|
||||
}
|
||||
|
||||
switch (driver) {
|
||||
case 'openai':
|
||||
// A custom baseURL means an openai-COMPATIBLE third-party endpoint
|
||||
// (z.ai / GLM, DeepSeek, OpenRouter, ...). Use @ai-sdk/openai-compatible
|
||||
// there: unlike the official @ai-sdk/openai provider, it maps the
|
||||
// provider's streamed `reasoning_content` to reasoning parts, so the
|
||||
// agent's chain-of-thought is surfaced to the UI (and the model is not
|
||||
// silent during a long server-side "thinking" phase). It also targets
|
||||
// Chat Completions (/chat/completions), the portable endpoint that
|
||||
// OpenAI-compatible gateways accept on multi-turn history (the official
|
||||
// provider's default callable targets /responses, which they 400).
|
||||
if (baseUrl) {
|
||||
case 'openai': {
|
||||
// The provider implementation is chosen by the admin's `chatApiStyle`
|
||||
// (NOT inferred from baseUrl — a custom URL can front real OpenAI too).
|
||||
// Both branches hit Chat Completions (/chat/completions); the provider
|
||||
// fetch is the instrumented streaming fetch (finite-but-generous stream
|
||||
// timeouts, #175).
|
||||
//
|
||||
// 'openai-compatible' (default) maps the third-party provider's streamed
|
||||
// `reasoning_content` to reasoning parts (z.ai/GLM, DeepSeek, ...) — the
|
||||
// point of #175. It has no default endpoint, so it requires a baseURL;
|
||||
// when there is none (real OpenAI, or a role's cross-driver override that
|
||||
// cleared baseUrl) we fall back to the official provider.
|
||||
if (chatApiStyle === 'openai-compatible' && baseUrl) {
|
||||
return createOpenAICompatible({
|
||||
name: 'openai-compatible',
|
||||
apiKey,
|
||||
baseURL: baseUrl,
|
||||
// Passive ECONNRESET telemetry; on the chat path it also carries the
|
||||
// streaming fetch (disabled long-turn timeouts) once #175 lands.
|
||||
fetch: this.aiDiagnosticFetch,
|
||||
// Keep streamed token usage (stream_options.include_usage): without
|
||||
// it @ai-sdk/openai-compatible omits usage, zeroing the live token
|
||||
// counter and reasoning-token metadata. The official provider always
|
||||
// sent it, so this preserves parity.
|
||||
includeUsage: true,
|
||||
fetch: this.aiProviderFetch,
|
||||
})(chatModel);
|
||||
}
|
||||
// Real OpenAI (no custom baseURL): keep the official provider, on Chat
|
||||
// Completions to preserve multi-turn compatibility.
|
||||
// Official @ai-sdk/openai: real-OpenAI reasoning-model request shaping;
|
||||
// `.chat()` targets Chat Completions (the default callable targets the
|
||||
// Responses API, which openai-compatible gateways 400 on multi-turn
|
||||
// history). In this fork baseUrl is normally set; undefined = real OpenAI.
|
||||
return createOpenAI({
|
||||
apiKey,
|
||||
fetch: this.aiDiagnosticFetch,
|
||||
baseURL: baseUrl,
|
||||
fetch: this.aiProviderFetch,
|
||||
}).chat(chatModel);
|
||||
}
|
||||
case 'gemini':
|
||||
return createGoogleGenerativeAI({ apiKey })(chatModel);
|
||||
case 'ollama':
|
||||
|
||||
@@ -16,6 +16,15 @@ export const AI_DRIVERS: AiDriver[] = ['openai', 'gemini', 'ollama'];
|
||||
export type SttApiStyle = 'multipart' | 'json';
|
||||
export const STT_API_STYLES: SttApiStyle[] = ['multipart', 'json'];
|
||||
|
||||
// Chat provider implementation for the `openai` driver. Chosen explicitly by the
|
||||
// admin (NOT inferred from baseUrl — a custom URL can front real OpenAI too).
|
||||
// 'openai-compatible' = @ai-sdk/openai-compatible: maps streamed
|
||||
// `reasoning_content` to reasoning parts (z.ai/GLM, DeepSeek, OpenRouter, ...).
|
||||
// 'openai' = official @ai-sdk/openai: real-OpenAI reasoning-model request shaping
|
||||
// (max_completion_tokens, the 'developer' role), no third-party reasoning map.
|
||||
export type ChatApiStyle = 'openai-compatible' | 'openai';
|
||||
export const CHAT_API_STYLES: ChatApiStyle[] = ['openai-compatible', 'openai'];
|
||||
|
||||
/**
|
||||
* Non-secret provider settings persisted under `settings.ai.provider`.
|
||||
* The API key is intentionally absent here.
|
||||
@@ -23,6 +32,9 @@ export const STT_API_STYLES: SttApiStyle[] = ['multipart', 'json'];
|
||||
export interface AiProviderSettings {
|
||||
driver: AiDriver;
|
||||
chatModel: string;
|
||||
// Chat provider implementation for the `openai` driver. Unset → defaults to
|
||||
// 'openai-compatible' (so reasoning is surfaced by default). See ChatApiStyle.
|
||||
chatApiStyle?: ChatApiStyle;
|
||||
embeddingModel?: string;
|
||||
baseUrl?: string;
|
||||
// Embedding-specific base URL. Falls back to `baseUrl` when empty/unset.
|
||||
@@ -76,6 +88,7 @@ export interface ResolvedAiConfig extends Partial<AiProviderSettings> {
|
||||
export interface MaskedAiSettings {
|
||||
driver?: AiDriver;
|
||||
chatModel?: string;
|
||||
chatApiStyle?: ChatApiStyle;
|
||||
embeddingModel?: string;
|
||||
baseUrl?: string;
|
||||
embeddingBaseUrl?: string;
|
||||
|
||||
@@ -1,5 +1,12 @@
|
||||
import { IsIn, IsOptional, IsString } from 'class-validator';
|
||||
import { AI_DRIVERS, AiDriver, STT_API_STYLES, SttApiStyle } from '../ai.types';
|
||||
import {
|
||||
AI_DRIVERS,
|
||||
AiDriver,
|
||||
CHAT_API_STYLES,
|
||||
ChatApiStyle,
|
||||
STT_API_STYLES,
|
||||
SttApiStyle,
|
||||
} from '../ai.types';
|
||||
|
||||
/**
|
||||
* Admin update payload for the workspace AI provider settings.
|
||||
@@ -18,6 +25,10 @@ export class UpdateAiSettingsDto {
|
||||
@IsString()
|
||||
chatModel?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsIn(CHAT_API_STYLES)
|
||||
chatApiStyle?: ChatApiStyle;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
embeddingModel?: string;
|
||||
|
||||
Reference in New Issue
Block a user