# SPDX-License-Identifier: GPL-3.0-or-later
# Reading resources, mappings and other JSON documents from the filesystem.
#
# This file is part of Hydrilla&Haketilo
#
# Copyright (C) 2021, 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this code
# in a proprietary program, I am not going to enforce this in court.
"""
.....
"""
# Enable using with Python 3.7.
from __future__ import annotations
import sys
if sys.version_info >= (3, 8):
from typing import Protocol
else:
from typing_extensions import Protocol
import typing as t
import dataclasses as dc
from pathlib import Path, PurePosixPath
from abc import ABC
from immutables import Map
from . import versions, json_instances
from .url_patterns import parse_pattern, ParsedUrl, ParsedPattern
from .exceptions import HaketiloException
from .translations import smart_gettext as _
# Variable-length tuple of integers. NOTE(review): the dataclass fields in this
# file annotate versions with `versions.VerTuple` rather than with this alias.
VerTuple = t.Tuple[int, ...]
@dc.dataclass(frozen=True, unsafe_hash=True)
class ItemSpecifier:
    """Immutable, hashable value that names an item by its identifier."""

    # The only piece of data carried: the identifier string of the item.
    identifier: str
# Sequence of string-keyed mappings, the input type of the
# make_item_specifiers_seq() and make_file_specifiers_seq() helpers.
SpecifierObjs = t.Sequence[t.Mapping[str, t.Any]]
def make_item_specifiers_seq(spec_objs: SpecifierObjs) \
    -> tuple[ItemSpecifier, ...]:
    """
    Turn a sequence of objects into a tuple of ItemSpecifier instances.

    Each object must have an 'identifier' key; input order is preserved.
    """
    specifiers = [
        ItemSpecifier(spec_obj['identifier'])
        for spec_obj in spec_objs
    ]

    return tuple(specifiers)
def make_required_mappings(spec_objs: t.Any, schema_compat: int) \
    -> tuple[ItemSpecifier, ...]:
    """
    Build the tuple of required mapping specifiers.

    For schema compatibility levels below 2 the result is always an empty
    tuple and `spec_objs` is not looked at; otherwise `spec_objs` is converted
    with make_item_specifiers_seq().
    """
    return () if schema_compat < 2 else make_item_specifiers_seq(spec_objs)
@dc.dataclass(frozen=True, unsafe_hash=True)
class FileSpecifier:
    """Immutable, hashable pair of a file name and its sha256 value."""

    # File name; make_file_specifiers_seq() stores it in normalized form.
    name: str
    # The 'sha256' value taken from the file's specifier object.
    sha256: str
def normalize_filename(name: str):
    """
    Normalize a POSIX-style file name.

    Redundant slashes are collapsed by passing the name through
    PurePosixPath. A name that still contains a '.' or '..' path component
    afterwards is rejected with HaketiloException. The normalized name is
    returned as a string.
    """
    posix_path = PurePosixPath(name)

    forbidden_parts = {'.', '..'}
    if forbidden_parts.intersection(posix_path.parts):
        raise HaketiloException(
            _('err.item_info.filename_invalid_{}').format(name)
        )

    # NOTE(review): absolute names (with a leading '/') are not rejected here;
    # confirm that callers or schema validation rule them out.
    return str(posix_path)
def make_file_specifiers_seq(spec_objs: SpecifierObjs) \
    -> tuple[FileSpecifier, ...]:
    """
    Turn a sequence of objects into a tuple of FileSpecifier instances.

    Each object must have 'file' and 'sha256' keys. The file name goes through
    normalize_filename(), so an invalid name raises HaketiloException. Input
    order is preserved.
    """
    specifiers = []

    for spec_obj in spec_objs:
        normalized_name = normalize_filename(spec_obj['file'])
        specifiers.append(FileSpecifier(normalized_name, spec_obj['sha256']))

    return tuple(specifiers)
@dc.dataclass(frozen=True, unsafe_hash=True)
class GeneratedBy:
    """Immutable record of a 'generated_by' entry: a name, optionally a version."""

    name: str
    version: t.Optional[str]

    @staticmethod
    def make(generated_by_obj: t.Optional[t.Mapping[str, t.Any]]) -> \
        t.Optional['GeneratedBy']:
        """
        Build a GeneratedBy from its mapping form.

        Returns None when `generated_by_obj` is None. The 'name' key is
        required; a missing 'version' key results in version being None.
        """
        if generated_by_obj is not None:
            entry_name = generated_by_obj['name']
            entry_version = generated_by_obj.get('version')
            return GeneratedBy(name=entry_name, version=entry_version)

        return None
def make_eval_permission(perms_obj: t.Any, schema_compat: int) -> bool:
    """
    Read the 'eval' permission from a permissions object.

    Schema compatibility levels below 2 always give False. Otherwise the
    value under the 'eval' key is returned, or False when the key is absent.
    """
    return False if schema_compat < 2 else perms_obj.get('eval', False)
def make_cors_bypass_permission(perms_obj: t.Any, schema_compat: int) -> bool:
    """
    Read the 'cors_bypass' permission from a permissions object.

    Schema compatibility levels below 2 always give False. Otherwise the
    value under the 'cors_bypass' key is returned, or False when the key is
    absent.
    """
    return False if schema_compat < 2 else perms_obj.get('cors_bypass', False)
class Categorizable(Protocol):
    """Structural type of objects exposing an optional uuid and an identifier."""

    uuid: t.Optional[str]
    identifier: str
@dc.dataclass(frozen=True, unsafe_hash=True)
class ItemIdentity:
    """
    Immutable, hashable combination of repo, repo iteration, version and
    identifier.
    """

    repo: str
    repo_iteration: int
    version: versions.VerTuple
    identifier: str
# mypy needs to be corrected:
# https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704
@dc.dataclass(frozen=True) # type: ignore[misc]
class ItemInfoBase(ABC, ItemIdentity, Categorizable):
    """
    Fields and helpers shared by ResourceInfo and MappingInfo.

    Every field declared directly in this class is created with hash=False
    and compare=False, so it is left out of the generated comparison and
    hashing.
    """
    # Set by concrete subclasses ('resource' / 'mapping').
    type_name: t.ClassVar[str]

    source_name: str = dc.field(hash=False, compare=False)
    source_copyright: tuple[FileSpecifier, ...] = \
        dc.field(hash=False, compare=False)
    uuid: t.Optional[str] = dc.field(hash=False, compare=False)
    long_name: str = dc.field(hash=False, compare=False)
    allows_eval: bool = dc.field(hash=False, compare=False)
    allows_cors_bypass: bool = dc.field(hash=False, compare=False)
    required_mappings: tuple[ItemSpecifier, ...] = \
        dc.field(hash=False, compare=False)
    generated_by: t.Optional[GeneratedBy] = dc.field(hash=False, compare=False)

    # def path_relative_to_type(self) -> str:
    #     """
    #     Get a relative path to this item's JSON definition with respect to
    #     directory containing items of this type.
    #     """
    #     return f'{self.identifier}/{versions.version_string(self.version)}'

    # def path(self) -> str:
    #     """
    #     Get a relative path to this item's JSON definition with respect to
    #     malcontent directory containing loadable items.
    #     """
    #     return f'{self.type_name}/{self.path_relative_to_type()}'

    # @property
    # def identity(self):
    #     """...."""
    #     return ItemIdentity(
    #         repository = self.repository,
    #         version    = self.version,
    #         identifier = self.identifier
    #     )

    @property
    def versioned_identifier(self):
        """Return the identifier joined with the version string by a '-'."""
        return '{}-{}'.format(
            self.identifier,
            versions.version_string(self.version)
        )

    @staticmethod
    def _get_base_init_kwargs(
            item_obj: t.Mapping[str, t.Any],
            schema_compat: int,
            repo: str,
            repo_iteration: int
    ) -> t.Mapping[str, t.Any]:
        """
        Compute the constructor keyword arguments common to all item types.

        `item_obj` is the item's mapping form. The 'source_copyright',
        'version', 'source_name', 'identifier' and 'long_name' keys are
        required; 'permissions', 'required_mappings', 'generated_by' and
        'uuid' are optional. `schema_compat` decides whether permissions and
        required mappings are honored (see the make_*() helpers). The result
        is an immutable Map suitable for `**` unpacking.
        """
        copyright_files = make_file_specifiers_seq(item_obj['source_copyright'])

        normalized_version = versions.normalize_version(item_obj['version'])

        permissions = item_obj.get('permissions', {})
        allows_eval = make_eval_permission(permissions, schema_compat)
        allows_cors_bypass = \
            make_cors_bypass_permission(permissions, schema_compat)

        mapping_specifiers = make_required_mappings(
            item_obj.get('required_mappings', []),
            schema_compat
        )

        generator_info = GeneratedBy.make(item_obj.get('generated_by'))

        return Map(
            repo=repo,
            repo_iteration=repo_iteration,
            source_name=item_obj['source_name'],
            source_copyright=copyright_files,
            version=normalized_version,
            identifier=item_obj['identifier'],
            uuid=item_obj.get('uuid'),
            long_name=item_obj['long_name'],
            allows_eval=allows_eval,
            allows_cors_bypass=allows_cors_bypass,
            required_mappings=mapping_specifiers,
            generated_by=generator_info
        )
@dc.dataclass(frozen=True, unsafe_hash=True)
class ResourceInfo(ItemInfoBase):
    """
    Description of a resource item.

    Adds a revision number, the dependencies and the script files to the
    fields shared through ItemInfoBase. The fields added here are created with
    hash=False and compare=False.
    """
    type_name: t.ClassVar[str] = 'resource'

    revision:     int                       = dc.field(hash=False, compare=False)
    dependencies: tuple[ItemSpecifier, ...] = dc.field(hash=False, compare=False)
    scripts:      tuple[FileSpecifier, ...] = dc.field(hash=False, compare=False)

    @property
    def versioned_identifier(self):
        """
        Return the base class' versioned identifier with '-<revision>'
        appended.
        """
        # The base-class attribute is a property, so it is read rather than
        # called; calling its str result would raise TypeError.
        return f'{super().versioned_identifier}-{self.revision}'

    @staticmethod
    def make(
            item_obj:       t.Mapping[str, t.Any],
            schema_compat:  int,
            repo:           str,
            repo_iteration: int
    ) -> 'ResourceInfo':
        """
        Build a ResourceInfo from the resource's mapping form.

        The 'revision' key is required; 'dependencies' and 'scripts' default
        to empty sequences. The remaining fields are computed by
        ItemInfoBase._get_base_init_kwargs().
        """
        base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
            item_obj,
            schema_compat,
            repo,
            repo_iteration
        )

        dependencies = make_item_specifiers_seq(
            item_obj.get('dependencies', [])
        )

        scripts = make_file_specifiers_seq(
            item_obj.get('scripts', [])
        )

        return ResourceInfo(
            **base_init_kwargs,

            revision     = item_obj['revision'],
            dependencies = dependencies,
            scripts      = scripts
        )

    @staticmethod
    def load(
            instance_or_path: json_instances.InstanceOrPathOrIO,
            repo:             str = '',
            repo_iteration:   int = -1
    ) -> 'ResourceInfo':
        """
        Load a ResourceInfo from `instance_or_path` through _load_item_info().

        `repo` and `repo_iteration` are stored in the resulting object and
        default to '' and -1.
        """
        return _load_item_info(
            ResourceInfo,
            instance_or_path,
            repo,
            repo_iteration
        )

    # def __lt__(self, other: 'ResourceInfo') -> bool:
    #     """...."""
    #     return (
    #         self.identifier,
    #         self.version,
    #         self.revision,
    #         self.repository
    #     ) < (
    #         other.identifier,
    #         other.version,
    #         other.revision,
    #         other.repository
    #     )
def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \
    -> t.Mapping[ParsedPattern, ItemSpecifier]:
    """
    Convert a payloads object into an immutable Map from parsed patterns to
    item specifiers.

    Keys of `payloads_obj` are pattern strings and values are objects with an
    'identifier' key. Every result that parse_pattern() yields for a given
    pattern string is associated with that same specifier.
    """
    pairs: list[tuple[ParsedPattern, ItemSpecifier]] = []

    for pattern, spec_obj in payloads_obj.items():
        # One specifier per pattern string, shared by all its parsed forms.
        target = ItemSpecifier(spec_obj['identifier'])

        for parsed_pattern in parse_pattern(pattern):
            pairs.append((parsed_pattern, target))

    return Map(pairs)
@dc.dataclass(frozen=True, unsafe_hash=True)
class MappingInfo(ItemInfoBase):
    """
    Description of a mapping item.

    Adds the payloads (parsed URL pattern -> item specifier) to the fields
    shared through ItemInfoBase. The field added here is created with
    hash=False and compare=False.
    """
    type_name: t.ClassVar[str] = 'mapping'

    payloads: t.Mapping[ParsedPattern, ItemSpecifier] = \
        dc.field(hash=False, compare=False)

    @staticmethod
    def make(
            item_obj: t.Mapping[str, t.Any],
            schema_compat: int,
            repo: str,
            repo_iteration: int
    ) -> 'MappingInfo':
        """
        Build a MappingInfo from the mapping's mapping form.

        The 'payloads' key defaults to an empty object. The remaining fields
        are computed by ItemInfoBase._get_base_init_kwargs().
        """
        common_kwargs = ItemInfoBase._get_base_init_kwargs(
            item_obj,
            schema_compat,
            repo,
            repo_iteration
        )

        return MappingInfo(
            **common_kwargs,
            payloads=make_payloads(item_obj.get('payloads', {}))
        )

    @staticmethod
    def load(
            instance_or_path: json_instances.InstanceOrPathOrIO,
            repo: str = '',
            repo_iteration: int = -1
    ) -> 'MappingInfo':
        """
        Load a MappingInfo from `instance_or_path` through _load_item_info().

        `repo` and `repo_iteration` are stored in the resulting object and
        default to '' and -1.
        """
        return _load_item_info(
            MappingInfo,
            instance_or_path,
            repo,
            repo_iteration
        )

    def __lt__(self, other: 'MappingInfo') -> bool:
        """
        Order mappings by identifier and repo ascending, with version and
        repo_iteration compared in reverse (greater values sort first).
        """
        # Swapping self/other in the version and repo_iteration slots is what
        # reverses the direction of comparison for those two fields.
        lhs = (self.identifier, other.version, self.repo, other.repo_iteration)
        rhs = (other.identifier, self.version, other.repo, self.repo_iteration)

        return lhs < rhs
# Either of the two concrete item description types.
AnyInfo = t.Union[ResourceInfo, MappingInfo]
# Type variable constrained to the two item types; it lets _load_item_info()
# declare that it returns the same type it was asked to load.
LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo)
def _load_item_info(
        info_type: t.Type[LoadedType],
        instance_or_path: json_instances.InstanceOrPathOrIO,
        repo: str,
        repo_iteration: int
) -> LoadedType:
    """
    Read a mapping/resource description, validate it and build an item info
    object from it.

    The instance is read with json_instances.read_instance() and validated
    against the schema name format derived from `info_type.type_name`. The
    value returned by validation is passed on to `info_type.make()` as the
    schema compatibility level, together with `repo` and `repo_iteration`.
    """
    item_instance = json_instances.read_instance(instance_or_path)

    # '{{}}' leaves a literal '{}' placeholder in the schema name format.
    schema_name_fmt = f'api_{info_type.type_name}_description-{{}}.schema.json'
    compat_level = json_instances.validate_instance(
        item_instance,
        schema_name_fmt
    )

    # Successful validation guarantees the instance is a dict.
    item_obj = t.cast('dict[str, t.Any]', item_instance)

    return info_type.make(item_obj, compat_level, repo, repo_iteration)
# CategorizedType = t.TypeVar(
# 'CategorizedType',
# bound=Categorizable
# )
# CategorizedUpdater = t.Callable[
# [t.Optional[CategorizedType]],
# t.Optional[CategorizedType]
# ]
# CategoryKeyType = t.TypeVar('CategoryKeyType', bound=t.Hashable)
# @dc.dataclass(frozen=True)
# class CategorizedItemInfo(Categorizable, t.Generic[CategorizedType, CategoryKeyType]):
# """...."""
# SelfType = t.TypeVar(
# 'SelfType',
# bound = 'CategorizedItemInfo[CategorizedType, CategoryKeyType]'
# )
# uuid: t.Optional[str] = None
# identifier: str = ''
# items: Map[CategoryKeyType, CategorizedType] = Map()
# _initialized: bool = False
# def _update(
# self: 'SelfType',
# key: CategoryKeyType,
# updater: CategorizedUpdater
# ) -> 'SelfType':
# """...... Perform sanity checks for uuid."""
# uuid = self.uuid
# items = self.items.mutate()
# updated = updater(items.get(key))
# if updated is None:
# items.pop(key, None)
# identifier = self.identifier
# else:
# items[key] = updated
# identifier = updated.identifier
# if self._initialized:
# assert identifier == self.identifier
# if uuid is not None:
# if updated.uuid is not None and uuid != updated.uuid:
# raise HaketiloException(_('uuid_mismatch_{identifier}')
# .format(identifier=identifier))
# else:
# uuid = updated.uuid
# return dc.replace(
# self,
# identifier = identifier,
# uuid = uuid,
# items = items.finish(),
# _initialized = self._initialized or updated is not None
# )
# def is_empty(self) -> bool:
# """...."""
# return len(self.items) == 0
# VersionedType = t.TypeVar('VersionedType', ResourceInfo, MappingInfo)
# class VersionedItemInfo(
# CategorizedItemInfo[VersionedType, VerTuple],
# t.Generic[VersionedType]
# ):
# """Stores data of multiple versions of given resource/mapping."""
# SelfType = t.TypeVar('SelfType', bound='VersionedItemInfo[VersionedType]')
# def register(self: 'SelfType', item_info: VersionedType) -> 'SelfType':
# """
# Make item info queryable by version. Perform sanity checks for uuid.
# """
# return self._update(item_info.version, lambda old_info: item_info)
# def unregister(self: 'SelfType', version: VerTuple) -> 'SelfType':
# """...."""
# return self._update(version, lambda old_info: None)
# def newest_version(self) -> VerTuple:
# """...."""
# assert not self.is_empty()
# return max(self.items.keys())
# def get_newest(self) -> VersionedType:
# """Find and return info of the newest version of item."""
# newest = self.items[self.newest_version()]
# assert newest is not None
# return newest
# def get_by_ver(self, ver: t.Iterable[int]) -> t.Optional[VersionedType]:
# """
# Find and return info of the specified version of the item (or None if
# absent).
# """
# return self.items.get(tuple(ver))
# def get_all(self) -> t.Iterator[VersionedType]:
# """Generate item info for all its versions, from oldest to newest."""
# for version in sorted(self.items.keys()):
# yield self.items[version]
# MultiRepoType = t.TypeVar('MultiRepoType', ResourceInfo, MappingInfo)
# MultiRepoVersioned = VersionedItemInfo[MultiRepoType]
# class MultiRepoItemInfo(
# CategorizedItemInfo[MultiRepoVersioned, str],
# t.Generic[MultiRepoType]
# ):
# SelfType = t.TypeVar('SelfType', bound='MultiRepoItemInfo[MultiRepoType]')
# def register(self: 'SelfType', item_info: MultiRepoType) -> 'SelfType':
# """
# Make item info queryable by version. Perform sanity checks for uuid.
# """
# def updater(old_item: t.Optional[MultiRepoVersioned]) \
# -> MultiRepoVersioned:
# """...."""
# if old_item is None:
# old_item = VersionedItemInfo()
# return old_item.register(item_info)
# return self._update(item_info.repository, updater)
# def unregister(self: 'SelfType', version: VerTuple, repository: str) \
# -> 'SelfType':
# """...."""
# def updater(old_item: t.Optional[MultiRepoVersioned]) -> \
# t.Optional[MultiRepoVersioned]:
# """...."""
# if old_item is None:
# return None
# new_item = old_item.unregister(version)
# return None if new_item.is_empty() else new_item
# return self._update(repository, updater)