Compare commits

...

14 Commits

Author SHA1 Message Date
agent_coder 68899a2c2e feat(ai-chat): durable detached agent runs — phase 1 (#184/#234)
Squashed for a clean rebase onto develop (was 19 commits; the reviewer approved
the net diff at fb246080). Detaches an agent run from the HTTP request/browser
window: a run is a first-class lifecycle object (ai_chat_runs), a browser
disconnect no longer kills it, a concurrent-run insert-gate prevents double runs,
and a reopened chat live-follows a still-running run via a polled observer merge.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-04 23:35:26 +03:00
agent_vscode 5336f06d10 Merge pull request 'fix(e2e)+ci: канон callout '> [!info]' в e2e-mcp + параллельная сборка с гейтом на publish' (#356) from fix/e2e-callout-and-gate-build into develop 2026-07-04 22:42:11 +03:00
agent_vscode 4bd579f7f6 ci(develop): build image in parallel with tests, gate only the publish
Two-phase scheme instead of the sequential gate: the build job runs in
parallel with test/e2e jobs and only warms the buildx GHA cache
(push:false, cache-to mode=max); a new publish job (needs: test,
e2e-server, e2e-mcp, build) rebuilds from the warm cache (near-instant
on hit, full rebuild on eviction — same as the old sequential timing)
and pushes :develop. GHCR login moved to publish; build-args blocks are
kept textually identical between the two jobs so the cache hits.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-04 22:41:25 +03:00
agent_vscode 7bf1c91a95 ci(develop): gate the :develop image build on e2e suites
Reverse the previous policy where e2e jobs only turned the run red
without blocking the image publish: build.needs now lists test,
e2e-server and e2e-mcp, so a failing test of any kind stops the
:develop image from being built and pushed. Stale policy comments
updated accordingly.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-04 22:33:06 +03:00
agent_vscode 6c82c54470 test(mcp): expect Obsidian '> [!info]' callout export in e2e (#333 canon)
PR #333 deliberately changed the canonical markdown export of callout
nodes to the Obsidian-native format ('> [!type]' + blockquote body,
pinned by packages/prosemirror-markdown unit tests); the importer still
parses both ':::type' fences and '> [!type]'. The get_page e2e assertion
was missed in that switch and still expected ':::info', failing the
e2e-mcp job on develop since 4369bbc5.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-04 22:33:06 +03:00
agent_vscode 382e5196da Merge pull request 'fix(docker): toolchain python3/make/g++ для нативной сборки re2' (#353) from fix/docker-re2-toolchain into develop 2026-07-04 22:11:49 +03:00
agent_vscode 76e0c08cec fix(docker): install python3/make/g++ toolchain for re2 native build
The develop image build broke at `pnpm install --frozen-lockfile`: the new
native dependency re2@1.25.0 (packages/mcp, search_in_page #330) always
compiles from source under pnpm — its prebuilt-binary downloader
(install-artifact-from-github) cannot identify the GitHub repo because pnpm
does not populate npm_package_repository_*/npm_package_json env vars ("No
github repository was identified. Building locally ..."), and node:22-slim
ships no python3/make/g++ for the node-gyp fallback.

- builder stage: add a cache-friendly apt layer with python3 make g++
  before COPY; the stage is discarded so the toolchain may stay.
- installer stage: install the toolchain, run the prod install as the node
  user via `su node -c`, and purge the toolchain — all in one RUN layer so
  the final image stays slim and node_modules ownership needs no extra
  chown layer; USER node is restored right after.

Fixes the failed run 28715009124 (develop docker build); release.yml uses
the same Dockerfile and is covered too.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-04 22:09:40 +03:00
vvzvlad 8978d69f3e Merge pull request 'fix(converter): стабильность round-trip image/медиа — «» ≡ absent (класс defaults-instability)' (#350) from fix/media-roundtrip-stability into develop
Reviewed-on: #350
2026-07-04 21:30:12 +03:00
agent_coder c192f2a2e1 test(prosemirror-markdown): pin the third state — explicit "" converges once, then idempotent
Reviewer addition to the round-trip stability matrix: besides "attr absent" and
"attr has a real value", a string attr in the empty-string class has a third,
degenerate state — a LITERAL "" (a user types alt/title/name in the editor then
deletes it, and Tiptap persists `attr: ""`, distinct from never-set). The fix's
`getAttribute(...) || null` coercion normalizes such a stored "" to the default
on the FIRST round-trip (a one-time "" -> null diff) and is byte-stable from the
SECOND round-trip on.

Adds a convergence contract to the reusable matrix helper (emptyStringClass flag
+ runConvergenceCase): pass 1 must converge the attr to its schema default (NOT
asserted byte-stable vs the "" input — that is the intended one-time
normalization); pass 2 must deep-equal pass 1 (idempotent thereafter). Driven for
every empty-string-class attr across image + the media family (image/drawio
alt+title, video alt via aria-label, pdf/attachment name, attachment mime).
Documents the one-time normalization so a future sync/QA diff does not flag the
single "" -> null change as converter corruption.

Gate: package suite 33 files / 682 tests passed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-04 21:17:17 +03:00
vvzvlad d78b985062 Merge pull request 'perf(comment): статический рендер + ленивые редакторы + мемоизация панели (#340)' (#349) from fix/340-comment-panel-perf into develop
Reviewed-on: #349
2026-07-04 20:55:11 +03:00
agent_coder 2ce672709a fix(prosemirror-markdown): stabilize image round-trip — "" ≡ absent on parse (empty-string class)
A stored image authored without `alt` gained a phantom `alt: ""` on every
round-trip (`markdownToProseMirror(convertProseMirrorToMarkdown(doc))`): `marked`
renders `![](src)` as `<img alt="">`, and the stock tiptap Image `alt` parseHTML
(`getAttribute("alt")`) materialized the empty string where the original had no
attribute. That false diff is a real GS-EDIT-REVERT churn source — an agent /
git-sync touch of a page with an image mutates the stored JSON (`absent -> ""`),
producing phantom diffs that can overwrite live edits.

Fix is PARSE-SIDE ("" ≡ absent), so the RAW round-trip is idempotent — not only
the canonical form (history / stored JSON diff on the raw shape; masking it only
in canonicalize would leave that noise). `image.alt`/`title` parseHTML now coerce
`getAttribute(...) || null`, plus defense-in-depth `|| null` across the at-risk
empty-string class (video aria-label, drawio/excalidraw title+alt, pdf name,
attachment name+mime) matching the existing `image.caption || null` precedent.

NOTE — image `align` is NOT changed: it round-trips correctly (center via the
schema default "center", left/right via the `<!--img {...}-->` comment). Its
`toBeUndefined()` in the git-sync gate is canonical-form normalization, not a loss.

Intentional divergence from editor-ext: editor-ext's literal `alt` parseHTML
returns "" verbatim, but this coercion CONVERGES on editor-ext's real STORED
shape (an image inserted without alt has no `alt` attribute -> re-parses absent,
never ""), so the round-trip is idempotent and matches real documents.

Adds a reusable, node-agnostic round-trip-stability matrix helper
(test/roundtrip-stability.helper.ts) — given a node + attr spec it enumerates
default/non-default combos and asserts byte-stability of BOTH the raw and the
canonical round-trip (the documented numeric width/height→string coercion encoded
as an explicit allowed normalization) — driven over image + the whole media
family (video/audio/pdf/attachment/embed/drawio/excalidraw). The only raw
empty-string instability it found was image.alt; the family was already stable.

Gate: package suite 33 files / 672 tests passed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-04 20:51:34 +03:00
vvzvlad c252068672 Merge pull request 'feat(ai-chat): отложенная загрузка инструментов (deferred tools + loadTools) (#332)' (#341) from fix/332-deferred-tools into develop
Reviewed-on: #341
2026-07-04 20:47:45 +03:00
agent_coder 68caf8157a test(ai-chat): document AI_CHAT_DEFERRED_TOOLS + pin ON-path & catalog completeness (#341 review F1-F3)
- F1: document AI_CHAT_DEFERRED_TOOLS in .env.example (AI_* section) — default
  ON = deferred loading (compact catalog + loadTools), =false restores the old
  "all tools always active" behavior.
- F2: integration test of the ON path in ai-chat-stream.int-spec.ts — a deferred
  tool activated via loadTools is active on the SAME turn's next step but a fresh
  turn starts cold (CORE + loadTools only), proving the per-turn activatedTools
  Set does not leak across turns/chats. Drives the real streamText loop with a
  MockLanguageModelV3 and inspects recorded per-step activeTools-filtered tools.
- F3: replace the magic toHaveLength(28) in tool-tiers.spec.ts with a two-way
  partition against the LIVE in-app toolset (AiChatToolsService.forUser keys):
  every non-core tool must appear in buildInAppDeferredCatalog and every catalog
  entry must map to a real non-core tool — so a future tool forgotten in
  INLINE_TOOL_TIERS fails the suite instead of silently vanishing from the agent.

No production logic change (mechanism was already reviewed correct).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-04 20:34:42 +03:00
claude code agent 227 e431b33bb1 feat(ai-chat): deferred tool loading (tiers + loadTools meta-tool) (#332)
The in-app AI agent shipped all ~41 tool schemas on every model step. This
adds a two-tier catalog: core tools (frequent or one-line) stay always-active;
the rest are advertised as a compact catalog and their full schema is fetched
on demand via the loadTools meta-tool, wired through ai@6 prepareStep's
per-step activeTools.

- tools/tool-tiers.ts: CORE_TOOL_KEYS, INLINE_TOOL_TIERS, applyLoadTools,
  catalog builders (+ tool-tiers.spec.ts, 13 cases).
- ai-chat.service.ts prepareAgentStep: returns activeTools =
  [...CORE_TOOL_KEYS, loadTools, ...activatedTools]; per-turn activated Set.
- ai-chat.prompt.ts: buildToolCatalogBlock renders the deferred catalog.
- mcp/tool-specs.ts: tier + catalogLine metadata (external snake_case /mcp
  transport unchanged).
- EnvironmentService.isAiChatDeferredToolsEnabled(): AI_CHAT_DEFERRED_TOOLS,
  default ON per issue intent (kill-switch =false restores old behavior).

Gate: server ai-chat 631/631, tool-tiers 13/13, mcp 472/472, tsc clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-04 19:57:11 +03:00
53 changed files with 6309 additions and 591 deletions
+21
View File
@@ -202,6 +202,27 @@ MCP_DOCMOST_PASSWORD=
# Default 900000 (15 min).
# AI_MCP_CALL_TIMEOUT_MS=900000
# Deferred tool loading for the in-app AI chat (#332). Default ON: the agent sees
# a compact <tool_catalog> and only CORE tools + a loadTools meta-tool are active
# each step; deferred tools (the fat/rare ones + all external MCP tools) load on
# demand. Set AI_CHAT_DEFERRED_TOOLS=false to restore the old "all tools always
# active" behavior.
# AI_CHAT_DEFERRED_TOOLS=true
# --- Autonomous / detached agent runs (settings.ai.autonomousRuns) ---
# Opt-in per workspace (AI settings; off by default). When on, a chat turn becomes
# a server-side RUN that survives a browser disconnect — only an explicit Stop ends
# it, and a client reconnects/live-follows the run.
#
# DEPLOY CONSTRAINT — SINGLE-INSTANCE ONLY in phase 1: Stop and the in-process
# AbortController that backs it are process-local, so a Stop only aborts a run
# executing on the SAME replica that owns it (cross-instance pub/sub stop is phase
# 2 and not yet reliable). Do NOT enable autonomousRuns on a horizontally-scaled
# deployment (multiple replicas behind a load balancer, or Docmost cloud
# CLOUD=true) — run a single instance instead. The server logs a startup WARNING
# when it detects a multi-instance deployment (CLOUD=true) so the constraint is
# visible, and a startup sweep settles any run left dangling by a restart.
# --- Anonymous public-share AI assistant ---
# Opt-in per workspace (AI settings -> "public share assistant"; off by default).
# When enabled, anonymous visitors of a published share can ask an AI about that
+42 -11
View File
@@ -18,12 +18,48 @@ env:
IMAGE: ghcr.io/vvzvlad/gitmost
jobs:
# Run the reusable test suite first so a failing test blocks the image build.
# Run the reusable test suite. Together with the e2e jobs below it gates the
# publish job (the image push), not the build itself — build runs in parallel.
test:
uses: ./.github/workflows/test.yml
# Runs in parallel with the test/e2e jobs and only warms the buildx cache
# (GHA cache, scope develop-amd64). No push happens here — the publish job
# below is the only one that pushes the image.
build:
needs: test
runs-on: ubuntu-latest
timeout-minutes: 30
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Resolve version
id: version
run: echo "value=$(git describe --tags --always)" >> "$GITHUB_OUTPUT"
- name: Build develop image (warm cache, no push)
uses: docker/build-push-action@v6
with:
context: .
platforms: linux/amd64
build-args: |
APP_VERSION=${{ steps.version.outputs.value }}
AI_AGENT_ROLES_CATALOG_URL=https://raw.githubusercontent.com/vvzvlad/gitmost/develop/agent-roles-catalog
push: false
cache-from: type=gha,scope=develop-amd64
cache-to: type=gha,scope=develop-amd64,mode=max,ignore-error=true
# The gate: rebuilds from the cache the build job just wrote (near-instant on
# a cache hit; worst case — cache eviction — a full rebuild, which matches the
# old sequential timing) and pushes :develop only when unit tests AND both
# e2e suites AND the build are green.
publish:
needs: [test, e2e-server, e2e-mcp, build]
runs-on: ubuntu-latest
timeout-minutes: 30
steps:
@@ -57,13 +93,10 @@ jobs:
push: true
tags: ${{ env.IMAGE }}:develop
cache-from: type=gha,scope=develop-amd64
cache-to: type=gha,scope=develop-amd64,mode=max,ignore-error=true
# e2e jobs run on every develop push but DO NOT gate the build/publish above:
# `build` stays `needs: test` only, so the :develop image still ships even if
# e2e fails. A failing e2e job turns the run red and triggers GitHub's email
# to the pusher — that red run + email is the intended notification, not a
# deploy block.
# e2e jobs gate the publish (image push), not the build: the :develop image
# is pushed only when unit tests AND both e2e suites pass (publish.needs
# lists them all).
e2e-server:
runs-on: ubuntu-latest
# Hard cap: the full-AppModule e2e leaks open handles and hung jest to the 6h max.
@@ -124,9 +157,7 @@ jobs:
- name: Run server e2e
run: pnpm --filter ./apps/server test:e2e
# Same rationale as e2e-server: this job is intentionally NOT in
# `build.needs`. Deploy of the :develop image must not be blocked by e2e;
# a red run plus GitHub's email to the pusher is the notification mechanism.
# Gates the publish too — see the comment above e2e-server.
e2e-mcp:
runs-on: ubuntu-latest
timeout-minutes: 20
+1
View File
@@ -279,6 +279,7 @@ The API server is a Fastify app with a global `/api` prefix (`main.ts` excludes
- `core/ai-chat/tools/` — the agent's ~40 read+write tools. Every tool runs under the **calling user's** CASL permissions via a per-user loopback access token (`docmost-client.loader.ts`), so the agent can never exceed what the user could do. Only **reversible** operations are exposed (page history + trash; no permanent delete). Agent edits get an "AI agent" provenance badge in page history (`20260616T130000-agent-provenance` migration).
- `core/ai-chat/embedding/` — RAG indexer + a BullMQ consumer on `AI_QUEUE` that embeds pages into `page_embeddings` (vector search), complementing Postgres full-text search. Pages are (re)indexed on edit; `AI_EMBEDDING_TIMEOUT_MS` bounds a hung embeddings endpoint.
- `core/ai-chat/external-mcp/` — admins can attach external MCP servers (e.g. Tavily) to give the agent web access. **`ssrf-guard.ts` validates outbound MCP URLs against SSRF** — keep that guard in the path when touching external-MCP connection logic.
- `core/ai-chat/ai-chat-run.service.ts` + `ai_chat_runs`**detached/autonomous agent runs** (`#184`), behind the per-workspace `settings.ai.autonomousRuns` flag (off by default). When on, a turn becomes a server-side RUN that survives a browser disconnect; only an explicit `POST /ai-chat/stop` ends it, and a client reconnects/live-follows via `POST /ai-chat/run`. **DEPLOY CONSTRAINT — single-instance only in phase 1:** Stop and the AbortController that backs it are process-local, so a Stop only aborts a run executing on the **same** replica that owns it (cross-instance pub/sub stop is phase 2). Do **not** enable `autonomousRuns` on a horizontally-scaled deployment (multiple replicas behind a load balancer, or Docmost cloud `CLOUD=true`) — run a single instance instead. The server logs a startup WARNING when it detects a multi-instance deployment (`CLOUD=true`) so the constraint is visible. The startup sweep settles any run left dangling by a restart.
### Client structure
Vite SPA. Code is organized by feature under `apps/client/src/features/*` (mirrors the server domains: `page`, `space`, `comment`, `ai-chat`, `editor`, …). Conventions:
+13
View File
@@ -72,6 +72,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
append/prepend fragments, nor to COMMENT bodies — a comment may legitimately
contain a standalone footnote definition, which canonicalization would drop.
(#228)
- **Detached, autonomous agent runs that survive a browser disconnect.** When the
new `settings.ai.autonomousRuns` workspace flag is on (off by default), an
AI-chat turn becomes a first-class, server-side RUN tracked in a new
`ai_chat_runs` table instead of a socket-bound stream: closing the tab or
losing the connection no longer aborts the turn — it keeps executing and
persisting server-side, and only an explicit Stop ends it. A client can
reconnect and live-follow (or stop) an in-flight run via `POST /ai-chat/run`
(resolve the latest run + its assistant message for a chat) and
`POST /ai-chat/stop` (stop by `runId` or `chatId`). A partial unique index
enforces one active run per chat, and a startup sweep settles any run left
dangling by a restart. Phase 1 is single-instance-only (cross-instance Stop is
not yet reliable); the server warns at startup on a horizontally-scaled
deployment. (#184)
- **Out-of-band page transfer via an in-RAM blob sandbox (`stash_page`).** A
new MCP tool serializes a whole page (its full ProseMirror JSON, with every
internal image/file mirrored) into an ephemeral in-RAM blob and returns only
+16 -2
View File
@@ -5,6 +5,13 @@ RUN npm install -g pnpm@10.4.0
FROM base AS builder
# re2 (packages/mcp) always compiles from source under pnpm (the prebuilt-binary
# download cannot identify the GitHub repo), so node-gyp needs python3/make/g++.
# This stage is discarded, so the toolchain can stay installed.
RUN apt-get update \
&& apt-get install -y --no-install-recommends python3 make g++ \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY . .
@@ -57,9 +64,16 @@ COPY --from=builder /app/patches /app/patches
RUN chown -R node:node /app
USER node
# Toolchain is needed transiently to compile re2 during the prod install; install
# and purge it in one layer to keep the final image slim. The install itself runs
# as the node user via su to keep node_modules ownership without a costly chown layer.
RUN apt-get update \
&& apt-get install -y --no-install-recommends python3 make g++ \
&& su node -c "pnpm install --frozen-lockfile --prod" \
&& apt-get purge -y --auto-remove python3 make g++ \
&& rm -rf /var/lib/apt/lists/*
RUN pnpm install --frozen-lockfile --prod
USER node
RUN mkdir -p /app/data/storage
@@ -19,7 +19,7 @@ import {
IconPlus,
IconX,
} from "@tabler/icons-react";
import { useAtom, useSetAtom } from "jotai";
import { useAtom, useAtomValue, useSetAtom } from "jotai";
import { useLocation, useMatch } from "react-router-dom";
import { useTranslation } from "react-i18next";
import { useQueryClient } from "@tanstack/react-query";
@@ -41,13 +41,24 @@ import { extractPageSlugId } from "@/lib";
import {
AI_CHATS_RQ_KEY,
AI_CHAT_MESSAGES_RQ_KEY,
AI_CHAT_RUN_RQ_KEY,
useAiChatMessagesQuery,
useAiChatRunQuery,
useAiChatsQuery,
useAiRolesQuery,
} from "@/features/ai-chat/queries/ai-chat-query.ts";
import {
shouldClearLatchOnQueryError,
shouldClearStoppingLatch,
shouldObserveRun,
} from "@/features/ai-chat/utils/run-polling.ts";
import { workspaceAtom } from "@/features/user/atoms/current-user-atom";
import ConversationList from "@/features/ai-chat/components/conversation-list.tsx";
import ChatThread from "@/features/ai-chat/components/chat-thread.tsx";
import { exportAiChat } from "@/features/ai-chat/services/ai-chat-service.ts";
import {
exportAiChat,
stopRun,
} from "@/features/ai-chat/services/ai-chat-service.ts";
import { useChatSession } from "@/features/ai-chat/hooks/use-chat-session.ts";
import {
shouldCollapseOnOutsidePointer,
@@ -234,6 +245,147 @@ export default function AiChatWindow() {
const { data: messageRows, isLoading: messagesLoading } =
useAiChatMessagesQuery(activeChatId ?? undefined);
// #184 reconnect-and-live-follow. Whether detached agent runs are enabled for
// this workspace. The reconnect endpoint itself is NOT flag-gated server-side
// (it is only owner-gated and returns `{ run: null }` when the chat has no
// run); but when the feature is off no runs are ever created, so polling it
// would always come back empty — we gate it off here to avoid pointless polls.
const workspace = useAtomValue(workspaceAtom);
const autonomousRunsEnabled =
workspace?.settings?.ai?.autonomousRuns === true;
// Whether THIS tab is the one actively streaming the open chat's run locally
// (it started the run here and holds the SSE). Reported up from ChatThread. We
// are the STREAMER while true and a passive OBSERVER while false — the basis of
// the observer-vs-streamer detection. Reset to false by the fresh ChatThread's
// mount effect on every chat switch.
const [localStreaming, setLocalStreaming] = useState(false);
const onStreamingChange = useCallback((streaming: boolean) => {
setLocalStreaming(streaming);
}, []);
// #184 Stop wiring. While a detached run is being stopped we SUPPRESS the
// observer merge so the stopping run's still-persisting output does not
// re-stream back into view between the moment the user pressed Stop and the run
// actually settling as 'aborted' server-side. Polling itself keeps running (so
// the terminal transition is still detected) — only the visual merge is gated.
// Cleared when the run is observed terminal (below) or the chat is switched.
const [stoppingRun, setStoppingRun] = useState(false);
// Reset the stopping latch whenever the open chat changes: it is scoped to the
// run of the previously-open chat.
useEffect(() => {
setStoppingRun(false);
}, [activeChatId]);
// Authoritative stop of the open chat's detached run (the Stop button in
// autonomous mode). Latch "stopping" first (suppresses the re-stream flash),
// then request the server stop — the ONLY thing that ends a detached run; a mere
// local SSE abort is a client disconnect the server ignores. On failure we
// release the latch so the observer resumes (better to show the live run than to
// freeze the view) and surface the error.
const handleServerStop = useCallback(
(chatId: string): void => {
setStoppingRun(true);
// #234 F4: drop the PREVIOUS turn's run from the cache so `run` becomes null
// until the CURRENT turn's run is fetched fresh. Without this, once the local
// stream aborts (localStreaming -> false) the run query re-enables and
// react-query SYNCHRONOUSLY returns the still-cached prior terminal run; the
// terminal effect would then clear the stopping latch against that STALE run
// before the current turn's (still-running, detached, growing) run is ever
// observed — re-opening the observer merge and flashing the growing output
// over the frozen row. With the cache cleared the terminal effect's
// `if (!run) return` holds the latch until the current run itself is observed
// terminal (see shouldClearStoppingLatch).
queryClient.removeQueries({ queryKey: AI_CHAT_RUN_RQ_KEY(chatId) });
void stopRun(chatId).catch(() => {
setStoppingRun(false);
notifications.show({
message: t("Failed to stop the run"),
color: "red",
});
});
},
[t, queryClient],
);
// Poll the latest run of the open chat ONLY when we are a passive observer:
// feature on, a chat is open, and we are NOT the local streamer (the streamer
// already has the live SSE — polling/merging too would double-render). The
// query's own status-keyed refetchInterval stops once the run is terminal.
const { data: runData, isError: runQueryFailed } = useAiChatRunQuery(
activeChatId ?? undefined,
autonomousRunsEnabled && !localStreaming,
);
const run = runData?.run ?? null;
// Safety net (#234 F4 review): after handleServerStop clears the run cache,
// `run` is null until the current turn's run is fetched fresh, and the terminal
// effect below holds the latch via `if (!run) return`. If that refetch instead
// ERRORS PERMANENTLY (the GET-run keeps failing) while we are no longer the
// streamer, the run stays null, its status-keyed refetchInterval is off, and
// nothing would ever observe a terminal run — freezing the view with the
// observer merge suppressed. Release the latch on that error so the live view
// resumes rather than stays stuck (the local stopRun may already have succeeded
// independently).
//
// #234 F7: this must NOT fire on a TRANSIENT error while `run` is still an
// ACTIVE held run. In TanStack Query v5 (retry:false) the query's `data` is
// RETAINED on error, so `runQueryFailed` can be true while `run` is still
// pending/running — releasing then would re-open the observer merge and flash
// the growing detached run over the frozen row (the very flash F4 prevents). The
// decision is the pure, unit-tested `shouldClearLatchOnQueryError`, which gates
// on the run NOT being active: it cures only the genuine permanent-null-freeze
// (`run === null`) and never releases against an active run.
useEffect(() => {
if (
shouldClearLatchOnQueryError({
stoppingRun,
isLocalStreaming: localStreaming,
runQueryFailed,
run,
})
)
setStoppingRun(false);
}, [stoppingRun, localStreaming, runQueryFailed, run]);
// The run's incrementally-persisted assistant message to merge into the thread,
// but only while we are an observer (never when we are the streamer — guards
// against a stale poll fighting the live stream). Includes a terminal run so the
// final persisted output is shown on reopen.
const observedRow =
shouldObserveRun(run, localStreaming) && !stoppingRun
? (runData?.message ?? null)
: null;
// When the observed run reaches a terminal status, do a final messages refetch
// so the persisted final state (token/context badge, export source) is shown,
// then the query's refetchInterval has already stopped polling. Deduped per run
// id so it fires exactly once per run, not on every subsequent poll-less render.
const finalizedRunIdRef = useRef<string | null>(null);
useEffect(() => {
if (!run || !activeChatId) return;
if (run.status === "pending" || run.status === "running") {
// Active again (a new run) — re-arm so its terminal transition fires once.
finalizedRunIdRef.current = null;
return;
}
// Terminal: a stop we requested has landed (or the run finished on its own),
// so release the stopping latch — the observer merge can now show the final
// persisted (aborted/finished) output without any live re-stream. The decision
// is the pure, unit-tested `shouldClearStoppingLatch` (run-polling.ts): release
// ONLY when we requested a stop, this tab is no longer the streamer, AND the
// CURRENT run is terminal. The #234 F4 cache removal in handleServerStop makes
// `run` null (this branch's `if (!run) return` above holds) until the current
// turn's run is fetched fresh, so the latch can never clear against a stale
// cached run.
if (shouldClearStoppingLatch({ stoppingRun, run, isLocalStreaming: localStreaming }))
setStoppingRun(false);
if (finalizedRunIdRef.current === run.id) return;
finalizedRunIdRef.current = run.id;
queryClient.invalidateQueries({
queryKey: AI_CHAT_MESSAGES_RQ_KEY(activeChatId),
});
}, [run, activeChatId, queryClient, stoppingRun, localStreaming]);
// The page the user is currently viewing. AiChatWindow lives in a pathless
// parent layout route, so useParams() can't see :pageSlug. Match the full
// pathname against the authenticated page route instead so "the current page"
@@ -882,6 +1034,18 @@ export default function AiChatWindow() {
assistantName={currentRole?.name}
onTurnFinished={onTurnFinished}
onServerChatId={onServerChatId}
// #184: live-follow a still-running run when we reopened the chat as
// a passive observer; null when there is nothing to observe or this
// tab is the streamer. onStreamingChange lets the window stop polling
// while we are the streamer.
observedRow={observedRow}
onStreamingChange={onStreamingChange}
// #184: in autonomous mode the Stop button must hit the authoritative
// server stop (a local SSE abort is a client disconnect the server
// ignores). onServerStop also arms the "stopping" latch above so the
// stopped run's output does not re-stream via the observer merge.
autonomousRunsEnabled={autonomousRunsEnabled}
onServerStop={handleServerStop}
/>
)}
</div>
@@ -11,6 +11,7 @@ const h = vi.hoisted(() => ({
onFinish: null as null | ((arg: Record<string, unknown>) => void),
sendMessage: vi.fn(),
stop: vi.fn(),
setMessages: vi.fn(),
transport: null as null | {
prepareSendMessagesRequest: (arg: {
messages: unknown[];
@@ -30,6 +31,8 @@ vi.mock("@ai-sdk/react", () => ({
status: h.state.status,
stop: h.state.stop,
error: null,
// #184: ChatThread reads setMessages to merge a polled observer run.
setMessages: h.state.setMessages,
};
},
}));
@@ -228,3 +231,56 @@ describe("ChatThread — turn-end decision (onFinish)", () => {
}
});
});
// #184 passive-observer merge: when reconnecting to a still-running run, the
// parent feeds the polled run message via `observedRow`; ChatThread merges it via
// setMessages — but ONLY when this tab is NOT itself streaming (the streamer's
// SSE owns the view, so a stale observedRow must never overwrite it).
describe("ChatThread — observer run merge (#184)", () => {
beforeEach(() => {
h.state.onFinish = null;
h.state.setMessages.mockReset();
});
const observedRow = {
id: "a-run",
role: "assistant",
content: "step 1\nstep 2",
metadata: {
parts: [{ type: "text", text: "step 1\nstep 2" }],
},
createdAt: "2026-01-01T00:00:00Z",
} as const;
function renderObserver(status: string) {
h.state.status = status;
render(
<MantineProvider>
<ChatThread
chatId="c1"
initialRows={[]}
onTurnFinished={vi.fn()}
observedRow={observedRow as never}
/>
</MantineProvider>,
);
}
it("merges the polled run message when this tab is a passive observer", () => {
renderObserver("ready");
expect(h.state.setMessages).toHaveBeenCalledTimes(1);
// The updater replaces/append the observed assistant row by id.
const updater = h.state.setMessages.mock.calls[0][0] as (
prev: { id: string; parts: { text: string }[] }[],
) => { id: string; parts: { text: string }[] }[];
const merged = updater([{ id: "u1", parts: [{ text: "hi" }] }]);
expect(merged).toHaveLength(2);
expect(merged[1].id).toBe("a-run");
expect(merged[1].parts[0].text).toBe("step 1\nstep 2");
});
it("does NOT merge while THIS tab is the streamer (no double-render)", () => {
renderObserver("streaming");
expect(h.state.setMessages).not.toHaveBeenCalled();
});
});
@@ -24,6 +24,7 @@ import {
} from "@/features/ai-chat/utils/role-launch.ts";
import { describeChatError } from "@/features/ai-chat/utils/error-message.ts";
import { extractServerChatId } from "@/features/ai-chat/utils/adopt-chat-id.ts";
import { mergeObservedMessage } from "@/features/ai-chat/utils/run-polling.ts";
import {
dequeue,
enqueueMessage,
@@ -86,6 +87,29 @@ interface ChatThreadProps {
* Copy/export button available mid-stream). Distinct from onTurnFinished,
* which fires only at the terminal outcome. */
onServerChatId?: (serverChatId?: string) => void;
/** #184 reconnect-and-live-follow. When THIS tab reopened a chat whose agent
* run is still going (it is a PASSIVE OBSERVER — it did not start the run here),
* the parent polls the reconnect endpoint and feeds the run's incrementally-
* persisted assistant message here; we merge it into the live list so new
* steps/tool-calls appear as they are persisted. Null when there is nothing to
* observe (no run, feature off, or this tab IS the streamer). The merge is
* ADDITIONALLY guarded by our own `isStreaming`, so a stale value can never
* fight the local stream when we are the streamer. */
observedRow?: IAiChatMessageRow | null;
/** Report this tab's live streaming status up to the parent, so it can stop
* polling the run while WE are the active streamer (the SSE owns the view) and
* resume once we go idle. Called from an effect on every transition. */
onStreamingChange?: (streaming: boolean) => void;
/** #184: whether detached/autonomous agent runs are enabled for this workspace.
* When true the Stop button must additionally hit the AUTHORITATIVE server stop
* (via onServerStop) — aborting only the local SSE is just a client disconnect,
* which the server deliberately ignores, so the detached run would keep going. */
autonomousRunsEnabled?: boolean;
/** #184: request the server-side stop of this chat's active run (the parent owns
* the endpoint call + the "stopping" latch that keeps observer-polling from
* immediately re-streaming the stopping run's output). Called with the resolved
* chat id when the user presses Stop in autonomous mode. */
onServerStop?: (chatId: string) => void;
}
/**
@@ -131,6 +155,10 @@ export default function ChatThread({
assistantName,
onTurnFinished,
onServerChatId,
observedRow,
onStreamingChange,
autonomousRunsEnabled,
onServerStop,
}: ChatThreadProps) {
const { t } = useTranslation();
@@ -216,6 +244,16 @@ export default function ChatThread({
const flushOnAbortRef = useRef(false);
const interruptNextSendRef = useRef(false);
// #234 F5: the user pressed Stop while streaming a BRAND-NEW chat whose server
// chat id has not been adopted yet (the `start` chunk carrying it hadn't landed
// when Stop was pressed). A local SSE abort alone does NOT stop the DETACHED
// autonomous run — it keeps burning tokens and WRITING TO PAGES — so we cannot
// just no-op. We latch the stop as PENDING and fire the authoritative server
// stop the moment onServerChatId adopts the id (below). Read-and-cleared there;
// also defused on every new turn start so it can never fire against a later,
// unrelated turn's run.
const stopPendingRef = useRef(false);
// FIFO dequeue + send the next queued message (no-op when the queue is empty).
// Returns whether a message was actually sent, so callers can tell an empty
// dequeue (nothing to flush) from a real send.
@@ -274,7 +312,7 @@ export default function ChatThread({
[],
);
const { messages, sendMessage, status, stop, error } = useChat({
const { messages, sendMessage, status, stop, error, setMessages } = useChat({
// Stable per-mount key. Existing chats use their real id; new chats use a
// generated client id (never `undefined`) so the store is NOT re-created on
// every render mid-stream (see `chatStoreId` above).
@@ -365,7 +403,14 @@ export default function ChatThread({
return;
lastForwardedChatIdRef.current = serverChatId;
onServerChatId(serverChatId);
}, [messages, onServerChatId]);
// #234 F5: if Stop was pressed before the id was known, the authoritative
// server stop was deferred to this adoption point — fire it now with the
// just-adopted id. One-shot (read-and-clear) so it can't fire twice.
if (stopPendingRef.current) {
stopPendingRef.current = false;
onServerStop?.(serverChatId);
}
}, [messages, onServerChatId, onServerStop]);
// Live "turn was interrupted" marker for the CURRENT session. The red error
// banner (driven by `error`) covers the error case; this covers an aborted
@@ -378,6 +423,27 @@ export default function ChatThread({
const isStreaming = status === "submitted" || status === "streaming";
// #184: report our live streaming status up so the parent stops polling the run
// while WE are the streamer (the SSE owns the view) and resumes once we go idle.
// Effect (not render) so it never updates parent state during our own render;
// fires on mount with `false`, which also re-syncs the parent after a chat
// switch remounts this thread (a fresh mount is idle until the user sends).
useEffect(() => {
onStreamingChange?.(isStreaming);
}, [isStreaming, onStreamingChange]);
// #184 passive-observer merge: when the parent feeds a polled run message (we
// reopened a chat whose run is still going and did NOT start it here), merge it
// into the live list so new steps/tool-calls appear as they are persisted. Hard-
// gated by `!isStreaming`: if THIS tab is actually the streamer, the local SSE
// owns the view and a stale observedRow must never overwrite it. `observedRow`
// is a stable per-poll object, so this runs once per poll, not per render.
useEffect(() => {
if (isStreaming || !observedRow) return;
const observed = rowToUiMessage(observedRow);
setMessages((prev) => mergeObservedMessage(prev, observed));
}, [observedRow, isStreaming, setMessages]);
// "Send now" on a queued message: interrupt the current turn and immediately
// send THIS message, keeping the agent's partial output. Other queued messages
// stay queued and flush normally after the new turn. Reuses the existing
@@ -409,6 +475,40 @@ export default function ChatThread({
[setQueue, stop],
);
// Stop the current turn. ALWAYS abort the local SSE (`stop()`) so the composer
// returns to idle immediately. In AUTONOMOUS mode the turn is a DETACHED run:
// aborting the local SSE is only a client disconnect, which the server ignores,
// so the run would keep executing — we ADDITIONALLY request the authoritative
// server-side stop (the parent owns that call + the "stopping" latch that keeps
// observer-polling from re-streaming the stopping run's output). The chat id is
// read live from chatIdRef (adopted early at the stream's `start` chunk); if it
// is not known yet — a brand-new chat in the first moment of its first turn —
// only the local abort happens (there is no server-side run handle to stop yet).
const handleStop = useCallback(() => {
stop();
if (!autonomousRunsEnabled) return;
if (chatIdRef.current) {
onServerStop?.(chatIdRef.current);
} else {
// #234 F5: no chat id yet (brand-new chat in the first moment of its first
// turn, before the `start` chunk adopted the id). Latch the stop as pending;
// the onServerChatId adoption effect fires the deferred server stop as soon
// as the id appears, so the detached run is still authoritatively stopped
// instead of left running by a silent local-only abort.
//
// KNOWN LIMITATION (#234 F5 review): `stop()` above has already aborted the
// local SSE reader. In the rare sub-window where Stop is pressed while still
// `submitted` (request sent, not one chunk read yet), that abort can cancel
// the reader BEFORE the `start` chunk is applied to `messages`, so the
// adoption effect never runs and this pending stop never fires. The detached
// run then keeps going for that turn. This is not a regression (the pre-fix
// behavior sent no server stop at all); closing it fully would require
// deferring the local abort until adoption, which is riskier and out of scope
// for this fix. Documented so a future change can address the abort-ordering.
stopPendingRef.current = true;
}
}, [stop, autonomousRunsEnabled, onServerStop]);
// Clear the stopped marker as soon as a new turn begins streaming, and drop any
// stale "Send now" interrupt flags. On the legit interrupt path both refs are
// already consumed synchronously (onFinish + prepareSendMessagesRequest) before
@@ -420,6 +520,11 @@ export default function ChatThread({
setStopNotice(null);
flushOnAbortRef.current = false;
interruptNextSendRef.current = false;
// #234 F5: a new turn is starting — drop any pending deferred-stop from a
// previous turn that never adopted an id, so it can never fire against this
// (or a later) unrelated turn's run. A deferred stop for the CURRENT turn is
// set AFTER this effect (on the Stop click), so this does not clobber it.
stopPendingRef.current = false;
}
}, [isStreaming]);
@@ -539,7 +644,7 @@ export default function ChatThread({
<ChatInput
onSend={(text) => sendMessage({ text })}
onQueue={enqueue}
onStop={stop}
onStop={handleStop}
isStreaming={isStreaming}
/>
</Stack>
@@ -12,6 +12,7 @@ import {
deleteAiChat,
deleteAiRole,
getAiChatMessages,
getAiChatRun,
getAiChats,
getAiRoleCatalog,
getAiRoleCatalogBundle,
@@ -24,6 +25,7 @@ import {
import {
IAiChat,
IAiChatMessageRow,
IAiChatRunResponse,
IAiRole,
IAiRoleCatalog,
IAiRoleCatalogBundle,
@@ -34,6 +36,7 @@ import {
IAiRoleUpdateFromCatalogResult,
} from "@/features/ai-chat/types/ai-chat.types.ts";
import { IPagination } from "@/lib/types.ts";
import { runPollInterval } from "@/features/ai-chat/utils/run-polling.ts";
export const AI_CHATS_RQ_KEY = ["ai-chats"];
export const AI_ROLES_RQ_KEY = ["ai-roles"];
@@ -51,16 +54,18 @@ export const AI_CHAT_MESSAGES_RQ_KEY = (chatId: string) => [
"ai-chat-messages",
chatId,
];
export const AI_CHAT_RUN_RQ_KEY = (chatId: string) => ["ai-chat-run", chatId];
/** Paginated list of the current user's chats (auto-loads further pages). */
export function useAiChatsQuery() {
const query = useInfiniteQuery({
queryKey: AI_CHATS_RQ_KEY,
queryFn: ({ pageParam }) =>
getAiChats({ cursor: pageParam, limit: 50 }),
queryFn: ({ pageParam }) => getAiChats({ cursor: pageParam, limit: 50 }),
initialPageParam: undefined as string | undefined,
getNextPageParam: (lastPage) =>
lastPage.meta.hasNextPage ? (lastPage.meta.nextCursor ?? undefined) : undefined,
lastPage.meta.hasNextPage
? (lastPage.meta.nextCursor ?? undefined)
: undefined,
});
const data = useMemo<IPagination<IAiChat> | undefined>(() => {
@@ -90,7 +95,9 @@ export function useAiChatMessagesQuery(chatId: string | undefined) {
getAiChatMessages({ chatId: chatId as string, cursor: pageParam }),
initialPageParam: undefined as string | undefined,
getNextPageParam: (lastPage) =>
lastPage.meta.hasNextPage ? (lastPage.meta.nextCursor ?? undefined) : undefined,
lastPage.meta.hasNextPage
? (lastPage.meta.nextCursor ?? undefined)
: undefined,
enabled: !!chatId,
});
@@ -131,6 +138,34 @@ export function useAiChatMessagesQuery(chatId: string | undefined) {
};
}
/**
* Reconnect to a chat's latest agent run and LIVE-FOLLOW it (#184). While the run
* is active the query re-polls every {@link runPollInterval} ms (driven off the
* fetched `run.status`, the same status-keyed refetchInterval pattern as the
* embeddings reindex polling); once the run reaches a terminal status — or there
* is no run — the interval returns `false` and polling stops on its own. Polling
* is thus naturally bounded by the run terminating; no separate timeout cap.
*
* `enabled` gates the whole thing: callers pass `false` when the autonomous-runs
* feature is off (the endpoint is NOT flag-gated server-side, but with the feature
* off the chat has no runs, so polling would only ever return `{ run: null }`) OR
* when THIS tab is the one actively streaming the run (the live SSE owns the view,
* so we must not also poll/merge). The global `retry: false` means a failed fetch
* leaves `data` undefined, so refetchInterval(undefined run) returns false — a
* failed fetch can never spin a tight loop.
*/
export function useAiChatRunQuery(
chatId: string | undefined,
enabled: boolean,
) {
return useQuery<IAiChatRunResponse, Error>({
queryKey: AI_CHAT_RUN_RQ_KEY(chatId ?? ""),
queryFn: () => getAiChatRun(chatId as string),
enabled: !!chatId && enabled,
refetchInterval: (query) => runPollInterval(query.state.data?.run),
});
}
export function useRenameAiChatMutation() {
const queryClient = useQueryClient();
const { t } = useTranslation();
@@ -280,11 +315,14 @@ export function useImportAiRolesFromCatalogMutation() {
mutationFn: (payload) => importAiRolesFromCatalog(payload),
onSuccess: (result) => {
notifications.show({
message: t("Imported {{created}}, renamed {{renamed}}, skipped {{skipped}}", {
created: result.created,
renamed: result.renamed,
skipped: result.skipped,
}),
message: t(
"Imported {{created}}, renamed {{renamed}}, skipped {{skipped}}",
{
created: result.created,
renamed: result.renamed,
skipped: result.skipped,
},
),
});
// Surface partial failures (e.g. unique-name races) as a red warning.
if (result.errors.length > 0) {
@@ -0,0 +1,92 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import React from "react";
import { renderHook, waitFor } from "@testing-library/react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import type { IAiChatRunResponse } from "@/features/ai-chat/types/ai-chat.types.ts";
// react-i18next is pulled in transitively by ai-chat-query.ts (the mutation hooks
// use it); stub it so the module imports cleanly in this hook test.
vi.mock("react-i18next", () => ({
useTranslation: () => ({ t: (key: string) => key }),
}));
vi.mock("@mantine/notifications", () => ({
notifications: { show: vi.fn() },
}));
// Mock the whole service module; only getAiChatRun is exercised here, but the
// other named exports must exist so ai-chat-query.ts imports resolve.
vi.mock("@/features/ai-chat/services/ai-chat-service.ts", () => ({
getAiChatRun: vi.fn(),
getAiChatMessages: vi.fn(),
getAiChats: vi.fn(),
getAiRoleCatalog: vi.fn(),
getAiRoleCatalogBundle: vi.fn(),
getAiRoles: vi.fn(),
importAiRolesFromCatalog: vi.fn(),
createAiRole: vi.fn(),
deleteAiChat: vi.fn(),
deleteAiRole: vi.fn(),
renameAiChat: vi.fn(),
updateAiRole: vi.fn(),
updateAiRoleFromCatalog: vi.fn(),
}));
import { getAiChatRun } from "@/features/ai-chat/services/ai-chat-service.ts";
import { useAiChatRunQuery } from "@/features/ai-chat/queries/ai-chat-query.ts";
function createWrapper() {
const queryClient = new QueryClient({
defaultOptions: { queries: { retry: false } },
});
return function Wrapper({ children }: { children: React.ReactNode }) {
return (
<QueryClientProvider client={queryClient}>{children}</QueryClientProvider>
);
};
}
const runningResponse: IAiChatRunResponse = {
run: { id: "run-1", chatId: "c1", status: "running" },
message: {
id: "a1",
role: "assistant",
content: "working...",
createdAt: "2026-01-01T00:00:00Z",
},
};
describe("useAiChatRunQuery — enable gating", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("fetches the run when enabled (passive observer, feature on)", async () => {
vi.mocked(getAiChatRun).mockResolvedValue(runningResponse);
const { result } = renderHook(() => useAiChatRunQuery("c1", true), {
wrapper: createWrapper(),
});
await waitFor(() => expect(result.current.isSuccess).toBe(true));
expect(getAiChatRun).toHaveBeenCalledWith("c1");
expect(result.current.data?.run?.status).toBe("running");
});
it("does NOT fetch when disabled (this tab is the streamer / feature off)", async () => {
vi.mocked(getAiChatRun).mockResolvedValue(runningResponse);
renderHook(() => useAiChatRunQuery("c1", false), {
wrapper: createWrapper(),
});
// Give any errant fetch a chance to fire, then assert none did.
await new Promise((r) => setTimeout(r, 20));
expect(getAiChatRun).not.toHaveBeenCalled();
});
it("does NOT fetch when there is no chat id", async () => {
vi.mocked(getAiChatRun).mockResolvedValue(runningResponse);
renderHook(() => useAiChatRunQuery(undefined, true), {
wrapper: createWrapper(),
});
await new Promise((r) => setTimeout(r, 20));
expect(getAiChatRun).not.toHaveBeenCalled();
});
});
@@ -5,6 +5,7 @@ import {
IAiChatListParams,
IAiChatMessageRow,
IAiChatMessagesParams,
IAiChatRunResponse,
IAiRole,
IAiRoleCatalog,
IAiRoleCatalogBundle,
@@ -42,6 +43,38 @@ export async function getAiChatMessages(
return req.data;
}
/**
* Reconnect to the latest agent run of a chat (#184). Returns the run's
* persisted lifecycle state and the assistant message it materializes (the
* partial output while the run is in-flight, the final output once it finished).
* The DB is the source of truth, so this works for an in-flight run (the browser
* dropped, the run kept going) and a finished one alike; `{ run: null }` when the
* chat has never had a run. Owner-gated server-side (the requesting user must own
* the chat); it is NOT flag-gated — when the feature is off the chat simply has no
* runs, so the endpoint returns `{ run: null }`.
*/
export async function getAiChatRun(
chatId: string,
): Promise<IAiChatRunResponse> {
const req = await api.post<IAiChatRunResponse>("/ai-chat/run", { chatId });
return req.data;
}
/**
* Explicitly STOP the active agent run of a chat (#184). This is the ONLY thing
* that ends a DETACHED run — a mere browser disconnect (aborting the local SSE)
* is deliberately ignored server-side, so the client must call this to actually
* stop an autonomous run. Targeted by `chatId` (the server resolves whatever run
* is active on it); owner-gated server-side. Returns `{ stopped }` — false when
* there was nothing active to stop.
*/
export async function stopRun(
chatId: string,
): Promise<{ stopped: boolean }> {
const req = await api.post<{ stopped: boolean }>("/ai-chat/stop", { chatId });
return req.data;
}
/**
* Resolve the chat bound to a document (the current user's most-recent chat
* created on that page), or null when there is none. Drives auto-open-on-page.
@@ -200,6 +200,38 @@ export interface IAiChatMessageRow {
createdAt: string;
}
/**
* A persisted agent-run row (#184), mirroring the `ai_chat_runs` fields the
* client reads from `POST /ai-chat/run`. Only `status` is load-bearing for the
* reconnect-and-live-update UX (it drives the poll cadence); the rest are carried
* for display/diagnostics. The DB is the source of truth, so this resolves for an
* in-flight run (the browser dropped, the run kept going) and a finished one.
*/
export interface IAiChatRun {
id: string;
chatId: string;
// 'pending' | 'running' | 'succeeded' | 'failed' | 'aborted'. The first two are
// ACTIVE (keep polling); the rest are TERMINAL (stop polling).
status: "pending" | "running" | "succeeded" | "failed" | "aborted" | string;
error?: string | null;
stepCount?: number;
assistantMessageId?: string | null;
startedAt?: string | null;
finishedAt?: string | null;
createdAt?: string;
updatedAt?: string;
}
/**
* Response of `POST /ai-chat/run` (#184): the latest run of a chat and the
* assistant message it materializes (the partial/final output, projected from the
* persisted rows). Both are `null` when the chat has never had a run.
*/
export interface IAiChatRunResponse {
run: IAiChatRun | null;
message: IAiChatMessageRow | null;
}
export interface IAiChatListParams extends QueryParams {}
export interface IAiChatMessagesParams {
@@ -0,0 +1,303 @@
import { describe, it, expect } from "vitest";
import type { UIMessage } from "@ai-sdk/react";
import type { IAiChatRun } from "@/features/ai-chat/types/ai-chat.types.ts";
import {
RUN_POLL_INTERVAL_MS,
isRunActive,
runPollInterval,
shouldObserveRun,
shouldClearStoppingLatch,
shouldClearLatchOnQueryError,
mergeObservedMessage,
} from "./run-polling.ts";
function makeRun(status: string): IAiChatRun {
return { id: "run-1", chatId: "c1", status };
}
function makeMsg(id: string, text: string): UIMessage {
return {
id,
role: "assistant",
parts: [{ type: "text", text }],
} as UIMessage;
}
describe("isRunActive", () => {
it("treats pending and running as active", () => {
expect(isRunActive(makeRun("pending"))).toBe(true);
expect(isRunActive(makeRun("running"))).toBe(true);
});
it("treats terminal / unknown / nullish as not active", () => {
expect(isRunActive(makeRun("succeeded"))).toBe(false);
expect(isRunActive(makeRun("failed"))).toBe(false);
expect(isRunActive(makeRun("aborted"))).toBe(false);
expect(isRunActive(makeRun("weird-future-status"))).toBe(false);
expect(isRunActive(null)).toBe(false);
expect(isRunActive(undefined)).toBe(false);
});
});
describe("runPollInterval (the refetchInterval helper)", () => {
it("returns 2000ms while the run is pending/running", () => {
expect(runPollInterval(makeRun("pending"))).toBe(RUN_POLL_INTERVAL_MS);
expect(runPollInterval(makeRun("running"))).toBe(RUN_POLL_INTERVAL_MS);
expect(RUN_POLL_INTERVAL_MS).toBe(2000);
});
it("returns false (stop polling) once the run is terminal", () => {
expect(runPollInterval(makeRun("succeeded"))).toBe(false);
expect(runPollInterval(makeRun("failed"))).toBe(false);
expect(runPollInterval(makeRun("aborted"))).toBe(false);
});
it("returns false (no polling) when there is no run", () => {
expect(runPollInterval(null)).toBe(false);
expect(runPollInterval(undefined)).toBe(false);
});
});
describe("shouldObserveRun (observer-vs-streamer decision)", () => {
it("observes an active run when this tab is NOT the local streamer", () => {
expect(shouldObserveRun(makeRun("running"), false)).toBe(true);
expect(shouldObserveRun(makeRun("pending"), false)).toBe(true);
});
it("observes a terminal run too (so the final output shows on reopen)", () => {
expect(shouldObserveRun(makeRun("succeeded"), false)).toBe(true);
});
it("does NOT observe when this tab IS the streamer (no double-render)", () => {
expect(shouldObserveRun(makeRun("running"), true)).toBe(false);
expect(shouldObserveRun(makeRun("succeeded"), true)).toBe(false);
});
it("does NOT observe when there is no run", () => {
expect(shouldObserveRun(null, false)).toBe(false);
expect(shouldObserveRun(undefined, false)).toBe(false);
});
});
describe("shouldClearStoppingLatch (#234 latch-release decision)", () => {
// The one case the latch SHOULD clear: we requested a stop, we are the passive
// observer (not streaming), and the CURRENT run is terminal.
it("clears only when stopping, observing, and the run is terminal", () => {
expect(
shouldClearStoppingLatch({
stoppingRun: true,
run: makeRun("aborted"),
isLocalStreaming: false,
}),
).toBe(true);
expect(
shouldClearStoppingLatch({
stoppingRun: true,
run: makeRun("succeeded"),
isLocalStreaming: false,
}),
).toBe(true);
expect(
shouldClearStoppingLatch({
stoppingRun: true,
run: makeRun("failed"),
isLocalStreaming: false,
}),
).toBe(true);
});
// Round-3 regression: clearing while THIS tab is still the local streamer would
// re-open the flash for the current turn the moment we switch to observer role.
// A predicate lacking the streaming gate would (wrongly) return true here.
it("does NOT clear while this tab is the local streamer", () => {
expect(
shouldClearStoppingLatch({
stoppingRun: true,
run: makeRun("aborted"),
isLocalStreaming: true,
}),
).toBe(false);
expect(
shouldClearStoppingLatch({
stoppingRun: true,
run: makeRun("succeeded"),
isLocalStreaming: true,
}),
).toBe(false);
});
// The detached run keeps growing after a local abort — while it is still
// active the latch MUST hold so the observer merge stays suppressed.
it("does NOT clear while the run is still active", () => {
expect(
shouldClearStoppingLatch({
stoppingRun: true,
run: makeRun("running"),
isLocalStreaming: false,
}),
).toBe(false);
expect(
shouldClearStoppingLatch({
stoppingRun: true,
run: makeRun("pending"),
isLocalStreaming: false,
}),
).toBe(false);
});
// #234 F4: on Stop the stale PREVIOUS-turn run is removed from the cache, so the
// observed `run` is null until the current turn's run is fetched fresh. A null
// run HOLDS the latch — it can never clear against the just-removed stale run,
// only against the current turn's own terminal run once observed.
it("does NOT clear against a removed/absent run (F4 stale-run guard)", () => {
expect(
shouldClearStoppingLatch({
stoppingRun: true,
run: null,
isLocalStreaming: false,
}),
).toBe(false);
expect(
shouldClearStoppingLatch({
stoppingRun: true,
run: undefined,
isLocalStreaming: false,
}),
).toBe(false);
});
it("does NOT clear when no stop was requested", () => {
expect(
shouldClearStoppingLatch({
stoppingRun: false,
run: makeRun("aborted"),
isLocalStreaming: false,
}),
).toBe(false);
});
});
describe("shouldClearLatchOnQueryError (#234 F7 error-safety-net decision)", () => {
// This guards the REAL anti-flash decision the component's run-query-error
// safety-net effect uses (ai-chat-window.tsx wires the effect to THIS helper,
// not a copy — so the test is non-vacuous vs the live code).
// (b) The F7 hole: a TRANSIENT run-query error while `run` is STILL ACTIVE must
// NOT clear the latch. TanStack Query v5 retains `data` on error, so
// runQueryFailed can be true while the held run is still pending/running.
// Against the PRE-F7 condition (without `!isRunActive(run)`) this would return
// true — so this assertion fails on the buggy code (non-vacuous).
it("does NOT clear on a transient error while the run is still ACTIVE (F7)", () => {
expect(
shouldClearLatchOnQueryError({
stoppingRun: true,
isLocalStreaming: false,
runQueryFailed: true,
run: makeRun("running"),
}),
).toBe(false);
expect(
shouldClearLatchOnQueryError({
stoppingRun: true,
isLocalStreaming: false,
runQueryFailed: true,
run: makeRun("pending"),
}),
).toBe(false);
});
// (a) The genuine permanent-null-freeze: run cache cleared by removeQueries +
// the refetch keeps ERRORING, so `run === null`. This is the ONLY case the
// safety-net exists to cure — it MUST clear so the frozen view resumes.
it("clears on a permanent error when the run is null (permanent-null-freeze)", () => {
expect(
shouldClearLatchOnQueryError({
stoppingRun: true,
isLocalStreaming: false,
runQueryFailed: true,
run: null,
}),
).toBe(true);
expect(
shouldClearLatchOnQueryError({
stoppingRun: true,
isLocalStreaming: false,
runQueryFailed: true,
run: undefined,
}),
).toBe(true);
});
// A TERMINAL run also satisfies `!isRunActive`; clearing then is harmless — the
// terminal effect (shouldClearStoppingLatch) already clears for a terminal run,
// so this only ever agrees with it. Asserted so the (c) reasoning is pinned.
it("clears on an error when the run is terminal (harmless, agrees with terminal effect)", () => {
expect(
shouldClearLatchOnQueryError({
stoppingRun: true,
isLocalStreaming: false,
runQueryFailed: true,
run: makeRun("aborted"),
}),
).toBe(true);
});
it("does NOT clear without an actual query error", () => {
expect(
shouldClearLatchOnQueryError({
stoppingRun: true,
isLocalStreaming: false,
runQueryFailed: false,
run: null,
}),
).toBe(false);
});
it("does NOT clear while this tab is the local streamer", () => {
expect(
shouldClearLatchOnQueryError({
stoppingRun: true,
isLocalStreaming: true,
runQueryFailed: true,
run: null,
}),
).toBe(false);
});
it("does NOT clear when no stop was requested", () => {
expect(
shouldClearLatchOnQueryError({
stoppingRun: false,
isLocalStreaming: false,
runQueryFailed: true,
run: null,
}),
).toBe(false);
});
});
describe("mergeObservedMessage", () => {
it("replaces the message with the same id in place (per-step growth)", () => {
const prev = [makeMsg("u1", "hi"), makeMsg("a1", "step 1")];
const observed = makeMsg("a1", "step 1\nstep 2");
const next = mergeObservedMessage(prev, observed);
expect(next).toHaveLength(2);
expect(next[1]).toBe(observed);
expect(next[0]).toBe(prev[0]); // untouched
expect(next).not.toBe(prev); // new array (never mutates input)
});
it("appends when the observed message is not yet present", () => {
const prev = [makeMsg("u1", "hi")];
const observed = makeMsg("a1", "first token");
const next = mergeObservedMessage(prev, observed);
expect(next).toHaveLength(2);
expect(next[1]).toBe(observed);
});
it("returns the original list unchanged when there is nothing to merge", () => {
const prev = [makeMsg("u1", "hi")];
expect(mergeObservedMessage(prev, null)).toBe(prev);
expect(mergeObservedMessage(prev, undefined)).toBe(prev);
});
});
@@ -0,0 +1,151 @@
import type { UIMessage } from "@ai-sdk/react";
import type { IAiChatRun } from "@/features/ai-chat/types/ai-chat.types.ts";
/**
* Reconnect-and-live-follow helpers (#184). When a chat is reopened while its
* agent run is STILL going, this tab is a PASSIVE OBSERVER: it did not start the
* run here (no local SSE stream), so it catches up by POLLING the reconnect
* endpoint (`POST /ai-chat/run`) and merging the run's incrementally-persisted
* assistant message into the rendered thread. These are the small pure decisions
* that machinery hangs off, extracted so they can be unit-tested in isolation
* (mirrors how reindex polling / editor-sync-state are tested).
*/
/** How often to re-poll the reconnect endpoint while a run is ACTIVE. */
export const RUN_POLL_INTERVAL_MS = 2000;
// 'pending' and 'running' are the two ACTIVE statuses; 'succeeded' | 'failed' |
// 'aborted' are TERMINAL (and any unknown future status is treated as terminal,
// so a stale/odd value never polls forever).
const ACTIVE_STATUSES = new Set(["pending", "running"]);
/** Whether a run is still going (worth polling / merging live updates from). */
export function isRunActive(run: IAiChatRun | null | undefined): boolean {
return !!run && ACTIVE_STATUSES.has(run.status);
}
/**
* The TanStack Query `refetchInterval` value for the run query: poll every
* {@link RUN_POLL_INTERVAL_MS} while the run is active, and `false` (stop) once
* it is terminal or there is no run. Polling is thus naturally bounded by the run
* reaching a terminal status no separate timeout cap is needed.
*/
export function runPollInterval(
run: IAiChatRun | null | undefined,
): number | false {
return isRunActive(run) ? RUN_POLL_INTERVAL_MS : false;
}
/**
* Observer-vs-streamer decision. We render the polled run message (catch up +
* keep advancing) ONLY when this tab is a passive observer: there IS a run AND
* this tab is NOT the one locally streaming it (we reconnected, we didn't start
* it here). When this tab is the streamer, the live SSE stream owns the view, so
* we neither poll nor merge avoiding a double-render fight. Terminal runs still
* merge (so the final persisted output is shown on reopen); the poll itself is
* stopped separately by {@link runPollInterval}.
*/
export function shouldObserveRun(
run: IAiChatRun | null | undefined,
localStreaming: boolean,
): boolean {
return !!run && !localStreaming;
}
/**
* Should the "stopping" latch which suppresses the observer re-stream flash
* after the user pressed Stop be RELEASED now? All three must hold:
* - `stoppingRun`: we actually requested a stop (otherwise nothing to release);
* - `!isLocalStreaming`: this tab is NOT the local streamer. While we are the
* streamer the run query is disabled, so the observed `run` is not the run we
* are following releasing the latch then would re-open the flash for the
* current turn the instant we switch to observer role;
* - the observed `run` EXISTS and has reached a TERMINAL status.
*
* The null / still-active `run` case is the #234 F4 invariant. On Stop the stale
* PREVIOUS-turn run is removed from the query cache (`removeQueries`), so `run`
* is null until the CURRENT turn's run is re-fetched fresh; a null or active run
* therefore HOLDS the latch, so it can only ever clear against the current turn's
* OWN terminal run never a stale cached one. (The cache removal itself is
* integration-level in AiChatWindow; this predicate encodes the decision given
* whatever run is currently observed, and a stale terminal run is
* indistinguishable from a current terminal run at the predicate level hence
* the cache removal is what guarantees only the current run is ever passed here.)
*/
export function shouldClearStoppingLatch(args: {
stoppingRun: boolean;
run: IAiChatRun | null | undefined;
isLocalStreaming: boolean;
}): boolean {
const { stoppingRun, run, isLocalStreaming } = args;
if (!stoppingRun || isLocalStreaming) return false;
return !!run && !isRunActive(run);
}
/**
* Should the "stopping" latch be RELEASED by the run-query ERROR safety-net?
* (#234 F7 a NEW path of the same re-stream flash the F4 latch exists to
* prevent.) After Stop, `handleServerStop` clears the run cache; the terminal
* effect then holds the latch via `if (!run) return` until the CURRENT turn's run
* is fetched fresh. If that refetch instead ERRORS permanently, `run` stays null,
* its status-keyed refetchInterval is off, and nothing would ever observe a
* terminal run freezing the view with the observer merge suppressed. This
* safety-net cures ONLY that genuine permanent-null-freeze.
*
* All four must hold:
* - `stoppingRun`: we actually requested a stop (otherwise nothing to release);
* - `!isLocalStreaming`: this tab is NOT the local streamer (same reason as
* {@link shouldClearStoppingLatch});
* - `runQueryFailed`: the run query is in its error state (TanStack Query v5 with
* retry:false isError);
* - `!isRunActive(run)`: the observed `run` is NOT an active (pending/running)
* held run. This is the F7 gate. In TanStack Query v5 the query's `data` is
* RETAINED on error, so `runQueryFailed` can be true while `run` is STILL an
* ACTIVE run (a single transient GET-run failure in the window between Stop and
* settle). Without this gate a transient error would release the latch early
* re-opening the observer merge and flashing the growing detached run over the
* frozen row (exactly the F4 flash). Gating on the run NOT being active means we
* only ever cure the permanent-null-freeze (`run === null`, so
* `isRunActive(null)` is false), never release against an active run.
*
* (A terminal `run` also satisfies `!isRunActive(run)`; clearing then is harmless
* the terminal effect's {@link shouldClearStoppingLatch} already clears the
* latch for a terminal run, so this only ever agrees with it, never conflicts.)
*
* INVARIANT (do not break): clearing the latch on the `run === null` branch is safe
* ONLY because the run query's `refetchInterval` (see {@link runPollInterval}) stops
* polling when the data is empty so after we clear on null+error there is no
* subsequent auto-poll that could return a still-active detached run and re-open the
* merge. If `refetchInterval` is ever changed to keep polling on `run === null`/on
* error, this null-branch clear would re-open the F7 flash through the null path.
* Do not change the run query's refetchInterval without re-checking this path.
*/
export function shouldClearLatchOnQueryError(args: {
stoppingRun: boolean;
isLocalStreaming: boolean;
runQueryFailed: boolean;
run: IAiChatRun | null | undefined;
}): boolean {
const { stoppingRun, isLocalStreaming, runQueryFailed, run } = args;
return (
stoppingRun && !isLocalStreaming && runQueryFailed && !isRunActive(run)
);
}
/**
* Merge an observed assistant message into the rendered list: replace the message
* with the same id in place (the in-progress assistant row is already seeded from
* history, so per-step growth replaces it), or append it when absent. Returns a
* new array; the input is never mutated.
*/
export function mergeObservedMessage(
messages: UIMessage[],
observed: UIMessage | null | undefined,
): UIMessage[] {
if (!observed) return messages;
const idx = messages.findIndex((m) => m.id === observed.id);
if (idx === -1) return [...messages, observed];
const next = messages.slice();
next[idx] = observed;
return next;
}
@@ -394,6 +394,10 @@ export default function AiProviderSettings() {
useState<boolean>(
workspace?.settings?.ai?.publicShareAssistant ?? false,
);
// #184: detached/autonomous agent runs (settings.ai.autonomousRuns).
const [autonomousRunsEnabled, setAutonomousRunsEnabled] = useState<boolean>(
workspace?.settings?.ai?.autonomousRuns ?? false,
);
const [chatToggleLoading, setChatToggleLoading] = useState(false);
const [searchToggleLoading, setSearchToggleLoading] = useState(false);
const [dictationToggleLoading, setDictationToggleLoading] = useState(false);
@@ -403,6 +407,8 @@ export default function AiProviderSettings() {
publicShareAssistantToggleLoading,
setPublicShareAssistantToggleLoading,
] = useState(false);
const [autonomousRunsToggleLoading, setAutonomousRunsToggleLoading] =
useState(false);
// Whether a key is currently stored server-side (drives the placeholder).
const [hasApiKey, setHasApiKey] = useState(false);
@@ -730,6 +736,37 @@ export default function AiProviderSettings() {
}
}
// Optimistic toggle for detached/autonomous agent runs
// (settings.ai.autonomousRuns). When on, a chat turn becomes a server-side run
// that survives a browser disconnect and can be reconnected to / live-followed;
// only an explicit Stop ends it. Off by default; single-instance-only in phase 1.
async function handleToggleAutonomousRuns(value: boolean) {
setAutonomousRunsToggleLoading(true);
const previous = autonomousRunsEnabled;
setAutonomousRunsEnabled(value);
try {
const updated = await updateWorkspace({ autonomousRuns: value });
setWorkspace({
...updated,
settings: {
...updated.settings,
ai: { ...updated.settings?.ai, autonomousRuns: value },
},
});
notifications.show({ message: t("Updated successfully") });
} catch (err) {
setAutonomousRunsEnabled(previous);
const message = (err as { response?: { data?: { message?: string } } })
?.response?.data?.message;
notifications.show({
message: message ?? t("Failed to update data"),
color: "red",
});
} finally {
setAutonomousRunsToggleLoading(false);
}
}
// Admins only — match the previous behavior.
if (!isAdmin) {
return (
@@ -960,6 +997,31 @@ export default function AiProviderSettings() {
{...form.getInputProps("publicShareAssistantRoleId")}
/>
{/* Detached/autonomous agent runs: a chat turn becomes a server-side run
that survives a browser disconnect; only an explicit Stop ends it.
Single-instance-only in phase 1. */}
<Group justify="space-between" align="center" wrap="nowrap" mt="md">
<Stack gap={0}>
<Text fw={600} size="sm">
{t("Autonomous agent runs")}
</Text>
<Text size="xs" c="dimmed">
{t(
"Keep an agent turn running server-side even if the browser disconnects; reconnect and follow it on reopen. Single-instance deployments only.",
)}
</Text>
</Stack>
<Switch
label={t("Enabled")}
labelPosition="left"
checked={autonomousRunsEnabled}
disabled={autonomousRunsToggleLoading}
onChange={(e) =>
handleToggleAutonomousRuns(e.currentTarget.checked)
}
/>
</Group>
<Group mt="md" align="center">
<Button
variant="default"
@@ -26,6 +26,9 @@ export interface IWorkspace {
aiDictation?: boolean;
aiDictationStreaming?: boolean;
aiPublicShareAssistant?: boolean;
// Write-only field for updateWorkspace({ autonomousRuns }). Read state lives at
// settings.ai.autonomousRuns.
autonomousRuns?: boolean;
trashRetentionDays?: number;
// Default lifetime (HOURS) for new temporary notes; frozen per-note at creation.
temporaryNoteHours?: number;
@@ -65,6 +68,9 @@ export interface IWorkspaceAiSettings {
dictation?: boolean;
dictationStreaming?: boolean;
publicShareAssistant?: boolean;
// #184: detached agent runs (a run survives a browser disconnect and can be
// reconnected to / live-followed on reopen). Gates the run-reconnect polling.
autonomousRuns?: boolean;
}
export interface IWorkspaceSharingSettings {
@@ -0,0 +1,527 @@
import { Logger } from '@nestjs/common';
import {
AiChatRunService,
RunAlreadyActiveError,
ONE_ACTIVE_RUN_PER_CHAT_INDEX,
mapTurnStatusToRun,
} from './ai-chat-run.service';
/** Shape a Postgres unique-violation the way the postgres.js driver surfaces it:
* SQLSTATE 23505 + the offending index in `constraint_name`. */
function uniqueViolation(constraintName: string): Error & {
code: string;
constraint_name: string;
} {
return Object.assign(
new Error('duplicate key value violates unique constraint'),
{
code: '23505',
constraint_name: constraintName,
},
);
}
/**
* Unit coverage for the #184 phase-1 run lifecycle (AiChatRunService) with a
* hand-rolled mock repo no Nest graph, no DB. The invariant under test is the
* one that makes a run "autonomous": a run keeps going when its SUBSCRIBER (the
* browser) detaches, and ONLY an explicit stop aborts it. We assert that at the
* abort-signal level (the signal the agent loop actually consumes).
*/
/** Minimal EnvironmentService stub. Single-instance (CLOUD unset) by default. */
function makeEnv(isCloud = false) {
return { isCloud: () => isCloud };
}
function makeRepo(overrides: Record<string, jest.Mock> = {}) {
return {
insert: jest.fn(async (v: any) => ({
id: 'run-1',
status: v.status ?? 'running',
chatId: v.chatId,
workspaceId: v.workspaceId,
})),
update: jest.fn(async () => ({ id: 'run-1' })),
markStopRequested: jest.fn(async () => ({ id: 'run-1' })),
findActiveByChat: jest.fn(async () => undefined),
findLatestByChat: jest.fn(async () => undefined),
findById: jest.fn(async () => undefined),
sweepRunning: jest.fn(async () => 0),
...overrides,
};
}
describe('mapTurnStatusToRun', () => {
it('maps the turn terminal status to the run terminal status', () => {
expect(mapTurnStatusToRun('completed')).toBe('succeeded');
expect(mapTurnStatusToRun('error')).toBe('failed');
expect(mapTurnStatusToRun('aborted')).toBe('aborted');
});
});
describe('AiChatRunService.onModuleInit (startup sweep)', () => {
afterEach(() => jest.restoreAllMocks());
it('calls sweepRunning and resolves; logs when > 0', async () => {
const repo = makeRepo({ sweepRunning: jest.fn(async () => 2) });
const logSpy = jest
.spyOn(Logger.prototype, 'log')
.mockImplementation(() => undefined);
const svc = new AiChatRunService(repo as never, makeEnv() as never);
await expect(svc.onModuleInit()).resolves.toBeUndefined();
expect(repo.sweepRunning).toHaveBeenCalledTimes(1);
expect(logSpy).toHaveBeenCalledTimes(1);
expect(String(logSpy.mock.calls[0][0])).toContain('2');
});
it('a sweep failure is swallowed (never blocks startup)', async () => {
const repo = makeRepo({
sweepRunning: jest.fn(async () => {
throw new Error('db down');
}),
});
const warnSpy = jest
.spyOn(Logger.prototype, 'warn')
.mockImplementation(() => undefined);
const svc = new AiChatRunService(repo as never, makeEnv() as never);
await expect(svc.onModuleInit()).resolves.toBeUndefined();
// The first warn is the sweep failure (the multi-instance warn never fires
// single-instance), so the message is the db error.
expect(String(warnSpy.mock.calls[0][0])).toContain('db down');
});
it('F1 (DECISION C): the boot sweep is UNCONDITIONAL — sweepRunning is called with NO staleness window, so a fresh running run (updatedAt = now) is settled, not skipped', async () => {
// The bug: a fast restart (deploy/OOM within minutes of the last step) left a
// run stuck 'running' under the old 10-min window, 409ing every later turn in
// the chat. The fix settles ALL pending|running on boot. We assert the service
// invokes sweepRunning with no `staleMs` (the unconditional path); the repo's
// own spec proves no-window => no updatedAt filter.
const repo = makeRepo({ sweepRunning: jest.fn(async () => 1) });
jest.spyOn(Logger.prototype, 'log').mockImplementation(() => undefined);
const svc = new AiChatRunService(repo as never, makeEnv() as never);
await svc.onModuleInit();
expect(repo.sweepRunning).toHaveBeenCalledTimes(1);
const callArgs = repo.sweepRunning.mock.calls[0] as unknown[];
const firstArg = callArgs[0] as { staleMs?: number } | undefined;
// Either no opts at all, or opts without a staleMs window => unconditional.
expect(firstArg?.staleMs).toBeUndefined();
});
it('F2 (DECISION A): warns at startup that autonomousRuns is single-instance-only when a horizontally-scaled deployment (CLOUD) is detected', async () => {
const repo = makeRepo();
const warnSpy = jest
.spyOn(Logger.prototype, 'warn')
.mockImplementation(() => undefined);
const svc = new AiChatRunService(repo as never, makeEnv(true) as never);
await svc.onModuleInit();
const warned = warnSpy.mock.calls.some((c) =>
/single-instance-only/i.test(String(c[0])),
);
expect(warned).toBe(true);
});
it('F2: does NOT warn about multi-instance on a single-instance (CLOUD unset) deployment', async () => {
const repo = makeRepo();
const warnSpy = jest
.spyOn(Logger.prototype, 'warn')
.mockImplementation(() => undefined);
const svc = new AiChatRunService(repo as never, makeEnv(false) as never);
await svc.onModuleInit();
const warned = warnSpy.mock.calls.some((c) =>
/single-instance-only/i.test(String(c[0])),
);
expect(warned).toBe(false);
});
});
describe('AiChatRunService run lifecycle', () => {
it('beginRun inserts a running row and registers a live abort controller', async () => {
const repo = makeRepo();
const svc = new AiChatRunService(repo as never, makeEnv() as never);
const handle = await svc.beginRun({
chatId: 'chat-1',
workspaceId: 'ws-1',
userId: 'user-1',
});
expect(repo.insert).toHaveBeenCalledWith(
expect.objectContaining({
chatId: 'chat-1',
workspaceId: 'ws-1',
createdBy: 'user-1',
status: 'running',
trigger: 'user',
}),
);
expect(handle.runId).toBe('run-1');
expect(handle.signal.aborted).toBe(false);
expect(svc.isLocallyActive('run-1')).toBe(true);
});
it('beginRun REJECTS the racer: a 23505 on the one-active-per-chat index throws RunAlreadyActiveError (not swallowed) and registers no controller', async () => {
// The race: the controller's cheap pre-check passed for BOTH concurrent
// turns, so the loser's INSERT hits the partial unique index. That rejection
// is the authoritative gate — it must surface, not be swallowed into an
// untracked turn.
const repo = makeRepo({
insert: jest.fn(async () => {
throw uniqueViolation(ONE_ACTIVE_RUN_PER_CHAT_INDEX);
}),
});
const svc = new AiChatRunService(repo as never, makeEnv() as never);
await expect(
svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
).rejects.toBeInstanceOf(RunAlreadyActiveError);
// No controller leaked for a rejected start.
expect(svc.isLocallyActive('run-1')).toBe(false);
});
it('beginRun does NOT mask an unrelated unique violation as already-active', async () => {
// A 23505 on some OTHER constraint is a real bug, not the race — it must
// propagate unchanged so it is never silently treated as "already active".
const other = uniqueViolation('ai_chat_runs_pkey');
const repo = makeRepo({
insert: jest.fn(async () => {
throw other;
}),
});
const svc = new AiChatRunService(repo as never, makeEnv() as never);
await expect(
svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
).rejects.toBe(other);
});
it('beginRun propagates a non-unique insert failure unchanged', async () => {
const boom = new Error('connection reset');
const repo = makeRepo({
insert: jest.fn(async () => {
throw boom;
}),
});
const svc = new AiChatRunService(repo as never, makeEnv() as never);
await expect(
svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
).rejects.toBe(boom);
});
it('two concurrent begins on one chat: exactly one wins, the other is rejected as already-active', async () => {
// Integration-style: model the DB partial unique index with a one-shot slot.
// The first insert claims it; the second hits a 23505 on the active index.
let slotTaken = false;
const repo = makeRepo({
insert: jest.fn(async (v: any) => {
if (slotTaken) throw uniqueViolation(ONE_ACTIVE_RUN_PER_CHAT_INDEX);
slotTaken = true;
return { id: 'run-win', status: v.status, chatId: v.chatId };
}),
});
const svc = new AiChatRunService(repo as never, makeEnv() as never);
const results = await Promise.allSettled([
svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
svc.beginRun({ chatId: 'chat-1', workspaceId: 'ws-1', userId: 'user-1' }),
]);
const fulfilled = results.filter((r) => r.status === 'fulfilled');
const rejected = results.filter((r) => r.status === 'rejected');
expect(fulfilled).toHaveLength(1);
expect(rejected).toHaveLength(1);
expect((rejected[0] as PromiseRejectedResult).reason).toBeInstanceOf(
RunAlreadyActiveError,
);
// Exactly the winner is locally active.
expect(svc.isLocallyActive('run-win')).toBe(true);
});
it('a SUBSCRIBER detaching does NOT abort the run (only an explicit stop does)', async () => {
const repo = makeRepo();
const svc = new AiChatRunService(repo as never, makeEnv() as never);
const handle = await svc.beginRun({
chatId: 'chat-1',
workspaceId: 'ws-1',
userId: 'user-1',
});
// Model a browser disconnect: nothing in the run service is told to stop.
// The signal the agent loop consumes must stay un-aborted and the run stays
// locally active — i.e. it keeps running server-side.
expect(handle.signal.aborted).toBe(false);
expect(svc.isLocallyActive('run-1')).toBe(true);
// markStopRequested was never called by a mere detach.
expect(repo.markStopRequested).not.toHaveBeenCalled();
});
it('requestStop aborts the live controller, marks the row, and reports true', async () => {
const repo = makeRepo();
const svc = new AiChatRunService(repo as never, makeEnv() as never);
const handle = await svc.beginRun({
chatId: 'chat-1',
workspaceId: 'ws-1',
userId: 'user-1',
});
const aborted = jest.fn();
handle.signal.addEventListener('abort', aborted);
const result = await svc.requestStop('run-1', 'ws-1');
expect(result).toBe(true);
expect(handle.signal.aborted).toBe(true);
expect(aborted).toHaveBeenCalledTimes(1);
expect(repo.markStopRequested).toHaveBeenCalledWith('run-1', 'ws-1');
});
it('requestStop on a run this replica does NOT hold still marks the row (true)', async () => {
// e.g. after a restart, or a sibling replica owns the controller. The row is
// marked so the owning replica/sweep settles it; we report a stop took effect.
const repo = makeRepo({
markStopRequested: jest.fn(async () => ({ id: 'run-9' })),
});
const svc = new AiChatRunService(repo as never, makeEnv() as never);
const result = await svc.requestStop('run-9', 'ws-1');
expect(result).toBe(true);
expect(svc.isLocallyActive('run-9')).toBe(false);
});
it('requestStop still aborts the live controller when markStopRequested rejects (transient DB error)', async () => {
// F15: the in-memory abort is the ONLY thing that stops a run and must not be
// hostage to the audit write of stop_requested_at. A transient failure on
// markStopRequested must NOT prevent abort() nor make requestStop throw.
const warnSpy = jest
.spyOn(Logger.prototype, 'warn')
.mockImplementation(() => undefined);
const repo = makeRepo({
markStopRequested: jest.fn(async () => {
throw new Error('pool exhausted');
}),
});
const svc = new AiChatRunService(repo as never, makeEnv() as never);
const handle = await svc.beginRun({
chatId: 'chat-1',
workspaceId: 'ws-1',
userId: 'user-1',
});
const aborted = jest.fn();
handle.signal.addEventListener('abort', aborted);
// Does NOT throw despite the DB write rejecting.
const result = await svc.requestStop('run-1', 'ws-1');
// The live turn was aborted even though the audit write failed...
expect(handle.signal.aborted).toBe(true);
expect(aborted).toHaveBeenCalledTimes(1);
expect(repo.markStopRequested).toHaveBeenCalledWith('run-1', 'ws-1');
// ...the catch branch logged the swallowed failure...
expect(warnSpy).toHaveBeenCalledTimes(1);
// ...and a stop is reported as having taken effect (the entry existed).
expect(result).toBe(true);
warnSpy.mockRestore();
});
it('requestStop on an already-settled run (nothing active) reports false', async () => {
const repo = makeRepo({
markStopRequested: jest.fn(async () => undefined),
});
const svc = new AiChatRunService(repo as never, makeEnv() as never);
const result = await svc.requestStop('run-done', 'ws-1');
expect(result).toBe(false);
});
it('finalizeRun settles the row to the mapped status with finishedAt and drops the in-memory entry', async () => {
const repo = makeRepo();
const svc = new AiChatRunService(repo as never, makeEnv() as never);
await svc.beginRun({
chatId: 'chat-1',
workspaceId: 'ws-1',
userId: 'user-1',
});
expect(svc.isLocallyActive('run-1')).toBe(true);
await svc.finalizeRun('run-1', 'ws-1', 'error', 'provider blew up');
expect(svc.isLocallyActive('run-1')).toBe(false);
expect(repo.update).toHaveBeenCalledWith(
'run-1',
'ws-1',
expect.objectContaining({
status: 'failed',
error: 'provider blew up',
finishedAt: expect.any(Date),
}),
);
});
it('finalizeRun is IDEMPOTENT: a second settle no-ops (single terminal write)', async () => {
// The #184 review fix: AiChatService.stream wraps the turn in a safety-net
// catch that settles a failed turn AND streamText's terminal callback may
// also settle — both routes call finalizeRun. Only the FIRST may write the
// terminal row; the second must no-op so a late settle can never clobber the
// real terminal status or double-write the row.
const repo = makeRepo();
const svc = new AiChatRunService(repo as never, makeEnv() as never);
await svc.beginRun({
chatId: 'chat-1',
workspaceId: 'ws-1',
userId: 'user-1',
});
await svc.finalizeRun('run-1', 'ws-1', 'error', 'first');
expect(svc.isLocallyActive('run-1')).toBe(false);
// A second settle (e.g. a streamText callback firing after the catch) no-ops.
await svc.finalizeRun('run-1', 'ws-1', 'completed', undefined);
expect(repo.update).toHaveBeenCalledTimes(1);
expect(repo.update).toHaveBeenCalledWith(
'run-1',
'ws-1',
expect.objectContaining({ status: 'failed', error: 'first' }),
);
});
it('CONCURRENCY: two simultaneous finalizeRun on the same run write the terminal row EXACTLY ONCE (the 2nd caller exits synchronously at the atomic claim)', async () => {
// The CRITICAL race: AiChatService.stream's safety-net catch settles the turn
// to 'error' while a streamText terminal callback also settles it — both call
// finalizeRun for the SAME runId. The once-gate must close ATOMICALLY: a
// `settled.has` check alone is read BEFORE the awaited UPDATE, so both callers
// would pass it and BOTH write the row (last-write-wins clobber + double
// write). The fix claims the run with a SYNCHRONOUS `active.delete` before any
// await, so the second caller returns in the same tick, before the UPDATE.
//
// We force the two calls to overlap by making `update` return a promise we
// resolve only AFTER both finalizeRun calls have run their synchronous bodies.
let resolveUpdate!: (v: unknown) => void;
const updateGate = new Promise((res) => {
resolveUpdate = res;
});
const update = jest.fn(() => updateGate);
const repo = makeRepo({ update });
const svc = new AiChatRunService(repo as never, makeEnv() as never);
await svc.beginRun({
chatId: 'chat-1',
workspaceId: 'ws-1',
userId: 'user-1',
});
// Fire both before the (pending) update resolves. The first synchronously
// claims the entry (active.delete) and awaits update; the second, started in
// the same macrotask, finds the entry already gone and returns at the claim
// WITHOUT ever calling update.
const p1 = svc.finalizeRun('run-1', 'ws-1', 'completed');
const p2 = svc.finalizeRun('run-1', 'ws-1', 'error', 'safety-net');
// The decisive assertion: exactly one caller reached the terminal UPDATE.
expect(update).toHaveBeenCalledTimes(1);
// Let the single in-flight update land; both calls resolve cleanly.
resolveUpdate({ id: 'run-1' });
await Promise.all([p1, p2]);
expect(update).toHaveBeenCalledTimes(1);
// The winner is the FIRST caller ('completed' -> 'succeeded'); the late
// 'error' settle never wrote, so it could not clobber the real status.
expect(update).toHaveBeenCalledWith(
'run-1',
'ws-1',
expect.objectContaining({ status: 'succeeded' }),
);
expect(svc.isLocallyActive('run-1')).toBe(false);
});
it('F6: a TRANSIENT terminal-write failure is ridden out by the bounded retry — the run is settled, not stranded', async () => {
// The bug: finalizeRun used to DROP the in-memory entry BEFORE the terminal
// UPDATE, then only warn-log a failure. A single transient blip (pool
// exhaustion / deadlock / connection hiccup) on that PK UPDATE left the row
// 'running' with nothing left to recover it -> every later turn in that chat
// 409s until a restart. The fix updates FIRST and retries.
let calls = 0;
const repo = makeRepo({
update: jest.fn(async () => {
calls += 1;
if (calls === 1) throw new Error('deadlock detected');
return { id: 'run-1' };
}),
});
jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => undefined);
const svc = new AiChatRunService(repo as never, makeEnv() as never);
await svc.beginRun({
chatId: 'chat-1',
workspaceId: 'ws-1',
userId: 'user-1',
});
await svc.finalizeRun('run-1', 'ws-1', 'completed');
// The retry landed the terminal write: the entry is dropped (slot freed) and
// the row carries the real terminal status — NOT stranded at 'running'.
expect(svc.isLocallyActive('run-1')).toBe(false);
expect(repo.update).toHaveBeenCalledTimes(2);
expect(repo.update).toHaveBeenLastCalledWith(
'run-1',
'ws-1',
expect.objectContaining({ status: 'succeeded' }),
);
});
it('F6: if the terminal write keeps failing, the entry is RETAINED and a LATER settle completes it (chat not permanently 409d)', async () => {
// Worst case: the DB is down for the whole first finalize (all attempts fail).
// The run must NOT be silently lost — the entry stays so a subsequent settle
// (a streamText callback, requestStop -> onAbort, or a future sweep) can retry.
let healthy = false;
const repo = makeRepo({
update: jest.fn(async () => {
if (!healthy) throw new Error('pool exhausted');
return { id: 'run-1' };
}),
});
jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => undefined);
const errorSpy = jest
.spyOn(Logger.prototype, 'error')
.mockImplementation(() => undefined);
const svc = new AiChatRunService(repo as never, makeEnv() as never);
await svc.beginRun({
chatId: 'chat-1',
workspaceId: 'ws-1',
userId: 'user-1',
});
// First settle: every bounded attempt fails -> entry retained, NOT settled.
await svc.finalizeRun('run-1', 'ws-1', 'completed');
expect(svc.isLocallyActive('run-1')).toBe(true);
// F12: the give-up emits ONE explicit, greppable ERROR (run + chat context)
// so an operator can tell "gave up, run held in memory" from a per-attempt
// blip — distinct from the per-attempt warns.
const gaveUp = errorSpy.mock.calls.some(
(c) =>
/NON-TERMINAL/.test(String(c[0])) &&
/run-1/.test(String(c[0])) &&
/chat-1/.test(String(c[0])),
);
expect(gaveUp).toBe(true);
// The DB recovers; a later settle now succeeds and frees the slot.
healthy = true;
await svc.finalizeRun('run-1', 'ws-1', 'completed');
expect(svc.isLocallyActive('run-1')).toBe(false);
expect(repo.update).toHaveBeenLastCalledWith(
'run-1',
'ws-1',
expect.objectContaining({ status: 'succeeded' }),
);
// And it is now idempotent: a further settle no-ops (terminal row already
// written), so a double-settle can never clobber the real status.
const callsBefore = repo.update.mock.calls.length;
await svc.finalizeRun('run-1', 'ws-1', 'error', 'late');
expect(repo.update).toHaveBeenCalledTimes(callsBefore);
});
it('recordStep / linkAssistantMessage are best-effort: a repo failure is swallowed', async () => {
const repo = makeRepo({
update: jest.fn(async () => {
throw new Error('transient');
}),
});
jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => undefined);
const svc = new AiChatRunService(repo as never, makeEnv() as never);
await expect(svc.recordStep('run-1', 'ws-1', 3)).resolves.toBeUndefined();
await expect(
svc.linkAssistantMessage('run-1', 'ws-1', 'msg-1'),
).resolves.toBeUndefined();
});
});
@@ -0,0 +1,452 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { AiChatRunRepo } from '@docmost/db/repos/ai-chat/ai-chat-run.repo';
import { AiChatRun } from '@docmost/db/types/entity.types';
import { isUniqueViolation, violatedConstraint } from '@docmost/db/utils';
import { EnvironmentService } from '../../integrations/environment/environment.service';
/** Name of the partial unique index enforcing "one active run per chat" (see the
* ai_chat_runs migration). A 23505 on THIS constraint is the race-safe signal
* that a concurrent turn already owns the chat distinct from any other unique
* collision, which must NOT be silently treated as "already active". */
export const ONE_ACTIVE_RUN_PER_CHAT_INDEX = 'ai_chat_runs_one_active_per_chat';
/**
* Thrown by {@link AiChatRunService.beginRun} when the run-row INSERT loses the
* race for a chat's single active slot (the partial unique index rejects it with
* a 23505). This is the AUTHORITATIVE concurrency gate: the controller's cheap
* pre-check is only a fast-path, and a request that slips past it must NOT run
* untracked. The caller (AiChatService.stream) translates this into a 409 and
* aborts the turn BEFORE any AI/provider call.
*/
export class RunAlreadyActiveError extends Error {
constructor(public readonly chatId: string) {
super(`An agent run is already in progress for chat ${chatId}`);
this.name = 'RunAlreadyActiveError';
}
}
/**
* The terminal status of a TURN (the #183 assistant-row lifecycle) maps onto the
* terminal status of a RUN (#184). A turn that completed -> the run succeeded; a
* turn that errored -> the run failed; a turn aborted (explicit user stop) -> the
* run aborted. Pure + unit-testable.
*/
export type TurnTerminalStatus = 'completed' | 'error' | 'aborted';
export type RunTerminalStatus = 'succeeded' | 'failed' | 'aborted';
export function mapTurnStatusToRun(
status: TurnTerminalStatus,
): RunTerminalStatus {
switch (status) {
case 'completed':
return 'succeeded';
case 'error':
return 'failed';
case 'aborted':
return 'aborted';
}
}
/** An in-flight run held in process memory: its AbortController is the ONLY thing
* that can stop the turn (an explicit user stop), independent of the browser
* socket. A mere disconnect never touches it, so the run keeps going. */
interface ActiveRun {
controller: AbortController;
chatId: string;
workspaceId: string;
}
/** The live handle the streaming path drives a run through (returned by
* {@link AiChatRunService.beginRun}). The `signal` governs the agent loop's
* abort wired to the run, NOT to the HTTP socket. */
export interface RunHandle {
runId: string;
signal: AbortSignal;
}
/**
* AiChatRunService (#184 phase 1) owns the agent RUN as a first-class,
* server-side lifecycle object detached from the HTTP request / browser window.
*
* Responsibilities:
* - create a run row when a turn starts (inserted directly as 'running'; the
* 'pending' status is only the column default + a reserved value, never
* written by code in phase 1) and register an in-memory AbortController for it
* (the explicit-stop lever);
* - finalize the run row (succeeded / failed / aborted) and unregister it;
* - service an EXPLICIT user stop (`requestStop`) the ONLY thing that aborts a
* run; a browser disconnect deliberately does NOT;
* - crash-recovery sweep of dangling runs on startup.
*
* The agent loop itself still runs in AiChatService.stream (reusing #183's
* step-granular durable write path, `consumeStream` already drains it independent
* of the socket); this service only wraps it in a durable lifecycle and an
* abort handle that outlives the subscriber.
*/
@Injectable()
export class AiChatRunService implements OnModuleInit {
private readonly logger = new Logger(AiChatRunService.name);
// runId -> ActiveRun. Process-local on purpose (phase 1 is single-process /
// in-memory transport; a cross-process BullMQ runner + Redis stop-signal is
// deferred to phase 2). A stop for a runId not in this map (e.g. after a
// restart) still records `stop_requested_at` on the row.
private readonly active = new Map<string, ActiveRun>();
// runIds whose TERMINAL row write has SUCCEEDED — the idempotency once-gate
// (F6). A finalize must short-circuit only AFTER the terminal write has landed,
// NOT merely after the in-memory entry was dropped: a transient UPDATE failure
// has to stay retryable, so "already settled" means "row already terminal", not
// "entry already gone". Grows by one short UUID per finished run over process
// uptime — negligible in phase 1's single process.
private readonly settled = new Set<string>();
// Bounded retry for the terminal write (F6): a single PK UPDATE can fail
// transiently under many fire-and-forget writes (pool exhaustion, deadlock, a
// brief connection blip). Riding out that blip in-place matters because the
// dominant success path (streamText onFinish) settles exactly ONCE — if that
// write is dropped and never retried, the row is stranded 'running' and the
// one-active-run gate 409s every future turn in the chat until a restart (no
// periodic sweep in phase 1).
private static readonly FINALIZE_MAX_ATTEMPTS = 3;
private static readonly FINALIZE_RETRY_BASE_MS = 50;
constructor(
private readonly runRepo: AiChatRunRepo,
private readonly environment: EnvironmentService,
) {}
/**
* Crash-recovery sweep on server start: settle EVERY run still left
* pending/running to 'aborted' (F1 / DECISION C). The boot sweep is
* UNCONDITIONAL no staleness window because phase 1 is single-process: on a
* fresh boot any pending|running run is definitionally hung (no live runner owns
* it), so even a fast restart (deploy/OOM within minutes of the last step) can
* no longer leave a run stuck 'running' forever (which would make the
* one-active-run gate 409 every future turn in that chat). The staleness window
* is reintroduced only for the phase-2 multi-instance timer sweep, where a
* booting replica must not abort a run another replica is actively executing.
* Best-effort a sweep failure is logged but MUST NOT block startup (mirrors
* AiChatService.onModuleInit for #183).
*/
async onModuleInit(): Promise<void> {
this.warnIfMultiInstance();
try {
// No `staleMs`: unconditional boot sweep (F1). See AiChatRunRepo.sweepRunning.
const swept = await this.runRepo.sweepRunning();
if (swept > 0) {
this.logger.log(
`Startup sweep: marked ${swept} dangling agent run(s) as 'aborted'.`,
);
}
} catch (err) {
this.logger.warn(
`Startup sweep of dangling runs failed: ${
err instanceof Error ? err.message : 'unknown error'
}`,
);
}
}
/**
* F2 (DECISION A): autonomous runs are SINGLE-INSTANCE-ONLY in phase 1. An
* explicit Stop, and the in-memory AbortController that backs it, are
* process-local: a Stop only aborts the live turn if it lands on the SAME
* replica that owns the run (it still stamps `stop_requested_at` cross-instance,
* but nothing reads that flag during an active run yet). Cross-instance pub/sub
* stop is phase 2. So if the deployment is horizontally scaled, warn loudly at
* startup that a Stop may not reach a run executing on another replica.
*
* DETECTION: this codebase always wires the socket.io Redis adapter (REDIS_URL
* is mandatory), so the adapter alone is NOT a horizontal-scaling signal. The
* authoritative signal the codebase has is `CLOUD=true` (EnvironmentService
* .isCloud()), the Docmost-cloud multi-replica deployment. We warn whenever that
* is set, because any workspace could enable settings.ai.autonomousRuns. A
* self-hosted operator running multiple replicas behind a load balancer is also
* multi-instance; the deploy docs (.env.example / AGENTS.md) spell out the
* single-instance constraint for that case.
*/
private warnIfMultiInstance(): void {
if (this.environment.isCloud()) {
this.logger.warn(
'Autonomous agent runs (settings.ai.autonomousRuns) are SINGLE-INSTANCE-ONLY ' +
'in phase 1: a horizontally-scaled deployment was detected (CLOUD=true). ' +
'An explicit Stop only aborts a run executing on the same replica that owns ' +
'it (cross-instance Stop is not yet reliable — phase 2). Run a single ' +
'instance if you enable autonomousRuns, or keep the flag off.',
);
}
}
/**
* Start a run for a turn: insert the run row (status 'running', startedAt now),
* register a fresh AbortController for it, and return a {@link RunHandle} whose
* `signal` the agent loop uses. The DB partial unique index guarantees at most
* one active run per chat a second concurrent start on the same chat REJECTS
* at the insert (a 23505 on {@link ONE_ACTIVE_RUN_PER_CHAT_INDEX}). That
* rejection is the AUTHORITATIVE race gate: it is surfaced as a distinct
* {@link RunAlreadyActiveError} (NOT swallowed), so the caller turns it into a
* 409 and never streams an untracked turn. The controller is registered AFTER a
* successful insert so a rejected start leaks nothing.
*/
async beginRun(args: {
chatId: string;
workspaceId: string;
userId: string;
trigger?: string;
}): Promise<RunHandle> {
let run: AiChatRun;
try {
run = await this.runRepo.insert({
chatId: args.chatId,
workspaceId: args.workspaceId,
createdBy: args.userId,
trigger: args.trigger ?? 'user',
status: 'running',
startedAt: new Date(),
});
} catch (err) {
// The race backstop: a concurrent turn already holds this chat's single
// active slot, so the partial unique index rejected our insert. Surface a
// distinct signal — the caller MUST reject this turn (409), not run it
// untracked. Any OTHER error propagates unchanged.
if (
isUniqueViolation(err) &&
violatedConstraint(err) === ONE_ACTIVE_RUN_PER_CHAT_INDEX
) {
throw new RunAlreadyActiveError(args.chatId);
}
throw err;
}
const controller = new AbortController();
this.active.set(run.id, {
controller,
chatId: args.chatId,
workspaceId: args.workspaceId,
});
return { runId: run.id, signal: controller.signal };
}
/** Link the assistant message (the #183 projection) to its run. Best-effort. */
async linkAssistantMessage(
runId: string,
workspaceId: string,
assistantMessageId: string,
): Promise<void> {
try {
await this.runRepo.update(runId, workspaceId, { assistantMessageId });
} catch (err) {
this.logger.warn(
`Failed to link assistant message to run ${runId}: ${
err instanceof Error ? err.message : 'unknown error'
}`,
);
}
}
/** Persist progress: bump the run's finished-step count. Best-effort (never
* blocks or breaks the stream). */
async recordStep(
runId: string,
workspaceId: string,
stepCount: number,
): Promise<void> {
try {
await this.runRepo.update(runId, workspaceId, { stepCount });
} catch (err) {
this.logger.warn(
`Failed to record step for run ${runId}: ${
err instanceof Error ? err.message : 'unknown error'
}`,
);
}
}
/**
* Finalize a run to its terminal status (succeeded / failed / aborted),
* stamping finishedAt + any error. Best-effort, but ROBUST against a transient
* terminal-write failure (F6) AND atomically safe against a concurrent settle.
*
* ATOMIC ONCE-CLAIM (the gate must close in ONE synchronous tick): two
* finalizeRun calls for the SAME run can race the documented real path is
* AiChatService.stream's safety-net catch settling the turn to 'error' while a
* streamText terminal callback (onFinish/onAbort/onError) ALSO settles it. The
* `settled.has` check alone is NOT a gate: it is read BEFORE the awaited UPDATE,
* so two callers can both see `false` and both write the row (last-write-wins
* clobbers the real terminal status, and the bounded retry only widens that
* window). The claim therefore happens via `active.delete`, a SYNCHRONOUS
* check-and-clear with NO await between the gate and the entry removal: the
* second concurrent caller finds the entry already gone and returns in the same
* tick, before any UPDATE. The transition "nobody is finalizing" -> "I am
* finalizing" is thus a single atomic step.
*
* ORDER MATTERS (F6): once we own the claim, the terminal UPDATE happens FIRST;
* only once it SUCCEEDS do we record the run as settled. If the UPDATE fails on
* every bounded attempt we RESTORE the in-memory entry, leave the run UNsettled,
* and emit an ERROR signal that the row is left non-terminal 'running' (which
* would 409 every future turn in the chat until recovery). An in-process retry
* by a LATER settle is only POSSIBLE, never guaranteed: it needs (a) the entry
* to have been restored at the give-up path AND (b) a fresh settler to arrive
* AFTER that restore. A concurrent settler that arrives DURING the retry window
* while the entry is deleted for backoff and not yet restored is consumed at
* the synchronous `active.delete` claim (it finds nothing to delete and returns
* a no-op), so it does NOT become an in-process retrier. The NO-streamText path
* (the turn threw before streamText was wired, so ONLY the safety-net ever
* settles) likewise has no second in-process settler at all. The UNCONDITIONAL
* backstop in every case is the boot sweep on the next restart (phase 1 has no
* periodic in-process sweep); the retained entry is bounded (cleared on restart)
* and harmless meanwhile.
*
* IDEMPOTENT on SUCCESS (#184 review): the terminal write happens AT MOST ONCE
* per run. After a successful write the once-gate keys off {@link settled} (the
* terminal row already written) so a settle arriving AFTER the entry was already
* dropped-and-settled returns early; a settle racing the in-flight write is
* stopped earlier still, by the `active.delete` claim. Either way a genuine
* double-settle collapses to a single write and a late settle can never clobber
* the real terminal status or double-write the row.
*/
async finalizeRun(
runId: string,
workspaceId: string,
turnStatus: TurnTerminalStatus,
error?: string,
): Promise<void> {
// ---- Atomic once-claim (synchronous; NO await before the gate closes) ----
// Already terminally written -> idempotent no-op.
if (this.settled.has(runId)) return;
// Capture the entry BEFORE the delete so a total-failure path can restore it.
const entry = this.active.get(runId);
// SYNCHRONOUS check-and-clear: the FIRST caller deletes (claims) the entry;
// any concurrent SECOND caller finds nothing to delete and returns HERE, in
// the same tick, before any await — so it can never reach the UPDATE.
if (!this.active.delete(runId)) return;
let lastError: unknown;
for (
let attempt = 1;
attempt <= AiChatRunService.FINALIZE_MAX_ATTEMPTS;
attempt++
) {
try {
await this.runRepo.update(runId, workspaceId, {
status: mapTurnStatusToRun(turnStatus),
finishedAt: new Date(),
error: error ?? null,
});
// Terminal write landed: arm the once-gate. The entry is already gone
// (claimed above); we do NOT restore it. The slot is now free.
this.settled.add(runId);
return;
} catch (err) {
lastError = err;
this.logger.warn(
`Failed to finalize run ${runId} (attempt ${attempt}/${
AiChatRunService.FINALIZE_MAX_ATTEMPTS
}): ${err instanceof Error ? err.message : 'unknown error'}`,
);
if (attempt < AiChatRunService.FINALIZE_MAX_ATTEMPTS) {
await this.delay(AiChatRunService.FINALIZE_RETRY_BASE_MS * attempt);
}
}
}
// Every attempt failed: this is a give-up, materially worse than a per-attempt
// blip — the row is left NON-TERMINAL ('running'), so emit ONE explicit,
// greppable ERROR so an operator can tell "survived a blip" from "gave up, run
// held in memory until recovery" (the last warn alone says only "attempt 3/3").
this.logger.error(
`Run ${runId} (chat ${entry?.chatId ?? 'unknown'}) left NON-TERMINAL ` +
`('running'): terminal write failed after ${
AiChatRunService.FINALIZE_MAX_ATTEMPTS
} attempts; entry retained in memory, recovery deferred to next settle / ` +
`boot sweep`,
lastError,
);
// RESTORE the claimed entry (and leave the run UNsettled) so a LATER settle
// that arrives AFTER this restore MAY retry the terminal write — but that
// in-process retry is NOT guaranteed (a concurrent settler caught in the retry
// window above is consumed at the `active.delete` claim, and the no-streamText
// path has no second settler at all). The UNCONDITIONAL backstop in every case
// is the boot sweep on the next restart; the restored entry is bounded and
// cleared on restart.
if (entry) this.active.set(runId, entry);
}
/** Small async backoff between terminal-write retries (F6). Isolated so it is
* trivial to stub/fake-time in tests. */
private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* Request an EXPLICIT stop of a run (the user pressed Stop). This is the ONLY
* thing that aborts a run distinct from a browser disconnect, which leaves
* the run going. Aborts the in-process controller FIRST (the only thing that
* actually stops the run, if this replica owns it), then makes a best-effort
* attempt to stamp `stop_requested_at` that audit write stamps only while the
* row is active and may be skipped on a DB error or lost to the finalize race,
* which is acceptable since the row still settles as 'aborted'. Returns true
* when a stop took effect (row marked and/or controller aborted), false when
* there was nothing active to stop.
*/
async requestStop(runId: string, workspaceId: string): Promise<boolean> {
const entry = this.active.get(runId);
if (entry) {
// Abort the live turn FIRST -> streamText onAbort fires -> the partial is
// persisted (#183) and finalizeRun settles the row as 'aborted'. This is
// the ONLY thing that aborts a run, so it MUST NOT be hostage to the audit
// write below: a transient failure on `markStopRequested` (pool exhaustion,
// deadlock, dropped connection) must never leave the run executing despite
// an explicit Stop. At worst only the `stop_requested_at` timestamp is lost.
entry.controller.abort();
}
// Record `stop_requested_at` (best-effort). A transient DB failure here is
// logged and treated as `marked = false`; the abort above already took
// effect, so we never rethrow and skip stopping the run. Note: because
// markStopRequested only stamps while the row is active, aborting first means
// even a healthy write can lose the race against the resulting finalize and
// skip the stamp — acceptable, as the row still settles as 'aborted' and only
// this audit timestamp may be lost.
let marked: unknown;
try {
marked = await this.runRepo.markStopRequested(runId, workspaceId);
} catch (err) {
marked = undefined;
this.logger.warn(
`requestStop: markStopRequested failed for run ${runId} ` +
`(stop_requested_at not recorded); abort already issued: ` +
`${err instanceof Error ? err.message : String(err)}`,
);
}
return Boolean(marked) || Boolean(entry);
}
/** Latest persisted run for a chat the reconnect target (an in-flight or
* finished run). Pure read-through to the repo. */
getLatestForChat(
chatId: string,
workspaceId: string,
): Promise<AiChatRun | undefined> {
return this.runRepo.findLatestByChat(chatId, workspaceId);
}
/** Fetch a run by id (workspace-scoped). Used to resolve + ownership-check an
* explicit stop targeting a runId. */
getRun(runId: string, workspaceId: string): Promise<AiChatRun | undefined> {
return this.runRepo.findById(runId, workspaceId);
}
/** The active run on a chat, if any (used to reject a concurrent start with a
* clean 409 before committing to the stream). */
getActiveForChat(
chatId: string,
workspaceId: string,
): Promise<AiChatRun | undefined> {
return this.runRepo.findActiveByChat(chatId, workspaceId);
}
/** Test/diagnostic seam: whether this replica is holding a live controller for
* the run. */
isLocallyActive(runId: string): boolean {
return this.active.has(runId);
}
}
@@ -25,6 +25,7 @@ describe('AiChatController.boundChat', () => {
};
const controller = new AiChatController(
{} as never,
{} as never, // aiChatRunService
aiChatRepo as never,
{} as never,
{} as never,
@@ -53,6 +53,7 @@ describe('AiChatController.export', () => {
};
const controller = new AiChatController(
{} as never,
{} as never, // aiChatRunService
aiChatRepo as never,
aiChatMessageRepo as never,
{} as never,
@@ -0,0 +1,164 @@
import { BadRequestException, ForbiddenException } from '@nestjs/common';
import { AiChatController } from './ai-chat.controller';
import type { User, Workspace } from '@docmost/db/types/entity.types';
/**
* Wiring spec for the #184 run-reconnect / run-stop endpoints
* (`POST /ai-chat/run` and `POST /ai-chat/stop`). Both are OWNER-gated via
* assertOwnedChat (the requesting user must own the chat) and NOT flag-gated.
* Exercised with hand-rolled mocks no Nest graph, no DB. The controller's
* constructor order is (aiChatService, aiChatRunService, aiChatRepo,
* aiChatMessageRepo, aiTranscription).
*/
describe('AiChatController run endpoints (#184)', () => {
const user = { id: 'u1' } as User;
const workspace = { id: 'ws1' } as Workspace;
function makeController(opts: {
chat?: unknown; // what aiChatRepo.findById returns (owner-gate)
run?: unknown; // getLatestForChat / getRun result
activeRun?: unknown; // getActiveForChat result
message?: unknown; // aiChatMessageRepo.findById result
stopped?: boolean; // requestStop result
}) {
const aiChatRunService = {
getLatestForChat: jest.fn().mockResolvedValue(opts.run),
getRun: jest.fn().mockResolvedValue(opts.run),
getActiveForChat: jest.fn().mockResolvedValue(opts.activeRun),
requestStop: jest.fn().mockResolvedValue(opts.stopped ?? false),
};
const aiChatRepo = {
findById: jest.fn().mockResolvedValue(opts.chat),
};
const aiChatMessageRepo = {
findById: jest.fn().mockResolvedValue(opts.message),
};
const controller = new AiChatController(
{} as never, // aiChatService
aiChatRunService as never,
aiChatRepo as never,
aiChatMessageRepo as never,
{} as never, // aiTranscription
{} as never, // pageRepo
);
return { controller, aiChatRunService, aiChatRepo, aiChatMessageRepo };
}
describe('POST /ai-chat/run (getRun)', () => {
it('owner-gates: a chat the user does not own throws ForbiddenException', async () => {
const { controller, aiChatRunService } = makeController({
chat: { id: 'c1', creatorId: 'someone-else' },
});
await expect(
controller.getRun({ chatId: 'c1' }, user, workspace),
).rejects.toBeInstanceOf(ForbiddenException);
// It must NOT reach the run lookup once the owner-gate fails.
expect(aiChatRunService.getLatestForChat).not.toHaveBeenCalled();
});
it('returns { run: null, message: null } when the chat has never had a run', async () => {
const { controller, aiChatRunService } = makeController({
chat: { id: 'c1', creatorId: 'u1' },
run: undefined,
});
const res = await controller.getRun({ chatId: 'c1' }, user, workspace);
expect(res).toEqual({ run: null, message: null });
expect(aiChatRunService.getLatestForChat).toHaveBeenCalledWith(
'c1',
'ws1',
);
});
it('returns the run and its projected assistant message', async () => {
const run = { id: 'run-1', chatId: 'c1', assistantMessageId: 'm1' };
const message = { id: 'm1', role: 'assistant' };
const { controller, aiChatMessageRepo } = makeController({
chat: { id: 'c1', creatorId: 'u1' },
run,
message,
});
const res = await controller.getRun({ chatId: 'c1' }, user, workspace);
expect(res).toEqual({ run, message });
expect(aiChatMessageRepo.findById).toHaveBeenCalledWith('m1', 'ws1');
});
it('returns message: null when the run has no linked assistant message', async () => {
const run = { id: 'run-1', chatId: 'c1', assistantMessageId: null };
const { controller, aiChatMessageRepo } = makeController({
chat: { id: 'c1', creatorId: 'u1' },
run,
});
const res = await controller.getRun({ chatId: 'c1' }, user, workspace);
expect(res).toEqual({ run, message: null });
expect(aiChatMessageRepo.findById).not.toHaveBeenCalled();
});
});
describe('POST /ai-chat/stop (stopRun)', () => {
it('throws BadRequestException when neither runId nor chatId is given', async () => {
const { controller } = makeController({});
await expect(
controller.stopRun({}, user, workspace),
).rejects.toBeInstanceOf(BadRequestException);
});
it('stops by runId: owner-gates via the run’s chat, then requests the stop', async () => {
const { controller, aiChatRunService, aiChatRepo } = makeController({
run: { id: 'run-1', chatId: 'c1' },
chat: { id: 'c1', creatorId: 'u1' },
stopped: true,
});
const res = await controller.stopRun({ runId: 'run-1' }, user, workspace);
expect(res).toEqual({ stopped: true });
expect(aiChatRunService.getRun).toHaveBeenCalledWith('run-1', 'ws1');
expect(aiChatRepo.findById).toHaveBeenCalledWith('c1', 'ws1');
expect(aiChatRunService.requestStop).toHaveBeenCalledWith('run-1', 'ws1');
});
it('stops by runId: a foreign run’s chat throws ForbiddenException (no stop)', async () => {
const { controller, aiChatRunService } = makeController({
run: { id: 'run-1', chatId: 'c1' },
chat: { id: 'c1', creatorId: 'someone-else' },
});
await expect(
controller.stopRun({ runId: 'run-1' }, user, workspace),
).rejects.toBeInstanceOf(ForbiddenException);
expect(aiChatRunService.requestStop).not.toHaveBeenCalled();
});
it('stops by runId: an unknown run reports { stopped: false }', async () => {
const { controller, aiChatRunService } = makeController({
run: undefined,
});
const res = await controller.stopRun({ runId: 'gone' }, user, workspace);
expect(res).toEqual({ stopped: false });
expect(aiChatRunService.requestStop).not.toHaveBeenCalled();
});
it('stops by chatId: owner-gates, resolves the active run, requests the stop', async () => {
const { controller, aiChatRunService, aiChatRepo } = makeController({
chat: { id: 'c1', creatorId: 'u1' },
activeRun: { id: 'run-9' },
stopped: true,
});
const res = await controller.stopRun({ chatId: 'c1' }, user, workspace);
expect(res).toEqual({ stopped: true });
expect(aiChatRepo.findById).toHaveBeenCalledWith('c1', 'ws1');
expect(aiChatRunService.getActiveForChat).toHaveBeenCalledWith(
'c1',
'ws1',
);
expect(aiChatRunService.requestStop).toHaveBeenCalledWith('run-9', 'ws1');
});
it('stops by chatId: reports { stopped: false } when no run is active', async () => {
const { controller, aiChatRunService } = makeController({
chat: { id: 'c1', creatorId: 'u1' },
activeRun: undefined,
});
const res = await controller.stopRun({ chatId: 'c1' }, user, workspace);
expect(res).toEqual({ stopped: false });
expect(aiChatRunService.requestStop).not.toHaveBeenCalled();
});
});
});
@@ -1,6 +1,7 @@
import {
BadRequestException,
Body,
ConflictException,
Controller,
ForbiddenException,
HttpCode,
@@ -20,7 +21,13 @@ import { JwtAuthGuard } from '../../common/guards/jwt-auth.guard';
import { AuthUser } from '../../common/decorators/auth-user.decorator';
import { AuthWorkspace } from '../../common/decorators/auth-workspace.decorator';
import { SkipTransform } from '../../common/decorators/skip-transform.decorator';
import { AiChat, User, Workspace } from '@docmost/db/types/entity.types';
import {
AiChat,
AiChatMessage,
AiChatRun,
User,
Workspace,
} from '@docmost/db/types/entity.types';
import { PaginationOptions } from '@docmost/db/pagination/pagination-options';
import { AiChatRepo } from '@docmost/db/repos/ai-chat/ai-chat.repo';
import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.repo';
@@ -28,7 +35,12 @@ import { PageRepo } from '@docmost/db/repos/page/page.repo';
import { UserThrottlerGuard } from '../../integrations/throttle/user-throttler.guard';
import { AI_CHAT_THROTTLER } from '../../integrations/throttle/throttler-names';
import { FileInterceptor } from '../../common/interceptors/file.interceptor';
import { AiChatService, AiChatStreamBody } from './ai-chat.service';
import {
AiChatRunHooks,
AiChatService,
AiChatStreamBody,
} from './ai-chat.service';
import { AiChatRunService } from './ai-chat-run.service';
import { AiTranscriptionService } from './ai-transcription.service';
import {
BoundChatDto,
@@ -36,7 +48,9 @@ import {
ExportChatDto,
GeneratePageTitleDto,
GetChatMessagesDto,
GetRunDto,
RenameChatDto,
StopRunDto,
} from './dto/ai-chat.dto';
import { describeProviderError } from '../../integrations/ai/ai-error.util';
import { buildChatMarkdown } from './chat-markdown.util';
@@ -53,6 +67,7 @@ export class AiChatController {
constructor(
private readonly aiChatService: AiChatService,
private readonly aiChatRunService: AiChatRunService,
private readonly aiChatRepo: AiChatRepo,
private readonly aiChatMessageRepo: AiChatMessageRepo,
private readonly aiTranscription: AiTranscriptionService,
@@ -149,6 +164,75 @@ export class AiChatController {
return { markdown };
}
/**
* Reconnect to the latest run of a chat (#184 phase 1). Returns the run's
* persisted lifecycle state ({ status, error, stepCount, timings, ... }) plus
* the assistant message it projects (the partial/final output) the DB is the
* source of truth, so this works for an in-flight run (the browser dropped, the
* run kept going) and a finished one alike. Owner-gated via assertOwnedChat.
* `{ run: null }` when the chat has never had a run.
*/
@HttpCode(HttpStatus.OK)
@Post('run')
async getRun(
@Body() dto: GetRunDto,
@AuthUser() user: User,
@AuthWorkspace() workspace: Workspace,
): Promise<{ run: AiChatRun | null; message: AiChatMessage | null }> {
await this.assertOwnedChat(dto.chatId, user, workspace);
const run = await this.aiChatRunService.getLatestForChat(
dto.chatId,
workspace.id,
);
if (!run) return { run: null, message: null };
const message = run.assistantMessageId
? await this.aiChatMessageRepo.findById(
run.assistantMessageId,
workspace.id,
)
: undefined;
return { run, message: message ?? null };
}
/**
* Explicitly STOP an agent run (#184 phase 1) the user pressed Stop. This is
* the ONLY thing that ends a detached run; a browser disconnect deliberately
* does not. Target by `runId` (from the streamed start metadata) or by `chatId`
* (stop whatever run is active on it). Owner-gated. Returns
* `{ stopped }` false when there was nothing active to stop.
*/
@HttpCode(HttpStatus.OK)
@Post('stop')
async stopRun(
@Body() dto: StopRunDto,
@AuthUser() user: User,
@AuthWorkspace() workspace: Workspace,
): Promise<{ stopped: boolean }> {
let runId = dto.runId;
if (!runId && !dto.chatId) {
throw new BadRequestException('runId or chatId is required');
}
if (runId) {
// Resolve the run to its chat and owner-gate via that chat.
const run = await this.aiChatRunService.getRun(runId, workspace.id);
if (!run) return { stopped: false };
await this.assertOwnedChat(run.chatId, user, workspace);
} else {
await this.assertOwnedChat(dto.chatId!, user, workspace);
const active = await this.aiChatRunService.getActiveForChat(
dto.chatId!,
workspace.id,
);
if (!active) return { stopped: false };
runId = active.id;
}
const stopped = await this.aiChatRunService.requestStop(
runId,
workspace.id,
);
return { stopped };
}
/** Rename a chat. */
@HttpCode(HttpStatus.OK)
@Post('rename')
@@ -200,11 +284,20 @@ export class AiChatController {
@AuthWorkspace() workspace: Workspace,
): Promise<void> {
// A7 gate: the workspace must have AI chat explicitly enabled.
const settings = (workspace.settings ?? {}) as { ai?: { chat?: boolean } };
const settings = (workspace.settings ?? {}) as {
ai?: { chat?: boolean; autonomousRuns?: boolean };
};
if (settings.ai?.chat !== true) {
throw new ForbiddenException('AI chat is disabled');
}
// #184 phase 1 flag: when ON, the turn becomes a detached, durable RUN — its
// lifecycle is tracked in ai_chat_runs, a browser disconnect no longer aborts
// it, and only an explicit /ai-chat/stop ends it. When OFF (the default) the
// turn is socket-bound exactly as before, so existing deployments are
// unaffected.
const autonomousRuns = settings.ai?.autonomousRuns === true;
const sessionId = (req.raw as { sessionId?: string }).sessionId;
if (!sessionId) {
// The chat requires an interactive session to mint loopback tokens
@@ -228,6 +321,58 @@ export class AiChatController {
// HttpException) instead of breaking mid-stream.
const model = await this.aiChatService.getChatModel(workspace.id, role);
// #184: one active run per chat. For an EXISTING chat reject a concurrent
// start with a clean 409 BEFORE hijack (the common double-submit / second-tab
// case), so the user gets JSON, not a mid-stream error. A brand-new chat
// (no chatId) cannot have a prior run, and the DB partial unique index is the
// backstop against any race that slips past this check.
if (autonomousRuns && body.chatId) {
const active = await this.aiChatRunService.getActiveForChat(
body.chatId,
workspace.id,
);
if (active) {
throw new ConflictException({
message: 'An agent run is already in progress for this chat',
code: 'A_RUN_ALREADY_ACTIVE',
});
}
}
// Run-lifecycle hooks (#184), only when the flag is on. They wrap the turn in
// a durable run whose abort is governed by the run (explicit stop), persist
// its progress, and settle its terminal status — see AiChatRunService.
const runHooks: AiChatRunHooks | undefined = autonomousRuns
? {
begin: (chatId) =>
this.aiChatRunService.beginRun({
chatId,
workspaceId: workspace.id,
userId: user.id,
trigger: 'user',
}),
onAssistantSeeded: (runId, messageId) =>
this.aiChatRunService.linkAssistantMessage(
runId,
workspace.id,
messageId,
),
onStep: (runId, stepCount) =>
void this.aiChatRunService.recordStep(
runId,
workspace.id,
stepCount,
),
onSettled: (runId, status, error) =>
this.aiChatRunService.finalizeRun(
runId,
workspace.id,
status,
error,
),
}
: undefined;
// Abort the agent loop when the client disconnects. `close` also fires on
// normal completion, so only abort when the response has not finished
// writing (a genuine disconnect). `once` fires at most once and self-removes;
@@ -242,18 +387,44 @@ export class AiChatController {
// A genuine disconnect leaves the response unfinished (unlike a normal
// completion, which also fires `close`). Such a drop — e.g. a reverse
// proxy cutting the SSE mid-answer — is otherwise invisible server-side,
// so log it here before aborting the agent loop.
// so log it here.
if (!res.raw.writableEnded) {
this.logger.warn(
`AI chat stream: client disconnected before completion; aborting turn ` +
`(elapsed=${Date.now() - reqStartedAt}ms since request received)`,
);
controller.abort();
if (autonomousRuns) {
// #184: the turn is a DETACHED run. A disconnect must NOT abort it —
// the run keeps executing and persisting server-side; the client
// reconnects via /ai-chat/run (or re-stops via /ai-chat/stop). Log only.
this.logger.log(
`AI chat stream: client disconnected; run continues server-side ` +
`(elapsed=${Date.now() - reqStartedAt}ms since request received)`,
);
} else {
this.logger.warn(
`AI chat stream: client disconnected before completion; aborting turn ` +
`(elapsed=${Date.now() - reqStartedAt}ms since request received)`,
);
controller.abort();
}
}
};
req.raw.once('close', onClose);
res.raw.once('finish', () => req.raw.off('close', onClose));
// #184: in detached mode the turn is NOT aborted on disconnect, so the SDK's
// pipe keeps writing to a socket the client may have dropped — for the rest of
// the (continuing) run. A write to the dead socket can emit an 'error' on the
// raw response; without a listener that surfaces as an unhandled error event.
// Swallow it (the run continues server-side regardless). Legacy mode aborts on
// disconnect, so it does not need this and keeps its exact prior behavior.
if (autonomousRuns) {
res.raw.on('error', (err) => {
this.logger.debug(
`AI chat detached stream: post-disconnect socket error swallowed: ${
err instanceof Error ? err.message : String(err)
}`,
);
});
}
// Commit to streaming: hijack so Fastify stops managing the response and
// the AI SDK can write the UI-message stream directly to the Node socket.
res.hijack();
@@ -268,15 +439,32 @@ export class AiChatController {
signal: controller.signal,
model,
role,
// #184: present only when the flag is on; wraps the turn in a durable run.
runHooks,
});
} catch (err) {
// Any failure AFTER hijack can no longer send a clean JSON error, so emit
// a minimal error on the raw socket if nothing has been written yet.
this.logger.error('AI chat stream failed', err as Error);
// Any failure AFTER hijack can no longer go through Nest's exception
// filter, so emit the error on the raw socket if nothing has been written
// yet. The lost-the-race 409 (RunAlreadyActiveError -> ConflictException)
// is raised by stream() BEFORE it writes a byte, so headers are still
// unsent here: honor the HttpException's real status + body (a clean 409),
// not a blanket 500. Everything else stays a 500.
const isHttp = err instanceof HttpException;
if (!isHttp) {
this.logger.error('AI chat stream failed', err as Error);
}
if (!res.raw.headersSent) {
res.raw.statusCode = 500;
const status = isHttp ? err.getStatus() : 500;
const payload = isHttp
? err.getResponse()
: { error: 'Internal server error' };
res.raw.statusCode = status;
res.raw.setHeader('Content-Type', 'application/json');
res.raw.end(JSON.stringify({ error: 'Internal server error' }));
res.raw.end(
JSON.stringify(
typeof payload === 'string' ? { message: payload } : payload,
),
);
} else if (!res.raw.writableEnded) {
res.raw.end();
}
@@ -57,6 +57,7 @@ describe('AiChatController.generatePageTitle', () => {
const aiChatService = { generatePageTitle: generate };
const controller = new AiChatController(
aiChatService as never,
{} as never, // aiChatRunService
{} as never,
{} as never,
{} as never,
@@ -3,6 +3,7 @@ import { AiModule } from '../../integrations/ai/ai.module';
import { TokenModule } from '../auth/token.module';
import { AiChatController } from './ai-chat.controller';
import { AiChatService } from './ai-chat.service';
import { AiChatRunService } from './ai-chat-run.service';
import { AiTranscriptionService } from './ai-transcription.service';
import { AiChatToolsService } from './tools/ai-chat-tools.service';
import { EmbeddingModule } from './embedding/embedding.module';
@@ -42,6 +43,7 @@ import { PublicShareChatToolsService } from './tools/public-share-chat-tools.ser
controllers: [AiChatController, PublicShareChatController],
providers: [
AiChatService,
AiChatRunService,
AiTranscriptionService,
AiChatToolsService,
PublicShareChatService,
@@ -1,4 +1,8 @@
import { buildSystemPrompt, buildMcpToolingBlock } from './ai-chat.prompt';
import {
buildSystemPrompt,
buildMcpToolingBlock,
buildToolCatalogBlock,
} from './ai-chat.prompt';
import { Workspace } from '@docmost/db/types/entity.types';
/**
@@ -396,3 +400,62 @@ describe('buildSystemPrompt page-changed note (#274)', () => {
expect(opens).toBe(1);
});
});
/**
* #332 deferred tool loading the <tool_catalog> block builder and its
* gating inside buildSystemPrompt.
*/
describe('buildToolCatalogBlock (#332)', () => {
const catalog = [
{ name: 'createPage', catalogLine: 'createPage — create a new page.' },
{ name: 'transformPage', catalogLine: 'transformPage — run a JS transform.' },
];
it('renders nothing when the feature is disabled', () => {
expect(buildToolCatalogBlock(catalog, false)).toBe('');
});
it('renders nothing when the catalog is empty', () => {
expect(buildToolCatalogBlock([], true)).toBe('');
expect(buildToolCatalogBlock(undefined, true)).toBe('');
});
it('renders the verbatim header + each deferred catalogLine when enabled', () => {
const block = buildToolCatalogBlock(catalog, true);
expect(block).toContain('<tool_catalog note="deferred tools;');
expect(block).toContain('NEVER tell the user you lack a capability');
expect(block).toContain('Deferred tools (name — purpose):');
expect(block).toContain('- createPage — create a new page.');
expect(block).toContain('- transformPage — run a JS transform.');
expect(block).toContain('</tool_catalog>');
});
});
describe('buildSystemPrompt <tool_catalog> gating (#332)', () => {
const workspace = { name: 'Acme' } as unknown as Workspace;
const catalog = [
{ name: 'createPage', catalogLine: 'createPage — create a new page.' },
];
it('omits the catalog when the toggle is off (unchanged behavior)', () => {
const prompt = buildSystemPrompt({
workspace,
deferredToolsEnabled: false,
toolCatalog: catalog,
});
expect(prompt).not.toContain('<tool_catalog');
expect(prompt).not.toContain('createPage — create a new page.');
});
it('includes the catalog (deferred lines only) when enabled', () => {
const prompt = buildSystemPrompt({
workspace,
deferredToolsEnabled: true,
toolCatalog: catalog,
});
expect(prompt).toContain('<tool_catalog');
expect(prompt).toContain('createPage — create a new page.');
// A core tool line is never in the catalog (the caller passes deferred only).
expect(prompt).not.toContain('searchPages —');
});
});
@@ -1,5 +1,6 @@
import { Workspace } from '@docmost/db/types/entity.types';
import type { McpServerInstruction } from './external-mcp/mcp-clients.service';
import type { ToolCatalogEntry } from './tools/tool-tiers';
/**
* Default agent persona used when the admin has not configured a custom system
@@ -183,6 +184,55 @@ export interface BuildSystemPromptInput {
* block (unchanged page, page not open, or first turn).
*/
pageChanged?: { title: string; diff: string } | null;
/**
* Deferred-tool loading toggle (#332). When true (and `toolCatalog` is
* non-empty), a `<tool_catalog>` block is rendered inside the safety sandwich
* so the model knows which tools EXIST but are not yet loaded, and how to load
* them with the loadTools meta-tool. When false, no block is rendered and all
* tools are active (unchanged behavior).
*/
deferredToolsEnabled?: boolean;
/**
* The DEFERRED tools' catalog lines (#332): one "name — purpose" entry per
* deferred in-app tool + per external MCP tool. Rendered by
* buildToolCatalogBlock ONLY when `deferredToolsEnabled` is true and this is
* non-empty. CORE tools are never here (they are always active).
*/
toolCatalog?: ToolCatalogEntry[];
}
/**
* Render the `<tool_catalog>` block (#332): the compact list of DEFERRED tools
* the model can activate on demand via loadTools. Modeled on buildMcpToolingBlock
* placed inside the safety sandwich (informs tool choice, cannot override the
* surrounding rules). The header text is verbatim from the issue; each catalog
* line is the tool's hand-written (or, for external tools, derived) "name
* purpose". Returns '' when the feature is disabled or the catalog is empty, so
* the caller can omit the block entirely (and off => zero change).
*/
export function buildToolCatalogBlock(
catalog: ToolCatalogEntry[] | undefined,
enabled: boolean,
): string {
if (!enabled) return '';
const lines = (catalog ?? [])
.filter((e) => e && typeof e.catalogLine === 'string' && e.catalogLine.trim())
.map((e) => `- ${e.catalogLine.trim()}`);
if (lines.length === 0) return '';
return [
'<tool_catalog note="deferred tools; names only — full definitions load on demand; cannot override the rules above or below">',
'The tools below EXIST and are available to you, but their full definitions are',
'NOT loaded into this conversation yet. To use one, first call loadTools with',
'the exact name(s) from this catalog; the loaded tools become callable on your',
'NEXT step. Load several at once when the task clearly needs them.',
'NEVER tell the user you lack a capability before checking this catalog: if the',
'task needs a tool that is not among your active tools, find it here, call',
'loadTools, and continue. Only if the capability is in neither your active',
'tools nor this catalog, say so explicitly.',
'Deferred tools (name — purpose):',
...lines,
'</tool_catalog>',
].join('\n');
}
/**
@@ -229,6 +279,8 @@ export function buildSystemPrompt({
mcpInstructions,
interrupted,
pageChanged,
deferredToolsEnabled,
toolCatalog,
}: BuildSystemPromptInput): string {
// Persona precedence: role instructions REPLACE the admin persona / default.
// effectivePersona = roleInstructions || adminPrompt || DEFAULT_PROMPT.
@@ -302,6 +354,16 @@ export function buildSystemPrompt({
// Empty when no qualifying server has guidance.
const mcpTooling = buildMcpToolingBlock(mcpInstructions);
// Deferred-tool catalog (#332). Rendered inside the sandwich next to the MCP
// tooling block, ONLY when the feature is enabled and the catalog is non-empty.
// Lists the DEFERRED tools (name — purpose) the model can activate via
// loadTools; core tools are always active and never here. Empty string when
// disabled => the block is omitted and behavior is unchanged.
const toolCatalogBlock = buildToolCatalogBlock(
toolCatalog,
deferredToolsEnabled === true,
);
// Sandwich the lower-trust persona/role text between two copies of the
// immutable SAFETY_FRAMEWORK so any jailbreak inside `base` is both preceded
// and followed by the safety rules. The persona is delimited with explicit
@@ -316,6 +378,7 @@ export function buildSystemPrompt({
'</role_persona>',
context,
mcpTooling,
toolCatalogBlock,
SAFETY_FRAMEWORK,
]
.filter((part) => part !== '')
@@ -53,6 +53,7 @@ describe('AiChatService.resolveRoleForRequest', () => {
aiAgentRoleRepo as never,
{} as never, // pageRepo
{} as never, // pageAccess
{} as never, // environment
);
return { service, aiChatRepo, aiAgentRoleRepo };
}
@@ -1,5 +1,7 @@
import { Logger } from '@nestjs/common';
import { AiChatService } from './ai-chat.service';
import { AiChatService, AiChatRunHooks } from './ai-chat.service';
import { AiChatRunService } from './ai-chat-run.service';
import type { User, Workspace } from '@docmost/db/types/entity.types';
/**
* Lifecycle unit tests for AiChatService.onModuleInit (#183 crash-recovery
@@ -22,6 +24,7 @@ describe('AiChatService.onModuleInit (startup sweep)', () => {
{} as never, // aiAgentRoleRepo
{} as never, // pageRepo
{} as never, // pageAccess
{} as never, // environment
);
return { service, aiChatMessageRepo };
}
@@ -60,3 +63,99 @@ describe('AiChatService.onModuleInit (startup sweep)', () => {
expect(String(warnSpy.mock.calls[0][0])).toContain('db unavailable');
});
});
/**
* #184 CRITICAL run-lifecycle safety net (review fix). A transient failure
* AFTER a successful beginRun but BEFORE streamText's terminal callbacks own the
* lifecycle must STILL settle the run otherwise the run row is stuck 'running'
* forever (sweepRunning only runs at startup) and the partial unique index + the
* controller pre-check 409 every future turn in that chat until a restart. Here
* we model the very first bare await after beginRun (the user-message insert)
* throwing, wiring the run hooks to a REAL AiChatRunService (mock repo) exactly
* as the controller does, and assert the run is settled to 'error' and its
* in-memory entry dropped (so a follow-up turn would NOT be 409'd).
*/
describe('AiChatService.stream run-lifecycle safety net (#184)', () => {
const user = { id: 'u1' } as User;
const workspace = { id: 'ws1' } as Workspace;
afterEach(() => jest.restoreAllMocks());
it('an exception after beginRun settles the run to error and drops the in-memory entry', async () => {
jest.spyOn(Logger.prototype, 'error').mockImplementation(() => undefined);
// Real run service over a mock repo, so finalizeRun's in-memory bookkeeping
// (active.delete) is exercised for real.
const runRepo = {
insert: jest.fn().mockResolvedValue({ id: 'run-1', status: 'running' }),
update: jest.fn().mockResolvedValue({ id: 'run-1' }),
};
const runService = new AiChatRunService(runRepo as never, { isCloud: () => false } as never);
// The user-message insert (the first bare await after beginRun) throws.
const aiChatMessageRepo = {
insert: jest.fn().mockRejectedValue(new Error('insert boom')),
};
const aiChatRepo = {
// Existing chat -> chatId stays, no new-chat insert path.
findById: jest.fn().mockResolvedValue({ id: 'chat-1', creatorId: 'u1' }),
};
const service = new AiChatService(
{} as never, // ai
aiChatRepo as never,
aiChatMessageRepo as never,
{} as never, // aiChatPageSnapshotRepo
{} as never, // aiSettings
{} as never, // tools
{} as never, // mcpClients
{} as never, // aiAgentRoleRepo
{} as never, // pageRepo
{} as never, // pageAccess
{} as never, // environment
);
const runHooks: AiChatRunHooks = {
begin: (chatId) =>
runService.beginRun({
chatId,
workspaceId: workspace.id,
userId: user.id,
trigger: 'user',
}),
onSettled: (runId, status, error) =>
runService.finalizeRun(runId, workspace.id, status, error),
};
await expect(
service.stream({
user,
workspace,
sessionId: 'sess',
body: {
chatId: 'chat-1',
messages: [
{ id: 'm', role: 'user', parts: [{ type: 'text', text: 'hi' }] },
],
},
res: {} as never,
signal: new AbortController().signal,
model: {} as never,
role: null,
runHooks,
}),
).rejects.toThrow('insert boom');
// The run was begun...
expect(runRepo.insert).toHaveBeenCalledTimes(1);
// ...then settled to a terminal FAILED status by the safety net...
expect(runRepo.update).toHaveBeenCalledTimes(1);
expect(runRepo.update).toHaveBeenCalledWith(
'run-1',
'ws1',
expect.objectContaining({ status: 'failed' }),
);
// ...and the in-memory entry is gone, so a follow-up turn is NOT 409'd.
expect(runService.isLocallyActive('run-1')).toBe(false);
});
});
@@ -0,0 +1,489 @@
import { ConflictException, Logger } from '@nestjs/common';
// Mock the AI SDK so we can PROVE no provider call is made for the turn we are
// about to reject. The race rejection happens at runHooks.begin(), long before
// any streamText/generateText, so these never resolve a real model.
jest.mock('ai', () => ({
streamText: jest.fn(),
generateText: jest.fn(),
convertToModelMessages: jest.fn(() => []),
stepCountIs: jest.fn(() => () => false),
}));
import { streamText, generateText } from 'ai';
import { AiChatService } from './ai-chat.service';
import { RunAlreadyActiveError } from './ai-chat-run.service';
/**
* Race-closure coverage for the "one active run per chat" guard (#184).
*
* THE BUG: two simultaneous POST /ai-chat/stream on the same chat both pass the
* controller's cheap pre-check (TOCTOU), so the loser's run-row INSERT hits the
* partial unique index. Previously that 23505 was SWALLOWED and the second turn
* streamed UNTRACKED (no runId, not stoppable). THE FIX: beginRun surfaces a
* RunAlreadyActiveError and stream() turns it into a 409 BEFORE any AI call
* the second turn never runs.
*/
describe('AiChatService.stream — concurrent-run race rejection (#184)', () => {
const streamTextMock = streamText as unknown as jest.Mock;
const generateTextMock = generateText as unknown as jest.Mock;
beforeEach(() => {
streamTextMock.mockReset();
generateTextMock.mockReset();
});
// Minimal service whose only reachable deps before begin() are aiChatRepo
// (resolve the existing chat) — everything past begin must remain untouched.
function makeService(beginImpl: () => Promise<unknown>) {
const aiChatMessageRepo = { insert: jest.fn() };
const aiChatRepo = {
// An existing chat: stream keeps the supplied chatId and skips creation.
findById: jest.fn(async () => ({ id: 'chat-1', workspaceId: 'ws-1' })),
insert: jest.fn(),
};
const svc = new AiChatService(
{} as never, // ai
aiChatRepo as never,
aiChatMessageRepo as never,
{} as never, // aiChatPageSnapshotRepo
{} as never, // aiSettings
{} as never, // tools
{} as never, // mcpClients
{} as never, // aiAgentRoleRepo
{} as never, // pageRepo
{} as never, // pageAccess
{ isAiChatDeferredToolsEnabled: () => false } as never, // environment
);
const begin = jest.fn(beginImpl);
return { svc, begin, aiChatRepo, aiChatMessageRepo };
}
const baseArgs = (begin: jest.Mock) => ({
user: { id: 'user-1' } as never,
workspace: { id: 'ws-1' } as never,
sessionId: 'sess-1',
body: { chatId: 'chat-1', messages: [] } as never,
res: { raw: {} } as never,
signal: new AbortController().signal,
model: {} as never,
role: null,
runHooks: {
begin,
onAssistantSeeded: jest.fn(),
onStep: jest.fn(),
onSettled: jest.fn(),
} as never,
});
it('rejects the racer with a 409 ConflictException BEFORE any AI call, and never persists an untracked turn', async () => {
// begin loses the unique-index race -> RunAlreadyActiveError.
const { svc, begin, aiChatMessageRepo } = makeService(() => {
throw new RunAlreadyActiveError('chat-1');
});
const promise = svc.stream(baseArgs(begin));
await expect(promise).rejects.toBeInstanceOf(ConflictException);
await promise.catch((err: ConflictException) => {
expect(err.getStatus()).toBe(409);
expect((err.getResponse() as { code?: string }).code).toBe(
'A_RUN_ALREADY_ACTIVE',
);
});
// The decisive assertions: the rejected racer spent NO tokens and left NO
// untracked turn behind.
expect(begin).toHaveBeenCalledTimes(1);
expect(streamTextMock).not.toHaveBeenCalled();
expect(generateTextMock).not.toHaveBeenCalled();
expect(aiChatMessageRepo.insert).not.toHaveBeenCalled();
});
});
/**
* F3 the LOAD-BEARING run-detach wiring: `effectiveSignal = handle.signal`
* after runHooks.begin, then `abortSignal: effectiveSignal` passed to streamText.
* That single line is what makes a run survive a browser disconnect (the agent
* loop's abort is governed by the RUN's signal, not the socket): a regression to
* the socket-bound signal would still pass every other test green while silently
* breaking Stop + durability. These two tests pin the exact signal streamText
* consumes on both paths.
*/
describe('AiChatService.stream — abortSignal wiring (#184 F3)', () => {
const streamTextMock = streamText as unknown as jest.Mock;
// A streamText result stub: the post-call drain + pipe are no-ops here; we only
// care WHICH abortSignal streamText was handed.
function makeStreamResult() {
return {
consumeStream: jest.fn(),
pipeUIMessageStreamToResponse: jest.fn(),
};
}
// A raw-response stub sufficient for the post-streamText wiring
// (stripStreamingHopByHopHeaders binds writeHead; startSseHeartbeat registers
// close/finish listeners; flushHeaders is belt-and-braces).
function makeRes() {
return {
raw: {
writeHead: jest.fn(),
write: jest.fn(),
once: jest.fn(),
on: jest.fn(),
flushHeaders: jest.fn(),
writableEnded: false,
destroyed: false,
},
};
}
// Wire only the deps reached on the way to streamText: resolve the existing
// chat, persist the user + seed the assistant row, load (empty) history, the
// admin settings, an empty external toolset + Docmost toolset.
function makeService() {
const aiChatRepo = {
findById: jest.fn(async () => ({ id: 'chat-1', workspaceId: 'ws-1' })),
insert: jest.fn(),
};
const aiChatMessageRepo = {
insert: jest.fn(async () => ({ id: 'msg-1' })),
findAllByChat: jest.fn(async () => []),
update: jest.fn(async () => ({ id: 'msg-1' })),
};
const aiSettings = { resolve: jest.fn(async () => ({})) };
const tools = { forUser: jest.fn(async () => ({})) };
const mcpClients = {
toolsFor: jest.fn(async () => ({
tools: {},
clients: [],
outcomes: [],
instructions: [],
})),
};
const svc = new AiChatService(
{} as never, // ai
aiChatRepo as never,
aiChatMessageRepo as never,
{} as never, // aiChatPageSnapshotRepo
aiSettings as never,
tools as never,
mcpClients as never,
{} as never, // aiAgentRoleRepo
{} as never, // pageRepo (openPage undefined -> never touched)
{} as never, // pageAccess
{ isAiChatDeferredToolsEnabled: () => false } as never, // environment
);
return { svc };
}
const body = {
chatId: 'chat-1',
messages: [
{ id: 'm1', role: 'user', parts: [{ type: 'text', text: 'hi' }] },
],
};
beforeEach(() => {
streamTextMock.mockReset();
streamTextMock.mockImplementation(() => makeStreamResult());
jest
.spyOn(Logger.prototype, 'log')
.mockImplementation(() => undefined as never);
});
afterEach(() => jest.restoreAllMocks());
it('happy path (run-wrapped): streamText is driven with abortSignal === handle.signal (the RUN signal, NOT the socket)', async () => {
const { svc } = makeService();
const runController = new AbortController();
const runSignal = runController.signal;
const socketSignal = new AbortController().signal;
const begin = jest.fn(async () => ({ runId: 'run-1', signal: runSignal }));
await svc.stream({
user: { id: 'user-1' } as never,
workspace: { id: 'ws-1' } as never,
sessionId: 'sess-1',
body: body as never,
res: makeRes() as never,
signal: socketSignal,
model: {} as never,
role: null,
runHooks: {
begin,
onAssistantSeeded: jest.fn(),
onStep: jest.fn(),
onSettled: jest.fn(),
} as never,
});
expect(begin).toHaveBeenCalledTimes(1);
expect(streamTextMock).toHaveBeenCalledTimes(1);
// THE assertion: the agent loop's abort is wired to the RUN, so a browser
// disconnect (which aborts only `socketSignal`) cannot end the turn.
expect(streamTextMock.mock.calls[0][0].abortSignal).toBe(runSignal);
expect(streamTextMock.mock.calls[0][0].abortSignal).not.toBe(socketSignal);
});
it('legacy path (no runHooks): streamText is driven with the SOCKET signal', async () => {
const { svc } = makeService();
const socketSignal = new AbortController().signal;
await svc.stream({
user: { id: 'user-1' } as never,
workspace: { id: 'ws-1' } as never,
sessionId: 'sess-1',
body: body as never,
res: makeRes() as never,
signal: socketSignal,
model: {} as never,
role: null,
// No runHooks -> the turn stays socket-bound (flag off / default).
});
expect(streamTextMock).toHaveBeenCalledTimes(1);
expect(streamTextMock.mock.calls[0][0].abortSignal).toBe(socketSignal);
});
/**
* F9 streamText's TERMINAL callbacks carry the #184 run lifecycle:
* onStepFinish -> runHooks.onStep(runId, stepCount)
* onFinish -> runHooks.onSettled(runId, 'completed') (dominant path)
* onAbort -> runHooks.onSettled(runId, 'aborted')
* onError -> runHooks.onSettled(runId, 'error', cause)
* makeStreamResult() ignores the streamText options, so these callbacks never
* fire on their own a regression in this wiring (esp. the success path) would
* strand the run with NO test catching it. Here we CAPTURE the options streamText
* was handed and invoke each callback with the real wiring, asserting the run
* hooks fire with the right args.
*/
// Drive stream() to the point streamText is called, capturing the options object
// (which carries onStepFinish/onFinish/onError/onAbort) and the run hooks.
async function captureStreamCallbacks() {
const { svc } = makeService();
let capturedOpts: any;
streamTextMock.mockImplementation((opts: any) => {
capturedOpts = opts;
return makeStreamResult();
});
const runHooks = {
begin: jest.fn(async () => ({
runId: 'run-1',
signal: new AbortController().signal,
})),
onAssistantSeeded: jest.fn(),
onStep: jest.fn(),
onSettled: jest.fn(),
};
await svc.stream({
user: { id: 'user-1' } as never,
workspace: { id: 'ws-1' } as never,
sessionId: 'sess-1',
body: body as never,
res: makeRes() as never,
signal: new AbortController().signal,
model: {} as never,
role: null,
runHooks: runHooks as never,
});
expect(capturedOpts).toBeDefined();
return { capturedOpts, runHooks };
}
it('F9: onStepFinish bumps the run step count, onFinish settles the run "completed" (the dominant autonomous-run path)', async () => {
const { capturedOpts, runHooks } = await captureStreamCallbacks();
// A finished step -> onStep(runId, finishedStepCount).
capturedOpts.onStepFinish({ text: 'step one', toolCalls: [], content: [] });
expect(runHooks.onStep).toHaveBeenCalledWith('run-1', 1);
capturedOpts.onStepFinish({ text: 'step two', toolCalls: [], content: [] });
expect(runHooks.onStep).toHaveBeenLastCalledWith('run-1', 2);
// The success terminal callback settles the run.
await capturedOpts.onFinish({
text: 'done',
finishReason: 'stop',
totalUsage: {},
usage: {},
steps: [],
});
expect(runHooks.onSettled).toHaveBeenCalledWith('run-1', 'completed');
});
it('F9: onAbort settles the run "aborted"', async () => {
jest
.spyOn(Logger.prototype, 'warn')
.mockImplementation(() => undefined as never);
const { capturedOpts, runHooks } = await captureStreamCallbacks();
await capturedOpts.onAbort({ steps: [] });
expect(runHooks.onSettled).toHaveBeenCalledWith('run-1', 'aborted');
});
it('F9: onError settles the run "error" carrying the provider cause', async () => {
jest
.spyOn(Logger.prototype, 'error')
.mockImplementation(() => undefined as never);
jest
.spyOn(Logger.prototype, 'warn')
.mockImplementation(() => undefined as never);
const { capturedOpts, runHooks } = await captureStreamCallbacks();
await capturedOpts.onError({ error: new Error('provider exploded') });
expect(runHooks.onSettled).toHaveBeenCalledWith(
'run-1',
'error',
expect.stringContaining('provider exploded'),
);
});
});
/**
* F14 the begin-failure RESILIENCE branch (the `else` of the run-race guard).
*
* stream() wraps runHooks.begin in try/catch with TWO branches:
* - RunAlreadyActiveError -> 409 ConflictException (pinned above).
* - ANY OTHER begin failure -> SWALLOW + continue UNTRACKED on the socket signal
* (legacy fallback): it logs "...streaming without run tracking", leaves
* `effectiveSignal = signal` (runId undefined) and serves the turn anyway.
*
* The contract: a transient beginRun failure (e.g. a non-unique DB error inserting
* the run row) must STILL serve the user's turn it must NOT re-throw and must NOT
* be misclassified as a 409. A regression that re-threw here would break EVERY turn
* on a begin failure with nothing to catch it. This branch is otherwise undriven by
* any spec, so it is pinned here SEPARATELY from the 409 path: a plain begin error
* proceeds to streamText with the SOCKET signal and still persists the user turn.
*/
describe('AiChatService.stream — begin-failure resilience / legacy fallback (#184 F14)', () => {
const streamTextMock = streamText as unknown as jest.Mock;
function makeStreamResult() {
return {
consumeStream: jest.fn(),
pipeUIMessageStreamToResponse: jest.fn(),
};
}
function makeRes() {
return {
raw: {
writeHead: jest.fn(),
write: jest.fn(),
once: jest.fn(),
on: jest.fn(),
flushHeaders: jest.fn(),
writableEnded: false,
destroyed: false,
},
};
}
// Same harness as the F3 abortSignal block, but it also exposes
// aiChatMessageRepo so we can assert the user turn IS persisted (the turn really
// streamed) despite begin() blowing up.
function makeService() {
const aiChatRepo = {
findById: jest.fn(async () => ({ id: 'chat-1', workspaceId: 'ws-1' })),
insert: jest.fn(),
};
const aiChatMessageRepo = {
insert: jest.fn(async () => ({ id: 'msg-1' })),
findAllByChat: jest.fn(async () => []),
update: jest.fn(async () => ({ id: 'msg-1' })),
};
const aiSettings = { resolve: jest.fn(async () => ({})) };
const tools = { forUser: jest.fn(async () => ({})) };
const mcpClients = {
toolsFor: jest.fn(async () => ({
tools: {},
clients: [],
outcomes: [],
instructions: [],
})),
};
const svc = new AiChatService(
{} as never, // ai
aiChatRepo as never,
aiChatMessageRepo as never,
{} as never, // aiChatPageSnapshotRepo
aiSettings as never,
tools as never,
mcpClients as never,
{} as never, // aiAgentRoleRepo
{} as never, // pageRepo
{} as never, // pageAccess
{ isAiChatDeferredToolsEnabled: () => false } as never, // environment
);
return { svc, aiChatMessageRepo };
}
const body = {
chatId: 'chat-1',
messages: [
{ id: 'm1', role: 'user', parts: [{ type: 'text', text: 'hi' }] },
],
};
beforeEach(() => {
streamTextMock.mockReset();
streamTextMock.mockImplementation(() => makeStreamResult());
jest
.spyOn(Logger.prototype, 'log')
.mockImplementation(() => undefined as never);
});
afterEach(() => jest.restoreAllMocks());
it('a PLAIN begin() failure (NOT RunAlreadyActiveError) does NOT 409 — it swallows, logs, and streams the turn UNTRACKED on the socket signal', async () => {
const errorSpy = jest
.spyOn(Logger.prototype, 'error')
.mockImplementation(() => undefined as never);
const { svc, aiChatMessageRepo } = makeService();
const socketSignal = new AbortController().signal;
// A transient, NON-race begin failure (e.g. a non-unique DB error inserting
// the run row). This is the `else` branch of the begin try/catch.
const begin = jest.fn(async () => {
throw new Error('insert failed');
});
const promise = svc.stream({
user: { id: 'user-1' } as never,
workspace: { id: 'ws-1' } as never,
sessionId: 'sess-1',
body: body as never,
res: makeRes() as never,
signal: socketSignal,
model: {} as never,
role: null,
runHooks: {
begin,
onAssistantSeeded: jest.fn(),
onStep: jest.fn(),
onSettled: jest.fn(),
} as never,
});
// The turn proceeds: NO throw at all (in particular NOT a 409).
await expect(promise).resolves.toBeUndefined();
expect(begin).toHaveBeenCalledTimes(1);
// The resilience branch logged the legacy-fallback warning.
expect(errorSpy).toHaveBeenCalledWith(
expect.stringContaining('streaming without run tracking'),
expect.anything(),
);
// The turn really streamed: the user message was persisted and streamText ran.
expect(aiChatMessageRepo.insert).toHaveBeenCalled();
expect(streamTextMock).toHaveBeenCalledTimes(1);
// The decisive wiring: with no run handle, the fallback uses the SOCKET signal
// (effectiveSignal = signal, runId undefined) — not a run-bound signal.
expect(streamTextMock.mock.calls[0][0].abortSignal).toBe(socketSignal);
});
});
@@ -217,23 +217,78 @@ describe('rowToUiMessage', () => {
* a text-only synthesis answer (toolChoice 'none') with the FINAL_STEP_INSTRUCTION
* appended onto not replacing the original system prompt.
*/
// Narrowing helpers for the prepareAgentStep union return type.
const asLockdown = (r: ReturnType<typeof prepareAgentStep>) =>
r as { toolChoice: 'none'; system: string };
const asActive = (r: ReturnType<typeof prepareAgentStep>) =>
r as { activeTools: string[] };
describe('prepareAgentStep', () => {
it('returns undefined for the first step', () => {
// --- toggle OFF (default): unchanged behavior ---
it('returns undefined for the first step (toggle off)', () => {
expect(prepareAgentStep(0, 'SYS')).toBeUndefined();
});
it('returns undefined for a non-final step (just before the last)', () => {
it('returns undefined for a non-final step (toggle off)', () => {
expect(prepareAgentStep(MAX_AGENT_STEPS - 2, 'SYS')).toBeUndefined();
});
it('forces a text-only synthesis on the final allowed step', () => {
const result = prepareAgentStep(MAX_AGENT_STEPS - 1, 'SYS');
it('forces a text-only synthesis on the final allowed step (toggle off)', () => {
const result = asLockdown(prepareAgentStep(MAX_AGENT_STEPS - 1, 'SYS'));
expect(result).toBeDefined();
expect(result?.toolChoice).toBe('none');
expect(result.toolChoice).toBe('none');
// The original persona is preserved (prefix), not replaced.
expect(result?.system.startsWith('SYS')).toBe(true);
expect(result.system.startsWith('SYS')).toBe(true);
// The synthesis instruction is appended.
expect(result?.system).toContain(FINAL_STEP_INSTRUCTION);
expect(result.system).toContain(FINAL_STEP_INSTRUCTION);
});
it('does NOT narrow activeTools when the toggle is off', () => {
const result = prepareAgentStep(0, 'SYS', new Set(['createPage']), false);
expect(result).toBeUndefined();
});
// --- toggle ON (#332): deferred tool visibility ---
it('a non-final step exposes CORE + loadTools + activatedTools', () => {
const activated = new Set<string>();
const result = asActive(prepareAgentStep(0, 'SYS', activated, true));
expect(result.activeTools).toContain('searchPages'); // core
expect(result.activeTools).toContain('searchInPage'); // #330, core
expect(result.activeTools).toContain('editPageText'); // core
expect(result.activeTools).toContain('loadTools'); // meta-tool
// No deferred tool is active before it is loaded.
expect(result.activeTools).not.toContain('createPage');
expect(result.activeTools).not.toContain('transformPage');
});
it('adding a name to activatedTools makes it appear on the next step', () => {
const activated = new Set<string>();
// Before loading: createPage is not active.
expect(
asActive(prepareAgentStep(1, 'SYS', activated, true)).activeTools,
).not.toContain('createPage');
// loadTools grows the SAME set…
activated.add('createPage');
// …so the next step sees it.
const next = asActive(prepareAgentStep(2, 'SYS', activated, true));
expect(next.activeTools).toContain('createPage');
expect(next.activeTools).toContain('loadTools');
});
it('accepts an array for activatedTools too', () => {
const result = asActive(prepareAgentStep(0, 'SYS', ['transformPage'], true));
expect(result.activeTools).toContain('transformPage');
expect(result.activeTools).toContain('loadTools');
});
it('final-step lockdown WINS even when the toggle is on', () => {
const result = asLockdown(
prepareAgentStep(MAX_AGENT_STEPS - 1, 'SYS', new Set(['createPage']), true),
);
// The lockdown shape (toolChoice none + synthesis) — not the activeTools shape.
expect(result.toolChoice).toBe('none');
expect(result.system).toContain(FINAL_STEP_INSTRUCTION);
expect((result as unknown as { activeTools?: string[] }).activeTools).toBeUndefined();
});
});
@@ -398,6 +453,12 @@ describe('chatStreamMetadata', () => {
});
});
it('attaches the runId on the start part when a run wraps the turn (#184)', () => {
expect(
chatStreamMetadata({ type: 'start' }, 'chat-1', undefined, 'run-1'),
).toEqual({ chatId: 'chat-1', runId: 'run-1' });
});
it('returns the CUMULATIVE step usage passed in for the finish-step part', () => {
// finish-step usage is per-step in v6; the caller accumulates and passes the
// running sum, which this just wraps.
File diff suppressed because it is too large Load Diff
@@ -43,6 +43,30 @@ export class BoundChatDto {
pageId: string;
}
/**
* Reconnect to the latest run of a chat (#184): fetch its persisted lifecycle
* state (and the assistant message it projects) for an in-flight or finished run.
*/
export class GetRunDto {
@IsString()
chatId: string;
}
/**
* Explicitly STOP an agent run (#184): the user pressed Stop distinct from a
* browser disconnect, which never stops a run. Either the run id (preferred, from
* the streamed start metadata) or the chat id (stop whatever run is active on it).
*/
export class StopRunDto {
@IsOptional()
@IsString()
runId?: string;
@IsOptional()
@IsString()
chatId?: string;
}
/** Export a chat to Markdown (#183). `lang` localizes the few fixed
* role/tool-action labels; defaults to English server-side. */
export class ExportChatDto {
@@ -17,6 +17,10 @@ import { resolveCurrentPageResult } from './current-page.util';
import { parseNodeArg } from './parse-node-arg';
import { modelFriendlyInput } from './model-friendly-input';
import { SandboxStore } from '../../../integrations/sandbox/sandbox.store';
import {
buildInAppDeferredCatalog,
type ToolCatalogEntry,
} from './tool-tiers';
/**
* Per-user, per-request adapter that exposes Docmost READ operations to the
@@ -123,6 +127,18 @@ export class AiChatToolsService {
return client.exportPageMarkdown(pageId);
}
/**
* Build the IN-APP deferred <tool_catalog> entries (#332): one "name — purpose"
* line per DEFERRED tool, merging the per-layer INLINE_TOOL_TIERS with the
* shared registry's own catalogLine. Loads @docmost/mcp for the shared specs
* (memoized). Core tools are always active and are NOT listed here. External
* MCP tools are catalogued separately by the caller (they are runtime-scoped).
*/
async getInAppDeferredCatalog(): Promise<ToolCatalogEntry[]> {
const { sharedToolSpecs } = await loadDocmostMcp();
return buildInAppDeferredCatalog(sharedToolSpecs);
}
async forUser(
user: User,
sessionId: string,
@@ -241,6 +241,11 @@ export interface SharedToolSpec {
mcpName: string;
inAppKey: string;
description: string;
// Deferred-tool metadata (#332). Optional in this mirror so an older/stale
// @docmost/mcp build (pre-#332) still type-checks; the in-app catalog builder
// reads them defensively. The external /mcp server ignores both fields.
tier?: 'core' | 'deferred';
catalogLine?: string;
// Loose `z` on purpose: the registry is zod-agnostic so the server can pass
// its own zod (v4) and the MCP package its own (v3) into the same builder.
buildShape?: (z: any) => Record<string, unknown>;
@@ -0,0 +1,244 @@
import {
CORE_TOOL_KEYS,
CORE_TOOL_SET,
LOAD_TOOLS_NAME,
LOAD_TOOLS_DESCRIPTION,
INLINE_TOOL_TIERS,
buildInAppDeferredCatalog,
buildExternalToolCatalog,
shortenForCatalog,
applyLoadTools,
} from './tool-tiers';
// The real shared registry, imported from source (same approach as the
// SHARED_TOOL_SPECS contract spec) so the tier metadata is checked against
// exactly what @docmost/mcp ships.
import { SHARED_TOOL_SPECS } from '../../../../../../packages/mcp/src/tool-specs';
// For the live-toolset partition test (F3): the REAL adapter, so the catalog is
// checked against the tools AiChatToolsService.forUser() actually builds — not a
// static list that could drift from it.
import { AiChatToolsService } from './ai-chat-tools.service';
import * as loader from './docmost-client.loader';
import type { DocmostClientLike } from './docmost-client.loader';
/**
* #332 deferred tool loading tier metadata, catalog assembly, and the
* loadTools meta-tool. Pure units; no Nest graph, no @docmost/mcp build (the
* registry is imported from TS source).
*/
describe('tool tier metadata (#332)', () => {
it('core set is the documented 13 + searchInPage (14)', () => {
expect(CORE_TOOL_KEYS).toHaveLength(14);
expect(CORE_TOOL_SET.has('searchInPage')).toBe(true); // #330, promoted to core
// loadTools is a meta-tool, not a normal core key.
expect(CORE_TOOL_SET.has(LOAD_TOOLS_NAME)).toBe(false);
});
it('SHARED_TOOL_SPECS tier agrees with CORE_TOOL_SET for every shared tool', () => {
for (const [key, spec] of Object.entries(SHARED_TOOL_SPECS)) {
const isCoreByTier = spec.tier === 'core';
const isCoreByList = CORE_TOOL_SET.has(key);
expect(isCoreByTier).toBe(isCoreByList);
// Every spec carries a non-empty catalogLine (core tools too).
expect(typeof spec.catalogLine).toBe('string');
expect(spec.catalogLine.trim().length).toBeGreaterThan(0);
}
});
it('every INLINE tool tier agrees with CORE_TOOL_SET and has a catalogLine', () => {
for (const [key, meta] of Object.entries(INLINE_TOOL_TIERS)) {
expect(meta.tier === 'core').toBe(CORE_TOOL_SET.has(key));
expect(meta.catalogLine.trim().length).toBeGreaterThan(0);
}
});
});
describe('buildInAppDeferredCatalog (#332)', () => {
const catalog = buildInAppDeferredCatalog(SHARED_TOOL_SPECS as never);
const names = catalog.map((e) => e.name);
it('includes deferred tools from BOTH the inline map and the shared registry', () => {
expect(names).toContain('transformPage'); // inline deferred
expect(names).toContain('getPageJson'); // shared deferred
expect(names).toContain('patchNode'); // shared deferred
expect(names).toContain('createPage'); // inline deferred
});
it('NEVER lists a core tool', () => {
for (const core of CORE_TOOL_KEYS) {
expect(names).not.toContain(core);
}
// spot-check a couple that are core in each source.
expect(names).not.toContain('searchInPage'); // shared core
expect(names).not.toContain('searchPages'); // inline core
expect(names).not.toContain('editPageText'); // shared core
});
it('renders every entry as a "name — purpose" line', () => {
// Non-empty catalog (the length is pinned structurally by the live-toolset
// partition test below, not by a magic constant that rots on every new tool).
expect(catalog.length).toBeGreaterThan(0);
for (const entry of catalog) {
expect(entry.catalogLine).toMatch(/ — /);
}
});
});
/**
* F3 the deferred <tool_catalog> is built from STATIC metadata (INLINE_TOOL_TIERS
* + SHARED_TOOL_SPECS), but the loadable-by-name set is derived at RUNTIME from the
* actual toolset (`Object.keys(baseTools)` in ai-chat.service.ts). Those two must
* agree or a tool becomes loadable-but-invisible (agent thinks it doesn't exist) or
* catalogued-but-phantom. INLINE_TOOL_TIERS is a plain hand-maintained Record with
* no compile-time link to the tools AiChatToolsService.forUser() builds, so nothing
* else catches that drift. This test uses forUser()'s LIVE keys as the source of
* truth (mirroring ai-chat-tools.service.spec.ts's loader mock) and asserts a
* two-way partition against buildInAppDeferredCatalog replacing the old magic
* toHaveLength(28), so a tool added to forUser() without a catalog line (or a
* catalog line without a real tool) fails the suite instead of silently vanishing.
*/
describe('deferred catalog ↔ live forUser() toolset partition (#332, F3)', () => {
let toolKeys: string[];
const catalogNames = buildInAppDeferredCatalog(SHARED_TOOL_SPECS as never).map(
(e) => e.name,
);
beforeAll(async () => {
// Intercept the ESM loader so forUser() builds against the TS-source shared
// specs (no @docmost/mcp build) and never touches the network.
jest.spyOn(loader, 'loadDocmostMcp').mockResolvedValue({
DocmostClient: function () {
return {} as DocmostClientLike;
} as unknown as loader.DocmostClientCtor,
sharedToolSpecs: SHARED_TOOL_SPECS as Record<string, loader.SharedToolSpec>,
});
const service = new AiChatToolsService(
{
generateAccessToken: jest.fn().mockResolvedValue('access-token'),
generateCollabToken: jest.fn().mockResolvedValue('collab-token'),
} as never,
{} as never, // aiService — not exercised while merely BUILDING the tools
{} as never, // pageEmbeddingRepo
{} as never, // spaceMemberRepo
{} as never, // pagePermissionRepo
// sandboxStore: forUser() eagerly calls asSink() to wire the stash tool.
{
asSink: () => ({ put: jest.fn(), has: jest.fn(), evict: jest.fn() }),
} as never,
);
const tools = await service.forUser(
{ id: 'user-1', email: 'u@example.com', workspaceId: 'ws-1' } as never,
'session-1',
'ws-1',
'chat-1',
);
toolKeys = Object.keys(tools);
});
afterAll(() => {
jest.restoreAllMocks();
});
it('exposes a non-trivial toolset (sanity: the mock actually built tools)', () => {
expect(toolKeys.length).toBeGreaterThan(20);
});
it('every non-core live tool is present in the catalog (no capability silently hidden)', () => {
// forUser() does not itself add loadTools (ai-chat.service does), but guard
// anyway. Every remaining non-core key MUST have a catalog line.
const catalogSet = new Set(catalogNames);
const missing = toolKeys.filter(
(k) => !CORE_TOOL_SET.has(k) && k !== LOAD_TOOLS_NAME && !catalogSet.has(k),
);
expect(missing).toEqual([]);
});
it('every catalog entry corresponds to a real, non-core live tool (no phantom)', () => {
const liveSet = new Set(toolKeys);
const phantom = catalogNames.filter(
(n) => !liveSet.has(n) || CORE_TOOL_SET.has(n),
);
expect(phantom).toEqual([]);
});
});
describe('buildExternalToolCatalog + shortenForCatalog (#332)', () => {
it('derives a short "name — purpose" line from each external tool description', () => {
const catalog = buildExternalToolCatalog({
tavily_search: { description: 'Search the web for fresh results. More detail here.' },
tavily_extract: { description: '' },
});
expect(catalog).toEqual([
{ name: 'tavily_search', catalogLine: 'tavily_search — Search the web for fresh results.' },
{ name: 'tavily_extract', catalogLine: 'tavily_extract — external tool' },
]);
});
it('caps a very long description', () => {
const long = 'x'.repeat(500);
expect(shortenForCatalog(long).length).toBeLessThanOrEqual(140);
expect(shortenForCatalog(long).endsWith('…')).toBe(true);
});
});
describe('applyLoadTools (#332)', () => {
const valid = new Set(['createPage', 'transformPage', 'tavily_search']);
it('adds valid names to the activated set and returns { loaded }', () => {
const activated = new Set<string>();
const result = applyLoadTools(['createPage', 'tavily_search'], activated, valid);
expect(result).toEqual({ loaded: ['createPage', 'tavily_search'] });
expect(activated.has('createPage')).toBe(true);
expect(activated.has('tavily_search')).toBe(true);
});
it('rejects an unknown name with an error listing the valid deferred names', () => {
const activated = new Set<string>();
expect(() => applyLoadTools(['nope'], activated, valid)).toThrow(/unknown tool name/i);
try {
applyLoadTools(['nope'], activated, valid);
} catch (e) {
const msg = (e as Error).message;
// Lists every valid name (sorted).
expect(msg).toContain('createPage');
expect(msg).toContain('transformPage');
expect(msg).toContain('tavily_search');
}
// Nothing is activated on a rejected call.
expect(activated.size).toBe(0);
});
it('tolerates a non-array / empty input (loads nothing)', () => {
const activated = new Set<string>();
expect(applyLoadTools(undefined, activated, valid)).toEqual({ loaded: [] });
expect(applyLoadTools([], activated, valid)).toEqual({ loaded: [] });
expect(activated.size).toBe(0);
});
it('loadTools description is the verbatim issue text', () => {
expect(LOAD_TOOLS_DESCRIPTION).toContain('only ACTIVATES them');
expect(LOAD_TOOLS_DESCRIPTION).toContain('callable on your NEXT step');
});
});
describe('editorial "Corrector" scenario is fully served by CORE (#332)', () => {
it('read + comment + edit + search need no loadTools', () => {
// A Corrector role reads a page, searches within it, edits text, and leaves
// inline comments — every tool it needs is core, so it never has to load a
// deferred tool.
const needed = [
'getCurrentPage',
'getPage',
'searchPages',
'searchInPage',
'editPageText',
'createComment',
'listComments',
'getComment',
'resolveComment',
];
for (const t of needed) {
expect(CORE_TOOL_SET.has(t)).toBe(true);
}
});
});
@@ -0,0 +1,309 @@
import { tool, type Tool } from 'ai';
import { z } from 'zod';
import type { SharedToolSpec } from './docmost-client.loader';
/**
* Deferred tool loading for the in-app AI chat (#332).
*
* The agent otherwise sends ALL ~41 tool definitions on EVERY model call every
* step, bloating context. Instead we split the in-app tools into two tiers:
*
* - CORE (hot, always active): frequent OR tiny tools whose full schema is
* always visible, plus the `loadTools` meta-tool. Deferring a one-line tool is
* pure loss, so tiny tools stay core even if rare.
* - DEFERRED (loaded on demand): the fat/rare tools + ALL external MCP tools by
* default. The model sees only a compact <tool_catalog> (name purpose) and
* calls `loadTools(names)` to ACTIVATE a tool's full schema for the NEXT step
* (one extra round-trip on first use).
*
* This module is the single source of truth for the IN-APP tiering:
* - CORE_TOOL_KEYS / CORE_TOOL_SET the authoritative core list (used by
* prepareAgentStep to build per-step `activeTools`).
* - INLINE_TOOL_TIERS tier + catalogLine for the per-layer INLINE tools (the
* ones NOT in @docmost/mcp's SHARED_TOOL_SPECS, which carry their own).
* - buildInAppDeferredCatalog / buildExternalToolCatalog assemble the
* <tool_catalog> deferred lines.
* - applyLoadTools / makeLoadToolsTool the loadTools meta-tool.
*
* The tier/catalogLine fields on SHARED_TOOL_SPECS are IN-APP metadata only; the
* external /mcp server ignores them and exposes every tool normally.
*/
/** A single rendered <tool_catalog> line: the tool name + its "name — purpose". */
export interface ToolCatalogEntry {
/** Exact tool name the model must pass to loadTools. */
name: string;
/** Hand-written (in-app) or derived (external) "name — purpose" line. */
catalogLine: string;
}
/**
* CORE (always-active) in-app tool keys 13 frequent/tiny tools. `searchInPage`
* (#330) is added to core on top of the issue's original tier list: it is
* frequent for the editorial roles this feature targets. `loadTools` is active
* too but is not a normal tool key (it is added to activeTools separately).
*/
export const CORE_TOOL_KEYS = [
'searchPages',
'listPages',
'listSpaces',
'getWorkspace',
'getCurrentPage',
'getPage',
'getOutline',
'getNode',
'createComment',
'getComment',
'listComments',
'resolveComment',
'editPageText',
// #330 search_in_page — frequent for editorial sweeps; core despite predating
// the issue's tier list.
'searchInPage',
] as const;
/** O(1) membership test for the core tier. */
export const CORE_TOOL_SET: ReadonlySet<string> = new Set(CORE_TOOL_KEYS);
/** The meta-tool name (always active alongside the core tools when enabled). */
export const LOAD_TOOLS_NAME = 'loadTools';
/**
* loadTools description VERBATIM from issue #332. Tells the model that the
* catalog names EXIST, that loadTools only ACTIVATES them (callable next step),
* and to load several at once.
*/
export const LOAD_TOOLS_DESCRIPTION =
'loadTools — Load the full definitions of deferred tools from the <tool_catalog>\n' +
'block in your instructions. Pass the EXACT tool names from the catalog; this\n' +
'call only ACTIVATES them and returns { loaded: [...] } — the tools become\n' +
'callable on your NEXT step. Load several names in one call when the task clearly\n' +
'needs them. Unknown names are rejected with the list of valid ones.';
/**
* Tier + catalogLine for the INLINE ai-chat tools those defined per-layer in
* ai-chat-tools.service.ts and NOT present in @docmost/mcp's SHARED_TOOL_SPECS
* (which carries its own tier/catalogLine). Together with the shared registry
* this describes every in-app tool. catalogLine is present for core tools too
* (uniformity), but only DEFERRED tools are rendered into the catalog.
*/
export const INLINE_TOOL_TIERS: Record<
string,
{ tier: 'core' | 'deferred'; catalogLine: string }
> = {
// --- core inline ---
searchPages: {
tier: 'core',
catalogLine: 'searchPages — hybrid semantic + keyword search across the wiki.',
},
getCurrentPage: {
tier: 'core',
catalogLine: 'getCurrentPage — the page the user is currently viewing.',
},
getPage: {
tier: 'core',
catalogLine: 'getPage — fetch a page as Markdown by its id.',
},
listPages: {
tier: 'core',
catalogLine: "listPages — list recent pages, or a space's full page tree.",
},
listComments: {
tier: 'core',
catalogLine: 'listComments — list all comments on a page (including resolved).',
},
getComment: {
tier: 'core',
catalogLine: 'getComment — fetch a single comment by id.',
},
createComment: {
tier: 'core',
catalogLine:
'createComment — add an inline comment (optionally with a suggested edit).',
},
resolveComment: {
tier: 'core',
catalogLine: 'resolveComment — resolve or reopen a comment thread.',
},
// --- deferred inline ---
createPage: {
tier: 'deferred',
catalogLine: 'createPage — create a new page with a Markdown body in a space.',
},
updatePageContent: {
tier: 'deferred',
catalogLine:
"updatePageContent — replace a page's body (and optionally title) with new Markdown.",
},
renamePage: {
tier: 'deferred',
catalogLine: "renamePage — change a page's title only (body untouched).",
},
movePage: {
tier: 'deferred',
catalogLine: 'movePage — move a page under a new parent or to the space root.',
},
deletePage: {
tier: 'deferred',
catalogLine: 'deletePage — move a page to trash (soft delete, reversible).',
},
listSidebarPages: {
tier: 'deferred',
catalogLine:
"listSidebarPages — list a space's root pages or a page's direct children.",
},
getTable: {
tier: 'deferred',
catalogLine: 'getTable — read a table as a matrix of cell texts and cell ids.',
},
checkNewComments: {
tier: 'deferred',
catalogLine:
'checkNewComments — find comments in a space created after a timestamp.',
},
getPageHistory: {
tier: 'deferred',
catalogLine:
'getPageHistory — fetch one page-history version with its ProseMirror content.',
},
exportPageMarkdown: {
tier: 'deferred',
catalogLine:
'exportPageMarkdown — export a page to self-contained Markdown (body + comments).',
},
updatePageJson: {
tier: 'deferred',
catalogLine:
"updatePageJson — overwrite a page's body with a full ProseMirror document.",
},
tableInsertRow: {
tier: 'deferred',
catalogLine: 'tableInsertRow — insert a row of plain-text cells into a table.',
},
tableDeleteRow: {
tier: 'deferred',
catalogLine: 'tableDeleteRow — delete a table row at a 0-based index.',
},
tableUpdateCell: {
tier: 'deferred',
catalogLine: 'tableUpdateCell — set the text of a table cell at [row, col].',
},
sharePage: {
tier: 'deferred',
catalogLine: 'sharePage — make a page publicly accessible and return its URL.',
},
transformPage: {
tier: 'deferred',
catalogLine: "transformPage — run a sandboxed JS transform over a page's document.",
},
};
/**
* Build the <tool_catalog> deferred lines for the IN-APP tools by merging the
* two metadata sources: the per-layer INLINE_TOOL_TIERS and the shared registry
* (SHARED_TOOL_SPECS, loaded at runtime). Only DEFERRED tools are included; core
* tools are always active and never appear in the catalog. Pure the caller
* passes the loaded specs so this stays unit-testable.
*/
export function buildInAppDeferredCatalog(
sharedToolSpecs: Record<string, SharedToolSpec>,
): ToolCatalogEntry[] {
const entries: ToolCatalogEntry[] = [];
// Inline deferred tools (hand-written lines).
for (const [name, meta] of Object.entries(INLINE_TOOL_TIERS)) {
if (meta.tier === 'deferred') {
entries.push({ name, catalogLine: meta.catalogLine });
}
}
// Shared deferred tools (line comes from the registry's own catalogLine).
for (const [name, spec] of Object.entries(sharedToolSpecs)) {
if (spec.tier === 'deferred' && spec.catalogLine) {
entries.push({ name, catalogLine: spec.catalogLine });
}
}
return entries;
}
/**
* Cap an external tool's (untrusted) description into a short catalog purpose.
* External MCP tools have no hand-written catalogLine, so we derive one from the
* first sentence of the description, hard-capped. Whitespace is collapsed.
*/
export function shortenForCatalog(description: string, max = 140): string {
const flat = description.replace(/\s+/g, ' ').trim();
if (!flat) return 'external tool';
// Prefer the first sentence if it is reasonably short.
const firstSentence = flat.split(/(?<=[.!?])\s/)[0];
const base =
firstSentence.length > 0 && firstSentence.length <= max
? firstSentence
: flat;
return base.length > max ? `${base.slice(0, max - 1).trimEnd()}` : base;
}
/**
* Build catalog lines for the EXTERNAL MCP tools (all deferred by default,
* #332). Their names are the namespaced tool keys; the purpose is derived from
* each tool's own description (no hand-written line exists). Pure.
*/
export function buildExternalToolCatalog(
externalTools: Record<string, { description?: string } | undefined>,
): ToolCatalogEntry[] {
return Object.entries(externalTools).map(([name, t]) => ({
name,
catalogLine: `${name}${shortenForCatalog(t?.description ?? '')}`,
}));
}
/**
* Pure core of the loadTools meta-tool. Validates the requested names against
* the per-turn set of valid deferred names, ADDS the valid ones to the caller's
* mutable `activatedTools` set (so they become callable next step), and returns
* `{ loaded }`. An unknown name throws a clear error listing the valid deferred
* names surfaced to the model as a tool error so it can retry.
*/
export function applyLoadTools(
names: unknown,
activatedTools: Set<string>,
validDeferredNames: ReadonlySet<string>,
): { loaded: string[] } {
const requested = Array.isArray(names)
? names.filter((n): n is string => typeof n === 'string')
: [];
const unknown = requested.filter((n) => !validDeferredNames.has(n));
if (unknown.length > 0) {
const valid = [...validDeferredNames].sort().join(', ');
throw new Error(
`loadTools: unknown tool name(s): ${unknown.join(', ')}. ` +
`Valid deferred tools are: ${valid || '(none)'}.`,
);
}
for (const n of requested) activatedTools.add(n);
return { loaded: requested };
}
/**
* Build the loadTools AI-SDK tool bound to THIS turn's mutable state: the
* `activatedTools` set (grown by execute, read by prepareAgentStep next step)
* and the `validDeferredNames` set (every non-core tool in this turn's toolset,
* incl. external MCP). Created per streamText call never module-global.
*/
export function makeLoadToolsTool(
activatedTools: Set<string>,
validDeferredNames: ReadonlySet<string>,
): Tool {
return tool({
description: LOAD_TOOLS_DESCRIPTION,
inputSchema: z.object({
names: z
.array(z.string())
.describe(
'EXACT deferred tool names from the <tool_catalog> to activate for ' +
'your next step.',
),
}),
execute: async ({ names }) =>
applyLoadTools(names, activatedTools, validDeferredNames),
});
}
@@ -55,6 +55,14 @@ export class UpdateWorkspaceDto extends PartialType(CreateWorkspaceDto) {
@IsBoolean()
aiDictationStreaming: boolean;
// #184: detached/autonomous agent runs (settings.ai.autonomousRuns). When on, a
// chat turn becomes a server-side RUN that survives a browser disconnect; only
// an explicit /ai-chat/stop ends it. Off by default; single-instance-only in
// phase 1 (see AiChatRunService.warnIfMultiInstance / AGENTS.md).
@IsOptional()
@IsBoolean()
autonomousRuns: boolean;
// Workspace master toggle that enables/disables the HTML embed block type.
// Persisted at settings.htmlEmbed. ABSENT/false => OFF (default). The block
// itself renders in a sandboxed iframe, so this is a feature switch, not a
@@ -526,6 +526,20 @@ export class WorkspaceService {
);
}
if (typeof updateWorkspaceDto.autonomousRuns !== 'undefined') {
const prev = settingsBefore?.ai?.autonomousRuns ?? false;
if (prev !== updateWorkspaceDto.autonomousRuns) {
before.autonomousRuns = prev;
after.autonomousRuns = updateWorkspaceDto.autonomousRuns;
}
await this.workspaceRepo.updateAiSettings(
workspaceId,
'autonomousRuns',
updateWorkspaceDto.autonomousRuns,
trx,
);
}
if (typeof updateWorkspaceDto.htmlEmbed !== 'undefined') {
const prev = settingsBefore?.htmlEmbed ?? false;
if (prev !== updateWorkspaceDto.htmlEmbed) {
@@ -579,6 +593,7 @@ export class WorkspaceService {
delete updateWorkspaceDto.aiChat;
delete updateWorkspaceDto.aiDictation;
delete updateWorkspaceDto.aiDictationStreaming;
delete updateWorkspaceDto.autonomousRuns;
delete updateWorkspaceDto.htmlEmbed;
delete updateWorkspaceDto.trackerHead;
delete updateWorkspaceDto.aiPublicShareAssistant;
@@ -31,6 +31,7 @@ import { FavoriteRepo } from '@docmost/db/repos/favorite/favorite.repo';
import { TemplateRepo } from '@docmost/db/repos/template/template.repo';
import { AiChatRepo } from '@docmost/db/repos/ai-chat/ai-chat.repo';
import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.repo';
import { AiChatRunRepo } from '@docmost/db/repos/ai-chat/ai-chat-run.repo';
import { AiChatPageSnapshotRepo } from '@docmost/db/repos/ai-chat/ai-chat-page-snapshot.repo';
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
import { AiMcpServerRepo } from '@docmost/db/repos/ai-chat/ai-mcp-server.repo';
@@ -105,6 +106,7 @@ import { normalizePostgresUrl } from '../common/helpers';
TemplateRepo,
AiChatRepo,
AiChatMessageRepo,
AiChatRunRepo,
AiChatPageSnapshotRepo,
AiProviderCredentialsRepo,
AiMcpServerRepo,
@@ -139,6 +141,7 @@ import { normalizePostgresUrl } from '../common/helpers';
TemplateRepo,
AiChatRepo,
AiChatMessageRepo,
AiChatRunRepo,
AiChatPageSnapshotRepo,
AiProviderCredentialsRepo,
AiMcpServerRepo,
@@ -0,0 +1,106 @@
import { type Kysely, sql } from 'kysely';
/**
* `ai_chat_runs` the agent RUN as a first-class, server-side lifecycle object
* (#184 phase 1: autonomous agent runs detached from the browser window).
*
* Until now an agent turn lived ONLY as long as the HTTP request was open
* (`res.hijack()` in ai-chat.controller.ts); a browser disconnect aborted it.
* This table makes a turn a persistent object the server owns: it is created
* when a run starts (inserted directly as 'running' in phase 1 'pending' is
* only this column's default + a reserved value, never written by code yet) and
* advances to succeeded|failed|aborted, surviving the subscriber (browser) going
* away when it settles. The DB is the source of
* truth a later client reconnects/sees the result by reading this row plus the
* assistant message it projects (`assistant_message_id`).
*
* The assistant message row (#183 step-granular durability) is the PROJECTION of
* a run's output; this row is the run's LIFECYCLE. They are linked by
* `assistant_message_id` (SET NULL if the message is later pruned).
*
* `status` : 'pending' | 'running' | 'succeeded' | 'failed' | 'aborted'.
* `trigger` : 'user' | 'autostart' | 'schedule' | 'api' | 'continue' only
* 'user' is produced in phase 1; the others are reserved for the
* autonomy triggers deferred to phase 2 so they need no later
* migration.
*
* ONE ACTIVE RUN PER CHAT is enforced by a partial unique index on `chat_id`
* WHERE status IN ('pending','running'): an autonomous run and a user run can
* never trample each other on the same chat. Settled runs (succeeded/failed/
* aborted) are excluded from the index so a chat can accumulate any number of
* historical runs.
*/
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.createTable('ai_chat_runs')
.ifNotExists()
.addColumn('id', 'uuid', (col) =>
col.primaryKey().defaultTo(sql`gen_uuid_v7()`),
)
.addColumn('chat_id', 'uuid', (col) =>
col.references('ai_chats.id').onDelete('cascade').notNull(),
)
.addColumn('workspace_id', 'uuid', (col) =>
col.references('workspaces.id').onDelete('cascade').notNull(),
)
// The human who triggered the run (audit). SET NULL on user deletion so the
// run history outlives its author; NULL is also the natural value for a
// future system/cron/api trigger with no human actor.
.addColumn('created_by', 'uuid', (col) =>
col.references('users.id').onDelete('set null'),
)
// The assistant message this run materializes (the #183 projection). SET NULL
// if that message row is later deleted; nullable because the run row is
// created a moment BEFORE the assistant row is seeded.
.addColumn('assistant_message_id', 'uuid', (col) =>
col.references('ai_chat_messages.id').onDelete('set null'),
)
.addColumn('trigger', 'varchar(20)', (col) =>
col.notNull().defaultTo('user'),
)
.addColumn('status', 'varchar(20)', (col) =>
col.notNull().defaultTo('pending'),
)
// Terminal error message for a failed run (provider/transport cause),
// mirroring the assistant message's metadata.error.
.addColumn('error', 'text', (col) => col)
// Number of agent steps finished so far (kept monotonic with the projection).
.addColumn('step_count', 'integer', (col) => col.notNull().defaultTo(0))
// Set when an EXPLICIT user stop is requested (distinct from a mere browser
// disconnect, which never stops a run). The runner aborts the turn and the
// run settles as 'aborted'.
.addColumn('stop_requested_at', 'timestamptz', (col) => col)
.addColumn('started_at', 'timestamptz', (col) => col)
.addColumn('finished_at', 'timestamptz', (col) => col)
.addColumn('created_at', 'timestamptz', (col) =>
col.notNull().defaultTo(sql`now()`),
)
.addColumn('updated_at', 'timestamptz', (col) =>
col.notNull().defaultTo(sql`now()`),
)
.execute();
// Reconnect / "latest run for this chat" reads hit chat_id first.
await db.schema
.createIndex('ai_chat_runs_chat_id_idx')
.ifNotExists()
.on('ai_chat_runs')
.column('chat_id')
.execute();
// One ACTIVE run per chat (advisory at the DB level): a second pending/running
// run on the same chat is rejected, so a user turn and an autonomous turn can
// never race on the same chat. Partial so settled runs do not collide.
await db.schema
.createIndex('ai_chat_runs_one_active_per_chat')
.ifNotExists()
.on('ai_chat_runs')
.column('chat_id')
.unique()
.where(sql.ref('status'), 'in', sql`('pending','running')`)
.execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropTable('ai_chat_runs').execute();
}
@@ -121,6 +121,23 @@ export class AiChatMessageRepo {
return rows.reverse();
}
/** Fetch a single message by id + workspace (e.g. a run's projection row for
* the #184 reconnect read). Returns undefined when nothing matches. */
async findById(
id: string,
workspaceId: string,
trx?: KyselyTransaction,
): Promise<AiChatMessage | undefined> {
const db = dbOrTx(this.db, trx);
return db
.selectFrom('aiChatMessages')
.select(this.baseFields)
.where('id', '=', id)
.where('workspaceId', '=', workspaceId)
.where('deletedAt', 'is', null)
.executeTakeFirst();
}
async insert(
insertable: InsertableAiChatMessage,
trx?: KyselyTransaction,
@@ -0,0 +1,82 @@
import { AiChatRunRepo, SWEEP_RUN_STALE_MS } from './ai-chat-run.repo';
import type { KyselyDB } from '../../types/kysely.types';
/**
* Unit coverage for AiChatRunRepo.sweepRunning over a chainable builder mock (no
* live DB). The F1 invariant under test (DECISION C): the BOOT sweep is
* UNCONDITIONAL it adds NO `updatedAt <` predicate, so a fresh 'running' run
* (updatedAt = now) IS settled rather than skipped by a staleness window. The
* window is added ONLY when an explicit `staleMs` is supplied (the future phase-2
* multi-instance timer sweep). We assert the EXACT predicates the spec mandates.
*/
describe('AiChatRunRepo.sweepRunning', () => {
type Recorded = {
table?: string;
set?: Record<string, unknown>;
wheres: Array<[string, string, unknown]>;
returning?: string;
};
function makeDb(swept: Array<{ id: string }>): {
db: KyselyDB;
rec: Recorded;
} {
const rec: Recorded = { wheres: [] };
const builder: Record<string, unknown> = {};
builder.set = (v: Record<string, unknown>) => {
rec.set = v;
return builder;
};
builder.where = (col: string, op: string, val: unknown) => {
rec.wheres.push([col, op, val]);
return builder;
};
builder.returning = (col: string) => {
rec.returning = col;
return builder;
};
builder.execute = () => Promise.resolve(swept);
const db = {
updateTable: (table: string) => {
rec.table = table;
return builder;
},
} as unknown as KyselyDB;
return { db, rec };
}
it('F1: the boot sweep (no staleMs) is UNCONDITIONAL — only a status filter, NO updatedAt window', async () => {
const { db, rec } = makeDb([{ id: 'r1' }, { id: 'r2' }]);
const repo = new AiChatRunRepo(db);
const swept = await repo.sweepRunning();
expect(swept).toBe(2);
expect(rec.table).toBe('aiChatRuns');
// The status filter is always present...
expect(rec.wheres).toContainEqual([
'status',
'in',
expect.arrayContaining(['pending', 'running']),
]);
// ...but a fresh 'running' run (updatedAt = now) must NOT be skipped: no
// updatedAt predicate at all on the boot path.
expect(rec.wheres.some(([col]) => col === 'updatedAt')).toBe(false);
// It flips to 'aborted' and stamps finishedAt.
expect(rec.set).toEqual(
expect.objectContaining({ status: 'aborted', finishedAt: expect.any(Date) }),
);
});
it('phase-2 path: an explicit staleMs reintroduces the updatedAt window', async () => {
const { db, rec } = makeDb([]);
const repo = new AiChatRunRepo(db);
await repo.sweepRunning({ staleMs: SWEEP_RUN_STALE_MS });
const updatedAtWhere = rec.wheres.find(([col]) => col === 'updatedAt');
expect(updatedAtWhere).toBeDefined();
expect(updatedAtWhere![1]).toBe('<');
expect(updatedAtWhere![2]).toBeInstanceOf(Date);
});
});
@@ -0,0 +1,212 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectKysely } from 'nestjs-kysely';
import { sql } from 'kysely';
import { KyselyDB, KyselyTransaction } from '../../types/kysely.types';
import { dbOrTx } from '../../utils';
import {
AiChatRun,
InsertableAiChatRun,
} from '@docmost/db/types/entity.types';
// Statuses that count as "the run is still live" (an autonomous and a user run
// must never both be live on one chat — enforced by the partial unique index and
// checked here for friendly 409s before the insert races the constraint).
export const ACTIVE_RUN_STATUSES = ['pending', 'running'] as const;
// Crash-recovery sweep recency threshold (mirrors AiChatMessageRepo.sweepStreaming,
// #183): when a staleness window is supplied, a 'running'/'pending' run is only
// swept to 'aborted' once it has been UNTOUCHED for this long, so a sibling
// replica's boot-sweep can never abort a run another replica is actively
// executing. The runner bumps `updatedAt` on every step, so a live run never
// matches. PHASE 1 is single-process and the boot sweep passes NO window (every
// dangling run is settled unconditionally — see sweepRunning / F1). This constant
// is the window to reintroduce for the phase-2 multi-instance timer sweep.
export const SWEEP_RUN_STALE_MS = 10 * 60 * 1000; // 10 minutes
/**
* Repository for `ai_chat_runs` (#184 phase 1): the agent run as a first-class,
* server-side lifecycle object detached from the HTTP request. The run row is the
* point a client subscribes/reconnects to (by `id` or by chat); the assistant
* message it links to (`assistantMessageId`) is the #183 projection of its output.
*/
@Injectable()
export class AiChatRunRepo {
private readonly logger = new Logger(AiChatRunRepo.name);
private baseFields: Array<keyof AiChatRun> = [
'id',
'chatId',
'workspaceId',
'createdBy',
'assistantMessageId',
'trigger',
'status',
'error',
'stepCount',
'stopRequestedAt',
'startedAt',
'finishedAt',
'createdAt',
'updatedAt',
];
constructor(@InjectKysely() private readonly db: KyselyDB) {}
async insert(
insertable: InsertableAiChatRun,
trx?: KyselyTransaction,
): Promise<AiChatRun> {
const db = dbOrTx(this.db, trx);
return db
.insertInto('aiChatRuns')
.values(insertable)
.returning(this.baseFields)
.executeTakeFirst();
}
async findById(
id: string,
workspaceId: string,
trx?: KyselyTransaction,
): Promise<AiChatRun | undefined> {
const db = dbOrTx(this.db, trx);
return db
.selectFrom('aiChatRuns')
.select(this.baseFields)
.where('id', '=', id)
.where('workspaceId', '=', workspaceId)
.executeTakeFirst();
}
/** The currently-active (pending|running) run for a chat, if any. At most one
* exists thanks to the partial unique index. */
async findActiveByChat(
chatId: string,
workspaceId: string,
trx?: KyselyTransaction,
): Promise<AiChatRun | undefined> {
const db = dbOrTx(this.db, trx);
return db
.selectFrom('aiChatRuns')
.select(this.baseFields)
.where('chatId', '=', chatId)
.where('workspaceId', '=', workspaceId)
.where('status', 'in', ACTIVE_RUN_STATUSES as unknown as string[])
.executeTakeFirst();
}
/** The most-recent run for a chat (active or settled) — the reconnect target. */
async findLatestByChat(
chatId: string,
workspaceId: string,
trx?: KyselyTransaction,
): Promise<AiChatRun | undefined> {
const db = dbOrTx(this.db, trx);
return db
.selectFrom('aiChatRuns')
.select(this.baseFields)
.where('chatId', '=', chatId)
.where('workspaceId', '=', workspaceId)
.orderBy('createdAt', 'desc')
.orderBy('id', 'desc')
.limit(1)
.executeTakeFirst();
}
/**
* Patch a run by id + workspace; always bumps `updatedAt`. Used for every
* lifecycle transition (mark running, link the assistant message, bump
* step_count, finalize succeeded/failed/aborted). Returns the updated row or
* undefined when nothing matched (e.g. a foreign workspace).
*/
async update(
id: string,
workspaceId: string,
patch: Partial<{
status: string;
error: string | null;
stepCount: number;
assistantMessageId: string | null;
stopRequestedAt: Date | null;
startedAt: Date | null;
finishedAt: Date | null;
}>,
trx?: KyselyTransaction,
): Promise<AiChatRun | undefined> {
const db = dbOrTx(this.db, trx);
return db
.updateTable('aiChatRuns')
.set({ ...(patch as Record<string, unknown>), updatedAt: new Date() })
.where('id', '=', id)
.where('workspaceId', '=', workspaceId)
.returning(this.baseFields)
.executeTakeFirst();
}
/**
* Mark an EXPLICIT stop request on an active run (distinct from a browser
* disconnect, which never stops a run). Stamps `stop_requested_at` ONLY while
* the run is still active, so a late stop on an already-settled run is a no-op.
* Returns the row when a stop was recorded, else undefined (nothing active).
*/
async markStopRequested(
id: string,
workspaceId: string,
trx?: KyselyTransaction,
): Promise<AiChatRun | undefined> {
const db = dbOrTx(this.db, trx);
return db
.updateTable('aiChatRuns')
.set({ stopRequestedAt: new Date(), updatedAt: new Date() })
.where('id', '=', id)
.where('workspaceId', '=', workspaceId)
.where('status', 'in', ACTIVE_RUN_STATUSES as unknown as string[])
.returning(this.baseFields)
.executeTakeFirst();
}
/**
* Crash-recovery sweep (mirrors AiChatMessageRepo.sweepStreaming): flip every
* run still left pending/running a run whose process died before reaching a
* terminal status to 'aborted', stamping `finished_at`. Returns the number
* swept. Workspace-wide on purpose (a crash can dangle runs in any workspace).
*
* F1 (DECISION C): the BOOT sweep is UNCONDITIONAL it passes no `staleMs`, so
* EVERY dangling run is settled regardless of how recently it was touched. On a
* fresh single-process boot any pending|running run is definitionally hung (no
* runner is alive to own it), so a fast restart (deploy/OOM within minutes of
* the last step) no longer leaves a run stuck 'running' forever which would
* make the one-active-run gate 409 every future turn in that chat.
*
* The optional `staleMs` window is reintroduced ONLY for the future phase-2
* multi-instance timer sweep (see {@link SWEEP_RUN_STALE_MS}): there a booting
* replica must NOT abort a run another replica is actively executing, so it
* sweeps only runs UNTOUCHED past the window. Phase 1 is single-process, so the
* boot path supplies no window.
*/
async sweepRunning(
opts: { staleMs?: number } = {},
trx?: KyselyTransaction,
): Promise<number> {
const db = dbOrTx(this.db, trx);
const now = new Date();
let query = db
.updateTable('aiChatRuns')
.set({
status: 'aborted',
finishedAt: now,
updatedAt: now,
error: sql`coalesce(error, ${'Run interrupted by a server restart.'})`,
})
.where('status', 'in', ACTIVE_RUN_STATUSES as unknown as string[]);
// Multi-instance (phase 2) only: skip runs touched within the window so a
// sibling replica's live run is never aborted. Omitted on the phase-1 boot
// sweep -> unconditional.
if (typeof opts.staleMs === 'number') {
const staleBefore = new Date(now.getTime() - opts.staleMs);
query = query.where('updatedAt', '<', staleBefore);
}
const rows = await query.returning('id').execute();
return rows.length;
}
}
+30
View File
@@ -647,6 +647,35 @@ export interface AiChatMessages {
deletedAt: Timestamp | null;
}
// The agent RUN as a first-class server-side lifecycle object (#184 phase 1).
// Mirrors migration 20260627T130000-ai-chat-runs.ts. A run is created when an
// agent turn starts and survives the browser disconnecting; the DB is the source
// of truth a later client reconnects to. `assistantMessageId` links to the #183
// projection row (the assistant message this run materializes).
export interface AiChatRuns {
id: Generated<string>;
chatId: string;
workspaceId: string;
// SET NULL on user deletion (the run history outlives its author); also NULL
// for a future non-human trigger (cron/api).
createdBy: string | null;
// The assistant message this run materializes; SET NULL if it is pruned.
assistantMessageId: string | null;
// 'user' | 'autostart' | 'schedule' | 'api' | 'continue' (only 'user' is
// produced in phase 1; the rest are reserved for the deferred autonomy triggers).
trigger: Generated<string>;
// 'pending' | 'running' | 'succeeded' | 'failed' | 'aborted'.
status: Generated<string>;
error: string | null;
stepCount: Generated<number>;
// Set when an EXPLICIT user stop is requested (distinct from a disconnect).
stopRequestedAt: Timestamp | null;
startedAt: Timestamp | null;
finishedAt: Timestamp | null;
createdAt: Generated<Timestamp>;
updatedAt: Generated<Timestamp>;
}
// Per-(chat,page) snapshot of the open page's Markdown at the END of the agent's
// previous turn (#274). Mirrors migration 20260702T120000-ai-chat-page-snapshot.ts.
// The next turn diffs the CURRENT Markdown against `contentMd` to surface edits a
@@ -683,6 +712,7 @@ export interface DB {
aiAgentRoles: AiAgentRoles;
aiChats: AiChats;
aiChatMessages: AiChatMessages;
aiChatRuns: AiChatRuns;
aiChatPageSnapshots: AiChatPageSnapshots;
apiKeys: ApiKeys;
attachments: Attachments;
+15 -7
View File
@@ -3,6 +3,7 @@ import {
AiAgentRoles,
AiChats,
AiChatMessages,
AiChatRuns,
AiChatPageSnapshots,
Attachments,
Comments,
@@ -56,10 +57,12 @@ export type UpdatableAiChat = Updateable<Omit<AiChats, 'id'>>;
// full-text search. It is omitted from the public type so it never leaks
// into HTTP responses or the chat history fed to the language model.
export type AiChatMessage = Omit<Selectable<AiChatMessages>, 'tsv'>;
export type InsertableAiChatMessage = Omit<
Insertable<AiChatMessages>,
'tsv'
>;
export type InsertableAiChatMessage = Omit<Insertable<AiChatMessages>, 'tsv'>;
// AI Chat Run (#184 phase 1): the agent run as a first-class lifecycle object,
// detached from the HTTP request / browser window.
export type AiChatRun = Selectable<AiChatRuns>;
export type InsertableAiChatRun = Insertable<AiChatRuns>;
// AI Chat Page Snapshot (#274): per-(chat,page) Markdown snapshot taken at the
// end of the agent's previous turn, diffed against the current page next turn to
@@ -214,11 +217,14 @@ export type UpdatableFavorite = Updateable<Omit<Favorites, 'id'>>;
// Page Transclusion
export type PageTransclusion = Selectable<PageTransclusions>;
export type InsertablePageTransclusion = Insertable<PageTransclusions>;
export type UpdatablePageTransclusion = Updateable<Omit<PageTransclusions, 'id'>>;
export type UpdatablePageTransclusion = Updateable<
Omit<PageTransclusions, 'id'>
>;
// Page Transclusion Reference
export type PageTransclusionReference = Selectable<PageTransclusionReferences>;
export type InsertablePageTransclusionReference = Insertable<PageTransclusionReferences>;
export type InsertablePageTransclusionReference =
Insertable<PageTransclusionReferences>;
export type UpdatablePageTransclusionReference = Updateable<
Omit<PageTransclusionReferences, 'id'>
>;
@@ -288,7 +294,9 @@ export type UpdatablePagePermission = Updateable<Omit<_PagePermissions, 'id'>>;
// Page Verification
export type PageVerification = Selectable<_PageVerifications>;
export type InsertablePageVerification = Insertable<_PageVerifications>;
export type UpdatablePageVerification = Updateable<Omit<_PageVerifications, 'id'>>;
export type UpdatablePageVerification = Updateable<
Omit<_PageVerifications, 'id'>
>;
// Page Verifier
export type PageVerifier = Selectable<_PageVerifiers>;
@@ -261,6 +261,21 @@ export class EnvironmentService {
return disable === 'true';
}
/**
* Deferred tool loading for the in-app AI chat (#332). When enabled, the agent
* sees a compact <tool_catalog> and only CORE tools + the loadTools meta-tool
* are active each step; deferred tools (the fat/rare ones + all external MCP
* tools) load on demand. Defaults to ENABLED the issue treats deferred
* loading as the new behavior; set AI_CHAT_DEFERRED_TOOLS=false to restore the
* old "all tools always active" behavior.
*/
isAiChatDeferredToolsEnabled(): boolean {
const enabled = this.configService
.get<string>('AI_CHAT_DEFERRED_TOOLS', 'true')
.toLowerCase();
return enabled === 'true';
}
getPostHogHost(): string {
return this.configService.get<string>('POSTHOG_HOST');
}
@@ -0,0 +1,304 @@
import { Kysely } from 'kysely';
import {
AiChatRunRepo,
SWEEP_RUN_STALE_MS,
} from '@docmost/db/repos/ai-chat/ai-chat-run.repo';
import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.repo';
import { AiChatRunService } from '../../src/core/ai-chat/ai-chat-run.service';
import {
getTestDb,
destroyTestDb,
createWorkspace,
createUser,
createChat,
} from './db';
/**
* Integration coverage for the #184 phase-1 durable agent run: real SQL against
* docmost_test. Proves the core invariant primitives a run is a first-class
* lifecycle row, at most one is active per chat, a detached run's progress
* survives with NO subscriber, an explicit stop settles it as aborted, a
* reconnect read returns the persisted state, and a crash sweep recovers
* dangling runs.
*/
describe('AiChatRun durable lifecycle [integration]', () => {
let db: Kysely<any>;
let runRepo: AiChatRunRepo;
let messageRepo: AiChatMessageRepo;
let service: AiChatRunService;
let workspaceId: string;
let otherWorkspaceId: string;
let userId: string;
let chatId: string;
beforeAll(async () => {
db = getTestDb();
runRepo = new AiChatRunRepo(db as any);
messageRepo = new AiChatMessageRepo(db as any);
// Boot-sweep isn't triggered here; the isCloud stub is all the service needs
// for these direct-call integration cases (F7).
service = new AiChatRunService(runRepo, { isCloud: () => false } as never);
workspaceId = (await createWorkspace(db)).id;
otherWorkspaceId = (await createWorkspace(db)).id;
userId = (await createUser(db, workspaceId)).id;
chatId = (await createChat(db, { workspaceId, creatorId: userId })).id;
});
afterAll(async () => {
await destroyTestDb();
});
// Each test that creates an active run settles it (or uses its own chat) so the
// partial unique index does not bleed across tests.
it('insert + findById round-trips a run row, defaulting status/trigger', async () => {
const run = await runRepo.insert({
chatId,
workspaceId,
createdBy: userId,
});
expect(run.status).toBe('pending');
expect(run.trigger).toBe('user');
expect(run.stepCount).toBe(0);
const found = await runRepo.findById(run.id, workspaceId);
expect(found!.id).toBe(run.id);
// Workspace-scoped: a foreign workspace sees nothing.
expect(await runRepo.findById(run.id, otherWorkspaceId)).toBeUndefined();
// settle so it does not occupy the active slot
await runRepo.update(run.id, workspaceId, {
status: 'succeeded',
finishedAt: new Date(),
});
});
it('enforces ONE ACTIVE run per chat (partial unique index rejects a second)', async () => {
const activeChat = (
await createChat(db, { workspaceId, creatorId: userId })
).id;
const first = await runRepo.insert({
chatId: activeChat,
workspaceId,
createdBy: userId,
status: 'running',
});
// A second pending/running run on the SAME chat must be rejected by the DB.
await expect(
runRepo.insert({
chatId: activeChat,
workspaceId,
createdBy: userId,
status: 'running',
}),
).rejects.toThrow();
// findActiveByChat returns exactly the one active run.
const active = await runRepo.findActiveByChat(activeChat, workspaceId);
expect(active!.id).toBe(first.id);
// Once it settles, the slot frees and a new run may start.
await runRepo.update(first.id, workspaceId, {
status: 'succeeded',
finishedAt: new Date(),
});
expect(
await runRepo.findActiveByChat(activeChat, workspaceId),
).toBeUndefined();
const second = await runRepo.insert({
chatId: activeChat,
workspaceId,
createdBy: userId,
status: 'running',
});
expect(second.id).not.toBe(first.id);
await runRepo.update(second.id, workspaceId, {
status: 'aborted',
finishedAt: new Date(),
});
});
it('DETACHED run: persists + finalizes succeeded with NO subscriber, reconnect returns state', async () => {
// A dedicated chat so the active-run slot is clean.
const runChat = (
await createChat(db, { workspaceId, creatorId: userId })
).id;
// beginRun = the runner starts the turn (registers an in-memory controller).
const handle = await service.beginRun({
chatId: runChat,
workspaceId,
userId,
});
expect(handle.signal.aborted).toBe(false);
expect(service.isLocallyActive(handle.runId)).toBe(true);
// The assistant projection row (#183) is seeded + linked.
const seeded = await messageRepo.insert({
chatId: runChat,
workspaceId,
userId,
role: 'assistant',
content: '',
status: 'streaming',
metadata: { parts: [] } as never,
});
await service.linkAssistantMessage(handle.runId, workspaceId, seeded.id);
// Progress is persisted as steps finish — NO HTTP socket involved here at all.
await service.recordStep(handle.runId, workspaceId, 1);
await messageRepo.update(seeded.id, workspaceId, {
content: 'partial work',
metadata: { parts: [{ type: 'text', text: 'partial work' }] },
});
// The turn completes; finalize the projection then the run.
await messageRepo.update(seeded.id, workspaceId, {
content: 'final answer',
status: 'completed',
});
await service.finalizeRun(handle.runId, workspaceId, 'completed');
expect(service.isLocallyActive(handle.runId)).toBe(false);
// Reconnect: the latest run for the chat + its projected message, from the DB.
const run = await service.getLatestForChat(runChat, workspaceId);
expect(run!.status).toBe('succeeded');
expect(run!.stepCount).toBe(1);
expect(run!.assistantMessageId).toBe(seeded.id);
expect(run!.finishedAt).toBeTruthy();
const message = await messageRepo.findById(seeded.id, workspaceId);
expect(message!.status).toBe('completed');
expect(message!.content).toBe('final answer');
});
it('EXPLICIT stop aborts the run signal, marks the row, and settles as aborted', async () => {
const runChat = (
await createChat(db, { workspaceId, creatorId: userId })
).id;
const handle = await service.beginRun({
chatId: runChat,
workspaceId,
userId,
});
// User presses Stop.
const stopped = await service.requestStop(handle.runId, workspaceId);
expect(stopped).toBe(true);
expect(handle.signal.aborted).toBe(true);
// The row carries the stop request (distinct from a disconnect, which would
// leave stop_requested_at NULL).
const afterStop = await runRepo.findById(handle.runId, workspaceId);
expect(afterStop!.stopRequestedAt).toBeTruthy();
// The terminal callback (onAbort) settles the run.
await service.finalizeRun(handle.runId, workspaceId, 'aborted');
const run = await service.getLatestForChat(runChat, workspaceId);
expect(run!.status).toBe('aborted');
});
it('markStopRequested is a no-op on an already-settled run (returns undefined)', async () => {
const runChat = (
await createChat(db, { workspaceId, creatorId: userId })
).id;
const run = await runRepo.insert({
chatId: runChat,
workspaceId,
createdBy: userId,
status: 'running',
});
await runRepo.update(run.id, workspaceId, {
status: 'succeeded',
finishedAt: new Date(),
});
const marked = await runRepo.markStopRequested(run.id, workspaceId);
expect(marked).toBeUndefined();
});
it('sweepRunning aborts STALE dangling runs but not fresh or settled ones', async () => {
const sweepChat1 = (
await createChat(db, { workspaceId, creatorId: userId })
).id;
const sweepChat2 = (
await createChat(db, { workspaceId, creatorId: userId })
).id;
const sweepChat3 = (
await createChat(db, { workspaceId, creatorId: userId })
).id;
const stale = await runRepo.insert({
chatId: sweepChat1,
workspaceId,
createdBy: userId,
status: 'running',
});
const fresh = await runRepo.insert({
chatId: sweepChat2,
workspaceId,
createdBy: userId,
status: 'running',
});
const settled = await runRepo.insert({
chatId: sweepChat3,
workspaceId,
createdBy: userId,
status: 'running',
});
await runRepo.update(settled.id, workspaceId, {
status: 'succeeded',
finishedAt: new Date(),
});
// Backdate the stale run's updatedAt past the 10-minute staleness window.
await db
.updateTable('aiChatRuns')
.set({ updatedAt: new Date(Date.now() - 20 * 60 * 1000) })
.where('id', '=', stale.id)
.execute();
// WINDOWED sweep (phase-2 multi-instance timer path): only runs older than the
// staleness window are aborted, so a sibling replica's fresh run survives. The
// no-arg boot sweep (variant C) is unconditional — covered separately below.
const swept = await runRepo.sweepRunning({ staleMs: SWEEP_RUN_STALE_MS });
expect(swept).toBeGreaterThanOrEqual(1);
expect((await runRepo.findById(stale.id, workspaceId))!.status).toBe(
'aborted',
);
// Fresh (recently-updated) running run survives the WINDOWED sweep — a sibling
// replica may still be executing it.
expect((await runRepo.findById(fresh.id, workspaceId))!.status).toBe(
'running',
);
expect((await runRepo.findById(settled.id, workspaceId))!.status).toBe(
'succeeded',
);
// cleanup active fresh run
await runRepo.update(fresh.id, workspaceId, {
status: 'aborted',
finishedAt: new Date(),
});
});
it('sweepRunning() with NO args (boot sweep / variant C) aborts even a FRESH running run', async () => {
// F1/DECISION C at the SQL level: the unconditional boot sweep has NO
// staleness window, so a run updated just now (a fast restart) is settled too
// — otherwise it would stay 'running' forever and 409 every future turn.
const bootChat = (
await createChat(db, { workspaceId, creatorId: userId })
).id;
const fresh = await runRepo.insert({
chatId: bootChat,
workspaceId,
createdBy: userId,
status: 'running',
});
// updatedAt = now (fresh, untouched). The no-arg sweep settles it anyway.
const swept = await runRepo.sweepRunning();
expect(swept).toBeGreaterThanOrEqual(1);
expect((await runRepo.findById(fresh.id, workspaceId))!.status).toBe(
'aborted',
);
});
});
@@ -1,5 +1,7 @@
import * as http from 'node:http';
import { Kysely } from 'kysely';
import { tool } from 'ai';
import { z } from 'zod';
import { MockLanguageModelV3, convertArrayToReadableStream } from 'ai/test';
import { AiChatRepo } from '@docmost/db/repos/ai-chat/ai-chat.repo';
import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.repo';
@@ -146,6 +148,9 @@ describe('AiChatService.stream [integration]', () => {
{} as any, // aiAgentRoleRepo (role is pre-resolved + passed in)
{} as any, // pageRepo (only used when body.openPage is set)
{} as any, // pageAccess (idem)
// environment (#332): keep deferred tool loading OFF for this lifecycle
// harness so the toolset/behavior is exactly as before.
{ isAiChatDeferredToolsEnabled: () => false } as any,
);
}
@@ -315,4 +320,174 @@ describe('AiChatService.stream [integration]', () => {
true,
);
});
/**
* #332 deferred tool loading, the ON path. The riskiest property is that the
* per-turn `activatedTools` Set is created FRESH inside each stream() call, so a
* tool a previous turn activated via loadTools is NOT still active when the next
* turn starts the new turn begins "cold" (CORE + loadTools only). The unit
* tests only exercise pure prepareAgentStep with hand-fed Sets; this pins the
* real wiring end-to-end (loadTools.execute -> activatedTools -> prepareStep ->
* per-step activeTools) against the real streamText loop, and proves there is no
* cross-turn leak. We drive a MockLanguageModelV3 whose step 1 calls
* loadTools(['createPage']) and assert, via the model's recorded per-step
* CallOptions.tools (the AI SDK filters the provider tool list by activeTools),
* that the deferred tool becomes active on the SAME turn's next step but NOT on a
* fresh turn's first step.
*/
describe('deferred tool loading ON — per-turn activation, no leak (#332)', () => {
// A stub deferred (non-core) tool the agent can activate. Its execute is never
// called — the model only needs to SEE it become active — but it must be a
// valid AI-SDK tool so the SDK includes it in a step's tool list once active.
const createPageStub = tool({
description: 'create a new page',
inputSchema: z.object({ title: z.string() }),
execute: async () => ({ id: 'p-stub' }),
});
// A CORE tool in the toolset, so a cold step shows CORE tools ARE active while
// the deferred createPage is not. `searchPages` is in CORE_TOOL_SET.
const searchPagesStub = tool({
description: 'search the wiki',
inputSchema: z.object({ query: z.string() }),
execute: async () => [],
});
// Same lifecycle harness as buildService() above, but with deferred loading ON
// and a toolset that exposes exactly one deferred tool (createPage) so it is
// catalogued + loadable-by-name. Kept separate so the OFF scenarios are
// untouched.
function buildDeferredService(): AiChatService {
return new AiChatService(
{ getChatModel: async () => null } as any,
aiChatRepo,
msgRepo,
{} as any,
{ resolve: async () => null } as any,
{
forUser: async () => ({
searchPages: searchPagesStub,
createPage: createPageStub,
}),
getInAppDeferredCatalog: async () => [
{ name: 'createPage', catalogLine: 'createPage — create a new page.' },
],
} as any,
mcpClients as any,
{} as any,
{} as any,
{} as any,
// #332: deferred tool loading ON — the property under test.
{ isAiChatDeferredToolsEnabled: () => true } as any,
);
}
// Drive ONE stream() turn against `model` and wait for the assistant row to
// settle (mirrors runStream, but builds the deferred-ON service).
async function runDeferredTurn(
model: MockLanguageModelV3,
chatId: string,
body: any,
): Promise<void> {
closeCalls = 0;
const service = buildDeferredService();
const { res, cleanup } = await makeRealResponse();
try {
await service.stream({
user: { id: userId, workspaceId } as any,
workspace: { id: workspaceId, name: 'WS' } as any,
sessionId: 'sess-1',
body,
res: { raw: res } as any,
signal: new AbortController().signal,
model: model as any,
role: null,
} as any);
await waitFor(async () => {
const rows = await msgRepo.findAllByChat(chatId, workspaceId);
return rows.some(
(r) =>
r.role === 'assistant' &&
['completed', 'error', 'aborted'].includes(r.status as string),
);
});
await waitFor(() => closeCalls > 0, { timeoutMs: 5_000 });
} finally {
await cleanup();
}
}
// Tool names the provider actually received for a recorded step (activeTools
// filters this list, so it reflects what was active that step).
const toolNames = (call: any): string[] =>
((call?.tools ?? []) as any[]).map((t) => t?.name).filter(Boolean);
// A model that, on step 1, calls loadTools(['createPage']); on step 2, answers.
function loadThenAnswerModel(): MockLanguageModelV3 {
let step = 0;
return new MockLanguageModelV3({
doStream: async () => {
const n = step++;
if (n === 0) {
return {
stream: convertArrayToReadableStream([
{ type: 'stream-start', warnings: [] },
{
type: 'tool-call',
toolCallId: 'lt1',
toolName: 'loadTools',
input: JSON.stringify({ names: ['createPage'] }),
},
{
type: 'finish',
finishReason: 'tool-calls',
usage: { inputTokens: 5, outputTokens: 3, totalTokens: 8 },
},
] as any),
};
}
return { stream: successStream() };
},
} as any);
}
it('activates a deferred tool for the SAME turn, and a NEW turn starts cold (no leak)', async () => {
const chatId = (await createChat(db, { workspaceId, creatorId: userId })).id;
// --- Turn 1: loadTools(createPage) on step 1, then answer on step 2. ---
const model1 = loadThenAnswerModel();
await runDeferredTurn(model1, chatId, {
chatId,
messages: [userUiMessage('Make me a page')],
});
// The turn ran at least two steps (the load round-trip + the answer).
expect(model1.doStreamCalls.length).toBeGreaterThanOrEqual(2);
const step1Tools = toolNames(model1.doStreamCalls[0]);
const step2Tools = toolNames(model1.doStreamCalls[1]);
// Step 1 starts cold: CORE tools + the loadTools meta-tool are active, but
// the deferred createPage is NOT yet.
expect(step1Tools).toContain('loadTools');
expect(step1Tools).toContain('searchPages'); // a CORE tool, always active
expect(step1Tools).not.toContain('createPage');
// Step 2 of the SAME turn sees the just-activated deferred tool.
expect(step2Tools).toContain('createPage');
// --- Turn 2 on the SAME chat: must start cold again. ---
const model2 = new MockLanguageModelV3({
doStream: async () => ({ stream: successStream() }),
} as any);
await runDeferredTurn(model2, chatId, {
chatId,
messages: [userUiMessage('And another thing')],
});
const nextTurnFirstStep = toolNames(model2.doStreamCalls[0]);
expect(nextTurnFirstStep).toContain('loadTools');
// The activated set is per-turn: the prior turn's createPage did NOT leak,
// so the fresh turn's first step sees it deferred again.
expect(nextTurnFirstStep).not.toContain('createPage');
});
});
});
+65
View File
@@ -31,6 +31,22 @@ export interface SharedToolSpec {
inAppKey: string;
/** Single canonical model-facing description used by both layers. */
description: string;
/**
* Deferred-tool tier for the IN-APP agent (#332). 'core' tools are always
* active; 'deferred' tools are hidden behind the <tool_catalog> and loaded on
* demand via the loadTools meta-tool. This is an IN-APP concern only: the
* standalone /mcp server ignores this field and registers every tool normally
* (registerShared in index.ts reads mcpName/description/buildShape only).
*/
tier: 'core' | 'deferred';
/**
* Hand-written one-liner "name — purpose" shown in the in-app agent's
* <tool_catalog> for a DEFERRED tool (#332). Deliberately NOT derived from the
* description's first sentence a concise, accurate purpose line. Present on
* every spec (core tools too) for uniformity; only deferred ones are rendered.
* Inert for the external /mcp server.
*/
catalogLine: string;
/**
* Builds the tool's input schema as a plain object of zod fields (a
* ZodRawShape). Called with the consumer's own zod namespace. Omitted for
@@ -47,6 +63,8 @@ export const SHARED_TOOL_SPECS = {
mcpName: 'get_workspace',
inAppKey: 'getWorkspace',
description: 'Fetch metadata about the current workspace (name, settings).',
tier: 'core',
catalogLine: 'getWorkspace — fetch current workspace metadata (name, settings).',
},
listSpaces: {
@@ -55,6 +73,8 @@ export const SHARED_TOOL_SPECS = {
description:
'List the spaces the current user can access. Returns the array of ' +
'spaces (id, name, slug, ...).',
tier: 'core',
catalogLine: 'listSpaces — list the spaces the user can access (id, name, slug).',
},
listShares: {
@@ -62,6 +82,8 @@ export const SHARED_TOOL_SPECS = {
inAppKey: 'listShares',
description:
'List all public shares in the workspace with page titles and public URLs.',
tier: 'deferred',
catalogLine: 'listShares — list all public shares in the workspace with their URLs.',
},
// --- single-pageId read tools ---
@@ -74,6 +96,9 @@ export const SHARED_TOOL_SPECS = {
'includes block ids, callouts, tables, link/image attributes) plus the ' +
'slugId used in URLs. Use the block ids it returns to make precise ' +
'structural edits or surgical text edits without resending the page.',
tier: 'deferred',
catalogLine:
"getPageJson — get a page's raw ProseMirror JSON (lossless, with block ids).",
buildShape: (z) => ({
pageId: z.string().min(1),
}),
@@ -88,6 +113,9 @@ export const SHARED_TOOL_SPECS = {
'count) WITHOUT the full document body. Use it to locate sections/tables ' +
'and grab block ids cheaply before fetching, patching or inserting ' +
'individual blocks.',
tier: 'core',
catalogLine:
"getOutline — compact outline of a page's top-level blocks with their ids.",
buildShape: (z) => ({
pageId: z.string().min(1),
}),
@@ -104,6 +132,9 @@ export const SHARED_TOOL_SPECS = {
'outline or page-JSON view (works for headings/paragraphs/callouts/images), OR ' +
'`#<index>` to fetch a top-level block by its outline index — use the ' +
'`#<index>` form for tables/rows/cells, which carry no id.',
tier: 'core',
catalogLine:
"getNode — fetch one block's ProseMirror subtree by block id or #index.",
buildShape: (z) => ({
pageId: z.string().min(1),
nodeId: z.string().min(1),
@@ -137,6 +168,9 @@ export const SHARED_TOOL_SPECS = {
'caseSensitive:true to match case. Ideal for systematic ' +
'editorial sweeps (unquoted "ё", straight quotes, "т.е.", stray units). An ' +
'invalid regex or an empty query returns a clear error to fix.',
tier: 'core',
catalogLine:
'searchInPage — find every occurrence of a string/regex inside one page, with locations.',
buildShape: (z) => ({
pageId: z.string().min(1).describe('ID of the page to search'),
query: z
@@ -172,6 +206,8 @@ export const SHARED_TOOL_SPECS = {
description:
'Remove a single block by its attrs.id (from the page outline or ' +
'page-JSON view) WITHOUT resending the whole document.',
tier: 'deferred',
catalogLine: 'deleteNode — remove a single content block by its block id.',
buildShape: (z) => ({
pageId: z.string().min(1),
nodeId: z.string().min(1),
@@ -203,6 +239,9 @@ export const SHARED_TOOL_SPECS = {
'JSON object or a JSON string (both accepted). Cheaper and safer than ' +
'replacing the whole document for one-block structural edits. Reversible: ' +
'the previous version is kept in page history.',
tier: 'deferred',
catalogLine:
'patchNode — replace one block with a new ProseMirror node, keeping its id.',
buildShape: (z) => ({
pageId: z.string().min(1).describe('ID of the page containing the block'),
nodeId: z
@@ -245,6 +284,9 @@ export const SHARED_TOOL_SPECS = {
'[{"type":"text","text":"Title"}]}. Bold is a mark: ' +
'{"type":"text","text":"x","marks":[{"type":"bold"}]}. The node may be a ' +
'JSON object or a JSON string (both accepted). Reversible via page history.',
tier: 'deferred',
catalogLine:
'insertNode — insert a block before/after an anchor, or append at the end.',
buildShape: (z) => ({
pageId: z.string().min(1),
node: z
@@ -278,6 +320,8 @@ export const SHARED_TOOL_SPECS = {
mcpName: 'unshare_page',
inAppKey: 'unsharePage',
description: 'Remove the public share of a page (revokes the public URL).',
tier: 'deferred',
catalogLine: "unsharePage — revoke a page's public share (removes the public URL).",
buildShape: (z) => ({
pageId: z.string().min(1).describe('ID of the page to unshare'),
}),
@@ -295,6 +339,9 @@ export const SHARED_TOOL_SPECS = {
"`from`/`to` each accept a historyId, or null/'current' for the page's " +
'current content (defaults: from=current, to=current — pass a historyId ' +
'from the page-history list to compare against the live page).',
tier: 'deferred',
catalogLine:
'diffPageVersions — diff two page versions and return the change set + summary.',
buildShape: (z) => ({
pageId: z.string().min(1),
from: z
@@ -315,6 +362,9 @@ export const SHARED_TOOL_SPECS = {
"List a page's saved versions (Docmost auto-snapshots on every save), " +
'newest first, cursor-paginated. Returns { items, nextCursor }; each ' +
"item's id is the historyId to pass to the page diff or restore tools.",
tier: 'deferred',
catalogLine:
"listPageHistory — list a page's saved versions (newest first, paginated).",
buildShape: (z) => ({
pageId: z.string().min(1),
cursor: z
@@ -332,6 +382,9 @@ export const SHARED_TOOL_SPECS = {
'as the page\'s current content (Docmost has no restore endpoint, so ' +
'this creates a NEW history snapshot — the restore is itself revertible). ' +
'Get the historyId from the page-history list.',
tier: 'deferred',
catalogLine:
'restorePageVersion — restore a page to a saved history version (revertible).',
buildShape: (z) => ({
historyId: z.string().min(1),
}),
@@ -349,6 +402,9 @@ export const SHARED_TOOL_SPECS = {
'thread records are NOT created/updated/deleted on the server by this ' +
'tool — only the page body + inline comment marks are written; manage ' +
'comment threads via the comment tools/UI.',
tier: 'deferred',
catalogLine:
"importPageMarkdown — replace a page's content from exported Docmost Markdown.",
buildShape: (z) => ({
pageId: z.string().min(1),
markdown: z.string().min(1),
@@ -365,6 +421,9 @@ export const SHARED_TOOL_SPECS = {
'entirely server-side — the document is NOT sent through the model. The ' +
'target keeps its own title and slug; only its body is replaced. Ideal ' +
"for 'make page A's content equal to B' or 'replace A with B but keep A's URL'.",
tier: 'deferred',
catalogLine:
"copyPageContent — replace one page's body with a copy of another page's body.",
buildShape: (z) => ({
sourcePageId: z.string().min(1).describe('Page to copy content FROM'),
targetPageId: z
@@ -402,6 +461,9 @@ export const SHARED_TOOL_SPECS = {
'page JSON and use a structural node patch/update to set its marks. ' +
'Examples: edits:[{find:"teh",replace:"the"}]; edits:[{find:"Hello ' +
'world",replace:"Hello there"}] (crosses a bold boundary).',
tier: 'core',
catalogLine:
"editPageText — surgical find/replace of plain text in a page, preserving ids/marks.",
buildShape: (z) => ({
pageId: z.string().describe('ID of the page to edit'),
edits: z
@@ -440,6 +502,9 @@ export const SHARED_TOOL_SPECS = {
'server instance that created it: in a multi-replica deployment without ' +
'sticky sessions a blob stored on one instance is not retrievable via the ' +
'sandbox URL on another (it 404s like an expired one).',
tier: 'deferred',
catalogLine:
'stashPage — serialize a whole page to a short anonymous URL without loading its body.',
buildShape: (z) => ({
pageId: z.string().min(1),
}),
+1 -1
View File
@@ -450,7 +450,7 @@ async function main() {
// 8. get_page markdown round-trip sanity (table separator present)
const md = await client.getPage(pageId);
check("get_page md: table separator emitted", md.data.content.includes("| --- |"), "");
check("get_page md: callout exported as :::", md.data.content.includes(":::info"));
check("get_page md: callout exported as Obsidian '> [!info]'", md.data.content.includes("> [!info]"));
// 9. comments: create / list / reply / update / check_new / delete
const beforeComments = new Date(Date.now() - 1000).toISOString();
@@ -635,13 +635,17 @@ const Attachment = Node.create({
},
name: {
default: null,
parseHTML: (el: HTMLElement) => el.getAttribute("data-attachment-name"),
// Empty-string-vs-absent idempotency (GS-EDIT-REVERT class): "" -> default.
parseHTML: (el: HTMLElement) =>
el.getAttribute("data-attachment-name") || null,
renderHTML: (attrs: Record<string, any>) =>
attrs.name ? { "data-attachment-name": attrs.name } : {},
},
mime: {
default: null,
parseHTML: (el: HTMLElement) => el.getAttribute("data-attachment-mime"),
// Empty-string-vs-absent idempotency (GS-EDIT-REVERT class): "" -> default.
parseHTML: (el: HTMLElement) =>
el.getAttribute("data-attachment-mime") || null,
renderHTML: (attrs: Record<string, any>) =>
attrs.mime ? { "data-attachment-mime": attrs.mime } : {},
},
@@ -689,7 +693,10 @@ const Video = Node.create({
},
alt: {
default: null,
parseHTML: (el: HTMLElement) => el.getAttribute("aria-label"),
// Empty-string-vs-absent idempotency: coerce "" back to the default so a
// stray empty `aria-label` never materializes `alt: ""` on a video stored
// with no alt (same GS-EDIT-REVERT class as the image `alt` fix).
parseHTML: (el: HTMLElement) => el.getAttribute("aria-label") || null,
renderHTML: (attrs: Record<string, any>) =>
attrs.alt ? { "aria-label": attrs.alt } : {},
},
@@ -864,13 +871,15 @@ const diagramAttributes = () => ({
},
title: {
default: null,
parseHTML: (el: HTMLElement) => el.getAttribute("data-title"),
// Empty-string-vs-absent idempotency (GS-EDIT-REVERT class): "" -> default.
parseHTML: (el: HTMLElement) => el.getAttribute("data-title") || null,
renderHTML: (attrs: Record<string, any>) =>
attrs.title ? { "data-title": attrs.title } : {},
},
alt: {
default: null,
parseHTML: (el: HTMLElement) => el.getAttribute("data-alt"),
// Empty-string-vs-absent idempotency (GS-EDIT-REVERT class): "" -> default.
parseHTML: (el: HTMLElement) => el.getAttribute("data-alt") || null,
renderHTML: (attrs: Record<string, any>) =>
attrs.alt ? { "data-alt": attrs.alt } : {},
},
@@ -1106,7 +1115,8 @@ const Pdf = Node.create({
},
name: {
default: null,
parseHTML: (el: HTMLElement) => el.getAttribute("data-name"),
// Empty-string-vs-absent idempotency (GS-EDIT-REVERT class): "" -> default.
parseHTML: (el: HTMLElement) => el.getAttribute("data-name") || null,
renderHTML: (attrs: Record<string, any>) =>
attrs.name ? { "data-name": attrs.name } : {},
},
@@ -1491,6 +1501,29 @@ export const docmostExtensions = [
...parent.height,
parseHTML: (el: HTMLElement) => el.getAttribute("height"),
},
// Empty-string-vs-absent idempotency (GS-EDIT-REVERT class). `marked`
// renders `![](src)` as `<img alt="">`, so the stock Image `alt`
// parseHTML (`getAttribute("alt")`) materializes `alt: ""` on an image
// that was stored with NO alt (attr absent). That is a false diff against
// the editor-stored form (a no-alt image has alt ABSENT, not ""), so a
// git-sync / ai-chat touch of a page with a plain image produced phantom
// churn. Coerce an empty string back to the attr's default (null) so the
// import is idempotent. A real alt survives verbatim (`|| undefined` keeps
// the truthy value; the default fills the empty case). `title` is coerced
// the same way for the whole class, even though `marked` does not
// currently emit `title=""` — defence in depth against any path that does.
// NOTE: this DIVERGES from editor-ext's literal image `alt` parseHTML
// (`getAttribute("alt")`, which returns "" verbatim), but CONVERGES on
// editor-ext's real STORED shape: an editor image inserted without alt
// renders with no `alt` attribute and re-parses as absent, never "".
alt: {
...parent.alt,
parseHTML: (el: HTMLElement) => el.getAttribute("alt") || null,
},
title: {
...parent.title,
parseHTML: (el: HTMLElement) => el.getAttribute("title") || null,
},
};
},
}).configure({ inline: false }),
@@ -0,0 +1,443 @@
/**
* Reusable round-trip-STABILITY matrix helper (fixtures-first).
*
* A single stored node authored WITHOUT a given string attribute (attr
* absent / undefined) must not gain a phantom EMPTY-STRING value after a
* markdown round-trip the "empty-string-vs-absent" churn class. This helper,
* given a node spec, drives a matrix of attribute combinations through the REAL
* converter (`convertProseMirrorToMarkdown` -> `markdownToProseMirror`) and
* asserts byte-stability on two contours:
*
* 1. RAW round-trip: for the node under test, every attribute the round-trip
* materializes must equal what the INPUT authored an authored attr keeps
* its value, an ABSENT attr may only reappear at its SCHEMA DEFAULT. If an
* absent attr comes back as a NON-default value (e.g. `alt: ""` where the
* default is `null`), that is an instability and is reported precisely as
* `type.attr: absent -> "<got>"`. This is the contour git-sync / stored
* JSON diffs on, so masking it only in `canonicalize` would leave the noise.
*
* 2. CANONICAL round-trip: `canonicalizeContent(original)` must deep-equal
* `canonicalizeContent(roundtrip)` (a second, semantic contour).
*
* The ONLY normalization the helper treats as allowed (not an instability) is
* the DOCUMENTED numeric width/height/size/aspectRatio -> string coercion the
* converter performs on purpose (a stored numeric `640` re-parses via
* `getAttribute` as the string `"640"`). It is encoded here as an explicit
* per-spec `numericStringAttrs` set applied to BOTH contours, NOT a silent skip.
*
* The helper is node-type agnostic: image and the whole media family share the
* `align !== "center"` predicate + `<!--name {…}-->` comment machinery, so one
* matrix guards the shared class.
*/
import { getSchema } from "@tiptap/core";
import {
convertProseMirrorToMarkdown,
markdownToProseMirror,
canonicalizeContent,
docmostExtensions,
} from "../src/lib/index.js";
import { firstDivergence } from "./roundtrip-helpers.js";
/** One attribute's two probe values. */
export interface AttrMatrixEntry {
/** Attribute name on the node. */
attr: string;
/**
* The "default" pick. `undefined` means the attribute is OMITTED entirely
* (the absent case the one that can materialize an empty string on import).
* A concrete value is authored verbatim.
*/
default: unknown;
/** A representative NON-default value to exercise (must survive verbatim). */
nonDefault: unknown;
/**
* Marks the attr as a member of the EMPTY-STRING class the fix targets: a
* string attr whose schema default is `null`/absent and whose parseHTML
* coerces `"" -> default` (image/drawio `alt`+`title`, video `alt` via
* aria-label, pdf/attachment `name`, attachment `mime`). Set true to also
* drive the THIRD-STATE convergence case (see runConvergenceCase) for this
* attr. Attrs whose default is NOT null (e.g. embed `provider`, default "")
* or that are not `""`-coerced (control attrs) are left unset.
*/
emptyStringClass?: boolean;
}
/** A node type + the attribute matrix to sweep for it. */
export interface NodeStabilitySpec {
/** Node type (e.g. "image", "video"). */
type: string;
/** Attributes always present on the node (e.g. `{ src: "/i.png" }`). */
baseAttrs?: Record<string, unknown>;
/** Attributes to sweep at default and non-default. */
attrMatrix: AttrMatrixEntry[];
/**
* Attributes whose numeric -> string coercion on round-trip is DOCUMENTED and
* intentional; compared modulo `String(x)` on both sides. Defaults to the
* converter's known sizing set.
*/
numericStringAttrs?: string[];
}
/** A single unstable finding, legible enough to tie a gate-lock to. */
export interface Instability {
type: string;
attr: string;
/** What the input authored: the literal value, or the ABSENT sentinel. */
authored: unknown | typeof ABSENT;
/** What the round-trip produced. */
got: unknown;
/** What a stable round-trip should have produced (authored value or default). */
expected: unknown;
}
/** One matrix cell's result. */
export interface ComboResult {
label: string;
authored: Record<string, unknown>;
/** RAW-contour instabilities on the node under test. */
raw: Instability[];
/** CANONICAL-contour divergence (path + values) or null when equal. */
canonical: { path: string; a: unknown; b: unknown } | null;
/** True when the node type failed to round-trip at all (structural loss). */
missing: boolean;
md: string;
}
/** Whole-matrix report for one node spec. */
export interface MatrixReport {
type: string;
combos: ComboResult[];
}
/** Sentinel marking an attribute the input did NOT author. */
export const ABSENT = Symbol("ABSENT");
const DEFAULT_NUMERIC_STRING_ATTRS = [
"width",
"height",
"size",
"aspectRatio",
];
// The ProseMirror schema the converter targets — its attribute `default`s are
// the authoritative "what an absent attr should re-materialize as" oracle.
const schema = getSchema(docmostExtensions);
/** Read the schema default for every attribute of a node type. */
function schemaDefaults(type: string): Record<string, unknown> {
const specAttrs = (schema.nodes[type]?.spec?.attrs ?? {}) as Record<
string,
{ default: unknown }
>;
const out: Record<string, unknown> = {};
for (const [k, v] of Object.entries(specAttrs)) out[k] = v.default;
return out;
}
/** Find the first node of a given type anywhere in a PM doc tree. */
function findFirst(node: any, type: string): any {
if (node && node.type === type) return node;
for (const child of node?.content ?? []) {
const hit = findFirst(child, type);
if (hit) return hit;
}
return null;
}
/** Coerce a scalar for the documented numeric->string comparison. */
const numStr = (x: unknown): unknown => (x == null ? x : String(x));
/**
* Enumerate the cartesian product of the matrix: every attribute independently
* at its default (index 0) or non-default (index 1) pick. The all-default
* corner is included (the baseline). Small by construction (2^N over a handful
* of at-risk string attrs).
*/
function enumerateCombos(matrix: AttrMatrixEntry[]): number[][] {
let combos: number[][] = [[]];
for (let i = 0; i < matrix.length; i++) {
const next: number[][] = [];
for (const c of combos) {
next.push([...c, 0]);
next.push([...c, 1]);
}
combos = next;
}
return combos;
}
/** Build the authored attrs for one combo pick vector. */
function authoredAttrs(
spec: NodeStabilitySpec,
picks: number[],
): Record<string, unknown> {
const attrs: Record<string, unknown> = { ...(spec.baseAttrs ?? {}) };
spec.attrMatrix.forEach((entry, i) => {
if (picks[i] === 1) {
attrs[entry.attr] = entry.nonDefault;
} else if (entry.default !== undefined) {
attrs[entry.attr] = entry.default;
}
// default === undefined -> OMIT the attr entirely (the absent case).
});
return attrs;
}
/** Human-readable label for a combo (which attrs are at non-default). */
function comboLabel(spec: NodeStabilitySpec, picks: number[]): string {
const on = spec.attrMatrix
.filter((_, i) => picks[i] === 1)
.map((e) => e.attr);
return on.length === 0 ? "<all-default>" : on.join("+");
}
/**
* Run the full stability matrix for one node spec and return a structured
* report (does NOT throw the caller asserts, so a failure can print the whole
* report). Every combo runs the real export->import pipeline once.
*/
export async function runStabilityMatrix(
spec: NodeStabilitySpec,
): Promise<MatrixReport> {
const numericStringAttrs = new Set(
spec.numericStringAttrs ?? DEFAULT_NUMERIC_STRING_ATTRS,
);
const defaults = schemaDefaults(spec.type);
const combos: ComboResult[] = [];
for (const picks of enumerateCombos(spec.attrMatrix)) {
const authored = authoredAttrs(spec, picks);
const doc = { type: "doc", content: [{ type: spec.type, attrs: authored }] };
const md = convertProseMirrorToMarkdown(doc);
const rt = await markdownToProseMirror(md);
const node = findFirst(rt, spec.type);
const result: ComboResult = {
label: comboLabel(spec, picks),
authored,
raw: [],
canonical: null,
missing: node == null,
md,
};
if (node != null) {
// RAW contour: every materialized attr must equal the authored value, or
// (for an absent attr) the schema default — modulo the documented numeric
// string coercion.
const rtAttrs = (node.attrs ?? {}) as Record<string, unknown>;
for (const key of Object.keys(rtAttrs)) {
const authoredHas = Object.prototype.hasOwnProperty.call(authored, key);
const expected = authoredHas ? authored[key] : defaults[key];
let got = rtAttrs[key];
let exp = expected;
if (numericStringAttrs.has(key)) {
got = numStr(got);
exp = numStr(exp);
}
if (firstDivergence(got, exp) !== null) {
result.raw.push({
type: spec.type,
attr: key,
authored: authoredHas ? authored[key] : ABSENT,
got: rtAttrs[key],
expected,
});
}
}
// CANONICAL contour: canonical forms deep-equal, modulo the same numeric
// string coercion (applied to both trees so a documented coercion is not
// counted as a divergence).
const ca = normalizeNumeric(canonicalizeContent(doc), numericStringAttrs);
const cb = normalizeNumeric(canonicalizeContent(rt), numericStringAttrs);
result.canonical = firstDivergence(ca, cb);
}
combos.push(result);
}
return { type: spec.type, combos };
}
/**
* Deep-copy a canonical tree, coercing the documented numeric->string attrs to
* their string form so an intentional `640 -> "640"` coercion is not reported
* as a canonical divergence. Only touches the listed attribute keys.
*/
function normalizeNumeric(node: any, attrs: Set<string>): any {
if (Array.isArray(node)) return node.map((n) => normalizeNumeric(n, attrs));
if (node === null || typeof node !== "object") return node;
const out: Record<string, unknown> = {};
for (const key of Object.keys(node)) {
if (key === "attrs" && node.attrs && typeof node.attrs === "object") {
const a: Record<string, unknown> = {};
for (const [k, v] of Object.entries(node.attrs)) {
a[k] = attrs.has(k) ? numStr(v) : v;
}
out.attrs = a;
} else {
out[key] = normalizeNumeric(node[key], attrs);
}
}
return out;
}
/** Flatten a report to just its unstable combos (for a terse assertion). */
export function unstableCombos(report: MatrixReport): ComboResult[] {
return report.combos.filter(
(c) => c.missing || c.raw.length > 0 || c.canonical !== null,
);
}
// ---------------------------------------------------------------------------
// THIRD STATE: an EXPLICITLY-STORED empty string on a string attr.
//
// The matrix above sweeps TWO states per string attr: absent/default and a
// non-default value — and asserts FIRST-pass byte-stability for both. There is
// a third, degenerate state the matrix does NOT cover: the attr stored as a
// LITERAL `""`. This is DISTINCT from "the node never had the attr": a user
// types an alt in the editor, then deletes it, and Tiptap's
// `updateAttributes({ alt: "" })` persists a literal `alt: ""` in the stored
// JSON. There is no absent-vs-"" distinction in the DOM once serialized, so the
// fix's `getAttribute("alt") || null` coercion canonicalizes BOTH to the
// default (`null`).
//
// Consequence — and this is CORRECT, not a bug: a doc carrying an explicit `""`
// converges to the default on the FIRST round-trip (a ONE-TIME diff: `"" ->
// null`), then is byte-stable from the SECOND round-trip on (idempotent). So
// this state must be pinned with a DIFFERENT contract than the matrix's:
// - do NOT assert first-pass byte-stability (the first pass legitimately
// changes `""` -> default), and
// - DO assert the first pass converges to the default AND the second pass is
// idempotent (rt2 deep-equals rt1).
//
// A future sync/QA pass diffing stored pages will see this one-time `"" -> null`
// normalization exactly once per affected node; it is the converter canon, not
// corruption, and must not be flagged as data loss.
// ---------------------------------------------------------------------------
/** Result of the third-state ("explicit empty string") convergence probe. */
export interface ConvergenceResult {
type: string;
attr: string;
/** The schema default the attr must converge to on pass 1 (null / absent). */
expectedDefault: unknown;
/** rt1's materialized value for the attr — must equal `expectedDefault`. */
firstPassValue: unknown;
/** True when the node round-tripped AND rt1 converged the attr to default. */
convergedToDefault: boolean;
/** rt1-vs-rt2 divergence; MUST be null (idempotent from pass 2 on). */
secondPassDivergence: { path: string; a: unknown; b: unknown } | null;
/** True when the node type failed to round-trip at all (structural loss). */
missing: boolean;
}
/** Round-trip a full PM doc through the real converter once. */
async function roundtripDoc(doc: any): Promise<any> {
return markdownToProseMirror(convertProseMirrorToMarkdown(doc));
}
/**
* Third-state convergence probe for one string attr of the empty-string class.
*
* (a) builds a doc with the attr EXPLICITLY set to `""` (baseAttrs + `""`),
* (b) rt1 = roundtrip(doc); asserts rt1's attr equals the schema default the
* documented ONE-TIME `"" -> default` normalization (NOT byte-stable vs the
* `""` input, so first-pass stability is deliberately NOT asserted here),
* (c) rt2 = roundtrip(rt1); asserts rt2 deep-equals rt1 idempotent from the
* second round-trip on.
*
* Returns a structured result (does NOT throw) so the caller can assert and
* print. Reusable across the whole node family: drive it for every attr flagged
* `emptyStringClass` on every spec (see convergenceCasesFor / the test driver).
*/
export async function runConvergenceCase(
spec: NodeStabilitySpec,
attr: string,
): Promise<ConvergenceResult> {
const expectedDefault = schemaDefaults(spec.type)[attr];
// (a) The degenerate third state: attr persisted as a LITERAL "".
const authored = { ...(spec.baseAttrs ?? {}), [attr]: "" };
const doc = { type: "doc", content: [{ type: spec.type, attrs: authored }] };
// (b) First round-trip: "" must normalize to the default (a one-time diff).
const rt1 = await roundtripDoc(doc);
const node1 = findFirst(rt1, spec.type);
const firstPassValue = node1?.attrs?.[attr];
const convergedToDefault =
node1 != null && firstDivergence(firstPassValue, expectedDefault) === null;
// (c) Second round-trip: must be byte-stable (rt2 deep-equals rt1). We compare
// the WHOLE docs — both are converter OUTPUTS already in the same materialized
// form (numeric attrs are strings on both sides), so no numeric normalization
// is needed here, unlike the raw/canonical contours above.
const rt2 = node1 != null ? await roundtripDoc(rt1) : rt1;
const secondPassDivergence =
node1 != null ? firstDivergence(rt1, rt2) : null;
return {
type: spec.type,
attr,
expectedDefault,
firstPassValue,
convergedToDefault,
secondPassDivergence,
missing: node1 == null,
};
}
/** The attrs of a spec flagged as members of the empty-string class. */
export function convergenceCasesFor(spec: NodeStabilitySpec): string[] {
return spec.attrMatrix
.filter((e) => e.emptyStringClass)
.map((e) => e.attr);
}
/** True when a convergence result honours the "converges once, then stable" contract. */
export function convergenceOk(r: ConvergenceResult): boolean {
return !r.missing && r.convergedToDefault && r.secondPassDivergence === null;
}
/** Render a convergence result as a legible one-liner for a failed assertion. */
export function formatConvergence(r: ConvergenceResult): string {
if (r.missing) return `${r.type}.${r.attr}: DID-NOT-ROUND-TRIP`;
const parts: string[] = [];
if (!r.convergedToDefault) {
parts.push(
`pass1 did NOT converge: got ${JSON.stringify(r.firstPassValue)} (expected default ${JSON.stringify(r.expectedDefault)})`,
);
}
if (r.secondPassDivergence) {
parts.push(
`pass2 NOT idempotent @ ${r.secondPassDivergence.path}: ${JSON.stringify(r.secondPassDivergence.a)} vs ${JSON.stringify(r.secondPassDivergence.b)}`,
);
}
const status = parts.length === 0 ? "converges-once-then-stable" : parts.join("; ");
return `${r.type}.${r.attr}: ${status}`;
}
/** Render a report as a legible multi-line string for a failed assertion. */
export function formatReport(report: MatrixReport): string {
const lines: string[] = [`node "${report.type}":`];
for (const c of report.combos) {
const flags: string[] = [];
if (c.missing) flags.push("DID-NOT-ROUND-TRIP");
for (const i of c.raw) {
const authored =
i.authored === ABSENT ? "absent" : JSON.stringify(i.authored);
flags.push(
`RAW ${i.type}.${i.attr}: ${authored} -> ${JSON.stringify(i.got)} (expected ${JSON.stringify(i.expected)})`,
);
}
if (c.canonical) {
flags.push(
`CANON @ ${c.canonical.path}: ${JSON.stringify(c.canonical.a)} vs ${JSON.stringify(c.canonical.b)}`,
);
}
const status = flags.length === 0 ? "stable" : flags.join("; ");
lines.push(` [${c.label}] ${status}`);
}
return lines.join("\n");
}
@@ -0,0 +1,164 @@
import { describe, expect, it } from "vitest";
import {
runStabilityMatrix,
unstableCombos,
formatReport,
runConvergenceCase,
convergenceCasesFor,
convergenceOk,
formatConvergence,
type NodeStabilitySpec,
} from "./roundtrip-stability.helper.js";
// ---------------------------------------------------------------------------
// Round-trip STABILITY matrix for image + the media family.
//
// Guards the "empty-string-vs-absent" churn class (GS-EDIT-REVERT family): a
// stored node authored WITHOUT a string attr (alt/title/caption/aria-label/...)
// must not gain a phantom `attr: ""` after `markdownToProseMirror(convert…)`.
// Each spec sweeps the at-risk string attrs at DEFAULT (absent) and at a real
// NON-default value; the helper asserts both the RAW round-trip (attrs equal the
// input's, modulo the documented numeric width/height/size/aspectRatio -> string
// coercion) and the CANONICAL round-trip (canonical forms deep-equal).
//
// The image + media family share the `align !== "center"` predicate and the
// `<!--name {…}-->` comment machinery, so one matrix guards the shared class.
// align is NOT part of this class (it round-trips correctly) and is not swept.
// ---------------------------------------------------------------------------
const SPECS: NodeStabilitySpec[] = [
{
// Image carries the most at-risk string attrs. `alt` is the one marked
// materializes as `<img alt="">` on `![](src)` import (the real bug); title
// and caption are covered as the same class. attachmentId is a string attr
// that must stay absent when unset (control).
type: "image",
baseAttrs: { src: "/i.png" },
attrMatrix: [
{ attr: "alt", default: undefined, nonDefault: "a real alt text", emptyStringClass: true },
{ attr: "title", default: undefined, nonDefault: "a real title", emptyStringClass: true },
{ attr: "caption", default: undefined, nonDefault: "a real caption" },
{ attr: "attachmentId", default: undefined, nonDefault: "att-42" },
],
},
{
// Video's `alt` rides the `aria-label` attribute (media aria-label at risk).
type: "video",
baseAttrs: { src: "/v.mp4" },
attrMatrix: [
{ attr: "alt", default: undefined, nonDefault: "a clip", emptyStringClass: true },
{ attr: "attachmentId", default: undefined, nonDefault: "att-1" },
],
},
{
// Audio carries no alt/title; attachmentId is its only optional string attr.
type: "audio",
baseAttrs: { src: "/a.mp3" },
attrMatrix: [
{ attr: "attachmentId", default: undefined, nonDefault: "att-2" },
],
},
{
// pdf: link-form media. `name` (filename) is its at-risk string attr.
type: "pdf",
baseAttrs: { src: "/d.pdf" },
attrMatrix: [
{ attr: "name", default: undefined, nonDefault: "report.pdf", emptyStringClass: true },
{ attr: "attachmentId", default: undefined, nonDefault: "att-3" },
],
},
{
// attachment: link-form media (file card). `name` + `mime` string attrs.
type: "attachment",
baseAttrs: { url: "/f.zip" },
attrMatrix: [
{ attr: "name", default: undefined, nonDefault: "bundle.zip", emptyStringClass: true },
{ attr: "mime", default: undefined, nonDefault: "application/zip", emptyStringClass: true },
{ attr: "attachmentId", default: undefined, nonDefault: "att-4" },
],
},
{
// embed: link-form media. `provider` is its at-risk string attr (schema
// default ""). embed's numeric width/height defaults (800/600) are a SEPARATE,
// documented limitation OUTSIDE the empty-string class: they are not in
// canonicalize's KNOWN_DEFAULTS, so an ABSENT width/height re-imports as the
// 800/600 default and diverges canonically (see the note in canonicalize.ts).
// That is canonicalize-owned and out of scope here, so we author the
// dimensions at their defaults (as real editor embeds carry them) to keep this
// guard focused on the empty-string/provider class.
// provider's schema default is "" (NOT null), so a re-imported "" is the
// correct value, not a phantom — it is outside the null-default empty-string
// class. We author it at its "" default (the default pick) so the sweep still
// asserts a non-default provider ("youtube") round-trips, without tripping the
// canonicalize KNOWN_DEFAULTS gap for embed's non-null defaults.
type: "embed",
baseAttrs: { src: "https://example.com/x", width: 800, height: 600 },
attrMatrix: [
{ attr: "provider", default: "", nonDefault: "youtube" },
],
},
{
// drawio: image-form diagram. `title` + `alt` string attrs (data-title/-alt).
type: "drawio",
baseAttrs: { src: "blob:drawio" },
attrMatrix: [
{ attr: "title", default: undefined, nonDefault: "flow chart", emptyStringClass: true },
{ attr: "alt", default: undefined, nonDefault: "an alt", emptyStringClass: true },
{ attr: "attachmentId", default: undefined, nonDefault: "att-5" },
],
},
{
// excalidraw: image-form diagram, same shared diagramAttributes set.
type: "excalidraw",
baseAttrs: { src: "blob:excalidraw" },
attrMatrix: [
{ attr: "title", default: undefined, nonDefault: "sketch", emptyStringClass: true },
{ attr: "alt", default: undefined, nonDefault: "an alt", emptyStringClass: true },
{ attr: "attachmentId", default: undefined, nonDefault: "att-6" },
],
},
];
describe("round-trip stability matrix (image + media family)", () => {
for (const spec of SPECS) {
it(`${spec.type}: no attr materializes an empty-string / phantom value`, async () => {
const report = await runStabilityMatrix(spec);
const unstable = unstableCombos(report);
// On failure, print the WHOLE matrix so which (attr, value) combos are
// unstable is legible.
expect(unstable, `\n${formatReport(report)}\n`).toEqual([]);
});
}
});
// ---------------------------------------------------------------------------
// THIRD STATE: an attr EXPLICITLY stored as a literal "" (GS-EDIT-REVERT: a user
// typed alt/title/name/... then deleted it, so Tiptap persisted `attr: ""` — a
// value DISTINCT from "attr was never set"). Unlike the absent case above, this
// state is NOT first-pass byte-stable: the fix's `"" -> default` coercion is a
// deliberate ONE-TIME normalization on the FIRST sync round-trip, stable
// thereafter. We therefore assert a DIFFERENT contract — "converges to default
// on pass 1, then idempotent from pass 2 on" — for every empty-string-class attr
// across the whole node family (image/video/pdf/attachment/drawio/excalidraw).
//
// IMPORTANT for a future sync/QA pass: the pass-1 `"" -> null` diff is the
// converter canon, not corruption. It appears at most once per affected node and
// must NOT be flagged as "the converter is losing/corrupting page data".
// ---------------------------------------------------------------------------
describe("round-trip third state: explicit empty string converges once, then idempotent", () => {
for (const spec of SPECS) {
for (const attr of convergenceCasesFor(spec)) {
it(`${spec.type}.${attr}: "" normalizes to default on pass 1, byte-stable from pass 2`, async () => {
const r = await runConvergenceCase(spec, attr);
// Pass 1 must converge "" -> the schema default (the one-time diff) and
// pass 2 (roundtrip of pass-1 output) must be byte-stable. formatConvergence
// prints exactly which half failed.
expect(convergenceOk(r), `\n${formatConvergence(r)}\n`).toBe(true);
// Spell the contract out explicitly so the intent is legible in the test:
expect(r.convergedToDefault, `\n${formatConvergence(r)}\n`).toBe(true);
expect(r.firstPassValue).toEqual(r.expectedDefault);
expect(r.secondPassDivergence, `\n${formatConvergence(r)}\n`).toBeNull();
});
}
}
});