diff options
Diffstat (limited to 'src/hydrilla/proxy/policies/payload.py')
-rw-r--r-- | src/hydrilla/proxy/policies/payload.py | 129 |
1 files changed, 27 insertions, 102 deletions
diff --git a/src/hydrilla/proxy/policies/payload.py b/src/hydrilla/proxy/policies/payload.py index b89a1c1..8aaf845 100644 --- a/src/hydrilla/proxy/policies/payload.py +++ b/src/hydrilla/proxy/policies/payload.py @@ -49,8 +49,7 @@ class PayloadAwarePolicy(base.Policy): """....""" payload_data: state.PayloadData - def assets_base_url(self, request_url: ParsedUrl): - """....""" + def _assets_base_url(self, request_url: ParsedUrl): token = self.payload_data.unique_token base_path_segments = (*self.payload_data.pattern_path_segments, token) @@ -90,13 +89,6 @@ class PayloadAwarePolicyFactory(base.PolicyFactory): return super().__lt__(other) -UTF8_BOM = b'\xEF\xBB\xBF' -BOMs = ( - (UTF8_BOM, 'utf-8'), - (b'\xFE\xFF', 'utf-16be'), - (b'\xFF\xFE', 'utf-16le') -) - def block_attr(element: bs4.PageElement, attr_name: str) -> None: """ Disable HTML node attributes by prepending `blocked-'. This allows them to @@ -118,37 +110,25 @@ class PayloadInjectPolicy(PayloadAwarePolicy): priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO - def _new_csp(self, request_url: ParsedUrl) -> str: - """....""" - assets_base = self.assets_base_url(request_url) + def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Sequence[str]: + return ['script-src'] - script_src = f"script-src {assets_base}" + def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Mapping[str, t.Sequence[str]]: + allowed_origins = [self._assets_base_url(http_info.request_info.url)] if self.payload_data.eval_allowed: - script_src = f"{script_src} 'unsafe-eval'" - - return '; '.join(( - script_src, - "script-src-elem 'none'", - "script-src-attr 'none'" - )) - - def _modify_headers(self, response_info: http_messages.ResponseInfo) \ - -> http_messages.IHeaders: - new_headers = [] - - for key, val in response_info.headers.items(): - if key.lower() not in csp.header_names_and_dispositions: - new_headers.append((key, val)) - - new_csp = self._new_csp(response_info.url) - new_headers.append(('Content-Security-Policy', new_csp)) + allowed_origins.append("'unsafe-eval'") - return http_messages.make_headers(new_headers) + return { + 'script-src': allowed_origins, + 'script-src-elem': ["'none'"], + 'script-src-attr': ["'none'"] + } def _script_urls(self, url: ParsedUrl) -> t.Iterable[str]: - """....""" - base_url = self.assets_base_url(url) + base_url = self._assets_base_url(url) payload_ref = self.payload_data.ref yield base_url + 'api/page_init_script.js' @@ -156,15 +136,13 @@ class PayloadInjectPolicy(PayloadAwarePolicy): for path in payload_ref.get_script_paths(): yield base_url + '/'.join(('static', *path)) - def _modify_body( + def _modify_response_document( self, - url: ParsedUrl, - body: bytes, - encoding: t.Optional[str] - ) -> bytes: - """....""" + http_info: http_messages.FullHTTPInfo, + encoding: t.Optional[str] + ) -> t.Union[bytes, str]: soup = bs4.BeautifulSoup( - markup = body, + markup = http_info.response_info.body, from_encoding = encoding, features = 'html5lib' ) @@ -172,9 +150,9 @@ class PayloadInjectPolicy(PayloadAwarePolicy): # Inject scripts. script_parent = soup.find('body') or soup.find('html') if script_parent is None: - return body + return http_info.response_info.body - for script_url in self._script_urls(url): + for script_url in self._script_urls(http_info.request_info.url): tag = bs4.Tag(name='script', attrs={'src': script_url}) script_parent.append(tag) @@ -182,61 +160,11 @@ class PayloadInjectPolicy(PayloadAwarePolicy): # scripts. for meta in soup.select('head meta[http-equiv]'): header_name = meta.attrs.get('http-equiv', '').lower().strip() - if header_name in csp.enforce_header_names_set: + if header_name in csp.enforce_header_names: block_attr(meta, 'http-equiv') block_attr(meta, 'content') - # Appending a three-byte Byte Order Mark (BOM) will force the browser to - # decode this as UTF-8 regardless of the 'Content-Type' header. See: - # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence - return UTF8_BOM + soup.encode() - - def _consume_response_unsafe( - self, - request_info: http_messages.RequestInfo, - response_info: http_messages.ResponseInfo - ) -> http_messages.ResponseInfo: - new_headers = self._modify_headers(response_info) - new_response = dc.replace(response_info, headers=new_headers) - - if not http_messages.is_likely_a_page(request_info, response_info): - return new_response - - data = response_info.body - - _, encoding = response_info.deduce_content_type() - - # A UTF BOM overrides encoding specified by the header. - for bom, encoding_name in BOMs: - if data.startswith(bom): - encoding = encoding_name - - new_data = self._modify_body(response_info.url, data, encoding) - - return dc.replace(new_response, body=new_data) - - def consume_response( - self, - request_info: http_messages.RequestInfo, - response_info: http_messages.ResponseInfo - ) -> http_messages.ResponseInfo: - try: - return self._consume_response_unsafe(request_info, response_info) - except Exception as e: - # TODO: actually describe the errors - import traceback - - error_info_list = traceback.format_exception( - type(e), - e, - e.__traceback__ - ) - - return http_messages.ResponseInfo.make( - status_code = 500, - headers = (('Content-Type', 'text/plain; charset=utf-8'),), - body = '\n'.join(error_info_list).encode() - ) + return soup.decode() class _PayloadHasProblemsError(HaketiloException): @@ -246,22 +174,19 @@ class AutoPayloadInjectPolicy(PayloadInjectPolicy): """....""" priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE - def consume_response( - self, - request_info: http_messages.RequestInfo, - response_info: http_messages.ResponseInfo - ) -> http_messages.ResponseInfo: + def consume_response(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Optional[http_messages.ResponseInfo]: try: if self.payload_data.ref.has_problems(): raise _PayloadHasProblemsError() self.payload_data.ref.ensure_items_installed() - return super().consume_response(request_info, response_info) + return super().consume_response(http_info) except (state.RepoCommunicationError, state.FileInstallationError, _PayloadHasProblemsError) as ex: extra_params: dict[str, str] = { - 'next_url': response_info.url.orig_url + 'next_url': http_info.response_info.url.orig_url } if isinstance(ex, state.FileInstallationError): extra_params['repo_id'] = ex.repo_id |