Compare commits
15 Commits
fix/ai-str
...
feat/ai-ch
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aa7a115f66 | ||
|
|
ea61c96a7c | ||
|
|
ae6faf3abc | ||
|
|
e7b719bbb8 | ||
|
|
27c91e4a69 | ||
|
|
c3596dce68 | ||
|
|
b6787cc542 | ||
| 176b0f575f | |||
|
|
df81851eb3 | ||
|
|
4597183a1e | ||
|
|
5aa199660d | ||
|
|
bf2ebb9d47 | ||
|
|
ad90e2290e | ||
| e262f1695c | |||
|
|
91e7335d54 |
13
.env.example
13
.env.example
@@ -149,6 +149,19 @@ MCP_DOCMOST_PASSWORD=
|
|||||||
# your egress drops idle connections faster than ~10s. Default 10000 (10 s).
|
# your egress drops idle connections faster than ~10s. Default 10000 (10 s).
|
||||||
# AI_STREAM_KEEPALIVE_MS=10000
|
# AI_STREAM_KEEPALIVE_MS=10000
|
||||||
|
|
||||||
|
# Silence timeout (ms) for EXTERNAL-MCP transport ONLY (not the chat provider).
|
||||||
|
# Tighter than AI_STREAM_TIMEOUT_MS so a byte-silent/hung MCP server is broken in
|
||||||
|
# ~5 min instead of 15. Note it also cuts a legitimately long but byte-silent
|
||||||
|
# single tool call (a slow crawl that emits nothing until done) and an SSE
|
||||||
|
# transport idling >5 min BETWEEN tool calls. Default 300000 (5 min).
|
||||||
|
# AI_MCP_STREAM_TIMEOUT_MS=300000
|
||||||
|
|
||||||
|
# Total wall-clock cap (ms) for ONE external MCP tool call (app-level, not
|
||||||
|
# transport). Aborts a tool that keeps the socket warm (SSE heartbeats / trickle)
|
||||||
|
# but never returns a result — which the silence timeout above never breaks.
|
||||||
|
# Default 900000 (15 min).
|
||||||
|
# AI_MCP_CALL_TIMEOUT_MS=900000
|
||||||
|
|
||||||
# --- Anonymous public-share AI assistant ---
|
# --- Anonymous public-share AI assistant ---
|
||||||
# Opt-in per workspace (AI settings -> "public share assistant"; off by default).
|
# Opt-in per workspace (AI settings -> "public share assistant"; off by default).
|
||||||
# When enabled, anonymous visitors of a published share can ask an AI about that
|
# When enabled, anonymous visitors of a published share can ask an AI about that
|
||||||
|
|||||||
16
CHANGELOG.md
16
CHANGELOG.md
@@ -12,10 +12,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
||||||
|
- **Persistent AI-chat history as the source of truth + server-side export.**
|
||||||
|
An assistant turn is now persisted to the database step by step: the row is
|
||||||
|
inserted upfront as `streaming` and updated as each agent step finishes, then
|
||||||
|
finalized once to `completed`/`error`/`aborted`. A process that dies mid-turn
|
||||||
|
keeps every finished step, and a startup sweep flips any dangling `streaming`
|
||||||
|
row (untouched for 10 minutes) to `aborted`. Chat "Copy" now exports
|
||||||
|
server-side from these rows (`POST /ai-chat/export`) rather than from live
|
||||||
|
client state, so the export is identical whether a chat is freshly streaming,
|
||||||
|
just switched to, or reloaded — and is available from the first turn of a new
|
||||||
|
chat. (#183, #174)
|
||||||
|
|
||||||
- **AI-agent attribution for MCP writes.** Comments (and pages) created through
|
- **AI-agent attribution for MCP writes.** Comments (and pages) created through
|
||||||
the MCP endpoint by a dedicated agent account are now badged as "AI", with
|
the MCP endpoint by a dedicated agent account are now badged as "AI", with
|
||||||
unspoofable provenance derived from a per-user `is_agent` flag (not from the
|
unspoofable provenance derived from a per-user `is_agent` flag (not from the
|
||||||
request body). **Operator setup:** use a *dedicated* service account for the
|
request body). **Operator setup:** use a _dedicated_ service account for the
|
||||||
MCP fallback and set the flag with SQL —
|
MCP fallback and set the flag with SQL —
|
||||||
`UPDATE users SET is_agent = true WHERE email = '<mcp-account>'`. Never flag a
|
`UPDATE users SET is_agent = true WHERE email = '<mcp-account>'`. Never flag a
|
||||||
human or shared account, or its normal edits get mis-attributed as AI. See the
|
human or shared account, or its normal edits get mis-attributed as AI. See the
|
||||||
@@ -150,8 +161,7 @@ embeds — plus a large batch of security hardening and test coverage.
|
|||||||
- Page templates: import `ThrottleModule` so collab boots, never strand an
|
- Page templates: import `ThrottleModule` so collab boots, never strand an
|
||||||
in-flight page-embed id, and add defense-in-depth workspace checks.
|
in-flight page-embed id, and add defense-in-depth workspace checks.
|
||||||
- Pages: `movePage` cycle guard with no phantom `PAGE_MOVED` event.
|
- Pages: `movePage` cycle guard with no phantom `PAGE_MOVED` event.
|
||||||
- Import: surface the real error cause from `/pages/import` instead of a generic
|
- Import: surface the real error cause from `/pages/import` instead of a generic 400.
|
||||||
400.
|
|
||||||
|
|
||||||
### Security
|
### Security
|
||||||
|
|
||||||
|
|||||||
@@ -258,6 +258,7 @@
|
|||||||
"Copy to space": "Copy to space",
|
"Copy to space": "Copy to space",
|
||||||
"Copy chat": "Copy chat",
|
"Copy chat": "Copy chat",
|
||||||
"Copied": "Copied",
|
"Copied": "Copied",
|
||||||
|
"Failed to export chat": "Failed to export chat",
|
||||||
"Duplicate": "Duplicate",
|
"Duplicate": "Duplicate",
|
||||||
"Select a user": "Select a user",
|
"Select a user": "Select a user",
|
||||||
"Select a group": "Select a group",
|
"Select a group": "Select a group",
|
||||||
|
|||||||
@@ -257,6 +257,7 @@
|
|||||||
"Copy": "Копировать",
|
"Copy": "Копировать",
|
||||||
"Copy to space": "Копировать в пространство",
|
"Copy to space": "Копировать в пространство",
|
||||||
"Copied": "Скопировано",
|
"Copied": "Скопировано",
|
||||||
|
"Failed to export chat": "Не удалось экспортировать чат",
|
||||||
"Duplicate": "Дублировать",
|
"Duplicate": "Дублировать",
|
||||||
"Select a user": "Выберите пользователя",
|
"Select a user": "Выберите пользователя",
|
||||||
"Select a group": "Выберите группу",
|
"Select a group": "Выберите группу",
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import {
|
|||||||
useRef,
|
useRef,
|
||||||
useState,
|
useState,
|
||||||
} from "react";
|
} from "react";
|
||||||
import { type UIMessage } from "@ai-sdk/react";
|
|
||||||
import { Group, Loader, Tooltip } from "@mantine/core";
|
import { Group, Loader, Tooltip } from "@mantine/core";
|
||||||
import {
|
import {
|
||||||
IconArrowsDiagonal,
|
IconArrowsDiagonal,
|
||||||
@@ -40,7 +39,7 @@ import {
|
|||||||
} from "@/features/ai-chat/queries/ai-chat-query.ts";
|
} from "@/features/ai-chat/queries/ai-chat-query.ts";
|
||||||
import ConversationList from "@/features/ai-chat/components/conversation-list.tsx";
|
import ConversationList from "@/features/ai-chat/components/conversation-list.tsx";
|
||||||
import ChatThread from "@/features/ai-chat/components/chat-thread.tsx";
|
import ChatThread from "@/features/ai-chat/components/chat-thread.tsx";
|
||||||
import { buildChatMarkdown } from "@/features/ai-chat/utils/chat-markdown.ts";
|
import { exportAiChat } from "@/features/ai-chat/services/ai-chat-service.ts";
|
||||||
import { useChatSession } from "@/features/ai-chat/hooks/use-chat-session.ts";
|
import { useChatSession } from "@/features/ai-chat/hooks/use-chat-session.ts";
|
||||||
import {
|
import {
|
||||||
shouldCollapseOnOutsidePointer,
|
shouldCollapseOnOutsidePointer,
|
||||||
@@ -80,17 +79,31 @@ function computeInitialGeom() {
|
|||||||
Math.min(DEFAULT_HEIGHT, window.innerHeight - 2 * EDGE_MARGIN),
|
Math.min(DEFAULT_HEIGHT, window.innerHeight - 2 * EDGE_MARGIN),
|
||||||
);
|
);
|
||||||
const left = Math.max(EDGE_MARGIN, window.innerWidth - width - 24);
|
const left = Math.max(EDGE_MARGIN, window.innerWidth - width - 24);
|
||||||
const maxTop = Math.max(EDGE_MARGIN, window.innerHeight - height - EDGE_MARGIN);
|
const maxTop = Math.max(
|
||||||
|
EDGE_MARGIN,
|
||||||
|
window.innerHeight - height - EDGE_MARGIN,
|
||||||
|
);
|
||||||
const top = Math.min(60, maxTop);
|
const top = Math.min(60, maxTop);
|
||||||
return { left, top, width, height };
|
return { left, top, width, height };
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clamp a geometry so the window stays within the current viewport.
|
// Clamp a geometry so the window stays within the current viewport.
|
||||||
function clampGeom(g: { left: number; top: number; width: number; height: number }) {
|
function clampGeom(g: {
|
||||||
|
left: number;
|
||||||
|
top: number;
|
||||||
|
width: number;
|
||||||
|
height: number;
|
||||||
|
}) {
|
||||||
const effWidth = Math.max(g.width, MIN_WIDTH);
|
const effWidth = Math.max(g.width, MIN_WIDTH);
|
||||||
const effHeight = Math.max(g.height, MIN_HEIGHT);
|
const effHeight = Math.max(g.height, MIN_HEIGHT);
|
||||||
const maxLeft = Math.max(EDGE_MARGIN, window.innerWidth - effWidth - EDGE_MARGIN);
|
const maxLeft = Math.max(
|
||||||
const maxTop = Math.max(EDGE_MARGIN, window.innerHeight - effHeight - EDGE_MARGIN);
|
EDGE_MARGIN,
|
||||||
|
window.innerWidth - effWidth - EDGE_MARGIN,
|
||||||
|
);
|
||||||
|
const maxTop = Math.max(
|
||||||
|
EDGE_MARGIN,
|
||||||
|
window.innerHeight - effHeight - EDGE_MARGIN,
|
||||||
|
);
|
||||||
return {
|
return {
|
||||||
...g,
|
...g,
|
||||||
left: Math.min(Math.max(EDGE_MARGIN, g.left), maxLeft),
|
left: Math.min(Math.max(EDGE_MARGIN, g.left), maxLeft),
|
||||||
@@ -107,7 +120,7 @@ function clampGeom(g: { left: number; top: number; width: number; height: number
|
|||||||
* ported from the GitmostAgent.jsx design.
|
* ported from the GitmostAgent.jsx design.
|
||||||
*/
|
*/
|
||||||
export default function AiChatWindow() {
|
export default function AiChatWindow() {
|
||||||
const { t } = useTranslation();
|
const { t, i18n } = useTranslation();
|
||||||
const clipboard = useClipboard({ timeout: 500 });
|
const clipboard = useClipboard({ timeout: 500 });
|
||||||
const queryClient = useQueryClient();
|
const queryClient = useQueryClient();
|
||||||
const [windowOpen, setWindowOpen] = useAtom(aiChatWindowOpenAtom);
|
const [windowOpen, setWindowOpen] = useAtom(aiChatWindowOpenAtom);
|
||||||
@@ -148,14 +161,6 @@ export default function AiChatWindow() {
|
|||||||
const { data: messageRows, isLoading: messagesLoading } =
|
const { data: messageRows, isLoading: messagesLoading } =
|
||||||
useAiChatMessagesQuery(activeChatId ?? undefined);
|
useAiChatMessagesQuery(activeChatId ?? undefined);
|
||||||
|
|
||||||
// Live snapshot of the active thread's useChat state, kept up to date by
|
|
||||||
// ChatThread. Lets the export include the in-progress (not-yet-persisted)
|
|
||||||
// streaming turn. A ref avoids re-rendering this window on every token.
|
|
||||||
const liveThreadRef = useRef<{ messages: UIMessage[]; isStreaming: boolean }>({
|
|
||||||
messages: [],
|
|
||||||
isStreaming: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
// Live turn-token total (reasoning + output) for the in-flight turn, pushed up
|
// Live turn-token total (reasoning + output) for the in-flight turn, pushed up
|
||||||
// (THROTTLED to ~8 Hz inside ChatThread) so the header badge ticks mid-stream.
|
// (THROTTLED to ~8 Hz inside ChatThread) so the header badge ticks mid-stream.
|
||||||
// `null` means no turn is in flight -> the badge falls back to the persisted
|
// `null` means no turn is in flight -> the badge falls back to the persisted
|
||||||
@@ -185,8 +190,13 @@ export default function AiChatWindow() {
|
|||||||
// The invalidate closures are passed inline: `onTurnFinished` is read live by
|
// The invalidate closures are passed inline: `onTurnFinished` is read live by
|
||||||
// useChat's onFinish (never in an effect dep array), so their identity does not
|
// useChat's onFinish (never in an effect dep array), so their identity does not
|
||||||
// matter — no memoization ceremony needed.
|
// matter — no memoization ceremony needed.
|
||||||
const { threadKey, waitingForHistory, onTurnFinished, cancelPendingAdoption } =
|
const {
|
||||||
useChatSession({
|
threadKey,
|
||||||
|
waitingForHistory,
|
||||||
|
onTurnFinished,
|
||||||
|
onServerChatId,
|
||||||
|
cancelPendingAdoption,
|
||||||
|
} = useChatSession({
|
||||||
activeChatId,
|
activeChatId,
|
||||||
setActiveChatId,
|
setActiveChatId,
|
||||||
chats,
|
chats,
|
||||||
@@ -225,19 +235,28 @@ export default function AiChatWindow() {
|
|||||||
[cancelPendingAdoption, setActiveChatId, setDraft, setSelectedRoleId],
|
[cancelPendingAdoption, setActiveChatId, setDraft, setSelectedRoleId],
|
||||||
);
|
);
|
||||||
|
|
||||||
// The active chat object (for its title) and an export gate: only enable the
|
// The active chat object (for its title) and an export gate. The export is now
|
||||||
// export button when an existing chat with loaded persisted rows is active.
|
// SERVER-sourced (the DB is the single source of truth — #183): the assistant
|
||||||
|
// row is persisted upfront + per step, so even a brand-new chat whose first
|
||||||
|
// turn is streaming/interrupted has a server row to render. Enable the button
|
||||||
|
// whenever a persisted chat is active (`activeChatId` is set). For a BRAND-NEW
|
||||||
|
// chat that id is adopted EARLY — at the stream's `start` chunk via
|
||||||
|
// onServerChatId (#174) — so the Copy button is available during the first
|
||||||
|
// turn's stream, not only after it terminates.
|
||||||
const activeChat = useMemo(
|
const activeChat = useMemo(
|
||||||
() => chats?.items?.find((c) => c.id === activeChatId) ?? null,
|
() => chats?.items?.find((c) => c.id === activeChatId) ?? null,
|
||||||
[chats, activeChatId],
|
[chats, activeChatId],
|
||||||
);
|
);
|
||||||
const canExport = !!activeChatId && !!messageRows && messageRows.length > 0;
|
const canExport = !!activeChatId;
|
||||||
|
|
||||||
// The role to display in the header and as the assistant's name. Prefer the
|
// The role to display in the header and as the assistant's name. Prefer the
|
||||||
// persisted role of an existing chat (chat-list JOIN); fall back to the role
|
// persisted role of an existing chat (chat-list JOIN); fall back to the role
|
||||||
// picked via a card click for a brand-new or just-adopted chat. selectChat
|
// picked via a card click for a brand-new or just-adopted chat. selectChat
|
||||||
// resets selectedRoleId, so this fallback never leaks into an unrelated chat.
|
// resets selectedRoleId, so this fallback never leaks into an unrelated chat.
|
||||||
const currentRole = useMemo<{ name: string; emoji: string | null } | null>(() => {
|
const currentRole = useMemo<{
|
||||||
|
name: string;
|
||||||
|
emoji: string | null;
|
||||||
|
} | null>(() => {
|
||||||
if (activeChat?.roleName) {
|
if (activeChat?.roleName) {
|
||||||
return { name: activeChat.roleName, emoji: activeChat.roleEmoji ?? null };
|
return { name: activeChat.roleName, emoji: activeChat.roleEmoji ?? null };
|
||||||
}
|
}
|
||||||
@@ -245,37 +264,21 @@ export default function AiChatWindow() {
|
|||||||
return picked ? { name: picked.name, emoji: picked.emoji } : null;
|
return picked ? { name: picked.name, emoji: picked.emoji } : null;
|
||||||
}, [activeChat, enabledRoles, selectedRoleId]);
|
}, [activeChat, enabledRoles, selectedRoleId]);
|
||||||
|
|
||||||
// Build a Markdown export from the already-loaded persisted rows (no network
|
// Fetch the server-rendered Markdown export and copy it to the clipboard. The
|
||||||
// call) and copy it to the clipboard. The "Copied" notification is the
|
// server is the single source of truth (#183): it renders the transcript from
|
||||||
// feedback.
|
// the persisted rows — including an interrupted turn's in-progress row — so the
|
||||||
const handleCopy = useCallback(() => {
|
// export is identical whether the chat is freshly streaming, just switched to,
|
||||||
if (!activeChatId || !messageRows || messageRows.length === 0) return;
|
// or reloaded. The `lang` of the active i18n drives the few localized labels.
|
||||||
// While the active thread is streaming, the current user message and the
|
const handleCopy = useCallback(async () => {
|
||||||
// in-progress assistant reply are NOT yet in messageRows (the persisted
|
if (!activeChatId) return;
|
||||||
// query is only refetched after the turn finishes). Pull the live tail —
|
try {
|
||||||
// messages whose id is not among the persisted rows — and append them,
|
const markdown = await exportAiChat(activeChatId, i18n.language);
|
||||||
// flagging the streaming assistant message as still generating.
|
|
||||||
const live = liveThreadRef.current;
|
|
||||||
const rowIds = new Set(messageRows.map((r) => r.id));
|
|
||||||
const pending = live.isStreaming
|
|
||||||
? live.messages
|
|
||||||
.filter((m) => !rowIds.has(m.id))
|
|
||||||
.map((m) => ({
|
|
||||||
role: m.role,
|
|
||||||
parts: (m.parts ?? []) as { type: string; text?: string }[],
|
|
||||||
generating: m.role === "assistant",
|
|
||||||
}))
|
|
||||||
: [];
|
|
||||||
const markdown = buildChatMarkdown({
|
|
||||||
title: activeChat?.title ?? null,
|
|
||||||
chatId: activeChatId,
|
|
||||||
rows: messageRows,
|
|
||||||
pending,
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
clipboard.copy(markdown);
|
clipboard.copy(markdown);
|
||||||
notifications.show({ message: t("Copied") });
|
notifications.show({ message: t("Copied") });
|
||||||
}, [activeChatId, messageRows, activeChat, clipboard, t]);
|
} catch {
|
||||||
|
notifications.show({ message: t("Failed to export chat"), color: "red" });
|
||||||
|
}
|
||||||
|
}, [activeChatId, clipboard, t, i18n.language]);
|
||||||
|
|
||||||
// Current context size for the active chat: how much the conversation now
|
// Current context size for the active chat: how much the conversation now
|
||||||
// occupies in the model's context window — NOT the cumulative tokens spent.
|
// occupies in the model's context window — NOT the cumulative tokens spent.
|
||||||
@@ -351,7 +354,8 @@ export default function AiChatWindow() {
|
|||||||
const width = el.offsetWidth;
|
const width = el.offsetWidth;
|
||||||
const height = el.offsetHeight;
|
const height = el.offsetHeight;
|
||||||
setGeom((prev) => {
|
setGeom((prev) => {
|
||||||
if (!prev || (prev.width === width && prev.height === height)) return prev;
|
if (!prev || (prev.width === width && prev.height === height))
|
||||||
|
return prev;
|
||||||
return { ...prev, width, height };
|
return { ...prev, width, height };
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@@ -497,11 +501,15 @@ export default function AiChatWindow() {
|
|||||||
flash a "0" badge before any token streams in (#151 review). */}
|
flash a "0" badge before any token streams in (#151 review). */}
|
||||||
{liveTurnTokens !== null && liveTurnTokens > 0 ? (
|
{liveTurnTokens !== null && liveTurnTokens > 0 ? (
|
||||||
<Tooltip label={t("Tokens generated this turn")} withArrow>
|
<Tooltip label={t("Tokens generated this turn")} withArrow>
|
||||||
<span className={classes.badge}>{formatTokens(liveTurnTokens)}</span>
|
<span className={classes.badge}>
|
||||||
|
{formatTokens(liveTurnTokens)}
|
||||||
|
</span>
|
||||||
</Tooltip>
|
</Tooltip>
|
||||||
) : contextTokens > 0 ? (
|
) : contextTokens > 0 ? (
|
||||||
<Tooltip label={t("Current context size")} withArrow>
|
<Tooltip label={t("Current context size")} withArrow>
|
||||||
<span className={classes.badge}>{formatTokens(contextTokens)}</span>
|
<span className={classes.badge}>
|
||||||
|
{formatTokens(contextTokens)}
|
||||||
|
</span>
|
||||||
</Tooltip>
|
</Tooltip>
|
||||||
) : null}
|
) : null}
|
||||||
</div>
|
</div>
|
||||||
@@ -515,7 +523,11 @@ export default function AiChatWindow() {
|
|||||||
aria-label={t("Copy chat")}
|
aria-label={t("Copy chat")}
|
||||||
onClick={handleCopy}
|
onClick={handleCopy}
|
||||||
>
|
>
|
||||||
{clipboard.copied ? <IconCheck size={14} /> : <IconCopy size={14} />}
|
{clipboard.copied ? (
|
||||||
|
<IconCheck size={14} />
|
||||||
|
) : (
|
||||||
|
<IconCopy size={14} />
|
||||||
|
)}
|
||||||
</button>
|
</button>
|
||||||
)}
|
)}
|
||||||
<button
|
<button
|
||||||
@@ -621,7 +633,7 @@ export default function AiChatWindow() {
|
|||||||
onRolePicked={(role) => setSelectedRoleId(role.id)}
|
onRolePicked={(role) => setSelectedRoleId(role.id)}
|
||||||
assistantName={currentRole?.name}
|
assistantName={currentRole?.name}
|
||||||
onTurnFinished={onTurnFinished}
|
onTurnFinished={onTurnFinished}
|
||||||
liveStateRef={liveThreadRef}
|
onServerChatId={onServerChatId}
|
||||||
onLiveTurnTokens={setLiveTurnTokens}
|
onLiveTurnTokens={setLiveTurnTokens}
|
||||||
/>
|
/>
|
||||||
)}
|
)}
|
||||||
|
|||||||
@@ -1,11 +1,4 @@
|
|||||||
import {
|
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
|
||||||
useCallback,
|
|
||||||
useEffect,
|
|
||||||
useMemo,
|
|
||||||
useRef,
|
|
||||||
useState,
|
|
||||||
type MutableRefObject,
|
|
||||||
} from "react";
|
|
||||||
import { generateId } from "ai";
|
import { generateId } from "ai";
|
||||||
import { ActionIcon, Box, Group, Stack, Text } from "@mantine/core";
|
import { ActionIcon, Box, Group, Stack, Text } from "@mantine/core";
|
||||||
import { IconClockHour4, IconX } from "@tabler/icons-react";
|
import { IconClockHour4, IconX } from "@tabler/icons-react";
|
||||||
@@ -68,12 +61,12 @@ interface ChatThreadProps {
|
|||||||
* authoritative id the server streamed on the assistant message metadata, or
|
* authoritative id the server streamed on the assistant message metadata, or
|
||||||
* undefined on a failed turn — see adopt-chat-id.ts for the full #137 design. */
|
* undefined on a failed turn — see adopt-chat-id.ts for the full #137 design. */
|
||||||
onTurnFinished: (serverChatId?: string) => void;
|
onTurnFinished: (serverChatId?: string) => void;
|
||||||
/** Parent-owned ref that this thread keeps updated with its live useChat
|
/** Called EARLY (at the stream's `start` chunk) with the authoritative server
|
||||||
* snapshot (full message list + streaming flag), so the header's
|
* chat id streamed on the assistant message metadata, so a brand-new chat
|
||||||
* "Copy chat" export can include the in-progress, not-yet-persisted
|
* adopts its real id WHILE the first turn is still streaming (#174 — makes the
|
||||||
* assistant message. A ref (not state) avoids re-rendering the parent on
|
* Copy/export button available mid-stream). Distinct from onTurnFinished,
|
||||||
* every streamed delta. */
|
* which fires only at the terminal outcome. */
|
||||||
liveStateRef?: MutableRefObject<{ messages: UIMessage[]; isStreaming: boolean }>;
|
onServerChatId?: (serverChatId?: string) => void;
|
||||||
/** Reports the live turn-token total (reasoning + output) for the in-flight
|
/** Reports the live turn-token total (reasoning + output) for the in-flight
|
||||||
* turn so the parent can show a header badge that ticks mid-stream. THROTTLED
|
* turn so the parent can show a header badge that ticks mid-stream. THROTTLED
|
||||||
* here (~8 Hz) so the parent re-renders a handful of times a second, not on
|
* here (~8 Hz) so the parent re-renders a handful of times a second, not on
|
||||||
@@ -123,7 +116,7 @@ export default function ChatThread({
|
|||||||
onRolePicked,
|
onRolePicked,
|
||||||
assistantName,
|
assistantName,
|
||||||
onTurnFinished,
|
onTurnFinished,
|
||||||
liveStateRef,
|
onServerChatId,
|
||||||
onLiveTurnTokens,
|
onLiveTurnTokens,
|
||||||
}: ChatThreadProps) {
|
}: ChatThreadProps) {
|
||||||
const { t } = useTranslation();
|
const { t } = useTranslation();
|
||||||
@@ -293,6 +286,26 @@ export default function ChatThread({
|
|||||||
// Keep the flush helper pointed at the latest sendMessage instance.
|
// Keep the flush helper pointed at the latest sendMessage instance.
|
||||||
sendMessageRef.current = sendMessage;
|
sendMessageRef.current = sendMessage;
|
||||||
|
|
||||||
|
// EARLY chat-id adoption (#174): the server streams the authoritative chat id
|
||||||
|
// on the assistant message metadata at the `start` chunk (message.metadata.
|
||||||
|
// chatId — see adopt-chat-id.ts / chatStreamMetadata). Forward it to the parent
|
||||||
|
// AS SOON AS it appears (mid-stream), so a brand-new chat adopts its real id
|
||||||
|
// WHILE the first turn is still streaming and activeChatId-gated affordances
|
||||||
|
// (the Copy/export button) light up immediately, instead of only at onFinish.
|
||||||
|
// Keyed by the last-seen id so we forward each distinct id exactly once. The
|
||||||
|
// parent's onServerChatId is idempotent and a no-op once the chat has an id.
|
||||||
|
const lastForwardedChatIdRef = useRef<string | undefined>(undefined);
|
||||||
|
useEffect(() => {
|
||||||
|
if (!onServerChatId) return;
|
||||||
|
const tail = messages[messages.length - 1];
|
||||||
|
if (tail?.role !== "assistant") return;
|
||||||
|
const serverChatId = extractServerChatId(tail);
|
||||||
|
if (!serverChatId || serverChatId === lastForwardedChatIdRef.current)
|
||||||
|
return;
|
||||||
|
lastForwardedChatIdRef.current = serverChatId;
|
||||||
|
onServerChatId(serverChatId);
|
||||||
|
}, [messages, onServerChatId]);
|
||||||
|
|
||||||
// Live "turn was interrupted" marker for the CURRENT session. The red error
|
// Live "turn was interrupted" marker for the CURRENT session. The red error
|
||||||
// banner (driven by `error`) covers the error case; this covers an aborted
|
// banner (driven by `error`) covers the error case; this covers an aborted
|
||||||
// turn, distinguishing a manual Stop (`isAbort`) from a dropped connection
|
// turn, distinguishing a manual Stop (`isAbort`) from a dropped connection
|
||||||
@@ -309,18 +322,11 @@ export default function ChatThread({
|
|||||||
if (isStreaming) setStopNotice(null);
|
if (isStreaming) setStopNotice(null);
|
||||||
}, [isStreaming]);
|
}, [isStreaming]);
|
||||||
|
|
||||||
// Mirror the live useChat snapshot into the parent-owned ref so the export
|
// Classify the turn error into a heading + detail so the banner names the cause
|
||||||
// (handled in AiChatWindow) can include the in-progress streaming turn. The
|
// (connection reset, timeout, rate limit, context overflow, quota, ...) instead
|
||||||
// cleanup clears the ref on unmount so a thread torn down by `key` on chat
|
// of a generic "Something went wrong". Computed here (not only in the JSX) so
|
||||||
// switch can't leak its (possibly still-streaming) tail into the next chat's
|
// the SAME on-screen banner text can be mirrored into the export (issue #160).
|
||||||
// export before the new thread's effect repopulates the ref.
|
const errorView = error ? describeChatError(error.message ?? "", t) : null;
|
||||||
useEffect(() => {
|
|
||||||
if (!liveStateRef) return;
|
|
||||||
liveStateRef.current = { messages, isStreaming };
|
|
||||||
return () => {
|
|
||||||
liveStateRef.current = { messages: [], isStreaming: false };
|
|
||||||
};
|
|
||||||
}, [liveStateRef, messages, isStreaming]);
|
|
||||||
|
|
||||||
// Report the live turn-token total to the parent header badge, THROTTLED to
|
// Report the live turn-token total to the parent header badge, THROTTLED to
|
||||||
// ~8 Hz so the parent re-renders a few times a second instead of on every
|
// ~8 Hz so the parent re-renders a few times a second instead of on every
|
||||||
@@ -343,8 +349,7 @@ export default function ChatThread({
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const tail = messages[messages.length - 1];
|
const tail = messages[messages.length - 1];
|
||||||
const live =
|
const live = tail?.role === "assistant" ? liveTurnTokens(tail) : null;
|
||||||
tail?.role === "assistant" ? liveTurnTokens(tail) : null;
|
|
||||||
const total = live ? live.reasoning + live.output : 0;
|
const total = live ? live.reasoning + live.output : 0;
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const MIN_INTERVAL = 120; // ms (~8 Hz)
|
const MIN_INTERVAL = 120; // ms (~8 Hz)
|
||||||
@@ -370,11 +375,6 @@ export default function ChatThread({
|
|||||||
};
|
};
|
||||||
}, []);
|
}, []);
|
||||||
|
|
||||||
// Classify the turn error into a heading + detail so the banner names the cause
|
|
||||||
// (connection reset, timeout, rate limit, context overflow, quota, ...) instead
|
|
||||||
// of a generic "Something went wrong".
|
|
||||||
const errorView = error ? describeChatError(error.message ?? "", t) : null;
|
|
||||||
|
|
||||||
// A role was picked with autoStart=false: the role is bound but NOTHING was
|
// A role was picked with autoStart=false: the role is bound but NOTHING was
|
||||||
// sent, so chatId stays null and the empty state would keep showing the cards.
|
// sent, so chatId stays null and the empty state would keep showing the cards.
|
||||||
// This flag hides the cards and reveals the composer (with the role indicated)
|
// This flag hides the cards and reveals the composer (with the role indicated)
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import MessageItem from "@/features/ai-chat/components/message-item.tsx";
|
|||||||
import TypingIndicator from "@/features/ai-chat/components/typing-indicator.tsx";
|
import TypingIndicator from "@/features/ai-chat/components/typing-indicator.tsx";
|
||||||
import { isToolPart, toolRunState, ToolUiPart } from "@/features/ai-chat/utils/tool-parts.tsx";
|
import { isToolPart, toolRunState, ToolUiPart } from "@/features/ai-chat/utils/tool-parts.tsx";
|
||||||
import { assistantMessageHasVisibleContent } from "@/features/ai-chat/utils/message-content.ts";
|
import { assistantMessageHasVisibleContent } from "@/features/ai-chat/utils/message-content.ts";
|
||||||
import { liveTurnTokens } from "@/features/ai-chat/utils/count-stream-tokens.ts";
|
|
||||||
import classes from "@/features/ai-chat/components/ai-chat.module.css";
|
import classes from "@/features/ai-chat/components/ai-chat.module.css";
|
||||||
|
|
||||||
interface MessageListProps {
|
interface MessageListProps {
|
||||||
@@ -51,7 +50,9 @@ const BOTTOM_THRESHOLD = 40;
|
|||||||
* assistant message's LAST part is not live output:
|
* assistant message's LAST part is not live output:
|
||||||
* - the last message is still the user's (assistant hasn't started a row), or
|
* - the last message is still the user's (assistant hasn't started a row), or
|
||||||
* - the assistant row has no parts yet, or
|
* - the assistant row has no parts yet, or
|
||||||
* - its last part is an empty/whitespace text part, or
|
* - its last part is an empty/whitespace text part, or a finished ("done")
|
||||||
|
* text part while the turn continues (the model paused after some narration
|
||||||
|
* and is thinking about its next step), or
|
||||||
* - its last part is a finished/errored tool (the model is thinking about the
|
* - its last part is a finished/errored tool (the model is thinking about the
|
||||||
* next step between tool calls).
|
* next step between tool calls).
|
||||||
* It hides only while output is actively rendering: a non-empty streaming text
|
* It hides only while output is actively rendering: a non-empty streaming text
|
||||||
@@ -65,7 +66,19 @@ export function showTypingIndicator(messages: UIMessage[], isStreaming: boolean)
|
|||||||
const lastPart = last.parts[last.parts.length - 1];
|
const lastPart = last.parts[last.parts.length - 1];
|
||||||
if (!lastPart) return true; // assistant row exists but has no parts yet.
|
if (!lastPart) return true; // assistant row exists but has no parts yet.
|
||||||
// The answer text is actively streaming in -> MessageItem renders it; no dots.
|
// The answer text is actively streaming in -> MessageItem renders it; no dots.
|
||||||
if (lastPart.type === "text" && lastPart.text.trim().length > 0) return false;
|
// Only while it is STILL streaming, though: once a non-empty text part is
|
||||||
|
// finalized ("done") but the turn is still in flight, the model has paused
|
||||||
|
// after some narration and is working on its next step (e.g. about to call a
|
||||||
|
// tool) — nothing is visibly progressing, so the dots must show. A text part
|
||||||
|
// without a `state` is treated as still-rendering (kept suppressed); this
|
||||||
|
// branch only runs while streaming, where live parts always carry a state.
|
||||||
|
if (
|
||||||
|
lastPart.type === "text" &&
|
||||||
|
lastPart.text.trim().length > 0 &&
|
||||||
|
(lastPart as { state?: "streaming" | "done" }).state !== "done"
|
||||||
|
) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
// A tool still in flight shows its own Loader in ToolCallCard -> no dots.
|
// A tool still in flight shows its own Loader in ToolCallCard -> no dots.
|
||||||
if (
|
if (
|
||||||
isToolPart(lastPart.type) &&
|
isToolPart(lastPart.type) &&
|
||||||
@@ -95,19 +108,6 @@ export function typingIndicatorShowsName(messages: UIMessage[]): boolean {
|
|||||||
return !assistantMessageHasVisibleContent(last);
|
return !assistantMessageHasVisibleContent(last);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* The live thinking-token count to show on the standalone typing indicator. It
|
|
||||||
* is the reasoning split of the tail assistant message (estimate while streaming,
|
|
||||||
* authoritative once the server attaches usage at a step/turn boundary). Returns
|
|
||||||
* 0 when the turn has produced no reasoning yet — the indicator then shows the
|
|
||||||
* plain "Thinking…" line.
|
|
||||||
*/
|
|
||||||
export function tailThinkingTokens(messages: UIMessage[]): number {
|
|
||||||
const last = messages[messages.length - 1];
|
|
||||||
if (!last || last.role !== "assistant") return 0;
|
|
||||||
return liveTurnTokens(last).reasoning;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scrollable transcript. Auto-scrolls to the newest message as it streams in,
|
* Scrollable transcript. Auto-scrolls to the newest message as it streams in,
|
||||||
* but only while the user is pinned to the bottom — if they scrolled up to read
|
* but only while the user is pinned to the bottom — if they scrolled up to read
|
||||||
@@ -208,7 +208,6 @@ export default function MessageList({
|
|||||||
<TypingIndicator
|
<TypingIndicator
|
||||||
assistantName={assistantName}
|
assistantName={assistantName}
|
||||||
showName={typingIndicatorShowsName(messages)}
|
showName={typingIndicatorShowsName(messages)}
|
||||||
thinkingTokens={tailThinkingTokens(messages)}
|
|
||||||
/>
|
/>
|
||||||
)}
|
)}
|
||||||
</Stack>
|
</Stack>
|
||||||
|
|||||||
@@ -82,4 +82,14 @@ describe("showTypingIndicator", () => {
|
|||||||
showTypingIndicator([msg("assistant", [doneTool, text])], true),
|
showTypingIndicator([msg("assistant", [doneTool, text])], true),
|
||||||
).toBe(false);
|
).toBe(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("shows while streaming after a text part is finalized (paused before the next step)", () => {
|
||||||
|
const doneText = { type: "text", text: "Now creating the page in", state: "done" } as unknown as UIMessage["parts"][number];
|
||||||
|
expect(showTypingIndicator([msg("assistant", [doneText])], true)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("hides while a text part is actively streaming (state: streaming)", () => {
|
||||||
|
const streamingText = { type: "text", text: "Now writ", state: "streaming" } as unknown as UIMessage["parts"][number];
|
||||||
|
expect(showTypingIndicator([msg("assistant", [streamingText])], true)).toBe(false);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,50 +0,0 @@
|
|||||||
import { describe, expect, it } from "vitest";
|
|
||||||
import type { UIMessage } from "@ai-sdk/react";
|
|
||||||
import { tailThinkingTokens } from "@/features/ai-chat/components/message-list.tsx";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pure-helper tests for `tailThinkingTokens`: the live thinking-token count the
|
|
||||||
* standalone typing indicator shows. It is the reasoning split of the tail
|
|
||||||
* assistant message (estimate while streaming, authoritative once usage arrives).
|
|
||||||
*/
|
|
||||||
const msg = (
|
|
||||||
role: "user" | "assistant",
|
|
||||||
parts: unknown[],
|
|
||||||
metadata?: unknown,
|
|
||||||
): UIMessage =>
|
|
||||||
({ id: Math.random().toString(), role, parts, metadata }) as UIMessage;
|
|
||||||
|
|
||||||
describe("tailThinkingTokens", () => {
|
|
||||||
it("is 0 when there are no messages", () => {
|
|
||||||
expect(tailThinkingTokens([])).toBe(0);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("is 0 when the tail message is the user's", () => {
|
|
||||||
expect(tailThinkingTokens([msg("user", [{ type: "text", text: "q" }])])).toBe(0);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("is 0 when the assistant has produced no reasoning yet", () => {
|
|
||||||
expect(
|
|
||||||
tailThinkingTokens([msg("assistant", [{ type: "text", text: "answer" }])]),
|
|
||||||
).toBe(0);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("estimates reasoning tokens from streamed reasoning text", () => {
|
|
||||||
// 8 chars -> 2 tokens.
|
|
||||||
expect(
|
|
||||||
tailThinkingTokens([
|
|
||||||
msg("assistant", [{ type: "reasoning", text: "12345678" }]),
|
|
||||||
]),
|
|
||||||
).toBe(2);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("uses authoritative usage.reasoningTokens once the server attaches it", () => {
|
|
||||||
expect(
|
|
||||||
tailThinkingTokens([
|
|
||||||
msg("assistant", [{ type: "reasoning", text: "x" }], {
|
|
||||||
usage: { outputTokens: 100, reasoningTokens: 42 },
|
|
||||||
}),
|
|
||||||
]),
|
|
||||||
).toBe(42);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
@@ -16,12 +16,6 @@ interface TypingIndicatorProps {
|
|||||||
* assistant row above already shows the same name, to avoid a duplicate label.
|
* assistant row above already shows the same name, to avoid a duplicate label.
|
||||||
*/
|
*/
|
||||||
showName?: boolean;
|
showName?: boolean;
|
||||||
/**
|
|
||||||
* Live thinking/reasoning token count for the in-flight turn. When > 0 the
|
|
||||||
* typing line becomes `Thinking… · {count} tokens` (like Claude Code). Omitted
|
|
||||||
* / 0 keeps the plain `Thinking…` line.
|
|
||||||
*/
|
|
||||||
thinkingTokens?: number;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -32,23 +26,20 @@ interface TypingIndicatorProps {
|
|||||||
*
|
*
|
||||||
* Mirrors the assistant row layout in MessageItem (the dimmed label), so it reads
|
* Mirrors the assistant row layout in MessageItem (the dimmed label), so it reads
|
||||||
* as the assistant's bubble taking shape. The dimmed label uses the configured
|
* as the assistant's bubble taking shape. The dimmed label uses the configured
|
||||||
* identity name when provided (otherwise the generic "AI agent"), while the
|
* identity name when provided (otherwise the generic "AI agent"); below it the
|
||||||
* typing line is always the generic "Thinking…" (it never includes the
|
* animated dots stand in for the nascent bubble until content arrives.
|
||||||
* role/identity name).
|
|
||||||
*/
|
*/
|
||||||
export default function TypingIndicator({ assistantName, showName = true, thinkingTokens }: TypingIndicatorProps) {
|
export default function TypingIndicator({ assistantName, showName = true }: TypingIndicatorProps) {
|
||||||
const { t } = useTranslation();
|
const { t } = useTranslation();
|
||||||
const name = resolveAssistantName(assistantName);
|
const name = resolveAssistantName(assistantName);
|
||||||
// Show the running thinking-token count only once there is something to count.
|
|
||||||
const thinkingLine =
|
|
||||||
thinkingTokens && thinkingTokens > 0
|
|
||||||
? t("Thinking… · {{count}} tokens", { count: thinkingTokens })
|
|
||||||
: t("Thinking…");
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<Box className={classes.messageRow}>
|
<Box className={classes.messageRow}>
|
||||||
{showName !== false && (
|
{showName !== false && (
|
||||||
<Text size="xs" c="dimmed" mb={4}>
|
// Extra bottom gap (vs MessageItem's mb={4}) gives the small bouncing
|
||||||
|
// dots room below the name label; without it they crowd the label. Only
|
||||||
|
// applies when the name is shown — the nameless case spaces fine on its own.
|
||||||
|
<Text size="xs" c="dimmed" mb={8}>
|
||||||
{name ?? t("AI agent")}
|
{name ?? t("AI agent")}
|
||||||
</Text>
|
</Text>
|
||||||
)}
|
)}
|
||||||
@@ -58,9 +49,6 @@ export default function TypingIndicator({ assistantName, showName = true, thinki
|
|||||||
<span />
|
<span />
|
||||||
<span />
|
<span />
|
||||||
</span>
|
</span>
|
||||||
<Text size="sm" c="dimmed">
|
|
||||||
{thinkingLine}
|
|
||||||
</Text>
|
|
||||||
</Group>
|
</Group>
|
||||||
</Box>
|
</Box>
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -64,7 +64,10 @@ describe("useChatSession", () => {
|
|||||||
result.current.onTurnFinished(undefined);
|
result.current.onTurnFinished(undefined);
|
||||||
expect(setActiveChatId).not.toHaveBeenCalled();
|
expect(setActiveChatId).not.toHaveBeenCalled();
|
||||||
// The refetch lands with the new row => adopt it.
|
// The refetch lands with the new row => adopt it.
|
||||||
rerender({ activeChatId: null, chats: { items: [{ id: "x" }, { id: "new" }] } });
|
rerender({
|
||||||
|
activeChatId: null,
|
||||||
|
chats: { items: [{ id: "x" }, { id: "new" }] },
|
||||||
|
});
|
||||||
expect(setActiveChatId).toHaveBeenCalledWith("new");
|
expect(setActiveChatId).toHaveBeenCalledWith("new");
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -88,7 +91,10 @@ describe("useChatSession", () => {
|
|||||||
});
|
});
|
||||||
result.current.onTurnFinished(undefined);
|
result.current.onTurnFinished(undefined);
|
||||||
// a was deleted, new was added — same length, but membership changed.
|
// a was deleted, new was added — same length, but membership changed.
|
||||||
rerender({ activeChatId: null, chats: { items: [{ id: "b" }, { id: "new" }] } });
|
rerender({
|
||||||
|
activeChatId: null,
|
||||||
|
chats: { items: [{ id: "b" }, { id: "new" }] },
|
||||||
|
});
|
||||||
expect(setActiveChatId).toHaveBeenCalledWith("new");
|
expect(setActiveChatId).toHaveBeenCalledWith("new");
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -171,6 +177,40 @@ describe("useChatSession", () => {
|
|||||||
expect(setActiveChatId).not.toHaveBeenCalledWith("late");
|
expect(setActiveChatId).not.toHaveBeenCalledWith("late");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("#174 early adopt: onServerChatId adopts the streamed id mid-stream (Copy button available during the first turn)", () => {
|
||||||
|
// Brand-new chat: no id yet. The server streams the real chat id "A" on the
|
||||||
|
// `start` chunk WHILE the first turn is still streaming (before onTurnFinished
|
||||||
|
// fires at the terminal outcome). The hook must adopt it immediately so the
|
||||||
|
// window's activeChatId-gated Copy/export button lights up during the stream.
|
||||||
|
const { result, setActiveChatId } = setup({
|
||||||
|
activeChatId: null,
|
||||||
|
chats: { items: [] },
|
||||||
|
});
|
||||||
|
result.current.onServerChatId("A");
|
||||||
|
expect(setActiveChatId).toHaveBeenCalledWith("A");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("#174 early adopt is in-place: threadKey stays stable (live stream not torn down)", () => {
|
||||||
|
const chats = { items: [] };
|
||||||
|
const { result, rerender } = setup({ activeChatId: null, chats });
|
||||||
|
const keyBefore = result.current.threadKey;
|
||||||
|
result.current.onServerChatId("A");
|
||||||
|
// Parent reflects the adopted id back in; the SAME mount key is kept so the
|
||||||
|
// in-flight useChat store (the streaming turn) is preserved.
|
||||||
|
rerender({ activeChatId: "A", chats });
|
||||||
|
expect(result.current.threadKey).toBe(keyBefore);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("#174 early adopt: no-op for an existing chat and for a missing id", () => {
|
||||||
|
const { result, setActiveChatId } = setup({
|
||||||
|
activeChatId: "chat-1",
|
||||||
|
chats: { items: [{ id: "chat-1" }] },
|
||||||
|
});
|
||||||
|
result.current.onServerChatId("chat-1"); // already has an id
|
||||||
|
result.current.onServerChatId(undefined); // no streamed id
|
||||||
|
expect(setActiveChatId).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
it("in-place adopt keeps threadKey stable; an external switch remounts", () => {
|
it("in-place adopt keeps threadKey stable; an external switch remounts", () => {
|
||||||
const chats = { items: [{ id: "B" }] };
|
const chats = { items: [{ id: "B" }] };
|
||||||
const { result, rerender } = setup({ activeChatId: null, chats });
|
const { result, rerender } = setup({ activeChatId: null, chats });
|
||||||
|
|||||||
@@ -34,6 +34,13 @@ export interface UseChatSessionResult {
|
|||||||
/** Call when a turn finishes; `serverChatId` is the authoritative streamed id
|
/** Call when a turn finishes; `serverChatId` is the authoritative streamed id
|
||||||
* (undefined on a failed turn). Handles new-chat id adoption + invalidations. */
|
* (undefined on a failed turn). Handles new-chat id adoption + invalidations. */
|
||||||
onTurnFinished: (serverChatId?: string) => void;
|
onTurnFinished: (serverChatId?: string) => void;
|
||||||
|
/** Call EARLY (at the stream's `start` chunk) with the authoritative streamed
|
||||||
|
* chat id so a brand-new chat adopts its real id WHILE its first turn is still
|
||||||
|
* streaming — making `activeChatId`-gated affordances (e.g. the Copy/export
|
||||||
|
* button, #174) available immediately. In-place adoption only (same mount key,
|
||||||
|
* no list/messages invalidation — that is left to onTurnFinished at the end).
|
||||||
|
* Idempotent and a no-op once the chat already has an id. */
|
||||||
|
onServerChatId: (serverChatId?: string) => void;
|
||||||
/** Disarm any pending error-path new-chat fallback. The window calls this from
|
/** Disarm any pending error-path new-chat fallback. The window calls this from
|
||||||
* startNewChat/selectChat so a late refetch can't yank the user back into a
|
* startNewChat/selectChat so a late refetch can't yank the user back into a
|
||||||
* just-failed chat after they explicitly moved on. */
|
* just-failed chat after they explicitly moved on. */
|
||||||
@@ -85,10 +92,7 @@ export function useChatSession(
|
|||||||
// `newThread`/`switchThread` to (re)mount, `adoptThread` for in-place adoption.
|
// `newThread`/`switchThread` to (re)mount, `adoptThread` for in-place adoption.
|
||||||
// Initial: a non-null activeChatId switches to it; a null one gets a fresh
|
// Initial: a non-null activeChatId switches to it; a null one gets a fresh
|
||||||
// session key with no chat id yet.
|
// session key with no chat id yet.
|
||||||
const [thread, dispatch] = useReducer(
|
const [thread, dispatch] = useReducer(threadSessionReducer, undefined, () =>
|
||||||
threadSessionReducer,
|
|
||||||
undefined,
|
|
||||||
() =>
|
|
||||||
activeChatId === null
|
activeChatId === null
|
||||||
? newThread(`new-${generateId()}`)
|
? newThread(`new-${generateId()}`)
|
||||||
: switchThread(activeChatId),
|
: switchThread(activeChatId),
|
||||||
@@ -150,6 +154,31 @@ export function useChatSession(
|
|||||||
[chats, setActiveChatId, onInvalidateChatList, onInvalidateChatMessages],
|
[chats, setActiveChatId, onInvalidateChatList, onInvalidateChatMessages],
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// EARLY adoption (#174): adopt the authoritative streamed chat id the moment
|
||||||
|
// the server emits it on the `start` chunk, so a brand-new chat gets its real
|
||||||
|
// `activeChatId` WHILE its first turn streams — not only at terminal
|
||||||
|
// onTurnFinished. This makes the activeChatId-gated Copy/export button
|
||||||
|
// available during the first turn. Pure in-place adoption (same mount key, like
|
||||||
|
// the primary path) with NO invalidation: the list/messages refresh stays on
|
||||||
|
// onTurnFinished at the end of the turn. Reads the live id from the ref so a
|
||||||
|
// repeat call after adoption is a no-op (resolveAdoptedChatId only fires for a
|
||||||
|
// still-new chat).
|
||||||
|
const onServerChatId = useCallback(
|
||||||
|
(serverChatId?: string) => {
|
||||||
|
const adopted = resolveAdoptedChatId(
|
||||||
|
activeChatIdRef.current,
|
||||||
|
serverChatId,
|
||||||
|
);
|
||||||
|
if (!adopted) return;
|
||||||
|
activeChatIdRef.current = adopted;
|
||||||
|
setActiveChatId(adopted);
|
||||||
|
dispatch({ type: "adopt", chatId: adopted });
|
||||||
|
// Early adoption beat the error-path fallback to it — disarm.
|
||||||
|
pendingNewChatRef.current = null;
|
||||||
|
},
|
||||||
|
[setActiveChatId],
|
||||||
|
);
|
||||||
|
|
||||||
// FALLBACK resolver. Armed only by onTurnFinished when a brand-new chat's first
|
// FALLBACK resolver. Armed only by onTurnFinished when a brand-new chat's first
|
||||||
// turn errored before the `start` chunk (no authoritative id streamed). Once
|
// turn errored before the `start` chunk (no authoritative id streamed). Once
|
||||||
// the per-user list refetch lands with the just-created row, adopt the SINGLE
|
// the per-user list refetch lands with the just-created row, adopt the SINGLE
|
||||||
@@ -233,6 +262,7 @@ export function useChatSession(
|
|||||||
threadKey: thread.key,
|
threadKey: thread.key,
|
||||||
waitingForHistory,
|
waitingForHistory,
|
||||||
onTurnFinished,
|
onTurnFinished,
|
||||||
|
onServerChatId,
|
||||||
cancelPendingAdoption,
|
cancelPendingAdoption,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,6 +50,24 @@ export async function deleteAiChat(chatId: string): Promise<void> {
|
|||||||
await api.post("/ai-chat/delete", { chatId });
|
await api.post("/ai-chat/delete", { chatId });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Export a chat to Markdown (#183). The server renders the transcript from the
|
||||||
|
* persisted rows (the DB is the single source of truth — including an
|
||||||
|
* interrupted turn's in-progress row, persisted upfront + per step), so the
|
||||||
|
* client just copies the returned string. `lang` localizes the few fixed
|
||||||
|
* role/tool labels; defaults to English server-side when omitted.
|
||||||
|
*/
|
||||||
|
export async function exportAiChat(
|
||||||
|
chatId: string,
|
||||||
|
lang?: string,
|
||||||
|
): Promise<string> {
|
||||||
|
const req = await api.post<{ markdown: string }>("/ai-chat/export", {
|
||||||
|
chatId,
|
||||||
|
lang,
|
||||||
|
});
|
||||||
|
return req.data.markdown;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Agent roles API (`/ai-chat/roles`). `list` is available to any workspace
|
* Agent roles API (`/ai-chat/roles`). `list` is available to any workspace
|
||||||
* member (for the chat-creation picker); create/update/delete are admin-only
|
* member (for the chat-creation picker); create/update/delete are admin-only
|
||||||
@@ -76,6 +94,8 @@ export async function updateAiRole(data: IAiRoleUpdate): Promise<IAiRole> {
|
|||||||
|
|
||||||
/** Soft-delete a role (admin). */
|
/** Soft-delete a role (admin). */
|
||||||
export async function deleteAiRole(id: string): Promise<{ success: true }> {
|
export async function deleteAiRole(id: string): Promise<{ success: true }> {
|
||||||
const req = await api.post<{ success: true }>("/ai-chat/roles/delete", { id });
|
const req = await api.post<{ success: true }>("/ai-chat/roles/delete", {
|
||||||
|
id,
|
||||||
|
});
|
||||||
return req.data;
|
return req.data;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,491 +0,0 @@
|
|||||||
import { describe, it, expect } from "vitest";
|
|
||||||
import { buildChatMarkdown } from "@/features/ai-chat/utils/chat-markdown.ts";
|
|
||||||
import type { IAiChatMessageRow } from "@/features/ai-chat/types/ai-chat.types.ts";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tests for the client-only Markdown export builder. The output embeds a live
|
|
||||||
* `new Date().toISOString()` export timestamp; we never assert that value, only
|
|
||||||
* the deterministic structure (headings, numbering, fenced blocks, totals).
|
|
||||||
*
|
|
||||||
* A pass-through translator keeps role/tool labels predictable so the
|
|
||||||
* structural assertions are stable without an i18n runtime.
|
|
||||||
*/
|
|
||||||
const t = (key: string, values?: Record<string, unknown>): string => {
|
|
||||||
if (values && typeof values.name === "string") {
|
|
||||||
return key.replace("{{name}}", values.name);
|
|
||||||
}
|
|
||||||
return key;
|
|
||||||
};
|
|
||||||
|
|
||||||
function row(partial: Partial<IAiChatMessageRow>): IAiChatMessageRow {
|
|
||||||
return {
|
|
||||||
id: partial.id ?? "id",
|
|
||||||
role: partial.role ?? "user",
|
|
||||||
content: partial.content ?? null,
|
|
||||||
metadata: partial.metadata ?? null,
|
|
||||||
createdAt: partial.createdAt ?? "2026-06-21T00:00:00.000Z",
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
describe("buildChatMarkdown — structure", () => {
|
|
||||||
it("emits the title heading, chat id and message count", () => {
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "My chat",
|
|
||||||
chatId: "chat-123",
|
|
||||||
rows: [],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(md).toContain("# My chat");
|
|
||||||
expect(md).toContain("- Chat ID: `chat-123`");
|
|
||||||
expect(md).toContain("- Messages: 0");
|
|
||||||
expect(md).toContain("- Exported:"); // timestamp present, value not asserted
|
|
||||||
});
|
|
||||||
|
|
||||||
it("falls back to the translated 'Untitled chat' for empty/blank titles", () => {
|
|
||||||
expect(
|
|
||||||
buildChatMarkdown({ title: null, chatId: "c", rows: [], t }),
|
|
||||||
).toContain("# Untitled chat");
|
|
||||||
expect(
|
|
||||||
buildChatMarkdown({ title: " ", chatId: "c", rows: [], t }),
|
|
||||||
).toContain("# Untitled chat");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("numbers rows sequentially with role headings", () => {
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({ role: "user", content: "hi" }),
|
|
||||||
row({ role: "assistant", content: "hello" }),
|
|
||||||
row({ role: "user", content: "again" }),
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(md).toContain("## 1. You");
|
|
||||||
expect(md).toContain("## 2. AI agent");
|
|
||||||
expect(md).toContain("## 3. You");
|
|
||||||
// Heading numbering is strictly index+1, not e.g. role-relative.
|
|
||||||
expect(md).not.toContain("## 0.");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("renders the per-row text content from `content` when no metadata.parts", () => {
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [row({ role: "user", content: "plain body" })],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(md).toContain("plain body");
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("buildChatMarkdown — text parts", () => {
|
|
||||||
it("skips empty / whitespace-only text parts", () => {
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({
|
|
||||||
role: "assistant",
|
|
||||||
content: "ignored-content",
|
|
||||||
metadata: {
|
|
||||||
parts: [
|
|
||||||
{ type: "text", text: " " },
|
|
||||||
{ type: "text", text: "" },
|
|
||||||
{ type: "text", text: "kept line" },
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
|
||||||
] as any,
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(md).toContain("kept line");
|
|
||||||
// Whitespace-only part contributed no block of its own.
|
|
||||||
expect(md).not.toContain(" \n\n");
|
|
||||||
// When metadata.parts exists, the plain `content` fallback is NOT used.
|
|
||||||
expect(md).not.toContain("ignored-content");
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("buildChatMarkdown — tool parts", () => {
|
|
||||||
it("renders a tool label, name, state and fenced Input/Output blocks", () => {
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({
|
|
||||||
role: "assistant",
|
|
||||||
content: "",
|
|
||||||
metadata: {
|
|
||||||
parts: [
|
|
||||||
{
|
|
||||||
type: "tool-getPage",
|
|
||||||
state: "output-available",
|
|
||||||
input: { pageId: "p1" },
|
|
||||||
output: { id: "p1", title: "Home" },
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
|
||||||
} as any,
|
|
||||||
],
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
// Known tool name maps to its label key; raw name in backticks; done state.
|
|
||||||
expect(md).toContain("**Tool: Read page** (`getPage`) — done");
|
|
||||||
expect(md).toContain("Input:");
|
|
||||||
expect(md).toContain("Output:");
|
|
||||||
// Fenced JSON blocks contain the stringified payloads.
|
|
||||||
expect(md).toContain('"pageId": "p1"');
|
|
||||||
expect(md).toContain('"title": "Home"');
|
|
||||||
expect(md).toContain("```json");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("renders the generic label for an unknown tool and surfaces errorText", () => {
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({
|
|
||||||
role: "assistant",
|
|
||||||
content: "",
|
|
||||||
metadata: {
|
|
||||||
parts: [
|
|
||||||
{
|
|
||||||
type: "tool-mysteryTool",
|
|
||||||
state: "output-error",
|
|
||||||
input: { a: 1 },
|
|
||||||
errorText: "boom",
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
|
||||||
} as any,
|
|
||||||
],
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(md).toContain("**Tool: Ran tool mysteryTool** (`mysteryTool`) — error");
|
|
||||||
expect(md).toContain("**Error:** boom");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("does not throw on a circular tool input (falls back to String)", () => {
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
|
||||||
const circular: any = {};
|
|
||||||
circular.self = circular;
|
|
||||||
expect(() =>
|
|
||||||
buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({
|
|
||||||
role: "assistant",
|
|
||||||
content: "",
|
|
||||||
metadata: {
|
|
||||||
parts: [
|
|
||||||
{
|
|
||||||
type: "tool-getPage",
|
|
||||||
state: "input-available",
|
|
||||||
input: circular,
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
|
||||||
} as any,
|
|
||||||
],
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
}),
|
|
||||||
).not.toThrow();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("buildChatMarkdown — fence anti-breakout", () => {
|
|
||||||
it("lengthens the delimiter so embedded ``` cannot break out of the block", () => {
|
|
||||||
// Tool input whose stringified string form contains a literal ``` run.
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({
|
|
||||||
role: "assistant",
|
|
||||||
content: "",
|
|
||||||
metadata: {
|
|
||||||
parts: [
|
|
||||||
{
|
|
||||||
type: "tool-getPage",
|
|
||||||
state: "output-available",
|
|
||||||
// A bare string passes through stringify() verbatim.
|
|
||||||
input: "before ``` after",
|
|
||||||
output: "x",
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
|
||||||
} as any,
|
|
||||||
],
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
// The fence around the 3-backtick content must use at least 4 backticks so
|
|
||||||
// the embedded ``` run cannot terminate the block.
|
|
||||||
expect(md).toContain("````json\nbefore ``` after\n````");
|
|
||||||
// Robust anti-breakout check: the opening fence delimiter is strictly
|
|
||||||
// longer than the longest backtick run inside the wrapped content. (A naive
|
|
||||||
// `not.toContain("```json...")` is a false negative — a 4-backtick fence
|
|
||||||
// textually contains the 3-backtick substring.)
|
|
||||||
const open = md.match(/(`{3,})json\nbefore/);
|
|
||||||
expect(open).not.toBeNull();
|
|
||||||
expect(open![1].length).toBeGreaterThan(3); // > the 3-backtick run in content
|
|
||||||
});
|
|
||||||
|
|
||||||
it("uses a 5-backtick fence when the content has a 4-backtick run", () => {
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({
|
|
||||||
role: "assistant",
|
|
||||||
content: "",
|
|
||||||
metadata: {
|
|
||||||
parts: [
|
|
||||||
{
|
|
||||||
type: "tool-getPage",
|
|
||||||
state: "output-available",
|
|
||||||
input: "a ```` b",
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
|
||||||
} as any,
|
|
||||||
],
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(md).toContain("`````json\na ```` b\n`````");
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("buildChatMarkdown — token totals", () => {
|
|
||||||
it("prints the total-tokens line only when the summed usage is > 0", () => {
|
|
||||||
const withTokens = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({
|
|
||||||
role: "assistant",
|
|
||||||
content: "x",
|
|
||||||
metadata: { usage: { inputTokens: 10, outputTokens: 5 } },
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(withTokens).toContain("- Total tokens: 15");
|
|
||||||
// Per-row usage footer too.
|
|
||||||
expect(withTokens).toContain("_Tokens — in: 10, out: 5, total: 15_");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("omits the total-tokens line when the sum is 0 / usage absent", () => {
|
|
||||||
const noTokens = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({ role: "user", content: "hi" }),
|
|
||||||
row({
|
|
||||||
role: "assistant",
|
|
||||||
content: "x",
|
|
||||||
metadata: { usage: { inputTokens: 0, outputTokens: 0 } },
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(noTokens).not.toContain("- Total tokens:");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("uses totalTokens when present rather than summing in/out", () => {
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({
|
|
||||||
role: "assistant",
|
|
||||||
content: "x",
|
|
||||||
metadata: { usage: { inputTokens: 3, outputTokens: 4, totalTokens: 99 } },
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(md).toContain("- Total tokens: 99");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("appends the reasoning figure to the row footer when reasoningTokens > 0", () => {
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({
|
|
||||||
role: "assistant",
|
|
||||||
content: "x",
|
|
||||||
metadata: {
|
|
||||||
usage: { inputTokens: 10, outputTokens: 8, reasoningTokens: 3 },
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(md).toContain("_Tokens — in: 10, out: 8, reasoning: 3, total: 18_");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("omits the reasoning figure when reasoningTokens is 0 / absent", () => {
|
|
||||||
const zero = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({
|
|
||||||
role: "assistant",
|
|
||||||
content: "x",
|
|
||||||
metadata: {
|
|
||||||
usage: { inputTokens: 10, outputTokens: 5, reasoningTokens: 0 },
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(zero).toContain("_Tokens — in: 10, out: 5, total: 15_");
|
|
||||||
expect(zero).not.toContain("reasoning:");
|
|
||||||
|
|
||||||
const absent = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({
|
|
||||||
role: "assistant",
|
|
||||||
content: "x",
|
|
||||||
metadata: { usage: { inputTokens: 10, outputTokens: 5 } },
|
|
||||||
}),
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(absent).not.toContain("reasoning:");
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("buildChatMarkdown — pending / in-progress messages", () => {
|
|
||||||
it("continues the heading numbering after the persisted rows", () => {
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [row({ role: "user", content: "persisted" })],
|
|
||||||
pending: [
|
|
||||||
{
|
|
||||||
role: "user",
|
|
||||||
parts: [{ type: "text", text: "live question" }],
|
|
||||||
generating: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
role: "assistant",
|
|
||||||
parts: [{ type: "text", text: "live answer" }],
|
|
||||||
generating: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(md).toContain("## 1. You");
|
|
||||||
expect(md).toContain("## 2. You");
|
|
||||||
expect(md).toContain("## 3. AI agent");
|
|
||||||
expect(md).toContain("live question");
|
|
||||||
expect(md).toContain("live answer");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("flags a generating assistant pending message as still being generated", () => {
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [row({ role: "user", content: "persisted" })],
|
|
||||||
pending: [
|
|
||||||
{
|
|
||||||
role: "assistant",
|
|
||||||
parts: [{ type: "text", text: "partial reply" }],
|
|
||||||
generating: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(md).toContain("partial reply");
|
|
||||||
expect(md).toContain("still being generated");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("renders a non-generating user pending message without the note", () => {
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [row({ role: "user", content: "persisted" })],
|
|
||||||
pending: [
|
|
||||||
{
|
|
||||||
role: "user",
|
|
||||||
parts: [{ type: "text", text: "my live message" }],
|
|
||||||
generating: false,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(md).toContain("my live message");
|
|
||||||
expect(md).not.toContain("still being generated");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("includes the pending messages in the metadata message count", () => {
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [
|
|
||||||
row({ role: "user", content: "a" }),
|
|
||||||
row({ role: "assistant", content: "b" }),
|
|
||||||
],
|
|
||||||
pending: [
|
|
||||||
{
|
|
||||||
role: "user",
|
|
||||||
parts: [{ type: "text", text: "c" }],
|
|
||||||
generating: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
role: "assistant",
|
|
||||||
parts: [{ type: "text", text: "d" }],
|
|
||||||
generating: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
// 2 persisted rows + 2 pending = 4.
|
|
||||||
expect(md).toContain("- Messages: 4");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("emits the heading and note for a generating assistant with empty parts", () => {
|
|
||||||
expect(() =>
|
|
||||||
buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [row({ role: "user", content: "persisted" })],
|
|
||||||
pending: [
|
|
||||||
{
|
|
||||||
role: "assistant",
|
|
||||||
parts: [],
|
|
||||||
generating: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
}),
|
|
||||||
).not.toThrow();
|
|
||||||
const md = buildChatMarkdown({
|
|
||||||
title: "t",
|
|
||||||
chatId: "c",
|
|
||||||
rows: [row({ role: "user", content: "persisted" })],
|
|
||||||
pending: [
|
|
||||||
{
|
|
||||||
role: "assistant",
|
|
||||||
parts: [],
|
|
||||||
generating: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
t,
|
|
||||||
});
|
|
||||||
expect(md).toContain("## 2. AI agent");
|
|
||||||
expect(md).toContain("still being generated");
|
|
||||||
});
|
|
||||||
});
|
|
||||||
@@ -1,215 +0,0 @@
|
|||||||
/**
|
|
||||||
* Client-only Markdown builder for an AI agent chat. Serializes the already
|
|
||||||
* persisted message rows (loaded via `useAiChatMessagesQuery`) into a single
|
|
||||||
* Markdown string suitable for copying to the clipboard. NO network call is
|
|
||||||
* made and NO server/DB code is touched — this reuses the rich "request
|
|
||||||
* internals" (tool calls with input/output, per-message token usage,
|
|
||||||
* finish/error info) that the chat already holds client-side.
|
|
||||||
*
|
|
||||||
* Only role labels and tool action labels are localized via the passed-in `t`
|
|
||||||
* translator; the structural document words (Input/Output/Error/Tokens/...) are
|
|
||||||
* plain English constants because the output is a technical artifact.
|
|
||||||
*/
|
|
||||||
|
|
||||||
import type { IAiChatMessageRow } from "@/features/ai-chat/types/ai-chat.types.ts";
|
|
||||||
import {
|
|
||||||
ToolUiPart,
|
|
||||||
getToolName,
|
|
||||||
toolRunState,
|
|
||||||
toolLabelKey,
|
|
||||||
} from "@/features/ai-chat/utils/tool-parts.tsx";
|
|
||||||
|
|
||||||
// Minimal translator signature compatible with react-i18next's `t`.
|
|
||||||
type Translate = (key: string, values?: Record<string, unknown>) => string;
|
|
||||||
|
|
||||||
interface BuildChatMarkdownArgs {
|
|
||||||
title: string | null;
|
|
||||||
chatId: string;
|
|
||||||
rows: IAiChatMessageRow[];
|
|
||||||
/** In-progress, not-yet-persisted live messages (the current streaming
|
|
||||||
* turn) to append after the persisted rows. `generating: true` adds a
|
|
||||||
* note that the message is still being produced. */
|
|
||||||
pending?: PendingMessage[];
|
|
||||||
t: Translate;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** A single AI SDK UIMessage part (text part or other). */
|
|
||||||
interface TextLikePart {
|
|
||||||
type: string;
|
|
||||||
text?: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** A live, not-yet-persisted message (current streaming turn) to append. */
|
|
||||||
interface PendingMessage {
|
|
||||||
role: "user" | "assistant" | string;
|
|
||||||
parts: TextLikePart[];
|
|
||||||
generating: boolean;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Stringify an arbitrary tool input/output value for a fenced block. Strings
|
|
||||||
* pass through as-is; everything else is pretty-printed JSON, falling back to
|
|
||||||
* `String(value)` if serialization throws (e.g. a circular structure).
|
|
||||||
*/
|
|
||||||
function stringify(value: unknown): string {
|
|
||||||
if (typeof value === "string") return value;
|
|
||||||
try {
|
|
||||||
return JSON.stringify(value, null, 2);
|
|
||||||
} catch {
|
|
||||||
return String(value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrap `code` in a fenced code block whose backtick delimiter is LONGER than
|
|
||||||
* the longest backtick run inside the content, so embedded backticks (or even
|
|
||||||
* a literal ``` fence) never break out of the block. Minimum 3 backticks.
|
|
||||||
*/
|
|
||||||
function fence(code: string, lang = ""): string {
|
|
||||||
const runs: string[] = code.match(/`+/g) ?? [];
|
|
||||||
const longest = runs.reduce((m, s) => Math.max(m, s.length), 0);
|
|
||||||
const delim = "`".repeat(Math.max(3, longest + 1));
|
|
||||||
return `${delim}${lang}\n${code}\n${delim}`;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Per-row token count, mirroring the header sum in ai-chat-window.tsx. */
|
|
||||||
function rowTokens(usage: {
|
|
||||||
inputTokens?: number;
|
|
||||||
outputTokens?: number;
|
|
||||||
totalTokens?: number;
|
|
||||||
reasoningTokens?: number;
|
|
||||||
}): number {
|
|
||||||
return (
|
|
||||||
usage.totalTokens ?? (usage.inputTokens ?? 0) + (usage.outputTokens ?? 0)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Render one message's UIMessage parts into an array of Markdown blocks
|
|
||||||
* (text blocks + tool blocks). Mirrors MessageItem's part handling. */
|
|
||||||
function renderMessageParts(parts: TextLikePart[], t: Translate): string[] {
|
|
||||||
const out: string[] = [];
|
|
||||||
|
|
||||||
for (const part of parts) {
|
|
||||||
if (part.type === "text") {
|
|
||||||
const text = (part.text ?? "").trim();
|
|
||||||
// Skip empty/whitespace-only text parts (matches MessageItem).
|
|
||||||
if (text.length > 0) out.push(text);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
const isToolPart =
|
|
||||||
part.type.startsWith("tool-") || part.type === "dynamic-tool";
|
|
||||||
if (!isToolPart) continue;
|
|
||||||
|
|
||||||
const tp = part as unknown as ToolUiPart;
|
|
||||||
const name = getToolName(tp);
|
|
||||||
const { key, values } = toolLabelKey(name);
|
|
||||||
const label = t(key, values);
|
|
||||||
const state = toolRunState(tp.state);
|
|
||||||
|
|
||||||
const toolLines: string[] = [
|
|
||||||
`**Tool: ${label}** (\`${name}\`) — ${state}`,
|
|
||||||
];
|
|
||||||
if (tp.input !== undefined) {
|
|
||||||
toolLines.push("Input:");
|
|
||||||
toolLines.push(fence(stringify(tp.input), "json"));
|
|
||||||
}
|
|
||||||
if (tp.output !== undefined) {
|
|
||||||
toolLines.push("Output:");
|
|
||||||
toolLines.push(fence(stringify(tp.output), "json"));
|
|
||||||
}
|
|
||||||
if (tp.errorText) {
|
|
||||||
toolLines.push(`**Error:** ${tp.errorText}`);
|
|
||||||
}
|
|
||||||
out.push(toolLines.join("\n\n"));
|
|
||||||
}
|
|
||||||
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Serialize a chat to a Markdown string. Pure (apart from `new Date()` for the
|
|
||||||
* export timestamp), so it is straightforward to unit-test.
|
|
||||||
*/
|
|
||||||
export function buildChatMarkdown(args: BuildChatMarkdownArgs): string {
|
|
||||||
const { title, chatId, rows, pending, t } = args;
|
|
||||||
const blocks: string[] = [];
|
|
||||||
|
|
||||||
const heading = (title ?? "").trim() || t("Untitled chat");
|
|
||||||
blocks.push(`# ${heading}`);
|
|
||||||
|
|
||||||
// Metadata bullet list. Total tokens is only shown when there is a sum.
|
|
||||||
const totalTokens = rows.reduce((sum, row) => {
|
|
||||||
const usage = row.metadata?.usage;
|
|
||||||
return usage ? sum + rowTokens(usage) : sum;
|
|
||||||
}, 0);
|
|
||||||
const meta = [
|
|
||||||
`- Chat ID: \`${chatId}\``,
|
|
||||||
`- Exported: ${new Date().toISOString()}`,
|
|
||||||
`- Messages: ${rows.length + (pending?.length ?? 0)}`,
|
|
||||||
];
|
|
||||||
if (totalTokens > 0) meta.push(`- Total tokens: ${totalTokens}`);
|
|
||||||
blocks.push(meta.join("\n"));
|
|
||||||
|
|
||||||
rows.forEach((row, index) => {
|
|
||||||
blocks.push("---");
|
|
||||||
|
|
||||||
const roleLabel = row.role === "assistant" ? t("AI agent") : t("You");
|
|
||||||
blocks.push(`## ${index + 1}. ${roleLabel}`);
|
|
||||||
|
|
||||||
// Created-at kept in source as an HTML comment (out of the rendered prose).
|
|
||||||
blocks.push(`<!-- ${row.createdAt} -->`);
|
|
||||||
|
|
||||||
// Resolve parts: prefer the rich persisted parts, else a single text part
|
|
||||||
// built from the plain-text content (mirrors `rowToUiMessage`).
|
|
||||||
const parts: TextLikePart[] =
|
|
||||||
Array.isArray(row.metadata?.parts) && row.metadata.parts.length > 0
|
|
||||||
? (row.metadata.parts as TextLikePart[])
|
|
||||||
: [{ type: "text", text: row.content ?? "" }];
|
|
||||||
|
|
||||||
blocks.push(...renderMessageParts(parts, t));
|
|
||||||
|
|
||||||
if (row.metadata?.error) {
|
|
||||||
blocks.push(`**⚠️ Error:** ${row.metadata.error}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
const usage = row.metadata?.usage;
|
|
||||||
if (usage) {
|
|
||||||
const total = usage.totalTokens ?? rowTokens(usage);
|
|
||||||
// Reasoning (thinking) tokens are shown only when the provider reported a
|
|
||||||
// positive count; old rows / non-reasoning providers omit it.
|
|
||||||
const reasoning =
|
|
||||||
usage.reasoningTokens && usage.reasoningTokens > 0
|
|
||||||
? `, reasoning: ${usage.reasoningTokens}`
|
|
||||||
: "";
|
|
||||||
blocks.push(
|
|
||||||
`_Tokens — in: ${usage.inputTokens ?? "?"}, out: ${usage.outputTokens ?? "?"}${reasoning}, total: ${total}_`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Append the in-progress, not-yet-persisted live messages (the current
|
|
||||||
// streaming turn) after the persisted rows. Heading numbering CONTINUES from
|
|
||||||
// the persisted rows. A `generating` assistant gets a note that the captured
|
|
||||||
// response is partial; pending messages carry no usage/token footer yet.
|
|
||||||
(pending ?? []).forEach((message, p) => {
|
|
||||||
blocks.push("---");
|
|
||||||
|
|
||||||
const num = rows.length + p + 1;
|
|
||||||
const roleLabel = message.role === "assistant" ? t("AI agent") : t("You");
|
|
||||||
blocks.push(`## ${num}. ${roleLabel}`);
|
|
||||||
|
|
||||||
blocks.push(...renderMessageParts(message.parts, t));
|
|
||||||
|
|
||||||
// A generating assistant may have empty/no parts yet — still emit the
|
|
||||||
// heading (above) and this note so the export shows the in-progress turn.
|
|
||||||
if (message.generating === true) {
|
|
||||||
blocks.push(
|
|
||||||
"_⏳ This message is still being generated — the export captured a partial, in-progress response._",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Blank line between blocks so the Markdown renders cleanly.
|
|
||||||
return blocks.join("\n\n");
|
|
||||||
}
|
|
||||||
@@ -11,7 +11,7 @@
|
|||||||
"start": "cross-env NODE_ENV=development nest start",
|
"start": "cross-env NODE_ENV=development nest start",
|
||||||
"start:dev": "cross-env NODE_ENV=development nest start --watch",
|
"start:dev": "cross-env NODE_ENV=development nest start --watch",
|
||||||
"start:debug": "cross-env NODE_ENV=development nest start --debug --watch",
|
"start:debug": "cross-env NODE_ENV=development nest start --debug --watch",
|
||||||
"start:prod": "cross-env NODE_ENV=production node dist/main",
|
"start:prod": "cross-env NODE_ENV=production node --heapsnapshot-near-heap-limit=2 dist/main",
|
||||||
"collab:prod": "cross-env NODE_ENV=production node dist/collaboration/server/collab-main",
|
"collab:prod": "cross-env NODE_ENV=production node dist/collaboration/server/collab-main",
|
||||||
"collab:dev": "cross-env NODE_ENV=development node dist/collaboration/server/collab-main",
|
"collab:dev": "cross-env NODE_ENV=development node dist/collaboration/server/collab-main",
|
||||||
"email:dev": "email dev -p 5019 -d ./src/integrations/transactional/emails",
|
"email:dev": "email dev -p 5019 -d ./src/integrations/transactional/emails",
|
||||||
|
|||||||
159
apps/server/src/core/ai-chat/ai-chat.controller.export.spec.ts
Normal file
159
apps/server/src/core/ai-chat/ai-chat.controller.export.spec.ts
Normal file
@@ -0,0 +1,159 @@
|
|||||||
|
import { ForbiddenException } from '@nestjs/common';
|
||||||
|
import { AiChatController } from './ai-chat.controller';
|
||||||
|
import {
|
||||||
|
planFinalizeAssistant,
|
||||||
|
applyFinalize,
|
||||||
|
flushAssistant,
|
||||||
|
type AssistantFlush,
|
||||||
|
} from './ai-chat.service';
|
||||||
|
import type { User, Workspace } from '@docmost/db/types/entity.types';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wiring spec for the #183 `POST /ai-chat/export` endpoint. It must: own-gate via
|
||||||
|
* the chat lookup (workspace-scoped + creator-owned), load the FULL transcript
|
||||||
|
* via findAllByChat, render server-side, and return `{ markdown }`. Exercised by
|
||||||
|
* instantiating the controller with hand-rolled mocks — no Nest graph, no DB.
|
||||||
|
*/
|
||||||
|
describe('AiChatController.export', () => {
|
||||||
|
const user = { id: 'u1' } as User;
|
||||||
|
const workspace = { id: 'ws1' } as Workspace;
|
||||||
|
|
||||||
|
function makeController(
|
||||||
|
over: {
|
||||||
|
chat?: unknown;
|
||||||
|
rows?: unknown[];
|
||||||
|
} = {},
|
||||||
|
) {
|
||||||
|
const chat =
|
||||||
|
'chat' in over
|
||||||
|
? over.chat
|
||||||
|
: { id: 'c1', creatorId: 'u1', title: 'My chat' };
|
||||||
|
const aiChatRepo = {
|
||||||
|
findById: jest.fn().mockResolvedValue(chat),
|
||||||
|
};
|
||||||
|
const aiChatMessageRepo = {
|
||||||
|
findAllByChat: jest.fn().mockResolvedValue(
|
||||||
|
over.rows ?? [
|
||||||
|
{
|
||||||
|
id: 'm1',
|
||||||
|
role: 'user',
|
||||||
|
content: 'hi',
|
||||||
|
metadata: null,
|
||||||
|
status: null,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: 'm2',
|
||||||
|
role: 'assistant',
|
||||||
|
content: 'hello',
|
||||||
|
metadata: null,
|
||||||
|
status: 'completed',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
};
|
||||||
|
const controller = new AiChatController(
|
||||||
|
{} as never,
|
||||||
|
aiChatRepo as never,
|
||||||
|
aiChatMessageRepo as never,
|
||||||
|
{} as never,
|
||||||
|
);
|
||||||
|
return { controller, aiChatRepo, aiChatMessageRepo };
|
||||||
|
}
|
||||||
|
|
||||||
|
it('renders the full transcript and returns { markdown }', async () => {
|
||||||
|
const { controller, aiChatMessageRepo } = makeController();
|
||||||
|
const res = await controller.export({ chatId: 'c1' }, user, workspace);
|
||||||
|
expect(aiChatMessageRepo.findAllByChat).toHaveBeenCalledWith('c1', 'ws1');
|
||||||
|
expect(res.markdown).toContain('# My chat');
|
||||||
|
expect(res.markdown).toContain('## 1. You');
|
||||||
|
expect(res.markdown).toContain('## 2. AI agent');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('forbids a chat the user does not own', async () => {
|
||||||
|
const { controller } = makeController({
|
||||||
|
chat: { id: 'c1', creatorId: 'someone-else', title: 'X' },
|
||||||
|
});
|
||||||
|
await expect(
|
||||||
|
controller.export({ chatId: 'c1' }, user, workspace),
|
||||||
|
).rejects.toBeInstanceOf(ForbiddenException);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('forbids a missing / foreign-workspace chat', async () => {
|
||||||
|
const { controller } = makeController({ chat: null });
|
||||||
|
await expect(
|
||||||
|
controller.export({ chatId: 'c1' }, user, workspace),
|
||||||
|
).rejects.toBeInstanceOf(ForbiddenException);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('localizes labels when lang=ru is passed', async () => {
|
||||||
|
const { controller } = makeController();
|
||||||
|
const res = await controller.export(
|
||||||
|
{ chatId: 'c1', lang: 'ru' },
|
||||||
|
user,
|
||||||
|
workspace,
|
||||||
|
);
|
||||||
|
expect(res.markdown).toContain('## 1. Вы');
|
||||||
|
expect(res.markdown).toContain('## 2. ИИ-агент');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The terminal-finalize dispatch (#183): the assistant row is INSERTed upfront
|
||||||
|
* as 'streaming' and finalized once on the terminal callback. When the upfront
|
||||||
|
* insert SUCCEEDED (we hold an id) finalize UPDATEs that row; when it FAILED
|
||||||
|
* (assistantId is undefined) finalize falls back to INSERTing the terminal row
|
||||||
|
* so the turn is not lost — the only safety against losing the turn entirely.
|
||||||
|
*
|
||||||
|
* `planFinalizeAssistant` is the pure decision; `applyFinalize` is the REAL
|
||||||
|
* dispatch the service uses, exercised here over a mock repo (not a copy of the
|
||||||
|
* logic) so a production drift would fail the test (#186 review).
|
||||||
|
*/
|
||||||
|
describe('finalizeAssistant dispatch (planFinalizeAssistant + applyFinalize)', () => {
|
||||||
|
const workspaceId = 'ws1';
|
||||||
|
|
||||||
|
// Drive the SAME applyFinalize the service calls (no duplicated logic).
|
||||||
|
async function dispatchFinalize(
|
||||||
|
repo: { insert: jest.Mock; update: jest.Mock },
|
||||||
|
assistantId: string | undefined,
|
||||||
|
flushed: AssistantFlush,
|
||||||
|
): Promise<void> {
|
||||||
|
await applyFinalize(
|
||||||
|
repo,
|
||||||
|
planFinalizeAssistant(assistantId),
|
||||||
|
{ chatId: 'c1', workspaceId, userId: 'u1' },
|
||||||
|
flushed,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
it('plan: update when the upfront insert returned an id', () => {
|
||||||
|
expect(planFinalizeAssistant('a1')).toEqual({ kind: 'update', id: 'a1' });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('plan: insert (fallback) when there is no upfront id', () => {
|
||||||
|
expect(planFinalizeAssistant(undefined)).toEqual({ kind: 'insert' });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('(a) upfront insert succeeded -> finalize UPDATEs the row by id', async () => {
|
||||||
|
const repo = { insert: jest.fn(), update: jest.fn() };
|
||||||
|
const flushed = flushAssistant([], 'final answer', 'completed', {
|
||||||
|
finishReason: 'stop',
|
||||||
|
});
|
||||||
|
await dispatchFinalize(repo, 'a1', flushed);
|
||||||
|
expect(repo.update).toHaveBeenCalledWith('a1', workspaceId, flushed);
|
||||||
|
expect(repo.insert).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('(b) upfront insert failed -> finalize INSERTs the terminal payload', async () => {
|
||||||
|
const repo = { insert: jest.fn(), update: jest.fn() };
|
||||||
|
const flushed = flushAssistant([], 'partial', 'error', { error: 'boom' });
|
||||||
|
await dispatchFinalize(repo, undefined, flushed);
|
||||||
|
expect(repo.update).not.toHaveBeenCalled();
|
||||||
|
expect(repo.insert).toHaveBeenCalledTimes(1);
|
||||||
|
const arg = repo.insert.mock.calls[0][0];
|
||||||
|
// The fallback insert carries the terminal content/status/metadata.
|
||||||
|
expect(arg.role).toBe('assistant');
|
||||||
|
expect(arg.content).toBe('partial');
|
||||||
|
expect(arg.status).toBe('error');
|
||||||
|
expect((arg.metadata as { error?: string }).error).toBe('boom');
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -20,7 +20,7 @@ import { JwtAuthGuard } from '../../common/guards/jwt-auth.guard';
|
|||||||
import { AuthUser } from '../../common/decorators/auth-user.decorator';
|
import { AuthUser } from '../../common/decorators/auth-user.decorator';
|
||||||
import { AuthWorkspace } from '../../common/decorators/auth-workspace.decorator';
|
import { AuthWorkspace } from '../../common/decorators/auth-workspace.decorator';
|
||||||
import { SkipTransform } from '../../common/decorators/skip-transform.decorator';
|
import { SkipTransform } from '../../common/decorators/skip-transform.decorator';
|
||||||
import { User, Workspace } from '@docmost/db/types/entity.types';
|
import { AiChat, User, Workspace } from '@docmost/db/types/entity.types';
|
||||||
import { PaginationOptions } from '@docmost/db/pagination/pagination-options';
|
import { PaginationOptions } from '@docmost/db/pagination/pagination-options';
|
||||||
import { AiChatRepo } from '@docmost/db/repos/ai-chat/ai-chat.repo';
|
import { AiChatRepo } from '@docmost/db/repos/ai-chat/ai-chat.repo';
|
||||||
import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.repo';
|
import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.repo';
|
||||||
@@ -31,10 +31,12 @@ import { AiChatService, AiChatStreamBody } from './ai-chat.service';
|
|||||||
import { AiTranscriptionService } from './ai-transcription.service';
|
import { AiTranscriptionService } from './ai-transcription.service';
|
||||||
import {
|
import {
|
||||||
ChatIdDto,
|
ChatIdDto,
|
||||||
|
ExportChatDto,
|
||||||
GetChatMessagesDto,
|
GetChatMessagesDto,
|
||||||
RenameChatDto,
|
RenameChatDto,
|
||||||
} from './dto/ai-chat.dto';
|
} from './dto/ai-chat.dto';
|
||||||
import { describeProviderError } from '../../integrations/ai/ai-error.util';
|
import { describeProviderError } from '../../integrations/ai/ai-error.util';
|
||||||
|
import { buildChatMarkdown } from './chat-markdown.util';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Per-user AI chat API (§6.1). Routes are POST to match this codebase's
|
* Per-user AI chat API (§6.1). Routes are POST to match this codebase's
|
||||||
@@ -81,6 +83,36 @@ export class AiChatController {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Export a chat to Markdown (#183). The DB is the single source of truth: the
|
||||||
|
* whole transcript is loaded (oldest -> newest) and rendered server-side. Now
|
||||||
|
* that the assistant row is persisted upfront and per step, an interrupted
|
||||||
|
* turn is included up to its last finished step. Workspace-scoped and owner-
|
||||||
|
* gated via assertOwnedChat (same as the other read endpoints). Returns
|
||||||
|
* `{ markdown }`. `lang` localizes the few fixed labels (default English).
|
||||||
|
*/
|
||||||
|
@HttpCode(HttpStatus.OK)
|
||||||
|
@Post('export')
|
||||||
|
async export(
|
||||||
|
@Body() dto: ExportChatDto,
|
||||||
|
@AuthUser() user: User,
|
||||||
|
@AuthWorkspace() workspace: Workspace,
|
||||||
|
): Promise<{ markdown: string }> {
|
||||||
|
const chat = await this.assertOwnedChat(dto.chatId, user, workspace);
|
||||||
|
const rows = await this.aiChatMessageRepo.findAllByChat(
|
||||||
|
dto.chatId,
|
||||||
|
workspace.id,
|
||||||
|
);
|
||||||
|
const markdown = buildChatMarkdown({
|
||||||
|
title: chat.title ?? null,
|
||||||
|
chatId: dto.chatId,
|
||||||
|
rows,
|
||||||
|
// normalizeLang(undefined) already yields 'en', so no `?? 'en'` is needed.
|
||||||
|
lang: dto.lang,
|
||||||
|
});
|
||||||
|
return { markdown };
|
||||||
|
}
|
||||||
|
|
||||||
/** Rename a chat. */
|
/** Rename a chat. */
|
||||||
@HttpCode(HttpStatus.OK)
|
@HttpCode(HttpStatus.OK)
|
||||||
@Post('rename')
|
@Post('rename')
|
||||||
@@ -90,7 +122,11 @@ export class AiChatController {
|
|||||||
@AuthWorkspace() workspace: Workspace,
|
@AuthWorkspace() workspace: Workspace,
|
||||||
) {
|
) {
|
||||||
await this.assertOwnedChat(dto.chatId, user, workspace);
|
await this.assertOwnedChat(dto.chatId, user, workspace);
|
||||||
await this.aiChatRepo.update(dto.chatId, { title: dto.title }, workspace.id);
|
await this.aiChatRepo.update(
|
||||||
|
dto.chatId,
|
||||||
|
{ title: dto.title },
|
||||||
|
workspace.id,
|
||||||
|
);
|
||||||
return { success: true };
|
return { success: true };
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -145,7 +181,10 @@ export class AiChatController {
|
|||||||
// Resolve the agent role for this turn BEFORE hijack: existing chats read it
|
// Resolve the agent role for this turn BEFORE hijack: existing chats read it
|
||||||
// from ai_chats.role_id (authoritative), a new chat from body.roleId. The
|
// from ai_chats.role_id (authoritative), a new chat from body.roleId. The
|
||||||
// role drives both the persona and the optional model override below.
|
// role drives both the persona and the optional model override below.
|
||||||
const role = await this.aiChatService.resolveRoleForRequest(workspace, body);
|
const role = await this.aiChatService.resolveRoleForRequest(
|
||||||
|
workspace,
|
||||||
|
body,
|
||||||
|
);
|
||||||
|
|
||||||
// Resolve the model (applying the role's optional override) BEFORE hijack so
|
// Resolve the model (applying the role's optional override) BEFORE hijack so
|
||||||
// an unconfigured provider — including a role pointing at an unconfigured
|
// an unconfigured provider — including a role pointing at an unconfigured
|
||||||
@@ -232,7 +271,9 @@ export class AiChatController {
|
|||||||
let file = null;
|
let file = null;
|
||||||
try {
|
try {
|
||||||
// Whisper hard-caps uploads at 25MB; allow a single file.
|
// Whisper hard-caps uploads at 25MB; allow a single file.
|
||||||
file = await req.file({ limits: { fileSize: 25 * 1024 * 1024, files: 1 } });
|
file = await req.file({
|
||||||
|
limits: { fileSize: 25 * 1024 * 1024, files: 1 },
|
||||||
|
});
|
||||||
} catch (err: any) {
|
} catch (err: any) {
|
||||||
if (err?.statusCode === 413) {
|
if (err?.statusCode === 413) {
|
||||||
throw new BadRequestException('Audio file too large (max 25MB)');
|
throw new BadRequestException('Audio file too large (max 25MB)');
|
||||||
@@ -283,11 +324,12 @@ export class AiChatController {
|
|||||||
chatId: string,
|
chatId: string,
|
||||||
user: User,
|
user: User,
|
||||||
workspace: Workspace,
|
workspace: Workspace,
|
||||||
): Promise<void> {
|
): Promise<AiChat> {
|
||||||
const chat = await this.aiChatRepo.findById(chatId, workspace.id);
|
const chat = await this.aiChatRepo.findById(chatId, workspace.id);
|
||||||
if (!chat || chat.creatorId !== user.id) {
|
if (!chat || chat.creatorId !== user.id) {
|
||||||
throw new ForbiddenException();
|
throw new ForbiddenException();
|
||||||
}
|
}
|
||||||
|
return chat;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,61 @@
|
|||||||
|
import { Logger } from '@nestjs/common';
|
||||||
|
import { AiChatService } from './ai-chat.service';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lifecycle unit tests for AiChatService.onModuleInit (#183 crash-recovery
|
||||||
|
* sweep). The sweep is BEST-EFFORT: a failure must be logged (warn) but must
|
||||||
|
* NEVER throw out of onModuleInit and block server startup. Exercised with a
|
||||||
|
* hand-rolled mock repo — no Nest graph, no DB. Only `aiChatMessageRepo` is
|
||||||
|
* touched by onModuleInit, so the other constructor deps are stubbed as never.
|
||||||
|
*/
|
||||||
|
describe('AiChatService.onModuleInit (startup sweep)', () => {
|
||||||
|
function makeService(sweepStreaming: jest.Mock) {
|
||||||
|
const aiChatMessageRepo = { sweepStreaming };
|
||||||
|
const service = new AiChatService(
|
||||||
|
{} as never, // ai
|
||||||
|
{} as never, // aiChatRepo
|
||||||
|
aiChatMessageRepo as never,
|
||||||
|
{} as never, // aiSettings
|
||||||
|
{} as never, // tools
|
||||||
|
{} as never, // mcpClients
|
||||||
|
{} as never, // aiAgentRoleRepo
|
||||||
|
{} as never, // pageRepo
|
||||||
|
{} as never, // pageAccess
|
||||||
|
);
|
||||||
|
return { service, aiChatMessageRepo };
|
||||||
|
}
|
||||||
|
|
||||||
|
afterEach(() => jest.restoreAllMocks());
|
||||||
|
|
||||||
|
it('happy path: calls sweepStreaming and resolves', async () => {
|
||||||
|
const sweepStreaming = jest.fn().mockResolvedValue(0);
|
||||||
|
const { service } = makeService(sweepStreaming);
|
||||||
|
await expect(service.onModuleInit()).resolves.toBeUndefined();
|
||||||
|
expect(sweepStreaming).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('logs how many rows were swept when > 0', async () => {
|
||||||
|
const sweepStreaming = jest.fn().mockResolvedValue(3);
|
||||||
|
const logSpy = jest
|
||||||
|
.spyOn(Logger.prototype, 'log')
|
||||||
|
.mockImplementation(() => undefined);
|
||||||
|
const { service } = makeService(sweepStreaming);
|
||||||
|
await service.onModuleInit();
|
||||||
|
expect(logSpy).toHaveBeenCalledTimes(1);
|
||||||
|
expect(String(logSpy.mock.calls[0][0])).toContain('3');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('sweepStreaming throws -> onModuleInit resolves (does NOT throw) and warns', async () => {
|
||||||
|
const sweepStreaming = jest
|
||||||
|
.fn()
|
||||||
|
.mockRejectedValue(new Error('db unavailable'));
|
||||||
|
const warnSpy = jest
|
||||||
|
.spyOn(Logger.prototype, 'warn')
|
||||||
|
.mockImplementation(() => undefined);
|
||||||
|
const { service } = makeService(sweepStreaming);
|
||||||
|
// Must not throw — a sweep failure may never block startup.
|
||||||
|
await expect(service.onModuleInit()).resolves.toBeUndefined();
|
||||||
|
expect(warnSpy).toHaveBeenCalledTimes(1);
|
||||||
|
expect(String(warnSpy.mock.calls[0][0])).toContain('db unavailable');
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -4,7 +4,7 @@ import {
|
|||||||
serializeSteps,
|
serializeSteps,
|
||||||
rowToUiMessage,
|
rowToUiMessage,
|
||||||
prepareAgentStep,
|
prepareAgentStep,
|
||||||
buildPartialAssistantRecord,
|
flushAssistant,
|
||||||
chatStreamMetadata,
|
chatStreamMetadata,
|
||||||
accumulateStepUsage,
|
accumulateStepUsage,
|
||||||
MAX_AGENT_STEPS,
|
MAX_AGENT_STEPS,
|
||||||
@@ -94,8 +94,12 @@ describe('assistantParts', () => {
|
|||||||
const steps = [
|
const steps = [
|
||||||
{
|
{
|
||||||
text: '',
|
text: '',
|
||||||
toolCalls: [{ toolCallId: 'c1', toolName: 'getPage', input: { id: 'p1' } }],
|
toolCalls: [
|
||||||
toolResults: [{ toolCallId: 'c1', toolName: 'getPage', output: { title: 'T' } }],
|
{ toolCallId: 'c1', toolName: 'getPage', input: { id: 'p1' } },
|
||||||
|
],
|
||||||
|
toolResults: [
|
||||||
|
{ toolCallId: 'c1', toolName: 'getPage', output: { title: 'T' } },
|
||||||
|
],
|
||||||
},
|
},
|
||||||
];
|
];
|
||||||
const parts = assistantParts(steps, '') as AnyPart[];
|
const parts = assistantParts(steps, '') as AnyPart[];
|
||||||
@@ -109,7 +113,9 @@ describe('assistantParts', () => {
|
|||||||
const steps = [
|
const steps = [
|
||||||
{
|
{
|
||||||
text: '',
|
text: '',
|
||||||
toolCalls: [{ toolCallId: 'c9', toolName: 'insertNode', input: { node: {} } }],
|
toolCalls: [
|
||||||
|
{ toolCallId: 'c9', toolName: 'insertNode', input: { node: {} } },
|
||||||
|
],
|
||||||
toolResults: [],
|
toolResults: [],
|
||||||
},
|
},
|
||||||
];
|
];
|
||||||
@@ -136,7 +142,8 @@ describe('assistantParts', () => {
|
|||||||
];
|
];
|
||||||
const parts = assistantParts(steps, '') as AnyPart[];
|
const parts = assistantParts(steps, '') as AnyPart[];
|
||||||
const toolParts = parts.filter(
|
const toolParts = parts.filter(
|
||||||
(p) => typeof p.type === 'string' && (p.type as string).startsWith('tool-'),
|
(p) =>
|
||||||
|
typeof p.type === 'string' && (p.type as string).startsWith('tool-'),
|
||||||
);
|
);
|
||||||
expect(toolParts).toHaveLength(0);
|
expect(toolParts).toHaveLength(0);
|
||||||
});
|
});
|
||||||
@@ -222,79 +229,108 @@ describe('prepareAgentStep', () => {
|
|||||||
// The synthesis instruction is appended.
|
// The synthesis instruction is appended.
|
||||||
expect(result?.system).toContain(FINAL_STEP_INSTRUCTION);
|
expect(result?.system).toContain(FINAL_STEP_INSTRUCTION);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('pins the off-by-one boundary (MAX-2 is not final, MAX-1 is)', () => {
|
|
||||||
// Boundary expressed via the constant, not a hardcoded 18/19, so the test
|
|
||||||
// tracks MAX_AGENT_STEPS if the cap ever changes.
|
|
||||||
expect(prepareAgentStep(MAX_AGENT_STEPS - 2, 'SYS')).toBeUndefined();
|
|
||||||
const atBoundary = prepareAgentStep(MAX_AGENT_STEPS - 1, 'SYS');
|
|
||||||
expect(atBoundary).toBeDefined();
|
|
||||||
expect(atBoundary?.toolChoice).toBe('none');
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit test for buildPartialAssistantRecord: the pure helper that shapes the
|
* flushAssistant (#183): the PURE row builder behind the step-granular durable
|
||||||
* assistant-message record persisted on a partial/failed turn (the streamText
|
* write path. It runs identically for the upfront insert (empty steps,
|
||||||
* onError / onAbort paths). It captures the PARTIAL answer the user already saw
|
* 'streaming'), every per-step update, and the terminal finalize — so a future
|
||||||
* (finished steps' text + tool parts, plus the in-progress step's text) so a
|
* background worker can call the same function. These tests pin the four status
|
||||||
* provider error / disconnect no longer throws the streamed answer away. Pinning
|
* shapes and the `metadata.parts` shape that rowToUiMessage/findRecent depend on
|
||||||
* the record shape here covers the persist-partial logic without seaming
|
* (per-step text + tool parts via assistantParts, in-progress text appended).
|
||||||
* streamText itself.
|
|
||||||
*/
|
*/
|
||||||
describe('buildPartialAssistantRecord', () => {
|
describe('flushAssistant', () => {
|
||||||
type AnyPart = Record<string, unknown>;
|
type AnyPart = Record<string, unknown>;
|
||||||
|
|
||||||
it('records an empty turn with the error text (preserves old behavior)', () => {
|
const toolStep = {
|
||||||
const rec = buildPartialAssistantRecord([], '', 'error', '401: Unauthorized');
|
|
||||||
expect(rec).toEqual({
|
|
||||||
text: '',
|
|
||||||
toolCalls: null,
|
|
||||||
metadata: { finishReason: 'error', parts: [], error: '401: Unauthorized' },
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it('persists in-progress text (no finished steps) as the partial answer', () => {
|
|
||||||
const rec = buildPartialAssistantRecord([], 'partial answer', 'error', 'boom');
|
|
||||||
expect(rec.text).toBe('partial answer');
|
|
||||||
expect(rec.metadata.parts).toEqual([
|
|
||||||
{ type: 'text', text: 'partial answer' },
|
|
||||||
]);
|
|
||||||
expect(rec.metadata.error).toBe('boom');
|
|
||||||
});
|
|
||||||
|
|
||||||
it('combines a finished tool step with trailing in-progress text', () => {
|
|
||||||
const steps = [
|
|
||||||
{
|
|
||||||
text: 'looked it up',
|
text: 'looked it up',
|
||||||
toolCalls: [
|
toolCalls: [{ toolCallId: 'c1', toolName: 'getPage', input: { id: 'p1' } }],
|
||||||
{ toolCallId: 'c1', toolName: 'getPage', input: { id: 'p1' } },
|
|
||||||
],
|
|
||||||
toolResults: [
|
toolResults: [
|
||||||
{ toolCallId: 'c1', toolName: 'getPage', output: { title: 'T' } },
|
{ toolCallId: 'c1', toolName: 'getPage', output: { title: 'T' } },
|
||||||
],
|
],
|
||||||
},
|
};
|
||||||
];
|
|
||||||
const rec = buildPartialAssistantRecord(steps, ' and then', 'error', 'boom');
|
it('upfront seed: empty streaming row (no content, no toolCalls, empty parts)', () => {
|
||||||
const parts = rec.metadata.parts as AnyPart[];
|
const f = flushAssistant([], '', 'streaming');
|
||||||
// The finished step's text part is present.
|
expect(f.status).toBe('streaming');
|
||||||
expect(parts).toContainEqual({ type: 'text', text: 'looked it up' });
|
expect(f.content).toBe('');
|
||||||
// The paired tool call+result becomes an output-available part.
|
expect(f.toolCalls).toBeNull();
|
||||||
const toolPart = parts.find((p) => p.type === 'tool-getPage');
|
expect(f.metadata.parts).toEqual([]);
|
||||||
expect(toolPart).toBeDefined();
|
// No finishReason while streaming (it is not a terminal state).
|
||||||
expect(toolPart!.state).toBe('output-available');
|
expect('finishReason' in f.metadata).toBe(false);
|
||||||
// The in-progress text is appended LAST so the parts match the stream order.
|
|
||||||
expect(parts[parts.length - 1]).toEqual({ type: 'text', text: ' and then' });
|
|
||||||
expect(rec.text).toBe('looked it up and then');
|
|
||||||
expect(rec.toolCalls).not.toBeNull();
|
|
||||||
expect(rec.metadata.error).toBe('boom');
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('omits the error key on the abort path (no errorText)', () => {
|
it('streaming update folds in finished steps but keeps status streaming', () => {
|
||||||
const rec = buildPartialAssistantRecord([], 'half', 'aborted');
|
const f = flushAssistant([toolStep], '', 'streaming');
|
||||||
expect(rec.metadata.finishReason).toBe('aborted');
|
expect(f.status).toBe('streaming');
|
||||||
expect('error' in rec.metadata).toBe(false);
|
expect(f.content).toBe('looked it up');
|
||||||
expect(rec.text).toBe('half');
|
const parts = f.metadata.parts as AnyPart[];
|
||||||
|
expect(parts).toContainEqual({ type: 'text', text: 'looked it up' });
|
||||||
|
const toolPart = parts.find((p) => p.type === 'tool-getPage');
|
||||||
|
expect(toolPart!.state).toBe('output-available');
|
||||||
|
expect(f.toolCalls).not.toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('completed: attaches finishReason + normalized usage + contextTokens', () => {
|
||||||
|
const f = flushAssistant([toolStep], '', 'completed', {
|
||||||
|
finishReason: 'stop',
|
||||||
|
usage: { inputTokens: 10, outputTokens: 5, totalTokens: 15 },
|
||||||
|
contextTokens: 15,
|
||||||
|
});
|
||||||
|
expect(f.status).toBe('completed');
|
||||||
|
expect(f.metadata.finishReason).toBe('stop');
|
||||||
|
expect(f.metadata.usage).toEqual({
|
||||||
|
inputTokens: 10,
|
||||||
|
outputTokens: 5,
|
||||||
|
totalTokens: 15,
|
||||||
|
reasoningTokens: undefined,
|
||||||
|
});
|
||||||
|
expect(f.metadata.contextTokens).toBe(15);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('error: records the error and a derived finishReason', () => {
|
||||||
|
const f = flushAssistant([], 'partial answer', 'error', { error: 'boom' });
|
||||||
|
expect(f.status).toBe('error');
|
||||||
|
expect(f.content).toBe('partial answer');
|
||||||
|
expect(f.metadata.error).toBe('boom');
|
||||||
|
// Derives finishReason from the terminal status when none is supplied.
|
||||||
|
expect(f.metadata.finishReason).toBe('error');
|
||||||
|
expect(f.metadata.parts).toEqual([
|
||||||
|
{ type: 'text', text: 'partial answer' },
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('aborted: in-progress text appended last, no error key', () => {
|
||||||
|
const f = flushAssistant([toolStep], ' and then', 'aborted');
|
||||||
|
expect(f.status).toBe('aborted');
|
||||||
|
expect(f.metadata.finishReason).toBe('aborted');
|
||||||
|
expect('error' in f.metadata).toBe(false);
|
||||||
|
expect(f.content).toBe('looked it up and then');
|
||||||
|
const parts = f.metadata.parts as AnyPart[];
|
||||||
|
expect(parts[parts.length - 1]).toEqual({
|
||||||
|
type: 'text',
|
||||||
|
text: ' and then',
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('combines a finished tool step with trailing in-progress text (error path)', () => {
|
||||||
|
// The error path captures the PARTIAL answer the user already saw: each
|
||||||
|
// finished step's text + tool parts, then the in-progress step's text last.
|
||||||
|
const flushed = flushAssistant([toolStep], ' and then', 'error', {
|
||||||
|
error: 'boom',
|
||||||
|
});
|
||||||
|
const parts = flushed.metadata.parts as AnyPart[];
|
||||||
|
expect(parts).toContainEqual({ type: 'text', text: 'looked it up' });
|
||||||
|
const toolPart = parts.find((p) => p.type === 'tool-getPage');
|
||||||
|
expect(toolPart!.state).toBe('output-available');
|
||||||
|
// In-progress text appended LAST so the parts match the stream order.
|
||||||
|
expect(parts[parts.length - 1]).toEqual({
|
||||||
|
type: 'text',
|
||||||
|
text: ' and then',
|
||||||
|
});
|
||||||
|
expect(flushed.content).toBe('looked it up and then');
|
||||||
|
expect(flushed.toolCalls).not.toBeNull();
|
||||||
|
expect(flushed.metadata.error).toBe('boom');
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -319,10 +355,20 @@ describe('chatStreamMetadata', () => {
|
|||||||
chatStreamMetadata(
|
chatStreamMetadata(
|
||||||
{ type: 'finish-step', usage: { outputTokens: 100 } },
|
{ type: 'finish-step', usage: { outputTokens: 100 } },
|
||||||
'chat-1',
|
'chat-1',
|
||||||
{ inputTokens: 500, outputTokens: 220, totalTokens: 720, reasoningTokens: 30 },
|
{
|
||||||
|
inputTokens: 500,
|
||||||
|
outputTokens: 220,
|
||||||
|
totalTokens: 720,
|
||||||
|
reasoningTokens: 30,
|
||||||
|
},
|
||||||
),
|
),
|
||||||
).toEqual({
|
).toEqual({
|
||||||
usage: { inputTokens: 500, outputTokens: 220, totalTokens: 720, reasoningTokens: 30 },
|
usage: {
|
||||||
|
inputTokens: 500,
|
||||||
|
outputTokens: 220,
|
||||||
|
totalTokens: 720,
|
||||||
|
reasoningTokens: 30,
|
||||||
|
},
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -394,8 +440,18 @@ describe('accumulateStepUsage', () => {
|
|||||||
it('sums every field across two steps', () => {
|
it('sums every field across two steps', () => {
|
||||||
expect(
|
expect(
|
||||||
accumulateStepUsage(
|
accumulateStepUsage(
|
||||||
{ inputTokens: 500, outputTokens: 100, totalTokens: 600, reasoningTokens: 30 },
|
{
|
||||||
{ inputTokens: 520, outputTokens: 80, totalTokens: 600, reasoningTokens: 10 },
|
inputTokens: 500,
|
||||||
|
outputTokens: 100,
|
||||||
|
totalTokens: 600,
|
||||||
|
reasoningTokens: 30,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
inputTokens: 520,
|
||||||
|
outputTokens: 80,
|
||||||
|
totalTokens: 600,
|
||||||
|
reasoningTokens: 10,
|
||||||
|
},
|
||||||
),
|
),
|
||||||
).toEqual({
|
).toEqual({
|
||||||
inputTokens: 1020,
|
inputTokens: 1020,
|
||||||
|
|||||||
@@ -1,4 +1,9 @@
|
|||||||
import { ForbiddenException, Injectable, Logger } from '@nestjs/common';
|
import {
|
||||||
|
ForbiddenException,
|
||||||
|
Injectable,
|
||||||
|
Logger,
|
||||||
|
OnModuleInit,
|
||||||
|
} from '@nestjs/common';
|
||||||
import { FastifyReply } from 'fastify';
|
import { FastifyReply } from 'fastify';
|
||||||
import {
|
import {
|
||||||
streamText,
|
streamText,
|
||||||
@@ -60,7 +65,10 @@ export function prepareAgentStep(
|
|||||||
system: string,
|
system: string,
|
||||||
): { toolChoice: 'none'; system: string } | undefined {
|
): { toolChoice: 'none'; system: string } | undefined {
|
||||||
if (stepNumber >= MAX_AGENT_STEPS - 1) {
|
if (stepNumber >= MAX_AGENT_STEPS - 1) {
|
||||||
return { toolChoice: 'none', system: `${system}\n\n${FINAL_STEP_INSTRUCTION}` };
|
return {
|
||||||
|
toolChoice: 'none',
|
||||||
|
system: `${system}\n\n${FINAL_STEP_INSTRUCTION}`,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
@@ -121,7 +129,7 @@ export interface AiChatStreamArgs {
|
|||||||
* can be rebuilt for `convertToModelMessages`.
|
* can be rebuilt for `convertToModelMessages`.
|
||||||
*/
|
*/
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class AiChatService {
|
export class AiChatService implements OnModuleInit {
|
||||||
private readonly logger = new Logger(AiChatService.name);
|
private readonly logger = new Logger(AiChatService.name);
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
@@ -136,6 +144,32 @@ export class AiChatService {
|
|||||||
private readonly pageAccess: PageAccessService,
|
private readonly pageAccess: PageAccessService,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Crash-recovery sweep on server start (#183): any assistant row left in the
|
||||||
|
* 'streaming' state is the relic of a turn whose process died before it
|
||||||
|
* reached a terminal status. Flip those to 'aborted' so history/export show
|
||||||
|
* them settled (with whatever finished steps were already persisted) instead
|
||||||
|
* of perpetually "streaming". Best-effort: a sweep failure is logged but must
|
||||||
|
* never block server startup.
|
||||||
|
*/
|
||||||
|
async onModuleInit(): Promise<void> {
|
||||||
|
try {
|
||||||
|
const swept = await this.aiChatMessageRepo.sweepStreaming();
|
||||||
|
if (swept > 0) {
|
||||||
|
this.logger.log(
|
||||||
|
`Startup sweep: marked ${swept} dangling 'streaming' assistant ` +
|
||||||
|
`message(s) as 'aborted'.`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Startup sweep of dangling 'streaming' messages failed: ${
|
||||||
|
err instanceof Error ? err.message : 'unknown error'
|
||||||
|
}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resolve the agent role that applies to this stream request, scoped to the
|
* Resolve the agent role that applies to this stream request, scoped to the
|
||||||
* workspace and soft-delete aware. For an EXISTING chat the role is read from
|
* workspace and soft-delete aware. For an EXISTING chat the role is read from
|
||||||
@@ -259,9 +293,7 @@ export class AiChatService {
|
|||||||
content: incomingText,
|
content: incomingText,
|
||||||
// jsonb column: UIMessage parts are JSON-serializable at runtime but not
|
// jsonb column: UIMessage parts are JSON-serializable at runtime but not
|
||||||
// structurally `JsonValue`, so cast through unknown.
|
// structurally `JsonValue`, so cast through unknown.
|
||||||
metadata: (incoming?.parts
|
metadata: (incoming?.parts ? { parts: incoming.parts } : null) as never,
|
||||||
? { parts: incoming.parts }
|
|
||||||
: null) as never,
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// Rebuild the conversation from persisted history (not the client payload),
|
// Rebuild the conversation from persisted history (not the client payload),
|
||||||
@@ -347,31 +379,6 @@ export class AiChatService {
|
|||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
// Persist the assistant message. Used by onFinish (full result) and the
|
|
||||||
// abort/error paths (partial result). Guarded so we persist at most once.
|
|
||||||
let persisted = false;
|
|
||||||
const persistAssistant = async (data: {
|
|
||||||
text: string;
|
|
||||||
toolCalls: unknown;
|
|
||||||
metadata: Record<string, unknown>;
|
|
||||||
}): Promise<void> => {
|
|
||||||
if (persisted) return;
|
|
||||||
persisted = true;
|
|
||||||
try {
|
|
||||||
await this.aiChatMessageRepo.insert({
|
|
||||||
chatId,
|
|
||||||
workspaceId: workspace.id,
|
|
||||||
userId: user.id,
|
|
||||||
role: 'assistant',
|
|
||||||
content: data.text ?? '',
|
|
||||||
toolCalls: (data.toolCalls ?? null) as never,
|
|
||||||
metadata: data.metadata as never,
|
|
||||||
});
|
|
||||||
} catch (err) {
|
|
||||||
this.logger.error('Failed to persist assistant message', err as Error);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Accumulate the turn's streamed output so a provider error / disconnect can
|
// Accumulate the turn's streamed output so a provider error / disconnect can
|
||||||
// persist the PARTIAL answer the user already saw — the SDK's onError/onAbort
|
// persist the PARTIAL answer the user already saw — the SDK's onError/onAbort
|
||||||
// callbacks don't hand us the in-progress text. `capturedSteps` holds finished
|
// callbacks don't hand us the in-progress text. `capturedSteps` holds finished
|
||||||
@@ -380,6 +387,101 @@ export class AiChatService {
|
|||||||
const capturedSteps: StepLike[] = [];
|
const capturedSteps: StepLike[] = [];
|
||||||
let inProgressText = '';
|
let inProgressText = '';
|
||||||
|
|
||||||
|
// Step-granular durability (#183): create the assistant row UPFRONT in the
|
||||||
|
// 'streaming' state (before any token), then UPDATE it as each step finishes
|
||||||
|
// and finalize it once on the terminal callback. If the process dies
|
||||||
|
// mid-turn the row survives with every finished step already persisted; the
|
||||||
|
// startup sweep (sweepStreaming) later flips a dangling 'streaming' row to
|
||||||
|
// 'aborted'. The DB is now the single source of truth for the turn — the
|
||||||
|
// socket is never required for the write path. A failed upfront insert is
|
||||||
|
// logged and leaves assistantId undefined; the per-step/terminal updates then
|
||||||
|
// no-op (guarded below) so the turn still streams to the user.
|
||||||
|
let assistantId: string | undefined;
|
||||||
|
try {
|
||||||
|
const seed = flushAssistant([], '', 'streaming');
|
||||||
|
const seeded = await this.aiChatMessageRepo.insert({
|
||||||
|
chatId,
|
||||||
|
workspaceId: workspace.id,
|
||||||
|
userId: user.id,
|
||||||
|
role: 'assistant',
|
||||||
|
content: seed.content,
|
||||||
|
// jsonb columns: cast through never (same as the user insert above).
|
||||||
|
toolCalls: (seed.toolCalls ?? null) as never,
|
||||||
|
metadata: seed.metadata as never,
|
||||||
|
status: seed.status,
|
||||||
|
});
|
||||||
|
assistantId = seeded?.id;
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.error(
|
||||||
|
`Failed to insert upfront assistant row (chat ${chatId}, workspace ${workspace.id})`,
|
||||||
|
err as Error,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Per-step (non-terminal) update: persist the finished steps the moment a
|
||||||
|
// step ends. Tolerant — a failed update is logged and swallowed so it never
|
||||||
|
// throws into the stream. Keeps status 'streaming'.
|
||||||
|
const updateStreaming = async (): Promise<void> => {
|
||||||
|
if (!assistantId) return;
|
||||||
|
// Cheap short-circuit once the turn is finalized (see `finalized` below).
|
||||||
|
// The AUTHORITATIVE guard is `onlyIfStreaming` on the UPDATE: a late
|
||||||
|
// fire-and-forget step update could still be in flight on another pool
|
||||||
|
// connection when finalize runs, so the SQL `WHERE status='streaming'`
|
||||||
|
// (not this flag) is what prevents it clobbering the terminal row.
|
||||||
|
if (finalized) return;
|
||||||
|
try {
|
||||||
|
await this.aiChatMessageRepo.update(
|
||||||
|
assistantId,
|
||||||
|
workspace.id,
|
||||||
|
flushAssistant(capturedSteps, '', 'streaming'),
|
||||||
|
{ onlyIfStreaming: true },
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Failed to update streaming assistant row: ${
|
||||||
|
err instanceof Error ? err.message : 'unknown error'
|
||||||
|
}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Serialize the per-step updates (#183 review): onStepFinish fires them
|
||||||
|
// without await, so two could otherwise commit out of order on different pool
|
||||||
|
// connections (step N landing after N+1). Chaining each onto the previous
|
||||||
|
// keeps the persisted row monotonic with step order; each link short-circuits
|
||||||
|
// on `finalized`, so a tail of late updates is cheap.
|
||||||
|
let stepUpdateChain: Promise<void> = Promise.resolve();
|
||||||
|
|
||||||
|
// Terminal finalize: write the completed/error/aborted row exactly once
|
||||||
|
// across the (mutually-exclusive, at-most-once) onFinish/onError/onAbort
|
||||||
|
// callbacks — mirroring the pre-#183 persist-at-most-once guard for the
|
||||||
|
// TERMINAL status (the row may be updated many times with 'streaming' before
|
||||||
|
// this fires once).
|
||||||
|
let finalized = false;
|
||||||
|
const finalizeAssistant = async (
|
||||||
|
flushed: AssistantFlush,
|
||||||
|
): Promise<void> => {
|
||||||
|
if (finalized) return;
|
||||||
|
finalized = true;
|
||||||
|
const plan = planFinalizeAssistant(assistantId);
|
||||||
|
try {
|
||||||
|
// Shared dispatch (see applyFinalize): UPDATE the upfront row, or — when
|
||||||
|
// the upfront insert failed (kind 'insert') — INSERT the terminal row as
|
||||||
|
// the only safety against losing the turn entirely.
|
||||||
|
await applyFinalize(
|
||||||
|
this.aiChatMessageRepo,
|
||||||
|
plan,
|
||||||
|
{ chatId, workspaceId: workspace.id, userId: user.id },
|
||||||
|
flushed,
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.error(
|
||||||
|
`Failed to finalize assistant message (kind=${plan.kind})`,
|
||||||
|
err as Error,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// DIAGNOSTIC (Safari stream-drop investigation) — temporary. Measure
|
// DIAGNOSTIC (Safari stream-drop investigation) — temporary. Measure
|
||||||
// first-chunk latency, the model-silent gap right before a disconnect, and
|
// first-chunk latency, the model-silent gap right before a disconnect, and
|
||||||
// how many SSE heartbeats were written, so a Safari drop can be classified
|
// how many SSE heartbeats were written, so a Safari drop can be classified
|
||||||
@@ -428,6 +530,12 @@ export class AiChatService {
|
|||||||
// the in-progress accumulator for the next step.
|
// the in-progress accumulator for the next step.
|
||||||
capturedSteps.push(step as StepLike);
|
capturedSteps.push(step as StepLike);
|
||||||
inProgressText = '';
|
inProgressText = '';
|
||||||
|
// Step-granular durability (#183): persist this finished step (its text +
|
||||||
|
// tool calls + tool RESULTS) the moment it ends, so a process death after
|
||||||
|
// this point still recovers the step. Not awaited here (never block the
|
||||||
|
// stream), but SERIALIZED via stepUpdateChain so the writes commit in
|
||||||
|
// step order; updateStreaming is error-tolerant (logs + swallows).
|
||||||
|
stepUpdateChain = stepUpdateChain.then(() => updateStreaming());
|
||||||
},
|
},
|
||||||
onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => {
|
onFinish: async ({ text, finishReason, totalUsage, usage, steps }) => {
|
||||||
// DIAGNOSTIC (Safari stream-drop investigation) — temporary: success
|
// DIAGNOSTIC (Safari stream-drop investigation) — temporary: success
|
||||||
@@ -438,28 +546,31 @@ export class AiChatService {
|
|||||||
`firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` +
|
`firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` +
|
||||||
`heartbeatsSent=${heartbeatsSent} steps=${steps.length}`,
|
`heartbeatsSent=${heartbeatsSent} steps=${steps.length}`,
|
||||||
);
|
);
|
||||||
await persistAssistant({
|
// Finalize the assistant row (#183): the upfront 'streaming' row is
|
||||||
text,
|
// UPDATEd to 'completed' with the turn's final text, cumulative usage and
|
||||||
toolCalls: serializeSteps(steps),
|
// full UIMessage parts. We pass the SDK `steps` (which carry the final
|
||||||
metadata: {
|
// step's text) as the captured steps so metadata.parts matches the
|
||||||
finishReason,
|
// pre-#183 onFinish record exactly; `inProgressText` is '' here (the last
|
||||||
// Persist the turn's cumulative usage WITH reasoning tokens resolved
|
// step already finished). Final-step usage (usage.input+output) ≈ the
|
||||||
// from either the new `outputTokenDetails` or the deprecated top-level
|
// conversation's CURRENT context size, distinct from totalUsage.
|
||||||
// field, so reopened history / the Markdown export show the thinking
|
//
|
||||||
// token cost too.
|
// COLUMN-SEMANTICS NOTE (#183): `content` is built by flushAssistant as
|
||||||
usage: normalizeStreamUsage(totalUsage as StreamUsage) ?? totalUsage,
|
// the CONCATENATION of every step's text (stepsText), whereas pre-#183
|
||||||
// Final-step usage = the context actually fed to the model on the last LLM
|
// it stored only the FINAL step's text. This is a deliberate, harmless
|
||||||
// call (full history + tool results) plus the answer it just generated.
|
// change: the UI and the Markdown export render from `metadata.parts`
|
||||||
// input+output of the FINAL step ≈ the conversation's CURRENT context size,
|
// (per-step text + tool parts), not from `content`; `content` is the
|
||||||
// distinct from totalUsage which sums every step (cumulative tokens spent).
|
// plain-text projection (full-text search / fallback). A multi-step
|
||||||
|
// turn's `content` therefore now holds all steps' prose, not just the
|
||||||
|
// last block.
|
||||||
|
await finalizeAssistant(
|
||||||
|
flushAssistant(steps as StepLike[], '', 'completed', {
|
||||||
|
finishReason: finishReason as string,
|
||||||
|
usage: totalUsage as StreamUsage,
|
||||||
contextTokens:
|
contextTokens:
|
||||||
(usage?.inputTokens ?? 0) + (usage?.outputTokens ?? 0) || undefined,
|
(usage?.inputTokens ?? 0) + (usage?.outputTokens ?? 0) ||
|
||||||
// Persist the FULL set of UIMessage parts for the turn (text +
|
undefined,
|
||||||
// tool-call/result), so the rebuilt history replays prior tool
|
}),
|
||||||
// context to the model on later turns.
|
);
|
||||||
parts: assistantParts(steps, text),
|
|
||||||
},
|
|
||||||
});
|
|
||||||
// Lifecycle: release the external MCP clients leased for this turn.
|
// Lifecycle: release the external MCP clients leased for this turn.
|
||||||
await closeExternalClients();
|
await closeExternalClients();
|
||||||
|
|
||||||
@@ -495,16 +606,14 @@ export class AiChatService {
|
|||||||
`firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` +
|
`firstChunkLatency=${firstModelChunkAt ? firstModelChunkAt - streamStartedAt : 'none'}ms ` +
|
||||||
`silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent}`,
|
`silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent}`,
|
||||||
);
|
);
|
||||||
// Persist the PARTIAL answer streamed before the failure (text + any
|
// Finalize the PARTIAL answer streamed before the failure (text + any
|
||||||
// finished tool steps) WITH the error in metadata, so the turn shows what
|
// finished tool steps) WITH the error in metadata, so the turn shows what
|
||||||
// the user already saw plus the cause — not just a bare error.
|
// the user already saw plus the cause — not just a bare error. Status
|
||||||
await persistAssistant(
|
// 'error' (#183).
|
||||||
buildPartialAssistantRecord(
|
await finalizeAssistant(
|
||||||
capturedSteps,
|
flushAssistant(capturedSteps, inProgressText, 'error', {
|
||||||
inProgressText,
|
error: errorText,
|
||||||
'error',
|
}),
|
||||||
errorText,
|
|
||||||
),
|
|
||||||
);
|
);
|
||||||
await closeExternalClients();
|
await closeExternalClients();
|
||||||
},
|
},
|
||||||
@@ -528,13 +637,26 @@ export class AiChatService {
|
|||||||
`silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent} ` +
|
`silentGapBeforeDrop=${diagNow - lastModelChunkAt}ms heartbeatsSent=${heartbeatsSent} ` +
|
||||||
`steps=${steps.length}`,
|
`steps=${steps.length}`,
|
||||||
);
|
);
|
||||||
await persistAssistant(
|
await finalizeAssistant(
|
||||||
buildPartialAssistantRecord(capturedSteps, inProgressText, 'aborted'),
|
flushAssistant(capturedSteps, inProgressText, 'aborted'),
|
||||||
);
|
);
|
||||||
await closeExternalClients();
|
await closeExternalClients();
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Drain the stream independently of the client socket so the turn always
|
||||||
|
// runs to completion (or to its abort) and the terminal callbacks
|
||||||
|
// (onFinish/onError/onAbort) fire — releasing the per-turn object graph
|
||||||
|
// (history, the per-request toolset closures, captured steps, SDK buffers)
|
||||||
|
// and closing leased MCP clients. WITHOUT this, a client disconnect leaves
|
||||||
|
// the pipe's dead socket as the only reader; backpressure stalls the stream,
|
||||||
|
// the callbacks never run, and every dropped turn stays rooted in memory —
|
||||||
|
// the heap-OOM leak. consumeStream removes that backpressure (AI SDK v6
|
||||||
|
// "Handling client disconnects"). NOT awaited (fire-and-forget); the stream
|
||||||
|
// errors are already logged by the streamText `onError` callback above, so
|
||||||
|
// swallow here to avoid an unhandledRejection.
|
||||||
|
void result.consumeStream({ onError: () => undefined });
|
||||||
|
|
||||||
// Stream the UI-message protocol straight to the hijacked Node response.
|
// Stream the UI-message protocol straight to the hijacked Node response.
|
||||||
// Without onError the AI SDK masks the cause ('An error occurred.') and the
|
// Without onError the AI SDK masks the cause ('An error occurred.') and the
|
||||||
// UI shows a generic failure. Surface the real provider message instead.
|
// UI shows a generic failure. Surface the real provider message instead.
|
||||||
@@ -639,7 +761,10 @@ export class AiChatService {
|
|||||||
'punctuation at the end.',
|
'punctuation at the end.',
|
||||||
prompt: firstMessage.slice(0, 2000),
|
prompt: firstMessage.slice(0, 2000),
|
||||||
});
|
});
|
||||||
const title = text.trim().replace(/^["']|["']$/g, '').slice(0, 120);
|
const title = text
|
||||||
|
.trim()
|
||||||
|
.replace(/^["']|["']$/g, '')
|
||||||
|
.slice(0, 120);
|
||||||
if (title) {
|
if (title) {
|
||||||
await this.aiChatRepo.update(chatId, { title }, workspaceId);
|
await this.aiChatRepo.update(chatId, { title }, workspaceId);
|
||||||
}
|
}
|
||||||
@@ -962,38 +1087,132 @@ export function rowToUiMessage(row: AiChatMessage): Omit<UIMessage, 'id'> & {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build the assistant-message record persisted on a partial/failed turn (the
|
* The persisted-row patch shape produced by {@link flushAssistant}. It is the
|
||||||
* streamText onError / onAbort paths). Captures the partial answer the user
|
* SAME shape the assistant repo insert/update consume (content + toolCalls +
|
||||||
* already saw: each finished step's text + tool parts (via assistantParts),
|
* metadata) plus the lifecycle `status` column added in #183.
|
||||||
* then the in-progress step's text appended last. When `errorText` is provided
|
|
||||||
* it is recorded in metadata.error so the cause shows in history; an aborted
|
|
||||||
* turn passes none. Pure, so the partial-recording shape is unit-testable
|
|
||||||
* without seaming streamText.
|
|
||||||
*/
|
*/
|
||||||
export function buildPartialAssistantRecord(
|
export interface AssistantFlush {
|
||||||
steps: ReadonlyArray<StepLike> | undefined,
|
content: string;
|
||||||
|
toolCalls: unknown;
|
||||||
|
metadata: Record<string, unknown>;
|
||||||
|
status: 'streaming' | 'completed' | 'error' | 'aborted';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pure decision for the terminal finalize (#183): given whether the upfront
|
||||||
|
* assistant row exists (`assistantId`), choose whether the terminal payload is
|
||||||
|
* written by UPDATEing that row or — when the upfront insert failed and there is
|
||||||
|
* no id — by INSERTing a fresh terminal row so the turn is not lost entirely.
|
||||||
|
* Returns `{ kind: 'update', id }` or `{ kind: 'insert' }`. Extracted so the
|
||||||
|
* fallback-insert branch (the only safety against losing a turn whose upfront
|
||||||
|
* insert failed) is unit-testable without seaming streamText.
|
||||||
|
*/
|
||||||
|
export function planFinalizeAssistant(
|
||||||
|
assistantId: string | undefined,
|
||||||
|
): { kind: 'update'; id: string } | { kind: 'insert' } {
|
||||||
|
return assistantId ? { kind: 'update', id: assistantId } : { kind: 'insert' };
|
||||||
|
}
|
||||||
|
|
||||||
|
/** The repo surface the terminal finalize needs (structural — the real repo and
|
||||||
|
* a test mock both satisfy it). */
|
||||||
|
export interface FinalizeRepo {
|
||||||
|
insert(insertable: Record<string, unknown>): Promise<unknown>;
|
||||||
|
update(
|
||||||
|
id: string,
|
||||||
|
workspaceId: string,
|
||||||
|
patch: AssistantFlush,
|
||||||
|
): Promise<unknown>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Apply a finalize `plan` to the repo with the terminal `flushed` payload (#183):
|
||||||
|
* UPDATE the upfront row, or INSERT a fresh terminal row as the fallback when the
|
||||||
|
* upfront insert failed. The SINGLE dispatch shared by the service's
|
||||||
|
* finalizeAssistant and its test, so the test exercises the real path instead of
|
||||||
|
* a copy (#186 review). Pure of error handling — the caller wraps it.
|
||||||
|
*/
|
||||||
|
export async function applyFinalize(
|
||||||
|
repo: FinalizeRepo,
|
||||||
|
plan: { kind: 'update'; id: string } | { kind: 'insert' },
|
||||||
|
base: { chatId: string; workspaceId: string; userId: string },
|
||||||
|
flushed: AssistantFlush,
|
||||||
|
): Promise<void> {
|
||||||
|
if (plan.kind === 'update') {
|
||||||
|
await repo.update(plan.id, base.workspaceId, flushed);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await repo.insert({
|
||||||
|
chatId: base.chatId,
|
||||||
|
workspaceId: base.workspaceId,
|
||||||
|
userId: base.userId,
|
||||||
|
role: 'assistant',
|
||||||
|
content: flushed.content,
|
||||||
|
toolCalls: flushed.toolCalls ?? null,
|
||||||
|
metadata: flushed.metadata,
|
||||||
|
status: flushed.status,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* PURE assistant-row builder (#183 step-granular durability). Given the turn's
|
||||||
|
* accumulated steps + the in-progress (not-yet-finished) text + the lifecycle
|
||||||
|
* status, it returns the row patch to persist. The SAME path runs for the
|
||||||
|
* upfront insert (empty steps, status 'streaming'), every per-step update, and
|
||||||
|
* the terminal finalize (completed/error/aborted) — and a future background
|
||||||
|
* worker can call it identically, so it must stay a pure function of its inputs
|
||||||
|
* (NO `this`, no IO).
|
||||||
|
*
|
||||||
|
* `metadata.parts` is built by assistantParts over the finished steps, then the
|
||||||
|
* in-progress text appended as a trailing text part, so rowToUiMessage /
|
||||||
|
* findRecent keep replaying the turn unchanged. `metadata.finishReason`,
|
||||||
|
* `metadata.error`, `metadata.usage` and `metadata.contextTokens` are attached
|
||||||
|
* only when provided/relevant, matching the pre-#183 onFinish/onError records.
|
||||||
|
*/
|
||||||
|
export function flushAssistant(
|
||||||
|
capturedSteps: ReadonlyArray<StepLike> | undefined,
|
||||||
inProgressText: string,
|
inProgressText: string,
|
||||||
finishReason: 'error' | 'aborted',
|
status: 'streaming' | 'completed' | 'error' | 'aborted',
|
||||||
errorText?: string,
|
extra?: {
|
||||||
): { text: string; toolCalls: unknown; metadata: Record<string, unknown> } {
|
finishReason?: string;
|
||||||
const finished = steps ?? [];
|
usage?: ChatStreamUsage | StreamUsage | undefined;
|
||||||
|
contextTokens?: number;
|
||||||
|
error?: string;
|
||||||
|
},
|
||||||
|
): AssistantFlush {
|
||||||
|
const finished = capturedSteps ?? [];
|
||||||
const stepsText = finished.map((s) => s.text ?? '').join('');
|
const stepsText = finished.map((s) => s.text ?? '').join('');
|
||||||
const trailing = inProgressText ?? '';
|
const trailing = inProgressText ?? '';
|
||||||
// assistantParts emits text parts only for FINISHED steps; append the
|
// assistantParts emits text parts only for FINISHED steps; append the
|
||||||
// in-progress step's text (the answer cut off by the error) as the last text
|
// in-progress step's text (the partial answer cut off by an error/abort, or
|
||||||
// part so the persisted parts match what streamed to the client.
|
// simply not yet flushed mid-stream) as the last text part so the persisted
|
||||||
|
// parts match what streamed to the client.
|
||||||
const parts = assistantParts(finished, '') as unknown as Array<
|
const parts = assistantParts(finished, '') as unknown as Array<
|
||||||
Record<string, unknown>
|
Record<string, unknown>
|
||||||
>;
|
>;
|
||||||
if (trailing) parts.push({ type: 'text', text: trailing });
|
if (trailing) parts.push({ type: 'text', text: trailing });
|
||||||
return {
|
|
||||||
text: stepsText + trailing,
|
const metadata: Record<string, unknown> = {
|
||||||
toolCalls: serializeSteps(finished),
|
|
||||||
metadata: {
|
|
||||||
finishReason,
|
|
||||||
parts: parts as unknown as UIMessage['parts'],
|
parts: parts as unknown as UIMessage['parts'],
|
||||||
...(errorText ? { error: errorText } : {}),
|
};
|
||||||
},
|
// finishReason: prefer an explicit one; else derive a sensible value from the
|
||||||
|
// terminal status (so onError/onAbort records keep their historical reason).
|
||||||
|
if (extra?.finishReason) {
|
||||||
|
metadata.finishReason = extra.finishReason;
|
||||||
|
} else if (status === 'error' || status === 'aborted') {
|
||||||
|
metadata.finishReason = status;
|
||||||
|
}
|
||||||
|
if (extra?.usage !== undefined) {
|
||||||
|
metadata.usage =
|
||||||
|
normalizeStreamUsage(extra.usage as StreamUsage) ?? extra.usage;
|
||||||
|
}
|
||||||
|
if (extra?.contextTokens) metadata.contextTokens = extra.contextTokens;
|
||||||
|
if (extra?.error) metadata.error = extra.error;
|
||||||
|
|
||||||
|
return {
|
||||||
|
content: stepsText + trailing,
|
||||||
|
toolCalls: serializeSteps(finished),
|
||||||
|
metadata,
|
||||||
|
status,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
295
apps/server/src/core/ai-chat/chat-markdown.util.spec.ts
Normal file
295
apps/server/src/core/ai-chat/chat-markdown.util.spec.ts
Normal file
@@ -0,0 +1,295 @@
|
|||||||
|
import { buildChatMarkdown, normalizeLang } from './chat-markdown.util';
|
||||||
|
import type { AiChatMessage } from '@docmost/db/types/entity.types';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* normalizeLang: the client sends `i18n.language` — a FULL locale tag like
|
||||||
|
* 'en-US' / 'ru-RU', NOT a bare 'en'/'ru'. A `@IsIn(['en','ru'])` DTO rejected
|
||||||
|
* that with a 400 (caught in real-browser testing); the export now accepts any
|
||||||
|
* string and normalizes here. Guards that regression.
|
||||||
|
*/
|
||||||
|
describe('normalizeLang', () => {
|
||||||
|
it("maps any 'ru…' locale tag to ru", () => {
|
||||||
|
expect(normalizeLang('ru')).toBe('ru');
|
||||||
|
expect(normalizeLang('ru-RU')).toBe('ru');
|
||||||
|
expect(normalizeLang('RU-ru')).toBe('ru');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('maps everything else (incl. region-qualified English) to en', () => {
|
||||||
|
expect(normalizeLang('en')).toBe('en');
|
||||||
|
expect(normalizeLang('en-US')).toBe('en');
|
||||||
|
expect(normalizeLang('fr-FR')).toBe('en');
|
||||||
|
expect(normalizeLang(undefined)).toBe('en');
|
||||||
|
expect(normalizeLang('')).toBe('en');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for the SERVER Markdown export (#183). Mirrors the coverage of the
|
||||||
|
* (now-removed) client chat-markdown tests: heading/metadata, role labels, text
|
||||||
|
* + tool blocks, token footers, the interrupted-turn note, and NULL-status
|
||||||
|
* (legacy) rows. The export embeds a live `new Date().toISOString()` timestamp;
|
||||||
|
* we never assert it, only the deterministic structure.
|
||||||
|
*/
|
||||||
|
|
||||||
|
function row(partial: Partial<AiChatMessage>): AiChatMessage {
|
||||||
|
return {
|
||||||
|
id: partial.id ?? 'id',
|
||||||
|
chatId: partial.chatId ?? 'chat-1',
|
||||||
|
workspaceId: partial.workspaceId ?? 'ws-1',
|
||||||
|
userId: partial.userId ?? null,
|
||||||
|
role: partial.role ?? 'user',
|
||||||
|
content: partial.content ?? null,
|
||||||
|
toolCalls: partial.toolCalls ?? null,
|
||||||
|
metadata: partial.metadata ?? null,
|
||||||
|
status: partial.status ?? null,
|
||||||
|
createdAt: partial.createdAt ?? ('2026-06-21T00:00:00.000Z' as never),
|
||||||
|
updatedAt: partial.updatedAt ?? ('2026-06-21T00:00:00.000Z' as never),
|
||||||
|
deletedAt: partial.deletedAt ?? null,
|
||||||
|
} as AiChatMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('buildChatMarkdown (server) — structure', () => {
|
||||||
|
it('emits the title heading, chat id and message count', () => {
|
||||||
|
const md = buildChatMarkdown({
|
||||||
|
title: 'My chat',
|
||||||
|
chatId: 'chat-123',
|
||||||
|
rows: [],
|
||||||
|
});
|
||||||
|
expect(md).toContain('# My chat');
|
||||||
|
expect(md).toContain('- Chat ID: `chat-123`');
|
||||||
|
expect(md).toContain('- Messages: 0');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('falls back to "Untitled chat" with no title (en)', () => {
|
||||||
|
const md = buildChatMarkdown({ title: null, chatId: 'c', rows: [] });
|
||||||
|
expect(md).toContain('# Untitled chat');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('localizes fixed labels with lang=ru (structure stays English)', () => {
|
||||||
|
const md = buildChatMarkdown({
|
||||||
|
title: null,
|
||||||
|
chatId: 'c',
|
||||||
|
lang: 'ru',
|
||||||
|
rows: [row({ role: 'assistant', content: 'hi' })],
|
||||||
|
});
|
||||||
|
expect(md).toContain('# Без названия');
|
||||||
|
expect(md).toContain('## 1. ИИ-агент');
|
||||||
|
// Structural words remain English.
|
||||||
|
expect(md).toContain('- Chat ID:');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('numbers messages and labels roles (You / AI agent)', () => {
|
||||||
|
const md = buildChatMarkdown({
|
||||||
|
title: 'T',
|
||||||
|
chatId: 'c',
|
||||||
|
rows: [
|
||||||
|
row({ role: 'user', content: 'question' }),
|
||||||
|
row({ role: 'assistant', content: 'answer' }),
|
||||||
|
],
|
||||||
|
});
|
||||||
|
expect(md).toContain('## 1. You');
|
||||||
|
expect(md).toContain('question');
|
||||||
|
expect(md).toContain('## 2. AI agent');
|
||||||
|
expect(md).toContain('answer');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('renders a tool part with fenced input/output and the friendly label', () => {
|
||||||
|
const md = buildChatMarkdown({
|
||||||
|
title: 'T',
|
||||||
|
chatId: 'c',
|
||||||
|
rows: [
|
||||||
|
row({
|
||||||
|
role: 'assistant',
|
||||||
|
content: 'done',
|
||||||
|
metadata: {
|
||||||
|
parts: [
|
||||||
|
{
|
||||||
|
type: 'tool-getPage',
|
||||||
|
state: 'output-available',
|
||||||
|
input: { id: 'p1' },
|
||||||
|
output: { title: 'Hello' },
|
||||||
|
},
|
||||||
|
{ type: 'text', text: 'done' },
|
||||||
|
],
|
||||||
|
} as never,
|
||||||
|
}),
|
||||||
|
],
|
||||||
|
});
|
||||||
|
expect(md).toContain('**Tool: Read page** (`getPage`) — done');
|
||||||
|
expect(md).toContain('Input:');
|
||||||
|
expect(md).toContain('"id": "p1"');
|
||||||
|
expect(md).toContain('Output:');
|
||||||
|
expect(md).toContain('"title": "Hello"');
|
||||||
|
});
|
||||||
|
|
||||||
|
// #186 re-review pt 1: restore the parity coverage of the removed client spec —
|
||||||
|
// error state, unknown-tool fallback (en + ru), and the circular-stringify catch.
|
||||||
|
it('renders a tool part in the error state with its errorText', () => {
|
||||||
|
const md = buildChatMarkdown({
|
||||||
|
title: 'T',
|
||||||
|
chatId: 'c',
|
||||||
|
rows: [
|
||||||
|
row({
|
||||||
|
role: 'assistant',
|
||||||
|
metadata: {
|
||||||
|
parts: [
|
||||||
|
{
|
||||||
|
type: 'tool-getPage',
|
||||||
|
state: 'output-error',
|
||||||
|
input: { id: 'p1' },
|
||||||
|
errorText: 'page not found',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
} as never,
|
||||||
|
}),
|
||||||
|
],
|
||||||
|
});
|
||||||
|
expect(md).toContain('**Tool: Read page** (`getPage`) — error');
|
||||||
|
expect(md).toContain('**Error:** page not found');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('falls back to "Ran tool <name>" for an unknown tool (en) and the ru variant', () => {
|
||||||
|
const parts = [
|
||||||
|
{
|
||||||
|
type: 'tool-mysteryTool',
|
||||||
|
state: 'output-available',
|
||||||
|
output: { ok: 1 },
|
||||||
|
},
|
||||||
|
];
|
||||||
|
const en = buildChatMarkdown({
|
||||||
|
title: 'T',
|
||||||
|
chatId: 'c',
|
||||||
|
rows: [row({ role: 'assistant', metadata: { parts } as never })],
|
||||||
|
});
|
||||||
|
expect(en).toContain('**Tool: Ran tool mysteryTool** (`mysteryTool`)');
|
||||||
|
const ru = buildChatMarkdown({
|
||||||
|
title: 'T',
|
||||||
|
chatId: 'c',
|
||||||
|
lang: 'ru',
|
||||||
|
rows: [row({ role: 'assistant', metadata: { parts } as never })],
|
||||||
|
});
|
||||||
|
expect(ru).toContain('Выполнил инструмент mysteryTool');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does not throw on a circular tool output (falls back to String)', () => {
|
||||||
|
const circular: Record<string, unknown> = {};
|
||||||
|
circular.self = circular;
|
||||||
|
expect(() =>
|
||||||
|
buildChatMarkdown({
|
||||||
|
title: 'T',
|
||||||
|
chatId: 'c',
|
||||||
|
rows: [
|
||||||
|
row({
|
||||||
|
role: 'assistant',
|
||||||
|
metadata: {
|
||||||
|
parts: [
|
||||||
|
{
|
||||||
|
type: 'tool-getPage',
|
||||||
|
state: 'output-available',
|
||||||
|
output: circular,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
} as never,
|
||||||
|
}),
|
||||||
|
],
|
||||||
|
}),
|
||||||
|
).not.toThrow();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('emits a token footer + total when usage is present', () => {
|
||||||
|
const md = buildChatMarkdown({
|
||||||
|
title: 'T',
|
||||||
|
chatId: 'c',
|
||||||
|
rows: [
|
||||||
|
row({
|
||||||
|
role: 'assistant',
|
||||||
|
content: 'a',
|
||||||
|
metadata: {
|
||||||
|
usage: {
|
||||||
|
inputTokens: 100,
|
||||||
|
outputTokens: 20,
|
||||||
|
totalTokens: 120,
|
||||||
|
reasoningTokens: 8,
|
||||||
|
},
|
||||||
|
} as never,
|
||||||
|
}),
|
||||||
|
],
|
||||||
|
});
|
||||||
|
expect(md).toContain('- Total tokens: 120');
|
||||||
|
expect(md).toContain(
|
||||||
|
'_Tokens — in: 100, out: 20, reasoning: 8, total: 120_',
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('flags a still-streaming (interrupted) row', () => {
|
||||||
|
const md = buildChatMarkdown({
|
||||||
|
title: 'T',
|
||||||
|
chatId: 'c',
|
||||||
|
rows: [
|
||||||
|
row({ role: 'assistant', content: 'partial', status: 'streaming' }),
|
||||||
|
],
|
||||||
|
});
|
||||||
|
expect(md).toContain('still being generated');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does NOT flag a completed row', () => {
|
||||||
|
const md = buildChatMarkdown({
|
||||||
|
title: 'T',
|
||||||
|
chatId: 'c',
|
||||||
|
rows: [row({ role: 'assistant', content: 'final', status: 'completed' })],
|
||||||
|
});
|
||||||
|
expect(md).not.toContain('still being generated');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('renders a legacy NULL-status row (no parts) from plain content', () => {
|
||||||
|
const md = buildChatMarkdown({
|
||||||
|
title: 'T',
|
||||||
|
chatId: 'c',
|
||||||
|
rows: [
|
||||||
|
row({ role: 'assistant', content: 'legacy answer', status: null }),
|
||||||
|
],
|
||||||
|
});
|
||||||
|
expect(md).toContain('legacy answer');
|
||||||
|
expect(md).not.toContain('still being generated');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('renders a persisted error', () => {
|
||||||
|
const md = buildChatMarkdown({
|
||||||
|
title: 'T',
|
||||||
|
chatId: 'c',
|
||||||
|
rows: [
|
||||||
|
row({
|
||||||
|
role: 'assistant',
|
||||||
|
content: '',
|
||||||
|
status: 'error',
|
||||||
|
metadata: { error: '401: Unauthorized' } as never,
|
||||||
|
}),
|
||||||
|
],
|
||||||
|
});
|
||||||
|
expect(md).toContain('**⚠️ Error:** 401: Unauthorized');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('escapes embedded triple-backtick fences with a longer delimiter', () => {
|
||||||
|
const md = buildChatMarkdown({
|
||||||
|
title: 'T',
|
||||||
|
chatId: 'c',
|
||||||
|
rows: [
|
||||||
|
row({
|
||||||
|
role: 'assistant',
|
||||||
|
content: 'x',
|
||||||
|
metadata: {
|
||||||
|
parts: [
|
||||||
|
{
|
||||||
|
type: 'tool-getPage',
|
||||||
|
state: 'output-available',
|
||||||
|
output: '```inner```',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
} as never,
|
||||||
|
}),
|
||||||
|
],
|
||||||
|
});
|
||||||
|
// A 4-backtick fence wraps content that itself contains a 3-backtick run.
|
||||||
|
expect(md).toContain('````');
|
||||||
|
});
|
||||||
|
});
|
||||||
299
apps/server/src/core/ai-chat/chat-markdown.util.ts
Normal file
299
apps/server/src/core/ai-chat/chat-markdown.util.ts
Normal file
@@ -0,0 +1,299 @@
|
|||||||
|
/**
|
||||||
|
* Server-side Markdown export for an AI agent chat (#183). The DB is the single
|
||||||
|
* source of truth: this renders a chat purely from its persisted message rows
|
||||||
|
* (`AiChatMessage[]` — role / content / metadata.parts / toolCalls / usage).
|
||||||
|
* Because the assistant row is now persisted UPFRONT and updated per step, an
|
||||||
|
* interrupted turn is included up to its last finished step.
|
||||||
|
*
|
||||||
|
* Ported from the client `utils/chat-markdown.ts`. It is a PURE function (apart
|
||||||
|
* from `new Date()` for the export timestamp), so it is straightforward to
|
||||||
|
* unit-test and a future background worker can reuse it.
|
||||||
|
*
|
||||||
|
* Only a few fixed role/tool labels are localized via the `lang` param; the
|
||||||
|
* structural document words (Input/Output/Error/Tokens/...) stay English because
|
||||||
|
* the output is a technical artifact.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { AiChatMessage } from '@docmost/db/types/entity.types';
|
||||||
|
|
||||||
|
/** Supported export label languages. Defaults to English. */
|
||||||
|
export type ExportLang = 'en' | 'ru';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Normalize an arbitrary client locale code to a supported export language. The
|
||||||
|
* client sends `i18n.language`, which is a FULL locale tag (e.g. `en-US`,
|
||||||
|
* `ru-RU`), not a bare `en`/`ru` — so match on the language subtag and fall back
|
||||||
|
* to English for anything non-Russian.
|
||||||
|
*/
|
||||||
|
export function normalizeLang(lang?: string): ExportLang {
|
||||||
|
return lang?.toLowerCase().startsWith('ru') ? 'ru' : 'en';
|
||||||
|
}
|
||||||
|
|
||||||
|
/** A single AI SDK UIMessage part (text part or a tool part). */
|
||||||
|
interface ExportPart {
|
||||||
|
type: string;
|
||||||
|
text?: string;
|
||||||
|
state?: string;
|
||||||
|
toolName?: string;
|
||||||
|
input?: unknown;
|
||||||
|
output?: unknown;
|
||||||
|
errorText?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Authoritative per-turn usage the server attaches to a message row. */
|
||||||
|
interface UsageLike {
|
||||||
|
inputTokens?: number;
|
||||||
|
outputTokens?: number;
|
||||||
|
totalTokens?: number;
|
||||||
|
reasoningTokens?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Localized label table. The client-side Markdown builder was removed by #183
|
||||||
|
* (the export is now server-side only), so this no longer mirrors a second
|
||||||
|
* exporter — instead the tool-action labels are kept in parity with the
|
||||||
|
* on-screen action-log labels in the client's `tool-parts.tsx` (`toolLabelKey`)
|
||||||
|
* so the export reads the same as the UI. Only role + tool-action labels are
|
||||||
|
* localized; everything structural is an English constant in the renderer. */
|
||||||
|
const LABELS: Record<
|
||||||
|
ExportLang,
|
||||||
|
{
|
||||||
|
untitled: string;
|
||||||
|
aiAgent: string;
|
||||||
|
you: string;
|
||||||
|
tools: Record<string, string>;
|
||||||
|
ranTool: (name: string) => string;
|
||||||
|
stillGenerating: string;
|
||||||
|
}
|
||||||
|
> = {
|
||||||
|
en: {
|
||||||
|
untitled: 'Untitled chat',
|
||||||
|
aiAgent: 'AI agent',
|
||||||
|
you: 'You',
|
||||||
|
tools: {
|
||||||
|
searchPages: 'Searched pages',
|
||||||
|
getPage: 'Read page',
|
||||||
|
createPage: 'Created page',
|
||||||
|
updatePageContent: 'Updated page',
|
||||||
|
renamePage: 'Renamed page',
|
||||||
|
movePage: 'Moved page',
|
||||||
|
deletePage: 'Deleted page (to trash)',
|
||||||
|
createComment: 'Commented',
|
||||||
|
resolveComment: 'Resolved comment',
|
||||||
|
},
|
||||||
|
ranTool: (name) => `Ran tool ${name}`,
|
||||||
|
stillGenerating:
|
||||||
|
'This message is still being generated — the export captured a partial, in-progress response.',
|
||||||
|
},
|
||||||
|
ru: {
|
||||||
|
untitled: 'Без названия',
|
||||||
|
aiAgent: 'ИИ-агент',
|
||||||
|
you: 'Вы',
|
||||||
|
tools: {
|
||||||
|
searchPages: 'Искал по страницам',
|
||||||
|
getPage: 'Прочитал страницу',
|
||||||
|
createPage: 'Создал страницу',
|
||||||
|
updatePageContent: 'Обновил страницу',
|
||||||
|
renamePage: 'Переименовал страницу',
|
||||||
|
movePage: 'Переместил страницу',
|
||||||
|
deletePage: 'Удалил страницу (в корзину)',
|
||||||
|
createComment: 'Прокомментировал',
|
||||||
|
resolveComment: 'Закрыл комментарий',
|
||||||
|
},
|
||||||
|
ranTool: (name) => `Выполнил инструмент ${name}`,
|
||||||
|
stillGenerating:
|
||||||
|
'Это сообщение всё ещё генерируется — экспорт захватил частичный, незавершённый ответ.',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
/** True for AI SDK tool parts (static `tool-*` or `dynamic-tool`). */
|
||||||
|
function isToolPart(type: string): boolean {
|
||||||
|
return type.startsWith('tool-') || type === 'dynamic-tool';
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Extract the tool name from a part `type` of `tool-${name}` (or dynamic). */
|
||||||
|
function getToolName(part: ExportPart): string {
|
||||||
|
if (part.type === 'dynamic-tool') return part.toolName ?? '';
|
||||||
|
return part.type.startsWith('tool-')
|
||||||
|
? part.type.slice('tool-'.length)
|
||||||
|
: part.type;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Map an AI SDK tool-part state to the 3 states the action-log renders. */
|
||||||
|
function toolRunState(state: string | undefined): 'running' | 'done' | 'error' {
|
||||||
|
if (state === 'output-error' || state === 'output-denied') return 'error';
|
||||||
|
if (state === 'output-available') return 'done';
|
||||||
|
return 'running';
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Resolve a tool's friendly action-log label (localized) from its name. */
|
||||||
|
function toolLabel(name: string, lang: ExportLang): string {
|
||||||
|
return LABELS[lang].tools[name] ?? LABELS[lang].ranTool(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stringify an arbitrary tool input/output value for a fenced block. Strings
|
||||||
|
* pass through as-is; everything else is pretty-printed JSON, falling back to
|
||||||
|
* `String(value)` if serialization throws (e.g. a circular structure).
|
||||||
|
*/
|
||||||
|
function stringify(value: unknown): string {
|
||||||
|
if (typeof value === 'string') return value;
|
||||||
|
try {
|
||||||
|
return JSON.stringify(value, null, 2);
|
||||||
|
} catch {
|
||||||
|
return String(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap `code` in a fenced code block whose backtick delimiter is LONGER than the
|
||||||
|
* longest backtick run inside the content, so embedded backticks (or a literal
|
||||||
|
* ``` fence) never break out of the block. Minimum 3 backticks.
|
||||||
|
*/
|
||||||
|
function fence(code: string, lang = ''): string {
|
||||||
|
const runs: string[] = code.match(/`+/g) ?? [];
|
||||||
|
const longest = runs.reduce((m, s) => Math.max(m, s.length), 0);
|
||||||
|
const delim = '`'.repeat(Math.max(3, longest + 1));
|
||||||
|
return `${delim}${lang}\n${code}\n${delim}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Per-row token count, mirroring the header sum in the client window. */
|
||||||
|
function rowTokens(usage: UsageLike): number {
|
||||||
|
return (
|
||||||
|
usage.totalTokens ?? (usage.inputTokens ?? 0) + (usage.outputTokens ?? 0)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Render one message's UIMessage parts into an array of Markdown blocks
|
||||||
|
* (text blocks + tool blocks). Mirrors the client renderer / MessageItem. */
|
||||||
|
function renderMessageParts(parts: ExportPart[], lang: ExportLang): string[] {
|
||||||
|
const out: string[] = [];
|
||||||
|
|
||||||
|
for (const part of parts) {
|
||||||
|
if (part.type === 'text') {
|
||||||
|
const text = (part.text ?? '').trim();
|
||||||
|
if (text.length > 0) out.push(text);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isToolPart(part.type)) continue;
|
||||||
|
|
||||||
|
const name = getToolName(part);
|
||||||
|
const label = toolLabel(name, lang);
|
||||||
|
const state = toolRunState(part.state);
|
||||||
|
|
||||||
|
const toolLines: string[] = [`**Tool: ${label}** (\`${name}\`) — ${state}`];
|
||||||
|
if (part.input !== undefined) {
|
||||||
|
toolLines.push('Input:');
|
||||||
|
toolLines.push(fence(stringify(part.input), 'json'));
|
||||||
|
}
|
||||||
|
if (part.output !== undefined) {
|
||||||
|
toolLines.push('Output:');
|
||||||
|
toolLines.push(fence(stringify(part.output), 'json'));
|
||||||
|
}
|
||||||
|
if (part.errorText) {
|
||||||
|
toolLines.push(`**Error:** ${part.errorText}`);
|
||||||
|
}
|
||||||
|
out.push(toolLines.join('\n\n'));
|
||||||
|
}
|
||||||
|
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Resolve a persisted row's parts: prefer the rich persisted parts, else a
|
||||||
|
* single text part built from the plain-text content (mirrors rowToUiMessage). */
|
||||||
|
function rowParts(row: AiChatMessage): ExportPart[] {
|
||||||
|
const meta = (row.metadata ?? {}) as { parts?: ExportPart[] };
|
||||||
|
return Array.isArray(meta.parts) && meta.parts.length > 0
|
||||||
|
? meta.parts
|
||||||
|
: [{ type: 'text', text: row.content ?? '' }];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Serialize a chat to a Markdown string from its persisted rows. Source = DB
|
||||||
|
* ONLY (no live client state). A row whose `status` is still 'streaming' is an
|
||||||
|
* interrupted turn that the export captured mid-flight; it is rendered up to its
|
||||||
|
* last finished step and flagged "still generating".
|
||||||
|
*/
|
||||||
|
export function buildChatMarkdown(args: {
|
||||||
|
title: string | null;
|
||||||
|
chatId: string;
|
||||||
|
rows: AiChatMessage[];
|
||||||
|
// Accepts a full client locale tag (e.g. 'en-US'/'ru-RU'); normalized below.
|
||||||
|
lang?: string;
|
||||||
|
}): string {
|
||||||
|
const { title, chatId, rows } = args;
|
||||||
|
const lang: ExportLang = normalizeLang(args.lang);
|
||||||
|
const L = LABELS[lang];
|
||||||
|
const blocks: string[] = [];
|
||||||
|
|
||||||
|
const heading = (title ?? '').trim() || L.untitled;
|
||||||
|
blocks.push(`# ${heading}`);
|
||||||
|
|
||||||
|
const usageOf = (row: AiChatMessage): UsageLike | undefined => {
|
||||||
|
const meta = (row.metadata ?? {}) as { usage?: UsageLike };
|
||||||
|
return meta.usage;
|
||||||
|
};
|
||||||
|
const errorOf = (row: AiChatMessage): string | undefined => {
|
||||||
|
const meta = (row.metadata ?? {}) as { error?: string };
|
||||||
|
return meta.error;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Metadata bullet list. Total tokens is only shown when there is a sum.
|
||||||
|
const totalTokens = rows.reduce((sum, row) => {
|
||||||
|
const usage = usageOf(row);
|
||||||
|
return usage ? sum + rowTokens(usage) : sum;
|
||||||
|
}, 0);
|
||||||
|
const meta = [
|
||||||
|
`- Chat ID: \`${chatId}\``,
|
||||||
|
`- Exported: ${new Date().toISOString()}`,
|
||||||
|
`- Messages: ${rows.length}`,
|
||||||
|
];
|
||||||
|
if (totalTokens > 0) meta.push(`- Total tokens: ${totalTokens}`);
|
||||||
|
blocks.push(meta.join('\n'));
|
||||||
|
|
||||||
|
rows.forEach((row, index) => {
|
||||||
|
blocks.push('---');
|
||||||
|
|
||||||
|
const roleLabel = row.role === 'assistant' ? L.aiAgent : L.you;
|
||||||
|
blocks.push(`## ${index + 1}. ${roleLabel}`);
|
||||||
|
|
||||||
|
// Created-at kept in source as an HTML comment (out of the rendered prose).
|
||||||
|
if (row.createdAt) {
|
||||||
|
const iso =
|
||||||
|
row.createdAt instanceof Date
|
||||||
|
? row.createdAt.toISOString()
|
||||||
|
: String(row.createdAt);
|
||||||
|
blocks.push(`<!-- ${iso} -->`);
|
||||||
|
}
|
||||||
|
|
||||||
|
blocks.push(...renderMessageParts(rowParts(row), lang));
|
||||||
|
|
||||||
|
// A still-'streaming' row is an interrupted/in-progress turn captured by the
|
||||||
|
// export; record that so the partial answer is not mistaken for complete.
|
||||||
|
if (row.status === 'streaming') {
|
||||||
|
blocks.push(`_⏳ ${L.stillGenerating}_`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const error = errorOf(row);
|
||||||
|
if (error) {
|
||||||
|
blocks.push(`**⚠️ Error:** ${error}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const usage = usageOf(row);
|
||||||
|
if (usage) {
|
||||||
|
const total = usage.totalTokens ?? rowTokens(usage);
|
||||||
|
const reasoning =
|
||||||
|
usage.reasoningTokens && usage.reasoningTokens > 0
|
||||||
|
? `, reasoning: ${usage.reasoningTokens}`
|
||||||
|
: '';
|
||||||
|
blocks.push(
|
||||||
|
`_Tokens — in: ${usage.inputTokens ?? '?'}, out: ${
|
||||||
|
usage.outputTokens ?? '?'
|
||||||
|
}${reasoning}, total: ${total}_`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Blank line between blocks so the Markdown renders cleanly.
|
||||||
|
return blocks.join('\n\n');
|
||||||
|
}
|
||||||
@@ -26,3 +26,17 @@ export class GetChatMessagesDto {
|
|||||||
@IsString()
|
@IsString()
|
||||||
cursor?: string;
|
cursor?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Export a chat to Markdown (#183). `lang` localizes the few fixed
|
||||||
|
* role/tool-action labels; defaults to English server-side. */
|
||||||
|
export class ExportChatDto {
|
||||||
|
@IsString()
|
||||||
|
chatId: string;
|
||||||
|
|
||||||
|
// A full client locale tag (e.g. 'en-US', 'ru-RU') — normalized server-side to
|
||||||
|
// a supported export language (see normalizeLang). Accept any string so a
|
||||||
|
// region-qualified locale is not rejected (the 400 that broke the real client).
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
lang?: string;
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,205 @@
|
|||||||
|
import { type Tool, type ToolCallOptions } from 'ai';
|
||||||
|
import {
|
||||||
|
wrapToolWithCallTimeout,
|
||||||
|
wrapToolsWithCallTimeout,
|
||||||
|
} from './mcp-clients.service';
|
||||||
|
import {
|
||||||
|
mcpStreamTimeoutMs,
|
||||||
|
mcpCallTimeoutMs,
|
||||||
|
} from '../../../integrations/ai/ai-streaming-fetch';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Per-call total-timeout guard for external MCP tools (mcp-clients.service).
|
||||||
|
*
|
||||||
|
* `@ai-sdk/mcp`'s tool execute has NO built-in per-call timeout — a tool that
|
||||||
|
* keeps the connection warm but never returns is otherwise unbounded. The
|
||||||
|
* wrapper attaches a fresh AbortController + timer per CALL and composes it with
|
||||||
|
* the turn's abortSignal via AbortSignal.any, so EITHER the per-call timeout OR a
|
||||||
|
* client disconnect aborts the in-flight call.
|
||||||
|
*
|
||||||
|
* Fake timers prove the timeout fires WITHOUT real waiting; no leaked timer keeps
|
||||||
|
* the process alive after a fast resolve.
|
||||||
|
*/
|
||||||
|
const CALL_TIMEOUT_MS = 900_000;
|
||||||
|
|
||||||
|
/** Build a Tool around an `execute` impl, mirroring the SDK's minimal shape. */
|
||||||
|
function toolWith(
|
||||||
|
execute: (args: unknown, options: ToolCallOptions) => unknown,
|
||||||
|
): Tool {
|
||||||
|
return { description: 'x', inputSchema: undefined, execute } as unknown as Tool;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Invoke a (possibly wrapped) tool's execute with an optional turn signal. */
|
||||||
|
function callExecute(
|
||||||
|
tool: Tool,
|
||||||
|
args: unknown,
|
||||||
|
abortSignal?: AbortSignal,
|
||||||
|
): unknown {
|
||||||
|
const execute = tool.execute as (
|
||||||
|
args: unknown,
|
||||||
|
options: ToolCallOptions,
|
||||||
|
) => unknown;
|
||||||
|
return execute(args, { abortSignal } as ToolCallOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('wrapToolWithCallTimeout', () => {
|
||||||
|
beforeEach(() => jest.useFakeTimers());
|
||||||
|
afterEach(() => {
|
||||||
|
jest.clearAllTimers();
|
||||||
|
jest.useRealTimers();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('aborts a tool that only rejects when its abortSignal fires, after ms elapses', async () => {
|
||||||
|
// The tool resolves NEVER on its own — it only settles when the abortSignal
|
||||||
|
// it is handed aborts. So a resolution proves the per-call timer fired and
|
||||||
|
// aborted the call (not the tool finishing by itself).
|
||||||
|
let received: AbortSignal | undefined;
|
||||||
|
const tool = toolWith((_args, options) => {
|
||||||
|
received = options.abortSignal;
|
||||||
|
return new Promise((_resolve, reject) => {
|
||||||
|
options.abortSignal?.addEventListener('abort', () => {
|
||||||
|
reject(options.abortSignal?.reason ?? new Error('aborted'));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
const wrapped = wrapToolWithCallTimeout(tool, CALL_TIMEOUT_MS);
|
||||||
|
const promise = callExecute(wrapped, { q: 'x' }) as Promise<unknown>;
|
||||||
|
// Attach the rejection handler synchronously so advancing timers cannot mark
|
||||||
|
// it an unhandled rejection.
|
||||||
|
const settled = promise.then(
|
||||||
|
() => ({ ok: true as const }),
|
||||||
|
(err: unknown) => ({ ok: false as const, err }),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Nothing fired yet.
|
||||||
|
jest.advanceTimersByTime(CALL_TIMEOUT_MS - 1);
|
||||||
|
// Past the cap -> the per-call timer aborts the composed signal.
|
||||||
|
jest.advanceTimersByTime(2);
|
||||||
|
|
||||||
|
const result = await settled;
|
||||||
|
expect(result.ok).toBe(false);
|
||||||
|
expect(received).toBeInstanceOf(AbortSignal);
|
||||||
|
// The abort reason / rejection mentions the timeout.
|
||||||
|
const message =
|
||||||
|
(result as { err: unknown }).err instanceof Error
|
||||||
|
? ((result as { err: Error }).err.message)
|
||||||
|
: String((result as { err: unknown }).err);
|
||||||
|
expect(message).toMatch(/timed out after 900000ms/);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('aborts a REAL-client-style tool that never settles and ignores abort (race fix)', async () => {
|
||||||
|
// Models the ACTUAL @ai-sdk/mcp semantics: its in-flight promise does NOT
|
||||||
|
// reject on abort (it only checks the signal when a response arrives), so a
|
||||||
|
// warm-but-stuck call NEVER settles on its own and does NOT listen to the
|
||||||
|
// abort signal. The wrapper must still reject after `ms` via the race — an
|
||||||
|
// implementation that merely `await original(...)` would hang here forever.
|
||||||
|
// This test FAILS against the old await-only code and PASSES with the race.
|
||||||
|
const tool = toolWith(() => new Promise(() => {})); // never settles, no abort
|
||||||
|
const wrapped = wrapToolWithCallTimeout(tool, CALL_TIMEOUT_MS);
|
||||||
|
const promise = callExecute(wrapped, { q: 'x' }) as Promise<unknown>;
|
||||||
|
// Assert the rejection without hanging: drive fake time async so the timer's
|
||||||
|
// abort -> race rejection microtasks flush, then await the rejection.
|
||||||
|
const expectation = expect(promise).rejects.toThrow(/timed out after 900000ms/);
|
||||||
|
await jest.advanceTimersByTimeAsync(CALL_TIMEOUT_MS + 1);
|
||||||
|
await expectation;
|
||||||
|
});
|
||||||
|
|
||||||
|
it('passes a fast tool through and leaks no timer (advancing later does not throw)', async () => {
|
||||||
|
const tool = toolWith(() => Promise.resolve('fast-result'));
|
||||||
|
const wrapped = wrapToolWithCallTimeout(tool, CALL_TIMEOUT_MS);
|
||||||
|
|
||||||
|
const value = await (callExecute(wrapped, {}) as Promise<unknown>);
|
||||||
|
expect(value).toBe('fast-result');
|
||||||
|
|
||||||
|
// The timer was cleared in the finally — advancing past the cap aborts
|
||||||
|
// nothing and throws nothing.
|
||||||
|
expect(() => jest.advanceTimersByTime(CALL_TIMEOUT_MS * 2)).not.toThrow();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('aborts when the caller turn signal aborts before the timeout (disconnect path)', async () => {
|
||||||
|
// Real-client semantics: the tool never settles and does NOT listen to abort,
|
||||||
|
// so the wrapper must reject via the race when the caller's turn signal (a
|
||||||
|
// client disconnect) aborts BEFORE the per-call cap. The race propagates the
|
||||||
|
// caller's abort reason.
|
||||||
|
const tool = toolWith(() => new Promise(() => {})); // never settles, no abort
|
||||||
|
const wrapped = wrapToolWithCallTimeout(tool, CALL_TIMEOUT_MS);
|
||||||
|
const turn = new AbortController();
|
||||||
|
const promise = callExecute(wrapped, {}, turn.signal) as Promise<unknown>;
|
||||||
|
const settled = promise.then(
|
||||||
|
() => ({ ok: true as const }),
|
||||||
|
(err: unknown) => ({ ok: false as const, err }),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Disconnect well before the cap; the per-call timer never fires here.
|
||||||
|
turn.abort(new Error('client disconnected'));
|
||||||
|
const result = await settled;
|
||||||
|
expect(result.ok).toBe(false);
|
||||||
|
const message =
|
||||||
|
(result as { err: unknown }).err instanceof Error
|
||||||
|
? (result as { err: Error }).err.message
|
||||||
|
: String((result as { err: unknown }).err);
|
||||||
|
// The caller's abort reason propagates through the race.
|
||||||
|
expect(message).toMatch(/client disconnected/);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('passes a tool with no execute through unchanged', () => {
|
||||||
|
const noExecute = { description: 'x', inputSchema: undefined } as unknown as Tool;
|
||||||
|
const wrapped = wrapToolWithCallTimeout(noExecute, CALL_TIMEOUT_MS);
|
||||||
|
// Same object back, execute still absent.
|
||||||
|
expect(wrapped).toBe(noExecute);
|
||||||
|
expect((wrapped as { execute?: unknown }).execute).toBeUndefined();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('wrapToolsWithCallTimeout', () => {
|
||||||
|
beforeEach(() => jest.useFakeTimers());
|
||||||
|
afterEach(() => {
|
||||||
|
jest.clearAllTimers();
|
||||||
|
jest.useRealTimers();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('wraps every tool in the map (each call gets its own guard)', async () => {
|
||||||
|
const tools: Record<string, Tool> = {
|
||||||
|
a: toolWith(() => Promise.resolve('A')),
|
||||||
|
b: toolWith(() => Promise.resolve('B')),
|
||||||
|
};
|
||||||
|
const out = wrapToolsWithCallTimeout(tools, CALL_TIMEOUT_MS);
|
||||||
|
expect(Object.keys(out)).toEqual(['a', 'b']);
|
||||||
|
expect(await (callExecute(out.a, {}) as Promise<unknown>)).toBe('A');
|
||||||
|
expect(await (callExecute(out.b, {}) as Promise<unknown>)).toBe('B');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('mcp timeout env helpers', () => {
|
||||||
|
const ORIG_SILENCE = process.env.AI_MCP_STREAM_TIMEOUT_MS;
|
||||||
|
const ORIG_CALL = process.env.AI_MCP_CALL_TIMEOUT_MS;
|
||||||
|
afterEach(() => {
|
||||||
|
if (ORIG_SILENCE === undefined) delete process.env.AI_MCP_STREAM_TIMEOUT_MS;
|
||||||
|
else process.env.AI_MCP_STREAM_TIMEOUT_MS = ORIG_SILENCE;
|
||||||
|
if (ORIG_CALL === undefined) delete process.env.AI_MCP_CALL_TIMEOUT_MS;
|
||||||
|
else process.env.AI_MCP_CALL_TIMEOUT_MS = ORIG_CALL;
|
||||||
|
});
|
||||||
|
|
||||||
|
it('mcpStreamTimeoutMs defaults to 5 min and honors a positive override', () => {
|
||||||
|
delete process.env.AI_MCP_STREAM_TIMEOUT_MS;
|
||||||
|
expect(mcpStreamTimeoutMs()).toBe(300_000);
|
||||||
|
process.env.AI_MCP_STREAM_TIMEOUT_MS = '60000';
|
||||||
|
expect(mcpStreamTimeoutMs()).toBe(60_000);
|
||||||
|
for (const bad of ['0', '-1', 'x', '']) {
|
||||||
|
process.env.AI_MCP_STREAM_TIMEOUT_MS = bad;
|
||||||
|
expect(mcpStreamTimeoutMs()).toBe(300_000);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('mcpCallTimeoutMs defaults to 15 min and honors a positive override', () => {
|
||||||
|
delete process.env.AI_MCP_CALL_TIMEOUT_MS;
|
||||||
|
expect(mcpCallTimeoutMs()).toBe(900_000);
|
||||||
|
process.env.AI_MCP_CALL_TIMEOUT_MS = '120000';
|
||||||
|
expect(mcpCallTimeoutMs()).toBe(120_000);
|
||||||
|
for (const bad of ['0', '-1', 'x', '']) {
|
||||||
|
process.env.AI_MCP_CALL_TIMEOUT_MS = bad;
|
||||||
|
expect(mcpCallTimeoutMs()).toBe(900_000);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -1,12 +1,16 @@
|
|||||||
import { isIP } from 'node:net';
|
import { isIP } from 'node:net';
|
||||||
import { lookup as dnsLookup, type LookupAddress } from 'node:dns';
|
import { lookup as dnsLookup, type LookupAddress } from 'node:dns';
|
||||||
import { Injectable, Logger } from '@nestjs/common';
|
import { Injectable, Logger } from '@nestjs/common';
|
||||||
import { type Tool } from 'ai';
|
import { type Tool, type ToolCallOptions } from 'ai';
|
||||||
import { createMCPClient } from '@ai-sdk/mcp';
|
import { createMCPClient } from '@ai-sdk/mcp';
|
||||||
import { Agent, type Dispatcher } from 'undici';
|
import { Agent, type Dispatcher } from 'undici';
|
||||||
import { AiMcpServerRepo } from '@docmost/db/repos/ai-chat/ai-mcp-server.repo';
|
import { AiMcpServerRepo } from '@docmost/db/repos/ai-chat/ai-mcp-server.repo';
|
||||||
import { AiMcpServer } from '@docmost/db/types/entity.types';
|
import { AiMcpServer } from '@docmost/db/types/entity.types';
|
||||||
import { streamingDispatcherOptions } from '../../../integrations/ai/ai-streaming-fetch';
|
import {
|
||||||
|
streamingDispatcherOptions,
|
||||||
|
mcpStreamTimeoutMs,
|
||||||
|
mcpCallTimeoutMs,
|
||||||
|
} from '../../../integrations/ai/ai-streaming-fetch';
|
||||||
import { SecretBoxService } from '../../../integrations/crypto/secret-box';
|
import { SecretBoxService } from '../../../integrations/crypto/secret-box';
|
||||||
import { isUrlAllowed, isIpAllowed } from './ssrf-guard';
|
import { isUrlAllowed, isIpAllowed } from './ssrf-guard';
|
||||||
|
|
||||||
@@ -219,6 +223,8 @@ export class McpClientsService {
|
|||||||
const tools: Record<string, Tool> = {};
|
const tools: Record<string, Tool> = {};
|
||||||
const clients: McpClient[] = [];
|
const clients: McpClient[] = [];
|
||||||
const outcomes: ServerOutcome[] = [];
|
const outcomes: ServerOutcome[] = [];
|
||||||
|
// Per-call total wall-clock cap, read once for this build (env-overridable).
|
||||||
|
const callTimeoutMs = mcpCallTimeoutMs();
|
||||||
|
|
||||||
for (const server of servers) {
|
for (const server of servers) {
|
||||||
try {
|
try {
|
||||||
@@ -230,10 +236,13 @@ export class McpClientsService {
|
|||||||
Array.isArray(allow) && allow.length > 0
|
Array.isArray(allow) && allow.length > 0
|
||||||
? pick(raw, allow)
|
? pick(raw, allow)
|
||||||
: raw;
|
: raw;
|
||||||
|
// Bound each tool's execute with a per-call total-timeout guard before
|
||||||
|
// merging, so a single chatty-but-stuck call is aborted after the cap.
|
||||||
|
const guarded = wrapToolsWithCallTimeout(picked, callTimeoutMs);
|
||||||
// Namespace each tool with the sanitized server name AND disambiguate
|
// Namespace each tool with the sanitized server name AND disambiguate
|
||||||
// against names already merged from earlier servers, so no external
|
// against names already merged from earlier servers, so no external
|
||||||
// tool is silently overwritten on collision.
|
// tool is silently overwritten on collision.
|
||||||
this.mergeNamespaced(tools, picked, server.name, server.id);
|
this.mergeNamespaced(tools, guarded, server.name, server.id);
|
||||||
outcomes.push({ name: server.name, ok: true });
|
outcomes.push({ name: server.name, ok: true });
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
// A failed server is skipped — the turn proceeds with the rest. Log a
|
// A failed server is skipped — the turn proceeds with the rest. Log a
|
||||||
@@ -400,17 +409,21 @@ export function validateResolvedAddresses(
|
|||||||
* to an IP literal).
|
* to an IP literal).
|
||||||
*/
|
*/
|
||||||
function buildPinnedDispatcher(): Agent {
|
function buildPinnedDispatcher(): Agent {
|
||||||
|
// External-MCP traffic uses a DEDICATED, shorter silence timeout
|
||||||
|
// (`AI_MCP_STREAM_TIMEOUT_MS`, default 5 min) — deliberately tighter than the
|
||||||
|
// chat provider's 15-min `streamTimeoutMs()` — so a byte-silent/hung MCP
|
||||||
|
// upstream is broken in ~5 min instead of 15. We keep the keep-alive options
|
||||||
|
// from `streamingDispatcherOptions()` but OVERRIDE headers/body timeouts.
|
||||||
|
// Accepted trade-off: a legitimately long but byte-silent single tool call,
|
||||||
|
// and an SSE transport idling >5 min BETWEEN tool calls, are also cut here; the
|
||||||
|
// per-call total cap (wrapToolsWithCallTimeout, `AI_MCP_CALL_TIMEOUT_MS`) is the
|
||||||
|
// complementary guard for chatty-but-stuck calls that keep the socket warm yet
|
||||||
|
// never return.
|
||||||
|
const mcpSilenceMs = mcpStreamTimeoutMs();
|
||||||
return new Agent({
|
return new Agent({
|
||||||
// Raise undici's default 300s headers/body timeouts on external MCP traffic
|
|
||||||
// to the same generous-but-finite silence timeout the chat fetch uses (#175).
|
|
||||||
// A long agent turn keeps an SSE transport (e.g. crawl4ai's /mcp/sse) open
|
|
||||||
// across the whole turn; that connection can idle BETWEEN tool calls longer
|
|
||||||
// than 5 min, and undici's bodyTimeout would otherwise sever it mid-task — a
|
|
||||||
// tool-call failure that aborts the streamed turn and shows the user "Lost
|
|
||||||
// connection to the AI provider". A slow single tool call (a crawl) can
|
|
||||||
// likewise exceed headersTimeout. The timeout stays FINITE so a genuinely
|
|
||||||
// hung server is still broken eventually.
|
|
||||||
...streamingDispatcherOptions(),
|
...streamingDispatcherOptions(),
|
||||||
|
headersTimeout: mcpSilenceMs,
|
||||||
|
bodyTimeout: mcpSilenceMs,
|
||||||
connect: {
|
connect: {
|
||||||
lookup: (hostname, _options, callback) => {
|
lookup: (hostname, _options, callback) => {
|
||||||
// Always resolve ALL addresses ourselves; do not trust the caller's
|
// Always resolve ALL addresses ourselves; do not trust the caller's
|
||||||
@@ -572,6 +585,78 @@ function disambiguate(
|
|||||||
return capName(`${name.slice(0, MAX_TOOL_NAME_LENGTH - 14)}_${Date.now()}`);
|
return capName(`${name.slice(0, MAX_TOOL_NAME_LENGTH - 14)}_${Date.now()}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wrap every tool's execute with a per-call total-timeout guard so a single
|
||||||
|
* external MCP tool call that keeps the connection warm but never returns is
|
||||||
|
* aborted after `ms` wall-clock (complements the transport silence timeout).
|
||||||
|
*/
|
||||||
|
export function wrapToolsWithCallTimeout(
|
||||||
|
tools: Record<string, Tool>,
|
||||||
|
ms: number,
|
||||||
|
): Record<string, Tool> {
|
||||||
|
const out: Record<string, Tool> = {};
|
||||||
|
for (const [name, t] of Object.entries(tools)) {
|
||||||
|
out[name] = wrapToolWithCallTimeout(t, ms);
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Per-call total-timeout wrapper for one MCP tool. A fresh AbortController +
|
||||||
|
* timer bounds the call; it is composed with the turn's abortSignal via
|
||||||
|
* AbortSignal.any so EITHER the per-call timeout OR a client disconnect aborts
|
||||||
|
* the call. We RACE the call against the composed abort signal rather than just
|
||||||
|
* awaiting it, because @ai-sdk/mcp does NOT settle its in-flight promise on abort
|
||||||
|
* (verified in @ai-sdk/mcp@1.0.52: request() only does throwIfAborted() once
|
||||||
|
* before send and only re-checks the signal inside the response-message handler,
|
||||||
|
* which runs ONLY when a response arrives). So for a warm-but-stuck call awaiting
|
||||||
|
* `original` alone would hang forever even after the timer aborts.
|
||||||
|
*/
|
||||||
|
export function wrapToolWithCallTimeout(tool: Tool, ms: number): Tool {
|
||||||
|
const original = tool.execute;
|
||||||
|
if (typeof original !== 'function') return tool;
|
||||||
|
const execute = async (args: unknown, options: ToolCallOptions) => {
|
||||||
|
const controller = new AbortController();
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
controller.abort(new Error(`MCP tool call timed out after ${ms}ms`));
|
||||||
|
}, ms);
|
||||||
|
timer.unref?.();
|
||||||
|
const abortSignal = options?.abortSignal
|
||||||
|
? AbortSignal.any([options.abortSignal, controller.signal])
|
||||||
|
: controller.signal;
|
||||||
|
// Reject as soon as the composed signal fires, independent of whether
|
||||||
|
// `original` ever settles. The losing `original` promise is left pending; it
|
||||||
|
// is cleaned up when the client is closed at turn end, and Promise.race
|
||||||
|
// attaches a rejection handler to BOTH inputs so a late rejection of either
|
||||||
|
// is never an unhandled rejection (do NOT add an extra .catch — it could
|
||||||
|
// swallow the real result and would break the race semantics).
|
||||||
|
const aborted = new Promise<never>((_, reject) => {
|
||||||
|
const fail = () => reject(abortReason(abortSignal));
|
||||||
|
if (abortSignal.aborted) fail();
|
||||||
|
else abortSignal.addEventListener('abort', fail, { once: true });
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
return await Promise.race([
|
||||||
|
original(args, { ...options, abortSignal }),
|
||||||
|
aborted,
|
||||||
|
]);
|
||||||
|
} finally {
|
||||||
|
clearTimeout(timer);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// `Tool` is a union whose `execute` overloads conflict; cast narrowly so the
|
||||||
|
// wrapped tool keeps every other field while swapping only `execute`.
|
||||||
|
return { ...tool, execute } as unknown as Tool;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** The signal's reason as an Error (informative thrown value on abort/timeout). */
|
||||||
|
function abortReason(signal: AbortSignal): Error {
|
||||||
|
const r = signal.reason;
|
||||||
|
return r instanceof Error
|
||||||
|
? r
|
||||||
|
: new Error(typeof r === 'string' ? r : 'MCP tool call aborted');
|
||||||
|
}
|
||||||
|
|
||||||
/** Reject a promise after `ms`, so a hung connect/tools() never stalls a turn. */
|
/** Reject a promise after `ms`, so a hung connect/tools() never stalls a turn. */
|
||||||
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
|
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
|
||||||
return new Promise<T>((resolve, reject) => {
|
return new Promise<T>((resolve, reject) => {
|
||||||
|
|||||||
@@ -244,6 +244,15 @@ export class PublicShareChatService {
|
|||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Drain the stream independently of the client socket so the turn always
|
||||||
|
// runs to completion (or to its abort) even when the anonymous client
|
||||||
|
// disconnects — otherwise the dead socket is the only reader, backpressure
|
||||||
|
// stalls the stream, and the per-turn object graph stays rooted (heap-OOM
|
||||||
|
// leak). consumeStream removes that backpressure (AI SDK v6 "Handling
|
||||||
|
// client disconnects"). Fire-and-forget; stream errors are already logged
|
||||||
|
// by the streamText `onError` callback above.
|
||||||
|
void result.consumeStream({ onError: () => undefined });
|
||||||
|
|
||||||
// Stream the UI-message protocol straight to the hijacked Node response.
|
// Stream the UI-message protocol straight to the hijacked Node response.
|
||||||
// Surface the real provider message (AI SDK error bodies never carry the
|
// Surface the real provider message (AI SDK error bodies never carry the
|
||||||
// API key, so this is safe; we never dump the resolved config).
|
// API key, so this is safe; we never dump the resolved config).
|
||||||
|
|||||||
@@ -0,0 +1,18 @@
|
|||||||
|
import { type Kysely } from 'kysely';
|
||||||
|
|
||||||
|
export async function up(db: Kysely<any>): Promise<void> {
|
||||||
|
// Step-granular durability for the assistant turn (#183). The assistant row is
|
||||||
|
// now created UPFRONT (status 'streaming') and UPDATEd as each step completes,
|
||||||
|
// so a process death mid-turn no longer loses the whole answer. The column is
|
||||||
|
// NULLABLE on purpose: rows written before this migration carry NULL, which the
|
||||||
|
// app treats as 'completed' (a settled, pre-status message). Values written by
|
||||||
|
// the app: 'streaming' | 'completed' | 'error' | 'aborted'.
|
||||||
|
await db.schema
|
||||||
|
.alterTable('ai_chat_messages')
|
||||||
|
.addColumn('status', 'text', (col) => col)
|
||||||
|
.execute();
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function down(db: Kysely<any>): Promise<void> {
|
||||||
|
await db.schema.alterTable('ai_chat_messages').dropColumn('status').execute();
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
import { Injectable } from '@nestjs/common';
|
import { Injectable, Logger } from '@nestjs/common';
|
||||||
import { InjectKysely } from 'nestjs-kysely';
|
import { InjectKysely } from 'nestjs-kysely';
|
||||||
import { KyselyDB, KyselyTransaction } from '../../types/kysely.types';
|
import { KyselyDB, KyselyTransaction } from '../../types/kysely.types';
|
||||||
import { dbOrTx } from '../../utils';
|
import { dbOrTx } from '../../utils';
|
||||||
@@ -9,8 +9,24 @@ import {
|
|||||||
import { PaginationOptions } from '@docmost/db/pagination/pagination-options';
|
import { PaginationOptions } from '@docmost/db/pagination/pagination-options';
|
||||||
import { executeWithCursorPagination } from '@docmost/db/pagination/cursor-pagination';
|
import { executeWithCursorPagination } from '@docmost/db/pagination/cursor-pagination';
|
||||||
|
|
||||||
|
// Crash-recovery sweep recency threshold (#183 review): a 'streaming' row is
|
||||||
|
// only swept to 'aborted' once it has been UNTOUCHED for this long. A live turn
|
||||||
|
// bumps `updatedAt` on every step (well under this window), so its row never
|
||||||
|
// matches; only a turn whose process truly died (no step update for >threshold)
|
||||||
|
// is swept. Chosen safely ABOVE the longest realistic turn so a fresh replica's
|
||||||
|
// boot-sweep can never abort a turn another replica is actively streaming
|
||||||
|
// (multi-instance deploy).
|
||||||
|
const SWEEP_STREAMING_STALE_MS = 10 * 60 * 1000; // 10 minutes
|
||||||
|
|
||||||
|
// Hard upper bound on the rows materialized by `findAllByChat` (export path).
|
||||||
|
// A generous cap so a pathologically huge chat cannot load an unbounded result
|
||||||
|
// into memory; far above any realistic transcript length.
|
||||||
|
const FIND_ALL_BY_CHAT_LIMIT = 5000;
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class AiChatMessageRepo {
|
export class AiChatMessageRepo {
|
||||||
|
private readonly logger = new Logger(AiChatMessageRepo.name);
|
||||||
|
|
||||||
constructor(@InjectKysely() private readonly db: KyselyDB) {}
|
constructor(@InjectKysely() private readonly db: KyselyDB) {}
|
||||||
|
|
||||||
// The `tsv` column is a trigger-maintained tsvector used only for
|
// The `tsv` column is a trigger-maintained tsvector used only for
|
||||||
@@ -25,6 +41,7 @@ export class AiChatMessageRepo {
|
|||||||
'content',
|
'content',
|
||||||
'toolCalls',
|
'toolCalls',
|
||||||
'metadata',
|
'metadata',
|
||||||
|
'status',
|
||||||
'createdAt',
|
'createdAt',
|
||||||
'updatedAt',
|
'updatedAt',
|
||||||
'deletedAt',
|
'deletedAt',
|
||||||
@@ -60,6 +77,46 @@ export class AiChatMessageRepo {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load ALL (non-deleted) messages of a chat in ascending chronological order
|
||||||
|
// (oldest -> newest), unpaginated. Used by the server-side Markdown export
|
||||||
|
// (#183), where the DB is the single source of truth and the whole transcript
|
||||||
|
// must be rendered in one pass (findByChat is cursor-paginated and would only
|
||||||
|
// return the first page).
|
||||||
|
//
|
||||||
|
// Hard-capped at FIND_ALL_BY_CHAT_LIMIT rows (a generous bound, far above any
|
||||||
|
// realistic transcript) so exporting a pathologically huge chat cannot
|
||||||
|
// materialize an unbounded result set in memory.
|
||||||
|
async findAllByChat(
|
||||||
|
chatId: string,
|
||||||
|
workspaceId: string,
|
||||||
|
// Injectable for tests so truncation can be exercised on a modest volume.
|
||||||
|
limit: number = FIND_ALL_BY_CHAT_LIMIT,
|
||||||
|
): Promise<AiChatMessage[]> {
|
||||||
|
// Fetch newest-first (+1 to DETECT truncation), so on overflow we keep the
|
||||||
|
// NEWEST `limit` messages — the recent conversation matters most for an
|
||||||
|
// export — rather than silently dropping the tail (#183 review). Reverse back
|
||||||
|
// to chronological for rendering, like findRecent.
|
||||||
|
const rows = await this.db
|
||||||
|
.selectFrom('aiChatMessages')
|
||||||
|
.select(this.baseFields)
|
||||||
|
.where('chatId', '=', chatId)
|
||||||
|
.where('workspaceId', '=', workspaceId)
|
||||||
|
.where('deletedAt', 'is', null)
|
||||||
|
.orderBy('createdAt', 'desc')
|
||||||
|
.orderBy('id', 'desc')
|
||||||
|
.limit(limit + 1)
|
||||||
|
.execute();
|
||||||
|
|
||||||
|
if (rows.length > limit) {
|
||||||
|
rows.length = limit; // keep the newest `limit` (rows are newest-first here)
|
||||||
|
this.logger.warn(
|
||||||
|
`Chat ${chatId} export truncated to the newest ${limit} messages ` +
|
||||||
|
`(older messages omitted).`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return rows.reverse();
|
||||||
|
}
|
||||||
|
|
||||||
// Load the most RECENT `limit` messages for a chat and return them in
|
// Load the most RECENT `limit` messages for a chat and return them in
|
||||||
// ascending chronological order (oldest -> newest), as the model expects.
|
// ascending chronological order (oldest -> newest), as the model expects.
|
||||||
// `findByChat` returns the FIRST page ASC (the OLDEST messages), which loses
|
// `findByChat` returns the FIRST page ASC (the OLDEST messages), which loses
|
||||||
@@ -96,4 +153,68 @@ export class AiChatMessageRepo {
|
|||||||
.returning(this.baseFields)
|
.returning(this.baseFields)
|
||||||
.executeTakeFirst();
|
.executeTakeFirst();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update a single message in place by id + workspace (#183 step-granular
|
||||||
|
* durability). The assistant row is created UPFRONT (status 'streaming') and
|
||||||
|
* patched as each step completes, then finalized once on the terminal status.
|
||||||
|
* `updatedAt` is always bumped. Returns the updated row (baseFields) or
|
||||||
|
* undefined when no row matched (e.g. a foreign workspace / deleted row).
|
||||||
|
*/
|
||||||
|
async update(
|
||||||
|
id: string,
|
||||||
|
workspaceId: string,
|
||||||
|
patch: Partial<{
|
||||||
|
content: string | null;
|
||||||
|
toolCalls: unknown;
|
||||||
|
metadata: unknown;
|
||||||
|
status: string | null;
|
||||||
|
}>,
|
||||||
|
opts?: { onlyIfStreaming?: boolean; trx?: KyselyTransaction },
|
||||||
|
): Promise<AiChatMessage | undefined> {
|
||||||
|
const db = dbOrTx(this.db, opts?.trx);
|
||||||
|
let query = db
|
||||||
|
.updateTable('aiChatMessages')
|
||||||
|
.set({ ...(patch as Record<string, unknown>), updatedAt: new Date() })
|
||||||
|
.where('id', '=', id)
|
||||||
|
.where('workspaceId', '=', workspaceId);
|
||||||
|
// Concurrency guard (#183 review): a per-step 'streaming' update must NEVER
|
||||||
|
// overwrite a row the terminal callback already finalized. onStepFinish
|
||||||
|
// fires the streaming update fire-and-forget, so its UPDATE can land AFTER
|
||||||
|
// finalize on a DIFFERENT pool connection (commit order is not guaranteed).
|
||||||
|
// Scoping the streaming update to rows STILL in 'streaming' makes a late
|
||||||
|
// update a no-op once the row is completed/error/aborted — regardless of
|
||||||
|
// commit order. The terminal finalize runs WITHOUT this guard so it always
|
||||||
|
// wins.
|
||||||
|
if (opts?.onlyIfStreaming) {
|
||||||
|
query = query.where('status', '=', 'streaming');
|
||||||
|
}
|
||||||
|
return query.returning(this.baseFields).executeTakeFirst();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Crash-recovery sweep (#183): flip every assistant row still left in the
|
||||||
|
* 'streaming' state (a turn that died mid-write before reaching a terminal
|
||||||
|
* status) to 'aborted'. Run once on server start. Returns the number of rows
|
||||||
|
* swept so the caller can log it. Workspace-wide on purpose — a crash can have
|
||||||
|
* dangling streaming rows across any workspace.
|
||||||
|
*
|
||||||
|
* Bounded by recency (#183 review): only rows UNTOUCHED for
|
||||||
|
* SWEEP_STREAMING_STALE_MS are swept. A live turn bumps `updatedAt` on every
|
||||||
|
* step, so an actively-streaming row never matches; this prevents a fresh
|
||||||
|
* replica's boot-sweep from aborting a turn another replica is still streaming
|
||||||
|
* in a multi-instance deploy.
|
||||||
|
*/
|
||||||
|
async sweepStreaming(trx?: KyselyTransaction): Promise<number> {
|
||||||
|
const db = dbOrTx(this.db, trx);
|
||||||
|
const staleBefore = new Date(Date.now() - SWEEP_STREAMING_STALE_MS);
|
||||||
|
const rows = await db
|
||||||
|
.updateTable('aiChatMessages')
|
||||||
|
.set({ status: 'aborted', updatedAt: new Date() })
|
||||||
|
.where('status', '=', 'streaming')
|
||||||
|
.where('updatedAt', '<', staleBefore)
|
||||||
|
.returning('id')
|
||||||
|
.execute();
|
||||||
|
return rows.length;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
4
apps/server/src/database/types/db.d.ts
vendored
4
apps/server/src/database/types/db.d.ts
vendored
@@ -620,6 +620,10 @@ export interface AiChatMessages {
|
|||||||
content: string | null;
|
content: string | null;
|
||||||
toolCalls: Json | null;
|
toolCalls: Json | null;
|
||||||
metadata: Json | null;
|
metadata: Json | null;
|
||||||
|
// Turn lifecycle status (#183): 'streaming' | 'completed' | 'error' |
|
||||||
|
// 'aborted'. NULL on rows written before the status column existed; the app
|
||||||
|
// treats NULL as 'completed' (a settled, pre-status message).
|
||||||
|
status: string | null;
|
||||||
tsv: string | null;
|
tsv: string | null;
|
||||||
createdAt: Generated<Timestamp>;
|
createdAt: Generated<Timestamp>;
|
||||||
updatedAt: Generated<Timestamp>;
|
updatedAt: Generated<Timestamp>;
|
||||||
|
|||||||
@@ -70,6 +70,47 @@ export function streamKeepAliveMs(): number {
|
|||||||
return positiveEnv('AI_STREAM_KEEPALIVE_MS', DEFAULT_STREAM_KEEPALIVE_MS);
|
return positiveEnv('AI_STREAM_KEEPALIVE_MS', DEFAULT_STREAM_KEEPALIVE_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Default SILENCE timeout for EXTERNAL-MCP transport (5 min). */
|
||||||
|
const DEFAULT_MCP_STREAM_TIMEOUT_MS = 300_000;
|
||||||
|
|
||||||
|
/** Default total wall-clock cap for ONE external MCP tool call (15 min). */
|
||||||
|
const DEFAULT_MCP_CALL_TIMEOUT_MS = 900_000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SILENCE timeout (ms) for EXTERNAL-MCP transport ONLY. Override with
|
||||||
|
* `AI_MCP_STREAM_TIMEOUT_MS`; a missing/invalid/non-positive value falls back to
|
||||||
|
* {@link DEFAULT_MCP_STREAM_TIMEOUT_MS} (5 min).
|
||||||
|
*
|
||||||
|
* Deliberately tighter than the chat provider's {@link streamTimeoutMs} (15 min)
|
||||||
|
* so a byte-silent/hung MCP upstream is broken in ~5 min instead of 15. This is
|
||||||
|
* the undici `headersTimeout`/`bodyTimeout` for the external-MCP dispatcher only
|
||||||
|
* — it must NOT change the chat provider, which legitimately needs 15 min between
|
||||||
|
* reasoning chunks (#175).
|
||||||
|
*
|
||||||
|
* Trade-off: a legitimately long but byte-silent single tool call (a slow crawl
|
||||||
|
* that emits nothing until done) and an SSE transport that idles >5 min BETWEEN
|
||||||
|
* tool calls are also cut here. The per-call total cap ({@link mcpCallTimeoutMs},
|
||||||
|
* applied in mcp-clients.service) is the complementary guard for chatty-but-stuck
|
||||||
|
* calls that keep the socket warm yet never return.
|
||||||
|
*/
|
||||||
|
export function mcpStreamTimeoutMs(): number {
|
||||||
|
return positiveEnv('AI_MCP_STREAM_TIMEOUT_MS', DEFAULT_MCP_STREAM_TIMEOUT_MS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Total wall-clock cap (ms) for ONE external MCP tool call — APP-LEVEL, not
|
||||||
|
* transport. Override with `AI_MCP_CALL_TIMEOUT_MS`; a missing/invalid/
|
||||||
|
* non-positive value falls back to {@link DEFAULT_MCP_CALL_TIMEOUT_MS} (15 min).
|
||||||
|
*
|
||||||
|
* Catches a tool that keeps the connection warm (SSE heartbeats / trickle) but
|
||||||
|
* never returns a result — which the transport silence timeout
|
||||||
|
* ({@link mcpStreamTimeoutMs}) would never break because the socket never goes
|
||||||
|
* byte-silent.
|
||||||
|
*/
|
||||||
|
export function mcpCallTimeoutMs(): number {
|
||||||
|
return positiveEnv('AI_MCP_CALL_TIMEOUT_MS', DEFAULT_MCP_CALL_TIMEOUT_MS);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* undici `Agent` options for streaming AI traffic — the (generous, finite)
|
* undici `Agent` options for streaming AI traffic — the (generous, finite)
|
||||||
* silence timeouts plus the keep-alive recycle window. Shared by the chat
|
* silence timeouts plus the keep-alive recycle window. Shared by the chat
|
||||||
|
|||||||
270
apps/server/test/integration/ai-chat-message-status.int-spec.ts
Normal file
270
apps/server/test/integration/ai-chat-message-status.int-spec.ts
Normal file
@@ -0,0 +1,270 @@
|
|||||||
|
import { Kysely } from 'kysely';
|
||||||
|
import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.repo';
|
||||||
|
import {
|
||||||
|
getTestDb,
|
||||||
|
destroyTestDb,
|
||||||
|
createWorkspace,
|
||||||
|
createUser,
|
||||||
|
createChat,
|
||||||
|
createMessage,
|
||||||
|
} from './db';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Integration coverage for the #183 step-granular durability primitives on
|
||||||
|
* AiChatMessageRepo: `update` (in-place patch by id+workspace, bumps updatedAt,
|
||||||
|
* returns the row) and `sweepStreaming` (crash recovery: flip dangling
|
||||||
|
* 'streaming' rows to 'aborted'). Real SQL against docmost_test, not a mock.
|
||||||
|
*/
|
||||||
|
describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => {
|
||||||
|
let db: Kysely<any>;
|
||||||
|
let repo: AiChatMessageRepo;
|
||||||
|
let workspaceId: string;
|
||||||
|
let otherWorkspaceId: string;
|
||||||
|
let userId: string;
|
||||||
|
let chatId: string;
|
||||||
|
let otherChatId: string;
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
db = getTestDb();
|
||||||
|
repo = new AiChatMessageRepo(db as any);
|
||||||
|
workspaceId = (await createWorkspace(db)).id;
|
||||||
|
otherWorkspaceId = (await createWorkspace(db)).id;
|
||||||
|
userId = (await createUser(db, workspaceId)).id;
|
||||||
|
chatId = (await createChat(db, { workspaceId, creatorId: userId })).id;
|
||||||
|
const otherUser = await createUser(db, otherWorkspaceId);
|
||||||
|
otherChatId = (
|
||||||
|
await createChat(db, {
|
||||||
|
workspaceId: otherWorkspaceId,
|
||||||
|
creatorId: otherUser.id,
|
||||||
|
})
|
||||||
|
).id;
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
await destroyTestDb();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('update patches content/status/metadata and bumps updatedAt', async () => {
|
||||||
|
const seeded = await repo.insert({
|
||||||
|
chatId,
|
||||||
|
workspaceId,
|
||||||
|
userId,
|
||||||
|
role: 'assistant',
|
||||||
|
content: '',
|
||||||
|
status: 'streaming',
|
||||||
|
metadata: { parts: [] } as never,
|
||||||
|
});
|
||||||
|
const before = seeded.updatedAt;
|
||||||
|
// Ensure a measurable timestamp delta.
|
||||||
|
await new Promise((r) => setTimeout(r, 5));
|
||||||
|
|
||||||
|
const updated = await repo.update(seeded.id, workspaceId, {
|
||||||
|
content: 'final answer',
|
||||||
|
status: 'completed',
|
||||||
|
metadata: { parts: [{ type: 'text', text: 'final answer' }] },
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(updated).toBeDefined();
|
||||||
|
expect(updated!.content).toBe('final answer');
|
||||||
|
expect(updated!.status).toBe('completed');
|
||||||
|
expect((updated!.metadata as any).parts).toHaveLength(1);
|
||||||
|
// The 5ms sleep above guarantees a strictly-later timestamp.
|
||||||
|
expect(new Date(updated!.updatedAt).getTime()).toBeGreaterThan(
|
||||||
|
new Date(before).getTime(),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('onlyIfStreaming update is a NO-OP once the row is finalized (race guard)', async () => {
|
||||||
|
// Reproduce the step-update-vs-finalize race (#183 review): the row is
|
||||||
|
// finalized to 'completed', then a LATE per-step 'streaming' update lands.
|
||||||
|
// With `onlyIfStreaming` it must match nothing and leave the finalized row
|
||||||
|
// untouched (no clobber back to 'streaming', no lost usage).
|
||||||
|
const seeded = await repo.insert({
|
||||||
|
chatId,
|
||||||
|
workspaceId,
|
||||||
|
userId,
|
||||||
|
role: 'assistant',
|
||||||
|
content: 'partial',
|
||||||
|
status: 'streaming',
|
||||||
|
});
|
||||||
|
// Terminal finalize (unguarded) wins.
|
||||||
|
await repo.update(seeded.id, workspaceId, {
|
||||||
|
content: 'final answer',
|
||||||
|
status: 'completed',
|
||||||
|
metadata: { usage: { totalTokens: 42 } } as never,
|
||||||
|
});
|
||||||
|
// A straggler per-step update arrives AFTER finalize.
|
||||||
|
const late = await repo.update(
|
||||||
|
seeded.id,
|
||||||
|
workspaceId,
|
||||||
|
{ content: 'partial', status: 'streaming', metadata: {} as never },
|
||||||
|
{ onlyIfStreaming: true },
|
||||||
|
);
|
||||||
|
expect(late).toBeUndefined(); // matched no 'streaming' row -> no-op
|
||||||
|
const rows = await repo.findAllByChat(chatId, workspaceId);
|
||||||
|
const row = rows.find((r) => r.id === seeded.id)!;
|
||||||
|
expect(row.status).toBe('completed'); // NOT clobbered back to streaming
|
||||||
|
expect(row.content).toBe('final answer');
|
||||||
|
expect((row.metadata as any).usage.totalTokens).toBe(42); // usage preserved
|
||||||
|
});
|
||||||
|
|
||||||
|
it('update is workspace-scoped: a foreign workspace id matches nothing', async () => {
|
||||||
|
const seeded = await repo.insert({
|
||||||
|
chatId,
|
||||||
|
workspaceId,
|
||||||
|
userId,
|
||||||
|
role: 'assistant',
|
||||||
|
content: 'orig',
|
||||||
|
status: 'streaming',
|
||||||
|
});
|
||||||
|
const res = await repo.update(seeded.id, otherWorkspaceId, {
|
||||||
|
status: 'completed',
|
||||||
|
});
|
||||||
|
expect(res).toBeUndefined();
|
||||||
|
// The row in the real workspace is untouched.
|
||||||
|
const rows = await repo.findAllByChat(chatId, workspaceId);
|
||||||
|
const stillThere = rows.find((r) => r.id === seeded.id);
|
||||||
|
expect(stillThere!.status).toBe('streaming');
|
||||||
|
// Clean up so it does not pollute the sweep test below.
|
||||||
|
await repo.update(seeded.id, workspaceId, { status: 'completed' });
|
||||||
|
});
|
||||||
|
|
||||||
|
// Backdate a row's updatedAt so it qualifies as a STALE streaming row (the
|
||||||
|
// sweep only flips rows untouched for >10 minutes — a live turn bumps
|
||||||
|
// updatedAt every step, so it would never match).
|
||||||
|
async function backdateUpdatedAt(
|
||||||
|
id: string,
|
||||||
|
minutesAgo: number,
|
||||||
|
): Promise<void> {
|
||||||
|
await db
|
||||||
|
.updateTable('aiChatMessages')
|
||||||
|
.set({ updatedAt: new Date(Date.now() - minutesAgo * 60 * 1000) })
|
||||||
|
.where('id', '=', id)
|
||||||
|
.execute();
|
||||||
|
}
|
||||||
|
|
||||||
|
it('sweepStreaming flips STALE dangling streaming rows to aborted and counts them', async () => {
|
||||||
|
// Two dangling streaming rows in our workspace + one in another workspace —
|
||||||
|
// all backdated past the staleness threshold so the sweep picks them up.
|
||||||
|
const a = await createMessage(db, {
|
||||||
|
workspaceId,
|
||||||
|
chatId,
|
||||||
|
role: 'assistant',
|
||||||
|
status: 'streaming',
|
||||||
|
});
|
||||||
|
const b = await createMessage(db, {
|
||||||
|
workspaceId,
|
||||||
|
chatId,
|
||||||
|
role: 'assistant',
|
||||||
|
status: 'streaming',
|
||||||
|
});
|
||||||
|
const other = await createMessage(db, {
|
||||||
|
workspaceId: otherWorkspaceId,
|
||||||
|
chatId: otherChatId,
|
||||||
|
role: 'assistant',
|
||||||
|
status: 'streaming',
|
||||||
|
});
|
||||||
|
await backdateUpdatedAt(a.id, 20);
|
||||||
|
await backdateUpdatedAt(b.id, 20);
|
||||||
|
await backdateUpdatedAt(other.id, 20);
|
||||||
|
|
||||||
|
// A settled row must NOT be touched.
|
||||||
|
const done = await createMessage(db, {
|
||||||
|
workspaceId,
|
||||||
|
chatId,
|
||||||
|
role: 'assistant',
|
||||||
|
status: 'completed',
|
||||||
|
});
|
||||||
|
// A legacy NULL-status row must NOT be touched.
|
||||||
|
const legacy = await createMessage(db, {
|
||||||
|
workspaceId,
|
||||||
|
chatId,
|
||||||
|
role: 'assistant',
|
||||||
|
status: null,
|
||||||
|
});
|
||||||
|
|
||||||
|
const swept = await repo.sweepStreaming();
|
||||||
|
// At least the 3 stale streaming rows we created (2 here + 1 in the other ws).
|
||||||
|
expect(swept).toBeGreaterThanOrEqual(3);
|
||||||
|
|
||||||
|
const rows = await repo.findAllByChat(chatId, workspaceId);
|
||||||
|
const byId = new Map(rows.map((r) => [r.id, r]));
|
||||||
|
expect(byId.get(a.id)!.status).toBe('aborted');
|
||||||
|
expect(byId.get(b.id)!.status).toBe('aborted');
|
||||||
|
expect(byId.get(done.id)!.status).toBe('completed');
|
||||||
|
expect(byId.get(legacy.id)!.status).toBeNull();
|
||||||
|
|
||||||
|
// Idempotent: a second sweep finds nothing left in our seeded set.
|
||||||
|
const again = await repo.sweepStreaming();
|
||||||
|
const rows2 = await repo.findAllByChat(chatId, workspaceId);
|
||||||
|
// Our two rows stay aborted regardless of `again`'s global count.
|
||||||
|
expect(rows2.find((r) => r.id === a.id)!.status).toBe('aborted');
|
||||||
|
expect(again).toBeGreaterThanOrEqual(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('sweepStreaming does NOT sweep a FRESH streaming row (recency bound, #183 review)', async () => {
|
||||||
|
// A row that is actively streaming (recent updatedAt) must survive the sweep:
|
||||||
|
// a fresh replica's boot-sweep must never abort a turn another replica is
|
||||||
|
// still streaming in a multi-instance deploy.
|
||||||
|
const fresh = await createMessage(db, {
|
||||||
|
workspaceId,
|
||||||
|
chatId,
|
||||||
|
role: 'assistant',
|
||||||
|
status: 'streaming',
|
||||||
|
});
|
||||||
|
// A STALE streaming row created alongside it IS swept — proving the sweep
|
||||||
|
// ran and the only difference is recency.
|
||||||
|
const stale = await createMessage(db, {
|
||||||
|
workspaceId,
|
||||||
|
chatId,
|
||||||
|
role: 'assistant',
|
||||||
|
status: 'streaming',
|
||||||
|
});
|
||||||
|
await backdateUpdatedAt(stale.id, 20);
|
||||||
|
|
||||||
|
await repo.sweepStreaming();
|
||||||
|
|
||||||
|
const rows = await repo.findAllByChat(chatId, workspaceId);
|
||||||
|
const byId = new Map(rows.map((r) => [r.id, r]));
|
||||||
|
// Fresh (recently-updated) streaming row is left untouched...
|
||||||
|
expect(byId.get(fresh.id)!.status).toBe('streaming');
|
||||||
|
// ...while the stale one alongside it was swept to 'aborted'.
|
||||||
|
expect(byId.get(stale.id)!.status).toBe('aborted');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('findAllByChat caps the result, keeping the NEWEST messages in order (#183 review)', async () => {
|
||||||
|
// A dedicated chat so the cap test is independent of the rows above.
|
||||||
|
const cappedChat = (
|
||||||
|
await createChat(db, { workspaceId, creatorId: userId })
|
||||||
|
).id;
|
||||||
|
const base = Date.now();
|
||||||
|
// Three messages at strictly increasing timestamps.
|
||||||
|
await createMessage(db, {
|
||||||
|
workspaceId,
|
||||||
|
chatId: cappedChat,
|
||||||
|
content: 'm1-oldest',
|
||||||
|
createdAt: new Date(base),
|
||||||
|
});
|
||||||
|
await createMessage(db, {
|
||||||
|
workspaceId,
|
||||||
|
chatId: cappedChat,
|
||||||
|
content: 'm2',
|
||||||
|
createdAt: new Date(base + 1000),
|
||||||
|
});
|
||||||
|
await createMessage(db, {
|
||||||
|
workspaceId,
|
||||||
|
chatId: cappedChat,
|
||||||
|
content: 'm3-newest',
|
||||||
|
createdAt: new Date(base + 2000),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Cap of 2 -> the OLDEST message is dropped; the newest two stay, in
|
||||||
|
// chronological order (oldest -> newest).
|
||||||
|
const capped = await repo.findAllByChat(cappedChat, workspaceId, 2);
|
||||||
|
expect(capped.map((r) => r.content)).toEqual(['m2', 'm3-newest']);
|
||||||
|
|
||||||
|
// Without a cap (well above the row count) all three come back in order.
|
||||||
|
const all = await repo.findAllByChat(cappedChat, workspaceId, 100);
|
||||||
|
expect(all.map((r) => r.content)).toEqual(['m1-oldest', 'm2', 'm3-newest']);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -104,7 +104,8 @@ export async function createWorkspace(
|
|||||||
name: overrides.name ?? `ws-${suffix}`,
|
name: overrides.name ?? `ws-${suffix}`,
|
||||||
// hostname is uniquely constrained; keep it unique per workspace.
|
// hostname is uniquely constrained; keep it unique per workspace.
|
||||||
hostname: `host-${suffix}`,
|
hostname: `host-${suffix}`,
|
||||||
settings: overrides.settings === undefined ? null : (overrides.settings as any),
|
settings:
|
||||||
|
overrides.settings === undefined ? null : (overrides.settings as any),
|
||||||
})
|
})
|
||||||
.returning(['id', 'settings'])
|
.returning(['id', 'settings'])
|
||||||
.executeTakeFirstOrThrow();
|
.executeTakeFirstOrThrow();
|
||||||
@@ -226,3 +227,37 @@ export async function createChat(
|
|||||||
.executeTakeFirstOrThrow();
|
.executeTakeFirstOrThrow();
|
||||||
return { id: row.id as string };
|
return { id: row.id as string };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function createMessage(
|
||||||
|
db: Kysely<any>,
|
||||||
|
args: {
|
||||||
|
workspaceId: string;
|
||||||
|
chatId: string;
|
||||||
|
userId?: string | null;
|
||||||
|
role?: string;
|
||||||
|
content?: string | null;
|
||||||
|
status?: string | null;
|
||||||
|
metadata?: unknown;
|
||||||
|
// Explicit timestamp so a test can control message ORDER (the default DB
|
||||||
|
// now() can tie within a millisecond, and the v4 id is not time-ordered).
|
||||||
|
createdAt?: Date;
|
||||||
|
},
|
||||||
|
): Promise<{ id: string }> {
|
||||||
|
const id = randomUUID();
|
||||||
|
const row = await db
|
||||||
|
.insertInto('aiChatMessages')
|
||||||
|
.values({
|
||||||
|
id,
|
||||||
|
workspaceId: args.workspaceId,
|
||||||
|
chatId: args.chatId,
|
||||||
|
userId: args.userId ?? null,
|
||||||
|
role: args.role ?? 'assistant',
|
||||||
|
content: args.content ?? null,
|
||||||
|
status: args.status ?? null,
|
||||||
|
metadata: (args.metadata ?? null) as any,
|
||||||
|
...(args.createdAt ? { createdAt: args.createdAt } : {}),
|
||||||
|
})
|
||||||
|
.returning(['id'])
|
||||||
|
.executeTakeFirstOrThrow();
|
||||||
|
return { id: row.id as string };
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user