Merge pull request 'feat(tree): server-authoritative realtime tree updates' (#15) from feat/realtime-tree-server into develop

This commit was merged in pull request #15.
This commit is contained in:
claude_code
2026-06-20 19:48:36 +03:00
22 changed files with 1413 additions and 534 deletions

View File

@@ -3,6 +3,7 @@ export enum EventName {
PAGE_CREATED = 'page.created',
PAGE_UPDATED = 'page.updated',
PAGE_CONTENT_UPDATED = 'page-content-updated',
PAGE_MOVED = 'page.moved',
PAGE_MOVED_TO_SPACE = 'page-moved-to-space',
PAGE_DELETED = 'page.deleted',
PAGE_SOFT_DELETED = 'page.soft_deleted',

View File

@@ -767,7 +767,11 @@ export class PageController {
@AuthUser() user: User,
@AuthProvenance() provenance: AuthProvenanceData,
) {
const movedPage = await this.pageRepo.findById(dto.pageId);
// includeHasChildren so movePage's PAGE_MOVED snapshot carries an accurate
// hasChildren — receivers need it to keep the moved node's chevron correct.
const movedPage = await this.pageRepo.findById(dto.pageId, {
includeHasChildren: true,
});
if (!movedPage) {
throw new NotFoundException('Moved page not found');
}

View File

@@ -758,9 +758,14 @@ export class PageService {
}
const insertedPageIds = insertablePages.map((page) => page.id);
// `spaceId` is the single destination space for the whole copy/duplicate
// (every inserted page above gets `spaceId: spaceId`). It lets the WS
// listener trigger a root refetch for the bulk subtree (no `pages` snapshot
// here on purpose — we want the refetch fallback, not per-node addTreeNode).
this.eventEmitter.emit(EventName.PAGE_CREATED, {
pageIds: insertedPageIds,
workspaceId: authUser.workspaceId,
spaceId,
});
//TODO: best to handle this in a queue
@@ -887,6 +892,35 @@ export class PageService {
},
dto.pageId,
);
// The generic PAGE_UPDATED emitted by updatePage above is intentionally NOT
// used to drive the tree `moveTreeNode` broadcast: it also fires on rename /
// content-save and carries neither oldParentId nor the new position. Emit a
// dedicated PAGE_MOVED so the WS listener can build a precise moveTreeNode
// without a DB read (variant A: snapshot in the event).
//
// `parentPageId` is `undefined` when only the position changed (same
// parent); resolve it back to the page's actual parent for the snapshot.
const newParentPageId =
parentPageId === undefined ? movedPage.parentPageId : parentPageId;
this.eventEmitter.emit(EventName.PAGE_MOVED, {
workspaceId: movedPage.workspaceId,
oldParentId: movedPage.parentPageId ?? null,
// `hasChildren` is selected by findById({ includeHasChildren: true }) in
// the controller; it isn't on the base Page type, hence the cast.
hasChildren:
(movedPage as Page & { hasChildren?: boolean }).hasChildren ?? false,
node: {
id: movedPage.id,
slugId: movedPage.slugId,
title: movedPage.title,
icon: movedPage.icon,
position: dto.position,
spaceId: movedPage.spaceId,
parentPageId: newParentPageId ?? null,
},
});
}
async getPageBreadCrumbs(childPageId: string) {

View File

@@ -6,9 +6,46 @@ import { QueueJob, QueueName } from '../../integrations/queue/constants';
import { Queue } from 'bullmq';
import { EnvironmentService } from '../../integrations/environment/environment.service';
/**
* Thin snapshot of a page node carried inside domain events so the WebSocket
* tree listener can broadcast a tree update WITHOUT reading the DB. This is
* "variant A" of the realtime-tree design: enriching the event avoids the
* in-transaction visibility race where a separate SELECT in the listener could
* run before the emitting `trx` has committed and therefore not see the row.
*/
export interface TreeNodeSnapshot {
id: string;
slugId: string;
title: string | null;
icon: string | null;
position: string;
spaceId: string;
parentPageId: string | null;
}
export class PageEvent {
pageIds: string[];
workspaceId: string;
// Optional tree snapshots so the WS listener can broadcast without a DB read
// (avoids the in-transaction visibility race on PAGE_CREATED /
// PAGE_SOFT_DELETED / PAGE_DELETED). The existing search/AI listeners ignore
// this field — they only enqueue work keyed by pageIds.
pages?: TreeNodeSnapshot[];
// Set on PAGE_RESTORED so the WS listener can scope a refetchRootTreeNodeEvent
// to the affected space (restore can re-attach a whole subtree).
spaceId?: string;
}
/**
* Emitted by `PageService.movePage` after a successful re-parent / reorder.
* Carries both the old and new parent plus the new position so the WS listener
* can build a `moveTreeNode` broadcast without a DB read.
*/
export class PageMovedEvent {
workspaceId: string;
oldParentId: string | null;
node: TreeNodeSnapshot;
hasChildren: boolean;
}
@Injectable()

View File

@@ -173,9 +173,23 @@ export class PageRepo {
.returning(this.baseFields)
.executeTakeFirst();
// Enrich the event with a thin node snapshot (variant A) so the WS tree
// listener can broadcast `addTreeNode` without re-reading the DB. `result`
// already comes from `returning(this.baseFields)`, so no extra query.
this.eventEmitter.emit(EventName.PAGE_CREATED, {
pageIds: [result.id],
workspaceId: result.workspaceId,
pages: [
{
id: result.id,
slugId: result.slugId,
title: result.title,
icon: result.icon,
position: result.position,
spaceId: result.spaceId,
parentPageId: result.parentPageId,
},
],
});
return result;
@@ -266,6 +280,25 @@ export class PageRepo {
): Promise<void> {
const currentDate = new Date();
// Read the root snapshot up front so PAGE_SOFT_DELETED can carry it without
// a post-commit DB read (variant A). Only the root of the deleted subtree is
// needed for the tree broadcast — the client `treeModel.remove` drops all
// descendants, so we don't snapshot/broadcast every descendant.
const rootSnapshot = await this.db
.selectFrom('pages')
.select([
'id',
'slugId',
'title',
'icon',
'position',
'spaceId',
'parentPageId',
])
.where('id', '=', pageId)
.where('deletedAt', 'is', null)
.executeTakeFirst();
const descendants = await this.db
.withRecursive('page_descendants', (db) =>
db
@@ -305,6 +338,21 @@ export class PageRepo {
this.eventEmitter.emit(EventName.PAGE_SOFT_DELETED, {
pageIds: pageIds,
workspaceId,
// Root-only snapshot: one `deleteTreeNode` is enough, the client removes
// the whole subtree. Skip if the root vanished between the two reads.
pages: rootSnapshot
? [
{
id: rootSnapshot.id,
slugId: rootSnapshot.slugId,
title: rootSnapshot.title,
icon: rootSnapshot.icon,
position: rootSnapshot.position,
spaceId: rootSnapshot.spaceId,
parentPageId: rootSnapshot.parentPageId,
},
]
: [],
});
}
}
@@ -313,7 +361,7 @@ export class PageRepo {
// First, check if the page being restored has a deleted parent
const pageToRestore = await this.db
.selectFrom('pages')
.select(['id', 'parentPageId'])
.select(['id', 'parentPageId', 'spaceId'])
.where('id', '=', pageId)
.executeTakeFirst();
@@ -372,6 +420,10 @@ export class PageRepo {
this.eventEmitter.emit(EventName.PAGE_RESTORED, {
pageIds: pageIds,
workspaceId: workspaceId,
// spaceId lets the WS listener send a space-scoped refetchRootTreeNodeEvent.
// Restore can re-attach a whole subtree, so a root refetch is simpler and
// more robust than N pointwise addTreeNode events.
spaceId: pageToRestore.spaceId,
});
}

View File

@@ -552,9 +552,13 @@ export class FileImportTaskService {
}
if (validPageIds.size > 0) {
// Carry the destination spaceId so the WS listener can trigger a root
// refetch for the imported subtree (no `pages` snapshot -> refetch
// fallback rather than per-node addTreeNode).
this.eventEmitter.emit(EventName.PAGE_CREATED, {
pageIds: Array.from(validPageIds),
workspaceId: fileTask.workspaceId,
spaceId: fileTask.spaceId,
});
}

View File

@@ -0,0 +1,95 @@
import { Test, TestingModule } from '@nestjs/testing';
import { PageWsListener } from './page-ws.listener';
import { WsTreeService } from '../ws-tree.service';
import {
PageEvent,
TreeNodeSnapshot,
} from '../../database/listeners/page.listener';
const snapshot: TreeNodeSnapshot = {
id: 'page-1',
slugId: 'slug-1',
title: 'Hello',
icon: '📄',
position: 'a1',
spaceId: 'space-1',
parentPageId: null,
};
describe('PageWsListener.onPageCreated', () => {
let listener: PageWsListener;
let wsTree: {
broadcastPageCreated: jest.Mock;
broadcastRefetchRoot: jest.Mock;
};
beforeEach(async () => {
wsTree = {
broadcastPageCreated: jest.fn().mockResolvedValue(undefined),
broadcastRefetchRoot: jest.fn().mockResolvedValue(undefined),
};
const module: TestingModule = await Test.createTestingModule({
providers: [
PageWsListener,
{ provide: WsTreeService, useValue: wsTree },
],
}).compile();
listener = module.get<PageWsListener>(PageWsListener);
});
it('with `pages`: broadcasts a per-node addTreeNode and does NOT refetch root', async () => {
const event: PageEvent = {
pageIds: ['page-1'],
workspaceId: 'ws-1',
pages: [snapshot],
};
await listener.onPageCreated(event);
expect(wsTree.broadcastPageCreated).toHaveBeenCalledTimes(1);
expect(wsTree.broadcastPageCreated).toHaveBeenCalledWith(snapshot);
expect(wsTree.broadcastRefetchRoot).not.toHaveBeenCalled();
});
it('without `pages` but WITH `spaceId` (bulk create): falls back to a root refetch', async () => {
const event: PageEvent = {
pageIds: ['page-1', 'page-2'],
workspaceId: 'ws-1',
spaceId: 'space-9',
};
await listener.onPageCreated(event);
expect(wsTree.broadcastPageCreated).not.toHaveBeenCalled();
expect(wsTree.broadcastRefetchRoot).toHaveBeenCalledTimes(1);
expect(wsTree.broadcastRefetchRoot).toHaveBeenCalledWith('space-9');
});
it('with an EMPTY `pages` array but WITH `spaceId`: still falls back to a root refetch', async () => {
const event: PageEvent = {
pageIds: ['page-1'],
workspaceId: 'ws-1',
pages: [],
spaceId: 'space-9',
};
await listener.onPageCreated(event);
expect(wsTree.broadcastPageCreated).not.toHaveBeenCalled();
expect(wsTree.broadcastRefetchRoot).toHaveBeenCalledWith('space-9');
});
it('without `pages` and without `spaceId`: does nothing (no broadcast)', async () => {
const event: PageEvent = {
pageIds: ['page-1'],
workspaceId: 'ws-1',
};
await listener.onPageCreated(event);
expect(wsTree.broadcastPageCreated).not.toHaveBeenCalled();
expect(wsTree.broadcastRefetchRoot).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,81 @@
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { EventName } from '../../common/events/event.contants';
import {
PageEvent,
PageMovedEvent,
} from '../../database/listeners/page.listener';
import { WsTreeService } from '../ws-tree.service';
/**
* Server-authoritative realtime tree updates.
*
* Listens to page lifecycle domain events and broadcasts the corresponding
* tree mutation to everyone in the space room. Because the events carry thin
* node snapshots (variant A), this listener performs NO DB reads — that is what
* keeps it safe against the in-transaction visibility race (a synchronous
* SELECT here could run before the emitting `trx` committed).
*
* Scope of this PR: create, move, soft-delete/delete, restore.
*
* Deferred follow-ups (intentionally NOT handled here):
* - rename / icon change: would broadcast `updateOne` on PAGE_UPDATED, but
* PAGE_UPDATED also fires on every content save, so it needs a title/icon
* diff filter to avoid noise.
* - cross-space move (`movePageToSpace` / PAGE_MOVED_TO_SPACE): needs a
* deleteTreeNode in the old space + addTreeNode/refetch in the new space.
*/
@Injectable()
export class PageWsListener {
private readonly logger = new Logger(PageWsListener.name);
constructor(private readonly wsTree: WsTreeService) {}
@OnEvent(EventName.PAGE_CREATED)
async onPageCreated(event: PageEvent): Promise<void> {
// Two creation shapes:
// - Single-page create carries precise node snapshots (`pages`), so we
// broadcast a pointwise addTreeNode per node.
// - Bulk create (copy/duplicate, import) produces whole subtrees and omits
// `pages`; per-node placement would be fragile, so we fall back to a root
// refetch (carries no page data, clients re-fetch via the permission-
// checked API). Same mechanism PAGE_RESTORED uses.
if (event.pages?.length) {
for (const page of event.pages) {
await this.wsTree.broadcastPageCreated(page);
}
return;
}
if (event.spaceId) {
await this.wsTree.broadcastRefetchRoot(event.spaceId);
}
}
// Both soft-delete and hard-delete remove the node from the tree. The event
// carries only the ROOT snapshot of the deleted subtree — the client
// `treeModel.remove` drops all descendants, so one deleteTreeNode is enough.
@OnEvent(EventName.PAGE_SOFT_DELETED)
@OnEvent(EventName.PAGE_DELETED)
async onPageDeleted(event: PageEvent): Promise<void> {
for (const page of event.pages ?? []) {
await this.wsTree.broadcastPageDeleted(page);
}
}
@OnEvent(EventName.PAGE_MOVED)
async onPageMoved(event: PageMovedEvent): Promise<void> {
await this.wsTree.broadcastPageMoved(event);
}
@OnEvent(EventName.PAGE_RESTORED)
async onPageRestored(event: PageEvent): Promise<void> {
// Restore can re-attach a whole subtree; a root refetch is simpler and more
// robust than N pointwise addTreeNode events.
if (!event.spaceId) {
this.logger.warn('PAGE_RESTORED event without spaceId; skipping refetch');
return;
}
await this.wsTree.broadcastRefetchRoot(event.spaceId);
}
}

View File

@@ -0,0 +1,331 @@
import { Test, TestingModule } from '@nestjs/testing';
import { WsTreeService } from './ws-tree.service';
import { WsService } from './ws.service';
import { PagePermissionRepo } from '@docmost/db/repos/page/page-permission.repo';
import { CACHE_MANAGER } from '@nestjs/cache-manager';
import {
PageMovedEvent,
TreeNodeSnapshot,
} from '../database/listeners/page.listener';
import {
getSpaceRoomName,
WS_SPACE_RESTRICTION_CACHE_PREFIX,
} from './ws.utils';
const snapshot: TreeNodeSnapshot = {
id: 'page-1',
slugId: 'slug-1',
title: 'Hello',
icon: '📄',
position: 'a1',
spaceId: 'space-1',
parentPageId: null,
};
describe('WsTreeService', () => {
let service: WsTreeService;
let wsService: {
emitTreeEvent: jest.Mock;
emitToSpaceRoom: jest.Mock;
emitDeleteToUnauthorized: jest.Mock;
emitToAuthorizedUsers: jest.Mock;
};
let pagePermissionRepo: { hasRestrictedAncestor: jest.Mock };
beforeEach(async () => {
wsService = {
emitTreeEvent: jest.fn().mockResolvedValue(undefined),
emitToSpaceRoom: jest.fn(),
emitDeleteToUnauthorized: jest.fn().mockResolvedValue(undefined),
emitToAuthorizedUsers: jest.fn().mockResolvedValue(undefined),
};
pagePermissionRepo = {
// Default: not restricted, so broadcastPageMoved skips the compensating
// delete unless a test opts in.
hasRestrictedAncestor: jest.fn().mockResolvedValue(false),
};
const module: TestingModule = await Test.createTestingModule({
providers: [
WsTreeService,
{ provide: WsService, useValue: wsService },
{ provide: PagePermissionRepo, useValue: pagePermissionRepo },
],
}).compile();
service = module.get<WsTreeService>(WsTreeService);
});
it('broadcastPageCreated emits addTreeNode with the expected shape', async () => {
await service.broadcastPageCreated(snapshot);
expect(wsService.emitTreeEvent).toHaveBeenCalledWith(
'space-1',
'page-1',
expect.objectContaining({
operation: 'addTreeNode',
spaceId: 'space-1',
payload: expect.objectContaining({
parentId: null,
index: 0,
data: expect.objectContaining({
id: 'page-1',
slugId: 'slug-1',
name: 'Hello',
title: 'Hello',
icon: '📄',
position: 'a1',
spaceId: 'space-1',
parentPageId: null,
hasChildren: false,
children: [],
}),
}),
}),
);
});
it('broadcastPageDeleted emits deleteTreeNode with the root node only', async () => {
await service.broadcastPageDeleted({
...snapshot,
parentPageId: 'parent-9',
});
expect(wsService.emitTreeEvent).toHaveBeenCalledWith(
'space-1',
'page-1',
expect.objectContaining({
operation: 'deleteTreeNode',
spaceId: 'space-1',
payload: {
node: { id: 'page-1', slugId: 'slug-1', parentPageId: 'parent-9' },
},
}),
);
});
it('broadcastPageMoved emits moveTreeNode with old + new parent and position', async () => {
const event: PageMovedEvent = {
workspaceId: 'ws-1',
oldParentId: 'old-parent',
hasChildren: true,
node: { ...snapshot, parentPageId: 'new-parent', position: 'a5' },
};
await service.broadcastPageMoved(event);
expect(wsService.emitTreeEvent).toHaveBeenCalledWith(
'space-1',
'page-1',
expect.objectContaining({
operation: 'moveTreeNode',
spaceId: 'space-1',
payload: expect.objectContaining({
id: 'page-1',
parentId: 'new-parent',
oldParentId: 'old-parent',
index: 0,
position: 'a5',
pageData: expect.objectContaining({
id: 'page-1',
slugId: 'slug-1',
position: 'a5',
parentPageId: 'new-parent',
hasChildren: true,
}),
}),
}),
);
});
it('broadcastPageMoved into an UNrestricted location does NOT emit a compensating delete', async () => {
pagePermissionRepo.hasRestrictedAncestor.mockResolvedValue(false);
const event: PageMovedEvent = {
workspaceId: 'ws-1',
oldParentId: 'old-parent',
hasChildren: false,
node: { ...snapshot, parentPageId: 'new-parent', position: 'a5' },
};
await service.broadcastPageMoved(event);
// Normal path: move goes to the whole room via emitTreeEvent, and neither
// the authorized-only move path nor the compensating delete fire.
expect(wsService.emitTreeEvent).toHaveBeenCalledTimes(1);
expect(wsService.emitToAuthorizedUsers).not.toHaveBeenCalled();
expect(wsService.emitDeleteToUnauthorized).not.toHaveBeenCalled();
});
it('broadcastPageMoved into a RESTRICTED subtree routes the move to authorized users only AND emits a compensating delete to unauthorized — from one fresh decision', async () => {
// Destination is now under a restricted ancestor.
pagePermissionRepo.hasRestrictedAncestor.mockResolvedValue(true);
const event: PageMovedEvent = {
workspaceId: 'ws-1',
oldParentId: 'old-parent',
hasChildren: false,
node: { ...snapshot, parentPageId: 'restricted-parent', position: 'a5' },
};
await service.broadcastPageMoved(event);
// The single fresh restriction decision was read exactly once...
expect(pagePermissionRepo.hasRestrictedAncestor).toHaveBeenCalledTimes(1);
expect(pagePermissionRepo.hasRestrictedAncestor).toHaveBeenCalledWith(
'page-1',
);
// ...and it must NOT go through the cache-gated room-wide emitTreeEvent,
// which could leak the move to the whole room during the stale-cache window.
expect(wsService.emitTreeEvent).not.toHaveBeenCalled();
// The move is delivered to authorized users only.
expect(wsService.emitToAuthorizedUsers).toHaveBeenCalledTimes(1);
expect(wsService.emitToAuthorizedUsers).toHaveBeenCalledWith(
'space-1',
'page-1',
expect.objectContaining({
operation: 'moveTreeNode',
spaceId: 'space-1',
payload: expect.objectContaining({ id: 'page-1' }),
}),
);
// The users who lost access get a deleteTreeNode for the moved node, scoped
// to the same page id (same fresh authorized set → disjoint from the move).
expect(wsService.emitDeleteToUnauthorized).toHaveBeenCalledTimes(1);
expect(wsService.emitDeleteToUnauthorized).toHaveBeenCalledWith(
'space-1',
'page-1',
expect.objectContaining({
operation: 'deleteTreeNode',
spaceId: 'space-1',
payload: {
node: expect.objectContaining({ id: 'page-1', slugId: 'slug-1' }),
},
}),
);
});
it('broadcastRefetchRoot emits refetchRootTreeNodeEvent to the space room', async () => {
await service.broadcastRefetchRoot('space-7');
expect(wsService.emitToSpaceRoom).toHaveBeenCalledWith('space-7', {
operation: 'refetchRootTreeNodeEvent',
spaceId: 'space-7',
});
});
});
describe('WsService.emitTreeEvent', () => {
let service: WsService;
let pagePermissionRepo: {
hasRestrictedPagesInSpace: jest.Mock;
hasRestrictedAncestor: jest.Mock;
getUserIdsWithPageAccess: jest.Mock;
};
let cache: { get: jest.Mock; set: jest.Mock; del: jest.Mock };
let roomEmit: jest.Mock;
let server: any;
beforeEach(async () => {
pagePermissionRepo = {
hasRestrictedPagesInSpace: jest.fn(),
hasRestrictedAncestor: jest.fn(),
getUserIdsWithPageAccess: jest.fn(),
};
cache = {
get: jest.fn().mockResolvedValue(null),
set: jest.fn().mockResolvedValue(undefined),
del: jest.fn(),
};
const module: TestingModule = await Test.createTestingModule({
providers: [
WsService,
{ provide: PagePermissionRepo, useValue: pagePermissionRepo },
{ provide: CACHE_MANAGER, useValue: cache },
],
}).compile();
service = module.get<WsService>(WsService);
roomEmit = jest.fn();
server = {
to: jest.fn().mockReturnValue({ emit: roomEmit }),
in: jest.fn().mockReturnValue({ fetchSockets: jest.fn() }),
};
service.setServer(server);
});
it('open space: broadcasts to the whole space room', async () => {
pagePermissionRepo.hasRestrictedPagesInSpace.mockResolvedValue(false);
const data = { operation: 'addTreeNode' };
await service.emitTreeEvent('space-1', 'page-1', data);
expect(server.to).toHaveBeenCalledWith(getSpaceRoomName('space-1'));
expect(roomEmit).toHaveBeenCalledWith('message', data);
expect(pagePermissionRepo.hasRestrictedAncestor).not.toHaveBeenCalled();
});
it('restricted page: only authorized users receive the event', async () => {
pagePermissionRepo.hasRestrictedPagesInSpace.mockResolvedValue(true);
pagePermissionRepo.hasRestrictedAncestor.mockResolvedValue(true);
pagePermissionRepo.getUserIdsWithPageAccess.mockResolvedValue(['user-ok']);
const okEmit = jest.fn();
const noEmit = jest.fn();
const sockets = [
{ id: 's1', data: { userId: 'user-ok' }, emit: okEmit },
{ id: 's2', data: { userId: 'user-no' }, emit: noEmit },
];
server.in.mockReturnValue({
fetchSockets: jest.fn().mockResolvedValue(sockets),
});
const data = { operation: 'addTreeNode' };
await service.emitTreeEvent('space-1', 'page-1', data);
// Did NOT broadcast to the whole room.
expect(roomEmit).not.toHaveBeenCalled();
expect(okEmit).toHaveBeenCalledWith('message', data);
expect(noEmit).not.toHaveBeenCalled();
});
it('invalidateSpaceRestrictionCache deletes the cached restriction verdict for that space only', async () => {
await service.invalidateSpaceRestrictionCache('space-42');
expect(cache.del).toHaveBeenCalledTimes(1);
expect(cache.del).toHaveBeenCalledWith(
`${WS_SPACE_RESTRICTION_CACHE_PREFIX}space-42`,
);
});
it('emitDeleteToUnauthorized sends ONLY to sockets whose user lacks page access', async () => {
pagePermissionRepo.getUserIdsWithPageAccess.mockResolvedValue(['user-ok']);
const okEmit = jest.fn();
const noEmit = jest.fn();
const anonEmit = jest.fn();
const sockets = [
{ id: 's1', data: { userId: 'user-ok' }, emit: okEmit },
{ id: 's2', data: { userId: 'user-no' }, emit: noEmit },
// Unauthenticated socket (no userId) — must also receive the delete.
{ id: 's3', data: {}, emit: anonEmit },
];
server.in.mockReturnValue({
fetchSockets: jest.fn().mockResolvedValue(sockets),
});
const data = { operation: 'deleteTreeNode' };
await service.emitDeleteToUnauthorized('space-1', 'page-1', data);
// Authorized user does NOT get the delete (they got the move instead).
expect(okEmit).not.toHaveBeenCalled();
// Unauthorized + anonymous sockets DO get the delete.
expect(noEmit).toHaveBeenCalledWith('message', data);
expect(anonEmit).toHaveBeenCalledWith('message', data);
});
});

View File

@@ -1,32 +1,31 @@
import { Injectable } from '@nestjs/common';
import { Page } from '@docmost/db/types/entity.types';
import { PagePermissionRepo } from '@docmost/db/repos/page/page-permission.repo';
import { WsService } from './ws.service';
import {
PageMovedEvent,
TreeNodeSnapshot,
} from '../database/listeners/page.listener';
@Injectable()
export class WsTreeService {
constructor(private readonly wsService: WsService) {}
constructor(
private readonly wsService: WsService,
private readonly pagePermissionRepo: PagePermissionRepo,
) {}
async notifyPageRestricted(page: Page, excludeUserId: string): Promise<void> {
await this.wsService.emitToSpaceExceptUsers(page.spaceId, [excludeUserId], {
operation: 'deleteTreeNode',
spaceId: page.spaceId,
payload: {
node: {
id: page.id,
slugId: page.slugId,
},
},
});
}
// Server-origin tree broadcasts. Built from thin node snapshots carried in the
// domain events (variant A) so no DB read happens here — this avoids the
// in-transaction visibility race. Payload shapes mirror what the client
// receiver (`use-tree-socket.ts`) consumes.
async notifyPermissionGranted(page: Page, userIds: string[]): Promise<void> {
if (userIds.length === 0) return;
await this.wsService.emitToUsers(userIds, {
async broadcastPageCreated(page: TreeNodeSnapshot): Promise<void> {
await this.wsService.emitTreeEvent(page.spaceId, page.id, {
operation: 'addTreeNode',
spaceId: page.spaceId,
payload: {
parentId: page.parentPageId ?? null,
// Receivers place by `position` among already-loaded siblings, not by
// this absolute index (sender's loaded set differs from receivers').
index: 0,
data: {
id: page.id,
@@ -37,11 +36,112 @@ export class WsTreeService {
position: page.position,
spaceId: page.spaceId,
parentPageId: page.parentPageId,
creatorId: page.creatorId,
hasChildren: false,
children: [],
},
},
});
}
async broadcastPageDeleted(page: TreeNodeSnapshot): Promise<void> {
await this.wsService.emitTreeEvent(page.spaceId, page.id, {
operation: 'deleteTreeNode',
spaceId: page.spaceId,
payload: {
node: {
id: page.id,
slugId: page.slugId,
parentPageId: page.parentPageId ?? null,
},
},
});
}
async broadcastPageMoved(event: PageMovedEvent): Promise<void> {
const { node } = event;
const movePayload = {
operation: 'moveTreeNode',
spaceId: node.spaceId,
payload: {
id: node.id,
parentId: node.parentPageId ?? null,
oldParentId: event.oldParentId ?? null,
// See broadcastPageCreated: receivers place by `position`, not index.
index: 0,
position: node.position,
pageData: {
id: node.id,
slugId: node.slugId,
title: node.title,
icon: node.icon,
position: node.position,
spaceId: node.spaceId,
parentPageId: node.parentPageId ?? null,
hasChildren: event.hasChildren,
},
},
};
// Decide the node's restricted state ONCE, fresh (uncached), and drive BOTH
// the move broadcast and the compensating delete from this single decision.
//
// Why not just emitTreeEvent for the move? emitTreeEvent gates the move on
// the CACHED spaceHasRestrictions (30s TTL, never invalidated). In the window
// right after a space gets its FIRST restriction, that cache still says
// "no restrictions" → emitTreeEvent would fan the move out to the WHOLE room
// (including unauthorized users) while the delete below (computed from the
// UNCACHED hasRestrictedAncestor) also fires. An unauthorized user then gets
// BOTH, and if the delete lands first it is a no-op and the later move
// renders the restricted node → leak. So when the node is known-restricted we
// must NOT route the move through the cache-gated path.
const isRestricted = await this.pagePermissionRepo.hasRestrictedAncestor(
node.id,
);
if (!isRestricted) {
// Normal case: not under a restricted ancestor. One moveTreeNode to the
// whole space room (emitTreeEvent's open-space fast path), no delete.
await this.wsService.emitTreeEvent(node.spaceId, node.id, movePayload);
return;
}
// Restricted case: a move can push a previously-visible page UNDER a
// restricted ancestor. Route the move to authorized users ONLY (same fresh
// getUserIdsWithPageAccess set the delete uses) and send the compensating
// delete to everyone else. Both sets come from one fresh decision, so they
// are guaranteed disjoint: authorized users get exactly the moveTreeNode,
// unauthorized users get exactly the deleteTreeNode, nobody gets both.
//
// Users who LOSE visibility need the delete because otherwise the node would
// linger in their tree at its old parent with its real title/slugId/icon
// (existence + metadata leak).
await this.wsService.emitToAuthorizedUsers(
node.spaceId,
node.id,
movePayload,
);
await this.wsService.emitDeleteToUnauthorized(node.spaceId, node.id, {
operation: 'deleteTreeNode',
spaceId: node.spaceId,
payload: {
node: {
id: node.id,
slugId: node.slugId,
parentPageId: event.oldParentId ?? null,
},
},
});
}
// Used for restore (and other subtree re-attachments): rather than emitting N
// pointwise addTreeNode events, ask clients in the space to refetch the root
// tree. The client already understands `refetchRootTreeNodeEvent`.
async broadcastRefetchRoot(spaceId: string): Promise<void> {
this.wsService.emitToSpaceRoom(spaceId, {
operation: 'refetchRootTreeNodeEvent',
spaceId,
});
}
}

View File

@@ -62,10 +62,10 @@ export class WsGateway
}
@SubscribeMessage('message')
async handleMessage(client: Socket, data: any): Promise<void> {
if (this.wsService.isTreeEvent(data)) {
await this.wsService.handleTreeEvent(client, data);
}
handleMessage(_client: Socket, _data: any): void {
// Inbound tree events from clients are no longer accepted: tree updates are
// now server-authoritative (broadcast by PageWsListener from domain events).
// The old client-relay path was removed to close that attack surface.
}
/*

View File

@@ -2,12 +2,13 @@ import { Global, Module } from '@nestjs/common';
import { WsGateway } from './ws.gateway';
import { WsService } from './ws.service';
import { WsTreeService } from './ws-tree.service';
import { PageWsListener } from './listeners/page-ws.listener';
import { TokenModule } from '../core/auth/token.module';
@Global()
@Module({
imports: [TokenModule],
providers: [WsGateway, WsService, WsTreeService],
providers: [WsGateway, WsService, WsTreeService, PageWsListener],
exports: [WsGateway, WsService, WsTreeService],
})
export class WsModule {}

View File

@@ -1,14 +1,12 @@
import { Inject, Injectable } from '@nestjs/common';
import { CACHE_MANAGER } from '@nestjs/cache-manager';
import { Cache } from 'cache-manager';
import { Server, Socket } from 'socket.io';
import { Server } from 'socket.io';
import { PagePermissionRepo } from '@docmost/db/repos/page/page-permission.repo';
import {
TREE_EVENTS,
WS_SPACE_RESTRICTION_CACHE_PREFIX,
WS_CACHE_TTL_MS,
getSpaceRoomName,
getUserRoomName,
} from './ws.utils';
@Injectable()
@@ -24,39 +22,25 @@ export class WsService {
this.server = server;
}
async handleTreeEvent(client: Socket, data: any): Promise<void> {
const room = getSpaceRoomName(data.spaceId);
if (!client.rooms.has(room)) {
return;
}
if (data.operation === 'refetchRootTreeNodeEvent') {
client.broadcast.to(room).emit('message', data);
return;
}
const hasRestrictions = await this.spaceHasRestrictions(data.spaceId);
if (!hasRestrictions) {
client.broadcast.to(room).emit('message', data);
return;
}
const pageId = this.extractPageId(data);
if (!pageId) {
return;
}
const isRestricted =
await this.pagePermissionRepo.hasRestrictedAncestor(pageId);
if (!isRestricted) {
client.broadcast.to(room).emit('message', data);
return;
}
await this.broadcastToAuthorizedUsers(room, client.id, pageId, data);
}
// Drop the cached spaceHasRestrictions verdict for a space. spaceHasRestrictions
// caches "does this space have ANY restricted page" for WS_CACHE_TTL_MS (30s),
// and emitTreeEvent / emitCommentEvent take a room-wide fast path when it is
// false. The FIRST time a space gains a restriction (or loses its last one)
// this cached verdict goes stale for up to the TTL, during which a title/icon-
// bearing tree payload could fan out to the whole room. This MUST be called by
// whatever code creates or removes a page's restriction (the page-access /
// page-permission grant/revoke/restrict path), passing the affected page's
// spaceId, so the next emit re-reads hasRestrictedPagesInSpace.
//
// NOTE: on this branch there is no permission-mutation site to call this from —
// the page-access/page-permission repo mutators (insertPageAccess /
// insertPagePermissions / deletePagePermission* / updatePagePermissionRole)
// have ZERO callers in apps/server/src; PageAccessService only validates access.
// This primitive is kept (and tested) so that flow, when it lands, has the
// correct hook to invalidate the cache.
//
// TODO: the future restriction-mutation endpoint (restrict/grant/revoke page
// access) MUST call this with the affected page's spaceId.
async invalidateSpaceRestrictionCache(spaceId: string): Promise<void> {
await this.cacheManager.del(
`${WS_SPACE_RESTRICTION_CACHE_PREFIX}${spaceId}`,
@@ -86,31 +70,101 @@ export class WsService {
await this.broadcastToAuthorizedUsers(room, null, pageId, data);
}
async emitToUsers(userIds: string[], data: any): Promise<void> {
if (userIds.length === 0) return;
const rooms = userIds.map((id) => getUserRoomName(id));
this.server.to(rooms).emit('message', data);
// Server-origin tree broadcast. Mirrors emitCommentEvent exactly: respects
// per-space page restrictions (spaceHasRestrictions -> hasRestrictedAncestor
// -> broadcastToAuthorizedUsers), otherwise fans the event out to everyone in
// the space room.
//
// The author is NOT excluded. The client receiver is idempotent (addTreeNode
// early-returns if the node id already exists; deleteTreeNode is a no-op if
// the node is gone), so the UI author's optimistic node is preserved, and
// non-UI creators (MCP / AI / REST API) still see their own page appear.
async emitTreeEvent(
spaceId: string,
pageId: string,
data: any,
): Promise<void> {
const room = getSpaceRoomName(spaceId);
const hasRestrictions = await this.spaceHasRestrictions(spaceId);
if (!hasRestrictions) {
this.server.to(room).emit('message', data);
return;
}
const isRestricted =
await this.pagePermissionRepo.hasRestrictedAncestor(pageId);
if (!isRestricted) {
this.server.to(room).emit('message', data);
return;
}
await this.broadcastToAuthorizedUsers(room, null, pageId, data);
}
async emitToSpaceExceptUsers(
// Unconditional broadcast to everyone in the space room. Used for space-wide
// signals that carry no page payload (e.g. refetchRootTreeNodeEvent on
// restore): there is no per-page data to leak, and each client refetches the
// root tree through its own authorized query (refetchRootTreeNodeEvent carries
// no per-page data, so no restriction check is needed).
emitToSpaceRoom(spaceId: string, data: any): void {
this.server.to(getSpaceRoomName(spaceId)).emit('message', data);
}
// Broadcast `data` (a deleteTreeNode) to every socket in the space room whose
// user is NOT authorized to see `pageId`. Used to compensate a move that pushes
// a previously-visible page UNDER a restricted ancestor: authorized users get
// the moveTreeNode (via emitTreeEvent), everyone else gets a deleteTreeNode so
// the now-restricted node disappears from their tree instead of lingering with
// its real title/slugId/icon. The two event sets are disjoint by construction
// (a user is either authorized or not), so no socket receives both.
async emitDeleteToUnauthorized(
spaceId: string,
excludeUserIds: string[],
pageId: string,
data: any,
): Promise<void> {
const room = getSpaceRoomName(spaceId);
const sockets = await this.server.in(room).fetchSockets();
const excludeSet = new Set(excludeUserIds);
if (sockets.length === 0) return;
const userIds = Array.from(
new Set(
sockets
.map((s) => s.data.userId as string)
.filter((id): id is string => !!id),
),
);
if (userIds.length === 0) return;
const authorizedUserIds =
await this.pagePermissionRepo.getUserIdsWithPageAccess(pageId, userIds);
const authorizedSet = new Set(authorizedUserIds);
for (const socket of sockets) {
const userId = socket.data.userId as string;
if (userId && !excludeSet.has(userId)) {
// Unauthenticated sockets (no userId) cannot see restricted content; send
// them the delete too so a leaked node can't linger.
if (!userId || !authorizedSet.has(userId)) {
socket.emit('message', data);
}
}
}
isTreeEvent(data: any): boolean {
return TREE_EVENTS.has(data?.operation) && !!data?.spaceId;
// Server-origin broadcast of `data` to exactly the users in the space room who
// ARE authorized to see `pageId`. This is the counterpart of
// emitDeleteToUnauthorized: both resolve the authorized set from the SAME
// fetchSockets + getUserIdsWithPageAccess call shape, so a caller that drives
// both from one decision gets two disjoint sets (authorized vs. not) with no
// socket in both. Unlike emitTreeEvent, this does NOT consult the cached
// spaceHasRestrictions: the caller already knows the page is restricted, so we
// must not risk a stale cache fanning the move out to the whole room.
async emitToAuthorizedUsers(
spaceId: string,
pageId: string,
data: any,
): Promise<void> {
const room = getSpaceRoomName(spaceId);
await this.broadcastToAuthorizedUsers(room, null, pageId, data);
}
private async broadcastToAuthorizedUsers(
@@ -175,19 +229,4 @@ export class WsService {
return hasRestrictions;
}
private extractPageId(data: any): string | null {
switch (data.operation) {
case 'addTreeNode':
return data.payload?.data?.id ?? null;
case 'moveTreeNode':
return data.payload?.id ?? null;
case 'deleteTreeNode':
return data.payload?.node?.id ?? null;
case 'updateOne':
return data.id ?? null;
default:
return null;
}
}
}

View File

@@ -8,11 +8,3 @@ export function getSpaceRoomName(spaceId: string): string {
export function getUserRoomName(userId: string): string {
return `user-${userId}`;
}
export const TREE_EVENTS = new Set([
'updateOne',
'addTreeNode',
'moveTreeNode',
'deleteTreeNode',
'refetchRootTreeNodeEvent',
]);