aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/proxy/policies/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/hydrilla/proxy/policies/base.py')
-rw-r--r--src/hydrilla/proxy/policies/base.py363
1 files changed, 363 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/policies/base.py b/src/hydrilla/proxy/policies/base.py
new file mode 100644
index 0000000..967e2c4
--- /dev/null
+++ b/src/hydrilla/proxy/policies/base.py
@@ -0,0 +1,363 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Base defintions for policies for altering HTTP requests.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use of this
+# code in a proprietary program, I am not going to enforce this in
+# court.
+
+"""
+.....
+"""
+
+import enum
+import re
+import threading
+import dataclasses as dc
+import typing as t
+
+from abc import ABC, abstractmethod
+from hashlib import sha256
+from base64 import b64encode
+
+import jinja2
+
+from immutables import Map
+
+from ... import translations
+from ... import url_patterns
+from ... import common_jinja_templates
+from .. import state
+from .. import http_messages
+from .. import csp
+
+
+_info_loader = jinja2.PackageLoader(
+ __package__,
+ package_path = 'info_pages_templates'
+)
+_combined_loader = common_jinja_templates.combine_with_loaders([_info_loader])
+_jinja_info_env = jinja2.Environment(
+ loader = _combined_loader,
+ autoescape = jinja2.select_autoescape(['html.jinja']),
+ lstrip_blocks = True,
+ extensions = ['jinja2.ext.i18n', 'jinja2.ext.do']
+)
+_jinja_info_env.globals['url_patterns'] = url_patterns
+_jinja_info_lock = threading.Lock()
+
+
+_jinja_script_loader = jinja2.PackageLoader(
+ __package__,
+ package_path = 'injectable_scripts'
+)
+_jinja_script_env = jinja2.Environment(
+ loader = _jinja_script_loader,
+ autoescape = False,
+ lstrip_blocks = True,
+ extensions = ['jinja2.ext.do']
+)
+_jinja_script_lock = threading.Lock()
+
+def get_script_template(template_file_name: str) -> jinja2.Template:
+ with _jinja_script_lock:
+ return _jinja_script_env.get_template(template_file_name)
+
+
+response_work_data = threading.local()
+
+def response_nonce() -> str:
+ """
+ When called multiple times during consume_response(), each time returns the
+ same unpredictable string unique to this response. The string is used as a
+ nonce for script elements.
+ """
+ return response_work_data.nonce
+
+
+class PolicyPriority(int, enum.Enum):
+ """...."""
+ _ONE = 1
+ _TWO = 2
+ _THREE = 3
+
+
+class MsgProcessOpt(enum.Enum):
+ """...."""
+ MUST = True
+ MUST_NOT = False
+
+
+MessageInfo = t.Union[
+ http_messages.RequestInfo,
+ http_messages.ResponseInfo
+]
+
+
+# We're doing *very* simple doctype matching for now. If a site wanted, it could
+# trick us into getting this wrong.
+doctype_re = re.compile(r'^\s*<!doctype[^>]*>', re.IGNORECASE)
+
+
+UTF8_BOM = b'\xEF\xBB\xBF'
+BOMs = (
+ (UTF8_BOM, 'utf-8'),
+ (b'\xFE\xFF', 'utf-16be'),
+ (b'\xFF\xFE', 'utf-16le')
+)
+
+
+# mypy needs to be corrected:
+# https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704
+@dc.dataclass(frozen=True) # type: ignore[misc]
+class Policy(ABC):
+ _process_request: t.ClassVar[t.Optional[MsgProcessOpt]] = None
+ _process_response: t.ClassVar[t.Optional[MsgProcessOpt]] = None
+ anticache: t.ClassVar[bool] = True
+
+ priority: t.ClassVar[PolicyPriority]
+
+ haketilo_settings: state.HaketiloGlobalSettings
+
+ @property
+ def current_popup_settings(self) -> state.PopupSettings:
+ return self.haketilo_settings.default_popup_jsallowed
+
+ def should_process_request(
+ self,
+ request_info: http_messages.BodylessRequestInfo
+ ) -> bool:
+ return self._process_request == MsgProcessOpt.MUST
+
+ def should_process_response(
+ self,
+ request_info: http_messages.RequestInfo,
+ response_info: http_messages.AnyResponseInfo
+ ) -> bool:
+ if self._process_response is not None:
+ return self._process_response.value
+
+ return (self.current_popup_settings.popup_enabled and
+ http_messages.is_likely_a_page(request_info, response_info))
+
+ def _get_info_template(self, template_file_name: str) -> jinja2.Template:
+ with _jinja_info_lock:
+ chosen_locale = self.haketilo_settings.locale
+ if chosen_locale not in translations.supported_locales:
+ chosen_locale = None
+
+ if chosen_locale is None:
+ chosen_locale = translations.default_locale
+
+ trans = translations.translation(chosen_locale)
+ _jinja_info_env.install_gettext_translations(trans) # type: ignore
+ return _jinja_info_env.get_template(template_file_name)
+
+
+ def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Union[t.Sequence[str], t.Literal['all']]:
+ return ()
+
+ def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Mapping[str, t.Sequence[str]]:
+ return Map()
+
+ def _csp_to_extend(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Mapping[str, t.Sequence[str]]:
+ if (self.current_popup_settings.popup_enabled and
+ http_info.is_likely_a_page):
+ nonce_source = f"'nonce-{response_nonce()}'"
+ directives = (
+ 'script-src',
+ 'script-src-elem',
+ 'style-src',
+ 'frame-src'
+ )
+ return dict((directive, [nonce_source]) for directive in directives)
+ else:
+ return Map()
+
+ def _modify_response_headers(self, http_info: http_messages.FullHTTPInfo) \
+ -> http_messages.IHeaders:
+ csp_to_clear = self._csp_to_clear(http_info)
+ csp_to_add = self._csp_to_add(http_info)
+ csp_to_extend = self._csp_to_extend(http_info)
+
+ if len(csp_to_clear) + len(csp_to_extend) + len(csp_to_add) == 0:
+ return http_info.response_info.headers
+
+ return csp.modify(
+ headers = http_info.response_info.headers,
+ clear = csp_to_clear,
+ add = csp_to_add,
+ extend = csp_to_extend
+ )
+
+ def _modify_response_document(
+ self,
+ http_info: http_messages.FullHTTPInfo,
+ encoding: t.Optional[str]
+ ) -> t.Union[str, bytes]:
+ popup_settings = self.current_popup_settings
+
+ if popup_settings.popup_enabled:
+ nonce = response_nonce()
+
+ popup_page = self.make_info_page(http_info)
+ if popup_page is None:
+ template = self._get_info_template(
+ 'special_page_info.html.jinja'
+ )
+ popup_page = template.render(
+ url = http_info.request_info.url.orig_url
+ )
+
+ template = get_script_template('popup.js.jinja')
+ popup_script = template.render(
+ popup_page_b64 = b64encode(popup_page.encode()).decode(),
+ nonce_b64 = b64encode(nonce.encode()).decode(),
+ # TODO: add an option to configure popup style in the web UI.
+ # Then start passing the real style value.
+ #popup_style = popup_settings.style.value
+ popup_style = 'D'
+ )
+
+ if encoding is None:
+ encoding = 'utf-8'
+
+ body_bytes = http_info.response_info.body
+ body = body_bytes.decode(encoding, errors='replace')
+
+ match = doctype_re.match(body)
+ doctype_decl_len = 0 if match is None else match.end()
+
+ dotype_decl = body[0:doctype_decl_len]
+ doc_rest = body[doctype_decl_len:]
+ script_tag = f'<script nonce="{nonce}">{popup_script}</script>'
+
+ return dotype_decl + script_tag + doc_rest
+ else:
+ return http_info.response_info.body
+
+ def _modify_response_body(self, http_info: http_messages.FullHTTPInfo) \
+ -> bytes:
+ if not http_info.is_likely_a_page:
+ return http_info.response_info.body
+
+ data = http_info.response_info.body
+
+ _, encoding = http_info.response_info.deduce_content_type()
+
+ # A UTF BOM overrides encoding specified by the header.
+ for bom, encoding_name in BOMs:
+ if data.startswith(bom):
+ encoding = encoding_name
+
+ new_data = self._modify_response_document(http_info, encoding)
+
+ if isinstance(new_data, str):
+ # Appending a three-byte Byte Order Mark (BOM) will force the
+ # browser to decode this as UTF-8 regardless of the 'Content-Type'
+ # header. See
+ # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
+ new_data = UTF8_BOM + new_data.encode()
+
+ return new_data
+
+ def consume_request(self, request_info: http_messages.RequestInfo) \
+ -> t.Optional[MessageInfo]:
+ # We're not using @abstractmethod because not every Policy needs it and
+ # we don't want to force child classes into implementing dummy methods.
+ raise NotImplementedError(
+ 'This kind of policy does not consume requests.'
+ )
+
+ def consume_response(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Optional[http_messages.ResponseInfo]:
+ try:
+ new_headers = self._modify_response_headers(http_info)
+ new_body = self._modify_response_body(http_info)
+ except Exception as e:
+ # In the future we might want to actually describe eventual errors.
+ # For now, we're just printing the stack trace.
+ import traceback
+
+ error_info_list = traceback.format_exception(
+ type(e),
+ e,
+ e.__traceback__
+ )
+
+ return http_messages.ResponseInfo.make(
+ status_code = 500,
+ headers = (('Content-Type', 'text/plain; charset=utf-8'),),
+ body = '\n'.join(error_info_list).encode()
+ )
+
+ if (new_headers is http_info.response_info.headers and
+ new_body is http_info.response_info.body):
+ return None
+
+ return dc.replace(
+ http_info.response_info,
+ headers = new_headers,
+ body = new_body
+ )
+
+ def make_info_page(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Optional[str]:
+ return None
+
+
+@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc]
+class PolicyFactory(ABC):
+ """...."""
+ builtin: bool
+
+ @abstractmethod
+ def make_policy(self, haketilo_state: state.HaketiloState) \
+ -> t.Optional[Policy]:
+ """...."""
+ ...
+
+ def __lt__(self, other: 'PolicyFactory'):
+ """...."""
+ return sorting_keys.get(self.__class__.__name__, 999) < \
+ sorting_keys.get(other.__class__.__name__, 999)
+
+sorting_order = (
+ 'WebUIMainPolicyFactory',
+ 'WebUILandingPolicyFactory',
+
+ 'MitmItPagePolicyFactory',
+
+ 'PayloadResourcePolicyFactory',
+
+ 'PayloadPolicyFactory',
+
+ 'RuleBlockPolicyFactory',
+ 'RuleAllowPolicyFactory',
+
+ 'FallbackPolicyFactory'
+)
+
+sorting_keys = Map((cls, name) for name, cls in enumerate(sorting_order))