diff --git a/apps/server/src/collaboration/constants.ts b/apps/server/src/collaboration/constants.ts index 8ce8c825..767d12d3 100644 --- a/apps/server/src/collaboration/constants.ts +++ b/apps/server/src/collaboration/constants.ts @@ -1,3 +1,9 @@ export const HISTORY_INTERVAL = 5 * 60 * 1000; export const HISTORY_FAST_INTERVAL = 60 * 1000; export const HISTORY_FAST_THRESHOLD = 5 * 60 * 1000; + +// Redis pub/sub channel that bridges a PAGE_UPDATED tree snapshot (a title/icon +// rename) from the standalone collab process to the API process, which is the +// single broadcast authority. Imported by both halves of the bridge: +// PageTreeBridgePublisher (collab process) and PageTreeBridgeSubscriber (API process). +export const COLLAB_TREE_UPDATE_CHANNEL = 'collab:tree-update'; diff --git a/apps/server/src/collaboration/listeners/page-tree-bridge.publisher.spec.ts b/apps/server/src/collaboration/listeners/page-tree-bridge.publisher.spec.ts new file mode 100644 index 00000000..71064eac --- /dev/null +++ b/apps/server/src/collaboration/listeners/page-tree-bridge.publisher.spec.ts @@ -0,0 +1,81 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { RedisService } from '@nestjs-labs/nestjs-ioredis'; +import { PageTreeBridgePublisher } from './page-tree-bridge.publisher'; +import { COLLAB_TREE_UPDATE_CHANNEL } from '../constants'; +import { + PageEvent, + TreeUpdateSnapshot, +} from '../../database/listeners/page.listener'; + +const treeUpdate: TreeUpdateSnapshot = { + id: 'page-1', + slugId: 'slug-1', + spaceId: 'space-1', + parentPageId: null, + title: 'Renamed', + icon: '🚀', +}; + +describe('PageTreeBridgePublisher', () => { + let publisher: PageTreeBridgePublisher; + let redis: { publish: jest.Mock }; + + beforeEach(async () => { + redis = { publish: jest.fn().mockResolvedValue(1) }; + const redisService = { getOrThrow: () => redis } as unknown as RedisService; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + PageTreeBridgePublisher, + { provide: RedisService, useValue: redisService }, + ], + }).compile(); + + publisher = module.get(PageTreeBridgePublisher); + }); + + it('WITH a `treeUpdate`: publishes the JSON snapshot on the channel', async () => { + const event: PageEvent = { + pageIds: ['page-1'], + workspaceId: 'ws-1', + treeUpdate, + }; + + await publisher.onPageUpdated(event); + + expect(redis.publish).toHaveBeenCalledTimes(1); + expect(redis.publish).toHaveBeenCalledWith( + COLLAB_TREE_UPDATE_CHANNEL, + JSON.stringify(treeUpdate), + ); + }); + + it('content-only save (NO `treeUpdate`): does NOT publish', async () => { + const event: PageEvent = { + pageIds: ['page-1'], + workspaceId: 'ws-1', + }; + + await publisher.onPageUpdated(event); + + expect(redis.publish).not.toHaveBeenCalled(); + }); + + it('a publish rejection is caught (no throw)', async () => { + redis.publish.mockRejectedValueOnce(new Error('redis down')); + const errorSpy = jest + .spyOn(publisher['logger'], 'error') + .mockImplementation(() => undefined); + + const event: PageEvent = { + pageIds: ['page-1'], + workspaceId: 'ws-1', + treeUpdate, + }; + + await expect(publisher.onPageUpdated(event)).resolves.toBeUndefined(); + expect(errorSpy).toHaveBeenCalledTimes(1); + + errorSpy.mockRestore(); + }); +}); diff --git a/apps/server/src/collaboration/listeners/page-tree-bridge.publisher.ts b/apps/server/src/collaboration/listeners/page-tree-bridge.publisher.ts new file mode 100644 index 00000000..468e2070 --- /dev/null +++ b/apps/server/src/collaboration/listeners/page-tree-bridge.publisher.ts @@ -0,0 +1,55 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; +import { RedisService } from '@nestjs-labs/nestjs-ioredis'; +import type { Redis } from 'ioredis'; +import { EventName } from '../../common/events/event.contants'; +import { PageEvent } from '../../database/listeners/page.listener'; +import { COLLAB_TREE_UPDATE_CHANNEL } from '../constants'; + +/** + * Collab-process half of the cross-process tree-update bridge. + * + * The standalone collab process bootstraps `CollabAppModule`, which does NOT + * import `WsModule`/`PageWsListener`. So when a collaborative title/icon rename + * persists and emits `EventName.PAGE_UPDATED` with a `treeUpdate` snapshot, there + * is no listener in this process to broadcast it — the live tree update would be + * lost for 2-process (COLLAB_URL set) deployments. + * + * This publisher fills that gap: it forwards the `treeUpdate` snapshot over a + * Redis pub/sub channel to the API process, which re-broadcasts it via + * `WsTreeService` (the single broadcast authority). + * + * It is registered ONLY in `CollabAppModule.providers`, so it never runs in the + * API process (where `PageWsListener` already broadcasts the same event locally). + * That module placement is what prevents a double broadcast. In single-process + * mode `CollabAppModule` is not loaded at all, so this publisher never runs. + */ +@Injectable() +export class PageTreeBridgePublisher { + private readonly logger = new Logger(PageTreeBridgePublisher.name); + private readonly redis: Redis; + + constructor(private readonly redisService: RedisService) { + this.redis = this.redisService.getOrThrow(); + } + + @OnEvent(EventName.PAGE_UPDATED) + async onPageUpdated(event: PageEvent): Promise { + // Mirror PageWsListener's gating: only title/icon changes carry a snapshot. + // Content-only saves leave `treeUpdate` undefined and are ignored. + if (!event.treeUpdate) return; + + try { + await this.redis.publish( + COLLAB_TREE_UPDATE_CHANNEL, + JSON.stringify(event.treeUpdate), + ); + } catch (err) { + // A Redis publish failure must not break the store path. + this.logger.error( + `Failed to publish tree update to ${COLLAB_TREE_UPDATE_CHANNEL}`, + err instanceof Error ? err.stack : String(err), + ); + } + } +} diff --git a/apps/server/src/collaboration/server/collab-app.module.ts b/apps/server/src/collaboration/server/collab-app.module.ts index aaa9ffba..e5aa43ec 100644 --- a/apps/server/src/collaboration/server/collab-app.module.ts +++ b/apps/server/src/collaboration/server/collab-app.module.ts @@ -20,6 +20,7 @@ import { CaslModule } from '../../core/casl/casl.module'; import { ThrottleModule } from '../../integrations/throttle/throttle.module'; import { CacheModule } from '@nestjs/cache-manager'; import KeyvRedis from '@keyv/redis'; +import { PageTreeBridgePublisher } from '../listeners/page-tree-bridge.publisher'; @Module({ imports: [ @@ -54,6 +55,6 @@ import KeyvRedis from '@keyv/redis'; ? [CollaborationController] : []), ], - providers: [AppService], + providers: [AppService, PageTreeBridgePublisher], }) export class CollabAppModule {} diff --git a/apps/server/src/ws/listeners/page-tree-bridge.subscriber.spec.ts b/apps/server/src/ws/listeners/page-tree-bridge.subscriber.spec.ts new file mode 100644 index 00000000..c1982ffc --- /dev/null +++ b/apps/server/src/ws/listeners/page-tree-bridge.subscriber.spec.ts @@ -0,0 +1,114 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { RedisService } from '@nestjs-labs/nestjs-ioredis'; +import { PageTreeBridgeSubscriber } from './page-tree-bridge.subscriber'; +import { WsTreeService } from '../ws-tree.service'; +import { COLLAB_TREE_UPDATE_CHANNEL } from '../../collaboration/constants'; +import { TreeUpdateSnapshot } from '../../database/listeners/page.listener'; + +const treeUpdate: TreeUpdateSnapshot = { + id: 'page-1', + slugId: 'slug-1', + spaceId: 'space-1', + parentPageId: null, + title: 'Renamed', + icon: '🚀', +}; + +describe('PageTreeBridgeSubscriber.onMessage', () => { + let subscriber: PageTreeBridgeSubscriber; + let wsTree: { broadcastPageUpdated: jest.Mock }; + + beforeEach(async () => { + wsTree = { + broadcastPageUpdated: jest.fn().mockResolvedValue(undefined), + }; + // onMessage is driven directly; no real redis connection is needed. + const redisService = { + getOrThrow: () => ({ duplicate: () => ({}) }), + } as unknown as RedisService; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + PageTreeBridgeSubscriber, + { provide: RedisService, useValue: redisService }, + { provide: WsTreeService, useValue: wsTree }, + ], + }).compile(); + + subscriber = module.get(PageTreeBridgeSubscriber); + }); + + it('valid JSON on the channel: broadcasts the parsed snapshot', async () => { + await subscriber.onMessage( + COLLAB_TREE_UPDATE_CHANNEL, + JSON.stringify(treeUpdate), + ); + + expect(wsTree.broadcastPageUpdated).toHaveBeenCalledTimes(1); + expect(wsTree.broadcastPageUpdated).toHaveBeenCalledWith(treeUpdate); + }); + + it('malformed JSON: does NOT broadcast and does not throw', async () => { + const warnSpy = jest + .spyOn(subscriber['logger'], 'warn') + .mockImplementation(() => undefined); + + await expect( + subscriber.onMessage(COLLAB_TREE_UPDATE_CHANNEL, '{not json'), + ).resolves.toBeUndefined(); + + expect(wsTree.broadcastPageUpdated).not.toHaveBeenCalled(); + expect(warnSpy).toHaveBeenCalledTimes(1); + + warnSpy.mockRestore(); + }); + + it('message on a different channel: ignored', async () => { + await subscriber.onMessage('some:other:channel', JSON.stringify(treeUpdate)); + + expect(wsTree.broadcastPageUpdated).not.toHaveBeenCalled(); + }); + + it('broadcast rejects: onMessage does not throw / produce unhandled rejection', async () => { + wsTree.broadcastPageUpdated.mockRejectedValueOnce(new Error('db down')); + const warnSpy = jest + .spyOn(subscriber['logger'], 'warn') + .mockImplementation(() => undefined); + + await expect( + subscriber.onMessage( + COLLAB_TREE_UPDATE_CHANNEL, + JSON.stringify(treeUpdate), + ), + ).resolves.toBeUndefined(); + + expect(wsTree.broadcastPageUpdated).toHaveBeenCalledTimes(1); + expect(warnSpy).toHaveBeenCalledTimes(1); + + warnSpy.mockRestore(); + }); + + it('onModuleInit when subscribe() rejects: resolves without throwing', async () => { + const sub = { + on: jest.fn(), + subscribe: jest.fn().mockRejectedValue(new Error('redis down')), + }; + const redisService = { + getOrThrow: () => ({ duplicate: () => sub }), + } as unknown as RedisService; + const local = new PageTreeBridgeSubscriber( + redisService, + wsTree as unknown as WsTreeService, + ); + const errorSpy = jest + .spyOn(local['logger'], 'error') + .mockImplementation(() => undefined); + + await expect(local.onModuleInit()).resolves.toBeUndefined(); + + expect(sub.subscribe).toHaveBeenCalledTimes(1); + expect(errorSpy).toHaveBeenCalledTimes(1); + + errorSpy.mockRestore(); + }); +}); diff --git a/apps/server/src/ws/listeners/page-tree-bridge.subscriber.ts b/apps/server/src/ws/listeners/page-tree-bridge.subscriber.ts new file mode 100644 index 00000000..f6efda33 --- /dev/null +++ b/apps/server/src/ws/listeners/page-tree-bridge.subscriber.ts @@ -0,0 +1,115 @@ +import { + Injectable, + Logger, + OnModuleDestroy, + OnModuleInit, +} from '@nestjs/common'; +import { RedisService } from '@nestjs-labs/nestjs-ioredis'; +import type { Redis } from 'ioredis'; +import { COLLAB_TREE_UPDATE_CHANNEL } from '../../collaboration/constants'; +import { TreeUpdateSnapshot } from '../../database/listeners/page.listener'; +import { WsTreeService } from '../ws-tree.service'; + +/** + * API-process half of the cross-process tree-update bridge. + * + * It subscribes to the Redis pub/sub channel that the collab process's + * `PageTreeBridgePublisher` publishes to and re-broadcasts each collab-originated + * `treeUpdate` snapshot through `WsTreeService`. This is what makes a + * collaborative rename reach other users' sidebars in 2-process (COLLAB_URL set) + * deployments. The API process is the single broadcast authority: + * `broadcastPageUpdated` routes through the restriction-aware `emitTreeEvent`, so + * this path stays authorization-safe. + * + * In single-process mode this subscriber still subscribes, but nobody publishes + * (the publisher lives only in `CollabAppModule`), so it stays idle and harmless. + * + * NOTE: this assumes a SINGLE API broadcaster. With multiple horizontally-scaled + * API replicas, every replica would receive the pub/sub message and re-broadcast, + * duplicating the client update (the Socket.IO Redis adapter already fans a single + * emit out to all replicas' clients). Scaling the API horizontally would require a + * consumer-group / leader-election scheme instead of fan-out pub/sub. That is out + * of scope for the current single-API deployment. + */ +@Injectable() +export class PageTreeBridgeSubscriber + implements OnModuleInit, OnModuleDestroy +{ + private readonly logger = new Logger(PageTreeBridgeSubscriber.name); + private sub?: Redis; + + constructor( + private readonly redisService: RedisService, + private readonly wsTree: WsTreeService, + ) {} + + async onModuleInit(): Promise { + // A connection in subscribe mode cannot run other commands, so use a + // dedicated duplicated client (mirrors RedisSyncExtension's `sub`). + this.sub = this.redisService.getOrThrow().duplicate(); + // ioredis connections emit 'error' on disconnect/reconnect; an EventEmitter + // 'error' with no listener THROWS and can crash the process. The bridge is + // optional, so just log and stay alive (mirrors RedisSyncExtension). + this.sub.on('error', (err) => + this.logger.warn(`tree-update subscriber redis error: ${err?.message}`), + ); + this.sub.on('message', (channel, message) => + this.onMessage(channel, message), + ); + // The bridge is optional for core API operation: if Redis is down at boot, + // subscribe() rejects — log and continue rather than crash API bootstrap. + try { + await this.sub.subscribe(COLLAB_TREE_UPDATE_CHANNEL); + } catch (err) { + this.logger.error( + `Failed to subscribe to ${COLLAB_TREE_UPDATE_CHANNEL}; cross-process tree updates disabled: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + } + } + + async onMessage(channel: string, message: string): Promise { + if (channel !== COLLAB_TREE_UPDATE_CHANNEL) return; + + let snapshot: TreeUpdateSnapshot; + try { + snapshot = JSON.parse(message) as TreeUpdateSnapshot; + } catch (err) { + // Malformed payload must never throw out of the message handler. + this.logger.warn( + `Dropping malformed tree update on ${COLLAB_TREE_UPDATE_CHANNEL}: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + return; + } + + // broadcastPageUpdated -> emitTreeEvent does a DB permission read that can + // reject. ioredis does not await this handler, so a rejection would become + // an unhandled promise rejection — swallow it (warn, never rethrow). + try { + await this.wsTree.broadcastPageUpdated(snapshot); + } catch (err) { + this.logger.warn( + `Failed to broadcast tree update for page ${snapshot.id}: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + } + } + + async onModuleDestroy(): Promise { + if (!this.sub) return; + try { + await this.sub.unsubscribe(COLLAB_TREE_UPDATE_CHANNEL); + await this.sub.quit(); + } catch (err) { + this.logger.warn( + `Failed to tear down tree-update subscriber: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + } + } +} diff --git a/apps/server/src/ws/ws.module.ts b/apps/server/src/ws/ws.module.ts index 400ee253..78cbdbaa 100644 --- a/apps/server/src/ws/ws.module.ts +++ b/apps/server/src/ws/ws.module.ts @@ -3,12 +3,19 @@ import { WsGateway } from './ws.gateway'; import { WsService } from './ws.service'; import { WsTreeService } from './ws-tree.service'; import { PageWsListener } from './listeners/page-ws.listener'; +import { PageTreeBridgeSubscriber } from './listeners/page-tree-bridge.subscriber'; import { TokenModule } from '../core/auth/token.module'; @Global() @Module({ imports: [TokenModule], - providers: [WsGateway, WsService, WsTreeService, PageWsListener], + providers: [ + WsGateway, + WsService, + WsTreeService, + PageWsListener, + PageTreeBridgeSubscriber, + ], exports: [WsGateway, WsService, WsTreeService], }) export class WsModule {}