/**
 * Messaging Routes
 * API endpoints for conversations and messages
 * Ported from AeThex-Connect
 */

import { Router, Request, Response } from "express";
import { supabase } from "./supabase.js";
import { requireAuth } from "./auth.js";
import { socketService } from "./socket-service.js";

const router = Router();

// ========== SHARED HELPERS ==========

/** Largest page size a client may request from the list endpoints. */
const MAX_PAGE_SIZE = 100;

/** Largest number of rows the user search endpoint will return. */
const MAX_SEARCH_RESULTS = 50;

/**
 * Reads the current user's id from the session.
 *
 * @returns the session's `userId`, or `undefined` when there is no session
 *          or no user id stored on it.
 */
function getSessionUserId(req: Request): string | undefined {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- session fields are declared outside this file
  return (req.session as any)?.userId;
}

/**
 * Parses an integer query parameter and clamps it to `[min, max]`.
 *
 * Non-string, non-numeric and zero inputs yield `fallback`, matching the
 * `parseInt(value) || fallback` expression this helper replaces.
 */
function parseBoundedInt(raw: unknown, fallback: number, min: number, max: number): number {
  const parsed = typeof raw === "string" ? parseInt(raw, 10) : NaN;
  const value = parsed || fallback;
  return Math.min(Math.max(value, min), max);
}

/** Extracts a printable message from anything that can land in a `catch`. */
function errorMessage(err: unknown): string {
  if (err instanceof Error) return err.message;
  if (typeof err === "object" && err !== null && "message" in err) {
    const message = (err as { message: unknown }).message;
    if (typeof message === "string") return message;
  }
  return "Internal server error";
}

/** Logs a handler failure under the `[Messaging]` prefix and answers with a 500. */
function respondWithServerError(res: Response, action: string, err: unknown): void {
  console.error(`[Messaging] ${action} error:`, err);
  res.status(500).json({ error: errorMessage(err) });
}

/** Logs the error of a best-effort query whose failure does not abort the request. */
function warnOnQueryError(context: string, error: unknown): void {
  if (error) console.warn(`[Messaging] ${context} failed:`, error);
}

/**
 * Removes characters that would change the structure of a PostgREST `or(...)`
 * filter (`,`, `(`, `)`, `"`, `\`) or act as multi-character wildcards
 * (`%`, `*`) when the term is spliced into an `ilike` pattern.
 */
function sanitizeSearchTerm(raw: string): string {
  return raw.replace(/[,()"\\*%]/g, "").trim();
}

/**
 * Looks up the participant row linking `userId` to `conversationId`.
 *
 * @returns the row, or `null` when the query fails or matches no single row.
 *          Callers treat `null` as "access denied".
 */
async function findParticipant(conversationId: string, userId: string) {
  const { data, error } = await supabase
    .from("conversation_participants")
    .select("*")
    .eq("conversation_id", conversationId)
    .eq("user_id", userId)
    .single();

  if (error || !data) return null;
  return data;
}

/**
 * Best-effort removal of a conversation row that was just created but could
 * not be given its participants. Failures are logged, never thrown, so the
 * caller can still report the original error.
 */
async function discardConversation(conversationId: string): Promise<void> {
  const { error } = await supabase.from("conversations").delete().eq("id", conversationId);
  warnOnQueryError(`Cleanup of conversation ${conversationId}`, error);
}

// ========== USER SEARCH (must be before /:id routes) ==========

/**
 * GET /api/conversations/users/search
 * Search for users to message
 *
 * Query: `q` (required search term), `limit` (default 10, capped at
 * MAX_SEARCH_RESULTS). Matches `q` as a case-insensitive substring of the
 * profile's email or username and never returns the caller.
 * Responds `{ users }`.
 */
router.get("/users/search", requireAuth, async (req: Request, res: Response) => {
  try {
    const userId = getSessionUserId(req);
    if (!userId) return res.status(401).json({ error: "Unauthorized" });

    const rawQuery = req.query.q;
    // The term is interpolated into a filter string below, so it must be a
    // plain string with all filter syntax removed.
    const term = typeof rawQuery === "string" ? sanitizeSearchTerm(rawQuery) : "";
    if (!term) return res.status(400).json({ error: "q parameter is required" });

    const limit = parseBoundedInt(req.query.limit, 10, 1, MAX_SEARCH_RESULTS);

    // Search users by email (from profiles table which has email)
    const { data: users, error } = await supabase
      .from("profiles")
      .select("id, email, username, avatar_url")
      .or(`email.ilike.%${term}%,username.ilike.%${term}%`)
      .neq("id", userId)
      .limit(limit);

    if (error) throw error;

    res.json({ users: users || [] });
  } catch (err: unknown) {
    respondWithServerError(res, "Search users", err);
  }
});

// ========== CONVERSATIONS ==========

/**
 * GET /api/conversations
 * Get all conversations for the current user
 *
 * Query: `limit` (default 50, capped at MAX_PAGE_SIZE), `offset` (default 0).
 * Responds `{ conversations }`, newest `updated_at` first, archived ones
 * excluded. Each entry carries `otherParticipants`, `lastMessage` and
 * `unreadCount`.
 */
router.get("/", requireAuth, async (req: Request, res: Response) => {
  try {
    const userId = getSessionUserId(req);
    if (!userId) return res.status(401).json({ error: "Unauthorized" });

    const limit = parseBoundedInt(req.query.limit, 50, 1, MAX_PAGE_SIZE);
    const offset = parseBoundedInt(req.query.offset, 0, 0, Number.MAX_SAFE_INTEGER);

    // Get conversations where user is a participant. last_read_at comes along
    // here so the unread count below needs no extra lookup per conversation.
    const { data: memberships, error: membershipError } = await supabase
      .from("conversation_participants")
      .select("conversation_id, last_read_at")
      .eq("user_id", userId);

    if (membershipError) throw membershipError;

    const lastReadByConversation = new Map(
      (memberships ?? []).map((m) => [m.conversation_id, m.last_read_at] as const)
    );
    const conversationIds = [...lastReadByConversation.keys()];

    if (conversationIds.length === 0) {
      return res.json({ conversations: [] });
    }

    const { data: conversations, error: convError } = await supabase
      .from("conversations")
      .select("id, type, title, description, avatar_url, is_archived, created_at, updated_at")
      .in("id", conversationIds)
      .eq("is_archived", false)
      .order("updated_at", { ascending: false })
      .range(offset, offset + limit - 1);

    if (convError) throw convError;

    // Enrich with participants, last message and unread count. This is three
    // queries per conversation; the page-size cap bounds the total.
    const enrichedConversations = await Promise.all(
      (conversations || []).map(async (conv) => {
        // Messages count as unread when newer than the caller's last read
        // marker, or than the conversation itself if it was never read.
        const lastReadAt = lastReadByConversation.get(conv.id) || conv.created_at;

        const [participantsResult, lastMessageResult, unreadResult] = await Promise.all([
          // Other participants
          supabase
            .from("conversation_participants")
            .select("user_id, role, last_read_at, users:user_id ( id, email )")
            .eq("conversation_id", conv.id)
            .neq("user_id", userId),
          // Last message
          supabase
            .from("messages")
            .select("id, content, sender_id, created_at")
            .eq("conversation_id", conv.id)
            .is("deleted_at", null)
            .order("created_at", { ascending: false })
            .limit(1),
          // Unread count: messages from others after the read marker
          supabase
            .from("messages")
            .select("*", { count: "exact", head: true })
            .eq("conversation_id", conv.id)
            .neq("sender_id", userId)
            .gt("created_at", lastReadAt)
            .is("deleted_at", null),
        ]);

        // Enrichment stays best-effort (one failing lookup must not hide the
        // whole list), but failures are no longer invisible.
        warnOnQueryError(`Participants lookup for conversation ${conv.id}`, participantsResult.error);
        warnOnQueryError(`Last message lookup for conversation ${conv.id}`, lastMessageResult.error);
        warnOnQueryError(`Unread count for conversation ${conv.id}`, unreadResult.error);

        return {
          ...conv,
          otherParticipants: participantsResult.data || [],
          lastMessage: lastMessageResult.data?.[0] || null,
          unreadCount: unreadResult.count || 0,
        };
      })
    );

    res.json({ conversations: enrichedConversations });
  } catch (err: unknown) {
    respondWithServerError(res, "Get conversations", err);
  }
});

/**
 * POST /api/conversations/direct
 * Get or create a direct conversation with another user
 *
 * Body: `{ targetUserId }`. Responds `{ conversation }`.
 * Asks the `get_or_create_direct_conversation` RPC first; if it yields no id
 * the conversation and both participant rows are created here.
 */
router.post("/direct", requireAuth, async (req: Request, res: Response) => {
  try {
    const userId = getSessionUserId(req);
    const { targetUserId } = req.body ?? {};

    if (!userId) return res.status(401).json({ error: "Unauthorized" });
    if (typeof targetUserId !== "string" || targetUserId.length === 0) {
      return res.status(400).json({ error: "targetUserId is required" });
    }
    if (targetUserId === userId) {
      // Two participant rows for the same user cannot describe a direct chat.
      return res.status(400).json({ error: "Cannot create a direct conversation with yourself" });
    }

    // Check if direct conversation exists
    const { data: existing, error: rpcError } = await supabase.rpc(
      "get_or_create_direct_conversation",
      { user1_id: userId, user2_id: targetUserId }
    );
    // An RPC failure still falls through to manual creation, but it is logged
    // so a missing or broken function does not go unnoticed.
    warnOnQueryError("get_or_create_direct_conversation RPC", rpcError);

    if (existing) {
      // Get the conversation details
      const { data: conversation, error } = await supabase
        .from("conversations")
        .select("*")
        .eq("id", existing)
        .single();

      if (error) throw error;
      return res.json({ conversation });
    }

    // Create new direct conversation
    const { data: newConv, error: convError } = await supabase
      .from("conversations")
      .insert({ type: "direct", created_by: userId })
      .select()
      .single();

    if (convError) throw convError;

    // Add both participants
    const { error: participantsError } = await supabase.from("conversation_participants").insert([
      { conversation_id: newConv.id, user_id: userId, role: "member" },
      { conversation_id: newConv.id, user_id: targetUserId, role: "member" },
    ]);

    if (participantsError) {
      // Without participants nobody can ever reach the conversation.
      await discardConversation(newConv.id);
      throw participantsError;
    }

    res.json({ conversation: newConv });
  } catch (err: unknown) {
    respondWithServerError(res, "Create direct conversation", err);
  }
});

/**
 * POST /api/conversations/group
 * Create a new group conversation
 *
 * Body: `{ title, description?, participantIds? }`. The caller becomes the
 * group's admin; every other listed id is added as a member.
 * Responds 201 `{ conversation }`.
 */
router.post("/group", requireAuth, async (req: Request, res: Response) => {
  try {
    const userId = getSessionUserId(req);
    const { title, description, participantIds } = req.body ?? {};

    if (!userId) return res.status(401).json({ error: "Unauthorized" });
    if (typeof title !== "string" || title.trim().length === 0) {
      return res.status(400).json({ error: "title is required" });
    }
    if (participantIds != null && !Array.isArray(participantIds)) {
      return res.status(400).json({ error: "participantIds must be an array" });
    }

    // Drop the creator, duplicates and non-string entries before inserting.
    const requestedIds: unknown[] = participantIds ?? [];
    const memberIds = [
      ...new Set(
        requestedIds.filter(
          (id): id is string => typeof id === "string" && id.length > 0 && id !== userId
        )
      ),
    ];

    // Create group conversation
    const { data: conversation, error: convError } = await supabase
      .from("conversations")
      .insert({ type: "group", title: title.trim(), description, created_by: userId })
      .select()
      .single();

    if (convError) throw convError;

    // Creator as admin plus the other participants as members, in one insert
    // so the group never ends up with only part of its roster.
    const { error: participantsError } = await supabase.from("conversation_participants").insert([
      { conversation_id: conversation.id, user_id: userId, role: "admin" },
      ...memberIds.map((id) => ({
        conversation_id: conversation.id,
        user_id: id,
        role: "member",
      })),
    ]);

    if (participantsError) {
      await discardConversation(conversation.id);
      throw participantsError;
    }

    res.status(201).json({ conversation });
  } catch (err: unknown) {
    respondWithServerError(res, "Create group", err);
  }
});

/**
 * GET /api/conversations/:id
 * Get conversation details
 *
 * Responds with the conversation row plus `participants` and the caller's
 * `currentUserRole`; 403 when the caller is not a participant.
 */
router.get("/:id", requireAuth, async (req: Request, res: Response) => {
  try {
    const userId = getSessionUserId(req);
    const { id } = req.params;

    if (!userId) return res.status(401).json({ error: "Unauthorized" });

    // Verify user is participant
    const participant = await findParticipant(id, userId);
    if (!participant) {
      return res.status(403).json({ error: "Access denied" });
    }

    const [conversationResult, participantsResult] = await Promise.all([
      supabase.from("conversations").select("*").eq("id", id).single(),
      supabase
        .from("conversation_participants")
        .select("user_id, role, joined_at, last_read_at")
        .eq("conversation_id", id),
    ]);

    if (conversationResult.error) throw conversationResult.error;
    // An empty roster would be wrong here: the caller is known to be in it.
    if (participantsResult.error) throw participantsResult.error;

    res.json({
      ...conversationResult.data,
      participants: participantsResult.data || [],
      currentUserRole: participant.role,
    });
  } catch (err: unknown) {
    respondWithServerError(res, "Get conversation", err);
  }
});

// ========== MESSAGES ==========

/**
 * GET /api/conversations/:id/messages
 * Get messages for a conversation
 *
 * Query: `limit` (default 50, capped at MAX_PAGE_SIZE), `before` (only
 * messages created strictly before this timestamp).
 * Responds `{ messages }` in chronological order, each with its `reactions`
 * and an `isOwn` flag; soft-deleted messages are excluded.
 */
router.get("/:id/messages", requireAuth, async (req: Request, res: Response) => {
  try {
    const userId = getSessionUserId(req);
    const { id } = req.params;
    const limit = parseBoundedInt(req.query.limit, 50, 1, MAX_PAGE_SIZE);
    const before = typeof req.query.before === "string" ? req.query.before : undefined;

    if (!userId) return res.status(401).json({ error: "Unauthorized" });

    // Verify user is participant
    const participant = await findParticipant(id, userId);
    if (!participant) {
      return res.status(403).json({ error: "Access denied" });
    }

    // Build query
    let query = supabase
      .from("messages")
      .select(
        "id, conversation_id, sender_id, content, content_type, metadata, reply_to_id, edited_at, created_at"
      )
      .eq("conversation_id", id)
      .is("deleted_at", null)
      .order("created_at", { ascending: false })
      .limit(limit);

    if (before) {
      query = query.lt("created_at", before);
    }

    const { data: messages, error } = await query;
    if (error) throw error;

    const page = messages || [];

    // Fetch the reactions of the whole page in one query instead of one per message.
    // NOTE(review): a single query is subject to the API's row cap, so a page
    // with an extreme number of reactions could come back truncated.
    const reactionsByMessage = new Map();
    if (page.length > 0) {
      const { data: reactions, error: reactionsError } = await supabase
        .from("message_reactions")
        .select("message_id, user_id, emoji")
        .in(
          "message_id",
          page.map((msg) => msg.id)
        );
      warnOnQueryError(`Reactions lookup for conversation ${id}`, reactionsError);

      for (const reaction of reactions ?? []) {
        const bucket = reactionsByMessage.get(reaction.message_id) ?? [];
        bucket.push({ user_id: reaction.user_id, emoji: reaction.emoji });
        reactionsByMessage.set(reaction.message_id, bucket);
      }
    }

    const messagesWithReactions = page.map((msg) => ({
      ...msg,
      reactions: reactionsByMessage.get(msg.id) ?? [],
      isOwn: msg.sender_id === userId,
    }));

    // Queried newest-first so `limit` keeps the latest messages;
    // return in chronological order.
    res.json({ messages: messagesWithReactions.reverse() });
  } catch (err: unknown) {
    respondWithServerError(res, "Get messages", err);
  }
});

/**
 * POST /api/conversations/:id/messages
 * Send a message to a conversation
 *
 * Body: `{ content, contentType = "text", metadata?, replyToId? }`.
 * Stores the trimmed content, pushes the message through the socket service
 * and responds 201 `{ message }`.
 */
router.post("/:id/messages", requireAuth, async (req: Request, res: Response) => {
  try {
    const userId = getSessionUserId(req);
    const { id } = req.params;
    const { content, contentType = "text", metadata, replyToId } = req.body ?? {};

    if (!userId) return res.status(401).json({ error: "Unauthorized" });
    if (typeof content !== "string" || content.trim().length === 0) {
      return res.status(400).json({ error: "content is required" });
    }

    // Verify user is participant
    const participant = await findParticipant(id, userId);
    if (!participant) {
      return res.status(403).json({ error: "Access denied" });
    }

    // Insert message
    const { data: message, error } = await supabase
      .from("messages")
      .insert({
        conversation_id: id,
        sender_id: userId,
        content: content.trim(),
        content_type: contentType,
        metadata,
        reply_to_id: replyToId,
      })
      .select()
      .single();

    if (error) throw error;

    // Emit real-time message to all conversation participants. The message is
    // already stored, so a delivery failure must not turn the request into a
    // 500 that invites the client to send it again.
    try {
      socketService.sendMessage(id, {
        ...message,
        isOwn: false, // Will be true only for sender on their client
        reactions: [],
      });
    } catch (emitErr: unknown) {
      console.error("[Messaging] Real-time delivery error:", emitErr);
    }

    res.status(201).json({
      message: {
        ...message,
        isOwn: true,
        reactions: [],
      },
    });
  } catch (err: unknown) {
    respondWithServerError(res, "Send message", err);
  }
});

/**
 * PUT /messages/:messageId (relative to this router's mount point)
 * Edit a message
 *
 * NOTE(review): previously documented as `/api/messages/:messageId`, while the
 * other routes of this router are documented under `/api/conversations`;
 * confirm where the router is mounted.
 *
 * Body: `{ content }`. Only the sender may edit, and only while the message
 * has not been soft-deleted. Responds `{ message }`.
 */
router.put("/messages/:messageId", requireAuth, async (req: Request, res: Response) => {
  try {
    const userId = getSessionUserId(req);
    const { messageId } = req.params;
    const { content } = req.body ?? {};

    if (!userId) return res.status(401).json({ error: "Unauthorized" });
    // Same rule as sending: text only, and whitespace is not content.
    if (typeof content !== "string" || content.trim().length === 0) {
      return res.status(400).json({ error: "content is required" });
    }

    // Verify ownership
    const { data: existing, error: checkError } = await supabase
      .from("messages")
      .select("sender_id, deleted_at")
      .eq("id", messageId)
      .single();

    // A soft-deleted message is gone as far as clients are concerned.
    if (checkError || !existing || existing.deleted_at) {
      return res.status(404).json({ error: "Message not found" });
    }

    if (existing.sender_id !== userId) {
      return res.status(403).json({ error: "Can only edit your own messages" });
    }

    // Update message
    const { data: message, error } = await supabase
      .from("messages")
      .update({
        content: content.trim(),
        edited_at: new Date().toISOString(),
      })
      .eq("id", messageId)
      .select()
      .single();

    if (error) throw error;

    res.json({ message });
  } catch (err: unknown) {
    respondWithServerError(res, "Edit message", err);
  }
});

/**
 * DELETE /messages/:messageId (relative to this router's mount point)
 * Delete a message (soft delete)
 *
 * Only the sender may delete. Responds `{ success: true }`; 403 when the
 * message does not exist or belongs to someone else.
 */
router.delete("/messages/:messageId", requireAuth, async (req: Request, res: Response) => {
  try {
    const userId = getSessionUserId(req);
    const { messageId } = req.params;

    if (!userId) return res.status(401).json({ error: "Unauthorized" });

    // Verify ownership
    const { data: existing } = await supabase
      .from("messages")
      .select("sender_id")
      .eq("id", messageId)
      .single();

    if (!existing || existing.sender_id !== userId) {
      return res.status(403).json({ error: "Can only delete your own messages" });
    }

    // Soft delete. Restricting to rows not yet deleted keeps the original
    // deletion time when the request is repeated.
    const { error } = await supabase
      .from("messages")
      .update({ deleted_at: new Date().toISOString() })
      .eq("id", messageId)
      .is("deleted_at", null);

    if (error) throw error;

    res.json({ success: true });
  } catch (err: unknown) {
    respondWithServerError(res, "Delete message", err);
  }
});

// ========== REACTIONS ==========

/**
 * POST /messages/:messageId/reactions (relative to this router's mount point)
 * Add reaction to a message
 *
 * Body: `{ emoji }`. The caller must participate in the message's
 * conversation. Adding the same reaction twice is a no-op.
 * Responds `{ success: true }`.
 */
router.post("/messages/:messageId/reactions", requireAuth, async (req: Request, res: Response) => {
  try {
    const userId = getSessionUserId(req);
    const { messageId } = req.params;
    const { emoji } = req.body ?? {};

    if (!userId) return res.status(401).json({ error: "Unauthorized" });
    if (typeof emoji !== "string" || emoji.length === 0) {
      return res.status(400).json({ error: "emoji is required" });
    }

    // Get message's conversation to verify access. Soft-deleted messages are
    // skipped: they are hidden from the message list.
    const { data: message } = await supabase
      .from("messages")
      .select("conversation_id")
      .eq("id", messageId)
      .is("deleted_at", null)
      .single();

    if (!message) {
      return res.status(404).json({ error: "Message not found" });
    }

    // Verify user is participant
    const participant = await findParticipant(message.conversation_id, userId);
    if (!participant) {
      return res.status(403).json({ error: "Access denied" });
    }

    // Add reaction (upsert to handle duplicates)
    const { error } = await supabase
      .from("message_reactions")
      .upsert(
        { message_id: messageId, user_id: userId, emoji },
        { onConflict: "message_id,user_id,emoji" }
      );

    if (error) throw error;

    res.json({ success: true });
  } catch (err: unknown) {
    respondWithServerError(res, "Add reaction", err);
  }
});

/**
 * DELETE /messages/:messageId/reactions/:emoji (relative to this router's mount point)
 * Remove reaction from a message
 *
 * Removes only the caller's own reaction. Responds `{ success: true }`,
 * including when there was nothing to remove.
 */
router.delete(
  "/messages/:messageId/reactions/:emoji",
  requireAuth,
  async (req: Request, res: Response) => {
    try {
      const userId = getSessionUserId(req);
      const { messageId, emoji } = req.params;

      if (!userId) return res.status(401).json({ error: "Unauthorized" });

      const { error } = await supabase
        .from("message_reactions")
        .delete()
        .eq("message_id", messageId)
        .eq("user_id", userId)
        .eq("emoji", emoji);

      if (error) throw error;

      res.json({ success: true });
    } catch (err: unknown) {
      respondWithServerError(res, "Remove reaction", err);
    }
  }
);

// ========== READ RECEIPTS ==========

/**
 * POST /api/conversations/:id/read
 * Mark conversation as read
 *
 * Sets the caller's `last_read_at` for the conversation to the current time.
 * The update is filtered by the caller's user id, so it only touches the
 * caller's own participant row. Responds `{ success: true }`.
 */
router.post("/:id/read", requireAuth, async (req: Request, res: Response) => {
  try {
    const userId = getSessionUserId(req);
    const { id } = req.params;

    if (!userId) return res.status(401).json({ error: "Unauthorized" });

    const { error } = await supabase
      .from("conversation_participants")
      .update({ last_read_at: new Date().toISOString() })
      .eq("conversation_id", id)
      .eq("user_id", userId);

    if (error) throw error;

    res.json({ success: true });
  } catch (err: unknown) {
    respondWithServerError(res, "Mark read", err);
  }
});

export default router;