aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/item_infos.py
diff options
context:
space:
mode:
authorWojtek Kosior <koszko@koszko.org>2022-07-27 15:56:24 +0200
committerWojtek Kosior <koszko@koszko.org>2022-08-10 17:25:05 +0200
commit879c41927171efc8d77d1de2739b18e2eb57580f (patch)
treede0e78afe2ea49e58c9bf2c662657392a00139ee /src/hydrilla/item_infos.py
parent52d12a4fa124daa1595529e3e7008276a7986d95 (diff)
downloadhaketilo-hydrilla-879c41927171efc8d77d1de2739b18e2eb57580f.tar.gz
haketilo-hydrilla-879c41927171efc8d77d1de2739b18e2eb57580f.zip
unfinished partial work
Diffstat (limited to 'src/hydrilla/item_infos.py')
-rw-r--r--src/hydrilla/item_infos.py529
1 files changed, 363 insertions, 166 deletions
diff --git a/src/hydrilla/item_infos.py b/src/hydrilla/item_infos.py
index c366ab5..9ba47bd 100644
--- a/src/hydrilla/item_infos.py
+++ b/src/hydrilla/item_infos.py
@@ -31,48 +31,75 @@
# Enable using with Python 3.7.
from __future__ import annotations
+import sys
+
+if sys.version_info >= (3, 8):
+ from typing import Protocol
+else:
+ from typing_extensions import Protocol
+
import typing as t
import dataclasses as dc
-from pathlib import Path, PurePath
+from pathlib import Path, PurePosixPath
+from abc import ABC
-from immutables import Map, MapMutation
+from immutables import Map
from . import versions, json_instances
-from .url_patterns import parse_pattern, ParsedUrl
+from .url_patterns import parse_pattern, ParsedUrl, ParsedPattern
from .exceptions import HaketiloException
from .translations import smart_gettext as _
VerTuple = t.Tuple[int, ...]
@dc.dataclass(frozen=True, unsafe_hash=True)
-class ItemRef:
+class ItemSpecifier:
"""...."""
identifier: str
-RefObjs = t.Sequence[t.Mapping[str, t.Any]]
+SpecifierObjs = t.Sequence[t.Mapping[str, t.Any]]
-def make_item_refs_seq(ref_objs: RefObjs) -> tuple[ItemRef, ...]:
+def make_item_specifiers_seq(spec_objs: SpecifierObjs) \
+ -> tuple[ItemSpecifier, ...]:
"""...."""
- return tuple(ItemRef(ref['identifier']) for ref in ref_objs)
+ return tuple(ItemSpecifier(obj['identifier']) for obj in spec_objs)
-def make_required_mappings(refs_objs: t.Any, schema_compat: int) \
- -> tuple[ItemRef, ...]:
+def make_required_mappings(spec_objs: t.Any, schema_compat: int) \
+ -> tuple[ItemSpecifier, ...]:
"""...."""
if schema_compat < 2:
return ()
- return make_item_refs_seq(refs_objs)
+ return make_item_specifiers_seq(spec_objs)
@dc.dataclass(frozen=True, unsafe_hash=True)
-class FileRef:
+class FileSpecifier:
"""...."""
name: str
sha256: str
-def make_file_refs_seq(ref_objs: RefObjs) -> tuple[FileRef, ...]:
+def normalize_filename(name: str):
+ """
+ This function eliminated double slashes in file name and ensures it does not
+ try to reference parent directories.
+ """
+ path = PurePosixPath(name)
+
+ if '.' in path.parts or '..' in path.parts:
+ msg = _('err.item_info.filename_invalid_{}').format(name)
+ raise HaketiloException(msg)
+
+ return str(path)
+
+def make_file_specifiers_seq(spec_objs: SpecifierObjs) \
+ -> tuple[FileSpecifier, ...]:
"""...."""
- return tuple(FileRef(ref['file'], ref['sha256']) for ref in ref_objs)
+ return tuple(
+ FileSpecifier(normalize_filename(obj['file']), obj['sha256'])
+ for obj
+ in spec_objs
+ )
@dc.dataclass(frozen=True, unsafe_hash=True)
class GeneratedBy:
@@ -81,60 +108,107 @@ class GeneratedBy:
version: t.Optional[str]
@staticmethod
- def make(generated_obj: t.Optional[t.Mapping[str, t.Any]]) -> \
+ def make(generated_by_obj: t.Optional[t.Mapping[str, t.Any]]) -> \
t.Optional['GeneratedBy']:
"""...."""
- if generated_obj is None:
+ if generated_by_obj is None:
return None
return GeneratedBy(
- name = generated_obj['name'],
- version = generated_obj.get('version')
+ name = generated_by_obj['name'],
+ version = generated_by_obj.get('version')
)
-@dc.dataclass(frozen=True, unsafe_hash=True)
-class ItemInfoBase:
+
+def make_eval_permission(perms_obj: t.Any, schema_compat: int) -> bool:
+ if schema_compat < 2:
+ return False
+
+ return perms_obj.get('eval', False)
+
+
+def make_cors_bypass_permission(perms_obj: t.Any, schema_compat: int) -> bool:
+ if schema_compat < 2:
+ return False
+
+ return perms_obj.get('cors_bypass', False)
+
+
+class Categorizable(Protocol):
"""...."""
- repository: str # repository used in __hash__()
- source_name: str = dc.field(hash=False)
- source_copyright: tuple[FileRef, ...] = dc.field(hash=False)
- version: VerTuple # version used in __hash__()
- identifier: str # identifier used in __hash__()
- uuid: t.Optional[str] = dc.field(hash=False)
- long_name: str = dc.field(hash=False)
- required_mappings: tuple[ItemRef, ...] = dc.field(hash=False)
- generated_by: t.Optional[GeneratedBy] = dc.field(hash=False)
-
- def path_relative_to_type(self) -> str:
- """
- Get a relative path to this item's JSON definition with respect to
- directory containing items of this type.
- """
- return f'{self.identifier}/{versions.version_string(self.version)}'
-
- def path(self) -> str:
- """
- Get a relative path to this item's JSON definition with respect to
- malcontent directory containing loadable items.
- """
- return f'{self.type_name}/{self.path_relative_to_type()}'
+ uuid: t.Optional[str]
+ identifier: str
- @property
- def versioned_identifier(self):
- """...."""
- return f'{self.identifier}-{versions.version_string(self.version)}'
+@dc.dataclass(frozen=True, unsafe_hash=True)
+class ItemIdentity:
+ repo: str
+ repo_iteration: int
+ version: versions.VerTuple
+ identifier: str
+
+# mypy needs to be corrected:
+# https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704
+@dc.dataclass(frozen=True) # type: ignore[misc]
+class ItemInfoBase(ABC, ItemIdentity, Categorizable):
+ """...."""
+ type_name: t.ClassVar[str] = '!INVALID!'
+
+ source_name: str = dc.field(hash=False)
+ source_copyright: tuple[FileSpecifier, ...] = dc.field(hash=False)
+ uuid: t.Optional[str] = dc.field(hash=False)
+ long_name: str = dc.field(hash=False)
+ allows_eval: bool = dc.field(hash=False)
+ allows_cors_bypass: bool = dc.field(hash=False)
+ required_mappings: tuple[ItemSpecifier, ...] = dc.field(hash=False)
+ generated_by: t.Optional[GeneratedBy] = dc.field(hash=False)
+
+ # def path_relative_to_type(self) -> str:
+ # """
+ # Get a relative path to this item's JSON definition with respect to
+ # directory containing items of this type.
+ # """
+ # return f'{self.identifier}/{versions.version_string(self.version)}'
+
+ # def path(self) -> str:
+ # """
+ # Get a relative path to this item's JSON definition with respect to
+ # malcontent directory containing loadable items.
+ # """
+ # return f'{self.type_name}/{self.path_relative_to_type()}'
+
+ # @property
+ # def identity(self):
+ # """...."""
+ # return ItemIdentity(
+ # repository = self.repository,
+ # version = self.version,
+ # identifier = self.identifier
+ # )
+
+ # @property
+ # def versioned_identifier(self):
+ # """...."""
+ # return f'{self.identifier}-{versions.version_string(self.version)}'
@staticmethod
def _get_base_init_kwargs(
- item_obj: t.Mapping[str, t.Any],
- schema_compat: int,
- repository: str
+ item_obj: t.Mapping[str, t.Any],
+ schema_compat: int,
+ repo: str,
+ repo_iteration: int
) -> t.Mapping[str, t.Any]:
"""...."""
- source_copyright = make_file_refs_seq(item_obj['source_copyright'])
+ source_copyright = make_file_specifiers_seq(
+ item_obj['source_copyright']
+ )
version = versions.normalize_version(item_obj['version'])
+ perms_obj = item_obj.get('permissions', {})
+
+ eval_perm = make_eval_permission(perms_obj, schema_compat)
+ cors_bypass_perm = make_cors_bypass_permission(perms_obj, schema_compat)
+
required_mappings = make_required_mappings(
item_obj.get('required_mappings', []),
schema_compat
@@ -143,28 +217,29 @@ class ItemInfoBase:
generated_by = GeneratedBy.make(item_obj.get('generated_by'))
return Map(
- repository = repository,
- source_name = item_obj['source_name'],
- source_copyright = source_copyright,
- version = version,
- identifier = item_obj['identifier'],
- uuid = item_obj.get('uuid'),
- long_name = item_obj['long_name'],
- required_mappings = required_mappings,
- generated_by = generated_by
+ repo = repo,
+ repo_iteration = repo_iteration,
+ source_name = item_obj['source_name'],
+ source_copyright = source_copyright,
+ version = version,
+ identifier = item_obj['identifier'],
+ uuid = item_obj.get('uuid'),
+ long_name = item_obj['long_name'],
+ allows_eval = eval_perm,
+ allows_cors_bypass = cors_bypass_perm,
+ required_mappings = required_mappings,
+ generated_by = generated_by
)
- # class property
- type_name = '!INVALID!'
-
-InstanceOrPath = t.Union[Path, str, dict[str, t.Any]]
@dc.dataclass(frozen=True, unsafe_hash=True)
class ResourceInfo(ItemInfoBase):
"""...."""
- revision: int = dc.field(hash=False)
- dependencies: tuple[ItemRef, ...] = dc.field(hash=False)
- scripts: tuple[FileRef, ...] = dc.field(hash=False)
+ type_name: t.ClassVar[str] = 'resource'
+
+ revision: int = dc.field(hash=False)
+ dependencies: tuple[ItemSpecifier, ...] = dc.field(hash=False)
+ scripts: tuple[FileSpecifier, ...] = dc.field(hash=False)
@property
def versioned_identifier(self):
@@ -173,41 +248,70 @@ class ResourceInfo(ItemInfoBase):
@staticmethod
def make(
- item_obj: t.Mapping[str, t.Any],
- schema_compat: int,
- repository: str
+ item_obj: t.Mapping[str, t.Any],
+ schema_compat: int,
+ repo: str,
+ repo_iteration: int
) -> 'ResourceInfo':
"""...."""
base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
item_obj,
schema_compat,
- repository
+ repo,
+ repo_iteration
+ )
+
+ dependencies = make_item_specifiers_seq(
+ item_obj.get('dependencies', [])
+ )
+
+ scripts = make_file_specifiers_seq(
+ item_obj.get('scripts', [])
)
return ResourceInfo(
**base_init_kwargs,
revision = item_obj['revision'],
- dependencies = make_item_refs_seq(item_obj.get('dependencies', [])),
- scripts = make_file_refs_seq(item_obj.get('scripts', [])),
+ dependencies = dependencies,
+ scripts = scripts
)
@staticmethod
- def load(instance_or_path: 'InstanceOrPath', repository: str) \
- -> 'ResourceInfo':
+ def load(
+ instance_or_path: json_instances.InstanceOrPathOrIO,
+ repo: str = '<dummyrepo>',
+ repo_iteration: int = -1
+ ) -> 'ResourceInfo':
"""...."""
- return _load_item_info(ResourceInfo, instance_or_path, repository)
+ return _load_item_info(
+ ResourceInfo,
+ instance_or_path,
+ repo,
+ repo_iteration
+ )
- # class property
- type_name = 'resource'
+ # def __lt__(self, other: 'ResourceInfo') -> bool:
+ # """...."""
+ # return (
+ # self.identifier,
+ # self.version,
+ # self.revision,
+ # self.repository
+ # ) < (
+ # other.identifier,
+ # other.version,
+ # other.revision,
+ # other.repository
+ # )
def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \
- -> t.Mapping[ParsedUrl, ItemRef]:
+ -> t.Mapping[ParsedPattern, ItemSpecifier]:
"""...."""
- mapping: list[tuple[ParsedUrl, ItemRef]] = []
+ mapping: list[tuple[ParsedPattern, ItemSpecifier]] = []
- for pattern, ref_obj in payloads_obj.items():
- ref = ItemRef(ref_obj['identifier'])
+ for pattern, spec_obj in payloads_obj.items():
+ ref = ItemSpecifier(spec_obj['identifier'])
mapping.extend((parsed, ref) for parsed in parse_pattern(pattern))
return Map(mapping)
@@ -215,19 +319,23 @@ def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \
@dc.dataclass(frozen=True, unsafe_hash=True)
class MappingInfo(ItemInfoBase):
"""...."""
- payloads: t.Mapping[ParsedUrl, ItemRef] = dc.field(hash=False)
+ type_name: t.ClassVar[str] = 'mapping'
+
+ payloads: t.Mapping[ParsedPattern, ItemSpecifier] = dc.field(hash=False)
@staticmethod
def make(
- item_obj: t.Mapping[str, t.Any],
- schema_compat: int,
- repository: str
+ item_obj: t.Mapping[str, t.Any],
+ schema_compat: int,
+ repo: str,
+ repo_iteration: int
) -> 'MappingInfo':
"""...."""
base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
item_obj,
schema_compat,
- repository
+ repo,
+ repo_iteration
)
return MappingInfo(
@@ -237,10 +345,23 @@ class MappingInfo(ItemInfoBase):
)
@staticmethod
- def load(instance_or_path: 'InstanceOrPath', repository: str) \
- -> 'MappingInfo':
+ def load(
+ instance_or_path: json_instances.InstanceOrPathOrIO,
+ repo: str = '<dummyrepo>',
+ repo_iteration: int = -1
+ ) -> 'MappingInfo':
"""...."""
- return _load_item_info(MappingInfo, instance_or_path, repository)
+ return _load_item_info(
+ MappingInfo,
+ instance_or_path,
+ repo,
+ repo_iteration
+ )
+
+ # def __lt__(self, other: 'MappingInfo') -> bool:
+ # """...."""
+ # return (self.identifier, self.version, self.repository) < \
+ # (other.identifier, other.version, other.repository)
# class property
type_name = 'mapping'
@@ -250,8 +371,9 @@ LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo)
def _load_item_info(
info_type: t.Type[LoadedType],
- instance_or_path: InstanceOrPath,
- repository: str
+ instance_or_path: json_instances.InstanceOrPathOrIO,
+ repo: str,
+ repo_iteration: int
) -> LoadedType:
"""Read, validate and autocomplete a mapping/resource description."""
instance = json_instances.read_instance(instance_or_path)
@@ -264,81 +386,156 @@ def _load_item_info(
return info_type.make(
t.cast('dict[str, t.Any]', instance),
schema_compat,
- repository
+ repo,
+ repo_iteration
)
-VersionedType = t.TypeVar('VersionedType', ResourceInfo, MappingInfo)
-
-@dc.dataclass(frozen=True)
-class VersionedItemInfo(t.Generic[VersionedType]):
- """Stores data of multiple versions of given resource/mapping."""
- uuid: t.Optional[str] = None
- identifier: str = '<dummy>'
- _by_version: Map[VerTuple, VersionedType] = Map()
- _initialized: bool = False
-
- def register(self, item_info: VersionedType) -> 'VersionedInfoSelfType':
- """
- Make item info queryable by version. Perform sanity checks for uuid.
- """
- identifier = item_info.identifier
- if self._initialized:
- assert identifier == self.identifier
-
- if self.uuid is not None:
- uuid: t.Optional[str] = self.uuid
- if item_info.uuid is not None and self.uuid != item_info.uuid:
- raise HaketiloException(_('uuid_mismatch_{identifier}')
- .format(identifier=identifier))
- else:
- uuid = item_info.uuid
-
- by_version = self._by_version.set(item_info.version, item_info)
-
- return VersionedItemInfo(
- identifier = identifier,
- uuid = uuid,
- _by_version = by_version,
- _initialized = True
- )
-
- def unregister(self, version: VerTuple) -> 'VersionedInfoSelfType':
- """...."""
- try:
- by_version = self._by_version.delete(version)
- except KeyError:
- by_version = self._by_version
-
- return dc.replace(self, _by_version=by_version)
-
- def is_empty(self) -> bool:
- """...."""
- return len(self._by_version) == 0
-
- def newest_version(self) -> VerTuple:
- """...."""
- assert not self.is_empty()
-
- return max(self._by_version.keys())
-
- def get_newest(self) -> VersionedType:
- """Find and return info of the newest version of item."""
- newest = self._by_version[self.newest_version()]
- assert newest is not None
- return newest
-
- def get_by_ver(self, ver: t.Iterable[int]) -> t.Optional[VersionedType]:
- """
- Find and return info of the specified version of the item (or None if
- absent).
- """
- return self._by_version.get(tuple(ver))
-
- def get_all(self) -> t.Iterator[VersionedType]:
- """Generate item info for all its versions, from oldest ot newest."""
- for version in sorted(self._by_version.keys()):
- yield self._by_version[version]
-
-# Below we define 1 type used by recursively-typed VersionedItemInfo.
-VersionedInfoSelfType = VersionedItemInfo[VersionedType]
+# CategorizedType = t.TypeVar(
+# 'CategorizedType',
+# bound=Categorizable
+# )
+
+# CategorizedUpdater = t.Callable[
+# [t.Optional[CategorizedType]],
+# t.Optional[CategorizedType]
+# ]
+
+# CategoryKeyType = t.TypeVar('CategoryKeyType', bound=t.Hashable)
+
+# @dc.dataclass(frozen=True)
+# class CategorizedItemInfo(Categorizable, t.Generic[CategorizedType, CategoryKeyType]):
+# """...."""
+# SelfType = t.TypeVar(
+# 'SelfType',
+# bound = 'CategorizedItemInfo[CategorizedType, CategoryKeyType]'
+# )
+
+# uuid: t.Optional[str] = None
+# identifier: str = '<dummy>'
+# items: Map[CategoryKeyType, CategorizedType] = Map()
+# _initialized: bool = False
+
+# def _update(
+# self: 'SelfType',
+# key: CategoryKeyType,
+# updater: CategorizedUpdater
+# ) -> 'SelfType':
+# """...... Perform sanity checks for uuid."""
+# uuid = self.uuid
+
+# items = self.items.mutate()
+
+# updated = updater(items.get(key))
+# if updated is None:
+# items.pop(key, None)
+
+# identifier = self.identifier
+# else:
+# items[key] = updated
+
+# identifier = updated.identifier
+# if self._initialized:
+# assert identifier == self.identifier
+
+# if uuid is not None:
+# if updated.uuid is not None and uuid != updated.uuid:
+# raise HaketiloException(_('uuid_mismatch_{identifier}')
+# .format(identifier=identifier))
+# else:
+# uuid = updated.uuid
+
+# return dc.replace(
+# self,
+# identifier = identifier,
+# uuid = uuid,
+# items = items.finish(),
+# _initialized = self._initialized or updated is not None
+# )
+
+# def is_empty(self) -> bool:
+# """...."""
+# return len(self.items) == 0
+
+
+# VersionedType = t.TypeVar('VersionedType', ResourceInfo, MappingInfo)
+
+# class VersionedItemInfo(
+# CategorizedItemInfo[VersionedType, VerTuple],
+# t.Generic[VersionedType]
+# ):
+# """Stores data of multiple versions of given resource/mapping."""
+# SelfType = t.TypeVar('SelfType', bound='VersionedItemInfo[VersionedType]')
+
+# def register(self: 'SelfType', item_info: VersionedType) -> 'SelfType':
+# """
+# Make item info queryable by version. Perform sanity checks for uuid.
+# """
+# return self._update(item_info.version, lambda old_info: item_info)
+
+# def unregister(self: 'SelfType', version: VerTuple) -> 'SelfType':
+# """...."""
+# return self._update(version, lambda old_info: None)
+
+# def newest_version(self) -> VerTuple:
+# """...."""
+# assert not self.is_empty()
+
+# return max(self.items.keys())
+
+# def get_newest(self) -> VersionedType:
+# """Find and return info of the newest version of item."""
+# newest = self.items[self.newest_version()]
+# assert newest is not None
+# return newest
+
+# def get_by_ver(self, ver: t.Iterable[int]) -> t.Optional[VersionedType]:
+# """
+# Find and return info of the specified version of the item (or None if
+# absent).
+# """
+# return self.items.get(tuple(ver))
+
+# def get_all(self) -> t.Iterator[VersionedType]:
+# """Generate item info for all its versions, from oldest to newest."""
+# for version in sorted(self.items.keys()):
+# yield self.items[version]
+
+
+# MultiRepoType = t.TypeVar('MultiRepoType', ResourceInfo, MappingInfo)
+# MultiRepoVersioned = VersionedItemInfo[MultiRepoType]
+
+# class MultiRepoItemInfo(
+# CategorizedItemInfo[MultiRepoVersioned, str],
+# t.Generic[MultiRepoType]
+# ):
+# SelfType = t.TypeVar('SelfType', bound='MultiRepoItemInfo[MultiRepoType]')
+
+# def register(self: 'SelfType', item_info: MultiRepoType) -> 'SelfType':
+# """
+# Make item info queryable by version. Perform sanity checks for uuid.
+# """
+# def updater(old_item: t.Optional[MultiRepoVersioned]) \
+# -> MultiRepoVersioned:
+# """...."""
+# if old_item is None:
+# old_item = VersionedItemInfo()
+
+# return old_item.register(item_info)
+
+# return self._update(item_info.repository, updater)
+
+# def unregister(self: 'SelfType', version: VerTuple, repository: str) \
+# -> 'SelfType':
+# """...."""
+# def updater(old_item: t.Optional[MultiRepoVersioned]) -> \
+# t.Optional[MultiRepoVersioned]:
+# """...."""
+# if old_item is None:
+# return None
+
+# new_item = old_item.unregister(version)
+
+# return None if new_item.is_empty() else new_item
+
+# return self._update(repository, updater)