From 7796e55405e2c27f053122bdec25ffc06df92b4f Mon Sep 17 00:00:00 2001 From: jahoti Date: Fri, 6 Aug 2021 00:00:00 +0000 Subject: Add the beginnings of a test suite --- test/init.sh | 12 +++++++ test/proxy_core.py | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+) create mode 100644 test/init.sh create mode 100644 test/proxy_core.py (limited to 'test') diff --git a/test/init.sh b/test/init.sh new file mode 100644 index 0000000..915db76 --- /dev/null +++ b/test/init.sh @@ -0,0 +1,12 @@ +#!/bin/sh +# +# Copyright (c) 2015, inaz2 +# Copyright (C) 2021 jahoti +# Licensing information is collated in the `copyright` file + +# Initialize the root certificate for the tests proxy server +# Make sure this is run in the directory where they will be put! + +openssl genrsa -out ca.key 2048 +openssl genrsa -out cert.key 2048 +openssl req -new -x509 -days 3650 -key ca.key -out ca.crt -subj "/CN=Hachette Test" diff --git a/test/proxy_core.py b/test/proxy_core.py new file mode 100644 index 0000000..6b214cf --- /dev/null +++ b/test/proxy_core.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2015, inaz2 +# Copyright (C) 2021 jahoti +# Licensing information is collated in the `copyright` file + +""" +The core for a "virtual network" proxy + +Be sure to run this inside your intended certificates directory. +""" + +import os, socket, ssl, sys, threading, time +from http.server import HTTPServer, BaseHTTPRequestHandler +from socketserver import ThreadingMixIn +from subprocess import Popen, PIPE + +gen_cert_req, lock = 'openssl req -new -key cert.key -subj /CN=%s', threading.Lock() +sign_cert_req = 'openssl x509 -req -days 3650 -CA ca.crt -CAkey ca.key -set_serial %d -out %s' + + +class ProxyRequestHandler(BaseHTTPRequestHandler): + """Handles a network request made to the proxy""" + + def log_error(self, format, *args): + # suppress "Request timed out: timeout('timed out',)" + if isinstance(args[0], socket.timeout): + return + + self.log_message(format, *args) + + def do_CONNECT(self): + hostname = self.path.split(':')[0] + certpath = '%s.crt' % (hostname if hostname != 'ca' else 'CA') + + with lock: + if not os.path.isfile(certpath): + p1 = Popen((gen_cert_req % hostname).split(' '), stdout=PIPE).stdout + Popen((sign_cert_req % (time.time() * 1000, certpath)).split(' '), stdin=p1, stderr=PIPE).communicate() + + self.wfile.write('HTTP/1.1 200 Connection Established\r\n') + self.end_headers() + + self.connection = ssl.wrap_socket(self.connection, keyfile='cert.key', certfile=certpath, server_side=True) + self.rfile = self.connection.makefile('rb', self.rbufsize) + self.wfile = self.connection.makefile('wb', self.wbufsize) + + self.close_connection = int(self.headers.get('Proxy-Connection', '').lower() == 'close') + + def proxy(self): + if self.path == 'http://hachette.test/': + with open('ca.crt', 'rb') as f: + data = f.read() + + self.wfile.write('HTTP/1.1 200 OK\r\n') + self.send_header('Content-Type', 'application/x-x509-ca-cert') + self.send_header('Content-Length', len(data)) + self.send_header('Connection', 'close') + self.end_headers() + self.wfile.write(data) + return + + content_length = int(self.headers.get('Content-Length', 0)) + req_body = self.rfile.read(content_length) if content_length else None + + if self.path[0] == '/': + if isinstance(self.connection, ssl.SSLSocket): + self.path = 'https://%s%s' % (self.headers['Host'], self.path) + else: + self.path = 'http://%s%s' % (self.headers['Host'], self.path) + + self.handle_request(req_body) + + do_OPTIONS = do_DELETE = do_PUT = do_HEAD = do_POST = do_GET = proxy + + def handle_request(self, req_body): + pass + + +class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): + """The actual proxy server""" + address_family, daemon_threads, handler = socket.AF_INET6, True, ProxyRequestHandler + + def handle_error(self, request, client_address): + # suppress socket/ssl related errors + cls, e = sys.exc_info()[:2] + if not (cls is socket.error or cls is ssl.SSLError): + return HTTPServer.handle_error(self, request, client_address) + + +def start(server_class, port=1337): + """Start up the proxy/server""" + + print('Serving HTTP Proxy') + httpd = server_class(('::1', port), ProxyRequestHandler) + httpd.serve_forever() -- cgit v1.2.3