diff options
Diffstat (limited to 'src/hydrilla/item_infos.py')
-rw-r--r-- | src/hydrilla/item_infos.py | 699 |
1 files changed, 699 insertions, 0 deletions
diff --git a/src/hydrilla/item_infos.py b/src/hydrilla/item_infos.py new file mode 100644 index 0000000..430bcd0 --- /dev/null +++ b/src/hydrilla/item_infos.py @@ -0,0 +1,699 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Reading resources, mappings and other JSON documents from the filesystem. +# +# This file is part of Hydrilla&Haketilo +# +# Copyright (C) 2021, 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use of this +# code in a proprietary program, I am not going to enforce this in +# court. + +""" +..... +""" + +import sys + +if sys.version_info >= (3, 8): + from typing import Protocol +else: + from typing_extensions import Protocol + +import enum +import typing as t +import dataclasses as dc + +from pathlib import Path, PurePosixPath +from abc import ABC, abstractmethod + +from immutables import Map + +from . import versions, json_instances +from .url_patterns import parse_pattern, ParsedUrl, ParsedPattern +from .exceptions import HaketiloException +from .translations import smart_gettext as _ + + +@dc.dataclass(frozen=True, unsafe_hash=True) +class ItemSpecifier: + """....""" + identifier: str + +ItemSpecs = t.Tuple[ItemSpecifier, ...] + +SpecifierObjs = t.Sequence[t.Mapping[str, t.Any]] + +def make_item_specifiers_seq(spec_objs: SpecifierObjs) -> ItemSpecs: + return tuple(ItemSpecifier(obj['identifier']) for obj in spec_objs) + +def make_required_mappings(spec_objs: t.Any, schema_compat: int) -> ItemSpecs: + if schema_compat < 2: + return () + + return make_item_specifiers_seq(spec_objs) + +@dc.dataclass(frozen=True, unsafe_hash=True) +class FileSpecifier: + """....""" + name: str + sha256: str + +FileSpecs = t.Tuple[FileSpecifier, ...] + +def normalize_filename(name: str): + """ + This function eliminated double slashes in file name and ensures it does not + try to reference parent directories. + """ + path = PurePosixPath(name) + + if '.' in path.parts or '..' in path.parts: + msg = _('err.item_info.filename_invalid_{}').format(name) + raise HaketiloException(msg) + + return str(path) + +def make_file_specifiers_seq(spec_objs: SpecifierObjs) -> FileSpecs: + return tuple( + FileSpecifier(normalize_filename(obj['file']), obj['sha256']) + for obj + in spec_objs + ) + +@dc.dataclass(frozen=True, unsafe_hash=True) +class GeneratedBy: + """....""" + name: str + version: t.Optional[str] + + @staticmethod + def make(generated_by_obj: t.Optional[t.Mapping[str, t.Any]]) -> \ + t.Optional['GeneratedBy']: + """....""" + if generated_by_obj is None: + return None + + return GeneratedBy( + name = generated_by_obj['name'], + version = generated_by_obj.get('version') + ) + + +def make_eval_permission(perms_obj: t.Any, schema_compat: int) -> bool: + if schema_compat < 2: + return False + + return perms_obj.get('eval', False) + + +def make_cors_bypass_permission(perms_obj: t.Any, schema_compat: int) -> bool: + if schema_compat < 2: + return False + + return perms_obj.get('cors_bypass', False) + + +def make_version_constraint( + ver: t.Any, + schema_compat: int, + default: versions.VerTuple +) -> versions.VerTuple: + if schema_compat < 2 or ver is None: + return default + + return versions.normalize(ver) + + +class Categorizable(Protocol): + """....""" + uuid: t.Optional[str] + identifier: str + +@dc.dataclass(frozen=True, unsafe_hash=True) +class ItemIdentity: + repo: str + repo_iteration: int + version: versions.VerTuple + identifier: str + + +# mypy needs to be corrected: +# https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704 +@dc.dataclass(frozen=True) # type: ignore[misc] +class ItemInfoBase(ABC, ItemIdentity, Categorizable): + """....""" + source_name: str = dc.field(hash=False, compare=False) + source_copyright: FileSpecs = dc.field(hash=False, compare=False) + uuid: t.Optional[str] = dc.field(hash=False, compare=False) + long_name: str = dc.field(hash=False, compare=False) + description: str = dc.field(hash=False, compare=False) + allows_eval: bool = dc.field(hash=False, compare=False) + allows_cors_bypass: bool = dc.field(hash=False, compare=False) + min_haketilo_ver: versions.VerTuple = dc.field(hash=False, compare=False) + max_haketilo_ver: versions.VerTuple = dc.field(hash=False, compare=False) + required_mappings: ItemSpecs = dc.field(hash=False, compare=False) + generated_by: t.Optional[GeneratedBy] = dc.field(hash=False, compare=False) + + @property + def version_string(self) -> str: + return versions.version_string(self.version) + + @property + def versioned_identifier(self) -> str: + """....""" + return f'{self.identifier}-{self.version_string}' + + @property + def files(self) -> FileSpecs: + return self.source_copyright + + @property + def compatible(self) -> bool: + return (self.min_haketilo_ver <= versions.haketilo_version and + self.max_haketilo_ver >= versions.haketilo_version) + + @staticmethod + def _get_base_init_kwargs( + item_obj: t.Mapping[str, t.Any], + schema_compat: int, + repo: str, + repo_iteration: int + ) -> t.Mapping[str, t.Any]: + """....""" + source_copyright = make_file_specifiers_seq( + item_obj['source_copyright'] + ) + + version = versions.normalize(item_obj['version']) + + perms_obj = item_obj.get('permissions', {}) + + eval_perm = make_eval_permission(perms_obj, schema_compat) + cors_bypass_perm = make_cors_bypass_permission(perms_obj, schema_compat) + + min_haketilo_ver = make_version_constraint( + ver = item_obj.get('min_haketilo_version'), + schema_compat = schema_compat, + default = versions.int_ver_min + ) + max_haketilo_ver = make_version_constraint( + ver = item_obj.get('max_haketilo_version'), + schema_compat = schema_compat, + default = versions.int_ver_max + ) + + required_mappings = make_required_mappings( + item_obj.get('required_mappings', []), + schema_compat + ) + + generated_by = GeneratedBy.make(item_obj.get('generated_by')) + + return Map( + repo = repo, + repo_iteration = repo_iteration, + source_name = item_obj['source_name'], + source_copyright = source_copyright, + version = version, + identifier = item_obj['identifier'], + uuid = item_obj.get('uuid'), + long_name = item_obj['long_name'], + description = item_obj['description'], + allows_eval = eval_perm, + allows_cors_bypass = cors_bypass_perm, + min_haketilo_ver = min_haketilo_ver, + max_haketilo_ver = max_haketilo_ver, + required_mappings = required_mappings, + generated_by = generated_by + ) + + +AnyInfo = t.Union['ResourceInfo', 'MappingInfo'] + + +class ItemType(enum.Enum): + RESOURCE = 'resource' + MAPPING = 'mapping' + + @property + def info_class(self) -> t.Type[AnyInfo]: + if self == ItemType.RESOURCE: + return ResourceInfo + else: + return MappingInfo + + @property + def alt_name(self) -> str: + if self == ItemType.RESOURCE: + return 'library' + else: + return 'package' + + @property + def alt_name_plural(self) -> str: + if self == ItemType.RESOURCE: + return 'libraries' + else: + return 'packages' + +@dc.dataclass(frozen=True, unsafe_hash=True) +class CorrespondsToResourceDCMixin: + type: t.ClassVar[ItemType] = ItemType.RESOURCE + +@dc.dataclass(frozen=True, unsafe_hash=True) +class CorrespondsToMappingDCMixin: + type: t.ClassVar[ItemType] = ItemType.MAPPING + + +@dc.dataclass(frozen=True, unsafe_hash=True) +class ResourceInfo(ItemInfoBase, CorrespondsToResourceDCMixin): + """....""" + revision: int = dc.field(hash=False, compare=False) + dependencies: ItemSpecs = dc.field(hash=False, compare=False) + scripts: FileSpecs = dc.field(hash=False, compare=False) + + @property + def version_string(self) -> str: + return f'{super().version_string}-{self.revision}' + + @property + def files(self) -> FileSpecs: + return tuple((*self.source_copyright, *self.scripts)) + + @staticmethod + def make( + item_obj: t.Mapping[str, t.Any], + schema_compat: int, + repo: str, + repo_iteration: int + ) -> 'ResourceInfo': + """....""" + base_init_kwargs = ItemInfoBase._get_base_init_kwargs( + item_obj, + schema_compat, + repo, + repo_iteration + ) + + dependencies = make_item_specifiers_seq( + item_obj.get('dependencies', []) + ) + + scripts = make_file_specifiers_seq( + item_obj.get('scripts', []) + ) + + return ResourceInfo( + **base_init_kwargs, + + revision = item_obj['revision'], + dependencies = dependencies, + scripts = scripts + ) + + @staticmethod + def load( + instance_source: json_instances.InstanceSource, + repo: str = '<dummyrepo>', + repo_iteration: int = -1 + ) -> 'ResourceInfo': + """....""" + return _load_item_info( + ResourceInfo, + instance_source, + repo, + repo_iteration + ) + + def __lt__(self, other: 'ResourceInfo') -> bool: + """....""" + return ( + self.identifier, + other.version, + other.revision, + self.repo, + other.repo_iteration + ) < ( + other.identifier, + self.version, + self.revision, + other.repo, + self.repo_iteration + ) + +def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \ + -> t.Mapping[ParsedPattern, ItemSpecifier]: + """....""" + mapping: t.List[t.Tuple[ParsedPattern, ItemSpecifier]] = [] + + for pattern, spec_obj in payloads_obj.items(): + ref = ItemSpecifier(spec_obj['identifier']) + mapping.extend((parsed, ref) for parsed in parse_pattern(pattern)) + + return Map(mapping) + +@dc.dataclass(frozen=True, unsafe_hash=True) +class MappingInfo(ItemInfoBase, CorrespondsToMappingDCMixin): + """....""" + payloads: t.Mapping[ParsedPattern, ItemSpecifier] = \ + dc.field(hash=False, compare=False) + + @staticmethod + def make( + item_obj: t.Mapping[str, t.Any], + schema_compat: int, + repo: str, + repo_iteration: int + ) -> 'MappingInfo': + """....""" + base_init_kwargs = ItemInfoBase._get_base_init_kwargs( + item_obj, + schema_compat, + repo, + repo_iteration + ) + + return MappingInfo( + **base_init_kwargs, + + payloads = make_payloads(item_obj.get('payloads', {})) + ) + + @staticmethod + def load( + instance_source: json_instances.InstanceSource, + repo: str = '<dummyrepo>', + repo_iteration: int = -1 + ) -> 'MappingInfo': + """....""" + return _load_item_info( + MappingInfo, + instance_source, + repo, + repo_iteration + ) + + def __lt__(self, other: 'MappingInfo') -> bool: + """....""" + return ( + self.identifier, + other.version, + self.repo, + other.repo_iteration + ) < ( + other.identifier, + self.version, + other.repo, + self.repo_iteration + ) + + +LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo) + +def _load_item_info( + info_type: t.Type[LoadedType], + instance_source: json_instances.InstanceSource, + repo: str, + repo_iteration: int +) -> LoadedType: + """Read, validate and autocomplete a mapping/resource description.""" + instance = json_instances.read_instance(instance_source) + + schema_fmt = f'api_{info_type.type.value}_description-{{}}.schema.json' + + schema_compat = json_instances.validate_instance(instance, schema_fmt) + + # We know from successful validation that instance is a dict. + return info_type.make( + t.cast('t.Dict[str, t.Any]', instance), + schema_compat, + repo, + repo_iteration + ) + + +CategorizedInfoType = t.TypeVar( + 'CategorizedInfoType', + ResourceInfo, + MappingInfo +) + +CategorizedType = t.TypeVar( + 'CategorizedType', + bound=Categorizable +) + +CategorizedUpdater = t.Callable[ + [t.Optional[CategorizedType]], + t.Optional[CategorizedType] +] + +CategoryKeyType = t.TypeVar('CategoryKeyType', bound=t.Hashable) + +@dc.dataclass(frozen=True) # type: ignore[misc] +class CategorizedItemInfo( + ABC, + Categorizable, + t.Generic[CategorizedInfoType, CategorizedType, CategoryKeyType] +): + """....""" + SelfType = t.TypeVar( + 'SelfType', + bound = 'CategorizedItemInfo[CategorizedInfoType, CategorizedType, CategoryKeyType]' + ) + + uuid: t.Optional[str] = None + identifier: str = '<dummy>' + items: Map[CategoryKeyType, CategorizedType] = Map() + _initialized: bool = False + + def _update( + self: 'SelfType', + key: CategoryKeyType, + updater: CategorizedUpdater + ) -> 'SelfType': + """...... Perform sanity checks for uuid.""" + uuid = self.uuid + + items = self.items.mutate() + + updated = updater(items.get(key)) + if updated is None: + items.pop(key, None) + + identifier = self.identifier + else: + items[key] = updated + + identifier = updated.identifier + if self._initialized: + assert identifier == self.identifier + + if uuid is not None: + if updated.uuid is not None and uuid != updated.uuid: + raise HaketiloException(_('uuid_mismatch_{identifier}') + .format(identifier=identifier)) + else: + uuid = updated.uuid + + return dc.replace( + self, + identifier = identifier, + uuid = uuid, + items = items.finish(), + _initialized = self._initialized or updated is not None + ) + + @abstractmethod + def register(self: 'SelfType', info: CategorizedInfoType) -> 'SelfType': + ... + + @abstractmethod + def get_all(self: 'SelfType') -> t.Sequence[CategorizedInfoType]: + ... + + def is_empty(self) -> bool: + return len(self.items) == 0 + + +class VersionedItemInfo( + CategorizedItemInfo[ + CategorizedInfoType, + CategorizedInfoType, + versions.VerTuple + ], + t.Generic[CategorizedInfoType] +): + """Stores data of multiple versions of given resource/mapping.""" + SelfType = t.TypeVar( + 'SelfType', + bound = 'VersionedItemInfo[CategorizedInfoType]' + ) + + def register(self: 'SelfType', item_info: CategorizedInfoType) \ + -> 'SelfType': + """ + Make item info queryable by version. Perform sanity checks for uuid. + """ + return self._update(item_info.version, lambda old_info: item_info) + + @property + def newest_version(self) -> versions.VerTuple: + """....""" + assert not self.is_empty() + + return self.versions()[-1] + + @property + def newest_info(self) -> CategorizedInfoType: + """Find and return info of the newest version of item.""" + return self.items[self.newest_version] + + def versions(self, reverse: bool = False) -> t.Sequence[versions.VerTuple]: + return sorted(self.items.keys(), reverse=reverse) + + def get_by_ver(self, ver: t.Sequence[int]) \ + -> t.Optional[CategorizedInfoType]: + """ + Find and return info of the specified version of the item (or None if + absent). + """ + return self.items.get(versions.normalize(ver)) + + def get_all(self, reverse_versions: bool = False) \ + -> t.Sequence[CategorizedInfoType]: + """ + Generate item info for all its versions, from oldest to newest unless + the opposite is requested. + """ + versions = self.versions(reverse=reverse_versions) + return [self.items[ver] for ver in versions] + +VersionedResourceInfo = VersionedItemInfo[ResourceInfo] +VersionedMappingInfo = VersionedItemInfo[MappingInfo] + +VersionedItemInfoMap = Map[str, VersionedItemInfo] +VersionedResourceInfoMap = Map[str, VersionedResourceInfo] +VersionedMappingInfoMap = Map[str, VersionedMappingInfo] + +def register_in_versioned_map( + map: Map[str, VersionedItemInfo[CategorizedInfoType]], + info: CategorizedInfoType +) -> Map[str, VersionedItemInfo[CategorizedInfoType]]: + versioned_info = map.get(info.identifier, VersionedItemInfo()) + + return map.set(info.identifier, versioned_info.register(info)) + + +class MultirepoItemInfo( + CategorizedItemInfo[ + CategorizedInfoType, + VersionedItemInfo[CategorizedInfoType], + t.Tuple[str, int] + ], + t.Generic[CategorizedInfoType] +): + """ + Stores data of multiple versions of given resource/mapping that may come + from multiple repositories. + """ + SelfType = t.TypeVar( + 'SelfType', + bound = 'MultirepoItemInfo[CategorizedInfoType]' + ) + + def register(self: 'SelfType', item_info: CategorizedInfoType) \ + -> 'SelfType': + """ + Make item info queryable by repo and version. Perform sanity checks for + uuid. + """ + def update( + versioned: t.Optional[VersionedItemInfo[CategorizedInfoType]] + ) -> VersionedItemInfo[CategorizedInfoType]: + if versioned is None: + versioned = VersionedItemInfo() + return versioned.register(item_info) + + return self._update((item_info.repo, item_info.repo_iteration), update) + + @property + def default_info(self) -> CategorizedInfoType: + """ + Find and return info of one of the available options for the newest + version of item. + """ + assert not self.is_empty() + + return self.get_all(reverse_repos=True)[-1] + + def options(self, reverse: bool = False) -> t.Sequence[t.Tuple[str, int]]: + return sorted( + self.items.keys(), + key = (lambda tuple: (tuple[0], 1 - tuple[1])), + reverse = reverse + ) + + def get_all( + self, + reverse_versions: bool = False, + reverse_repos: bool = False + ) -> t.Sequence[CategorizedInfoType]: + """ + Generate item info for all its versions and options, from oldest to + newest version and from. + """ + all_versions: t.Set[versions.VerTuple] = set() + for versioned in self.items.values(): + all_versions.update(versioned.versions()) + + result = [] + + for version in sorted(all_versions, reverse=reverse_versions): + for option in self.options(reverse=reverse_repos): + info = self.items[option].get_by_ver(version) + if info is not None: + result.append(info) + + return result + +MultirepoResourceInfo = MultirepoItemInfo[ResourceInfo] +MultirepoMappingInfo = MultirepoItemInfo[MappingInfo] + + +MultirepoItemInfoMap = Map[str, MultirepoItemInfo] +MultirepoResourceInfoMap = Map[str, MultirepoResourceInfo] +MultirepoMappingInfoMap = Map[str, MultirepoMappingInfo] + +def register_in_multirepo_map( + map: Map[str, MultirepoItemInfo[CategorizedInfoType]], + info: CategorizedInfoType +) -> Map[str, MultirepoItemInfo[CategorizedInfoType]]: + multirepo_info = map.get(info.identifier, MultirepoItemInfo()) + + return map.set(info.identifier, multirepo_info.register(info)) + + +def all_map_infos( + map: Map[str, CategorizedItemInfo[CategorizedInfoType, t.Any, t.Any]] +) -> t.Iterator[CategorizedInfoType]: + for versioned_info in map.values(): + for item_info in versioned_info.get_all(): + yield item_info |