diff --git a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts index 8793ecd7..38e86d12 100644 --- a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts +++ b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts @@ -14,7 +14,8 @@ import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-e * reindexWorkspace actually touches: * - aiService.getEmbeddingModel -> a model string so the up-front configured * check passes, - * - pageRepo.getIdsByWorkspace -> three page ids, + * - pageRepo.getEmbeddablePageIds -> three page ids (the embeddable set the + * reindex iterates), * - service.reindexPage -> spied per test to drive the per-page outcome. * * The point under test is the catch block: a FATAL provider error (auth/billing) @@ -26,7 +27,7 @@ describe('EmbeddingIndexerService.reindexWorkspace fail-fast', () => { function makeService() { const pageRepo = { - getIdsByWorkspace: jest.fn().mockResolvedValue(['p1', 'p2', 'p3']), + getEmbeddablePageIds: jest.fn().mockResolvedValue(['p1', 'p2', 'p3']), }; const pageEmbeddingRepo = {}; const aiService = { @@ -102,7 +103,7 @@ describe('EmbeddingIndexerService.reindexWorkspace progress', () => { function makeService(pageIds: string[] = ['p1', 'p2', 'p3']) { const pageRepo = { - getIdsByWorkspace: jest.fn().mockResolvedValue(pageIds), + getEmbeddablePageIds: jest.fn().mockResolvedValue(pageIds), }; const pageEmbeddingRepo = {}; const aiService = { diff --git a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts index e8a9f2d0..9c97a971 100644 --- a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts +++ b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts @@ -215,12 +215,19 @@ export class EmbeddingIndexerService { throw err; } - const pageIds = await 
this.pageRepo.getIdsByWorkspace(workspaceId); + // Iterate the EMBEDDABLE set (same predicate as countEmbeddablePages), NOT + // every non-deleted page: this makes `total` here equal the steady-state + // denominator, so the live counter climbs 0 -> total and matches the + // before/after DB count exactly (no 478 -> 500 -> 478 denominator jump). + // Text-less pages are correctly skipped — reindexPage no-ops on them, and + // a page that lost its text but still has stale embeddings IS in this set + // (the EXISTS clause) so it is still visited and its stale rows cleared. + const pageIds = await this.pageRepo.getEmbeddablePageIds(workspaceId); const total = pageIds.length; const startedAt = Date.now(); - // Publish the live run progress (overwrites the enqueue-time placeholder - // with the real page count, done back to 0) so the settings status can - // report done climbing 0 -> total while this reindex runs. + // Publish the live run progress over this same set (done reset to 0). The + // counter increments once per iterated page and reaches exactly `total`, + // which equals countEmbeddablePages — the steady-state denominator. await this.reindexProgress.start(workspaceId, total); this.logger.log( `reindexWorkspace: starting reindex of ${total} page(s) for workspace ${workspaceId}`, diff --git a/apps/server/src/database/repos/page/page.repo.ts b/apps/server/src/database/repos/page/page.repo.ts index a7ac3a5e..9e253c21 100644 --- a/apps/server/src/database/repos/page/page.repo.ts +++ b/apps/server/src/database/repos/page/page.repo.ts @@ -278,6 +278,47 @@ export class PageRepo { return rows.map((r) => r.id); } + /** + * IDs of the EMBEDDABLE page set for a workspace — the exact same set that + * `countEmbeddablePages` counts (a page qualifies if it has non-empty + * textContent OR already has a stored embedding row). 
The bulk reindex + * iterates THIS set so the live "done" counter reaches exactly + * `countEmbeddablePages` (the steady-state denominator), instead of iterating + * every non-deleted page (which would push the denominator above the + * steady-state value mid-run). + * + * IMPORTANT: the WHERE here MUST stay in lockstep with `countEmbeddablePages` + * — if one changes, change both, or the live total and steady-state total + * diverge again. Dropping text-less pages is correct: `reindexPage` no-ops on + * a page with no extractable content anyway, and a page that lost its text but + * still has stale embeddings IS in this set (the EXISTS clause), so it is still + * visited and its stale rows are cleared. + */ + async getEmbeddablePageIds(workspaceId: string): Promise<string[]> { + const rows = await this.db + .selectFrom('pages as p') + .select('p.id') + .where('p.workspaceId', '=', workspaceId) + .where('p.deletedAt', 'is', null) + .where((eb) => + eb.or([ + // Has extractable body text (mirrors countEmbeddablePages: any + // non-whitespace char; raw SQL -> snake_case column name). + sql`p.text_content ~ '[^[:space:]]'`, + // OR already has at least one (non-deleted) embedding row. 
+ eb.exists( + eb + .selectFrom('pageEmbeddings as pe') + .select(sql`1`.as('one')) + .whereRef('pe.pageId', '=', 'p.id') + .where('pe.deletedAt', 'is', null), + ), + ]), + ) + .execute(); + return rows.map((r) => r.id); + } + async deletePage(pageId: string): Promise { let query = this.db.deleteFrom('pages'); diff --git a/apps/server/src/integrations/ai/ai-settings.service.spec.ts b/apps/server/src/integrations/ai/ai-settings.service.spec.ts index 67cbc8b5..20db50eb 100644 --- a/apps/server/src/integrations/ai/ai-settings.service.spec.ts +++ b/apps/server/src/integrations/ai/ai-settings.service.spec.ts @@ -95,17 +95,20 @@ describe('AiSettingsService.getMasked reindex progress', () => { it('reports the live run numbers when a reindex progress record is active', async () => { const { service, reindexProgress } = makeService(); - // Mid-run: 120 of 478 pages processed. + // Use a progress.total (500) DISTINCT from the DB count (478) so the test + // actually pins the progress.total branch rather than coincidentally + // matching the DB fallback. With fix #1 the two sources agree in practice, + // but getMasked must still return progress.total when a record is active. 
reindexProgress.get.mockResolvedValue({ - total: 478, + total: 500, done: 120, startedAt: Date.now(), }); const masked = await service.getMasked(WORKSPACE_ID); - expect(masked.indexedPages).toBe(120); - expect(masked.totalPages).toBe(478); + expect(masked.indexedPages).toBe(120); // progress.done, not DB 478 + expect(masked.totalPages).toBe(500); // progress.total, not DB 478 expect(masked.reindexing).toBe(true); }); @@ -120,3 +123,73 @@ describe('AiSettingsService.getMasked reindex progress', () => { expect(masked.reindexing).toBe(false); }); }); + +/** + * reindex() must seed a live progress record (done=0) BEFORE enqueueing so the + * first status poll shows 0 — but ONLY when no run is already active, since + * aiQueue.add() de-duplicates a running reindex and a re-seed would reset the + * visible counter to 0 while the live worker keeps incrementing from its real + * position. + */ +describe('AiSettingsService.reindex progress seed', () => { + const WORKSPACE_ID = 'ws-1'; + + function makeService() { + const order: string[] = []; + const aiQueue = { + remove: jest.fn().mockResolvedValue(undefined), + add: jest.fn().mockImplementation(async () => { + order.push('add'); + }), + }; + const pageRepo = { + countEmbeddablePages: jest.fn().mockResolvedValue(478), + }; + const reindexProgress = { + // Default: no active run -> seed should happen. 
+ get: jest.fn().mockResolvedValue(null), + start: jest.fn().mockImplementation(async () => { + order.push('start'); + }), + }; + + const service = new AiSettingsService( + {} as unknown as WorkspaceRepo, + {} as unknown as AiAgentRoleRepo, + {} as unknown as AiProviderCredentialsRepo, + {} as unknown as PageEmbeddingRepo, + pageRepo as unknown as PageRepo, + {} as unknown as SecretBoxService, + reindexProgress as unknown as EmbeddingReindexProgressService, + aiQueue as unknown as Queue, + ); + return { service, aiQueue, pageRepo, reindexProgress, order }; + } + + it('seeds progress (workspace, count) BEFORE enqueue when no run is active', async () => { + const { service, aiQueue, reindexProgress, order } = makeService(); + + await service.reindex(WORKSPACE_ID); + + expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 478); + expect(aiQueue.add).toHaveBeenCalledTimes(1); + // Seed must precede the enqueue so the first poll already reports done=0. + expect(order).toEqual(['start', 'add']); + }); + + it('does NOT re-seed when a run is already active (mid-run re-trigger)', async () => { + const { service, aiQueue, reindexProgress } = makeService(); + // An active record exists -> a second click must not reset the counter. + reindexProgress.get.mockResolvedValue({ + total: 478, + done: 120, + startedAt: Date.now(), + }); + + await service.reindex(WORKSPACE_ID); + + expect(reindexProgress.start).not.toHaveBeenCalled(); + // The enqueue still runs (and de-duplicates against the active job). 
+ expect(aiQueue.add).toHaveBeenCalledTimes(1); + }); +}); diff --git a/apps/server/src/integrations/ai/ai-settings.service.ts b/apps/server/src/integrations/ai/ai-settings.service.ts index ff32d820..47bf9e7d 100644 --- a/apps/server/src/integrations/ai/ai-settings.service.ts +++ b/apps/server/src/integrations/ai/ai-settings.service.ts @@ -105,11 +105,19 @@ export class AiSettingsService { // Seed a live progress record BEFORE enqueueing so the very first status // poll already reports done=0 (the reindex POST returns the PRE-job counts, // so without this seed the first poll would still show "total of total"). - // The worker overwrites `total` with the real page count, increments `done` - // as it runs, and clears the record in a finally. `totalPages` uses the same - // source the status endpoint reports, so the counter denominator matches. - const totalPages = await this.pageRepo.countEmbeddablePages(workspaceId); - await this.reindexProgress.start(workspaceId, totalPages); + // `totalPages` uses countEmbeddablePages — the SAME set the worker iterates + // and the SAME denominator the status endpoint reports, so the live and + // steady-state totals match. + // + // ONLY seed when no run is active: aiQueue.add() de-duplicates an already- + // running reindex, so a mid-run re-trigger (second click / second admin / + // second tab) must NOT reset the visible counter to 0 — that would + // understate the live worker's real position for the rest of the run. The + // worker's own start() at run begin is the single authoritative reset. + if ((await this.reindexProgress.get(workspaceId)) === null) { + const totalPages = await this.pageRepo.countEmbeddablePages(workspaceId); + await this.reindexProgress.start(workspaceId, totalPages); + } const jobId = `ai-reindex-${workspaceId}`; // Clear a prior non-active entry so a stale job can't block this reindex. 
diff --git a/apps/server/src/integrations/ai/embedding-reindex-progress.service.spec.ts b/apps/server/src/integrations/ai/embedding-reindex-progress.service.spec.ts new file mode 100644 index 00000000..2df8826c --- /dev/null +++ b/apps/server/src/integrations/ai/embedding-reindex-progress.service.spec.ts @@ -0,0 +1,163 @@ +import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service'; +import type { RedisService } from '@nestjs-labs/nestjs-ioredis'; +import type { Redis } from 'ioredis'; + +/** + * Unit tests for the Redis-backed reindex-progress store. + * + * The store is a thin, BEST-EFFORT wrapper: writes (start/increment) issue an + * hset/hincrby + expire pipeline and must SWALLOW Redis errors (progress is + * cosmetic — it must never break a reindex); reads (get) must map a valid hash + * to a ReindexProgress and degrade to null on a malformed/missing record or a + * Redis failure. We drive it with a hand-rolled fake ioredis (the project mocks + * Redis with plain fakes, see public-share limiter specs). + */ +describe('EmbeddingReindexProgressService', () => { + const WORKSPACE_ID = 'ws-1'; + const KEY = 'ai:reindex:progress:ws-1'; + + /** + * Build a fake ioredis whose `multi()` returns a chainable recorder and whose + * `hgetall`/`del` are configurable jest mocks. `execImpl` lets a test make the + * pipeline reject (to assert error-swallowing). + */ + function makeRedis(opts: { execImpl?: () => Promise } = {}) { + const exec = jest + .fn() + .mockImplementation(opts.execImpl ?? (() => Promise.resolve([]))); + // mockReturnThis() returns the call's `this` (the multi object), so the + // chain hset().expire().exec() resolves correctly. 
+ const multiObj = { + hset: jest.fn().mockReturnThis(), + hincrby: jest.fn().mockReturnThis(), + expire: jest.fn().mockReturnThis(), + exec, + }; + const multi = jest.fn(() => multiObj); + const hgetall = jest.fn().mockResolvedValue({}); + const del = jest.fn().mockResolvedValue(1); + const redis = { multi, hgetall, del } as unknown as Redis; + return { redis, multiObj, multi, hgetall, del, exec }; + } + + function makeService(redis: Redis) { + const redisService = { + getOrThrow: () => redis, + } as unknown as RedisService; + return new EmbeddingReindexProgressService(redisService); + } + + describe('get', () => { + it('maps a valid hash to a ReindexProgress object', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockResolvedValue({ total: '478', done: '120', startedAt: '1000' }); + const service = makeService(redis); + + await expect(service.get(WORKSPACE_ID)).resolves.toEqual({ + total: 478, + done: 120, + startedAt: 1000, + }); + expect(hgetall).toHaveBeenCalledWith(KEY); + }); + + it('returns null for an empty hash (no record)', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockResolvedValue({}); + await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull(); + }); + + it('returns null when `total` is missing (partial record)', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockResolvedValue({ done: '5' }); + await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull(); + }); + + it('returns null for a non-numeric total', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockResolvedValue({ total: 'abc', done: '1', startedAt: '1' }); + await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull(); + }); + + it('returns null for a non-numeric done', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockResolvedValue({ total: '10', done: 'xyz', startedAt: '1' }); + await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull(); + }); + 
+ it('coerces a non-finite startedAt to 0', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockResolvedValue({ total: '10', done: '2', startedAt: 'nope' }); + await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toEqual({ + total: 10, + done: 2, + startedAt: 0, + }); + }); + + it('degrades to null when hgetall throws (degradation contract)', async () => { + const { redis, hgetall } = makeRedis(); + hgetall.mockRejectedValue(new Error('redis down')); + await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull(); + }); + }); + + describe('start', () => { + it('issues hset + expire on the workspace key', async () => { + const { redis, multiObj } = makeRedis(); + await makeService(redis).start(WORKSPACE_ID, 478); + + expect(multiObj.hset).toHaveBeenCalledWith( + KEY, + expect.objectContaining({ total: '478', done: '0' }), + ); + expect(multiObj.expire).toHaveBeenCalledWith(KEY, expect.any(Number)); + expect(multiObj.exec).toHaveBeenCalledTimes(1); + }); + + it('swallows a thrown Redis error (best-effort)', async () => { + const { redis } = makeRedis({ + execImpl: () => Promise.reject(new Error('redis down')), + }); + await expect( + makeService(redis).start(WORKSPACE_ID, 1), + ).resolves.toBeUndefined(); + }); + }); + + describe('increment', () => { + it('issues hincrby + expire on the workspace key', async () => { + const { redis, multiObj } = makeRedis(); + await makeService(redis).increment(WORKSPACE_ID); + + expect(multiObj.hincrby).toHaveBeenCalledWith(KEY, 'done', 1); + expect(multiObj.expire).toHaveBeenCalledWith(KEY, expect.any(Number)); + expect(multiObj.exec).toHaveBeenCalledTimes(1); + }); + + it('swallows a thrown Redis error (best-effort)', async () => { + const { redis } = makeRedis({ + execImpl: () => Promise.reject(new Error('redis down')), + }); + await expect( + makeService(redis).increment(WORKSPACE_ID), + ).resolves.toBeUndefined(); + }); + }); + + describe('clear', () => { + it('deletes the workspace key', async 
() => { + const { redis, del } = makeRedis(); + await makeService(redis).clear(WORKSPACE_ID); + expect(del).toHaveBeenCalledWith(KEY); + }); + + it('swallows a thrown Redis error (best-effort)', async () => { + const { redis, del } = makeRedis(); + del.mockRejectedValue(new Error('redis down')); + await expect( + makeService(redis).clear(WORKSPACE_ID), + ).resolves.toBeUndefined(); + }); + }); +}); diff --git a/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts b/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts index ff8d164d..2d62fd65 100644 --- a/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts +++ b/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts @@ -22,6 +22,12 @@ const KEY_PREFIX = 'ai:reindex:progress:'; * reaches its `clear()` finally can still self-clean instead of leaving a stuck * "reindexing" state. Refreshed on every increment so a long run never expires * mid-flight; on a crash it disappears within TTL of the last processed page. + * + * INTENTIONALLY tied to WRITE progress (start/increment) only — never refreshed + * on get(). Refreshing on read would keep a dead worker's record alive forever + * as long as a client keeps polling (a permanently stuck reindexing:true). The + * clear() in the worker's finally handles normal completion; a dead worker's + * record expires after TTL, and the client's own poll cap stops polling anyway. */ const TTL_SECONDS = 60 * 60; // 1h