feat(ai-chat): realtime token counter + reasoning tokens, Claude-Code style (#151)
Tokens were only counted post-hoc (onFinish) and the header badge updated only on
chat open/switch; reasoning wasn't requested or shown. Now a counter ticks LIVE
during generation and surfaces reasoning ("thinking") tokens separately, like
Claude Code's `Thinking… · N tokens`.
Architecture (AI SDK v6): no provider gives exact per-token usage mid-stream, so
the live number is a cheap client estimate (chars/≈4) reconciled to AUTHORITATIVE
provider usage at step boundaries and turn end. The useChat per-delta re-render is
the existing realtime engine.
- server: `chatStreamMetadata` now also forwards usage on `finish-step` + `finish`;
`sendReasoning: true`; persisted `metadata.usage` carries `reasoningTokens`
(normalized from `outputTokenDetails` or the deprecated field).
- client: pure `count-stream-tokens` (estimateTokens / liveTurnTokens, prefers
authoritative usage else estimate); `Thinking… · N tokens` in the typing
indicator; collapsible "Thinking" reasoning block; throttled (~8 Hz) live
turn-token header badge; `reasoningTokens` in types + Markdown export.
Review fixes folded in:
- v6 `finish-step.usage` is PER-STEP, not cumulative — the server now ACCUMULATES
a running sum (new pure `accumulateStepUsage`) and sends the cumulative, which
converges to `finish.totalUsage`, so the live counter never jumps DOWN on a
multi-step agent turn.
- reasoning double-count: the authoritative turn-total is attributed to a block
ONLY for a single-reasoning-part (one-step) turn; multi-step blocks each show
their own estimate (the authoritative total stays in the header).
- no "0" badge flash at turn start (require live > 0, else show context size).
- comment refreshed (finish-step trigger).
Tests: server `accumulateStepUsage` + updated `chatStreamMetadata` (34 in the
suite); client pure-fn tests. Both tsc clean; 162 client ai-chat + the ai-chat
server suite pass.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -5,7 +5,8 @@ import {
|
||||
rowToUiMessage,
|
||||
prepareAgentStep,
|
||||
buildPartialAssistantRecord,
|
||||
chatStreamStartMetadata,
|
||||
chatStreamMetadata,
|
||||
accumulateStepUsage,
|
||||
MAX_AGENT_STEPS,
|
||||
FINAL_STEP_INSTRUCTION,
|
||||
} from './ai-chat.service';
|
||||
@@ -298,18 +299,135 @@ describe('buildPartialAssistantRecord', () => {
|
||||
});
|
||||
|
||||
/**
|
||||
* chatStreamStartMetadata: attach the authoritative chatId to the streamed
|
||||
* assistant UI message ONLY on the `start` part (so the client adopts the real
|
||||
* created chat id at the first chunk — see #137). Any non-start part adds none.
|
||||
* chatStreamMetadata: attach metadata to the streamed assistant UI message per
|
||||
* part type — `chatId` on `start` (so the client adopts the real created chat id
|
||||
* at the first chunk — see #137), and AUTHORITATIVE usage (incl. reasoning
|
||||
* tokens) on `finish-step` and `finish` so the client's live token counter snaps
|
||||
* to exact at each step/turn boundary.
|
||||
*/
|
||||
describe('chatStreamStartMetadata', () => {
|
||||
describe('chatStreamMetadata', () => {
|
||||
it('returns { chatId } for the start part', () => {
|
||||
expect(chatStreamStartMetadata({ type: 'start' }, 'chat-1')).toEqual({
|
||||
expect(chatStreamMetadata({ type: 'start' }, 'chat-1')).toEqual({
|
||||
chatId: 'chat-1',
|
||||
});
|
||||
});
|
||||
|
||||
it('returns undefined for a finish part (any non-start part)', () => {
|
||||
expect(chatStreamStartMetadata({ type: 'finish' }, 'chat-1')).toBeUndefined();
|
||||
it('returns the CUMULATIVE step usage passed in for the finish-step part', () => {
|
||||
// finish-step usage is per-step in v6; the caller accumulates and passes the
|
||||
// running sum, which this just wraps.
|
||||
expect(
|
||||
chatStreamMetadata(
|
||||
{ type: 'finish-step', usage: { outputTokens: 100 } },
|
||||
'chat-1',
|
||||
{ inputTokens: 500, outputTokens: 220, totalTokens: 720, reasoningTokens: 30 },
|
||||
),
|
||||
).toEqual({
|
||||
usage: { inputTokens: 500, outputTokens: 220, totalTokens: 720, reasoningTokens: 30 },
|
||||
});
|
||||
});
|
||||
|
||||
it('returns turn usage for the finish part (reasoning from deprecated top-level field)', () => {
|
||||
expect(
|
||||
chatStreamMetadata(
|
||||
{
|
||||
type: 'finish',
|
||||
totalUsage: {
|
||||
inputTokens: 1000,
|
||||
outputTokens: 250,
|
||||
totalTokens: 1250,
|
||||
reasoningTokens: 50,
|
||||
},
|
||||
},
|
||||
'chat-1',
|
||||
),
|
||||
).toEqual({
|
||||
usage: {
|
||||
inputTokens: 1000,
|
||||
outputTokens: 250,
|
||||
totalTokens: 1250,
|
||||
reasoningTokens: 50,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('prefers outputTokenDetails.reasoningTokens over the deprecated field (finish)', () => {
|
||||
expect(
|
||||
chatStreamMetadata(
|
||||
{
|
||||
type: 'finish',
|
||||
totalUsage: {
|
||||
outputTokens: 100,
|
||||
reasoningTokens: 5,
|
||||
outputTokenDetails: { reasoningTokens: 30 },
|
||||
},
|
||||
},
|
||||
'chat-1',
|
||||
),
|
||||
).toEqual({
|
||||
usage: {
|
||||
inputTokens: undefined,
|
||||
outputTokens: 100,
|
||||
totalTokens: undefined,
|
||||
reasoningTokens: 30,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('returns undefined for a finish-step with no accumulated usage', () => {
|
||||
expect(
|
||||
chatStreamMetadata({ type: 'finish-step' }, 'chat-1'),
|
||||
).toBeUndefined();
|
||||
});
|
||||
|
||||
it('returns undefined for an unrelated part (e.g. text-delta)', () => {
|
||||
expect(
|
||||
chatStreamMetadata({ type: 'text-delta' }, 'chat-1'),
|
||||
).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* accumulateStepUsage: sums per-step usage into a running cumulative total so the
|
||||
* client never sees the live counter jump DOWN on a multi-step agent turn (#151).
|
||||
*/
|
||||
describe('accumulateStepUsage', () => {
|
||||
it('sums every field across two steps', () => {
|
||||
expect(
|
||||
accumulateStepUsage(
|
||||
{ inputTokens: 500, outputTokens: 100, totalTokens: 600, reasoningTokens: 30 },
|
||||
{ inputTokens: 520, outputTokens: 80, totalTokens: 600, reasoningTokens: 10 },
|
||||
),
|
||||
).toEqual({
|
||||
inputTokens: 1020,
|
||||
outputTokens: 180,
|
||||
totalTokens: 1200,
|
||||
reasoningTokens: 40,
|
||||
});
|
||||
});
|
||||
|
||||
it('returns the step as-is when there is no accumulator yet', () => {
|
||||
expect(accumulateStepUsage(undefined, { outputTokens: 10 })).toEqual({
|
||||
outputTokens: 10,
|
||||
});
|
||||
});
|
||||
|
||||
it('returns the accumulator unchanged when the step usage is absent', () => {
|
||||
const acc = { outputTokens: 10 };
|
||||
expect(accumulateStepUsage(acc, undefined)).toBe(acc);
|
||||
});
|
||||
|
||||
it('returns undefined when both sides are absent', () => {
|
||||
expect(accumulateStepUsage(undefined, undefined)).toBeUndefined();
|
||||
});
|
||||
|
||||
it('keeps a field undefined only when neither side has it', () => {
|
||||
expect(
|
||||
accumulateStepUsage({ outputTokens: 5 }, { outputTokens: 7 }),
|
||||
).toEqual({
|
||||
inputTokens: undefined,
|
||||
outputTokens: 12,
|
||||
totalTokens: undefined,
|
||||
reasoningTokens: undefined,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -420,7 +420,11 @@ export class AiChatService {
|
||||
toolCalls: serializeSteps(steps),
|
||||
metadata: {
|
||||
finishReason,
|
||||
usage: totalUsage,
|
||||
// Persist the turn's cumulative usage WITH reasoning tokens resolved
|
||||
// from either the new `outputTokenDetails` or the deprecated top-level
|
||||
// field, so reopened history / the Markdown export show the thinking
|
||||
// token cost too.
|
||||
usage: normalizeStreamUsage(totalUsage as StreamUsage) ?? totalUsage,
|
||||
// Final-step usage = the context actually fed to the model on the last LLM
|
||||
// call (full history + tool results) plus the answer it just generated.
|
||||
// input+output of the FINAL step ≈ the conversation's CURRENT context size,
|
||||
@@ -512,17 +516,42 @@ export class AiChatService {
|
||||
// does not buffer responses by default.
|
||||
// Scrub the SDK's hop-by-hop Connection header before it writes the head (Safari/HTTP2).
|
||||
stripStreamingHopByHopHeaders(res.raw);
|
||||
// Running sum of per-step usage (v6 `finish-step.usage` is per-step). Sent
|
||||
// as the cumulative authoritative usage so the client never jumps DOWN.
|
||||
let cumulativeStepUsage: ChatStreamUsage | undefined;
|
||||
result.pipeUIMessageStreamToResponse(res.raw, {
|
||||
headers: { 'X-Accel-Buffering': 'no' },
|
||||
// Surface the authoritative chatId on the streamed assistant UI message so
|
||||
// the client adopts the REAL id of the row we created, instead of guessing
|
||||
// the newest chat in its list. `messageMetadata` is invoked by the AI SDK
|
||||
// on the `start` and `finish` stream parts (ai@6); we attach `chatId` on the
|
||||
// `start` part so it reaches the client (as message.metadata.chatId) at the
|
||||
// very first chunk — before any second tab can race a newer chat into the
|
||||
// list. This fixes the two-tab "adoption race" (#137) where a new chat in
|
||||
// tab A could adopt tab B's id and leak its turns into the wrong row.
|
||||
messageMetadata: ({ part }) => chatStreamStartMetadata(part, chatId),
|
||||
// on the `start`, `finish-step` and `finish` stream parts (ai@6 — note the
|
||||
// `finish-step` trigger relies on it being delivered as its own
|
||||
// message-metadata chunk); we attach `chatId` on the `start` part so it
|
||||
// reaches the client (as message.metadata.chatId) at the very first chunk —
|
||||
// before any second tab can race a newer chat into the list. This fixes the
|
||||
// two-tab "adoption race" (#137).
|
||||
//
|
||||
// `finish-step.usage` is PER-STEP (not cumulative) in v6, and the client
|
||||
// merges each metadata.usage by replacement — so on a multi-step agent turn
|
||||
// (up to MAX_AGENT_STEPS) the naive per-step value would make the live
|
||||
// counter jump DOWN at each boundary. We keep a running sum here and send
|
||||
// the CUMULATIVE usage, which converges to `finish.totalUsage` (#151).
|
||||
messageMetadata: ({ part }) => {
|
||||
const p = part as StreamMetadataPart;
|
||||
if (p.type === 'finish-step') {
|
||||
cumulativeStepUsage = accumulateStepUsage(
|
||||
cumulativeStepUsage,
|
||||
normalizeStreamUsage(p.usage),
|
||||
);
|
||||
}
|
||||
return chatStreamMetadata(p, chatId, cumulativeStepUsage);
|
||||
},
|
||||
// Stream reasoning (thinking) parts to the client so the live counter can
|
||||
// estimate reasoning tokens from streamed text. v6 default is already
|
||||
// true; set explicitly so the intent survives any future SDK default
|
||||
// change. Providers that don't emit reasoning text still surface the
|
||||
// count via the authoritative `usage.reasoningTokens` on finish-step.
|
||||
sendReasoning: true,
|
||||
onError: (error: unknown) => {
|
||||
// Reuse the shared formatter so provider error formatting stays
|
||||
// unified between the log line and the streamed error message.
|
||||
@@ -573,16 +602,97 @@ export class AiChatService {
|
||||
}
|
||||
}
|
||||
|
||||
/** Shape of the AI SDK v6 LanguageModelUsage we forward to the client. The SDK
|
||||
* exposes `reasoningTokens` both as a (deprecated) top-level field and under
|
||||
* `outputTokenDetails.reasoningTokens`; we normalize to a single field so the
|
||||
* client gets one stable usage shape regardless of provider/SDK version. */
|
||||
interface StreamUsage {
|
||||
inputTokens?: number;
|
||||
outputTokens?: number;
|
||||
totalTokens?: number;
|
||||
reasoningTokens?: number;
|
||||
outputTokenDetails?: { reasoningTokens?: number };
|
||||
}
|
||||
|
||||
/** A streamed part the messageMetadata callback can receive (only the fields we read). */
|
||||
interface StreamMetadataPart {
|
||||
type: string;
|
||||
usage?: StreamUsage;
|
||||
totalUsage?: StreamUsage;
|
||||
}
|
||||
|
||||
/** Authoritative usage we attach to a streamed assistant message's metadata. */
|
||||
export interface ChatStreamUsage {
|
||||
inputTokens?: number;
|
||||
outputTokens?: number;
|
||||
totalTokens?: number;
|
||||
reasoningTokens?: number;
|
||||
}
|
||||
|
||||
/** Normalize an AI SDK usage object to our flat client-facing shape, resolving
|
||||
* reasoning tokens from either the new `outputTokenDetails` or the deprecated
|
||||
* top-level field. Returns undefined for a missing usage object. */
|
||||
function normalizeStreamUsage(
|
||||
usage: StreamUsage | undefined,
|
||||
): ChatStreamUsage | undefined {
|
||||
if (!usage) return undefined;
|
||||
const reasoningTokens =
|
||||
usage.outputTokenDetails?.reasoningTokens ?? usage.reasoningTokens;
|
||||
return {
|
||||
inputTokens: usage.inputTokens,
|
||||
outputTokens: usage.outputTokens,
|
||||
totalTokens: usage.totalTokens,
|
||||
reasoningTokens,
|
||||
};
|
||||
}
|
||||
|
||||
/** Sum a (normalized) per-step usage into a running cumulative usage. v6's
|
||||
* `finish-step.usage` is PER-STEP, so the caller accumulates across steps; the
|
||||
* cumulative sum converges to the turn's `totalUsage` (no down-jump on the
|
||||
* client). Returns undefined only when both sides are absent. Pure. */
|
||||
export function accumulateStepUsage(
|
||||
acc: ChatStreamUsage | undefined,
|
||||
step: ChatStreamUsage | undefined,
|
||||
): ChatStreamUsage | undefined {
|
||||
if (!acc) return step;
|
||||
if (!step) return acc;
|
||||
const add = (a?: number, b?: number): number | undefined =>
|
||||
a == null && b == null ? undefined : (a ?? 0) + (b ?? 0);
|
||||
return {
|
||||
inputTokens: add(acc.inputTokens, step.inputTokens),
|
||||
outputTokens: add(acc.outputTokens, step.outputTokens),
|
||||
totalTokens: add(acc.totalTokens, step.totalTokens),
|
||||
reasoningTokens: add(acc.reasoningTokens, step.reasoningTokens),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Attach the authoritative `chatId` to the streamed assistant message's `start`
|
||||
* part (as `message.metadata.chatId`) so the client can adopt the real id for a
|
||||
* new chat. See the client's adopt-chat-id.ts for the full #137 design.
|
||||
* Pure metadata builder for the streamed assistant UI message. The AI SDK calls
|
||||
* `messageMetadata` on the `start`, `finish-step` and `finish` stream parts; we
|
||||
* attach (as `message.metadata`):
|
||||
* - `start` -> `{ chatId }` so the client adopts the real created chat id
|
||||
* at the first chunk (see adopt-chat-id.ts / #137).
|
||||
* - `finish-step` -> `{ usage }` the CUMULATIVE authoritative usage so far
|
||||
* (incl. reasoning tokens) — the caller passes the running
|
||||
* sum (`cumulativeStepUsage`), since v6 per-step usage is not
|
||||
* cumulative; the client snaps to exact without jumping down.
|
||||
* - `finish` -> `{ usage }` from the turn's `totalUsage` (final reconcile).
|
||||
* Any other part type contributes no metadata. Pure + unit-testable.
|
||||
*/
|
||||
export function chatStreamStartMetadata(
|
||||
part: { type: string },
|
||||
export function chatStreamMetadata(
|
||||
part: StreamMetadataPart,
|
||||
chatId: string,
|
||||
): { chatId: string } | undefined {
|
||||
return part.type === 'start' ? { chatId } : undefined;
|
||||
cumulativeStepUsage?: ChatStreamUsage,
|
||||
): { chatId: string } | { usage: ChatStreamUsage } | undefined {
|
||||
if (part.type === 'start') return { chatId };
|
||||
if (part.type === 'finish-step') {
|
||||
return cumulativeStepUsage ? { usage: cumulativeStepUsage } : undefined;
|
||||
}
|
||||
if (part.type === 'finish') {
|
||||
const usage = normalizeStreamUsage(part.totalUsage);
|
||||
return usage ? { usage } : undefined;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/** The last message with role 'user' from a useChat payload, if any. */
|
||||
|
||||
Reference in New Issue
Block a user