chore(ai-chat): add stream timing logs + env-gated aiFetch bypass (diagnostics)
The streaming chat turn hangs in all browsers while the non-streaming test endpoint works — both use the same model/transport (createOpenAI + aiFetch), so the suspect is the streaming path / custom undici RetryAgent transport. - ai-http.ts: wrap aiFetch with per-request timing logs (start, ms-to-headers on success, elapsed ms + cause on failure). Chat at info, embeddings at debug. Only host+path logged. - ai-chat.controller.ts / ai-chat.service.ts: log turn START, first-chunk latency, FINISHED duration, and elapsed ms on disconnect/error/abort. - ai.service.ts: AI_BYPASS_RESILIENT_FETCH=true makes the CHAT model omit fetch:aiFetch and use the default global fetch — isolates transport vs request-shape. Chat-only; embeddings/STT untouched; reversible via env. - .env.example: document the flag. No timeout/retry change. tsc clean; ai-chat + ai suites pass (292).
This commit is contained in:
@@ -128,6 +128,11 @@ MCP_DOCMOST_PASSWORD=
|
|||||||
# A slow/hung embeddings endpoint fails after this and the batch continues.
|
# A slow/hung embeddings endpoint fails after this and the batch continues.
|
||||||
# AI_EMBEDDING_TIMEOUT_MS=120000
|
# AI_EMBEDDING_TIMEOUT_MS=120000
|
||||||
|
|
||||||
|
# Diagnostic: bypass the resilient outbound HTTP layer (custom undici RetryAgent)
|
||||||
|
# for the CHAT model, using the default global fetch instead. Use only to isolate
|
||||||
|
# a streaming/transport issue; leave unset in normal operation.
|
||||||
|
# AI_BYPASS_RESILIENT_FETCH=true
|
||||||
|
|
||||||
# --- Anonymous public-share AI assistant ---
|
# --- Anonymous public-share AI assistant ---
|
||||||
# Opt-in per workspace (AI settings -> "public share assistant"; off by default).
|
# Opt-in per workspace (AI settings -> "public share assistant"; off by default).
|
||||||
# When enabled, anonymous visitors of a published share can ask an AI about that
|
# When enabled, anonymous visitors of a published share can ask an AI about that
|
||||||
|
|||||||
@@ -142,6 +142,9 @@ export class AiChatController {
|
|||||||
|
|
||||||
const body = (req.body ?? {}) as AiChatStreamBody;
|
const body = (req.body ?? {}) as AiChatStreamBody;
|
||||||
|
|
||||||
|
// Diagnostic timing baseline for this turn (see START / terminal logs below).
|
||||||
|
const startedAt = Date.now();
|
||||||
|
|
||||||
// Resolve the agent role for this turn BEFORE hijack: existing chats read it
|
// Resolve the agent role for this turn BEFORE hijack: existing chats read it
|
||||||
// from ai_chats.role_id (authoritative), a new chat from body.roleId. The
|
// from ai_chats.role_id (authoritative), a new chat from body.roleId. The
|
||||||
// role drives both the persona and the optional model override below.
|
// role drives both the persona and the optional model override below.
|
||||||
@@ -167,7 +170,7 @@ export class AiChatController {
|
|||||||
// so log it here before aborting the agent loop.
|
// so log it here before aborting the agent loop.
|
||||||
if (!res.raw.writableEnded) {
|
if (!res.raw.writableEnded) {
|
||||||
this.logger.warn(
|
this.logger.warn(
|
||||||
'AI chat stream: client disconnected before completion; aborting turn',
|
`AI chat stream: client disconnected before completion after ${Date.now() - startedAt}ms; aborting turn`,
|
||||||
);
|
);
|
||||||
controller.abort();
|
controller.abort();
|
||||||
}
|
}
|
||||||
@@ -175,6 +178,10 @@ export class AiChatController {
|
|||||||
req.raw.once('close', onClose);
|
req.raw.once('close', onClose);
|
||||||
res.raw.once('finish', () => req.raw.off('close', onClose));
|
res.raw.once('finish', () => req.raw.off('close', onClose));
|
||||||
|
|
||||||
|
this.logger.log(
|
||||||
|
`AI chat stream START chat=${body.chatId ?? 'new'} ua="${req.headers['user-agent'] ?? ''}"`,
|
||||||
|
);
|
||||||
|
|
||||||
// Commit to streaming: hijack so Fastify stops managing the response and
|
// Commit to streaming: hijack so Fastify stops managing the response and
|
||||||
// the AI SDK can write the UI-message stream directly to the Node socket.
|
// the AI SDK can write the UI-message stream directly to the Node socket.
|
||||||
res.hijack();
|
res.hijack();
|
||||||
|
|||||||
@@ -192,6 +192,7 @@ export class AiChatService {
|
|||||||
model,
|
model,
|
||||||
role,
|
role,
|
||||||
}: AiChatStreamArgs): Promise<void> {
|
}: AiChatStreamArgs): Promise<void> {
|
||||||
|
const turnStartedAt = Date.now();
|
||||||
// Resolve / create the chat. A new chat is created when no valid chatId is
|
// Resolve / create the chat. A new chat is created when no valid chatId is
|
||||||
// supplied or the supplied one does not belong to this workspace.
|
// supplied or the supplied one does not belong to this workspace.
|
||||||
let isNewChat = false;
|
let isNewChat = false;
|
||||||
@@ -380,6 +381,10 @@ export class AiChatService {
|
|||||||
const capturedSteps: StepLike[] = [];
|
const capturedSteps: StepLike[] = [];
|
||||||
let inProgressText = '';
|
let inProgressText = '';
|
||||||
|
|
||||||
|
// Log only the FIRST streamed chunk so we can see the provider's observed
|
||||||
|
// time-to-first-token without flooding the log with every delta.
|
||||||
|
let firstChunkLogged = false;
|
||||||
|
|
||||||
// NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous
|
// NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous
|
||||||
// failure here (or in pipe below) would skip the terminal callbacks, so the
|
// failure here (or in pipe below) would skip the terminal callbacks, so the
|
||||||
// catch releases the leased external clients to avoid a connection leak.
|
// catch releases the leased external clients to avoid a connection leak.
|
||||||
@@ -404,6 +409,12 @@ export class AiChatService {
|
|||||||
prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system),
|
prepareStep: ({ stepNumber }) => prepareAgentStep(stepNumber, system),
|
||||||
abortSignal: signal,
|
abortSignal: signal,
|
||||||
onChunk: ({ chunk }) => {
|
onChunk: ({ chunk }) => {
|
||||||
|
if (!firstChunkLogged) {
|
||||||
|
firstChunkLogged = true;
|
||||||
|
this.logger.log(
|
||||||
|
`AI chat stream first chunk (${chunk.type}) chat=${chatId} after ${Date.now() - turnStartedAt}ms`,
|
||||||
|
);
|
||||||
|
}
|
||||||
// 'text-delta' is the assistant's prose; tool-call args are separate chunk
|
// 'text-delta' is the assistant's prose; tool-call args are separate chunk
|
||||||
// types — so this mirrors exactly what streams to the client.
|
// types — so this mirrors exactly what streams to the client.
|
||||||
if (chunk.type === 'text-delta') inProgressText += chunk.text;
|
if (chunk.type === 'text-delta') inProgressText += chunk.text;
|
||||||
@@ -415,6 +426,9 @@ export class AiChatService {
|
|||||||
inProgressText = '';
|
inProgressText = '';
|
||||||
},
|
},
|
||||||
onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => {
|
onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => {
|
||||||
|
this.logger.log(
|
||||||
|
`AI chat stream FINISHED chat=${chatId} in ${Date.now() - turnStartedAt}ms, ${steps.length} step(s)`,
|
||||||
|
);
|
||||||
await persistAssistant({
|
await persistAssistant({
|
||||||
text,
|
text,
|
||||||
toolCalls: serializeSteps(steps),
|
toolCalls: serializeSteps(steps),
|
||||||
@@ -444,6 +458,9 @@ export class AiChatService {
|
|||||||
const e = error as { stack?: string };
|
const e = error as { stack?: string };
|
||||||
const errorText = describeProviderError(error, String(error));
|
const errorText = describeProviderError(error, String(error));
|
||||||
this.logger.error(`AI chat stream error: ${errorText}`, e?.stack);
|
this.logger.error(`AI chat stream error: ${errorText}`, e?.stack);
|
||||||
|
this.logger.warn(
|
||||||
|
`AI chat stream ERROR terminal chat=${chatId} after ${Date.now() - turnStartedAt}ms`,
|
||||||
|
);
|
||||||
// Persist the PARTIAL answer streamed before the failure (text + any
|
// Persist the PARTIAL answer streamed before the failure (text + any
|
||||||
// finished tool steps) WITH the error in metadata, so the turn shows what
|
// finished tool steps) WITH the error in metadata, so the turn shows what
|
||||||
// the user already saw plus the cause — not just a bare error.
|
// the user already saw plus the cause — not just a bare error.
|
||||||
@@ -466,7 +483,8 @@ export class AiChatService {
|
|||||||
// invisible in the logs. Log it (warn) so the abort is traceable.
|
// invisible in the logs. Log it (warn) so the abort is traceable.
|
||||||
this.logger.warn(
|
this.logger.warn(
|
||||||
`AI chat stream aborted (chat ${chatId}) after ${steps.length} ` +
|
`AI chat stream aborted (chat ${chatId}) after ${steps.length} ` +
|
||||||
`step(s), ${partialChars} chars partial text; persisting partial turn.`,
|
`step(s), ${partialChars} chars partial text; persisting partial turn` +
|
||||||
|
` after ${Date.now() - turnStartedAt}ms`,
|
||||||
);
|
);
|
||||||
await persistAssistant(
|
await persistAssistant(
|
||||||
buildPartialAssistantRecord(capturedSteps, inProgressText, 'aborted'),
|
buildPartialAssistantRecord(capturedSteps, inProgressText, 'aborted'),
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import { Agent, RetryAgent, type Dispatcher } from 'undici';
|
import { Agent, RetryAgent, type Dispatcher } from 'undici';
|
||||||
|
import { Logger } from '@nestjs/common';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Dedicated, resilient outbound HTTP layer for ALL AI provider calls.
|
* Dedicated, resilient outbound HTTP layer for ALL AI provider calls.
|
||||||
@@ -83,11 +84,53 @@ const dispatcher: Dispatcher = new RetryAgent(baseAgent, {
|
|||||||
],
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const logger = new Logger('AiHttp');
|
||||||
|
let requestSeq = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A `fetch`-compatible function that routes the request through the shared,
|
* A `fetch`-compatible function that routes the request through the shared,
|
||||||
* resilient AI dispatcher. Injected into AI SDK provider factories via their
|
* resilient AI dispatcher. Injected into AI SDK provider factories via their
|
||||||
* `fetch` option. Follows the repo convention (see mcp-clients.service.ts
|
* `fetch` option. Follows the repo convention (see mcp-clients.service.ts
|
||||||
* `guardedFetch`).
|
* `guardedFetch`).
|
||||||
|
*
|
||||||
|
* Wrapped with timing logs so provider latency is visible: for streaming
|
||||||
|
* responses `fetch` resolves when RESPONSE HEADERS arrive (the body streams
|
||||||
|
* after), so "in <ms>ms (headers received)" is exactly the provider's
|
||||||
|
* time-to-first-byte, and a rejection time pinpoints a headers/body timeout.
|
||||||
|
* Chat/Responses calls log at info; bulk embedding calls log at debug so RAG
|
||||||
|
* indexing never floods the logs. No secrets are logged — only host + pathname.
|
||||||
*/
|
*/
|
||||||
export const aiFetch: typeof fetch = (input, init) =>
|
export const aiFetch: typeof fetch = async (input, init) => {
|
||||||
fetch(input, { ...init, dispatcher } as RequestInit);
|
const id = ++requestSeq;
|
||||||
|
const method = (init?.method ?? 'GET').toUpperCase();
|
||||||
|
const rawUrl =
|
||||||
|
typeof input === 'string'
|
||||||
|
? input
|
||||||
|
: input instanceof URL
|
||||||
|
? input.href
|
||||||
|
: (input as Request).url;
|
||||||
|
let path = rawUrl;
|
||||||
|
try {
|
||||||
|
const u = new URL(rawUrl);
|
||||||
|
path = u.host + u.pathname;
|
||||||
|
} catch {
|
||||||
|
// Non-absolute / unparseable URL: keep the raw string (still no secrets).
|
||||||
|
}
|
||||||
|
const isChat = /\/(chat\/completions|responses)\b/.test(path);
|
||||||
|
const log = (msg: string): void =>
|
||||||
|
isChat ? logger.log(msg) : logger.debug(msg);
|
||||||
|
const startedAt = performance.now();
|
||||||
|
log(`provider request #${id} -> ${method} ${path}`);
|
||||||
|
try {
|
||||||
|
const res = await fetch(input, { ...init, dispatcher } as RequestInit);
|
||||||
|
const ms = Math.round(performance.now() - startedAt);
|
||||||
|
log(`provider request #${id} <- ${res.status} in ${ms}ms (headers received)`);
|
||||||
|
return res;
|
||||||
|
} catch (err) {
|
||||||
|
const ms = Math.round(performance.now() - startedAt);
|
||||||
|
logger.warn(
|
||||||
|
`provider request #${id} x after ${ms}ms: ${(err as Error)?.message ?? String(err)}`,
|
||||||
|
);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|||||||
@@ -133,6 +133,19 @@ export class AiService {
|
|||||||
throw new AiNotConfiguredException();
|
throw new AiNotConfiguredException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Diagnostic toggle: when AI_BYPASS_RESILIENT_FETCH=true the chat model
|
||||||
|
// bypasses the resilient aiFetch (custom undici RetryAgent) and uses the
|
||||||
|
// default global fetch. Isolates whether the streaming chat hang comes from
|
||||||
|
// the custom transport vs the request shape. Reversible via env, no rebuild.
|
||||||
|
const bypassResilientFetch =
|
||||||
|
process.env.AI_BYPASS_RESILIENT_FETCH === 'true';
|
||||||
|
if (bypassResilientFetch) {
|
||||||
|
this.logger.warn(
|
||||||
|
'AI chat: resilient aiFetch BYPASSED for chat model ' +
|
||||||
|
'(AI_BYPASS_RESILIENT_FETCH=true; using default fetch)',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
switch (driver) {
|
switch (driver) {
|
||||||
case 'openai':
|
case 'openai':
|
||||||
// baseURL (when set) covers openai-compatible endpoints. Use Chat
|
// baseURL (when set) covers openai-compatible endpoints. Use Chat
|
||||||
@@ -141,14 +154,22 @@ export class AiService {
|
|||||||
// Responses API (/responses), which OpenAI-compatible gateways
|
// Responses API (/responses), which OpenAI-compatible gateways
|
||||||
// (OpenRouter, etc.) reject on multi-turn requests (history with
|
// (OpenRouter, etc.) reject on multi-turn requests (history with
|
||||||
// assistant messages) → 400.
|
// assistant messages) → 400.
|
||||||
return createOpenAI({ apiKey, baseURL: baseUrl, fetch: aiFetch }).chat(
|
return createOpenAI({
|
||||||
chatModel,
|
apiKey,
|
||||||
);
|
baseURL: baseUrl,
|
||||||
|
...(bypassResilientFetch ? {} : { fetch: aiFetch }),
|
||||||
|
}).chat(chatModel);
|
||||||
case 'gemini':
|
case 'gemini':
|
||||||
return createGoogleGenerativeAI({ apiKey, fetch: aiFetch })(chatModel);
|
return createGoogleGenerativeAI({
|
||||||
|
apiKey,
|
||||||
|
...(bypassResilientFetch ? {} : { fetch: aiFetch }),
|
||||||
|
})(chatModel);
|
||||||
case 'ollama':
|
case 'ollama':
|
||||||
// Ollama needs no API key.
|
// Ollama needs no API key.
|
||||||
return createOllama({ baseURL: baseUrl, fetch: aiFetch })(chatModel);
|
return createOllama({
|
||||||
|
baseURL: baseUrl,
|
||||||
|
...(bypassResilientFetch ? {} : { fetch: aiFetch }),
|
||||||
|
})(chatModel);
|
||||||
default:
|
default:
|
||||||
throw new AiNotConfiguredException();
|
throw new AiNotConfiguredException();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user