const fastify = require('fastify')({ logger: false, // 关闭默认日志,极大提升吞吐量 disableRequestLogging: true, // 关闭请求日志 connectionTimeout: 30000, // 快速释放死连接 keepAliveTimeout: 5000, // 调整 Keep-Alive }); const ffmpeg = require('fluent-ffmpeg'); const { request } = require('undici'); // High-performance HTTP client const sharp = require('sharp'); const fs = require('fs'); const path = require('path'); const crypto = require('crypto'); const EventEmitter = require('events'); // Configuration const PORT = process.env.PORT || 9520; const API_BASE = 'http://183.6.121.121:9558/api'; const CACHE_DIR = path.join(__dirname, '.cache'); // Ensure cache directory exists if (!fs.existsSync(CACHE_DIR)) { fs.mkdirSync(CACHE_DIR, { recursive: true }); } // Active downloads manager const activeDownloads = new Map(); // Global Error Handling to prevent crash process.on('uncaughtException', (err) => { console.error(`[${new Date().toISOString()}] Uncaught Exception:`, err); }); process.on('unhandledRejection', (reason, promise) => { console.error(`[${new Date().toISOString()}] Unhandled Rejection:`, reason); }); // Helper to fetch JSON from API using Undici (Faster than http.get) async function fetchApi(token, query) { const apiUrl = new URL(API_BASE); if (query) { Object.entries(query).forEach(([key, value]) => { apiUrl.searchParams.set(key, value); }); } const { statusCode, body } = await request(apiUrl, { method: 'GET', headers: { 'Connection': 'keep-alive', 'token': token }, bodyTimeout: 5000, headersTimeout: 5000 }); if (statusCode !== 200) { throw new Error(`API Status Code: ${statusCode}`); } const data = await body.json(); return data; } /** * 获取内容路径 * @param {*} uniqid * @returns */ function getContentPath(uniqid) { const subDir = 'content/' + uniqid.substring(0, 1); const dir = path.join(CACHE_DIR, subDir); if (!fs.existsSync(dir)) { fs.mkdirSync(dir, { recursive: true }); } return path.join(dir, `${uniqid}.data`); } function getMetaPath(key) { const subDir = 'meta/' + key.substring(0, 1); const dir = path.join(CACHE_DIR, subDir); if (!fs.existsSync(dir)) { fs.mkdirSync(dir, { recursive: true }); } return path.join(dir, `${key}.meta`) } function getTokenKey(token) { return crypto.createHash('md5').update(token).digest('hex'); } function normalizeApiData(metaJson) { if (metaJson && metaJson.api && metaJson.api.data) { return metaJson.api; } return metaJson; } function isValidThumbSpec(thumb) { return thumb && typeof thumb === 'object' && !Array.isArray(thumb) && Number.isFinite(thumb.w) && thumb.w > 0; } function getMimeFromUrl(u) { const ext = path.extname(new URL(u).pathname).toLowerCase(); if (ext === '.png') return 'image/png'; if (ext === '.webp') return 'image/webp'; if (ext === '.gif') return 'image/gif'; if (ext === '.jpg' || ext === '.jpeg' || ext === '.jfif') return 'image/jpeg'; if (ext === '.mp4' || ext === '.m4v' || ext === '.mov') return 'video/mp4'; if (ext === '.webm') return 'video/webm'; return 'image/jpeg'; } /** * 生成缩略图并缓存 * @param {*} reply * @param {*} apiData * @param {*} contentPath * @returns */ async function generateThumbAndCache(reply, apiData, contentPath) { const srcPath = contentPath; const dir = path.dirname(srcPath); const base = path.basename(srcPath); const thumbFinal = path.join(dir, base.replace('.data', `.thumb`)); const metaThumbPath = contentPath.replace('.data', '.thumb.meta'); if (fs.existsSync(thumbFinal) && fs.existsSync(metaThumbPath)) { const st = fs.statSync(thumbFinal); if (st.size > 0) { // metaPath 读取 metadata const metaJson = JSON.parse(fs.readFileSync(metaThumbPath, 'utf8')); // inputFormat const inputFormat = metaJson.inputFormat || 'webp'; const responseHeaders = { ...apiData.data.headers, ...{ 'Content-Type': `image/${inputFormat === 'jpg' ? 'jpeg' : inputFormat}`, 'Content-Length': st.size, 'Accept-Ranges': 'bytes', } }; reply.headers(responseHeaders); return fs.createReadStream(thumbFinal); } } try { console.log('Generating thumb:', srcPath); let attempts = 0; while (!fs.existsSync(srcPath) && attempts < 80) { await new Promise(r => setTimeout(r, 100)); attempts++; } if (!fs.existsSync(srcPath)) { console.log('Thumb source file not found:', srcPath); reply.code(500); return 'Thumb source file not found'; } let stat = fs.statSync(srcPath); attempts = 0; while (stat.size <= 0 && attempts < 80) { await new Promise(r => setTimeout(r, 100)); attempts++; stat = fs.statSync(srcPath); } if (stat.size <= 0) { reply.code(500); return 'Thumb source file is empty'; } const fit = (apiData.data.thumb && apiData.data.thumb.fit === 'max') ? 'inside' : 'cover'; const width = (apiData.data.thumb && apiData.data.thumb.w) ? apiData.data.thumb.w : 100; const contentType = (apiData.data.headers && apiData.data.headers['content-type']) || getMimeFromUrl(apiData.data.url) || 'image/jpeg'; const preferredFmt = contentType.includes('png') ? 'png' : contentType.includes('webp') ? 'webp' : 'jpeg'; if (contentType.includes('video/')) { console.log('Generating video thumb:', srcPath); const thumbFrameTemp = path.join(dir, base.replace('.data', `.thumb.frame.png.tmp`)); const { spawn } = require('child_process'); const args = ['-hide_banner', '-loglevel', 'error', '-nostdin', '-ss', '1', '-i', srcPath, '-frames:v', '1', '-vf', `scale=${width}:-2`, '-f', 'image2', '-vcodec', 'png', '-y', thumbFrameTemp]; await new Promise((resolve, reject) => { let err = ''; const p = spawn('ffmpeg', args, { stdio: ['ignore', 'ignore', 'pipe'] }); if (p.stderr) p.stderr.on('data', d => { err += d.toString(); }); p.on('error', reject); p.on('close', c => c === 0 ? resolve() : reject(new Error(`ffmpeg exit ${c}${err ? ': ' + err.trim() : ''}`))); }); const thumbTemp = path.join(dir, base.replace('.data', `.thumb.tmp`)); await sharp(thumbFrameTemp).webp({ quality: 80 }).toFile(thumbTemp); try { fs.renameSync(thumbTemp, thumbFinal); } catch (e) { if (fs.existsSync(thumbFinal)) { try { fs.unlinkSync(thumbFinal); } catch (_) { } fs.renameSync(thumbTemp, thumbFinal); } else { throw e; } } await fs.promises.writeFile(metaThumbPath, JSON.stringify({ api: apiData, headers: apiData.data.headers || {}, srcSize: stat.size, inputFormat: 'webp' })); try { if (fs.existsSync(thumbFrameTemp)) fs.unlinkSync(thumbFrameTemp); } catch (_) { } const tstat = fs.statSync(thumbFinal); reply.headers({ 'Content-Type': 'image/webp', 'Content-Length': tstat.size, 'Accept-Ranges': 'bytes', 'Access-Control-Allow-Origin': '*' }); return fs.createReadStream(thumbFinal); } const inputMeta = await sharp(srcPath).metadata(); const outFmt = preferredFmt || inputMeta.format || 'jpeg'; const thumbTemp = path.join(dir, base.replace('.data', `.thumb.tmp`)); const pipeline = sharp(srcPath).resize({ width, fit }); if (outFmt === 'jpeg') pipeline.jpeg({ quality: 85 }); else if (outFmt === 'png') pipeline.png(); else pipeline.webp({ quality: 80 }); await pipeline.toFile(thumbTemp); try { fs.renameSync(thumbTemp, thumbFinal); } catch (e) { if (fs.existsSync(thumbFinal)) { try { fs.unlinkSync(thumbFinal); } catch (_) { } fs.renameSync(thumbTemp, thumbFinal); } else { throw e; } } await fs.promises.writeFile(metaThumbPath, JSON.stringify({ api: apiData, headers: apiData.data.headers || {}, srcSize: stat.size, inputFormat: inputMeta.format || null })); const tstat = fs.statSync(thumbFinal); const responseHeaders = { ...apiData.data.headers, ...{ 'Content-Type': `image/${outFmt === 'jpeg' ? 'jpeg' : outFmt}`, 'Content-Length': tstat.size, 'Accept-Ranges': 'bytes', } }; reply.headers(responseHeaders); return fs.createReadStream(thumbFinal); } catch (e) { reply.code(500); return 'Thumb generation failed:' + e.message; } finally { // cleanup leftover temp if any const dir = path.dirname(srcPath); const base = path.basename(srcPath); const thumbTemp = path.join(dir, base.replace('.data', `.thumb.tmp`)); try { if (fs.existsSync(thumbTemp)) fs.unlinkSync(thumbTemp); } catch (_) { } } } // Serve existing file function serveCompletedCache(reply, apiData, contentPath) { const content = contentPath; const responseHeaders = { ...apiData.data.headers }; if (!responseHeaders['Access-Control-Allow-Origin']) { responseHeaders['Access-Control-Allow-Origin'] = '*'; } // Fastify handles Range requests automatically if we send the stream? // Actually, for full control over custom headers + Range, we often need manual handling or plugins. // But serving a raw stream in Fastify usually just pipes it. // For "High Performance", letting nginx handle static files is best, but here we do it in Node. // We will stick to the manual Range logic for consistency with previous "growing file" support. // To support Range properly with Fastify + Stream, we can set headers and return stream. // But for "growing" files, we need our custom pump logic. // For completed files, we can use fs.createReadStream. const range = reply.request.headers.range; const stat = fs.statSync(content); // Sync is okay for startup/metadata, but Async preferred in high-perf. // In strict high-perf, use fs.promises.stat or cache stats. const totalSize = stat.size; if (range) { const parts = range.replace(/bytes=/, "").split("-"); const start = parseInt(parts[0], 10); const end = parts[1] ? parseInt(parts[1], 10) : totalSize - 1; responseHeaders['Content-Range'] = `bytes ${start}-${end}/${totalSize}`; responseHeaders['Accept-Ranges'] = 'bytes'; responseHeaders['Content-Length'] = (end - start) + 1; reply.code(206).headers(responseHeaders); return fs.createReadStream(content, { start, end }); } else { responseHeaders['Content-Length'] = totalSize; responseHeaders['Accept-Ranges'] = 'bytes'; reply.code(200).headers(responseHeaders); return fs.createReadStream(content); } } // Download and Serve logic async function downloadAndServe(reply, apiData, contentPath, key) { let task = activeDownloads.get(key); // 如果任务已完成或出错但最终文件不存在,重置任务以便重新下载 if (task && (task.done || task.error)) { const hasFinal = fs.existsSync(contentPath); if (!hasFinal) { activeDownloads.delete(key); task = null; } } const isValidTask = task && typeof task === 'object' && task.emitter instanceof EventEmitter && !task.done; if (!isValidTask) { const finalPath = contentPath; const tempPath = path.join(path.dirname(finalPath), `${key}_${crypto.randomBytes(8).toString('hex')}.tmp`); task = { emitter: new EventEmitter(), currentSize: 0, totalSize: 0, path: tempPath, done: false, error: null }; task.emitter.setMaxListeners(0); // Prevent crash if no listeners for error event task.emitter.on('error', (err) => { console.error(`[${new Date().toISOString()}] Download Error (Key: ${key}):`, err.message); }); activeDownloads.set(key, task); // Start Download const targetUrl = apiData.data.url; // Use Undici stream for high performance download // stream() is efficient for piping const { stream } = require('undici'); stream(targetUrl, { method: 'GET' }, (res) => { const { statusCode, headers } = res; if (statusCode !== 200) { const err = new Error(`Upstream ${statusCode}`); task.error = err; task.emitter.emit('error', err); activeDownloads.delete(key); const ws = fs.createWriteStream('/dev/null'); ws.end(); return ws; } task.totalSize = parseInt(headers['content-length'] || '0', 10); const fileStream = fs.createWriteStream(task.path); fileStream.on('error', (err) => { task.emitter.emit('error', err); }); const originalWrite = fileStream.write.bind(fileStream); fileStream.write = (chunk, encoding, cb) => { const ret = originalWrite(chunk, encoding, cb); task.currentSize += chunk.length; task.emitter.emit('progress', task.currentSize); return ret; }; return fileStream; }).then(() => { try { const finalPath = contentPath; try { fs.renameSync(task.path, finalPath); } catch (err) { if (fs.existsSync(finalPath)) { try { fs.unlinkSync(finalPath); } catch (_) { } fs.renameSync(task.path, finalPath); } else { throw err; } } task.path = finalPath; task.done = true; task.emitter.emit('done'); } catch (err) { task.error = err; task.emitter.emit('error', err); fs.unlink(task.path, () => { }); } finally { activeDownloads.delete(key); } }).catch(err => { task.error = err; task.emitter.emit('error', err); activeDownloads.delete(key); fs.unlink(task.path, () => { }); }); } if (task.done && fs.existsSync(contentPath)) { console.log('Download completed:', key); if (isValidThumbSpec(apiData.data.thumb)) { return generateThumbAndCache(reply, apiData, contentPath).catch(() => { }); } return serveCompletedCache(reply, apiData, contentPath);//reply, apiData, contentPath } console.log('Downloading:', key); if (isValidThumbSpec(apiData.data.thumb)) { return generateThumbAndCache(reply, apiData, contentPath).catch(() => { }); } // Serve growing file return serveGrowingFile(reply, task, apiData); } function serveGrowingFile(reply, task, apiData) { const responseHeaders = { ...apiData.data.headers }; if (!responseHeaders['Access-Control-Allow-Origin']) responseHeaders['Access-Control-Allow-Origin'] = '*'; // 移除可能来自meta的固定 Content-Length,避免增长流卡死 delete responseHeaders['Content-Length']; delete responseHeaders['content-length']; const range = reply.request.headers.range; let start = 0; if (range) { const parts = range.replace(/bytes=/, "").split("-"); start = parseInt(parts[0], 10) || 0; responseHeaders['Accept-Ranges'] = 'bytes'; // 当总大小未知时,不能返回206,否则客户端会等待无效范围导致卡死 if (task.totalSize) { responseHeaders['Content-Range'] = `bytes ${start}-${task.totalSize - 1}/${task.totalSize}`; responseHeaders['Content-Length'] = task.totalSize - start; reply.code(206); } else { reply.code(200); } } else { if (task.totalSize) responseHeaders['Content-Length'] = task.totalSize; reply.code(200); } reply.headers(responseHeaders); // Custom stream to pump data from file to response const { Readable } = require('stream'); return new Readable({ read(size) { const self = this; let bytesSent = start; // State needs to be per-stream instance. // Wait, 'read' is called multiple times. We need to store state outside or on 'this'. if (this._bytesSent === undefined) this._bytesSent = start; pump(this); function pump(stream) { if (stream.destroyed) return; if (task.error) { stream.destroy(task.error); return; } // Open FD if needed if (!stream._fd) { fs.open(task.path, 'r', (err, fd) => { if (err) { if (err.code === 'ENOENT') { setTimeout(() => pump(stream), 100); } else { stream.destroy(err); } return; } stream._fd = fd; pump(stream); }); return; } const available = task.currentSize - stream._bytesSent; if (available > 0) { const buffer = Buffer.alloc(Math.min(available, 64 * 1024)); fs.read(stream._fd, buffer, 0, buffer.length, stream._bytesSent, (err, bytesRead) => { if (err) { stream.destroy(err); return; } if (bytesRead > 0) { stream._bytesSent += bytesRead; const keepPushing = stream.push(buffer.slice(0, bytesRead)); // If push returns false, we should stop and wait for _read again? // Actually Node streams: if push returns true, we can push more. // But here we just push what we have and wait for next _read call or event? // Standard implementation: push until it returns false. // But for "live" tailing, we might want to just push what we have and exit, // expecting _read to be called again by consumer. } else { wait(stream); } }); } else { if (task.done) { fs.close(stream._fd, () => { }); stream.push(null); // EOF } else { wait(stream); } } } function wait(stream) { // Wait for progress const onProgress = () => { cleanup(); pump(stream); }; const onDone = () => { cleanup(); pump(stream); }; const onError = (err) => { cleanup(); stream.destroy(err); }; const cleanup = () => { task.emitter.off('progress', onProgress); task.emitter.off('done', onDone); task.emitter.off('error', onError); }; task.emitter.once('progress', onProgress); task.emitter.once('done', onDone); // Check done state task.emitter.once('error', onError); // If stream destroyed, remove listeners? // Readable.read is active, so stream is active. } }, destroy(err, cb) { if (this._fd) fs.close(this._fd, () => { }); cb(err); } }); } // Global Fastify Error Handler fastify.setErrorHandler((error, request, reply) => { request.log.error(error); const statusCode = error.statusCode || 500; reply.status(statusCode).send({ error: error.name, message: error.message, statusCode }); }); // Routes fastify.get('/favicon.ico', async (request, reply) => { reply.code(204); return ''; }); fastify.get('/*', async (request, reply) => { let token = (request.raw && request.raw.url ? request.raw.url : request.url).split('?')[0].substring(1); if (!token) { reply.code(400); return 'Missing token'; } // .well-known 目录下的文件直接返回 if (token.startsWith('.well-known')) { reply.code(200); return ''; } // 获取nocache 参数 const nocache = request.query.nocache ? true : false; const random = request.query.random ? true : false; try { const key = getTokenKey(token); const metaPath = getMetaPath(key); const ONE_DAY_MS = random ? 5 * 1000 : 86400 * 7 * 1000; let apiData = null; if (fs.existsSync(metaPath) && !nocache) { try { const stat = fs.statSync(metaPath); if ((Date.now() - stat.mtimeMs) < ONE_DAY_MS) { const parsed = JSON.parse(fs.readFileSync(metaPath, 'utf8')); apiData = normalizeApiData(parsed); } } catch (e) { apiData = null; } } // 如果内容文件不存在,则强制刷新API,避免使用过期URL if (!apiData) { apiData = await fetchApi(token, request.query); if (apiData.code !== 200 || !apiData.data || !apiData.data.url) { console.log('Invalid API response:', apiData, token); reply.code(404); return 'Invalid API response'; } fs.writeFileSync(metaPath, JSON.stringify(apiData)); } if (apiData.code !== 200 || !apiData.data || !apiData.data.url) { reply.code(404); return 'Invalid API response'; } const contentPath = getContentPath(apiData.data.uniqid || key); if (fs.existsSync(contentPath) && !nocache) { if (isValidThumbSpec(apiData.data.thumb)) { return generateThumbAndCache(reply, apiData, contentPath).catch(() => { }); } return serveCompletedCache(reply, apiData, contentPath); } return await downloadAndServe(reply, apiData, contentPath, key); } catch (err) { request.log.error(err); reply.code(502); return 'Gateway Error: ' + err.message; } }); // Run fastify.listen({ port: PORT, host: '0.0.0.0' }, (err, address) => { if (err) { console.error(err); process.exit(1); } console.log(`Fastify Server running at ${address}`); });