aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/proxy/http_messages.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/hydrilla/proxy/http_messages.py')
-rw-r--r--src/hydrilla/proxy/http_messages.py244
1 files changed, 244 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/http_messages.py b/src/hydrilla/proxy/http_messages.py
new file mode 100644
index 0000000..74f1f02
--- /dev/null
+++ b/src/hydrilla/proxy/http_messages.py
@@ -0,0 +1,244 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Classes/protocols for representing HTTP requests and responses data.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use of this
+# code in a proprietary program, I am not going to enforce this in
+# court.
+
+"""
Classes and protocols for representing HTTP requests and responses data.
+"""
+
import cgi
import dataclasses as dc
import email.message
import re
import sys
import typing as t
+
+if sys.version_info >= (3, 8):
+ from typing import Protocol
+else:
+ from typing_extensions import Protocol
+
+import mitmproxy.http
+
+from .. import url_patterns
+
+
# TypeVar for the `default` argument of the header `get()` overloads,
# constrained to str/None so "str or whatever default was passed" can be
# expressed in the return type.
DefaultGetValue = t.TypeVar('DefaultGetValue', str, None)
+
class _MitmproxyHeadersWrapper():
    """
    Adapter exposing a mitmproxy Headers object through the IHeaders
    protocol used by the rest of the proxy code.
    """
    def __init__(self, headers: mitmproxy.http.Headers) -> None:
        self.headers = headers

    def __getitem__(self, key: str) -> str:
        # Delegates to mitmproxy; a missing header raises KeyError.
        return self.headers[key]

    def get_all(self, key: str) -> t.Iterable[str]:
        # All values of a (possibly repeated) header field.
        return self.headers.get_all(key)

    @t.overload
    def get(self, key: str) -> t.Optional[str]:
        ...
    @t.overload
    def get(self, key: str, default: DefaultGetValue) \
        -> t.Union[str, DefaultGetValue]:
        ...
    def get(self, key, default = None):
        """Return the header's value, or `default` when it is absent."""
        value = self.headers.get(key)

        if value is None:
            return default
        else:
            return t.cast(str, value)

    def items(self) -> t.Iterable[tuple[str, str]]:
        # multi=True yields one (name, value) pair per field occurrence.
        return self.headers.items(multi=True)

    def items_bin(self) -> t.Iterable[tuple[bytes, bytes]]:
        # Same pairs as items(), but encoded to bytes.
        return tuple((key.encode(), val.encode()) for key, val in self.items())
+
class IHeaders(Protocol):
    """
    Structural protocol for read access to HTTP headers.

    Mirrors the interface of _MitmproxyHeadersWrapper: mapping-style
    lookup, multi-value access, and serialization to (bytes, bytes) pairs.
    """
    def __getitem__(self, key: str) -> str: ...

    def get_all(self, key: str) -> t.Iterable[str]: ...

    @t.overload
    def get(self, key: str) -> t.Optional[str]:
        ...
    @t.overload
    def get(self, key: str, default: DefaultGetValue) \
        -> t.Union[str, DefaultGetValue]:
        ...

    def items(self) -> t.Iterable[tuple[str, str]]: ...

    def items_bin(self) -> t.Iterable[tuple[bytes, bytes]]: ...
+
# Anything make_headers() can normalize into an IHeaders instance:
# (bytes, bytes) pairs, (str, str) pairs, mitmproxy's own Headers object,
# or an object already satisfying the IHeaders protocol.
_AnyHeaders = t.Union[
    t.Iterable[tuple[bytes, bytes]],
    t.Iterable[tuple[str, str]],
    mitmproxy.http.Headers,
    IHeaders
]
+
def make_headers(headers: _AnyHeaders) -> IHeaders:
    """
    Coerce any of the accepted header representations into an object
    satisfying the IHeaders protocol.
    """
    if isinstance(headers, mitmproxy.http.Headers):
        return _MitmproxyHeadersWrapper(headers)

    if not isinstance(headers, t.Iterable):
        # Neither raw pairs nor mitmproxy's type — assume it already
        # implements IHeaders.
        return headers

    pairs = tuple(headers)
    if pairs and isinstance(pairs[0][0], bytes):
        bin_pairs = pairs
    else:
        # Pairs came as str (or the sequence is empty); mitmproxy's
        # Headers constructor wants bytes.
        bin_pairs = tuple((key.encode(), val.encode()) for key, val in pairs)

    return _MitmproxyHeadersWrapper(mitmproxy.http.Headers(bin_pairs))
+
+
# A URL as either a raw string or an already-parsed ParsedUrl.
_AnyUrl = t.Union[str, url_patterns.ParsedUrl]
+
def make_parsed_url(url: t.Union[str, url_patterns.ParsedUrl]) \
    -> url_patterns.ParsedUrl:
    """Parse `url` when it is a string; pass a ParsedUrl through as-is."""
    if isinstance(url, str):
        return url_patterns.parse_url(url)
    return url
+
+
@dc.dataclass(frozen=True)
class HasHeadersMixin:
    """
    Mixin providing Content-Type deduction for the request/response
    dataclasses below, all of which carry a `headers` field.
    """
    # String annotation so the mixin does not need IHeaders resolvable at
    # class-creation time; dataclasses handle forward references fine.
    headers: 'IHeaders'

    def deduce_content_type(self) -> tuple[t.Optional[str], t.Optional[str]]:
        """
        Return (mime_type, charset) extracted from the Content-Type header,
        or (None, None) when the header is absent.  The mime type and the
        charset (when present) are lowercased.
        """
        content_type_header = self.headers.get('content-type')
        if content_type_header is None:
            return (None, None)

        # email.message is the documented replacement for the deprecated
        # cgi.parse_header() (the cgi module was removed in Python 3.13).
        msg = email.message.EmailMessage()
        msg['Content-Type'] = content_type_header
        # get_content_type() returns the mime type normalized to lowercase.
        mime = msg.get_content_type()

        encoding = msg.get_param('charset')
        if isinstance(encoding, str):
            encoding = encoding.lower()
        else:
            # get_param() may return a 3-tuple for RFC 2231-encoded
            # parameters; treat anything that is not a plain str as absent.
            encoding = None

        return mime, encoding
+
+
@dc.dataclass(frozen=True)
class _BaseRequestInfoFields:
    # Fields shared by BodylessRequestInfo and RequestInfo, kept in a
    # separate base class so the dataclass field order stays consistent.
    url: url_patterns.ParsedUrl
    method: str
    headers: IHeaders
+
@dc.dataclass(frozen=True)
class BodylessRequestInfo(HasHeadersMixin, _BaseRequestInfoFields):
    """A request's URL, method and headers, without its body."""
    def with_body(self, body: bytes) -> 'RequestInfo':
        """Extend this request info with `body`, producing a RequestInfo."""
        return RequestInfo(
            url     = self.url,
            method  = self.method,
            headers = self.headers,
            body    = body
        )

    @staticmethod
    def make(
            url: t.Union[str, url_patterns.ParsedUrl],
            method: str,
            headers: _AnyHeaders
    ) -> 'BodylessRequestInfo':
        """Construct from loosely-typed URL and headers representations."""
        return BodylessRequestInfo(
            make_parsed_url(url),
            method,
            make_headers(headers)
        )
+
@dc.dataclass(frozen=True)
class RequestInfo(HasHeadersMixin, _BaseRequestInfoFields):
    """A request's URL, method, headers and body."""
    body: bytes

    @staticmethod
    def make(
            url: _AnyUrl = url_patterns.dummy_url,
            method: str = 'GET',
            headers: _AnyHeaders = (),
            body: bytes = b''
    ) -> 'RequestInfo':
        """Construct from loosely-typed representations of the fields."""
        bodyless = BodylessRequestInfo.make(url, method, headers)
        return bodyless.with_body(body)
+
# Request info in either of its two completeness states (with or
# without a body).
AnyRequestInfo = t.Union[BodylessRequestInfo, RequestInfo]
+
+
@dc.dataclass(frozen=True)
class _BaseResponseInfoFields:
    # Fields shared by BodylessResponseInfo and ResponseInfo, kept in a
    # separate base class so the dataclass field order stays consistent.
    url: url_patterns.ParsedUrl
    status_code: int
    headers: IHeaders
+
@dc.dataclass(frozen=True)
class BodylessResponseInfo(HasHeadersMixin, _BaseResponseInfoFields):
    """A response's URL, status code and headers, without its body."""
    def with_body(self, body: bytes) -> 'ResponseInfo':
        """Extend this response info with `body`, producing a ResponseInfo."""
        return ResponseInfo(
            url         = self.url,
            status_code = self.status_code,
            headers     = self.headers,
            body        = body
        )

    @staticmethod
    def make(
            url: t.Union[str, url_patterns.ParsedUrl],
            status_code: int,
            headers: _AnyHeaders
    ) -> 'BodylessResponseInfo':
        """Construct from loosely-typed URL and headers representations."""
        return BodylessResponseInfo(
            make_parsed_url(url),
            status_code,
            make_headers(headers)
        )
+
@dc.dataclass(frozen=True)
class ResponseInfo(HasHeadersMixin, _BaseResponseInfoFields):
    """A response's URL, status code, headers and body."""
    body: bytes

    @staticmethod
    def make(
            url: _AnyUrl = url_patterns.dummy_url,
            status_code: int = 404,
            headers: _AnyHeaders = (),
            body: bytes = b''
    ) -> 'ResponseInfo':
        """Construct from loosely-typed representations of the fields."""
        return BodylessResponseInfo.make(url, status_code, headers) \
                                   .with_body(body)
+
# Response info in either of its two completeness states (with or
# without a body).
AnyResponseInfo = t.Union[BodylessResponseInfo, ResponseInfo]
+
+
def is_likely_a_page(
    request_info: 'AnyRequestInfo',
    response_info: 'AnyResponseInfo'
) -> bool:
    """
    Heuristically decide whether this request/response exchange carries an
    HTML page (a document or a frame/embed/object destination), based on
    the request's Sec-Fetch-Dest/Accept headers and the response's
    Content-Type.
    """
    fetch_dest = request_info.headers.get('sec-fetch-dest')
    if fetch_dest is None:
        # Older clients don't send Sec-Fetch-Dest; fall back to guessing
        # from the Accept header.
        if 'html' in request_info.headers.get('accept', ''):
            fetch_dest = 'document'
        else:
            fetch_dest = 'unknown'

    if fetch_dest not in ('document', 'iframe', 'frame', 'embed', 'object'):
        return False

    # The charset is irrelevant here; only the mime type matters.
    mime, _encoding = response_info.deduce_content_type()

    # Right now out of all response headers we're only taking Content-Type into
    # account. In the future we might also want to consider the
    # Content-Disposition header.
    return mime is not None and 'html' in mime
+
+
@dc.dataclass(frozen=True)
class FullHTTPInfo:
    """A complete request bundled with its complete response."""
    request_info: RequestInfo
    response_info: ResponseInfo

    @property
    def is_likely_a_page(self) -> bool:
        """Convenience wrapper around module-level is_likely_a_page()."""
        return is_likely_a_page(
            self.request_info,
            self.response_info
        )