aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/proxy/flow_handlers.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/hydrilla/proxy/flow_handlers.py')
-rw-r--r--src/hydrilla/proxy/flow_handlers.py383
1 files changed, 0 insertions, 383 deletions
diff --git a/src/hydrilla/proxy/flow_handlers.py b/src/hydrilla/proxy/flow_handlers.py
deleted file mode 100644
index 605c7f9..0000000
--- a/src/hydrilla/proxy/flow_handlers.py
+++ /dev/null
@@ -1,383 +0,0 @@
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-# Logic for modifying mitmproxy's HTTP flows.
-#
-# This file is part of Hydrilla&Haketilo.
-#
-# Copyright (C) 2022 Wojtek Kosior
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-#
-#
-# I, Wojtek Kosior, thereby promise not to sue for violation of this
-# file's license. Although I request that you do not make use this code
-# in a proprietary program, I am not going to enforce this in court.
-
-"""
-This module's file gets passed to Mitmproxy as addon script and makes it serve
-as Haketilo proxy.
-"""
-
-# Enable using with Python 3.7.
-from __future__ import annotations
-
-import re
-import typing as t
-import dataclasses as dc
-
-import bs4 # type: ignore
-
-from mitmproxy import http
-from mitmproxy.net.http import Headers
-from mitmproxy.script import concurrent
-
-from .state import HaketiloState
-from . import policies
-
# Type of the optional callable returned by the `on_*()` hooks of the flow
# handlers below. It takes a `HaketiloState` and returns nothing; presumably
# the caller of the hooks invokes it to update the proxy state -- TODO confirm.
StateUpdater = t.Callable[[HaketiloState], None]
-
@dc.dataclass(frozen=True)
class FlowHandler:
    """
    Base handler bound to one HTTP flow and the policy selected for it.

    Every `on_*()` hook returns an optional `StateUpdater`. The base
    implementations only set the `stream` flag of the request/response when
    the matching `stream_*` field is true and always return `None`.
    """
    flow: http.HTTPFlow
    policy: policies.Policy

    # When true, the respective `on_*headers()` hook sets `stream = True` on
    # the flow's request/response.
    stream_request: bool = False
    stream_response: bool = False

    def on_requestheaders(self) -> t.Optional[StateUpdater]:
        """Mark the request as streamed if `stream_request` is set."""
        if not self.stream_request:
            return None

        self.flow.request.stream = True
        return None

    def on_request(self) -> t.Optional[StateUpdater]:
        """Do nothing; subclasses may override."""
        return None

    def on_responseheaders(self) -> t.Optional[StateUpdater]:
        """Mark the response as streamed if `stream_response` is set."""
        response = self.flow.response
        assert response is not None

        if self.stream_response:
            response.stream = True

        return None

    def on_response(self) -> t.Optional[StateUpdater]:
        """Do nothing; subclasses may override."""
        return None
-
@dc.dataclass(frozen=True)
class FlowHandlerAllowScripts(FlowHandler):
    """
    Handler used with `AllowPolicy`.

    No hook is overridden. The only difference from `FlowHandler` is that
    both streaming flags default to true.
    """
    policy: policies.AllowPolicy

    stream_request: bool = True
    stream_response: bool = True
-
# Lower-case names of HTTP headers that can carry a Content Security Policy,
# each paired with the disposition ('enforce' or 'report') of policies
# delivered through it. Every header name is listed exactly once; the
# deprecated `X-` prefixed headers have their own `-report-only` variants, just
# like the standard header.
csp_header_names_and_dispositions = (
    ('content-security-policy',               'enforce'),
    ('content-security-policy-report-only',   'report'),
    ('x-content-security-policy',             'enforce'),
    ('x-content-security-policy-report-only', 'report'),
    ('x-webkit-csp',                          'enforce'),
    ('x-webkit-csp-report-only',              'report'),
)

# Names of those of the headers above whose policies are actually enforced.
csp_enforce_header_names_set = {
    name
    for name, disposition in csp_header_names_and_dispositions
    if disposition == 'enforce'
}
-
@dc.dataclass
class ContentSecurityPolicy:
    """
    A single parsed Content Security Policy.

    `directives` maps lower-cased directive names to the list of their
    whitespace-separated value tokens (possibly empty), `header_name` is the
    name of the header the policy is associated with and `disposition` tells
    how the policy is applied (defaults to 'enforce' when deserializing).
    """
    directives: dict[str, list[str]]
    header_name: str
    disposition: str

    @classmethod
    def deserialize(
            cls,
            serialized:  str,
            header_name: str,
            disposition: str = 'enforce'
    ) -> 'ContentSecurityPolicy':
        """
        Parse one serialized policy (directives separated by ';').

        Directives containing non-ASCII characters and empty directives are
        skipped. When a directive name occurs more than once, only its first
        occurrence is kept.
        """
        # For more info, see:
        # https://www.w3.org/TR/CSP3/#parse-serialized-policy
        directives: dict[str, list[str]] = {}

        for serialized_directive in serialized.split(';'):
            if not serialized_directive.isascii():
                continue

            # Splitting on whitespace also discards leading/trailing blanks.
            tokens = serialized_directive.split()
            if not tokens:
                continue

            directive_name, *directive_value = tokens

            # Specs mention giving warnings for duplicate directive names but
            # from our proxy's perspective this is not important right now.
            directives.setdefault(directive_name.lower(), directive_value)

        return cls(directives, header_name, disposition)

    def serialize(self) -> str:
        """
        Return the policy as a header value with directives joined by ';'.

        A directive without values is serialized as its bare name (no
        trailing space).
        """
        return ';'.join(
            ' '.join((name, *value_list))
            for name, value_list in self.directives.items()
        )
-
def extract_csp(headers: Headers) -> tuple[ContentSecurityPolicy, ...]:
    """
    Collect all Content Security Policies present in `headers`.

    Each header named in `csp_header_names_and_dispositions` is looked up. A
    header may occur several times and a single header value may hold several
    comma-separated policies; every non-empty policy found is returned, in
    the order of lookup.
    """
    csp_policies = []

    for header_name, disposition in csp_header_names_and_dispositions:
        # `get_all()` yields one string per header occurrence. Plain `get()`
        # would return a single string, and iterating over that would walk
        # through its individual characters.
        for header_value in headers.get_all(header_name):
            for serialized in header_value.split(','):
                policy = ContentSecurityPolicy.deserialize(
                    serialized,
                    header_name,
                    disposition
                )

                if policy.directives:
                    csp_policies.append(policy)

    return tuple(csp_policies)
-
# Names of the CSP directives of the `script-src` family.
csp_script_directive_names = (
    'script-src',
    'script-src-elem',
    'script-src-attr',
)
-
@dc.dataclass(frozen=True)
class FlowHandlerBlockScripts(FlowHandler):
    """
    Handler used with `BlockPolicy`.

    It rewrites the response's CSP headers so that the enforced policies sent
    by the server are kept (minus their reporting directives) and an extra
    policy forbidding all scripts is added.
    """
    policy: policies.BlockPolicy

    stream_request: bool = True
    stream_response: bool = True

    def on_responseheaders(self) -> t.Optional[StateUpdater]:
        """Replace CSP headers of the response with script-blocking ones."""
        super().on_responseheaders()

        assert self.flow.response is not None
        headers = self.flow.response.headers

        csp_policies = extract_csp(headers)

        for header_name, _ in csp_header_names_and_dispositions:
            # mitmproxy's `Headers` raises KeyError when an absent header is
            # deleted (and a name may be listed more than once), hence the
            # membership check.
            if header_name in headers:
                del headers[header_name]

        for policy in csp_policies:
            if policy.disposition != 'enforce':
                continue

            # Reporting directives are optional in a policy, so a missing one
            # must not be an error. Presumably they are dropped so that the
            # blocking performed by the proxy does not get reported to the
            # site -- TODO confirm.
            for reporting_directive in ('report-to', 'report-uri'):
                policy.directives.pop(reporting_directive, None)

            # Do not emit an empty header for a policy that only consisted of
            # reporting directives.
            if not policy.directives:
                continue

            headers.add(policy.header_name, policy.serialize())

        extra_csp = ';'.join((
            "script-src 'none'",
            "script-src-elem 'none'",
            "script-src-attr 'none'"
        ))

        headers.add('Content-Security-Policy', extra_csp)

        return None
-
# For details of 'Content-Type' header's structure, see:
# https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.1
#
# The "charset" parameter is optional: a header without it still matches and
# then the `encoding` group is None. Group `quote` only exists to make sure the
# quotes around the charset value are balanced.
content_type_reg = re.compile(r'''
^
\s*
(?P<mime>[\w.+-]+/[\w.+-]+)     # type "/" subtype, e.g. application/xhtml+xml
(?:
    (?:\s*;[^;]*?)*?            # parameters preceding "charset", if any
    \s*;\s*
    charset=                    # no whitespace allowed around "=" as per RFC
    (?P<quote>"?)               # quotes are optional per RFC...
    (?P<encoding>[\w.:+-]+)
    (?P=quote)                  # ...but have to be balanced
)?
(?:\s*;[^;]*)*                  # parameters following "charset", if any
\s*
$
''', re.VERBOSE | re.IGNORECASE)

def deduce_content_type(headers: Headers) \
    -> tuple[t.Optional[str], t.Optional[str]]:
    """
    Read the media type and charset from the 'content-type' header.

    Returns a `(mime, encoding)` pair, both lower-cased and the encoding
    without surrounding quotes. `encoding` is None when the header names no
    (well-formed) charset. Both items are None when the header is missing or
    does not start with a `type/subtype` media type.
    """
    content_type = headers.get('content-type')
    if content_type is None:
        return (None, None)

    match = content_type_reg.match(content_type)
    if match is None:
        return (None, None)

    # Media types and charset names are case-insensitive; normalize them so
    # that callers can compare against lower-case literals.
    mime = match.group('mime').lower()
    encoding = match.group('encoding')

    if encoding is not None:
        encoding = encoding.lower()

    return mime, encoding
-
# Byte Order Marks, each paired with the name of the encoding it designates.
UTF8_BOM = b'\xEF\xBB\xBF'
BOMs = (
    (UTF8_BOM,    'utf-8'),
    (b'\xFE\xFF', 'utf-16be'),
    (b'\xFF\xFE', 'utf-16le'),
)
-
def block_attr(element: bs4.PageElement, atrr_name: str) -> None:
    """
    Neutralize attribute `atrr_name` of `element`, in place.

    The attribute is renamed by prefixing its name with 'blocked-', so its
    value stays available for inspection. If an attribute with the prefixed
    name already exists it is pushed one prefix further in the same way
    instead of being overwritten. Elements that have no such attribute (or
    no attributes at all) are left untouched.
    """
    attrs = getattr(element, 'attrs', None)
    if not attrs or atrr_name not in attrs:
        return

    name = atrr_name
    carried = attrs.pop(name)

    # Each pass stores the carried value under the next prefixed name and
    # picks up whatever value it displaced from there.
    while carried is not None:
        name = f'blocked-{name}'
        displaced = attrs.pop(name, None)
        attrs[name] = carried
        carried = displaced
-
@dc.dataclass(frozen=True)
class FlowHandlerInjectPayload(FlowHandler):
    """
    Handler used with `PayloadPolicy`.

    It replaces the CSP headers of the response with its own policy and
    appends the policy's scripts to HTML documents. `stream_response` keeps
    its inherited default (False); `on_response()` reads and rewrites
    `response.content`.
    """
    policy: policies.PayloadPolicy

    stream_request: bool = True

    @property
    def new_csp(self) -> str:
        """
        The Content Security Policy sent instead of the server's own ones.

        This is a property rather than an attribute assigned in
        `__post_init__()` because instances of this frozen dataclass reject
        attribute assignment.
        """
        script_src = f"script-src {self.policy.assets_base_url()}"
        if self.policy.is_eval_allowed():
            script_src = f"{script_src} 'unsafe-eval'"

        # NOTE(review): where supported, `script-src-elem` takes precedence
        # over `script-src` for <script> elements, so "'none'" here may also
        # block the injected scripts -- confirm intended behavior.
        return '; '.join((
            script_src,
            "script-src-elem 'none'",
            "script-src-attr 'none'"
        ))

    def on_responseheaders(self) -> t.Optional[StateUpdater]:
        """Replace all CSP headers of the response with `new_csp`."""
        super().on_responseheaders()

        assert self.flow.response is not None
        headers = self.flow.response.headers

        for header_name, _ in csp_header_names_and_dispositions:
            # mitmproxy's `Headers` raises KeyError when an absent header is
            # deleted (and a name may be listed more than once), hence the
            # membership check.
            if header_name in headers:
                del headers[header_name]

        headers.add('Content-Security-Policy', self.new_csp)

        return None

    def on_response(self) -> t.Optional[StateUpdater]:
        """
        Inject the policy's scripts into an HTML response body.

        Responses without content or with a non-HTML (or undetermined) media
        type are left unchanged, as are documents in which neither <body> nor
        <html> can be found. A modified document is re-encoded as UTF-8.
        """
        super().on_response()

        assert self.flow.response is not None
        response = self.flow.response

        content = response.content
        if content is None:
            return None

        mime, encoding = deduce_content_type(response.headers)
        # Media types are case-insensitive.
        if mime is None or 'html' not in mime.lower():
            return None

        # A UTF BOM overrides encoding specified by the header.
        for bom, encoding_name in BOMs:
            if content.startswith(bom):
                encoding = encoding_name
                break

        soup = bs4.BeautifulSoup(
            markup        = content,
            from_encoding = encoding,
            features      = 'html5lib'
        )

        # Inject scripts.
        script_parent = soup.find('body') or soup.find('html')
        if script_parent is None:
            return None

        for url in self.policy.script_urls():
            script_parent.append(bs4.Tag(name='script', attrs={'src': url}))

        # Remove Content Security Policy that could possibly block injected
        # scripts.
        for meta in soup.select('head meta[http-equiv]'):
            header_name = meta.attrs.get('http-equiv', '').lower().strip()
            if header_name in csp_enforce_header_names_set:
                block_attr(meta, 'http-equiv')
                block_attr(meta, 'content')

        # Prepending a three-byte Byte Order Mark (BOM) will force the browser
        # to decode this as UTF-8 regardless of the 'Content-Type' header. See:
        # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
        response.content = UTF8_BOM + soup.encode()

        return None
-
@dc.dataclass(frozen=True)
class FlowHandlerMetaResource(FlowHandler):
    """
    Handler used with `MetaResourcePolicy`.

    Producing the actual response is not implemented yet; the handler
    currently behaves like the base `FlowHandler`.
    """
    policy: policies.MetaResourcePolicy

    def on_request(self) -> t.Optional[StateUpdater]:
        """Placeholder hook: defer to the base class and return `None`."""
        # TODO: implement
        #self.flow.response = ....
        super().on_request()

        return None
-
def make_flow_handler(flow: http.HTTPFlow, policy: policies.Policy) \
    -> FlowHandler:
    """
    Instantiate the handler class that corresponds to the type of `policy`.

    Block, allow and payload policies are checked in this order; any other
    policy is expected to be a `MetaResourcePolicy`.
    """
    handler_classes_by_policy = (
        (policies.BlockPolicy,   FlowHandlerBlockScripts),
        (policies.AllowPolicy,   FlowHandlerAllowScripts),
        (policies.PayloadPolicy, FlowHandlerInjectPayload),
    )

    for policy_class, handler_class in handler_classes_by_policy:
        if isinstance(policy, policy_class):
            return handler_class(flow, policy)

    assert isinstance(policy, policies.MetaResourcePolicy)
    # TODO: give the handler a way of creating the replacement response
    # (status code, content and content type) for the requested path; this is
    # not implemented yet.
    return FlowHandlerMetaResource(flow, policy)