fix(git-sync): merge git body into the live doc block-by-block (no clobber)

Supersedes the active-session "defer" guard with a real merge (review #5 —
"запись делать через мерж", not skip-while-editing).

writeBody no longer does delete-all + re-insert (which discarded a concurrent
editor's in-flight changes on every sync). It now diffs the live body against the
incoming git body at TOP-LEVEL BLOCK granularity (LCS over a canonical structural
serialization) and applies only the minimal inserts/deletes:
- a block a human is editing is left UNTOUCHED when git changed a DIFFERENT block;
- an unchanged resync is a complete 0-op write;
- Yjs CRDT-merges the minimal ops with concurrent edits.

New yjs-body-merge.ts (mergeXmlFragments + cloneXmlNode + diffBlocks) is pure-Yjs
and unit-tested with real Y.Docs (8 tests): identical->0 ops, edit-one-block keeps
the other block instances, append/delete keep neighbours, marks survive the
cross-doc clone. Crash-safety kept: the incoming doc is built before the
connection opens, so a transform failure can't empty the body.

Removed: the ActiveEditSessionError defer path and the now-unused
CollaborationGateway.getActiveEditorCount.

Honest limitation: this is a 2-way merge — for a block BOTH sides changed since the
last sync, git wins (no common ancestor to decide). A full 3-way merge would need
the last-synced base plumbed from the engine; the dominant cases (unchanged
resync, edits to different blocks) are now lossless.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
claude code agent 227
2026-06-23 15:21:14 +03:00
parent 9c805e8069
commit 3bba9425f4
5 changed files with 350 additions and 90 deletions

View File

@@ -32,10 +32,7 @@ jest.mock('@docmost/editor-ext', () => ({
}));
import * as Y from 'yjs';
import {
GitmostDataSourceService,
ActiveEditSessionError,
} from './gitmost-datasource.service';
import { GitmostDataSourceService } from './gitmost-datasource.service';
// Focused unit/contract test for the native GitSyncClient adapter (plan §3).
// No DB, no real collab server: the repos/services/gateway are mocked and we
@@ -56,10 +53,7 @@ interface Mocks {
movePage: AnyMock;
removePage: AnyMock;
};
collabGateway: {
openDirectConnection: AnyMock;
getActiveEditorCount: AnyMock;
};
collabGateway: { openDirectConnection: AnyMock };
// Minimal Kysely-ish chainable mock for the direct-query paths.
db: any;
// Captured collab connection (the fake conn the gateway returns).
@@ -110,8 +104,6 @@ function build(rows: any[] = []): {
conn.context = ctx;
return conn;
}),
// Default: no live editor sessions, so body writes proceed.
getActiveEditorCount: jest.fn(() => 0),
},
db: {
selectFrom: jest.fn(() => makeQueryBuilder(rows)),
@@ -247,31 +239,11 @@ describe('GitmostDataSourceService', () => {
expect(res.updatedAt).toBe('2026-06-20T11:00:00.000Z');
});
it('defers (throws ActiveEditSessionError) when a human is editing the page — never clobbers', async () => {
const { service, mocks } = build();
// A live editor session on this page.
mocks.collabGateway.getActiveEditorCount.mockReturnValue(1);
await expect(
service.bind(CTX).importPageMarkdown('p1', '# Hello\n\nworld'),
).rejects.toBeInstanceOf(ActiveEditSessionError);
// The destructive full-body write must NOT have happened: no connection
// opened, no transact run. The engine's push loop catches this and retries
// on the next poll once the editor disconnects.
expect(mocks.collabGateway.getActiveEditorCount).toHaveBeenCalledWith(
'page.p1',
);
expect(mocks.collabGateway.openDirectConnection).not.toHaveBeenCalled();
expect(mocks.conn.transact).not.toHaveBeenCalled();
});
it('crash-safe: the captured write applies real content (update built before clearing)', async () => {
// The replacement Yjs update is computed BEFORE the connection opens / the
// fragment is cleared, so a transform failure can never leave the body
// emptied. Here we run the captured transact callback against a REAL Y.Doc
// and confirm it ends up with content (the precomputed update is valid and
// applied), i.e. the write produces a non-empty body rather than wiping it.
it('crash-safe: the incoming doc is built before the connection opens, and the captured merge applies content', async () => {
// The incoming Yjs doc is built BEFORE the connection opens, so a transform
// failure can never mutate the live body. Here we run the captured transact
// callback (the block-level merge) against a REAL empty Y.Doc and confirm it
// ends up with content — the write produces a non-empty body, never wipes it.
const { service, mocks } = build();
mocks.pageRepo.findById.mockResolvedValue({
id: 'p1',
@@ -281,7 +253,7 @@ describe('GitmostDataSourceService', () => {
const realDoc = new Y.Doc();
expect(() => mocks.conn.capturedFn?.(realDoc)).not.toThrow();
// The body fragment is non-empty: the markdown was converted and applied.
// The body fragment is non-empty: the incoming block was merged in.
expect(realDoc.getXmlFragment('default').length).toBeGreaterThan(0);
});
});

View File

@@ -1,5 +1,4 @@
import { Injectable, Logger, NotFoundException } from '@nestjs/common';
import * as Y from 'yjs';
import { TiptapTransformer } from '@hocuspocus/transformer';
import { generateJitteredKeyBetween } from 'fractional-indexing-jittered';
import {
@@ -15,6 +14,7 @@ import { KyselyDB } from '@docmost/db/types/kysely.types';
import { PageService } from '../../../core/page/services/page.service';
import { CollaborationGateway } from '../../../collaboration/collaboration.gateway';
import { tiptapExtensions } from '../../../collaboration/collaboration.util';
import { mergeXmlFragments } from './yjs-body-merge';
import { AuthProvenanceData } from '../../../common/decorators/auth-provenance.decorator';
/**
@@ -42,23 +42,6 @@ const GIT_SYNC_PROVENANCE: AuthProvenanceData = {
aiChatId: null,
};
/**
* Thrown when a git -> page body write is skipped because a human is editing the
* page RIGHT NOW (a live collab session). The engine's push loop catches this
* per page, records it as a (non-fatal) failure, and does NOT advance the
* loop-guard for that page — so the write is retried on the next poll once the
* editor disconnects, instead of clobbering their in-flight edits with a
* full-body replace (plan §15.6 / review #5).
*/
export class ActiveEditSessionError extends Error {
constructor(pageId: string) {
super(
`git-sync: page ${pageId} has an active edit session; deferring body write`,
);
this.name = 'ActiveEditSessionError';
}
}
/**
* Native, in-process implementation of the engine's `GitSyncClient` seam
* (plan §3). Reads go through repositories (PageRepo/SpaceRepo); body writes go
@@ -398,28 +381,15 @@ export class GitmostDataSourceService {
): Promise<void> {
const documentName = `page.${pageId}`;
// Do NOT clobber a page someone is editing right now. The write below is a
// full-body replace (delete-all + re-insert); applied over a live editing
// session it would discard the user's in-flight changes. If a human editor
// is connected, defer: throw so the engine retries on the next poll once
// they disconnect (review #5 — "не писать в страницу с активной сессией").
if (this.collabGateway.getActiveEditorCount(documentName) > 0) {
this.logger.debug(
`Skipping git-sync body write for ${documentName}: active edit session`,
);
throw new ActiveEditSessionError(pageId);
}
// Build the replacement Yjs state BEFORE touching the live doc. If the
// transform throws (a malformed/unsupported doc), we must NOT have already
// cleared the fragment — otherwise a conversion failure would leave the page
// with an empty body (review #5 — crash-safe conversion).
const next = TiptapTransformer.toYdoc(
// Build the incoming Yjs doc BEFORE opening the connection / touching the
// live doc. If the transform throws (a malformed/unsupported doc) we must NOT
// have mutated the live body — otherwise a conversion failure could leave the
// page empty (review #5 — crash-safe conversion).
const targetDoc = TiptapTransformer.toYdoc(
prosemirrorJson,
'default',
tiptapExtensions,
);
const update = Y.encodeStateAsUpdate(next);
const conn = await this.collabGateway.openDirectConnection(documentName, {
actor: 'git-sync',
@@ -430,9 +400,15 @@ export class GitmostDataSourceService {
});
try {
await conn.transact((doc) => {
const fragment = doc.getXmlFragment('default');
if (fragment.length > 0) fragment.delete(0, fragment.length);
Y.applyUpdate(doc, update);
// Block-level MERGE rather than a full-body replace (review #5): diff the
// live body against the incoming git body and apply only the blocks that
// actually changed. Blocks a human is concurrently editing — anything git
// did not change — are left untouched, and an unchanged resync is a 0-op
// write. Yjs CRDT-merges the minimal ops with live edits.
mergeXmlFragments(
doc.getXmlFragment('default'),
targetDoc.getXmlFragment('default'),
);
});
} finally {
await conn.disconnect();

View File

@@ -0,0 +1,167 @@
import * as Y from 'yjs';
import {
mergeXmlFragments,
cloneXmlNode,
diffBlocks,
} from './yjs-body-merge';
// Build a Y.XmlFragment('default') in `doc` from a list of paragraph specs.
// Each spec is the paragraph's plain text (a single XmlText child).
function buildFragment(doc: Y.Doc, paragraphs: string[]): Y.XmlFragment {
const frag = doc.getXmlFragment('default');
const blocks = paragraphs.map((text) => {
const el = new Y.XmlElement('paragraph');
const t = new Y.XmlText();
if (text) t.insert(0, text);
el.insert(0, [t]);
return el;
});
if (blocks.length) frag.insert(0, blocks);
return frag;
}
function texts(frag: Y.XmlFragment): string[] {
return frag.toArray().map((el) => (el as Y.XmlElement).toArray()
.map((c) => (c as Y.XmlText).toString())
.join(''));
}
describe('yjs-body-merge', () => {
describe('diffBlocks (LCS edit script)', () => {
it('identical sequences produce only keeps (no edits)', () => {
const ops = diffBlocks(['a', 'b', 'c'], ['a', 'b', 'c']);
expect(ops.every((o) => o.op === 'keep')).toBe(true);
});
it('a single changed middle element is one del + one ins', () => {
const ops = diffBlocks(['a', 'b', 'c'], ['a', 'B', 'c']);
expect(ops.filter((o) => o.op === 'del')).toHaveLength(1);
expect(ops.filter((o) => o.op === 'ins')).toHaveLength(1);
expect(ops.filter((o) => o.op === 'keep')).toHaveLength(2);
});
});
describe('mergeXmlFragments', () => {
it('identical content is a complete no-op (0 ops) — never clobbers an unchanged resync', () => {
const live = new Y.Doc();
const target = new Y.Doc();
const liveFrag = buildFragment(live, ['one', 'two', 'three']);
const targetFrag = buildFragment(target, ['one', 'two', 'three']);
// Capture block identities to prove they are left untouched.
const before = liveFrag.toArray();
let applied = -1;
live.transact(() => {
applied = mergeXmlFragments(liveFrag, targetFrag);
});
expect(applied).toBe(0);
// Same Y.XmlElement instances — nothing was deleted/recreated.
expect(liveFrag.toArray()).toEqual(before);
expect(texts(liveFrag)).toEqual(['one', 'two', 'three']);
});
it('a human edit to one block survives a git change to a DIFFERENT block', () => {
// Live: the human has the doc open; block 0 holds their edit. Git changed
// only block 2. The merge must touch ONLY block 2 and leave block 0 (and
// its in-flight edit) exactly as-is.
const live = new Y.Doc();
const target = new Y.Doc();
const liveFrag = buildFragment(live, ['HUMAN EDIT', 'shared', 'old tail']);
const targetFrag = buildFragment(target, [
'HUMAN EDIT',
'shared',
'new tail from git',
]);
const block0Before = liveFrag.get(0); // the human's block instance
const block1Before = liveFrag.get(1);
let applied = -1;
live.transact(() => {
applied = mergeXmlFragments(liveFrag, targetFrag);
});
// Only block 2 was replaced: one del + one ins.
expect(applied).toBe(2);
// The human's block and the shared block are the SAME instances (untouched).
expect(liveFrag.get(0)).toBe(block0Before);
expect(liveFrag.get(1)).toBe(block1Before);
// Block 2 now carries git's content.
expect(texts(liveFrag)).toEqual([
'HUMAN EDIT',
'shared',
'new tail from git',
]);
});
it('appends a new trailing block without disturbing existing ones', () => {
const live = new Y.Doc();
const target = new Y.Doc();
const liveFrag = buildFragment(live, ['a', 'b']);
const targetFrag = buildFragment(target, ['a', 'b', 'c']);
const a = liveFrag.get(0);
const b = liveFrag.get(1);
let applied = -1;
live.transact(() => {
applied = mergeXmlFragments(liveFrag, targetFrag);
});
expect(applied).toBe(1); // single insert
expect(liveFrag.get(0)).toBe(a);
expect(liveFrag.get(1)).toBe(b);
expect(texts(liveFrag)).toEqual(['a', 'b', 'c']);
});
it('deletes a removed block, keeping its neighbours', () => {
const live = new Y.Doc();
const target = new Y.Doc();
const liveFrag = buildFragment(live, ['a', 'b', 'c']);
const targetFrag = buildFragment(target, ['a', 'c']);
const a = liveFrag.get(0);
let applied = -1;
live.transact(() => {
applied = mergeXmlFragments(liveFrag, targetFrag);
});
expect(applied).toBe(1); // single delete
expect(liveFrag.get(0)).toBe(a);
expect(texts(liveFrag)).toEqual(['a', 'c']);
});
it('a fully different body is replaced (and stays valid)', () => {
const live = new Y.Doc();
const target = new Y.Doc();
const liveFrag = buildFragment(live, ['x', 'y']);
const targetFrag = buildFragment(target, ['p', 'q', 'r']);
live.transact(() => mergeXmlFragments(liveFrag, targetFrag));
expect(texts(liveFrag)).toEqual(['p', 'q', 'r']);
});
});
describe('cloneXmlNode', () => {
it('preserves text marks (XmlText delta) across docs', () => {
const src = new Y.Doc();
const srcFrag = src.getXmlFragment('default');
const el = new Y.XmlElement('paragraph');
const t = new Y.XmlText();
t.insert(0, 'plain ');
t.insert(6, 'bold', { bold: true });
el.insert(0, [t]);
srcFrag.insert(0, [el]);
const dst = new Y.Doc();
const dstFrag = dst.getXmlFragment('default');
dstFrag.insert(0, [cloneXmlNode(srcFrag.get(0) as Y.XmlElement)]);
const clonedText = (dstFrag.get(0) as Y.XmlElement).get(0) as Y.XmlText;
expect(clonedText.toDelta()).toEqual([
{ insert: 'plain ' },
{ insert: 'bold', attributes: { bold: true } },
]);
});
});
});

View File

@@ -0,0 +1,160 @@
import * as Y from 'yjs';
/**
* Block-level merge of an incoming (git) page body into a LIVE Yjs document,
* replacing the previous full-body "delete everything + re-insert" write that
* clobbered concurrent human edits on every sync (review #5 — "запись делать
* через мерж").
*
* Strategy: diff the two documents at TOP-LEVEL BLOCK granularity (an LCS over a
* canonical structural serialization of each block) and apply only the minimal
* insert/delete operations. Blocks that are byte-identical on both sides are
* left UNTOUCHED in the live doc — so a human editing one paragraph is unaffected
* when git changes a different paragraph, and an unchanged re-sync is a complete
* no-op (zero Yjs operations). Yjs then CRDT-merges the minimal ops with any
* concurrent edits.
*
* Limitation (honest): this is a 2-way merge (live vs incoming). For a block that
* BOTH sides changed since the last sync it cannot tell which is newer without a
* common ancestor, so the incoming (git) version wins for that one block. A full
* 3-way merge would need the last-synced base plumbed from the engine; the common
* cases — unchanged resync, and edits to DIFFERENT blocks — are handled losslessly.
*/
type XmlNode = Y.XmlElement | Y.XmlText | Y.XmlHook;
/**
* Canonical, comparable serialization of a Yjs XML node (structure + text +
* marks + attributes), with attribute keys sorted so equal blocks always produce
* an identical string regardless of attribute insertion order.
*/
export function serializeXmlNode(node: unknown): unknown {
if (node instanceof Y.XmlText) {
return { t: node.toDelta() };
}
if (node instanceof Y.XmlElement) {
const attrs = node.getAttributes() as Record<string, unknown>;
const sorted: Record<string, unknown> = {};
for (const k of Object.keys(attrs).sort()) sorted[k] = attrs[k];
return {
n: node.nodeName,
a: sorted,
c: node.toArray().map(serializeXmlNode),
};
}
// XmlHook / unknown: fall back to a stable string so it compares by identity
// of its serialized form (these do not occur in the Docmost block schema).
return { u: String(node) };
}
const key = (node: unknown): string => JSON.stringify(serializeXmlNode(node));
/**
* Deep-clone a detached/owned Yjs XML node into a fresh node that can be inserted
* into ANOTHER document (Yjs types are bound to their doc, so cross-doc moves are
* impossible — we rebuild). Preserves nodeName, attributes, text+marks (via the
* XmlText delta) and the full child subtree.
*/
export function cloneXmlNode(node: XmlNode): Y.XmlElement | Y.XmlText {
if (node instanceof Y.XmlText) {
const t = new Y.XmlText();
const delta = node.toDelta();
if (delta.length) t.applyDelta(delta);
return t;
}
if (node instanceof Y.XmlElement) {
const el = new Y.XmlElement(node.nodeName);
const attrs = node.getAttributes() as Record<string, unknown>;
for (const k of Object.keys(attrs)) el.setAttribute(k, attrs[k] as string);
const kids = node.toArray().map((c) => cloneXmlNode(c as XmlNode));
if (kids.length) el.insert(0, kids);
return el;
}
// Best-effort for any other node type (XmlHook — does not occur in the
// Docmost block schema): an empty paragraph so the merge never crashes.
return new Y.XmlElement('paragraph');
}
type Op =
| { op: 'keep' }
| { op: 'del' }
| { op: 'ins'; bi: number };
/**
* LCS-based edit script turning sequence `a` (live block keys) into `b` (incoming
* block keys): a run of keep/del/ins ops. O(n*m) table — fine for page block
* counts.
*/
export function diffBlocks(a: string[], b: string[]): Op[] {
const n = a.length;
const m = b.length;
const dp: number[][] = Array.from({ length: n + 1 }, () =>
new Array(m + 1).fill(0),
);
for (let i = n - 1; i >= 0; i--) {
for (let j = m - 1; j >= 0; j--) {
dp[i][j] =
a[i] === b[j]
? dp[i + 1][j + 1] + 1
: Math.max(dp[i + 1][j], dp[i][j + 1]);
}
}
const ops: Op[] = [];
let i = 0;
let j = 0;
while (i < n && j < m) {
if (a[i] === b[j]) {
ops.push({ op: 'keep' });
i++;
j++;
} else if (dp[i + 1][j] >= dp[i][j + 1]) {
ops.push({ op: 'del' });
i++;
} else {
ops.push({ op: 'ins', bi: j });
j++;
}
}
while (i < n) {
ops.push({ op: 'del' });
i++;
}
while (j < m) {
ops.push({ op: 'ins', bi: j });
j++;
}
return ops;
}
/**
* Merge `target` block children into `live`, mutating `live` in place with the
* minimal set of inserts/deletes. MUST be called inside a Yjs transaction.
* Returns the number of block operations applied (0 == content already identical).
*/
export function mergeXmlFragments(
live: Y.XmlFragment,
target: Y.XmlFragment,
): number {
const liveKids = live.toArray();
const targetKids = target.toArray();
const liveKeys = liveKids.map(key);
const targetKeys = targetKids.map(key);
const ops = diffBlocks(liveKeys, targetKeys);
let cursor = 0; // index into the LIVE fragment as we mutate it
let applied = 0;
for (const op of ops) {
if (op.op === 'keep') {
cursor++;
} else if (op.op === 'del') {
live.delete(cursor, 1); // remove the live block at the cursor; do not advance
applied++;
} else {
live.insert(cursor, [cloneXmlNode(targetKids[op.bi] as XmlNode)]);
cursor++;
applied++;
}
}
return applied;
}