# SPDX-License-Identifier: GPL-3.0-or-later

# Logic for modifying mitmproxy's HTTP flows.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use this code
# in a proprietary program, I am not going to enforce this in court.

"""
This module's file gets passed to Mitmproxy as addon script and makes it serve
as Haketilo proxy.
"""

# Enable using with Python 3.7.
from __future__ import annotations

import re
import typing as t
import dataclasses as dc

import bs4 # type: ignore

from mitmproxy import http
from mitmproxy.net.http import Headers
from mitmproxy.script import concurrent

from .state import HaketiloState
from . import policies


# Type of the optional callables returned by flow handler hooks: each takes the
# HaketiloState object and returns nothing.
StateUpdater = t.Callable[[HaketiloState], None]


@dc.dataclass(frozen=True)
class FlowHandler:
    """
    Base handler of a single HTTP flow processed under some policy.

    The ``on_*()`` hooks are named after mitmproxy's HTTP events. Each returns
    either None or a StateUpdater callable. The base implementation only
    switches body streaming on when the respective ``stream_*`` field is set.
    """
    flow:   http.HTTPFlow
    policy: policies.Policy

    stream_request:  bool = False
    stream_response: bool = False

    def on_requestheaders(self) -> t.Optional[StateUpdater]:
        """Enable request body streaming if `stream_request` is set."""
        if self.stream_request:
            self.flow.request.stream = True

        return None

    def on_request(self) -> t.Optional[StateUpdater]:
        """Do nothing; subclasses may override."""
        return None

    def on_responseheaders(self) -> t.Optional[StateUpdater]:
        """Enable response body streaming if `stream_response` is set."""
        assert self.flow.response is not None

        if self.stream_response:
            self.flow.response.stream = True

        return None

    def on_response(self) -> t.Optional[StateUpdater]:
        """Do nothing; subclasses may override."""
        return None


@dc.dataclass(frozen=True)
class FlowHandlerAllowScripts(FlowHandler):
    """
    Handler used with AllowPolicy.

    It overrides no hooks; it merely defaults to streaming both the request
    and the response body.
    """
    policy: policies.AllowPolicy

    stream_request:  bool = True
    stream_response: bool = True


# Names (lowercase) of HTTP headers that can carry a Content Security Policy,
# paired with the disposition of policies delivered through them. The legacy
# "x-" prefixed headers have their own "-report-only" variants; listing the
# enforcing name twice would make every such header be parsed twice.
csp_header_names_and_dispositions = (
    ('content-security-policy',               'enforce'),
    ('content-security-policy-report-only',   'report'),
    ('x-content-security-policy',             'enforce'),
    ('x-content-security-policy-report-only', 'report'),
    ('x-webkit-csp',                          'enforce'),
    ('x-webkit-csp-report-only',              'report'),
)

# Names of only those of the above headers that carry enforced policies.
csp_enforce_header_names_set = {
    name for name, disposition in csp_header_names_and_dispositions
    if disposition == 'enforce'
}


@dc.dataclass
class ContentSecurityPolicy:
    """
    A single parsed Content Security Policy.

    `directives` maps lowercase directive names to lists of their value tokens.
    `header_name` and `disposition` record which header the policy came from
    and whether it is an 'enforce' or a 'report' policy.
    """
    directives:  dict[str, list[str]]
    header_name: str
    disposition: str

    @staticmethod
    def deserialize(
            serialized:  str,
            header_name: str,
            disposition: str = 'enforce'
    ) -> 'ContentSecurityPolicy':
        """
        Parse one serialized policy (no commas expected) into an object.

        Non-ASCII and empty directives are skipped. When a directive name
        repeats, only its first occurrence is kept.
        """
        # For more info, see:
        # https://www.w3.org/TR/CSP3/#parse-serialized-policy
        directives: dict[str, list[str]] = {}

        for serialized_directive in serialized.split(';'):
            if not serialized_directive.isascii():
                continue

            tokens = serialized_directive.split()
            if not tokens:
                # The directive was empty or consisted of whitespace only.
                continue

            directive_name, *directive_value = tokens
            directive_name = directive_name.lower()

            # Specs mention giving warnings for duplicate directive names but
            # from our proxy's perspective this is not important right now.
            if directive_name in directives:
                continue

            directives[directive_name] = directive_value

        return ContentSecurityPolicy(directives, header_name, disposition)

    def serialize(self) -> str:
        """
        Return the policy as a string suitable for use as a header value.

        Directives are joined with ';'. A directive without value tokens is
        emitted as its bare name (no trailing space).
        """
        return ';'.join(
            ' '.join((name, *value_list))
            for name, value_list in self.directives.items()
        )


def extract_csp(headers: Headers) -> tuple[ContentSecurityPolicy, ...]:
    """
    Parse all Content Security Policies present in `headers`.

    Every occurrence of every header listed in
    `csp_header_names_and_dispositions` is considered. A header value may hold
    several comma-separated policies; each non-empty one yields a separate
    ContentSecurityPolicy object.
    """
    csp_policies = []

    for header_name, disposition in csp_header_names_and_dispositions:
        # get_all() yields each header occurrence as a separate string.
        # Iterating over the result of get() would walk over single
        # characters instead.
        for serialized_list in headers.get_all(header_name):
            for serialized in serialized_list.split(','):
                policy = ContentSecurityPolicy.deserialize(
                    serialized,
                    header_name,
                    disposition
                )

                if policy.directives:
                    csp_policies.append(policy)

    return tuple(csp_policies)


# Names of CSP directives that govern script execution.
csp_script_directive_names = (
    'script-src',
    'script-src-elem',
    'script-src-attr'
)


@dc.dataclass(frozen=True)
class FlowHandlerBlockScripts(FlowHandler):
    """
    Handler used with BlockPolicy.

    It rewrites the response's CSP headers so that scripts get blocked, and
    defaults to streaming both the request and the response body.
    """
    policy: policies.BlockPolicy

    stream_request:  bool = True
    stream_response: bool = True

    def on_responseheaders(self) -> t.Optional[StateUpdater]:
        """
        Replace the response's CSP headers with script-blocking ones.

        All CSP headers are removed. Policies with 'enforce' disposition are
        then re-added with their reporting directives stripped, and an extra
        'Content-Security-Policy' header forbidding scripts is appended.
        """
        super().on_responseheaders()

        assert self.flow.response is not None
        headers = self.flow.response.headers

        csp_policies = extract_csp(headers)

        for header_name, _ in csp_header_names_and_dispositions:
            # Most responses carry only some (or none) of these headers, so
            # absence must not be treated as an error.
            headers.pop(header_name, None)

        for policy in csp_policies:
            if policy.disposition != 'enforce':
                continue

            # Either reporting directive may legitimately be absent.
            policy.directives.pop('report-to', None)
            policy.directives.pop('report-uri', None)

            if not policy.directives:
                # Nothing besides reporting directives was there; do not add
                # an empty header.
                continue

            headers.add(policy.header_name, policy.serialize())

        extra_csp = ';'.join((
            "script-src 'none'",
            "script-src-elem 'none'",
            "script-src-attr 'none'"
        ))

        headers.add('Content-Security-Policy', extra_csp)

        return None


# For details of 'Content-Type' header's structure, see:
# https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.1
content_type_reg = re.compile(r'''
^
(?P<mime>[\w.+-]+/[\w.+-]+)
\s*
(?:
    $               # no parameters at all
    |
    ;
    (?:
        (?:[^;]*;)* # match possible parameters other than "charset"
        \s*
        charset=    # no whitespace allowed in parameter as per RFC
        (?P<encoding>
            [\w-]+
            |
            "[\w-]+"    # quotes are optional per RFC
        )
        (?:;[^;]+)* # match possible parameters other than "charset"
        $           # forbid possible dangling characters after closing '"'
    )?              # "charset" itself is optional
)
''', re.VERBOSE | re.IGNORECASE)

def deduce_content_type(headers: Headers) \
    -> tuple[t.Optional[str], t.Optional[str]]:
    """
    Return a (mime, encoding) tuple read from the 'Content-Type' header.

    Both items are lowercase. Both are None when the header is missing or
    malformed. The encoding alone is None when the header carries no
    well-formed "charset" parameter. Quotes around the charset are stripped.
    """
    content_type = headers.get('content-type')
    if content_type is None:
        return (None, None)

    match = content_type_reg.match(content_type)
    if match is None:
        return (None, None)

    # The regex is case-insensitive, so normalize before callers compare.
    mime = match.group('mime').lower()
    encoding = match.group('encoding')

    if encoding is not None:
        encoding = encoding.strip('"').lower()

    return mime, encoding


UTF8_BOM = b'\xEF\xBB\xBF'

# Byte Order Marks recognized at the start of a response body, each paired
# with the name of the encoding it indicates.
BOMs = (
    (UTF8_BOM,    'utf-8'),
    (b'\xFE\xFF', 'utf-16be'),
    (b'\xFF\xFE', 'utf-16le')
)


def block_attr(element: bs4.PageElement, atrr_name: str) -> None:
    """
    Placeholder for neutralizing attribute `atrr_name` of `element`.

    Currently this does nothing.
    """
    # TODO: implement
    pass


@dc.dataclass(frozen=True)
class FlowHandlerInjectPayload(FlowHandler):
    """
    Handler used with PayloadPolicy.

    It replaces the response's CSP headers with its own policy and injects the
    payload's <script> tags into HTML responses. The response is not streamed
    by default because its body needs to be modified.
    """
    policy: policies.PayloadPolicy

    stream_request: bool = True

    # CSP header value computed in __post_init__(). It is not a constructor
    # argument and takes no part in comparisons.
    new_csp: str = dc.field(init=False, repr=False, compare=False)

    def __post_init__(self) -> None:
        """Compute the value of the CSP header to send with responses."""
        script_src = f"script-src {self.policy.assets_base_url()}"
        if self.policy.is_eval_allowed():
            script_src = f"{script_src} 'unsafe-eval'"

        # NOTE(review): under CSP3 'script-src-elem' takes precedence over
        # 'script-src' for <script> elements, so the "'none'" below may also
        # block the scripts injected in on_response() -- confirm intent.
        new_csp = '; '.join((
            script_src,
            "script-src-elem 'none'",
            "script-src-attr 'none'"
        ))

        # The dataclass is frozen, so plain attribute assignment would raise
        # FrozenInstanceError.
        object.__setattr__(self, 'new_csp', new_csp)

    def on_responseheaders(self) -> t.Optional[StateUpdater]:
        """Replace all of the response's CSP headers with `new_csp`."""
        super().on_responseheaders()

        assert self.flow.response is not None
        headers = self.flow.response.headers

        for header_name, _ in csp_header_names_and_dispositions:
            # Absence of a header must not be treated as an error.
            headers.pop(header_name, None)

        headers.add('Content-Security-Policy', self.new_csp)

        return None

    def on_response(self) -> t.Optional[StateUpdater]:
        """
        Inject the payload's scripts into an HTML response body.

        Responses without content, without a recognizable 'Content-Type' or
        with a MIME type that does not contain 'html' are left untouched. So
        are documents in which neither <body> nor <html> can be found.
        """
        super().on_response()

        assert self.flow.response is not None
        response = self.flow.response

        if response.content is None:
            return None

        mime, encoding = deduce_content_type(response.headers)
        if mime is None or 'html' not in mime:
            return None

        # A UTF BOM overrides encoding specified by the header.
        for bom, encoding_name in BOMs:
            if response.content.startswith(bom):
                encoding = encoding_name
                break

        soup = bs4.BeautifulSoup(
            markup        = response.content,
            from_encoding = encoding,
            features      = 'html5lib'
        )

        # Inject scripts.
        script_parent = soup.find('body') or soup.find('html')
        if script_parent is None:
            return None

        for url in self.policy.script_urls():
            script_parent.append(bs4.Tag(name='script', attrs={'src': url}))

        # Remove Content Security Policy that could possibly block injected
        # scripts. (block_attr() is not implemented yet, so for now this has
        # no effect.)
        for meta in soup.select('head meta[http-equiv]'):
            header_name = meta.attrs.get('http-equiv', '').lower().strip()
            if header_name in csp_enforce_header_names_set:
                block_attr(meta, 'http-equiv')
                block_attr(meta, 'content')

        # Prepending a three-byte Byte Order Mark (BOM) will force the browser
        # to decode this as UTF-8 regardless of the 'Content-Type' header. See:
        # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
        response.content = UTF8_BOM + soup.encode()

        return None


@dc.dataclass(frozen=True)
class FlowHandlerMetaResource(FlowHandler):
    """
    Handler used with MetaResourcePolicy.

    Serving of the resource is not implemented yet; on_request() currently
    does nothing beyond what the base class does.
    """
    policy: policies.MetaResourcePolicy

    def on_request(self) -> t.Optional[StateUpdater]:
        """Placeholder for producing the response to a meta resource request."""
        super().on_request()
        # TODO: implement
        #self.flow.response = ....

        return None


def make_flow_handler(flow: http.HTTPFlow, policy: policies.Policy) \
    -> FlowHandler:
    """
    Return a flow handler of the class matching the type of `policy`.

    `policy` is expected to be an instance of BlockPolicy, AllowPolicy,
    PayloadPolicy or MetaResourcePolicy.
    """
    if isinstance(policy, policies.BlockPolicy):
        return FlowHandlerBlockScripts(flow, policy)

    if isinstance(policy, policies.AllowPolicy):
        return FlowHandlerAllowScripts(flow, policy)

    if isinstance(policy, policies.PayloadPolicy):
        return FlowHandlerInjectPayload(flow, policy)

    assert isinstance(policy, policies.MetaResourcePolicy)
    # Sketch of the not-yet-implemented response creation:
    # def response_creator(request: http.HTTPRequest) -> http.HTTPResponse:
    #     """...."""
    #     replacement_details = make_replacement_resource(
    #         policy.replacement,
    #         request.path
    #     )

    #     return http.HTTPResponse.make(
    #         replacement_details.status_code,
    #         replacement_details.content,
    #         replacement_details.content_type
    #     )
    return FlowHandlerMetaResource(flow, policy)