From 625b418b670c2a1c0b60645afee14f1517fc0527 Mon Sep 17 00:00:00 2001 From: XiaoMo Date: Mon, 1 Sep 2025 22:21:36 +0800 Subject: [PATCH] 1111 --- source.js | 792 ++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 654 insertions(+), 138 deletions(-) diff --git a/source.js b/source.js index 53b6d53..e5f150d 100644 --- a/source.js +++ b/source.js @@ -9,6 +9,8 @@ const crypto = require('crypto'); const CACHE_DIR_NAME = '.cache'; const DEFAULT_PORT = 9001; const DEFAULT_API_ENDPOINT = 'http://183.6.121.121:9519/api'; +// 备用 API +const BACKUP_API_ENDPOINT = 'http://x-mo.cn:9519/api'; const cacheDir = pathModule.join(__dirname, CACHE_DIR_NAME); const pathIndex = {}; @@ -137,7 +139,7 @@ async function handleApiRedirect(res, apiData) { res.end(); } -async function processSuccessfulApiData(apiData, uniqidhex, reqPath, token, sign, res) { +async function processSuccessfulApiData(apiData, uniqidhex, reqPath, token, sign, res, req) { const { url: realUrl, cloudtype, expiration, path: apiPath, headers, uniqid, thumb } = apiData.data; const data = { realUrl, cloudtype, expiration: expiration * 1000, path: apiPath, headers, uniqid, thumb }; @@ -159,16 +161,16 @@ async function processSuccessfulApiData(apiData, uniqidhex, reqPath, token, sign const contentLength = stats.size; if (contentLength < 2048 && data.headers['content-length'] && parseInt(data.headers['content-length'], 10) !== contentLength) { console.warn(`Content length mismatch for ${cacheContentFile}. API: ${data.headers['content-length']}, Cache: ${contentLength}. Re-fetching.`); - fetchAndServe(data, tempCacheContentFile, cacheContentFile, cacheMetaFile, res); + fetchAndServe(data, tempCacheContentFile, cacheContentFile, cacheMetaFile, res, req); } else { - serveFromCache(data, cacheContentFile, cacheMetaFile, res); + serveFromCache(data, cacheContentFile, cacheMetaFile, res, req); } } else { - fetchAndServe(data, tempCacheContentFile, cacheContentFile, cacheMetaFile, res); + fetchAndServe(data, tempCacheContentFile, cacheContentFile, cacheMetaFile, res, req); } } -async function tryServeFromStaleCacheOrError(uniqidhex, res, errorMessage) { +async function tryServeFromStaleCacheOrError(uniqidhex, res, req, errorMessage) { if (pathIndex[uniqidhex]) { const cacheMetaFile = pathModule.join(cacheDir, `${uniqidhex}.meta`); const cacheContentFile = pathModule.join(cacheDir, `${pathIndex[uniqidhex].uniqid}.content`); @@ -176,7 +178,7 @@ async function tryServeFromStaleCacheOrError(uniqidhex, res, errorMessage) { console.warn(`API call failed or returned non-200. Serving stale cache for ${uniqidhex}`); try { const cacheData = JSON.parse(fs.readFileSync(cacheMetaFile, 'utf8')); - serveFromCache(cacheData, cacheContentFile, cacheMetaFile, res); + serveFromCache(cacheData, cacheContentFile, cacheMetaFile, res, req); return; } catch (parseError) { console.error(`Error parsing stale meta file ${cacheMetaFile}:`, parseError); @@ -224,7 +226,7 @@ async function handleMainRequest(req, res) { res.end(); } else { viewsInfo.increment('cacheHit'); - serveFromCache(cacheData, cacheContentFile, cacheMetaFile, res); + serveFromCache(cacheData, cacheContentFile, cacheMetaFile, res, req); } } else { try { @@ -236,15 +238,15 @@ async function handleMainRequest(req, res) { } if (apiData.code === HTTP_STATUS.OK && apiData.data && apiData.data.url) { - await processSuccessfulApiData(apiData, uniqidhex, reqPath, token, sign, res); + await processSuccessfulApiData(apiData, uniqidhex, reqPath, token, sign, res, req); } else { viewsInfo.increment('fetchApiWarning'); - await tryServeFromStaleCacheOrError(uniqidhex, res, apiData.message); + await tryServeFromStaleCacheOrError(uniqidhex, res, req, apiData.message); } } catch (error) { viewsInfo.increment('fetchApiError'); console.error('Error in API call or processing:', error); - await tryServeFromStaleCacheOrError(uniqidhex, res, `Bad Gateway: API request failed. ${error.message}`); + await tryServeFromStaleCacheOrError(uniqidhex, res, req, `Bad Gateway: API request failed. ${error.message}`); } } } @@ -259,34 +261,42 @@ async function checkCacheHeaders(req, cacheMetaFile) { const ifNoneMatch = req.headers['if-none-match']; const ifModifiedSince = req.headers['if-modified-since']; - // Check ETag first - if (ifNoneMatch && cacheData.uniqid && ifNoneMatch === cacheData.uniqid) { - return { cacheData, isNotModified: true }; + // 优先检查ETag (更精确的缓存验证方式) + if (ifNoneMatch && cacheData.uniqid) { + // 支持弱验证器格式 "W/"etag-value"" + const cleanEtag = ifNoneMatch.replace(/^W\//, '').replace(/"/g, ''); + const cleanCacheEtag = cacheData.uniqid.replace(/^W\//, '').replace(/"/g, ''); + + if (cleanEtag === cleanCacheEtag || ifNoneMatch === cacheData.uniqid) { + console.log(`304 Not Modified: ETag match for ${cacheMetaFile}`); + return { cacheData, isNotModified: true }; + } } - // Check If-Modified-Since + // 检查If-Modified-Since (基于时间的缓存验证) if (ifModifiedSince && cacheData.headers && cacheData.headers['last-modified']) { try { const lastModifiedDate = new Date(cacheData.headers['last-modified']); const ifModifiedSinceDate = new Date(ifModifiedSince); - // The time resolution of an HTTP date is one second. - // If If-Modified-Since is at least as new as Last-Modified, send 304. + + // HTTP日期的时间精度是1秒 + // 如果If-Modified-Since至少与Last-Modified一样新,则返回304 if (lastModifiedDate.getTime() <= ifModifiedSinceDate.getTime()) { + console.log(`304 Not Modified: Last-Modified check for ${cacheMetaFile}`); return { cacheData, isNotModified: true }; } } catch (dateParseError) { console.warn(`Error parsing date for cache header check (${cacheMetaFile}):`, dateParseError); - // Proceed as if not modified check failed if dates are invalid + // 如果日期无效,则继续处理,视为未修改检查失败 } } + + // 如果没有缓存验证头或验证失败,返回正常内容 return { cacheData, isNotModified: false }; } catch (error) { console.error(`Error reading or parsing cache meta file ${cacheMetaFile} in checkCacheHeaders:`, error); - // If we can't read meta, assume cache is invalid or treat as not modified: false - // Returning a dummy cacheData or null might be better depending on how caller handles it. - // For now, let it propagate and potentially fail later if cacheData is expected. - // Or, more safely, indicate cache is not valid / not modified is false. - return { cacheData: null, isNotModified: false }; // Indicate failure to load cacheData + // 如果无法读取元数据,假设缓存无效 + return { cacheData: null, isNotModified: false }; } } @@ -299,11 +309,21 @@ function isCacheValid(cacheMetaFile, cacheContentFile) { try { const metaContent = fs.readFileSync(cacheMetaFile, 'utf8'); const cacheData = JSON.parse(metaContent); - // Ensure expiration is a number and in the future + + // 确保expiration是一个数字并且在未来 + // 如果expiration未定义或为0,则设置为24小时后过期 + if (!cacheData.expiration || cacheData.expiration === 0) { + cacheData.expiration = Date.now() + CACHE_EXPIRY_MS; + // 更新缓存元数据文件 + fs.writeFileSync(cacheMetaFile, JSON.stringify(cacheData)); + console.log(`Updated missing expiration in ${cacheMetaFile} to ${cacheData.expiration}`); + return true; + } + return typeof cacheData.expiration === 'number' && cacheData.expiration > Date.now(); } catch (error) { console.warn(`Error reading or parsing cache meta file ${cacheMetaFile} for validation:`, error); - return false; // If meta file is corrupt or unreadable, cache is not valid + return false; // 如果元数据文件损坏或不可读,则缓存无效 } } @@ -312,12 +332,18 @@ function isCacheValid(cacheMetaFile, cacheContentFile) { const API_TIMEOUT_MS = 5000; const USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36'; -async function fetchApiData(reqPath, token, sign) { +// 用于跟踪API状态的变量 +let isDefaultApiHealthy = true; +let lastApiSwitchTime = 0; +const API_SWITCH_COOLDOWN_MS = 60000; // 切换API后的冷却时间,防止频繁切换 + +// 尝试使用指定的API端点获取数据 +async function tryFetchWithEndpoint(endpoint, reqPath, token, sign) { const queryParams = querystring.stringify({ type: reqPath, sign: sign }); - const apiUrl = `${apiEndpoint}?${queryParams}`; + const apiUrl = `${endpoint}?${queryParams}`; const parsedApiUrl = new URL(apiUrl); const protocol = parsedApiUrl.protocol === 'https:' ? https : http; @@ -348,10 +374,10 @@ async function fetchApiData(reqPath, token, sign) { const parsedError = JSON.parse(responseData); if (parsedError && parsedError.message) errorPayload.message = parsedError.message; } catch (e) { /* Ignore if response is not JSON */ } - resolve(errorPayload); // Resolve with error structure for consistency + resolve({ success: false, data: errorPayload }); return; } - resolve(JSON.parse(responseData)); + resolve({ success: true, data: JSON.parse(responseData) }); } catch (parseError) { console.error(`Error parsing JSON response from ${apiUrl}:`, parseError, responseData); reject(new Error(`Failed to parse API response: ${parseError.message}`)); @@ -374,87 +400,421 @@ async function fetchApiData(reqPath, token, sign) { }); } -// 从真实 URL 获取数据并写入缓存 -const REAL_URL_FETCH_TIMEOUT_MS = 0; // 0 means no timeout for the actual file download +// 主API获取函数,支持故障转移 +async function fetchApiData(reqPath, token, sign) { + // 确定当前使用的API端点 + const currentEndpoint = isDefaultApiHealthy ? DEFAULT_API_ENDPOINT : BACKUP_API_ENDPOINT; + const backupEndpoint = isDefaultApiHealthy ? BACKUP_API_ENDPOINT : DEFAULT_API_ENDPOINT; -const fetchAndServe = (data, tempCacheContentFile, cacheContentFile, cacheMetaFile, res) => { - const protocol = data.realUrl.startsWith('https:') ? https : http; + try { + // 尝试使用当前API端点 + const result = await tryFetchWithEndpoint(currentEndpoint, reqPath, token, sign); - protocol.get(data.realUrl, { timeout: REAL_URL_FETCH_TIMEOUT_MS, rejectUnauthorized: false }, (realRes) => { - const cacheStream = fs.createWriteStream(tempCacheContentFile, { flags: 'w' }); - - let isVideo = data.path && typeof data.path === 'string' && data.path.includes('.mp4'); - // 确保 content-length 是有效的 - const contentLength = realRes.headers['content-length']; - if (contentLength) { - - // contentLength 小于 2KB 且与缓存文件大小不一致时,重新获取 - if (contentLength < 2048 && data.headers['content-length'] !== contentLength) { - console.warn('Warning: content-length is different for the response from:', data.realUrl); - sendErrorResponse(res, HTTP_STATUS.BAD_GATEWAY, `Bad Gateway: Content-Length mismatch for ${data.realUrl}`); - // Clean up temp file if stream hasn't started or failed early - if (fs.existsSync(tempCacheContentFile)) { - fs.unlinkSync(tempCacheContentFile); - } - return; + // 如果当前使用的是备用API且成功了,考虑是否切回主API + if (!isDefaultApiHealthy && result.success) { + const now = Date.now(); + if (now - lastApiSwitchTime > API_SWITCH_COOLDOWN_MS) { + // 尝试恢复使用默认API + console.log(`尝试恢复使用默认API: ${DEFAULT_API_ENDPOINT}`); + isDefaultApiHealthy = true; + lastApiSwitchTime = now; } - - data.headers['content-length'] = contentLength; - // 更新 data 到缓存 cacheMetaFile - fs.writeFileSync(cacheMetaFile, JSON.stringify(data)); - } else { - console.warn('Warning: content-length is undefined for the response from:', data.realUrl); } - const baseHeaders = { + return result.data; + } catch (error) { + console.error(`API请求失败,尝试使用备用API: ${backupEndpoint}`, error); + + // 如果当前API失败,切换到备用API + if (isDefaultApiHealthy) { + console.log(`主API ${DEFAULT_API_ENDPOINT} 不可用,切换到备用API ${BACKUP_API_ENDPOINT}`); + isDefaultApiHealthy = false; + lastApiSwitchTime = Date.now(); + } else { + console.log(`备用API ${BACKUP_API_ENDPOINT} 不可用,切换回主API ${DEFAULT_API_ENDPOINT}`); + isDefaultApiHealthy = true; + lastApiSwitchTime = Date.now(); + } + + try { + // 尝试使用备用API端点 + const backupResult = await tryFetchWithEndpoint(backupEndpoint, reqPath, token, sign); + return backupResult.data; + } catch (backupError) { + console.error(`备用API也失败了,无法获取数据`, backupError); + throw backupError; // 如果备用API也失败,则抛出错误 + } + } +} + +// 从真实 URL 获取数据并写入缓存 +const REAL_URL_FETCH_TIMEOUT_MS = 30000; // 30秒超时 +const HIGH_WATER_MARK = 64 * 1024; // 64KB 缓冲区 + +// 媒体文件扩展名列表 +const VIDEO_EXTENSIONS = ['.mp4', '.mkv', '.webm', '.avi', '.mov', '.flv']; +const AUDIO_EXTENSIONS = ['.mp3', '.wav', '.ogg', '.flac', '.aac', '.m4a']; + +// 检查文件类型的辅助函数 +const isMediaFile = (path, extensions) => { + if (!path || typeof path !== 'string') return false; + return extensions.some(ext => path.includes(ext)); +}; + +// 从缓存提供范围请求的辅助函数 +const serveRangeFromCache = (rangeHeader, cacheContentFile, cacheMetaFile, res, isVideo, isAudio) => { + try { + const stats = fs.statSync(cacheContentFile); + const fileSize = stats.size; + + // 解析Range头 + const ranges = rangeHeader.replace(/bytes=/, '').split(','); + const rangeSpec = ranges[0].trim(); + const parts = rangeSpec.split('-'); + let start = parseInt(parts[0], 10); + let end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1; + + // 处理特殊范围格式,如 bytes=-500 (最后500字节) + if (isNaN(start)) { + start = Math.max(0, fileSize - parseInt(parts[1], 10)); + end = fileSize - 1; + } + + // 确保范围有效 + end = Math.min(end, fileSize - 1); + start = Math.min(start, end); + + if (start < fileSize && fileSize > 0) { + console.log(`Serving range request from cache: bytes ${start}-${end}/${fileSize}`); + + const chunkSize = (end - start) + 1; + const metaData = JSON.parse(fs.readFileSync(cacheMetaFile, 'utf8')); + + const readStream = fs.createReadStream(cacheContentFile, { + start, end, highWaterMark: HIGH_WATER_MARK + }); + + const contentType = (metaData.headers && metaData.headers['content-type']) || + (isVideo ? 'video/mp4' : (isAudio ? 'audio/mpeg' : 'application/octet-stream')); + + res.writeHead(206, { + 'Content-Range': `bytes ${start}-${end}/${fileSize}`, + 'Content-Length': chunkSize.toString(), + 'Content-Type': contentType, + 'Accept-Ranges': 'bytes', + 'ETag': metaData.uniqid || '', + 'Cache-Control': 'public, max-age=3600', + 'Cloud-Type': metaData.cloudtype || 'unknown', + 'Cloud-Expiration': new Date(metaData.expiration || (Date.now() + 3600000)).toLocaleString() + }); + + readStream.pipe(res); + + readStream.on('error', (err) => { + console.error(`Read stream error: ${err.message}`); + if (!res.headersSent) { + sendErrorResponse(res, HTTP_STATUS.INTERNAL_SERVER_ERROR, 'Error reading from cache'); + } else { + try { res.end(); } catch (e) { /* ignore */ } + } + }); + + return true; // 成功从缓存提供服务 + } + } catch (error) { + console.error(`Error serving range request from cache: ${error.message}`); + } + return false; // 从缓存提供服务失败 +}; + +const fetchAndServe = (data, tempCacheContentFile, cacheContentFile, cacheMetaFile, res, req) => { + const protocol = data.realUrl.startsWith('https:') ? https : http; + const cacheFileExists = fs.existsSync(cacheContentFile); + + // 如果缓存文件已存在,直接使用它 + if (cacheFileExists) { + tempCacheContentFile = cacheContentFile; + console.log(`Using existing cache file: ${cacheContentFile}`); + } + + // 检查媒体文件类型 + const isVideo = isMediaFile(data.path, VIDEO_EXTENSIONS); + const isAudio = isMediaFile(data.path, AUDIO_EXTENSIONS); + const rangeHeader = req && req.headers && req.headers.range; + + // 构建请求选项 + const requestOptions = { + timeout: REAL_URL_FETCH_TIMEOUT_MS, + rejectUnauthorized: false, + headers: { + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36', + 'Accept': '*/*', + 'Accept-Encoding': 'gzip, deflate, br', + 'Connection': 'keep-alive' + } + }; + + // 处理范围请求 + if (rangeHeader) { + console.log(`Forwarding range request: ${rangeHeader} to ${data.realUrl}`); + requestOptions.headers['Range'] = rangeHeader; + + // 尝试从缓存提供范围请求 + if (cacheFileExists && fs.existsSync(cacheMetaFile)) { + if (serveRangeFromCache(rangeHeader, cacheContentFile, cacheMetaFile, res, isVideo, isAudio)) { + return; // 成功从缓存提供服务,不需要继续 + } + } + } + + // 转发条件请求头 + if (req && req.headers) { + ['if-range', 'if-match', 'if-none-match'].forEach(header => { + if (req.headers[header]) { + requestOptions.headers[header.charAt(0).toUpperCase() + header.slice(1)] = req.headers[header]; + console.log(`Forwarding ${header} header: ${req.headers[header]}`); + } + }); + } + + protocol.get(data.realUrl, requestOptions, (realRes) => { + // 处理304 Not Modified响应 + if (realRes.statusCode === 304) { + console.log(`Received 304 Not Modified from source for ${data.realUrl}`); + res.writeHead(304, { + 'ETag': data.uniqid || '', + 'Cache-Control': 'public, max-age=3600', + 'Date': new Date().toUTCString() + }); + res.end(); + return; + } + + // 使用更高效的缓冲区设置,对于已存在的文件使用追加模式 + const cacheStream = fs.createWriteStream(tempCacheContentFile, { + flags: tempCacheContentFile === cacheContentFile && fs.existsSync(tempCacheContentFile) ? 'a' : 'w', + highWaterMark: HIGH_WATER_MARK + }); + + // 处理响应头和缓存元数据 + const handleResponseHeaders = () => { + const contentLength = realRes.headers['content-length']; + + // 验证内容长度 + if (contentLength) { + // 检查内容长度是否异常 + if (contentLength < 2048 && data.headers['content-length'] !== contentLength && !rangeHeader) { + console.warn('Warning: content-length mismatch from:', data.realUrl); + sendErrorResponse(res, HTTP_STATUS.BAD_GATEWAY, `Bad Gateway: Content-Length mismatch`); + + // 清理临时文件 + if (fs.existsSync(tempCacheContentFile)) { + fs.unlinkSync(tempCacheContentFile); + } + return false; + } + + // 只在非范围请求时更新content-length + if (!rangeHeader) { + data.headers['content-length'] = contentLength; + updateCacheMetadata(); + } + } else { + console.warn('Warning: content-length undefined from:', data.realUrl); + } + + // 确保过期时间有效 + if (!data.expiration || data.expiration === 0) { + data.expiration = Date.now() + CACHE_EXPIRY_MS; + updateCacheMetadata(); + } + + return true; + }; + + // 更新缓存元数据 + const updateCacheMetadata = () => { + fs.writeFile(cacheMetaFile, JSON.stringify(data), (err) => { + if (err) console.error(`Error updating cache metadata: ${err.message}`); + }); + }; + + // 如果头部处理失败,直接返回 + if (!handleResponseHeaders()) return; + + // 格式化过期时间为可读格式 + const expirationDate = new Date(data.expiration).toLocaleString(); + + // 设置响应头 - 更精简的方式 + const responseHeaders = { 'Cloud-Type': data.cloudtype, - 'Cloud-Expiration': data.expiration, + 'Cloud-Expiration': expirationDate, 'ETag': data.uniqid || '', - 'Cache-Control': 'public, max-age=31536000', // 1 year - 'Expires': new Date(Date.now() + 31536000000).toUTCString(), + 'Cache-Control': 'public, max-age=3600, stale-while-revalidate=86400', + 'Expires': new Date(Date.now() + 3600000).toUTCString(), 'Accept-Ranges': 'bytes', 'Connection': 'keep-alive', - 'Date': new Date().toUTCString(), // Should be set by the server, but good for consistency - 'Last-Modified': data.headers['last-modified'] || new Date(fs.statSync(cacheMetaFile).mtime).toUTCString(), // Prefer API's Last-Modified if available - }; - const responseHeaders = { - ...baseHeaders, - 'Content-Type': realRes.headers['content-type'] || (isVideo ? 'video/mp4' : 'application/octet-stream'), // Prefer actual content-type - ...data.headers, // Allow API to override some headers if necessary + 'Date': new Date().toUTCString(), + 'Last-Modified': data.headers['last-modified'] || new Date().toUTCString(), + 'Vary': 'Accept-Encoding', + 'Content-Type': realRes.headers['content-type'] || (isVideo ? 'video/mp4' : 'application/octet-stream'), + ...data.headers }; + + // 处理范围响应 + if (realRes.headers['content-range']) { + responseHeaders['Content-Range'] = realRes.headers['content-range']; + console.log(`Received partial content: ${realRes.headers['content-range']} for ${data.realUrl}`); + + // 从Content-Range中提取文件总大小 + const contentRangeMatch = /\/([0-9]+)$/.exec(realRes.headers['content-range']); + if (contentRangeMatch && contentRangeMatch[1]) { + const totalSize = parseInt(contentRangeMatch[1], 10); + if (totalSize > 0 && (!data.headers['content-length'] || parseInt(data.headers['content-length'], 10) !== totalSize)) { + data.headers['content-length'] = totalSize.toString(); + console.log(`Updated content-length: ${totalSize}`); + fs.writeFile(cacheMetaFile, JSON.stringify(data), err => { + if (err) console.error(`Error updating meta: ${err.message}`); + }); + } + } + } + // 使用源服务器返回的状态码 res.writeHead(realRes.statusCode, responseHeaders); - realRes.pipe(cacheStream); - realRes.pipe(res); - + + // 使用流事件处理来优化性能 + let bytesReceived = 0; + const startTime = Date.now(); + + // 检测是否需要处理压缩内容 + const contentEncoding = realRes.headers['content-encoding']; + if (contentEncoding && (contentEncoding.includes('gzip') || contentEncoding.includes('deflate') || contentEncoding.includes('br'))) { + // 对于压缩内容,我们直接传递,不做解压处理 + console.log(`Streaming compressed content (${contentEncoding}) for ${data.realUrl}`); + } + + // 使用更高效的流处理方式,确保边下边播 + // 创建一个Transform流,同时写入缓存和响应 + const { Transform } = require('stream'); + const streamRelay = new Transform({ + transform(chunk, encoding, callback) { + // 将数据传递给下一个流 + this.push(chunk); + + // 更新接收的字节数 + bytesReceived += chunk.length; + + // 每10MB记录一次进度 + if (bytesReceived % (10 * 1024 * 1024) === 0) { + const elapsedSeconds = (Date.now() - startTime) / 1000; + const mbReceived = bytesReceived / (1024 * 1024); + const mbps = mbReceived / elapsedSeconds; + console.log(`Progress for ${data.realUrl}: ${mbReceived.toFixed(2)}MB received at ${mbps.toFixed(2)}MB/s`); + } + + callback(); + } + }); + + // 设置错误处理 + streamRelay.on('error', (err) => { + console.error(`Stream relay error for ${data.realUrl}:`, err); + try { + cacheStream.end(); + if (!res.writableEnded) res.end(); + } catch (e) { /* ignore */ } + }); + + // 优化流处理,确保数据立即流向客户端,实现真正的边下边播 + // 直接将源响应通过中继流同时发送到客户端和缓存 + realRes.pipe(streamRelay); + + // 优先将数据发送给客户端,确保低延迟 + streamRelay.pipe(res, { end: true }); + + // 同时将数据写入缓存 + streamRelay.pipe(cacheStream, { end: false }); // 不自动结束缓存流,我们需要在完成后手动处理 + + // 处理客户端提前关闭连接 + res.on('close', () => { + if (!res.writableEnded) { + console.log(`Client closed connection prematurely for ${data.realUrl}`); + // 断开客户端连接,但继续下载到缓存 + streamRelay.unpipe(res); + // 不中断缓存写入,继续下载完整文件 + } + }); + + // 处理完成事件 realRes.on('end', () => { - cacheStream.end(() => { // Ensure stream is fully flushed before renaming + const totalTime = (Date.now() - startTime) / 1000; + const totalMB = bytesReceived / (1024 * 1024); + console.log(`Completed ${data.realUrl}: ${totalMB.toFixed(2)}MB in ${totalTime.toFixed(2)}s (${(totalMB/totalTime).toFixed(2)}MB/s)`); + + // 确保缓存流正确结束 + cacheStream.end(() => { if (fs.existsSync(tempCacheContentFile)) { - try { - // Ensure the target directory exists before renaming - const targetDir = pathModule.dirname(cacheContentFile); - if (!fs.existsSync(targetDir)) { - fs.mkdirSync(targetDir, { recursive: true }); - } - fs.renameSync(tempCacheContentFile, cacheContentFile); + // 如果临时文件就是缓存文件,不需要重命名 + if (tempCacheContentFile === cacheContentFile) { console.log(`Successfully cached: ${cacheContentFile}`); - - } catch (renameError) { - console.error(`Error renaming temp cache file ${tempCacheContentFile} to ${cacheContentFile}:`, renameError); - // If rename fails, try to remove the temp file to avoid clutter - try { fs.unlinkSync(tempCacheContentFile); } catch (e) { /* ignore */ } + + // 更新缓存元数据,添加文件大小信息 + if (bytesReceived > 0) { + data.headers['content-length'] = bytesReceived.toString(); + fs.writeFile(cacheMetaFile, JSON.stringify(data), (err) => { + if (err) console.error(`Error updating content-length in cache meta file ${cacheMetaFile}:`, err); + }); + } + } else { + // 确保目标目录存在 + const targetDir = pathModule.dirname(cacheContentFile); + fs.mkdir(targetDir, { recursive: true }, (mkdirErr) => { + if (mkdirErr) { + console.error(`Error creating directory ${targetDir}:`, mkdirErr); + try { fs.unlinkSync(tempCacheContentFile); } catch (e) { /* ignore */ } + return; + } + + // 异步重命名文件 + fs.rename(tempCacheContentFile, cacheContentFile, (renameErr) => { + if (renameErr) { + console.error(`Error renaming temp cache file ${tempCacheContentFile} to ${cacheContentFile}:`, renameErr); + try { fs.unlinkSync(tempCacheContentFile); } catch (e) { /* ignore */ } + } else { + console.log(`Successfully cached: ${cacheContentFile}`); + + // 更新缓存元数据,添加文件大小信息 + if (bytesReceived > 0) { + data.headers['content-length'] = bytesReceived.toString(); + fs.writeFile(cacheMetaFile, JSON.stringify(data), (err) => { + if (err) console.error(`Error updating content-length in cache meta file ${cacheMetaFile}:`, err); + }); + } + } + }); + }); } } else { - // This case might indicate an issue if the stream ended but no temp file was created/found console.warn(`Temp cache file ${tempCacheContentFile} not found after stream end for ${data.realUrl}`); } }); }); + // 改进错误处理 realRes.on('error', (streamError) => { console.error(`Error during response stream from ${data.realUrl}:`, streamError); cacheStream.end(); // Close the writable stream - handleResponseError(res, tempCacheContentFile, data.realUrl); // tempCacheContentFile might be partially written + + // 如果响应已经开始发送,我们不能再发送错误响应 + if (!res.headersSent) { + handleResponseError(res, tempCacheContentFile, data.realUrl); + } else { + // 如果头部已发送,尝试结束响应 + try { res.end(); } catch (e) { /* ignore */ } + // 清理临时文件 + try { fs.unlinkSync(tempCacheContentFile); } catch (e) { /* ignore */ } + } }); }).on('error', (requestError) => { @@ -465,78 +825,234 @@ const fetchAndServe = (data, tempCacheContentFile, cacheContentFile, cacheMetaFi }; // 从缓存中读取数据并返回 -function serveFromCache(cacheData, cacheContentFile, cacheMetaFile, res) { +function serveFromCache(cacheData, cacheContentFile, cacheMetaFile, res, req) { if (!cacheData) { // Added check for null cacheData from checkCacheHeaders failure console.error(`serveFromCache called with null cacheData for ${cacheContentFile}`); sendErrorResponse(res, HTTP_STATUS.INTERNAL_SERVER_ERROR, 'Cache metadata unavailable.'); return; } + // 获取文件大小 + let fileSize = 0; + try { + const stats = fs.statSync(cacheContentFile); + fileSize = stats.size; + + // 更新缓存元数据中的content-length + if (fileSize > 0) { + if (!cacheData.headers) cacheData.headers = {}; + if (!cacheData.headers['content-length'] || parseInt(cacheData.headers['content-length'], 10) !== fileSize) { + cacheData.headers['content-length'] = fileSize.toString(); + // 异步更新缓存元数据 + fs.writeFile(cacheMetaFile, JSON.stringify(cacheData), (err) => { + if (err) console.error(`Error updating content-length in ${cacheMetaFile}:`, err); + else console.log(`Updated content-length in ${cacheMetaFile} to ${fileSize}`); + }); + } + } + } catch (statError) { + console.error(`Error stating cache content file ${cacheContentFile}:`, statError); + handleCacheReadError(res, cacheContentFile); + return; + } + + // 设置更强的缓存控制,确保短时间内不会重复请求 + const now = Date.now(); + const nowUTC = new Date(now).toUTCString(); + const expiresUTC = new Date(now + 3600000).toUTCString(); + + // 获取Last-Modified,优先使用API的值 + let lastModified = cacheData.headers && cacheData.headers['last-modified'] + ? cacheData.headers['last-modified'] + : new Date(fs.statSync(cacheMetaFile).mtime).toUTCString(); + + // 生成ETag,用于断点下载 + const etag = cacheData.uniqid || crypto.createHash('md5').update(cacheContentFile + fileSize).digest('hex'); + const baseHeaders = { 'Cloud-Type': cacheData.cloudtype || 'unknown', - 'Cloud-Expiration': cacheData.expiration || 'N/A', - 'ETag': cacheData.uniqid || crypto.createHash('md5').update(fs.readFileSync(cacheContentFile)).digest('hex'), // Fallback ETag if missing - 'Cache-Control': 'public, max-age=31536000', // 1 year - 'Expires': new Date(Date.now() + 31536000000).toUTCString(), - 'Accept-Ranges': 'bytes', + 'Cloud-Expiration': new Date(cacheData.expiration || (now + CACHE_EXPIRY_MS)).toLocaleString(), + 'ETag': etag, + 'Cache-Control': 'public, max-age=3600, stale-while-revalidate=86400', + 'Expires': expiresUTC, + 'Pragma': 'cache', + 'Accept-Ranges': 'bytes', // 关键:支持范围请求和断点下载 'Connection': 'keep-alive', - 'Date': new Date().toUTCString(), - 'Last-Modified': (cacheData.headers && cacheData.headers['last-modified']) || new Date(fs.statSync(cacheMetaFile).mtime).toUTCString(), + 'Date': nowUTC, + 'Last-Modified': lastModified, + 'Vary': 'Accept-Encoding', }; viewsInfo.increment('cacheCall'); - const readStream = fs.createReadStream(cacheContentFile); - const isVideo = cacheData.path && typeof cacheData.path === 'string' && cacheData.path.includes('.mp4'); - - let currentContentLength = cacheData.headers && cacheData.headers['content-length'] ? parseInt(cacheData.headers['content-length'], 10) : 0; - - if (!currentContentLength || currentContentLength === 0) { - try { - const stats = fs.statSync(cacheContentFile); - currentContentLength = stats.size; - if (currentContentLength > 0) { - if (!cacheData.headers) cacheData.headers = {}; - cacheData.headers['content-length'] = currentContentLength.toString(); - // Update meta file if content-length was missing or zero - fs.writeFileSync(cacheMetaFile, JSON.stringify(cacheData)); - console.log(`Updated content-length in ${cacheMetaFile} to ${currentContentLength}`); - } else { - console.warn(`Cached content file ${cacheContentFile} has size 0 or stat failed.`); - // Potentially treat as an error or serve as is if 0 length is valid for some files - } - } catch (statError) { - console.error(`Error stating cache content file ${cacheContentFile}:`, statError); - handleCacheReadError(res, cacheContentFile); // Treat stat error as read error + + // 检测文件类型 + const isVideo = cacheData.path && typeof cacheData.path === 'string' && + (cacheData.path.includes('.mp4') || cacheData.path.includes('.mkv') || + cacheData.path.includes('.webm') || cacheData.path.includes('.avi') || + cacheData.path.includes('.mov') || cacheData.path.includes('.flv')); + + const isAudio = cacheData.path && typeof cacheData.path === 'string' && + (cacheData.path.includes('.mp3') || cacheData.path.includes('.wav') || + cacheData.path.includes('.ogg') || cacheData.path.includes('.flac') || + cacheData.path.includes('.aac') || cacheData.path.includes('.m4a')); + + // 检查是否是断点续传请求 + const ifRange = req && req.headers && req.headers['if-range']; + const ifMatch = req && req.headers && req.headers['if-match']; + const ifNoneMatch = req && req.headers && req.headers['if-none-match']; + + // 处理条件请求 - 如果ETag匹配,返回304 + if (ifNoneMatch && ifNoneMatch.includes(etag)) { + console.log(`304 Not Modified for ${cacheContentFile} - ETag match`); + res.writeHead(304, baseHeaders); + res.end(); + return; + } + + // 处理范围请求 (Range Requests) + const rangeHeader = req && req.headers && req.headers.range; + + // 如果有If-Range头且不匹配当前ETag,则忽略Range头,返回整个文件 + const ignoreRange = ifRange && ifRange !== etag && ifRange !== lastModified; + + if (rangeHeader && fileSize > 0 && !ignoreRange) { + console.log(`Range request received: ${rangeHeader} for file ${cacheContentFile}`); + + // 解析Range头 + const ranges = rangeHeader.replace(/bytes=/, '').split(','); + + // 如果是多范围请求,我们简化处理,只返回第一个范围 + const rangeSpec = ranges[0].trim(); + const parts = rangeSpec.split('-'); + let start = parseInt(parts[0], 10); + let end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1; + + // 处理特殊范围格式,如 bytes=-500 (最后500字节) + if (isNaN(start)) { + start = Math.max(0, fileSize - parseInt(parts[1], 10)); + end = fileSize - 1; + } + + // 确保范围不超过文件大小 + end = Math.min(end, fileSize - 1); + start = Math.min(start, end); + + // 验证范围有效性 + if (start >= fileSize) { + // 无效范围,返回416错误 + res.writeHead(416, { + 'Content-Range': `bytes */${fileSize}`, + ...baseHeaders + }); + res.end(); return; } - } - - readStream.on('open', () => { - + + // 计算范围大小 + const chunkSize = (end - start) + 1; + + // 创建范围流 + const readStream = fs.createReadStream(cacheContentFile, { + start, + end, + highWaterMark: HIGH_WATER_MARK // 使用更高效的缓冲区设置 + }); + + // 设置206部分内容响应 const responseHeaders = { ...baseHeaders, - 'Content-Type': (cacheData.headers && cacheData.headers['content-type']) || (isVideo ? 'video/mp4' : 'application/octet-stream'), - // Merge other headers from cacheData.headers, letting them override base if necessary - // but ensure our critical headers like Content-Length (if updated) are preserved. - ...(cacheData.headers || {}), + 'Content-Range': `bytes ${start}-${end}/${fileSize}`, + 'Content-Length': chunkSize.toString(), + 'Content-Type': (cacheData.headers && cacheData.headers['content-type']) || + (isVideo ? 'video/mp4' : (isAudio ? 'audio/mpeg' : 'application/octet-stream')), }; - - res.writeHead(HTTP_STATUS.OK, responseHeaders); + + console.log(`Serving partial content: bytes ${start}-${end}/${fileSize} for ${cacheContentFile}`); + res.writeHead(206, responseHeaders); + + // 处理流错误 + readStream.on('error', (err) => { + console.error(`Read stream error for range request ${cacheContentFile}:`, err); + if (!res.headersSent) { + handleCacheReadError(res, cacheContentFile); + } else { + try { res.end(); } catch (e) { /* ignore */ } + } + }); + + // 监控数据传输速度 + let bytesSent = 0; + const startTime = Date.now(); + + readStream.on('data', (chunk) => { + bytesSent += chunk.length; + // 每10MB记录一次进度 + if (bytesSent % (10 * 1024 * 1024) === 0) { + const elapsedSeconds = (Date.now() - startTime) / 1000; + const mbSent = bytesSent / (1024 * 1024); + const mbps = mbSent / elapsedSeconds; + console.log(`Range progress for ${cacheContentFile}: ${mbSent.toFixed(2)}MB sent at ${mbps.toFixed(2)}MB/s`); + } + }); + + // 将范围流传输到响应 readStream.pipe(res); - }); - - readStream.on('error', (err) => { - console.error(`Read stream error for ${cacheContentFile}:`, err); - handleCacheReadError(res, cacheContentFile); - }); - - // Handle cases where client closes connection prematurely - res.on('close', () => { - if (!res.writableEnded) { - console.log(`Client closed connection prematurely for ${cacheContentFile}. Destroying read stream.`); - readStream.destroy(); - } - }); + + // 处理客户端提前关闭连接 + res.on('close', () => { + if (!res.writableEnded) { + console.log(`Client closed range request connection prematurely for ${cacheContentFile}`); + readStream.destroy(); + } + }); + } else { + // 非范围请求,返回完整文件 + const readStream = fs.createReadStream(cacheContentFile, { + highWaterMark: HIGH_WATER_MARK // 使用更高效的缓冲区设置 + }); + + readStream.on('open', () => { + const responseHeaders = { + ...baseHeaders, + 'Content-Length': fileSize.toString(), + 'Content-Type': (cacheData.headers && cacheData.headers['content-type']) || + (isVideo ? 'video/mp4' : (isAudio ? 'audio/mpeg' : 'application/octet-stream')), + ...(cacheData.headers || {}), + }; + + res.writeHead(HTTP_STATUS.OK, responseHeaders); + + // 监控数据传输速度 + let bytesSent = 0; + const startTime = Date.now(); + + readStream.on('data', (chunk) => { + bytesSent += chunk.length; + // 每10MB记录一次进度 + if (bytesSent % (10 * 1024 * 1024) === 0) { + const elapsedSeconds = (Date.now() - startTime) / 1000; + const mbSent = bytesSent / (1024 * 1024); + const mbps = mbSent / elapsedSeconds; + console.log(`Full file progress for ${cacheContentFile}: ${mbSent.toFixed(2)}MB sent at ${mbps.toFixed(2)}MB/s`); + } + }); + + readStream.pipe(res); + }); + + readStream.on('error', (err) => { + console.error(`Read stream error for ${cacheContentFile}:`, err); + handleCacheReadError(res, cacheContentFile); + }); + + // 处理客户端提前关闭连接 + res.on('close', () => { + if (!res.writableEnded) { + console.log(`Client closed connection prematurely for ${cacheContentFile}`); + readStream.destroy(); + } + }); + } }