Files
gitmost/apps/server/src/collaboration/collaboration.gateway.ts
claude code agent 227 2e7e07bb65 fix(git-sync): don't clobber pages with a live editing session; crash-safe body write
Review finding #5: the git -> page body write (writeBody) did a full-body replace
(delete-all + re-insert) on the shared Yjs doc. Applied while a human is editing
the page, it discarded their in-flight changes; and TiptapTransformer.toYdoc ran
AFTER the fragment was cleared, so a conversion failure could leave the page with
an empty body.

Fixes:
- Active-session guard: CollaborationGateway.getActiveEditorCount(documentName)
  reports live human (websocket) editor sessions for a doc, excluding server-side
  direct connections. writeBody now throws ActiveEditSessionError when an editor
  is connected. The engine's push loop already isolates each importPageMarkdown in
  try/catch and does not advance the loop-guard on failure, so the write is simply
  retried on the next poll once the editor disconnects — never a clobber.
- Crash-safe conversion: build the replacement Yjs update BEFORE opening the
  connection / clearing the fragment, so a transform failure can never leave the
  body empty.

Also updates the server-side converter gate spec to the corrected round-trip
shape: the block-image hoist no longer leaves a leading empty paragraph (the
git-sync converter fix in 7d39c16b, now reaching the built package).

A true merge of git content into a live Yjs session is out of scope (it needs a
real 3-way text merge with no shared update lineage); deferring the write while a
page is being edited is the safe, owner-approved minimum.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-26 20:38:46 +03:00

204 lines
6.8 KiB
TypeScript

import { Hocuspocus } from '@hocuspocus/server';
import { IncomingMessage } from 'http';
import WebSocket from 'ws';
import { AuthenticationExtension } from './extensions/authentication.extension';
import { PersistenceExtension } from './extensions/persistence.extension';
import { Injectable } from '@nestjs/common';
import { EnvironmentService } from '../integrations/environment/environment.service';
import {
createRetryStrategy,
parseRedisUrl,
RedisConfig,
} from '../common/helpers';
import { LoggerExtension } from './extensions/logger.extension';
import {
RedisSyncExtension,
SerializedHTTPRequest,
} from './extensions/redis-sync';
import { WsSocketWrapper } from './extensions/redis-sync/ws-socket-wrapper';
import RedisClient from 'ioredis';
import { pack, unpack } from 'msgpackr';
import { nanoid } from 'nanoid';
import * as os from 'node:os';
import { CollabWsAdapter } from './adapter/collab-ws.adapter';
import {
CollaborationHandler,
CollabEventHandlers,
} from './collaboration.handler';
@Injectable()
export class CollaborationGateway {
private readonly hocuspocus: Hocuspocus;
private redisConfig: RedisConfig;
// @ts-ignore
private readonly redisSync: RedisSyncExtension<CollabEventHandlers> | null =
null;
private readonly withRedis: boolean;
constructor(
private authenticationExtension: AuthenticationExtension,
private persistenceExtension: PersistenceExtension,
private loggerExtension: LoggerExtension,
private environmentService: EnvironmentService,
private collabEventsService: CollaborationHandler,
) {
this.redisConfig = parseRedisUrl(this.environmentService.getRedisUrl());
this.withRedis = !this.environmentService.isCollabDisableRedis();
this.hocuspocus = new Hocuspocus({
debounce: 10000,
maxDebounce: 45000,
unloadImmediately: false,
extensions: [
this.authenticationExtension,
this.persistenceExtension,
this.loggerExtension,
],
});
if (this.withRedis) {
// @ts-ignore
this.redisSync = new RedisSyncExtension({
redis: new RedisClient({
host: this.redisConfig.host,
port: this.redisConfig.port,
password: this.redisConfig.password,
db: this.redisConfig.db,
family: this.redisConfig.family,
retryStrategy: createRetryStrategy(),
}),
serverId: `collab-${os?.hostname()}-${nanoid(10)}`,
prefix: 'collab',
pack,
unpack,
// @ts-ignore
customEvents: this.collabEventsService.getHandlers(this.hocuspocus),
});
this.hocuspocus.configuration.extensions.push(this.redisSync);
// @ts-ignore
this.redisSync.onConfigure({ instance: this.hocuspocus });
}
}
private serializeRequest(request: IncomingMessage): SerializedHTTPRequest {
return {
method: request.method ?? 'GET',
url: request.url ?? '/',
headers: {
'sec-websocket-key': request.headers['sec-websocket-key'] ?? '',
'sec-websocket-protocol':
request.headers['sec-websocket-protocol'] ?? '',
},
socket: { remoteAddress: request.socket?.remoteAddress ?? '' },
};
}
handleConnection(client: WebSocket, request: IncomingMessage): any {
if (this.redisSync) {
const serializedHTTPRequest = this.serializeRequest(request);
const socketId = serializedHTTPRequest.headers['sec-websocket-key'];
// Create wrapper socket that only receives events via emit()
// This prevents double-handling since Hocuspocus won't listen to raw WebSocket events
const wrappedSocket = new WsSocketWrapper(client);
// Route through RedisSync extension (this calls handleConnection internally)
this.redisSync.onSocketOpen(wrappedSocket as any, serializedHTTPRequest);
// Forward raw WebSocket messages to the extension
client.on('message', (data: ArrayBuffer) => {
this.redisSync!.onSocketMessage(
wrappedSocket as any,
serializedHTTPRequest,
data,
);
});
// Forward close events
client.on('close', (code: number, reason: Buffer) => {
this.redisSync!.onSocketClose(socketId, code, reason.buffer as ArrayBuffer);
});
// Forward pong events for keepalive
client.on('pong', (data: Buffer) => {
wrappedSocket.emit('pong', data);
});
} else {
// Fallback to direct Hocuspocus connection
this.hocuspocus.handleConnection(client, request);
}
}
getConnectionCount() {
return this.hocuspocus.getConnectionsCount();
}
getDocumentCount() {
return this.hocuspocus.getDocumentsCount();
}
/**
* Number of LIVE human editor sessions (websocket connections) currently open
* on a document, or 0 if the document is not loaded. Unlike
* `Document.getConnectionsCount()` this deliberately excludes server-side
* direct connections (`directConnectionsCount`, e.g. the git-sync writer
* itself), so callers can tell whether a real person is editing right now.
*
* NOTE: this reflects only THIS instance. In a Redis-clustered deployment an
* editor attached to another node is not counted; for the single-instance
* deployments this guards (git-sync) that is exactly the live set.
*/
getActiveEditorCount(documentName: string): number {
return this.hocuspocus.documents.get(documentName)?.connections.size ?? 0;
}
handleYjsEvent<TName extends keyof CollabEventHandlers>(
eventName: TName,
documentName: string,
payload: Parameters<CollabEventHandlers[TName]>[1],
) {
return this.redisSync?.handleEvent(eventName, documentName, payload);
}
openDirectConnection(documentName: string, context?: any) {
return this.hocuspocus.openDirectConnection(documentName, context);
}
/*
*Can be used before calling openDirectConnection directly
*/
async lockDocument(documentName: string) {
return this.redisSync.lockDocument(documentName);
}
/*
*Releases a document lock and stops the interval that maintains it.
*/
async releaseLock(documentName: string) {
return this.redisSync.releaseLock(documentName);
}
async destroy(collabWsAdapter: CollabWsAdapter): Promise<void> {
// eslint-disable-next-line no-async-promise-executor
await new Promise(async (resolve) => {
try {
// Wait for all documents to unload
this.hocuspocus.configuration.extensions.push({
async afterUnloadDocument({ instance }) {
if (instance.getDocumentsCount() === 0) resolve('');
},
});
collabWsAdapter?.close();
if (this.hocuspocus.getDocumentsCount() === 0) resolve('');
this.hocuspocus.closeConnections();
} catch (error) {
console.error(error);
}
});
await this.hocuspocus.hooks('onDestroy', { instance: this.hocuspocus });
}
}