Review finding #5: the git -> page body write (writeBody) did a full-body replace
(delete-all + re-insert) on the shared Yjs doc. Applied while a human is editing
the page, it discarded their in-flight changes; and TiptapTransformer.toYdoc ran
AFTER the fragment was cleared, so a conversion failure could leave the page with
an empty body.
Fixes:
- Active-session guard: CollaborationGateway.getActiveEditorCount(documentName)
reports live human (websocket) editor sessions for a doc, excluding server-side
direct connections. writeBody now throws ActiveEditSessionError when an editor
is connected. The engine's push loop already isolates each importPageMarkdown in
try/catch and does not advance the loop-guard on failure, so the write is simply
retried on the next poll once the editor disconnects — never a clobber.
- Crash-safe conversion: build the replacement Yjs update BEFORE opening the
connection / clearing the fragment, so a transform failure can never leave the
body empty.
Also updates the server-side converter gate spec to the corrected round-trip
shape: the block-image hoist no longer leaves a leading empty paragraph (the
git-sync converter fix in 7d39c16b, now reaching the built package).
A true merge of git content into a live Yjs session is out of scope (it needs a
real 3-way text merge with no shared update lineage); deferring the write while a
page is being edited is the safe, owner-approved minimum.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
204 lines
6.8 KiB
TypeScript
204 lines
6.8 KiB
TypeScript
import { Hocuspocus } from '@hocuspocus/server';
|
|
import { IncomingMessage } from 'http';
|
|
import WebSocket from 'ws';
|
|
import { AuthenticationExtension } from './extensions/authentication.extension';
|
|
import { PersistenceExtension } from './extensions/persistence.extension';
|
|
import { Injectable } from '@nestjs/common';
|
|
import { EnvironmentService } from '../integrations/environment/environment.service';
|
|
import {
|
|
createRetryStrategy,
|
|
parseRedisUrl,
|
|
RedisConfig,
|
|
} from '../common/helpers';
|
|
import { LoggerExtension } from './extensions/logger.extension';
|
|
import {
|
|
RedisSyncExtension,
|
|
SerializedHTTPRequest,
|
|
} from './extensions/redis-sync';
|
|
import { WsSocketWrapper } from './extensions/redis-sync/ws-socket-wrapper';
|
|
import RedisClient from 'ioredis';
|
|
import { pack, unpack } from 'msgpackr';
|
|
import { nanoid } from 'nanoid';
|
|
import * as os from 'node:os';
|
|
import { CollabWsAdapter } from './adapter/collab-ws.adapter';
|
|
import {
|
|
CollaborationHandler,
|
|
CollabEventHandlers,
|
|
} from './collaboration.handler';
|
|
|
|
@Injectable()
|
|
export class CollaborationGateway {
|
|
private readonly hocuspocus: Hocuspocus;
|
|
private redisConfig: RedisConfig;
|
|
// @ts-ignore
|
|
private readonly redisSync: RedisSyncExtension<CollabEventHandlers> | null =
|
|
null;
|
|
private readonly withRedis: boolean;
|
|
|
|
constructor(
|
|
private authenticationExtension: AuthenticationExtension,
|
|
private persistenceExtension: PersistenceExtension,
|
|
private loggerExtension: LoggerExtension,
|
|
private environmentService: EnvironmentService,
|
|
private collabEventsService: CollaborationHandler,
|
|
) {
|
|
this.redisConfig = parseRedisUrl(this.environmentService.getRedisUrl());
|
|
this.withRedis = !this.environmentService.isCollabDisableRedis();
|
|
|
|
this.hocuspocus = new Hocuspocus({
|
|
debounce: 10000,
|
|
maxDebounce: 45000,
|
|
unloadImmediately: false,
|
|
extensions: [
|
|
this.authenticationExtension,
|
|
this.persistenceExtension,
|
|
this.loggerExtension,
|
|
],
|
|
});
|
|
|
|
if (this.withRedis) {
|
|
// @ts-ignore
|
|
this.redisSync = new RedisSyncExtension({
|
|
redis: new RedisClient({
|
|
host: this.redisConfig.host,
|
|
port: this.redisConfig.port,
|
|
password: this.redisConfig.password,
|
|
db: this.redisConfig.db,
|
|
family: this.redisConfig.family,
|
|
retryStrategy: createRetryStrategy(),
|
|
}),
|
|
serverId: `collab-${os?.hostname()}-${nanoid(10)}`,
|
|
prefix: 'collab',
|
|
pack,
|
|
unpack,
|
|
// @ts-ignore
|
|
customEvents: this.collabEventsService.getHandlers(this.hocuspocus),
|
|
});
|
|
this.hocuspocus.configuration.extensions.push(this.redisSync);
|
|
// @ts-ignore
|
|
this.redisSync.onConfigure({ instance: this.hocuspocus });
|
|
}
|
|
}
|
|
|
|
private serializeRequest(request: IncomingMessage): SerializedHTTPRequest {
|
|
return {
|
|
method: request.method ?? 'GET',
|
|
url: request.url ?? '/',
|
|
headers: {
|
|
'sec-websocket-key': request.headers['sec-websocket-key'] ?? '',
|
|
'sec-websocket-protocol':
|
|
request.headers['sec-websocket-protocol'] ?? '',
|
|
},
|
|
socket: { remoteAddress: request.socket?.remoteAddress ?? '' },
|
|
};
|
|
}
|
|
|
|
handleConnection(client: WebSocket, request: IncomingMessage): any {
|
|
if (this.redisSync) {
|
|
const serializedHTTPRequest = this.serializeRequest(request);
|
|
const socketId = serializedHTTPRequest.headers['sec-websocket-key'];
|
|
|
|
// Create wrapper socket that only receives events via emit()
|
|
// This prevents double-handling since Hocuspocus won't listen to raw WebSocket events
|
|
const wrappedSocket = new WsSocketWrapper(client);
|
|
|
|
// Route through RedisSync extension (this calls handleConnection internally)
|
|
this.redisSync.onSocketOpen(wrappedSocket as any, serializedHTTPRequest);
|
|
|
|
// Forward raw WebSocket messages to the extension
|
|
client.on('message', (data: ArrayBuffer) => {
|
|
this.redisSync!.onSocketMessage(
|
|
wrappedSocket as any,
|
|
serializedHTTPRequest,
|
|
data,
|
|
);
|
|
});
|
|
|
|
// Forward close events
|
|
client.on('close', (code: number, reason: Buffer) => {
|
|
this.redisSync!.onSocketClose(socketId, code, reason.buffer as ArrayBuffer);
|
|
});
|
|
|
|
// Forward pong events for keepalive
|
|
client.on('pong', (data: Buffer) => {
|
|
wrappedSocket.emit('pong', data);
|
|
});
|
|
} else {
|
|
// Fallback to direct Hocuspocus connection
|
|
this.hocuspocus.handleConnection(client, request);
|
|
}
|
|
}
|
|
|
|
getConnectionCount() {
|
|
return this.hocuspocus.getConnectionsCount();
|
|
}
|
|
|
|
getDocumentCount() {
|
|
return this.hocuspocus.getDocumentsCount();
|
|
}
|
|
|
|
/**
|
|
* Number of LIVE human editor sessions (websocket connections) currently open
|
|
* on a document, or 0 if the document is not loaded. Unlike
|
|
* `Document.getConnectionsCount()` this deliberately excludes server-side
|
|
* direct connections (`directConnectionsCount`, e.g. the git-sync writer
|
|
* itself), so callers can tell whether a real person is editing right now.
|
|
*
|
|
* NOTE: this reflects only THIS instance. In a Redis-clustered deployment an
|
|
* editor attached to another node is not counted; for the single-instance
|
|
* deployments this guards (git-sync) that is exactly the live set.
|
|
*/
|
|
getActiveEditorCount(documentName: string): number {
|
|
return this.hocuspocus.documents.get(documentName)?.connections.size ?? 0;
|
|
}
|
|
|
|
handleYjsEvent<TName extends keyof CollabEventHandlers>(
|
|
eventName: TName,
|
|
documentName: string,
|
|
payload: Parameters<CollabEventHandlers[TName]>[1],
|
|
) {
|
|
return this.redisSync?.handleEvent(eventName, documentName, payload);
|
|
}
|
|
|
|
openDirectConnection(documentName: string, context?: any) {
|
|
return this.hocuspocus.openDirectConnection(documentName, context);
|
|
}
|
|
|
|
/*
|
|
*Can be used before calling openDirectConnection directly
|
|
*/
|
|
async lockDocument(documentName: string) {
|
|
return this.redisSync.lockDocument(documentName);
|
|
}
|
|
|
|
/*
|
|
*Releases a document lock and stops the interval that maintains it.
|
|
*/
|
|
async releaseLock(documentName: string) {
|
|
return this.redisSync.releaseLock(documentName);
|
|
}
|
|
|
|
async destroy(collabWsAdapter: CollabWsAdapter): Promise<void> {
|
|
// eslint-disable-next-line no-async-promise-executor
|
|
await new Promise(async (resolve) => {
|
|
try {
|
|
// Wait for all documents to unload
|
|
this.hocuspocus.configuration.extensions.push({
|
|
async afterUnloadDocument({ instance }) {
|
|
if (instance.getDocumentsCount() === 0) resolve('');
|
|
},
|
|
});
|
|
|
|
collabWsAdapter?.close();
|
|
|
|
if (this.hocuspocus.getDocumentsCount() === 0) resolve('');
|
|
this.hocuspocus.closeConnections();
|
|
} catch (error) {
|
|
console.error(error);
|
|
}
|
|
});
|
|
|
|
await this.hocuspocus.hooks('onDestroy', { instance: this.hocuspocus });
|
|
}
|
|
}
|