Compare commits

3 Commits

Author SHA1 Message Date
agent_coder c1b2210a4e fix(#370): thread trx into addPageWatchers (F7 self-deadlock) + restore contributors on commit-failure (F8) + assert the lock (F9) (review round 2)
The round-1 F3 fix (wrapping the processor's find+save in a locked tx) itself
introduced two regressions:

F7 [CRITICAL] addPageWatchers ran WITHOUT trx inside the tx holding FOR UPDATE on
pages[pageId]. The watcher insert's FK check takes FOR KEY SHARE on the same row,
but on a DIFFERENT pool connection — a true self-deadlock (our tx connection sits
idle-in-transaction awaiting the JS await, the insert connection blocks on the
lock). Now passes trx (addPageWatchers already accepts it and routes it through
insertMany), so the FK lock is taken on the connection that already holds FOR
UPDATE — no self-conflict.
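
The F7 conflict can be modelled with a toy lock table. This is a minimal sketch, not the project's code: `RowLocks`, `wouldBlock`, and the connection names are hypothetical. It encodes only the one PostgreSQL rule that matters here: a `FOR UPDATE` row lock held by one transaction blocks a `FOR KEY SHARE` request from another, while a transaction never waits on its own lock.

```typescript
// Toy model of the F7 self-deadlock. All names here are hypothetical.
type LockMode = "FOR UPDATE" | "FOR KEY SHARE";

interface HeldLock {
  connection: string; // which pool connection (transaction) holds the lock
  mode: LockMode;
}

class RowLocks {
  private held = new Map<string, HeldLock>();

  // Record that `connection` holds `mode` on `rowId`.
  acquire(rowId: string, connection: string, mode: LockMode): void {
    this.held.set(rowId, { connection, mode });
  }

  // Would a new request have to wait for another connection to finish?
  wouldBlock(rowId: string, connection: string, mode: LockMode): boolean {
    const lock = this.held.get(rowId);
    if (!lock) return false;
    // A transaction never waits on a lock it already holds itself.
    if (lock.connection === connection) return false;
    // FOR UPDATE conflicts with every row-lock mode, including FOR KEY SHARE.
    return lock.mode === "FOR UPDATE" || mode === "FOR UPDATE";
  }
}

const locks = new RowLocks();
locks.acquire("page-1", "tx-connection", "FOR UPDATE");

// Watcher insert on a different pool connection: its FK check waits on the row
// lock while the transaction is awaiting that same insert.
const withoutTrx = locks.wouldBlock("page-1", "other-connection", "FOR KEY SHARE");

// Watcher insert routed through trx: same connection, so no conflict.
const withTrx = locks.wouldBlock("page-1", "tx-connection", "FOR KEY SHARE");
```

In this model `withoutTrx` is the deadlocking case and `withTrx` is the fixed one; the real fix is simply passing `trx` through so both statements share a connection.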

F8 [WARNING] popContributors is a destructive Redis SPOP; the inner catch only
restores on a throw INSIDE the callback. A COMMIT failure throws OUTSIDE it,
rolling the snapshot back while the pop is gone → a retry writes an unattributed
version. Now tracks the popped set and restores it in an outer catch (idempotent
SADD), leaving BullMQ to retry with attribution intact.
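
The shape of the F8 fix can be sketched with in-memory stand-ins. This is an illustration under assumptions, not the processor's code: `ContributorSet`, `popAll`, and `snapshotWithAttribution` are hypothetical names, and the set stands in for the Redis SPOP/SADD pair described above.

```typescript
// Sketch of the F8 fix. All names are hypothetical stand-ins.
class ContributorSet {
  private members = new Set<string>();

  // Idempotent add, standing in for Redis SADD.
  add(...ids: string[]): void {
    for (const id of ids) this.members.add(id);
  }

  // Destructive read, standing in for the Redis SPOP described above.
  popAll(): string[] {
    const popped = Array.from(this.members);
    this.members.clear();
    return popped;
  }

  size(): number {
    return this.members.size;
  }
}

async function snapshotWithAttribution(
  contributors: ContributorSet,
  runInTransaction: (ids: string[]) => Promise<void>,
): Promise<void> {
  let popped: string[] = [];
  try {
    popped = contributors.popAll();
    // May reject from inside the callback OR at COMMIT; both surface here.
    await runInTransaction(popped);
  } catch (err) {
    // Put the popped ids back so a retry still has its attribution.
    contributors.add(...popped);
    throw err; // rethrow so the queue can retry the job
  }
}
```

Because the restore sits in the outer `catch`, it runs no matter where the transaction fails, and re-adding ids that are already present is harmless.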

F9 [WARNING] The spec asserted saveHistory args with a loosened objectContaining
that stopped verifying trx, and never pinned withLock/trx on findById or the trx
on addPageWatchers — which is exactly why F7 slipped. Restored the exact
saveHistory(trx) assertion and added findById({withLock,trx}) + addPageWatchers
trx assertions (the latter would have caught F7), plus a commit-failure test.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-05 05:50:19 +03:00
agent_coder b1e5193b37 fix(#370): ES2021-safe spec, source-specific idle ceiling, processor lock, tested index mapping (review round 1)
F1 [BLOCKER] persistence-store.spec used Array.prototype.at(-1) (ES2022) but the
server targets ES2021, so server tsc failed (TS2550) and ts-jest could not
compile the suite — 22 core manual-save/idle/boundary tests silently did not run
in CI. Replaced with [length - 1] index access.
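
For reference, the ES2021-safe form of "last element" looks like the sketch below. `lastItem` is a hypothetical helper name used only for illustration; the commit describes inlining `[length - 1]` directly in the spec.

```typescript
// `lastItem` is a hypothetical helper. It avoids Array.prototype.at, which is
// only typed when the TypeScript `lib` setting includes ES2022.
function lastItem<T>(items: readonly T[]): T | undefined {
  // Index access yields undefined for an empty array, like .at(-1) would.
  return items[items.length - 1];
}

const calls = ["first", "second", "third"];
const latest = lastItem(calls);
const none = lastItem<string>([]);
```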

F2 [WARNING] The idle burst-reset used a hardcoded IDLE_MAX_WAIT_USER for both
tiers, but computeHistoryJob's ceiling is source-specific. On a continuously
agent-edited page the burst marker stayed stale for 5..10m, forcing delay=0 on
every store and writing one idle row per store — the exact per-store bloat the
debounce prevents. The reset now uses the same source-specific max-wait.

F3 [WARNING] The processor did an unlocked findPageLastHistory -> saveHistory,
which TOCTOU-races a concurrent manual-save (that runs under a page-row lock),
producing two page_history rows with identical content (one idle, one manual) and
defeating promote-not-dup. The snapshot decision is now wrapped in executeTx with
the same page-row lock, so the second writer observes the first's committed row
and the isDeepStrictEqual gate collapses the duplicate.

F4 [WARNING] The risky client filtered-index -> full-list mapping had no tests.
Extracted it to a pure resolvePrevSnapshotId(fullItems, id) helper (diff/restore
baseline against the true previous snapshot in the FULL list, never the previous
visible version) and unit-tested it; removed the now-vestigial index threading.

F5/F6 [low] Renamed the misleading ceiling test + fixed its comment; added a
CHANGELOG entry for the user-facing versioning feature.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-05 05:31:26 +03:00
agent_coder 1542c99979 feat(#370): page-history intentionality tiers — kind column + intentional/idle/boundary triggers (PR-1 core)
PR-1 'core' of #370: introduces page_history.kind ('manual'|'agent'|'idle'|
'boundary'; legacy null = autosave) and rebuilds the snapshot triggers around a
three-tier intentionality model. Draft durability (pages/ydoc hocuspocus
autosave) is unchanged; only the frequency and labelling of history points change.

- Migration 20260705T120000: page_history.kind nullable varchar(20), no default.
- Manual Save: one stateless 'save-version' path for human AND agent; kind is
  derived SERVER-SIDE from the signed context.actor (never the payload), readOnly
  connections rejected, the fresh ydoc runs through the existing store path (no
  REST race), then broadcasts version.saved.
- Idle-flush: trailing debounce (one BullMQ job per page, remove-then-readd) with
  IDLE_INTERVAL_USER=60m / AGENT=15m AND a max-wait ceiling
  (IDLE_MAX_WAIT_USER=10m / AGENT=5m) so a continuous editing session can't starve
  the autosnapshot (review round-1 WARNING).
- Boundary: generalized from the user→agent special-case to ANY lastUpdatedSource
  transition (user↔agent↔git), same isDeepStrictEqual gate; this covers git-sync for free.
- Removed the agent delay=0 fast path and the old HISTORY_FAST_* constants; the
  agent joins the common idle pipeline.
- Promote-not-dup: a manual save on unchanged content promotes the latest
  autosave's kind in place (or no-ops if already manual) instead of duplicating a
  heavy content row.
- Client: mod+S hotkey + menu button (hidden when readOnly), history-panel kind
  badges, dimmed autosaves, a 'versions only' filter (indices map to the full
  list so diff/restore still target the true previous snapshot), live refresh on
  version.saved.
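
The promote-not-dup rule from the list above can be sketched as a pure function. This is a simplified model under stated assumptions: `saveManualVersion`, `HistoryRow`, and `sameContent` are hypothetical names, the history array is ordered oldest-first, and `sameContent` uses a JSON comparison as a stand-in for the `isDeepStrictEqual` gate the commit mentions.

```typescript
// Simplified model of promote-not-dup. All names are hypothetical.
type Kind = "manual" | "agent" | "idle" | "boundary" | null;

interface HistoryRow {
  id: string;
  kind: Kind;
  content: unknown;
}

type SaveAction = "inserted" | "promoted" | "noop";

// Stand-in for isDeepStrictEqual; adequate for plain JSON-like content only.
function sameContent(a: unknown, b: unknown): boolean {
  return JSON.stringify(a) === JSON.stringify(b);
}

function saveManualVersion(
  history: HistoryRow[], // oldest-first in this sketch
  content: unknown,
  newId: string,
): { action: SaveAction; row: HistoryRow } {
  const latest = history[history.length - 1];
  if (latest && sameContent(latest.content, content)) {
    // Unchanged content: never write a second heavy content row.
    if (latest.kind === "manual") return { action: "noop", row: latest };
    latest.kind = "manual"; // promote the autosave in place
    return { action: "promoted", row: latest };
  }
  const row: HistoryRow = { id: newId, kind: "manual", content };
  history.push(row);
  return { action: "inserted", row };
}
```

A save on unchanged content therefore either relabels the existing row or does nothing; only changed content produces a new row.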

Internal review: APPROVE-with-suggestions; the round-1 WARNING (idle starvation)
is fixed here via the max-wait ceiling, and the generalized-boundary + ceiling
behaviours are pinned with new tests (115 collab/repo specs green, server tsc 0).

Deferred to later PRs: shares.published_mode (PR-2), the save_page_version MCP
tool + role prompts (PR-3), actor='git' wiring into #359 (PR-4).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-05 05:00:12 +03:00
27 changed files with 1048 additions and 388 deletions
+1 -1
@@ -294,7 +294,7 @@ Vite SPA. Code is organized by feature under `apps/client/src/features/*` (mirro
- **Errors must never be swallowed or shown as generic messages.** Every caught error MUST (1) be logged in full to the console/logger — error name, message, stack, `cause`, and (for HTTP/provider failures) the status code and response body — and (2) be surfaced to the user with a *specific, human-readable explanation of what actually went wrong*, never a bare generic string like "Something went wrong" / "Could not start recording" / "Transcription failed". Include the real reason (the underlying error/provider message) in the user-facing text. On the server, wrap third-party/provider failures with `describeProviderError` (or equivalent) and rethrow as a meaningful HTTP status + message — never let them collapse into an opaque 500. On the client, `console.error(<context>, err)` the raw error AND show the extracted reason (e.g. `err.response?.data?.message`, or the error `name: message`) in the notification.
- The version string shown in the UI comes from `APP_VERSION` (CI/Docker) or `git describe --tags --always` (local), resolved in `vite.config.ts` — not from `package.json`.
- Server TS config is permissive (`noImplicitAny: false`, `strictNullChecks: false`, `no-explicit-any` lint disabled). Follow the existing relaxed style rather than tightening types broadly.
- Dependency versions are heavily pinned via `pnpm.overrides` and `pnpm.patchedDependencies` (`scimmy`, `yjs`, `ai`) in the root `package.json`. Don't bump pinned/patched deps casually; the patches and overrides exist for compatibility/security reasons. The `ai@6.0.134` patch disables the SDK's O(n²) cumulative `partialOutput` accumulation when no output strategy is requested (server heap OOM on long agent runs, #184; tripwire test: `apps/server/src/integrations/ai/ai-sdk-partial-output.patch.spec.ts`) — it MUST be re-created via `pnpm patch` when bumping `ai`.
- Dependency versions are heavily pinned via `pnpm.overrides` and `pnpm.patchedDependencies` (`scimmy`, `yjs`) in the root `package.json`. Don't bump pinned/patched deps casually; the patches and overrides exist for compatibility/security reasons.
- **Adding/renaming/removing an MCP tool requires updating `SERVER_INSTRUCTIONS`** in `packages/mcp/src/index.ts` — the intent-routing guide MCP clients receive on initialize. This applies both to inline `server.registerTool(...)` calls in `index.ts` and to specs in `packages/mcp/src/tool-specs.ts`. Enforced by `packages/mcp/test/unit/server-instructions.test.mjs`, which fails when a registered tool is not mentioned in the guide (deliberate opt-outs go into its `EXCEPTIONS` list). `packages/mcp/build/` is gitignored and rebuilt in CI/Docker via `pnpm build` (same convention as `git-sync`/`prosemirror-markdown`) — never commit it; rebuild locally after editing to run the tests.
## CI / release
+8 -8
@@ -12,6 +12,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- **Save intentional page versions.** Press `Cmd/Ctrl+S` (or use the page menu)
to save a named version of a page. The history panel now distinguishes
intentional versions (a "Saved" / "Agent version" badge) from automatic
snapshots, dims autosaves, and offers an "Only versions" filter. Automatic
snapshots switched from a fixed interval to a trailing idle-flush with a
max-wait ceiling, and a boundary snapshot is pinned whenever the editing source
changes (e.g. a person's edits followed by the AI agent). (#370)
- **Place several images side by side in a row.** A new "Inline (side by
side)" alignment mode in the image bubble menu renders consecutive inline
images as a row that wraps onto the next line on narrow screens. The row is
@@ -169,14 +177,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- **The server no longer runs out of heap during long autonomous agent runs.** A
new pnpm patch on `ai@6.0.134` stops the SDK from building a cumulative
snapshot of the ENTIRE turn text on every streamed text-delta when no output
strategy was requested (our server never requests one). Unpatched, those
O(n²) `partialOutput` snapshots piled up in a never-consumed internal
`tee()` branch of the stream result — a ~20-step, ~28k-chunk agent run
retained ~1.7 GB and OOM'd the 2 GB JS heap. Streaming granularity is
unchanged; the patch must be re-created if `ai` is ever bumped. (#184)
- **Internal links in exported Markdown no longer lose their visible text.** A
link whose target page name had no file extension (e.g. a bare title) was
collapsed to empty text during export, producing an unclickable, label-less
@@ -1385,5 +1385,14 @@
"The commented text changed since this suggestion was made; it was not applied.": "The commented text changed since this suggestion was made; it was not applied.",
"Dismiss": "Dismiss",
"Suggestion dismissed": "Suggestion dismissed",
"Failed to dismiss suggestion": "Failed to dismiss suggestion"
"Failed to dismiss suggestion": "Failed to dismiss suggestion",
"Save version": "Save version",
"Ctrl+S": "Ctrl+S",
"Version saved": "Version saved",
"Already saved as the latest version": "Already saved as the latest version",
"Agent version": "Agent version",
"Boundary": "Boundary",
"Autosave": "Autosave",
"Only versions": "Only versions",
"No saved versions yet.": "No saved versions yet."
}
@@ -1248,5 +1248,14 @@
"The commented text changed since this suggestion was made; it was not applied.": "Прокомментированный текст изменился после создания предложения; оно не было применено.",
"Dismiss": "Не применять",
"Suggestion dismissed": "Предложение отклонено",
"Failed to dismiss suggestion": "Не удалось отклонить предложение"
"Failed to dismiss suggestion": "Не удалось отклонить предложение",
"Save version": "Сохранить версию",
"Ctrl+S": "Ctrl+S",
"Version saved": "Версия сохранена",
"Already saved as the latest version": "Уже сохранено как последняя версия",
"Agent version": "Версия агента",
"Boundary": "Граница",
"Autosave": "Автосейв",
"Only versions": "Только версии",
"No saved versions yet.": "Пока нет сохранённых версий."
}
@@ -1,10 +1,19 @@
import { atom } from "jotai";
import { Editor } from "@tiptap/core";
import type { HocuspocusProvider } from "@hocuspocus/provider";
import { PageEditMode } from "@/features/user/types/user.types.ts";
import type { DictationUnavailableReason } from "@/features/dictation/dictation-status";
export const pageEditorAtom = atom<Editor | null>(null);
// #370 — the active page's collab provider, published by the page editor so the
// header menu can emit the "save-version" stateless signal (Cmd+S / button).
// Null when the page is read-only / collab isn't connected. A typed initial
// value (rather than an explicit generic) keeps jotai's overload resolution on
// the writable PrimitiveAtom branch.
const initialCollabProvider: HocuspocusProvider | null = null;
export const collabProviderAtom = atom(initialCollabProvider);
export const titleEditorAtom = atom<Editor | null>(null);
export const readOnlyEditorAtom = atom<Editor | null>(null);
@@ -31,11 +31,18 @@ import { useAtom, useAtomValue, useSetAtom } from "jotai";
import useCollaborationUrl from "@/features/editor/hooks/use-collaboration-url";
import { currentUserAtom } from "@/features/user/atoms/current-user-atom";
import {
collabProviderAtom,
currentPageEditModeAtom,
dictationAvailabilityAtom,
pageEditorAtom,
yjsConnectionStatusAtom,
} from "@/features/editor/atoms/editor-atoms";
import { notifications } from "@mantine/notifications";
import {
VERSION_SAVED_MESSAGE_TYPE,
type VersionSavedMessage,
saveVersionPending,
} from "@/features/page-history/version-messages";
import { asideStateAtom } from "@/components/layouts/global/hooks/atoms/sidebar-atom";
import {
activeCommentIdAtom,
@@ -123,6 +130,7 @@ export default function PageEditor({
const [currentUser] = useAtom(currentUserAtom);
const [, setEditor] = useAtom(pageEditorAtom);
const setCollabProvider = useSetAtom(collabProviderAtom);
const [, setAsideState] = useAtom(asideStateAtom);
const [, setActiveCommentId] = useAtom(activeCommentIdAtom);
const [showCommentPopup, setShowCommentPopup] = useAtom(showCommentPopupAtom);
@@ -180,6 +188,24 @@ export default function PageEditor({
const onStatelessHandler = ({ payload }: onStatelessParameters) => {
try {
const message = JSON.parse(payload);
// #370 — a version was saved somewhere; live-refresh the history panel
// on every client. Only the client that pressed Save (tracked by the
// module-level flag) shows the confirmation toast.
if (message?.type === VERSION_SAVED_MESSAGE_TYPE) {
const versionMsg = message as VersionSavedMessage;
queryClient.invalidateQueries({
queryKey: ["page-history-list"],
});
if (saveVersionPending.current) {
saveVersionPending.current = false;
notifications.show({
message: versionMsg.alreadySaved
? t("Already saved as the latest version")
: t("Version saved"),
});
}
return;
}
if (message?.type !== "page.updated" || !message.updatedAt) return;
const pageData = queryClient.getQueryData<IPage>(["pages", slugId]);
if (pageData) {
@@ -237,12 +263,16 @@ export default function PageEditor({
local.on("synced", onLocalSyncedHandler);
providersRef.current = { socket, local, remote };
// #370 — publish the provider so the header menu can emit save-version.
setCollabProvider(remote);
setProvidersReady(true);
} else {
setCollabProvider(providersRef.current.remote);
setProvidersReady(true);
}
// Only destroy on final unmount
return () => {
setCollabProvider(null);
providersRef.current?.socket.destroy();
providersRef.current?.remote.destroy();
providersRef.current?.local.destroy();
@@ -1,4 +1,11 @@
import { Text, Group, UnstyledButton, Avatar, Tooltip } from "@mantine/core";
import {
Text,
Group,
UnstyledButton,
Avatar,
Tooltip,
Badge,
} from "@mantine/core";
import { CustomAvatar } from "@/components/ui/custom-avatar.tsx";
import { AgentAvatarStack } from "@/components/ui/agent-avatar-stack.tsx";
import { formattedDate } from "@/lib/time";
@@ -7,36 +14,59 @@ import clsx from "clsx";
import { IPageHistory } from "@/features/page-history/types/page.types";
import { memo, useCallback } from "react";
import { useSetAtom } from "jotai";
import { useTranslation } from "react-i18next";
import { historyAtoms } from "@/features/page-history/atoms/history-atoms.ts";
const MAX_VISIBLE_AVATARS = 5;
/**
* #370: map a snapshot's intentionality tier to its badge. `version: true`
* marks the intentional points (manual / agent); autosaves (boundary / idle /
* legacy null) are non-versions and get dimmed in the list.
*/
type HistoryKindMeta = { labelKey: string; color: string; version: boolean };
export function historyKindMeta(kind?: string | null): HistoryKindMeta {
switch (kind) {
case "manual":
return { labelKey: "Saved", color: "blue", version: true };
case "agent":
return { labelKey: "Agent version", color: "violet", version: true };
case "boundary":
return { labelKey: "Boundary", color: "gray", version: false };
default: // "idle" | null | undefined (legacy autosave)
return { labelKey: "Autosave", color: "gray", version: false };
}
}
interface HistoryItemProps {
historyItem: IPageHistory;
index: number;
onSelect: (id: string, index: number) => void;
onHover?: (id: string, index: number) => void;
// The previous snapshot for diff/restore is resolved by id from the FULL list
// in the parent (resolvePrevSnapshotId), so the item only needs to report its
// own id — never a list index (which would be the filtered-view index).
onSelect: (id: string) => void;
onHover?: (id: string) => void;
onHoverEnd?: () => void;
isActive: boolean;
}
const HistoryItem = memo(function HistoryItem({
historyItem,
index,
onSelect,
onHover,
onHoverEnd,
isActive,
}: HistoryItemProps) {
const setHistoryModalOpen = useSetAtom(historyAtoms);
const { t } = useTranslation();
const kindMeta = historyKindMeta(historyItem.kind);
const handleClick = useCallback(() => {
onSelect(historyItem.id, index);
}, [onSelect, historyItem.id, index]);
onSelect(historyItem.id);
}, [onSelect, historyItem.id]);
const handleMouseEnter = useCallback(() => {
onHover?.(historyItem.id, index);
}, [onHover, historyItem.id, index]);
onHover?.(historyItem.id);
}, [onHover, historyItem.id]);
const contributors = historyItem.contributors;
const hasContributors = contributors && contributors.length > 0;
@@ -49,8 +79,20 @@ const HistoryItem = memo(function HistoryItem({
onMouseEnter={handleMouseEnter}
onMouseLeave={onHoverEnd}
className={clsx(classes.history, { [classes.active]: isActive })}
// #370 — dim autosnapshots so intentional versions stand out.
style={{ opacity: kindMeta.version ? 1 : 0.55 }}
>
<Text size="sm">{formattedDate(new Date(historyItem.createdAt))}</Text>
<Group gap={6} wrap="nowrap" justify="space-between">
<Text size="sm">{formattedDate(new Date(historyItem.createdAt))}</Text>
<Badge
size="xs"
radius="sm"
variant={kindMeta.version ? "filled" : "light"}
color={kindMeta.color}
>
{t(kindMeta.labelKey)}
</Badge>
</Group>
<Group gap={6} wrap="nowrap" mt={4}>
{hasContributors ? (
@@ -9,7 +9,7 @@ import {
historyAtoms,
} from "@/features/page-history/atoms/history-atoms";
import { useAtom, useSetAtom } from "jotai";
import { useCallback, useEffect, useMemo, useRef } from "react";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import {
Button,
ScrollArea,
@@ -17,9 +17,12 @@ import {
Divider,
Loader,
Center,
Switch,
Text,
} from "@mantine/core";
import { useTranslation } from "react-i18next";
import { useHistoryRestore } from "@/features/page-history/hooks";
import { resolvePrevSnapshotId } from "@/features/page-history/utils/resolve-prev-snapshot";
const PREFETCH_DELAY_MS = 150;
@@ -47,6 +50,23 @@ function HistoryList({ pageId }: Props) {
[pageHistoryData],
);
// #370 — "only versions" filter: hide autosnapshots (idle/boundary/legacy
// null), keep only intentional points (manual/agent). Filtering is over the
// already-loaded pages; diff/restore still targets the true previous snapshot,
// resolved by id against the FULL list (resolvePrevSnapshotId), not by index.
const [onlyVersions, setOnlyVersions] = useState(false);
const isVersion = useCallback(
(kind?: string | null) => kind === "manual" || kind === "agent",
[],
);
const visibleItems = useMemo(
() =>
onlyVersions
? historyItems.filter((item) => isVersion(item.kind))
: historyItems,
[historyItems, onlyVersions, isVersion],
);
const loadMoreRef = useRef<HTMLDivElement>(null);
const prefetchTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
@@ -60,11 +80,13 @@ function HistoryList({ pageId }: Props) {
}, []);
const handleHover = useCallback(
(historyId: string, index: number) => {
(historyId: string) => {
clearPrefetchTimeout();
prefetchTimeoutRef.current = setTimeout(() => {
prefetchPageHistory(historyId);
const prevId = historyItems[index + 1]?.id;
// The true previous snapshot in the FULL list (not the previous visible
// one under the "only versions" filter).
const prevId = resolvePrevSnapshotId(historyItems, historyId);
if (prevId) {
prefetchPageHistory(prevId);
}
@@ -78,9 +100,11 @@ function HistoryList({ pageId }: Props) {
}, [clearPrefetchTimeout]);
const handleSelect = useCallback(
(id: string, index: number) => {
(id: string) => {
setActiveHistoryId(id);
setActiveHistoryPrevId(historyItems[index + 1]?.id ?? "");
// Baseline = true previous snapshot in the FULL list, so the "only
// versions" filter never diffs/restores against the wrong item.
setActiveHistoryPrevId(resolvePrevSnapshotId(historyItems, id));
},
[historyItems, setActiveHistoryId, setActiveHistoryPrevId],
);
@@ -128,12 +152,27 @@ function HistoryList({ pageId }: Props) {
return (
<div>
<Group px="xs" py={6} justify="flex-end">
<Switch
size="xs"
checked={onlyVersions}
onChange={(e) => setOnlyVersions(e.currentTarget.checked)}
label={t("Only versions")}
/>
</Group>
<ScrollArea h={620} w="100%" type="scroll" scrollbarSize={5}>
{historyItems.map((historyItem, index) => (
{onlyVersions && visibleItems.length === 0 && (
<Center py="md">
<Text size="sm" c="dimmed">
{t("No saved versions yet.")}
</Text>
</Center>
)}
{visibleItems.map((historyItem) => (
<HistoryItem
key={historyItem.id}
historyItem={historyItem}
index={index}
onSelect={handleSelect}
onHover={handleHover}
onHoverEnd={clearPrefetchTimeout}
@@ -24,6 +24,10 @@ export interface IPageHistory {
updatedAt: string;
lastUpdatedBy: IPageHistoryUser;
contributors?: IPageHistoryUser[];
// #370 — intentionality tier: 'manual'/'agent' are versions (intentional
// points), 'idle'/'boundary' are autosnapshots; null/undefined = legacy
// autosave. Derived server-side, drives the history badge + "versions" filter.
kind?: "manual" | "agent" | "idle" | "boundary" | null;
// Provenance markers copied off the page row when the snapshot was saved.
// `'agent'` marks a version written by the AI agent; `lastUpdatedAiChatId`
// (when present) deep-links to the chat that produced the edit.
@@ -0,0 +1,42 @@
import { describe, it, expect } from "vitest";
import { resolvePrevSnapshotId } from "./resolve-prev-snapshot";
// #370 F4 — the risky client path: with the "only versions" filter active, diff
// and restore must still baseline against the TRUE previous snapshot in the FULL
// list, never the previous VISIBLE version (which would skip the autosnapshots
// between two versions). These pin that the resolution is by FULL-list order.
describe("resolvePrevSnapshotId", () => {
// Newest-first, as the history list stores it: a version, then two autosaves,
// then an older version.
const full = [
{ id: "v2", kind: "manual" },
{ id: "a2", kind: "idle" },
{ id: "a1", kind: "boundary" },
{ id: "v1", kind: "manual" },
{ id: "a0", kind: null },
];
it("returns the immediate FULL-list successor, not the previous visible version", () => {
// Selecting v2 while filtered to versions-only must baseline against a2 (the
// real chronological predecessor), NOT v1 (the previous visible version).
expect(resolvePrevSnapshotId(full, "v2")).toBe("a2");
});
it("resolves an autosnapshot's predecessor by full-list order", () => {
expect(resolvePrevSnapshotId(full, "a1")).toBe("v1");
});
it("returns '' for the oldest item (no predecessor)", () => {
expect(resolvePrevSnapshotId(full, "a0")).toBe("");
});
it("returns '' for an id not in the list", () => {
expect(resolvePrevSnapshotId(full, "missing")).toBe("");
});
it("does not depend on a filtered subset — same result whatever is visible", () => {
// The helper only ever sees the full list; a filtered view cannot change the
// baseline it computes.
expect(resolvePrevSnapshotId(full, "v1")).toBe("a0");
});
});
@@ -0,0 +1,22 @@
/**
* #370: resolve the TRUE previous snapshot for a history item.
*
* The history panel can be filtered to "only versions" (manual/agent), but diff
* and restore must always compare against the immediately-preceding snapshot in
* the FULL, unfiltered list, NOT the previous VISIBLE item. Comparing against
* the previous visible version would silently skip the autosnapshots between two
* versions and diff/restore the wrong baseline.
*
* Given the full (newest-first) list and an item id, this returns the id of the
* item right after it in the full list (its chronological predecessor), or "" if
* it is the oldest / not found. Pure and list-order-preserving so it can be unit
* tested without mounting the component.
*/
export function resolvePrevSnapshotId(
fullItems: ReadonlyArray<{ id: string }>,
id: string,
): string {
const index = fullItems.findIndex((item) => item.id === id);
if (index === -1) return "";
return fullItems[index + 1]?.id ?? "";
}
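
The difference between the two baselines can be seen side by side. The sketch below repeats the helper so it is self-contained and reuses the fixture from the spec; `visible`, `wrongBaseline`, and `rightBaseline` are names introduced here for illustration.

```typescript
// Same helper as above, repeated so the example runs on its own.
function resolvePrevSnapshotId(
  fullItems: ReadonlyArray<{ id: string }>,
  id: string,
): string {
  const index = fullItems.findIndex((item) => item.id === id);
  if (index === -1) return "";
  return fullItems[index + 1]?.id ?? "";
}

// Newest-first full list, as in the spec fixture.
const full = [
  { id: "v2", kind: "manual" },
  { id: "a2", kind: "idle" },
  { id: "a1", kind: "boundary" },
  { id: "v1", kind: "manual" },
  { id: "a0", kind: null },
];

// What the "only versions" filter shows.
const visible = full.filter(
  (item) => item.kind === "manual" || item.kind === "agent",
);

// Index arithmetic on the filtered view picks the previous VISIBLE version.
const visibleIndex = visible.findIndex((item) => item.id === "v2");
const wrongBaseline = visible[visibleIndex + 1]?.id ?? "";

// Resolving by id against the full list picks the true predecessor.
const rightBaseline = resolvePrevSnapshotId(full, "v2");
```

Here the filtered-index lookup lands on `v1` and silently skips `a2` and `a1`, which is the failure the helper exists to prevent.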
@@ -0,0 +1,28 @@
/**
* #370: page-version stateless wire formats. Kept in one place so the client
* emitter (Save hotkey / button) and the client listener (page-editor) agree
* with the server (PersistenceExtension) on the message shapes.
*/
/** Client → server: "save a version now". The server derives the tier
* (manual/agent) from the signed connection actor, never from this payload. */
export const SAVE_VERSION_MESSAGE_TYPE = "save-version";
/** Server → all clients: a version was saved (or promoted / already existed). */
export const VERSION_SAVED_MESSAGE_TYPE = "version.saved";
export interface VersionSavedMessage {
type: typeof VERSION_SAVED_MESSAGE_TYPE;
historyId: string;
kind: "manual" | "agent";
/** True when the latest snapshot was already a manual version (a no-op save). */
alreadySaved: boolean;
}
/**
* Cross-component coordination flag so only the client that pressed Save shows
* the confirmation toast, while every other client silently refreshes its
* history panel on the broadcast. A module-level ref avoids stale-closure
* pitfalls in the editor's long-lived stateless handler.
*/
export const saveVersionPending = { current: false };
@@ -3,6 +3,7 @@ import {
IconArrowRight,
IconArrowsHorizontal,
IconClockHour4,
IconDeviceFloppy,
IconDots,
IconEye,
IconEyeOff,
@@ -17,7 +18,7 @@ import {
IconTrash,
IconWifiOff,
} from "@tabler/icons-react";
import React, { useEffect, useRef, useState } from "react";
import React, { useCallback, useEffect, useRef, useState } from "react";
import { useAsideTriggerProps } from "@/hooks/use-toggle-aside.tsx";
import { useAtom, useAtomValue } from "jotai";
import { historyAtoms } from "@/features/page-history/atoms/history-atoms.ts";
@@ -39,9 +40,14 @@ import { Trans, useTranslation } from "react-i18next";
import ExportModal from "@/components/common/export-modal";
import { htmlToMarkdown } from "@docmost/editor-ext";
import {
collabProviderAtom,
pageEditorAtom,
yjsConnectionStatusAtom,
} from "@/features/editor/atoms/editor-atoms.ts";
import {
SAVE_VERSION_MESSAGE_TYPE,
saveVersionPending,
} from "@/features/page-history/version-messages.ts";
import { formattedDate } from "@/lib/time.ts";
import { PageEditModeToggle } from "@/features/user/components/page-state-pref.tsx";
import MovePageModal from "@/features/page/components/move-page-modal.tsx";
@@ -72,9 +78,34 @@ export default function PageHeaderMenu({ readOnly }: PageHeaderMenuProps) {
});
const isDeleted = !!page?.deletedAt;
const [workspace] = useAtom(workspaceAtom);
const collabProvider = useAtomValue(collabProviderAtom);
// Community public-sharing entry point (replaces the removed EE PageShareModal)
const workspaceSharingDisabled = workspace?.settings?.sharing?.disabled === true;
// #370 — explicit "save a version" (Cmd+S / Save button). One path for the
// human; the server derives the tier from the signed actor. Readers can't save
// (the button is hidden and the collab connection is read-only server-side).
const handleSaveVersion = useCallback(() => {
if (readOnly || !collabProvider) return;
// Flag this client as the initiator so only it shows the confirmation toast;
// a safety timeout clears it if no broadcast comes back (e.g. offline).
saveVersionPending.current = true;
window.setTimeout(() => {
saveVersionPending.current = false;
}, 5000);
collabProvider.sendStateless(
JSON.stringify({ type: SAVE_VERSION_MESSAGE_TYPE }),
);
}, [readOnly, collabProvider]);
// mod+S must also block the browser's "Save page" dialog. `triggerOnContentEditable`
// + empty ignore-list so it fires while typing in the editor/title.
useHotkeys(
[["mod+S", handleSaveVersion, { preventDefault: true }]],
[],
true,
);
useHotkeys(
[
[
@@ -133,15 +164,16 @@ export default function PageHeaderMenu({ readOnly }: PageHeaderMenuProps) {
</ActionIcon>
</Tooltip>
<PageActionMenu readOnly={readOnly} />
<PageActionMenu readOnly={readOnly} onSaveVersion={handleSaveVersion} />
</>
);
}
interface PageActionMenuProps {
readOnly?: boolean;
onSaveVersion?: () => void;
}
function PageActionMenu({ readOnly }: PageActionMenuProps) {
function PageActionMenu({ readOnly, onSaveVersion }: PageActionMenuProps) {
const { t } = useTranslation();
const [, setHistoryModalOpen] = useAtom(historyAtoms);
const clipboard = useClipboard({ timeout: 500 });
@@ -302,6 +334,20 @@ function PageActionMenu({ readOnly }: PageActionMenuProps) {
</Group>
</Menu.Item>
{!readOnly && (
<Menu.Item
leftSection={<IconDeviceFloppy size={16} />}
onClick={onSaveVersion}
rightSection={
<Text size="xs" c="dimmed">
{t("Ctrl+S")}
</Text>
}
>
{t("Save version")}
</Menu.Item>
)}
<Menu.Item
leftSection={<IconHistory size={16} />}
onClick={openHistoryModal}
+29 -3
@@ -1,3 +1,29 @@
export const HISTORY_INTERVAL = 5 * 60 * 1000;
export const HISTORY_FAST_INTERVAL = 60 * 1000;
export const HISTORY_FAST_THRESHOLD = 5 * 60 * 1000;
/**
* #370: page-history intentionality tiers. Domain of `page_history.kind`.
* - 'manual' / 'agent': Tier 1 versions (intentional points)
* - 'idle' / 'boundary': Tier 0 autosnapshots (safety net)
* A legacy `null` kind is treated as an autosave.
*/
export type PageHistoryKind = 'manual' | 'agent' | 'idle' | 'boundary';
/**
* #370: trailing idle-flush windows. A page's pending idle snapshot is
* re-armed on every store and fires this long after edits go quiet, so a burst
* of edits collapses into a single autosnapshot instead of one-per-store. Human
* sessions are noisier and less risky, so they flush less often than the agent.
*/
export const IDLE_INTERVAL_USER = 60 * 60 * 1000; // 60m
export const IDLE_INTERVAL_AGENT = 15 * 60 * 1000; // 15m
/**
* #370: max-wait ceiling for the idle flush. Pure trailing debounce starves the
* safety net: hocuspocus stores at least every ~45s, so a CONTINUOUS editing
* session would re-arm the trailing timer forever and never take an idle
* snapshot until edits finally go quiet (up to IDLE_INTERVAL_USER = 60m). This
* ceiling bounds the actual wait from the FIRST edit of a burst, so an idle
* snapshot fires at least this often during a long unbroken session, restoring
* a recovery-point cadence closer to the old heuristic without one-per-store
* noise. Mirrors hocuspocus's own maxDebounce idea.
*/
export const IDLE_MAX_WAIT_USER = 10 * 60 * 1000; // 10m
export const IDLE_MAX_WAIT_AGENT = 5 * 60 * 1000; // 5m
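
One way to read the interaction between the idle window and the ceiling is sketched below. This is an assumption-laden illustration, not `computeHistoryJob` itself: `idleDelay` and the `burstStartedAt` parameter are hypothetical, and the real function's handling of the burst marker is not shown in this diff.

```typescript
// Values copied from the constants above.
const IDLE_INTERVAL_USER = 60 * 60 * 1000; // 60m
const IDLE_INTERVAL_AGENT = 15 * 60 * 1000; // 15m
const IDLE_MAX_WAIT_USER = 10 * 60 * 1000; // 10m
const IDLE_MAX_WAIT_AGENT = 5 * 60 * 1000; // 5m

type Source = "user" | "agent";

// Hypothetical helper: delay for the re-armed idle job, bounded by the
// source-specific ceiling measured from the first edit of the burst.
function idleDelay(source: Source, burstStartedAt: number, now: number): number {
  const interval = source === "agent" ? IDLE_INTERVAL_AGENT : IDLE_INTERVAL_USER;
  const maxWait = source === "agent" ? IDLE_MAX_WAIT_AGENT : IDLE_MAX_WAIT_USER;
  // Time left before the ceiling is hit; never negative.
  const remaining = Math.max(0, maxWait - (now - burstStartedAt));
  // Trailing debounce, but never later than the ceiling allows.
  return Math.min(interval, remaining);
}

const MINUTE = 60 * 1000;
const atBurstStart = idleDelay("user", 0, 0);
const fourMinutesIn = idleDelay("user", 0, 4 * MINUTE);
const agentPastCeiling = idleDelay("agent", 0, 6 * MINUTE);
```

With the constants as given, the ceiling is shorter than the idle window for both sources, so in this sketch the remaining ceiling time is always the binding term.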
@@ -1,84 +1,93 @@
import { computeHistoryJob, resolveSource } from './persistence.extension';
import {
computeHistoryJob,
resolveSource,
} from './persistence.extension';
import {
HISTORY_FAST_INTERVAL,
HISTORY_FAST_THRESHOLD,
HISTORY_INTERVAL,
IDLE_INTERVAL_AGENT,
IDLE_INTERVAL_USER,
IDLE_MAX_WAIT_AGENT,
IDLE_MAX_WAIT_USER,
} from '../constants';
// A fixed clock + fixed createdAt make pageAge deterministic.
const NOW = 1_700_000_000_000;
const PAGE_ID = '550e8400-e29b-41d4-a716-446655440000';
// Build a minimal page whose age (NOW - createdAt) is exactly `ageMs`.
const pageAged = (ageMs: number) => ({
id: PAGE_ID,
createdAt: new Date(NOW - ageMs),
});
const page = { id: PAGE_ID };
describe('computeHistoryJob', () => {
it('agent edit → delay MUST be 0 and job id is source-keyed', () => {
// INVARIANT (§15 H2 / persistence.extension): the agent delay MUST stay 0.
// The worker re-reads the page row at run time, so any non-zero delay risks
// snapshotting content a later human edit has already overwritten. This is
// the load-bearing assertion of this spec — do not relax it.
const { jobId, delay } = computeHistoryJob(pageAged(0), 'agent', NOW);
expect(delay).toBe(0);
expect(jobId).toBe(`${PAGE_ID}-agent`);
});
it('agent edit on an OLD page is still delay 0 (age never applies to agents)', () => {
// Even when the page is far older than the fast threshold, the agent path
// must short-circuit to 0 — age-based debounce is a human-only concern.
const { jobId, delay } = computeHistoryJob(
pageAged(HISTORY_FAST_THRESHOLD + 60_000),
'agent',
NOW,
);
expect(delay).toBe(0);
expect(jobId).toBe(`${PAGE_ID}-agent`);
});
it('human edit on a YOUNG page (age < threshold) → fast interval, bare job id', () => {
const { jobId, delay } = computeHistoryJob(
pageAged(HISTORY_FAST_THRESHOLD - 1),
'user',
NOW,
);
expect(delay).toBe(HISTORY_FAST_INTERVAL);
describe('computeHistoryJob (#370 — shared trailing idle pipeline)', () => {
it('human edit → user idle window, bare page.id job', () => {
// Humans and the agent now share ONE idle job per page (jobId = page.id).
// The agent's old delay=0 fast path is GONE — intentional agent points now
// arrive via the explicit save-version signal, not a zero-delay snapshot.
const { jobId, delay } = computeHistoryJob(page, 'user');
expect(delay).toBe(IDLE_INTERVAL_USER);
expect(jobId).toBe(PAGE_ID);
});
it('human edit on an OLD page (age > threshold) → standard interval', () => {
const { jobId, delay } = computeHistoryJob(
pageAged(HISTORY_FAST_THRESHOLD + 1),
'user',
NOW,
);
expect(delay).toBe(HISTORY_INTERVAL);
it('agent edit → agent idle window (shorter), still the bare page.id job', () => {
const { jobId, delay } = computeHistoryJob(page, 'agent');
expect(delay).toBe(IDLE_INTERVAL_AGENT);
// No `-agent` suffix anymore: the agent joins the common idle pipeline.
expect(jobId).toBe(PAGE_ID);
});
it('boundary: pageAge EXACTLY === threshold takes the slow branch (the `<` is strict)', () => {
// Off-by-one guard: the condition is `pageAge < HISTORY_FAST_THRESHOLD`, so
// an age of exactly the threshold is NOT "fast" — it must use HISTORY_INTERVAL.
const { delay } = computeHistoryJob(
pageAged(HISTORY_FAST_THRESHOLD),
'user',
NOW,
);
expect(delay).toBe(HISTORY_INTERVAL);
it('agent flushes sooner than a human', () => {
expect(IDLE_INTERVAL_AGENT).toBeLessThan(IDLE_INTERVAL_USER);
});
it('treats any non-"agent" source string as human', () => {
// resolveSource only ever yields 'agent' | 'user', but guard the contract:
// the agent branch keys strictly on === 'agent'.
const { jobId, delay } = computeHistoryJob(pageAged(0), 'user', NOW);
expect(delay).toBe(HISTORY_FAST_INTERVAL);
it('treats any non-"agent" source string as human (keys strictly on === agent)', () => {
const { jobId, delay } = computeHistoryJob(page, 'user');
expect(delay).toBe(IDLE_INTERVAL_USER);
expect(jobId).toBe(PAGE_ID);
});
// #370 review round-1 WARNING: the max-wait ceiling prevents autosnapshot
// starvation during a continuous editing session (the trailing timer would
// otherwise re-arm forever and never fire).
describe('max-wait ceiling', () => {
const T0 = 1_000_000; // arbitrary fixed epoch for deterministic tests
it('once a burst is armed, delay clamps to the remaining max-wait budget', () => {
// 1 minute into the burst the USER interval (60m) far exceeds the remaining
// max-wait budget (10m - 1m = 9m), so the delay is clamped DOWN to that
// remaining budget — the full interval is NOT used once a ceiling applies.
const { delay } = computeHistoryJob(page, 'user', T0, T0 + 60_000);
expect(delay).toBe(IDLE_MAX_WAIT_USER - 60_000);
});
it('never waits longer than the max-wait budget from the burst start', () => {
// A store arriving right at the ceiling → delay 0 (fire promptly).
const { delay } = computeHistoryJob(
page,
'user',
T0,
T0 + IDLE_MAX_WAIT_USER,
);
expect(delay).toBe(0);
});
it('past the ceiling never returns a negative delay', () => {
const { delay } = computeHistoryJob(
page,
'user',
T0,
T0 + IDLE_MAX_WAIT_USER + 5 * 60_000,
);
expect(delay).toBe(0);
});
it('the agent ceiling is shorter than the user ceiling', () => {
expect(IDLE_MAX_WAIT_AGENT).toBeLessThan(IDLE_MAX_WAIT_USER);
const { delay } = computeHistoryJob(
page,
'agent',
T0,
T0 + IDLE_MAX_WAIT_AGENT,
);
expect(delay).toBe(0);
});
it('without a burstStart there is no ceiling (backward-compatible)', () => {
expect(computeHistoryJob(page, 'user').delay).toBe(IDLE_INTERVAL_USER);
expect(computeHistoryJob(page, 'agent').delay).toBe(IDLE_INTERVAL_AGENT);
});
});
});
describe('resolveSource (truth table)', () => {
@@ -40,11 +40,12 @@ describe('PersistenceExtension.onStoreDocument — Approach-A boundary snapshot'
let pageHistoryRepo: {
saveHistory: jest.Mock;
findPageLastHistory: jest.Mock;
updateHistoryKind: jest.Mock;
};
let aiQueue: { add: jest.Mock };
let historyQueue: { add: jest.Mock };
let historyQueue: { add: jest.Mock; remove: jest.Mock };
let notificationQueue: { add: jest.Mock };
let collabHistory: { addContributors: jest.Mock };
let collabHistory: { addContributors: jest.Mock; popContributors: jest.Mock };
let transclusionService: {
syncPageTransclusions: jest.Mock;
syncPageReferences: jest.Mock;
@@ -93,13 +94,22 @@ describe('PersistenceExtension.onStoreDocument — Approach-A boundary snapshot'
pageHistoryRepo = {
saveHistory: jest.fn().mockImplementation(async () => {
callOrder.push('saveHistory');
return { id: 'history-1' };
}),
findPageLastHistory: jest.fn().mockResolvedValue(null),
updateHistoryKind: jest.fn().mockResolvedValue(undefined),
};
aiQueue = { add: jest.fn().mockResolvedValue(undefined) };
historyQueue = { add: jest.fn().mockResolvedValue(undefined) };
historyQueue = {
add: jest.fn().mockResolvedValue(undefined),
// #370 — enqueuePageHistory now removes any pending idle job before re-adding.
remove: jest.fn().mockResolvedValue(undefined),
};
notificationQueue = { add: jest.fn().mockResolvedValue(undefined) };
collabHistory = { addContributors: jest.fn().mockResolvedValue(undefined) };
collabHistory = {
addContributors: jest.fn().mockResolvedValue(undefined),
popContributors: jest.fn().mockResolvedValue([]),
};
transclusionService = {
syncPageTransclusions: jest.fn().mockResolvedValue(undefined),
syncPageReferences: jest.fn().mockResolvedValue(undefined),
@@ -165,6 +175,50 @@ describe('PersistenceExtension.onStoreDocument — Approach-A boundary snapshot'
expect(pageRepo.updatePage.mock.calls[0][0].lastUpdatedSource).toBe('user');
});
// #370 review round-1 SUGGESTION: the boundary was GENERALIZED from a
// user→agent special-case to ANY lastUpdatedSource transition. These pin the
// generalized behaviour it was rebuilt for.
describe('generalized boundary — any source transition', () => {
// Same persisted page but with an explicit prior source.
const pageWithPriorSource = (prior: string | null) => ({
...persistedHumanPage('NEW CONTENT'),
lastUpdatedSource: prior,
});
it('agent→user transition fires the boundary (pins the prior agent revision)', async () => {
const document = ydocFor(doc('NEW CONTENT'));
pageRepo.findById.mockResolvedValue(pageWithPriorSource('agent'));
pageHistoryRepo.findPageLastHistory.mockResolvedValue(null);
await ext.onStoreDocument(buildData(document, 'user') as any);
expect(pageHistoryRepo.saveHistory).toHaveBeenCalledTimes(1);
expect(callOrder).toEqual(['saveHistory', 'updatePage']);
expect(pageRepo.updatePage.mock.calls[0][0].lastUpdatedSource).toBe('user');
});
it('git→user transition fires the boundary (git-sync overwrite is a source change)', async () => {
const document = ydocFor(doc('NEW CONTENT'));
pageRepo.findById.mockResolvedValue(pageWithPriorSource('git'));
pageHistoryRepo.findPageLastHistory.mockResolvedValue(null);
await ext.onStoreDocument(buildData(document, 'user') as any);
expect(pageHistoryRepo.saveHistory).toHaveBeenCalledTimes(1);
expect(callOrder).toEqual(['saveHistory', 'updatePage']);
});
it('a null prior source (first-ever edit) does NOT fire the boundary', async () => {
const document = ydocFor(doc('NEW CONTENT'));
pageRepo.findById.mockResolvedValue(pageWithPriorSource(null));
await ext.onStoreDocument(buildData(document, 'agent') as any);
expect(pageHistoryRepo.saveHistory).not.toHaveBeenCalled();
expect(pageRepo.updatePage).toHaveBeenCalledTimes(1);
});
});
it('idempotency: unchanged content → no updatePage, no history, no queues', async () => {
// The Y.Doc content equals the persisted content deeply → early skip.
// A Y.Doc round-trip normalizes attrs (e.g. paragraph indent), so derive
@@ -469,4 +523,125 @@ describe('PersistenceExtension.onStoreDocument — Approach-A boundary snapshot'
// Contributors keyed by the UUID so they match the PAGE_HISTORY job (page.id).
expect(collabHistory.addContributors.mock.calls[0][0]).toBe(PAGE_ID);
});
// #370 — explicit save-version (Cmd+S / agent save tool) over the stateless
// seam. The tier is derived from the SIGNED connection actor, the store path
// is reused, and promote-not-dup avoids duplicating heavy content rows.
describe('save-version (#370)', () => {
const emitSave = (document: any, actor: 'user' | 'agent') =>
ext.onStateless({
connection: {
readOnly: false,
context: { user: { id: USER_ID, name: 'Alice' }, actor },
} as any,
documentName: `page.${PAGE_ID}`,
document: document as any,
payload: JSON.stringify({ type: 'save-version' }),
} as any);
// findById returns a page whose content already equals the live doc, so the
// store path is a no-op and we isolate the versioning decision.
const pageMatchingDoc = (document: any) => ({
...persistedHumanPage('IGNORED'),
content: TiptapTransformer.fromYdoc(document, 'default'),
});
it('human save with no prior snapshot → writes a manual version + broadcasts', async () => {
const document = ydocFor(doc('VERSION ME'));
pageRepo.findById.mockResolvedValue(pageMatchingDoc(document));
pageHistoryRepo.findPageLastHistory.mockResolvedValue(null);
await emitSave(document, 'user');
expect(pageHistoryRepo.saveHistory).toHaveBeenCalledTimes(1);
expect(pageHistoryRepo.saveHistory.mock.calls[0][1]).toEqual(
expect.objectContaining({ kind: 'manual' }),
);
// The pending idle autosnapshot is cancelled by the explicit version.
expect(historyQueue.remove).toHaveBeenCalledWith(PAGE_ID);
const msg = JSON.parse(
(document as any).broadcastStateless.mock.calls[(document as any).broadcastStateless.mock.calls.length - 1][0],
);
expect(msg).toMatchObject({
type: 'version.saved',
kind: 'manual',
alreadySaved: false,
});
});
it('agent save derives kind=agent from the signed actor', async () => {
const document = ydocFor(doc('AGENT VERSION'));
pageRepo.findById.mockResolvedValue(pageMatchingDoc(document));
pageHistoryRepo.findPageLastHistory.mockResolvedValue(null);
await emitSave(document, 'agent');
expect(pageHistoryRepo.saveHistory.mock.calls[pageHistoryRepo.saveHistory.mock.calls.length - 1][1]).toEqual(
expect.objectContaining({ kind: 'agent' }),
);
});
it('promote-not-dup: latest snapshot is an autosave with identical content → upgrades in place', async () => {
const document = ydocFor(doc('SAME'));
const page = pageMatchingDoc(document);
pageRepo.findById.mockResolvedValue(page);
pageHistoryRepo.findPageLastHistory.mockResolvedValue({
id: 'auto-1',
content: page.content,
kind: 'idle',
});
await emitSave(document, 'user');
// No heavy new content row — the existing autosave is promoted to manual.
expect(pageHistoryRepo.updateHistoryKind).toHaveBeenCalledWith(
'auto-1',
'manual',
expect.anything(),
);
expect(pageHistoryRepo.saveHistory).not.toHaveBeenCalled();
const msg = JSON.parse(
(document as any).broadcastStateless.mock.calls[(document as any).broadcastStateless.mock.calls.length - 1][0],
);
expect(msg).toMatchObject({ historyId: 'auto-1', alreadySaved: false });
});
it('no-op when the latest snapshot is already a manual version of this content', async () => {
const document = ydocFor(doc('ALREADY SAVED'));
const page = pageMatchingDoc(document);
pageRepo.findById.mockResolvedValue(page);
pageHistoryRepo.findPageLastHistory.mockResolvedValue({
id: 'ver-1',
content: page.content,
kind: 'manual',
});
await emitSave(document, 'user');
expect(pageHistoryRepo.updateHistoryKind).not.toHaveBeenCalled();
expect(pageHistoryRepo.saveHistory).not.toHaveBeenCalled();
const msg = JSON.parse(
(document as any).broadcastStateless.mock.calls[(document as any).broadcastStateless.mock.calls.length - 1][0],
);
expect(msg).toMatchObject({ alreadySaved: true, kind: 'manual' });
});
it('a read-only connection cannot save a version', async () => {
const document = ydocFor(doc('READER'));
pageRepo.findById.mockResolvedValue(pageMatchingDoc(document));
await ext.onStateless({
connection: {
readOnly: true,
context: { user: { id: USER_ID }, actor: 'user' },
} as any,
documentName: `page.${PAGE_ID}`,
document: document as any,
payload: JSON.stringify({ type: 'save-version' }),
} as any);
expect(pageHistoryRepo.saveHistory).not.toHaveBeenCalled();
expect(pageHistoryRepo.updateHistoryKind).not.toHaveBeenCalled();
});
});
});
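The save-version cases pinned by the spec above (fresh insert, promote-not-dup, already-saved no-op) can be summarised as one pure decision. The sketch below is an assumption-laden simplification, not the extension's actual code: `decideSaveVersion`, `SaveDecision` and `LastSnapshot` are hypothetical names, and the deep content comparison is reduced to a precomputed `sameContent` flag.

```typescript
type PageHistoryKind = 'manual' | 'agent' | 'idle' | 'boundary';

// Hypothetical view of the latest page_history row; `sameContent` stands in
// for the deep-equality check between that row and the current page content.
interface LastSnapshot {
  kind: PageHistoryKind | null; // a legacy null kind counts as an autosave
  sameContent: boolean;
}

type SaveDecision =
  | { action: 'insert'; kind: 'manual' | 'agent' } // write a fresh version row
  | { action: 'promote'; kind: 'manual' | 'agent' } // upgrade the row's kind in place
  | { action: 'noop'; kind: 'manual' }; // already saved, nothing to write

function decideSaveVersion(
  requested: 'manual' | 'agent',
  last: LastSnapshot | null,
): SaveDecision {
  // No snapshot yet, or the content moved on: a new row is needed.
  if (last === null || !last.sameContent) {
    return { action: 'insert', kind: requested };
  }
  // Identical content already pinned as a manual version: report "already saved".
  if (last.kind === 'manual') {
    return { action: 'noop', kind: 'manual' };
  }
  // Identical content held by any other row: promote instead of duplicating.
  return { action: 'promote', kind: requested };
}
```

Keeping the decision free of I/O like this is one way to make each branch testable without mocking the repositories; the real handler additionally runs under the page-row lock.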
@@ -36,9 +36,11 @@ import {
import { Page } from '@docmost/db/types/entity.types';
import { CollabHistoryService } from '../services/collab-history.service';
import {
HISTORY_FAST_INTERVAL,
HISTORY_FAST_THRESHOLD,
HISTORY_INTERVAL,
IDLE_INTERVAL_AGENT,
IDLE_INTERVAL_USER,
IDLE_MAX_WAIT_AGENT,
IDLE_MAX_WAIT_USER,
PageHistoryKind,
} from '../constants';
import { TransclusionService } from '../../core/page/transclusion/transclusion.service';
import { observeCollabStore } from '../../integrations/metrics/metrics.registry';
@@ -51,6 +53,16 @@ import { observeCollabStore } from '../../integrations/metrics/metrics.registry'
*/
export const INTENTIONAL_CLEAR_MESSAGE_TYPE = 'intentional-clear';
/**
* #370: wire format of the client->server "save a version" signal. Sent by the
* human (Cmd+S / Save button) and by the agent's explicit save tool over the
* SAME stateless channel. The intentionality tier ('manual' vs 'agent') is
* derived SERVER-SIDE from the signed connection actor, never from this
* payload, so a version's type is unforgeable. The document is taken from the
* connection (not the payload), so the signal cannot be aimed at another page.
*/
export const SAVE_VERSION_MESSAGE_TYPE = 'save-version';
/**
* #251: how long an intentional-clear signal stays "pending" before it is
* ignored. The signal is set on the clearing keystroke but consumed by the
@@ -87,35 +99,39 @@ export function resolveSource(
}
/**
* Compute the BullMQ job id + delay for a page-history snapshot job. Pure so
* the data-loss-sensitive timing arithmetic is unit-testable; `now` is injected
* (caller passes `Date.now()`) for determinism.
* #370: compute the BullMQ job id + delay for a page's trailing idle-flush
* autosnapshot. Pure so the timing is unit-testable.
*
* - Agent edits: delay 0 and a source-keyed job id `${page.id}-agent`. The
* delay MUST stay 0: the worker re-reads the page row at run time, so any
* delay risks reading content a later human edit has already overwritten
* (mis-tagged snapshot). 0 minimizes that window. The `-agent` suffix keeps
* the job from coalescing with the bare-page.id human job.
* - Human edits: age-based debounce so rapid human edits coalesce into one
* snapshot; job id is the bare `page.id`.
*
* BullMQ forbids ':' in custom job ids (Redis key separator), so '-' is used;
* page.id is a UUID, so `${page.id}-agent` cannot collide with a human job.
* Both humans and the agent now share ONE idle pipeline (the agent's old
* `delay=0` fast path is gone; intentional agent points arrive via the
* explicit save-version signal instead). The job id is the bare `page.id`, so a
* page has at most one pending idle job; the caller removes-and-re-adds it on
* every store to keep it debounced to the trailing edge of an edit burst. The
* window differs by source only: the agent flushes sooner than a human.
*/
export function computeHistoryJob(
page: Pick<Page, 'id' | 'createdAt'>,
page: Pick<Page, 'id'>,
source: string,
now: number,
// Epoch ms of the FIRST edit in the current burst (when the pending idle job
// was first armed). Used to enforce the max-wait ceiling so a continuous
// editing session cannot re-arm the trailing timer forever. `now` is injectable
// for tests; both default to a live clock / no ceiling when omitted.
burstStart?: number,
now: number = Date.now(),
): { jobId: string; delay: number } {
const isAgent = source === 'agent';
const pageAge = now - new Date(page.createdAt).getTime();
const delay = isAgent
? 0
: pageAge < HISTORY_FAST_THRESHOLD
? HISTORY_FAST_INTERVAL
: HISTORY_INTERVAL;
const jobId = isAgent ? `${page.id}-agent` : page.id;
return { jobId, delay };
const interval = isAgent ? IDLE_INTERVAL_AGENT : IDLE_INTERVAL_USER;
const maxWait = isAgent ? IDLE_MAX_WAIT_AGENT : IDLE_MAX_WAIT_USER;
let delay = interval;
if (burstStart !== undefined) {
// Time already elapsed since the burst's first edit; the snapshot must fire
// no later than `maxWait` after that, so shrink the trailing delay to the
// remaining budget (never negative, so BullMQ fires it promptly).
const remaining = burstStart + maxWait - now;
delay = Math.max(0, Math.min(interval, remaining));
}
return { jobId: page.id, delay };
}
@Injectable()
@@ -127,6 +143,11 @@ export class PersistenceExtension implements Extension {
// coalescing window" per document and OR it across all edits in the window,
// so the snapshot is marked 'agent' regardless of who wrote last.
private agentTouched: Map<string, boolean> = new Map();
// #370 — epoch ms of the FIRST edit in the current idle-flush burst, per page.
// Set when the pending idle job is first armed (empty entry), read to enforce
// the max-wait ceiling in computeHistoryJob, and cleared when the idle job is
// consumed/cancelled so the next burst starts a fresh window.
private idleBurstStart: Map<string, number> = new Map();
// #251 — per-document "intentional clear pending" flags. Keyed by
// documentName, value = expiry timestamp (ms). Set by onStateless when the
// client reports a deliberate clear; consumed once by the next
@@ -326,20 +347,19 @@ export class PersistenceExtension implements Extension {
//this.logger.debug('Contributors error:' + err?.['message']);
}
// Approach A — boundary snapshot before the agent's first edit.
// When this store is the agent's and the page's currently persisted
// state was authored by a human, pin that human state as its own
// history version BEFORE the agent overwrites it. `page` still holds
// the OLD content/provenance here, so saveHistory(page) captures the
// pre-agent state tagged 'user'. The agent's new content is
// snapshotted later by the debounced PAGE_HISTORY job ('agent'). Skip
// if the prior state is already agent-authored (boundary already
// pinned on the user->agent transition), if the page is effectively
// empty, or if the latest existing snapshot already equals this human
// state (avoid duplicates).
// #370 — boundary snapshot on ANY source transition. When the store
// flips the page's provenance (user↔agent↔git), pin the OUTGOING
// state as its own history version BEFORE the incoming source
// overwrites it. `page` still holds the OLD content/provenance here,
// so saveHistory(page) captures the pre-transition state tagged with
// its own source, kind='boundary'. The incoming content is snapshotted
// later by the debounced idle job. Skip if the page is effectively
// empty or if the latest existing snapshot already equals this state
// (the shared isDeepStrictEqual gate — avoids duplicates). Generalizing
// beyond the old user→agent special-case also covers git-sync for free.
if (
lastUpdatedSource === 'agent' &&
page.lastUpdatedSource !== 'agent'
page.lastUpdatedSource &&
page.lastUpdatedSource !== lastUpdatedSource
) {
// pageHistory.pageId is uuid-typed; use page.id (never the doc-name
// slugId) so a `page.<slugId>` doc cannot throw 22P02 here (#260).
@@ -347,15 +367,13 @@ export class PersistenceExtension implements Extension {
page.id,
{ includeContent: true, trx },
);
const humanBaselineMissing =
const baselineMissing =
!lastHistory ||
!isDeepStrictEqual(lastHistory.content, page.content);
if (
!isEmptyParagraphDoc(page.content as any) &&
humanBaselineMissing
) {
if (!isEmptyParagraphDoc(page.content as any) && baselineMissing) {
await this.pageHistoryRepo.saveHistory(page, {
contributorIds: page.contributorIds ?? undefined,
kind: 'boundary',
trx,
});
}
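The generalized boundary rule in the hunk above boils down to three conditions. As a rough sketch under stated assumptions (`shouldPinBoundary` and its option names are hypothetical, and the emptiness and deep-equality checks are passed in as booleans rather than computed), it could be expressed as:

```typescript
// Hypothetical pure predicate mirroring the boundary conditions:
//  1. the page has a prior source and the incoming store changes it,
//  2. the outgoing page content is not effectively empty,
//  3. the latest snapshot does not already hold that outgoing content.
function shouldPinBoundary(
  priorSource: string | null | undefined,
  incomingSource: string,
  opts: { pageIsEmpty: boolean; lastSnapshotMatches: boolean },
): boolean {
  const sourceChanged = Boolean(priorSource) && priorSource !== incomingSource;
  return sourceChanged && !opts.pageIsEmpty && !opts.lastSnapshotMatches;
}

// A page with no snapshot yet and non-empty content (assumed defaults).
const noBaseline = { pageIsEmpty: false, lastSnapshotMatches: false };
const agentToUser = shouldPinBoundary('agent', 'user', noBaseline);
const gitToUser = shouldPinBoundary('git', 'user', noBaseline);
const firstEverEdit = shouldPinBoundary(null, 'agent', noBaseline);
```

Because the rule keys on "any change of a non-null source" rather than on a specific pair, a git-sync overwrite needs no extra branch, while a first-ever edit (null prior source) never pins a boundary.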
@@ -480,6 +498,14 @@ export class PersistenceExtension implements Extension {
return; // unrelated / malformed stateless message
}
// #370 — explicit "save a version" (human Cmd+S / agent save tool). Edit
// rights are already enforced by the readOnly reject above (a reader can't
// create a version), exactly as intentional-clear requires.
if (message?.type === SAVE_VERSION_MESSAGE_TYPE) {
await this.handleSaveVersion(data);
return;
}
if (message?.type !== INTENTIONAL_CLEAR_MESSAGE_TYPE) return;
this.intentionalClear.set(
@@ -488,6 +514,117 @@ export class PersistenceExtension implements Extension {
);
}
/**
* #370: persist an intentional version from the live in-memory ydoc.
*
* One stateless path serves BOTH the human and the agent; the tier is derived
* SERVER-SIDE from the signed connection actor ('agent' -> 'agent', anything
* else -> 'manual'), so the version type cannot be spoofed by the client. We
* take the fresh ydoc from the collab process memory and run it through the
* EXISTING store path first (so pages.content/ydoc reflect the exact content
* being versioned; a REST endpoint would race the up-to-10s-stale page row),
* then snapshot it into page_history with the intentional kind.
*
* Promote-not-dup: if the latest history row already holds this exact content
* and it is an autosave (idle/boundary/legacy-null), upgrade its kind in place
* instead of duplicating a heavy content row; if it is already 'manual', it is
* a no-op (the client shows an "already saved" toast). Otherwise a fresh
* version row is written, popping the aggregated contributors from Redis.
*/
private async handleSaveVersion(data: onStatelessPayload): Promise<void> {
const { connection, document, documentName } = data;
const context = connection?.context;
const pageId = getPageId(documentName);
// Unforgeable: 'agent' only for a signed agent connection, else 'manual'.
const kind: PageHistoryKind =
context?.actor === 'agent' ? 'agent' : 'manual';
// Flush the live ydoc through the normal store path so the page row + ydoc
// hold exactly what we are about to version (also fires the idle enqueue we
// supersede below, plus any source-transition boundary). onStoreDocument
// only needs document/documentName/context.
await this.onStoreDocument({
document,
documentName,
context,
} as onStoreDocumentPayload);
let result:
| { historyId: string; kind: PageHistoryKind; alreadySaved: boolean }
| undefined;
await executeTx(this.db, async (trx) => {
const page = await this.pageRepo.findById(pageId, {
withLock: true,
includeContent: true,
trx,
});
if (!page) return;
// Never version an effectively-empty page (mirrors the processor's
// first-history guard); there is nothing intentional to pin.
if (isEmptyParagraphDoc(page.content as any)) return;
const lastHistory = await this.pageHistoryRepo.findPageLastHistory(
page.id,
{ includeContent: true, trx },
);
if (
lastHistory &&
isDeepStrictEqual(lastHistory.content, page.content)
) {
// Content is already snapshotted. Promote-not-dup.
if (lastHistory.kind === 'manual') {
result = {
historyId: lastHistory.id,
kind: 'manual',
alreadySaved: true,
};
return;
}
await this.pageHistoryRepo.updateHistoryKind(
lastHistory.id,
kind,
trx,
);
result = { historyId: lastHistory.id, kind, alreadySaved: false };
return;
}
// Fresh version row. Pop the contributors aggregated since the last
// snapshot (SPOP); restore them if the write fails so they aren't lost.
const contributorIds = await this.collabHistory.popContributors(page.id);
try {
const saved = await this.pageHistoryRepo.saveHistory(page, {
contributorIds,
kind,
trx,
});
result = { historyId: saved.id, kind, alreadySaved: false };
} catch (err) {
await this.collabHistory.addContributors(page.id, contributorIds);
throw err;
}
});
// Housekeeping: this explicit version supersedes the page's pending idle
// autosnapshot, so cancel it (delayed job → remove() just deletes it) and
// end the current idle burst so the next edit starts a fresh max-wait window.
await this.historyQueue.remove(pageId).catch(() => undefined);
this.idleBurstStart.delete(pageId);
if (result) {
document.broadcastStateless(
JSON.stringify({
type: 'version.saved',
historyId: result.historyId,
kind: result.kind,
alreadySaved: result.alreadySaved,
}),
);
}
}
async onChange(data: onChangePayload) {
const documentName = data.documentName;
const userId = data.context?.user?.id;
@@ -545,17 +682,45 @@ export class PersistenceExtension implements Extension {
page: Page,
lastUpdatedSource: string,
): Promise<void> {
// Job id + delay arithmetic lives in the pure `computeHistoryJob` (see its
// doc comment for the agent-delay-0 / age-based-debounce invariants).
// #370 — trailing idle debounce with a max-wait ceiling. One pending idle
// job per page (jobId = page.id); on every store we remove the pending
// delayed job and re-add it, so the snapshot lands `delay` after edits go
// quiet rather than once per store (precedent: workspace.service.ts).
// remove() on a delayed job simply deletes it (returns 0 if absent, no throw); if the
// job is already ACTIVE and the remove is a no-op, the add still de-dups and
// the processor's isDeepStrictEqual gate collapses the duplicate content.
//
// The FIRST arm of a burst records `burstStart`; computeHistoryJob shrinks
// the delay to the remaining max-wait budget from that point, so a continuous
// session cannot re-arm the trailing timer forever and starve the snapshot.
// A burst marker older than THIS TIER's max-wait means the previous idle job
// has already fired — start a fresh window instead of firing immediately on
// the next edit. Must use the SAME source-specific max-wait computeHistoryJob
// uses (agent 5m / user 10m): a hardcoded USER ceiling would leave an agent
// burst's marker stale for 5..10m, forcing delay=0 on every store in that
// window and writing one idle row per store — exactly the per-store bloat the
// debounce exists to prevent, on the continuous-agent path.
const maxWait =
lastUpdatedSource === 'agent' ? IDLE_MAX_WAIT_AGENT : IDLE_MAX_WAIT_USER;
const now = Date.now();
let burstStart = this.idleBurstStart.get(page.id);
if (burstStart === undefined || now - burstStart >= maxWait) {
burstStart = now;
this.idleBurstStart.set(page.id, burstStart);
}
const { jobId, delay } = computeHistoryJob(
page,
lastUpdatedSource,
Date.now(),
burstStart,
now,
);
await this.historyQueue.remove(jobId).catch(() => undefined);
await this.historyQueue.add(
QueueJob.PAGE_HISTORY,
{ pageId: page.id } as IPageHistoryJob,
{ pageId: page.id, kind: 'idle' } as IPageHistoryJob,
{ jobId, delay },
);
}
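The burst-marker bookkeeping described in the comment above can be isolated into a small helper. This is a sketch only: `armBurst` and `demoStarts` are hypothetical names, and it models just the in-memory map, not the BullMQ remove-and-re-add.

```typescript
// Hypothetical helper: return the start of the current burst for `pageId`,
// opening a fresh window when there is no marker or the marker is at least
// `maxWait` old (meaning the previous idle job has already fired).
function armBurst(
  starts: Map<string, number>,
  pageId: string,
  now: number,
  maxWait: number,
): number {
  const existing = starts.get(pageId);
  if (existing === undefined || now - existing >= maxWait) {
    starts.set(pageId, now);
    return now;
  }
  return existing;
}

const AGENT_MAX_WAIT = 5 * 60 * 1000; // same value as IDLE_MAX_WAIT_AGENT
const demoStarts = new Map<string, number>();
const first = armBurst(demoStarts, 'page-1', 0, AGENT_MAX_WAIT); // opens the burst
const within = armBurst(demoStarts, 'page-1', 4 * 60 * 1000, AGENT_MAX_WAIT); // same burst
const expired = armBurst(demoStarts, 'page-1', 5 * 60 * 1000, AGENT_MAX_WAIT); // new burst
```

Passing the source-specific `maxWait` in, rather than hardcoding the user ceiling, is what keeps an agent burst's marker from staying stale between 5 and 10 minutes.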
@@ -66,6 +66,15 @@ describe('HistoryProcessor.process', () => {
notificationQueue = { add: jest.fn().mockResolvedValue(undefined) };
generalQueue = { add: jest.fn().mockResolvedValue(undefined) };
// #370 F3 — the processor now serializes its find+save under a page-row lock
// via executeTx. A db whose transaction().execute(fn) runs fn with a trx stub
// drives the real executeTx() helper without a database.
const db = {
transaction: () => ({
execute: (fn: (trx: any) => Promise<any>) => fn({ __trx: true }),
}),
};
// WorkerHost's constructor reads `this.worker`; passing repos positionally
// matches the constructor and avoids the Nest DI container.
proc = new HistoryProcessor(
@@ -73,6 +82,7 @@ describe('HistoryProcessor.process', () => {
pageRepo as any,
collabHistory as any,
watcherService as any,
db as any,
notificationQueue as any,
generalQueue as any,
);
@@ -126,15 +136,26 @@ describe('HistoryProcessor.process', () => {
await proc.process(buildJob());
expect(collabHistory.popContributors).toHaveBeenCalledWith(PAGE_ID);
// #370 F3/F9 — the snapshot decision runs under a page-row lock. Pin the lock
// structurally so a refactor that drops withLock/trx (silently reintroducing
// the TOCTOU double-insert) turns this red. The tx stub is { __trx: true }.
expect(pageRepo.findById).toHaveBeenCalledWith(
PAGE_ID,
expect.objectContaining({ withLock: true, trx: { __trx: true } }),
);
// #370 F7 — addPageWatchers MUST receive the trx, or its FK-check runs on a
// separate connection and self-deadlocks against our FOR UPDATE. Asserting
// the trx arg here is exactly what would have caught that regression.
expect(watcherService.addPageWatchers).toHaveBeenCalledWith(
['u1', 'u2'],
PAGE_ID,
SPACE_ID,
WORKSPACE_ID,
{ __trx: true },
);
expect(pageHistoryRepo.saveHistory).toHaveBeenCalledWith(
expect.objectContaining({ id: PAGE_ID }),
{ contributorIds: ['u1', 'u2'] },
{ contributorIds: ['u1', 'u2'], kind: 'idle', trx: { __trx: true } },
);
expect(generalQueue.add).toHaveBeenCalledWith(
QueueJob.PAGE_BACKLINKS,
@@ -186,6 +207,48 @@ describe('HistoryProcessor.process', () => {
]);
});
it('COMMIT failure (throw outside the tx callback) → contributors RESTORED', async () => {
// #370 F8 — a commit-time failure throws OUTSIDE the callback, so the inner
// try/catch does not run; the outer catch must restore the popped set (else a
// BullMQ retry writes an unattributed version). Use a db whose execute() runs
// the callback THEN throws, simulating a commit abort.
pageHistoryRepo.findPageLastHistory.mockResolvedValue({
content: { type: 'doc', content: [] },
});
const commitFail = {
transaction: () => ({
execute: async (fn: (trx: any) => Promise<any>) => {
await fn({ __trx: true }); // callback succeeds (saveHistory ok)
throw new Error('commit aborted'); // ...but the COMMIT fails
},
}),
};
const procCommitFail = new HistoryProcessor(
pageHistoryRepo as any,
pageRepo as any,
collabHistory as any,
watcherService as any,
commitFail as any,
notificationQueue as any,
generalQueue as any,
);
jest
.spyOn(procCommitFail['logger'], 'error')
.mockImplementation(() => undefined);
await expect(procCommitFail.process(buildJob())).rejects.toThrow(
'commit aborted',
);
// The inner catch did NOT run (save succeeded), so only the outer catch can
// restore — assert it did.
expect(collabHistory.addContributors).toHaveBeenCalledWith(PAGE_ID, [
'u1',
'u2',
]);
// And the post-snapshot queue work must NOT have run (we rethrew).
expect(generalQueue.add).not.toHaveBeenCalled();
});
it('backlinks + notification queue failures are swallowed (history still committed)', async () => {
pageHistoryRepo.findPageLastHistory.mockResolvedValue({
content: { type: 'doc', content: [] },
@@ -19,6 +19,9 @@ import { isDeepStrictEqual } from 'node:util';
import { CollabHistoryService } from '../services/collab-history.service';
import { WatcherService } from '../../core/watcher/watcher.service';
import { isEmptyParagraphDoc } from '../collaboration.util';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { executeTx } from '@docmost/db/utils';
@Processor(QueueName.HISTORY_QUEUE)
export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
@@ -29,6 +32,7 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
private readonly pageRepo: PageRepo,
private readonly collabHistory: CollabHistoryService,
private readonly watcherService: WatcherService,
@InjectKysely() private readonly db: KyselyDB,
@InjectQueue(QueueName.NOTIFICATION_QUEUE) private notificationQueue: Queue,
@InjectQueue(QueueName.GENERAL_QUEUE) private generalQueue: Queue,
) {
@@ -41,6 +45,9 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
try {
const { pageId } = job.data;
// Read the page WITHOUT a lock first, only to bail early on the two cheap
// no-write cases (page gone / empty first snapshot) without opening a
// transaction. The authoritative check-then-write happens locked below.
const page = await this.pageRepo.findById(pageId, {
includeContent: true,
});
@@ -51,40 +58,109 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
return;
}
const lastHistory = await this.pageHistoryRepo.findPageLastHistory(
pageId,
{ includeContent: true },
);
// #370 F3 — the snapshot decision (findPageLastHistory → saveHistory) must
// be serialized against manual-save/boundary writers, which run under a
// page-row lock in onStoreDocument. Without it, this processor and a
// concurrent manual-save each read the same lastHistory (MVCC), both see
// content != lastHistory, and both insert — producing two page_history rows
// with IDENTICAL content (one 'idle', one 'manual'), defeating
// promote-not-dup and the version-vs-autosave split. Taking the same
// page-row lock makes the second writer observe the first's committed row so
// the isDeepStrictEqual gate collapses the duplicate. Only the read+write
// is transacted; the post-snapshot queue work stays outside.
let contributorIds: string[] = [];
let snapshotWritten = false;
let lastHistoryContent: unknown;
// #370 F8 — the contributor set popped from Redis (destructive SPOP) must be
// restored if the snapshot does not durably land. The inner try/catch only
// covers a throw INSIDE the callback; a COMMIT failure (connection drop,
// serialization/deadlock abort on commit — the transient class the epic
// already retries) throws OUTSIDE it, rolling the snapshot back while the
// pop is already gone. We track the popped set here and restore it in the
// outer catch so a BullMQ retry re-attributes the version. addContributors
// is an idempotent Redis SADD, so a double-restore is harmless.
let poppedForRestore: string[] = [];
if (!lastHistory && isEmptyParagraphDoc(page.content as any)) {
this.logger.debug(
`Skipping first history for page ${pageId}: empty content`,
);
await this.collabHistory.clearContributors(pageId);
try {
await executeTx(this.db, async (trx) => {
const lockedPage = await this.pageRepo.findById(pageId, {
includeContent: true,
withLock: true,
trx,
});
if (!lockedPage) return;
const lastHistory = await this.pageHistoryRepo.findPageLastHistory(
pageId,
{ includeContent: true, trx },
);
lastHistoryContent = lastHistory?.content;
if (!lastHistory && isEmptyParagraphDoc(lockedPage.content as any)) {
this.logger.debug(
`Skipping first history for page ${pageId}: empty content`,
);
return;
}
if (
lastHistory &&
isDeepStrictEqual(lastHistory.content, lockedPage.content)
) {
return; // already snapshotted at this content — nothing to write
}
contributorIds = await this.collabHistory.popContributors(pageId);
poppedForRestore = contributorIds;
try {
// Pass `trx` so the watcher insert's FK check (FOR KEY SHARE on
// pages[pageId]) runs on the SAME connection that already holds the
// FOR UPDATE lock from findById — otherwise it takes the FK lock on a
// separate pool connection and self-deadlocks against our own tx.
await this.watcherService.addPageWatchers(
contributorIds,
pageId,
lockedPage.spaceId,
lockedPage.workspaceId,
trx,
);
// #370 — every job on this queue is a trailing idle-flush autosnapshot.
await this.pageHistoryRepo.saveHistory(lockedPage, {
contributorIds,
kind: job.data.kind ?? 'idle',
trx,
});
snapshotWritten = true;
this.logger.debug(`History created for page: ${pageId}`);
} catch (err) {
await this.collabHistory.addContributors(pageId, contributorIds);
poppedForRestore = [];
throw err;
}
});
} catch (err) {
// A throw here means the tx did NOT commit (callback threw, or the commit
// itself failed and rolled back). If we popped contributors and the inner
// catch did not already restore them, restore now so the retry keeps
// attribution. snapshotWritten is irrelevant: it is set before commit, so
// it can be true even when the commit rolled the snapshot back.
if (poppedForRestore.length) {
await this.collabHistory.addContributors(pageId, poppedForRestore);
}
throw err;
}
// No snapshot written (page vanished / empty-first / unchanged content) →
// clear the contributor set for the skip cases and stop.
if (!snapshotWritten) {
if (!lastHistoryContent && isEmptyParagraphDoc(page.content as any)) {
await this.collabHistory.clearContributors(pageId);
}
return;
}
if (
!lastHistory ||
!isDeepStrictEqual(lastHistory.content, page.content)
) {
const contributorIds = await this.collabHistory.popContributors(pageId);
try {
await this.watcherService.addPageWatchers(
contributorIds,
pageId,
page.spaceId,
page.workspaceId,
);
await this.pageHistoryRepo.saveHistory(page, { contributorIds });
this.logger.debug(`History created for page: ${pageId}`);
} catch (err) {
await this.collabHistory.addContributors(pageId, contributorIds);
throw err;
}
{
const mentions = extractMentions(page.content);
const pageMentions = extractPageMentions(mentions);
const internalLinkSlugIds = extractInternalLinkSlugIds(page.content);
@@ -102,7 +178,7 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
);
});
if (contributorIds.length > 0 && lastHistory?.content) {
if (contributorIds.length > 0 && lastHistoryContent) {
await this.notificationQueue
.add(QueueJob.PAGE_UPDATED, {
pageId,
@@ -0,0 +1,27 @@
import { type Kysely } from 'kysely';
/**
* #370 page-versioning intentionality tier on a history snapshot.
*
* Adds `page_history.kind`, the three-tier "how intentional was this snapshot"
* marker that lets versions (intentional points) be told apart from autosaves:
* - 'manual' a human explicitly saved a version (Cmd+S / Save button)
* - 'agent' the AI agent explicitly saved a version
* - 'idle' trailing idle-flush autosnapshot (safety net)
* - 'boundary' autosnapshot pinned on a source transition (user/agent/git)
*
* Nullable with NO default (mirrors last_updated_source in the agent-provenance
* migration): legacy rows predate the marker and read back as `null`, which the
* client renders as a plain autosave. Stored as a short varchar to stay
* forward-compatible without an enum migration.
*/
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.alterTable('page_history')
.addColumn('kind', 'varchar(20)', (col) => col)
.execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.alterTable('page_history').dropColumn('kind').execute();
}
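Because the column is a nullable varchar rather than an enum, readers of `page_history.kind` have to narrow the raw string themselves. A minimal sketch of that narrowing follows, under stated assumptions: the local `PageHistoryKind` type only mirrors the four values listed in the migration comment (the real type lives in the collaboration constants and is not shown here), and `parseKind` and `isExplicitVersion` are hypothetical helper names. Treating an unrecognised string like a legacy `null` is also an assumption, not something the migration specifies.

```typescript
// Local stand-in for the four values named in the migration comment.
const PAGE_HISTORY_KINDS = ['manual', 'agent', 'idle', 'boundary'] as const;
type PageHistoryKind = (typeof PAGE_HISTORY_KINDS)[number];

// Narrow the raw varchar value. Legacy rows read back as null; an unknown
// string is mapped to null as well (assumption for this sketch).
function parseKind(raw: string | null): PageHistoryKind | null {
  const match = PAGE_HISTORY_KINDS.find((kind) => kind === raw);
  return match ?? null;
}

// 'manual' and 'agent' are explicitly saved versions; 'idle', 'boundary'
// and legacy null rows are autosaves.
function isExplicitVersion(raw: string | null): boolean {
  const kind = parseKind(raw);
  return kind === 'manual' || kind === 'agent';
}
```

For example, `isExplicitVersion('agent')` is `true`, while `isExplicitVersion(null)` and `isExplicitVersion('idle')` are both `false`.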
@@ -13,6 +13,7 @@ import { jsonArrayFrom, jsonObjectFrom } from 'kysely/helpers/postgres';
import { ExpressionBuilder, sql } from 'kysely';
import { DB } from '@docmost/db/types/db';
import { resolveAgentProvenance } from '../agent-provenance';
import { PageHistoryKind } from '../../../collaboration/constants';
/**
* Role-resolution subquery for a page-history row's bound AI chat (#300). Joins
@@ -46,6 +47,9 @@ export class PageHistoryRepo {
'lastUpdatedById',
'lastUpdatedSource',
'lastUpdatedAiChatId',
// #370 — intentionality tier ('manual' | 'agent' | 'idle' | 'boundary');
// null on legacy rows (= autosave). Selected so callers can read/promote it.
'kind',
'contributorIds',
'spaceId',
'workspaceId',
@@ -85,9 +89,15 @@ export class PageHistoryRepo {
async saveHistory(
page: Page,
opts?: { contributorIds?: string[]; trx?: KyselyTransaction },
): Promise<void> {
await this.insertPageHistory(
opts?: {
contributorIds?: string[];
// #370 — intentionality tier for this snapshot. Omitted → null (legacy
// autosave semantics). Callers derive it server-side, never from a client.
kind?: PageHistoryKind;
trx?: KyselyTransaction;
},
): Promise<PageHistory> {
return await this.insertPageHistory(
{
pageId: page.id,
slugId: page.slugId,
@@ -99,6 +109,7 @@ export class PageHistoryRepo {
// Copy the provenance marker off the page row, as for lastUpdatedById.
lastUpdatedSource: page.lastUpdatedSource,
lastUpdatedAiChatId: page.lastUpdatedAiChatId,
kind: opts?.kind ?? null,
contributorIds: opts?.contributorIds,
spaceId: page.spaceId,
workspaceId: page.workspaceId,
@@ -107,6 +118,25 @@ export class PageHistoryRepo {
);
}
/**
* #370 promote an existing snapshot's intentionality tier in place. Used by
* the manual-save "promote-not-dup" path: when the latest history row already
* holds the exact content being versioned, we upgrade its `kind` instead of
* duplicating a heavy content row.
*/
async updateHistoryKind(
pageHistoryId: string,
kind: PageHistoryKind,
trx?: KyselyTransaction,
): Promise<void> {
const db = dbOrTx(this.db, trx);
await db
.updateTable('pageHistory')
.set({ kind })
.where('id', '=', pageHistoryId)
.execute();
}
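The promote-not-dup choice can be pictured as a pure decision made while the page-row lock is held. The sketch below is illustrative only: `decideSnapshot`, `LastHistory` and `SnapshotDecision` are hypothetical names, and the rule that only a 'manual' request promotes a non-manual row is an assumption, since the manual-save caller is not part of this diff. It uses Node's `isDeepStrictEqual`, the same comparison the processor applies.

```typescript
import { isDeepStrictEqual } from 'node:util';

type SnapshotKind = 'manual' | 'agent' | 'idle' | 'boundary';

interface LastHistory {
  id: string;
  kind: SnapshotKind | null;
  content: unknown;
}

type SnapshotDecision =
  | { action: 'insert'; kind: SnapshotKind }
  | { action: 'promote'; historyId: string; kind: SnapshotKind }
  | { action: 'skip' };

function decideSnapshot(
  last: LastHistory | undefined,
  content: unknown,
  kind: SnapshotKind,
): SnapshotDecision {
  // No prior row, or the content changed: a new history row is needed.
  if (!last || !isDeepStrictEqual(last.content, content)) {
    return { action: 'insert', kind };
  }
  // Same content already stored: upgrade the marker instead of duplicating
  // the heavy content row (assumed to apply to manual saves only).
  if (kind === 'manual' && last.kind !== 'manual') {
    return { action: 'promote', historyId: last.id, kind };
  }
  // Same content and nothing to upgrade: write nothing.
  return { action: 'skip' };
}
```

A 'promote' result would map to `updateHistoryKind(historyId, kind, trx)` and an 'insert' result to `saveHistory(page, { kind, trx })`, both on the transaction that holds the lock.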
async findPageHistoryByPageId(pageId: string, pagination: PaginationOptions) {
const query = this.db
.selectFrom('pageHistory')
@@ -280,6 +280,7 @@ export interface PageHistory {
createdAt: Generated<Timestamp>;
icon: string | null;
id: Generated<string>;
kind: string | null;
lastUpdatedAiChatId: string | null;
lastUpdatedById: string | null;
lastUpdatedSource: string | null;
@@ -1,124 +0,0 @@
import { readFileSync } from 'fs';
import { streamText, Output } from 'ai';
import { MockLanguageModelV3, simulateReadableStream } from 'ai/test';
/**
* Regression tests for patches/ai@6.0.134.patch (server heap OOM on long
* autonomous agent runs, #184).
*
* Unpatched ai@6.0.134 substitutes the default text() output strategy even
* when the caller passes NO `output` option. Its createOutputTransformStream
* then accumulates the ENTIRE turn text and, on EVERY text-delta, enqueues a
* flat snapshot of all text so far as `partialOutput` (O(n^2) memory). Those
* snapshots pile up in the never-consumed leftover tee() branch of
* DefaultStreamTextResult.baseStream, which is what OOM'd production during a
* ~28k-chunk agent turn. The pnpm patch skips partialOutput production
* entirely when no output strategy was requested, while keeping per-delta
* streaming granularity.
*/
describe('ai@6.0.134 pnpm patch: no partialOutput accumulation without an output strategy', () => {
const makeModel = () =>
new MockLanguageModelV3({
doStream: async () => ({
stream: simulateReadableStream({
chunks: [
{ type: 'stream-start' as const, warnings: [] },
{ type: 'text-start' as const, id: '1' },
{ type: 'text-delta' as const, id: '1', delta: 'Hello' },
{ type: 'text-delta' as const, id: '1', delta: ', ' },
{ type: 'text-delta' as const, id: '1', delta: 'world!' },
{ type: 'text-end' as const, id: '1' },
{
type: 'finish' as const,
finishReason: { unified: 'stop' as const, raw: 'stop' },
usage: {
inputTokens: {
total: 1,
noCache: undefined,
cacheRead: undefined,
cacheWrite: undefined,
},
outputTokens: { total: 1, text: 1, reasoning: undefined },
},
},
],
}),
}),
});
it('preserves per-delta streaming granularity in textStream', async () => {
const result = streamText({ model: makeModel(), prompt: 'hi' });
const deltas: string[] = [];
for await (const delta of result.textStream) {
deltas.push(delta);
}
// The patch must NOT coalesce or drop deltas: three model deltas arrive
// as three separate textStream chunks.
expect(deltas).toEqual(['Hello', ', ', 'world!']);
});
it('emits NO partialOutput values when the caller did not request an output strategy', async () => {
const result = streamText({ model: makeModel(), prompt: 'hi' });
// Fully consume the primary stream first (mirrors production usage).
for await (const _ of result.textStream) {
// drain
}
const partials: unknown[] = [];
for await (const partial of result.experimental_partialOutputStream) {
partials.push(partial);
}
// TRIPWIRE: on unpatched ai@6.0.134 the default text() output strategy
// yields one cumulative partial per text-delta here (['Hello', 'Hello, ',
// 'Hello, world!']). An empty stream proves the patch is applied and no
// cumulative snapshots are being produced (and thus none can pile up in
// the leftover internal tee branch).
expect(partials).toEqual([]);
});
it('preserves cumulative partialOutput when the caller DOES request an output strategy', async () => {
// PRESERVE-BRANCH GUARD: the patch only short-circuits partialOutput when
// `output == null`. When an output strategy IS set (here Output.text()),
// createOutputTransformStream must fall through to the ORIGINAL code path
// and keep publishing cumulative snapshots, so object/text-output consumers
// behave byte-identically to unpatched ai. A careless re-port that routed
// output-set calls into the skip branch would leave partialOutput empty and
// silently break those consumers — this test is the tripwire for that.
const result = streamText({
model: makeModel(),
prompt: 'hi',
experimental_output: Output.text(),
});
// Drain the primary stream fully and accumulate the complete output text.
let fullText = '';
for await (const delta of result.textStream) {
fullText += delta;
}
const partials: string[] = [];
for await (const partial of result.experimental_partialOutputStream) {
partials.push(partial);
}
// With a strategy set, partialOutput must be PRESERVED (non-empty) and
// cumulative: the last emitted partial equals the full accumulated text.
expect(partials.length).toBeGreaterThan(0);
expect(partials[partials.length - 1]).toBe(fullText);
expect(fullText).toBe('Hello, world!');
});
it('both installed dist builds (CJS and ESM) carry the patch marker', () => {
// Secondary guard: pins the patch to BOTH bundles the SDK ships, since
// the NestJS server consumes CJS while other tooling may load ESM.
const cjsPath = require.resolve('ai');
const mjsPath = cjsPath.replace(/index\.js$/, 'index.mjs');
expect(cjsPath).toMatch(/index\.js$/);
expect(readFileSync(cjsPath, 'utf8')).toContain('PATCH(docmost');
expect(readFileSync(mjsPath, 'utf8')).toContain('PATCH(docmost');
});
});
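The O(n^2) growth described in the spec's header comment can be checked with a little arithmetic. This is a rough sketch with hypothetical helper names (`cumulativeSnapshots`, `totalChars`); it counts characters only and does not model the engine's real heap usage.

```typescript
// Build one cumulative snapshot per delta, as the unpatched transform does.
function cumulativeSnapshots(deltas: string[]): string[] {
  const snapshots: string[] = [];
  let text = '';
  for (const delta of deltas) {
    text += delta;
    snapshots.push(text);
  }
  return snapshots;
}

// Total characters held if every chunk is retained.
function totalChars(chunks: string[]): number {
  return chunks.reduce((sum, chunk) => sum + chunk.length, 0);
}

const deltas = ['Hello', ', ', 'world!'];
const snapshots = cumulativeSnapshots(deltas);
// snapshots is ['Hello', 'Hello, ', 'Hello, world!']

const deltaChars = totalChars(deltas); // 5 + 2 + 6 = 13
const snapshotChars = totalChars(snapshots); // 5 + 7 + 13 = 25

// With n one-character deltas the retained snapshots total n(n+1)/2 characters.
const longRun = cumulativeSnapshots(new Array<string>(1000).fill('x'));
const longRunChars = totalChars(longRun); // 1000 * 1001 / 2 = 500500
```

The deltas themselves grow linearly with the turn length, while retained cumulative snapshots grow quadratically, which is why they are costly when they queue up in a branch that nothing consumes.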
@@ -20,6 +20,10 @@ export interface IStripeSeatsSyncJob {
export interface IPageHistoryJob {
pageId: string;
// #370 — intentionality tier the worker stamps on the snapshot. All jobs on
// this queue are trailing idle-flush autosnapshots, so this is 'idle' (absent
// → treated as 'idle' by the processor).
kind?: 'idle';
}
/**
@@ -96,8 +96,7 @@
"pnpm": {
"patchedDependencies": {
"scimmy@1.3.5": "patches/scimmy@1.3.5.patch",
"yjs@13.6.30": "patches/yjs@13.6.30.patch",
"ai@6.0.134": "patches/ai@6.0.134.patch"
"yjs@13.6.30": "patches/yjs@13.6.30.patch"
},
"overrides": {
"prosemirror-changeset": "2.4.0",
@@ -1,68 +0,0 @@
diff --git a/dist/index.js b/dist/index.js
index ae447a12f7823ec0a00837ee9f0eb809a610d5f8..a3402b2c2d021ef432cfa76e35d370073d525135 100644
--- a/dist/index.js
+++ b/dist/index.js
@@ -6578,9 +6578,19 @@ function createOutputTransformStream(output) {
controller.enqueue({ part: chunk, partialOutput: void 0 });
return;
}
- text2 += chunk.text;
textChunk += chunk.text;
textProviderMetadata = (_a21 = chunk.providerMetadata) != null ? _a21 : textProviderMetadata;
+ if (output == null) {
+ // PATCH(docmost #OOM): no output strategy requested -> publish each
+ // text-delta immediately and do NOT build cumulative partialOutput
+ // snapshots. Unpatched, the default text() output snapshots the ENTIRE
+ // accumulated turn text on every delta (O(n^2) memory) and those
+ // snapshots pile up in the never-consumed leftover tee branch of
+ // DefaultStreamTextResult.baseStream -> heap OOM on long agent turns.
+ publishTextChunk({ controller });
+ return;
+ }
+ text2 += chunk.text;
const result = await output.parsePartialOutput({ text: text2 });
if (result !== void 0) {
const currentJson = JSON.stringify(result.partial);
@@ -6959,7 +6969,7 @@ var DefaultStreamTextResult = class {
})
);
}
- this.baseStream = stream.pipeThrough(createOutputTransformStream(output != null ? output : text())).pipeThrough(eventProcessor);
+ this.baseStream = stream.pipeThrough(createOutputTransformStream(output)).pipeThrough(eventProcessor);
const { maxRetries, retry } = prepareRetries({
maxRetries: maxRetriesArg,
abortSignal
diff --git a/dist/index.mjs b/dist/index.mjs
index 663875332e3f9a9bd167c25583c515876f42951b..b840b0502c9894df983e0154805abb80e70e6331 100644
--- a/dist/index.mjs
+++ b/dist/index.mjs
@@ -6501,9 +6501,19 @@ function createOutputTransformStream(output) {
controller.enqueue({ part: chunk, partialOutput: void 0 });
return;
}
- text2 += chunk.text;
textChunk += chunk.text;
textProviderMetadata = (_a21 = chunk.providerMetadata) != null ? _a21 : textProviderMetadata;
+ if (output == null) {
+ // PATCH(docmost #OOM): no output strategy requested -> publish each
+ // text-delta immediately and do NOT build cumulative partialOutput
+ // snapshots. Unpatched, the default text() output snapshots the ENTIRE
+ // accumulated turn text on every delta (O(n^2) memory) and those
+ // snapshots pile up in the never-consumed leftover tee branch of
+ // DefaultStreamTextResult.baseStream -> heap OOM on long agent turns.
+ publishTextChunk({ controller });
+ return;
+ }
+ text2 += chunk.text;
const result = await output.parsePartialOutput({ text: text2 });
if (result !== void 0) {
const currentJson = JSON.stringify(result.partial);
@@ -6882,7 +6892,7 @@ var DefaultStreamTextResult = class {
})
);
}
- this.baseStream = stream.pipeThrough(createOutputTransformStream(output != null ? output : text())).pipeThrough(eventProcessor);
+ this.baseStream = stream.pipeThrough(createOutputTransformStream(output)).pipeThrough(eventProcessor);
const { maxRetries, retry } = prepareRetries({
maxRetries: maxRetriesArg,
abortSignal
@@ -44,9 +44,6 @@ overrides:
ip-address: 10.1.1
patchedDependencies:
ai@6.0.134:
hash: f60bfc3357e01e1f3978c6c40fdd65aeb33fefaad7179cde8676465b6c5ff4d9
path: patches/ai@6.0.134.patch
scimmy@1.3.5:
hash: 775d80f86830b2c5dd1a250c9802c10f8fc3da3c7898373de5aa0c23993d1673
path: patches/scimmy@1.3.5.patch
@@ -626,10 +623,10 @@ importers:
version: 8.3.0(socket.io-adapter@2.5.4)
ai:
specifier: ^6.0.134
version: 6.0.134(patch_hash=f60bfc3357e01e1f3978c6c40fdd65aeb33fefaad7179cde8676465b6c5ff4d9)(zod@4.3.6)
version: 6.0.134(zod@4.3.6)
ai-sdk-ollama:
specifier: ^3.8.1
version: 3.8.1(ai@6.0.134(patch_hash=f60bfc3357e01e1f3978c6c40fdd65aeb33fefaad7179cde8676465b6c5ff4d9)(zod@4.3.6))(zod@4.3.6)
version: 3.8.1(ai@6.0.134(zod@4.3.6))(zod@4.3.6)
bcrypt:
specifier: ^6.0.0
version: 6.0.0
@@ -16358,17 +16355,17 @@ snapshots:
agent-base@7.1.4: {}
ai-sdk-ollama@3.8.1(ai@6.0.134(patch_hash=f60bfc3357e01e1f3978c6c40fdd65aeb33fefaad7179cde8676465b6c5ff4d9)(zod@4.3.6))(zod@4.3.6):
ai-sdk-ollama@3.8.1(ai@6.0.134(zod@4.3.6))(zod@4.3.6):
dependencies:
'@ai-sdk/provider': 3.0.8
'@ai-sdk/provider-utils': 4.0.21(zod@4.3.6)
ai: 6.0.134(patch_hash=f60bfc3357e01e1f3978c6c40fdd65aeb33fefaad7179cde8676465b6c5ff4d9)(zod@4.3.6)
ai: 6.0.134(zod@4.3.6)
jsonrepair: 3.13.3
ollama: 0.6.3
transitivePeerDependencies:
- zod
ai@6.0.134(patch_hash=f60bfc3357e01e1f3978c6c40fdd65aeb33fefaad7179cde8676465b6c5ff4d9)(zod@4.3.6):
ai@6.0.134(zod@4.3.6):
dependencies:
'@ai-sdk/gateway': 3.0.77(zod@4.3.6)
'@ai-sdk/provider': 3.0.8