diff options
Diffstat (limited to 'src/hydrilla/url_patterns.py')
-rw-r--r-- | src/hydrilla/url_patterns.py | 237 |
1 files changed, 237 insertions, 0 deletions
# SPDX-License-Identifier: GPL-3.0-or-later

# Data structure for querying URL patterns.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2021, 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this
# code in a proprietary program, I am not going to enforce this in
# court.

"""
This module contains functions for deconstruction and construction of URLs and
Haketilo URL patterns.

Data structures for querying data using URL patterns are also defined there.
"""

import re
import urllib.parse as up
import typing as t
import dataclasses as dc

from immutables import Map

from .translations import smart_gettext as _
from .exceptions import HaketiloException


class HaketiloURLException(HaketiloException):
    """Type used for exceptions generated when parsing a URL or URL pattern."""
    pass


# Ports that are implied by a scheme when the URL does not name one.
default_ports: t.Mapping[str, int] = Map(http=80, https=443, ftp=21)

ParsedUrlType = t.TypeVar('ParsedUrlType', bound='ParsedUrl')

@dc.dataclass(frozen=True, unsafe_hash=True, order=True)
class ParsedUrl:
    """
    Immutable result of parsing a URL or a URL pattern.

    Only `orig_url` takes part in equality, ordering and hashing; all the
    other fields are declared with `hash=False, compare=False`.
    """
    # The string this object was parsed from (for 'http*:' patterns this is
    # the pattern as written, not the per-scheme expansion).
    orig_url:           str  # used in __hash__() and __lt__()
    scheme:             str                = dc.field(hash=False, compare=False)
    # Hostname labels in reverse order, e.g. ('com', 'example', 'www').
    domain_labels:      t.Tuple[str, ...]  = dc.field(hash=False, compare=False)
    # Non-empty path segments in their original order.
    path_segments:      t.Tuple[str, ...]  = dc.field(hash=False, compare=False)
    query:              str                = dc.field(hash=False, compare=False)
    has_trailing_slash: bool               = dc.field(hash=False, compare=False)
    # Explicit port or the scheme's default; None when neither is available.
    port:               t.Optional[int]    = dc.field(hash=False, compare=False)

    @property
    def url_without_path(self) -> str:
        """
        Return the 'scheme://host[:port]' part of the URL.

        The port is omitted when it is None or when it equals the default
        port registered for the scheme in `default_ports`.
        """
        scheme = self.scheme

        # Labels are stored reversed, so flip them back to build the hostname.
        netloc = '.'.join(reversed(self.domain_labels))

        if self.port is not None and \
           default_ports.get(scheme) != self.port:
            netloc += f':{self.port}'

        return f'{scheme}://{netloc}'

    def reconstruct_url(self) -> str:
        """
        Rebuild a URL string from scheme, host, port and path.

        The query string is not included in the result.
        """
        # The leading empty element yields the leading '/' of a non-empty path.
        path = '/'.join(('', *self.path_segments))
        if self.has_trailing_slash:
            path += '/'

        return self.url_without_path + path

    def path_append(self: ParsedUrlType, *new_segments: str) -> ParsedUrlType:
        """
        Return a copy of this object with `new_segments` appended to its path.

        The copy's `orig_url` is the reconstructed URL extended with the new
        segments and its `has_trailing_slash` is always False.
        """
        new_url = self.reconstruct_url()
        if not self.has_trailing_slash:
            new_url += '/'

        new_url += '/'.join(new_segments)

        return dc.replace(
            self,
            orig_url           = new_url,
            path_segments      = (*self.path_segments, *new_segments),
            has_trailing_slash = False
        )

ParsedPattern = t.NewType('ParsedPattern', ParsedUrl)


# URLs with those schemes will be recognized but not all of them have to be
# actually supported by Hydrilla server and Haketilo proxy.
supported_schemes = 'http', 'https', 'ftp', 'file'

def _parse_pattern_or_url(
        url:        str,
        orig_url:   str,
        is_pattern: bool = False
) -> ParsedUrl:
    """
    Parse `url` into a `ParsedUrl`, validating it on the way.

    Parameters:
        url:        the string actually handed to `urllib.parse.urlparse()`.
        orig_url:   the string as supplied by the user; it is stored in the
                    result and used in pattern error messages. For plain URLs
                    it must equal `url`.
        is_pattern: whether a Haketilo URL pattern (rather than a URL) is
                    being parsed. Patterns must not carry a query or fragment
                    and 'http*:' patterns must not carry an explicit port.

    Raises:
        HaketiloURLException: when the URL/pattern is malformed, uses an
            unknown scheme or has an invalid port.
    """
    if not is_pattern:
        assert orig_url == url

    def malformed() -> HaketiloURLException:
        """Build the generic 'bad URL'/'bad pattern' exception."""
        if is_pattern:
            msg = _('err.url_pattern_{}.bad').format(orig_url)
            return HaketiloURLException(msg)

        return HaketiloURLException(_('err.url_{}.bad').format(url))

    # urlparse() raises ValueError e.g. for unmatched square brackets in the
    # network location; report that like any other malformed URL.
    try:
        parse_result = up.urlparse(url)
    except ValueError as err:
        raise malformed() from err

    # The `port` property is computed on access and raises ValueError for
    # ports that are not numbers or are out of range. Read it exactly once,
    # here, so that none of the checks below can leak a raw ValueError.
    explicit_port: t.Optional[int] = None
    try:
        explicit_port     = parse_result.port
        port_out_of_range = explicit_port == 0
    except ValueError:
        port_out_of_range = True

    # Verify the parsed URL is valid: 'file' URLs must have neither a port
    # nor a hostname, all other URLs must have a hostname.
    has_hostname = parse_result.hostname is not None
    has_port     = explicit_port is not None or port_out_of_range
    is_file      = parse_result.scheme == 'file'
    if not parse_result.scheme             or \
       (is_file and has_port)              or \
       (is_file and has_hostname)          or \
       (not is_file and not has_hostname):
        raise malformed()

    # Verify the URL uses a known scheme and extract it.
    scheme = parse_result.scheme

    if parse_result.scheme not in supported_schemes:
        if is_pattern:
            msg = _('err.url_pattern_{}.bad_scheme').format(orig_url)
            raise HaketiloURLException(msg)
        else:
            raise HaketiloURLException(_('err.url_{}.bad_scheme').format(url))

    # Extract and keep information about special pattern schemas used.
    if is_pattern and orig_url.startswith('http*:'):
        if explicit_port:
            fmt = _('err.url_pattern_{}.special_scheme_port')
            raise HaketiloURLException(fmt.format(orig_url))

    # Reject ports found to be invalid above.
    if port_out_of_range:
        if is_pattern:
            msg = _('err.url_pattern_{}.bad_port').format(orig_url)
            raise HaketiloURLException(msg)
        else:
            raise HaketiloURLException(_('err.url_{}.bad_port').format(url))

    # Use URL's explicit port or deduce the port based on URL's protocol.
    port = explicit_port or default_ports.get(parse_result.scheme)

    # Make URL's hostname into a list of labels in reverse order. E.g.
    # 'https://a.bc..de.fg.com/h/i/' -> ['com', 'fg', 'de', 'bc', 'a']
    hostname = parse_result.hostname or ''
    domain_labels_with_empty = reversed(hostname.split('.'))
    domain_labels = tuple(lbl for lbl in domain_labels_with_empty if lbl)

    # Make URL's path into a list of segments. E.g.
    # 'https://ab.cd/e//f/g/' -> ['e', 'f', 'g']
    path_segments_with_empty = parse_result.path.split('/')
    path_segments = tuple(sgmt for sgmt in path_segments_with_empty if sgmt)

    # Record whether a trailing '/' is present in the URL.
    has_trailing_slash = parse_result.path.endswith('/')

    # Perform some additional sanity checks and return the result.
    if is_pattern:
        if parse_result.query:
            msg = _('err.url_pattern_{}.has_query').format(orig_url)
            raise HaketiloURLException(msg)

        if parse_result.fragment:
            msg = _('err.url_pattern_{}.has_frag').format(orig_url)
            raise HaketiloURLException(msg)

    query = parse_result.query

    return ParsedUrl(
        orig_url           = orig_url,
        scheme             = scheme,
        port               = port,
        domain_labels      = domain_labels,
        path_segments      = path_segments,
        query              = query,
        has_trailing_slash = has_trailing_slash
    )

# Matches everything before the first ':' (i.e. the scheme part of a URL).
replace_scheme_regex = re.compile(r'^[^:]*')

def parse_pattern(url_pattern: str) -> t.Iterator[ParsedPattern]:
    """
    Parse a Haketilo URL pattern, yielding one `ParsedPattern` per scheme.

    A pattern starting with 'http*:' is expanded into an 'http' and an 'https'
    variant (in that order); any other pattern yields a single result. Every
    yielded object keeps the pattern as written in its `orig_url`.

    This is a generator, so `HaketiloURLException` for an invalid pattern is
    raised when the result is iterated, not when this function is called.
    """
    if url_pattern.startswith('http*:'):
        patterns = [
            replace_scheme_regex.sub('http',  url_pattern),
            replace_scheme_regex.sub('https', url_pattern)
        ]
    else:
        patterns = [url_pattern]

    for pat in patterns:
        yield ParsedPattern(
            _parse_pattern_or_url(pat, url_pattern, True)
        )

def parse_url(url: str) -> ParsedUrl:
    """
    Parse a URL into a `ParsedUrl`.

    Raises:
        HaketiloURLException: when the URL is malformed, uses an unknown
            scheme or has an invalid port.
    """
    return _parse_pattern_or_url(url, url)


def normalize_pattern(url_pattern: str) -> str:
    """
    Return the pattern rebuilt from its parsed form.

    The result is what `ParsedUrl.reconstruct_url()` gives for the first
    parsed variant of the pattern, with the 'http*' scheme restored if the
    pattern used it.

    Raises:
        HaketiloURLException: when the pattern is invalid.
    """
    parsed = next(parse_pattern(url_pattern))

    reconstructed = parsed.reconstruct_url()

    # Same prefix test as in parse_pattern(), which did the expansion.
    if url_pattern.startswith('http*:'):
        reconstructed = replace_scheme_regex.sub('http*', reconstructed)

    return reconstructed


def pattern_for_domain(url: str) -> str:
    """
    Return a normalized 'http*://<netloc>/***' pattern built from the network
    location part of `url`.
    """
    return normalize_pattern(f'http*://{up.urlparse(url).netloc}/***')


# Pre-parsed placeholder URL.
dummy_url = parse_url('http://dummy.replacement.url')