fix(#370): thread trx into addPageWatchers (F7 self-deadlock) + restore contributors on commit-failure (F8) + assert the lock (F9) (review round 2)
The round-1 F3 fix (wrapping the processor's find+save in a locked tx) itself
introduced two regressions (F7, F8), which a loosened spec (F9) failed to catch:
F7 [CRITICAL] addPageWatchers ran WITHOUT trx inside the tx holding FOR UPDATE on
pages[pageId]. The watcher insert's FK check takes FOR KEY SHARE on the same row,
but on a DIFFERENT pool connection — a true self-deadlock (our tx connection sits
idle-in-transaction while the JS callback awaits the insert, and the insert's
connection blocks on the lock). Now passes trx (addPageWatchers already accepts it and routes it through
insertMany), so the FK lock is taken on the connection that already holds FOR
UPDATE — no self-conflict.
F8 [WARNING] popContributors is a destructive Redis SPOP; the inner catch only
restores on a throw INSIDE the callback. A COMMIT failure throws OUTSIDE it,
rolling the snapshot back while the pop is gone → a retry writes an unattributed
version. Now tracks the popped set and restores it in an outer catch (idempotent
SADD), leaving BullMQ to retry with attribution intact.
F9 [WARNING] The spec asserted saveHistory args with a loosened objectContaining
that stopped verifying trx, and never pinned withLock/trx on findById or the trx
on addPageWatchers — which is exactly why F7 slipped. Restored the exact
saveHistory(trx) assertion and added findById({withLock,trx}) + addPageWatchers
trx assertions (the latter would have caught F7), plus a commit-failure test.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -136,16 +136,26 @@ describe('HistoryProcessor.process', () => {
|
||||
await proc.process(buildJob());
|
||||
|
||||
expect(collabHistory.popContributors).toHaveBeenCalledWith(PAGE_ID);
|
||||
// #370 F3/F9 — the snapshot decision runs under a page-row lock. Pin the lock
|
||||
// structurally so a refactor that drops withLock/trx (silently reintroducing
|
||||
// the TOCTOU double-insert) turns this red. The tx stub is { __trx: true }.
|
||||
expect(pageRepo.findById).toHaveBeenCalledWith(
|
||||
PAGE_ID,
|
||||
expect.objectContaining({ withLock: true, trx: { __trx: true } }),
|
||||
);
|
||||
// #370 F7 — addPageWatchers MUST receive the trx, or its FK-check runs on a
|
||||
// separate connection and self-deadlocks against our FOR UPDATE. Asserting
|
||||
// the trx arg here is exactly what would have caught that regression.
|
||||
expect(watcherService.addPageWatchers).toHaveBeenCalledWith(
|
||||
['u1', 'u2'],
|
||||
PAGE_ID,
|
||||
SPACE_ID,
|
||||
WORKSPACE_ID,
|
||||
{ __trx: true },
|
||||
);
|
||||
expect(pageHistoryRepo.saveHistory).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ id: PAGE_ID }),
|
||||
// #370 F3 — saveHistory now runs inside the locked tx, so it carries trx.
|
||||
expect.objectContaining({ contributorIds: ['u1', 'u2'], kind: 'idle' }),
|
||||
{ contributorIds: ['u1', 'u2'], kind: 'idle', trx: { __trx: true } },
|
||||
);
|
||||
expect(generalQueue.add).toHaveBeenCalledWith(
|
||||
QueueJob.PAGE_BACKLINKS,
|
||||
@@ -197,6 +207,48 @@ describe('HistoryProcessor.process', () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it('COMMIT failure (throw outside the tx callback) → contributors RESTORED', async () => {
|
||||
// #370 F8 — a commit-time failure throws OUTSIDE the callback, so the inner
|
||||
// try/catch does not run; the outer catch must restore the popped set (else a
|
||||
// BullMQ retry writes an unattributed version). Use a db whose execute() runs
|
||||
// the callback THEN throws, simulating a commit abort.
|
||||
pageHistoryRepo.findPageLastHistory.mockResolvedValue({
|
||||
content: { type: 'doc', content: [] },
|
||||
});
|
||||
const commitFail = {
|
||||
transaction: () => ({
|
||||
execute: async (fn: (trx: any) => Promise<any>) => {
|
||||
await fn({ __trx: true }); // callback succeeds (saveHistory ok)
|
||||
throw new Error('commit aborted'); // ...but the COMMIT fails
|
||||
},
|
||||
}),
|
||||
};
|
||||
const procCommitFail = new HistoryProcessor(
|
||||
pageHistoryRepo as any,
|
||||
pageRepo as any,
|
||||
collabHistory as any,
|
||||
watcherService as any,
|
||||
commitFail as any,
|
||||
notificationQueue as any,
|
||||
generalQueue as any,
|
||||
);
|
||||
jest
|
||||
.spyOn(procCommitFail['logger'], 'error')
|
||||
.mockImplementation(() => undefined);
|
||||
|
||||
await expect(procCommitFail.process(buildJob())).rejects.toThrow(
|
||||
'commit aborted',
|
||||
);
|
||||
// The inner catch did NOT run (save succeeded), so only the outer catch can
|
||||
// restore — assert it did.
|
||||
expect(collabHistory.addContributors).toHaveBeenCalledWith(PAGE_ID, [
|
||||
'u1',
|
||||
'u2',
|
||||
]);
|
||||
// And the post-snapshot queue work must NOT have run (we rethrew).
|
||||
expect(generalQueue.add).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('backlinks + notification queue failures are swallowed (history still committed)', async () => {
|
||||
pageHistoryRepo.findPageLastHistory.mockResolvedValue({
|
||||
content: { type: 'doc', content: [] },
|
||||
|
||||
@@ -71,57 +71,85 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
|
||||
let contributorIds: string[] = [];
|
||||
let snapshotWritten = false;
|
||||
let lastHistoryContent: unknown;
|
||||
// #370 F8 — the contributor set popped from Redis (destructive SPOP) must be
|
||||
// restored if the snapshot does not durably land. The inner try/catch only
|
||||
// covers a throw INSIDE the callback; a COMMIT failure (connection drop,
|
||||
// serialization/deadlock abort on commit — the transient class the epic
|
||||
// already retries) throws OUTSIDE it, rolling the snapshot back while the
|
||||
// pop is already gone. We track the popped set here and restore it in the
|
||||
// outer catch so a BullMQ retry re-attributes the version. addContributors
|
||||
// is an idempotent Redis SADD, so a double-restore is harmless.
|
||||
let poppedForRestore: string[] = [];
|
||||
|
||||
await executeTx(this.db, async (trx) => {
|
||||
const lockedPage = await this.pageRepo.findById(pageId, {
|
||||
includeContent: true,
|
||||
withLock: true,
|
||||
trx,
|
||||
});
|
||||
if (!lockedPage) return;
|
||||
|
||||
const lastHistory = await this.pageHistoryRepo.findPageLastHistory(
|
||||
pageId,
|
||||
{ includeContent: true, trx },
|
||||
);
|
||||
lastHistoryContent = lastHistory?.content;
|
||||
|
||||
if (!lastHistory && isEmptyParagraphDoc(lockedPage.content as any)) {
|
||||
this.logger.debug(
|
||||
`Skipping first history for page ${pageId}: empty content`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
lastHistory &&
|
||||
isDeepStrictEqual(lastHistory.content, lockedPage.content)
|
||||
) {
|
||||
return; // already snapshotted at this content — nothing to write
|
||||
}
|
||||
|
||||
contributorIds = await this.collabHistory.popContributors(pageId);
|
||||
try {
|
||||
await this.watcherService.addPageWatchers(
|
||||
contributorIds,
|
||||
pageId,
|
||||
lockedPage.spaceId,
|
||||
lockedPage.workspaceId,
|
||||
);
|
||||
|
||||
// #370 — every job on this queue is a trailing idle-flush autosnapshot.
|
||||
await this.pageHistoryRepo.saveHistory(lockedPage, {
|
||||
contributorIds,
|
||||
kind: job.data.kind ?? 'idle',
|
||||
try {
|
||||
await executeTx(this.db, async (trx) => {
|
||||
const lockedPage = await this.pageRepo.findById(pageId, {
|
||||
includeContent: true,
|
||||
withLock: true,
|
||||
trx,
|
||||
});
|
||||
snapshotWritten = true;
|
||||
this.logger.debug(`History created for page: ${pageId}`);
|
||||
} catch (err) {
|
||||
await this.collabHistory.addContributors(pageId, contributorIds);
|
||||
throw err;
|
||||
if (!lockedPage) return;
|
||||
|
||||
const lastHistory = await this.pageHistoryRepo.findPageLastHistory(
|
||||
pageId,
|
||||
{ includeContent: true, trx },
|
||||
);
|
||||
lastHistoryContent = lastHistory?.content;
|
||||
|
||||
if (!lastHistory && isEmptyParagraphDoc(lockedPage.content as any)) {
|
||||
this.logger.debug(
|
||||
`Skipping first history for page ${pageId}: empty content`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
lastHistory &&
|
||||
isDeepStrictEqual(lastHistory.content, lockedPage.content)
|
||||
) {
|
||||
return; // already snapshotted at this content — nothing to write
|
||||
}
|
||||
|
||||
contributorIds = await this.collabHistory.popContributors(pageId);
|
||||
poppedForRestore = contributorIds;
|
||||
try {
|
||||
// Pass `trx` so the watcher insert's FK check (FOR KEY SHARE on
|
||||
// pages[pageId]) runs on the SAME connection that already holds the
|
||||
// FOR UPDATE lock from findById — otherwise it takes the FK lock on a
|
||||
// separate pool connection and self-deadlocks against our own tx.
|
||||
await this.watcherService.addPageWatchers(
|
||||
contributorIds,
|
||||
pageId,
|
||||
lockedPage.spaceId,
|
||||
lockedPage.workspaceId,
|
||||
trx,
|
||||
);
|
||||
|
||||
// #370 — every job on this queue is a trailing idle-flush autosnapshot.
|
||||
await this.pageHistoryRepo.saveHistory(lockedPage, {
|
||||
contributorIds,
|
||||
kind: job.data.kind ?? 'idle',
|
||||
trx,
|
||||
});
|
||||
snapshotWritten = true;
|
||||
this.logger.debug(`History created for page: ${pageId}`);
|
||||
} catch (err) {
|
||||
await this.collabHistory.addContributors(pageId, contributorIds);
|
||||
poppedForRestore = [];
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
} catch (err) {
|
||||
// A throw here means the tx did NOT commit (callback threw, or the commit
|
||||
// itself failed and rolled back). If we popped contributors and the inner
|
||||
// catch did not already restore them, restore now so the retry keeps
|
||||
// attribution. snapshotWritten is irrelevant: it is set before commit, so
|
||||
// it can be true even when the commit rolled the snapshot back.
|
||||
if (poppedForRestore.length) {
|
||||
await this.collabHistory.addContributors(pageId, poppedForRestore);
|
||||
}
|
||||
});
|
||||
throw err;
|
||||
}
|
||||
|
||||
// No snapshot written (page vanished / empty-first / unchanged content) →
|
||||
// clear the contributor set for the skip cases and stop.
|
||||
|
||||
Reference in New Issue
Block a user