diff options
author | Wojtek Kosior <koszko@koszko.org> | 2022-10-21 18:29:43 +0200 |
---|---|---|
committer | Wojtek Kosior <koszko@koszko.org> | 2022-10-21 18:29:43 +0200 |
commit | 85a0cacb28b84005d5d034a53973153d49214723 (patch) | |
tree | 3839b70f196d28ef9db21add79be633f3847ffae /src/hydrilla/proxy | |
parent | 3360fff8b25a60cc1f378b80692d2162e70dc142 (diff) | |
download | haketilo-hydrilla-85a0cacb28b84005d5d034a53973153d49214723.tar.gz haketilo-hydrilla-85a0cacb28b84005d5d034a53973153d49214723.zip |
[proxy] rework CSP manipulation
Diffstat (limited to 'src/hydrilla/proxy')
-rw-r--r-- | src/hydrilla/proxy/addon.py | 11 | ||||
-rw-r--r-- | src/hydrilla/proxy/csp.py | 146 | ||||
-rw-r--r-- | src/hydrilla/proxy/http_messages.py | 10 | ||||
-rw-r--r-- | src/hydrilla/proxy/policies/base.py | 114 | ||||
-rw-r--r-- | src/hydrilla/proxy/policies/misc.py | 10 | ||||
-rw-r--r-- | src/hydrilla/proxy/policies/payload.py | 129 | ||||
-rw-r--r-- | src/hydrilla/proxy/policies/payload_resource.py | 32 | ||||
-rw-r--r-- | src/hydrilla/proxy/policies/rule.py | 48 | ||||
-rw-r--r-- | src/hydrilla/proxy/state_impl/base.py | 5 |
9 files changed, 283 insertions, 222 deletions
diff --git a/src/hydrilla/proxy/addon.py b/src/hydrilla/proxy/addon.py index de864fc..d5b0537 100644 --- a/src/hydrilla/proxy/addon.py +++ b/src/hydrilla/proxy/addon.py @@ -105,6 +105,10 @@ class FlowHandling: body = self.flow.response.get_content(strict=False) or b'' return self.bl_response_info.with_body(body) + @property + def full_http_info(self) -> http_messages.FullHTTPInfo: + return http_messages.FullHTTPInfo(self.request_info, self.response_info) + @staticmethod def make( flow: http.HTTPFlow, @@ -234,7 +238,7 @@ class HaketiloAddon: try: parsed_url = url_patterns.parse_url(flow.request.url) except url_patterns.HaketiloURLException as e: - policy = policies.ErrorBlockPolicy(builtin=True, error=e) + policy = policies.ErrorBlockPolicy(error=e) parsed_url = url_patterns.dummy_url else: policy = self.state.select_policy(parsed_url) @@ -338,10 +342,7 @@ class HaketiloAddon: with self.http_safe_event_handling(flow): handling = self.get_flow_handling(flow) - result = handling.policy.consume_response( - request_info = handling.request_info, - response_info = handling.response_info - ) + result = handling.policy.consume_response(handling.full_http_info) if result is not None: headers_bin = result.headers.items_bin() diff --git a/src/hydrilla/proxy/csp.py b/src/hydrilla/proxy/csp.py index 8eb914f..df2f65b 100644 --- a/src/hydrilla/proxy/csp.py +++ b/src/hydrilla/proxy/csp.py @@ -38,33 +38,56 @@ from immutables import Map, MapMutation from . import http_messages -header_names_and_dispositions = ( - ('content-security-policy', 'enforce'), - ('content-security-policy-report-only', 'report'), - ('x-content-security-policy', 'enforce'), - ('x-content-security-policy', 'report'), - ('x-webkit-csp', 'enforce'), - ('x-webkit-csp', 'report') +enforce_header_names = ( + 'content-security-policy', + 'x-content-security-policy', + 'x-webkit-csp' ) -enforce_header_names_set = { - name for name, disposition in header_names_and_dispositions - if disposition == 'enforce' -} +header_names = (*enforce_header_names, 'content-security-policy-report-only') @dc.dataclass class ContentSecurityPolicy: directives: Map[str, t.Sequence[str]] - header_name: str - disposition: str + header_name: str = 'Content-Security-Policy' + disposition: str = 'enforce' - def serialize(self) -> str: - """....""" + def remove(self, directives: t.Sequence[str]) -> 'ContentSecurityPolicy': + mutation = self.directives.mutate() + + for name in directives: + mutation.pop(name, None) + + return dc.replace(self, directives = mutation.finish()) + + def extend(self, directives: t.Mapping[str, t.Sequence[str]]) \ + -> 'ContentSecurityPolicy': + mutation = self.directives.mutate() + + for name, extras in directives.items(): + if name in mutation: + mutation[name] = (*mutation[name], *extras) + + return dc.replace(self, directives = mutation.finish()) + + def serialize(self) -> tuple[str, str]: + """ + Produces (name, value) pair suitable for use as an HTTP header. + + If a deserialized policy is being reserialized, the resulting value is + not guaranteed to be the same as the original one. It shall be merely + semantically equivalent. + """ serialized_directives = [] - for name, value_list in self.directives.items(): - serialized_directives.append(f'{name} {" ".join(value_list)}') + for name, value_seq in self.directives.items(): + if all(val == "'none'" for val in value_seq): + value_seq = ["'none'"] + else: + value_seq = [val for val in value_seq if val != "'none'"] + + serialized_directives.append(f'{name} {" ".join(value_seq)}') - return ';'.join(serialized_directives) + return (self.header_name, ';'.join(serialized_directives)) @staticmethod def deserialize( @@ -72,7 +95,13 @@ class ContentSecurityPolicy: header_name: str, disposition: str = 'enforce' ) -> 'ContentSecurityPolicy': - """....""" + """ + Parses the policy as required by W3C Working Draft. + + Extra whitespace information, invalid/empty directives and the order of + directives are not preserved, only the semantically-relevant information + is. + """ # For more info, see: # https://www.w3.org/TR/CSP3/#parse-serialized-policy empty_directives: Map[str, t.Sequence[str]] = Map() @@ -104,21 +133,64 @@ class ContentSecurityPolicy: disposition = disposition ) -def extract(headers: http_messages.IHeaders) \ - -> tuple[ContentSecurityPolicy, ...]: - """....""" - csp_policies = [] - - for header_name, disposition in header_names_and_dispositions: - for serialized_list in headers.get_all(header_name): - for serialized in serialized_list.split(','): - policy = ContentSecurityPolicy.deserialize( - serialized, - header_name, - disposition - ) - - if policy.directives != Map(): - csp_policies.append(policy) - - return tuple(csp_policies) +# def extract(headers: http_messages.IHeaders) \ +# -> tuple[ContentSecurityPolicy, ...]: +# """....""" +# csp_policies = [] + +# for header_name, disposition in header_names_and_dispositions: +# for serialized_list in headers.get_all(header_name): +# for serialized in serialized_list.split(','): +# policy = ContentSecurityPolicy.deserialize( +# serialized, +# header_name, +# disposition +# ) + +# if policy.directives != Map(): +# csp_policies.append(policy) + +# return tuple(csp_policies) + +def modify( + headers: http_messages.IHeaders, + clear: t.Union[t.Sequence[str], t.Literal['all']] = (), + extend: t.Mapping[str, t.Sequence[str]] = Map(), + add: t.Mapping[str, t.Sequence[str]] = Map(), +) -> http_messages.IHeaders: + """ + This function modifies the CSP Headers. The following actions are performed + *in order* + 1. report-only CSP Headers are removed, + 2. directives with names in `clear` are removed, + 3. directives that could cause CSP reports to be sent are removed, + 4. directives from `add` are added in a separate Content-Security-Policy, + header. + 5. directives from `extend` are merged into the existing directives, + effectively loosening them, + + No measures are yet implemented to prevent fingerprinting when serving HTTP + responses with headers modified by this function. Please use wisely, you + have been warned. + """ + headers_list = [ + (key, val) + for key, val in headers.items() + if key.lower() not in header_names + ] + + if clear != 'all': + for name in header_names: + for serialized_list in headers.get_all(name): + for serialized in serialized_list.split(','): + policy = ContentSecurityPolicy.deserialize(serialized, name) + policy = policy.remove((*clear, 'report-to', 'report-uri')) + policy = policy.extend(extend) + if policy.directives != Map(): + headers_list.append(policy.serialize()) + + if add != Map(): + csp_to_add = ContentSecurityPolicy(Map(add)).extend(extend) + headers_list.append(csp_to_add.serialize()) + + return http_messages.make_headers(headers_list) diff --git a/src/hydrilla/proxy/http_messages.py b/src/hydrilla/proxy/http_messages.py index 1bed103..9aab510 100644 --- a/src/hydrilla/proxy/http_messages.py +++ b/src/hydrilla/proxy/http_messages.py @@ -195,6 +195,8 @@ class RequestInfo(HasHeadersMixin, _BaseRequestInfoFields): ) -> 'RequestInfo': return BodylessRequestInfo.make(url, method, headers).with_body(body) +AnyRequestInfo = t.Union[BodylessRequestInfo, RequestInfo] + @dc.dataclass(frozen=True) class _BaseResponseInfoFields: @@ -230,6 +232,14 @@ class ResponseInfo(HasHeadersMixin, _BaseResponseInfoFields): bl_info = BodylessResponseInfo.make(url, status_code, headers) return bl_info.with_body(body) +AnyResponseInfo = t.Union[BodylessResponseInfo, ResponseInfo] + + +@dc.dataclass(frozen=True) +class FullHTTPInfo: + request_info: RequestInfo + response_info: ResponseInfo + def is_likely_a_page( request_info: t.Union[BodylessRequestInfo, RequestInfo], diff --git a/src/hydrilla/proxy/policies/base.py b/src/hydrilla/proxy/policies/base.py index 8ea792f..7ce8663 100644 --- a/src/hydrilla/proxy/policies/base.py +++ b/src/hydrilla/proxy/policies/base.py @@ -40,6 +40,7 @@ from immutables import Map from ... url_patterns import ParsedUrl from .. import state from .. import http_messages +from .. import csp class PolicyPriority(int, enum.Enum): @@ -53,6 +54,15 @@ MessageInfo = t.Union[ http_messages.ResponseInfo ] + +UTF8_BOM = b'\xEF\xBB\xBF' +BOMs = ( + (UTF8_BOM, 'utf-8'), + (b'\xFE\xFF', 'utf-16be'), + (b'\xFF\xFE', 'utf-16le') +) + + class Policy(ABC): """....""" _process_request: t.ClassVar[bool] = False @@ -70,23 +80,111 @@ class Policy(ABC): def should_process_response( self, request_info: http_messages.RequestInfo, - response_info: http_messages.BodylessResponseInfo + response_info: http_messages.AnyResponseInfo ) -> bool: return self._process_response + def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Union[t.Sequence[str], t.Literal['all']]: + return () + + def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Mapping[str, t.Sequence[str]]: + return Map() + + def _csp_to_extend(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Mapping[str, t.Sequence[str]]: + return Map() + + def _modify_response_headers(self, http_info: http_messages.FullHTTPInfo) \ + -> http_messages.IHeaders: + csp_to_clear = self._csp_to_clear(http_info) + csp_to_add = self._csp_to_add(http_info) + csp_to_extend = self._csp_to_extend(http_info) + + if len(csp_to_clear) + len(csp_to_extend) + len(csp_to_add) == 0: + return http_info.response_info.headers + + return csp.modify( + headers = http_info.response_info.headers, + clear = csp_to_clear, + add = csp_to_add, + extend = csp_to_extend + ) + + def _modify_response_document( + self, + http_info: http_messages.FullHTTPInfo, + encoding: t.Optional[str] + ) -> t.Union[str, bytes]: + return http_info.response_info.body + + def _modify_response_body(self, http_info: http_messages.FullHTTPInfo) \ + -> bytes: + if not http_messages.is_likely_a_page( + request_info = http_info.request_info, + response_info = http_info.response_info + ): + return http_info.response_info.body + + data = http_info.response_info.body + + _, encoding = http_info.response_info.deduce_content_type() + + # A UTF BOM overrides encoding specified by the header. + for bom, encoding_name in BOMs: + if data.startswith(bom): + encoding = encoding_name + + new_data = self._modify_response_document(http_info, encoding) + + if isinstance(new_data, str): + # Appending a three-byte Byte Order Mark (BOM) will force the + # browser to decode this as UTF-8 regardless of the 'Content-Type' + # header. See + # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence + new_data = UTF8_BOM + new_data.encode() + + return new_data + def consume_request(self, request_info: http_messages.RequestInfo) \ -> t.Optional[MessageInfo]: + # We're not using @abstractmethod because not every Policy needs it and + # we don't want to force child classes into implementing dummy methods. raise NotImplementedError( 'This kind of policy does not consume requests.' ) - def consume_response( - self, - request_info: http_messages.RequestInfo, - response_info: http_messages.ResponseInfo - ) -> t.Optional[http_messages.ResponseInfo]: - raise NotImplementedError( - 'This kind of policy does not consume responses.' + def consume_response(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Optional[http_messages.ResponseInfo]: + try: + new_headers = self._modify_response_headers(http_info) + new_body = self._modify_response_body(http_info) + except Exception as e: + # In the future we might want to actually describe eventual errors. + # For now, we're just printing the stack trace. + import traceback + + error_info_list = traceback.format_exception( + type(e), + e, + e.__traceback__ + ) + + return http_messages.ResponseInfo.make( + status_code = 500, + headers = (('Content-Type', 'text/plain; charset=utf-8'),), + body = '\n'.join(error_info_list).encode() + ) + + if (new_headers is http_info.response_info.headers and + new_body is http_info.response_info.body): + return None + + return dc.replace( + http_info.response_info, + headers = new_headers, + body = new_body ) diff --git a/src/hydrilla/proxy/policies/misc.py b/src/hydrilla/proxy/policies/misc.py index 81875a2..acce164 100644 --- a/src/hydrilla/proxy/policies/misc.py +++ b/src/hydrilla/proxy/policies/misc.py @@ -56,8 +56,6 @@ class ErrorBlockPolicy(BlockPolicy): """....""" error: Exception - builtin: bool = True - class MitmItPagePolicy(base.Policy): """ @@ -74,15 +72,9 @@ class MitmItPagePolicy(base.Policy): def consume_request(self, request_info: http_messages.RequestInfo) -> None: return None - def consume_response( - self, - request_info: http_messages.RequestInfo, - response_info: http_messages.ResponseInfo - ) -> None: + def consume_response(self, http_info: http_messages.FullHTTPInfo) -> None: return None - builtin: bool = True - @dc.dataclass(frozen=True, unsafe_hash=True) class MitmItPagePolicyFactory(base.PolicyFactory): builtin: bool = True diff --git a/src/hydrilla/proxy/policies/payload.py b/src/hydrilla/proxy/policies/payload.py index b89a1c1..8aaf845 100644 --- a/src/hydrilla/proxy/policies/payload.py +++ b/src/hydrilla/proxy/policies/payload.py @@ -49,8 +49,7 @@ class PayloadAwarePolicy(base.Policy): """....""" payload_data: state.PayloadData - def assets_base_url(self, request_url: ParsedUrl): - """....""" + def _assets_base_url(self, request_url: ParsedUrl): token = self.payload_data.unique_token base_path_segments = (*self.payload_data.pattern_path_segments, token) @@ -90,13 +89,6 @@ class PayloadAwarePolicyFactory(base.PolicyFactory): return super().__lt__(other) -UTF8_BOM = b'\xEF\xBB\xBF' -BOMs = ( - (UTF8_BOM, 'utf-8'), - (b'\xFE\xFF', 'utf-16be'), - (b'\xFF\xFE', 'utf-16le') -) - def block_attr(element: bs4.PageElement, attr_name: str) -> None: """ Disable HTML node attributes by prepending `blocked-'. This allows them to @@ -118,37 +110,25 @@ class PayloadInjectPolicy(PayloadAwarePolicy): priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO - def _new_csp(self, request_url: ParsedUrl) -> str: - """....""" - assets_base = self.assets_base_url(request_url) + def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Sequence[str]: + return ['script-src'] - script_src = f"script-src {assets_base}" + def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Mapping[str, t.Sequence[str]]: + allowed_origins = [self._assets_base_url(http_info.request_info.url)] if self.payload_data.eval_allowed: - script_src = f"{script_src} 'unsafe-eval'" - - return '; '.join(( - script_src, - "script-src-elem 'none'", - "script-src-attr 'none'" - )) - - def _modify_headers(self, response_info: http_messages.ResponseInfo) \ - -> http_messages.IHeaders: - new_headers = [] - - for key, val in response_info.headers.items(): - if key.lower() not in csp.header_names_and_dispositions: - new_headers.append((key, val)) - - new_csp = self._new_csp(response_info.url) - new_headers.append(('Content-Security-Policy', new_csp)) + allowed_origins.append("'unsafe-eval'") - return http_messages.make_headers(new_headers) + return { + 'script-src': allowed_origins, + 'script-src-elem': ["'none'"], + 'script-src-attr': ["'none'"] + } def _script_urls(self, url: ParsedUrl) -> t.Iterable[str]: - """....""" - base_url = self.assets_base_url(url) + base_url = self._assets_base_url(url) payload_ref = self.payload_data.ref yield base_url + 'api/page_init_script.js' @@ -156,15 +136,13 @@ class PayloadInjectPolicy(PayloadAwarePolicy): for path in payload_ref.get_script_paths(): yield base_url + '/'.join(('static', *path)) - def _modify_body( + def _modify_response_document( self, - url: ParsedUrl, - body: bytes, - encoding: t.Optional[str] - ) -> bytes: - """....""" + http_info: http_messages.FullHTTPInfo, + encoding: t.Optional[str] + ) -> t.Union[bytes, str]: soup = bs4.BeautifulSoup( - markup = body, + markup = http_info.response_info.body, from_encoding = encoding, features = 'html5lib' ) @@ -172,9 +150,9 @@ class PayloadInjectPolicy(PayloadAwarePolicy): # Inject scripts. script_parent = soup.find('body') or soup.find('html') if script_parent is None: - return body + return http_info.response_info.body - for script_url in self._script_urls(url): + for script_url in self._script_urls(http_info.request_info.url): tag = bs4.Tag(name='script', attrs={'src': script_url}) script_parent.append(tag) @@ -182,61 +160,11 @@ class PayloadInjectPolicy(PayloadAwarePolicy): # scripts. for meta in soup.select('head meta[http-equiv]'): header_name = meta.attrs.get('http-equiv', '').lower().strip() - if header_name in csp.enforce_header_names_set: + if header_name in csp.enforce_header_names: block_attr(meta, 'http-equiv') block_attr(meta, 'content') - # Appending a three-byte Byte Order Mark (BOM) will force the browser to - # decode this as UTF-8 regardless of the 'Content-Type' header. See: - # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence - return UTF8_BOM + soup.encode() - - def _consume_response_unsafe( - self, - request_info: http_messages.RequestInfo, - response_info: http_messages.ResponseInfo - ) -> http_messages.ResponseInfo: - new_headers = self._modify_headers(response_info) - new_response = dc.replace(response_info, headers=new_headers) - - if not http_messages.is_likely_a_page(request_info, response_info): - return new_response - - data = response_info.body - - _, encoding = response_info.deduce_content_type() - - # A UTF BOM overrides encoding specified by the header. - for bom, encoding_name in BOMs: - if data.startswith(bom): - encoding = encoding_name - - new_data = self._modify_body(response_info.url, data, encoding) - - return dc.replace(new_response, body=new_data) - - def consume_response( - self, - request_info: http_messages.RequestInfo, - response_info: http_messages.ResponseInfo - ) -> http_messages.ResponseInfo: - try: - return self._consume_response_unsafe(request_info, response_info) - except Exception as e: - # TODO: actually describe the errors - import traceback - - error_info_list = traceback.format_exception( - type(e), - e, - e.__traceback__ - ) - - return http_messages.ResponseInfo.make( - status_code = 500, - headers = (('Content-Type', 'text/plain; charset=utf-8'),), - body = '\n'.join(error_info_list).encode() - ) + return soup.decode() class _PayloadHasProblemsError(HaketiloException): @@ -246,22 +174,19 @@ class AutoPayloadInjectPolicy(PayloadInjectPolicy): """....""" priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE - def consume_response( - self, - request_info: http_messages.RequestInfo, - response_info: http_messages.ResponseInfo - ) -> http_messages.ResponseInfo: + def consume_response(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Optional[http_messages.ResponseInfo]: try: if self.payload_data.ref.has_problems(): raise _PayloadHasProblemsError() self.payload_data.ref.ensure_items_installed() - return super().consume_response(request_info, response_info) + return super().consume_response(http_info) except (state.RepoCommunicationError, state.FileInstallationError, _PayloadHasProblemsError) as ex: extra_params: dict[str, str] = { - 'next_url': response_info.url.orig_url + 'next_url': http_info.response_info.url.orig_url } if isinstance(ex, state.FileInstallationError): extra_params['repo_id'] = ex.repo_id diff --git a/src/hydrilla/proxy/policies/payload_resource.py b/src/hydrilla/proxy/policies/payload_resource.py index 04a148c..6695ce1 100644 --- a/src/hydrilla/proxy/policies/payload_resource.py +++ b/src/hydrilla/proxy/policies/payload_resource.py @@ -245,7 +245,7 @@ class PayloadResourcePolicy(PayloadAwarePolicy): def should_process_response( self, request_info: http_messages.RequestInfo, - response_info: http_messages.BodylessResponseInfo + response_info: http_messages.AnyResponseInfo ) -> bool: return self.extract_resource_path(request_info.url) \ == ('api', 'unrestricted_http') @@ -279,7 +279,7 @@ class PayloadResourcePolicy(PayloadAwarePolicy): with jinja_lock: template = jinja_env.get_template('page_init_script.js.jinja') token = self.payload_data.unique_token - base_url = self.assets_base_url(request_info.url) + base_url = self._assets_base_url(request_info.url) ver_str = json.dumps(haketilo_version) js = template.render( unique_token_encoded = encode_string_for_js(token), @@ -338,23 +338,22 @@ class PayloadResourcePolicy(PayloadAwarePolicy): else: return resource_blocked_response - def consume_response( - self, - request_info: http_messages.RequestInfo, - response_info: http_messages.ResponseInfo - ) -> http_messages.ResponseInfo: + def consume_response(self, http_info: http_messages.FullHTTPInfo) \ + -> http_messages.ResponseInfo: """ This method shall only be called for responses to unrestricted HTTP API requests. Its purpose is to sanitize response headers and smuggle their original data using an additional header. """ - serialized = json.dumps([*response_info.headers.items()]) + serialized = json.dumps([*http_info.response_info.headers.items()]) extra_headers = [('X-Haketilo-True-Headers', quote(serialized)),] - if (300 <= response_info.status_code < 400): - location = response_info.headers.get('location') + # Greetings, adventurous code dweller! It's amazing you made it that + # deep. I hope you're having a good day. If not, read Isaiah 49:15 :) + if (300 <= http_info.response_info.status_code < 400): + location = http_info.response_info.headers.get('location') if location is not None: - orig_params = parse_qs(request_info.url.query) + orig_params = parse_qs(http_info.request_info.url.query) orig_extra_headers_str, = orig_params['extra_headers'] new_query = urlencode({ @@ -362,20 +361,17 @@ class PayloadResourcePolicy(PayloadAwarePolicy): 'extra_headers': orig_extra_headers_str }) - new_url = urljoin(request_info.url.orig_url, '?' + new_query) + orig_url = http_info.request_info.url.orig_url + new_url = urljoin(orig_url, '?' + new_query) extra_headers.append(('location', new_url)) merged_headers = merge_response_headers( - native_headers = response_info.headers, + native_headers = http_info.response_info.headers, extra_headers = extra_headers ) - return http_messages.ResponseInfo.make( - status_code = response_info.status_code, - headers = merged_headers, - body = response_info.body, - ) + return dc.replace(http_info.response_info, headers=merged_headers) resource_blocked_response = http_messages.ResponseInfo.make( diff --git a/src/hydrilla/proxy/policies/rule.py b/src/hydrilla/proxy/policies/rule.py index 8272d2f..c62f473 100644 --- a/src/hydrilla/proxy/policies/rule.py +++ b/src/hydrilla/proxy/policies/rule.py @@ -43,53 +43,23 @@ class AllowPolicy(base.Policy): """....""" priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO + +script_csp_directives = ('script-src', 'script-src-elem', 'script-src-attr') + class BlockPolicy(base.Policy): """....""" _process_response: t.ClassVar[bool] = True priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO - def _modify_headers(self, response_info: http_messages.ResponseInfo) \ - -> http_messages.IHeaders: - new_headers = [] - - csp_policies = csp.extract(response_info.headers) - - for key, val in response_info.headers.items(): - if key.lower() not in csp.header_names_and_dispositions: - new_headers.append((key, val)) - - for policy in csp_policies: - if policy.disposition != 'enforce': - continue - - directives = policy.directives.mutate() - directives.pop('report-to', None) - directives.pop('report-uri', None) - - policy = dc.replace(policy, directives=directives.finish()) - - new_headers.append((policy.header_name, policy.serialize())) - - extra_csp = ';'.join(( - "script-src 'none'", - "script-src-elem 'none'", - "script-src-attr 'none'" - )) - - new_headers.append(('Content-Security-Policy', extra_csp)) - - return http_messages.make_headers(new_headers) - + def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Sequence[str]: + return script_csp_directives - def consume_response( - self, - request_info: http_messages.RequestInfo, - response_info: http_messages.ResponseInfo - ) -> http_messages.ResponseInfo: - new_headers = self._modify_headers(response_info) + def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \ + -> t.Mapping[str, t.Sequence[str]]: + return dict((d, ["'none'"]) for d in script_csp_directives) - return dc.replace(response_info, headers=new_headers) @dc.dataclass(frozen=True) class RuleAllowPolicy(AllowPolicy): diff --git a/src/hydrilla/proxy/state_impl/base.py b/src/hydrilla/proxy/state_impl/base.py index df3287b..7437d52 100644 --- a/src/hydrilla/proxy/state_impl/base.py +++ b/src/hydrilla/proxy/state_impl/base.py @@ -210,10 +210,7 @@ class HaketiloStateWithFields(st.HaketiloState): best_priority = policy.priority best_policy = policy except Exception as e: - return policies.ErrorBlockPolicy( - builtin = True, - error = e - ) + return policies.ErrorBlockPolicy(error=e) if best_policy is not None: return best_policy |