Compare commits

...

2 Commits

Author SHA1 Message Date
claude code agent 227
1095c5679f fix(dictation): address PR #118 review feedback (security, stability, tests)
Implements all reviewer comments (code-review, red-team, and test-strategy
audit), accepting the recommended variants.

Server — realtime service (ai-realtime.service.ts):
- SSRF: pin the validated IP via a WebSocket `lookup` hook that re-checks every
  resolved address with isIpAllowed (mirrors external-mcp buildPinnedDispatcher),
  closing the TOCTOU/DNS-rebinding window; fix the misleading comment.
- no-silent-loss: on Stop, drain the in-flight segment (bounded 2.5s) and deliver
  the final via onFinal before closing instead of dropping the tail.
- fail-closed deriveRealtimeUrl: a non-empty unparseable base now THROWS (no
  silent api.openai.com fallback that would leak a self-hosted key); http:// and
  ws:// bases are rejected (plaintext key). Path normalization preserved.
- parseUpstreamEvent keys the accumulator by item_id+content_index so GA segments
  don't concatenate.
- inject a wsFactory seam for testing; also fix a latent bug: `import WebSocket
  from 'ws'` resolved to undefined at runtime (no esModuleInterop), so the import
  now uses the `import WebSocket = require('ws')` form.
- unref idle/max/drain timers.

Server — realtime gateway (ai-realtime.gateway.ts, session-limits.ts):
- reject revoked/disabled users and inactive sessions (mirror jwt.strategy:
  findById+isUserDisabled + findActiveById) with NO counter increment.
- CSWSH: Origin allowlist (matching APP_URL, or no Origin for native clients)
  before auth, no increment.
- extract SessionCounters (delete-at-zero, never negative) + pure canConnect
  (both caps >= checked before any increment); document the per-process/in-memory
  cap caveat (single-replica only).

Client:
- dictation-group: realtime final now inserts at the captured rangeRef SNAPSHOT
  (not the live caret) and guards editor.isEditable; single-space separator.
- use-realtime-dictation/realtime-dictation-client: stop-during-acquisition tears
  down the mic (no leak / button reset); reconnect re-emits start (double-start
  guarded); interim ghost cleared on teardown; io() options de-duplicated.
- pcm16-worklet: flush the partial sub-frame tail on stop; one-pole anti-aliasing
  low-pass before 48k->24k.
- extract shared mic-capture (acquireMicStream/mapGetUserMediaError, used by batch
  + realtime), pure DSP (pcm16-dsp.ts), and the session reducer/baseLanguageSubtag;
  extract applyInterimMeta/clampRange/resolveUrl/appendFinalToDraft.

Tests + infra: +~150 server tests (deriveRealtimeUrl, parseUpstreamEvent branches,
openSession/lifecycle/timers/testConnection via fake ws, gateway auth/caps/no-leak,
realtime-test admin contract, AiSettings update/resolve, DTO boolean, SSRF deny)
and +~140 client tests (DSP property/edge, resampler continuity, framing, reducer,
mic-capture, RealtimeDictationClient/MicButton, ProseMirror interim regression +
history guards, appendFinalToDraft, resolveKeyField, route contract). Added
@vitest/coverage-v8. CHANGELOG [Unreleased] entry incl. the single-replica caveat.

Review: APPROVE WITH SUGGESTIONS (no critical/regression); applied the drain-timer
unref. Server tsc clean + 358 tests; client tsc clean + 201 tests; vite build ok.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-21 17:15:33 +03:00
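
The fail-closed rules that 1095c5679f lists for deriveRealtimeUrl can be illustrated with a minimal sketch. This is not the code from ai-realtime.service.ts: the name `deriveRealtimeUrlSketch`, the `DEFAULT_REALTIME_URL` constant, and the trailing-slash handling are assumptions made for illustration. Only the rules themselves (an unparseable non-empty base throws, http:// and ws:// are rejected) come from the commit message.

```typescript
// Hypothetical default; the real service may resolve the fallback differently.
const DEFAULT_REALTIME_URL = "wss://api.openai.com/v1/realtime";

function deriveRealtimeUrlSketch(base: string | undefined): string {
  const trimmed = (base ?? "").trim();
  // Assumption: an empty base is the only case allowed to use the default.
  if (trimmed === "") return DEFAULT_REALTIME_URL;

  let url: URL;
  try {
    url = new URL(trimmed);
  } catch {
    // Fail closed: a malformed base must never fall back to the default host,
    // otherwise a key meant for a self-hosted endpoint could be sent elsewhere.
    throw new Error("Realtime base URL is not a valid URL");
  }

  // Plaintext schemes would expose the API key on the wire.
  if (url.protocol === "http:" || url.protocol === "ws:") {
    throw new Error("Realtime base URL must use https:// or wss://");
  }
  if (url.protocol !== "https:" && url.protocol !== "wss:") {
    throw new Error(`Unsupported scheme: ${url.protocol}`);
  }

  // Assumed normalization: drop trailing slashes, keep the rest of the path.
  const path = url.pathname.replace(/\/+$/, "");
  return `wss://${url.host}${path}${url.search}`;
}
```

Throwing instead of returning a default keeps the failure visible to the admin who configured the endpoint, which matches the "no silent fallback" intent stated in the commit.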
claude_code
0b3d595572 feat(dictation): add realtime streaming STT (live dictation)
Layer an optional realtime speech-to-text path on top of the existing
batch dictation, so transcribed text appears as the user speaks.

Transport A2: browser <-> our server (Socket.IO `/ai-realtime`) <->
OpenAI Realtime (raw ws). The provider API key never leaves the server;
the upstream URL is SSRF-checked before connecting; the gateway enforces
the dictation+dictationRealtime gate, cookie-JWT auth and per-user/
per-workspace concurrency caps. Implemented against the GA (2026) OpenAI
Realtime transcription contract (session.update / audio.input.format /
server_vad), not the now-removed beta shape.

Editor UI B2: interim text is shown as a meta-only ProseMirror ghost
decoration (no Yjs/history noise); only completed segments are committed.
Chat shows interim as a dimmed tail. The mic button switches realtime vs
batch by the workspace flag; batch remains the default and fallback.

Server:
- AiRealtimeService (upstream ws proxy, normalized events, idle/max-
  duration timeouts, idempotent teardown) + parseUpstreamEvent unit tests
- AiRealtimeGateway (Socket.IO `/ai-realtime`) wired into AiChatModule
- admin-gated POST /ai-chat/realtime/test connectivity probe
- config: settings.ai.dictationRealtime + provider sttRealtimeModel/
  sttRealtimeBaseUrl (realtime key reuses sttApiKey; no new secret)

Client:
- pcm16 AudioWorklet (24kHz mono PCM16), RealtimeDictationClient,
  use-realtime-dictation hook (status/start/stop/cancel + onInterim/onFinal)
- RealtimeMicButton + dictation-interim ProseMirror decoration
- editor/chat integration + AI settings UI (toggle, model, test endpoint)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-21 14:47:28 +03:00
51 changed files with 5922 additions and 59 deletions
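
Commit 1095c5679f states that parseUpstreamEvent keys its accumulator by item_id+content_index so that segments do not concatenate. A minimal sketch of that idea follows. The event shapes, the `kind` discriminator, and the `SegmentAccumulator` class are hypothetical and do not reproduce the upstream wire format or the service's actual types.

```typescript
// Hypothetical, simplified event shapes for illustration only.
type DeltaEvent = { kind: "delta"; item_id: string; content_index: number; delta: string };
type CompletedEvent = { kind: "completed"; item_id: string; content_index: number; transcript?: string };
type UpstreamEvent = DeltaEvent | CompletedEvent;

type Normalized = { type: "interim" | "final"; text: string };

class SegmentAccumulator {
  // One partial transcript per (item_id, content_index) pair.
  private readonly partials = new Map<string, string>();

  private static key(e: { item_id: string; content_index: number }): string {
    return `${e.item_id}:${e.content_index}`;
  }

  handle(e: UpstreamEvent): Normalized {
    const k = SegmentAccumulator.key(e);
    if (e.kind === "delta") {
      // Append only to the partial that belongs to this segment.
      const text = (this.partials.get(k) ?? "") + e.delta;
      this.partials.set(k, text);
      return { type: "interim", text };
    }
    // On completion prefer the provided transcript, else the accumulated text,
    // and drop the entry so the map does not grow for the whole session.
    const text = e.transcript ?? this.partials.get(k) ?? "";
    this.partials.delete(k);
    return { type: "final", text };
  }
}
```

With a single shared string instead of the keyed map, deltas from two interleaved segments would be appended to each other, which is the concatenation problem the commit describes.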

.gitignore

@@ -45,3 +45,4 @@ lerna-debug.log*
# TypeScript incremental build artifacts
*.tsbuildinfo
apps/client/coverage/

@@ -12,6 +12,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- **Realtime streaming dictation**: a new live-dictation mic mode layered on top
of the existing batch STT. Audio streams over a dedicated `/ai-realtime`
Socket.IO namespace and text is inserted as you speak (interim partials shown
as a ghost decoration, only finals committed to the document). Gated by a new
`dictationRealtime` workspace toggle, with `sttRealtimeModel` and
`sttRealtimeBaseUrl` settings (empty model falls back to `sttModel`; empty base
URL falls back to the STT base URL server-side).
- **Ops caveat (single-process assumption):** the realtime concurrency caps
(1 concurrent session per user, 5 per workspace) are enforced **in-memory,
per API process**. They are therefore authoritative only on a **single API
replica** — running multiple API instances (horizontal scale / load
balancing) lets a user or workspace exceed these caps, since each process
counts only its own sessions. Treat the limits as per-process until the
counters are moved to a shared store.
- Admin-only "Analytics / tracker" workspace setting: a raw HTML/JS snippet
injected into the `<head>` of public share pages only (for analytics such as
Google Analytics or Yandex.Metrika).
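
The in-memory caps described in the ops caveat above (1 concurrent session per user, 5 per workspace) can be sketched as follows. This is an illustration under assumptions, not the contents of session-limits.ts: the class and function bodies, the constant names, and the use of one counter map per dimension are guesses that follow the commit's wording ("delete-at-zero, never negative", "both caps >= checked before any increment").

```typescript
// Cap values taken from the changelog entry; constant names are hypothetical.
const MAX_PER_USER = 1;
const MAX_PER_WORKSPACE = 5;

class SessionCounters {
  private readonly counts = new Map<string, number>();

  get(key: string): number {
    return this.counts.get(key) ?? 0;
  }

  increment(key: string): void {
    this.counts.set(key, this.get(key) + 1);
  }

  // Never goes negative; the entry is deleted when it reaches zero so the
  // map does not accumulate keys for users who have disconnected.
  decrement(key: string): void {
    const next = this.get(key) - 1;
    if (next <= 0) this.counts.delete(key);
    else this.counts.set(key, next);
  }

  get size(): number {
    return this.counts.size;
  }
}

// Pure check: both caps are evaluated before anything is incremented.
function canConnect(userCount: number, workspaceCount: number): boolean {
  if (userCount >= MAX_PER_USER) return false;
  if (workspaceCount >= MAX_PER_WORKSPACE) return false;
  return true;
}
```

Because the map lives in process memory, two API replicas would each hold their own `SessionCounters`, which is why the changelog treats the limits as per-process.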

@@ -79,6 +79,7 @@
"@types/react": "18.3.12",
"@types/react-dom": "18.3.1",
"@vitejs/plugin-react": "6.0.1",
"@vitest/coverage-v8": "4.1.6",
"eslint": "9.28.0",
"eslint-plugin-react": "7.37.5",
"eslint-plugin-react-hooks": "7.0.1",

@@ -1179,6 +1179,11 @@
"Semantic search": "Semantic search",
"Voice / STT": "Voice / STT",
"Voice dictation": "Voice dictation",
"Realtime dictation": "Realtime dictation",
"Realtime model": "Realtime model",
"Realtime endpoint": "Realtime endpoint",
"Streams audio live and inserts text as you speak (requires an OpenAI-compatible Realtime endpoint)": "Streams audio live and inserts text as you speak (requires an OpenAI-compatible Realtime endpoint)",
"Leave empty to use the STT base URL": "Leave empty to use the STT base URL",
"Voice dictation is not available yet.": "Voice dictation is not available yet.",
"Test endpoint": "Test endpoint",
"Save endpoints": "Save endpoints",

@@ -0,0 +1,26 @@
import { describe, it, expect } from "vitest";
import { appendFinalToDraft } from "./chat-input";
describe("appendFinalToDraft", () => {
it("an empty draft becomes the final verbatim", () => {
expect(appendFinalToDraft("", "hello")).toBe("hello");
});
it("a non-empty draft gets the final appended with exactly one space", () => {
expect(appendFinalToDraft("draft", "final")).toBe("draft final");
});
it("never introduces a leading or double space", () => {
const out = appendFinalToDraft("draft", "final");
expect(out.startsWith(" ")).toBe(false);
expect(out).not.toContain("  ");
});
it("accumulates left-to-right across repeated calls", () => {
let draft = "";
draft = appendFinalToDraft(draft, "a");
draft = appendFinalToDraft(draft, "b");
draft = appendFinalToDraft(draft, "c");
expect(draft).toBe("a b c");
});
});

@@ -1,11 +1,19 @@
-import { KeyboardEvent } from "react";
-import { ActionIcon, Group, Textarea, Tooltip } from "@mantine/core";
+import { KeyboardEvent, useState } from "react";
+import {
+ActionIcon,
+Group,
+Stack,
+Text,
+Textarea,
+Tooltip,
+} from "@mantine/core";
 import { IconPlayerStopFilled, IconSend } from "@tabler/icons-react";
 import { useTranslation } from "react-i18next";
 import { useAtom, useAtomValue } from "jotai";
 import { aiChatDraftAtom } from "@/features/ai-chat/atoms/ai-chat-atom.ts";
 import { workspaceAtom } from "@/features/user/atoms/current-user-atom";
 import { MicButton } from "@/features/dictation/components/mic-button";
+import { RealtimeMicButton } from "@/features/dictation/components/realtime-mic-button";
 interface ChatInputProps {
 onSend: (text: string) => void;
@@ -14,6 +22,16 @@ interface ChatInputProps {
 disabled?: boolean;
 }
+/**
+ * Merge a finalized dictation segment into the existing draft. Pure +
+ * unit-testable. An empty draft becomes the final verbatim; a non-empty draft
+ * gets the final appended with exactly one space separator. Repeated calls
+ * accumulate left-to-right ("a" then "b" -> "a b").
+ */
+export function appendFinalToDraft(draft: string, final: string): string {
+return draft ? `${draft} ${final}` : final;
+}
 /**
  * Message composer. Enter sends, Shift+Enter inserts a newline. While the agent
  * is streaming, the send button becomes a Stop button (calls `stop()`); the
@@ -29,12 +47,17 @@ export default function ChatInput({
 const [value, setValue] = useAtom(aiChatDraftAtom);
 const workspace = useAtomValue(workspaceAtom);
 const isDictationEnabled = workspace?.settings?.ai?.dictation === true;
+const isRealtime = workspace?.settings?.ai?.dictationRealtime === true;
+// Live interim (partial) transcript shown as a dimmed tail under the input.
+const [interim, setInterim] = useState("");
 const send = (): void => {
 const text = value.trim();
 if (!text || isStreaming || disabled) return;
 onSend(text);
 setValue("");
+// Drop any leftover partial when a message is sent.
+setInterim("");
 };
 const handleKeyDown = (e: KeyboardEvent<HTMLTextAreaElement>): void => {
@@ -45,7 +68,8 @@ export default function ChatInput({
 };
 return (
-<Group gap="xs" align="flex-end" wrap="nowrap">
+<Stack gap="xs">
+<Group gap="xs" align="flex-end" wrap="nowrap">
 <Textarea
 style={{ flex: 1 }}
 placeholder={t("Ask the AI agent…")}
@@ -61,13 +85,24 @@ export default function ChatInput({
 // switch), so a fresh chat lands with the cursor ready in the field.
 autoFocus
 />
-{isDictationEnabled && (
-<MicButton
-size="lg"
-disabled={isStreaming || disabled}
-onText={(text) => setValue((v) => (v ? `${v} ${text}` : text))}
-/>
-)}
+{isDictationEnabled &&
+(isRealtime ? (
+<RealtimeMicButton
+size="lg"
+disabled={isStreaming || disabled}
+onInterim={(text) => setInterim(text)}
+onFinal={(text) => {
+setValue((v) => appendFinalToDraft(v, text));
+setInterim("");
+}}
+/>
+) : (
+<MicButton
+size="lg"
+disabled={isStreaming || disabled}
+onText={(text) => setValue((v) => appendFinalToDraft(v, text))}
+/>
+))}
 {isStreaming ? (
 <Tooltip label={t("Stop")} withArrow>
 <ActionIcon
@@ -93,6 +128,12 @@ export default function ChatInput({
 </ActionIcon>
 </Tooltip>
 )}
-</Group>
+</Group>
+{interim && (
+<Text size="sm" c="dimmed">
+{interim}
+</Text>
+)}
+</Stack>
 );
 }

@@ -0,0 +1,33 @@
// Minimal ambient declarations for the AudioWorklet global scope.
//
// The client tsconfig only pulls in the DOM libs (no "webworker"/"audioworklet"
// lib), so the symbols available inside an AudioWorkletProcessor module are not
// known to `tsc`. These declarations are intentionally narrow: just enough for
// `pcm16-worklet.ts` to typecheck, matching the Web Audio API spec shapes used
// by that processor. They describe the worklet global scope, not the main thread.
declare abstract class AudioWorkletProcessor {
// Message channel back to the main thread (used to transfer PCM16 buffers).
readonly port: MessagePort;
constructor();
// Called for each render quantum. `inputs`/`outputs` are channel arrays
// indexed as [input][channel][sample]; `parameters` maps AudioParam names to
// their per-sample (or single-value) Float32Array. Return `true` to keep the
// processor alive.
abstract process(
inputs: Float32Array[][],
outputs: Float32Array[][],
parameters: Record<string, Float32Array>,
): boolean;
}
// Registers a processor class under a name usable from `new AudioWorkletNode`.
declare function registerProcessor(
name: string,
processorCtor: new () => AudioWorkletProcessor,
): void;
// The render context's sample rate, in Hz, available in the worklet global scope.
declare const sampleRate: number;

@@ -0,0 +1,87 @@
import { describe, it, expect } from "vitest";
import {
mapGetUserMediaError,
canStartCapture,
MicUnavailableError,
} from "./mic-capture";
// Identity translator so assertions read the source key (the i18n layer is not
// under test here).
const t = (k: string) => k;
describe("mapGetUserMediaError", () => {
it("maps NotAllowedError / SecurityError to denied", () => {
expect(mapGetUserMediaError({ name: "NotAllowedError" }, t)).toBe(
"Microphone access denied",
);
expect(mapGetUserMediaError({ name: "SecurityError" }, t)).toBe(
"Microphone access denied",
);
});
it("maps NotFoundError / OverconstrainedError to not found", () => {
expect(mapGetUserMediaError({ name: "NotFoundError" }, t)).toBe(
"No microphone found",
);
expect(mapGetUserMediaError({ name: "OverconstrainedError" }, t)).toBe(
"No microphone found",
);
});
it("maps NotReadableError / AbortError to in-use", () => {
expect(mapGetUserMediaError({ name: "NotReadableError" }, t)).toBe(
"Microphone is unavailable or already in use",
);
expect(mapGetUserMediaError({ name: "AbortError" }, t)).toBe(
"Microphone is unavailable or already in use",
);
});
it("falls back to a detailed message for unknown errors", () => {
const msg = mapGetUserMediaError(
{ name: "WeirdError", message: "boom" },
t,
);
expect(msg).toContain("Could not start recording");
expect(msg).toContain("WeirdError");
expect(msg).toContain("boom");
});
it("falls back without a name", () => {
const msg = mapGetUserMediaError(new Error("nope"), t);
expect(msg).toContain("Could not start recording");
expect(msg).toContain("nope");
});
});
describe("canStartCapture", () => {
const base = {
starting: false,
hasStream: false,
hasLiveResource: false,
statusIsIdle: true,
};
it("allows when idle and nothing live", () => {
expect(canStartCapture(base)).toBe(true);
});
it("blocks while already starting", () => {
expect(canStartCapture({ ...base, starting: true })).toBe(false);
});
it("blocks when a stream is live", () => {
expect(canStartCapture({ ...base, hasStream: true })).toBe(false);
});
it("blocks when a downstream resource is live", () => {
expect(canStartCapture({ ...base, hasLiveResource: true })).toBe(false);
});
it("blocks when status is not idle", () => {
expect(canStartCapture({ ...base, statusIsIdle: false })).toBe(false);
});
});
describe("MicUnavailableError", () => {
it("is identifiable via instanceof", () => {
const e = new MicUnavailableError();
expect(e).toBeInstanceOf(MicUnavailableError);
expect(e.name).toBe("MicUnavailableError");
});
});

@@ -0,0 +1,68 @@
// Shared microphone-acquisition front-end used by BOTH the batch (`use-dictation`)
// and streaming (`use-realtime-dictation`) hooks. Only the getUserMedia handshake
// and its error→message mapping live here — the two hooks keep their own distinct
// downstream graphs (MediaRecorder vs AudioWorklet) and their own streamRef
// ownership. This collapses the ~37 duplicated lines without merging the hooks.
// Translate function shape (react-i18next's `t`). Kept structural so this module
// has no i18next dependency and stays trivially testable.
export type Translate = (key: string) => string;
/** Thrown by `acquireMicStream` when the environment cannot capture audio. */
export class MicUnavailableError extends Error {
constructor() {
super("navigator.mediaDevices.getUserMedia is unavailable in this context");
this.name = "MicUnavailableError";
}
}
/**
* Map a getUserMedia rejection to a user-facing, localized message. Mirrors the
* branching both hooks used previously so behavior is identical. Pure aside from
* the injected `t`; safe to unit-test with a stub translator.
*/
export function mapGetUserMediaError(err: unknown, t: Translate): string {
const name = (err as { name?: string })?.name;
const detail = (err as { message?: string })?.message ?? String(err);
if (name === "NotAllowedError" || name === "SecurityError") {
return t("Microphone access denied");
}
if (name === "NotFoundError" || name === "OverconstrainedError") {
return t("No microphone found");
}
if (name === "NotReadableError" || name === "AbortError") {
return t("Microphone is unavailable or already in use");
}
// Unknown failure: show the real reason instead of a generic string.
return `${t("Could not start recording")}: ${name ? `${name}: ` : ""}${detail}`;
}
/**
* Request the microphone. Throws `MicUnavailableError` when the API is missing
* (so callers can show the "not available in this context" notification), and
* otherwise rethrows the raw getUserMedia error for `mapGetUserMediaError`. The
* caller owns the returned stream (assigns it to its own streamRef and is
* responsible for stopping the tracks on every exit path).
*/
export async function acquireMicStream(): Promise<MediaStream> {
if (!navigator.mediaDevices?.getUserMedia) {
throw new MicUnavailableError();
}
return navigator.mediaDevices.getUserMedia({ audio: true });
}
/**
* Shared synchronous double-start guard. Returns true when a new capture may
* begin, false when one is already starting or live (so the second click is a
* no-op and never opens a leaking second MediaStream). `status` is the React
* status; the refs cover the window before the next render commits.
*/
export function canStartCapture(args: {
starting: boolean;
hasStream: boolean;
hasLiveResource: boolean;
statusIsIdle: boolean;
}): boolean {
if (args.starting || args.hasStream || args.hasLiveResource) return false;
return args.statusIsIdle;
}
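
The double-start guard and the acquire/map-error split described in the comments above can be shown working together in a small sketch. Everything here is hypothetical: `CaptureController`, `CaptureDeps`, and `StoppableStream` are illustration-only names, `acquire` stands in for acquireMicStream, and `describeError` stands in for mapGetUserMediaError with a bound translator. The real hooks use React refs and state instead of a class.

```typescript
// Stand-in for a MediaStream; a real caller would stop each track instead.
interface StoppableStream {
  stop(): void;
}

interface CaptureDeps {
  acquire: () => Promise<StoppableStream>;
  describeError: (err: unknown) => string;
  notify: (message: string) => void;
}

class CaptureController {
  private readonly deps: CaptureDeps;
  private starting = false;
  private stream: StoppableStream | null = null;

  constructor(deps: CaptureDeps) {
    this.deps = deps;
  }

  // Returns true when capture started, false when the call was a no-op
  // (already starting or live) or acquisition failed.
  async start(): Promise<boolean> {
    if (this.starting || this.stream !== null) return false;
    this.starting = true;
    try {
      this.stream = await this.deps.acquire();
      return true;
    } catch (err) {
      // Surface a user-facing message instead of the raw error.
      this.deps.notify(this.deps.describeError(err));
      return false;
    } finally {
      this.starting = false;
    }
  }

  stop(): void {
    this.stream?.stop();
    this.stream = null;
  }
}
```

The `starting` flag is set synchronously before the first `await`, so a second click that arrives while the permission prompt is still open returns early and never requests a second stream.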

@@ -0,0 +1,178 @@
import { describe, it, expect } from "vitest";
import {
floatSampleToInt16,
floatToPcm16LE,
LinearResampler,
OnePoleLowPass,
FrameAccumulator,
FRAME_SAMPLES,
} from "./pcm16-dsp";
// Read back the LE int16 values from a PCM16 ArrayBuffer for assertions.
function readInt16LE(buf: ArrayBuffer): number[] {
const view = new DataView(buf);
const out: number[] = [];
for (let i = 0; i < buf.byteLength; i += 2) out.push(view.getInt16(i, true));
return out;
}
describe("floatSampleToInt16 / floatToPcm16LE", () => {
it("maps +1 → 32767, -1 → -32768, 0 → 0", () => {
expect(floatSampleToInt16(1)).toBe(32767);
expect(floatSampleToInt16(-1)).toBe(-32768);
expect(floatSampleToInt16(0)).toBe(0);
});
it("clamps +2 / -2 without overflow", () => {
expect(floatSampleToInt16(2)).toBe(32767);
expect(floatSampleToInt16(-2)).toBe(-32768);
expect(floatSampleToInt16(1000)).toBe(32767);
expect(floatSampleToInt16(-1000)).toBe(-32768);
});
it("handles NaN and Infinity", () => {
expect(floatSampleToInt16(NaN)).toBe(0);
expect(floatSampleToInt16(Infinity)).toBe(32767);
expect(floatSampleToInt16(-Infinity)).toBe(-32768);
});
it("writes little-endian byte order", () => {
// 1 → 32767 = 0x7FFF → LE bytes [0xFF, 0x7F].
const buf = floatToPcm16LE([1]);
const bytes = new Uint8Array(buf);
expect(bytes[0]).toBe(0xff);
expect(bytes[1]).toBe(0x7f);
expect(buf.byteLength).toBe(2);
});
it("emits exactly length*2 bytes and round-trips", () => {
const input = [0, 1, -1, 0.5, -0.5];
const buf = floatToPcm16LE(input);
expect(buf.byteLength).toBe(input.length * 2);
const back = readInt16LE(buf);
expect(back[0]).toBe(0);
expect(back[1]).toBe(32767);
expect(back[2]).toBe(-32768);
});
it("property: output is always within [-32768, 32767]", () => {
for (let i = 0; i < 1000; i++) {
const v = (Math.random() - 0.5) * 10; // span well beyond [-1,1]
const out = floatSampleToInt16(v);
expect(out).toBeGreaterThanOrEqual(-32768);
expect(out).toBeLessThanOrEqual(32767);
}
// Include hostile values explicitly.
for (const v of [NaN, Infinity, -Infinity, 1e308, -1e308]) {
const out = floatSampleToInt16(v);
expect(out).toBeGreaterThanOrEqual(-32768);
expect(out).toBeLessThanOrEqual(32767);
}
});
});
describe("LinearResampler", () => {
function ramp(n: number): Float32Array {
const a = new Float32Array(n);
for (let i = 0; i < n; i++) a[i] = i / n;
return a;
}
it("48k → 24k produces ~half the samples", () => {
const rs = new LinearResampler(48000, 24000);
const out = rs.process(ramp(1000));
expect(out.length).toBeGreaterThan(480);
expect(out.length).toBeLessThan(520);
});
it("ratio = 1 is approximately a passthrough (length-wise)", () => {
const rs = new LinearResampler(24000, 24000);
const input = ramp(1000);
const out = rs.process(input);
expect(Math.abs(out.length - input.length)).toBeLessThanOrEqual(1);
});
it("44.1k → 24k fractional ratio yields the expected count", () => {
const rs = new LinearResampler(44100, 24000);
const n = 4410;
const out = rs.process(ramp(n));
const expected = n * (24000 / 44100); // ~2400
expect(Math.abs(out.length - expected)).toBeLessThan(3);
});
it("cross-quantum continuity: split == single", () => {
const input = ramp(2000);
const single = new LinearResampler(48000, 24000).process(input);
const split = new LinearResampler(48000, 24000);
const a = split.process(input.subarray(0, 777));
const b = split.process(input.subarray(777));
const joined = new Float32Array(a.length + b.length);
joined.set(a, 0);
joined.set(b, a.length);
expect(joined.length).toBe(single.length);
for (let i = 0; i < single.length; i++) {
expect(joined[i]).toBeCloseTo(single[i], 6);
}
});
it("never reads out of bounds (no NaN in output)", () => {
const rs = new LinearResampler(48000, 24000);
for (let q = 0; q < 50; q++) {
const out = rs.process(ramp(128));
for (const v of out) expect(Number.isNaN(v)).toBe(false);
}
});
});
describe("OnePoleLowPass", () => {
it("is a passthrough when not downsampling", () => {
const lp = new OnePoleLowPass(24000, 24000);
for (const v of [0.5, -0.3, 1, -1]) expect(lp.process(v)).toBe(v);
});
it("attenuates a step (smooths) when downsampling", () => {
const lp = new OnePoleLowPass(48000, 24000);
const first = lp.process(1);
// One-pole on a step from 0 should not jump straight to 1.
expect(first).toBeLessThan(1);
expect(first).toBeGreaterThan(0);
});
});
describe("FrameAccumulator", () => {
it("emits exactly one 7200-byte frame for FRAME_SAMPLES samples", () => {
expect(FRAME_SAMPLES).toBe(3600);
const acc = new FrameAccumulator();
const frames = acc.push(new Float32Array(FRAME_SAMPLES));
expect(frames).toHaveLength(1);
expect(frames[0].byteLength).toBe(7200);
expect(acc.pending).toBe(0);
});
it("emits no frame for FRAME_SAMPLES-1 samples and carries the remainder", () => {
const acc = new FrameAccumulator();
const frames = acc.push(new Float32Array(FRAME_SAMPLES - 1));
expect(frames).toHaveLength(0);
expect(acc.pending).toBe(FRAME_SAMPLES - 1);
});
it("carries the remainder across pushes", () => {
const acc = new FrameAccumulator();
expect(acc.push(new Float32Array(2000))).toHaveLength(0);
const frames = acc.push(new Float32Array(2000)); // 4000 total → one frame
expect(frames).toHaveLength(1);
expect(acc.pending).toBe(400); // 4000 - 3600
});
it("flush emits the partial tail then clears", () => {
const acc = new FrameAccumulator();
acc.push(new Float32Array(100));
const tail = acc.flush();
expect(tail).not.toBeNull();
expect(tail!.byteLength).toBe(200);
expect(acc.pending).toBe(0);
expect(acc.flush()).toBeNull();
});
});

@@ -0,0 +1,187 @@
// Pure DSP primitives for the realtime dictation capture path. These functions
// carry NO Web Audio / worklet dependencies so they can be unit-tested directly
// in jsdom/node. The AudioWorklet processor (`pcm16-worklet.ts`) re-implements
// the same math inline (the worklet global scope forbids ES imports at runtime),
// but THIS module is the single canonical reference the tests exercise and the
// worklet is kept byte-identical in behavior to it. See the note in
// `pcm16-worklet.ts`.
// Target output rate required by the upstream transcription contract.
export const TARGET_RATE = 24000;
// ~150 ms of audio at the target rate: 24000 * 0.15 = 3600 samples per message.
export const FRAME_SAMPLES = Math.round(TARGET_RATE * 0.15);
/**
* Convert a single normalized float audio sample in [-1, 1] to a signed 16-bit
* integer. Values outside the range are clamped; NaN/Inf collapse to 0/±range so
* the output is ALWAYS within [-32768, 32767]. Negative values scale by 0x8000
* and non-negative by 0x7fff so that +1 → 32767 and -1 → -32768 exactly.
*/
export function floatSampleToInt16(sample: number): number {
let s = sample;
if (Number.isNaN(s)) return 0;
if (s > 1) s = 1;
else if (s < -1) s = -1;
const scaled = s < 0 ? s * 0x8000 : s * 0x7fff;
// Math.round to the nearest integer, then a hard clamp as a final guard.
let v = Math.round(scaled);
if (v > 32767) v = 32767;
else if (v < -32768) v = -32768;
return v;
}
/**
* Convert a Float32 sample buffer to little-endian PCM16 bytes. The returned
* ArrayBuffer is exactly `float32.length * 2` bytes; byte order is LE regardless
* of host endianness (DataView writes are explicit).
*/
export function floatToPcm16LE(float32: ArrayLike<number>): ArrayBuffer {
const count = float32.length;
const buffer = new ArrayBuffer(count * 2);
const view = new DataView(buffer);
for (let i = 0; i < count; i++) {
view.setInt16(i * 2, floatSampleToInt16(float32[i]), true);
}
return buffer;
}
/**
* A simple one-pole IIR low-pass filter used as a cheap anti-aliasing stage
* before downsampling (e.g. 48k → 24k). The coefficient is derived from the
* normalized cutoff so the filter attenuates content above the output Nyquist,
* reducing aliasing noise that would otherwise confuse the STT model. State is
* carried across quanta in the private `prev` field so there are no per-quantum
* seams. When `inputRate <= outputRate` (no downsampling) the filter is a
* passthrough.
*/
export class OnePoleLowPass {
private alpha: number;
private prev: number;
private readonly enabled: boolean;
constructor(inputRate: number, outputRate: number, primed = 0) {
// Cutoff a touch below the output Nyquist to leave transition room.
const cutoff = (outputRate / 2) * 0.9;
this.enabled = inputRate > outputRate && cutoff > 0 && inputRate > 0;
// Standard one-pole alpha: dt / (rc + dt), rc = 1 / (2π fc).
const dt = 1 / Math.max(inputRate, 1);
const rc = 1 / (2 * Math.PI * Math.max(cutoff, 1));
this.alpha = dt / (rc + dt);
this.prev = primed;
}
/** Filter one sample and return the result; passthrough when disabled. */
process(sample: number): number {
if (!this.enabled) return sample;
this.prev = this.prev + this.alpha * (sample - this.prev);
return this.prev;
}
}
/**
* Stateful linear resampler that converts a stream of input quanta at
* `inputRate` to `outputRate`, carrying the fractional read position and the
* boundary sample across calls so splitting a signal into two `process()` calls
* yields the same output as one call (cross-quantum continuity). Never reads out
* of bounds: the right neighbor of every emitted sample is guaranteed to exist
* within the current quantum; any leftover position is carried.
*/
export class LinearResampler {
private readonly ratio: number;
private resamplePos = 0;
private prevSample = 0;
private primed = false;
constructor(inputRate: number, outputRate: number) {
// Input samples consumed per output sample. >1 when downsampling.
this.ratio = inputRate / outputRate;
}
/**
* Resample one quantum and return the produced output samples. The optional
* `filter` is applied to each input sample as it is consumed (anti-aliasing).
*/
process(channel: ArrayLike<number>, filter?: OnePoleLowPass): Float32Array {
const n = channel.length;
if (n === 0) return new Float32Array(0);
// Apply the anti-aliasing filter once over the raw input, keeping the result
// in a local buffer so resampling reads filtered values. The filter state is
// carried inside `filter` across calls.
let src: ArrayLike<number> = channel;
if (filter) {
const filtered = new Float32Array(n);
for (let i = 0; i < n; i++) filtered[i] = filter.process(channel[i]);
src = filtered;
}
if (!this.primed) {
this.prevSample = src[0];
this.primed = true;
this.resamplePos = 0;
}
// Collect output in a growable array; converted to a Float32Array at the end.
const out: number[] = [];
let pos = this.resamplePos;
while (pos < n - 1) {
const floor = Math.floor(pos);
const frac = pos - floor;
const s0 = floor < 0 ? this.prevSample : src[floor];
const s1 = src[floor + 1];
out.push(s0 + (s1 - s0) * frac);
pos += this.ratio;
}
this.resamplePos = pos - n;
this.prevSample = src[n - 1];
return Float32Array.from(out);
}
}
/**
* Accumulates resampled Float32 samples and emits whole PCM16 frames of exactly
* FRAME_SAMPLES (7200-byte ArrayBuffers). The remainder is carried until the
* next push completes a frame. `flush()` emits any partial remainder (used on
* teardown so the final ~150 ms is not lost).
*/
export class FrameAccumulator {
private acc: Float32Array;
private accLen = 0;
private readonly frameSamples: number;
constructor(frameSamples: number = FRAME_SAMPLES) {
this.frameSamples = frameSamples;
this.acc = new Float32Array(frameSamples);
}
/**
* Push samples; returns zero or more complete PCM16 frame buffers (each
* `frameSamples * 2` bytes). The carried remainder stays buffered.
*/
push(samples: ArrayLike<number>): ArrayBuffer[] {
const frames: ArrayBuffer[] = [];
for (let i = 0; i < samples.length; i++) {
this.acc[this.accLen] = samples[i];
this.accLen += 1;
if (this.accLen >= this.frameSamples) {
frames.push(floatToPcm16LE(this.acc.subarray(0, this.accLen)));
this.accLen = 0;
}
}
return frames;
}
/** Emit the partial remainder (if any) as one frame and clear it. */
flush(): ArrayBuffer | null {
if (this.accLen === 0) return null;
const buf = floatToPcm16LE(this.acc.subarray(0, this.accLen));
this.accLen = 0;
return buf;
}
/** Number of buffered samples not yet flushed. */
get pending(): number {
return this.accLen;
}
}
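
As a worked example of the one-pole coefficient used above, the sketch below recomputes alpha for a 48 kHz input and the 24 kHz target, then feeds a unit step through the same update rule. The helper names `onePoleAlpha` and `stepResponse` are hypothetical; the formulas are the ones in `OnePoleLowPass` (cutoff = 0.9 × output Nyquist, alpha = dt / (rc + dt)).

```typescript
// Same coefficient formula as OnePoleLowPass, extracted for inspection.
function onePoleAlpha(inputRate: number, outputRate: number): number {
  const cutoff = (outputRate / 2) * 0.9; // 10800 Hz for a 24 kHz target
  const dt = 1 / inputRate;
  const rc = 1 / (2 * Math.PI * cutoff);
  return dt / (rc + dt);
}

// Response of y[n] = y[n-1] + alpha * (x[n] - y[n-1]) to a unit step from 0.
function stepResponse(alpha: number, steps: number): number[] {
  const out: number[] = [];
  let prev = 0;
  for (let i = 0; i < steps; i++) {
    prev = prev + alpha * (1 - prev);
    out.push(prev);
  }
  return out;
}

const alpha48k = onePoleAlpha(48000, 24000); // roughly 0.586
const firstThree = stepResponse(alpha48k, 3); // roughly 0.586, 0.828, 0.929
console.log(alpha48k.toFixed(3), firstThree.map((v) => v.toFixed(3)).join(", "));
```

The first output of about 0.586 is why the test above expects the filtered step to land strictly between 0 and 1 on the first sample.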

@@ -0,0 +1,179 @@
// Self-contained AudioWorkletProcessor that turns the live microphone stream into
// PCM16 (signed 16-bit, little-endian), mono, 24000 Hz chunks for the realtime STT
// upstream. It runs in the AudioWorklet global scope, so it MUST NOT import anything
// (the worklet module has no module graph / bundler runtime around it).
//
// IMPORTANT — single source of truth: the DSP math below (float→PCM16 conversion,
// the one-pole anti-aliasing low-pass, linear resampling, and frame accumulation)
// is the SAME algorithm exported as pure, unit-tested functions from the sibling
// `pcm16-dsp.ts`. Because the worklet scope cannot `import` at runtime, the logic
// is mirrored here inline rather than imported, and the tests assert that the pure
// module behaves identically. Any change to one MUST be mirrored in the other.
//
// Per `process()` call the host hands us a render quantum (typically 128 frames) at
// the context sample rate. We read the first input channel (mono), apply a cheap
// anti-aliasing low-pass, linearly resample to 24000 Hz while carrying the
// fractional read position across calls (so we never assume a particular input
// rate, e.g. 44.1k or 48k), accumulate the resampled samples, and once we have
// ~150 ms worth (3600 samples) we emit them as an Int16 ArrayBuffer transferred to
// the main thread. A 'flush' message from the main thread emits the partial tail so
// the last ~150 ms is not lost on stop.
// Target output rate required by the upstream transcription contract.
const TARGET_RATE = 24000;
// ~150 ms of audio at the target rate: 24000 * 0.15 = 3600 samples per message.
const FRAME_SAMPLES = Math.round(TARGET_RATE * 0.15);
class Pcm16Worklet extends AudioWorkletProcessor {
// Fractional read position within the CURRENT quantum, in input-sample units.
// Kept across `process()` calls so resampling has no per-quantum seams. After a
// quantum it is rebased relative to the next quantum's start, so a value in
// [-1, 0) means "interpolate between the previous quantum's last sample and the
// next quantum's first sample".
private resamplePos = 0;
// The previous quantum's last input sample, used to interpolate across the
// boundary between two render quanta (the conceptual sample at index -1).
private prevSample = 0;
// True once at least one sample has been seen (so `prevSample` is meaningful).
private primed = false;
// Accumulated resampled Float32 samples awaiting conversion + flush.
private acc: Float32Array = new Float32Array(FRAME_SAMPLES);
private accLen = 0;
// --- Anti-aliasing one-pole low-pass state (see OnePoleLowPass in pcm16-dsp) ---
// Configured lazily on the first quantum once `sampleRate` is known.
private lpAlpha = 1;
private lpPrev = 0;
private lpEnabled = false;
private lpConfigured = false;
constructor() {
super();
// The main thread asks for a tail flush on stop so the last partial frame
// (up to ~150 ms) is not dropped. Only a "flush" message triggers it; any other
// message is ignored.
this.port.onmessage = (event: MessageEvent) => {
if (event.data === "flush") this.flush();
};
}
private configureLowPass(): void {
if (this.lpConfigured) return;
this.lpConfigured = true;
const inputRate = sampleRate;
const outputRate = TARGET_RATE;
const cutoff = (outputRate / 2) * 0.9;
this.lpEnabled = inputRate > outputRate && cutoff > 0 && inputRate > 0;
const dt = 1 / Math.max(inputRate, 1);
const rc = 1 / (2 * Math.PI * Math.max(cutoff, 1));
this.lpAlpha = dt / (rc + dt);
this.lpPrev = 0;
}
private lowPass(sample: number): number {
if (!this.lpEnabled) return sample;
this.lpPrev = this.lpPrev + this.lpAlpha * (sample - this.lpPrev);
return this.lpPrev;
}
process(inputs: Float32Array[][]): boolean {
const input = inputs[0];
// No connected input (or a momentarily empty quantum): skip processing but keep
// the node alive.
const channel = input && input.length > 0 ? input[0] : undefined;
if (channel && channel.length > 0) {
this.resampleAndAccumulate(channel);
}
// `outputs` is never written, so the node emits silence: connecting it to the
// destination keeps the graph running without echoing the microphone back to
// the speakers.
return true;
}
// Apply anti-aliasing, linearly resample `channel` (at the context `sampleRate`)
// to TARGET_RATE, and push the results into the accumulator, flushing whole
// frames as they fill.
private resampleAndAccumulate(channel: Float32Array): void {
this.configureLowPass();
const ratio = sampleRate / TARGET_RATE; // input samples consumed per output sample
const n = channel.length;
// Anti-alias the raw input first; carry the filter state across quanta.
const src = new Float32Array(n);
for (let i = 0; i < n; i++) src[i] = this.lowPass(channel[i]);
if (!this.primed) {
// First quantum: there is no real predecessor, so seed the virtual index -1
// with this quantum's first sample and start reading from 0.
this.prevSample = src[0];
this.primed = true;
this.resamplePos = 0;
}
let pos = this.resamplePos;
// Emit output samples whose RIGHT neighbor (floor + 1) is available within
// this quantum, i.e. while floor + 1 <= n - 1 ⇔ pos < n - 1. The left
// neighbor at floor === -1 is the carried `prevSample`; floor >= 0 reads the
// quantum directly. Any leftover position (whose right neighbor would be the
// NEXT quantum's first sample) is carried via `resamplePos` and resolved on
// the next call. This guarantees we never read `src[n]` (out of bounds).
while (pos < n - 1) {
const floor = Math.floor(pos);
const frac = pos - floor;
const s0 = floor < 0 ? this.prevSample : src[floor];
const s1 = src[floor + 1];
this.pushSample(s0 + (s1 - s0) * frac);
pos += ratio;
}
// Rebase the leftover position relative to the next quantum's start and carry
// this quantum's last sample as the predecessor for the boundary interval.
this.resamplePos = pos - n;
this.prevSample = src[n - 1];
}
// Append one resampled sample; flush a full PCM16 frame whenever the
// accumulator reaches FRAME_SAMPLES.
private pushSample(sample: number): void {
this.acc[this.accLen] = sample;
this.accLen += 1;
if (this.accLen >= FRAME_SAMPLES) {
this.flush();
}
}
// Convert the accumulated Float32 samples to Int16 LE and post the ArrayBuffer
// to the main thread, transferring ownership (zero-copy). DataView writes are
// little-endian to match the PCM16 contract regardless of host endianness.
// Also invoked on a 'flush' message to emit a partial tail frame on stop.
private flush(): void {
const count = this.accLen;
if (count === 0) return;
const buffer = new ArrayBuffer(count * 2);
const view = new DataView(buffer);
for (let i = 0; i < count; i++) {
// Clamp to [-1, 1] then scale to the signed 16-bit range. Mirrors
// floatSampleToInt16 in pcm16-dsp.ts.
let s = this.acc[i];
if (Number.isNaN(s)) s = 0;
else if (s > 1) s = 1;
else if (s < -1) s = -1;
let v = Math.round(s < 0 ? s * 0x8000 : s * 0x7fff);
if (v > 32767) v = 32767;
else if (v < -32768) v = -32768;
view.setInt16(i * 2, v, true);
}
this.accLen = 0;
this.port.postMessage(buffer, [buffer]);
}
}
registerProcessor("pcm16-worklet", Pcm16Worklet);
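Review note on the resampler above: the key property is that the fractional read position and the previous quantum's last sample are carried across calls, so splitting the input into render quanta should not change the output. The sketch below isolates that idea under stated assumptions: it omits the one-pole low-pass and the frame accumulator, and `createSketchResampler` is a hypothetical helper, not a function from this PR.

```typescript
// Hypothetical helper, not part of the diff: a stateful linear resampler that
// accepts chunks of any size and carries its read position between them.
function createSketchResampler(inputRate: number, outputRate: number) {
  const ratio = inputRate / outputRate; // input samples consumed per output sample
  let pos = 0; // read position relative to the current chunk's start
  let prev = 0; // last sample of the previous chunk (the virtual index -1)
  let primed = false;

  return function resample(chunk: Float32Array): number[] {
    const n = chunk.length;
    const out: number[] = [];
    if (n === 0) return out;
    if (!primed) {
      // First chunk: no real predecessor, so seed index -1 with the first sample.
      prev = chunk[0];
      primed = true;
      pos = 0;
    }
    // Emit only while the right neighbor (floor + 1) lies inside this chunk.
    while (pos < n - 1) {
      const floor = Math.floor(pos);
      const frac = pos - floor;
      const s0 = floor < 0 ? prev : chunk[floor];
      const s1 = chunk[floor + 1];
      out.push(s0 + (s1 - s0) * frac);
      pos += ratio;
    }
    // Rebase onto the next chunk's start; a value in [-1, 0) straddles the seam.
    pos -= n;
    prev = chunk[n - 1];
    return out;
  };
}

// Usage: one 256-sample pass and two 128-sample passes over the same ramp.
const sketchRamp = Float32Array.from({ length: 256 }, (_, i) => i);
const sketchWhole = createSketchResampler(48000, 24000)(sketchRamp);
const sketchChunked = createSketchResampler(48000, 24000);
const sketchParts = [
  ...sketchChunked(sketchRamp.subarray(0, 128)),
  ...sketchChunked(sketchRamp.subarray(128)),
];
console.log(sketchWhole.length, sketchParts.length);
```

Comparing the chunked and single-pass outputs like this is one way to check for per-quantum seams when changing either copy of the DSP code.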

@@ -0,0 +1,103 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { render, screen, fireEvent, cleanup } from "@testing-library/react";
import { MantineProvider } from "@mantine/core";
// jsdom has no matchMedia; Mantine's color-scheme provider needs it. Stub a
// minimal, inert implementation before any MantineProvider mounts.
if (typeof window.matchMedia !== "function") {
window.matchMedia = (query: string) =>
({
matches: false,
media: query,
onchange: null,
addListener: () => undefined,
removeListener: () => undefined,
addEventListener: () => undefined,
removeEventListener: () => undefined,
dispatchEvent: () => false,
}) as unknown as MediaQueryList;
}
// Mock i18n so labels render the raw key.
vi.mock("react-i18next", () => ({
useTranslation: () => ({ t: (k: string) => k, i18n: {} }),
}));
// Controllable mock of the dictation hook. Tests set the returned status and
// inspect the start/stop spies.
const hookState: {
status: "idle" | "recording" | "error";
start: ReturnType<typeof vi.fn>;
stop: ReturnType<typeof vi.fn>;
cancel: ReturnType<typeof vi.fn>;
} = {
status: "idle",
start: vi.fn(),
stop: vi.fn(),
cancel: vi.fn(),
};
vi.mock("@/features/dictation/hooks/use-realtime-dictation", () => ({
useRealtimeDictation: () => hookState,
}));
import { RealtimeMicButton } from "./realtime-mic-button";
function renderButton(props: Partial<Parameters<typeof RealtimeMicButton>[0]> = {}) {
const onInterim = vi.fn();
const onFinal = vi.fn();
const utils = render(
<MantineProvider>
<RealtimeMicButton onInterim={onInterim} onFinal={onFinal} {...props} />
</MantineProvider>,
);
return { onInterim, onFinal, ...utils };
}
beforeEach(() => {
cleanup();
hookState.status = "idle";
hookState.start = vi.fn();
hookState.stop = vi.fn();
hookState.cancel = vi.fn();
});
describe("RealtimeMicButton", () => {
it("idle: clicking calls start", () => {
renderButton();
fireEvent.click(screen.getByLabelText("Start dictation"));
expect(hookState.start).toHaveBeenCalledTimes(1);
expect(hookState.stop).not.toHaveBeenCalled();
});
it("recording: clicking calls stop", () => {
hookState.status = "recording";
renderButton();
fireEvent.click(screen.getByLabelText("Stop recording"));
expect(hookState.stop).toHaveBeenCalledTimes(1);
expect(hookState.start).not.toHaveBeenCalled();
});
it("recording → idle transition fires onInterim('') exactly once", () => {
hookState.status = "recording";
const { onInterim, rerender } = renderButton();
expect(onInterim).not.toHaveBeenCalled();
hookState.status = "idle";
rerender(
<MantineProvider>
<RealtimeMicButton onInterim={onInterim} onFinal={vi.fn()} />
</MantineProvider>,
);
expect(onInterim).toHaveBeenCalledTimes(1);
expect(onInterim).toHaveBeenCalledWith("");
// A further re-render in idle does not fire it again.
rerender(
<MantineProvider>
<RealtimeMicButton onInterim={onInterim} onFinal={vi.fn()} />
</MantineProvider>,
);
expect(onInterim).toHaveBeenCalledTimes(1);
});
});

@@ -0,0 +1,84 @@
import { FC, useEffect, useRef } from "react";
import { ActionIcon, Tooltip } from "@mantine/core";
import { IconMicrophone, IconPlayerStopFilled } from "@tabler/icons-react";
import { useTranslation } from "react-i18next";
import {
useRealtimeDictation,
type RealtimeDictationStatus,
} from "@/features/dictation/hooks/use-realtime-dictation";
interface RealtimeMicButtonProps {
onInterim: (text: string) => void;
onFinal: (text: string) => void;
onStart?: () => void;
disabled?: boolean;
// Mantine ActionIcon size token; "lg" matches the chat composer, "md" the
// editor toolbar.
size?: "md" | "lg";
}
/**
* Streaming sibling of MicButton. Drives the realtime dictation state machine:
* a click starts recording (mic icon), a second click stops it (stop icon).
* Interim/final transcripts are surfaced through the onInterim/onFinal props as
* they arrive; there is no "transcribing" state because final text lands
* incrementally while recording. Mirrors MicButton's look and tooltips.
*/
export const RealtimeMicButton: FC<RealtimeMicButtonProps> = ({
onInterim,
onFinal,
onStart,
disabled,
size = "lg",
}) => {
const { t } = useTranslation();
const { status, start, stop } = useRealtimeDictation({
onInterim,
onFinal,
onStart,
});
const iconSize = size === "lg" ? 18 : 16;
// When recording ends (status leaves "recording" for idle/error), clear any
// leftover partial in the consumer once. Tracked via the previous status so
// it only fires on the transition, not on every render.
const prevStatusRef = useRef<RealtimeDictationStatus>(status);
useEffect(() => {
if (prevStatusRef.current === "recording" && status !== "recording") {
onInterim("");
}
prevStatusRef.current = status;
}, [status, onInterim]);
if (status === "recording") {
return (
<Tooltip label={t("Stop recording")} withArrow>
<ActionIcon
size={size}
color="red"
variant="light"
onClick={stop}
aria-label={t("Stop recording")}
>
<IconPlayerStopFilled size={iconSize} />
</ActionIcon>
</Tooltip>
);
}
// idle / error: subtle mic to (re)start. No spinner — there is no separate
// transcribing phase in the realtime flow.
return (
<Tooltip label={t("Start dictation")} withArrow>
<ActionIcon
size={size}
variant="subtle"
onClick={() => void start()}
disabled={disabled}
aria-label={t("Start dictation")}
>
<IconMicrophone size={iconSize} />
</ActionIcon>
</Tooltip>
);
};
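Review note on the `prevStatusRef` effect above: it is an edge detector, firing only when the status leaves "recording" and not on every render in a non-recording state. A framework-free sketch of the same rule follows. `createSketchTransitionTracker` is a hypothetical name used only for illustration; the component itself keeps the previous status in a ref inside `useEffect`.

```typescript
type SketchStatus = "idle" | "recording" | "error";

// Hypothetical helper, not part of the diff: remembers the previous status and
// calls `onLeaveRecording` only on a recording -> non-recording transition.
function createSketchTransitionTracker(
  initial: SketchStatus,
  onLeaveRecording: () => void,
): (next: SketchStatus) => void {
  let prev: SketchStatus = initial;
  return (next: SketchStatus): void => {
    if (prev === "recording" && next !== "recording") {
      onLeaveRecording();
    }
    prev = next;
  };
}

// Usage: count how often the callback fires across a sequence of statuses.
let sketchClearCount = 0;
const sketchTrack = createSketchTransitionTracker("recording", () => {
  sketchClearCount += 1;
});
sketchTrack("recording"); // still recording: no call
sketchTrack("idle"); // transition: one call
sketchTrack("idle"); // already idle: no further call
console.log(sketchClearCount);
```

This is the behavior the "recording → idle transition fires onInterim('') exactly once" test asserts through re-renders.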

@@ -2,6 +2,12 @@ import { useCallback, useEffect, useRef, useState } from "react";
import { notifications } from "@mantine/notifications";
import { useTranslation } from "react-i18next";
import { transcribeAudio } from "@/features/dictation/services/dictation-service";
import {
acquireMicStream,
canStartCapture,
mapGetUserMediaError,
MicUnavailableError,
} from "@/features/dictation/audio/mic-capture";
export type DictationStatus = "idle" | "recording" | "transcribing" | "error";
@@ -83,46 +89,38 @@ export function useDictation(
}, []);
const start = useCallback(async (): Promise<void> => {
// Synchronous live guard: status is stale between renders, so also block on
// refs to prevent a double-click from opening two MediaStreams (the first
// would leak).
if (startingRef.current || recorderRef.current || streamRef.current) return;
if (status !== "idle") return;
startingRef.current = true;
if (!navigator.mediaDevices?.getUserMedia) {
const reason =
"navigator.mediaDevices.getUserMedia is unavailable in this context";
console.error("[dictation] " + reason);
notifications.show({
color: "red",
message: t("Audio recording is not available in this browser/context"),
});
setStatus("idle");
startingRef.current = false;
// Synchronous live guard (shared with the streaming hook): status is stale
// between renders, so also block on refs to prevent a double-click from
// opening two MediaStreams (the first would leak).
if (
!canStartCapture({
starting: startingRef.current,
hasStream: streamRef.current !== null,
hasLiveResource: recorderRef.current !== null,
statusIsIdle: status === "idle",
})
) {
return;
}
startingRef.current = true;
let stream: MediaStream;
try {
stream = await navigator.mediaDevices.getUserMedia({ audio: true });
stream = await acquireMicStream();
} catch (err) {
// Always log the full error for diagnosis (name, message, stack).
console.error("[dictation] getUserMedia failed", err);
const name = (err as { name?: string })?.name;
const detail = (err as { message?: string })?.message ?? String(err);
let message: string;
if (name === "NotAllowedError" || name === "SecurityError") {
message = t("Microphone access denied");
} else if (name === "NotFoundError" || name === "OverconstrainedError") {
message = t("No microphone found");
} else if (name === "NotReadableError" || name === "AbortError") {
message = t("Microphone is unavailable or already in use");
if (err instanceof MicUnavailableError) {
console.error("[dictation] " + err.message);
notifications.show({
color: "red",
message: t(
"Audio recording is not available in this browser/context",
),
});
} else {
// Unknown failure: show the real reason instead of a generic string.
message = `${t("Could not start recording")}: ${name ? `${name}: ` : ""}${detail}`;
// Always log the full error for diagnosis (name, message, stack).
console.error("[dictation] getUserMedia failed", err);
notifications.show({ color: "red", message: mapGetUserMediaError(err, t) });
}
notifications.show({ color: "red", message });
setStatus("idle");
startingRef.current = false;
return;

@@ -0,0 +1,492 @@
import { useCallback, useEffect, useRef, useState } from "react";
import { notifications } from "@mantine/notifications";
import { useTranslation } from "react-i18next";
import { RealtimeDictationClient } from "@/features/dictation/services/realtime-dictation-client";
import {
acquireMicStream,
canStartCapture,
mapGetUserMediaError,
MicUnavailableError,
} from "@/features/dictation/audio/mic-capture";
import { baseLanguageSubtag } from "@/features/dictation/services/dictation-reducer";
// The worklet module URL is produced via `new URL(..., import.meta.url)` so Vite
// emits the processor as a separate, self-contained module chunk (it must run in
// the AudioWorklet global scope, outside the main bundle). Built once at module
// load — the resolved URL is stable for the app's lifetime.
const PCM16_WORKLET_URL = new URL(
"../audio/pcm16-worklet.ts",
import.meta.url,
);
export type RealtimeDictationStatus = "idle" | "recording" | "error";
export interface UseRealtimeDictationOptions {
onInterim: (text: string) => void; // latest partial for the live segment
onFinal: (text: string) => void; // a completed segment (trimmed)
onStart?: () => void; // fired right when capture begins (caret snapshot)
maxDurationMs?: number; // default 120000
}
export interface UseRealtimeDictationResult {
status: RealtimeDictationStatus;
start: () => Promise<void>;
stop: () => void;
cancel: () => void;
}
// AudioContext is webkit-prefixed on some older Safari builds; keep a typed
// fallback so the hook never crashes when the standard name is missing.
function getAudioContextCtor(): typeof AudioContext | undefined {
if (typeof AudioContext !== "undefined") return AudioContext;
const w = window as unknown as { webkitAudioContext?: typeof AudioContext };
return w.webkitAudioContext;
}
/**
* Streaming sibling of `use-dictation`. Captures the mic, resamples to PCM16
* 24 kHz in an AudioWorklet, and streams it over the normalized `/ai-realtime`
* Socket.IO namespace, surfacing interim/final transcripts as they arrive.
*
* Mirrors `use-dictation`'s conventions: refs hold the live graph/client/timers
* so re-renders never lose them, getUserMedia errors map to the same Mantine
* notifications, and every exit path stops the MediaStream tracks and closes the
* AudioContext. There is no `transcribing` state — final text arrives
* incrementally while `recording`.
*/
export function useRealtimeDictation(
options: UseRealtimeDictationOptions,
): UseRealtimeDictationResult {
const { t, i18n } = useTranslation();
const [status, setStatus] = useState<RealtimeDictationStatus>("idle");
// Keep the latest callbacks in a ref so async socket handlers always call the
// current handlers without re-creating the capture graph.
const optionsRef = useRef(options);
optionsRef.current = options;
const streamRef = useRef<MediaStream | null>(null);
const audioContextRef = useRef<AudioContext | null>(null);
const sourceRef = useRef<MediaStreamAudioSourceNode | null>(null);
const workletRef = useRef<AudioWorkletNode | null>(null);
const clientRef = useRef<RealtimeDictationClient | null>(null);
const timerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const errorTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
// Defers the upstream/socket teardown a short beat after a graceful stop so the
// worklet's flushed tail frame can round-trip and be forwarded before we close.
const flushTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const canceledRef = useRef(false);
const startingRef = useRef(false);
// True once the server emits `ready`; audio is buffered until then, then flushed.
const readyRef = useRef(false);
// PCM16 chunks captured before the upstream session is ready.
const pendingAudioRef = useRef<ArrayBuffer[]>([]);
// Stable ref to the latest stop() so the max-duration timer (armed inside the
// start closure) can invoke the current version without re-arming every render.
const stopRef = useRef<() => void>(() => undefined);
const clearTimer = useCallback(() => {
if (timerRef.current !== null) {
clearTimeout(timerRef.current);
timerRef.current = null;
}
if (flushTimerRef.current !== null) {
clearTimeout(flushTimerRef.current);
flushTimerRef.current = null;
}
}, []);
const stopTracks = useCallback(() => {
streamRef.current?.getTracks().forEach((track) => track.stop());
streamRef.current = null;
}, []);
// Tear down the audio graph (worklet node, source, context). Never throws on a
// half-built or already-closed graph.
const teardownAudio = useCallback(() => {
const worklet = workletRef.current;
if (worklet) {
worklet.port.onmessage = null;
try {
worklet.disconnect();
} catch {
// Node may already be disconnected; ignore.
}
workletRef.current = null;
}
const source = sourceRef.current;
if (source) {
try {
source.disconnect();
} catch {
// Ignore disconnect of an already-detached node.
}
sourceRef.current = null;
}
const ctx = audioContextRef.current;
if (ctx) {
audioContextRef.current = null;
if (ctx.state !== "closed") {
// close() returns a promise; swallow rejections so teardown never throws.
void ctx.close().catch(() => undefined);
}
}
}, []);
// Full teardown shared by stop/cancel/unmount. Order: stop streaming upstream,
// disconnect the socket, then dismantle the local audio graph and tracks, then
// clear timers and reset the ready/pending state. Also clears the interim
// "ghost" decoration in the consumer so it does not stick when the toolbar
// closes mid-recording (the unmount path runs teardown).
const teardown = useCallback(() => {
const client = clientRef.current;
if (client) {
clientRef.current = null;
try {
client.stop();
} catch {
// Socket may already be gone; ignore.
}
client.disconnect();
}
teardownAudio();
stopTracks();
clearTimer();
readyRef.current = false;
pendingAudioRef.current = [];
startingRef.current = false;
// Clear any leftover interim decoration. Guarded so a throwing consumer
// callback can never break teardown.
try {
optionsRef.current.onInterim("");
} catch (err) {
console.error("[realtime-dictation] onInterim('') during teardown threw", err);
}
}, [teardownAudio, stopTracks, clearTimer]);
// Ask the worklet to emit its partial tail frame (up to ~150 ms of samples that
// have not yet filled a full frame) so it is not lost on stop. The worklet posts the
// remaining samples back over the existing port.onmessage handler, which
// forwards them upstream before the socket is closed.
const flushWorklet = useCallback(() => {
try {
workletRef.current?.port.postMessage("flush");
} catch {
// Port may already be closed; ignore.
}
}, []);
// Surface a concrete failure: log it, notify, flip to "error", and reset to
// "idle" after a short delay (mirrors use-dictation's error timer).
const handleError = useCallback(
(message: string, err?: unknown) => {
if (canceledRef.current) return;
// Never log audio — only the textual reason.
console.error("[realtime-dictation]", message, err ?? "");
notifications.show({ color: "red", message });
teardown();
setStatus("error");
if (errorTimerRef.current !== null) {
clearTimeout(errorTimerRef.current);
}
errorTimerRef.current = setTimeout(() => {
errorTimerRef.current = null;
setStatus("idle");
}, 1500);
},
[teardown],
);
const start = useCallback(async (): Promise<void> => {
// Synchronous live guard (shared with the batch hook): status is stale between
// renders, so also block on refs to prevent a double-click from opening two
// MediaStreams / sockets.
if (
!canStartCapture({
starting: startingRef.current,
hasStream: streamRef.current !== null,
hasLiveResource:
audioContextRef.current !== null || clientRef.current !== null,
statusIsIdle: status === "idle",
})
) {
return;
}
startingRef.current = true;
canceledRef.current = false;
readyRef.current = false;
pendingAudioRef.current = [];
let stream: MediaStream;
try {
stream = await acquireMicStream();
} catch (err) {
if (err instanceof MicUnavailableError) {
console.error("[realtime-dictation] " + err.message);
notifications.show({
color: "red",
message: t(
"Audio recording is not available in this browser/context",
),
});
} else {
// Always log the full error for diagnosis (name, message, stack).
console.error("[realtime-dictation] getUserMedia failed", err);
notifications.show({
color: "red",
message: mapGetUserMediaError(err, t),
});
}
setStatus("idle");
startingRef.current = false;
return;
}
// If a stop/cancel landed during the await (the button was pressed while the
// permission prompt was still pending), drop the just-acquired stream and bail
// out cleanly so the mic does not stay physically on and the button does not
// stick on "recording".
if (canceledRef.current) {
stream.getTracks().forEach((track) => track.stop());
startingRef.current = false;
setStatus("idle");
return;
}
streamRef.current = stream;
// Build the capture graph. The worklet still resamples robustly if the browser
// ignores the 24 kHz hint, so any actual context rate is handled correctly.
const AudioCtx = getAudioContextCtor();
if (!AudioCtx) {
stopTracks();
notifications.show({
color: "red",
message: t("Audio recording is not available in this browser/context"),
});
setStatus("idle");
startingRef.current = false;
return;
}
let audioContext: AudioContext;
try {
audioContext = new AudioCtx({ sampleRate: 24000 });
audioContextRef.current = audioContext;
// AudioWorklet requires a secure context (https/localhost), same constraint
// as getUserMedia. A failure here means the UI should fall back to batch.
await audioContext.audioWorklet.addModule(PCM16_WORKLET_URL);
} catch (err) {
console.error("[realtime-dictation] audio worklet setup failed", err);
teardownAudio();
stopTracks();
const detail = (err as { message?: string })?.message ?? String(err);
notifications.show({
color: "red",
message: `${t("Could not start recording")}: ${detail}`,
});
setStatus("idle");
startingRef.current = false;
return;
}
// Another cancel could have landed during addModule().
if (canceledRef.current) {
teardownAudio();
stopTracks();
startingRef.current = false;
setStatus("idle");
return;
}
let source: MediaStreamAudioSourceNode;
let worklet: AudioWorkletNode;
try {
source = audioContext.createMediaStreamSource(stream);
worklet = new AudioWorkletNode(audioContext, "pcm16-worklet");
sourceRef.current = source;
workletRef.current = worklet;
// MediaStreamSource → worklet → destination. The worklet emits silence, so
// connecting to destination drives the render graph without echoing the mic.
source.connect(worklet);
worklet.connect(audioContext.destination);
} catch (err) {
console.error("[realtime-dictation] audio graph wiring failed", err);
teardownAudio();
stopTracks();
const detail = (err as { message?: string })?.message ?? String(err);
notifications.show({
color: "red",
message: `${t("Could not start recording")}: ${detail}`,
});
setStatus("idle");
startingRef.current = false;
return;
}
// Each worklet message is a PCM16 ArrayBuffer. Forward it once the upstream
// session is ready; until then buffer so no leading audio is dropped.
worklet.port.onmessage = (event: MessageEvent) => {
if (canceledRef.current) return;
const buf = event.data as ArrayBuffer;
if (!(buf instanceof ArrayBuffer)) return;
if (readyRef.current && clientRef.current) {
clientRef.current.sendAudio(buf);
} else {
pendingAudioRef.current.push(buf);
}
};
// Wire the realtime transport. The server replies `ready` once the upstream
// STT session is live; we then flush any buffered audio.
const client = new RealtimeDictationClient({
onReady: () => {
if (canceledRef.current) return;
readyRef.current = true;
const pending = pendingAudioRef.current;
pendingAudioRef.current = [];
for (const buf of pending) clientRef.current?.sendAudio(buf);
},
onInterim: (_itemId, text) => {
if (canceledRef.current) return;
optionsRef.current.onInterim(text);
},
onFinal: (_itemId, text) => {
if (canceledRef.current) return;
const trimmed = text.trim();
if (trimmed.length > 0) optionsRef.current.onFinal(trimmed);
},
onError: (message) => {
handleError(message);
},
onClosed: () => {
// The server ended the session (idle/max-duration or graceful upstream
// close). Skip if a cancel already tore everything down, or if an error
// path already owns the status (its error→idle timer is pending), or if a
// local stop already cleared the live refs. Otherwise tear down the capture
// graph + socket and return to idle so the mic/AudioContext don't leak and
// the button doesn't stay stuck on "recording".
if (canceledRef.current) return;
if (errorTimerRef.current !== null) return;
if (
!clientRef.current &&
!audioContextRef.current &&
!streamRef.current
) {
return;
}
teardown();
setStatus("idle");
},
});
clientRef.current = client;
// Notify the caller right when capture begins (before opening the socket) so
// the editor can snapshot the caret position.
try {
optionsRef.current.onStart?.();
} catch (err) {
console.error("[realtime-dictation] onStart callback threw", err);
}
// Open the socket, then ask the server to open the upstream session. The
// language hint is the base subtag of the resolved UI language (e.g. "en-US"
// → "en"), since the upstream transcription model expects an ISO language
// code, not a region-tagged locale; the server omits it upstream when absent.
client.connect();
const locale = i18n.resolvedLanguage || i18n.language || "";
const language = baseLanguageSubtag(locale);
client.start({ language });
setStatus("recording");
// Capture has truly begun; release the synchronous start guard.
startingRef.current = false;
const maxDurationMs = optionsRef.current.maxDurationMs ?? 120000;
timerRef.current = setTimeout(() => {
// Reuse stop() so the upstream is flushed/closed gracefully.
stopRef.current?.();
}, maxDurationMs);
}, [status, t, i18n, stopTracks, teardownAudio, teardown, handleError]);
const stop = useCallback((): void => {
// Nothing live and not mid-acquisition → no-op (never crash on idle).
if (
!clientRef.current &&
!audioContextRef.current &&
!streamRef.current &&
!startingRef.current
) {
return;
}
// If stop() is pressed while getUserMedia / addModule is still pending, the
// start() continuation has not yet stored every ref, and there is no worklet or
// socket to flush. Set the cancel flag (the same one cancel() uses to neutralize
// late socket/worklet callbacks) so the awaiting start() bails after its await
// and stops the stream it just acquired; otherwise the mic would stay physically
// on and the red indicator would stick. teardown() below also dismantles
// anything already wired (partial graph / socket), so either path leaves us
// fully idle, and the UI returns to idle immediately.
if (startingRef.current) {
canceledRef.current = true;
teardown();
setStatus("idle");
return;
}
// Graceful stop of a fully-live session: ask the worklet to emit its partial
// tail frame, then defer the socket/graph teardown a short beat so that tail
// can round-trip and be forwarded upstream before the session closes. The UI
// returns to idle right away; the deferred teardown is idempotent and is also
// cancelled by clearTimer() on any subsequent start/cancel/unmount.
if (workletRef.current && clientRef.current) {
flushWorklet();
if (flushTimerRef.current !== null) clearTimeout(flushTimerRef.current);
flushTimerRef.current = setTimeout(() => {
flushTimerRef.current = null;
teardown();
}, 60);
setStatus("idle");
return;
}
// No live worklet (e.g. graph half-built): tear down immediately.
teardown();
setStatus("idle");
}, [teardown, flushWorklet]);
// Keep the stop ref pointed at the latest stop() for the max-duration timer.
stopRef.current = stop;
const cancel = useCallback((): void => {
// Mark canceled first so any late socket/worklet callbacks are ignored.
canceledRef.current = true;
teardown();
setStatus("idle");
}, [teardown]);
// Clean up on unmount: stop tracks, close the context/worklet, disconnect the
// socket, and clear timers.
useEffect(() => {
return () => {
canceledRef.current = true;
if (errorTimerRef.current !== null) {
clearTimeout(errorTimerRef.current);
errorTimerRef.current = null;
}
teardown();
};
}, [teardown]);
return { status, start, stop, cancel };
}
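Review note on the hook above: frames from the worklet are buffered until the server emits `ready`, then flushed in arrival order, after which frames go straight through. The sketch below shows that gate in isolation. `createSketchAudioGate` and its `send` callback are hypothetical stand-ins for `readyRef`, `pendingAudioRef`, and `client.sendAudio`; this is not the hook's actual code.

```typescript
// Hypothetical helper, not part of the diff: holds audio until the upstream
// session is ready, then forwards buffered frames in arrival order.
function createSketchAudioGate(send: (buf: ArrayBuffer) => void) {
  let ready = false;
  let pending: ArrayBuffer[] = [];
  return {
    // Called for every PCM16 frame produced by the capture side.
    push(buf: ArrayBuffer): void {
      if (ready) send(buf);
      else pending.push(buf);
    },
    // Called once the server confirms the upstream session.
    markReady(): void {
      ready = true;
      const queued = pending;
      pending = [];
      for (const buf of queued) send(buf);
    },
    // Called on teardown so a later session starts from a clean state.
    reset(): void {
      ready = false;
      pending = [];
    },
  };
}

// Usage: frames pushed before markReady() are delivered afterwards, in order.
const sketchSent: number[] = [];
const sketchGate = createSketchAudioGate((buf) => sketchSent.push(buf.byteLength));
sketchGate.push(new ArrayBuffer(1));
sketchGate.push(new ArrayBuffer(2));
sketchGate.markReady();
sketchGate.push(new ArrayBuffer(3));
console.log(sketchSent);
```

Swapping the pending array out before sending, as the hook does in `onReady`, keeps a frame that arrives during the flush from being queued behind frames that were already sent.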

@@ -0,0 +1,91 @@
import { describe, it, expect } from "vitest";
import {
baseLanguageSubtag,
initSessionState,
onAudio,
onReady,
onInterim,
onFinal,
onCancel,
onStop,
} from "./dictation-reducer";
describe("baseLanguageSubtag", () => {
it("reduces a region-tagged locale to its base subtag", () => {
expect(baseLanguageSubtag("en-US")).toBe("en");
});
it("returns a bare subtag unchanged", () => {
expect(baseLanguageSubtag("en")).toBe("en");
});
it("returns undefined for empty / blank / nullish", () => {
expect(baseLanguageSubtag("")).toBeUndefined();
expect(baseLanguageSubtag(" ")).toBeUndefined();
expect(baseLanguageSubtag(undefined)).toBeUndefined();
expect(baseLanguageSubtag(null)).toBeUndefined();
});
});
function ab(n: number): ArrayBuffer {
return new ArrayBuffer(n);
}
describe("dictation session reducer", () => {
it("buffers audio until ready then flushes in order", () => {
const s = initSessionState();
const a = ab(1);
const b = ab(2);
expect(onAudio(s, a).send).toEqual([]);
expect(onAudio(s, b).send).toEqual([]);
expect(s.pending).toHaveLength(2);
const ready = onReady(s);
expect(ready.send).toEqual([a, b]); // flushed in arrival order
expect(s.pending).toHaveLength(0);
expect(s.ready).toBe(true);
// After ready, audio is sent immediately.
const c = ab(3);
expect(onAudio(s, c).send).toEqual([c]);
});
it("interim replaces interim", () => {
const s = initSessionState();
expect(onInterim(s, "hel").emitInterim).toBe("hel");
expect(onInterim(s, "hello").emitInterim).toBe("hello");
expect(s.interim).toBe("hello");
});
it("final trims and drops empty, clearing the interim", () => {
const s = initSessionState();
onInterim(s, "draft");
expect(onFinal(s, " hi there ").emitFinal).toBe("hi there");
expect(s.interim).toBe("");
const empty = onFinal(s, " ");
expect(empty.emitFinal).toBeUndefined();
});
it("cancel drops pending and ignores later events", () => {
const s = initSessionState();
onAudio(s, ab(1));
onCancel(s);
expect(s.pending).toHaveLength(0);
expect(s.canceled).toBe(true);
// Later events are no-ops.
expect(onAudio(s, ab(2)).send).toEqual([]);
expect(onReady(s).send).toEqual([]);
expect(onInterim(s, "late").emitInterim).toBeUndefined();
expect(onFinal(s, "late").emitFinal).toBeUndefined();
});
it("closed/stop after stop is a no-op", () => {
const s = initSessionState();
onReady(s);
onStop(s);
expect(s.canceled).toBe(true);
// Audio arriving after stop is ignored (server has no session).
expect(onAudio(s, ab(1)).send).toEqual([]);
expect(onInterim(s, "x").emitInterim).toBeUndefined();
});
});

@@ -0,0 +1,113 @@
// Pure logic extracted from `use-realtime-dictation` so the transcript/session
// state machine can be unit-tested without React or a live socket. The hook wires
// these to refs/callbacks; nothing here touches the DOM or Web Audio.
/**
* Reduce a BCP-47 locale to its base language subtag for the upstream STT model,
* which expects an ISO language code, not a region-tagged locale.
* "en-US" → "en", "en" → "en", "" → undefined, " " → undefined.
* Returns undefined when no usable subtag exists so the server can omit the hint.
*/
export function baseLanguageSubtag(locale: string | undefined | null): string | undefined {
if (!locale) return undefined;
const base = locale.trim().split("-")[0]?.trim();
return base && base.length > 0 ? base : undefined;
}
/**
* Session/transcript reducer. Models the audio-buffering + interim/final/cancel
* lifecycle as a pure state object so the ordering rules (buffer-until-ready,
* cancel ignores later events, closed-after-stop is a no-op) are testable. The
* hook keeps the live socket/graph; this only decides what to emit.
*/
export interface DictationSessionState {
// Server has confirmed the upstream session; audio may flow.
ready: boolean;
// Local stop/cancel happened; later interim/final/audio are ignored.
canceled: boolean;
// Audio captured before `ready`; flushed in arrival order once ready.
pending: ArrayBuffer[];
// Latest interim transcript for the live (not-yet-final) segment.
interim: string;
}
export function initSessionState(): DictationSessionState {
return { ready: false, canceled: false, pending: [], interim: "" };
}
// Effects the hook should perform after a reduction. Keeps the reducer pure: it
// describes what to do, the hook does it (send over the socket, call callbacks).
export interface DictationEffects {
// Audio chunks to send upstream now, in order.
send: ArrayBuffer[];
// Interim text to surface, if it changed.
emitInterim?: string;
// Final (trimmed, non-empty) text to surface.
emitFinal?: string;
}
const NONE: DictationEffects = { send: [] };
/** Audio chunk captured: send immediately if ready, else buffer it. */
export function onAudio(
state: DictationSessionState,
buf: ArrayBuffer,
): DictationEffects {
if (state.canceled) return NONE;
if (state.ready) return { send: [buf] };
state.pending.push(buf);
return NONE;
}
/** Server ready: flush all buffered audio in order, then stream live. */
export function onReady(state: DictationSessionState): DictationEffects {
if (state.canceled) return NONE;
state.ready = true;
const send = state.pending;
state.pending = [];
return { send };
}
/** Interim transcript: replaces the previous interim for the live segment. */
export function onInterim(
state: DictationSessionState,
text: string,
): DictationEffects {
if (state.canceled) return NONE;
state.interim = text;
return { send: [], emitInterim: text };
}
/**
* Final transcript: trim and drop if empty; the live interim segment is cleared
* (the final supersedes it).
*/
export function onFinal(
state: DictationSessionState,
text: string,
): DictationEffects {
if (state.canceled) return NONE;
const trimmed = text.trim();
state.interim = "";
if (trimmed.length === 0) return { send: [] };
return { send: [], emitFinal: trimmed };
}
/** Cancel: drop pending audio and ignore all later events. */
export function onCancel(state: DictationSessionState): DictationEffects {
state.canceled = true;
state.pending = [];
state.interim = "";
return NONE;
}
/**
* Stop: like cancel for the purposes of "no more events should be processed".
* Distinct name kept so the hook can flush the worklet tail before stopping; the
* reducer treats post-stop events as no-ops the same way. Unlike onCancel, it
* leaves the last `interim` text in state.
*/
export function onStop(state: DictationSessionState): DictationEffects {
state.canceled = true;
state.pending = [];
return NONE;
}
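
A minimal sketch of how a caller might apply the effects the reducer returns, illustrating the buffer-until-ready rule. The state, effects, `onAudio`, and `onReady` definitions below are condensed copies of the ones above so the block runs standalone. `applyEffects`, `record`, and `sent` are hypothetical stand-ins for the hook's socket wiring, which this diff does not show.

```typescript
// Condensed standalone copy of the reducer's buffering rules (see above).
interface SessionState {
  ready: boolean;
  canceled: boolean;
  pending: ArrayBuffer[];
}
interface Effects {
  send: ArrayBuffer[];
}

function initState(): SessionState {
  return { ready: false, canceled: false, pending: [] };
}

function onAudio(state: SessionState, buf: ArrayBuffer): Effects {
  if (state.canceled) return { send: [] };
  if (state.ready) return { send: [buf] };
  state.pending.push(buf);
  return { send: [] };
}

function onReady(state: SessionState): Effects {
  if (state.canceled) return { send: [] };
  state.ready = true;
  const send = state.pending;
  state.pending = [];
  return { send };
}

// Hypothetical effect runner: the real hook would forward each chunk to the
// socket client instead of a callback.
function applyEffects(
  effects: Effects,
  sendAudio: (buf: ArrayBuffer) => void,
): void {
  for (const buf of effects.send) sendAudio(buf);
}

// Record byte lengths so the send order is easy to inspect.
const sent: number[] = [];
const record = (buf: ArrayBuffer): void => {
  sent.push(buf.byteLength);
};

const state = initState();
applyEffects(onAudio(state, new ArrayBuffer(1)), record); // buffered
applyEffects(onAudio(state, new ArrayBuffer(2)), record); // buffered
const sentBeforeReady = sent.length;
applyEffects(onReady(state), record); // flushes both chunks in arrival order
applyEffects(onAudio(state, new ArrayBuffer(3)), record); // sent immediately
```

This copy returns a fresh empty effects object on every call rather than a shared constant. That is a choice made for the sketch, so a caller that mutates `send` cannot affect later results.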

@@ -0,0 +1,185 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
// --- Mock socket.io-client with a controllable fake socket --------------------
// The mock records registered listeners so tests can fire server events, and
// records emits so the start/reconnect behavior can be asserted.
interface FakeSocket {
connected: boolean;
listeners: Record<string, ((...args: unknown[]) => void)[]>;
emits: { event: string; args: unknown[] }[];
on: (e: string, cb: (...a: unknown[]) => void) => FakeSocket;
emit: (e: string, ...a: unknown[]) => void;
connect: () => void;
disconnect: () => void;
removeAllListeners: () => void;
fire: (e: string, ...a: unknown[]) => void;
}
function makeFakeSocket(): FakeSocket {
const socket: FakeSocket = {
connected: false,
listeners: {},
emits: [],
on(e, cb) {
(socket.listeners[e] ??= []).push(cb);
return socket;
},
emit(e, ...a) {
socket.emits.push({ event: e, args: a });
},
connect() {
socket.connected = true;
socket.fire("connect");
},
disconnect() {
socket.connected = false;
},
removeAllListeners() {
socket.listeners = {};
},
fire(e, ...a) {
(socket.listeners[e] ?? []).forEach((cb) => cb(...a));
},
};
return socket;
}
let lastSocket: FakeSocket;
const ioMock = vi.fn((..._args: unknown[]) => {
lastSocket = makeFakeSocket();
return lastSocket;
});
vi.mock("socket.io-client", () => ({
io: (...args: unknown[]) => ioMock(...args),
Socket: class {},
}));
vi.mock("@/features/websocket/types", () => ({ SOCKET_URL: undefined }));
import { RealtimeDictationClient } from "./realtime-dictation-client";
function makeHandlers() {
return {
onReady: vi.fn(),
onInterim: vi.fn(),
onFinal: vi.fn(),
onError: vi.fn(),
onClosed: vi.fn(),
};
}
beforeEach(() => {
ioMock.mockClear();
});
describe("RealtimeDictationClient", () => {
it("uses a single io() call with the bare namespace URL and shared opts", () => {
const c = new RealtimeDictationClient(makeHandlers());
c.connect();
expect(ioMock).toHaveBeenCalledTimes(1);
const call = ioMock.mock.calls[0] as unknown[];
expect(call[0]).toBe("/ai-realtime");
expect(call[1]).toMatchObject({
transports: ["websocket"],
withCredentials: true,
autoConnect: false,
});
});
it("decodes ready/interim/final with ?? '' defaults", () => {
const h = makeHandlers();
const c = new RealtimeDictationClient(h);
c.connect();
lastSocket.fire("ready");
expect(h.onReady).toHaveBeenCalledTimes(1);
lastSocket.fire("interim", { itemId: "a", text: "hi" });
expect(h.onInterim).toHaveBeenCalledWith("a", "hi");
lastSocket.fire("interim", {});
expect(h.onInterim).toHaveBeenCalledWith("", "");
lastSocket.fire("final", { itemId: "b", text: "done" });
expect(h.onFinal).toHaveBeenCalledWith("b", "done");
lastSocket.fire("final", undefined);
expect(h.onFinal).toHaveBeenCalledWith("", "");
});
it("surfaces a string error payload verbatim", () => {
const h = makeHandlers();
const c = new RealtimeDictationClient(h);
c.connect();
lastSocket.fire("error", "boom");
expect(h.onError).toHaveBeenCalledWith("boom");
});
it("error fires at most once per connection (error-once guard)", () => {
const h = makeHandlers();
const c = new RealtimeDictationClient(h);
c.connect();
lastSocket.fire("error", { message: "first" });
lastSocket.fire("connect_error", new Error("second"));
lastSocket.fire("error", "third");
expect(h.onError).toHaveBeenCalledTimes(1);
expect(h.onError).toHaveBeenCalledWith("first");
});
it("connect_error builds a concrete message", () => {
const h = makeHandlers();
const c = new RealtimeDictationClient(h);
c.connect();
lastSocket.fire("connect_error", new Error("handshake"));
expect(h.onError).toHaveBeenCalledWith(
"Realtime connection failed: handshake",
);
});
it("emits start once when start() is called on a connected socket", () => {
const c = new RealtimeDictationClient(makeHandlers());
c.connect(); // fires connect, socket.connected = true
c.start({ language: "en" });
const starts = lastSocket.emits.filter((e) => e.event === "start");
expect(starts).toHaveLength(1);
expect(starts[0].args[0]).toEqual({ language: "en" });
});
it("re-emits start on reconnect (does not double-start while live)", () => {
const c = new RealtimeDictationClient(makeHandlers());
c.connect();
c.start({ language: "en" });
// A second connect with no disconnect must NOT re-start (still live).
lastSocket.fire("connect");
let starts = lastSocket.emits.filter((e) => e.event === "start");
expect(starts).toHaveLength(1);
// Transient drop then reconnect → re-establish the session exactly once.
lastSocket.fire("disconnect");
lastSocket.fire("connect");
starts = lastSocket.emits.filter((e) => e.event === "start");
expect(starts).toHaveLength(2);
expect(starts[1].args[0]).toEqual({ language: "en" });
});
it("disconnect removes listeners and resets the error flag", () => {
const h = makeHandlers();
const c = new RealtimeDictationClient(h);
c.connect();
// Trip the error-once guard first so the reset below is actually exercised.
lastSocket.fire("error", "first");
const removeSpy = vi.spyOn(lastSocket, "removeAllListeners");
c.disconnect();
expect(removeSpy).toHaveBeenCalled();
expect(lastSocket.connected).toBe(false);
// A fresh connect on the reused instance can error again.
c.connect();
lastSocket.fire("error", "again");
expect(h.onError).toHaveBeenCalledTimes(2);
expect(h.onError).toHaveBeenLastCalledWith("again");
});
it("connect() is a no-op while a socket already exists", () => {
const c = new RealtimeDictationClient(makeHandlers());
c.connect();
c.connect();
expect(ioMock).toHaveBeenCalledTimes(1);
});
});

@@ -0,0 +1,156 @@
import { io, Socket } from "socket.io-client";
import { SOCKET_URL } from "@/features/websocket/types";
// Handlers the hook supplies; the client translates the normalized `/ai-realtime`
// Socket.IO events into these callbacks. The client itself owns no React state —
// it is a thin transport wrapper so the hook can stay focused on the audio graph.
export interface RealtimeDictationHandlers {
// Upstream STT session is established; safe to start sending audio.
onReady: () => void;
// Latest partial transcript for the current (not-yet-final) segment.
onInterim: (itemId: string, text: string) => void;
// A completed segment's transcript.
onFinal: (itemId: string, text: string) => void;
// Concrete failure reason (connect error or server-surfaced error).
onError: (message: string) => void;
// Session ended (graceful stop or upstream closed).
onClosed: () => void;
}
interface StartOptions {
language?: string;
}
// Wraps the dedicated `/ai-realtime` Socket.IO namespace. Cookie-based auth rides
// the handshake via `withCredentials` (no bearer token), exactly like the main
// app socket. `autoConnect: false` lets the hook wire listeners up before the
// handshake fires so no early event is missed.
export class RealtimeDictationClient {
private socket: Socket | null = null;
// onError must fire at most once per session: the server `error` and socket
// `connect_error` can both arrive (e.g. an error then a failed reconnect), but
// the hook owns the error→idle flow and a second call would double-fire it.
private erroredFlag = false;
// The last `start` params, retained so we can re-establish the upstream session
// after a transient socket.io reconnect (otherwise the server has no session and
// silently drops audio). Null until start() is first called.
private startOptions: StartOptions | null = null;
// True between a successful `start` emit and the next disconnect, so a reconnect
// re-emits `start` exactly once and we never double-start a live session.
private started = false;
constructor(private readonly handlers: RealtimeDictationHandlers) {}
// Forward the first error reason only; later error/connect_error are swallowed.
private emitError(message: string): void {
if (this.erroredFlag) return;
this.erroredFlag = true;
this.handlers.onError(message);
}
// Create the socket, register listeners, then open the connection. Safe to call
// once per client instance; a second call is a no-op while a socket exists.
connect(): void {
if (this.socket) return;
// Fresh socket → allow onError to fire again for this connection.
this.erroredFlag = false;
// SOCKET_URL is undefined in this app (socket.io derives the page origin), so
// the `/ai-realtime` namespace rides the same `/socket.io` path as the main
// socket — which the Vite dev server proxies as a websocket. The URL is the
// only thing that varies; the options are shared (single io() call).
const url = SOCKET_URL ? `${SOCKET_URL}/ai-realtime` : "/ai-realtime";
const socket: Socket = io(url, {
transports: ["websocket"],
withCredentials: true,
autoConnect: false,
});
this.socket = socket;
// On every (re)connect, re-establish the upstream session if start() has been
// called but no session is currently live. When start() runs on an already
// connected socket it emits `start` itself and sets `started`, so this handler
// does nothing. It covers start() called before the handshake completes and
// reconnects after a transient drop, where the server lost the session and
// would otherwise silently discard all subsequent audio. The `started` guard
// prevents double-starting a live session.
socket.on("connect", () => {
if (this.startOptions && !this.started) {
this.started = true;
socket.emit("start", { language: this.startOptions.language });
}
});
// A disconnect (transient drop or close) ends the server-side session; clear
// `started` so the next `connect` re-emits `start`.
socket.on("disconnect", () => {
this.started = false;
});
socket.on("ready", () => this.handlers.onReady());
socket.on("interim", (payload: { itemId: string; text: string }) => {
this.handlers.onInterim(payload?.itemId ?? "", payload?.text ?? "");
});
socket.on("final", (payload: { itemId: string; text: string }) => {
this.handlers.onFinal(payload?.itemId ?? "", payload?.text ?? "");
});
socket.on("error", (payload: { message?: string } | string) => {
const message =
typeof payload === "string"
? payload
: payload?.message || "Realtime dictation error";
this.emitError(message);
});
socket.on("closed", () => this.handlers.onClosed());
// Low-level transport failure (handshake/auth/proxy). Surface a concrete cause.
socket.on("connect_error", (err: Error) => {
const message = err?.message
? `Realtime connection failed: ${err.message}`
: "Realtime connection failed";
this.emitError(message);
});
socket.connect();
}
// Ask the server to resolve config and open the upstream STT session. The params
// are retained so a post-reconnect `connect` can re-establish the session.
start(opts: StartOptions): void {
this.startOptions = opts;
// If the socket is already connected, emit now and mark started; otherwise the
// `connect` handler will emit once the handshake completes.
if (this.socket?.connected && !this.started) {
this.started = true;
this.socket.emit("start", { language: opts.language });
}
}
// Forward a raw PCM16 chunk; socket.io serializes the ArrayBuffer as binary.
sendAudio(buf: ArrayBuffer): void {
this.socket?.emit("audio", buf);
}
// Request a graceful flush/close of the upstream session.
stop(): void {
this.socket?.emit("stop");
}
// Tear down: drop every listener and close the connection. Idempotent.
disconnect(): void {
const socket = this.socket;
if (!socket) return;
this.socket = null;
// Reset so a subsequent connect() on a reused instance can error again and a
// fresh session can be started.
this.erroredFlag = false;
this.started = false;
this.startOptions = null;
socket.removeAllListeners();
socket.disconnect();
}
}
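
The restart-on-reconnect rule can be modelled without socket.io. The sketch below is a hypothetical `StartGuard` class, not part of the PR, that keeps only the `connected`, `started`, and `startOptions` flags and records each `start` emit in an array, so the three orderings (start before connect, start after connect, drop then reconnect) can be checked in isolation.

```typescript
// Hypothetical model of the client's `started` guard, with the socket replaced
// by an array that records every "start" emit.
interface StartOptions {
  language?: string;
}

class StartGuard {
  private connected = false;
  private started = false;
  private startOptions: StartOptions | null = null;
  readonly emitted: StartOptions[] = [];

  // Mirrors the `connect` listener: (re)start only if start() has been called
  // and no session is currently live.
  onConnect(): void {
    this.connected = true;
    this.emitIfNeeded();
  }

  // Mirrors the `disconnect` listener: the server-side session is gone.
  onDisconnect(): void {
    this.connected = false;
    this.started = false;
  }

  // Mirrors start(): remember the options, emit now if already connected.
  start(opts: StartOptions): void {
    this.startOptions = opts;
    this.emitIfNeeded();
  }

  private emitIfNeeded(): void {
    if (!this.connected || this.started || !this.startOptions) return;
    this.started = true;
    this.emitted.push({ ...this.startOptions });
  }
}

// start() before the handshake completes: the connect handler emits.
const early = new StartGuard();
early.start({ language: "en" });
const emittedBeforeConnect = early.emitted.length;
early.onConnect();

// start() after connect, a duplicate connect, then a drop and reconnect.
const late = new StartGuard();
late.onConnect();
late.start({ language: "de" });
late.onConnect(); // still live: no second start
const emittedWhileLive = late.emitted.length;
late.onDisconnect();
late.onConnect(); // session lost: start is re-emitted once
```

Folding both emit sites into one `emitIfNeeded` check is a simplification for the sketch. The class above keeps them in `start()` and in the `connect` listener separately.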

@@ -0,0 +1,28 @@
import { describe, it, expect } from "vitest";
import { clampRange } from "./dictation-group";
describe("clampRange", () => {
it("returns the range unchanged when it is within bounds", () => {
expect(clampRange(2, 5, 10)).toEqual({ from: 2, to: 5 });
});
it("clamps the upper bound to the doc size (off-by-one at the end)", () => {
expect(clampRange(8, 12, 10)).toEqual({ from: 8, to: 10 });
// Exactly at the size is in-bounds, not clamped.
expect(clampRange(10, 10, 10)).toEqual({ from: 10, to: 10 });
});
it("clamps negative positions up to 0 (off-by-one at the start)", () => {
expect(clampRange(-3, 4, 10)).toEqual({ from: 0, to: 4 });
expect(clampRange(-5, -1, 10)).toEqual({ from: 0, to: 0 });
});
it("clamps both ends when both are out of range", () => {
expect(clampRange(-2, 99, 6)).toEqual({ from: 0, to: 6 });
});
it("handles an empty document (size 0)", () => {
expect(clampRange(0, 0, 0)).toEqual({ from: 0, to: 0 });
expect(clampRange(3, 7, 0)).toEqual({ from: 0, to: 0 });
});
});

@@ -1,12 +1,36 @@
import { FC, useRef } from "react";
import type { Editor } from "@tiptap/react";
import { useAtomValue } from "jotai";
import { MicButton } from "@/features/dictation/components/mic-button";
import { RealtimeMicButton } from "@/features/dictation/components/realtime-mic-button";
import { workspaceAtom } from "@/features/user/atoms/current-user-atom";
import {
setDictationInterim,
clearDictationInterim,
} from "@/features/editor/extensions/dictation-interim/dictation-interim.ts";
interface Props {
editor: Editor;
}
/**
* Clamp a [from, to] range into the current document bounds [0, size]. The
* document may have shrunk during transcription (e.g. a collaborative edit), so
* a captured snapshot range can point past the end; clamp it before inserting.
* Pure + unit-testable.
*/
export function clampRange(
from: number,
to: number,
size: number,
): { from: number; to: number } {
const clamp = (p: number) => Math.max(0, Math.min(p, size));
return { from: clamp(from), to: clamp(to) };
}
export const DictationGroup: FC<Props> = ({ editor }) => {
const workspace = useAtomValue(workspaceAtom);
const isRealtime = workspace?.settings?.ai?.dictationRealtime === true;
const rangeRef = useRef<{ from: number; to: number } | null>(null);
const handleStart = () => {
@@ -23,18 +47,15 @@ export const DictationGroup: FC<Props> = ({ editor }) => {
// The document may have shrunk during transcription (e.g. a collaborative
// edit), so clamp the snapshot into the current bounds before inserting.
const docSize = editor.state.doc.content.size;
- const clamp = (p: number) => Math.max(0, Math.min(p, docSize));
try {
if (snapshot) {
// Insert at the snapshotted caret; a trailing space keeps words
// separated (the hook already trims the transcribed text).
+ const range = clampRange(snapshot.from, snapshot.to, docSize);
editor
.chain()
.focus()
- .insertContentAt(
- { from: clamp(snapshot.from), to: clamp(snapshot.to) },
- `${text} `,
- )
+ .insertContentAt(range, `${text} `)
.run();
.run();
} else {
editor.chain().focus().insertContent(`${text} `).run();
@@ -50,6 +71,66 @@ export const DictationGroup: FC<Props> = ({ editor }) => {
}
};
// Realtime path: commit each final segment at the SNAPSHOT range captured on
// start — the same snapshot the batch branch uses. The live caret can drift
// while a segment streams (the user keeps typing, a collaborator edits), so
// inserting at the current selection would land text in the wrong place and
// split words. We advance the snapshot past each committed segment so the
// next final lands right after it (left-to-right accumulation). Interim is
// shown via the ghost decoration only.
if (isRealtime) {
return (
<RealtimeMicButton
size="md"
disabled={!editor.isEditable}
onStart={() => {
if (editor && !editor.isDestroyed) {
const { from, to } = editor.state.selection;
rangeRef.current = { from, to };
clearDictationInterim(editor);
}
}}
onInterim={(text) => {
if (editor && !editor.isDestroyed) setDictationInterim(editor, text);
}}
onFinal={(text) => {
if (!editor || editor.isDestroyed) return;
// Never write into a read-only page (e.g. a collaborator revoked edit
// access, or this is a read-only view): dictated text must not land.
if (!editor.isEditable) return;
clearDictationInterim(editor);
const snapshot = rangeRef.current;
const docSize = editor.state.doc.content.size;
const content = `${text} `;
try {
if (snapshot) {
const range = clampRange(snapshot.from, snapshot.to, docSize);
editor
.chain()
.focus()
.insertContentAt(range, content)
.run();
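// Advance the snapshot past what we just inserted so the next
// final segment appends after it instead of overwriting it. This
// assumes the content lands as plain inline text, so its string
// length equals the number of document positions it occupies.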
const next = range.from + content.length;
rangeRef.current = { from: next, to: next };
} else {
editor.chain().focus().insertContent(content).run();
}
} catch {
// The snapshot drifted out of range or the editor was destroyed
// mid-stream; fall back to the current caret.
try {
editor.chain().focus().insertContent(content).run();
} catch {
// The editor may have been destroyed; ignore.
}
}
}}
/>
);
}
return (
<MicButton
size="md"
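
A worked example of the clamp-then-advance arithmetic used by the realtime `onFinal` path. `clampRange` is copied from the diff above so the block runs standalone. `commitFinal` and the flat-string document are hypothetical: a real ProseMirror document also counts node boundaries, so this illustrates only the offset bookkeeping, not editor behaviour.

```typescript
// Copy of clampRange from the diff above.
function clampRange(
  from: number,
  to: number,
  size: number,
): { from: number; to: number } {
  const clamp = (p: number) => Math.max(0, Math.min(p, size));
  return { from: clamp(from), to: clamp(to) };
}

interface Range {
  from: number;
  to: number;
}

// Hypothetical flat-text model: positions are plain string offsets.
function commitFinal(
  doc: string,
  snapshot: Range,
  text: string,
): { doc: string; snapshot: Range } {
  const content = `${text} `; // trailing space keeps segments separated
  const range = clampRange(snapshot.from, snapshot.to, doc.length);
  const next = range.from + content.length; // caret just past the insert
  return {
    doc: doc.slice(0, range.from) + content + doc.slice(range.to),
    snapshot: { from: next, to: next },
  };
}

// Two finals arrive for a snapshot taken at offset 7, just before "end".
const first = commitFinal("Notes: end", { from: 7, to: 7 }, "first");
const second = commitFinal(first.doc, first.snapshot, "second");
// second.doc === "Notes: first second end"

// The document shrank below the snapshot: the range is clamped to the end.
const shrunk = commitFinal("abc", { from: 50, to: 60 }, "x");
// shrunk.doc === "abcx "
```

Because the snapshot advances by the inserted length, consecutive finals accumulate left to right even if the live caret has moved elsewhere.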

@@ -0,0 +1,158 @@
import { describe, it, expect, beforeEach } from "vitest";
import { Editor } from "@tiptap/core";
import Document from "@tiptap/extension-document";
import Paragraph from "@tiptap/extension-paragraph";
import TiptapText from "@tiptap/extension-text";
import { History } from "@tiptap/extension-history";
import {
DictationInterim,
applyInterimMeta,
setDictationInterim,
clearDictationInterim,
} from "./dictation-interim";
// --- applyInterimMeta (pure reducer) ---------------------------------------
describe("applyInterimMeta", () => {
it("replaces the interim text on a meta-only update", () => {
expect(applyInterimMeta({ text: "hello" }, { text: "" })).toEqual({
text: "hello",
});
});
it("replaces with empty text (clear) when meta carries an empty string", () => {
expect(applyInterimMeta({ text: "" }, { text: "old" })).toEqual({
text: "",
});
});
it("passes the previous state through when there is no meta", () => {
const prev = { text: "kept" };
expect(applyInterimMeta(undefined, prev)).toBe(prev);
});
});
// --- editor integration (regression guard) ---------------------------------
//
// A minimal headless editor in jsdom: doc/paragraph/text only, plus History so
// we can assert the interim never creates an undo step. The whole point of the
// feature is that interim text is a DECORATION, never written into the doc —
// these tests are the regression guard for that invariant.
function makeEditor(content = "<p>seed</p>") {
const element = document.createElement("div");
document.body.appendChild(element);
return new Editor({
element,
extensions: [
Document,
Paragraph,
TiptapText,
History,
DictationInterim,
],
content,
});
}
describe("DictationInterim editor integration", () => {
let editor: Editor;
beforeEach(() => {
editor = makeEditor();
});
it("set/clear produce NO doc change and NO steps (interim is never inserted)", () => {
const before = editor.state.doc.toJSON();
let captured: { docChanged: boolean; steps: number } | null = null;
const handler = ({ transaction }: { transaction: any }) => {
captured = {
docChanged: transaction.docChanged,
steps: transaction.steps.length,
};
};
editor.on("transaction", handler);
setDictationInterim(editor, "partial words");
expect(captured).not.toBeNull();
expect(captured!.docChanged).toBe(false);
expect(captured!.steps).toBe(0);
captured = null;
clearDictationInterim(editor);
expect(captured).not.toBeNull();
expect(captured!.docChanged).toBe(false);
expect(captured!.steps).toBe(0);
editor.off("transaction", handler);
expect(editor.state.doc.toJSON()).toEqual(before);
});
it("interim updates add no undo steps; undo reverts typed text, not the interim", () => {
// Type real text (this IS an undoable step).
editor.commands.insertContent(" typed");
const afterTyping = editor.getText();
expect(afterTyping).toContain("typed");
// Interim updates must not stack onto history.
setDictationInterim(editor, "ghost");
setDictationInterim(editor, "ghost two");
clearDictationInterim(editor);
// A single undo reverts the typed text — proving the interim added no
// history entries that would have to be undone first.
editor.commands.undo();
expect(editor.getText()).not.toContain("typed");
});
it("empty interim text yields no decoration widget", () => {
setDictationInterim(editor, "");
const decos = decorationCount(editor);
expect(decos).toBe(0);
});
it("non-empty interim yields exactly one contenteditable=false widget at the caret", () => {
setDictationInterim(editor, "live");
expect(decorationCount(editor)).toBe(1);
const span = editor.view.dom.querySelector(
'span[contenteditable="false"]',
) as HTMLElement | null;
expect(span).not.toBeNull();
expect(span!.textContent).toBe("live");
});
it("the decoration remaps to follow the caret/selection head on edits", () => {
setDictationInterim(editor, "tail");
const headBefore = decorationPos(editor);
expect(headBefore).toBe(editor.state.selection.head);
// Move the caret to the document start; the widget must follow selection.head.
editor.commands.setTextSelection(1);
expect(decorationPos(editor)).toBe(editor.state.selection.head);
});
});
// Helpers reaching into the plugin's decoration set via the editor view.
function decorationCount(editor: Editor): number {
let count = 0;
editor.state.plugins.forEach((plugin) => {
const decos = plugin.props?.decorations?.call(plugin, editor.state);
if (decos && (decos as any).find) {
count += (decos as any).find().length;
}
});
return count;
}
function decorationPos(editor: Editor): number | null {
let pos: number | null = null;
editor.state.plugins.forEach((plugin) => {
const decos = plugin.props?.decorations?.call(plugin, editor.state);
if (decos && (decos as any).find) {
const found = (decos as any).find();
if (found.length > 0) pos = found[0].from;
}
});
return pos;
}

@@ -0,0 +1,108 @@
import { Extension } from "@tiptap/core";
import type { Editor } from "@tiptap/core";
import { Plugin, PluginKey } from "@tiptap/pm/state";
import { Decoration, DecorationSet } from "@tiptap/pm/view";
// Plugin key shared by the extension and the imperative helpers below so they
// dispatch/read the same plugin state.
const dictationInterimKey = new PluginKey<DictationInterimState>(
"dictationInterim",
);
interface DictationInterimState {
// The current interim (partial) transcript. Empty string means "no ghost".
text: string;
}
/**
* Pure interim-state reducer (extracted for unit testing): a meta-only update
* replaces the interim text; any other transaction passes the previous state
* through unchanged. The decoration follows the caret on its own because it is
* recomputed against the live selection on every render — so non-meta edits do
* not need to touch this state.
*/
export function applyInterimMeta(
meta: DictationInterimState | undefined,
prev: DictationInterimState,
): DictationInterimState {
if (meta) {
return { text: meta.text };
}
return prev;
}
/**
* B2 editor decoration: shows the realtime interim (partial) transcript as a
* ghost widget at the caret. The interim is held ONLY in plugin meta state and
* rendered as a widget Decoration — it is NEVER written into the document, so
* it produces no Yjs update and no history entry. Only final segments are
* committed (by the dictation-group / chat consumers).
*/
export const DictationInterim = Extension.create({
name: "dictationInterim",
addProseMirrorPlugins() {
return [
new Plugin<DictationInterimState>({
key: dictationInterimKey,
state: {
init: (): DictationInterimState => ({ text: "" }),
apply: (tr, value): DictationInterimState => {
const meta = tr.getMeta(dictationInterimKey) as
| DictationInterimState
| undefined;
return applyInterimMeta(meta, value);
},
},
props: {
decorations(state) {
const pluginState = dictationInterimKey.getState(state);
const text = pluginState?.text ?? "";
if (!text) {
return null;
}
// Render the interim as an inline ghost at the caret. Inline styles
// keep this self-contained — no global CSS is required.
const widget = Decoration.widget(
state.selection.head,
() => {
const span = document.createElement("span");
span.textContent = text;
span.setAttribute("contenteditable", "false");
span.style.opacity = "0.5";
span.style.fontStyle = "italic";
return span;
},
{ side: 1, ignoreSelection: true },
);
return DecorationSet.create(state.doc, [widget]);
},
},
}),
];
},
});
/**
* Set the interim ghost text via a META-ONLY transaction — no doc steps, so it
* generates no Yjs update and no history entry.
*/
export function setDictationInterim(editor: Editor, text: string): void {
editor.view.dispatch(
editor.state.tr.setMeta(dictationInterimKey, { text }),
);
}
/**
* Clear the interim ghost text via a META-ONLY transaction (same no-op-on-doc
* guarantee as setDictationInterim).
*/
export function clearDictationInterim(editor: Editor): void {
editor.view.dispatch(
editor.state.tr.setMeta(dictationInterimKey, { text: "" }),
);
}
export default DictationInterim;

@@ -123,6 +123,7 @@ import { countWords } from "alfaaz";
import AutoJoiner from "@/features/editor/extensions/autojoiner.ts";
import GlobalDragHandle from "@/features/editor/extensions/drag-handle.ts";
import { CleanStyles } from "@/features/editor/extensions/clean-styles.ts";
import { DictationInterim } from "@/features/editor/extensions/dictation-interim/dictation-interim.ts";
const lowlight = createLowlight(common);
lowlight.register("mermaid", plaintext);
@@ -343,6 +344,7 @@ export const mainExtensions = [
},
}),
Selection,
DictationInterim,
Attachment.configure({
view: AttachmentView,
}),

@@ -0,0 +1,103 @@
import { describe, it, expect } from "vitest";
import {
resolveUrl,
resolveKeyField,
resolveCardStatus,
isEndpointConfigured,
} from "./ai-provider-settings";
describe("resolveUrl", () => {
it("trims a single trailing slash before appending the path", () => {
expect(resolveUrl("https://api.example.com/", "/chat/completions")).toBe(
"https://api.example.com/chat/completions",
);
});
it("leaves a base without a trailing slash intact", () => {
expect(resolveUrl("https://api.example.com", "/embeddings")).toBe(
"https://api.example.com/embeddings",
);
});
it("falls back to the fallback base when the base is empty", () => {
expect(resolveUrl("", "/audio/transcriptions", "https://chat.example.com")).toBe(
"https://chat.example.com/audio/transcriptions",
);
});
it("falls back when the base is whitespace-only", () => {
expect(resolveUrl(" ", "/embeddings", "https://chat.example.com")).toBe(
"https://chat.example.com/embeddings",
);
});
it("returns just the path when both base and fallback are empty", () => {
expect(resolveUrl("", "/chat/completions")).toBe("/chat/completions");
});
});
describe("resolveKeyField", () => {
it("a non-empty buffer is set to that value", () => {
expect(resolveKeyField("secret", false)).toEqual({
set: true,
value: "secret",
});
});
it("explicitly cleared with an empty buffer sets the empty string", () => {
expect(resolveKeyField("", true)).toEqual({ set: true, value: "" });
});
it("untouched (empty buffer, not cleared) omits the key", () => {
expect(resolveKeyField("", false)).toEqual({ set: false });
});
it("a buffer wins over the cleared flag (typed key takes precedence)", () => {
// Security-relevant for the write-only sttApiKey: a freshly typed secret
// must be written even if a prior clear was requested.
expect(resolveKeyField("new-secret", true)).toEqual({
set: true,
value: "new-secret",
});
});
});
describe("isEndpointConfigured", () => {
it("model + own base URL -> configured", () => {
expect(isEndpointConfigured("model", "https://own", "")).toBe(true);
});
it("model + inherited chat base URL (own empty) -> configured", () => {
expect(isEndpointConfigured("model", "", "https://chat")).toBe(true);
});
it("model set but both base URLs empty -> not configured", () => {
expect(isEndpointConfigured("model", "", "")).toBe(false);
});
it("whitespace-only base URLs do not count as filled", () => {
expect(isEndpointConfigured("model", " ", " ")).toBe(false);
});
it("empty model -> not configured even with a base URL", () => {
expect(isEndpointConfigured("", "https://own", "https://chat")).toBe(false);
});
});
describe("resolveCardStatus", () => {
it("configured + enabled -> ready", () => {
expect(resolveCardStatus(true, true)).toBe("ready");
});
it("configured + disabled -> configured", () => {
expect(resolveCardStatus(true, false)).toBe("configured");
});
it("not configured + disabled -> off", () => {
expect(resolveCardStatus(false, false)).toBe("off");
});
it("enabled but not configured -> warning (a real misconfiguration)", () => {
expect(resolveCardStatus(false, true)).toBe("warning");
});
});
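
The specs above pin down the three pure helpers closely enough to sketch them. What follows is a minimal reconstruction inferred only from the test expectations, not the code from this change: the parameter names, the `KeyField` type, and the choice to trim the model string are assumptions. `CardStatus` matches the union shown in the settings component hunk.

```typescript
// Hypothetical reconstruction from the test expectations; not the shipped code.
type KeyField = { set: true; value: string } | { set: false };
type CardStatus = "ready" | "configured" | "off" | "warning";

// A typed buffer always wins; otherwise an explicit clear writes "",
// and an untouched field is omitted from the update entirely.
function resolveKeyField(buffer: string, cleared: boolean): KeyField {
  if (buffer !== "") return { set: true, value: buffer };
  if (cleared) return { set: true, value: "" };
  return { set: false };
}

// Configured means: a model is set AND some base URL (own or inherited)
// is non-blank. Trimming the model is an assumption; the specs only cover "".
function isEndpointConfigured(
  model: string,
  ownBaseUrl: string,
  inheritedBaseUrl: string,
): boolean {
  const hasBase = ownBaseUrl.trim() !== "" || inheritedBaseUrl.trim() !== "";
  return model.trim() !== "" && hasBase;
}

// Enabled without configuration is the only combination flagged as a warning.
function resolveCardStatus(configured: boolean, enabled: boolean): CardStatus {
  if (configured) return enabled ? "ready" : "configured";
  return enabled ? "warning" : "off";
}
```

The ordering in `resolveKeyField` is the security-relevant part: checking the buffer before the cleared flag is what lets a freshly typed secret override an earlier clear request.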


@@ -32,6 +32,7 @@ import {
useAiSettingsQuery,
useReindexAiEmbeddingsMutation,
useTestAiConnectionMutation,
useTestRealtimeConnectionMutation,
useUpdateAiSettingsMutation,
} from "@/features/workspace/queries/ai-settings-query.ts";
import {
@@ -62,6 +63,10 @@ const formSchema = z.object({
// STT-specific fields. Empty base URL / key fall back to the chat ones.
sttModel: z.string(),
sttBaseUrl: z.string(),
// Realtime (streaming) STT fields. Empty model falls back to sttModel and
// empty base URL falls back to the STT base URL server-side.
sttRealtimeModel: z.string(),
sttRealtimeBaseUrl: z.string(),
sttApiStyle: z.enum(["multipart", "json"]),
sttApiKey: z.string(),
});
@@ -81,7 +86,7 @@ type CardStatus = "ready" | "configured" | "off" | "warning";
// Resolve a "Base URL + path" hint defensively: trim a single trailing slash
// off the base, then append the path. Empty base falls back to `fallback`
// (the chat base URL for the embedding/voice endpoints). Purely cosmetic.
-function resolveUrl(base: string, path: string, fallback = ""): string {
+export function resolveUrl(base: string, path: string, fallback = ""): string {
const trimmed = (base.trim() || fallback.trim()).replace(/\/$/, "");
return `${trimmed}${path}`;
}
@@ -176,6 +181,8 @@ export default function AiProviderSettings() {
const chatTest = useTestAiConnectionMutation();
const embedTest = useTestAiConnectionMutation();
const sttTest = useTestAiConnectionMutation();
// Realtime probe hits a separate /ai-chat/realtime/test route (admin-gated).
const realtimeTest = useTestRealtimeConnectionMutation();
// Agent roles drive the public-share assistant identity picker. Admin-gated
// (the component returns early for non-admins), same as the AI settings query.
@@ -192,6 +199,8 @@ export default function AiProviderSettings() {
const [dictationEnabled, setDictationEnabled] = useState<boolean>(
workspace?.settings?.ai?.dictation ?? false,
);
const [realtimeDictationEnabled, setRealtimeDictationEnabled] =
useState<boolean>(workspace?.settings?.ai?.dictationRealtime ?? false);
const [publicShareAssistantEnabled, setPublicShareAssistantEnabled] =
useState<boolean>(
workspace?.settings?.ai?.publicShareAssistant ?? false,
@@ -199,6 +208,10 @@ export default function AiProviderSettings() {
const [chatToggleLoading, setChatToggleLoading] = useState(false);
const [searchToggleLoading, setSearchToggleLoading] = useState(false);
const [dictationToggleLoading, setDictationToggleLoading] = useState(false);
const [
realtimeDictationToggleLoading,
setRealtimeDictationToggleLoading,
] = useState(false);
const [
publicShareAssistantToggleLoading,
setPublicShareAssistantToggleLoading,
@@ -232,6 +245,8 @@ export default function AiProviderSettings() {
embeddingApiKey: "",
sttModel: "",
sttBaseUrl: "",
sttRealtimeModel: "",
sttRealtimeBaseUrl: "",
sttApiStyle: "multipart" as SttApiStyle,
sttApiKey: "",
},
@@ -253,6 +268,8 @@ export default function AiProviderSettings() {
embeddingApiKey: "",
sttModel: settings.sttModel ?? "",
sttBaseUrl: settings.sttBaseUrl ?? "",
sttRealtimeModel: settings.sttRealtimeModel ?? "",
sttRealtimeBaseUrl: settings.sttRealtimeBaseUrl ?? "",
sttApiStyle: settings.sttApiStyle ?? "multipart",
sttApiKey: "",
});
@@ -287,6 +304,10 @@ export default function AiProviderSettings() {
// server-side.
sttModel: values.sttModel,
sttBaseUrl: values.sttBaseUrl,
// Realtime STT: empty model falls back to sttModel, empty base URL falls
// back to the STT base URL server-side.
sttRealtimeModel: values.sttRealtimeModel,
sttRealtimeBaseUrl: values.sttRealtimeBaseUrl,
sttApiStyle: values.sttApiStyle,
};
@@ -434,6 +455,35 @@ export default function AiProviderSettings() {
}
}
// Optimistic toggle for the "Realtime dictation" feature
// (settings.ai.dictationRealtime). Layered on top of batch dictation.
async function handleToggleRealtimeDictation(value: boolean) {
setRealtimeDictationToggleLoading(true);
const previous = realtimeDictationEnabled;
setRealtimeDictationEnabled(value);
try {
const updated = await updateWorkspace({ aiDictationRealtime: value });
setWorkspace({
...updated,
settings: {
...updated.settings,
ai: { ...updated.settings?.ai, dictationRealtime: value },
},
});
notifications.show({ message: t("Updated successfully") });
} catch (err) {
setRealtimeDictationEnabled(previous);
const message = (err as { response?: { data?: { message?: string } } })
?.response?.data?.message;
notifications.show({
message: message ?? t("Failed to update data"),
color: "red",
});
} finally {
setRealtimeDictationToggleLoading(false);
}
}
// Optimistic toggle for the anonymous public-share AI assistant
// (settings.ai.publicShareAssistant). When off, the public endpoint 404s.
async function handleTogglePublicShareAssistant(value: boolean) {
@@ -853,13 +903,24 @@ export default function AiProviderSettings() {
<StatusDot status={sttStatus} label={cardStatusLabel(sttStatus, t)} />
<Text fw={600}>{t("Voice / STT")}</Text>
</Group>
-<Switch
-label={t("Voice dictation")}
-labelPosition="left"
-checked={dictationEnabled}
-disabled={dictationToggleLoading}
-onChange={(e) => handleToggleDictation(e.currentTarget.checked)}
-/>
+<Group gap="md" align="center" wrap="nowrap">
+<Switch
+label={t("Voice dictation")}
+labelPosition="left"
+checked={dictationEnabled}
+disabled={dictationToggleLoading}
+onChange={(e) => handleToggleDictation(e.currentTarget.checked)}
+/>
+<Switch
+label={t("Realtime dictation")}
+labelPosition="left"
+checked={realtimeDictationEnabled}
+disabled={realtimeDictationToggleLoading}
+onChange={(e) =>
+handleToggleRealtimeDictation(e.currentTarget.checked)
+}
+/>
+</Group>
</Group>
<Text size="xs" c="dimmed" mt={4} mb="md">
{t(
@@ -954,6 +1015,58 @@ export default function AiProviderSettings() {
</Text>
))}
</Group>
{/* Realtime (streaming) dictation: layered on top of batch STT and only
shown when the workspace toggle is on. Model falls back to the STT
model and the endpoint falls back to the STT base URL server-side. */}
{realtimeDictationEnabled && (
<>
<Text size="xs" c="dimmed" mt="md" mb="xs">
{t(
"Streams audio live and inserts text as you speak (requires an OpenAI-compatible Realtime endpoint)",
)}
</Text>
<TextInput
label={t("Realtime model")}
placeholder="gpt-4o-mini-transcribe"
disabled={isLoading}
{...form.getInputProps("sttRealtimeModel")}
/>
<TextInput
mt="sm"
label={t("Realtime endpoint")}
description={t(
"Leave empty to use the STT base URL",
)}
placeholder={t("Leave empty to use the STT base URL")}
disabled={isLoading}
{...form.getInputProps("sttRealtimeBaseUrl")}
/>
<Group mt="md" align="center">
<Button
variant="default"
size="sm"
loading={realtimeTest.isPending}
onClick={() => realtimeTest.mutate()}
>
{t("Test endpoint")}
</Button>
{realtimeTest.data &&
(realtimeTest.data.ok ? (
<Text size="sm" c="green">
{t("Connection successful")}
</Text>
) : (
<Text size="sm" c="red">
{realtimeTest.data.error || t("Connection failed")}
</Text>
))}
</Group>
</>
)}
</Paper>
{/* Nested: external MCP tools the agent calls out to */}


@@ -8,6 +8,7 @@ import {
getAiSettings,
updateAiSettings,
testAiConnection,
testRealtimeConnection,
reindexAiEmbeddings,
IAiSettings,
IAiSettingsUpdate,
@@ -55,6 +56,12 @@ export function useTestAiConnectionMutation() {
});
}
export function useTestRealtimeConnectionMutation() {
return useMutation<IAiTestResult, Error, void>({
mutationFn: () => testRealtimeConnection(),
});
}
export function useReindexAiEmbeddingsMutation() {
const { t } = useTranslation();
const queryClient = useQueryClient();


@@ -0,0 +1,47 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
// Mock the api client module so we can assert the route and unwrap behavior
// without a real network call.
const post = vi.fn();
vi.mock("@/lib/api-client", () => ({
default: { post: (...args: unknown[]) => post(...args) },
}));
import { testRealtimeConnection } from "./ai-settings-service";
describe("testRealtimeConnection", () => {
beforeEach(() => {
post.mockReset();
});
it("POSTs to the /ai-chat/realtime/test route (NOT the /workspace/ai-settings prefix)", async () => {
post.mockResolvedValue({ data: { ok: true } });
await testRealtimeConnection();
expect(post).toHaveBeenCalledTimes(1);
const [route] = post.mock.calls[0];
expect(route).toBe("/ai-chat/realtime/test");
expect(route).not.toContain("/workspace/ai-settings");
});
it("sends no request body", async () => {
post.mockResolvedValue({ data: { ok: true } });
await testRealtimeConnection();
// Only the route argument — no payload.
expect(post.mock.calls[0].length).toBe(1);
});
it("unwraps the { ok } envelope from res.data", async () => {
post.mockResolvedValue({ data: { ok: true } });
await expect(testRealtimeConnection()).resolves.toEqual({ ok: true });
});
it("surfaces the failure envelope (ok:false + error) verbatim", async () => {
post.mockResolvedValue({ data: { ok: false, error: "unreachable" } });
await expect(testRealtimeConnection()).resolves.toEqual({
ok: false,
error: "unreachable",
});
});
});
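
The `{ ok, error? }` envelope exercised by these specs is consumed in the settings card by branching on `ok`. A small sketch of that branching as a pure function follows. `AiTestResult` and `describeTestResult` are hypothetical names (the source uses `IAiTestResult`, whose definition is not shown in this diff), and the literal strings stand in for the translated labels.

```typescript
// Hypothetical stand-in for IAiTestResult; its real definition is not shown here.
type AiTestResult = { ok: boolean; error?: string };

type TestBanner = { color: "green" | "red"; message: string };

// Map the envelope to what the UI renders: success text on ok,
// otherwise the server-provided error or a generic fallback.
function describeTestResult(result: AiTestResult): TestBanner {
  if (result.ok) {
    return { color: "green", message: "Connection successful" };
  }
  // `||` (not `??`) so an empty error string also falls back.
  return { color: "red", message: result.error || "Connection failed" };
}
```

Using `||` for the fallback mirrors the component's `realtimeTest.data.error || t("Connection failed")` expression, so an empty error string still produces a readable message.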


@@ -32,6 +32,8 @@ export interface IAiSettings {
// key is stored (empty means "uses the chat API key").
sttModel?: string;
sttBaseUrl?: string;
sttRealtimeModel?: string;
sttRealtimeBaseUrl?: string;
sttApiStyle?: SttApiStyle;
hasSttApiKey: boolean;
// RAG indexing coverage (pages indexed for semantic search).
@@ -59,6 +61,8 @@ export interface IAiSettingsUpdate {
embeddingApiKey?: string;
sttModel?: string;
sttBaseUrl?: string;
sttRealtimeModel?: string;
sttRealtimeBaseUrl?: string;
sttApiStyle?: SttApiStyle;
// Write-only STT key (same semantics as `apiKey` / `embeddingApiKey`).
sttApiKey?: string;
@@ -95,6 +99,14 @@ export async function testAiConnection(
return req.data;
}
// Probes the realtime (streaming STT) endpoint. Unlike the other tests this
// route lives under /ai-chat (not /workspace/ai-settings); it is admin-gated
// server-side and returns the same { ok, error? } envelope at req.data.
export async function testRealtimeConnection(): Promise<IAiTestResult> {
const req = await api.post<IAiTestResult>("/ai-chat/realtime/test");
return req.data;
}
export async function reindexAiEmbeddings(): Promise<IAiSettings> {
const req = await api.post<IAiSettings>("/workspace/ai-settings/reindex");
return req.data;


@@ -25,6 +25,7 @@ export interface IWorkspace {
mcpEnabled?: boolean;
aiChat?: boolean;
aiDictation?: boolean;
aiDictationRealtime?: boolean;
aiPublicShareAssistant?: boolean;
trashRetentionDays?: number;
restrictApiToAdmins?: boolean;
@@ -62,6 +63,7 @@ export interface IWorkspaceAiSettings {
mcp?: boolean;
chat?: boolean;
dictation?: boolean;
dictationRealtime?: boolean;
publicShareAssistant?: boolean;
}


@@ -27,8 +27,14 @@ import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.rep
import { UserThrottlerGuard } from '../../integrations/throttle/user-throttler.guard';
import { AI_CHAT_THROTTLER } from '../../integrations/throttle/throttler-names';
import { FileInterceptor } from '../../common/interceptors/file.interceptor';
import WorkspaceAbilityFactory from '../casl/abilities/workspace-ability.factory';
import {
WorkspaceCaslAction,
WorkspaceCaslSubject,
} from '../casl/interfaces/workspace-ability.type';
import { AiChatService, AiChatStreamBody } from './ai-chat.service';
import { AiTranscriptionService } from './ai-transcription.service';
import { AiRealtimeService } from './realtime/ai-realtime.service';
import {
ChatIdDto,
GetChatMessagesDto,
@@ -51,8 +57,23 @@ export class AiChatController {
private readonly aiChatRepo: AiChatRepo,
private readonly aiChatMessageRepo: AiChatMessageRepo,
private readonly aiTranscription: AiTranscriptionService,
private readonly aiRealtimeService: AiRealtimeService,
private readonly workspaceAbility: WorkspaceAbilityFactory,
) {}
/**
* Admin gate, identical to AiSettingsController.assertAdmin: require the
* workspace Manage/Settings ability (same gate as POST /workspace/update).
*/
private assertAdmin(user: User, workspace: Workspace): void {
const ability = this.workspaceAbility.createForUser(user, workspace);
if (
ability.cannot(WorkspaceCaslAction.Manage, WorkspaceCaslSubject.Settings)
) {
throw new ForbiddenException();
}
}
/** List the requesting user's chats in this workspace (paginated). */
@HttpCode(HttpStatus.OK)
@Post('chats')
@@ -287,6 +308,23 @@ export class AiChatController {
return { text };
}
/**
* Admin-only "test connection" probe for the realtime STT upstream. Reuses
* AiRealtimeService.openSession to exercise the real config/SSRF/handshake path
* and tears the socket down immediately. The API key never leaves the server.
* Response is the FROZEN contract { ok: true } | { ok: false, error: string }
* (the global response transform wraps it; the client reads req.data).
*/
@HttpCode(HttpStatus.OK)
@Post('realtime/test')
async testRealtime(
@AuthUser() user: User,
@AuthWorkspace() workspace: Workspace,
): Promise<{ ok: true } | { ok: false; error: string }> {
this.assertAdmin(user, workspace);
return this.aiRealtimeService.testConnection(workspace.id);
}
/**
* Ensure the chat exists, belongs to this workspace, AND was created by the
* requesting user (per-user isolation). Throws ForbiddenException otherwise.


@@ -13,6 +13,8 @@ import { SearchModule } from '../search/search.module';
import { PublicShareChatController } from './public-share-chat.controller';
import { PublicShareChatService } from './public-share-chat.service';
import { PublicShareChatToolsService } from './tools/public-share-chat-tools.service';
import { AiRealtimeGateway } from './realtime/ai-realtime.gateway';
import { AiRealtimeService } from './realtime/ai-realtime.service';
/**
* Per-user AI chat module (§6.1).
@@ -46,6 +48,15 @@ import { PublicShareChatToolsService } from './tools/public-share-chat-tools.ser
AiChatToolsService,
PublicShareChatService,
PublicShareChatToolsService,
// Realtime dictation: the Socket.IO `/ai-realtime` gateway + its upstream
// proxy service. AiSettingsService comes from AiModule; WorkspaceRepo /
// UserRepo / UserSessionRepo from the global DatabaseModule;
// EnvironmentService from the global EnvironmentModule; TokenService from
// TokenModule (imported). The repos/env are used by the gateway's
// handleConnection auth (active-session + disabled-user) and CSWSH Origin
// checks that mirror the REST jwt.strategy.
AiRealtimeGateway,
AiRealtimeService,
],
})
export class AiChatModule {}


@@ -0,0 +1,68 @@
import { ForbiddenException } from '@nestjs/common';
import { AiChatController } from './ai-chat.controller';
import WorkspaceAbilityFactory from '../casl/abilities/workspace-ability.factory';
import { UserRole } from '../../common/helpers/types/permission';
// Contract for POST ai-chat/realtime/test:
// - admin gate: CASL Manage Settings. A non-admin (MEMBER) → Forbidden BEFORE
// aiRealtimeService.testConnection is ever called.
// - response form is the FROZEN { ok: true } | { ok: false, error } passthrough
// from the service (the global transform wraps it; the client reads req.data).
// Constructed directly with stubs + the REAL ability factory (the gate under
// test), mirroring the other controller specs in this codebase.
function buildController(testConnection = jest.fn()) {
const aiRealtimeService = { testConnection };
const controller = new AiChatController(
{} as any, // aiChatService
{} as any, // aiChatRepo
{} as any, // aiChatMessageRepo
{} as any, // aiTranscription
aiRealtimeService as any,
new WorkspaceAbilityFactory(), // REAL ability factory (the admin gate)
);
return { controller, testConnection };
}
const workspace = { id: 'w1' } as any;
const userWith = (role: UserRole) => ({ id: 'u1', role }) as any;
describe('AiChatController.testRealtime admin gate', () => {
it('forbids a MEMBER and never calls testConnection', async () => {
const { controller, testConnection } = buildController();
await expect(
controller.testRealtime(userWith(UserRole.MEMBER), workspace),
).rejects.toBeInstanceOf(ForbiddenException);
expect(testConnection).not.toHaveBeenCalled();
});
it('allows an OWNER and returns the service result for the workspace', async () => {
const testConnection = jest.fn().mockResolvedValue({ ok: true });
const { controller } = buildController(testConnection);
const result = await controller.testRealtime(
userWith(UserRole.OWNER),
workspace,
);
expect(testConnection).toHaveBeenCalledWith('w1');
expect(result).toEqual({ ok: true });
});
it('allows an ADMIN and passes through a failure result verbatim', async () => {
const testConnection = jest
.fn()
.mockResolvedValue({ ok: false, error: 'boom' });
const { controller } = buildController(testConnection);
const result = await controller.testRealtime(
userWith(UserRole.ADMIN),
workspace,
);
// The frozen { ok:false, error } shape is forwarded unchanged.
expect(result).toEqual({ ok: false, error: 'boom' });
});
});


@@ -0,0 +1,494 @@
import {
AiRealtimeGateway,
__testCounters,
} from './ai-realtime.gateway';
import { AiSttNotConfiguredException } from '../../../integrations/ai/ai-stt-not-configured.exception';
// The gateway authenticates over a cookie-JWT and mirrors the REST jwt.strategy
// active-session / disabled-user checks, enforces a CSWSH Origin allowlist, then
// gates on the workspace feature flags + per-process concurrency caps. These
// specs construct the gateway with `new` + mocked deps (no Socket.IO / DB) and
// drive handleConnection / handleStart / handleAudio / handleStop /
// handleDisconnect directly.
const APP_ORIGIN = 'https://app.example.com';
function buildSocket(opts?: {
cookie?: string;
origin?: string | undefined;
noOriginHeader?: boolean;
}) {
const headers: Record<string, string> = {};
if (opts?.cookie !== undefined) headers.cookie = opts.cookie;
if (!opts?.noOriginHeader) {
headers.origin = opts?.origin ?? APP_ORIGIN;
}
return {
handshake: { headers },
data: {} as any,
emit: jest.fn(),
disconnect: jest.fn(),
};
}
function buildGateway(overrides?: {
verifyJwt?: jest.Mock;
findUser?: jest.Mock;
findActiveSession?: jest.Mock;
findWorkspace?: jest.Mock;
openSession?: jest.Mock;
}) {
const tokenService = {
verifyJwt:
overrides?.verifyJwt ??
jest
.fn()
.mockResolvedValue({ sub: 'u1', workspaceId: 'w1', sessionId: 's1' }),
};
const userRepo = {
findById:
overrides?.findUser ??
jest.fn().mockResolvedValue({ id: 'u1', deactivatedAt: null, deletedAt: null }),
};
const userSessionRepo = {
findActiveById:
overrides?.findActiveSession ??
jest
.fn()
.mockResolvedValue({ id: 's1', userId: 'u1', workspaceId: 'w1' }),
};
const environmentService = {
getAppUrl: jest.fn().mockReturnValue(APP_ORIGIN),
};
const workspaceRepo = {
findById:
overrides?.findWorkspace ??
jest.fn().mockResolvedValue({
settings: { ai: { dictation: true, dictationRealtime: true } },
}),
};
const aiRealtimeService = {
openSession: overrides?.openSession ?? jest.fn(),
};
const gateway = new AiRealtimeGateway(
tokenService as any,
workspaceRepo as any,
userRepo as any,
userSessionRepo as any,
environmentService as any,
aiRealtimeService as any,
);
return {
gateway,
tokenService,
userRepo,
userSessionRepo,
environmentService,
workspaceRepo,
aiRealtimeService,
};
}
beforeEach(() => {
__testCounters.reset();
});
describe('AiRealtimeGateway.handleConnection auth', () => {
it('rejects an invalid/expired JWT: error Unauthorized, disconnect, no counter', async () => {
const { gateway } = buildGateway({
verifyJwt: jest.fn().mockRejectedValue(new Error('jwt expired')),
});
const socket = buildSocket();
await gateway.handleConnection(socket as any);
expect(socket.emit).toHaveBeenCalledWith('error', {
message: 'Unauthorized',
});
expect(socket.disconnect).toHaveBeenCalled();
expect(__testCounters.user.count('u1')).toBe(0);
expect(__testCounters.workspace.count('w1')).toBe(0);
});
it('rejects a missing cookie (no authToken) the same way', async () => {
const { gateway } = buildGateway({
verifyJwt: jest.fn().mockRejectedValue(new Error('no token')),
});
const socket = buildSocket({ cookie: '' });
await gateway.handleConnection(socket as any);
expect(socket.emit).toHaveBeenCalledWith('error', {
message: 'Unauthorized',
});
expect(socket.disconnect).toHaveBeenCalled();
});
it('rejects a disabled (deactivated) user: Unauthorized, no counter', async () => {
const { gateway } = buildGateway({
findUser: jest.fn().mockResolvedValue({
id: 'u1',
deactivatedAt: new Date(),
deletedAt: null,
}),
});
const socket = buildSocket();
await gateway.handleConnection(socket as any);
expect(socket.emit).toHaveBeenCalledWith('error', {
message: 'Unauthorized',
});
expect(socket.disconnect).toHaveBeenCalled();
expect(__testCounters.user.count('u1')).toBe(0);
});
it('rejects a missing user', async () => {
const { gateway } = buildGateway({
findUser: jest.fn().mockResolvedValue(undefined),
});
const socket = buildSocket();
await gateway.handleConnection(socket as any);
expect(socket.emit).toHaveBeenCalledWith('error', {
message: 'Unauthorized',
});
});
it('rejects an inactive/revoked session even with a valid signature', async () => {
const { gateway } = buildGateway({
findActiveSession: jest.fn().mockResolvedValue(undefined),
});
const socket = buildSocket();
await gateway.handleConnection(socket as any);
expect(socket.emit).toHaveBeenCalledWith('error', {
message: 'Unauthorized',
});
expect(socket.disconnect).toHaveBeenCalled();
expect(__testCounters.user.count('u1')).toBe(0);
});
it('rejects a session whose user/workspace does not match the token', async () => {
const { gateway } = buildGateway({
findActiveSession: jest
.fn()
.mockResolvedValue({ id: 's1', userId: 'OTHER', workspaceId: 'w1' }),
});
const socket = buildSocket();
await gateway.handleConnection(socket as any);
expect(socket.emit).toHaveBeenCalledWith('error', {
message: 'Unauthorized',
});
});
it('rejects a bad Origin (CSWSH) before auth and increments nothing', async () => {
const { gateway, tokenService } = buildGateway();
const socket = buildSocket({ origin: 'https://evil.example.com' });
await gateway.handleConnection(socket as any);
expect(socket.emit).toHaveBeenCalledWith('error', {
message: 'Unauthorized',
});
expect(socket.disconnect).toHaveBeenCalled();
// Origin check runs first: auth is never attempted.
expect(tokenService.verifyJwt).not.toHaveBeenCalled();
expect(__testCounters.user.count('u1')).toBe(0);
});
it('allows a request with no Origin header (native/non-browser client)', async () => {
const { gateway } = buildGateway();
const socket = buildSocket({ noOriginHeader: true });
await gateway.handleConnection(socket as any);
expect(socket.disconnect).not.toHaveBeenCalled();
expect(__testCounters.user.count('u1')).toBe(1);
expect(__testCounters.workspace.count('w1')).toBe(1);
});
it('accepts a matching Origin and increments both counters', async () => {
const { gateway } = buildGateway();
const socket = buildSocket({ origin: APP_ORIGIN });
await gateway.handleConnection(socket as any);
expect(socket.disconnect).not.toHaveBeenCalled();
expect(__testCounters.user.count('u1')).toBe(1);
expect(__testCounters.workspace.count('w1')).toBe(1);
expect(socket.data.countedUserId).toBe('u1');
expect(socket.data.countedWorkspaceId).toBe('w1');
});
});
describe('AiRealtimeGateway.handleConnection gate + caps', () => {
it('disconnects when the feature gate is off and leaves counters clean', async () => {
const { gateway } = buildGateway({
findWorkspace: jest
.fn()
.mockResolvedValue({ settings: { ai: { dictation: true } } }),
});
const socket = buildSocket();
await gateway.handleConnection(socket as any);
expect(socket.emit).toHaveBeenCalledWith('error', {
message: expect.stringMatching(/not enabled/i),
});
expect(socket.disconnect).toHaveBeenCalled();
expect(__testCounters.user.count('u1')).toBe(0);
expect(__testCounters.workspace.count('w1')).toBe(0);
});
it('refuses when the per-user cap is already reached (no increment)', async () => {
__testCounters.user.increment('u1'); // user already at cap (1)
const { gateway } = buildGateway();
const socket = buildSocket();
await gateway.handleConnection(socket as any);
expect(socket.emit).toHaveBeenCalledWith('error', {
message: expect.stringMatching(/already active/i),
});
expect(socket.disconnect).toHaveBeenCalled();
// Still exactly 1 (the pre-existing slot), not bumped to 2.
expect(__testCounters.user.count('u1')).toBe(1);
});
it('checks BOTH caps before incrementing EITHER (workspace cap full → user untouched)', async () => {
// Workspace at its cap (5), user at 0. The connection must be refused and the
// user counter must NOT have been bumped.
for (let i = 0; i < 5; i++) __testCounters.workspace.increment('w1');
const { gateway } = buildGateway();
const socket = buildSocket();
await gateway.handleConnection(socket as any);
expect(socket.emit).toHaveBeenCalledWith('error', {
message: expect.stringMatching(/maximum number/i),
});
expect(__testCounters.user.count('u1')).toBe(0);
expect(__testCounters.workspace.count('w1')).toBe(5);
});
});
describe('AiRealtimeGateway.handleStart lifecycle', () => {
function connectedSocket() {
return {
handshake: { headers: {} },
data: { userId: 'u1', workspaceId: 'w1' } as any,
emit: jest.fn(),
disconnect: jest.fn(),
};
}
it('relays onReady→ready, onInterim→interim{itemId,text}, onFinal→final', async () => {
let captured: any;
const openSession = jest.fn().mockImplementation((_ws, opts) => {
captured = opts;
return { appendAudio: jest.fn(), stop: jest.fn(), close: jest.fn() };
});
const { gateway } = buildGateway({ openSession });
const socket = connectedSocket();
await gateway.handleStart(socket as any, { language: 'en' });
expect(openSession).toHaveBeenCalledWith('w1', expect.any(Object));
captured.onReady();
expect(socket.emit).toHaveBeenCalledWith('ready', {});
captured.onInterim('item-1', 'hello');
expect(socket.emit).toHaveBeenCalledWith('interim', {
itemId: 'item-1',
text: 'hello',
});
captured.onFinal('item-1', 'hello world');
expect(socket.emit).toHaveBeenCalledWith('final', {
itemId: 'item-1',
text: 'hello world',
});
expect(socket.data.handle).toBeDefined();
});
it('onClosed clears the handle and emits closed (releases double-start guard)', async () => {
let captured: any;
const openSession = jest.fn().mockImplementation((_ws, opts) => {
captured = opts;
return { appendAudio: jest.fn(), stop: jest.fn(), close: jest.fn() };
});
const { gateway } = buildGateway({ openSession });
const socket = connectedSocket();
await gateway.handleStart(socket as any);
expect(socket.data.handle).toBeDefined();
captured.onClosed();
expect(socket.data.handle).toBeUndefined();
expect(socket.emit).toHaveBeenCalledWith('closed', {});
});
it('guards a double-start: a session already in progress → error, openSession called once', async () => {
const openSession = jest.fn().mockResolvedValue({
appendAudio: jest.fn(),
stop: jest.fn(),
close: jest.fn(),
});
const { gateway } = buildGateway({ openSession });
const socket = connectedSocket();
await gateway.handleStart(socket as any);
await gateway.handleStart(socket as any);
expect(openSession).toHaveBeenCalledTimes(1);
expect(socket.emit).toHaveBeenCalledWith('error', {
message: expect.stringMatching(/already in progress/i),
});
});
it('surfaces AiSttNotConfigured message verbatim (a clean 503 reason)', async () => {
const err = new AiSttNotConfiguredException();
const { gateway } = buildGateway({
openSession: jest.fn().mockRejectedValue(err),
});
const socket = connectedSocket();
await gateway.handleStart(socket as any);
expect(socket.emit).toHaveBeenCalledWith('error', {
message: err.message,
});
});
it('does not leak a raw key/stack on a provider error (uses describeProviderError)', async () => {
const err = new Error('connect ECONNREFUSED sk-secret-key-1234');
(err as any).stack = 'Error: at openSession sk-secret-key-1234';
const { gateway } = buildGateway({
openSession: jest.fn().mockRejectedValue(err),
});
const socket = connectedSocket();
await gateway.handleStart(socket as any);
const call = socket.emit.mock.calls.find((c) => c[0] === 'error');
expect(call).toBeDefined();
const message = call![1].message as string;
// A concrete, sanitized reason — never the raw stack.
expect(message).not.toContain('at openSession');
expect(typeof message).toBe('string');
});
});
describe('AiRealtimeGateway.handleAudio / handleStop guards', () => {
function socketWithHandle(handle?: any) {
return {
handshake: { headers: {} },
data: { userId: 'u1', workspaceId: 'w1', handle } as any,
emit: jest.fn(),
disconnect: jest.fn(),
};
}
it('no handle → handleAudio never calls appendAudio', () => {
const { gateway } = buildGateway();
const socket = socketWithHandle(undefined);
gateway.handleAudio(socket as any, Buffer.from([1, 2, 3]));
// Nothing to assert beyond not throwing — there is no handle to call.
expect(socket.data.handle).toBeUndefined();
});
it('a Buffer payload → exactly one appendAudio', () => {
const appendAudio = jest.fn();
const { gateway } = buildGateway();
const socket = socketWithHandle({ appendAudio });
gateway.handleAudio(socket as any, Buffer.from([1, 2, 3]));
expect(appendAudio).toHaveBeenCalledTimes(1);
});
it('a non-binary payload → no appendAudio', () => {
const appendAudio = jest.fn();
const { gateway } = buildGateway();
const socket = socketWithHandle({ appendAudio });
gateway.handleAudio(socket as any, 'not binary');
expect(appendAudio).not.toHaveBeenCalled();
});
it('handleStop with no handle does not throw', () => {
const { gateway } = buildGateway();
const socket = socketWithHandle(undefined);
expect(() => gateway.handleStop(socket as any)).not.toThrow();
});
it('handleStop with a handle calls stop once', () => {
const stop = jest.fn();
const { gateway } = buildGateway();
const socket = socketWithHandle({ stop });
gateway.handleStop(socket as any);
expect(stop).toHaveBeenCalledTimes(1);
});
});
describe('AiRealtimeGateway.handleDisconnect no-leak', () => {
it('decrements both counters for a fully-accepted connection', async () => {
const { gateway } = buildGateway();
const socket = buildSocket();
await gateway.handleConnection(socket as any);
expect(__testCounters.user.count('u1')).toBe(1);
gateway.handleDisconnect(socket as any);
expect(__testCounters.user.count('u1')).toBe(0);
expect(__testCounters.workspace.count('w1')).toBe(0);
});
it('is a no-op when the connection was rejected before incrementing', async () => {
const { gateway } = buildGateway({
verifyJwt: jest.fn().mockRejectedValue(new Error('bad')),
});
const socket = buildSocket();
await gateway.handleConnection(socket as any);
// No counted ids stashed → disconnect must not touch counters.
gateway.handleDisconnect(socket as any);
expect(__testCounters.user.count('u1')).toBe(0);
expect(__testCounters.workspace.count('w1')).toBe(0);
});
it('is idempotent: a double disconnect never goes negative', async () => {
const { gateway } = buildGateway();
const socket = buildSocket();
await gateway.handleConnection(socket as any);
gateway.handleDisconnect(socket as any);
gateway.handleDisconnect(socket as any);
expect(__testCounters.user.count('u1')).toBe(0);
expect(__testCounters.workspace.count('w1')).toBe(0);
});
it('closes the upstream handle on disconnect', async () => {
const close = jest.fn();
const { gateway } = buildGateway();
const socket = buildSocket();
await gateway.handleConnection(socket as any);
socket.data.handle = { close };
gateway.handleDisconnect(socket as any);
expect(close).toHaveBeenCalledTimes(1);
expect(socket.data.handle).toBeUndefined();
});
});
describe('AiRealtimeGateway.toBuffer', () => {
// toBuffer is private static; reach it through the class (cast to any) to test
// the documented contract (Buffer/Uint8Array/ArrayBuffer → Buffer; everything else → null).
const toBuffer = (AiRealtimeGateway as any).toBuffer as (
p: unknown,
) => Buffer | null;
it('passes a Buffer through', () => {
const buf = Buffer.from([1, 2, 3]);
expect(toBuffer(buf)).toBe(buf);
});
it('converts a Uint8Array to a Buffer', () => {
const out = toBuffer(new Uint8Array([1, 2, 3]));
expect(Buffer.isBuffer(out)).toBe(true);
expect(out).toEqual(Buffer.from([1, 2, 3]));
});
it('converts an ArrayBuffer to a Buffer', () => {
const out = toBuffer(new Uint8Array([4, 5]).buffer);
expect(Buffer.isBuffer(out)).toBe(true);
expect(out).toEqual(Buffer.from([4, 5]));
});
it('returns null for a string', () => {
expect(toBuffer('abc')).toBeNull();
});
it('returns null for null', () => {
expect(toBuffer(null)).toBeNull();
});
});
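
The `toBuffer` cases above pin down which payload types are accepted, but not whether the result shares memory with its input. As a minimal sketch, assuming standard Node.js `Buffer.from` semantics (a typed array is copied, an `ArrayBuffer` is wrapped without copying), the difference can be checked in isolation. `toBufferSketch` is a hypothetical stand-in that mirrors the documented contract; it is not the gateway's private method.

```typescript
// Hypothetical stand-in mirroring the contract the spec documents:
// Buffer / Uint8Array / ArrayBuffer -> Buffer; everything else -> null.
function toBufferSketch(payload: unknown): Buffer | null {
  if (Buffer.isBuffer(payload)) return payload; // same reference, no copy
  if (payload instanceof Uint8Array) return Buffer.from(payload); // copies the bytes
  if (payload instanceof ArrayBuffer) return Buffer.from(payload); // view over the same memory
  return null;
}

// A Uint8Array is copied: later writes to the source do not reach the Buffer.
const sourceBytes = new Uint8Array([1, 2, 3]);
const copied = toBufferSketch(sourceBytes)!;
sourceBytes[0] = 9;

// An ArrayBuffer is wrapped: later writes to the source ARE visible in the Buffer.
const sourceView = new Uint8Array([4, 5]);
const shared = toBufferSketch(sourceView.buffer)!;
sourceView[0] = 7;

console.log(copied[0], shared[0]);
```

In this sketch an `Int16Array` passed directly also falls through to `null`, because it is not a `Uint8Array`; only its underlying `ArrayBuffer` would be accepted.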

@@ -0,0 +1,291 @@
import {
OnGatewayConnection,
OnGatewayDisconnect,
SubscribeMessage,
WebSocketGateway,
} from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Socket } from 'socket.io';
import * as cookie from 'cookie';
import { TokenService } from '../../auth/services/token.service';
import { JwtPayload, JwtType } from '../../auth/dto/jwt-payload';
import { WorkspaceRepo } from '@docmost/db/repos/workspace/workspace.repo';
import { UserRepo } from '@docmost/db/repos/user/user.repo';
import { UserSessionRepo } from '@docmost/db/repos/session/user-session.repo';
import { EnvironmentService } from '../../../integrations/environment/environment.service';
import { isUserDisabled } from '../../../common/helpers';
import { AiSttNotConfiguredException } from '../../../integrations/ai/ai-stt-not-configured.exception';
import { describeProviderError } from '../../../integrations/ai/ai-error.util';
import {
AiRealtimeService,
RealtimeSessionHandle,
} from './ai-realtime.service';
import {
SessionCounters,
canConnect,
MAX_SESSIONS_PER_USER,
MAX_SESSIONS_PER_WORKSPACE,
} from './session-limits';
/**
* Realtime dictation gateway — the server side of the FROZEN normalized
* Socket.IO `/ai-realtime` protocol. The browser talks ONLY to this namespace;
* the raw OpenAI GA schema and the provider key never reach the client.
*
* Client → server: connect (cookie-JWT auth), `start` { language? }, `audio`
* (PCM16 binary), `stop`. Server → client: `ready`, `interim`, `final`,
* `error`, `closed`.
*
* Gate (before opening upstream): the workspace must have BOTH
* `settings.ai.dictation === true` AND `settings.ai.dictationRealtime === true`.
* Hard concurrency caps (realtime is expensive) are enforced in-memory per user
* and per workspace.
*
* ────────────────────────────────────────────────────────────────────────────
* OPS WARNING — CONCURRENCY CAPS ARE PER-PROCESS / IN-MEMORY.
* The `userSessions` / `workspaceSessions` counters live in this Node process's
* heap. They are correct ONLY on a SINGLE API replica. With N horizontally
* scaled replicas behind a load balancer the EFFECTIVE limit is N × the cap
* (each replica counts only the sockets it terminates), and the caps are not
* shared. A multi-replica deployment that must enforce a true global cap needs
* a shared store (e.g. Redis). This is by design for the single-process default
* and is documented loudly so it is not mistaken for a global guarantee.
* ────────────────────────────────────────────────────────────────────────────
*/
// Module-level concurrency counters (one Node process backs the gateway). See
// the OPS WARNING above: these are per-process/in-memory and only correct on a
// single API replica.
const userSessions = new SessionCounters();
const workspaceSessions = new SessionCounters();
/** Per-socket state we stash on client.data. */
interface RealtimeClientData {
userId: string;
workspaceId: string;
handle?: RealtimeSessionHandle;
// What we incremented at connect time, so disconnect decrements exactly that.
countedUserId?: string;
countedWorkspaceId?: string;
}
@WebSocketGateway({
namespace: '/ai-realtime',
// CORS origin is '*' at the socket.io layer, but this is a NEW externally
// reachable, provider-billed surface that authenticates via cookie. To block
// cross-site WebSocket hijacking (CSWSH) we enforce an explicit Origin
// allowlist in handleConnection (see assertAllowedOrigin) rather than relying
// on this permissive transport-level setting. (The pre-existing WsGateway is
// out of scope and intentionally unchanged.)
cors: { origin: '*' },
transports: ['websocket'],
})
export class AiRealtimeGateway
implements OnGatewayConnection, OnGatewayDisconnect
{
private readonly logger = new Logger(AiRealtimeGateway.name);
constructor(
private readonly tokenService: TokenService,
private readonly workspaceRepo: WorkspaceRepo,
private readonly userRepo: UserRepo,
private readonly userSessionRepo: UserSessionRepo,
private readonly environmentService: EnvironmentService,
private readonly aiRealtimeService: AiRealtimeService,
) {}
/**
* CSWSH guard: allow the handshake only when the browser-supplied `Origin`
* matches the app's configured origin, OR when there is no Origin header at
* all (native / non-browser clients never send one and cannot be coerced by a
* malicious page into doing so). A mismatched Origin → reject. Scoped to this
* realtime gateway only.
*/
private assertAllowedOrigin(client: Socket): void {
const origin = client.handshake.headers.origin;
// No Origin header → non-browser client; nothing to spoof, allow.
if (!origin) return;
const appOrigin = this.environmentService.getAppUrl();
if (origin !== appOrigin) {
throw new Error(`Origin not allowed: ${origin}`);
}
}
async handleConnection(client: Socket): Promise<void> {
try {
// 1) CSWSH Origin allowlist (before touching auth or any counter).
this.assertAllowedOrigin(client);
const cookies = cookie.parse(client.handshake.headers.cookie ?? '');
const token: JwtPayload = await this.tokenService.verifyJwt(
cookies['authToken'],
JwtType.ACCESS,
);
const userId = token.sub;
const workspaceId = token.workspaceId;
if (!workspaceId) {
throw new Error('Token missing workspaceId');
}
// 2) Mirror the REST jwt.strategy active-session / disabled-user checks
// (jwt.strategy.ts:53-72): a signed-but-stale token (revoked session,
// deactivated/deleted user) must NOT open a realtime session even though
// the signature is still valid until expiry (default 90d). Verifying the
// signature alone is not enough.
const user = await this.userRepo.findById(userId, workspaceId);
if (!user || isUserDisabled(user)) {
throw new Error('User disabled or not found');
}
const sessionId = token.sessionId;
if (sessionId) {
const session = await this.userSessionRepo.findActiveById(sessionId);
if (
!session ||
session.userId !== userId ||
session.workspaceId !== workspaceId
) {
throw new Error('Session not active');
}
}
const data = client.data as RealtimeClientData;
data.userId = userId;
data.workspaceId = workspaceId;
// Gate + concurrency caps. canConnect is a pure decision over the current
// counts; it checks BOTH the feature gate and BOTH caps before we mutate
// either counter, so a rejected connection leaves the counters clean.
const workspace = await this.workspaceRepo.findById(workspaceId);
const settings = (workspace?.settings ?? {}) as {
ai?: { dictation?: boolean; dictationRealtime?: boolean };
};
const decision = canConnect(userId, workspaceId, settings, {
userCount: userSessions.count(userId),
workspaceCount: workspaceSessions.count(workspaceId),
});
if (decision.allowed === false) {
client.emit('error', { message: decision.reason });
client.disconnect();
return;
}
userSessions.increment(userId);
workspaceSessions.increment(workspaceId);
// Remember exactly what we counted so disconnect decrements symmetrically.
data.countedUserId = userId;
data.countedWorkspaceId = workspaceId;
} catch (err) {
// Auth/origin failure (or any unexpected connect error): never leak
// details, never increment a counter.
this.logger.error('Realtime dictation connection rejected', err as Error);
client.emit('error', { message: 'Unauthorized' });
client.disconnect();
}
}
@SubscribeMessage('start')
async handleStart(
client: Socket,
data?: { language?: string },
): Promise<void> {
const state = client.data as RealtimeClientData;
// Guard double-start: a session is already open on this socket.
if (state.handle) {
client.emit('error', {
message: 'A realtime dictation session is already in progress',
});
return;
}
try {
const handle = await this.aiRealtimeService.openSession(
state.workspaceId,
{
language: data?.language,
onReady: () => client.emit('ready', {}),
onInterim: (itemId, text) => client.emit('interim', { itemId, text }),
onFinal: (itemId, text) => client.emit('final', { itemId, text }),
onError: (message) => client.emit('error', { message }),
onClosed: () => {
// Session ended (graceful stop, idle/max-duration, or upstream close):
// clear the handle so the double-start guard is released, then notify.
state.handle = undefined;
client.emit('closed', {});
},
},
);
state.handle = handle;
} catch (err) {
// Concrete reason to the client: a not-configured 503 vs a provider error.
this.logger.error('Failed to open realtime dictation session', err as Error);
const message =
err instanceof AiSttNotConfiguredException
? err.message
: describeProviderError(err, 'Failed to start realtime dictation');
client.emit('error', { message });
}
}
@SubscribeMessage('audio')
handleAudio(client: Socket, payload: unknown): void {
const state = client.data as RealtimeClientData;
if (!state.handle) return;
const chunk = AiRealtimeGateway.toBuffer(payload);
if (!chunk) return;
state.handle.appendAudio(chunk);
}
@SubscribeMessage('stop')
handleStop(client: Socket): void {
const state = client.data as RealtimeClientData;
state.handle?.stop();
}
handleDisconnect(client: Socket): void {
const state = client.data as RealtimeClientData;
// Tear down the upstream session, then release the concurrency slots we took.
state.handle?.close();
state.handle = undefined;
if (state.countedUserId) {
userSessions.decrement(state.countedUserId);
state.countedUserId = undefined;
}
if (state.countedWorkspaceId) {
workspaceSessions.decrement(state.countedWorkspaceId);
state.countedWorkspaceId = undefined;
}
}
/**
* Normalize an incoming `audio` payload to a Buffer. Socket.IO delivers binary
* as Buffer (Node) but may also surface Uint8Array / ArrayBuffer; accept all.
* Returns null for anything we cannot interpret as binary audio.
*/
private static toBuffer(payload: unknown): Buffer | null {
if (Buffer.isBuffer(payload)) return payload;
if (payload instanceof Uint8Array) return Buffer.from(payload);
if (payload instanceof ArrayBuffer) return Buffer.from(payload);
return null;
}
}
// Re-export the concurrency cap constants (now defined in ./session-limits) so
// existing importers of this module keep working.
export { MAX_SESSIONS_PER_USER, MAX_SESSIONS_PER_WORKSPACE };
/**
* Test-only handles onto the module-level counters. Exposed so the gateway
* lifecycle / no-leak specs can assert and reset the per-process state between
* cases (the counters are shared across every instance in-process). NOT for
* production use.
*/
export const __testCounters = {
user: userSessions,
workspace: workspaceSessions,
reset(): void {
userSessions.reset();
workspaceSessions.reset();
},
};
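
The gateway relies on two properties of the `./session-limits` module that are not visible in this file: counters that never go negative and drop their key at zero, and a `canConnect` decision that reads the current counts without mutating them. The following is a minimal sketch of those two properties only. Every name ending in `Sketch`, the cap values 2 and 10, the simplified `canConnectSketch` signature, and the reason strings are assumptions made for illustration, not the module's actual contents.

```typescript
// Assumed cap values; the real constants live in ./session-limits.
const MAX_PER_USER_SKETCH = 2;
const MAX_PER_WORKSPACE_SKETCH = 10;

// Counter map that deletes a key when it reaches zero and never goes negative.
class CountersSketch {
  private readonly counts = new Map<string, number>();

  count(key: string): number {
    return this.counts.get(key) ?? 0;
  }

  increment(key: string): void {
    this.counts.set(key, this.count(key) + 1);
  }

  decrement(key: string): void {
    const next = this.count(key) - 1;
    if (next <= 0) this.counts.delete(key); // a double decrement stays at 0
    else this.counts.set(key, next);
  }
}

type DecisionSketch = { allowed: true } | { allowed: false; reason: string };

interface SettingsSketch {
  ai?: { dictation?: boolean; dictationRealtime?: boolean };
}

// Pure decision: checks the feature gate and both caps, mutates nothing.
function canConnectSketch(
  settings: SettingsSketch,
  current: { userCount: number; workspaceCount: number },
): DecisionSketch {
  if (settings.ai?.dictation !== true || settings.ai?.dictationRealtime !== true) {
    return { allowed: false, reason: 'Realtime dictation is not enabled' };
  }
  if (current.userCount >= MAX_PER_USER_SKETCH) {
    return { allowed: false, reason: 'Too many sessions for this user' };
  }
  if (current.workspaceCount >= MAX_PER_WORKSPACE_SKETCH) {
    return { allowed: false, reason: 'Too many sessions for this workspace' };
  }
  return { allowed: true };
}

// The caller increments only after an allowed decision, so a rejection
// leaves both counters untouched.
const userCounters = new CountersSketch();
const workspaceCounters = new CountersSketch();
const decision = canConnectSketch(
  { ai: { dictation: true, dictationRealtime: true } },
  { userCount: userCounters.count('u1'), workspaceCount: workspaceCounters.count('w1') },
);
if (decision.allowed) {
  userCounters.increment('u1');
  workspaceCounters.increment('w1');
}
```

Keeping the decision separate from the mutation is what lets a rejected handshake skip any rollback: nothing has been incremented yet.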

@@ -0,0 +1,831 @@
import { EventEmitter } from 'node:events';
// Mock the SSRF guard so openSession's pre-flight check is controllable and the
// pinned `lookup` never touches real DNS in these unit tests.
jest.mock('../external-mcp/ssrf-guard', () => ({
isUrlAllowed: jest.fn(async () => ({ ok: true })),
isIpAllowed: jest.fn(() => ({ ok: true })),
}));
import {
parseUpstreamEvent,
AiRealtimeService,
type WsFactory,
} from './ai-realtime.service';
import { isUrlAllowed } from '../external-mcp/ssrf-guard';
import { AiSttNotConfiguredException } from '../../../integrations/ai/ai-stt-not-configured.exception';
const mockedIsUrlAllowed = isUrlAllowed as jest.MockedFunction<
typeof isUrlAllowed
>;
/** ws readyState constants used by the fake (CONNECTING = 0, OPEN = 1, CLOSED = 3; CLOSING = 2 is unused here). */
const WS_CONNECTING = 0;
const WS_OPEN = 1;
const WS_CLOSED = 3;
/**
* Minimal fake of the `ws` WebSocket: an EventEmitter with readyState/send/close.
* `sent` records every frame; `failSend` makes every send throw while it is set;
* the helpers simulate the upstream emitting open/message/close/error.
*/
class FakeWs extends EventEmitter {
static readonly OPEN = WS_OPEN;
static readonly CONNECTING = WS_CONNECTING;
static readonly CLOSED = WS_CLOSED;
readyState = WS_CONNECTING;
sent: string[] = [];
closed = false;
failSend = false;
lastOpts: unknown;
constructor(opts?: unknown) {
super();
this.lastOpts = opts;
}
send(data: string): void {
if (this.failSend) throw new Error('send boom');
this.sent.push(data);
}
close(): void {
this.closed = true;
this.readyState = WS_CLOSED;
}
/** Parsed view of the sent frames. */
sentJson(): Array<Record<string, unknown>> {
return this.sent.map((s) => JSON.parse(s) as Record<string, unknown>);
}
// --- upstream-side simulation helpers ---
open(): void {
this.readyState = WS_OPEN;
this.emit('open');
}
message(obj: unknown): void {
this.emit('message', Buffer.from(JSON.stringify(obj)));
}
rawMessage(raw: string): void {
this.emit('message', Buffer.from(raw));
}
upstreamClose(code: number, reason = ''): void {
this.readyState = WS_CLOSED;
this.emit('close', code, Buffer.from(reason));
}
errorEvent(err: Error): void {
this.emit('error', err);
}
}
/** Build a service with a stub AiSettingsService.resolve and the fake ws factory. */
function makeService(
resolveValue: Record<string, unknown> | null,
): { service: AiRealtimeService; created: FakeWs[] } {
const created: FakeWs[] = [];
const aiSettings = {
resolve: jest.fn(async () => resolveValue),
} as unknown as ConstructorParameters<typeof AiRealtimeService>[0];
const service = new AiRealtimeService(aiSettings);
const factory: WsFactory = (_url, opts) => {
const ws = new FakeWs(opts);
created.push(ws);
return ws as unknown as ReturnType<WsFactory>;
};
service.setWsFactory(factory);
return { service, created };
}
/** A fully-configured STT config that resolves to the OpenAI default URL. */
const OPENAI_CFG = {
driver: 'openai',
sttRealtimeModel: 'gpt-4o-transcribe',
sttApiKey: 'sk-test',
};
/** Callback spies + the options object to pass to openSession. */
function makeCallbacks(language?: string) {
const cb = {
onReady: jest.fn(),
onInterim: jest.fn(),
onFinal: jest.fn(),
onError: jest.fn(),
onClosed: jest.fn(),
};
return { ...cb, language };
}
/**
* Unit tests for the PURE `parseUpstreamEvent` normalizer (no network). They
* feed synthetic OpenAI GA frames through a shared per-item delta accumulator
* and assert the normalized `/ai-realtime` outputs, including that two deltas
* for the same item_id accumulate and that the accumulator is cleared once the
* segment completes.
*/
describe('parseUpstreamEvent (OpenAI GA → normalized realtime events)', () => {
let acc: Map<string, string>;
beforeEach(() => {
acc = new Map<string, string>();
});
it('maps session.created / session.updated to { type: "ready" }', () => {
expect(parseUpstreamEvent(JSON.stringify({ type: 'session.created' }), acc)).toEqual({
type: 'ready',
});
expect(parseUpstreamEvent(JSON.stringify({ type: 'session.updated' }), acc)).toEqual({
type: 'ready',
});
// No accumulator side effects from session frames.
expect(acc.size).toBe(0);
});
it('accumulates two deltas for the same item_id into the interim text', () => {
const first = parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.delta',
item_id: 'item-1',
delta: 'Hello',
}),
acc,
);
expect(first).toEqual({ type: 'interim', itemId: 'item-1', text: 'Hello' });
const second = parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.delta',
item_id: 'item-1',
delta: ' world',
}),
acc,
);
// The second delta appends to the first: the interim is the full running text.
expect(second).toEqual({
type: 'interim',
itemId: 'item-1',
text: 'Hello world',
});
// Accumulator is keyed by `${item_id}:${content_index ?? 0}`.
expect(acc.get('item-1:0')).toBe('Hello world');
});
it('emits a trimmed final from the completed transcript and clears the accumulator', () => {
// Seed an in-flight accumulation, then complete it.
parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.delta',
item_id: 'item-2',
delta: 'partial',
}),
acc,
);
expect(acc.has('item-2:0')).toBe(true);
const final = parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.completed',
item_id: 'item-2',
transcript: ' Final transcript. ',
}),
acc,
);
expect(final).toEqual({
type: 'final',
itemId: 'item-2',
text: 'Final transcript.',
});
// The accumulator entry for the completed segment is removed.
expect(acc.has('item-2:0')).toBe(false);
});
it('falls back to the accumulated text when completed omits the transcript', () => {
parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.delta',
item_id: 'item-3',
delta: 'accumulated only',
}),
acc,
);
const final = parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.completed',
item_id: 'item-3',
}),
acc,
);
expect(final).toEqual({
type: 'final',
itemId: 'item-3',
text: 'accumulated only',
});
expect(acc.has('item-3:0')).toBe(false);
});
it('maps an error frame to { type: "error" } with the provider message', () => {
const out = parseUpstreamEvent(
JSON.stringify({
type: 'error',
error: { message: 'invalid_api_key', code: 'invalid', type: 'auth' },
}),
acc,
);
expect(out.type).toBe('error');
expect(out.message).toBe('invalid_api_key');
});
it('maps an unknown frame to { type: "ignore" }', () => {
expect(
parseUpstreamEvent(JSON.stringify({ type: 'response.created' }), acc),
).toEqual({ type: 'ignore' });
// An unknown frame has no accumulator side effects.
expect(acc.size).toBe(0);
});
it('maps an unparseable (non-JSON) frame to { type: "ignore" }', () => {
expect(parseUpstreamEvent('not json', acc)).toEqual({ type: 'ignore' });
});
it('runs the full GA sequence end-to-end and ends with a clean accumulator', () => {
// session.created → two deltas (same item) → completed → error → unknown.
expect(parseUpstreamEvent(JSON.stringify({ type: 'session.created' }), acc)).toEqual({
type: 'ready',
});
expect(
parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.delta',
item_id: 'seg',
delta: 'one ',
}),
acc,
),
).toEqual({ type: 'interim', itemId: 'seg', text: 'one ' });
expect(
parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.delta',
item_id: 'seg',
delta: 'two',
}),
acc,
),
).toEqual({ type: 'interim', itemId: 'seg', text: 'one two' });
expect(
parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.completed',
item_id: 'seg',
transcript: 'one two',
}),
acc,
),
).toEqual({ type: 'final', itemId: 'seg', text: 'one two' });
expect(
parseUpstreamEvent(
JSON.stringify({ type: 'error', error: { message: 'boom' } }),
acc,
),
).toEqual({ type: 'error', message: 'boom' });
expect(parseUpstreamEvent(JSON.stringify({ type: 'whatever' }), acc)).toEqual({
type: 'ignore',
});
// Every started segment was completed → the accumulator is empty.
expect(acc.size).toBe(0);
});
it('ignores a delta with no item_id', () => {
expect(
parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.delta',
delta: 'orphan',
}),
acc,
),
).toEqual({ type: 'ignore' });
expect(acc.size).toBe(0);
});
it('ignores a completed with no item_id', () => {
expect(
parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.completed',
transcript: 'orphan',
}),
acc,
),
).toEqual({ type: 'ignore' });
});
it('falls back to describeProviderError when an error frame carries no message', () => {
const out = parseUpstreamEvent(
JSON.stringify({ type: 'error', error: { code: 'x', type: 'server' } }),
acc,
);
expect(out.type).toBe('error');
// No provider message → the generic fallback string.
expect(out.message).toBe('Realtime transcription error');
});
it('emits text:"" for a completed with no transcript and an empty accumulator', () => {
const out = parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.completed',
item_id: 'empty-1',
}),
acc,
);
expect(out).toEqual({ type: 'final', itemId: 'empty-1', text: '' });
});
it('ignores non-object JSON payloads ("42", "null")', () => {
expect(parseUpstreamEvent('42', acc)).toEqual({ type: 'ignore' });
expect(parseUpstreamEvent('null', acc)).toEqual({ type: 'ignore' });
expect(acc.size).toBe(0);
});
it('keeps two content_index parts under the same item_id separate', () => {
// Two segments share item_id 'seg' but differ by content_index.
const a = parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.delta',
item_id: 'seg',
content_index: 0,
delta: 'first',
}),
acc,
);
expect(a).toEqual({ type: 'interim', itemId: 'seg', text: 'first' });
const b = parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.delta',
item_id: 'seg',
content_index: 1,
delta: 'second',
}),
acc,
);
// content_index 1 must NOT be concatenated onto content_index 0.
expect(b).toEqual({ type: 'interim', itemId: 'seg', text: 'second' });
// Two distinct accumulator entries.
expect(acc.get('seg:0')).toBe('first');
expect(acc.get('seg:1')).toBe('second');
// Completing content_index 0 leaves content_index 1 intact.
const finalA = parseUpstreamEvent(
JSON.stringify({
type: 'conversation.item.input_audio_transcription.completed',
item_id: 'seg',
content_index: 0,
transcript: 'first',
}),
acc,
);
expect(finalA).toEqual({ type: 'final', itemId: 'seg', text: 'first' });
expect(acc.has('seg:0')).toBe(false);
expect(acc.get('seg:1')).toBe('second');
});
});
describe('AiRealtimeService.deriveRealtimeUrl', () => {
const OPENAI_DEFAULT = 'wss://api.openai.com/v1/realtime?intent=transcription';
it('returns the OpenAI default for no base / whitespace base', () => {
expect(AiRealtimeService.deriveRealtimeUrl()).toBe(OPENAI_DEFAULT);
expect(AiRealtimeService.deriveRealtimeUrl('')).toBe(OPENAI_DEFAULT);
expect(AiRealtimeService.deriveRealtimeUrl(' ')).toBe(OPENAI_DEFAULT);
});
it('derives /v1/realtime for a bare host base', () => {
expect(AiRealtimeService.deriveRealtimeUrl('https://stt.example.com')).toBe(
'wss://stt.example.com/v1/realtime?intent=transcription',
);
});
it('does not duplicate /v1 when the base already ends in /v1', () => {
expect(
AiRealtimeService.deriveRealtimeUrl('https://stt.example.com/v1'),
).toBe('wss://stt.example.com/v1/realtime?intent=transcription');
});
it('does not duplicate /v1 when the base already ends in /v1/realtime', () => {
expect(
AiRealtimeService.deriveRealtimeUrl('https://stt.example.com/v1/realtime'),
).toBe('wss://stt.example.com/v1/realtime?intent=transcription');
});
it('strips a trailing slash before normalizing the path', () => {
expect(
AiRealtimeService.deriveRealtimeUrl('https://stt.example.com/v1/'),
).toBe('wss://stt.example.com/v1/realtime?intent=transcription');
});
it('upgrades https to wss and preserves an explicit port', () => {
expect(
AiRealtimeService.deriveRealtimeUrl('https://stt.example.com:8443/v1'),
).toBe('wss://stt.example.com:8443/v1/realtime?intent=transcription');
});
it('THROWS (fail-closed) for a non-empty unparseable base', () => {
expect(() => AiRealtimeService.deriveRealtimeUrl('not a url')).toThrow(
/could not be parsed/i,
);
// The bad base is mentioned in the message.
expect(() => AiRealtimeService.deriveRealtimeUrl('not a url')).toThrow(
/not a url/,
);
});
it('THROWS for an http:// base (Bearer key would be plaintext)', () => {
expect(() =>
AiRealtimeService.deriveRealtimeUrl('http://stt.example.com/v1'),
).toThrow(/secure|plaintext|http:/i);
});
it('THROWS for a ws:// base (insecure scheme)', () => {
expect(() =>
AiRealtimeService.deriveRealtimeUrl('ws://stt.example.com/v1'),
).toThrow(/secure|plaintext|ws:/i);
});
});
describe('AiRealtimeService.openSession (fake ws seam)', () => {
beforeEach(() => {
mockedIsUrlAllowed.mockReset();
mockedIsUrlAllowed.mockResolvedValue({ ok: true });
});
it('not configured (no driver/model) → throws AiSttNotConfiguredException and creates NO socket', async () => {
const { service, created } = makeService(null);
await expect(
service.openSession('ws-1', makeCallbacks()),
).rejects.toBeInstanceOf(AiSttNotConfiguredException);
expect(created).toHaveLength(0);
});
it('SSRF guard blocks (isUrlAllowed=false) → throws and creates NO socket', async () => {
mockedIsUrlAllowed.mockResolvedValue({ ok: false, reason: 'blocked range' });
const { service, created } = makeService(OPENAI_CFG);
await expect(service.openSession('ws-1', makeCallbacks())).rejects.toThrow(
/SSRF guard.*blocked range/i,
);
expect(created).toHaveLength(0);
});
it('on open sends exactly one session.update with the GA transcription shape', async () => {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
await service.openSession('ws-1', cb);
const ws = created[0];
// Authorization header set; NO OpenAI-Beta header.
const opts = ws.lastOpts as { headers?: Record<string, string> };
expect(opts.headers?.Authorization).toBe('Bearer sk-test');
expect(Object.keys(opts.headers ?? {})).not.toContain('OpenAI-Beta');
ws.open();
expect(ws.sent).toHaveLength(1);
const frame = ws.sentJson()[0];
expect(frame.type).toBe('session.update');
const session = frame.session as Record<string, unknown>;
expect(session.type).toBe('transcription');
const input = (session.audio as Record<string, Record<string, unknown>>)
.input;
expect(input.format).toEqual({ type: 'audio/pcm', rate: 24000 });
expect(input.turn_detection).toEqual({ type: 'server_vad' });
const transcription = input.transcription as Record<string, unknown>;
expect(transcription.model).toBe('gpt-4o-transcribe');
// No language was supplied → omitted.
expect(transcription).not.toHaveProperty('language');
});
it('includes language only when supplied', async () => {
const { service, created } = makeService(OPENAI_CFG);
await service.openSession('ws-1', makeCallbacks('en'));
const ws = created[0];
ws.open();
const input = (
(ws.sentJson()[0].session as Record<string, unknown>).audio as Record<
string,
Record<string, unknown>
>
).input;
const transcription = input.transcription as Record<string, unknown>;
expect(transcription.language).toBe('en');
});
it('fires onReady once per session.created / session.updated frame', async () => {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
await service.openSession('ws-1', cb);
const ws = created[0];
ws.open();
ws.message({ type: 'session.created' });
expect(cb.onReady).toHaveBeenCalledTimes(1);
ws.message({ type: 'session.updated' });
expect(cb.onReady).toHaveBeenCalledTimes(2);
});
it('routes delta → onInterim and completed → onFinal', async () => {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
await service.openSession('ws-1', cb);
const ws = created[0];
ws.open();
ws.message({
type: 'conversation.item.input_audio_transcription.delta',
item_id: 'i1',
delta: 'hi',
});
expect(cb.onInterim).toHaveBeenCalledWith('i1', 'hi');
ws.message({
type: 'conversation.item.input_audio_transcription.completed',
item_id: 'i1',
transcript: 'hi there',
});
expect(cb.onFinal).toHaveBeenCalledWith('i1', 'hi there');
});
it('routes an error frame → onError + teardown (onClosed once)', async () => {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
await service.openSession('ws-1', cb);
const ws = created[0];
ws.open();
ws.message({ type: 'error', error: { message: 'invalid_api_key' } });
expect(cb.onError).toHaveBeenCalledWith('invalid_api_key');
expect(cb.onClosed).toHaveBeenCalledTimes(1);
expect(ws.closed).toBe(true);
});
it('ignores messages after teardown (post-close → no-op)', async () => {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
const handle = await service.openSession('ws-1', cb);
const ws = created[0];
ws.open();
handle.close();
expect(cb.onClosed).toHaveBeenCalledTimes(1);
ws.message({ type: 'session.created' });
ws.message({
type: 'conversation.item.input_audio_transcription.completed',
item_id: 'late',
transcript: 'dropped',
});
expect(cb.onReady).not.toHaveBeenCalled();
expect(cb.onFinal).not.toHaveBeenCalled();
});
it('appendAudio is a no-op when the socket is not OPEN', async () => {
const { service, created } = makeService(OPENAI_CFG);
const handle = await service.openSession('ws-1', makeCallbacks());
const ws = created[0];
// Still CONNECTING (no open yet).
handle.appendAudio(Buffer.from([1, 2, 3]));
expect(ws.sent).toHaveLength(0);
});
it('appendAudio base64-encodes and sends input_audio_buffer.append when OPEN', async () => {
const { service, created } = makeService(OPENAI_CFG);
const handle = await service.openSession('ws-1', makeCallbacks());
const ws = created[0];
ws.open();
ws.sent.length = 0; // drop the session.update frame
handle.appendAudio(Buffer.from([1, 2, 3]));
const frame = ws.sentJson()[0];
expect(frame.type).toBe('input_audio_buffer.append');
expect(frame.audio).toBe(Buffer.from([1, 2, 3]).toString('base64'));
});
it('appendAudio send throw → onError + teardown', async () => {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
const handle = await service.openSession('ws-1', cb);
const ws = created[0];
ws.open();
ws.failSend = true;
handle.appendAudio(Buffer.from([1]));
expect(cb.onError).toHaveBeenCalled();
expect(cb.onClosed).toHaveBeenCalledTimes(1);
});
});
describe('AiRealtimeService stop() drain + idempotency', () => {
beforeEach(() => {
mockedIsUrlAllowed.mockReset();
mockedIsUrlAllowed.mockResolvedValue({ ok: true });
});
it('with nothing in flight, stop commits then tears down immediately', async () => {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
const handle = await service.openSession('ws-1', cb);
const ws = created[0];
ws.open();
ws.sent.length = 0;
handle.stop();
// Commit was sent, socket closed, onClosed fired once.
expect(ws.sentJson()[0].type).toBe('input_audio_buffer.commit');
expect(ws.closed).toBe(true);
expect(cb.onClosed).toHaveBeenCalledTimes(1);
});
it('drains an in-flight segment: holds open until the tail completed, delivers it, then closes', async () => {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
const handle = await service.openSession('ws-1', cb);
const ws = created[0];
ws.open();
// A segment is in flight (delta but no completed yet).
ws.message({
type: 'conversation.item.input_audio_transcription.delta',
item_id: 'tail',
delta: 'last phrase',
});
ws.sent.length = 0;
handle.stop();
// Commit sent; socket NOT yet closed (draining).
expect(ws.sentJson()[0].type).toBe('input_audio_buffer.commit');
expect(ws.closed).toBe(false);
expect(cb.onClosed).not.toHaveBeenCalled();
// The tail's completed arrives after stop → it is still delivered.
ws.message({
type: 'conversation.item.input_audio_transcription.completed',
item_id: 'tail',
transcript: 'last phrase',
});
expect(cb.onFinal).toHaveBeenCalledWith('tail', 'last phrase');
// Now closed.
expect(ws.closed).toBe(true);
expect(cb.onClosed).toHaveBeenCalledTimes(1);
});
it('drain timeout closes if the tail never arrives (fake timers)', async () => {
jest.useFakeTimers();
try {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
const handle = await service.openSession('ws-1', cb);
const ws = created[0];
ws.open();
ws.message({
type: 'conversation.item.input_audio_transcription.delta',
item_id: 'tail',
delta: 'partial',
});
handle.stop();
expect(ws.closed).toBe(false);
// Drain window (2.5s) elapses with no completed → close.
jest.advanceTimersByTime(3_000);
expect(ws.closed).toBe(true);
expect(cb.onClosed).toHaveBeenCalledTimes(1);
} finally {
jest.useRealTimers();
}
});
it('is idempotent: a double close followed by stop fires onClosed exactly once', async () => {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
const handle = await service.openSession('ws-1', cb);
created[0].open();
handle.close();
handle.close();
handle.stop();
expect(cb.onClosed).toHaveBeenCalledTimes(1);
});
});
describe('AiRealtimeService session timers (fake timers)', () => {
beforeEach(() => {
mockedIsUrlAllowed.mockReset();
mockedIsUrlAllowed.mockResolvedValue({ ok: true });
jest.useFakeTimers();
});
afterEach(() => {
jest.useRealTimers();
});
it('idle 15s with no audio → onError + onClosed', async () => {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
await service.openSession('ws-1', cb);
created[0].open();
jest.advanceTimersByTime(15_000);
expect(cb.onError).toHaveBeenCalledWith(
expect.stringContaining('idle'),
);
expect(cb.onClosed).toHaveBeenCalledTimes(1);
});
it('max 120s duration → onError + onClosed', async () => {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
const handle = await service.openSession('ws-1', cb);
created[0].open();
// Keep pushing audio so the idle timer never fires before the max cap.
for (let t = 0; t < 120_000; t += 10_000) {
handle.appendAudio(Buffer.from([1]));
jest.advanceTimersByTime(10_000);
}
expect(cb.onError).toHaveBeenCalledWith(
expect.stringContaining('maximum duration'),
);
expect(cb.onClosed).toHaveBeenCalledTimes(1);
});
it('an unexpected upstream close (code !== 1000) reports onError', async () => {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
await service.openSession('ws-1', cb);
const ws = created[0];
ws.open();
ws.upstreamClose(1006, 'abnormal');
expect(cb.onError).toHaveBeenCalledWith(
expect.stringContaining('1006'),
);
expect(cb.onClosed).toHaveBeenCalledTimes(1);
});
it('no timer fires after teardown', async () => {
const { service, created } = makeService(OPENAI_CFG);
const cb = makeCallbacks();
const handle = await service.openSession('ws-1', cb);
created[0].open();
handle.close();
expect(cb.onClosed).toHaveBeenCalledTimes(1);
jest.advanceTimersByTime(200_000);
expect(cb.onError).not.toHaveBeenCalled();
expect(cb.onClosed).toHaveBeenCalledTimes(1);
});
});
describe('AiRealtimeService.testConnection (settle once)', () => {
beforeEach(() => {
mockedIsUrlAllowed.mockReset();
mockedIsUrlAllowed.mockResolvedValue({ ok: true });
});
it('resolves { ok: true } on the first onReady', async () => {
const { service, created } = makeService(OPENAI_CFG);
const promise = service.testConnection('ws-1');
// Let openSession's await resolve and create the socket.
await Promise.resolve();
await Promise.resolve();
const ws = created[0];
ws.open();
ws.message({ type: 'session.created' });
await expect(promise).resolves.toEqual({ ok: true });
expect(ws.closed).toBe(true);
});
it('resolves { ok: false, error } on the first onError', async () => {
const { service, created } = makeService(OPENAI_CFG);
const promise = service.testConnection('ws-1');
await Promise.resolve();
await Promise.resolve();
const ws = created[0];
ws.open();
ws.message({ type: 'error', error: { message: 'bad key' } });
await expect(promise).resolves.toEqual({ ok: false, error: 'bad key' });
});
it('times out after 8s → { ok: false } (fake timers)', async () => {
jest.useFakeTimers();
try {
const { service, created } = makeService(OPENAI_CFG);
const promise = service.testConnection('ws-1');
// Flush the openSession microtasks so the socket exists.
await Promise.resolve();
await Promise.resolve();
created[0].open();
jest.advanceTimersByTime(8_000);
await expect(promise).resolves.toEqual({
ok: false,
error: 'Realtime connection timed out',
});
} finally {
jest.useRealTimers();
}
});
it('not configured → { ok: false } with the not-configured message', async () => {
const { service } = makeService(null);
await expect(service.testConnection('ws-1')).resolves.toEqual({
ok: false,
error: 'AI STT model not configured',
});
});
});
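The specs above drive the service through an injected fake socket (`created[0]`) with helpers such as `open()`, `message()`, `upstreamClose()`, `sent` and `sentJson()`. Below is a minimal sketch of what such a fake could look like, assuming it only needs the `EventEmitter` surface the service touches (`on`, `readyState`, `send`, `close`). The class name `FakeWs` and its internals are hypothetical and are not the spec file's actual helper.

```typescript
import { Buffer } from 'node:buffer';
import { EventEmitter } from 'node:events';

// Hypothetical stand-in for the upstream `ws` socket used by the specs.
class FakeWs extends EventEmitter {
  // Numeric states follow the WebSocket constants (CONNECTING=0, OPEN=1, CLOSED=3).
  readyState = 0;
  closed = false;
  readonly sent: string[] = [];

  // Record outgoing frames instead of writing to a network.
  send(data: string): void {
    this.sent.push(data);
  }

  // Called by the service on teardown; only flips local state.
  close(): void {
    this.closed = true;
    this.readyState = 3;
  }

  // Test helper: simulate the upstream handshake completing.
  open(): void {
    this.readyState = 1;
    this.emit('open');
  }

  // Test helper: simulate one upstream JSON event arriving as a Buffer.
  message(event: object): void {
    this.emit('message', Buffer.from(JSON.stringify(event)));
  }

  // Test helper: simulate the upstream closing on its own.
  upstreamClose(code: number, reason: string): void {
    this.readyState = 3;
    this.emit('close', code, Buffer.from(reason));
  }

  // Parse every recorded frame so specs can assert on `type`.
  sentJson(): Array<{ type?: string }> {
    return this.sent.map((s) => JSON.parse(s) as { type?: string });
  }
}
```

A factory passed to `setWsFactory` could then create a `FakeWs`, push it onto a `created` array, and return it cast to the `ws` socket type, which is roughly what the `makeService` helper used above appears to do.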

@@ -0,0 +1,674 @@
import { Injectable, Logger } from '@nestjs/common';
import {
lookup as dnsLookup,
type LookupAddress,
type LookupAllOptions,
type LookupOneOptions,
} from 'node:dns';
import type { LookupFunction } from 'node:net';
// CJS import-equals so the WebSocket class (and its static OPEN/CONNECTING
// constants) resolves as a runtime VALUE regardless of esModuleInterop — a
// plain `import WebSocket from 'ws'` compiles to `ws_1.default`, which is
// undefined for this CommonJS module.
import WebSocket = require('ws');
import { AiSettingsService } from '../../../integrations/ai/ai-settings.service';
import { AiSttNotConfiguredException } from '../../../integrations/ai/ai-stt-not-configured.exception';
import { describeProviderError } from '../../../integrations/ai/ai-error.util';
import { isUrlAllowed, isIpAllowed } from '../external-mcp/ssrf-guard';
/**
* WebSocket client options plus the `lookup` DNS hook. `@types/ws` does not
* declare `lookup`, but the `ws` lib spreads the options into the underlying
* `http(s).request` → `net`/`tls.connect`, both of which honor `lookup`. We add
* it here so the SSRF IP-pinning hook is type-checked.
*/
export type RealtimeWsOptions = WebSocket.ClientOptions & {
lookup?: LookupFunction;
};
/**
* Factory that creates the upstream WebSocket. Defaults to the real `ws`
* constructor; tests inject a fake (an EventEmitter with readyState/send/close)
* so the handshake/message-routing/timer logic can be exercised without a
* network. The `lookup` SSRF-pinning option lives in `opts` (see openSession).
*/
export type WsFactory = (url: string, opts: RealtimeWsOptions) => WebSocket;
/** The default, production WebSocket factory. */
const defaultWsFactory: WsFactory = (url, opts) => new WebSocket(url, opts);
/**
* Build a `net`/`tls` `lookup` option that pins the connection to SSRF-validated
* addresses. The `ws` lib forwards this option through `http(s).request` into
* `net.connect`/`tls.connect`, so it is invoked for EVERY DNS resolution the
* socket performs — there is no second, unchecked resolution after our pre-flight
* `isUrlAllowed` check. We always resolve ALL addresses ourselves, validate each
* with `isIpAllowed`, and only ever hand back validated addresses; a single
* blocked address fails the whole connect. Mirrors buildPinnedDispatcher in the
* external-MCP layer. The hostname (SNI / Host header) is left untouched so TLS
* certificate validation still uses the real hostname.
*/
function buildPinnedLookup(): LookupFunction {
const pinnedLookup = (
hostname: string,
options: LookupOneOptions | LookupAllOptions | number,
callback: (
err: NodeJS.ErrnoException | null,
address: string | LookupAddress[],
family?: number,
) => void,
): void => {
// Whether net requested a single address or all of them, resolve ALL
// ourselves and validate every one. The caller's `all` flag only decides the
// shape of the callback result, never how many addresses get checked.
const wantAll =
typeof options === 'object' && options !== null && options.all === true;
dnsLookup(hostname, { all: true }, (err, addresses) => {
if (err) {
callback(err, '', 0);
return;
}
const addrs = addresses as LookupAddress[];
if (addrs.length === 0) {
callback(new Error(`No address resolved for ${hostname}`), '', 0);
return;
}
const blocked = addrs.find((a) => !isIpAllowed(a.address).ok);
if (blocked) {
// Refuse the connection: net/tls.connect never sees this address.
callback(new Error(`Blocked address for ${hostname}`), '', 0);
return;
}
const validated: LookupAddress[] = addrs.map((a) => ({
address: a.address,
family: a.family,
}));
if (wantAll) {
callback(null, validated);
} else {
// Single-address form: hand back the first validated entry.
callback(null, validated[0].address, validated[0].family);
}
});
};
return pinnedLookup as LookupFunction;
}
/**
* Realtime STT proxy (server side of the A2 transport: browser ↔ OUR server ↔
* OpenAI). The provider API key is resolved here and NEVER leaves the server /
* NEVER logged. The client only ever sees the normalized events emitted via the
* callbacks below — never the raw OpenAI GA schema.
*
* The upstream contract is the GA (2026) OpenAI Realtime transcription shape:
* wss://<host>/v1/realtime?intent=transcription
* header: Authorization: Bearer <sttApiKey> (NO OpenAI-Beta header in GA)
* one session.update after open, then input_audio_buffer.append frames.
*/
/** Normalized result of parsing a single raw upstream (OpenAI GA) event. */
export interface ParsedUpstreamEvent {
type: 'ready' | 'interim' | 'final' | 'error' | 'ignore';
itemId?: string;
text?: string;
message?: string;
}
/** Callbacks the gateway supplies to bridge upstream events to the client. */
export interface OpenSessionOptions {
/** Optional transcription language hint (e.g. 'en'); omitted from session.update when absent. */
language?: string;
/** Upstream session is live → client may start sending audio. */
onReady: () => void;
/** Latest accumulated partial text for a not-yet-final segment. */
onInterim: (itemId: string, text: string) => void;
/** A completed segment's final (trimmed) transcript. */
onFinal: (itemId: string, text: string) => void;
/** Concrete error reason for the client. */
onError: (message: string) => void;
/** Session ended (graceful stop or upstream close). */
onClosed: () => void;
}
/** Handle returned by openSession; the gateway drives audio/stop/close through it. */
export interface RealtimeSessionHandle {
/** Base64-encode a PCM16 chunk and forward as input_audio_buffer.append (if upstream OPEN). */
appendAudio: (chunk: Buffer | Uint8Array) => void;
/** Graceful stop: commit any buffered audio, drain an in-flight segment (bounded), then close the upstream. */
stop: () => void;
/** Force-close the upstream and clear timers (idempotent). */
close: () => void;
}
/** No audio appended for this long → close the session with a clear reason. */
const IDLE_TIMEOUT_MS = 15_000;
/** Hard cap on a single realtime session's lifetime (mirrors the client's 120s). */
const MAX_SESSION_DURATION_MS = 120_000;
/**
* On graceful stop, how long to wait for the final in-flight segment's
* `...transcription.completed` (the tail after the last VAD pause) before
* closing anyway. Without this drain the last dictated phrase is silently lost.
*/
const STOP_DRAIN_TIMEOUT_MS = 2_500;
/** How long testConnection waits for the upstream to become ready before failing. */
const TEST_CONNECTION_TIMEOUT_MS = 8_000;
/**
* Parse ONE raw upstream (OpenAI GA) event JSON and normalize it, updating the
* per-item delta accumulator `acc` in place. PURE (aside from the supplied `acc`
* mutation) so it can be unit-tested without any network. Unknown or unparseable
* frames normalize to { type: 'ignore' } so the proxy silently skips them.
*
* - session.created / session.updated → { type: 'ready' }
* - conversation.item.input_audio_transcription.delta → append delta to
* acc[`${item_id}:${content_index}`]; return { type: 'interim', itemId,
* text: <accumulated> }
* - conversation.item.input_audio_transcription.completed → final transcript
* (trimmed), delete acc entry; return { type: 'final', itemId, text }
* - error → { type: 'error', message } (provider message, else describeProviderError)
* - anything else / unparseable → { type: 'ignore' }
*
* The accumulator is keyed by `${item_id}:${content_index ?? 0}` so distinct
* content parts of the same item (GA may stream multiple `content_index`
* segments under one item_id) are never concatenated together.
*/
export function parseUpstreamEvent(
raw: string,
acc: Map<string, string>,
): ParsedUpstreamEvent {
let evt: {
type?: string;
item_id?: string;
content_index?: number;
delta?: string;
transcript?: string;
error?: { message?: string; code?: string; type?: string };
};
try {
evt = JSON.parse(raw);
} catch {
// Non-JSON frame: ignore rather than crash the proxy.
return { type: 'ignore' };
}
if (typeof evt !== 'object' || evt === null || typeof evt.type !== 'string') {
return { type: 'ignore' };
}
// Compose the accumulator key from item_id + content_index so two distinct
// content parts under the same item_id don't get merged.
const accKey = (itemId: string): string =>
`${itemId}:${evt.content_index ?? 0}`;
switch (evt.type) {
case 'session.created':
case 'session.updated':
return { type: 'ready' };
case 'conversation.item.input_audio_transcription.delta': {
const itemId = evt.item_id;
if (!itemId) return { type: 'ignore' };
const key = accKey(itemId);
const prev = acc.get(key) ?? '';
const next = prev + (evt.delta ?? '');
acc.set(key, next);
return { type: 'interim', itemId, text: next };
}
case 'conversation.item.input_audio_transcription.completed': {
const itemId = evt.item_id;
if (!itemId) return { type: 'ignore' };
const key = accKey(itemId);
// Prefer the authoritative `transcript`; fall back to whatever we
// accumulated from deltas if the completed frame omits it.
const text = (evt.transcript ?? acc.get(key) ?? '').trim();
acc.delete(key);
return { type: 'final', itemId, text };
}
case 'error': {
// Surface the provider's concrete cause; never a generic message.
const message =
evt.error?.message?.trim() ||
describeProviderError(evt.error, 'Realtime transcription error');
return { type: 'error', message };
}
default:
return { type: 'ignore' };
}
}
@Injectable()
export class AiRealtimeService {
private readonly logger = new Logger(AiRealtimeService.name);
/**
* WebSocket factory seam (R-SVC1). Defaults to the real `ws` constructor;
* tests can override it with `setWsFactory` to inject a fake socket. Nest
* always constructs the service with the single `aiSettings` dependency, so
* the factory is settable rather than a constructor param.
*/
private wsFactory: WsFactory = defaultWsFactory;
constructor(private readonly aiSettings: AiSettingsService) {}
/** Test seam: override the WebSocket factory (R-SVC1). */
setWsFactory(factory: WsFactory): void {
this.wsFactory = factory;
}
/**
* Resolve the workspace STT config, SSRF-check the upstream, open the upstream
* realtime WS and wire its events to the supplied callbacks. Returns a handle
* the caller uses to push audio / stop / close. Throws
* AiSttNotConfiguredException when no driver/STT model is configured, or a
* plain Error (with a concrete reason) when the SSRF check fails.
*/
async openSession(
workspaceId: string,
opts: OpenSessionOptions,
): Promise<RealtimeSessionHandle> {
const cfg = await this.aiSettings.resolve(workspaceId);
const model = cfg?.sttRealtimeModel || cfg?.sttModel;
if (!cfg?.driver || !model) {
throw new AiSttNotConfiguredException();
}
const baseUrl = cfg.sttRealtimeBaseUrl || cfg.sttBaseUrl || cfg.baseUrl;
const wssUrl = AiRealtimeService.deriveRealtimeUrl(baseUrl);
// Fast pre-flight SSRF check on the http(s) equivalent (ssrf-guard only
// allows http/https): wss→https, ws→http. This is only a cheap pre-check —
// the `ws` lib re-resolves DNS independently when it connects, so a pass here
// does NOT guarantee the socket connects to a public address (TOCTOU /
// DNS-rebinding). The authoritative defense is the pinned `lookup` below,
// which validates EVERY resolved address right before connect (full parity
// with the external-MCP buildPinnedDispatcher — no second unchecked DNS
// resolution).
const httpEquivalent = wssUrl.replace(/^wss:/i, 'https:').replace(/^ws:/i, 'http:');
const check = await isUrlAllowed(httpEquivalent);
if (!check.ok) {
throw new Error(
`Realtime endpoint blocked by SSRF guard: ${check.reason ?? 'not allowed'}`,
);
}
const key = cfg.sttApiKey;
// Never log the key; only non-secret context (here the workspace id) is logged.
this.logger.log(`Opening realtime STT session for workspace ${workspaceId}`);
const ws = this.wsFactory(wssUrl, {
headers: key ? { Authorization: `Bearer ${key}` } : {},
// DO NOT send OpenAI-Beta: realtime=v1 — removed in GA.
// SSRF IP-pinning: the `ws` lib forwards this `lookup` into the underlying
// net/tls connect, so every resolved address is validated before connect.
lookup: buildPinnedLookup(),
});
let closed = false;
// Graceful-stop drain state: once stop() commits with an in-flight segment,
// `draining` is true and we hold the socket open until that segment's
// `completed` arrives (delivered via onFinal) or the drain timer fires.
let draining = false;
let idleTimer: NodeJS.Timeout | undefined;
let maxTimer: NodeJS.Timeout | undefined;
let drainTimer: NodeJS.Timeout | undefined;
const clearTimers = (): void => {
if (idleTimer) {
clearTimeout(idleTimer);
idleTimer = undefined;
}
if (maxTimer) {
clearTimeout(maxTimer);
maxTimer = undefined;
}
if (drainTimer) {
clearTimeout(drainTimer);
drainTimer = undefined;
}
};
// Idempotent teardown: clears timers, force-closes the upstream, fires
// onClosed exactly once.
const teardown = (): void => {
if (closed) return;
closed = true;
clearTimers();
try {
if (
ws.readyState === WebSocket.OPEN ||
ws.readyState === WebSocket.CONNECTING
) {
ws.close();
}
} catch {
// Ignore close races; the socket is being discarded anyway.
}
opts.onClosed();
};
const failWith = (message: string): void => {
if (closed) return;
opts.onError(message);
teardown();
};
const resetIdleTimer = (): void => {
if (closed) return;
if (idleTimer) clearTimeout(idleTimer);
idleTimer = setTimeout(() => {
failWith(
`Realtime session idle for ${IDLE_TIMEOUT_MS}ms (no audio received); closing.`,
);
}, IDLE_TIMEOUT_MS);
idleTimer.unref?.();
};
// Hard lifetime cap, armed immediately so a never-opening or runaway session
// is always reclaimed.
maxTimer = setTimeout(() => {
failWith(
`Realtime session exceeded the maximum duration of ${MAX_SESSION_DURATION_MS}ms; closing.`,
);
}, MAX_SESSION_DURATION_MS);
maxTimer.unref?.();
// Also guard the handshake itself: if the upstream never opens / never sends,
// the idle timer (15s) reclaims it well before the 120s max-duration cap.
resetIdleTimer();
const acc = new Map<string, string>();
ws.on('open', () => {
if (closed) return;
// GA session.update: declare the transcription session, PCM16/24kHz mono
// input, server VAD auto-segmentation, the effective model and (optional)
// language. `language` is included only when the client supplied one.
const transcription: { model: string; language?: string } = { model };
if (opts.language) transcription.language = opts.language;
const sessionUpdate = {
type: 'session.update',
session: {
type: 'transcription',
audio: {
input: {
format: { type: 'audio/pcm', rate: 24000 },
turn_detection: { type: 'server_vad' },
transcription,
},
},
},
};
try {
ws.send(JSON.stringify(sessionUpdate));
} catch (err) {
this.logger.error('Failed to send realtime session.update', err as Error);
failWith(describeProviderError(err, 'Failed to start realtime session'));
return;
}
// Upstream is live: restart the idle clock (it was first armed before the handshake).
resetIdleTimer();
});
ws.on('message', (data: WebSocket.RawData) => {
if (closed) return;
const raw = AiRealtimeService.rawDataToString(data);
const parsed = parseUpstreamEvent(raw, acc);
switch (parsed.type) {
case 'ready':
opts.onReady();
break;
case 'interim':
opts.onInterim(parsed.itemId!, parsed.text ?? '');
break;
case 'final':
opts.onFinal(parsed.itemId!, parsed.text ?? '');
// If we're draining on a graceful stop and the accumulator is now
// empty (no more in-flight segments), the tail we were waiting for has
// arrived and been delivered → close now.
if (draining && acc.size === 0) {
teardown();
}
break;
case 'error':
// Log the upstream error message, then surface the concrete reason.
this.logger.error(`Realtime upstream error: ${parsed.message}`);
failWith(parsed.message ?? 'Realtime transcription error');
break;
case 'ignore':
default:
break;
}
});
ws.on('error', (err: Error) => {
// Log the full error (name/message/stack); never the key/audio.
this.logger.error('Realtime upstream socket error', err);
failWith(describeProviderError(err, 'Realtime upstream connection error'));
});
ws.on('close', (code: number, reason: Buffer) => {
if (closed) return;
const why = reason?.toString?.() || '';
// An unexpected close (not via stop()/teardown) is reported as a concrete
// reason; onClosed always fires via teardown.
this.logger.log(
`Realtime upstream closed (code ${code}${why ? `: ${why}` : ''})`,
);
if (code !== 1000) {
failWith(
`Realtime upstream closed (code ${code}${why ? `: ${why}` : ''}).`,
);
return;
}
teardown();
});
return {
appendAudio: (chunk: Buffer | Uint8Array): void => {
if (closed || ws.readyState !== WebSocket.OPEN) return;
const audio = Buffer.from(chunk).toString('base64');
try {
ws.send(JSON.stringify({ type: 'input_audio_buffer.append', audio }));
} catch (err) {
this.logger.error('Failed to forward realtime audio chunk', err as Error);
failWith(describeProviderError(err, 'Failed to forward audio'));
return;
}
// Audio flowing again → push the idle deadline out.
resetIdleTimer();
},
stop: (): void => {
if (closed) return;
// Graceful stop: with server_vad no manual commit is required, but an
// explicit commit flushes any buffered tail before we close.
if (ws.readyState === WebSocket.OPEN) {
try {
ws.send(JSON.stringify({ type: 'input_audio_buffer.commit' }));
} catch (err) {
// A failed commit is non-fatal; we still close gracefully below.
this.logger.error('Failed to commit realtime audio buffer', err as Error);
}
}
// If there is an in-flight segment (a non-empty accumulation), do NOT
// close immediately: the committed tail's `...transcription.completed`
// arrives a beat later and would be dropped by the `closed` guard,
// silently losing the last dictated phrase. Instead drain: hold the
// socket open until that `completed` lands (delivered via onFinal, which
// then tears down) or a short timeout fires. With nothing in flight we
// can close right away.
if (ws.readyState === WebSocket.OPEN && acc.size > 0) {
draining = true;
// The idle timer would otherwise fire mid-drain; the drain timer is
// the bounded fallback that closes if the tail never arrives.
if (idleTimer) {
clearTimeout(idleTimer);
idleTimer = undefined;
}
drainTimer = setTimeout(() => {
// Tail never arrived within the drain window: close anyway. The
// partial (if any) was already delivered as the latest interim.
teardown();
}, STOP_DRAIN_TIMEOUT_MS);
// Don't let the drain timer keep the event loop alive (mirror idle/max).
drainTimer.unref?.();
return;
}
teardown();
},
close: (): void => {
teardown();
},
};
}
/**
* Admin "test connection" probe for the realtime STT upstream. Reuses
* openSession so the real config-resolution, SSRF check and handshake path are
* exercised, then tears the upstream socket down immediately — no audio is ever
* sent. Resolves to the FROZEN contract { ok: true } | { ok: false, error }.
*
* Resolution rules (settle exactly once, guarded by `settled`):
* - first onReady → { ok: true }
* - first onError(message) → { ok: false, error: message }
* - ~8s timeout → { ok: false, error: 'Realtime connection timed out' }
* - openSession(...) throws → { ok: false, error } (AiSttNotConfigured message,
* else describeProviderError)
*
* On any outcome the upstream handle is closed and the timer cleared exactly
* once, so this never leaves a socket open. The API key is never logged.
*/
async testConnection(
workspaceId: string,
): Promise<{ ok: true } | { ok: false; error: string }> {
return new Promise<{ ok: true } | { ok: false; error: string }>(
(resolve) => {
let settled = false;
let handle: RealtimeSessionHandle | undefined;
let timer: NodeJS.Timeout | undefined;
// Settle once: clear the timer, close the upstream handle, resolve.
const finish = (
result: { ok: true } | { ok: false; error: string },
): void => {
if (settled) return;
settled = true;
if (timer) {
clearTimeout(timer);
timer = undefined;
}
try {
handle?.close();
} catch {
// Ignore close races; the socket is being discarded anyway.
}
resolve(result);
};
// Arm the timeout before opening so a never-readying upstream is reclaimed.
timer = setTimeout(() => {
finish({ ok: false, error: 'Realtime connection timed out' });
}, TEST_CONNECTION_TIMEOUT_MS);
this.openSession(workspaceId, {
onReady: () => finish({ ok: true }),
onError: (message) => finish({ ok: false, error: message }),
// No audio is ever sent; these are no-ops for the probe.
onInterim: () => {},
onFinal: () => {},
onClosed: () => {},
})
.then((opened) => {
handle = opened;
// openSession may have already errored/closed synchronously before
// we stored the handle; if we've settled, close it now.
if (settled) {
try {
handle.close();
} catch {
// Ignore close races.
}
}
})
.catch((err: unknown) => {
// openSession threw (AiSttNotConfiguredException or SSRF/Error)
// before any socket was returned: surface a concrete reason.
const error =
err instanceof AiSttNotConfiguredException
? err.message
: describeProviderError(err, 'Realtime connection failed');
finish({ ok: false, error });
});
},
);
}
/**
* Derive the upstream realtime WSS URL from the (optional) effective base URL.
*
* - Empty / whitespace base → OpenAI default
* `wss://api.openai.com/v1/realtime?intent=transcription`.
* - A non-empty base that fails to parse → THROW. We must NOT silently fall
* back to OpenAI: a self-hosted STT key + live audio would then leak to
* OpenAI (fail-closed, not fail-open).
* - A non-secure scheme (`http://` / `ws://`) → THROW. The Authorization
* Bearer key would otherwise travel in plaintext to a self-hosted endpoint.
* - Otherwise: take the base origin, ensure exactly one
* `/v1/realtime?intent=transcription` path, and produce a `wss://` URL. A
* base that already ends in `/v1` (or `/v1/realtime`) does not get a
* duplicated `/v1`.
*/
static deriveRealtimeUrl(baseUrl?: string): string {
if (!baseUrl || !baseUrl.trim()) {
return 'wss://api.openai.com/v1/realtime?intent=transcription';
}
const trimmed = baseUrl.trim();
let parsed: URL;
try {
parsed = new URL(trimmed);
} catch {
// Fail-closed: a configured-but-unparseable base must NOT silently fall
// back to OpenAI (would leak a self-hosted key + audio to OpenAI).
throw new Error(
`Invalid realtime STT base URL: "${trimmed}" could not be parsed.`,
);
}
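// Require a secure scheme (allowlist). Anything other than https:/wss: is
// rejected: http/ws would send the Bearer key in plaintext, and any other
// scheme (e.g. ftp:) must not be silently rewritten to wss://. (Default is
// reject; loopback-only relaxations, if ever needed, would be added
// explicitly here.)
if (parsed.protocol !== 'https:' && parsed.protocol !== 'wss:') {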
throw new Error(
`Insecure realtime STT base URL: "${trimmed}" uses ${parsed.protocol}//; ` +
'a secure https:// / wss:// scheme is required so the API key is not sent in plaintext.',
);
}
// Normalize the path: strip a trailing slash, drop an existing
// `/realtime` suffix, ensure a single `/v1`, then append the realtime path.
let path = parsed.pathname.replace(/\/+$/, '');
path = path.replace(/\/realtime$/i, '');
if (!/\/v1$/i.test(path)) {
path = `${path}/v1`;
}
path = `${path}/realtime`;
// The scheme is already secure (https: / wss:); the resulting realtime URL
// is always wss://. The SSRF guard runs on the https equivalent before
// connecting, and the pinned lookup validates each resolved address.
return `wss://${parsed.host}${path}?intent=transcription`;
}
/** Normalize a ws RawData payload (Buffer | ArrayBuffer | Buffer[]) to a string. */
private static rawDataToString(data: WebSocket.RawData): string {
if (typeof data === 'string') return data;
if (Buffer.isBuffer(data)) return data.toString('utf8');
if (Array.isArray(data)) return Buffer.concat(data).toString('utf8');
// ArrayBuffer
return Buffer.from(data as ArrayBuffer).toString('utf8');
}
}
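The path normalization in `deriveRealtimeUrl` is easiest to follow with concrete inputs. The sketch below is a standalone copy of the three path steps only (trailing-slash strip, `/realtime` suffix drop, single `/v1`). The helper name `normalizeRealtimePath` is hypothetical, and the scheme checks and host handling are deliberately left out.

```typescript
// Standalone copy of the path steps only; not the service's actual method.
function normalizeRealtimePath(pathname: string): string {
  let path = pathname.replace(/\/+$/, ''); // strip trailing slashes
  path = path.replace(/\/realtime$/i, ''); // drop an existing /realtime suffix
  if (!/\/v1$/i.test(path)) {
    path = `${path}/v1`; // ensure exactly one /v1
  }
  return `${path}/realtime`;
}

const samplePaths = ['/', '/v1', '/v1/', '/v1/realtime', '/proxy', '/proxy/v1/realtime/'];
for (const p of samplePaths) {
  console.log(`${p} -> ${normalizeRealtimePath(p)}`);
}
// /                    -> /v1/realtime
// /v1                  -> /v1/realtime
// /v1/                 -> /v1/realtime
// /v1/realtime         -> /v1/realtime
// /proxy               -> /proxy/v1/realtime
// /proxy/v1/realtime/  -> /proxy/v1/realtime
```

Every variant of the same base collapses to one canonical path, so an admin can paste the base with or without `/v1` or `/v1/realtime` and still get a single `/v1/realtime` segment.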

@@ -0,0 +1,145 @@
import {
SessionCounters,
canConnect,
MAX_SESSIONS_PER_USER,
MAX_SESSIONS_PER_WORKSPACE,
} from './session-limits';
describe('SessionCounters', () => {
let counters: SessionCounters;
beforeEach(() => {
counters = new SessionCounters();
});
it('starts at 0 for an unknown key', () => {
expect(counters.count('a')).toBe(0);
});
it('increments and reports the running count', () => {
expect(counters.increment('a')).toBe(1);
expect(counters.increment('a')).toBe(2);
expect(counters.count('a')).toBe(2);
// Distinct keys are independent.
expect(counters.count('b')).toBe(0);
});
it('decrements and deletes the key at zero (no phantom zero-entry)', () => {
counters.increment('a');
counters.increment('a');
expect(counters.decrement('a')).toBe(1);
expect(counters.decrement('a')).toBe(0);
expect(counters.count('a')).toBe(0);
// Re-incrementing a deleted key starts fresh at 1.
expect(counters.increment('a')).toBe(1);
});
it('decrementing an absent key is a no-op: no negative, no phantom slot', () => {
expect(counters.decrement('missing')).toBe(0);
expect(counters.count('missing')).toBe(0);
// A subsequent increment must start at 1, proving no -1 was stored.
expect(counters.increment('missing')).toBe(1);
});
it('double-decrement past zero never goes negative', () => {
counters.increment('a');
expect(counters.decrement('a')).toBe(0);
expect(counters.decrement('a')).toBe(0);
expect(counters.count('a')).toBe(0);
});
it('reset() clears all counts', () => {
counters.increment('a');
counters.increment('b');
counters.reset();
expect(counters.count('a')).toBe(0);
expect(counters.count('b')).toBe(0);
});
});
describe('canConnect', () => {
const enabled = { ai: { dictation: true, dictationRealtime: true } };
const zero = { userCount: 0, workspaceCount: 0 };
it('allows when both flags are true and caps are not hit', () => {
expect(canConnect('u', 'w', enabled, zero)).toEqual({ allowed: true });
});
it('denies when only dictation is enabled (XOR)', () => {
const res = canConnect('u', 'w', { ai: { dictation: true } }, zero);
expect(res.allowed).toBe(false);
if (res.allowed === false) expect(res.reason).toMatch(/not enabled/i);
});
it('denies when only dictationRealtime is enabled (XOR)', () => {
const res = canConnect(
'u',
'w',
{ ai: { dictationRealtime: true } },
zero,
);
expect(res.allowed).toBe(false);
});
it('denies when settings.ai is absent', () => {
const res = canConnect('u', 'w', {}, zero);
expect(res.allowed).toBe(false);
if (res.allowed === false) expect(res.reason).toMatch(/not enabled/i);
});
it('denies when settings is undefined', () => {
const res = canConnect('u', 'w', undefined, zero);
expect(res.allowed).toBe(false);
});
it('enforces the user cap with >= (no off-by-one)', () => {
// At exactly the cap → deny (>= boundary).
const atCap = canConnect('u', 'w', enabled, {
userCount: MAX_SESSIONS_PER_USER,
workspaceCount: 0,
});
expect(atCap.allowed).toBe(false);
if (atCap.allowed === false) expect(atCap.reason).toMatch(/already active/i);
// One below the cap → allow.
const below = canConnect('u', 'w', enabled, {
userCount: MAX_SESSIONS_PER_USER - 1,
workspaceCount: 0,
});
expect(below.allowed).toBe(true);
});
it('enforces the workspace cap with >= (no off-by-one)', () => {
const atCap = canConnect('u', 'w', enabled, {
userCount: 0,
workspaceCount: MAX_SESSIONS_PER_WORKSPACE,
});
expect(atCap.allowed).toBe(false);
if (atCap.allowed === false) expect(atCap.reason).toMatch(/maximum number/i);
const below = canConnect('u', 'w', enabled, {
userCount: 0,
workspaceCount: MAX_SESSIONS_PER_WORKSPACE - 1,
});
expect(below.allowed).toBe(true);
});
it('reports the gate reason before any cap reason (gate has priority)', () => {
// Feature disabled AND caps exceeded → the gate reason wins.
const res = canConnect('u', 'w', {}, {
userCount: MAX_SESSIONS_PER_USER,
workspaceCount: MAX_SESSIONS_PER_WORKSPACE,
});
expect(res.allowed).toBe(false);
if (res.allowed === false) expect(res.reason).toMatch(/not enabled/i);
});
it('reports the user-cap reason before the workspace-cap reason', () => {
const res = canConnect('u', 'w', enabled, {
userCount: MAX_SESSIONS_PER_USER,
workspaceCount: MAX_SESSIONS_PER_WORKSPACE,
});
expect(res.allowed).toBe(false);
if (res.allowed === false) expect(res.reason).toMatch(/already active/i);
});
});
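The specs above pin down the `>=` boundary and the user-before-workspace ordering. Combined with the rule that both caps are checked before any increment, the acquire/release flow can be sketched as below. This is a minimal sketch, assuming plain Map-backed counters; `tryAcquire`, `release`, `bump`, `drop` and the `CAP_*` constants are hypothetical names for illustration, not the gateway's actual API, and the feature-flag gate is omitted.

```typescript
const CAP_PER_USER = 1; // assumed to match MAX_SESSIONS_PER_USER
const CAP_PER_WORKSPACE = 5; // assumed to match MAX_SESSIONS_PER_WORKSPACE

const users = new Map<string, number>();
const workspaces = new Map<string, number>();

const bump = (m: Map<string, number>, k: string): void => {
  m.set(k, (m.get(k) ?? 0) + 1);
};

// Delete at zero so no phantom entries linger and counts never go negative.
const drop = (m: Map<string, number>, k: string): void => {
  const next = (m.get(k) ?? 0) - 1;
  if (next <= 0) m.delete(k);
  else m.set(k, next);
};

// Check BOTH caps first and increment only when both pass, so a denied
// connect never leaves a half-acquired slot behind.
function tryAcquire(userId: string, workspaceId: string): boolean {
  if ((users.get(userId) ?? 0) >= CAP_PER_USER) return false;
  if ((workspaces.get(workspaceId) ?? 0) >= CAP_PER_WORKSPACE) return false;
  bump(users, userId);
  bump(workspaces, workspaceId);
  return true;
}

// Release both slots on disconnect; safe to call more than once.
function release(userId: string, workspaceId: string): void {
  drop(users, userId);
  drop(workspaces, workspaceId);
}
```

Because the counters live in process memory, this pattern gives a correct cap only on a single replica, which is the same caveat the session-limits module documents.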

@@ -0,0 +1,132 @@
/**
* Extracted, dependency-free concurrency-cap primitives for the realtime
* dictation gateway. Pulled out of the gateway so the cap arithmetic and the
* connect decision can be unit-tested directly, without standing up Socket.IO,
* the token service, or the database.
*
* ────────────────────────────────────────────────────────────────────────────
* OPS WARNING — PER-PROCESS / IN-MEMORY ONLY.
* SessionCounters is a plain in-heap Map. The caps it backs are correct ONLY on
* a SINGLE API replica. With N horizontally scaled replicas the effective limit
* is N × the cap (each replica counts only the sockets it terminates). A true
* global cap across replicas needs a shared store (e.g. Redis). This is by
* design for the single-process default, and it is documented loudly so it is
* never mistaken for a global guarantee.
* ────────────────────────────────────────────────────────────────────────────
*/
/** Realtime is expensive: one live session per user, a handful per workspace. */
export const MAX_SESSIONS_PER_USER = 1;
export const MAX_SESSIONS_PER_WORKSPACE = 5;
/**
* A small Map-backed counter keyed by an arbitrary string (user id / workspace
* id). `increment` adds a slot, `decrement` removes one and deletes the key at
* zero so the map never accumulates phantom zero-entries and never goes
* negative. `count` reads the current value, `reset` clears everything (tests).
*/
export class SessionCounters {
private readonly counts = new Map<string, number>();
/** Returns the current count for a key (0 when absent). */
count(key: string): number {
return this.counts.get(key) ?? 0;
}
/** Add one slot for `key`; returns the new count. */
increment(key: string): number {
const next = (this.counts.get(key) ?? 0) + 1;
this.counts.set(key, next);
return next;
}
/**
* Remove one slot for `key`. Deletes the key once it reaches zero so no
* phantom entry lingers, and never records a negative count (decrementing an
* absent or zero key is a no-op). Returns the resulting count.
*/
decrement(key: string): number {
const current = this.counts.get(key);
// Absent / zero / negative → nothing to release; never create a phantom slot.
if (!current || current <= 0) {
this.counts.delete(key);
return 0;
}
const next = current - 1;
if (next <= 0) {
this.counts.delete(key);
return 0;
}
this.counts.set(key, next);
return next;
}
/** Drop all counts. Test helper. */
reset(): void {
this.counts.clear();
}
}
/** Current concurrency snapshot passed into `canConnect`. */
export interface SessionCounts {
userCount: number;
workspaceCount: number;
}
/** Minimal view of the workspace settings `canConnect` needs. */
export interface RealtimeGateSettings {
ai?: { dictation?: boolean; dictationRealtime?: boolean };
}
export type CanConnectResult =
| { allowed: true }
| { allowed: false; reason: string };
/**
* Pure decision: may a new realtime dictation socket open right now?
*
* Gate (both required): `settings.ai.dictation === true` AND
* `settings.ai.dictationRealtime === true`. Missing `settings.ai` → deny.
*
* Caps (checked with `>=`, no off-by-one): deny when the user already holds
* `MAX_SESSIONS_PER_USER` sessions, or the workspace already holds
* `MAX_SESSIONS_PER_WORKSPACE`. The gate is evaluated before the caps so a
* disabled feature reports the gate reason, not a cap reason.
*
* The caller MUST check this BEFORE incrementing any counter so a denied
* connection leaves the counters untouched.
*/
export function canConnect(
_userId: string,
_workspaceId: string,
settings: RealtimeGateSettings | undefined,
counts: SessionCounts,
): CanConnectResult {
// Feature gate first.
if (
settings?.ai?.dictation !== true ||
settings?.ai?.dictationRealtime !== true
) {
return { allowed: false, reason: 'Realtime dictation is not enabled' };
}
// Per-user cap.
if (counts.userCount >= MAX_SESSIONS_PER_USER) {
return {
allowed: false,
reason:
'A realtime dictation session is already active for your account',
};
}
// Per-workspace cap.
if (counts.workspaceCount >= MAX_SESSIONS_PER_WORKSPACE) {
return {
allowed: false,
reason:
'The maximum number of concurrent realtime dictation sessions for this workspace has been reached',
};
}
return { allowed: true };
}
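
The doc comment on `canConnect` requires the caller to decide before touching any counter. The gateway wiring is not part of this file, so the following is only a minimal sketch of that ordering, assuming compact stand-ins for the primitives above; `Counters`, `tryOpenSession`, and `releaseSession` are hypothetical names, and the feature gate is left out for brevity.

```typescript
// Hypothetical wiring sketch; the real gateway code is not shown in this diff.
// Compact stand-ins for the primitives above so the snippet runs on its own.
const MAX_PER_USER = 1;
const MAX_PER_WORKSPACE = 5;

class Counters {
  private readonly counts = new Map<string, number>();

  count(key: string): number {
    return this.counts.get(key) ?? 0;
  }

  increment(key: string): void {
    this.counts.set(key, this.count(key) + 1);
  }

  // Deletes the key at zero so the map never holds zero or negative entries.
  decrement(key: string): void {
    const next = this.count(key) - 1;
    if (next <= 0) this.counts.delete(key);
    else this.counts.set(key, next);
  }
}

const users = new Counters();
const workspaces = new Counters();

// Decide first, increment only on success, so a denial leaves counters untouched.
function tryOpenSession(userId: string, workspaceId: string): boolean {
  if (users.count(userId) >= MAX_PER_USER) return false;
  if (workspaces.count(workspaceId) >= MAX_PER_WORKSPACE) return false;
  users.increment(userId);
  workspaces.increment(workspaceId);
  return true;
}

// Release exactly the slots taken by a successful tryOpenSession.
function releaseSession(userId: string, workspaceId: string): void {
  users.decrement(userId);
  workspaces.decrement(workspaceId);
}
```

Because both caps are checked before either increment, a connection denied by the workspace cap never leaves a stray user slot behind. A caller should invoke `releaseSession` only for sessions that `tryOpenSession` accepted.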

@@ -48,6 +48,28 @@ describe('UpdateWorkspaceDto.trackerHead validation', () => {
});
});
describe('UpdateWorkspaceDto.aiDictationRealtime validation', () => {
it('accepts aiDictationRealtime: true', async () => {
const errors = await validateDto({ aiDictationRealtime: true });
expect(hasError(errors, 'aiDictationRealtime')).toBe(false);
});
it('accepts aiDictationRealtime: false', async () => {
const errors = await validateDto({ aiDictationRealtime: false });
expect(hasError(errors, 'aiDictationRealtime')).toBe(false);
});
it('rejects a non-boolean aiDictationRealtime with an isBoolean error', async () => {
const errors = await validateDto({ aiDictationRealtime: 'yes' });
expect(hasError(errors, 'aiDictationRealtime', 'isBoolean')).toBe(true);
});
it('accepts an omitted aiDictationRealtime (optional)', async () => {
const errors = await validateDto({});
expect(hasError(errors, 'aiDictationRealtime')).toBe(false);
});
});
describe('UpdateWorkspaceDto.htmlEmbed validation', () => {
it('accepts htmlEmbed: true', async () => {
const errors = await validateDto({ htmlEmbed: true });

@@ -55,6 +55,10 @@ export class UpdateWorkspaceDto extends PartialType(CreateWorkspaceDto) {
@IsBoolean()
aiDictation: boolean;
@IsOptional()
@IsBoolean()
aiDictationRealtime: boolean;
// Workspace master toggle that enables/disables the HTML embed block type.
// Persisted at settings.htmlEmbed. ABSENT/false => OFF (default). The block
// itself renders in a sandboxed iframe, so this is a feature switch, not a

@@ -0,0 +1,131 @@
import { WorkspaceService } from './workspace.service';
/**
* Exercises the REAL WorkspaceService.update aiDictationRealtime branch at the
* service seam:
* - an update carrying `aiDictationRealtime` calls
* `workspaceRepo.updateAiSettings(workspaceId, 'dictationRealtime', value, trx)`
* (note the SETTING KEY is 'dictationRealtime', not the DTO field name);
* - the change is audited (before/after) when the value actually changes;
* - the `aiDictationRealtime` field is removed from the DTO BEFORE the generic
* `updateWorkspace(dto, ...)` runs, so it can never be written as a (non-
* existent) workspaces column.
*
* The repo, db transaction, and audit service are mocked; `executeTx` runs the
* callback against a fake trx.
*/
describe('WorkspaceService.update — aiDictationRealtime branch (real code)', () => {
function buildService(opts: { settingsBefore?: Record<string, any> }) {
const updateAiSettings = jest.fn().mockResolvedValue(undefined);
const updateWorkspace = jest.fn().mockResolvedValue(undefined);
const workspaceRepo = {
// First call: read settingsBefore. Second call (with options): the updated
// workspace (must include a licenseKey because update() destructures it).
findById: jest
.fn()
.mockResolvedValueOnce({ id: 'w1', settings: opts.settingsBefore ?? {} })
.mockResolvedValueOnce({ id: 'w1', name: 'WS', licenseKey: null }),
updateAiSettings,
updateWorkspace,
};
// Fake kysely db: only .transaction().execute(cb) is used on this path.
const db = {
transaction: jest.fn(() => ({
execute: jest.fn(async (cb: any) => cb({ __trx: true })),
})),
};
const auditService = { log: jest.fn() };
const service = new WorkspaceService(
workspaceRepo as any, // workspaceRepo
{} as any, // spaceService
{} as any, // spaceMemberService
{} as any, // groupRepo
{} as any, // groupUserRepo
{} as any, // userRepo
{} as any, // environmentService
{} as any, // domainService
{} as any, // licenseCheckService
{} as any, // shareRepo
{} as any, // watcherRepo
{} as any, // favoriteRepo
db as any, // db (InjectKysely)
{} as any, // attachmentQueue
{} as any, // billingQueue
{} as any, // aiQueue
auditService as any, // auditService
{} as any, // userSessionRepo
);
return { service, workspaceRepo, updateAiSettings, updateWorkspace, auditService };
}
it("persists true via updateAiSettings with the 'dictationRealtime' key", async () => {
const { service, updateAiSettings } = buildService({});
await service.update('w1', { aiDictationRealtime: true } as any);
expect(updateAiSettings).toHaveBeenCalledTimes(1);
expect(updateAiSettings).toHaveBeenCalledWith(
'w1',
'dictationRealtime',
true,
expect.anything(), // the transaction handle
);
});
it('persists false (explicit disable is not dropped)', async () => {
const { service, updateAiSettings } = buildService({
settingsBefore: { ai: { dictationRealtime: true } },
});
await service.update('w1', { aiDictationRealtime: false } as any);
expect(updateAiSettings).toHaveBeenCalledWith(
'w1',
'dictationRealtime',
false,
expect.anything(),
);
});
it('does NOT call updateAiSettings when aiDictationRealtime is undefined', async () => {
const { service, updateAiSettings } = buildService({});
await service.update('w1', { name: 'New name' } as any);
// updateAiSettings is only reached by AI branches; none fire here.
expect(updateAiSettings).not.toHaveBeenCalled();
});
it('audits the change (before/after) when the value actually changes', async () => {
const { service, auditService } = buildService({
settingsBefore: { ai: { dictationRealtime: false } },
});
await service.update('w1', { aiDictationRealtime: true } as any);
expect(auditService.log).toHaveBeenCalledTimes(1);
const logged = auditService.log.mock.calls[0][0];
expect(logged.changes.before.aiDictationRealtime).toBe(false);
expect(logged.changes.after.aiDictationRealtime).toBe(true);
});
it('removes aiDictationRealtime from the DTO before the generic updateWorkspace', async () => {
const { service, updateWorkspace } = buildService({});
await service.update('w1', {
aiDictationRealtime: true,
name: 'New name',
} as any);
expect(updateWorkspace).toHaveBeenCalledTimes(1);
const [dtoPassed] = updateWorkspace.mock.calls[0];
// The AI toggle must NOT reach the generic column writer (no such column).
expect('aiDictationRealtime' in dtoPassed).toBe(false);
// A genuine column (name) still flows through.
expect(dtoPassed.name).toBe('New name');
});
});

@@ -511,6 +511,20 @@ export class WorkspaceService {
);
}
if (typeof updateWorkspaceDto.aiDictationRealtime !== 'undefined') {
const prev = settingsBefore?.ai?.dictationRealtime ?? false;
if (prev !== updateWorkspaceDto.aiDictationRealtime) {
before.aiDictationRealtime = prev;
after.aiDictationRealtime = updateWorkspaceDto.aiDictationRealtime;
}
await this.workspaceRepo.updateAiSettings(
workspaceId,
'dictationRealtime',
updateWorkspaceDto.aiDictationRealtime,
trx,
);
}
if (typeof updateWorkspaceDto.htmlEmbed !== 'undefined') {
const prev = settingsBefore?.htmlEmbed ?? false;
if (prev !== updateWorkspaceDto.htmlEmbed) {
@@ -564,6 +578,7 @@ export class WorkspaceService {
delete updateWorkspaceDto.allowMemberTemplates;
delete updateWorkspaceDto.aiChat;
delete updateWorkspaceDto.aiDictation;
delete updateWorkspaceDto.aiDictationRealtime;
delete updateWorkspaceDto.htmlEmbed;
delete updateWorkspaceDto.trackerHead;
delete updateWorkspaceDto.aiPublicShareAssistant;
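
The two hunks above combine three rules for the toggle: persist whenever the field is present, audit only when the value actually changes, and keep the field away from the generic column writer. A simplified sketch of that decision as a pure function follows. `ToggleDto`, `ToggleResult`, and `planToggleUpdate` are hypothetical names, and it strips the field with rest destructuring where the service uses `delete`.

```typescript
// Hypothetical, simplified shape of the toggle handling; names are illustrative.
interface ToggleDto {
  name?: string;
  aiDictationRealtime?: boolean;
}

interface ToggleResult {
  // Value to persist under settings.ai.dictationRealtime, if the field was sent.
  write?: boolean;
  // Present only when the stored value differs from the requested one.
  audit?: { before: boolean; after: boolean };
  // What is allowed to reach the generic column writer.
  columns: ToggleDto;
}

function planToggleUpdate(
  prevSetting: boolean | undefined,
  dto: ToggleDto,
): ToggleResult {
  // Split the settings-backed field off from genuine columns.
  const { aiDictationRealtime, ...columns } = dto;
  if (typeof aiDictationRealtime === 'undefined') return { columns };

  const prev = prevSetting ?? false; // an absent setting counts as OFF
  const result: ToggleResult = { write: aiDictationRealtime, columns };
  if (prev !== aiDictationRealtime) {
    result.audit = { before: prev, after: aiDictationRealtime };
  }
  return result;
}
```

An explicit `false` still yields a write, which is the behaviour the "explicit disable is not dropped" test pins.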

@@ -239,7 +239,7 @@ export class WorkspaceRepo {
// is a real jsonb object, never a double-encoded string. The CASE self-heals
// workspaces whose settings.ai.provider was previously corrupted into an
// array/string.
const ALLOWED = ['driver', 'chatModel', 'embeddingModel', 'baseUrl', 'embeddingBaseUrl', 'sttModel', 'sttBaseUrl', 'sttApiStyle', 'systemPrompt', 'publicShareChatModel', 'publicShareAssistantRoleId'];
const ALLOWED = ['driver', 'chatModel', 'embeddingModel', 'baseUrl', 'embeddingBaseUrl', 'sttModel', 'sttBaseUrl', 'sttRealtimeModel', 'sttRealtimeBaseUrl', 'sttApiStyle', 'systemPrompt', 'publicShareChatModel', 'publicShareAssistantRoleId'];
const entries = Object.entries(provider).filter(
([k, v]) => v !== undefined && ALLOWED.includes(k),
);
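
The filter above is why the new keys had to be added to `ALLOWED`: any provider key missing from the list is dropped before the jsonb patch is built. A minimal sketch of that behaviour, assuming a trimmed key list and a hypothetical helper name `pickAllowedProviderKeys`:

```typescript
// Trimmed stand-in for the repo's ALLOWED list; only a few keys for illustration.
const ALLOWED_PROVIDER_KEYS: readonly string[] = [
  'chatModel',
  'sttModel',
  'sttRealtimeModel',
  'sttRealtimeBaseUrl',
];

// Keep only allowlisted keys whose value is not undefined.
function pickAllowedProviderKeys(
  provider: Record<string, unknown>,
): Record<string, unknown> {
  const patch: Record<string, unknown> = {};
  for (const key of Object.keys(provider)) {
    const value = provider[key];
    if (value !== undefined && ALLOWED_PROVIDER_KEYS.indexOf(key) !== -1) {
      patch[key] = value;
    }
  }
  return patch;
}
```

Note that an empty string passes the filter, since only `undefined` is excluded.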

@@ -0,0 +1,130 @@
import { AiSettingsService } from './ai-settings.service';
// Unit tests for the partial-merge behaviour of AiSettingsService.update and the
// key-fallback behaviour of resolve. Constructed directly with stub deps (no
// Nest graph): we assert exactly which repo calls fire for a given partial DTO,
// proving the realtime STT fields merge in without clobbering chat fields and
// that an empty patch writes nothing.
interface Deps {
updateAiProviderSettings: jest.Mock;
readProviderResult?: Record<string, unknown>;
getMaskedResult?: unknown;
}
function buildService(deps: Deps) {
const workspaceRepo = {
updateAiProviderSettings: deps.updateAiProviderSettings,
// findById feeds the private readProvider() (target-driver resolution).
findById: jest.fn().mockResolvedValue({
settings: { ai: { provider: deps.readProviderResult ?? {} } },
}),
};
const aiProviderCredentialsRepo = {
find: jest.fn(),
upsert: jest.fn(),
clearKey: jest.fn(),
upsertEmbeddingKey: jest.fn(),
clearEmbeddingKey: jest.fn(),
upsertSttKey: jest.fn(),
clearSttKey: jest.fn(),
};
const secretBox = {
encryptSecret: jest.fn((v: string) => `enc(${v})`),
decryptSecret: jest.fn((v: string) => `dec(${v})`),
};
const pageEmbeddingRepo = { countIndexedPages: jest.fn().mockResolvedValue(0) };
const pageRepo = { countEmbeddablePages: jest.fn().mockResolvedValue(0) };
const service = new AiSettingsService(
workspaceRepo as any,
{} as any, // aiAgentRoleRepo
aiProviderCredentialsRepo as any,
pageEmbeddingRepo as any,
pageRepo as any,
secretBox as any,
{} as any, // aiQueue
);
// getMasked is exercised at the end of update(); stub it so update() resolves
// without a second repo round-trip we don't care about here.
jest
.spyOn(service, 'getMasked')
.mockResolvedValue((deps.getMaskedResult ?? {}) as any);
return { service, workspaceRepo, aiProviderCredentialsRepo, secretBox };
}
describe('AiSettingsService.update partial merge', () => {
it('a DTO with only realtime fields patches exactly those keys', async () => {
const updateAiProviderSettings = jest.fn().mockResolvedValue(undefined);
const { service } = buildService({ updateAiProviderSettings });
await service.update('w1', {
sttRealtimeModel: 'gpt-4o-realtime',
sttRealtimeBaseUrl: 'https://api.example.com/v1',
});
expect(updateAiProviderSettings).toHaveBeenCalledTimes(1);
const [, patch] = updateAiProviderSettings.mock.calls[0];
expect(Object.keys(patch).sort()).toEqual(
['sttRealtimeBaseUrl', 'sttRealtimeModel'].sort(),
);
});
it('a DTO with chatModel does NOT clobber realtime fields (only chatModel patched)', async () => {
const updateAiProviderSettings = jest.fn().mockResolvedValue(undefined);
const { service } = buildService({ updateAiProviderSettings });
await service.update('w1', { chatModel: 'gpt-4o' });
const [, patch] = updateAiProviderSettings.mock.calls[0];
expect(patch).toEqual({ chatModel: 'gpt-4o' });
expect(patch).not.toHaveProperty('sttRealtimeModel');
expect(patch).not.toHaveProperty('sttRealtimeBaseUrl');
});
it('an empty patch never calls updateAiProviderSettings', async () => {
const updateAiProviderSettings = jest.fn().mockResolvedValue(undefined);
const { service } = buildService({ updateAiProviderSettings });
await service.update('w1', {});
expect(updateAiProviderSettings).not.toHaveBeenCalled();
});
});
describe('AiSettingsService.resolve STT key fallback', () => {
it('uses the STT-specific key when sttApiKeyEnc is present (decrypt)', async () => {
const { service, aiProviderCredentialsRepo, secretBox } = buildService({
updateAiProviderSettings: jest.fn(),
readProviderResult: { driver: 'openai', chatModel: 'gpt-4o' },
});
aiProviderCredentialsRepo.find.mockResolvedValue({
apiKeyEnc: 'CHAT',
sttApiKeyEnc: 'STT',
});
const cfg = await service.resolve('w1');
expect(cfg?.sttApiKey).toBe('dec(STT)');
expect(secretBox.decryptSecret).toHaveBeenCalledWith('STT');
});
it('falls back to the chat apiKey when sttApiKeyEnc is absent', async () => {
const { service, aiProviderCredentialsRepo } = buildService({
updateAiProviderSettings: jest.fn(),
readProviderResult: { driver: 'openai', chatModel: 'gpt-4o' },
});
aiProviderCredentialsRepo.find.mockResolvedValue({
apiKeyEnc: 'CHAT',
// no sttApiKeyEnc
});
const cfg = await service.resolve('w1');
// sttApiKey === the resolved chat apiKey (dec(CHAT)).
expect(cfg?.sttApiKey).toBe('dec(CHAT)');
expect(cfg?.apiKey).toBe('dec(CHAT)');
});
});

@@ -32,6 +32,8 @@ export interface UpdateAiSettingsInput {
embeddingApiKey?: string;
sttModel?: string;
sttBaseUrl?: string;
sttRealtimeModel?: string;
sttRealtimeBaseUrl?: string;
sttApiStyle?: SttApiStyle;
sttApiKey?: string;
publicShareChatModel?: string;
@@ -163,6 +165,10 @@ export class AiSettingsService {
publicShareAssistantRoleId: provider.publicShareAssistantRoleId,
embeddingModel: provider.embeddingModel,
sttModel: provider.sttModel,
// Raw passthrough, NO fallback; the realtime consumer falls back to
// `sttModel` / (`sttBaseUrl` || `baseUrl`) at use time.
sttRealtimeModel: provider.sttRealtimeModel,
sttRealtimeBaseUrl: provider.sttRealtimeBaseUrl,
// Plain passthrough, no fallback; the transcribe path defaults unset to
// 'multipart' (current behavior).
sttApiStyle: provider.sttApiStyle,
@@ -239,6 +245,8 @@ export class AiSettingsService {
embeddingBaseUrl: provider.embeddingBaseUrl,
sttModel: provider.sttModel,
sttBaseUrl: provider.sttBaseUrl,
sttRealtimeModel: provider.sttRealtimeModel,
sttRealtimeBaseUrl: provider.sttRealtimeBaseUrl,
sttApiStyle: provider.sttApiStyle,
systemPrompt: provider.systemPrompt,
publicShareChatModel: provider.publicShareChatModel,
@@ -278,6 +286,8 @@ export class AiSettingsService {
'embeddingBaseUrl',
'sttModel',
'sttBaseUrl',
'sttRealtimeModel',
'sttRealtimeBaseUrl',
'sttApiStyle',
'systemPrompt',
'publicShareChatModel',

@@ -30,6 +30,11 @@ export interface AiProviderSettings {
sttModel?: string;
// STT-specific base URL. Falls back to baseUrl when empty/unset.
sttBaseUrl?: string;
// Realtime STT model id. Falls back to `sttModel` at use time when empty/unset.
sttRealtimeModel?: string;
// Realtime STT base URL. Falls back to `sttBaseUrl` || `baseUrl` at use time
// when empty/unset.
sttRealtimeBaseUrl?: string;
sttApiStyle?: SttApiStyle;
systemPrompt?: string;
// Cheap chat model id used ONLY by the anonymous public-share assistant. The
@@ -79,6 +84,8 @@ export interface MaskedAiSettings {
embeddingBaseUrl?: string;
sttModel?: string;
sttBaseUrl?: string;
sttRealtimeModel?: string;
sttRealtimeBaseUrl?: string;
sttApiStyle?: SttApiStyle;
systemPrompt?: string;
publicShareChatModel?: string;
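
The comments on `sttRealtimeModel` and `sttRealtimeBaseUrl` describe a fallback chain applied at use time. The consumer that applies it is not in this diff, so this is only a sketch of the documented order, with the hypothetical names `RealtimeSttInputs` and `resolveRealtimeStt`. It uses `||` instead of `??` so that an empty string also falls through, matching "empty/unset".

```typescript
// Hypothetical subset of the provider settings relevant to realtime STT.
interface RealtimeSttInputs {
  baseUrl?: string;
  sttModel?: string;
  sttBaseUrl?: string;
  sttRealtimeModel?: string;
  sttRealtimeBaseUrl?: string;
}

// Apply the documented fallbacks: realtime value first, then the STT value,
// then (for the URL only) the general base URL.
function resolveRealtimeStt(p: RealtimeSttInputs): {
  model: string | undefined;
  baseUrl: string | undefined;
} {
  return {
    model: p.sttRealtimeModel || p.sttModel,
    baseUrl: p.sttRealtimeBaseUrl || p.sttBaseUrl || p.baseUrl,
  };
}
```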

@@ -50,6 +50,14 @@ export class UpdateAiSettingsDto {
@IsString()
sttBaseUrl?: string;
@IsOptional()
@IsString()
sttRealtimeModel?: string;
@IsOptional()
@IsString()
sttRealtimeBaseUrl?: string;
@IsOptional()
@IsIn(STT_API_STYLES)
sttApiStyle?: SttApiStyle;

@@ -0,0 +1,54 @@
import 'reflect-metadata';
import { plainToInstance } from 'class-transformer';
import { validate } from 'class-validator';
import { UpdateAiSettingsDto } from './update-ai-settings.dto';
import { isUrlAllowed } from '../../../core/ai-chat/external-mcp/ssrf-guard';
// SSRF contract for sttRealtimeBaseUrl.
//
// The DTO intentionally validates sttRealtimeBaseUrl with @IsString() ONLY (no
// @IsUrl): an admin may legitimately point at an internal-looking host that DNS
// resolves to a public address, and over-strict URL validation would reject
// valid setups. The real defense is the CONNECT-TIME SSRF guard (isUrlAllowed on
// the http-equivalent of the wss URL), which blocks link-local/loopback/private
// targets. This pins both halves of that contract.
async function validateDto(payload: Record<string, unknown>) {
const dto = plainToInstance(UpdateAiSettingsDto, payload);
return validate(dto as object);
}
describe('UpdateAiSettingsDto.sttRealtimeBaseUrl is @IsString only (no @IsUrl)', () => {
it('accepts a metadata-service URL at the DTO layer (string, not URL-validated)', async () => {
const errors = await validateDto({
sttRealtimeBaseUrl: 'http://169.254.169.254/v1',
});
const fieldErr = errors.find(
(e) => e.property === 'sttRealtimeBaseUrl',
);
// No DTO-level rejection: blocking is deferred to the connect-time guard.
expect(fieldErr).toBeUndefined();
});
it('rejects a non-string sttRealtimeBaseUrl with an isString error', async () => {
const errors = await validateDto({ sttRealtimeBaseUrl: 123 });
const fieldErr = errors.find(
(e) => e.property === 'sttRealtimeBaseUrl',
);
expect(Object.keys(fieldErr?.constraints ?? {})).toContain('isString');
});
});
describe('connect-time SSRF guard blocks the metadata service', () => {
it('isUrlAllowed denies the http-equivalent of the cloud metadata endpoint', async () => {
// The realtime path derives a wss URL then checks isUrlAllowed on the
// http(s)-equivalent. For http://169.254.169.254 the equivalent is itself.
const result = await isUrlAllowed('http://169.254.169.254/v1');
expect(result.ok).toBe(false);
});
it('isUrlAllowed denies loopback', async () => {
const result = await isUrlAllowed('http://127.0.0.1/v1');
expect(result.ok).toBe(false);
});
});
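
The last describe block relies on checking the http(s)-equivalent of the realtime URL. The service code that performs the mapping is not in this file, so the following is only a sketch of what such a mapping could look like; `toHttpEquivalent` is a hypothetical helper, not the service's actual function.

```typescript
// Hypothetical helper: map a WebSocket URL to the http(s) URL that an
// http-oriented SSRF guard can inspect. Other schemes pass through unchanged.
function toHttpEquivalent(url: string): string {
  if (/^wss:\/\//i.test(url)) return 'https://' + url.slice('wss://'.length);
  if (/^ws:\/\//i.test(url)) return 'http://' + url.slice('ws://'.length);
  return url;
}
```

The host, port, path, and query are preserved, so the guard sees the same target the socket would dial. For an input that is already `http://`, such as the metadata-service URL in the test above, the result is the input itself.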

pnpm-lock.yaml generated

@@ -447,6 +447,9 @@ importers:
'@vitejs/plugin-react':
specifier: 6.0.1
version: 6.0.1(vite@8.0.5(@types/node@22.19.1)(esbuild@0.28.0)(jiti@2.4.2)(less@4.2.0)(sugarss@5.0.1(postcss@8.5.14))(terser@5.39.0)(tsx@4.21.0)(yaml@2.8.3))
'@vitest/coverage-v8':
specifier: 4.1.6
version: 4.1.6(vitest@4.1.6)
eslint:
specifier: 9.28.0
version: 9.28.0(jiti@2.4.2)
@@ -491,7 +494,7 @@ importers:
version: 8.0.5(@types/node@22.19.1)(esbuild@0.28.0)(jiti@2.4.2)(less@4.2.0)(sugarss@5.0.1(postcss@8.5.14))(terser@5.39.0)(tsx@4.21.0)(yaml@2.8.3)
vitest:
specifier: 4.1.6
version: 4.1.6(@opentelemetry/api@1.9.0)(@types/node@22.19.1)(happy-dom@20.8.9)(jsdom@25.0.0)(vite@8.0.5(@types/node@22.19.1)(esbuild@0.28.0)(jiti@2.4.2)(less@4.2.0)(sugarss@5.0.1(postcss@8.5.14))(terser@5.39.0)(tsx@4.21.0)(yaml@2.8.3))
version: 4.1.6(@opentelemetry/api@1.9.0)(@types/node@22.19.1)(@vitest/coverage-v8@4.1.6)(happy-dom@20.8.9)(jsdom@25.0.0)(vite@8.0.5(@types/node@22.19.1)(esbuild@0.28.0)(jiti@2.4.2)(less@4.2.0)(sugarss@5.0.1(postcss@8.5.14))(terser@5.39.0)(tsx@4.21.0)(yaml@2.8.3))
apps/server:
dependencies:
@@ -1427,10 +1430,18 @@ packages:
resolution: {integrity: sha512-qMlSxKbpRlAridDExk92nSobyDdpPijUq2DW6oDnUqd0iOGxmQjyqhMIihI9+zv4LPyZdRje2cavWPbCbWm3eA==}
engines: {node: '>=6.9.0'}
'@babel/helper-string-parser@7.29.7':
resolution: {integrity: sha512-Pb5ijPrZ89GDH8223L4UP8i6QApWxs04RbPQJTeWDV0/keR2E36MeKnyr6LYmUUvqRRI+Iv87SuF1W6ErINzYw==}
engines: {node: '>=6.9.0'}
'@babel/helper-validator-identifier@7.28.5':
resolution: {integrity: sha512-qSs4ifwzKJSV39ucNjsvc6WVHs6b7S03sOh2OcHF9UHfVPqWWALUsNUVzhSBiItjRZoLHx7nIarVjqKVusUZ1Q==}
engines: {node: '>=6.9.0'}
'@babel/helper-validator-identifier@7.29.7':
resolution: {integrity: sha512-qehxGkRj55h/ff8EMaJ+cYhyaKlHIxqYDn682wQD7RNp9UujOQsHog2uS0r2vzr4pW+sXf90NeeayjcNaX3fFg==}
engines: {node: '>=6.9.0'}
'@babel/helper-validator-option@7.27.1':
resolution: {integrity: sha512-YvjJow9FxbhFFKDSuFnVCe2WxXk1zWc22fFePVNEaWJEu8IrZVlda6N0uHwzZrUM1il7NC9Mlp4MaJYbYd9JSg==}
engines: {node: '>=6.9.0'}
@@ -1453,6 +1464,11 @@ packages:
engines: {node: '>=6.0.0'}
hasBin: true
'@babel/parser@7.29.7':
resolution: {integrity: sha512-hnORnjP/1P/zFEndoeX+n+t1RwWRJiJpM/jO7FW32Kn9r5+sJB2JWOdYo4L6k78j15eCwY3Gm/7364B1EMwtNg==}
engines: {node: '>=6.0.0'}
hasBin: true
'@babel/plugin-bugfix-safari-id-destructuring-collision-in-function-expression@7.23.3':
resolution: {integrity: sha512-iRkKcCqb7iGnq9+3G6rZ+Ciz5VywC4XNRHe57lKM+jOeYAoR0lVqdeeDRfh0tQcTfw/+vBhHn926FmQhLtlFLQ==}
engines: {node: '>=6.9.0'}
@@ -1942,9 +1958,17 @@ packages:
resolution: {integrity: sha512-qQ5m48eI/MFLQ5PxQj4PFaprjyCTLI37ElWMmNs0K8Lk3dVeOdNpB3ks8jc7yM5CDmVC73eMVk/trk3fgmrUpA==}
engines: {node: '>=6.9.0'}
'@babel/types@7.29.7':
resolution: {integrity: sha512-4zBIxpPzowiZpusoFkyGVwakdRJUyuH5PxQ/PrqghfdFWWasvnCdPfQXHrenDai+gyLARulZjZowCOj6fjT4pA==}
engines: {node: '>=6.9.0'}
'@bcoe/v8-coverage@0.2.3':
resolution: {integrity: sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==}
'@bcoe/v8-coverage@1.0.2':
resolution: {integrity: sha512-6zABk/ECA/QYSCQ1NGiVwwbQerUCZ+TQbp64Q3AgmfNvurHH0j8TtXa1qbShXA6qqkpAj4V5W8pP6mLe1mcMqA==}
engines: {node: '>=18'}
'@borewit/text-codec@0.2.1':
resolution: {integrity: sha512-k7vvKPbf7J2fZ5klGRD9AeKfUvojuZIQ3BT5u7Jfv+puwXkUBUT5PVyMDfJZpy30CBDXGMgw7fguK/lpOMBvgw==}
@@ -5373,6 +5397,15 @@ packages:
babel-plugin-react-compiler:
optional: true
'@vitest/coverage-v8@4.1.6':
resolution: {integrity: sha512-36l628fQ/9a/8ihy97eOtEnvWQEdqULQOJtcaxtoNq0G1w3Mxd4szSahOaMM9/NGyZ+hyKcMtIW/WIxq0XQViQ==}
peerDependencies:
'@vitest/browser': 4.1.6
vitest: 4.1.6
peerDependenciesMeta:
'@vitest/browser':
optional: true
'@vitest/expect@4.1.6':
resolution: {integrity: sha512-7EHDquPthALSV0jhhjgEW8FXaviMx7rSqu8W6oqCoAuOhKov814P99QDV1pxMA3QPv21YudvJngIhjrNI4opLg==}
@@ -5658,6 +5691,9 @@ packages:
resolution: {integrity: sha512-Izi8RQcffqCeNVgFigKli1ssklIbpHnCYc6AknXGYoB6grJqyeby7jv12JUQgmTAnIDnbck1uxksT4dzN3PWBA==}
engines: {node: '>=12'}
ast-v8-to-istanbul@1.0.4:
resolution: {integrity: sha512-0bC0/4bTSrnwdhU3IsZDwEdojvuPrSg59OYZfKsLRtJZ0u8VBx9DebfqqG8bRdCC0I7vjgxmPi41P0lpkhJHtA==}
async-lock@1.4.1:
resolution: {integrity: sha512-Az2ZTpuytrtqENulXwO3GGv1Bztugx6TT37NIo7imr/Qo0gsYiGtSdBa2B6fsXhTpVZDNfu1Qn3pk531e3q+nQ==}
@@ -7575,6 +7611,10 @@ packages:
resolution: {integrity: sha512-BewmUXImeuRk2YY0PVbxgKAysvhRPUQE0h5QRM++nVWyubKGV0l8qQ5op8+B2DOmwSe63Jivj0BjkPQVf8fP5g==}
engines: {node: '>=8'}
istanbul-reports@3.2.0:
resolution: {integrity: sha512-HGYWWS/ehqTV3xN10i23tkPkpH46MLCIMFNCaaKNavAXTF1RkqxawEPtnjnGZ6XKSInBKkiOA5BKS+aZiY3AvA==}
engines: {node: '>=8'}
iterare@1.2.1:
resolution: {integrity: sha512-RKYVTCjAnRthyJes037NX/IiqeidgN1xc3j1RjFfECFp28A1GVwK9nA+i0rJPaHqSZwygLzRnFlzUuHFoWWy+Q==}
engines: {node: '>=6'}
@@ -7800,6 +7840,9 @@ packages:
js-tiktoken@1.0.21:
resolution: {integrity: sha512-biOj/6M5qdgx5TKjDnFT1ymSpM5tbd3ylwDtrQvFQSu0Z7bBYko2dF+W/aUkXUPuk6IVpRxk/3Q2sHOzGlS36g==}
js-tokens@10.0.0:
resolution: {integrity: sha512-lM/UBzQmfJRo9ABXbPWemivdCW8V2G8FHaHdypQaIy523snUjog0W71ayWXTjiR+ixeMyVHN2XcpnTd/liPg/Q==}
js-tokens@4.0.0:
resolution: {integrity: sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ==}
@@ -8245,6 +8288,9 @@ packages:
magic-string@0.30.21:
resolution: {integrity: sha512-vd2F4YUyEXKGcLHoq+TEyCjxueSeHnFxyyjNp80yg0XV4vUhnDer/lvvlqM/arB5bXQN5K2/3oinyCRyx8T2CQ==}
magicast@0.5.3:
resolution: {integrity: sha512-pVKE4UdSQ7DvHzivsCIFx2BJn1mHG6KsyrFcaxFx6tONdneEuThrDx0Cj3AMg58KyN4pzYT+LHOotxDQDjNvkw==}
make-dir@2.1.0:
resolution: {integrity: sha512-LS9X+dc8KLxXCb8dni79fLIIUA5VyZoyjSMCwTluaXA0o27cCK0bhXkpgw+sTXVpPy/lSO57ilRixqk0vDmtRA==}
engines: {node: '>=6'}
@@ -11540,8 +11586,12 @@ snapshots:
'@babel/helper-string-parser@7.27.1': {}
'@babel/helper-string-parser@7.29.7': {}
'@babel/helper-validator-identifier@7.28.5': {}
'@babel/helper-validator-identifier@7.29.7': {}
'@babel/helper-validator-option@7.27.1': {}
'@babel/helper-wrap-function@7.22.20':
@@ -11563,6 +11613,10 @@ snapshots:
dependencies:
'@babel/types': 7.28.5
'@babel/parser@7.29.7':
dependencies:
'@babel/types': 7.29.7
'@babel/plugin-bugfix-safari-id-destructuring-collision-in-function-expression@7.23.3(@babel/core@7.28.5)':
dependencies:
'@babel/core': 7.28.5
@@ -12168,8 +12222,15 @@ snapshots:
'@babel/helper-string-parser': 7.27.1
'@babel/helper-validator-identifier': 7.28.5
'@babel/types@7.29.7':
dependencies:
'@babel/helper-string-parser': 7.29.7
'@babel/helper-validator-identifier': 7.29.7
'@bcoe/v8-coverage@0.2.3': {}
'@bcoe/v8-coverage@1.0.2': {}
'@borewit/text-codec@0.2.1': {}
'@braintree/sanitize-url@6.0.2': {}
@@ -15818,6 +15879,20 @@ snapshots:
'@rolldown/pluginutils': 1.0.0-rc.7
vite: 8.0.5(@types/node@22.19.1)(esbuild@0.28.0)(jiti@2.4.2)(less@4.2.0)(sugarss@5.0.1(postcss@8.5.14))(terser@5.39.0)(tsx@4.21.0)(yaml@2.8.3)
'@vitest/coverage-v8@4.1.6(vitest@4.1.6)':
dependencies:
'@bcoe/v8-coverage': 1.0.2
'@vitest/utils': 4.1.6
ast-v8-to-istanbul: 1.0.4
istanbul-lib-coverage: 3.2.2
istanbul-lib-report: 3.0.1
istanbul-reports: 3.2.0
magicast: 0.5.3
obug: 2.1.1
std-env: 4.1.0
tinyrainbow: 3.1.0
vitest: 4.1.6(@opentelemetry/api@1.9.0)(@types/node@22.19.1)(@vitest/coverage-v8@4.1.6)(happy-dom@20.8.9)(jsdom@25.0.0)(vite@8.0.5(@types/node@22.19.1)(esbuild@0.28.0)(jiti@2.4.2)(less@4.2.0)(sugarss@5.0.1(postcss@8.5.14))(terser@5.39.0)(tsx@4.21.0)(yaml@2.8.3))
'@vitest/expect@4.1.6':
dependencies:
'@standard-schema/spec': 1.1.0
@@ -16147,6 +16222,12 @@ snapshots:
assertion-error@2.0.1: {}
ast-v8-to-istanbul@1.0.4:
dependencies:
'@jridgewell/trace-mapping': 0.3.31
estree-walker: 3.0.3
js-tokens: 10.0.0
async-lock@1.4.1: {}
async-mutex@0.5.0:
@@ -18358,6 +18439,11 @@ snapshots:
html-escaper: 2.0.2
istanbul-lib-report: 3.0.1
istanbul-reports@3.2.0:
dependencies:
html-escaper: 2.0.2
istanbul-lib-report: 3.0.1
iterare@1.2.1: {}
iterator.prototype@1.1.5:
@@ -18768,6 +18854,8 @@ snapshots:
dependencies:
base64-js: 1.5.1
js-tokens@10.0.0: {}
js-tokens@4.0.0: {}
js-yaml@3.14.2:
@@ -19204,6 +19292,12 @@ snapshots:
dependencies:
'@jridgewell/sourcemap-codec': 1.5.5
magicast@0.5.3:
dependencies:
'@babel/parser': 7.29.7
'@babel/types': 7.29.7
source-map-js: 1.2.1
make-dir@2.1.0:
dependencies:
pify: 4.0.1
@@ -21528,7 +21622,7 @@ snapshots:
tsx: 4.21.0
yaml: 2.8.3
vitest@4.1.6(@opentelemetry/api@1.9.0)(@types/node@22.19.1)(happy-dom@20.8.9)(jsdom@25.0.0)(vite@8.0.5(@types/node@22.19.1)(esbuild@0.28.0)(jiti@2.4.2)(less@4.2.0)(sugarss@5.0.1(postcss@8.5.14))(terser@5.39.0)(tsx@4.21.0)(yaml@2.8.3)):
vitest@4.1.6(@opentelemetry/api@1.9.0)(@types/node@22.19.1)(@vitest/coverage-v8@4.1.6)(happy-dom@20.8.9)(jsdom@25.0.0)(vite@8.0.5(@types/node@22.19.1)(esbuild@0.28.0)(jiti@2.4.2)(less@4.2.0)(sugarss@5.0.1(postcss@8.5.14))(terser@5.39.0)(tsx@4.21.0)(yaml@2.8.3)):
dependencies:
'@vitest/expect': 4.1.6
'@vitest/mocker': 4.1.6(vite@8.0.5(@types/node@22.19.1)(esbuild@0.28.0)(jiti@2.4.2)(less@4.2.0)(sugarss@5.0.1(postcss@8.5.14))(terser@5.39.0)(tsx@4.21.0)(yaml@2.8.3))
@@ -21553,6 +21647,7 @@ snapshots:
optionalDependencies:
'@opentelemetry/api': 1.9.0
'@types/node': 22.19.1
'@vitest/coverage-v8': 4.1.6(vitest@4.1.6)
happy-dom: 20.8.9
jsdom: 25.0.0
transitivePeerDependencies: