From 95d07d8d6fcc9983fab332ae2f673b9c0a20cf00 Mon Sep 17 00:00:00 2001 From: a Date: Sun, 28 Jun 2026 02:45:09 +0300 Subject: [PATCH] fix(ai): align reindex live denominator with the steady-state count Review fixes for the reindex-progress counter (#242): 1. Denominator jump (478 -> 500 -> 478): reindexWorkspace iterated getIdsByWorkspace() (ALL non-deleted pages) but the seed/status use countEmbeddablePages (text OR existing-embedding), so the live total exceeded the steady-state total whenever empty/text-less pages existed. Add PageRepo.getEmbeddablePageIds() that selects the IDs of the EXACT same set countEmbeddablePages counts (deletedAt IS NULL AND (text_content matches a non-whitespace char OR an EXISTS non-deleted pageEmbeddings row)), and have reindexWorkspace iterate THAT set with total = its length. Iteration set and count source change together, so done reaches exactly total == the steady-state denominator. Dropping text-less pages is correct (reindexPage no-ops on them; a page that lost its text but still has stale embeddings is in the set via the EXISTS clause and still gets its stale rows cleared). Removed the contradictory "worker overwrites with the real page count" / "denominator matches" comment. 2. Mid-run re-trigger reset: reindex() unconditionally re-seeded done=0 before an enqueue that de-dupes a running job, so a second click/admin/tab reset the visible counter while the worker kept incrementing. Now seed only when get(workspaceId) === null; the worker's own start() remains the single authoritative reset. 3. TTL: documented that it is intentionally tied to write progress (start/increment) and never refreshed on get(), so a dead worker's record can't be kept alive forever by client polling. 
Tests: new embedding-reindex-progress.service.spec.ts (fake ioredis: hash -> ReindexProgress, malformed/missing/non-numeric -> null, non-finite startedAt -> 0, hgetall throws -> null, start/increment issue hset/hincrby+expire and swallow Redis errors); reindex() seed order + no-reseed-when-active guard; getMasked live test now uses progress.total=500 vs DB 478 to pin the progress branch; indexer specs updated to mock getEmbeddablePageIds. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../embedding-indexer.service.spec.ts | 7 +- .../embedding/embedding-indexer.service.ts | 15 +- .../src/database/repos/page/page.repo.ts | 41 +++++ .../ai/ai-settings.service.spec.ts | 81 ++++++++- .../integrations/ai/ai-settings.service.ts | 18 +- ...embedding-reindex-progress.service.spec.ts | 163 ++++++++++++++++++ .../ai/embedding-reindex-progress.service.ts | 6 + 7 files changed, 315 insertions(+), 16 deletions(-) create mode 100644 apps/server/src/integrations/ai/embedding-reindex-progress.service.spec.ts diff --git a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts index 8793ecd7..38e86d12 100644 --- a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts +++ b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts @@ -14,7 +14,8 @@ import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-e * reindexWorkspace actually touches: * - aiService.getEmbeddingModel -> a model string so the up-front configured * check passes, - * - pageRepo.getIdsByWorkspace -> three page ids, + * - pageRepo.getEmbeddablePageIds -> three page ids (the embeddable set the + * reindex iterates), * - service.reindexPage -> spied per test to drive the per-page outcome. 
* * The point under test is the catch block: a FATAL provider error (auth/billing) @@ -26,7 +27,7 @@ describe('EmbeddingIndexerService.reindexWorkspace fail-fast', () => { function makeService() { const pageRepo = { - getIdsByWorkspace: jest.fn().mockResolvedValue(['p1', 'p2', 'p3']), + getEmbeddablePageIds: jest.fn().mockResolvedValue(['p1', 'p2', 'p3']), }; const pageEmbeddingRepo = {}; const aiService = { @@ -102,7 +103,7 @@ describe('EmbeddingIndexerService.reindexWorkspace progress', () => { function makeService(pageIds: string[] = ['p1', 'p2', 'p3']) { const pageRepo = { - getIdsByWorkspace: jest.fn().mockResolvedValue(pageIds), + getEmbeddablePageIds: jest.fn().mockResolvedValue(pageIds), }; const pageEmbeddingRepo = {}; const aiService = { diff --git a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts index e8a9f2d0..9c97a971 100644 --- a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts +++ b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts @@ -215,12 +215,19 @@ export class EmbeddingIndexerService { throw err; } - const pageIds = await this.pageRepo.getIdsByWorkspace(workspaceId); + // Iterate the EMBEDDABLE set (same predicate as countEmbeddablePages), NOT + // every non-deleted page: this makes `total` here equal the steady-state + // denominator, so the live counter climbs 0 -> total and matches the + // before/after DB count exactly (no 478 -> 500 -> 478 denominator jump). + // Text-less pages are correctly skipped — reindexPage no-ops on them, and + // a page that lost its text but still has stale embeddings IS in this set + // (the EXISTS clause) so it is still visited and its stale rows cleared. 
+ const pageIds = await this.pageRepo.getEmbeddablePageIds(workspaceId); const total = pageIds.length; const startedAt = Date.now(); - // Publish the live run progress (overwrites the enqueue-time placeholder - // with the real page count, done back to 0) so the settings status can - // report done climbing 0 -> total while this reindex runs. + // Publish the live run progress over this same set (done reset to 0). The + // counter increments once per iterated page and reaches exactly `total`, + // which equals countEmbeddablePages — the steady-state denominator. await this.reindexProgress.start(workspaceId, total); this.logger.log( `reindexWorkspace: starting reindex of ${total} page(s) for workspace ${workspaceId}`, diff --git a/apps/server/src/database/repos/page/page.repo.ts b/apps/server/src/database/repos/page/page.repo.ts index a7ac3a5e..9e253c21 100644 --- a/apps/server/src/database/repos/page/page.repo.ts +++ b/apps/server/src/database/repos/page/page.repo.ts @@ -278,6 +278,47 @@ export class PageRepo { return rows.map((r) => r.id); } + /** + * IDs of the EMBEDDABLE page set for a workspace — the exact same set that + * `countEmbeddablePages` counts (a page qualifies if its textContent has a + * non-whitespace character OR it has a non-deleted embedding row). The bulk reindex + * iterates THIS set so the live "done" counter reaches exactly + * `countEmbeddablePages` (the steady-state denominator), instead of iterating + * every non-deleted page (which would push the denominator above the + * steady-state value mid-run). + * + * IMPORTANT: the WHERE here MUST stay in lockstep with `countEmbeddablePages` + * — if one changes, change both, or the live total and steady-state total + * diverge again. Dropping text-less pages is correct: `reindexPage` no-ops on + * a page with no extractable content anyway, and a page that lost its text but + * still has stale embeddings IS in this set (the EXISTS clause), so it is still + * visited and its stale rows are cleared.
+ */ + async getEmbeddablePageIds(workspaceId: string): Promise<string[]> { + const rows = await this.db + .selectFrom('pages as p') + .select('p.id') + .where('p.workspaceId', '=', workspaceId) + .where('p.deletedAt', 'is', null) + .where((eb) => + eb.or([ + // Has extractable body text (mirrors countEmbeddablePages: any + // non-whitespace char; raw SQL -> snake_case column name). + sql<boolean>`p.text_content ~ '[^[:space:]]'`, + // OR already has at least one (non-deleted) embedding row. + eb.exists( + eb + .selectFrom('pageEmbeddings as pe') + .select(sql`1`.as('one')) + .whereRef('pe.pageId', '=', 'p.id') + .where('pe.deletedAt', 'is', null), + ), + ]), + ) + .execute(); + return rows.map((r) => r.id); + } + async deletePage(pageId: string): Promise<void> { let query = this.db.deleteFrom('pages'); diff --git a/apps/server/src/integrations/ai/ai-settings.service.spec.ts b/apps/server/src/integrations/ai/ai-settings.service.spec.ts index 67cbc8b5..20db50eb 100644 --- a/apps/server/src/integrations/ai/ai-settings.service.spec.ts +++ b/apps/server/src/integrations/ai/ai-settings.service.spec.ts @@ -95,17 +95,20 @@ describe('AiSettingsService.getMasked reindex progress', () => { it('reports the live run numbers when a reindex progress record is active', async () => { const { service, reindexProgress } = makeService(); - // Mid-run: 120 of 478 pages processed. + // Use a progress.total (500) DISTINCT from the DB count (478) so the test + // actually pins the progress.total branch rather than coincidentally + // matching the DB fallback. With fix #1 the two sources agree in practice, + // but getMasked must still return progress.total when a record is active.
reindexProgress.get.mockResolvedValue({ - total: 478, + total: 500, done: 120, startedAt: Date.now(), }); const masked = await service.getMasked(WORKSPACE_ID); - expect(masked.indexedPages).toBe(120); - expect(masked.totalPages).toBe(478); + expect(masked.indexedPages).toBe(120); // progress.done, not DB 478 + expect(masked.totalPages).toBe(500); // progress.total, not DB 478 expect(masked.reindexing).toBe(true); }); @@ -120,3 +123,73 @@ describe('AiSettingsService.getMasked reindex progress', () => { expect(masked.reindexing).toBe(false); }); }); + +/** + * reindex() must seed a live progress record (done=0) BEFORE enqueueing so the + * first status poll shows 0 — but ONLY when no run is already active, since + * aiQueue.add() de-duplicates a running reindex and a re-seed would reset the + * visible counter to 0 while the live worker keeps incrementing from its real + * position. + */ +describe('AiSettingsService.reindex progress seed', () => { + const WORKSPACE_ID = 'ws-1'; + + function makeService() { + const order: string[] = []; + const aiQueue = { + remove: jest.fn().mockResolvedValue(undefined), + add: jest.fn().mockImplementation(async () => { + order.push('add'); + }), + }; + const pageRepo = { + countEmbeddablePages: jest.fn().mockResolvedValue(478), + }; + const reindexProgress = { + // Default: no active run -> seed should happen. 
+ get: jest.fn().mockResolvedValue(null), + start: jest.fn().mockImplementation(async () => { + order.push('start'); + }), + }; + + const service = new AiSettingsService( + {} as unknown as WorkspaceRepo, + {} as unknown as AiAgentRoleRepo, + {} as unknown as AiProviderCredentialsRepo, + {} as unknown as PageEmbeddingRepo, + pageRepo as unknown as PageRepo, + {} as unknown as SecretBoxService, + reindexProgress as unknown as EmbeddingReindexProgressService, + aiQueue as unknown as Queue, + ); + return { service, aiQueue, pageRepo, reindexProgress, order }; + } + + it('seeds progress (workspace, count) BEFORE enqueue when no run is active', async () => { + const { service, aiQueue, reindexProgress, order } = makeService(); + + await service.reindex(WORKSPACE_ID); + + expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 478); + expect(aiQueue.add).toHaveBeenCalledTimes(1); + // Seed must precede the enqueue so the first poll already reports done=0. + expect(order).toEqual(['start', 'add']); + }); + + it('does NOT re-seed when a run is already active (mid-run re-trigger)', async () => { + const { service, aiQueue, reindexProgress } = makeService(); + // An active record exists -> a second click must not reset the counter. + reindexProgress.get.mockResolvedValue({ + total: 478, + done: 120, + startedAt: Date.now(), + }); + + await service.reindex(WORKSPACE_ID); + + expect(reindexProgress.start).not.toHaveBeenCalled(); + // The enqueue still runs (and de-duplicates against the active job). 
+ expect(aiQueue.add).toHaveBeenCalledTimes(1); + }); +}); diff --git a/apps/server/src/integrations/ai/ai-settings.service.ts b/apps/server/src/integrations/ai/ai-settings.service.ts index ff32d820..47bf9e7d 100644 --- a/apps/server/src/integrations/ai/ai-settings.service.ts +++ b/apps/server/src/integrations/ai/ai-settings.service.ts @@ -105,11 +105,19 @@ export class AiSettingsService { // Seed a live progress record BEFORE enqueueing so the very first status // poll already reports done=0 (the reindex POST returns the PRE-job counts, // so without this seed the first poll would still show "total of total"). - // The worker overwrites `total` with the real page count, increments `done` - // as it runs, and clears the record in a finally. `totalPages` uses the same - // source the status endpoint reports, so the counter denominator matches. - const totalPages = await this.pageRepo.countEmbeddablePages(workspaceId); - await this.reindexProgress.start(workspaceId, totalPages); + // `totalPages` uses countEmbeddablePages — the SAME set the worker iterates + // and the SAME denominator the status endpoint reports, so the live and + // steady-state totals match. + // + // ONLY seed when no run is active: aiQueue.add() de-duplicates an already- + // running reindex, so a mid-run re-trigger (second click / second admin / + // second tab) must NOT reset the visible counter to 0 — that would + // understate the live worker's real position for the rest of the run. The + // worker's own start() at run begin is the single authoritative reset. + if ((await this.reindexProgress.get(workspaceId)) === null) { + const totalPages = await this.pageRepo.countEmbeddablePages(workspaceId); + await this.reindexProgress.start(workspaceId, totalPages); + } const jobId = `ai-reindex-${workspaceId}`; // Clear a prior non-active entry so a stale job can't block this reindex. 
diff --git a/apps/server/src/integrations/ai/embedding-reindex-progress.service.spec.ts b/apps/server/src/integrations/ai/embedding-reindex-progress.service.spec.ts new file mode 100644 index 00000000..2df8826c --- /dev/null +++ b/apps/server/src/integrations/ai/embedding-reindex-progress.service.spec.ts @@ -0,0 +1,163 @@ +import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service'; +import type { RedisService } from '@nestjs-labs/nestjs-ioredis'; +import type { Redis } from 'ioredis'; + +/** + * Unit tests for the Redis-backed reindex-progress store. + * + * The store is a thin, BEST-EFFORT wrapper: writes (start/increment) issue an + * hset/hincrby + expire pipeline and must SWALLOW Redis errors (progress is + * cosmetic — it must never break a reindex); reads (get) must map a valid hash + * to a ReindexProgress and degrade to null on a malformed/missing record or a + * Redis failure. We drive it with a hand-rolled fake ioredis (the project mocks + * Redis with plain fakes, see public-share limiter specs). + */ +describe('EmbeddingReindexProgressService', () => { + const WORKSPACE_ID = 'ws-1'; + const KEY = 'ai:reindex:progress:ws-1'; + + /** + * Build a fake ioredis whose `multi()` returns a chainable recorder and whose + * `hgetall`/`del` are configurable jest mocks. `execImpl` lets a test make the + * pipeline reject (to assert error-swallowing). + */ + function makeRedis(opts: { execImpl?: () => Promise<unknown> } = {}) { + const exec = jest + .fn() + .mockImplementation(opts.execImpl ?? (() => Promise.resolve([]))); + // mockReturnThis() returns the call's `this` (the multi object), so the + // chain hset().expire().exec() resolves correctly.
+ const multiObj = { + hset: jest.fn().mockReturnThis(), + hincrby: jest.fn().mockReturnThis(), + expire: jest.fn().mockReturnThis(), + exec, + }; + const multi = jest.fn(() => multiObj); + const hgetall = jest.fn().mockResolvedValue({}); + const del = jest.fn().mockResolvedValue(1); + const redis = { multi, hgetall, del } as unknown as Redis; + return { redis, multiObj, multi, hgetall, del, exec }; + } + + function makeService(redis: Redis) { + const redisService = { + getOrThrow: () => redis, + } as unknown as RedisService; + return new EmbeddingReindexProgressService(redisService); + } + + describe('get', () => { + it('maps a valid hash to a ReindexProgress object', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockResolvedValue({ total: '478', done: '120', startedAt: '1000' }); + const service = makeService(redis); + + await expect(service.get(WORKSPACE_ID)).resolves.toEqual({ + total: 478, + done: 120, + startedAt: 1000, + }); + expect(hgetall).toHaveBeenCalledWith(KEY); + }); + + it('returns null for an empty hash (no record)', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockResolvedValue({}); + await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull(); + }); + + it('returns null when `total` is missing (partial record)', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockResolvedValue({ done: '5' }); + await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull(); + }); + + it('returns null for a non-numeric total', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockResolvedValue({ total: 'abc', done: '1', startedAt: '1' }); + await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull(); + }); + + it('returns null for a non-numeric done', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockResolvedValue({ total: '10', done: 'xyz', startedAt: '1' }); + await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull(); + }); + 
+ it('coerces a non-finite startedAt to 0', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockResolvedValue({ total: '10', done: '2', startedAt: 'nope' }); + await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toEqual({ + total: 10, + done: 2, + startedAt: 0, + }); + }); + + it('degrades to null when hgetall throws (degradation contract)', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockRejectedValue(new Error('redis down')); + await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull(); + }); + }); + + describe('start', () => { + it('issues hset + expire on the workspace key', async () => { + const { redis, multiObj } = makeRedis(); + await makeService(redis).start(WORKSPACE_ID, 478); + + expect(multiObj.hset).toHaveBeenCalledWith( + KEY, + expect.objectContaining({ total: '478', done: '0' }), + ); + expect(multiObj.expire).toHaveBeenCalledWith(KEY, expect.any(Number)); + expect(multiObj.exec).toHaveBeenCalledTimes(1); + }); + + it('swallows a thrown Redis error (best-effort)', async () => { + const { redis } = makeRedis({ + execImpl: () => Promise.reject(new Error('redis down')), + }); + await expect( + makeService(redis).start(WORKSPACE_ID, 1), + ).resolves.toBeUndefined(); + }); + }); + + describe('increment', () => { + it('issues hincrby + expire on the workspace key', async () => { + const { redis, multiObj } = makeRedis(); + await makeService(redis).increment(WORKSPACE_ID); + + expect(multiObj.hincrby).toHaveBeenCalledWith(KEY, 'done', 1); + expect(multiObj.expire).toHaveBeenCalledWith(KEY, expect.any(Number)); + expect(multiObj.exec).toHaveBeenCalledTimes(1); + }); + + it('swallows a thrown Redis error (best-effort)', async () => { + const { redis } = makeRedis({ + execImpl: () => Promise.reject(new Error('redis down')), + }); + await expect( + makeService(redis).increment(WORKSPACE_ID), + ).resolves.toBeUndefined(); + }); + }); + + describe('clear', () => { + it('deletes the workspace key', async 
() => { + const { redis, del } = makeRedis(); + await makeService(redis).clear(WORKSPACE_ID); + expect(del).toHaveBeenCalledWith(KEY); + }); + + it('swallows a thrown Redis error (best-effort)', async () => { + const { redis, del } = makeRedis(); + del.mockRejectedValue(new Error('redis down')); + await expect( + makeService(redis).clear(WORKSPACE_ID), + ).resolves.toBeUndefined(); + }); + }); +}); diff --git a/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts b/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts index ff8d164d..2d62fd65 100644 --- a/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts +++ b/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts @@ -22,6 +22,12 @@ const KEY_PREFIX = 'ai:reindex:progress:'; * reaches its `clear()` finally can still self-clean instead of leaving a stuck * "reindexing" state. Refreshed on every increment so a long run never expires * mid-flight; on a crash it disappears within TTL of the last processed page. + * + * INTENTIONALLY tied to WRITE progress (start/increment) only — never refreshed + * on get(). Refreshing on read would keep a dead worker's record alive forever + * as long as a client keeps polling (a permanently stuck reindexing:true). The + * clear() in the worker's finally handles normal completion; a dead worker's + * record expires after TTL, and the client's own poll cap stops polling anyway. */ const TTL_SECONDS = 60 * 60; // 1h