From b1444d9c9ea065d7c97d5809c3ec5259cb01a1da Mon Sep 17 00:00:00 2001 From: jahoti Date: Mon, 6 Sep 2021 00:00:00 +0000 Subject: Incorporate test suite from jahoti branch --- test/server.py | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100755 test/server.py (limited to 'test/server.py') diff --git a/test/server.py b/test/server.py new file mode 100755 index 0000000..58a84bd --- /dev/null +++ b/test/server.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2021 jahoti +# Licensing information is collated in the `copyright` file + +""" +A modular "virtual network" proxy, +wrapping the classes in proxy_core.py +""" + +import proxy_core +from urllib.parse import parse_qs +from misc_constants import * +from world_wide_library import catalog as internet + +class RequestHijacker(proxy_core.ProxyRequestHandler): + def handle_request(self, req_body): + path_components = self.path.split('?', maxsplit=1) + path = path_components[0] + try: + # Response format: (status_code, headers (dict. of strings), + # body as bytes or filename containing body as string) + if path in internet: + info = internet[path] + if type(info) == tuple: + status_code, headers, body_file = info + if type(body_file) == str: + if 'Content-Type' not in headers and '.' in body_file: + ext = body_file.rsplit('.', maxsplit=1)[-1] + if ext in mime_types: + headers['Content-Type'] = mime_types[ext] + + with open(body_file, mode='rb') as f: + body_file = f.read() + + else: + # A function to evaluate to get the response + get_params, post_params = {}, {} + if len(path_components) == 2: + get_params = parse_qs(path_components[1]) + + # Parse POST parameters; currently only supports + # application/x-www-form-urlencoded + if req_body: + post_params = parse_qs(req_body.encode()) + + status_code, headers, body_file = info(self.command, get_params, post_params) + if type(body_file) == str: + body_file = body_file.encode() + + if type(status_code) != int or status_code <= 0: + raise Exception('Invalid status code %r' % status_code) + + for header, header_value in headers.items(): + if type(header) != str: + raise Exception('Invalid header key %r' % header) + + elif type(header_value) != str: + raise Exception('Invalid header value %r' % header_value) + else: + status_code, headers = 404, {'Content-Type': 'text/plain'} + body_file = b'Handler for this URL not found.' + + except Exception as e: + status_code, headers, body_file = 500, {'Content-Type': 'text/plain'}, b'Internal Error:\n' + repr(e).encode() + + headers['Content-Length'] = str(len(body_file)) + self.send_response(status_code) + for header, header_value in headers.items(): + self.send_header(header, header_value) + + self.end_headers() + self.wfile.write(body_file) + + + +def do_an_internet(certdir, port): + """Start up the proxy/server""" + proxy_core.certdir = certdir + httpd = proxy_core.ThreadingHTTPServer(('', port), RequestHijacker) + httpd.serve_forever() + +if __name__ == '__main__': + import sys + def fail(msg, error_code): + print('Error:', msg) + print('Usage:', sys.argv[0], '[certificates directory] (port)') + sys.exit(error_code) + + if len(sys.argv) < 2: + fail('missing required argument "certificates directory".', 1) + + certdir = sys.argv[1] + if not proxy_core.os.path.isdir(certdir): + fail('selected certificate directory does not exist.', 2) + + port = sys.argv[2] if len(sys.argv) > 2 else '1337' + if not port.isnumeric(): + fail('port must be an integer.', 3) + + do_an_internet(certdir, int(port)) -- cgit v1.2.3