From ad0fc3b58b490f02ae7026b2c6f4b81ca5f780a6 Mon Sep 17 00:00:00 2001 From: claude code agent 227 Date: Tue, 23 Jun 2026 15:21:14 +0300 Subject: [PATCH] fix(git-sync): merge git body into the live doc block-by-block (no clobber) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Supersedes the active-session "defer" guard with a real merge (review #5 — "запись делать через мерж", not skip-while-editing). writeBody no longer does delete-all + re-insert (which discarded a concurrent editor's in-flight changes on every sync). It now diffs the live body against the incoming git body at TOP-LEVEL BLOCK granularity (LCS over a canonical structural serialization) and applies only the minimal inserts/deletes: - a block a human is editing is left UNTOUCHED when git changed a DIFFERENT block; - an unchanged resync is a complete 0-op write; - Yjs CRDT-merges the minimal ops with concurrent edits. New yjs-body-merge.ts (mergeXmlFragments + cloneXmlNode + diffBlocks) is pure-Yjs and unit-tested with real Y.Docs (8 tests): identical->0 ops, edit-one-block keeps the other block instances, append/delete keep neighbours, marks survive the cross-doc clone. Crash-safety kept: the incoming doc is built before the connection opens, so a transform failure can't empty the body. Removed: the ActiveEditSessionError defer path and the now-unused CollaborationGateway.getActiveEditorCount. Honest limitation: this is a 2-way merge — for a block BOTH sides changed since the last sync, git wins (no common ancestor to decide). A full 3-way merge would need the last-synced base plumbed from the engine; the dominant cases (unchanged resync, edits to different blocks) are now lossless. Co-Authored-By: Claude Opus 4.8 --- .../collaboration/collaboration.gateway.ts | 15 -- .../gitmost-datasource.service.spec.ts | 44 +---- .../services/gitmost-datasource.service.ts | 54 ++---- .../git-sync/services/yjs-body-merge.spec.ts | 167 ++++++++++++++++++ .../git-sync/services/yjs-body-merge.ts | 160 +++++++++++++++++ 5 files changed, 350 insertions(+), 90 deletions(-) create mode 100644 apps/server/src/integrations/git-sync/services/yjs-body-merge.spec.ts create mode 100644 apps/server/src/integrations/git-sync/services/yjs-body-merge.ts diff --git a/apps/server/src/collaboration/collaboration.gateway.ts b/apps/server/src/collaboration/collaboration.gateway.ts index 986cc451..b46c13c8 100644 --- a/apps/server/src/collaboration/collaboration.gateway.ts +++ b/apps/server/src/collaboration/collaboration.gateway.ts @@ -137,21 +137,6 @@ export class CollaborationGateway { return this.hocuspocus.getDocumentsCount(); } - /** - * Number of LIVE human editor sessions (websocket connections) currently open - * on a document, or 0 if the document is not loaded. Unlike - * `Document.getConnectionsCount()` this deliberately excludes server-side - * direct connections (`directConnectionsCount`, e.g. the git-sync writer - * itself), so callers can tell whether a real person is editing right now. - * - * NOTE: this reflects only THIS instance. In a Redis-clustered deployment an - * editor attached to another node is not counted; for the single-instance - * deployments this guards (git-sync) that is exactly the live set. - */ - getActiveEditorCount(documentName: string): number { - return this.hocuspocus.documents.get(documentName)?.connections.size ?? 0; - } - handleYjsEvent( eventName: TName, documentName: string, diff --git a/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.spec.ts b/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.spec.ts index 572227a6..be341a39 100644 --- a/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.spec.ts +++ b/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.spec.ts @@ -32,10 +32,7 @@ jest.mock('@docmost/editor-ext', () => ({ })); import * as Y from 'yjs'; -import { - GitmostDataSourceService, - ActiveEditSessionError, -} from './gitmost-datasource.service'; +import { GitmostDataSourceService } from './gitmost-datasource.service'; // Focused unit/contract test for the native GitSyncClient adapter (plan §3). // No DB, no real collab server: the repos/services/gateway are mocked and we @@ -56,10 +53,7 @@ interface Mocks { movePage: AnyMock; removePage: AnyMock; }; - collabGateway: { - openDirectConnection: AnyMock; - getActiveEditorCount: AnyMock; - }; + collabGateway: { openDirectConnection: AnyMock }; // Minimal Kysely-ish chainable mock for the direct-query paths. db: any; // Captured collab connection (the fake conn the gateway returns). @@ -110,8 +104,6 @@ function build(rows: any[] = []): { conn.context = ctx; return conn; }), - // Default: no live editor sessions, so body writes proceed. - getActiveEditorCount: jest.fn(() => 0), }, db: { selectFrom: jest.fn(() => makeQueryBuilder(rows)), @@ -247,31 +239,11 @@ describe('GitmostDataSourceService', () => { expect(res.updatedAt).toBe('2026-06-20T11:00:00.000Z'); }); - it('defers (throws ActiveEditSessionError) when a human is editing the page — never clobbers', async () => { - const { service, mocks } = build(); - // A live editor session on this page. - mocks.collabGateway.getActiveEditorCount.mockReturnValue(1); - - await expect( - service.bind(CTX).importPageMarkdown('p1', '# Hello\n\nworld'), - ).rejects.toBeInstanceOf(ActiveEditSessionError); - - // The destructive full-body write must NOT have happened: no connection - // opened, no transact run. The engine's push loop catches this and retries - // on the next poll once the editor disconnects. - expect(mocks.collabGateway.getActiveEditorCount).toHaveBeenCalledWith( - 'page.p1', - ); - expect(mocks.collabGateway.openDirectConnection).not.toHaveBeenCalled(); - expect(mocks.conn.transact).not.toHaveBeenCalled(); - }); - - it('crash-safe: the captured write applies real content (update built before clearing)', async () => { - // The replacement Yjs update is computed BEFORE the connection opens / the - // fragment is cleared, so a transform failure can never leave the body - // emptied. Here we run the captured transact callback against a REAL Y.Doc - // and confirm it ends up with content (the precomputed update is valid and - // applied), i.e. the write produces a non-empty body rather than wiping it. + it('crash-safe: the incoming doc is built before the connection opens, and the captured merge applies content', async () => { + // The incoming Yjs doc is built BEFORE the connection opens, so a transform + // failure can never mutate the live body. Here we run the captured transact + // callback (the block-level merge) against a REAL empty Y.Doc and confirm it + // ends up with content — the write produces a non-empty body, never wipes it. const { service, mocks } = build(); mocks.pageRepo.findById.mockResolvedValue({ id: 'p1', @@ -281,7 +253,7 @@ describe('GitmostDataSourceService', () => { const realDoc = new Y.Doc(); expect(() => mocks.conn.capturedFn?.(realDoc)).not.toThrow(); - // The body fragment is non-empty: the markdown was converted and applied. + // The body fragment is non-empty: the incoming block was merged in. expect(realDoc.getXmlFragment('default').length).toBeGreaterThan(0); }); }); diff --git a/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.ts b/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.ts index 06fb40b3..fe165388 100644 --- a/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.ts +++ b/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.ts @@ -1,5 +1,4 @@ import { Injectable, Logger, NotFoundException } from '@nestjs/common'; -import * as Y from 'yjs'; import { TiptapTransformer } from '@hocuspocus/transformer'; import { generateJitteredKeyBetween } from 'fractional-indexing-jittered'; import { @@ -15,6 +14,7 @@ import { KyselyDB } from '@docmost/db/types/kysely.types'; import { PageService } from '../../../core/page/services/page.service'; import { CollaborationGateway } from '../../../collaboration/collaboration.gateway'; import { tiptapExtensions } from '../../../collaboration/collaboration.util'; +import { mergeXmlFragments } from './yjs-body-merge'; import { AuthProvenanceData } from '../../../common/decorators/auth-provenance.decorator'; /** @@ -42,23 +42,6 @@ const GIT_SYNC_PROVENANCE: AuthProvenanceData = { aiChatId: null, }; -/** - * Thrown when a git -> page body write is skipped because a human is editing the - * page RIGHT NOW (a live collab session). The engine's push loop catches this - * per page, records it as a (non-fatal) failure, and does NOT advance the - * loop-guard for that page — so the write is retried on the next poll once the - * editor disconnects, instead of clobbering their in-flight edits with a - * full-body replace (plan §15.6 / review #5). - */ -export class ActiveEditSessionError extends Error { - constructor(pageId: string) { - super( - `git-sync: page ${pageId} has an active edit session; deferring body write`, - ); - this.name = 'ActiveEditSessionError'; - } -} - /** * Native, in-process implementation of the engine's `GitSyncClient` seam * (plan §3). Reads go through repositories (PageRepo/SpaceRepo); body writes go @@ -398,28 +381,15 @@ export class GitmostDataSourceService { ): Promise { const documentName = `page.${pageId}`; - // Do NOT clobber a page someone is editing right now. The write below is a - // full-body replace (delete-all + re-insert); applied over a live editing - // session it would discard the user's in-flight changes. If a human editor - // is connected, defer: throw so the engine retries on the next poll once - // they disconnect (review #5 — "не писать в страницу с активной сессией"). - if (this.collabGateway.getActiveEditorCount(documentName) > 0) { - this.logger.debug( - `Skipping git-sync body write for ${documentName}: active edit session`, - ); - throw new ActiveEditSessionError(pageId); - } - - // Build the replacement Yjs state BEFORE touching the live doc. If the - // transform throws (a malformed/unsupported doc), we must NOT have already - // cleared the fragment — otherwise a conversion failure would leave the page - // with an empty body (review #5 — crash-safe conversion). - const next = TiptapTransformer.toYdoc( + // Build the incoming Yjs doc BEFORE opening the connection / touching the + // live doc. If the transform throws (a malformed/unsupported doc) we must NOT + // have mutated the live body — otherwise a conversion failure could leave the + // page empty (review #5 — crash-safe conversion). + const targetDoc = TiptapTransformer.toYdoc( prosemirrorJson, 'default', tiptapExtensions, ); - const update = Y.encodeStateAsUpdate(next); const conn = await this.collabGateway.openDirectConnection(documentName, { actor: 'git-sync', @@ -430,9 +400,15 @@ export class GitmostDataSourceService { }); try { await conn.transact((doc) => { - const fragment = doc.getXmlFragment('default'); - if (fragment.length > 0) fragment.delete(0, fragment.length); - Y.applyUpdate(doc, update); + // Block-level MERGE rather than a full-body replace (review #5): diff the + // live body against the incoming git body and apply only the blocks that + // actually changed. Blocks a human is concurrently editing — anything git + // did not change — are left untouched, and an unchanged resync is a 0-op + // write. Yjs CRDT-merges the minimal ops with live edits. + mergeXmlFragments( + doc.getXmlFragment('default'), + targetDoc.getXmlFragment('default'), + ); }); } finally { await conn.disconnect(); diff --git a/apps/server/src/integrations/git-sync/services/yjs-body-merge.spec.ts b/apps/server/src/integrations/git-sync/services/yjs-body-merge.spec.ts new file mode 100644 index 00000000..98a3025f --- /dev/null +++ b/apps/server/src/integrations/git-sync/services/yjs-body-merge.spec.ts @@ -0,0 +1,167 @@ +import * as Y from 'yjs'; + +import { + mergeXmlFragments, + cloneXmlNode, + diffBlocks, +} from './yjs-body-merge'; + +// Build a Y.XmlFragment('default') in `doc` from a list of paragraph specs. +// Each spec is the paragraph's plain text (a single XmlText child). +function buildFragment(doc: Y.Doc, paragraphs: string[]): Y.XmlFragment { + const frag = doc.getXmlFragment('default'); + const blocks = paragraphs.map((text) => { + const el = new Y.XmlElement('paragraph'); + const t = new Y.XmlText(); + if (text) t.insert(0, text); + el.insert(0, [t]); + return el; + }); + if (blocks.length) frag.insert(0, blocks); + return frag; +} + +function texts(frag: Y.XmlFragment): string[] { + return frag.toArray().map((el) => (el as Y.XmlElement).toArray() + .map((c) => (c as Y.XmlText).toString()) + .join('')); +} + +describe('yjs-body-merge', () => { + describe('diffBlocks (LCS edit script)', () => { + it('identical sequences produce only keeps (no edits)', () => { + const ops = diffBlocks(['a', 'b', 'c'], ['a', 'b', 'c']); + expect(ops.every((o) => o.op === 'keep')).toBe(true); + }); + + it('a single changed middle element is one del + one ins', () => { + const ops = diffBlocks(['a', 'b', 'c'], ['a', 'B', 'c']); + expect(ops.filter((o) => o.op === 'del')).toHaveLength(1); + expect(ops.filter((o) => o.op === 'ins')).toHaveLength(1); + expect(ops.filter((o) => o.op === 'keep')).toHaveLength(2); + }); + }); + + describe('mergeXmlFragments', () => { + it('identical content is a complete no-op (0 ops) — never clobbers an unchanged resync', () => { + const live = new Y.Doc(); + const target = new Y.Doc(); + const liveFrag = buildFragment(live, ['one', 'two', 'three']); + const targetFrag = buildFragment(target, ['one', 'two', 'three']); + + // Capture block identities to prove they are left untouched. + const before = liveFrag.toArray(); + let applied = -1; + live.transact(() => { + applied = mergeXmlFragments(liveFrag, targetFrag); + }); + + expect(applied).toBe(0); + // Same Y.XmlElement instances — nothing was deleted/recreated. + expect(liveFrag.toArray()).toEqual(before); + expect(texts(liveFrag)).toEqual(['one', 'two', 'three']); + }); + + it('a human edit to one block survives a git change to a DIFFERENT block', () => { + // Live: the human has the doc open; block 0 holds their edit. Git changed + // only block 2. The merge must touch ONLY block 2 and leave block 0 (and + // its in-flight edit) exactly as-is. + const live = new Y.Doc(); + const target = new Y.Doc(); + const liveFrag = buildFragment(live, ['HUMAN EDIT', 'shared', 'old tail']); + const targetFrag = buildFragment(target, [ + 'HUMAN EDIT', + 'shared', + 'new tail from git', + ]); + + const block0Before = liveFrag.get(0); // the human's block instance + const block1Before = liveFrag.get(1); + + let applied = -1; + live.transact(() => { + applied = mergeXmlFragments(liveFrag, targetFrag); + }); + + // Only block 2 was replaced: one del + one ins. + expect(applied).toBe(2); + // The human's block and the shared block are the SAME instances (untouched). + expect(liveFrag.get(0)).toBe(block0Before); + expect(liveFrag.get(1)).toBe(block1Before); + // Block 2 now carries git's content. + expect(texts(liveFrag)).toEqual([ + 'HUMAN EDIT', + 'shared', + 'new tail from git', + ]); + }); + + it('appends a new trailing block without disturbing existing ones', () => { + const live = new Y.Doc(); + const target = new Y.Doc(); + const liveFrag = buildFragment(live, ['a', 'b']); + const targetFrag = buildFragment(target, ['a', 'b', 'c']); + const a = liveFrag.get(0); + const b = liveFrag.get(1); + + let applied = -1; + live.transact(() => { + applied = mergeXmlFragments(liveFrag, targetFrag); + }); + + expect(applied).toBe(1); // single insert + expect(liveFrag.get(0)).toBe(a); + expect(liveFrag.get(1)).toBe(b); + expect(texts(liveFrag)).toEqual(['a', 'b', 'c']); + }); + + it('deletes a removed block, keeping its neighbours', () => { + const live = new Y.Doc(); + const target = new Y.Doc(); + const liveFrag = buildFragment(live, ['a', 'b', 'c']); + const targetFrag = buildFragment(target, ['a', 'c']); + const a = liveFrag.get(0); + + let applied = -1; + live.transact(() => { + applied = mergeXmlFragments(liveFrag, targetFrag); + }); + + expect(applied).toBe(1); // single delete + expect(liveFrag.get(0)).toBe(a); + expect(texts(liveFrag)).toEqual(['a', 'c']); + }); + + it('a fully different body is replaced (and stays valid)', () => { + const live = new Y.Doc(); + const target = new Y.Doc(); + const liveFrag = buildFragment(live, ['x', 'y']); + const targetFrag = buildFragment(target, ['p', 'q', 'r']); + live.transact(() => mergeXmlFragments(liveFrag, targetFrag)); + expect(texts(liveFrag)).toEqual(['p', 'q', 'r']); + }); + }); + + describe('cloneXmlNode', () => { + it('preserves text marks (XmlText delta) across docs', () => { + const src = new Y.Doc(); + const srcFrag = src.getXmlFragment('default'); + const el = new Y.XmlElement('paragraph'); + const t = new Y.XmlText(); + t.insert(0, 'plain '); + t.insert(6, 'bold', { bold: true }); + el.insert(0, [t]); + srcFrag.insert(0, [el]); + + const dst = new Y.Doc(); + const dstFrag = dst.getXmlFragment('default'); + dstFrag.insert(0, [cloneXmlNode(srcFrag.get(0) as Y.XmlElement)]); + + const clonedText = (dstFrag.get(0) as Y.XmlElement).get(0) as Y.XmlText; + expect(clonedText.toDelta()).toEqual([ + { insert: 'plain ' }, + { insert: 'bold', attributes: { bold: true } }, + ]); + }); + }); +}); diff --git a/apps/server/src/integrations/git-sync/services/yjs-body-merge.ts b/apps/server/src/integrations/git-sync/services/yjs-body-merge.ts new file mode 100644 index 00000000..ccc2e465 --- /dev/null +++ b/apps/server/src/integrations/git-sync/services/yjs-body-merge.ts @@ -0,0 +1,160 @@ +import * as Y from 'yjs'; + +/** + * Block-level merge of an incoming (git) page body into a LIVE Yjs document, + * replacing the previous full-body "delete everything + re-insert" write that + * clobbered concurrent human edits on every sync (review #5 — "запись делать + * через мерж"). + * + * Strategy: diff the two documents at TOP-LEVEL BLOCK granularity (an LCS over a + * canonical structural serialization of each block) and apply only the minimal + * insert/delete operations. Blocks that are byte-identical on both sides are + * left UNTOUCHED in the live doc — so a human editing one paragraph is unaffected + * when git changes a different paragraph, and an unchanged re-sync is a complete + * no-op (zero Yjs operations). Yjs then CRDT-merges the minimal ops with any + * concurrent edits. + * + * Limitation (honest): this is a 2-way merge (live vs incoming). For a block that + * BOTH sides changed since the last sync it cannot tell which is newer without a + * common ancestor, so the incoming (git) version wins for that one block. A full + * 3-way merge would need the last-synced base plumbed from the engine; the common + * cases — unchanged resync, and edits to DIFFERENT blocks — are handled losslessly. + */ + +type XmlNode = Y.XmlElement | Y.XmlText | Y.XmlHook; + +/** + * Canonical, comparable serialization of a Yjs XML node (structure + text + + * marks + attributes), with attribute keys sorted so equal blocks always produce + * an identical string regardless of attribute insertion order. + */ +export function serializeXmlNode(node: unknown): unknown { + if (node instanceof Y.XmlText) { + return { t: node.toDelta() }; + } + if (node instanceof Y.XmlElement) { + const attrs = node.getAttributes() as Record; + const sorted: Record = {}; + for (const k of Object.keys(attrs).sort()) sorted[k] = attrs[k]; + return { + n: node.nodeName, + a: sorted, + c: node.toArray().map(serializeXmlNode), + }; + } + // XmlHook / unknown: fall back to a stable string so it compares by identity + // of its serialized form (these do not occur in the Docmost block schema). + return { u: String(node) }; +} + +const key = (node: unknown): string => JSON.stringify(serializeXmlNode(node)); + +/** + * Deep-clone a detached/owned Yjs XML node into a fresh node that can be inserted + * into ANOTHER document (Yjs types are bound to their doc, so cross-doc moves are + * impossible — we rebuild). Preserves nodeName, attributes, text+marks (via the + * XmlText delta) and the full child subtree. + */ +export function cloneXmlNode(node: XmlNode): Y.XmlElement | Y.XmlText { + if (node instanceof Y.XmlText) { + const t = new Y.XmlText(); + const delta = node.toDelta(); + if (delta.length) t.applyDelta(delta); + return t; + } + if (node instanceof Y.XmlElement) { + const el = new Y.XmlElement(node.nodeName); + const attrs = node.getAttributes() as Record; + for (const k of Object.keys(attrs)) el.setAttribute(k, attrs[k] as string); + const kids = node.toArray().map((c) => cloneXmlNode(c as XmlNode)); + if (kids.length) el.insert(0, kids); + return el; + } + // Best-effort for any other node type (XmlHook — does not occur in the + // Docmost block schema): an empty paragraph so the merge never crashes. + return new Y.XmlElement('paragraph'); +} + +type Op = + | { op: 'keep' } + | { op: 'del' } + | { op: 'ins'; bi: number }; + +/** + * LCS-based edit script turning sequence `a` (live block keys) into `b` (incoming + * block keys): a run of keep/del/ins ops. O(n*m) table — fine for page block + * counts. + */ +export function diffBlocks(a: string[], b: string[]): Op[] { + const n = a.length; + const m = b.length; + const dp: number[][] = Array.from({ length: n + 1 }, () => + new Array(m + 1).fill(0), + ); + for (let i = n - 1; i >= 0; i--) { + for (let j = m - 1; j >= 0; j--) { + dp[i][j] = + a[i] === b[j] + ? dp[i + 1][j + 1] + 1 + : Math.max(dp[i + 1][j], dp[i][j + 1]); + } + } + const ops: Op[] = []; + let i = 0; + let j = 0; + while (i < n && j < m) { + if (a[i] === b[j]) { + ops.push({ op: 'keep' }); + i++; + j++; + } else if (dp[i + 1][j] >= dp[i][j + 1]) { + ops.push({ op: 'del' }); + i++; + } else { + ops.push({ op: 'ins', bi: j }); + j++; + } + } + while (i < n) { + ops.push({ op: 'del' }); + i++; + } + while (j < m) { + ops.push({ op: 'ins', bi: j }); + j++; + } + return ops; +} + +/** + * Merge `target` block children into `live`, mutating `live` in place with the + * minimal set of inserts/deletes. MUST be called inside a Yjs transaction. + * Returns the number of block operations applied (0 == content already identical). + */ +export function mergeXmlFragments( + live: Y.XmlFragment, + target: Y.XmlFragment, +): number { + const liveKids = live.toArray(); + const targetKids = target.toArray(); + const liveKeys = liveKids.map(key); + const targetKeys = targetKids.map(key); + + const ops = diffBlocks(liveKeys, targetKeys); + + let cursor = 0; // index into the LIVE fragment as we mutate it + let applied = 0; + for (const op of ops) { + if (op.op === 'keep') { + cursor++; + } else if (op.op === 'del') { + live.delete(cursor, 1); // remove the live block at the cursor; do not advance + applied++; + } else { + live.insert(cursor, [cloneXmlNode(targetKids[op.bi] as XmlNode)]); + cursor++; + applied++; + } + } + return applied; +}