import { WebSocketGateway, WebSocketServer, SubscribeMessage, MessageBody, ConnectedSocket, OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect, } from '@nestjs/websockets'; import { Logger } from '@nestjs/common'; import { Server, Socket } from 'socket.io'; import { OrchestratorService } from './services/orchestrator.service'; import { FaceAuthService } from '../face-auth/face-auth.service'; import { CandidateService } from '../candidate/candidate.service'; /** * InterviewGateway — real-time WebSocket entry point for AI interviews. * * Handles: * - Room joining (linking a socket to a session) * - Streaming audio chunks → STT (real-time via WebSocket) * - VAD events are emitted automatically by the STT provider * - TTS audio chunks are streamed back as they are generated * - Manual end-of-speech fallback for providers without VAD * - Face verification (single frame from webcam) */ @WebSocketGateway({ cors: { origin: '*' }, namespace: '/interview', }) export class InterviewGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect { @WebSocketServer() server: Server; private readonly logger = new Logger(InterviewGateway.name); /** Map socket → sessionId for cleanup */ private socketSessions: Map = new Map(); constructor( private readonly orchestrator: OrchestratorService, private readonly faceAuth: FaceAuthService, private readonly candidateService: CandidateService, ) { } afterInit() { this.logger.log('Interview WebSocket Gateway initialized'); } handleConnection(client: Socket) { this.logger.log(`Client connected: ${client.id}`); } handleDisconnect(client: Socket) { const sessionId = this.socketSessions.get(client.id); if (sessionId) { this.logger.log( `Client ${client.id} disconnected — cleaning up session ${sessionId}`, ); this.orchestrator.endSession(sessionId).catch((err) => { this.logger.error(`Cleanup error: ${err}`); }); this.socketSessions.delete(client.id); } } // ──────────────── Socket Events ──────────────── /** * Client joins an interview session room. * Initializes the streaming STT + TTS connections for this session. * * Payload: { candidateId: string } */ @SubscribeMessage('join-room') async handleJoinRoom( @ConnectedSocket() client: Socket, @MessageBody() data: { candidateId: string }, ) { try { const session = await this.orchestrator.createSession(data.candidateId); const sessionId = session._id.toString(); client.join(sessionId); this.socketSessions.set(client.id, sessionId); // Initialize streaming with the socket directly await this.orchestrator.initStreaming(sessionId, client); this.logger.log( `Client ${client.id} joined session ${sessionId} — streaming ready`, ); client.emit('session-created', { sessionId, stage: session.currentStage, }); } catch (err) { client.emit('error', { message: `Failed to join room: ${err}` }); } } /** * Receive a base64-encoded audio chunk from the client's microphone. * Piped directly to the STT streaming WebSocket (no buffering). * * Payload: { audio: string } (base64) or Buffer (binary array from Socket.io) */ @SubscribeMessage('audio-chunk') handleAudioChunk( @ConnectedSocket() client: Socket, @MessageBody() data: any, ) { const sessionId = this.socketSessions.get(client.id); if (!sessionId) { client.emit('error', { message: 'Not in a session' }); return; } let audioBase64: string; if (Buffer.isBuffer(data)) { // It's a binary buffer from Socket.io audioBase64 = data.toString('base64'); } else if (data instanceof ArrayBuffer) { audioBase64 = Buffer.from(data).toString('base64'); } else if (typeof data === 'string') { audioBase64 = data; } else if (data && data.audio) { audioBase64 = data.audio; } else { this.logger.warn(`Unknown audio chunk format received: ${typeof data}`); return; } // Print dot to let us know chunks are flowing without spamming logs process.stdout.write('.'); this.orchestrator.streamAudioChunk(sessionId, audioBase64); } /** * Manual end-of-speech signal (fallback for providers without VAD). * Triggers the STT → Brain → TTS pipeline with accumulated transcript. */ @SubscribeMessage('end-of-speech') async handleEndOfSpeech(@ConnectedSocket() client: Socket) { const sessionId = this.socketSessions.get(client.id); if (!sessionId) return; try { await this.orchestrator.triggerPipeline(sessionId); } catch (err) { this.logger.error(`Pipeline error: ${err}`); client.emit('error', { message: 'Pipeline processing failed' }); client.emit('ai-state', { state: 'listening' }); } } /** * Receive a video frame (JPEG buffer) for face verification. * Payload: { candidateId: string, frame: Buffer } */ @SubscribeMessage('face-verify') async handleFaceVerify( @ConnectedSocket() client: Socket, @MessageBody() data: { candidateId: string; frame: Buffer }, ) { try { const candidate = await this.candidateService.findById(data.candidateId); // Extract descriptor from the incoming frame const frameBuffer = Buffer.from(data.frame); const incomingDescriptor = await this.faceAuth.extractDescriptorFromBuffer(frameBuffer); if (!incomingDescriptor) { client.emit('face-result', { verified: false, message: 'No face detected in frame', }); return; } // If no stored descriptor, save this one as reference if ( !candidate.faceDescriptor || candidate.faceDescriptor.length === 0 ) { candidate.faceDescriptor = Array.from(incomingDescriptor); candidate.captureFaceOnCall = false; await candidate.save(); client.emit('face-result', { verified: true, message: 'Reference face captured and saved', }); return; } // Compare with stored descriptor const result = this.faceAuth.verifyFace( incomingDescriptor, candidate.faceDescriptor, ); // Update session verification status const sessionId = this.socketSessions.get(client.id); if (sessionId) { const session = await this.orchestrator.getSession(sessionId); session.faceVerified = result.match; await session.save(); } client.emit('face-result', { verified: result.match, distance: result.distance, message: result.match ? 'Face verified successfully' : 'Face mismatch — flagged in report', }); } catch (err) { this.logger.error(`Face verify error: ${err}`); client.emit('face-result', { verified: false, message: `Verification error: ${err}`, }); } } /** * End the interview session — closes streaming connections. */ @SubscribeMessage('end-interview') async handleEndInterview(@ConnectedSocket() client: Socket) { const sessionId = this.socketSessions.get(client.id); if (!sessionId) return; await this.orchestrator.endSession(sessionId); client.emit('interview-ended', { sessionId }); this.socketSessions.delete(client.id); } }