fix(ai): address reindex-progress review (PR #242)
- Delete the now-orphaned PageRepo.getIdsByWorkspace (its only caller, reindexWorkspace, switched to getEmbeddablePageIds). Its docstring still claimed "Used by the RAG bulk reindex"; re-grep confirmed zero callers. - ai-settings.service.reindex(): if aiQueue.add() throws (Redis hiccup/ shutdown) the worker never runs so its finally->clear() never fires, leaving the seeded progress record stuck for the full 1h TTL (button stuck "reindexing: 0 of N"). Roll back the seed THIS call wrote (seeded flag, only when get() was null) before re-throwing, so a concurrent active run's record is never wiped. Add tests for both the clear-on-throw and the don't-clear-a-concurrent-run paths. - Add an integration spec (real Postgres) proving getEmbeddablePageIds' WHERE stays in lockstep with countEmbeddablePages: seeds every boundary case and asserts the returned id set equals the count. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -264,20 +264,6 @@ export class PageRepo {
|
|||||||
return Number(row?.count ?? 0);
|
return Number(row?.count ?? 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* IDs of all non-deleted pages in a workspace. Used by the RAG bulk reindex to
|
|
||||||
* (re)build embeddings for every existing page.
|
|
||||||
*/
|
|
||||||
async getIdsByWorkspace(workspaceId: string): Promise<string[]> {
|
|
||||||
const rows = await this.db
|
|
||||||
.selectFrom('pages')
|
|
||||||
.select('id')
|
|
||||||
.where('workspaceId', '=', workspaceId)
|
|
||||||
.where('deletedAt', 'is', null)
|
|
||||||
.execute();
|
|
||||||
return rows.map((r) => r.id);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* IDs of the EMBEDDABLE page set for a workspace — the exact same set that
|
* IDs of the EMBEDDABLE page set for a workspace — the exact same set that
|
||||||
* `countEmbeddablePages` counts (a page qualifies if it has non-empty
|
* `countEmbeddablePages` counts (a page qualifies if it has non-empty
|
||||||
|
|||||||
@@ -151,6 +151,7 @@ describe('AiSettingsService.reindex progress seed', () => {
|
|||||||
start: jest.fn().mockImplementation(async () => {
|
start: jest.fn().mockImplementation(async () => {
|
||||||
order.push('start');
|
order.push('start');
|
||||||
}),
|
}),
|
||||||
|
clear: jest.fn().mockResolvedValue(undefined),
|
||||||
};
|
};
|
||||||
|
|
||||||
const service = new AiSettingsService(
|
const service = new AiSettingsService(
|
||||||
@@ -192,4 +193,36 @@ describe('AiSettingsService.reindex progress seed', () => {
|
|||||||
// The enqueue still runs (and de-duplicates against the active job).
|
// The enqueue still runs (and de-duplicates against the active job).
|
||||||
expect(aiQueue.add).toHaveBeenCalledTimes(1);
|
expect(aiQueue.add).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('clears the seed it just wrote and re-throws when enqueue fails', async () => {
|
||||||
|
const { service, aiQueue, reindexProgress } = makeService();
|
||||||
|
// This call seeds (get() is null) but the enqueue then blows up
|
||||||
|
// (Redis hiccup/shutdown) -> the worker never runs and never clear()s, so
|
||||||
|
// reindex() must roll back its own seed to avoid a 1h stuck "reindexing".
|
||||||
|
const boom = new Error('redis down');
|
||||||
|
aiQueue.add.mockRejectedValue(boom);
|
||||||
|
|
||||||
|
await expect(service.reindex(WORKSPACE_ID)).rejects.toBe(boom);
|
||||||
|
|
||||||
|
expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 478);
|
||||||
|
expect(reindexProgress.clear).toHaveBeenCalledWith(WORKSPACE_ID);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does NOT clear a concurrent active run when enqueue fails (no seed)', async () => {
|
||||||
|
const { service, aiQueue, reindexProgress } = makeService();
|
||||||
|
// A run is already active, so THIS call does not seed; if the enqueue then
|
||||||
|
// fails it must NOT wipe the live worker's record.
|
||||||
|
reindexProgress.get.mockResolvedValue({
|
||||||
|
total: 478,
|
||||||
|
done: 120,
|
||||||
|
startedAt: Date.now(),
|
||||||
|
});
|
||||||
|
const boom = new Error('redis down');
|
||||||
|
aiQueue.add.mockRejectedValue(boom);
|
||||||
|
|
||||||
|
await expect(service.reindex(WORKSPACE_ID)).rejects.toBe(boom);
|
||||||
|
|
||||||
|
expect(reindexProgress.start).not.toHaveBeenCalled();
|
||||||
|
expect(reindexProgress.clear).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -114,9 +114,11 @@ export class AiSettingsService {
|
|||||||
// second tab) must NOT reset the visible counter to 0 — that would
|
// second tab) must NOT reset the visible counter to 0 — that would
|
||||||
// understate the live worker's real position for the rest of the run. The
|
// understate the live worker's real position for the rest of the run. The
|
||||||
// worker's own start() at run begin is the single authoritative reset.
|
// worker's own start() at run begin is the single authoritative reset.
|
||||||
|
let seeded = false;
|
||||||
if ((await this.reindexProgress.get(workspaceId)) === null) {
|
if ((await this.reindexProgress.get(workspaceId)) === null) {
|
||||||
const totalPages = await this.pageRepo.countEmbeddablePages(workspaceId);
|
const totalPages = await this.pageRepo.countEmbeddablePages(workspaceId);
|
||||||
await this.reindexProgress.start(workspaceId, totalPages);
|
await this.reindexProgress.start(workspaceId, totalPages);
|
||||||
|
seeded = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
const jobId = `ai-reindex-${workspaceId}`;
|
const jobId = `ai-reindex-${workspaceId}`;
|
||||||
@@ -125,15 +127,27 @@ export class AiSettingsService {
|
|||||||
// de-duplicates against it, keeping the in-progress pass.
|
// de-duplicates against it, keeping the in-progress pass.
|
||||||
await this.aiQueue.remove(jobId).catch(() => undefined);
|
await this.aiQueue.remove(jobId).catch(() => undefined);
|
||||||
|
|
||||||
await this.aiQueue.add(
|
try {
|
||||||
QueueJob.WORKSPACE_CREATE_EMBEDDINGS,
|
await this.aiQueue.add(
|
||||||
{ workspaceId },
|
QueueJob.WORKSPACE_CREATE_EMBEDDINGS,
|
||||||
{
|
{ workspaceId },
|
||||||
jobId,
|
{
|
||||||
removeOnComplete: true,
|
jobId,
|
||||||
removeOnFail: true,
|
removeOnComplete: true,
|
||||||
},
|
removeOnFail: true,
|
||||||
);
|
},
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
// If the enqueue fails (Redis hiccup/shutdown) the worker never runs, so
|
||||||
|
// its finally->clear() never fires. Roll back the seed WE just wrote so
|
||||||
|
// the status endpoint doesn't report a stuck "reindexing: 0 of N" for the
|
||||||
|
// full TTL. Only clear when this call did the seed — never wipe a
|
||||||
|
// concurrent active run's record (get() was non-null, seeded=false).
|
||||||
|
if (seeded) {
|
||||||
|
await this.reindexProgress.clear(workspaceId);
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -0,0 +1,124 @@
|
|||||||
|
import { Kysely } from 'kysely';
|
||||||
|
import { randomUUID } from 'node:crypto';
|
||||||
|
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||||
|
import { SpaceMemberRepo } from '@docmost/db/repos/space/space-member.repo';
|
||||||
|
import { EventEmitter2 } from '@nestjs/event-emitter';
|
||||||
|
import { getTestDb, destroyTestDb, createWorkspace, createSpace } from './db';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* `PageRepo.getEmbeddablePageIds` MUST stay in lockstep with
|
||||||
|
* `PageRepo.countEmbeddablePages` (page.repo.ts) — the bulk reindex iterates the
|
||||||
|
* ID set while the status endpoint reports the count as the live denominator, so
|
||||||
|
* if the two predicates ever diverge the "done X of Y" counter ends on the wrong
|
||||||
|
* total. Both share the SAME WHERE: a page qualifies iff it is non-deleted AND
|
||||||
|
* (text_content has a non-whitespace char OR it has a non-deleted embedding row).
|
||||||
|
*
|
||||||
|
* This is a DB-level invariant: the predicate lives in raw SQL (`text_content ~
|
||||||
|
* '[^[:space:]]'`) and an EXISTS subquery, so a unit test with mocked Kysely
|
||||||
|
* cannot observe it. We seed every boundary case against real Postgres and
|
||||||
|
* assert the returned ID set EQUALS the count (and is exactly the expected set).
|
||||||
|
* A future edit that touches one predicate but not the other turns this red.
|
||||||
|
*/
|
||||||
|
describe('PageRepo embeddable-page set: getEmbeddablePageIds <-> countEmbeddablePages [integration]', () => {
|
||||||
|
let db: Kysely<any>;
|
||||||
|
let repo: PageRepo;
|
||||||
|
let workspaceId: string;
|
||||||
|
let spaceId: string;
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
db = getTestDb();
|
||||||
|
// Only the Kysely-backed query methods under test are exercised, so the
|
||||||
|
// SpaceMemberRepo / EventEmitter2 deps are never touched — stub them.
|
||||||
|
repo = new PageRepo(
|
||||||
|
db as any,
|
||||||
|
{} as unknown as SpaceMemberRepo,
|
||||||
|
{} as unknown as EventEmitter2,
|
||||||
|
);
|
||||||
|
workspaceId = (await createWorkspace(db)).id;
|
||||||
|
spaceId = (await createSpace(db, workspaceId)).id;
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
await destroyTestDb();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Insert a page with explicit text_content / deleted_at (createPage in db.ts
|
||||||
|
// sets neither), returning its id so the test can assert membership.
|
||||||
|
async function insertPage(args: {
|
||||||
|
textContent: string | null;
|
||||||
|
deletedAt?: Date | null;
|
||||||
|
}): Promise<string> {
|
||||||
|
const id = randomUUID();
|
||||||
|
await db
|
||||||
|
.insertInto('pages')
|
||||||
|
.values({
|
||||||
|
id,
|
||||||
|
slugId: `slug-${id.slice(0, 8)}`,
|
||||||
|
title: `page-${id.slice(0, 8)}`,
|
||||||
|
spaceId,
|
||||||
|
workspaceId,
|
||||||
|
textContent: args.textContent,
|
||||||
|
deletedAt: args.deletedAt ?? null,
|
||||||
|
})
|
||||||
|
.execute();
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert one embedding chunk row for a page (NOT NULL columns + deleted_at).
|
||||||
|
async function insertEmbedding(
|
||||||
|
pageId: string,
|
||||||
|
opts: { deletedAt?: Date | null } = {},
|
||||||
|
): Promise<void> {
|
||||||
|
await db
|
||||||
|
.insertInto('pageEmbeddings')
|
||||||
|
.values({
|
||||||
|
id: randomUUID(),
|
||||||
|
workspaceId,
|
||||||
|
pageId,
|
||||||
|
spaceId,
|
||||||
|
chunkIndex: 0,
|
||||||
|
chunkStart: 0,
|
||||||
|
chunkLength: 1,
|
||||||
|
content: 'x',
|
||||||
|
modelName: 'test-model',
|
||||||
|
modelDimensions: 1,
|
||||||
|
deletedAt: opts.deletedAt ?? null,
|
||||||
|
})
|
||||||
|
.execute();
|
||||||
|
}
|
||||||
|
|
||||||
|
it('returns exactly the embeddable set and its size equals countEmbeddablePages', async () => {
|
||||||
|
// IN the set --------------------------------------------------------------
|
||||||
|
// (a) non-deleted page with real body text.
|
||||||
|
const withText = await insertPage({ textContent: 'hello world' });
|
||||||
|
// (b) non-deleted page with NO text but a live embedding row (EXISTS clause:
|
||||||
|
// a page that lost its text yet still has stale vectors must be visited
|
||||||
|
// so the reindex can clear them).
|
||||||
|
const noTextLiveEmbedding = await insertPage({ textContent: null });
|
||||||
|
await insertEmbedding(noTextLiveEmbedding);
|
||||||
|
|
||||||
|
// OUT of the set ----------------------------------------------------------
|
||||||
|
// (c) non-deleted, text_content NULL, no embeddings.
|
||||||
|
await insertPage({ textContent: null });
|
||||||
|
// (d) non-deleted, whitespace-only text (regex requires a non-space char).
|
||||||
|
await insertPage({ textContent: ' \n\t ' });
|
||||||
|
// (e) deleted page WITH body text — excluded by the non-deleted predicate.
|
||||||
|
await insertPage({
|
||||||
|
textContent: 'deleted but had text',
|
||||||
|
deletedAt: new Date(),
|
||||||
|
});
|
||||||
|
// (f) non-deleted, no text, with ONLY a DELETED embedding row — the EXISTS
|
||||||
|
// subquery filters pe.deleted_at IS NULL, so this stays out.
|
||||||
|
const onlyDeletedEmbedding = await insertPage({ textContent: null });
|
||||||
|
await insertEmbedding(onlyDeletedEmbedding, { deletedAt: new Date() });
|
||||||
|
|
||||||
|
const ids = await repo.getEmbeddablePageIds(workspaceId);
|
||||||
|
const count = await repo.countEmbeddablePages(workspaceId);
|
||||||
|
|
||||||
|
// The two queries agree on the size (the load-bearing lockstep invariant)...
|
||||||
|
expect(ids.length).toBe(count);
|
||||||
|
// ...and the set is exactly the two qualifying pages, nothing else.
|
||||||
|
expect(new Set(ids)).toEqual(new Set([withText, noTextLiveEmbedding]));
|
||||||
|
expect(count).toBe(2);
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user