diff options
Diffstat (limited to 'src/hydrilla/proxy/addon.py')
-rw-r--r-- | src/hydrilla/proxy/addon.py | 379 |
1 files changed, 379 insertions, 0 deletions
# SPDX-License-Identifier: GPL-3.0-or-later

# Haketilo addon for Mitmproxy.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this
# code in a proprietary program, I am not going to enforce this in
# court.

"""
This module contains the definition of a mitmproxy addon that gets instantiated
from addon script.
"""

import sys
import re
import threading
import secrets
import typing as t
import dataclasses as dc
import traceback as tb

from pathlib import Path
from contextlib import contextmanager
from urllib.parse import urlparse

from mitmproxy import tls, http, addonmanager, ctx
from mitmproxy.script import concurrent

from ..exceptions import HaketiloException
from ..translations import smart_gettext as _
from .. import url_patterns
from .state_impl import ConcreteHaketiloState
from . import state
from . import policies
from . import http_messages


class LoggerToMitmproxy(state.Logger):
    """Logger implementation that forwards warnings to mitmproxy's log."""

    def warn(self, msg: str) -> None:
        """Emit `msg` through `ctx.log.warn`, prefixed with 'Haketilo: '."""
        ctx.log.warn(f'Haketilo: {msg}')


def safe_parse_url(url: str) -> url_patterns.ParsedUrl:
    """
    Parse `url`, returning `url_patterns.dummy_url` instead of raising when
    the URL is rejected with `HaketiloURLException`.
    """
    try:
        return url_patterns.parse_url(url)
    except url_patterns.HaketiloURLException:
        return url_patterns.dummy_url


@dc.dataclass
class FlowHandling:
    """
    Per-flow bookkeeping: the mitmproxy flow, the policy chosen for it and
    lazily built descriptions of its request and response.
    """
    flow:              http.HTTPFlow
    policy:            policies.Policy
    _bl_request_info:  http_messages.BodylessRequestInfo
    # Filled in on first access of the corresponding property.
    _request_info:     t.Optional[http_messages.RequestInfo]          = None
    _bl_response_info: t.Optional[http_messages.BodylessResponseInfo] = None

    @property
    def bl_request_info(self) -> http_messages.BodylessRequestInfo:
        """Request description without the body."""
        return self._bl_request_info

    @property
    def request_info(self) -> http_messages.RequestInfo:
        """
        Request description including the body. Built once and cached; a
        missing body is represented as empty bytes.
        """
        if self._request_info is None:
            body = self.flow.request.get_content(strict=False) or b''
            self._request_info = self._bl_request_info.with_body(body)

        return self._request_info

    @property
    def bl_response_info(self) -> http_messages.BodylessResponseInfo:
        """
        Response description without the body. Built once and cached.
        Requires `flow.response` to be set.
        """
        if self._bl_response_info is None:
            assert self.flow.response is not None

            self._bl_response_info = http_messages.BodylessResponseInfo.make(
                url         = safe_parse_url(self.flow.request.url),
                status_code = self.flow.response.status_code,
                headers     = self.flow.response.headers
            )

        return self._bl_response_info

    @property
    def response_info(self) -> http_messages.ResponseInfo:
        """
        Response description including the body. Unlike `request_info`, this
        is rebuilt from the flow on every access.
        """
        assert self.flow.response is not None

        body = self.flow.response.get_content(strict=False) or b''
        return self.bl_response_info.with_body(body)

    @property
    def full_http_info(self) -> http_messages.FullHTTPInfo:
        """Request and response descriptions (with bodies) bundled together."""
        return http_messages.FullHTTPInfo(self.request_info, self.response_info)

    @staticmethod
    def make(
            flow:   http.HTTPFlow,
            policy: policies.Policy,
            url:    url_patterns.ParsedUrl
    ) -> 'FlowHandling':
        """
        Create the handling object for `flow`, describing its request with the
        already parsed `url` and the flow's current method and headers.
        """
        bl_request_info = http_messages.BodylessRequestInfo.make(
            url     = url,
            method  = flow.request.method,
            headers = flow.request.headers
        )

        return FlowHandling(flow, policy, bl_request_info)


@dc.dataclass
class PassedOptions:
    """
    Values of the Haketilo options as first received from mitmproxy. `None`
    means the option has not been received yet.
    """
    haketilo_dir:            t.Optional[str]  = None
    haketilo_listen_host:    t.Optional[str]  = None
    haketilo_listen_port:    t.Optional[int]  = None
    haketilo_launch_browser: t.Optional[bool] = None

    @property
    def fully_configured(self) -> bool:
        """Whether every option has been assigned a value."""
        return (self.haketilo_dir            is not None and
                self.haketilo_listen_host    is not None and
                self.haketilo_listen_port    is not None and
                self.haketilo_launch_browser is not None)


Lock = threading.Lock

@dc.dataclass
class HaketiloAddon:
    """
    The mitmproxy addon object. Its methods named after mitmproxy events
    (`load`, `configure`, `running`, `requestheaders`, `request`,
    `responseheaders`, `response`, `tls_clienthello`, `error`) are the hooks.
    """
    # A `default_factory` is required here: `PassedOptions` is a mutable
    # (unhashable) dataclass, which Python 3.11+ refuses as a plain field
    # default, and which would otherwise be shared between addon instances.
    initial_options: PassedOptions = dc.field(default_factory=PassedOptions)
    configured:      bool          = False
    configured_lock: Lock          = dc.field(default_factory=Lock)

    # Handlings of flows currently in progress, keyed by `id(flow)`.
    handling_dict:      dict[int, FlowHandling] = \
        dc.field(default_factory=dict)
    handling_dict_lock: Lock = dc.field(default_factory=Lock)

    logger: LoggerToMitmproxy = dc.field(default_factory=LoggerToMitmproxy)

    state: t.Optional[ConcreteHaketiloState] = None

    def load(self, loader: addonmanager.Loader) -> None:
        """Register the Haketilo-specific options with mitmproxy."""
        loader.add_option(
            name     = 'haketilo_dir',
            typespec = str,
            default  = '~/.haketilo/',
            help     = "Point to a Haketilo data directory to use"
        )
        loader.add_option(
            name     = 'haketilo_listen_host',
            typespec = str,
            default  = '127.0.0.1',
            help     = "Specify the address proxy listens on"
        )
        loader.add_option(
            name     = 'haketilo_listen_port',
            typespec = int,
            default  = 8080,
            help     = "Specify the port listens on"
        )
        loader.add_option(
            name     = 'haketilo_launch_browser',
            typespec = bool,
            default  = True,
            help     = "Specify whether to attempt to open a browser window with Haketilo page displayed inside"
        )

    def configure(self, updated: set[str]) -> None:
        """
        Record newly supplied option values and, once all of them are known,
        create the Haketilo state.

        Only the first value received for each option is kept; a later update
        of the same option is reported with a warning and ignored. If creating
        the state fails, the traceback is printed and the process exits with
        status 1.
        """
        with self.configured_lock:
            val_names = ('dir', 'listen_host', 'listen_port', 'launch_browser')
            for val_name in val_names:
                key = f'haketilo_{val_name}'

                if key not in updated:
                    continue

                if getattr(self.initial_options, key) is not None:
                    fmt = _('warn.proxy.setting_already_configured_{}')
                    self.logger.warn(fmt.format(key))
                    continue

                new_val = getattr(ctx.options, key)
                setattr(self.initial_options, key, new_val)

            if self.configured or not self.initial_options.fully_configured:
                return

            try:
                haketilo_dir = self.initial_options.haketilo_dir
                listen_host  = self.initial_options.haketilo_listen_host
                listen_port  = self.initial_options.haketilo_listen_port

                # NOTE(review): `Path` does not expand '~', so the default
                # '~/.haketilo/' presumably gets expanded by whoever passes
                # the option or inside `ConcreteHaketiloState.make` - confirm.
                self.state = ConcreteHaketiloState.make(
                    store_dir   = Path(t.cast(str, haketilo_dir)) / 'store',
                    listen_host = t.cast(str, listen_host),
                    listen_port = t.cast(int, listen_port),
                    logger      = self.logger
                )
            except Exception as e:
                tb.print_exception(None, e, e.__traceback__)
                sys.exit(1)

            self.configured = True

    def running(self) -> None:
        """
        Attempt to launch the browser if the `haketilo_launch_browser` option
        is set, logging a warning when the launch fails.
        """
        with self.configured_lock:
            assert self.configured

        assert self.state is not None

        if self.initial_options.haketilo_launch_browser:
            if not self.state.launch_browser():
                self.logger.warn(_('warn.proxy.couldnt_launch_browser'))

    def get_flow_handling(self, flow: http.HTTPFlow) -> FlowHandling:
        """
        Return the `FlowHandling` remembered for `flow`, creating and
        remembering one if there is none yet.

        When the request URL cannot be parsed, the handling is created with an
        `ErrorBlockPolicy` and `url_patterns.dummy_url`; otherwise the policy
        is selected by the Haketilo state.
        """
        policy: policies.Policy

        assert self.state is not None

        with self.handling_dict_lock:
            handling = self.handling_dict.get(id(flow))

        if handling is None:
            try:
                parsed_url = url_patterns.parse_url(flow.request.url)
            except url_patterns.HaketiloURLException as e:
                haketilo_settings = self.state.get_settings()
                policy = policies.ErrorBlockPolicy(haketilo_settings, error=e)
                parsed_url = url_patterns.dummy_url
            else:
                policy = self.state.select_policy(parsed_url)

            handling = FlowHandling.make(flow, policy, parsed_url)

            with self.handling_dict_lock:
                self.handling_dict[id(flow)] = handling

        return handling

    def forget_flow_handling(self, flow: http.HTTPFlow) -> None:
        """Drop the handling remembered for `flow`, if any."""
        with self.handling_dict_lock:
            self.handling_dict.pop(id(flow), None)

    @contextmanager
    def http_safe_event_handling(self, flow: http.HTTPFlow) -> t.Iterator:
        """
        Context manager wrapping the body of an HTTP event hook.

        Any exception raised inside the `with` body is turned into a plain-text
        HTTP 500 response carrying the traceback, and the handling remembered
        for `flow` is dropped. The exception is not propagated.
        """
        with self.configured_lock:
            assert self.configured

        try:
            yield
        except Exception as e:
            tb_string = ''.join(tb.format_exception(None, e, e.__traceback__))
            error_text = _('err.proxy.unknown_error_{}_try_again')\
                .format(tb_string)\
                .encode()
            flow.response = http.Response.make(
                status_code = 500,
                content     = error_text,
                headers     = [(b'Content-Type', b'text/plain; charset=utf-8')]
            )

            self.forget_flow_handling(flow)

    @concurrent
    def requestheaders(self, flow: http.HTTPFlow) -> None:
        """
        Strip a referrer pointing at the Haketilo meta-site from requests to
        other sites, then let the policy decide whether the request body is
        streamed (left unprocessed) and whether caching headers are removed.
        """
        with self.http_safe_event_handling(flow):
            referrer = flow.request.headers.get('referer')
            if referrer is not None:
                if urlparse(referrer).netloc == 'hkt.mitm.it' and \
                   urlparse(flow.request.url).netloc != 'hkt.mitm.it':
                    # Do not reveal to the site that Haketilo meta-site was
                    # visited before.
                    flow.request.headers.pop('referer', None)

            handling = self.get_flow_handling(flow)
            policy = handling.policy

            if not policy.should_process_request(handling.bl_request_info):
                flow.request.stream = True
            if policy.anticache:
                flow.request.anticache()

    @concurrent
    def request(self, flow: http.HTTPFlow) -> None:
        """
        Let the policy consume the complete request. The policy may return a
        replacement request, a response to send instead of contacting the
        server, or `None` to leave the flow untouched. Streamed requests are
        skipped.
        """
        if flow.request.stream:
            return

        with self.http_safe_event_handling(flow):
            handling = self.get_flow_handling(flow)

            result = handling.policy.consume_request(handling.request_info)

            if result is not None:
                mitmproxy_headers = http.Headers(result.headers.items_bin())

                if isinstance(result, http_messages.RequestInfo):
                    flow.request.url     = result.url.orig_url
                    flow.request.method  = result.method
                    flow.request.headers = mitmproxy_headers
                    flow.request.set_content(result.body or None)
                else:
                    # isinstance(result, http_messages.ResponseInfo)
                    flow.response = http.Response.make(
                        status_code = result.status_code,
                        headers     = mitmproxy_headers,
                        content     = result.body
                    )

    def responseheaders(self, flow: http.HTTPFlow) -> None:
        """
        Ask the policy whether the response needs processing; if not, mark the
        response body for streaming.
        """
        assert flow.response is not None

        with self.http_safe_event_handling(flow):
            handling = self.get_flow_handling(flow)

            if not handling.policy.should_process_response(
                    request_info  = handling.request_info,
                    response_info = handling.bl_response_info
            ):
                flow.response.stream = True

    @concurrent
    def response(self, flow: http.HTTPFlow) -> None:
        """
        Let the policy consume the complete response and apply the replacement
        it returns, if any. The handling remembered for the flow is dropped
        afterwards, also when the response was streamed.
        """
        assert flow.response is not None

        if flow.response.stream:
            # A streamed response is not processed, but this is still the
            # last regular hook for the flow. Without dropping the handling
            # here, every streamed flow would stay in `handling_dict` forever.
            self.forget_flow_handling(flow)
            return

        with self.http_safe_event_handling(flow):
            handling = self.get_flow_handling(flow)

            # The nonce is exposed to policy code through
            # `policies.response_work_data` only while the response is being
            # consumed.
            new_nonce = secrets.token_urlsafe(8)
            setattr(policies.response_work_data, 'nonce', new_nonce)

            try:
                http_info = handling.full_http_info
                result = handling.policy.consume_response(http_info)
            finally:
                delattr(policies.response_work_data, 'nonce')

            if result is not None:
                headers_bin = result.headers.items_bin()

                flow.response.status_code = result.status_code
                flow.response.headers     = http.Headers(headers_bin)
                flow.response.set_content(result.body)

            self.forget_flow_handling(flow)

    def tls_clienthello(self, data: tls.ClientHelloData) -> None:
        """
        Request that TLS with the server be established first, except for
        connections without a known server address and connections to the
        Haketilo meta-site (`hkt.mitm.it` and its subdomains) on port 443.
        """
        if data.context.server.address is None:
            return

        host, port = data.context.server.address
        if (host == 'hkt.mitm.it' or host.endswith('.hkt.mitm.it')) and \
           port == 443:
            return

        data.establish_server_tls_first = True

    def error(self, flow: http.HTTPFlow) -> None:
        """Drop the handling remembered for a flow that ended with an error."""
        self.forget_flow_handling(flow)