aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/proxy/addon.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/hydrilla/proxy/addon.py')
-rw-r--r--src/hydrilla/proxy/addon.py379
1 files changed, 379 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/addon.py b/src/hydrilla/proxy/addon.py
new file mode 100644
index 0000000..98894e7
--- /dev/null
+++ b/src/hydrilla/proxy/addon.py
@@ -0,0 +1,379 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Haketilo addon for Mitmproxy.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use of this
+# code in a proprietary program, I am not going to enforce this in
+# court.
+
+"""
+This module contains the definition of a mitmproxy addon that gets instantiated
+from addon script.
+"""
+
+import sys
+import re
+import threading
+import secrets
+import typing as t
+import dataclasses as dc
+import traceback as tb
+
+from pathlib import Path
+from contextlib import contextmanager
+from urllib.parse import urlparse
+
+from mitmproxy import tls, http, addonmanager, ctx
+from mitmproxy.script import concurrent
+
+from ..exceptions import HaketiloException
+from ..translations import smart_gettext as _
+from .. import url_patterns
+from .state_impl import ConcreteHaketiloState
+from . import state
+from . import policies
+from . import http_messages
+
+
class LoggerToMitmproxy(state.Logger):
    """Logger implementation that forwards warnings to mitmproxy's log."""

    def warn(self, msg: str) -> None:
        """Emit `msg` as a mitmproxy warning, prefixed with 'Haketilo: '."""
        prefixed_msg = f'Haketilo: {msg}'
        ctx.log.warn(prefixed_msg)
+
+
def safe_parse_url(url: str) -> url_patterns.ParsedUrl:
    """
    Parse `url` with `url_patterns.parse_url()`.

    Instead of propagating `HaketiloURLException` for an URL that cannot be
    parsed, fall back to `url_patterns.dummy_url`.
    """
    try:
        parsed = url_patterns.parse_url(url)
    except url_patterns.HaketiloURLException:
        parsed = url_patterns.dummy_url

    return parsed
+
+
@dc.dataclass
class FlowHandling:
    """
    Per-flow record that ties a mitmproxy HTTP flow to the policy chosen for
    it and lazily builds the request/response info objects handed to that
    policy.
    """
    flow: http.HTTPFlow
    policy: policies.Policy
    _bl_request_info: http_messages.BodylessRequestInfo
    # The two fields below are caches, filled in on first property access.
    _request_info: t.Optional[http_messages.RequestInfo] = None
    _bl_response_info: t.Optional[http_messages.BodylessResponseInfo] = None

    @property
    def bl_request_info(self) -> http_messages.BodylessRequestInfo:
        """Request information without the body."""
        return self._bl_request_info

    @property
    def request_info(self) -> http_messages.RequestInfo:
        """Request information with the body; built once and then cached."""
        if self._request_info is not None:
            return self._request_info

        # A missing body is treated as an empty one.
        raw_body = self.flow.request.get_content(strict=False)
        self._request_info = self._bl_request_info.with_body(raw_body or b'')

        return self._request_info

    @property
    def bl_response_info(self) -> http_messages.BodylessResponseInfo:
        """
        Response information without the body; built once and then cached.

        The flow must already have a response when this is first accessed.
        """
        if self._bl_response_info is not None:
            return self._bl_response_info

        response = self.flow.response
        assert response is not None

        self._bl_response_info = http_messages.BodylessResponseInfo.make(
            url=safe_parse_url(self.flow.request.url),
            status_code=response.status_code,
            headers=response.headers,
        )

        return self._bl_response_info

    @property
    def response_info(self) -> http_messages.ResponseInfo:
        """
        Response information with the body. Unlike `request_info`, this is
        rebuilt from the flow's current response content on every access.
        """
        response = self.flow.response
        assert response is not None

        # A missing body is treated as an empty one.
        raw_body = response.get_content(strict=False)
        return self.bl_response_info.with_body(raw_body or b'')

    @property
    def full_http_info(self) -> http_messages.FullHTTPInfo:
        """Request and response information (both with bodies) combined."""
        request_part = self.request_info
        response_part = self.response_info
        return http_messages.FullHTTPInfo(request_part, response_part)

    @staticmethod
    def make(
            flow: http.HTTPFlow,
            policy: policies.Policy,
            url: url_patterns.ParsedUrl
    ) -> 'FlowHandling':
        """
        Create the handling record for `flow`, deriving the bodyless request
        information from `url` and the flow's request method and headers.
        """
        request = flow.request
        bodyless_info = http_messages.BodylessRequestInfo.make(
            url=url,
            method=request.method,
            headers=request.headers,
        )

        return FlowHandling(flow, policy, bodyless_info)
+
+
@dc.dataclass
class PassedOptions:
    """
    Haketilo option values collected so far. Each one stays `None` until a
    value has been supplied for it.
    """
    haketilo_dir: t.Optional[str] = None
    haketilo_listen_host: t.Optional[str] = None
    haketilo_listen_port: t.Optional[int] = None
    haketilo_launch_browser: t.Optional[bool] = None

    @property
    def fully_configured(self) -> bool:
        """Tell whether every one of the options has been given a value."""
        collected = (
            self.haketilo_dir,
            self.haketilo_listen_host,
            self.haketilo_listen_port,
            self.haketilo_launch_browser,
        )
        return all(value is not None for value in collected)
+
+
Lock = threading.Lock


@dc.dataclass
class HaketiloAddon:
    """
    Mitmproxy addon object. Its methods named after mitmproxy events (`load`,
    `configure`, `running`, `requestheaders`, `request`, `responseheaders`,
    `response`, `tls_clienthello`, `error`) are the event hooks; the remaining
    methods are helpers shared between them.
    """
    # A factory is used so that every addon instance gets its own options
    # object. A shared `PassedOptions()` default instance would be mutated by
    # configure() for all instances at once and, being unhashable, is rejected
    # by the dataclass machinery of newer Python versions.
    initial_options: PassedOptions = dc.field(default_factory=PassedOptions)
    configured: bool = False
    configured_lock: Lock = dc.field(default_factory=Lock)

    # Maps id(flow) to the handling record of a flow being processed.
    handling_dict: dict[int, FlowHandling] = dc.field(default_factory=dict)
    handling_dict_lock: Lock = dc.field(default_factory=Lock)

    logger: LoggerToMitmproxy = dc.field(default_factory=LoggerToMitmproxy)

    state: t.Optional[ConcreteHaketiloState] = None

    def load(self, loader: addonmanager.Loader) -> None:
        """Register the Haketilo-specific options with mitmproxy."""
        loader.add_option(
            name = 'haketilo_dir',
            typespec = str,
            default = '~/.haketilo/',
            help = "Point to a Haketilo data directory to use"
        )
        loader.add_option(
            name = 'haketilo_listen_host',
            typespec = str,
            default = '127.0.0.1',
            help = "Specify the address proxy listens on"
        )
        loader.add_option(
            name = 'haketilo_listen_port',
            typespec = int,
            default = 8080,
            help = "Specify the port proxy listens on"
        )
        loader.add_option(
            name = 'haketilo_launch_browser',
            typespec = bool,
            default = True,
            help = "Specify whether to attempt to open a browser window with Haketilo page displayed inside"
        )

    def configure(self, updated: set[str]) -> None:
        """
        Record the values of updated Haketilo options and, once all of them
        are known, create the Haketilo state object.

        Each option is only accepted the first time it is set; a later update
        of the same option is reported with a warning and ignored. If creating
        the state fails, the traceback is printed and the process exits with
        status 1.
        """
        with self.configured_lock:
            val_names = ('dir', 'listen_host', 'listen_port', 'launch_browser')
            for val_name in val_names:
                key = f'haketilo_{val_name}'

                if key not in updated:
                    continue

                if getattr(self.initial_options, key) is not None:
                    fmt = _('warn.proxy.setting_already_configured_{}')
                    self.logger.warn(fmt.format(key))
                    continue

                new_val = getattr(ctx.options, key)
                setattr(self.initial_options, key, new_val)

            # Initialize the state at most once and only when every option
            # has been provided.
            if self.configured or not self.initial_options.fully_configured:
                return

            try:
                haketilo_dir = self.initial_options.haketilo_dir
                listen_host = self.initial_options.haketilo_listen_host
                listen_port = self.initial_options.haketilo_listen_port

                self.state = ConcreteHaketiloState.make(
                    store_dir = Path(t.cast(str, haketilo_dir)) / 'store',
                    listen_host = t.cast(str, listen_host),
                    listen_port = t.cast(int, listen_port),
                    logger = self.logger
                )
            except Exception as e:
                tb.print_exception(None, e, e.__traceback__)
                sys.exit(1)

            self.configured = True

    def running(self) -> None:
        """
        Optionally launch a browser once the proxy is running. A warning is
        logged if the launch is requested but does not succeed.
        """
        with self.configured_lock:
            assert self.configured

        assert self.state is not None

        if self.initial_options.haketilo_launch_browser:
            if not self.state.launch_browser():
                self.logger.warn(_('warn.proxy.couldnt_launch_browser'))

    def get_flow_handling(self, flow: http.HTTPFlow) -> FlowHandling:
        """
        Return the handling record of `flow`, creating and registering one
        if the flow has none yet.

        For a request URL that cannot be parsed, the record is made with an
        `ErrorBlockPolicy` and the dummy URL.
        """
        policy: policies.Policy

        assert self.state is not None

        with self.handling_dict_lock:
            handling = self.handling_dict.get(id(flow))

        if handling is None:
            # Policy selection happens without holding the dictionary lock.
            try:
                parsed_url = url_patterns.parse_url(flow.request.url)
            except url_patterns.HaketiloURLException as e:
                haketilo_settings = self.state.get_settings()
                policy = policies.ErrorBlockPolicy(haketilo_settings, error=e)
                parsed_url = url_patterns.dummy_url
            else:
                policy = self.state.select_policy(parsed_url)

            handling = FlowHandling.make(flow, policy, parsed_url)

            with self.handling_dict_lock:
                self.handling_dict[id(flow)] = handling

        return handling

    def forget_flow_handling(self, flow: http.HTTPFlow) -> None:
        """Drop the handling record of `flow`, if there is one."""
        with self.handling_dict_lock:
            self.handling_dict.pop(id(flow), None)

    @contextmanager
    def http_safe_event_handling(self, flow: http.HTTPFlow) -> t.Iterator:
        """
        Context manager to wrap the body of an HTTP event hook in.

        An exception raised inside the managed block is not propagated.
        Instead, the flow gets a plain-text HTTP 500 response that contains
        the traceback, and the flow's handling record is dropped.
        """
        with self.configured_lock:
            assert self.configured

        try:
            yield
        except Exception as e:
            tb_string = ''.join(tb.format_exception(None, e, e.__traceback__))
            error_text = _('err.proxy.unknown_error_{}_try_again')\
                .format(tb_string)\
                .encode()
            flow.response = http.Response.make(
                status_code = 500,
                content = error_text,
                headers = [(b'Content-Type', b'text/plain; charset=utf-8')]
            )

            self.forget_flow_handling(flow)

    @concurrent
    def requestheaders(self, flow: http.HTTPFlow) -> None:
        """
        Strip a revealing referrer header, then decide whether the request
        body is to be streamed (left unprocessed) and whether to apply
        anticaching to the request.
        """
        with self.http_safe_event_handling(flow):
            referrer = flow.request.headers.get('referer')
            if referrer is not None:
                if urlparse(referrer).netloc == 'hkt.mitm.it' and \
                   urlparse(flow.request.url).netloc != 'hkt.mitm.it':
                    # Do not reveal to the site that Haketilo meta-site was
                    # visited before.
                    flow.request.headers.pop('referer', None)

            handling = self.get_flow_handling(flow)
            policy = handling.policy

            if not policy.should_process_request(handling.bl_request_info):
                flow.request.stream = True
            if policy.anticache:
                flow.request.anticache()

    @concurrent
    def request(self, flow: http.HTTPFlow) -> None:
        """
        Let the flow's policy consume the complete request. The policy may
        return nothing, a replacement request or a response to answer with.
        Streamed requests are skipped.
        """
        if flow.request.stream:
            return

        with self.http_safe_event_handling(flow):
            handling = self.get_flow_handling(flow)

            result = handling.policy.consume_request(handling.request_info)

            if result is not None:
                mitmproxy_headers = http.Headers(result.headers.items_bin())

                if isinstance(result, http_messages.RequestInfo):
                    flow.request.url = result.url.orig_url
                    flow.request.method = result.method
                    flow.request.headers = mitmproxy_headers
                    flow.request.set_content(result.body or None)
                else:
                    # isinstance(result, http_messages.ResponseInfo)
                    flow.response = http.Response.make(
                        status_code = result.status_code,
                        headers = mitmproxy_headers,
                        content = result.body
                    )

    def responseheaders(self, flow: http.HTTPFlow) -> None:
        """
        Decide, based on the request and the response headers, whether the
        response body is to be streamed (left unprocessed).
        """
        assert flow.response is not None

        with self.http_safe_event_handling(flow):
            handling = self.get_flow_handling(flow)

            if not handling.policy.should_process_response(
                    request_info = handling.request_info,
                    response_info = handling.bl_response_info
            ):
                flow.response.stream = True

    @concurrent
    def response(self, flow: http.HTTPFlow) -> None:
        """
        Let the flow's policy consume the complete response and apply the
        replacement response it may return. The flow's handling record is
        dropped afterwards.
        """
        assert flow.response is not None

        if flow.response.stream:
            # A streamed response is not processed, but its handling record
            # has to be dropped just as on the non-streamed path below.
            # Otherwise the record (and the flow it references) would stay in
            # `handling_dict` indefinitely.
            self.forget_flow_handling(flow)
            return

        with self.http_safe_event_handling(flow):
            handling = self.get_flow_handling(flow)

            # Make a fresh nonce available to the policy code for the
            # duration of response processing only.
            new_nonce = secrets.token_urlsafe(8)
            policies.response_work_data.nonce = new_nonce

            try:
                http_info = handling.full_http_info
                result = handling.policy.consume_response(http_info)
            finally:
                del policies.response_work_data.nonce

            if result is not None:
                headers_bin = result.headers.items_bin()

                flow.response.status_code = result.status_code
                flow.response.headers = http.Headers(headers_bin)
                flow.response.set_content(result.body)

            self.forget_flow_handling(flow)

    def tls_clienthello(self, data: tls.ClientHelloData) -> None:
        """
        Request that TLS with the server be established first, except when
        the server address is unknown or is the 'hkt.mitm.it' meta-site (or
        a subdomain of it) on port 443.
        """
        if data.context.server.address is None:
            return

        host, port = data.context.server.address
        if (host == 'hkt.mitm.it' or host.endswith('.hkt.mitm.it')) and \
           port == 443:
            return

        data.establish_server_tls_first = True

    def error(self, flow: http.HTTPFlow) -> None:
        """Drop the handling record of a flow that ended with an error."""
        self.forget_flow_handling(flow)