# SPDX-License-Identifier: GPL-3.0-or-later # Reading resources, mappings and other JSON documents from the filesystem. # # This file is part of Hydrilla&Haketilo # # Copyright (C) 2021, 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use this code # in a proprietary program, I am not going to enforce this in court. """ ..... """ # Enable using with Python 3.7. from __future__ import annotations import typing as t import dataclasses as dc from pathlib import Path, PurePath from immutables import Map, MapMutation from . import versions, json_instances from .url_patterns import parse_pattern, ParsedUrl from .exceptions import HaketiloException from .translations import smart_gettext as _ VerTuple = t.Tuple[int, ...] 
@dc.dataclass(frozen=True, unsafe_hash=True)
class ItemRef:
    """Reference to another item, made by that item's identifier."""
    identifier: str

# Sequence of JSON objects, each describing one reference.
RefObjs = t.Sequence[t.Mapping[str, t.Any]]

def make_item_refs_seq(ref_objs: RefObjs) -> tuple[ItemRef, ...]:
    """
    Convert a sequence of JSON reference objects into a tuple of ItemRef.

    The 'identifier' key of each object is read; a missing key raises
    KeyError.
    """
    return tuple(ItemRef(ref['identifier']) for ref in ref_objs)

def make_required_mappings(refs_objs: t.Any, schema_compat: int) \
    -> tuple[ItemRef, ...]:
    """
    Build the tuple of required mappings' references.

    For `schema_compat` lower than 2 an empty tuple is returned and
    `refs_objs` is not looked at. Otherwise `refs_objs` gets converted with
    make_item_refs_seq().
    """
    if schema_compat < 2:
        return ()

    return make_item_refs_seq(refs_objs)

@dc.dataclass(frozen=True, unsafe_hash=True)
class FileRef:
    """Reference to a file, made by the file's name and SHA256 sum."""
    name:   str
    sha256: str

def make_file_refs_seq(ref_objs: RefObjs) -> tuple[FileRef, ...]:
    """
    Convert a sequence of JSON file reference objects into a tuple of FileRef.

    The 'file' and 'sha256' keys of each object are read; a missing key
    raises KeyError.
    """
    return tuple(FileRef(ref['file'], ref['sha256']) for ref in ref_objs)

@dc.dataclass(frozen=True, unsafe_hash=True)
class GeneratedBy:
    """Content of the 'generated_by' object of an item's JSON definition."""
    name:    str
    version: t.Optional[str]

    @staticmethod
    def make(generated_obj: t.Optional[t.Mapping[str, t.Any]]) -> \
        t.Optional['GeneratedBy']:
        """
        Build GeneratedBy from its JSON object.

        Return None when `generated_obj` is None. The 'version' key is
        optional and defaults to None.
        """
        if generated_obj is None:
            return None

        return GeneratedBy(
            name    = generated_obj['name'],
            version = generated_obj.get('version')
        )

@dc.dataclass(frozen=True, unsafe_hash=True)
class ItemInfoBase:
    """
    Fields and helpers shared by resource and mapping descriptions.

    Only `repository`, `version` and `identifier` take part in hashing; all
    the other fields are declared with hash=False.
    """
    repository:        str                    # repository used in __hash__()
    source_name:       str                    = dc.field(hash=False)
    source_copyright:  tuple[FileRef, ...]    = dc.field(hash=False)
    version:           VerTuple               # version used in __hash__()
    identifier:        str                    # identifier used in __hash__()
    uuid:              t.Optional[str]        = dc.field(hash=False)
    long_name:         str                    = dc.field(hash=False)
    required_mappings: tuple[ItemRef, ...]    = dc.field(hash=False)
    generated_by:      t.Optional[GeneratedBy] = dc.field(hash=False)

    def path_relative_to_type(self) -> str:
        """
        Get a relative path to this item's JSON definition with respect to
        directory containing items of this type.
        """
        return f'{self.identifier}/{versions.version_string(self.version)}'

    def path(self) -> str:
        """
        Get a relative path to this item's JSON definition with respect to
        malcontent directory containing loadable items.
        """
        return f'{self.type_name}/{self.path_relative_to_type()}'

    @property
    def versioned_identifier(self) -> str:
        """The item's identifier and version string joined with a dash."""
        return f'{self.identifier}-{versions.version_string(self.version)}'

    @staticmethod
    def _get_base_init_kwargs(
            item_obj:      t.Mapping[str, t.Any],
            schema_compat: int,
            repository:    str
    ) -> t.Mapping[str, t.Any]:
        """
        Compute constructor keyword arguments for the fields defined in
        ItemInfoBase from an item's JSON definition.

        The result is meant to be unpacked with `**` in subclasses' make().
        """
        source_copyright = make_file_refs_seq(item_obj['source_copyright'])

        version = versions.normalize_version(item_obj['version'])

        required_mappings = make_required_mappings(
            item_obj.get('required_mappings', []),
            schema_compat
        )

        generated_by = GeneratedBy.make(item_obj.get('generated_by'))

        return Map(
            repository        = repository,
            source_name       = item_obj['source_name'],
            source_copyright  = source_copyright,
            version           = version,
            identifier        = item_obj['identifier'],
            uuid              = item_obj.get('uuid'),
            long_name         = item_obj['long_name'],
            required_mappings = required_mappings,
            generated_by      = generated_by
        )

    # class property; overridden by every concrete item type
    type_name = '!INVALID!'

# Evaluated at runtime, so the `typing` generic is used instead of subscripted
# builtin `dict` (which is only subscriptable since Python 3.9).
InstanceOrPath = t.Union[Path, str, t.Dict[str, t.Any]]

@dc.dataclass(frozen=True, unsafe_hash=True)
class ResourceInfo(ItemInfoBase):
    """Description of a single version of a resource."""
    revision:     int                 = dc.field(hash=False)
    dependencies: tuple[ItemRef, ...] = dc.field(hash=False)
    scripts:      tuple[FileRef, ...] = dc.field(hash=False)

    @property
    def versioned_identifier(self) -> str:
        """
        The base class' versioned identifier with the resource's revision
        appended after a dash.
        """
        # The inherited attribute is a property; it must be read, not called.
        return f'{super().versioned_identifier}-{self.revision}'

    @staticmethod
    def make(
            item_obj:      t.Mapping[str, t.Any],
            schema_compat: int,
            repository:    str
    ) -> 'ResourceInfo':
        """
        Build ResourceInfo from a resource's JSON definition.

        'revision' is required; 'dependencies' and 'scripts' default to
        empty sequences.
        """
        base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
            item_obj,
            schema_compat,
            repository
        )

        return ResourceInfo(
            **base_init_kwargs,

            revision     = item_obj['revision'],
            dependencies = make_item_refs_seq(item_obj.get('dependencies', [])),
            scripts      = make_file_refs_seq(item_obj.get('scripts', [])),
        )

    @staticmethod
    def load(instance_or_path: 'InstanceOrPath', repository: str) \
        -> 'ResourceInfo':
        """Read, validate and convert a resource description."""
        return _load_item_info(ResourceInfo, instance_or_path, repository)

    # class property
    type_name = 'resource'

def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \
    -> t.Mapping[ParsedUrl, ItemRef]:
    """
    Convert the 'payloads' JSON object of a mapping into an immutable map.

    Every key of `payloads_obj` is a URL pattern and gets passed to
    parse_pattern(); each parsed URL it yields is mapped to the ItemRef built
    from the 'identifier' of the corresponding value.
    """
    mapping: list[tuple[ParsedUrl, ItemRef]] = []

    for pattern, ref_obj in payloads_obj.items():
        ref = ItemRef(ref_obj['identifier'])
        mapping.extend((parsed, ref) for parsed in parse_pattern(pattern))

    return Map(mapping)

@dc.dataclass(frozen=True, unsafe_hash=True)
class MappingInfo(ItemInfoBase):
    """Description of a single version of a mapping."""
    payloads: t.Mapping[ParsedUrl, ItemRef] = dc.field(hash=False)

    @staticmethod
    def make(
            item_obj:      t.Mapping[str, t.Any],
            schema_compat: int,
            repository:    str
    ) -> 'MappingInfo':
        """
        Build MappingInfo from a mapping's JSON definition.

        'payloads' defaults to an empty object.
        """
        base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
            item_obj,
            schema_compat,
            repository
        )

        return MappingInfo(
            **base_init_kwargs,

            payloads = make_payloads(item_obj.get('payloads', {}))
        )

    @staticmethod
    def load(instance_or_path: 'InstanceOrPath', repository: str) \
        -> 'MappingInfo':
        """Read, validate and convert a mapping description."""
        return _load_item_info(MappingInfo, instance_or_path, repository)

    # class property
    type_name = 'mapping'


LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo)

def _load_item_info(
        info_type:        t.Type[LoadedType],
        instance_or_path: InstanceOrPath,
        repository:       str
) -> LoadedType:
    """Read, validate and autocomplete a mapping/resource description."""
    instance = json_instances.read_instance(instance_or_path)

    # The schema name depends on the item type; the remaining `{}` placeholder
    # is left for validate_instance() to fill in.
    schema_fmt = f'api_{info_type.type_name}_description-{{}}.schema.json'

    schema_compat = json_instances.validate_instance(instance, schema_fmt)

    # We know from successful validation that instance is a dict.
    return info_type.make(
        t.cast('dict[str, t.Any]', instance),
        schema_compat,
        repository
    )


VersionedType = t.TypeVar('VersionedType', ResourceInfo, MappingInfo)

@dc.dataclass(frozen=True)
class VersionedItemInfo(t.Generic[VersionedType]):
    """Stores data of multiple versions of given resource/mapping."""
    uuid:         t.Optional[str]              = None
    identifier:   str                          = ''
    _by_version:  Map[VerTuple, VersionedType] = Map()
    _initialized: bool                         = False

    def register(self, item_info: VersionedType) -> 'VersionedInfoSelfType':
        """
        Make item info queryable by version. Perform sanity checks for uuid.

        A new VersionedItemInfo is returned; this object is left unchanged.
        HaketiloException is raised when both this object and `item_info`
        carry a uuid and the two differ.
        """
        identifier = item_info.identifier
        if self._initialized:
            assert identifier == self.identifier

        if self.uuid is not None:
            uuid: t.Optional[str] = self.uuid
            if item_info.uuid is not None and self.uuid != item_info.uuid:
                raise HaketiloException(_('uuid_mismatch_{identifier}')
                                        .format(identifier=identifier))
        else:
            # No uuid known so far; adopt the new item's one (possibly None).
            uuid = item_info.uuid

        by_version = self._by_version.set(item_info.version, item_info)

        return VersionedItemInfo(
            identifier   = identifier,
            uuid         = uuid,
            _by_version  = by_version,
            _initialized = True
        )

    def unregister(self, version: VerTuple) -> 'VersionedInfoSelfType':
        """
        Return a copy of this object without the info of given version.

        Unregistering a version that is not present is not an error.
        """
        try:
            by_version = self._by_version.delete(version)
        except KeyError:
            by_version = self._by_version

        return dc.replace(self, _by_version=by_version)

    def is_empty(self) -> bool:
        """Tell whether no version is currently registered."""
        return len(self._by_version) == 0

    def newest_version(self) -> VerTuple:
        """
        Return the greatest registered version. At least one version must
        be registered.
        """
        assert not self.is_empty()

        return max(self._by_version.keys())

    def get_newest(self) -> VersionedType:
        """Find and return info of the newest version of item."""
        newest = self._by_version[self.newest_version()]
        assert newest is not None
        return newest

    def get_by_ver(self, ver: t.Iterable[int]) -> t.Optional[VersionedType]:
        """
        Find and return info of the specified version of the item (or None if
        absent).
        """
        return self._by_version.get(tuple(ver))

    def get_all(self) -> t.Iterator[VersionedType]:
        """Generate item info for all its versions, from oldest to newest."""
        for version in sorted(self._by_version.keys()):
            yield self._by_version[version]

# Below we define 1 type used by recursively-typed VersionedItemInfo.
VersionedInfoSelfType = VersionedItemInfo[VersionedType]