diff options
Diffstat (limited to 'src/hydrilla/proxy/policies/payload.py')
-rw-r--r-- | src/hydrilla/proxy/policies/payload.py | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/policies/payload.py b/src/hydrilla/proxy/policies/payload.py new file mode 100644 index 0000000..3660eac --- /dev/null +++ b/src/hydrilla/proxy/policies/payload.py @@ -0,0 +1,271 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Policies for applying payload injections to HTTP requests. +# +# This file is part of Hydrilla&Haketilo. +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use of this +# code in a proprietary program, I am not going to enforce this in +# court. + +""" +..... +""" + +import dataclasses as dc +import typing as t + +from urllib.parse import urlencode + +from itsdangerous.url_safe import URLSafeSerializer +import bs4 # type: ignore + +from ...exceptions import HaketiloException +from ...url_patterns import ParsedUrl +from .. import csp +from .. import state +from .. import http_messages +from . import base + +@dc.dataclass(frozen=True) # type: ignore[misc] +class PayloadAwarePolicy(base.Policy): + """....""" + payload_data: state.PayloadData + + def _assets_base_url(self, url: ParsedUrl) -> str: + token = self.payload_data.unique_token + + base_path_segments = (*self.payload_data.pattern_path_segments, token) + + return f'{url.url_without_path}/{"/".join(base_path_segments)}/' + + def _payload_details_to_signed_query_string( + self, + _salt: str, + **extra_keys: str + ) -> str: + params: t.Mapping[str, str] = { + 'payload_id': self.payload_data.ref.id, + **extra_keys + } + + serializer = URLSafeSerializer(self.payload_data.global_secret, _salt) + + return urlencode({'details': serializer.dumps(params)}) + + +@dc.dataclass(frozen=True) # type: ignore[misc] +class PayloadAwarePolicyFactory(base.PolicyFactory): + """....""" + payload_key: state.PayloadKey + + @property + def payload_ref(self) -> state.PayloadRef: + """....""" + return self.payload_key.ref + + def __lt__(self, other: base.PolicyFactory) -> bool: + """....""" + if isinstance(other, type(self)): + return self.payload_key < other.payload_key + + return super().__lt__(other) + + +def block_attr(element: bs4.PageElement, attr_name: str) -> None: + """ + Disable HTML node attributes by prepending `blocked-'. This allows them to + still be relatively easily accessed in case they contain some useful data. + """ + blocked_value = element.attrs.pop(attr_name, None) + + while blocked_value is not None: + attr_name = f'blocked-{attr_name}' + next_blocked_value = element.attrs.pop(attr_name, None) + element.attrs[attr_name] = blocked_value + + blocked_value = next_blocked_value + +@dc.dataclass(frozen=True) +class PayloadInjectPolicy(PayloadAwarePolicy): + _process_response = base.MsgProcessOpt.MUST + + priority = base.PolicyPriority._TWO + + @property + def current_popup_settings(self) -> state.PopupSettings: + return self.haketilo_settings.default_popup_payloadon + + def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Sequence[str]: + return ['script-src'] + + def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Mapping[str, t.Sequence[str]]: + allowed_origins = [self._assets_base_url(http_info.request_info.url)] + + if self.payload_data.eval_allowed: + allowed_origins.append("'unsafe-eval'") + + return { + 'script-src': allowed_origins, + 'script-src-elem': ["'none'"], + 'script-src-attr': ["'none'"] + } + + def _script_urls(self, url: ParsedUrl) -> t.Iterable[str]: + base_url = self._assets_base_url(url) + payload_ref = self.payload_data.ref + + yield base_url + 'api/page_init_script.js' + + for path in payload_ref.get_script_paths(): + yield base_url + '/'.join(('static', *path)) + + def _modify_response_document( + self, + http_info: http_messages.FullHTTPInfo, + encoding: t.Optional[str] + ) -> t.Union[bytes, str]: + markup = super()._modify_response_document(http_info, encoding) + if isinstance(markup, str): + encoding = None + + soup = bs4.BeautifulSoup( + markup = markup, + from_encoding = encoding, + features = 'html5lib' + ) + + # Inject scripts. + script_parent = soup.find('body') or soup.find('html') + if script_parent is None: + return http_info.response_info.body + + for script_url in self._script_urls(http_info.request_info.url): + tag = bs4.Tag(name='script', attrs={'src': script_url}) + script_parent.append(tag) + + # Remove Content Security Policy that could possibly block injected + # scripts. + for meta in soup.select('head meta[http-equiv]'): + header_name = meta.attrs.get('http-equiv', '').lower().strip() + if header_name in csp.enforce_header_names: + block_attr(meta, 'http-equiv') + block_attr(meta, 'content') + + return soup.decode() + + def make_info_page(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Optional[str]: + return self._get_info_template('payload_info.html.jinja').render( + url = http_info.request_info.url.orig_url, + payload_data = self.payload_data + ) + + +class _PayloadHasProblemsError(HaketiloException): + pass + +class AutoPayloadInjectPolicy(PayloadInjectPolicy): + priority = base.PolicyPriority._ONE + + def consume_response(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Optional[http_messages.ResponseInfo]: + try: + if self.payload_data.ref.has_problems(): + raise _PayloadHasProblemsError() + + self.payload_data.ref.ensure_items_installed() + + return super().consume_response(http_info) + except (state.RepoCommunicationError, state.FileInstallationError, + _PayloadHasProblemsError) as ex: + extra_params: dict[str, str] = { + 'next_url': http_info.response_info.url.orig_url + } + if isinstance(ex, state.FileInstallationError): + extra_params['repo_id'] = ex.repo_id + extra_params['file_sha256'] = ex.sha256 + + query = self._payload_details_to_signed_query_string( + _salt = 'auto_install_error', + **extra_params + ) + + redirect_url = 'https://hkt.mitm.it/auto_install_error?' + query + msg = 'Error occured when installing payload. Redirecting.' + + return http_messages.ResponseInfo.make( + status_code = 303, + headers = [('Location', redirect_url)], + body = msg.encode() + ) + + +@dc.dataclass(frozen=True) +class PayloadSuggestPolicy(PayloadAwarePolicy): + _process_request = base.MsgProcessOpt.MUST + _process_response = base.MsgProcessOpt.MUST_NOT + + priority = base.PolicyPriority._ONE + + def consume_request(self, request_info: http_messages.RequestInfo) \ + -> http_messages.ResponseInfo: + query = self._payload_details_to_signed_query_string( + _salt = 'package_suggestion', + next_url = request_info.url.orig_url + ) + + redirect_url = 'https://hkt.mitm.it/package_suggestion?' + query + msg = 'A package was found that could be used on this site. Redirecting.' + + return http_messages.ResponseInfo.make( + status_code = 303, + headers = [('Location', redirect_url)], + body = msg.encode() + ) + + +@dc.dataclass(frozen=True, unsafe_hash=True) +class PayloadPolicyFactory(PayloadAwarePolicyFactory): + """....""" + def make_policy(self, haketilo_state: state.HaketiloState) \ + -> t.Optional[base.Policy]: + haketilo_settings = haketilo_state.get_settings() + + try: + payload_data = self.payload_ref.get_data() + except: + return None + + if payload_data.explicitly_enabled: + return PayloadInjectPolicy(haketilo_settings, payload_data) + + mode = haketilo_settings.mapping_use_mode + + if mode == state.MappingUseMode.QUESTION: + return PayloadSuggestPolicy(haketilo_settings, payload_data) + + if mode == state.MappingUseMode.WHEN_ENABLED: + return None + + # mode == state.MappingUseMode.AUTO + return AutoPayloadInjectPolicy(haketilo_settings, payload_data) |