feat(dictation): add realtime streaming STT (live dictation)
Layer an optional realtime speech-to-text path on top of the existing batch dictation, so transcribed text appears as the user speaks. Transport A2: browser <-> our server (Socket.IO `/ai-realtime`) <-> OpenAI Realtime (raw ws). The provider API key never leaves the server; the upstream URL is SSRF-checked before connecting; the gateway enforces the dictation+dictationRealtime gate, cookie-JWT auth and per-user/ per-workspace concurrency caps. Implemented against the GA (2026) OpenAI Realtime transcription contract (session.update / audio.input.format / server_vad), not the now-removed beta shape. Editor UI B2: interim text is shown as a meta-only ProseMirror ghost decoration (no Yjs/history noise); only completed segments are committed. Chat shows interim as a dimmed tail. The mic button switches realtime vs batch by the workspace flag; batch remains the default and fallback. Server: - AiRealtimeService (upstream ws proxy, normalized events, idle/max- duration timeouts, idempotent teardown) + parseUpstreamEvent unit tests - AiRealtimeGateway (Socket.IO `/ai-realtime`) wired into AiChatModule - admin-gated POST /ai-chat/realtime/test connectivity probe - config: settings.ai.dictationRealtime + provider sttRealtimeModel/ sttRealtimeBaseUrl (realtime key reuses sttApiKey; no new secret) Client: - pcm16 AudioWorklet (24kHz mono PCM16), RealtimeDictationClient, use-realtime-dictation hook (status/start/stop/cancel + onInterim/onFinal) - RealtimeMicButton + dictation-interim ProseMirror decoration - editor/chat integration + AI settings UI (toggle, model, test endpoint) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -1179,6 +1179,11 @@
|
||||
"Semantic search": "Semantic search",
|
||||
"Voice / STT": "Voice / STT",
|
||||
"Voice dictation": "Voice dictation",
|
||||
"Realtime dictation": "Realtime dictation",
|
||||
"Realtime model": "Realtime model",
|
||||
"Realtime endpoint": "Realtime endpoint",
|
||||
"Streams audio live and inserts text as you speak (requires an OpenAI-compatible Realtime endpoint)": "Streams audio live and inserts text as you speak (requires an OpenAI-compatible Realtime endpoint)",
|
||||
"Leave empty to use the STT base URL": "Leave empty to use the STT base URL",
|
||||
"Voice dictation is not available yet.": "Voice dictation is not available yet.",
|
||||
"Test endpoint": "Test endpoint",
|
||||
"Save endpoints": "Save endpoints",
|
||||
|
||||
@@ -1,11 +1,19 @@
|
||||
import { KeyboardEvent } from "react";
|
||||
import { ActionIcon, Group, Textarea, Tooltip } from "@mantine/core";
|
||||
import { KeyboardEvent, useState } from "react";
|
||||
import {
|
||||
ActionIcon,
|
||||
Group,
|
||||
Stack,
|
||||
Text,
|
||||
Textarea,
|
||||
Tooltip,
|
||||
} from "@mantine/core";
|
||||
import { IconPlayerStopFilled, IconSend } from "@tabler/icons-react";
|
||||
import { useTranslation } from "react-i18next";
|
||||
import { useAtom, useAtomValue } from "jotai";
|
||||
import { aiChatDraftAtom } from "@/features/ai-chat/atoms/ai-chat-atom.ts";
|
||||
import { workspaceAtom } from "@/features/user/atoms/current-user-atom";
|
||||
import { MicButton } from "@/features/dictation/components/mic-button";
|
||||
import { RealtimeMicButton } from "@/features/dictation/components/realtime-mic-button";
|
||||
|
||||
interface ChatInputProps {
|
||||
onSend: (text: string) => void;
|
||||
@@ -29,12 +37,17 @@ export default function ChatInput({
|
||||
const [value, setValue] = useAtom(aiChatDraftAtom);
|
||||
const workspace = useAtomValue(workspaceAtom);
|
||||
const isDictationEnabled = workspace?.settings?.ai?.dictation === true;
|
||||
const isRealtime = workspace?.settings?.ai?.dictationRealtime === true;
|
||||
// Live interim (partial) transcript shown as a dimmed tail under the input.
|
||||
const [interim, setInterim] = useState("");
|
||||
|
||||
const send = (): void => {
|
||||
const text = value.trim();
|
||||
if (!text || isStreaming || disabled) return;
|
||||
onSend(text);
|
||||
setValue("");
|
||||
// Drop any leftover partial when a message is sent.
|
||||
setInterim("");
|
||||
};
|
||||
|
||||
const handleKeyDown = (e: KeyboardEvent<HTMLTextAreaElement>): void => {
|
||||
@@ -45,7 +58,8 @@ export default function ChatInput({
|
||||
};
|
||||
|
||||
return (
|
||||
<Group gap="xs" align="flex-end" wrap="nowrap">
|
||||
<Stack gap="xs">
|
||||
<Group gap="xs" align="flex-end" wrap="nowrap">
|
||||
<Textarea
|
||||
style={{ flex: 1 }}
|
||||
placeholder={t("Ask the AI agent…")}
|
||||
@@ -61,13 +75,24 @@ export default function ChatInput({
|
||||
// switch), so a fresh chat lands with the cursor ready in the field.
|
||||
autoFocus
|
||||
/>
|
||||
{isDictationEnabled && (
|
||||
<MicButton
|
||||
size="lg"
|
||||
disabled={isStreaming || disabled}
|
||||
onText={(text) => setValue((v) => (v ? `${v} ${text}` : text))}
|
||||
/>
|
||||
)}
|
||||
{isDictationEnabled &&
|
||||
(isRealtime ? (
|
||||
<RealtimeMicButton
|
||||
size="lg"
|
||||
disabled={isStreaming || disabled}
|
||||
onInterim={(text) => setInterim(text)}
|
||||
onFinal={(text) => {
|
||||
setValue((v) => (v ? `${v} ${text}` : text));
|
||||
setInterim("");
|
||||
}}
|
||||
/>
|
||||
) : (
|
||||
<MicButton
|
||||
size="lg"
|
||||
disabled={isStreaming || disabled}
|
||||
onText={(text) => setValue((v) => (v ? `${v} ${text}` : text))}
|
||||
/>
|
||||
))}
|
||||
{isStreaming ? (
|
||||
<Tooltip label={t("Stop")} withArrow>
|
||||
<ActionIcon
|
||||
@@ -93,6 +118,12 @@ export default function ChatInput({
|
||||
</ActionIcon>
|
||||
</Tooltip>
|
||||
)}
|
||||
</Group>
|
||||
</Group>
|
||||
{interim && (
|
||||
<Text size="sm" c="dimmed">
|
||||
{interim}
|
||||
</Text>
|
||||
)}
|
||||
</Stack>
|
||||
);
|
||||
}
|
||||
|
||||
33
apps/client/src/features/dictation/audio/audio-worklet.d.ts
vendored
Normal file
33
apps/client/src/features/dictation/audio/audio-worklet.d.ts
vendored
Normal file
@@ -0,0 +1,33 @@
|
||||
// Minimal ambient declarations for the AudioWorklet global scope.
|
||||
//
|
||||
// The client tsconfig only pulls in the DOM libs (no "webworker"/"audioworklet"
|
||||
// lib), so the symbols available inside an AudioWorkletProcessor module are not
|
||||
// known to `tsc`. These declarations are intentionally narrow: just enough for
|
||||
// `pcm16-worklet.ts` to typecheck, matching the Web Audio API spec shapes used
|
||||
// by that processor. They describe the worklet global scope, not the main thread.
|
||||
|
||||
declare abstract class AudioWorkletProcessor {
|
||||
// Message channel back to the main thread (used to transfer PCM16 buffers).
|
||||
readonly port: MessagePort;
|
||||
|
||||
constructor();
|
||||
|
||||
// Called for each render quantum. `inputs`/`outputs` are channel arrays
|
||||
// indexed as [input][channel][sample]; `parameters` maps AudioParam names to
|
||||
// their per-sample (or single-value) Float32Array. Return `true` to keep the
|
||||
// processor alive.
|
||||
abstract process(
|
||||
inputs: Float32Array[][],
|
||||
outputs: Float32Array[][],
|
||||
parameters: Record<string, Float32Array>,
|
||||
): boolean;
|
||||
}
|
||||
|
||||
// Registers a processor class under a name usable from `new AudioWorkletNode`.
|
||||
declare function registerProcessor(
|
||||
name: string,
|
||||
processorCtor: new () => AudioWorkletProcessor,
|
||||
): void;
|
||||
|
||||
// The render context's sample rate, in Hz, available in the worklet global scope.
|
||||
declare const sampleRate: number;
|
||||
123
apps/client/src/features/dictation/audio/pcm16-worklet.ts
Normal file
123
apps/client/src/features/dictation/audio/pcm16-worklet.ts
Normal file
@@ -0,0 +1,123 @@
|
||||
// Self-contained AudioWorkletProcessor that turns the live microphone stream into
|
||||
// PCM16 (signed 16-bit, little-endian), mono, 24000 Hz chunks for the realtime STT
|
||||
// upstream. It runs in the AudioWorklet global scope, so it MUST NOT import anything
|
||||
// (the worklet module has no module graph / bundler runtime around it).
|
||||
//
|
||||
// Per `process()` call the host hands us a render quantum (typically 128 frames) at
|
||||
// the context sample rate. We read the first input channel (mono), linearly resample
|
||||
// to 24000 Hz while carrying the fractional read position across calls (so we never
|
||||
// assume a particular input rate, e.g. 44.1k or 48k), accumulate the resampled
|
||||
// samples, and once we have ~150 ms worth (3600 samples) we emit them as an
|
||||
// Int16 ArrayBuffer transferred to the main thread.
|
||||
|
||||
// Target output rate required by the upstream transcription contract.
|
||||
const TARGET_RATE = 24000;
|
||||
// ~150 ms of audio at the target rate: 24000 * 0.15 = 3600 samples per message.
|
||||
const FRAME_SAMPLES = Math.round(TARGET_RATE * 0.15);
|
||||
|
||||
class Pcm16Worklet extends AudioWorkletProcessor {
|
||||
// Fractional read position within the CURRENT quantum, in input-sample units.
|
||||
// Kept across `process()` calls so resampling has no per-quantum seams. After a
|
||||
// quantum it is rebased relative to the next quantum's start, so a value in
|
||||
// [-1, 0) means "interpolate between the previous quantum's last sample and the
|
||||
// next quantum's first sample".
|
||||
private resamplePos = 0;
|
||||
|
||||
// The previous quantum's last input sample, used to interpolate across the
|
||||
// boundary between two render quanta (the conceptual sample at index -1).
|
||||
private prevSample = 0;
|
||||
|
||||
// True once at least one sample has been seen (so `prevSample` is meaningful).
|
||||
private primed = false;
|
||||
|
||||
// Accumulated resampled Float32 samples awaiting conversion + flush.
|
||||
private acc: Float32Array = new Float32Array(FRAME_SAMPLES);
|
||||
private accLen = 0;
|
||||
|
||||
process(inputs: Float32Array[][]): boolean {
|
||||
const input = inputs[0];
|
||||
// No connected input (or a momentarily empty quantum): keep the node alive
|
||||
// and emit silence below.
|
||||
const channel = input && input.length > 0 ? input[0] : undefined;
|
||||
|
||||
if (channel && channel.length > 0) {
|
||||
this.resampleAndAccumulate(channel);
|
||||
}
|
||||
|
||||
// Drive silence to the output so connecting this node to destination keeps
|
||||
// the graph running without echoing the microphone back to the speakers.
|
||||
return true;
|
||||
}
|
||||
|
||||
// Linearly resample `channel` (at the context `sampleRate`) to TARGET_RATE and
|
||||
// push the results into the accumulator, flushing whole frames as they fill.
|
||||
private resampleAndAccumulate(channel: Float32Array): void {
|
||||
const ratio = sampleRate / TARGET_RATE; // input samples consumed per output sample
|
||||
const n = channel.length;
|
||||
|
||||
if (!this.primed) {
|
||||
// First quantum: there is no real predecessor, so seed the virtual index -1
|
||||
// with this quantum's first sample and start reading from 0.
|
||||
this.prevSample = channel[0];
|
||||
this.primed = true;
|
||||
this.resamplePos = 0;
|
||||
}
|
||||
|
||||
let pos = this.resamplePos;
|
||||
|
||||
// Emit output samples whose RIGHT neighbor (floor + 1) is available within
|
||||
// this quantum, i.e. while floor + 1 <= n - 1 ⇔ pos < n - 1. The left
|
||||
// neighbor at floor === -1 is the carried `prevSample`; floor >= 0 reads the
|
||||
// quantum directly. Any leftover position (whose right neighbor would be the
|
||||
// NEXT quantum's first sample) is carried via `resamplePos` and resolved on
|
||||
// the next call. This guarantees we never read `channel[n]` (out of bounds).
|
||||
while (pos < n - 1) {
|
||||
const floor = Math.floor(pos);
|
||||
const frac = pos - floor;
|
||||
|
||||
const s0 = floor < 0 ? this.prevSample : channel[floor];
|
||||
const s1 = channel[floor + 1];
|
||||
|
||||
this.pushSample(s0 + (s1 - s0) * frac);
|
||||
pos += ratio;
|
||||
}
|
||||
|
||||
// Rebase the leftover position relative to the next quantum's start and carry
|
||||
// this quantum's last sample as the predecessor for the boundary interval.
|
||||
this.resamplePos = pos - n;
|
||||
this.prevSample = channel[n - 1];
|
||||
}
|
||||
|
||||
// Append one resampled sample; flush a full PCM16 frame whenever the
|
||||
// accumulator reaches FRAME_SAMPLES.
|
||||
private pushSample(sample: number): void {
|
||||
this.acc[this.accLen] = sample;
|
||||
this.accLen += 1;
|
||||
if (this.accLen >= FRAME_SAMPLES) {
|
||||
this.flush();
|
||||
}
|
||||
}
|
||||
|
||||
// Convert the accumulated Float32 samples to Int16 LE and post the ArrayBuffer
|
||||
// to the main thread, transferring ownership (zero-copy). DataView writes are
|
||||
// little-endian to match the PCM16 contract regardless of host endianness.
|
||||
private flush(): void {
|
||||
const count = this.accLen;
|
||||
if (count === 0) return;
|
||||
|
||||
const buffer = new ArrayBuffer(count * 2);
|
||||
const view = new DataView(buffer);
|
||||
for (let i = 0; i < count; i++) {
|
||||
// Clamp to [-1, 1] then scale to the signed 16-bit range.
|
||||
let s = this.acc[i];
|
||||
if (s > 1) s = 1;
|
||||
else if (s < -1) s = -1;
|
||||
view.setInt16(i * 2, s < 0 ? s * 0x8000 : s * 0x7fff, true);
|
||||
}
|
||||
this.accLen = 0;
|
||||
|
||||
this.port.postMessage(buffer, [buffer]);
|
||||
}
|
||||
}
|
||||
|
||||
registerProcessor("pcm16-worklet", Pcm16Worklet);
|
||||
@@ -0,0 +1,84 @@
|
||||
import { FC, useEffect, useRef } from "react";
|
||||
import { ActionIcon, Tooltip } from "@mantine/core";
|
||||
import { IconMicrophone, IconPlayerStopFilled } from "@tabler/icons-react";
|
||||
import { useTranslation } from "react-i18next";
|
||||
import {
|
||||
useRealtimeDictation,
|
||||
type RealtimeDictationStatus,
|
||||
} from "@/features/dictation/hooks/use-realtime-dictation";
|
||||
|
||||
interface RealtimeMicButtonProps {
|
||||
onInterim: (text: string) => void;
|
||||
onFinal: (text: string) => void;
|
||||
onStart?: () => void;
|
||||
disabled?: boolean;
|
||||
// Mantine ActionIcon size token; "lg" matches the chat composer, "md" the
|
||||
// editor toolbar.
|
||||
size?: "md" | "lg";
|
||||
}
|
||||
|
||||
/**
|
||||
* Streaming sibling of MicButton. Drives the realtime dictation state machine:
|
||||
* a click starts recording (mic icon), a second click stops it (stop icon).
|
||||
* Interim/final transcripts are surfaced through the onInterim/onFinal props as
|
||||
* they arrive; there is no "transcribing" state because final text lands
|
||||
* incrementally while recording. Mirrors MicButton's look and tooltips.
|
||||
*/
|
||||
export const RealtimeMicButton: FC<RealtimeMicButtonProps> = ({
|
||||
onInterim,
|
||||
onFinal,
|
||||
onStart,
|
||||
disabled,
|
||||
size = "lg",
|
||||
}) => {
|
||||
const { t } = useTranslation();
|
||||
const { status, start, stop } = useRealtimeDictation({
|
||||
onInterim,
|
||||
onFinal,
|
||||
onStart,
|
||||
});
|
||||
const iconSize = size === "lg" ? 18 : 16;
|
||||
|
||||
// When recording ends (status leaves "recording" for idle/error), clear any
|
||||
// leftover partial in the consumer once. Tracked via the previous status so
|
||||
// it only fires on the transition, not on every render.
|
||||
const prevStatusRef = useRef<RealtimeDictationStatus>(status);
|
||||
useEffect(() => {
|
||||
if (prevStatusRef.current === "recording" && status !== "recording") {
|
||||
onInterim("");
|
||||
}
|
||||
prevStatusRef.current = status;
|
||||
}, [status, onInterim]);
|
||||
|
||||
if (status === "recording") {
|
||||
return (
|
||||
<Tooltip label={t("Stop recording")} withArrow>
|
||||
<ActionIcon
|
||||
size={size}
|
||||
color="red"
|
||||
variant="light"
|
||||
onClick={stop}
|
||||
aria-label={t("Stop recording")}
|
||||
>
|
||||
<IconPlayerStopFilled size={iconSize} />
|
||||
</ActionIcon>
|
||||
</Tooltip>
|
||||
);
|
||||
}
|
||||
|
||||
// idle / error: subtle mic to (re)start. No spinner — there is no separate
|
||||
// transcribing phase in the realtime flow.
|
||||
return (
|
||||
<Tooltip label={t("Start dictation")} withArrow>
|
||||
<ActionIcon
|
||||
size={size}
|
||||
variant="subtle"
|
||||
onClick={() => void start()}
|
||||
disabled={disabled}
|
||||
aria-label={t("Start dictation")}
|
||||
>
|
||||
<IconMicrophone size={iconSize} />
|
||||
</ActionIcon>
|
||||
</Tooltip>
|
||||
);
|
||||
};
|
||||
@@ -0,0 +1,427 @@
|
||||
import { useCallback, useEffect, useRef, useState } from "react";
|
||||
import { notifications } from "@mantine/notifications";
|
||||
import { useTranslation } from "react-i18next";
|
||||
import { RealtimeDictationClient } from "@/features/dictation/services/realtime-dictation-client";
|
||||
|
||||
// The worklet module URL is produced via `new URL(..., import.meta.url)` so Vite
|
||||
// emits the processor as a separate, self-contained module chunk (it must run in
|
||||
// the AudioWorklet global scope, outside the main bundle). Built once at module
|
||||
// load — the resolved URL is stable for the app's lifetime.
|
||||
const PCM16_WORKLET_URL = new URL(
|
||||
"../audio/pcm16-worklet.ts",
|
||||
import.meta.url,
|
||||
);
|
||||
|
||||
export type RealtimeDictationStatus = "idle" | "recording" | "error";
|
||||
|
||||
export interface UseRealtimeDictationOptions {
|
||||
onInterim: (text: string) => void; // latest partial for the live segment
|
||||
onFinal: (text: string) => void; // a completed segment (trimmed)
|
||||
onStart?: () => void; // fired right when capture begins (caret snapshot)
|
||||
maxDurationMs?: number; // default 120000
|
||||
}
|
||||
|
||||
export interface UseRealtimeDictationResult {
|
||||
status: RealtimeDictationStatus;
|
||||
start: () => Promise<void>;
|
||||
stop: () => void;
|
||||
cancel: () => void;
|
||||
}
|
||||
|
||||
// AudioContext is webkit-prefixed on some older Safari builds; keep a typed
|
||||
// fallback so the hook never crashes when the standard name is missing.
|
||||
function getAudioContextCtor(): typeof AudioContext | undefined {
|
||||
if (typeof AudioContext !== "undefined") return AudioContext;
|
||||
const w = window as unknown as { webkitAudioContext?: typeof AudioContext };
|
||||
return w.webkitAudioContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Streaming sibling of `use-dictation`. Captures the mic, resamples to PCM16
|
||||
* 24 kHz in an AudioWorklet, and streams it over the normalized `/ai-realtime`
|
||||
* Socket.IO namespace, surfacing interim/final transcripts as they arrive.
|
||||
*
|
||||
* Mirrors `use-dictation`'s conventions: refs hold the live graph/client/timers
|
||||
* so re-renders never lose them, getUserMedia errors map to the same Mantine
|
||||
* notifications, and every exit path stops the MediaStream tracks and closes the
|
||||
* AudioContext. There is no `transcribing` state — final text arrives
|
||||
* incrementally while `recording`.
|
||||
*/
|
||||
export function useRealtimeDictation(
|
||||
options: UseRealtimeDictationOptions,
|
||||
): UseRealtimeDictationResult {
|
||||
const { t, i18n } = useTranslation();
|
||||
const [status, setStatus] = useState<RealtimeDictationStatus>("idle");
|
||||
|
||||
// Keep the latest callbacks in a ref so async socket handlers always call the
|
||||
// current handlers without re-creating the capture graph.
|
||||
const optionsRef = useRef(options);
|
||||
optionsRef.current = options;
|
||||
|
||||
const streamRef = useRef<MediaStream | null>(null);
|
||||
const audioContextRef = useRef<AudioContext | null>(null);
|
||||
const sourceRef = useRef<MediaStreamAudioSourceNode | null>(null);
|
||||
const workletRef = useRef<AudioWorkletNode | null>(null);
|
||||
const clientRef = useRef<RealtimeDictationClient | null>(null);
|
||||
|
||||
const timerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
|
||||
const errorTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
|
||||
|
||||
const canceledRef = useRef(false);
|
||||
const startingRef = useRef(false);
|
||||
// True once the server emits `ready`; audio is buffered until then, then flushed.
|
||||
const readyRef = useRef(false);
|
||||
// PCM16 chunks captured before the upstream session is ready.
|
||||
const pendingAudioRef = useRef<ArrayBuffer[]>([]);
|
||||
// Stable ref to the latest stop() so the max-duration timer (armed inside the
|
||||
// start closure) can invoke the current version without re-arming every render.
|
||||
const stopRef = useRef<() => void>(() => undefined);
|
||||
|
||||
const clearTimer = useCallback(() => {
|
||||
if (timerRef.current !== null) {
|
||||
clearTimeout(timerRef.current);
|
||||
timerRef.current = null;
|
||||
}
|
||||
}, []);
|
||||
|
||||
const stopTracks = useCallback(() => {
|
||||
streamRef.current?.getTracks().forEach((track) => track.stop());
|
||||
streamRef.current = null;
|
||||
}, []);
|
||||
|
||||
// Tear down the audio graph (worklet node, source, context). Never throws on a
|
||||
// half-built or already-closed graph.
|
||||
const teardownAudio = useCallback(() => {
|
||||
const worklet = workletRef.current;
|
||||
if (worklet) {
|
||||
worklet.port.onmessage = null;
|
||||
try {
|
||||
worklet.disconnect();
|
||||
} catch {
|
||||
// Node may already be disconnected; ignore.
|
||||
}
|
||||
workletRef.current = null;
|
||||
}
|
||||
|
||||
const source = sourceRef.current;
|
||||
if (source) {
|
||||
try {
|
||||
source.disconnect();
|
||||
} catch {
|
||||
// Ignore disconnect of an already-detached node.
|
||||
}
|
||||
sourceRef.current = null;
|
||||
}
|
||||
|
||||
const ctx = audioContextRef.current;
|
||||
if (ctx) {
|
||||
audioContextRef.current = null;
|
||||
if (ctx.state !== "closed") {
|
||||
// close() returns a promise; swallow rejections so teardown never throws.
|
||||
void ctx.close().catch(() => undefined);
|
||||
}
|
||||
}
|
||||
}, []);
|
||||
|
||||
// Full teardown shared by stop/cancel/unmount. Order: stop streaming upstream,
|
||||
// disconnect the socket, then dismantle the local audio graph and tracks, then
|
||||
// clear timers and reset the ready/pending state.
|
||||
const teardown = useCallback(() => {
|
||||
const client = clientRef.current;
|
||||
if (client) {
|
||||
clientRef.current = null;
|
||||
try {
|
||||
client.stop();
|
||||
} catch {
|
||||
// Socket may already be gone; ignore.
|
||||
}
|
||||
client.disconnect();
|
||||
}
|
||||
|
||||
teardownAudio();
|
||||
stopTracks();
|
||||
clearTimer();
|
||||
|
||||
readyRef.current = false;
|
||||
pendingAudioRef.current = [];
|
||||
startingRef.current = false;
|
||||
}, [teardownAudio, stopTracks, clearTimer]);
|
||||
|
||||
// Surface a concrete failure: log it, notify, flip to "error", and reset to
|
||||
// "idle" after a short delay (mirrors use-dictation's error timer).
|
||||
const handleError = useCallback(
|
||||
(message: string, err?: unknown) => {
|
||||
if (canceledRef.current) return;
|
||||
// Never log audio — only the textual reason.
|
||||
console.error("[realtime-dictation]", message, err ?? "");
|
||||
notifications.show({ color: "red", message });
|
||||
teardown();
|
||||
setStatus("error");
|
||||
if (errorTimerRef.current !== null) {
|
||||
clearTimeout(errorTimerRef.current);
|
||||
}
|
||||
errorTimerRef.current = setTimeout(() => {
|
||||
errorTimerRef.current = null;
|
||||
setStatus("idle");
|
||||
}, 1500);
|
||||
},
|
||||
[teardown],
|
||||
);
|
||||
|
||||
const start = useCallback(async (): Promise<void> => {
|
||||
// Synchronous live guard: status is stale between renders, so also block on
|
||||
// refs to prevent a double-click from opening two MediaStreams / sockets.
|
||||
if (
|
||||
startingRef.current ||
|
||||
streamRef.current ||
|
||||
audioContextRef.current ||
|
||||
clientRef.current
|
||||
) {
|
||||
return;
|
||||
}
|
||||
if (status !== "idle") return;
|
||||
startingRef.current = true;
|
||||
canceledRef.current = false;
|
||||
readyRef.current = false;
|
||||
pendingAudioRef.current = [];
|
||||
|
||||
if (!navigator.mediaDevices?.getUserMedia) {
|
||||
const reason =
|
||||
"navigator.mediaDevices.getUserMedia is unavailable in this context";
|
||||
console.error("[realtime-dictation] " + reason);
|
||||
notifications.show({
|
||||
color: "red",
|
||||
message: t("Audio recording is not available in this browser/context"),
|
||||
});
|
||||
setStatus("idle");
|
||||
startingRef.current = false;
|
||||
return;
|
||||
}
|
||||
|
||||
let stream: MediaStream;
|
||||
try {
|
||||
stream = await navigator.mediaDevices.getUserMedia({ audio: true });
|
||||
} catch (err) {
|
||||
// Always log the full error for diagnosis (name, message, stack).
|
||||
console.error("[realtime-dictation] getUserMedia failed", err);
|
||||
const name = (err as { name?: string })?.name;
|
||||
const detail = (err as { message?: string })?.message ?? String(err);
|
||||
let message: string;
|
||||
if (name === "NotAllowedError" || name === "SecurityError") {
|
||||
message = t("Microphone access denied");
|
||||
} else if (name === "NotFoundError" || name === "OverconstrainedError") {
|
||||
message = t("No microphone found");
|
||||
} else if (name === "NotReadableError" || name === "AbortError") {
|
||||
message = t("Microphone is unavailable or already in use");
|
||||
} else {
|
||||
// Unknown failure: show the real reason instead of a generic string.
|
||||
message = `${t("Could not start recording")}: ${name ? `${name}: ` : ""}${detail}`;
|
||||
}
|
||||
notifications.show({ color: "red", message });
|
||||
setStatus("idle");
|
||||
startingRef.current = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// If a cancel landed during the await, drop the stream and bail out.
|
||||
if (canceledRef.current) {
|
||||
stream.getTracks().forEach((track) => track.stop());
|
||||
startingRef.current = false;
|
||||
setStatus("idle");
|
||||
return;
|
||||
}
|
||||
streamRef.current = stream;
|
||||
|
||||
// Build the capture graph. The worklet still resamples robustly if the browser
|
||||
// ignores the 24 kHz hint, so any actual context rate is handled correctly.
|
||||
const AudioCtx = getAudioContextCtor();
|
||||
if (!AudioCtx) {
|
||||
stopTracks();
|
||||
notifications.show({
|
||||
color: "red",
|
||||
message: t("Audio recording is not available in this browser/context"),
|
||||
});
|
||||
setStatus("idle");
|
||||
startingRef.current = false;
|
||||
return;
|
||||
}
|
||||
|
||||
let audioContext: AudioContext;
|
||||
try {
|
||||
audioContext = new AudioCtx({ sampleRate: 24000 });
|
||||
audioContextRef.current = audioContext;
|
||||
// AudioWorklet requires a secure context (https/localhost), same constraint
|
||||
// as getUserMedia. A failure here means the UI should fall back to batch.
|
||||
await audioContext.audioWorklet.addModule(PCM16_WORKLET_URL);
|
||||
} catch (err) {
|
||||
console.error("[realtime-dictation] audio worklet setup failed", err);
|
||||
teardownAudio();
|
||||
stopTracks();
|
||||
const detail = (err as { message?: string })?.message ?? String(err);
|
||||
notifications.show({
|
||||
color: "red",
|
||||
message: `${t("Could not start recording")}: ${detail}`,
|
||||
});
|
||||
setStatus("idle");
|
||||
startingRef.current = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// Another cancel could have landed during addModule().
|
||||
if (canceledRef.current) {
|
||||
teardownAudio();
|
||||
stopTracks();
|
||||
startingRef.current = false;
|
||||
setStatus("idle");
|
||||
return;
|
||||
}
|
||||
|
||||
let source: MediaStreamAudioSourceNode;
|
||||
let worklet: AudioWorkletNode;
|
||||
try {
|
||||
source = audioContext.createMediaStreamSource(stream);
|
||||
worklet = new AudioWorkletNode(audioContext, "pcm16-worklet");
|
||||
sourceRef.current = source;
|
||||
workletRef.current = worklet;
|
||||
// MediaStreamSource → worklet → destination. The worklet emits silence, so
|
||||
// connecting to destination drives the render graph without echoing the mic.
|
||||
source.connect(worklet);
|
||||
worklet.connect(audioContext.destination);
|
||||
} catch (err) {
|
||||
console.error("[realtime-dictation] audio graph wiring failed", err);
|
||||
teardownAudio();
|
||||
stopTracks();
|
||||
const detail = (err as { message?: string })?.message ?? String(err);
|
||||
notifications.show({
|
||||
color: "red",
|
||||
message: `${t("Could not start recording")}: ${detail}`,
|
||||
});
|
||||
setStatus("idle");
|
||||
startingRef.current = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// Each worklet message is a PCM16 ArrayBuffer. Forward it once the upstream
|
||||
// session is ready; until then buffer so no leading audio is dropped.
|
||||
worklet.port.onmessage = (event: MessageEvent) => {
|
||||
if (canceledRef.current) return;
|
||||
const buf = event.data as ArrayBuffer;
|
||||
if (!(buf instanceof ArrayBuffer)) return;
|
||||
if (readyRef.current && clientRef.current) {
|
||||
clientRef.current.sendAudio(buf);
|
||||
} else {
|
||||
pendingAudioRef.current.push(buf);
|
||||
}
|
||||
};
|
||||
|
||||
// Wire the realtime transport. The server replies `ready` once the upstream
|
||||
// STT session is live; we then flush any buffered audio.
|
||||
const client = new RealtimeDictationClient({
|
||||
onReady: () => {
|
||||
if (canceledRef.current) return;
|
||||
readyRef.current = true;
|
||||
const pending = pendingAudioRef.current;
|
||||
pendingAudioRef.current = [];
|
||||
for (const buf of pending) clientRef.current?.sendAudio(buf);
|
||||
},
|
||||
onInterim: (_itemId, text) => {
|
||||
if (canceledRef.current) return;
|
||||
optionsRef.current.onInterim(text);
|
||||
},
|
||||
onFinal: (_itemId, text) => {
|
||||
if (canceledRef.current) return;
|
||||
const trimmed = text.trim();
|
||||
if (trimmed.length > 0) optionsRef.current.onFinal(trimmed);
|
||||
},
|
||||
onError: (message) => {
|
||||
handleError(message);
|
||||
},
|
||||
onClosed: () => {
|
||||
// The server ended the session (idle/max-duration or graceful upstream
|
||||
// close). Skip if a cancel already tore everything down, or if an error
|
||||
// path already owns the status (its error→idle timer is pending), or if a
|
||||
// local stop already cleared the live refs. Otherwise tear down the capture
|
||||
// graph + socket and return to idle so the mic/AudioContext don't leak and
|
||||
// the button doesn't stay stuck on "recording".
|
||||
if (canceledRef.current) return;
|
||||
if (errorTimerRef.current !== null) return;
|
||||
if (
|
||||
!clientRef.current &&
|
||||
!audioContextRef.current &&
|
||||
!streamRef.current
|
||||
) {
|
||||
return;
|
||||
}
|
||||
teardown();
|
||||
setStatus("idle");
|
||||
},
|
||||
});
|
||||
clientRef.current = client;
|
||||
|
||||
// Notify the caller right when capture begins (before opening the socket) so
|
||||
// the editor can snapshot the caret position.
|
||||
try {
|
||||
optionsRef.current.onStart?.();
|
||||
} catch (err) {
|
||||
console.error("[realtime-dictation] onStart callback threw", err);
|
||||
}
|
||||
|
||||
// Open the socket, then ask the server to open the upstream session. The
|
||||
// language hint is the base subtag of the resolved UI language (e.g. "en-US"
|
||||
// → "en"), since the upstream transcription model expects an ISO language
|
||||
// code, not a region-tagged locale; the server omits it upstream when absent.
|
||||
client.connect();
|
||||
const locale = i18n.resolvedLanguage || i18n.language || "";
|
||||
const language = locale.split("-")[0] || undefined;
|
||||
client.start({ language });
|
||||
|
||||
setStatus("recording");
|
||||
// Capture has truly begun; release the synchronous start guard.
|
||||
startingRef.current = false;
|
||||
|
||||
const maxDurationMs = optionsRef.current.maxDurationMs ?? 120000;
|
||||
timerRef.current = setTimeout(() => {
|
||||
// Reuse stop() so the upstream is flushed/closed gracefully.
|
||||
stopRef.current?.();
|
||||
}, maxDurationMs);
|
||||
}, [status, t, i18n, stopTracks, teardownAudio, handleError]);
|
||||
|
||||
const stop = useCallback((): void => {
|
||||
// Nothing live → no-op (never crash on an idle/destroyed state).
|
||||
if (
|
||||
!clientRef.current &&
|
||||
!audioContextRef.current &&
|
||||
!streamRef.current &&
|
||||
!startingRef.current
|
||||
) {
|
||||
return;
|
||||
}
|
||||
teardown();
|
||||
setStatus("idle");
|
||||
}, [teardown]);
|
||||
|
||||
// Keep the stop ref pointed at the latest stop() for the max-duration timer.
|
||||
stopRef.current = stop;
|
||||
|
||||
const cancel = useCallback((): void => {
|
||||
// Mark canceled first so any late socket/worklet callbacks are ignored.
|
||||
canceledRef.current = true;
|
||||
teardown();
|
||||
setStatus("idle");
|
||||
}, [teardown]);
|
||||
|
||||
// Clean up on unmount: stop tracks, close the context/worklet, disconnect the
|
||||
// socket, and clear timers.
|
||||
useEffect(() => {
|
||||
return () => {
|
||||
canceledRef.current = true;
|
||||
if (errorTimerRef.current !== null) {
|
||||
clearTimeout(errorTimerRef.current);
|
||||
errorTimerRef.current = null;
|
||||
}
|
||||
teardown();
|
||||
};
|
||||
}, [teardown]);
|
||||
|
||||
return { status, start, stop, cancel };
|
||||
}
|
||||
@@ -0,0 +1,124 @@
|
||||
import { io, Socket } from "socket.io-client";
|
||||
import { SOCKET_URL } from "@/features/websocket/types";
|
||||
|
||||
// Handlers the hook supplies; the client translates the normalized `/ai-realtime`
|
||||
// Socket.IO events into these callbacks. The client itself owns no React state —
|
||||
// it is a thin transport wrapper so the hook can stay focused on the audio graph.
|
||||
export interface RealtimeDictationHandlers {
|
||||
// Upstream STT session is established; safe to start sending audio.
|
||||
onReady: () => void;
|
||||
// Latest partial transcript for the current (not-yet-final) segment.
|
||||
onInterim: (itemId: string, text: string) => void;
|
||||
// A completed segment's transcript.
|
||||
onFinal: (itemId: string, text: string) => void;
|
||||
// Concrete failure reason (connect error or server-surfaced error).
|
||||
onError: (message: string) => void;
|
||||
// Session ended (graceful stop or upstream closed).
|
||||
onClosed: () => void;
|
||||
}
|
||||
|
||||
interface StartOptions {
|
||||
language?: string;
|
||||
}
|
||||
|
||||
// Wraps the dedicated `/ai-realtime` Socket.IO namespace. Cookie-based auth rides
|
||||
// the handshake via `withCredentials` (no bearer token), exactly like the main
|
||||
// app socket. `autoConnect: false` lets the hook wire listeners up before the
|
||||
// handshake fires so no early event is missed.
|
||||
export class RealtimeDictationClient {
|
||||
private socket: Socket | null = null;
|
||||
// onError must fire at most once per session: the server `error` and socket
|
||||
// `connect_error` can both arrive (e.g. an error then a failed reconnect), but
|
||||
// the hook owns the error→idle flow and a second call would double-fire it.
|
||||
private erroredFlag = false;
|
||||
|
||||
constructor(private readonly handlers: RealtimeDictationHandlers) {}
|
||||
|
||||
// Forward the first error reason only; later error/connect_error are swallowed.
|
||||
private emitError(message: string): void {
|
||||
if (this.erroredFlag) return;
|
||||
this.erroredFlag = true;
|
||||
this.handlers.onError(message);
|
||||
}
|
||||
|
||||
// Create the socket, register listeners, then open the connection. Safe to call
|
||||
// once per client instance; a second call is a no-op while a socket exists.
|
||||
connect(): void {
|
||||
if (this.socket) return;
|
||||
// Fresh socket → allow onError to fire again for this connection.
|
||||
this.erroredFlag = false;
|
||||
|
||||
// SOCKET_URL is undefined in this app (socket.io derives the page origin), so
|
||||
// the `/ai-realtime` namespace rides the same `/socket.io` path as the main
|
||||
// socket — which the Vite dev server proxies as a websocket.
|
||||
const socket: Socket = SOCKET_URL
|
||||
? io(`${SOCKET_URL}/ai-realtime`, {
|
||||
transports: ["websocket"],
|
||||
withCredentials: true,
|
||||
autoConnect: false,
|
||||
})
|
||||
: io("/ai-realtime", {
|
||||
transports: ["websocket"],
|
||||
withCredentials: true,
|
||||
autoConnect: false,
|
||||
});
|
||||
|
||||
this.socket = socket;
|
||||
|
||||
socket.on("ready", () => this.handlers.onReady());
|
||||
|
||||
socket.on("interim", (payload: { itemId: string; text: string }) => {
|
||||
this.handlers.onInterim(payload?.itemId ?? "", payload?.text ?? "");
|
||||
});
|
||||
|
||||
socket.on("final", (payload: { itemId: string; text: string }) => {
|
||||
this.handlers.onFinal(payload?.itemId ?? "", payload?.text ?? "");
|
||||
});
|
||||
|
||||
socket.on("error", (payload: { message?: string } | string) => {
|
||||
const message =
|
||||
typeof payload === "string"
|
||||
? payload
|
||||
: payload?.message || "Realtime dictation error";
|
||||
this.emitError(message);
|
||||
});
|
||||
|
||||
socket.on("closed", () => this.handlers.onClosed());
|
||||
|
||||
// Low-level transport failure (handshake/auth/proxy). Surface a concrete cause.
|
||||
socket.on("connect_error", (err: Error) => {
|
||||
const message = err?.message
|
||||
? `Realtime connection failed: ${err.message}`
|
||||
: "Realtime connection failed";
|
||||
this.emitError(message);
|
||||
});
|
||||
|
||||
socket.connect();
|
||||
}
|
||||
|
||||
// Ask the server to resolve config and open the upstream STT session.
|
||||
start(opts: StartOptions): void {
|
||||
this.socket?.emit("start", { language: opts.language });
|
||||
}
|
||||
|
||||
// Forward a raw PCM16 chunk; socket.io serializes the ArrayBuffer as binary.
|
||||
sendAudio(buf: ArrayBuffer): void {
|
||||
this.socket?.emit("audio", buf);
|
||||
}
|
||||
|
||||
// Request a graceful flush/close of the upstream session.
|
||||
stop(): void {
|
||||
this.socket?.emit("stop");
|
||||
}
|
||||
|
||||
// Tear down: drop every listener and close the connection. Idempotent.
|
||||
disconnect(): void {
|
||||
const socket = this.socket;
|
||||
if (!socket) return;
|
||||
this.socket = null;
|
||||
// Reset so a subsequent connect() on a reused instance can error again.
|
||||
this.erroredFlag = false;
|
||||
socket.removeAllListeners();
|
||||
socket.disconnect();
|
||||
}
|
||||
}
|
||||
@@ -1,12 +1,21 @@
|
||||
import { FC, useRef } from "react";
|
||||
import type { Editor } from "@tiptap/react";
|
||||
import { useAtomValue } from "jotai";
|
||||
import { MicButton } from "@/features/dictation/components/mic-button";
|
||||
import { RealtimeMicButton } from "@/features/dictation/components/realtime-mic-button";
|
||||
import { workspaceAtom } from "@/features/user/atoms/current-user-atom";
|
||||
import {
|
||||
setDictationInterim,
|
||||
clearDictationInterim,
|
||||
} from "@/features/editor/extensions/dictation-interim/dictation-interim.ts";
|
||||
|
||||
interface Props {
|
||||
editor: Editor;
|
||||
}
|
||||
|
||||
export const DictationGroup: FC<Props> = ({ editor }) => {
|
||||
const workspace = useAtomValue(workspaceAtom);
|
||||
const isRealtime = workspace?.settings?.ai?.dictationRealtime === true;
|
||||
const rangeRef = useRef<{ from: number; to: number } | null>(null);
|
||||
|
||||
const handleStart = () => {
|
||||
@@ -50,6 +59,33 @@ export const DictationGroup: FC<Props> = ({ editor }) => {
|
||||
}
|
||||
};
|
||||
|
||||
// Realtime path: commit each final segment at the LIVE caret (inserts happen
|
||||
// during recording, so no fixed snapshot is needed); interim is shown via the
|
||||
// ghost decoration only.
|
||||
if (isRealtime) {
|
||||
return (
|
||||
<RealtimeMicButton
|
||||
size="md"
|
||||
disabled={!editor.isEditable}
|
||||
onStart={() => {
|
||||
if (editor && !editor.isDestroyed) clearDictationInterim(editor);
|
||||
}}
|
||||
onInterim={(text) => {
|
||||
if (editor && !editor.isDestroyed) setDictationInterim(editor, text);
|
||||
}}
|
||||
onFinal={(text) => {
|
||||
if (!editor || editor.isDestroyed) return;
|
||||
clearDictationInterim(editor);
|
||||
try {
|
||||
editor.chain().focus().insertContent(`${text} `).run();
|
||||
} catch {
|
||||
// The editor may have been destroyed mid-stream; ignore.
|
||||
}
|
||||
}}
|
||||
/>
|
||||
);
|
||||
}
|
||||
|
||||
return (
|
||||
<MicButton
|
||||
size="md"
|
||||
|
||||
@@ -0,0 +1,97 @@
|
||||
import { Extension } from "@tiptap/core";
|
||||
import type { Editor } from "@tiptap/core";
|
||||
import { Plugin, PluginKey } from "@tiptap/pm/state";
|
||||
import { Decoration, DecorationSet } from "@tiptap/pm/view";
|
||||
|
||||
// Plugin key shared by the extension and the imperative helpers below so they
|
||||
// dispatch/read the same plugin state.
|
||||
const dictationInterimKey = new PluginKey<DictationInterimState>(
|
||||
"dictationInterim",
|
||||
);
|
||||
|
||||
interface DictationInterimState {
|
||||
// The current interim (partial) transcript. Empty string means "no ghost".
|
||||
text: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* B2 editor decoration: shows the realtime interim (partial) transcript as a
|
||||
* ghost widget at the caret. The interim is held ONLY in plugin meta state and
|
||||
* rendered as a widget Decoration — it is NEVER written into the document, so
|
||||
* it produces no Yjs update and no history entry. Only final segments are
|
||||
* committed (by the dictation-group / chat consumers).
|
||||
*/
|
||||
export const DictationInterim = Extension.create({
|
||||
name: "dictationInterim",
|
||||
|
||||
addProseMirrorPlugins() {
|
||||
return [
|
||||
new Plugin<DictationInterimState>({
|
||||
key: dictationInterimKey,
|
||||
state: {
|
||||
init: (): DictationInterimState => ({ text: "" }),
|
||||
apply: (tr, value): DictationInterimState => {
|
||||
const meta = tr.getMeta(dictationInterimKey) as
|
||||
| DictationInterimState
|
||||
| undefined;
|
||||
// Meta-only updates replace the interim text; everything else keeps
|
||||
// the existing value (it follows the caret on its own since the
|
||||
// decoration is recomputed against the live selection).
|
||||
if (meta) {
|
||||
return { text: meta.text };
|
||||
}
|
||||
return value;
|
||||
},
|
||||
},
|
||||
props: {
|
||||
decorations(state) {
|
||||
const pluginState = dictationInterimKey.getState(state);
|
||||
const text = pluginState?.text ?? "";
|
||||
if (!text) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Render the interim as an inline ghost at the caret. Inline styles
|
||||
// keep this self-contained — no global CSS is required.
|
||||
const widget = Decoration.widget(
|
||||
state.selection.head,
|
||||
() => {
|
||||
const span = document.createElement("span");
|
||||
span.textContent = text;
|
||||
span.setAttribute("contenteditable", "false");
|
||||
span.style.opacity = "0.5";
|
||||
span.style.fontStyle = "italic";
|
||||
return span;
|
||||
},
|
||||
{ side: 1, ignoreSelection: true },
|
||||
);
|
||||
|
||||
return DecorationSet.create(state.doc, [widget]);
|
||||
},
|
||||
},
|
||||
}),
|
||||
];
|
||||
},
|
||||
});
|
||||
|
||||
/**
|
||||
* Set the interim ghost text via a META-ONLY transaction — no doc steps, so it
|
||||
* generates no Yjs update and no history entry.
|
||||
*/
|
||||
export function setDictationInterim(editor: Editor, text: string): void {
|
||||
editor.view.dispatch(
|
||||
editor.state.tr.setMeta(dictationInterimKey, { text }),
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear the interim ghost text via a META-ONLY transaction (same no-op-on-doc
|
||||
* guarantee as setDictationInterim).
|
||||
*/
|
||||
export function clearDictationInterim(editor: Editor): void {
|
||||
editor.view.dispatch(
|
||||
editor.state.tr.setMeta(dictationInterimKey, { text: "" }),
|
||||
);
|
||||
}
|
||||
|
||||
export default DictationInterim;
|
||||
@@ -123,6 +123,7 @@ import { countWords } from "alfaaz";
|
||||
import AutoJoiner from "@/features/editor/extensions/autojoiner.ts";
|
||||
import GlobalDragHandle from "@/features/editor/extensions/drag-handle.ts";
|
||||
import { CleanStyles } from "@/features/editor/extensions/clean-styles.ts";
|
||||
import { DictationInterim } from "@/features/editor/extensions/dictation-interim/dictation-interim.ts";
|
||||
|
||||
const lowlight = createLowlight(common);
|
||||
lowlight.register("mermaid", plaintext);
|
||||
@@ -343,6 +344,7 @@ export const mainExtensions = [
|
||||
},
|
||||
}),
|
||||
Selection,
|
||||
DictationInterim,
|
||||
Attachment.configure({
|
||||
view: AttachmentView,
|
||||
}),
|
||||
|
||||
@@ -32,6 +32,7 @@ import {
|
||||
useAiSettingsQuery,
|
||||
useReindexAiEmbeddingsMutation,
|
||||
useTestAiConnectionMutation,
|
||||
useTestRealtimeConnectionMutation,
|
||||
useUpdateAiSettingsMutation,
|
||||
} from "@/features/workspace/queries/ai-settings-query.ts";
|
||||
import {
|
||||
@@ -62,6 +63,10 @@ const formSchema = z.object({
|
||||
// STT-specific fields. Empty base URL / key fall back to the chat ones.
|
||||
sttModel: z.string(),
|
||||
sttBaseUrl: z.string(),
|
||||
// Realtime (streaming) STT fields. Empty model falls back to sttModel and
|
||||
// empty base URL falls back to the STT base URL server-side.
|
||||
sttRealtimeModel: z.string(),
|
||||
sttRealtimeBaseUrl: z.string(),
|
||||
sttApiStyle: z.enum(["multipart", "json"]),
|
||||
sttApiKey: z.string(),
|
||||
});
|
||||
@@ -176,6 +181,8 @@ export default function AiProviderSettings() {
|
||||
const chatTest = useTestAiConnectionMutation();
|
||||
const embedTest = useTestAiConnectionMutation();
|
||||
const sttTest = useTestAiConnectionMutation();
|
||||
// Realtime probe hits a separate /ai-chat/realtime/test route (admin-gated).
|
||||
const realtimeTest = useTestRealtimeConnectionMutation();
|
||||
|
||||
// Agent roles drive the public-share assistant identity picker. Admin-gated
|
||||
// (the component returns early for non-admins), same as the AI settings query.
|
||||
@@ -192,6 +199,8 @@ export default function AiProviderSettings() {
|
||||
const [dictationEnabled, setDictationEnabled] = useState<boolean>(
|
||||
workspace?.settings?.ai?.dictation ?? false,
|
||||
);
|
||||
const [realtimeDictationEnabled, setRealtimeDictationEnabled] =
|
||||
useState<boolean>(workspace?.settings?.ai?.dictationRealtime ?? false);
|
||||
const [publicShareAssistantEnabled, setPublicShareAssistantEnabled] =
|
||||
useState<boolean>(
|
||||
workspace?.settings?.ai?.publicShareAssistant ?? false,
|
||||
@@ -199,6 +208,10 @@ export default function AiProviderSettings() {
|
||||
const [chatToggleLoading, setChatToggleLoading] = useState(false);
|
||||
const [searchToggleLoading, setSearchToggleLoading] = useState(false);
|
||||
const [dictationToggleLoading, setDictationToggleLoading] = useState(false);
|
||||
const [
|
||||
realtimeDictationToggleLoading,
|
||||
setRealtimeDictationToggleLoading,
|
||||
] = useState(false);
|
||||
const [
|
||||
publicShareAssistantToggleLoading,
|
||||
setPublicShareAssistantToggleLoading,
|
||||
@@ -232,6 +245,8 @@ export default function AiProviderSettings() {
|
||||
embeddingApiKey: "",
|
||||
sttModel: "",
|
||||
sttBaseUrl: "",
|
||||
sttRealtimeModel: "",
|
||||
sttRealtimeBaseUrl: "",
|
||||
sttApiStyle: "multipart" as SttApiStyle,
|
||||
sttApiKey: "",
|
||||
},
|
||||
@@ -253,6 +268,8 @@ export default function AiProviderSettings() {
|
||||
embeddingApiKey: "",
|
||||
sttModel: settings.sttModel ?? "",
|
||||
sttBaseUrl: settings.sttBaseUrl ?? "",
|
||||
sttRealtimeModel: settings.sttRealtimeModel ?? "",
|
||||
sttRealtimeBaseUrl: settings.sttRealtimeBaseUrl ?? "",
|
||||
sttApiStyle: settings.sttApiStyle ?? "multipart",
|
||||
sttApiKey: "",
|
||||
});
|
||||
@@ -287,6 +304,10 @@ export default function AiProviderSettings() {
|
||||
// server-side.
|
||||
sttModel: values.sttModel,
|
||||
sttBaseUrl: values.sttBaseUrl,
|
||||
// Realtime STT: empty model falls back to sttModel, empty base URL falls
|
||||
// back to the STT base URL server-side.
|
||||
sttRealtimeModel: values.sttRealtimeModel,
|
||||
sttRealtimeBaseUrl: values.sttRealtimeBaseUrl,
|
||||
sttApiStyle: values.sttApiStyle,
|
||||
};
|
||||
|
||||
@@ -434,6 +455,35 @@ export default function AiProviderSettings() {
|
||||
}
|
||||
}
|
||||
|
||||
// Optimistic toggle for the "Realtime dictation" feature
|
||||
// (settings.ai.dictationRealtime). Layered on top of batch dictation.
|
||||
async function handleToggleRealtimeDictation(value: boolean) {
|
||||
setRealtimeDictationToggleLoading(true);
|
||||
const previous = realtimeDictationEnabled;
|
||||
setRealtimeDictationEnabled(value);
|
||||
try {
|
||||
const updated = await updateWorkspace({ aiDictationRealtime: value });
|
||||
setWorkspace({
|
||||
...updated,
|
||||
settings: {
|
||||
...updated.settings,
|
||||
ai: { ...updated.settings?.ai, dictationRealtime: value },
|
||||
},
|
||||
});
|
||||
notifications.show({ message: t("Updated successfully") });
|
||||
} catch (err) {
|
||||
setRealtimeDictationEnabled(previous);
|
||||
const message = (err as { response?: { data?: { message?: string } } })
|
||||
?.response?.data?.message;
|
||||
notifications.show({
|
||||
message: message ?? t("Failed to update data"),
|
||||
color: "red",
|
||||
});
|
||||
} finally {
|
||||
setRealtimeDictationToggleLoading(false);
|
||||
}
|
||||
}
|
||||
|
||||
// Optimistic toggle for the anonymous public-share AI assistant
|
||||
// (settings.ai.publicShareAssistant). When off, the public endpoint 404s.
|
||||
async function handleTogglePublicShareAssistant(value: boolean) {
|
||||
@@ -853,13 +903,24 @@ export default function AiProviderSettings() {
|
||||
<StatusDot status={sttStatus} label={cardStatusLabel(sttStatus, t)} />
|
||||
<Text fw={600}>{t("Voice / STT")}</Text>
|
||||
</Group>
|
||||
<Switch
|
||||
label={t("Voice dictation")}
|
||||
labelPosition="left"
|
||||
checked={dictationEnabled}
|
||||
disabled={dictationToggleLoading}
|
||||
onChange={(e) => handleToggleDictation(e.currentTarget.checked)}
|
||||
/>
|
||||
<Group gap="md" align="center" wrap="nowrap">
|
||||
<Switch
|
||||
label={t("Voice dictation")}
|
||||
labelPosition="left"
|
||||
checked={dictationEnabled}
|
||||
disabled={dictationToggleLoading}
|
||||
onChange={(e) => handleToggleDictation(e.currentTarget.checked)}
|
||||
/>
|
||||
<Switch
|
||||
label={t("Realtime dictation")}
|
||||
labelPosition="left"
|
||||
checked={realtimeDictationEnabled}
|
||||
disabled={realtimeDictationToggleLoading}
|
||||
onChange={(e) =>
|
||||
handleToggleRealtimeDictation(e.currentTarget.checked)
|
||||
}
|
||||
/>
|
||||
</Group>
|
||||
</Group>
|
||||
<Text size="xs" c="dimmed" mt={4} mb="md">
|
||||
{t(
|
||||
@@ -954,6 +1015,58 @@ export default function AiProviderSettings() {
|
||||
</Text>
|
||||
))}
|
||||
</Group>
|
||||
|
||||
{/* Realtime (streaming) dictation: layered on top of batch STT and only
|
||||
shown when the workspace toggle is on. Model falls back to the STT
|
||||
model and the endpoint falls back to the STT base URL server-side. */}
|
||||
{realtimeDictationEnabled && (
|
||||
<>
|
||||
<Text size="xs" c="dimmed" mt="md" mb="xs">
|
||||
{t(
|
||||
"Streams audio live and inserts text as you speak (requires an OpenAI-compatible Realtime endpoint)",
|
||||
)}
|
||||
</Text>
|
||||
|
||||
<TextInput
|
||||
label={t("Realtime model")}
|
||||
placeholder="gpt-4o-mini-transcribe"
|
||||
disabled={isLoading}
|
||||
{...form.getInputProps("sttRealtimeModel")}
|
||||
/>
|
||||
|
||||
<TextInput
|
||||
mt="sm"
|
||||
label={t("Realtime endpoint")}
|
||||
description={t(
|
||||
"Leave empty to use the STT base URL",
|
||||
)}
|
||||
placeholder={t("Leave empty to use the STT base URL")}
|
||||
disabled={isLoading}
|
||||
{...form.getInputProps("sttRealtimeBaseUrl")}
|
||||
/>
|
||||
|
||||
<Group mt="md" align="center">
|
||||
<Button
|
||||
variant="default"
|
||||
size="sm"
|
||||
loading={realtimeTest.isPending}
|
||||
onClick={() => realtimeTest.mutate()}
|
||||
>
|
||||
{t("Test endpoint")}
|
||||
</Button>
|
||||
{realtimeTest.data &&
|
||||
(realtimeTest.data.ok ? (
|
||||
<Text size="sm" c="green">
|
||||
{t("Connection successful")}
|
||||
</Text>
|
||||
) : (
|
||||
<Text size="sm" c="red">
|
||||
{realtimeTest.data.error || t("Connection failed")}
|
||||
</Text>
|
||||
))}
|
||||
</Group>
|
||||
</>
|
||||
)}
|
||||
</Paper>
|
||||
|
||||
{/* Nested: external MCP tools the agent calls out to */}
|
||||
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
getAiSettings,
|
||||
updateAiSettings,
|
||||
testAiConnection,
|
||||
testRealtimeConnection,
|
||||
reindexAiEmbeddings,
|
||||
IAiSettings,
|
||||
IAiSettingsUpdate,
|
||||
@@ -55,6 +56,12 @@ export function useTestAiConnectionMutation() {
|
||||
});
|
||||
}
|
||||
|
||||
export function useTestRealtimeConnectionMutation() {
|
||||
return useMutation<IAiTestResult, Error, void>({
|
||||
mutationFn: () => testRealtimeConnection(),
|
||||
});
|
||||
}
|
||||
|
||||
export function useReindexAiEmbeddingsMutation() {
|
||||
const { t } = useTranslation();
|
||||
const queryClient = useQueryClient();
|
||||
|
||||
@@ -32,6 +32,8 @@ export interface IAiSettings {
|
||||
// key is stored (empty means "uses the chat API key").
|
||||
sttModel?: string;
|
||||
sttBaseUrl?: string;
|
||||
sttRealtimeModel?: string;
|
||||
sttRealtimeBaseUrl?: string;
|
||||
sttApiStyle?: SttApiStyle;
|
||||
hasSttApiKey: boolean;
|
||||
// RAG indexing coverage (pages indexed for semantic search).
|
||||
@@ -59,6 +61,8 @@ export interface IAiSettingsUpdate {
|
||||
embeddingApiKey?: string;
|
||||
sttModel?: string;
|
||||
sttBaseUrl?: string;
|
||||
sttRealtimeModel?: string;
|
||||
sttRealtimeBaseUrl?: string;
|
||||
sttApiStyle?: SttApiStyle;
|
||||
// Write-only STT key (same semantics as `apiKey` / `embeddingApiKey`).
|
||||
sttApiKey?: string;
|
||||
@@ -95,6 +99,14 @@ export async function testAiConnection(
|
||||
return req.data;
|
||||
}
|
||||
|
||||
// Probes the realtime (streaming STT) endpoint. Unlike the other tests this
|
||||
// route lives under /ai-chat (not /workspace/ai-settings); it is admin-gated
|
||||
// server-side and returns the same { ok, error? } envelope at req.data.
|
||||
export async function testRealtimeConnection(): Promise<IAiTestResult> {
|
||||
const req = await api.post<IAiTestResult>("/ai-chat/realtime/test");
|
||||
return req.data;
|
||||
}
|
||||
|
||||
export async function reindexAiEmbeddings(): Promise<IAiSettings> {
|
||||
const req = await api.post<IAiSettings>("/workspace/ai-settings/reindex");
|
||||
return req.data;
|
||||
|
||||
@@ -25,6 +25,7 @@ export interface IWorkspace {
|
||||
mcpEnabled?: boolean;
|
||||
aiChat?: boolean;
|
||||
aiDictation?: boolean;
|
||||
aiDictationRealtime?: boolean;
|
||||
aiPublicShareAssistant?: boolean;
|
||||
trashRetentionDays?: number;
|
||||
restrictApiToAdmins?: boolean;
|
||||
@@ -62,6 +63,7 @@ export interface IWorkspaceAiSettings {
|
||||
mcp?: boolean;
|
||||
chat?: boolean;
|
||||
dictation?: boolean;
|
||||
dictationRealtime?: boolean;
|
||||
publicShareAssistant?: boolean;
|
||||
}
|
||||
|
||||
|
||||
@@ -27,8 +27,14 @@ import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.rep
|
||||
import { UserThrottlerGuard } from '../../integrations/throttle/user-throttler.guard';
|
||||
import { AI_CHAT_THROTTLER } from '../../integrations/throttle/throttler-names';
|
||||
import { FileInterceptor } from '../../common/interceptors/file.interceptor';
|
||||
import WorkspaceAbilityFactory from '../casl/abilities/workspace-ability.factory';
|
||||
import {
|
||||
WorkspaceCaslAction,
|
||||
WorkspaceCaslSubject,
|
||||
} from '../casl/interfaces/workspace-ability.type';
|
||||
import { AiChatService, AiChatStreamBody } from './ai-chat.service';
|
||||
import { AiTranscriptionService } from './ai-transcription.service';
|
||||
import { AiRealtimeService } from './realtime/ai-realtime.service';
|
||||
import {
|
||||
ChatIdDto,
|
||||
GetChatMessagesDto,
|
||||
@@ -51,8 +57,23 @@ export class AiChatController {
|
||||
private readonly aiChatRepo: AiChatRepo,
|
||||
private readonly aiChatMessageRepo: AiChatMessageRepo,
|
||||
private readonly aiTranscription: AiTranscriptionService,
|
||||
private readonly aiRealtimeService: AiRealtimeService,
|
||||
private readonly workspaceAbility: WorkspaceAbilityFactory,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Admin gate, identical to AiSettingsController.assertAdmin: require the
|
||||
* workspace Manage/Settings ability (same gate as POST /workspace/update).
|
||||
*/
|
||||
private assertAdmin(user: User, workspace: Workspace): void {
|
||||
const ability = this.workspaceAbility.createForUser(user, workspace);
|
||||
if (
|
||||
ability.cannot(WorkspaceCaslAction.Manage, WorkspaceCaslSubject.Settings)
|
||||
) {
|
||||
throw new ForbiddenException();
|
||||
}
|
||||
}
|
||||
|
||||
/** List the requesting user's chats in this workspace (paginated). */
|
||||
@HttpCode(HttpStatus.OK)
|
||||
@Post('chats')
|
||||
@@ -287,6 +308,23 @@ export class AiChatController {
|
||||
return { text };
|
||||
}
|
||||
|
||||
/**
|
||||
* Admin-only "test connection" probe for the realtime STT upstream. Reuses
|
||||
* AiRealtimeService.openSession to exercise the real config/SSRF/handshake path
|
||||
* and tears the socket down immediately. The API key never leaves the server.
|
||||
* Response is the FROZEN contract { ok: true } | { ok: false, error: string }
|
||||
* (the global response transform wraps it; the client reads req.data).
|
||||
*/
|
||||
@HttpCode(HttpStatus.OK)
|
||||
@Post('realtime/test')
|
||||
async testRealtime(
|
||||
@AuthUser() user: User,
|
||||
@AuthWorkspace() workspace: Workspace,
|
||||
): Promise<{ ok: true } | { ok: false; error: string }> {
|
||||
this.assertAdmin(user, workspace);
|
||||
return this.aiRealtimeService.testConnection(workspace.id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure the chat exists, belongs to this workspace, AND was created by the
|
||||
* requesting user (per-user isolation). Throws ForbiddenException otherwise.
|
||||
|
||||
@@ -13,6 +13,8 @@ import { SearchModule } from '../search/search.module';
|
||||
import { PublicShareChatController } from './public-share-chat.controller';
|
||||
import { PublicShareChatService } from './public-share-chat.service';
|
||||
import { PublicShareChatToolsService } from './tools/public-share-chat-tools.service';
|
||||
import { AiRealtimeGateway } from './realtime/ai-realtime.gateway';
|
||||
import { AiRealtimeService } from './realtime/ai-realtime.service';
|
||||
|
||||
/**
|
||||
* Per-user AI chat module (§6.1).
|
||||
@@ -46,6 +48,11 @@ import { PublicShareChatToolsService } from './tools/public-share-chat-tools.ser
|
||||
AiChatToolsService,
|
||||
PublicShareChatService,
|
||||
PublicShareChatToolsService,
|
||||
// Realtime dictation: the Socket.IO `/ai-realtime` gateway + its upstream
|
||||
// proxy service. AiSettingsService comes from AiModule; WorkspaceRepo from
|
||||
// the global DatabaseModule; TokenService from TokenModule (both imported).
|
||||
AiRealtimeGateway,
|
||||
AiRealtimeService,
|
||||
],
|
||||
})
|
||||
export class AiChatModule {}
|
||||
|
||||
236
apps/server/src/core/ai-chat/realtime/ai-realtime.gateway.ts
Normal file
236
apps/server/src/core/ai-chat/realtime/ai-realtime.gateway.ts
Normal file
@@ -0,0 +1,236 @@
|
||||
import {
|
||||
OnGatewayConnection,
|
||||
OnGatewayDisconnect,
|
||||
SubscribeMessage,
|
||||
WebSocketGateway,
|
||||
} from '@nestjs/websockets';
|
||||
import { Logger } from '@nestjs/common';
|
||||
import { Socket } from 'socket.io';
|
||||
import * as cookie from 'cookie';
|
||||
import { TokenService } from '../../auth/services/token.service';
|
||||
import { JwtPayload, JwtType } from '../../auth/dto/jwt-payload';
|
||||
import { WorkspaceRepo } from '@docmost/db/repos/workspace/workspace.repo';
|
||||
import { AiSttNotConfiguredException } from '../../../integrations/ai/ai-stt-not-configured.exception';
|
||||
import { describeProviderError } from '../../../integrations/ai/ai-error.util';
|
||||
import {
|
||||
AiRealtimeService,
|
||||
RealtimeSessionHandle,
|
||||
} from './ai-realtime.service';
|
||||
|
||||
/**
|
||||
* Realtime dictation gateway — the server side of the FROZEN normalized
|
||||
* Socket.IO `/ai-realtime` protocol. The browser talks ONLY to this namespace;
|
||||
* the raw OpenAI GA schema and the provider key never reach the client.
|
||||
*
|
||||
* Client → server: connect (cookie-JWT auth), `start` { language? }, `audio`
|
||||
* (PCM16 binary), `stop`. Server → client: `ready`, `interim`, `final`,
|
||||
* `error`, `closed`.
|
||||
*
|
||||
* Gate (before opening upstream): the workspace must have BOTH
|
||||
* `settings.ai.dictation === true` AND `settings.ai.dictationRealtime === true`.
|
||||
* Hard concurrency caps (realtime is expensive) are enforced in-memory per user
|
||||
* and per workspace.
|
||||
*/
|
||||
|
||||
/** Realtime is expensive: one live session per user, a handful per workspace. */
|
||||
const MAX_SESSIONS_PER_USER = 1;
|
||||
const MAX_SESSIONS_PER_WORKSPACE = 5;
|
||||
|
||||
// Module-level concurrency counters. A single Node process backs the gateway;
|
||||
// these caps are best-effort within that process (a horizontally-scaled
|
||||
// deployment would need a shared store, out of scope here).
|
||||
const sessionsPerUser = new Map<string, number>();
|
||||
const sessionsPerWorkspace = new Map<string, number>();
|
||||
|
||||
function incr(map: Map<string, number>, key: string): number {
|
||||
const next = (map.get(key) ?? 0) + 1;
|
||||
map.set(key, next);
|
||||
return next;
|
||||
}
|
||||
|
||||
function decr(map: Map<string, number>, key: string): void {
|
||||
const next = (map.get(key) ?? 0) - 1;
|
||||
if (next <= 0) {
|
||||
map.delete(key);
|
||||
} else {
|
||||
map.set(key, next);
|
||||
}
|
||||
}
|
||||
|
||||
/** Per-socket state we stash on client.data. */
|
||||
interface RealtimeClientData {
|
||||
userId: string;
|
||||
workspaceId: string;
|
||||
handle?: RealtimeSessionHandle;
|
||||
// What we incremented at connect time, so disconnect decrements exactly that.
|
||||
countedUserId?: string;
|
||||
countedWorkspaceId?: string;
|
||||
}
|
||||
|
||||
@WebSocketGateway({
|
||||
namespace: '/ai-realtime',
|
||||
cors: { origin: '*' },
|
||||
transports: ['websocket'],
|
||||
})
|
||||
export class AiRealtimeGateway
|
||||
implements OnGatewayConnection, OnGatewayDisconnect
|
||||
{
|
||||
private readonly logger = new Logger(AiRealtimeGateway.name);
|
||||
|
||||
constructor(
|
||||
private readonly tokenService: TokenService,
|
||||
private readonly workspaceRepo: WorkspaceRepo,
|
||||
private readonly aiRealtimeService: AiRealtimeService,
|
||||
) {}
|
||||
|
||||
async handleConnection(client: Socket): Promise<void> {
|
||||
try {
|
||||
const cookies = cookie.parse(client.handshake.headers.cookie ?? '');
|
||||
const token: JwtPayload = await this.tokenService.verifyJwt(
|
||||
cookies['authToken'],
|
||||
JwtType.ACCESS,
|
||||
);
|
||||
|
||||
const userId = token.sub;
|
||||
const workspaceId = token.workspaceId;
|
||||
|
||||
const data = client.data as RealtimeClientData;
|
||||
data.userId = userId;
|
||||
data.workspaceId = workspaceId;
|
||||
|
||||
// Gate: realtime dictation must be enabled at the workspace level.
|
||||
const workspace = await this.workspaceRepo.findById(workspaceId);
|
||||
const settings = (workspace?.settings ?? {}) as {
|
||||
ai?: { dictation?: boolean; dictationRealtime?: boolean };
|
||||
};
|
||||
if (
|
||||
settings.ai?.dictation !== true ||
|
||||
settings.ai?.dictationRealtime !== true
|
||||
) {
|
||||
client.emit('error', {
|
||||
message: 'Realtime dictation is not enabled',
|
||||
});
|
||||
client.disconnect();
|
||||
return;
|
||||
}
|
||||
|
||||
// Hard concurrency caps (realtime is expensive). Check both before
|
||||
// incrementing either, so a rejected connection leaves the counters clean.
|
||||
const userCount = sessionsPerUser.get(userId) ?? 0;
|
||||
const workspaceCount = sessionsPerWorkspace.get(workspaceId) ?? 0;
|
||||
if (userCount >= MAX_SESSIONS_PER_USER) {
|
||||
client.emit('error', {
|
||||
message:
|
||||
'A realtime dictation session is already active for your account',
|
||||
});
|
||||
client.disconnect();
|
||||
return;
|
||||
}
|
||||
if (workspaceCount >= MAX_SESSIONS_PER_WORKSPACE) {
|
||||
client.emit('error', {
|
||||
message:
|
||||
'The maximum number of concurrent realtime dictation sessions for this workspace has been reached',
|
||||
});
|
||||
client.disconnect();
|
||||
return;
|
||||
}
|
||||
|
||||
incr(sessionsPerUser, userId);
|
||||
incr(sessionsPerWorkspace, workspaceId);
|
||||
// Remember exactly what we counted so disconnect decrements symmetrically.
|
||||
data.countedUserId = userId;
|
||||
data.countedWorkspaceId = workspaceId;
|
||||
} catch (err) {
|
||||
// Auth failure (or any unexpected connect error): never leak details.
|
||||
this.logger.error('Realtime dictation connection rejected', err as Error);
|
||||
client.emit('error', { message: 'Unauthorized' });
|
||||
client.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
@SubscribeMessage('start')
|
||||
async handleStart(
|
||||
client: Socket,
|
||||
data?: { language?: string },
|
||||
): Promise<void> {
|
||||
const state = client.data as RealtimeClientData;
|
||||
|
||||
// Guard double-start: a session is already open on this socket.
|
||||
if (state.handle) {
|
||||
client.emit('error', {
|
||||
message: 'A realtime dictation session is already in progress',
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const handle = await this.aiRealtimeService.openSession(
|
||||
state.workspaceId,
|
||||
{
|
||||
language: data?.language,
|
||||
onReady: () => client.emit('ready', {}),
|
||||
onInterim: (itemId, text) => client.emit('interim', { itemId, text }),
|
||||
onFinal: (itemId, text) => client.emit('final', { itemId, text }),
|
||||
onError: (message) => client.emit('error', { message }),
|
||||
onClosed: () => {
|
||||
// Session ended (graceful stop, idle/max-duration, or upstream close):
|
||||
// clear the handle so the double-start guard is released, then notify.
|
||||
state.handle = undefined;
|
||||
client.emit('closed', {});
|
||||
},
|
||||
},
|
||||
);
|
||||
state.handle = handle;
|
||||
} catch (err) {
|
||||
// Concrete reason to the client: a not-configured 503 vs a provider error.
|
||||
this.logger.error('Failed to open realtime dictation session', err as Error);
|
||||
const message =
|
||||
err instanceof AiSttNotConfiguredException
|
||||
? err.message
|
||||
: describeProviderError(err, 'Failed to start realtime dictation');
|
||||
client.emit('error', { message });
|
||||
}
|
||||
}
|
||||
|
||||
@SubscribeMessage('audio')
|
||||
handleAudio(client: Socket, payload: unknown): void {
|
||||
const state = client.data as RealtimeClientData;
|
||||
if (!state.handle) return;
|
||||
const chunk = AiRealtimeGateway.toBuffer(payload);
|
||||
if (!chunk) return;
|
||||
state.handle.appendAudio(chunk);
|
||||
}
|
||||
|
||||
@SubscribeMessage('stop')
|
||||
handleStop(client: Socket): void {
|
||||
const state = client.data as RealtimeClientData;
|
||||
state.handle?.stop();
|
||||
}
|
||||
|
||||
handleDisconnect(client: Socket): void {
|
||||
const state = client.data as RealtimeClientData;
|
||||
// Tear down the upstream session, then release the concurrency slots we took.
|
||||
state.handle?.close();
|
||||
state.handle = undefined;
|
||||
if (state.countedUserId) {
|
||||
decr(sessionsPerUser, state.countedUserId);
|
||||
state.countedUserId = undefined;
|
||||
}
|
||||
if (state.countedWorkspaceId) {
|
||||
decr(sessionsPerWorkspace, state.countedWorkspaceId);
|
||||
state.countedWorkspaceId = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Normalize an incoming `audio` payload to a Buffer. Socket.IO delivers binary
|
||||
* as Buffer (Node) but may also surface Uint8Array / ArrayBuffer; accept all.
|
||||
* Returns null for anything we cannot interpret as binary audio.
|
||||
*/
|
||||
private static toBuffer(payload: unknown): Buffer | null {
|
||||
if (Buffer.isBuffer(payload)) return payload;
|
||||
if (payload instanceof Uint8Array) return Buffer.from(payload);
|
||||
if (payload instanceof ArrayBuffer) return Buffer.from(payload);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,186 @@
|
||||
import { parseUpstreamEvent } from './ai-realtime.service';
|
||||
|
||||
/**
|
||||
* Unit tests for the PURE `parseUpstreamEvent` normalizer (no network). They
|
||||
* feed synthetic OpenAI GA frames through a shared per-item delta accumulator
|
||||
* and assert the normalized `/ai-realtime` outputs, including that two deltas
|
||||
* for the same item_id accumulate and that the accumulator is cleared once the
|
||||
* segment completes.
|
||||
*/
|
||||
describe('parseUpstreamEvent (OpenAI GA → normalized realtime events)', () => {
|
||||
let acc: Map<string, string>;
|
||||
|
||||
beforeEach(() => {
|
||||
acc = new Map<string, string>();
|
||||
});
|
||||
|
||||
it('maps session.created / session.updated to { type: "ready" }', () => {
|
||||
expect(parseUpstreamEvent(JSON.stringify({ type: 'session.created' }), acc)).toEqual({
|
||||
type: 'ready',
|
||||
});
|
||||
expect(parseUpstreamEvent(JSON.stringify({ type: 'session.updated' }), acc)).toEqual({
|
||||
type: 'ready',
|
||||
});
|
||||
// No accumulator side effects from session frames.
|
||||
expect(acc.size).toBe(0);
|
||||
});
|
||||
|
||||
it('accumulates two deltas for the same item_id into the interim text', () => {
|
||||
const first = parseUpstreamEvent(
|
||||
JSON.stringify({
|
||||
type: 'conversation.item.input_audio_transcription.delta',
|
||||
item_id: 'item-1',
|
||||
delta: 'Hello',
|
||||
}),
|
||||
acc,
|
||||
);
|
||||
expect(first).toEqual({ type: 'interim', itemId: 'item-1', text: 'Hello' });
|
||||
|
||||
const second = parseUpstreamEvent(
|
||||
JSON.stringify({
|
||||
type: 'conversation.item.input_audio_transcription.delta',
|
||||
item_id: 'item-1',
|
||||
delta: ' world',
|
||||
}),
|
||||
acc,
|
||||
);
|
||||
// The second delta appends to the first: the interim is the full running text.
|
||||
expect(second).toEqual({
|
||||
type: 'interim',
|
||||
itemId: 'item-1',
|
||||
text: 'Hello world',
|
||||
});
|
||||
expect(acc.get('item-1')).toBe('Hello world');
|
||||
});
|
||||
|
||||
it('emits a trimmed final from the completed transcript and clears the accumulator', () => {
|
||||
// Seed an in-flight accumulation, then complete it.
|
||||
parseUpstreamEvent(
|
||||
JSON.stringify({
|
||||
type: 'conversation.item.input_audio_transcription.delta',
|
||||
item_id: 'item-2',
|
||||
delta: 'partial',
|
||||
}),
|
||||
acc,
|
||||
);
|
||||
expect(acc.has('item-2')).toBe(true);
|
||||
|
||||
const final = parseUpstreamEvent(
|
||||
JSON.stringify({
|
||||
type: 'conversation.item.input_audio_transcription.completed',
|
||||
item_id: 'item-2',
|
||||
transcript: ' Final transcript. ',
|
||||
}),
|
||||
acc,
|
||||
);
|
||||
expect(final).toEqual({
|
||||
type: 'final',
|
||||
itemId: 'item-2',
|
||||
text: 'Final transcript.',
|
||||
});
|
||||
// The accumulator entry for the completed segment is removed.
|
||||
expect(acc.has('item-2')).toBe(false);
|
||||
});
|
||||
|
||||
it('falls back to the accumulated text when completed omits the transcript', () => {
|
||||
parseUpstreamEvent(
|
||||
JSON.stringify({
|
||||
type: 'conversation.item.input_audio_transcription.delta',
|
||||
item_id: 'item-3',
|
||||
delta: 'accumulated only',
|
||||
}),
|
||||
acc,
|
||||
);
|
||||
const final = parseUpstreamEvent(
|
||||
JSON.stringify({
|
||||
type: 'conversation.item.input_audio_transcription.completed',
|
||||
item_id: 'item-3',
|
||||
}),
|
||||
acc,
|
||||
);
|
||||
expect(final).toEqual({
|
||||
type: 'final',
|
||||
itemId: 'item-3',
|
||||
text: 'accumulated only',
|
||||
});
|
||||
expect(acc.has('item-3')).toBe(false);
|
||||
});
|
||||
|
||||
it('maps an error frame to { type: "error" } with the provider message', () => {
|
||||
const out = parseUpstreamEvent(
|
||||
JSON.stringify({
|
||||
type: 'error',
|
||||
error: { message: 'invalid_api_key', code: 'invalid', type: 'auth' },
|
||||
}),
|
||||
acc,
|
||||
);
|
||||
expect(out.type).toBe('error');
|
||||
expect(out.message).toBe('invalid_api_key');
|
||||
});
|
||||
|
||||
it('maps an unknown frame to { type: "ignore" }', () => {
|
||||
expect(
|
||||
parseUpstreamEvent(JSON.stringify({ type: 'response.created' }), acc),
|
||||
).toEqual({ type: 'ignore' });
|
||||
// An unknown frame leaves a running accumulation untouched.
|
||||
expect(acc.size).toBe(0);
|
||||
});
|
||||
|
||||
it('maps an unparseable (non-JSON) frame to { type: "ignore" }', () => {
|
||||
expect(parseUpstreamEvent('not json', acc)).toEqual({ type: 'ignore' });
|
||||
});
|
||||
|
||||
it('runs the full GA sequence end-to-end and ends with a clean accumulator', () => {
|
||||
// session.created → two deltas (same item) → completed → error → unknown.
|
||||
expect(parseUpstreamEvent(JSON.stringify({ type: 'session.created' }), acc)).toEqual({
|
||||
type: 'ready',
|
||||
});
|
||||
|
||||
expect(
|
||||
parseUpstreamEvent(
|
||||
JSON.stringify({
|
||||
type: 'conversation.item.input_audio_transcription.delta',
|
||||
item_id: 'seg',
|
||||
delta: 'one ',
|
||||
}),
|
||||
acc,
|
||||
),
|
||||
).toEqual({ type: 'interim', itemId: 'seg', text: 'one ' });
|
||||
|
||||
expect(
|
||||
parseUpstreamEvent(
|
||||
JSON.stringify({
|
||||
type: 'conversation.item.input_audio_transcription.delta',
|
||||
item_id: 'seg',
|
||||
delta: 'two',
|
||||
}),
|
||||
acc,
|
||||
),
|
||||
).toEqual({ type: 'interim', itemId: 'seg', text: 'one two' });
|
||||
|
||||
expect(
|
||||
parseUpstreamEvent(
|
||||
JSON.stringify({
|
||||
type: 'conversation.item.input_audio_transcription.completed',
|
||||
item_id: 'seg',
|
||||
transcript: 'one two',
|
||||
}),
|
||||
acc,
|
||||
),
|
||||
).toEqual({ type: 'final', itemId: 'seg', text: 'one two' });
|
||||
|
||||
expect(
|
||||
parseUpstreamEvent(
|
||||
JSON.stringify({ type: 'error', error: { message: 'boom' } }),
|
||||
acc,
|
||||
),
|
||||
).toEqual({ type: 'error', message: 'boom' });
|
||||
|
||||
expect(parseUpstreamEvent(JSON.stringify({ type: 'whatever' }), acc)).toEqual({
|
||||
type: 'ignore',
|
||||
});
|
||||
|
||||
// Every started segment was completed → the accumulator is empty.
|
||||
expect(acc.size).toBe(0);
|
||||
});
|
||||
});
|
||||
485
apps/server/src/core/ai-chat/realtime/ai-realtime.service.ts
Normal file
485
apps/server/src/core/ai-chat/realtime/ai-realtime.service.ts
Normal file
@@ -0,0 +1,485 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import WebSocket from 'ws';
|
||||
import { AiSettingsService } from '../../../integrations/ai/ai-settings.service';
|
||||
import { AiSttNotConfiguredException } from '../../../integrations/ai/ai-stt-not-configured.exception';
|
||||
import { describeProviderError } from '../../../integrations/ai/ai-error.util';
|
||||
import { isUrlAllowed } from '../external-mcp/ssrf-guard';
|
||||
|
||||
/**
|
||||
* Realtime STT proxy (server side of the A2 transport: browser ↔ OUR server ↔
|
||||
* OpenAI). The provider API key is resolved here and NEVER leaves the server /
|
||||
* NEVER logged. The client only ever sees the normalized events emitted via the
|
||||
* callbacks below — never the raw OpenAI GA schema.
|
||||
*
|
||||
* The upstream contract is the GA (2026) OpenAI Realtime transcription shape:
|
||||
* wss://<host>/v1/realtime?intent=transcription
|
||||
* header: Authorization: Bearer <sttApiKey> (NO OpenAI-Beta header in GA)
|
||||
* one session.update after open, then input_audio_buffer.append frames.
|
||||
*/
|
||||
|
||||
/** Normalized result of parsing a single raw upstream (OpenAI GA) event. */
|
||||
export interface ParsedUpstreamEvent {
|
||||
type: 'ready' | 'interim' | 'final' | 'error' | 'ignore';
|
||||
itemId?: string;
|
||||
text?: string;
|
||||
message?: string;
|
||||
}
|
||||
|
||||
/** Callbacks the gateway supplies to bridge upstream events to the client. */
|
||||
export interface OpenSessionOptions {
|
||||
/** Optional transcription language hint (e.g. 'en'); omitted from session.update when absent. */
|
||||
language?: string;
|
||||
/** Upstream session is live → client may start sending audio. */
|
||||
onReady: () => void;
|
||||
/** Latest accumulated partial text for a not-yet-final segment. */
|
||||
onInterim: (itemId: string, text: string) => void;
|
||||
/** A completed segment's final (trimmed) transcript. */
|
||||
onFinal: (itemId: string, text: string) => void;
|
||||
/** Concrete error reason for the client. */
|
||||
onError: (message: string) => void;
|
||||
/** Session ended (graceful stop or upstream close). */
|
||||
onClosed: () => void;
|
||||
}
|
||||
|
||||
/** Handle returned by openSession; the gateway drives audio/stop/close through it. */
|
||||
export interface RealtimeSessionHandle {
|
||||
/** Base64-encode a PCM16 chunk and forward as input_audio_buffer.append (if upstream OPEN). */
|
||||
appendAudio: (chunk: Buffer | Uint8Array) => void;
|
||||
/** Graceful stop: optionally commit, then close the upstream. */
|
||||
stop: () => void;
|
||||
/** Force-close the upstream and clear timers (idempotent). */
|
||||
close: () => void;
|
||||
}
|
||||
|
||||
/** No audio appended for this long → close the session with a clear reason. */
|
||||
const IDLE_TIMEOUT_MS = 15_000;
|
||||
/** Hard cap on a single realtime session's lifetime (mirrors the client's 120s). */
|
||||
const MAX_SESSION_DURATION_MS = 120_000;
|
||||
/** How long testConnection waits for the upstream to become ready before failing. */
|
||||
const TEST_CONNECTION_TIMEOUT_MS = 8_000;
|
||||
|
||||
/**
|
||||
* Parse ONE raw upstream (OpenAI GA) event JSON and normalize it, updating the
|
||||
* per-item delta accumulator `acc` in place. PURE (aside from the supplied `acc`
|
||||
* mutation) so it can be unit-tested without any network. Unknown or unparseable
|
||||
* frames normalize to { type: 'ignore' } so the proxy silently skips them.
|
||||
*
|
||||
* - session.created / session.updated → { type: 'ready' }
|
||||
* - conversation.item.input_audio_transcription.delta → append delta to
|
||||
* acc[item_id]; return { type: 'interim', itemId, text: <accumulated> }
|
||||
* - conversation.item.input_audio_transcription.completed → final transcript
|
||||
* (trimmed), delete acc[item_id]; return { type: 'final', itemId, text }
|
||||
* - error → { type: 'error', message } (provider message, else describeProviderError)
|
||||
* - anything else / unparseable → { type: 'ignore' }
|
||||
*/
|
||||
export function parseUpstreamEvent(
|
||||
raw: string,
|
||||
acc: Map<string, string>,
|
||||
): ParsedUpstreamEvent {
|
||||
let evt: {
|
||||
type?: string;
|
||||
item_id?: string;
|
||||
delta?: string;
|
||||
transcript?: string;
|
||||
error?: { message?: string; code?: string; type?: string };
|
||||
};
|
||||
try {
|
||||
evt = JSON.parse(raw);
|
||||
} catch {
|
||||
// Non-JSON frame: ignore rather than crash the proxy.
|
||||
return { type: 'ignore' };
|
||||
}
|
||||
|
||||
if (typeof evt !== 'object' || evt === null || typeof evt.type !== 'string') {
|
||||
return { type: 'ignore' };
|
||||
}
|
||||
|
||||
switch (evt.type) {
|
||||
case 'session.created':
|
||||
case 'session.updated':
|
||||
return { type: 'ready' };
|
||||
|
||||
case 'conversation.item.input_audio_transcription.delta': {
|
||||
const itemId = evt.item_id;
|
||||
if (!itemId) return { type: 'ignore' };
|
||||
const prev = acc.get(itemId) ?? '';
|
||||
const next = prev + (evt.delta ?? '');
|
||||
acc.set(itemId, next);
|
||||
return { type: 'interim', itemId, text: next };
|
||||
}
|
||||
|
||||
case 'conversation.item.input_audio_transcription.completed': {
|
||||
const itemId = evt.item_id;
|
||||
if (!itemId) return { type: 'ignore' };
|
||||
// Prefer the authoritative `transcript`; fall back to whatever we
|
||||
// accumulated from deltas if the completed frame omits it.
|
||||
const text = (evt.transcript ?? acc.get(itemId) ?? '').trim();
|
||||
acc.delete(itemId);
|
||||
return { type: 'final', itemId, text };
|
||||
}
|
||||
|
||||
case 'error': {
|
||||
// Surface the provider's concrete cause; never a generic message.
|
||||
const message =
|
||||
evt.error?.message?.trim() ||
|
||||
describeProviderError(evt.error, 'Realtime transcription error');
|
||||
return { type: 'error', message };
|
||||
}
|
||||
|
||||
default:
|
||||
return { type: 'ignore' };
|
||||
}
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class AiRealtimeService {
|
||||
private readonly logger = new Logger(AiRealtimeService.name);
|
||||
|
||||
constructor(private readonly aiSettings: AiSettingsService) {}
|
||||
|
||||
/**
|
||||
* Resolve the workspace STT config, SSRF-check the upstream, open the upstream
|
||||
* realtime WS and wire its events to the supplied callbacks. Returns a handle
|
||||
* the caller uses to push audio / stop / close. Throws
|
||||
* AiSttNotConfiguredException when no driver/STT model is configured, or a
|
||||
* plain Error (with a concrete reason) when the SSRF check fails.
|
||||
*/
|
||||
async openSession(
|
||||
workspaceId: string,
|
||||
opts: OpenSessionOptions,
|
||||
): Promise<RealtimeSessionHandle> {
|
||||
const cfg = await this.aiSettings.resolve(workspaceId);
|
||||
const model = cfg?.sttRealtimeModel || cfg?.sttModel;
|
||||
if (!cfg?.driver || !model) {
|
||||
throw new AiSttNotConfiguredException();
|
||||
}
|
||||
|
||||
const baseUrl = cfg.sttRealtimeBaseUrl || cfg.sttBaseUrl || cfg.baseUrl;
|
||||
const wssUrl = AiRealtimeService.deriveRealtimeUrl(baseUrl);
|
||||
|
||||
// SSRF check on the http(s) equivalent (ssrf-guard only allows http/https):
|
||||
// wss→https, ws→http. Re-checked here, right before connecting, to close the
|
||||
// DNS-rebinding window (same defense the external-MCP layer uses).
|
||||
const httpEquivalent = wssUrl.replace(/^wss:/i, 'https:').replace(/^ws:/i, 'http:');
|
||||
const check = await isUrlAllowed(httpEquivalent);
|
||||
if (!check.ok) {
|
||||
throw new Error(
|
||||
`Realtime endpoint blocked by SSRF guard: ${check.reason ?? 'not allowed'}`,
|
||||
);
|
||||
}
|
||||
|
||||
const key = cfg.sttApiKey;
|
||||
// Never log the key; only the (non-secret) URL is safe to log.
|
||||
this.logger.log(`Opening realtime STT session for workspace ${workspaceId}`);
|
||||
|
||||
const ws = new WebSocket(wssUrl, {
|
||||
headers: key ? { Authorization: `Bearer ${key}` } : {},
|
||||
// DO NOT send OpenAI-Beta: realtime=v1 — removed in GA.
|
||||
});
|
||||
|
||||
let closed = false;
|
||||
let idleTimer: NodeJS.Timeout | undefined;
|
||||
let maxTimer: NodeJS.Timeout | undefined;
|
||||
|
||||
const clearTimers = (): void => {
|
||||
if (idleTimer) {
|
||||
clearTimeout(idleTimer);
|
||||
idleTimer = undefined;
|
||||
}
|
||||
if (maxTimer) {
|
||||
clearTimeout(maxTimer);
|
||||
maxTimer = undefined;
|
||||
}
|
||||
};
|
||||
|
||||
// Idempotent teardown: clears timers, force-closes the upstream, fires
|
||||
// onClosed exactly once.
|
||||
const teardown = (): void => {
|
||||
if (closed) return;
|
||||
closed = true;
|
||||
clearTimers();
|
||||
try {
|
||||
if (
|
||||
ws.readyState === WebSocket.OPEN ||
|
||||
ws.readyState === WebSocket.CONNECTING
|
||||
) {
|
||||
ws.close();
|
||||
}
|
||||
} catch {
|
||||
// Ignore close races; the socket is being discarded anyway.
|
||||
}
|
||||
opts.onClosed();
|
||||
};
|
||||
|
||||
const failWith = (message: string): void => {
|
||||
if (closed) return;
|
||||
opts.onError(message);
|
||||
teardown();
|
||||
};
|
||||
|
||||
const resetIdleTimer = (): void => {
|
||||
if (closed) return;
|
||||
if (idleTimer) clearTimeout(idleTimer);
|
||||
idleTimer = setTimeout(() => {
|
||||
failWith(
|
||||
`Realtime session idle for ${IDLE_TIMEOUT_MS}ms (no audio received); closing.`,
|
||||
);
|
||||
}, IDLE_TIMEOUT_MS);
|
||||
};
|
||||
|
||||
// Hard lifetime cap, armed immediately so a never-opening or runaway session
|
||||
// is always reclaimed.
|
||||
maxTimer = setTimeout(() => {
|
||||
failWith(
|
||||
`Realtime session exceeded the maximum duration of ${MAX_SESSION_DURATION_MS}ms; closing.`,
|
||||
);
|
||||
}, MAX_SESSION_DURATION_MS);
|
||||
|
||||
// Also guard the handshake itself: if the upstream never opens / never sends,
|
||||
// the idle timer (15s) reclaims it well before the 120s max-duration cap.
|
||||
resetIdleTimer();
|
||||
|
||||
const acc = new Map<string, string>();
|
||||
|
||||
ws.on('open', () => {
|
||||
if (closed) return;
|
||||
// GA session.update: declare the transcription session, PCM16/24kHz mono
|
||||
// input, server VAD auto-segmentation, the effective model and (optional)
|
||||
// language. `language` is included only when the client supplied one.
|
||||
const transcription: { model: string; language?: string } = { model };
|
||||
if (opts.language) transcription.language = opts.language;
|
||||
const sessionUpdate = {
|
||||
type: 'session.update',
|
||||
session: {
|
||||
type: 'transcription',
|
||||
audio: {
|
||||
input: {
|
||||
format: { type: 'audio/pcm', rate: 24000 },
|
||||
turn_detection: { type: 'server_vad' },
|
||||
transcription,
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
try {
|
||||
ws.send(JSON.stringify(sessionUpdate));
|
||||
} catch (err) {
|
||||
this.logger.error('Failed to send realtime session.update', err as Error);
|
||||
failWith(describeProviderError(err, 'Failed to start realtime session'));
|
||||
return;
|
||||
}
|
||||
// Start the idle clock once the upstream is live.
|
||||
resetIdleTimer();
|
||||
});
|
||||
|
||||
ws.on('message', (data: WebSocket.RawData) => {
|
||||
if (closed) return;
|
||||
const raw = AiRealtimeService.rawDataToString(data);
|
||||
const parsed = parseUpstreamEvent(raw, acc);
|
||||
switch (parsed.type) {
|
||||
case 'ready':
|
||||
opts.onReady();
|
||||
break;
|
||||
case 'interim':
|
||||
opts.onInterim(parsed.itemId!, parsed.text ?? '');
|
||||
break;
|
||||
case 'final':
|
||||
opts.onFinal(parsed.itemId!, parsed.text ?? '');
|
||||
break;
|
||||
case 'error':
|
||||
// Log the full upstream error then surface the concrete reason.
|
||||
this.logger.error(`Realtime upstream error: ${parsed.message}`);
|
||||
failWith(parsed.message ?? 'Realtime transcription error');
|
||||
break;
|
||||
case 'ignore':
|
||||
default:
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('error', (err: Error) => {
|
||||
// Log the full error (name/message/stack); never the key/audio.
|
||||
this.logger.error('Realtime upstream socket error', err);
|
||||
failWith(describeProviderError(err, 'Realtime upstream connection error'));
|
||||
});
|
||||
|
||||
ws.on('close', (code: number, reason: Buffer) => {
|
||||
if (closed) return;
|
||||
const why = reason?.toString?.() || '';
|
||||
// An unexpected close (not via stop()/teardown) is reported as a concrete
|
||||
// reason; onClosed always fires via teardown.
|
||||
this.logger.log(
|
||||
`Realtime upstream closed (code ${code}${why ? `: ${why}` : ''})`,
|
||||
);
|
||||
if (code !== 1000) {
|
||||
failWith(
|
||||
`Realtime upstream closed (code ${code}${why ? `: ${why}` : ''}).`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
teardown();
|
||||
});
|
||||
|
||||
return {
|
||||
appendAudio: (chunk: Buffer | Uint8Array): void => {
|
||||
if (closed || ws.readyState !== WebSocket.OPEN) return;
|
||||
const audio = Buffer.from(chunk).toString('base64');
|
||||
try {
|
||||
ws.send(JSON.stringify({ type: 'input_audio_buffer.append', audio }));
|
||||
} catch (err) {
|
||||
this.logger.error('Failed to forward realtime audio chunk', err as Error);
|
||||
failWith(describeProviderError(err, 'Failed to forward audio'));
|
||||
return;
|
||||
}
|
||||
// Audio flowing again → push the idle deadline out.
|
||||
resetIdleTimer();
|
||||
},
|
||||
stop: (): void => {
|
||||
// Graceful stop: with server_vad no manual commit is required, but an
|
||||
// explicit commit flushes any buffered tail before we close.
|
||||
if (!closed && ws.readyState === WebSocket.OPEN) {
|
||||
try {
|
||||
ws.send(JSON.stringify({ type: 'input_audio_buffer.commit' }));
|
||||
} catch (err) {
|
||||
// A failed commit is non-fatal; we still close gracefully below.
|
||||
this.logger.error('Failed to commit realtime audio buffer', err as Error);
|
||||
}
|
||||
}
|
||||
teardown();
|
||||
},
|
||||
close: (): void => {
|
||||
teardown();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Admin "test connection" probe for the realtime STT upstream. Reuses
|
||||
* openSession so the real config-resolution, SSRF check and handshake path are
|
||||
* exercised, then tears the upstream socket down immediately — no audio is ever
|
||||
* sent. Resolves to the FROZEN contract { ok: true } | { ok: false, error }.
|
||||
*
|
||||
* Resolution rules (settle exactly once, guarded by `settled`):
|
||||
* - first onReady → { ok: true }
|
||||
* - first onError(message) → { ok: false, error: message }
|
||||
* - ~8s timeout → { ok: false, error: 'Realtime connection timed out' }
|
||||
* - openSession(...) throws → { ok: false, error } (AiSttNotConfigured message,
|
||||
* else describeProviderError)
|
||||
*
|
||||
* On any outcome the upstream handle is closed and the timer cleared exactly
|
||||
* once, so this never leaves a socket open. The API key is never logged.
|
||||
*/
|
||||
async testConnection(
|
||||
workspaceId: string,
|
||||
): Promise<{ ok: true } | { ok: false; error: string }> {
|
||||
return new Promise<{ ok: true } | { ok: false; error: string }>(
|
||||
(resolve) => {
|
||||
let settled = false;
|
||||
let handle: RealtimeSessionHandle | undefined;
|
||||
let timer: NodeJS.Timeout | undefined;
|
||||
|
||||
// Settle once: clear the timer, close the upstream handle, resolve.
|
||||
const finish = (
|
||||
result: { ok: true } | { ok: false; error: string },
|
||||
): void => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
timer = undefined;
|
||||
}
|
||||
try {
|
||||
handle?.close();
|
||||
} catch {
|
||||
// Ignore close races; the socket is being discarded anyway.
|
||||
}
|
||||
resolve(result);
|
||||
};
|
||||
|
||||
// Arm the timeout before opening so a never-readying upstream is reclaimed.
|
||||
timer = setTimeout(() => {
|
||||
finish({ ok: false, error: 'Realtime connection timed out' });
|
||||
}, TEST_CONNECTION_TIMEOUT_MS);
|
||||
|
||||
this.openSession(workspaceId, {
|
||||
onReady: () => finish({ ok: true }),
|
||||
onError: (message) => finish({ ok: false, error: message }),
|
||||
// No audio is ever sent; these are no-ops for the probe.
|
||||
onInterim: () => {},
|
||||
onFinal: () => {},
|
||||
onClosed: () => {},
|
||||
})
|
||||
.then((opened) => {
|
||||
handle = opened;
|
||||
// openSession may have already errored/closed synchronously before
|
||||
// we stored the handle; if we've settled, close it now.
|
||||
if (settled) {
|
||||
try {
|
||||
handle.close();
|
||||
} catch {
|
||||
// Ignore close races.
|
||||
}
|
||||
}
|
||||
})
|
||||
.catch((err: unknown) => {
|
||||
// openSession threw (AiSttNotConfiguredException or SSRF/Error)
|
||||
// before any socket was returned: surface a concrete reason.
|
||||
const error =
|
||||
err instanceof AiSttNotConfiguredException
|
||||
? err.message
|
||||
: describeProviderError(err, 'Realtime connection failed');
|
||||
finish({ ok: false, error });
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Derive the upstream realtime WSS URL from the (optional) effective base URL.
|
||||
*
|
||||
* - No base URL → OpenAI default
|
||||
* `wss://api.openai.com/v1/realtime?intent=transcription`.
|
||||
* - Otherwise: take the base origin, ensure exactly one
|
||||
* `/v1/realtime?intent=transcription` path, and upgrade the scheme to wss
|
||||
* (http→ws→wss; https→wss). A base that already ends in `/v1` (or
|
||||
* `/v1/realtime`) does not get a duplicated `/v1`.
|
||||
*/
|
||||
static deriveRealtimeUrl(baseUrl?: string): string {
|
||||
if (!baseUrl || !baseUrl.trim()) {
|
||||
return 'wss://api.openai.com/v1/realtime?intent=transcription';
|
||||
}
|
||||
|
||||
let parsed: URL;
|
||||
try {
|
||||
parsed = new URL(baseUrl.trim());
|
||||
} catch {
|
||||
// Unparseable base: fall back to the OpenAI default rather than throwing
|
||||
// here; the SSRF check on the default still applies downstream.
|
||||
return 'wss://api.openai.com/v1/realtime?intent=transcription';
|
||||
}
|
||||
|
||||
// Normalize the path: strip a trailing slash, drop an existing
|
||||
// `/realtime` suffix, ensure a single `/v1`, then append the realtime path.
|
||||
let path = parsed.pathname.replace(/\/+$/, '');
|
||||
path = path.replace(/\/realtime$/i, '');
|
||||
if (!/\/v1$/i.test(path)) {
|
||||
path = `${path}/v1`;
|
||||
}
|
||||
path = `${path}/realtime`;
|
||||
|
||||
// Scheme → wss (secure) / ws (insecure). The SSRF guard runs on the
|
||||
// http(s) equivalent before connecting.
|
||||
const scheme = parsed.protocol === 'http:' || parsed.protocol === 'ws:' ? 'ws' : 'wss';
|
||||
|
||||
return `${scheme}://${parsed.host}${path}?intent=transcription`;
|
||||
}
|
||||
|
||||
/** Normalize a ws RawData payload (Buffer | ArrayBuffer | Buffer[]) to a string. */
|
||||
private static rawDataToString(data: WebSocket.RawData): string {
|
||||
if (typeof data === 'string') return data;
|
||||
if (Buffer.isBuffer(data)) return data.toString('utf8');
|
||||
if (Array.isArray(data)) return Buffer.concat(data).toString('utf8');
|
||||
// ArrayBuffer
|
||||
return Buffer.from(data as ArrayBuffer).toString('utf8');
|
||||
}
|
||||
}
|
||||
@@ -55,6 +55,10 @@ export class UpdateWorkspaceDto extends PartialType(CreateWorkspaceDto) {
|
||||
@IsBoolean()
|
||||
aiDictation: boolean;
|
||||
|
||||
@IsOptional()
|
||||
@IsBoolean()
|
||||
aiDictationRealtime: boolean;
|
||||
|
||||
// Workspace master toggle that enables/disables the HTML embed block type.
|
||||
// Persisted at settings.htmlEmbed. ABSENT/false => OFF (default). The block
|
||||
// itself renders in a sandboxed iframe, so this is a feature switch, not a
|
||||
|
||||
@@ -511,6 +511,20 @@ export class WorkspaceService {
|
||||
);
|
||||
}
|
||||
|
||||
if (typeof updateWorkspaceDto.aiDictationRealtime !== 'undefined') {
|
||||
const prev = settingsBefore?.ai?.dictationRealtime ?? false;
|
||||
if (prev !== updateWorkspaceDto.aiDictationRealtime) {
|
||||
before.aiDictationRealtime = prev;
|
||||
after.aiDictationRealtime = updateWorkspaceDto.aiDictationRealtime;
|
||||
}
|
||||
await this.workspaceRepo.updateAiSettings(
|
||||
workspaceId,
|
||||
'dictationRealtime',
|
||||
updateWorkspaceDto.aiDictationRealtime,
|
||||
trx,
|
||||
);
|
||||
}
|
||||
|
||||
if (typeof updateWorkspaceDto.htmlEmbed !== 'undefined') {
|
||||
const prev = settingsBefore?.htmlEmbed ?? false;
|
||||
if (prev !== updateWorkspaceDto.htmlEmbed) {
|
||||
@@ -564,6 +578,7 @@ export class WorkspaceService {
|
||||
delete updateWorkspaceDto.allowMemberTemplates;
|
||||
delete updateWorkspaceDto.aiChat;
|
||||
delete updateWorkspaceDto.aiDictation;
|
||||
delete updateWorkspaceDto.aiDictationRealtime;
|
||||
delete updateWorkspaceDto.htmlEmbed;
|
||||
delete updateWorkspaceDto.trackerHead;
|
||||
delete updateWorkspaceDto.aiPublicShareAssistant;
|
||||
|
||||
@@ -239,7 +239,7 @@ export class WorkspaceRepo {
|
||||
// is a real jsonb object, never a double-encoded string. The CASE self-heals
|
||||
// workspaces whose settings.ai.provider was previously corrupted into an
|
||||
// array/string.
|
||||
const ALLOWED = ['driver', 'chatModel', 'embeddingModel', 'baseUrl', 'embeddingBaseUrl', 'sttModel', 'sttBaseUrl', 'sttApiStyle', 'systemPrompt', 'publicShareChatModel', 'publicShareAssistantRoleId'];
|
||||
const ALLOWED = ['driver', 'chatModel', 'embeddingModel', 'baseUrl', 'embeddingBaseUrl', 'sttModel', 'sttBaseUrl', 'sttRealtimeModel', 'sttRealtimeBaseUrl', 'sttApiStyle', 'systemPrompt', 'publicShareChatModel', 'publicShareAssistantRoleId'];
|
||||
const entries = Object.entries(provider).filter(
|
||||
([k, v]) => v !== undefined && ALLOWED.includes(k),
|
||||
);
|
||||
|
||||
@@ -32,6 +32,8 @@ export interface UpdateAiSettingsInput {
|
||||
embeddingApiKey?: string;
|
||||
sttModel?: string;
|
||||
sttBaseUrl?: string;
|
||||
sttRealtimeModel?: string;
|
||||
sttRealtimeBaseUrl?: string;
|
||||
sttApiStyle?: SttApiStyle;
|
||||
sttApiKey?: string;
|
||||
publicShareChatModel?: string;
|
||||
@@ -163,6 +165,10 @@ export class AiSettingsService {
|
||||
publicShareAssistantRoleId: provider.publicShareAssistantRoleId,
|
||||
embeddingModel: provider.embeddingModel,
|
||||
sttModel: provider.sttModel,
|
||||
// Raw passthrough, NO fallback; the realtime consumer falls back to
|
||||
// `sttModel` / (`sttBaseUrl` || `baseUrl`) at use time.
|
||||
sttRealtimeModel: provider.sttRealtimeModel,
|
||||
sttRealtimeBaseUrl: provider.sttRealtimeBaseUrl,
|
||||
// Plain passthrough, no fallback; the transcribe path defaults unset to
|
||||
// 'multipart' (current behavior).
|
||||
sttApiStyle: provider.sttApiStyle,
|
||||
@@ -239,6 +245,8 @@ export class AiSettingsService {
|
||||
embeddingBaseUrl: provider.embeddingBaseUrl,
|
||||
sttModel: provider.sttModel,
|
||||
sttBaseUrl: provider.sttBaseUrl,
|
||||
sttRealtimeModel: provider.sttRealtimeModel,
|
||||
sttRealtimeBaseUrl: provider.sttRealtimeBaseUrl,
|
||||
sttApiStyle: provider.sttApiStyle,
|
||||
systemPrompt: provider.systemPrompt,
|
||||
publicShareChatModel: provider.publicShareChatModel,
|
||||
@@ -278,6 +286,8 @@ export class AiSettingsService {
|
||||
'embeddingBaseUrl',
|
||||
'sttModel',
|
||||
'sttBaseUrl',
|
||||
'sttRealtimeModel',
|
||||
'sttRealtimeBaseUrl',
|
||||
'sttApiStyle',
|
||||
'systemPrompt',
|
||||
'publicShareChatModel',
|
||||
|
||||
@@ -30,6 +30,11 @@ export interface AiProviderSettings {
|
||||
sttModel?: string;
|
||||
// STT-specific base URL. Falls back to baseUrl when empty/unset.
|
||||
sttBaseUrl?: string;
|
||||
// Realtime STT model id. Falls back to `sttModel` at use time when empty/unset.
|
||||
sttRealtimeModel?: string;
|
||||
// Realtime STT base URL. Falls back to `sttBaseUrl` || `baseUrl` at use time
|
||||
// when empty/unset.
|
||||
sttRealtimeBaseUrl?: string;
|
||||
sttApiStyle?: SttApiStyle;
|
||||
systemPrompt?: string;
|
||||
// Cheap chat model id used ONLY by the anonymous public-share assistant. The
|
||||
@@ -79,6 +84,8 @@ export interface MaskedAiSettings {
|
||||
embeddingBaseUrl?: string;
|
||||
sttModel?: string;
|
||||
sttBaseUrl?: string;
|
||||
sttRealtimeModel?: string;
|
||||
sttRealtimeBaseUrl?: string;
|
||||
sttApiStyle?: SttApiStyle;
|
||||
systemPrompt?: string;
|
||||
publicShareChatModel?: string;
|
||||
|
||||
@@ -50,6 +50,14 @@ export class UpdateAiSettingsDto {
|
||||
@IsString()
|
||||
sttBaseUrl?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
sttRealtimeModel?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
sttRealtimeBaseUrl?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsIn(STT_API_STYLES)
|
||||
sttApiStyle?: SttApiStyle;
|
||||
|
||||
Reference in New Issue
Block a user