fix(ai-chat): add per-workspace rolling-day token budget for anonymous share assistant (#159)
The anonymous public-share assistant only capped the COUNT of requests (100/hour/workspace), not their cost. One accepted turn runs the agent loop up to stepCountIs(5), re-sending the whole client-held transcript as input on every step, while maxOutputTokens caps only the output; the request window is hourly with no daily ceiling, so a steady stream at the cap sustains ~24x its count per day. Counting requests therefore does not bound the owner's LLM bill (red-team finding #5). Add a second cost contour: a cluster-wide, sliding-window per-workspace TOKEN budget over a rolling day. It is checked read-only BEFORE a turn streams (429, no request slot consumed, nothing spent) and the turn's real usage (totalUsage: input re-sent per step + output, summed across all steps) is recorded once it finishes via streamText onFinish. Fails closed on the check (deny when Redis can't prove we're under budget); best-effort on the record. Env-overridable via SHARE_AI_WORKSPACE_TOKEN_BUDGET_PER_DAY (default 1M/day). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -34,6 +34,7 @@ describe('resolveShareAssistantRequest (extracted controller funnel)', () => {
|
|||||||
resolveShareRole?: jest.Mock;
|
resolveShareRole?: jest.Mock;
|
||||||
getShareChatModel?: jest.Mock;
|
getShareChatModel?: jest.Mock;
|
||||||
tryConsumeWorkspaceQuota?: jest.Mock;
|
tryConsumeWorkspaceQuota?: jest.Mock;
|
||||||
|
withinShareTokenBudget?: jest.Mock;
|
||||||
} = {}) {
|
} = {}) {
|
||||||
const aiSettings = {
|
const aiSettings = {
|
||||||
isPublicShareAssistantEnabled: jest
|
isPublicShareAssistantEnabled: jest
|
||||||
@@ -65,6 +66,8 @@ describe('resolveShareAssistantRequest (extracted controller funnel)', () => {
|
|||||||
over.getShareChatModel ?? jest.fn().mockResolvedValue('MODEL'),
|
over.getShareChatModel ?? jest.fn().mockResolvedValue('MODEL'),
|
||||||
tryConsumeWorkspaceQuota:
|
tryConsumeWorkspaceQuota:
|
||||||
over.tryConsumeWorkspaceQuota ?? jest.fn().mockResolvedValue(true),
|
over.tryConsumeWorkspaceQuota ?? jest.fn().mockResolvedValue(true),
|
||||||
|
withinShareTokenBudget:
|
||||||
|
over.withinShareTokenBudget ?? jest.fn().mockResolvedValue(true),
|
||||||
};
|
};
|
||||||
const deps: ShareAssistantDeps = {
|
const deps: ShareAssistantDeps = {
|
||||||
aiSettings: aiSettings as never,
|
aiSettings: aiSettings as never,
|
||||||
@@ -191,6 +194,39 @@ describe('resolveShareAssistantRequest (extracted controller funnel)', () => {
|
|||||||
expect(publicShareChat.tryConsumeWorkspaceQuota).toHaveBeenCalledWith('ws-1');
|
expect(publicShareChat.tryConsumeWorkspaceQuota).toHaveBeenCalledWith('ws-1');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('withinShareTokenBudget false => 429 thrown BEFORE any stream (cost cap, #159 #5)', async () => {
|
||||||
|
const { deps, publicShareChat } = makeDeps({
|
||||||
|
withinShareTokenBudget: jest.fn().mockResolvedValue(false),
|
||||||
|
});
|
||||||
|
expect(await statusOf(deps, body())).toBe(429);
|
||||||
|
expect(publicShareChat.withinShareTokenBudget).toHaveBeenCalledWith('ws-1');
|
||||||
|
// The token budget is the COST backstop: an over-budget workspace must be
|
||||||
|
// rejected WITHOUT consuming a request slot, so the request cap never runs.
|
||||||
|
expect(publicShareChat.tryConsumeWorkspaceQuota).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('the token budget is checked BEFORE the request cap (over-budget wins, no slot spent)', async () => {
|
||||||
|
// Over budget AND the request cap would also reject: the read-only budget
|
||||||
|
// gate must win so the (mutating) request-slot consume is never reached.
|
||||||
|
const { deps, publicShareChat } = makeDeps({
|
||||||
|
withinShareTokenBudget: jest.fn().mockResolvedValue(false),
|
||||||
|
tryConsumeWorkspaceQuota: jest.fn().mockResolvedValue(false),
|
||||||
|
});
|
||||||
|
expect(await statusOf(deps, body())).toBe(429);
|
||||||
|
expect(publicShareChat.tryConsumeWorkspaceQuota).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('the token-budget gate is checked BEFORE the payload caps (429 wins over 413)', async () => {
|
||||||
|
const { deps } = makeDeps({
|
||||||
|
withinShareTokenBudget: jest.fn().mockResolvedValue(false),
|
||||||
|
});
|
||||||
|
const huge = {
|
||||||
|
role: 'user',
|
||||||
|
parts: [{ type: 'text', text: 'x'.repeat(MAX_SHARE_MESSAGE_CHARS + 1) }],
|
||||||
|
};
|
||||||
|
expect(await statusOf(deps, body({ messages: [huge] }))).toBe(429);
|
||||||
|
});
|
||||||
|
|
||||||
it('messages over MAX_SHARE_MESSAGES => 413', async () => {
|
it('messages over MAX_SHARE_MESSAGES => 413', async () => {
|
||||||
const { deps } = makeDeps();
|
const { deps } = makeDeps();
|
||||||
const tooMany = Array.from({ length: MAX_SHARE_MESSAGES + 1 }, () => ({
|
const tooMany = Array.from({ length: MAX_SHARE_MESSAGES + 1 }, () => ({
|
||||||
|
|||||||
@@ -151,6 +151,7 @@ export interface ShareAssistantDeps {
|
|||||||
| 'resolveShareRole'
|
| 'resolveShareRole'
|
||||||
| 'getShareChatModel'
|
| 'getShareChatModel'
|
||||||
| 'tryConsumeWorkspaceQuota'
|
| 'tryConsumeWorkspaceQuota'
|
||||||
|
| 'withinShareTokenBudget'
|
||||||
>;
|
>;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -267,9 +268,21 @@ export async function resolveShareAssistantRequest(
|
|||||||
throw new NotFoundException('Not found');
|
throw new NotFoundException('Not found');
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. Per-WORKSPACE anti-abuse cap (IP-independent; defense in depth). Checked
|
// 5a. Per-WORKSPACE rolling-day TOKEN budget (the COST backstop). Read-only and
|
||||||
// BEFORE res.hijack(), so an over-cap workspace gets a clean 429 and spends
|
// checked FIRST so a workspace that has already burned its day's token
|
||||||
// nothing.
|
// budget gets a clean 429 WITHOUT consuming a request slot, and spends
|
||||||
|
// nothing. Counting requests alone does not bound the owner's provider
|
||||||
|
// bill (issue #159, finding #5).
|
||||||
|
if (!(await deps.publicShareChat.withinShareTokenBudget(workspaceId))) {
|
||||||
|
throw new HttpException(
|
||||||
|
'This documentation assistant has reached its usage budget. Please try again later.',
|
||||||
|
HttpStatus.TOO_MANY_REQUESTS,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5b. Per-WORKSPACE anti-abuse request cap (IP-independent; defense in depth).
|
||||||
|
// Checked BEFORE res.hijack(), so an over-cap workspace gets a clean 429
|
||||||
|
// and spends nothing.
|
||||||
if (!(await deps.publicShareChat.tryConsumeWorkspaceQuota(workspaceId))) {
|
if (!(await deps.publicShareChat.tryConsumeWorkspaceQuota(workspaceId))) {
|
||||||
throw new HttpException(
|
throw new HttpException(
|
||||||
'This documentation assistant is temporarily busy. Please try again later.',
|
'This documentation assistant is temporarily busy. Please try again later.',
|
||||||
|
|||||||
@@ -17,7 +17,9 @@ import { buildShareSystemPrompt } from './public-share-chat.prompt';
|
|||||||
import { roleModelOverride } from './roles/role-model-config';
|
import { roleModelOverride } from './roles/role-model-config';
|
||||||
import {
|
import {
|
||||||
PublicShareWorkspaceLimiter,
|
PublicShareWorkspaceLimiter,
|
||||||
|
PublicShareWorkspaceTokenBudget,
|
||||||
createPublicShareWorkspaceLimiter,
|
createPublicShareWorkspaceLimiter,
|
||||||
|
createPublicShareWorkspaceTokenBudget,
|
||||||
} from './public-share-workspace-limiter';
|
} from './public-share-workspace-limiter';
|
||||||
import { describeProviderError } from '../../integrations/ai/ai-error.util';
|
import { describeProviderError } from '../../integrations/ai/ai-error.util';
|
||||||
import {
|
import {
|
||||||
@@ -125,6 +127,16 @@ export class PublicShareChatService {
|
|||||||
*/
|
*/
|
||||||
private readonly workspaceLimiter: PublicShareWorkspaceLimiter;
|
private readonly workspaceLimiter: PublicShareWorkspaceLimiter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* COST contour two: a per-workspace TOKEN budget over a rolling day. The
|
||||||
|
* request-count limiter above bounds how many anonymous calls run; this bounds
|
||||||
|
* how many provider TOKENS they spend (input re-sent per step + output),
|
||||||
|
* which is what the owner is actually billed for (issue #159, finding #5).
|
||||||
|
* Checked read-only before a turn streams; the real usage is recorded once the
|
||||||
|
* turn finishes (`onFinish`).
|
||||||
|
*/
|
||||||
|
private readonly tokenBudget: PublicShareWorkspaceTokenBudget;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly ai: AiService,
|
private readonly ai: AiService,
|
||||||
private readonly aiSettings: AiSettingsService,
|
private readonly aiSettings: AiSettingsService,
|
||||||
@@ -133,6 +145,7 @@ export class PublicShareChatService {
|
|||||||
private readonly aiAgentRoleRepo: AiAgentRoleRepo,
|
private readonly aiAgentRoleRepo: AiAgentRoleRepo,
|
||||||
) {
|
) {
|
||||||
this.workspaceLimiter = createPublicShareWorkspaceLimiter(redisService);
|
this.workspaceLimiter = createPublicShareWorkspaceLimiter(redisService);
|
||||||
|
this.tokenBudget = createPublicShareWorkspaceTokenBudget(redisService);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -144,6 +157,25 @@ export class PublicShareChatService {
|
|||||||
return this.workspaceLimiter.tryConsume(workspaceId);
|
return this.workspaceLimiter.tryConsume(workspaceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Pre-stream COST gate for the anonymous share assistant.
 *
 * Delegates to the per-workspace token budget and resolves to whatever it
 * reports: `true` means the workspace may run another turn, `false` means the
 * caller must reject the request (429) before any stream is started.
 *
 * @param workspaceId workspace whose token spend is checked
 * @returns the budget's verdict for this workspace
 */
async withinShareTokenBudget(workspaceId: string): Promise<boolean> {
  const underBudget = await this.tokenBudget.withinBudget(workspaceId);
  return underBudget;
}
|
||||||
|
|
||||||
|
/**
 * Account a finished turn's token spend against the workspace's token budget.
 *
 * Pure delegation to the budget's `record`; this method adds no validation or
 * error handling of its own.
 *
 * @param workspaceId workspace the turn ran for
 * @param tokens token count to add to the workspace's running spend
 */
async recordShareTokens(workspaceId: string, tokens: number): Promise<void> {
  await this.tokenBudget.record(workspaceId, tokens);
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resolve the admin-selected agent role for the anonymous public-share
|
* Resolve the admin-selected agent role for the anonymous public-share
|
||||||
* assistant, scoped to the workspace and soft-delete aware. Returns null when
|
* assistant, scoped to the workspace and soft-delete aware. Returns null when
|
||||||
@@ -231,6 +263,18 @@ export class PublicShareChatService {
|
|||||||
// bill even if the per-IP throttle is evaded; worst case = steps × this.
|
// bill even if the per-IP throttle is evaded; worst case = steps × this.
|
||||||
maxOutputTokens: resolveShareAiMaxOutputTokens(),
|
maxOutputTokens: resolveShareAiMaxOutputTokens(),
|
||||||
abortSignal: signal,
|
abortSignal: signal,
|
||||||
|
onFinish: ({ totalUsage }) => {
  // Account the turn's token spend against the per-workspace token budget so
  // a later turn over budget is rejected up front (issue #159 #5).
  //
  // Prefer the reported total, but fall back to input + output whenever the
  // total is missing OR unusable (NaN / 0 / negative): `??` alone only
  // catches null/undefined, and an unusable total handed to the recorder
  // would leave the whole turn unaccounted.
  const reportedTotal = totalUsage?.totalTokens;
  const summed =
    (totalUsage?.inputTokens ?? 0) + (totalUsage?.outputTokens ?? 0);
  const tokens =
    typeof reportedTotal === 'number' &&
    Number.isFinite(reportedTotal) &&
    reportedTotal > 0
      ? reportedTotal
      : summed;
  // Fire-and-forget: the turn already streamed, so accounting must not
  // block or break it.
  // NOTE(review): confirm in the AI SDK docs whether onFinish fires when
  // `abortSignal` aborts the stream; if it does not, aborted turns are never
  // recorded and an `onAbort` handler is needed to close that gap.
  void this.recordShareTokens(workspaceId, tokens);
},
|
||||||
onError: ({ error }) => {
|
onError: ({ error }) => {
|
||||||
// Reuse the shared formatter so provider error formatting stays
|
// Reuse the shared formatter so provider error formatting stays
|
||||||
// unified (statusCode + body) with the authenticated path.
|
// unified (statusCode + body) with the authenticated path.
|
||||||
|
|||||||
@@ -11,8 +11,11 @@ import {
|
|||||||
import { PublicShareChatToolsService } from './tools/public-share-chat-tools.service';
|
import { PublicShareChatToolsService } from './tools/public-share-chat-tools.service';
|
||||||
import {
|
import {
|
||||||
PublicShareWorkspaceLimiter,
|
PublicShareWorkspaceLimiter,
|
||||||
|
PublicShareWorkspaceTokenBudget,
|
||||||
resolveShareAiWorkspaceMax,
|
resolveShareAiWorkspaceMax,
|
||||||
|
resolveShareAiWorkspaceTokenBudget,
|
||||||
SHARE_AI_WORKSPACE_MAX_PER_WINDOW,
|
SHARE_AI_WORKSPACE_MAX_PER_WINDOW,
|
||||||
|
SHARE_AI_WORKSPACE_TOKEN_BUDGET_DEFAULT,
|
||||||
} from './public-share-workspace-limiter';
|
} from './public-share-workspace-limiter';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -546,6 +549,185 @@ describe('PublicShareWorkspaceLimiter (cluster-wide sliding-window per-workspace
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* In-memory fake of the ioredis slice the TOKEN budget uses. Unlike the request
|
||||||
|
* limiter (one Lua), the budget runs TWO scripts over the same sorted set:
|
||||||
|
* - the read-only CHECK (sums the token counts encoded as each member's leading
|
||||||
|
* integer, admits while the sum is under budget; it trims expired entries but never records spend), and
|
||||||
|
* - the RECORD (ZADDs a finished turn's `<tokens>:<unique>` member).
|
||||||
|
* The fake faithfully reproduces both (branching on the script body) so the spec
|
||||||
|
* exercises the REAL budget math, not a re-implementation.
|
||||||
|
*/
|
||||||
|
class FakeTokenRedis {
|
||||||
|
private sets = new Map<string, Array<{ score: number; member: string }>>();
|
||||||
|
|
||||||
|
async eval(
|
||||||
|
script: string,
|
||||||
|
_numKeys: number,
|
||||||
|
key: string,
|
||||||
|
nowStr: string,
|
||||||
|
windowMsStr: string,
|
||||||
|
arg3: string,
|
||||||
|
): Promise<number> {
|
||||||
|
const now = Number(nowStr);
|
||||||
|
const windowMs = Number(windowMsStr);
|
||||||
|
const cutoff = now - windowMs;
|
||||||
|
const arr = (this.sets.get(key) ?? []).filter((e) => e.score > cutoff);
|
||||||
|
if (script.includes('ZADD')) {
|
||||||
|
// RECORD: arg3 is the `<tokens>:<unique>` member; append at score=now.
|
||||||
|
arr.push({ score: now, member: arg3 });
|
||||||
|
this.sets.set(key, arr);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
// CHECK: arg3 is the budget; sum the leading integer of each survivor.
|
||||||
|
const budget = Number(arg3);
|
||||||
|
this.sets.set(key, arr);
|
||||||
|
const total = arr.reduce((sum, e) => {
|
||||||
|
const m = /^(\d+)/.exec(e.member);
|
||||||
|
return sum + (m ? Number(m[1]) : 0);
|
||||||
|
}, 0);
|
||||||
|
return total >= budget ? 0 : 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function makeTokenBudget(budget: number, windowMs: number, clock: () => number) {
|
||||||
|
const redis = new FakeTokenRedis() as unknown as import('ioredis').Redis;
|
||||||
|
return new PublicShareWorkspaceTokenBudget(redis, budget, windowMs, clock);
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('resolveShareAiWorkspaceTokenBudget (env-overridable per-day token budget)', () => {
|
||||||
|
const KEY = 'SHARE_AI_WORKSPACE_TOKEN_BUDGET_PER_DAY';
|
||||||
|
const saved = process.env[KEY];
|
||||||
|
afterEach(() => {
|
||||||
|
if (saved === undefined) delete process.env[KEY];
|
||||||
|
else process.env[KEY] = saved;
|
||||||
|
});
|
||||||
|
|
||||||
|
it('falls back to the default when unset', () => {
|
||||||
|
delete process.env[KEY];
|
||||||
|
expect(resolveShareAiWorkspaceTokenBudget()).toBe(
|
||||||
|
SHARE_AI_WORKSPACE_TOKEN_BUDGET_DEFAULT,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('honors a positive override', () => {
|
||||||
|
process.env[KEY] = '250000';
|
||||||
|
expect(resolveShareAiWorkspaceTokenBudget()).toBe(250000);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('ignores a non-positive / unparseable value (uses the default)', () => {
|
||||||
|
for (const bad of ['0', '-5', 'nope', '']) {
|
||||||
|
process.env[KEY] = bad;
|
||||||
|
expect(resolveShareAiWorkspaceTokenBudget()).toBe(
|
||||||
|
SHARE_AI_WORKSPACE_TOKEN_BUDGET_DEFAULT,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('PublicShareWorkspaceTokenBudget (cluster-wide rolling-day token cap)', () => {
|
||||||
|
it('admits while under budget and rejects once the recorded spend reaches it', async () => {
|
||||||
|
const budget = makeTokenBudget(1000, 60_000, () => 1_000);
|
||||||
|
expect(await budget.withinBudget('ws-1')).toBe(true); // nothing spent yet
|
||||||
|
await budget.record('ws-1', 600);
|
||||||
|
expect(await budget.withinBudget('ws-1')).toBe(true); // 600 < 1000
|
||||||
|
await budget.record('ws-1', 400);
|
||||||
|
// 1000 >= 1000: the budget is exhausted, so the next turn is rejected up front.
|
||||||
|
expect(await budget.withinBudget('ws-1')).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('counts TOKENS, not requests: one fat turn can exhaust the budget alone', async () => {
|
||||||
|
const budget = makeTokenBudget(1000, 60_000, () => 1_000);
|
||||||
|
// A single accepted turn re-sends the whole transcript across 5 steps; here
|
||||||
|
// it lands as 1200 tokens — already over the day budget on its own.
|
||||||
|
await budget.record('ws-1', 1200);
|
||||||
|
expect(await budget.withinBudget('ws-1')).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('ages out spend older than the window so the budget recovers', async () => {
|
||||||
|
let now = 0;
|
||||||
|
const budget = makeTokenBudget(1000, 60_000, () => now);
|
||||||
|
await budget.record('ws-1', 1000); // at budget
|
||||||
|
now += 59_999; // still inside the day window
|
||||||
|
expect(await budget.withinBudget('ws-1')).toBe(false);
|
||||||
|
now += 2; // the spend is now strictly older than windowMs
|
||||||
|
expect(await budget.withinBudget('ws-1')).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('ignores non-positive / non-finite usage (never records phantom spend)', async () => {
|
||||||
|
const budget = makeTokenBudget(1000, 60_000, () => 1_000);
|
||||||
|
await budget.record('ws-1', 0);
|
||||||
|
await budget.record('ws-1', -50);
|
||||||
|
await budget.record('ws-1', Number.NaN);
|
||||||
|
await budget.record('ws-1', Infinity);
|
||||||
|
expect(await budget.withinBudget('ws-1')).toBe(true); // nothing accumulated
|
||||||
|
});
|
||||||
|
|
||||||
|
it('keeps separate budgets per workspace', async () => {
|
||||||
|
const budget = makeTokenBudget(500, 60_000, () => 1_000);
|
||||||
|
await budget.record('ws-a', 500); // ws-a exhausted
|
||||||
|
expect(await budget.withinBudget('ws-a')).toBe(false);
|
||||||
|
expect(await budget.withinBudget('ws-b')).toBe(true); // ws-b untouched
|
||||||
|
});
|
||||||
|
|
||||||
|
it('FAILS CLOSED on the read-only check when Redis rejects', async () => {
|
||||||
|
const failingRedis = {
|
||||||
|
eval: () => Promise.reject(new Error('redis down')),
|
||||||
|
} as unknown as import('ioredis').Redis;
|
||||||
|
const budget = new PublicShareWorkspaceTokenBudget(
|
||||||
|
failingRedis,
|
||||||
|
1000,
|
||||||
|
60_000,
|
||||||
|
() => 1_000,
|
||||||
|
);
|
||||||
|
const errSpy = jest
|
||||||
|
.spyOn(Logger.prototype, 'error')
|
||||||
|
.mockImplementation(() => undefined);
|
||||||
|
expect(await budget.withinBudget('ws-1')).toBe(false);
|
||||||
|
expect(errSpy).toHaveBeenCalled();
|
||||||
|
errSpy.mockRestore();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('SWALLOWS a record failure (best-effort post-accounting, never throws)', async () => {
|
||||||
|
// The turn already streamed; a record failure must not surface to the caller.
|
||||||
|
const failingRedis = {
|
||||||
|
eval: () => Promise.reject(new Error('redis down')),
|
||||||
|
} as unknown as import('ioredis').Redis;
|
||||||
|
const budget = new PublicShareWorkspaceTokenBudget(
|
||||||
|
failingRedis,
|
||||||
|
1000,
|
||||||
|
60_000,
|
||||||
|
() => 1_000,
|
||||||
|
);
|
||||||
|
const errSpy = jest
|
||||||
|
.spyOn(Logger.prototype, 'error')
|
||||||
|
.mockImplementation(() => undefined);
|
||||||
|
await expect(budget.record('ws-1', 100)).resolves.toBeUndefined();
|
||||||
|
expect(errSpy).toHaveBeenCalled();
|
||||||
|
errSpy.mockRestore();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('PublicShareChatService.withinShareTokenBudget / recordShareTokens', () => {
|
||||||
|
it('delegates the cost gate + accounting to the redis-backed token budget', async () => {
|
||||||
|
const redis = new FakeTokenRedis();
|
||||||
|
const redisService = { getOrThrow: () => redis } as never;
|
||||||
|
const service = new PublicShareChatService(
|
||||||
|
{} as never,
|
||||||
|
{} as never,
|
||||||
|
{} as never,
|
||||||
|
redisService,
|
||||||
|
{} as never,
|
||||||
|
);
|
||||||
|
// Default budget is large, so a fresh workspace is under budget; recording a
|
||||||
|
// modest spend keeps it under budget (asserts the wiring the controller +
|
||||||
|
// onFinish rely on).
|
||||||
|
expect(await service.withinShareTokenBudget('ws-1')).toBe(true);
|
||||||
|
await service.recordShareTokens('ws-1', 1234);
|
||||||
|
expect(await service.withinShareTokenBudget('ws-1')).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('PublicShareChatService.tryConsumeWorkspaceQuota', () => {
|
describe('PublicShareChatService.tryConsumeWorkspaceQuota', () => {
|
||||||
it('delegates to the redis-backed per-workspace limiter', async () => {
|
it('delegates to the redis-backed per-workspace limiter', async () => {
|
||||||
const redis = new FakeRedis();
|
const redis = new FakeRedis();
|
||||||
|
|||||||
@@ -136,6 +136,177 @@ export class PublicShareWorkspaceLimiter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SECOND cost contour: a per-workspace TOKEN budget over a rolling DAY.
|
||||||
|
*
|
||||||
|
* The request-count cap above bounds how MANY anonymous calls a workspace
|
||||||
|
* admits, but NOT how expensive each one is: one accepted call runs the agent
|
||||||
|
* loop up to `stepCountIs(5)`, and every step re-sends the WHOLE client-held
|
||||||
|
* transcript (~hundreds of KB) as input, so the provider input alone can be tens
|
||||||
|
* of thousands of tokens PER step while `maxOutputTokens` only caps the output.
|
||||||
|
* The request cap is also hourly with no daily ceiling, so a steady stream at
|
||||||
|
* the hourly cap sustains ~24x its count per day. Counting requests therefore
|
||||||
|
* does not bound the owner's actual LLM bill (issue #159, finding #5).
|
||||||
|
*
|
||||||
|
* This contour caps the SPEND directly: the actual tokens consumed (input +
|
||||||
|
* output, summed across all steps of every accepted turn) over the trailing
|
||||||
|
* `windowMs` (one rolling day) must stay under `budget`. It is checked BEFORE a
|
||||||
|
* turn streams (read-only) and the turn's real usage is recorded AFTER it
|
||||||
|
* finishes (`streamText` onFinish). Like the request cap it is cluster-wide
|
||||||
|
* (shared Redis) and uses a sliding-window LOG so the day boundary cannot be
|
||||||
|
* gamed for a 2x burst.
|
||||||
|
*
|
||||||
|
* Pre-check is read-only, so a turn already over budget is rejected, but the
|
||||||
|
* tokens of an in-flight turn are not yet known and are accounted only once it
|
||||||
|
* finishes. The worst-case overshoot past the budget is therefore one turn
|
||||||
|
* (bounded by steps x (maxOutputTokens + transcript size)) — acceptable for a
|
||||||
|
* cost backstop on an optional anonymous assistant.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/** Default per-workspace token budget over the rolling day. */
|
||||||
|
export const SHARE_AI_WORKSPACE_TOKEN_BUDGET_DEFAULT = 1_000_000;
|
||||||
|
/** Default token-budget window length: one rolling day. */
|
||||||
|
export const SHARE_AI_WORKSPACE_TOKEN_WINDOW_MS = 24 * 60 * 60 * 1000;
|
||||||
|
|
||||||
|
/** Redis key namespace for the per-workspace token-spend sliding-window log. */
|
||||||
|
const TOKEN_KEY_PREFIX = 'share-ai:ws-tokens:';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read-only sliding-window token-budget check.
|
||||||
|
*
|
||||||
|
* KEYS[1] = the per-workspace token sorted-set key
|
||||||
|
* ARGV[1] = now (epoch ms)
|
||||||
|
* ARGV[2] = windowMs
|
||||||
|
* ARGV[3] = budget (max tokens in the trailing window)
|
||||||
|
*
|
||||||
|
* Drops entries older than the window, then sums the token counts encoded as the
|
||||||
|
* leading integer of each surviving member. Returns 1 if the running total is
|
||||||
|
* still UNDER budget (admit), 0 once it has reached/exceeded the budget. Does NOT
|
||||||
|
* add anything — the turn's real usage is recorded separately once it finishes.
|
||||||
|
*/
|
||||||
|
const TOKEN_BUDGET_CHECK_LUA = `
|
||||||
|
local key = KEYS[1]
|
||||||
|
local now = tonumber(ARGV[1])
|
||||||
|
local windowMs = tonumber(ARGV[2])
|
||||||
|
local budget = tonumber(ARGV[3])
|
||||||
|
redis.call('ZREMRANGEBYSCORE', key, 0, now - windowMs)
|
||||||
|
local members = redis.call('ZRANGE', key, 0, -1)
|
||||||
|
local total = 0
|
||||||
|
for i = 1, #members do
|
||||||
|
local t = tonumber(string.match(members[i], '^(%d+)'))
|
||||||
|
if t then total = total + t end
|
||||||
|
end
|
||||||
|
if total >= budget then
|
||||||
|
return 0
|
||||||
|
end
|
||||||
|
return 1
|
||||||
|
`;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record one finished turn's token spend in the sliding-window log.
|
||||||
|
*
|
||||||
|
* KEYS[1] = the per-workspace token sorted-set key
|
||||||
|
* ARGV[1] = now (epoch ms) — the entry score
|
||||||
|
* ARGV[2] = windowMs
|
||||||
|
* ARGV[3] = member (`<tokens>:<unique>`; the leading integer is the token count)
|
||||||
|
*
|
||||||
|
* Always ZADDs (the turn already ran and spent the tokens) and refreshes the
|
||||||
|
* key TTL so idle workspaces cost no memory. Trims expired entries first so the
|
||||||
|
* set never grows unbounded for a busy workspace.
|
||||||
|
*/
|
||||||
|
const TOKEN_RECORD_LUA = `
|
||||||
|
local key = KEYS[1]
|
||||||
|
local now = tonumber(ARGV[1])
|
||||||
|
local windowMs = tonumber(ARGV[2])
|
||||||
|
local member = ARGV[3]
|
||||||
|
redis.call('ZREMRANGEBYSCORE', key, 0, now - windowMs)
|
||||||
|
redis.call('ZADD', key, now, member)
|
||||||
|
redis.call('PEXPIRE', key, windowMs)
|
||||||
|
return 1
|
||||||
|
`;
|
||||||
|
|
||||||
|
/**
 * Sliding-window per-workspace TOKEN budget backed by Redis.
 *
 * `withinBudget(key)` is the pre-stream gate; `record(key, tokens)` accounts a
 * finished turn's usage. The class only needs an object exposing `eval`, so it
 * can be exercised against a hand-rolled fake as well as a real ioredis client.
 */
export class PublicShareWorkspaceTokenBudget {
  private readonly logger = new Logger(PublicShareWorkspaceTokenBudget.name);

  /** Per-instance sequence number mixed into every recorded member. */
  private counter = 0;

  /**
   * @param redis client used to run the check / record Lua scripts
   * @param budget max tokens admitted in the trailing window
   * @param windowMs window length in milliseconds
   * @param now clock returning epoch milliseconds (injectable for tests)
   */
  constructor(
    private readonly redis: Redis,
    private readonly budget: number = SHARE_AI_WORKSPACE_TOKEN_BUDGET_DEFAULT,
    private readonly windowMs: number = SHARE_AI_WORKSPACE_TOKEN_WINDOW_MS,
    private readonly now: () => number = Date.now,
  ) {}

  /**
   * Pre-stream check: runs the budget-check script for `key` and resolves to
   * true only when the script returns 1.
   *
   * FAILS CLOSED: if the Redis call rejects, the error is logged and the
   * method resolves to false, so a workspace is never admitted when its spend
   * cannot be verified.
   *
   * @param key workspace identifier (prefixed with the token key namespace)
   * @returns true when the script admits the workspace, false otherwise
   */
  async withinBudget(key: string): Promise<boolean> {
    const t = this.now();
    try {
      const admitted = await this.redis.eval(
        TOKEN_BUDGET_CHECK_LUA,
        1,
        TOKEN_KEY_PREFIX + key,
        String(t),
        String(this.windowMs),
        String(this.budget),
      );
      return admitted === 1;
    } catch (err) {
      this.logger.error(
        `share-ai token budget Redis failure for key "${key}"; failing closed`,
        err as Error,
      );
      return false;
    }
  }

  /**
   * Record a finished turn's token spend. Best-effort: a Redis failure is
   * logged and swallowed, never propagated to the caller.
   *
   * Usage that is non-finite, or that is not at least one whole token after
   * flooring, is ignored entirely — nothing is written and the key TTL is not
   * touched.
   *
   * @param key workspace identifier (prefixed with the token key namespace)
   * @param tokens tokens spent by the turn; fractional values are floored
   */
  async record(key: string, tokens: number): Promise<void> {
    if (!Number.isFinite(tokens)) return;
    // Floor BEFORE the positivity check: a value in (0, 1) would otherwise
    // pass the guard and be written as a zero-valued `0:…` member.
    const spend = Math.floor(tokens);
    if (spend <= 0) return;

    const t = this.now();
    // Member: `<tokens>:<unique>` — the leading integer carries the token
    // count; the timestamp + counter + random suffix keeps two turns recorded
    // in the same millisecond from sharing one sorted-set member.
    const member = `${spend}:${t}-${this.counter++}-${Math.random()
      .toString(36)
      .slice(2)}`;
    try {
      await this.redis.eval(
        TOKEN_RECORD_LUA,
        1,
        TOKEN_KEY_PREFIX + key,
        String(t),
        String(this.windowMs),
        member,
      );
    } catch (err) {
      this.logger.error(
        `share-ai token budget record failure for key "${key}" (${spend} tokens); ignoring`,
        err as Error,
      );
    }
  }
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read the per-workspace cap from the environment (overridable seam), falling
|
* Read the per-workspace cap from the environment (overridable seam), falling
|
||||||
* back to the sane default. A non-positive / unparseable value uses the default.
|
* back to the sane default. A non-positive / unparseable value uses the default.
|
||||||
@@ -162,3 +333,31 @@ export function createPublicShareWorkspaceLimiter(
|
|||||||
SHARE_AI_WORKSPACE_WINDOW_MS,
|
SHARE_AI_WORKSPACE_WINDOW_MS,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Read the per-workspace token budget from the
 * `SHARE_AI_WORKSPACE_TOKEN_BUDGET_PER_DAY` environment variable, falling back
 * to `SHARE_AI_WORKSPACE_TOKEN_BUDGET_DEFAULT`.
 *
 * The value is floored to a whole number of tokens BEFORE it is validated, so
 * anything that is unset, unparseable, non-finite, or less than one whole
 * token (e.g. `0`, `-5`, `0.5`) resolves to the default rather than to a
 * budget of 0.
 *
 * @returns a positive integer token budget
 */
export function resolveShareAiWorkspaceTokenBudget(): number {
  const parsed = Number(process.env.SHARE_AI_WORKSPACE_TOKEN_BUDGET_PER_DAY);
  const whole = Math.floor(parsed);
  return Number.isFinite(whole) && whole > 0
    ? whole
    : SHARE_AI_WORKSPACE_TOKEN_BUDGET_DEFAULT;
}
|
||||||
|
|
||||||
|
/**
 * Factory for the per-workspace token budget.
 *
 * Pulls the ioredis client out of the injected RedisService (throwing if it is
 * unavailable), resolves the budget from the environment, and wires both into
 * a budget instance with the fixed token-window length.
 *
 * @param redisService service that hands out the Redis client
 * @returns a token budget bound to that client
 */
export function createPublicShareWorkspaceTokenBudget(
  redisService: RedisService,
): PublicShareWorkspaceTokenBudget {
  const client = redisService.getOrThrow();
  const tokenBudget = resolveShareAiWorkspaceTokenBudget();
  return new PublicShareWorkspaceTokenBudget(
    client,
    tokenBudget,
    SHARE_AI_WORKSPACE_TOKEN_WINDOW_MS,
  );
}
|
||||||
|
|||||||
Reference in New Issue
Block a user