diff options
Diffstat (limited to 'src/hydrilla/proxy/policies/base.py')
-rw-r--r-- | src/hydrilla/proxy/policies/base.py | 95 |
1 files changed, 77 insertions, 18 deletions
diff --git a/src/hydrilla/proxy/policies/base.py b/src/hydrilla/proxy/policies/base.py index 7ce5105..1626b5c 100644 --- a/src/hydrilla/proxy/policies/base.py +++ b/src/hydrilla/proxy/policies/base.py @@ -31,10 +31,10 @@ import enum import re +import threading import dataclasses as dc import typing as t -from threading import Lock from abc import ABC, abstractmethod from hashlib import sha256 from base64 import b64encode @@ -43,24 +43,60 @@ import jinja2 from immutables import Map -from ... url_patterns import ParsedUrl +from ...translations import translation as make_translation +from ... import url_patterns +from ... import common_jinja_templates from .. import state from .. import http_messages from .. import csp -loader = jinja2.PackageLoader(__package__, package_path='injectable_scripts') -jinja_env = jinja2.Environment( - loader = loader, +_info_loader = jinja2.PackageLoader( + __package__, + package_path = 'info_pages_templates' +) +_combined_loader = common_jinja_templates.combine_with_loaders([_info_loader]) +_jinja_info_env = jinja2.Environment( + loader = _combined_loader, + autoescape = jinja2.select_autoescape(['html.jinja']), + lstrip_blocks = True, + extensions = ['jinja2.ext.i18n', 'jinja2.ext.do'] +) +_jinja_info_env.install_gettext_translations(make_translation()) # type: ignore +_jinja_info_env.globals['url_patterns'] = url_patterns +_jinja_info_lock = threading.Lock() + +def get_info_template(template_file_name: str) -> jinja2.Template: + with _jinja_info_lock: + return _jinja_info_env.get_template(template_file_name) + + +_jinja_script_loader = jinja2.PackageLoader( + __package__, + package_path = 'injectable_scripts' +) +_jinja_script_env = jinja2.Environment( + loader = _jinja_script_loader, + autoescape = False, lstrip_blocks = True, - autoescape = False + extensions = ['jinja2.ext.do'] ) -jinja_lock = Lock() +_jinja_script_lock = threading.Lock() +def get_script_template(template_file_name: str) -> jinja2.Template: + with _jinja_script_lock: + return _jinja_script_env.get_template(template_file_name) -popup_script = jinja_env.get_template('popup.js.jinja').render() -popup_script_sha256_bytes = sha256(popup_script.encode()).digest() -popup_script_sha256_b64 = b64encode(popup_script_sha256_bytes).decode() + +response_work_data = threading.local() + +def response_nonce() -> str: + """ + When called multiple times during consume_response(), each time returns the + same unpredictable string unique to this response. The string is used as a + nonce for script elements. + """ + return response_work_data.nonce class PolicyPriority(int, enum.Enum): @@ -140,7 +176,9 @@ class Policy(ABC): -> t.Mapping[str, t.Sequence[str]]: if (self.current_popup_settings.popup_enabled and http_info.is_likely_a_page): - return {'script-src': [f"'sha256-{popup_script_sha256_b64}'"]} + nonce_source = f"'nonce-{response_nonce()}'" + directives = ('script-src', 'style-src', 'frame-src') + return dict((directive, [nonce_source]) for directive in directives) else: return Map() @@ -167,8 +205,26 @@ class Policy(ABC): ) -> t.Union[str, bytes]: popup_settings = self.current_popup_settings - if (popup_settings.popup_enabled and - http_info.is_likely_a_page): + if popup_settings.popup_enabled: + nonce = response_nonce() + + popup_page = self.make_info_page(http_info) + if popup_page is None: + template = get_info_template('special_page_info.html.jinja') + popup_page = template.render( + url = http_info.request_info.url.orig_url + ) + + template = get_script_template('popup.js.jinja') + popup_script = template.render( + popup_page_b64 = b64encode(popup_page.encode()).decode(), + nonce_b64 = b64encode(nonce.encode()).decode(), + # TODO: add an option to configure popup style in the web UI. + # Then start passing the real style value. + #popup_style = popup_settings.style.value + popup_style = 'D' + ) + if encoding is None: encoding = 'utf-8' @@ -180,16 +236,15 @@ class Policy(ABC): dotype_decl = body[0:doctype_decl_len] doc_rest = body[doctype_decl_len:] - return f'{dotype_decl}<script>{popup_script}</script>{doc_rest}' + script_tag = f'<script nonce="{nonce}">{popup_script}</script>' + + return dotype_decl + script_tag + doc_rest else: return http_info.response_info.body def _modify_response_body(self, http_info: http_messages.FullHTTPInfo) \ -> bytes: - if not http_messages.is_likely_a_page( - request_info = http_info.request_info, - response_info = http_info.response_info - ): + if not http_info.is_likely_a_page: return http_info.response_info.body data = http_info.response_info.body @@ -252,6 +307,10 @@ class Policy(ABC): body = new_body ) + def make_info_page(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Optional[str]: + return None + @dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc] class PolicyFactory(ABC): |