# SPDX-License-Identifier: GPL-3.0-or-later # Haketilo addon for Mitmproxy. # # This file is part of Hydrilla&Haketilo. # # Copyright (C) 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use this code # in a proprietary program, I am not going to enforce this in court. """ This module contains the definition of a mitmproxy addon that gets instantiated from addon script. """ # Enable using with Python 3.7. from __future__ import annotations import sys import typing as t import dataclasses as dc import traceback as tb from threading import Lock from pathlib import Path from contextlib import contextmanager # for mitmproxy 6.* from mitmproxy.net import http if not hasattr(http, 'Headers'): # for mitmproxy 8.* from mitmproxy import http # type: ignore from mitmproxy.http import HTTPFlow from mitmproxy import addonmanager, ctx from mitmproxy.script import concurrent from ..exceptions import HaketiloException from ..translations import smart_gettext as _ from ..url_patterns import parse_url from .state_impl import ConcreteHaketiloState from . import policies from . import http_messages DefaultGetValue = t.TypeVar('DefaultGetValue', object, None) class MitmproxyHeadersWrapper(): """....""" def __init__(self, headers: http.Headers) -> None: """....""" self.headers = headers __getitem__ = lambda self, key: self.headers[key] get_all = lambda self, key: self.headers.get_all(key) def get(self, key: str, default: DefaultGetValue = None) \ -> t.Union[str, DefaultGetValue]: """....""" value = self.headers.get(key) if value is None: return default else: return t.cast(str, value) def items(self) -> t.Iterable[tuple[str, str]]: """....""" return self.headers.items(multi=True) @dc.dataclass class HaketiloAddon: """ ....... """ configured: bool = False configured_lock: Lock = dc.field(default_factory=Lock) flow_policies: dict[int, policies.Policy] = dc.field(default_factory=dict) policies_lock: Lock = dc.field(default_factory=Lock) state: t.Optional[ConcreteHaketiloState] = None def load(self, loader: addonmanager.Loader) -> None: """....""" loader.add_option( name = 'haketilo_dir', typespec = str, default = '~/.haketilo/', help = "Point to a Haketilo data directory to use", ) def configure(self, updated: set[str]) -> None: """....""" if 'haketilo_dir' not in updated: return with self.configured_lock: if self.configured: ctx.log.warn(_('haketilo_dir_already_configured')) return try: haketilo_dir = Path(ctx.options.haketilo_dir) self.state = ConcreteHaketiloState.make(haketilo_dir / 'store') except Exception as e: tb.print_exception(None, e, e.__traceback__) sys.exit(1) self.configured = True def try_get_policy(self, flow: HTTPFlow, fail_ok: bool = True) -> \ t.Optional[policies.Policy]: """....""" with self.policies_lock: policy = self.flow_policies.get(id(flow)) if policy is None: try: parsed_url = parse_url(flow.request.url) except HaketiloException: if fail_ok: return None else: raise assert self.state is not None policy = self.state.select_policy(parsed_url) with self.policies_lock: self.flow_policies[id(flow)] = policy return policy def get_policy(self, flow: HTTPFlow) -> policies.Policy: return t.cast(policies.Policy, self.try_get_policy(flow, fail_ok=False)) def forget_policy(self, flow: HTTPFlow) -> None: """....""" with self.policies_lock: self.flow_policies.pop(id(flow), None) @contextmanager def http_safe_event_handling(self, flow: HTTPFlow) -> t.Iterator: """....""" with self.configured_lock: assert self.configured try: yield except Exception as e: tb_string = ''.join(tb.format_exception(None, e, e.__traceback__)) error_text = _('err.proxy.unknown_error_{}_try_again')\ .format(tb_string)\ .encode() flow.response = http.Response.make( status_code = 500, content = error_text, headers = [(b'Content-Type', b'text/plain; charset=utf-8')] ) self.forget_policy(flow) @concurrent def requestheaders(self, flow: HTTPFlow) -> None: # TODO: don't account for mitmproxy 6 in the code # Mitmproxy 6 causes even more strange behavior than described below. # This cannot be easily worked around. Let's just use version 8 and # make an APT package for it. """ Under mitmproxy 8 this handler deduces an appropriate policy for flow's URL and assigns it to the flow. Under mitmproxy 6 the URL is not yet available at this point, so the handler effectively does nothing. """ with self.http_safe_event_handling(flow): policy = self.try_get_policy(flow) if policy is not None: if not policy.process_request: flow.request.stream = True if policy.anticache: flow.request.anticache() @concurrent def request(self, flow: HTTPFlow) -> None: """ .... """ if flow.request.stream: return with self.http_safe_event_handling(flow): policy = self.get_policy(flow) request_info = http_messages.RequestInfo( url = parse_url(flow.request.url), method = flow.request.method, headers = MitmproxyHeadersWrapper(flow.request.headers), body = flow.request.get_content(strict=False) or b'' ) result = policy.consume_request(request_info) if result is not None: if isinstance(result, http_messages.ProducedRequest): flow.request = http.Request.make( url = result.url, method = result.method, headers = http.Headers(result.headers), content = result.body ) else: # isinstance(result, http_messages.ProducedResponse) flow.response = http.Response.make( status_code = result.status_code, headers = http.Headers(result.headers), content = result.body ) def responseheaders(self, flow: HTTPFlow) -> None: """ ...... """ assert flow.response is not None with self.http_safe_event_handling(flow): policy = self.get_policy(flow) if not policy.process_response: flow.response.stream = True @concurrent def response(self, flow: HTTPFlow) -> None: """ ...... """ assert flow.response is not None if flow.response.stream: return with self.http_safe_event_handling(flow): policy = self.get_policy(flow) response_info = http_messages.ResponseInfo( url = parse_url(flow.request.url), status_code = flow.response.status_code, headers = MitmproxyHeadersWrapper(flow.response.headers), body = flow.response.get_content(strict=False) or b'' ) result = policy.consume_response(response_info) if result is not None: flow.response.status_code = result.status_code flow.response.headers = http.Headers(result.headers) flow.response.set_content(result.body) self.forget_policy(flow) def error(self, flow: HTTPFlow) -> None: """....""" self.forget_policy(flow)