From 72bb03918d9416ff0b9b7fec8fb49330cd1ef59a Mon Sep 17 00:00:00 2001 From: a Date: Sun, 28 Jun 2026 01:44:09 +0300 Subject: [PATCH 1/6] fix(ai): show live reindex progress in semantic-search settings The "Indexed X of Y pages" counter stayed stuck at "478 of 478" during a manual "Reindex now" run instead of resetting to 0 and climbing. The status reports indexedPages = countIndexedPages (DISTINCT pages with >=1 embedding row), but reindex hard-replaces each page in its OWN small transaction, so nearly all pages always have rows -> the count never drops. Add a per-workspace live reindex-progress record in Redis (reusing the existing global ioredis client via RedisService, no new Redis config): - EmbeddingReindexProgressService: start/increment/clear/get over a Redis hash with a 1h TTL self-clean; all best-effort/cosmetic so a Redis failure degrades to the existing DB-count behavior. - AiSettingsService.reindex seeds {total, done:0, startedAt} at enqueue time so the very first poll already reports done=0. - EmbeddingIndexerService.reindexWorkspace overwrites total with the real page count at start, increments done per processed page (success or handled failure), and clears the record in a finally (covers success, fatal abort, and the unconfigured early-return) so a failed run never sticks. - AiSettingsService.getMasked returns the live run numbers when a progress record is active (plus an optional reindexing flag), else falls back to countIndexedPages/countEmbeddablePages. Per-page edits (reindexPage) never touch the workspace progress record, and no mass up-front delete is introduced (search availability preserved). Tests: indexer sets/increments/clears progress (incl. fatal abort and unconfigured early-return); status reports run progress when active and falls back when not. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../workspace/services/ai-settings-service.ts | 3 + .../embedding-indexer.service.spec.ts | 110 +++++++++++++- .../embedding/embedding-indexer.service.ts | 134 +++++++++------- .../ai/ai-settings.service.spec.ts | 81 +++++++++- .../integrations/ai/ai-settings.service.ts | 26 +++- apps/server/src/integrations/ai/ai.module.ts | 5 +- apps/server/src/integrations/ai/ai.types.ts | 3 + .../ai/embedding-reindex-progress.service.ts | 143 ++++++++++++++++++ 8 files changed, 443 insertions(+), 62 deletions(-) create mode 100644 apps/server/src/integrations/ai/embedding-reindex-progress.service.ts diff --git a/apps/client/src/features/workspace/services/ai-settings-service.ts b/apps/client/src/features/workspace/services/ai-settings-service.ts index c099ca0c..e12d1ebb 100644 --- a/apps/client/src/features/workspace/services/ai-settings-service.ts +++ b/apps/client/src/features/workspace/services/ai-settings-service.ts @@ -48,6 +48,9 @@ export interface IAiSettings { // RAG indexing coverage (pages indexed for semantic search). indexedPages: number; totalPages: number; + // True while a full workspace reindex is actively running; the counts above + // then reflect the live run progress (done climbs 0 -> total). + reindexing?: boolean; } // Update payload. Key semantics (same for `apiKey` and `embeddingApiKey`): diff --git a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts index 928702b3..8793ecd7 100644 --- a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts +++ b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.spec.ts @@ -3,6 +3,8 @@ import { PageRepo } from '@docmost/db/repos/page/page.repo'; import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo'; import { KyselyDB } from '@docmost/db/types/kysely.types'; import { AiService } from '../../../integrations/ai/ai.service'; +import { EmbeddingReindexProgressService } from '../../../integrations/ai/embedding-reindex-progress.service'; +import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-embedding-not-configured.exception'; /** * Unit tests for EmbeddingIndexerService.reindexWorkspace's batch control flow. @@ -30,15 +32,24 @@ describe('EmbeddingIndexerService.reindexWorkspace fail-fast', () => { const aiService = { getEmbeddingModel: jest.fn().mockResolvedValue('some-model'), }; + // Progress is a best-effort cosmetic store; mock its async methods so the + // batch control flow can be tested without Redis. + const reindexProgress = { + start: jest.fn().mockResolvedValue(undefined), + increment: jest.fn().mockResolvedValue(undefined), + clear: jest.fn().mockResolvedValue(undefined), + get: jest.fn().mockResolvedValue(null), + }; const db = {}; const service = new EmbeddingIndexerService( pageRepo as unknown as PageRepo, pageEmbeddingRepo as unknown as PageEmbeddingRepo, aiService as unknown as AiService, + reindexProgress as unknown as EmbeddingReindexProgressService, db as unknown as KyselyDB, ); - return { service, pageRepo, aiService }; + return { service, pageRepo, aiService, reindexProgress }; } it('aborts after the first page on a FATAL (401) provider error', async () => { @@ -78,3 +89,100 @@ describe('EmbeddingIndexerService.reindexWorkspace fail-fast', () => { expect(reindexPage).toHaveBeenCalledTimes(3); }); }); + +/** + * Live reindex-progress reporting: reindexWorkspace must publish a per-workspace + * progress record (total at start, done incremented per processed page) and ALWAYS + * clear it in a finally — including on a fatal abort and an unconfigured early + * return — so the settings status can show the counter climb without ever getting + * stuck in a "reindexing" state. + */ +describe('EmbeddingIndexerService.reindexWorkspace progress', () => { + const WORKSPACE_ID = 'ws-1'; + + function makeService(pageIds: string[] = ['p1', 'p2', 'p3']) { + const pageRepo = { + getIdsByWorkspace: jest.fn().mockResolvedValue(pageIds), + }; + const pageEmbeddingRepo = {}; + const aiService = { + getEmbeddingModel: jest.fn().mockResolvedValue('some-model'), + }; + const reindexProgress = { + start: jest.fn().mockResolvedValue(undefined), + increment: jest.fn().mockResolvedValue(undefined), + clear: jest.fn().mockResolvedValue(undefined), + get: jest.fn().mockResolvedValue(null), + }; + const db = {}; + const service = new EmbeddingIndexerService( + pageRepo as unknown as PageRepo, + pageEmbeddingRepo as unknown as PageEmbeddingRepo, + aiService as unknown as AiService, + reindexProgress as unknown as EmbeddingReindexProgressService, + db as unknown as KyselyDB, + ); + return { service, pageRepo, aiService, reindexProgress }; + } + + it('sets total at start, increments done per page, and clears in finally', async () => { + const { service, reindexProgress } = makeService(['p1', 'p2', 'p3']); + jest.spyOn(service, 'reindexPage').mockResolvedValue(undefined); + + await service.reindexWorkspace(WORKSPACE_ID); + + expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 3); + // One increment per processed page. + expect(reindexProgress.increment).toHaveBeenCalledTimes(3); + expect(reindexProgress.increment).toHaveBeenCalledWith(WORKSPACE_ID); + // Cleared exactly once on completion. + expect(reindexProgress.clear).toHaveBeenCalledTimes(1); + expect(reindexProgress.clear).toHaveBeenCalledWith(WORKSPACE_ID); + }); + + it('counts a handled (non-fatal) per-page failure as processed', async () => { + const { service, reindexProgress } = makeService(['p1', 'p2', 'p3']); + // No statusCode -> non-fatal -> isolate and continue; each counts as done. + jest.spyOn(service, 'reindexPage').mockRejectedValue(new Error('boom')); + + await service.reindexWorkspace(WORKSPACE_ID); + + expect(reindexProgress.increment).toHaveBeenCalledTimes(3); + expect(reindexProgress.clear).toHaveBeenCalledTimes(1); + }); + + it('clears progress in finally even when a FATAL provider error aborts the batch', async () => { + const { service, reindexProgress } = makeService(['p1', 'p2', 'p3']); + // A 401 aborts on the first page (re-thrown) — the finally must still clear. + jest + .spyOn(service, 'reindexPage') + .mockRejectedValue({ statusCode: 401, message: 'User not found' }); + + await expect(service.reindexWorkspace(WORKSPACE_ID)).rejects.toMatchObject({ + statusCode: 401, + }); + + expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 3); + // Aborted page is NOT counted as processed. + expect(reindexProgress.increment).not.toHaveBeenCalled(); + // But progress is still cleared so the run never gets stuck. + expect(reindexProgress.clear).toHaveBeenCalledTimes(1); + }); + + it('clears the enqueue-seeded progress on an unconfigured early return', async () => { + const { service, aiService, reindexProgress } = makeService(); + // Embeddings not configured: reindexWorkspace returns early WITHOUT starting + // a fresh record, but the finally must still clear the enqueue-time seed. + aiService.getEmbeddingModel = jest + .fn() + .mockRejectedValue(new AiEmbeddingNotConfiguredException()); + + await expect( + service.reindexWorkspace(WORKSPACE_ID), + ).resolves.toBeUndefined(); + + expect(reindexProgress.start).not.toHaveBeenCalled(); + expect(reindexProgress.clear).toHaveBeenCalledTimes(1); + expect(reindexProgress.clear).toHaveBeenCalledWith(WORKSPACE_ID); + }); +}); diff --git a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts index 5b49d92d..e8a9f2d0 100644 --- a/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts +++ b/apps/server/src/core/ai-chat/embedding/embedding-indexer.service.ts @@ -9,6 +9,7 @@ import { KyselyDB } from '@docmost/db/types/kysely.types'; import { InjectKysely } from 'nestjs-kysely'; import { executeTx } from '@docmost/db/utils'; import { AiService } from '../../../integrations/ai/ai.service'; +import { EmbeddingReindexProgressService } from '../../../integrations/ai/embedding-reindex-progress.service'; import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-embedding-not-configured.exception'; import { describeProviderError, @@ -48,6 +49,7 @@ export class EmbeddingIndexerService { private readonly pageRepo: PageRepo, private readonly pageEmbeddingRepo: PageEmbeddingRepo, private readonly aiService: AiService, + private readonly reindexProgress: EmbeddingReindexProgressService, @InjectKysely() private readonly db: KyselyDB, ) {} @@ -194,69 +196,89 @@ export class EmbeddingIndexerService { * the batch. */ async reindexWorkspace(workspaceId: string): Promise { + // The whole run is wrapped so the per-workspace progress record is ALWAYS + // cleared in the finally — on success, on a fatal-provider abort, on an + // unconfigured early-return, or on any unexpected throw — so a failed run + // never leaves a stuck "reindexing" state (the status then falls back to the + // steady-state DB coverage count). A placeholder record may already exist + // (seeded at enqueue time); the finally cleans that too. try { - await this.aiService.getEmbeddingModel(workspaceId); - } catch (err) { - if (err instanceof AiEmbeddingNotConfiguredException) { - this.logger.log( - `reindexWorkspace: embeddings not configured for workspace ${workspaceId}, skipping`, - ); - return; - } - throw err; - } - - const pageIds = await this.pageRepo.getIdsByWorkspace(workspaceId); - const total = pageIds.length; - const startedAt = Date.now(); - this.logger.log( - `reindexWorkspace: starting reindex of ${total} page(s) for workspace ${workspaceId}`, - ); - - let failed = 0; - for (let i = 0; i < total; i++) { - const pageId = pageIds[i]; - const position = i + 1; - // Log BEFORE the await: if the embedding call hangs, this is the last line - // in the log and it names the exact page that is stuck. - this.logger.log( - `reindexWorkspace: [${position}/${total}] indexing page ${pageId} (workspace ${workspaceId})`, - ); - const pageStartedAt = Date.now(); try { - await this.reindexPage(pageId); - const elapsed = Date.now() - pageStartedAt; - if (elapsed >= SLOW_PAGE_MS) { - this.logger.warn( - `reindexWorkspace: [${position}/${total}] page ${pageId} took ${elapsed}ms`, - ); - } + await this.aiService.getEmbeddingModel(workspaceId); } catch (err) { - // A fatal provider error (invalid/missing key, no credits) recurs - // identically on EVERY remaining page. Abort the whole batch instead of - // issuing hundreds of doomed requests against the provider. - if (isFatalProviderError(err)) { - this.logger.error( - `reindexWorkspace: aborting at [${position}/${total}] for workspace ` + - `${workspaceId} — fatal provider error, remaining pages would fail ` + - `identically: ${describeProviderError(err)}`, + if (err instanceof AiEmbeddingNotConfiguredException) { + this.logger.log( + `reindexWorkspace: embeddings not configured for workspace ${workspaceId}, skipping`, ); - throw err; + return; } - // Per-page isolation: one non-fatal failure (incl. an embedding timeout) - // must not abort the whole batch. - failed++; - this.logger.error( - `reindexWorkspace: [${position}/${total}] failed to reindex page ${pageId} ` + - `after ${Date.now() - pageStartedAt}ms: ${describeProviderError(err)}`, - ); + throw err; } - } - this.logger.log( - `reindexWorkspace: done for workspace ${workspaceId}: ` + - `${total - failed}/${total} indexed, ${failed} failed in ${Date.now() - startedAt}ms`, - ); + const pageIds = await this.pageRepo.getIdsByWorkspace(workspaceId); + const total = pageIds.length; + const startedAt = Date.now(); + // Publish the live run progress (overwrites the enqueue-time placeholder + // with the real page count, done back to 0) so the settings status can + // report done climbing 0 -> total while this reindex runs. + await this.reindexProgress.start(workspaceId, total); + this.logger.log( + `reindexWorkspace: starting reindex of ${total} page(s) for workspace ${workspaceId}`, + ); + + let failed = 0; + for (let i = 0; i < total; i++) { + const pageId = pageIds[i]; + const position = i + 1; + // Log BEFORE the await: if the embedding call hangs, this is the last line + // in the log and it names the exact page that is stuck. + this.logger.log( + `reindexWorkspace: [${position}/${total}] indexing page ${pageId} (workspace ${workspaceId})`, + ); + const pageStartedAt = Date.now(); + try { + await this.reindexPage(pageId); + // Count this page as processed (matches the [position/total] log). + await this.reindexProgress.increment(workspaceId); + const elapsed = Date.now() - pageStartedAt; + if (elapsed >= SLOW_PAGE_MS) { + this.logger.warn( + `reindexWorkspace: [${position}/${total}] page ${pageId} took ${elapsed}ms`, + ); + } + } catch (err) { + // A fatal provider error (invalid/missing key, no credits) recurs + // identically on EVERY remaining page. Abort the whole batch instead of + // issuing hundreds of doomed requests against the provider. Do NOT count + // it as processed — the run aborts here (the finally clears progress). + if (isFatalProviderError(err)) { + this.logger.error( + `reindexWorkspace: aborting at [${position}/${total}] for workspace ` + + `${workspaceId} — fatal provider error, remaining pages would fail ` + + `identically: ${describeProviderError(err)}`, + ); + throw err; + } + // Per-page isolation: one non-fatal failure (incl. an embedding timeout) + // must not abort the whole batch. A handled failure still advances the + // counter (matches the [position/total] log, so done reaches total). + failed++; + await this.reindexProgress.increment(workspaceId); + this.logger.error( + `reindexWorkspace: [${position}/${total}] failed to reindex page ${pageId} ` + + `after ${Date.now() - pageStartedAt}ms: ${describeProviderError(err)}`, + ); + } + } + + this.logger.log( + `reindexWorkspace: done for workspace ${workspaceId}: ` + + `${total - failed}/${total} indexed, ${failed} failed in ${Date.now() - startedAt}ms`, + ); + } finally { + // Always remove the progress record so the status reverts to the DB count. + await this.reindexProgress.clear(workspaceId); + } } /** Purge ALL embeddings for a workspace (WORKSPACE_DELETE_EMBEDDINGS). */ diff --git a/apps/server/src/integrations/ai/ai-settings.service.spec.ts b/apps/server/src/integrations/ai/ai-settings.service.spec.ts index b0efaa21..67cbc8b5 100644 --- a/apps/server/src/integrations/ai/ai-settings.service.spec.ts +++ b/apps/server/src/integrations/ai/ai-settings.service.spec.ts @@ -1,4 +1,12 @@ -import { parsePositiveInt } from './ai-settings.service'; +import { AiSettingsService, parsePositiveInt } from './ai-settings.service'; +import { WorkspaceRepo } from '@docmost/db/repos/workspace/workspace.repo'; +import { AiAgentRoleRepo } from '@docmost/db/repos/ai-agent-roles/ai-agent-roles.repo'; +import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo'; +import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo'; +import { PageRepo } from '@docmost/db/repos/page/page.repo'; +import { SecretBoxService } from '../crypto/secret-box'; +import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service'; +import type { Queue } from 'bullmq'; /** * Round-trip coercion for numeric `::text` provider settings (e.g. @@ -41,3 +49,74 @@ describe('parsePositiveInt', () => { expect(parsePositiveInt(42)).toBe(42); }); }); + +/** + * getMasked must surface the LIVE reindex run progress while a reindex is active + * (so the "Indexed X of Y" counter can climb 0 -> total), and fall back to the + * steady-state DB coverage count (countIndexedPages / countEmbeddablePages) when + * no reindex is running. This is the server side of the fix for the counter that + * otherwise stays stuck at "478 of 478" the whole reindex. + */ +describe('AiSettingsService.getMasked reindex progress', () => { + const WORKSPACE_ID = 'ws-1'; + + function makeService() { + // No driver configured -> the credentials lookup is skipped, keeping the + // setup minimal; we only care about the indexed/total numbers here. + const workspaceRepo = { + findById: jest.fn().mockResolvedValue({ settings: {} }), + }; + const aiAgentRoleRepo = {}; + const aiProviderCredentialsRepo = { find: jest.fn() }; + const pageEmbeddingRepo = { + countIndexedPages: jest.fn().mockResolvedValue(478), + }; + const pageRepo = { + countEmbeddablePages: jest.fn().mockResolvedValue(478), + }; + const secretBox = {}; + const reindexProgress = { + get: jest.fn().mockResolvedValue(null), + }; + const aiQueue = {}; + + const service = new AiSettingsService( + workspaceRepo as unknown as WorkspaceRepo, + aiAgentRoleRepo as unknown as AiAgentRoleRepo, + aiProviderCredentialsRepo as unknown as AiProviderCredentialsRepo, + pageEmbeddingRepo as unknown as PageEmbeddingRepo, + pageRepo as unknown as PageRepo, + secretBox as unknown as SecretBoxService, + reindexProgress as unknown as EmbeddingReindexProgressService, + aiQueue as unknown as Queue, + ); + return { service, reindexProgress, pageEmbeddingRepo }; + } + + it('reports the live run numbers when a reindex progress record is active', async () => { + const { service, reindexProgress } = makeService(); + // Mid-run: 120 of 478 pages processed. + reindexProgress.get.mockResolvedValue({ + total: 478, + done: 120, + startedAt: Date.now(), + }); + + const masked = await service.getMasked(WORKSPACE_ID); + + expect(masked.indexedPages).toBe(120); + expect(masked.totalPages).toBe(478); + expect(masked.reindexing).toBe(true); + }); + + it('falls back to countIndexedPages when no reindex is active', async () => { + const { service, reindexProgress } = makeService(); + reindexProgress.get.mockResolvedValue(null); + + const masked = await service.getMasked(WORKSPACE_ID); + + expect(masked.indexedPages).toBe(478); + expect(masked.totalPages).toBe(478); + expect(masked.reindexing).toBe(false); + }); +}); diff --git a/apps/server/src/integrations/ai/ai-settings.service.ts b/apps/server/src/integrations/ai/ai-settings.service.ts index 2ccf5580..ff32d820 100644 --- a/apps/server/src/integrations/ai/ai-settings.service.ts +++ b/apps/server/src/integrations/ai/ai-settings.service.ts @@ -8,6 +8,7 @@ import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo'; import { PageRepo } from '@docmost/db/repos/page/page.repo'; import { SecretBoxService } from '../crypto/secret-box'; +import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service'; import { AiDriver, AiProviderSettings, @@ -74,6 +75,7 @@ export class AiSettingsService { private readonly pageEmbeddingRepo: PageEmbeddingRepo, private readonly pageRepo: PageRepo, private readonly secretBox: SecretBoxService, + private readonly reindexProgress: EmbeddingReindexProgressService, @InjectQueue(QueueName.AI_QUEUE) private readonly aiQueue: Queue, ) {} @@ -100,6 +102,15 @@ export class AiSettingsService { .remove(`ai-search-disabled-${workspaceId}`) .catch(() => undefined); + // Seed a live progress record BEFORE enqueueing so the very first status + // poll already reports done=0 (the reindex POST returns the PRE-job counts, + // so without this seed the first poll would still show "total of total"). + // The worker overwrites `total` with the real page count, increments `done` + // as it runs, and clears the record in a finally. `totalPages` uses the same + // source the status endpoint reports, so the counter denominator matches. + const totalPages = await this.pageRepo.countEmbeddablePages(workspaceId); + await this.reindexProgress.start(workspaceId, totalPages); + const jobId = `ai-reindex-${workspaceId}`; // Clear a prior non-active entry so a stale job can't block this reindex. // A locked/active job is left in place (remove() no-ops) and the add() below @@ -261,6 +272,15 @@ export class AiSettingsService { this.pageRepo.countEmbeddablePages(workspaceId), ]); + // While a reindex run is active, report its LIVE progress (done climbs 0 -> + // total) so the settings UI can watch it advance. Without this the counter + // never drops: the per-page reindex hard-replaces rows in its own small + // transaction, so countIndexedPages stays ~= total for the whole run. With + // no active record we fall back to the steady-state DB coverage count, which + // preserves the existing display and the client's "done == total -> stop + // polling" condition (the run ends -> record cleared -> DB count == total). + const progress = await this.reindexProgress.get(workspaceId); + return { driver: provider.driver, chatModel: provider.chatModel, @@ -279,8 +299,10 @@ export class AiSettingsService { hasApiKey, hasEmbeddingApiKey, hasSttApiKey, - indexedPages, - totalPages, + indexedPages: progress ? progress.done : indexedPages, + totalPages: progress ? progress.total : totalPages, + // Optional hint for the client: a reindex run is currently in progress. + reindexing: progress != null, }; } diff --git a/apps/server/src/integrations/ai/ai.module.ts b/apps/server/src/integrations/ai/ai.module.ts index 6d0ec3e9..a38c7f04 100644 --- a/apps/server/src/integrations/ai/ai.module.ts +++ b/apps/server/src/integrations/ai/ai.module.ts @@ -5,6 +5,7 @@ import { QueueName } from '../queue/constants'; import { AiService } from './ai.service'; import { AiSettingsService } from './ai-settings.service'; import { AiSettingsController } from './ai-settings.controller'; +import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service'; /** * LLM driver + provider-settings unit (§6.2/§6.4). @@ -19,7 +20,7 @@ import { AiSettingsController } from './ai-settings.controller'; BullModule.registerQueue({ name: QueueName.AI_QUEUE }), ], controllers: [AiSettingsController], - providers: [AiService, AiSettingsService], - exports: [AiService, AiSettingsService], + providers: [AiService, AiSettingsService, EmbeddingReindexProgressService], + exports: [AiService, AiSettingsService, EmbeddingReindexProgressService], }) export class AiModule {} diff --git a/apps/server/src/integrations/ai/ai.types.ts b/apps/server/src/integrations/ai/ai.types.ts index efad9857..06bf83e3 100644 --- a/apps/server/src/integrations/ai/ai.types.ts +++ b/apps/server/src/integrations/ai/ai.types.ts @@ -146,4 +146,7 @@ export interface MaskedAiSettings { // RAG indexing coverage for the settings UI. indexedPages: number; totalPages: number; + // True while a full workspace reindex is actively running (the counts above + // then reflect the live run progress rather than the steady-state DB count). + reindexing?: boolean; } diff --git a/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts b/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts new file mode 100644 index 00000000..ff8d164d --- /dev/null +++ b/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts @@ -0,0 +1,143 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { RedisService } from '@nestjs-labs/nestjs-ioredis'; +import type { Redis } from 'ioredis'; + +/** + * Live progress of an in-flight workspace embeddings reindex run. + * `total` is the number of pages the run will process, `done` how many it has + * already processed (success OR handled failure), `startedAt` the epoch-ms the + * record was created. + */ +export interface ReindexProgress { + total: number; + done: number; + startedAt: number; +} + +/** Redis key namespace for the per-workspace reindex-progress record. */ +const KEY_PREFIX = 'ai:reindex:progress:'; + +/** + * TTL (seconds) on the progress record so a crashed/aborted worker that never + * reaches its `clear()` finally can still self-clean instead of leaving a stuck + * "reindexing" state. Refreshed on every increment so a long run never expires + * mid-flight; on a crash it disappears within TTL of the last processed page. + */ +const TTL_SECONDS = 60 * 60; // 1h + +/** + * Cluster-wide store for the live progress of a workspace embeddings reindex. + * + * The reindex runs in a BullMQ worker (AI_QUEUE) that may be a DIFFERENT process + * than the API handling the settings-status GET, so the progress must live in + * the shared Redis — we reuse the same global ioredis client (RedisService from + * @nestjs-labs/nestjs-ioredis) that backs BullMQ and the other anti-abuse + * limiters, adding NO new Redis config. + * + * Everything here is best-effort and COSMETIC: progress only drives the "Indexed + * X of Y" counter while a reindex is running. Any Redis failure degrades to the + * existing steady-state behaviour (the status falls back to the DB coverage + * count), so reads fail to `null` and writes are swallowed — a reindex must + * never break because progress reporting did. + * + * Stored as a Redis HASH so `done` can be bumped with an atomic HINCRBY (the + * worker is the only writer of `done`, but HINCRBY also keeps us off a + * read-modify-write race and preserves the other fields). + */ +@Injectable() +export class EmbeddingReindexProgressService { + private readonly logger = new Logger(EmbeddingReindexProgressService.name); + private readonly redis: Redis; + + constructor(redisService: RedisService) { + this.redis = redisService.getOrThrow(); + } + + private key(workspaceId: string): string { + return KEY_PREFIX + workspaceId; + } + + /** + * Begin (or reset) the progress record for a workspace: `total` pages, `done` + * back to 0, `startedAt` now. Called at reindex enqueue time (placeholder + * total, so the very first status poll already reports done=0) and again at + * the worker start (overwriting `total` with the real page count). Resets + * `done` to 0 so a re-trigger never inherits a stale count. + */ + async start(workspaceId: string, total: number): Promise { + const key = this.key(workspaceId); + try { + await this.redis + .multi() + .hset(key, { + total: String(total), + done: '0', + startedAt: String(Date.now()), + }) + .expire(key, TTL_SECONDS) + .exec(); + } catch (err) { + this.logger.warn( + `reindex-progress start failed for workspace ${workspaceId}; ` + + `progress reporting disabled for this run: ${(err as Error).message}`, + ); + } + } + + /** + * Bump the processed-page counter by one and refresh the TTL. Atomic and + * best-effort: a missing key (cleared/expired) would be recreated with only + * `done`, but `get()` treats a record without a numeric `total` as inactive, + * so that partial state safely reads as "no active reindex". + */ + async increment(workspaceId: string): Promise { + const key = this.key(workspaceId); + try { + await this.redis.multi().hincrby(key, 'done', 1).expire(key, TTL_SECONDS).exec(); + } catch (err) { + this.logger.warn( + `reindex-progress increment failed for workspace ${workspaceId}: ` + + `${(err as Error).message}`, + ); + } + } + + /** + * Remove the progress record. Called in the worker's `finally` so a completed, + * aborted, or unconfigured-early-return run never leaves a stuck record; the + * status then falls back to the DB coverage count. + */ + async clear(workspaceId: string): Promise { + try { + await this.redis.del(this.key(workspaceId)); + } catch (err) { + this.logger.warn( + `reindex-progress clear failed for workspace ${workspaceId} ` + + `(self-cleans via TTL): ${(err as Error).message}`, + ); + } + } + + /** + * Read the live progress, or `null` when no reindex is active (no record, an + * expired record, or a partial record without a numeric `total`). On a Redis + * error returns `null` so the status endpoint degrades to its DB count. + */ + async get(workspaceId: string): Promise { + try { + const data = await this.redis.hgetall(this.key(workspaceId)); + if (!data || data.total === undefined) return null; + const total = Number(data.total); + const done = Number(data.done); + const startedAt = Number(data.startedAt); + if (!Number.isFinite(total) || !Number.isFinite(done)) return null; + return { total, done, startedAt: Number.isFinite(startedAt) ? startedAt : 0 }; + } catch (err) { + this.logger.warn( + `reindex-progress read failed for workspace ${workspaceId}; ` + + `falling back to DB count: ${(err as Error).message}`, + ); + return null; + } + } +} -- 2.49.1 From 630939e8f369534c3ad276d7545bfe33353c046c Mon Sep 17 00:00:00 2001 From: a Date: Sun, 28 Jun 2026 01:51:20 +0300 Subject: [PATCH 2/6] feat(ai): tighten reindex-progress polling on the reindexing flag Make the "Indexed N of N" counter update near-realtime during a reindex by tracking the server's active-run state instead of a pure time window: - Set REINDEX_POLL_INTERVAL to 5000ms (kept bounded by the cap). - Extract two pure, exported, unit-tested helpers: - nextReindexPollInterval: keep polling while the server reports an ACTIVE run (reindexing===true) OR within the deadline and not yet done; stop once the run is finished AND fully indexed (reindexing===false && indexed>=total) or the deadline cap is hit (the cap always wins, so a stuck/never-clearing progress record can't poll forever). - isReindexComplete: deadline-clear predicate mirroring that stop condition. - Wire the refetchInterval and the deadline-clearing effect to those helpers. - Keep the Reindex button spinner active for the whole run (loading also while settings.reindexing), reusing the existing loading prop; also blocks a redundant mid-run re-trigger (server de-dupes regardless). No SSE/websockets: polling keyed on the reindexing flag is the intended scope. The counter now tracks the actual active-reindex state and stops promptly when the server reports the run is done. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../components/ai-provider-settings.spec.tsx | 105 ++++++++++++++++++ .../components/ai-provider-settings.tsx | 89 ++++++++++++--- 2 files changed, 176 insertions(+), 18 deletions(-) diff --git a/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.spec.tsx b/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.spec.tsx index 3b7c9335..147c426d 100644 --- a/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.spec.tsx +++ b/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.spec.tsx @@ -3,6 +3,8 @@ import { resolveCardStatus, isEndpointConfigured, resolveKeyField, + nextReindexPollInterval, + isReindexComplete, } from './ai-provider-settings'; describe('resolveCardStatus', () => { @@ -71,3 +73,106 @@ describe('resolveKeyField (write-only key payload)', () => { expect(resolveKeyField('', false)).toEqual({ set: false }); }); }); + +describe('nextReindexPollInterval', () => { + const INTERVAL = 5000; + const base = { now: 1_000, intervalMs: INTERVAL }; + + it('does not poll when no reindex deadline is set', () => { + expect( + nextReindexPollInterval({ + ...base, + deadline: null, + status: { reindexing: true, indexedPages: 0, totalPages: 478 }, + }), + ).toBe(false); + }); + + it('keeps polling while the server reports an active run', () => { + expect( + nextReindexPollInterval({ + ...base, + deadline: 10_000, + status: { reindexing: true, indexedPages: 120, totalPages: 478 }, + }), + ).toBe(INTERVAL); + }); + + it('keeps polling during an active run even if counts momentarily look full', () => { + // The run clears its progress record only at the very end, so a transient + // indexed==total while reindexing is still true must NOT stop polling. + expect( + nextReindexPollInterval({ + ...base, + deadline: 10_000, + status: { reindexing: true, indexedPages: 478, totalPages: 478 }, + }), + ).toBe(INTERVAL); + }); + + it('stops once the run is finished AND fully indexed', () => { + expect( + nextReindexPollInterval({ + ...base, + deadline: 10_000, + status: { reindexing: false, indexedPages: 478, totalPages: 478 }, + }), + ).toBe(false); + }); + + it('keeps polling within the deadline when not yet done and no active flag', () => { + // First poll right after enqueue, before the worker publishes progress. + expect( + nextReindexPollInterval({ + ...base, + deadline: 10_000, + status: { reindexing: false, indexedPages: 0, totalPages: 478 }, + }), + ).toBe(INTERVAL); + }); + + it('cap always wins: stops once past the deadline even if still reindexing', () => { + expect( + nextReindexPollInterval({ + deadline: 1_000, + now: 2_000, // past the deadline + intervalMs: INTERVAL, + status: { reindexing: true, indexedPages: 200, totalPages: 478 }, + }), + ).toBe(false); + }); + + it('stops on an empty workspace (0 of 0) once the run is finished', () => { + expect( + nextReindexPollInterval({ + ...base, + deadline: 10_000, + status: { reindexing: false, indexedPages: 0, totalPages: 0 }, + }), + ).toBe(false); + }); +}); + +describe('isReindexComplete', () => { + it('false when no status yet', () => { + expect(isReindexComplete(undefined)).toBe(false); + }); + + it('false while a run is still active (even at indexed==total)', () => { + expect( + isReindexComplete({ reindexing: true, indexedPages: 478, totalPages: 478 }), + ).toBe(false); + }); + + it('false when finished but not yet fully indexed', () => { + expect( + isReindexComplete({ reindexing: false, indexedPages: 120, totalPages: 478 }), + ).toBe(false); + }); + + it('true once finished and fully indexed', () => { + expect( + isReindexComplete({ reindexing: false, indexedPages: 478, totalPages: 478 }), + ).toBe(true); + }); +}); diff --git a/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.tsx b/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.tsx index 811c2610..a06d1e0f 100644 --- a/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.tsx +++ b/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.tsx @@ -37,6 +37,7 @@ import { } from "@/features/workspace/queries/ai-settings-query.ts"; import { AiTestCapability, + IAiSettings, IAiSettingsUpdate, SttApiStyle, ChatApiStyle, @@ -169,6 +170,51 @@ export function resolveKeyField( return { set: false }; } +// Subset of the status payload that drives the reindex poll decisions. +type ReindexStatus = Pick< + IAiSettings, + "reindexing" | "indexedPages" | "totalPages" +>; + +/** + * Decide the TanStack Query `refetchInterval` while a reindex may be running. + * Returns the poll interval (ms) to keep polling, or `false` to stop. + * + * Polls while the server reports an ACTIVE run (`reindexing === true`) OR we are + * still within the deadline window and not yet fully indexed. Stops once the run + * has finished AND everything is indexed (server cleared its progress record and + * fell back to the DB coverage count), or the deadline cap is hit — the cap + * always wins so a stuck/never-clearing progress record can't poll forever. + */ +export function nextReindexPollInterval(args: { + deadline: number | null; + now: number; + intervalMs: number; + status?: ReindexStatus; +}): number | false { + const { deadline, now, intervalMs, status } = args; + if (deadline === null) return false; + // Cap always wins. + if (now > deadline) return false; + // Active run → keep polling even if the momentary counts already look full. + if (status?.reindexing) return intervalMs; + // Finished and fully indexed (incl. an empty workspace, 0 >= 0) → stop. + if (status && status.indexedPages >= status.totalPages) return false; + // Within the deadline and not yet done → keep polling. + return intervalMs; +} + +/** + * Whether the reindex poll deadline should be cleared: the server reports no + * active run AND the count is complete. Mirrors the stop condition of + * `nextReindexPollInterval` (sans the cap, which the effect handles via time). + */ +export function isReindexComplete(status?: ReindexStatus): boolean { + return ( + !!status && !status.reindexing && status.indexedPages >= status.totalPages + ); +} + // Translate the dot's tooltip label. Kept in one place so all three endpoint // cards share identical wording. function cardStatusLabel(status: CardStatus, t: (k: string) => string): string { @@ -215,31 +261,34 @@ export default function AiProviderSettings() { // PRE-job counts immediately, so the only way the "Indexed X of Y" counter // visibly climbs is to keep polling the settings query while the job runs. // `reindexDeadline` is the timestamp until which we poll (set on reindex - // success); polling stops early once indexed === total. Bounded so a stuck - // job can never poll forever. - const REINDEX_POLL_INTERVAL = 3000; // ms between refetches while indexing + // success). Polling tracks the server's `reindexing` flag: it keeps going for + // the whole active run and stops promptly once the server reports the run is + // finished. Bounded by the cap so a stuck/never-clearing progress record can + // never poll forever. + const REINDEX_POLL_INTERVAL = 5000; // ms between refetches while indexing const REINDEX_POLL_CAP_MS = 120000; // ~2 min hard cap const [reindexDeadline, setReindexDeadline] = useState(null); // Only admins may read the (masked) AI settings; the server enforces this too. - const { data: settings, isLoading } = useAiSettingsQuery(isAdmin, (query) => { - if (reindexDeadline === null) return false; - // Past the cap → stop polling (cleared via the effect below too). - if (Date.now() > reindexDeadline) return false; - const data = query.state.data; - // Stop once everything is indexed; otherwise keep polling. - if (data && data.indexedPages >= data.totalPages) return false; - return REINDEX_POLL_INTERVAL; - }); + const { data: settings, isLoading } = useAiSettingsQuery(isAdmin, (query) => + nextReindexPollInterval({ + deadline: reindexDeadline, + now: Date.now(), + intervalMs: REINDEX_POLL_INTERVAL, + status: query.state.data, + }), + ); - // Stop polling once the work is done or the cap is reached. Also clears on + // Stop polling once the run is finished or the cap is reached. Also clears on // unmount because the deadline state goes away with the component. useEffect(() => { if (reindexDeadline === null) return; - // "Done" matches the refetchInterval stop condition (indexed >= total), - // including an empty workspace (0 >= 0), so the deadline clears promptly - // instead of waiting out the cap. - if (settings && settings.indexedPages >= settings.totalPages) { + // "Done" matches the refetchInterval stop condition: the server reports no + // active run AND the count is complete (indexed >= total, incl. an empty + // workspace 0 >= 0), so the deadline clears promptly instead of waiting out + // the cap. While `reindexing` is still true we keep the deadline so polling + // continues for the whole run. + if (isReindexComplete(settings)) { setReindexDeadline(null); return; } @@ -1031,7 +1080,11 @@ export default function AiProviderSettings() {