fix(ai): make RAG indexer observable and bound hung embedding calls
The bulk embedding reindex could hang on a single page forever
("Indexed 27 of 34 pages") with zero log output:
- all progress logs were debug-level, suppressed in production (pino info);
- embedMany() had no timeout, so a slow/hung embeddings endpoint blocked
the sequential per-page loop indefinitely.
Changes:
- ai.service.embedTexts: bound embedMany with AbortSignal.timeout
(configurable via AI_EMBEDDING_TIMEOUT_MS, default 120000ms); on timeout
throw a clear, greppable message, classified by both signal.aborted and
the error name (TimeoutError/AbortError/ResponseAborted) so a real
provider error racing the timer keeps its diagnostics.
- embedding-indexer.reindexWorkspace: promote lifecycle/progress logs to
info; log "[i/N] indexing page <id>" BEFORE the await so a hang names the
stuck page; warn on slow pages (>30s); add timing + final summary.
- .env.example: document AI_EMBEDDING_TIMEOUT_MS.
This commit is contained in:
@@ -77,3 +77,7 @@ MCP_DOCMOST_PASSWORD=
|
||||
# the workspace MCP toggle and network isolation (do not expose the port publicly).
|
||||
# MCP_TOKEN=
|
||||
# MCP_SESSION_IDLE_MS=1800000
|
||||
|
||||
# Per-embedding-call timeout in milliseconds for the RAG indexer.
|
||||
# A slow/hung embeddings endpoint fails after this and the batch continues.
|
||||
# AI_EMBEDDING_TIMEOUT_MS=120000
|
||||
|
||||
@@ -25,6 +25,10 @@ import { jsonToText } from '../../../collaboration/collaboration.util';
|
||||
const CHUNK_SIZE = 1000;
|
||||
const CHUNK_OVERLAP = 200;
|
||||
|
||||
// A single page taking longer than this during a bulk reindex is logged at
|
||||
// WARN as an early "slow page" signal before the hard embedding timeout.
|
||||
const SLOW_PAGE_MS = 30_000;
|
||||
|
||||
/**
|
||||
* Vector-RAG indexer (§6.7 stage D / §14[M1]). Turns a page's plain text into
|
||||
* chunk embeddings and persists them so the `semanticSearch` agent tool can do
|
||||
@@ -168,7 +172,7 @@ export class EmbeddingIndexerService {
|
||||
await this.aiService.getEmbeddingModel(workspaceId);
|
||||
} catch (err) {
|
||||
if (err instanceof AiEmbeddingNotConfiguredException) {
|
||||
this.logger.debug(
|
||||
this.logger.log(
|
||||
`reindexWorkspace: embeddings not configured for workspace ${workspaceId}, skipping`,
|
||||
);
|
||||
return;
|
||||
@@ -177,28 +181,44 @@ export class EmbeddingIndexerService {
|
||||
}
|
||||
|
||||
const pageIds = await this.pageRepo.getIdsByWorkspace(workspaceId);
|
||||
this.logger.debug(
|
||||
`reindexWorkspace: reindexing ${pageIds.length} page(s) for workspace ${workspaceId}`,
|
||||
const total = pageIds.length;
|
||||
const startedAt = Date.now();
|
||||
this.logger.log(
|
||||
`reindexWorkspace: starting reindex of ${total} page(s) for workspace ${workspaceId}`,
|
||||
);
|
||||
|
||||
let failed = 0;
|
||||
for (const pageId of pageIds) {
|
||||
for (let i = 0; i < total; i++) {
|
||||
const pageId = pageIds[i];
|
||||
const position = i + 1;
|
||||
// Log BEFORE the await: if the embedding call hangs, this is the last line
|
||||
// in the log and it names the exact page that is stuck.
|
||||
this.logger.log(
|
||||
`reindexWorkspace: [${position}/${total}] indexing page ${pageId} (workspace ${workspaceId})`,
|
||||
);
|
||||
const pageStartedAt = Date.now();
|
||||
try {
|
||||
await this.reindexPage(pageId);
|
||||
const elapsed = Date.now() - pageStartedAt;
|
||||
if (elapsed >= SLOW_PAGE_MS) {
|
||||
this.logger.warn(
|
||||
`reindexWorkspace: [${position}/${total}] page ${pageId} took ${elapsed}ms`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
// Per-page isolation: one failure must not abort the whole batch.
|
||||
// Per-page isolation: one failure (incl. an embedding timeout) must not
|
||||
// abort the whole batch.
|
||||
failed++;
|
||||
this.logger.error(
|
||||
`reindexWorkspace: failed to reindex page ${pageId}: ${describeProviderError(
|
||||
err,
|
||||
)}`,
|
||||
`reindexWorkspace: [${position}/${total}] failed to reindex page ${pageId} ` +
|
||||
`after ${Date.now() - pageStartedAt}ms: ${describeProviderError(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
this.logger.debug(
|
||||
`reindexWorkspace: done for workspace ${workspaceId} (${
|
||||
pageIds.length - failed
|
||||
}/${pageIds.length} pages)`,
|
||||
|
||||
this.logger.log(
|
||||
`reindexWorkspace: done for workspace ${workspaceId}: ` +
|
||||
`${total - failed}/${total} indexed, ${failed} failed in ${Date.now() - startedAt}ms`,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -114,8 +114,50 @@ export class AiService {
|
||||
async embedTexts(workspaceId: string, texts: string[]): Promise<number[][]> {
|
||||
if (texts.length === 0) return [];
|
||||
const model = await this.getEmbeddingModel(workspaceId);
|
||||
const { embeddings } = await embedMany({ model, values: texts });
|
||||
// Bound the embedding call: a slow/hung embeddings endpoint must fail loudly
|
||||
// (and let the caller move on to the next page) instead of blocking forever.
|
||||
// The single signal caps the WHOLE call, including the SDK's internal
|
||||
// retries/backoff (embedMany defaults to maxRetries: 2).
|
||||
const timeoutMs = AiService.embeddingTimeoutMs();
|
||||
const signal = AbortSignal.timeout(timeoutMs);
|
||||
try {
|
||||
const { embeddings } = await embedMany({
|
||||
model,
|
||||
values: texts,
|
||||
abortSignal: signal,
|
||||
});
|
||||
return embeddings;
|
||||
} catch (err) {
|
||||
// AbortSignal.timeout aborts with an opaque TimeoutError; surface a clear,
|
||||
// greppable message so a hung/slow embeddings endpoint is obvious in logs.
|
||||
// Classify by the error itself (name) AND the signal, not the flag alone:
|
||||
// a genuine provider error that loses a race with the timer would also see
|
||||
// `signal.aborted === true`, and must keep its real diagnostics.
|
||||
// Mirror the SDK's own isAbortError (@ai-sdk/provider-utils): it treats
|
||||
// TimeoutError, AbortError and ResponseAborted (Next.js) as aborts.
|
||||
const abortLike =
|
||||
err instanceof Error &&
|
||||
(err.name === 'TimeoutError' ||
|
||||
err.name === 'AbortError' ||
|
||||
err.name === 'ResponseAborted');
|
||||
if (signal.aborted && abortLike) {
|
||||
throw new Error(
|
||||
`Embedding request timed out after ${timeoutMs}ms ` +
|
||||
`(workspace ${workspaceId}, ${texts.length} chunk(s)). ` +
|
||||
`Increase AI_EMBEDDING_TIMEOUT_MS or check the embeddings endpoint.`,
|
||||
);
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Per-embedding-call timeout in ms. Configurable via AI_EMBEDDING_TIMEOUT_MS;
|
||||
* falls back to 120000 (2 min) when unset or invalid.
|
||||
*/
|
||||
private static embeddingTimeoutMs(): number {
|
||||
const raw = Number(process.env.AI_EMBEDDING_TIMEOUT_MS);
|
||||
return Number.isFinite(raw) && raw > 0 ? raw : 120_000;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user