agent-roles: concurrency-safe catalog import + unified source validator

Item 1 (concurrency-safe import): add a partial UNIQUE index on
(workspace_id, source->>'slug', source->>'language') WHERE source IS NOT NULL
AND deleted_at IS NULL, so two concurrent imports of the same bundle can no
longer create duplicate roles for one catalog slug+language. The in-memory
installedKeys snapshot cannot see a sibling request's writes; the index is the
backstop. importFromCatalog now catches the 23505 from THIS index (keyed off
the constraint name) and treats it as "already installed" -> skip, batch
continues. A 23505 from the name-uniqueness index keeps its existing friendly
per-role error behavior (distinguished by constraint name; an indeterminate
23505 falls back to that path, so no regression).

Item 2 (single source validator): strengthen parseSource into THE single form
validator for the source jsonb column -> returns a fully-valid RoleSource | null
(slug/language non-empty strings, version a number). The service's weaker
roleSource is removed and both layers share the RoleSource type (defined in the
db entity.types module both already import AiAgentRole from, so no import
cycle). normalizeRow / the read path now only ever yield a valid RoleSource or
null; a malformed stored source normalizes to null (tolerated by the service).

Tests: parseSource null for {} / {slug:123} / {slug:'a'} / empty-string keys /
string version, typed value for a full valid shape; service test that a
source-uniqueness 23505 is skipped (not errored) and the batch continues.
Verified the partial index rejects a duplicate source-not-null row but allows
two source-NULL rows, and the migration up/down run cleanly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
claude code agent 227
2026-06-26 23:40:25 +03:00
parent 50e79275e1
commit 26ca19f89e
6 changed files with 193 additions and 47 deletions

View File

@@ -647,6 +647,38 @@ describe('AiAgentRolesService guards', () => {
]);
});
it('source-uniqueness 23505 (concurrent import of same slug+language) => skipped, NOT an error, batch continues', async () => {
// Two parallel imports of the same bundle each build installedKeys from a
// stale snapshot, so both reach the insert for slug 'a'. The DB partial
// unique index on (workspace, source->>slug, source->>language) rejects the
// loser with a 23505 carrying the source-index constraint name. That must
// be treated as "already installed" (skip), not a per-role error, and the
// rest of the batch (slug 'b') must still import.
const { service, repo } = makeImportService({
bundleRoles: [
catalogRole({ slug: 'a', name: 'A' }),
catalogRole({ slug: 'b', name: 'B' }),
],
indexRoles: [
{ slug: 'a', version: 1 },
{ slug: 'b', version: 1 },
],
});
const sourceRace = Object.assign(new Error('duplicate key'), {
code: '23505',
constraint: 'ai_agent_roles_workspace_source_unique',
});
repo.insert
.mockRejectedValueOnce(sourceRace)
.mockImplementationOnce((v) => Promise.resolve(makeRow(v)));
const res = await service.importFromCatalog('ws-1', 'u1', dto());
// 'a' converged on the concurrent install (skip); 'b' imported; no errors.
expect(res).toMatchObject({ created: 1, skipped: 1, renamed: 0 });
expect(res.errors).toEqual([]);
// Both inserts were attempted (the batch did not abort on the 23505).
expect(repo.insert).toHaveBeenCalledTimes(2);
});
it('non-unique insert error => generic message, root cause logged, import continues', async () => {
const logSpy = jest
.spyOn(Logger.prototype, 'error')

View File

@@ -5,21 +5,17 @@ import {
Injectable,
Logger,
} from '@nestjs/common';
import { AiAgentRoleRepo } from '@docmost/db/repos/ai-agent-roles/ai-agent-roles.repo';
import { AiAgentRole } from '@docmost/db/types/entity.types';
import {
AiAgentRoleRepo,
parseSource,
} from '@docmost/db/repos/ai-agent-roles/ai-agent-roles.repo';
import { AiAgentRole, RoleSource } from '@docmost/db/types/entity.types';
import { CreateAgentRoleDto, UpdateAgentRoleDto } from './dto/agent-role.dto';
import { ImportFromCatalogDto, UpdateFromCatalogDto } from './dto/agent-role-catalog.dto';
import { RoleModelConfig } from './role-model-config';
import { AiAgentRolesCatalogProvider } from './catalog/ai-agent-roles-catalog.provider';
import { CatalogBundleMeta } from './catalog/catalog-types';
/** The `source` jsonb shape that links an imported role to its catalog origin. */
interface RoleSource {
slug: string;
language: string;
version: number;
}
/**
* Full (admin) view of an agent role. There are no secret columns on this table
* (the model creds live in ai_provider_credentials, keyed by driver), so the
@@ -39,7 +35,7 @@ export interface AgentRoleView {
// Catalog origin of an imported role, or null for a manually-created one. The
// admin UI uses `version` to offer an UPDATE when the catalog ships a newer
// revision. Admin-only (deliberately absent from AgentRolePickerView).
source: { slug: string; language: string; version: number } | null;
source: RoleSource | null;
createdAt: Date;
updatedAt: Date;
}
@@ -318,7 +314,7 @@ export class AiAgentRolesService {
// `ru` variant of a slug already installed as `en` is a separate install.
const installedKeys = new Set(
existingRoles
.map((r) => roleSource(r))
.map((r) => parseSource(r.source))
.filter((s): s is RoleSource => s !== null)
.map((s) => `${s.slug}:${s.language}`),
);
@@ -375,10 +371,21 @@ export class AiAgentRolesService {
takenNames.add(name.toLowerCase());
installedKeys.add(installKey);
} catch (err) {
// A unique-name race is expected and self-explanatory (it becomes a
// friendly per-role error). Any OTHER insert failure is unexpected, so
// log the root cause with enough context to diagnose it — the
// user-facing message is deliberately generic.
// A 23505 from the source-uniqueness index means a CONCURRENT import
// already installed this exact slug+language between our snapshot
// (installedKeys) and this insert: the in-process snapshot cannot see a
// sibling request's writes, so the partial unique index is the backstop.
// Outcome is identical to the snapshot-based skip above — count it as
// skipped (already installed) and continue; do NOT abort or error.
if (isSourceUniqueViolation(err)) {
skipped++;
installedKeys.add(installKey);
continue;
}
// Otherwise: a unique-NAME race (23505 on the name index) is expected and
// self-explanatory (it becomes a friendly per-role error). Any OTHER
// insert failure is unexpected, so log the root cause with enough context
// to diagnose it — the user-facing message is deliberately generic.
if (!isUniqueViolation(err)) {
this.logger.error(
`Failed to import catalog role (workspaceId=${workspaceId} bundleId=${dto.bundleId} slug=${role.slug}): ${err instanceof Error ? err.stack ?? err.message : String(err)}`,
@@ -408,7 +415,7 @@ export class AiAgentRolesService {
const role = await this.repo.findById(dto.id, workspaceId);
if (!role) throw new BadRequestException('Role not found');
const source = roleSource(role);
const source = parseSource(role.source);
if (!source || !source.slug) {
throw new BadRequestException('Role was not imported from the catalog');
}
@@ -495,7 +502,9 @@ export class AiAgentRolesService {
enabled: row.enabled,
autoStart: row.autoStart,
launchMessage: row.launchMessage ?? null,
source: (row.source ?? null) as AgentRoleView['source'],
// parseSource yields a fully-valid RoleSource | null (the row is already
// normalized; this also keeps the field type honest without a cast).
source: parseSource(row.source),
createdAt: row.createdAt,
updatedAt: row.updatedAt,
};
@@ -542,6 +551,30 @@ function isUniqueViolation(err: unknown): boolean {
);
}
/**
* The partial unique index name from the
* 20260626T160000-ai-agent-roles-catalog-source-unique migration: unique on
* (workspace_id, source->>'slug', source->>'language') for catalog-imported,
* non-deleted rows. A 23505 carrying this constraint name is a source-collision
* (concurrent import of the same slug+language), distinct from a name-collision.
*/
const SOURCE_UNIQUE_CONSTRAINT = 'ai_agent_roles_workspace_source_unique';
/**
* Whether `err` is the 23505 raised by the SOURCE-uniqueness index specifically
* (vs the name-uniqueness index). Postgres sets `constraint` to the violated
* index name on a unique violation; we key off that so a source race is skipped
* while a name race still surfaces as a friendly per-role error. A 23505 with no
* constraint name (e.g. a wrapped/test error) is NOT treated as a source
* collision, preserving the existing name-race behavior.
*/
function isSourceUniqueViolation(err: unknown): boolean {
return (
isUniqueViolation(err) &&
(err as { constraint?: unknown }).constraint === SOURCE_UNIQUE_CONSTRAINT
);
}
/** '' / whitespace-only / undefined / null => null; otherwise the trimmed value. */
function emptyToNull(value: string | null | undefined): string | null {
if (value === undefined || value === null) return null;
@@ -549,17 +582,6 @@ function emptyToNull(value: string | null | undefined): string | null {
return trimmed.length > 0 ? trimmed : null;
}
/** Read + shape-check a role's `source` jsonb into a RoleSource, or null. */
function roleSource(role: AiAgentRole): RoleSource | null {
const s = role.source as unknown;
if (!s || typeof s !== 'object' || Array.isArray(s)) return null;
const obj = s as Record<string, unknown>;
if (typeof obj.slug !== 'string') return null;
if (typeof obj.language !== 'string') return null;
if (typeof obj.version !== 'number') return null;
return { slug: obj.slug, language: obj.language, version: obj.version };
}
/** slug -> version map from a bundle's index metadata. */
function versionMap(meta: CatalogBundleMeta): Map<string, number> {
return new Map(meta.roles.map((r) => [r.slug, r.version]));

View File

@@ -0,0 +1,31 @@
import { type Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
// A catalog-imported role is uniquely identified within a workspace by its
// `source.slug` + `source.language` (a multilingual catalog: the `ru` variant
// of a slug installed as `en` is a SEPARATE install — hence both keys). The
// import path skips a slug+language already installed using an in-memory
// snapshot (installedKeys), but two CONCURRENT imports of the same bundle each
// read a stale snapshot and would both insert the same slug+language,
// duplicating the role. This partial unique index is the database-level
// backstop: the second insert gets a 23505 the service treats as
// "already installed" (skip), so the two imports converge on ONE role.
//
// Partial on `source IS NOT NULL` so MANUALLY-created roles (source NULL) are
// unconstrained — there can be many of those. Also partial on
// `deleted_at IS NULL` (like the existing name-unique index) so a soft-deleted
// role does not block re-importing the same slug+language later, matching the
// app's snapshot (listByWorkspace filters out soft-deleted rows).
await sql`
CREATE UNIQUE INDEX IF NOT EXISTS ai_agent_roles_workspace_source_unique
ON ai_agent_roles (workspace_id, (source ->> 'slug'), (source ->> 'language'))
WHERE source IS NOT NULL AND deleted_at IS NULL
`.execute(db);
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema
.dropIndex('ai_agent_roles_workspace_source_unique')
.ifExists()
.execute();
}

View File

@@ -157,25 +157,52 @@ describe('AiAgentRoleRepo insert/update auto-start columns', () => {
});
/**
* parseSource: a JSON-string (legacy double-encoded) is parsed; a real object
* passes through; null / a non-object / an array degrade to null (= manual role).
* parseSource is THE single form validator for the `source` jsonb column: a
* JSON-string (legacy double-encoded) is parsed; a FULLY-VALID object
* ({ slug, language, version }) passes through as a typed RoleSource; anything
* partial or wrong-shaped degrades to null (= manual role). This is the
* stricter-than-before guard that closes the drift where a weak `{}`/`{slug:123}`
* value used to be stamped as a valid source by the read path.
*/
describe('parseSource', () => {
it('parses a legacy double-encoded JSON string into the object', () => {
it('parses a legacy double-encoded JSON string into the typed source', () => {
expect(
parseSource('{"slug":"researcher","language":"en","version":1}'),
).toEqual({ slug: 'researcher', language: 'en', version: 1 });
});
it('passes an already-parsed object through', () => {
it('passes a fully-valid already-parsed object through', () => {
const obj = { slug: 's', language: 'en', version: 2 };
expect(parseSource(obj)).toEqual(obj);
});
it('returns the typed RoleSource (extra keys tolerated) for a valid shape', () => {
const src = parseSource({ slug: 's', language: 'ru', version: 3 });
expect(src).not.toBeNull();
// Narrowed to RoleSource: the fields are present and correctly typed.
expect(src?.slug).toBe('s');
expect(src?.language).toBe('ru');
expect(src?.version).toBe(3);
});
it('null / array / non-object / unparseable string => null', () => {
expect(parseSource(null)).toBeNull();
expect(parseSource([1, 2])).toBeNull();
expect(parseSource(42)).toBeNull();
expect(parseSource('not json')).toBeNull();
});
it('partial / wrong-typed shapes => null (no weak-but-typed-as-valid drift)', () => {
// Empty object: no slug/language/version.
expect(parseSource({})).toBeNull();
// slug present but not a string.
expect(parseSource({ slug: 123, language: 'en', version: 1 })).toBeNull();
// slug only, missing language + version.
expect(parseSource({ slug: 'a' })).toBeNull();
// empty-string slug / language are not valid catalog keys.
expect(parseSource({ slug: '', language: 'en', version: 1 })).toBeNull();
expect(parseSource({ slug: 'a', language: '', version: 1 })).toBeNull();
// version must be a number, not a numeric string.
expect(parseSource({ slug: 'a', language: 'en', version: '1' })).toBeNull();
});
});

View File

@@ -2,7 +2,7 @@ import { Injectable } from '@nestjs/common';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB, KyselyTransaction } from '../../types/kysely.types';
import { dbOrTx, jsonbBind, parseJsonbValue } from '../../utils';
import { AiAgentRole } from '@docmost/db/types/entity.types';
import { AiAgentRole, RoleSource } from '@docmost/db/types/entity.types';
/** The jsonb shape persisted in `model_config` (loosely typed for the column). */
type ModelConfigValue = Record<string, unknown> | null;
@@ -203,29 +203,45 @@ export function parseModelConfig(
}
/**
* Parse the `source` jsonb value read from the DB into an object or null,
* analogous to {@link parseModelConfig}. Same legacy double-encoding self-heal
* (a JSON string is parsed once) and the same shape guard: not null, an object,
* and not an array. A corrupt / wrong-shaped value degrades to null (= manually
* created), so a bad row never breaks the read path.
* THE single form validator for the `source` jsonb column: parse the value read
* from the DB into a fully-valid {@link RoleSource} or null. Same legacy
* double-encoding self-heal as {@link parseModelConfig} (a JSON string is parsed
* once), then validates the FULL shape — `slug` and `language` non-empty
* strings, `version` a number. A null / corrupt / partially-shaped value (e.g.
* `{}`, `{ slug: 123 }`, `{ slug: 'a' }` missing language/version) degrades to
* null (= manually created, no catalog provenance), so a bad row never breaks
* the read path AND never stamps a half-built object as a valid `RoleSource`.
* Both the repo read-path and the service share this so the contract cannot
* drift between layers.
*/
export function parseSource(value: unknown): Record<string, unknown> | null {
return parseJsonbValue(
value,
(v): v is Record<string, unknown> =>
v !== null && typeof v === 'object' && !Array.isArray(v),
export function parseSource(value: unknown): RoleSource | null {
return parseJsonbValue(value, isRoleSource);
}
/** Full-shape guard for a persisted `source` jsonb value (see parseSource). */
function isRoleSource(v: unknown): v is RoleSource {
if (v === null || typeof v !== 'object' || Array.isArray(v)) return false;
const obj = v as Record<string, unknown>;
return (
typeof obj.slug === 'string' &&
obj.slug.length > 0 &&
typeof obj.language === 'string' &&
obj.language.length > 0 &&
typeof obj.version === 'number'
);
}
/** Normalize a DB row so `modelConfig` and `source` are always an object or
* null. The casts bridge the concrete `Record | null` to the column's broad
* generated `JsonValue` type (an object is a valid JsonValue at runtime). */
/** Normalize a DB row so `modelConfig` and `source` are always a valid object or
* null. The casts bridge the concrete parsed types (`Record | null`,
* `RoleSource | null`) to the column's broad generated `JsonValue` type — both
* are valid JsonValues at runtime; RoleSource lacks the JsonObject index
* signature so it routes through `unknown`. */
function normalizeRow(row: AiAgentRole): AiAgentRole {
return {
...row,
modelConfig: parseModelConfig(
row.modelConfig,
) as AiAgentRole['modelConfig'],
source: parseSource(row.source) as AiAgentRole['source'],
source: parseSource(row.source) as unknown as AiAgentRole['source'],
};
}

View File

@@ -81,6 +81,24 @@ export type UpdatableAiMcpServer = Updateable<Omit<AiMcpServersTable, 'id'>>;
// A role replaces the persona layer of the system prompt (instructions) and may
// optionally override the chat model (`modelConfig`). Soft-deletable.
export type AiAgentRole = Selectable<AiAgentRoles>;
/**
* The validated shape of the `source` jsonb column on ai_agent_roles: the
* catalog origin of an imported role. `version` lets the admin UI offer an
* UPDATE when the catalog ships a newer revision of the same slug; null `source`
* (not this type) means a manually-created role with no catalog provenance.
*
* THE single contract for that column, shared by the repo read-path
* (`parseSource`, the only form validator) and the service, so the persisted
* shape can never be validated weakly in one layer and strongly in another.
* Defined here (a leaf db-types module both already import `AiAgentRole` from) to
* avoid an import cycle between the repo and the service.
*/
export interface RoleSource {
slug: string;
language: string;
version: number;
}
export type InsertableAiAgentRole = Insertable<AiAgentRoles>;
export type UpdatableAiAgentRole = Updateable<Omit<AiAgentRoles, 'id'>>;