Files
alist-proxy/new/fastify.js
XiaoMo 5190235369 feat: 新增高性能文件代理服务并优化缓存处理
添加基于fastify和undici的高性能文件代理服务,替换原有http实现
实现文件下载缓存机制,支持断点续传和并发下载管理
优化响应头处理并添加CORS支持
2026-01-04 19:13:22 +08:00

364 lines
12 KiB
JavaScript

// Fastify instance tuned for raw throughput: logging is fully disabled and
// connection timeouts are kept short so dead sockets are reclaimed quickly.
const fastify = require('fastify')({
  logger: false,               // disable default logging (major throughput win)
  disableRequestLogging: true, // skip per-request logging
  connectionTimeout: 30000,    // release dead connections quickly
  keepAliveTimeout: 5000,      // tuned Keep-Alive window
});
const fs = require('fs');
const path = require('path');
const crypto = require('crypto');
const EventEmitter = require('events');
const { Readable } = require('stream');
const { request } = require('undici'); // High-performance HTTP client
// Configuration
const PORT = 9520;                                // proxy listen port
const API_BASE = 'http://183.6.121.121:9519/api'; // upstream token-resolver endpoint
const CACHE_DIR = path.join(__dirname, '.cache'); // on-disk cache root

// Ensure cache directory exists before any download starts.
if (!fs.existsSync(CACHE_DIR)) {
  fs.mkdirSync(CACHE_DIR, { recursive: true });
}

// Active downloads manager: cache key -> shared in-flight task state, so
// concurrent requests for the same resource reuse one upstream download.
const activeDownloads = new Map();
// Helper to fetch JSON from API using Undici (Faster than http.get)
/**
 * Resolve a token against the upstream API and return the parsed JSON body.
 *
 * Uses undici's `request` (faster than core http.get). On a non-200 status the
 * response body is drained with `body.dump()` before throwing: an unconsumed
 * undici body pins its keep-alive socket and leaks the connection.
 *
 * @param {string} token - opaque token forwarded as the `token` query param.
 * @returns {Promise<object>} parsed JSON API response.
 * @throws {Error} when the upstream answers with a non-200 status.
 */
async function fetchApi(token) {
  const apiUrl = new URL(API_BASE);
  if (token) {
    apiUrl.searchParams.set('token', token);
  }
  const { statusCode, body } = await request(apiUrl, {
    method: 'GET',
    headers: { 'Connection': 'keep-alive' },
    bodyTimeout: 5000,
    headersTimeout: 5000
  });
  if (statusCode !== 200) {
    // Drain the body so the keep-alive socket can be reused; ignore drain errors.
    await body.dump().catch(() => { });
    throw new Error(`API Status Code: ${statusCode}`);
  }
  return body.json();
}
/**
 * Derive a stable cache key for an API response.
 * Prefers the upstream-provided `uniqid`; falls back to an MD5 digest of the
 * target URL; returns null when neither is present.
 *
 * @param {object} apiData - parsed API response (`data.uniqid` / `data.url`).
 * @returns {string|null} cache key, or null if no key can be derived.
 */
function getCacheKey(apiData) {
  const payload = apiData.data;
  if (payload && payload.uniqid) {
    return payload.uniqid;
  }
  if (payload && payload.url) {
    return crypto.createHash('md5').update(payload.url).digest('hex');
  }
  return null;
}
/**
 * Map a cache key to its on-disk content/metadata paths, creating the shard
 * directory on demand. Keys are sharded by their first character to keep any
 * single directory from growing too large.
 *
 * @param {string} key - cache key from getCacheKey().
 * @returns {{content: string, meta: string}} absolute file paths.
 */
function getCachePaths(key) {
  const shardDir = path.join(CACHE_DIR, key.charAt(0));
  if (!fs.existsSync(shardDir)) {
    fs.mkdirSync(shardDir, { recursive: true });
  }
  return {
    content: path.join(shardDir, `${key}.data`),
    meta: path.join(shardDir, `${key}.meta`)
  };
}
// Serve existing file
function serveCompletedCache(reply, cachePaths, apiData) {
const { content } = cachePaths;
const responseHeaders = { ...apiData.data.headers };
if (!responseHeaders['Access-Control-Allow-Origin']) {
responseHeaders['Access-Control-Allow-Origin'] = '*';
}
// Fastify handles Range requests automatically if we send the stream?
// Actually, for full control over custom headers + Range, we often need manual handling or plugins.
// But serving a raw stream in Fastify usually just pipes it.
// For "High Performance", letting nginx handle static files is best, but here we do it in Node.
// We will stick to the manual Range logic for consistency with previous "growing file" support.
// To support Range properly with Fastify + Stream, we can set headers and return stream.
// But for "growing" files, we need our custom pump logic.
// For completed files, we can use fs.createReadStream.
const range = reply.request.headers.range;
const stat = fs.statSync(content); // Sync is okay for startup/metadata, but Async preferred in high-perf.
// In strict high-perf, use fs.promises.stat or cache stats.
const totalSize = stat.size;
if (range) {
const parts = range.replace(/bytes=/, "").split("-");
const start = parseInt(parts[0], 10);
const end = parts[1] ? parseInt(parts[1], 10) : totalSize - 1;
responseHeaders['Content-Range'] = `bytes ${start}-${end}/${totalSize}`;
responseHeaders['Accept-Ranges'] = 'bytes';
responseHeaders['Content-Length'] = (end - start) + 1;
reply.code(206).headers(responseHeaders);
return fs.createReadStream(content, { start, end });
} else {
responseHeaders['Content-Length'] = totalSize;
responseHeaders['Accept-Ranges'] = 'bytes';
reply.code(200).headers(responseHeaders);
return fs.createReadStream(content);
}
}
// Download and Serve logic
function downloadAndServe(reply, apiData, cachePaths, key) {
let task = activeDownloads.get(key);
if (!task) {
task = {
emitter: new EventEmitter(),
currentSize: 0,
totalSize: 0,
path: cachePaths.content,
done: false,
error: null
};
task.emitter.setMaxListeners(0);
activeDownloads.set(key, task);
// Start Download
const targetUrl = apiData.data.url;
// Use Undici stream for high performance download
// stream() is efficient for piping
const { stream } = require('undici');
stream(targetUrl, { method: 'GET', opaque: task }, ({ statusCode, headers }) => {
if (statusCode !== 200) {
// handle error
const err = new Error(`Upstream ${statusCode}`);
task.error = err;
task.emitter.emit('error', err);
activeDownloads.delete(key);
// We need to return a Writable to undici
// return new Writable(...) or simple dummy
return fs.createWriteStream('/dev/null');
}
// Save meta
fs.writeFileSync(cachePaths.meta, JSON.stringify(apiData));
task.totalSize = parseInt(headers['content-length'] || '0', 10);
// Return the write stream to file
const fileStream = fs.createWriteStream(task.path);
// Monitor writing
// We need to intercept the stream to update currentSize
// PassThrough stream adds overhead.
// Better to wrap the write?
// Or just fs.watchFile? (Slow).
// Let's use a custom Writable wrapper or event listener on fileStream 'drain'/'finish' isn't granular enough.
// Undici stream factory returns a Writable.
// Let's stick to a simple approach:
// We can't easily hook into fs.WriteStream bytesWritten in real-time without polling or proxying.
// Proxying:
const originalWrite = fileStream.write.bind(fileStream);
fileStream.write = (chunk, encoding, cb) => {
const ret = originalWrite(chunk, encoding, cb);
task.currentSize += chunk.length;
task.emitter.emit('progress', task.currentSize);
return ret;
};
return fileStream;
}, ({ opaque }) => {
// Finished
opaque.done = true;
opaque.emitter.emit('done');
activeDownloads.delete(key);
}, (err, { opaque }) => {
// Error
if (err) {
opaque.error = err;
opaque.emitter.emit('error', err);
activeDownloads.delete(key);
fs.unlink(opaque.path, () => { });
}
});
}
// Serve growing file
return serveGrowingFile(reply, task, apiData);
}
/**
 * Stream a cache file that is still being written by an in-flight download.
 *
 * Returns a Readable that follows the file: it reads whatever bytes exist past
 * its cursor, and when it catches up with the writer it parks on the task's
 * 'progress'/'done' events until more data lands.
 *
 * Fixes over the original:
 *  - wait() registered once-listeners on BOTH 'progress' and 'done' but only
 *    the fired one was removed, so 'done' listeners accumulated on every pump
 *    cycle (and all fired at completion); both are now removed together.
 *  - destroy() now cancels a pending wait so a closed client does not leave
 *    listeners attached for the remainder of the download.
 *  - removed a dead per-call local; `buffer.slice` -> `buffer.subarray`;
 *    the fd is cleared after close so destroy() cannot double-close it.
 *
 * @param {object} reply - Fastify reply (status/headers are set here).
 * @param {object} task - shared download state (emitter, currentSize,
 *   totalSize, path, done, error) from downloadAndServe.
 * @param {object} apiData - API response; `data.headers` are forwarded.
 * @returns {Readable} live stream of the growing file.
 */
function serveGrowingFile(reply, task, apiData) {
  const responseHeaders = { ...apiData.data.headers };
  if (!responseHeaders['Access-Control-Allow-Origin']) {
    responseHeaders['Access-Control-Allow-Origin'] = '*';
  }

  // Honor a byte-range start offset; the end is whatever eventually arrives.
  const range = reply.request.headers.range;
  let start = 0;
  if (range) {
    const parts = range.replace(/bytes=/, "").split("-");
    start = parseInt(parts[0], 10) || 0;
    responseHeaders['Accept-Ranges'] = 'bytes';
    if (task.totalSize) {
      responseHeaders['Content-Range'] = `bytes ${start}-${task.totalSize - 1}/${task.totalSize}`;
      responseHeaders['Content-Length'] = task.totalSize - start;
    }
    reply.code(206);
  } else {
    if (task.totalSize) responseHeaders['Content-Length'] = task.totalSize;
    reply.code(200);
  }
  reply.headers(responseHeaders);

  const CHUNK_SIZE = 64 * 1024;

  // Pull loop: push one chunk of already-written bytes, or park until the
  // writer makes progress. Per-stream cursor/fd state lives on the stream.
  function pump(stream) {
    if (stream.destroyed) return;
    if (task.error) {
      stream.destroy(task.error);
      return;
    }
    // Lazily open the file; it may not exist yet when we join very early.
    if (stream._fd === undefined) {
      fs.open(task.path, 'r', (err, fd) => {
        if (err) {
          if (err.code === 'ENOENT' && !task.done) {
            setTimeout(() => pump(stream), 100); // file not created yet; retry
          } else {
            stream.destroy(err);
          }
          return;
        }
        stream._fd = fd;
        pump(stream);
      });
      return;
    }
    const available = task.currentSize - stream._bytesSent;
    if (available <= 0) {
      if (task.done) {
        const fd = stream._fd;
        stream._fd = undefined; // prevent a second close from destroy()
        fs.close(fd, () => { });
        stream.push(null); // EOF
      } else {
        wait(stream);
      }
      return;
    }
    const buffer = Buffer.alloc(Math.min(available, CHUNK_SIZE));
    fs.read(stream._fd, buffer, 0, buffer.length, stream._bytesSent, (err, bytesRead) => {
      if (err) {
        stream.destroy(err);
        return;
      }
      if (bytesRead > 0) {
        stream._bytesSent += bytesRead;
        // Push one chunk; the consumer drives the next cycle via _read().
        stream.push(buffer.subarray(0, bytesRead));
      } else {
        wait(stream);
      }
    });
  }

  // Park until the writer reports progress or completion. Both listeners are
  // detached when either fires, and destroy() can cancel a pending wait.
  function wait(stream) {
    if (stream._cancelWait) return; // already parked
    const onWake = () => {
      task.emitter.removeListener('progress', onWake);
      task.emitter.removeListener('done', onWake);
      stream._cancelWait = null;
      pump(stream);
    };
    stream._cancelWait = () => {
      task.emitter.removeListener('progress', onWake);
      task.emitter.removeListener('done', onWake);
      stream._cancelWait = null;
    };
    task.emitter.once('progress', onWake);
    task.emitter.once('done', onWake);
  }

  return new Readable({
    read() {
      if (this._bytesSent === undefined) this._bytesSent = start;
      pump(this);
    },
    destroy(err, cb) {
      if (this._cancelWait) this._cancelWait();
      if (this._fd !== undefined) fs.close(this._fd, () => { });
      cb(err);
    }
  });
}
// Routes
// Browsers probe for /favicon.ico; answer 204 No Content with an empty body
// so these requests never hit the token-resolution path.
fastify.get('/favicon.ico', async (_request, reply) => {
  reply.code(204);
  return '';
});
// Catch-all proxy route: the URL path (minus the leading '/') is the token.
// Resolve it upstream, then serve from the completed cache when possible,
// otherwise start or join a download and stream the growing file.
fastify.get('/*', async (request, reply) => {
  const token = request.url.substring(1);
  if (!token) {
    reply.code(400);
    return 'Missing token';
  }
  try {
    const apiData = await fetchApi(token);

    // Upstream must confirm success and supply a target URL.
    const isUsable = apiData.code === 200 && apiData.data && apiData.data.url;
    if (!isUsable) {
      reply.code(404);
      return 'Invalid API response';
    }

    const key = getCacheKey(apiData);
    if (!key) {
      reply.code(500);
      return 'Key Error';
    }

    const cachePaths = getCachePaths(key);
    // Only trust the cache when no download is in flight and both the content
    // and metadata files are present; anything else goes through the
    // download path (which joins an existing task if one is running).
    const cacheIsComplete = !activeDownloads.has(key)
      && fs.existsSync(cachePaths.content)
      && fs.existsSync(cachePaths.meta);

    return cacheIsComplete
      ? serveCompletedCache(reply, cachePaths, apiData)
      : downloadAndServe(reply, apiData, cachePaths, key);
  } catch (err) {
    request.log.error(err);
    reply.code(502);
    return 'Gateway Error: ' + err.message;
  }
});
// Run
// Run: bind on all interfaces; a failed bind is fatal.
fastify.listen({ port: PORT, host: '0.0.0.0' }, (startupErr, address) => {
  if (startupErr) {
    console.error(startupErr);
    process.exit(1);
  }
  console.log(`Fastify Server running at ${address}`);
});