aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/proxy/flow_handlers.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/hydrilla/proxy/flow_handlers.py')
-rw-r--r--src/hydrilla/proxy/flow_handlers.py383
1 files changed, 383 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/flow_handlers.py b/src/hydrilla/proxy/flow_handlers.py
new file mode 100644
index 0000000..605c7f9
--- /dev/null
+++ b/src/hydrilla/proxy/flow_handlers.py
@@ -0,0 +1,383 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Logic for modifying mitmproxy's HTTP flows.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+This module's file gets passed to Mitmproxy as addon script and makes it serve
+as Haketilo proxy.
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import re
+import typing as t
+import dataclasses as dc
+
+import bs4 # type: ignore
+
+from mitmproxy import http
+from mitmproxy.net.http import Headers
+from mitmproxy.script import concurrent
+
+from .state import HaketiloState
+from . import policies
+
+StateUpdater = t.Callable[[HaketiloState], None]
+
+@dc.dataclass(frozen=True)
+class FlowHandler:
+ """...."""
+ flow: http.HTTPFlow
+ policy: policies.Policy
+
+ stream_request: bool = False
+ stream_response: bool = False
+
+ def on_requestheaders(self) -> t.Optional[StateUpdater]:
+ """...."""
+ if self.stream_request:
+ self.flow.request.stream = True
+
+ return None
+
+ def on_request(self) -> t.Optional[StateUpdater]:
+ """...."""
+ return None
+
+ def on_responseheaders(self) -> t.Optional[StateUpdater]:
+ """...."""
+ assert self.flow.response is not None
+
+ if self.stream_response:
+ self.flow.response.stream = True
+
+ return None
+
+ def on_response(self) -> t.Optional[StateUpdater]:
+ """...."""
+ return None
+
+@dc.dataclass(frozen=True)
+class FlowHandlerAllowScripts(FlowHandler):
+ """...."""
+ policy: policies.AllowPolicy
+
+ stream_request: bool = True
+ stream_response: bool = True
+
+csp_header_names_and_dispositions = (
+ ('content-security-policy', 'enforce'),
+ ('content-security-policy-report-only', 'report'),
+ ('x-content-security-policy', 'enforce'),
+ ('x-content-security-policy', 'report'),
+ ('x-webkit-csp', 'enforce'),
+ ('x-webkit-csp', 'report')
+)
+
+csp_enforce_header_names_set = {
+ name for name, disposition in csp_header_names_and_dispositions
+ if disposition == 'enforce'
+}
+
+@dc.dataclass
+class ContentSecurityPolicy:
+ directives: dict[str, list[str]]
+ header_name: str
+ disposition: str
+
+ @staticmethod
+ def deserialize(
+ serialized: str,
+ header_name: str,
+ disposition: str = 'enforce'
+ ) -> 'ContentSecurityPolicy':
+ """...."""
+ # For more info, see:
+ # https://www.w3.org/TR/CSP3/#parse-serialized-policy
+ directives = {}
+
+ for serialized_directive in serialized.split(';'):
+ if not serialized_directive.isascii():
+ continue
+
+ serialized_directive = serialized_directive.strip()
+ if len(serialized_directive) == 0:
+ continue
+
+ tokens = serialized_directive.split()
+ directive_name = tokens.pop(0).lower()
+ directive_value = tokens
+
+ # Specs mention giving warnings for duplicate directive names but
+ # from our proxy's perspective this is not important right now.
+ if directive_name in directives:
+ continue
+
+ directives[directive_name] = directive_value
+
+ return ContentSecurityPolicy(directives, header_name, disposition)
+
+ def serialize(self) -> str:
+ """...."""
+ serialized_directives = []
+ for name, value_list in self.directives.items():
+ serialized_directives.append(f'{name} {" ".join(value_list)}')
+
+ return ';'.join(serialized_directives)
+
+def extract_csp(headers: Headers) -> tuple[ContentSecurityPolicy, ...]:
+ """...."""
+ csp_policies = []
+
+ for header_name, disposition in csp_header_names_and_dispositions:
+ for serialized_list in headers.get(header_name, ''):
+ for serialized in serialized_list.split(','):
+ policy = ContentSecurityPolicy.deserialize(
+ serialized,
+ header_name,
+ disposition
+ )
+
+ if policy.directives != {}:
+ csp_policies.append(policy)
+
+ return tuple(csp_policies)
+
+csp_script_directive_names = (
+ 'script-src',
+ 'script-src-elem',
+ 'script-src-attr'
+)
+
+@dc.dataclass(frozen=True)
+class FlowHandlerBlockScripts(FlowHandler):
+ policy: policies.BlockPolicy
+
+ stream_request: bool = True
+ stream_response: bool = True
+
+ def on_responseheaders(self) -> t.Optional[StateUpdater]:
+ """...."""
+ super().on_responseheaders()
+
+ assert self.flow.response is not None
+
+ csp_policies = extract_csp(self.flow.response.headers)
+
+ for header_name, _ in csp_header_names_and_dispositions:
+ del self.flow.response.headers[header_name]
+
+ for policy in csp_policies:
+ if policy.disposition != 'enforce':
+ continue
+
+ policy.directives.pop('report-to')
+ policy.directives.pop('report-uri')
+
+ self.flow.response.headers.add(
+ policy.header_name,
+ policy.serialize()
+ )
+
+ extra_csp = ';'.join((
+ "script-src 'none'",
+ "script-src-elem 'none'",
+ "script-src-attr 'none'"
+ ))
+
+ self.flow.response.headers.add('Content-Security-Policy', extra_csp)
+
+ return None
+
+# For details of 'Content-Type' header's structure, see:
+# https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.1
+content_type_reg = re.compile(r'''
+^
+(?P<mime>[\w-]+/[\w-]+)
+\s*
+(?:
+ ;
+ (?:[^;]*;)* # match possible parameter other than "charset"
+)
+\s*
+charset= # no whitespace allowed in parameter as per RFC
+(?P<encoding>
+ [\w-]+
+ |
+ "[\w-]+" # quotes are optional per RFC
+)
+(?:;[^;]+)* # match possible parameter other than "charset"
+$ # forbid possible dangling characters after closing '"'
+''', re.VERBOSE | re.IGNORECASE)
+
+def deduce_content_type(headers: Headers) \
+ -> tuple[t.Optional[str], t.Optional[str]]:
+ """...."""
+ content_type = headers.get('content-type')
+ if content_type is None:
+ return (None, None)
+
+ match = content_type_reg.match(content_type)
+ if match is None:
+ return (None, None)
+
+ mime, encoding = match.group('mime'), match.group('encoding')
+
+ if encoding is not None:
+ encoding = encoding.lower()
+
+ return mime, encoding
+
+UTF8_BOM = b'\xEF\xBB\xBF'
+BOMs = (
+ (UTF8_BOM, 'utf-8'),
+ (b'\xFE\xFF', 'utf-16be'),
+ (b'\xFF\xFE', 'utf-16le')
+)
+
+def block_attr(element: bs4.PageElement, atrr_name: str) -> None:
+ """...."""
+ # TODO: implement
+ pass
+
+@dc.dataclass(frozen=True)
+class FlowHandlerInjectPayload(FlowHandler):
+ """...."""
+ policy: policies.PayloadPolicy
+
+ stream_request: bool = True
+
+ def __post_init__(self) -> None:
+ """...."""
+ script_src = f"script-src {self.policy.assets_base_url()}"
+ if self.policy.is_eval_allowed():
+ script_src = f"{script_src} 'unsafe-eval'"
+
+ self.new_csp = '; '.join((
+ script_src,
+ "script-src-elem 'none'",
+ "script-src-attr 'none'"
+ ))
+
+ def on_responseheaders(self) -> t.Optional[StateUpdater]:
+ """...."""
+ super().on_responseheaders()
+
+ assert self.flow.response is not None
+
+ for header_name, _ in csp_header_names_and_dispositions:
+ del self.flow.response.headers[header_name]
+
+ self.flow.response.headers.add('Content-Security-Policy', self.new_csp)
+
+ return None
+
+ def on_response(self) -> t.Optional[StateUpdater]:
+ """...."""
+ super().on_response()
+
+ assert self.flow.response is not None
+
+ if self.flow.response.content is None:
+ return None
+
+ mime, encoding = deduce_content_type(self.flow.response.headers)
+ if mime is None or 'html' not in mime:
+ return None
+
+ # A UTF BOM overrides encoding specified by the header.
+ for bom, encoding_name in BOMs:
+ if self.flow.response.content.startswith(bom):
+ encoding = encoding_name
+
+ soup = bs4.BeautifulSoup(
+ markup = self.flow.response.content,
+ from_encoding = encoding,
+ features = 'html5lib'
+ )
+
+ # Inject scripts.
+ script_parent = soup.find('body') or soup.find('html')
+ if script_parent is None:
+ return None
+
+ for url in self.policy.script_urls():
+ script_parent.append(bs4.Tag(name='script', attrs={'src': url}))
+
+ # Remove Content Security Policy that could possibly block injected
+ # scripts.
+ for meta in soup.select('head meta[http-equiv]'):
+ header_name = meta.attrs.get('http-equiv', '').lower().strip()
+ if header_name in csp_enforce_header_names_set:
+ block_attr(meta, 'http-equiv')
+ block_attr(meta, 'content')
+
+ # Appending a three-byte Byte Order Mark (BOM) will force the browser to
+ # decode this as UTF-8 regardless of the 'Content-Type' header. See:
+ # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
+ self.flow.response.content = UTF8_BOM + soup.encode()
+
+ return None
+
+@dc.dataclass(frozen=True)
+class FlowHandlerMetaResource(FlowHandler):
+ """...."""
+ policy: policies.MetaResourcePolicy
+
+ def on_request(self) -> t.Optional[StateUpdater]:
+ """...."""
+ super().on_request()
+ # TODO: implement
+ #self.flow.response = ....
+
+ return None
+
+def make_flow_handler(flow: http.HTTPFlow, policy: policies.Policy) \
+ -> FlowHandler:
+ """...."""
+ if isinstance(policy, policies.BlockPolicy):
+ return FlowHandlerBlockScripts(flow, policy)
+
+ if isinstance(policy, policies.AllowPolicy):
+ return FlowHandlerAllowScripts(flow, policy)
+
+ if isinstance(policy, policies.PayloadPolicy):
+ return FlowHandlerInjectPayload(flow, policy)
+
+ assert isinstance(policy, policies.MetaResourcePolicy)
+ # def response_creator(request: http.HTTPRequest) -> http.HTTPResponse:
+ # """...."""
+ # replacement_details = make_replacement_resource(
+ # policy.replacement,
+ # request.path
+ # )
+
+ # return http.HTTPResponse.make(
+ # replacement_details.status_code,
+ # replacement_details.content,
+ # replacement_details.content_type
+ # )
+ return FlowHandlerMetaResource(flow, policy)