aboutsummaryrefslogtreecommitdiff
path: root/test/server.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/server.py')
-rwxr-xr-xtest/server.py101
1 files changed, 101 insertions, 0 deletions
diff --git a/test/server.py b/test/server.py
new file mode 100755
index 0000000..58a84bd
--- /dev/null
+++ b/test/server.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2021 jahoti <jahoti@tilde.team>
+# Licensing information is collated in the `copyright` file
+
+"""
+A modular "virtual network" proxy,
+wrapping the classes in proxy_core.py
+"""
+
+import proxy_core
+from urllib.parse import parse_qs
+from misc_constants import *
+from world_wide_library import catalog as internet
+
+class RequestHijacker(proxy_core.ProxyRequestHandler):
+ def handle_request(self, req_body):
+ path_components = self.path.split('?', maxsplit=1)
+ path = path_components[0]
+ try:
+ # Response format: (status_code, headers (dict. of strings),
+ # body as bytes or filename containing body as string)
+ if path in internet:
+ info = internet[path]
+ if type(info) == tuple:
+ status_code, headers, body_file = info
+ if type(body_file) == str:
+ if 'Content-Type' not in headers and '.' in body_file:
+ ext = body_file.rsplit('.', maxsplit=1)[-1]
+ if ext in mime_types:
+ headers['Content-Type'] = mime_types[ext]
+
+ with open(body_file, mode='rb') as f:
+ body_file = f.read()
+
+ else:
+ # A function to evaluate to get the response
+ get_params, post_params = {}, {}
+ if len(path_components) == 2:
+ get_params = parse_qs(path_components[1])
+
+ # Parse POST parameters; currently only supports
+ # application/x-www-form-urlencoded
+ if req_body:
+ post_params = parse_qs(req_body.encode())
+
+ status_code, headers, body_file = info(self.command, get_params, post_params)
+ if type(body_file) == str:
+ body_file = body_file.encode()
+
+ if type(status_code) != int or status_code <= 0:
+ raise Exception('Invalid status code %r' % status_code)
+
+ for header, header_value in headers.items():
+ if type(header) != str:
+ raise Exception('Invalid header key %r' % header)
+
+ elif type(header_value) != str:
+ raise Exception('Invalid header value %r' % header_value)
+ else:
+ status_code, headers = 404, {'Content-Type': 'text/plain'}
+ body_file = b'Handler for this URL not found.'
+
+ except Exception as e:
+ status_code, headers, body_file = 500, {'Content-Type': 'text/plain'}, b'Internal Error:\n' + repr(e).encode()
+
+ headers['Content-Length'] = str(len(body_file))
+ self.send_response(status_code)
+ for header, header_value in headers.items():
+ self.send_header(header, header_value)
+
+ self.end_headers()
+ self.wfile.write(body_file)
+
+
+
+def do_an_internet(certdir, port):
+ """Start up the proxy/server"""
+ proxy_core.certdir = certdir
+ httpd = proxy_core.ThreadingHTTPServer(('', port), RequestHijacker)
+ httpd.serve_forever()
+
+if __name__ == '__main__':
+ import sys
+ def fail(msg, error_code):
+ print('Error:', msg)
+ print('Usage:', sys.argv[0], '[certificates directory] (port)')
+ sys.exit(error_code)
+
+ if len(sys.argv) < 2:
+ fail('missing required argument "certificates directory".', 1)
+
+ certdir = sys.argv[1]
+ if not proxy_core.os.path.isdir(certdir):
+ fail('selected certificate directory does not exist.', 2)
+
+ port = sys.argv[2] if len(sys.argv) > 2 else '1337'
+ if not port.isnumeric():
+ fail('port must be an integer.', 3)
+
+ do_an_internet(certdir, int(port))