# SPDX-License-Identifier: GPL-3.0-or-later
# Logic for modifying mitmproxy's HTTP flows.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this code
# in a proprietary program, I am not going to enforce this in court.
"""
This module's file gets passed to Mitmproxy as addon script and makes it serve
as Haketilo proxy.
"""
# Enable using with Python 3.7.
from __future__ import annotations
import re
import typing as t
import dataclasses as dc
import bs4 # type: ignore
from mitmproxy import http
from mitmproxy.net.http import Headers
from mitmproxy.script import concurrent
from .state import HaketiloState
from . import policies
# Type of the callbacks that flow handlers' hooks may return: a callable that
# takes the HaketiloState and returns nothing.
# NOTE(review): presumably the caller applies it to the proxy's state - none
# of the hooks in this module returns one yet; confirm against the caller.
StateUpdater = t.Callable[[HaketiloState], None]
@dc.dataclass(frozen=True)
class FlowHandler:
    """
    Base handler of a single HTTP flow, bound to the policy selected for it.

    There is one hook per stage of the flow. A hook may return a state
    updating callback or None; the hooks defined here always return None and
    merely set the `stream` flag of the message when asked to.
    """
    flow:   http.HTTPFlow
    policy: policies.Policy

    # Whether to set the `stream` flag on the request / on the response.
    stream_request:  bool = False
    stream_response: bool = False

    def on_requestheaders(self) -> t.Optional[StateUpdater]:
        """Flag the request for streaming if `stream_request` is set."""
        if not self.stream_request:
            return None

        self.flow.request.stream = True
        return None

    def on_request(self) -> t.Optional[StateUpdater]:
        """Do nothing; subclasses override this to act on the request."""
        return None

    def on_responseheaders(self) -> t.Optional[StateUpdater]:
        """Flag the response for streaming if `stream_response` is set."""
        response = self.flow.response
        assert response is not None

        if self.stream_response:
            response.stream = True

        return None

    def on_response(self) -> t.Optional[StateUpdater]:
        """Do nothing; subclasses override this to act on the response."""
        return None
@dc.dataclass(frozen=True)
class FlowHandlerAllowScripts(FlowHandler):
    """
    Handler for flows governed by an AllowPolicy. It defines no hooks of its
    own; it only changes the defaults so that the inherited hooks set the
    `stream` flag on both the request and the response.
    """
    policy: policies.AllowPolicy

    # Unlike in the base class, both flags default to True here.
    stream_request:  bool = True
    stream_response: bool = True
# Names of HTTP headers that can deliver a Content Security Policy, each
# paired with the disposition ('enforce' or 'report') of the policies it
# carries. Every header name is listed exactly once. The legacy, vendor
# prefixed headers have their own '-report-only' variants just like the
# standard one does - a single header name cannot have both dispositions.
csp_header_names_and_dispositions = (
    ('content-security-policy',               'enforce'),
    ('content-security-policy-report-only',   'report'),
    ('x-content-security-policy',             'enforce'),
    ('x-content-security-policy-report-only', 'report'),
    ('x-webkit-csp',                          'enforce'),
    ('x-webkit-csp-report-only',              'report')
)

# Names of those of the headers above whose policies actually get enforced.
csp_enforce_header_names_set = {
    name for name, disposition in csp_header_names_and_dispositions
    if disposition == 'enforce'
}
@dc.dataclass
class ContentSecurityPolicy:
    """
    A single parsed Content Security Policy, together with the name of the
    header it is associated with and its disposition.
    """
    # Maps lower-cased directive names to the lists of their value tokens.
    directives:  dict[str, list[str]]
    # Name of the HTTP header the policy is associated with.
    header_name: str
    # Either 'enforce' or 'report'.
    disposition: str

    @staticmethod
    def deserialize(
            serialized:  str,
            header_name: str,
            disposition: str = 'enforce'
    ) -> 'ContentSecurityPolicy':
        """
        Parse one serialized policy, i.e. a ';'-separated list of directives.

        Directives that contain non-ASCII characters and empty directives are
        skipped. Directive names get lower-cased. When a directive name is
        repeated, only its first occurrence is kept.
        """
        # For more info, see:
        # https://www.w3.org/TR/CSP3/#parse-serialized-policy
        directives: dict[str, list[str]] = {}

        for serialized_directive in serialized.split(';'):
            if not serialized_directive.isascii():
                continue

            # Splitting on whitespace also takes care of the leading and
            # trailing one; an empty result means an empty directive.
            tokens = serialized_directive.split()
            if not tokens:
                continue

            # Specs mention giving warnings for duplicate directive names but
            # from our proxy's perspective this is not important right now.
            # setdefault() keeps the first occurrence, as the specs require.
            directives.setdefault(tokens[0].lower(), tokens[1:])

        return ContentSecurityPolicy(directives, header_name, disposition)

    def serialize(self) -> str:
        """
        Return the policy as a ';'-separated list of directives, each being
        the directive name followed by its space-separated values. A directive
        without values is emitted as the bare name, with no trailing space.
        """
        return ';'.join(
            ' '.join((name, *value_list))
            for name, value_list in self.directives.items()
        )
def extract_csp(headers: Headers) -> tuple[ContentSecurityPolicy, ...]:
    """
    Collect all Content Security Policies delivered in `headers`.

    Every header name listed in `csp_header_names_and_dispositions` is looked
    up. Each value found may hold several comma-separated policies; each of
    them is parsed separately and tagged with the header's name and
    disposition. Policies that end up with no directives are left out.
    """
    csp_policies = []

    for header_name, disposition in csp_header_names_and_dispositions:
        # get_all() gives one string per occurrence of the header. Using
        # get() here would give a single string, and iterating over it would
        # yield individual characters rather than header values.
        for serialized_list in headers.get_all(header_name):
            for serialized in serialized_list.split(','):
                policy = ContentSecurityPolicy.deserialize(
                    serialized,
                    header_name,
                    disposition
                )

                if policy.directives:
                    csp_policies.append(policy)

    return tuple(csp_policies)
# Names of the CSP directives that concern scripts.
# NOTE(review): nothing in the visible part of this module references this
# tuple - confirm whether it is used elsewhere before relying on it.
csp_script_directive_names = (
    'script-src',
    'script-src-elem',
    'script-src-attr'
)
@dc.dataclass(frozen=True)
class FlowHandlerBlockScripts(FlowHandler):
    """
    Handler for flows governed by a BlockPolicy. It rewrites the Content
    Security Policy headers of the response so that scripts get blocked.
    """
    policy: policies.BlockPolicy

    stream_request:  bool = True
    stream_response: bool = True

    def on_responseheaders(self) -> t.Optional[StateUpdater]:
        """
        Replace the response's CSP headers with script-blocking ones.

        All known CSP headers are removed. The policies that were being
        enforced are then added back with their reporting directives
        ('report-to' and 'report-uri') stripped; report-only policies are
        dropped. Finally a policy forbidding all scripts is added.
        """
        super().on_responseheaders()

        assert self.flow.response is not None

        headers = self.flow.response.headers

        csp_policies = extract_csp(headers)

        for header_name, _ in csp_header_names_and_dispositions:
            # Most responses carry only some (or none) of these headers and
            # `del` raises KeyError for an absent one, hence pop() with a
            # default.
            headers.pop(header_name, None)

        for csp in csp_policies:
            if csp.disposition != 'enforce':
                continue

            # Either directive may well be absent from the policy.
            csp.directives.pop('report-to', None)
            csp.directives.pop('report-uri', None)

            # A policy that consisted solely of reporting directives has
            # nothing left to enforce; do not emit an empty header for it.
            if not csp.directives:
                continue

            headers.add(csp.header_name, csp.serialize())

        extra_csp = ';'.join((
            "script-src 'none'",
            "script-src-elem 'none'",
            "script-src-attr 'none'"
        ))

        headers.add('Content-Security-Policy', extra_csp)

        return None
# For details of 'Content-Type' header's structure, see:
# https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.1
#
# The "charset" parameter is optional. When it is missing or malformed (e.g.
# has dangling characters after the closing '"'), the header still matches
# but the 'encoding' group is left unset.
content_type_reg = re.compile(r'''
^
(?P<mime>[\w.+-]+/[\w.+-]+) # subtypes like "xhtml+xml" contain "+" or "."
\s*
(?:
    (?:;[^;]*)*?    # match possible parameters other than "charset"
    ;
    \s*
    charset=        # no whitespace allowed in parameter as per RFC
    (?P<encoding>
        [\w-]+
        |
        "[\w-]+"    # quotes are optional per RFC
    )
)?
(?:;[^;]*)*         # match possible parameters other than "charset"
$
''', re.VERBOSE | re.IGNORECASE)

def deduce_content_type(headers: Headers) \
    -> tuple[t.Optional[str], t.Optional[str]]:
    """
    Determine the media type and the character encoding declared by the
    'Content-Type' header.

    Return a `(mime, encoding)` pair, both lower-cased because both are
    case-insensitive. `encoding` has its optional quotes removed and is None
    when the header declares no valid charset. `(None, None)` is returned
    when the header is missing or cannot be parsed at all.
    """
    content_type = headers.get('content-type')
    if content_type is None:
        return (None, None)

    match = content_type_reg.match(content_type.strip())
    if match is None:
        return (None, None)

    mime, encoding = match.group('mime'), match.group('encoding')

    if encoding is not None:
        # The quotes are not part of the encoding's name.
        encoding = encoding.strip('"').lower()

    return mime.lower(), encoding
# The UTF-8 Byte Order Mark. FlowHandlerInjectPayload.on_response() puts it
# at the start of the re-encoded document.
UTF8_BOM = b'\xEF\xBB\xBF'

# Byte Order Marks paired with the names of the encodings they indicate.
# FlowHandlerInjectPayload.on_response() checks the response body against
# these and lets a matching BOM override the encoding given in the header.
BOMs = (
    (UTF8_BOM,    'utf-8'),
    (b'\xFE\xFF', 'utf-16be'),
    (b'\xFF\xFE', 'utf-16le')
)
def block_attr(element: bs4.PageElement, atrr_name: str) -> None:
    """
    Neutralize attribute `atrr_name` of `element` by renaming it in place to
    `blocked-<name>`. The value is retained so that it can still be inspected.

    If an attribute with the new name already exists, it gets renamed the same
    way (to `blocked-blocked-<name>`, and so on) instead of being overwritten.
    Elements that have no attributes or lack the requested one are left
    untouched.

    The (misspelled) parameter name is kept for compatibility with callers
    that might pass it by keyword.
    """
    attrs = getattr(element, 'attrs', None)
    if not attrs or atrr_name not in attrs:
        return

    # Attribute values could in principle be anything, hence a dedicated
    # sentinel rather than None marks "no such attribute".
    missing = object()

    name    = atrr_name
    carried = attrs.pop(name)

    while carried is not missing:
        name      = f'blocked-{name}'
        displaced = attrs.pop(name, missing)

        attrs[name] = carried
        carried     = displaced
@dc.dataclass(frozen=True)
class FlowHandlerInjectPayload(FlowHandler):
    """
    Handler for flows governed by a PayloadPolicy. It replaces the response's
    Content Security Policy with one tailored to the payload and, for HTML
    responses, injects the payload's scripts into the document.
    """
    policy: policies.PayloadPolicy

    stream_request: bool = True

    # The CSP to send with the response. Computed in __post_init__(), hence
    # excluded from the constructor, repr and comparisons.
    new_csp: str = dc.field(init=False, repr=False, compare=False)

    def __post_init__(self) -> None:
        """Precompute the Content Security Policy for the response."""
        script_src = f"script-src {self.policy.assets_base_url()}"
        if self.policy.is_eval_allowed():
            script_src = f"{script_src} 'unsafe-eval'"

        # NOTE(review): in browsers that implement 'script-src-elem', the
        # 'none' below presumably takes precedence over 'script-src' for
        # <script> elements and would then block the injected scripts as
        # well - confirm against the CSP3 specification.
        new_csp = '; '.join((
            script_src,
            "script-src-elem 'none'",
            "script-src-attr 'none'"
        ))

        # The dataclass is frozen, so a plain assignment to an attribute of
        # `self` would raise FrozenInstanceError.
        object.__setattr__(self, 'new_csp', new_csp)

    def on_responseheaders(self) -> t.Optional[StateUpdater]:
        """Replace all CSP headers of the response with the payload's CSP."""
        super().on_responseheaders()

        assert self.flow.response is not None

        headers = self.flow.response.headers

        for header_name, _ in csp_header_names_and_dispositions:
            # Most responses carry only some (or none) of these headers and
            # `del` raises KeyError for an absent one, hence pop() with a
            # default.
            headers.pop(header_name, None)

        headers.add('Content-Security-Policy', self.new_csp)

        return None

    def on_response(self) -> t.Optional[StateUpdater]:
        """
        Inject the payload's scripts into the response if it is an HTML
        document.

        Responses without a body, without a parseable 'Content-Type' header
        or with a media type not containing 'html' are left untouched.
        Otherwise the document is parsed, a <script> element is appended to
        its <body> (or <html>) for each URL the policy supplies, <meta> tags
        that deliver an enforced CSP are passed to block_attr(), and the
        document is re-encoded with a UTF-8 BOM in front.
        """
        super().on_response()

        response = self.flow.response
        assert response is not None

        content = response.content
        if content is None:
            return None

        mime, encoding = deduce_content_type(response.headers)
        if mime is None or 'html' not in mime:
            return None

        # A UTF BOM overrides encoding specified by the header.
        for bom, encoding_name in BOMs:
            if content.startswith(bom):
                encoding = encoding_name
                break

        soup = bs4.BeautifulSoup(
            markup        = content,
            from_encoding = encoding,
            features      = 'html5lib'
        )

        # Inject scripts.
        script_parent = soup.find('body') or soup.find('html')
        if script_parent is None:
            return None

        for url in self.policy.script_urls():
            script_parent.append(bs4.Tag(name='script', attrs={'src': url}))

        # Remove Content Security Policy that could possibly block injected
        # scripts.
        for meta in soup.select('head meta[http-equiv]'):
            header_name = meta.attrs.get('http-equiv', '').lower().strip()
            if header_name in csp_enforce_header_names_set:
                block_attr(meta, 'http-equiv')
                block_attr(meta, 'content')

        # Prepending a three-byte Byte Order Mark (BOM) will force the browser
        # to decode this as UTF-8 regardless of the 'Content-Type' header. See:
        # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
        response.content = UTF8_BOM + soup.encode()

        return None
@dc.dataclass(frozen=True)
class FlowHandlerMetaResource(FlowHandler):
    """
    Handler for flows governed by a MetaResourcePolicy.

    Not implemented yet: on_request() currently only calls the base class'
    hook and returns None. The commented-out line below suggests it is meant
    to supply the flow's response by itself.
    """
    policy: policies.MetaResourcePolicy

    def on_request(self) -> t.Optional[StateUpdater]:
        """Call the inherited hook and return None (see the TODO below)."""
        super().on_request()
        # TODO: implement
        #self.flow.response = ....

        return None
def make_flow_handler(flow: http.HTTPFlow, policy: policies.Policy) \
    -> FlowHandler:
    """
    Create the flow handler that corresponds to the type of `policy`.

    The policy types are tried in a fixed order: BlockPolicy, AllowPolicy,
    PayloadPolicy. Any other policy is asserted to be a MetaResourcePolicy.
    """
    handler_types_by_policy_type = (
        (policies.BlockPolicy,   FlowHandlerBlockScripts),
        (policies.AllowPolicy,   FlowHandlerAllowScripts),
        (policies.PayloadPolicy, FlowHandlerInjectPayload),
    )

    for policy_type, handler_type in handler_types_by_policy_type:
        if isinstance(policy, policy_type):
            return handler_type(flow, policy)

    assert isinstance(policy, policies.MetaResourcePolicy)

    # Sketch of the not yet implemented response creation for meta resources:
    #
    # def response_creator(request: http.HTTPRequest) -> http.HTTPResponse:
    #     """...."""
    #     replacement_details = make_replacement_resource(
    #         policy.replacement,
    #         request.path
    #     )
    #
    #     return http.HTTPResponse.make(
    #         replacement_details.status_code,
    #         replacement_details.content,
    #         replacement_details.content_type
    #     )

    return FlowHandlerMetaResource(flow, policy)