fix(ai-chat): OpenAI Chat Completions for multi-turn + provider settings, stream UX & errors" -m "Live-stand fixes (OpenRouter / OpenAI-compatible):
- openai provider: use .chat() (Chat Completions) instead of the default callable (Responses API), which gateways reject on multi-turn -> 400. - updateAiProviderSettings: assemble settings.ai.provider via jsonb_build_object with ::text-cast bound params + jsonb_typeof self-heal (postgres.js was double-encoding it into an array; the ::text cast avoids 'could not determine data type of parameter'). - chat agent: drop the hard maxOutputTokens cap (truncated complex tool calls); keep a tiny cap only on the test-connection ping. - testConnection + chat stream: surface the real provider error (statusCode+message) to logs and the UI instead of generic masks; never log the API key. - chat UI: typing indicator, incremental streaming render, tool 'running' status, Stop. Also bundled (prior uncommitted ai-chat work): - history 'AI agent' provenance badge; vector RAG (pgvector image + page_embeddings + AI_QUEUE indexer + space-scoped semanticSearch); external MCP servers backend (@ai-sdk/mcp client, SSRF IP-pinning, encrypted headers, admin CRUD/Test); yjs duplicate-instance fix via pnpm patch (single CJS instance server-side).
This commit is contained in:
@@ -14,6 +14,7 @@ import { AiChatRepo } from '@docmost/db/repos/ai-chat/ai-chat.repo';
|
||||
import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.repo';
|
||||
import { User, Workspace, AiChatMessage } from '@docmost/db/types/entity.types';
|
||||
import { AiChatToolsService } from './tools/ai-chat-tools.service';
|
||||
import { McpClientsService } from './external-mcp/mcp-clients.service';
|
||||
import { buildSystemPrompt } from './ai-chat.prompt';
|
||||
|
||||
/**
|
||||
@@ -62,6 +63,7 @@ export class AiChatService {
|
||||
private readonly aiChatMessageRepo: AiChatMessageRepo,
|
||||
private readonly aiSettings: AiSettingsService,
|
||||
private readonly tools: AiChatToolsService,
|
||||
private readonly mcpClients: McpClientsService,
|
||||
) {}
|
||||
|
||||
/**
|
||||
@@ -143,13 +145,58 @@ export class AiChatService {
|
||||
// Pass the resolved chatId so the write tools can mint provenance tokens
|
||||
// (access + collab) carrying { actor:'agent', aiChatId: chatId }, making
|
||||
// agent REST/collab writes attributable and non-spoofable (§6.5/§6.6).
|
||||
const tools = await this.tools.forUser(
|
||||
const docmostTools = await this.tools.forUser(
|
||||
user,
|
||||
sessionId,
|
||||
workspace.id,
|
||||
chatId,
|
||||
);
|
||||
|
||||
// Merge in admin-configured external MCP tools (web search, etc.; §6.8).
|
||||
// A down/slow external server never crashes the turn — toolsFor skips it and
|
||||
// records the outcome. The returned client handles MUST be closed in the
|
||||
// streamText lifecycle (onFinish/onError/onAbort) — leaking them is a bug.
|
||||
// Docmost tools take precedence on a name clash (external are namespaced, so
|
||||
// a clash is not expected; the spread order makes intent explicit).
|
||||
let external: Awaited<ReturnType<McpClientsService['toolsFor']>> = {
|
||||
tools: {},
|
||||
clients: [],
|
||||
outcomes: [],
|
||||
};
|
||||
try {
|
||||
external = await this.mcpClients.toolsFor(workspace.id);
|
||||
} catch (err) {
|
||||
// Building the external toolset must never break the turn; proceed with
|
||||
// Docmost-only tools. Never log URLs/headers — short message only.
|
||||
this.logger.warn(
|
||||
`External MCP toolset unavailable: ${
|
||||
err instanceof Error ? err.message : 'unknown error'
|
||||
}`,
|
||||
);
|
||||
}
|
||||
const tools = { ...external.tools, ...docmostTools };
|
||||
|
||||
// Close every external client EXACTLY ONCE across the turn's terminal
|
||||
// callbacks (onFinish/onError/onAbort all fire at most once collectively,
|
||||
// but guard anyway). Close errors are swallowed so they never break the
|
||||
// response.
|
||||
let clientsClosed = false;
|
||||
const closeExternalClients = async (): Promise<void> => {
|
||||
if (clientsClosed) return;
|
||||
clientsClosed = true;
|
||||
await Promise.all(
|
||||
external.clients.map((c) =>
|
||||
c.close().catch((closeErr) => {
|
||||
this.logger.warn(
|
||||
`Failed to close external MCP client: ${
|
||||
closeErr instanceof Error ? closeErr.message : 'unknown error'
|
||||
}`,
|
||||
);
|
||||
}),
|
||||
),
|
||||
);
|
||||
};
|
||||
|
||||
// Persist the assistant message. Used by onFinish (full result) and the
|
||||
// abort/error paths (partial result). Guarded so we persist at most once.
|
||||
let persisted = false;
|
||||
@@ -175,16 +222,25 @@ export class AiChatService {
|
||||
}
|
||||
};
|
||||
|
||||
// NOTE: streamText is synchronous in v6 — do NOT await it.
|
||||
const result = streamText({
|
||||
// NOTE: streamText is synchronous in v6 — do NOT await it. A synchronous
|
||||
// failure here (or in pipe below) would skip the terminal callbacks, so the
|
||||
// catch releases the leased external clients to avoid a connection leak.
|
||||
let result: ReturnType<typeof streamText>;
|
||||
try {
|
||||
result = streamText({
|
||||
model,
|
||||
system,
|
||||
messages,
|
||||
tools,
|
||||
// No maxOutputTokens cap on the agent: tool-call arguments (e.g. a full
|
||||
// page body for the write tools) are emitted as OUTPUT tokens, so a fixed
|
||||
// cap would truncate complex tool calls mid-argument. Let the model use its
|
||||
// natural per-step budget. (Cost/credit limits are an account concern, not
|
||||
// something to enforce by silently breaking the agent.)
|
||||
stopWhen: stepCountIs(8),
|
||||
abortSignal: signal,
|
||||
onFinish: ({ text, finishReason, totalUsage, steps }) => {
|
||||
return persistAssistant({
|
||||
onFinish: async ({ text, finishReason, totalUsage, steps }) => {
|
||||
await persistAssistant({
|
||||
text,
|
||||
toolCalls: serializeSteps(steps),
|
||||
metadata: {
|
||||
@@ -196,21 +252,36 @@ export class AiChatService {
|
||||
parts: assistantParts(steps, text),
|
||||
},
|
||||
});
|
||||
// Lifecycle: release the external MCP clients leased for this turn.
|
||||
await closeExternalClients();
|
||||
},
|
||||
onError: ({ error }) => {
|
||||
this.logger.error('AI chat stream error', error as Error);
|
||||
// Persist whatever text we have (likely empty) so the turn is recorded.
|
||||
return persistAssistant({
|
||||
onError: async ({ error }) => {
|
||||
// NestJS Logger.error(message, stack?, context?): pass the real message
|
||||
// (with statusCode when present) + the stack string, not the Error
|
||||
// object, so the actual provider cause is clearly logged.
|
||||
const e = error as {
|
||||
statusCode?: number;
|
||||
message?: string;
|
||||
stack?: string;
|
||||
};
|
||||
const errorText = e?.statusCode
|
||||
? `${e.statusCode}: ${e.message ?? String(error)}`
|
||||
: (e?.message ?? String(error));
|
||||
this.logger.error(`AI chat stream error: ${errorText}`, e?.stack);
|
||||
// Persist whatever text we have (likely empty) so the turn is recorded,
|
||||
// and record the error text in metadata so it is visible in history.
|
||||
await persistAssistant({
|
||||
text: '',
|
||||
toolCalls: null,
|
||||
metadata: { finishReason: 'error', parts: [] },
|
||||
metadata: { finishReason: 'error', parts: [], error: errorText },
|
||||
});
|
||||
await closeExternalClients();
|
||||
},
|
||||
onAbort: ({ steps }) => {
|
||||
onAbort: async ({ steps }) => {
|
||||
// Client disconnected / request aborted: persist the partial answer,
|
||||
// including any completed tool steps so the turn replays faithfully.
|
||||
const text = steps.map((s) => s.text ?? '').join('');
|
||||
return persistAssistant({
|
||||
await persistAssistant({
|
||||
text,
|
||||
toolCalls: serializeSteps(steps),
|
||||
metadata: {
|
||||
@@ -218,23 +289,42 @@ export class AiChatService {
|
||||
parts: assistantParts(steps, text),
|
||||
},
|
||||
});
|
||||
await closeExternalClients();
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
// Fire-and-forget async title generation for a freshly created chat. Never
|
||||
// block the stream on it; swallow any error.
|
||||
if (isNewChat && incomingText) {
|
||||
void this.generateTitle(chatId, workspace.id, incomingText).catch(
|
||||
(err) => {
|
||||
this.logger.warn(
|
||||
`Title generation failed: ${(err as Error)?.message ?? err}`,
|
||||
);
|
||||
// Fire-and-forget async title generation for a freshly created chat. Never
|
||||
// block the stream on it; swallow any error.
|
||||
if (isNewChat && incomingText) {
|
||||
void this.generateTitle(chatId, workspace.id, incomingText).catch(
|
||||
(err) => {
|
||||
this.logger.warn(
|
||||
`Title generation failed: ${(err as Error)?.message ?? err}`,
|
||||
);
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Stream the UI-message protocol straight to the hijacked Node response.
|
||||
// Without onError the AI SDK masks the cause ('An error occurred.') and the
|
||||
// UI shows a generic failure. Surface the real provider message instead.
|
||||
// AI SDK error messages / 4xx bodies never contain the API key, so this is
|
||||
// safe; we never dump the resolved config/apiKey.
|
||||
result.pipeUIMessageStreamToResponse(res.raw, {
|
||||
onError: (error: unknown) => {
|
||||
const e = error as { statusCode?: number; message?: string };
|
||||
return e?.statusCode
|
||||
? `${e.statusCode}: ${e.message}`
|
||||
: (e?.message ?? 'AI stream error');
|
||||
},
|
||||
);
|
||||
});
|
||||
} catch (err) {
|
||||
// Synchronous failure before/while wiring the stream: the terminal
|
||||
// callbacks will not run, so release the leased external clients here and
|
||||
// re-throw for the controller to surface on the socket.
|
||||
await closeExternalClients();
|
||||
throw err;
|
||||
}
|
||||
|
||||
// Stream the UI-message protocol straight to the hijacked Node response.
|
||||
result.pipeUIMessageStreamToResponse(res.raw);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user