Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0eae69086 | ||
|
|
91f24fc062 | ||
|
|
bdc033e689 | ||
|
|
85b38d6946 | ||
|
|
bf09eec4e1 | ||
|
|
95d07d8d6f | ||
|
|
630939e8f3 | ||
|
|
72bb03918d |
@@ -3,6 +3,9 @@ import {
|
|||||||
resolveCardStatus,
|
resolveCardStatus,
|
||||||
isEndpointConfigured,
|
isEndpointConfigured,
|
||||||
resolveKeyField,
|
resolveKeyField,
|
||||||
|
nextReindexPollInterval,
|
||||||
|
isReindexComplete,
|
||||||
|
isReindexButtonLoading,
|
||||||
} from './ai-provider-settings';
|
} from './ai-provider-settings';
|
||||||
|
|
||||||
describe('resolveCardStatus', () => {
|
describe('resolveCardStatus', () => {
|
||||||
@@ -71,3 +74,152 @@ describe('resolveKeyField (write-only key payload)', () => {
|
|||||||
expect(resolveKeyField('', false)).toEqual({ set: false });
|
expect(resolveKeyField('', false)).toEqual({ set: false });
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('nextReindexPollInterval', () => {
|
||||||
|
const INTERVAL = 5000;
|
||||||
|
const base = { now: 1_000, intervalMs: INTERVAL };
|
||||||
|
|
||||||
|
it('does not poll when no reindex deadline is set', () => {
|
||||||
|
expect(
|
||||||
|
nextReindexPollInterval({
|
||||||
|
...base,
|
||||||
|
deadline: null,
|
||||||
|
status: { reindexing: true, indexedPages: 0, totalPages: 478 },
|
||||||
|
}),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('keeps polling while the server reports an active run', () => {
|
||||||
|
expect(
|
||||||
|
nextReindexPollInterval({
|
||||||
|
...base,
|
||||||
|
deadline: 10_000,
|
||||||
|
status: { reindexing: true, indexedPages: 120, totalPages: 478 },
|
||||||
|
}),
|
||||||
|
).toBe(INTERVAL);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('keeps polling during an active run even if counts momentarily look full', () => {
|
||||||
|
// The run clears its progress record only at the very end, so a transient
|
||||||
|
// indexed==total while reindexing is still true must NOT stop polling.
|
||||||
|
expect(
|
||||||
|
nextReindexPollInterval({
|
||||||
|
...base,
|
||||||
|
deadline: 10_000,
|
||||||
|
status: { reindexing: true, indexedPages: 478, totalPages: 478 },
|
||||||
|
}),
|
||||||
|
).toBe(INTERVAL);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('stops once the run is finished AND fully indexed', () => {
|
||||||
|
expect(
|
||||||
|
nextReindexPollInterval({
|
||||||
|
...base,
|
||||||
|
deadline: 10_000,
|
||||||
|
status: { reindexing: false, indexedPages: 478, totalPages: 478 },
|
||||||
|
}),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('keeps polling within the deadline when not yet done and no active flag', () => {
|
||||||
|
// First poll right after enqueue, before the worker publishes progress.
|
||||||
|
expect(
|
||||||
|
nextReindexPollInterval({
|
||||||
|
...base,
|
||||||
|
deadline: 10_000,
|
||||||
|
status: { reindexing: false, indexedPages: 0, totalPages: 478 },
|
||||||
|
}),
|
||||||
|
).toBe(INTERVAL);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('cap always wins: stops once past the deadline even if still reindexing', () => {
|
||||||
|
expect(
|
||||||
|
nextReindexPollInterval({
|
||||||
|
deadline: 1_000,
|
||||||
|
now: 2_000, // past the deadline
|
||||||
|
intervalMs: INTERVAL,
|
||||||
|
status: { reindexing: true, indexedPages: 200, totalPages: 478 },
|
||||||
|
}),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('stops on an empty workspace (0 of 0) once the run is finished', () => {
|
||||||
|
expect(
|
||||||
|
nextReindexPollInterval({
|
||||||
|
...base,
|
||||||
|
deadline: 10_000,
|
||||||
|
status: { reindexing: false, indexedPages: 0, totalPages: 0 },
|
||||||
|
}),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('isReindexComplete', () => {
|
||||||
|
it('false when no status yet', () => {
|
||||||
|
expect(isReindexComplete(undefined)).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('false while a run is still active (even at indexed==total)', () => {
|
||||||
|
expect(
|
||||||
|
isReindexComplete({ reindexing: true, indexedPages: 478, totalPages: 478 }),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('false when finished but not yet fully indexed', () => {
|
||||||
|
expect(
|
||||||
|
isReindexComplete({ reindexing: false, indexedPages: 120, totalPages: 478 }),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('true once finished and fully indexed', () => {
|
||||||
|
expect(
|
||||||
|
isReindexComplete({ reindexing: false, indexedPages: 478, totalPages: 478 }),
|
||||||
|
).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('isReindexButtonLoading', () => {
|
||||||
|
it('loads while the POST mutation is pending', () => {
|
||||||
|
expect(
|
||||||
|
isReindexButtonLoading({
|
||||||
|
mutationPending: true,
|
||||||
|
deadline: null,
|
||||||
|
status: false,
|
||||||
|
}),
|
||||||
|
).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does NOT load post-cap: deadline nulled but reindexing left stale-true', () => {
|
||||||
|
// The key case: after the poll cap fires `reindexDeadline` is null while
|
||||||
|
// `settings.reindexing` can be a stale `true` from the last poll. Gating on
|
||||||
|
// the deadline keeps the spinner from sticking forever so the admin can
|
||||||
|
// restart.
|
||||||
|
expect(
|
||||||
|
isReindexButtonLoading({
|
||||||
|
mutationPending: false,
|
||||||
|
deadline: null,
|
||||||
|
status: true,
|
||||||
|
}),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('loads during an active run within the poll window', () => {
|
||||||
|
expect(
|
||||||
|
isReindexButtonLoading({
|
||||||
|
mutationPending: false,
|
||||||
|
deadline: 10_000,
|
||||||
|
status: true,
|
||||||
|
}),
|
||||||
|
).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does not load once the run finished while still polling', () => {
|
||||||
|
expect(
|
||||||
|
isReindexButtonLoading({
|
||||||
|
mutationPending: false,
|
||||||
|
deadline: 10_000,
|
||||||
|
status: false,
|
||||||
|
}),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ import {
|
|||||||
} from "@/features/workspace/queries/ai-settings-query.ts";
|
} from "@/features/workspace/queries/ai-settings-query.ts";
|
||||||
import {
|
import {
|
||||||
AiTestCapability,
|
AiTestCapability,
|
||||||
|
IAiSettings,
|
||||||
IAiSettingsUpdate,
|
IAiSettingsUpdate,
|
||||||
SttApiStyle,
|
SttApiStyle,
|
||||||
ChatApiStyle,
|
ChatApiStyle,
|
||||||
@@ -169,6 +170,73 @@ export function resolveKeyField(
|
|||||||
return { set: false };
|
return { set: false };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Subset of the status payload that drives the reindex poll decisions.
|
||||||
|
type ReindexStatus = Pick<
|
||||||
|
IAiSettings,
|
||||||
|
"reindexing" | "indexedPages" | "totalPages"
|
||||||
|
>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decide the TanStack Query `refetchInterval` while a reindex may be running.
|
||||||
|
* Returns the poll interval (ms) to keep polling, or `false` to stop.
|
||||||
|
*
|
||||||
|
* Polls while the server reports an ACTIVE run (`reindexing === true`) OR we are
|
||||||
|
* still within the deadline window and not yet fully indexed. Stops once the run
|
||||||
|
* has finished AND everything is indexed (server cleared its progress record and
|
||||||
|
* fell back to the DB coverage count), or the deadline cap is hit — the cap
|
||||||
|
* always wins so a stuck/never-clearing progress record can't poll forever.
|
||||||
|
*/
|
||||||
|
export function nextReindexPollInterval(args: {
|
||||||
|
deadline: number | null;
|
||||||
|
now: number;
|
||||||
|
intervalMs: number;
|
||||||
|
status?: ReindexStatus;
|
||||||
|
}): number | false {
|
||||||
|
const { deadline, now, intervalMs, status } = args;
|
||||||
|
if (deadline === null) return false;
|
||||||
|
// Cap always wins.
|
||||||
|
if (now > deadline) return false;
|
||||||
|
// Active run → keep polling even if the momentary counts already look full.
|
||||||
|
if (status?.reindexing) return intervalMs;
|
||||||
|
// Finished and fully indexed (incl. an empty workspace, 0 >= 0) → stop. Reuse
|
||||||
|
// isReindexComplete so the completeness check lives in exactly one place.
|
||||||
|
if (isReindexComplete(status)) return false;
|
||||||
|
// Within the deadline and not yet done → keep polling.
|
||||||
|
return intervalMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether the reindex poll deadline should be cleared: the server reports no
|
||||||
|
* active run AND the count is complete. The single source of truth for the
|
||||||
|
* "reindex finished" check — `nextReindexPollInterval` reuses it for its stop
|
||||||
|
* condition (sans the cap, which the effect handles via time).
|
||||||
|
*/
|
||||||
|
export function isReindexComplete(status?: ReindexStatus): boolean {
|
||||||
|
return (
|
||||||
|
!!status && !status.reindexing && status.indexedPages >= status.totalPages
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether the reindex button should show its spinner (and stay disabled).
|
||||||
|
*
|
||||||
|
* Spins while the POST is in flight, and for the WHOLE background run while the
|
||||||
|
* server reports `reindexing === true`. The `deadline !== null` gate is the
|
||||||
|
* load-bearing part: once the 120s poll cap fires it nulls `reindexDeadline`
|
||||||
|
* and stops refetching, so `status` (settings?.reindexing) can be a stale
|
||||||
|
* `true` from the last poll. Without the gate the spinner would stick forever
|
||||||
|
* for a run that outlives the cap and block a restart; gating on the active
|
||||||
|
* poll window clears it so the admin can re-trigger.
|
||||||
|
*/
|
||||||
|
export function isReindexButtonLoading(args: {
|
||||||
|
mutationPending: boolean;
|
||||||
|
deadline: number | null;
|
||||||
|
status?: boolean;
|
||||||
|
}): boolean {
|
||||||
|
const { mutationPending, deadline, status } = args;
|
||||||
|
return mutationPending || (deadline !== null && status === true);
|
||||||
|
}
|
||||||
|
|
||||||
// Translate the dot's tooltip label. Kept in one place so all three endpoint
|
// Translate the dot's tooltip label. Kept in one place so all three endpoint
|
||||||
// cards share identical wording.
|
// cards share identical wording.
|
||||||
function cardStatusLabel(status: CardStatus, t: (k: string) => string): string {
|
function cardStatusLabel(status: CardStatus, t: (k: string) => string): string {
|
||||||
@@ -215,31 +283,34 @@ export default function AiProviderSettings() {
|
|||||||
// PRE-job counts immediately, so the only way the "Indexed X of Y" counter
|
// PRE-job counts immediately, so the only way the "Indexed X of Y" counter
|
||||||
// visibly climbs is to keep polling the settings query while the job runs.
|
// visibly climbs is to keep polling the settings query while the job runs.
|
||||||
// `reindexDeadline` is the timestamp until which we poll (set on reindex
|
// `reindexDeadline` is the timestamp until which we poll (set on reindex
|
||||||
// success); polling stops early once indexed === total. Bounded so a stuck
|
// success). Polling tracks the server's `reindexing` flag: it keeps going for
|
||||||
// job can never poll forever.
|
// the whole active run and stops promptly once the server reports the run is
|
||||||
const REINDEX_POLL_INTERVAL = 3000; // ms between refetches while indexing
|
// finished. Bounded by the cap so a stuck/never-clearing progress record can
|
||||||
|
// never poll forever.
|
||||||
|
const REINDEX_POLL_INTERVAL = 5000; // ms between refetches while indexing
|
||||||
const REINDEX_POLL_CAP_MS = 120000; // ~2 min hard cap
|
const REINDEX_POLL_CAP_MS = 120000; // ~2 min hard cap
|
||||||
const [reindexDeadline, setReindexDeadline] = useState<number | null>(null);
|
const [reindexDeadline, setReindexDeadline] = useState<number | null>(null);
|
||||||
|
|
||||||
// Only admins may read the (masked) AI settings; the server enforces this too.
|
// Only admins may read the (masked) AI settings; the server enforces this too.
|
||||||
const { data: settings, isLoading } = useAiSettingsQuery(isAdmin, (query) => {
|
const { data: settings, isLoading } = useAiSettingsQuery(isAdmin, (query) =>
|
||||||
if (reindexDeadline === null) return false;
|
nextReindexPollInterval({
|
||||||
// Past the cap → stop polling (cleared via the effect below too).
|
deadline: reindexDeadline,
|
||||||
if (Date.now() > reindexDeadline) return false;
|
now: Date.now(),
|
||||||
const data = query.state.data;
|
intervalMs: REINDEX_POLL_INTERVAL,
|
||||||
// Stop once everything is indexed; otherwise keep polling.
|
status: query.state.data,
|
||||||
if (data && data.indexedPages >= data.totalPages) return false;
|
}),
|
||||||
return REINDEX_POLL_INTERVAL;
|
);
|
||||||
});
|
|
||||||
|
|
||||||
// Stop polling once the work is done or the cap is reached. Also clears on
|
// Stop polling once the run is finished or the cap is reached. Also clears on
|
||||||
// unmount because the deadline state goes away with the component.
|
// unmount because the deadline state goes away with the component.
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
if (reindexDeadline === null) return;
|
if (reindexDeadline === null) return;
|
||||||
// "Done" matches the refetchInterval stop condition (indexed >= total),
|
// "Done" matches the refetchInterval stop condition: the server reports no
|
||||||
// including an empty workspace (0 >= 0), so the deadline clears promptly
|
// active run AND the count is complete (indexed >= total, incl. an empty
|
||||||
// instead of waiting out the cap.
|
// workspace 0 >= 0), so the deadline clears promptly instead of waiting out
|
||||||
if (settings && settings.indexedPages >= settings.totalPages) {
|
// the cap. While `reindexing` is still true we keep the deadline so polling
|
||||||
|
// continues for the whole run.
|
||||||
|
if (isReindexComplete(settings)) {
|
||||||
setReindexDeadline(null);
|
setReindexDeadline(null);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -1031,7 +1102,17 @@ export default function AiProviderSettings() {
|
|||||||
<Button
|
<Button
|
||||||
variant="subtle"
|
variant="subtle"
|
||||||
size="compact-sm"
|
size="compact-sm"
|
||||||
loading={reindexMutation.isPending}
|
// Spin for the WHOLE run: the POST resolves immediately, but the
|
||||||
|
// background job keeps running, so also stay loading while the
|
||||||
|
// server reports `reindexing` (this also blocks a redundant
|
||||||
|
// re-trigger mid-run; the server de-dupes regardless). The
|
||||||
|
// deadline gate (and why it matters post-cap) lives in
|
||||||
|
// `isReindexButtonLoading`, which is unit-tested.
|
||||||
|
loading={isReindexButtonLoading({
|
||||||
|
mutationPending: reindexMutation.isPending,
|
||||||
|
deadline: reindexDeadline,
|
||||||
|
status: settings?.reindexing,
|
||||||
|
})}
|
||||||
onClick={() =>
|
onClick={() =>
|
||||||
reindexMutation.mutate(undefined, {
|
reindexMutation.mutate(undefined, {
|
||||||
// Begin bounded polling so the counter climbs as the async
|
// Begin bounded polling so the counter climbs as the async
|
||||||
|
|||||||
@@ -23,8 +23,12 @@ export function useAiSettingsQuery(
|
|||||||
enabled: boolean = true,
|
enabled: boolean = true,
|
||||||
// While reindexing runs as an async background job, the counter only climbs
|
// While reindexing runs as an async background job, the counter only climbs
|
||||||
// if the client keeps refetching. The component passes a refetchInterval
|
// if the client keeps refetching. The component passes a refetchInterval
|
||||||
// function that polls until indexed === total or a bounded deadline, then
|
// function (`nextReindexPollInterval`) that keeps polling while the server
|
||||||
// returns false to stop. See AiProviderSettings.
|
// reports an active run (reindexing === true) OR we are still within the
|
||||||
|
// bounded deadline and not yet fully indexed; it returns false to stop only
|
||||||
|
// once the run has finished AND indexed >= total, or the deadline cap is hit
|
||||||
|
// (the cap always wins). Note: a transient indexed === total during an active
|
||||||
|
// run does NOT stop polling. See AiProviderSettings.
|
||||||
refetchInterval?:
|
refetchInterval?:
|
||||||
| number
|
| number
|
||||||
| false
|
| false
|
||||||
|
|||||||
@@ -48,6 +48,9 @@ export interface IAiSettings {
|
|||||||
// RAG indexing coverage (pages indexed for semantic search).
|
// RAG indexing coverage (pages indexed for semantic search).
|
||||||
indexedPages: number;
|
indexedPages: number;
|
||||||
totalPages: number;
|
totalPages: number;
|
||||||
|
// True while a full workspace reindex is actively running; the counts above
|
||||||
|
// then reflect the live run progress (done climbs 0 -> total).
|
||||||
|
reindexing?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update payload. Key semantics (same for `apiKey` and `embeddingApiKey`):
|
// Update payload. Key semantics (same for `apiKey` and `embeddingApiKey`):
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
|||||||
import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo';
|
import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo';
|
||||||
import { KyselyDB } from '@docmost/db/types/kysely.types';
|
import { KyselyDB } from '@docmost/db/types/kysely.types';
|
||||||
import { AiService } from '../../../integrations/ai/ai.service';
|
import { AiService } from '../../../integrations/ai/ai.service';
|
||||||
|
import { EmbeddingReindexProgressService } from '../../../integrations/ai/embedding-reindex-progress.service';
|
||||||
|
import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-embedding-not-configured.exception';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests for EmbeddingIndexerService.reindexWorkspace's batch control flow.
|
* Unit tests for EmbeddingIndexerService.reindexWorkspace's batch control flow.
|
||||||
@@ -12,7 +14,8 @@ import { AiService } from '../../../integrations/ai/ai.service';
|
|||||||
* reindexWorkspace actually touches:
|
* reindexWorkspace actually touches:
|
||||||
* - aiService.getEmbeddingModel -> a model string so the up-front configured
|
* - aiService.getEmbeddingModel -> a model string so the up-front configured
|
||||||
* check passes,
|
* check passes,
|
||||||
* - pageRepo.getIdsByWorkspace -> three page ids,
|
* - pageRepo.getEmbeddablePageIds -> three page ids (the embeddable set the
|
||||||
|
* reindex iterates),
|
||||||
* - service.reindexPage -> spied per test to drive the per-page outcome.
|
* - service.reindexPage -> spied per test to drive the per-page outcome.
|
||||||
*
|
*
|
||||||
* The point under test is the catch block: a FATAL provider error (auth/billing)
|
* The point under test is the catch block: a FATAL provider error (auth/billing)
|
||||||
@@ -24,21 +27,30 @@ describe('EmbeddingIndexerService.reindexWorkspace fail-fast', () => {
|
|||||||
|
|
||||||
function makeService() {
|
function makeService() {
|
||||||
const pageRepo = {
|
const pageRepo = {
|
||||||
getIdsByWorkspace: jest.fn().mockResolvedValue(['p1', 'p2', 'p3']),
|
getEmbeddablePageIds: jest.fn().mockResolvedValue(['p1', 'p2', 'p3']),
|
||||||
};
|
};
|
||||||
const pageEmbeddingRepo = {};
|
const pageEmbeddingRepo = {};
|
||||||
const aiService = {
|
const aiService = {
|
||||||
getEmbeddingModel: jest.fn().mockResolvedValue('some-model'),
|
getEmbeddingModel: jest.fn().mockResolvedValue('some-model'),
|
||||||
};
|
};
|
||||||
|
// Progress is a best-effort cosmetic store; mock its async methods so the
|
||||||
|
// batch control flow can be tested without Redis.
|
||||||
|
const reindexProgress = {
|
||||||
|
start: jest.fn().mockResolvedValue(undefined),
|
||||||
|
increment: jest.fn().mockResolvedValue(undefined),
|
||||||
|
clear: jest.fn().mockResolvedValue(undefined),
|
||||||
|
get: jest.fn().mockResolvedValue(null),
|
||||||
|
};
|
||||||
const db = {};
|
const db = {};
|
||||||
|
|
||||||
const service = new EmbeddingIndexerService(
|
const service = new EmbeddingIndexerService(
|
||||||
pageRepo as unknown as PageRepo,
|
pageRepo as unknown as PageRepo,
|
||||||
pageEmbeddingRepo as unknown as PageEmbeddingRepo,
|
pageEmbeddingRepo as unknown as PageEmbeddingRepo,
|
||||||
aiService as unknown as AiService,
|
aiService as unknown as AiService,
|
||||||
|
reindexProgress as unknown as EmbeddingReindexProgressService,
|
||||||
db as unknown as KyselyDB,
|
db as unknown as KyselyDB,
|
||||||
);
|
);
|
||||||
return { service, pageRepo, aiService };
|
return { service, pageRepo, aiService, reindexProgress };
|
||||||
}
|
}
|
||||||
|
|
||||||
it('aborts after the first page on a FATAL (401) provider error', async () => {
|
it('aborts after the first page on a FATAL (401) provider error', async () => {
|
||||||
@@ -78,3 +90,100 @@ describe('EmbeddingIndexerService.reindexWorkspace fail-fast', () => {
|
|||||||
expect(reindexPage).toHaveBeenCalledTimes(3);
|
expect(reindexPage).toHaveBeenCalledTimes(3);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Live reindex-progress reporting: reindexWorkspace must publish a per-workspace
|
||||||
|
* progress record (total at start, done incremented per processed page) and ALWAYS
|
||||||
|
* clear it in a finally — including on a fatal abort and an unconfigured early
|
||||||
|
* return — so the settings status can show the counter climb without ever getting
|
||||||
|
* stuck in a "reindexing" state.
|
||||||
|
*/
|
||||||
|
describe('EmbeddingIndexerService.reindexWorkspace progress', () => {
|
||||||
|
const WORKSPACE_ID = 'ws-1';
|
||||||
|
|
||||||
|
function makeService(pageIds: string[] = ['p1', 'p2', 'p3']) {
|
||||||
|
const pageRepo = {
|
||||||
|
getEmbeddablePageIds: jest.fn().mockResolvedValue(pageIds),
|
||||||
|
};
|
||||||
|
const pageEmbeddingRepo = {};
|
||||||
|
const aiService = {
|
||||||
|
getEmbeddingModel: jest.fn().mockResolvedValue('some-model'),
|
||||||
|
};
|
||||||
|
const reindexProgress = {
|
||||||
|
start: jest.fn().mockResolvedValue(undefined),
|
||||||
|
increment: jest.fn().mockResolvedValue(undefined),
|
||||||
|
clear: jest.fn().mockResolvedValue(undefined),
|
||||||
|
get: jest.fn().mockResolvedValue(null),
|
||||||
|
};
|
||||||
|
const db = {};
|
||||||
|
const service = new EmbeddingIndexerService(
|
||||||
|
pageRepo as unknown as PageRepo,
|
||||||
|
pageEmbeddingRepo as unknown as PageEmbeddingRepo,
|
||||||
|
aiService as unknown as AiService,
|
||||||
|
reindexProgress as unknown as EmbeddingReindexProgressService,
|
||||||
|
db as unknown as KyselyDB,
|
||||||
|
);
|
||||||
|
return { service, pageRepo, aiService, reindexProgress };
|
||||||
|
}
|
||||||
|
|
||||||
|
it('sets total at start, increments done per page, and clears in finally', async () => {
|
||||||
|
const { service, reindexProgress } = makeService(['p1', 'p2', 'p3']);
|
||||||
|
jest.spyOn(service, 'reindexPage').mockResolvedValue(undefined);
|
||||||
|
|
||||||
|
await service.reindexWorkspace(WORKSPACE_ID);
|
||||||
|
|
||||||
|
expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 3);
|
||||||
|
// One increment per processed page.
|
||||||
|
expect(reindexProgress.increment).toHaveBeenCalledTimes(3);
|
||||||
|
expect(reindexProgress.increment).toHaveBeenCalledWith(WORKSPACE_ID);
|
||||||
|
// Cleared exactly once on completion.
|
||||||
|
expect(reindexProgress.clear).toHaveBeenCalledTimes(1);
|
||||||
|
expect(reindexProgress.clear).toHaveBeenCalledWith(WORKSPACE_ID);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('counts a handled (non-fatal) per-page failure as processed', async () => {
|
||||||
|
const { service, reindexProgress } = makeService(['p1', 'p2', 'p3']);
|
||||||
|
// No statusCode -> non-fatal -> isolate and continue; each counts as done.
|
||||||
|
jest.spyOn(service, 'reindexPage').mockRejectedValue(new Error('boom'));
|
||||||
|
|
||||||
|
await service.reindexWorkspace(WORKSPACE_ID);
|
||||||
|
|
||||||
|
expect(reindexProgress.increment).toHaveBeenCalledTimes(3);
|
||||||
|
expect(reindexProgress.clear).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('clears progress in finally even when a FATAL provider error aborts the batch', async () => {
|
||||||
|
const { service, reindexProgress } = makeService(['p1', 'p2', 'p3']);
|
||||||
|
// A 401 aborts on the first page (re-thrown) — the finally must still clear.
|
||||||
|
jest
|
||||||
|
.spyOn(service, 'reindexPage')
|
||||||
|
.mockRejectedValue({ statusCode: 401, message: 'User not found' });
|
||||||
|
|
||||||
|
await expect(service.reindexWorkspace(WORKSPACE_ID)).rejects.toMatchObject({
|
||||||
|
statusCode: 401,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 3);
|
||||||
|
// Aborted page is NOT counted as processed.
|
||||||
|
expect(reindexProgress.increment).not.toHaveBeenCalled();
|
||||||
|
// But progress is still cleared so the run never gets stuck.
|
||||||
|
expect(reindexProgress.clear).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('clears the enqueue-seeded progress on an unconfigured early return', async () => {
|
||||||
|
const { service, aiService, reindexProgress } = makeService();
|
||||||
|
// Embeddings not configured: reindexWorkspace returns early WITHOUT starting
|
||||||
|
// a fresh record, but the finally must still clear the enqueue-time seed.
|
||||||
|
aiService.getEmbeddingModel = jest
|
||||||
|
.fn()
|
||||||
|
.mockRejectedValue(new AiEmbeddingNotConfiguredException());
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
service.reindexWorkspace(WORKSPACE_ID),
|
||||||
|
).resolves.toBeUndefined();
|
||||||
|
|
||||||
|
expect(reindexProgress.start).not.toHaveBeenCalled();
|
||||||
|
expect(reindexProgress.clear).toHaveBeenCalledTimes(1);
|
||||||
|
expect(reindexProgress.clear).toHaveBeenCalledWith(WORKSPACE_ID);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import { KyselyDB } from '@docmost/db/types/kysely.types';
|
|||||||
import { InjectKysely } from 'nestjs-kysely';
|
import { InjectKysely } from 'nestjs-kysely';
|
||||||
import { executeTx } from '@docmost/db/utils';
|
import { executeTx } from '@docmost/db/utils';
|
||||||
import { AiService } from '../../../integrations/ai/ai.service';
|
import { AiService } from '../../../integrations/ai/ai.service';
|
||||||
|
import { EmbeddingReindexProgressService } from '../../../integrations/ai/embedding-reindex-progress.service';
|
||||||
import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-embedding-not-configured.exception';
|
import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-embedding-not-configured.exception';
|
||||||
import {
|
import {
|
||||||
describeProviderError,
|
describeProviderError,
|
||||||
@@ -48,6 +49,7 @@ export class EmbeddingIndexerService {
|
|||||||
private readonly pageRepo: PageRepo,
|
private readonly pageRepo: PageRepo,
|
||||||
private readonly pageEmbeddingRepo: PageEmbeddingRepo,
|
private readonly pageEmbeddingRepo: PageEmbeddingRepo,
|
||||||
private readonly aiService: AiService,
|
private readonly aiService: AiService,
|
||||||
|
private readonly reindexProgress: EmbeddingReindexProgressService,
|
||||||
@InjectKysely() private readonly db: KyselyDB,
|
@InjectKysely() private readonly db: KyselyDB,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
@@ -183,7 +185,19 @@ export class EmbeddingIndexerService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* (Re)build embeddings for EVERY non-deleted page in a workspace. Used by the
|
* (Re)build embeddings for the EMBEDDABLE page set of a workspace — the same
|
||||||
|
* set countEmbeddablePages counts (via getEmbeddablePageIds): non-deleted pages
|
||||||
|
* that qualify under any of the three clauses of `embeddablePredicate` —
|
||||||
|
* non-empty textContent, OR an empty/null textContent whose ProseMirror
|
||||||
|
* `content` JSON has at least one text node (`"type":"text"`) that `jsonToText`
|
||||||
|
* can extract, OR an already-stored (non-deleted) embedding row — NOT every
|
||||||
|
* non-deleted page. Iterating this set keeps the live `total` equal to the
|
||||||
|
* steady-state denominator, so the progress counter climbs 0 -> total and
|
||||||
|
* matches the before/after DB coverage exactly. A page with truly no
|
||||||
|
* extractable text (empty textContent AND content with only non-text/atom
|
||||||
|
* nodes such as math) is correctly skipped (reindexPage no-ops on it); a page
|
||||||
|
* that lost its text but still has stale embeddings stays in the set (the
|
||||||
|
* EXISTS clause) so it is visited and its stale rows are cleared. Used by the
|
||||||
* bulk reindex (WORKSPACE_CREATE_EMBEDDINGS, fired when AI Search is enabled
|
* bulk reindex (WORKSPACE_CREATE_EMBEDDINGS, fired when AI Search is enabled
|
||||||
* and by the manual "Reindex now" action).
|
* and by the manual "Reindex now" action).
|
||||||
*
|
*
|
||||||
@@ -194,69 +208,99 @@ export class EmbeddingIndexerService {
|
|||||||
* the batch.
|
* the batch.
|
||||||
*/
|
*/
|
||||||
async reindexWorkspace(workspaceId: string): Promise<void> {
|
async reindexWorkspace(workspaceId: string): Promise<void> {
|
||||||
|
// The whole run is wrapped so the per-workspace progress record is ALWAYS
|
||||||
|
// cleared in the finally — on success, on a fatal-provider abort, on an
|
||||||
|
// unconfigured early-return, or on any unexpected throw — so a failed run
|
||||||
|
// never leaves a stuck "reindexing" state (the status then falls back to the
|
||||||
|
// steady-state DB coverage count). A placeholder record may already exist
|
||||||
|
// (seeded at enqueue time); the finally cleans that too.
|
||||||
try {
|
try {
|
||||||
await this.aiService.getEmbeddingModel(workspaceId);
|
|
||||||
} catch (err) {
|
|
||||||
if (err instanceof AiEmbeddingNotConfiguredException) {
|
|
||||||
this.logger.log(
|
|
||||||
`reindexWorkspace: embeddings not configured for workspace ${workspaceId}, skipping`,
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
throw err;
|
|
||||||
}
|
|
||||||
|
|
||||||
const pageIds = await this.pageRepo.getIdsByWorkspace(workspaceId);
|
|
||||||
const total = pageIds.length;
|
|
||||||
const startedAt = Date.now();
|
|
||||||
this.logger.log(
|
|
||||||
`reindexWorkspace: starting reindex of ${total} page(s) for workspace ${workspaceId}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
let failed = 0;
|
|
||||||
for (let i = 0; i < total; i++) {
|
|
||||||
const pageId = pageIds[i];
|
|
||||||
const position = i + 1;
|
|
||||||
// Log BEFORE the await: if the embedding call hangs, this is the last line
|
|
||||||
// in the log and it names the exact page that is stuck.
|
|
||||||
this.logger.log(
|
|
||||||
`reindexWorkspace: [${position}/${total}] indexing page ${pageId} (workspace ${workspaceId})`,
|
|
||||||
);
|
|
||||||
const pageStartedAt = Date.now();
|
|
||||||
try {
|
try {
|
||||||
await this.reindexPage(pageId);
|
await this.aiService.getEmbeddingModel(workspaceId);
|
||||||
const elapsed = Date.now() - pageStartedAt;
|
|
||||||
if (elapsed >= SLOW_PAGE_MS) {
|
|
||||||
this.logger.warn(
|
|
||||||
`reindexWorkspace: [${position}/${total}] page ${pageId} took ${elapsed}ms`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
// A fatal provider error (invalid/missing key, no credits) recurs
|
if (err instanceof AiEmbeddingNotConfiguredException) {
|
||||||
// identically on EVERY remaining page. Abort the whole batch instead of
|
this.logger.log(
|
||||||
// issuing hundreds of doomed requests against the provider.
|
`reindexWorkspace: embeddings not configured for workspace ${workspaceId}, skipping`,
|
||||||
if (isFatalProviderError(err)) {
|
|
||||||
this.logger.error(
|
|
||||||
`reindexWorkspace: aborting at [${position}/${total}] for workspace ` +
|
|
||||||
`${workspaceId} — fatal provider error, remaining pages would fail ` +
|
|
||||||
`identically: ${describeProviderError(err)}`,
|
|
||||||
);
|
);
|
||||||
throw err;
|
return;
|
||||||
}
|
}
|
||||||
// Per-page isolation: one non-fatal failure (incl. an embedding timeout)
|
throw err;
|
||||||
// must not abort the whole batch.
|
|
||||||
failed++;
|
|
||||||
this.logger.error(
|
|
||||||
`reindexWorkspace: [${position}/${total}] failed to reindex page ${pageId} ` +
|
|
||||||
`after ${Date.now() - pageStartedAt}ms: ${describeProviderError(err)}`,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.log(
|
// Iterate the EMBEDDABLE set (same three-clause predicate as
|
||||||
`reindexWorkspace: done for workspace ${workspaceId}: ` +
|
// countEmbeddablePages), NOT every non-deleted page: this makes `total`
|
||||||
`${total - failed}/${total} indexed, ${failed} failed in ${Date.now() - startedAt}ms`,
|
// here equal the steady-state denominator, so the live counter climbs
|
||||||
);
|
// 0 -> total and matches the before/after DB count exactly (no
|
||||||
|
// 478 -> 500 -> 478 denominator jump). Pages whose text lives in the
|
||||||
|
// ProseMirror `content` JSON (a text node) even with empty text_content ARE
|
||||||
|
// in this set (the content-JSON clause) and get embedded; a page with no
|
||||||
|
// extractable text at all is correctly skipped — reindexPage no-ops on it —
|
||||||
|
// and a page that lost its text but still has stale embeddings IS in this
|
||||||
|
// set (the EXISTS clause) so it is still visited and its stale rows cleared.
|
||||||
|
const pageIds = await this.pageRepo.getEmbeddablePageIds(workspaceId);
|
||||||
|
const total = pageIds.length;
|
||||||
|
const startedAt = Date.now();
|
||||||
|
// Publish the live run progress over this same set (done reset to 0). The
|
||||||
|
// counter increments once per iterated page and reaches exactly `total`,
|
||||||
|
// which equals countEmbeddablePages — the steady-state denominator.
|
||||||
|
await this.reindexProgress.start(workspaceId, total);
|
||||||
|
this.logger.log(
|
||||||
|
`reindexWorkspace: starting reindex of ${total} page(s) for workspace ${workspaceId}`,
|
||||||
|
);
|
||||||
|
|
||||||
|
let failed = 0;
|
||||||
|
for (let i = 0; i < total; i++) {
|
||||||
|
const pageId = pageIds[i];
|
||||||
|
const position = i + 1;
|
||||||
|
// Log BEFORE the await: if the embedding call hangs, this is the last line
|
||||||
|
// in the log and it names the exact page that is stuck.
|
||||||
|
this.logger.log(
|
||||||
|
`reindexWorkspace: [${position}/${total}] indexing page ${pageId} (workspace ${workspaceId})`,
|
||||||
|
);
|
||||||
|
const pageStartedAt = Date.now();
|
||||||
|
try {
|
||||||
|
await this.reindexPage(pageId);
|
||||||
|
// Count this page as processed (matches the [position/total] log).
|
||||||
|
await this.reindexProgress.increment(workspaceId);
|
||||||
|
const elapsed = Date.now() - pageStartedAt;
|
||||||
|
if (elapsed >= SLOW_PAGE_MS) {
|
||||||
|
this.logger.warn(
|
||||||
|
`reindexWorkspace: [${position}/${total}] page ${pageId} took ${elapsed}ms`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
// A fatal provider error (invalid/missing key, no credits) recurs
|
||||||
|
// identically on EVERY remaining page. Abort the whole batch instead of
|
||||||
|
// issuing hundreds of doomed requests against the provider. Do NOT count
|
||||||
|
// it as processed — the run aborts here (the finally clears progress).
|
||||||
|
if (isFatalProviderError(err)) {
|
||||||
|
this.logger.error(
|
||||||
|
`reindexWorkspace: aborting at [${position}/${total}] for workspace ` +
|
||||||
|
`${workspaceId} — fatal provider error, remaining pages would fail ` +
|
||||||
|
`identically: ${describeProviderError(err)}`,
|
||||||
|
);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
// Per-page isolation: one non-fatal failure (incl. an embedding timeout)
|
||||||
|
// must not abort the whole batch. A handled failure still advances the
|
||||||
|
// counter (matches the [position/total] log, so done reaches total).
|
||||||
|
failed++;
|
||||||
|
await this.reindexProgress.increment(workspaceId);
|
||||||
|
this.logger.error(
|
||||||
|
`reindexWorkspace: [${position}/${total}] failed to reindex page ${pageId} ` +
|
||||||
|
`after ${Date.now() - pageStartedAt}ms: ${describeProviderError(err)}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.log(
|
||||||
|
`reindexWorkspace: done for workspace ${workspaceId}: ` +
|
||||||
|
`${total - failed}/${total} indexed, ${failed} failed in ${Date.now() - startedAt}ms`,
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
// Always remove the progress record so the status reverts to the DB count.
|
||||||
|
await this.reindexProgress.clear(workspaceId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Purge ALL embeddings for a workspace (WORKSPACE_DELETE_EMBEDDINGS). */
|
/** Purge ALL embeddings for a workspace (WORKSPACE_DELETE_EMBEDDINGS). */
|
||||||
|
|||||||
167
apps/server/src/database/repos/page/page.repo.embeddable.spec.ts
Normal file
167
apps/server/src/database/repos/page/page.repo.embeddable.spec.ts
Normal file
@@ -0,0 +1,167 @@
|
|||||||
|
import { PageRepo } from './page.repo';
|
||||||
|
import {
|
||||||
|
DummyDriver,
|
||||||
|
Kysely,
|
||||||
|
PostgresAdapter,
|
||||||
|
PostgresIntrospector,
|
||||||
|
PostgresQueryCompiler,
|
||||||
|
} from 'kysely';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* F6 regression guard for the embeddable-page predicate.
|
||||||
|
*
|
||||||
|
* The predicate is shared by `countEmbeddablePages` (the "Indexed N of M" coverage
|
||||||
|
* denominator) and `getEmbeddablePageIds` (the exact set a full reindex iterates).
|
||||||
|
* It MUST select pages whose `text_content` was never backfilled (null/empty) but
|
||||||
|
* whose ProseMirror `content` JSON still carries body text — `reindexPage` builds
|
||||||
|
* its chunks straight from `content`, so without a content clause such a page is
|
||||||
|
* silently SKIPPED by a mass reindex even though it is fully embeddable.
|
||||||
|
*
|
||||||
|
* The content clause keys on the structural text-node marker `"type":"text"`, NOT
|
||||||
|
* a bare `"text":` key. The bare key also appears as the `attrs.text` of atom
|
||||||
|
* nodes that carry NO extractable text — notably math (`mathBlock`/`mathInline`),
|
||||||
|
* whose LaTeX lives in `attrs.text` and has no `generateText` serializer. A
|
||||||
|
* math-ONLY page therefore yields empty `text_content` and zero embeddings; if the
|
||||||
|
* predicate matched its `attrs.text` it would land in the denominator but
|
||||||
|
* `reindexPage` would no-op on it, pinning "Indexed N of M" below 100% forever —
|
||||||
|
* the exact bug this feature fixes. The `"type":"text"` marker matches only real
|
||||||
|
* text nodes (what `jsonToText` extracts), keeping the predicate consistent with
|
||||||
|
* what gets indexed.
|
||||||
|
*
|
||||||
|
* There is no real Postgres here: a recording Kysely (DummyDriver wired to the
|
||||||
|
* Postgres query compiler) compiles the queries to SQL so we can assert the WHERE
|
||||||
|
* predicate ORs in the narrowed content clause alongside the existing text_content
|
||||||
|
* and stored-embeddings clauses — and that BOTH callers compile the identical
|
||||||
|
* clause (denominator and reindex set can never diverge).
|
||||||
|
*/
|
||||||
|
function makeRecordingDb() {
|
||||||
|
const sqls: string[] = [];
|
||||||
|
const db = new Kysely<any>({
|
||||||
|
dialect: {
|
||||||
|
createAdapter: () => new PostgresAdapter(),
|
||||||
|
createDriver: () =>
|
||||||
|
new (class extends DummyDriver {
|
||||||
|
async acquireConnection() {
|
||||||
|
return {
|
||||||
|
executeQuery: async (compiled: { sql: string }) => {
|
||||||
|
sqls.push(compiled.sql);
|
||||||
|
return { rows: [] };
|
||||||
|
},
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-empty-function
|
||||||
|
streamQuery: async function* () {},
|
||||||
|
} as any;
|
||||||
|
}
|
||||||
|
})(),
|
||||||
|
createIntrospector: (d: Kysely<any>) => new PostgresIntrospector(d),
|
||||||
|
createQueryCompiler: () => new PostgresQueryCompiler(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
return { db, sqls };
|
||||||
|
}
|
||||||
|
|
||||||
|
// The narrowed content clause, as it appears in the compiled SQL. Keying on the
|
||||||
|
// structural `"type":"text"` marker (not a bare `"text":` key) is what excludes
|
||||||
|
// math-only pages whose only `"text"` key is the atom node's `attrs.text`.
|
||||||
|
const NARROWED_CLAUSE = `"type"[[:space:]]*:[[:space:]]*"text"`;
|
||||||
|
const BARE_TEXT_KEY = `"text"[[:space:]]*:`;
|
||||||
|
|
||||||
|
describe('PageRepo embeddable predicate — content-bearing pages (F6)', () => {
|
||||||
|
it('selects content-bearing pages via the narrowed "type":"text" node marker', async () => {
|
||||||
|
const { db, sqls } = makeRecordingDb();
|
||||||
|
const repo = new PageRepo(db as any, {} as any, { emit: jest.fn() } as any);
|
||||||
|
|
||||||
|
await repo.getEmbeddablePageIds('ws-1');
|
||||||
|
|
||||||
|
expect(sqls).toHaveLength(1);
|
||||||
|
const sql = sqls[0];
|
||||||
|
|
||||||
|
// Clause 1 (existing): pages with extractable text_content.
|
||||||
|
expect(sql).toContain('text_content');
|
||||||
|
// Clause 3 (the F6 fix, now narrowed): a page whose content JSON carries a
|
||||||
|
// real text node is selected even when text_content is null/empty, so a full
|
||||||
|
// reindex visits it instead of silently skipping it.
|
||||||
|
expect(sql).toContain('content::text');
|
||||||
|
expect(sql).toContain(NARROWED_CLAUSE);
|
||||||
|
// It must NOT use the old bare `"text":` key, which also matches the
|
||||||
|
// `attrs.text` of math-only atom pages (false-positive denominator inflation).
|
||||||
|
expect(sql).not.toContain(BARE_TEXT_KEY);
|
||||||
|
// Clause 2 (existing): pages that already have stored embeddings stay in the
|
||||||
|
// set so a reindex can clear their stale rows.
|
||||||
|
expect(sql.toLowerCase()).toContain('embeddings');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('countEmbeddablePages compiles the SAME narrowed clause as getEmbeddablePageIds', async () => {
|
||||||
|
// Consistency is the core requirement: the denominator (countEmbeddablePages)
|
||||||
|
// and the reindex set (getEmbeddablePageIds) MUST share the identical
|
||||||
|
// predicate, else the live "done" counter and the steady-state total diverge.
|
||||||
|
const { db, sqls } = makeRecordingDb();
|
||||||
|
const repo = new PageRepo(db as any, {} as any, { emit: jest.fn() } as any);
|
||||||
|
|
||||||
|
await repo.countEmbeddablePages('ws-1');
|
||||||
|
await repo.getEmbeddablePageIds('ws-1');
|
||||||
|
|
||||||
|
expect(sqls).toHaveLength(2);
|
||||||
|
const [countSql, idsSql] = sqls;
|
||||||
|
|
||||||
|
// Both carry the narrowed content clause...
|
||||||
|
expect(countSql).toContain(NARROWED_CLAUSE);
|
||||||
|
expect(idsSql).toContain(NARROWED_CLAUSE);
|
||||||
|
// ...neither carries the bare key...
|
||||||
|
expect(countSql).not.toContain(BARE_TEXT_KEY);
|
||||||
|
expect(idsSql).not.toContain(BARE_TEXT_KEY);
|
||||||
|
// ...and the full OR predicate (text_content + content node + embeddings
|
||||||
|
// EXISTS) is byte-identical between the two queries, so they can't drift.
|
||||||
|
const where = (s: string) => s.slice(s.indexOf('where'));
|
||||||
|
expect(where(countSql)).toEqual(where(idsSql));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('the content regex matches a text-bearing doc but NOT a math-only doc', () => {
|
||||||
|
// Semantic check of the predicate against sample `content::text` payloads.
|
||||||
|
// Note: `jsonb::text` is NOT identical to JSON.stringify — Postgres renders a
|
||||||
|
// space after each colon (`"type": "text"`), which is exactly why the POSIX
|
||||||
|
// clause uses `[[:space:]]*`. The clause `"type"[[:space:]]*:[[:space:]]*"text"`
|
||||||
|
// maps to the JS regex below (`[[:space:]]` -> `\s`, tolerating both forms);
|
||||||
|
// we evaluate it the way Postgres would.
|
||||||
|
const re = /"type"\s*:\s*"text"/;
|
||||||
|
|
||||||
|
// A real paragraph with a text node -> embeddable.
|
||||||
|
const textDoc = JSON.stringify({
|
||||||
|
type: 'doc',
|
||||||
|
content: [
|
||||||
|
{
|
||||||
|
type: 'paragraph',
|
||||||
|
content: [{ type: 'text', text: 'hello world' }],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
});
|
||||||
|
// A doc whose ONLY node is a math atom. Its LaTeX is in `attrs.text`, there is
|
||||||
|
// no text node, and `jsonToText`/`generateText` has no serializer for it -> it
|
||||||
|
// yields empty text_content and zero embeddings, so it must NOT qualify.
|
||||||
|
const mathOnlyDoc = JSON.stringify({
|
||||||
|
type: 'doc',
|
||||||
|
content: [
|
||||||
|
{ type: 'mathBlock', attrs: { text: 'E = mc^2' } },
|
||||||
|
{ type: 'mathInline', attrs: { text: '\\alpha' } },
|
||||||
|
],
|
||||||
|
});
|
||||||
|
// An empty doc has no text node either.
|
||||||
|
const emptyDoc = JSON.stringify({ type: 'doc', content: [] });
|
||||||
|
|
||||||
|
expect(re.test(textDoc)).toBe(true);
|
||||||
|
expect(re.test(mathOnlyDoc)).toBe(false);
|
||||||
|
expect(re.test(emptyDoc)).toBe(false);
|
||||||
|
// Sanity: the OLD bare-key regex WOULD have wrongly matched the math-only doc,
|
||||||
|
// which is precisely the false positive the narrowing removes.
|
||||||
|
expect(/"text"\s*:/.test(mathOnlyDoc)).toBe(true);
|
||||||
|
|
||||||
|
// A user literally TYPING `"type":"text"` in prose can't false-positive on an
|
||||||
|
// otherwise text-less page: in `content::text` the typed value's quotes are
|
||||||
|
// escaped (`\"type\":\"text\"`), so the literal-quote regex does not match the
|
||||||
|
// escaped form. (And such a page is a genuine text node anyway.)
|
||||||
|
const escapedLiteral = JSON.stringify({
|
||||||
|
type: 'doc',
|
||||||
|
content: [{ type: 'someAtom', attrs: { note: '"type":"text"' } }],
|
||||||
|
});
|
||||||
|
expect(re.test(escapedLiteral)).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -12,6 +12,7 @@ import { executeWithCursorPagination } from '@docmost/db/pagination/cursor-pagin
|
|||||||
import { validate as isValidUUID } from 'uuid';
|
import { validate as isValidUUID } from 'uuid';
|
||||||
import { ExpressionBuilder, sql } from 'kysely';
|
import { ExpressionBuilder, sql } from 'kysely';
|
||||||
import { DB } from '@docmost/db/types/db';
|
import { DB } from '@docmost/db/types/db';
|
||||||
|
import { DbInterface } from '@docmost/db/types/db.interface';
|
||||||
import { jsonArrayFrom, jsonObjectFrom } from 'kysely/helpers/postgres';
|
import { jsonArrayFrom, jsonObjectFrom } from 'kysely/helpers/postgres';
|
||||||
import { SpaceMemberRepo } from '@docmost/db/repos/space/space-member.repo';
|
import { SpaceMemberRepo } from '@docmost/db/repos/space/space-member.repo';
|
||||||
import { EventEmitter2 } from '@nestjs/event-emitter';
|
import { EventEmitter2 } from '@nestjs/event-emitter';
|
||||||
@@ -233,9 +234,9 @@ export class PageRepo {
|
|||||||
* text-less pages (which legitimately store zero embeddings) don't keep the
|
* text-less pages (which legitimately store zero embeddings) don't keep the
|
||||||
* bar below 100% forever.
|
* bar below 100% forever.
|
||||||
*
|
*
|
||||||
* A page qualifies if it has non-empty textContent OR already has stored
|
* A page qualifies if it has non-empty textContent, OR its content JSON has at
|
||||||
* embeddings. The second clause covers pages whose text the indexer extracted
|
* least one text node (`"type":"text"`) when textContent was never backfilled,
|
||||||
* from the content JSON when textContent was null, and guarantees this total is
|
* OR it already has stored embeddings. The last clause guarantees this total is
|
||||||
* always >= countIndexedPages (the indexed count can never exceed it).
|
* always >= countIndexedPages (the indexed count can never exceed it).
|
||||||
*/
|
*/
|
||||||
async countEmbeddablePages(workspaceId: string): Promise<number> {
|
async countEmbeddablePages(workspaceId: string): Promise<number> {
|
||||||
@@ -243,37 +244,91 @@ export class PageRepo {
|
|||||||
.selectFrom('pages as p')
|
.selectFrom('pages as p')
|
||||||
.where('p.workspaceId', '=', workspaceId)
|
.where('p.workspaceId', '=', workspaceId)
|
||||||
.where('p.deletedAt', 'is', null)
|
.where('p.deletedAt', 'is', null)
|
||||||
.where((eb) =>
|
.where((eb) => this.embeddablePredicate(eb))
|
||||||
eb.or([
|
|
||||||
// Has extractable body text. The regex matches any non-whitespace
|
|
||||||
// character, mirroring the indexer's `text.trim().length === 0` check
|
|
||||||
// (raw SQL -> use the snake_case column name).
|
|
||||||
sql<boolean>`p.text_content ~ '[^[:space:]]'`,
|
|
||||||
// OR already has at least one (non-deleted) embedding row.
|
|
||||||
eb.exists(
|
|
||||||
eb
|
|
||||||
.selectFrom('pageEmbeddings as pe')
|
|
||||||
.select(sql`1`.as('one'))
|
|
||||||
.whereRef('pe.pageId', '=', 'p.id')
|
|
||||||
.where('pe.deletedAt', 'is', null),
|
|
||||||
),
|
|
||||||
]),
|
|
||||||
)
|
|
||||||
.select((eb) => eb.fn.countAll().as('count'))
|
.select((eb) => eb.fn.countAll().as('count'))
|
||||||
.executeTakeFirst();
|
.executeTakeFirst();
|
||||||
return Number(row?.count ?? 0);
|
return Number(row?.count ?? 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* IDs of all non-deleted pages in a workspace. Used by the RAG bulk reindex to
|
* The "embeddable content" qualifying predicate, shared verbatim by
|
||||||
* (re)build embeddings for every existing page.
|
* countEmbeddablePages (the steady-state denominator) and getEmbeddablePageIds
|
||||||
|
* (the set the bulk reindex iterates). Both MUST use the exact same condition
|
||||||
|
* or the live total and steady-state total diverge — extracting it here is what
|
||||||
|
* guarantees that, replacing the previous hand-duplicated copy. Callers supply
|
||||||
|
* the trivial workspaceId/deletedAt filters inline; this returns only the
|
||||||
|
* non-trivial OR clause, evaluated against the `p` alias of `pages`.
|
||||||
|
*
|
||||||
|
* A page qualifies if it has non-empty textContent, OR its ProseMirror
|
||||||
|
* `content` JSON has at least one text node (`"type":"text"`) even though
|
||||||
|
* textContent was never backfilled, OR it already has a stored (non-deleted)
|
||||||
|
* embedding row.
|
||||||
*/
|
*/
|
||||||
async getIdsByWorkspace(workspaceId: string): Promise<string[]> {
|
private embeddablePredicate(
|
||||||
|
eb: ExpressionBuilder<DbInterface & { p: DbInterface['pages'] }, 'p'>,
|
||||||
|
) {
|
||||||
|
return eb.or([
|
||||||
|
// Has extractable body text. The regex matches any non-whitespace
|
||||||
|
// character, mirroring the indexer's `text.trim().length === 0` check
|
||||||
|
// (raw SQL -> use the snake_case column name).
|
||||||
|
sql<boolean>`p.text_content ~ '[^[:space:]]'`,
|
||||||
|
// OR the ProseMirror `content` JSON has at least one text node (`"type":
|
||||||
|
// "text"`) the indexer can extract, even when `text_content` is null/empty
|
||||||
|
// (never backfilled): `reindexPage` runs `jsonToText` (generateText) over
|
||||||
|
// `content`, which only emits the text of ProseMirror text nodes, so such a
|
||||||
|
// page IS embeddable and a full reindex MUST visit it (otherwise it is
|
||||||
|
// silently skipped). A text node always serialises as
|
||||||
|
// `{"type":"text","text":"..."}`, so we key on the structural `"type":
|
||||||
|
// "text"` marker — NOT a bare `"text":` key, which also appears as the
|
||||||
|
// `attrs.text` of atom nodes that carry NO extractable text (e.g. math
|
||||||
|
// `mathBlock`/`mathInline`, whose LaTeX lives in `attrs.text` and has no
|
||||||
|
// text serializer). A math-only page thus produces empty `text_content` and
|
||||||
|
// zero embeddings; matching its `attrs.text` here would wrongly inflate the
|
||||||
|
// denominator and keep "Indexed N of M" below 100% forever. An empty doc
|
||||||
|
// (no text nodes) has no `"type":"text"` and is correctly excluded. A user
|
||||||
|
// who literally types `"type":"text"` in their prose can't false-positive:
|
||||||
|
// in `content::text` that text value's quotes are escaped (`\"type\"...`),
|
||||||
|
// so the literal-quote regex won't match the escaped form (and such a page
|
||||||
|
// is a real text node anyway).
|
||||||
|
sql<boolean>`p.content::text ~ '"type"[[:space:]]*:[[:space:]]*"text"'`,
|
||||||
|
// OR already has at least one (non-deleted) embedding row.
|
||||||
|
eb.exists(
|
||||||
|
eb
|
||||||
|
.selectFrom('pageEmbeddings as pe')
|
||||||
|
.select(sql`1`.as('one'))
|
||||||
|
.whereRef('pe.pageId', '=', 'p.id')
|
||||||
|
.where('pe.deletedAt', 'is', null),
|
||||||
|
),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* IDs of the EMBEDDABLE page set for a workspace — the exact same set that
|
||||||
|
* `countEmbeddablePages` counts (a page qualifies if it has non-empty
|
||||||
|
* textContent, OR content JSON with at least one text node (`"type":"text"`)
|
||||||
|
* and an empty/null textContent, OR already has a stored embedding row). The
|
||||||
|
* bulk reindex
|
||||||
|
* iterates THIS set so the live "done" counter reaches exactly
|
||||||
|
* `countEmbeddablePages` (the steady-state denominator), instead of iterating
|
||||||
|
* every non-deleted page (which would push the denominator above the
|
||||||
|
* steady-state value mid-run).
|
||||||
|
*
|
||||||
|
* IMPORTANT: the qualifying WHERE is shared with `countEmbeddablePages` via the
|
||||||
|
* private `embeddablePredicate` helper, so the two can no longer drift — if the
|
||||||
|
* embeddable definition changes, change it once there and both stay in lockstep
|
||||||
|
* (else the live total and steady-state total diverge again). Dropping
|
||||||
|
* text-less pages is correct: `reindexPage` no-ops on
|
||||||
|
* a page with no extractable content anyway, and a page that lost its text but
|
||||||
|
* still has stale embeddings IS in this set (the EXISTS clause), so it is still
|
||||||
|
* visited and its stale rows are cleared.
|
||||||
|
*/
|
||||||
|
async getEmbeddablePageIds(workspaceId: string): Promise<string[]> {
|
||||||
const rows = await this.db
|
const rows = await this.db
|
||||||
.selectFrom('pages')
|
.selectFrom('pages as p')
|
||||||
.select('id')
|
.select('p.id')
|
||||||
.where('workspaceId', '=', workspaceId)
|
.where('p.workspaceId', '=', workspaceId)
|
||||||
.where('deletedAt', 'is', null)
|
.where('p.deletedAt', 'is', null)
|
||||||
|
.where((eb) => this.embeddablePredicate(eb))
|
||||||
.execute();
|
.execute();
|
||||||
return rows.map((r) => r.id);
|
return rows.map((r) => r.id);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,12 @@
|
|||||||
import { parsePositiveInt } from './ai-settings.service';
|
import { AiSettingsService, parsePositiveInt } from './ai-settings.service';
|
||||||
|
import { WorkspaceRepo } from '@docmost/db/repos/workspace/workspace.repo';
|
||||||
|
import { AiAgentRoleRepo } from '@docmost/db/repos/ai-agent-roles/ai-agent-roles.repo';
|
||||||
|
import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider-credentials.repo';
|
||||||
|
import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo';
|
||||||
|
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||||
|
import { SecretBoxService } from '../crypto/secret-box';
|
||||||
|
import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service';
|
||||||
|
import type { Queue } from 'bullmq';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Round-trip coercion for numeric `::text` provider settings (e.g.
|
* Round-trip coercion for numeric `::text` provider settings (e.g.
|
||||||
@@ -41,3 +49,198 @@ describe('parsePositiveInt', () => {
|
|||||||
expect(parsePositiveInt(42)).toBe(42);
|
expect(parsePositiveInt(42)).toBe(42);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* getMasked must surface the LIVE reindex run progress while a reindex is active
|
||||||
|
* (so the "Indexed X of Y" counter can climb 0 -> total), and fall back to the
|
||||||
|
* steady-state DB coverage count (countIndexedPages / countEmbeddablePages) when
|
||||||
|
* no reindex is running. This is the server side of the fix for the counter that
|
||||||
|
* otherwise stays stuck at "478 of 478" the whole reindex.
|
||||||
|
*/
|
||||||
|
describe('AiSettingsService.getMasked reindex progress', () => {
|
||||||
|
const WORKSPACE_ID = 'ws-1';
|
||||||
|
|
||||||
|
function makeService() {
|
||||||
|
// No driver configured -> the credentials lookup is skipped, keeping the
|
||||||
|
// setup minimal; we only care about the indexed/total numbers here.
|
||||||
|
const workspaceRepo = {
|
||||||
|
findById: jest.fn().mockResolvedValue({ settings: {} }),
|
||||||
|
};
|
||||||
|
const aiAgentRoleRepo = {};
|
||||||
|
const aiProviderCredentialsRepo = { find: jest.fn() };
|
||||||
|
const pageEmbeddingRepo = {
|
||||||
|
countIndexedPages: jest.fn().mockResolvedValue(478),
|
||||||
|
};
|
||||||
|
const pageRepo = {
|
||||||
|
countEmbeddablePages: jest.fn().mockResolvedValue(478),
|
||||||
|
};
|
||||||
|
const secretBox = {};
|
||||||
|
const reindexProgress = {
|
||||||
|
get: jest.fn().mockResolvedValue(null),
|
||||||
|
};
|
||||||
|
const aiQueue = {};
|
||||||
|
|
||||||
|
const service = new AiSettingsService(
|
||||||
|
workspaceRepo as unknown as WorkspaceRepo,
|
||||||
|
aiAgentRoleRepo as unknown as AiAgentRoleRepo,
|
||||||
|
aiProviderCredentialsRepo as unknown as AiProviderCredentialsRepo,
|
||||||
|
pageEmbeddingRepo as unknown as PageEmbeddingRepo,
|
||||||
|
pageRepo as unknown as PageRepo,
|
||||||
|
secretBox as unknown as SecretBoxService,
|
||||||
|
reindexProgress as unknown as EmbeddingReindexProgressService,
|
||||||
|
aiQueue as unknown as Queue,
|
||||||
|
);
|
||||||
|
return { service, reindexProgress, pageEmbeddingRepo };
|
||||||
|
}
|
||||||
|
|
||||||
|
it('reports the live run numbers when a reindex progress record is active', async () => {
|
||||||
|
const { service, reindexProgress } = makeService();
|
||||||
|
// Use a progress.total (500) DISTINCT from the DB count (478) so the test
|
||||||
|
// actually pins the progress.total branch rather than coincidentally
|
||||||
|
// matching the DB fallback. With fix #1 the two sources agree in practice,
|
||||||
|
// but getMasked must still return progress.total when a record is active.
|
||||||
|
reindexProgress.get.mockResolvedValue({
|
||||||
|
total: 500,
|
||||||
|
done: 120,
|
||||||
|
startedAt: Date.now(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const masked = await service.getMasked(WORKSPACE_ID);
|
||||||
|
|
||||||
|
expect(masked.indexedPages).toBe(120); // progress.done, not DB 478
|
||||||
|
expect(masked.totalPages).toBe(500); // progress.total, not DB 478
|
||||||
|
expect(masked.reindexing).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('falls back to countIndexedPages when no reindex is active', async () => {
|
||||||
|
const { service, reindexProgress } = makeService();
|
||||||
|
reindexProgress.get.mockResolvedValue(null);
|
||||||
|
|
||||||
|
const masked = await service.getMasked(WORKSPACE_ID);
|
||||||
|
|
||||||
|
expect(masked.indexedPages).toBe(478);
|
||||||
|
expect(masked.totalPages).toBe(478);
|
||||||
|
expect(masked.reindexing).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* reindex() must seed a live progress record (done=0) BEFORE enqueueing so the
|
||||||
|
* first status poll shows 0 — but ONLY when no run is already active, since
|
||||||
|
* aiQueue.add() de-duplicates a running reindex and a re-seed would reset the
|
||||||
|
* visible counter to 0 while the live worker keeps incrementing from its real
|
||||||
|
* position.
|
||||||
|
*/
|
||||||
|
describe('AiSettingsService.reindex progress seed', () => {
|
||||||
|
const WORKSPACE_ID = 'ws-1';
|
||||||
|
|
||||||
|
function makeService() {
|
||||||
|
const order: string[] = [];
|
||||||
|
const aiQueue = {
|
||||||
|
remove: jest.fn().mockResolvedValue(undefined),
|
||||||
|
add: jest.fn().mockImplementation(async () => {
|
||||||
|
order.push('add');
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
const pageRepo = {
|
||||||
|
countEmbeddablePages: jest.fn().mockResolvedValue(478),
|
||||||
|
};
|
||||||
|
const reindexProgress = {
|
||||||
|
// Default: no active run -> seed should happen.
|
||||||
|
get: jest.fn().mockResolvedValue(null),
|
||||||
|
start: jest.fn().mockImplementation(async () => {
|
||||||
|
order.push('start');
|
||||||
|
}),
|
||||||
|
clear: jest.fn().mockResolvedValue(undefined),
|
||||||
|
};
|
||||||
|
|
||||||
|
const service = new AiSettingsService(
|
||||||
|
{} as unknown as WorkspaceRepo,
|
||||||
|
{} as unknown as AiAgentRoleRepo,
|
||||||
|
{} as unknown as AiProviderCredentialsRepo,
|
||||||
|
{} as unknown as PageEmbeddingRepo,
|
||||||
|
pageRepo as unknown as PageRepo,
|
||||||
|
{} as unknown as SecretBoxService,
|
||||||
|
reindexProgress as unknown as EmbeddingReindexProgressService,
|
||||||
|
aiQueue as unknown as Queue,
|
||||||
|
);
|
||||||
|
return { service, aiQueue, pageRepo, reindexProgress, order };
|
||||||
|
}
|
||||||
|
|
||||||
|
it('seeds progress (workspace, count) BEFORE enqueue when no run is active', async () => {
|
||||||
|
const { service, aiQueue, reindexProgress, order } = makeService();
|
||||||
|
|
||||||
|
await service.reindex(WORKSPACE_ID);
|
||||||
|
|
||||||
|
// The pre-seed carries the real page count AND a SHORT ttl (3rd arg) so a
|
||||||
|
// de-duplicated enqueue against a just-finishing job can't leave a phantom
|
||||||
|
// "reindexing: 0 of N" stuck for the full record TTL (F10).
|
||||||
|
expect(reindexProgress.start).toHaveBeenCalledWith(
|
||||||
|
WORKSPACE_ID,
|
||||||
|
478,
|
||||||
|
expect.any(Number),
|
||||||
|
);
|
||||||
|
const ttl = reindexProgress.start.mock.calls[0][2];
|
||||||
|
expect(ttl).toBeGreaterThan(0);
|
||||||
|
// Short pre-seed TTL, distinct from the full 1h (3600s) record TTL, but
|
||||||
|
// pinned to the client poll cap (120s) so a still-pending run can't expire
|
||||||
|
// into a false "done" while the client is still polling (F11).
|
||||||
|
expect(ttl).toBe(120);
|
||||||
|
expect(ttl).toBeLessThanOrEqual(120);
|
||||||
|
expect(aiQueue.add).toHaveBeenCalledTimes(1);
|
||||||
|
// Seed must precede the enqueue so the first poll already reports done=0.
|
||||||
|
expect(order).toEqual(['start', 'add']);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does NOT re-seed when a run is already active (mid-run re-trigger)', async () => {
|
||||||
|
const { service, aiQueue, reindexProgress } = makeService();
|
||||||
|
// An active record exists -> a second click must not reset the counter.
|
||||||
|
reindexProgress.get.mockResolvedValue({
|
||||||
|
total: 478,
|
||||||
|
done: 120,
|
||||||
|
startedAt: Date.now(),
|
||||||
|
});
|
||||||
|
|
||||||
|
await service.reindex(WORKSPACE_ID);
|
||||||
|
|
||||||
|
expect(reindexProgress.start).not.toHaveBeenCalled();
|
||||||
|
// The enqueue still runs (and de-duplicates against the active job).
|
||||||
|
expect(aiQueue.add).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('clears the seed it just wrote and re-throws when enqueue fails', async () => {
|
||||||
|
const { service, aiQueue, reindexProgress } = makeService();
|
||||||
|
// This call seeds (get() is null) but the enqueue then blows up
|
||||||
|
// (Redis hiccup/shutdown) -> the worker never runs and never clear()s, so
|
||||||
|
// reindex() must roll back its own seed to avoid a 1h stuck "reindexing".
|
||||||
|
const boom = new Error('redis down');
|
||||||
|
aiQueue.add.mockRejectedValue(boom);
|
||||||
|
|
||||||
|
await expect(service.reindex(WORKSPACE_ID)).rejects.toBe(boom);
|
||||||
|
|
||||||
|
expect(reindexProgress.start).toHaveBeenCalledWith(
|
||||||
|
WORKSPACE_ID,
|
||||||
|
478,
|
||||||
|
expect.any(Number),
|
||||||
|
);
|
||||||
|
expect(reindexProgress.clear).toHaveBeenCalledWith(WORKSPACE_ID);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does NOT clear a concurrent active run when enqueue fails (no seed)', async () => {
|
||||||
|
const { service, aiQueue, reindexProgress } = makeService();
|
||||||
|
// A run is already active, so THIS call does not seed; if the enqueue then
|
||||||
|
// fails it must NOT wipe the live worker's record.
|
||||||
|
reindexProgress.get.mockResolvedValue({
|
||||||
|
total: 478,
|
||||||
|
done: 120,
|
||||||
|
startedAt: Date.now(),
|
||||||
|
});
|
||||||
|
const boom = new Error('redis down');
|
||||||
|
aiQueue.add.mockRejectedValue(boom);
|
||||||
|
|
||||||
|
await expect(service.reindex(WORKSPACE_ID)).rejects.toBe(boom);
|
||||||
|
|
||||||
|
expect(reindexProgress.start).not.toHaveBeenCalled();
|
||||||
|
expect(reindexProgress.clear).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import { AiProviderCredentialsRepo } from '@docmost/db/repos/ai-chat/ai-provider
|
|||||||
import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo';
|
import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo';
|
||||||
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||||
import { SecretBoxService } from '../crypto/secret-box';
|
import { SecretBoxService } from '../crypto/secret-box';
|
||||||
|
import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service';
|
||||||
import {
|
import {
|
||||||
AiDriver,
|
AiDriver,
|
||||||
AiProviderSettings,
|
AiProviderSettings,
|
||||||
@@ -30,6 +31,30 @@ export function parsePositiveInt(raw: unknown): number | undefined {
|
|||||||
return Number.isFinite(n) && n > 0 ? Math.floor(n) : undefined;
|
return Number.isFinite(n) && n > 0 ? Math.floor(n) : undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TTL (seconds) for the enqueue-time progress PRE-SEED written by `reindex()`
|
||||||
|
* before the worker starts. Deliberately SHORT relative to the full 1h record
|
||||||
|
* TTL: if `aiQueue.add()` de-duplicates against a job that is just finishing
|
||||||
|
* (the worker's finally already ran `clear()` but removeOnComplete hasn't yet
|
||||||
|
* removed the job), no new worker runs to overwrite/clear this seed — so this
|
||||||
|
* shorter TTL lets the phantom "reindexing: 0 of N" expire instead of sticking
|
||||||
|
* for the full 1h record TTL. A worker that DOES start re-seeds with the full
|
||||||
|
* TTL, so a real run is unaffected.
|
||||||
|
*
|
||||||
|
* It MUST be >= the client poll cap (REINDEX_POLL_CAP_MS = 120000ms in
|
||||||
|
* ai-provider-settings.tsx) though: the AI_QUEUE worker runs at concurrency 1
|
||||||
|
* and shares the queue with page-level embedding jobs, so a queued reindex can
|
||||||
|
* wait well beyond a few dozen seconds before the worker re-seeds with the full
|
||||||
|
* TTL. If the pre-seed expired while the job is still pending, `get()` returns
|
||||||
|
* null and getMasked() falls back to the steady-state COUNT (indexedPages ==
|
||||||
|
* totalPages, reindexing=false) — the client reads that as "done & fully
|
||||||
|
* indexed", clears its deadline and STOPS polling, so the admin never sees the
|
||||||
|
* real climb. Pinning the pre-seed TTL to the client cap means a deduped phantom
|
||||||
|
* is bounded to ~120s — the same window the client already polls — and a genuine
|
||||||
|
* pending run never expires-into-"done" inside that window.
|
||||||
|
*/
|
||||||
|
const PRE_SEED_TTL_SECONDS = 120;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shape of the partial update accepted by `update`. Mirrors the validated
|
* Shape of the partial update accepted by `update`. Mirrors the validated
|
||||||
* controller DTO. `apiKey` / `embeddingApiKey` are write-only: undefined =
|
* controller DTO. `apiKey` / `embeddingApiKey` are write-only: undefined =
|
||||||
@@ -74,6 +99,7 @@ export class AiSettingsService {
|
|||||||
private readonly pageEmbeddingRepo: PageEmbeddingRepo,
|
private readonly pageEmbeddingRepo: PageEmbeddingRepo,
|
||||||
private readonly pageRepo: PageRepo,
|
private readonly pageRepo: PageRepo,
|
||||||
private readonly secretBox: SecretBoxService,
|
private readonly secretBox: SecretBoxService,
|
||||||
|
private readonly reindexProgress: EmbeddingReindexProgressService,
|
||||||
@InjectQueue(QueueName.AI_QUEUE) private readonly aiQueue: Queue,
|
@InjectQueue(QueueName.AI_QUEUE) private readonly aiQueue: Queue,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
@@ -100,21 +126,63 @@ export class AiSettingsService {
|
|||||||
.remove(`ai-search-disabled-${workspaceId}`)
|
.remove(`ai-search-disabled-${workspaceId}`)
|
||||||
.catch(() => undefined);
|
.catch(() => undefined);
|
||||||
|
|
||||||
|
// Seed a live progress record BEFORE enqueueing so the very first status
|
||||||
|
// poll already reports done=0 (the reindex POST returns the PRE-job counts,
|
||||||
|
// so without this seed the first poll would still show "total of total").
|
||||||
|
// `totalPages` uses countEmbeddablePages — the SAME set the worker iterates
|
||||||
|
// and the SAME denominator the status endpoint reports, so the live and
|
||||||
|
// steady-state totals match.
|
||||||
|
//
|
||||||
|
// ONLY seed when no run is active: aiQueue.add() de-duplicates an already-
|
||||||
|
// running reindex, so a mid-run re-trigger (second click / second admin /
|
||||||
|
// second tab) must NOT reset the visible counter to 0 — that would
|
||||||
|
// understate the live worker's real position for the rest of the run. The
|
||||||
|
// worker's own start() at run begin is the single authoritative reset.
|
||||||
|
let seeded = false;
|
||||||
|
if ((await this.reindexProgress.get(workspaceId)) === null) {
|
||||||
|
const totalPages = await this.pageRepo.countEmbeddablePages(workspaceId);
|
||||||
|
// Short TTL (vs the full 1h record TTL): if add() below de-duplicates
|
||||||
|
// against a just-finishing job whose worker already clear()ed but isn't
|
||||||
|
// removed yet, no worker runs to clear this seed — the shorter TTL expires
|
||||||
|
// the phantom record rather than leaving a stuck "reindexing: 0 of N" for
|
||||||
|
// the full record TTL. It is kept >= the client poll cap (120s) so a
|
||||||
|
// genuine but still-pending run never expires into a false "done" while
|
||||||
|
// the client is still polling (see PRE_SEED_TTL_SECONDS).
|
||||||
|
await this.reindexProgress.start(
|
||||||
|
workspaceId,
|
||||||
|
totalPages,
|
||||||
|
PRE_SEED_TTL_SECONDS,
|
||||||
|
);
|
||||||
|
seeded = true;
|
||||||
|
}
|
||||||
|
|
||||||
const jobId = `ai-reindex-${workspaceId}`;
|
const jobId = `ai-reindex-${workspaceId}`;
|
||||||
// Clear a prior non-active entry so a stale job can't block this reindex.
|
// Clear a prior non-active entry so a stale job can't block this reindex.
|
||||||
// A locked/active job is left in place (remove() no-ops) and the add() below
|
// A locked/active job is left in place (remove() no-ops) and the add() below
|
||||||
// de-duplicates against it, keeping the in-progress pass.
|
// de-duplicates against it, keeping the in-progress pass.
|
||||||
await this.aiQueue.remove(jobId).catch(() => undefined);
|
await this.aiQueue.remove(jobId).catch(() => undefined);
|
||||||
|
|
||||||
await this.aiQueue.add(
|
try {
|
||||||
QueueJob.WORKSPACE_CREATE_EMBEDDINGS,
|
await this.aiQueue.add(
|
||||||
{ workspaceId },
|
QueueJob.WORKSPACE_CREATE_EMBEDDINGS,
|
||||||
{
|
{ workspaceId },
|
||||||
jobId,
|
{
|
||||||
removeOnComplete: true,
|
jobId,
|
||||||
removeOnFail: true,
|
removeOnComplete: true,
|
||||||
},
|
removeOnFail: true,
|
||||||
);
|
},
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
// If the enqueue fails (Redis hiccup/shutdown) the worker never runs, so
|
||||||
|
// its finally->clear() never fires. Roll back the seed WE just wrote so
|
||||||
|
// the status endpoint doesn't report a stuck "reindexing: 0 of N" for the
|
||||||
|
// full TTL. Only clear when this call did the seed — never wipe a
|
||||||
|
// concurrent active run's record (get() was non-null, seeded=false).
|
||||||
|
if (seeded) {
|
||||||
|
await this.reindexProgress.clear(workspaceId);
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -253,13 +321,33 @@ export class AiSettingsService {
|
|||||||
hasSttApiKey = !!creds?.sttApiKeyEnc;
|
hasSttApiKey = !!creds?.sttApiKeyEnc;
|
||||||
}
|
}
|
||||||
|
|
||||||
// totalPages now counts only pages with embeddable content (non-empty text
|
// While a reindex run is active, report its LIVE progress (done climbs 0 ->
|
||||||
// or already-stored embeddings), so empty/text-less pages don't keep the
|
// total) so the settings UI can watch it advance. Read progress FIRST and
|
||||||
// "Indexed N of M pages" bar below 100% forever.
|
// short-circuit: this endpoint is polled every ~5s for the whole run, so when
|
||||||
const [indexedPages, totalPages] = await Promise.all([
|
// a record is active we skip the two coverage COUNTs entirely (their results
|
||||||
this.pageEmbeddingRepo.countIndexedPages(workspaceId),
|
// would be discarded anyway). Without the live progress the counter never
|
||||||
this.pageRepo.countEmbeddablePages(workspaceId),
|
// drops: the per-page reindex hard-replaces rows in its own small
|
||||||
]);
|
// transaction, so countIndexedPages stays ~= total for the whole run. With no
|
||||||
|
// active record we fall back to the steady-state DB coverage count, which
|
||||||
|
// preserves the existing display and the client's "done == total -> stop
|
||||||
|
// polling" condition (the run ends -> record cleared -> DB count == total).
|
||||||
|
//
|
||||||
|
// The fallback `totalPages` counts only pages with embeddable content
|
||||||
|
// (non-empty text, content-borne text, or already-stored embeddings), so
|
||||||
|
// empty/text-less pages don't keep the "Indexed N of M pages" bar below 100%
|
||||||
|
// forever.
|
||||||
|
const progress = await this.reindexProgress.get(workspaceId);
|
||||||
|
let indexedPages: number;
|
||||||
|
let totalPages: number;
|
||||||
|
if (progress) {
|
||||||
|
indexedPages = progress.done;
|
||||||
|
totalPages = progress.total;
|
||||||
|
} else {
|
||||||
|
[indexedPages, totalPages] = await Promise.all([
|
||||||
|
this.pageEmbeddingRepo.countIndexedPages(workspaceId),
|
||||||
|
this.pageRepo.countEmbeddablePages(workspaceId),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
driver: provider.driver,
|
driver: provider.driver,
|
||||||
@@ -281,6 +369,8 @@ export class AiSettingsService {
|
|||||||
hasSttApiKey,
|
hasSttApiKey,
|
||||||
indexedPages,
|
indexedPages,
|
||||||
totalPages,
|
totalPages,
|
||||||
|
// Optional hint for the client: a reindex run is currently in progress.
|
||||||
|
reindexing: progress != null,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import { QueueName } from '../queue/constants';
|
|||||||
import { AiService } from './ai.service';
|
import { AiService } from './ai.service';
|
||||||
import { AiSettingsService } from './ai-settings.service';
|
import { AiSettingsService } from './ai-settings.service';
|
||||||
import { AiSettingsController } from './ai-settings.controller';
|
import { AiSettingsController } from './ai-settings.controller';
|
||||||
|
import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* LLM driver + provider-settings unit (§6.2/§6.4).
|
* LLM driver + provider-settings unit (§6.2/§6.4).
|
||||||
@@ -19,7 +20,7 @@ import { AiSettingsController } from './ai-settings.controller';
|
|||||||
BullModule.registerQueue({ name: QueueName.AI_QUEUE }),
|
BullModule.registerQueue({ name: QueueName.AI_QUEUE }),
|
||||||
],
|
],
|
||||||
controllers: [AiSettingsController],
|
controllers: [AiSettingsController],
|
||||||
providers: [AiService, AiSettingsService],
|
providers: [AiService, AiSettingsService, EmbeddingReindexProgressService],
|
||||||
exports: [AiService, AiSettingsService],
|
exports: [AiService, AiSettingsService, EmbeddingReindexProgressService],
|
||||||
})
|
})
|
||||||
export class AiModule {}
|
export class AiModule {}
|
||||||
|
|||||||
@@ -146,4 +146,7 @@ export interface MaskedAiSettings {
|
|||||||
// RAG indexing coverage for the settings UI.
|
// RAG indexing coverage for the settings UI.
|
||||||
indexedPages: number;
|
indexedPages: number;
|
||||||
totalPages: number;
|
totalPages: number;
|
||||||
|
// True while a full workspace reindex is actively running (the counts above
|
||||||
|
// then reflect the live run progress rather than the steady-state DB count).
|
||||||
|
reindexing?: boolean;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,179 @@
|
|||||||
|
import { EmbeddingReindexProgressService } from './embedding-reindex-progress.service';
|
||||||
|
import type { RedisService } from '@nestjs-labs/nestjs-ioredis';
|
||||||
|
import type { Redis } from 'ioredis';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for the Redis-backed reindex-progress store.
|
||||||
|
*
|
||||||
|
* The store is a thin, BEST-EFFORT wrapper: writes (start/increment) issue an
|
||||||
|
* hset/hincrby + expire pipeline and must SWALLOW Redis errors (progress is
|
||||||
|
* cosmetic — it must never break a reindex); reads (get) must map a valid hash
|
||||||
|
* to a ReindexProgress and degrade to null on a malformed/missing record or a
|
||||||
|
* Redis failure. We drive it with a hand-rolled fake ioredis (the project mocks
|
||||||
|
* Redis with plain fakes, see public-share limiter specs).
|
||||||
|
*/
|
||||||
|
describe('EmbeddingReindexProgressService', () => {
|
||||||
|
const WORKSPACE_ID = 'ws-1';
|
||||||
|
const KEY = 'ai:reindex:progress:ws-1';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build a fake ioredis whose `multi()` returns a chainable recorder and whose
|
||||||
|
* `hgetall`/`del` are configurable jest mocks. `execImpl` lets a test make the
|
||||||
|
* pipeline reject (to assert error-swallowing).
|
||||||
|
*/
|
||||||
|
function makeRedis(opts: { execImpl?: () => Promise<unknown> } = {}) {
|
||||||
|
const exec = jest
|
||||||
|
.fn()
|
||||||
|
.mockImplementation(opts.execImpl ?? (() => Promise.resolve([])));
|
||||||
|
// mockReturnThis() returns the call's `this` (the multi object), so the
|
||||||
|
// chain hset().expire().exec() resolves correctly.
|
||||||
|
const multiObj = {
|
||||||
|
hset: jest.fn().mockReturnThis(),
|
||||||
|
hincrby: jest.fn().mockReturnThis(),
|
||||||
|
expire: jest.fn().mockReturnThis(),
|
||||||
|
exec,
|
||||||
|
};
|
||||||
|
const multi = jest.fn(() => multiObj);
|
||||||
|
const hgetall = jest.fn().mockResolvedValue({});
|
||||||
|
const del = jest.fn().mockResolvedValue(1);
|
||||||
|
const redis = { multi, hgetall, del } as unknown as Redis;
|
||||||
|
return { redis, multiObj, multi, hgetall, del, exec };
|
||||||
|
}
|
||||||
|
|
||||||
|
function makeService(redis: Redis) {
|
||||||
|
const redisService = {
|
||||||
|
getOrThrow: () => redis,
|
||||||
|
} as unknown as RedisService;
|
||||||
|
return new EmbeddingReindexProgressService(redisService);
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('get', () => {
|
||||||
|
it('maps a valid hash to a ReindexProgress object', async () => {
|
||||||
|
const { redis, hgetall } = makeRedis();
|
||||||
|
hgetall.mockResolvedValue({ total: '478', done: '120', startedAt: '1000' });
|
||||||
|
const service = makeService(redis);
|
||||||
|
|
||||||
|
await expect(service.get(WORKSPACE_ID)).resolves.toEqual({
|
||||||
|
total: 478,
|
||||||
|
done: 120,
|
||||||
|
startedAt: 1000,
|
||||||
|
});
|
||||||
|
expect(hgetall).toHaveBeenCalledWith(KEY);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns null for an empty hash (no record)', async () => {
|
||||||
|
const { redis, hgetall } = makeRedis();
|
||||||
|
hgetall.mockResolvedValue({});
|
||||||
|
await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns null when `total` is missing (partial record)', async () => {
|
||||||
|
const { redis, hgetall } = makeRedis();
|
||||||
|
hgetall.mockResolvedValue({ done: '5' });
|
||||||
|
await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns null for a non-numeric total', async () => {
|
||||||
|
const { redis, hgetall } = makeRedis();
|
||||||
|
hgetall.mockResolvedValue({ total: 'abc', done: '1', startedAt: '1' });
|
||||||
|
await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns null for a non-numeric done', async () => {
|
||||||
|
const { redis, hgetall } = makeRedis();
|
||||||
|
hgetall.mockResolvedValue({ total: '10', done: 'xyz', startedAt: '1' });
|
||||||
|
await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('coerces a non-finite startedAt to 0', async () => {
|
||||||
|
const { redis, hgetall } = makeRedis();
|
||||||
|
hgetall.mockResolvedValue({ total: '10', done: '2', startedAt: 'nope' });
|
||||||
|
await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toEqual({
|
||||||
|
total: 10,
|
||||||
|
done: 2,
|
||||||
|
startedAt: 0,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('degrades to null when hgetall throws (degradation contract)', async () => {
|
||||||
|
const { redis, hgetall } = makeRedis();
|
||||||
|
hgetall.mockRejectedValue(new Error('redis down'));
|
||||||
|
await expect(makeService(redis).get(WORKSPACE_ID)).resolves.toBeNull();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('start', () => {
|
||||||
|
it('issues hset + expire on the workspace key', async () => {
|
||||||
|
const { redis, multiObj } = makeRedis();
|
||||||
|
await makeService(redis).start(WORKSPACE_ID, 478);
|
||||||
|
|
||||||
|
expect(multiObj.hset).toHaveBeenCalledWith(
|
||||||
|
KEY,
|
||||||
|
expect.objectContaining({ total: '478', done: '0' }),
|
||||||
|
);
|
||||||
|
expect(multiObj.expire).toHaveBeenCalledWith(KEY, expect.any(Number));
|
||||||
|
expect(multiObj.exec).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('defaults the expire TTL to the full 1h record TTL', async () => {
|
||||||
|
const { redis, multiObj } = makeRedis();
|
||||||
|
await makeService(redis).start(WORKSPACE_ID, 478);
|
||||||
|
// Default ttl = full record TTL (60 * 60) so a real run never expires
|
||||||
|
// mid-flight before the worker refreshes it on each increment.
|
||||||
|
expect(multiObj.expire).toHaveBeenCalledWith(KEY, 60 * 60);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('honours an explicit short ttlSeconds for the enqueue-time pre-seed (F10)', async () => {
|
||||||
|
const { redis, multiObj } = makeRedis();
|
||||||
|
// The reindex() pre-seed passes a short ttl so a phantom record left by a
|
||||||
|
// de-duplicated enqueue expires in seconds, not after the full 1h TTL.
|
||||||
|
await makeService(redis).start(WORKSPACE_ID, 478, 45);
|
||||||
|
expect(multiObj.expire).toHaveBeenCalledWith(KEY, 45);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('swallows a thrown Redis error (best-effort)', async () => {
|
||||||
|
const { redis } = makeRedis({
|
||||||
|
execImpl: () => Promise.reject(new Error('redis down')),
|
||||||
|
});
|
||||||
|
await expect(
|
||||||
|
makeService(redis).start(WORKSPACE_ID, 1),
|
||||||
|
).resolves.toBeUndefined();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('increment', () => {
|
||||||
|
it('issues hincrby + expire on the workspace key', async () => {
|
||||||
|
const { redis, multiObj } = makeRedis();
|
||||||
|
await makeService(redis).increment(WORKSPACE_ID);
|
||||||
|
|
||||||
|
expect(multiObj.hincrby).toHaveBeenCalledWith(KEY, 'done', 1);
|
||||||
|
expect(multiObj.expire).toHaveBeenCalledWith(KEY, expect.any(Number));
|
||||||
|
expect(multiObj.exec).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('swallows a thrown Redis error (best-effort)', async () => {
|
||||||
|
const { redis } = makeRedis({
|
||||||
|
execImpl: () => Promise.reject(new Error('redis down')),
|
||||||
|
});
|
||||||
|
await expect(
|
||||||
|
makeService(redis).increment(WORKSPACE_ID),
|
||||||
|
).resolves.toBeUndefined();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('clear', () => {
|
||||||
|
it('deletes the workspace key', async () => {
|
||||||
|
const { redis, del } = makeRedis();
|
||||||
|
await makeService(redis).clear(WORKSPACE_ID);
|
||||||
|
expect(del).toHaveBeenCalledWith(KEY);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('swallows a thrown Redis error (best-effort)', async () => {
|
||||||
|
const { redis, del } = makeRedis();
|
||||||
|
del.mockRejectedValue(new Error('redis down'));
|
||||||
|
await expect(
|
||||||
|
makeService(redis).clear(WORKSPACE_ID),
|
||||||
|
).resolves.toBeUndefined();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,162 @@
|
|||||||
|
import { Injectable, Logger } from '@nestjs/common';
|
||||||
|
import { RedisService } from '@nestjs-labs/nestjs-ioredis';
|
||||||
|
import type { Redis } from 'ioredis';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Live progress of an in-flight workspace embeddings reindex run.
|
||||||
|
* `total` is the number of pages the run will process, `done` how many it has
|
||||||
|
* already processed (success OR handled failure), `startedAt` the epoch-ms the
|
||||||
|
* record was created.
|
||||||
|
*/
|
||||||
|
export interface ReindexProgress {
|
||||||
|
total: number;
|
||||||
|
done: number;
|
||||||
|
startedAt: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Redis key namespace for the per-workspace reindex-progress record. */
|
||||||
|
const KEY_PREFIX = 'ai:reindex:progress:';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TTL (seconds) on the progress record so a crashed/aborted worker that never
|
||||||
|
* reaches its `clear()` finally can still self-clean instead of leaving a stuck
|
||||||
|
* "reindexing" state. Refreshed on every increment so a long run never expires
|
||||||
|
* mid-flight; on a crash it disappears within TTL of the last processed page.
|
||||||
|
*
|
||||||
|
* INTENTIONALLY tied to WRITE progress (start/increment) only — never refreshed
|
||||||
|
* on get(). Refreshing on read would keep a dead worker's record alive forever
|
||||||
|
* as long as a client keeps polling (a permanently stuck reindexing:true). The
|
||||||
|
* clear() in the worker's finally handles normal completion; a dead worker's
|
||||||
|
* record expires after TTL, and the client's own poll cap stops polling anyway.
|
||||||
|
*/
|
||||||
|
const TTL_SECONDS = 60 * 60; // 1h
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cluster-wide store for the live progress of a workspace embeddings reindex.
|
||||||
|
*
|
||||||
|
* The reindex runs in a BullMQ worker (AI_QUEUE) that may be a DIFFERENT process
|
||||||
|
* than the API handling the settings-status GET, so the progress must live in
|
||||||
|
* the shared Redis — we reuse the same global ioredis client (RedisService from
|
||||||
|
* @nestjs-labs/nestjs-ioredis) that backs BullMQ and the other anti-abuse
|
||||||
|
* limiters, adding NO new Redis config.
|
||||||
|
*
|
||||||
|
* Everything here is best-effort and COSMETIC: progress only drives the "Indexed
|
||||||
|
* X of Y" counter while a reindex is running. Any Redis failure degrades to the
|
||||||
|
* existing steady-state behaviour (the status falls back to the DB coverage
|
||||||
|
* count), so reads fail to `null` and writes are swallowed — a reindex must
|
||||||
|
* never break because progress reporting did.
|
||||||
|
*
|
||||||
|
* Stored as a Redis HASH so `done` can be bumped with an atomic HINCRBY (the
|
||||||
|
* worker is the only writer of `done`, but HINCRBY also keeps us off a
|
||||||
|
* read-modify-write race and preserves the other fields).
|
||||||
|
*/
|
||||||
|
@Injectable()
|
||||||
|
export class EmbeddingReindexProgressService {
|
||||||
|
private readonly logger = new Logger(EmbeddingReindexProgressService.name);
|
||||||
|
private readonly redis: Redis;
|
||||||
|
|
||||||
|
constructor(redisService: RedisService) {
|
||||||
|
this.redis = redisService.getOrThrow();
|
||||||
|
}
|
||||||
|
|
||||||
|
private key(workspaceId: string): string {
|
||||||
|
return KEY_PREFIX + workspaceId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Begin (or reset) the progress record for a workspace: `total` pages, `done`
|
||||||
|
* back to 0, `startedAt` now. Called twice for a run, BOTH with the real page
|
||||||
|
* count (countEmbeddablePages) so the two totals coincide: once at reindex
|
||||||
|
* enqueue time (so the very first status poll already reports done=0) and again
|
||||||
|
* at the worker start (which re-asserts the same total and resets `done`).
|
||||||
|
* Resets `done` to 0 so a re-trigger never inherits a stale count.
|
||||||
|
*
|
||||||
|
* `ttlSeconds` lets the caller pick the record's lifetime. The enqueue-time
|
||||||
|
* pre-seed passes a SHORT ttl: if `aiQueue.add()` de-duplicates against a job
|
||||||
|
* that is just finishing (its worker hasn't yet removed the job but already
|
||||||
|
* ran its `clear()`), no new worker starts to clear this phantom seed, so a
|
||||||
|
* short ttl lets it expire in seconds instead of sticking for the full TTL.
|
||||||
|
* The worker's own `start()` at the begin of a real run overwrites this entry
|
||||||
|
* and raises the ttl back to the default full TTL.
|
||||||
|
*/
|
||||||
|
async start(
|
||||||
|
workspaceId: string,
|
||||||
|
total: number,
|
||||||
|
ttlSeconds: number = TTL_SECONDS,
|
||||||
|
): Promise<void> {
|
||||||
|
const key = this.key(workspaceId);
|
||||||
|
try {
|
||||||
|
await this.redis
|
||||||
|
.multi()
|
||||||
|
.hset(key, {
|
||||||
|
total: String(total),
|
||||||
|
done: '0',
|
||||||
|
startedAt: String(Date.now()),
|
||||||
|
})
|
||||||
|
.expire(key, ttlSeconds)
|
||||||
|
.exec();
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.warn(
|
||||||
|
`reindex-progress start failed for workspace ${workspaceId}; ` +
|
||||||
|
`progress reporting disabled for this run: ${(err as Error).message}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bump the processed-page counter by one and refresh the TTL. Atomic and
|
||||||
|
* best-effort: a missing key (cleared/expired) would be recreated with only
|
||||||
|
* `done`, but `get()` treats a record without a numeric `total` as inactive,
|
||||||
|
* so that partial state safely reads as "no active reindex".
|
||||||
|
*/
|
||||||
|
async increment(workspaceId: string): Promise<void> {
|
||||||
|
const key = this.key(workspaceId);
|
||||||
|
try {
|
||||||
|
await this.redis.multi().hincrby(key, 'done', 1).expire(key, TTL_SECONDS).exec();
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.warn(
|
||||||
|
`reindex-progress increment failed for workspace ${workspaceId}: ` +
|
||||||
|
`${(err as Error).message}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove the progress record. Called in the worker's `finally` so a completed,
|
||||||
|
* aborted, or unconfigured-early-return run never leaves a stuck record; the
|
||||||
|
* status then falls back to the DB coverage count.
|
||||||
|
*/
|
||||||
|
async clear(workspaceId: string): Promise<void> {
|
||||||
|
try {
|
||||||
|
await this.redis.del(this.key(workspaceId));
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.warn(
|
||||||
|
`reindex-progress clear failed for workspace ${workspaceId} ` +
|
||||||
|
`(self-cleans via TTL): ${(err as Error).message}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the live progress, or `null` when no reindex is active (no record, an
|
||||||
|
* expired record, or a partial record without a numeric `total`). On a Redis
|
||||||
|
* error returns `null` so the status endpoint degrades to its DB count.
|
||||||
|
*/
|
||||||
|
async get(workspaceId: string): Promise<ReindexProgress | null> {
|
||||||
|
try {
|
||||||
|
const data = await this.redis.hgetall(this.key(workspaceId));
|
||||||
|
if (!data || data.total === undefined) return null;
|
||||||
|
const total = Number(data.total);
|
||||||
|
const done = Number(data.done);
|
||||||
|
const startedAt = Number(data.startedAt);
|
||||||
|
if (!Number.isFinite(total) || !Number.isFinite(done)) return null;
|
||||||
|
return { total, done, startedAt: Number.isFinite(startedAt) ? startedAt : 0 };
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.warn(
|
||||||
|
`reindex-progress read failed for workspace ${workspaceId}; ` +
|
||||||
|
`falling back to DB count: ${(err as Error).message}`,
|
||||||
|
);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,159 @@
|
|||||||
|
import { Kysely } from 'kysely';
|
||||||
|
import { randomUUID } from 'node:crypto';
|
||||||
|
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||||
|
import { SpaceMemberRepo } from '@docmost/db/repos/space/space-member.repo';
|
||||||
|
import { EventEmitter2 } from '@nestjs/event-emitter';
|
||||||
|
import { getTestDb, destroyTestDb, createWorkspace, createSpace } from './db';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* `PageRepo.getEmbeddablePageIds` MUST stay in lockstep with
|
||||||
|
* `PageRepo.countEmbeddablePages` (page.repo.ts) — the bulk reindex iterates the
|
||||||
|
* ID set while the status endpoint reports the count as the live denominator, so
|
||||||
|
* if the two predicates ever diverge the "done X of Y" counter ends on the wrong
|
||||||
|
* total. Both share the SAME WHERE: a page qualifies iff it is non-deleted AND
|
||||||
|
* (text_content has a non-whitespace char OR it has a non-deleted embedding row).
|
||||||
|
*
|
||||||
|
* This is a DB-level invariant: the predicate lives in raw SQL (`text_content ~
|
||||||
|
* '[^[:space:]]'`) and an EXISTS subquery, so a unit test with mocked Kysely
|
||||||
|
* cannot observe it. We seed every boundary case against real Postgres and
|
||||||
|
* assert the returned ID set EQUALS the count (and is exactly the expected set).
|
||||||
|
* A future edit that touches one predicate but not the other turns this red.
|
||||||
|
*/
|
||||||
|
describe('PageRepo embeddable-page set: getEmbeddablePageIds <-> countEmbeddablePages [integration]', () => {
|
||||||
|
let db: Kysely<any>;
|
||||||
|
let repo: PageRepo;
|
||||||
|
let workspaceId: string;
|
||||||
|
let spaceId: string;
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
db = getTestDb();
|
||||||
|
// Only the Kysely-backed query methods under test are exercised, so the
|
||||||
|
// SpaceMemberRepo / EventEmitter2 deps are never touched — stub them.
|
||||||
|
repo = new PageRepo(
|
||||||
|
db as any,
|
||||||
|
{} as unknown as SpaceMemberRepo,
|
||||||
|
{} as unknown as EventEmitter2,
|
||||||
|
);
|
||||||
|
workspaceId = (await createWorkspace(db)).id;
|
||||||
|
spaceId = (await createSpace(db, workspaceId)).id;
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
await destroyTestDb();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Insert a page with explicit text_content / content / deleted_at (createPage
|
||||||
|
// in db.ts sets none), returning its id so the test can assert membership.
|
||||||
|
// `content` is the ProseMirror doc JSON (jsonb): postgres.js serializes a plain
|
||||||
|
// object to JSON for jsonb columns, so we pass it through only when supplied so
|
||||||
|
// the rest of the rows keep the DB default.
|
||||||
|
async function insertPage(args: {
|
||||||
|
textContent: string | null;
|
||||||
|
content?: unknown;
|
||||||
|
deletedAt?: Date | null;
|
||||||
|
}): Promise<string> {
|
||||||
|
const id = randomUUID();
|
||||||
|
await db
|
||||||
|
.insertInto('pages')
|
||||||
|
.values({
|
||||||
|
id,
|
||||||
|
slugId: `slug-${id.slice(0, 8)}`,
|
||||||
|
title: `page-${id.slice(0, 8)}`,
|
||||||
|
spaceId,
|
||||||
|
workspaceId,
|
||||||
|
textContent: args.textContent,
|
||||||
|
...(args.content !== undefined ? { content: args.content as any } : {}),
|
||||||
|
deletedAt: args.deletedAt ?? null,
|
||||||
|
})
|
||||||
|
.execute();
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert one embedding chunk row for a page (NOT NULL columns + deleted_at).
|
||||||
|
async function insertEmbedding(
|
||||||
|
pageId: string,
|
||||||
|
opts: { deletedAt?: Date | null } = {},
|
||||||
|
): Promise<void> {
|
||||||
|
await db
|
||||||
|
.insertInto('pageEmbeddings')
|
||||||
|
.values({
|
||||||
|
id: randomUUID(),
|
||||||
|
workspaceId,
|
||||||
|
pageId,
|
||||||
|
spaceId,
|
||||||
|
chunkIndex: 0,
|
||||||
|
chunkStart: 0,
|
||||||
|
chunkLength: 1,
|
||||||
|
content: 'x',
|
||||||
|
modelName: 'test-model',
|
||||||
|
modelDimensions: 1,
|
||||||
|
deletedAt: opts.deletedAt ?? null,
|
||||||
|
})
|
||||||
|
.execute();
|
||||||
|
}
|
||||||
|
|
||||||
|
it('returns exactly the embeddable set and its size equals countEmbeddablePages', async () => {
|
||||||
|
// IN the set --------------------------------------------------------------
|
||||||
|
// (a) non-deleted page with real body text.
|
||||||
|
const withText = await insertPage({ textContent: 'hello world' });
|
||||||
|
// (b) non-deleted page with NO text but a live embedding row (EXISTS clause:
|
||||||
|
// a page that lost its text yet still has stale vectors must be visited
|
||||||
|
// so the reindex can clear them).
|
||||||
|
const noTextLiveEmbedding = await insertPage({ textContent: null });
|
||||||
|
await insertEmbedding(noTextLiveEmbedding);
|
||||||
|
// (c) non-deleted page with EMPTY text_content but ProseMirror `content` JSON
|
||||||
|
// carrying a real text node — the content-JSON clause. This pins BOTH the
|
||||||
|
// third OR-clause AND the space-after-colon: jsonb stores the key/value
|
||||||
|
// separator as `"type": "text"` (a space after the colon), which is why
|
||||||
|
// the predicate needs `[[:space:]]*`. `reindexPage` extracts this text, so
|
||||||
|
// the page IS embeddable and the reindex MUST visit it.
|
||||||
|
const noTextContentDoc = await insertPage({
|
||||||
|
textContent: null,
|
||||||
|
content: {
|
||||||
|
type: 'doc',
|
||||||
|
content: [
|
||||||
|
{ type: 'paragraph', content: [{ type: 'text', text: 'hello' }] },
|
||||||
|
],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// OUT of the set ----------------------------------------------------------
|
||||||
|
// (d) non-deleted, text_content NULL, no embeddings.
|
||||||
|
await insertPage({ textContent: null });
|
||||||
|
// (e) non-deleted, whitespace-only text (regex requires a non-space char).
|
||||||
|
await insertPage({ textContent: ' \n\t ' });
|
||||||
|
// (f) deleted page WITH body text — excluded by the non-deleted predicate.
|
||||||
|
await insertPage({
|
||||||
|
textContent: 'deleted but had text',
|
||||||
|
deletedAt: new Date(),
|
||||||
|
});
|
||||||
|
// (g) non-deleted, no text, with ONLY a DELETED embedding row — the EXISTS
|
||||||
|
// subquery filters pe.deleted_at IS NULL, so this stays out.
|
||||||
|
const onlyDeletedEmbedding = await insertPage({ textContent: null });
|
||||||
|
await insertEmbedding(onlyDeletedEmbedding, { deletedAt: new Date() });
|
||||||
|
// (h) non-deleted, empty text_content, content JSON with ONLY a math atom
|
||||||
|
// node — its LaTeX lives in `attrs.text` (a `"text":` KEY, not a
|
||||||
|
// `"type":"text"` text node) and has no text serializer, so `jsonToText`
|
||||||
|
// yields nothing and the page produces zero embeddings. The predicate
|
||||||
|
// keys on the structural `"type":"text"` marker, so this stays OUT (a
|
||||||
|
// bare `"text":` match would wrongly inflate the denominator).
|
||||||
|
await insertPage({
|
||||||
|
textContent: null,
|
||||||
|
content: {
|
||||||
|
type: 'doc',
|
||||||
|
content: [{ type: 'mathBlock', attrs: { text: 'E=mc^2' } }],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const ids = await repo.getEmbeddablePageIds(workspaceId);
|
||||||
|
const count = await repo.countEmbeddablePages(workspaceId);
|
||||||
|
|
||||||
|
// The two queries agree on the size (the load-bearing lockstep invariant)...
|
||||||
|
expect(ids.length).toBe(count);
|
||||||
|
// ...and the set is exactly the three qualifying pages, nothing else.
|
||||||
|
expect(new Set(ids)).toEqual(
|
||||||
|
new Set([withText, noTextLiveEmbedding, noTextContentDoc]),
|
||||||
|
);
|
||||||
|
expect(count).toBe(3);
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user