diff options
author | Wojtek Kosior <koszko@koszko.org> | 2022-06-13 11:06:49 +0200 |
---|---|---|
committer | Wojtek Kosior <koszko@koszko.org> | 2022-07-16 16:31:44 +0200 |
commit | 52d12a4fa124daa1595529e3e7008276a7986d95 (patch) | |
tree | 9b56fe2d28ff0242f8511aca570be455112ad3df /src/hydrilla/item_infos.py | |
parent | 9dcbfdfe8620cc417438d1727aa1e0c89846e9bf (diff) | |
download | haketilo-hydrilla-52d12a4fa124daa1595529e3e7008276a7986d95.tar.gz haketilo-hydrilla-52d12a4fa124daa1595529e3e7008276a7986d95.zip |
unfinished partial work
Diffstat (limited to 'src/hydrilla/item_infos.py')
-rw-r--r-- | src/hydrilla/item_infos.py | 344 |
1 files changed, 344 insertions, 0 deletions
diff --git a/src/hydrilla/item_infos.py b/src/hydrilla/item_infos.py new file mode 100644 index 0000000..c366ab5 --- /dev/null +++ b/src/hydrilla/item_infos.py @@ -0,0 +1,344 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Reading resources, mappings and other JSON documents from the filesystem. +# +# This file is part of Hydrilla&Haketilo +# +# Copyright (C) 2021, 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +..... +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +import typing as t +import dataclasses as dc + +from pathlib import Path, PurePath + +from immutables import Map, MapMutation + +from . import versions, json_instances +from .url_patterns import parse_pattern, ParsedUrl +from .exceptions import HaketiloException +from .translations import smart_gettext as _ + +VerTuple = t.Tuple[int, ...] + +@dc.dataclass(frozen=True, unsafe_hash=True) +class ItemRef: + """....""" + identifier: str + +RefObjs = t.Sequence[t.Mapping[str, t.Any]] + +def make_item_refs_seq(ref_objs: RefObjs) -> tuple[ItemRef, ...]: + """....""" + return tuple(ItemRef(ref['identifier']) for ref in ref_objs) + +def make_required_mappings(refs_objs: t.Any, schema_compat: int) \ + -> tuple[ItemRef, ...]: + """....""" + if schema_compat < 2: + return () + + return make_item_refs_seq(refs_objs) + +@dc.dataclass(frozen=True, unsafe_hash=True) +class FileRef: + """....""" + name: str + sha256: str + +def make_file_refs_seq(ref_objs: RefObjs) -> tuple[FileRef, ...]: + """....""" + return tuple(FileRef(ref['file'], ref['sha256']) for ref in ref_objs) + +@dc.dataclass(frozen=True, unsafe_hash=True) +class GeneratedBy: + """....""" + name: str + version: t.Optional[str] + + @staticmethod + def make(generated_obj: t.Optional[t.Mapping[str, t.Any]]) -> \ + t.Optional['GeneratedBy']: + """....""" + if generated_obj is None: + return None + + return GeneratedBy( + name = generated_obj['name'], + version = generated_obj.get('version') + ) + +@dc.dataclass(frozen=True, unsafe_hash=True) +class ItemInfoBase: + """....""" + repository: str # repository used in __hash__() + source_name: str = dc.field(hash=False) + source_copyright: tuple[FileRef, ...] = dc.field(hash=False) + version: VerTuple # version used in __hash__() + identifier: str # identifier used in __hash__() + uuid: t.Optional[str] = dc.field(hash=False) + long_name: str = dc.field(hash=False) + required_mappings: tuple[ItemRef, ...] = dc.field(hash=False) + generated_by: t.Optional[GeneratedBy] = dc.field(hash=False) + + def path_relative_to_type(self) -> str: + """ + Get a relative path to this item's JSON definition with respect to + directory containing items of this type. + """ + return f'{self.identifier}/{versions.version_string(self.version)}' + + def path(self) -> str: + """ + Get a relative path to this item's JSON definition with respect to + malcontent directory containing loadable items. + """ + return f'{self.type_name}/{self.path_relative_to_type()}' + + @property + def versioned_identifier(self): + """....""" + return f'{self.identifier}-{versions.version_string(self.version)}' + + @staticmethod + def _get_base_init_kwargs( + item_obj: t.Mapping[str, t.Any], + schema_compat: int, + repository: str + ) -> t.Mapping[str, t.Any]: + """....""" + source_copyright = make_file_refs_seq(item_obj['source_copyright']) + + version = versions.normalize_version(item_obj['version']) + + required_mappings = make_required_mappings( + item_obj.get('required_mappings', []), + schema_compat + ) + + generated_by = GeneratedBy.make(item_obj.get('generated_by')) + + return Map( + repository = repository, + source_name = item_obj['source_name'], + source_copyright = source_copyright, + version = version, + identifier = item_obj['identifier'], + uuid = item_obj.get('uuid'), + long_name = item_obj['long_name'], + required_mappings = required_mappings, + generated_by = generated_by + ) + + # class property + type_name = '!INVALID!' + +InstanceOrPath = t.Union[Path, str, dict[str, t.Any]] + +@dc.dataclass(frozen=True, unsafe_hash=True) +class ResourceInfo(ItemInfoBase): + """....""" + revision: int = dc.field(hash=False) + dependencies: tuple[ItemRef, ...] = dc.field(hash=False) + scripts: tuple[FileRef, ...] = dc.field(hash=False) + + @property + def versioned_identifier(self): + """....""" + return f'{super().versioned_identifier()}-{self.revision}' + + @staticmethod + def make( + item_obj: t.Mapping[str, t.Any], + schema_compat: int, + repository: str + ) -> 'ResourceInfo': + """....""" + base_init_kwargs = ItemInfoBase._get_base_init_kwargs( + item_obj, + schema_compat, + repository + ) + + return ResourceInfo( + **base_init_kwargs, + + revision = item_obj['revision'], + dependencies = make_item_refs_seq(item_obj.get('dependencies', [])), + scripts = make_file_refs_seq(item_obj.get('scripts', [])), + ) + + @staticmethod + def load(instance_or_path: 'InstanceOrPath', repository: str) \ + -> 'ResourceInfo': + """....""" + return _load_item_info(ResourceInfo, instance_or_path, repository) + + # class property + type_name = 'resource' + +def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \ + -> t.Mapping[ParsedUrl, ItemRef]: + """....""" + mapping: list[tuple[ParsedUrl, ItemRef]] = [] + + for pattern, ref_obj in payloads_obj.items(): + ref = ItemRef(ref_obj['identifier']) + mapping.extend((parsed, ref) for parsed in parse_pattern(pattern)) + + return Map(mapping) + +@dc.dataclass(frozen=True, unsafe_hash=True) +class MappingInfo(ItemInfoBase): + """....""" + payloads: t.Mapping[ParsedUrl, ItemRef] = dc.field(hash=False) + + @staticmethod + def make( + item_obj: t.Mapping[str, t.Any], + schema_compat: int, + repository: str + ) -> 'MappingInfo': + """....""" + base_init_kwargs = ItemInfoBase._get_base_init_kwargs( + item_obj, + schema_compat, + repository + ) + + return MappingInfo( + **base_init_kwargs, + + payloads = make_payloads(item_obj.get('payloads', {})) + ) + + @staticmethod + def load(instance_or_path: 'InstanceOrPath', repository: str) \ + -> 'MappingInfo': + """....""" + return _load_item_info(MappingInfo, instance_or_path, repository) + + # class property + type_name = 'mapping' + + +LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo) + +def _load_item_info( + info_type: t.Type[LoadedType], + instance_or_path: InstanceOrPath, + repository: str +) -> LoadedType: + """Read, validate and autocomplete a mapping/resource description.""" + instance = json_instances.read_instance(instance_or_path) + + schema_fmt = f'api_{info_type.type_name}_description-{{}}.schema.json' + + schema_compat = json_instances.validate_instance(instance, schema_fmt) + + # We know from successful validation that instance is a dict. + return info_type.make( + t.cast('dict[str, t.Any]', instance), + schema_compat, + repository + ) + + +VersionedType = t.TypeVar('VersionedType', ResourceInfo, MappingInfo) + +@dc.dataclass(frozen=True) +class VersionedItemInfo(t.Generic[VersionedType]): + """Stores data of multiple versions of given resource/mapping.""" + uuid: t.Optional[str] = None + identifier: str = '<dummy>' + _by_version: Map[VerTuple, VersionedType] = Map() + _initialized: bool = False + + def register(self, item_info: VersionedType) -> 'VersionedInfoSelfType': + """ + Make item info queryable by version. Perform sanity checks for uuid. + """ + identifier = item_info.identifier + if self._initialized: + assert identifier == self.identifier + + if self.uuid is not None: + uuid: t.Optional[str] = self.uuid + if item_info.uuid is not None and self.uuid != item_info.uuid: + raise HaketiloException(_('uuid_mismatch_{identifier}') + .format(identifier=identifier)) + else: + uuid = item_info.uuid + + by_version = self._by_version.set(item_info.version, item_info) + + return VersionedItemInfo( + identifier = identifier, + uuid = uuid, + _by_version = by_version, + _initialized = True + ) + + def unregister(self, version: VerTuple) -> 'VersionedInfoSelfType': + """....""" + try: + by_version = self._by_version.delete(version) + except KeyError: + by_version = self._by_version + + return dc.replace(self, _by_version=by_version) + + def is_empty(self) -> bool: + """....""" + return len(self._by_version) == 0 + + def newest_version(self) -> VerTuple: + """....""" + assert not self.is_empty() + + return max(self._by_version.keys()) + + def get_newest(self) -> VersionedType: + """Find and return info of the newest version of item.""" + newest = self._by_version[self.newest_version()] + assert newest is not None + return newest + + def get_by_ver(self, ver: t.Iterable[int]) -> t.Optional[VersionedType]: + """ + Find and return info of the specified version of the item (or None if + absent). + """ + return self._by_version.get(tuple(ver)) + + def get_all(self) -> t.Iterator[VersionedType]: + """Generate item info for all its versions, from oldest ot newest.""" + for version in sorted(self._by_version.keys()): + yield self._by_version[version] + +# Below we define 1 type used by recursively-typed VersionedItemInfo. +VersionedInfoSelfType = VersionedItemInfo[VersionedType] |