fix(ai-embedding): abort bulk reindex on fatal provider errors
reindexWorkspace isolated every per-page failure, so an invalid/missing API key (401 "User not found") made all pages fail identically while the batch kept issuing hundreds of doomed requests against the provider. Add isFatalProviderError() (401/403 auth, 402 billing) and abort the whole batch on such errors; 429 rate-limit and embedding timeouts stay per-page isolated. Adds unit tests for the predicate and a regression test for the abort/iterate control flow. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,80 @@
|
|||||||
|
import { EmbeddingIndexerService } from './embedding-indexer.service';
|
||||||
|
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||||
|
import { PageEmbeddingRepo } from '@docmost/db/repos/ai-chat/page-embedding.repo';
|
||||||
|
import { KyselyDB } from '@docmost/db/types/kysely.types';
|
||||||
|
import { AiService } from '../../../integrations/ai/ai.service';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for EmbeddingIndexerService.reindexWorkspace's batch control flow.
|
||||||
|
*
|
||||||
|
* The constructor body only stores its deps, so the service can be unit-built
|
||||||
|
* with lightweight mocks — no Nest module graph. We stub only the methods that
|
||||||
|
* reindexWorkspace actually touches:
|
||||||
|
* - aiService.getEmbeddingModel -> a model string so the up-front configured
|
||||||
|
* check passes,
|
||||||
|
* - pageRepo.getIdsByWorkspace -> three page ids,
|
||||||
|
* - service.reindexPage -> spied per test to drive the per-page outcome.
|
||||||
|
*
|
||||||
|
* The point under test is the catch block: a FATAL provider error (auth/billing)
|
||||||
|
* must abort the whole batch (re-throw, stop iterating), while a non-fatal error
|
||||||
|
* keeps per-page isolation (failed++, continue to the next page).
|
||||||
|
*/
|
||||||
|
describe('EmbeddingIndexerService.reindexWorkspace fail-fast', () => {
|
||||||
|
const WORKSPACE_ID = 'ws-1';
|
||||||
|
|
||||||
|
function makeService() {
|
||||||
|
const pageRepo = {
|
||||||
|
getIdsByWorkspace: jest.fn().mockResolvedValue(['p1', 'p2', 'p3']),
|
||||||
|
};
|
||||||
|
const pageEmbeddingRepo = {};
|
||||||
|
const aiService = {
|
||||||
|
getEmbeddingModel: jest.fn().mockResolvedValue('some-model'),
|
||||||
|
};
|
||||||
|
const db = {};
|
||||||
|
|
||||||
|
const service = new EmbeddingIndexerService(
|
||||||
|
pageRepo as unknown as PageRepo,
|
||||||
|
pageEmbeddingRepo as unknown as PageEmbeddingRepo,
|
||||||
|
aiService as unknown as AiService,
|
||||||
|
db as unknown as KyselyDB,
|
||||||
|
);
|
||||||
|
return { service, pageRepo, aiService };
|
||||||
|
}
|
||||||
|
|
||||||
|
it('aborts after the first page on a FATAL (401) provider error', async () => {
|
||||||
|
const { service } = makeService();
|
||||||
|
// A 401 "User not found" recurs identically on every page -> must abort.
|
||||||
|
const reindexPage = jest
|
||||||
|
.spyOn(service, 'reindexPage')
|
||||||
|
.mockRejectedValue({ statusCode: 401, message: 'User not found' });
|
||||||
|
|
||||||
|
await expect(service.reindexWorkspace(WORKSPACE_ID)).rejects.toMatchObject({
|
||||||
|
statusCode: 401,
|
||||||
|
});
|
||||||
|
// Aborted on the first page: pages 2 and 3 were never attempted.
|
||||||
|
expect(reindexPage).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('keeps per-page isolation on a non-fatal error (plain Error, no statusCode)', async () => {
|
||||||
|
const { service } = makeService();
|
||||||
|
// No statusCode -> non-fatal -> isolate per page and continue.
|
||||||
|
const reindexPage = jest
|
||||||
|
.spyOn(service, 'reindexPage')
|
||||||
|
.mockRejectedValue(new Error('boom'));
|
||||||
|
|
||||||
|
// Resolves (does not throw) even though every page failed.
|
||||||
|
await expect(service.reindexWorkspace(WORKSPACE_ID)).resolves.toBeUndefined();
|
||||||
|
// All three pages were attempted despite the failures.
|
||||||
|
expect(reindexPage).toHaveBeenCalledTimes(3);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('processes every page on the all-success path', async () => {
|
||||||
|
const { service } = makeService();
|
||||||
|
const reindexPage = jest
|
||||||
|
.spyOn(service, 'reindexPage')
|
||||||
|
.mockResolvedValue(undefined);
|
||||||
|
|
||||||
|
await expect(service.reindexWorkspace(WORKSPACE_ID)).resolves.toBeUndefined();
|
||||||
|
expect(reindexPage).toHaveBeenCalledTimes(3);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -10,7 +10,10 @@ import { InjectKysely } from 'nestjs-kysely';
|
|||||||
import { executeTx } from '@docmost/db/utils';
|
import { executeTx } from '@docmost/db/utils';
|
||||||
import { AiService } from '../../../integrations/ai/ai.service';
|
import { AiService } from '../../../integrations/ai/ai.service';
|
||||||
import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-embedding-not-configured.exception';
|
import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-embedding-not-configured.exception';
|
||||||
import { describeProviderError } from '../../../integrations/ai/ai-error.util';
|
import {
|
||||||
|
describeProviderError,
|
||||||
|
isFatalProviderError,
|
||||||
|
} from '../../../integrations/ai/ai-error.util';
|
||||||
import { jsonToText } from '../../../collaboration/collaboration.util';
|
import { jsonToText } from '../../../collaboration/collaboration.util';
|
||||||
|
|
||||||
// NOTE: the `page_embeddings.embedding` column is now dimension-agnostic
|
// NOTE: the `page_embeddings.embedding` column is now dimension-agnostic
|
||||||
@@ -229,8 +232,19 @@ export class EmbeddingIndexerService {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
// Per-page isolation: one failure (incl. an embedding timeout) must not
|
// A fatal provider error (invalid/missing key, no credits) recurs
|
||||||
// abort the whole batch.
|
// identically on EVERY remaining page. Abort the whole batch instead of
|
||||||
|
// issuing hundreds of doomed requests against the provider.
|
||||||
|
if (isFatalProviderError(err)) {
|
||||||
|
this.logger.error(
|
||||||
|
`reindexWorkspace: aborting at [${position}/${total}] for workspace ` +
|
||||||
|
`${workspaceId} — fatal provider error, remaining pages would fail ` +
|
||||||
|
`identically: ${describeProviderError(err)}`,
|
||||||
|
);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
// Per-page isolation: one non-fatal failure (incl. an embedding timeout)
|
||||||
|
// must not abort the whole batch.
|
||||||
failed++;
|
failed++;
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
`reindexWorkspace: [${position}/${total}] failed to reindex page ${pageId} ` +
|
`reindexWorkspace: [${position}/${total}] failed to reindex page ${pageId} ` +
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { describeProviderError } from './ai-error.util';
|
import { describeProviderError, isFatalProviderError } from './ai-error.util';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests for describeProviderError: the shared formatter used both for the
|
* Unit tests for describeProviderError: the shared formatter used both for the
|
||||||
@@ -129,3 +129,58 @@ describe('describeProviderError', () => {
|
|||||||
expect(describeProviderError({ foo: 'bar' } as never)).toBe('Unknown error');
|
expect(describeProviderError({ foo: 'bar' } as never)).toBe('Unknown error');
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for isFatalProviderError: the predicate that decides whether a
|
||||||
|
* provider error should abort an ENTIRE batch (bulk reindex) rather than be
|
||||||
|
* isolated per item. Authentication (401/403) and billing (402) recur
|
||||||
|
* identically on every request and are fatal; a 429 rate limit is transient and
|
||||||
|
* intentionally NOT fatal (handled by per-item isolation / backoff). Anything
|
||||||
|
* without a recognised numeric statusCode is non-fatal.
|
||||||
|
*/
|
||||||
|
describe('isFatalProviderError', () => {
|
||||||
|
it('returns true for authentication errors (401/403)', () => {
|
||||||
|
expect(isFatalProviderError({ statusCode: 401, message: 'User not found' })).toBe(
|
||||||
|
true,
|
||||||
|
);
|
||||||
|
expect(isFatalProviderError({ statusCode: 403, message: 'Forbidden' })).toBe(
|
||||||
|
true,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns true for a billing error (402)', () => {
|
||||||
|
expect(
|
||||||
|
isFatalProviderError({ statusCode: 402, message: 'Payment Required' }),
|
||||||
|
).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns false for a 429 rate limit (transient, intentionally non-fatal)', () => {
|
||||||
|
expect(
|
||||||
|
isFatalProviderError({ statusCode: 429, message: 'Too Many Requests' }),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns false for a 500 server error', () => {
|
||||||
|
expect(isFatalProviderError({ statusCode: 500, message: 'Server error' })).toBe(
|
||||||
|
false,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns false for an embedding timeout (plain Error, no statusCode)', () => {
|
||||||
|
expect(
|
||||||
|
isFatalProviderError(new Error('Embedding request timed out after 60000ms')),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns false for non-object errors (null/undefined/string/number)', () => {
|
||||||
|
expect(isFatalProviderError(null)).toBe(false);
|
||||||
|
expect(isFatalProviderError(undefined)).toBe(false);
|
||||||
|
expect(isFatalProviderError('boom')).toBe(false);
|
||||||
|
// A bare numeric 401 is NOT an object carrying a statusCode field.
|
||||||
|
expect(isFatalProviderError(401)).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns false for an object without a statusCode', () => {
|
||||||
|
expect(isFatalProviderError({})).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@@ -47,6 +47,20 @@ export function describeProviderError(
|
|||||||
return label ? `${label} — ${detail}` : detail;
|
return label ? `${label} — ${detail}` : detail;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether a provider error is FATAL for an ENTIRE batch operation rather than
|
||||||
|
* specific to one item. Authentication (401/403 — invalid or missing API key)
|
||||||
|
* and billing (402 — insufficient credits/quota) failures recur identically on
|
||||||
|
* every subsequent request, so a bulk reindex should abort immediately instead
|
||||||
|
* of issuing hundreds of doomed calls. A 429 rate limit is intentionally NOT
|
||||||
|
* fatal: it is transient and better handled by per-item isolation / backoff.
|
||||||
|
*/
|
||||||
|
export function isFatalProviderError(err: unknown): boolean {
|
||||||
|
if (typeof err !== 'object' || err === null) return false;
|
||||||
|
const status = (err as { statusCode?: number }).statusCode;
|
||||||
|
return status === 401 || status === 403 || status === 402;
|
||||||
|
}
|
||||||
|
|
||||||
// Map a small set of well-known provider HTTP statuses to a clear,
|
// Map a small set of well-known provider HTTP statuses to a clear,
|
||||||
// human-readable cause. Returns null for anything else so the existing
|
// human-readable cause. Returns null for anything else so the existing
|
||||||
// "<status>: <message> | response body: …" output is preserved unchanged.
|
// "<status>: <message> | response body: …" output is preserved unchanged.
|
||||||
|
|||||||
Reference in New Issue
Block a user