15-point review of the persistent-history PR. Architecture decisions: crash recovery = recency threshold; tool-label duplication = leave as-is. Must-fix: 1. Boot-sweep bounded by recency. sweepStreaming now also requires `updatedAt < now() - SWEEP_STREAMING_STALE_MS` (10 min), so a fresh replica's startup sweep can't abort a turn another replica is actively streaming (multi-instance deploy). Int-spec: a FRESH 'streaming' row is NOT swept, a STALE one IS. 2. Restore export during the FIRST streaming turn of a new chat (#174). The server chatId is now adopted EARLY (in-place, on the start-chunk metadata) via a new `onServerChatId` callback wired through use-chat-session → chat-thread, so `activeChatId` is set at turn start and the Copy button is live mid-first- turn (canExport = !!activeChatId). Hook tests for early/in-place/no-op adopt. 3. Cover finalizeAssistant's fallback-insert branch: extracted pure `planFinalizeAssistant(assistantId)` (update when id present, insert when the upfront insert failed) + a dispatch harness test for both arms. Tests: onModuleInit lifecycle spec (sweep called; throw → resolves + warns); int-spec updatedAt assertion → toBeGreaterThan. Cleanups: cap findAllByChat at 5000 rows; upfront-insert-failure log carries chatId+workspaceId; removed the now-dead buildPartialAssistantRecord (only the spec consumed it; shapes still pinned by the flushAssistant suite); controller passes `lang: dto.lang` (normalizeLang handles undefined); dropped a no-op `?? undefined` in errorOf; documented the content-column semantics change (concatenated step text, UI renders from metadata.parts); CHANGELOG [Unreleased] entry (#183, #174); reworded the stale LABELS parity comment. Verified: server build + 323 ai-chat unit + 5 integration; client tsc + 160 ai-chat unit; prettier clean. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -194,6 +194,7 @@ export default function AiChatWindow() {
|
||||
threadKey,
|
||||
waitingForHistory,
|
||||
onTurnFinished,
|
||||
onServerChatId,
|
||||
cancelPendingAdoption,
|
||||
} = useChatSession({
|
||||
activeChatId,
|
||||
@@ -238,7 +239,10 @@ export default function AiChatWindow() {
|
||||
// SERVER-sourced (the DB is the single source of truth — #183): the assistant
|
||||
// row is persisted upfront + per step, so even a brand-new chat whose first
|
||||
// turn is streaming/interrupted has a server row to render. Enable the button
|
||||
// whenever a persisted chat is active (`activeChatId` is set).
|
||||
// whenever a persisted chat is active (`activeChatId` is set). For a BRAND-NEW
|
||||
// chat that id is adopted EARLY — at the stream's `start` chunk via
|
||||
// onServerChatId (#174) — so the Copy button is available during the first
|
||||
// turn's stream, not only after it terminates.
|
||||
const activeChat = useMemo(
|
||||
() => chats?.items?.find((c) => c.id === activeChatId) ?? null,
|
||||
[chats, activeChatId],
|
||||
@@ -629,6 +633,7 @@ export default function AiChatWindow() {
|
||||
onRolePicked={(role) => setSelectedRoleId(role.id)}
|
||||
assistantName={currentRole?.name}
|
||||
onTurnFinished={onTurnFinished}
|
||||
onServerChatId={onServerChatId}
|
||||
onLiveTurnTokens={setLiveTurnTokens}
|
||||
/>
|
||||
)}
|
||||
|
||||
@@ -61,6 +61,12 @@ interface ChatThreadProps {
|
||||
* authoritative id the server streamed on the assistant message metadata, or
|
||||
* undefined on a failed turn — see adopt-chat-id.ts for the full #137 design. */
|
||||
onTurnFinished: (serverChatId?: string) => void;
|
||||
/** Called EARLY (at the stream's `start` chunk) with the authoritative server
|
||||
* chat id streamed on the assistant message metadata, so a brand-new chat
|
||||
* adopts its real id WHILE the first turn is still streaming (#174 — makes the
|
||||
* Copy/export button available mid-stream). Distinct from onTurnFinished,
|
||||
* which fires only at the terminal outcome. */
|
||||
onServerChatId?: (serverChatId?: string) => void;
|
||||
/** Reports the live turn-token total (reasoning + output) for the in-flight
|
||||
* turn so the parent can show a header badge that ticks mid-stream. THROTTLED
|
||||
* here (~8 Hz) so the parent re-renders a handful of times a second, not on
|
||||
@@ -110,6 +116,7 @@ export default function ChatThread({
|
||||
onRolePicked,
|
||||
assistantName,
|
||||
onTurnFinished,
|
||||
onServerChatId,
|
||||
onLiveTurnTokens,
|
||||
}: ChatThreadProps) {
|
||||
const { t } = useTranslation();
|
||||
@@ -279,6 +286,26 @@ export default function ChatThread({
|
||||
// Keep the flush helper pointed at the latest sendMessage instance.
|
||||
sendMessageRef.current = sendMessage;
|
||||
|
||||
// EARLY chat-id adoption (#174): the server streams the authoritative chat id
|
||||
// on the assistant message metadata at the `start` chunk (message.metadata.
|
||||
// chatId — see adopt-chat-id.ts / chatStreamMetadata). Forward it to the parent
|
||||
// AS SOON AS it appears (mid-stream), so a brand-new chat adopts its real id
|
||||
// WHILE the first turn is still streaming and activeChatId-gated affordances
|
||||
// (the Copy/export button) light up immediately, instead of only at onFinish.
|
||||
// Keyed by the last-seen id so we forward each distinct id exactly once. The
|
||||
// parent's onServerChatId is idempotent and a no-op once the chat has an id.
|
||||
const lastForwardedChatIdRef = useRef<string | undefined>(undefined);
|
||||
useEffect(() => {
|
||||
if (!onServerChatId) return;
|
||||
const tail = messages[messages.length - 1];
|
||||
if (tail?.role !== "assistant") return;
|
||||
const serverChatId = extractServerChatId(tail);
|
||||
if (!serverChatId || serverChatId === lastForwardedChatIdRef.current)
|
||||
return;
|
||||
lastForwardedChatIdRef.current = serverChatId;
|
||||
onServerChatId(serverChatId);
|
||||
}, [messages, onServerChatId]);
|
||||
|
||||
// Live "turn was interrupted" marker for the CURRENT session. The red error
|
||||
// banner (driven by `error`) covers the error case; this covers an aborted
|
||||
// turn, distinguishing a manual Stop (`isAbort`) from a dropped connection
|
||||
|
||||
@@ -64,7 +64,10 @@ describe("useChatSession", () => {
|
||||
result.current.onTurnFinished(undefined);
|
||||
expect(setActiveChatId).not.toHaveBeenCalled();
|
||||
// The refetch lands with the new row => adopt it.
|
||||
rerender({ activeChatId: null, chats: { items: [{ id: "x" }, { id: "new" }] } });
|
||||
rerender({
|
||||
activeChatId: null,
|
||||
chats: { items: [{ id: "x" }, { id: "new" }] },
|
||||
});
|
||||
expect(setActiveChatId).toHaveBeenCalledWith("new");
|
||||
});
|
||||
|
||||
@@ -88,7 +91,10 @@ describe("useChatSession", () => {
|
||||
});
|
||||
result.current.onTurnFinished(undefined);
|
||||
// a was deleted, new was added — same length, but membership changed.
|
||||
rerender({ activeChatId: null, chats: { items: [{ id: "b" }, { id: "new" }] } });
|
||||
rerender({
|
||||
activeChatId: null,
|
||||
chats: { items: [{ id: "b" }, { id: "new" }] },
|
||||
});
|
||||
expect(setActiveChatId).toHaveBeenCalledWith("new");
|
||||
});
|
||||
|
||||
@@ -171,6 +177,40 @@ describe("useChatSession", () => {
|
||||
expect(setActiveChatId).not.toHaveBeenCalledWith("late");
|
||||
});
|
||||
|
||||
it("#174 early adopt: onServerChatId adopts the streamed id mid-stream (Copy button available during the first turn)", () => {
|
||||
// Brand-new chat: no id yet. The server streams the real chat id "A" on the
|
||||
// `start` chunk WHILE the first turn is still streaming (before onTurnFinished
|
||||
// fires at the terminal outcome). The hook must adopt it immediately so the
|
||||
// window's activeChatId-gated Copy/export button lights up during the stream.
|
||||
const { result, setActiveChatId } = setup({
|
||||
activeChatId: null,
|
||||
chats: { items: [] },
|
||||
});
|
||||
result.current.onServerChatId("A");
|
||||
expect(setActiveChatId).toHaveBeenCalledWith("A");
|
||||
});
|
||||
|
||||
it("#174 early adopt is in-place: threadKey stays stable (live stream not torn down)", () => {
|
||||
const chats = { items: [] };
|
||||
const { result, rerender } = setup({ activeChatId: null, chats });
|
||||
const keyBefore = result.current.threadKey;
|
||||
result.current.onServerChatId("A");
|
||||
// Parent reflects the adopted id back in; the SAME mount key is kept so the
|
||||
// in-flight useChat store (the streaming turn) is preserved.
|
||||
rerender({ activeChatId: "A", chats });
|
||||
expect(result.current.threadKey).toBe(keyBefore);
|
||||
});
|
||||
|
||||
it("#174 early adopt: no-op for an existing chat and for a missing id", () => {
|
||||
const { result, setActiveChatId } = setup({
|
||||
activeChatId: "chat-1",
|
||||
chats: { items: [{ id: "chat-1" }] },
|
||||
});
|
||||
result.current.onServerChatId("chat-1"); // already has an id
|
||||
result.current.onServerChatId(undefined); // no streamed id
|
||||
expect(setActiveChatId).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("in-place adopt keeps threadKey stable; an external switch remounts", () => {
|
||||
const chats = { items: [{ id: "B" }] };
|
||||
const { result, rerender } = setup({ activeChatId: null, chats });
|
||||
|
||||
@@ -34,6 +34,13 @@ export interface UseChatSessionResult {
|
||||
/** Call when a turn finishes; `serverChatId` is the authoritative streamed id
|
||||
* (undefined on a failed turn). Handles new-chat id adoption + invalidations. */
|
||||
onTurnFinished: (serverChatId?: string) => void;
|
||||
/** Call EARLY (at the stream's `start` chunk) with the authoritative streamed
|
||||
* chat id so a brand-new chat adopts its real id WHILE its first turn is still
|
||||
* streaming — making `activeChatId`-gated affordances (e.g. the Copy/export
|
||||
* button, #174) available immediately. In-place adoption only (same mount key,
|
||||
* no list/messages invalidation — that is left to onTurnFinished at the end).
|
||||
* Idempotent and a no-op once the chat already has an id. */
|
||||
onServerChatId: (serverChatId?: string) => void;
|
||||
/** Disarm any pending error-path new-chat fallback. The window calls this from
|
||||
* startNewChat/selectChat so a late refetch can't yank the user back into a
|
||||
* just-failed chat after they explicitly moved on. */
|
||||
@@ -85,13 +92,10 @@ export function useChatSession(
|
||||
// `newThread`/`switchThread` to (re)mount, `adoptThread` for in-place adoption.
|
||||
// Initial: a non-null activeChatId switches to it; a null one gets a fresh
|
||||
// session key with no chat id yet.
|
||||
const [thread, dispatch] = useReducer(
|
||||
threadSessionReducer,
|
||||
undefined,
|
||||
() =>
|
||||
activeChatId === null
|
||||
? newThread(`new-${generateId()}`)
|
||||
: switchThread(activeChatId),
|
||||
const [thread, dispatch] = useReducer(threadSessionReducer, undefined, () =>
|
||||
activeChatId === null
|
||||
? newThread(`new-${generateId()}`)
|
||||
: switchThread(activeChatId),
|
||||
);
|
||||
|
||||
// Error-path fallback for new-chat id adoption. When a brand-new chat's first
|
||||
@@ -150,6 +154,31 @@ export function useChatSession(
|
||||
[chats, setActiveChatId, onInvalidateChatList, onInvalidateChatMessages],
|
||||
);
|
||||
|
||||
// EARLY adoption (#174): adopt the authoritative streamed chat id the moment
|
||||
// the server emits it on the `start` chunk, so a brand-new chat gets its real
|
||||
// `activeChatId` WHILE its first turn streams — not only at terminal
|
||||
// onTurnFinished. This makes the activeChatId-gated Copy/export button
|
||||
// available during the first turn. Pure in-place adoption (same mount key, like
|
||||
// the primary path) with NO invalidation: the list/messages refresh stays on
|
||||
// onTurnFinished at the end of the turn. Reads the live id from the ref so a
|
||||
// repeat call after adoption is a no-op (resolveAdoptedChatId only fires for a
|
||||
// still-new chat).
|
||||
const onServerChatId = useCallback(
|
||||
(serverChatId?: string) => {
|
||||
const adopted = resolveAdoptedChatId(
|
||||
activeChatIdRef.current,
|
||||
serverChatId,
|
||||
);
|
||||
if (!adopted) return;
|
||||
activeChatIdRef.current = adopted;
|
||||
setActiveChatId(adopted);
|
||||
dispatch({ type: "adopt", chatId: adopted });
|
||||
// Early adoption beat the error-path fallback to it — disarm.
|
||||
pendingNewChatRef.current = null;
|
||||
},
|
||||
[setActiveChatId],
|
||||
);
|
||||
|
||||
// FALLBACK resolver. Armed only by onTurnFinished when a brand-new chat's first
|
||||
// turn errored before the `start` chunk (no authoritative id streamed). Once
|
||||
// the per-user list refetch lands with the just-created row, adopt the SINGLE
|
||||
@@ -233,6 +262,7 @@ export function useChatSession(
|
||||
threadKey: thread.key,
|
||||
waitingForHistory,
|
||||
onTurnFinished,
|
||||
onServerChatId,
|
||||
cancelPendingAdoption,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
import { ForbiddenException } from '@nestjs/common';
|
||||
import { AiChatController } from './ai-chat.controller';
|
||||
import {
|
||||
planFinalizeAssistant,
|
||||
flushAssistant,
|
||||
type AssistantFlush,
|
||||
} from './ai-chat.service';
|
||||
import type { User, Workspace } from '@docmost/db/types/entity.types';
|
||||
|
||||
/**
|
||||
@@ -90,3 +95,74 @@ describe('AiChatController.export', () => {
|
||||
expect(res.markdown).toContain('## 2. ИИ-агент');
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* The terminal-finalize dispatch (#183): the assistant row is INSERTed upfront
|
||||
* as 'streaming' and finalized once on the terminal callback. When the upfront
|
||||
* insert SUCCEEDED (we hold an id) finalize UPDATEs that row; when it FAILED
|
||||
* (assistantId is undefined) finalize falls back to INSERTing the terminal row
|
||||
* so the turn is not lost — the only safety against losing the turn entirely.
|
||||
*
|
||||
* `planFinalizeAssistant` is the pure decision; this also drives a tiny harness
|
||||
* that mirrors the service's `finalizeAssistant` repo dispatch over a mock repo,
|
||||
* proving both branches issue the right call with the terminal payload.
|
||||
*/
|
||||
describe('finalizeAssistant dispatch (planFinalizeAssistant)', () => {
|
||||
const workspaceId = 'ws1';
|
||||
|
||||
// Mirror of the service's finalize repo-dispatch over the plan: UPDATE when an
|
||||
// upfront row exists, else INSERT the terminal row.
|
||||
async function dispatchFinalize(
|
||||
repo: { insert: jest.Mock; update: jest.Mock },
|
||||
assistantId: string | undefined,
|
||||
flushed: AssistantFlush,
|
||||
): Promise<void> {
|
||||
const plan = planFinalizeAssistant(assistantId);
|
||||
if (plan.kind === 'insert') {
|
||||
await repo.insert({
|
||||
chatId: 'c1',
|
||||
workspaceId,
|
||||
userId: 'u1',
|
||||
role: 'assistant',
|
||||
content: flushed.content,
|
||||
toolCalls: flushed.toolCalls ?? null,
|
||||
metadata: flushed.metadata,
|
||||
status: flushed.status,
|
||||
});
|
||||
} else {
|
||||
await repo.update(plan.id, workspaceId, flushed);
|
||||
}
|
||||
}
|
||||
|
||||
it('plan: update when the upfront insert returned an id', () => {
|
||||
expect(planFinalizeAssistant('a1')).toEqual({ kind: 'update', id: 'a1' });
|
||||
});
|
||||
|
||||
it('plan: insert (fallback) when there is no upfront id', () => {
|
||||
expect(planFinalizeAssistant(undefined)).toEqual({ kind: 'insert' });
|
||||
});
|
||||
|
||||
it('(a) upfront insert succeeded -> finalize UPDATEs the row by id', async () => {
|
||||
const repo = { insert: jest.fn(), update: jest.fn() };
|
||||
const flushed = flushAssistant([], 'final answer', 'completed', {
|
||||
finishReason: 'stop',
|
||||
});
|
||||
await dispatchFinalize(repo, 'a1', flushed);
|
||||
expect(repo.update).toHaveBeenCalledWith('a1', workspaceId, flushed);
|
||||
expect(repo.insert).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('(b) upfront insert failed -> finalize INSERTs the terminal payload', async () => {
|
||||
const repo = { insert: jest.fn(), update: jest.fn() };
|
||||
const flushed = flushAssistant([], 'partial', 'error', { error: 'boom' });
|
||||
await dispatchFinalize(repo, undefined, flushed);
|
||||
expect(repo.update).not.toHaveBeenCalled();
|
||||
expect(repo.insert).toHaveBeenCalledTimes(1);
|
||||
const arg = repo.insert.mock.calls[0][0];
|
||||
// The fallback insert carries the terminal content/status/metadata.
|
||||
expect(arg.role).toBe('assistant');
|
||||
expect(arg.content).toBe('partial');
|
||||
expect(arg.status).toBe('error');
|
||||
expect((arg.metadata as { error?: string }).error).toBe('boom');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -107,7 +107,8 @@ export class AiChatController {
|
||||
title: chat.title ?? null,
|
||||
chatId: dto.chatId,
|
||||
rows,
|
||||
lang: dto.lang ?? 'en',
|
||||
// normalizeLang(undefined) already yields 'en', so no `?? 'en'` is needed.
|
||||
lang: dto.lang,
|
||||
});
|
||||
return { markdown };
|
||||
}
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
import { Logger } from '@nestjs/common';
|
||||
import { AiChatService } from './ai-chat.service';
|
||||
|
||||
/**
|
||||
* Lifecycle unit tests for AiChatService.onModuleInit (#183 crash-recovery
|
||||
* sweep). The sweep is BEST-EFFORT: a failure must be logged (warn) but must
|
||||
* NEVER throw out of onModuleInit and block server startup. Exercised with a
|
||||
* hand-rolled mock repo — no Nest graph, no DB. Only `aiChatMessageRepo` is
|
||||
* touched by onModuleInit, so the other constructor deps are stubbed as never.
|
||||
*/
|
||||
describe('AiChatService.onModuleInit (startup sweep)', () => {
|
||||
function makeService(sweepStreaming: jest.Mock) {
|
||||
const aiChatMessageRepo = { sweepStreaming };
|
||||
const service = new AiChatService(
|
||||
{} as never, // ai
|
||||
{} as never, // aiChatRepo
|
||||
aiChatMessageRepo as never,
|
||||
{} as never, // aiSettings
|
||||
{} as never, // tools
|
||||
{} as never, // mcpClients
|
||||
{} as never, // aiAgentRoleRepo
|
||||
{} as never, // pageRepo
|
||||
{} as never, // pageAccess
|
||||
);
|
||||
return { service, aiChatMessageRepo };
|
||||
}
|
||||
|
||||
afterEach(() => jest.restoreAllMocks());
|
||||
|
||||
it('happy path: calls sweepStreaming and resolves', async () => {
|
||||
const sweepStreaming = jest.fn().mockResolvedValue(0);
|
||||
const { service } = makeService(sweepStreaming);
|
||||
await expect(service.onModuleInit()).resolves.toBeUndefined();
|
||||
expect(sweepStreaming).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('logs how many rows were swept when > 0', async () => {
|
||||
const sweepStreaming = jest.fn().mockResolvedValue(3);
|
||||
const logSpy = jest
|
||||
.spyOn(Logger.prototype, 'log')
|
||||
.mockImplementation(() => undefined);
|
||||
const { service } = makeService(sweepStreaming);
|
||||
await service.onModuleInit();
|
||||
expect(logSpy).toHaveBeenCalledTimes(1);
|
||||
expect(String(logSpy.mock.calls[0][0])).toContain('3');
|
||||
});
|
||||
|
||||
it('sweepStreaming throws -> onModuleInit resolves (does NOT throw) and warns', async () => {
|
||||
const sweepStreaming = jest
|
||||
.fn()
|
||||
.mockRejectedValue(new Error('db unavailable'));
|
||||
const warnSpy = jest
|
||||
.spyOn(Logger.prototype, 'warn')
|
||||
.mockImplementation(() => undefined);
|
||||
const { service } = makeService(sweepStreaming);
|
||||
// Must not throw — a sweep failure may never block startup.
|
||||
await expect(service.onModuleInit()).resolves.toBeUndefined();
|
||||
expect(warnSpy).toHaveBeenCalledTimes(1);
|
||||
expect(String(warnSpy.mock.calls[0][0])).toContain('db unavailable');
|
||||
});
|
||||
});
|
||||
@@ -4,7 +4,6 @@ import {
|
||||
serializeSteps,
|
||||
rowToUiMessage,
|
||||
prepareAgentStep,
|
||||
buildPartialAssistantRecord,
|
||||
flushAssistant,
|
||||
chatStreamMetadata,
|
||||
accumulateStepUsage,
|
||||
@@ -241,101 +240,13 @@ describe('prepareAgentStep', () => {
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* Unit test for buildPartialAssistantRecord: the pure helper that shapes the
|
||||
* assistant-message record persisted on a partial/failed turn (the streamText
|
||||
* onError / onAbort paths). It captures the PARTIAL answer the user already saw
|
||||
* (finished steps' text + tool parts, plus the in-progress step's text) so a
|
||||
* provider error / disconnect no longer throws the streamed answer away. Pinning
|
||||
* the record shape here covers the persist-partial logic without seaming
|
||||
* streamText itself.
|
||||
*/
|
||||
describe('buildPartialAssistantRecord', () => {
|
||||
type AnyPart = Record<string, unknown>;
|
||||
|
||||
it('records an empty turn with the error text (preserves old behavior)', () => {
|
||||
const rec = buildPartialAssistantRecord(
|
||||
[],
|
||||
'',
|
||||
'error',
|
||||
'401: Unauthorized',
|
||||
);
|
||||
expect(rec).toEqual({
|
||||
text: '',
|
||||
toolCalls: null,
|
||||
metadata: {
|
||||
finishReason: 'error',
|
||||
parts: [],
|
||||
error: '401: Unauthorized',
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('persists in-progress text (no finished steps) as the partial answer', () => {
|
||||
const rec = buildPartialAssistantRecord(
|
||||
[],
|
||||
'partial answer',
|
||||
'error',
|
||||
'boom',
|
||||
);
|
||||
expect(rec.text).toBe('partial answer');
|
||||
expect(rec.metadata.parts).toEqual([
|
||||
{ type: 'text', text: 'partial answer' },
|
||||
]);
|
||||
expect(rec.metadata.error).toBe('boom');
|
||||
});
|
||||
|
||||
it('combines a finished tool step with trailing in-progress text', () => {
|
||||
const steps = [
|
||||
{
|
||||
text: 'looked it up',
|
||||
toolCalls: [
|
||||
{ toolCallId: 'c1', toolName: 'getPage', input: { id: 'p1' } },
|
||||
],
|
||||
toolResults: [
|
||||
{ toolCallId: 'c1', toolName: 'getPage', output: { title: 'T' } },
|
||||
],
|
||||
},
|
||||
];
|
||||
const rec = buildPartialAssistantRecord(
|
||||
steps,
|
||||
' and then',
|
||||
'error',
|
||||
'boom',
|
||||
);
|
||||
const parts = rec.metadata.parts as AnyPart[];
|
||||
// The finished step's text part is present.
|
||||
expect(parts).toContainEqual({ type: 'text', text: 'looked it up' });
|
||||
// The paired tool call+result becomes an output-available part.
|
||||
const toolPart = parts.find((p) => p.type === 'tool-getPage');
|
||||
expect(toolPart).toBeDefined();
|
||||
expect(toolPart!.state).toBe('output-available');
|
||||
// The in-progress text is appended LAST so the parts match the stream order.
|
||||
expect(parts[parts.length - 1]).toEqual({
|
||||
type: 'text',
|
||||
text: ' and then',
|
||||
});
|
||||
expect(rec.text).toBe('looked it up and then');
|
||||
expect(rec.toolCalls).not.toBeNull();
|
||||
expect(rec.metadata.error).toBe('boom');
|
||||
});
|
||||
|
||||
it('omits the error key on the abort path (no errorText)', () => {
|
||||
const rec = buildPartialAssistantRecord([], 'half', 'aborted');
|
||||
expect(rec.metadata.finishReason).toBe('aborted');
|
||||
expect('error' in rec.metadata).toBe(false);
|
||||
expect(rec.text).toBe('half');
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* flushAssistant (#183): the PURE row builder behind the step-granular durable
|
||||
* write path. It runs identically for the upfront insert (empty steps,
|
||||
* 'streaming'), every per-step update, and the terminal finalize — so a future
|
||||
* background worker can call the same function. These tests pin the four status
|
||||
* shapes and, critically, that `metadata.parts` stays IDENTICAL to the old
|
||||
* buildPartialAssistantRecord / assistantParts output (rowToUiMessage/findRecent
|
||||
* depend on it).
|
||||
* shapes and the `metadata.parts` shape that rowToUiMessage/findRecent depend on
|
||||
* (per-step text + tool parts via assistantParts, in-progress text appended).
|
||||
*/
|
||||
describe('flushAssistant', () => {
|
||||
type AnyPart = Record<string, unknown>;
|
||||
@@ -411,21 +322,24 @@ describe('flushAssistant', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('metadata.parts parity with buildPartialAssistantRecord (error path)', () => {
|
||||
it('combines a finished tool step with trailing in-progress text (error path)', () => {
|
||||
// The error path captures the PARTIAL answer the user already saw: each
|
||||
// finished step's text + tool parts, then the in-progress step's text last.
|
||||
const flushed = flushAssistant([toolStep], ' and then', 'error', {
|
||||
error: 'boom',
|
||||
});
|
||||
const legacy = buildPartialAssistantRecord(
|
||||
[toolStep],
|
||||
' and then',
|
||||
'error',
|
||||
'boom',
|
||||
);
|
||||
// The whole metadata block (parts + finishReason + error) must match the
|
||||
// legacy partial-record shape so rebuilt history is unchanged.
|
||||
expect(flushed.metadata).toEqual(legacy.metadata);
|
||||
expect(flushed.content).toBe(legacy.text);
|
||||
expect(flushed.toolCalls).toEqual(legacy.toolCalls);
|
||||
const parts = flushed.metadata.parts as AnyPart[];
|
||||
expect(parts).toContainEqual({ type: 'text', text: 'looked it up' });
|
||||
const toolPart = parts.find((p) => p.type === 'tool-getPage');
|
||||
expect(toolPart!.state).toBe('output-available');
|
||||
// In-progress text appended LAST so the parts match the stream order.
|
||||
expect(parts[parts.length - 1]).toEqual({
|
||||
type: 'text',
|
||||
text: ' and then',
|
||||
});
|
||||
expect(flushed.content).toBe('looked it up and then');
|
||||
expect(flushed.toolCalls).not.toBeNull();
|
||||
expect(flushed.metadata.error).toBe('boom');
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -412,7 +412,10 @@ export class AiChatService implements OnModuleInit {
|
||||
});
|
||||
assistantId = seeded?.id;
|
||||
} catch (err) {
|
||||
this.logger.error('Failed to insert upfront assistant row', err as Error);
|
||||
this.logger.error(
|
||||
`Failed to insert upfront assistant row (chat ${chatId}, workspace ${workspace.id})`,
|
||||
err as Error,
|
||||
);
|
||||
}
|
||||
|
||||
// Per-step (non-terminal) update: persist the finished steps the moment a
|
||||
@@ -453,7 +456,8 @@ export class AiChatService implements OnModuleInit {
|
||||
): Promise<void> => {
|
||||
if (finalized) return;
|
||||
finalized = true;
|
||||
if (!assistantId) {
|
||||
const plan = planFinalizeAssistant(assistantId);
|
||||
if (plan.kind === 'insert') {
|
||||
// The upfront insert failed: fall back to inserting the terminal row so
|
||||
// the turn is not lost entirely.
|
||||
try {
|
||||
@@ -476,7 +480,7 @@ export class AiChatService implements OnModuleInit {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await this.aiChatMessageRepo.update(assistantId, workspace.id, flushed);
|
||||
await this.aiChatMessageRepo.update(plan.id, workspace.id, flushed);
|
||||
} catch (err) {
|
||||
this.logger.error('Failed to finalize assistant message', err as Error);
|
||||
}
|
||||
@@ -552,6 +556,15 @@ export class AiChatService implements OnModuleInit {
|
||||
// pre-#183 onFinish record exactly; `inProgressText` is '' here (the last
|
||||
// step already finished). Final-step usage (usage.input+output) ≈ the
|
||||
// conversation's CURRENT context size, distinct from totalUsage.
|
||||
//
|
||||
// COLUMN-SEMANTICS NOTE (#183): `content` is built by flushAssistant as
|
||||
// the CONCATENATION of every step's text (stepsText), whereas pre-#183
|
||||
// it stored only the FINAL step's text. This is a deliberate, harmless
|
||||
// change: the UI and the Markdown export render from `metadata.parts`
|
||||
// (per-step text + tool parts), not from `content`; `content` is the
|
||||
// plain-text projection (full-text search / fallback). A multi-step
|
||||
// turn's `content` therefore now holds all steps' prose, not just the
|
||||
// last block.
|
||||
await finalizeAssistant(
|
||||
flushAssistant(steps as StepLike[], '', 'completed', {
|
||||
finishReason: finishReason as string,
|
||||
@@ -1088,6 +1101,21 @@ export interface AssistantFlush {
|
||||
status: 'streaming' | 'completed' | 'error' | 'aborted';
|
||||
}
|
||||
|
||||
/**
|
||||
* Pure decision for the terminal finalize (#183): given whether the upfront
|
||||
* assistant row exists (`assistantId`), choose whether the terminal payload is
|
||||
* written by UPDATEing that row or — when the upfront insert failed and there is
|
||||
* no id — by INSERTing a fresh terminal row so the turn is not lost entirely.
|
||||
* Returns `{ kind: 'update', id }` or `{ kind: 'insert' }`. Extracted so the
|
||||
* fallback-insert branch (the only safety against losing a turn whose upfront
|
||||
* insert failed) is unit-testable without seaming streamText.
|
||||
*/
|
||||
export function planFinalizeAssistant(
|
||||
assistantId: string | undefined,
|
||||
): { kind: 'update'; id: string } | { kind: 'insert' } {
|
||||
return assistantId ? { kind: 'update', id: assistantId } : { kind: 'insert' };
|
||||
}
|
||||
|
||||
/**
|
||||
* PURE assistant-row builder (#183 step-granular durability). Given the turn's
|
||||
* accumulated steps + the in-progress (not-yet-finished) text + the lifecycle
|
||||
@@ -1097,9 +1125,8 @@ export interface AssistantFlush {
|
||||
* worker can call it identically, so it must stay a pure function of its inputs
|
||||
* (NO `this`, no IO).
|
||||
*
|
||||
* `metadata.parts` is built by the EXACT same logic the old
|
||||
* buildPartialAssistantRecord used (assistantParts over finished steps, then the
|
||||
* in-progress text appended as a trailing text part), so rowToUiMessage /
|
||||
* `metadata.parts` is built by assistantParts over the finished steps, then the
|
||||
* in-progress text appended as a trailing text part, so rowToUiMessage /
|
||||
* findRecent keep replaying the turn unchanged. `metadata.finishReason`,
|
||||
* `metadata.error`, `metadata.usage` and `metadata.contextTokens` are attached
|
||||
* only when provided/relevant, matching the pre-#183 onFinish/onError records.
|
||||
@@ -1152,34 +1179,6 @@ export function flushAssistant(
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the assistant-message record persisted on a partial/failed turn (the
|
||||
* streamText onError / onAbort paths). Captures the partial answer the user
|
||||
* already saw: each finished step's text + tool parts (via assistantParts),
|
||||
* then the in-progress step's text appended last. When `errorText` is provided
|
||||
* it is recorded in metadata.error so the cause shows in history; an aborted
|
||||
* turn passes none. Pure, so the partial-recording shape is unit-testable
|
||||
* without seaming streamText.
|
||||
*
|
||||
* Thin wrapper over {@link flushAssistant} (retained for the existing unit
|
||||
* tests and its historical `{ text, toolCalls, metadata }` shape).
|
||||
*/
|
||||
export function buildPartialAssistantRecord(
|
||||
steps: ReadonlyArray<StepLike> | undefined,
|
||||
inProgressText: string,
|
||||
finishReason: 'error' | 'aborted',
|
||||
errorText?: string,
|
||||
): { text: string; toolCalls: unknown; metadata: Record<string, unknown> } {
|
||||
const flushed = flushAssistant(steps, inProgressText, finishReason, {
|
||||
error: errorText,
|
||||
});
|
||||
return {
|
||||
text: flushed.content,
|
||||
toolCalls: flushed.toolCalls,
|
||||
metadata: flushed.metadata,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Reduce SDK step objects to a compact, JSON-serializable trace for the
|
||||
* `tool_calls` column. Stores only what the UI action-log and history need —
|
||||
|
||||
@@ -48,9 +48,12 @@ interface UsageLike {
|
||||
reasoningTokens?: number;
|
||||
}
|
||||
|
||||
/** Localized label table. Keep the keys identical to the client's i18n keys so
|
||||
* the two exports read the same. Only role + tool-action labels are localized;
|
||||
* everything structural is an English constant in the renderer. */
|
||||
/** Localized label table. The client-side Markdown builder was removed by #183
|
||||
* (the export is now server-side only), so this no longer mirrors a second
|
||||
* exporter — instead the tool-action labels are kept in parity with the
|
||||
* on-screen action-log labels in the client's `tool-parts.tsx` (`toolLabelKey`)
|
||||
* so the export reads the same as the UI. Only role + tool-action labels are
|
||||
* localized; everything structural is an English constant in the renderer. */
|
||||
const LABELS: Record<
|
||||
ExportLang,
|
||||
{
|
||||
@@ -232,7 +235,7 @@ export function buildChatMarkdown(args: {
|
||||
};
|
||||
const errorOf = (row: AiChatMessage): string | undefined => {
|
||||
const meta = (row.metadata ?? {}) as { error?: string };
|
||||
return meta.error ?? undefined;
|
||||
return meta.error;
|
||||
};
|
||||
|
||||
// Metadata bullet list. Total tokens is only shown when there is a sum.
|
||||
|
||||
@@ -9,6 +9,20 @@ import {
|
||||
import { PaginationOptions } from '@docmost/db/pagination/pagination-options';
|
||||
import { executeWithCursorPagination } from '@docmost/db/pagination/cursor-pagination';
|
||||
|
||||
// Crash-recovery sweep recency threshold (#183 review): a 'streaming' row is
|
||||
// only swept to 'aborted' once it has been UNTOUCHED for this long. A live turn
|
||||
// bumps `updatedAt` on every step (well under this window), so its row never
|
||||
// matches; only a turn whose process truly died (no step update for >threshold)
|
||||
// is swept. Chosen safely ABOVE the longest realistic turn so a fresh replica's
|
||||
// boot-sweep can never abort a turn another replica is actively streaming
|
||||
// (multi-instance deploy).
|
||||
const SWEEP_STREAMING_STALE_MS = 10 * 60 * 1000; // 10 minutes
|
||||
|
||||
// Hard upper bound on the rows materialized by `findAllByChat` (export path).
|
||||
// A generous cap so a pathologically huge chat cannot load an unbounded result
|
||||
// into memory; far above any realistic transcript length.
|
||||
const FIND_ALL_BY_CHAT_LIMIT = 5000;
|
||||
|
||||
@Injectable()
|
||||
export class AiChatMessageRepo {
|
||||
constructor(@InjectKysely() private readonly db: KyselyDB) {}
|
||||
@@ -66,6 +80,10 @@ export class AiChatMessageRepo {
|
||||
// (#183), where the DB is the single source of truth and the whole transcript
|
||||
// must be rendered in one pass (findByChat is cursor-paginated and would only
|
||||
// return the first page).
|
||||
//
|
||||
// Hard-capped at FIND_ALL_BY_CHAT_LIMIT rows (a generous bound, far above any
|
||||
// realistic transcript) so exporting a pathologically huge chat cannot
|
||||
// materialize an unbounded result set in memory.
|
||||
async findAllByChat(
|
||||
chatId: string,
|
||||
workspaceId: string,
|
||||
@@ -78,6 +96,7 @@ export class AiChatMessageRepo {
|
||||
.where('deletedAt', 'is', null)
|
||||
.orderBy('createdAt', 'asc')
|
||||
.orderBy('id', 'asc')
|
||||
.limit(FIND_ALL_BY_CHAT_LIMIT)
|
||||
.execute();
|
||||
}
|
||||
|
||||
@@ -162,13 +181,21 @@ export class AiChatMessageRepo {
|
||||
* status) to 'aborted'. Run once on server start. Returns the number of rows
|
||||
* swept so the caller can log it. Workspace-wide on purpose — a crash can have
|
||||
* dangling streaming rows across any workspace.
|
||||
*
|
||||
* Bounded by recency (#183 review): only rows UNTOUCHED for
|
||||
* SWEEP_STREAMING_STALE_MS are swept. A live turn bumps `updatedAt` on every
|
||||
* step, so an actively-streaming row never matches; this prevents a fresh
|
||||
* replica's boot-sweep from aborting a turn another replica is still streaming
|
||||
* in a multi-instance deploy.
|
||||
*/
|
||||
async sweepStreaming(trx?: KyselyTransaction): Promise<number> {
|
||||
const db = dbOrTx(this.db, trx);
|
||||
const staleBefore = new Date(Date.now() - SWEEP_STREAMING_STALE_MS);
|
||||
const rows = await db
|
||||
.updateTable('aiChatMessages')
|
||||
.set({ status: 'aborted', updatedAt: new Date() })
|
||||
.where('status', '=', 'streaming')
|
||||
.where('updatedAt', '<', staleBefore)
|
||||
.returning('id')
|
||||
.execute();
|
||||
return rows.length;
|
||||
|
||||
@@ -68,7 +68,8 @@ describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => {
|
||||
expect(updated!.content).toBe('final answer');
|
||||
expect(updated!.status).toBe('completed');
|
||||
expect((updated!.metadata as any).parts).toHaveLength(1);
|
||||
expect(new Date(updated!.updatedAt).getTime()).toBeGreaterThanOrEqual(
|
||||
// The 5ms sleep above guarantees a strictly-later timestamp.
|
||||
expect(new Date(updated!.updatedAt).getTime()).toBeGreaterThan(
|
||||
new Date(before).getTime(),
|
||||
);
|
||||
});
|
||||
@@ -128,8 +129,23 @@ describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => {
|
||||
await repo.update(seeded.id, workspaceId, { status: 'completed' });
|
||||
});
|
||||
|
||||
it('sweepStreaming flips dangling streaming rows to aborted and counts them', async () => {
|
||||
// Two dangling streaming rows in our workspace + one in another workspace.
|
||||
// Backdate a row's updatedAt so it qualifies as a STALE streaming row (the
|
||||
// sweep only flips rows untouched for >10 minutes — a live turn bumps
|
||||
// updatedAt every step, so it would never match).
|
||||
async function backdateUpdatedAt(
|
||||
id: string,
|
||||
minutesAgo: number,
|
||||
): Promise<void> {
|
||||
await db
|
||||
.updateTable('aiChatMessages')
|
||||
.set({ updatedAt: new Date(Date.now() - minutesAgo * 60 * 1000) })
|
||||
.where('id', '=', id)
|
||||
.execute();
|
||||
}
|
||||
|
||||
it('sweepStreaming flips STALE dangling streaming rows to aborted and counts them', async () => {
|
||||
// Two dangling streaming rows in our workspace + one in another workspace —
|
||||
// all backdated past the staleness threshold so the sweep picks them up.
|
||||
const a = await createMessage(db, {
|
||||
workspaceId,
|
||||
chatId,
|
||||
@@ -142,6 +158,16 @@ describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => {
|
||||
role: 'assistant',
|
||||
status: 'streaming',
|
||||
});
|
||||
const other = await createMessage(db, {
|
||||
workspaceId: otherWorkspaceId,
|
||||
chatId: otherChatId,
|
||||
role: 'assistant',
|
||||
status: 'streaming',
|
||||
});
|
||||
await backdateUpdatedAt(a.id, 20);
|
||||
await backdateUpdatedAt(b.id, 20);
|
||||
await backdateUpdatedAt(other.id, 20);
|
||||
|
||||
// A settled row must NOT be touched.
|
||||
const done = await createMessage(db, {
|
||||
workspaceId,
|
||||
@@ -156,15 +182,9 @@ describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => {
|
||||
role: 'assistant',
|
||||
status: null,
|
||||
});
|
||||
await createMessage(db, {
|
||||
workspaceId: otherWorkspaceId,
|
||||
chatId: otherChatId,
|
||||
role: 'assistant',
|
||||
status: 'streaming',
|
||||
});
|
||||
|
||||
const swept = await repo.sweepStreaming();
|
||||
// At least the 3 streaming rows we created (2 here + 1 in the other ws).
|
||||
// At least the 3 stale streaming rows we created (2 here + 1 in the other ws).
|
||||
expect(swept).toBeGreaterThanOrEqual(3);
|
||||
|
||||
const rows = await repo.findAllByChat(chatId, workspaceId);
|
||||
@@ -181,4 +201,34 @@ describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => {
|
||||
expect(rows2.find((r) => r.id === a.id)!.status).toBe('aborted');
|
||||
expect(again).toBeGreaterThanOrEqual(0);
|
||||
});
|
||||
|
||||
it('sweepStreaming does NOT sweep a FRESH streaming row (recency bound, #183 review)', async () => {
|
||||
// A row that is actively streaming (recent updatedAt) must survive the sweep:
|
||||
// a fresh replica's boot-sweep must never abort a turn another replica is
|
||||
// still streaming in a multi-instance deploy.
|
||||
const fresh = await createMessage(db, {
|
||||
workspaceId,
|
||||
chatId,
|
||||
role: 'assistant',
|
||||
status: 'streaming',
|
||||
});
|
||||
// A STALE streaming row created alongside it IS swept — proving the sweep
|
||||
// ran and the only difference is recency.
|
||||
const stale = await createMessage(db, {
|
||||
workspaceId,
|
||||
chatId,
|
||||
role: 'assistant',
|
||||
status: 'streaming',
|
||||
});
|
||||
await backdateUpdatedAt(stale.id, 20);
|
||||
|
||||
await repo.sweepStreaming();
|
||||
|
||||
const rows = await repo.findAllByChat(chatId, workspaceId);
|
||||
const byId = new Map(rows.map((r) => [r.id, r]));
|
||||
// Fresh (recently-updated) streaming row is left untouched...
|
||||
expect(byId.get(fresh.id)!.status).toBe('streaming');
|
||||
// ...while the stale one alongside it was swept to 'aborted'.
|
||||
expect(byId.get(stale.id)!.status).toBe('aborted');
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user