fix(git-sync): propagate remote custom-event handler errors instead of 30s timeout
When a git-sync body write (gitSyncWriteBody) is routed to the collab instance
that owns the doc, the handler runs remotely inside handleRedisMessage and CAN
throw (markdown->ProseMirror transform). Previously the throw was uncaught: the
customEventComplete reply was never published, so the origin's writePageBody
promise only rejected after customEventTTL (~30s) as a generic 'TIMEOUT', and an
unhandledRejection escaped the async messageBuffer listener on the owning
instance.
Now the owner wraps handleEventLocally in try/catch and, on throw, publishes a
customEventComplete carrying an `error` field on the same correlation channel.
The origin's pendingReplies holds {resolve, reject} and rejects promptly with the
real Error. The TTL TIMEOUT remains as the fallback for a genuinely lost reply.
The no-throw and local (same-instance) paths are unchanged.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,208 @@
|
||||
// Regression coverage for the custom-event request/reply protocol in the
|
||||
// RedisSyncExtension. git-sync routes its body write through a custom event
|
||||
// (`gitSyncWriteBody`) which, when the target doc is owned by a DIFFERENT collab
|
||||
// instance, runs REMOTELY inside `handleRedisMessage` on the owning instance. The
|
||||
// remote handler can THROW (markdown->ProseMirror transform on a malformed body).
|
||||
//
|
||||
// Before the fix the throw was uncaught: (1) no `customEventComplete` reply was
|
||||
// published, so the origin's awaiting promise only rejected after `customEventTTL`
|
||||
// (~30s) as a generic 'TIMEOUT', and (2) an unhandledRejection escaped the async
|
||||
// `messageBuffer` listener on the owning instance. These tests assert the throw is
|
||||
// turned into an error-carrying reply that rejects the origin PROMPTLY with the
|
||||
// real message, with the no-throw and local paths unchanged.
|
||||
|
||||
import { RedisSyncExtension } from './redis-sync.extension';
|
||||
|
||||
type Listener = (channel: Buffer, message: Buffer) => unknown;
|
||||
|
||||
// Minimal in-memory pub/sub + lock store shared across FakeRedis duplicates,
|
||||
// modelling the two-instance topology (origin + owner) over one Redis.
|
||||
class FakeRedisBus {
|
||||
instances: FakeRedis[] = [];
|
||||
locks = new Map<string, string>();
|
||||
published: { channel: string; message: Buffer }[] = [];
|
||||
|
||||
register(inst: FakeRedis) {
|
||||
this.instances.push(inst);
|
||||
}
|
||||
|
||||
publish(channel: string, message: Buffer) {
|
||||
this.published.push({ channel, message });
|
||||
for (const inst of this.instances) {
|
||||
if (!inst.subscribed.has(channel)) continue;
|
||||
for (const listener of inst.messageListeners) {
|
||||
// ioredis delivers async; `void` mirrors the production listener
|
||||
// registration (`sub.on('messageBuffer', ...)`), whose rejection would
|
||||
// surface as an unhandledRejection if the handler did not catch.
|
||||
void listener(Buffer.from(channel), message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class FakeRedis {
|
||||
subscribed = new Set<string>();
|
||||
messageListeners: Listener[] = [];
|
||||
|
||||
constructor(private bus: FakeRedisBus) {
|
||||
bus.register(this);
|
||||
}
|
||||
|
||||
duplicate() {
|
||||
return new FakeRedis(this.bus);
|
||||
}
|
||||
|
||||
subscribe(...channels: string[]) {
|
||||
for (const c of channels) this.subscribed.add(c);
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
on(event: string, cb: any) {
|
||||
if (event === 'messageBuffer') this.messageListeners.push(cb as Listener);
|
||||
return this;
|
||||
}
|
||||
|
||||
publish(channel: string, message: Buffer) {
|
||||
this.bus.publish(channel, message);
|
||||
return Promise.resolve(1);
|
||||
}
|
||||
|
||||
// Models `SET key val PX ttl NX GET`: only writes when absent (NX); returns the
|
||||
// previous value (GET) so the origin observes the owner already holding the lock.
|
||||
set(key: string, val: string, ...args: any[]) {
|
||||
const hasNX = args.includes('NX');
|
||||
const hasGET = args.includes('GET');
|
||||
const old = this.bus.locks.get(key) ?? null;
|
||||
if (!hasNX || old === null) this.bus.locks.set(key, val);
|
||||
return Promise.resolve(hasGET ? old : 'OK');
|
||||
}
|
||||
|
||||
del(key: string) {
|
||||
this.bus.locks.delete(key);
|
||||
return Promise.resolve(1);
|
||||
}
|
||||
|
||||
disconnect() {}
|
||||
}
|
||||
|
||||
const pack = (m: any) => Buffer.from(JSON.stringify(m));
|
||||
const unpack = (b: Buffer) => JSON.parse(b.toString());
|
||||
|
||||
function makeExtension(
|
||||
bus: FakeRedisBus,
|
||||
serverId: string,
|
||||
customEvents: Record<string, (doc: string, payload: any) => Promise<any>>,
|
||||
) {
|
||||
const ext = new RedisSyncExtension({
|
||||
redis: new FakeRedis(bus) as any,
|
||||
pack: pack as any,
|
||||
unpack: unpack as any,
|
||||
serverId,
|
||||
customEvents: customEvents as any,
|
||||
customEventTTL: 30_000,
|
||||
});
|
||||
// Doc is NOT loaded on this instance -> handleEvent takes the remote/proxy path.
|
||||
(ext as any).instance = { documents: new Map() };
|
||||
return ext;
|
||||
}
|
||||
|
||||
describe('RedisSyncExtension custom-event error propagation', () => {
|
||||
let unhandled: unknown[];
|
||||
let onUnhandled: (e: unknown) => void;
|
||||
|
||||
beforeEach(() => {
|
||||
// Fake timers so the 30s TTL fallback timer never fires (and never dangles).
|
||||
jest.useFakeTimers();
|
||||
unhandled = [];
|
||||
onUnhandled = (e) => unhandled.push(e);
|
||||
process.on('unhandledRejection', onUnhandled);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
process.off('unhandledRejection', onUnhandled);
|
||||
jest.useRealTimers();
|
||||
});
|
||||
|
||||
const flush = async () => {
|
||||
for (let i = 0; i < 10; i++) await Promise.resolve();
|
||||
};
|
||||
|
||||
it('owner publishes an error-carrying reply (no unhandledRejection) when the remote handler throws', async () => {
|
||||
const bus = new FakeRedisBus();
|
||||
const owner = makeExtension(bus, 'owner', {
|
||||
boom: async () => {
|
||||
throw new Error('kaboom');
|
||||
},
|
||||
});
|
||||
|
||||
// Drive the remote branch directly, as if the origin's customEventStart arrived.
|
||||
await (owner as any).handleRedisMessage(
|
||||
Buffer.from('collabMsg:owner'),
|
||||
pack({
|
||||
type: 'customEventStart',
|
||||
documentName: 'page.x',
|
||||
eventName: 'boom',
|
||||
payload: {},
|
||||
replyTo: 'collabMsg:origin',
|
||||
replyId: 7,
|
||||
}),
|
||||
);
|
||||
await flush();
|
||||
|
||||
const replies = bus.published
|
||||
.filter((p) => p.channel === 'collabMsg:origin')
|
||||
.map((p) => unpack(p.message));
|
||||
expect(replies).toHaveLength(1);
|
||||
expect(replies[0]).toMatchObject({
|
||||
type: 'customEventComplete',
|
||||
replyId: 7,
|
||||
error: 'kaboom',
|
||||
});
|
||||
expect(unhandled).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('origin rejects PROMPTLY with the real error (not a TTL TIMEOUT) when the remote handler throws', async () => {
|
||||
const bus = new FakeRedisBus();
|
||||
// Owner already holds the document lock.
|
||||
bus.locks.set('collabLock:page.x', 'owner');
|
||||
makeExtension(bus, 'owner', {
|
||||
boom: async () => {
|
||||
throw new Error('kaboom');
|
||||
},
|
||||
});
|
||||
const origin = makeExtension(bus, 'origin', {
|
||||
boom: async () => undefined,
|
||||
});
|
||||
|
||||
const promise = (origin as any).handleEvent('boom', 'page.x', { foo: 1 });
|
||||
// Attach a catch immediately so a rejection is never momentarily unhandled.
|
||||
const settled = promise.then(
|
||||
() => ({ ok: true as const }),
|
||||
(e: unknown) => ({ ok: false as const, error: e }),
|
||||
);
|
||||
|
||||
await flush();
|
||||
// Resolves WITHOUT advancing any timer -> the 30s TIMEOUT fallback did not fire.
|
||||
const result = await settled;
|
||||
expect(result.ok).toBe(false);
|
||||
expect((result as any).error).toBeInstanceOf(Error);
|
||||
expect(((result as any).error as Error).message).toBe('kaboom');
|
||||
expect(unhandled).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('origin resolves with the payload when the remote handler succeeds (unchanged behavior)', async () => {
|
||||
const bus = new FakeRedisBus();
|
||||
bus.locks.set('collabLock:page.x', 'owner');
|
||||
makeExtension(bus, 'owner', {
|
||||
ok: async (_doc: string, payload: any) => ({ echoed: payload }),
|
||||
});
|
||||
const origin = makeExtension(bus, 'origin', {
|
||||
ok: async () => undefined,
|
||||
});
|
||||
|
||||
const promise = (origin as any).handleEvent('ok', 'page.x', { foo: 1 });
|
||||
await flush();
|
||||
await expect(promise).resolves.toEqual({ echoed: { foo: 1 } });
|
||||
expect(unhandled).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
@@ -51,9 +51,15 @@ export class RedisSyncExtension<TCE extends CustomEvents> implements Extension {
|
||||
private instance!: Hocuspocus;
|
||||
private readonly customEvents: TCE;
|
||||
private replyIdCounter: number = 0;
|
||||
// @ts-ignore
|
||||
private pendingReplies: Record<number, PromiseWithResolvers<any>['resolve']> =
|
||||
{};
|
||||
private pendingReplies: Record<
|
||||
number,
|
||||
{
|
||||
// @ts-ignore
|
||||
resolve: PromiseWithResolvers<any>['resolve'];
|
||||
// @ts-ignore
|
||||
reject: PromiseWithResolvers<any>['reject'];
|
||||
}
|
||||
> = {};
|
||||
|
||||
constructor(configuration: Configuration<TCE>) {
|
||||
const {
|
||||
@@ -176,25 +182,45 @@ export class RedisSyncExtension<TCE extends CustomEvents> implements Extension {
|
||||
}
|
||||
if (type === 'customEventStart') {
|
||||
const { documentName, eventName, payload, replyTo, replyId } = msg;
|
||||
const res = await this.handleEventLocally(
|
||||
eventName as Extract<keyof TCE, string>,
|
||||
documentName,
|
||||
payload,
|
||||
);
|
||||
const reply: RSAMessageCustomEventComplete = {
|
||||
type: 'customEventComplete',
|
||||
replyId,
|
||||
payload: res,
|
||||
};
|
||||
let reply: RSAMessageCustomEventComplete;
|
||||
try {
|
||||
const res = await this.handleEventLocally(
|
||||
eventName as Extract<keyof TCE, string>,
|
||||
documentName,
|
||||
payload,
|
||||
);
|
||||
reply = {
|
||||
type: 'customEventComplete',
|
||||
replyId,
|
||||
payload: res,
|
||||
};
|
||||
} catch (err) {
|
||||
// The remote handler threw (e.g. the markdown->ProseMirror transform in
|
||||
// gitSyncWriteBody can throw on a malformed body). Reply with the error on
|
||||
// the SAME correlation channel so the origin rejects promptly with the real
|
||||
// message instead of waiting out customEventTTL as a generic 'TIMEOUT'.
|
||||
// Catching here also keeps the throw from escaping this async messageBuffer
|
||||
// listener as an unhandledRejection on the owning instance.
|
||||
reply = {
|
||||
type: 'customEventComplete',
|
||||
replyId,
|
||||
payload: undefined,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
};
|
||||
}
|
||||
this.pub.publish(`${replyTo}`, this.pack(reply));
|
||||
return;
|
||||
}
|
||||
if (type === 'customEventComplete') {
|
||||
const { replyId, payload } = msg;
|
||||
const resolveFn = this.pendingReplies[replyId];
|
||||
if (!resolveFn) return;
|
||||
const { replyId, payload, error } = msg;
|
||||
const pending = this.pendingReplies[replyId];
|
||||
if (!pending) return;
|
||||
delete this.pendingReplies[replyId];
|
||||
resolveFn(payload);
|
||||
if (error !== undefined) {
|
||||
pending.reject(new Error(error));
|
||||
} else {
|
||||
pending.resolve(payload);
|
||||
}
|
||||
return;
|
||||
}
|
||||
const { socketId } = msg;
|
||||
@@ -273,11 +299,22 @@ export class RedisSyncExtension<TCE extends CustomEvents> implements Extension {
|
||||
};
|
||||
const msg = this.pack(proxyMessage);
|
||||
this.pub.publish(`${this.msgChannel}:${proxyTo}`, msg);
|
||||
// @ts-ignore
|
||||
const { promise, resolve, reject } = Promise.withResolvers();
|
||||
this.pendingReplies[replyId] = resolve;
|
||||
// Manual deferred (no Promise.withResolvers) so this runs on Node < 22 too.
|
||||
let resolve!: (v: unknown) => void;
|
||||
let reject!: (e: unknown) => void;
|
||||
const promise = new Promise((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
});
|
||||
this.pendingReplies[replyId] = { resolve, reject };
|
||||
setTimeout(() => {
|
||||
reject('TIMEOUT');
|
||||
// Fallback for a genuinely lost reply. A handler that threw now rejects
|
||||
// promptly via the error-carrying customEventComplete above; this TIMEOUT
|
||||
// only fires when no reply ever comes back.
|
||||
if (this.pendingReplies[replyId]) {
|
||||
delete this.pendingReplies[replyId];
|
||||
reject('TIMEOUT');
|
||||
}
|
||||
}, this.customEventTTL);
|
||||
return promise as Promise<ReturnType<TCE[TName]>>;
|
||||
}
|
||||
|
||||
@@ -72,6 +72,10 @@ export type RSAMessageCustomEventComplete = {
|
||||
type: 'customEventComplete';
|
||||
replyId: number;
|
||||
payload: unknown;
|
||||
// When the remote handler THREW, the owner sends back the error message here
|
||||
// instead of a payload, so the origin can reject its awaiting promise promptly
|
||||
// (with the real error) rather than waiting out the customEventTTL timeout.
|
||||
error?: string;
|
||||
};
|
||||
|
||||
export type RSAMessage =
|
||||
|
||||
@@ -49,6 +49,12 @@ type XmlNode = Y.XmlElement | Y.XmlText | Y.XmlHook;
|
||||
* stable Yjs block (and any in-flight human edit on it) stays put. This mirrors
|
||||
* `canonicalize.ts`, which already strips the regenerated block `id` from the
|
||||
* round-trip idempotency comparison for exactly the same reason.
|
||||
*
|
||||
* Known limitation (accepted trade-off of content-based matching): two GENUINELY
|
||||
* DISTINCT blocks whose content is byte-identical now collapse to the same content
|
||||
* key, so when git deletes one of the duplicates the LCS may drop the OTHER live
|
||||
* instance instead. The visible result is identical (one copy removed, one kept),
|
||||
* but a concurrent in-flight human edit on the dropped instance could be lost.
|
||||
*/
|
||||
const VOLATILE_KEY_ATTRS = new Set(['id']);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user