aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/proxy/policies/payload.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/hydrilla/proxy/policies/payload.py')
-rw-r--r--src/hydrilla/proxy/policies/payload.py129
1 files changed, 27 insertions, 102 deletions
diff --git a/src/hydrilla/proxy/policies/payload.py b/src/hydrilla/proxy/policies/payload.py
index b89a1c1..8aaf845 100644
--- a/src/hydrilla/proxy/policies/payload.py
+++ b/src/hydrilla/proxy/policies/payload.py
@@ -49,8 +49,7 @@ class PayloadAwarePolicy(base.Policy):
"""...."""
payload_data: state.PayloadData
- def assets_base_url(self, request_url: ParsedUrl):
- """...."""
+ def _assets_base_url(self, request_url: ParsedUrl):
token = self.payload_data.unique_token
base_path_segments = (*self.payload_data.pattern_path_segments, token)
@@ -90,13 +89,6 @@ class PayloadAwarePolicyFactory(base.PolicyFactory):
return super().__lt__(other)
-UTF8_BOM = b'\xEF\xBB\xBF'
-BOMs = (
- (UTF8_BOM, 'utf-8'),
- (b'\xFE\xFF', 'utf-16be'),
- (b'\xFF\xFE', 'utf-16le')
-)
-
def block_attr(element: bs4.PageElement, attr_name: str) -> None:
"""
Disable HTML node attributes by prepending `blocked-'. This allows them to
@@ -118,37 +110,25 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO
- def _new_csp(self, request_url: ParsedUrl) -> str:
- """...."""
- assets_base = self.assets_base_url(request_url)
+ def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Sequence[str]:
+ return ['script-src']
- script_src = f"script-src {assets_base}"
+ def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Mapping[str, t.Sequence[str]]:
+ allowed_origins = [self._assets_base_url(http_info.request_info.url)]
if self.payload_data.eval_allowed:
- script_src = f"{script_src} 'unsafe-eval'"
-
- return '; '.join((
- script_src,
- "script-src-elem 'none'",
- "script-src-attr 'none'"
- ))
-
- def _modify_headers(self, response_info: http_messages.ResponseInfo) \
- -> http_messages.IHeaders:
- new_headers = []
-
- for key, val in response_info.headers.items():
- if key.lower() not in csp.header_names_and_dispositions:
- new_headers.append((key, val))
-
- new_csp = self._new_csp(response_info.url)
- new_headers.append(('Content-Security-Policy', new_csp))
+ allowed_origins.append("'unsafe-eval'")
- return http_messages.make_headers(new_headers)
+ return {
+ 'script-src': allowed_origins,
+ 'script-src-elem': ["'none'"],
+ 'script-src-attr': ["'none'"]
+ }
def _script_urls(self, url: ParsedUrl) -> t.Iterable[str]:
- """...."""
- base_url = self.assets_base_url(url)
+ base_url = self._assets_base_url(url)
payload_ref = self.payload_data.ref
yield base_url + 'api/page_init_script.js'
@@ -156,15 +136,13 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
for path in payload_ref.get_script_paths():
yield base_url + '/'.join(('static', *path))
- def _modify_body(
+ def _modify_response_document(
self,
- url: ParsedUrl,
- body: bytes,
- encoding: t.Optional[str]
- ) -> bytes:
- """...."""
+ http_info: http_messages.FullHTTPInfo,
+ encoding: t.Optional[str]
+ ) -> t.Union[bytes, str]:
soup = bs4.BeautifulSoup(
- markup = body,
+ markup = http_info.response_info.body,
from_encoding = encoding,
features = 'html5lib'
)
@@ -172,9 +150,9 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
# Inject scripts.
script_parent = soup.find('body') or soup.find('html')
if script_parent is None:
- return body
+ return http_info.response_info.body
- for script_url in self._script_urls(url):
+ for script_url in self._script_urls(http_info.request_info.url):
tag = bs4.Tag(name='script', attrs={'src': script_url})
script_parent.append(tag)
@@ -182,61 +160,11 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
# scripts.
for meta in soup.select('head meta[http-equiv]'):
header_name = meta.attrs.get('http-equiv', '').lower().strip()
- if header_name in csp.enforce_header_names_set:
+ if header_name in csp.enforce_header_names:
block_attr(meta, 'http-equiv')
block_attr(meta, 'content')
- # Appending a three-byte Byte Order Mark (BOM) will force the browser to
- # decode this as UTF-8 regardless of the 'Content-Type' header. See:
- # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
- return UTF8_BOM + soup.encode()
-
- def _consume_response_unsafe(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> http_messages.ResponseInfo:
- new_headers = self._modify_headers(response_info)
- new_response = dc.replace(response_info, headers=new_headers)
-
- if not http_messages.is_likely_a_page(request_info, response_info):
- return new_response
-
- data = response_info.body
-
- _, encoding = response_info.deduce_content_type()
-
- # A UTF BOM overrides encoding specified by the header.
- for bom, encoding_name in BOMs:
- if data.startswith(bom):
- encoding = encoding_name
-
- new_data = self._modify_body(response_info.url, data, encoding)
-
- return dc.replace(new_response, body=new_data)
-
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> http_messages.ResponseInfo:
- try:
- return self._consume_response_unsafe(request_info, response_info)
- except Exception as e:
- # TODO: actually describe the errors
- import traceback
-
- error_info_list = traceback.format_exception(
- type(e),
- e,
- e.__traceback__
- )
-
- return http_messages.ResponseInfo.make(
- status_code = 500,
- headers = (('Content-Type', 'text/plain; charset=utf-8'),),
- body = '\n'.join(error_info_list).encode()
- )
+ return soup.decode()
class _PayloadHasProblemsError(HaketiloException):
@@ -246,22 +174,19 @@ class AutoPayloadInjectPolicy(PayloadInjectPolicy):
"""...."""
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> http_messages.ResponseInfo:
+ def consume_response(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Optional[http_messages.ResponseInfo]:
try:
if self.payload_data.ref.has_problems():
raise _PayloadHasProblemsError()
self.payload_data.ref.ensure_items_installed()
- return super().consume_response(request_info, response_info)
+ return super().consume_response(http_info)
except (state.RepoCommunicationError, state.FileInstallationError,
_PayloadHasProblemsError) as ex:
extra_params: dict[str, str] = {
- 'next_url': response_info.url.orig_url
+ 'next_url': http_info.response_info.url.orig_url
}
if isinstance(ex, state.FileInstallationError):
extra_params['repo_id'] = ex.repo_id