# Copyright (c) 2015, inaz2 # Copyright (C) 2021 jahoti # Licensing information is collated in the `copyright` file """ The core for a "virtual network" proxy Be sure to run this inside your intended certificates directory. """ import os, socket, ssl, sys, threading, time from http.server import HTTPServer, BaseHTTPRequestHandler from socketserver import ThreadingMixIn from subprocess import Popen, PIPE gen_cert_req, lock = 'openssl req -new -key cert.key -subj /CN=%s', threading.Lock() sign_cert_req = 'openssl x509 -req -days 3650 -CA ca.crt -CAkey ca.key -set_serial %d -out %s' class ProxyRequestHandler(BaseHTTPRequestHandler): """Handles a network request made to the proxy""" def log_error(self, format, *args): # suppress "Request timed out: timeout('timed out',)" if isinstance(args[0], socket.timeout): return self.log_message(format, *args) def do_CONNECT(self): hostname = self.path.split(':')[0] certpath = '%s.crt' % (hostname if hostname != 'ca' else 'CA') with lock: if not os.path.isfile(certpath): p1 = Popen((gen_cert_req % hostname).split(' '), stdout=PIPE).stdout Popen((sign_cert_req % (time.time() * 1000, certpath)).split(' '), stdin=p1, stderr=PIPE).communicate() self.send_response(200) self.end_headers() self.connection = ssl.wrap_socket(self.connection, keyfile='cert.key', certfile=certpath, server_side=True) self.rfile = self.connection.makefile('rb', self.rbufsize) self.wfile = self.connection.makefile('wb', self.wbufsize) self.close_connection = int(self.headers.get('Proxy-Connection', '').lower() == 'close') def proxy(self): content_length = int(self.headers.get('Content-Length', 0)) req_body = self.rfile.read(content_length) if content_length else None if self.path[0] == '/': if isinstance(self.connection, ssl.SSLSocket): self.path = 'https://%s%s' % (self.headers['Host'], self.path) else: self.path = 'http://%s%s' % (self.headers['Host'], self.path) self.handle_request(req_body) do_OPTIONS = do_DELETE = do_PUT = do_HEAD = do_POST = do_GET = proxy def handle_request(self, req_body): pass class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): """The actual proxy server""" address_family, daemon_threads = socket.AF_INET6, True def handle_error(self, request, client_address): # suppress socket/ssl related errors cls, e = sys.exc_info()[:2] if not (cls is socket.error or cls is ssl.SSLError): return HTTPServer.handle_error(self, request, client_address)