feat(collab): separate agent edits from human edits in page history

Page-history snapshots are debounced/coalesced (one per 1–5 min window,
jobId=page.id). A human edit followed by an agent edit in the same window
collapsed into a single snapshot, losing both the pre-agent human state and
a deterministic record of the agent's result.

Two provenance-aware boundaries now bracket an agent intervention:
- Before: on a user->agent transition, onStoreDocument synchronously pins the
  current (pre-agent) human content as its own history version tagged 'user',
  inside the page-write transaction, before the agent overwrites it.
- After: agent stores enqueue an immediate (delay 0), source-keyed history job
  (jobId=`${pageId}:agent`) so the agent's result snapshots deterministically
  as 'agent' and a later human edit (jobId=page.id) cannot coalesce/retag it.

Also add an `id desc` tie-break to findPageLastHistory so "last history" stays
deterministic when two snapshots share a created_at, consistent with
findPageHistoryByPageId.

Known trade-offs (Variant 1): the delay-0 worker still re-reads the page row
when it runs, so a human edit that lands before the worker executes can be
captured by the agent job (a millisecond-wide mis-tag window); multiple agent
edits in one turn may yield multiple versions. The reverse agent->human
boundary is intentionally out of scope.
This commit is contained in:
vvzvlad
2026-06-17 06:40:28 +03:00
parent b0997cb749
commit 0a9788e89a
2 changed files with 57 additions and 6 deletions

View File

@@ -8,8 +8,14 @@ import {
import * as Y from 'yjs';
import { Injectable, Logger } from '@nestjs/common';
import { TiptapTransformer } from '@hocuspocus/transformer';
import { getPageId, jsonToText, tiptapExtensions } from '../collaboration.util';
import {
getPageId,
isEmptyParagraphDoc,
jsonToText,
tiptapExtensions,
} from '../collaboration.util';
import { PageRepo } from '@docmost/db/repos/page/page.repo';
import { PageHistoryRepo } from '@docmost/db/repos/page/page-history.repo';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { executeTx } from '@docmost/db/utils';
@@ -46,6 +52,7 @@ export class PersistenceExtension implements Extension {
constructor(
private readonly pageRepo: PageRepo,
private readonly pageHistoryRepo: PageHistoryRepo,
@InjectKysely() private readonly db: KyselyDB,
@InjectQueue(QueueName.AI_QUEUE) private aiQueue: Queue,
@InjectQueue(QueueName.HISTORY_QUEUE) private historyQueue: Queue,
@@ -157,6 +164,32 @@ export class PersistenceExtension implements Extension {
//this.logger.debug('Contributors error:' + err?.['message']);
}
// Approach A — boundary snapshot before the agent's first edit.
// When this store is the agent's and the page's currently persisted
// state was authored by a human, pin that human state as its own
// history version BEFORE the agent overwrites it. `page` still holds the
// OLD content/provenance here, so saveHistory(page) captures the
// pre-agent state tagged 'user'. The agent's new content is snapshotted
// later by the immediate (delay 0) PAGE_HISTORY job ('agent'). Skip if the prior
// state is already agent-authored (boundary already pinned on the
// user->agent transition), if the page is effectively empty, or if the
// latest existing snapshot already equals this human state (avoid
// duplicates).
if (lastUpdatedSource === 'agent' && page.lastUpdatedSource !== 'agent') {
const lastHistory = await this.pageHistoryRepo.findPageLastHistory(
pageId,
{ includeContent: true, trx },
);
const humanBaselineMissing =
!lastHistory || !isDeepStrictEqual(lastHistory.content, page.content);
if (!isEmptyParagraphDoc(page.content as any) && humanBaselineMissing) {
await this.pageHistoryRepo.saveHistory(page, {
contributorIds: page.contributorIds ?? undefined,
trx,
});
}
}
await this.pageRepo.updatePage(
{
content: tiptapJson,
@@ -229,7 +262,7 @@ export class PersistenceExtension implements Extension {
workspaceId: page.workspaceId,
});
await this.enqueuePageHistory(page);
await this.enqueuePageHistory(page, lastUpdatedSource);
}
}
@@ -273,17 +306,30 @@ export class PersistenceExtension implements Extension {
return touched;
}
/**
 * Enqueues a PAGE_HISTORY job for `page` on the history queue.
 *
 * The job payload carries only `pageId`. The delay and job id depend on
 * `lastUpdatedSource`:
 * - 'agent': delay 0 and a source-keyed job id (`${page.id}:agent`).
 * - anything else: job id `page.id`, with HISTORY_FAST_INTERVAL as the delay
 *   while the page is younger than HISTORY_FAST_THRESHOLD and
 *   HISTORY_INTERVAL afterwards.
 *
 * NOTE(review): this listing is a diff rendering, so the superseded
 * single-argument signature, the old `const delay` head and the old
 * `{ jobId: page.id, delay }` options appear next to their replacements.
 *
 * @param page page whose history snapshot should be scheduled
 * @param lastUpdatedSource provenance of the store being persisted; only the
 *   exact value 'agent' selects the immediate, source-keyed job
 */
private async enqueuePageHistory(page: Page): Promise<void> {
private async enqueuePageHistory(
page: Page,
lastUpdatedSource: string,
): Promise<void> {
// Agent edits get an immediate, source-keyed history job: they snapshot
// deterministically as 'agent' and a later human edit (jobId = page.id)
// cannot coalesce/retag them. Human edits keep the age-based debounce so
// rapid human edits still coalesce into one snapshot.
// NOTE: the agent delay MUST stay 0 — the worker re-reads the page row at
// run time, so any delay would risk reading content a later human edit has
// already overwritten (mis-tagged snapshot). 0 minimizes that window.
const isAgent = lastUpdatedSource === 'agent';
// Page age in milliseconds, measured from page.createdAt to now.
const pageAge = Date.now() - new Date(page.createdAt).getTime();
const delay =
pageAge < HISTORY_FAST_THRESHOLD
const delay = isAgent
? 0
: pageAge < HISTORY_FAST_THRESHOLD
? HISTORY_FAST_INTERVAL
: HISTORY_INTERVAL;
// NOTE(review): if historyQueue is a BullMQ queue, its documentation states
// that custom job ids must not contain ':' — confirm that `${page.id}:agent`
// is accepted by the queue version in use, or switch to another separator
// (e.g. '-' or '_').
const jobId = isAgent ? `${page.id}:agent` : page.id;
await this.historyQueue.add(
QueueJob.PAGE_HISTORY,
{ pageId: page.id } as IPageHistoryJob,
{ jobId: page.id, delay },
{ jobId, delay },
);
}

View File

@@ -120,7 +120,12 @@ export class PageHistoryRepo {
.$if(opts?.includeContent, (qb) => qb.select('content'))
.where('pageId', '=', pageId)
.limit(1)
// Secondary `id` tie-break: two snapshots for the same page can share a
// createdAt (e.g. the synchronous pre-agent boundary row and the
// immediate agent snapshot), so order by id to keep "last history"
// deterministic and consistent with findPageHistoryByPageId (id desc).
.orderBy('createdAt', 'desc')
.orderBy('id', 'desc')
.executeTakeFirst();
}