aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/item_infos.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/hydrilla/item_infos.py')
-rw-r--r--src/hydrilla/item_infos.py699
1 files changed, 699 insertions, 0 deletions
diff --git a/src/hydrilla/item_infos.py b/src/hydrilla/item_infos.py
new file mode 100644
index 0000000..430bcd0
--- /dev/null
+++ b/src/hydrilla/item_infos.py
@@ -0,0 +1,699 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Reading resources, mappings and other JSON documents from the filesystem.
+#
+# This file is part of Hydrilla&Haketilo
+#
+# Copyright (C) 2021, 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use of this
+# code in a proprietary program, I am not going to enforce this in
+# court.
+
+"""
+.....
+"""
+
+import sys
+
+if sys.version_info >= (3, 8):
+ from typing import Protocol
+else:
+ from typing_extensions import Protocol
+
+import enum
+import typing as t
+import dataclasses as dc
+
+from pathlib import Path, PurePosixPath
+from abc import ABC, abstractmethod
+
+from immutables import Map
+
+from . import versions, json_instances
+from .url_patterns import parse_pattern, ParsedUrl, ParsedPattern
+from .exceptions import HaketiloException
+from .translations import smart_gettext as _
+
+
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class ItemSpecifier:
+ """...."""
+ identifier: str
+
+ItemSpecs = t.Tuple[ItemSpecifier, ...]
+
+SpecifierObjs = t.Sequence[t.Mapping[str, t.Any]]
+
+def make_item_specifiers_seq(spec_objs: SpecifierObjs) -> ItemSpecs:
+ return tuple(ItemSpecifier(obj['identifier']) for obj in spec_objs)
+
+def make_required_mappings(spec_objs: t.Any, schema_compat: int) -> ItemSpecs:
+ if schema_compat < 2:
+ return ()
+
+ return make_item_specifiers_seq(spec_objs)
+
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class FileSpecifier:
+ """...."""
+ name: str
+ sha256: str
+
+FileSpecs = t.Tuple[FileSpecifier, ...]
+
+def normalize_filename(name: str):
+ """
+ This function eliminated double slashes in file name and ensures it does not
+ try to reference parent directories.
+ """
+ path = PurePosixPath(name)
+
+ if '.' in path.parts or '..' in path.parts:
+ msg = _('err.item_info.filename_invalid_{}').format(name)
+ raise HaketiloException(msg)
+
+ return str(path)
+
+def make_file_specifiers_seq(spec_objs: SpecifierObjs) -> FileSpecs:
+ return tuple(
+ FileSpecifier(normalize_filename(obj['file']), obj['sha256'])
+ for obj
+ in spec_objs
+ )
+
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class GeneratedBy:
+ """...."""
+ name: str
+ version: t.Optional[str]
+
+ @staticmethod
+ def make(generated_by_obj: t.Optional[t.Mapping[str, t.Any]]) -> \
+ t.Optional['GeneratedBy']:
+ """...."""
+ if generated_by_obj is None:
+ return None
+
+ return GeneratedBy(
+ name = generated_by_obj['name'],
+ version = generated_by_obj.get('version')
+ )
+
+
+def make_eval_permission(perms_obj: t.Any, schema_compat: int) -> bool:
+ if schema_compat < 2:
+ return False
+
+ return perms_obj.get('eval', False)
+
+
+def make_cors_bypass_permission(perms_obj: t.Any, schema_compat: int) -> bool:
+ if schema_compat < 2:
+ return False
+
+ return perms_obj.get('cors_bypass', False)
+
+
+def make_version_constraint(
+ ver: t.Any,
+ schema_compat: int,
+ default: versions.VerTuple
+) -> versions.VerTuple:
+ if schema_compat < 2 or ver is None:
+ return default
+
+ return versions.normalize(ver)
+
+
+class Categorizable(Protocol):
+ """...."""
+ uuid: t.Optional[str]
+ identifier: str
+
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class ItemIdentity:
+ repo: str
+ repo_iteration: int
+ version: versions.VerTuple
+ identifier: str
+
+
+# mypy needs to be corrected:
+# https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704
+@dc.dataclass(frozen=True) # type: ignore[misc]
+class ItemInfoBase(ABC, ItemIdentity, Categorizable):
+ """...."""
+ source_name: str = dc.field(hash=False, compare=False)
+ source_copyright: FileSpecs = dc.field(hash=False, compare=False)
+ uuid: t.Optional[str] = dc.field(hash=False, compare=False)
+ long_name: str = dc.field(hash=False, compare=False)
+ description: str = dc.field(hash=False, compare=False)
+ allows_eval: bool = dc.field(hash=False, compare=False)
+ allows_cors_bypass: bool = dc.field(hash=False, compare=False)
+ min_haketilo_ver: versions.VerTuple = dc.field(hash=False, compare=False)
+ max_haketilo_ver: versions.VerTuple = dc.field(hash=False, compare=False)
+ required_mappings: ItemSpecs = dc.field(hash=False, compare=False)
+ generated_by: t.Optional[GeneratedBy] = dc.field(hash=False, compare=False)
+
+ @property
+ def version_string(self) -> str:
+ return versions.version_string(self.version)
+
+ @property
+ def versioned_identifier(self) -> str:
+ """...."""
+ return f'{self.identifier}-{self.version_string}'
+
+ @property
+ def files(self) -> FileSpecs:
+ return self.source_copyright
+
+ @property
+ def compatible(self) -> bool:
+ return (self.min_haketilo_ver <= versions.haketilo_version and
+ self.max_haketilo_ver >= versions.haketilo_version)
+
+ @staticmethod
+ def _get_base_init_kwargs(
+ item_obj: t.Mapping[str, t.Any],
+ schema_compat: int,
+ repo: str,
+ repo_iteration: int
+ ) -> t.Mapping[str, t.Any]:
+ """...."""
+ source_copyright = make_file_specifiers_seq(
+ item_obj['source_copyright']
+ )
+
+ version = versions.normalize(item_obj['version'])
+
+ perms_obj = item_obj.get('permissions', {})
+
+ eval_perm = make_eval_permission(perms_obj, schema_compat)
+ cors_bypass_perm = make_cors_bypass_permission(perms_obj, schema_compat)
+
+ min_haketilo_ver = make_version_constraint(
+ ver = item_obj.get('min_haketilo_version'),
+ schema_compat = schema_compat,
+ default = versions.int_ver_min
+ )
+ max_haketilo_ver = make_version_constraint(
+ ver = item_obj.get('max_haketilo_version'),
+ schema_compat = schema_compat,
+ default = versions.int_ver_max
+ )
+
+ required_mappings = make_required_mappings(
+ item_obj.get('required_mappings', []),
+ schema_compat
+ )
+
+ generated_by = GeneratedBy.make(item_obj.get('generated_by'))
+
+ return Map(
+ repo = repo,
+ repo_iteration = repo_iteration,
+ source_name = item_obj['source_name'],
+ source_copyright = source_copyright,
+ version = version,
+ identifier = item_obj['identifier'],
+ uuid = item_obj.get('uuid'),
+ long_name = item_obj['long_name'],
+ description = item_obj['description'],
+ allows_eval = eval_perm,
+ allows_cors_bypass = cors_bypass_perm,
+ min_haketilo_ver = min_haketilo_ver,
+ max_haketilo_ver = max_haketilo_ver,
+ required_mappings = required_mappings,
+ generated_by = generated_by
+ )
+
+
+AnyInfo = t.Union['ResourceInfo', 'MappingInfo']
+
+
+class ItemType(enum.Enum):
+ RESOURCE = 'resource'
+ MAPPING = 'mapping'
+
+ @property
+ def info_class(self) -> t.Type[AnyInfo]:
+ if self == ItemType.RESOURCE:
+ return ResourceInfo
+ else:
+ return MappingInfo
+
+ @property
+ def alt_name(self) -> str:
+ if self == ItemType.RESOURCE:
+ return 'library'
+ else:
+ return 'package'
+
+ @property
+ def alt_name_plural(self) -> str:
+ if self == ItemType.RESOURCE:
+ return 'libraries'
+ else:
+ return 'packages'
+
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class CorrespondsToResourceDCMixin:
+ type: t.ClassVar[ItemType] = ItemType.RESOURCE
+
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class CorrespondsToMappingDCMixin:
+ type: t.ClassVar[ItemType] = ItemType.MAPPING
+
+
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class ResourceInfo(ItemInfoBase, CorrespondsToResourceDCMixin):
+ """...."""
+ revision: int = dc.field(hash=False, compare=False)
+ dependencies: ItemSpecs = dc.field(hash=False, compare=False)
+ scripts: FileSpecs = dc.field(hash=False, compare=False)
+
+ @property
+ def version_string(self) -> str:
+ return f'{super().version_string}-{self.revision}'
+
+ @property
+ def files(self) -> FileSpecs:
+ return tuple((*self.source_copyright, *self.scripts))
+
+ @staticmethod
+ def make(
+ item_obj: t.Mapping[str, t.Any],
+ schema_compat: int,
+ repo: str,
+ repo_iteration: int
+ ) -> 'ResourceInfo':
+ """...."""
+ base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
+ item_obj,
+ schema_compat,
+ repo,
+ repo_iteration
+ )
+
+ dependencies = make_item_specifiers_seq(
+ item_obj.get('dependencies', [])
+ )
+
+ scripts = make_file_specifiers_seq(
+ item_obj.get('scripts', [])
+ )
+
+ return ResourceInfo(
+ **base_init_kwargs,
+
+ revision = item_obj['revision'],
+ dependencies = dependencies,
+ scripts = scripts
+ )
+
+ @staticmethod
+ def load(
+ instance_source: json_instances.InstanceSource,
+ repo: str = '<dummyrepo>',
+ repo_iteration: int = -1
+ ) -> 'ResourceInfo':
+ """...."""
+ return _load_item_info(
+ ResourceInfo,
+ instance_source,
+ repo,
+ repo_iteration
+ )
+
+ def __lt__(self, other: 'ResourceInfo') -> bool:
+ """...."""
+ return (
+ self.identifier,
+ other.version,
+ other.revision,
+ self.repo,
+ other.repo_iteration
+ ) < (
+ other.identifier,
+ self.version,
+ self.revision,
+ other.repo,
+ self.repo_iteration
+ )
+
+def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \
+ -> t.Mapping[ParsedPattern, ItemSpecifier]:
+ """...."""
+ mapping: t.List[t.Tuple[ParsedPattern, ItemSpecifier]] = []
+
+ for pattern, spec_obj in payloads_obj.items():
+ ref = ItemSpecifier(spec_obj['identifier'])
+ mapping.extend((parsed, ref) for parsed in parse_pattern(pattern))
+
+ return Map(mapping)
+
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class MappingInfo(ItemInfoBase, CorrespondsToMappingDCMixin):
+ """...."""
+ payloads: t.Mapping[ParsedPattern, ItemSpecifier] = \
+ dc.field(hash=False, compare=False)
+
+ @staticmethod
+ def make(
+ item_obj: t.Mapping[str, t.Any],
+ schema_compat: int,
+ repo: str,
+ repo_iteration: int
+ ) -> 'MappingInfo':
+ """...."""
+ base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
+ item_obj,
+ schema_compat,
+ repo,
+ repo_iteration
+ )
+
+ return MappingInfo(
+ **base_init_kwargs,
+
+ payloads = make_payloads(item_obj.get('payloads', {}))
+ )
+
+ @staticmethod
+ def load(
+ instance_source: json_instances.InstanceSource,
+ repo: str = '<dummyrepo>',
+ repo_iteration: int = -1
+ ) -> 'MappingInfo':
+ """...."""
+ return _load_item_info(
+ MappingInfo,
+ instance_source,
+ repo,
+ repo_iteration
+ )
+
+ def __lt__(self, other: 'MappingInfo') -> bool:
+ """...."""
+ return (
+ self.identifier,
+ other.version,
+ self.repo,
+ other.repo_iteration
+ ) < (
+ other.identifier,
+ self.version,
+ other.repo,
+ self.repo_iteration
+ )
+
+
+LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo)
+
+def _load_item_info(
+ info_type: t.Type[LoadedType],
+ instance_source: json_instances.InstanceSource,
+ repo: str,
+ repo_iteration: int
+) -> LoadedType:
+ """Read, validate and autocomplete a mapping/resource description."""
+ instance = json_instances.read_instance(instance_source)
+
+ schema_fmt = f'api_{info_type.type.value}_description-{{}}.schema.json'
+
+ schema_compat = json_instances.validate_instance(instance, schema_fmt)
+
+ # We know from successful validation that instance is a dict.
+ return info_type.make(
+ t.cast('t.Dict[str, t.Any]', instance),
+ schema_compat,
+ repo,
+ repo_iteration
+ )
+
+
+CategorizedInfoType = t.TypeVar(
+ 'CategorizedInfoType',
+ ResourceInfo,
+ MappingInfo
+)
+
+CategorizedType = t.TypeVar(
+ 'CategorizedType',
+ bound=Categorizable
+)
+
+CategorizedUpdater = t.Callable[
+ [t.Optional[CategorizedType]],
+ t.Optional[CategorizedType]
+]
+
+CategoryKeyType = t.TypeVar('CategoryKeyType', bound=t.Hashable)
+
+@dc.dataclass(frozen=True) # type: ignore[misc]
+class CategorizedItemInfo(
+ ABC,
+ Categorizable,
+ t.Generic[CategorizedInfoType, CategorizedType, CategoryKeyType]
+):
+ """...."""
+ SelfType = t.TypeVar(
+ 'SelfType',
+ bound = 'CategorizedItemInfo[CategorizedInfoType, CategorizedType, CategoryKeyType]'
+ )
+
+ uuid: t.Optional[str] = None
+ identifier: str = '<dummy>'
+ items: Map[CategoryKeyType, CategorizedType] = Map()
+ _initialized: bool = False
+
+ def _update(
+ self: 'SelfType',
+ key: CategoryKeyType,
+ updater: CategorizedUpdater
+ ) -> 'SelfType':
+ """...... Perform sanity checks for uuid."""
+ uuid = self.uuid
+
+ items = self.items.mutate()
+
+ updated = updater(items.get(key))
+ if updated is None:
+ items.pop(key, None)
+
+ identifier = self.identifier
+ else:
+ items[key] = updated
+
+ identifier = updated.identifier
+ if self._initialized:
+ assert identifier == self.identifier
+
+ if uuid is not None:
+ if updated.uuid is not None and uuid != updated.uuid:
+ raise HaketiloException(_('uuid_mismatch_{identifier}')
+ .format(identifier=identifier))
+ else:
+ uuid = updated.uuid
+
+ return dc.replace(
+ self,
+ identifier = identifier,
+ uuid = uuid,
+ items = items.finish(),
+ _initialized = self._initialized or updated is not None
+ )
+
+ @abstractmethod
+ def register(self: 'SelfType', info: CategorizedInfoType) -> 'SelfType':
+ ...
+
+ @abstractmethod
+ def get_all(self: 'SelfType') -> t.Sequence[CategorizedInfoType]:
+ ...
+
+ def is_empty(self) -> bool:
+ return len(self.items) == 0
+
+
+class VersionedItemInfo(
+ CategorizedItemInfo[
+ CategorizedInfoType,
+ CategorizedInfoType,
+ versions.VerTuple
+ ],
+ t.Generic[CategorizedInfoType]
+):
+ """Stores data of multiple versions of given resource/mapping."""
+ SelfType = t.TypeVar(
+ 'SelfType',
+ bound = 'VersionedItemInfo[CategorizedInfoType]'
+ )
+
+ def register(self: 'SelfType', item_info: CategorizedInfoType) \
+ -> 'SelfType':
+ """
+ Make item info queryable by version. Perform sanity checks for uuid.
+ """
+ return self._update(item_info.version, lambda old_info: item_info)
+
+ @property
+ def newest_version(self) -> versions.VerTuple:
+ """...."""
+ assert not self.is_empty()
+
+ return self.versions()[-1]
+
+ @property
+ def newest_info(self) -> CategorizedInfoType:
+ """Find and return info of the newest version of item."""
+ return self.items[self.newest_version]
+
+ def versions(self, reverse: bool = False) -> t.Sequence[versions.VerTuple]:
+ return sorted(self.items.keys(), reverse=reverse)
+
+ def get_by_ver(self, ver: t.Sequence[int]) \
+ -> t.Optional[CategorizedInfoType]:
+ """
+ Find and return info of the specified version of the item (or None if
+ absent).
+ """
+ return self.items.get(versions.normalize(ver))
+
+ def get_all(self, reverse_versions: bool = False) \
+ -> t.Sequence[CategorizedInfoType]:
+ """
+ Generate item info for all its versions, from oldest to newest unless
+ the opposite is requested.
+ """
+ versions = self.versions(reverse=reverse_versions)
+ return [self.items[ver] for ver in versions]
+
+VersionedResourceInfo = VersionedItemInfo[ResourceInfo]
+VersionedMappingInfo = VersionedItemInfo[MappingInfo]
+
+VersionedItemInfoMap = Map[str, VersionedItemInfo]
+VersionedResourceInfoMap = Map[str, VersionedResourceInfo]
+VersionedMappingInfoMap = Map[str, VersionedMappingInfo]
+
+def register_in_versioned_map(
+ map: Map[str, VersionedItemInfo[CategorizedInfoType]],
+ info: CategorizedInfoType
+) -> Map[str, VersionedItemInfo[CategorizedInfoType]]:
+ versioned_info = map.get(info.identifier, VersionedItemInfo())
+
+ return map.set(info.identifier, versioned_info.register(info))
+
+
+class MultirepoItemInfo(
+ CategorizedItemInfo[
+ CategorizedInfoType,
+ VersionedItemInfo[CategorizedInfoType],
+ t.Tuple[str, int]
+ ],
+ t.Generic[CategorizedInfoType]
+):
+ """
+ Stores data of multiple versions of given resource/mapping that may come
+ from multiple repositories.
+ """
+ SelfType = t.TypeVar(
+ 'SelfType',
+ bound = 'MultirepoItemInfo[CategorizedInfoType]'
+ )
+
+ def register(self: 'SelfType', item_info: CategorizedInfoType) \
+ -> 'SelfType':
+ """
+ Make item info queryable by repo and version. Perform sanity checks for
+ uuid.
+ """
+ def update(
+ versioned: t.Optional[VersionedItemInfo[CategorizedInfoType]]
+ ) -> VersionedItemInfo[CategorizedInfoType]:
+ if versioned is None:
+ versioned = VersionedItemInfo()
+ return versioned.register(item_info)
+
+ return self._update((item_info.repo, item_info.repo_iteration), update)
+
+ @property
+ def default_info(self) -> CategorizedInfoType:
+ """
+ Find and return info of one of the available options for the newest
+ version of item.
+ """
+ assert not self.is_empty()
+
+ return self.get_all(reverse_repos=True)[-1]
+
+ def options(self, reverse: bool = False) -> t.Sequence[t.Tuple[str, int]]:
+ return sorted(
+ self.items.keys(),
+ key = (lambda tuple: (tuple[0], 1 - tuple[1])),
+ reverse = reverse
+ )
+
+ def get_all(
+ self,
+ reverse_versions: bool = False,
+ reverse_repos: bool = False
+ ) -> t.Sequence[CategorizedInfoType]:
+ """
+ Generate item info for all its versions and options, from oldest to
+ newest version and from.
+ """
+ all_versions: t.Set[versions.VerTuple] = set()
+ for versioned in self.items.values():
+ all_versions.update(versioned.versions())
+
+ result = []
+
+ for version in sorted(all_versions, reverse=reverse_versions):
+ for option in self.options(reverse=reverse_repos):
+ info = self.items[option].get_by_ver(version)
+ if info is not None:
+ result.append(info)
+
+ return result
+
+MultirepoResourceInfo = MultirepoItemInfo[ResourceInfo]
+MultirepoMappingInfo = MultirepoItemInfo[MappingInfo]
+
+
+MultirepoItemInfoMap = Map[str, MultirepoItemInfo]
+MultirepoResourceInfoMap = Map[str, MultirepoResourceInfo]
+MultirepoMappingInfoMap = Map[str, MultirepoMappingInfo]
+
+def register_in_multirepo_map(
+ map: Map[str, MultirepoItemInfo[CategorizedInfoType]],
+ info: CategorizedInfoType
+) -> Map[str, MultirepoItemInfo[CategorizedInfoType]]:
+ multirepo_info = map.get(info.identifier, MultirepoItemInfo())
+
+ return map.set(info.identifier, multirepo_info.register(info))
+
+
+def all_map_infos(
+ map: Map[str, CategorizedItemInfo[CategorizedInfoType, t.Any, t.Any]]
+) -> t.Iterator[CategorizedInfoType]:
+ for versioned_info in map.values():
+ for item_info in versioned_info.get_all():
+ yield item_info