aboutsummaryrefslogtreecommitdiff
path: root/test/proxy_core.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/proxy_core.py')
-rw-r--r--test/proxy_core.py74
1 files changed, 74 insertions, 0 deletions
diff --git a/test/proxy_core.py b/test/proxy_core.py
new file mode 100644
index 0000000..dd4225d
--- /dev/null
+++ b/test/proxy_core.py
@@ -0,0 +1,74 @@
+# Copyright (c) 2015, inaz2
+# Copyright (C) 2021 jahoti <jahoti@tilde.team>
+# Licensing information is collated in the `copyright` file
+
+"""
+The core for a "virtual network" proxy
+
+Be sure to set certdir to your intended certificates directory before running.
+"""
+
+import os, socket, ssl, subprocess, sys, threading, time
+from http.server import HTTPServer, BaseHTTPRequestHandler
+from socketserver import ThreadingMixIn
+
+gen_cert_req, lock = 'openssl req -new -key %scert.key -subj /CN=%s', threading.Lock()
+sign_cert_req = 'openssl x509 -req -days 3650 -CA %sca.crt -CAkey %sca.key -set_serial %d -out %s'
+
+def popen(command, *args, **kwargs):
+ return subprocess.Popen((command % args).split(' '), **kwargs)
+
+class ProxyRequestHandler(BaseHTTPRequestHandler):
+ """Handles a network request made to the proxy"""
+ def log_error(self, format, *args):
+ # suppress "Request timed out: timeout('timed out',)"
+ if isinstance(args[0], socket.timeout):
+ return
+
+ self.log_message(format, *args)
+
+ def do_CONNECT(self):
+ hostname = self.path.split(':')[0]
+ certpath = '%s%s.crt' % (certdir, hostname if hostname != 'ca' else 'CA')
+
+ with lock:
+ if not os.path.isfile(certpath):
+ p1 = popen(gen_cert_req, certdir, hostname, stdout=subprocess.PIPE).stdout
+ popen(sign_cert_req, certdir, certdir, time.time() * 1000, certpath, stdin=p1, stderr=subprocess.PIPE).communicate()
+
+ self.send_response(200)
+ self.end_headers()
+
+ self.connection = ssl.wrap_socket(self.connection, keyfile=certdir+'cert.key', certfile=certpath, server_side=True)
+ self.rfile = self.connection.makefile('rb', self.rbufsize)
+ self.wfile = self.connection.makefile('wb', self.wbufsize)
+
+ self.close_connection = int(self.headers.get('Proxy-Connection', '').lower() == 'close')
+
+ def proxy(self):
+ content_length = int(self.headers.get('Content-Length', 0))
+ req_body = self.rfile.read(content_length) if content_length else None
+
+ if self.path[0] == '/':
+ if isinstance(self.connection, ssl.SSLSocket):
+ self.path = 'https://%s%s' % (self.headers['Host'], self.path)
+ else:
+ self.path = 'http://%s%s' % (self.headers['Host'], self.path)
+
+ self.handle_request(req_body)
+
+ do_OPTIONS = do_DELETE = do_PUT = do_HEAD = do_POST = do_GET = proxy
+
+ def handle_request(self, req_body):
+ pass
+
+
+class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
+ """The actual proxy server"""
+ address_family, daemon_threads = socket.AF_INET6, True
+
+ def handle_error(self, request, client_address):
+ # suppress socket/ssl related errors
+ cls, e = sys.exc_info()[:2]
+ if not (cls is socket.error or cls is ssl.SSLError):
+ return HTTPServer.handle_error(self, request, client_address)