# SPDX-License-Identifier: GPL-3.0-or-later
# Policies for applying payload injections to HTTP requests.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use this code
# in a proprietary program, I am not going to enforce this in court.
"""
.....
"""
# Enable using with Python 3.7.
from __future__ import annotations
import dataclasses as dc
import typing as t
import re
from urllib.parse import urlencode
from itsdangerous.url_safe import URLSafeSerializer
import bs4 # type: ignore
from ...exceptions import HaketiloException
from ...url_patterns import ParsedUrl
from .. import csp
from .. import state
from .. import http_messages
from . import base
@dc.dataclass(frozen=True) # type: ignore[misc]
class PayloadAwarePolicy(base.Policy):
"""...."""
payload_data: state.PayloadData
def assets_base_url(self, request_url: ParsedUrl):
"""...."""
token = self.payload_data.unique_token
base_path_segments = (*self.payload_data.pattern_path_segments, token)
return f'{request_url.url_without_path}/{"/".join(base_path_segments)}/'
def _payload_details_to_signed_query_string(
self,
_salt: str,
**extra_keys: str
) -> str:
params: t.Mapping[str, str] = {
'payload_id': self.payload_data.ref.id,
**extra_keys
}
serializer = URLSafeSerializer(self.payload_data.global_secret, _salt)
return urlencode({'details': serializer.dumps(params)})
@dc.dataclass(frozen=True) # type: ignore[misc]
class PayloadAwarePolicyFactory(base.PolicyFactory):
"""...."""
payload_key: state.PayloadKey
@property
def payload_ref(self) -> state.PayloadRef:
"""...."""
return self.payload_key.ref
def __lt__(self, other: base.PolicyFactory) -> bool:
"""...."""
if isinstance(other, type(self)):
return self.payload_key < other.payload_key
return super().__lt__(other)
# For details of 'Content-Type' header's structure, see:
# https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.1
content_type_reg = re.compile(r'''
^
(?P[\w-]+/[\w-]+)
\s*
(?:
;
(?:[^;]*;)* # match possible parameter other than "charset"
)
\s*
charset= # no whitespace allowed in parameter as per RFC
(?P
[\w-]+
|
"[\w-]+" # quotes are optional per RFC
)
(?:;[^;]+)* # match possible parameter other than "charset"
$ # forbid possible dangling characters after closing '"'
''', re.VERBOSE | re.IGNORECASE)
def deduce_content_type(headers: http_messages.IHeaders) \
-> tuple[t.Optional[str], t.Optional[str]]:
"""...."""
content_type = headers.get('content-type')
if content_type is None:
return (None, None)
match = content_type_reg.match(content_type)
if match is None:
return (None, None)
mime, encoding = match.group('mime'), match.group('encoding')
if encoding is not None:
encoding = encoding.lower()
return mime, encoding
UTF8_BOM = b'\xEF\xBB\xBF'
BOMs = (
(UTF8_BOM, 'utf-8'),
(b'\xFE\xFF', 'utf-16be'),
(b'\xFF\xFE', 'utf-16le')
)
def block_attr(element: bs4.PageElement, attr_name: str) -> None:
"""
Disable HTML node attributes by prepending `blocked-'. This allows them to
still be relatively easily accessed in case they contain some useful data.
"""
blocked_value = element.attrs.pop(attr_name, None)
while blocked_value is not None:
attr_name = f'blocked-{attr_name}'
next_blocked_value = element.attrs.pop(attr_name, None)
element.attrs[attr_name] = blocked_value
blocked_value = next_blocked_value
@dc.dataclass(frozen=True)
class PayloadInjectPolicy(PayloadAwarePolicy):
"""...."""
process_response: t.ClassVar[bool] = True
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO
def _new_csp(self, request_url: ParsedUrl) -> str:
"""...."""
assets_base = self.assets_base_url(request_url)
script_src = f"script-src {assets_base}"
if self.payload_data.eval_allowed:
script_src = f"{script_src} 'unsafe-eval'"
return '; '.join((
script_src,
"script-src-elem 'none'",
"script-src-attr 'none'"
))
def _modify_headers(self, response_info: http_messages.ResponseInfo) \
-> t.Iterable[tuple[bytes, bytes]]:
"""...."""
for header_name, header_value in response_info.headers.items():
if header_name.lower() not in csp.header_names_and_dispositions:
yield header_name.encode(), header_value.encode()
new_csp = self._new_csp(response_info.url)
yield b'Content-Security-Policy', new_csp.encode()
def _script_urls(self, url: ParsedUrl) -> t.Iterable[str]:
"""...."""
base_url = self.assets_base_url(url)
payload_ref = self.payload_data.ref
for path in payload_ref.get_script_paths():
yield base_url + '/'.join(('static', *path))
def _modify_body(
self,
url: ParsedUrl,
body: bytes,
encoding: t.Optional[str]
) -> bytes:
"""...."""
soup = bs4.BeautifulSoup(
markup = body,
from_encoding = encoding,
features = 'html5lib'
)
# Inject scripts.
script_parent = soup.find('body') or soup.find('html')
if script_parent is None:
return body
for script_url in self._script_urls(url):
tag = bs4.Tag(name='script', attrs={'src': script_url})
script_parent.append(tag)
# Remove Content Security Policy that could possibly block injected
# scripts.
for meta in soup.select('head meta[http-equiv]'):
header_name = meta.attrs.get('http-equiv', '').lower().strip()
if header_name in csp.enforce_header_names_set:
block_attr(meta, 'http-equiv')
block_attr(meta, 'content')
# Appending a three-byte Byte Order Mark (BOM) will force the browser to
# decode this as UTF-8 regardless of the 'Content-Type' header. See:
# https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
return UTF8_BOM + soup.encode()
def _consume_response_unsafe(
self,
response_info: http_messages.ResponseInfo
) -> http_messages.ProducedResponse:
"""...."""
new_response = response_info.make_produced_response()
new_headers = self._modify_headers(response_info)
new_response = dc.replace(new_response, headers=new_headers)
mime, encoding = deduce_content_type(response_info.headers)
if mime is None or 'html' not in mime.lower():
return new_response
data = response_info.body
if data is None:
data = b''
# A UTF BOM overrides encoding specified by the header.
for bom, encoding_name in BOMs:
if data.startswith(bom):
encoding = encoding_name
new_data = self._modify_body(response_info.url, data, encoding)
return dc.replace(new_response, body=new_data)
def consume_response(self, response_info: http_messages.ResponseInfo) \
-> http_messages.ProducedResponse:
"""...."""
try:
return self._consume_response_unsafe(response_info)
except Exception as e:
# TODO: actually describe the errors
import traceback
error_info_list = traceback.format_exception(
type(e),
e,
e.__traceback__
)
return http_messages.ProducedResponse(
500,
((b'Content-Type', b'text/plain; charset=utf-8'),),
'\n'.join(error_info_list).encode()
)
class _PayloadHasProblemsError(HaketiloException):
pass
class AutoPayloadInjectPolicy(PayloadInjectPolicy):
"""...."""
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE
def consume_response(self, response_info: http_messages.ResponseInfo) \
-> http_messages.ProducedResponse:
try:
if self.payload_data.ref.has_problems():
raise _PayloadHasProblemsError()
self.payload_data.ref.ensure_items_installed()
return super().consume_response(response_info)
except (state.RepoCommunicationError, state.FileInstallationError,
_PayloadHasProblemsError) as ex:
extra_params: dict[str, str] = {
'next_url': response_info.url.orig_url
}
if isinstance(ex, state.FileInstallationError):
extra_params['repo_id'] = ex.repo_id
extra_params['file_sha256'] = ex.sha256
query = self._payload_details_to_signed_query_string(
_salt = 'auto_install_error',
**extra_params
)
redirect_url = 'https://hkt.mitm.it/auto_install_error?' + query
msg = 'Error occured when installing payload. Redirecting.'
return http_messages.ProducedResponse(
status_code = 303,
headers = [(b'Location', redirect_url.encode())],
body = msg.encode()
)
@dc.dataclass(frozen=True)
class PayloadSuggestPolicy(PayloadAwarePolicy):
"""...."""
process_request: t.ClassVar[bool] = True
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE
def consume_request(self, request_info: http_messages.RequestInfo) \
-> http_messages.ProducedResponse:
query = self._payload_details_to_signed_query_string(
_salt = 'package_suggestion',
next_url = request_info.url.orig_url
)
redirect_url = 'https://hkt.mitm.it/package_suggestion?' + query
msg = 'A package was found that could be used on this site. Redirecting.'
return http_messages.ProducedResponse(
status_code = 303,
headers = [(b'Location', redirect_url.encode())],
body = msg.encode()
)
@dc.dataclass(frozen=True, unsafe_hash=True)
class PayloadPolicyFactory(PayloadAwarePolicyFactory):
"""...."""
def make_policy(self, haketilo_state: state.HaketiloState) \
-> t.Optional[base.Policy]:
"""...."""
try:
payload_data = self.payload_ref.get_data()
except:
return None
if payload_data.explicitly_enabled:
return PayloadInjectPolicy(payload_data)
mode = haketilo_state.get_settings().mapping_use_mode
if mode == state.MappingUseMode.QUESTION:
return PayloadSuggestPolicy(payload_data)
if mode == state.MappingUseMode.WHEN_ENABLED:
return None
# mode == state.MappingUseMode.AUTO
return AutoPayloadInjectPolicy(payload_data)