From 72bb03918d9416ff0b9b7fec8fb49330cd1ef59a Mon Sep 17 00:00:00 2001 From: a Date: Sun, 28 Jun 2026 01:44:09 +0300 Subject: [PATCH] fix(ai): show live reindex progress in semantic-search settings The "Indexed X of Y pages" counter stayed stuck at "478 of 478" during a manual "Reindex now" run instead of resetting to 0 and climbing. The status reports indexedPages = countIndexedPages (DISTINCT pages with >=1 embedding row), but reindex hard-replaces each page in its OWN small transaction, so nearly all pages always have rows -> the count never drops. Add a per-workspace live reindex-progress record in Redis (reusing the existing global ioredis client via RedisService, no new Redis config): - EmbeddingReindexProgressService: start/increment/clear/get over a Redis hash with a 1h TTL self-clean; all best-effort/cosmetic so a Redis failure degrades to the existing DB-count behavior. - AiSettingsService.reindex seeds {total, done:0, startedAt} at enqueue time so the very first poll already reports done=0. - EmbeddingIndexerService.reindexWorkspace overwrites total with the real page count at start, increments done per processed page (success or handled failure), and clears the record in a finally (covers success, fatal abort, and the unconfigured early-return) so a failed run never sticks. - AiSettingsService.getMasked returns the live run numbers when a progress record is active (plus an optional reindexing flag), else falls back to countIndexedPages/countEmbeddablePages. Per-page edits (reindexPage) never touch the workspace progress record, and no mass up-front delete is introduced (search availability preserved). Tests: indexer sets/increments/clears progress (incl. fatal abort and unconfigured early-return); status reports run progress when active and falls back when not. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../workspace/services/ai-settings-service.ts | 3 + .../embedding-indexer.service.spec.ts | 110 +++++++++++++- .../embedding/embedding-indexer.service.ts | 134 +++++++++------- .../ai/ai-settings.service.spec.ts | 81 +++++++++- .../integrations/ai/ai-settings.service.ts | 26 +++- apps/server/src/integrations/ai/ai.module.ts | 5 +- apps/server/src/integrations/ai/ai.types.ts | 3 + .../ai/embedding-reindex-progress.service.ts | 143 ++++++++++++++++++ 8 files changed, 443 insertions(+), 62 deletions(-) create mode 100644 apps/server/src/integrations/ai/embedding-reindex-progress.service.ts diff --git a/apps/client/src/features/workspace/services/ai-settings-service.ts b/apps/client/src/features/workspace/services/ai-settings-service.ts index c099ca0c..e12d1ebb 100644 --- a/apps/client/src/features/workspace/services/ai-settings-service.ts +++ b/apps/client/src/features/workspace/services/ai-settings-service.ts @@ -48,6 +48,9 @@ export interface IAiSettings { // RAG indexing coverage (pages indexed for semantic search). indexedPages: number; totalPages: number; + // True while a full workspace reindex is actively running; the counts above + // then reflect the live run progress (done climbs 0 -> total). + reindexing?: boolean; } // Update payload. 
Key semantics (same for `apiKey` and `embeddingApiKey`): diff --git a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts index 928702b3..8793ecd7 100644 --- a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts +++ b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts @@ -3,6 +3,8 @@ import { PageRepo } from '@docmost/db/repos/page/page.repo'; import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo'; import { KyselyDB } from '@docmost/db/types/kysely.types'; import { AiService } from '../../../integrations/ai/ai.service'; +import { EmbeddingReindexProgressService } from '../../../integrations/ai/embedding-reindex-progress.service'; +import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-embedding-not-configured.exception'; /** * Unit tests for EmbeddingIndexerService.reindexWorkspace's batch control flow. @@ -30,15 +32,24 @@ describe('EmbeddingIndexerService.reindexWorkspace fail-fast', () => { const aiService = { getEmbeddingModel: jest.fn().mockResolvedValue('some-model'), }; + // Progress is a best-effort cosmetic store; mock its async methods so the + // batch control flow can be tested without Redis. 
+ const reindexProgress = { + start: jest.fn().mockResolvedValue(undefined), + increment: jest.fn().mockResolvedValue(undefined), + clear: jest.fn().mockResolvedValue(undefined), + get: jest.fn().mockResolvedValue(null), + }; const db = {}; const service = new EmbeddingIndexerService( pageRepo as unknown as PageRepo, pageEmbeddingRepo as unknown as PageEmbeddingRepo, aiService as unknown as AiService, + reindexProgress as unknown as EmbeddingReindexProgressService, db as unknown as KyselyDB, ); - return { service, pageRepo, aiService }; + return { service, pageRepo, aiService, reindexProgress }; } it('aborts after the first page on a FATAL (401) provider error', async () => { @@ -78,3 +89,100 @@ describe('EmbeddingIndexerService.reindexWorkspace fail-fast', () => { expect(reindexPage).toHaveBeenCalledTimes(3); }); }); + +/** + * Live reindex-progress reporting: reindexWorkspace must publish a per-workspace + * progress record (total at start, done incremented per processed page) and ALWAYS + * clear it in a finally — including on a fatal abort and an unconfigured early + * return — so the settings status can show the counter climb without ever getting + * stuck in a "reindexing" state. 
+ */ +describe('EmbeddingIndexerService.reindexWorkspace progress', () => { + const WORKSPACE_ID = 'ws-1'; + + function makeService(pageIds: string[] = ['p1', 'p2', 'p3']) { + const pageRepo = { + getIdsByWorkspace: jest.fn().mockResolvedValue(pageIds), + }; + const pageEmbeddingRepo = {}; + const aiService = { + getEmbeddingModel: jest.fn().mockResolvedValue('some-model'), + }; + const reindexProgress = { + start: jest.fn().mockResolvedValue(undefined), + increment: jest.fn().mockResolvedValue(undefined), + clear: jest.fn().mockResolvedValue(undefined), + get: jest.fn().mockResolvedValue(null), + }; + const db = {}; + const service = new EmbeddingIndexerService( + pageRepo as unknown as PageRepo, + pageEmbeddingRepo as unknown as PageEmbeddingRepo, + aiService as unknown as AiService, + reindexProgress as unknown as EmbeddingReindexProgressService, + db as unknown as KyselyDB, + ); + return { service, pageRepo, aiService, reindexProgress }; + } + + it('sets total at start, increments done per page, and clears in finally', async () => { + const { service, reindexProgress } = makeService(['p1', 'p2', 'p3']); + jest.spyOn(service, 'reindexPage').mockResolvedValue(undefined); + + await service.reindexWorkspace(WORKSPACE_ID); + + expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 3); + // One increment per processed page. + expect(reindexProgress.increment).toHaveBeenCalledTimes(3); + expect(reindexProgress.increment).toHaveBeenCalledWith(WORKSPACE_ID); + // Cleared exactly once on completion. + expect(reindexProgress.clear).toHaveBeenCalledTimes(1); + expect(reindexProgress.clear).toHaveBeenCalledWith(WORKSPACE_ID); + }); + + it('counts a handled (non-fatal) per-page failure as processed', async () => { + const { service, reindexProgress } = makeService(['p1', 'p2', 'p3']); + // No statusCode -> non-fatal -> isolate and continue; each counts as done. 
+ jest.spyOn(service, 'reindexPage').mockRejectedValue(new Error('boom')); + + await service.reindexWorkspace(WORKSPACE_ID); + + expect(reindexProgress.increment).toHaveBeenCalledTimes(3); + expect(reindexProgress.clear).toHaveBeenCalledTimes(1); + }); + + it('clears progress in finally even when a FATAL provider error aborts the batch', async () => { + const { service, reindexProgress } = makeService(['p1', 'p2', 'p3']); + // A 401 aborts on the first page (re-thrown) — the finally must still clear. + jest + .spyOn(service, 'reindexPage') + .mockRejectedValue({ statusCode: 401, message: 'User not found' }); + + await expect(service.reindexWorkspace(WORKSPACE_ID)).rejects.toMatchObject({ + statusCode: 401, + }); + + expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 3); + // Aborted page is NOT counted as processed. + expect(reindexProgress.increment).not.toHaveBeenCalled(); + // But progress is still cleared so the run never gets stuck. + expect(reindexProgress.clear).toHaveBeenCalledTimes(1); + }); + + it('clears the enqueue-seeded progress on an unconfigured early return', async () => { + const { service, aiService, reindexProgress } = makeService(); + // Embeddings not configured: reindexWorkspace returns early WITHOUT starting + // a fresh record, but the finally must still clear the enqueue-time seed. 
+ aiService.getEmbeddingModel = jest + .fn() + .mockRejectedValue(new AiEmbeddingNotConfiguredException()); + + await expect( + service.reindexWorkspace(WORKSPACE_ID), + ).resolves.toBeUndefined(); + + expect(reindexProgress.start).not.toHaveBeenCalled(); + expect(reindexProgress.clear).toHaveBeenCalledTimes(1); + expect(reindexProgress.clear).toHaveBeenCalledWith(WORKSPACE_ID); + }); +}); diff --git a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts index 5b49d92d..e8a9f2d0 100644 --- a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts +++ b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts @@ -9,6 +9,7 @@ import { KyselyDB } from '@docmost/db/types/kysely.types'; import { InjectKysely } from 'nestjs-kysely'; import { executeTx } from '@docmost/db/utils'; import { AiService } from '../../../integrations/ai/ai.service'; +import { EmbeddingReindexProgressService } from '../../../integrations/ai/embedding-reindex-progress.service'; import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-embedding-not-configured.exception'; import { describeProviderError, @@ -48,6 +49,7 @@ export class EmbeddingIndexerService { private readonly pageRepo: PageRepo, private readonly pageEmbeddingRepo: PageEmbeddingRepo, private readonly aiService: AiService, + private readonly reindexProgress: EmbeddingReindexProgressService, @InjectKysely() private readonly db: KyselyDB, ) {} @@ -194,69 +196,89 @@ export class EmbeddingIndexerService { * the batch. 
*/ async reindexWorkspace(workspaceId: string): Promise<void> { + // The whole run is wrapped so the per-workspace progress record is ALWAYS + // cleared in the finally — on success, on a fatal-provider abort, on an + // unconfigured early-return, or on any unexpected throw — so a failed run + // never leaves a stuck "reindexing" state (the status then falls back to the + // steady-state DB coverage count). A placeholder record may already exist + // (seeded at enqueue time); the finally cleans that too. try { - await this.aiService.getEmbeddingModel(workspaceId); - } catch (err) { - if (err instanceof AiEmbeddingNotConfiguredException) { - this.logger.log( - `reindexWorkspace: embeddings not configured for workspace ${workspaceId}, skipping`, - ); - return; - } - throw err; - } - - const pageIds = await this.pageRepo.getIdsByWorkspace(workspaceId); - const total = pageIds.length; - const startedAt = Date.now(); - this.logger.log( - `reindexWorkspace: starting reindex of ${total} page(s) for workspace ${workspaceId}`, - ); - - let failed = 0; - for (let i = 0; i < total; i++) { - const pageId = pageIds[i]; - const position = i + 1; - // Log BEFORE the await: if the embedding call hangs, this is the last line - // in the log and it names the exact page that is stuck. - this.logger.log( - `reindexWorkspace: [${position}/${total}] indexing page ${pageId} (workspace ${workspaceId})`, - ); - const pageStartedAt = Date.now(); try { - await this.reindexPage(pageId); - const elapsed = Date.now() - pageStartedAt; - if (elapsed >= SLOW_PAGE_MS) { - this.logger.warn( - `reindexWorkspace: [${position}/${total}] page ${pageId} took ${elapsed}ms`, - ); - } + await this.aiService.getEmbeddingModel(workspaceId); } catch (err) { - // A fatal provider error (invalid/missing key, no credits) recurs - // identically on EVERY remaining page. Abort the whole batch instead of - // issuing hundreds of doomed requests against the provider.
- if (isFatalProviderError(err)) { - this.logger.error( - `reindexWorkspace: aborting at [${position}/${total}] for workspace ` + - `${workspaceId} — fatal provider error, remaining pages would fail ` + - `identically: ${describeProviderError(err)}`, + if (err instanceof AiEmbeddingNotConfiguredException) { + this.logger.log( + `reindexWorkspace: embeddings not configured for workspace ${workspaceId}, skipping`, ); - throw err; + return; } - // Per-page isolation: one non-fatal failure (incl. an embedding timeout) - // must not abort the whole batch. - failed++; - this.logger.error( - `reindexWorkspace: [${position}/${total}] failed to reindex page ${pageId} ` + - `after ${Date.now() - pageStartedAt}ms: ${describeProviderError(err)}`, - ); + throw err; } - } - this.logger.log( - `reindexWorkspace: done for workspace ${workspaceId}: ` + - `${total - failed}/${total} indexed, ${failed} failed in ${Date.now() - startedAt}ms`, - ); + const pageIds = await this.pageRepo.getIdsByWorkspace(workspaceId); + const total = pageIds.length; + const startedAt = Date.now(); + // Publish the live run progress (overwrites the enqueue-time placeholder + // with the real page count, done back to 0) so the settings status can + // report done climbing 0 -> total while this reindex runs. + await this.reindexProgress.start(workspaceId, total); + this.logger.log( + `reindexWorkspace: starting reindex of ${total} page(s) for workspace ${workspaceId}`, + ); + + let failed = 0; + for (let i = 0; i < total; i++) { + const pageId = pageIds[i]; + const position = i + 1; + // Log BEFORE the await: if the embedding call hangs, this is the last line + // in the log and it names the exact page that is stuck. + this.logger.log( + `reindexWorkspace: [${position}/${total}] indexing page ${pageId} (workspace ${workspaceId})`, + ); + const pageStartedAt = Date.now(); + try { + await this.reindexPage(pageId); + // Count this page as processed (matches the [position/total] log). 
+ await this.reindexProgress.increment(workspaceId); + const elapsed = Date.now() - pageStartedAt; + if (elapsed >= SLOW_PAGE_MS) { + this.logger.warn( + `reindexWorkspace: [${position}/${total}] page ${pageId} took ${elapsed}ms`, + ); + } + } catch (err) { + // A fatal provider error (invalid/missing key, no credits) recurs + // identically on EVERY remaining page. Abort the whole batch instead of + // issuing hundreds of doomed requests against the provider. Do NOT count + // it as processed — the run aborts here (the finally clears progress). + if (isFatalProviderError(err)) { + this.logger.error( + `reindexWorkspace: aborting at [${position}/${total}] for workspace ` + + `${workspaceId} — fatal provider error, remaining pages would fail ` + + `identically: ${describeProviderError(err)}`, + ); + throw err; + } + // Per-page isolation: one non-fatal failure (incl. an embedding timeout) + // must not abort the whole batch. A handled failure still advances the + // counter (matches the [position/total] log, so done reaches total). + failed++; + await this.reindexProgress.increment(workspaceId); + this.logger.error( + `reindexWorkspace: [${position}/${total}] failed to reindex page ${pageId} ` + + `after ${Date.now() - pageStartedAt}ms: ${describeProviderError(err)}`, + ); + } + } + + this.logger.log( + `reindexWorkspace: done for workspace ${workspaceId}: ` + + `${total - failed}/${total} indexed, ${failed} failed in ${Date.now() - startedAt}ms`, + ); + } finally { + // Always remove the progress record so the status reverts to the DB count. + await this.reindexProgress.clear(workspaceId); + } } /** Purge ALL embeddings for a workspace (WORKSPACE_DELETE_EMBEDDINGS). 
*/ diff --git a/apps/server/src/integrations/ai/ai-settings.service.spec.ts b/apps/server/src/integrations/ai/ai-settings.service.spec.ts index b0efaa21..67cbc8b5 100644 --- a/apps/server/src/integrations/ai/ai-settings.service.spec.ts +++ b/apps/server/src/integrations/ai/ai-settings.service.spec.ts @@ -1,4 +1,12 @@ -import { parsePositiveInt } from './ai-settings.service'; +import { AiSettingsService, parsePositiveInt } from './ai-settings.service'; +import { WorkspaceRepo } from '@docmost/db/repos/workspace/workspace.repo'; +import { AiAgentRoleRepo } from '@docmost/db/repos/ai-agent-roles/ai-agent-roles.repo'; +import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo'; +import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo'; +import { PageRepo } from '@docmost/db/repos/page/page.repo'; +import { SecretBoxService } from '../crypto/secret-box'; +import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service'; +import type { Queue } from 'bullmq'; /** * Round-trip coercion for numeric `::text` provider settings (e.g. @@ -41,3 +49,74 @@ describe('parsePositiveInt', () => { expect(parsePositiveInt(42)).toBe(42); }); }); + +/** + * getMasked must surface the LIVE reindex run progress while a reindex is active + * (so the "Indexed X of Y" counter can climb 0 -> total), and fall back to the + * steady-state DB coverage count (countIndexedPages / countEmbeddablePages) when + * no reindex is running. This is the server side of the fix for the counter that + * otherwise stays stuck at "478 of 478" the whole reindex. + */ +describe('AiSettingsService.getMasked reindex progress', () => { + const WORKSPACE_ID = 'ws-1'; + + function makeService() { + // No driver configured -> the credentials lookup is skipped, keeping the + // setup minimal; we only care about the indexed/total numbers here. 
+ const workspaceRepo = { + findById: jest.fn().mockResolvedValue({ settings: {} }), + }; + const aiAgentRoleRepo = {}; + const aiProviderCredentialsRepo = { find: jest.fn() }; + const pageEmbeddingRepo = { + countIndexedPages: jest.fn().mockResolvedValue(478), + }; + const pageRepo = { + countEmbeddablePages: jest.fn().mockResolvedValue(478), + }; + const secretBox = {}; + const reindexProgress = { + get: jest.fn().mockResolvedValue(null), + }; + const aiQueue = {}; + + const service = new AiSettingsService( + workspaceRepo as unknown as WorkspaceRepo, + aiAgentRoleRepo as unknown as AiAgentRoleRepo, + aiProviderCredentialsRepo as unknown as AiProviderCredentialsRepo, + pageEmbeddingRepo as unknown as PageEmbeddingRepo, + pageRepo as unknown as PageRepo, + secretBox as unknown as SecretBoxService, + reindexProgress as unknown as EmbeddingReindexProgressService, + aiQueue as unknown as Queue, + ); + return { service, reindexProgress, pageEmbeddingRepo }; + } + + it('reports the live run numbers when a reindex progress record is active', async () => { + const { service, reindexProgress } = makeService(); + // Mid-run: 120 of 478 pages processed. 
+ reindexProgress.get.mockResolvedValue({ + total: 478, + done: 120, + startedAt: Date.now(), + }); + + const masked = await service.getMasked(WORKSPACE_ID); + + expect(masked.indexedPages).toBe(120); + expect(masked.totalPages).toBe(478); + expect(masked.reindexing).toBe(true); + }); + + it('falls back to countIndexedPages when no reindex is active', async () => { + const { service, reindexProgress } = makeService(); + reindexProgress.get.mockResolvedValue(null); + + const masked = await service.getMasked(WORKSPACE_ID); + + expect(masked.indexedPages).toBe(478); + expect(masked.totalPages).toBe(478); + expect(masked.reindexing).toBe(false); + }); +}); diff --git a/apps/server/src/integrations/ai/ai-settings.service.ts b/apps/server/src/integrations/ai/ai-settings.service.ts index 2ccf5580..ff32d820 100644 --- a/apps/server/src/integrations/ai/ai-settings.service.ts +++ b/apps/server/src/integrations/ai/ai-settings.service.ts @@ -8,6 +8,7 @@ import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo'; import { PageRepo } from '@docmost/db/repos/page/page.repo'; import { SecretBoxService } from '../crypto/secret-box'; +import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service'; import { AiDriver, AiProviderSettings, @@ -74,6 +75,7 @@ export class AiSettingsService { private readonly pageEmbeddingRepo: PageEmbeddingRepo, private readonly pageRepo: PageRepo, private readonly secretBox: SecretBoxService, + private readonly reindexProgress: EmbeddingReindexProgressService, @InjectQueue(QueueName.AI_QUEUE) private readonly aiQueue: Queue, ) {} @@ -100,6 +102,15 @@ export class AiSettingsService { .remove(`ai-search-disabled-${workspaceId}`) .catch(() => undefined); + // Seed a live progress record BEFORE enqueueing so the very first status + // poll already reports done=0 (the reindex POST returns the PRE-job counts, + // so without this seed 
the first poll would still show "total of total"). + // The worker overwrites `total` with the real page count, increments `done` + // as it runs, and clears the record in a finally. `totalPages` uses the same + // source the status endpoint reports, so the counter denominator matches. + const totalPages = await this.pageRepo.countEmbeddablePages(workspaceId); + await this.reindexProgress.start(workspaceId, totalPages); + const jobId = `ai-reindex-${workspaceId}`; // Clear a prior non-active entry so a stale job can't block this reindex. // A locked/active job is left in place (remove() no-ops) and the add() below @@ -261,6 +272,15 @@ export class AiSettingsService { this.pageRepo.countEmbeddablePages(workspaceId), ]); + // While a reindex run is active, report its LIVE progress (done climbs 0 -> + // total) so the settings UI can watch it advance. Without this the counter + // never drops: the per-page reindex hard-replaces rows in its own small + // transaction, so countIndexedPages stays ~= total for the whole run. With + // no active record we fall back to the steady-state DB coverage count, which + // preserves the existing display and the client's "done == total -> stop + // polling" condition (the run ends -> record cleared -> DB count == total). + const progress = await this.reindexProgress.get(workspaceId); + return { driver: provider.driver, chatModel: provider.chatModel, @@ -279,8 +299,10 @@ export class AiSettingsService { hasApiKey, hasEmbeddingApiKey, hasSttApiKey, - indexedPages, - totalPages, + indexedPages: progress ? progress.done : indexedPages, + totalPages: progress ? progress.total : totalPages, + // Optional hint for the client: a reindex run is currently in progress. 
+ reindexing: progress != null, }; } diff --git a/apps/server/src/integrations/ai/ai.module.ts b/apps/server/src/integrations/ai/ai.module.ts index 6d0ec3e9..a38c7f04 100644 --- a/apps/server/src/integrations/ai/ai.module.ts +++ b/apps/server/src/integrations/ai/ai.module.ts @@ -5,6 +5,7 @@ import { QueueName } from '../queue/constants'; import { AiService } from './ai.service'; import { AiSettingsService } from './ai-settings.service'; import { AiSettingsController } from './ai-settings.controller'; +import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service'; /** * LLM driver + provider-settings unit (§6.2/§6.4). @@ -19,7 +20,7 @@ import { AiSettingsController } from './ai-settings.controller'; BullModule.registerQueue({ name: QueueName.AI_QUEUE }), ], controllers: [AiSettingsController], - providers: [AiService, AiSettingsService], - exports: [AiService, AiSettingsService], + providers: [AiService, AiSettingsService, EmbeddingReindexProgressService], + exports: [AiService, AiSettingsService, EmbeddingReindexProgressService], }) export class AiModule {} diff --git a/apps/server/src/integrations/ai/ai.types.ts b/apps/server/src/integrations/ai/ai.types.ts index efad9857..06bf83e3 100644 --- a/apps/server/src/integrations/ai/ai.types.ts +++ b/apps/server/src/integrations/ai/ai.types.ts @@ -146,4 +146,7 @@ export interface MaskedAiSettings { // RAG indexing coverage for the settings UI. indexedPages: number; totalPages: number; + // True while a full workspace reindex is actively running (the counts above + // then reflect the live run progress rather than the steady-state DB count). 
+ reindexing?: boolean; } diff --git a/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts b/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts new file mode 100644 index 00000000..ff8d164d --- /dev/null +++ b/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts @@ -0,0 +1,143 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { RedisService } from '@nestjs-labs/nestjs-ioredis'; +import type { Redis } from 'ioredis'; + +/** + * Live progress of an in-flight workspace embeddings reindex run. + * `total` is the number of pages the run will process, `done` how many it has + * already processed (success OR handled failure), `startedAt` the epoch-ms the + * record was created. + */ +export interface ReindexProgress { + total: number; + done: number; + startedAt: number; +} + +/** Redis key namespace for the per-workspace reindex-progress record. */ +const KEY_PREFIX = 'ai:reindex:progress:'; + +/** + * TTL (seconds) on the progress record so a crashed/aborted worker that never + * reaches its `clear()` finally can still self-clean instead of leaving a stuck + * "reindexing" state. Refreshed on every increment so a long run never expires + * mid-flight; on a crash it disappears within TTL of the last processed page. + */ +const TTL_SECONDS = 60 * 60; // 1h + +/** + * Cluster-wide store for the live progress of a workspace embeddings reindex. + * + * The reindex runs in a BullMQ worker (AI_QUEUE) that may be a DIFFERENT process + * than the API handling the settings-status GET, so the progress must live in + * the shared Redis — we reuse the same global ioredis client (RedisService from + * @nestjs-labs/nestjs-ioredis) that backs BullMQ and the other anti-abuse + * limiters, adding NO new Redis config. + * + * Everything here is best-effort and COSMETIC: progress only drives the "Indexed + * X of Y" counter while a reindex is running. 
Any Redis failure degrades to the + * existing steady-state behaviour (the status falls back to the DB coverage + * count), so reads fail to `null` and writes are swallowed — a reindex must + * never break because progress reporting did. + * + * Stored as a Redis HASH so `done` can be bumped with an atomic HINCRBY (the + * worker is the only writer of `done`, but HINCRBY also keeps us off a + * read-modify-write race and preserves the other fields). + */ +@Injectable() +export class EmbeddingReindexProgressService { + private readonly logger = new Logger(EmbeddingReindexProgressService.name); + private readonly redis: Redis; + + constructor(redisService: RedisService) { + this.redis = redisService.getOrThrow(); + } + + private key(workspaceId: string): string { + return KEY_PREFIX + workspaceId; + } + + /** + * Begin (or reset) the progress record for a workspace: `total` pages, `done` + * back to 0, `startedAt` now. Called at reindex enqueue time (placeholder + * total, so the very first status poll already reports done=0) and again at + * the worker start (overwriting `total` with the real page count). Resets + * `done` to 0 so a re-trigger never inherits a stale count. + */ + async start(workspaceId: string, total: number): Promise<void> { + const key = this.key(workspaceId); + try { + await this.redis + .multi() + .hset(key, { + total: String(total), + done: '0', + startedAt: String(Date.now()), + }) + .expire(key, TTL_SECONDS) + .exec(); + } catch (err) { + this.logger.warn( + `reindex-progress start failed for workspace ${workspaceId}; ` + + `progress reporting disabled for this run: ${(err as Error).message}`, + ); + } + } + + /** + * Bump the processed-page counter by one and refresh the TTL. Atomic and + * best-effort: a missing key (cleared/expired) would be recreated with only + * `done`, but `get()` treats a record without a numeric `total` as inactive, + * so that partial state safely reads as "no active reindex".
+ */ + async increment(workspaceId: string): Promise<void> { + const key = this.key(workspaceId); + try { + await this.redis.multi().hincrby(key, 'done', 1).expire(key, TTL_SECONDS).exec(); + } catch (err) { + this.logger.warn( + `reindex-progress increment failed for workspace ${workspaceId}: ` + + `${(err as Error).message}`, + ); + } + } + + /** + * Remove the progress record. Called in the worker's `finally` so a completed, + * aborted, or unconfigured-early-return run never leaves a stuck record; the + * status then falls back to the DB coverage count. + */ + async clear(workspaceId: string): Promise<void> { + try { + await this.redis.del(this.key(workspaceId)); + } catch (err) { + this.logger.warn( + `reindex-progress clear failed for workspace ${workspaceId} ` + + `(self-cleans via TTL): ${(err as Error).message}`, + ); + } + } + + /** + * Read the live progress, or `null` when no reindex is active (no record, an + * expired record, or a partial record without a numeric `total`). On a Redis + * error returns `null` so the status endpoint degrades to its DB count. + */ + async get(workspaceId: string): Promise<ReindexProgress | null> { + try { + const data = await this.redis.hgetall(this.key(workspaceId)); + if (!data || data.total === undefined) return null; + const total = Number(data.total); + const done = Number(data.done); + const startedAt = Number(data.startedAt); + if (!Number.isFinite(total) || !Number.isFinite(done)) return null; + return { total, done, startedAt: Number.isFinite(startedAt) ? startedAt : 0 }; + } catch (err) { + this.logger.warn( + `reindex-progress read failed for workspace ${workspaceId}; ` + + `falling back to DB count: ${(err as Error).message}`, + ); + return null; + } + } +}