fix(collab): retry transient store failures so autosave edits aren't lost (#206)
persist-1: onStoreDocument wrapped the page write in a try/catch that only logged and swallowed the error, then resolved "successfully". hocuspocus destroys/unloads the in-memory Y.Doc right after the hook resolves (the only copy of the latest edit), so a transient DB error (deadlock, serialization failure, dropped connection) silently lost the edit. Worse, the post-store branch ran on the partially-assigned `page`, broadcasting a phantom "page.updated" and enqueueing a history snapshot for content never written. Wrap the write in a small bounded retry (3 attempts) so the save is re-attempted while we still hold the doc, and clear `page` on failure so the success-only side effects never report a save that didn't happen. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -182,4 +182,46 @@ describe('PersistenceExtension.onStoreDocument — Approach-A boundary snapshot'
|
|||||||
expect(pageHistoryRepo.saveHistory).not.toHaveBeenCalled();
|
expect(pageHistoryRepo.saveHistory).not.toHaveBeenCalled();
|
||||||
expect(historyQueue.add).not.toHaveBeenCalled();
|
expect(historyQueue.add).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// persist-1 — a transient DB failure during store must not silently lose the
|
||||||
|
// edit. hocuspocus unloads (destroys) the in-memory Y.Doc right after this
|
||||||
|
// hook resolves, so the store has to retry while it still holds the only copy.
|
||||||
|
it('retries a transient DB failure and still persists the edit (persist-1)', async () => {
|
||||||
|
const document = ydocFor(doc('NEW HUMAN CONTENT'));
|
||||||
|
pageRepo.findById.mockResolvedValue(persistedHumanPage('NEW HUMAN CONTENT'));
|
||||||
|
let attempts = 0;
|
||||||
|
pageRepo.updatePage.mockImplementation(async () => {
|
||||||
|
attempts += 1;
|
||||||
|
if (attempts === 1) throw new Error('deadlock detected'); // transient
|
||||||
|
callOrder.push('updatePage');
|
||||||
|
});
|
||||||
|
|
||||||
|
await ext.onStoreDocument(buildData(document, 'user') as any);
|
||||||
|
|
||||||
|
// First attempt failed and rolled back; the retry persisted the edit.
|
||||||
|
expect(pageRepo.updatePage).toHaveBeenCalledTimes(2);
|
||||||
|
// The edit WAS saved, so the post-store success path runs as normal.
|
||||||
|
expect((document as any).broadcastStateless).toHaveBeenCalledTimes(1);
|
||||||
|
expect(historyQueue.add).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
// persist-1 — when every attempt fails the hook must NOT report a phantom
|
||||||
|
// success: no "page.updated" badge broadcast and no history snapshot for
|
||||||
|
// content that was never written.
|
||||||
|
it('does not run post-store side effects when every store attempt fails (persist-1)', async () => {
|
||||||
|
const document = ydocFor(doc('NEW HUMAN CONTENT'));
|
||||||
|
pageRepo.findById.mockResolvedValue(persistedHumanPage('NEW HUMAN CONTENT'));
|
||||||
|
pageRepo.updatePage.mockRejectedValue(new Error('connection reset'));
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
ext.onStoreDocument(buildData(document, 'user') as any),
|
||||||
|
).resolves.toBeUndefined();
|
||||||
|
|
||||||
|
// Bounded retry exhausted (MAX_STORE_ATTEMPTS).
|
||||||
|
expect(pageRepo.updatePage).toHaveBeenCalledTimes(3);
|
||||||
|
// No false-success: nothing downstream fires for the unsaved content.
|
||||||
|
expect((document as any).broadcastStateless).not.toHaveBeenCalled();
|
||||||
|
expect(historyQueue.add).not.toHaveBeenCalled();
|
||||||
|
expect(aiQueue.add).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -181,6 +181,17 @@ export class PersistenceExtension implements Extension {
|
|||||||
context?.actor,
|
context?.actor,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Persist with a small bounded retry. The in-memory Y.Doc is the ONLY copy
|
||||||
|
// of the latest edit until this hook returns: hocuspocus destroys/unloads the
|
||||||
|
// doc right after onStoreDocument resolves (see storeDocumentHooks' finally
|
||||||
|
// -> unloadDocument). If a transient DB error (deadlock, serialization
|
||||||
|
// failure, dropped connection) is merely logged and swallowed, the function
|
||||||
|
// resolves "successfully", the doc is unloaded, and the edit is lost silently
|
||||||
|
// (#206 persist-1). Retrying here re-attempts the write while we still hold
|
||||||
|
// the doc; on total failure we clear `page` so the post-store side effects
|
||||||
|
// (badge broadcast, history snapshot) never report a save that didn't happen.
|
||||||
|
const MAX_STORE_ATTEMPTS = 3;
|
||||||
|
for (let attempt = 1; attempt <= MAX_STORE_ATTEMPTS; attempt++) {
|
||||||
try {
|
try {
|
||||||
await executeTx(this.db, async (trx) => {
|
await executeTx(this.db, async (trx) => {
|
||||||
page = await this.pageRepo.findById(pageId, {
|
page = await this.pageRepo.findById(pageId, {
|
||||||
@@ -216,22 +227,29 @@ export class PersistenceExtension implements Extension {
|
|||||||
// Approach A — boundary snapshot before the agent's first edit.
|
// Approach A — boundary snapshot before the agent's first edit.
|
||||||
// When this store is the agent's and the page's currently persisted
|
// When this store is the agent's and the page's currently persisted
|
||||||
// state was authored by a human, pin that human state as its own
|
// state was authored by a human, pin that human state as its own
|
||||||
// history version BEFORE the agent overwrites it. `page` still holds the
|
// history version BEFORE the agent overwrites it. `page` still holds
|
||||||
// OLD content/provenance here, so saveHistory(page) captures the
|
// the OLD content/provenance here, so saveHistory(page) captures the
|
||||||
// pre-agent state tagged 'user'. The agent's new content is snapshotted
|
// pre-agent state tagged 'user'. The agent's new content is
|
||||||
// later by the debounced PAGE_HISTORY job ('agent'). Skip if the prior
|
// snapshotted later by the debounced PAGE_HISTORY job ('agent'). Skip
|
||||||
// state is already agent-authored (boundary already pinned on the
|
// if the prior state is already agent-authored (boundary already
|
||||||
// user->agent transition), if the page is effectively empty, or if the
|
// pinned on the user->agent transition), if the page is effectively
|
||||||
// latest existing snapshot already equals this human state (avoid
|
// empty, or if the latest existing snapshot already equals this human
|
||||||
// duplicates).
|
// state (avoid duplicates).
|
||||||
if (lastUpdatedSource === 'agent' && page.lastUpdatedSource !== 'agent') {
|
if (
|
||||||
|
lastUpdatedSource === 'agent' &&
|
||||||
|
page.lastUpdatedSource !== 'agent'
|
||||||
|
) {
|
||||||
const lastHistory = await this.pageHistoryRepo.findPageLastHistory(
|
const lastHistory = await this.pageHistoryRepo.findPageLastHistory(
|
||||||
pageId,
|
pageId,
|
||||||
{ includeContent: true, trx },
|
{ includeContent: true, trx },
|
||||||
);
|
);
|
||||||
const humanBaselineMissing =
|
const humanBaselineMissing =
|
||||||
!lastHistory || !isDeepStrictEqual(lastHistory.content, page.content);
|
!lastHistory ||
|
||||||
if (!isEmptyParagraphDoc(page.content as any) && humanBaselineMissing) {
|
!isDeepStrictEqual(lastHistory.content, page.content);
|
||||||
|
if (
|
||||||
|
!isEmptyParagraphDoc(page.content as any) &&
|
||||||
|
humanBaselineMissing
|
||||||
|
) {
|
||||||
await this.pageHistoryRepo.saveHistory(page, {
|
await this.pageHistoryRepo.saveHistory(page, {
|
||||||
contributorIds: page.contributorIds ?? undefined,
|
contributorIds: page.contributorIds ?? undefined,
|
||||||
trx,
|
trx,
|
||||||
@@ -256,8 +274,20 @@ export class PersistenceExtension implements Extension {
|
|||||||
|
|
||||||
this.logger.debug(`Page updated: ${pageId} - SlugId: ${page.slugId}`);
|
this.logger.debug(`Page updated: ${pageId} - SlugId: ${page.slugId}`);
|
||||||
});
|
});
|
||||||
|
break;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
this.logger.error(`Failed to update page ${pageId}`, err);
|
this.logger.error(
|
||||||
|
`Failed to update page ${pageId} (attempt ${attempt}/${MAX_STORE_ATTEMPTS})`,
|
||||||
|
err,
|
||||||
|
);
|
||||||
|
// The write failed and rolled back; clear the partially-assigned `page`
|
||||||
|
// so the post-store success branch below is skipped (no false "saved"
|
||||||
|
// broadcast / history snapshot for content that was never persisted).
|
||||||
|
page = null;
|
||||||
|
if (attempt < MAX_STORE_ATTEMPTS) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, attempt * 50));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (page) {
|
if (page) {
|
||||||
|
|||||||
Reference in New Issue
Block a user