# SPDX-License-Identifier: GPL-3.0-or-later # Haketilo addon for Mitmproxy. # # This file is part of Hydrilla&Haketilo. # # Copyright (C) 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use this code # in a proprietary program, I am not going to enforce this in court. """ This module contains the definition of a mitmproxy addon that gets instantiated from addon script. """ # Enable using with Python 3.7. from __future__ import annotations import os.path import typing as t import dataclasses as dc from threading import Lock from pathlib import Path from contextlib import contextmanager from mitmproxy import http, addonmanager, ctx from mitmproxy.script import concurrent from .flow_handlers import make_flow_handler, FlowHandler from .state import HaketiloState from ..translations import smart_gettext as _ FlowHandlers = dict[int, FlowHandler] StateUpdater = t.Callable[[HaketiloState], None] HTTPHandlerFun = t.Callable[ ['HaketiloAddon', http.HTTPFlow], t.Optional[StateUpdater] ] def http_event_handler(handler_fun: HTTPHandlerFun): """....decorator""" def wrapped_handler(self: 'HaketiloAddon', flow: http.HTTPFlow): """....""" with self.configured_lock: assert self.configured assert self.state is not None state_updater = handler_fun(self, flow) if state_updater is not None: state_updater(self.state) return wrapped_handler @dc.dataclass class HaketiloAddon: """ ....... """ configured: bool = False configured_lock: Lock = dc.field(default_factory=Lock) state: t.Optional[HaketiloState] = None flow_handlers: FlowHandlers = dc.field(default_factory=dict) handlers_lock: Lock = dc.field(default_factory=Lock) def load(self, loader: addonmanager.Loader) -> None: """....""" loader.add_option( name = 'haketilo_dir', typespec = str, default = '~/.haketilo/', help = "Point to a Haketilo data directory to use", ) def configure(self, updated: set[str]) -> None: """....""" if 'haketilo_dir' not in updated: return with self.configured_lock: if self.configured: ctx.log.warn(_('haketilo_dir_already_configured')) return haketilo_dir = Path(ctx.options.haketilo_dir) self.state = HaketiloState(haketilo_dir / 'store') def assign_handler(self, flow: http.HTTPFlow, flow_handler: FlowHandler) \ -> None: """....""" with self.handlers_lock: self.flow_handlers[id(flow)] = flow_handler def lookup_handler(self, flow: http.HTTPFlow) -> FlowHandler: """....""" with self.handlers_lock: return self.flow_handlers[id(flow)] def forget_handler(self, flow: http.HTTPFlow) -> None: """....""" with self.handlers_lock: self.flow_handlers.pop(id(flow), None) @concurrent @http_event_handler def requestheaders(self, flow: http.HTTPFlow) -> t.Optional[StateUpdater]: """ ..... """ assert self.state is not None policy = self.state.select_policy(flow.request.url) flow_handler = make_flow_handler(flow, policy) self.assign_handler(flow, flow_handler) return flow_handler.on_requestheaders() @concurrent @http_event_handler def request(self, flow: http.HTTPFlow) -> t.Optional[StateUpdater]: """ .... """ return self.lookup_handler(flow).on_request() @concurrent @http_event_handler def responseheaders(self, flow: http.HTTPFlow) -> t.Optional[StateUpdater]: """ ...... """ return self.lookup_handler(flow).on_responseheaders() @concurrent @http_event_handler def response(self, flow: http.HTTPFlow) -> t.Optional[StateUpdater]: """ ...... """ updater = self.lookup_handler(flow).on_response() self.forget_handler(flow) return updater @http_event_handler def error(self, flow: http.HTTPFlow) -> None: """....""" self.forget_handler(flow) addons = [ HaketiloAddon() ]