about | summary | refs | log | tree | commit | diff
path: root/src/hydrilla/proxy/policies
diff options
context:
space:
mode:
Diffstat (limited to 'src/hydrilla/proxy/policies')
-rw-r--r-- src/hydrilla/proxy/policies/base.py 114
-rw-r--r-- src/hydrilla/proxy/policies/misc.py 10
-rw-r--r-- src/hydrilla/proxy/policies/payload.py 129
-rw-r--r-- src/hydrilla/proxy/policies/payload_resource.py 32
-rw-r--r-- src/hydrilla/proxy/policies/rule.py 48
5 files changed, 157 insertions, 176 deletions
diff --git a/src/hydrilla/proxy/policies/base.py b/src/hydrilla/proxy/policies/base.py
index 8ea792f..7ce8663 100644
--- a/src/hydrilla/proxy/policies/base.py
+++ b/src/hydrilla/proxy/policies/base.py
@@ -40,6 +40,7 @@ from immutables import Map
from ... url_patterns import ParsedUrl
from .. import state
from .. import http_messages
+from .. import csp
class PolicyPriority(int, enum.Enum):
@@ -53,6 +54,15 @@ MessageInfo = t.Union[
http_messages.ResponseInfo
]
+
+UTF8_BOM = b'\xEF\xBB\xBF'
+BOMs = (
+ (UTF8_BOM, 'utf-8'),
+ (b'\xFE\xFF', 'utf-16be'),
+ (b'\xFF\xFE', 'utf-16le')
+)
+
+
class Policy(ABC):
"""...."""
_process_request: t.ClassVar[bool] = False
@@ -70,23 +80,111 @@ class Policy(ABC):
def should_process_response(
self,
request_info: http_messages.RequestInfo,
- response_info: http_messages.BodylessResponseInfo
+ response_info: http_messages.AnyResponseInfo
) -> bool:
return self._process_response
+ def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Union[t.Sequence[str], t.Literal['all']]:
+ return ()
+
+ def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Mapping[str, t.Sequence[str]]:
+ return Map()
+
+ def _csp_to_extend(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Mapping[str, t.Sequence[str]]:
+ return Map()
+
+ def _modify_response_headers(self, http_info: http_messages.FullHTTPInfo) \
+ -> http_messages.IHeaders:
+ csp_to_clear = self._csp_to_clear(http_info)
+ csp_to_add = self._csp_to_add(http_info)
+ csp_to_extend = self._csp_to_extend(http_info)
+
+ if len(csp_to_clear) + len(csp_to_extend) + len(csp_to_add) == 0:
+ return http_info.response_info.headers
+
+ return csp.modify(
+ headers = http_info.response_info.headers,
+ clear = csp_to_clear,
+ add = csp_to_add,
+ extend = csp_to_extend
+ )
+
+ def _modify_response_document(
+ self,
+ http_info: http_messages.FullHTTPInfo,
+ encoding: t.Optional[str]
+ ) -> t.Union[str, bytes]:
+ return http_info.response_info.body
+
+ def _modify_response_body(self, http_info: http_messages.FullHTTPInfo) \
+ -> bytes:
+ if not http_messages.is_likely_a_page(
+ request_info = http_info.request_info,
+ response_info = http_info.response_info
+ ):
+ return http_info.response_info.body
+
+ data = http_info.response_info.body
+
+ _, encoding = http_info.response_info.deduce_content_type()
+
+ # A UTF BOM overrides encoding specified by the header.
+ for bom, encoding_name in BOMs:
+ if data.startswith(bom):
+ encoding = encoding_name
+
+ new_data = self._modify_response_document(http_info, encoding)
+
+ if isinstance(new_data, str):
+ # Prepending a three-byte Byte Order Mark (BOM) will force the
+ # browser to decode this as UTF-8 regardless of the 'Content-Type'
+ # header. See
+ # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
+ new_data = UTF8_BOM + new_data.encode()
+
+ return new_data
+
def consume_request(self, request_info: http_messages.RequestInfo) \
-> t.Optional[MessageInfo]:
+ # We're not using @abstractmethod because not every Policy needs it and
+ # we don't want to force child classes into implementing dummy methods.
raise NotImplementedError(
'This kind of policy does not consume requests.'
)
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> t.Optional[http_messages.ResponseInfo]:
- raise NotImplementedError(
- 'This kind of policy does not consume responses.'
+ def consume_response(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Optional[http_messages.ResponseInfo]:
+ try:
+ new_headers = self._modify_response_headers(http_info)
+ new_body = self._modify_response_body(http_info)
+ except Exception as e:
+ # In the future we might want to actually describe any errors that occur.
+ # For now, we're just returning the stack trace in a 500 response.
+ import traceback
+
+ error_info_list = traceback.format_exception(
+ type(e),
+ e,
+ e.__traceback__
+ )
+
+ return http_messages.ResponseInfo.make(
+ status_code = 500,
+ headers = (('Content-Type', 'text/plain; charset=utf-8'),),
+ body = '\n'.join(error_info_list).encode()
+ )
+
+ if (new_headers is http_info.response_info.headers and
+ new_body is http_info.response_info.body):
+ return None
+
+ return dc.replace(
+ http_info.response_info,
+ headers = new_headers,
+ body = new_body
)
diff --git a/src/hydrilla/proxy/policies/misc.py b/src/hydrilla/proxy/policies/misc.py
index 81875a2..acce164 100644
--- a/src/hydrilla/proxy/policies/misc.py
+++ b/src/hydrilla/proxy/policies/misc.py
@@ -56,8 +56,6 @@ class ErrorBlockPolicy(BlockPolicy):
"""...."""
error: Exception
- builtin: bool = True
-
class MitmItPagePolicy(base.Policy):
"""
@@ -74,15 +72,9 @@ class MitmItPagePolicy(base.Policy):
def consume_request(self, request_info: http_messages.RequestInfo) -> None:
return None
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> None:
+ def consume_response(self, http_info: http_messages.FullHTTPInfo) -> None:
return None
- builtin: bool = True
-
@dc.dataclass(frozen=True, unsafe_hash=True)
class MitmItPagePolicyFactory(base.PolicyFactory):
builtin: bool = True
diff --git a/src/hydrilla/proxy/policies/payload.py b/src/hydrilla/proxy/policies/payload.py
index b89a1c1..8aaf845 100644
--- a/src/hydrilla/proxy/policies/payload.py
+++ b/src/hydrilla/proxy/policies/payload.py
@@ -49,8 +49,7 @@ class PayloadAwarePolicy(base.Policy):
"""...."""
payload_data: state.PayloadData
- def assets_base_url(self, request_url: ParsedUrl):
- """...."""
+ def _assets_base_url(self, request_url: ParsedUrl):
token = self.payload_data.unique_token
base_path_segments = (*self.payload_data.pattern_path_segments, token)
@@ -90,13 +89,6 @@ class PayloadAwarePolicyFactory(base.PolicyFactory):
return super().__lt__(other)
-UTF8_BOM = b'\xEF\xBB\xBF'
-BOMs = (
- (UTF8_BOM, 'utf-8'),
- (b'\xFE\xFF', 'utf-16be'),
- (b'\xFF\xFE', 'utf-16le')
-)
-
def block_attr(element: bs4.PageElement, attr_name: str) -> None:
"""
Disable HTML node attributes by prepending `blocked-'. This allows them to
@@ -118,37 +110,25 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO
- def _new_csp(self, request_url: ParsedUrl) -> str:
- """...."""
- assets_base = self.assets_base_url(request_url)
+ def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Sequence[str]:
+ return ['script-src']
- script_src = f"script-src {assets_base}"
+ def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Mapping[str, t.Sequence[str]]:
+ allowed_origins = [self._assets_base_url(http_info.request_info.url)]
if self.payload_data.eval_allowed:
- script_src = f"{script_src} 'unsafe-eval'"
-
- return '; '.join((
- script_src,
- "script-src-elem 'none'",
- "script-src-attr 'none'"
- ))
-
- def _modify_headers(self, response_info: http_messages.ResponseInfo) \
- -> http_messages.IHeaders:
- new_headers = []
-
- for key, val in response_info.headers.items():
- if key.lower() not in csp.header_names_and_dispositions:
- new_headers.append((key, val))
-
- new_csp = self._new_csp(response_info.url)
- new_headers.append(('Content-Security-Policy', new_csp))
+ allowed_origins.append("'unsafe-eval'")
- return http_messages.make_headers(new_headers)
+ return {
+ 'script-src': allowed_origins,
+ 'script-src-elem': ["'none'"],
+ 'script-src-attr': ["'none'"]
+ }
def _script_urls(self, url: ParsedUrl) -> t.Iterable[str]:
- """...."""
- base_url = self.assets_base_url(url)
+ base_url = self._assets_base_url(url)
payload_ref = self.payload_data.ref
yield base_url + 'api/page_init_script.js'
@@ -156,15 +136,13 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
for path in payload_ref.get_script_paths():
yield base_url + '/'.join(('static', *path))
- def _modify_body(
+ def _modify_response_document(
self,
- url: ParsedUrl,
- body: bytes,
- encoding: t.Optional[str]
- ) -> bytes:
- """...."""
+ http_info: http_messages.FullHTTPInfo,
+ encoding: t.Optional[str]
+ ) -> t.Union[bytes, str]:
soup = bs4.BeautifulSoup(
- markup = body,
+ markup = http_info.response_info.body,
from_encoding = encoding,
features = 'html5lib'
)
@@ -172,9 +150,9 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
# Inject scripts.
script_parent = soup.find('body') or soup.find('html')
if script_parent is None:
- return body
+ return http_info.response_info.body
- for script_url in self._script_urls(url):
+ for script_url in self._script_urls(http_info.request_info.url):
tag = bs4.Tag(name='script', attrs={'src': script_url})
script_parent.append(tag)
@@ -182,61 +160,11 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
# scripts.
for meta in soup.select('head meta[http-equiv]'):
header_name = meta.attrs.get('http-equiv', '').lower().strip()
- if header_name in csp.enforce_header_names_set:
+ if header_name in csp.enforce_header_names:
block_attr(meta, 'http-equiv')
block_attr(meta, 'content')
- # Appending a three-byte Byte Order Mark (BOM) will force the browser to
- # decode this as UTF-8 regardless of the 'Content-Type' header. See:
- # https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
- return UTF8_BOM + soup.encode()
-
- def _consume_response_unsafe(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> http_messages.ResponseInfo:
- new_headers = self._modify_headers(response_info)
- new_response = dc.replace(response_info, headers=new_headers)
-
- if not http_messages.is_likely_a_page(request_info, response_info):
- return new_response
-
- data = response_info.body
-
- _, encoding = response_info.deduce_content_type()
-
- # A UTF BOM overrides encoding specified by the header.
- for bom, encoding_name in BOMs:
- if data.startswith(bom):
- encoding = encoding_name
-
- new_data = self._modify_body(response_info.url, data, encoding)
-
- return dc.replace(new_response, body=new_data)
-
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> http_messages.ResponseInfo:
- try:
- return self._consume_response_unsafe(request_info, response_info)
- except Exception as e:
- # TODO: actually describe the errors
- import traceback
-
- error_info_list = traceback.format_exception(
- type(e),
- e,
- e.__traceback__
- )
-
- return http_messages.ResponseInfo.make(
- status_code = 500,
- headers = (('Content-Type', 'text/plain; charset=utf-8'),),
- body = '\n'.join(error_info_list).encode()
- )
+ return soup.decode()
class _PayloadHasProblemsError(HaketiloException):
@@ -246,22 +174,19 @@ class AutoPayloadInjectPolicy(PayloadInjectPolicy):
"""...."""
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> http_messages.ResponseInfo:
+ def consume_response(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Optional[http_messages.ResponseInfo]:
try:
if self.payload_data.ref.has_problems():
raise _PayloadHasProblemsError()
self.payload_data.ref.ensure_items_installed()
- return super().consume_response(request_info, response_info)
+ return super().consume_response(http_info)
except (state.RepoCommunicationError, state.FileInstallationError,
_PayloadHasProblemsError) as ex:
extra_params: dict[str, str] = {
- 'next_url': response_info.url.orig_url
+ 'next_url': http_info.response_info.url.orig_url
}
if isinstance(ex, state.FileInstallationError):
extra_params['repo_id'] = ex.repo_id
diff --git a/src/hydrilla/proxy/policies/payload_resource.py b/src/hydrilla/proxy/policies/payload_resource.py
index 04a148c..6695ce1 100644
--- a/src/hydrilla/proxy/policies/payload_resource.py
+++ b/src/hydrilla/proxy/policies/payload_resource.py
@@ -245,7 +245,7 @@ class PayloadResourcePolicy(PayloadAwarePolicy):
def should_process_response(
self,
request_info: http_messages.RequestInfo,
- response_info: http_messages.BodylessResponseInfo
+ response_info: http_messages.AnyResponseInfo
) -> bool:
return self.extract_resource_path(request_info.url) \
== ('api', 'unrestricted_http')
@@ -279,7 +279,7 @@ class PayloadResourcePolicy(PayloadAwarePolicy):
with jinja_lock:
template = jinja_env.get_template('page_init_script.js.jinja')
token = self.payload_data.unique_token
- base_url = self.assets_base_url(request_info.url)
+ base_url = self._assets_base_url(request_info.url)
ver_str = json.dumps(haketilo_version)
js = template.render(
unique_token_encoded = encode_string_for_js(token),
@@ -338,23 +338,22 @@ class PayloadResourcePolicy(PayloadAwarePolicy):
else:
return resource_blocked_response
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> http_messages.ResponseInfo:
+ def consume_response(self, http_info: http_messages.FullHTTPInfo) \
+ -> http_messages.ResponseInfo:
"""
This method shall only be called for responses to unrestricted HTTP API
requests. Its purpose is to sanitize response headers and smuggle their
original data using an additional header.
"""
- serialized = json.dumps([*response_info.headers.items()])
+ serialized = json.dumps([*http_info.response_info.headers.items()])
extra_headers = [('X-Haketilo-True-Headers', quote(serialized)),]
- if (300 <= response_info.status_code < 400):
- location = response_info.headers.get('location')
+ # Greetings, adventurous code dweller! It's amazing you made it this
+ # deep. I hope you're having a good day. If not, read Isaiah 49:15 :)
+ if (300 <= http_info.response_info.status_code < 400):
+ location = http_info.response_info.headers.get('location')
if location is not None:
- orig_params = parse_qs(request_info.url.query)
+ orig_params = parse_qs(http_info.request_info.url.query)
orig_extra_headers_str, = orig_params['extra_headers']
new_query = urlencode({
@@ -362,20 +361,17 @@ class PayloadResourcePolicy(PayloadAwarePolicy):
'extra_headers': orig_extra_headers_str
})
- new_url = urljoin(request_info.url.orig_url, '?' + new_query)
+ orig_url = http_info.request_info.url.orig_url
+ new_url = urljoin(orig_url, '?' + new_query)
extra_headers.append(('location', new_url))
merged_headers = merge_response_headers(
- native_headers = response_info.headers,
+ native_headers = http_info.response_info.headers,
extra_headers = extra_headers
)
- return http_messages.ResponseInfo.make(
- status_code = response_info.status_code,
- headers = merged_headers,
- body = response_info.body,
- )
+ return dc.replace(http_info.response_info, headers=merged_headers)
resource_blocked_response = http_messages.ResponseInfo.make(
diff --git a/src/hydrilla/proxy/policies/rule.py b/src/hydrilla/proxy/policies/rule.py
index 8272d2f..c62f473 100644
--- a/src/hydrilla/proxy/policies/rule.py
+++ b/src/hydrilla/proxy/policies/rule.py
@@ -43,53 +43,23 @@ class AllowPolicy(base.Policy):
"""...."""
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO
+
+script_csp_directives = ('script-src', 'script-src-elem', 'script-src-attr')
+
class BlockPolicy(base.Policy):
"""...."""
_process_response: t.ClassVar[bool] = True
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO
- def _modify_headers(self, response_info: http_messages.ResponseInfo) \
- -> http_messages.IHeaders:
- new_headers = []
-
- csp_policies = csp.extract(response_info.headers)
-
- for key, val in response_info.headers.items():
- if key.lower() not in csp.header_names_and_dispositions:
- new_headers.append((key, val))
-
- for policy in csp_policies:
- if policy.disposition != 'enforce':
- continue
-
- directives = policy.directives.mutate()
- directives.pop('report-to', None)
- directives.pop('report-uri', None)
-
- policy = dc.replace(policy, directives=directives.finish())
-
- new_headers.append((policy.header_name, policy.serialize()))
-
- extra_csp = ';'.join((
- "script-src 'none'",
- "script-src-elem 'none'",
- "script-src-attr 'none'"
- ))
-
- new_headers.append(('Content-Security-Policy', extra_csp))
-
- return http_messages.make_headers(new_headers)
-
+ def _csp_to_clear(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Sequence[str]:
+ return script_csp_directives
- def consume_response(
- self,
- request_info: http_messages.RequestInfo,
- response_info: http_messages.ResponseInfo
- ) -> http_messages.ResponseInfo:
- new_headers = self._modify_headers(response_info)
+ def _csp_to_add(self, http_info: http_messages.FullHTTPInfo) \
+ -> t.Mapping[str, t.Sequence[str]]:
+ return dict((d, ["'none'"]) for d in script_csp_directives)
- return dc.replace(response_info, headers=new_headers)
@dc.dataclass(frozen=True)
class RuleAllowPolicy(AllowPolicy):