diff options
Diffstat (limited to 'src/hydrilla/proxy/addon.py')
-rw-r--r-- | src/hydrilla/proxy/addon.py | 177 |
1 files changed, 177 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/addon.py b/src/hydrilla/proxy/addon.py new file mode 100644 index 0000000..7d6487b --- /dev/null +++ b/src/hydrilla/proxy/addon.py @@ -0,0 +1,177 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Haketilo addon for Mitmproxy. +# +# This file is part of Hydrilla&Haketilo. +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +This module contains the definition of a mitmproxy addon that gets instantiated +from addon script. +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +import os.path +import typing as t +import dataclasses as dc + +from threading import Lock +from pathlib import Path +from contextlib import contextmanager + +from mitmproxy import http, addonmanager, ctx +from mitmproxy.script import concurrent + +from .flow_handlers import make_flow_handler, FlowHandler +from .state import HaketiloState +from ..translations import smart_gettext as _ + +FlowHandlers = dict[int, FlowHandler] + +StateUpdater = t.Callable[[HaketiloState], None] + +HTTPHandlerFun = t.Callable[ + ['HaketiloAddon', http.HTTPFlow], + t.Optional[StateUpdater] +] + +def http_event_handler(handler_fun: HTTPHandlerFun): + """....decorator""" + def wrapped_handler(self: 'HaketiloAddon', flow: http.HTTPFlow): + """....""" + with self.configured_lock: + assert self.configured + + assert self.state is not None + + state_updater = handler_fun(self, flow) + + if state_updater is not None: + state_updater(self.state) + + return wrapped_handler + +@dc.dataclass +class HaketiloAddon: + """ + ....... + """ + configured: bool = False + configured_lock: Lock = dc.field(default_factory=Lock) + + state: t.Optional[HaketiloState] = None + + flow_handlers: FlowHandlers = dc.field(default_factory=dict) + handlers_lock: Lock = dc.field(default_factory=Lock) + + def load(self, loader: addonmanager.Loader) -> None: + """....""" + loader.add_option( + name = 'haketilo_dir', + typespec = str, + default = '~/.haketilo/', + help = "Point to a Haketilo data directory to use", + ) + + def configure(self, updated: set[str]) -> None: + """....""" + if 'haketilo_dir' not in updated: + return + + with self.configured_lock: + if self.configured: + ctx.log.warn(_('haketilo_dir_already_configured')) + return + + haketilo_dir = Path(ctx.options.haketilo_dir) + self.state = HaketiloState(haketilo_dir / 'store') + + def assign_handler(self, flow: http.HTTPFlow, flow_handler: FlowHandler) \ + -> None: + """....""" + with self.handlers_lock: + self.flow_handlers[id(flow)] = flow_handler + + def lookup_handler(self, flow: http.HTTPFlow) -> FlowHandler: + """....""" + with self.handlers_lock: + return self.flow_handlers[id(flow)] + + def forget_handler(self, flow: http.HTTPFlow) -> None: + """....""" + with self.handlers_lock: + self.flow_handlers.pop(id(flow), None) + + @concurrent + @http_event_handler + def requestheaders(self, flow: http.HTTPFlow) -> t.Optional[StateUpdater]: + """ + ..... + """ + assert self.state is not None + + policy = self.state.select_policy(flow.request.url) + + flow_handler = make_flow_handler(flow, policy) + + self.assign_handler(flow, flow_handler) + + return flow_handler.on_requestheaders() + + @concurrent + @http_event_handler + def request(self, flow: http.HTTPFlow) -> t.Optional[StateUpdater]: + """ + .... + """ + return self.lookup_handler(flow).on_request() + + @concurrent + @http_event_handler + def responseheaders(self, flow: http.HTTPFlow) -> t.Optional[StateUpdater]: + """ + ...... + """ + return self.lookup_handler(flow).on_responseheaders() + + @concurrent + @http_event_handler + def response(self, flow: http.HTTPFlow) -> t.Optional[StateUpdater]: + """ + ...... + """ + updater = self.lookup_handler(flow).on_response() + + self.forget_handler(flow) + + return updater + + @http_event_handler + def error(self, flow: http.HTTPFlow) -> None: + """....""" + self.forget_handler(flow) + +addons = [ + HaketiloAddon() +] |