feat(dictation): add realtime streaming STT (live dictation)

Layer an optional realtime speech-to-text path on top of the existing
batch dictation, so transcribed text appears as the user speaks.

Transport A2: browser <-> our server (Socket.IO `/ai-realtime`) <->
OpenAI Realtime (raw ws). The provider API key never leaves the server;
the upstream URL is SSRF-checked before connecting; the gateway enforces
the dictation+dictationRealtime gate, cookie-JWT auth and per-user/
per-workspace concurrency caps. Implemented against the GA (2026) OpenAI
Realtime transcription contract (session.update / audio.input.format /
server_vad), not the now-removed beta shape.

Editor UI B2: interim text is shown as a meta-only ProseMirror ghost
decoration (no Yjs/history noise); only completed segments are committed.
Chat shows interim as a dimmed tail. The mic button switches realtime vs
batch by the workspace flag; batch remains the default and fallback.

Server:
- AiRealtimeService (upstream ws proxy, normalized events, idle/max-
  duration timeouts, idempotent teardown) + parseUpstreamEvent unit tests
- AiRealtimeGateway (Socket.IO `/ai-realtime`) wired into AiChatModule
- admin-gated POST /ai-chat/realtime/test connectivity probe
- config: settings.ai.dictationRealtime + provider sttRealtimeModel/
  sttRealtimeBaseUrl (realtime key reuses sttApiKey; no new secret)

Client:
- pcm16 AudioWorklet (24kHz mono PCM16), RealtimeDictationClient,
  use-realtime-dictation hook (status/start/stop/cancel + onInterim/onFinal)
- RealtimeMicButton + dictation-interim ProseMirror decoration
- editor/chat integration + AI settings UI (toggle, model, test endpoint)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
claude_code
2026-06-21 14:47:28 +03:00
committed by claude code agent 227
parent 74e2b7ad7f
commit 7db3f007cb
25 changed files with 2111 additions and 19 deletions

View File

@@ -27,8 +27,14 @@ import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.rep
import { UserThrottlerGuard } from '../../integrations/throttle/user-throttler.guard';
import { AI_CHAT_THROTTLER } from '../../integrations/throttle/throttler-names';
import { FileInterceptor } from '../../common/interceptors/file.interceptor';
import WorkspaceAbilityFactory from '../casl/abilities/workspace-ability.factory';
import {
WorkspaceCaslAction,
WorkspaceCaslSubject,
} from '../casl/interfaces/workspace-ability.type';
import { AiChatService, AiChatStreamBody } from './ai-chat.service';
import { AiTranscriptionService } from './ai-transcription.service';
import { AiRealtimeService } from './realtime/ai-realtime.service';
import {
ChatIdDto,
GetChatMessagesDto,
@@ -51,8 +57,23 @@ export class AiChatController {
private readonly aiChatRepo: AiChatRepo,
private readonly aiChatMessageRepo: AiChatMessageRepo,
private readonly aiTranscription: AiTranscriptionService,
private readonly aiRealtimeService: AiRealtimeService,
private readonly workspaceAbility: WorkspaceAbilityFactory,
) {}
/**
 * Admin gate (mirrors AiSettingsController.assertAdmin): the caller must hold
 * the workspace Manage/Settings ability, the same gate as POST /workspace/update.
 *
 * @param user - the authenticated requester
 * @param workspace - the workspace the request is scoped to
 * @throws ForbiddenException when the user cannot manage workspace settings
 */
private assertAdmin(user: User, workspace: Workspace): void {
  const denied = this.workspaceAbility
    .createForUser(user, workspace)
    .cannot(WorkspaceCaslAction.Manage, WorkspaceCaslSubject.Settings);
  if (!denied) return;
  throw new ForbiddenException();
}
/** List the requesting user's chats in this workspace (paginated). */
@HttpCode(HttpStatus.OK)
@Post('chats')
@@ -287,6 +308,23 @@ export class AiChatController {
return { text };
}
/**
 * "Test connection" probe for the realtime STT upstream, restricted to
 * workspace admins via assertAdmin. Delegates to
 * AiRealtimeService.testConnection for the requesting workspace; the API key
 * never leaves the server.
 *
 * Response is the FROZEN contract { ok: true } | { ok: false, error: string }
 * (the global response transform wraps it; the client reads req.data).
 *
 * @throws ForbiddenException when the caller lacks the Manage/Settings ability
 */
@HttpCode(HttpStatus.OK)
@Post('realtime/test')
async testRealtime(
  @AuthUser() user: User,
  @AuthWorkspace() workspace: Workspace,
): Promise<{ ok: true } | { ok: false; error: string }> {
  // Authorize first so a non-admin never triggers an upstream connection.
  this.assertAdmin(user, workspace);
  const probeResult = await this.aiRealtimeService.testConnection(
    workspace.id,
  );
  return probeResult;
}
/**
* Ensure the chat exists, belongs to this workspace, AND was created by the
* requesting user (per-user isolation). Throws ForbiddenException otherwise.

View File

@@ -13,6 +13,8 @@ import { SearchModule } from '../search/search.module';
import { PublicShareChatController } from './public-share-chat.controller';
import { PublicShareChatService } from './public-share-chat.service';
import { PublicShareChatToolsService } from './tools/public-share-chat-tools.service';
import { AiRealtimeGateway } from './realtime/ai-realtime.gateway';
import { AiRealtimeService } from './realtime/ai-realtime.service';
/**
* Per-user AI chat module (§6.1).
@@ -46,6 +48,11 @@ import { PublicShareChatToolsService } from './tools/public-share-chat-tools.ser
AiChatToolsService,
PublicShareChatService,
PublicShareChatToolsService,
// Realtime dictation: the Socket.IO `/ai-realtime` gateway + its upstream
// proxy service. AiSettingsService comes from AiModule; WorkspaceRepo from
// the global DatabaseModule; TokenService from TokenModule (both imported).
AiRealtimeGateway,
AiRealtimeService,
],
})
export class AiChatModule {}

View File

@@ -0,0 +1,236 @@
import {
OnGatewayConnection,
OnGatewayDisconnect,
SubscribeMessage,
WebSocketGateway,
} from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Socket } from 'socket.io';
import * as cookie from 'cookie';
import { TokenService } from '../../auth/services/token.service';
import { JwtPayload, JwtType } from '../../auth/dto/jwt-payload';
import { WorkspaceRepo } from '@docmost/db/repos/workspace/workspace.repo';
import { AiSttNotConfiguredException } from '../../../integrations/ai/ai-stt-not-configured.exception';
import { describeProviderError } from '../../../integrations/ai/ai-error.util';
import {
AiRealtimeService,
RealtimeSessionHandle,
} from './ai-realtime.service';
/**
* Realtime dictation gateway — the server side of the FROZEN normalized
* Socket.IO `/ai-realtime` protocol. The browser talks ONLY to this namespace;
* the raw OpenAI GA schema and the provider key never reach the client.
*
* Client → server: connect (cookie-JWT auth), `start` { language? }, `audio`
* (PCM16 binary), `stop`. Server → client: `ready`, `interim`, `final`,
* `error`, `closed`.
*
* Gate (before opening upstream): the workspace must have BOTH
* `settings.ai.dictation === true` AND `settings.ai.dictationRealtime === true`.
* Hard concurrency caps (realtime is expensive) are enforced in-memory per user
* and per workspace.
*/
/** Realtime is expensive: one live session per user, a handful per workspace. */
// Maximum number of simultaneously admitted sockets for a single user id.
const MAX_SESSIONS_PER_USER = 1;
// Maximum number of simultaneously admitted sockets for a single workspace id.
const MAX_SESSIONS_PER_WORKSPACE = 5;
// Module-level concurrency counters. A single Node process backs the gateway;
// these caps are best-effort within that process (a horizontally-scaled
// deployment would need a shared store, out of scope here).
// Keyed by user id → number of admitted sockets currently counted for that user.
const sessionsPerUser = new Map<string, number>();
// Keyed by workspace id → number of admitted sockets currently counted for it.
const sessionsPerWorkspace = new Map<string, number>();
/**
 * Bump the counter stored under `key` by one (a missing key counts as 0).
 *
 * @returns the counter value after the increment
 */
function incr(map: Map<string, number>, key: string): number {
  const updated = 1 + (map.get(key) ?? 0);
  map.set(key, updated);
  return updated;
}
/**
 * Lower the counter stored under `key` by one. A counter that would reach zero
 * (or below, including a key that was never counted) is removed from the map
 * instead of being stored, so the map only holds positive counts.
 */
function decr(map: Map<string, number>, key: string): void {
  const remaining = (map.get(key) ?? 0) - 1;
  if (remaining > 0) {
    map.set(key, remaining);
    return;
  }
  map.delete(key);
}
/** Per-socket state we stash on client.data. */
interface RealtimeClientData {
// Subject (`sub`) of the verified access token; set during handleConnection.
userId: string;
// Workspace id from the verified access token; passed to openSession on `start`.
workspaceId: string;
// Live upstream session for this socket, present only between a successful
// `start` and the session's onClosed / the socket's disconnect.
handle?: RealtimeSessionHandle;
// What we incremented at connect time, so disconnect decrements exactly that.
// Both stay undefined for a connection that was rejected before being counted.
countedUserId?: string;
countedWorkspaceId?: string;
}
@WebSocketGateway({
  namespace: '/ai-realtime',
  // NOTE(review): wildcard origin on a cookie-authenticated namespace. Browsers
  // do not apply CORS to WebSocket upgrades, so cross-site protection here
  // presumably relies on the auth cookie's SameSite policy — confirm, or
  // restrict the allowed origins.
  cors: { origin: '*' },
  transports: ['websocket'],
})
export class AiRealtimeGateway
  implements OnGatewayConnection, OnGatewayDisconnect
{
  /** Upper bound on the client-supplied language hint we forward upstream. */
  private static readonly MAX_LANGUAGE_HINT_LENGTH = 35;

  private readonly logger = new Logger(AiRealtimeGateway.name);

  /**
   * Admission outcome per socket. Registered synchronously when the connection
   * arrives so a `start` that races the (async) auth/gate/cap checks can wait
   * for the verdict instead of running with half-initialized state.
   */
  private readonly admissions = new WeakMap<Socket, Promise<boolean>>();

  /** Sockets whose `start` is still opening the upstream (double-start guard). */
  private readonly starting = new WeakSet<Socket>();

  constructor(
    private readonly tokenService: TokenService,
    private readonly workspaceRepo: WorkspaceRepo,
    private readonly aiRealtimeService: AiRealtimeService,
  ) {}

  /**
   * Connection hook: kicks off admission (auth, workspace gate, concurrency
   * caps) and records its promise before yielding, so message handlers can
   * await the result.
   */
  async handleConnection(client: Socket): Promise<void> {
    const admission = this.admit(client);
    this.admissions.set(client, admission);
    await admission;
  }

  /**
   * Authenticate the socket from its `authToken` cookie, enforce the
   * dictation + dictationRealtime workspace gate and the per-user/per-workspace
   * caps, and take the concurrency slots.
   *
   * @returns true when the socket was admitted and counted; false when it was
   *   rejected (an `error` was emitted and the socket disconnected) or had
   *   already dropped. Never rejects.
   */
  private async admit(client: Socket): Promise<boolean> {
    try {
      const cookies = cookie.parse(client.handshake.headers.cookie ?? '');
      const token: JwtPayload = await this.tokenService.verifyJwt(
        cookies['authToken'],
        JwtType.ACCESS,
      );
      const userId = token.sub;
      const workspaceId = token.workspaceId;
      const data = client.data as RealtimeClientData;
      data.userId = userId;
      data.workspaceId = workspaceId;

      // Gate: realtime dictation must be enabled at the workspace level.
      const workspace = await this.workspaceRepo.findById(workspaceId);
      const settings = (workspace?.settings ?? {}) as {
        ai?: { dictation?: boolean; dictationRealtime?: boolean };
      };
      if (
        settings.ai?.dictation !== true ||
        settings.ai?.dictationRealtime !== true
      ) {
        client.emit('error', {
          message: 'Realtime dictation is not enabled',
        });
        client.disconnect();
        return false;
      }

      // The socket may have dropped while we were awaiting above. In that case
      // handleDisconnect has ALREADY run (with nothing to release), so taking a
      // slot now would leak it for the lifetime of the process.
      if (!client.connected) return false;

      // Hard concurrency caps (realtime is expensive). Check both before
      // incrementing either, so a rejected connection leaves the counters clean.
      // No await separates the checks from the increments, so they are atomic
      // with respect to other connections in this process.
      const userCount = sessionsPerUser.get(userId) ?? 0;
      const workspaceCount = sessionsPerWorkspace.get(workspaceId) ?? 0;
      if (userCount >= MAX_SESSIONS_PER_USER) {
        client.emit('error', {
          message:
            'A realtime dictation session is already active for your account',
        });
        client.disconnect();
        return false;
      }
      if (workspaceCount >= MAX_SESSIONS_PER_WORKSPACE) {
        client.emit('error', {
          message:
            'The maximum number of concurrent realtime dictation sessions for this workspace has been reached',
        });
        client.disconnect();
        return false;
      }
      incr(sessionsPerUser, userId);
      incr(sessionsPerWorkspace, workspaceId);
      // Remember exactly what we counted so disconnect decrements symmetrically.
      data.countedUserId = userId;
      data.countedWorkspaceId = workspaceId;
      return true;
    } catch (err) {
      // Auth failure (or any unexpected connect error): never leak details.
      this.logger.error('Realtime dictation connection rejected', err as Error);
      client.emit('error', { message: 'Unauthorized' });
      client.disconnect();
      return false;
    }
  }

  /**
   * `start` handler: opens the upstream session for an admitted socket and
   * bridges its normalized events back to the client.
   *
   * @param data - optional payload; `language` is an untrusted hint and is
   *   forwarded only when it is a short non-empty string.
   */
  @SubscribeMessage('start')
  async handleStart(
    client: Socket,
    data?: { language?: string },
  ): Promise<void> {
    const state = client.data as RealtimeClientData;
    // Guard double-start: a session is already open on this socket, or another
    // `start` is still opening one (the handle is only stored after the await,
    // so without this a second rapid `start` would orphan the first upstream).
    if (state.handle || this.starting.has(client)) {
      client.emit('error', {
        message: 'A realtime dictation session is already in progress',
      });
      return;
    }
    this.starting.add(client);
    try {
      // Wait for the connect-time checks: a `start` that arrives while they are
      // still running must not bypass the workspace gate or the caps. A
      // rejected socket has already been told why and disconnected.
      const admitted = (await this.admissions.get(client)) ?? false;
      if (!admitted || !client.connected) return;

      let ended = false;
      const handle = await this.aiRealtimeService.openSession(
        state.workspaceId,
        {
          language: AiRealtimeGateway.normalizeLanguage(data?.language),
          onReady: () => client.emit('ready', {}),
          onInterim: (itemId, text) => client.emit('interim', { itemId, text }),
          onFinal: (itemId, text) => client.emit('final', { itemId, text }),
          onError: (message) => client.emit('error', { message }),
          onClosed: () => {
            // Session ended (graceful stop, idle/max-duration, or upstream close):
            // clear the handle so the double-start guard is released, then notify.
            ended = true;
            state.handle = undefined;
            client.emit('closed', {});
          },
        },
      );
      if (!client.connected) {
        // The socket dropped while the upstream was opening; handleDisconnect
        // had no handle to close, so release the upstream here.
        handle.close();
        return;
      }
      if (!ended) state.handle = handle;
    } catch (err) {
      // Concrete reason to the client: a not-configured 503 vs a provider error.
      this.logger.error('Failed to open realtime dictation session', err as Error);
      const message =
        err instanceof AiSttNotConfiguredException
          ? err.message
          : describeProviderError(err, 'Failed to start realtime dictation');
      client.emit('error', { message });
    } finally {
      this.starting.delete(client);
    }
  }

  /** `audio` handler: forwards one binary PCM16 chunk to the live session, if any. */
  @SubscribeMessage('audio')
  handleAudio(client: Socket, payload: unknown): void {
    const state = client.data as RealtimeClientData;
    if (!state.handle) return;
    const chunk = AiRealtimeGateway.toBuffer(payload);
    if (!chunk) return;
    state.handle.appendAudio(chunk);
  }

  /** `stop` handler: asks the live session (if any) to stop gracefully. */
  @SubscribeMessage('stop')
  handleStop(client: Socket): void {
    const state = client.data as RealtimeClientData;
    state.handle?.stop();
  }

  /** Disconnect hook: closes the upstream and releases any slots this socket took. */
  handleDisconnect(client: Socket): void {
    const state = client.data as RealtimeClientData;
    // Tear down the upstream session, then release the concurrency slots we took.
    state.handle?.close();
    state.handle = undefined;
    if (state.countedUserId) {
      decr(sessionsPerUser, state.countedUserId);
      state.countedUserId = undefined;
    }
    if (state.countedWorkspaceId) {
      decr(sessionsPerWorkspace, state.countedWorkspaceId);
      state.countedWorkspaceId = undefined;
    }
  }

  /**
   * Sanitize the client-supplied language hint: anything that is not a
   * non-empty string of at most MAX_LANGUAGE_HINT_LENGTH characters (after
   * trimming) is dropped so it is simply omitted from the upstream request.
   */
  private static normalizeLanguage(value: unknown): string | undefined {
    if (typeof value !== 'string') return undefined;
    const hint = value.trim();
    if (hint.length === 0) return undefined;
    if (hint.length > AiRealtimeGateway.MAX_LANGUAGE_HINT_LENGTH) {
      return undefined;
    }
    return hint;
  }

  /**
   * Normalize an incoming `audio` payload to a Buffer. Socket.IO delivers binary
   * as Buffer (Node) but may also surface Uint8Array / ArrayBuffer; accept all.
   * Returns null for anything we cannot interpret as binary audio.
   */
  private static toBuffer(payload: unknown): Buffer | null {
    if (Buffer.isBuffer(payload)) return payload;
    if (payload instanceof Uint8Array) return Buffer.from(payload);
    if (payload instanceof ArrayBuffer) return Buffer.from(payload);
    return null;
  }
}

View File

@@ -0,0 +1,186 @@
import { parseUpstreamEvent } from './ai-realtime.service';
/**
 * Unit tests for the PURE `parseUpstreamEvent` normalizer (no network). They
 * feed synthetic OpenAI GA frames through a shared per-item delta accumulator
 * and assert the normalized `/ai-realtime` outputs, including that two deltas
 * for the same item_id accumulate, that the accumulator is cleared once the
 * segment completes, and that ignored frames leave the accumulator untouched.
 */
describe('parseUpstreamEvent (OpenAI GA → normalized realtime events)', () => {
  const DELTA = 'conversation.item.input_audio_transcription.delta';
  const COMPLETED = 'conversation.item.input_audio_transcription.completed';

  let acc: Map<string, string>;

  beforeEach(() => {
    acc = new Map<string, string>();
  });

  it('maps session.created / session.updated to { type: "ready" }', () => {
    expect(
      parseUpstreamEvent(JSON.stringify({ type: 'session.created' }), acc),
    ).toEqual({ type: 'ready' });
    expect(
      parseUpstreamEvent(JSON.stringify({ type: 'session.updated' }), acc),
    ).toEqual({ type: 'ready' });
    // No accumulator side effects from session frames.
    expect(acc.size).toBe(0);
  });

  it('accumulates two deltas for the same item_id into the interim text', () => {
    const first = parseUpstreamEvent(
      JSON.stringify({ type: DELTA, item_id: 'item-1', delta: 'Hello' }),
      acc,
    );
    expect(first).toEqual({ type: 'interim', itemId: 'item-1', text: 'Hello' });

    const second = parseUpstreamEvent(
      JSON.stringify({ type: DELTA, item_id: 'item-1', delta: ' world' }),
      acc,
    );
    // The second delta appends to the first: the interim is the full running text.
    expect(second).toEqual({
      type: 'interim',
      itemId: 'item-1',
      text: 'Hello world',
    });
    expect(acc.get('item-1')).toBe('Hello world');
  });

  it('emits a trimmed final from the completed transcript and clears the accumulator', () => {
    // Seed an in-flight accumulation, then complete it.
    parseUpstreamEvent(
      JSON.stringify({ type: DELTA, item_id: 'item-2', delta: 'partial' }),
      acc,
    );
    expect(acc.has('item-2')).toBe(true);

    const final = parseUpstreamEvent(
      JSON.stringify({
        type: COMPLETED,
        item_id: 'item-2',
        transcript: ' Final transcript. ',
      }),
      acc,
    );
    expect(final).toEqual({
      type: 'final',
      itemId: 'item-2',
      text: 'Final transcript.',
    });
    // The accumulator entry for the completed segment is removed.
    expect(acc.has('item-2')).toBe(false);
  });

  it('falls back to the accumulated text when completed omits the transcript', () => {
    parseUpstreamEvent(
      JSON.stringify({
        type: DELTA,
        item_id: 'item-3',
        delta: 'accumulated only',
      }),
      acc,
    );

    const final = parseUpstreamEvent(
      JSON.stringify({ type: COMPLETED, item_id: 'item-3' }),
      acc,
    );
    expect(final).toEqual({
      type: 'final',
      itemId: 'item-3',
      text: 'accumulated only',
    });
    expect(acc.has('item-3')).toBe(false);
  });

  it('maps an error frame to { type: "error" } with the provider message', () => {
    const out = parseUpstreamEvent(
      JSON.stringify({
        type: 'error',
        error: { message: 'invalid_api_key', code: 'invalid', type: 'auth' },
      }),
      acc,
    );
    expect(out.type).toBe('error');
    expect(out.message).toBe('invalid_api_key');
  });

  it('maps an unknown frame to { type: "ignore" }', () => {
    // Seed a running accumulation so "untouched" is actually verified (an empty
    // accumulator would pass even if unknown frames wiped it).
    parseUpstreamEvent(
      JSON.stringify({ type: DELTA, item_id: 'item-4', delta: 'in flight' }),
      acc,
    );

    expect(
      parseUpstreamEvent(JSON.stringify({ type: 'response.created' }), acc),
    ).toEqual({ type: 'ignore' });

    // An unknown frame leaves a running accumulation untouched.
    expect(acc.size).toBe(1);
    expect(acc.get('item-4')).toBe('in flight');
  });

  it('maps an unparseable (non-JSON) frame to { type: "ignore" }', () => {
    expect(parseUpstreamEvent('not json', acc)).toEqual({ type: 'ignore' });
    expect(acc.size).toBe(0);
  });

  it('maps valid JSON that is not an event object to { type: "ignore" }', () => {
    for (const raw of ['42', 'null', '"text"', '[]', '{}']) {
      expect(parseUpstreamEvent(raw, acc)).toEqual({ type: 'ignore' });
    }
    expect(acc.size).toBe(0);
  });

  it('ignores transcription frames that carry no item_id', () => {
    expect(
      parseUpstreamEvent(JSON.stringify({ type: DELTA, delta: 'orphan' }), acc),
    ).toEqual({ type: 'ignore' });
    expect(
      parseUpstreamEvent(
        JSON.stringify({ type: COMPLETED, transcript: 'orphan' }),
        acc,
      ),
    ).toEqual({ type: 'ignore' });
    expect(acc.size).toBe(0);
  });

  it('runs the full GA sequence end-to-end and ends with a clean accumulator', () => {
    // session.created → two deltas (same item) → completed → error → unknown.
    expect(
      parseUpstreamEvent(JSON.stringify({ type: 'session.created' }), acc),
    ).toEqual({ type: 'ready' });
    expect(
      parseUpstreamEvent(
        JSON.stringify({ type: DELTA, item_id: 'seg', delta: 'one ' }),
        acc,
      ),
    ).toEqual({ type: 'interim', itemId: 'seg', text: 'one ' });
    expect(
      parseUpstreamEvent(
        JSON.stringify({ type: DELTA, item_id: 'seg', delta: 'two' }),
        acc,
      ),
    ).toEqual({ type: 'interim', itemId: 'seg', text: 'one two' });
    expect(
      parseUpstreamEvent(
        JSON.stringify({
          type: COMPLETED,
          item_id: 'seg',
          transcript: 'one two',
        }),
        acc,
      ),
    ).toEqual({ type: 'final', itemId: 'seg', text: 'one two' });
    expect(
      parseUpstreamEvent(
        JSON.stringify({ type: 'error', error: { message: 'boom' } }),
        acc,
      ),
    ).toEqual({ type: 'error', message: 'boom' });
    expect(
      parseUpstreamEvent(JSON.stringify({ type: 'whatever' }), acc),
    ).toEqual({ type: 'ignore' });
    // Every started segment was completed → the accumulator is empty.
    expect(acc.size).toBe(0);
  });
});

View File

@@ -0,0 +1,485 @@
import { Injectable, Logger } from '@nestjs/common';
import WebSocket from 'ws';
import { AiSettingsService } from '../../../integrations/ai/ai-settings.service';
import { AiSttNotConfiguredException } from '../../../integrations/ai/ai-stt-not-configured.exception';
import { describeProviderError } from '../../../integrations/ai/ai-error.util';
import { isUrlAllowed } from '../external-mcp/ssrf-guard';
/**
* Realtime STT proxy (server side of the A2 transport: browser ↔ OUR server ↔
* OpenAI). The provider API key is resolved here and NEVER leaves the server /
* NEVER logged. The client only ever sees the normalized events emitted via the
* callbacks below — never the raw OpenAI GA schema.
*
* The upstream contract is the GA (2026) OpenAI Realtime transcription shape:
* wss://<host>/v1/realtime?intent=transcription
* header: Authorization: Bearer <sttApiKey> (NO OpenAI-Beta header in GA)
* one session.update after open, then input_audio_buffer.append frames.
*/
/** Normalized result of parsing a single raw upstream (OpenAI GA) event. */
export interface ParsedUpstreamEvent {
// Normalized event kind; 'ignore' marks frames the proxy should skip.
type: 'ready' | 'interim' | 'final' | 'error' | 'ignore';
// Upstream item_id of the segment; set for 'interim' and 'final'.
itemId?: string;
// Running accumulated text ('interim') or trimmed transcript ('final').
text?: string;
// Error reason; set for 'error'.
message?: string;
}
/** Callbacks the gateway supplies to bridge upstream events to the client. */
export interface OpenSessionOptions {
/** Optional transcription language hint (e.g. 'en'); omitted from session.update when absent. */
language?: string;
/** Upstream session is live → client may start sending audio. */
onReady: () => void;
/** Latest accumulated partial text for a not-yet-final segment. */
onInterim: (itemId: string, text: string) => void;
/** A completed segment's final (trimmed) transcript. */
onFinal: (itemId: string, text: string) => void;
/** Concrete error reason for the client; the session is torn down right after. */
onError: (message: string) => void;
/**
 * Session ended. Fires once per session when it is torn down for any reason:
 * stop()/close() on the handle, the idle or max-duration timeout, an upstream
 * error, or the upstream closing the socket.
 */
onClosed: () => void;
}
/** Handle returned by openSession; the gateway drives audio/stop/close through it. */
export interface RealtimeSessionHandle {
/**
 * Base64-encode a PCM16 chunk and forward as input_audio_buffer.append (if upstream OPEN).
 * A no-op once the session has ended or while the upstream is not open.
 */
appendAudio: (chunk: Buffer | Uint8Array) => void;
/** Graceful stop: commits the buffered audio when the upstream is open, then closes the session. */
stop: () => void;
/** Force-close the upstream and clear timers (idempotent). */
close: () => void;
}
/**
 * No audio appended for this long → close the session with a clear reason.
 * The same timer is also armed while the upstream handshake is pending, so a
 * never-opening upstream is reclaimed after this delay too.
 */
const IDLE_TIMEOUT_MS = 15_000;
/** Hard cap on a single realtime session's lifetime (mirrors the client's 120s). */
const MAX_SESSION_DURATION_MS = 120_000;
/** How long testConnection waits for the upstream to become ready before failing. */
const TEST_CONNECTION_TIMEOUT_MS = 8_000;
/**
 * Parse ONE raw upstream (OpenAI GA) event JSON and normalize it, updating the
 * per-item delta accumulator `acc` in place. PURE (aside from the supplied `acc`
 * mutation) so it can be unit-tested without any network. Unknown or unparseable
 * frames normalize to { type: 'ignore' } so the proxy silently skips them.
 *
 * The frame is untrusted input: every field is type-checked before use, so a
 * malformed frame (e.g. a numeric `transcript` or `error.message`) can never
 * throw out of the caller's socket `message` handler.
 *
 * - session.created / session.updated → { type: 'ready' }
 * - conversation.item.input_audio_transcription.delta → append delta to
 *   acc[item_id]; return { type: 'interim', itemId, text: <accumulated> }
 * - conversation.item.input_audio_transcription.completed → final transcript
 *   (trimmed), delete acc[item_id]; return { type: 'final', itemId, text }
 * - error → { type: 'error', message } (provider message, else describeProviderError)
 * - anything else / unparseable → { type: 'ignore' }
 *
 * @param raw - one upstream frame as text
 * @param acc - running partial text per item_id; mutated in place
 * @returns the normalized event; never throws for malformed input
 */
export function parseUpstreamEvent(
  raw: string,
  acc: Map<string, string>,
): ParsedUpstreamEvent {
  let decoded: unknown;
  try {
    decoded = JSON.parse(raw);
  } catch {
    // Non-JSON frame: ignore rather than crash the proxy.
    return { type: 'ignore' };
  }
  if (typeof decoded !== 'object' || decoded === null) {
    return { type: 'ignore' };
  }
  const evt = decoded as Record<string, unknown>;
  if (typeof evt.type !== 'string') {
    return { type: 'ignore' };
  }

  // Only a non-empty string is a usable accumulator key.
  const itemId =
    typeof evt.item_id === 'string' && evt.item_id !== ''
      ? evt.item_id
      : undefined;

  switch (evt.type) {
    case 'session.created':
    case 'session.updated':
      return { type: 'ready' };

    case 'conversation.item.input_audio_transcription.delta': {
      if (itemId === undefined) return { type: 'ignore' };
      // A missing or non-string delta contributes nothing to the running text.
      const delta = typeof evt.delta === 'string' ? evt.delta : '';
      const next = (acc.get(itemId) ?? '') + delta;
      acc.set(itemId, next);
      return { type: 'interim', itemId, text: next };
    }

    case 'conversation.item.input_audio_transcription.completed': {
      if (itemId === undefined) return { type: 'ignore' };
      // Prefer the authoritative `transcript`; fall back to whatever we
      // accumulated from deltas if the completed frame omits it (or sends a
      // non-string value).
      const transcript =
        typeof evt.transcript === 'string'
          ? evt.transcript
          : (acc.get(itemId) ?? '');
      acc.delete(itemId);
      return { type: 'final', itemId, text: transcript.trim() };
    }

    case 'error': {
      // Surface the provider's concrete cause; never a generic message.
      const detail = evt.error;
      const rawMessage =
        typeof detail === 'object' && detail !== null
          ? (detail as { message?: unknown }).message
          : undefined;
      const providerMessage =
        typeof rawMessage === 'string' ? rawMessage.trim() : '';
      const message =
        providerMessage ||
        describeProviderError(detail, 'Realtime transcription error');
      return { type: 'error', message };
    }

    default:
      return { type: 'ignore' };
  }
}
@Injectable()
export class AiRealtimeService {
  /**
   * After stop() commits the buffered audio, how long the upstream is kept open
   * so the transcript of that committed tail can still be delivered.
   */
  private static readonly STOP_DRAIN_TIMEOUT_MS = 2_000;

  private readonly logger = new Logger(AiRealtimeService.name);

  constructor(private readonly aiSettings: AiSettingsService) {}

  /**
   * Resolve the workspace STT config, SSRF-check the upstream, open the upstream
   * realtime WS and wire its events to the supplied callbacks. Returns a handle
   * the caller uses to push audio / stop / close.
   *
   * Lifecycle guarantees:
   * - `onReady` fires at most once per session (the upstream announces both
   *   session.created and session.updated).
   * - `onClosed` fires exactly once, on every teardown path.
   * - `stop()` commits the buffered audio and then DRAINS: the upstream stays
   *   open until the next completed segment arrives with no partial segment
   *   pending, or STOP_DRAIN_TIMEOUT_MS elapses, so the last words are not lost.
   *   Upstream errors during the drain (e.g. committing an empty buffer) are
   *   logged but not surfaced to the client.
   *
   * @param workspaceId - workspace whose AI settings select model/endpoint/key
   * @param opts - callbacks bridging normalized events to the caller
   * @throws AiSttNotConfiguredException when no driver/STT model is configured
   * @throws Error (with a concrete reason) when the SSRF check fails
   */
  async openSession(
    workspaceId: string,
    opts: OpenSessionOptions,
  ): Promise<RealtimeSessionHandle> {
    const cfg = await this.aiSettings.resolve(workspaceId);
    const model = cfg?.sttRealtimeModel || cfg?.sttModel;
    if (!cfg?.driver || !model) {
      throw new AiSttNotConfiguredException();
    }
    const baseUrl = cfg.sttRealtimeBaseUrl || cfg.sttBaseUrl || cfg.baseUrl;
    const wssUrl = AiRealtimeService.deriveRealtimeUrl(baseUrl);

    // SSRF check on the http(s) equivalent (ssrf-guard only allows http/https):
    // wss→https, ws→http. Re-checked here, right before connecting, to close the
    // DNS-rebinding window (same defense the external-MCP layer uses).
    const httpEquivalent = wssUrl
      .replace(/^wss:/i, 'https:')
      .replace(/^ws:/i, 'http:');
    const check = await isUrlAllowed(httpEquivalent);
    if (!check.ok) {
      throw new Error(
        `Realtime endpoint blocked by SSRF guard: ${check.reason ?? 'not allowed'}`,
      );
    }

    const key = cfg.sttApiKey;
    // Never log the key; the workspace id is the only identifier logged here.
    this.logger.log(`Opening realtime STT session for workspace ${workspaceId}`);
    const ws = new WebSocket(wssUrl, {
      headers: key ? { Authorization: `Bearer ${key}` } : {},
      // DO NOT send OpenAI-Beta: realtime=v1 — removed in GA.
    });

    let closed = false;
    // True between stop() and teardown: audio is refused, errors are swallowed.
    let stopping = false;
    let readyNotified = false;
    let idleTimer: NodeJS.Timeout | undefined;
    let maxTimer: NodeJS.Timeout | undefined;
    let drainTimer: NodeJS.Timeout | undefined;

    const clearTimers = (): void => {
      if (idleTimer) {
        clearTimeout(idleTimer);
        idleTimer = undefined;
      }
      if (maxTimer) {
        clearTimeout(maxTimer);
        maxTimer = undefined;
      }
      if (drainTimer) {
        clearTimeout(drainTimer);
        drainTimer = undefined;
      }
    };

    // Idempotent teardown: clears timers, force-closes the upstream, fires
    // onClosed exactly once.
    const teardown = (): void => {
      if (closed) return;
      closed = true;
      clearTimers();
      try {
        if (
          ws.readyState === WebSocket.OPEN ||
          ws.readyState === WebSocket.CONNECTING
        ) {
          ws.close();
        }
      } catch {
        // Ignore close races; the socket is being discarded anyway.
      }
      opts.onClosed();
    };

    const failWith = (message: string): void => {
      if (closed) return;
      opts.onError(message);
      teardown();
    };

    const resetIdleTimer = (): void => {
      // While draining after stop() no audio is expected, so never re-arm.
      if (closed || stopping) return;
      if (idleTimer) clearTimeout(idleTimer);
      idleTimer = setTimeout(() => {
        failWith(
          `Realtime session idle for ${IDLE_TIMEOUT_MS}ms (no audio received); closing.`,
        );
      }, IDLE_TIMEOUT_MS);
    };

    // Hard lifetime cap, armed immediately so a never-opening or runaway session
    // is always reclaimed.
    maxTimer = setTimeout(() => {
      failWith(
        `Realtime session exceeded the maximum duration of ${MAX_SESSION_DURATION_MS}ms; closing.`,
      );
    }, MAX_SESSION_DURATION_MS);
    // Also guard the handshake itself: if the upstream never opens / never sends,
    // the idle timer (15s) reclaims it well before the 120s max-duration cap.
    resetIdleTimer();

    const acc = new Map<string, string>();

    ws.on('open', () => {
      if (closed) return;
      // GA session.update: declare the transcription session, PCM16/24kHz mono
      // input, server VAD auto-segmentation, the effective model and (optional)
      // language. `language` is included only when the client supplied one.
      const transcription: { model: string; language?: string } = { model };
      if (opts.language) transcription.language = opts.language;
      const sessionUpdate = {
        type: 'session.update',
        session: {
          type: 'transcription',
          audio: {
            input: {
              format: { type: 'audio/pcm', rate: 24000 },
              turn_detection: { type: 'server_vad' },
              transcription,
            },
          },
        },
      };
      try {
        ws.send(JSON.stringify(sessionUpdate));
      } catch (err) {
        this.logger.error('Failed to send realtime session.update', err as Error);
        failWith(describeProviderError(err, 'Failed to start realtime session'));
        return;
      }
      // Start the idle clock once the upstream is live.
      resetIdleTimer();
    });

    ws.on('message', (data: WebSocket.RawData) => {
      if (closed) return;
      const raw = AiRealtimeService.rawDataToString(data);
      const parsed = parseUpstreamEvent(raw, acc);
      switch (parsed.type) {
        case 'ready':
          // session.created and session.updated both normalize to 'ready';
          // notify the caller only for the first one.
          if (!readyNotified) {
            readyNotified = true;
            opts.onReady();
          }
          break;
        case 'interim':
          if (parsed.itemId !== undefined) {
            opts.onInterim(parsed.itemId, parsed.text ?? '');
          }
          break;
        case 'final':
          if (parsed.itemId !== undefined) {
            opts.onFinal(parsed.itemId, parsed.text ?? '');
          }
          // Draining after stop(): once a segment completed and no partial
          // segment is pending, the tail has been delivered — finish now
          // instead of waiting out the drain deadline.
          if (stopping && acc.size === 0) teardown();
          break;
        case 'error':
          // Log the full upstream error then surface the concrete reason.
          this.logger.error(`Realtime upstream error: ${parsed.message}`);
          // During the drain the client already asked to stop; an error here is
          // typically the commit of an empty buffer, so keep draining quietly.
          if (stopping) break;
          failWith(parsed.message ?? 'Realtime transcription error');
          break;
        case 'ignore':
        default:
          break;
      }
    });

    ws.on('error', (err: Error) => {
      // Log the full error (name/message/stack); never the key/audio.
      this.logger.error('Realtime upstream socket error', err);
      if (stopping) {
        teardown();
        return;
      }
      failWith(describeProviderError(err, 'Realtime upstream connection error'));
    });

    ws.on('close', (code: number, reason: Buffer) => {
      if (closed) return;
      const why = reason?.toString?.() || '';
      // An unexpected close (not via stop()/teardown) is reported as a concrete
      // reason; onClosed always fires via teardown.
      this.logger.log(
        `Realtime upstream closed (code ${code}${why ? `: ${why}` : ''})`,
      );
      if (code !== 1000 && !stopping) {
        failWith(
          `Realtime upstream closed (code ${code}${why ? `: ${why}` : ''}).`,
        );
        return;
      }
      teardown();
    });

    return {
      appendAudio: (chunk: Buffer | Uint8Array): void => {
        if (closed || stopping || ws.readyState !== WebSocket.OPEN) return;
        const audio = Buffer.from(chunk).toString('base64');
        try {
          ws.send(JSON.stringify({ type: 'input_audio_buffer.append', audio }));
        } catch (err) {
          this.logger.error('Failed to forward realtime audio chunk', err as Error);
          failWith(describeProviderError(err, 'Failed to forward audio'));
          return;
        }
        // Audio flowing again → push the idle deadline out.
        resetIdleTimer();
      },
      stop: (): void => {
        if (closed || stopping) return;
        // Nothing can be flushed unless the upstream is open: close right away.
        if (ws.readyState !== WebSocket.OPEN) {
          teardown();
          return;
        }
        // Graceful stop: with server_vad no manual commit is required, but an
        // explicit commit flushes any buffered tail before we close.
        try {
          ws.send(JSON.stringify({ type: 'input_audio_buffer.commit' }));
        } catch (err) {
          // A failed commit is non-fatal; we still close gracefully.
          this.logger.error('Failed to commit realtime audio buffer', err as Error);
          teardown();
          return;
        }
        // Keep the upstream open briefly so the committed tail's transcript can
        // arrive; closing immediately would always drop it.
        stopping = true;
        if (idleTimer) {
          clearTimeout(idleTimer);
          idleTimer = undefined;
        }
        drainTimer = setTimeout(
          teardown,
          AiRealtimeService.STOP_DRAIN_TIMEOUT_MS,
        );
      },
      close: (): void => {
        teardown();
      },
    };
  }

  /**
   * Admin "test connection" probe for the realtime STT upstream. Reuses
   * openSession so the real config-resolution, SSRF check and handshake path are
   * exercised, then tears the upstream socket down immediately — no audio is ever
   * sent. Resolves to the FROZEN contract { ok: true } | { ok: false, error }.
   *
   * Resolution rules (settle exactly once, guarded by `settled`):
   * - first onReady → { ok: true }
   * - first onError(message) → { ok: false, error: message }
   * - session closed before becoming ready → { ok: false, error } (reported
   *   immediately instead of waiting for the timeout)
   * - ~8s timeout → { ok: false, error: 'Realtime connection timed out' }
   * - openSession(...) throws → { ok: false, error } (AiSttNotConfigured message,
   *   else describeProviderError)
   *
   * On any outcome the upstream handle is closed and the timer cleared exactly
   * once, so this never leaves a socket open. The API key is never logged.
   */
  async testConnection(
    workspaceId: string,
  ): Promise<{ ok: true } | { ok: false; error: string }> {
    return new Promise<{ ok: true } | { ok: false; error: string }>(
      (resolve) => {
        let settled = false;
        let handle: RealtimeSessionHandle | undefined;
        let timer: NodeJS.Timeout | undefined;

        // Settle once: clear the timer, close the upstream handle, resolve.
        const finish = (
          result: { ok: true } | { ok: false; error: string },
        ): void => {
          if (settled) return;
          settled = true;
          if (timer) {
            clearTimeout(timer);
            timer = undefined;
          }
          try {
            handle?.close();
          } catch {
            // Ignore close races; the socket is being discarded anyway.
          }
          resolve(result);
        };

        // Arm the timeout before opening so a never-readying upstream is reclaimed.
        timer = setTimeout(() => {
          finish({ ok: false, error: 'Realtime connection timed out' });
        }, TEST_CONNECTION_TIMEOUT_MS);

        this.openSession(workspaceId, {
          onReady: () => finish({ ok: true }),
          onError: (message) => finish({ ok: false, error: message }),
          // No audio is ever sent; these are no-ops for the probe.
          onInterim: () => {},
          onFinal: () => {},
          // A clean upstream close before `ready` carries no error; report it
          // now. When the probe already settled (ready/error/timeout) this is a
          // no-op because finish() settles only once.
          onClosed: () =>
            finish({
              ok: false,
              error: 'Realtime connection closed before becoming ready',
            }),
        })
          .then((opened) => {
            handle = opened;
            // openSession may have already errored/closed synchronously before
            // we stored the handle; if we've settled, close it now.
            if (settled) {
              try {
                handle.close();
              } catch {
                // Ignore close races.
              }
            }
          })
          .catch((err: unknown) => {
            // openSession threw (AiSttNotConfiguredException or SSRF/Error)
            // before any socket was returned: surface a concrete reason.
            const error =
              err instanceof AiSttNotConfiguredException
                ? err.message
                : describeProviderError(err, 'Realtime connection failed');
            finish({ ok: false, error });
          });
      },
    );
  }

  /**
   * Derive the upstream realtime WSS URL from the (optional) effective base URL.
   *
   * - No base URL → OpenAI default
   *   `wss://api.openai.com/v1/realtime?intent=transcription`.
   * - Otherwise: take the base host, ensure exactly one
   *   `/v1/realtime?intent=transcription` path, and map the scheme to a
   *   WebSocket one (http/ws → ws; anything else → wss). A base that already
   *   ends in `/v1` (or `/v1/realtime`) does not get a duplicated `/v1`. Any
   *   query string or fragment on the base URL is discarded.
   */
  static deriveRealtimeUrl(baseUrl?: string): string {
    if (!baseUrl || !baseUrl.trim()) {
      return 'wss://api.openai.com/v1/realtime?intent=transcription';
    }
    let parsed: URL;
    try {
      parsed = new URL(baseUrl.trim());
    } catch {
      // Unparseable base: fall back to the OpenAI default rather than throwing
      // here; the SSRF check on the default still applies downstream.
      // NOTE(review): with this fallback openSession presents the configured
      // STT key to api.openai.com even if it was issued by another provider —
      // confirm this is intended rather than failing on a malformed base URL.
      return 'wss://api.openai.com/v1/realtime?intent=transcription';
    }
    // Normalize the path: strip a trailing slash, drop an existing
    // `/realtime` suffix, ensure a single `/v1`, then append the realtime path.
    let path = parsed.pathname.replace(/\/+$/, '');
    path = path.replace(/\/realtime$/i, '');
    if (!/\/v1$/i.test(path)) {
      path = `${path}/v1`;
    }
    path = `${path}/realtime`;
    // Scheme → wss (secure) / ws (insecure). The SSRF guard runs on the
    // http(s) equivalent before connecting.
    const scheme =
      parsed.protocol === 'http:' || parsed.protocol === 'ws:' ? 'ws' : 'wss';
    return `${scheme}://${parsed.host}${path}?intent=transcription`;
  }

  /** Normalize a ws RawData payload (Buffer | ArrayBuffer | Buffer[]) to a string. */
  private static rawDataToString(data: WebSocket.RawData): string {
    if (typeof data === 'string') return data;
    if (Buffer.isBuffer(data)) return data.toString('utf8');
    if (Array.isArray(data)) return Buffer.concat(data).toString('utf8');
    // ArrayBuffer
    return Buffer.from(data as ArrayBuffer).toString('utf8');
  }
}

View File

@@ -55,6 +55,10 @@ export class UpdateWorkspaceDto extends PartialType(CreateWorkspaceDto) {
@IsBoolean()
aiDictation: boolean;
@IsOptional()
@IsBoolean()
aiDictationRealtime: boolean;
// Workspace master toggle that enables/disables the HTML embed block type.
// Persisted at settings.htmlEmbed. ABSENT/false => OFF (default). The block
// itself renders in a sandboxed iframe, so this is a feature switch, not a

View File

@@ -511,6 +511,20 @@ export class WorkspaceService {
);
}
if (typeof updateWorkspaceDto.aiDictationRealtime !== 'undefined') {
const prev = settingsBefore?.ai?.dictationRealtime ?? false;
if (prev !== updateWorkspaceDto.aiDictationRealtime) {
before.aiDictationRealtime = prev;
after.aiDictationRealtime = updateWorkspaceDto.aiDictationRealtime;
}
await this.workspaceRepo.updateAiSettings(
workspaceId,
'dictationRealtime',
updateWorkspaceDto.aiDictationRealtime,
trx,
);
}
if (typeof updateWorkspaceDto.htmlEmbed !== 'undefined') {
const prev = settingsBefore?.htmlEmbed ?? false;
if (prev !== updateWorkspaceDto.htmlEmbed) {
@@ -564,6 +578,7 @@ export class WorkspaceService {
delete updateWorkspaceDto.allowMemberTemplates;
delete updateWorkspaceDto.aiChat;
delete updateWorkspaceDto.aiDictation;
delete updateWorkspaceDto.aiDictationRealtime;
delete updateWorkspaceDto.htmlEmbed;
delete updateWorkspaceDto.trackerHead;
delete updateWorkspaceDto.aiPublicShareAssistant;