#!/usr/bin/env python3 # # Copyright (C) 2021 jahoti # Licensing information is collated in the `copyright` file """ A modular "virtual network" proxy, wrapping the classes in proxy_core.py """ import proxy_core from urllib.parse import parse_qs from misc_constants import * from world_wide_library import catalog as internet class RequestHijacker(proxy_core.ProxyRequestHandler): def handle_request(self, req_body): path_components = self.path.split('?', maxsplit=1) path = path_components[0] try: # Response format: (status_code, headers (dict. of strings), # body as bytes or filename containing body as string) if path in internet: info = internet[path] if type(info) == tuple: status_code, headers, body_file = info if type(body_file) == str: if 'Content-Type' not in headers and '.' in body_file: ext = body_file.rsplit('.', maxsplit=1)[-1] if ext in mime_types: headers['Content-Type'] = mime_types[ext] with open(body_file, mode='rb') as f: body_file = f.read() else: # A function to evaluate to get the response get_params, post_params = {}, {} if len(path_components) == 2: get_params = parse_qs(path_components[1]) # Parse POST parameters; currently only supports # application/x-www-form-urlencoded if req_body: post_params = parse_qs(req_body.encode()) status_code, headers, body_file = info(self.command, get_params, post_params) if type(body_file) == str: body_file = body_file.encode() if type(status_code) != int or status_code <= 0: raise Exception('Invalid status code %r' % status_code) for header, header_value in headers.items(): if type(header) != str: raise Exception('Invalid header key %r' % header) elif type(header_value) != str: raise Exception('Invalid header value %r' % header_value) else: status_code, headers = 404, {'Content-Type': 'text/plain'} body_file = b'Handler for this URL not found.' except Exception as e: status_code, headers, body_file = 500, {'Content-Type': 'text/plain'}, b'Internal Error:\n' + repr(e).encode() headers['Content-Length'] = str(len(body_file)) self.send_response(status_code) for header, header_value in headers.items(): self.send_header(header, header_value) self.end_headers() self.wfile.write(body_file) def do_an_internet(certdir, port): """Start up the proxy/server""" proxy_core.certdir = certdir httpd = proxy_core.ThreadingHTTPServer(('', port), RequestHijacker) httpd.serve_forever() if __name__ == '__main__': import sys def fail(msg, error_code): print('Error:', msg) print('Usage:', sys.argv[0], '[certificates directory] (port)') sys.exit(error_code) if len(sys.argv) < 2: fail('missing required argument "certificates directory".', 1) certdir = sys.argv[1] if not proxy_core.os.path.isdir(certdir): fail('selected certificate directory does not exist.', 2) port = sys.argv[2] if len(sys.argv) > 2 else '1337' if not port.isnumeric(): fail('port must be an integer.', 3) do_an_internet(certdir, int(port))