From b1444d9c9ea065d7c97d5809c3ec5259cb01a1da Mon Sep 17 00:00:00 2001 From: jahoti Date: Mon, 6 Sep 2021 00:00:00 +0000 Subject: Incorporate test suite from jahoti branch --- test/proxy_core.py | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 test/proxy_core.py (limited to 'test/proxy_core.py') diff --git a/test/proxy_core.py b/test/proxy_core.py new file mode 100644 index 0000000..dd4225d --- /dev/null +++ b/test/proxy_core.py @@ -0,0 +1,74 @@ +# Copyright (c) 2015, inaz2 +# Copyright (C) 2021 jahoti +# Licensing information is collated in the `copyright` file + +""" +The core for a "virtual network" proxy + +Be sure to set certdir to your intended certificates directory before running. +""" + +import os, socket, ssl, subprocess, sys, threading, time +from http.server import HTTPServer, BaseHTTPRequestHandler +from socketserver import ThreadingMixIn + +gen_cert_req, lock = 'openssl req -new -key %scert.key -subj /CN=%s', threading.Lock() +sign_cert_req = 'openssl x509 -req -days 3650 -CA %sca.crt -CAkey %sca.key -set_serial %d -out %s' + +def popen(command, *args, **kwargs): + return subprocess.Popen((command % args).split(' '), **kwargs) + +class ProxyRequestHandler(BaseHTTPRequestHandler): + """Handles a network request made to the proxy""" + def log_error(self, format, *args): + # suppress "Request timed out: timeout('timed out',)" + if isinstance(args[0], socket.timeout): + return + + self.log_message(format, *args) + + def do_CONNECT(self): + hostname = self.path.split(':')[0] + certpath = '%s%s.crt' % (certdir, hostname if hostname != 'ca' else 'CA') + + with lock: + if not os.path.isfile(certpath): + p1 = popen(gen_cert_req, certdir, hostname, stdout=subprocess.PIPE).stdout + popen(sign_cert_req, certdir, certdir, time.time() * 1000, certpath, stdin=p1, stderr=subprocess.PIPE).communicate() + + self.send_response(200) + self.end_headers() + + self.connection = ssl.wrap_socket(self.connection, keyfile=certdir+'cert.key', certfile=certpath, server_side=True) + self.rfile = self.connection.makefile('rb', self.rbufsize) + self.wfile = self.connection.makefile('wb', self.wbufsize) + + self.close_connection = int(self.headers.get('Proxy-Connection', '').lower() == 'close') + + def proxy(self): + content_length = int(self.headers.get('Content-Length', 0)) + req_body = self.rfile.read(content_length) if content_length else None + + if self.path[0] == '/': + if isinstance(self.connection, ssl.SSLSocket): + self.path = 'https://%s%s' % (self.headers['Host'], self.path) + else: + self.path = 'http://%s%s' % (self.headers['Host'], self.path) + + self.handle_request(req_body) + + do_OPTIONS = do_DELETE = do_PUT = do_HEAD = do_POST = do_GET = proxy + + def handle_request(self, req_body): + pass + + +class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): + """The actual proxy server""" + address_family, daemon_threads = socket.AF_INET6, True + + def handle_error(self, request, client_address): + # suppress socket/ssl related errors + cls, e = sys.exc_info()[:2] + if not (cls is socket.error or cls is ssl.SSLError): + return HTTPServer.handle_error(self, request, client_address) -- cgit v1.2.3