# SPDX-License-Identifier: GPL-3.0-or-later # Reading resources, mappings and other JSON documents from the filesystem. # # This file is part of Hydrilla&Haketilo # # Copyright (C) 2021, 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use this code # in a proprietary program, I am not going to enforce this in court. """ ..... """ # Enable using with Python 3.7. from __future__ import annotations import sys if sys.version_info >= (3, 8): from typing import Protocol else: from typing_extensions import Protocol import enum import typing as t import dataclasses as dc from pathlib import Path, PurePosixPath from abc import ABC from immutables import Map from . import versions, json_instances from .url_patterns import parse_pattern, ParsedUrl, ParsedPattern from .exceptions import HaketiloException from .translations import smart_gettext as _ @dc.dataclass(frozen=True, unsafe_hash=True) class ItemSpecifier: """....""" identifier: str SpecifierObjs = t.Sequence[t.Mapping[str, t.Any]] def make_item_specifiers_seq(spec_objs: SpecifierObjs) \ -> tuple[ItemSpecifier, ...]: """....""" return tuple(ItemSpecifier(obj['identifier']) for obj in spec_objs) def make_required_mappings(spec_objs: t.Any, schema_compat: int) \ -> tuple[ItemSpecifier, ...]: """....""" if schema_compat < 2: return () return make_item_specifiers_seq(spec_objs) @dc.dataclass(frozen=True, unsafe_hash=True) class FileSpecifier: """....""" name: str sha256: str def normalize_filename(name: str): """ This function eliminated double slashes in file name and ensures it does not try to reference parent directories. """ path = PurePosixPath(name) if '.' in path.parts or '..' in path.parts: msg = _('err.item_info.filename_invalid_{}').format(name) raise HaketiloException(msg) return str(path) def make_file_specifiers_seq(spec_objs: SpecifierObjs) \ -> tuple[FileSpecifier, ...]: """....""" return tuple( FileSpecifier(normalize_filename(obj['file']), obj['sha256']) for obj in spec_objs ) @dc.dataclass(frozen=True, unsafe_hash=True) class GeneratedBy: """....""" name: str version: t.Optional[str] @staticmethod def make(generated_by_obj: t.Optional[t.Mapping[str, t.Any]]) -> \ t.Optional['GeneratedBy']: """....""" if generated_by_obj is None: return None return GeneratedBy( name = generated_by_obj['name'], version = generated_by_obj.get('version') ) def make_eval_permission(perms_obj: t.Any, schema_compat: int) -> bool: if schema_compat < 2: return False return perms_obj.get('eval', False) def make_cors_bypass_permission(perms_obj: t.Any, schema_compat: int) -> bool: if schema_compat < 2: return False return perms_obj.get('cors_bypass', False) class Categorizable(Protocol): """....""" uuid: t.Optional[str] identifier: str @dc.dataclass(frozen=True, unsafe_hash=True) class ItemIdentity: repo: str repo_iteration: int version: versions.VerTuple identifier: str # mypy needs to be corrected: # https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704 @dc.dataclass(frozen=True) # type: ignore[misc] class ItemInfoBase(ABC, ItemIdentity, Categorizable): """....""" type_name: t.ClassVar[str] source_name: str = dc.field(hash=False, compare=False) source_copyright: tuple[FileSpecifier, ...] = dc.field(hash=False, compare=False) uuid: t.Optional[str] = dc.field(hash=False, compare=False) long_name: str = dc.field(hash=False, compare=False) allows_eval: bool = dc.field(hash=False, compare=False) allows_cors_bypass: bool = dc.field(hash=False, compare=False) required_mappings: tuple[ItemSpecifier, ...] = dc.field(hash=False, compare=False) generated_by: t.Optional[GeneratedBy] = dc.field(hash=False, compare=False) @property def versioned_identifier(self) -> str: """....""" return f'{self.identifier}-{versions.version_string(self.version)}' @property def files(self) -> tuple[FileSpecifier, ...]: return self.source_copyright @staticmethod def _get_base_init_kwargs( item_obj: t.Mapping[str, t.Any], schema_compat: int, repo: str, repo_iteration: int ) -> t.Mapping[str, t.Any]: """....""" source_copyright = make_file_specifiers_seq( item_obj['source_copyright'] ) version = versions.normalize(item_obj['version']) perms_obj = item_obj.get('permissions', {}) eval_perm = make_eval_permission(perms_obj, schema_compat) cors_bypass_perm = make_cors_bypass_permission(perms_obj, schema_compat) required_mappings = make_required_mappings( item_obj.get('required_mappings', []), schema_compat ) generated_by = GeneratedBy.make(item_obj.get('generated_by')) return Map( repo = repo, repo_iteration = repo_iteration, source_name = item_obj['source_name'], source_copyright = source_copyright, version = version, identifier = item_obj['identifier'], uuid = item_obj.get('uuid'), long_name = item_obj['long_name'], allows_eval = eval_perm, allows_cors_bypass = cors_bypass_perm, required_mappings = required_mappings, generated_by = generated_by ) @dc.dataclass(frozen=True, unsafe_hash=True) class ResourceInfo(ItemInfoBase): """....""" type_name: t.ClassVar[str] = 'resource' revision: int = dc.field(hash=False, compare=False) dependencies: tuple[ItemSpecifier, ...] = dc.field(hash=False, compare=False) scripts: tuple[FileSpecifier, ...] = dc.field(hash=False, compare=False) @property def versioned_identifier(self) -> str: """....""" return f'{super().versioned_identifier}-{self.revision}' @property def files(self) -> tuple[FileSpecifier, ...]: return tuple((*self.source_copyright, *self.scripts)) @staticmethod def make( item_obj: t.Mapping[str, t.Any], schema_compat: int, repo: str, repo_iteration: int ) -> 'ResourceInfo': """....""" base_init_kwargs = ItemInfoBase._get_base_init_kwargs( item_obj, schema_compat, repo, repo_iteration ) dependencies = make_item_specifiers_seq( item_obj.get('dependencies', []) ) scripts = make_file_specifiers_seq( item_obj.get('scripts', []) ) return ResourceInfo( **base_init_kwargs, revision = item_obj['revision'], dependencies = dependencies, scripts = scripts ) @staticmethod def load( instance_source: json_instances.InstanceSource, repo: str = '', repo_iteration: int = -1 ) -> 'ResourceInfo': """....""" return _load_item_info( ResourceInfo, instance_source, repo, repo_iteration ) def __lt__(self, other: 'ResourceInfo') -> bool: """....""" return ( self.identifier, other.version, other.revision, self.repo, other.repo_iteration ) < ( other.identifier, self.version, self.revision, other.repo, self.repo_iteration ) def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \ -> t.Mapping[ParsedPattern, ItemSpecifier]: """....""" mapping: list[tuple[ParsedPattern, ItemSpecifier]] = [] for pattern, spec_obj in payloads_obj.items(): ref = ItemSpecifier(spec_obj['identifier']) mapping.extend((parsed, ref) for parsed in parse_pattern(pattern)) return Map(mapping) @dc.dataclass(frozen=True, unsafe_hash=True) class MappingInfo(ItemInfoBase): """....""" type_name: t.ClassVar[str] = 'mapping' payloads: t.Mapping[ParsedPattern, ItemSpecifier] = \ dc.field(hash=False, compare=False) @staticmethod def make( item_obj: t.Mapping[str, t.Any], schema_compat: int, repo: str, repo_iteration: int ) -> 'MappingInfo': """....""" base_init_kwargs = ItemInfoBase._get_base_init_kwargs( item_obj, schema_compat, repo, repo_iteration ) return MappingInfo( **base_init_kwargs, payloads = make_payloads(item_obj.get('payloads', {})) ) @staticmethod def load( instance_source: json_instances.InstanceSource, repo: str = '', repo_iteration: int = -1 ) -> 'MappingInfo': """....""" return _load_item_info( MappingInfo, instance_source, repo, repo_iteration ) def __lt__(self, other: 'MappingInfo') -> bool: """....""" return ( self.identifier, other.version, self.repo, other.repo_iteration ) < ( other.identifier, self.version, other.repo, self.repo_iteration ) AnyInfo = t.Union[ResourceInfo, MappingInfo] class ItemType(enum.Enum): RESOURCE = 'resource' MAPPING = 'mapping' @property def info_class(self) -> t.Type[AnyInfo]: if self == ItemType.RESOURCE: return ResourceInfo else: return MappingInfo @property def alt_name(self) -> str: if self == ItemType.RESOURCE: return 'library' else: return 'package' @property def alt_name_plural(self) -> str: if self == ItemType.RESOURCE: return 'libraries' else: return 'packages' LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo) def _load_item_info( info_type: t.Type[LoadedType], instance_source: json_instances.InstanceSource, repo: str, repo_iteration: int ) -> LoadedType: """Read, validate and autocomplete a mapping/resource description.""" instance = json_instances.read_instance(instance_source) schema_fmt = f'api_{info_type.type_name}_description-{{}}.schema.json' schema_compat = json_instances.validate_instance(instance, schema_fmt) # We know from successful validation that instance is a dict. return info_type.make( t.cast('dict[str, t.Any]', instance), schema_compat, repo, repo_iteration ) CategorizedType = t.TypeVar( 'CategorizedType', bound=Categorizable ) CategorizedUpdater = t.Callable[ [t.Optional[CategorizedType]], t.Optional[CategorizedType] ] CategoryKeyType = t.TypeVar('CategoryKeyType', bound=t.Hashable) @dc.dataclass(frozen=True) class CategorizedItemInfo(Categorizable, t.Generic[CategorizedType, CategoryKeyType]): """....""" SelfType = t.TypeVar( 'SelfType', bound = 'CategorizedItemInfo[CategorizedType, CategoryKeyType]' ) uuid: t.Optional[str] = None identifier: str = '' items: Map[CategoryKeyType, CategorizedType] = Map() _initialized: bool = False def _update( self: 'SelfType', key: CategoryKeyType, updater: CategorizedUpdater ) -> 'SelfType': """...... Perform sanity checks for uuid.""" uuid = self.uuid items = self.items.mutate() updated = updater(items.get(key)) if updated is None: items.pop(key, None) identifier = self.identifier else: items[key] = updated identifier = updated.identifier if self._initialized: assert identifier == self.identifier if uuid is not None: if updated.uuid is not None and uuid != updated.uuid: raise HaketiloException(_('uuid_mismatch_{identifier}') .format(identifier=identifier)) else: uuid = updated.uuid return dc.replace( self, identifier = identifier, uuid = uuid, items = items.finish(), _initialized = self._initialized or updated is not None ) def is_empty(self) -> bool: """....""" return len(self.items) == 0