# SPDX-License-Identifier: GPL-3.0-or-later

# Base definitions for policies for altering HTTP requests.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this
# code in a proprietary program, I am not going to enforce this in
# court.

"""
Base definitions for policies for altering HTTP requests and responses.

This module provides the `Policy` base class (default header and body
processing, including optional injection of the popup script into pages), the
`PolicyFactory` base class and the relative ordering of known factory classes.
"""

import enum
import re
import traceback
import dataclasses as dc
import typing as t

from threading import Lock
from abc import ABC, abstractmethod
from hashlib import sha256
from base64 import b64encode

import jinja2

from immutables import Map

from ...url_patterns import ParsedUrl
from .. import state
from .. import http_messages
from .. import csp


loader = jinja2.PackageLoader(__package__, package_path='injectable_scripts')
jinja_env = jinja2.Environment(
    loader        = loader,
    lstrip_blocks = True,
    autoescape    = False
)
# Not used in this module; presumably meant to guard `jinja_env` when other
# modules render templates concurrently -- TODO confirm against the users.
jinja_lock = Lock()

# The popup script is rendered once, at import time. Its SHA-256 digest (base64)
# is what gets whitelisted in the 'script-src' CSP directive, so the text put
# between the injected <script> tags must be exactly `popup_script`.
popup_script = jinja_env.get_template('popup.js.jinja').render()
popup_script_sha256_bytes = sha256(popup_script.encode()).digest()
popup_script_sha256_b64 = b64encode(popup_script_sha256_bytes).decode()


class PolicyPriority(int, enum.Enum):
    """Integer-valued priority levels that policy classes declare."""
    _ONE   = 1
    _TWO   = 2
    _THREE = 3


MessageInfo = t.Union[
    http_messages.RequestInfo,
    http_messages.ResponseInfo
]


# We're doing *very* simple doctype matching for now. If a site wanted, it could
# trick us into getting this wrong.
doctype_re = re.compile(r'^\s*<!doctype[^<>]*>', re.IGNORECASE)


UTF8_BOM = b'\xEF\xBB\xBF'
# Byte Order Marks recognized at the start of a response body, each paired with
# the name of the codec it implies.
BOMs = (
    (UTF8_BOM,    'utf-8'),
    (b'\xFE\xFF', 'utf-16be'),
    (b'\xFF\xFE', 'utf-16le')
)

# What a BOM turns into once the bytes have been decoded to `str`.
_DECODED_BOM = '\ufeff'


# mypy needs to be corrected:
# https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704
@dc.dataclass(frozen=True) # type: ignore[misc]
class Policy(ABC):
    """
    Base class of policies deciding how a request/response pair is altered.

    The default implementation leaves requests alone and only touches
    responses that look like pages while the popup is enabled: it then
    whitelists the popup script in the CSP and injects the script into the
    document. Subclasses customize behavior by overriding the class-level
    flags and the `_csp_*()` / `_modify_response_*()` hooks.
    """
    # Values returned/consulted by should_process_request() and
    # should_process_response() respectively.
    _process_request:  t.ClassVar[bool] = False
    _process_response: t.ClassVar[bool] = False
    # Not read in this module -- NOTE(review): presumably tells the caller to
    # defeat HTTP caching for this policy; confirm against the user.
    anticache:         t.ClassVar[bool] = True

    # Declared without a value here; concrete subclasses are expected to set it.
    priority: t.ClassVar[PolicyPriority]

    haketilo_settings: state.HaketiloGlobalSettings

    @property
    def current_popup_settings(self) -> state.PopupSettings:
        """Popup settings in force for this policy (the JS-allowed default)."""
        return self.haketilo_settings.default_popup_jsallowed

    def should_process_request(
            self,
            request_info: http_messages.BodylessRequestInfo
    ) -> bool:
        """Tell whether consume_request() should be called for the request."""
        return self._process_request

    def should_process_response(
            self,
            request_info:  http_messages.RequestInfo,
            response_info: http_messages.AnyResponseInfo
    ) -> bool:
        """
        Tell whether consume_response() should be called for the response.

        True when the class opts in through `_process_response`, and otherwise
        when the popup is enabled and the response is likely a page (the popup
        script then needs to be injected).
        """
        if self._process_response:
            return True

        return (self.current_popup_settings.popup_enabled and
                http_messages.is_likely_a_page(request_info, response_info))

    def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \
        -> t.Union[t.Sequence[str], t.Literal['all']]:
        """Return what is passed to csp.modify() as `clear`; nothing here."""
        return ()

    def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \
        -> t.Mapping[str, t.Sequence[str]]:
        """Return what is passed to csp.modify() as `add`; nothing here."""
        return Map()

    def _csp_to_extend(self, http_info: http_messages.FullHTTPInfo) \
        -> t.Mapping[str, t.Sequence[str]]:
        """
        Return what is passed to csp.modify() as `extend`.

        When the popup is going to be injected, this whitelists the popup
        script by its SHA-256 hash in 'script-src'.
        """
        if (self.current_popup_settings.popup_enabled and
            http_info.is_likely_a_page):
            return {'script-src': [f"'sha256-{popup_script_sha256_b64}'"]}
        else:
            return Map()

    def _modify_response_headers(self, http_info: http_messages.FullHTTPInfo) \
        -> http_messages.IHeaders:
        """
        Return response headers with this policy's CSP changes applied.

        When there is nothing to change, the very same headers object is
        returned; consume_response() relies on that identity to detect an
        unmodified response.
        """
        csp_to_clear  = self._csp_to_clear(http_info)
        csp_to_add    = self._csp_to_add(http_info)
        csp_to_extend = self._csp_to_extend(http_info)

        if not (csp_to_clear or csp_to_extend or csp_to_add):
            return http_info.response_info.headers

        return csp.modify(
            headers = http_info.response_info.headers,
            clear   = csp_to_clear,
            add     = csp_to_add,
            extend  = csp_to_extend
        )

    def _modify_response_document(
            self,
            http_info: http_messages.FullHTTPInfo,
            encoding:  t.Optional[str]
    ) -> t.Union[str, bytes]:
        """
        Return the page with the popup script injected, if applicable.

        `encoding` is the codec name to decode the body with; None means
        'utf-8'. A `str` is returned when the document got modified and the
        original body (bytes) is returned untouched otherwise.
        """
        popup_settings = self.current_popup_settings

        if not (popup_settings.popup_enabled and http_info.is_likely_a_page):
            return http_info.response_info.body

        if encoding is None:
            encoding = 'utf-8'

        body_bytes = http_info.response_info.body
        body = body_bytes.decode(encoding, errors='replace')

        # Codecs such as 'utf-8' and 'utf-16le' keep the BOM as a leading
        # U+FEFF character. It is not whitespace, so it would stop the doctype
        # from being matched and the script would end up *before* the doctype.
        # The caller prepends a fresh UTF-8 BOM to whatever str we return.
        if body.startswith(_DECODED_BOM):
            body = body[len(_DECODED_BOM):]

        match = doctype_re.match(body)
        doctype_decl_len = 0 if match is None else match.end()

        doctype_decl = body[:doctype_decl_len]
        doc_rest     = body[doctype_decl_len:]

        # The script goes right after the doctype (if any). Its content must
        # be exactly `popup_script` to match the hash whitelisted in the CSP.
        return f'{doctype_decl}<script>{popup_script}</script>{doc_rest}'

    def _modify_response_body(self, http_info: http_messages.FullHTTPInfo) \
        -> bytes:
        """
        Return the response body after processing by this policy.

        Responses that are not likely pages are passed through. For pages,
        the encoding is deduced (a BOM takes precedence over the header) and
        _modify_response_document() is called; a `str` result is encoded as
        UTF-8 with a BOM.
        """
        if not http_messages.is_likely_a_page(
                request_info  = http_info.request_info,
                response_info = http_info.response_info
        ):
            return http_info.response_info.body

        data = http_info.response_info.body

        _, encoding = http_info.response_info.deduce_content_type()

        # A UTF BOM overrides encoding specified by the header.
        for bom, encoding_name in BOMs:
            if data.startswith(bom):
                encoding = encoding_name
                break

        new_data = self._modify_response_document(http_info, encoding)

        if isinstance(new_data, str):
            # Prepending a three-byte Byte Order Mark (BOM) will force the
            # browser to decode this as UTF-8 regardless of the 'Content-Type'
            # header. See
            # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
            new_data = UTF8_BOM + new_data.encode()

        return new_data

    def consume_request(self, request_info: http_messages.RequestInfo) \
        -> t.Optional[MessageInfo]:
        """
        Process a request; not supported by the base class.

        Raises NotImplementedError unless overridden.
        """
        # We're not using @abstractmethod because not every Policy needs it and
        # we don't want to force child classes into implementing dummy methods.
        raise NotImplementedError(
            'This kind of policy does not consume requests.'
        )

    def consume_response(self, http_info: http_messages.FullHTTPInfo) \
        -> t.Optional[http_messages.ResponseInfo]:
        """
        Process a response.

        Returns None when neither headers nor body got replaced, a modified
        copy of the response otherwise. If processing raises, a plain-text
        500 response carrying the stack trace is returned instead.
        """
        try:
            new_headers = self._modify_response_headers(http_info)
            new_body = self._modify_response_body(http_info)
        except Exception as e:
            # In the future we might want to actually describe eventual errors.
            # For now, we're just printing the stack trace.
            error_info_list = traceback.format_exception(
                type(e),
                e,
                e.__traceback__
            )

            return http_messages.ResponseInfo.make(
                status_code = 500,
                headers     = (('Content-Type', 'text/plain; charset=utf-8'),),
                body        = '\n'.join(error_info_list).encode()
            )

        # Identity (not equality) comparison: the hooks above hand back the
        # original objects when they have nothing to change.
        if (new_headers is http_info.response_info.headers and
            new_body is http_info.response_info.body):
            return None

        return dc.replace(
            http_info.response_info,
            headers = new_headers,
            body    = new_body
        )


@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc]
class PolicyFactory(ABC):
    """
    Base class of hashable, orderable objects that produce `Policy` objects.

    Factories compare by the position of their class name in `sorting_order`.
    """
    # NOTE(review): meaning not visible in this module -- presumably marks
    # factories shipped with the proxy itself; confirm against the users.
    builtin: bool

    @abstractmethod
    def make_policy(self, haketilo_state: state.HaketiloState) \
        -> t.Optional[Policy]:
        """Create a policy for the given state; may return None."""
        ...

    def __lt__(self, other: 'PolicyFactory') -> bool:
        """
        Order factories by their class' position in `sorting_order`.

        Classes not listed there get the key 999, i.e. sort after listed ones.
        """
        return sorting_keys.get(self.__class__.__name__, 999) < \
            sorting_keys.get(other.__class__.__name__, 999)


# Names of factory classes, in the order PolicyFactory.__lt__() sorts them.
sorting_order = (
    'WebUIMainPolicyFactory',
    'WebUILandingPolicyFactory',

    'MitmItPagePolicyFactory',

    'PayloadResourcePolicyFactory',

    'PayloadPolicyFactory',

    'RuleBlockPolicyFactory',
    'RuleAllowPolicyFactory',

    'FallbackPolicyFactory'
)

# Maps each class name from `sorting_order` to its index in that tuple.
sorting_keys = Map(
    (cls_name, index) for index, cls_name in enumerate(sorting_order)
)