fix(offline-sync): bridge collaborative tree updates across processes via Redis
In 2-process deployments (COLLAB_URL set) the standalone collab process runs Hocuspocus onStoreDocument, which emits PAGE_UPDATED with a treeUpdate snapshot on a collaborative rename. But CollabAppModule has no WsModule, so PageWsListener (the broadcaster) only exists in the API process — the collab-originated tree update never reached clients, and other users' sidebars/breadcrumbs went stale. Bridge it over Redis pub/sub with the API process as the single broadcast authority: - PageTreeBridgePublisher (registered ONLY in CollabAppModule) listens for PAGE_UPDATED and, when a treeUpdate snapshot is present, publishes it to the collab:tree-update channel. Gated exactly like PageWsListener so content-only saves never publish noise. - PageTreeBridgeSubscriber (registered in WsModule, API process) subscribes on a dedicated duplicated connection and re-broadcasts each snapshot through WsTreeService.broadcastPageUpdated — the same restriction-aware emitTreeEvent path, so authorization is preserved. Double-broadcast is prevented by module placement: the publisher lives only in the standalone collab process's root module, so in single-process mode it is never loaded and the local PageWsListener stays the sole broadcaster. The bridge is optional and fail-safe: publish errors, malformed payloads, broadcast rejections, an unlistened 'error' on the subscriber connection, and a subscribe() failure at boot are all caught and logged, never crashing or blocking the process. NOTE: assumes a single API broadcaster; horizontal API scaling would need a consumer-group/leader-election instead of fan-out pub/sub. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
114
apps/server/src/ws/listeners/page-tree-bridge.subscriber.spec.ts
Normal file
114
apps/server/src/ws/listeners/page-tree-bridge.subscriber.spec.ts
Normal file
@@ -0,0 +1,114 @@
|
||||
import { Test, TestingModule } from '@nestjs/testing';
|
||||
import { RedisService } from '@nestjs-labs/nestjs-ioredis';
|
||||
import { PageTreeBridgeSubscriber } from './page-tree-bridge.subscriber';
|
||||
import { WsTreeService } from '../ws-tree.service';
|
||||
import { COLLAB_TREE_UPDATE_CHANNEL } from '../../collaboration/constants';
|
||||
import { TreeUpdateSnapshot } from '../../database/listeners/page.listener';
|
||||
|
||||
const treeUpdate: TreeUpdateSnapshot = {
|
||||
id: 'page-1',
|
||||
slugId: 'slug-1',
|
||||
spaceId: 'space-1',
|
||||
parentPageId: null,
|
||||
title: 'Renamed',
|
||||
icon: '🚀',
|
||||
};
|
||||
|
||||
describe('PageTreeBridgeSubscriber.onMessage', () => {
|
||||
let subscriber: PageTreeBridgeSubscriber;
|
||||
let wsTree: { broadcastPageUpdated: jest.Mock };
|
||||
|
||||
beforeEach(async () => {
|
||||
wsTree = {
|
||||
broadcastPageUpdated: jest.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
// onMessage is driven directly; no real redis connection is needed.
|
||||
const redisService = {
|
||||
getOrThrow: () => ({ duplicate: () => ({}) }),
|
||||
} as unknown as RedisService;
|
||||
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
PageTreeBridgeSubscriber,
|
||||
{ provide: RedisService, useValue: redisService },
|
||||
{ provide: WsTreeService, useValue: wsTree },
|
||||
],
|
||||
}).compile();
|
||||
|
||||
subscriber = module.get<PageTreeBridgeSubscriber>(PageTreeBridgeSubscriber);
|
||||
});
|
||||
|
||||
it('valid JSON on the channel: broadcasts the parsed snapshot', async () => {
|
||||
await subscriber.onMessage(
|
||||
COLLAB_TREE_UPDATE_CHANNEL,
|
||||
JSON.stringify(treeUpdate),
|
||||
);
|
||||
|
||||
expect(wsTree.broadcastPageUpdated).toHaveBeenCalledTimes(1);
|
||||
expect(wsTree.broadcastPageUpdated).toHaveBeenCalledWith(treeUpdate);
|
||||
});
|
||||
|
||||
it('malformed JSON: does NOT broadcast and does not throw', async () => {
|
||||
const warnSpy = jest
|
||||
.spyOn(subscriber['logger'], 'warn')
|
||||
.mockImplementation(() => undefined);
|
||||
|
||||
await expect(
|
||||
subscriber.onMessage(COLLAB_TREE_UPDATE_CHANNEL, '{not json'),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(wsTree.broadcastPageUpdated).not.toHaveBeenCalled();
|
||||
expect(warnSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
warnSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('message on a different channel: ignored', async () => {
|
||||
await subscriber.onMessage('some:other:channel', JSON.stringify(treeUpdate));
|
||||
|
||||
expect(wsTree.broadcastPageUpdated).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('broadcast rejects: onMessage does not throw / produce unhandled rejection', async () => {
|
||||
wsTree.broadcastPageUpdated.mockRejectedValueOnce(new Error('db down'));
|
||||
const warnSpy = jest
|
||||
.spyOn(subscriber['logger'], 'warn')
|
||||
.mockImplementation(() => undefined);
|
||||
|
||||
await expect(
|
||||
subscriber.onMessage(
|
||||
COLLAB_TREE_UPDATE_CHANNEL,
|
||||
JSON.stringify(treeUpdate),
|
||||
),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(wsTree.broadcastPageUpdated).toHaveBeenCalledTimes(1);
|
||||
expect(warnSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
warnSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('onModuleInit when subscribe() rejects: resolves without throwing', async () => {
|
||||
const sub = {
|
||||
on: jest.fn(),
|
||||
subscribe: jest.fn().mockRejectedValue(new Error('redis down')),
|
||||
};
|
||||
const redisService = {
|
||||
getOrThrow: () => ({ duplicate: () => sub }),
|
||||
} as unknown as RedisService;
|
||||
const local = new PageTreeBridgeSubscriber(
|
||||
redisService,
|
||||
wsTree as unknown as WsTreeService,
|
||||
);
|
||||
const errorSpy = jest
|
||||
.spyOn(local['logger'], 'error')
|
||||
.mockImplementation(() => undefined);
|
||||
|
||||
await expect(local.onModuleInit()).resolves.toBeUndefined();
|
||||
|
||||
expect(sub.subscribe).toHaveBeenCalledTimes(1);
|
||||
expect(errorSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
errorSpy.mockRestore();
|
||||
});
|
||||
});
|
||||
115
apps/server/src/ws/listeners/page-tree-bridge.subscriber.ts
Normal file
115
apps/server/src/ws/listeners/page-tree-bridge.subscriber.ts
Normal file
@@ -0,0 +1,115 @@
|
||||
import {
|
||||
Injectable,
|
||||
Logger,
|
||||
OnModuleDestroy,
|
||||
OnModuleInit,
|
||||
} from '@nestjs/common';
|
||||
import { RedisService } from '@nestjs-labs/nestjs-ioredis';
|
||||
import type { Redis } from 'ioredis';
|
||||
import { COLLAB_TREE_UPDATE_CHANNEL } from '../../collaboration/constants';
|
||||
import { TreeUpdateSnapshot } from '../../database/listeners/page.listener';
|
||||
import { WsTreeService } from '../ws-tree.service';
|
||||
|
||||
/**
|
||||
* API-process half of the cross-process tree-update bridge.
|
||||
*
|
||||
* It subscribes to the Redis pub/sub channel that the collab process's
|
||||
* `PageTreeBridgePublisher` publishes to and re-broadcasts each collab-originated
|
||||
* `treeUpdate` snapshot through `WsTreeService`. This is what makes a
|
||||
* collaborative rename reach other users' sidebars in 2-process (COLLAB_URL set)
|
||||
* deployments. The API process is the single broadcast authority:
|
||||
* `broadcastPageUpdated` routes through the restriction-aware `emitTreeEvent`, so
|
||||
* this path stays authorization-safe.
|
||||
*
|
||||
* In single-process mode this subscriber still subscribes, but nobody publishes
|
||||
* (the publisher lives only in `CollabAppModule`), so it stays idle and harmless.
|
||||
*
|
||||
* NOTE: this assumes a SINGLE API broadcaster. With multiple horizontally-scaled
|
||||
* API replicas, every replica would receive the pub/sub message and re-broadcast,
|
||||
* duplicating the client update (the Socket.IO Redis adapter already fans a single
|
||||
* emit out to all replicas' clients). Scaling the API horizontally would require a
|
||||
* consumer-group / leader-election scheme instead of fan-out pub/sub. That is out
|
||||
* of scope for the current single-API deployment.
|
||||
*/
|
||||
@Injectable()
|
||||
export class PageTreeBridgeSubscriber
|
||||
implements OnModuleInit, OnModuleDestroy
|
||||
{
|
||||
private readonly logger = new Logger(PageTreeBridgeSubscriber.name);
|
||||
private sub?: Redis;
|
||||
|
||||
constructor(
|
||||
private readonly redisService: RedisService,
|
||||
private readonly wsTree: WsTreeService,
|
||||
) {}
|
||||
|
||||
async onModuleInit(): Promise<void> {
|
||||
// A connection in subscribe mode cannot run other commands, so use a
|
||||
// dedicated duplicated client (mirrors RedisSyncExtension's `sub`).
|
||||
this.sub = this.redisService.getOrThrow().duplicate();
|
||||
// ioredis connections emit 'error' on disconnect/reconnect; an EventEmitter
|
||||
// 'error' with no listener THROWS and can crash the process. The bridge is
|
||||
// optional, so just log and stay alive (mirrors RedisSyncExtension).
|
||||
this.sub.on('error', (err) =>
|
||||
this.logger.warn(`tree-update subscriber redis error: ${err?.message}`),
|
||||
);
|
||||
this.sub.on('message', (channel, message) =>
|
||||
this.onMessage(channel, message),
|
||||
);
|
||||
// The bridge is optional for core API operation: if Redis is down at boot,
|
||||
// subscribe() rejects — log and continue rather than crash API bootstrap.
|
||||
try {
|
||||
await this.sub.subscribe(COLLAB_TREE_UPDATE_CHANNEL);
|
||||
} catch (err) {
|
||||
this.logger.error(
|
||||
`Failed to subscribe to ${COLLAB_TREE_UPDATE_CHANNEL}; cross-process tree updates disabled: ${
|
||||
err instanceof Error ? err.message : String(err)
|
||||
}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async onMessage(channel: string, message: string): Promise<void> {
|
||||
if (channel !== COLLAB_TREE_UPDATE_CHANNEL) return;
|
||||
|
||||
let snapshot: TreeUpdateSnapshot;
|
||||
try {
|
||||
snapshot = JSON.parse(message) as TreeUpdateSnapshot;
|
||||
} catch (err) {
|
||||
// Malformed payload must never throw out of the message handler.
|
||||
this.logger.warn(
|
||||
`Dropping malformed tree update on ${COLLAB_TREE_UPDATE_CHANNEL}: ${
|
||||
err instanceof Error ? err.message : String(err)
|
||||
}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// broadcastPageUpdated -> emitTreeEvent does a DB permission read that can
|
||||
// reject. ioredis does not await this handler, so a rejection would become
|
||||
// an unhandled promise rejection — swallow it (warn, never rethrow).
|
||||
try {
|
||||
await this.wsTree.broadcastPageUpdated(snapshot);
|
||||
} catch (err) {
|
||||
this.logger.warn(
|
||||
`Failed to broadcast tree update for page ${snapshot.id}: ${
|
||||
err instanceof Error ? err.message : String(err)
|
||||
}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async onModuleDestroy(): Promise<void> {
|
||||
if (!this.sub) return;
|
||||
try {
|
||||
await this.sub.unsubscribe(COLLAB_TREE_UPDATE_CHANNEL);
|
||||
await this.sub.quit();
|
||||
} catch (err) {
|
||||
this.logger.warn(
|
||||
`Failed to tear down tree-update subscriber: ${
|
||||
err instanceof Error ? err.message : String(err)
|
||||
}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,12 +3,19 @@ import { WsGateway } from './ws.gateway';
|
||||
import { WsService } from './ws.service';
|
||||
import { WsTreeService } from './ws-tree.service';
|
||||
import { PageWsListener } from './listeners/page-ws.listener';
|
||||
import { PageTreeBridgeSubscriber } from './listeners/page-tree-bridge.subscriber';
|
||||
import { TokenModule } from '../core/auth/token.module';
|
||||
|
||||
@Global()
|
||||
@Module({
|
||||
imports: [TokenModule],
|
||||
providers: [WsGateway, WsService, WsTreeService, PageWsListener],
|
||||
providers: [
|
||||
WsGateway,
|
||||
WsService,
|
||||
WsTreeService,
|
||||
PageWsListener,
|
||||
PageTreeBridgeSubscriber,
|
||||
],
|
||||
exports: [WsGateway, WsService, WsTreeService],
|
||||
})
|
||||
export class WsModule {}
|
||||
|
||||
Reference in New Issue
Block a user