aboutsummaryrefslogtreecommitdiff
path: root/test/haketilo_test/server.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/haketilo_test/server.py')
-rwxr-xr-xtest/haketilo_test/server.py112
1 files changed, 112 insertions, 0 deletions
diff --git a/test/haketilo_test/server.py b/test/haketilo_test/server.py
new file mode 100755
index 0000000..7dc5e9e
--- /dev/null
+++ b/test/haketilo_test/server.py
@@ -0,0 +1,112 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+"""
+A modular "virtual network" proxy,
+wrapping the classes in proxy_core.py
+"""
+
+# This file is part of Haketilo.
+#
+# Copyright (C) 2021 jahoti <jahoti@tilde.team>
+# Copyright (C) 2021 Wojtek Kosior <koszko@koszko.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use of this code
+# in a proprietary program, I am not going to enforce this in court.
+
+from pathlib import Path
+from urllib.parse import parse_qs
+from threading import Thread
+import traceback
+
+from .proxy_core import ProxyRequestHandler, ThreadingHTTPServer
+from .misc_constants import *
+from .world_wide_library import catalog as internet
+
+class RequestHijacker(ProxyRequestHandler):
+ def handle_request(self, req_body):
+ path_components = self.path.split('?', maxsplit=1)
+ path = path_components[0]
+ try:
+ # Response format: (status_code, headers (dict. of strings),
+ # body as bytes or filename containing body as string)
+ if path in internet:
+ info = internet[path]
+ if type(info) is tuple:
+ status_code, headers, body_file = info
+ resp_body = b''
+ if body_file is not None:
+ if 'Content-Type' not in headers:
+ ext = body_file.suffix[1:]
+ if ext and ext in mime_types:
+ headers['Content-Type'] = mime_types[ext]
+
+ with open(body_file, mode='rb') as f:
+ resp_body = f.read()
+ else:
+ # A function to evaluate to get the response
+ get_params, post_params = {}, {}
+ if len(path_components) == 2:
+ get_params = parse_qs(path_components[1])
+
+ # Parse POST parameters; currently only supports
+ # application/x-www-form-urlencoded
+ if req_body:
+ post_params = parse_qs(req_body.encode())
+
+ status_code, headers, resp_body = info(self.command, get_params, post_params)
+ if type(resp_body) == str:
+ resp_body = resp_body.encode()
+
+ if type(status_code) != int or status_code <= 0:
+ raise Exception('Invalid status code %r' % status_code)
+
+ for header, header_value in headers.items():
+ if type(header) != str:
+ raise Exception('Invalid header key %r' % header)
+
+ elif type(header_value) != str:
+ raise Exception('Invalid header value %r' % header_value)
+ else:
+ status_code, headers = 404, {'Content-Type': 'text/plain'}
+ resp_body = b'Handler for this URL not found.'
+
+ except Exception:
+ status_code = 500
+ headers = {'Content-Type': 'text/plain'}
+ resp_body = b'Internal Error:\n' + traceback.format_exc().encode()
+
+ headers['Content-Length'] = str(len(resp_body))
+ self.send_response(status_code)
+ for header, header_value in headers.items():
+ self.send_header(header, header_value)
+
+ self.end_headers()
+ if resp_body:
+ self.wfile.write(resp_body)
+
+def do_an_internet(certdir=Path.cwd() / 'certs',
+ port=default_proxy_port):
+ """Start up the proxy/server"""
+ class RequestHijackerWithCertdir(RequestHijacker):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, certdir=certdir, **kwargs)
+
+ httpd = ThreadingHTTPServer(('', port), RequestHijackerWithCertdir)
+ Thread(target=httpd.serve_forever).start()
+
+ return httpd