import type { Request, Response } from "express";
import { createClient } from "@supabase/supabase-js";

const RUNPOD_API_KEY = process.env.RUNPOD_API_KEY || "";
const RUNPOD_ENDPOINT_ID = process.env.RUNPOD_ENDPOINT_ID || "";
const SIGNALING_URL = process.env.SIGNALING_URL || "wss://signal.aethex.tech/ws";
// NOTE(review): this fallback embeds a placeholder credential that is forwarded
// to every spawned job when TURN_SERVER is unset — confirm TURN_SERVER is always
// set in deployed environments.
const TURN_SERVER =
  process.env.TURN_SERVER || "turn://aethex:changeme-turn-password@turn.aethex.tech:3478";

const supabaseAdmin = createClient(
  process.env.SUPABASE_URL!,
  process.env.SUPABASE_SERVICE_ROLE!
);

/** Session statuses that count as "live" for lookups and the online counter. */
const LIVE_STATUSES = ["active", "starting"];

/** Extracts a printable message from an arbitrary thrown value. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Resolves a Supabase user from an access token.
 *
 * @returns The user, or `null` when Supabase reports an error or no user.
 */
async function getUserFromToken(token: string) {
  const { data: { user }, error } = await supabaseAdmin.auth.getUser(token);
  if (error || !user) return null;
  return user;
}

/**
 * Reads the `Authorization` header, strips the first `"Bearer "` occurrence and
 * resolves the user behind the token.
 *
 * @returns `{ user, error: null }` on success, otherwise `{ user: null, error }`
 *   where `error` is the message the handlers send with a 401.
 */
async function authenticate(req: Request) {
  const token = req.headers.authorization?.replace("Bearer ", "");
  if (!token) return { user: null, error: "Unauthorized" } as const;
  const user = await getUserFromToken(token);
  if (!user) return { user: null, error: "Invalid token" } as const;
  return { user, error: null } as const;
}

/**
 * Submits a job to the configured RunPod endpoint.
 *
 * @param input - Key/value payload sent as the job's `input`.
 * @returns The parsed response narrowed to `{ id, status }`.
 * @throws Error when the endpoint id is not configured, when RunPod answers
 *   with a non-OK status or an `error` field, or when the response has no job id.
 */
async function spawnJob(input: Record<string, unknown>) {
  if (!RUNPOD_ENDPOINT_ID) throw new Error("RUNPOD_ENDPOINT_ID not configured");

  const res = await fetch(`https://api.runpod.io/v2/${RUNPOD_ENDPOINT_ID}/run`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "Authorization": `Bearer ${RUNPOD_API_KEY}`,
    },
    body: JSON.stringify({ input }),
  });

  // A non-JSON body (e.g. a gateway error page) must not mask the HTTP status,
  // so a parse failure is treated as an empty payload.
  let parsed: unknown = null;
  try {
    parsed = await res.json();
  } catch {
    parsed = null;
  }
  const data = (parsed !== null && typeof parsed === "object" ? parsed : {}) as {
    id?: string;
    status?: string;
    error?: unknown;
  };

  if (!res.ok || data.error) {
    let detail = `RunPod error ${res.status}`;
    if (typeof data.error === "string" && data.error) detail = data.error;
    else if (data.error) detail = JSON.stringify(data.error);
    throw new Error(detail);
  }
  // Without an id the job can never be cancelled or looked up later.
  if (!data.id) throw new Error("RunPod response did not include a job id");

  return data as { id: string; status: string };
}

/**
 * Asks RunPod to cancel a job. Does nothing when no endpoint id is configured.
 * A non-OK HTTP answer is logged but not thrown; a network failure rejects.
 */
async function cancelJob(jobId: string) {
  if (!RUNPOD_ENDPOINT_ID) return;
  const res = await fetch(`https://api.runpod.io/v2/${RUNPOD_ENDPOINT_ID}/cancel/${jobId}`, {
    method: "POST",
    headers: { "Authorization": `Bearer ${RUNPOD_API_KEY}` },
  });
  if (!res.ok) console.error(`[stream] cancel of job ${jobId} returned HTTP ${res.status}`);
}

/**
 * Returns the newest live (`active`/`starting`) session row of a user, or `null`.
 * Uses `limit(1)` so that duplicate live rows do not turn the lookup into an error.
 *
 * @throws Error when the query itself fails.
 */
async function findLiveSession(userId: string) {
  const { data, error } = await supabaseAdmin
    .from("stream_sessions")
    .select("id, pod_id")
    .eq("user_id", userId)
    .in("status", LIVE_STATUSES)
    .order("created_at", { ascending: false })
    .limit(1)
    .maybeSingle();
  if (error) throw new Error(error.message);
  return data;
}

/**
 * Inserts a `stream_sessions` row for a freshly spawned job and returns its id.
 * When the insert fails, the job is cancelled (best effort) so it is not left
 * running without a row that tracks it.
 *
 * @throws Error when the row could not be inserted.
 */
async function persistSession(row: Record<string, unknown>, jobId: string) {
  const { data, error } = await supabaseAdmin
    .from("stream_sessions")
    .insert(row)
    .select("id")
    .single();

  if (error || !data) {
    try {
      await cancelJob(jobId);
    } catch (cancelErr: unknown) {
      console.error(`[stream] failed to cancel untracked job ${jobId}:`, errorMessage(cancelErr));
    }
    throw new Error(error?.message || "Failed to record stream session");
  }
  return data.id;
}

/**
 * POST /api/stream/session/start
 *
 * Starts a streaming job for the authenticated user, or returns the user's
 * existing live session. `mode` defaults to `"game"`; `"devstation"` requires
 * the profile role `developer`, `admin` or `oversee`.
 *
 * Responses: 401 (missing/invalid token), 400 (non-string `mode`/`game`),
 * 403 (role check failed), 500 (`{ error }`), otherwise
 * `{ sessionId, jobId, status: "starting" }` or
 * `{ sessionId, jobId, alreadyActive: true }`.
 */
export async function startSession(req: Request, res: Response) {
  try {
    const auth = await authenticate(req);
    if (!auth.user) return res.status(401).json({ error: auth.error });
    const user = auth.user;

    // The body is untrusted: a non-string `mode` would slip past the strict
    // "devstation" comparison below and still be forwarded to the job.
    const { mode = "game", game } = (req.body ?? {}) as { mode?: unknown; game?: unknown };
    if (typeof mode !== "string" || (game != null && typeof game !== "string")) {
      return res.status(400).json({ error: "mode and game must be strings" });
    }
    const gameName = typeof game === "string" && game ? game : null;

    const existing = await findLiveSession(user.id);
    if (existing) {
      return res.json({ sessionId: existing.id, jobId: existing.pod_id, alreadyActive: true });
    }

    if (mode === "devstation") {
      const { data: profile } = await supabaseAdmin
        .from("profiles")
        .select("role")
        .eq("id", user.id)
        .maybeSingle();
      if (!["developer", "admin", "oversee"].includes(profile?.role ?? "")) {
        return res.status(403).json({ error: "Dev station access requires developer role" });
      }
    }

    const job = await spawnJob({
      MODE: mode,
      SIGNALING_URL,
      TURN_SERVER,
      AETHEX_USER_ID: user.id,
      IDLE_TIMEOUT: "600",
      MAX_SESSION: "7200",
      ...(gameName ? { GAME: gameName } : {}),
    });
    console.log(`[stream] serverless job spawned: ${job.id} status=${job.status}`);

    const sessionId = await persistSession(
      { user_id: user.id, pod_id: job.id, mode, game: gameName, status: "starting" },
      job.id
    );
    res.json({ sessionId, jobId: job.id, status: "starting" });
  } catch (err: unknown) {
    console.error("[stream/start] error:", errorMessage(err));
    res.status(500).json({ error: errorMessage(err) });
  }
}

/**
 * POST /api/stream/session/stop
 *
 * Cancels the job of every live session of the authenticated user and marks
 * those rows `stopped`. Responds `{ ok: true }` (with a `message` when there was
 * nothing to stop), 401 on auth failure, or 500 with `{ error }`. On failure the
 * rows are left untouched so the request can be retried.
 */
export async function stopSession(req: Request, res: Response) {
  try {
    const auth = await authenticate(req);
    if (!auth.user) return res.status(401).json({ error: auth.error });
    const user = auth.user;

    // All live rows are fetched: duplicates would otherwise make a single-row
    // lookup fail and leave their jobs running.
    const { data: sessions, error } = await supabaseAdmin
      .from("stream_sessions")
      .select("id, pod_id")
      .eq("user_id", user.id)
      .in("status", LIVE_STATUSES);
    if (error) throw new Error(error.message);
    if (!sessions || sessions.length === 0) {
      return res.json({ ok: true, message: "No active session" });
    }

    for (const session of sessions) {
      await cancelJob(session.pod_id);
    }

    const { error: updateError } = await supabaseAdmin
      .from("stream_sessions")
      .update({ status: "stopped", ended_at: new Date().toISOString() })
      .in("id", sessions.map((session) => session.id));
    if (updateError) throw new Error(updateError.message);

    res.json({ ok: true });
  } catch (err: unknown) {
    console.error("[stream/stop] error:", errorMessage(err));
    res.status(500).json({ error: errorMessage(err) });
  }
}

/**
 * POST /api/stream/session/cpu — VP8 job for free users (same endpoint, FORCE_VP8=1)
 *
 * Same flow as `startSession` with a fixed `"game"` mode, no game selection and
 * `FORCE_VP8: "1"` added to the job input.
 */
export async function startCpuSession(req: Request, res: Response) {
  try {
    const auth = await authenticate(req);
    if (!auth.user) return res.status(401).json({ error: auth.error });
    const user = auth.user;

    const existing = await findLiveSession(user.id);
    if (existing) {
      return res.json({ sessionId: existing.id, jobId: existing.pod_id, alreadyActive: true });
    }

    const job = await spawnJob({
      MODE: "game",
      SIGNALING_URL,
      TURN_SERVER,
      AETHEX_USER_ID: user.id,
      FORCE_VP8: "1",
      IDLE_TIMEOUT: "600",
      MAX_SESSION: "7200",
    });
    console.log(`[stream/cpu] serverless job spawned: ${job.id}`);

    const sessionId = await persistSession(
      { user_id: user.id, pod_id: job.id, mode: "game", status: "starting" },
      job.id
    );
    res.json({ sessionId, jobId: job.id, status: "starting" });
  } catch (err: unknown) {
    console.error("[stream/cpu] error:", errorMessage(err));
    res.status(500).json({ error: errorMessage(err) });
  }
}

/**
 * GET /api/stream/online (public — no auth)
 *
 * Responds `{ count }` with the exact number of live session rows. A query
 * error is logged and reported as `0`; a thrown error yields a 500.
 */
export async function onlineCount(_req: Request, res: Response) {
  try {
    const { count, error } = await supabaseAdmin
      .from("stream_sessions")
      .select("*", { count: "exact", head: true })
      .in("status", LIVE_STATUSES);
    if (error) console.error("[stream/online] error:", error.message);
    res.json({ count: count ?? 0 });
  } catch (err: unknown) {
    console.error("[stream/online] error:", errorMessage(err));
    res.status(500).json({ error: errorMessage(err) });
  }
}

/**
 * GET /api/stream/session/status
 *
 * Responds `{ session }` with the authenticated user's most recently created
 * session row (any status), or `null` when the user has none.
 */
export async function sessionStatus(req: Request, res: Response) {
  try {
    const auth = await authenticate(req);
    if (!auth.user) return res.status(401).json({ error: auth.error });
    const user = auth.user;

    const { data: session, error } = await supabaseAdmin
      .from("stream_sessions")
      .select("id, pod_id, mode, game, status, created_at")
      .eq("user_id", user.id)
      .order("created_at", { ascending: false })
      .limit(1)
      .maybeSingle();
    if (error) throw new Error(error.message);

    res.json({ session: session || null });
  } catch (err: unknown) {
    console.error("[stream/status] error:", errorMessage(err));
    res.status(500).json({ error: errorMessage(err) });
  }
}