diff options
Diffstat (limited to 'src/hydrilla/json_instances.py')
-rw-r--r-- | src/hydrilla/json_instances.py | 221 |
1 files changed, 221 insertions, 0 deletions
# SPDX-License-Identifier: GPL-3.0-or-later

# Handling JSON objects.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2021, 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this
# code in a proprietary program, I am not going to enforce this in
# court.

"""
This module contains utilities for reading and validation of JSON instances.
"""

import re
import json
import os
import io
import typing as t

from pathlib import Path, PurePath

from jsonschema import RefResolver, Draft7Validator # type: ignore

from .translations import smart_gettext as _
from .exceptions import HaketiloException
from . import versions

# Directory that contains this module; bundled schemas live in 'schemas' below.
here = Path(__file__).resolve().parent

# Applied to one line at a time (see strip_json_comments()).
_strip_comment_re = re.compile(r'''
^ # match from the beginning of each line
( # catch the part before '//' comment
  (?: # this group matches either a string or a single out-of-string character
    [^"/] |
    "
    (?: # this group matches any in-a-string character
      [^"\\] |          # match any normal character
      \\[^u] |          # match any escaped character like '\f' or '\n'
      \\u[a-fA-F0-9]{4} # match an escape
    )*
    "
  )*
)
# expect either end-of-line or a comment:
# * unterminated strings will cause matching to fail
# * bad comment (with '/' instead of '//') will be indicated by second group
#   having length 1 instead of 2 or 0
(//?|$)
''', re.VERBOSE)

def strip_json_comments(text: str) -> str:
    """
    Return 'text' with C++-style ('//') comments removed.

    The text is processed line by line. Slashes that occur inside JSON strings
    are left alone. A single slash ('/') outside of a string causes
    HaketiloException to be raised; its message is formatted with the 1-based
    line number and character position of the offending slash.

    A line that contains an unterminated string is passed through unchanged so
    that the error gets reported later by the json module. No other JSON
    errors are detected here.
    """
    stripped_text = []
    for line_num, line in enumerate(text.split('\n'), start=1):
        match = _strip_comment_re.match(line)

        if match is None: # unterminated string
            # ignore this error, let the json module report it
            stripped = line
        elif len(match[2]) == 1:
            # Second group is a lone '/', i.e. a malformed comment.
            msg_fmt = _('bad_json_comment_line_{line_num}_char_{char_num}')

            raise HaketiloException(msg_fmt.format(
                line_num = line_num,
                char_num = len(match[1]) + 1
            ))
        else:
            stripped = match[1]

        stripped_text.append(stripped)

    return '\n'.join(stripped_text)

# Matches names like 'some_name-1.0.1.schema.json', also at the end of a URL.
_schema_name_re = re.compile(r'''
(?P<name_base>[^/]*)
-
(?P<ver>
  (?P<major>[1-9][0-9]*)
  (?: # this repeated group matches the remaining version numbers
    \.
    (?:[1-9][0-9]*|0)
  )*
)
\.schema\.json
$
''', re.VERBOSE)

# Maps schema names (and, further below, schema URLs) to schema file paths.
# Each schema file gets registered under every prefix of its version, e.g. a
# file 'x-1.0.1.schema.json' is made available as 'x-1.schema.json',
# 'x-1.0.schema.json' and 'x-1.0.1.schema.json'.
schema_paths: t.Dict[str, Path] = {}
for path in (here / 'schemas').rglob('*.schema.json'):
    match = _schema_name_re.match(path.name)
    assert match is not None, f'unexpected schema file name: {path.name}'

    schema_name_base = match.group('name_base')
    schema_ver_list = match.group('ver').split('.')

    for i in range(1, len(schema_ver_list) + 1):
        schema_ver = '.'.join(schema_ver_list[:i])
        schema_paths[f'{schema_name_base}-{schema_ver}.schema.json'] = path

# Make every schema reachable through its full URL as well. The list has to be
# fully built before update() runs because it is derived from the very dict
# that is being modified.
schema_paths.update([(f'https://hydrilla.koszko.org/schemas/{name}', path)
                     for name, path in schema_paths.items()])

# Cache of already-loaded schemas, keyed by schema file path.
schemas: t.Dict[Path, t.Dict[str, t.Any]] = {}

class UnknownSchemaError(HaketiloException):
    """Raised when a schema name or URL is not present in 'schema_paths'."""
    pass

def _get_schema(schema_name: str) -> t.Dict[str, t.Any]:
    """
    Return loaded JSON of the requested schema. Cache results.

    'schema_name' is looked up in 'schema_paths', so it can be either a schema
    file name or a schema URL. UnknownSchemaError is raised when the lookup
    fails.
    """
    path = schema_paths.get(schema_name)
    if path is None:
        raise UnknownSchemaError(_('unknown_schema_{}').format(schema_name))

    if path not in schemas:
        schemas[path] = json.loads(path.read_text())

    return schemas[path]

def validator_for(schema: t.Union[str, t.Dict[str, t.Any]]) -> Draft7Validator:
    """
    Prepare a validator for the provided schema.

    'schema' is either an already-loaded schema object or the name/URL of one
    of the bundled schemas (in which case UnknownSchemaError is raised if no
    such schema is known). The schema object needs to have an '$id' property.

    Other bundled schemas (those found under the 'schemas' directory located
    next to this module) can be referenced through their 'https' URLs.
    """
    if isinstance(schema, str):
        schema = _get_schema(schema)

    # 'https' references get resolved locally through _get_schema() rather
    # than being handled by the resolver's default mechanism.
    resolver = RefResolver(
        base_uri=schema['$id'],
        referrer=schema,
        handlers={'https': _get_schema}
    )

    return Draft7Validator(schema, resolver=resolver)

def parse_instance(text: str) -> object:
    """Parse 'text' as JSON with additional '//' comments support."""
    return json.loads(strip_json_comments(text))

InstanceSource = t.Union[Path, str, io.TextIOBase, t.Dict[str, t.Any], bytes]

def read_instance(instance_or_path: InstanceSource) -> object:
    """
    Obtain a JSON instance from one of the supported kinds of sources.

    * A dict is considered an already-parsed instance and returned as is.
    * A bytes object is decoded using the encoding reported by
      json.detect_encoding() and then parsed.
    * A text stream is read in full, closed and its contents are parsed.
    * Anything else (a str or a Path) is treated as the path of a file to read
      and parse.

    Parsing is done with parse_instance(), i.e. '//' comments are allowed.
    HaketiloException is raised if parsing fails; when the source was a path,
    the path is included in the message. Errors that occur while reading or
    decoding the text are not converted.
    """
    if isinstance(instance_or_path, dict):
        return instance_or_path

    if isinstance(instance_or_path, bytes):
        encoding = json.detect_encoding(instance_or_path)
        text = instance_or_path.decode(encoding)
    elif isinstance(instance_or_path, io.TextIOBase):
        # The stream gets closed whether or not reading succeeds.
        with instance_or_path:
            text = instance_or_path.read()
    else:
        # NOTE(review): read_text() is called without an explicit encoding,
        # so the platform's default is used - confirm this is intended.
        text = Path(instance_or_path).read_text()

    try:
        return parse_instance(text)
    except Exception as exc:
        # Deliberately not a bare 'except:' - KeyboardInterrupt and SystemExit
        # must propagate instead of being reported as invalid JSON.
        if isinstance(instance_or_path, (str, Path)):
            fmt = _('err.util.text_in_{}_not_valid_json')
            raise HaketiloException(fmt.format(instance_or_path)) from exc

        raise HaketiloException(_('err.util.text_not_valid_json')) from exc

def get_schema_version(instance: object) -> versions.VerTuple:
    """
    Parse passed object's "$schema" property and return the schema version tuple.

    The version is taken from the '-<version>.schema.json' ending of the
    "$schema" string and converted with versions.parse_normalize().
    HaketiloException is raised if 'instance' is not a dict, if it has no
    string "$schema" property or if that property does not end with a
    versioned schema name.
    """
    if isinstance(instance, dict):
        schema_id = instance.get('$schema')

        if isinstance(schema_id, str):
            match = _schema_name_re.search(schema_id)

            if match is not None:
                return versions.parse_normalize(match.group('ver'))

    raise HaketiloException(_('no_schema_number_in_instance'))

def get_schema_major_number(instance: object) -> int:
    """
    Parse passed object's "$schema" property and return the major number of
    schema version.

    Raises HaketiloException under the same conditions as get_schema_version().
    """
    return get_schema_version(instance)[0]

def validate_instance(instance: object, schema_name_fmt: str) -> int:
    """
    Validate 'instance' against the bundled schema that matches its version.

    The major schema version number is read from the instance's "$schema"
    property and substituted into 'schema_name_fmt' with str.format() (as the
    only positional argument) to get the name of the schema to validate
    against.

    Return the major schema version number. Raise HaketiloException if the
    version cannot be determined or the resulting schema name is unknown.
    Validation failures are reported by the exception that the validator's
    validate() method raises.
    """
    major = get_schema_major_number(instance)
    schema_name = schema_name_fmt.format(major)
    validator = validator_for(schema_name)

    validator.validate(instance)

    return major