From 51902353690a7dbbdf9d27c89f42e01151536b9f Mon Sep 17 00:00:00 2001
From: XiaoMo
Date: Sun, 4 Jan 2026 19:13:22 +0800
Subject: [PATCH] feat: add high-performance file proxy service and optimize
 cache handling
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add a high-performance file proxy service based on fastify and undici, replacing the original http implementation
Implement a file download cache with support for resumable downloads (HTTP Range) and concurrent download management
Improve response header handling and add CORS support
---
 new/fastify.js | 363 +++++++++++++++++++++++++++++++++++++++++
 new/new.js     | 429 +++++++++++++++++++++++++++++++++++++++++++++++++
 package.json   |   7 +-
 3 files changed, 797 insertions(+), 2 deletions(-)
 create mode 100644 new/fastify.js
 create mode 100644 new/new.js

diff --git a/new/fastify.js b/new/fastify.js
new file mode 100644
index 0000000..7653723
--- /dev/null
+++ b/new/fastify.js
@@ -0,0 +1,363 @@
+const fastify = require('fastify')({
+  logger: false, // disable default logging; large throughput win
+  disableRequestLogging: true, // disable per-request logging
+  connectionTimeout: 30000, // release dead connections quickly
+  keepAliveTimeout: 5000, // tune Keep-Alive
+});
+const { request, stream } = require('undici'); // high-performance HTTP client
+const fs = require('fs');
+const path = require('path');
+const crypto = require('crypto');
+const EventEmitter = require('events');
+
+// Configuration
+const PORT = 9520;
+const API_BASE = 'http://183.6.121.121:9519/api';
+const CACHE_DIR = path.join(__dirname, '.cache');
+
+// Ensure cache directory exists
+if (!fs.existsSync(CACHE_DIR)) {
+  fs.mkdirSync(CACHE_DIR, { recursive: true });
+}
+
+// Active downloads manager
+const activeDownloads = new Map();
+
+// Helper to fetch JSON from the API using undici (faster than http.get)
+async function fetchApi(token) {
+  const apiUrl = new URL(API_BASE);
+  if (token) {
+    apiUrl.searchParams.set('token', token);
+  }
+
+  const { statusCode, body } = await request(apiUrl, {
+    method: 'GET',
+    headers: { 'Connection': 'keep-alive' },
+    bodyTimeout: 5000,
+    headersTimeout: 5000
+  });
+
+  if (statusCode !== 200) {
+    throw new Error(`API Status Code: ${statusCode}`);
+  }
+
+  const data = await body.json();
+  return data;
+}
+
+function getCacheKey(apiData) {
+  if (apiData.data && apiData.data.uniqid) {
+    return apiData.data.uniqid;
+  }
+  if (apiData.data && apiData.data.url) {
+    return crypto.createHash('md5').update(apiData.data.url).digest('hex');
+  }
+  return null;
+}
+
+function getCachePaths(key) {
+  const subDir = key.substring(0, 1);
+  const dir = path.join(CACHE_DIR, subDir);
+  if (!fs.existsSync(dir)) {
+    fs.mkdirSync(dir, { recursive: true });
+  }
+  return {
+    content: path.join(dir, `${key}.data`),
+    meta: path.join(dir, `${key}.meta`)
+  };
+}
+
+// Serve a fully downloaded file from the cache
+function serveCompletedCache(reply, cachePaths, apiData) {
+  const { content } = cachePaths;
+  const responseHeaders = { ...apiData.data.headers };
+
+  if (!responseHeaders['Access-Control-Allow-Origin']) {
+    responseHeaders['Access-Control-Allow-Origin'] = '*';
+  }
+
+  // Fastify does not add Range handling to a raw stream automatically, so the
+  // Range logic is handled manually here, consistent with the growing-file path.
+  // Completed files can simply be served with fs.createReadStream.
+
+  const range = reply.request.headers.range;
+  const stat = fs.statSync(content); // statSync keeps this simple; fs.promises.stat would avoid blocking under load
+
+  const totalSize = stat.size;
+
+  if (range) {
+    const parts = range.replace(/bytes=/, "").split("-");
+    const start = parseInt(parts[0], 10);
+    const end = parts[1] ? parseInt(parts[1], 10) : totalSize - 1;
+
+    responseHeaders['Content-Range'] = `bytes ${start}-${end}/${totalSize}`;
+    responseHeaders['Accept-Ranges'] = 'bytes';
+    responseHeaders['Content-Length'] = (end - start) + 1;
+
+    reply.code(206).headers(responseHeaders);
+    return fs.createReadStream(content, { start, end });
+  } else {
+    responseHeaders['Content-Length'] = totalSize;
+    responseHeaders['Accept-Ranges'] = 'bytes';
+    reply.code(200).headers(responseHeaders);
+    return fs.createReadStream(content);
+  }
+}
+
+// Download and serve logic
+function downloadAndServe(reply, apiData, cachePaths, key) {
+  let task = activeDownloads.get(key);
+
+  if (!task) {
+    task = {
+      emitter: new EventEmitter(),
+      currentSize: 0,
+      totalSize: 0,
+      path: cachePaths.content,
+      done: false,
+      error: null
+    };
+    task.emitter.setMaxListeners(0);
+    // Prevent an unhandled 'error' event from crashing the process; consumers
+    // also check task.error directly in their pump loop.
+    task.emitter.on('error', () => { });
+    activeDownloads.set(key, task);
+
+    // Start the download. undici's stream() pipes the upstream body straight
+    // into the Writable returned by the factory below.
+    const targetUrl = apiData.data.url;
+
+    stream(targetUrl, { method: 'GET' }, ({ statusCode, headers }) => {
+      if (statusCode !== 200) {
+        const err = new Error(`Upstream ${statusCode}`);
+        task.error = err;
+        task.emitter.emit('error', err);
+        activeDownloads.delete(key);
+        // undici still expects a Writable to drain the body into; discard it.
+        const { Writable } = require('stream');
+        return new Writable({ write(chunk, enc, cb) { cb(); } });
+      }
+
+      // Save meta
+      fs.writeFileSync(cachePaths.meta, JSON.stringify(apiData));
+
+      task.totalSize = parseInt(headers['content-length'] || '0', 10);
+
+      // Write the body to the cache file, proxying write() so we can track
+      // progress; a PassThrough would also work but adds an extra stream hop.
+      const fileStream = fs.createWriteStream(task.path);
+      const originalWrite = fileStream.write.bind(fileStream);
+      fileStream.write = (chunk, encoding, cb) => {
+        const ret = originalWrite(chunk, encoding, cb);
+        task.currentSize += chunk.length;
+        task.emitter.emit('progress', task.currentSize);
+        return ret;
+      };
+
+      return fileStream;
+    }, (err) => {
+      // Completion callback: undici calls this once the body has been fully
+      // consumed, or with an error if the download failed.
+      if (err) {
+        task.error = err;
+        task.emitter.emit('error', err);
+        activeDownloads.delete(key);
+        fs.unlink(task.path, () => { });
+        return;
+      }
+      if (task.error) return; // non-200 upstream already handled in the factory
+      task.done = true;
+      task.emitter.emit('done');
+      activeDownloads.delete(key);
+    });
+  }
+
+  // Serve the growing file
+  return serveGrowingFile(reply, task, apiData);
+}
+
+function serveGrowingFile(reply, task, apiData) {
+  const responseHeaders = { ...apiData.data.headers };
+  if (!responseHeaders['Access-Control-Allow-Origin']) responseHeaders['Access-Control-Allow-Origin'] = '*';
+
+  const range = reply.request.headers.range;
+  let start = 0;
+
+  if (range) {
+    const parts = range.replace(/bytes=/, "").split("-");
+    start = parseInt(parts[0], 10) || 0;
+    responseHeaders['Accept-Ranges'] = 'bytes';
+    if (task.totalSize) {
+      responseHeaders['Content-Range'] = `bytes ${start}-${task.totalSize - 1}/${task.totalSize}`;
+      responseHeaders['Content-Length'] = task.totalSize - start;
+    }
+    reply.code(206);
+  } else {
+    if (task.totalSize) responseHeaders['Content-Length'] = task.totalSize;
+    reply.code(200);
+  }
+
+  reply.headers(responseHeaders);
+
+  // Custom stream that pumps data from the (possibly still growing) cache file
+  // into the response.
+  const { Readable } = require('stream');
+
+  return new Readable({
+    read(size) {
+      // _read is called repeatedly; keep the per-stream cursor on the instance.
+      if (this._bytesSent === undefined) this._bytesSent = start;
+
+      pump(this);
+
+      function pump(stream) {
+        if (stream.destroyed) return;
+
+        if (task.error) {
+          stream.destroy(task.error);
+          return;
+        }
+
+        // Open the file descriptor if needed
+        if (!stream._fd) {
+          fs.open(task.path, 'r', (err, fd) => {
+            if (err) {
+              if (err.code === 'ENOENT' && !task.done) {
+                setTimeout(() => pump(stream), 100);
+              } else {
+                stream.destroy(err);
+              }
+              return;
+            }
+            stream._fd = fd;
+            pump(stream);
+          });
+          return;
+        }
+
+        const available = task.currentSize - stream._bytesSent;
+
+        if (available > 0) {
+          const buffer = Buffer.alloc(Math.min(available, 64 * 1024));
+          fs.read(stream._fd, buffer, 0, buffer.length, stream._bytesSent, (err, bytesRead) => {
+            if (err) {
+              stream.destroy(err);
+              return;
+            }
+            if (bytesRead > 0) {
+              stream._bytesSent += bytesRead;
+              // Push this chunk and stop; the consumer calls _read again when it
+              // is ready for more, which keeps this "live tail" loop simple.
+              stream.push(buffer.slice(0, bytesRead));
+            } else {
+              wait(stream);
+            }
+          });
+        } else {
+          if (task.done) {
+            fs.close(stream._fd, () => { });
+            stream.push(null); // EOF
+          } else {
+            wait(stream);
+          }
+        }
+      }
+
+      function wait(stream) {
+        // Wait for more data (or completion/failure) from the download task
+        const onProgress = () => {
+          pump(stream);
+        };
+        task.emitter.once('progress', onProgress);
+        task.emitter.once('done', onProgress);
+        task.emitter.once('error', onProgress); // pump() sees task.error and destroys the stream
+
+        // No listener cleanup is needed at this point: 
+ // Readable.read is active, so stream is active. + } + }, + destroy(err, cb) { + if (this._fd) fs.close(this._fd, () => { }); + cb(err); + } + }); +} + +// Routes +fastify.get('/favicon.ico', async (request, reply) => { + reply.code(204); + return ''; +}); + +fastify.get('/*', async (request, reply) => { + const token = request.url.substring(1); + if (!token) { + reply.code(400); + return 'Missing token'; + } + + try { + // 1. Fetch API + // Note: fetchApi is async, so we await + const apiData = await fetchApi(token); + + if (apiData.code !== 200 || !apiData.data || !apiData.data.url) { + reply.code(404); + return 'Invalid API response'; + } + + const key = getCacheKey(apiData); + if (!key) { + reply.code(500); + return 'Key Error'; + } + + const cachePaths = getCachePaths(key); + + // 2. Serve + if (activeDownloads.has(key)) { + return downloadAndServe(reply, apiData, cachePaths, key); + } else if (fs.existsSync(cachePaths.content) && fs.existsSync(cachePaths.meta)) { + return serveCompletedCache(reply, cachePaths, apiData); + } else { + return downloadAndServe(reply, apiData, cachePaths, key); + } + + } catch (err) { + request.log.error(err); + reply.code(502); + return 'Gateway Error: ' + err.message; + } +}); + +// Run +fastify.listen({ port: PORT, host: '0.0.0.0' }, (err, address) => { + if (err) { + console.error(err); + process.exit(1); + } + console.log(`Fastify Server running at ${address}`); +}); diff --git a/new/new.js b/new/new.js new file mode 100644 index 0000000..a19be50 --- /dev/null +++ b/new/new.js @@ -0,0 +1,429 @@ +const http = require('http'); +const https = require('https'); +const url = require('url'); +const fs = require('fs'); +const path = require('path'); +const crypto = require('crypto'); +const EventEmitter = require('events'); + +// Configuration +const PORT = 9520; +const API_BASE = 'http://183.6.121.121:9519/api'; +const CACHE_DIR = path.join(__dirname, '.cache'); + +// Ensure cache directory exists +if (!fs.existsSync(CACHE_DIR)) { + fs.mkdirSync(CACHE_DIR, { recursive: true }); +} + +// Active downloads manager +// Key -> { emitter: EventEmitter, currentSize: number, totalSize: number, path: string, done: boolean, error: any } +const activeDownloads = new Map(); + +// Helper to fetch JSON from API +function fetchApi(token) { + return new Promise((resolve, reject) => { + const apiUrl = new URL(API_BASE); + if (token) { + apiUrl.searchParams.set('token', token); + } + + const req = http.get(apiUrl, (res) => { + if (res.statusCode !== 200) { + res.resume(); + return reject(new Error(`API Status Code: ${res.statusCode}`)); + } + + let data = ''; + res.on('data', chunk => data += chunk); + res.on('end', () => { + try { + const json = JSON.parse(data); + resolve(json); + } catch (e) { + reject(e); + } + }); + }); + + req.on('error', reject); + req.setTimeout(30000, () => { + req.destroy(); + reject(new Error('API Request Timeout')); + }); + }); +} + +// Helper to determine cache key +function getCacheKey(apiData) { + if (apiData.data && apiData.data.uniqid) { + return apiData.data.uniqid; + } + // Fallback to hash of URL + if (apiData.data && apiData.data.url) { + return crypto.createHash('md5').update(apiData.data.url).digest('hex'); + } + return null; +} + +// Helper to get cache paths +function getCachePaths(key) { + const subDir = key.substring(0, 1); + const dir = path.join(CACHE_DIR, subDir); + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir, { recursive: true }); + } + return { + content: path.join(dir, `${key}.data`), + meta: path.join(dir, 
`${key}.meta`) + }; +} + +// Helper to parse Range header +function parseRange(range, totalSize) { + if (!range || range.indexOf('bytes=') !== 0) return null; + + const parts = range.replace(/bytes=/, "").split("-"); + const partialStart = parts[0]; + const partialEnd = parts[1]; + + const start = parseInt(partialStart, 10); + const end = partialEnd ? parseInt(partialEnd, 10) : totalSize - 1; + + if (isNaN(start) || isNaN(end) || start > end || start >= totalSize) return null; + + return { start, end }; +} + +// Serve file from cache (Completed file) +function serveCompletedCache(req, res, cachePaths, apiData) { + const { content } = cachePaths; + const responseHeaders = { ...apiData.data.headers }; + + if (!responseHeaders['Access-Control-Allow-Origin']) { + responseHeaders['Access-Control-Allow-Origin'] = '*'; + } + + fs.stat(content, (err, stats) => { + if (err) { + res.writeHead(500); + res.end('Cache file missing'); + return; + } + + const totalSize = stats.size; + const range = req.headers.range; + + if (range) { + const parsed = parseRange(range, totalSize); + if (parsed) { + const { start, end } = parsed; + const chunksize = (end - start) + 1; + + responseHeaders['Content-Range'] = `bytes ${start}-${end}/${totalSize}`; + responseHeaders['Accept-Ranges'] = 'bytes'; + responseHeaders['Content-Length'] = chunksize; + + res.writeHead(206, responseHeaders); + + const stream = fs.createReadStream(content, { start, end }); + stream.pipe(res); + return; + } + } + + responseHeaders['Content-Length'] = totalSize; + responseHeaders['Accept-Ranges'] = 'bytes'; + + res.writeHead(200, responseHeaders); + const stream = fs.createReadStream(content); + stream.pipe(res); + }); +} + +// Start download if needed, and serve the growing file +function downloadAndServe(req, res, apiData, cachePaths, key) { + let task = activeDownloads.get(key); + + // 1. If no task exists, create one + if (!task) { + task = { + emitter: new EventEmitter(), + currentSize: 0, + totalSize: 0, // unknown initially + path: cachePaths.content, + done: false, + error: null + }; + activeDownloads.set(key, task); + + // Start download process + const targetUrl = apiData.data.url; + const protocol = targetUrl.startsWith('https') ? https : http; + + const proxyReq = protocol.get(targetUrl, (proxyRes) => { + if (proxyRes.statusCode !== 200) { + task.error = new Error(`Upstream ${proxyRes.statusCode}`); + task.emitter.emit('error', task.error); + activeDownloads.delete(key); + return; + } + + // Save meta + fs.writeFileSync(cachePaths.meta, JSON.stringify(apiData)); + + const totalSize = parseInt(proxyRes.headers['content-length'] || '0', 10); + task.totalSize = totalSize; + + const fileStream = fs.createWriteStream(task.path); + + proxyRes.on('data', chunk => { + fileStream.write(chunk); + task.currentSize += chunk.length; + task.emitter.emit('progress', task.currentSize); + }); + + proxyRes.on('end', () => { + fileStream.end(); + task.done = true; + task.emitter.emit('done'); + activeDownloads.delete(key); + }); + + proxyRes.on('error', (err) => { + fileStream.end(); + fs.unlink(task.path, () => {}); + task.error = err; + task.emitter.emit('error', err); + activeDownloads.delete(key); + }); + }); + + proxyReq.on('error', (err) => { + task.error = err; + task.emitter.emit('error', err); + activeDownloads.delete(key); + }); + } + + // 2. 
Serve from the growing file (Tailing logic) + serveGrowingFile(req, res, task, apiData); +} + +function serveGrowingFile(req, res, task, apiData) { + const responseHeaders = { ...apiData.data.headers }; + if (!responseHeaders['Access-Control-Allow-Origin']) { + responseHeaders['Access-Control-Allow-Origin'] = '*'; + } + + // Handle Range request if present (only supports simple start range for now) + const range = req.headers.range; + let start = 0; + + if (range) { + const parts = range.replace(/bytes=/, "").split("-"); + start = parseInt(parts[0], 10); + if (isNaN(start)) start = 0; + + // Note: We don't know the end yet properly, or we support open-ended + responseHeaders['Accept-Ranges'] = 'bytes'; + // We can't strictly send Content-Range with /totalSize if we don't know it yet, + // but often we do from upstream Content-Length. + if (task.totalSize) { + responseHeaders['Content-Range'] = `bytes ${start}-${task.totalSize-1}/${task.totalSize}`; + responseHeaders['Content-Length'] = task.totalSize - start; + } + res.writeHead(206, responseHeaders); + } else { + if (task.totalSize) { + responseHeaders['Content-Length'] = task.totalSize; + } + res.writeHead(200, responseHeaders); + } + + // Stream logic + let bytesSent = start; + let fd = null; + let ended = false; + let isReading = false; + + // Open file for reading + // We might need to wait for file to be created? + // Usually writeStream creates it immediately. + + function openAndStream() { + fs.open(task.path, 'r', (err, fileDesc) => { + if (err) { + // If file not found yet, wait a bit + if (err.code === 'ENOENT' && !task.error && !task.done) { + setTimeout(openAndStream, 100); + return; + } + if (!res.headersSent) res.writeHead(500); + res.end(); + return; + } + fd = fileDesc; + pump(); + }); + } + + function onTaskDone() { + pump(); + } + + function onTaskError(err) { + close(); + } + + // Subscribe to permanent events + task.emitter.on('done', onTaskDone); + task.emitter.on('error', onTaskError); + + function pump() { + if (ended || isReading) return; + if (task.error) { + res.end(); // Upstream error + return; + } + + // Determine how much we can read + // Available bytes in file = task.currentSize + const available = task.currentSize - bytesSent; + + if (available > 0) { + isReading = true; + const buffer = Buffer.alloc(Math.min(available, 64 * 1024)); // 64KB chunks + fs.read(fd, buffer, 0, buffer.length, bytesSent, (err, bytesRead) => { + isReading = false; + if (err) { + close(); + return; + } + + if (bytesRead > 0) { + const chunk = buffer.slice(0, bytesRead); + const canContinue = res.write(chunk); + bytesSent += bytesRead; + + if (canContinue) { + pump(); + } else { + res.once('drain', pump); + } + } else { + // Should not happen if available > 0, but safety check + wait(); + } + }); + } else { + // No data available yet + if (task.done) { + close(); // We are done + } else { + wait(); + } + } + } + + function wait() { + if (ended) return; + task.emitter.once('progress', pump); + } + + function close() { + if (ended) return; + ended = true; + if (fd) fs.close(fd, () => {}); + res.end(); + + // Cleanup listeners + task.emitter.removeListener('progress', pump); + task.emitter.removeListener('done', onTaskDone); + task.emitter.removeListener('error', onTaskError); + } + + res.on('close', close); + res.on('error', close); + + openAndStream(); +} + + +const server = http.createServer(async (req, res) => { + // Basic CORS + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Access-Control-Allow-Methods', 
'GET, OPTIONS'); + + if (req.method === 'OPTIONS') { + res.writeHead(200); + res.end(); + return; + } + + // Ignore favicon + if (req.url === '/favicon.ico') { + res.writeHead(204); + res.end(); + return; + } + + try { + const token = req.url.substring(1); + + if (!token) { + res.writeHead(400); + res.end('Missing token'); + return; + } + + // 1. Get API Data + let apiData; + try { + apiData = await fetchApi(token); + } catch (e) { + console.error('API Fetch Error:', e.message); + res.writeHead(502); + res.end('API Error'); + return; + } + + if (apiData.code !== 200 || !apiData.data || !apiData.data.url) { + res.writeHead(404); + res.end('Invalid API response'); + return; + } + + const key = getCacheKey(apiData); + if (!key) { + res.writeHead(500); + res.end('Could not generate cache key'); + return; + } + + const cachePaths = getCachePaths(key); + + // 2. Check Cache + // If file exists and is NOT currently being downloaded + if (activeDownloads.has(key)) { + // Join the active download stream + downloadAndServe(req, res, apiData, cachePaths, key); + } else if (fs.existsSync(cachePaths.content) && fs.existsSync(cachePaths.meta)) { + serveCompletedCache(req, res, cachePaths, apiData); + } else { + // Start new download + downloadAndServe(req, res, apiData, cachePaths, key); + } + + } catch (err) { + console.error('Server Error:', err); + if (!res.headersSent) { + res.writeHead(500); + res.end('Internal Server Error'); + } + } +}); + +server.listen(PORT, () => { + console.log(`Server running on port ${PORT}`); +}); diff --git a/package.json b/package.json index c91b04d..404e50b 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,8 @@ { "dependencies": { - "javascript-obfuscator": "^4.1.1" - } + "fastify": "^5.6.2", + "javascript-obfuscator": "^4.1.1", + "undici": "^7.16.0" + }, + "packageManager": "pnpm@10.27.0+sha512.72d699da16b1179c14ba9e64dc71c9a40988cbdc65c264cb0e489db7de917f20dcf4d64d8723625f2969ba52d4b7e2a1170682d9ac2a5dcaeaab732b7e16f04a" }
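
For manual testing, a minimal client sketch that exercises the resumable (Range) download path and the CORS header described in the commit message. This is a sketch only: it assumes the proxy is running locally on port 9520 and that TOKEN is replaced with a token the upstream API actually accepts; neither value is supplied by this patch.

// resume-check.js - quick manual check of the Range/resume path (sketch only)
const http = require('http');

const TOKEN = 'REPLACE_WITH_A_VALID_TOKEN'; // placeholder, not provided by the patch

const req = http.get({
  host: '127.0.0.1',
  port: 9520,                          // PORT used by new/fastify.js and new/new.js
  path: `/${TOKEN}`,
  headers: { Range: 'bytes=1024-' },   // ask the proxy to resume from byte 1024
}, (res) => {
  // 206 plus a Content-Range header means the proxy honoured the resume request;
  // a plain 200 means it fell back to sending the whole file.
  console.log('status:', res.statusCode);
  console.log('content-range:', res.headers['content-range']);
  console.log('access-control-allow-origin:', res.headers['access-control-allow-origin']);
  res.resume(); // discard the body; only the headers matter for this check
});

req.on('error', (err) => console.error('request failed:', err.message));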