# SPDX-License-Identifier: GPL-3.0-or-later
# Reading resources, mappings and other JSON documents from the filesystem.
#
# This file is part of Hydrilla&Haketilo
#
# Copyright (C) 2021, 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, hereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this code
# in a proprietary program, I am not going to enforce this in court.
"""
.....
"""
# Enable using with Python 3.7.
from __future__ import annotations
import sys
if sys.version_info >= (3, 8):
from typing import Protocol
else:
from typing_extensions import Protocol
import typing as t
import dataclasses as dc
from pathlib import Path, PurePosixPath
from abc import ABC
from immutables import Map
from . import versions, json_instances
from .url_patterns import parse_pattern, ParsedUrl, ParsedPattern
from .exceptions import HaketiloException
from .translations import smart_gettext as _
@dc.dataclass(frozen=True, unsafe_hash=True)
class ItemSpecifier:
"""...."""
identifier: str
SpecifierObjs = t.Sequence[t.Mapping[str, t.Any]]
def make_item_specifiers_seq(spec_objs: SpecifierObjs) \
-> tuple[ItemSpecifier, ...]:
"""...."""
return tuple(ItemSpecifier(obj['identifier']) for obj in spec_objs)
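# Usage sketch (the identifier value is made up; the input mirrors the
# reference objects found in Haketilo's item JSON documents):
#     make_item_specifiers_seq([{'identifier': 'example-item'}])
#     # -> (ItemSpecifier(identifier='example-item'),)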
def make_required_mappings(spec_objs: t.Any, schema_compat: int) \
-> tuple[ItemSpecifier, ...]:
"""...."""
if schema_compat < 2:
return ()
return make_item_specifiers_seq(spec_objs)
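# 'required_mappings' only appears in schema versions 2 and later, so for
# older documents an empty tuple is always produced:
#     make_required_mappings([{'identifier': 'example-mapping'}], 1)  # -> ()
#     make_required_mappings([{'identifier': 'example-mapping'}], 2)
#     # -> (ItemSpecifier(identifier='example-mapping'),)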
@dc.dataclass(frozen=True, unsafe_hash=True)
class FileSpecifier:
"""...."""
name: str
sha256: str
def normalize_filename(name: str) -> str:
    """
    Eliminate double slashes in the file name and ensure it does not try to
    reference parent directories.
    """
path = PurePosixPath(name)
if '.' in path.parts or '..' in path.parts:
msg = _('err.item_info.filename_invalid_{}').format(name)
raise HaketiloException(msg)
return str(path)
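# Behaviour sketch (the file names are made up):
#     normalize_filename('COPYING//licenses/CC0.txt')  # -> 'COPYING/licenses/CC0.txt'
#     normalize_filename('../outside')                 # raises HaketiloException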
def make_file_specifiers_seq(spec_objs: SpecifierObjs) \
-> tuple[FileSpecifier, ...]:
"""...."""
return tuple(
FileSpecifier(normalize_filename(obj['file']), obj['sha256'])
for obj
in spec_objs
)
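# Usage sketch (the file name and hash value are made up):
#     make_file_specifiers_seq([{'file': 'js//main.js', 'sha256': 'f1e2...'}])
#     # -> (FileSpecifier(name='js/main.js', sha256='f1e2...'),)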
@dc.dataclass(frozen=True, unsafe_hash=True)
class GeneratedBy:
"""...."""
name: str
version: t.Optional[str]
@staticmethod
def make(generated_by_obj: t.Optional[t.Mapping[str, t.Any]]) -> \
t.Optional['GeneratedBy']:
"""...."""
if generated_by_obj is None:
return None
return GeneratedBy(
name = generated_by_obj['name'],
version = generated_by_obj.get('version')
)
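# Usage sketch (the tool name is made up):
#     GeneratedBy.make({'name': 'some-builder-tool'})
#     # -> GeneratedBy(name='some-builder-tool', version=None)
#     GeneratedBy.make(None)  # -> None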
def make_eval_permission(perms_obj: t.Any, schema_compat: int) -> bool:
if schema_compat < 2:
return False
return perms_obj.get('eval', False)
def make_cors_bypass_permission(perms_obj: t.Any, schema_compat: int) -> bool:
if schema_compat < 2:
return False
return perms_obj.get('cors_bypass', False)
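# Both permission helpers behave the same way: permissions only exist in
# schema versions 2 and later, so older documents always report them as off.
#     make_eval_permission({'eval': True}, 1)   # -> False
#     make_eval_permission({'eval': True}, 2)   # -> True
#     make_cors_bypass_permission({}, 2)        # -> False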
class Categorizable(Protocol):
"""...."""
uuid: t.Optional[str]
identifier: str
@dc.dataclass(frozen=True, unsafe_hash=True)
class ItemIdentity:
repo: str
repo_iteration: int
version: versions.VerTuple
identifier: str
# mypy needs to be corrected:
# https://stackoverflow.com/questions/70999513/conflict-between-mix-ins-for-abstract-dataclasses/70999704#70999704
@dc.dataclass(frozen=True) # type: ignore[misc]
class ItemInfoBase(ABC, ItemIdentity, Categorizable):
"""...."""
type_name: t.ClassVar[str]
source_name: str = dc.field(hash=False, compare=False)
source_copyright: tuple[FileSpecifier, ...] = dc.field(hash=False, compare=False)
uuid: t.Optional[str] = dc.field(hash=False, compare=False)
long_name: str = dc.field(hash=False, compare=False)
allows_eval: bool = dc.field(hash=False, compare=False)
allows_cors_bypass: bool = dc.field(hash=False, compare=False)
required_mappings: tuple[ItemSpecifier, ...] = dc.field(hash=False, compare=False)
generated_by: t.Optional[GeneratedBy] = dc.field(hash=False, compare=False)
@property
def versioned_identifier(self) -> str:
"""...."""
return f'{self.identifier}-{versions.version_string(self.version)}'
@property
def files(self) -> tuple[FileSpecifier, ...]:
return self.source_copyright
@staticmethod
def _get_base_init_kwargs(
item_obj: t.Mapping[str, t.Any],
schema_compat: int,
repo: str,
repo_iteration: int
) -> t.Mapping[str, t.Any]:
"""...."""
source_copyright = make_file_specifiers_seq(
item_obj['source_copyright']
)
version = versions.normalize_version(item_obj['version'])
perms_obj = item_obj.get('permissions', {})
eval_perm = make_eval_permission(perms_obj, schema_compat)
cors_bypass_perm = make_cors_bypass_permission(perms_obj, schema_compat)
required_mappings = make_required_mappings(
item_obj.get('required_mappings', []),
schema_compat
)
generated_by = GeneratedBy.make(item_obj.get('generated_by'))
return Map(
repo = repo,
repo_iteration = repo_iteration,
source_name = item_obj['source_name'],
source_copyright = source_copyright,
version = version,
identifier = item_obj['identifier'],
uuid = item_obj.get('uuid'),
long_name = item_obj['long_name'],
allows_eval = eval_perm,
allows_cors_bypass = cors_bypass_perm,
required_mappings = required_mappings,
generated_by = generated_by
)
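# The Map returned by _get_base_init_kwargs() is unpacked as keyword arguments
# by the make() constructors of the concrete subclasses below, which only add
# their type-specific fields on top of it.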
@dc.dataclass(frozen=True, unsafe_hash=True)
class ResourceInfo(ItemInfoBase):
"""...."""
type_name: t.ClassVar[str] = 'resource'
revision: int = dc.field(hash=False, compare=False)
dependencies: tuple[ItemSpecifier, ...] = dc.field(hash=False, compare=False)
scripts: tuple[FileSpecifier, ...] = dc.field(hash=False, compare=False)
@property
def versioned_identifier(self) -> str:
"""...."""
return f'{super().versioned_identifier}-{self.revision}'
@property
def files(self) -> tuple[FileSpecifier, ...]:
return tuple((*self.source_copyright, *self.scripts))
@staticmethod
def make(
item_obj: t.Mapping[str, t.Any],
schema_compat: int,
repo: str,
repo_iteration: int
) -> 'ResourceInfo':
"""...."""
base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
item_obj,
schema_compat,
repo,
repo_iteration
)
dependencies = make_item_specifiers_seq(
item_obj.get('dependencies', [])
)
scripts = make_file_specifiers_seq(
item_obj.get('scripts', [])
)
return ResourceInfo(
**base_init_kwargs,
revision = item_obj['revision'],
dependencies = dependencies,
scripts = scripts
)
@staticmethod
def load(
instance_or_path: json_instances.InstanceOrPathOrIO,
repo: str = '',
repo_iteration: int = -1
) -> 'ResourceInfo':
"""...."""
return _load_item_info(
ResourceInfo,
instance_or_path,
repo,
repo_iteration
)
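# Loading sketch (the path is hypothetical; anything accepted by
# json_instances.read_instance() can be passed instead):
#     info = ResourceInfo.load(Path('somewhere/resource.json'))
#     info.versioned_identifier  # '<identifier>-<version>-<revision>'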
def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \
-> t.Mapping[ParsedPattern, ItemSpecifier]:
"""...."""
mapping: list[tuple[ParsedPattern, ItemSpecifier]] = []
for pattern, spec_obj in payloads_obj.items():
ref = ItemSpecifier(spec_obj['identifier'])
mapping.extend((parsed, ref) for parsed in parse_pattern(pattern))
return Map(mapping)
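# Usage sketch (the pattern and identifier are made up; parse_pattern() may
# expand one written pattern into several ParsedPattern keys):
#     make_payloads({'https://example.com/***': {'identifier': 'example-resource'}})
#     # -> Map({<parsed pattern>: ItemSpecifier(identifier='example-resource'), ...})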
@dc.dataclass(frozen=True, unsafe_hash=True)
class MappingInfo(ItemInfoBase):
"""...."""
type_name: t.ClassVar[str] = 'mapping'
payloads: t.Mapping[ParsedPattern, ItemSpecifier] = dc.field(hash=False, compare=False)
@staticmethod
def make(
item_obj: t.Mapping[str, t.Any],
schema_compat: int,
repo: str,
repo_iteration: int
) -> 'MappingInfo':
"""...."""
base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
item_obj,
schema_compat,
repo,
repo_iteration
)
return MappingInfo(
**base_init_kwargs,
payloads = make_payloads(item_obj.get('payloads', {}))
)
@staticmethod
def load(
instance_or_path: json_instances.InstanceOrPathOrIO,
repo: str = '',
repo_iteration: int = -1
) -> 'MappingInfo':
"""...."""
return _load_item_info(
MappingInfo,
instance_or_path,
repo,
repo_iteration
)
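    # Loading works the same way as for ResourceInfo.load() above: the
    # argument may be a path, an open file or an already parsed JSON instance.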
def __lt__(self, other: 'MappingInfo') -> bool:
"""...."""
return (
self.identifier,
other.version,
self.repo,
other.repo_iteration
) < (
other.identifier,
self.version,
other.repo,
self.repo_iteration
)
AnyInfo = t.Union[ResourceInfo, MappingInfo]
LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo)
def _load_item_info(
info_type: t.Type[LoadedType],
instance_or_path: json_instances.InstanceOrPathOrIO,
repo: str,
repo_iteration: int
) -> LoadedType:
"""Read, validate and autocomplete a mapping/resource description."""
instance = json_instances.read_instance(instance_or_path)
schema_fmt = f'api_{info_type.type_name}_description-{{}}.schema.json'
schema_compat = json_instances.validate_instance(instance, schema_fmt)
# We know from successful validation that instance is a dict.
return info_type.make(
t.cast('dict[str, t.Any]', instance),
schema_compat,
repo,
repo_iteration
)
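# Sketch of the call flow: ResourceInfo.load() and MappingInfo.load() both
# delegate here, e.g. _load_item_info(MappingInfo, Path('mapping.json'), '', -1)
# (the path is hypothetical), with the schema file name filled in from the
# concrete class's type_name.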
CategorizedType = t.TypeVar(
'CategorizedType',
bound=Categorizable
)
CategorizedUpdater = t.Callable[
[t.Optional[CategorizedType]],
t.Optional[CategorizedType]
]
CategoryKeyType = t.TypeVar('CategoryKeyType', bound=t.Hashable)
@dc.dataclass(frozen=True)
class CategorizedItemInfo(Categorizable, t.Generic[CategorizedType, CategoryKeyType]):
"""...."""
SelfType = t.TypeVar(
'SelfType',
bound = 'CategorizedItemInfo[CategorizedType, CategoryKeyType]'
)
uuid: t.Optional[str] = None
identifier: str = ''
items: Map[CategoryKeyType, CategorizedType] = Map()
_initialized: bool = False
def _update(
self: 'SelfType',
key: CategoryKeyType,
updater: CategorizedUpdater
) -> 'SelfType':
"""...... Perform sanity checks for uuid."""
uuid = self.uuid
items = self.items.mutate()
updated = updater(items.get(key))
if updated is None:
items.pop(key, None)
identifier = self.identifier
else:
items[key] = updated
identifier = updated.identifier
if self._initialized:
assert identifier == self.identifier
if uuid is not None:
if updated.uuid is not None and uuid != updated.uuid:
raise HaketiloException(_('uuid_mismatch_{identifier}')
.format(identifier=identifier))
else:
uuid = updated.uuid
return dc.replace(
self,
identifier = identifier,
uuid = uuid,
items = items.finish(),
_initialized = self._initialized or updated is not None
)
def is_empty(self) -> bool:
"""...."""
return len(self.items) == 0
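# Subclassing sketch (the subclass name and category key choice are made up;
# concrete umbrella classes elsewhere in Haketilo parametrize this base in a
# similar manner):
#     class VersionedResourceInfo(
#             CategorizedItemInfo[ResourceInfo, versions.VerTuple]
#     ):
#         def register(self, info: ResourceInfo) -> 'VersionedResourceInfo':
#             return self._update(info.version, lambda _: info)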