# SPDX-License-Identifier: GPL-3.0-or-later
# Reading resources, mappings and other JSON documents from the filesystem.
#
# This file is part of Hydrilla&Haketilo
#
# Copyright (C) 2021, 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this code
# in a proprietary program, I am not going to enforce this in court.
"""
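Data structures describing resources and mappings, together with helpers
that load them from JSON documents.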
"""
# Enable using with Python 3.7.
from __future__ import annotations
import typing as t
import dataclasses as dc
from pathlib import Path, PurePath
from immutables import Map, MapMutation
from . import versions, json_instances
from .url_patterns import parse_pattern, ParsedUrl
from .exceptions import HaketiloException
from .translations import smart_gettext as _
# A version expressed as a tuple of integers; this is the type of
# `ItemInfoBase.version` and of the keys used by `VersionedItemInfo`.
VerTuple = t.Tuple[int, ...]
@dc.dataclass(frozen=True, unsafe_hash=True)
class ItemRef:
    """Immutable, hashable reference to an item, holding only its identifier."""
    # Identifier of the item being referred to.
    identifier: str
# Sequence of reference objects (string-keyed mappings); the parameter type
# of make_item_refs_seq() and make_file_refs_seq().
RefObjs = t.Sequence[t.Mapping[str, t.Any]]
def make_item_refs_seq(ref_objs: RefObjs) -> tuple[ItemRef, ...]:
    """
    Build a tuple of ItemRef objects, one per element of `ref_objs`.

    Each element must provide an 'identifier' entry; a missing entry results
    in KeyError. The order of `ref_objs` is preserved.
    """
    item_refs = [ItemRef(obj['identifier']) for obj in ref_objs]
    return tuple(item_refs)
def make_required_mappings(refs_objs: t.Any, schema_compat: int) \
    -> tuple[ItemRef, ...]:
    """
    Build the tuple of required mappings' references for an item.

    When `schema_compat` is lower than 2 the supplied objects are ignored and
    an empty tuple is returned; otherwise `refs_objs` is converted with
    make_item_refs_seq().
    """
    # NOTE(review): presumably required mappings only exist from schema
    # version 2 onwards - confirm against the schema definitions.
    return make_item_refs_seq(refs_objs) if schema_compat >= 2 else ()
@dc.dataclass(frozen=True, unsafe_hash=True)
class FileRef:
    """Immutable, hashable reference to a file: its name and SHA-256 value."""
    # Taken from the 'file' entry of a reference object.
    name: str
    # Taken from the 'sha256' entry of a reference object.
    sha256: str
def make_file_refs_seq(ref_objs: RefObjs) -> tuple[FileRef, ...]:
    """
    Build a tuple of FileRef objects, one per element of `ref_objs`.

    Each element must provide 'file' and 'sha256' entries; a missing entry
    results in KeyError. The order of `ref_objs` is preserved.
    """
    file_refs = []
    for obj in ref_objs:
        file_refs.append(FileRef(obj['file'], obj['sha256']))
    return tuple(file_refs)
@dc.dataclass(frozen=True, unsafe_hash=True)
class GeneratedBy:
    """Immutable record built from an item's 'generated_by' object."""
    # Value of the mandatory 'name' entry.
    name: str
    # Value of the optional 'version' entry (None when absent).
    version: t.Optional[str]

    @staticmethod
    def make(generated_obj: t.Optional[t.Mapping[str, t.Any]]) -> \
        t.Optional['GeneratedBy']:
        """
        Create a GeneratedBy from a 'generated_by' object.

        Returns None when `generated_obj` is None. Raises KeyError if the
        object lacks a 'name' entry.
        """
        if generated_obj is not None:
            tool_name = generated_obj['name']
            tool_version = generated_obj.get('version')
            return GeneratedBy(name=tool_name, version=tool_version)

        return None
@dc.dataclass(frozen=True, unsafe_hash=True)
class ItemInfoBase:
    """
    Fields and helpers shared by resource and mapping descriptions.

    Only `repository`, `version` and `identifier` take part in hashing; all
    the other fields are declared with hash=False.
    """
    repository: str                        # hashed
    source_name: str = dc.field(hash=False)
    source_copyright: tuple[FileRef, ...] = dc.field(hash=False)
    version: VerTuple                      # hashed
    identifier: str                        # hashed
    uuid: t.Optional[str] = dc.field(hash=False)
    long_name: str = dc.field(hash=False)
    required_mappings: tuple[ItemRef, ...] = dc.field(hash=False)
    generated_by: t.Optional[GeneratedBy] = dc.field(hash=False)

    def path_relative_to_type(self) -> str:
        """
        Return the path of this item's JSON definition relative to the
        directory that holds items of this item's type
        ('<identifier>/<version string>').
        """
        version_part = versions.version_string(self.version)
        return f'{self.identifier}/{version_part}'

    def path(self) -> str:
        """
        Return the path of this item's JSON definition relative to the
        malcontent directory containing loadable items
        ('<type name>/<identifier>/<version string>').
        """
        relative = self.path_relative_to_type()
        return f'{self.type_name}/{relative}'

    @property
    def versioned_identifier(self):
        """Identifier joined with the version string by a dash."""
        version_part = versions.version_string(self.version)
        return f'{self.identifier}-{version_part}'

    @staticmethod
    def _get_base_init_kwargs(
            item_obj: t.Mapping[str, t.Any],
            schema_compat: int,
            repository: str
    ) -> t.Mapping[str, t.Any]:
        """
        Compute the constructor keyword arguments common to all item types.

        `item_obj` is the item's JSON object, `schema_compat` is forwarded to
        make_required_mappings() and `repository` is stored as given. The
        result is an immutable Map suitable for `**` expansion.
        """
        # Derived values are computed first, in this order, and only then are
        # the plain entries read from `item_obj`.
        copyright_refs = make_file_refs_seq(item_obj['source_copyright'])
        normalized_ver = versions.normalize_version(item_obj['version'])
        mapping_refs = make_required_mappings(
            item_obj.get('required_mappings', []),
            schema_compat
        )
        generator = GeneratedBy.make(item_obj.get('generated_by'))

        return Map(
            repository=repository,
            source_name=item_obj['source_name'],
            source_copyright=copyright_refs,
            version=normalized_ver,
            identifier=item_obj['identifier'],
            uuid=item_obj.get('uuid'),
            long_name=item_obj['long_name'],
            required_mappings=mapping_refs,
            generated_by=generator
        )

    # Class-level attribute (not a dataclass field); subclasses override it
    # with their actual type name.
    type_name = '!INVALID!'
# Argument type accepted by the load() functions and handed on to
# json_instances.read_instance(): a `Path`, a `str` or a dict.
# `t.Dict` is used rather than `dict[...]` because this alias is evaluated at
# import time and subscripting the builtin `dict` fails before Python 3.9.
InstanceOrPath = t.Union[Path, str, t.Dict[str, t.Any]]
@dc.dataclass(frozen=True, unsafe_hash=True)
class ResourceInfo(ItemInfoBase):
    """
    Description of a resource.

    Extends ItemInfoBase with resource-specific fields, none of which takes
    part in hashing.
    """
    # Value of the 'revision' entry of the resource's JSON object.
    revision: int = dc.field(hash=False)
    # References built from the optional 'dependencies' entry.
    dependencies: tuple[ItemRef, ...] = dc.field(hash=False)
    # File references built from the optional 'scripts' entry.
    scripts: tuple[FileRef, ...] = dc.field(hash=False)

    @property
    def versioned_identifier(self):
        """
        The base class' versioned identifier with '-<revision>' appended.
        """
        # `versioned_identifier` is a property on the base class, so it has
        # to be read as an attribute. Calling it would attempt to call the
        # resulting str and raise TypeError.
        return f'{super().versioned_identifier}-{self.revision}'

    @staticmethod
    def make(
            item_obj: t.Mapping[str, t.Any],
            schema_compat: int,
            repository: str
    ) -> 'ResourceInfo':
        """
        Build a ResourceInfo from a resource's JSON object.

        `schema_compat` and `repository` are forwarded to
        ItemInfoBase._get_base_init_kwargs(). A missing 'revision' entry
        results in KeyError; 'dependencies' and 'scripts' default to empty.
        """
        base_init_kwargs = ItemInfoBase._get_base_init_kwargs(
            item_obj,
            schema_compat,
            repository
        )

        return ResourceInfo(
            **base_init_kwargs,
            revision=item_obj['revision'],
            dependencies=make_item_refs_seq(item_obj.get('dependencies', [])),
            scripts=make_file_refs_seq(item_obj.get('scripts', [])),
        )

    @staticmethod
    def load(instance_or_path: 'InstanceOrPath', repository: str) \
        -> 'ResourceInfo':
        """
        Load a resource description via _load_item_info() and return it as a
        ResourceInfo.
        """
        return _load_item_info(ResourceInfo, instance_or_path, repository)

    # Class-level attribute (not a dataclass field); used to build paths and
    # schema file names.
    type_name = 'resource'
def make_payloads(payloads_obj: t.Mapping[str, t.Any]) \
    -> t.Mapping[ParsedUrl, ItemRef]:
    """
    Turn a 'payloads' object into an immutable Map from parsed URL patterns
    to item references.

    Every key of `payloads_obj` is passed to parse_pattern(); each value it
    yields is associated with an ItemRef made from the 'identifier' entry of
    the corresponding payload object.
    """
    pairs: list[tuple[ParsedUrl, ItemRef]] = []

    for pattern, ref_obj in payloads_obj.items():
        target = ItemRef(ref_obj['identifier'])
        for parsed in parse_pattern(pattern):
            pairs.append((parsed, target))

    return Map(pairs)
@dc.dataclass(frozen=True, unsafe_hash=True)
class MappingInfo(ItemInfoBase):
    """
    Description of a mapping.

    Extends ItemInfoBase with the `payloads` field, which does not take part
    in hashing.
    """
    # Result of make_payloads() for the optional 'payloads' entry.
    payloads: t.Mapping[ParsedUrl, ItemRef] = dc.field(hash=False)

    @staticmethod
    def make(
            item_obj: t.Mapping[str, t.Any],
            schema_compat: int,
            repository: str
    ) -> 'MappingInfo':
        """
        Build a MappingInfo from a mapping's JSON object.

        `schema_compat` and `repository` are forwarded to
        ItemInfoBase._get_base_init_kwargs(); a missing 'payloads' entry is
        treated as an empty object.
        """
        common_kwargs = ItemInfoBase._get_base_init_kwargs(
            item_obj,
            schema_compat,
            repository
        )
        payloads = make_payloads(item_obj.get('payloads', {}))

        return MappingInfo(**common_kwargs, payloads=payloads)

    @staticmethod
    def load(instance_or_path: 'InstanceOrPath', repository: str) \
        -> 'MappingInfo':
        """
        Load a mapping description via _load_item_info() and return it as a
        MappingInfo.
        """
        return _load_item_info(MappingInfo, instance_or_path, repository)

    # Class-level attribute (not a dataclass field); used to build paths and
    # schema file names.
    type_name = 'mapping'
# Type variable constrained to the two concrete item info classes; lets
# _load_item_info() return the same type it was asked to load.
LoadedType = t.TypeVar('LoadedType', ResourceInfo, MappingInfo)
def _load_item_info(
        info_type: t.Type[LoadedType],
        instance_or_path: InstanceOrPath,
        repository: str
) -> LoadedType:
    """
    Read and validate a mapping/resource description, then build its info
    object.

    `info_type` selects both the schema name (through its `type_name`) and
    the class whose make() creates the result. `repository` is passed on to
    make() unchanged.
    """
    loaded = json_instances.read_instance(instance_or_path)

    # The '{}' placeholder is left in the name for validate_instance().
    schema_name_fmt = \
        f'api_{info_type.type_name}_description-{{}}.schema.json'
    compat = json_instances.validate_instance(loaded, schema_name_fmt)

    # Successful validation guarantees that the instance is a dict.
    item_obj = t.cast('dict[str, t.Any]', loaded)

    return info_type.make(item_obj, compat, repository)
# Type variable constrained to the two concrete item info classes; the
# parameter of the generic VersionedItemInfo.
VersionedType = t.TypeVar('VersionedType', ResourceInfo, MappingInfo)
@dc.dataclass(frozen=True)
class VersionedItemInfo(t.Generic[VersionedType]):
    """
    Immutable collection of the known versions of one resource/mapping.

    Instances are never modified in place: register() and unregister() return
    new objects.
    """
    # uuid shared by the registered versions, if any of them declared one.
    uuid: t.Optional[str] = None
    # Identifier of the item; set by the first register() call.
    identifier: str = ''
    # Item infos keyed by their version tuples.
    _by_version: Map[VerTuple, VersionedType] = Map()
    # True for objects produced by register().
    _initialized: bool = False

    def register(self, item_info: VersionedType) -> 'VersionedInfoSelfType':
        """
        Return a new object in which `item_info` is queryable by its version.

        Raises HaketiloException when both this object and `item_info` carry
        a uuid and the two differ.
        """
        identifier = item_info.identifier
        if self._initialized:
            assert identifier == self.identifier

        own_uuid = self.uuid
        new_uuid = item_info.uuid

        if own_uuid is None:
            # No uuid known so far; adopt the incoming one (possibly None).
            uuid: t.Optional[str] = new_uuid
        else:
            if new_uuid is not None and own_uuid != new_uuid:
                raise HaketiloException(_('uuid_mismatch_{identifier}')
                                        .format(identifier=identifier))
            uuid = own_uuid

        updated = self._by_version.set(item_info.version, item_info)

        return VersionedItemInfo(
            identifier=identifier,
            uuid=uuid,
            _by_version=updated,
            _initialized=True
        )

    def unregister(self, version: VerTuple) -> 'VersionedInfoSelfType':
        """
        Return a copy of this object without the given version. Versions that
        are not registered are silently ignored.
        """
        remaining = self._by_version
        if version in remaining:
            remaining = remaining.delete(version)

        return dc.replace(self, _by_version=remaining)

    def is_empty(self) -> bool:
        """Tell whether no version is currently registered."""
        return not len(self._by_version)

    def newest_version(self) -> VerTuple:
        """
        Return the greatest registered version. At least one version must be
        registered.
        """
        assert not self.is_empty()

        return max(self._by_version)

    def get_newest(self) -> VersionedType:
        """Find and return info of the newest version of item."""
        newest_info = self._by_version[self.newest_version()]
        assert newest_info is not None

        return newest_info

    def get_by_ver(self, ver: t.Iterable[int]) -> t.Optional[VersionedType]:
        """
        Find and return info of the specified version of the item (or None if
        absent).
        """
        wanted = tuple(ver)
        return self._by_version.get(wanted)

    def get_all(self) -> t.Iterator[VersionedType]:
        """Generate item info for all its versions, from oldest to newest."""
        infos = self._by_version
        for ver in sorted(infos):
            yield infos[ver]
# VersionedItemInfo's register() and unregister() return another
# VersionedItemInfo. This alias, referenced by name in their string
# annotations, can only be defined once the class itself exists.
VersionedInfoSelfType = VersionedItemInfo[VersionedType]