# SPDX-License-Identifier: GPL-3.0-or-later

# Haketilo addon for Mitmproxy.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this
# code in a proprietary program, I am not going to enforce this in
# court.

"""
This module contains the definition of a mitmproxy addon that gets instantiated
from addon script.
"""

import sys
import re
import typing as t
import dataclasses as dc
import traceback as tb

from threading import Lock
from pathlib import Path
from contextlib import contextmanager
from urllib.parse import urlparse

from mitmproxy import tls, http, addonmanager, ctx
from mitmproxy.script import concurrent

from ..exceptions import HaketiloException
from ..translations import smart_gettext as _
from ..url_patterns import parse_url, ParsedUrl
from .state_impl import ConcreteHaketiloState
from . import state
from . import policies
from . import http_messages


DefaultGetValue = t.TypeVar('DefaultGetValue', str, None)


class MitmproxyHeadersWrapper():
    """
    Thin adapter exposing a mitmproxy `http.Headers` object through a small,
    read-only, mapping-like interface (`[]`, `get()`, `get_all()`, `items()`).
    """
    def __init__(self, headers: http.Headers) -> None:
        """Remember the wrapped `http.Headers` object."""
        self.headers = headers

    def __getitem__(self, key: str) -> str:
        """Delegate `wrapper[key]` to the wrapped headers object."""
        return self.headers[key]

    def get_all(self, key: str) -> t.Iterable[str]:
        """Delegate to the wrapped headers object's `get_all()`."""
        return self.headers.get_all(key)

    @t.overload
    def get(self, key: str) -> t.Optional[str]:
        ...

    @t.overload
    def get(self, key: str, default: DefaultGetValue) \
        -> t.Union[str, DefaultGetValue]:
        ...

    def get(self, key, default = None):
        """
        Return the value of header `key`, or `default` when the wrapped
        headers object reports no value for it.
        """
        value = self.headers.get(key)

        if value is None:
            return default

        return t.cast(str, value)

    def items(self) -> t.Iterable[tuple[str, str]]:
        """
        Return the `(name, value)` pairs of the wrapped headers, requested
        with `multi=True`.
        """
        return self.headers.items(multi=True)


class LoggerToMitmproxy(state.Logger):
    """`state.Logger` implementation that forwards to mitmproxy's log."""
    def warn(self, msg: str) -> None:
        """Emit `msg` as a mitmproxy warning, prefixed with 'Haketilo: '."""
        ctx.log.warn(f'Haketilo: {msg}')


@dc.dataclass(frozen=True)
class FlowHandlingData:
    """Per-flow data cached by the addon between mitmproxy event hooks."""
    # Parsed URL of the request (or `dummy_url` when it was not parsed).
    request_url: ParsedUrl
    # Policy chosen for handling this flow.
    policy: policies.Policy


@dc.dataclass
class PassedOptions:
    """
    Values of the `haketilo_*` mitmproxy options as first delivered through
    `HaketiloAddon.configure()`. `None` means "not delivered yet".
    """
    haketilo_dir: t.Optional[str] = None
    haketilo_listen_host: t.Optional[str] = None
    haketilo_listen_port: t.Optional[int] = None
    haketilo_launch_browser: t.Optional[bool] = None

    @property
    def fully_configured(self) -> bool:
        """Tell whether every option has already been assigned a value."""
        return (self.haketilo_dir is not None and
                self.haketilo_listen_host is not None and
                self.haketilo_listen_port is not None and
                self.haketilo_launch_browser is not None)


# Requests to plain-HTTP mitm.it are left alone (`DoNothingPolicy`). The dot is
# escaped so that lookalike hosts such as 'mitmXit' do not match.
magical_mitm_it_url_reg = re.compile(r'^http://mitm\.it(/.*)?$')

# Placeholder stored in `FlowHandlingData` when the request URL is not parsed.
dummy_url = parse_url('http://dummy.replacement.url')


@dc.dataclass
class HaketiloAddon:
    """
    The mitmproxy addon. Its methods named after mitmproxy events (`load`,
    `configure`, `running`, `requestheaders`, `request`, `responseheaders`,
    `response`, `tls_clienthello`, `error`) are hooks called by mitmproxy.
    """
    # A fresh `PassedOptions` per addon instance. A shared instance used as a
    # plain default is rejected by the `dataclasses` module on Python 3.11+
    # (unhashable default) and would be shared between instances otherwise.
    initial_options: PassedOptions = dc.field(default_factory=PassedOptions)
    # Set once `state` has been successfully created in `configure()`.
    configured: bool = False
    configured_lock: Lock = dc.field(default_factory=Lock)

    # Cached handling data, keyed by `id()` of the flow object.
    flows_data: dict[int, FlowHandlingData] = dc.field(default_factory=dict)
    flows_data_lock: Lock = dc.field(default_factory=Lock)

    logger: LoggerToMitmproxy = dc.field(default_factory=LoggerToMitmproxy)

    state: t.Optional[ConcreteHaketiloState] = None

    def load(self, loader: addonmanager.Loader) -> None:
        """Register the `haketilo_*` options with mitmproxy."""
        loader.add_option(
            name = 'haketilo_dir',
            typespec = str,
            default = '~/.haketilo/',
            help = "Point to a Haketilo data directory to use"
        )
        loader.add_option(
            name = 'haketilo_listen_host',
            typespec = str,
            default = '127.0.0.1',
            help = "Specify the address proxy listens on"
        )
        loader.add_option(
            name = 'haketilo_listen_port',
            typespec = int,
            default = 8080,
            help = "Specify the port listens on"
        )
        loader.add_option(
            name = 'haketilo_launch_browser',
            typespec = bool,
            default = True,
            help = "Specify whether to attempt to open a browser window with Haketilo page displayed inside"
        )

    def configure(self, updated: set[str]) -> None:
        """
        Record newly delivered `haketilo_*` option values and, once all of
        them are known, create the Haketilo state (only once).

        Each option is accepted only the first time it is delivered; a later
        update of the same option is ignored with a warning. If creating the
        state raises, the traceback is printed and the process exits with
        status 1.
        """
        with self.configured_lock:
            val_names = ('dir', 'listen_host', 'listen_port', 'launch_browser')
            for val_name in val_names:
                key = f'haketilo_{val_name}'

                if key not in updated:
                    continue

                if getattr(self.initial_options, key) is not None:
                    fmt = _('warn.proxy.setting_already_configured_{}')
                    self.logger.warn(fmt.format(key))
                    continue

                new_val = getattr(ctx.options, key)
                setattr(self.initial_options, key, new_val)

            if self.configured or not self.initial_options.fully_configured:
                return

            try:
                haketilo_dir = self.initial_options.haketilo_dir
                listen_host = self.initial_options.haketilo_listen_host
                listen_port = self.initial_options.haketilo_listen_port

                # The default option value starts with '~', which `Path`
                # does not expand on its own.
                store_dir = \
                    Path(t.cast(str, haketilo_dir)).expanduser() / 'store'

                self.state = ConcreteHaketiloState.make(
                    store_dir = store_dir,
                    listen_host = t.cast(str, listen_host),
                    listen_port = t.cast(int, listen_port),
                    logger = self.logger
                )
            except Exception as e:
                tb.print_exception(None, e, e.__traceback__)
                sys.exit(1)

            self.configured = True

    def running(self) -> None:
        """
        Hook called when the proxy is up. Attempt to launch a browser if the
        `haketilo_launch_browser` option is set and warn when that fails.
        """
        with self.configured_lock:
            assert self.configured

        assert self.state is not None

        if self.initial_options.haketilo_launch_browser:
            if not self.state.launch_browser():
                self.logger.warn(_('warn.proxy.couldnt_launch_browser'))

    def get_handling_data(self, flow: http.HTTPFlow) -> FlowHandlingData:
        """
        Return the `FlowHandlingData` for `flow`, creating and caching it on
        first use.

        Requests matching `magical_mitm_it_url_reg` get `DoNothingPolicy` and
        `dummy_url`. For other requests the URL is parsed and a policy is
        selected through the state; a `HaketiloException` raised along the
        way results in an `ErrorBlockPolicy` instead.
        """
        policy: policies.Policy

        assert self.state is not None

        with self.flows_data_lock:
            handling_data = self.flows_data.get(id(flow))

        if handling_data is None:
            # Stays in place if the URL is not (successfully) parsed below.
            parsed_url = dummy_url

            if magical_mitm_it_url_reg.match(flow.request.url):
                policy = policies.DoNothingPolicy()
            else:
                try:
                    parsed_url = parse_url(flow.request.url)
                    policy = self.state.select_policy(parsed_url)
                except HaketiloException as e:
                    policy = policies.ErrorBlockPolicy(builtin=True, error=e)

            handling_data = FlowHandlingData(parsed_url, policy)

            with self.flows_data_lock:
                self.flows_data[id(flow)] = handling_data

        return handling_data

    def forget_handling_data(self, flow: http.HTTPFlow) -> None:
        """Drop the cached handling data of `flow`, if there is any."""
        with self.flows_data_lock:
            self.flows_data.pop(id(flow), None)

    @contextmanager
    def http_safe_event_handling(self, flow: http.HTTPFlow) -> t.Iterator:
        """
        Context manager wrapping the body of an HTTP event hook.

        Any `Exception` raised inside the managed block is caught; the flow
        is then given a plain-text 500 response containing the formatted
        traceback and the flow's cached handling data is dropped.
        """
        with self.configured_lock:
            assert self.configured

        try:
            yield
        except Exception as e:
            tb_string = ''.join(tb.format_exception(None, e, e.__traceback__))
            error_text = _('err.proxy.unknown_error_{}_try_again')\
                .format(tb_string)\
                .encode()
            flow.response = http.Response.make(
                status_code = 500,
                content = error_text,
                headers = [(b'Content-Type', b'text/plain; charset=utf-8')]
            )

            self.forget_handling_data(flow)

    @concurrent
    def requestheaders(self, flow: http.HTTPFlow) -> None:
        """
        Hook called once request headers are available. Strip a `referer`
        that points at hkt.mitm.it from requests going elsewhere, enable
        request streaming when the policy does not want to process the
        request and apply anticache when the policy asks for it.
        """
        with self.http_safe_event_handling(flow):
            referrer = flow.request.headers.get('referer')
            if referrer is not None:
                if urlparse(referrer).netloc == 'hkt.mitm.it' and \
                   urlparse(flow.request.url).netloc != 'hkt.mitm.it':
                    # Do not reveal to the site that Haketilo meta-site was
                    # visited before.
                    flow.request.headers.pop('referer', None)

            handling_data = self.get_handling_data(flow)
            policy = handling_data.policy

            if not policy.should_process_request(handling_data.request_url):
                flow.request.stream = True
            if policy.anticache:
                flow.request.anticache()

    @concurrent
    def request(self, flow: http.HTTPFlow) -> None:
        """
        Hook called with the complete request. Unless the request is being
        streamed, pass it to the policy's `consume_request()`. A
        `ProducedRequest` result replaces the request's URL, method, headers
        and body; any other non-`None` result is turned into the flow's
        response.
        """
        if flow.request.stream:
            return

        with self.http_safe_event_handling(flow):
            handling_data = self.get_handling_data(flow)

            request_info = http_messages.RequestInfo(
                url = handling_data.request_url,
                method = flow.request.method,
                headers = MitmproxyHeadersWrapper(flow.request.headers),
                body = flow.request.get_content(strict=False) or b''
            )

            result = handling_data.policy.consume_request(request_info)

            if result is not None:
                if isinstance(result, http_messages.ProducedRequest):
                    flow.request.url = result.url
                    flow.request.method = result.method
                    flow.request.headers = http.Headers(result.headers)
                    flow.request.set_content(result.body or None)
                else:
                    # isinstance(result, http_messages.ProducedResponse)
                    flow.response = http.Response.make(
                        status_code = result.status_code,
                        headers = http.Headers(result.headers),
                        content = result.body
                    )

    def responseheaders(self, flow: http.HTTPFlow) -> None:
        """
        Hook called once response headers are available. Enable response
        streaming when the policy does not want to process the response.
        """
        assert flow.response is not None

        with self.http_safe_event_handling(flow):
            handling_data = self.get_handling_data(flow)
            policy = handling_data.policy

            if not policy.should_process_response(handling_data.request_url):
                flow.response.stream = True

    @concurrent
    def response(self, flow: http.HTTPFlow) -> None:
        """
        Hook called with the complete response. Unless the response is being
        streamed, pass it to the policy's `consume_response()` and, if a
        result is returned, overwrite the response's status code, headers
        and body with it. The flow's cached handling data is dropped
        afterwards.
        """
        assert flow.response is not None

        if flow.response.stream:
            # Nothing to process, but the cached entry must still go away.
            # It is keyed by `id(flow)`, so a leftover entry would both leak
            # and risk being picked up by a later flow object reusing the id.
            self.forget_handling_data(flow)
            return

        with self.http_safe_event_handling(flow):
            handling_data = self.get_handling_data(flow)

            response_info = http_messages.ResponseInfo(
                url = parse_url(flow.request.url),
                orig_url = handling_data.request_url,
                status_code = flow.response.status_code,
                headers = MitmproxyHeadersWrapper(flow.response.headers),
                body = flow.response.get_content(strict=False) or b''
            )

            result = handling_data.policy.consume_response(response_info)
            if result is not None:
                flow.response.status_code = result.status_code
                flow.response.headers = http.Headers(result.headers)
                flow.response.set_content(result.body)

            self.forget_handling_data(flow)

    def tls_clienthello(self, data: tls.ClientHelloData) -> None:
        """
        Hook called on a TLS ClientHello. Request that TLS with the server
        be established first, except when the server address is unknown or
        is hkt.mitm.it (or a subdomain of it) on port 443.
        """
        if data.context.server.address is None:
            return

        host, port = data.context.server.address
        if (host == 'hkt.mitm.it' or host.endswith('.hkt.mitm.it')) and \
           port == 443:
            return

        data.establish_server_tls_first = True

    def error(self, flow: http.HTTPFlow) -> None:
        """Hook called on a flow error. Drop the flow's cached data."""
        self.forget_handling_data(flow)