fix(ai-chat): live streaming, open-page context, any-dimension embeddings" -m "- streaming: give useChat a STABLE store id (chatId ?? per-mount generated)

so the v6 hook stops re-creating its store every render on a new chat
  (which wiped the optimistic user message + streamed deltas, so nothing
  showed until the turn finished). Also send X-Accel-Buffering:no + flushHeaders.
- context: client sends the currently-open page {id,title}; the system prompt
  tells the agent which page 'this page' refers to (it reads it via its
  CASL-scoped getPage tool; id is prompt-context only, no server-side fetch).
- embeddings: make page_embeddings.embedding dimension-agnostic (drop the
  HNSW index + ALTER to vector), remove the hard 1536 guard, filter search by
  model_dimensions — so 3072-dim (and any) models index instead of being
  skipped. Seq-scan <=> search (wiki scale); existing pages reindex on next edit.
This commit is contained in:
vvzvlad
2026-06-17 04:58:06 +03:00
parent a4b7919753
commit 65f0713a70
7 changed files with 238 additions and 37 deletions

View File

@@ -55,6 +55,13 @@ export interface BuildSystemPromptInput {
* used instead.
*/
adminPrompt?: string | null;
/**
* The page the user is currently viewing (client-supplied), if any. When it
* has an id, a CONTEXT line is added so the agent can resolve "this page" /
* "the current page" to that pageId. The page is NOT fetched here — the agent
* uses its CASL-enforced read/write page tools with the id when needed.
*/
openedPage?: { id?: string; title?: string } | null;
}
/**
@@ -65,15 +72,27 @@ export interface BuildSystemPromptInput {
export function buildSystemPrompt({
workspace,
adminPrompt,
openedPage,
}: BuildSystemPromptInput): string {
const base =
typeof adminPrompt === 'string' && adminPrompt.trim().length > 0
? adminPrompt.trim()
: DEFAULT_PROMPT;
const context = workspace?.name
? `\n\nWorkspace: ${workspace.name}.`
: '';
let context = workspace?.name ? `\n\nWorkspace: ${workspace.name}.` : '';
// When the user has a page open, tell the agent which page "this page" means.
// Context only — the agent reads/writes via its CASL-enforced page tools, so a
// spoofed id cannot escalate (getPage would 403). Added to the context section,
// never the immutable safety framework. Absent => nothing is added.
const pageId = openedPage?.id;
if (typeof pageId === 'string' && pageId.trim().length > 0) {
const title =
typeof openedPage?.title === 'string' && openedPage.title.trim().length > 0
? openedPage.title.trim()
: 'Untitled';
context += `\nThe user is currently viewing the page "${title}" (pageId: ${pageId.trim()}). When they refer to "this page", "the current page", or similar, operate on that pageId — use the read/write page tools with it.`;
}
return `${base}${context}\n${SAFETY_FRAMEWORK}`;
}

View File

@@ -24,6 +24,12 @@ import { buildSystemPrompt } from './ai-chat.prompt';
*/
export interface AiChatStreamBody {
chatId?: string;
// The page the user is currently viewing (client-supplied), or null on a
// non-page route. Used ONLY as prompt context so the agent knows what "this
// page" refers to; the page itself is never fetched server-side here. The id
// is attacker-controllable but harmless: the agent reads/writes via its
// CASL-enforced page tools, which 403 on a page the user cannot access.
openPage?: { id?: string; title?: string } | null;
// useChat sends the full UIMessage list; the last one is the new user turn.
messages?: UIMessage[];
}
@@ -140,6 +146,7 @@ export class AiChatService {
const system = buildSystemPrompt({
workspace,
adminPrompt: resolved?.systemPrompt,
openedPage: body.openPage,
});
// Pass the resolved chatId so the write tools can mint provenance tokens
@@ -310,7 +317,22 @@ export class AiChatService {
// UI shows a generic failure. Surface the real provider message instead.
// AI SDK error messages / 4xx bodies never contain the API key, so this is
// safe; we never dump the resolved config/apiKey.
//
// SSE buffering / proxy note: pipeUIMessageStreamToResponse writes the
// headers immediately (res.writeHead) and each chunk incrementally, and the
// SDK's default UI_MESSAGE_STREAM_HEADERS already include
// `x-accel-buffering: no` (disables nginx response buffering) plus
// `content-type: text/event-stream` and `cache-control: no-cache`. We pass
// `headers` explicitly anyway so the intent is visible here and survives any
// future change to the SDK defaults (prepareHeaders only fills a header when
// absent, so this never clobbers the SDK's content-type). DEPLOYMENT: the
// reverse proxy in front of this server MUST NOT buffer this route, or the
// whole response is released at once and nothing streams. nginx honours the
// `x-accel-buffering: no` header we send (and additionally set
// `proxy_buffering off; proxy_cache off;` for /api/ai-chat/stream); traefik
// does not buffer responses by default.
result.pipeUIMessageStreamToResponse(res.raw, {
headers: { 'X-Accel-Buffering': 'no' },
onError: (error: unknown) => {
const e = error as { statusCode?: number; message?: string };
return e?.statusCode
@@ -318,6 +340,13 @@ export class AiChatService {
: (e?.message ?? 'AI stream error');
},
});
// Force the status line + headers onto the socket NOW (before the model's
// first token), so the proxy sees the response start immediately even if the
// provider's first chunk is delayed. writeToServerResponse already called
// writeHead synchronously above; flushHeaders is a belt-and-braces no-op once
// headers are sent, and is guarded for response-likes that lack it.
res.raw.flushHeaders?.();
} catch (err) {
// Synchronous failure before/while wiring the stream: the terminal
// callbacks will not run, so release the leased external clients here and

View File

@@ -12,13 +12,12 @@ import { AiService } from '../../../integrations/ai/ai.service';
import { AiEmbeddingNotConfiguredException } from '../../../integrations/ai/ai-embedding-not-configured.exception';
import { jsonToText } from '../../../collaboration/collaboration.util';
/**
* Embedding dimension the `page_embeddings.embedding` column is fixed at
* (`vector(1536)`). A model whose vectors have a different dimension cannot fit
* this column — v1 limitation (§14[M7]); see the dimension guard in
* `reindexPage`.
*/
const EMBEDDING_DIMENSIONS = 1536;
// NOTE: the `page_embeddings.embedding` column is now dimension-agnostic
// (bare pgvector `vector`, see migration 20260617T140000), so the indexer
// stores WHATEVER dimension the configured model returns and records it per row
// in `model_dimensions`. There is no fixed-dimension guard any more; search
// compares only same-dimension rows. Trade-off: a dimension-agnostic column has
// no ANN index, so retrieval is a seq scan with `<=>` (fine at wiki scale).
// RecursiveCharacterTextSplitter settings. ~1000 chars per chunk with 200 char
// overlap is a reasonable default for prose retrieval (§6.7 stage D).
@@ -31,7 +30,7 @@ const CHUNK_OVERLAP = 200;
* cosine ANN retrieval.
*
* Everything is workspace-scoped. Reindex HARD-replaces a page's rows (delete +
* insert in one transaction) so the HNSW index never serves stale vectors.
* insert in one transaction) so search never serves stale vectors.
*/
@Injectable()
export class EmbeddingIndexerService {
@@ -48,9 +47,9 @@ export class EmbeddingIndexerService {
* (Re)build the embeddings for a single page.
*
* No-ops quietly when embeddings are unconfigured (so the queue never dies on
* an unconfigured workspace) and when a non-matching embedding dimension is
* returned (skip + single warning — §14[M7]). Deleted/empty pages have their
* rows purged and return.
* an unconfigured workspace). Any embedding dimension is accepted; the only
* defensive skip is a page whose chunks somehow yield mixed vector lengths.
* Deleted/empty pages have their rows purged and return.
*/
async reindexPage(pageId: string): Promise<void> {
const page = await this.pageRepo.findById(pageId, {
@@ -115,17 +114,21 @@ export class EmbeddingIndexerService {
// Embed all chunks in one batch.
const vectors = await this.aiService.embedTexts(workspaceId, chunks);
// Dimension guard (§14[M7]): the column is a fixed vector(1536). A model
// with a different output dimension cannot be stored — skip the page and
// warn once rather than failing every row insert.
const wrongDim = vectors.find((v) => v.length !== EMBEDDING_DIMENSIONS);
if (wrongDim) {
this.logger.warn(
`reindexPage: embedding dimension ${wrongDim.length} != ${EMBEDDING_DIMENSIONS} ` +
`for workspace ${workspaceId}; skipping page ${pageId}. ` +
`The embedding column is fixed at ${EMBEDDING_DIMENSIONS} dims (v1 limitation §14[M7]).`,
);
return;
// The column is dimension-agnostic, so ANY model dimension is stored as-is.
// Defensive sanity check only: all chunks of ONE page come from the SAME
// model and must share a dimension. A page that yields mixed lengths would
// poison the per-dimension search filter, so skip it with a warning rather
// than insert inconsistent rows.
const expectedDim = vectors[0]?.length;
if (expectedDim != null) {
const mixed = vectors.find((v) => v.length !== expectedDim);
if (mixed) {
this.logger.warn(
`reindexPage: mixed embedding dimensions (${expectedDim} vs ${mixed.length}) ` +
`for workspace ${workspaceId}; skipping page ${pageId}.`,
);
return;
}
}
const rows = this.buildChunkRows(
@@ -136,8 +139,8 @@ export class EmbeddingIndexerService {
modelName,
);
// HARD replace in one transaction: delete then insert so the ANN index
// never holds stale vectors for this page.
// HARD replace in one transaction: delete then insert so search never
// returns stale vectors for this page.
await executeTx(this.db, async (trx) => {
await this.pageEmbeddingRepo.deleteByPage(pageId, workspaceId, trx);
await this.pageEmbeddingRepo.insertChunks(rows, trx);

View File

@@ -0,0 +1,67 @@
import { type Kysely, sql } from 'kysely';
/**
* Make `page_embeddings.embedding` dimension-agnostic.
*
* The original column was `vector(1536)` — a FIXED dimension. On deployments
* whose embedding model emits a different dimension (e.g. OpenAI
* `text-embedding-3-large` = 3072, Gemini `text-embedding-004` = 768) every
* vector failed the indexer's dimension guard and every page was SKIPPED, so
* RAG / semanticSearch was never populated.
*
* pgvector's bare `vector` type (no `(N)`) accepts vectors of ANY dimension,
* so this migration drops the fixed dimension. The dimension is still recorded
* PER ROW in `model_dimensions`, and search filters on it so the `<=>` cosine
* operator only ever compares same-dimension vectors (pgvector errors on a
* dimension mismatch — possible when rows from a previous model linger).
*
* TRADE-OFF: an HNSW / ivfflat ANN index REQUIRES a fixed dimension, so a
* dimension-agnostic column cannot carry one. We therefore DROP the HNSW index
* and rely on a sequential scan with `<=>`. That is fine at wiki scale; if a
* single embedding dimension is ever pinned per deployment, an HNSW index can
* be re-added in a follow-up migration.
*/
export async function up(db: Kysely<any>): Promise<void> {
// The HNSW ANN index requires a fixed dimension; drop it before relaxing the
// column type. Index name mirrors 20260617T120000-page-embeddings.ts.
await sql`DROP INDEX IF EXISTS idx_page_embeddings_embedding_hnsw`.execute(db);
// Drop the (1536) dimension constraint so the column accepts any dimension.
// The identity cast `embedding::vector` is safe for existing 1536-dim rows;
// on the affected live stand the table is empty (everything was skipped), so
// there is no data risk.
await sql`
ALTER TABLE page_embeddings
ALTER COLUMN embedding TYPE vector USING embedding::vector
`.execute(db);
// Btree index supporting the scoped + dimension-filtered seq-scan search
// (workspace_id + space_id IN (...) + model_dimensions = queryDim).
await db.schema
.createIndex('idx_page_embeddings_ws_space_dim')
.ifNotExists()
.on('page_embeddings')
.columns(['workspace_id', 'space_id', 'model_dimensions'])
.execute();
}
export async function down(db: Kysely<any>): Promise<void> {
// Best-effort rollback. The `::vector(1536)` cast only succeeds if EVERY row
// is already 1536-dim — acceptable for a dev rollback (the up migration is
// the intended steady state). On non-1536 data this will (correctly) error.
await db.schema
.dropIndex('idx_page_embeddings_ws_space_dim')
.ifExists()
.execute();
await sql`
ALTER TABLE page_embeddings
ALTER COLUMN embedding TYPE vector(1536) USING embedding::vector(1536)
`.execute(db);
await sql`
CREATE INDEX IF NOT EXISTS idx_page_embeddings_embedding_hnsw
ON page_embeddings
USING hnsw (embedding vector_cosine_ops)
`.execute(db);
}

View File

@@ -9,11 +9,17 @@ import { dbOrTx } from '../../utils';
* Repository for `page_embeddings` — the pgvector store backing the AI agent's
* semantic search (§5.5 / §6.7 stage D).
*
* The `embedding` column is `vector(1536)`, which is NOT a native Kysely column
* The `embedding` column is a dimension-agnostic pgvector `vector` (no fixed
* `(N)`, see migration 20260617T140000), which is NOT a native Kysely column
* type, so every read/write of a vector is serialized with the `pgvector` npm
* helper (`pgvector.toSql(number[])` → a `'[1,2,3]'` text literal) and cast back
* to `vector` via a raw `::vector` SQL cast. Reindex is a HARD delete + insert
* (see `deleteByPage`) so the HNSW ANN index never returns stale vectors.
* (see `deleteByPage`) so search never returns stale vectors.
*
* TRADE-OFF: a dimension-agnostic column cannot carry an HNSW/ivfflat ANN index
* (those require a fixed dimension), so `searchByEmbedding` is a sequential scan
* with the `<=>` cosine operator. Fine at wiki scale; re-add an HNSW index if a
* single embedding dimension is ever pinned per deployment.
*/
/** A single chunk row to persist for a page (page-body embeddings). */
@@ -66,8 +72,8 @@ export class PageEmbeddingRepo {
/**
* Bulk-insert chunk rows for a page. The `embedding` value is serialized with
* `pgvector.toSql` and cast to `vector` so Postgres stores it in the fixed
* `vector(1536)` column. No-op on an empty array.
* `pgvector.toSql` and cast to `vector` so Postgres stores it in the
* dimension-agnostic `vector` column (any dimension). No-op on an empty array.
*/
async insertChunks(
rows: PageEmbeddingChunkRow[],
@@ -97,10 +103,17 @@ export class PageEmbeddingRepo {
}
/**
* Cosine ANN search over the embeddings, scoped to a workspace AND a set of
* Cosine search over the embeddings, scoped to a workspace AND a set of
* spaces the caller may read (see semanticSearch access-scoping). Orders by
* `embedding <=> $query` (cosine distance) and joins the page title cheaply.
* Returns [] when `spaceIds` is empty (no accessible spaces => no results).
*
* Because the column is dimension-agnostic (no ANN index), this is a seq scan
* with `<=>`. The query MUST only be compared against same-dimension rows —
* pgvector raises on a dimension mismatch, which can happen when rows from a
* previously configured embedding model still linger. We therefore filter by
* `model_dimensions = queryEmbedding.length` so the `<=>` operands always
* agree on dimension.
*/
async searchByEmbedding(
workspaceId: string,
@@ -112,6 +125,8 @@ export class PageEmbeddingRepo {
// Serialized + cast query vector reused for the distance expression.
const queryVector = sql`${pgvector.toSql(queryEmbedding)}::vector`;
// Compare only against rows produced by a model of the SAME dimension.
const queryDim = queryEmbedding.length;
const rows = await this.db
.selectFrom('pageEmbeddings as pe')
@@ -125,6 +140,9 @@ export class PageEmbeddingRepo {
])
.where('pe.workspaceId', '=', workspaceId)
.where('pe.spaceId', 'in', spaceIds)
// Same-dimension only: avoids a pgvector dimension-mismatch error against
// rows from a previously configured embedding model.
.where('pe.modelDimensions', '=', queryDim)
// Exclude chunks whose page is in the trash (defence in depth).
.where('p.deletedAt', 'is', null)
.orderBy('distance', 'asc')