From 770ba7054173f5fa9eaaaae166418f1f4e952fcf Mon Sep 17 00:00:00 2001 From: claude code agent 227 Date: Fri, 26 Jun 2026 06:05:28 +0300 Subject: [PATCH] fix(collab): retry transient store failures so autosave edits aren't lost (#206) persist-1: onStoreDocument wrapped the page write in a try/catch that only logged and swallowed the error, then resolved "successfully". hocuspocus destroys/unloads the in-memory Y.Doc right after the hook resolves (the only copy of the latest edit), so a transient DB error (deadlock, serialization failure, dropped connection) silently lost the edit. Worse, the post-store branch ran on the partially-assigned `page`, broadcasting a phantom "page.updated" and enqueueing a history snapshot for content never written. Wrap the write in a small bounded retry (3 attempts) so the save is re-attempted while we still hold the doc, and clear `page` on failure so the success-only side effects never report a save that didn't happen. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../extensions/persistence-store.spec.ts | 42 +++++ .../extensions/persistence.extension.ts | 176 ++++++++++-------- 2 files changed, 145 insertions(+), 73 deletions(-) diff --git a/apps/server/src/collaboration/extensions/persistence-store.spec.ts b/apps/server/src/collaboration/extensions/persistence-store.spec.ts index 4262d77f..d0fe703d 100644 --- a/apps/server/src/collaboration/extensions/persistence-store.spec.ts +++ b/apps/server/src/collaboration/extensions/persistence-store.spec.ts @@ -182,4 +182,46 @@ describe('PersistenceExtension.onStoreDocument — Approach-A boundary snapshot' expect(pageHistoryRepo.saveHistory).not.toHaveBeenCalled(); expect(historyQueue.add).not.toHaveBeenCalled(); }); + + // persist-1 — a transient DB failure during store must not silently lose the + // edit. hocuspocus unloads (destroys) the in-memory Y.Doc right after this + // hook resolves, so the store has to retry while it still holds the only copy. + it('retries a transient DB failure and still persists the edit (persist-1)', async () => { + const document = ydocFor(doc('NEW HUMAN CONTENT')); + pageRepo.findById.mockResolvedValue(persistedHumanPage('NEW HUMAN CONTENT')); + let attempts = 0; + pageRepo.updatePage.mockImplementation(async () => { + attempts += 1; + if (attempts === 1) throw new Error('deadlock detected'); // transient + callOrder.push('updatePage'); + }); + + await ext.onStoreDocument(buildData(document, 'user') as any); + + // First attempt failed and rolled back; the retry persisted the edit. + expect(pageRepo.updatePage).toHaveBeenCalledTimes(2); + // The edit WAS saved, so the post-store success path runs as normal. + expect((document as any).broadcastStateless).toHaveBeenCalledTimes(1); + expect(historyQueue.add).toHaveBeenCalledTimes(1); + }); + + // persist-1 — when every attempt fails the hook must NOT report a phantom + // success: no "page.updated" badge broadcast and no history snapshot for + // content that was never written. + it('does not run post-store side effects when every store attempt fails (persist-1)', async () => { + const document = ydocFor(doc('NEW HUMAN CONTENT')); + pageRepo.findById.mockResolvedValue(persistedHumanPage('NEW HUMAN CONTENT')); + pageRepo.updatePage.mockRejectedValue(new Error('connection reset')); + + await expect( + ext.onStoreDocument(buildData(document, 'user') as any), + ).resolves.toBeUndefined(); + + // Bounded retry exhausted (MAX_STORE_ATTEMPTS). + expect(pageRepo.updatePage).toHaveBeenCalledTimes(3); + // No false-success: nothing downstream fires for the unsaved content. + expect((document as any).broadcastStateless).not.toHaveBeenCalled(); + expect(historyQueue.add).not.toHaveBeenCalled(); + expect(aiQueue.add).not.toHaveBeenCalled(); + }); }); diff --git a/apps/server/src/collaboration/extensions/persistence.extension.ts b/apps/server/src/collaboration/extensions/persistence.extension.ts index e9119358..f802f229 100644 --- a/apps/server/src/collaboration/extensions/persistence.extension.ts +++ b/apps/server/src/collaboration/extensions/persistence.extension.ts @@ -181,83 +181,113 @@ export class PersistenceExtension implements Extension { context?.actor, ); - try { - await executeTx(this.db, async (trx) => { - page = await this.pageRepo.findById(pageId, { - withLock: true, - includeContent: true, - trx, - }); + // Persist with a small bounded retry. The in-memory Y.Doc is the ONLY copy + // of the latest edit until this hook returns: hocuspocus destroys/unloads the + // doc right after onStoreDocument resolves (see storeDocumentHooks' finally + // -> unloadDocument). If a transient DB error (deadlock, serialization + // failure, dropped connection) is merely logged and swallowed, the function + // resolves "successfully", the doc is unloaded, and the edit is lost silently + // (#206 persist-1). Retrying here re-attempts the write while we still hold + // the doc; on total failure we clear `page` so the post-store side effects + // (badge broadcast, history snapshot) never report a save that didn't happen. + const MAX_STORE_ATTEMPTS = 3; + for (let attempt = 1; attempt <= MAX_STORE_ATTEMPTS; attempt++) { + try { + await executeTx(this.db, async (trx) => { + page = await this.pageRepo.findById(pageId, { + withLock: true, + includeContent: true, + trx, + }); - if (!page) { - this.logger.error(`Page with id ${pageId} not found`); - return; - } - - if (isDeepStrictEqual(tiptapJson, page.content)) { - page = null; - return; - } - - let contributorIds = undefined; - try { - const existingContributors = page.contributorIds || []; - contributorIds = Array.from( - new Set([ - ...existingContributors, - ...editingUserIds, - page.creatorId, - ]), - ); - } catch (err) { - //this.logger.debug('Contributors error:' + err?.['message']); - } - - // Approach A — boundary snapshot before the agent's first edit. - // When this store is the agent's and the page's currently persisted - // state was authored by a human, pin that human state as its own - // history version BEFORE the agent overwrites it. `page` still holds the - // OLD content/provenance here, so saveHistory(page) captures the - // pre-agent state tagged 'user'. The agent's new content is snapshotted - // later by the debounced PAGE_HISTORY job ('agent'). Skip if the prior - // state is already agent-authored (boundary already pinned on the - // user->agent transition), if the page is effectively empty, or if the - // latest existing snapshot already equals this human state (avoid - // duplicates). - if (lastUpdatedSource === 'agent' && page.lastUpdatedSource !== 'agent') { - const lastHistory = await this.pageHistoryRepo.findPageLastHistory( - pageId, - { includeContent: true, trx }, - ); - const humanBaselineMissing = - !lastHistory || !isDeepStrictEqual(lastHistory.content, page.content); - if (!isEmptyParagraphDoc(page.content as any) && humanBaselineMissing) { - await this.pageHistoryRepo.saveHistory(page, { - contributorIds: page.contributorIds ?? undefined, - trx, - }); + if (!page) { + this.logger.error(`Page with id ${pageId} not found`); + return; } - } - await this.pageRepo.updatePage( - { - content: tiptapJson, - textContent: textContent, - ydoc: ydocState, - lastUpdatedById: context.user.id, - // Human stays the responsible author; these annotate the source. - lastUpdatedSource, - lastUpdatedAiChatId: context?.aiChatId ?? null, - contributorIds: contributorIds, - }, - pageId, - trx, + if (isDeepStrictEqual(tiptapJson, page.content)) { + page = null; + return; + } + + let contributorIds = undefined; + try { + const existingContributors = page.contributorIds || []; + contributorIds = Array.from( + new Set([ + ...existingContributors, + ...editingUserIds, + page.creatorId, + ]), + ); + } catch (err) { + //this.logger.debug('Contributors error:' + err?.['message']); + } + + // Approach A — boundary snapshot before the agent's first edit. + // When this store is the agent's and the page's currently persisted + // state was authored by a human, pin that human state as its own + // history version BEFORE the agent overwrites it. `page` still holds + // the OLD content/provenance here, so saveHistory(page) captures the + // pre-agent state tagged 'user'. The agent's new content is + // snapshotted later by the debounced PAGE_HISTORY job ('agent'). Skip + // if the prior state is already agent-authored (boundary already + // pinned on the user->agent transition), if the page is effectively + // empty, or if the latest existing snapshot already equals this human + // state (avoid duplicates). + if ( + lastUpdatedSource === 'agent' && + page.lastUpdatedSource !== 'agent' + ) { + const lastHistory = await this.pageHistoryRepo.findPageLastHistory( + pageId, + { includeContent: true, trx }, + ); + const humanBaselineMissing = + !lastHistory || + !isDeepStrictEqual(lastHistory.content, page.content); + if ( + !isEmptyParagraphDoc(page.content as any) && + humanBaselineMissing + ) { + await this.pageHistoryRepo.saveHistory(page, { + contributorIds: page.contributorIds ?? undefined, + trx, + }); + } + } + + await this.pageRepo.updatePage( + { + content: tiptapJson, + textContent: textContent, + ydoc: ydocState, + lastUpdatedById: context.user.id, + // Human stays the responsible author; these annotate the source. + lastUpdatedSource, + lastUpdatedAiChatId: context?.aiChatId ?? null, + contributorIds: contributorIds, + }, + pageId, + trx, + ); + + this.logger.debug(`Page updated: ${pageId} - SlugId: ${page.slugId}`); + }); + break; + } catch (err) { + this.logger.error( + `Failed to update page ${pageId} (attempt ${attempt}/${MAX_STORE_ATTEMPTS})`, + err, ); - - this.logger.debug(`Page updated: ${pageId} - SlugId: ${page.slugId}`); - }); - } catch (err) { - this.logger.error(`Failed to update page ${pageId}`, err); + // The write failed and rolled back; clear the partially-assigned `page` + // so the post-store success branch below is skipped (no false "saved" + // broadcast / history snapshot for content that was never persisted). + page = null; + if (attempt < MAX_STORE_ATTEMPTS) { + await new Promise((resolve) => setTimeout(resolve, attempt * 50)); + } + } } if (page) {