# SPDX-License-Identifier: GPL-3.0-or-later
# Handling JSON objects.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2021, 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use this code
# in a proprietary program, I am not going to enforce this in court.
"""
This module contains utilities for reading and validation of JSON instances.
"""
# Enable using with Python 3.7.
from __future__ import annotations
import re
import json
import os
import io
import typing as t
from pathlib import Path, PurePath
from jsonschema import RefResolver, Draft7Validator # type: ignore
from .translations import smart_gettext as _
from .exceptions import HaketiloException
from . import versions
here = Path(__file__).resolve().parent
_strip_comment_re = re.compile(r'''
^ # match from the beginning of each line
( # catch the part before '//' comment
(?: # this group matches either a string or a single out-of-string character
[^"/] |
"
(?: # this group matches any in-a-string character
[^"\\] | # match any normal character
\\[^u] | # match any escaped character like '\f' or '\n'
\\u[a-fA-F0-9]{4} # match an escape
)*
"
)*
)
# expect either end-of-line or a comment:
# * unterminated strings will cause matching to fail
# * bad comment (with '/' instead of '//') will be indicated by second group
# having length 1 instead of 2 or 0
(//?|$)
''', re.VERBOSE)
def strip_json_comments(text: str) -> str:
"""
Accept JSON text with optional C++-style ('//') comments and return the text
with comments removed. Consecutive slashes inside strings are handled
properly. A spurious single slash ('/') shall generate an error. Errors in
JSON itself shall be ignored.
"""
stripped_text = []
for line_num, line in enumerate(text.split('\n'), start=1):
match = _strip_comment_re.match(line)
if match is None: # unterminated string
# ignore this error, let the json module report it
stripped = line
elif len(match[2]) == 1:
msg_fmt = _('bad_json_comment_line_{line_num}_char_{char_num}')
raise HaketiloException(msg_fmt.format(
line_num = line_num,
char_num = len(match[1]) + 1
))
else:
stripped = match[1]
stripped_text.append(stripped)
return '\n'.join(stripped_text)
_schema_name_re = re.compile(r'''
(?P[^/]*)
-
(?P
(?P[1-9][0-9]*)
(?: # this repeated group matches the remaining version numbers
\.
(?:[1-9][0-9]*|0)
)*
)
\.schema\.json
$
''', re.VERBOSE)
schema_paths: dict[str, Path] = {}
for path in (here / 'schemas').rglob('*.schema.json'):
match = _schema_name_re.match(path.name)
assert match is not None
schema_name_base = match.group('name_base')
schema_ver_list = match.group('ver').split('.')
for i in range(len(schema_ver_list)):
schema_ver = '.'.join(schema_ver_list[:i+1])
schema_paths[f'{schema_name_base}-{schema_ver}.schema.json'] = path
schema_paths.update([(f'https://hydrilla.koszko.org/schemas/{name}', path)
for name, path in schema_paths.items()])
schemas: dict[Path, dict[str, t.Any]] = {}
class UnknownSchemaError(HaketiloException):
pass
def _get_schema(schema_name: str) -> dict[str, t.Any]:
"""Return loaded JSON of the requested schema. Cache results."""
path = schema_paths.get(schema_name)
if path is None:
raise UnknownSchemaError(_('unknown_schema_{}').format(schema_name))
if path not in schemas:
schemas[path] = json.loads(path.read_text())
return schemas[path]
def validator_for(schema: t.Union[str, dict[str, t.Any]]) -> Draft7Validator:
"""
Prepare a validator for the provided schema.
Other schemas under '../schemas' can be referenced.
"""
if isinstance(schema, str):
schema = _get_schema(schema)
resolver = RefResolver(
base_uri=schema['$id'],
referrer=schema,
handlers={'https': _get_schema}
)
return Draft7Validator(schema, resolver=resolver)
def parse_instance(text: str) -> object:
"""Parse 'text' as JSON with additional '//' comments support."""
return json.loads(strip_json_comments(text))
InstanceSource = t.Union[Path, str, io.TextIOBase, dict[str, t.Any], bytes]
def read_instance(instance_or_path: InstanceSource) -> object:
"""...."""
if isinstance(instance_or_path, dict):
return instance_or_path
if isinstance(instance_or_path, bytes):
encoding = json.detect_encoding(instance_or_path)
text = instance_or_path.decode(encoding)
elif isinstance(instance_or_path, io.TextIOBase):
try:
text = instance_or_path.read()
finally:
instance_or_path.close()
else:
text = Path(instance_or_path).read_text()
try:
return parse_instance(text)
except:
if isinstance(instance_or_path, str) or \
isinstance(instance_or_path, Path):
fmt = _('err.util.text_in_{}_not_valid_json')
raise HaketiloException(fmt.format(instance_or_path))
else:
raise HaketiloException(_('err.util.text_not_valid_json'))
def get_schema_version(instance: object) -> tuple[int, ...]:
"""
Parse passed object's "$schema" property and return the schema version tuple.
"""
ver_str: t.Optional[str] = None
if isinstance(instance, dict) and type(instance.get('$schema')) is str:
match = _schema_name_re.search(instance['$schema'])
ver_str = match.group('ver') if match else None
if ver_str is not None:
return versions.parse(ver_str)
else:
raise HaketiloException(_('no_schema_number_in_instance'))
def get_schema_major_number(instance: object) -> int:
"""
Parse passed object's "$schema" property and return the major number of
schema version.
"""
return get_schema_version(instance)[0]
def validate_instance(instance: object, schema_name_fmt: str) -> int:
"""...."""
major = get_schema_major_number(instance)
schema_name = schema_name_fmt.format(major)
validator = validator_for(schema_name)
validator.validate(instance)
return major