# alist-proxy/index.py
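# Resolves each incoming request path to a signed direct-download URL via the
# AList link API at oss.x-php.com, caches the mapping until the reported
# expiration, and relays the file back to the client.
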
import http.server
import json
import socket
import ssl
import time
import urllib.error
import urllib.parse
import urllib.request

class ProxyHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
    api_endpoint = 'https://oss.x-php.com/alist/link'
    request_timeout = 10  # seconds
    cache = {}  # shared across requests: path -> {real_url, cloud_type, expiration}

    def do_GET(self):
        self.proxy_request()

    def do_POST(self):
        self.proxy_request()

    def proxy_request(self):
        path = self.path
        # Ignore favicon requests
        if path == '/favicon.ico':
            self.send_response(204)
            self.end_headers()
            return
        sign = self.headers.get('sign', '')
        # Serve from cache if the entry exists and has not expired
        cache_entry = self.cache.get(path)
        if cache_entry and cache_entry['expiration'] > time.time():
            self.serve_from_cache(cache_entry)
            return
        self.cache.pop(path, None)  # Drop an expired entry, if any
        # Build the POST body for the link-resolution API
        post_data = urllib.parse.urlencode({'path': path, 'sign': sign}).encode('utf-8')
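        # Expected API response shape, inferred from the parsing below (the
        # exact oss.x-php.com contract is an assumption):
        #   {"code": 200, "message": "...",
        #    "data": {"url": "https://...", "cloudtype": "...", "expiration": 600}}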
        try:
            # Ask the API for the real download URL
            context = ssl._create_unverified_context()  # skip upstream cert verification
            req = urllib.request.Request(self.api_endpoint, data=post_data, method='POST')
            req.add_header('Accept', 'application/json')
            with urllib.request.urlopen(req, timeout=self.request_timeout, context=context) as response:
                api_response = response.read().decode('utf-8')
            # Ensure the response is JSON
            try:
                api_data = json.loads(api_response)
            except json.JSONDecodeError:
                self.send_error(502, 'Bad Gateway: Failed to decode JSON')
                return
            if isinstance(api_data, dict) and api_data.get('code') == 200 and api_data.get('data') and api_data['data'].get('url'):
                real_url = api_data['data']['url']
                cloud_type = api_data['data']['cloudtype']
                expiration = int(api_data['data'].get('expiration', 0))  # lifetime in seconds
                # Cache the mapping only when the API reports a positive lifetime
                if expiration > 0:
                    self.cache[path] = {
                        'real_url': real_url,
                        'cloud_type': cloud_type,
                        'expiration': time.time() + expiration,
                    }
                self.fetch_and_serve(real_url, cloud_type)
            else:
                self.send_error(502, api_data.get('message', 'Bad Gateway'))
        except socket.timeout:
            # A timeout while reading the response is raised directly
            self.send_error(504, 'Gateway Timeout')
        except urllib.error.URLError as api_error:
            # A timeout during connect arrives wrapped in URLError
            if isinstance(api_error.reason, socket.timeout) or 'timed out' in str(api_error.reason):
                self.send_error(504, 'Gateway Timeout')
            else:
                self.send_error(500, 'Internal Server Error')
    def fetch_and_serve(self, real_url, cloud_type):
        try:
            context = ssl._create_unverified_context()
            with urllib.request.urlopen(real_url, timeout=self.request_timeout, context=context) as real_response:
                body = real_response.read()
                self.send_response(real_response.status)
                for key, value in real_response.getheaders():
                    # urllib has already de-chunked the body, so hop-by-hop
                    # headers must not be forwarded; Content-Length is recomputed
                    if key.lower() in ('transfer-encoding', 'connection', 'content-length'):
                        continue
                    self.send_header(key, value)
                self.send_header('Content-Length', str(len(body)))
                self.send_header('cloudtype', cloud_type)
                self.end_headers()
                self.wfile.write(body)
        except ConnectionResetError:
            print(f"Connection reset by peer when fetching {real_url}")
        except BrokenPipeError:
            print(f"Broken pipe when serving {real_url}")
        except Exception:
            self.send_error(502, f'Bad Gateway: {real_url}')

    def serve_from_cache(self, cache_entry):
        self.fetch_and_serve(cache_entry['real_url'], cache_entry['cloud_type'])
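
# Note: http.server.HTTPServer handles one request at a time; for concurrent
# transfers, http.server.ThreadingHTTPServer is a drop-in replacement here.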

def run(server_class=http.server.HTTPServer, handler_class=ProxyHTTPRequestHandler, port=3000):
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    print(f"Proxy server is running on http://localhost:{port}")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\nServer is shutting down...")
        httpd.server_close()


if __name__ == '__main__':
    run()
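
# Example usage (the sign value and path below are hypothetical):
#   $ python index.py
#   Proxy server is running on http://localhost:3000
#   $ curl -H 'sign: <signature>' -o file.bin 'http://localhost:3000/path/to/file'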