aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/json_instances.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/hydrilla/json_instances.py')
-rw-r--r--src/hydrilla/json_instances.py221
1 files changed, 221 insertions, 0 deletions
diff --git a/src/hydrilla/json_instances.py b/src/hydrilla/json_instances.py
new file mode 100644
index 0000000..b56a7e1
--- /dev/null
+++ b/src/hydrilla/json_instances.py
@@ -0,0 +1,221 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Handling JSON objects.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2021, 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use of this
+# code in a proprietary program, I am not going to enforce this in
+# court.
+
+"""
+This module contains utilities for reading and validation of JSON instances.
+"""
+
+import re
+import json
+import os
+import io
+import typing as t
+
+from pathlib import Path, PurePath
+
+from jsonschema import RefResolver, Draft7Validator # type: ignore
+
+from .translations import smart_gettext as _
+from .exceptions import HaketiloException
+from . import versions
+
+here = Path(__file__).resolve().parent
+
+_strip_comment_re = re.compile(r'''
+^ # match from the beginning of each line
+( # catch the part before '//' comment
+ (?: # this group matches either a string or a single out-of-string character
+ [^"/] |
+ "
+ (?: # this group matches any in-a-string character
+ [^"\\] | # match any normal character
+ \\[^u] | # match any escaped character like '\f' or '\n'
+ \\u[a-fA-F0-9]{4} # match an escape
+ )*
+ "
+ )*
+)
+# expect either end-of-line or a comment:
+# * unterminated strings will cause matching to fail
+# * bad comment (with '/' instead of '//') will be indicated by second group
+# having length 1 instead of 2 or 0
+(//?|$)
+''', re.VERBOSE)
+
+def strip_json_comments(text: str) -> str:
+ """
+ Accept JSON text with optional C++-style ('//') comments and return the text
+ with comments removed. Consecutive slashes inside strings are handled
+ properly. A spurious single slash ('/') shall generate an error. Errors in
+ JSON itself shall be ignored.
+ """
+ stripped_text = []
+ for line_num, line in enumerate(text.split('\n'), start=1):
+ match = _strip_comment_re.match(line)
+
+ if match is None: # unterminated string
+ # ignore this error, let the json module report it
+ stripped = line
+ elif len(match[2]) == 1:
+ msg_fmt = _('bad_json_comment_line_{line_num}_char_{char_num}')
+
+ raise HaketiloException(msg_fmt.format(
+ line_num = line_num,
+ char_num = len(match[1]) + 1
+ ))
+ else:
+ stripped = match[1]
+
+ stripped_text.append(stripped)
+
+ return '\n'.join(stripped_text)
+
+_schema_name_re = re.compile(r'''
+(?P<name_base>[^/]*)
+-
+(?P<ver>
+ (?P<major>[1-9][0-9]*)
+ (?: # this repeated group matches the remaining version numbers
+ \.
+ (?:[1-9][0-9]*|0)
+ )*
+)
+\.schema\.json
+$
+''', re.VERBOSE)
+
+schema_paths: t.Dict[str, Path] = {}
+for path in (here / 'schemas').rglob('*.schema.json'):
+ match = _schema_name_re.match(path.name)
+ assert match is not None
+
+ schema_name_base = match.group('name_base')
+ schema_ver_list = match.group('ver').split('.')
+
+ for i in range(len(schema_ver_list)):
+ schema_ver = '.'.join(schema_ver_list[:i+1])
+ schema_paths[f'{schema_name_base}-{schema_ver}.schema.json'] = path
+
+schema_paths.update([(f'https://hydrilla.koszko.org/schemas/{name}', path)
+ for name, path in schema_paths.items()])
+
+schemas: t.Dict[Path, t.Dict[str, t.Any]] = {}
+
+class UnknownSchemaError(HaketiloException):
+ pass
+
+def _get_schema(schema_name: str) -> t.Dict[str, t.Any]:
+ """Return loaded JSON of the requested schema. Cache results."""
+ path = schema_paths.get(schema_name)
+ if path is None:
+ raise UnknownSchemaError(_('unknown_schema_{}').format(schema_name))
+
+ if path not in schemas:
+ schemas[path] = json.loads(path.read_text())
+
+ return schemas[path]
+
+def validator_for(schema: t.Union[str, t.Dict[str, t.Any]]) -> Draft7Validator:
+ """
+ Prepare a validator for the provided schema.
+
+ Other schemas under '../schemas' can be referenced.
+ """
+ if isinstance(schema, str):
+ schema = _get_schema(schema)
+
+ resolver = RefResolver(
+ base_uri=schema['$id'],
+ referrer=schema,
+ handlers={'https': _get_schema}
+ )
+
+ return Draft7Validator(schema, resolver=resolver)
+
+def parse_instance(text: str) -> object:
+ """Parse 'text' as JSON with additional '//' comments support."""
+ return json.loads(strip_json_comments(text))
+
+InstanceSource = t.Union[Path, str, io.TextIOBase, t.Dict[str, t.Any], bytes]
+
+def read_instance(instance_or_path: InstanceSource) -> object:
+ """...."""
+ if isinstance(instance_or_path, dict):
+ return instance_or_path
+
+ if isinstance(instance_or_path, bytes):
+ encoding = json.detect_encoding(instance_or_path)
+ text = instance_or_path.decode(encoding)
+ elif isinstance(instance_or_path, io.TextIOBase):
+ try:
+ text = instance_or_path.read()
+ finally:
+ instance_or_path.close()
+ else:
+ text = Path(instance_or_path).read_text()
+
+ try:
+ return parse_instance(text)
+ except:
+ if isinstance(instance_or_path, str) or \
+ isinstance(instance_or_path, Path):
+ fmt = _('err.util.text_in_{}_not_valid_json')
+ raise HaketiloException(fmt.format(instance_or_path))
+ else:
+ raise HaketiloException(_('err.util.text_not_valid_json'))
+
+def get_schema_version(instance: object) -> versions.VerTuple:
+ """
+ Parse passed object's "$schema" property and return the schema version tuple.
+ """
+ ver_str: t.Optional[str] = None
+
+ if isinstance(instance, dict) and type(instance.get('$schema')) is str:
+ match = _schema_name_re.search(instance['$schema'])
+ ver_str = match.group('ver') if match else None
+
+ if ver_str is not None:
+ return versions.parse_normalize(ver_str)
+ else:
+ raise HaketiloException(_('no_schema_number_in_instance'))
+
+def get_schema_major_number(instance: object) -> int:
+ """
+ Parse passed object's "$schema" property and return the major number of
+ schema version.
+ """
+ return get_schema_version(instance)[0]
+
+def validate_instance(instance: object, schema_name_fmt: str) -> int:
+ """...."""
+ major = get_schema_major_number(instance)
+ schema_name = schema_name_fmt.format(major)
+ validator = validator_for(schema_name)
+
+ validator.validate(instance)
+
+ return major