feat(ai): wire up workspace RAG bulk reindex + manual "Reindex now"

The WORKSPACE_CREATE_EMBEDDINGS / WORKSPACE_DELETE_EMBEDDINGS jobs were
enqueued (on AI Search enable/disable) but had no AI_QUEUE handler, so
existing pages were never indexed ("Indexed 0 of N pages") and disabling
never purged embeddings.

- EmbeddingProcessor: handle WORKSPACE_CREATE_EMBEDDINGS (bulk reindex all
  live pages) and WORKSPACE_DELETE_EMBEDDINGS (purge workspace embeddings)
- EmbeddingIndexerService: add reindexWorkspace() (skips when embeddings
  unconfigured; per-page error isolation) and removeWorkspace()
- PageRepo.getIdsByWorkspace(), PageEmbeddingRepo.deleteByWorkspace()
- AiSettingsService.reindex() + admin-only POST /workspace/ai-settings/reindex
- Frontend: "Reindex now" button, service call and mutation
- Stable per-workspace jobId with remove-before-add so a stale job can't
  block future reindexes; cancel the delayed purge on enable/reindex so it
  can't wipe freshly-built embeddings
This commit is contained in:
vvzvlad
2026-06-18 02:15:18 +03:00
parent 4cf6b73d3e
commit 52e19fe678
12 changed files with 253 additions and 16 deletions

View File

@@ -18,6 +18,7 @@ import { useTranslation } from "react-i18next";
import useUserRole from "@/hooks/use-user-role.tsx";
import {
useAiSettingsQuery,
useReindexAiEmbeddingsMutation,
useTestAiConnectionMutation,
useUpdateAiSettingsMutation,
} from "@/features/workspace/queries/ai-settings-query.ts";
@@ -50,6 +51,7 @@ export default function AiProviderSettings() {
const { data: settings, isLoading } = useAiSettingsQuery(isAdmin);
const updateMutation = useUpdateAiSettingsMutation();
const testMutation = useTestAiConnectionMutation();
const reindexMutation = useReindexAiEmbeddingsMutation();
// Whether a key is currently stored server-side (drives the placeholder).
const [hasApiKey, setHasApiKey] = useState(false);
@@ -258,12 +260,24 @@ export default function AiProviderSettings() {
)}
{settings && (
<Text size="sm" c="dimmed" mt={-8}>
{t("Indexed {{indexed}} of {{total}} pages", {
indexed: settings.indexedPages ?? 0,
total: settings.totalPages ?? 0,
})}
</Text>
<Group justify="space-between" mt={-8}>
<Text size="sm" c="dimmed">
{t("Indexed {{indexed}} of {{total}} pages", {
indexed: settings.indexedPages ?? 0,
total: settings.totalPages ?? 0,
})}
</Text>
{isAdmin && (
<Button
variant="subtle"
size="compact-sm"
onClick={() => reindexMutation.mutate()}
loading={reindexMutation.isPending}
>
{t("Reindex now")}
</Button>
)}
</Group>
)}
<Textarea

View File

@@ -8,6 +8,7 @@ import {
getAiSettings,
updateAiSettings,
testAiConnection,
reindexAiEmbeddings,
IAiSettings,
IAiSettingsUpdate,
IAiTestResult,
@@ -52,3 +53,23 @@ export function useTestAiConnectionMutation() {
mutationFn: () => testAiConnection(),
});
}
export function useReindexAiEmbeddingsMutation() {
const { t } = useTranslation();
const queryClient = useQueryClient();
return useMutation<IAiSettings, Error, void>({
mutationFn: () => reindexAiEmbeddings(),
onSuccess: () => {
notifications.show({ message: t("Reindexing started") });
queryClient.invalidateQueries({ queryKey: aiSettingsKey });
},
onError: (error) => {
const errorMessage = error["response"]?.data?.message;
notifications.show({
message: errorMessage ?? t("Failed to start reindexing"),
color: "red",
});
},
});
}

View File

@@ -60,3 +60,8 @@ export async function testAiConnection(): Promise<IAiTestResult> {
const req = await api.post<IAiTestResult>("/workspace/ai-settings/test");
return req.data;
}
export async function reindexAiEmbeddings(): Promise<IAiSettings> {
const req = await api.post<IAiSettings>("/workspace/ai-settings/reindex");
return req.data;
}

View File

@@ -151,6 +151,61 @@ export class EmbeddingIndexerService {
);
}
/**
* (Re)build embeddings for EVERY non-deleted page in a workspace. Used by the
* bulk reindex (WORKSPACE_CREATE_EMBEDDINGS, fired when AI Search is enabled
* and by the manual "Reindex now" action).
*
* Resolves the embeddings model once up front: if the workspace has no
* embeddings provider configured, the whole batch is skipped (otherwise each
* page would no-op individually after a wasted read). Pages are processed
* sequentially and each is isolated in try/catch so one failure never aborts
* the batch.
*/
async reindexWorkspace(workspaceId: string): Promise<void> {
try {
await this.aiService.getEmbeddingModel(workspaceId);
} catch (err) {
if (err instanceof AiEmbeddingNotConfiguredException) {
this.logger.debug(
`reindexWorkspace: embeddings not configured for workspace ${workspaceId}, skipping`,
);
return;
}
throw err;
}
const pageIds = await this.pageRepo.getIdsByWorkspace(workspaceId);
this.logger.debug(
`reindexWorkspace: reindexing ${pageIds.length} page(s) for workspace ${workspaceId}`,
);
let failed = 0;
for (const pageId of pageIds) {
try {
await this.reindexPage(pageId);
} catch (err) {
// Per-page isolation: one failure must not abort the whole batch.
failed++;
this.logger.error(
`reindexWorkspace: failed to reindex page ${pageId}: ${
err instanceof Error ? err.message : 'Unknown error'
}`,
);
}
}
this.logger.debug(
`reindexWorkspace: done for workspace ${workspaceId} (${
pageIds.length - failed
}/${pageIds.length} pages)`,
);
}
/** Purge ALL embeddings for a workspace (WORKSPACE_DELETE_EMBEDDINGS). */
async removeWorkspace(workspaceId: string): Promise<void> {
await this.pageEmbeddingRepo.deleteByWorkspace(workspaceId);
}
/** Remove all embeddings for a deleted page (used by the delete path). */
async removePage(pageId: string, workspaceId: string): Promise<void> {
await this.pageEmbeddingRepo.deleteByPage(pageId, workspaceId);

View File

@@ -2,7 +2,10 @@ import { Logger, OnModuleDestroy } from '@nestjs/common';
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { QueueJob, QueueName } from '../../../integrations/queue/constants';
import { IPageContentUpdatedJob } from '../../../integrations/queue/constants/queue.interface';
import {
IPageContentUpdatedJob,
IWorkspaceEmbeddingsJob,
} from '../../../integrations/queue/constants/queue.interface';
import { EmbeddingIndexerService } from './embedding-indexer.service';
/**
@@ -30,11 +33,15 @@ export class EmbeddingProcessor extends WorkerHost implements OnModuleDestroy {
super();
}
async process(job: Job<IPageContentUpdatedJob, void>): Promise<void> {
const { pageIds, workspaceId } = job.data ?? {
pageIds: [],
workspaceId: '',
};
async process(
job: Job<IPageContentUpdatedJob | IWorkspaceEmbeddingsJob, void>,
): Promise<void> {
// The workspace-wide jobs carry `{ workspaceId }` only (no `pageIds`), so
// read `pageIds` defensively — it is absent on the workspace payload.
const data: Partial<IPageContentUpdatedJob & IWorkspaceEmbeddingsJob> =
job.data ?? {};
const pageIds = data.pageIds ?? [];
const workspaceId = data.workspaceId ?? '';
const ids = Array.isArray(pageIds) ? pageIds : [];
switch (job.name) {
@@ -70,6 +77,28 @@ export class EmbeddingProcessor extends WorkerHost implements OnModuleDestroy {
break;
}
case QueueJob.WORKSPACE_CREATE_EMBEDDINGS: {
try {
await this.indexer.reindexWorkspace(workspaceId);
} catch (err) {
this.logger.error(
`Failed to reindex workspace ${workspaceId}: ${this.errMessage(err)}`,
);
}
break;
}
case QueueJob.WORKSPACE_DELETE_EMBEDDINGS: {
try {
await this.indexer.removeWorkspace(workspaceId);
} catch (err) {
this.logger.error(
`Failed to remove embeddings for workspace ${workspaceId}: ${this.errMessage(err)}`,
);
}
break;
}
default:
// Other AI_QUEUE job names are not handled here (e.g. future jobs).
this.logger.debug(`Ignoring AI_QUEUE job: ${job.name}`);

View File

@@ -513,9 +513,24 @@ export class WorkspaceService {
});
if (after.aiSearch === true) {
await this.aiQueue.add(QueueJob.WORKSPACE_CREATE_EMBEDDINGS, {
workspaceId,
});
// Cancel any pending delayed purge from a previous disable so it can't
// wipe the embeddings we are about to (re)build. The purge is a delayed
// job, so remove() simply deletes it (returning 0 if it is absent, without
// throwing). The .catch only guards against transport/Redis errors.
await this.aiQueue
.remove(`ai-search-disabled-${workspaceId}`)
.catch(() => undefined);
// Stable jobId de-duplicates with the manual "Reindex now" path and with
// repeated enable toggles (one full reindex at a time).
await this.aiQueue.add(
QueueJob.WORKSPACE_CREATE_EMBEDDINGS,
{ workspaceId },
{
jobId: `ai-reindex-${workspaceId}`,
removeOnComplete: true,
removeOnFail: true,
},
);
} else if (after.aiSearch === false) {
const deleteJobId = `ai-search-disabled-${workspaceId}`;
await this.aiQueue.add(

View File

@@ -70,6 +70,21 @@ export class PageEmbeddingRepo {
.execute();
}
/**
* HARD-delete every embedding row for an entire workspace. Used when AI Search
* is disabled for the workspace (WORKSPACE_DELETE_EMBEDDINGS).
*/
async deleteByWorkspace(
workspaceId: string,
trx?: KyselyTransaction,
): Promise<void> {
const db = dbOrTx(this.db, trx);
await db
.deleteFrom('pageEmbeddings')
.where('workspaceId', '=', workspaceId)
.execute();
}
/**
* Bulk-insert chunk rows for a page. The `embedding` value is serialized with
* `pgvector.toSql` and cast to `vector` so Postgres stores it in the

View File

@@ -195,6 +195,20 @@ export class PageRepo {
return Number(row?.count ?? 0);
}
/**
* IDs of all non-deleted pages in a workspace. Used by the RAG bulk reindex to
* (re)build embeddings for every existing page.
*/
async getIdsByWorkspace(workspaceId: string): Promise<string[]> {
const rows = await this.db
.selectFrom('pages')
.select('id')
.where('workspaceId', '=', workspaceId)
.where('deletedAt', 'is', null)
.execute();
return rows.map((r) => r.id);
}
async deletePage(pageId: string): Promise<void> {
let query = this.db.deleteFrom('pages');

View File

@@ -75,4 +75,16 @@ export class AiSettingsController {
this.assertAdmin(user, workspace);
return this.aiService.testConnection(workspace.id);
}
@HttpCode(HttpStatus.OK)
@Post('reindex')
async reindex(
@AuthUser() user: User,
@AuthWorkspace() workspace: Workspace,
) {
this.assertAdmin(user, workspace);
await this.aiSettingsService.reindex(workspace.id);
// Return refreshed masked settings so the client can update the counter.
return this.aiSettingsService.getMasked(workspace.id);
}
}

View File

@@ -1,4 +1,7 @@
import { BadRequestException, Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { QueueName, QueueJob } from '../queue/constants';
import { WorkspaceRepo } from '@docmost/db/repos/workspace/workspace.repo';
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo';
@@ -43,8 +46,49 @@ export class AiSettingsService {
private readonly pageEmbeddingRepo: PageEmbeddingRepo,
private readonly pageRepo: PageRepo,
private readonly secretBox: SecretBoxService,
@InjectQueue(QueueName.AI_QUEUE) private readonly aiQueue: Queue,
) {}
/**
* Enqueue a full workspace RAG reindex (manual "Reindex now").
*
* Uses a stable per-workspace jobId so rapid re-triggers de-duplicate instead
* of stacking multiple full reindex passes. A prior non-active job with that
* id is removed first so a lingering completed/failed/waiting entry can never
* block a fresh reindex (BullMQ ignores add() when the jobId already exists).
* If a reindex is already running, remove() is a no-op (it leaves a
* locked/active job in place, returning 0 without throwing), and the add()
* below then de-duplicates against that still-present jobId — so the running
* pass is kept and no duplicate is started. The .catch only guards against
* transport/Redis errors.
*
* Also cancels any pending delayed WORKSPACE_DELETE_EMBEDDINGS job (scheduled
* when AI Search was disabled) so it cannot wipe the embeddings we are about
* to rebuild. The job no-ops if embeddings are unconfigured.
*/
async reindex(workspaceId: string): Promise<void> {
// A reindex means embeddings must persist: drop the delayed purge, if any.
await this.aiQueue
.remove(`ai-search-disabled-${workspaceId}`)
.catch(() => undefined);
const jobId = `ai-reindex-${workspaceId}`;
// Clear a prior non-active entry so a stale job can't block this reindex.
// A locked/active job is left in place (remove() no-ops) and the add() below
// de-duplicates against it, keeping the in-progress pass.
await this.aiQueue.remove(jobId).catch(() => undefined);
await this.aiQueue.add(
QueueJob.WORKSPACE_CREATE_EMBEDDINGS,
{ workspaceId },
{
jobId,
removeOnComplete: true,
removeOnFail: true,
},
);
}
/** Read the stored non-secret provider settings for a workspace. */
private async readProvider(
workspaceId: string,

View File

@@ -1,5 +1,7 @@
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { CryptoModule } from '../crypto/crypto.module';
import { QueueName } from '../queue/constants';
import { AiService } from './ai.service';
import { AiSettingsService } from './ai-settings.service';
import { AiSettingsController } from './ai-settings.controller';
@@ -12,7 +14,10 @@ import { AiSettingsController } from './ai-settings.controller';
* (CaslModule, global) are resolved without explicit imports.
*/
@Module({
imports: [CryptoModule],
imports: [
CryptoModule,
BullModule.registerQueue({ name: QueueName.AI_QUEUE }),
],
controllers: [AiSettingsController],
providers: [AiService, AiSettingsService],
exports: [AiService, AiSettingsService],

View File

@@ -33,6 +33,14 @@ export interface IPageContentUpdatedJob {
workspaceId: string;
}
/**
* AI_QUEUE payload for workspace-wide RAG embedding jobs
* (WORKSPACE_CREATE_EMBEDDINGS / WORKSPACE_DELETE_EMBEDDINGS).
*/
export interface IWorkspaceEmbeddingsJob {
workspaceId: string;
}
export interface INotificationCreateJob {
userId: string;
workspaceId: string;