diff --git a/apps/server/src/collaboration/processors/history.processor.spec.ts b/apps/server/src/collaboration/processors/history.processor.spec.ts index 79d403f4..4e87a322 100644 --- a/apps/server/src/collaboration/processors/history.processor.spec.ts +++ b/apps/server/src/collaboration/processors/history.processor.spec.ts @@ -136,16 +136,26 @@ describe('HistoryProcessor.process', () => { await proc.process(buildJob()); expect(collabHistory.popContributors).toHaveBeenCalledWith(PAGE_ID); + // #370 F3/F9 — the snapshot decision runs under a page-row lock. Pin the lock + // structurally so a refactor that drops withLock/trx (silently reintroducing + // the TOCTOU double-insert) turns this red. The tx stub is { __trx: true }. + expect(pageRepo.findById).toHaveBeenCalledWith( + PAGE_ID, + expect.objectContaining({ withLock: true, trx: { __trx: true } }), + ); + // #370 F7 — addPageWatchers MUST receive the trx, or its FK-check runs on a + // separate connection and self-deadlocks against our FOR UPDATE. Asserting + // the trx arg here is exactly what would have caught that regression. expect(watcherService.addPageWatchers).toHaveBeenCalledWith( ['u1', 'u2'], PAGE_ID, SPACE_ID, WORKSPACE_ID, + { __trx: true }, ); expect(pageHistoryRepo.saveHistory).toHaveBeenCalledWith( expect.objectContaining({ id: PAGE_ID }), - // #370 F3 — saveHistory now runs inside the locked tx, so it carries trx. - expect.objectContaining({ contributorIds: ['u1', 'u2'], kind: 'idle' }), + { contributorIds: ['u1', 'u2'], kind: 'idle', trx: { __trx: true } }, ); expect(generalQueue.add).toHaveBeenCalledWith( QueueJob.PAGE_BACKLINKS, @@ -197,6 +207,48 @@ describe('HistoryProcessor.process', () => { ]); }); + it('COMMIT failure (throw outside the tx callback) → contributors RESTORED', async () => { + // #370 F8 — a commit-time failure throws OUTSIDE the callback, so the inner + // try/catch does not run; the outer catch must restore the popped set (else a + // BullMQ retry writes an unattributed version). Use a db whose execute() runs + // the callback THEN throws, simulating a commit abort. + pageHistoryRepo.findPageLastHistory.mockResolvedValue({ + content: { type: 'doc', content: [] }, + }); + const commitFail = { + transaction: () => ({ + execute: async (fn: (trx: any) => Promise) => { + await fn({ __trx: true }); // callback succeeds (saveHistory ok) + throw new Error('commit aborted'); // ...but the COMMIT fails + }, + }), + }; + const procCommitFail = new HistoryProcessor( + pageHistoryRepo as any, + pageRepo as any, + collabHistory as any, + watcherService as any, + commitFail as any, + notificationQueue as any, + generalQueue as any, + ); + jest + .spyOn(procCommitFail['logger'], 'error') + .mockImplementation(() => undefined); + + await expect(procCommitFail.process(buildJob())).rejects.toThrow( + 'commit aborted', + ); + // The inner catch did NOT run (save succeeded), so only the outer catch can + // restore — assert it did. + expect(collabHistory.addContributors).toHaveBeenCalledWith(PAGE_ID, [ + 'u1', + 'u2', + ]); + // And the post-snapshot queue work must NOT have run (we rethrew). + expect(generalQueue.add).not.toHaveBeenCalled(); + }); + it('backlinks + notification queue failures are swallowed (history still committed)', async () => { pageHistoryRepo.findPageLastHistory.mockResolvedValue({ content: { type: 'doc', content: [] }, diff --git a/apps/server/src/collaboration/processors/history.processor.ts b/apps/server/src/collaboration/processors/history.processor.ts index 5d98dec8..9c4bf7a6 100644 --- a/apps/server/src/collaboration/processors/history.processor.ts +++ b/apps/server/src/collaboration/processors/history.processor.ts @@ -71,57 +71,85 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy { let contributorIds: string[] = []; let snapshotWritten = false; let lastHistoryContent: unknown; + // #370 F8 — the contributor set popped from Redis (destructive SPOP) must be + // restored if the snapshot does not durably land. The inner try/catch only + // covers a throw INSIDE the callback; a COMMIT failure (connection drop, + // serialization/deadlock abort on commit — the transient class the epic + // already retries) throws OUTSIDE it, rolling the snapshot back while the + // pop is already gone. We track the popped set here and restore it in the + // outer catch so a BullMQ retry re-attributes the version. addContributors + // is an idempotent Redis SADD, so a double-restore is harmless. + let poppedForRestore: string[] = []; - await executeTx(this.db, async (trx) => { - const lockedPage = await this.pageRepo.findById(pageId, { - includeContent: true, - withLock: true, - trx, - }); - if (!lockedPage) return; - - const lastHistory = await this.pageHistoryRepo.findPageLastHistory( - pageId, - { includeContent: true, trx }, - ); - lastHistoryContent = lastHistory?.content; - - if (!lastHistory && isEmptyParagraphDoc(lockedPage.content as any)) { - this.logger.debug( - `Skipping first history for page ${pageId}: empty content`, - ); - return; - } - - if ( - lastHistory && - isDeepStrictEqual(lastHistory.content, lockedPage.content) - ) { - return; // already snapshotted at this content — nothing to write - } - - contributorIds = await this.collabHistory.popContributors(pageId); - try { - await this.watcherService.addPageWatchers( - contributorIds, - pageId, - lockedPage.spaceId, - lockedPage.workspaceId, - ); - - // #370 — every job on this queue is a trailing idle-flush autosnapshot. - await this.pageHistoryRepo.saveHistory(lockedPage, { - contributorIds, - kind: job.data.kind ?? 'idle', + try { + await executeTx(this.db, async (trx) => { + const lockedPage = await this.pageRepo.findById(pageId, { + includeContent: true, + withLock: true, trx, }); - snapshotWritten = true; - this.logger.debug(`History created for page: ${pageId}`); - } catch (err) { - await this.collabHistory.addContributors(pageId, contributorIds); - throw err; + if (!lockedPage) return; + + const lastHistory = await this.pageHistoryRepo.findPageLastHistory( + pageId, + { includeContent: true, trx }, + ); + lastHistoryContent = lastHistory?.content; + + if (!lastHistory && isEmptyParagraphDoc(lockedPage.content as any)) { + this.logger.debug( + `Skipping first history for page ${pageId}: empty content`, + ); + return; + } + + if ( + lastHistory && + isDeepStrictEqual(lastHistory.content, lockedPage.content) + ) { + return; // already snapshotted at this content — nothing to write + } + + contributorIds = await this.collabHistory.popContributors(pageId); + poppedForRestore = contributorIds; + try { + // Pass `trx` so the watcher insert's FK check (FOR KEY SHARE on + // pages[pageId]) runs on the SAME connection that already holds the + // FOR UPDATE lock from findById — otherwise it takes the FK lock on a + // separate pool connection and self-deadlocks against our own tx. + await this.watcherService.addPageWatchers( + contributorIds, + pageId, + lockedPage.spaceId, + lockedPage.workspaceId, + trx, + ); + + // #370 — every job on this queue is a trailing idle-flush autosnapshot. + await this.pageHistoryRepo.saveHistory(lockedPage, { + contributorIds, + kind: job.data.kind ?? 'idle', + trx, + }); + snapshotWritten = true; + this.logger.debug(`History created for page: ${pageId}`); + } catch (err) { + await this.collabHistory.addContributors(pageId, contributorIds); + poppedForRestore = []; + throw err; + } + }); + } catch (err) { + // A throw here means the tx did NOT commit (callback threw, or the commit + // itself failed and rolled back). If we popped contributors and the inner + // catch did not already restore them, restore now so the retry keeps + // attribution. snapshotWritten is irrelevant: it is set before commit, so + // it can be true even when the commit rolled the snapshot back. + if (poppedForRestore.length) { + await this.collabHistory.addContributors(pageId, poppedForRestore); } - }); + throw err; + } // No snapshot written (page vanished / empty-first / unchanged content) → // clear the contributor set for the skip cases and stop.