aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/proxy/policies
diff options
context:
space:
mode:
authorWojtek Kosior <koszko@koszko.org>2022-07-27 15:56:24 +0200
committerWojtek Kosior <koszko@koszko.org>2022-08-10 17:25:05 +0200
commit879c41927171efc8d77d1de2739b18e2eb57580f (patch)
treede0e78afe2ea49e58c9bf2c662657392a00139ee /src/hydrilla/proxy/policies
parent52d12a4fa124daa1595529e3e7008276a7986d95 (diff)
downloadhaketilo-hydrilla-879c41927171efc8d77d1de2739b18e2eb57580f.tar.gz
haketilo-hydrilla-879c41927171efc8d77d1de2739b18e2eb57580f.zip
unfinished partial work
Diffstat (limited to 'src/hydrilla/proxy/policies')
-rw-r--r--src/hydrilla/proxy/policies/__init__.py15
-rw-r--r--src/hydrilla/proxy/policies/base.py177
-rw-r--r--src/hydrilla/proxy/policies/fallback.py60
-rw-r--r--src/hydrilla/proxy/policies/payload.py315
-rw-r--r--src/hydrilla/proxy/policies/payload_resource.py160
-rw-r--r--src/hydrilla/proxy/policies/rule.py134
6 files changed, 861 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/policies/__init__.py b/src/hydrilla/proxy/policies/__init__.py
new file mode 100644
index 0000000..66e07ee
--- /dev/null
+++ b/src/hydrilla/proxy/policies/__init__.py
@@ -0,0 +1,15 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+from .base import *
+
+from .payload import PayloadPolicyFactory
+
+from .payload_resource import PayloadResourcePolicyFactory
+
+from .rule import RuleBlockPolicyFactory, RuleAllowPolicyFactory
+
+from .fallback import FallbackAllowPolicy, FallbackBlockPolicy, ErrorBlockPolicy
diff --git a/src/hydrilla/proxy/policies/base.py b/src/hydrilla/proxy/policies/base.py
new file mode 100644
index 0000000..3bde6f2
--- /dev/null
+++ b/src/hydrilla/proxy/policies/base.py
@@ -0,0 +1,177 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Base defintions for policies for altering HTTP requests.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+.....
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import sys
+
+if sys.version_info >= (3, 8):
+ from typing import Protocol
+else:
+ from typing_extensions import Protocol
+
+import dataclasses as dc
+import typing as t
+import enum
+
+from abc import ABC, abstractmethod
+
+from immutables import Map
+
+from ...url_patterns import ParsedUrl
+from .. import state
+
+
+class PolicyPriority(int, enum.Enum):
+ """...."""
+ _ONE = 1
+ _TWO = 2
+ _THREE = 3
+
+DefaultGetValue = t.TypeVar('DefaultGetValue', object, None)
+
+class IHeaders(Protocol):
+ """...."""
+ def __getitem__(self, key: str) -> str: ...
+
+ def get_all(self, key: str) -> t.Iterable[str]: ...
+
+ def get(self, key: str, default: DefaultGetValue = None) \
+ -> t.Union[str, DefaultGetValue]: ...
+
+ def items(self) -> t.Iterable[tuple[str, str]]: ...
+
+def encode_headers_items(headers: t.Iterable[tuple[str, str]]) \
+ -> t.Iterable[tuple[bytes, bytes]]:
+ """...."""
+ for name, value in headers:
+ yield name.encode(), value.encode()
+
+@dc.dataclass(frozen=True)
+class ProducedRequest:
+ """...."""
+ url: str
+ method: str
+ headers: t.Iterable[tuple[bytes, bytes]]
+ body: bytes
+
+@dc.dataclass(frozen=True)
+class RequestInfo:
+ """...."""
+ url: ParsedUrl
+ method: str
+ headers: IHeaders
+ body: bytes
+
+ def make_produced_request(self) -> ProducedRequest:
+ """...."""
+ return ProducedRequest(
+ url = self.url.orig_url,
+ method = self.method,
+ headers = encode_headers_items(self.headers.items()),
+ body = self.body
+ )
+
+@dc.dataclass(frozen=True)
+class ProducedResponse:
+ """...."""
+ status_code: int
+ headers: t.Iterable[tuple[bytes, bytes]]
+ body: bytes
+
+@dc.dataclass(frozen=True)
+class ResponseInfo:
+ """...."""
+ url: ParsedUrl
+ status_code: int
+ headers: IHeaders
+ body: bytes
+
+ def make_produced_response(self) -> ProducedResponse:
+ """...."""
+ return ProducedResponse(
+ status_code = self.status_code,
+ headers = encode_headers_items(self.headers.items()),
+ body = self.body
+ )
+
+class Policy(ABC):
+ """...."""
+ process_request: t.ClassVar[bool] = False
+ process_response: t.ClassVar[bool] = False
+
+ priority: t.ClassVar[PolicyPriority]
+
+ @property
+ def anticache(self) -> bool:
+ return self.process_request or self.process_response
+
+ def consume_request(self, request_info: RequestInfo) \
+ -> t.Optional[t.Union[ProducedRequest, ProducedResponse]]:
+ """...."""
+ return None
+
+ def consume_response(self, response_info: ResponseInfo) \
+ -> t.Optional[ProducedResponse]:
+ """...."""
+ return None
+
+
+# mypy needs to be corrected:
+# https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704
+@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc]
+class PolicyFactory(ABC):
+ """...."""
+ builtin: bool
+
+ @abstractmethod
+ def make_policy(self, haketilo_state: state.HaketiloState) \
+ -> t.Optional[Policy]:
+ """...."""
+ ...
+
+ def __lt__(self, other: 'PolicyFactory'):
+ """...."""
+ return sorting_keys.get(self.__class__.__name__, 999) < \
+ sorting_keys.get(other.__class__.__name__, 999)
+
+sorting_order = (
+ 'PayloadResourcePolicyFactory',
+
+ 'PayloadPolicyFactory',
+
+ 'RuleBlockPolicyFactory',
+ 'RuleAllowPolicyFactory',
+
+ 'FallbackPolicyFactory'
+)
+
+sorting_keys = Map((cls, name) for name, cls in enumerate(sorting_order))
diff --git a/src/hydrilla/proxy/policies/fallback.py b/src/hydrilla/proxy/policies/fallback.py
new file mode 100644
index 0000000..75da61c
--- /dev/null
+++ b/src/hydrilla/proxy/policies/fallback.py
@@ -0,0 +1,60 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Policies for blocking and allowing JS when no other policies match.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+.....
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import dataclasses as dc
+import typing as t
+import enum
+
+from abc import ABC, abstractmethod
+
+from .. import state
+from . import base
+from .rule import AllowPolicy, BlockPolicy
+
+
+class FallbackAllowPolicy(AllowPolicy):
+ """....."""
+ priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE
+
+
+class FallbackBlockPolicy(BlockPolicy):
+ """...."""
+ priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE
+
+
+@dc.dataclass(frozen=True)
+class ErrorBlockPolicy(BlockPolicy):
+ """...."""
+ error: Exception
+
+ builtin: bool = True
diff --git a/src/hydrilla/proxy/policies/payload.py b/src/hydrilla/proxy/policies/payload.py
new file mode 100644
index 0000000..d616f1b
--- /dev/null
+++ b/src/hydrilla/proxy/policies/payload.py
@@ -0,0 +1,315 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Policies for applying payload injections to HTTP requests.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+.....
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import dataclasses as dc
+import typing as t
+import re
+
+import bs4 # type: ignore
+
+from ...url_patterns import ParsedUrl
+from .. import state
+from .. import csp
+from . import base
+
+@dc.dataclass(frozen=True) # type: ignore[misc]
+class PayloadAwarePolicy(base.Policy):
+ """...."""
+ haketilo_state: state.HaketiloState
+ payload_data: state.PayloadData
+
+ def assets_base_url(self, request_url: ParsedUrl):
+ """...."""
+ token = self.payload_data.unique_token
+
+ base_path_segments = (*self.payload_data.pattern.path_segments, token)
+
+ return f'{request_url.url_without_path}/{"/".join(base_path_segments)}/'
+
+
+@dc.dataclass(frozen=True) # type: ignore[misc]
+class PayloadAwarePolicyFactory(base.PolicyFactory):
+ """...."""
+ payload_key: state.PayloadKey
+
+ @property
+ def payload_ref(self) -> state.PayloadRef:
+ """...."""
+ return self.payload_key.payload_ref
+
+ def __lt__(self, other: base.PolicyFactory) -> bool:
+ """...."""
+ if isinstance(other, type(self)):
+ return self.payload_key < other.payload_key
+
+ return super().__lt__(other)
+
+
+# For details of 'Content-Type' header's structure, see:
+# https://datatracker.ietf.org/doc/html/rfc7231#section-3.1.1.1
+content_type_reg = re.compile(r'''
+^
+(?P<mime>[\w-]+/[\w-]+)
+\s*
+(?:
+ ;
+ (?:[^;]*;)* # match possible parameter other than "charset"
+)
+\s*
+charset= # no whitespace allowed in parameter as per RFC
+(?P<encoding>
+ [\w-]+
+ |
+ "[\w-]+" # quotes are optional per RFC
+)
+(?:;[^;]+)* # match possible parameter other than "charset"
+$ # forbid possible dangling characters after closing '"'
+''', re.VERBOSE | re.IGNORECASE)
+
+def deduce_content_type(headers: base.IHeaders) \
+ -> tuple[t.Optional[str], t.Optional[str]]:
+ """...."""
+ content_type = headers.get('content-type')
+ if content_type is None:
+ return (None, None)
+
+ match = content_type_reg.match(content_type)
+ if match is None:
+ return (None, None)
+
+ mime, encoding = match.group('mime'), match.group('encoding')
+
+ if encoding is not None:
+ encoding = encoding.lower()
+
+ return mime, encoding
+
+UTF8_BOM = b'\xEF\xBB\xBF'
+BOMs = (
+ (UTF8_BOM, 'utf-8'),
+ (b'\xFE\xFF', 'utf-16be'),
+ (b'\xFF\xFE', 'utf-16le')
+)
+
+def block_attr(element: bs4.PageElement, attr_name: str) -> None:
+ """
+ Disable HTML node attributes by prepending `blocked-'. This allows them to
+ still be relatively easily accessed in case they contain some useful data.
+ """
+ blocked_value = element.attrs.pop(attr_name, None)
+
+ while blocked_value is not None:
+ attr_name = f'blocked-{attr_name}'
+ next_blocked_value = element.attrs.pop(attr_name, None)
+ element.attrs[attr_name] = blocked_value
+
+ blocked_value = next_blocked_value
+
+@dc.dataclass(frozen=True)
+class PayloadInjectPolicy(PayloadAwarePolicy):
+ """...."""
+ process_response: t.ClassVar[bool] = True
+
+ priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO
+
+ def _new_csp(self, request_url: ParsedUrl) -> str:
+ """...."""
+ assets_base = self.assets_base_url(request_url)
+
+ script_src = f"script-src {assets_base}"
+
+ if self.payload_data.eval_allowed:
+ script_src = f"{script_src} 'unsafe-eval'"
+
+ return '; '.join((
+ script_src,
+ "script-src-elem 'none'",
+ "script-src-attr 'none'"
+ ))
+
+ def _modify_headers(self, response_info: base.ResponseInfo) \
+ -> t.Iterable[tuple[bytes, bytes]]:
+ """...."""
+ for header_name, header_value in response_info.headers.items():
+ if header_name.lower() not in csp.header_names_and_dispositions:
+ yield header_name.encode(), header_value.encode()
+
+ new_csp = self._new_csp(response_info.url)
+
+ yield b'Content-Security-Policy', new_csp.encode()
+
+ def _script_urls(self, url: ParsedUrl) -> t.Iterable[str]:
+ """...."""
+ base_url = self.assets_base_url(url)
+ payload_ref = self.payload_data.payload_ref
+
+ for path in payload_ref.get_script_paths(self.haketilo_state):
+ yield base_url + '/'.join(('static', *path))
+
+ def _modify_body(
+ self,
+ url: ParsedUrl,
+ body: bytes,
+ encoding: t.Optional[str]
+ ) -> bytes:
+ """...."""
+ soup = bs4.BeautifulSoup(
+ markup = body,
+ from_encoding = encoding,
+ features = 'html5lib'
+ )
+
+ # Inject scripts.
+ script_parent = soup.find('body') or soup.find('html')
+ if script_parent is None:
+ return body
+
+ for script_url in self._script_urls(url):
+ tag = bs4.Tag(name='script', attrs={'src': script_url})
+ script_parent.append(tag)
+
+ # Remove Content Security Policy that could possibly block injected
+ # scripts.
+ for meta in soup.select('head meta[http-equiv]'):
+ header_name = meta.attrs.get('http-equiv', '').lower().strip()
+ if header_name in csp.enforce_header_names_set:
+ block_attr(meta, 'http-equiv')
+ block_attr(meta, 'content')
+
+ # Appending a three-byte Byte Order Mark (BOM) will force the browser to
+ # decode this as UTF-8 regardless of the 'Content-Type' header. See:
+ # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
+ return UTF8_BOM + soup.encode()
+
+ def _consume_response_unsafe(self, response_info: base.ResponseInfo) \
+ -> base.ProducedResponse:
+ """...."""
+ new_response = response_info.make_produced_response()
+
+ new_headers = self._modify_headers(response_info)
+
+ new_response = dc.replace(new_response, headers=new_headers)
+
+ mime, encoding = deduce_content_type(response_info.headers)
+ if mime is None or 'html' not in mime.lower():
+ return new_response
+
+ data = response_info.body
+ if data is None:
+ data = b''
+
+ # A UTF BOM overrides encoding specified by the header.
+ for bom, encoding_name in BOMs:
+ if data.startswith(bom):
+ encoding = encoding_name
+
+ new_data = self._modify_body(response_info.url, data, encoding)
+
+ return dc.replace(new_response, body=new_data)
+
+ def consume_response(self, response_info: base.ResponseInfo) \
+ -> base.ProducedResponse:
+ """...."""
+ try:
+ return self._consume_response_unsafe(response_info)
+ except Exception as e:
+ # TODO: actually describe the errors
+ import traceback
+
+ error_info_list = traceback.format_exception(
+ type(e),
+ e,
+ e.__traceback__
+ )
+
+ return base.ProducedResponse(
+ 500,
+ ((b'Content-Type', b'text/plain; charset=utf-8'),),
+ '\n'.join(error_info_list).encode()
+ )
+
+
+class AutoPayloadInjectPolicy(PayloadInjectPolicy):
+ """...."""
+ priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE
+
+ def _modify_body(
+ self,
+ url: ParsedUrl,
+ body: bytes,
+ encoding: t.Optional[str]
+ ) -> bytes:
+ """...."""
+ payload_ref = self.payload_data.payload_ref
+ mapping_ref = payload_ref.get_mapping(self.haketilo_state)
+ mapping_ref.enable(self.haketilo_state)
+
+ return super()._modify_body(url, body, encoding)
+
+
+@dc.dataclass(frozen=True)
+class PayloadSuggestPolicy(PayloadAwarePolicy):
+ """...."""
+ priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE
+
+ def make_response(self, request_info: base.RequestInfo) \
+ -> base.ProducedResponse:
+ """...."""
+ # TODO: implement
+ return base.ProducedResponse(200, ((b'a', b'b'),), b'')
+
+
+@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc]
+class PayloadPolicyFactory(PayloadAwarePolicyFactory):
+ """...."""
+ def make_policy(self, haketilo_state: state.HaketiloState) \
+ -> t.Optional[base.Policy]:
+ """...."""
+ try:
+ payload_data = self.payload_ref.get_data(haketilo_state)
+ except:
+ return None
+
+ if payload_data.explicitly_enabled:
+ return PayloadInjectPolicy(haketilo_state, payload_data)
+
+ mode = haketilo_state.get_settings().mapping_use_mode
+
+ if mode == state.MappingUseMode.QUESTION:
+ return PayloadSuggestPolicy(haketilo_state, payload_data)
+
+ if mode == state.MappingUseMode.WHEN_ENABLED:
+ return None
+
+ # mode == state.MappingUseMode.AUTO
+ return AutoPayloadInjectPolicy(haketilo_state, payload_data)
diff --git a/src/hydrilla/proxy/policies/payload_resource.py b/src/hydrilla/proxy/policies/payload_resource.py
new file mode 100644
index 0000000..84d0919
--- /dev/null
+++ b/src/hydrilla/proxy/policies/payload_resource.py
@@ -0,0 +1,160 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Policies for resolving HTTP requests with local resources.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+.....
+
+We make file resources available to HTTP clients by mapping them
+at:
+ http(s)://<pattern-matching_origin>/<pattern_path>/<token>/
+where <token> is a per-session secret unique for every mapping.
+For example, a payload with pattern like the following:
+ http*://***.example.com/a/b/**
+Could cause resources to be mapped (among others) at each of:
+ https://example.com/a/b/**/Da2uiF2UGfg/
+ https://www.example.com/a/b/**/Da2uiF2UGfg/
+ http://gnome.vs.kde.example.com/a/b/**/Da2uiF2UGfg/
+
+Unauthorized web pages running in the user's browser are exected to be
+unable to guess the secret. This way we stop them from spying on the
+user and from interfering with Haketilo's normal operation.
+
+This is only a soft prevention method. With some mechanisms
+(e.g. service workers), under certain scenarios, it might be possible
+to bypass it. Thus, to make the risk slightly smaller, we also block
+the unauthorized accesses that we can detect.
+
+Since a web page authorized to access the resources may only be served
+when the corresponding mapping is enabled (or AUTO mode is on), we
+consider accesses to non-enabled mappings' resources a security breach
+and block them by responding with 403 Not Found.
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import dataclasses as dc
+import typing as t
+
+from ...translations import smart_gettext as _
+from .. import state
+from . import base
+from .payload import PayloadAwarePolicy, PayloadAwarePolicyFactory
+
+
+@dc.dataclass(frozen=True)
+class PayloadResourcePolicy(PayloadAwarePolicy):
+ """...."""
+ process_request: t.ClassVar[bool] = True
+
+ priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._THREE
+
+ def _make_file_resource_response(self, path: tuple[str, ...]) \
+ -> base.ProducedResponse:
+ """...."""
+ try:
+ file_data = self.payload_data.payload_ref.get_file_data(
+ self.haketilo_state,
+ path
+ )
+ except state.MissingItemError:
+ return resource_blocked_response
+
+ if file_data is None:
+ return base.ProducedResponse(
+ 404,
+ [(b'Content-Type', b'text/plain; charset=utf-8')],
+ _('api.file_not_found').encode()
+ )
+
+ return base.ProducedResponse(
+ 200,
+ ((b'Content-Type', file_data.type.encode()),),
+ file_data.contents
+ )
+
+ def consume_request(self, request_info: base.RequestInfo) \
+ -> base.ProducedResponse:
+ """...."""
+ # Payload resource pattern has path of the form:
+ # "/some/arbitrary/segments/<per-session_token>/***"
+ #
+ # Corresponding requests shall have path of the form:
+ # "/some/arbitrary/segments/<per-session_token>/actual/resource/path"
+ #
+ # Here we need to extract the "/actual/resource/path" part.
+ segments_to_drop = len(self.payload_data.pattern.path_segments) + 1
+ resource_path = request_info.url.path_segments[segments_to_drop:]
+
+ if resource_path == ():
+ return resource_blocked_response
+ elif resource_path[0] == 'static':
+ return self._make_file_resource_response(resource_path[1:])
+ elif resource_path[0] == 'api':
+ # TODO: implement Haketilo APIs
+ return resource_blocked_response
+ else:
+ return resource_blocked_response
+
+
+resource_blocked_response = base.ProducedResponse(
+ 403,
+ [(b'Content-Type', b'text/plain; charset=utf-8')],
+ _('api.resource_not_enabled_for_access').encode()
+)
+
+@dc.dataclass(frozen=True)
+class BlockedResponsePolicy(base.Policy):
+ """...."""
+ process_request: t.ClassVar[bool] = True
+
+ priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._THREE
+
+ def consume_request(self, request_info: base.RequestInfo) \
+ -> base.ProducedResponse:
+ """...."""
+ return resource_blocked_response
+
+
+@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc]
+class PayloadResourcePolicyFactory(PayloadAwarePolicyFactory):
+ """...."""
+ def make_policy(self, haketilo_state: state.HaketiloState) \
+ -> t.Union[PayloadResourcePolicy, BlockedResponsePolicy]:
+ """...."""
+ try:
+ payload_data = self.payload_ref.get_data(haketilo_state)
+ except state.MissingItemError:
+ return BlockedResponsePolicy()
+
+ if not payload_data.explicitly_enabled and \
+ haketilo_state.get_settings().mapping_use_mode != \
+ state.MappingUseMode.AUTO:
+ return BlockedResponsePolicy()
+
+ return PayloadResourcePolicy(haketilo_state, payload_data)
+
+
diff --git a/src/hydrilla/proxy/policies/rule.py b/src/hydrilla/proxy/policies/rule.py
new file mode 100644
index 0000000..eb70147
--- /dev/null
+++ b/src/hydrilla/proxy/policies/rule.py
@@ -0,0 +1,134 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Policies for blocking and allowing JS in pages fetched with HTTP.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+.....
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import dataclasses as dc
+import typing as t
+
+from ...url_patterns import ParsedPattern
+from .. import csp
+from .. import state
+from . import base
+
+
+class AllowPolicy(base.Policy):
+ """...."""
+ priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO
+
+class BlockPolicy(base.Policy):
+ """...."""
+ process_response: t.ClassVar[bool] = True
+
+ priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO
+
+ def _modify_headers(self, response_info: base.ResponseInfo) \
+ -> t.Iterable[tuple[bytes, bytes]]:
+ """...."""
+ csp_policies = csp.extract(response_info.headers)
+
+ for header_name, header_value in response_info.headers.items():
+ if header_name.lower() not in csp.header_names_and_dispositions:
+ yield header_name.encode(), header_value.encode()
+
+ for policy in csp_policies:
+ if policy.disposition != 'enforce':
+ continue
+
+ directives = policy.directives.mutate()
+ directives.pop('report-to', None)
+ directives.pop('report-uri', None)
+
+ policy = dc.replace(policy, directives=directives.finish())
+
+ yield policy.header_name.encode(), policy.serialize().encode()
+
+ extra_csp = ';'.join((
+ "script-src 'none'",
+ "script-src-elem 'none'",
+ "script-src-attr 'none'"
+ ))
+
+ yield b'Content-Security-Policy', extra_csp.encode()
+
+
+ def consume_response(self, response_info: base.ResponseInfo) \
+ -> base.ProducedResponse:
+ """...."""
+ new_response = response_info.make_produced_response()
+
+ new_headers = self._modify_headers(response_info)
+
+ return dc.replace(new_response, headers=new_headers)
+
+@dc.dataclass(frozen=True)
+class RuleAllowPolicy(AllowPolicy):
+ """...."""
+ pattern: ParsedPattern
+
+
+@dc.dataclass(frozen=True)
+class RuleBlockPolicy(BlockPolicy):
+ """...."""
+ pattern: ParsedPattern
+
+
+@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc]
+class RulePolicyFactory(base.PolicyFactory):
+ """...."""
+ pattern: ParsedPattern
+
+ def __lt__(self, other: base.PolicyFactory) -> bool:
+ """...."""
+ if type(other) is type(self):
+ return super().__lt__(other)
+
+ assert isinstance(other, RulePolicyFactory)
+
+ return self.pattern < other.pattern
+
+
+@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc]
+class RuleBlockPolicyFactory(RulePolicyFactory):
+ """...."""
+ def make_policy(self, haketilo_state: state.HaketiloState) \
+ -> RuleBlockPolicy:
+ """...."""
+ return RuleBlockPolicy(self.pattern)
+
+
+@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc]
+class RuleAllowPolicyFactory(RulePolicyFactory):
+ """...."""
+ def make_policy(self, haketilo_state: state.HaketiloState) \
+ -> RuleAllowPolicy:
+ """...."""
+ return RuleAllowPolicy(self.pattern)