path: root/src/hydrilla/item_infos.py
diff options
authorWojtek Kosior <koszko@koszko.org>2022-06-13 11:06:49 +0200
committerWojtek Kosior <koszko@koszko.org>2022-07-16 16:31:44 +0200
commit52d12a4fa124daa1595529e3e7008276a7986d95 (patch)
tree9b56fe2d28ff0242f8511aca570be455112ad3df /src/hydrilla/item_infos.py
parent9dcbfdfe8620cc417438d1727aa1e0c89846e9bf (diff)
unfinished partial work
Diffstat (limited to 'src/hydrilla/item_infos.py')
1 files changed, 344 insertions, 0 deletions
diff --git a/src/hydrilla/item_infos.py b/src/hydrilla/item_infos.py
new file mode 100644
index 0000000..c366ab5
--- /dev/null
+++ b/src/hydrilla/item_infos.py
@@ -0,0 +1,344 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+# Reading resources, mappings and other JSON documents from the filesystem.
+# This file is part of Hydrilla&Haketilo
+# Copyright (C) 2021, 2022 Wojtek Kosior
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+# Enable using with Python 3.7.
+from __future__ import annotations
+import typing as t
+import dataclasses as dc
+from pathlib import Path, PurePath
+from immutables import Map, MapMutation
+from . import versions, json_instances
+from .url_patterns import parse_pattern, ParsedUrl
+from .exceptions import HaketiloException
+from .translations import smart_gettext as _
+VerTuple = t.Tuple[int, ...]
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class ItemRef:
+ """...."""
+ identifier: str
+RefObjs = t.Sequence[t.Mapping[str, t.Any]]
+def make_item_refs_seq(ref_objs: RefObjs) -> tuple[ItemRef, ...]:
+ """...."""
+ return tuple(ItemRef(ref['identifier']) for ref in ref_objs)
+def make_required_mappings(refs_objs: t.Any, schema_compat: int) \
+ -> tuple[ItemRef, ...]:
+ """...."""
+ if schema_compat < 2:
+ return ()
+ return make_item_refs_seq(refs_objs)
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class FileRef:
+ """...."""
+ name: str
+ sha256: str
+def make_file_refs_seq(ref_objs: RefObjs) -> tuple[FileRef, ...]:
+ """...."""
+ return tuple(FileRef(ref['file'], ref['sha256']) for ref in ref_objs)
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class GeneratedBy:
+ """...."""
+ name: str
+ version: t.Optional[str]
+ @staticmethod
+ def make(generated_obj: t.Optional[t.Mapping[str, t.Any]]) -> \
+ t.Optional['GeneratedBy']:
+ """...."""
+ if generated_obj is None:
+ return None
+ return GeneratedBy(
+ name = generated_obj['name'],
+ version = generated_obj.get('version')
+ )
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class ItemInfoBase:
+ """...."""
+ repository: str # repository used in __hash__()
+ source_name: str = dc.field(hash=False)
+ source_copyright: tuple[FileRef, ...] = dc.field(hash=False)
+ version: VerTuple # version used in __hash__()
+ identifier: str # identifier used in __hash__()
+ uuid: t.Optional[str] = dc.field(hash=False)
+ long_name: str = dc.field(hash=False)
+ required_mappings: tuple[ItemRef, ...] = dc.field(hash=False)
+ generated_by: t.Optional[GeneratedBy] = dc.field(hash=False)
+ def path_relative_to_type(self) -> str:
+ """
+ Get a relative path to this item's JSON definition with respect to
+ directory containing items of this type.
+ """
+ return f'{self.identifier}/{versions.version_string(self.version)}'
+ def path(self) -> str:
+ """
+ Get a relative path to this item's JSON definition with respect to
+ malcontent directory containing loadable items.
+ """
+ return f'{self.type_name}/{self.path_relative_to_type()}'
+ @property
+ def versioned_identifier(self):
+ """...."""
+ return f'{self.identifier}-{versions.version_string(self.version)}'
+ @staticmethod
+ def _get_base_init_kwargs(
+ item_obj: t.Mapping[str, t.Any],
+ schema_compat: int,
+ repository: str
+ ) -> t.Mapping[str, t.Any]:
+ """...."""
+ source_copyright = make_file_refs_seq(item_obj['source_copyright'])
+ version = versions.normalize_version(item_obj['version'])
+ required_mappings = make_required_mappings(
+ item_obj.get('required_mappings', []),
+ schema_compat
+ )
+ generated_by = GeneratedBy.make(item_obj.get('generated_by'))
+ return Map(
+ repository = repository,
+ source_name = item_obj['source_name'],
+ source_copyright = source_copyright,
+ version = version,
+ identifier = item_obj['identifier'],
+ uuid = item_obj.get('uuid'),
+ long_name = item_obj['long_name'],
+ required_mappings = required_mappings,
+ generated_by = generated_by
+ )
+ # class property
+ type_name = '!INVALID!'
+InstanceOrPath = t.Union[Path, str, dict[str, t.Any]]
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class ResourceInfo(ItemInfoBase):
+ """...."""
+ revision: int = dc.field(hash=False)
+ dependencies: tuple[ItemRef, ...] = dc.field(hash=False)
+ scripts: tuple[FileRef, ...] = dc.field(hash=False)
+ @property
+ def versioned_identifier(self):
+ """...."""
+ return f'{super().versioned_identifier()}-{self.revision}'
+ @staticmethod
+ def make(
+ item_obj: t.Mapping[str, t.Any],
+ schema_compat: int,
+ repository: str
+ ) -> 'ResourceInfo':
+ """...."""
+ base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
+ item_obj,
+ schema_compat,
+ repository
+ )
+ return ResourceInfo(
+ **base_init_kwargs,
+ revision = item_obj['revision'],
+ dependencies = make_item_refs_seq(item_obj.get('dependencies', [])),
+ scripts = make_file_refs_seq(item_obj.get('scripts', [])),
+ )
+ @staticmethod
+ def load(instance_or_path: 'InstanceOrPath', repository: str) \
+ -> 'ResourceInfo':
+ """...."""
+ return _load_item_info(ResourceInfo, instance_or_path, repository)
+ # class property
+ type_name = 'resource'
+def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \
+ -> t.Mapping[ParsedUrl, ItemRef]:
+ """...."""
+ mapping: list[tuple[ParsedUrl, ItemRef]] = []
+ for pattern, ref_obj in payloads_obj.items():
+ ref = ItemRef(ref_obj['identifier'])
+ mapping.extend((parsed, ref) for parsed in parse_pattern(pattern))
+ return Map(mapping)
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class MappingInfo(ItemInfoBase):
+ """...."""
+ payloads: t.Mapping[ParsedUrl, ItemRef] = dc.field(hash=False)
+ @staticmethod
+ def make(
+ item_obj: t.Mapping[str, t.Any],
+ schema_compat: int,
+ repository: str
+ ) -> 'MappingInfo':
+ """...."""
+ base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
+ item_obj,
+ schema_compat,
+ repository
+ )
+ return MappingInfo(
+ **base_init_kwargs,
+ payloads = make_payloads(item_obj.get('payloads', {}))
+ )
+ @staticmethod
+ def load(instance_or_path: 'InstanceOrPath', repository: str) \
+ -> 'MappingInfo':
+ """...."""
+ return _load_item_info(MappingInfo, instance_or_path, repository)
+ # class property
+ type_name = 'mapping'
+LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo)
+def _load_item_info(
+ info_type: t.Type[LoadedType],
+ instance_or_path: InstanceOrPath,
+ repository: str
+) -> LoadedType:
+ """Read, validate and autocomplete a mapping/resource description."""
+ instance = json_instances.read_instance(instance_or_path)
+ schema_fmt = f'api_{info_type.type_name}_description-{{}}.schema.json'
+ schema_compat = json_instances.validate_instance(instance, schema_fmt)
+ # We know from successful validation that instance is a dict.
+ return info_type.make(
+ t.cast('dict[str, t.Any]', instance),
+ schema_compat,
+ repository
+ )
+VersionedType = t.TypeVar('VersionedType', ResourceInfo, MappingInfo)
+class VersionedItemInfo(t.Generic[VersionedType]):
+ """Stores data of multiple versions of given resource/mapping."""
+ uuid: t.Optional[str] = None
+ identifier: str = '<dummy>'
+ _by_version: Map[VerTuple, VersionedType] = Map()
+ _initialized: bool = False
+ def register(self, item_info: VersionedType) -> 'VersionedInfoSelfType':
+ """
+ Make item info queryable by version. Perform sanity checks for uuid.
+ """
+ identifier = item_info.identifier
+ if self._initialized:
+ assert identifier == self.identifier
+ if self.uuid is not None:
+ uuid: t.Optional[str] = self.uuid
+ if item_info.uuid is not None and self.uuid != item_info.uuid:
+ raise HaketiloException(_('uuid_mismatch_{identifier}')
+ .format(identifier=identifier))
+ else:
+ uuid = item_info.uuid
+ by_version = self._by_version.set(item_info.version, item_info)
+ return VersionedItemInfo(
+ identifier = identifier,
+ uuid = uuid,
+ _by_version = by_version,
+ _initialized = True
+ )
+ def unregister(self, version: VerTuple) -> 'VersionedInfoSelfType':
+ """...."""
+ try:
+ by_version = self._by_version.delete(version)
+ except KeyError:
+ by_version = self._by_version
+ return dc.replace(self, _by_version=by_version)
+ def is_empty(self) -> bool:
+ """...."""
+ return len(self._by_version) == 0
+ def newest_version(self) -> VerTuple:
+ """...."""
+ assert not self.is_empty()
+ return max(self._by_version.keys())
+ def get_newest(self) -> VersionedType:
+ """Find and return info of the newest version of item."""
+ newest = self._by_version[self.newest_version()]
+ assert newest is not None
+ return newest
+ def get_by_ver(self, ver: t.Iterable[int]) -> t.Optional[VersionedType]:
+ """
+ Find and return info of the specified version of the item (or None if
+ absent).
+ """
+ return self._by_version.get(tuple(ver))
+ def get_all(self) -> t.Iterator[VersionedType]:
+ """Generate item info for all its versions, from oldest ot newest."""
+ for version in sorted(self._by_version.keys()):
+ yield self._by_version[version]
+# Below we define 1 type used by recursively-typed VersionedItemInfo.
+VersionedInfoSelfType = VersionedItemInfo[VersionedType]