# SPDX-License-Identifier: GPL-3.0-or-later

# Reading resources, mappings and other JSON documents from the filesystem.
#
# This file is part of Hydrilla&Haketilo
#
# Copyright (C) 2021, 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this
# code in a proprietary program, I am not going to enforce this in
# court.

"""
Data types describing resources and mappings read from JSON documents.
"""

import sys

# `Protocol` only entered the standard `typing` module in Python 3.8; older
# interpreters take it from the `typing_extensions` backport.
if sys.version_info >= (3, 8):
    from typing import Protocol
else:
    from typing_extensions import Protocol

import enum
import typing as t
import dataclasses as dc

from pathlib import Path, PurePosixPath
from abc import ABC, abstractmethod

from immutables import Map

from . import versions, json_instances
from .url_patterns import parse_pattern, ParsedUrl, ParsedPattern
from .exceptions import HaketiloException
from .translations import smart_gettext as _


@dc.dataclass(frozen=True, unsafe_hash=True)
class ItemSpecifier:
    """Immutable, hashable reference to an item made by its identifier."""
    identifier: str


# A fixed (immutable) sequence of item references.
ItemSpecs = t.Tuple[ItemSpecifier, ...]
# JSON-like objects (as found in item definitions) that specify items or files.
SpecifierObjs = t.Sequence[t.Mapping[str, t.Any]]


def make_item_specifiers_seq(spec_objs: SpecifierObjs) -> ItemSpecs:
    """Build a tuple of item specifiers from objects' 'identifier' values."""
    return tuple(ItemSpecifier(obj['identifier']) for obj in spec_objs)


def make_required_mappings(spec_objs: t.Any, schema_compat: int) -> ItemSpecs:
    """
    Build specifiers of required mappings.

    For `schema_compat` lower than 2 the objects are ignored and an empty
    tuple is returned.
    """
    if schema_compat < 2:
        return ()

    return make_item_specifiers_seq(spec_objs)


@dc.dataclass(frozen=True, unsafe_hash=True)
class FileSpecifier:
    """Immutable, hashable pair of a file's name and its 'sha256' value."""
    name:   str
    sha256: str


FileSpecs = t.Tuple[FileSpecifier, ...]


def normalize_filename(name: str) -> str:
    """
    This function eliminates double slashes in file name and ensures it does
    not try to reference parent directories.

    The name is parsed as a POSIX path and returned re-serialized.

    Raises `HaketiloException` when any of the path's components is '.' or
    '..'.
    """
    path = PurePosixPath(name)

    # NOTE(review): absolute names (e.g. '/x/y') pass this check unchanged;
    # confirm that callers never join the result onto a base directory.
    if '.' in path.parts or '..' in path.parts:
        msg = _('err.item_info.filename_invalid_{}').format(name)
        raise HaketiloException(msg)

    return str(path)


def make_file_specifiers_seq(spec_objs: SpecifierObjs) -> FileSpecs:
    """
    Build a tuple of file specifiers from objects with 'file' and 'sha256'
    keys. File names get normalized (and validated) on the way.
    """
    return tuple(
        FileSpecifier(normalize_filename(obj['file']), obj['sha256'])
        for obj in spec_objs
    )


@dc.dataclass(frozen=True, unsafe_hash=True)
class GeneratedBy:
    """Contents of an item's 'generated_by' object: a name and a version."""
    name:    str
    version: t.Optional[str]

    @staticmethod
    def make(generated_by_obj: t.Optional[t.Mapping[str, t.Any]]) -> \
        t.Optional['GeneratedBy']:
        """
        Build a `GeneratedBy` from the object's 'name' and (optional)
        'version' values. Return None when given None.
        """
        if generated_by_obj is None:
            return None

        return GeneratedBy(
            name    = generated_by_obj['name'],
            version = generated_by_obj.get('version')
        )


def make_eval_permission(perms_obj: t.Any, schema_compat: int) -> bool:
    """
    Read the 'eval' permission (False if absent). It is always False for
    `schema_compat` lower than 2.
    """
    if schema_compat < 2:
        return False

    return perms_obj.get('eval', False)


def make_cors_bypass_permission(perms_obj: t.Any, schema_compat: int) -> bool:
    """
    Read the 'cors_bypass' permission (False if absent). It is always False
    for `schema_compat` lower than 2.
    """
    if schema_compat < 2:
        return False

    return perms_obj.get('cors_bypass', False)


def make_version_constraint(
        ver:           t.Any,
        schema_compat: int,
        default:       versions.VerTuple
) -> versions.VerTuple:
    """
    Return normalized `ver`, or `default` when `ver` is None or
    `schema_compat` is lower than 2.
    """
    if schema_compat < 2 or ver is None:
        return default

    return versions.normalize(ver)


class Categorizable(Protocol):
    """Structural type of objects that carry an optional uuid and an identifier."""
    uuid:       t.Optional[str]
    identifier: str


@dc.dataclass(frozen=True, unsafe_hash=True)
class ItemIdentity:
    """
    The fields of an item info that take part in equality and hashing. Fields
    added by `ItemInfoBase` and its subclasses are all declared with
    `hash=False, compare=False`.
    """
    repo:           str
    repo_iteration: int
    version:        versions.VerTuple
    identifier:     str


# mypy needs to be corrected:
# https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704
@dc.dataclass(frozen=True) # type: ignore[misc]
class ItemInfoBase(ABC, ItemIdentity, Categorizable):
    """Data common to resource and mapping infos."""
    source_name:        str           = dc.field(hash=False, compare=False)
    source_copyright:   FileSpecs     = dc.field(hash=False, compare=False)
    uuid:               t.Optional[str] = dc.field(hash=False, compare=False)
    long_name:          str           = dc.field(hash=False, compare=False)
    description:        str           = dc.field(hash=False, compare=False)
    allows_eval:        bool          = dc.field(hash=False, compare=False)
    allows_cors_bypass: bool          = dc.field(hash=False, compare=False)
    min_haketilo_ver:   versions.VerTuple = \
        dc.field(hash=False, compare=False)
    max_haketilo_ver:   versions.VerTuple = \
        dc.field(hash=False, compare=False)
    required_mappings:  ItemSpecs     = dc.field(hash=False, compare=False)
    generated_by:       t.Optional[GeneratedBy] = \
        dc.field(hash=False, compare=False)

    @property
    def version_string(self) -> str:
        """The item's version formatted as a string."""
        return versions.version_string(self.version)

    @property
    def versioned_identifier(self) -> str:
        """The item's identifier and version string joined with a dash."""
        return f'{self.identifier}-{self.version_string}'

    @property
    def files(self) -> FileSpecs:
        """All files of the item; in the base class just the copyright files."""
        return self.source_copyright

    @property
    def compatible(self) -> bool:
        """
        Whether `versions.haketilo_version` lies between the item's minimal
        and maximal Haketilo versions (both inclusive).
        """
        return (self.min_haketilo_ver <= versions.haketilo_version and
                self.max_haketilo_ver >= versions.haketilo_version)

    @staticmethod
    def _get_base_init_kwargs(
            item_obj:       t.Mapping[str, t.Any],
            schema_compat:  int,
            repo:           str,
            repo_iteration: int
    ) -> t.Mapping[str, t.Any]:
        """
        Compute the constructor keyword arguments that are shared by all item
        info classes and return them as an immutable mapping.
        """
        source_copyright = make_file_specifiers_seq(
            item_obj['source_copyright']
        )

        version = versions.normalize(item_obj['version'])

        perms_obj = item_obj.get('permissions', {})

        eval_perm        = make_eval_permission(perms_obj, schema_compat)
        cors_bypass_perm = make_cors_bypass_permission(perms_obj, schema_compat)

        # Missing constraints fall back to the extreme version values so that
        # they do not restrict compatibility.
        min_haketilo_ver = make_version_constraint(
            ver           = item_obj.get('min_haketilo_version'),
            schema_compat = schema_compat,
            default       = versions.int_ver_min
        )
        max_haketilo_ver = make_version_constraint(
            ver           = item_obj.get('max_haketilo_version'),
            schema_compat = schema_compat,
            default       = versions.int_ver_max
        )

        required_mappings = make_required_mappings(
            item_obj.get('required_mappings', []),
            schema_compat
        )

        generated_by = GeneratedBy.make(item_obj.get('generated_by'))

        return Map(
            repo               = repo,
            repo_iteration     = repo_iteration,
            source_name        = item_obj['source_name'],
            source_copyright   = source_copyright,
            version            = version,
            identifier         = item_obj['identifier'],
            uuid               = item_obj.get('uuid'),
            long_name          = item_obj['long_name'],
            description        = item_obj['description'],
            allows_eval        = eval_perm,
            allows_cors_bypass = cors_bypass_perm,
            min_haketilo_ver   = min_haketilo_ver,
            max_haketilo_ver   = max_haketilo_ver,
            required_mappings  = required_mappings,
            generated_by       = generated_by
        )


AnyInfo = t.Union['ResourceInfo', 'MappingInfo']


class ItemType(enum.Enum):
    """The two kinds of items: resources and mappings."""
    RESOURCE = 'resource'
    MAPPING  = 'mapping'

    @property
    def info_class(self) -> t.Type[AnyInfo]:
        """The info class that corresponds to this item type."""
        if self is ItemType.RESOURCE:
            return ResourceInfo
        else:
            return MappingInfo

    @property
    def alt_name(self) -> str:
        """Alternative name of the type: 'library' or 'package'."""
        if self is ItemType.RESOURCE:
            return 'library'
        else:
            return 'package'

    @property
    def alt_name_plural(self) -> str:
        """Plural form of the alternative name."""
        if self is ItemType.RESOURCE:
            return 'libraries'
        else:
            return 'packages'


@dc.dataclass(frozen=True, unsafe_hash=True)
class CorrespondsToResourceDCMixin:
    """Mixin that tags a class with the resource item type."""
    type: t.ClassVar[ItemType] = ItemType.RESOURCE


@dc.dataclass(frozen=True, unsafe_hash=True)
class CorrespondsToMappingDCMixin:
    """Mixin that tags a class with the mapping item type."""
    type: t.ClassVar[ItemType] = ItemType.MAPPING


@dc.dataclass(frozen=True, unsafe_hash=True)
class ResourceInfo(ItemInfoBase, CorrespondsToResourceDCMixin):
    """Info about a single version of a resource."""
    revision:     int       = dc.field(hash=False, compare=False)
    dependencies: ItemSpecs = dc.field(hash=False, compare=False)
    scripts:      FileSpecs = dc.field(hash=False, compare=False)

    @property
    def version_string(self) -> str:
        """The base version string with a dash and the revision appended."""
        return f'{super().version_string}-{self.revision}'

    @property
    def files(self) -> FileSpecs:
        """Copyright files followed by script files."""
        return tuple((*self.source_copyright, *self.scripts))

    @staticmethod
    def make(
            item_obj:       t.Mapping[str, t.Any],
            schema_compat:  int,
            repo:           str,
            repo_iteration: int
    ) -> 'ResourceInfo':
        """Build a resource info from an item definition object."""
        base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
            item_obj,
            schema_compat,
            repo,
            repo_iteration
        )

        dependencies = make_item_specifiers_seq(
            item_obj.get('dependencies', [])
        )

        scripts = make_file_specifiers_seq(
            item_obj.get('scripts', [])
        )

        return ResourceInfo(
            **base_init_kwargs,

            revision     = item_obj['revision'],
            dependencies = dependencies,
            scripts      = scripts
        )

    @staticmethod
    def load(
            instance_source: json_instances.InstanceSource,
            repo:            str = '<dummyrepo>',
            repo_iteration:  int = -1
    ) -> 'ResourceInfo':
        """Read and validate a resource description, then build its info."""
        return _load_item_info(
            ResourceInfo,
            instance_source,
            repo,
            repo_iteration
        )

    def __lt__(self, other: 'ResourceInfo') -> bool:
        """
        Order infos by identifier and repo ascending and by version, revision
        and repo iteration descending.
        """
        # Operands of the "descending" fields are deliberately swapped between
        # the two tuples.
        return (
            self.identifier,
            other.version,
            other.revision,
            self.repo,
            other.repo_iteration
        ) < (
            other.identifier,
            self.version,
            self.revision,
            other.repo,
            self.repo_iteration
        )


def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \
    -> t.Mapping[ParsedPattern, ItemSpecifier]:
    """
    Build an immutable mapping from parsed URL patterns to specifiers of
    their payloads. Every result that `parse_pattern()` yields for a pattern
    string gets its own entry.
    """
    mapping: t.List[t.Tuple[ParsedPattern, ItemSpecifier]] = []

    for pattern, spec_obj in payloads_obj.items():
        ref = ItemSpecifier(spec_obj['identifier'])
        mapping.extend((parsed, ref) for parsed in parse_pattern(pattern))

    return Map(mapping)


@dc.dataclass(frozen=True, unsafe_hash=True)
class MappingInfo(ItemInfoBase, CorrespondsToMappingDCMixin):
    """Info about a single version of a mapping."""
    payloads: t.Mapping[ParsedPattern, ItemSpecifier] = \
        dc.field(hash=False, compare=False)

    @staticmethod
    def make(
            item_obj:       t.Mapping[str, t.Any],
            schema_compat:  int,
            repo:           str,
            repo_iteration: int
    ) -> 'MappingInfo':
        """Build a mapping info from an item definition object."""
        base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
            item_obj,
            schema_compat,
            repo,
            repo_iteration
        )

        return MappingInfo(
            **base_init_kwargs,

            payloads = make_payloads(item_obj.get('payloads', {}))
        )

    @staticmethod
    def load(
            instance_source: json_instances.InstanceSource,
            repo:            str = '<dummyrepo>',
            repo_iteration:  int = -1
    ) -> 'MappingInfo':
        """Read and validate a mapping description, then build its info."""
        return _load_item_info(
            MappingInfo,
            instance_source,
            repo,
            repo_iteration
        )

    def __lt__(self, other: 'MappingInfo') -> bool:
        """
        Order infos by identifier and repo ascending and by version and repo
        iteration descending.
        """
        # Operands of the "descending" fields are deliberately swapped between
        # the two tuples.
        return (
            self.identifier,
            other.version,
            self.repo,
            other.repo_iteration
        ) < (
            other.identifier,
            self.version,
            other.repo,
            self.repo_iteration
        )


LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo)


def _load_item_info(
        info_type:       t.Type[LoadedType],
        instance_source: json_instances.InstanceSource,
        repo:            str,
        repo_iteration:  int
) -> LoadedType:
    """Read, validate and autocomplete a mapping/resource description."""
    instance = json_instances.read_instance(instance_source)

    # The '{}' placeholder is left in the name for the validation routine.
    schema_fmt = f'api_{info_type.type.value}_description-{{}}.schema.json'

    schema_compat = json_instances.validate_instance(instance, schema_fmt)

    # We know from successful validation that instance is a dict.
    return info_type.make(
        t.cast('t.Dict[str, t.Any]', instance),
        schema_compat,
        repo,
        repo_iteration
    )


CategorizedInfoType = t.TypeVar(
    'CategorizedInfoType',
    ResourceInfo,
    MappingInfo
)

CategorizedType = t.TypeVar(
    'CategorizedType',
    bound=Categorizable
)

# Receives the current entry (or None) and returns the new one (or None).
CategorizedUpdater = t.Callable[
    [t.Optional[CategorizedType]],
    t.Optional[CategorizedType]
]

CategoryKeyType = t.TypeVar('CategoryKeyType', bound=t.Hashable)


@dc.dataclass(frozen=True) # type: ignore[misc]
class CategorizedItemInfo(
        ABC,
        Categorizable,
        t.Generic[CategorizedInfoType, CategorizedType, CategoryKeyType]
):
    """
    Immutable collection of entries that share one identifier (and uuid),
    stored in a `Map` under keys of some category.
    """
    SelfType = t.TypeVar(
        'SelfType',
        bound = 'CategorizedItemInfo[CategorizedInfoType, CategorizedType, CategoryKeyType]'
    )

    uuid:         t.Optional[str]                       = None
    identifier:   str                                   = '<dummy>'
    items:        Map[CategoryKeyType, CategorizedType] = Map()
    # Set once an entry has been stored; from then on `identifier` is real.
    _initialized: bool                                  = False

    def _update(
            self:    'SelfType',
            key:     CategoryKeyType,
            updater: CategorizedUpdater
    ) -> 'SelfType':
        """
        Return a copy of self with the entry under `key` replaced by what
        `updater` returns for it. If `updater` returns None, the entry gets
        removed.

        Perform sanity checks for uuid: `HaketiloException` is raised if the
        new entry's uuid is set and differs from the one already recorded.
        """
        uuid = self.uuid

        items = self.items.mutate()

        updated = updater(items.get(key))
        if updated is None:
            items.pop(key, None)

            identifier = self.identifier
        else:
            items[key] = updated

            identifier = updated.identifier
            if self._initialized:
                assert identifier == self.identifier

            if uuid is not None:
                if updated.uuid is not None and uuid != updated.uuid:
                    raise HaketiloException(_('uuid_mismatch_{identifier}')
                                            .format(identifier=identifier))
            else:
                # No uuid recorded so far - adopt the entry's one (if any).
                uuid = updated.uuid

        return dc.replace(
            self,
            identifier   = identifier,
            uuid         = uuid,
            items        = items.finish(),
            _initialized = self._initialized or updated is not None
        )

    @abstractmethod
    def register(self: 'SelfType', info: CategorizedInfoType) -> 'SelfType':
        """Return a copy of self with `info` added."""
        ...

    @abstractmethod
    def get_all(self: 'SelfType') -> t.Sequence[CategorizedInfoType]:
        """Return all stored item infos."""
        ...

    def is_empty(self) -> bool:
        """Tell whether no entries are stored."""
        return len(self.items) == 0


class VersionedItemInfo(
        CategorizedItemInfo[
            CategorizedInfoType,
            CategorizedInfoType,
            versions.VerTuple
        ],
        t.Generic[CategorizedInfoType]
):
    """Stores data of multiple versions of given resource/mapping."""
    SelfType = t.TypeVar(
        'SelfType',
        bound = 'VersionedItemInfo[CategorizedInfoType]'
    )

    def register(self: 'SelfType', item_info: CategorizedInfoType) \
        -> 'SelfType':
        """
        Make item info queryable by version. Perform sanity checks for uuid.
        """
        # Any info previously stored under this version gets replaced.
        return self._update(item_info.version, lambda old_info: item_info)

    @property
    def newest_version(self) -> versions.VerTuple:
        """The greatest of stored versions. The collection must not be empty."""
        assert not self.is_empty()

        return self.versions()[-1]

    @property
    def newest_info(self) -> CategorizedInfoType:
        """Find and return info of the newest version of item."""
        return self.items[self.newest_version]

    def versions(self, reverse: bool = False) -> t.Sequence[versions.VerTuple]:
        """Return stored versions sorted ascending (descending if `reverse`)."""
        return sorted(self.items.keys(), reverse=reverse)

    def get_by_ver(self, ver: t.Sequence[int]) \
        -> t.Optional[CategorizedInfoType]:
        """
        Find and return info of the specified version of the item (or None if
        absent).
        """
        return self.items.get(versions.normalize(ver))

    def get_all(self, reverse_versions: bool = False) \
        -> t.Sequence[CategorizedInfoType]:
        """
        Generate item info for all its versions, from oldest to newest unless
        the opposite is requested.
        """
        # A distinct local name keeps the imported `versions` module
        # accessible inside this method.
        ordered_versions = self.versions(reverse=reverse_versions)
        return [self.items[ver] for ver in ordered_versions]


VersionedResourceInfo = VersionedItemInfo[ResourceInfo]
VersionedMappingInfo  = VersionedItemInfo[MappingInfo]

VersionedItemInfoMap     = Map[str, VersionedItemInfo]
VersionedResourceInfoMap = Map[str, VersionedResourceInfo]
VersionedMappingInfoMap  = Map[str, VersionedMappingInfo]


def register_in_versioned_map(
        map:  Map[str, VersionedItemInfo[CategorizedInfoType]],
        info: CategorizedInfoType
) -> Map[str, VersionedItemInfo[CategorizedInfoType]]:
    """
    Return a copy of `map` in which `info` is registered in the versioned
    info stored under its identifier (created if absent).
    """
    versioned_info = map.get(info.identifier, VersionedItemInfo())

    return map.set(info.identifier, versioned_info.register(info))


class MultirepoItemInfo(
        CategorizedItemInfo[
            CategorizedInfoType,
            VersionedItemInfo[CategorizedInfoType],
            t.Tuple[str, int]
        ],
        t.Generic[CategorizedInfoType]
):
    """
    Stores data of multiple versions of given resource/mapping that may come
    from multiple repositories.
    """
    SelfType = t.TypeVar(
        'SelfType',
        bound = 'MultirepoItemInfo[CategorizedInfoType]'
    )

    def register(self: 'SelfType', item_info: CategorizedInfoType) \
        -> 'SelfType':
        """
        Make item info queryable by repo and version. Perform sanity checks for
        uuid.
        """
        def update(
                versioned: t.Optional[VersionedItemInfo[CategorizedInfoType]]
        ) -> VersionedItemInfo[CategorizedInfoType]:
            # Start a fresh per-repo collection on first registration.
            if versioned is None:
                versioned = VersionedItemInfo()

            return versioned.register(item_info)

        return self._update((item_info.repo, item_info.repo_iteration), update)

    @property
    def default_info(self) -> CategorizedInfoType:
        """
        Find and return info of one of the available options for the newest
        version of item.
        """
        assert not self.is_empty()

        return self.get_all(reverse_repos=True)[-1]

    def options(self, reverse: bool = False) -> t.Sequence[t.Tuple[str, int]]:
        """
        Return the stored (repo, repo iteration) keys sorted by repo ascending
        and then by iteration descending; the whole order is flipped if
        `reverse` is set.
        """
        return sorted(
            self.items.keys(),
            # Subtracting the iteration makes greater iterations sort first.
            key     = (lambda option: (option[0], 1 - option[1])),
            reverse = reverse
        )

    def get_all(
            self,
            reverse_versions: bool = False,
            reverse_repos:    bool = False
    ) -> t.Sequence[CategorizedInfoType]:
        """
        Generate item info for all its versions and options, from oldest to
        newest version and, within one version, in the order of `options()`.
        Either order can be reversed with the respective argument.
        """
        all_versions: t.Set[versions.VerTuple] = set()
        for versioned in self.items.values():
            all_versions.update(versioned.versions())

        result = []

        for version in sorted(all_versions, reverse=reverse_versions):
            for option in self.options(reverse=reverse_repos):
                info = self.items[option].get_by_ver(version)
                # Not every option has to provide every version.
                if info is not None:
                    result.append(info)

        return result


MultirepoResourceInfo = MultirepoItemInfo[ResourceInfo]
MultirepoMappingInfo  = MultirepoItemInfo[MappingInfo]

MultirepoItemInfoMap     = Map[str, MultirepoItemInfo]
MultirepoResourceInfoMap = Map[str, MultirepoResourceInfo]
MultirepoMappingInfoMap  = Map[str, MultirepoMappingInfo]


def register_in_multirepo_map(
        map:  Map[str, MultirepoItemInfo[CategorizedInfoType]],
        info: CategorizedInfoType
) -> Map[str, MultirepoItemInfo[CategorizedInfoType]]:
    """
    Return a copy of `map` in which `info` is registered in the multirepo
    info stored under its identifier (created if absent).
    """
    multirepo_info = map.get(info.identifier, MultirepoItemInfo())

    return map.set(info.identifier, multirepo_info.register(info))


def all_map_infos(
        map: Map[str, CategorizedItemInfo[CategorizedInfoType, t.Any, t.Any]]
) -> t.Iterator[CategorizedInfoType]:
    """Yield every item info held by every collection in `map`."""
    for versioned_info in map.values():
        for item_info in versioned_info.get_all():
            yield item_info