Files
gitmost/apps/server/src/collaboration/collaboration.gateway.ts
T
claude code agent 227 8274720281 fix(server): close leaked redis sockets so e2e jest exits (#252)
The full-AppModule e2e (apps/server/test/app.e2e-spec.ts) passed but jest
never exited, burning CI to its timeout. Diagnosis (process._getActiveHandles
after app.close()) showed exactly two ioredis sockets to :6379 still open after
shutdown; everything else (BullMQ queues/workers, @nestjs/schedule intervals,
nestjs-ioredis, nestjs-kysely pg pool, @nestjs/cache-manager Keyv store,
hocuspocus pub/sub) already closes on app.close().

The two leaks were owned-but-never-closed clients:

1. ThrottleModule passed a pre-built `new Redis(...)` instance to
   ThrottlerStorageRedisService. With an instance, the lib sets
   disconnectRequired=false, so its onModuleDestroy never disconnects.
   Pass ioredis options instead so the service owns + disconnects the client.

2. CollaborationGateway created a source `new RedisClient(...)` that
   RedisSyncExtension only duplicates into pub/sub; the extension's onDestroy
   disconnects those duplicates but not the source. Keep a reference and
   disconnect it after the hocuspocus onDestroy hook in destroy().

Both are real lifecycle fixes (production shutdown is now clean too), so no
--forceExit is needed. Verified against real Postgres+Redis:
  - test:e2e (no forceExit, --runInBand) exits 0 in ~18s (was: hung forever)
  - --detectOpenHandles exits 0 with no open-handle report
  - active handles after app.close(): none
CI timeout-minutes safety nets left untouched.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 04:11:51 +03:00

200 lines
6.6 KiB
TypeScript

import { Hocuspocus } from '@hocuspocus/server';
import { IncomingMessage } from 'http';
import WebSocket from 'ws';
import { AuthenticationExtension } from './extensions/authentication.extension';
import { PersistenceExtension } from './extensions/persistence.extension';
import { Injectable } from '@nestjs/common';
import { EnvironmentService } from '../integrations/environment/environment.service';
import {
createRetryStrategy,
parseRedisUrl,
RedisConfig,
} from '../common/helpers';
import { LoggerExtension } from './extensions/logger.extension';
import {
RedisSyncExtension,
SerializedHTTPRequest,
} from './extensions/redis-sync';
import { WsSocketWrapper } from './extensions/redis-sync/ws-socket-wrapper';
import RedisClient from 'ioredis';
import { pack, unpack } from 'msgpackr';
import { nanoid } from 'nanoid';
import * as os from 'node:os';
import { CollabWsAdapter } from './adapter/collab-ws.adapter';
import {
CollaborationHandler,
CollabEventHandlers,
} from './collaboration.handler';
@Injectable()
export class CollaborationGateway {
private readonly hocuspocus: Hocuspocus;
private redisConfig: RedisConfig;
// @ts-ignore
private readonly redisSync: RedisSyncExtension<CollabEventHandlers> | null =
null;
// Source ioredis client that RedisSyncExtension duplicates into its pub/sub
// pair. The extension's onDestroy only disconnects those duplicates, so we
// keep a reference here and disconnect the source ourselves on shutdown
// (otherwise the socket leaks and jest never exits in e2e).
private redisClient: RedisClient | null = null;
private readonly withRedis: boolean;
constructor(
private authenticationExtension: AuthenticationExtension,
private persistenceExtension: PersistenceExtension,
private loggerExtension: LoggerExtension,
private environmentService: EnvironmentService,
private collabEventsService: CollaborationHandler,
) {
this.redisConfig = parseRedisUrl(this.environmentService.getRedisUrl());
this.withRedis = !this.environmentService.isCollabDisableRedis();
this.hocuspocus = new Hocuspocus({
debounce: 10000,
maxDebounce: 45000,
unloadImmediately: false,
extensions: [
this.authenticationExtension,
this.persistenceExtension,
this.loggerExtension,
],
});
if (this.withRedis) {
this.redisClient = new RedisClient({
host: this.redisConfig.host,
port: this.redisConfig.port,
password: this.redisConfig.password,
db: this.redisConfig.db,
family: this.redisConfig.family,
retryStrategy: createRetryStrategy(),
});
// @ts-ignore
this.redisSync = new RedisSyncExtension({
redis: this.redisClient,
serverId: `collab-${os?.hostname()}-${nanoid(10)}`,
prefix: 'collab',
pack,
unpack,
// @ts-ignore
customEvents: this.collabEventsService.getHandlers(this.hocuspocus),
});
this.hocuspocus.configuration.extensions.push(this.redisSync);
// @ts-ignore
this.redisSync.onConfigure({ instance: this.hocuspocus });
}
}
private serializeRequest(request: IncomingMessage): SerializedHTTPRequest {
return {
method: request.method ?? 'GET',
url: request.url ?? '/',
headers: {
'sec-websocket-key': request.headers['sec-websocket-key'] ?? '',
'sec-websocket-protocol':
request.headers['sec-websocket-protocol'] ?? '',
},
socket: { remoteAddress: request.socket?.remoteAddress ?? '' },
};
}
handleConnection(client: WebSocket, request: IncomingMessage): any {
if (this.redisSync) {
const serializedHTTPRequest = this.serializeRequest(request);
const socketId = serializedHTTPRequest.headers['sec-websocket-key'];
// Create wrapper socket that only receives events via emit()
// This prevents double-handling since Hocuspocus won't listen to raw WebSocket events
const wrappedSocket = new WsSocketWrapper(client);
// Route through RedisSync extension (this calls handleConnection internally)
this.redisSync.onSocketOpen(wrappedSocket as any, serializedHTTPRequest);
// Forward raw WebSocket messages to the extension
client.on('message', (data: ArrayBuffer) => {
this.redisSync!.onSocketMessage(
wrappedSocket as any,
serializedHTTPRequest,
data,
);
});
// Forward close events
client.on('close', (code: number, reason: Buffer) => {
this.redisSync!.onSocketClose(socketId, code, reason.buffer as ArrayBuffer);
});
// Forward pong events for keepalive
client.on('pong', (data: Buffer) => {
wrappedSocket.emit('pong', data);
});
} else {
// Fallback to direct Hocuspocus connection
this.hocuspocus.handleConnection(client, request);
}
}
getConnectionCount() {
return this.hocuspocus.getConnectionsCount();
}
getDocumentCount() {
return this.hocuspocus.getDocumentsCount();
}
handleYjsEvent<TName extends keyof CollabEventHandlers>(
eventName: TName,
documentName: string,
payload: Parameters<CollabEventHandlers[TName]>[1],
) {
return this.redisSync?.handleEvent(eventName, documentName, payload);
}
openDirectConnection(documentName: string, context?: any) {
return this.hocuspocus.openDirectConnection(documentName, context);
}
/*
*Can be used before calling openDirectConnection directly
*/
async lockDocument(documentName: string) {
return this.redisSync.lockDocument(documentName);
}
/*
*Releases a document lock and stops the interval that maintains it.
*/
async releaseLock(documentName: string) {
return this.redisSync.releaseLock(documentName);
}
async destroy(collabWsAdapter: CollabWsAdapter): Promise<void> {
// eslint-disable-next-line no-async-promise-executor
await new Promise(async (resolve) => {
try {
// Wait for all documents to unload
this.hocuspocus.configuration.extensions.push({
async afterUnloadDocument({ instance }) {
if (instance.getDocumentsCount() === 0) resolve('');
},
});
collabWsAdapter?.close();
if (this.hocuspocus.getDocumentsCount() === 0) resolve('');
this.hocuspocus.closeConnections();
} catch (error) {
console.error(error);
}
});
await this.hocuspocus.hooks('onDestroy', { instance: this.hocuspocus });
// RedisSyncExtension.onDestroy (run via the hook above) disconnects only the
// duplicated pub/sub clients; the source client created here is ours to close.
this.redisClient?.disconnect();
this.redisClient = null;
}
}