# SPDX-License-Identifier: GPL-3.0-or-later
# Reading resources, mappings and other JSON documents from the filesystem.
#
# This file is part of Hydrilla&Haketilo
#
# Copyright (C) 2021, 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this
# code in a proprietary program, I am not going to enforce this in
# court.
"""
.....
"""
import sys
if sys.version_info >= (3, 8):
from typing import Protocol
else:
from typing_extensions import Protocol
import enum
import typing as t
import dataclasses as dc
from pathlib import Path, PurePosixPath
from abc import ABC, abstractmethod
from immutables import Map
from . import versions, json_instances
from .url_patterns import parse_pattern, ParsedUrl, ParsedPattern
from .exceptions import HaketiloException
from .translations import smart_gettext as _
@dc.dataclass(frozen=True, unsafe_hash=True)
class ItemSpecifier:
"""...."""
identifier: str
ItemSpecs = t.Tuple[ItemSpecifier, ...]
SpecifierObjs = t.Sequence[t.Mapping[str, t.Any]]
def make_item_specifiers_seq(spec_objs: SpecifierObjs) -> ItemSpecs:
return tuple(ItemSpecifier(obj['identifier']) for obj in spec_objs)
def make_required_mappings(spec_objs: t.Any, schema_compat: int) -> ItemSpecs:
if schema_compat < 2:
return ()
return make_item_specifiers_seq(spec_objs)
@dc.dataclass(frozen=True, unsafe_hash=True)
class FileSpecifier:
"""...."""
name: str
sha256: str
FileSpecs = t.Tuple[FileSpecifier, ...]
def normalize_filename(name: str) -> str:
    """
    Eliminate double slashes in the file name and ensure it does not try to
    reference parent directories.
    """
path = PurePosixPath(name)
if '.' in path.parts or '..' in path.parts:
msg = _('err.item_info.filename_invalid_{}').format(name)
raise HaketiloException(msg)
return str(path)
def make_file_specifiers_seq(spec_objs: SpecifierObjs) -> FileSpecs:
return tuple(
FileSpecifier(normalize_filename(obj['file']), obj['sha256'])
for obj
in spec_objs
)
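# Example (illustrative only; the file names and hashes below are made up):
# how 'source_copyright'/'scripts' entries from a JSON document become
# FileSpecifier tuples.  A name like '../evil.js' would make
# normalize_filename() raise HaketiloException instead of being accepted.
def _example_file_specifiers() -> FileSpecs:
    spec_objs = [
        {'file': 'js/main.js', 'sha256': '0' * 64},
        {'file': 'COPYING',    'sha256': '1' * 64}
    ]
    return make_file_specifiers_seq(spec_objs)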
@dc.dataclass(frozen=True, unsafe_hash=True)
class GeneratedBy:
"""...."""
name: str
version: t.Optional[str]
@staticmethod
def make(generated_by_obj: t.Optional[t.Mapping[str, t.Any]]) -> \
t.Optional['GeneratedBy']:
"""...."""
if generated_by_obj is None:
return None
return GeneratedBy(
name = generated_by_obj['name'],
version = generated_by_obj.get('version')
)
def make_eval_permission(perms_obj: t.Any, schema_compat: int) -> bool:
if schema_compat < 2:
return False
return perms_obj.get('eval', False)
def make_cors_bypass_permission(perms_obj: t.Any, schema_compat: int) -> bool:
if schema_compat < 2:
return False
return perms_obj.get('cors_bypass', False)
def make_version_constraint(
ver: t.Any,
schema_compat: int,
default: versions.VerTuple
) -> versions.VerTuple:
if schema_compat < 2 or ver is None:
return default
return versions.normalize(ver)
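# Example (illustrative only; the permissions object below is made up): for
# documents below schema compatibility level 2 the helpers above ignore the
# corresponding JSON fields and fall back to safe defaults.
def _example_schema_compat_defaults() -> None:
    perms_obj = {'eval': True, 'cors_bypass': True}
    assert not make_eval_permission(perms_obj, schema_compat=1)
    assert make_eval_permission(perms_obj, schema_compat=2)
    assert not make_cors_bypass_permission(perms_obj, schema_compat=1)
    # With no explicit constraint the supplied default bound is returned.
    default_ver = versions.int_ver_min
    assert make_version_constraint(None, 2, default_ver) == default_ver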
class Categorizable(Protocol):
"""...."""
uuid: t.Optional[str]
identifier: str
@dc.dataclass(frozen=True, unsafe_hash=True)
class ItemIdentity:
repo: str
repo_iteration: int
version: versions.VerTuple
identifier: str
# mypy needs to be corrected:
# https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704
@dc.dataclass(frozen=True) # type: ignore[misc]
class ItemInfoBase(ABC, ItemIdentity, Categorizable):
"""...."""
source_name: str = dc.field(hash=False, compare=False)
source_copyright: FileSpecs = dc.field(hash=False, compare=False)
uuid: t.Optional[str] = dc.field(hash=False, compare=False)
long_name: str = dc.field(hash=False, compare=False)
description: str = dc.field(hash=False, compare=False)
allows_eval: bool = dc.field(hash=False, compare=False)
allows_cors_bypass: bool = dc.field(hash=False, compare=False)
min_haketilo_ver: versions.VerTuple = dc.field(hash=False, compare=False)
max_haketilo_ver: versions.VerTuple = dc.field(hash=False, compare=False)
required_mappings: ItemSpecs = dc.field(hash=False, compare=False)
generated_by: t.Optional[GeneratedBy] = dc.field(hash=False, compare=False)
@property
def version_string(self) -> str:
return versions.version_string(self.version)
@property
def versioned_identifier(self) -> str:
"""...."""
return f'{self.identifier}-{self.version_string}'
@property
def files(self) -> FileSpecs:
return self.source_copyright
@property
def compatible(self) -> bool:
return (self.min_haketilo_ver <= versions.haketilo_version and
self.max_haketilo_ver >= versions.haketilo_version)
@staticmethod
def _get_base_init_kwargs(
item_obj: t.Mapping[str, t.Any],
schema_compat: int,
repo: str,
repo_iteration: int
) -> t.Mapping[str, t.Any]:
"""...."""
source_copyright = make_file_specifiers_seq(
item_obj['source_copyright']
)
version = versions.normalize(item_obj['version'])
perms_obj = item_obj.get('permissions', {})
eval_perm = make_eval_permission(perms_obj, schema_compat)
cors_bypass_perm = make_cors_bypass_permission(perms_obj, schema_compat)
min_haketilo_ver = make_version_constraint(
ver = item_obj.get('min_haketilo_version'),
schema_compat = schema_compat,
default = versions.int_ver_min
)
max_haketilo_ver = make_version_constraint(
ver = item_obj.get('max_haketilo_version'),
schema_compat = schema_compat,
default = versions.int_ver_max
)
required_mappings = make_required_mappings(
item_obj.get('required_mappings', []),
schema_compat
)
generated_by = GeneratedBy.make(item_obj.get('generated_by'))
return Map(
repo = repo,
repo_iteration = repo_iteration,
source_name = item_obj['source_name'],
source_copyright = source_copyright,
version = version,
identifier = item_obj['identifier'],
uuid = item_obj.get('uuid'),
long_name = item_obj['long_name'],
description = item_obj['description'],
allows_eval = eval_perm,
allows_cors_bypass = cors_bypass_perm,
min_haketilo_ver = min_haketilo_ver,
max_haketilo_ver = max_haketilo_ver,
required_mappings = required_mappings,
generated_by = generated_by
)
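# Example (illustrative only; all field values are made up): the minimal shape
# of a deserialized item JSON document accepted by _get_base_init_kwargs().
# Optional keys like 'uuid', 'permissions', 'min_haketilo_version',
# 'required_mappings' and 'generated_by' may be absent.
def _example_base_init_kwargs() -> t.Mapping[str, t.Any]:
    item_obj = {
        'source_name':      'example-source',
        'source_copyright': [{'file': 'COPYING', 'sha256': '0' * 64}],
        'version':          [2022, 4, 1],
        'identifier':       'example-item',
        'long_name':        'Example Item',
        'description':      'An illustrative item.'
    }
    return ItemInfoBase._get_base_init_kwargs(
        item_obj,
        schema_compat  = 2,
        repo           = 'example-repo',
        repo_iteration = 1
    )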
AnyInfo = t.Union['ResourceInfo', 'MappingInfo']
class ItemType(enum.Enum):
RESOURCE = 'resource'
MAPPING = 'mapping'
@property
def info_class(self) -> t.Type[AnyInfo]:
if self == ItemType.RESOURCE:
return ResourceInfo
else:
return MappingInfo
@property
def alt_name(self) -> str:
if self == ItemType.RESOURCE:
return 'library'
else:
return 'package'
@property
def alt_name_plural(self) -> str:
if self == ItemType.RESOURCE:
return 'libraries'
else:
return 'packages'
@dc.dataclass(frozen=True, unsafe_hash=True)
class CorrespondsToResourceDCMixin:
type: t.ClassVar[ItemType] = ItemType.RESOURCE
@dc.dataclass(frozen=True, unsafe_hash=True)
class CorrespondsToMappingDCMixin:
type: t.ClassVar[ItemType] = ItemType.MAPPING
@dc.dataclass(frozen=True, unsafe_hash=True)
class ResourceInfo(ItemInfoBase, CorrespondsToResourceDCMixin):
"""...."""
revision: int = dc.field(hash=False, compare=False)
dependencies: ItemSpecs = dc.field(hash=False, compare=False)
scripts: FileSpecs = dc.field(hash=False, compare=False)
@property
def version_string(self) -> str:
return f'{super().version_string}-{self.revision}'
@property
def files(self) -> FileSpecs:
return tuple((*self.source_copyright, *self.scripts))
@staticmethod
def make(
item_obj: t.Mapping[str, t.Any],
schema_compat: int,
repo: str,
repo_iteration: int
) -> 'ResourceInfo':
"""...."""
base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
item_obj,
schema_compat,
repo,
repo_iteration
)
dependencies = make_item_specifiers_seq(
item_obj.get('dependencies', [])
)
scripts = make_file_specifiers_seq(
item_obj.get('scripts', [])
)
return ResourceInfo(
**base_init_kwargs,
revision = item_obj['revision'],
dependencies = dependencies,
scripts = scripts
)
@staticmethod
def load(
instance_source: json_instances.InstanceSource,
repo: str = '',
repo_iteration: int = -1
) -> 'ResourceInfo':
"""...."""
return _load_item_info(
ResourceInfo,
instance_source,
repo,
repo_iteration
)
def __lt__(self, other: 'ResourceInfo') -> bool:
"""...."""
return (
self.identifier,
other.version,
other.revision,
self.repo,
other.repo_iteration
) < (
other.identifier,
self.version,
self.revision,
other.repo,
self.repo_iteration
)
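# Example (illustrative only; all field values are made up): constructing a
# ResourceInfo from an already-deserialized JSON document.  In normal use
# ResourceInfo.load() is given an instance source and performs schema
# validation before calling make().
def _example_resource_info() -> ResourceInfo:
    resource_obj = {
        'source_name':      'example-source',
        'source_copyright': [{'file': 'COPYING', 'sha256': '0' * 64}],
        'version':          [2022, 4, 1],
        'identifier':       'example-resource',
        'long_name':        'Example Resource',
        'description':      'An illustrative resource.',
        'revision':         1,
        'dependencies':     [],
        'scripts':          [{'file': 'main.js', 'sha256': '1' * 64}]
    }
    # The result's versioned_identifier combines identifier, version string
    # and revision (e.g. 'example-resource-<version>-1' here).
    return ResourceInfo.make(
        resource_obj,
        schema_compat  = 2,
        repo           = 'example-repo',
        repo_iteration = 1
    )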
def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \
-> t.Mapping[ParsedPattern, ItemSpecifier]:
"""...."""
mapping: t.List[t.Tuple[ParsedPattern, ItemSpecifier]] = []
for pattern, spec_obj in payloads_obj.items():
ref = ItemSpecifier(spec_obj['identifier'])
mapping.extend((parsed, ref) for parsed in parse_pattern(pattern))
return Map(mapping)
@dc.dataclass(frozen=True, unsafe_hash=True)
class MappingInfo(ItemInfoBase, CorrespondsToMappingDCMixin):
"""...."""
payloads: t.Mapping[ParsedPattern, ItemSpecifier] = \
dc.field(hash=False, compare=False)
@staticmethod
def make(
item_obj: t.Mapping[str, t.Any],
schema_compat: int,
repo: str,
repo_iteration: int
) -> 'MappingInfo':
"""...."""
base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
item_obj,
schema_compat,
repo,
repo_iteration
)
return MappingInfo(
**base_init_kwargs,
payloads = make_payloads(item_obj.get('payloads', {}))
)
@staticmethod
def load(
instance_source: json_instances.InstanceSource,
repo: str = '',
repo_iteration: int = -1
) -> 'MappingInfo':
"""...."""
return _load_item_info(
MappingInfo,
instance_source,
repo,
repo_iteration
)
def __lt__(self, other: 'MappingInfo') -> bool:
"""...."""
return (
self.identifier,
other.version,
self.repo,
other.repo_iteration
) < (
other.identifier,
self.version,
other.repo,
self.repo_iteration
)
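# Example (illustrative only; the values are made up and the URL pattern is
# assumed to be one parse_pattern() accepts): constructing a MappingInfo whose
# 'payloads' object maps URL patterns to resource identifiers.
def _example_mapping_info() -> MappingInfo:
    mapping_obj = {
        'source_name':      'example-source',
        'source_copyright': [{'file': 'COPYING', 'sha256': '0' * 64}],
        'version':          [2022, 4, 1],
        'identifier':       'example-mapping',
        'long_name':        'Example Mapping',
        'description':      'An illustrative mapping.',
        'payloads': {
            'https://example.com/***': {'identifier': 'example-resource'}
        }
    }
    return MappingInfo.make(
        mapping_obj,
        schema_compat  = 2,
        repo           = 'example-repo',
        repo_iteration = 1
    )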
LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo)
def _load_item_info(
info_type: t.Type[LoadedType],
instance_source: json_instances.InstanceSource,
repo: str,
repo_iteration: int
) -> LoadedType:
"""Read, validate and autocomplete a mapping/resource description."""
instance = json_instances.read_instance(instance_source)
schema_fmt = f'api_{info_type.type.value}_description-{{}}.schema.json'
schema_compat = json_instances.validate_instance(instance, schema_fmt)
# We know from successful validation that instance is a dict.
return info_type.make(
t.cast('t.Dict[str, t.Any]', instance),
schema_compat,
repo,
repo_iteration
)
CategorizedInfoType = t.TypeVar(
'CategorizedInfoType',
ResourceInfo,
MappingInfo
)
CategorizedType = t.TypeVar(
'CategorizedType',
bound=Categorizable
)
CategorizedUpdater = t.Callable[
[t.Optional[CategorizedType]],
t.Optional[CategorizedType]
]
CategoryKeyType = t.TypeVar('CategoryKeyType', bound=t.Hashable)
@dc.dataclass(frozen=True) # type: ignore[misc]
class CategorizedItemInfo(
ABC,
Categorizable,
t.Generic[CategorizedInfoType, CategorizedType, CategoryKeyType]
):
"""...."""
SelfType = t.TypeVar(
'SelfType',
bound = 'CategorizedItemInfo[CategorizedInfoType, CategorizedType, CategoryKeyType]'
)
uuid: t.Optional[str] = None
identifier: str = ''
items: Map[CategoryKeyType, CategorizedType] = Map()
_initialized: bool = False
def _update(
self: 'SelfType',
key: CategoryKeyType,
updater: CategorizedUpdater
) -> 'SelfType':
"""...... Perform sanity checks for uuid."""
uuid = self.uuid
items = self.items.mutate()
updated = updater(items.get(key))
if updated is None:
items.pop(key, None)
identifier = self.identifier
else:
items[key] = updated
identifier = updated.identifier
if self._initialized:
assert identifier == self.identifier
if uuid is not None:
if updated.uuid is not None and uuid != updated.uuid:
raise HaketiloException(_('uuid_mismatch_{identifier}')
.format(identifier=identifier))
else:
uuid = updated.uuid
return dc.replace(
self,
identifier = identifier,
uuid = uuid,
items = items.finish(),
_initialized = self._initialized or updated is not None
)
@abstractmethod
def register(self: 'SelfType', info: CategorizedInfoType) -> 'SelfType':
...
@abstractmethod
def get_all(self: 'SelfType') -> t.Sequence[CategorizedInfoType]:
...
def is_empty(self) -> bool:
return len(self.items) == 0
class VersionedItemInfo(
CategorizedItemInfo[
CategorizedInfoType,
CategorizedInfoType,
versions.VerTuple
],
t.Generic[CategorizedInfoType]
):
"""Stores data of multiple versions of given resource/mapping."""
SelfType = t.TypeVar(
'SelfType',
bound = 'VersionedItemInfo[CategorizedInfoType]'
)
def register(self: 'SelfType', item_info: CategorizedInfoType) \
-> 'SelfType':
"""
Make item info queryable by version. Perform sanity checks for uuid.
"""
return self._update(item_info.version, lambda old_info: item_info)
@property
def newest_version(self) -> versions.VerTuple:
"""...."""
assert not self.is_empty()
return self.versions()[-1]
@property
def newest_info(self) -> CategorizedInfoType:
"""Find and return info of the newest version of item."""
return self.items[self.newest_version]
def versions(self, reverse: bool = False) -> t.Sequence[versions.VerTuple]:
return sorted(self.items.keys(), reverse=reverse)
def get_by_ver(self, ver: t.Sequence[int]) \
-> t.Optional[CategorizedInfoType]:
"""
Find and return info of the specified version of the item (or None if
absent).
"""
return self.items.get(versions.normalize(ver))
def get_all(self, reverse_versions: bool = False) \
-> t.Sequence[CategorizedInfoType]:
"""
Generate item info for all its versions, from oldest to newest unless
the opposite is requested.
"""
        item_versions = self.versions(reverse=reverse_versions)
        return [self.items[ver] for ver in item_versions]
VersionedResourceInfo = VersionedItemInfo[ResourceInfo]
VersionedMappingInfo = VersionedItemInfo[MappingInfo]
VersionedItemInfoMap = Map[str, VersionedItemInfo]
VersionedResourceInfoMap = Map[str, VersionedResourceInfo]
VersionedMappingInfoMap = Map[str, VersionedMappingInfo]
def register_in_versioned_map(
map: Map[str, VersionedItemInfo[CategorizedInfoType]],
info: CategorizedInfoType
) -> Map[str, VersionedItemInfo[CategorizedInfoType]]:
versioned_info = map.get(info.identifier, VersionedItemInfo())
return map.set(info.identifier, versioned_info.register(info))
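# Example (illustrative only): registering an already-constructed ResourceInfo
# in a versioned map and looking it up again.  All versions of an item share a
# single VersionedItemInfo entry keyed by the item's identifier.
def _example_versioned_lookup(info: ResourceInfo) -> ResourceInfo:
    info_map: VersionedResourceInfoMap = Map()
    info_map = register_in_versioned_map(info_map, info)
    return info_map[info.identifier].newest_info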
class MultirepoItemInfo(
CategorizedItemInfo[
CategorizedInfoType,
VersionedItemInfo[CategorizedInfoType],
t.Tuple[str, int]
],
t.Generic[CategorizedInfoType]
):
"""
Stores data of multiple versions of given resource/mapping that may come
from multiple repositories.
"""
SelfType = t.TypeVar(
'SelfType',
bound = 'MultirepoItemInfo[CategorizedInfoType]'
)
def register(self: 'SelfType', item_info: CategorizedInfoType) \
-> 'SelfType':
"""
Make item info queryable by repo and version. Perform sanity checks for
uuid.
"""
def update(
versioned: t.Optional[VersionedItemInfo[CategorizedInfoType]]
) -> VersionedItemInfo[CategorizedInfoType]:
if versioned is None:
versioned = VersionedItemInfo()
return versioned.register(item_info)
return self._update((item_info.repo, item_info.repo_iteration), update)
@property
def default_info(self) -> CategorizedInfoType:
"""
Find and return info of one of the available options for the newest
version of item.
"""
assert not self.is_empty()
return self.get_all(reverse_repos=True)[-1]
def options(self, reverse: bool = False) -> t.Sequence[t.Tuple[str, int]]:
return sorted(
self.items.keys(),
            key = (lambda opt: (opt[0], 1 - opt[1])),
reverse = reverse
)
def get_all(
self,
reverse_versions: bool = False,
reverse_repos: bool = False
) -> t.Sequence[CategorizedInfoType]:
"""
Generate item info for all its versions and options, from oldest to
newest version and from.
"""
all_versions: t.Set[versions.VerTuple] = set()
for versioned in self.items.values():
all_versions.update(versioned.versions())
result = []
for version in sorted(all_versions, reverse=reverse_versions):
for option in self.options(reverse=reverse_repos):
info = self.items[option].get_by_ver(version)
if info is not None:
result.append(info)
return result
MultirepoResourceInfo = MultirepoItemInfo[ResourceInfo]
MultirepoMappingInfo = MultirepoItemInfo[MappingInfo]
MultirepoItemInfoMap = Map[str, MultirepoItemInfo]
MultirepoResourceInfoMap = Map[str, MultirepoResourceInfo]
MultirepoMappingInfoMap = Map[str, MultirepoMappingInfo]
def register_in_multirepo_map(
map: Map[str, MultirepoItemInfo[CategorizedInfoType]],
info: CategorizedInfoType
) -> Map[str, MultirepoItemInfo[CategorizedInfoType]]:
multirepo_info = map.get(info.identifier, MultirepoItemInfo())
return map.set(info.identifier, multirepo_info.register(info))
def all_map_infos(
map: Map[str, CategorizedItemInfo[CategorizedInfoType, t.Any, t.Any]]
) -> t.Iterator[CategorizedInfoType]:
for versioned_info in map.values():
for item_info in versioned_info.get_all():
yield item_info
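# Example (illustrative only): grouping mapping infos, possibly from multiple
# repositories, by identifier and then flattening the map back into individual
# infos with all_map_infos().
def _example_multirepo_iteration(infos: t.Iterable[MappingInfo]) \
    -> t.List[MappingInfo]:
    info_map: MultirepoMappingInfoMap = Map()
    for info in infos:
        info_map = register_in_multirepo_map(info_map, info)
    return list(all_map_infos(info_map))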