diff options
Diffstat (limited to 'src/hydrilla/item_infos.py')
-rw-r--r-- | src/hydrilla/item_infos.py | 529 |
1 files changed, 363 insertions, 166 deletions
diff --git a/src/hydrilla/item_infos.py b/src/hydrilla/item_infos.py index c366ab5..9ba47bd 100644 --- a/src/hydrilla/item_infos.py +++ b/src/hydrilla/item_infos.py @@ -31,48 +31,75 @@ # Enable using with Python 3.7. from __future__ import annotations +import sys + +if sys.version_info >= (3, 8): + from typing import Protocol +else: + from typing_extensions import Protocol + import typing as t import dataclasses as dc -from pathlib import Path, PurePath +from pathlib import Path, PurePosixPath +from abc import ABC -from immutables import Map, MapMutation +from immutables import Map from . import versions, json_instances -from .url_patterns import parse_pattern, ParsedUrl +from .url_patterns import parse_pattern, ParsedUrl, ParsedPattern from .exceptions import HaketiloException from .translations import smart_gettext as _ VerTuple = t.Tuple[int, ...] @dc.dataclass(frozen=True, unsafe_hash=True) -class ItemRef: +class ItemSpecifier: """....""" identifier: str -RefObjs = t.Sequence[t.Mapping[str, t.Any]] +SpecifierObjs = t.Sequence[t.Mapping[str, t.Any]] -def make_item_refs_seq(ref_objs: RefObjs) -> tuple[ItemRef, ...]: +def make_item_specifiers_seq(spec_objs: SpecifierObjs) \ + -> tuple[ItemSpecifier, ...]: """....""" - return tuple(ItemRef(ref['identifier']) for ref in ref_objs) + return tuple(ItemSpecifier(obj['identifier']) for obj in spec_objs) -def make_required_mappings(refs_objs: t.Any, schema_compat: int) \ - -> tuple[ItemRef, ...]: +def make_required_mappings(spec_objs: t.Any, schema_compat: int) \ + -> tuple[ItemSpecifier, ...]: """....""" if schema_compat < 2: return () - return make_item_refs_seq(refs_objs) + return make_item_specifiers_seq(spec_objs) @dc.dataclass(frozen=True, unsafe_hash=True) -class FileRef: +class FileSpecifier: """....""" name: str sha256: str -def make_file_refs_seq(ref_objs: RefObjs) -> tuple[FileRef, ...]: +def normalize_filename(name: str): + """ + This function eliminated double slashes in file name and ensures it does not + try to reference parent directories. + """ + path = PurePosixPath(name) + + if '.' in path.parts or '..' in path.parts: + msg = _('err.item_info.filename_invalid_{}').format(name) + raise HaketiloException(msg) + + return str(path) + +def make_file_specifiers_seq(spec_objs: SpecifierObjs) \ + -> tuple[FileSpecifier, ...]: """....""" - return tuple(FileRef(ref['file'], ref['sha256']) for ref in ref_objs) + return tuple( + FileSpecifier(normalize_filename(obj['file']), obj['sha256']) + for obj + in spec_objs + ) @dc.dataclass(frozen=True, unsafe_hash=True) class GeneratedBy: @@ -81,60 +108,107 @@ class GeneratedBy: version: t.Optional[str] @staticmethod - def make(generated_obj: t.Optional[t.Mapping[str, t.Any]]) -> \ + def make(generated_by_obj: t.Optional[t.Mapping[str, t.Any]]) -> \ t.Optional['GeneratedBy']: """....""" - if generated_obj is None: + if generated_by_obj is None: return None return GeneratedBy( - name = generated_obj['name'], - version = generated_obj.get('version') + name = generated_by_obj['name'], + version = generated_by_obj.get('version') ) -@dc.dataclass(frozen=True, unsafe_hash=True) -class ItemInfoBase: + +def make_eval_permission(perms_obj: t.Any, schema_compat: int) -> bool: + if schema_compat < 2: + return False + + return perms_obj.get('eval', False) + + +def make_cors_bypass_permission(perms_obj: t.Any, schema_compat: int) -> bool: + if schema_compat < 2: + return False + + return perms_obj.get('cors_bypass', False) + + +class Categorizable(Protocol): """....""" - repository: str # repository used in __hash__() - source_name: str = dc.field(hash=False) - source_copyright: tuple[FileRef, ...] = dc.field(hash=False) - version: VerTuple # version used in __hash__() - identifier: str # identifier used in __hash__() - uuid: t.Optional[str] = dc.field(hash=False) - long_name: str = dc.field(hash=False) - required_mappings: tuple[ItemRef, ...] = dc.field(hash=False) - generated_by: t.Optional[GeneratedBy] = dc.field(hash=False) - - def path_relative_to_type(self) -> str: - """ - Get a relative path to this item's JSON definition with respect to - directory containing items of this type. - """ - return f'{self.identifier}/{versions.version_string(self.version)}' - - def path(self) -> str: - """ - Get a relative path to this item's JSON definition with respect to - malcontent directory containing loadable items. - """ - return f'{self.type_name}/{self.path_relative_to_type()}' + uuid: t.Optional[str] + identifier: str - @property - def versioned_identifier(self): - """....""" - return f'{self.identifier}-{versions.version_string(self.version)}' +@dc.dataclass(frozen=True, unsafe_hash=True) +class ItemIdentity: + repo: str + repo_iteration: int + version: versions.VerTuple + identifier: str + +# mypy needs to be corrected: +# https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704 +@dc.dataclass(frozen=True) # type: ignore[misc] +class ItemInfoBase(ABC, ItemIdentity, Categorizable): + """....""" + type_name: t.ClassVar[str] = '!INVALID!' + + source_name: str = dc.field(hash=False) + source_copyright: tuple[FileSpecifier, ...] = dc.field(hash=False) + uuid: t.Optional[str] = dc.field(hash=False) + long_name: str = dc.field(hash=False) + allows_eval: bool = dc.field(hash=False) + allows_cors_bypass: bool = dc.field(hash=False) + required_mappings: tuple[ItemSpecifier, ...] = dc.field(hash=False) + generated_by: t.Optional[GeneratedBy] = dc.field(hash=False) + + # def path_relative_to_type(self) -> str: + # """ + # Get a relative path to this item's JSON definition with respect to + # directory containing items of this type. + # """ + # return f'{self.identifier}/{versions.version_string(self.version)}' + + # def path(self) -> str: + # """ + # Get a relative path to this item's JSON definition with respect to + # malcontent directory containing loadable items. + # """ + # return f'{self.type_name}/{self.path_relative_to_type()}' + + # @property + # def identity(self): + # """....""" + # return ItemIdentity( + # repository = self.repository, + # version = self.version, + # identifier = self.identifier + # ) + + # @property + # def versioned_identifier(self): + # """....""" + # return f'{self.identifier}-{versions.version_string(self.version)}' @staticmethod def _get_base_init_kwargs( - item_obj: t.Mapping[str, t.Any], - schema_compat: int, - repository: str + item_obj: t.Mapping[str, t.Any], + schema_compat: int, + repo: str, + repo_iteration: int ) -> t.Mapping[str, t.Any]: """....""" - source_copyright = make_file_refs_seq(item_obj['source_copyright']) + source_copyright = make_file_specifiers_seq( + item_obj['source_copyright'] + ) version = versions.normalize_version(item_obj['version']) + perms_obj = item_obj.get('permissions', {}) + + eval_perm = make_eval_permission(perms_obj, schema_compat) + cors_bypass_perm = make_cors_bypass_permission(perms_obj, schema_compat) + required_mappings = make_required_mappings( item_obj.get('required_mappings', []), schema_compat @@ -143,28 +217,29 @@ class ItemInfoBase: generated_by = GeneratedBy.make(item_obj.get('generated_by')) return Map( - repository = repository, - source_name = item_obj['source_name'], - source_copyright = source_copyright, - version = version, - identifier = item_obj['identifier'], - uuid = item_obj.get('uuid'), - long_name = item_obj['long_name'], - required_mappings = required_mappings, - generated_by = generated_by + repo = repo, + repo_iteration = repo_iteration, + source_name = item_obj['source_name'], + source_copyright = source_copyright, + version = version, + identifier = item_obj['identifier'], + uuid = item_obj.get('uuid'), + long_name = item_obj['long_name'], + allows_eval = eval_perm, + allows_cors_bypass = cors_bypass_perm, + required_mappings = required_mappings, + generated_by = generated_by ) - # class property - type_name = '!INVALID!' - -InstanceOrPath = t.Union[Path, str, dict[str, t.Any]] @dc.dataclass(frozen=True, unsafe_hash=True) class ResourceInfo(ItemInfoBase): """....""" - revision: int = dc.field(hash=False) - dependencies: tuple[ItemRef, ...] = dc.field(hash=False) - scripts: tuple[FileRef, ...] = dc.field(hash=False) + type_name: t.ClassVar[str] = 'resource' + + revision: int = dc.field(hash=False) + dependencies: tuple[ItemSpecifier, ...] = dc.field(hash=False) + scripts: tuple[FileSpecifier, ...] = dc.field(hash=False) @property def versioned_identifier(self): @@ -173,41 +248,70 @@ class ResourceInfo(ItemInfoBase): @staticmethod def make( - item_obj: t.Mapping[str, t.Any], - schema_compat: int, - repository: str + item_obj: t.Mapping[str, t.Any], + schema_compat: int, + repo: str, + repo_iteration: int ) -> 'ResourceInfo': """....""" base_init_kwargs = ItemInfoBase._get_base_init_kwargs( item_obj, schema_compat, - repository + repo, + repo_iteration + ) + + dependencies = make_item_specifiers_seq( + item_obj.get('dependencies', []) + ) + + scripts = make_file_specifiers_seq( + item_obj.get('scripts', []) ) return ResourceInfo( **base_init_kwargs, revision = item_obj['revision'], - dependencies = make_item_refs_seq(item_obj.get('dependencies', [])), - scripts = make_file_refs_seq(item_obj.get('scripts', [])), + dependencies = dependencies, + scripts = scripts ) @staticmethod - def load(instance_or_path: 'InstanceOrPath', repository: str) \ - -> 'ResourceInfo': + def load( + instance_or_path: json_instances.InstanceOrPathOrIO, + repo: str = '<dummyrepo>', + repo_iteration: int = -1 + ) -> 'ResourceInfo': """....""" - return _load_item_info(ResourceInfo, instance_or_path, repository) + return _load_item_info( + ResourceInfo, + instance_or_path, + repo, + repo_iteration + ) - # class property - type_name = 'resource' + # def __lt__(self, other: 'ResourceInfo') -> bool: + # """....""" + # return ( + # self.identifier, + # self.version, + # self.revision, + # self.repository + # ) < ( + # other.identifier, + # other.version, + # other.revision, + # other.repository + # ) def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \ - -> t.Mapping[ParsedUrl, ItemRef]: + -> t.Mapping[ParsedPattern, ItemSpecifier]: """....""" - mapping: list[tuple[ParsedUrl, ItemRef]] = [] + mapping: list[tuple[ParsedPattern, ItemSpecifier]] = [] - for pattern, ref_obj in payloads_obj.items(): - ref = ItemRef(ref_obj['identifier']) + for pattern, spec_obj in payloads_obj.items(): + ref = ItemSpecifier(spec_obj['identifier']) mapping.extend((parsed, ref) for parsed in parse_pattern(pattern)) return Map(mapping) @@ -215,19 +319,23 @@ def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \ @dc.dataclass(frozen=True, unsafe_hash=True) class MappingInfo(ItemInfoBase): """....""" - payloads: t.Mapping[ParsedUrl, ItemRef] = dc.field(hash=False) + type_name: t.ClassVar[str] = 'mapping' + + payloads: t.Mapping[ParsedPattern, ItemSpecifier] = dc.field(hash=False) @staticmethod def make( - item_obj: t.Mapping[str, t.Any], - schema_compat: int, - repository: str + item_obj: t.Mapping[str, t.Any], + schema_compat: int, + repo: str, + repo_iteration: int ) -> 'MappingInfo': """....""" base_init_kwargs = ItemInfoBase._get_base_init_kwargs( item_obj, schema_compat, - repository + repo, + repo_iteration ) return MappingInfo( @@ -237,10 +345,23 @@ class MappingInfo(ItemInfoBase): ) @staticmethod - def load(instance_or_path: 'InstanceOrPath', repository: str) \ - -> 'MappingInfo': + def load( + instance_or_path: json_instances.InstanceOrPathOrIO, + repo: str = '<dummyrepo>', + repo_iteration: int = -1 + ) -> 'MappingInfo': """....""" - return _load_item_info(MappingInfo, instance_or_path, repository) + return _load_item_info( + MappingInfo, + instance_or_path, + repo, + repo_iteration + ) + + # def __lt__(self, other: 'MappingInfo') -> bool: + # """....""" + # return (self.identifier, self.version, self.repository) < \ + # (other.identifier, other.version, other.repository) # class property type_name = 'mapping' @@ -250,8 +371,9 @@ LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo) def _load_item_info( info_type: t.Type[LoadedType], - instance_or_path: InstanceOrPath, - repository: str + instance_or_path: json_instances.InstanceOrPathOrIO, + repo: str, + repo_iteration: int ) -> LoadedType: """Read, validate and autocomplete a mapping/resource description.""" instance = json_instances.read_instance(instance_or_path) @@ -264,81 +386,156 @@ def _load_item_info( return info_type.make( t.cast('dict[str, t.Any]', instance), schema_compat, - repository + repo, + repo_iteration ) -VersionedType = t.TypeVar('VersionedType', ResourceInfo, MappingInfo) - -@dc.dataclass(frozen=True) -class VersionedItemInfo(t.Generic[VersionedType]): - """Stores data of multiple versions of given resource/mapping.""" - uuid: t.Optional[str] = None - identifier: str = '<dummy>' - _by_version: Map[VerTuple, VersionedType] = Map() - _initialized: bool = False - - def register(self, item_info: VersionedType) -> 'VersionedInfoSelfType': - """ - Make item info queryable by version. Perform sanity checks for uuid. - """ - identifier = item_info.identifier - if self._initialized: - assert identifier == self.identifier - - if self.uuid is not None: - uuid: t.Optional[str] = self.uuid - if item_info.uuid is not None and self.uuid != item_info.uuid: - raise HaketiloException(_('uuid_mismatch_{identifier}') - .format(identifier=identifier)) - else: - uuid = item_info.uuid - - by_version = self._by_version.set(item_info.version, item_info) - - return VersionedItemInfo( - identifier = identifier, - uuid = uuid, - _by_version = by_version, - _initialized = True - ) - - def unregister(self, version: VerTuple) -> 'VersionedInfoSelfType': - """....""" - try: - by_version = self._by_version.delete(version) - except KeyError: - by_version = self._by_version - - return dc.replace(self, _by_version=by_version) - - def is_empty(self) -> bool: - """....""" - return len(self._by_version) == 0 - - def newest_version(self) -> VerTuple: - """....""" - assert not self.is_empty() - - return max(self._by_version.keys()) - - def get_newest(self) -> VersionedType: - """Find and return info of the newest version of item.""" - newest = self._by_version[self.newest_version()] - assert newest is not None - return newest - - def get_by_ver(self, ver: t.Iterable[int]) -> t.Optional[VersionedType]: - """ - Find and return info of the specified version of the item (or None if - absent). - """ - return self._by_version.get(tuple(ver)) - - def get_all(self) -> t.Iterator[VersionedType]: - """Generate item info for all its versions, from oldest ot newest.""" - for version in sorted(self._by_version.keys()): - yield self._by_version[version] - -# Below we define 1 type used by recursively-typed VersionedItemInfo. -VersionedInfoSelfType = VersionedItemInfo[VersionedType] +# CategorizedType = t.TypeVar( +# 'CategorizedType', +# bound=Categorizable +# ) + +# CategorizedUpdater = t.Callable[ +# [t.Optional[CategorizedType]], +# t.Optional[CategorizedType] +# ] + +# CategoryKeyType = t.TypeVar('CategoryKeyType', bound=t.Hashable) + +# @dc.dataclass(frozen=True) +# class CategorizedItemInfo(Categorizable, t.Generic[CategorizedType, CategoryKeyType]): +# """....""" +# SelfType = t.TypeVar( +# 'SelfType', +# bound = 'CategorizedItemInfo[CategorizedType, CategoryKeyType]' +# ) + +# uuid: t.Optional[str] = None +# identifier: str = '<dummy>' +# items: Map[CategoryKeyType, CategorizedType] = Map() +# _initialized: bool = False + +# def _update( +# self: 'SelfType', +# key: CategoryKeyType, +# updater: CategorizedUpdater +# ) -> 'SelfType': +# """...... Perform sanity checks for uuid.""" +# uuid = self.uuid + +# items = self.items.mutate() + +# updated = updater(items.get(key)) +# if updated is None: +# items.pop(key, None) + +# identifier = self.identifier +# else: +# items[key] = updated + +# identifier = updated.identifier +# if self._initialized: +# assert identifier == self.identifier + +# if uuid is not None: +# if updated.uuid is not None and uuid != updated.uuid: +# raise HaketiloException(_('uuid_mismatch_{identifier}') +# .format(identifier=identifier)) +# else: +# uuid = updated.uuid + +# return dc.replace( +# self, +# identifier = identifier, +# uuid = uuid, +# items = items.finish(), +# _initialized = self._initialized or updated is not None +# ) + +# def is_empty(self) -> bool: +# """....""" +# return len(self.items) == 0 + + +# VersionedType = t.TypeVar('VersionedType', ResourceInfo, MappingInfo) + +# class VersionedItemInfo( +# CategorizedItemInfo[VersionedType, VerTuple], +# t.Generic[VersionedType] +# ): +# """Stores data of multiple versions of given resource/mapping.""" +# SelfType = t.TypeVar('SelfType', bound='VersionedItemInfo[VersionedType]') + +# def register(self: 'SelfType', item_info: VersionedType) -> 'SelfType': +# """ +# Make item info queryable by version. Perform sanity checks for uuid. +# """ +# return self._update(item_info.version, lambda old_info: item_info) + +# def unregister(self: 'SelfType', version: VerTuple) -> 'SelfType': +# """....""" +# return self._update(version, lambda old_info: None) + +# def newest_version(self) -> VerTuple: +# """....""" +# assert not self.is_empty() + +# return max(self.items.keys()) + +# def get_newest(self) -> VersionedType: +# """Find and return info of the newest version of item.""" +# newest = self.items[self.newest_version()] +# assert newest is not None +# return newest + +# def get_by_ver(self, ver: t.Iterable[int]) -> t.Optional[VersionedType]: +# """ +# Find and return info of the specified version of the item (or None if +# absent). +# """ +# return self.items.get(tuple(ver)) + +# def get_all(self) -> t.Iterator[VersionedType]: +# """Generate item info for all its versions, from oldest to newest.""" +# for version in sorted(self.items.keys()): +# yield self.items[version] + + +# MultiRepoType = t.TypeVar('MultiRepoType', ResourceInfo, MappingInfo) +# MultiRepoVersioned = VersionedItemInfo[MultiRepoType] + +# class MultiRepoItemInfo( +# CategorizedItemInfo[MultiRepoVersioned, str], +# t.Generic[MultiRepoType] +# ): +# SelfType = t.TypeVar('SelfType', bound='MultiRepoItemInfo[MultiRepoType]') + +# def register(self: 'SelfType', item_info: MultiRepoType) -> 'SelfType': +# """ +# Make item info queryable by version. Perform sanity checks for uuid. +# """ +# def updater(old_item: t.Optional[MultiRepoVersioned]) \ +# -> MultiRepoVersioned: +# """....""" +# if old_item is None: +# old_item = VersionedItemInfo() + +# return old_item.register(item_info) + +# return self._update(item_info.repository, updater) + +# def unregister(self: 'SelfType', version: VerTuple, repository: str) \ +# -> 'SelfType': +# """....""" +# def updater(old_item: t.Optional[MultiRepoVersioned]) -> \ +# t.Optional[MultiRepoVersioned]: +# """....""" +# if old_item is None: +# return None + +# new_item = old_item.unregister(version) + +# return None if new_item.is_empty() else new_item + +# return self._update(repository, updater) |