aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/proxy/policies/payload.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/hydrilla/proxy/policies/payload.py')
-rw-r--r--src/hydrilla/proxy/policies/payload.py271
1 files changed, 271 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/policies/payload.py b/src/hydrilla/proxy/policies/payload.py
new file mode 100644
index 0000000..3660eac
--- /dev/null
+++ b/src/hydrilla/proxy/policies/payload.py
@@ -0,0 +1,271 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Policies for applying payload injections to HTTP requests.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use of this
+# code in a proprietary program, I am not going to enforce this in
+# court.
+
+"""
+.....
+"""
+
+import dataclasses as dc
+import typing as t
+
+from urllib.parse import urlencode
+
+from itsdangerous.url_safe import URLSafeSerializer
+import bs4 # type: ignore
+
+from ...exceptions import HaketiloException
+from ...url_patterns import ParsedUrl
+from .. import csp
+from .. import state
+from .. import http_messages
+from . import base
+
+@dc.dataclass(frozen=True) # type: ignore[misc]
+class PayloadAwarePolicy(base.Policy):
+ """...."""
+ payload_data: state.PayloadData
+
+ def _assets_base_url(self, url: ParsedUrl) -> str:
+ token = self.payload_data.unique_token
+
+ base_path_segments = (*self.payload_data.pattern_path_segments, token)
+
+ return f'{url.url_without_path}/{"/".join(base_path_segments)}/'
+
+ def _payload_details_to_signed_query_string(
+ self,
+ _salt: str,
+ **extra_keys: str
+ ) -> str:
+ params: t.Mapping[str, str] = {
+ 'payload_id': self.payload_data.ref.id,
+ **extra_keys
+ }
+
+ serializer = URLSafeSerializer(self.payload_data.global_secret, _salt)
+
+ return urlencode({'details': serializer.dumps(params)})
+
+
+@dc.dataclass(frozen=True) # type: ignore[misc]
+class PayloadAwarePolicyFactory(base.PolicyFactory):
+ """...."""
+ payload_key: state.PayloadKey
+
+ @property
+ def payload_ref(self) -> state.PayloadRef:
+ """...."""
+ return self.payload_key.ref
+
+ def __lt__(self, other: base.PolicyFactory) -> bool:
+ """...."""
+ if isinstance(other, type(self)):
+ return self.payload_key < other.payload_key
+
+ return super().__lt__(other)
+
+
+def block_attr(element: bs4.PageElement, attr_name: str) -> None:
+ """
+ Disable HTML node attributes by prepending `blocked-'. This allows them to
+ still be relatively easily accessed in case they contain some useful data.
+ """
+ blocked_value = element.attrs.pop(attr_name, None)
+
+ while blocked_value is not None:
+ attr_name = f'blocked-{attr_name}'
+ next_blocked_value = element.attrs.pop(attr_name, None)
+ element.attrs[attr_name] = blocked_value
+
+ blocked_value = next_blocked_value
+
+@dc.dataclass(frozen=True)
+class PayloadInjectPolicy(PayloadAwarePolicy):
+ _process_response = base.MsgProcessOpt.MUST
+
+ priority = base.PolicyPriority._TWO
+
+ @property
+ def current_popup_settings(self) -> state.PopupSettings:
+ return self.haketilo_settings.default_popup_payloadon
+
+ def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Sequence[str]:
+ return ['script-src']
+
+ def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Mapping[str, t.Sequence[str]]:
+ allowed_origins = [self._assets_base_url(http_info.request_info.url)]
+
+ if self.payload_data.eval_allowed:
+ allowed_origins.append("'unsafe-eval'")
+
+ return {
+ 'script-src': allowed_origins,
+ 'script-src-elem': ["'none'"],
+ 'script-src-attr': ["'none'"]
+ }
+
+ def _script_urls(self, url: ParsedUrl) -> t.Iterable[str]:
+ base_url = self._assets_base_url(url)
+ payload_ref = self.payload_data.ref
+
+ yield base_url + 'api/page_init_script.js'
+
+ for path in payload_ref.get_script_paths():
+ yield base_url + '/'.join(('static', *path))
+
+ def _modify_response_document(
+ self,
+ http_info: http_messages.FullHTTPInfo,
+ encoding: t.Optional[str]
+ ) -> t.Union[bytes, str]:
+ markup = super()._modify_response_document(http_info, encoding)
+ if isinstance(markup, str):
+ encoding = None
+
+ soup = bs4.BeautifulSoup(
+ markup = markup,
+ from_encoding = encoding,
+ features = 'html5lib'
+ )
+
+ # Inject scripts.
+ script_parent = soup.find('body') or soup.find('html')
+ if script_parent is None:
+ return http_info.response_info.body
+
+ for script_url in self._script_urls(http_info.request_info.url):
+ tag = bs4.Tag(name='script', attrs={'src': script_url})
+ script_parent.append(tag)
+
+ # Remove Content Security Policy that could possibly block injected
+ # scripts.
+ for meta in soup.select('head meta[http-equiv]'):
+ header_name = meta.attrs.get('http-equiv', '').lower().strip()
+ if header_name in csp.enforce_header_names:
+ block_attr(meta, 'http-equiv')
+ block_attr(meta, 'content')
+
+ return soup.decode()
+
+ def make_info_page(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Optional[str]:
+ return self._get_info_template('payload_info.html.jinja').render(
+ url = http_info.request_info.url.orig_url,
+ payload_data = self.payload_data
+ )
+
+
+class _PayloadHasProblemsError(HaketiloException):
+ pass
+
+class AutoPayloadInjectPolicy(PayloadInjectPolicy):
+ priority = base.PolicyPriority._ONE
+
+ def consume_response(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Optional[http_messages.ResponseInfo]:
+ try:
+ if self.payload_data.ref.has_problems():
+ raise _PayloadHasProblemsError()
+
+ self.payload_data.ref.ensure_items_installed()
+
+ return super().consume_response(http_info)
+ except (state.RepoCommunicationError, state.FileInstallationError,
+ _PayloadHasProblemsError) as ex:
+ extra_params: dict[str, str] = {
+ 'next_url': http_info.response_info.url.orig_url
+ }
+ if isinstance(ex, state.FileInstallationError):
+ extra_params['repo_id'] = ex.repo_id
+ extra_params['file_sha256'] = ex.sha256
+
+ query = self._payload_details_to_signed_query_string(
+ _salt = 'auto_install_error',
+ **extra_params
+ )
+
+ redirect_url = 'https://hkt.mitm.it/auto_install_error?' + query
+ msg = 'Error occured when installing payload. Redirecting.'
+
+ return http_messages.ResponseInfo.make(
+ status_code = 303,
+ headers = [('Location', redirect_url)],
+ body = msg.encode()
+ )
+
+
+@dc.dataclass(frozen=True)
+class PayloadSuggestPolicy(PayloadAwarePolicy):
+ _process_request = base.MsgProcessOpt.MUST
+ _process_response = base.MsgProcessOpt.MUST_NOT
+
+ priority = base.PolicyPriority._ONE
+
+ def consume_request(self, request_info: http_messages.RequestInfo) \
+ -> http_messages.ResponseInfo:
+ query = self._payload_details_to_signed_query_string(
+ _salt = 'package_suggestion',
+ next_url = request_info.url.orig_url
+ )
+
+ redirect_url = 'https://hkt.mitm.it/package_suggestion?' + query
+ msg = 'A package was found that could be used on this site. Redirecting.'
+
+ return http_messages.ResponseInfo.make(
+ status_code = 303,
+ headers = [('Location', redirect_url)],
+ body = msg.encode()
+ )
+
+
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class PayloadPolicyFactory(PayloadAwarePolicyFactory):
+ """...."""
+ def make_policy(self, haketilo_state: state.HaketiloState) \
+ -> t.Optional[base.Policy]:
+ haketilo_settings = haketilo_state.get_settings()
+
+ try:
+ payload_data = self.payload_ref.get_data()
+ except:
+ return None
+
+ if payload_data.explicitly_enabled:
+ return PayloadInjectPolicy(haketilo_settings, payload_data)
+
+ mode = haketilo_settings.mapping_use_mode
+
+ if mode == state.MappingUseMode.QUESTION:
+ return PayloadSuggestPolicy(haketilo_settings, payload_data)
+
+ if mode == state.MappingUseMode.WHEN_ENABLED:
+ return None
+
+ # mode == state.MappingUseMode.AUTO
+ return AutoPayloadInjectPolicy(haketilo_settings, payload_data)