aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/proxy
diff options
context:
space:
mode:
authorWojtek Kosior <koszko@koszko.org>2022-08-11 15:47:13 +0200
committerWojtek Kosior <koszko@koszko.org>2022-08-11 15:47:13 +0200
commita38d19576c387e505cc468b20ca5b8bcf2fa4759 (patch)
treea578f4056dfea683d3bb2714ae1620eac576da0e /src/hydrilla/proxy
parentad97639bbf982b5b3b2757e75c3f91556e3a8eac (diff)
downloadhaketilo-hydrilla-a38d19576c387e505cc468b20ca5b8bcf2fa4759.tar.gz
haketilo-hydrilla-a38d19576c387e505cc468b20ca5b8bcf2fa4759.zip
move classes/protocols for representing HTTP requests and responses data into a separate file
Diffstat (limited to 'src/hydrilla/proxy')
-rw-r--r--src/hydrilla/proxy/addon.py9
-rw-r--r--src/hydrilla/proxy/csp.py5
-rw-r--r--src/hydrilla/proxy/http_messages.py111
-rw-r--r--src/hydrilla/proxy/policies/base.py87
-rw-r--r--src/hydrilla/proxy/policies/payload.py25
-rw-r--r--src/hydrilla/proxy/policies/payload_resource.py17
-rw-r--r--src/hydrilla/proxy/policies/rule.py7
-rw-r--r--src/hydrilla/proxy/state_impl/concrete_state.py2
8 files changed, 156 insertions, 107 deletions
diff --git a/src/hydrilla/proxy/addon.py b/src/hydrilla/proxy/addon.py
index 16c2841..06bce86 100644
--- a/src/hydrilla/proxy/addon.py
+++ b/src/hydrilla/proxy/addon.py
@@ -57,6 +57,7 @@ from ..translations import smart_gettext as _
from ..url_patterns import parse_url
from .state_impl import ConcreteHaketiloState
from . import policies
+from . import http_messages
DefaultGetValue = t.TypeVar('DefaultGetValue', object, None)
@@ -210,7 +211,7 @@ class HaketiloAddon:
with self.http_safe_event_handling(flow):
policy = self.get_policy(flow)
- request_info = policies.RequestInfo(
+ request_info = http_messages.RequestInfo(
url = parse_url(flow.request.url),
method = flow.request.method,
headers = MitmproxyHeadersWrapper(flow.request.headers),
@@ -220,7 +221,7 @@ class HaketiloAddon:
result = policy.consume_request(request_info)
if result is not None:
- if isinstance(result, policies.ProducedRequest):
+ if isinstance(result, http_messages.ProducedRequest):
flow.request = http.Request.make(
url = result.url,
method = result.method,
@@ -228,7 +229,7 @@ class HaketiloAddon:
content = result.body
)
else:
- # isinstance(result, policies.ProducedResponse)
+ # isinstance(result, http_messages.ProducedResponse)
flow.response = http.Response.make(
status_code = result.status_code,
headers = http.Headers(result.headers),
@@ -260,7 +261,7 @@ class HaketiloAddon:
with self.http_safe_event_handling(flow):
policy = self.get_policy(flow)
- response_info = policies.ResponseInfo(
+ response_info = http_messages.ResponseInfo(
url = parse_url(flow.request.url),
status_code = flow.response.status_code,
headers = MitmproxyHeadersWrapper(flow.response.headers),
diff --git a/src/hydrilla/proxy/csp.py b/src/hydrilla/proxy/csp.py
index 59d93f2..52047da 100644
--- a/src/hydrilla/proxy/csp.py
+++ b/src/hydrilla/proxy/csp.py
@@ -37,7 +37,7 @@ import dataclasses as dc
from immutables import Map, MapMutation
-from .policies.base import IHeaders
+from . import http_messages
header_names_and_dispositions = (
@@ -106,7 +106,8 @@ class ContentSecurityPolicy:
disposition = disposition
)
-def extract(headers: IHeaders) -> tuple[ContentSecurityPolicy, ...]:
+def extract(headers: http_messages.IHeaders) \
+ -> tuple[ContentSecurityPolicy, ...]:
"""...."""
csp_policies = []
diff --git a/src/hydrilla/proxy/http_messages.py b/src/hydrilla/proxy/http_messages.py
new file mode 100644
index 0000000..b6bae2b
--- /dev/null
+++ b/src/hydrilla/proxy/http_messages.py
@@ -0,0 +1,111 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Classes/protocols for representing HTTP requests and responses data.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+.....
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import dataclasses as dc
+import typing as t
+import sys
+
+if sys.version_info >= (3, 8):
+ from typing import Protocol
+else:
+ from typing_extensions import Protocol
+
+from .. import url_patterns
+
+
+DefaultGetValue = t.TypeVar('DefaultGetValue', object, None)
+
+class IHeaders(Protocol):
+ """...."""
+ def __getitem__(self, key: str) -> str: ...
+
+ def get_all(self, key: str) -> t.Iterable[str]: ...
+
+ def get(self, key: str, default: DefaultGetValue = None) \
+ -> t.Union[str, DefaultGetValue]: ...
+
+ def items(self) -> t.Iterable[tuple[str, str]]: ...
+
+def encode_headers_items(headers: t.Iterable[tuple[str, str]]) \
+ -> t.Iterable[tuple[bytes, bytes]]:
+ """...."""
+ for name, value in headers:
+ yield name.encode(), value.encode()
+
+@dc.dataclass(frozen=True)
+class ProducedRequest:
+ """...."""
+ url: str
+ method: str
+ headers: t.Iterable[tuple[bytes, bytes]]
+ body: bytes
+
+@dc.dataclass(frozen=True)
+class RequestInfo:
+ """...."""
+ url: url_patterns.ParsedUrl
+ method: str
+ headers: IHeaders
+ body: bytes
+
+ def make_produced_request(self) -> ProducedRequest:
+ """...."""
+ return ProducedRequest(
+ url = self.url.orig_url,
+ method = self.method,
+ headers = encode_headers_items(self.headers.items()),
+ body = self.body
+ )
+
+@dc.dataclass(frozen=True)
+class ProducedResponse:
+ """...."""
+ status_code: int
+ headers: t.Iterable[tuple[bytes, bytes]]
+ body: bytes
+
+@dc.dataclass(frozen=True)
+class ResponseInfo:
+ """...."""
+ url: url_patterns.ParsedUrl
+ status_code: int
+ headers: IHeaders
+ body: bytes
+
+ def make_produced_response(self) -> ProducedResponse:
+ """...."""
+ return ProducedResponse(
+ status_code = self.status_code,
+ headers = encode_headers_items(self.headers.items()),
+ body = self.body
+ )
diff --git a/src/hydrilla/proxy/policies/base.py b/src/hydrilla/proxy/policies/base.py
index 3bde6f2..bb95d29 100644
--- a/src/hydrilla/proxy/policies/base.py
+++ b/src/hydrilla/proxy/policies/base.py
@@ -31,13 +31,6 @@
# Enable using with Python 3.7.
from __future__ import annotations
-import sys
-
-if sys.version_info >= (3, 8):
- from typing import Protocol
-else:
- from typing_extensions import Protocol
-
import dataclasses as dc
import typing as t
import enum
@@ -46,8 +39,8 @@ from abc import ABC, abstractmethod
from immutables import Map
-from ...url_patterns import ParsedUrl
from .. import state
+from .. import http_messages
class PolicyPriority(int, enum.Enum):
@@ -56,72 +49,10 @@ class PolicyPriority(int, enum.Enum):
_TWO = 2
_THREE = 3
-DefaultGetValue = t.TypeVar('DefaultGetValue', object, None)
-
-class IHeaders(Protocol):
- """...."""
- def __getitem__(self, key: str) -> str: ...
-
- def get_all(self, key: str) -> t.Iterable[str]: ...
-
- def get(self, key: str, default: DefaultGetValue = None) \
- -> t.Union[str, DefaultGetValue]: ...
-
- def items(self) -> t.Iterable[tuple[str, str]]: ...
-
-def encode_headers_items(headers: t.Iterable[tuple[str, str]]) \
- -> t.Iterable[tuple[bytes, bytes]]:
- """...."""
- for name, value in headers:
- yield name.encode(), value.encode()
-
-@dc.dataclass(frozen=True)
-class ProducedRequest:
- """...."""
- url: str
- method: str
- headers: t.Iterable[tuple[bytes, bytes]]
- body: bytes
-
-@dc.dataclass(frozen=True)
-class RequestInfo:
- """...."""
- url: ParsedUrl
- method: str
- headers: IHeaders
- body: bytes
-
- def make_produced_request(self) -> ProducedRequest:
- """...."""
- return ProducedRequest(
- url = self.url.orig_url,
- method = self.method,
- headers = encode_headers_items(self.headers.items()),
- body = self.body
- )
-
-@dc.dataclass(frozen=True)
-class ProducedResponse:
- """...."""
- status_code: int
- headers: t.Iterable[tuple[bytes, bytes]]
- body: bytes
-
-@dc.dataclass(frozen=True)
-class ResponseInfo:
- """...."""
- url: ParsedUrl
- status_code: int
- headers: IHeaders
- body: bytes
-
- def make_produced_response(self) -> ProducedResponse:
- """...."""
- return ProducedResponse(
- status_code = self.status_code,
- headers = encode_headers_items(self.headers.items()),
- body = self.body
- )
+ProducedMessage = t.Union[
+ http_messages.ProducedRequest,
+ http_messages.ProducedResponse
+]
class Policy(ABC):
"""...."""
@@ -134,13 +65,13 @@ class Policy(ABC):
def anticache(self) -> bool:
return self.process_request or self.process_response
- def consume_request(self, request_info: RequestInfo) \
- -> t.Optional[t.Union[ProducedRequest, ProducedResponse]]:
+ def consume_request(self, request_info: http_messages.RequestInfo) \
+ -> t.Optional[ProducedMessage]:
"""...."""
return None
- def consume_response(self, response_info: ResponseInfo) \
- -> t.Optional[ProducedResponse]:
+ def consume_response(self, response_info: http_messages.ResponseInfo) \
+ -> t.Optional[http_messages.ProducedResponse]:
"""...."""
return None
diff --git a/src/hydrilla/proxy/policies/payload.py b/src/hydrilla/proxy/policies/payload.py
index 1a88ea1..ad0fa05 100644
--- a/src/hydrilla/proxy/policies/payload.py
+++ b/src/hydrilla/proxy/policies/payload.py
@@ -38,8 +38,9 @@ import re
import bs4 # type: ignore
from ...url_patterns import ParsedUrl
-from .. import state
from .. import csp
+from .. import state
+from .. import http_messages
from . import base
@dc.dataclass(frozen=True) # type: ignore[misc]
@@ -96,7 +97,7 @@ charset= # no whitespace allowed in parameter as per RFC
$ # forbid possible dangling characters after closing '"'
''', re.VERBOSE | re.IGNORECASE)
-def deduce_content_type(headers: base.IHeaders) \
+def deduce_content_type(headers: http_messages.IHeaders) \
-> tuple[t.Optional[str], t.Optional[str]]:
"""...."""
content_type = headers.get('content-type')
@@ -157,7 +158,7 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
"script-src-attr 'none'"
))
- def _modify_headers(self, response_info: base.ResponseInfo) \
+ def _modify_headers(self, response_info: http_messages.ResponseInfo) \
-> t.Iterable[tuple[bytes, bytes]]:
"""...."""
for header_name, header_value in response_info.headers.items():
@@ -211,8 +212,10 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
# https://www.w3.org/International/tests/repository/html5/the-input-byte-stream/results-basics#precedence
return UTF8_BOM + soup.encode()
- def _consume_response_unsafe(self, response_info: base.ResponseInfo) \
- -> base.ProducedResponse:
+ def _consume_response_unsafe(
+ self,
+ response_info: http_messages.ResponseInfo
+ ) -> http_messages.ProducedResponse:
"""...."""
new_response = response_info.make_produced_response()
@@ -237,8 +240,8 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
return dc.replace(new_response, body=new_data)
- def consume_response(self, response_info: base.ResponseInfo) \
- -> base.ProducedResponse:
+ def consume_response(self, response_info: http_messages.ResponseInfo) \
+ -> http_messages.ProducedResponse:
"""...."""
try:
return self._consume_response_unsafe(response_info)
@@ -252,7 +255,7 @@ class PayloadInjectPolicy(PayloadAwarePolicy):
e.__traceback__
)
- return base.ProducedResponse(
+ return http_messages.ProducedResponse(
500,
((b'Content-Type', b'text/plain; charset=utf-8'),),
'\n'.join(error_info_list).encode()
@@ -282,11 +285,11 @@ class PayloadSuggestPolicy(PayloadAwarePolicy):
"""...."""
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._ONE
- def make_response(self, request_info: base.RequestInfo) \
- -> base.ProducedResponse:
+ def make_response(self, request_info: http_messages.RequestInfo) \
+ -> http_messages.ProducedResponse:
"""...."""
# TODO: implement
- return base.ProducedResponse(200, ((b'a', b'b'),), b'')
+ return http_messages.ProducedResponse(200, ((b'a', b'b'),), b'')
@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc]
diff --git a/src/hydrilla/proxy/policies/payload_resource.py b/src/hydrilla/proxy/policies/payload_resource.py
index b255d4e..3e1b31a 100644
--- a/src/hydrilla/proxy/policies/payload_resource.py
+++ b/src/hydrilla/proxy/policies/payload_resource.py
@@ -61,6 +61,7 @@ import typing as t
from ...translations import smart_gettext as _
from .. import state
+from .. import http_messages
from . import base
from .payload import PayloadAwarePolicy, PayloadAwarePolicyFactory
@@ -73,7 +74,7 @@ class PayloadResourcePolicy(PayloadAwarePolicy):
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._THREE
def _make_file_resource_response(self, path: tuple[str, ...]) \
- -> base.ProducedResponse:
+ -> http_messages.ProducedResponse:
"""...."""
try:
file_data = self.payload_data.payload_ref.get_file_data(
@@ -84,20 +85,20 @@ class PayloadResourcePolicy(PayloadAwarePolicy):
return resource_blocked_response
if file_data is None:
- return base.ProducedResponse(
+ return http_messages.ProducedResponse(
404,
[(b'Content-Type', b'text/plain; charset=utf-8')],
_('api.file_not_found').encode()
)
- return base.ProducedResponse(
+ return http_messages.ProducedResponse(
200,
((b'Content-Type', file_data.type.encode()),),
file_data.contents
)
- def consume_request(self, request_info: base.RequestInfo) \
- -> base.ProducedResponse:
+ def consume_request(self, request_info: http_messages.RequestInfo) \
+ -> http_messages.ProducedResponse:
"""...."""
# Payload resource pattern has path of the form:
# "/some/arbitrary/segments/<per-session_token>/***"
@@ -120,7 +121,7 @@ class PayloadResourcePolicy(PayloadAwarePolicy):
return resource_blocked_response
-resource_blocked_response = base.ProducedResponse(
+resource_blocked_response = http_messages.ProducedResponse(
403,
[(b'Content-Type', b'text/plain; charset=utf-8')],
_('api.resource_not_enabled_for_access').encode()
@@ -133,8 +134,8 @@ class BlockedResponsePolicy(base.Policy):
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._THREE
- def consume_request(self, request_info: base.RequestInfo) \
- -> base.ProducedResponse:
+ def consume_request(self, request_info: http_messages.RequestInfo) \
+ -> http_messages.ProducedResponse:
"""...."""
return resource_blocked_response
diff --git a/src/hydrilla/proxy/policies/rule.py b/src/hydrilla/proxy/policies/rule.py
index eb70147..bcb110e 100644
--- a/src/hydrilla/proxy/policies/rule.py
+++ b/src/hydrilla/proxy/policies/rule.py
@@ -37,6 +37,7 @@ import typing as t
from ...url_patterns import ParsedPattern
from .. import csp
from .. import state
+from ..import http_messages
from . import base
@@ -50,7 +51,7 @@ class BlockPolicy(base.Policy):
priority: t.ClassVar[base.PolicyPriority] = base.PolicyPriority._TWO
- def _modify_headers(self, response_info: base.ResponseInfo) \
+ def _modify_headers(self, response_info: http_messages.ResponseInfo) \
-> t.Iterable[tuple[bytes, bytes]]:
"""...."""
csp_policies = csp.extract(response_info.headers)
@@ -80,8 +81,8 @@ class BlockPolicy(base.Policy):
yield b'Content-Security-Policy', extra_csp.encode()
- def consume_response(self, response_info: base.ResponseInfo) \
- -> base.ProducedResponse:
+ def consume_response(self, response_info: http_messages.ResponseInfo) \
+ -> http_messages.ProducedResponse:
"""...."""
new_response = response_info.make_produced_response()
diff --git a/src/hydrilla/proxy/state_impl/concrete_state.py b/src/hydrilla/proxy/state_impl/concrete_state.py
index 53f30ae..0699bf7 100644
--- a/src/hydrilla/proxy/state_impl/concrete_state.py
+++ b/src/hydrilla/proxy/state_impl/concrete_state.py
@@ -783,7 +783,7 @@ class ConcreteHaketiloState(base.HaketiloStateWithFields):
def get_payload(self, payload_id: str) -> st.PayloadRef:
return 'not implemented'
- def add_repo(self, name: t.Optional[str], url: t.Optional[str]) \
+ def add_repo(self, name: t.Optional[str], url: t.Optional[str]) \
-> st.RepoRef:
raise NotImplementedError()