import {
  BadRequestException,
  Body,
  ConflictException,
  Controller,
  ForbiddenException,
  HttpCode,
  HttpException,
  HttpStatus,
  Logger,
  Post,
  Req,
  Res,
  ServiceUnavailableException,
  UseGuards,
  UseInterceptors,
} from '@nestjs/common';
import { Throttle } from '@nestjs/throttler';
import { FastifyReply, FastifyRequest } from 'fastify';
import { JwtAuthGuard } from '../../common/guards/jwt-auth.guard';
import { AuthUser } from '../../common/decorators/auth-user.decorator';
import { AuthWorkspace } from '../../common/decorators/auth-workspace.decorator';
import { SkipTransform } from '../../common/decorators/skip-transform.decorator';
import {
  AiChat,
  AiChatMessage,
  AiChatRun,
  User,
  Workspace,
} from '@docmost/db/types/entity.types';
import { PaginationOptions } from '@docmost/db/pagination/pagination-options';
import { AiChatRepo } from '@docmost/db/repos/ai-chat/ai-chat.repo';
import { AiChatMessageRepo } from '@docmost/db/repos/ai-chat/ai-chat-message.repo';
import { UserThrottlerGuard } from '../../integrations/throttle/user-throttler.guard';
import { AI_CHAT_THROTTLER } from '../../integrations/throttle/throttler-names';
import { FileInterceptor } from '../../common/interceptors/file.interceptor';
import {
  AiChatRunHooks,
  AiChatService,
  AiChatStreamBody,
} from './ai-chat.service';
import { AiChatRunService } from './ai-chat-run.service';
import { AiTranscriptionService } from './ai-transcription.service';
import {
  BoundChatDto,
  ChatIdDto,
  ExportChatDto,
  GeneratePageTitleDto,
  GetChatMessagesDto,
  GetRunDto,
  RenameChatDto,
  StopRunDto,
} from './dto/ai-chat.dto';
import { describeProviderError } from '../../integrations/ai/ai-error.util';
import { buildChatMarkdown } from './chat-markdown.util';

/**
 * Per-user AI chat API (§6.1). Routes are POST to match this codebase's
 * convention (it uses POST for reads too). Everything is workspace-scoped and
 * limited to chats the requesting user created.
 */
@UseGuards(JwtAuthGuard)
@Controller('ai-chat')
export class AiChatController {
  private readonly logger = new Logger(AiChatController.name);

  constructor(
    private readonly aiChatService: AiChatService,
    private readonly aiChatRunService: AiChatRunService,
    private readonly aiChatRepo: AiChatRepo,
    private readonly aiChatMessageRepo: AiChatMessageRepo,
    private readonly aiTranscription: AiTranscriptionService,
  ) {}

  /** List the requesting user's chats in this workspace (paginated). */
  @HttpCode(HttpStatus.OK)
  @Post('chats')
  async listChats(
    @Body() pagination: PaginationOptions,
    @AuthUser() user: User,
    @AuthWorkspace() workspace: Workspace,
  ) {
    return this.aiChatRepo.findByCreator(user.id, workspace.id, pagination);
  }

  /**
   * Resolve the chat bound to a document for the requesting user: the most-recent
   * non-deleted chat created on that page (ai_chats.page_id). Returns
   * { chatId: null } when the page has no owned chat (-> a fresh chat). No page
   * access check needed: only the caller's OWN chats are matched, so a foreign
   * pageId reveals nothing.
   */
  @HttpCode(HttpStatus.OK)
  @Post('bound-chat')
  async boundChat(
    @Body() dto: BoundChatDto,
    @AuthUser() user: User,
    @AuthWorkspace() workspace: Workspace,
  ): Promise<{ chatId: string | null }> {
    const chat = await this.aiChatRepo.findLatestByPage(
      user.id,
      workspace.id,
      dto.pageId,
    );
    return { chatId: chat?.id ?? null };
  }

  /** Fetch the messages of a chat (oldest first, paginated). */
  @HttpCode(HttpStatus.OK)
  @Post('messages')
  async getMessages(
    @Body() dto: GetChatMessagesDto,
    @Body() pagination: PaginationOptions,
    @AuthUser() user: User,
    @AuthWorkspace() workspace: Workspace,
  ) {
    await this.assertOwnedChat(dto.chatId, user, workspace);
    return this.aiChatMessageRepo.findByChat(
      dto.chatId,
      workspace.id,
      pagination,
    );
  }

  /**
   * Export a chat to Markdown (#183). The DB is the single source of truth: the
   * whole transcript is loaded (oldest -> newest) and rendered server-side. Now
   * that the assistant row is persisted upfront and per step, an interrupted
   * turn is included up to its last finished step. Workspace-scoped and owner-
   * gated via assertOwnedChat (same as the other read endpoints). Returns
   * `{ markdown }`. `lang` localizes the few fixed labels (default English).
   */
  @HttpCode(HttpStatus.OK)
  @Post('export')
  async export(
    @Body() dto: ExportChatDto,
    @AuthUser() user: User,
    @AuthWorkspace() workspace: Workspace,
  ): Promise<{ markdown: string }> {
    const chat = await this.assertOwnedChat(dto.chatId, user, workspace);
    const rows = await this.aiChatMessageRepo.findAllByChat(
      dto.chatId,
      workspace.id,
    );
    const markdown = buildChatMarkdown({
      title: chat.title ?? null,
      chatId: dto.chatId,
      rows,
      // normalizeLang(undefined) already yields 'en', so no `?? 'en'` is needed.
      lang: dto.lang,
    });
    return { markdown };
  }

  /**
   * Reconnect to the latest run of a chat (#184 phase 1). Returns the run's
   * persisted lifecycle state ({ status, error, stepCount, timings, ... }) plus
   * the assistant message it projects (the partial/final output) — the DB is the
   * source of truth, so this works for an in-flight run (the browser dropped, the
   * run kept going) and a finished one alike. Owner-gated via assertOwnedChat.
   * `{ run: null }` when the chat has never had a run.
   */
  @HttpCode(HttpStatus.OK)
  @Post('run')
  async getRun(
    @Body() dto: GetRunDto,
    @AuthUser() user: User,
    @AuthWorkspace() workspace: Workspace,
  ): Promise<{ run: AiChatRun | null; message: AiChatMessage | null }> {
    await this.assertOwnedChat(dto.chatId, user, workspace);
    const run = await this.aiChatRunService.getLatestForChat(
      dto.chatId,
      workspace.id,
    );
    if (!run) return { run: null, message: null };

    // A run may not have an assistant message linked yet; report null then.
    const message = run.assistantMessageId
      ? await this.aiChatMessageRepo.findById(
          run.assistantMessageId,
          workspace.id,
        )
      : undefined;
    return { run, message: message ?? null };
  }

  /**
   * Explicitly STOP an agent run (#184 phase 1) — the user pressed Stop. This is
   * the ONLY thing that ends a detached run; a browser disconnect deliberately
   * does not. Target by `runId` (from the streamed start metadata) or by `chatId`
   * (stop whatever run is active on it). Owner-gated. Returns
   * `{ stopped }` — false when there was nothing active to stop.
   *
   * @throws BadRequestException when neither `runId` nor `chatId` is supplied.
   * @throws ForbiddenException when the targeted chat is not owned by the caller.
   */
  @HttpCode(HttpStatus.OK)
  @Post('stop')
  async stopRun(
    @Body() dto: StopRunDto,
    @AuthUser() user: User,
    @AuthWorkspace() workspace: Workspace,
  ): Promise<{ stopped: boolean }> {
    let runId = dto.runId;
    if (!runId && !dto.chatId) {
      throw new BadRequestException('runId or chatId is required');
    }

    if (runId) {
      // Resolve the run to its chat and owner-gate via that chat.
      const run = await this.aiChatRunService.getRun(runId, workspace.id);
      if (!run) return { stopped: false };
      await this.assertOwnedChat(run.chatId, user, workspace);
    } else {
      // chatId is guaranteed here by the guard above (runId was falsy).
      await this.assertOwnedChat(dto.chatId!, user, workspace);
      const active = await this.aiChatRunService.getActiveForChat(
        dto.chatId!,
        workspace.id,
      );
      if (!active) return { stopped: false };
      runId = active.id;
    }

    const stopped = await this.aiChatRunService.requestStop(
      runId,
      workspace.id,
    );
    return { stopped };
  }

  /** Rename a chat. */
  @HttpCode(HttpStatus.OK)
  @Post('rename')
  async rename(
    @Body() dto: RenameChatDto,
    @AuthUser() user: User,
    @AuthWorkspace() workspace: Workspace,
  ) {
    await this.assertOwnedChat(dto.chatId, user, workspace);
    await this.aiChatRepo.update(
      dto.chatId,
      { title: dto.title },
      workspace.id,
    );
    return { success: true };
  }

  /** Soft-delete a chat. */
  @HttpCode(HttpStatus.OK)
  @Post('delete')
  async remove(
    @Body() dto: ChatIdDto,
    @AuthUser() user: User,
    @AuthWorkspace() workspace: Workspace,
  ) {
    await this.assertOwnedChat(dto.chatId, user, workspace);
    await this.aiChatRepo.softDelete(dto.chatId, workspace.id);
    return { success: true };
  }

  /**
   * Stream an agent turn. The useChat payload is read straight off `req.body`
   * (binding a strict DTO would let the global ValidationPipe whitelist strip
   * useChat fields).
   *
   * Ordering matters: feature gating (A7) and model resolution happen BEFORE
   * `res.hijack()`, so a disabled feature (403) or an unconfigured provider
   * (503) returns clean JSON. Only once we are committed to streaming do we
   * hijack and hand off to the service.
   */
  @SkipTransform()
  @UseGuards(JwtAuthGuard, UserThrottlerGuard)
  @Throttle({ [AI_CHAT_THROTTLER]: { limit: 25, ttl: 60000 } })
  @Post('stream')
  async stream(
    @Req() req: FastifyRequest,
    @Res() res: FastifyReply,
    @AuthUser() user: User,
    @AuthWorkspace() workspace: Workspace,
  ): Promise<void> {
    // A7 gate: the workspace must have AI chat explicitly enabled.
    const settings = (workspace.settings ?? {}) as {
      ai?: { chat?: boolean; autonomousRuns?: boolean };
    };
    if (settings.ai?.chat !== true) {
      throw new ForbiddenException('AI chat is disabled');
    }

    // #184 phase 1 flag: when ON, the turn becomes a detached, durable RUN — its
    // lifecycle is tracked in ai_chat_runs, a browser disconnect no longer aborts
    // it, and only an explicit /ai-chat/stop ends it. When OFF (the default) the
    // turn is socket-bound exactly as before, so existing deployments are
    // unaffected.
    const autonomousRuns = settings.ai?.autonomousRuns === true;

    const sessionId = (req.raw as { sessionId?: string }).sessionId;
    if (!sessionId) {
      // The chat requires an interactive session to mint loopback tokens
      // (§15[C1]); Bearer/API-key requests without a session are rejected.
      throw new ForbiddenException('AI chat requires an interactive session');
    }

    const body = (req.body ?? {}) as AiChatStreamBody;

    // Resolve the agent role for this turn BEFORE hijack: existing chats read it
    // from ai_chats.role_id (authoritative), a new chat from body.roleId. The
    // role drives both the persona and the optional model override below.
    const role = await this.aiChatService.resolveRoleForRequest(
      workspace,
      body,
    );

    // Resolve the model (applying the role's optional override) BEFORE hijack so
    // an unconfigured provider — including a role pointing at an unconfigured
    // driver — returns a clean JSON 503 (AiNotConfiguredException is a 503
    // HttpException) instead of breaking mid-stream.
    const model = await this.aiChatService.getChatModel(workspace.id, role);

    // #184: one active run per chat. For an EXISTING chat reject a concurrent
    // start with a clean 409 BEFORE hijack (the common double-submit / second-tab
    // case), so the user gets JSON, not a mid-stream error. A brand-new chat
    // (no chatId) cannot have a prior run, and the DB partial unique index is the
    // backstop against any race that slips past this check.
    if (autonomousRuns && body.chatId) {
      const active = await this.aiChatRunService.getActiveForChat(
        body.chatId,
        workspace.id,
      );
      if (active) {
        throw new ConflictException({
          message: 'An agent run is already in progress for this chat',
          code: 'A_RUN_ALREADY_ACTIVE',
        });
      }
    }

    // Run-lifecycle hooks (#184), only when the flag is on. They wrap the turn in
    // a durable run whose abort is governed by the run (explicit stop), persist
    // its progress, and settle its terminal status — see AiChatRunService.
    const runHooks: AiChatRunHooks | undefined = autonomousRuns
      ? {
          begin: (chatId) =>
            this.aiChatRunService.beginRun({
              chatId,
              workspaceId: workspace.id,
              userId: user.id,
              trigger: 'user',
            }),
          onAssistantSeeded: (runId, messageId) =>
            this.aiChatRunService.linkAssistantMessage(
              runId,
              workspace.id,
              messageId,
            ),
          // Fire-and-forget: `void` discards the promise on purpose.
          onStep: (runId, stepCount) =>
            void this.aiChatRunService.recordStep(
              runId,
              workspace.id,
              stepCount,
            ),
          onSettled: (runId, status, error) =>
            this.aiChatRunService.finalizeRun(
              runId,
              workspace.id,
              status,
              error,
            ),
        }
      : undefined;

    // Abort the agent loop when the client disconnects. `close` also fires on
    // normal completion, so only abort when the response has not finished
    // writing (a genuine disconnect). `once` fires at most once and self-removes;
    // we also drop it on response `finish` so it never lingers after the stream
    // completes normally (the AI SDK pipes the response fire-and-forget, so we
    // cannot simply remove it once `stream()` returns).
    // DIAGNOSTIC (Safari stream-drop investigation) — temporary: wall-clock at
    // which a Safari disconnect is observed, measured from request receipt.
    const reqStartedAt = Date.now();
    const controller = new AbortController();
    const onClose = (): void => {
      // A genuine disconnect leaves the response unfinished (unlike a normal
      // completion, which also fires `close`). Such a drop — e.g. a reverse
      // proxy cutting the SSE mid-answer — is otherwise invisible server-side,
      // so log it here.
      if (!res.raw.writableEnded) {
        if (autonomousRuns) {
          // #184: the turn is a DETACHED run. A disconnect must NOT abort it —
          // the run keeps executing and persisting server-side; the client
          // reconnects via /ai-chat/run (or re-stops via /ai-chat/stop). Log only.
          this.logger.log(
            `AI chat stream: client disconnected; run continues server-side ` +
              `(elapsed=${Date.now() - reqStartedAt}ms since request received)`,
          );
        } else {
          this.logger.warn(
            `AI chat stream: client disconnected before completion; aborting turn ` +
              `(elapsed=${Date.now() - reqStartedAt}ms since request received)`,
          );
          controller.abort();
        }
      }
    };
    req.raw.once('close', onClose);
    res.raw.once('finish', () => req.raw.off('close', onClose));

    // #184: in detached mode the turn is NOT aborted on disconnect, so the SDK's
    // pipe keeps writing to a socket the client may have dropped — for the rest of
    // the (continuing) run. A write to the dead socket can emit an 'error' on the
    // raw response; without a listener that surfaces as an unhandled error event.
    // Swallow it (the run continues server-side regardless). Legacy mode aborts on
    // disconnect, so it does not need this and keeps its exact prior behavior.
    if (autonomousRuns) {
      res.raw.on('error', (err) => {
        this.logger.debug(
          `AI chat detached stream: post-disconnect socket error swallowed: ${
            err instanceof Error ? err.message : String(err)
          }`,
        );
      });
    }

    // Commit to streaming: hijack so Fastify stops managing the response and
    // the AI SDK can write the UI-message stream directly to the Node socket.
    res.hijack();

    try {
      await this.aiChatService.stream({
        user,
        workspace,
        sessionId,
        body,
        res,
        signal: controller.signal,
        model,
        role,
        // #184: present only when the flag is on; wraps the turn in a durable run.
        runHooks,
      });
    } catch (err) {
      // Any failure AFTER hijack can no longer go through Nest's exception
      // filter, so emit the error on the raw socket if nothing has been written
      // yet. The lost-the-race 409 (RunAlreadyActiveError -> ConflictException)
      // is raised by stream() BEFORE it writes a byte, so headers are still
      // unsent here: honor the HttpException's real status + body (a clean 409),
      // not a blanket 500. Everything else stays a 500.
      const isHttp = err instanceof HttpException;
      if (!isHttp) {
        this.logger.error('AI chat stream failed', err as Error);
      }
      if (!res.raw.headersSent) {
        const status = isHttp ? err.getStatus() : 500;
        const payload = isHttp
          ? err.getResponse()
          : { error: 'Internal server error' };
        res.raw.statusCode = status;
        res.raw.setHeader('Content-Type', 'application/json');
        res.raw.end(
          JSON.stringify(
            typeof payload === 'string' ? { message: payload } : payload,
          ),
        );
      } else if (!res.raw.writableEnded) {
        // Headers already went out: all we can do is terminate the stream.
        res.raw.end();
      }
    }
  }

  /**
   * Transcribe an uploaded audio clip to text using the workspace STT model.
   * Gated by settings.ai.dictation (403 when disabled). Returns { text }.
   *
   * @throws ForbiddenException when dictation is not enabled for the workspace.
   * @throws BadRequestException when no file is uploaded, the file exceeds
   *   25MB, or its container type is not whitelisted.
   * @throws ServiceUnavailableException when the provider call fails with a
   *   non-HTTP error.
   */
  @HttpCode(HttpStatus.OK)
  @UseGuards(JwtAuthGuard, UserThrottlerGuard)
  @Throttle({ [AI_CHAT_THROTTLER]: { limit: 20, ttl: 60000 } })
  @Post('transcribe')
  @UseInterceptors(FileInterceptor)
  async transcribe(
    @Req() req: any,
    @AuthWorkspace() workspace: Workspace,
  ): Promise<{ text: string }> {
    // Gate: dictation must be explicitly enabled for the workspace.
    const settings = (workspace.settings ?? {}) as {
      ai?: { dictation?: boolean };
    };
    if (settings.ai?.dictation !== true) {
      throw new ForbiddenException('Dictation is disabled');
    }

    let file = null;
    try {
      // Whisper hard-caps uploads at 25MB; allow a single file.
      file = await req.file({
        limits: { fileSize: 25 * 1024 * 1024, files: 1 },
      });
    } catch (err) {
      if (isPayloadTooLarge(err)) {
        throw new BadRequestException('Audio file too large (max 25MB)');
      }
      throw err;
    }
    if (!file) throw new BadRequestException('No audio uploaded');

    // Resolve + whitelist the upload's container type (MediaRecorder mimetypes
    // carry parameters, e.g. "audio/webm;codecs=opus"). A non-whitelisted type
    // is rejected; an allowed one yields the STT container-format hint.
    const resolved = resolveAudioFormat(file.mimetype);
    if (!resolved.ok) {
      throw new BadRequestException('Unsupported audio format');
    }
    const { format } = resolved;

    let buf: Buffer;
    try {
      buf = await file.toBuffer();
    } catch (err) {
      // With @fastify/multipart throwFileSizeLimit:true, the 25MB cap is enforced
      // when the stream is consumed (here), not at req.file().
      if (isPayloadTooLarge(err)) {
        throw new BadRequestException('Audio file too large (max 25MB)');
      }
      throw err;
    }

    let text: string;
    try {
      text = await this.aiTranscription.transcribe(workspace.id, buf, format);
    } catch (err) {
      // Preserve meaningful HTTP errors (e.g. AiSttNotConfiguredException -> 503).
      if (err instanceof HttpException) throw err;
      // Log the full error and surface the real provider/transport reason instead
      // of an opaque 500 (e.g. "the STT endpoint returned 404 ...").
      this.logger.error('AI transcription failed', err as Error);
      throw new ServiceUnavailableException(describeProviderError(err));
    }
    return { text };
  }

  /**
   * Generate a page title from supplied note content (#199). One-shot,
   * non-streaming. Gated by the AI chat flag (settings.ai.chat, the same toggle
   * that enables the chat agent); returns { title }.
   * The endpoint NEVER writes the page — the client applies the title via the
   * existing /pages/update route (which enforces edit permission), so access
   * checks are not duplicated here. Throttled per user via AI_CHAT_THROTTLER.
   */
  @HttpCode(HttpStatus.OK)
  @UseGuards(JwtAuthGuard, UserThrottlerGuard)
  @Throttle({ [AI_CHAT_THROTTLER]: { limit: 20, ttl: 60000 } })
  @Post('generate-page-title')
  async generatePageTitle(
    @Body() dto: GeneratePageTitleDto,
    @AuthWorkspace() workspace: Workspace,
  ): Promise<{ title: string }> {
    const settings = (workspace.settings ?? {}) as {
      ai?: { chat?: boolean };
    };
    if (settings.ai?.chat !== true) {
      throw new ForbiddenException('AI title generation is disabled');
    }

    try {
      const title = await this.aiChatService.generatePageTitle(
        workspace.id,
        dto.content,
      );
      return { title };
    } catch (err) {
      // Preserve meaningful HTTP errors (e.g. AiNotConfiguredException -> 503).
      if (err instanceof HttpException) throw err;
      // Surface the real provider/transport reason instead of an opaque 500.
      this.logger.error('AI title generation failed', err as Error);
      throw new ServiceUnavailableException(describeProviderError(err));
    }
  }

  /**
   * Ensure the chat exists, belongs to this workspace, AND was created by the
   * requesting user (per-user isolation). Throws ForbiddenException otherwise.
   *
   * @returns the chat row, so callers can reuse it without a second lookup.
   */
  private async assertOwnedChat(
    chatId: string,
    user: User,
    workspace: Workspace,
  ): Promise<AiChat> {
    const chat = await this.aiChatRepo.findById(chatId, workspace.id);
    // A missing chat and a foreign chat both yield the same 403.
    if (!chat || chat.creatorId !== user.id) {
      throw new ForbiddenException();
    }
    return chat;
  }
}

/**
 * True when a thrown value carries `statusCode === 413`, which the upload
 * handling in `transcribe` maps to a 400 "too large" response.
 */
function isPayloadTooLarge(err: unknown): boolean {
  return (
    typeof err === 'object' &&
    err !== null &&
    (err as { statusCode?: unknown }).statusCode === 413
  );
}

/**
 * Whitelist audio container types produced by browser MediaRecorder (Chrome/FF:
 * webm/opus, Safari: mp4) plus common STT-accepted formats. The value maps each
 * allowed base mime to the container-format hint passed to JSON-style STT
 * providers (e.g. OpenRouter); multipart endpoints ignore the hint.
 */
const AUDIO_FORMAT_MAP: Readonly<Record<string, string>> = {
  'audio/webm': 'webm',
  'audio/ogg': 'ogg',
  'audio/mp4': 'mp4',
  'audio/mpeg': 'mp3',
  'audio/wav': 'wav',
  'audio/x-wav': 'wav',
  'audio/wave': 'wav',
  'audio/m4a': 'm4a',
  'audio/x-m4a': 'm4a',
};

/**
 * Resolve and whitelist an uploaded clip's mimetype. MediaRecorder mimetypes
 * carry parameters (e.g. "audio/webm;codecs=opus"), so the base type is split
 * out (lowercased, trimmed) before the whitelist check. Returns ok=false for a
 * non-whitelisted container; otherwise the base mime and its STT format hint.
 * Pure — the caller throws BadRequestException on !ok.
 *
 * The whitelist lookup is an OWN-property check: the mimetype is supplied by
 * the client, and a plain indexed read would let inherited keys such as
 * "constructor" or "toString" pass and yield a non-string "format". A missing
 * or non-string mimetype is rejected rather than throwing.
 *
 * @param mimetype the raw Content-Type reported for the uploaded part.
 * @returns `{ ok: true, baseMime, format }` for an allowed container,
 *   `{ ok: false }` otherwise.
 */
export function resolveAudioFormat(
  mimetype: string,
): { ok: true; baseMime: string; format: string } | { ok: false } {
  if (typeof mimetype !== 'string') {
    return { ok: false };
  }
  const baseMime = (mimetype.split(';', 1)[0] ?? '').trim().toLowerCase();
  if (!Object.prototype.hasOwnProperty.call(AUDIO_FORMAT_MAP, baseMime)) {
    return { ok: false };
  }
  const format = AUDIO_FORMAT_MAP[baseMime];
  if (format === undefined) {
    return { ok: false };
  }
  return { ok: true, baseMime, format };
}