diff options
author | Wojtek Kosior <koszko@koszko.org> | 2022-07-27 15:56:24 +0200 |
---|---|---|
committer | Wojtek Kosior <koszko@koszko.org> | 2022-08-10 17:25:05 +0200 |
commit | 879c41927171efc8d77d1de2739b18e2eb57580f (patch) | |
tree | de0e78afe2ea49e58c9bf2c662657392a00139ee /src/hydrilla/proxy/policies | |
parent | 52d12a4fa124daa1595529e3e7008276a7986d95 (diff) | |
download | haketilo-hydrilla-879c41927171efc8d77d1de2739b18e2eb57580f.tar.gz haketilo-hydrilla-879c41927171efc8d77d1de2739b18e2eb57580f.zip |
unfinished partial work
Diffstat (limited to 'src/hydrilla/proxy/policies')
-rw-r--r-- | src/hydrilla/proxy/policies/__init__.py | 15 | ||||
-rw-r--r-- | src/hydrilla/proxy/policies/base.py | 177 | ||||
-rw-r--r-- | src/hydrilla/proxy/policies/fallback.py | 60 | ||||
-rw-r--r-- | src/hydrilla/proxy/policies/payload.py | 315 | ||||
-rw-r--r-- | src/hydrilla/proxy/policies/payload_resource.py | 160 | ||||
-rw-r--r-- | src/hydrilla/proxy/policies/rule.py | 134 |
6 files changed, 861 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/policies/__init__.py b/src/hydrilla/proxy/policies/__init__.py new file mode 100644 index 0000000..66e07ee --- /dev/null +++ b/src/hydrilla/proxy/policies/__init__.py @@ -0,0 +1,15 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. + +from .base import * + +from .payload import PayloadPolicyFactory + +from .payload_resource import PayloadResourcePolicyFactory + +from .rule import RuleBlockPolicyFactory, RuleAllowPolicyFactory + +from .fallback import FallbackAllowPolicy, FallbackBlockPolicy, ErrorBlockPolicy diff --git a/src/hydrilla/proxy/policies/base.py b/src/hydrilla/proxy/policies/base.py new file mode 100644 index 0000000..3bde6f2 --- /dev/null +++ b/src/hydrilla/proxy/policies/base.py @@ -0,0 +1,177 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Base defintions for policies for altering HTTP requests. +# +# This file is part of Hydrilla&Haketilo. +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +..... +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +import sys + +if sys.version_info >= (3, 8): + from typing import Protocol +else: + from typing_extensions import Protocol + +import dataclasses as dc +import typing as t +import enum + +from abc import ABC, abstractmethod + +from immutables import Map + +from ...url_patterns import ParsedUrl +from .. import state + + +class PolicyPriority(int, enum.Enum): + """....""" + _ONE = 1 + _TWO = 2 + _THREE = 3 + +DefaultGetValue = t.TypeVar('DefaultGetValue', object, None) + +class IHeaders(Protocol): + """....""" + def __getitem__(self, key: str) -> str: ... + + def get_all(self, key: str) -> t.Iterable[str]: ... + + def get(self, key: str, default: DefaultGetValue = None) \ + -> t.Union[str, DefaultGetValue]: ... + + def items(self) -> t.Iterable[tuple[str, str]]: ... + +def encode_headers_items(headers: t.Iterable[tuple[str, str]]) \ + -> t.Iterable[tuple[bytes, bytes]]: + """....""" + for name, value in headers: + yield name.encode(), value.encode() + +@dc.dataclass(frozen=True) +class ProducedRequest: + """....""" + url: str + method: str + headers: t.Iterable[tuple[bytes, bytes]] + body: bytes + +@dc.dataclass(frozen=True) +class RequestInfo: + """....""" + url: ParsedUrl + method: str + headers: IHeaders + body: bytes + + def make_produced_request(self) -> ProducedRequest: + """....""" + return ProducedRequest( + url = self.url.orig_url, + method = self.method, + headers = encode_headers_items(self.headers.items()), + body = self.body + ) + +@dc.dataclass(frozen=True) +class ProducedResponse: + """....""" + status_code: int + headers: t.Iterable[tuple[bytes, bytes]] + body: bytes + +@dc.dataclass(frozen=True) +class ResponseInfo: + """....""" + url: ParsedUrl + status_code: int + headers: IHeaders + body: bytes + + def make_produced_response(self) -> ProducedResponse: + """....""" + return ProducedResponse( + status_code = self.status_code, + headers = encode_headers_items(self.headers.items()), + body = self.body + ) + +class Policy(ABC): + """....""" + process_request: t.ClassVar[bool] = False + process_response: t.ClassVar[bool] = False + + priority: t.ClassVar[PolicyPriority] + + @property + def anticache(self) -> bool: + return self.process_request or self.process_response + + def consume_request(self, request_info: RequestInfo) \ + -> t.Optional[t.Union[ProducedRequest, ProducedResponse]]: + """....""" + return None + + def consume_response(self, response_info: ResponseInfo) \ + -> t.Optional[ProducedResponse]: + """....""" + return None + + +# mypy needs to be corrected: +# https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704 +@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc] +class PolicyFactory(ABC): + """....""" + builtin: bool + + @abstractmethod + def make_policy(self, haketilo_state: state.HaketiloState) \ + -> t.Optional[Policy]: + """....""" + ... + + def __lt__(self, other: 'PolicyFactory'): + """....""" + return sorting_keys.get(self.__class__.__name__, 999) < \ + sorting_keys.get(other.__class__.__name__, 999) + +sorting_order = ( + 'PayloadResourcePolicyFactory', + + 'PayloadPolicyFactory', + + 'RuleBlockPolicyFactory', + 'RuleAllowPolicyFactory', + + 'FallbackPolicyFactory' +) + +sorting_keys = Map((cls, name) for name, cls in enumerate(sorting_order)) diff --git a/src/hydrilla/proxy/policies/fallback.py b/src/hydrilla/proxy/policies/fallback.py new file mode 100644 index 0000000..75da61c --- /dev/null +++ b/src/hydrilla/proxy/policies/fallback.py @@ -0,0 +1,60 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Policies for blocking and allowing JS when no other policies match. +# +# This file is part of Hydrilla&Haketilo. +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +..... +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +import dataclasses as dc +import typing as t +import enum + +from abc import ABC, abstractmethod + +from .. import state +from . import base +from .rule import AllowPolicy, BlockPolicy + + +class FallbackAllowPolicy(AllowPolicy): + """.....""" + priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE + + +class FallbackBlockPolicy(BlockPolicy): + """....""" + priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE + + +@dc.dataclass(frozen=True) +class ErrorBlockPolicy(BlockPolicy): + """....""" + error: Exception + + builtin: bool = True diff --git a/src/hydrilla/proxy/policies/payload.py b/src/hydrilla/proxy/policies/payload.py new file mode 100644 index 0000000..d616f1b --- /dev/null +++ b/src/hydrilla/proxy/policies/payload.py @@ -0,0 +1,315 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Policies for applying payload injections to HTTP requests. +# +# This file is part of Hydrilla&Haketilo. +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +..... +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +import dataclasses as dc +import typing as t +import re + +import bs4 # type: ignore + +from ...url_patterns import ParsedUrl +from .. import state +from .. import csp +from . import base + +@dc.dataclass(frozen=True) # type: ignore[misc] +class PayloadAwarePolicy(base.Policy): + """....""" + haketilo_state: state.HaketiloState + payload_data: state.PayloadData + + def assets_base_url(self, request_url: ParsedUrl): + """....""" + token = self.payload_data.unique_token + + base_path_segments = (*self.payload_data.pattern.path_segments, token) + + return f'{request_url.url_without_path}/{"/".join(base_path_segments)}/' + + +@dc.dataclass(frozen=True) # type: ignore[misc] +class PayloadAwarePolicyFactory(base.PolicyFactory): + """....""" + payload_key: state.PayloadKey + + @property + def payload_ref(self) -> state.PayloadRef: + """....""" + return self.payload_key.payload_ref + + def __lt__(self, other: base.PolicyFactory) -> bool: + """....""" + if isinstance(other, type(self)): + return self.payload_key < other.payload_key + + return super().__lt__(other) + + +# For details of 'Content-Type' header's structure, see: +# https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.1 +content_type_reg = re.compile(r''' +^ +(?P<mime>[\w-]+/[\w-]+) +\s* +(?: + ; + (?:[^;]*;)* # match possible parameter other than "charset" +) +\s* +charset= # no whitespace allowed in parameter as per RFC +(?P<encoding> + [\w-]+ + | + "[\w-]+" # quotes are optional per RFC +) +(?:;[^;]+)* # match possible parameter other than "charset" +$ # forbid possible dangling characters after closing '"' +''', re.VERBOSE | re.IGNORECASE) + +def deduce_content_type(headers: base.IHeaders) \ + -> tuple[t.Optional[str], t.Optional[str]]: + """....""" + content_type = headers.get('content-type') + if content_type is None: + return (None, None) + + match = content_type_reg.match(content_type) + if match is None: + return (None, None) + + mime, encoding = match.group('mime'), match.group('encoding') + + if encoding is not None: + encoding = encoding.lower() + + return mime, encoding + +UTF8_BOM = b'\xEF\xBB\xBF' +BOMs = ( + (UTF8_BOM, 'utf-8'), + (b'\xFE\xFF', 'utf-16be'), + (b'\xFF\xFE', 'utf-16le') +) + +def block_attr(element: bs4.PageElement, attr_name: str) -> None: + """ + Disable HTML node attributes by prepending `blocked-'. This allows them to + still be relatively easily accessed in case they contain some useful data. + """ + blocked_value = element.attrs.pop(attr_name, None) + + while blocked_value is not None: + attr_name = f'blocked-{attr_name}' + next_blocked_value = element.attrs.pop(attr_name, None) + element.attrs[attr_name] = blocked_value + + blocked_value = next_blocked_value + +@dc.dataclass(frozen=True) +class PayloadInjectPolicy(PayloadAwarePolicy): + """....""" + process_response: t.ClassVar[bool] = True + + priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO + + def _new_csp(self, request_url: ParsedUrl) -> str: + """....""" + assets_base = self.assets_base_url(request_url) + + script_src = f"script-src {assets_base}" + + if self.payload_data.eval_allowed: + script_src = f"{script_src} 'unsafe-eval'" + + return '; '.join(( + script_src, + "script-src-elem 'none'", + "script-src-attr 'none'" + )) + + def _modify_headers(self, response_info: base.ResponseInfo) \ + -> t.Iterable[tuple[bytes, bytes]]: + """....""" + for header_name, header_value in response_info.headers.items(): + if header_name.lower() not in csp.header_names_and_dispositions: + yield header_name.encode(), header_value.encode() + + new_csp = self._new_csp(response_info.url) + + yield b'Content-Security-Policy', new_csp.encode() + + def _script_urls(self, url: ParsedUrl) -> t.Iterable[str]: + """....""" + base_url = self.assets_base_url(url) + payload_ref = self.payload_data.payload_ref + + for path in payload_ref.get_script_paths(self.haketilo_state): + yield base_url + '/'.join(('static', *path)) + + def _modify_body( + self, + url: ParsedUrl, + body: bytes, + encoding: t.Optional[str] + ) -> bytes: + """....""" + soup = bs4.BeautifulSoup( + markup = body, + from_encoding = encoding, + features = 'html5lib' + ) + + # Inject scripts. + script_parent = soup.find('body') or soup.find('html') + if script_parent is None: + return body + + for script_url in self._script_urls(url): + tag = bs4.Tag(name='script', attrs={'src': script_url}) + script_parent.append(tag) + + # Remove Content Security Policy that could possibly block injected + # scripts. + for meta in soup.select('head meta[http-equiv]'): + header_name = meta.attrs.get('http-equiv', '').lower().strip() + if header_name in csp.enforce_header_names_set: + block_attr(meta, 'http-equiv') + block_attr(meta, 'content') + + # Appending a three-byte Byte Order Mark (BOM) will force the browser to + # decode this as UTF-8 regardless of the 'Content-Type' header. See: + # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence + return UTF8_BOM + soup.encode() + + def _consume_response_unsafe(self, response_info: base.ResponseInfo) \ + -> base.ProducedResponse: + """....""" + new_response = response_info.make_produced_response() + + new_headers = self._modify_headers(response_info) + + new_response = dc.replace(new_response, headers=new_headers) + + mime, encoding = deduce_content_type(response_info.headers) + if mime is None or 'html' not in mime.lower(): + return new_response + + data = response_info.body + if data is None: + data = b'' + + # A UTF BOM overrides encoding specified by the header. + for bom, encoding_name in BOMs: + if data.startswith(bom): + encoding = encoding_name + + new_data = self._modify_body(response_info.url, data, encoding) + + return dc.replace(new_response, body=new_data) + + def consume_response(self, response_info: base.ResponseInfo) \ + -> base.ProducedResponse: + """....""" + try: + return self._consume_response_unsafe(response_info) + except Exception as e: + # TODO: actually describe the errors + import traceback + + error_info_list = traceback.format_exception( + type(e), + e, + e.__traceback__ + ) + + return base.ProducedResponse( + 500, + ((b'Content-Type', b'text/plain; charset=utf-8'),), + '\n'.join(error_info_list).encode() + ) + + +class AutoPayloadInjectPolicy(PayloadInjectPolicy): + """....""" + priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE + + def _modify_body( + self, + url: ParsedUrl, + body: bytes, + encoding: t.Optional[str] + ) -> bytes: + """....""" + payload_ref = self.payload_data.payload_ref + mapping_ref = payload_ref.get_mapping(self.haketilo_state) + mapping_ref.enable(self.haketilo_state) + + return super()._modify_body(url, body, encoding) + + +@dc.dataclass(frozen=True) +class PayloadSuggestPolicy(PayloadAwarePolicy): + """....""" + priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE + + def make_response(self, request_info: base.RequestInfo) \ + -> base.ProducedResponse: + """....""" + # TODO: implement + return base.ProducedResponse(200, ((b'a', b'b'),), b'') + + +@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc] +class PayloadPolicyFactory(PayloadAwarePolicyFactory): + """....""" + def make_policy(self, haketilo_state: state.HaketiloState) \ + -> t.Optional[base.Policy]: + """....""" + try: + payload_data = self.payload_ref.get_data(haketilo_state) + except: + return None + + if payload_data.explicitly_enabled: + return PayloadInjectPolicy(haketilo_state, payload_data) + + mode = haketilo_state.get_settings().mapping_use_mode + + if mode == state.MappingUseMode.QUESTION: + return PayloadSuggestPolicy(haketilo_state, payload_data) + + if mode == state.MappingUseMode.WHEN_ENABLED: + return None + + # mode == state.MappingUseMode.AUTO + return AutoPayloadInjectPolicy(haketilo_state, payload_data) diff --git a/src/hydrilla/proxy/policies/payload_resource.py b/src/hydrilla/proxy/policies/payload_resource.py new file mode 100644 index 0000000..84d0919 --- /dev/null +++ b/src/hydrilla/proxy/policies/payload_resource.py @@ -0,0 +1,160 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Policies for resolving HTTP requests with local resources. +# +# This file is part of Hydrilla&Haketilo. +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +..... + +We make file resources available to HTTP clients by mapping them +at: + http(s)://<pattern-matching_origin>/<pattern_path>/<token>/ +where <token> is a per-session secret unique for every mapping. +For example, a payload with pattern like the following: + http*://***.example.com/a/b/** +Could cause resources to be mapped (among others) at each of: + https://example.com/a/b/**/Da2uiF2UGfg/ + https://www.example.com/a/b/**/Da2uiF2UGfg/ + http://gnome.vs.kde.example.com/a/b/**/Da2uiF2UGfg/ + +Unauthorized web pages running in the user's browser are exected to be +unable to guess the secret. This way we stop them from spying on the +user and from interfering with Haketilo's normal operation. + +This is only a soft prevention method. With some mechanisms +(e.g. service workers), under certain scenarios, it might be possible +to bypass it. Thus, to make the risk slightly smaller, we also block +the unauthorized accesses that we can detect. + +Since a web page authorized to access the resources may only be served +when the corresponding mapping is enabled (or AUTO mode is on), we +consider accesses to non-enabled mappings' resources a security breach +and block them by responding with 403 Not Found. +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +import dataclasses as dc +import typing as t + +from ...translations import smart_gettext as _ +from .. import state +from . import base +from .payload import PayloadAwarePolicy, PayloadAwarePolicyFactory + + +@dc.dataclass(frozen=True) +class PayloadResourcePolicy(PayloadAwarePolicy): + """....""" + process_request: t.ClassVar[bool] = True + + priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._THREE + + def _make_file_resource_response(self, path: tuple[str, ...]) \ + -> base.ProducedResponse: + """....""" + try: + file_data = self.payload_data.payload_ref.get_file_data( + self.haketilo_state, + path + ) + except state.MissingItemError: + return resource_blocked_response + + if file_data is None: + return base.ProducedResponse( + 404, + [(b'Content-Type', b'text/plain; charset=utf-8')], + _('api.file_not_found').encode() + ) + + return base.ProducedResponse( + 200, + ((b'Content-Type', file_data.type.encode()),), + file_data.contents + ) + + def consume_request(self, request_info: base.RequestInfo) \ + -> base.ProducedResponse: + """....""" + # Payload resource pattern has path of the form: + # "/some/arbitrary/segments/<per-session_token>/***" + # + # Corresponding requests shall have path of the form: + # "/some/arbitrary/segments/<per-session_token>/actual/resource/path" + # + # Here we need to extract the "/actual/resource/path" part. + segments_to_drop = len(self.payload_data.pattern.path_segments) + 1 + resource_path = request_info.url.path_segments[segments_to_drop:] + + if resource_path == (): + return resource_blocked_response + elif resource_path[0] == 'static': + return self._make_file_resource_response(resource_path[1:]) + elif resource_path[0] == 'api': + # TODO: implement Haketilo APIs + return resource_blocked_response + else: + return resource_blocked_response + + +resource_blocked_response = base.ProducedResponse( + 403, + [(b'Content-Type', b'text/plain; charset=utf-8')], + _('api.resource_not_enabled_for_access').encode() +) + +@dc.dataclass(frozen=True) +class BlockedResponsePolicy(base.Policy): + """....""" + process_request: t.ClassVar[bool] = True + + priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._THREE + + def consume_request(self, request_info: base.RequestInfo) \ + -> base.ProducedResponse: + """....""" + return resource_blocked_response + + +@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc] +class PayloadResourcePolicyFactory(PayloadAwarePolicyFactory): + """....""" + def make_policy(self, haketilo_state: state.HaketiloState) \ + -> t.Union[PayloadResourcePolicy, BlockedResponsePolicy]: + """....""" + try: + payload_data = self.payload_ref.get_data(haketilo_state) + except state.MissingItemError: + return BlockedResponsePolicy() + + if not payload_data.explicitly_enabled and \ + haketilo_state.get_settings().mapping_use_mode != \ + state.MappingUseMode.AUTO: + return BlockedResponsePolicy() + + return PayloadResourcePolicy(haketilo_state, payload_data) + + diff --git a/src/hydrilla/proxy/policies/rule.py b/src/hydrilla/proxy/policies/rule.py new file mode 100644 index 0000000..eb70147 --- /dev/null +++ b/src/hydrilla/proxy/policies/rule.py @@ -0,0 +1,134 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Policies for blocking and allowing JS in pages fetched with HTTP. +# +# This file is part of Hydrilla&Haketilo. +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +..... +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +import dataclasses as dc +import typing as t + +from ...url_patterns import ParsedPattern +from .. import csp +from .. import state +from . import base + + +class AllowPolicy(base.Policy): + """....""" + priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO + +class BlockPolicy(base.Policy): + """....""" + process_response: t.ClassVar[bool] = True + + priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO + + def _modify_headers(self, response_info: base.ResponseInfo) \ + -> t.Iterable[tuple[bytes, bytes]]: + """....""" + csp_policies = csp.extract(response_info.headers) + + for header_name, header_value in response_info.headers.items(): + if header_name.lower() not in csp.header_names_and_dispositions: + yield header_name.encode(), header_value.encode() + + for policy in csp_policies: + if policy.disposition != 'enforce': + continue + + directives = policy.directives.mutate() + directives.pop('report-to', None) + directives.pop('report-uri', None) + + policy = dc.replace(policy, directives=directives.finish()) + + yield policy.header_name.encode(), policy.serialize().encode() + + extra_csp = ';'.join(( + "script-src 'none'", + "script-src-elem 'none'", + "script-src-attr 'none'" + )) + + yield b'Content-Security-Policy', extra_csp.encode() + + + def consume_response(self, response_info: base.ResponseInfo) \ + -> base.ProducedResponse: + """....""" + new_response = response_info.make_produced_response() + + new_headers = self._modify_headers(response_info) + + return dc.replace(new_response, headers=new_headers) + +@dc.dataclass(frozen=True) +class RuleAllowPolicy(AllowPolicy): + """....""" + pattern: ParsedPattern + + +@dc.dataclass(frozen=True) +class RuleBlockPolicy(BlockPolicy): + """....""" + pattern: ParsedPattern + + +@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc] +class RulePolicyFactory(base.PolicyFactory): + """....""" + pattern: ParsedPattern + + def __lt__(self, other: base.PolicyFactory) -> bool: + """....""" + if type(other) is type(self): + return super().__lt__(other) + + assert isinstance(other, RulePolicyFactory) + + return self.pattern < other.pattern + + +@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc] +class RuleBlockPolicyFactory(RulePolicyFactory): + """....""" + def make_policy(self, haketilo_state: state.HaketiloState) \ + -> RuleBlockPolicy: + """....""" + return RuleBlockPolicy(self.pattern) + + +@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc] +class RuleAllowPolicyFactory(RulePolicyFactory): + """....""" + def make_policy(self, haketilo_state: state.HaketiloState) \ + -> RuleAllowPolicy: + """....""" + return RuleAllowPolicy(self.pattern) |