diff options
Diffstat (limited to 'src/hydrilla/server/malcontent.py')
-rw-r--r-- | src/hydrilla/server/malcontent.py | 312 |
1 files changed, 312 insertions, 0 deletions
diff --git a/src/hydrilla/server/malcontent.py b/src/hydrilla/server/malcontent.py new file mode 100644 index 0000000..49c0fb4 --- /dev/null +++ b/src/hydrilla/server/malcontent.py @@ -0,0 +1,312 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later + +# Processing of repository packages. +# +# This file is part of Hydrilla +# +# Copyright (C) 2021, 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +# Enable using with Python 3.7. +from __future__ import annotations + +import logging +import dataclasses as dc +import typing as t + +from pathlib import Path + +from ..translations import smart_gettext as _ +from ..exceptions import HaketiloException +from .. import versions +from .. import item_infos +from .. import pattern_tree + +VersionedType = t.TypeVar( + 'VersionedType', + item_infos.ResourceInfo, + item_infos.MappingInfo +) + +class VersionedItemInfo( + item_infos.CategorizedItemInfo[VersionedType, versions.VerTuple], + t.Generic[VersionedType] +): + """Stores data of multiple versions of given resource/mapping.""" + SelfType = t.TypeVar('SelfType', bound='VersionedItemInfo[VersionedType]') + + def register(self: 'SelfType', item_info: VersionedType) -> 'SelfType': + """ + Make item info queryable by version. Perform sanity checks for uuid. + """ + return self._update(item_info.version, lambda old_info: item_info) + + def unregister(self: 'SelfType', version: versions.VerTuple) -> 'SelfType': + """....""" + return self._update(version, lambda old_info: None) + + @property + def newest_version(self) -> versions.VerTuple: + """....""" + assert not self.is_empty() + + return max(self.items.keys()) + + @property + def newest_info(self) -> VersionedType: + """Find and return info of the newest version of item.""" + return self.items[self.newest_version] + + def get_by_ver(self, ver: t.Sequence[int]) -> t.Optional[VersionedType]: + """ + Find and return info of the specified version of the item (or None if + absent). + """ + return self.items.get(versions.normalize_version(ver)) + + def get_all(self) -> t.Iterable[VersionedType]: + """Generate item info for all its versions, from oldest to newest.""" + return [self.items[version] for version in sorted(self.items.keys())] + + +MappingTree = pattern_tree.PatternTree[item_infos.MappingInfo] + +ResourceInfos = dict[str, VersionedItemInfo[item_infos.ResourceInfo]] +MappingInfos = dict[str, VersionedItemInfo[item_infos.MappingInfo]] + +class Malcontent: + """ + Represent a directory with files that can be loaded and served by Hydrilla. + """ + def __init__( + self, + malcontent_dir_path: Path, + werror: bool, + verify_files: bool + ): + """ + When an instance of Malcontent is constructed, it searches + malcontent_dir_path for serveable site-modifying packages and loads + them into its data structures. + """ + self.werror: bool = werror + self.verify_files: bool = verify_files + + self.resource_infos: ResourceInfos = {} + self.mapping_infos: MappingInfos = {} + + self.mapping_tree: MappingTree = MappingTree() + + self.malcontent_dir_path = malcontent_dir_path + + if not self.malcontent_dir_path.is_dir(): + fmt = _('err.server.malcontent_path_not_dir_{}') + raise HaketiloException(fmt.format(malcontent_dir_path)) + + types: t.Iterable[t.Type[item_infos.AnyInfo]] = ( + item_infos.MappingInfo, + item_infos.ResourceInfo + ) + for info_type in types: + type_path = self.malcontent_dir_path / info_type.type_name + if not type_path.is_dir(): + continue + + for subpath in type_path.iterdir(): + if not subpath.is_dir(): + continue + + for ver_file in subpath.iterdir(): + try: + self._load_item(info_type, ver_file) + except: + if self.werror: + raise + + fmt = _('err.server.couldnt_load_item_from_{}') + logging.error(fmt.format(ver_file), exc_info=True) + + self._report_missing() + self._finalize() + + def _check_package_files(self, info: item_infos.AnyInfo) -> None: + by_sha256_dir = self.malcontent_dir_path / 'file' / 'sha256' + + for file_spec in info.files: + if (by_sha256_dir / file_spec.sha256).is_file(): + continue + + fmt = _('err.server.no_file_{required_by}_{ver}_{file}_{sha256}') + msg = fmt.format( + required_by = info.identifier, + ver = versions.version_string(info.version), + file = file_spec.name, + sha256 = file_spec.sha256 + ) + if (self.werror): + raise HaketiloException(msg) + else: + logging.error(msg) + + @staticmethod + def _register_info( + infos: dict[str, VersionedItemInfo[VersionedType]], + identifier: str, + item_info: VersionedType + ) -> None: + versioned_info = infos.get(identifier, VersionedItemInfo()) + infos[identifier] = versioned_info.register(item_info) + + def _load_item( + self, + info_type: t.Type[item_infos.AnyInfo], + ver_file: Path + ) -> None: + """ + Reads, validates and autocompletes serveable mapping/resource + definition, then registers information from it in data structures. + """ + version = versions.parse(ver_file.name) + identifier = ver_file.parent.name + + item_info = info_type.load(ver_file) + + if item_info.identifier != identifier: + fmt = _('err.server.item_{item}_in_file_{file}') + msg = fmt.format({'item': item_info.identifier, 'file': ver_file}) + raise HaketiloException(msg) + + if item_info.version != version: + ver_str = versions.version_string(item_info.version) + fmt = _('item_version_{ver}_in_file_{file}') + msg = fmt.format({'ver': ver_str, 'file': ver_file}) + raise HaketiloException(msg) + + if self.verify_files: + self._check_package_files(item_info) + + if isinstance(item_info, item_infos.ResourceInfo): + self._register_info(self.resource_infos, identifier, item_info) + else: + self._register_info(self.mapping_infos, identifier, item_info) + + @staticmethod + def _all_infos(infos: dict[str, VersionedItemInfo[VersionedType]]) \ + -> t.Iterator[VersionedType]: + for versioned_info in infos.values(): + for item_info in versioned_info.get_all(): + yield item_info + + def _report_missing(self) -> None: + """ + Use logger to print information about items that are referenced but + were not loaded. + """ + def report_missing_dependency( + info: item_infos.ResourceInfo, + dep: str + ) -> None: + msg = _('err.server.no_dep_{resource}_{ver}_{dep}')\ + .format(dep=dep, resource=info.identifier, + ver=versions.version_string(info.version)) + logging.error(msg) + + for resource_info in self._all_infos(self.resource_infos): + for dep_specifier in resource_info.dependencies: + identifier = dep_specifier.identifier + if identifier not in self.resource_infos: + report_missing_dependency(resource_info, identifier) + + def report_missing_payload( + info: item_infos.MappingInfo, + payload: str + ) -> None: + msg = _('err.server.no_payload_{mapping}_{ver}_{payload}')\ + .format(mapping=info.identifier, payload=payload, + ver=versions.version_string(info.version)) + logging.error(msg) + + for mapping_info in self._all_infos(self.mapping_infos): + for resource_specifier in mapping_info.payloads.values(): + identifier = resource_specifier.identifier + if identifier not in self.resource_infos: + report_missing_payload(mapping_info, identifier) + + def report_missing_mapping( + info: item_infos.AnyInfo, + required: str + ) -> None: + msg = _('err.server.no_mapping_{required_by}_{ver}_{required}')\ + .format(required_by=info.identifier, required=required, + ver=versions.version_string(info.version)) + logging.error(msg) + + infos: t.Iterable[item_infos.AnyInfo] = ( + *self._all_infos(self.mapping_infos), + *self._all_infos(self.resource_infos) + ) + for item_info in infos: + for mapping_specifier in item_info.required_mappings: + identifier = mapping_specifier.identifier + if identifier not in self.mapping_infos: + report_missing_mapping(item_info, identifier) + + def _finalize(self): + """ + Initialize structures needed to serve queries. Called once after all + data gets loaded. + """ + for info in self._all_infos(self.mapping_infos): + for pattern in info.payloads: + try: + self.mapping_tree = \ + self.mapping_tree.register(pattern, info) + except: + if self.werror: + raise + msg = _('server.err.couldnt_register_{mapping}_{ver}_{pattern}')\ + .format(mapping=info.identifier, pattern=pattern, + ver=util.version_string(info.version)) + logging.error(msg) + + def query(self, url: str) -> t.Sequence[item_infos.MappingInfo]: + """ + Return a list of registered mappings that match url. + + If multiple versions of a mapping are applicable, only the most recent + is included in the result. + """ + collected: dict[str, item_infos.MappingInfo] = {} + for result_set in self.mapping_tree.search(url): + for wrapped_mapping_info in result_set: + info = wrapped_mapping_info.item + previous = collected.get(info.identifier) + if previous and previous.version > info.version: + continue + + collected[info.identifier] = info + + return list(collected.values()) + + def get_all_resources(self) -> t.Sequence[item_infos.ResourceInfo]: + return tuple(self._all_infos(self.resource_infos)) + + def get_all_mappings(self) -> t.Sequence[item_infos.MappingInfo]: + return tuple(self._all_infos(self.mapping_infos)) |