# NOTE(review): this patch was recovered from a copy whose newlines had been
# collapsed and whose `<...>` generic arguments had been stripped. The line
# structure below is re-derived from the +/- markers and agrees with every hunk
# header count. Generic arguments (Map<string, string>, Set<string>,
# Record<string, ...>, PromiseWithResolvers<any>, Extract<keyof TCE, string>,
# Promise<ReturnType<TCE[TName]>>) are reconstructions, i.e. assumptions --
# confirm the removed and context lines against the target files before applying.
# NOTE(review): the customEventTTL timer created in handleEvent is still never
# cleared once a reply settles the promise; it now merely no-ops when it fires.
# Consider keeping the handle next to resolve/reject and calling clearTimeout in
# the customEventComplete branch.
# NOTE(review): the spec's flush() awaits microtasks only, so the `unhandled`
# array presumably cannot be populated before it is asserted on -- verify, and
# yield a real macrotask first if that assertion is meant to be load-bearing.
diff --git a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.spec.ts b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.spec.ts
new file mode 100644
index 00000000..2a6ffbc8
--- /dev/null
+++ b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.spec.ts
@@ -0,0 +1,208 @@
+// Regression coverage for the custom-event request/reply protocol in the
+// RedisSyncExtension. git-sync routes its body write through a custom event
+// (`gitSyncWriteBody`) which, when the target doc is owned by a DIFFERENT collab
+// instance, runs REMOTELY inside `handleRedisMessage` on the owning instance. The
+// remote handler can THROW (markdown->ProseMirror transform on a malformed body).
+//
+// Before the fix the throw was uncaught: (1) no `customEventComplete` reply was
+// published, so the origin's awaiting promise only rejected after `customEventTTL`
+// (~30s) as a generic 'TIMEOUT', and (2) an unhandledRejection escaped the async
+// `messageBuffer` listener on the owning instance. These tests assert the throw is
+// turned into an error-carrying reply that rejects the origin PROMPTLY with the
+// real message, with the no-throw and local paths unchanged.
+
+import { RedisSyncExtension } from './redis-sync.extension';
+
+type Listener = (channel: Buffer, message: Buffer) => unknown;
+
+// Minimal in-memory pub/sub + lock store shared across FakeRedis duplicates,
+// modelling the two-instance topology (origin + owner) over one Redis.
+class FakeRedisBus {
+  instances: FakeRedis[] = [];
+  locks = new Map<string, string>();
+  published: { channel: string; message: Buffer }[] = [];
+
+  register(inst: FakeRedis) {
+    this.instances.push(inst);
+  }
+
+  publish(channel: string, message: Buffer) {
+    this.published.push({ channel, message });
+    for (const inst of this.instances) {
+      if (!inst.subscribed.has(channel)) continue;
+      for (const listener of inst.messageListeners) {
+        // ioredis delivers async; `void` mirrors the production listener
+        // registration (`sub.on('messageBuffer', ...)`), whose rejection would
+        // surface as an unhandledRejection if the handler did not catch.
+        void listener(Buffer.from(channel), message);
+      }
+    }
+  }
+}
+
+class FakeRedis {
+  subscribed = new Set<string>();
+  messageListeners: Listener[] = [];
+
+  constructor(private bus: FakeRedisBus) {
+    bus.register(this);
+  }
+
+  duplicate() {
+    return new FakeRedis(this.bus);
+  }
+
+  subscribe(...channels: string[]) {
+    for (const c of channels) this.subscribed.add(c);
+    return Promise.resolve();
+  }
+
+  on(event: string, cb: any) {
+    if (event === 'messageBuffer') this.messageListeners.push(cb as Listener);
+    return this;
+  }
+
+  publish(channel: string, message: Buffer) {
+    this.bus.publish(channel, message);
+    return Promise.resolve(1);
+  }
+
+  // Models `SET key val PX ttl NX GET`: only writes when absent (NX); returns the
+  // previous value (GET) so the origin observes the owner already holding the lock.
+  set(key: string, val: string, ...args: any[]) {
+    const hasNX = args.includes('NX');
+    const hasGET = args.includes('GET');
+    const old = this.bus.locks.get(key) ?? null;
+    if (!hasNX || old === null) this.bus.locks.set(key, val);
+    return Promise.resolve(hasGET ? old : 'OK');
+  }
+
+  del(key: string) {
+    this.bus.locks.delete(key);
+    return Promise.resolve(1);
+  }
+
+  disconnect() {}
+}
+
+const pack = (m: any) => Buffer.from(JSON.stringify(m));
+const unpack = (b: Buffer) => JSON.parse(b.toString());
+
+function makeExtension(
+  bus: FakeRedisBus,
+  serverId: string,
+  customEvents: Record<string, (...args: any[]) => Promise<unknown>>,
+) {
+  const ext = new RedisSyncExtension({
+    redis: new FakeRedis(bus) as any,
+    pack: pack as any,
+    unpack: unpack as any,
+    serverId,
+    customEvents: customEvents as any,
+    customEventTTL: 30_000,
+  });
+  // Doc is NOT loaded on this instance -> handleEvent takes the remote/proxy path.
+  (ext as any).instance = { documents: new Map() };
+  return ext;
+}
+
+describe('RedisSyncExtension custom-event error propagation', () => {
+  let unhandled: unknown[];
+  let onUnhandled: (e: unknown) => void;
+
+  beforeEach(() => {
+    // Fake timers so the 30s TTL fallback timer never fires (and never dangles).
+    jest.useFakeTimers();
+    unhandled = [];
+    onUnhandled = (e) => unhandled.push(e);
+    process.on('unhandledRejection', onUnhandled);
+  });
+
+  afterEach(() => {
+    process.off('unhandledRejection', onUnhandled);
+    jest.useRealTimers();
+  });
+
+  const flush = async () => {
+    for (let i = 0; i < 10; i++) await Promise.resolve();
+  };
+
+  it('owner publishes an error-carrying reply (no unhandledRejection) when the remote handler throws', async () => {
+    const bus = new FakeRedisBus();
+    const owner = makeExtension(bus, 'owner', {
+      boom: async () => {
+        throw new Error('kaboom');
+      },
+    });
+
+    // Drive the remote branch directly, as if the origin's customEventStart arrived.
+    await (owner as any).handleRedisMessage(
+      Buffer.from('collabMsg:owner'),
+      pack({
+        type: 'customEventStart',
+        documentName: 'page.x',
+        eventName: 'boom',
+        payload: {},
+        replyTo: 'collabMsg:origin',
+        replyId: 7,
+      }),
+    );
+    await flush();
+
+    const replies = bus.published
+      .filter((p) => p.channel === 'collabMsg:origin')
+      .map((p) => unpack(p.message));
+    expect(replies).toHaveLength(1);
+    expect(replies[0]).toMatchObject({
+      type: 'customEventComplete',
+      replyId: 7,
+      error: 'kaboom',
+    });
+    expect(unhandled).toHaveLength(0);
+  });
+
+  it('origin rejects PROMPTLY with the real error (not a TTL TIMEOUT) when the remote handler throws', async () => {
+    const bus = new FakeRedisBus();
+    // Owner already holds the document lock.
+    bus.locks.set('collabLock:page.x', 'owner');
+    makeExtension(bus, 'owner', {
+      boom: async () => {
+        throw new Error('kaboom');
+      },
+    });
+    const origin = makeExtension(bus, 'origin', {
+      boom: async () => undefined,
+    });
+
+    const promise = (origin as any).handleEvent('boom', 'page.x', { foo: 1 });
+    // Attach a catch immediately so a rejection is never momentarily unhandled.
+    const settled = promise.then(
+      () => ({ ok: true as const }),
+      (e: unknown) => ({ ok: false as const, error: e }),
+    );
+
+    await flush();
+    // Resolves WITHOUT advancing any timer -> the 30s TIMEOUT fallback did not fire.
+    const result = await settled;
+    expect(result.ok).toBe(false);
+    expect((result as any).error).toBeInstanceOf(Error);
+    expect(((result as any).error as Error).message).toBe('kaboom');
+    expect(unhandled).toHaveLength(0);
+  });
+
+  it('origin resolves with the payload when the remote handler succeeds (unchanged behavior)', async () => {
+    const bus = new FakeRedisBus();
+    bus.locks.set('collabLock:page.x', 'owner');
+    makeExtension(bus, 'owner', {
+      ok: async (_doc: string, payload: any) => ({ echoed: payload }),
+    });
+    const origin = makeExtension(bus, 'origin', {
+      ok: async () => undefined,
+    });
+
+    const promise = (origin as any).handleEvent('ok', 'page.x', { foo: 1 });
+    await flush();
+    await expect(promise).resolves.toEqual({ echoed: { foo: 1 } });
+    expect(unhandled).toHaveLength(0);
+  });
+});
diff --git a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts
index 38747465..c2b7e743 100644
--- a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts
+++ b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.extension.ts
@@ -51,9 +51,15 @@ export class RedisSyncExtension implements Extension {
   private instance!: Hocuspocus;
   private readonly customEvents: TCE;
   private replyIdCounter: number = 0;
-  // @ts-ignore
-  private pendingReplies: Record<number, PromiseWithResolvers<any>['resolve']> =
-    {};
+  private pendingReplies: Record<
+    number,
+    {
+      // @ts-ignore
+      resolve: PromiseWithResolvers<any>['resolve'];
+      // @ts-ignore
+      reject: PromiseWithResolvers<any>['reject'];
+    }
+  > = {};
 
   constructor(configuration: Configuration) {
     const {
@@ -176,25 +182,45 @@ export class RedisSyncExtension implements Extension {
     }
     if (type === 'customEventStart') {
       const { documentName, eventName, payload, replyTo, replyId } = msg;
-      const res = await this.handleEventLocally(
-        eventName as Extract<keyof TCE, string>,
-        documentName,
-        payload,
-      );
-      const reply: RSAMessageCustomEventComplete = {
-        type: 'customEventComplete',
-        replyId,
-        payload: res,
-      };
+      let reply: RSAMessageCustomEventComplete;
+      try {
+        const res = await this.handleEventLocally(
+          eventName as Extract<keyof TCE, string>,
+          documentName,
+          payload,
+        );
+        reply = {
+          type: 'customEventComplete',
+          replyId,
+          payload: res,
+        };
+      } catch (err) {
+        // The remote handler threw (e.g. the markdown->ProseMirror transform in
+        // gitSyncWriteBody can throw on a malformed body). Reply with the error on
+        // the SAME correlation channel so the origin rejects promptly with the real
+        // message instead of waiting out customEventTTL as a generic 'TIMEOUT'.
+        // Catching here also keeps the throw from escaping this async messageBuffer
+        // listener as an unhandledRejection on the owning instance.
+        reply = {
+          type: 'customEventComplete',
+          replyId,
+          payload: undefined,
+          error: err instanceof Error ? err.message : String(err),
+        };
+      }
       this.pub.publish(`${replyTo}`, this.pack(reply));
       return;
     }
     if (type === 'customEventComplete') {
-      const { replyId, payload } = msg;
-      const resolveFn = this.pendingReplies[replyId];
-      if (!resolveFn) return;
+      const { replyId, payload, error } = msg;
+      const pending = this.pendingReplies[replyId];
+      if (!pending) return;
       delete this.pendingReplies[replyId];
-      resolveFn(payload);
+      if (error !== undefined) {
+        pending.reject(new Error(error));
+      } else {
+        pending.resolve(payload);
+      }
       return;
     }
     const { socketId } = msg;
@@ -273,11 +299,22 @@ export class RedisSyncExtension implements Extension {
     };
     const msg = this.pack(proxyMessage);
     this.pub.publish(`${this.msgChannel}:${proxyTo}`, msg);
-    // @ts-ignore
-    const { promise, resolve, reject } = Promise.withResolvers();
-    this.pendingReplies[replyId] = resolve;
+    // Manual deferred (no Promise.withResolvers) so this runs on Node < 22 too.
+    let resolve!: (v: unknown) => void;
+    let reject!: (e: unknown) => void;
+    const promise = new Promise((res, rej) => {
+      resolve = res;
+      reject = rej;
+    });
+    this.pendingReplies[replyId] = { resolve, reject };
     setTimeout(() => {
-      reject('TIMEOUT');
+      // Fallback for a genuinely lost reply. A handler that threw now rejects
+      // promptly via the error-carrying customEventComplete above; this TIMEOUT
+      // only fires when no reply ever comes back.
+      if (this.pendingReplies[replyId]) {
+        delete this.pendingReplies[replyId];
+        reject('TIMEOUT');
+      }
     }, this.customEventTTL);
     return promise as Promise<ReturnType<TCE[TName]>>;
   }
diff --git a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts
index 1bbab80a..6cd5e2ac 100644
--- a/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts
+++ b/apps/server/src/collaboration/extensions/redis-sync/redis-sync.types.ts
@@ -72,6 +72,10 @@ export type RSAMessageCustomEventComplete = {
   type: 'customEventComplete';
   replyId: number;
   payload: unknown;
+  // When the remote handler THREW, the owner sends back the error message here
+  // instead of a payload, so the origin can reject its awaiting promise promptly
+  // (with the real error) rather than waiting out the customEventTTL timeout.
+  error?: string;
 };
 
 export type RSAMessage =
diff --git a/apps/server/src/integrations/git-sync/services/yjs-body-merge.ts b/apps/server/src/integrations/git-sync/services/yjs-body-merge.ts
index 62decbca..db8a8c5c 100644
--- a/apps/server/src/integrations/git-sync/services/yjs-body-merge.ts
+++ b/apps/server/src/integrations/git-sync/services/yjs-body-merge.ts
@@ -49,6 +49,12 @@ type XmlNode = Y.XmlElement | Y.XmlText | Y.XmlHook;
  * stable Yjs block (and any in-flight human edit on it) stays put. This mirrors
  * `canonicalize.ts`, which already strips the regenerated block `id` from the
  * round-trip idempotency comparison for exactly the same reason.
+ *
+ * Known limitation (accepted trade-off of content-based matching): two GENUINELY
+ * DISTINCT blocks whose content is byte-identical now collapse to the same content
+ * key, so when git deletes one of the duplicates the LCS may drop the OTHER live
+ * instance instead. The visible result is identical (one copy removed, one kept),
+ * but a concurrent in-flight human edit on the dropped instance could be lost.
  */
 const VOLATILE_KEY_ATTRS = new Set(['id']);
 