Compare commits
6 Commits
fix/252-e2
...
fix/embedd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bdc033e689 | ||
|
|
85b38d6946 | ||
|
|
bf09eec4e1 | ||
|
|
95d07d8d6f | ||
|
|
630939e8f3 | ||
|
|
72bb03918d |
@@ -3,6 +3,9 @@ import {
|
||||
resolveCardStatus,
|
||||
isEndpointConfigured,
|
||||
resolveKeyField,
|
||||
nextReindexPollInterval,
|
||||
isReindexComplete,
|
||||
isReindexButtonLoading,
|
||||
} from './ai-provider-settings';
|
||||
|
||||
describe('resolveCardStatus', () => {
|
||||
@@ -71,3 +74,152 @@ describe('resolveKeyField (write-only key payload)', () => {
|
||||
expect(resolveKeyField('', false)).toEqual({ set: false });
|
||||
});
|
||||
});
|
||||
|
||||
describe('nextReindexPollInterval', () => {
|
||||
const INTERVAL = 5000;
|
||||
const base = { now: 1_000, intervalMs: INTERVAL };
|
||||
|
||||
it('does not poll when no reindex deadline is set', () => {
|
||||
expect(
|
||||
nextReindexPollInterval({
|
||||
...base,
|
||||
deadline: null,
|
||||
status: { reindexing: true, indexedPages: 0, totalPages: 478 },
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it('keeps polling while the server reports an active run', () => {
|
||||
expect(
|
||||
nextReindexPollInterval({
|
||||
...base,
|
||||
deadline: 10_000,
|
||||
status: { reindexing: true, indexedPages: 120, totalPages: 478 },
|
||||
}),
|
||||
).toBe(INTERVAL);
|
||||
});
|
||||
|
||||
it('keeps polling during an active run even if counts momentarily look full', () => {
|
||||
// The run clears its progress record only at the very end, so a transient
|
||||
// indexed==total while reindexing is still true must NOT stop polling.
|
||||
expect(
|
||||
nextReindexPollInterval({
|
||||
...base,
|
||||
deadline: 10_000,
|
||||
status: { reindexing: true, indexedPages: 478, totalPages: 478 },
|
||||
}),
|
||||
).toBe(INTERVAL);
|
||||
});
|
||||
|
||||
it('stops once the run is finished AND fully indexed', () => {
|
||||
expect(
|
||||
nextReindexPollInterval({
|
||||
...base,
|
||||
deadline: 10_000,
|
||||
status: { reindexing: false, indexedPages: 478, totalPages: 478 },
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it('keeps polling within the deadline when not yet done and no active flag', () => {
|
||||
// First poll right after enqueue, before the worker publishes progress.
|
||||
expect(
|
||||
nextReindexPollInterval({
|
||||
...base,
|
||||
deadline: 10_000,
|
||||
status: { reindexing: false, indexedPages: 0, totalPages: 478 },
|
||||
}),
|
||||
).toBe(INTERVAL);
|
||||
});
|
||||
|
||||
it('cap always wins: stops once past the deadline even if still reindexing', () => {
|
||||
expect(
|
||||
nextReindexPollInterval({
|
||||
deadline: 1_000,
|
||||
now: 2_000, // past the deadline
|
||||
intervalMs: INTERVAL,
|
||||
status: { reindexing: true, indexedPages: 200, totalPages: 478 },
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it('stops on an empty workspace (0 of 0) once the run is finished', () => {
|
||||
expect(
|
||||
nextReindexPollInterval({
|
||||
...base,
|
||||
deadline: 10_000,
|
||||
status: { reindexing: false, indexedPages: 0, totalPages: 0 },
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('isReindexComplete', () => {
|
||||
it('false when no status yet', () => {
|
||||
expect(isReindexComplete(undefined)).toBe(false);
|
||||
});
|
||||
|
||||
it('false while a run is still active (even at indexed==total)', () => {
|
||||
expect(
|
||||
isReindexComplete({ reindexing: true, indexedPages: 478, totalPages: 478 }),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it('false when finished but not yet fully indexed', () => {
|
||||
expect(
|
||||
isReindexComplete({ reindexing: false, indexedPages: 120, totalPages: 478 }),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it('true once finished and fully indexed', () => {
|
||||
expect(
|
||||
isReindexComplete({ reindexing: false, indexedPages: 478, totalPages: 478 }),
|
||||
).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('isReindexButtonLoading', () => {
|
||||
it('loads while the POST mutation is pending', () => {
|
||||
expect(
|
||||
isReindexButtonLoading({
|
||||
mutationPending: true,
|
||||
deadline: null,
|
||||
status: false,
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it('does NOT load post-cap: deadline nulled but reindexing left stale-true', () => {
|
||||
// The key case: after the poll cap fires `reindexDeadline` is null while
|
||||
// `settings.reindexing` can be a stale `true` from the last poll. Gating on
|
||||
// the deadline keeps the spinner from sticking forever so the admin can
|
||||
// restart.
|
||||
expect(
|
||||
isReindexButtonLoading({
|
||||
mutationPending: false,
|
||||
deadline: null,
|
||||
status: true,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it('loads during an active run within the poll window', () => {
|
||||
expect(
|
||||
isReindexButtonLoading({
|
||||
mutationPending: false,
|
||||
deadline: 10_000,
|
||||
status: true,
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it('does not load once the run finished while still polling', () => {
|
||||
expect(
|
||||
isReindexButtonLoading({
|
||||
mutationPending: false,
|
||||
deadline: 10_000,
|
||||
status: false,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -37,6 +37,7 @@ import {
|
||||
} from "@/features/workspace/queries/ai-settings-query.ts";
|
||||
import {
|
||||
AiTestCapability,
|
||||
IAiSettings,
|
||||
IAiSettingsUpdate,
|
||||
SttApiStyle,
|
||||
ChatApiStyle,
|
||||
@@ -169,6 +170,71 @@ export function resolveKeyField(
|
||||
return { set: false };
|
||||
}
|
||||
|
||||
// Subset of the status payload that drives the reindex poll decisions.
|
||||
type ReindexStatus = Pick<
|
||||
IAiSettings,
|
||||
"reindexing" | "indexedPages" | "totalPages"
|
||||
>;
|
||||
|
||||
/**
|
||||
* Decide the TanStack Query `refetchInterval` while a reindex may be running.
|
||||
* Returns the poll interval (ms) to keep polling, or `false` to stop.
|
||||
*
|
||||
* Polls while the server reports an ACTIVE run (`reindexing === true`) OR we are
|
||||
* still within the deadline window and not yet fully indexed. Stops once the run
|
||||
* has finished AND everything is indexed (server cleared its progress record and
|
||||
* fell back to the DB coverage count), or the deadline cap is hit — the cap
|
||||
* always wins so a stuck/never-clearing progress record can't poll forever.
|
||||
*/
|
||||
export function nextReindexPollInterval(args: {
|
||||
deadline: number | null;
|
||||
now: number;
|
||||
intervalMs: number;
|
||||
status?: ReindexStatus;
|
||||
}): number | false {
|
||||
const { deadline, now, intervalMs, status } = args;
|
||||
if (deadline === null) return false;
|
||||
// Cap always wins.
|
||||
if (now > deadline) return false;
|
||||
// Active run → keep polling even if the momentary counts already look full.
|
||||
if (status?.reindexing) return intervalMs;
|
||||
// Finished and fully indexed (incl. an empty workspace, 0 >= 0) → stop.
|
||||
if (status && status.indexedPages >= status.totalPages) return false;
|
||||
// Within the deadline and not yet done → keep polling.
|
||||
return intervalMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the reindex poll deadline should be cleared: the server reports no
|
||||
* active run AND the count is complete. Mirrors the stop condition of
|
||||
* `nextReindexPollInterval` (sans the cap, which the effect handles via time).
|
||||
*/
|
||||
export function isReindexComplete(status?: ReindexStatus): boolean {
|
||||
return (
|
||||
!!status && !status.reindexing && status.indexedPages >= status.totalPages
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the reindex button should show its spinner (and stay disabled).
|
||||
*
|
||||
* Spins while the POST is in flight, and for the WHOLE background run while the
|
||||
* server reports `reindexing === true`. The `deadline !== null` gate is the
|
||||
* load-bearing part: once the 120s poll cap fires it nulls `reindexDeadline`
|
||||
* and stops refetching, so `status` (settings?.reindexing) can be a stale
|
||||
* `true` from the last poll. Without the gate the spinner would stick forever
|
||||
* for a run that outlives the cap and block a restart; gating on the active
|
||||
* poll window clears it so the admin can re-trigger.
|
||||
*/
|
||||
export function isReindexButtonLoading(args: {
|
||||
mutationPending: boolean;
|
||||
deadline: number | null;
|
||||
status?: boolean;
|
||||
}): boolean {
|
||||
const { mutationPending, deadline, status } = args;
|
||||
return mutationPending || (deadline !== null && status === true);
|
||||
}
|
||||
|
||||
// Translate the dot's tooltip label. Kept in one place so all three endpoint
|
||||
// cards share identical wording.
|
||||
function cardStatusLabel(status: CardStatus, t: (k: string) => string): string {
|
||||
@@ -215,31 +281,34 @@ export default function AiProviderSettings() {
|
||||
// PRE-job counts immediately, so the only way the "Indexed X of Y" counter
|
||||
// visibly climbs is to keep polling the settings query while the job runs.
|
||||
// `reindexDeadline` is the timestamp until which we poll (set on reindex
|
||||
// success); polling stops early once indexed === total. Bounded so a stuck
|
||||
// job can never poll forever.
|
||||
const REINDEX_POLL_INTERVAL = 3000; // ms between refetches while indexing
|
||||
// success). Polling tracks the server's `reindexing` flag: it keeps going for
|
||||
// the whole active run and stops promptly once the server reports the run is
|
||||
// finished. Bounded by the cap so a stuck/never-clearing progress record can
|
||||
// never poll forever.
|
||||
const REINDEX_POLL_INTERVAL = 5000; // ms between refetches while indexing
|
||||
const REINDEX_POLL_CAP_MS = 120000; // ~2 min hard cap
|
||||
const [reindexDeadline, setReindexDeadline] = useState<number | null>(null);
|
||||
|
||||
// Only admins may read the (masked) AI settings; the server enforces this too.
|
||||
const { data: settings, isLoading } = useAiSettingsQuery(isAdmin, (query) => {
|
||||
if (reindexDeadline === null) return false;
|
||||
// Past the cap → stop polling (cleared via the effect below too).
|
||||
if (Date.now() > reindexDeadline) return false;
|
||||
const data = query.state.data;
|
||||
// Stop once everything is indexed; otherwise keep polling.
|
||||
if (data && data.indexedPages >= data.totalPages) return false;
|
||||
return REINDEX_POLL_INTERVAL;
|
||||
});
|
||||
const { data: settings, isLoading } = useAiSettingsQuery(isAdmin, (query) =>
|
||||
nextReindexPollInterval({
|
||||
deadline: reindexDeadline,
|
||||
now: Date.now(),
|
||||
intervalMs: REINDEX_POLL_INTERVAL,
|
||||
status: query.state.data,
|
||||
}),
|
||||
);
|
||||
|
||||
// Stop polling once the work is done or the cap is reached. Also clears on
|
||||
// Stop polling once the run is finished or the cap is reached. Also clears on
|
||||
// unmount because the deadline state goes away with the component.
|
||||
useEffect(() => {
|
||||
if (reindexDeadline === null) return;
|
||||
// "Done" matches the refetchInterval stop condition (indexed >= total),
|
||||
// including an empty workspace (0 >= 0), so the deadline clears promptly
|
||||
// instead of waiting out the cap.
|
||||
if (settings && settings.indexedPages >= settings.totalPages) {
|
||||
// "Done" matches the refetchInterval stop condition: the server reports no
|
||||
// active run AND the count is complete (indexed >= total, incl. an empty
|
||||
// workspace 0 >= 0), so the deadline clears promptly instead of waiting out
|
||||
// the cap. While `reindexing` is still true we keep the deadline so polling
|
||||
// continues for the whole run.
|
||||
if (isReindexComplete(settings)) {
|
||||
setReindexDeadline(null);
|
||||
return;
|
||||
}
|
||||
@@ -1031,7 +1100,17 @@ export default function AiProviderSettings() {
|
||||
<Button
|
||||
variant="subtle"
|
||||
size="compact-sm"
|
||||
loading={reindexMutation.isPending}
|
||||
// Spin for the WHOLE run: the POST resolves immediately, but the
|
||||
// background job keeps running, so also stay loading while the
|
||||
// server reports `reindexing` (this also blocks a redundant
|
||||
// re-trigger mid-run; the server de-dupes regardless). The
|
||||
// deadline gate (and why it matters post-cap) lives in
|
||||
// `isReindexButtonLoading`, which is unit-tested.
|
||||
loading={isReindexButtonLoading({
|
||||
mutationPending: reindexMutation.isPending,
|
||||
deadline: reindexDeadline,
|
||||
status: settings?.reindexing,
|
||||
})}
|
||||
onClick={() =>
|
||||
reindexMutation.mutate(undefined, {
|
||||
// Begin bounded polling so the counter climbs as the async
|
||||
|
||||
@@ -23,8 +23,12 @@ export function useAiSettingsQuery(
|
||||
enabled: boolean = true,
|
||||
// While reindexing runs as an async background job, the counter only climbs
|
||||
// if the client keeps refetching. The component passes a refetchInterval
|
||||
// function that polls until indexed === total or a bounded deadline, then
|
||||
// returns false to stop. See AiProviderSettings.
|
||||
// function (`nextReindexPollInterval`) that keeps polling while the server
|
||||
// reports an active run (reindexing === true) OR we are still within the
|
||||
// bounded deadline and not yet fully indexed; it returns false to stop only
|
||||
// once the run has finished AND indexed >= total, or the deadline cap is hit
|
||||
// (the cap always wins). Note: a transient indexed === total during an active
|
||||
// run does NOT stop polling. See AiProviderSettings.
|
||||
refetchInterval?:
|
||||
| number
|
||||
| false
|
||||
|
||||
@@ -48,6 +48,9 @@ export interface IAiSettings {
|
||||
// RAG indexing coverage (pages indexed for semantic search).
|
||||
indexedPages: number;
|
||||
totalPages: number;
|
||||
// True while a full workspace reindex is actively running; the counts above
|
||||
// then reflect the live run progress (done climbs 0 -> total).
|
||||
reindexing?: boolean;
|
||||
}
|
||||
|
||||
// Update payload. Key semantics (same for `apiKey` and `embeddingApiKey`):
|
||||
|
||||
@@ -3,6 +3,8 @@ import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||
import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo';
|
||||
import { KyselyDB } from '@docmost/db/types/kysely.types';
|
||||
import { AiService } from '../../../integrations/ai/ai.service';
|
||||
import { EmbeddingReindexProgressService } from '../../../integrations/ai/embedding-reindex-progress.service';
|
||||
import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-embedding-not-configured.exception';
|
||||
|
||||
/**
|
||||
* Unit tests for EmbeddingIndexerService.reindexWorkspace's batch control flow.
|
||||
@@ -12,7 +14,8 @@ import { AiService } from '../../../integrations/ai/ai.service';
|
||||
* reindexWorkspace actually touches:
|
||||
* - aiService.getEmbeddingModel -> a model string so the up-front configured
|
||||
* check passes,
|
||||
* - pageRepo.getIdsByWorkspace -> three page ids,
|
||||
* - pageRepo.getEmbeddablePageIds -> three page ids (the embeddable set the
|
||||
* reindex iterates),
|
||||
* - service.reindexPage -> spied per test to drive the per-page outcome.
|
||||
*
|
||||
* The point under test is the catch block: a FATAL provider error (auth/billing)
|
||||
@@ -24,21 +27,30 @@ describe('EmbeddingIndexerService.reindexWorkspace fail-fast', () => {
|
||||
|
||||
function makeService() {
|
||||
const pageRepo = {
|
||||
getIdsByWorkspace: jest.fn().mockResolvedValue(['p1', 'p2', 'p3']),
|
||||
getEmbeddablePageIds: jest.fn().mockResolvedValue(['p1', 'p2', 'p3']),
|
||||
};
|
||||
const pageEmbeddingRepo = {};
|
||||
const aiService = {
|
||||
getEmbeddingModel: jest.fn().mockResolvedValue('some-model'),
|
||||
};
|
||||
// Progress is a best-effort cosmetic store; mock its async methods so the
|
||||
// batch control flow can be tested without Redis.
|
||||
const reindexProgress = {
|
||||
start: jest.fn().mockResolvedValue(undefined),
|
||||
increment: jest.fn().mockResolvedValue(undefined),
|
||||
clear: jest.fn().mockResolvedValue(undefined),
|
||||
get: jest.fn().mockResolvedValue(null),
|
||||
};
|
||||
const db = {};
|
||||
|
||||
const service = new EmbeddingIndexerService(
|
||||
pageRepo as unknown as PageRepo,
|
||||
pageEmbeddingRepo as unknown as PageEmbeddingRepo,
|
||||
aiService as unknown as AiService,
|
||||
reindexProgress as unknown as EmbeddingReindexProgressService,
|
||||
db as unknown as KyselyDB,
|
||||
);
|
||||
return { service, pageRepo, aiService };
|
||||
return { service, pageRepo, aiService, reindexProgress };
|
||||
}
|
||||
|
||||
it('aborts after the first page on a FATAL (401) provider error', async () => {
|
||||
@@ -78,3 +90,100 @@ describe('EmbeddingIndexerService.reindexWorkspace fail-fast', () => {
|
||||
expect(reindexPage).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* Live reindex-progress reporting: reindexWorkspace must publish a per-workspace
|
||||
* progress record (total at start, done incremented per processed page) and ALWAYS
|
||||
* clear it in a finally — including on a fatal abort and an unconfigured early
|
||||
* return — so the settings status can show the counter climb without ever getting
|
||||
* stuck in a "reindexing" state.
|
||||
*/
|
||||
describe('EmbeddingIndexerService.reindexWorkspace progress', () => {
|
||||
const WORKSPACE_ID = 'ws-1';
|
||||
|
||||
function makeService(pageIds: string[] = ['p1', 'p2', 'p3']) {
|
||||
const pageRepo = {
|
||||
getEmbeddablePageIds: jest.fn().mockResolvedValue(pageIds),
|
||||
};
|
||||
const pageEmbeddingRepo = {};
|
||||
const aiService = {
|
||||
getEmbeddingModel: jest.fn().mockResolvedValue('some-model'),
|
||||
};
|
||||
const reindexProgress = {
|
||||
start: jest.fn().mockResolvedValue(undefined),
|
||||
increment: jest.fn().mockResolvedValue(undefined),
|
||||
clear: jest.fn().mockResolvedValue(undefined),
|
||||
get: jest.fn().mockResolvedValue(null),
|
||||
};
|
||||
const db = {};
|
||||
const service = new EmbeddingIndexerService(
|
||||
pageRepo as unknown as PageRepo,
|
||||
pageEmbeddingRepo as unknown as PageEmbeddingRepo,
|
||||
aiService as unknown as AiService,
|
||||
reindexProgress as unknown as EmbeddingReindexProgressService,
|
||||
db as unknown as KyselyDB,
|
||||
);
|
||||
return { service, pageRepo, aiService, reindexProgress };
|
||||
}
|
||||
|
||||
it('sets total at start, increments done per page, and clears in finally', async () => {
|
||||
const { service, reindexProgress } = makeService(['p1', 'p2', 'p3']);
|
||||
jest.spyOn(service, 'reindexPage').mockResolvedValue(undefined);
|
||||
|
||||
await service.reindexWorkspace(WORKSPACE_ID);
|
||||
|
||||
expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 3);
|
||||
// One increment per processed page.
|
||||
expect(reindexProgress.increment).toHaveBeenCalledTimes(3);
|
||||
expect(reindexProgress.increment).toHaveBeenCalledWith(WORKSPACE_ID);
|
||||
// Cleared exactly once on completion.
|
||||
expect(reindexProgress.clear).toHaveBeenCalledTimes(1);
|
||||
expect(reindexProgress.clear).toHaveBeenCalledWith(WORKSPACE_ID);
|
||||
});
|
||||
|
||||
it('counts a handled (non-fatal) per-page failure as processed', async () => {
|
||||
const { service, reindexProgress } = makeService(['p1', 'p2', 'p3']);
|
||||
// No statusCode -> non-fatal -> isolate and continue; each counts as done.
|
||||
jest.spyOn(service, 'reindexPage').mockRejectedValue(new Error('boom'));
|
||||
|
||||
await service.reindexWorkspace(WORKSPACE_ID);
|
||||
|
||||
expect(reindexProgress.increment).toHaveBeenCalledTimes(3);
|
||||
expect(reindexProgress.clear).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('clears progress in finally even when a FATAL provider error aborts the batch', async () => {
|
||||
const { service, reindexProgress } = makeService(['p1', 'p2', 'p3']);
|
||||
// A 401 aborts on the first page (re-thrown) — the finally must still clear.
|
||||
jest
|
||||
.spyOn(service, 'reindexPage')
|
||||
.mockRejectedValue({ statusCode: 401, message: 'User not found' });
|
||||
|
||||
await expect(service.reindexWorkspace(WORKSPACE_ID)).rejects.toMatchObject({
|
||||
statusCode: 401,
|
||||
});
|
||||
|
||||
expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 3);
|
||||
// Aborted page is NOT counted as processed.
|
||||
expect(reindexProgress.increment).not.toHaveBeenCalled();
|
||||
// But progress is still cleared so the run never gets stuck.
|
||||
expect(reindexProgress.clear).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('clears the enqueue-seeded progress on an unconfigured early return', async () => {
|
||||
const { service, aiService, reindexProgress } = makeService();
|
||||
// Embeddings not configured: reindexWorkspace returns early WITHOUT starting
|
||||
// a fresh record, but the finally must still clear the enqueue-time seed.
|
||||
aiService.getEmbeddingModel = jest
|
||||
.fn()
|
||||
.mockRejectedValue(new AiEmbeddingNotConfiguredException());
|
||||
|
||||
await expect(
|
||||
service.reindexWorkspace(WORKSPACE_ID),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
expect(reindexProgress.start).not.toHaveBeenCalled();
|
||||
expect(reindexProgress.clear).toHaveBeenCalledTimes(1);
|
||||
expect(reindexProgress.clear).toHaveBeenCalledWith(WORKSPACE_ID);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -9,6 +9,7 @@ import { KyselyDB } from '@docmost/db/types/kysely.types';
|
||||
import { InjectKysely } from 'nestjs-kysely';
|
||||
import { executeTx } from '@docmost/db/utils';
|
||||
import { AiService } from '../../../integrations/ai/ai.service';
|
||||
import { EmbeddingReindexProgressService } from '../../../integrations/ai/embedding-reindex-progress.service';
|
||||
import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-embedding-not-configured.exception';
|
||||
import {
|
||||
describeProviderError,
|
||||
@@ -48,6 +49,7 @@ export class EmbeddingIndexerService {
|
||||
private readonly pageRepo: PageRepo,
|
||||
private readonly pageEmbeddingRepo: PageEmbeddingRepo,
|
||||
private readonly aiService: AiService,
|
||||
private readonly reindexProgress: EmbeddingReindexProgressService,
|
||||
@InjectKysely() private readonly db: KyselyDB,
|
||||
) {}
|
||||
|
||||
@@ -183,9 +185,17 @@ export class EmbeddingIndexerService {
|
||||
}
|
||||
|
||||
/**
|
||||
* (Re)build embeddings for EVERY non-deleted page in a workspace. Used by the
|
||||
* bulk reindex (WORKSPACE_CREATE_EMBEDDINGS, fired when AI Search is enabled
|
||||
* and by the manual "Reindex now" action).
|
||||
* (Re)build embeddings for the EMBEDDABLE page set of a workspace — the same
|
||||
* set countEmbeddablePages counts (via getEmbeddablePageIds): non-deleted pages
|
||||
* that have non-empty textContent OR already have a stored embedding row, NOT
|
||||
* every non-deleted page. Iterating this set keeps the live `total` equal to
|
||||
* the steady-state denominator, so the progress counter climbs 0 -> total and
|
||||
* matches the before/after DB coverage exactly. Text-less pages are correctly
|
||||
* skipped (reindexPage no-ops on them); a page that lost its text but still has
|
||||
* stale embeddings stays in the set (the EXISTS clause) so it is visited and
|
||||
* its stale rows are cleared. Used by the bulk reindex
|
||||
* (WORKSPACE_CREATE_EMBEDDINGS, fired when AI Search is enabled and by the
|
||||
* manual "Reindex now" action).
|
||||
*
|
||||
* Resolves the embeddings model once up front: if the workspace has no
|
||||
* embeddings provider configured, the whole batch is skipped (otherwise each
|
||||
@@ -194,69 +204,96 @@ export class EmbeddingIndexerService {
|
||||
* the batch.
|
||||
*/
|
||||
async reindexWorkspace(workspaceId: string): Promise<void> {
|
||||
// The whole run is wrapped so the per-workspace progress record is ALWAYS
|
||||
// cleared in the finally — on success, on a fatal-provider abort, on an
|
||||
// unconfigured early-return, or on any unexpected throw — so a failed run
|
||||
// never leaves a stuck "reindexing" state (the status then falls back to the
|
||||
// steady-state DB coverage count). A placeholder record may already exist
|
||||
// (seeded at enqueue time); the finally cleans that too.
|
||||
try {
|
||||
await this.aiService.getEmbeddingModel(workspaceId);
|
||||
} catch (err) {
|
||||
if (err instanceof AiEmbeddingNotConfiguredException) {
|
||||
this.logger.log(
|
||||
`reindexWorkspace: embeddings not configured for workspace ${workspaceId}, skipping`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
|
||||
const pageIds = await this.pageRepo.getIdsByWorkspace(workspaceId);
|
||||
const total = pageIds.length;
|
||||
const startedAt = Date.now();
|
||||
this.logger.log(
|
||||
`reindexWorkspace: starting reindex of ${total} page(s) for workspace ${workspaceId}`,
|
||||
);
|
||||
|
||||
let failed = 0;
|
||||
for (let i = 0; i < total; i++) {
|
||||
const pageId = pageIds[i];
|
||||
const position = i + 1;
|
||||
// Log BEFORE the await: if the embedding call hangs, this is the last line
|
||||
// in the log and it names the exact page that is stuck.
|
||||
this.logger.log(
|
||||
`reindexWorkspace: [${position}/${total}] indexing page ${pageId} (workspace ${workspaceId})`,
|
||||
);
|
||||
const pageStartedAt = Date.now();
|
||||
try {
|
||||
await this.reindexPage(pageId);
|
||||
const elapsed = Date.now() - pageStartedAt;
|
||||
if (elapsed >= SLOW_PAGE_MS) {
|
||||
this.logger.warn(
|
||||
`reindexWorkspace: [${position}/${total}] page ${pageId} took ${elapsed}ms`,
|
||||
);
|
||||
}
|
||||
await this.aiService.getEmbeddingModel(workspaceId);
|
||||
} catch (err) {
|
||||
// A fatal provider error (invalid/missing key, no credits) recurs
|
||||
// identically on EVERY remaining page. Abort the whole batch instead of
|
||||
// issuing hundreds of doomed requests against the provider.
|
||||
if (isFatalProviderError(err)) {
|
||||
this.logger.error(
|
||||
`reindexWorkspace: aborting at [${position}/${total}] for workspace ` +
|
||||
`${workspaceId} — fatal provider error, remaining pages would fail ` +
|
||||
`identically: ${describeProviderError(err)}`,
|
||||
if (err instanceof AiEmbeddingNotConfiguredException) {
|
||||
this.logger.log(
|
||||
`reindexWorkspace: embeddings not configured for workspace ${workspaceId}, skipping`,
|
||||
);
|
||||
throw err;
|
||||
return;
|
||||
}
|
||||
// Per-page isolation: one non-fatal failure (incl. an embedding timeout)
|
||||
// must not abort the whole batch.
|
||||
failed++;
|
||||
this.logger.error(
|
||||
`reindexWorkspace: [${position}/${total}] failed to reindex page ${pageId} ` +
|
||||
`after ${Date.now() - pageStartedAt}ms: ${describeProviderError(err)}`,
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
`reindexWorkspace: done for workspace ${workspaceId}: ` +
|
||||
`${total - failed}/${total} indexed, ${failed} failed in ${Date.now() - startedAt}ms`,
|
||||
);
|
||||
// Iterate the EMBEDDABLE set (same predicate as countEmbeddablePages), NOT
|
||||
// every non-deleted page: this makes `total` here equal the steady-state
|
||||
// denominator, so the live counter climbs 0 -> total and matches the
|
||||
// before/after DB count exactly (no 478 -> 500 -> 478 denominator jump).
|
||||
// Text-less pages are correctly skipped — reindexPage no-ops on them, and
|
||||
// a page that lost its text but still has stale embeddings IS in this set
|
||||
// (the EXISTS clause) so it is still visited and its stale rows cleared.
|
||||
const pageIds = await this.pageRepo.getEmbeddablePageIds(workspaceId);
|
||||
const total = pageIds.length;
|
||||
const startedAt = Date.now();
|
||||
// Publish the live run progress over this same set (done reset to 0). The
|
||||
// counter increments once per iterated page and reaches exactly `total`,
|
||||
// which equals countEmbeddablePages — the steady-state denominator.
|
||||
await this.reindexProgress.start(workspaceId, total);
|
||||
this.logger.log(
|
||||
`reindexWorkspace: starting reindex of ${total} page(s) for workspace ${workspaceId}`,
|
||||
);
|
||||
|
||||
let failed = 0;
|
||||
for (let i = 0; i < total; i++) {
|
||||
const pageId = pageIds[i];
|
||||
const position = i + 1;
|
||||
// Log BEFORE the await: if the embedding call hangs, this is the last line
|
||||
// in the log and it names the exact page that is stuck.
|
||||
this.logger.log(
|
||||
`reindexWorkspace: [${position}/${total}] indexing page ${pageId} (workspace ${workspaceId})`,
|
||||
);
|
||||
const pageStartedAt = Date.now();
|
||||
try {
|
||||
await this.reindexPage(pageId);
|
||||
// Count this page as processed (matches the [position/total] log).
|
||||
await this.reindexProgress.increment(workspaceId);
|
||||
const elapsed = Date.now() - pageStartedAt;
|
||||
if (elapsed >= SLOW_PAGE_MS) {
|
||||
this.logger.warn(
|
||||
`reindexWorkspace: [${position}/${total}] page ${pageId} took ${elapsed}ms`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
// A fatal provider error (invalid/missing key, no credits) recurs
|
||||
// identically on EVERY remaining page. Abort the whole batch instead of
|
||||
// issuing hundreds of doomed requests against the provider. Do NOT count
|
||||
// it as processed — the run aborts here (the finally clears progress).
|
||||
if (isFatalProviderError(err)) {
|
||||
this.logger.error(
|
||||
`reindexWorkspace: aborting at [${position}/${total}] for workspace ` +
|
||||
`${workspaceId} — fatal provider error, remaining pages would fail ` +
|
||||
`identically: ${describeProviderError(err)}`,
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
// Per-page isolation: one non-fatal failure (incl. an embedding timeout)
|
||||
// must not abort the whole batch. A handled failure still advances the
|
||||
// counter (matches the [position/total] log, so done reaches total).
|
||||
failed++;
|
||||
await this.reindexProgress.increment(workspaceId);
|
||||
this.logger.error(
|
||||
`reindexWorkspace: [${position}/${total}] failed to reindex page ${pageId} ` +
|
||||
`after ${Date.now() - pageStartedAt}ms: ${describeProviderError(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
`reindexWorkspace: done for workspace ${workspaceId}: ` +
|
||||
`${total - failed}/${total} indexed, ${failed} failed in ${Date.now() - startedAt}ms`,
|
||||
);
|
||||
} finally {
|
||||
// Always remove the progress record so the status reverts to the DB count.
|
||||
await this.reindexProgress.clear(workspaceId);
|
||||
}
|
||||
}
|
||||
|
||||
/** Purge ALL embeddings for a workspace (WORKSPACE_DELETE_EMBEDDINGS). */
|
||||
|
||||
@@ -12,6 +12,7 @@ import { executeWithCursorPagination } from '@docmost/db/pagination/cursor-pagin
|
||||
import { validate as isValidUUID } from 'uuid';
|
||||
import { ExpressionBuilder, sql } from 'kysely';
|
||||
import { DB } from '@docmost/db/types/db';
|
||||
import { DbInterface } from '@docmost/db/types/db.interface';
|
||||
import { jsonArrayFrom, jsonObjectFrom } from 'kysely/helpers/postgres';
|
||||
import { SpaceMemberRepo } from '@docmost/db/repos/space/space-member.repo';
|
||||
import { EventEmitter2 } from '@nestjs/event-emitter';
|
||||
@@ -243,37 +244,68 @@ export class PageRepo {
|
||||
.selectFrom('pages as p')
|
||||
.where('p.workspaceId', '=', workspaceId)
|
||||
.where('p.deletedAt', 'is', null)
|
||||
.where((eb) =>
|
||||
eb.or([
|
||||
// Has extractable body text. The regex matches any non-whitespace
|
||||
// character, mirroring the indexer's `text.trim().length === 0` check
|
||||
// (raw SQL -> use the snake_case column name).
|
||||
sql<boolean>`p.text_content ~ '[^[:space:]]'`,
|
||||
// OR already has at least one (non-deleted) embedding row.
|
||||
eb.exists(
|
||||
eb
|
||||
.selectFrom('pageEmbeddings as pe')
|
||||
.select(sql`1`.as('one'))
|
||||
.whereRef('pe.pageId', '=', 'p.id')
|
||||
.where('pe.deletedAt', 'is', null),
|
||||
),
|
||||
]),
|
||||
)
|
||||
.where((eb) => this.embeddablePredicate(eb))
|
||||
.select((eb) => eb.fn.countAll().as('count'))
|
||||
.executeTakeFirst();
|
||||
return Number(row?.count ?? 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* IDs of all non-deleted pages in a workspace. Used by the RAG bulk reindex to
|
||||
* (re)build embeddings for every existing page.
|
||||
* The "embeddable content" qualifying predicate, shared verbatim by
|
||||
* countEmbeddablePages (the steady-state denominator) and getEmbeddablePageIds
|
||||
* (the set the bulk reindex iterates). Both MUST use the exact same condition
|
||||
* or the live total and steady-state total diverge — extracting it here is what
|
||||
* guarantees that, replacing the previous hand-duplicated copy. Callers supply
|
||||
* the trivial workspaceId/deletedAt filters inline; this returns only the
|
||||
* non-trivial OR clause, evaluated against the `p` alias of `pages`.
|
||||
*
|
||||
* A page qualifies if it has non-empty textContent OR already has a stored
|
||||
* (non-deleted) embedding row.
|
||||
*/
|
||||
async getIdsByWorkspace(workspaceId: string): Promise<string[]> {
|
||||
private embeddablePredicate(
|
||||
eb: ExpressionBuilder<DbInterface & { p: DbInterface['pages'] }, 'p'>,
|
||||
) {
|
||||
return eb.or([
|
||||
// Has extractable body text. The regex matches any non-whitespace
|
||||
// character, mirroring the indexer's `text.trim().length === 0` check
|
||||
// (raw SQL -> use the snake_case column name).
|
||||
sql<boolean>`p.text_content ~ '[^[:space:]]'`,
|
||||
// OR already has at least one (non-deleted) embedding row.
|
||||
eb.exists(
|
||||
eb
|
||||
.selectFrom('pageEmbeddings as pe')
|
||||
.select(sql`1`.as('one'))
|
||||
.whereRef('pe.pageId', '=', 'p.id')
|
||||
.where('pe.deletedAt', 'is', null),
|
||||
),
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* IDs of the EMBEDDABLE page set for a workspace — the exact same set that
|
||||
* `countEmbeddablePages` counts (a page qualifies if it has non-empty
|
||||
* textContent OR already has a stored embedding row). The bulk reindex
|
||||
* iterates THIS set so the live "done" counter reaches exactly
|
||||
* `countEmbeddablePages` (the steady-state denominator), instead of iterating
|
||||
* every non-deleted page (which would push the denominator above the
|
||||
* steady-state value mid-run).
|
||||
*
|
||||
* IMPORTANT: the qualifying WHERE is shared with `countEmbeddablePages` via the
|
||||
* private `embeddablePredicate` helper, so the two can no longer drift — if the
|
||||
* embeddable definition changes, change it once there and both stay in lockstep
|
||||
* (else the live total and steady-state total diverge again). Dropping
|
||||
* text-less pages is correct: `reindexPage` no-ops on
|
||||
* a page with no extractable content anyway, and a page that lost its text but
|
||||
* still has stale embeddings IS in this set (the EXISTS clause), so it is still
|
||||
* visited and its stale rows are cleared.
|
||||
*/
|
||||
async getEmbeddablePageIds(workspaceId: string): Promise<string[]> {
|
||||
const rows = await this.db
|
||||
.selectFrom('pages')
|
||||
.select('id')
|
||||
.where('workspaceId', '=', workspaceId)
|
||||
.where('deletedAt', 'is', null)
|
||||
.selectFrom('pages as p')
|
||||
.select('p.id')
|
||||
.where('p.workspaceId', '=', workspaceId)
|
||||
.where('p.deletedAt', 'is', null)
|
||||
.where((eb) => this.embeddablePredicate(eb))
|
||||
.execute();
|
||||
return rows.map((r) => r.id);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,12 @@
|
||||
import { parsePositiveInt } from './ai-settings.service';
|
||||
import { AiSettingsService, parsePositiveInt } from './ai-settings.service';
|
||||
import { WorkspaceRepo } from '@docmost/db/repos/workspace/workspace.repo';
|
||||
import { AiAgentRoleRepo } from '@docmost/db/repos/ai-agent-roles/ai-agent-roles.repo';
|
||||
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
|
||||
import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo';
|
||||
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||
import { SecretBoxService } from '../crypto/secret-box';
|
||||
import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service';
|
||||
import type { Queue } from 'bullmq';
|
||||
|
||||
/**
|
||||
* Round-trip coercion for numeric `::text` provider settings (e.g.
|
||||
@@ -41,3 +49,180 @@ describe('parsePositiveInt', () => {
|
||||
expect(parsePositiveInt(42)).toBe(42);
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* getMasked must surface the LIVE reindex run progress while a reindex is active
|
||||
* (so the "Indexed X of Y" counter can climb 0 -> total), and fall back to the
|
||||
* steady-state DB coverage count (countIndexedPages / countEmbeddablePages) when
|
||||
* no reindex is running. This is the server side of the fix for the counter that
|
||||
* otherwise stays stuck at "478 of 478" the whole reindex.
|
||||
*/
|
||||
describe('AiSettingsService.getMasked reindex progress', () => {
|
||||
const WORKSPACE_ID = 'ws-1';
|
||||
|
||||
function makeService() {
|
||||
// No driver configured -> the credentials lookup is skipped, keeping the
|
||||
// setup minimal; we only care about the indexed/total numbers here.
|
||||
const workspaceRepo = {
|
||||
findById: jest.fn().mockResolvedValue({ settings: {} }),
|
||||
};
|
||||
const aiAgentRoleRepo = {};
|
||||
const aiProviderCredentialsRepo = { find: jest.fn() };
|
||||
const pageEmbeddingRepo = {
|
||||
countIndexedPages: jest.fn().mockResolvedValue(478),
|
||||
};
|
||||
const pageRepo = {
|
||||
countEmbeddablePages: jest.fn().mockResolvedValue(478),
|
||||
};
|
||||
const secretBox = {};
|
||||
const reindexProgress = {
|
||||
get: jest.fn().mockResolvedValue(null),
|
||||
};
|
||||
const aiQueue = {};
|
||||
|
||||
const service = new AiSettingsService(
|
||||
workspaceRepo as unknown as WorkspaceRepo,
|
||||
aiAgentRoleRepo as unknown as AiAgentRoleRepo,
|
||||
aiProviderCredentialsRepo as unknown as AiProviderCredentialsRepo,
|
||||
pageEmbeddingRepo as unknown as PageEmbeddingRepo,
|
||||
pageRepo as unknown as PageRepo,
|
||||
secretBox as unknown as SecretBoxService,
|
||||
reindexProgress as unknown as EmbeddingReindexProgressService,
|
||||
aiQueue as unknown as Queue,
|
||||
);
|
||||
return { service, reindexProgress, pageEmbeddingRepo };
|
||||
}
|
||||
|
||||
it('reports the live run numbers when a reindex progress record is active', async () => {
|
||||
const { service, reindexProgress } = makeService();
|
||||
// Use a progress.total (500) DISTINCT from the DB count (478) so the test
|
||||
// actually pins the progress.total branch rather than coincidentally
|
||||
// matching the DB fallback. With fix #1 the two sources agree in practice,
|
||||
// but getMasked must still return progress.total when a record is active.
|
||||
reindexProgress.get.mockResolvedValue({
|
||||
total: 500,
|
||||
done: 120,
|
||||
startedAt: Date.now(),
|
||||
});
|
||||
|
||||
const masked = await service.getMasked(WORKSPACE_ID);
|
||||
|
||||
expect(masked.indexedPages).toBe(120); // progress.done, not DB 478
|
||||
expect(masked.totalPages).toBe(500); // progress.total, not DB 478
|
||||
expect(masked.reindexing).toBe(true);
|
||||
});
|
||||
|
||||
it('falls back to countIndexedPages when no reindex is active', async () => {
|
||||
const { service, reindexProgress } = makeService();
|
||||
reindexProgress.get.mockResolvedValue(null);
|
||||
|
||||
const masked = await service.getMasked(WORKSPACE_ID);
|
||||
|
||||
expect(masked.indexedPages).toBe(478);
|
||||
expect(masked.totalPages).toBe(478);
|
||||
expect(masked.reindexing).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* reindex() must seed a live progress record (done=0) BEFORE enqueueing so the
|
||||
* first status poll shows 0 — but ONLY when no run is already active, since
|
||||
* aiQueue.add() de-duplicates a running reindex and a re-seed would reset the
|
||||
* visible counter to 0 while the live worker keeps incrementing from its real
|
||||
* position.
|
||||
*/
|
||||
describe('AiSettingsService.reindex progress seed', () => {
|
||||
const WORKSPACE_ID = 'ws-1';
|
||||
|
||||
function makeService() {
|
||||
const order: string[] = [];
|
||||
const aiQueue = {
|
||||
remove: jest.fn().mockResolvedValue(undefined),
|
||||
add: jest.fn().mockImplementation(async () => {
|
||||
order.push('add');
|
||||
}),
|
||||
};
|
||||
const pageRepo = {
|
||||
countEmbeddablePages: jest.fn().mockResolvedValue(478),
|
||||
};
|
||||
const reindexProgress = {
|
||||
// Default: no active run -> seed should happen.
|
||||
get: jest.fn().mockResolvedValue(null),
|
||||
start: jest.fn().mockImplementation(async () => {
|
||||
order.push('start');
|
||||
}),
|
||||
clear: jest.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
|
||||
const service = new AiSettingsService(
|
||||
{} as unknown as WorkspaceRepo,
|
||||
{} as unknown as AiAgentRoleRepo,
|
||||
{} as unknown as AiProviderCredentialsRepo,
|
||||
{} as unknown as PageEmbeddingRepo,
|
||||
pageRepo as unknown as PageRepo,
|
||||
{} as unknown as SecretBoxService,
|
||||
reindexProgress as unknown as EmbeddingReindexProgressService,
|
||||
aiQueue as unknown as Queue,
|
||||
);
|
||||
return { service, aiQueue, pageRepo, reindexProgress, order };
|
||||
}
|
||||
|
||||
it('seeds progress (workspace, count) BEFORE enqueue when no run is active', async () => {
|
||||
const { service, aiQueue, reindexProgress, order } = makeService();
|
||||
|
||||
await service.reindex(WORKSPACE_ID);
|
||||
|
||||
expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 478);
|
||||
expect(aiQueue.add).toHaveBeenCalledTimes(1);
|
||||
// Seed must precede the enqueue so the first poll already reports done=0.
|
||||
expect(order).toEqual(['start', 'add']);
|
||||
});
|
||||
|
||||
it('does NOT re-seed when a run is already active (mid-run re-trigger)', async () => {
|
||||
const { service, aiQueue, reindexProgress } = makeService();
|
||||
// An active record exists -> a second click must not reset the counter.
|
||||
reindexProgress.get.mockResolvedValue({
|
||||
total: 478,
|
||||
done: 120,
|
||||
startedAt: Date.now(),
|
||||
});
|
||||
|
||||
await service.reindex(WORKSPACE_ID);
|
||||
|
||||
expect(reindexProgress.start).not.toHaveBeenCalled();
|
||||
// The enqueue still runs (and de-duplicates against the active job).
|
||||
expect(aiQueue.add).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('clears the seed it just wrote and re-throws when enqueue fails', async () => {
|
||||
const { service, aiQueue, reindexProgress } = makeService();
|
||||
// This call seeds (get() is null) but the enqueue then blows up
|
||||
// (Redis hiccup/shutdown) -> the worker never runs and never clear()s, so
|
||||
// reindex() must roll back its own seed to avoid a 1h stuck "reindexing".
|
||||
const boom = new Error('redis down');
|
||||
aiQueue.add.mockRejectedValue(boom);
|
||||
|
||||
await expect(service.reindex(WORKSPACE_ID)).rejects.toBe(boom);
|
||||
|
||||
expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 478);
|
||||
expect(reindexProgress.clear).toHaveBeenCalledWith(WORKSPACE_ID);
|
||||
});
|
||||
|
||||
it('does NOT clear a concurrent active run when enqueue fails (no seed)', async () => {
|
||||
const { service, aiQueue, reindexProgress } = makeService();
|
||||
// A run is already active, so THIS call does not seed; if the enqueue then
|
||||
// fails it must NOT wipe the live worker's record.
|
||||
reindexProgress.get.mockResolvedValue({
|
||||
total: 478,
|
||||
done: 120,
|
||||
startedAt: Date.now(),
|
||||
});
|
||||
const boom = new Error('redis down');
|
||||
aiQueue.add.mockRejectedValue(boom);
|
||||
|
||||
await expect(service.reindex(WORKSPACE_ID)).rejects.toBe(boom);
|
||||
|
||||
expect(reindexProgress.start).not.toHaveBeenCalled();
|
||||
expect(reindexProgress.clear).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -8,6 +8,7 @@ import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider
|
||||
import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo';
|
||||
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||
import { SecretBoxService } from '../crypto/secret-box';
|
||||
import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service';
|
||||
import {
|
||||
AiDriver,
|
||||
AiProviderSettings,
|
||||
@@ -74,6 +75,7 @@ export class AiSettingsService {
|
||||
private readonly pageEmbeddingRepo: PageEmbeddingRepo,
|
||||
private readonly pageRepo: PageRepo,
|
||||
private readonly secretBox: SecretBoxService,
|
||||
private readonly reindexProgress: EmbeddingReindexProgressService,
|
||||
@InjectQueue(QueueName.AI_QUEUE) private readonly aiQueue: Queue,
|
||||
) {}
|
||||
|
||||
@@ -100,21 +102,52 @@ export class AiSettingsService {
|
||||
.remove(`ai-search-disabled-${workspaceId}`)
|
||||
.catch(() => undefined);
|
||||
|
||||
// Seed a live progress record BEFORE enqueueing so the very first status
|
||||
// poll already reports done=0 (the reindex POST returns the PRE-job counts,
|
||||
// so without this seed the first poll would still show "total of total").
|
||||
// `totalPages` uses countEmbeddablePages — the SAME set the worker iterates
|
||||
// and the SAME denominator the status endpoint reports, so the live and
|
||||
// steady-state totals match.
|
||||
//
|
||||
// ONLY seed when no run is active: aiQueue.add() de-duplicates an already-
|
||||
// running reindex, so a mid-run re-trigger (second click / second admin /
|
||||
// second tab) must NOT reset the visible counter to 0 — that would
|
||||
// understate the live worker's real position for the rest of the run. The
|
||||
// worker's own start() at run begin is the single authoritative reset.
|
||||
let seeded = false;
|
||||
if ((await this.reindexProgress.get(workspaceId)) === null) {
|
||||
const totalPages = await this.pageRepo.countEmbeddablePages(workspaceId);
|
||||
await this.reindexProgress.start(workspaceId, totalPages);
|
||||
seeded = true;
|
||||
}
|
||||
|
||||
const jobId = `ai-reindex-${workspaceId}`;
|
||||
// Clear a prior non-active entry so a stale job can't block this reindex.
|
||||
// A locked/active job is left in place (remove() no-ops) and the add() below
|
||||
// de-duplicates against it, keeping the in-progress pass.
|
||||
await this.aiQueue.remove(jobId).catch(() => undefined);
|
||||
|
||||
await this.aiQueue.add(
|
||||
QueueJob.WORKSPACE_CREATE_EMBEDDINGS,
|
||||
{ workspaceId },
|
||||
{
|
||||
jobId,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
);
|
||||
try {
|
||||
await this.aiQueue.add(
|
||||
QueueJob.WORKSPACE_CREATE_EMBEDDINGS,
|
||||
{ workspaceId },
|
||||
{
|
||||
jobId,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
);
|
||||
} catch (err) {
|
||||
// If the enqueue fails (Redis hiccup/shutdown) the worker never runs, so
|
||||
// its finally->clear() never fires. Roll back the seed WE just wrote so
|
||||
// the status endpoint doesn't report a stuck "reindexing: 0 of N" for the
|
||||
// full TTL. Only clear when this call did the seed — never wipe a
|
||||
// concurrent active run's record (get() was non-null, seeded=false).
|
||||
if (seeded) {
|
||||
await this.reindexProgress.clear(workspaceId);
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -261,6 +294,15 @@ export class AiSettingsService {
|
||||
this.pageRepo.countEmbeddablePages(workspaceId),
|
||||
]);
|
||||
|
||||
// While a reindex run is active, report its LIVE progress (done climbs 0 ->
|
||||
// total) so the settings UI can watch it advance. Without this the counter
|
||||
// never drops: the per-page reindex hard-replaces rows in its own small
|
||||
// transaction, so countIndexedPages stays ~= total for the whole run. With
|
||||
// no active record we fall back to the steady-state DB coverage count, which
|
||||
// preserves the existing display and the client's "done == total -> stop
|
||||
// polling" condition (the run ends -> record cleared -> DB count == total).
|
||||
const progress = await this.reindexProgress.get(workspaceId);
|
||||
|
||||
return {
|
||||
driver: provider.driver,
|
||||
chatModel: provider.chatModel,
|
||||
@@ -279,8 +321,10 @@ export class AiSettingsService {
|
||||
hasApiKey,
|
||||
hasEmbeddingApiKey,
|
||||
hasSttApiKey,
|
||||
indexedPages,
|
||||
totalPages,
|
||||
indexedPages: progress ? progress.done : indexedPages,
|
||||
totalPages: progress ? progress.total : totalPages,
|
||||
// Optional hint for the client: a reindex run is currently in progress.
|
||||
reindexing: progress != null,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import { QueueName } from '../queue/constants';
|
||||
import { AiService } from './ai.service';
|
||||
import { AiSettingsService } from './ai-settings.service';
|
||||
import { AiSettingsController } from './ai-settings.controller';
|
||||
import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service';
|
||||
|
||||
/**
|
||||
* LLM driver + provider-settings unit (§6.2/§6.4).
|
||||
@@ -19,7 +20,7 @@ import { AiSettingsController } from './ai-settings.controller';
|
||||
BullModule.registerQueue({ name: QueueName.AI_QUEUE }),
|
||||
],
|
||||
controllers: [AiSettingsController],
|
||||
providers: [AiService, AiSettingsService],
|
||||
exports: [AiService, AiSettingsService],
|
||||
providers: [AiService, AiSettingsService, EmbeddingReindexProgressService],
|
||||
exports: [AiService, AiSettingsService, EmbeddingReindexProgressService],
|
||||
})
|
||||
export class AiModule {}
|
||||
|
||||
@@ -146,4 +146,7 @@ export interface MaskedAiSettings {
|
||||
// RAG indexing coverage for the settings UI.
|
||||
indexedPages: number;
|
||||
totalPages: number;
|
||||
// True while a full workspace reindex is actively running (the counts above
|
||||
// then reflect the live run progress rather than the steady-state DB count).
|
||||
reindexing?: boolean;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,163 @@
|
||||
import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service';
|
||||
import type { RedisService } from '@nestjs-labs/nestjs-ioredis';
|
||||
import type { Redis } from 'ioredis';
|
||||
|
||||
/**
|
||||
* Unit tests for the Redis-backed reindex-progress store.
|
||||
*
|
||||
* The store is a thin, BEST-EFFORT wrapper: writes (start/increment) issue an
|
||||
* hset/hincrby + expire pipeline and must SWALLOW Redis errors (progress is
|
||||
* cosmetic — it must never break a reindex); reads (get) must map a valid hash
|
||||
* to a ReindexProgress and degrade to null on a malformed/missing record or a
|
||||
* Redis failure. We drive it with a hand-rolled fake ioredis (the project mocks
|
||||
* Redis with plain fakes, see public-share limiter specs).
|
||||
*/
|
||||
describe('EmbeddingReindexProgressService', () => {
|
||||
const WORKSPACE_ID = 'ws-1';
|
||||
const KEY = 'ai:reindex:progress:ws-1';
|
||||
|
||||
/**
|
||||
* Build a fake ioredis whose `multi()` returns a chainable recorder and whose
|
||||
* `hgetall`/`del` are configurable jest mocks. `execImpl` lets a test make the
|
||||
* pipeline reject (to assert error-swallowing).
|
||||
*/
|
||||
function makeRedis(opts: { execImpl?: () => Promise<unknown> } = {}) {
|
||||
const exec = jest
|
||||
.fn()
|
||||
.mockImplementation(opts.execImpl ?? (() => Promise.resolve([])));
|
||||
// mockReturnThis() returns the call's `this` (the multi object), so the
|
||||
// chain hset().expire().exec() resolves correctly.
|
||||
const multiObj = {
|
||||
hset: jest.fn().mockReturnThis(),
|
||||
hincrby: jest.fn().mockReturnThis(),
|
||||
expire: jest.fn().mockReturnThis(),
|
||||
exec,
|
||||
};
|
||||
const multi = jest.fn(() => multiObj);
|
||||
const hgetall = jest.fn().mockResolvedValue({});
|
||||
const del = jest.fn().mockResolvedValue(1);
|
||||
const redis = { multi, hgetall, del } as unknown as Redis;
|
||||
return { redis, multiObj, multi, hgetall, del, exec };
|
||||
}
|
||||
|
||||
function makeService(redis: Redis) {
|
||||
const redisService = {
|
||||
getOrThrow: () => redis,
|
||||
} as unknown as RedisService;
|
||||
return new EmbeddingReindexProgressService(redisService);
|
||||
}
|
||||
|
||||
describe('get', () => {
|
||||
it('maps a valid hash to a ReindexProgress object', async () => {
|
||||
const { redis, hgetall } = makeRedis();
|
||||
hgetall.mockResolvedValue({ total: '478', done: '120', startedAt: '1000' });
|
||||
const service = makeService(redis);
|
||||
|
||||
await expect(service.get(WORKSPACE_ID)).resolves.toEqual({
|
||||
total: 478,
|
||||
done: 120,
|
||||
startedAt: 1000,
|
||||
});
|
||||
expect(hgetall).toHaveBeenCalledWith(KEY);
|
||||
});
|
||||
|
||||
it('returns null for an empty hash (no record)', async () => {
|
||||
const { redis, hgetall } = makeRedis();
|
||||
hgetall.mockResolvedValue({});
|
||||
await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull();
|
||||
});
|
||||
|
||||
it('returns null when `total` is missing (partial record)', async () => {
|
||||
const { redis, hgetall } = makeRedis();
|
||||
hgetall.mockResolvedValue({ done: '5' });
|
||||
await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull();
|
||||
});
|
||||
|
||||
it('returns null for a non-numeric total', async () => {
|
||||
const { redis, hgetall } = makeRedis();
|
||||
hgetall.mockResolvedValue({ total: 'abc', done: '1', startedAt: '1' });
|
||||
await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull();
|
||||
});
|
||||
|
||||
it('returns null for a non-numeric done', async () => {
|
||||
const { redis, hgetall } = makeRedis();
|
||||
hgetall.mockResolvedValue({ total: '10', done: 'xyz', startedAt: '1' });
|
||||
await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull();
|
||||
});
|
||||
|
||||
it('coerces a non-finite startedAt to 0', async () => {
|
||||
const { redis, hgetall } = makeRedis();
|
||||
hgetall.mockResolvedValue({ total: '10', done: '2', startedAt: 'nope' });
|
||||
await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toEqual({
|
||||
total: 10,
|
||||
done: 2,
|
||||
startedAt: 0,
|
||||
});
|
||||
});
|
||||
|
||||
it('degrades to null when hgetall throws (degradation contract)', async () => {
|
||||
const { redis, hgetall } = makeRedis();
|
||||
hgetall.mockRejectedValue(new Error('redis down'));
|
||||
await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe('start', () => {
|
||||
it('issues hset + expire on the workspace key', async () => {
|
||||
const { redis, multiObj } = makeRedis();
|
||||
await makeService(redis).start(WORKSPACE_ID, 478);
|
||||
|
||||
expect(multiObj.hset).toHaveBeenCalledWith(
|
||||
KEY,
|
||||
expect.objectContaining({ total: '478', done: '0' }),
|
||||
);
|
||||
expect(multiObj.expire).toHaveBeenCalledWith(KEY, expect.any(Number));
|
||||
expect(multiObj.exec).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('swallows a thrown Redis error (best-effort)', async () => {
|
||||
const { redis } = makeRedis({
|
||||
execImpl: () => Promise.reject(new Error('redis down')),
|
||||
});
|
||||
await expect(
|
||||
makeService(redis).start(WORKSPACE_ID, 1),
|
||||
).resolves.toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('increment', () => {
|
||||
it('issues hincrby + expire on the workspace key', async () => {
|
||||
const { redis, multiObj } = makeRedis();
|
||||
await makeService(redis).increment(WORKSPACE_ID);
|
||||
|
||||
expect(multiObj.hincrby).toHaveBeenCalledWith(KEY, 'done', 1);
|
||||
expect(multiObj.expire).toHaveBeenCalledWith(KEY, expect.any(Number));
|
||||
expect(multiObj.exec).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('swallows a thrown Redis error (best-effort)', async () => {
|
||||
const { redis } = makeRedis({
|
||||
execImpl: () => Promise.reject(new Error('redis down')),
|
||||
});
|
||||
await expect(
|
||||
makeService(redis).increment(WORKSPACE_ID),
|
||||
).resolves.toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('clear', () => {
|
||||
it('deletes the workspace key', async () => {
|
||||
const { redis, del } = makeRedis();
|
||||
await makeService(redis).clear(WORKSPACE_ID);
|
||||
expect(del).toHaveBeenCalledWith(KEY);
|
||||
});
|
||||
|
||||
it('swallows a thrown Redis error (best-effort)', async () => {
|
||||
const { redis, del } = makeRedis();
|
||||
del.mockRejectedValue(new Error('redis down'));
|
||||
await expect(
|
||||
makeService(redis).clear(WORKSPACE_ID),
|
||||
).resolves.toBeUndefined();
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,149 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { RedisService } from '@nestjs-labs/nestjs-ioredis';
|
||||
import type { Redis } from 'ioredis';
|
||||
|
||||
/**
|
||||
* Live progress of an in-flight workspace embeddings reindex run.
|
||||
* `total` is the number of pages the run will process, `done` how many it has
|
||||
* already processed (success OR handled failure), `startedAt` the epoch-ms the
|
||||
* record was created.
|
||||
*/
|
||||
export interface ReindexProgress {
|
||||
total: number;
|
||||
done: number;
|
||||
startedAt: number;
|
||||
}
|
||||
|
||||
/** Redis key namespace for the per-workspace reindex-progress record. */
|
||||
const KEY_PREFIX = 'ai:reindex:progress:';
|
||||
|
||||
/**
|
||||
* TTL (seconds) on the progress record so a crashed/aborted worker that never
|
||||
* reaches its `clear()` finally can still self-clean instead of leaving a stuck
|
||||
* "reindexing" state. Refreshed on every increment so a long run never expires
|
||||
* mid-flight; on a crash it disappears within TTL of the last processed page.
|
||||
*
|
||||
* INTENTIONALLY tied to WRITE progress (start/increment) only — never refreshed
|
||||
* on get(). Refreshing on read would keep a dead worker's record alive forever
|
||||
* as long as a client keeps polling (a permanently stuck reindexing:true). The
|
||||
* clear() in the worker's finally handles normal completion; a dead worker's
|
||||
* record expires after TTL, and the client's own poll cap stops polling anyway.
|
||||
*/
|
||||
const TTL_SECONDS = 60 * 60; // 1h
|
||||
|
||||
/**
|
||||
* Cluster-wide store for the live progress of a workspace embeddings reindex.
|
||||
*
|
||||
* The reindex runs in a BullMQ worker (AI_QUEUE) that may be a DIFFERENT process
|
||||
* than the API handling the settings-status GET, so the progress must live in
|
||||
* the shared Redis — we reuse the same global ioredis client (RedisService from
|
||||
* @nestjs-labs/nestjs-ioredis) that backs BullMQ and the other anti-abuse
|
||||
* limiters, adding NO new Redis config.
|
||||
*
|
||||
* Everything here is best-effort and COSMETIC: progress only drives the "Indexed
|
||||
* X of Y" counter while a reindex is running. Any Redis failure degrades to the
|
||||
* existing steady-state behaviour (the status falls back to the DB coverage
|
||||
* count), so reads fail to `null` and writes are swallowed — a reindex must
|
||||
* never break because progress reporting did.
|
||||
*
|
||||
* Stored as a Redis HASH so `done` can be bumped with an atomic HINCRBY (the
|
||||
* worker is the only writer of `done`, but HINCRBY also keeps us off a
|
||||
* read-modify-write race and preserves the other fields).
|
||||
*/
|
||||
@Injectable()
|
||||
export class EmbeddingReindexProgressService {
|
||||
private readonly logger = new Logger(EmbeddingReindexProgressService.name);
|
||||
private readonly redis: Redis;
|
||||
|
||||
constructor(redisService: RedisService) {
|
||||
this.redis = redisService.getOrThrow();
|
||||
}
|
||||
|
||||
private key(workspaceId: string): string {
|
||||
return KEY_PREFIX + workspaceId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Begin (or reset) the progress record for a workspace: `total` pages, `done`
|
||||
* back to 0, `startedAt` now. Called at reindex enqueue time (placeholder
|
||||
* total, so the very first status poll already reports done=0) and again at
|
||||
* the worker start (overwriting `total` with the real page count). Resets
|
||||
* `done` to 0 so a re-trigger never inherits a stale count.
|
||||
*/
|
||||
async start(workspaceId: string, total: number): Promise<void> {
|
||||
const key = this.key(workspaceId);
|
||||
try {
|
||||
await this.redis
|
||||
.multi()
|
||||
.hset(key, {
|
||||
total: String(total),
|
||||
done: '0',
|
||||
startedAt: String(Date.now()),
|
||||
})
|
||||
.expire(key, TTL_SECONDS)
|
||||
.exec();
|
||||
} catch (err) {
|
||||
this.logger.warn(
|
||||
`reindex-progress start failed for workspace ${workspaceId}; ` +
|
||||
`progress reporting disabled for this run: ${(err as Error).message}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Bump the processed-page counter by one and refresh the TTL. Atomic and
|
||||
* best-effort: a missing key (cleared/expired) would be recreated with only
|
||||
* `done`, but `get()` treats a record without a numeric `total` as inactive,
|
||||
* so that partial state safely reads as "no active reindex".
|
||||
*/
|
||||
async increment(workspaceId: string): Promise<void> {
|
||||
const key = this.key(workspaceId);
|
||||
try {
|
||||
await this.redis.multi().hincrby(key, 'done', 1).expire(key, TTL_SECONDS).exec();
|
||||
} catch (err) {
|
||||
this.logger.warn(
|
||||
`reindex-progress increment failed for workspace ${workspaceId}: ` +
|
||||
`${(err as Error).message}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the progress record. Called in the worker's `finally` so a completed,
|
||||
* aborted, or unconfigured-early-return run never leaves a stuck record; the
|
||||
* status then falls back to the DB coverage count.
|
||||
*/
|
||||
async clear(workspaceId: string): Promise<void> {
|
||||
try {
|
||||
await this.redis.del(this.key(workspaceId));
|
||||
} catch (err) {
|
||||
this.logger.warn(
|
||||
`reindex-progress clear failed for workspace ${workspaceId} ` +
|
||||
`(self-cleans via TTL): ${(err as Error).message}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the live progress, or `null` when no reindex is active (no record, an
|
||||
* expired record, or a partial record without a numeric `total`). On a Redis
|
||||
* error returns `null` so the status endpoint degrades to its DB count.
|
||||
*/
|
||||
async get(workspaceId: string): Promise<ReindexProgress | null> {
|
||||
try {
|
||||
const data = await this.redis.hgetall(this.key(workspaceId));
|
||||
if (!data || data.total === undefined) return null;
|
||||
const total = Number(data.total);
|
||||
const done = Number(data.done);
|
||||
const startedAt = Number(data.startedAt);
|
||||
if (!Number.isFinite(total) || !Number.isFinite(done)) return null;
|
||||
return { total, done, startedAt: Number.isFinite(startedAt) ? startedAt : 0 };
|
||||
} catch (err) {
|
||||
this.logger.warn(
|
||||
`reindex-progress read failed for workspace ${workspaceId}; ` +
|
||||
`falling back to DB count: ${(err as Error).message}`,
|
||||
);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,124 @@
|
||||
import { Kysely } from 'kysely';
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||
import { SpaceMemberRepo } from '@docmost/db/repos/space/space-member.repo';
|
||||
import { EventEmitter2 } from '@nestjs/event-emitter';
|
||||
import { getTestDb, destroyTestDb, createWorkspace, createSpace } from './db';
|
||||
|
||||
/**
|
||||
* `PageRepo.getEmbeddablePageIds` MUST stay in lockstep with
|
||||
* `PageRepo.countEmbeddablePages` (page.repo.ts) — the bulk reindex iterates the
|
||||
* ID set while the status endpoint reports the count as the live denominator, so
|
||||
* if the two predicates ever diverge the "done X of Y" counter ends on the wrong
|
||||
* total. Both share the SAME WHERE: a page qualifies iff it is non-deleted AND
|
||||
* (text_content has a non-whitespace char OR it has a non-deleted embedding row).
|
||||
*
|
||||
* This is a DB-level invariant: the predicate lives in raw SQL (`text_content ~
|
||||
* '[^[:space:]]'`) and an EXISTS subquery, so a unit test with mocked Kysely
|
||||
* cannot observe it. We seed every boundary case against real Postgres and
|
||||
* assert the returned ID set EQUALS the count (and is exactly the expected set).
|
||||
* A future edit that touches one predicate but not the other turns this red.
|
||||
*/
|
||||
describe('PageRepo embeddable-page set: getEmbeddablePageIds <-> countEmbeddablePages [integration]', () => {
|
||||
let db: Kysely<any>;
|
||||
let repo: PageRepo;
|
||||
let workspaceId: string;
|
||||
let spaceId: string;
|
||||
|
||||
beforeAll(async () => {
|
||||
db = getTestDb();
|
||||
// Only the Kysely-backed query methods under test are exercised, so the
|
||||
// SpaceMemberRepo / EventEmitter2 deps are never touched — stub them.
|
||||
repo = new PageRepo(
|
||||
db as any,
|
||||
{} as unknown as SpaceMemberRepo,
|
||||
{} as unknown as EventEmitter2,
|
||||
);
|
||||
workspaceId = (await createWorkspace(db)).id;
|
||||
spaceId = (await createSpace(db, workspaceId)).id;
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await destroyTestDb();
|
||||
});
|
||||
|
||||
// Insert a page with explicit text_content / deleted_at (createPage in db.ts
|
||||
// sets neither), returning its id so the test can assert membership.
|
||||
async function insertPage(args: {
|
||||
textContent: string | null;
|
||||
deletedAt?: Date | null;
|
||||
}): Promise<string> {
|
||||
const id = randomUUID();
|
||||
await db
|
||||
.insertInto('pages')
|
||||
.values({
|
||||
id,
|
||||
slugId: `slug-${id.slice(0, 8)}`,
|
||||
title: `page-${id.slice(0, 8)}`,
|
||||
spaceId,
|
||||
workspaceId,
|
||||
textContent: args.textContent,
|
||||
deletedAt: args.deletedAt ?? null,
|
||||
})
|
||||
.execute();
|
||||
return id;
|
||||
}
|
||||
|
||||
// Insert one embedding chunk row for a page (NOT NULL columns + deleted_at).
|
||||
async function insertEmbedding(
|
||||
pageId: string,
|
||||
opts: { deletedAt?: Date | null } = {},
|
||||
): Promise<void> {
|
||||
await db
|
||||
.insertInto('pageEmbeddings')
|
||||
.values({
|
||||
id: randomUUID(),
|
||||
workspaceId,
|
||||
pageId,
|
||||
spaceId,
|
||||
chunkIndex: 0,
|
||||
chunkStart: 0,
|
||||
chunkLength: 1,
|
||||
content: 'x',
|
||||
modelName: 'test-model',
|
||||
modelDimensions: 1,
|
||||
deletedAt: opts.deletedAt ?? null,
|
||||
})
|
||||
.execute();
|
||||
}
|
||||
|
||||
it('returns exactly the embeddable set and its size equals countEmbeddablePages', async () => {
|
||||
// IN the set --------------------------------------------------------------
|
||||
// (a) non-deleted page with real body text.
|
||||
const withText = await insertPage({ textContent: 'hello world' });
|
||||
// (b) non-deleted page with NO text but a live embedding row (EXISTS clause:
|
||||
// a page that lost its text yet still has stale vectors must be visited
|
||||
// so the reindex can clear them).
|
||||
const noTextLiveEmbedding = await insertPage({ textContent: null });
|
||||
await insertEmbedding(noTextLiveEmbedding);
|
||||
|
||||
// OUT of the set ----------------------------------------------------------
|
||||
// (c) non-deleted, text_content NULL, no embeddings.
|
||||
await insertPage({ textContent: null });
|
||||
// (d) non-deleted, whitespace-only text (regex requires a non-space char).
|
||||
await insertPage({ textContent: ' \n\t ' });
|
||||
// (e) deleted page WITH body text — excluded by the non-deleted predicate.
|
||||
await insertPage({
|
||||
textContent: 'deleted but had text',
|
||||
deletedAt: new Date(),
|
||||
});
|
||||
// (f) non-deleted, no text, with ONLY a DELETED embedding row — the EXISTS
|
||||
// subquery filters pe.deleted_at IS NULL, so this stays out.
|
||||
const onlyDeletedEmbedding = await insertPage({ textContent: null });
|
||||
await insertEmbedding(onlyDeletedEmbedding, { deletedAt: new Date() });
|
||||
|
||||
const ids = await repo.getEmbeddablePageIds(workspaceId);
|
||||
const count = await repo.countEmbeddablePages(workspaceId);
|
||||
|
||||
// The two queries agree on the size (the load-bearing lockstep invariant)...
|
||||
expect(ids.length).toBe(count);
|
||||
// ...and the set is exactly the two qualifying pages, nothing else.
|
||||
expect(new Set(ids)).toEqual(new Set([withText, noTextLiveEmbedding]));
|
||||
expect(count).toBe(2);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user