From 879c41927171efc8d77d1de2739b18e2eb57580f Mon Sep 17 00:00:00 2001 From: Wojtek Kosior Date: Wed, 27 Jul 2022 15:56:24 +0200 Subject: unfinished partial work --- src/hydrilla/proxy/flow_handlers.py | 383 ------------------------------------ 1 file changed, 383 deletions(-) delete mode 100644 src/hydrilla/proxy/flow_handlers.py (limited to 'src/hydrilla/proxy/flow_handlers.py') diff --git a/src/hydrilla/proxy/flow_handlers.py b/src/hydrilla/proxy/flow_handlers.py deleted file mode 100644 index 605c7f9..0000000 --- a/src/hydrilla/proxy/flow_handlers.py +++ /dev/null @@ -1,383 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later - -# Logic for modifying mitmproxy's HTTP flows. -# -# This file is part of Hydrilla&Haketilo. -# -# Copyright (C) 2022 Wojtek Kosior -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <https://www.gnu.org/licenses/>. -# -# -# I, Wojtek Kosior, thereby promise not to sue for violation of this -# file's license. Although I request that you do not make use of this code -# in a proprietary program, I am not going to enforce this in court. - -""" -This module's file gets passed to Mitmproxy as addon script and makes it serve -as Haketilo proxy. -""" - -# Enable using with Python 3.7. 
-from __future__ import annotations - -import re -import typing as t -import dataclasses as dc - -import bs4 # type: ignore - -from mitmproxy import http -from mitmproxy.net.http import Headers -from mitmproxy.script import concurrent - -from .state import HaketiloState -from . import policies - -StateUpdater = t.Callable[[HaketiloState], None] - -@dc.dataclass(frozen=True) -class FlowHandler: - """....""" - flow: http.HTTPFlow - policy: policies.Policy - - stream_request: bool = False - stream_response: bool = False - - def on_requestheaders(self) -> t.Optional[StateUpdater]: - """....""" - if self.stream_request: - self.flow.request.stream = True - - return None - - def on_request(self) -> t.Optional[StateUpdater]: - """....""" - return None - - def on_responseheaders(self) -> t.Optional[StateUpdater]: - """....""" - assert self.flow.response is not None - - if self.stream_response: - self.flow.response.stream = True - - return None - - def on_response(self) -> t.Optional[StateUpdater]: - """....""" - return None - -@dc.dataclass(frozen=True) -class FlowHandlerAllowScripts(FlowHandler): - """....""" - policy: policies.AllowPolicy - - stream_request: bool = True - stream_response: bool = True - -csp_header_names_and_dispositions = ( - ('content-security-policy', 'enforce'), - ('content-security-policy-report-only', 'report'), - ('x-content-security-policy', 'enforce'), - ('x-content-security-policy', 'report'), - ('x-webkit-csp', 'enforce'), - ('x-webkit-csp', 'report') -) - -csp_enforce_header_names_set = { - name for name, disposition in csp_header_names_and_dispositions - if disposition == 'enforce' -} - -@dc.dataclass -class ContentSecurityPolicy: - directives: dict[str, list[str]] - header_name: str - disposition: str - - @staticmethod - def deserialize( - serialized: str, - header_name: str, - disposition: str = 'enforce' - ) -> 'ContentSecurityPolicy': - """....""" - # For more info, see: - # https://www.w3.org/TR/CSP3/#parse-serialized-policy - directives 
= {} - - for serialized_directive in serialized.split(';'): - if not serialized_directive.isascii(): - continue - - serialized_directive = serialized_directive.strip() - if len(serialized_directive) == 0: - continue - - tokens = serialized_directive.split() - directive_name = tokens.pop(0).lower() - directive_value = tokens - - # Specs mention giving warnings for duplicate directive names but - # from our proxy's perspective this is not important right now. - if directive_name in directives: - continue - - directives[directive_name] = directive_value - - return ContentSecurityPolicy(directives, header_name, disposition) - - def serialize(self) -> str: - """....""" - serialized_directives = [] - for name, value_list in self.directives.items(): - serialized_directives.append(f'{name} {" ".join(value_list)}') - - return ';'.join(serialized_directives) - -def extract_csp(headers: Headers) -> tuple[ContentSecurityPolicy, ...]: - """....""" - csp_policies = [] - - for header_name, disposition in csp_header_names_and_dispositions: - for serialized_list in headers.get(header_name, ''): - for serialized in serialized_list.split(','): - policy = ContentSecurityPolicy.deserialize( - serialized, - header_name, - disposition - ) - - if policy.directives != {}: - csp_policies.append(policy) - - return tuple(csp_policies) - -csp_script_directive_names = ( - 'script-src', - 'script-src-elem', - 'script-src-attr' -) - -@dc.dataclass(frozen=True) -class FlowHandlerBlockScripts(FlowHandler): - policy: policies.BlockPolicy - - stream_request: bool = True - stream_response: bool = True - - def on_responseheaders(self) -> t.Optional[StateUpdater]: - """....""" - super().on_responseheaders() - - assert self.flow.response is not None - - csp_policies = extract_csp(self.flow.response.headers) - - for header_name, _ in csp_header_names_and_dispositions: - del self.flow.response.headers[header_name] - - for policy in csp_policies: - if policy.disposition != 'enforce': - continue - - 
policy.directives.pop('report-to') - policy.directives.pop('report-uri') - - self.flow.response.headers.add( - policy.header_name, - policy.serialize() - ) - - extra_csp = ';'.join(( - "script-src 'none'", - "script-src-elem 'none'", - "script-src-attr 'none'" - )) - - self.flow.response.headers.add('Content-Security-Policy', extra_csp) - - return None - -# For details of 'Content-Type' header's structure, see: -# https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.1 -content_type_reg = re.compile(r''' -^ -(?P[\w-]+/[\w-]+) -\s* -(?: - ; - (?:[^;]*;)* # match possible parameter other than "charset" -) -\s* -charset= # no whitespace allowed in parameter as per RFC -(?P - [\w-]+ - | - "[\w-]+" # quotes are optional per RFC -) -(?:;[^;]+)* # match possible parameter other than "charset" -$ # forbid possible dangling characters after closing '"' -''', re.VERBOSE | re.IGNORECASE) - -def deduce_content_type(headers: Headers) \ - -> tuple[t.Optional[str], t.Optional[str]]: - """....""" - content_type = headers.get('content-type') - if content_type is None: - return (None, None) - - match = content_type_reg.match(content_type) - if match is None: - return (None, None) - - mime, encoding = match.group('mime'), match.group('encoding') - - if encoding is not None: - encoding = encoding.lower() - - return mime, encoding - -UTF8_BOM = b'\xEF\xBB\xBF' -BOMs = ( - (UTF8_BOM, 'utf-8'), - (b'\xFE\xFF', 'utf-16be'), - (b'\xFF\xFE', 'utf-16le') -) - -def block_attr(element: bs4.PageElement, atrr_name: str) -> None: - """....""" - # TODO: implement - pass - -@dc.dataclass(frozen=True) -class FlowHandlerInjectPayload(FlowHandler): - """....""" - policy: policies.PayloadPolicy - - stream_request: bool = True - - def __post_init__(self) -> None: - """....""" - script_src = f"script-src {self.policy.assets_base_url()}" - if self.policy.is_eval_allowed(): - script_src = f"{script_src} 'unsafe-eval'" - - self.new_csp = '; '.join(( - script_src, - "script-src-elem 'none'", - 
"script-src-attr 'none'" - )) - - def on_responseheaders(self) -> t.Optional[StateUpdater]: - """....""" - super().on_responseheaders() - - assert self.flow.response is not None - - for header_name, _ in csp_header_names_and_dispositions: - del self.flow.response.headers[header_name] - - self.flow.response.headers.add('Content-Security-Policy', self.new_csp) - - return None - - def on_response(self) -> t.Optional[StateUpdater]: - """....""" - super().on_response() - - assert self.flow.response is not None - - if self.flow.response.content is None: - return None - - mime, encoding = deduce_content_type(self.flow.response.headers) - if mime is None or 'html' not in mime: - return None - - # A UTF BOM overrides encoding specified by the header. - for bom, encoding_name in BOMs: - if self.flow.response.content.startswith(bom): - encoding = encoding_name - - soup = bs4.BeautifulSoup( - markup = self.flow.response.content, - from_encoding = encoding, - features = 'html5lib' - ) - - # Inject scripts. - script_parent = soup.find('body') or soup.find('html') - if script_parent is None: - return None - - for url in self.policy.script_urls(): - script_parent.append(bs4.Tag(name='script', attrs={'src': url})) - - # Remove Content Security Policy that could possibly block injected - # scripts. - for meta in soup.select('head meta[http-equiv]'): - header_name = meta.attrs.get('http-equiv', '').lower().strip() - if header_name in csp_enforce_header_names_set: - block_attr(meta, 'http-equiv') - block_attr(meta, 'content') - - # Appending a three-byte Byte Order Mark (BOM) will force the browser to - # decode this as UTF-8 regardless of the 'Content-Type' header. 
See: - # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence - self.flow.response.content = UTF8_BOM + soup.encode() - - return None - -@dc.dataclass(frozen=True) -class FlowHandlerMetaResource(FlowHandler): - """....""" - policy: policies.MetaResourcePolicy - - def on_request(self) -> t.Optional[StateUpdater]: - """....""" - super().on_request() - # TODO: implement - #self.flow.response = .... - - return None - -def make_flow_handler(flow: http.HTTPFlow, policy: policies.Policy) \ - -> FlowHandler: - """....""" - if isinstance(policy, policies.BlockPolicy): - return FlowHandlerBlockScripts(flow, policy) - - if isinstance(policy, policies.AllowPolicy): - return FlowHandlerAllowScripts(flow, policy) - - if isinstance(policy, policies.PayloadPolicy): - return FlowHandlerInjectPayload(flow, policy) - - assert isinstance(policy, policies.MetaResourcePolicy) - # def response_creator(request: http.HTTPRequest) -> http.HTTPResponse: - # """....""" - # replacement_details = make_replacement_resource( - # policy.replacement, - # request.path - # ) - - # return http.HTTPResponse.make( - # replacement_details.status_code, - # replacement_details.content, - # replacement_details.content_type - # ) - return FlowHandlerMetaResource(flow, policy) -- cgit v1.2.3