diff --git a/apps/server/src/database/repos/page/page.repo.ts b/apps/server/src/database/repos/page/page.repo.ts index 9e253c21..a9b79c35 100644 --- a/apps/server/src/database/repos/page/page.repo.ts +++ b/apps/server/src/database/repos/page/page.repo.ts @@ -264,20 +264,6 @@ export class PageRepo { return Number(row?.count ?? 0); } - /** - * IDs of all non-deleted pages in a workspace. Used by the RAG bulk reindex to - * (re)build embeddings for every existing page. - */ - async getIdsByWorkspace(workspaceId: string): Promise { - const rows = await this.db - .selectFrom('pages') - .select('id') - .where('workspaceId', '=', workspaceId) - .where('deletedAt', 'is', null) - .execute(); - return rows.map((r) => r.id); - } - /** * IDs of the EMBEDDABLE page set for a workspace — the exact same set that * `countEmbeddablePages` counts (a page qualifies if it has non-empty diff --git a/apps/server/src/integrations/ai/ai-settings.service.spec.ts b/apps/server/src/integrations/ai/ai-settings.service.spec.ts index 20db50eb..97f81b56 100644 --- a/apps/server/src/integrations/ai/ai-settings.service.spec.ts +++ b/apps/server/src/integrations/ai/ai-settings.service.spec.ts @@ -151,6 +151,7 @@ describe('AiSettingsService.reindex progress seed', () => { start: jest.fn().mockImplementation(async () => { order.push('start'); }), + clear: jest.fn().mockResolvedValue(undefined), }; const service = new AiSettingsService( @@ -192,4 +193,36 @@ describe('AiSettingsService.reindex progress seed', () => { // The enqueue still runs (and de-duplicates against the active job). expect(aiQueue.add).toHaveBeenCalledTimes(1); }); + + it('clears the seed it just wrote and re-throws when enqueue fails', async () => { + const { service, aiQueue, reindexProgress } = makeService(); + // This call seeds (get() is null) but the enqueue then blows up + // (Redis hiccup/shutdown) -> the worker never runs and never clear()s, so + // reindex() must roll back its own seed to avoid a 1h stuck "reindexing". + const boom = new Error('redis down'); + aiQueue.add.mockRejectedValue(boom); + + await expect(service.reindex(WORKSPACE_ID)).rejects.toBe(boom); + + expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 478); + expect(reindexProgress.clear).toHaveBeenCalledWith(WORKSPACE_ID); + }); + + it('does NOT clear a concurrent active run when enqueue fails (no seed)', async () => { + const { service, aiQueue, reindexProgress } = makeService(); + // A run is already active, so THIS call does not seed; if the enqueue then + // fails it must NOT wipe the live worker's record. + reindexProgress.get.mockResolvedValue({ + total: 478, + done: 120, + startedAt: Date.now(), + }); + const boom = new Error('redis down'); + aiQueue.add.mockRejectedValue(boom); + + await expect(service.reindex(WORKSPACE_ID)).rejects.toBe(boom); + + expect(reindexProgress.start).not.toHaveBeenCalled(); + expect(reindexProgress.clear).not.toHaveBeenCalled(); + }); }); diff --git a/apps/server/src/integrations/ai/ai-settings.service.ts b/apps/server/src/integrations/ai/ai-settings.service.ts index 47bf9e7d..7dc2238f 100644 --- a/apps/server/src/integrations/ai/ai-settings.service.ts +++ b/apps/server/src/integrations/ai/ai-settings.service.ts @@ -114,9 +114,11 @@ export class AiSettingsService { // second tab) must NOT reset the visible counter to 0 — that would // understate the live worker's real position for the rest of the run. The // worker's own start() at run begin is the single authoritative reset. + let seeded = false; if ((await this.reindexProgress.get(workspaceId)) === null) { const totalPages = await this.pageRepo.countEmbeddablePages(workspaceId); await this.reindexProgress.start(workspaceId, totalPages); + seeded = true; } const jobId = `ai-reindex-${workspaceId}`; @@ -125,15 +127,27 @@ export class AiSettingsService { // de-duplicates against it, keeping the in-progress pass. await this.aiQueue.remove(jobId).catch(() => undefined); - await this.aiQueue.add( - QueueJob.WORKSPACE_CREATE_EMBEDDINGS, - { workspaceId }, - { - jobId, - removeOnComplete: true, - removeOnFail: true, - }, - ); + try { + await this.aiQueue.add( + QueueJob.WORKSPACE_CREATE_EMBEDDINGS, + { workspaceId }, + { + jobId, + removeOnComplete: true, + removeOnFail: true, + }, + ); + } catch (err) { + // If the enqueue fails (Redis hiccup/shutdown) the worker never runs, so + // its finally->clear() never fires. Roll back the seed WE just wrote so + // the status endpoint doesn't report a stuck "reindexing: 0 of N" for the + // full TTL. Only clear when this call did the seed — never wipe a + // concurrent active run's record (get() was non-null, seeded=false). + if (seeded) { + await this.reindexProgress.clear(workspaceId); + } + throw err; + } } /** diff --git a/apps/server/test/integration/page-embeddable-ids-lockstep.int-spec.ts b/apps/server/test/integration/page-embeddable-ids-lockstep.int-spec.ts new file mode 100644 index 00000000..ae7ffde5 --- /dev/null +++ b/apps/server/test/integration/page-embeddable-ids-lockstep.int-spec.ts @@ -0,0 +1,124 @@ +import { Kysely } from 'kysely'; +import { randomUUID } from 'node:crypto'; +import { PageRepo } from '@docmost/db/repos/page/page.repo'; +import { SpaceMemberRepo } from '@docmost/db/repos/space/space-member.repo'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { getTestDb, destroyTestDb, createWorkspace, createSpace } from './db'; + +/** + * `PageRepo.getEmbeddablePageIds` MUST stay in lockstep with + * `PageRepo.countEmbeddablePages` (page.repo.ts) — the bulk reindex iterates the + * ID set while the status endpoint reports the count as the live denominator, so + * if the two predicates ever diverge the "done X of Y" counter ends on the wrong + * total. Both share the SAME WHERE: a page qualifies iff it is non-deleted AND + * (text_content has a non-whitespace char OR it has a non-deleted embedding row). + * + * This is a DB-level invariant: the predicate lives in raw SQL (`text_content ~ + * '[^[:space:]]'`) and an EXISTS subquery, so a unit test with mocked Kysely + * cannot observe it. We seed every boundary case against real Postgres and + * assert the returned ID set EQUALS the count (and is exactly the expected set). + * A future edit that touches one predicate but not the other turns this red. + */ +describe('PageRepo embeddable-page set: getEmbeddablePageIds <-> countEmbeddablePages [integration]', () => { + let db: Kysely; + let repo: PageRepo; + let workspaceId: string; + let spaceId: string; + + beforeAll(async () => { + db = getTestDb(); + // Only the Kysely-backed query methods under test are exercised, so the + // SpaceMemberRepo / EventEmitter2 deps are never touched — stub them. + repo = new PageRepo( + db as any, + {} as unknown as SpaceMemberRepo, + {} as unknown as EventEmitter2, + ); + workspaceId = (await createWorkspace(db)).id; + spaceId = (await createSpace(db, workspaceId)).id; + }); + + afterAll(async () => { + await destroyTestDb(); + }); + + // Insert a page with explicit text_content / deleted_at (createPage in db.ts + // sets neither), returning its id so the test can assert membership. + async function insertPage(args: { + textContent: string | null; + deletedAt?: Date | null; + }): Promise { + const id = randomUUID(); + await db + .insertInto('pages') + .values({ + id, + slugId: `slug-${id.slice(0, 8)}`, + title: `page-${id.slice(0, 8)}`, + spaceId, + workspaceId, + textContent: args.textContent, + deletedAt: args.deletedAt ?? null, + }) + .execute(); + return id; + } + + // Insert one embedding chunk row for a page (NOT NULL columns + deleted_at). + async function insertEmbedding( + pageId: string, + opts: { deletedAt?: Date | null } = {}, + ): Promise { + await db + .insertInto('pageEmbeddings') + .values({ + id: randomUUID(), + workspaceId, + pageId, + spaceId, + chunkIndex: 0, + chunkStart: 0, + chunkLength: 1, + content: 'x', + modelName: 'test-model', + modelDimensions: 1, + deletedAt: opts.deletedAt ?? null, + }) + .execute(); + } + + it('returns exactly the embeddable set and its size equals countEmbeddablePages', async () => { + // IN the set -------------------------------------------------------------- + // (a) non-deleted page with real body text. + const withText = await insertPage({ textContent: 'hello world' }); + // (b) non-deleted page with NO text but a live embedding row (EXISTS clause: + // a page that lost its text yet still has stale vectors must be visited + // so the reindex can clear them). + const noTextLiveEmbedding = await insertPage({ textContent: null }); + await insertEmbedding(noTextLiveEmbedding); + + // OUT of the set ---------------------------------------------------------- + // (c) non-deleted, text_content NULL, no embeddings. + await insertPage({ textContent: null }); + // (d) non-deleted, whitespace-only text (regex requires a non-space char). + await insertPage({ textContent: ' \n\t ' }); + // (e) deleted page WITH body text — excluded by the non-deleted predicate. + await insertPage({ + textContent: 'deleted but had text', + deletedAt: new Date(), + }); + // (f) non-deleted, no text, with ONLY a DELETED embedding row — the EXISTS + // subquery filters pe.deleted_at IS NULL, so this stays out. + const onlyDeletedEmbedding = await insertPage({ textContent: null }); + await insertEmbedding(onlyDeletedEmbedding, { deletedAt: new Date() }); + + const ids = await repo.getEmbeddablePageIds(workspaceId); + const count = await repo.countEmbeddablePages(workspaceId); + + // The two queries agree on the size (the load-bearing lockstep invariant)... + expect(ids.length).toBe(count); + // ...and the set is exactly the two qualifying pages, nothing else. + expect(new Set(ids)).toEqual(new Set([withText, noTextLiveEmbedding])); + expect(count).toBe(2); + }); +});