aboutsummaryrefslogtreecommitdiff
path: root/test/server.py
blob: 58a84bd7bd3fe8d475313f0119da9cb961eb952e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python3
#
# Copyright (C) 2021 jahoti <jahoti@tilde.team>
# Licensing information is collated in the `copyright` file

"""
A modular "virtual network" proxy,
wrapping the classes in proxy_core.py
"""

import proxy_core
from urllib.parse import parse_qs
from misc_constants import *
from world_wide_library import catalog as internet

class RequestHijacker(proxy_core.ProxyRequestHandler):
	"""
	Request handler that answers requests from the `internet` catalog.

	The catalog is looked up by self.path with any query string removed.
	An entry is either
	  * a tuple (status_code, headers, body), where body is bytes or a str
	    naming the file which holds the body; or
	  * a callable taking (method, get_params, post_params) and returning
	    such a tuple, where a str body is the body text itself.
	"""
	def handle_request(self, req_body):
		"""
		Send the catalog's response for the current request.

		req_body is the request body (str or bytes), or a false value if
		there is none. Paths missing from the catalog get a 404; any
		exception raised while building the response is reported to the
		client as a 500 carrying the exception's repr.
		"""
		try:
			status_code, headers, body = self._lookup_response(req_body)
		except Exception as e:
			status_code, headers = 500, {'Content-Type': 'text/plain'}
			body = b'Internal Error:\n' + repr(e).encode()

		headers['Content-Length'] = str(len(body))
		self.send_response(status_code)
		for header, header_value in headers.items():
			self.send_header(header, header_value)

		self.end_headers()
		self.wfile.write(body)

	def _lookup_response(self, req_body):
		"""
		Build and validate (status_code, headers, body) for self.path.

		The returned headers dict is always a fresh copy, so the caller
		may modify it without touching the catalog. Raises an exception
		if the catalog entry (or the handler's result) is malformed.
		"""
		path_components = self.path.split('?', maxsplit=1)
		path = path_components[0]
		if path not in internet:
			return 404, {'Content-Type': 'text/plain'}, b'Handler for this URL not found.'

		# Response format: (status_code, headers (dict. of strings),
		#       body as bytes or filename containing body as string)
		info = internet[path]
		if isinstance(info, tuple):
			status_code, headers, body = info
			# Work on a copy: the catalog entry is shared between requests
			# (and possibly between entries), so it must not be modified
			headers = dict(headers)
			if isinstance(body, str):
				# Guess the content type from the file extension
				# unless the entry already specifies one
				if 'Content-Type' not in headers and '.' in body:
					ext = body.rsplit('.', maxsplit=1)[-1]
					if ext in mime_types:
						headers['Content-Type'] = mime_types[ext]

				with open(body, mode='rb') as f:
					body = f.read()
		else:
			# A function to evaluate to get the response
			get_params, post_params = {}, {}
			if len(path_components) == 2:
				get_params = parse_qs(path_components[1])

			# Parse POST parameters; currently only supports
			# application/x-www-form-urlencoded
			if req_body:
				# Accept the body as either str or bytes; it is always
				# parsed as bytes, so post_params has bytes keys/values
				if isinstance(req_body, str):
					req_body = req_body.encode()

				post_params = parse_qs(req_body)

			status_code, headers, body = info(self.command, get_params, post_params)
			headers = dict(headers)
			if isinstance(body, str):
				body = body.encode()

		# Exact type check on purpose: bool is a subclass of int
		if type(status_code) is not int or status_code <= 0:
			raise Exception('Invalid status code %r' % status_code)

		for header, header_value in headers.items():
			if not isinstance(header, str):
				raise Exception('Invalid header key %r' % header)

			if not isinstance(header_value, str):
				raise Exception('Invalid header value %r' % header_value)

		# Reject unusable bodies here so they are reported as a 500
		# instead of failing while the response is being written
		if not isinstance(body, (bytes, bytearray, memoryview)):
			raise Exception('Invalid body %r' % (body,))

		return status_code, headers, body
		


def do_an_internet(certdir, port):
	"""
	Start up the proxy/server.

	Stores certdir as proxy_core's certificate directory, creates a
	ThreadingHTTPServer for the given port with RequestHijacker as its
	request handler, and calls serve_forever() on it.
	"""
	proxy_core.certdir = certdir
	listen_address = ('', port)
	server = proxy_core.ThreadingHTTPServer(listen_address, RequestHijacker)
	server.serve_forever()

if __name__ == '__main__':
	# Command-line entry point: server.py [certificates directory] (port)
	import os
	import sys

	def fail(msg, error_code):
		"""Print the error and usage information, then exit with error_code."""
		print('Error:', msg)
		print('Usage:', sys.argv[0], '[certificates directory] (port)')
		sys.exit(error_code)

	if len(sys.argv) < 2:
		fail('missing required argument "certificates directory".', 1)

	certdir = sys.argv[1]
	if not os.path.isdir(certdir):
		fail('selected certificate directory does not exist.', 2)

	port = sys.argv[2] if len(sys.argv) > 2 else '1337'
	# isdecimal() rather than isnumeric(): the latter also accepts
	# characters such as '²' or '½', which int() refuses to convert
	if not port.isdecimal():
		fail('port must be an integer.', 3)

	port = int(port)
	if port > 65535:
		fail('port must be between 0 and 65535.', 3)

	do_an_internet(certdir, port)