fix(page): make movePage cycle-check + update atomic to prevent concurrent A<->B cycles (#207)
The server-side move cycle guard (getPageBreadCrumbs) and the move UPDATE ran
as two separate, unlocked statements. Two concurrent moves ("A under B" and
"B under A") could each read the same pre-write acyclic snapshot, both pass the
guard, then persist A.parentPageId=B AND B.parentPageId=A — a parent/child
cycle (TOCTOU, #207 #7).
Run the cycle check and the UPDATE inside one transaction (executeTx) guarded
by a per-space advisory lock (pg_advisory_xact_lock, held until COMMIT) so all
moves within a space serialize: the second mover blocks until the first commits
and then sees the freshly written parent, so its guard rejects the cycle.
getPageBreadCrumbs gains an optional trx so the check runs on the locked snapshot.
Adds an integration test driving two opposing concurrent movePage calls and
asserting no cycle ever persists and exactly one move is rejected. Updates the
movePage unit-test stubs for the new transactional path.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -57,11 +57,28 @@ describe('PageService', () => {
|
|||||||
|
|
||||||
const eventEmitter = { emit: jest.fn() };
|
const eventEmitter = { emit: jest.fn() };
|
||||||
|
|
||||||
|
// movePage now runs the cycle-check + UPDATE inside executeTx(this.db),
|
||||||
|
// i.e. this.db.transaction().execute(fn => fn(trx)). A permissive chainable
|
||||||
|
// Proxy stands in for the Kysely trx so the per-space advisory-lock
|
||||||
|
// `sql``.execute(trx)` resolves; a thrown BadRequestException still
|
||||||
|
// propagates out of the transaction unchanged.
|
||||||
|
const trxStub: any = new Proxy(function () {}, {
|
||||||
|
get: (_t, p) =>
|
||||||
|
p === 'then'
|
||||||
|
? undefined
|
||||||
|
: p === 'execute' || p === 'executeTakeFirst'
|
||||||
|
? () => Promise.resolve([])
|
||||||
|
: () => trxStub,
|
||||||
|
});
|
||||||
|
const db = {
|
||||||
|
transaction: () => ({ execute: (fn: any) => fn(trxStub) }),
|
||||||
|
};
|
||||||
|
|
||||||
const svc = new PageService(
|
const svc = new PageService(
|
||||||
pageRepo as any, // pageRepo
|
pageRepo as any, // pageRepo
|
||||||
{} as any, // pagePermissionRepo
|
{} as any, // pagePermissionRepo
|
||||||
{} as any, // attachmentRepo
|
{} as any, // attachmentRepo
|
||||||
{} as any, // db
|
db as any, // db
|
||||||
{} as any, // storageService
|
{} as any, // storageService
|
||||||
{} as any, // attachmentQueue
|
{} as any, // attachmentQueue
|
||||||
{} as any, // aiQueue
|
{} as any, // aiQueue
|
||||||
@@ -268,9 +285,23 @@ describe('PageService', () => {
|
|||||||
}),
|
}),
|
||||||
updatePage: jest.fn().mockResolvedValue({ numUpdatedRows: 1n }),
|
updatePage: jest.fn().mockResolvedValue({ numUpdatedRows: 1n }),
|
||||||
};
|
};
|
||||||
|
// movePage now runs the cycle-check + UPDATE inside executeTx(this.db),
|
||||||
|
// which calls this.db.transaction().execute(fn => fn(trx)). A permissive
|
||||||
|
// chainable Proxy stands in for the Kysely trx so the per-space
|
||||||
|
// advisory-lock `sql``.execute(trx)` resolves and updatePage receives it.
|
||||||
|
const trxStub: any = new Proxy(function () {}, {
|
||||||
|
get: (_t, p) =>
|
||||||
|
p === 'then'
|
||||||
|
? undefined
|
||||||
|
: p === 'execute' || p === 'executeTakeFirst'
|
||||||
|
? () => Promise.resolve([])
|
||||||
|
: () => trxStub,
|
||||||
|
});
|
||||||
const svc = makeSvc({
|
const svc = makeSvc({
|
||||||
pageRepo,
|
pageRepo,
|
||||||
db: {} as any,
|
db: {
|
||||||
|
transaction: () => ({ execute: (fn: any) => fn(trxStub) }),
|
||||||
|
} as any,
|
||||||
});
|
});
|
||||||
// Legitimate move: destination ancestors do NOT include the moved page.
|
// Legitimate move: destination ancestors do NOT include the moved page.
|
||||||
jest
|
jest
|
||||||
|
|||||||
@@ -15,13 +15,13 @@ import {
|
|||||||
executeWithCursorPagination,
|
executeWithCursorPagination,
|
||||||
} from '@docmost/db/pagination/cursor-pagination';
|
} from '@docmost/db/pagination/cursor-pagination';
|
||||||
import { InjectKysely } from 'nestjs-kysely';
|
import { InjectKysely } from 'nestjs-kysely';
|
||||||
import { KyselyDB } from '@docmost/db/types/kysely.types';
|
import { KyselyDB, KyselyTransaction } from '@docmost/db/types/kysely.types';
|
||||||
import { generateJitteredKeyBetween } from 'fractional-indexing-jittered';
|
import { generateJitteredKeyBetween } from 'fractional-indexing-jittered';
|
||||||
import { MovePageDto } from '../dto/move-page.dto';
|
import { MovePageDto } from '../dto/move-page.dto';
|
||||||
import { shapeSidebarPagesTree } from './sidebar-pages-tree.util';
|
import { shapeSidebarPagesTree } from './sidebar-pages-tree.util';
|
||||||
import { generateSlugId } from '../../../common/helpers';
|
import { generateSlugId } from '../../../common/helpers';
|
||||||
import { getPageTitle } from '../../../common/helpers';
|
import { getPageTitle } from '../../../common/helpers';
|
||||||
import { executeTx } from '@docmost/db/utils';
|
import { dbOrTx, executeTx } from '@docmost/db/utils';
|
||||||
import { AttachmentRepo } from '@docmost/db/repos/attachment/attachment.repo';
|
import { AttachmentRepo } from '@docmost/db/repos/attachment/attachment.repo';
|
||||||
import { v7 as uuid7 } from 'uuid';
|
import { v7 as uuid7 } from 'uuid';
|
||||||
import {
|
import {
|
||||||
@@ -62,6 +62,13 @@ import {
|
|||||||
agentSourceFields,
|
agentSourceFields,
|
||||||
} from '../../../common/decorators/auth-provenance.decorator';
|
} from '../../../common/decorators/auth-provenance.decorator';
|
||||||
|
|
||||||
|
// Advisory-lock namespace (the first key of pg_advisory_xact_lock) used to
|
||||||
|
// serialize concurrent page moves within a single space so the cycle check and
|
||||||
|
// the move UPDATE stay atomic (see movePage, #207 #7). A dedicated namespace
|
||||||
|
// constant keeps these locks from colliding with any other advisory lock; the
|
||||||
|
// second key is hashtext(spaceId). Fits a signed int4 ('page' in ASCII).
|
||||||
|
const PAGE_MOVE_LOCK_NAMESPACE = 0x70616765;
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class PageService {
|
export class PageService {
|
||||||
private readonly logger = new Logger(PageService.name);
|
private readonly logger = new Logger(PageService.name);
|
||||||
@@ -915,34 +922,61 @@ export class PageService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Server-side cycle guard: a page may not be moved into itself or into any
|
// Server-side cycle guard + the move UPDATE run in ONE transaction. A page
|
||||||
// page within its own subtree. Without this, an MCP/REST/agent caller (or a
|
// may not be moved into itself or into any page within its own subtree;
|
||||||
// fast drag racing the client check) could persist a cycle and broadcast it.
|
// without this an MCP/REST/agent caller (or a fast drag racing the client
|
||||||
// Only relevant when re-parenting under a concrete parent; moving to root
|
// check) could persist a cycle and broadcast it. Crucially, doing the guard
|
||||||
// (parentPageId null/undefined) can never create a cycle.
|
// and the write as two separate, unlocked statements is a TOCTOU race: two
|
||||||
if (dto.parentPageId) {
|
// concurrent moves ("A under B" and "B under A") can each read the same
|
||||||
if (dto.parentPageId === dto.pageId) {
|
// pre-write acyclic snapshot, both pass the guard, then persist
|
||||||
throw new BadRequestException('Cannot move a page into its own subtree');
|
// A.parentPageId=B AND B.parentPageId=A — a parent/child cycle (#207 #7). A
|
||||||
}
|
// per-space advisory lock (held until COMMIT) serializes all moves within a
|
||||||
// Walk the destination parent's ancestor chain (reusing the breadcrumb
|
// space: the second mover blocks until the first commits and then sees the
|
||||||
// ancestor CTE). If the page being moved appears among those ancestors,
|
// freshly written parent, so its guard rejects the cycle.
|
||||||
// the destination lives inside the moved page's subtree -> cycle.
|
const updateResult = await executeTx(this.db, async (trx) => {
|
||||||
const destAncestors = await this.getPageBreadCrumbs(dto.parentPageId);
|
await sql`select pg_advisory_xact_lock(${sql.lit(
|
||||||
if (destAncestors.some((ancestor) => ancestor.id === dto.pageId)) {
|
PAGE_MOVE_LOCK_NAMESPACE,
|
||||||
throw new BadRequestException('Cannot move a page into its own subtree');
|
)}, hashtext(${movedPage.spaceId}))`.execute(trx);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const updateResult = await this.pageRepo.updatePage(
|
// Only relevant when re-parenting under a concrete parent; moving to root
|
||||||
{
|
// (parentPageId null/undefined) can never create a cycle.
|
||||||
position: dto.position,
|
if (dto.parentPageId) {
|
||||||
parentPageId: parentPageId,
|
if (dto.parentPageId === dto.pageId) {
|
||||||
// Agent-edit provenance: annotate the source on an agent move. A normal
|
throw new BadRequestException(
|
||||||
// user request leaves the existing source value unchanged.
|
'Cannot move a page into its own subtree',
|
||||||
...agentSourceFields(provenance, 'lastUpdatedSource', 'lastUpdatedAiChatId'),
|
);
|
||||||
},
|
}
|
||||||
dto.pageId,
|
// Walk the destination parent's ancestor chain (reusing the breadcrumb
|
||||||
);
|
// ancestor CTE) inside the lock. If the page being moved appears among
|
||||||
|
// those ancestors, the destination lives inside the moved page's
|
||||||
|
// subtree -> cycle.
|
||||||
|
const destAncestors = await this.getPageBreadCrumbs(
|
||||||
|
dto.parentPageId,
|
||||||
|
trx,
|
||||||
|
);
|
||||||
|
if (destAncestors.some((ancestor) => ancestor.id === dto.pageId)) {
|
||||||
|
throw new BadRequestException(
|
||||||
|
'Cannot move a page into its own subtree',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.pageRepo.updatePage(
|
||||||
|
{
|
||||||
|
position: dto.position,
|
||||||
|
parentPageId: parentPageId,
|
||||||
|
// Agent-edit provenance: annotate the source on an agent move. A
|
||||||
|
// normal user request leaves the existing source value unchanged.
|
||||||
|
...agentSourceFields(
|
||||||
|
provenance,
|
||||||
|
'lastUpdatedSource',
|
||||||
|
'lastUpdatedAiChatId',
|
||||||
|
),
|
||||||
|
},
|
||||||
|
dto.pageId,
|
||||||
|
trx,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
// Guard against a phantom broadcast: if the row was concurrently deleted or
|
// Guard against a phantom broadcast: if the row was concurrently deleted or
|
||||||
// otherwise not updated, skip the PAGE_MOVED event so we don't replay a move
|
// otherwise not updated, skip the PAGE_MOVED event so we don't replay a move
|
||||||
@@ -981,8 +1015,8 @@ export class PageService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async getPageBreadCrumbs(childPageId: string) {
|
async getPageBreadCrumbs(childPageId: string, trx?: KyselyTransaction) {
|
||||||
const ancestors = await this.db
|
const ancestors = await dbOrTx(this.db, trx)
|
||||||
.withRecursive('page_ancestors', (db) =>
|
.withRecursive('page_ancestors', (db) =>
|
||||||
db
|
db
|
||||||
.selectFrom('pages')
|
.selectFrom('pages')
|
||||||
|
|||||||
133
apps/server/test/integration/page-move-cycle.int-spec.ts
Normal file
133
apps/server/test/integration/page-move-cycle.int-spec.ts
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
import { Kysely } from 'kysely';
|
||||||
|
import { generateJitteredKeyBetween } from 'fractional-indexing-jittered';
|
||||||
|
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||||
|
import { PageService } from 'src/core/page/services/page.service';
|
||||||
|
import { Page } from '@docmost/db/types/entity.types';
|
||||||
|
import {
|
||||||
|
getTestDb,
|
||||||
|
destroyTestDb,
|
||||||
|
createWorkspace,
|
||||||
|
createSpace,
|
||||||
|
createPage,
|
||||||
|
} from './db';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* #207 #7 — TOCTOU in PageService.movePage: two concurrent moves
|
||||||
|
* ("A under B" + "B under A") must NOT be able to persist a parent/child cycle.
|
||||||
|
*
|
||||||
|
* Before the fix the cycle check (getPageBreadCrumbs) and the UPDATE were two
|
||||||
|
* separate, unlocked statements, so both movers could read the same pre-write
|
||||||
|
* acyclic snapshot, both pass the guard, and persist A.parentPageId=B AND
|
||||||
|
* B.parentPageId=A. The fix runs the guard + UPDATE in one transaction behind a
|
||||||
|
* per-space advisory lock, so the moves serialize: whichever commits second
|
||||||
|
* sees the first's write and its guard rejects the cycle.
|
||||||
|
*
|
||||||
|
* This test drives the real PageService.movePage against a real Postgres,
|
||||||
|
* firing the two opposing moves concurrently, and asserts that no cycle ever
|
||||||
|
* persists (walking parentPageId from both pages always reaches a root with no
|
||||||
|
* repeated id) and that exactly one of the two opposing moves is rejected.
|
||||||
|
*/
|
||||||
|
describe('PageService.movePage concurrent A<->B cycle guard [integration]', () => {
|
||||||
|
let db: Kysely<any>;
|
||||||
|
let pageRepo: PageRepo;
|
||||||
|
let pageService: PageService;
|
||||||
|
let workspaceId: string;
|
||||||
|
let spaceId: string;
|
||||||
|
|
||||||
|
// A valid fractional-index position key; movePage validates the position.
|
||||||
|
const position = generateJitteredKeyBetween(null, null);
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
db = getTestDb();
|
||||||
|
// Event emission is a side effect movePage performs but the cycle behaviour
|
||||||
|
// does not depend on; a no-op emitter keeps the harness minimal.
|
||||||
|
const eventEmitter = { emit: () => true } as any;
|
||||||
|
pageRepo = new PageRepo(db as any, {} as any, eventEmitter);
|
||||||
|
// Only pageRepo (1), db (4) and eventEmitter (9) are touched by movePage;
|
||||||
|
// the remaining constructor deps are unused on this path.
|
||||||
|
pageService = new PageService(
|
||||||
|
pageRepo,
|
||||||
|
undefined as any,
|
||||||
|
undefined as any,
|
||||||
|
db as any,
|
||||||
|
undefined as any,
|
||||||
|
undefined as any,
|
||||||
|
undefined as any,
|
||||||
|
undefined as any,
|
||||||
|
eventEmitter,
|
||||||
|
undefined as any,
|
||||||
|
undefined as any,
|
||||||
|
undefined as any,
|
||||||
|
);
|
||||||
|
|
||||||
|
workspaceId = (await createWorkspace(db)).id;
|
||||||
|
spaceId = (await createSpace(db, workspaceId)).id;
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
await destroyTestDb();
|
||||||
|
});
|
||||||
|
|
||||||
|
async function findPage(id: string): Promise<Page> {
|
||||||
|
const page = await pageRepo.findById(id);
|
||||||
|
if (!page) throw new Error(`page ${id} not found`);
|
||||||
|
return page;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Walk parentPageId upward from startId. Throws if a node repeats (cycle) or
|
||||||
|
// the walk fails to terminate; returns normally only when a root is reached.
|
||||||
|
async function assertReachesRoot(startId: string): Promise<void> {
|
||||||
|
const seen = new Set<string>();
|
||||||
|
let cur: string | null = startId;
|
||||||
|
let steps = 0;
|
||||||
|
while (cur) {
|
||||||
|
if (seen.has(cur)) {
|
||||||
|
throw new Error(`cycle detected: revisited ${cur}`);
|
||||||
|
}
|
||||||
|
seen.add(cur);
|
||||||
|
const row: { parentPageId: string | null } | undefined = await db
|
||||||
|
.selectFrom('pages')
|
||||||
|
.select('parentPageId')
|
||||||
|
.where('id', '=', cur)
|
||||||
|
.executeTakeFirst();
|
||||||
|
cur = row?.parentPageId ?? null;
|
||||||
|
if (++steps > 1000) {
|
||||||
|
throw new Error('parent walk did not terminate');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it('two opposing concurrent moves never persist a parent/child cycle', async () => {
|
||||||
|
// Repeat to exercise different scheduler interleavings of the two moves.
|
||||||
|
for (let i = 0; i < 8; i++) {
|
||||||
|
const a = await createPage(db, { workspaceId, spaceId, title: `A-${i}` });
|
||||||
|
const b = await createPage(db, { workspaceId, spaceId, title: `B-${i}` });
|
||||||
|
|
||||||
|
const movedA = await findPage(a.id);
|
||||||
|
const movedB = await findPage(b.id);
|
||||||
|
|
||||||
|
const results = await Promise.allSettled([
|
||||||
|
pageService.movePage(
|
||||||
|
{ pageId: a.id, parentPageId: b.id, position } as any,
|
||||||
|
movedA,
|
||||||
|
),
|
||||||
|
pageService.movePage(
|
||||||
|
{ pageId: b.id, parentPageId: a.id, position } as any,
|
||||||
|
movedB,
|
||||||
|
),
|
||||||
|
]);
|
||||||
|
|
||||||
|
// No cycle may have been persisted by either ordering.
|
||||||
|
await assertReachesRoot(a.id);
|
||||||
|
await assertReachesRoot(b.id);
|
||||||
|
|
||||||
|
// The serialization guarantees exactly one of the opposing moves wins;
|
||||||
|
// the other must be rejected as a subtree cycle.
|
||||||
|
const rejected = results.filter(
|
||||||
|
(r): r is PromiseRejectedResult => r.status === 'rejected',
|
||||||
|
);
|
||||||
|
expect(rejected).toHaveLength(1);
|
||||||
|
expect(rejected[0].reason?.message).toMatch(/into its own subtree/);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user