# SPDX-License-Identifier: GPL-3.0-or-later

# Policies for applying payload injections to HTTP requests.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this
# code in a proprietary program, I am not going to enforce this in
# court.

"""
Policies for applying payload injections to HTTP requests.
"""

import dataclasses as dc
import typing as t

from urllib.parse import urlencode

from itsdangerous.url_safe import URLSafeSerializer
import bs4 # type: ignore

from ...exceptions import HaketiloException
from ...url_patterns import ParsedUrl
from .. import csp
from .. import state
from .. import http_messages
from . import base

@dc.dataclass(frozen=True) # type: ignore[misc]
class PayloadAwarePolicy(base.Policy):
    """
    Base for policies that operate on one particular payload.

    Provides helpers that derive the payload's assets URL prefix and that
    produce a signed query string carrying the payload's id.
    """
    # Data of the payload this policy is concerned with.
    payload_data:   state.PayloadData

    def _assets_base_url(self, url: ParsedUrl) -> str:
        """
        Return a URL prefix (with a trailing slash) made of `url' stripped of
        its path, the payload's pattern path segments and the payload's unique
        token.
        """
        payload = self.payload_data
        joined_path = '/'.join(
            [*payload.pattern_path_segments, payload.unique_token]
        )

        return f'{url.url_without_path}/{joined_path}/'

    def _payload_details_to_signed_query_string(
            self,
            _salt:        str,
            **extra_keys: str
    ) -> str:
        """
        Serialize the payload's id together with `extra_keys' using a
        `URLSafeSerializer' created from the payload's global secret and
        `_salt'. Return the result url-encoded as the value of a single
        `details' query parameter.
        """
        # 'payload_id' goes first; a same-named extra key would override its
        # value, exactly as in a dict literal with `**extra_keys' unpacked last.
        details: t.Dict[str, str] = {'payload_id': self.payload_data.ref.id}
        details.update(extra_keys)

        signer = URLSafeSerializer(self.payload_data.global_secret, _salt)
        signed_details = signer.dumps(details)

        return urlencode({'details': signed_details})


@dc.dataclass(frozen=True) # type: ignore[misc]
class PayloadAwarePolicyFactory(base.PolicyFactory):
    """
    Base for policy factories that are tied to a payload through its key.
    """
    # Key of the payload this factory is concerned with.
    payload_key: state.PayloadKey

    @property
    def payload_ref(self) -> state.PayloadRef:
        """Payload reference taken from the payload key."""
        key = self.payload_key
        return key.ref

    def __lt__(self, other: base.PolicyFactory) -> bool:
        """
        Compare by payload key when `other' is an instance of this factory's
        own type; otherwise defer to the parent class' comparison.
        """
        if not isinstance(other, type(self)):
            return super().__lt__(other)

        return self.payload_key < other.payload_key


def block_attr(element: bs4.PageElement, attr_name: str) -> None:
    """
    Neutralize an attribute of an HTML node by renaming it with a `blocked-'
    prefix, so its value remains available for inspection.

    If an attribute with the prefixed name already exists, it is in turn
    renamed with one more `blocked-' prefix, and so on, so that no value gets
    overwritten. Nothing happens when the node has no `attr_name' attribute.
    """
    attrs = element.attrs
    name = attr_name
    carried = attrs.pop(name, None)

    while carried is not None:
        name = f'blocked-{name}'
        # Take out whatever already occupies the prefixed name (it becomes the
        # next value to relocate) and store the carried value in its place.
        carried, attrs[name] = attrs.pop(name, None), carried

@dc.dataclass(frozen=True)
class PayloadInjectPolicy(PayloadAwarePolicy):
    """
    Policy that appends the payload's scripts to HTML response documents and
    declares the Content Security Policy changes that go with it.
    """
    # Plain (unannotated) class attributes, so they are not dataclass fields.
    _process_response = base.MsgProcessOpt.MUST

    priority = base.PolicyPriority._TWO

    @property
    def current_popup_settings(self) -> state.PopupSettings:
        """The `default_popup_payloadon' value of Haketilo settings."""
        settings = self.haketilo_settings
        return settings.default_popup_payloadon

    def _csp_to_clear(
            self,
            http_info: http_messages.FullHTTPInfo
    ) -> t.Sequence[str]:
        """Name the CSP directives to be cleared: only `script-src'."""
        return ['script-src']

    def _csp_to_add(
            self,
            http_info: http_messages.FullHTTPInfo
    ) -> t.Mapping[str, t.Sequence[str]]:
        """
        Return CSP directives to add. `script-src' allows the payload's assets
        URL prefix (plus `'unsafe-eval'' when the payload is allowed to use
        eval); `script-src-elem' and `script-src-attr' are set to `'none''.
        """
        assets_origin = self._assets_base_url(http_info.request_info.url)
        eval_sources = ["'unsafe-eval'"] if self.payload_data.eval_allowed \
            else []

        return {
            'script-src': [assets_origin, *eval_sources],
            'script-src-elem': ["'none'"],
            'script-src-attr': ["'none'"]
        }

    def _script_urls(self, url: ParsedUrl) -> t.Iterable[str]:
        """
        Lazily yield URLs of scripts to inject: first the page init script,
        then one URL under `static/' for each script path of the payload.
        """
        prefix = self._assets_base_url(url)

        yield prefix + 'api/page_init_script.js'

        yield from (
            prefix + '/'.join(['static', *script_path])
            for script_path in self.payload_data.ref.get_script_paths()
        )

    def _modify_response_document(
            self,
            http_info: http_messages.FullHTTPInfo,
            encoding:  t.Optional[str]
    ) -> t.Union[bytes, str]:
        """
        Take the document produced by the parent class, append payload's
        `<script>' tags to it and disable `<meta>' tags in `<head>' that carry
        an enforced CSP header.

        Return the re-serialized document, or the original response body if
        the parsed document has neither a `<body>' nor an `<html>' element.
        """
        markup = super()._modify_response_document(http_info, encoding)
        # The encoding hint is only passed along for non-`str' markup.
        source_encoding = None if isinstance(markup, str) else encoding

        soup = bs4.BeautifulSoup(
            markup        = markup,
            from_encoding = source_encoding,
            features      = 'html5lib'
        )

        # Scripts get appended to <body> or, failing that, to <html>.
        script_parent = soup.find('body') or soup.find('html')
        if script_parent is None:
            return http_info.response_info.body

        page_url = http_info.request_info.url
        for src in self._script_urls(page_url):
            script_parent.append(bs4.Tag(name='script', attrs={'src': src}))

        # Disable Content Security Policy delivered through <meta> tags, as it
        # could possibly block the injected scripts.
        for meta in soup.select('head meta[http-equiv]'):
            equiv = meta.attrs.get('http-equiv', '').lower().strip()
            if equiv not in csp.enforce_header_names:
                continue

            for blocked_name in ('http-equiv', 'content'):
                block_attr(meta, blocked_name)

        return soup.decode()

    def make_info_page(
            self,
            http_info: http_messages.FullHTTPInfo
    ) -> t.Optional[str]:
        """
        Render the `payload_info.html.jinja' template with the original
        request URL and this policy's payload data.
        """
        template = self._get_info_template('payload_info.html.jinja')

        return template.render(
            url          = http_info.request_info.url.orig_url,
            payload_data = self.payload_data
        )


class _PayloadHasProblemsError(HaketiloException):
    """
    Internal signal raised when a payload's reference reports problems, so
    that the case is handled by the same `except' clause as installation
    errors.
    """

class AutoPayloadInjectPolicy(PayloadInjectPolicy):
    """
    Payload injection policy that first makes sure the payload's items are
    installed, and redirects to an error page when that is not possible.
    """
    priority = base.PolicyPriority._ONE

    def _install_error_redirect(
            self,
            http_info: http_messages.FullHTTPInfo,
            error:     Exception
    ) -> http_messages.ResponseInfo:
        """
        Build a 303 response redirecting to the `auto_install_error' page.

        The signed query string carries the response's original URL as
        `next_url' and, for file installation errors, also the repository id
        and the SHA256 of the file reported by `error'.
        """
        details: t.Dict[str, str] = {
            'next_url': http_info.response_info.url.orig_url
        }
        if isinstance(error, state.FileInstallationError):
            details['repo_id']     = error.repo_id
            details['file_sha256'] = error.sha256

        signed_query = self._payload_details_to_signed_query_string(
            _salt = 'auto_install_error',
            **details
        )

        location = 'https://hkt.mitm.it/auto_install_error?' + signed_query
        notice = 'Error occured when installing payload. Redirecting.'

        return http_messages.ResponseInfo.make(
            status_code = 303,
            headers     = [('Location', location)],
            body        = notice.encode()
        )

    def consume_response(
            self,
            http_info: http_messages.FullHTTPInfo
    ) -> t.Optional[http_messages.ResponseInfo]:
        """
        Process the response as the parent class does, after ensuring the
        payload's items are installed.

        If the payload has problems, or a repository communication or file
        installation error gets raised along the way, return a redirect to
        the error page instead.
        """
        try:
            if self.payload_data.ref.has_problems():
                raise _PayloadHasProblemsError()

            self.payload_data.ref.ensure_items_installed()

            return super().consume_response(http_info)
        except (state.RepoCommunicationError, state.FileInstallationError,
                _PayloadHasProblemsError) as error:
            return self._install_error_redirect(http_info, error)


@dc.dataclass(frozen=True)
class PayloadSuggestPolicy(PayloadAwarePolicy):
    """
    Policy that answers a request with a redirect to the package suggestion
    page instead of letting the request proceed.
    """
    # Plain (unannotated) class attributes, so they are not dataclass fields.
    _process_request  = base.MsgProcessOpt.MUST
    _process_response = base.MsgProcessOpt.MUST_NOT

    priority = base.PolicyPriority._ONE

    def consume_request(
            self,
            request_info: http_messages.RequestInfo
    ) -> http_messages.ResponseInfo:
        """
        Return a 303 response redirecting to the `package_suggestion' page.

        The signed query string carries the payload's id and the original
        request URL as `next_url'.
        """
        signed_query = self._payload_details_to_signed_query_string(
            _salt    = 'package_suggestion',
            next_url = request_info.url.orig_url
        )

        location = 'https://hkt.mitm.it/package_suggestion?' + signed_query
        notice = \
            'A package was found that could be used on this site. Redirecting.'

        return http_messages.ResponseInfo.make(
            status_code = 303,
            headers     = [('Location', location)],
            body        = notice.encode()
        )


@dc.dataclass(frozen=True, unsafe_hash=True)
class PayloadPolicyFactory(PayloadAwarePolicyFactory):
    """
    Factory that picks the policy to apply for its payload, based on whether
    the payload is explicitly enabled and on the configured mapping use mode.
    """
    def make_policy(self, haketilo_state: state.HaketiloState) \
        -> t.Optional[base.Policy]:
        """
        Return the policy to use for the payload, or `None' if none applies.

        - `None' when the payload's data cannot be obtained.
        - `PayloadInjectPolicy' when the payload is explicitly enabled.
        - Otherwise, depending on the mapping use mode from settings:
          `PayloadSuggestPolicy' for `QUESTION', `None' for `WHEN_ENABLED'
          and `AutoPayloadInjectPolicy' for any other mode (i.e. `AUTO').
        """
        haketilo_settings = haketilo_state.get_settings()

        try:
            payload_data = self.payload_ref.get_data()
        except Exception:
            # Failure to obtain payload data means no payload policy applies.
            # Only `Exception' is caught (rather than using a bare `except')
            # so that `KeyboardInterrupt' and `SystemExit' still propagate.
            return None

        if payload_data.explicitly_enabled:
            return PayloadInjectPolicy(haketilo_settings, payload_data)

        mode = haketilo_settings.mapping_use_mode

        if mode == state.MappingUseMode.QUESTION:
            return PayloadSuggestPolicy(haketilo_settings, payload_data)

        if mode == state.MappingUseMode.WHEN_ENABLED:
            return None

        # mode == state.MappingUseMode.AUTO
        return AutoPayloadInjectPolicy(haketilo_settings, payload_data)