diff --git a/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.tsx b/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.tsx index dac956c2..f9e5ee76 100644 --- a/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.tsx +++ b/apps/client/src/features/workspace/components/settings/components/ai-provider-settings.tsx @@ -198,16 +198,18 @@ export function nextReindexPollInterval(args: { if (now > deadline) return false; // Active run → keep polling even if the momentary counts already look full. if (status?.reindexing) return intervalMs; - // Finished and fully indexed (incl. an empty workspace, 0 >= 0) → stop. - if (status && status.indexedPages >= status.totalPages) return false; + // Finished and fully indexed (incl. an empty workspace, 0 >= 0) → stop. Reuse + // isReindexComplete so the completeness check lives in exactly one place. + if (isReindexComplete(status)) return false; // Within the deadline and not yet done → keep polling. return intervalMs; } /** * Whether the reindex poll deadline should be cleared: the server reports no - * active run AND the count is complete. Mirrors the stop condition of - * `nextReindexPollInterval` (sans the cap, which the effect handles via time). + * active run AND the count is complete. The single source of truth for the + * "reindex finished" check — `nextReindexPollInterval` reuses it for its stop + * condition (sans the cap, which the effect handles via time). */ export function isReindexComplete(status?: ReindexStatus): boolean { return ( diff --git a/apps/server/src/database/repos/page/page.repo.embeddable.spec.ts b/apps/server/src/database/repos/page/page.repo.embeddable.spec.ts new file mode 100644 index 00000000..1fb1828f --- /dev/null +++ b/apps/server/src/database/repos/page/page.repo.embeddable.spec.ts @@ -0,0 +1,167 @@ +import { PageRepo } from './page.repo'; +import { + DummyDriver, + Kysely, + PostgresAdapter, + PostgresIntrospector, + PostgresQueryCompiler, +} from 'kysely'; + +/** + * F6 regression guard for the embeddable-page predicate. + * + * The predicate is shared by `countEmbeddablePages` (the "Indexed N of M" coverage + * denominator) and `getEmbeddablePageIds` (the exact set a full reindex iterates). + * It MUST select pages whose `text_content` was never backfilled (null/empty) but + * whose ProseMirror `content` JSON still carries body text — `reindexPage` builds + * its chunks straight from `content`, so without a content clause such a page is + * silently SKIPPED by a mass reindex even though it is fully embeddable. + * + * The content clause keys on the structural text-node marker `"type":"text"`, NOT + * a bare `"text":` key. The bare key also appears as the `attrs.text` of atom + * nodes that carry NO extractable text — notably math (`mathBlock`/`mathInline`), + * whose LaTeX lives in `attrs.text` and has no `generateText` serializer. A + * math-ONLY page therefore yields empty `text_content` and zero embeddings; if the + * predicate matched its `attrs.text` it would land in the denominator but + * `reindexPage` would no-op on it, pinning "Indexed N of M" below 100% forever — + * the exact bug this feature fixes. The `"type":"text"` marker matches only real + * text nodes (what `jsonToText` extracts), keeping the predicate consistent with + * what gets indexed. + * + * There is no real Postgres here: a recording Kysely (DummyDriver wired to the + * Postgres query compiler) compiles the queries to SQL so we can assert the WHERE + * predicate ORs in the narrowed content clause alongside the existing text_content + * and stored-embeddings clauses — and that BOTH callers compile the identical + * clause (denominator and reindex set can never diverge). + */ +function makeRecordingDb() { + const sqls: string[] = []; + const db = new Kysely({ + dialect: { + createAdapter: () => new PostgresAdapter(), + createDriver: () => + new (class extends DummyDriver { + async acquireConnection() { + return { + executeQuery: async (compiled: { sql: string }) => { + sqls.push(compiled.sql); + return { rows: [] }; + }, + // eslint-disable-next-line @typescript-eslint/no-empty-function + streamQuery: async function* () {}, + } as any; + } + })(), + createIntrospector: (d: Kysely) => new PostgresIntrospector(d), + createQueryCompiler: () => new PostgresQueryCompiler(), + }, + }); + return { db, sqls }; +} + +// The narrowed content clause, as it appears in the compiled SQL. Keying on the +// structural `"type":"text"` marker (not a bare `"text":` key) is what excludes +// math-only pages whose only `"text"` key is the atom node's `attrs.text`. +const NARROWED_CLAUSE = `"type"[[:space:]]*:[[:space:]]*"text"`; +const BARE_TEXT_KEY = `"text"[[:space:]]*:`; + +describe('PageRepo embeddable predicate — content-bearing pages (F6)', () => { + it('selects content-bearing pages via the narrowed "type":"text" node marker', async () => { + const { db, sqls } = makeRecordingDb(); + const repo = new PageRepo(db as any, {} as any, { emit: jest.fn() } as any); + + await repo.getEmbeddablePageIds('ws-1'); + + expect(sqls).toHaveLength(1); + const sql = sqls[0]; + + // Clause 1 (existing): pages with extractable text_content. + expect(sql).toContain('text_content'); + // Clause 3 (the F6 fix, now narrowed): a page whose content JSON carries a + // real text node is selected even when text_content is null/empty, so a full + // reindex visits it instead of silently skipping it. + expect(sql).toContain('content::text'); + expect(sql).toContain(NARROWED_CLAUSE); + // It must NOT use the old bare `"text":` key, which also matches the + // `attrs.text` of math-only atom pages (false-positive denominator inflation). + expect(sql).not.toContain(BARE_TEXT_KEY); + // Clause 2 (existing): pages that already have stored embeddings stay in the + // set so a reindex can clear their stale rows. + expect(sql.toLowerCase()).toContain('embeddings'); + }); + + it('countEmbeddablePages compiles the SAME narrowed clause as getEmbeddablePageIds', async () => { + // Consistency is the core requirement: the denominator (countEmbeddablePages) + // and the reindex set (getEmbeddablePageIds) MUST share the identical + // predicate, else the live "done" counter and the steady-state total diverge. + const { db, sqls } = makeRecordingDb(); + const repo = new PageRepo(db as any, {} as any, { emit: jest.fn() } as any); + + await repo.countEmbeddablePages('ws-1'); + await repo.getEmbeddablePageIds('ws-1'); + + expect(sqls).toHaveLength(2); + const [countSql, idsSql] = sqls; + + // Both carry the narrowed content clause... + expect(countSql).toContain(NARROWED_CLAUSE); + expect(idsSql).toContain(NARROWED_CLAUSE); + // ...neither carries the bare key... + expect(countSql).not.toContain(BARE_TEXT_KEY); + expect(idsSql).not.toContain(BARE_TEXT_KEY); + // ...and the full OR predicate (text_content + content node + embeddings + // EXISTS) is byte-identical between the two queries, so they can't drift. + const where = (s: string) => s.slice(s.indexOf('where')); + expect(where(countSql)).toEqual(where(idsSql)); + }); + + it('the content regex matches a text-bearing doc but NOT a math-only doc', () => { + // Semantic check of the predicate against sample `content::text` payloads. + // Note: `jsonb::text` is NOT identical to JSON.stringify — Postgres renders a + // space after each colon (`"type": "text"`), which is exactly why the POSIX + // clause uses `[[:space:]]*`. The clause `"type"[[:space:]]*:[[:space:]]*"text"` + // maps to the JS regex below (`[[:space:]]` -> `\s`, tolerating both forms); + // we evaluate it the way Postgres would. + const re = /"type"\s*:\s*"text"/; + + // A real paragraph with a text node -> embeddable. + const textDoc = JSON.stringify({ + type: 'doc', + content: [ + { + type: 'paragraph', + content: [{ type: 'text', text: 'hello world' }], + }, + ], + }); + // A doc whose ONLY node is a math atom. Its LaTeX is in `attrs.text`, there is + // no text node, and `jsonToText`/`generateText` has no serializer for it -> it + // yields empty text_content and zero embeddings, so it must NOT qualify. + const mathOnlyDoc = JSON.stringify({ + type: 'doc', + content: [ + { type: 'mathBlock', attrs: { text: 'E = mc^2' } }, + { type: 'mathInline', attrs: { text: '\\alpha' } }, + ], + }); + // An empty doc has no text node either. + const emptyDoc = JSON.stringify({ type: 'doc', content: [] }); + + expect(re.test(textDoc)).toBe(true); + expect(re.test(mathOnlyDoc)).toBe(false); + expect(re.test(emptyDoc)).toBe(false); + // Sanity: the OLD bare-key regex WOULD have wrongly matched the math-only doc, + // which is precisely the false positive the narrowing removes. + expect(/"text"\s*:/.test(mathOnlyDoc)).toBe(true); + + // A user literally TYPING `"type":"text"` in prose can't false-positive on an + // otherwise text-less page: in `content::text` the typed value's quotes are + // escaped (`\"type\":\"text\"`), so the literal-quote regex does not match the + // escaped form. (And such a page is a genuine text node anyway.) + const escapedLiteral = JSON.stringify({ + type: 'doc', + content: [{ type: 'someAtom', attrs: { note: '"type":"text"' } }], + }); + expect(re.test(escapedLiteral)).toBe(false); + }); +}); diff --git a/apps/server/src/database/repos/page/page.repo.ts b/apps/server/src/database/repos/page/page.repo.ts index 72a979ce..c8626553 100644 --- a/apps/server/src/database/repos/page/page.repo.ts +++ b/apps/server/src/database/repos/page/page.repo.ts @@ -234,9 +234,9 @@ export class PageRepo { * text-less pages (which legitimately store zero embeddings) don't keep the * bar below 100% forever. * - * A page qualifies if it has non-empty textContent OR already has stored - * embeddings. The second clause covers pages whose text the indexer extracted - * from the content JSON when textContent was null, and guarantees this total is + * A page qualifies if it has non-empty textContent, OR its content JSON has at + * least one text node (`"type":"text"`) when textContent was never backfilled, + * OR it already has stored embeddings. The last clause guarantees this total is * always >= countIndexedPages (the indexed count can never exceed it). */ async countEmbeddablePages(workspaceId: string): Promise { @@ -259,8 +259,10 @@ export class PageRepo { * the trivial workspaceId/deletedAt filters inline; this returns only the * non-trivial OR clause, evaluated against the `p` alias of `pages`. * - * A page qualifies if it has non-empty textContent OR already has a stored - * (non-deleted) embedding row. + * A page qualifies if it has non-empty textContent, OR its ProseMirror + * `content` JSON has at least one text node (`"type":"text"`) even though + * textContent was never backfilled, OR it already has a stored (non-deleted) + * embedding row. */ private embeddablePredicate( eb: ExpressionBuilder, @@ -270,6 +272,25 @@ export class PageRepo { // character, mirroring the indexer's `text.trim().length === 0` check // (raw SQL -> use the snake_case column name). sql`p.text_content ~ '[^[:space:]]'`, + // OR the ProseMirror `content` JSON has at least one text node (`"type": + // "text"`) the indexer can extract, even when `text_content` is null/empty + // (never backfilled): `reindexPage` runs `jsonToText` (generateText) over + // `content`, which only emits the text of ProseMirror text nodes, so such a + // page IS embeddable and a full reindex MUST visit it (otherwise it is + // silently skipped). A text node always serialises as + // `{"type":"text","text":"..."}`, so we key on the structural `"type": + // "text"` marker — NOT a bare `"text":` key, which also appears as the + // `attrs.text` of atom nodes that carry NO extractable text (e.g. math + // `mathBlock`/`mathInline`, whose LaTeX lives in `attrs.text` and has no + // text serializer). A math-only page thus produces empty `text_content` and + // zero embeddings; matching its `attrs.text` here would wrongly inflate the + // denominator and keep "Indexed N of M" below 100% forever. An empty doc + // (no text nodes) has no `"type":"text"` and is correctly excluded. A user + // who literally types `"type":"text"` in their prose can't false-positive: + // in `content::text` that text value's quotes are escaped (`\"type\"...`), + // so the literal-quote regex won't match the escaped form (and such a page + // is a real text node anyway). + sql`p.content::text ~ '"type"[[:space:]]*:[[:space:]]*"text"'`, // OR already has at least one (non-deleted) embedding row. eb.exists( eb @@ -284,7 +305,9 @@ export class PageRepo { /** * IDs of the EMBEDDABLE page set for a workspace — the exact same set that * `countEmbeddablePages` counts (a page qualifies if it has non-empty - * textContent OR already has a stored embedding row). The bulk reindex + * textContent, OR content JSON with at least one text node (`"type":"text"`) + * and an empty/null textContent, OR already has a stored embedding row). The + * bulk reindex * iterates THIS set so the live "done" counter reaches exactly * `countEmbeddablePages` (the steady-state denominator), instead of iterating * every non-deleted page (which would push the denominator above the diff --git a/apps/server/src/integrations/ai/ai-settings.service.spec.ts b/apps/server/src/integrations/ai/ai-settings.service.spec.ts index 97f81b56..557a0566 100644 --- a/apps/server/src/integrations/ai/ai-settings.service.spec.ts +++ b/apps/server/src/integrations/ai/ai-settings.service.spec.ts @@ -172,7 +172,17 @@ describe('AiSettingsService.reindex progress seed', () => { await service.reindex(WORKSPACE_ID); - expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 478); + // The pre-seed carries the real page count AND a SHORT ttl (3rd arg) so a + // de-duplicated enqueue against a just-finishing job can't leave a phantom + // "reindexing: 0 of N" stuck for the full record TTL (F10). + expect(reindexProgress.start).toHaveBeenCalledWith( + WORKSPACE_ID, + 478, + expect.any(Number), + ); + const ttl = reindexProgress.start.mock.calls[0][2]; + expect(ttl).toBeGreaterThan(0); + expect(ttl).toBeLessThanOrEqual(60); // short, not the full 1h record TTL expect(aiQueue.add).toHaveBeenCalledTimes(1); // Seed must precede the enqueue so the first poll already reports done=0. expect(order).toEqual(['start', 'add']); @@ -204,7 +214,11 @@ describe('AiSettingsService.reindex progress seed', () => { await expect(service.reindex(WORKSPACE_ID)).rejects.toBe(boom); - expect(reindexProgress.start).toHaveBeenCalledWith(WORKSPACE_ID, 478); + expect(reindexProgress.start).toHaveBeenCalledWith( + WORKSPACE_ID, + 478, + expect.any(Number), + ); expect(reindexProgress.clear).toHaveBeenCalledWith(WORKSPACE_ID); }); diff --git a/apps/server/src/integrations/ai/ai-settings.service.ts b/apps/server/src/integrations/ai/ai-settings.service.ts index 7dc2238f..f8525871 100644 --- a/apps/server/src/integrations/ai/ai-settings.service.ts +++ b/apps/server/src/integrations/ai/ai-settings.service.ts @@ -31,6 +31,17 @@ export function parsePositiveInt(raw: unknown): number | undefined { return Number.isFinite(n) && n > 0 ? Math.floor(n) : undefined; } +/** + * TTL (seconds) for the enqueue-time progress PRE-SEED written by `reindex()` + * before the worker starts. Deliberately SHORT: if `aiQueue.add()` de-duplicates + * against a job that is just finishing (the worker's finally already ran + * `clear()` but removeOnComplete hasn't yet removed the job), no new worker runs + * to overwrite/clear this seed — so a short TTL lets the phantom "reindexing: + * 0 of N" expire in seconds instead of sticking for the full 1h record TTL. A + * worker that DOES start re-seeds with the full TTL, so a real run is unaffected. + */ +const PRE_SEED_TTL_SECONDS = 45; + /** * Shape of the partial update accepted by `update`. Mirrors the validated * controller DTO. `apiKey` / `embeddingApiKey` are write-only: undefined = @@ -117,7 +128,15 @@ export class AiSettingsService { let seeded = false; if ((await this.reindexProgress.get(workspaceId)) === null) { const totalPages = await this.pageRepo.countEmbeddablePages(workspaceId); - await this.reindexProgress.start(workspaceId, totalPages); + // Short TTL: if add() below de-duplicates against a just-finishing job + // whose worker already clear()ed but isn't removed yet, no worker runs to + // clear this seed — the short TTL expires the phantom record in seconds + // rather than leaving a stuck "reindexing: 0 of N" for the full record TTL. + await this.reindexProgress.start( + workspaceId, + totalPages, + PRE_SEED_TTL_SECONDS, + ); seeded = true; } @@ -286,22 +305,33 @@ export class AiSettingsService { hasSttApiKey = !!creds?.sttApiKeyEnc; } - // totalPages now counts only pages with embeddable content (non-empty text - // or already-stored embeddings), so empty/text-less pages don't keep the - // "Indexed N of M pages" bar below 100% forever. - const [indexedPages, totalPages] = await Promise.all([ - this.pageEmbeddingRepo.countIndexedPages(workspaceId), - this.pageRepo.countEmbeddablePages(workspaceId), - ]); - // While a reindex run is active, report its LIVE progress (done climbs 0 -> - // total) so the settings UI can watch it advance. Without this the counter - // never drops: the per-page reindex hard-replaces rows in its own small - // transaction, so countIndexedPages stays ~= total for the whole run. With - // no active record we fall back to the steady-state DB coverage count, which + // total) so the settings UI can watch it advance. Read progress FIRST and + // short-circuit: this endpoint is polled every ~5s for the whole run, so when + // a record is active we skip the two coverage COUNTs entirely (their results + // would be discarded anyway). Without the live progress the counter never + // drops: the per-page reindex hard-replaces rows in its own small + // transaction, so countIndexedPages stays ~= total for the whole run. With no + // active record we fall back to the steady-state DB coverage count, which // preserves the existing display and the client's "done == total -> stop // polling" condition (the run ends -> record cleared -> DB count == total). + // + // The fallback `totalPages` counts only pages with embeddable content + // (non-empty text, content-borne text, or already-stored embeddings), so + // empty/text-less pages don't keep the "Indexed N of M pages" bar below 100% + // forever. const progress = await this.reindexProgress.get(workspaceId); + let indexedPages: number; + let totalPages: number; + if (progress) { + indexedPages = progress.done; + totalPages = progress.total; + } else { + [indexedPages, totalPages] = await Promise.all([ + this.pageEmbeddingRepo.countIndexedPages(workspaceId), + this.pageRepo.countEmbeddablePages(workspaceId), + ]); + } return { driver: provider.driver, @@ -321,8 +351,8 @@ export class AiSettingsService { hasApiKey, hasEmbeddingApiKey, hasSttApiKey, - indexedPages: progress ? progress.done : indexedPages, - totalPages: progress ? progress.total : totalPages, + indexedPages, + totalPages, // Optional hint for the client: a reindex run is currently in progress. reindexing: progress != null, }; diff --git a/apps/server/src/integrations/ai/embedding-reindex-progress.service.spec.ts b/apps/server/src/integrations/ai/embedding-reindex-progress.service.spec.ts index 2df8826c..496d55e8 100644 --- a/apps/server/src/integrations/ai/embedding-reindex-progress.service.spec.ts +++ b/apps/server/src/integrations/ai/embedding-reindex-progress.service.spec.ts @@ -115,6 +115,22 @@ describe('EmbeddingReindexProgressService', () => { expect(multiObj.exec).toHaveBeenCalledTimes(1); }); + it('defaults the expire TTL to the full 1h record TTL', async () => { + const { redis, multiObj } = makeRedis(); + await makeService(redis).start(WORKSPACE_ID, 478); + // Default ttl = full record TTL (60 * 60) so a real run never expires + // mid-flight before the worker refreshes it on each increment. + expect(multiObj.expire).toHaveBeenCalledWith(KEY, 60 * 60); + }); + + it('honours an explicit short ttlSeconds for the enqueue-time pre-seed (F10)', async () => { + const { redis, multiObj } = makeRedis(); + // The reindex() pre-seed passes a short ttl so a phantom record left by a + // de-duplicated enqueue expires in seconds, not after the full 1h TTL. + await makeService(redis).start(WORKSPACE_ID, 478, 45); + expect(multiObj.expire).toHaveBeenCalledWith(KEY, 45); + }); + it('swallows a thrown Redis error (best-effort)', async () => { const { redis } = makeRedis({ execImpl: () => Promise.reject(new Error('redis down')), diff --git a/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts b/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts index 2d62fd65..2f551f0d 100644 --- a/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts +++ b/apps/server/src/integrations/ai/embedding-reindex-progress.service.ts @@ -65,12 +65,25 @@ export class EmbeddingReindexProgressService { /** * Begin (or reset) the progress record for a workspace: `total` pages, `done` - * back to 0, `startedAt` now. Called at reindex enqueue time (placeholder - * total, so the very first status poll already reports done=0) and again at - * the worker start (overwriting `total` with the real page count). Resets - * `done` to 0 so a re-trigger never inherits a stale count. + * back to 0, `startedAt` now. Called twice for a run, BOTH with the real page + * count (countEmbeddablePages) so the two totals coincide: once at reindex + * enqueue time (so the very first status poll already reports done=0) and again + * at the worker start (which re-asserts the same total and resets `done`). + * Resets `done` to 0 so a re-trigger never inherits a stale count. + * + * `ttlSeconds` lets the caller pick the record's lifetime. The enqueue-time + * pre-seed passes a SHORT ttl: if `aiQueue.add()` de-duplicates against a job + * that is just finishing (its worker hasn't yet removed the job but already + * ran its `clear()`), no new worker starts to clear this phantom seed, so a + * short ttl lets it expire in seconds instead of sticking for the full TTL. + * The worker's own `start()` at the begin of a real run overwrites this entry + * and raises the ttl back to the default full TTL. */ - async start(workspaceId: string, total: number): Promise { + async start( + workspaceId: string, + total: number, + ttlSeconds: number = TTL_SECONDS, + ): Promise { const key = this.key(workspaceId); try { await this.redis @@ -80,7 +93,7 @@ export class EmbeddingReindexProgressService { done: '0', startedAt: String(Date.now()), }) - .expire(key, TTL_SECONDS) + .expire(key, ttlSeconds) .exec(); } catch (err) { this.logger.warn(