/** * Socket.io Service * Real-time communication for messaging, presence, and calls * Ported from AeThex-Connect */ import { Server as HttpServer } from "http"; import { Server, Socket } from "socket.io"; import { supabase } from "./supabase.js"; interface UserSocket extends Socket { userId?: string; userEmail?: string; voiceChannel?: string; } class SocketService { private io: Server | null = null; private userSockets: Map> = new Map(); // userId -> Set of socket IDs /** * Initialize Socket.io server */ initialize(httpServer: HttpServer): Server { this.io = new Server(httpServer, { cors: { origin: process.env.FRONTEND_URL || "http://localhost:5000", credentials: true }, path: "/socket.io" }); // Authentication middleware this.io.use(async (socket: UserSocket, next) => { try { // Get session from handshake (cookie or auth header) const sessionId = socket.handshake.auth.sessionId; const userId = socket.handshake.auth.userId; if (!userId) { return next(new Error("Authentication required")); } // Verify user exists const { data: user, error } = await supabase .from("users") .select("id, email") .eq("id", userId) .single(); if (error || !user) { return next(new Error("User not found")); } socket.userId = user.id; socket.userEmail = user.email; next(); } catch (error) { next(new Error("Authentication failed")); } }); // Connection handler this.io.on("connection", (socket: UserSocket) => { this.handleConnection(socket); }); console.log("✓ Socket.io initialized"); return this.io; } /** * Handle new socket connection */ private handleConnection(socket: UserSocket) { const userId = socket.userId!; console.log(`[Socket] User connected: ${userId}`); // Track user's socket if (!this.userSockets.has(userId)) { this.userSockets.set(userId, new Set()); } this.userSockets.get(userId)!.add(socket.id); // Update user status to online this.updateUserStatus(userId, "online"); // Join user's personal room (for direct notifications) socket.join(`user:${userId}`); // Join all conversations user is part of this.joinUserConversations(socket, userId); // ========== MESSAGE EVENTS ========== socket.on("join_conversation", (data: { conversationId: string }) => { socket.join(data.conversationId); console.log(`[Socket] User ${userId} joined conversation ${data.conversationId}`); }); socket.on("leave_conversation", (data: { conversationId: string }) => { socket.leave(data.conversationId); console.log(`[Socket] User ${userId} left conversation ${data.conversationId}`); }); socket.on("typing_start", (data: { conversationId: string }) => { socket.to(data.conversationId).emit("user_typing", { conversationId: data.conversationId, userId, userEmail: socket.userEmail }); }); socket.on("typing_stop", (data: { conversationId: string }) => { socket.to(data.conversationId).emit("user_stopped_typing", { conversationId: data.conversationId, userId }); }); // ========== CALL SIGNALING EVENTS ========== socket.on("call:offer", (data: { callId: string; targetUserId: string; offer: any }) => { console.log(`[Socket] Call offer from ${userId} to ${data.targetUserId}`); this.io?.to(`user:${data.targetUserId}`).emit("call:offer", { callId: data.callId, fromUserId: userId, offer: data.offer }); }); socket.on("call:answer", (data: { callId: string; targetUserId: string; answer: any }) => { console.log(`[Socket] Call answer from ${userId} to ${data.targetUserId}`); this.io?.to(`user:${data.targetUserId}`).emit("call:answer", { callId: data.callId, fromUserId: userId, answer: data.answer }); }); socket.on("call:ice-candidate", (data: { callId: string; targetUserId: string; candidate: any }) => { this.io?.to(`user:${data.targetUserId}`).emit("call:ice-candidate", { callId: data.callId, fromUserId: userId, candidate: data.candidate }); }); socket.on("call:hangup", (data: { callId: string; targetUserId: string }) => { this.io?.to(`user:${data.targetUserId}`).emit("call:ended", { callId: data.callId, endedBy: userId, reason: "hangup" }); }); // ========== DISCONNECT ========== socket.on("disconnect", () => { this.handleDisconnect(socket, userId); }); } /** * Join all conversations user is part of */ private async joinUserConversations(socket: UserSocket, userId: string) { try { const { data, error } = await supabase .from("conversation_participants") .select("conversation_id") .eq("user_id", userId); if (error) throw error; (data || []).forEach((row) => { socket.join(row.conversation_id); }); console.log(`[Socket] User ${userId} joined ${data?.length || 0} conversations`); } catch (error) { console.error("[Socket] Error joining conversations:", error); } } /** * Handle disconnect */ private handleDisconnect(socket: UserSocket, userId: string) { console.log(`[Socket] User disconnected: ${userId}`); // Remove socket from tracking if (this.userSockets.has(userId)) { this.userSockets.get(userId)!.delete(socket.id); // If no more sockets for this user, mark as offline if (this.userSockets.get(userId)!.size === 0) { this.userSockets.delete(userId); this.updateUserStatus(userId, "offline"); } } } /** * Update user status in database */ private async updateUserStatus(userId: string, status: "online" | "offline" | "away") { try { await supabase .from("users") .update({ status, last_seen_at: new Date().toISOString() }) .eq("id", userId); // Broadcast status change to user's contacts this.broadcastStatusChange(userId, status); } catch (error) { console.error("[Socket] Error updating user status:", error); } } /** * Broadcast status change to user's contacts */ private async broadcastStatusChange(userId: string, status: string) { try { // Get all users who have conversations with this user const { data } = await supabase .from("conversation_participants") .select("conversation_id") .eq("user_id", userId); if (!data) return; const conversationIds = data.map((d) => d.conversation_id); // Get other participants from these conversations const { data: participants } = await supabase .from("conversation_participants") .select("user_id") .in("conversation_id", conversationIds) .neq("user_id", userId); // Emit to each unique contact const notifiedUsers = new Set(); (participants || []).forEach((p) => { if (!notifiedUsers.has(p.user_id)) { notifiedUsers.add(p.user_id); this.io?.to(`user:${p.user_id}`).emit("user_status_changed", { userId, status }); } }); } catch (error) { console.error("[Socket] Error broadcasting status change:", error); } } // ========== PUBLIC METHODS (called from REST API) ========== /** * Send message to conversation (called after REST API saves message) */ sendMessage(conversationId: string, message: any) { this.io?.to(conversationId).emit("new_message", message); } /** * Notify user of new conversation */ notifyNewConversation(userId: string, conversation: any) { this.io?.to(`user:${userId}`).emit("new_conversation", conversation); } /** * Notify users of incoming call */ notifyIncomingCall(userId: string, callData: any) { this.io?.to(`user:${userId}`).emit("call:incoming", callData); } /** * Notify all participants that call ended */ notifyCallEnded(participantIds: string[], callId: string, reason: string, endedBy: string) { participantIds.forEach((userId) => { this.io?.to(`user:${userId}`).emit("call:ended", { callId, reason, endedBy }); }); } /** * Get online users */ getOnlineUsers(): string[] { return Array.from(this.userSockets.keys()); } /** * Check if user is online */ isUserOnline(userId: string): boolean { return this.userSockets.has(userId); } /** * Get the io instance */ getIO(): Server | null { return this.io; } } // Export singleton instance export const socketService = new SocketService();