diff --git a/CHANGELOG.md b/CHANGELOG.md index aea1854d..a107b71a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Save intentional page versions.** Press `Cmd/Ctrl+S` (or use the page menu) + to save a named version of a page. The history panel now distinguishes + intentional versions (a "Saved" / "Agent version" badge) from automatic + snapshots, dims autosaves, and offers an "Only versions" filter. Automatic + snapshots switched from a fixed interval to a trailing idle-flush with a + max-wait ceiling, and a boundary snapshot is pinned whenever the editing source + changes (e.g. a person's edits followed by the AI agent). (#370) + - **Place several images side by side in a row.** A new "Inline (side by side)" alignment mode in the image bubble menu renders consecutive inline images as a row that wraps onto the next line on narrow screens. The row is diff --git a/apps/client/public/locales/en-US/translation.json b/apps/client/public/locales/en-US/translation.json index 89988d9b..00b4b237 100644 --- a/apps/client/public/locales/en-US/translation.json +++ b/apps/client/public/locales/en-US/translation.json @@ -1385,5 +1385,14 @@ "The commented text changed since this suggestion was made; it was not applied.": "The commented text changed since this suggestion was made; it was not applied.", "Dismiss": "Dismiss", "Suggestion dismissed": "Suggestion dismissed", - "Failed to dismiss suggestion": "Failed to dismiss suggestion" + "Failed to dismiss suggestion": "Failed to dismiss suggestion", + "Save version": "Save version", + "Ctrl+S": "Ctrl+S", + "Version saved": "Version saved", + "Already saved as the latest version": "Already saved as the latest version", + "Agent version": "Agent version", + "Boundary": "Boundary", + "Autosave": "Autosave", + "Only versions": "Only versions", + "No saved versions yet.": "No saved versions yet." } diff --git a/apps/client/public/locales/ru-RU/translation.json b/apps/client/public/locales/ru-RU/translation.json index 0c792632..51b8e3bc 100644 --- a/apps/client/public/locales/ru-RU/translation.json +++ b/apps/client/public/locales/ru-RU/translation.json @@ -1248,5 +1248,14 @@ "The commented text changed since this suggestion was made; it was not applied.": "Прокомментированный текст изменился после создания предложения; оно не было применено.", "Dismiss": "Не применять", "Suggestion dismissed": "Предложение отклонено", - "Failed to dismiss suggestion": "Не удалось отклонить предложение" + "Failed to dismiss suggestion": "Не удалось отклонить предложение", + "Save version": "Сохранить версию", + "Ctrl+S": "Ctrl+S", + "Version saved": "Версия сохранена", + "Already saved as the latest version": "Уже сохранено как последняя версия", + "Agent version": "Версия агента", + "Boundary": "Граница", + "Autosave": "Автосейв", + "Only versions": "Только версии", + "No saved versions yet.": "Пока нет сохранённых версий." } diff --git a/apps/client/src/features/editor/atoms/editor-atoms.ts b/apps/client/src/features/editor/atoms/editor-atoms.ts index 605a71db..7b513cc1 100644 --- a/apps/client/src/features/editor/atoms/editor-atoms.ts +++ b/apps/client/src/features/editor/atoms/editor-atoms.ts @@ -1,10 +1,19 @@ import { atom } from "jotai"; import { Editor } from "@tiptap/core"; +import type { HocuspocusProvider } from "@hocuspocus/provider"; import { PageEditMode } from "@/features/user/types/user.types.ts"; import type { DictationUnavailableReason } from "@/features/dictation/dictation-status"; export const pageEditorAtom = atom(null); +// #370 — the active page's collab provider, published by the page editor so the +// header menu can emit the "save-version" stateless signal (Cmd+S / button). +// Null when the page is read-only / collab isn't connected. A typed initial +// value (rather than an explicit generic) keeps jotai's overload resolution on +// the writable PrimitiveAtom branch. +const initialCollabProvider: HocuspocusProvider | null = null; +export const collabProviderAtom = atom(initialCollabProvider); + export const titleEditorAtom = atom(null); export const readOnlyEditorAtom = atom(null); diff --git a/apps/client/src/features/editor/page-editor.tsx b/apps/client/src/features/editor/page-editor.tsx index e1244ee5..60949e8a 100644 --- a/apps/client/src/features/editor/page-editor.tsx +++ b/apps/client/src/features/editor/page-editor.tsx @@ -31,11 +31,18 @@ import { useAtom, useAtomValue, useSetAtom } from "jotai"; import useCollaborationUrl from "@/features/editor/hooks/use-collaboration-url"; import { currentUserAtom } from "@/features/user/atoms/current-user-atom"; import { + collabProviderAtom, currentPageEditModeAtom, dictationAvailabilityAtom, pageEditorAtom, yjsConnectionStatusAtom, } from "@/features/editor/atoms/editor-atoms"; +import { notifications } from "@mantine/notifications"; +import { + VERSION_SAVED_MESSAGE_TYPE, + type VersionSavedMessage, + saveVersionPending, +} from "@/features/page-history/version-messages"; import { asideStateAtom } from "@/components/layouts/global/hooks/atoms/sidebar-atom"; import { activeCommentIdAtom, @@ -123,6 +130,7 @@ export default function PageEditor({ const [currentUser] = useAtom(currentUserAtom); const [, setEditor] = useAtom(pageEditorAtom); + const setCollabProvider = useSetAtom(collabProviderAtom); const [, setAsideState] = useAtom(asideStateAtom); const [, setActiveCommentId] = useAtom(activeCommentIdAtom); const [showCommentPopup, setShowCommentPopup] = useAtom(showCommentPopupAtom); @@ -180,6 +188,24 @@ export default function PageEditor({ const onStatelessHandler = ({ payload }: onStatelessParameters) => { try { const message = JSON.parse(payload); + // #370 — a version was saved somewhere; live-refresh the history panel + // on every client. Only the client that pressed Save (tracked by the + // module-level flag) shows the confirmation toast. + if (message?.type === VERSION_SAVED_MESSAGE_TYPE) { + const versionMsg = message as VersionSavedMessage; + queryClient.invalidateQueries({ + queryKey: ["page-history-list"], + }); + if (saveVersionPending.current) { + saveVersionPending.current = false; + notifications.show({ + message: versionMsg.alreadySaved + ? t("Already saved as the latest version") + : t("Version saved"), + }); + } + return; + } if (message?.type !== "page.updated" || !message.updatedAt) return; const pageData = queryClient.getQueryData(["pages", slugId]); if (pageData) { @@ -237,12 +263,16 @@ export default function PageEditor({ local.on("synced", onLocalSyncedHandler); providersRef.current = { socket, local, remote }; + // #370 — publish the provider so the header menu can emit save-version. + setCollabProvider(remote); setProvidersReady(true); } else { + setCollabProvider(providersRef.current.remote); setProvidersReady(true); } // Only destroy on final unmount return () => { + setCollabProvider(null); providersRef.current?.socket.destroy(); providersRef.current?.remote.destroy(); providersRef.current?.local.destroy(); diff --git a/apps/client/src/features/page-history/components/history-item.tsx b/apps/client/src/features/page-history/components/history-item.tsx index bc810eca..aa76b4f2 100644 --- a/apps/client/src/features/page-history/components/history-item.tsx +++ b/apps/client/src/features/page-history/components/history-item.tsx @@ -1,4 +1,11 @@ -import { Text, Group, UnstyledButton, Avatar, Tooltip } from "@mantine/core"; +import { + Text, + Group, + UnstyledButton, + Avatar, + Tooltip, + Badge, +} from "@mantine/core"; import { CustomAvatar } from "@/components/ui/custom-avatar.tsx"; import { AgentAvatarStack } from "@/components/ui/agent-avatar-stack.tsx"; import { formattedDate } from "@/lib/time"; @@ -7,36 +14,59 @@ import clsx from "clsx"; import { IPageHistory } from "@/features/page-history/types/page.types"; import { memo, useCallback } from "react"; import { useSetAtom } from "jotai"; +import { useTranslation } from "react-i18next"; import { historyAtoms } from "@/features/page-history/atoms/history-atoms.ts"; const MAX_VISIBLE_AVATARS = 5; +/** + * #370 — map a snapshot's intentionality tier to its badge. `version: true` + * marks the intentional points (manual / agent); autosaves (boundary / idle / + * legacy null) are non-versions and get dimmed in the list. + */ +type HistoryKindMeta = { labelKey: string; color: string; version: boolean }; +export function historyKindMeta(kind?: string | null): HistoryKindMeta { + switch (kind) { + case "manual": + return { labelKey: "Saved", color: "blue", version: true }; + case "agent": + return { labelKey: "Agent version", color: "violet", version: true }; + case "boundary": + return { labelKey: "Boundary", color: "gray", version: false }; + default: // "idle" | null | undefined (legacy autosave) + return { labelKey: "Autosave", color: "gray", version: false }; + } +} + interface HistoryItemProps { historyItem: IPageHistory; - index: number; - onSelect: (id: string, index: number) => void; - onHover?: (id: string, index: number) => void; + // The previous snapshot for diff/restore is resolved by id from the FULL list + // in the parent (resolvePrevSnapshotId), so the item only needs to report its + // own id — never a list index (which would be the filtered-view index). + onSelect: (id: string) => void; + onHover?: (id: string) => void; onHoverEnd?: () => void; isActive: boolean; } const HistoryItem = memo(function HistoryItem({ historyItem, - index, onSelect, onHover, onHoverEnd, isActive, }: HistoryItemProps) { const setHistoryModalOpen = useSetAtom(historyAtoms); + const { t } = useTranslation(); + const kindMeta = historyKindMeta(historyItem.kind); const handleClick = useCallback(() => { - onSelect(historyItem.id, index); - }, [onSelect, historyItem.id, index]); + onSelect(historyItem.id); + }, [onSelect, historyItem.id]); const handleMouseEnter = useCallback(() => { - onHover?.(historyItem.id, index); - }, [onHover, historyItem.id, index]); + onHover?.(historyItem.id); + }, [onHover, historyItem.id]); const contributors = historyItem.contributors; const hasContributors = contributors && contributors.length > 0; @@ -49,8 +79,20 @@ const HistoryItem = memo(function HistoryItem({ onMouseEnter={handleMouseEnter} onMouseLeave={onHoverEnd} className={clsx(classes.history, { [classes.active]: isActive })} + // #370 — dim autosnapshots so intentional versions stand out. + style={{ opacity: kindMeta.version ? 1 : 0.55 }} > - {formattedDate(new Date(historyItem.createdAt))} + + {formattedDate(new Date(historyItem.createdAt))} + + {t(kindMeta.labelKey)} + + {hasContributors ? ( diff --git a/apps/client/src/features/page-history/components/history-list.tsx b/apps/client/src/features/page-history/components/history-list.tsx index 4024901b..b3400d5f 100644 --- a/apps/client/src/features/page-history/components/history-list.tsx +++ b/apps/client/src/features/page-history/components/history-list.tsx @@ -9,7 +9,7 @@ import { historyAtoms, } from "@/features/page-history/atoms/history-atoms"; import { useAtom, useSetAtom } from "jotai"; -import { useCallback, useEffect, useMemo, useRef } from "react"; +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { Button, ScrollArea, @@ -17,9 +17,12 @@ import { Divider, Loader, Center, + Switch, + Text, } from "@mantine/core"; import { useTranslation } from "react-i18next"; import { useHistoryRestore } from "@/features/page-history/hooks"; +import { resolvePrevSnapshotId } from "@/features/page-history/utils/resolve-prev-snapshot"; const PREFETCH_DELAY_MS = 150; @@ -47,6 +50,23 @@ function HistoryList({ pageId }: Props) { [pageHistoryData], ); + // #370 — "only versions" filter: hide autosnapshots (idle/boundary/legacy + // null), keep only intentional points (manual/agent). Filtering is over the + // already-loaded pages; the diff/restore still targets the true previous + // snapshot, so items carry their index within the FULL list. + const [onlyVersions, setOnlyVersions] = useState(false); + const isVersion = useCallback( + (kind?: string | null) => kind === "manual" || kind === "agent", + [], + ); + const visibleItems = useMemo( + () => + onlyVersions + ? historyItems.filter((item) => isVersion(item.kind)) + : historyItems, + [historyItems, onlyVersions, isVersion], + ); + const loadMoreRef = useRef(null); const prefetchTimeoutRef = useRef | null>(null); @@ -60,11 +80,13 @@ function HistoryList({ pageId }: Props) { }, []); const handleHover = useCallback( - (historyId: string, index: number) => { + (historyId: string) => { clearPrefetchTimeout(); prefetchTimeoutRef.current = setTimeout(() => { prefetchPageHistory(historyId); - const prevId = historyItems[index + 1]?.id; + // The true previous snapshot in the FULL list (not the previous visible + // one under the "only versions" filter). + const prevId = resolvePrevSnapshotId(historyItems, historyId); if (prevId) { prefetchPageHistory(prevId); } @@ -78,9 +100,11 @@ function HistoryList({ pageId }: Props) { }, [clearPrefetchTimeout]); const handleSelect = useCallback( - (id: string, index: number) => { + (id: string) => { setActiveHistoryId(id); - setActiveHistoryPrevId(historyItems[index + 1]?.id ?? ""); + // Baseline = true previous snapshot in the FULL list, so the "only + // versions" filter never diffs/restores against the wrong item. + setActiveHistoryPrevId(resolvePrevSnapshotId(historyItems, id)); }, [historyItems, setActiveHistoryId, setActiveHistoryPrevId], ); @@ -128,12 +152,27 @@ function HistoryList({ pageId }: Props) { return (
+ + setOnlyVersions(e.currentTarget.checked)} + label={t("Only versions")} + /> + + - {historyItems.map((historyItem, index) => ( + {onlyVersions && visibleItems.length === 0 && ( +
+ + {t("No saved versions yet.")} + +
+ )} + {visibleItems.map((historyItem) => ( { + // Newest-first, as the history list stores it: a version, then two autosaves, + // then an older version. + const full = [ + { id: "v2", kind: "manual" }, + { id: "a2", kind: "idle" }, + { id: "a1", kind: "boundary" }, + { id: "v1", kind: "manual" }, + { id: "a0", kind: null }, + ]; + + it("returns the immediate FULL-list successor, not the previous visible version", () => { + // Selecting v2 while filtered to versions-only must baseline against a2 (the + // real chronological predecessor), NOT v1 (the previous visible version). + expect(resolvePrevSnapshotId(full, "v2")).toBe("a2"); + }); + + it("resolves an autosnapshot's predecessor by full-list order", () => { + expect(resolvePrevSnapshotId(full, "a1")).toBe("v1"); + }); + + it("returns '' for the oldest item (no predecessor)", () => { + expect(resolvePrevSnapshotId(full, "a0")).toBe(""); + }); + + it("returns '' for an id not in the list", () => { + expect(resolvePrevSnapshotId(full, "missing")).toBe(""); + }); + + it("does not depend on a filtered subset — same result whatever is visible", () => { + // The helper only ever sees the full list; a filtered view cannot change the + // baseline it computes. + expect(resolvePrevSnapshotId(full, "v1")).toBe("a0"); + }); +}); diff --git a/apps/client/src/features/page-history/utils/resolve-prev-snapshot.ts b/apps/client/src/features/page-history/utils/resolve-prev-snapshot.ts new file mode 100644 index 00000000..73320cd2 --- /dev/null +++ b/apps/client/src/features/page-history/utils/resolve-prev-snapshot.ts @@ -0,0 +1,22 @@ +/** + * #370 — resolve the TRUE previous snapshot for a history item. + * + * The history panel can be filtered to "only versions" (manual/agent), but diff + * and restore must always compare against the immediately-preceding snapshot in + * the FULL, unfiltered list — NOT the previous VISIBLE item. Comparing against + * the previous visible version would silently skip the autosnapshots between two + * versions and diff/restore the wrong baseline. + * + * Given the full (newest-first) list and an item id, this returns the id of the + * item right after it in the full list (its chronological predecessor), or "" if + * it is the oldest / not found. Pure and list-order-preserving so it can be unit + * tested without mounting the component. + */ +export function resolvePrevSnapshotId( + fullItems: ReadonlyArray<{ id: string }>, + id: string, +): string { + const index = fullItems.findIndex((item) => item.id === id); + if (index === -1) return ""; + return fullItems[index + 1]?.id ?? ""; +} diff --git a/apps/client/src/features/page-history/version-messages.ts b/apps/client/src/features/page-history/version-messages.ts new file mode 100644 index 00000000..1e944397 --- /dev/null +++ b/apps/client/src/features/page-history/version-messages.ts @@ -0,0 +1,28 @@ +/** + * #370 — page-version stateless wire formats. Kept in one place so the client + * emitter (Save hotkey / button) and the client listener (page-editor) agree + * with the server (PersistenceExtension) on the message shapes. + */ + +/** Client → server: "save a version now". The server derives the tier + * (manual/agent) from the signed connection actor, never from this payload. */ +export const SAVE_VERSION_MESSAGE_TYPE = "save-version"; + +/** Server → all clients: a version was saved (or promoted / already existed). */ +export const VERSION_SAVED_MESSAGE_TYPE = "version.saved"; + +export interface VersionSavedMessage { + type: typeof VERSION_SAVED_MESSAGE_TYPE; + historyId: string; + kind: "manual" | "agent"; + /** True when the latest snapshot was already a manual version (a no-op save). */ + alreadySaved: boolean; +} + +/** + * Cross-component coordination flag so only the client that pressed Save shows + * the confirmation toast, while every other client silently refreshes its + * history panel on the broadcast. A module-level ref avoids stale-closure + * pitfalls in the editor's long-lived stateless handler. + */ +export const saveVersionPending = { current: false }; diff --git a/apps/client/src/features/page/components/header/page-header-menu.tsx b/apps/client/src/features/page/components/header/page-header-menu.tsx index 732d9136..438dfee4 100644 --- a/apps/client/src/features/page/components/header/page-header-menu.tsx +++ b/apps/client/src/features/page/components/header/page-header-menu.tsx @@ -3,6 +3,7 @@ import { IconArrowRight, IconArrowsHorizontal, IconClockHour4, + IconDeviceFloppy, IconDots, IconEye, IconEyeOff, @@ -17,7 +18,7 @@ import { IconTrash, IconWifiOff, } from "@tabler/icons-react"; -import React, { useEffect, useRef, useState } from "react"; +import React, { useCallback, useEffect, useRef, useState } from "react"; import { useAsideTriggerProps } from "@/hooks/use-toggle-aside.tsx"; import { useAtom, useAtomValue } from "jotai"; import { historyAtoms } from "@/features/page-history/atoms/history-atoms.ts"; @@ -39,9 +40,14 @@ import { Trans, useTranslation } from "react-i18next"; import ExportModal from "@/components/common/export-modal"; import { htmlToMarkdown } from "@docmost/editor-ext"; import { + collabProviderAtom, pageEditorAtom, yjsConnectionStatusAtom, } from "@/features/editor/atoms/editor-atoms.ts"; +import { + SAVE_VERSION_MESSAGE_TYPE, + saveVersionPending, +} from "@/features/page-history/version-messages.ts"; import { formattedDate } from "@/lib/time.ts"; import { PageEditModeToggle } from "@/features/user/components/page-state-pref.tsx"; import MovePageModal from "@/features/page/components/move-page-modal.tsx"; @@ -72,9 +78,34 @@ export default function PageHeaderMenu({ readOnly }: PageHeaderMenuProps) { }); const isDeleted = !!page?.deletedAt; const [workspace] = useAtom(workspaceAtom); + const collabProvider = useAtomValue(collabProviderAtom); // Community public-sharing entry point (replaces the removed EE PageShareModal) const workspaceSharingDisabled = workspace?.settings?.sharing?.disabled === true; + // #370 — explicit "save a version" (Cmd+S / Save button). One path for the + // human; the server derives the tier from the signed actor. Readers can't save + // (the button is hidden and the collab connection is read-only server-side). + const handleSaveVersion = useCallback(() => { + if (readOnly || !collabProvider) return; + // Flag this client as the initiator so only it shows the confirmation toast; + // a safety timeout clears it if no broadcast comes back (e.g. offline). + saveVersionPending.current = true; + window.setTimeout(() => { + saveVersionPending.current = false; + }, 5000); + collabProvider.sendStateless( + JSON.stringify({ type: SAVE_VERSION_MESSAGE_TYPE }), + ); + }, [readOnly, collabProvider]); + + // mod+S must also block the browser's "Save page" dialog. `triggerOnContent- + // Editable` + empty ignore-list so it fires while typing in the editor/title. + useHotkeys( + [["mod+S", handleSaveVersion, { preventDefault: true }]], + [], + true, + ); + useHotkeys( [ [ @@ -133,15 +164,16 @@ export default function PageHeaderMenu({ readOnly }: PageHeaderMenuProps) { - + ); } interface PageActionMenuProps { readOnly?: boolean; + onSaveVersion?: () => void; } -function PageActionMenu({ readOnly }: PageActionMenuProps) { +function PageActionMenu({ readOnly, onSaveVersion }: PageActionMenuProps) { const { t } = useTranslation(); const [, setHistoryModalOpen] = useAtom(historyAtoms); const clipboard = useClipboard({ timeout: 500 }); @@ -302,6 +334,20 @@ function PageActionMenu({ readOnly }: PageActionMenuProps) { + {!readOnly && ( + } + onClick={onSaveVersion} + rightSection={ + + {t("Ctrl+S")} + + } + > + {t("Save version")} + + )} + } onClick={openHistoryModal} diff --git a/apps/server/src/collaboration/constants.ts b/apps/server/src/collaboration/constants.ts index 8ce8c825..315d70ba 100644 --- a/apps/server/src/collaboration/constants.ts +++ b/apps/server/src/collaboration/constants.ts @@ -1,3 +1,29 @@ -export const HISTORY_INTERVAL = 5 * 60 * 1000; -export const HISTORY_FAST_INTERVAL = 60 * 1000; -export const HISTORY_FAST_THRESHOLD = 5 * 60 * 1000; +/** + * #370 — page-history intentionality tiers. Domain of `page_history.kind`. + * - 'manual' / 'agent' → Tier 1 versions (intentional points) + * - 'idle' / 'boundary' → Tier 0 autosnapshots (safety net) + * A legacy `null` kind is treated as an autosave. + */ +export type PageHistoryKind = 'manual' | 'agent' | 'idle' | 'boundary'; + +/** + * #370 — trailing idle-flush windows. A page's pending idle snapshot is + * re-armed on every store and fires this long after edits go quiet, so a burst + * of edits collapses into a single autosnapshot instead of one-per-store. Human + * sessions are noisier and less risky, so they flush less often than the agent. + */ +export const IDLE_INTERVAL_USER = 60 * 60 * 1000; // 60m +export const IDLE_INTERVAL_AGENT = 15 * 60 * 1000; // 15m + +/** + * #370 — max-wait ceiling for the idle flush. Pure trailing debounce starves the + * safety net: hocuspocus stores at least every ~45s, so a CONTINUOUS editing + * session would re-arm the trailing timer forever and never take an idle + * snapshot until edits finally go quiet (up to IDLE_INTERVAL_USER = 60m). This + * ceiling bounds the actual wait from the FIRST edit of a burst, so an idle + * snapshot fires at least this often during a long unbroken session — restoring + * a recovery point cadence closer to the old heuristic without one-per-store + * noise. Mirrors hocuspocus's own maxDebounce idea. + */ +export const IDLE_MAX_WAIT_USER = 10 * 60 * 1000; // 10m +export const IDLE_MAX_WAIT_AGENT = 5 * 60 * 1000; // 5m diff --git a/apps/server/src/collaboration/extensions/compute-history-job.spec.ts b/apps/server/src/collaboration/extensions/compute-history-job.spec.ts index aa21f14f..be42040f 100644 --- a/apps/server/src/collaboration/extensions/compute-history-job.spec.ts +++ b/apps/server/src/collaboration/extensions/compute-history-job.spec.ts @@ -1,84 +1,93 @@ +import { computeHistoryJob, resolveSource } from './persistence.extension'; import { - computeHistoryJob, - resolveSource, -} from './persistence.extension'; -import { - HISTORY_FAST_INTERVAL, - HISTORY_FAST_THRESHOLD, - HISTORY_INTERVAL, + IDLE_INTERVAL_AGENT, + IDLE_INTERVAL_USER, + IDLE_MAX_WAIT_AGENT, + IDLE_MAX_WAIT_USER, } from '../constants'; -// A fixed clock + fixed createdAt make pageAge deterministic. -const NOW = 1_700_000_000_000; const PAGE_ID = '550e8400-e29b-41d4-a716-446655440000'; -// Build a minimal page whose age (NOW - createdAt) is exactly `ageMs`. -const pageAged = (ageMs: number) => ({ - id: PAGE_ID, - createdAt: new Date(NOW - ageMs), -}); +const page = { id: PAGE_ID }; -describe('computeHistoryJob', () => { - it('agent edit → delay MUST be 0 and job id is source-keyed', () => { - // INVARIANT (§15 H2 / persistence.extension): the agent delay MUST stay 0. - // The worker re-reads the page row at run time, so any non-zero delay risks - // snapshotting content a later human edit has already overwritten. This is - // the load-bearing assertion of this spec — do not relax it. - const { jobId, delay } = computeHistoryJob(pageAged(0), 'agent', NOW); - expect(delay).toBe(0); - expect(jobId).toBe(`${PAGE_ID}-agent`); - }); - - it('agent edit on an OLD page is still delay 0 (age never applies to agents)', () => { - // Even when the page is far older than the fast threshold, the agent path - // must short-circuit to 0 — age-based debounce is a human-only concern. - const { jobId, delay } = computeHistoryJob( - pageAged(HISTORY_FAST_THRESHOLD + 60_000), - 'agent', - NOW, - ); - expect(delay).toBe(0); - expect(jobId).toBe(`${PAGE_ID}-agent`); - }); - - it('human edit on a YOUNG page (age < threshold) → fast interval, bare job id', () => { - const { jobId, delay } = computeHistoryJob( - pageAged(HISTORY_FAST_THRESHOLD - 1), - 'user', - NOW, - ); - expect(delay).toBe(HISTORY_FAST_INTERVAL); +describe('computeHistoryJob (#370 — shared trailing idle pipeline)', () => { + it('human edit → user idle window, bare page.id job', () => { + // Humans and the agent now share ONE idle job per page (jobId = page.id). + // The agent's old delay=0 fast path is GONE — intentional agent points now + // arrive via the explicit save-version signal, not a zero-delay snapshot. + const { jobId, delay } = computeHistoryJob(page, 'user'); + expect(delay).toBe(IDLE_INTERVAL_USER); expect(jobId).toBe(PAGE_ID); }); - it('human edit on an OLD page (age > threshold) → standard interval', () => { - const { jobId, delay } = computeHistoryJob( - pageAged(HISTORY_FAST_THRESHOLD + 1), - 'user', - NOW, - ); - expect(delay).toBe(HISTORY_INTERVAL); + it('agent edit → agent idle window (shorter), still the bare page.id job', () => { + const { jobId, delay } = computeHistoryJob(page, 'agent'); + expect(delay).toBe(IDLE_INTERVAL_AGENT); + // No `-agent` suffix anymore: the agent joins the common idle pipeline. expect(jobId).toBe(PAGE_ID); }); - it('boundary: pageAge EXACTLY === threshold takes the slow branch (the `<` is strict)', () => { - // Off-by-one guard: the condition is `pageAge < HISTORY_FAST_THRESHOLD`, so - // an age of exactly the threshold is NOT "fast" — it must use HISTORY_INTERVAL. - const { delay } = computeHistoryJob( - pageAged(HISTORY_FAST_THRESHOLD), - 'user', - NOW, - ); - expect(delay).toBe(HISTORY_INTERVAL); + it('agent flushes sooner than a human', () => { + expect(IDLE_INTERVAL_AGENT).toBeLessThan(IDLE_INTERVAL_USER); }); - it('treats any non-"agent" source string as human', () => { - // resolveSource only ever yields 'agent' | 'user', but guard the contract: - // the agent branch keys strictly on === 'agent'. - const { jobId, delay } = computeHistoryJob(pageAged(0), 'user', NOW); - expect(delay).toBe(HISTORY_FAST_INTERVAL); + it('treats any non-"agent" source string as human (keys strictly on === agent)', () => { + const { jobId, delay } = computeHistoryJob(page, 'user'); + expect(delay).toBe(IDLE_INTERVAL_USER); expect(jobId).toBe(PAGE_ID); }); + + // #370 review round-1 WARNING: the max-wait ceiling prevents autosnapshot + // starvation during a continuous editing session (the trailing timer would + // otherwise re-arm forever and never fire). + describe('max-wait ceiling', () => { + const T0 = 1_000_000; // arbitrary fixed epoch for deterministic tests + + it('once a burst is armed, delay clamps to the remaining max-wait budget', () => { + // 1 minute into the burst the USER interval (60m) far exceeds the remaining + // max-wait budget (10m - 1m = 9m), so the delay is clamped DOWN to that + // remaining budget — the full interval is NOT used once a ceiling applies. + const { delay } = computeHistoryJob(page, 'user', T0, T0 + 60_000); + expect(delay).toBe(IDLE_MAX_WAIT_USER - 60_000); + }); + + it('never waits longer than the max-wait budget from the burst start', () => { + // A store arriving right at the ceiling → delay 0 (fire promptly). + const { delay } = computeHistoryJob( + page, + 'user', + T0, + T0 + IDLE_MAX_WAIT_USER, + ); + expect(delay).toBe(0); + }); + + it('past the ceiling never returns a negative delay', () => { + const { delay } = computeHistoryJob( + page, + 'user', + T0, + T0 + IDLE_MAX_WAIT_USER + 5 * 60_000, + ); + expect(delay).toBe(0); + }); + + it('the agent ceiling is shorter than the user ceiling', () => { + expect(IDLE_MAX_WAIT_AGENT).toBeLessThan(IDLE_MAX_WAIT_USER); + const { delay } = computeHistoryJob( + page, + 'agent', + T0, + T0 + IDLE_MAX_WAIT_AGENT, + ); + expect(delay).toBe(0); + }); + + it('without a burstStart there is no ceiling (backward-compatible)', () => { + expect(computeHistoryJob(page, 'user').delay).toBe(IDLE_INTERVAL_USER); + expect(computeHistoryJob(page, 'agent').delay).toBe(IDLE_INTERVAL_AGENT); + }); + }); }); describe('resolveSource (truth table)', () => { diff --git a/apps/server/src/collaboration/extensions/persistence-store.spec.ts b/apps/server/src/collaboration/extensions/persistence-store.spec.ts index e707290f..c387b0c4 100644 --- a/apps/server/src/collaboration/extensions/persistence-store.spec.ts +++ b/apps/server/src/collaboration/extensions/persistence-store.spec.ts @@ -40,11 +40,12 @@ describe('PersistenceExtension.onStoreDocument — Approach-A boundary snapshot' let pageHistoryRepo: { saveHistory: jest.Mock; findPageLastHistory: jest.Mock; + updateHistoryKind: jest.Mock; }; let aiQueue: { add: jest.Mock }; - let historyQueue: { add: jest.Mock }; + let historyQueue: { add: jest.Mock; remove: jest.Mock }; let notificationQueue: { add: jest.Mock }; - let collabHistory: { addContributors: jest.Mock }; + let collabHistory: { addContributors: jest.Mock; popContributors: jest.Mock }; let transclusionService: { syncPageTransclusions: jest.Mock; syncPageReferences: jest.Mock; @@ -93,13 +94,22 @@ describe('PersistenceExtension.onStoreDocument — Approach-A boundary snapshot' pageHistoryRepo = { saveHistory: jest.fn().mockImplementation(async () => { callOrder.push('saveHistory'); + return { id: 'history-1' }; }), findPageLastHistory: jest.fn().mockResolvedValue(null), + updateHistoryKind: jest.fn().mockResolvedValue(undefined), }; aiQueue = { add: jest.fn().mockResolvedValue(undefined) }; - historyQueue = { add: jest.fn().mockResolvedValue(undefined) }; + historyQueue = { + add: jest.fn().mockResolvedValue(undefined), + // #370 — enqueuePageHistory now removes any pending idle job before re-adding. + remove: jest.fn().mockResolvedValue(undefined), + }; notificationQueue = { add: jest.fn().mockResolvedValue(undefined) }; - collabHistory = { addContributors: jest.fn().mockResolvedValue(undefined) }; + collabHistory = { + addContributors: jest.fn().mockResolvedValue(undefined), + popContributors: jest.fn().mockResolvedValue([]), + }; transclusionService = { syncPageTransclusions: jest.fn().mockResolvedValue(undefined), syncPageReferences: jest.fn().mockResolvedValue(undefined), @@ -165,6 +175,50 @@ describe('PersistenceExtension.onStoreDocument — Approach-A boundary snapshot' expect(pageRepo.updatePage.mock.calls[0][0].lastUpdatedSource).toBe('user'); }); + // #370 review round-1 SUGGESTION: the boundary was GENERALIZED from a + // user→agent special-case to ANY lastUpdatedSource transition. These pin the + // generalized behaviour it was rebuilt for. + describe('generalized boundary — any source transition', () => { + // Same persisted page but with an explicit prior source. + const pageWithPriorSource = (prior: string | null) => ({ + ...persistedHumanPage('NEW CONTENT'), + lastUpdatedSource: prior, + }); + + it('agent→user transition fires the boundary (pins the prior agent revision)', async () => { + const document = ydocFor(doc('NEW CONTENT')); + pageRepo.findById.mockResolvedValue(pageWithPriorSource('agent')); + pageHistoryRepo.findPageLastHistory.mockResolvedValue(null); + + await ext.onStoreDocument(buildData(document, 'user') as any); + + expect(pageHistoryRepo.saveHistory).toHaveBeenCalledTimes(1); + expect(callOrder).toEqual(['saveHistory', 'updatePage']); + expect(pageRepo.updatePage.mock.calls[0][0].lastUpdatedSource).toBe('user'); + }); + + it('git→user transition fires the boundary (git-sync overwrite is a source change)', async () => { + const document = ydocFor(doc('NEW CONTENT')); + pageRepo.findById.mockResolvedValue(pageWithPriorSource('git')); + pageHistoryRepo.findPageLastHistory.mockResolvedValue(null); + + await ext.onStoreDocument(buildData(document, 'user') as any); + + expect(pageHistoryRepo.saveHistory).toHaveBeenCalledTimes(1); + expect(callOrder).toEqual(['saveHistory', 'updatePage']); + }); + + it('a null prior source (first-ever edit) does NOT fire the boundary', async () => { + const document = ydocFor(doc('NEW CONTENT')); + pageRepo.findById.mockResolvedValue(pageWithPriorSource(null)); + + await ext.onStoreDocument(buildData(document, 'agent') as any); + + expect(pageHistoryRepo.saveHistory).not.toHaveBeenCalled(); + expect(pageRepo.updatePage).toHaveBeenCalledTimes(1); + }); + }); + it('idempotency: unchanged content → no updatePage, no history, no queues', async () => { // The Y.Doc content equals the persisted content deeply → early skip. // A Y.Doc round-trip normalizes attrs (e.g. paragraph indent), so derive @@ -469,4 +523,125 @@ describe('PersistenceExtension.onStoreDocument — Approach-A boundary snapshot' // Contributors keyed by the UUID so they match the PAGE_HISTORY job (page.id). expect(collabHistory.addContributors.mock.calls[0][0]).toBe(PAGE_ID); }); + + // #370 — explicit save-version (Cmd+S / agent save tool) over the stateless + // seam. The tier is derived from the SIGNED connection actor, the store path + // is reused, and promote-not-dup avoids duplicating heavy content rows. + describe('save-version (#370)', () => { + const emitSave = (document: any, actor: 'user' | 'agent') => + ext.onStateless({ + connection: { + readOnly: false, + context: { user: { id: USER_ID, name: 'Alice' }, actor }, + } as any, + documentName: `page.${PAGE_ID}`, + document: document as any, + payload: JSON.stringify({ type: 'save-version' }), + } as any); + + // findById returns a page whose content already equals the live doc, so the + // store path is a no-op and we isolate the versioning decision. + const pageMatchingDoc = (document: any) => ({ + ...persistedHumanPage('IGNORED'), + content: TiptapTransformer.fromYdoc(document, 'default'), + }); + + it('human save with no prior snapshot → writes a manual version + broadcasts', async () => { + const document = ydocFor(doc('VERSION ME')); + pageRepo.findById.mockResolvedValue(pageMatchingDoc(document)); + pageHistoryRepo.findPageLastHistory.mockResolvedValue(null); + + await emitSave(document, 'user'); + + expect(pageHistoryRepo.saveHistory).toHaveBeenCalledTimes(1); + expect(pageHistoryRepo.saveHistory.mock.calls[0][1]).toEqual( + expect.objectContaining({ kind: 'manual' }), + ); + // The pending idle autosnapshot is cancelled by the explicit version. + expect(historyQueue.remove).toHaveBeenCalledWith(PAGE_ID); + const msg = JSON.parse( + (document as any).broadcastStateless.mock.calls[(document as any).broadcastStateless.mock.calls.length - 1][0], + ); + expect(msg).toMatchObject({ + type: 'version.saved', + kind: 'manual', + alreadySaved: false, + }); + }); + + it('agent save derives kind=agent from the signed actor', async () => { + const document = ydocFor(doc('AGENT VERSION')); + pageRepo.findById.mockResolvedValue(pageMatchingDoc(document)); + pageHistoryRepo.findPageLastHistory.mockResolvedValue(null); + + await emitSave(document, 'agent'); + + expect(pageHistoryRepo.saveHistory.mock.calls[pageHistoryRepo.saveHistory.mock.calls.length - 1][1]).toEqual( + expect.objectContaining({ kind: 'agent' }), + ); + }); + + it('promote-not-dup: latest snapshot is an autosave with identical content → upgrades in place', async () => { + const document = ydocFor(doc('SAME')); + const page = pageMatchingDoc(document); + pageRepo.findById.mockResolvedValue(page); + pageHistoryRepo.findPageLastHistory.mockResolvedValue({ + id: 'auto-1', + content: page.content, + kind: 'idle', + }); + + await emitSave(document, 'user'); + + // No heavy new content row — the existing autosave is promoted to manual. + expect(pageHistoryRepo.updateHistoryKind).toHaveBeenCalledWith( + 'auto-1', + 'manual', + expect.anything(), + ); + expect(pageHistoryRepo.saveHistory).not.toHaveBeenCalled(); + const msg = JSON.parse( + (document as any).broadcastStateless.mock.calls[(document as any).broadcastStateless.mock.calls.length - 1][0], + ); + expect(msg).toMatchObject({ historyId: 'auto-1', alreadySaved: false }); + }); + + it('no-op when the latest snapshot is already a manual version of this content', async () => { + const document = ydocFor(doc('ALREADY SAVED')); + const page = pageMatchingDoc(document); + pageRepo.findById.mockResolvedValue(page); + pageHistoryRepo.findPageLastHistory.mockResolvedValue({ + id: 'ver-1', + content: page.content, + kind: 'manual', + }); + + await emitSave(document, 'user'); + + expect(pageHistoryRepo.updateHistoryKind).not.toHaveBeenCalled(); + expect(pageHistoryRepo.saveHistory).not.toHaveBeenCalled(); + const msg = JSON.parse( + (document as any).broadcastStateless.mock.calls[(document as any).broadcastStateless.mock.calls.length - 1][0], + ); + expect(msg).toMatchObject({ alreadySaved: true, kind: 'manual' }); + }); + + it('a read-only connection cannot save a version', async () => { + const document = ydocFor(doc('READER')); + pageRepo.findById.mockResolvedValue(pageMatchingDoc(document)); + + await ext.onStateless({ + connection: { + readOnly: true, + context: { user: { id: USER_ID }, actor: 'user' }, + } as any, + documentName: `page.${PAGE_ID}`, + document: document as any, + payload: JSON.stringify({ type: 'save-version' }), + } as any); + + expect(pageHistoryRepo.saveHistory).not.toHaveBeenCalled(); + expect(pageHistoryRepo.updateHistoryKind).not.toHaveBeenCalled(); + }); + }); }); diff --git a/apps/server/src/collaboration/extensions/persistence.extension.ts b/apps/server/src/collaboration/extensions/persistence.extension.ts index 35310cf4..fc848a1e 100644 --- a/apps/server/src/collaboration/extensions/persistence.extension.ts +++ b/apps/server/src/collaboration/extensions/persistence.extension.ts @@ -36,9 +36,11 @@ import { import { Page } from '@docmost/db/types/entity.types'; import { CollabHistoryService } from '../services/collab-history.service'; import { - HISTORY_FAST_INTERVAL, - HISTORY_FAST_THRESHOLD, - HISTORY_INTERVAL, + IDLE_INTERVAL_AGENT, + IDLE_INTERVAL_USER, + IDLE_MAX_WAIT_AGENT, + IDLE_MAX_WAIT_USER, + PageHistoryKind, } from '../constants'; import { TransclusionService } from '../../core/page/transclusion/transclusion.service'; import { observeCollabStore } from '../../integrations/metrics/metrics.registry'; @@ -51,6 +53,16 @@ import { observeCollabStore } from '../../integrations/metrics/metrics.registry' */ export const INTENTIONAL_CLEAR_MESSAGE_TYPE = 'intentional-clear'; +/** + * #370 — wire format of the client→server "save a version" signal. Sent by the + * human (Cmd+S / Save button) and by the agent's explicit save tool over the + * SAME stateless channel. The intentionality tier ('manual' vs 'agent') is + * derived SERVER-SIDE from the signed connection actor, never from this + * payload, so a version's type is unforgeable. The document is taken from the + * connection (not the payload), so the signal cannot be aimed at another page. + */ +export const SAVE_VERSION_MESSAGE_TYPE = 'save-version'; + /** * #251 — how long an intentional-clear signal stays "pending" before it is * ignored. The signal is set on the clearing keystroke but consumed by the @@ -87,35 +99,39 @@ export function resolveSource( } /** - * Compute the BullMQ job id + delay for a page-history snapshot job. Pure so - * the data-loss-sensitive timing arithmetic is unit-testable; `now` is injected - * (caller passes `Date.now()`) for determinism. + * #370 — compute the BullMQ job id + delay for a page's trailing idle-flush + * autosnapshot. Pure so the timing is unit-testable. * - * - Agent edits: delay 0 and a source-keyed job id `${page.id}-agent`. The - * delay MUST stay 0 — the worker re-reads the page row at run time, so any - * delay risks reading content a later human edit has already overwritten - * (mis-tagged snapshot). 0 minimizes that window. The `-agent` suffix keeps - * the job from coalescing with the bare-page.id human job. - * - Human edits: age-based debounce so rapid human edits coalesce into one - * snapshot; job id is the bare `page.id`. - * - * BullMQ forbids ':' in custom job ids (Redis key separator), so '-' is used; - * page.id is a UUID, so `${page.id}-agent` cannot collide with a human job. + * Both humans and the agent now share ONE idle pipeline (the agent's old + * `delay=0` fast path is gone — intentional agent points arrive via the + * explicit save-version signal instead). The job id is the bare `page.id`, so a + * page has at most one pending idle job; the caller removes-and-re-adds it on + * every store to keep it debounced to the trailing edge of an edit burst. The + * window differs by source only: the agent flushes sooner than a human. */ export function computeHistoryJob( - page: Pick, + page: Pick, source: string, - now: number, + // Epoch ms of the FIRST edit in the current burst (when the pending idle job + // was first armed). Used to enforce the max-wait ceiling so a continuous + // editing session cannot re-arm the trailing timer forever. `now` is injectable + // for tests; both default to a live clock / no ceiling when omitted. + burstStart?: number, + now: number = Date.now(), ): { jobId: string; delay: number } { const isAgent = source === 'agent'; - const pageAge = now - new Date(page.createdAt).getTime(); - const delay = isAgent - ? 0 - : pageAge < HISTORY_FAST_THRESHOLD - ? HISTORY_FAST_INTERVAL - : HISTORY_INTERVAL; - const jobId = isAgent ? `${page.id}-agent` : page.id; - return { jobId, delay }; + const interval = isAgent ? IDLE_INTERVAL_AGENT : IDLE_INTERVAL_USER; + const maxWait = isAgent ? IDLE_MAX_WAIT_AGENT : IDLE_MAX_WAIT_USER; + + let delay = interval; + if (burstStart !== undefined) { + // Time already elapsed since the burst's first edit; the snapshot must fire + // no later than `maxWait` after that, so shrink the trailing delay to the + // remaining budget (never negative, so BullMQ fires it promptly). + const remaining = burstStart + maxWait - now; + delay = Math.max(0, Math.min(interval, remaining)); + } + return { jobId: page.id, delay }; } @Injectable() @@ -127,6 +143,11 @@ export class PersistenceExtension implements Extension { // coalescing window" per document and OR it across all edits in the window, // so the snapshot is marked 'agent' regardless of who wrote last. private agentTouched: Map = new Map(); + // #370 — epoch ms of the FIRST edit in the current idle-flush burst, per page. + // Set when the pending idle job is first armed (empty entry), read to enforce + // the max-wait ceiling in computeHistoryJob, and cleared when the idle job is + // consumed/cancelled so the next burst starts a fresh window. + private idleBurstStart: Map = new Map(); // #251 — per-document "intentional clear pending" flags. Keyed by // documentName, value = expiry timestamp (ms). Set by onStateless when the // client reports a deliberate clear; consumed once by the next @@ -326,20 +347,19 @@ export class PersistenceExtension implements Extension { //this.logger.debug('Contributors error:' + err?.['message']); } - // Approach A — boundary snapshot before the agent's first edit. - // When this store is the agent's and the page's currently persisted - // state was authored by a human, pin that human state as its own - // history version BEFORE the agent overwrites it. `page` still holds - // the OLD content/provenance here, so saveHistory(page) captures the - // pre-agent state tagged 'user'. The agent's new content is - // snapshotted later by the debounced PAGE_HISTORY job ('agent'). Skip - // if the prior state is already agent-authored (boundary already - // pinned on the user->agent transition), if the page is effectively - // empty, or if the latest existing snapshot already equals this human - // state (avoid duplicates). + // #370 — boundary snapshot on ANY source transition. When the store + // flips the page's provenance (user↔agent↔git), pin the OUTGOING + // state as its own history version BEFORE the incoming source + // overwrites it. `page` still holds the OLD content/provenance here, + // so saveHistory(page) captures the pre-transition state tagged with + // its own source, kind='boundary'. The incoming content is snapshotted + // later by the debounced idle job. Skip if the page is effectively + // empty or if the latest existing snapshot already equals this state + // (the shared isDeepStrictEqual gate — avoids duplicates). Generalizing + // beyond the old user→agent special-case also covers git-sync for free. if ( - lastUpdatedSource === 'agent' && - page.lastUpdatedSource !== 'agent' + page.lastUpdatedSource && + page.lastUpdatedSource !== lastUpdatedSource ) { // pageHistory.pageId is uuid-typed; use page.id (never the doc-name // slugId) so a `page.` doc cannot throw 22P02 here (#260). @@ -347,15 +367,13 @@ export class PersistenceExtension implements Extension { page.id, { includeContent: true, trx }, ); - const humanBaselineMissing = + const baselineMissing = !lastHistory || !isDeepStrictEqual(lastHistory.content, page.content); - if ( - !isEmptyParagraphDoc(page.content as any) && - humanBaselineMissing - ) { + if (!isEmptyParagraphDoc(page.content as any) && baselineMissing) { await this.pageHistoryRepo.saveHistory(page, { contributorIds: page.contributorIds ?? undefined, + kind: 'boundary', trx, }); } @@ -480,6 +498,14 @@ export class PersistenceExtension implements Extension { return; // unrelated / malformed stateless message } + // #370 — explicit "save a version" (human Cmd+S / agent save tool). Edit + // rights are already enforced by the readOnly reject above (a reader can't + // create a version), exactly as intentional-clear requires. + if (message?.type === SAVE_VERSION_MESSAGE_TYPE) { + await this.handleSaveVersion(data); + return; + } + if (message?.type !== INTENTIONAL_CLEAR_MESSAGE_TYPE) return; this.intentionalClear.set( @@ -488,6 +514,117 @@ export class PersistenceExtension implements Extension { ); } + /** + * #370 — persist an intentional version from the live in-memory ydoc. + * + * One stateless path serves BOTH the human and the agent; the tier is derived + * SERVER-SIDE from the signed connection actor ('agent' → 'agent', anything + * else → 'manual'), so the version type cannot be spoofed by the client. We + * take the fresh ydoc from the collab process memory and run it through the + * EXISTING store path first (so pages.content/ydoc reflect the exact content + * being versioned — a REST endpoint would race the up-to-10s-stale page row), + * then snapshot it into page_history with the intentional kind. + * + * Promote-not-dup: if the latest history row already holds this exact content + * and it is an autosave (idle/boundary/legacy-null), upgrade its kind in place + * instead of duplicating a heavy content row; if it is already 'manual', it is + * a no-op (the client shows an "already saved" toast). Otherwise a fresh + * version row is written, popping the aggregated contributors from Redis. + */ + private async handleSaveVersion(data: onStatelessPayload): Promise { + const { connection, document, documentName } = data; + const context = connection?.context; + const pageId = getPageId(documentName); + // Unforgeable: 'agent' only for a signed agent connection, else 'manual'. + const kind: PageHistoryKind = + context?.actor === 'agent' ? 'agent' : 'manual'; + + // Flush the live ydoc through the normal store path so the page row + ydoc + // hold exactly what we are about to version (also fires the idle enqueue we + // supersede below, plus any source-transition boundary). onStoreDocument + // only needs document/documentName/context. + await this.onStoreDocument({ + document, + documentName, + context, + } as onStoreDocumentPayload); + + let result: + | { historyId: string; kind: PageHistoryKind; alreadySaved: boolean } + | undefined; + + await executeTx(this.db, async (trx) => { + const page = await this.pageRepo.findById(pageId, { + withLock: true, + includeContent: true, + trx, + }); + if (!page) return; + // Never version an effectively-empty page (mirrors the processor's + // first-history guard); there is nothing intentional to pin. + if (isEmptyParagraphDoc(page.content as any)) return; + + const lastHistory = await this.pageHistoryRepo.findPageLastHistory( + page.id, + { includeContent: true, trx }, + ); + + if ( + lastHistory && + isDeepStrictEqual(lastHistory.content, page.content) + ) { + // Content is already snapshotted. Promote-not-dup. + if (lastHistory.kind === 'manual') { + result = { + historyId: lastHistory.id, + kind: 'manual', + alreadySaved: true, + }; + return; + } + await this.pageHistoryRepo.updateHistoryKind( + lastHistory.id, + kind, + trx, + ); + result = { historyId: lastHistory.id, kind, alreadySaved: false }; + return; + } + + // Fresh version row. Pop the contributors aggregated since the last + // snapshot (SPOP); restore them if the write fails so they aren't lost. + const contributorIds = await this.collabHistory.popContributors(page.id); + try { + const saved = await this.pageHistoryRepo.saveHistory(page, { + contributorIds, + kind, + trx, + }); + result = { historyId: saved.id, kind, alreadySaved: false }; + } catch (err) { + await this.collabHistory.addContributors(page.id, contributorIds); + throw err; + } + }); + + // Housekeeping: this explicit version supersedes the page's pending idle + // autosnapshot, so cancel it (delayed job → remove() just deletes it) and + // end the current idle burst so the next edit starts a fresh max-wait window. + await this.historyQueue.remove(pageId).catch(() => undefined); + this.idleBurstStart.delete(pageId); + + if (result) { + document.broadcastStateless( + JSON.stringify({ + type: 'version.saved', + historyId: result.historyId, + kind: result.kind, + alreadySaved: result.alreadySaved, + }), + ); + } + } + async onChange(data: onChangePayload) { const documentName = data.documentName; const userId = data.context?.user?.id; @@ -545,17 +682,45 @@ export class PersistenceExtension implements Extension { page: Page, lastUpdatedSource: string, ): Promise { - // Job id + delay arithmetic lives in the pure `computeHistoryJob` (see its - // doc comment for the agent-delay-0 / age-based-debounce invariants). + // #370 — trailing idle debounce with a max-wait ceiling. One pending idle + // job per page (jobId = page.id); on every store we remove the pending + // delayed job and re-add it, so the snapshot lands `delay` after edits go + // quiet rather than once per store (precedent: workspace.service.ts). + // remove() on a delayed job simply deletes it (0 if absent, no throw); if the + // job is already ACTIVE and the remove is a no-op, the add still de-dups and + // the processor's isDeepStrictEqual gate collapses the duplicate content. + // + // The FIRST arm of a burst records `burstStart`; computeHistoryJob shrinks + // the delay to the remaining max-wait budget from that point, so a continuous + // session cannot re-arm the trailing timer forever and starve the snapshot. + // A burst marker older than THIS TIER's max-wait means the previous idle job + // has already fired — start a fresh window instead of firing immediately on + // the next edit. Must use the SAME source-specific max-wait computeHistoryJob + // uses (agent 5m / user 10m): a hardcoded USER ceiling would leave an agent + // burst's marker stale for 5..10m, forcing delay=0 on every store in that + // window and writing one idle row per store — exactly the per-store bloat the + // debounce exists to prevent, on the continuous-agent path. + const maxWait = + lastUpdatedSource === 'agent' ? IDLE_MAX_WAIT_AGENT : IDLE_MAX_WAIT_USER; + const now = Date.now(); + let burstStart = this.idleBurstStart.get(page.id); + if (burstStart === undefined || now - burstStart >= maxWait) { + burstStart = now; + this.idleBurstStart.set(page.id, burstStart); + } + const { jobId, delay } = computeHistoryJob( page, lastUpdatedSource, - Date.now(), + burstStart, + now, ); + await this.historyQueue.remove(jobId).catch(() => undefined); + await this.historyQueue.add( QueueJob.PAGE_HISTORY, - { pageId: page.id } as IPageHistoryJob, + { pageId: page.id, kind: 'idle' } as IPageHistoryJob, { jobId, delay }, ); } diff --git a/apps/server/src/collaboration/processors/history.processor.spec.ts b/apps/server/src/collaboration/processors/history.processor.spec.ts index bdcf846e..4e87a322 100644 --- a/apps/server/src/collaboration/processors/history.processor.spec.ts +++ b/apps/server/src/collaboration/processors/history.processor.spec.ts @@ -66,6 +66,15 @@ describe('HistoryProcessor.process', () => { notificationQueue = { add: jest.fn().mockResolvedValue(undefined) }; generalQueue = { add: jest.fn().mockResolvedValue(undefined) }; + // #370 F3 — the processor now serializes its find+save under a page-row lock + // via executeTx. A db whose transaction().execute(fn) runs fn with a trx stub + // drives the real executeTx() helper without a database. + const db = { + transaction: () => ({ + execute: (fn: (trx: any) => Promise) => fn({ __trx: true }), + }), + }; + // WorkerHost's constructor reads `this.worker`; passing repos positionally // matches the constructor and avoids the Nest DI container. proc = new HistoryProcessor( @@ -73,6 +82,7 @@ describe('HistoryProcessor.process', () => { pageRepo as any, collabHistory as any, watcherService as any, + db as any, notificationQueue as any, generalQueue as any, ); @@ -126,15 +136,26 @@ describe('HistoryProcessor.process', () => { await proc.process(buildJob()); expect(collabHistory.popContributors).toHaveBeenCalledWith(PAGE_ID); + // #370 F3/F9 — the snapshot decision runs under a page-row lock. Pin the lock + // structurally so a refactor that drops withLock/trx (silently reintroducing + // the TOCTOU double-insert) turns this red. The tx stub is { __trx: true }. + expect(pageRepo.findById).toHaveBeenCalledWith( + PAGE_ID, + expect.objectContaining({ withLock: true, trx: { __trx: true } }), + ); + // #370 F7 — addPageWatchers MUST receive the trx, or its FK-check runs on a + // separate connection and self-deadlocks against our FOR UPDATE. Asserting + // the trx arg here is exactly what would have caught that regression. expect(watcherService.addPageWatchers).toHaveBeenCalledWith( ['u1', 'u2'], PAGE_ID, SPACE_ID, WORKSPACE_ID, + { __trx: true }, ); expect(pageHistoryRepo.saveHistory).toHaveBeenCalledWith( expect.objectContaining({ id: PAGE_ID }), - { contributorIds: ['u1', 'u2'] }, + { contributorIds: ['u1', 'u2'], kind: 'idle', trx: { __trx: true } }, ); expect(generalQueue.add).toHaveBeenCalledWith( QueueJob.PAGE_BACKLINKS, @@ -186,6 +207,48 @@ describe('HistoryProcessor.process', () => { ]); }); + it('COMMIT failure (throw outside the tx callback) → contributors RESTORED', async () => { + // #370 F8 — a commit-time failure throws OUTSIDE the callback, so the inner + // try/catch does not run; the outer catch must restore the popped set (else a + // BullMQ retry writes an unattributed version). Use a db whose execute() runs + // the callback THEN throws, simulating a commit abort. + pageHistoryRepo.findPageLastHistory.mockResolvedValue({ + content: { type: 'doc', content: [] }, + }); + const commitFail = { + transaction: () => ({ + execute: async (fn: (trx: any) => Promise) => { + await fn({ __trx: true }); // callback succeeds (saveHistory ok) + throw new Error('commit aborted'); // ...but the COMMIT fails + }, + }), + }; + const procCommitFail = new HistoryProcessor( + pageHistoryRepo as any, + pageRepo as any, + collabHistory as any, + watcherService as any, + commitFail as any, + notificationQueue as any, + generalQueue as any, + ); + jest + .spyOn(procCommitFail['logger'], 'error') + .mockImplementation(() => undefined); + + await expect(procCommitFail.process(buildJob())).rejects.toThrow( + 'commit aborted', + ); + // The inner catch did NOT run (save succeeded), so only the outer catch can + // restore — assert it did. + expect(collabHistory.addContributors).toHaveBeenCalledWith(PAGE_ID, [ + 'u1', + 'u2', + ]); + // And the post-snapshot queue work must NOT have run (we rethrew). + expect(generalQueue.add).not.toHaveBeenCalled(); + }); + it('backlinks + notification queue failures are swallowed (history still committed)', async () => { pageHistoryRepo.findPageLastHistory.mockResolvedValue({ content: { type: 'doc', content: [] }, diff --git a/apps/server/src/collaboration/processors/history.processor.ts b/apps/server/src/collaboration/processors/history.processor.ts index 6f26d3fa..9c4bf7a6 100644 --- a/apps/server/src/collaboration/processors/history.processor.ts +++ b/apps/server/src/collaboration/processors/history.processor.ts @@ -19,6 +19,9 @@ import { isDeepStrictEqual } from 'node:util'; import { CollabHistoryService } from '../services/collab-history.service'; import { WatcherService } from '../../core/watcher/watcher.service'; import { isEmptyParagraphDoc } from '../collaboration.util'; +import { InjectKysely } from 'nestjs-kysely'; +import { KyselyDB } from '@docmost/db/types/kysely.types'; +import { executeTx } from '@docmost/db/utils'; @Processor(QueueName.HISTORY_QUEUE) export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { @@ -29,6 +32,7 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { private readonly pageRepo: PageRepo, private readonly collabHistory: CollabHistoryService, private readonly watcherService: WatcherService, + @InjectKysely() private readonly db: KyselyDB, @InjectQueue(QueueName.NOTIFICATION_QUEUE) private notificationQueue: Queue, @InjectQueue(QueueName.GENERAL_QUEUE) private generalQueue: Queue, ) { @@ -41,6 +45,9 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { try { const { pageId } = job.data; + // Read the page WITHOUT a lock first, only to bail early on the two cheap + // no-write cases (page gone / empty first snapshot) without opening a + // transaction. The authoritative check-then-write happens locked below. const page = await this.pageRepo.findById(pageId, { includeContent: true, }); @@ -51,40 +58,109 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { return; } - const lastHistory = await this.pageHistoryRepo.findPageLastHistory( - pageId, - { includeContent: true }, - ); + // #370 F3 — the snapshot decision (findPageLastHistory → saveHistory) must + // be serialized against manual-save/boundary writers, which run under a + // page-row lock in onStoreDocument. Without it, this processor and a + // concurrent manual-save each read the same lastHistory (MVCC), both see + // content != lastHistory, and both insert — producing two page_history rows + // with IDENTICAL content (one 'idle', one 'manual'), defeating + // promote-not-dup and the version-vs-autosave split. Taking the same + // page-row lock makes the second writer observe the first's committed row so + // the isDeepStrictEqual gate collapses the duplicate. Only the read+write + // is transacted; the post-snapshot queue work stays outside. + let contributorIds: string[] = []; + let snapshotWritten = false; + let lastHistoryContent: unknown; + // #370 F8 — the contributor set popped from Redis (destructive SPOP) must be + // restored if the snapshot does not durably land. The inner try/catch only + // covers a throw INSIDE the callback; a COMMIT failure (connection drop, + // serialization/deadlock abort on commit — the transient class the epic + // already retries) throws OUTSIDE it, rolling the snapshot back while the + // pop is already gone. We track the popped set here and restore it in the + // outer catch so a BullMQ retry re-attributes the version. addContributors + // is an idempotent Redis SADD, so a double-restore is harmless. + let poppedForRestore: string[] = []; - if (!lastHistory && isEmptyParagraphDoc(page.content as any)) { - this.logger.debug( - `Skipping first history for page ${pageId}: empty content`, - ); - await this.collabHistory.clearContributors(pageId); + try { + await executeTx(this.db, async (trx) => { + const lockedPage = await this.pageRepo.findById(pageId, { + includeContent: true, + withLock: true, + trx, + }); + if (!lockedPage) return; + + const lastHistory = await this.pageHistoryRepo.findPageLastHistory( + pageId, + { includeContent: true, trx }, + ); + lastHistoryContent = lastHistory?.content; + + if (!lastHistory && isEmptyParagraphDoc(lockedPage.content as any)) { + this.logger.debug( + `Skipping first history for page ${pageId}: empty content`, + ); + return; + } + + if ( + lastHistory && + isDeepStrictEqual(lastHistory.content, lockedPage.content) + ) { + return; // already snapshotted at this content — nothing to write + } + + contributorIds = await this.collabHistory.popContributors(pageId); + poppedForRestore = contributorIds; + try { + // Pass `trx` so the watcher insert's FK check (FOR KEY SHARE on + // pages[pageId]) runs on the SAME connection that already holds the + // FOR UPDATE lock from findById — otherwise it takes the FK lock on a + // separate pool connection and self-deadlocks against our own tx. + await this.watcherService.addPageWatchers( + contributorIds, + pageId, + lockedPage.spaceId, + lockedPage.workspaceId, + trx, + ); + + // #370 — every job on this queue is a trailing idle-flush autosnapshot. + await this.pageHistoryRepo.saveHistory(lockedPage, { + contributorIds, + kind: job.data.kind ?? 'idle', + trx, + }); + snapshotWritten = true; + this.logger.debug(`History created for page: ${pageId}`); + } catch (err) { + await this.collabHistory.addContributors(pageId, contributorIds); + poppedForRestore = []; + throw err; + } + }); + } catch (err) { + // A throw here means the tx did NOT commit (callback threw, or the commit + // itself failed and rolled back). If we popped contributors and the inner + // catch did not already restore them, restore now so the retry keeps + // attribution. snapshotWritten is irrelevant: it is set before commit, so + // it can be true even when the commit rolled the snapshot back. + if (poppedForRestore.length) { + await this.collabHistory.addContributors(pageId, poppedForRestore); + } + throw err; + } + + // No snapshot written (page vanished / empty-first / unchanged content) → + // clear the contributor set for the skip cases and stop. + if (!snapshotWritten) { + if (!lastHistoryContent && isEmptyParagraphDoc(page.content as any)) { + await this.collabHistory.clearContributors(pageId); + } return; } - if ( - !lastHistory || - !isDeepStrictEqual(lastHistory.content, page.content) - ) { - const contributorIds = await this.collabHistory.popContributors(pageId); - - try { - await this.watcherService.addPageWatchers( - contributorIds, - pageId, - page.spaceId, - page.workspaceId, - ); - - await this.pageHistoryRepo.saveHistory(page, { contributorIds }); - this.logger.debug(`History created for page: ${pageId}`); - } catch (err) { - await this.collabHistory.addContributors(pageId, contributorIds); - throw err; - } - + { const mentions = extractMentions(page.content); const pageMentions = extractPageMentions(mentions); const internalLinkSlugIds = extractInternalLinkSlugIds(page.content); @@ -102,7 +178,7 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { ); }); - if (contributorIds.length > 0 && lastHistory?.content) { + if (contributorIds.length > 0 && lastHistoryContent) { await this.notificationQueue .add(QueueJob.PAGE_UPDATED, { pageId, diff --git a/apps/server/src/database/migrations/20260705T120000-page-history-kind.ts b/apps/server/src/database/migrations/20260705T120000-page-history-kind.ts new file mode 100644 index 00000000..c2292e6f --- /dev/null +++ b/apps/server/src/database/migrations/20260705T120000-page-history-kind.ts @@ -0,0 +1,27 @@ +import { type Kysely } from 'kysely'; + +/** + * #370 — page-versioning intentionality tier on a history snapshot. + * + * Adds `page_history.kind`, the three-tier "how intentional was this snapshot" + * marker that lets versions (intentional points) be told apart from autosaves: + * - 'manual' — a human explicitly saved a version (Cmd+S / Save button) + * - 'agent' — the AI agent explicitly saved a version + * - 'idle' — trailing idle-flush autosnapshot (safety net) + * - 'boundary' — autosnapshot pinned on a source transition (user↔agent↔git) + * + * Nullable with NO default (mirrors last_updated_source in the agent-provenance + * migration): legacy rows predate the marker and read back as `null`, which the + * client renders as a plain autosave. Stored as a short varchar to stay + * forward-compatible without an enum migration. + */ +export async function up(db: Kysely): Promise { + await db.schema + .alterTable('page_history') + .addColumn('kind', 'varchar(20)', (col) => col) + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema.alterTable('page_history').dropColumn('kind').execute(); +} diff --git a/apps/server/src/database/repos/page/page-history.repo.ts b/apps/server/src/database/repos/page/page-history.repo.ts index e6ba1d61..6eb5b145 100644 --- a/apps/server/src/database/repos/page/page-history.repo.ts +++ b/apps/server/src/database/repos/page/page-history.repo.ts @@ -13,6 +13,7 @@ import { jsonArrayFrom, jsonObjectFrom } from 'kysely/helpers/postgres'; import { ExpressionBuilder, sql } from 'kysely'; import { DB } from '@docmost/db/types/db'; import { resolveAgentProvenance } from '../agent-provenance'; +import { PageHistoryKind } from '../../../collaboration/constants'; /** * Role-resolution subquery for a page-history row's bound AI chat (#300). Joins @@ -46,6 +47,9 @@ export class PageHistoryRepo { 'lastUpdatedById', 'lastUpdatedSource', 'lastUpdatedAiChatId', + // #370 — intentionality tier ('manual' | 'agent' | 'idle' | 'boundary'); + // null on legacy rows (= autosave). Selected so callers can read/promote it. + 'kind', 'contributorIds', 'spaceId', 'workspaceId', @@ -85,9 +89,15 @@ export class PageHistoryRepo { async saveHistory( page: Page, - opts?: { contributorIds?: string[]; trx?: KyselyTransaction }, - ): Promise { - await this.insertPageHistory( + opts?: { + contributorIds?: string[]; + // #370 — intentionality tier for this snapshot. Omitted → null (legacy + // autosave semantics). Callers derive it server-side, never from a client. + kind?: PageHistoryKind; + trx?: KyselyTransaction; + }, + ): Promise { + return await this.insertPageHistory( { pageId: page.id, slugId: page.slugId, @@ -99,6 +109,7 @@ export class PageHistoryRepo { // Copy the provenance marker off the page row, as for lastUpdatedById. lastUpdatedSource: page.lastUpdatedSource, lastUpdatedAiChatId: page.lastUpdatedAiChatId, + kind: opts?.kind ?? null, contributorIds: opts?.contributorIds, spaceId: page.spaceId, workspaceId: page.workspaceId, @@ -107,6 +118,25 @@ export class PageHistoryRepo { ); } + /** + * #370 — promote an existing snapshot's intentionality tier in place. Used by + * the manual-save "promote-not-dup" path: when the latest history row already + * holds the exact content being versioned, we upgrade its `kind` instead of + * duplicating a heavy content row. + */ + async updateHistoryKind( + pageHistoryId: string, + kind: PageHistoryKind, + trx?: KyselyTransaction, + ): Promise { + const db = dbOrTx(this.db, trx); + await db + .updateTable('pageHistory') + .set({ kind }) + .where('id', '=', pageHistoryId) + .execute(); + } + async findPageHistoryByPageId(pageId: string, pagination: PaginationOptions) { const query = this.db .selectFrom('pageHistory') diff --git a/apps/server/src/database/types/db.d.ts b/apps/server/src/database/types/db.d.ts index bf5a9663..69294d32 100644 --- a/apps/server/src/database/types/db.d.ts +++ b/apps/server/src/database/types/db.d.ts @@ -280,6 +280,7 @@ export interface PageHistory { createdAt: Generated; icon: string | null; id: Generated; + kind: string | null; lastUpdatedAiChatId: string | null; lastUpdatedById: string | null; lastUpdatedSource: string | null; diff --git a/apps/server/src/integrations/queue/constants/queue.interface.ts b/apps/server/src/integrations/queue/constants/queue.interface.ts index 19445944..f95e5eda 100644 --- a/apps/server/src/integrations/queue/constants/queue.interface.ts +++ b/apps/server/src/integrations/queue/constants/queue.interface.ts @@ -20,6 +20,10 @@ export interface IStripeSeatsSyncJob { export interface IPageHistoryJob { pageId: string; + // #370 — intentionality tier the worker stamps on the snapshot. All jobs on + // this queue are trailing idle-flush autosnapshots, so this is 'idle' (absent + // → treated as 'idle' by the processor). + kind?: 'idle'; } /**