aboutsummaryrefslogtreecommitdiff
# SPDX-License-Identifier: GPL-3.0-or-later

# Handling JSON objects.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2021, 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this
# code in a proprietary program, I am not going to enforce this in
# court.

"""
This module contains utilities for reading and validation of JSON instances.
"""

import re
import json
import os
import io
import typing as t

from pathlib import Path, PurePath

from jsonschema import RefResolver, Draft7Validator # type: ignore

from .translations import smart_gettext as _
from .exceptions import HaketiloException
from . import versions

here = Path(__file__).resolve().parent

_strip_comment_re = re.compile(r'''
^ # match from the beginning of each line
( # catch the part before '//' comment
  (?: # this group matches either a string or a single out-of-string character
    [^"/] |
    "
    (?: # this group matches any in-a-string character
      [^"\\] |          # match any normal character
      \\[^u] |          # match any escaped character like '\f' or '\n'
      \\u[a-fA-F0-9]{4} # match an escape
    )*
    "
  )*
)
# expect either end-of-line or a comment:
# * unterminated strings will cause matching to fail
# * bad comment (with '/' instead of '//') will be indicated by second group
#   having length 1 instead of 2 or 0
(//?|$)
''', re.VERBOSE)

def strip_json_comments(text: str) -> str:
    """
    Accept JSON text with optional C++-style ('//') comments and return the text
    with comments removed. Consecutive slashes inside strings are handled
    properly. A spurious single slash ('/') shall generate an error. Errors in
    JSON itself shall be ignored.
    """
    stripped_text = []
    for line_num, line in enumerate(text.split('\n'), start=1):
        match = _strip_comment_re.match(line)

        if match is None: # unterminated string
            # ignore this error, let the json module report it
            stripped = line
        elif len(match[2]) == 1:
            msg_fmt = _('bad_json_comment_line_{line_num}_char_{char_num}')

            raise HaketiloException(msg_fmt.format(
                line_num = line_num,
                char_num = len(match[1]) + 1
            ))
        else:
            stripped = match[1]

        stripped_text.append(stripped)

    return '\n'.join(stripped_text)

_schema_name_re = re.compile(r'''
(?P<name_base>[^/]*)
-
(?P<ver>
  (?P<major>[1-9][0-9]*)
  (?: # this repeated group matches the remaining version numbers
    \.
    (?:[1-9][0-9]*|0)
  )*
)
\.schema\.json
$
''', re.VERBOSE)

schema_paths: t.Dict[str, Path] = {}
for path in (here / 'schemas').rglob('*.schema.json'):
    match = _schema_name_re.match(path.name)
    assert match is not None

    schema_name_base = match.group('name_base')
    schema_ver_list = match.group('ver').split('.')

    for i in range(len(schema_ver_list)):
        schema_ver = '.'.join(schema_ver_list[:i+1])
        schema_paths[f'{schema_name_base}-{schema_ver}.schema.json'] = path

schema_paths.update([(f'https://hydrilla.koszko.org/schemas/{name}', path)
                     for name, path in schema_paths.items()])

schemas: t.Dict[Path, t.Dict[str, t.Any]] = {}

class UnknownSchemaError(HaketiloException):
    pass

def _get_schema(schema_name: str) -> t.Dict[str, t.Any]:
    """Return loaded JSON of the requested schema. Cache results."""
    path = schema_paths.get(schema_name)
    if path is None:
        raise UnknownSchemaError(_('unknown_schema_{}').format(schema_name))

    if path not in schemas:
        schemas[path] = json.loads(path.read_text())

    return schemas[path]

def validator_for(schema: t.Union[str, t.Dict[str, t.Any]]) -> Draft7Validator:
    """
    Prepare a validator for the provided schema.

    Other schemas under '../schemas' can be referenced.
    """
    if isinstance(schema, str):
        schema = _get_schema(schema)

    resolver = RefResolver(
        base_uri=schema['$id'],
        referrer=schema,
        handlers={'https': _get_schema}
    )

    return Draft7Validator(schema, resolver=resolver)

def parse_instance(text: str) -> object:
    """Parse 'text' as JSON with additional '//' comments support."""
    return json.loads(strip_json_comments(text))

InstanceSource = t.Union[Path, str, io.TextIOBase, t.Dict[str, t.Any], bytes]

def read_instance(instance_or_path: InstanceSource) -> object:
    """...."""
    if isinstance(instance_or_path, dict):
        return instance_or_path

    if isinstance(instance_or_path, bytes):
        encoding = json.detect_encoding(instance_or_path)
        text = instance_or_path.decode(encoding)
    elif isinstance(instance_or_path, io.TextIOBase):
        try:
            text = instance_or_path.read()
        finally:
            instance_or_path.close()
    else:
        text = Path(instance_or_path).read_text()

    try:
        return parse_instance(text)
    except:
        if isinstance(instance_or_path, str) or \
           isinstance(instance_or_path, Path):
            fmt = _('err.util.text_in_{}_not_valid_json')
            raise HaketiloException(fmt.format(instance_or_path))
        else:
            raise HaketiloException(_('err.util.text_not_valid_json'))

def get_schema_version(instance: object) -> versions.VerTuple:
    """
    Parse passed object's "$schema" property and return the schema version tuple.
    """
    ver_str: t.Optional[str] = None

    if isinstance(instance, dict) and type(instance.get('$schema')) is str:
        match = _schema_name_re.search(instance['$schema'])
        ver_str = match.group('ver') if match else None

    if ver_str is not None:
        return versions.parse_normalize(ver_str)
    else:
        raise HaketiloException(_('no_schema_number_in_instance'))

def get_schema_major_number(instance: object) -> int:
    """
    Parse passed object's "$schema" property and return the major number of
    schema version.
    """
    return get_schema_version(instance)[0]

def validate_instance(instance: object, schema_name_fmt: str) -> int:
    """...."""
    major = get_schema_major_number(instance)
    schema_name = schema_name_fmt.format(major)
    validator = validator_for(schema_name)

    validator.validate(instance)

    return major