# SPDX-License-Identifier: GPL-3.0-or-later

# Data structure for querying URL patterns.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2021, 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this
# code in a proprietary program, I am not going to enforce this in
# court.

"""
This module contains functions for deconstruction and construction of URLs and
Haketilo URL patterns. Data structures for querying data using URL patterns are
also defined here.
"""

# Enable using with Python 3.7.
from __future__ import annotations

import re
import urllib.parse as up
import typing as t
import dataclasses as dc

from immutables import Map

from .translations import smart_gettext as _
from .exceptions import HaketiloException

default_ports: t.Mapping[str, int] = Map(http=80, https=443, ftp=21)

ParsedUrlType = t.TypeVar('ParsedUrlType', bound='ParsedUrl')

@dc.dataclass(frozen=True, unsafe_hash=True, order=True)
class ParsedUrl:
    """Immutable, structured representation of a deconstructed URL."""
    orig_url:           str # used in __hash__() and __lt__()
    scheme:             str             = dc.field(hash=False, compare=False)
    domain_labels:      tuple[str, ...] = dc.field(hash=False, compare=False)
    path_segments:      tuple[str, ...] = dc.field(hash=False, compare=False)
    query:              str             = dc.field(hash=False, compare=False)
    has_trailing_slash: bool            = dc.field(hash=False, compare=False)
    port:               t.Optional[int] = dc.field(hash=False, compare=False)

    @property
    def url_without_path(self) -> str:
        """
        Reconstruct the scheme-and-netloc part of the URL; the port is omitted
        when it is the scheme's default one.
        """
        scheme = self.scheme

        netloc = '.'.join(reversed(self.domain_labels))

        if self.port is not None and \
           default_ports.get(scheme) != self.port:
            netloc += f':{self.port}'

        return f'{scheme}://{netloc}'

    def reconstruct_url(self) -> str:
        """Rebuild the URL (without query and fragment) from this object's fields."""
        path = '/'.join(('', *self.path_segments))
        if self.has_trailing_slash:
            path += '/'

        return self.url_without_path + path

    def path_append(self: ParsedUrlType, *new_segments: str) -> ParsedUrlType:
        """Return a copy of this object with new_segments appended to the path."""
        new_url = self.reconstruct_url()
        if not self.has_trailing_slash:
            new_url += '/'

        new_url += '/'.join(new_segments)

        return dc.replace(
            self,
            orig_url           = new_url,
            path_segments      = tuple((*self.path_segments, *new_segments)),
            has_trailing_slash = False
        )

ParsedPattern = t.NewType('ParsedPattern', ParsedUrl)

# We sometimes need a dummy pattern that means "match everything".
catchall_pattern = ParsedPattern(
    ParsedUrl(
        orig_url           = '',
        scheme             = '',
        domain_labels      = ('***',),
        path_segments      = ('***',),
        query              = '',
        has_trailing_slash = False,
        port               = 0
    )
)
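# The doctest-style snippet below is an illustrative sketch added for clarity
# and is not part of the original module; the URL is made up and parse_url()
# is defined further down in this file.
#
#     >>> url = parse_url('https://example.com/a/b/')
#     >>> url.domain_labels
#     ('com', 'example')
#     >>> url.path_segments
#     ('a', 'b')
#     >>> url.port  # deduced from the 'https' scheme
#     443
#     >>> url.reconstruct_url()
#     'https://example.com/a/b/'
#     >>> url.path_append('c').reconstruct_url()
#     'https://example.com/a/b/c'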
# URLs with these schemes will be recognized, but not all of them are
# necessarily supported by the Hydrilla server and the Haketilo proxy.
supported_schemes = 'http', 'https', 'ftp', 'file'

def _parse_pattern_or_url(
        url: str,
        orig_url: str,
        is_pattern: bool = False
) -> ParsedUrl:
    """Parse a URL or a URL pattern into a ParsedUrl object."""
    if not is_pattern:
        assert orig_url == url

    parse_result = up.urlparse(url)

    # Verify the parsed URL is valid.
    has_hostname = parse_result.hostname is not None
    if not parse_result.scheme or \
       (parse_result.scheme == 'file' and parse_result.port is not None) or \
       (parse_result.scheme == 'file' and has_hostname) or \
       (parse_result.scheme != 'file' and not has_hostname):
        if is_pattern:
            msg = _('err.url_pattern_{}.bad').format(orig_url)
            raise HaketiloException(msg)
        else:
            raise HaketiloException(_('err.url_{}.bad').format(url))

    # Verify the URL uses a known scheme and extract it.
    scheme = parse_result.scheme

    if parse_result.scheme not in supported_schemes:
        if is_pattern:
            msg = _('err.url_pattern_{}.bad_scheme').format(orig_url)
            raise HaketiloException(msg)
        else:
            raise HaketiloException(_('err.url_{}.bad_scheme').format(url))

    # The special 'http*' pattern scheme matches both 'http' and 'https' but
    # must not be combined with an explicit port.
    if is_pattern and orig_url.startswith('http*:'):
        if parse_result.port:
            fmt = _('err.url_pattern_{}.special_scheme_port')
            raise HaketiloException(fmt.format(orig_url))

    # Extract the URL's explicit port or deduce the port from the URL's scheme.
    # urlparse() returns 0 for an explicit port of 0 and raises ValueError for
    # other out-of-range ports; both cases are rejected here.
    try:
        explicit_port = parse_result.port
        port_out_of_range = explicit_port == 0
    except ValueError:
        port_out_of_range = True

    if port_out_of_range:
        if is_pattern:
            msg = _('err.url_pattern_{}.bad_port').format(orig_url)
            raise HaketiloException(msg)
        else:
            raise HaketiloException(_('err.url_{}.bad_port').format(url))

    port = explicit_port or default_ports.get(parse_result.scheme)

    # Make the URL's hostname into a list of labels in reverse order. E.g.
    # 'https://a.bc..de.fg.com/h/i/' -> ['com', 'fg', 'de', 'bc', 'a']
    hostname = parse_result.hostname or ''
    domain_labels_with_empty = reversed(hostname.split('.'))
    domain_labels = tuple(lbl for lbl in domain_labels_with_empty if lbl)

    # Make the URL's path into a list of segments. E.g.
    # 'https://ab.cd/e//f/g/' -> ['e', 'f', 'g']
    path_segments_with_empty = parse_result.path.split('/')
    path_segments = tuple(sgmt for sgmt in path_segments_with_empty if sgmt)

    # Record whether a trailing '/' is present in the URL.
    has_trailing_slash = parse_result.path.endswith('/')

    # Perform some additional sanity checks and return the result.
    if is_pattern:
        if parse_result.query:
            msg = _('err.url_pattern_{}.has_query').format(orig_url)
            raise HaketiloException(msg)

        if parse_result.fragment:
            msg = _('err.url_pattern_{}.has_frag').format(orig_url)
            raise HaketiloException(msg)

    query = parse_result.query

    return ParsedUrl(
        orig_url           = orig_url,
        scheme             = scheme,
        port               = port,
        domain_labels      = domain_labels,
        path_segments      = path_segments,
        query              = query,
        has_trailing_slash = has_trailing_slash
    )

replace_scheme_regex = re.compile(r'^[^:]*')

def parse_pattern(url_pattern: str) -> t.Iterator[ParsedPattern]:
    """Parse a URL pattern, yielding one ParsedPattern per scheme it covers."""
    if url_pattern.startswith('http*:'):
        patterns = [
            replace_scheme_regex.sub('http', url_pattern),
            replace_scheme_regex.sub('https', url_pattern)
        ]
    else:
        patterns = [url_pattern]

    for pat in patterns:
        yield ParsedPattern(
            _parse_pattern_or_url(pat, url_pattern, True)
        )

def parse_url(url: str) -> ParsedUrl:
    """Parse a URL into a ParsedUrl object."""
    return _parse_pattern_or_url(url, url)

def normalize_pattern(url_pattern: str) -> str:
    """Parse and reconstruct a URL pattern, bringing it to a canonical form."""
    parsed = next(parse_pattern(url_pattern))

    reconstructed = parsed.reconstruct_url()

    if url_pattern.startswith('http*:'):
        reconstructed = replace_scheme_regex.sub('http*', reconstructed)

    return reconstructed
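# Another illustrative, non-authoritative sketch (the patterns shown are made
# up) of how the helpers above compose: the special 'http*' scheme fans out
# into one ParsedPattern per concrete scheme, normalize_pattern() canonicalizes
# hostname case and duplicate slashes while preserving the special scheme, and
# patterns carrying a query string are rejected.
#
#     >>> [p.scheme for p in parse_pattern('http*://example.com/a')]
#     ['http', 'https']
#     >>> normalize_pattern('http*://Example.COM/a//b/')
#     'http*://example.com/a/b/'
#     >>> next(parse_pattern('https://example.com/?q=1'))
#     ... # raises HaketiloException: patterns must not contain a query string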