Compare commits

..

3 Commits

Author SHA1 Message Date
claude code agent 227
686c3f9d14 fix(ai-chat): branch sendNow on live status to defuse stale-status race (#198)
Port the only substantive fix #211 was missing relative to #203 (which is
being closed): the "Send now" handler branched on the closure-captured
isStreaming, but a turn can finish between render and click. In that window
stop() is a no-op, so arming flushOnAbortRef/interruptNextSendRef would strand
those one-shot flags and leak into a later, unrelated Stop (auto-sending a
queued message the user never asked to send).

- Mirror the live useChat status in statusRef (updated each render) and branch
  sendNow on it instead of isStreaming, so the not-streaming path runs when the
  turn has already ended and the interrupt flags are never armed against a
  no-op stop().
- Belt-and-suspenders: clear flushOnAbortRef/interruptNextSendRef when a new
  turn starts streaming, defusing the sub-render-tick window where a flag could
  still be armed but the expected abort never fired. No-op for the legit
  interrupt path (both refs are consumed synchronously beforehand).

Keeps #211's existing structure and its flushNext-returns-boolean fix. The
rest of #203's divergence is comment rewording, a server-side rename of the
same pure interrupt-gate, and fewer tests — nothing else to port.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 20:40:06 +03:00
claude code agent 227
6faf2475e6 fix(ai-chat): address PR #211 review (i18n keys, dead export, flag leak)
- Register the new AI-chat keys "Send now" and "Interrupt and send now" in
  both en-US and ru-RU catalogs so the UI never renders mixed-language
  tooltip/aria-label (i18n policy).
- Make INTERRUPT_NOTE module-private (drop the unused re-export), matching the
  module's private DEFAULT_PROMPT/SAFETY_FRAMEWORK siblings.
- Reset interruptNextSendRef in the flush-on-abort branch when nothing is
  actually sent, so a stuck one-shot interrupt flag cannot tag the next
  unrelated send; flushNext now reports whether it sent.
- Add a CHANGELOG [Unreleased]/Added entry for #198.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 20:40:06 +03:00
claude code agent 227
422389d84e feat(ai-chat): interrupt agent + send queued message, keeping partial output (#198)
Add a "send now" button to queued AI-chat messages: it interrupts the
running agent and immediately sends that message, while the agent's
partial output at interruption is kept in history and the next turn is
marked as a user interrupt.

Client:
- queue-helpers: pure `promoteToHead` to move a queued message to the head.
- chat-thread: `sendNow` (promote head + abort + flush-on-abort), one-shot
  `flushOnAbortRef`/`interruptNextSendRef`, `interrupted` flag in the
  request body, and the "send now" ActionIcon in the queued list.

Server:
- `interrupted` on AiChatStreamBody; pure `isInterruptResume` confirms the
  client hint against persisted history (prev assistant turn aborted/
  streaming) before honouring it.
- prompt: INTERRUPT_NOTE injected in the context section only on a
  confirmed interrupt-resume turn so the model treats the partial answer
  above as incomplete.

Tests: promoteToHead, chat-thread send-now (abort + resend + one-shot
interrupt flag + non-streaming immediate send), isInterruptResume, and
the prompt interrupt-note injection.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 20:38:23 +03:00
13 changed files with 532 additions and 80 deletions

View File

@@ -10,6 +10,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- **Interrupt the AI agent and send a queued message now.** A queued AI-chat
message gains a "send now" action that interrupts the streaming turn and
immediately sends that message, keeping the agent's partial output. The
follow-up turn is tagged as an interrupt so the model is told its previous
answer was cut off and builds on it instead of restarting; the rest of the
queue still flushes normally afterward. (#198)
## [0.94.0] - 2026-06-26
This release makes AI chat durable and fast: assistant turns are persisted to
@@ -75,15 +84,6 @@ per-workspace rolling-day token budget.
### Changed
- **AI chat now feeds the model the full stored transcript.** The per-turn model
conversation was rebuilt from a sliding window of the 50 most recent stored
rows, which silently dropped the beginning of any longer chat. It is now
rebuilt from the complete non-deleted transcript in chronological order, so
the model sees every turn (a 5000-row backstop guards process memory — a
safety net far above any realistic chat, not a conversational limit). On a
very long chat this can eventually reach the model's context window; the
client already surfaces that as "start a new chat". (#202)
- **AI chat default provider is now `openai-compatible` (reasoning surfaced).**
For the `openai` driver the chat provider defaults to the openai-compatible
implementation, so a workspace pointing at z.ai/GLM/DeepSeek now streams the

View File

@@ -1180,6 +1180,8 @@
"Send when the agent finishes": "Send when the agent finishes",
"Queue message": "Queue message",
"Remove queued message": "Remove queued message",
"Send now": "Send now",
"Interrupt and send now": "Interrupt and send now",
"Stop": "Stop",
"Response stopped.": "Response stopped.",
"Connection lost — the answer was interrupted.": "Connection lost — the answer was interrupted.",

View File

@@ -723,6 +723,8 @@
"Send when the agent finishes": "Отправить, когда агент закончит",
"Queue message": "Поставить в очередь",
"Remove queued message": "Убрать из очереди",
"Send now": "Отправить сейчас",
"Interrupt and send now": "Прервать и отправить сейчас",
"Something went wrong": "Что-то пошло не так",
"Stop": "Стоп",
"The AI agent could not respond. Please try again.": "AI-агент не смог ответить. Попробуйте ещё раз.",

View File

@@ -0,0 +1,142 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { render, screen, fireEvent, act } from "@testing-library/react";
import { MantineProvider } from "@mantine/core";
// Shared, hoisted mock state so the @ai-sdk/react and "ai" module mocks (hoisted
// above the imports) can expose the captured useChat callbacks / transport and
// the spies back to the test body.
const h = vi.hoisted(() => ({
state: {
status: "streaming" as string,
onFinish: null as null | ((arg: Record<string, unknown>) => void),
sendMessage: vi.fn(),
stop: vi.fn(),
transport: null as null | {
prepareSendMessagesRequest: (arg: {
messages: unknown[];
body: Record<string, unknown>;
}) => { body: Record<string, unknown> };
},
},
}));
// Mock useChat: capture onFinish, return the spies and the controllable status.
vi.mock("@ai-sdk/react", () => ({
useChat: (opts: { onFinish?: (arg: Record<string, unknown>) => void }) => {
h.state.onFinish = opts.onFinish ?? null;
return {
messages: [],
sendMessage: h.state.sendMessage,
status: h.state.status,
stop: h.state.stop,
error: null,
};
},
}));
// Mock "ai": deterministic ids + a transport that records its options so the test
// can invoke prepareSendMessagesRequest and assert the `interrupted` flag.
vi.mock("ai", () => {
let counter = 0;
return {
generateId: () => `gid-${counter++}`,
DefaultChatTransport: class {
constructor(opts: {
prepareSendMessagesRequest: (arg: {
messages: unknown[];
body: Record<string, unknown>;
}) => { body: Record<string, unknown> };
}) {
h.state.transport = opts;
}
},
};
});
// Stub the heavy children: MessageList (markdown/render) and ChatInput (the
// composer). The ChatInput stub exposes a button that queues a message, the only
// interaction this test needs to populate the queue while "streaming".
vi.mock("@/features/ai-chat/components/message-list.tsx", () => ({
default: () => <div data-testid="message-list" />,
}));
vi.mock("@/features/ai-chat/components/chat-input.tsx", () => ({
default: ({ onQueue }: { onQueue: (text: string) => void }) => (
<button data-testid="queue-btn" onClick={() => onQueue("queued text")}>
queue
</button>
),
}));
import ChatThread from "./chat-thread";
function renderThread() {
const onTurnFinished = vi.fn();
render(
<MantineProvider>
<ChatThread chatId="c1" initialRows={[]} onTurnFinished={onTurnFinished} />
</MantineProvider>,
);
return { onTurnFinished };
}
describe("ChatThread — send now (#198)", () => {
beforeEach(() => {
h.state.status = "streaming";
h.state.onFinish = null;
h.state.sendMessage.mockClear();
h.state.stop.mockClear();
h.state.transport = null;
});
it("aborts the current turn and resends the queued message on the abort", () => {
renderThread();
// Queue a message while the turn is streaming.
fireEvent.click(screen.getByTestId("queue-btn"));
const sendNowBtn = screen.getByLabelText("Send now");
expect(sendNowBtn).toBeTruthy();
// "Send now" interrupts the current turn (stop), but does NOT send yet —
// the resend happens once the abort lands in onFinish.
fireEvent.click(sendNowBtn);
expect(h.state.stop).toHaveBeenCalledTimes(1);
expect(h.state.sendMessage).not.toHaveBeenCalled();
// The abort we triggered reaches onFinish: the promoted head is flushed.
act(() => {
h.state.onFinish?.({
message: { id: "a", role: "assistant", parts: [] },
isAbort: true,
isDisconnect: false,
isError: false,
});
});
expect(h.state.sendMessage).toHaveBeenCalledWith({ text: "queued text" });
});
it("tags exactly the next send as interrupted (one-shot flag)", () => {
renderThread();
fireEvent.click(screen.getByTestId("queue-btn"));
fireEvent.click(screen.getByLabelText("Send now"));
const prep = h.state.transport!.prepareSendMessagesRequest;
// The send right after "send now" carries interrupted: true...
expect(prep({ messages: [], body: {} }).body.interrupted).toBe(true);
// ...and only that one (the flag is read-and-cleared).
expect(prep({ messages: [], body: {} }).body.interrupted).toBe(false);
});
it("sends immediately without an interrupt when not streaming", () => {
h.state.status = "ready";
renderThread();
fireEvent.click(screen.getByTestId("queue-btn"));
fireEvent.click(screen.getByLabelText("Send now"));
// No turn to interrupt: sent straight away, no abort, not flagged.
expect(h.state.stop).not.toHaveBeenCalled();
expect(h.state.sendMessage).toHaveBeenCalledWith({ text: "queued text" });
const prep = h.state.transport!.prepareSendMessagesRequest;
expect(prep({ messages: [], body: {} }).body.interrupted).toBe(false);
});
});

View File

@@ -1,7 +1,11 @@
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { generateId } from "ai";
import { ActionIcon, Box, Group, Stack, Text } from "@mantine/core";
import { IconClockHour4, IconX } from "@tabler/icons-react";
import { ActionIcon, Box, Group, Stack, Text, Tooltip } from "@mantine/core";
import {
IconClockHour4,
IconPlayerPlayFilled,
IconX,
} from "@tabler/icons-react";
import { useTranslation } from "react-i18next";
import { useChat, type UIMessage } from "@ai-sdk/react";
import { DefaultChatTransport } from "ai";
@@ -23,6 +27,7 @@ import { extractServerChatId } from "@/features/ai-chat/utils/adopt-chat-id.ts";
import {
dequeue,
enqueueMessage,
promoteToHead,
removeQueuedById,
type QueuedMessage,
} from "@/features/ai-chat/utils/queue-helpers.ts";
@@ -201,12 +206,25 @@ export default function ChatThread({
// helper can call the current instance from the stable `onFinish` callback.
const sendMessageRef = useRef<((m: { text: string }) => void) | null>(null);
// "Send now" single-flight flags. Kept in refs (not state) so they are read
// inside the stable `onFinish` callback and the transport closure WITHOUT a
// re-render or a stale closure. Both are one-shot (read-and-clear).
// - flushOnAbortRef: flush the promoted head on the abort WE triggered, even
// though an aborted turn normally keeps the queue intact.
// - interruptNextSendRef: tag the next send as a user interrupt so the server
// injects the "your previous answer was interrupted" note for that turn only.
const flushOnAbortRef = useRef(false);
const interruptNextSendRef = useRef(false);
// FIFO dequeue + send the next queued message (no-op when the queue is empty).
// Returns whether a message was actually sent, so callers can tell an empty
// dequeue (nothing to flush) from a real send.
const flushNext = useCallback(() => {
const { head, rest } = dequeue(queuedRef.current);
if (!head) return;
if (!head) return false;
setQueue(rest);
sendMessageRef.current?.({ text: head.text });
return true;
}, [setQueue]);
const enqueue = useCallback(
@@ -232,17 +250,26 @@ export default function ChatThread({
// when null) and tell the agent which page "this page" refers to. Both
// are read live from refs so changing chats/pages does NOT recreate the
// transport. `openPage` is null on a non-page route.
prepareSendMessagesRequest: ({ messages, body }) => ({
body: {
...body,
chatId: chatIdRef.current,
openPage: openPageRef.current,
// Honoured by the server only when creating a new chat; null =>
// universal assistant.
roleId: roleIdRef.current,
messages,
},
}),
prepareSendMessagesRequest: ({ messages, body }) => {
// Read-and-clear the interrupt flag so the "you were interrupted" note
// is carried by ONLY this request (the one resending the promoted
// message right after we aborted the previous turn). The server still
// confirms it against history before acting on it.
const interrupted = interruptNextSendRef.current;
interruptNextSendRef.current = false; // one-shot
return {
body: {
...body,
chatId: chatIdRef.current,
openPage: openPageRef.current,
// Honoured by the server only when creating a new chat; null =>
// universal assistant.
roleId: roleIdRef.current,
interrupted,
messages,
},
};
},
}),
[],
);
@@ -277,6 +304,21 @@ export default function ChatThread({
else if (isAbort) setStopNotice("manual");
else if (isDisconnect) setStopNotice("disconnect");
else setStopNotice(null);
// "Send now": WE triggered this abort to interrupt the current turn and
// immediately send the promoted head. Flush it even though the turn was
// aborted (the normal abort path below keeps the queue intact). The
// interrupt note travels with this send via interruptNextSendRef.
if (flushOnAbortRef.current) {
flushOnAbortRef.current = false;
// Suppress the "Response stopped." flash for an intentional interrupt.
setStopNotice(null);
// If the promoted head vanished (e.g. the user removed it before the
// abort landed) flushNext sends nothing — clear the one-shot interrupt
// tag so it can't leak onto the next unrelated send. On a real send the
// tag is consumed by prepareSendMessagesRequest and stays untouched.
if (!flushNext()) interruptNextSendRef.current = false;
return;
}
if (isAbort || isDisconnect || isError) return;
flushNext();
},
@@ -298,6 +340,13 @@ export default function ChatThread({
// Keep the flush helper pointed at the latest sendMessage instance.
sendMessageRef.current = sendMessage;
// Mirror the live turn status in a ref so event handlers (sendNow) branch on the
// CURRENT status rather than a value captured in a stale render closure — a turn
// can finish between render and click, and arming the interrupt refs against a
// no-op stop() would leave them set to leak into a later, unrelated Stop.
const statusRef = useRef(status);
statusRef.current = status;
// EARLY chat-id adoption (#174): the server streams the authoritative chat id
// on the assistant message metadata at the `start` chunk (message.metadata.
// chatId — see adopt-chat-id.ts / chatStreamMetadata). Forward it to the parent
@@ -329,9 +378,49 @@ export default function ChatThread({
const isStreaming = status === "submitted" || status === "streaming";
// Clear the stopped marker as soon as a new turn begins streaming.
// "Send now" on a queued message: interrupt the current turn and immediately
// send THIS message, keeping the agent's partial output. Other queued messages
// stay queued and flush normally after the new turn. Reuses the existing
// queue/flush machinery: promote the target to the head, then abort — the
// onFinish flush-on-abort branch sends exactly that head, tagged as an
// interrupt so the server notes the previous answer was cut off.
const sendNow = useCallback(
(id: string) => {
// Branch on the LIVE status (statusRef), NOT the closure-captured isStreaming:
// the turn may have finished between this render and the click, in which case
// stop() is a no-op and arming the interrupt refs would strand them for a
// later, unrelated Stop. Reading the ref always sees the current status.
const liveStreaming =
statusRef.current === "submitted" || statusRef.current === "streaming";
if (liveStreaming) {
// Promote to head so the onFinish -> flushNext path sends exactly it.
setQueue(promoteToHead(queuedRef.current, id));
flushOnAbortRef.current = true;
interruptNextSendRef.current = true;
stop(); // -> onFinish({ isAbort: true }) flushes the promoted head
} else {
// Nothing to interrupt: just send it now (no interrupt note).
const msg = queuedRef.current.find((m) => m.id === id);
if (!msg) return;
setQueue(removeQueuedById(queuedRef.current, id));
sendMessageRef.current?.({ text: msg.text });
}
},
[setQueue, stop],
);
// Clear the stopped marker as soon as a new turn begins streaming, and drop any
// stale "Send now" interrupt flags. On the legit interrupt path both refs are
// already consumed synchronously (onFinish + prepareSendMessagesRequest) before
// this effect runs, so clearing here is a no-op for it; its purpose is to defuse
// the race where a flag was armed but the expected abort never fired (the turn
// finished in the same tick as the click), so it cannot leak into a later turn.
useEffect(() => {
if (isStreaming) setStopNotice(null);
if (isStreaming) {
setStopNotice(null);
flushOnAbortRef.current = false;
interruptNextSendRef.current = false;
}
}, [isStreaming]);
// Classify the turn error into a heading + detail so the banner names the cause
@@ -423,6 +512,17 @@ export default function ChatThread({
<Text size="xs" lineClamp={2} className={classes.queuedText}>
{m.text}
</Text>
<Tooltip label={t("Interrupt and send now")} withArrow>
<ActionIcon
size="xs"
variant="subtle"
color="blue"
onClick={() => sendNow(m.id)}
aria-label={t("Send now")}
>
<IconPlayerPlayFilled size={12} />
</ActionIcon>
</Tooltip>
<ActionIcon
size="xs"
variant="subtle"

View File

@@ -2,6 +2,7 @@ import { describe, it, expect } from "vitest";
import {
enqueueMessage,
dequeue,
promoteToHead,
removeQueuedById,
type QueuedMessage,
} from "./queue-helpers";
@@ -89,6 +90,52 @@ describe("removeQueuedById", () => {
});
});
describe("promoteToHead", () => {
it("moves the matching id to the front, preserving the rest's order", () => {
const queue: QueuedMessage[] = [
{ id: "a", text: "first" },
{ id: "b", text: "second" },
{ id: "c", text: "third" },
];
expect(promoteToHead(queue, "c")).toEqual([
{ id: "c", text: "third" },
{ id: "a", text: "first" },
{ id: "b", text: "second" },
]);
});
it("is a no-op order-wise when the id is already the head", () => {
const queue: QueuedMessage[] = [
{ id: "a", text: "first" },
{ id: "b", text: "second" },
];
expect(promoteToHead(queue, "a")).toEqual([
{ id: "a", text: "first" },
{ id: "b", text: "second" },
]);
});
it("returns an equivalent list when the id is not present", () => {
const queue: QueuedMessage[] = [
{ id: "a", text: "first" },
{ id: "b", text: "second" },
];
expect(promoteToHead(queue, "missing")).toEqual(queue);
});
it("does not mutate the input queue", () => {
const queue: QueuedMessage[] = [
{ id: "a", text: "first" },
{ id: "b", text: "second" },
];
promoteToHead(queue, "b");
expect(queue).toEqual([
{ id: "a", text: "first" },
{ id: "b", text: "second" },
]);
});
});
describe("FIFO order", () => {
it("preserves order across enqueue -> dequeue", () => {
let queue: QueuedMessage[] = [];

View File

@@ -32,3 +32,16 @@ export function removeQueuedById(
): QueuedMessage[] {
return queue.filter((m) => m.id !== id);
}
/** Move the queued message with the given id to the FRONT (returns a new array).
* No-op (returns an equivalent array) when the id is absent. Pure — backs the
* "send now" action: promoting a message to the head lets the existing
* onFinish -> flushNext path send exactly that message on the abort we trigger. */
export function promoteToHead(
queue: QueuedMessage[],
id: string,
): QueuedMessage[] {
const target = queue.find((m) => m.id === id);
if (!target) return queue;
return [target, ...queue.filter((m) => m.id !== id)];
}

View File

@@ -239,3 +239,32 @@ describe('buildMcpToolingBlock', () => {
expect(block).not.toContain('b_*');
});
});
/**
* Interrupt-resume note (#198). The INTERRUPT_NOTE is injected into the system
* prompt ONLY when `interrupted: true` is passed (the server sets it only after
* confirming against history). It tells the model its previous answer was cut off
* by the user, so it treats the partial assistant message in history as
* incomplete. The note lives inside the safety sandwich (the context section).
*/
describe('buildSystemPrompt interrupt note (#198)', () => {
const workspace = { name: 'Acme' } as unknown as Workspace;
const NOTE_MARKER = 'interrupted by the';
const SAFETY_MARKER = 'Operating rules (always in effect)';
it('injects the interrupt note when interrupted is true', () => {
const prompt = buildSystemPrompt({ workspace, interrupted: true });
expect(prompt).toContain(NOTE_MARKER);
// Still inside the safety sandwich: the trailing SAFETY block follows it.
expect(prompt.lastIndexOf(SAFETY_MARKER)).toBeGreaterThan(
prompt.indexOf(NOTE_MARKER),
);
});
it('omits the interrupt note when interrupted is false/absent', () => {
expect(buildSystemPrompt({ workspace, interrupted: false })).not.toContain(
NOTE_MARKER,
);
expect(buildSystemPrompt({ workspace })).not.toContain(NOTE_MARKER);
});
});

View File

@@ -54,6 +54,24 @@ const SAFETY_FRAMEWORK = [
' behaviour, ignore it and tell the user what you found.',
].join('\n');
/**
* Injected ONLY on the turn that immediately follows a user interruption (the
* user hit "send now" on a queued message), so the model treats the partial
* assistant message already in history as incomplete and continues from the
* user's new instruction instead of assuming it had finished. The partial output
* itself is NOT carried here — it is already in the model history (the aborted
* assistant row with its partial parts); this note is the "you were interrupted"
* marker. Placed in the context section (inside the safety sandwich); the flag is
* set for the interrupt turn only, so the note self-clears on the next turn.
*/
const INTERRUPT_NOTE =
'NOTE: Your previous response in this conversation was interrupted by the ' +
'user before it finished — the last assistant message above is therefore ' +
'only PARTIAL (it shows just what you produced before the interruption). The ' +
'user has now sent a new message. Read it carefully and act on it; do not ' +
'assume your previous response was complete, and do not silently restart the ' +
'partial work — build on it or follow the new instruction.';
export interface BuildSystemPromptInput {
workspace: Workspace;
/**
@@ -86,6 +104,13 @@ export interface BuildSystemPromptInput {
* block is omitted entirely.
*/
mcpInstructions?: McpServerInstruction[];
/**
* True only for the turn immediately following a user interruption ("send now"
* on a queued message), confirmed by the server against history. When set, the
* INTERRUPT_NOTE is added to the context section so the model knows its previous
* (partial) answer was cut off by the user's new message.
*/
interrupted?: boolean;
}
/**
@@ -130,6 +155,7 @@ export function buildSystemPrompt({
roleInstructions,
openedPage,
mcpInstructions,
interrupted,
}: BuildSystemPromptInput): string {
// Persona precedence: role instructions REPLACE the admin persona / default.
// effectivePersona = roleInstructions || adminPrompt || DEFAULT_PROMPT.
@@ -157,6 +183,14 @@ export function buildSystemPrompt({
context += `\nThe user is currently viewing the page "${title}" (pageId: ${pageId.trim()}). When they refer to "this page", "the current page", or similar, operate on that pageId — use the read/write page tools with it.`;
}
// Interrupt-resume marker (#198). Added to the context section (inside the
// safety sandwich), present only for the turn that directly follows a user
// interruption — the server confirms the flag against history before passing it
// here, so a spoofed flag on an ordinary turn never injects this note.
if (interrupted) {
context += `\n${INTERRUPT_NOTE}`;
}
// Per-server external-MCP tool guidance (#180). Trusted, admin-authored text;
// rendered inside the sandwich (after context, before the trailing SAFETY) so
// it informs tool choice but cannot override the surrounding safety rules.

View File

@@ -9,6 +9,7 @@ import {
flushAssistant,
chatStreamMetadata,
accumulateStepUsage,
isInterruptResume,
MAX_AGENT_STEPS,
FINAL_STEP_INSTRUCTION,
} from './ai-chat.service';
@@ -240,7 +241,7 @@ describe('prepareAgentStep', () => {
* write path. It runs identically for the upfront insert (empty steps,
* 'streaming'), every per-step update, and the terminal finalize — so a future
* background worker can call the same function. These tests pin the four status
* shapes and the `metadata.parts` shape that rowToUiMessage/findAllByChat depend on
* shapes and the `metadata.parts` shape that rowToUiMessage/findRecent depend on
* (per-step text + tool parts via assistantParts, in-progress text appended).
*/
describe('flushAssistant', () => {
@@ -649,3 +650,57 @@ describe('AiChatService.resolveOpenPageContext (#159 current-page validation)',
expect(await call(svc, { id: 'p-1' })).toEqual({ id: 'p-1', title: '' });
});
});
/**
* isInterruptResume (#198): the pure guard that decides whether the interrupt
* note is injected for a turn. The client "send now" flag is only a hint; it is
* honoured ONLY when the preceding assistant turn (history[len-2], since the new
* user row is the tail) really ended unfinished ('aborted', or still 'streaming'
* during the abort/resend race). A spoofed flag on an ordinary turn is ignored.
*/
describe('isInterruptResume', () => {
// history tail is the just-inserted user row; [len-2] is the previous turn.
const withPrev = (
prev: { role: string; status?: string | null } | null,
): Array<{ role: string; status?: string | null }> =>
prev
? [prev, { role: 'user', status: null }]
: [{ role: 'user', status: null }];
it('false when the client flag is not set', () => {
expect(
isInterruptResume(withPrev({ role: 'assistant', status: 'aborted' }), undefined),
).toBe(false);
expect(
isInterruptResume(withPrev({ role: 'assistant', status: 'aborted' }), false),
).toBe(false);
});
it('true when flagged AND the previous assistant turn is aborted', () => {
expect(
isInterruptResume(withPrev({ role: 'assistant', status: 'aborted' }), true),
).toBe(true);
});
it('true when flagged AND the previous assistant turn is still streaming (race)', () => {
expect(
isInterruptResume(withPrev({ role: 'assistant', status: 'streaming' }), true),
).toBe(true);
});
it('false when flagged but the previous assistant turn completed normally', () => {
expect(
isInterruptResume(withPrev({ role: 'assistant', status: 'completed' }), true),
).toBe(false);
});
it('false when flagged but the previous turn is not an assistant turn', () => {
expect(
isInterruptResume(withPrev({ role: 'user', status: 'aborted' }), true),
).toBe(false);
});
it('false when there is no preceding turn (only the new user row)', () => {
expect(isInterruptResume(withPrev(null), true)).toBe(false);
});
});

View File

@@ -75,6 +75,32 @@ export function prepareAgentStep(
export { MAX_AGENT_STEPS, FINAL_STEP_INSTRUCTION };
/**
* Pure, unit-testable (#198): decide whether THIS turn is an interrupt-resume,
* i.e. it directly follows a user interruption of the previous (still-partial)
* assistant turn. The client "send now" flag is only a HINT — confirm it against
* the just-loaded history so a spoofed/stale flag cannot inject the interrupt
* note onto an ordinary turn.
*
* `history` is the model history oldest -> newest, with the just-inserted user
* row as its tail; the turn before it is `history[len-2]`. We treat the new turn
* as an interrupt-resume only when the client said so AND the preceding assistant
* turn really ended unfinished: 'aborted' (onAbort already finalized it), or
* still 'streaming' (onAbort has not finalized yet — the abort/resend race; the
* partial output is already in history thanks to the step-granular write path).
*/
export function isInterruptResume(
history: Array<{ role: string; status?: string | null }>,
clientInterrupted: boolean | undefined,
): boolean {
if (clientInterrupted !== true) return false;
const prev = history[history.length - 2];
return (
prev?.role === 'assistant' &&
(prev.status === 'aborted' || prev.status === 'streaming')
);
}
/**
* Payload accepted from the client `useChat` POST body. We do NOT bind a strict
* DTO (the global ValidationPipe whitelist would strip the useChat-specific
@@ -93,6 +119,11 @@ export interface AiChatStreamBody {
// is attacker-controllable but harmless: the agent reads/writes via its
// CASL-enforced page tools, which 403 on a page the user cannot access.
openPage?: { id?: string; title?: string } | null;
// Set by the client "send now" action (#198): this turn immediately follows a
// user interruption of the previous turn. A hint only — the server re-confirms
// it against persisted history (`isInterruptResume`) before injecting the
// interrupt note, so a spoofed/stale flag on an ordinary turn is ignored.
interrupted?: boolean;
// useChat sends the full UIMessage list; the last one is the new user turn.
messages?: UIMessage[];
}
@@ -322,19 +353,24 @@ export class AiChatService implements OnModuleInit {
// Rebuild the conversation from persisted history (not the client payload),
// so the model always sees the authoritative server-side transcript. Load
// the FULL history in chronological order (oldest -> newest, incl. the user
// message just inserted above) so NO turns are dropped — there is no
// recent-tail window anymore. `findAllByChat` keeps a 5000-row memory-safety
// backstop (on overflow it keeps the NEWEST rows and logs a warning); that
// is a safety net far above any realistic chat, not a conversational limit.
const history = await this.aiChatMessageRepo.findAllByChat(
// the most RECENT tail (oldest -> newest) so chats longer than one page do
// not drop recent turns (incl. the user message just inserted above).
const history = await this.aiChatMessageRepo.findRecent(
chatId,
workspace.id,
50,
);
const uiMessages = history.map(rowToUiMessage);
// convertToModelMessages is async in ai@6.0.134 (returns Promise<ModelMessage[]>).
const messages = await convertToModelMessages(uiMessages);
// Interrupt-resume detection (#198): the client "send now" flag is only a
// hint — confirm it against the persisted history (the preceding assistant
// turn must really be aborted/streaming) so a spoofed flag cannot inject the
// interrupt note onto an ordinary turn. The partial output the model needs is
// already in `messages` (the aborted assistant row replays via findRecent).
const interrupted = isInterruptResume(history, body.interrupted);
// The model is resolved by the controller before hijack (clean 503 path).
// Here we only need the admin-configured system prompt.
const resolved = await this.aiSettings.resolve(workspace.id);
@@ -406,6 +442,9 @@ export class AiChatService implements OnModuleInit {
openedPage: openPageContext,
// Guidance only for servers that connected and yielded ≥1 callable tool.
mcpInstructions: external.instructions,
// History-confirmed interrupt-resume flag (#198): adds the interrupt note
// so the model treats the partial answer above as cut off, not finished.
interrupted,
});
// Pass the resolved chatId so the write tools can mint provenance tokens
@@ -1217,7 +1256,7 @@ export async function applyFinalize(
*
* `metadata.parts` is built by assistantParts over the finished steps, then the
* in-progress text appended as a trailing text part, so rowToUiMessage /
* findAllByChat keep replaying the turn unchanged. `metadata.finishReason`,
* findRecent keep replaying the turn unchanged. `metadata.finishReason`,
* `metadata.error`, `metadata.usage`, `metadata.contextTokens` and
* `metadata.maxContextTokens` are attached only when provided/relevant, matching
* the pre-#183 onFinish/onError records.

View File

@@ -18,8 +18,7 @@ import { executeWithCursorPagination } from '@docmost/db/pagination/cursor-pagin
// (multi-instance deploy).
const SWEEP_STREAMING_STALE_MS = 10 * 60 * 1000; // 10 minutes
// Hard upper bound on the rows materialized by `findAllByChat`, which now feeds
// BOTH the Markdown export and the per-turn model history.
// Hard upper bound on the rows materialized by `findAllByChat` (export path).
// A generous cap so a pathologically huge chat cannot load an unbounded result
// into memory; far above any realistic transcript length.
const FIND_ALL_BY_CHAT_LIMIT = 5000;
@@ -79,17 +78,14 @@ export class AiChatMessageRepo {
}
// Load ALL (non-deleted) messages of a chat in ascending chronological order
// (oldest -> newest), unpaginated. Two callers, both treating the DB as the
// single source of truth and needing the whole transcript in one pass
// (findByChat is cursor-paginated and would only return the first page):
// - the server-side Markdown export (#183);
// - the per-turn model history, rebuilt fresh on every turn so the model
// sees the full authoritative transcript.
// (oldest -> newest), unpaginated. Used by the server-side Markdown export
// (#183), where the DB is the single source of truth and the whole transcript
// must be rendered in one pass (findByChat is cursor-paginated and would only
// return the first page).
//
// Hard-capped at FIND_ALL_BY_CHAT_LIMIT rows (a generous bound, far above any
// realistic transcript) — a shared memory-safety backstop for BOTH paths so a
// pathologically huge chat cannot materialize an unbounded result set in
// memory. On overflow the NEWEST rows are kept and a warning is logged.
// realistic transcript) so exporting a pathologically huge chat cannot
// materialize an unbounded result set in memory.
async findAllByChat(
chatId: string,
workspaceId: string,
@@ -97,9 +93,9 @@ export class AiChatMessageRepo {
limit: number = FIND_ALL_BY_CHAT_LIMIT,
): Promise<AiChatMessage[]> {
// Fetch newest-first (+1 to DETECT truncation), so on overflow we keep the
// NEWEST `limit` messages — the recent conversation matters most — rather
// than silently dropping the tail (#183 review). Then reverse back to
// chronological order (oldest -> newest) for rendering / model replay.
// NEWEST `limit` messages — the recent conversation matters most for an
// export — rather than silently dropping the tail (#183 review). Reverse back
// to chronological for rendering, like findRecent.
const rows = await this.db
.selectFrom('aiChatMessages')
.select(this.baseFields)
@@ -114,13 +110,38 @@ export class AiChatMessageRepo {
if (rows.length > limit) {
rows.length = limit; // keep the newest `limit` (rows are newest-first here)
this.logger.warn(
`Chat ${chatId} truncated to the newest ${limit} messages ` +
`Chat ${chatId} export truncated to the newest ${limit} messages ` +
`(older messages omitted).`,
);
}
return rows.reverse();
}
// Load the most RECENT `limit` messages for a chat and return them in
// ascending chronological order (oldest -> newest), as the model expects.
// `findByChat` returns the FIRST page ASC (the OLDEST messages), which loses
// recent turns once a chat grows beyond a page; this rebuilds the model
// history from the tail instead. Plain query (no cursor pagination).
async findRecent(
chatId: string,
workspaceId: string,
limit: number,
): Promise<AiChatMessage[]> {
const rows = await this.db
.selectFrom('aiChatMessages')
.select(this.baseFields)
.where('chatId', '=', chatId)
.where('workspaceId', '=', workspaceId)
.where('deletedAt', 'is', null)
.orderBy('createdAt', 'desc')
.orderBy('id', 'desc')
.limit(limit)
.execute();
// Selected newest-first for the limit; reverse to oldest-first for the model.
return rows.reverse();
}
async insert(
insertable: InsertableAiChatMessage,
trx?: KyselyTransaction,

View File

@@ -267,36 +267,4 @@ describe('AiChatMessageRepo.update + sweepStreaming [integration]', () => {
const all = await repo.findAllByChat(cappedChat, workspaceId, 100);
expect(all.map((r) => r.content)).toEqual(['m1-oldest', 'm2', 'm3-newest']);
});
it('default findAllByChat returns the FULL transcript past 50 rows — no recent-tail window (#202)', async () => {
// PR #202 swapped the model-history rebuild in AiChatService.handle from
// findRecent(chatId, ws, 50) to findAllByChat(chatId, ws) WITHOUT a limit
// arg. This pins the behavioral guarantee that switch relies on: a chat
// longer than the old 50-msg window comes back in FULL (oldest -> newest),
// so no early turns are silently dropped from what the model sees. The old
// 50-cap would have returned only the last 50 of these 60 rows.
const longChat = (
await createChat(db, { workspaceId, creatorId: userId })
).id;
const base = Date.now();
const total = 60;
for (let i = 0; i < total; i++) {
await createMessage(db, {
workspaceId,
chatId: longChat,
content: `msg-${i}`,
// Strictly increasing timestamps so ordering is deterministic.
createdAt: new Date(base + i * 1000),
});
}
// Default args == exactly how handle() calls it now.
const history = await repo.findAllByChat(longChat, workspaceId);
expect(history).toHaveLength(total);
expect(history.map((r) => r.content)).toEqual(
Array.from({ length: total }, (_, i) => `msg-${i}`),
);
// The very first turn (which the old 50-window would have dropped) is present.
expect(history[0]!.content).toBe('msg-0');
});
});