aboutsummaryrefslogtreecommitdiff
path: root/test/server.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/server.py')
-rwxr-xr-xtest/server.py195
1 files changed, 101 insertions, 94 deletions
diff --git a/test/server.py b/test/server.py
index 58a84bd..6013955 100755
--- a/test/server.py
+++ b/test/server.py
@@ -1,101 +1,108 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2021 jahoti <jahoti@tilde.team>
-# Licensing information is collated in the `copyright` file
+# SPDX-License-Identifier: AGPL-3.0-or-later
"""
A modular "virtual network" proxy,
wrapping the classes in proxy_core.py
"""
-import proxy_core
+# This file is part of Haketilo.
+#
+# Copyright (C) 2021 jahoti <jahoti@tilde.team>
+# Copyright (C) 2021 Wojtek Kosior <koszko@koszko.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+from pathlib import Path
from urllib.parse import parse_qs
-from misc_constants import *
-from world_wide_library import catalog as internet
-
-class RequestHijacker(proxy_core.ProxyRequestHandler):
- def handle_request(self, req_body):
- path_components = self.path.split('?', maxsplit=1)
- path = path_components[0]
- try:
- # Response format: (status_code, headers (dict. of strings),
- # body as bytes or filename containing body as string)
- if path in internet:
- info = internet[path]
- if type(info) == tuple:
- status_code, headers, body_file = info
- if type(body_file) == str:
- if 'Content-Type' not in headers and '.' in body_file:
- ext = body_file.rsplit('.', maxsplit=1)[-1]
- if ext in mime_types:
- headers['Content-Type'] = mime_types[ext]
-
- with open(body_file, mode='rb') as f:
- body_file = f.read()
-
- else:
- # A function to evaluate to get the response
- get_params, post_params = {}, {}
- if len(path_components) == 2:
- get_params = parse_qs(path_components[1])
-
- # Parse POST parameters; currently only supports
- # application/x-www-form-urlencoded
- if req_body:
- post_params = parse_qs(req_body.encode())
-
- status_code, headers, body_file = info(self.command, get_params, post_params)
- if type(body_file) == str:
- body_file = body_file.encode()
-
- if type(status_code) != int or status_code <= 0:
- raise Exception('Invalid status code %r' % status_code)
-
- for header, header_value in headers.items():
- if type(header) != str:
- raise Exception('Invalid header key %r' % header)
-
- elif type(header_value) != str:
- raise Exception('Invalid header value %r' % header_value)
- else:
- status_code, headers = 404, {'Content-Type': 'text/plain'}
- body_file = b'Handler for this URL not found.'
-
- except Exception as e:
- status_code, headers, body_file = 500, {'Content-Type': 'text/plain'}, b'Internal Error:\n' + repr(e).encode()
-
- headers['Content-Length'] = str(len(body_file))
- self.send_response(status_code)
- for header, header_value in headers.items():
- self.send_header(header, header_value)
-
- self.end_headers()
- self.wfile.write(body_file)
-
-
-
-def do_an_internet(certdir, port):
- """Start up the proxy/server"""
- proxy_core.certdir = certdir
- httpd = proxy_core.ThreadingHTTPServer(('', port), RequestHijacker)
- httpd.serve_forever()
-
-if __name__ == '__main__':
- import sys
- def fail(msg, error_code):
- print('Error:', msg)
- print('Usage:', sys.argv[0], '[certificates directory] (port)')
- sys.exit(error_code)
-
- if len(sys.argv) < 2:
- fail('missing required argument "certificates directory".', 1)
-
- certdir = sys.argv[1]
- if not proxy_core.os.path.isdir(certdir):
- fail('selected certificate directory does not exist.', 2)
-
- port = sys.argv[2] if len(sys.argv) > 2 else '1337'
- if not port.isnumeric():
- fail('port must be an integer.', 3)
-
- do_an_internet(certdir, int(port))
+from threading import Thread
+
+from .proxy_core import ProxyRequestHandler, ThreadingHTTPServer
+from .misc_constants import *
+from .world_wide_library import catalog as internet
+
+class RequestHijacker(ProxyRequestHandler):
+ def handle_request(self, req_body):
+ path_components = self.path.split('?', maxsplit=1)
+ path = path_components[0]
+ try:
+ # Response format: (status_code, headers (dict. of strings),
+ # body as bytes or filename containing body as string)
+ if path in internet:
+ info = internet[path]
+ if type(info) is tuple:
+ status_code, headers, body_file = info
+ resp_body = b''
+ if body_file is not None:
+ if 'Content-Type' not in headers:
+ ext = body_file.suffix[1:]
+ if ext and ext in mime_types:
+ headers['Content-Type'] = mime_types[ext]
+
+ with open(body_file, mode='rb') as f:
+ resp_body = f.read()
+ else:
+ # A function to evaluate to get the response
+ get_params, post_params = {}, {}
+ if len(path_components) == 2:
+ get_params = parse_qs(path_components[1])
+
+ # Parse POST parameters; currently only supports
+ # application/x-www-form-urlencoded
+ if req_body:
+ post_params = parse_qs(req_body.encode())
+
+ status_code, headers, resp_body = info(self.command, get_params, post_params)
+ if type(resp_body) == str:
+ resp_body = resp_body.encode()
+
+ if type(status_code) != int or status_code <= 0:
+ raise Exception('Invalid status code %r' % status_code)
+
+ for header, header_value in headers.items():
+ if type(header) != str:
+ raise Exception('Invalid header key %r' % header)
+
+ elif type(header_value) != str:
+ raise Exception('Invalid header value %r' % header_value)
+ else:
+ status_code, headers = 404, {'Content-Type': 'text/plain'}
+ resp_body = b'Handler for this URL not found.'
+
+ except Exception as e:
+ status_code, headers, resp_body = 500, {'Content-Type': 'text/plain'}, b'Internal Error:\n' + repr(e).encode()
+
+ headers['Content-Length'] = str(len(resp_body))
+ self.send_response(status_code)
+ for header, header_value in headers.items():
+ self.send_header(header, header_value)
+
+ self.end_headers()
+ if resp_body:
+ self.wfile.write(resp_body)
+
+def do_an_internet(certdir=default_cert_dir, port=default_proxy_port):
+ """Start up the proxy/server"""
+ class RequestHijackerWithCertdir(RequestHijacker):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, certdir=certdir, **kwargs)
+
+ httpd = ThreadingHTTPServer(('', port), RequestHijackerWithCertdir)
+ Thread(target=httpd.serve_forever).start()
+
+ return httpd