author     Wojtek Kosior <koszko@koszko.org>    2022-10-21 18:29:43 +0200
committer  Wojtek Kosior <koszko@koszko.org>    2022-10-21 18:29:43 +0200
commit     85a0cacb28b84005d5d034a53973153d49214723 (patch)
tree       3839b70f196d28ef9db21add79be633f3847ffae /src/hydrilla/proxy
parent     3360fff8b25a60cc1f378b80692d2162e70dc142 (diff)
[proxy] rework CSP manipulation
Diffstat (limited to 'src/hydrilla/proxy')
-rw-r--r--  src/hydrilla/proxy/addon.py                        11
-rw-r--r--  src/hydrilla/proxy/csp.py                         146
-rw-r--r--  src/hydrilla/proxy/http_messages.py                10
-rw-r--r--  src/hydrilla/proxy/policies/base.py               114
-rw-r--r--  src/hydrilla/proxy/policies/misc.py                10
-rw-r--r--  src/hydrilla/proxy/policies/payload.py            129
-rw-r--r--  src/hydrilla/proxy/policies/payload_resource.py    32
-rw-r--r--  src/hydrilla/proxy/policies/rule.py                48
-rw-r--r--  src/hydrilla/proxy/state_impl/base.py               5
9 files changed, 283 insertions, 222 deletions
diff --git a/src/hydrilla/proxy/addon.py b/src/hydrilla/proxy/addon.py
index de864fc..d5b0537 100644
--- a/src/hydrilla/proxy/addon.py
+++ b/src/hydrilla/proxy/addon.py
@@ -105,6 +105,10 @@ class FlowHandling:
body = self.flow.response.get_content(strict=False) or b''
return self.bl_response_info.with_body(body)
+ @property
+ def full_http_info(self) -> http_messages.FullHTTPInfo:
+ return http_messages.FullHTTPInfo(self.request_info, self.response_info)
+
@staticmethod
def make(
flow: http.HTTPFlow,
@@ -234,7 +238,7 @@ class HaketiloAddon:
try:
parsed_url = url_patterns.parse_url(flow.request.url)
except url_patterns.HaketiloURLException as e:
- policy = policies.ErrorBlockPolicy(builtin=True, error=e)
+ policy = policies.ErrorBlockPolicy(error=e)
parsed_url = url_patterns.dummy_url
else:
policy = self.state.select_policy(parsed_url)
@@ -338,10 +342,7 @@ class HaketiloAddon:
with self.http_safe_event_handling(flow):
handling = self.get_flow_handling(flow)
- result = handling.policy.consume_response(
- request_info = handling.request_info,
- response_info = handling.response_info
- )
+ result = handling.policy.consume_response(handling.full_http_info)
if result is not None:
headers_bin = result.headers.items_bin()
diff --git a/src/hydrilla/proxy/csp.py b/src/hydrilla/proxy/csp.py
index 8eb914f..df2f65b 100644
--- a/src/hydrilla/proxy/csp.py
+++ b/src/hydrilla/proxy/csp.py
@@ -38,33 +38,56 @@ from immutables import Map, MapMutation
from . import http_messages
-header_names_and_dispositions = (
- ('content-security-policy', 'enforce'),
- ('content-security-policy-report-only', 'report'),
- ('x-content-security-policy', 'enforce'),
- ('x-content-security-policy', 'report'),
- ('x-webkit-csp', 'enforce'),
- ('x-webkit-csp', 'report')
+enforce_header_names = (
+ 'content-security-policy',
+ 'x-content-security-policy',
+ 'x-webkit-csp'
)
-enforce_header_names_set = {
- name for name, disposition in header_names_and_dispositions
- if disposition == 'enforce'
-}
+header_names = (*enforce_header_names, 'content-security-policy-report-only')
@dc.dataclass
class ContentSecurityPolicy:
directives: Map[str, t.Sequence[str]]
- header_name: str
- disposition: str
+ header_name: str = 'Content-Security-Policy'
+ disposition: str = 'enforce'
- def serialize(self) -> str:
- """...."""
+ def remove(self, directives: t.Sequence[str]) -> 'ContentSecurityPolicy':
+ mutation = self.directives.mutate()
+
+ for name in directives:
+ mutation.pop(name, None)
+
+ return dc.replace(self, directives = mutation.finish())
+
+ def extend(self, directives: t.Mapping[str, t.Sequence[str]]) \
+ -> 'ContentSecurityPolicy':
+ mutation = self.directives.mutate()
+
+ for name, extras in directives.items():
+ if name in mutation:
+ mutation[name] = (*mutation[name], *extras)
+
+ return dc.replace(self, directives = mutation.finish())
+
+ def serialize(self) -> tuple[str, str]:
+ """
+ Produces a (name, value) pair suitable for use as an HTTP header.
+
+ If a deserialized policy is reserialized, the resulting value is not
+ guaranteed to be identical to the original one; it is merely
+ semantically equivalent.
+ """
serialized_directives = []
- for name, value_list in self.directives.items():
- serialized_directives.append(f'{name} {" ".join(value_list)}')
+ for name, value_seq in self.directives.items():
+ if all(val == "'none'" for val in value_seq):
+ value_seq = ["'none'"]
+ else:
+ value_seq = [val for val in value_seq if val != "'none'"]
+
+ serialized_directives.append(f'{name} {" ".join(value_seq)}')
- return ';'.join(serialized_directives)
+ return (self.header_name, ';'.join(serialized_directives))
@staticmethod
def deserialize(
@@ -72,7 +95,13 @@ class ContentSecurityPolicy:
header_name: str,
disposition: str = 'enforce'
) -> 'ContentSecurityPolicy':
- """...."""
+ """
+ Parses the policy as required by the W3C Working Draft.
+
+ Extra whitespace, invalid/empty directives and the order of directives
+ are not preserved; only the semantically relevant information is.
+ """
# For more info, see:
# https://www.w3.org/TR/CSP3/#parse-serialized-policy
empty_directives: Map[str, t.Sequence[str]] = Map()
@@ -104,21 +133,64 @@ class ContentSecurityPolicy:
disposition = disposition
)
-def extract(headers: http_messages.IHeaders) \
- -> tuple[ContentSecurityPolicy, ...]:
- """...."""
- csp_policies = []
-
- for header_name, disposition in header_names_and_dispositions:
- for serialized_list in headers.get_all(header_name):
- for serialized in serialized_list.split(','):
- policy = ContentSecurityPolicy.deserialize(
- serialized,
- header_name,
- disposition
- )
-
- if policy.directives != Map():
- csp_policies.append(policy)
-
- return tuple(csp_policies)
+# def extract(headers: http_messages.IHeaders) \
+# -> tuple[ContentSecurityPolicy, ...]:
+# """...."""
+# csp_policies = []
+
+# for header_name, disposition in header_names_and_dispositions:
+# for serialized_list in headers.get_all(header_name):
+# for serialized in serialized_list.split(','):
+# policy = ContentSecurityPolicy.deserialize(
+# serialized,
+# header_name,
+# disposition
+# )
+
+# if policy.directives != Map():
+# csp_policies.append(policy)
+
+# return tuple(csp_policies)
+
+def modify(
+ headers: http_messages.IHeaders,
+ clear: t.Union[t.Sequence[str], t.Literal['all']] = (),
+ extend: t.Mapping[str, t.Sequence[str]] = Map(),
+ add: t.Mapping[str, t.Sequence[str]] = Map(),
+) -> http_messages.IHeaders:
+ """
+ This function modifies the CSP headers. The following actions are
+ performed *in order*:
+ 1. report-only CSP headers are removed,
+ 2. directives with names in `clear` are removed,
+ 3. directives that could cause CSP reports to be sent are removed,
+ 4. directives from `extend` are merged into the existing directives,
+    effectively loosening them,
+ 5. directives from `add` (also subject to `extend`) are added in a separate
+    Content-Security-Policy header.
+
+ No measures are yet implemented to prevent fingerprinting when serving HTTP
+ responses with headers modified by this function. Please use wisely; you
+ have been warned.
+ """
+ headers_list = [
+ (key, val)
+ for key, val in headers.items()
+ if key.lower() not in header_names
+ ]
+
+ if clear != 'all':
+ for name in header_names:
+ for serialized_list in headers.get_all(name):
+ for serialized in serialized_list.split(','):
+ policy = ContentSecurityPolicy.deserialize(serialized, name)
+ policy = policy.remove((*clear, 'report-to', 'report-uri'))
+ policy = policy.extend(extend)
+ if policy.directives != Map():
+ headers_list.append(policy.serialize())
+
+ if add != Map():
+ csp_to_add = ContentSecurityPolicy(Map(add)).extend(extend)
+ headers_list.append(csp_to_add.serialize())
+
+ return http_messages.make_headers(headers_list)
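A minimal usage sketch of the reworked csp.modify() (not part of the commit):
the header values and the example.org origin are made up, and it assumes
make_headers() accepts a list of (name, value) pairs, as it is called above.

    from hydrilla.proxy import csp, http_messages

    # Illustrative input headers.
    headers = http_messages.make_headers([
        ('Content-Type', 'text/html; charset=utf-8'),
        ('Content-Security-Policy',
         "script-src 'self'; report-uri /csp-report"),
        ('Content-Security-Policy-Report-Only', "img-src 'none'")
    ])

    new_headers = csp.modify(
        headers,
        clear  = ('script-src',),
        extend = {'style-src': ("'unsafe-inline'",)},
        add    = {'script-src': ('https://example.org/haketilo-assets/',)}
    )

    # The report-only header is dropped; 'report-uri' and the cleared
    # 'script-src' vanish from the page's own policy (leaving it empty, so it
    # is not re-emitted); 'style-src' would only be loosened if it were
    # already present; a fresh Content-Security-Policy header carrying the
    # added 'script-src' is appended.
    for name, value in new_headers.items():
        print(f'{name}: {value}')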
diff --git a/src/hydrilla/proxy/http_messages.py b/src/hydrilla/proxy/http_messages.py
index 1bed103..9aab510 100644
--- a/src/hydrilla/proxy/http_messages.py
+++ b/src/hydrilla/proxy/http_messages.py
@@ -195,6 +195,8 @@ class RequestInfo(HasHeadersMixin, _BaseRequestInfoFields):
) -> 'RequestInfo':
return BodylessRequestInfo.make(url, method, headers).with_body(body)
+AnyRequestInfo = t.Union[BodylessRequestInfo, RequestInfo]
+
@dc.dataclass(frozen=True)
class _BaseResponseInfoFields:
@@ -230,6 +232,14 @@ class ResponseInfo(HasHeadersMixin, _BaseResponseInfoFields):
bl_info = BodylessResponseInfo.make(url, status_code, headers)
return bl_info.with_body(body)
+AnyResponseInfo = t.Union[BodylessResponseInfo, ResponseInfo]
+
+
+@dc.dataclass(frozen=True)
+class FullHTTPInfo:
+ request_info: RequestInfo
+ response_info: ResponseInfo
+
def is_likely_a_page(
request_info: t.Union[BodylessRequestInfo, RequestInfo],
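A short sketch of what the new FullHTTPInfo bundle enables on the policy side
(the helper below is hypothetical): both infos stay reachable through one
frozen object, and a modified response is derived with dataclasses.replace(),
just like the reworked Policy.consume_response() does.

    import dataclasses as dc

    from hydrilla.proxy import http_messages

    def append_marker(http_info: http_messages.FullHTTPInfo) \
            -> http_messages.ResponseInfo:
        # Derive a new ResponseInfo from the bundled one, changing only the
        # body and keeping the status code and headers intact.
        response = http_info.response_info
        return dc.replace(response, body = response.body + b'<!-- seen -->')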
diff --git a/src/hydrilla/proxy/policies/base.py b/src/hydrilla/proxy/policies/base.py
index 8ea792f..7ce8663 100644
--- a/src/hydrilla/proxy/policies/base.py
+++ b/src/hydrilla/proxy/policies/base.py
@@ -40,6 +40,7 @@ from immutables import Map
from ... url_patterns import ParsedUrl
from .. import state
from .. import http_messages
+from .. import csp
class PolicyPriority(int, enum.Enum):
@@ -53,6 +54,15 @@ MessageInfo = t.Union[
http_messages.ResponseInfo
]
+
+UTF8_BOM = b'\xEF\xBB\xBF'
+BOMs = (
+ (UTF8_BOM, 'utf-8'),
+ (b'\xFE\xFF', 'utf-16be'),
+ (b'\xFF\xFE', 'utf-16le')
+)
+
+
class Policy(ABC):
"""...."""
_process_request: t.ClassVar[bool] = False
@@ -70,23 +80,111 @@ class Policy(ABC):
def should_process_response(
self,
request_info: http_messages.RequestInfo,
- response_info: http_messages.BodylessResponseInfo
+ response_info: http_messages.AnyResponseInfo
) -> bool:
return self._process_response
+ def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Union[t.Sequence[str], t.Literal['all']]:
+ return ()
+
+ def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Mapping[str, t.Sequence[str]]:
+ return Map()
+
+ def _csp_to_extend(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Mapping[str, t.Sequence[str]]:
+ return Map()
+
+ def _modify_response_headers(self, http_info: http_messages.FullHTTPInfo) \
+ -> http_messages.IHeaders:
+ csp_to_clear = self._csp_to_clear(http_info)
+ csp_to_add = self._csp_to_add(http_info)
+ csp_to_extend = self._csp_to_extend(http_info)
+
+ if len(csp_to_clear) + len(csp_to_extend) + len(csp_to_add) == 0:
+ return http_info.response_info.headers
+
+ return csp.modify(
+ headers = http_info.response_info.headers,
+ clear = csp_to_clear,
+ add = csp_to_add,
+ extend = csp_to_extend
+ )
+
+ def _modify_response_document(
+ self,
+ http_info: http_messages.FullHTTPInfo,
+ encoding: t.Optional[str]
+ ) -> t.Union[str, bytes]:
+ return http_info.response_info.body
+
+ def _modify_response_body(self, http_info: http_messages.FullHTTPInfo) \
+ -> bytes:
+ if not http_messages.is_likely_a_page(
+ request_info = http_info.request_info,
+ response_info = http_info.response_info
+ ):
+ return http_info.response_info.body
+
+ data = http_info.response_info.body
+
+ _, encoding = http_info.response_info.deduce_content_type()
+
+ # A UTF BOM overrides encoding specified by the header.
+ for bom, encoding_name in BOMs:
+ if data.startswith(bom):
+ encoding = encoding_name
+
+ new_data = self._modify_response_document(http_info, encoding)
+
+ if isinstance(new_data, str):
+ # Appending a three-byte Byte Order Mark (BOM) will force the
+ # browser to decode this as UTF-8 regardless of the 'Content-Type'
+ # header. See
+ # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
+ new_data = UTF8_BOM + new_data.encode()
+
+ return new_data
+
def consume_request(self, request_info: http_messages.RequestInfo) \
-> t.Optional[MessageInfo]:
+ # We're not using @abstractmethod because not every Policy needs it and
+ # we don't want to force child classes into implementing dummy methods.
raise NotImplementedError(
'This kind of policy does not consume requests.'
)
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> t.Optional[http_messages.ResponseInfo]:
- raise NotImplementedError(
- 'This kind of policy does not consume responses.'
+ def consume_response(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Optional[http_messages.ResponseInfo]:
+ try:
+ new_headers = self._modify_response_headers(http_info)
+ new_body = self._modify_response_body(http_info)
+ except Exception as e:
+ # In the future we might want to actually describe any errors that
+ # occur. For now, we just return the stack trace in the response body.
+ import traceback
+
+ error_info_list = traceback.format_exception(
+ type(e),
+ e,
+ e.__traceback__
+ )
+
+ return http_messages.ResponseInfo.make(
+ status_code = 500,
+ headers = (('Content-Type', 'text/plain; charset=utf-8'),),
+ body = '\n'.join(error_info_list).encode()
+ )
+
+ if (new_headers is http_info.response_info.headers and
+ new_body is http_info.response_info.body):
+ return None
+
+ return dc.replace(
+ http_info.response_info,
+ headers = new_headers,
+ body = new_body
)
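For comparison, a hypothetical subclass relying on the new template methods
could look as follows (the class and its frame-ancestors rule are purely
illustrative, and it is assumed Policy defines no further abstract members
that would need overriding here). The inherited consume_response() then
rewrites the headers through csp.modify() and leaves the body untouched,
because _modify_response_document() is not overridden.

    import typing as t

    from hydrilla.proxy import http_messages
    from hydrilla.proxy.policies import base

    class NoFramingPolicy(base.Policy):
        """Hypothetical policy that forbids framing of the served page."""
        _process_response: t.ClassVar[bool] = True

        def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \
                -> t.Mapping[str, t.Sequence[str]]:
            # Emitted by the base class as an extra Content-Security-Policy
            # header: frame-ancestors 'none'.
            return {'frame-ancestors': ["'none'"]}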
diff --git a/src/hydrilla/proxy/policies/misc.py b/src/hydrilla/proxy/policies/misc.py
index 81875a2..acce164 100644
--- a/src/hydrilla/proxy/policies/misc.py
+++ b/src/hydrilla/proxy/policies/misc.py
@@ -56,8 +56,6 @@ class ErrorBlockPolicy(BlockPolicy):
"""...."""
error: Exception
- builtin: bool = True
-
class MitmItPagePolicy(base.Policy):
"""
@@ -74,15 +72,9 @@ class MitmItPagePolicy(base.Policy):
def consume_request(self, request_info: http_messages.RequestInfo) -> None:
return None
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> None:
+ def consume_response(self, http_info: http_messages.FullHTTPInfo) -> None:
return None
- builtin: bool = True
-
@dc.dataclass(frozen=True, unsafe_hash=True)
class MitmItPagePolicyFactory(base.PolicyFactory):
builtin: bool = True
diff --git a/src/hydrilla/proxy/policies/payload.py b/src/hydrilla/proxy/policies/payload.py
index b89a1c1..8aaf845 100644
--- a/src/hydrilla/proxy/policies/payload.py
+++ b/src/hydrilla/proxy/policies/payload.py
@@ -49,8 +49,7 @@ class PayloadAwarePolicy(base.Policy):
"""...."""
payload_data: state.PayloadData
- def assets_base_url(self, request_url: ParsedUrl):
- """...."""
+ def _assets_base_url(self, request_url: ParsedUrl):
token = self.payload_data.unique_token
base_path_segments = (*self.payload_data.pattern_path_segments, token)
@@ -90,13 +89,6 @@ class PayloadAwarePolicyFactory(base.PolicyFactory):
return super().__lt__(other)
-UTF8_BOM = b'\xEF\xBB\xBF'
-BOMs = (
- (UTF8_BOM, 'utf-8'),
- (b'\xFE\xFF', 'utf-16be'),
- (b'\xFF\xFE', 'utf-16le')
-)
-
def block_attr(element: bs4.PageElement, attr_name: str) -> None:
"""
Disable HTML node attributes by prepending `blocked-'. This allows them to
@@ -118,37 +110,25 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO
- def _new_csp(self, request_url: ParsedUrl) -> str:
- """...."""
- assets_base = self.assets_base_url(request_url)
+ def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Sequence[str]:
+ return ['script-src']
- script_src = f"script-src {assets_base}"
+ def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Mapping[str, t.Sequence[str]]:
+ allowed_origins = [self._assets_base_url(http_info.request_info.url)]
if self.payload_data.eval_allowed:
- script_src = f"{script_src} 'unsafe-eval'"
-
- return '; '.join((
- script_src,
- "script-src-elem 'none'",
- "script-src-attr 'none'"
- ))
-
- def _modify_headers(self, response_info: http_messages.ResponseInfo) \
- -> http_messages.IHeaders:
- new_headers = []
-
- for key, val in response_info.headers.items():
- if key.lower() not in csp.header_names_and_dispositions:
- new_headers.append((key, val))
-
- new_csp = self._new_csp(response_info.url)
- new_headers.append(('Content-Security-Policy', new_csp))
+ allowed_origins.append("'unsafe-eval'")
- return http_messages.make_headers(new_headers)
+ return {
+ 'script-src': allowed_origins,
+ 'script-src-elem': ["'none'"],
+ 'script-src-attr': ["'none'"]
+ }
def _script_urls(self, url: ParsedUrl) -> t.Iterable[str]:
- """...."""
- base_url = self.assets_base_url(url)
+ base_url = self._assets_base_url(url)
payload_ref = self.payload_data.ref
yield base_url + 'api/page_init_script.js'
@@ -156,15 +136,13 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
for path in payload_ref.get_script_paths():
yield base_url + '/'.join(('static', *path))
- def _modify_body(
+ def _modify_response_document(
self,
- url: ParsedUrl,
- body: bytes,
- encoding: t.Optional[str]
- ) -> bytes:
- """...."""
+ http_info: http_messages.FullHTTPInfo,
+ encoding: t.Optional[str]
+ ) -> t.Union[bytes, str]:
soup = bs4.BeautifulSoup(
- markup = body,
+ markup = http_info.response_info.body,
from_encoding = encoding,
features = 'html5lib'
)
@@ -172,9 +150,9 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
# Inject scripts.
script_parent = soup.find('body') or soup.find('html')
if script_parent is None:
- return body
+ return http_info.response_info.body
- for script_url in self._script_urls(url):
+ for script_url in self._script_urls(http_info.request_info.url):
tag = bs4.Tag(name='script', attrs={'src': script_url})
script_parent.append(tag)
@@ -182,61 +160,11 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
# scripts.
for meta in soup.select('head meta[http-equiv]'):
header_name = meta.attrs.get('http-equiv', '').lower().strip()
- if header_name in csp.enforce_header_names_set:
+ if header_name in csp.enforce_header_names:
block_attr(meta, 'http-equiv')
block_attr(meta, 'content')
- # Appending a three-byte Byte Order Mark (BOM) will force the browser to
- # decode this as UTF-8 regardless of the 'Content-Type' header. See:
- # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
- return UTF8_BOM + soup.encode()
-
- def _consume_response_unsafe(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> http_messages.ResponseInfo:
- new_headers = self._modify_headers(response_info)
- new_response = dc.replace(response_info, headers=new_headers)
-
- if not http_messages.is_likely_a_page(request_info, response_info):
- return new_response
-
- data = response_info.body
-
- _, encoding = response_info.deduce_content_type()
-
- # A UTF BOM overrides encoding specified by the header.
- for bom, encoding_name in BOMs:
- if data.startswith(bom):
- encoding = encoding_name
-
- new_data = self._modify_body(response_info.url, data, encoding)
-
- return dc.replace(new_response, body=new_data)
-
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> http_messages.ResponseInfo:
- try:
- return self._consume_response_unsafe(request_info, response_info)
- except Exception as e:
- # TODO: actually describe the errors
- import traceback
-
- error_info_list = traceback.format_exception(
- type(e),
- e,
- e.__traceback__
- )
-
- return http_messages.ResponseInfo.make(
- status_code = 500,
- headers = (('Content-Type', 'text/plain; charset=utf-8'),),
- body = '\n'.join(error_info_list).encode()
- )
+ return soup.decode()
class _PayloadHasProblemsError(HaketiloException):
@@ -246,22 +174,19 @@ class AutoPayloadInjectPolicy(PayloadInjectPolicy):
"""...."""
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> http_messages.ResponseInfo:
+ def consume_response(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Optional[http_messages.ResponseInfo]:
try:
if self.payload_data.ref.has_problems():
raise _PayloadHasProblemsError()
self.payload_data.ref.ensure_items_installed()
- return super().consume_response(request_info, response_info)
+ return super().consume_response(http_info)
except (state.RepoCommunicationError, state.FileInstallationError,
_PayloadHasProblemsError) as ex:
extra_params: dict[str, str] = {
- 'next_url': response_info.url.orig_url
+ 'next_url': http_info.response_info.url.orig_url
}
if isinstance(ex, state.FileInstallationError):
extra_params['repo_id'] = ex.repo_id
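Under the illustrative assumption that _assets_base_url() yields
'https://example.com/some/pattern/SOME_TOKEN/' and eval_allowed is false, the
two CSP hooks above make the inherited _modify_response_headers() issue
roughly the following call, so any page-supplied script-src is dropped and
replaced:

    new_headers = csp.modify(
        headers = http_info.response_info.headers,
        clear   = ['script-src'],
        add     = {
            'script-src':      ['https://example.com/some/pattern/SOME_TOKEN/'],
            'script-src-elem': ["'none'"],
            'script-src-attr': ["'none'"]
        }
    )
    # Appended header, roughly (directive order may differ):
    #   Content-Security-Policy: script-src
    #       https://example.com/some/pattern/SOME_TOKEN/;
    #       script-src-elem 'none';script-src-attr 'none'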
diff --git a/src/hydrilla/proxy/policies/payload_resource.py b/src/hydrilla/proxy/policies/payload_resource.py
index 04a148c..6695ce1 100644
--- a/src/hydrilla/proxy/policies/payload_resource.py
+++ b/src/hydrilla/proxy/policies/payload_resource.py
@@ -245,7 +245,7 @@ class PayloadResourcePolicy(PayloadAwarePolicy):
def should_process_response(
self,
request_info: http_messages.RequestInfo,
- response_info: http_messages.BodylessResponseInfo
+ response_info: http_messages.AnyResponseInfo
) -> bool:
return self.extract_resource_path(request_info.url) \
== ('api', 'unrestricted_http')
@@ -279,7 +279,7 @@ class PayloadResourcePolicy(PayloadAwarePolicy):
with jinja_lock:
template = jinja_env.get_template('page_init_script.js.jinja')
token = self.payload_data.unique_token
- base_url = self.assets_base_url(request_info.url)
+ base_url = self._assets_base_url(request_info.url)
ver_str = json.dumps(haketilo_version)
js = template.render(
unique_token_encoded = encode_string_for_js(token),
@@ -338,23 +338,22 @@ class PayloadResourcePolicy(PayloadAwarePolicy):
else:
return resource_blocked_response
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> http_messages.ResponseInfo:
+ def consume_response(self, http_info: http_messages.FullHTTPInfo) \
+ -> http_messages.ResponseInfo:
"""
This method shall only be called for responses to unrestricted HTTP API
requests. Its purpose is to sanitize response headers and smuggle their
original data using an additional header.
"""
- serialized = json.dumps([*response_info.headers.items()])
+ serialized = json.dumps([*http_info.response_info.headers.items()])
extra_headers = [('X-Haketilo-True-Headers', quote(serialized)),]
- if (300 <= response_info.status_code < 400):
- location = response_info.headers.get('location')
+ # Greetings, adventurous code dweller! It's amazing you made it this
+ # deep. I hope you're having a good day. If not, read Isaiah 49:15 :)
+ if (300 <= http_info.response_info.status_code < 400):
+ location = http_info.response_info.headers.get('location')
if location is not None:
- orig_params = parse_qs(request_info.url.query)
+ orig_params = parse_qs(http_info.request_info.url.query)
orig_extra_headers_str, = orig_params['extra_headers']
new_query = urlencode({
@@ -362,20 +361,17 @@ class PayloadResourcePolicy(PayloadAwarePolicy):
'extra_headers': orig_extra_headers_str
})
- new_url = urljoin(request_info.url.orig_url, '?' + new_query)
+ orig_url = http_info.request_info.url.orig_url
+ new_url = urljoin(orig_url, '?' + new_query)
extra_headers.append(('location', new_url))
merged_headers = merge_response_headers(
- native_headers = response_info.headers,
+ native_headers = http_info.response_info.headers,
extra_headers = extra_headers
)
- return http_messages.ResponseInfo.make(
- status_code = response_info.status_code,
- headers = merged_headers,
- body = response_info.body,
- )
+ return dc.replace(http_info.response_info, headers=merged_headers)
resource_blocked_response = http_messages.ResponseInfo.make(
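The smuggled X-Haketilo-True-Headers value produced above is a percent-quoted
JSON list of (name, value) pairs. A consumer on the receiving end could
reverse it with something like the sketch below (the helper name is
hypothetical; only the encoding is taken from the code above).

    import json
    from urllib.parse import unquote

    def decode_true_headers(header_value: str) -> list[tuple[str, str]]:
        # Reverse of quote(json.dumps([*headers.items()])) as done in
        # consume_response() above.
        return [(name, value)
                for name, value in json.loads(unquote(header_value))]

    # decode_true_headers('%5B%5B%22Content-Type%22%2C%20%22text/html%22%5D%5D')
    # evaluates to [('Content-Type', 'text/html')].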
diff --git a/src/hydrilla/proxy/policies/rule.py b/src/hydrilla/proxy/policies/rule.py
index 8272d2f..c62f473 100644
--- a/src/hydrilla/proxy/policies/rule.py
+++ b/src/hydrilla/proxy/policies/rule.py
@@ -43,53 +43,23 @@ class AllowPolicy(base.Policy):
"""...."""
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO
+
+script_csp_directives = ('script-src', 'script-src-elem', 'script-src-attr')
+
class BlockPolicy(base.Policy):
"""...."""
_process_response: t.ClassVar[bool] = True
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO
- def _modify_headers(self, response_info: http_messages.ResponseInfo) \
- -> http_messages.IHeaders:
- new_headers = []
-
- csp_policies = csp.extract(response_info.headers)
-
- for key, val in response_info.headers.items():
- if key.lower() not in csp.header_names_and_dispositions:
- new_headers.append((key, val))
-
- for policy in csp_policies:
- if policy.disposition != 'enforce':
- continue
-
- directives = policy.directives.mutate()
- directives.pop('report-to', None)
- directives.pop('report-uri', None)
-
- policy = dc.replace(policy, directives=directives.finish())
-
- new_headers.append((policy.header_name, policy.serialize()))
-
- extra_csp = ';'.join((
- "script-src 'none'",
- "script-src-elem 'none'",
- "script-src-attr 'none'"
- ))
-
- new_headers.append(('Content-Security-Policy', extra_csp))
-
- return http_messages.make_headers(new_headers)
-
+ def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Sequence[str]:
+ return script_csp_directives
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> http_messages.ResponseInfo:
- new_headers = self._modify_headers(response_info)
+ def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Mapping[str, t.Sequence[str]]:
+ return dict((d, ["'none'"]) for d in script_csp_directives)
- return dc.replace(response_info, headers=new_headers)
@dc.dataclass(frozen=True)
class RuleAllowPolicy(AllowPolicy):
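With these two hooks, BlockPolicy lets the base class strip the page's own
script directives and append one fresh header that disables scripts
altogether. A sketch of the equivalent csp.modify() call (variable names
mirror the code above; report-to/report-uri are dropped by modify()
unconditionally):

    new_headers = csp.modify(
        headers = http_info.response_info.headers,
        clear   = script_csp_directives,
        add     = {d: ["'none'"] for d in script_csp_directives}
    )
    # Appended header, roughly:
    #   Content-Security-Policy: script-src 'none';script-src-elem 'none';
    #       script-src-attr 'none'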
diff --git a/src/hydrilla/proxy/state_impl/base.py b/src/hydrilla/proxy/state_impl/base.py
index df3287b..7437d52 100644
--- a/src/hydrilla/proxy/state_impl/base.py
+++ b/src/hydrilla/proxy/state_impl/base.py
@@ -210,10 +210,7 @@ class HaketiloStateWithFields(st.HaketiloState):
best_priority = policy.priority
best_policy = policy
except Exception as e:
- return policies.ErrorBlockPolicy(
- builtin = True,
- error = e
- )
+ return policies.ErrorBlockPolicy(error=e)
if best_policy is not None:
return best_policy