Files
gitmost/apps/server/src/collaboration/collaboration.gateway.ts
claude code agent 227 9933079de2 fix(offline-sync): keep page titles in sync between REST and Yjs
Title now lives in the page's Yjs 'title' fragment, but two paths corrupted it:

- Rename-revert: a REST/MCP title change wrote only the page.title column,
  never the Yjs fragment, so the next editor open replayed the stale Yjs title
  and reverted the rename. PageService.update now mirrors the new title into the
  Yjs 'title' fragment via CollaborationGateway.writePageTitle, which goes
  through openDirectConnection directly (Redis-independent: works with
  COLLAB_DISABLE_REDIS and in single-process deployments, unlike the
  Redis-routed handleYjsEvent path). The write is best-effort: a Yjs failure is
  logged and never rolls back the committed column write. Agent provenance
  (actor/aiChatId) is threaded into the store context.

- Untitled-on-open: an empty/just-initialized 'title' fragment clobbered a
  non-empty page.title to '' on open. onStoreDocument now treats the title as
  changed only when the extracted text is non-empty, covering both the
  title-only and body+title save branches. Empty-retitling via collab is
  intentionally impossible; the REST DTO is the place to enforce non-empty.

writeTitleFragment does a full clear+seed of the 'title' fragment (no
duplication/concatenation) and leaves the body fragment intact. Removed the dead
useTreeMutation.handleRename path. Adds unit tests for writeTitleFragment, the
gateway write, the anti-empty-clobber guard, and agent provenance.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-26 21:05:58 +03:00

230 lines
7.8 KiB
TypeScript

import { Hocuspocus } from '@hocuspocus/server';
import { IncomingMessage } from 'http';
import WebSocket from 'ws';
import { AuthenticationExtension } from './extensions/authentication.extension';
import { PersistenceExtension } from './extensions/persistence.extension';
import { Injectable } from '@nestjs/common';
import { EnvironmentService } from '../integrations/environment/environment.service';
import {
createRetryStrategy,
parseRedisUrl,
RedisConfig,
} from '../common/helpers';
import { LoggerExtension } from './extensions/logger.extension';
import {
RedisSyncExtension,
SerializedHTTPRequest,
} from './extensions/redis-sync';
import { WsSocketWrapper } from './extensions/redis-sync/ws-socket-wrapper';
import RedisClient from 'ioredis';
import { pack, unpack } from 'msgpackr';
import { nanoid } from 'nanoid';
import * as os from 'node:os';
import { CollabWsAdapter } from './adapter/collab-ws.adapter';
import {
CollaborationHandler,
CollabEventHandlers,
writeTitleFragment,
} from './collaboration.handler';
import { User } from '@docmost/db/types/entity.types';
@Injectable()
export class CollaborationGateway {
private readonly hocuspocus: Hocuspocus;
private redisConfig: RedisConfig;
// @ts-ignore
private readonly redisSync: RedisSyncExtension<CollabEventHandlers> | null =
null;
private readonly withRedis: boolean;
constructor(
private authenticationExtension: AuthenticationExtension,
private persistenceExtension: PersistenceExtension,
private loggerExtension: LoggerExtension,
private environmentService: EnvironmentService,
private collabEventsService: CollaborationHandler,
) {
this.redisConfig = parseRedisUrl(this.environmentService.getRedisUrl());
this.withRedis = !this.environmentService.isCollabDisableRedis();
this.hocuspocus = new Hocuspocus({
debounce: 10000,
maxDebounce: 45000,
unloadImmediately: false,
extensions: [
this.authenticationExtension,
this.persistenceExtension,
this.loggerExtension,
],
});
if (this.withRedis) {
// @ts-ignore
this.redisSync = new RedisSyncExtension({
redis: new RedisClient({
host: this.redisConfig.host,
port: this.redisConfig.port,
password: this.redisConfig.password,
db: this.redisConfig.db,
family: this.redisConfig.family,
retryStrategy: createRetryStrategy(),
}),
serverId: `collab-${os?.hostname()}-${nanoid(10)}`,
prefix: 'collab',
pack,
unpack,
// @ts-ignore
customEvents: this.collabEventsService.getHandlers(this.hocuspocus),
});
this.hocuspocus.configuration.extensions.push(this.redisSync);
// @ts-ignore
this.redisSync.onConfigure({ instance: this.hocuspocus });
}
}
private serializeRequest(request: IncomingMessage): SerializedHTTPRequest {
return {
method: request.method ?? 'GET',
url: request.url ?? '/',
headers: {
'sec-websocket-key': request.headers['sec-websocket-key'] ?? '',
'sec-websocket-protocol':
request.headers['sec-websocket-protocol'] ?? '',
},
socket: { remoteAddress: request.socket?.remoteAddress ?? '' },
};
}
handleConnection(client: WebSocket, request: IncomingMessage): any {
if (this.redisSync) {
const serializedHTTPRequest = this.serializeRequest(request);
const socketId = serializedHTTPRequest.headers['sec-websocket-key'];
// Create wrapper socket that only receives events via emit()
// This prevents double-handling since Hocuspocus won't listen to raw WebSocket events
const wrappedSocket = new WsSocketWrapper(client);
// Route through RedisSync extension (this calls handleConnection internally)
this.redisSync.onSocketOpen(wrappedSocket as any, serializedHTTPRequest);
// Forward raw WebSocket messages to the extension
client.on('message', (data: ArrayBuffer) => {
this.redisSync!.onSocketMessage(
wrappedSocket as any,
serializedHTTPRequest,
data,
);
});
// Forward close events
client.on('close', (code: number, reason: Buffer) => {
this.redisSync!.onSocketClose(socketId, code, reason.buffer as ArrayBuffer);
});
// Forward pong events for keepalive
client.on('pong', (data: Buffer) => {
wrappedSocket.emit('pong', data);
});
} else {
// Fallback to direct Hocuspocus connection
this.hocuspocus.handleConnection(client, request);
}
}
getConnectionCount() {
return this.hocuspocus.getConnectionsCount();
}
getDocumentCount() {
return this.hocuspocus.getDocumentsCount();
}
handleYjsEvent<TName extends keyof CollabEventHandlers>(
eventName: TName,
documentName: string,
payload: Parameters<CollabEventHandlers[TName]>[1],
) {
return this.redisSync?.handleEvent(eventName, documentName, payload);
}
openDirectConnection(documentName: string, context?: any) {
return this.hocuspocus.openDirectConnection(documentName, context);
}
/**
* Write a new page title INTO the page's Yjs 'title' fragment, Redis-INDEPENDENT.
*
* Unlike the Redis-routed `handleYjsEvent` path — which routes through
* `redisSync?.handleEvent` and SILENTLY no-ops when Redis is disabled
* (COLLAB_DISABLE_REDIS=true → redisSync === null) — this goes straight
* through the local Hocuspocus `openDirectConnection`. The title sync
* therefore works in BOTH single-process (no Redis) and Redis-clustered
* deployments.
*
* openDirectConnection loads the doc from persistence when no editor is
* connected, so this works whether or not an editor is currently open: the
* clear+reseed lands on the loaded doc and is persisted by onStoreDocument.
*
* Provenance: when the caller is the agent, the actor/aiChatId are threaded
* into the connection `context` so onStoreDocument sees `context.actor ===
* 'agent'` for the resulting title store (mirrors the body/REST path). The
* resulting title store is usually a no-op anyway — PageService already wrote
* the same title to the page.title column, so onStoreDocument's
* `titleText !== page.title` guard skips the column write — but we wire the
* context for correctness regardless.
*/
async writePageTitle(
pageId: string,
title: string,
context?: { user?: User; actor?: string; aiChatId?: string },
): Promise<void> {
const documentName = `page.${pageId}`;
const connection = await this.hocuspocus.openDirectConnection(
documentName,
context ?? {},
);
try {
await connection.transact((doc) => writeTitleFragment(doc, title));
} finally {
await connection.disconnect();
}
}
/*
*Can be used before calling openDirectConnection directly
*/
async lockDocument(documentName: string) {
return this.redisSync.lockDocument(documentName);
}
/*
*Releases a document lock and stops the interval that maintains it.
*/
async releaseLock(documentName: string) {
return this.redisSync.releaseLock(documentName);
}
async destroy(collabWsAdapter: CollabWsAdapter): Promise<void> {
// eslint-disable-next-line no-async-promise-executor
await new Promise(async (resolve) => {
try {
// Wait for all documents to unload
this.hocuspocus.configuration.extensions.push({
async afterUnloadDocument({ instance }) {
if (instance.getDocumentsCount() === 0) resolve('');
},
});
collabWsAdapter?.close();
if (this.hocuspocus.getDocumentsCount() === 0) resolve('');
this.hocuspocus.closeConnections();
} catch (error) {
console.error(error);
}
});
await this.hocuspocus.hooks('onDestroy', { instance: this.hocuspocus });
}
}