diff --git a/apps/server/src/collaboration/extensions/persistence.extension.spec.ts b/apps/server/src/collaboration/extensions/persistence.extension.spec.ts index 92756aa8..5d798aa9 100644 --- a/apps/server/src/collaboration/extensions/persistence.extension.spec.ts +++ b/apps/server/src/collaboration/extensions/persistence.extension.spec.ts @@ -314,7 +314,7 @@ describe('PersistenceExtension', () => { expect(pageRepo.updatePage).not.toHaveBeenCalled(); }); - it('seeds + persists when the persisted ydoc lacks a title fragment', async () => { + it('seeds + persists under a lock when the persisted ydoc lacks a title fragment', async () => { const src = TiptapTransformer.toYdoc(bodyJson, 'default', tiptapExtensions); const page = { id: 'PAGE_ID', @@ -322,6 +322,7 @@ describe('PersistenceExtension', () => { ydoc: Buffer.from(Y.encodeStateAsUpdate(src)), content: null, }; + // Both the cheap pre-check and the locked re-read return the same row. pageRepo.findById.mockResolvedValue(page); const document = { isEmpty: () => true }; @@ -330,14 +331,23 @@ describe('PersistenceExtension', () => { document, } as any); + // The locked re-read must take the row lock inside the tx. + const lockedReadCall = pageRepo.findById.mock.calls.find( + (c: any[]) => c[1]?.withLock, + ); + expect(lockedReadCall).toBeDefined(); + expect(lockedReadCall[1].trx).toBe(trx); + expect(pageRepo.updatePage).toHaveBeenCalledTimes(1); const call = pageRepo.updatePage.mock.calls[0]; expect(Buffer.isBuffer(call[0].ydoc)).toBe(true); expect(call[1]).toBe('PAGE_ID'); + // Persist must run inside the transaction. + expect(call[2]).toBe(trx); expect(result).toBeTruthy(); }); - it('does NOT persist when the ydoc already has a title fragment', async () => { + it('does NOT lock or persist when the ydoc already has a title fragment', async () => { const src = TiptapTransformer.toYdoc(bodyJson, 'default', tiptapExtensions); Y.applyUpdate(src, Y.encodeStateAsUpdate(buildTitleSeedYdoc('Has Title'))); const page = { @@ -354,11 +364,14 @@ describe('PersistenceExtension', () => { document, } as any); + // Hot path: only the cheap lock-free read, no locked re-read, no write. + expect(pageRepo.findById).toHaveBeenCalledTimes(1); + expect(pageRepo.findById.mock.calls[0][1]?.withLock).toBeFalsy(); expect(pageRepo.updatePage).not.toHaveBeenCalled(); expect(result).toBeTruthy(); }); - it('converts legacy content -> ydoc and persists the built doc', async () => { + it('converts legacy content -> ydoc inside a tx and persists a {ydoc} Buffer', async () => { const page = { id: 'PAGE_ID', title: 'T', @@ -373,8 +386,98 @@ describe('PersistenceExtension', () => { document, } as any); + const lockedReadCall = pageRepo.findById.mock.calls.find( + (c: any[]) => c[1]?.withLock, + ); + expect(lockedReadCall).toBeDefined(); + expect(lockedReadCall[1].trx).toBe(trx); + expect(pageRepo.updatePage).toHaveBeenCalledTimes(1); + const call = pageRepo.updatePage.mock.calls[0]; + expect(Buffer.isBuffer(call[0].ydoc)).toBe(true); + expect(call[2]).toBe(trx); + // The rebuilt doc carries the body. + expect(JSON.stringify(cloneOut(result))).toContain('hello'); + }); + + it('SKIPS rebuild when the locked re-read shows the ydoc was already healed', async () => { + // Simulate a concurrent process: the cheap pre-check sees ydoc=null (legacy + // rebuild path), but by the time we hold the lock another process has + // already persisted a healthy ydoc. We must adopt it, not rebuild/clobber. + const healed = TiptapTransformer.toYdoc( + { type: 'doc', content: [{ type: 'paragraph', content: [{ type: 'text', text: 'healed' }] }] }, + 'default', + tiptapExtensions, + ); + Y.applyUpdate(healed, Y.encodeStateAsUpdate(buildTitleSeedYdoc('Healed Title'))); + const healedYdoc = Buffer.from(Y.encodeStateAsUpdate(healed)); + + const preCheck = { id: 'PAGE_ID', title: 'T', ydoc: null, content: bodyJson }; + const lockedRow = { + id: 'PAGE_ID', + title: 'Healed Title', + ydoc: healedYdoc, + content: bodyJson, + }; + pageRepo.findById + .mockResolvedValueOnce(preCheck) // cheap pre-check + .mockResolvedValueOnce(lockedRow); // locked re-read + + const document = { isEmpty: () => true }; + const result = await ext.onLoadDocument({ + documentName: 'page.PAGE_ID', + document, + } as any); + + // The healthy ydoc had a title fragment already, so nothing was rebuilt or + // seeded -> no clobbering write. + expect(pageRepo.updatePage).not.toHaveBeenCalled(); + // The returned doc is the healed body, NOT a fresh rebuild of bodyJson. + expect(JSON.stringify(cloneOut(result))).toContain('healed'); + }); + + it('REJECTS the load when the rebuild persist fails (does not return an unpersisted doc)', async () => { + const page = { id: 'PAGE_ID', title: 'T', ydoc: null, content: bodyJson }; + pageRepo.findById.mockResolvedValue(page); + pageRepo.updatePage.mockRejectedValue(new Error('db down')); + const errSpy = jest + .spyOn((ext as any).logger, 'error') + .mockImplementation(() => undefined); + + const document = { isEmpty: () => true }; + await expect( + ext.onLoadDocument({ + documentName: 'page.PAGE_ID', + document, + } as any), + ).rejects.toThrow('db down'); + expect(errSpy).toHaveBeenCalled(); + }); + + it('seed-only persist FAILURE returns the doc from the existing ydoc (no throw)', async () => { + const src = TiptapTransformer.toYdoc(bodyJson, 'default', tiptapExtensions); + const page = { + id: 'PAGE_ID', + title: 'Legacy Title', + ydoc: Buffer.from(Y.encodeStateAsUpdate(src)), + content: null, + }; + pageRepo.findById.mockResolvedValue(page); + pageRepo.updatePage.mockRejectedValue(new Error('db down')); + const errSpy = jest + .spyOn((ext as any).logger, 'error') + .mockImplementation(() => undefined); + + const document = { isEmpty: () => true }; + const result = await ext.onLoadDocument({ + documentName: 'page.PAGE_ID', + document, + } as any); + + // Non-fatal: we fall back to the doc loaded from the existing page.ydoc. expect(result).toBeTruthy(); + expect(JSON.stringify(cloneOut(result))).toContain('hello'); + expect(errSpy).toHaveBeenCalled(); }); }); }); diff --git a/apps/server/src/collaboration/extensions/persistence.extension.ts b/apps/server/src/collaboration/extensions/persistence.extension.ts index 5075faab..e30e0fdc 100644 --- a/apps/server/src/collaboration/extensions/persistence.extension.ts +++ b/apps/server/src/collaboration/extensions/persistence.extension.ts @@ -116,6 +116,10 @@ export class PersistenceExtension implements Extension { return; } + // Cheap, lock-free pre-check (hot path stays lock-free). It tells us whether + // any heal (legacy rebuild and/or title seed) is needed; the heal itself + // re-reads the row FOR UPDATE and re-validates inside a transaction so it + // runs exactly once (see healUnderLock). const page = await this.pageRepo.findById(pageId, { includeContent: true, includeYdoc: true, @@ -127,25 +131,44 @@ export class PersistenceExtension implements Extension { } if (page.ydoc) { - this.logger.debug(`ydoc loaded from db: ${pageId}`); - const doc = new Y.Doc(); - const dbState = new Uint8Array(page.ydoc); - - Y.applyUpdate(doc, dbState); + Y.applyUpdate(doc, new Uint8Array(page.ydoc)); // Legacy pages persisted their title only in the `page.title` column; the - // ydoc has no 'title' fragment. Seed it once so the client's - // collaborative title editor can show/edit the title. This runs inside the - // ydoc branch (NOT gated by the top-level 'default' body guard) because a - // body that loaded from page.ydoc can still lack a title fragment. The - // seed persists back to the DB so it is one-shot per page. - const seeded = this.seedTitleFragment(doc, page.title); - if (seeded) { - await this.persistYdoc(doc, pageId); + // ydoc has no 'title' fragment. Decide cheaply (no lock) whether a seed is + // needed by inspecting the loaded doc's 'title' fragment. A seed is needed + // only when that fragment is empty AND there is a non-empty column title. + let titleSeedNeeded = false; + try { + const titleFrag = doc.get('title', Y.XmlFragment); + titleSeedNeeded = titleFrag.length === 0 && !!page.title?.trim(); + } catch (err) { + // A malformed title fragment must not break loading; skip the seed. + this.logger.warn(`failed to inspect title fragment: ${err?.['message']}`); + titleSeedNeeded = false; } - return doc; + if (!titleSeedNeeded) { + // Fully healthy: a ydoc with a title fragment (or nothing to seed). + this.logger.debug(`ydoc loaded from db: ${pageId}`); + return doc; + } + + // SEED-ONLY heal: a valid page.ydoc already exists; we only need to add the + // title fragment. If the persist fails we must NOT hand out an unpersisted + // fresh-client-id seed (it could later duplicate the title), so we fall + // back to the healthy doc loaded from the EXISTING page.ydoc, without the + // seed. The title just won't render until a later successful heal — + // non-fatal, non-corrupting. + try { + return await this.healUnderLock(pageId); + } catch (err) { + this.logger.error( + `Failed to persist seeded ydoc for page ${pageId}; serving existing ydoc without title seed`, + err, + ); + return doc; + } } // NOTE (offline-sync M1, Goal 2): this per-load self-heal converts + @@ -156,37 +179,88 @@ export class PersistenceExtension implements Extension { // which a Kysely SQL migration cannot run; no runnable-task/CLI convention // exists in this repo yet, so we deliberately avoid a fragile migration. // - // If no ydoc state in db, convert the JSON in page.content to a Y.Doc. + // If no ydoc state in db, REBUILD a Y.Doc from the JSON in page.content under + // a row lock (see healUnderLock). if (page.content) { - this.logger.debug(`converting json to ydoc: ${pageId}`); - - const ydoc = TiptapTransformer.toYdoc( - page.content, - 'default', - tiptapExtensions, - ); - - // Seed the title fragment for legacy pages here too, so the freshly built - // ydoc carries the title from the page.title column. - this.seedTitleFragment(ydoc, page.title); - - // DUPLICATION TRAP (classic Yjs): this rebuild produces a ydoc with FRESH - // Yjs client-ids each time it runs. If we returned it WITHOUT persisting, - // a later load would rebuild again with different client-ids, and a - // long-offline client holding a ydoc derived from an EARLIER rebuild could - // merge its update and DUPLICATE all the content (the two states share no - // common ancestor). Persist the built ydoc to page.ydoc immediately so - // every subsequent load takes the page.ydoc branch above and this rebuild - // never runs again for this page (one-shot per page). - await this.persistYdoc(ydoc, pageId); - - return ydoc; + // REBUILD heal: surface failures. If the persist fails we REFUSE the load + // (re-throw) rather than hand out an unpersisted fresh-client-id rebuild — + // returning it would re-arm the duplication trap. A transient DB failure + // means the client reconnects and retries: correctness over availability. + try { + return await this.healUnderLock(pageId); + } catch (err) { + this.logger.error( + `Failed to persist rebuilt ydoc for page ${pageId}; refusing load`, + err, + ); + throw err; + } } this.logger.debug(`creating fresh ydoc: ${pageId}`); return new Y.Doc(); } + /** + * Serialize the legacy self-heal (rebuild from page.content and/or seed the + * title fragment, then persist) so it runs exactly ONCE per page, closing the + * Yjs duplication trap. Both TiptapTransformer.toYdoc and buildTitleSeedYdoc + * mint FRESH Yjs client-ids every call, so two concurrent rebuilds (the API + * process via openDirectConnection AND the standalone collab process both + * seeing `ydoc IS NULL`) could each persist a different-client-id state and let + * a long-offline client merge-and-duplicate. We prevent that by re-reading the + * row FOR UPDATE inside a transaction and re-validating state under the lock: + * whoever wins the lock heals; the loser observes the healthy `ydoc` and adopts + * it instead of rebuilding. The persist happens IN THE SAME TX, so a failed + * write rolls back and propagates out (the caller then decides refuse vs. + * fall-back). + */ + private async healUnderLock(pageId: string): Promise { + return executeTx(this.db, async (trx) => { + const locked = await this.pageRepo.findById(pageId, { + withLock: true, + includeContent: true, + includeYdoc: true, + trx, + }); + + const doc = new Y.Doc(); + let rebuilt = false; + + if (locked?.ydoc) { + // Another process already healed (or the page always had a ydoc): adopt + // the healthy persisted state, do NOT rebuild. + Y.applyUpdate(doc, new Uint8Array(locked.ydoc)); + } else if (locked?.content) { + this.logger.debug(`converting json to ydoc: ${pageId}`); + const built = TiptapTransformer.toYdoc( + locked.content, + 'default', + tiptapExtensions, + ); + Y.applyUpdate(doc, Y.encodeStateAsUpdate(built)); + rebuilt = true; + } + // else: no ydoc and no content -> a fresh empty doc. + + // Idempotent, emptiness-guarded title seed (safe to call always). + const seeded = this.seedTitleFragment(doc, locked?.title ?? null); + + if (rebuilt || seeded) { + // Persist IN THE SAME TX. If this throws, the tx rolls back and the + // error propagates out of executeTx to the caller. + await this.pageRepo.updatePage( + { ydoc: Buffer.from(Y.encodeStateAsUpdate(doc)) }, + pageId, + trx, + ); + this.logger.debug(`persisted rebuilt/seeded ydoc: ${pageId}`); + } + + return doc; + }); + } + /** * Seed the 'title' fragment of `doc` from the `page.title` column for legacy * pages whose persisted ydoc has no title fragment yet. @@ -215,28 +289,6 @@ export class PersistenceExtension implements Extension { } } - /** - * Persist the current state of `doc` into page.ydoc. Used by the one-shot - * rebuild/seed self-heal in onLoadDocument so the conversion is durable and - * never repeats. Defensive (try/catch + log): a persistence failure here must - * NOT break document loading — the in-memory doc is still returned and the - * next store will persist it anyway. - */ - private async persistYdoc(doc: Y.Doc, pageId: string): Promise { - try { - await this.pageRepo.updatePage( - { ydoc: Buffer.from(Y.encodeStateAsUpdate(doc)) }, - pageId, - ); - this.logger.debug(`persisted rebuilt/seeded ydoc: ${pageId}`); - } catch (err) { - this.logger.error( - `Failed to persist rebuilt/seeded ydoc for page ${pageId}`, - err, - ); - } - } - async onStoreDocument(data: onStoreDocumentPayload) { const { documentName, document, context } = data;