From 52d12a4fa124daa1595529e3e7008276a7986d95 Mon Sep 17 00:00:00 2001 From: Wojtek Kosior Date: Mon, 13 Jun 2022 11:06:49 +0200 Subject: unfinished partial work --- src/hydrilla/proxy/flow_handlers.py | 383 ++++++++++++++++++++++++++++++++++++ 1 file changed, 383 insertions(+) create mode 100644 src/hydrilla/proxy/flow_handlers.py (limited to 'src/hydrilla/proxy/flow_handlers.py') diff --git a/src/hydrilla/proxy/flow_handlers.py b/src/hydrilla/proxy/flow_handlers.py new file mode 100644 index 0000000..605c7f9 --- /dev/null +++ b/src/hydrilla/proxy/flow_handlers.py @@ -0,0 +1,383 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Logic for modifying mitmproxy's HTTP flows. +# +# This file is part of Hydrilla&Haketilo. +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +This module's file gets passed to Mitmproxy as addon script and makes it serve +as Haketilo proxy. +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +import re +import typing as t +import dataclasses as dc + +import bs4 # type: ignore + +from mitmproxy import http +from mitmproxy.net.http import Headers +from mitmproxy.script import concurrent + +from .state import HaketiloState +from . import policies + +StateUpdater = t.Callable[[HaketiloState], None] + +@dc.dataclass(frozen=True) +class FlowHandler: + """....""" + flow: http.HTTPFlow + policy: policies.Policy + + stream_request: bool = False + stream_response: bool = False + + def on_requestheaders(self) -> t.Optional[StateUpdater]: + """....""" + if self.stream_request: + self.flow.request.stream = True + + return None + + def on_request(self) -> t.Optional[StateUpdater]: + """....""" + return None + + def on_responseheaders(self) -> t.Optional[StateUpdater]: + """....""" + assert self.flow.response is not None + + if self.stream_response: + self.flow.response.stream = True + + return None + + def on_response(self) -> t.Optional[StateUpdater]: + """....""" + return None + +@dc.dataclass(frozen=True) +class FlowHandlerAllowScripts(FlowHandler): + """....""" + policy: policies.AllowPolicy + + stream_request: bool = True + stream_response: bool = True + +csp_header_names_and_dispositions = ( + ('content-security-policy', 'enforce'), + ('content-security-policy-report-only', 'report'), + ('x-content-security-policy', 'enforce'), + ('x-content-security-policy', 'report'), + ('x-webkit-csp', 'enforce'), + ('x-webkit-csp', 'report') +) + +csp_enforce_header_names_set = { + name for name, disposition in csp_header_names_and_dispositions + if disposition == 'enforce' +} + +@dc.dataclass +class ContentSecurityPolicy: + directives: dict[str, list[str]] + header_name: str + disposition: str + + @staticmethod + def deserialize( + serialized: str, + header_name: str, + disposition: str = 'enforce' + ) -> 'ContentSecurityPolicy': + """....""" + # For more info, see: + # https://www.w3.org/TR/CSP3/#parse-serialized-policy + directives = {} + + for serialized_directive in serialized.split(';'): + if not serialized_directive.isascii(): + continue + + serialized_directive = serialized_directive.strip() + if len(serialized_directive) == 0: + continue + + tokens = serialized_directive.split() + directive_name = tokens.pop(0).lower() + directive_value = tokens + + # Specs mention giving warnings for duplicate directive names but + # from our proxy's perspective this is not important right now. + if directive_name in directives: + continue + + directives[directive_name] = directive_value + + return ContentSecurityPolicy(directives, header_name, disposition) + + def serialize(self) -> str: + """....""" + serialized_directives = [] + for name, value_list in self.directives.items(): + serialized_directives.append(f'{name} {" ".join(value_list)}') + + return ';'.join(serialized_directives) + +def extract_csp(headers: Headers) -> tuple[ContentSecurityPolicy, ...]: + """....""" + csp_policies = [] + + for header_name, disposition in csp_header_names_and_dispositions: + for serialized_list in headers.get(header_name, ''): + for serialized in serialized_list.split(','): + policy = ContentSecurityPolicy.deserialize( + serialized, + header_name, + disposition + ) + + if policy.directives != {}: + csp_policies.append(policy) + + return tuple(csp_policies) + +csp_script_directive_names = ( + 'script-src', + 'script-src-elem', + 'script-src-attr' +) + +@dc.dataclass(frozen=True) +class FlowHandlerBlockScripts(FlowHandler): + policy: policies.BlockPolicy + + stream_request: bool = True + stream_response: bool = True + + def on_responseheaders(self) -> t.Optional[StateUpdater]: + """....""" + super().on_responseheaders() + + assert self.flow.response is not None + + csp_policies = extract_csp(self.flow.response.headers) + + for header_name, _ in csp_header_names_and_dispositions: + del self.flow.response.headers[header_name] + + for policy in csp_policies: + if policy.disposition != 'enforce': + continue + + policy.directives.pop('report-to') + policy.directives.pop('report-uri') + + self.flow.response.headers.add( + policy.header_name, + policy.serialize() + ) + + extra_csp = ';'.join(( + "script-src 'none'", + "script-src-elem 'none'", + "script-src-attr 'none'" + )) + + self.flow.response.headers.add('Content-Security-Policy', extra_csp) + + return None + +# For details of 'Content-Type' header's structure, see: +# https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.1 +content_type_reg = re.compile(r''' +^ +(?P[\w-]+/[\w-]+) +\s* +(?: + ; + (?:[^;]*;)* # match possible parameter other than "charset" +) +\s* +charset= # no whitespace allowed in parameter as per RFC +(?P + [\w-]+ + | + "[\w-]+" # quotes are optional per RFC +) +(?:;[^;]+)* # match possible parameter other than "charset" +$ # forbid possible dangling characters after closing '"' +''', re.VERBOSE | re.IGNORECASE) + +def deduce_content_type(headers: Headers) \ + -> tuple[t.Optional[str], t.Optional[str]]: + """....""" + content_type = headers.get('content-type') + if content_type is None: + return (None, None) + + match = content_type_reg.match(content_type) + if match is None: + return (None, None) + + mime, encoding = match.group('mime'), match.group('encoding') + + if encoding is not None: + encoding = encoding.lower() + + return mime, encoding + +UTF8_BOM = b'\xEF\xBB\xBF' +BOMs = ( + (UTF8_BOM, 'utf-8'), + (b'\xFE\xFF', 'utf-16be'), + (b'\xFF\xFE', 'utf-16le') +) + +def block_attr(element: bs4.PageElement, atrr_name: str) -> None: + """....""" + # TODO: implement + pass + +@dc.dataclass(frozen=True) +class FlowHandlerInjectPayload(FlowHandler): + """....""" + policy: policies.PayloadPolicy + + stream_request: bool = True + + def __post_init__(self) -> None: + """....""" + script_src = f"script-src {self.policy.assets_base_url()}" + if self.policy.is_eval_allowed(): + script_src = f"{script_src} 'unsafe-eval'" + + self.new_csp = '; '.join(( + script_src, + "script-src-elem 'none'", + "script-src-attr 'none'" + )) + + def on_responseheaders(self) -> t.Optional[StateUpdater]: + """....""" + super().on_responseheaders() + + assert self.flow.response is not None + + for header_name, _ in csp_header_names_and_dispositions: + del self.flow.response.headers[header_name] + + self.flow.response.headers.add('Content-Security-Policy', self.new_csp) + + return None + + def on_response(self) -> t.Optional[StateUpdater]: + """....""" + super().on_response() + + assert self.flow.response is not None + + if self.flow.response.content is None: + return None + + mime, encoding = deduce_content_type(self.flow.response.headers) + if mime is None or 'html' not in mime: + return None + + # A UTF BOM overrides encoding specified by the header. + for bom, encoding_name in BOMs: + if self.flow.response.content.startswith(bom): + encoding = encoding_name + + soup = bs4.BeautifulSoup( + markup = self.flow.response.content, + from_encoding = encoding, + features = 'html5lib' + ) + + # Inject scripts. + script_parent = soup.find('body') or soup.find('html') + if script_parent is None: + return None + + for url in self.policy.script_urls(): + script_parent.append(bs4.Tag(name='script', attrs={'src': url})) + + # Remove Content Security Policy that could possibly block injected + # scripts. + for meta in soup.select('head meta[http-equiv]'): + header_name = meta.attrs.get('http-equiv', '').lower().strip() + if header_name in csp_enforce_header_names_set: + block_attr(meta, 'http-equiv') + block_attr(meta, 'content') + + # Appending a three-byte Byte Order Mark (BOM) will force the browser to + # decode this as UTF-8 regardless of the 'Content-Type' header. See: + # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence + self.flow.response.content = UTF8_BOM + soup.encode() + + return None + +@dc.dataclass(frozen=True) +class FlowHandlerMetaResource(FlowHandler): + """....""" + policy: policies.MetaResourcePolicy + + def on_request(self) -> t.Optional[StateUpdater]: + """....""" + super().on_request() + # TODO: implement + #self.flow.response = .... + + return None + +def make_flow_handler(flow: http.HTTPFlow, policy: policies.Policy) \ + -> FlowHandler: + """....""" + if isinstance(policy, policies.BlockPolicy): + return FlowHandlerBlockScripts(flow, policy) + + if isinstance(policy, policies.AllowPolicy): + return FlowHandlerAllowScripts(flow, policy) + + if isinstance(policy, policies.PayloadPolicy): + return FlowHandlerInjectPayload(flow, policy) + + assert isinstance(policy, policies.MetaResourcePolicy) + # def response_creator(request: http.HTTPRequest) -> http.HTTPResponse: + # """....""" + # replacement_details = make_replacement_resource( + # policy.replacement, + # request.path + # ) + + # return http.HTTPResponse.make( + # replacement_details.status_code, + # replacement_details.content, + # replacement_details.content_type + # ) + return FlowHandlerMetaResource(flow, policy) -- cgit v1.2.3