const http = require('http');
const https = require('https');
const url = require('url');
const fs = require('fs');
const path = require('path');
const crypto = require('crypto');
const EventEmitter = require('events');

// Configuration
const PORT = process.env.PORT || 9520;
const API_BASE = process.env.API_BASE || 'http://127.0.0.1:9558/api';
const CACHE_DIR = process.env.CACHE_DIR
  ? path.resolve(process.env.CACHE_DIR)
  : path.join(__dirname, '.cache');

// Timeout for the metadata API request, in milliseconds.
const API_TIMEOUT_MS = 30000;
// Maximum number of bytes read from a growing cache file per fs.read call.
const READ_CHUNK_SIZE = 64 * 1024;
// Cache keys that are safe to use verbatim as a file name.
const SAFE_KEY_PATTERN = /^[A-Za-z0-9_-]+$/;

// Ensure cache directory exists (recursive mkdir is a no-op when it already does).
fs.mkdirSync(CACHE_DIR, { recursive: true });

// Active downloads manager
// Key -> {
//   emitter: EventEmitter   emits 'start', 'progress', 'done', 'error'
//   currentSize: number     bytes already flushed to the cache file
//   totalSize: number       upstream Content-Length, 0 when unknown
//   path: string            cache file being written
//   started: boolean        upstream answered 200 and the cache file is open
//   done: boolean           download finished successfully
//   error: any              failure that aborted the download, if any
// }
const activeDownloads = new Map();

/**
 * Fetch the JSON metadata for a token from the API.
 *
 * @param {string} token - Value forwarded as the `token` query parameter.
 * @returns {Promise<any>} Resolves with the parsed JSON body.
 *   Rejects on a non-200 status, a network error, a timeout or invalid JSON.
 */
function fetchApi(token) {
  return new Promise((resolve, reject) => {
    const apiUrl = new URL(API_BASE);
    if (token) {
      apiUrl.searchParams.set('token', token);
    }

    // Honour the scheme of API_BASE instead of always speaking plain HTTP.
    const client = apiUrl.protocol === 'https:' ? https : http;

    const req = client.get(apiUrl, (res) => {
      if (res.statusCode !== 200) {
        res.resume();
        reject(new Error(`API Status Code: ${res.statusCode}`));
        return;
      }

      // Decode as a stream so multi-byte characters split across chunks survive.
      res.setEncoding('utf8');
      let body = '';
      res.on('data', (chunk) => {
        body += chunk;
      });
      res.on('error', reject);
      res.on('end', () => {
        try {
          resolve(JSON.parse(body));
        } catch (e) {
          reject(e);
        }
      });
    });

    req.on('error', reject);
    req.setTimeout(API_TIMEOUT_MS, () => {
      // Destroying with an error routes the timeout through the 'error' handler.
      req.destroy(new Error('API Request Timeout'));
    });
  });
}

/**
 * Hex MD5 digest of a value (used only to derive file names, not for security).
 *
 * @param {any} value - Value to hash; converted with String().
 * @returns {string} 32-character lowercase hex digest.
 */
function md5Hex(value) {
  return crypto.createHash('md5').update(String(value)).digest('hex');
}

/**
 * Derive the cache key for an API response.
 *
 * `data.uniqid` is used verbatim when it only contains file-name-safe
 * characters; otherwise it is hashed so it can never escape the cache
 * directory. Without a uniqid the key is the hash of `data.url`.
 *
 * @param {any} apiData - Parsed API response.
 * @returns {string|null} The cache key, or null when none can be derived.
 */
function getCacheKey(apiData) {
  const data = apiData && apiData.data;
  if (!data) {
    return null;
  }
  if (data.uniqid) {
    const id = String(data.uniqid);
    return SAFE_KEY_PATTERN.test(id) ? id : md5Hex(id);
  }
  // Fallback to hash of URL
  if (data.url) {
    return md5Hex(data.url);
  }
  return null;
}

/**
 * Compute (and create the directory for) the cache files of a key.
 * Files are sharded into sub-directories named after the key's first character.
 *
 * @param {string} key - Cache key as returned by getCacheKey.
 * @returns {{content: string, meta: string}} Paths of the body and metadata files.
 */
function getCachePaths(key) {
  const name = String(key);
  const dir = path.join(CACHE_DIR, name.substring(0, 1));
  fs.mkdirSync(dir, { recursive: true });
  return {
    content: path.join(dir, `${name}.data`),
    meta: path.join(dir, `${name}.meta`),
  };
}

/**
 * Parse a single-range `Range` header against a known resource size.
 *
 * Supports `bytes=S-E`, `bytes=S-` and the suffix form `bytes=-N`. The end
 * offset is clamped to the last byte of the resource.
 *
 * @param {string|undefined} range - Raw Range header value.
 * @param {number} totalSize - Size of the resource in bytes.
 * @returns {{start: number, end: number}|null} Inclusive offsets, or null when
 *   the header is missing, malformed or not satisfiable.
 */
function parseRange(range, totalSize) {
  if (typeof range !== 'string' || !range.startsWith('bytes=')) {
    return null;
  }

  const [rawStart, rawEnd] = range.slice('bytes='.length).split('-');
  const lastByte = totalSize - 1;
  let start;
  let end;

  if (rawStart === '') {
    // Suffix range: the final N bytes of the resource.
    const suffixLength = parseInt(rawEnd, 10);
    if (Number.isNaN(suffixLength) || suffixLength <= 0) {
      return null;
    }
    start = Math.max(totalSize - suffixLength, 0);
    end = lastByte;
  } else {
    start = parseInt(rawStart, 10);
    // A client may ask past the end; clamp to the last byte.
    end = rawEnd ? Math.min(parseInt(rawEnd, 10), lastByte) : lastByte;
  }

  if (
    Number.isNaN(start) ||
    Number.isNaN(end) ||
    start < 0 ||
    start > end ||
    start >= totalSize
  ) {
    return null;
  }
  return { start, end };
}

/**
 * Build the response headers shared by every served body: the headers supplied
 * by the API plus a permissive CORS default.
 *
 * @param {any} apiData - Parsed API response.
 * @returns {Object} A fresh, mutable header object.
 */
function buildResponseHeaders(apiData) {
  const headers = { ...apiData.data.headers };
  if (!headers['Access-Control-Allow-Origin']) {
    headers['Access-Control-Allow-Origin'] = '*';
  }
  return headers;
}

/**
 * Pipe (part of) a file into a response whose headers were already sent.
 * A read failure tears the connection down instead of crashing the process,
 * and a client disconnect releases the file descriptor.
 *
 * @param {string} filePath - File to read.
 * @param {Object} options - Options for fs.createReadStream (e.g. start/end).
 * @param {http.ServerResponse} res - Response to write into.
 */
function pipeFileToResponse(filePath, options, res) {
  const stream = fs.createReadStream(filePath, options);
  stream.on('error', (err) => {
    console.error('Cache Read Error:', err.message);
    res.destroy();
  });
  res.on('close', () => stream.destroy());
  stream.pipe(res);
}

/**
 * Serve a fully downloaded cache file, honouring a Range header when valid.
 * An invalid range falls back to a full 200 response.
 *
 * @param {http.IncomingMessage} req - Client request.
 * @param {http.ServerResponse} res - Client response.
 * @param {{content: string, meta: string}} cachePaths - Cache file paths.
 * @param {any} apiData - Parsed API response (source of extra headers).
 */
function serveCompletedCache(req, res, cachePaths, apiData) {
  const { content } = cachePaths;
  const responseHeaders = buildResponseHeaders(apiData);

  fs.stat(content, (err, stats) => {
    if (err) {
      res.writeHead(500);
      res.end('Cache file missing');
      return;
    }

    const totalSize = stats.size;
    responseHeaders['Accept-Ranges'] = 'bytes';

    const parsed = parseRange(req.headers.range, totalSize);
    if (parsed) {
      const { start, end } = parsed;
      responseHeaders['Content-Range'] = `bytes ${start}-${end}/${totalSize}`;
      responseHeaders['Content-Length'] = end - start + 1;
      res.writeHead(206, responseHeaders);
      pipeFileToResponse(content, { start, end }, res);
      return;
    }

    responseHeaders['Content-Length'] = totalSize;
    res.writeHead(200, responseHeaders);
    pipeFileToResponse(content, {}, res);
  });
}

/**
 * Start downloading `apiData.data.url` into the cache and register the task in
 * `activeDownloads`.
 *
 * The task emits:
 *   'start'    once upstream answered 200 and the cache file is open,
 *   'progress' each time more bytes have been flushed to the cache file,
 *   'done'     after the file is complete and its metadata has been written,
 *   'error'    when the download fails (the partial file is removed).
 *
 * @param {any} apiData - Parsed API response.
 * @param {{content: string, meta: string}} cachePaths - Cache file paths.
 * @param {string} key - Cache key, used as the `activeDownloads` map key.
 * @returns {Object} The download task (see `activeDownloads`).
 */
function startDownload(apiData, cachePaths, key) {
  const emitter = new EventEmitter();
  // Any number of clients may tail a single download.
  emitter.setMaxListeners(0);
  // Emitting 'error' with no listener throws; every client may have left already.
  emitter.on('error', () => {});

  const task = {
    emitter,
    currentSize: 0,
    totalSize: 0, // unknown initially
    path: cachePaths.content,
    started: false,
    done: false,
    error: null,
  };
  activeDownloads.set(key, task);

  let settled = false;
  let proxyReq = null;
  let fileStream = null;

  // Abort the download, drop the partial file and notify every reader (idempotent).
  function fail(err) {
    if (settled) return;
    settled = true;
    task.error = err;
    console.error('Download Error:', err && err.message);
    if (proxyReq) proxyReq.destroy();
    if (fileStream) fileStream.destroy();
    try {
      // Removed synchronously so a retry started right after cannot lose its file.
      fs.unlinkSync(task.path);
    } catch (unlinkErr) {
      // Expected when the file was never created; nothing to clean up.
    }
    activeDownloads.delete(key);
    emitter.emit('error', err);
  }

  // Publish the finished file: write the metadata marker, then signal 'done'.
  function succeed() {
    if (settled) return;
    settled = true;
    // The meta file marks the content as complete, so it is written last and
    // under exactly the name the cache lookup checks.
    fs.writeFile(cachePaths.meta, JSON.stringify(apiData), (err) => {
      if (err) {
        console.error('Cache Meta Write Error:', err.message);
      }
      task.done = true;
      activeDownloads.delete(key);
      emitter.emit('done');
    });
  }

  function onResponse(proxyRes) {
    if (proxyRes.statusCode !== 200) {
      proxyRes.resume();
      fail(new Error(`Upstream ${proxyRes.statusCode}`));
      return;
    }

    const declared = parseInt(proxyRes.headers['content-length'] || '0', 10);
    task.totalSize = Number.isNaN(declared) ? 0 : declared;

    fileStream = fs.createWriteStream(task.path);
    fileStream.on('error', fail);
    fileStream.once('open', () => {
      if (settled) return;
      task.started = true;
      emitter.emit('start');
    });
    fileStream.on('finish', succeed);

    proxyRes.on('data', (chunk) => {
      if (settled) return;
      const canContinue = fileStream.write(chunk, (err) => {
        if (err || settled) return;
        // Advertise bytes only once they are flushed, so readers never get ahead.
        task.currentSize += chunk.length;
        emitter.emit('progress', task.currentSize);
      });
      if (!canContinue) {
        proxyRes.pause();
        fileStream.once('drain', () => proxyRes.resume());
      }
    });
    proxyRes.on('end', () => {
      if (!settled) fileStream.end();
    });
    proxyRes.on('error', fail);
    proxyRes.on('aborted', () => fail(new Error('Upstream aborted')));
  }

  try {
    const targetUrl = String(apiData.data.url);
    const client = targetUrl.startsWith('https') ? https : http;
    proxyReq = client.get(targetUrl, onResponse);
    proxyReq.on('error', fail);
  } catch (err) {
    // e.g. an invalid URL: fail the task instead of leaving it registered forever.
    fail(err);
  }

  return task;
}

/**
 * Start the download for `key` if none is active, then serve the growing cache
 * file to this client.
 *
 * @param {http.IncomingMessage} req - Client request.
 * @param {http.ServerResponse} res - Client response.
 * @param {any} apiData - Parsed API response.
 * @param {{content: string, meta: string}} cachePaths - Cache file paths.
 * @param {string} key - Cache key.
 */
function downloadAndServe(req, res, apiData, cachePaths, key) {
  // 1. If no task exists, create one
  const task = activeDownloads.get(key) || startDownload(apiData, cachePaths, key);

  // 2. Serve from the growing file (Tailing logic)
  serveGrowingFile(req, res, task, apiData);
}

/**
 * Stream a cache file to a client while it is still being downloaded.
 *
 * Headers are sent only after upstream has answered, so the status code,
 * Content-Length and Content-Range reflect reality; a download that fails
 * before that point yields a 502. When the total size is unknown the Range
 * header is ignored and the body is streamed until the download finishes.
 * A failure after the headers were sent destroys the connection so the client
 * can tell the body is truncated.
 *
 * @param {http.IncomingMessage} req - Client request.
 * @param {http.ServerResponse} res - Client response.
 * @param {Object} task - Download task (see `activeDownloads`).
 * @param {any} apiData - Parsed API response (source of extra headers).
 */
function serveGrowingFile(req, res, task, apiData) {
  let fd = null;
  let position = 0; // offset of the next byte to send
  let stopAt = Infinity; // exclusive end offset of this response
  let ended = false;
  let isReading = false;
  let isBlocked = false; // waiting for the response to drain

  function closeFd() {
    if (fd !== null) {
      fs.close(fd, () => {});
      fd = null;
    }
  }

  // Detach from the task and release the file; returns false if already released.
  function release() {
    if (ended) return false;
    ended = true;
    task.emitter.removeListener('start', begin);
    task.emitter.removeListener('progress', pump);
    task.emitter.removeListener('done', pump);
    task.emitter.removeListener('error', abort);
    res.removeListener('drain', onDrain);
    res.removeListener('close', release);
    // Never close the descriptor under an in-flight read; its callback does it.
    if (!isReading) closeFd();
    return true;
  }

  function complete() {
    if (release()) res.end();
  }

  function abort() {
    if (!release()) return;
    if (res.headersSent) {
      res.destroy();
    } else {
      res.writeHead(502);
      res.end('Upstream Error');
    }
  }

  function onDrain() {
    isBlocked = false;
    pump();
  }

  function sendHeaders() {
    const headers = buildResponseHeaders(apiData);
    const total = task.totalSize;

    if (!total) {
      // Length unknown: stream until the download finishes.
      res.writeHead(200, headers);
      return;
    }

    headers['Accept-Ranges'] = 'bytes';
    const parsed = parseRange(req.headers.range, total);
    if (parsed) {
      position = parsed.start;
      stopAt = parsed.end + 1;
      headers['Content-Range'] = `bytes ${parsed.start}-${parsed.end}/${total}`;
      headers['Content-Length'] = stopAt - position;
      res.writeHead(206, headers);
      return;
    }

    stopAt = total;
    headers['Content-Length'] = total;
    res.writeHead(200, headers);
  }

  function begin() {
    if (ended) return;
    fs.open(task.path, 'r', (err, fileDesc) => {
      if (ended) {
        // Client left or the download failed while the file was being opened.
        if (!err) fs.close(fileDesc, () => {});
        return;
      }
      if (err) {
        release();
        res.writeHead(500);
        res.end();
        return;
      }
      fd = fileDesc;
      sendHeaders();
      pump();
    });
  }

  function pump() {
    if (ended || isReading || isBlocked || fd === null) return;
    if (task.error) {
      abort();
      return;
    }
    if (position >= stopAt) {
      complete();
      return;
    }

    const available = Math.min(task.currentSize, stopAt) - position;
    if (available <= 0) {
      if (task.done) {
        // A finished download that is shorter than promised is a failure.
        if (Number.isFinite(stopAt)) {
          abort();
        } else {
          complete();
        }
      }
      // Otherwise the next 'progress' or 'done' event calls pump again.
      return;
    }

    isReading = true;
    const buffer = Buffer.alloc(Math.min(available, READ_CHUNK_SIZE));
    fs.read(fd, buffer, 0, buffer.length, position, (err, bytesRead) => {
      isReading = false;
      if (ended) {
        closeFd();
        return;
      }
      if (err) {
        abort();
        return;
      }
      if (bytesRead === 0) {
        // Advertised bytes are missing; give up if no more data can arrive.
        if (task.done) abort();
        return;
      }

      position += bytesRead;
      if (res.write(buffer.subarray(0, bytesRead))) {
        pump();
      } else {
        isBlocked = true;
      }
    });
  }

  res.on('close', release);
  // Kept attached for the response's lifetime so a late error cannot crash us.
  res.on('error', release);
  res.on('drain', onDrain);
  task.emitter.on('progress', pump);
  task.emitter.on('done', pump);
  task.emitter.on('error', abort);

  if (task.error) {
    abort();
  } else if (task.started) {
    begin();
  } else {
    task.emitter.once('start', begin);
  }
}

const server = http.createServer(async (req, res) => {
  // Basic CORS
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.setHeader('Access-Control-Allow-Methods', 'GET, OPTIONS');

  if (req.method === 'OPTIONS') {
    res.writeHead(200);
    res.end();
    return;
  }

  // Ignore favicon
  if (req.url === '/favicon.ico') {
    res.writeHead(204);
    res.end();
    return;
  }

  try {
    const token = req.url.substring(1);
    if (!token) {
      res.writeHead(400);
      res.end('Missing token');
      return;
    }

    // 1. Get API Data
    let apiData;
    try {
      apiData = await fetchApi(token);
    } catch (e) {
      console.error('API Fetch Error:', e.message);
      res.writeHead(502);
      res.end('API Error');
      return;
    }

    if (!apiData || apiData.code !== 200 || !apiData.data || !apiData.data.url) {
      res.writeHead(404);
      res.end('Invalid API response');
      return;
    }

    const key = getCacheKey(apiData);
    if (!key) {
      res.writeHead(500);
      res.end('Could not generate cache key');
      return;
    }

    const cachePaths = getCachePaths(key);

    // 2. Check Cache
    if (activeDownloads.has(key)) {
      // Join the active download stream
      downloadAndServe(req, res, apiData, cachePaths, key);
    } else if (fs.existsSync(cachePaths.content) && fs.existsSync(cachePaths.meta)) {
      // The meta file is written only after a complete download.
      serveCompletedCache(req, res, cachePaths, apiData);
    } else {
      // Start new download
      downloadAndServe(req, res, apiData, cachePaths, key);
    }
  } catch (err) {
    console.error('Server Error:', err);
    if (!res.headersSent) {
      res.writeHead(500);
      res.end('Internal Server Error');
    } else {
      // Headers are out already; closing the socket is the only honest signal.
      res.destroy();
    }
  }
});

server.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});