# SPDX-License-Identifier: GPL-3.0-or-later

# Haketilo addon for Mitmproxy.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this
# code in a proprietary program, I am not going to enforce this in
# court.

"""
This module contains the definition of a mitmproxy addon that gets instantiated
from addon script.
"""

import sys
import re
import threading
import secrets
import typing as t
import dataclasses as dc
import traceback as tb

from pathlib import Path
from contextlib import contextmanager
from urllib.parse import urlparse

from mitmproxy import tls, http, addonmanager, ctx
from mitmproxy.script import concurrent

from ..exceptions import HaketiloException
from ..translations import smart_gettext as _
from .. import url_patterns
from .state_impl import ConcreteHaketiloState
from . import state
from . import policies
from . import http_messages


class LoggerToMitmproxy(state.Logger):
    """Logger implementation that forwards warnings to mitmproxy's log."""

    def warn(self, msg: str) -> None:
        """Emit `msg` through `ctx.log.warn`, prefixed with 'Haketilo: '."""
        ctx.log.warn(f'Haketilo: {msg}')


def safe_parse_url(url: str) -> url_patterns.ParsedUrl:
    """
    Parse `url` with `url_patterns.parse_url()`.

    Return `url_patterns.dummy_url` instead of raising when parsing fails with
    `url_patterns.HaketiloURLException`.
    """
    try:
        return url_patterns.parse_url(url)
    except url_patterns.HaketiloURLException:
        return url_patterns.dummy_url


@dc.dataclass
class FlowHandling:
    """
    Per-flow bookkeeping: the mitmproxy flow, the policy chosen for it and
    the request/response info objects derived from the flow.

    The body-carrying request info and the bodyless response info are built on
    first access and then cached on the instance.
    """
    flow: http.HTTPFlow
    policy: policies.Policy

    # Built eagerly by `make()` from the request's URL, method and headers.
    _bl_request_info: http_messages.BodylessRequestInfo
    # Caches filled in lazily by the properties below.
    _request_info: t.Optional[http_messages.RequestInfo] = None
    _bl_response_info: t.Optional[http_messages.BodylessResponseInfo] = None

    @property
    def bl_request_info(self) -> http_messages.BodylessRequestInfo:
        """Request info without the body."""
        return self._bl_request_info

    @property
    def request_info(self) -> http_messages.RequestInfo:
        """
        Request info including the body.

        The body is read from the flow on first access (empty bytes are used
        when no content is available) and the result is cached.
        """
        if self._request_info is None:
            body = self.flow.request.get_content(strict=False) or b''
            self._request_info = self._bl_request_info.with_body(body)

        return self._request_info

    @property
    def bl_response_info(self) -> http_messages.BodylessResponseInfo:
        """
        Response info without the body; built on first access and cached.

        Requires `flow.response` to be present.
        """
        if self._bl_response_info is None:
            assert self.flow.response is not None

            self._bl_response_info = http_messages.BodylessResponseInfo.make(
                url = safe_parse_url(self.flow.request.url),
                status_code = self.flow.response.status_code,
                headers = self.flow.response.headers
            )

        return self._bl_response_info

    @property
    def response_info(self) -> http_messages.ResponseInfo:
        """
        Response info including the body.

        Unlike `request_info`, this is not cached: the body is re-read from
        the flow on every access.
        """
        assert self.flow.response is not None

        body = self.flow.response.get_content(strict=False) or b''
        return self.bl_response_info.with_body(body)

    @property
    def full_http_info(self) -> http_messages.FullHTTPInfo:
        """Request and response infos (both with bodies) bundled together."""
        return http_messages.FullHTTPInfo(self.request_info, self.response_info)

    @staticmethod
    def make(
            flow: http.HTTPFlow,
            policy: policies.Policy,
            url: url_patterns.ParsedUrl
    ) -> 'FlowHandling':
        """
        Create a `FlowHandling` for `flow`.

        `url` is the already-parsed request URL; the method and headers are
        taken from `flow.request`.
        """
        bl_request_info = http_messages.BodylessRequestInfo.make(
            url = url,
            method = flow.request.method,
            headers = flow.request.headers
        )

        return FlowHandling(flow, policy, bl_request_info)


@dc.dataclass
class PassedOptions:
    """
    Values of the `haketilo_*` mitmproxy options seen so far.

    Each field stays `None` until the corresponding option has been delivered
    through `HaketiloAddon.configure()`.
    """
    haketilo_dir: t.Optional[str] = None
    haketilo_listen_host: t.Optional[str] = None
    haketilo_listen_port: t.Optional[int] = None
    haketilo_launch_browser: t.Optional[bool] = None

    @property
    def fully_configured(self) -> bool:
        """Whether every option has been assigned a non-`None` value."""
        return (self.haketilo_dir is not None and
                self.haketilo_listen_host is not None and
                self.haketilo_listen_port is not None and
                self.haketilo_launch_browser is not None)


Lock = threading.Lock


@dc.dataclass
class HaketiloAddon:
    """
    The mitmproxy addon object.

    It registers Haketilo's options, builds the Haketilo state once all of
    them have been supplied and then routes HTTP event hooks to the policy
    selected for each flow.
    """
    # A default_factory is required here: a `PassedOptions()` instance used
    # directly as the default would be shared by all addon instances and is
    # rejected as a mutable (unhashable) default by dataclasses on
    # Python >= 3.11.
    initial_options: PassedOptions = dc.field(default_factory=PassedOptions)
    # Set to True once `state` has been created; guarded by `configured_lock`.
    configured: bool = False
    configured_lock: Lock = dc.field(default_factory=Lock)

    # Maps `id(flow)` to that flow's handling; guarded by `handling_dict_lock`.
    handling_dict: dict[int, FlowHandling] = dc.field(default_factory=dict)
    handling_dict_lock: Lock = dc.field(default_factory=Lock)

    logger: LoggerToMitmproxy = dc.field(default_factory=LoggerToMitmproxy)

    state: t.Optional[ConcreteHaketiloState] = None

    def load(self, loader: addonmanager.Loader) -> None:
        """Register the `haketilo_*` options with mitmproxy."""
        loader.add_option(
            name = 'haketilo_dir',
            typespec = str,
            default = '~/.haketilo/',
            help = "Point to a Haketilo data directory to use"
        )
        loader.add_option(
            name = 'haketilo_listen_host',
            typespec = str,
            default = '127.0.0.1',
            help = "Specify the address proxy listens on"
        )
        loader.add_option(
            name = 'haketilo_listen_port',
            typespec = int,
            default = 8080,
            help = "Specify the port listens on"
        )
        loader.add_option(
            name = 'haketilo_launch_browser',
            typespec = bool,
            default = True,
            help = ("Specify whether to attempt to open a browser window "
                    "with Haketilo page displayed inside")
        )

    def configure(self, updated: set[str]) -> None:
        """
        Record newly supplied option values and, once all of them are known,
        create the Haketilo state.

        Each option is only accepted the first time it is supplied; later
        changes are ignored with a warning. If state creation raises, the
        traceback is printed and the process exits with status 1.
        """
        with self.configured_lock:
            val_names = ('dir', 'listen_host', 'listen_port', 'launch_browser')
            for val_name in val_names:
                key = f'haketilo_{val_name}'

                if key not in updated:
                    continue

                if getattr(self.initial_options, key) is not None:
                    # This option was already recorded; refuse to change it.
                    fmt = _('warn.proxy.setting_already_configured_{}')
                    self.logger.warn(fmt.format(key))
                    continue

                new_val = getattr(ctx.options, key)
                setattr(self.initial_options, key, new_val)

            if self.configured or not self.initial_options.fully_configured:
                return

            try:
                haketilo_dir = self.initial_options.haketilo_dir
                listen_host = self.initial_options.haketilo_listen_host
                listen_port = self.initial_options.haketilo_listen_port

                # NOTE(review): no `expanduser()` is applied here although the
                # option's default starts with '~' - presumably the caller or
                # `ConcreteHaketiloState.make()` takes care of it; confirm.
                self.state = ConcreteHaketiloState.make(
                    store_dir = Path(t.cast(str, haketilo_dir)) / 'store',
                    listen_host = t.cast(str, listen_host),
                    listen_port = t.cast(int, listen_port),
                    logger = self.logger
                )
            except Exception as e:
                tb.print_exception(None, e, e.__traceback__)
                sys.exit(1)

            self.configured = True

    def running(self) -> None:
        """
        Launch the browser if the `haketilo_launch_browser` option is set.

        Expects configuration to be complete; warns when the browser could not
        be launched.
        """
        with self.configured_lock:
            assert self.configured

        assert self.state is not None

        if self.initial_options.haketilo_launch_browser:
            if not self.state.launch_browser():
                self.logger.warn(_('warn.proxy.couldnt_launch_browser'))

    def get_flow_handling(self, flow: http.HTTPFlow) -> FlowHandling:
        """
        Return the `FlowHandling` registered for `flow`, creating and
        registering one if there is none yet.

        When the request URL cannot be parsed, the flow gets an
        `ErrorBlockPolicy` and `url_patterns.dummy_url` as its URL.
        """
        policy: policies.Policy

        assert self.state is not None

        with self.handling_dict_lock:
            handling = self.handling_dict.get(id(flow))

        if handling is None:
            try:
                parsed_url = url_patterns.parse_url(flow.request.url)
            except url_patterns.HaketiloURLException as e:
                haketilo_settings = self.state.get_settings()
                policy = policies.ErrorBlockPolicy(haketilo_settings, error=e)
                parsed_url = url_patterns.dummy_url
            else:
                policy = self.state.select_policy(parsed_url)

            handling = FlowHandling.make(flow, policy, parsed_url)

            with self.handling_dict_lock:
                self.handling_dict[id(flow)] = handling

        return handling

    def forget_flow_handling(self, flow: http.HTTPFlow) -> None:
        """Drop the handling registered for `flow`, if any."""
        with self.handling_dict_lock:
            self.handling_dict.pop(id(flow), None)

    @contextmanager
    def http_safe_event_handling(self, flow: http.HTTPFlow) \
        -> t.Iterator[None]:
        """
        Context manager wrapping the body of an HTTP event hook.

        Any `Exception` raised inside the `with` body is swallowed: the flow's
        response is replaced with a plain-text HTTP 500 page containing the
        traceback and the flow's handling is forgotten.
        """
        with self.configured_lock:
            assert self.configured

        try:
            yield
        except Exception as e:
            tb_string = ''.join(tb.format_exception(None, e, e.__traceback__))
            error_text = _('err.proxy.unknown_error_{}_try_again')\
                .format(tb_string)\
                .encode()
            flow.response = http.Response.make(
                status_code = 500,
                content = error_text,
                headers = [(b'Content-Type', b'text/plain; charset=utf-8')]
            )

            self.forget_flow_handling(flow)

    @concurrent
    def requestheaders(self, flow: http.HTTPFlow) -> None:
        """
        Hook run when request headers are available.

        Strips a 'referer' header that points at the Haketilo meta-site from
        requests going elsewhere, then asks the policy whether the request
        needs processing; if not, the request body is streamed.
        """
        with self.http_safe_event_handling(flow):
            referrer = flow.request.headers.get('referer')
            if referrer is not None:
                if urlparse(referrer).netloc == 'hkt.mitm.it' and \
                   urlparse(flow.request.url).netloc != 'hkt.mitm.it':
                    # Do not reveal to the site that Haketilo meta-site was
                    # visited before.
                    flow.request.headers.pop('referer', None)

            handling = self.get_flow_handling(flow)
            policy = handling.policy

            if not policy.should_process_request(handling.bl_request_info):
                flow.request.stream = True
                if policy.anticache:
                    flow.request.anticache()

    @concurrent
    def request(self, flow: http.HTTPFlow) -> None:
        """
        Hook run for a complete request.

        Streamed requests are left alone. Otherwise the policy consumes the
        request; its result, if any, either rewrites the request or supplies
        a response directly.
        """
        if flow.request.stream:
            return

        with self.http_safe_event_handling(flow):
            handling = self.get_flow_handling(flow)

            result = handling.policy.consume_request(handling.request_info)

            if result is not None:
                mitmproxy_headers = http.Headers(result.headers.items_bin())

                if isinstance(result, http_messages.RequestInfo):
                    flow.request.url = result.url.orig_url
                    flow.request.method = result.method
                    flow.request.headers = mitmproxy_headers
                    flow.request.set_content(result.body or None)
                else:
                    # isinstance(result, http_messages.ResponseInfo)
                    flow.response = http.Response.make(
                        status_code = result.status_code,
                        headers = mitmproxy_headers,
                        content = result.body
                    )

    def responseheaders(self, flow: http.HTTPFlow) -> None:
        """
        Hook run when response headers are available.

        Marks the response for streaming when the policy reports that it does
        not need to process it.
        """
        assert flow.response is not None

        with self.http_safe_event_handling(flow):
            handling = self.get_flow_handling(flow)

            if not handling.policy.should_process_response(
                    request_info = handling.request_info,
                    response_info = handling.bl_response_info
            ):
                flow.response.stream = True

    @concurrent
    def response(self, flow: http.HTTPFlow) -> None:
        """
        Hook run for a complete response.

        For non-streamed responses the policy consumes the full HTTP exchange
        and its result, if any, replaces the response's status, headers and
        body. A fresh nonce is exposed to the policy through
        `policies.response_work_data` for the duration of that call.

        The flow's handling is forgotten in every case, including streamed
        responses, so that `handling_dict` does not accumulate stale entries
        keyed by `id(flow)` values that may later be reused.
        """
        assert flow.response is not None

        try:
            if flow.response.stream:
                return

            with self.http_safe_event_handling(flow):
                handling = self.get_flow_handling(flow)

                new_nonce = secrets.token_urlsafe(8)
                setattr(policies.response_work_data, 'nonce', new_nonce)

                try:
                    http_info = handling.full_http_info
                    result = handling.policy.consume_response(http_info)
                finally:
                    # Never leave the nonce behind for the next response.
                    delattr(policies.response_work_data, 'nonce')

                if result is not None:
                    headers_bin = result.headers.items_bin()

                    flow.response.status_code = result.status_code
                    flow.response.headers = http.Headers(headers_bin)
                    flow.response.set_content(result.body)
        finally:
            self.forget_flow_handling(flow)

    def tls_clienthello(self, data: tls.ClientHelloData) -> None:
        """
        Request that server-side TLS be established first, except for
        connections to the Haketilo meta-site ('hkt.mitm.it' or one of its
        subdomains) on port 443 and for connections with no server address.
        """
        if data.context.server.address is None:
            return

        host, port = data.context.server.address
        if (host == 'hkt.mitm.it' or host.endswith('.hkt.mitm.it')) and \
           port == 443:
            return

        data.establish_server_tls_first = True

    def error(self, flow: http.HTTPFlow) -> None:
        """Hook run on a flow error; forget the flow's handling."""
        self.forget_flow_handling(flow)