# SPDX-License-Identifier: GPL-3.0-or-later # Reading resources, mappings and other JSON documents from the filesystem. # # This file is part of Hydrilla&Haketilo # # Copyright (C) 2021, 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use this code # in a proprietary program, I am not going to enforce this in court. """ ..... """ # Enable using with Python 3.7. from __future__ import annotations import sys if sys.version_info >= (3, 8): from typing import Protocol else: from typing_extensions import Protocol import typing as t import dataclasses as dc from pathlib import Path, PurePosixPath from abc import ABC from immutables import Map from . import versions, json_instances from .url_patterns import parse_pattern, ParsedUrl, ParsedPattern from .exceptions import HaketiloException from .translations import smart_gettext as _ VerTuple = t.Tuple[int, ...] @dc.dataclass(frozen=True, unsafe_hash=True) class ItemSpecifier: """....""" identifier: str SpecifierObjs = t.Sequence[t.Mapping[str, t.Any]] def make_item_specifiers_seq(spec_objs: SpecifierObjs) \ -> tuple[ItemSpecifier, ...]: """....""" return tuple(ItemSpecifier(obj['identifier']) for obj in spec_objs) def make_required_mappings(spec_objs: t.Any, schema_compat: int) \ -> tuple[ItemSpecifier, ...]: """....""" if schema_compat < 2: return () return make_item_specifiers_seq(spec_objs) @dc.dataclass(frozen=True, unsafe_hash=True) class FileSpecifier: """....""" name: str sha256: str def normalize_filename(name: str): """ This function eliminated double slashes in file name and ensures it does not try to reference parent directories. """ path = PurePosixPath(name) if '.' in path.parts or '..' in path.parts: msg = _('err.item_info.filename_invalid_{}').format(name) raise HaketiloException(msg) return str(path) def make_file_specifiers_seq(spec_objs: SpecifierObjs) \ -> tuple[FileSpecifier, ...]: """....""" return tuple( FileSpecifier(normalize_filename(obj['file']), obj['sha256']) for obj in spec_objs ) @dc.dataclass(frozen=True, unsafe_hash=True) class GeneratedBy: """....""" name: str version: t.Optional[str] @staticmethod def make(generated_by_obj: t.Optional[t.Mapping[str, t.Any]]) -> \ t.Optional['GeneratedBy']: """....""" if generated_by_obj is None: return None return GeneratedBy( name = generated_by_obj['name'], version = generated_by_obj.get('version') ) def make_eval_permission(perms_obj: t.Any, schema_compat: int) -> bool: if schema_compat < 2: return False return perms_obj.get('eval', False) def make_cors_bypass_permission(perms_obj: t.Any, schema_compat: int) -> bool: if schema_compat < 2: return False return perms_obj.get('cors_bypass', False) class Categorizable(Protocol): """....""" uuid: t.Optional[str] identifier: str @dc.dataclass(frozen=True, unsafe_hash=True) class ItemIdentity: repo: str repo_iteration: int version: versions.VerTuple identifier: str # mypy needs to be corrected: # https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704 @dc.dataclass(frozen=True) # type: ignore[misc] class ItemInfoBase(ABC, ItemIdentity, Categorizable): """....""" type_name: t.ClassVar[str] source_name: str = dc.field(hash=False, compare=False) source_copyright: tuple[FileSpecifier, ...] = dc.field(hash=False, compare=False) uuid: t.Optional[str] = dc.field(hash=False, compare=False) long_name: str = dc.field(hash=False, compare=False) allows_eval: bool = dc.field(hash=False, compare=False) allows_cors_bypass: bool = dc.field(hash=False, compare=False) required_mappings: tuple[ItemSpecifier, ...] = dc.field(hash=False, compare=False) generated_by: t.Optional[GeneratedBy] = dc.field(hash=False, compare=False) # def path_relative_to_type(self) -> str: # """ # Get a relative path to this item's JSON definition with respect to # directory containing items of this type. # """ # return f'{self.identifier}/{versions.version_string(self.version)}' # def path(self) -> str: # """ # Get a relative path to this item's JSON definition with respect to # malcontent directory containing loadable items. # """ # return f'{self.type_name}/{self.path_relative_to_type()}' # @property # def identity(self): # """....""" # return ItemIdentity( # repository = self.repository, # version = self.version, # identifier = self.identifier # ) # @property # def versioned_identifier(self): # """....""" # return f'{self.identifier}-{versions.version_string(self.version)}' @staticmethod def _get_base_init_kwargs( item_obj: t.Mapping[str, t.Any], schema_compat: int, repo: str, repo_iteration: int ) -> t.Mapping[str, t.Any]: """....""" source_copyright = make_file_specifiers_seq( item_obj['source_copyright'] ) version = versions.normalize_version(item_obj['version']) perms_obj = item_obj.get('permissions', {}) eval_perm = make_eval_permission(perms_obj, schema_compat) cors_bypass_perm = make_cors_bypass_permission(perms_obj, schema_compat) required_mappings = make_required_mappings( item_obj.get('required_mappings', []), schema_compat ) generated_by = GeneratedBy.make(item_obj.get('generated_by')) return Map( repo = repo, repo_iteration = repo_iteration, source_name = item_obj['source_name'], source_copyright = source_copyright, version = version, identifier = item_obj['identifier'], uuid = item_obj.get('uuid'), long_name = item_obj['long_name'], allows_eval = eval_perm, allows_cors_bypass = cors_bypass_perm, required_mappings = required_mappings, generated_by = generated_by ) @dc.dataclass(frozen=True, unsafe_hash=True) class ResourceInfo(ItemInfoBase): """....""" type_name: t.ClassVar[str] = 'resource' revision: int = dc.field(hash=False, compare=False) dependencies: tuple[ItemSpecifier, ...] = dc.field(hash=False, compare=False) scripts: tuple[FileSpecifier, ...] = dc.field(hash=False, compare=False) @property def versioned_identifier(self): """....""" return f'{super().versioned_identifier()}-{self.revision}' @staticmethod def make( item_obj: t.Mapping[str, t.Any], schema_compat: int, repo: str, repo_iteration: int ) -> 'ResourceInfo': """....""" base_init_kwargs = ItemInfoBase._get_base_init_kwargs( item_obj, schema_compat, repo, repo_iteration ) dependencies = make_item_specifiers_seq( item_obj.get('dependencies', []) ) scripts = make_file_specifiers_seq( item_obj.get('scripts', []) ) return ResourceInfo( **base_init_kwargs, revision = item_obj['revision'], dependencies = dependencies, scripts = scripts ) @staticmethod def load( instance_or_path: json_instances.InstanceOrPathOrIO, repo: str = '', repo_iteration: int = -1 ) -> 'ResourceInfo': """....""" return _load_item_info( ResourceInfo, instance_or_path, repo, repo_iteration ) # def __lt__(self, other: 'ResourceInfo') -> bool: # """....""" # return ( # self.identifier, # self.version, # self.revision, # self.repository # ) < ( # other.identifier, # other.version, # other.revision, # other.repository # ) def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \ -> t.Mapping[ParsedPattern, ItemSpecifier]: """....""" mapping: list[tuple[ParsedPattern, ItemSpecifier]] = [] for pattern, spec_obj in payloads_obj.items(): ref = ItemSpecifier(spec_obj['identifier']) mapping.extend((parsed, ref) for parsed in parse_pattern(pattern)) return Map(mapping) @dc.dataclass(frozen=True, unsafe_hash=True) class MappingInfo(ItemInfoBase): """....""" type_name: t.ClassVar[str] = 'mapping' payloads: t.Mapping[ParsedPattern, ItemSpecifier] = dc.field(hash=False, compare=False) @staticmethod def make( item_obj: t.Mapping[str, t.Any], schema_compat: int, repo: str, repo_iteration: int ) -> 'MappingInfo': """....""" base_init_kwargs = ItemInfoBase._get_base_init_kwargs( item_obj, schema_compat, repo, repo_iteration ) return MappingInfo( **base_init_kwargs, payloads = make_payloads(item_obj.get('payloads', {})) ) @staticmethod def load( instance_or_path: json_instances.InstanceOrPathOrIO, repo: str = '', repo_iteration: int = -1 ) -> 'MappingInfo': """....""" return _load_item_info( MappingInfo, instance_or_path, repo, repo_iteration ) # def __lt__(self, other: 'MappingInfo') -> bool: # """....""" # return (self.identifier, self.version, self.repository) < \ # (other.identifier, other.version, other.repository) AnyInfo = t.Union[ResourceInfo, MappingInfo] LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo) def _load_item_info( info_type: t.Type[LoadedType], instance_or_path: json_instances.InstanceOrPathOrIO, repo: str, repo_iteration: int ) -> LoadedType: """Read, validate and autocomplete a mapping/resource description.""" instance = json_instances.read_instance(instance_or_path) schema_fmt = f'api_{info_type.type_name}_description-{{}}.schema.json' schema_compat = json_instances.validate_instance(instance, schema_fmt) # We know from successful validation that instance is a dict. return info_type.make( t.cast('dict[str, t.Any]', instance), schema_compat, repo, repo_iteration ) # CategorizedType = t.TypeVar( # 'CategorizedType', # bound=Categorizable # ) # CategorizedUpdater = t.Callable[ # [t.Optional[CategorizedType]], # t.Optional[CategorizedType] # ] # CategoryKeyType = t.TypeVar('CategoryKeyType', bound=t.Hashable) # @dc.dataclass(frozen=True) # class CategorizedItemInfo(Categorizable, t.Generic[CategorizedType, CategoryKeyType]): # """....""" # SelfType = t.TypeVar( # 'SelfType', # bound = 'CategorizedItemInfo[CategorizedType, CategoryKeyType]' # ) # uuid: t.Optional[str] = None # identifier: str = '' # items: Map[CategoryKeyType, CategorizedType] = Map() # _initialized: bool = False # def _update( # self: 'SelfType', # key: CategoryKeyType, # updater: CategorizedUpdater # ) -> 'SelfType': # """...... Perform sanity checks for uuid.""" # uuid = self.uuid # items = self.items.mutate() # updated = updater(items.get(key)) # if updated is None: # items.pop(key, None) # identifier = self.identifier # else: # items[key] = updated # identifier = updated.identifier # if self._initialized: # assert identifier == self.identifier # if uuid is not None: # if updated.uuid is not None and uuid != updated.uuid: # raise HaketiloException(_('uuid_mismatch_{identifier}') # .format(identifier=identifier)) # else: # uuid = updated.uuid # return dc.replace( # self, # identifier = identifier, # uuid = uuid, # items = items.finish(), # _initialized = self._initialized or updated is not None # ) # def is_empty(self) -> bool: # """....""" # return len(self.items) == 0 # VersionedType = t.TypeVar('VersionedType', ResourceInfo, MappingInfo) # class VersionedItemInfo( # CategorizedItemInfo[VersionedType, VerTuple], # t.Generic[VersionedType] # ): # """Stores data of multiple versions of given resource/mapping.""" # SelfType = t.TypeVar('SelfType', bound='VersionedItemInfo[VersionedType]') # def register(self: 'SelfType', item_info: VersionedType) -> 'SelfType': # """ # Make item info queryable by version. Perform sanity checks for uuid. # """ # return self._update(item_info.version, lambda old_info: item_info) # def unregister(self: 'SelfType', version: VerTuple) -> 'SelfType': # """....""" # return self._update(version, lambda old_info: None) # def newest_version(self) -> VerTuple: # """....""" # assert not self.is_empty() # return max(self.items.keys()) # def get_newest(self) -> VersionedType: # """Find and return info of the newest version of item.""" # newest = self.items[self.newest_version()] # assert newest is not None # return newest # def get_by_ver(self, ver: t.Iterable[int]) -> t.Optional[VersionedType]: # """ # Find and return info of the specified version of the item (or None if # absent). # """ # return self.items.get(tuple(ver)) # def get_all(self) -> t.Iterator[VersionedType]: # """Generate item info for all its versions, from oldest to newest.""" # for version in sorted(self.items.keys()): # yield self.items[version] # MultiRepoType = t.TypeVar('MultiRepoType', ResourceInfo, MappingInfo) # MultiRepoVersioned = VersionedItemInfo[MultiRepoType] # class MultiRepoItemInfo( # CategorizedItemInfo[MultiRepoVersioned, str], # t.Generic[MultiRepoType] # ): # SelfType = t.TypeVar('SelfType', bound='MultiRepoItemInfo[MultiRepoType]') # def register(self: 'SelfType', item_info: MultiRepoType) -> 'SelfType': # """ # Make item info queryable by version. Perform sanity checks for uuid. # """ # def updater(old_item: t.Optional[MultiRepoVersioned]) \ # -> MultiRepoVersioned: # """....""" # if old_item is None: # old_item = VersionedItemInfo() # return old_item.register(item_info) # return self._update(item_info.repository, updater) # def unregister(self: 'SelfType', version: VerTuple, repository: str) \ # -> 'SelfType': # """....""" # def updater(old_item: t.Optional[MultiRepoVersioned]) -> \ # t.Optional[MultiRepoVersioned]: # """....""" # if old_item is None: # return None # new_item = old_item.unregister(version) # return None if new_item.is_empty() else new_item # return self._update(repository, updater)