diff --git a/apps/server/src/collaboration/collaboration.gateway.ts b/apps/server/src/collaboration/collaboration.gateway.ts index b46c13c8..0b57eff6 100644 --- a/apps/server/src/collaboration/collaboration.gateway.ts +++ b/apps/server/src/collaboration/collaboration.gateway.ts @@ -149,6 +149,45 @@ export class CollaborationGateway { return this.hocuspocus.openDirectConnection(documentName, context); } + /** + * Write a git-originated body into a page, applying the merge on the instance + * that OWNS the live Y.Doc so a connected editor CONVERGES on the change. + * + * git-sync must NOT use openDirectConnection directly for this: that opens the + * document on whichever instance/process runs git-sync (the API/worker). When + * an editor is connected to a DIFFERENT collab instance/process, that is a + * SEPARATE, detached Y.Doc — the merge lands in the detached doc and the DB, + * but the live editor never receives the Yjs update; its next debounced + * autosave then overwrites the DB with its stale state and SILENTLY REVERTS + * the git change (the data-loss bug). Routing through the custom-event channel + * runs the merge on the owning instance's shared Document, whose update is + * broadcast to every connection (handleUpdate), so the editor's CRDT converges + * on the merged result. + * + * Without redis there is a single instance, so the write runs locally — which + * is already the owning (and only) instance the editor is connected to. 
+ */ + async writePageBody( + documentName: string, + payload: { + prosemirrorJson: unknown; + baseProsemirrorJson?: unknown; + userId: string; + }, + ): Promise<void> { + if (this.redisSync) { + await this.handleYjsEvent( + 'gitSyncWriteBody', + documentName, + payload as any, + ); + return; + } + await this.collabEventsService + .getHandlers(this.hocuspocus) + .gitSyncWriteBody(documentName, payload as any); + } + /* *Can be used before calling openDirectConnection directly */ diff --git a/apps/server/src/collaboration/collaboration.handler.git-sync.spec.ts b/apps/server/src/collaboration/collaboration.handler.git-sync.spec.ts new file mode 100644 index 00000000..2aadfbde --- /dev/null +++ b/apps/server/src/collaboration/collaboration.handler.git-sync.spec.ts @@ -0,0 +1,182 @@ +// Exercises the REAL `gitSyncWriteBody` collab handler (the owner-routed body +// write the data-loss fix introduces). The handler imports the editor graph via +// collaboration.util / yjs.util (tiptapExtensions -> editor-ext -> react-dom, +// unloadable under jest's node env, same coupling noted in +// gitmost-datasource.service.spec.ts), so we stub those + the transformer. The +// stubbed toYdoc builds paragraph blocks straight from the ProseMirror JSON so +// we can assert convergence on real text. +jest.mock('./collaboration.util', () => ({ + tiptapExtensions: [], + getPageId: (name: string) => name.replace(/^page\./, ''), + prosemirrorNodeToYElement: jest.fn(), +})); +jest.mock('./yjs.util', () => ({ + setYjsMark: jest.fn(), + updateYjsMarkAttribute: jest.fn(), +})); +jest.mock('@hocuspocus/transformer', () => { + const Yjs = require('yjs'); + return { + TiptapTransformer: { + toYdoc: (json: any) => { + if (json?.__throw) throw new Error('boom: malformed doc'); + const d = new Yjs.Doc(); + const frag = d.getXmlFragment('default'); + const blocks = (json?.content ?? []).map((node: any) => { + const el = new Yjs.XmlElement(node.type || 'paragraph'); + const text = (node.content ?? 
[]) + .map((t: any) => t.text ?? '') + .join(''); + const t = new Yjs.XmlText(); + if (text) t.insert(0, text); + el.insert(0, [t]); + return el; + }); + if (blocks.length) frag.insert(0, blocks); + return d; + }, + }, + }; +}); + +import * as Y from 'yjs'; +import { CollaborationHandler } from './collaboration.handler'; + +const pmDoc = (...paras: string[]) => ({ + type: 'doc', + content: paras.map((text) => ({ + type: 'paragraph', + content: text ? [{ type: 'text', text }] : [], + })), +}); + +const texts = (frag: Y.XmlFragment): string[] => + frag.toArray().map((el) => + (el as Y.XmlElement) + .toArray() + .map((c) => (c as Y.XmlText).toString()) + .join(''), + ); + +// Build a fake Hocuspocus whose openDirectConnection yields a DirectConnection +// over a REAL shared Document, with a connected "editor" doc that receives the +// shared doc's updates (modelling Document.handleUpdate's broadcast on the +// OWNING instance). Initial content carries live block ids; the editor starts +// fully synced with the shared doc. +function fakeHocuspocus(initial: { text: string; id: string }[]) { + const shared = new Y.Doc(); + const frag = shared.getXmlFragment('default'); + shared.transact(() => { + frag.insert( + 0, + initial.map((s) => { + const el = new Y.XmlElement('paragraph'); + el.setAttribute('id', s.id); + const t = new Y.XmlText(); + if (s.text) t.insert(0, s.text); + el.insert(0, [t]); + return el; + }), + ); + }); + const editor = new Y.Doc(); + Y.applyUpdate(editor, Y.encodeStateAsUpdate(shared)); + // Broadcast relay: server-originated updates flow to the connected editor. + shared.on('update', (u: Uint8Array, origin: any) => { + if (origin !== 'editor') Y.applyUpdate(editor, u, 'server'); + }); + + const openDirectConnection = jest.fn(async () => ({ + // DirectConnection.transact runs the fn directly against the Document (no + // wrapping Y transaction), exactly like @hocuspocus/server. 
+ transact: async (fn: (doc: Y.Doc) => void) => fn(shared), + disconnect: jest.fn(async () => undefined), + })); + + return { hocuspocus: { openDirectConnection } as any, shared, editor }; +} + +describe('CollaborationHandler.gitSyncWriteBody (owner-routed body write)', () => { + it('converges a connected editor on the git change (no silent revert)', async () => { + const { hocuspocus, shared, editor } = fakeHocuspocus([ + { text: 'alpha', id: 'p1' }, + { text: 'beta', id: 'p2' }, + ]); + const handler = new CollaborationHandler(); + const handlers = handler.getHandlers(hocuspocus); + + // git changed block 1 beta -> beta2; base is the pre-change content. + await handlers.gitSyncWriteBody('page.x', { + prosemirrorJson: pmDoc('alpha', 'beta2'), + baseProsemirrorJson: pmDoc('alpha', 'beta'), + userId: 'svc-user', + }); + + // The shared (owning-instance) doc holds the merge... + expect(texts(shared.getXmlFragment('default'))).toEqual(['alpha', 'beta2']); + // ...and the connected editor CONVERGED via the broadcast (the bug would + // leave it on 'beta' and revert the page on its next autosave). + expect(texts(editor.getXmlFragment('default'))).toEqual(['alpha', 'beta2']); + }); + + it('preserves a concurrent edit to a DIFFERENT block (3-way, finding #2)', async () => { + const { hocuspocus, shared, editor } = fakeHocuspocus([ + { text: 'alpha', id: 'p1' }, + { text: 'beta', id: 'p2' }, + ]); + // The editor is actively editing block 0 while the push arrives. 
+ const eFrag = editor.getXmlFragment('default'); + editor.transact( + () => (eFrag.get(0) as Y.XmlElement).get(0) instanceof Y.XmlText && + ((eFrag.get(0) as Y.XmlElement).get(0) as Y.XmlText).insert(5, ' EDIT'), + 'editor', + ); + Y.applyUpdate(shared, Y.encodeStateAsUpdate(editor), 'editor'); + + const handler = new CollaborationHandler(); + await handler.getHandlers(hocuspocus).gitSyncWriteBody('page.x', { + prosemirrorJson: pmDoc('alpha', 'beta2'), + baseProsemirrorJson: pmDoc('alpha', 'beta'), + userId: 'svc-user', + }); + + // Human's block-0 edit AND git's block-1 change both survive on the editor. + expect(texts(editor.getXmlFragment('default'))).toEqual([ + 'alpha EDIT', + 'beta2', + ]); + }); + + it('crash-safe: a transform failure never opens the connection or mutates the live doc', async () => { + const { hocuspocus, shared } = fakeHocuspocus([{ text: 'alpha', id: 'p1' }]); + const before = texts(shared.getXmlFragment('default')); + const handler = new CollaborationHandler(); + + await expect( + handler.getHandlers(hocuspocus).gitSyncWriteBody('page.x', { + prosemirrorJson: { __throw: true } as any, + userId: 'svc-user', + }), + ).rejects.toThrow('boom'); + + // The incoming doc is built BEFORE opening the connection, so the throw + // happens first: the live doc is untouched and no connection was opened. 
+ expect(hocuspocus.openDirectConnection).not.toHaveBeenCalled(); + expect(texts(shared.getXmlFragment('default'))).toEqual(before); + }); + + it('falls back to a 2-way merge when no base is supplied', async () => { + const { hocuspocus, shared, editor } = fakeHocuspocus([ + { text: 'alpha', id: 'p1' }, + ]); + const handler = new CollaborationHandler(); + + await handler.getHandlers(hocuspocus).gitSyncWriteBody('page.x', { + prosemirrorJson: pmDoc('alpha', 'gamma'), + userId: 'svc-user', + }); + + expect(texts(shared.getXmlFragment('default'))).toEqual(['alpha', 'gamma']); + expect(texts(editor.getXmlFragment('default'))).toEqual(['alpha', 'gamma']); + }); +}); diff --git a/apps/server/src/collaboration/collaboration.handler.ts b/apps/server/src/collaboration/collaboration.handler.ts index 2d4ae58f..d6074e8b 100644 --- a/apps/server/src/collaboration/collaboration.handler.ts +++ b/apps/server/src/collaboration/collaboration.handler.ts @@ -8,6 +8,10 @@ import { import { setYjsMark, updateYjsMarkAttribute, YjsSelection } from './yjs.util'; import * as Y from 'yjs'; import { User } from '@docmost/db/types/entity.types'; +import { + mergeXmlFragments, + mergeXmlFragments3Way, +} from '../integrations/git-sync/services/yjs-body-merge'; export type CollabEventHandlers = ReturnType< CollaborationHandler['getHandlers'] @@ -112,6 +116,69 @@ export class CollaborationHandler { }, ); }, + /** + * Git-sync body write, applied as a block-level MERGE into the LIVE doc on + * the instance that OWNS it (routed here via the custom-event channel — + * see CollaborationGateway.writePageBody). Running on the owning instance + * is what makes a connected editor CONVERGE: the merge mutates the shared + * Document, whose update is broadcast to every connection, so the editor's + * CRDT applies the git change instead of silently reverting it on its next + * autosave (the data-loss bug this fixes). 
+ * + * With a `baseProsemirrorJson` (the last-synced common ancestor) it does a + * THREE-WAY merge — a block only the human changed is kept, a block only + * git changed is taken (conflicts -> git). Without a base it falls back to + * the 2-way merge. + */ + gitSyncWriteBody: async ( + documentName: string, + payload: { + prosemirrorJson: any; + baseProsemirrorJson?: any; + userId: string; + }, + ) => { + const { prosemirrorJson, baseProsemirrorJson, userId } = payload; + + // Build the incoming (and base) Yjs docs BEFORE opening the connection / + // touching the live doc. If a transform throws (a malformed/unsupported + // doc) we must NOT have mutated the live body — otherwise a conversion + // failure could leave the page empty (crash-safe conversion). + const targetDoc = TiptapTransformer.toYdoc( + prosemirrorJson, + 'default', + tiptapExtensions, + ); + const baseDoc = + baseProsemirrorJson != null + ? TiptapTransformer.toYdoc( + baseProsemirrorJson, + 'default', + tiptapExtensions, + ) + : null; + + // actor:'git-sync' + the service user flow into PersistenceExtension + // (lastUpdatedSource='git-sync', lastUpdatedById=userId). 
+ await this.withYdocConnection( + hocuspocus, + documentName, + { actor: 'git-sync', user: { id: userId } }, + (doc) => { + const liveFrag = doc.getXmlFragment('default'); + const targetFrag = targetDoc.getXmlFragment('default'); + if (baseDoc) { + mergeXmlFragments3Way( + liveFrag, + targetFrag, + baseDoc.getXmlFragment('default'), + ); + } else { + mergeXmlFragments(liveFrag, targetFrag); + } + }, + ); + }, }; } diff --git a/apps/server/src/integrations/git-sync/services/git-ingest-convergence.spec.ts b/apps/server/src/integrations/git-sync/services/git-ingest-convergence.spec.ts new file mode 100644 index 00000000..39bb17cd --- /dev/null +++ b/apps/server/src/integrations/git-sync/services/git-ingest-convergence.spec.ts @@ -0,0 +1,180 @@ +import * as Y from 'yjs'; + +import { mergeXmlFragments3Way } from './yjs-body-merge'; + +/** + * Convergence repro for the git-ingest "silent revert" data-loss bug. + * + * ROOT CAUSE (confirmed): the merge logic itself is correct, but the git-ingest + * write was applied via `openDirectConnection` on whichever instance/process + * runs git-sync (the api/worker). When an editor is connected to a DIFFERENT + * collab instance/process, that opens a SEPARATE, detached Y.Doc. The merge + * lands in that detached doc (and the DB), but the live editor's Y.Doc never + * receives the Yjs update — so its next debounced autosave overwrites the DB + * with its STALE state and silently reverts the git change. + * + * These tests reproduce the invariant deterministically at the Yjs level (two + * Y.Docs exchanging updates), because the real failure is DISTRIBUTED — it only + * manifests when the write and the editor live on different instances, which a + * single in-process Hocuspocus cannot reproduce (in one process the direct + * connection already shares the editor's doc). 
HONEST SCOPE: this models the two + * outcomes; full cross-instance convergence is not (and cannot be) proven in a + * unit test without a live multi-instance Hocuspocus + redis. + * + * PATH B (the BUG): the git update is NOT delivered to the editor's doc — the + * editor's later autosave reverts the change. Asserts the LOSS. + * PATH A (the FIX): the git update IS delivered to the editor's doc as a Yjs + * update — which is exactly what running the merge on the OWNING instance's + * shared Document does (its update is broadcast to every connection). The + * editor's CRDT converges and a later autosave preserves the git change. + * + * The fix routes git-sync's body write through CollaborationGateway.writePageBody + * (the custom-event channel) so it executes on the owning instance — turning + * PATH B into PATH A. + */ + +type Spec = { text: string; id?: string }; + +// Build a Y.XmlFragment('default'). `id` is set only when provided, mirroring +// the live doc (block UniqueIDs present) vs a git-parsed body (ids absent). +function buildFragment(doc: Y.Doc, specs: Spec[]): Y.XmlFragment { + const frag = doc.getXmlFragment('default'); + const blocks = specs.map((s) => { + const el = new Y.XmlElement('paragraph'); + if (s.id) el.setAttribute('id', s.id); + const t = new Y.XmlText(); + if (s.text) t.insert(0, s.text); + el.insert(0, [t]); + return el; + }); + if (blocks.length) frag.insert(0, blocks); + return frag; +} + +const texts = (frag: Y.XmlFragment): string[] => + frag.toArray().map((el) => + (el as Y.XmlElement) + .toArray() + .map((c) => (c as Y.XmlText).toString()) + .join(''), + ); + +// Append '!' to the end of the given block's text — a tiny human edit that +// stands in for a connected editor's autosave-triggering keystroke. 
+function humanEdit(doc: Y.Doc, blockIndex: number, mark = '!'): void { + const frag = doc.getXmlFragment('default'); + const el = frag.get(blockIndex) as Y.XmlElement; + const t = el.get(0) as Y.XmlText; + doc.transact(() => t.insert(t.length, mark)); +} + +describe('git-ingest convergence with an open editor', () => { + // Shared setup: the page is persisted with two blocks (live ids), and BOTH the + // server-side ingest doc (S) and the connected editor's doc (C) load that same + // state — they start fully synced, exactly like two instances that each loaded + // the page from the DB. + function setup() { + const db = new Y.Doc(); + buildFragment(db, [ + { text: 'alpha', id: 'p1' }, + { text: 'beta', id: 'p2' }, + ]); + const state0 = Y.encodeStateAsUpdate(db); + + const server = new Y.Doc(); // where the git merge is applied + Y.applyUpdate(server, state0); + const editor = new Y.Doc(); // the browser's live in-memory doc + Y.applyUpdate(editor, state0); + + // base (last-synced, from git markdown — no ids) == the pre-change content. + const baseDoc = new Y.Doc(); + const baseFrag = buildFragment(baseDoc, [{ text: 'alpha' }, { text: 'beta' }]); + return { state0, server, editor, baseFrag }; + } + + // git changed only the SECOND block: beta -> beta2; the editor is idle on it. + function applyGitMerge(server: Y.Doc, baseFrag: Y.XmlFragment): Uint8Array { + const targetDoc = new Y.Doc(); + const targetFrag = buildFragment(targetDoc, [ + { text: 'alpha' }, + { text: 'beta2' }, + ]); + let captured: Uint8Array | null = null; + const onUpdate = (u: Uint8Array) => { + // Accumulate (the merge emits one update per op when unwrapped); here a + // single transact yields one update covering the whole merge. + captured = captured ? 
Y.mergeUpdates([captured, u]) : u; + }; + server.on('update', onUpdate); + server.transact(() => + mergeXmlFragments3Way( + server.getXmlFragment('default'), + targetFrag, + baseFrag, + ), + ); + server.off('update', onUpdate); + return captured!; + } + + it('PATH B (the BUG): undelivered git update is reverted by the editor autosave — DATA LOSS', () => { + const { server, editor, baseFrag } = setup(); + + // git merge lands on the server doc only. + applyGitMerge(server, baseFrag); + expect(texts(server.getXmlFragment('default'))).toEqual(['alpha', 'beta2']); + + // The editor NEVER receives the update (detached doc on another instance). + // It makes an unrelated edit on block 0 and autosaves its full state. + humanEdit(editor, 0); + const persisted = new Y.Doc(); + Y.applyUpdate(persisted, Y.encodeStateAsUpdate(editor)); + + // git's 'beta2' is gone — the page reverted to 'beta'. This is the bug. + expect(texts(persisted.getXmlFragment('default'))).toEqual([ + 'alpha!', + 'beta', + ]); + }); + + it('PATH A (the FIX): delivering the git update to the editor converges — git change SURVIVES', () => { + const { server, editor, baseFrag } = setup(); + + // git merge on the server doc, capturing the broadcastable Yjs update. + const gitUpdate = applyGitMerge(server, baseFrag); + + // Running on the OWNING instance broadcasts the update to the connected + // editor (Document.handleUpdate). Model that: the editor applies it. + Y.applyUpdate(editor, gitUpdate); + expect(texts(editor.getXmlFragment('default'))).toEqual(['alpha', 'beta2']); + + // The editor now autosaves (unrelated edit on block 0). Its full state still + // carries git's change — no revert. 
+ humanEdit(editor, 0); + const persisted = new Y.Doc(); + Y.applyUpdate(persisted, Y.encodeStateAsUpdate(editor)); + expect(texts(persisted.getXmlFragment('default'))).toEqual([ + 'alpha!', + 'beta2', + ]); + }); + + it('PATH A — concurrent edits to DIFFERENT paragraphs both survive (finding #2)', () => { + const { server, editor, baseFrag } = setup(); + + // The editor is actively editing block 0 (concurrent with the push). + humanEdit(editor, 0, ' EDIT'); + + // git changes block 1; merge on the server, broadcast to the editor. + const gitUpdate = applyGitMerge(server, baseFrag); + Y.applyUpdate(editor, gitUpdate); + + // Both sides preserved: the human's block-0 edit AND git's block-1 change. + const persisted = new Y.Doc(); + Y.applyUpdate(persisted, Y.encodeStateAsUpdate(editor)); + expect(texts(persisted.getXmlFragment('default'))).toEqual([ + 'alpha EDIT', + 'beta2', + ]); + }); +}); diff --git a/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.spec.ts b/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.spec.ts index 8287da39..3027b392 100644 --- a/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.spec.ts +++ b/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.spec.ts @@ -46,13 +46,7 @@ jest.mock('../git-sync.loader', () => ({ })), })); -import * as Y from 'yjs'; import { GitmostDataSourceService } from './gitmost-datasource.service'; -// The body-write seam picks 2-way vs 3-way merge based on whether a base doc was -// built. We spy on the real module exports (ts-jest CJS output references them -// through the namespace object, so the spies intercept the SUT's calls) and let -// them call through, so we assert WHICH merge ran without mocking the behaviour. -import * as bodyMerge from './yjs-body-merge'; // Focused unit/contract test for the native GitSyncClient adapter. 
// No DB, no real collab server: the repos/services/gateway are mocked and we @@ -73,16 +67,9 @@ interface Mocks { movePage: AnyMock; removePage: AnyMock; }; - collabGateway: { openDirectConnection: AnyMock }; + collabGateway: { writePageBody: AnyMock }; // Minimal Kysely-ish chainable mock for the direct-query paths. db: any; - // Captured collab connection (the fake conn the gateway returns). - conn: { - transact: AnyMock; - disconnect: AnyMock; - context?: any; - capturedFn?: (doc: any) => void; - }; } function makeQueryBuilder(rows: any[]) { @@ -99,13 +86,6 @@ function build(rows: any[] = []): { service: GitmostDataSourceService; mocks: Mocks; } { - const conn: Mocks['conn'] = { - transact: jest.fn(async (fn: (doc: any) => void) => { - conn.capturedFn = fn; - }), - disconnect: jest.fn(async () => undefined), - }; - const mocks: Mocks = { pageRepo: { findById: jest.fn(), @@ -120,15 +100,11 @@ function build(rows: any[] = []): { removePage: jest.fn(async () => undefined), }, collabGateway: { - openDirectConnection: jest.fn(async (_name: string, ctx: any) => { - conn.context = ctx; - return conn; - }), + writePageBody: jest.fn(async () => undefined), }, db: { selectFrom: jest.fn(() => makeQueryBuilder(rows)), }, - conn, }; const service = new GitmostDataSourceService( @@ -232,7 +208,7 @@ describe('GitmostDataSourceService', () => { }); describe('importPageMarkdown', () => { - it('parses md, converts to ProseMirror, and writes body via collab', async () => { + it('parses md, converts to ProseMirror, and routes the body write to the owning instance', async () => { const { service, mocks } = build(); mocks.pageRepo.findById.mockResolvedValue({ id: 'p1', @@ -243,77 +219,50 @@ describe('GitmostDataSourceService', () => { .bind(CTX) .importPageMarkdown('p1', '# Hello\n\nworld'); - // writeBody opened a collab connection tagged git-sync + service user. 
- expect(mocks.collabGateway.openDirectConnection).toHaveBeenCalledTimes(1); - const [docName, ctx] = mocks.collabGateway.openDirectConnection.mock - .calls[0]; + // writeBody routes through writePageBody (NOT openDirectConnection): the + // merge must run on the instance that owns the live doc so a connected + // editor converges instead of silently reverting the change. The service + // user rides on the payload as the responsible author. + expect(mocks.collabGateway.writePageBody).toHaveBeenCalledTimes(1); + const [docName, payload] = mocks.collabGateway.writePageBody.mock.calls[0]; expect(docName).toBe('page.p1'); - expect(ctx.actor).toBe('git-sync'); - expect(ctx.user).toEqual({ id: 'svc-user' }); - - // transact ran and connection was disconnected (finally). - expect(mocks.conn.transact).toHaveBeenCalledTimes(1); - expect(mocks.conn.disconnect).toHaveBeenCalledTimes(1); - expect(typeof mocks.conn.capturedFn).toBe('function'); + expect(payload.userId).toBe('svc-user'); + // A converted ProseMirror doc was passed; no base on a plain import. + expect(payload.prosemirrorJson).toEqual( + expect.objectContaining({ type: 'doc' }), + ); + expect(payload.baseProsemirrorJson).toBeUndefined(); expect(res.updatedAt).toBe('2026-06-20T11:00:00.000Z'); }); - it('crash-safe: the incoming doc is built before the connection opens, and the captured merge applies content', async () => { - // The incoming Yjs doc is built BEFORE the connection opens, so a transform - // failure can never mutate the live body. Here we run the captured transact - // callback (the block-level merge) against a REAL empty Y.Doc and confirm it - // ends up with content — the write produces a non-empty body, never wipes it. 
- const { service, mocks } = build(); - mocks.pageRepo.findById.mockResolvedValue({ - id: 'p1', - updatedAt: new Date('2026-06-20T11:00:00.000Z'), - }); - await service.bind(CTX).importPageMarkdown('p1', '# Hi\n\nthere'); - - const realDoc = new Y.Doc(); - expect(() => mocks.conn.capturedFn?.(realDoc)).not.toThrow(); - // The body fragment is non-empty: the incoming block was merged in. - expect(realDoc.getXmlFragment('default').length).toBeGreaterThan(0); - }); - // The 2-way path (no base) is covered above; this exercises the THREE-WAY - // branch that only fires when a `baseMarkdown` is supplied (review #5). + // branch that only fires when a `baseMarkdown` is supplied (review #5). The + // merge dispatch itself now lives in the collab handler (gitSyncWriteBody); + // here we assert the datasource forwards the base so the owning instance can + // run the 3-way reconcile. describe('with a baseMarkdown (three-way merge)', () => { - afterEach(() => jest.restoreAllMocks()); - - it('builds a base doc and dispatches to mergeXmlFragments3Way (not the 2-way merge)', async () => { + it('forwards the parsed base body so the owning instance can three-way merge', async () => { const { service, mocks } = build(); mocks.pageRepo.findById.mockResolvedValue({ id: 'p1', updatedAt: new Date('2026-06-20T11:00:00.000Z'), }); - // Spy through to the real implementations so we observe the dispatch. - const merge3 = jest.spyOn(bodyMerge, 'mergeXmlFragments3Way'); - const merge2 = jest.spyOn(bodyMerge, 'mergeXmlFragments'); await service .bind(CTX) .importPageMarkdown('p1', '# Full\n\ngit', '# Base\n\nbase'); - // The body write was staged through collab as before. - expect(mocks.conn.transact).toHaveBeenCalledTimes(1); - expect(typeof mocks.conn.capturedFn).toBe('function'); - - // Running the captured merge against a real live doc takes the 3-way path: - // the base was parsed/built and the 3-way helper is invoked with three - // fragments; the 2-way fallback is NOT used. 
- const liveDoc = new Y.Doc(); - expect(() => mocks.conn.capturedFn?.(liveDoc)).not.toThrow(); - - expect(merge3).toHaveBeenCalledTimes(1); - expect(merge2).not.toHaveBeenCalled(); - const [liveFrag, gitFrag, baseFrag] = merge3.mock.calls[0]; - expect(liveFrag).toBeInstanceOf(Y.XmlFragment); - expect(gitFrag).toBeInstanceOf(Y.XmlFragment); - // The third arg is the BASE fragment — proof the base markdown was parsed - // and converted into its own doc for the common-ancestor comparison. - expect(baseFrag).toBeInstanceOf(Y.XmlFragment); + expect(mocks.collabGateway.writePageBody).toHaveBeenCalledTimes(1); + const [, payload] = mocks.collabGateway.writePageBody.mock.calls[0]; + // Both the incoming body AND the last-synced base were converted and + // forwarded — proof the 3-way common-ancestor is plumbed through. + expect(payload.prosemirrorJson).toEqual( + expect.objectContaining({ type: 'doc' }), + ); + expect(payload.baseProsemirrorJson).toEqual( + expect.objectContaining({ type: 'doc' }), + ); }); }); }); @@ -337,9 +286,9 @@ describe('GitmostDataSourceService', () => { { spaceId: 'space-1', title: 'Title', parentPageId: 'parent-1' }, { actor: 'git-sync', aiChatId: null }, ); - expect(mocks.collabGateway.openDirectConnection).toHaveBeenCalledWith( + expect(mocks.collabGateway.writePageBody).toHaveBeenCalledWith( 'page.new-id', - expect.objectContaining({ actor: 'git-sync' }), + expect.objectContaining({ userId: 'svc-user' }), ); expect(res).toEqual({ data: { id: 'new-id' }, diff --git a/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.ts b/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.ts index 0978be35..caf7364f 100644 --- a/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.ts +++ b/apps/server/src/integrations/git-sync/services/gitmost-datasource.service.ts @@ -1,5 +1,4 @@ import { Injectable, Logger, NotFoundException } from '@nestjs/common'; -import { TiptapTransformer } from 
'@hocuspocus/transformer'; import { generateJitteredKeyBetween } from 'fractional-indexing-jittered'; import type { GitSyncClient, @@ -12,8 +11,6 @@ import { InjectKysely } from 'nestjs-kysely'; import { KyselyDB } from '@docmost/db/types/kysely.types'; import { PageService } from '../../../core/page/services/page.service'; import { CollaborationGateway } from '../../../collaboration/collaboration.gateway'; -import { tiptapExtensions } from '../../../collaboration/collaboration.util'; -import { mergeXmlFragments, mergeXmlFragments3Way } from './yjs-body-merge'; import { AuthProvenanceData } from '../../../common/decorators/auth-provenance.decorator'; /** @@ -387,15 +384,27 @@ export class GitmostDataSourceService { // --- linchpin: native body write (§3.3) ----------------------------------- /** - * In-process body write — no loopback websocket, no service-user token. Mirrors - * the collab handler's 'replace' operation exactly: open a direct connection, - * drop the existing fragment, apply the converted doc, then disconnect. + * In-process body write — no loopback websocket, no service-user token. * - * The `{ actor: 'git-sync', user: { id: userId } }` context flows into + * Routes the write through `CollaborationGateway.writePageBody`, which applies + * the block-level MERGE on the instance that OWNS the live Y.Doc (via the + * custom-event channel) rather than opening a direct connection on this + * (api/worker) instance. That distinction is load-bearing: when an editor is + * connected to a different collab instance/process, a direct connection here + * mutates a SEPARATE, detached doc the editor never sees — the editor's next + * autosave then silently REVERTS the git change (data loss). Running on the + * owning instance broadcasts the merge as a Yjs update so the editor converges + * (see CollaborationGateway.writePageBody for the full rationale). 
+ * + * The merge itself stays a block-level reconcile, not a full-body replace + * (review #5): only changed blocks are touched, concurrently-edited blocks are + * left untouched, and an unchanged resync is a 0-op write. With a `base` (the + * last-synced version) it is a THREE-WAY merge so a block ONLY the human + * changed is kept and a block ONLY git changed is taken (conflicts -> git); + * without a base (e.g. createPage) it falls back to the 2-way merge. The + * `{ actor: 'git-sync', user: { id: userId } }` context flows into * PersistenceExtension.onStoreDocument, which persists ydoc+content+textContent, - * stamps `lastUpdatedSource = 'git-sync'`, and broadcasts `page.updated`. The - * service user (`user.id`) stays the responsible `lastUpdatedById`; the actor - * marks provenance. + * stamps `lastUpdatedSource = 'git-sync'`, and broadcasts `page.updated`. */ private async writeBody( pageId: string, @@ -404,51 +413,10 @@ export class GitmostDataSourceService { baseProsemirrorJson?: unknown, ): Promise<void> { const documentName = `page.${pageId}`; - - // Build the incoming (and base) Yjs docs BEFORE opening the connection / - // touching the live doc. If a transform throws (a malformed/unsupported doc) - // we must NOT have mutated the live body — otherwise a conversion failure - // could leave the page empty (review #5 — crash-safe conversion). - const targetDoc = TiptapTransformer.toYdoc( + await this.collabGateway.writePageBody(documentName, { prosemirrorJson, - 'default', - tiptapExtensions, - ); - const baseDoc = - baseProsemirrorJson != null - ? TiptapTransformer.toYdoc(baseProsemirrorJson, 'default', tiptapExtensions) - : null; - - const conn = await this.collabGateway.openDirectConnection(documentName, { - actor: 'git-sync', - // PersistenceExtension reads `context.user.id` for lastUpdatedById, so the - // service user is required on the context (unlike the bare `{ actor }` - // sketch in issue #194). 
- user: { id: userId }, + baseProsemirrorJson, + userId, }); - try { - await conn.transact((doc) => { - const liveFrag = doc.getXmlFragment('default'); - const targetFrag = targetDoc.getXmlFragment('default'); - // Block-level MERGE rather than a full-body replace (review #5): diff the - // live body against the incoming git body and apply only the blocks that - // actually changed; concurrently-edited blocks are left untouched and an - // unchanged resync is a 0-op write. With a `base` (the last-synced - // version) do a THREE-WAY merge so a block ONLY the human changed is kept - // and a block ONLY git changed is taken (conflicts -> git). Without a base - // (e.g. createPage), fall back to the 2-way merge. - if (baseDoc) { - mergeXmlFragments3Way( - liveFrag, - targetFrag, - baseDoc.getXmlFragment('default'), - ); - } else { - mergeXmlFragments(liveFrag, targetFrag); - } - }); - } finally { - await conn.disconnect(); - } } }