aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorWojtek Kosior <koszko@koszko.org>2022-06-13 11:06:49 +0200
committerWojtek Kosior <koszko@koszko.org>2022-07-16 16:31:44 +0200
commit52d12a4fa124daa1595529e3e7008276a7986d95 (patch)
tree9b56fe2d28ff0242f8511aca570be455112ad3df /tests
parent9dcbfdfe8620cc417438d1727aa1e0c89846e9bf (diff)
downloadhaketilo-hydrilla-52d12a4fa124daa1595529e3e7008276a7986d95.tar.gz
haketilo-hydrilla-52d12a4fa124daa1595529e3e7008276a7986d95.zip
unfinished partial work
Diffstat (limited to 'tests')
-rw-r--r--tests/helpers.py51
-rw-r--r--tests/test_build.py818
-rw-r--r--tests/test_item_infos.py527
-rw-r--r--tests/test_json_instances.py194
-rw-r--r--tests/test_local_apt.py754
-rw-r--r--tests/test_pattern_tree.py454
-rw-r--r--tests/test_server.py30
-rw-r--r--tests/test_url_patterns.py188
-rw-r--r--tests/test_versions.py41
-rw-r--r--tests/url_patterns_common.py23
10 files changed, 3061 insertions, 19 deletions
diff --git a/tests/helpers.py b/tests/helpers.py
new file mode 100644
index 0000000..df474b0
--- /dev/null
+++ b/tests/helpers.py
@@ -0,0 +1,51 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+import re
+
+variable_word_re = re.compile(r'^<(.+)>$')
+
+def process_command(command, expected_command):
+ """Validate the command line and extract its variable parts (if any)."""
+ assert len(command) == len(expected_command)
+
+ extracted = {}
+ for word, expected_word in zip(command, expected_command):
+ match = variable_word_re.match(expected_word)
+ if match:
+ extracted[match.group(1)] = word
+ else:
+ assert word == expected_word
+
+ return extracted
+
+def run_missing_executable(command, **kwargs):
+ """
+ Instead of running a command, raise FileNotFoundError as if its executable
+ was missing.
+ """
+ raise FileNotFoundError('dummy')
+
+class MockedCompletedProcess:
+ """
+ Object with some fields similar to those of subprocess.CompletedProcess.
+ """
+ def __init__(self, args, returncode=0,
+ stdout='some output', stderr='some error output',
+ text_output=True):
+ """
+ Initialize MockedCompletedProcess. Convert strings to bytes if needed.
+ """
+ self.args = args
+ self.returncode = returncode
+
+ if type(stdout) is str and not text_output:
+ stdout = stdout.encode()
+ if type(stderr) is str and not text_output:
+ stderr = stderr.encode()
+
+ self.stdout = stdout
+ self.stderr = stderr
diff --git a/tests/test_build.py b/tests/test_build.py
new file mode 100644
index 0000000..868594e
--- /dev/null
+++ b/tests/test_build.py
@@ -0,0 +1,818 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import pytest
+import json
+import shutil
+import functools as ft
+
+from tempfile import TemporaryDirectory
+from pathlib import Path, PurePosixPath
+from hashlib import sha256
+from zipfile import ZipFile
+from contextlib import contextmanager
+
+from jsonschema import ValidationError
+
+from hydrilla import _version, json_instances, versions
+from hydrilla.json_instances import _schema_name_re
+from hydrilla.builder import build, local_apt
+from hydrilla.builder.common_errors import *
+
+from .helpers import *
+
+here = Path(__file__).resolve().parent
+
+expected_generated_by = {
+ 'name': 'hydrilla.builder',
+ 'version': _version.version
+}
+
+orig_srcdir = here / 'source-package-example'
+
+index_obj = json_instances.read_instance(orig_srcdir / 'index.json')
+
+def read_files(*file_list):
+ """
+ Take names of files under srcdir and return a dict that maps them to their
+ contents (as bytes).
+ """
+ return dict((name, (orig_srcdir / name).read_bytes()) for name in file_list)
+
+dist_files = {
+ **read_files('LICENSES/CC0-1.0.txt', 'bye.js', 'hello.js', 'message.js'),
+ 'report.spdx': b'dummy spdx output'
+}
+src_files = {
+ **dist_files,
+ **read_files('README.txt', 'README.txt.license', '.reuse/dep5',
+ 'index.json')
+}
+extra_archive_files = {
+}
+
+sha256_hashes = dict((name, sha256(contents).digest().hex())
+ for name, contents in src_files.items())
+
+del src_files['report.spdx']
+
+expected_source_copyright = [{
+ 'file': 'report.spdx',
+ 'sha256': sha256_hashes['report.spdx']
+}, {
+ 'file': 'LICENSES/CC0-1.0.txt',
+ 'sha256': sha256_hashes['LICENSES/CC0-1.0.txt']
+}]
+
+expected_resources = [{
+ '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json',
+ 'source_name': 'hello',
+ 'source_copyright': expected_source_copyright,
+ 'type': 'resource',
+ 'identifier': 'helloapple',
+ 'long_name': 'Hello Apple',
+ 'uuid': 'a6754dcb-58d8-4b7a-a245-24fd7ad4cd68',
+ 'version': [2021, 11, 10],
+ 'revision': 1,
+ 'description': 'greets an apple',
+ 'dependencies': [{'identifier': 'hello-message'}],
+ 'scripts': [{
+ 'file': 'hello.js',
+ 'sha256': sha256_hashes['hello.js']
+ }, {
+ 'file': 'bye.js',
+ 'sha256': sha256_hashes['bye.js']
+ }],
+ 'generated_by': expected_generated_by
+}, {
+ '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json',
+ 'source_name': 'hello',
+ 'source_copyright': expected_source_copyright,
+ 'type': 'resource',
+ 'identifier': 'hello-message',
+ 'long_name': 'Hello Message',
+ 'uuid': '1ec36229-298c-4b35-8105-c4f2e1b9811e',
+ 'version': [2021, 11, 10],
+ 'revision': 2,
+ 'description': 'define messages for saying hello and bye',
+ 'dependencies': [],
+ 'scripts': [{
+ 'file': 'message.js',
+ 'sha256': sha256_hashes['message.js']
+ }],
+ 'generated_by': expected_generated_by
+}]
+
+expected_mapping = {
+ '$schema': 'https://hydrilla.koszko.org/schemas/api_mapping_description-1.schema.json',
+ 'source_name': 'hello',
+ 'source_copyright': expected_source_copyright,
+ 'type': 'mapping',
+ 'identifier': 'helloapple',
+ 'long_name': 'Hello Apple',
+ 'uuid': '54d23bba-472e-42f5-9194-eaa24c0e3ee7',
+ 'version': [2021, 11, 10],
+ 'description': 'causes apple to get greeted on Hydrillabugs issue tracker',
+ 'payloads': {
+ 'https://hydrillabugs.koszko.org/***': {
+ 'identifier': 'helloapple'
+ },
+ 'https://hachettebugs.koszko.org/***': {
+ 'identifier': 'helloapple'
+ }
+ },
+ 'generated_by': expected_generated_by
+}
+
+expected_source_description = {
+ '$schema': 'https://hydrilla.koszko.org/schemas/api_source_description-1.schema.json',
+ 'source_name': 'hello',
+ 'source_copyright': expected_source_copyright,
+ 'source_archives': {
+ 'zip': {
+ 'sha256': '!!!!value to fill during test!!!!',
+ }
+ },
+ 'upstream_url': 'https://git.koszko.org/hydrilla-source-package-example',
+ 'definitions': [{
+ 'type': 'mapping',
+ 'identifier': 'helloapple',
+ 'long_name': 'Hello Apple',
+ 'version': [2021, 11, 10],
+ }, {
+ 'type': 'resource',
+ 'identifier': 'helloapple',
+ 'long_name': 'Hello Apple',
+ 'version': [2021, 11, 10],
+ }, {
+ 'type': 'resource',
+ 'identifier': 'hello-message',
+ 'long_name': 'Hello Message',
+ 'version': [2021, 11, 10],
+ }],
+ 'generated_by': expected_generated_by
+}
+
+expected = [expected_mapping, *expected_resources, expected_source_description]
+expected_items = expected[:3]
+
+def run_reuse(command, **kwargs):
+ """
+ Instead of running a 'reuse' command, check if 'mock_reuse_missing' file
+ exists under root directory. If yes, raise FileNotFoundError as if 'reuse'
+ command was missing. If not, check if 'README.txt.license' file exists
+ in the requested directory and return zero if it does.
+ """
+ expected = ['reuse', '--root', '<root>',
+ 'lint' if 'lint' in command else 'spdx']
+
+ root_path = Path(process_command(command, expected)['root'])
+
+ if (root_path / 'mock_reuse_missing').exists():
+ raise FileNotFoundError('dummy')
+
+ is_reuse_compliant = (root_path / 'README.txt.license').exists()
+
+ return MockedCompletedProcess(command, 1 - is_reuse_compliant,
+ stdout=f'dummy {expected[-1]} output',
+ text_output=kwargs.get('text'))
+
+mocked_piggybacked_archives = [
+ PurePosixPath('apt/something.deb'),
+ PurePosixPath('apt/something.orig.tar.gz'),
+ PurePosixPath('apt/something.debian.tar.xz'),
+ PurePosixPath('othersystem/other-something.tar.gz')
+]
+
+@pytest.fixture
+def mock_piggybacked_apt_system(monkeypatch):
+ """Make local_apt.piggybacked_system() return a mocked result."""
+ # We set 'td' to a temporary dir path further below.
+ td = None
+
+ class MockedPiggybacked:
+ """Minimal mock of Piggybacked object."""
+ package_license_files = [PurePosixPath('.apt-root/.../copyright')]
+ resource_must_depend = [{'identifier': 'apt-common-licenses'}]
+
+ def resolve_file(path):
+ """
+ For each path that starts with '.apt-root' return a valid dummy file
+ path.
+ """
+ if path.parts[0] != '.apt-root':
+ return None
+
+ (td / path.name).write_text(f'dummy {path.name}')
+
+ return (td / path.name)
+
+ def archive_files():
+ """Yield some valid dummy file path tuples."""
+ for desired_path in mocked_piggybacked_archives:
+ real_path = td / desired_path.name
+ real_path.write_text(f'dummy {desired_path.name}')
+
+ yield desired_path, real_path
+
+ @contextmanager
+ def mocked_piggybacked_system(piggyback_def, piggyback_files):
+ """Mock the execution of local_apt.piggybacked_system()."""
+ assert piggyback_def == {
+ 'system': 'apt',
+ 'distribution': 'nabia',
+ 'packages': ['somelib=1.0'],
+ 'dependencies': False
+ }
+ if piggyback_files is not None:
+ assert {str(path) for path in mocked_piggybacked_archives} == \
+ {path.relative_to(piggyback_files).as_posix()
+ for path in piggyback_files.rglob('*') if path.is_file()}
+
+ yield MockedPiggybacked
+
+ monkeypatch.setattr(local_apt, 'piggybacked_system',
+ mocked_piggybacked_system)
+
+ with TemporaryDirectory() as td:
+ td = Path(td)
+ yield
+
+@pytest.fixture
+def sample_source():
+ """Prepare a directory with sample Haketilo source package."""
+ with TemporaryDirectory() as td:
+ sample_source = Path(td) / 'hello'
+ for name, contents in src_files.items():
+ path = sample_source / name
+ path.parent.mkdir(parents=True, exist_ok=True)
+ path.write_bytes(contents)
+
+ yield sample_source
+
+def collect(list):
+ """Decorate function by appending it to the specified list."""
+ def decorator(function):
+ """The actual decorator that will be applied."""
+ list.append(function)
+ return function
+
+ return decorator
+
+variant_makers = []
+
+@collect(variant_makers)
+def sample_source_change_index_json(monkeypatch, sample_source):
+ """
+ Return a non-standard path for index.json. Ensure parent directories exist.
+ """
+ # Use a path under sample_source so that it gets auto-deleted after the
+ # test. Use a file under .git because .git is ignored by REUSE.
+ path = sample_source / '.git' / 'replacement.json'
+ path.parent.mkdir()
+ return path
+
+@collect(variant_makers)
+def sample_source_add_comments(monkeypatch, sample_source):
+ """Add index.json comments that should be preserved."""
+ for dictionary in index_obj, *index_obj['definitions'], *expected:
+ monkeypatch.setitem(dictionary, 'comment', 'index.json comment')
+
+@collect(variant_makers)
+def sample_source_remove_spdx(monkeypatch, sample_source):
+ """Remove spdx report generation."""
+ monkeypatch.delitem(index_obj, 'reuse_generate_spdx_report')
+
+ pred = lambda ref: ref['file'] != 'report.spdx'
+ copy_refs_in = list(filter(pred, index_obj['copyright']))
+ monkeypatch.setitem(index_obj, 'copyright', copy_refs_in)
+
+ copy_refs_out = list(filter(pred, expected_source_copyright))
+ for obj in expected:
+ monkeypatch.setitem(obj, 'source_copyright', copy_refs_out)
+
+ monkeypatch.delitem(dist_files, 'report.spdx')
+
+ # To verify that reuse does not get called now, make mocked subprocess.run()
+ # raise an error if called.
+ (sample_source / 'mock_reuse_missing').touch()
+
+@collect(variant_makers)
+def sample_source_remove_additional_files(monkeypatch, sample_source):
+ """Use default value ([]) for 'additionall_files' property."""
+ monkeypatch.delitem(index_obj, 'additional_files')
+
+ for name in 'README.txt', 'README.txt.license', '.reuse/dep5':
+ monkeypatch.delitem(src_files, name)
+
+@collect(variant_makers)
+def sample_source_remove_script(monkeypatch, sample_source):
+ """Use default value ([]) for 'scripts' property in one of the resources."""
+ monkeypatch.delitem(index_obj['definitions'][2], 'scripts')
+
+ monkeypatch.setitem(expected_resources[1], 'scripts', [])
+
+ for files in dist_files, src_files:
+ monkeypatch.delitem(files, 'message.js')
+
+@collect(variant_makers)
+def sample_source_remove_payloads(monkeypatch, sample_source):
+ """Use default value ({}) for 'payloads' property in mapping."""
+ monkeypatch.delitem(index_obj['definitions'][0], 'payloads')
+
+ monkeypatch.setitem(expected_mapping, 'payloads', {})
+
+@collect(variant_makers)
+def sample_source_remove_uuids(monkeypatch, sample_source):
+ """Don't use UUIDs (they are optional)."""
+ for definition in index_obj['definitions']:
+ monkeypatch.delitem(definition, 'uuid')
+
+ for description in expected:
+ if 'uuid' in description:
+ monkeypatch.delitem(description, 'uuid')
+
+@collect(variant_makers)
+def sample_source_add_extra_props(monkeypatch, sample_source):
+ """Add some unrecognized properties that should be stripped."""
+ to_process = [index_obj]
+ while to_process:
+ processed = to_process.pop()
+
+ if type(processed) is list:
+ to_process.extend(processed)
+ elif type(processed) is dict and 'spurious_property' not in processed:
+ to_process.extend(v for k, v in processed.items()
+ if k != 'payloads')
+ monkeypatch.setitem(processed, 'spurious_property', 'some_value')
+
+@collect(variant_makers)
+def sample_source_make_version_2(monkeypatch, sample_source,
+ expected_documents_to_modify=[]):
+ """Increase sources' schema version from 1 to 2."""
+ for obj in index_obj, *expected_documents_to_modify:
+ monkeypatch.setitem(obj, '$schema', obj['$schema'].replace('1', '2'))
+
+permission_variant_makers = []
+
+@collect(permission_variant_makers)
+def sample_source_bool_perm_ignored(permission, monkeypatch, sample_source,
+ value=True):
+ """
+ Specify a boolean permissions in sources, but keep sources' schema version
+ at 1.
+ """
+ for definition in index_obj['definitions']:
+ monkeypatch.setitem(definition, 'permissions', {permission: value})
+
+@collect(permission_variant_makers)
+def sample_source_bool_perm(permission, monkeypatch, sample_source):
+ """Specify a boolean permission in sources."""
+ sample_source_bool_perm_ignored(permission, monkeypatch, sample_source)
+ sample_source_make_version_2(monkeypatch, sample_source, expected_items)
+
+ for obj in expected_items:
+ monkeypatch.setitem(obj, 'permissions', {permission: True})
+
+@collect(permission_variant_makers)
+def sample_source_bool_perm_defaults(permission, monkeypatch, sample_source):
+ """
+ Specify a boolean permission in sources but use the default value ("False").
+ """
+ sample_source_bool_perm_ignored(permission, monkeypatch, sample_source,
+ value=False)
+ sample_source_make_version_2(monkeypatch, sample_source)
+
+for permission in 'cors_bypass', 'eval':
+ for variant_maker in permission_variant_makers:
+ variant_makers.append(ft.partial(variant_maker, permission))
+
+@collect(variant_makers)
+def sample_source_req_mappings_ignored(monkeypatch, sample_source,
+ value=[{'identifier': 'mapping-dep'}]):
+ """
+ Specify dependencies on mappings, but keep sources' schema version at 1.
+ """
+ for definition in index_obj['definitions']:
+ monkeypatch.setitem(definition, 'required_mappings', value);
+
+@collect(variant_makers)
+def sample_source_req_mappings(monkeypatch, sample_source):
+ """Specify dependencies on mappings."""
+ sample_source_req_mappings_ignored(monkeypatch, sample_source)
+ sample_source_make_version_2(monkeypatch, sample_source, expected_items)
+
+ for obj in expected_items:
+ monkeypatch.setitem(obj, 'required_mappings',
+ [{'identifier': 'mapping-dep'}])
+
+@collect(variant_makers)
+def sample_source_req_mappings_defaults(monkeypatch, sample_source):
+ """Specify dependencies of a mapping, but use the default value ("[]")."""
+ sample_source_req_mappings_ignored(monkeypatch, sample_source, value=[])
+ sample_source_make_version_2(monkeypatch, sample_source)
+
+@collect(variant_makers)
+def sample_source_combined_def(monkeypatch, sample_source):
+ """Define mapping and resource together."""
+ sample_source_make_version_2(monkeypatch, sample_source)
+
+ mapping_def = index_obj['definitions'][0]
+ resource_defs = index_obj['definitions'][1:3]
+
+ item_defs_shortened = [mapping_def, resource_defs[1]]
+ monkeypatch.setitem(index_obj, 'definitions', item_defs_shortened)
+
+ monkeypatch.setitem(mapping_def, 'type', 'mapping_and_resource')
+
+ new_mapping_ver = [*expected_mapping['version'], 1]
+ monkeypatch.setitem(mapping_def, 'revision', 1)
+ monkeypatch.setitem(expected_mapping, 'version', new_mapping_ver)
+
+ for prop in 'scripts', 'dependencies':
+ monkeypatch.setitem(mapping_def, prop, resource_defs[0][prop])
+
+ monkeypatch.setitem(expected_resources[0], 'uuid', mapping_def['uuid'])
+ monkeypatch.setitem(expected_resources[0], 'description',
+ mapping_def['description'])
+
+ monkeypatch.setitem(expected_source_description['definitions'][0],
+ 'version', new_mapping_ver)
+
+@collect(variant_makers)
+def sample_source_minmax_haketilo_ver_ignored(monkeypatch, sample_source,
+ min_ver=[1, 2], max_ver=[1, 2]):
+ """
+ Specify version constraints on Haketilo, but keep sources' schema version at
+ 1.
+ """
+ mapping_def = index_obj['definitions'][0]
+ monkeypatch.setitem(mapping_def, 'min_haketilo_version', min_ver)
+ monkeypatch.setitem(mapping_def, 'max_haketilo_version', max_ver)
+
+@collect(variant_makers)
+def sample_source_minmax_haketilo_ver(monkeypatch, sample_source):
+ """Specify version constraints on Haketilo."""
+ sample_source_minmax_haketilo_ver_ignored(monkeypatch, sample_source)
+ sample_source_make_version_2(monkeypatch, sample_source, [expected_mapping])
+
+ monkeypatch.setitem(expected_mapping, 'min_haketilo_version', [1, 2])
+ monkeypatch.setitem(expected_mapping, 'max_haketilo_version', [1, 2])
+
+@collect(variant_makers)
+def sample_source_minmax_haketilo_ver_default(monkeypatch, sample_source):
+ """Specify version constraints on Haketilo, but use default values."""
+ sample_source_minmax_haketilo_ver_ignored(monkeypatch, sample_source,
+ min_ver=[1], max_ver=[65536])
+ sample_source_make_version_2(monkeypatch, sample_source)
+
+piggyback_archive_names = [
+ 'apt/something.deb',
+ 'apt/something.orig.tar.gz',
+ 'apt/something.debian.tar.xz',
+ 'othersystem/other-something.tar.gz'
+]
+
+@collect(variant_makers)
+def sample_source_add_piggyback_ignored(monkeypatch, sample_source,
+ extra_build_args={}):
+ """
+ Add piggybacked foreign system packages, but keep sources' schema version at
+ 1.
+ """
+ old_build = build.Build
+ new_build = lambda *a, **kwa: old_build(*a, **kwa, **extra_build_args)
+ monkeypatch.setattr(build, 'Build', new_build)
+
+ monkeypatch.setitem(index_obj, 'piggyback_on', {
+ 'system': 'apt',
+ 'distribution': 'nabia',
+ 'packages': ['somelib=1.0'],
+ 'dependencies': False
+ })
+
+@collect(variant_makers)
+def sample_source_add_piggyback(monkeypatch, sample_source,
+ extra_build_args={}):
+ """Add piggybacked foreign system packages."""
+ sample_source_add_piggyback_ignored\
+ (monkeypatch, sample_source, extra_build_args)
+
+ sample_source_make_version_2(monkeypatch, sample_source)
+
+ new_refs = {}
+ for name in '.apt-root/.../copyright', '.apt-root/.../script.js':
+ contents = f'dummy {PurePosixPath(name).name}'.encode()
+ digest = sha256(contents).digest().hex()
+ monkeypatch.setitem(dist_files, name, contents)
+ monkeypatch.setitem(sha256_hashes, name, digest)
+ new_refs[PurePosixPath(name).name] = {'file': name, 'sha256': digest}
+
+ new_list = [*expected_source_copyright, new_refs['copyright']]
+ for obj in expected:
+ monkeypatch.setitem(obj, 'source_copyright', new_list)
+
+ for obj in expected_resources:
+ new_list = [{'identifier': 'apt-common-licenses'}, *obj['dependencies']]
+ monkeypatch.setitem(obj, 'dependencies', new_list)
+
+ for obj in index_obj['definitions'][1], expected_resources[0]:
+ new_list = [new_refs['script.js'], *obj['scripts']]
+ monkeypatch.setitem(obj, 'scripts', new_list)
+
+ for name in piggyback_archive_names:
+ path = PurePosixPath('hello.foreign-packages') / name
+ monkeypatch.setitem(extra_archive_files, str(path),
+ f'dummy {path.name}'.encode())
+
+def prepare_foreign_packages_dir(path):
+ """
+ Put some dummy archive in the directory so that it can be passed to
+ piggybacked_system().
+ """
+ for name in piggyback_archive_names:
+ archive_path = path / name
+ archive_path.parent.mkdir(parents=True, exist_ok=True)
+ archive_path.write_text(f'dummy {archive_path.name}')
+
+@collect(variant_makers)
+def sample_source_add_piggyback_pass_archives(monkeypatch, sample_source):
+ """
+ Add piggybacked foreign system packages, use pre-downloaded foreign package
+ archives (have Build() find them in their default directory).
+ """
+ # Dir next to 'sample_source' will also be gc'd by sample_source() fixture.
+ foreign_packages_dir = sample_source.parent / 'arbitrary-name'
+
+ prepare_foreign_packages_dir(foreign_packages_dir)
+
+ sample_source_add_piggyback(monkeypatch, sample_source,
+ {'piggyback_files': foreign_packages_dir})
+
+@collect(variant_makers)
+def sample_source_add_piggyback_find_archives(monkeypatch, sample_source):
+ """
+ Add piggybacked foreign system packages, use pre-downloaded foreign package
+ archives (specify their directory as argument to Build()).
+ """
+ # Dir next to 'sample_source' will also be gc'd by sample_source() fixture.
+ foreign_packages_dir = sample_source.parent / 'hello.foreign-packages'
+
+ prepare_foreign_packages_dir(foreign_packages_dir)
+
+ sample_source_add_piggyback(monkeypatch, sample_source)
+
+@collect(variant_makers)
+def sample_source_add_piggyback_no_download(monkeypatch, sample_source,
+ pass_directory_to_build=False):
+ """
+ Add piggybacked foreign system packages, use pre-downloaded foreign package
+ archives.
+ """
+ # Use a dir next to 'sample_source'; have it gc'd by sample_source fixture.
+ if pass_directory_to_build:
+ foreign_packages_dir = sample_source.parent / 'arbitrary-name'
+ else:
+ foreign_packages_dir = sample_source.parent / 'hello.foreign-packages'
+
+ prepare_foreign_packages_dir(foreign_packages_dir)
+
+ sample_source_add_piggyback(monkeypatch, sample_source)
+
+@pytest.fixture(params=[lambda m, s: None, *variant_makers])
+def sample_source_make_variants(request, monkeypatch, sample_source,
+ mock_piggybacked_apt_system):
+ """
+ Prepare a directory with sample Haketilo source package in multiple slightly
+ different versions (all correct). Return an index.json path that should be
+ used when performing test build.
+ """
+ index_path = request.param(monkeypatch, sample_source) or Path('index.json')
+
+ index_text = json.dumps(index_obj)
+
+ (sample_source / index_path).write_text(index_text)
+
+ monkeypatch.setitem(src_files, 'index.json', index_text.encode())
+
+ return index_path
+
+def try_validate(as_what, instance):
+ """
+ Select the right JSON schema. Return without errors only if the instance
+ validates against it.
+ """
+ schema_fmt = f'{as_what}-{{}}.schema.json'
+ json_instances.validate_instance(instance, schema_fmt)
+
+@pytest.mark.subprocess_run(build, run_reuse)
+@pytest.mark.usefixtures('mock_subprocess_run')
+def test_build(sample_source, sample_source_make_variants, tmpdir):
+ """Build the sample source package and verify the produced files."""
+ index_json_path = sample_source_make_variants
+
+ # First, build the package
+ build.Build(sample_source, index_json_path).write_package_files(tmpdir)
+
+ # Verify directories under destination directory
+ assert {'file', 'resource', 'mapping', 'source'} == \
+ set([path.name for path in tmpdir.iterdir()])
+
+ # Verify files under 'file/'
+ file_dir = tmpdir / 'file' / 'sha256'
+
+ for name, contents in dist_files.items():
+ dist_file_path = file_dir / sha256_hashes[name]
+ assert dist_file_path.is_file()
+ assert dist_file_path.read_bytes() == contents
+
+ assert {p.name for p in file_dir.iterdir()} == \
+ {sha256_hashes[name] for name in dist_files.keys()}
+
+ # Verify files under 'resource/'
+ resource_dir = tmpdir / 'resource'
+
+ assert {rj['identifier'] for rj in expected_resources} == \
+ {path.name for path in resource_dir.iterdir()}
+
+ for resource_json in expected_resources:
+ subdir = resource_dir / resource_json['identifier']
+ ver_str = versions.version_string(resource_json['version'])
+ assert [ver_str] == [path.name for path in subdir.iterdir()]
+
+ assert json.loads((subdir / ver_str).read_text()) == resource_json
+
+ try_validate('api_resource_description', resource_json)
+
+ # Verify files under 'mapping/'
+ mapping_dir = tmpdir / 'mapping'
+ assert ['helloapple'] == [path.name for path in mapping_dir.iterdir()]
+
+ subdir = mapping_dir / 'helloapple'
+
+ ver_str = versions.version_string(expected_mapping['version'])
+ assert [ver_str] == [path.name for path in subdir.iterdir()]
+
+ assert json.loads((subdir / ver_str).read_text()) == expected_mapping
+
+ try_validate('api_mapping_description', expected_mapping)
+
+ # Verify files under 'source/'
+ source_dir = tmpdir / 'source'
+ assert {'hello.json', 'hello.zip'} == \
+ {path.name for path in source_dir.iterdir()}
+
+ archive_files = {**dict((f'hello/{name}', contents)
+ for name, contents in src_files.items()),
+ **extra_archive_files}
+
+ with ZipFile(source_dir / 'hello.zip', 'r') as archive:
+ print(archive.namelist())
+ assert len(archive.namelist()) == len(archive_files)
+
+ for name, contents in archive_files.items():
+ assert archive.read(name) == contents
+
+ zip_ref = expected_source_description['source_archives']['zip']
+ zip_contents = (source_dir / 'hello.zip').read_bytes()
+ zip_ref['sha256'] = sha256(zip_contents).digest().hex()
+
+ assert json.loads((source_dir / 'hello.json').read_text()) == \
+ expected_source_description
+
+ try_validate('api_source_description', expected_source_description)
+
+error_makers = []
+
+@collect(error_makers)
+def sample_source_error_missing_file(monkeypatch, sample_source):
+ """
+ Modify index.json to expect missing report.spdx file and cause an error.
+ """
+ monkeypatch.delitem(index_obj, 'reuse_generate_spdx_report')
+ return FileReferenceError, '^referenced_file_report.spdx_missing$'
+
+@collect(error_makers)
+def sample_source_error_index_schema(monkeypatch, sample_source):
+ """Modify index.json to be incompliant with the schema."""
+ monkeypatch.delitem(index_obj, 'definitions')
+ return ValidationError,
+
+@collect(error_makers)
+def sample_source_error_unknown_index_schema(monkeypatch, sample_source):
+ """Modify index.json to be use a not-yet-released schema."""
+ schema_id = \
+ 'https://hydrilla.koszko.org/schemas/package_source-65536.schema.json'
+ monkeypatch.setitem(index_obj, "$schema", schema_id)
+ return hydrilla_util.UnknownSchemaError, \
+ r'^unknown_schema_package_source_.*/hello/index\.json$'
+
+@collect(error_makers)
+def sample_source_error_bad_comment(monkeypatch, sample_source):
+ """Modify index.json to have an invalid '/' in it."""
+ return json.JSONDecodeError, '^bad_comment: .*', \
+ json.dumps(index_obj) + '/something\n'
+
+@collect(error_makers)
+def sample_source_error_bad_json(monkeypatch, sample_source):
+ """Modify index.json to not be valid json even after comment stripping."""
+ return json.JSONDecodeError, '', json.dumps(index_obj) + '???\n'
+
+@collect(error_makers)
+def sample_source_error_missing_reuse(monkeypatch, sample_source):
+ """Cause mocked reuse process invocation to fail with FileNotFoundError."""
+ (sample_source / 'mock_reuse_missing').touch()
+ return build.ReuseError, '^couldnt_execute_reuse_is_it_installed$'
+
+@collect(error_makers)
+def sample_source_error_missing_license(monkeypatch, sample_source):
+ """Remove a file to make package REUSE-incompliant."""
+ (sample_source / 'README.txt.license').unlink()
+
+ error_regex = """^\
+command_reuse --root \\S+ lint_failed
+
+STDOUT_OUTPUT_heading
+
+dummy lint output
+
+STDERR_OUTPUT_heading
+
+some error output\
+$\
+"""
+
+ return build.ReuseError, error_regex
+
+@collect(error_makers)
+def sample_source_error_file_outside(monkeypatch, sample_source):
+ """Make index.json illegally reference a file outside srcdir."""
+ new_list = [*index_obj['copyright'], {'file': '../abc'}]
+ monkeypatch.setitem(index_obj, 'copyright', new_list)
+ return FileReferenceError, '^path_contains_double_dot_\\.\\./abc$'
+
+@collect(error_makers)
+def sample_source_error_reference_itself(monkeypatch, sample_source):
+ """Make index.json illegally reference index.json."""
+ new_list = [*index_obj['copyright'], {'file': 'index.json'}]
+ monkeypatch.setitem(index_obj, 'copyright', new_list)
+ return FileReferenceError, '^loading_reserved_index_json$'
+
+@collect(error_makers)
+def sample_source_error_report_excluded(monkeypatch, sample_source):
+ """
+ Make index.json require generation of report.spdx but don't include it among
+ copyright files.
+ """
+ new_list = [file_ref for file_ref in index_obj['copyright']
+ if file_ref['file'] != 'report.spdx']
+ monkeypatch.setitem(index_obj, 'copyright', new_list)
+ return FileReferenceError, '^report_spdx_not_in_copyright_list$'
+
+@collect(error_makers)
+def sample_source_error_combined_unsupported(monkeypatch, sample_source):
+ """
+ Define mapping and resource together but leave source schema version at 1.x
+ where this is unsupported.
+ """
+ mapping_def = index_obj['definitions'][0]
+ monkeypatch.setitem(mapping_def, 'type', 'mapping_and_resource')
+
+ return ValidationError,
+
+@pytest.fixture(params=error_makers)
+def sample_source_make_errors(request, monkeypatch, sample_source):
+ """
+ Prepare a directory with sample Haketilo source package in multiple slightly
+ broken versions. Return an error type that should be raised when running
+ test build.
+ """
+ error_type, error_regex, index_text = \
+ [*request.param(monkeypatch, sample_source), '', ''][0:3]
+
+ index_text = index_text or json.dumps(index_obj)
+
+ (sample_source / 'index.json').write_text(index_text)
+
+ monkeypatch.setitem(src_files, 'index.json', index_text.encode())
+
+ return error_type, error_regex
+
+@pytest.mark.subprocess_run(build, run_reuse)
+@pytest.mark.usefixtures('mock_subprocess_run')
+def test_build_error(tmpdir, sample_source, sample_source_make_errors):
+ """Try building the sample source package and verify generated errors."""
+ error_type, error_regex = sample_source_make_errors
+
+ dstdir = Path(tmpdir) / 'dstdir'
+ dstdir.mkdir(exist_ok=True)
+
+ with pytest.raises(error_type, match=error_regex):
+ build.Build(sample_source, Path('index.json'))\
+ .write_package_files(dstdir)
diff --git a/tests/test_item_infos.py b/tests/test_item_infos.py
new file mode 100644
index 0000000..9de3c96
--- /dev/null
+++ b/tests/test_item_infos.py
@@ -0,0 +1,527 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+import pytest
+import pathlib
+import re
+import dataclasses as dc
+
+from immutables import Map
+
+from hydrilla import item_infos, versions, json_instances
+from hydrilla.exceptions import HaketiloException
+
+def test_make_item_refs_seq_empty():
+ """...."""
+ assert item_infos.make_item_refs_seq([]) == ()
+
+def test_get_item_refs_seq_nonempty():
+ """...."""
+ ref_objs = [{'identifier': 'abc'}, {'identifier': 'def'}]
+
+ result = item_infos.make_item_refs_seq(ref_objs)
+
+ assert type(result) is tuple
+ assert [ref.identifier for ref in result] == ['abc', 'def']
+
+@pytest.fixture
+def mock_make_item_refs_seq(monkeypatch):
+ """...."""
+ def mocked_make_item_refs_seq(ref_objs):
+ """...."""
+ assert ref_objs == getattr(
+ mocked_make_item_refs_seq,
+ 'expected',
+ [{'identifier': 'abc'}, {'identifier': 'def'}]
+ )
+
+ return (item_infos.ItemRef('abc'), item_infos.ItemRef('def'))
+
+ monkeypatch.setattr(item_infos, 'make_item_refs_seq',
+ mocked_make_item_refs_seq)
+
+ return mocked_make_item_refs_seq
+
+def test_make_required_mappings_compat_too_low():
+ """...."""
+ assert item_infos.make_required_mappings('whatever', 1) == ()
+
+@pytest.mark.usefixtures('mock_make_item_refs_seq')
+def test_make_required_mappings_compat_ok():
+ """...."""
+ ref_objs = [{'identifier': 'abc'}, {'identifier': 'def'}]
+
+ assert item_infos.make_required_mappings(ref_objs, 2) == \
+ (item_infos.ItemRef('abc'), item_infos.ItemRef('def'))
+
+def test_make_file_refs_seq_empty():
+ """...."""
+ assert item_infos.make_file_refs_seq([]) == ()
+
+def test_make_file_refs_seq_nonempty():
+ """...."""
+ ref_objs = [{'file': 'abc', 'sha256': 'dummy_hash1'},
+ {'file': 'def', 'sha256': 'dummy_hash2'}]
+
+ result = item_infos.make_file_refs_seq(ref_objs)
+
+ assert type(result) is tuple
+ assert [ref.name for ref in result] == ['abc', 'def']
+ assert [ref.sha256 for ref in result] == ['dummy_hash1', 'dummy_hash2']
+
+def test_generated_by_make_empty():
+ """...."""
+ assert item_infos.GeneratedBy.make(None) == None
+
+@pytest.mark.parametrize('_in, out_version', [
+ ({'name': 'abc'}, None),
+ ({'name': 'abc', 'version': '1.1.1'}, '1.1.1')
+])
+def test_generated_by_make_nonempty(_in, out_version):
+ """...."""
+ generated_by = item_infos.GeneratedBy.make(_in)
+
+ assert generated_by.name == 'abc'
+ assert generated_by.version == out_version
+
+def test_load_item_info(monkeypatch):
+ """...."""
+ def mocked_read_instance(instance_or_path):
+ """...."""
+ assert instance_or_path == 'dummy_path'
+ return 'dummy_instance'
+
+ monkeypatch.setattr(json_instances, 'read_instance', mocked_read_instance)
+
+ def mocked_validate_instance(instance, schema_fmt):
+ """...."""
+ assert instance == 'dummy_instance'
+ assert schema_fmt == 'api_exotictype_description-{}.schema.json'
+ return 7
+
+ monkeypatch.setattr(json_instances, 'validate_instance',
+ mocked_validate_instance)
+
+ class MockedLoadedType:
+ """...."""
+ def make(instance, schema_compat, repository):
+ """...."""
+ assert instance == 'dummy_instance'
+ assert schema_compat == 7
+ assert repository == 'somerepo'
+ return 'dummy_item_info'
+
+ type_name = 'exotictype'
+
+ assert item_infos._load_item_info(
+ MockedLoadedType,
+ 'dummy_path',
+ 'somerepo'
+ ) == 'dummy_item_info'
+
+def test_make_payloads(monkeypatch):
+ """...."""
+ payloads_obj = {'http*://example.com/': {'identifier': 'someresource'}}
+
+ def mocked_parse_pattern(pattern):
+ """...."""
+ assert pattern == 'http*://example.com/'
+
+ yield 'dummy_parsed_pattern_1'
+ yield 'dummy_parsed_pattern_2'
+
+ monkeypatch.setattr(item_infos, 'parse_pattern', mocked_parse_pattern)
+
+ assert item_infos.make_payloads(payloads_obj) == Map({
+ 'dummy_parsed_pattern_1': item_infos.ItemRef('someresource'),
+ 'dummy_parsed_pattern_2': item_infos.ItemRef('someresource')
+ })
+
+@pytest.mark.parametrize('info_mod, in_mod', [
+ ({}, {}),
+ ({'uuid': 'dummy_uuid'}, {}),
+ ({}, {'uuid': 'dummy_uuid'}),
+ ({'uuid': 'dummy_uuid'}, {'uuid': 'dummy_uuid'}),
+ ({}, {'identifier': 'abc', '_initialized': True}),
+ ({}, {'_by_version': Map({(1, 2): 'dummy_old_info'})})
+])
+def test_versioned_item_info_register(info_mod, in_mod):
+ """...."""
+ class DummyInfo:
+ """...."""
+ uuid = None
+ identifier = 'abc'
+ version = (1, 2)
+
+ for name, value in info_mod.items():
+ setattr(DummyInfo, name, value)
+
+ in_fields = {
+ 'uuid': None,
+ 'identifier': '<dummy>',
+ '_by_version': Map(),
+ '_initialized': False,
+ **in_mod
+ }
+ out_fields = {
+ 'uuid': DummyInfo.uuid or in_mod.get('uuid'),
+ 'identifier': DummyInfo.identifier,
+ '_by_version': Map({(1, 2): DummyInfo}),
+ '_initialized': True
+ }
+
+ versioned = item_infos.VersionedItemInfo(**in_fields)
+ new_versioned = versioned.register(DummyInfo)
+
+ assert dc.asdict(versioned) == in_fields
+ assert dc.asdict(new_versioned) == out_fields
+
+def test_versioned_item_info_register_bad_uuid():
+ """...."""
+ versioned = item_infos.VersionedItemInfo(
+ identifier='abc',
+ uuid='old_uuid'
+ )
+
+ class DummyInfo:
+ """...."""
+ uuid = 'new_uuid'
+ identifier = 'abc'
+ version = (1, 2)
+
+ with pytest.raises(HaketiloException, match='^uuid_mismatch_abc$'):
+ versioned.register(DummyInfo)
+
+@pytest.mark.parametrize('previous_registrations', [
+ Map(),
+ Map({(1, 2): 'dummy_info'})
+])
+def test_versioned_item_info_unregister(previous_registrations):
+ """...."""
+ versioned = item_infos.VersionedItemInfo(
+ identifier = 'abc',
+ _by_version = previous_registrations
+ )
+
+ assert versioned.unregister((1, 2)) == \
+ dc.replace(versioned, _by_version=Map())
+
+@pytest.mark.parametrize('registrations, out', [
+ (Map(), True),
+ (Map({(1, 2): 'dummy_info'}), False)
+])
+def test_versioned_item_info_is_empty(registrations, out):
+ """...."""
+ versioned = item_infos.VersionedItemInfo(
+ identifier = 'abc',
+ _by_version = registrations
+ )
+
+ assert versioned.is_empty() == out
+
+@pytest.mark.parametrize('versions, out', [
+ ([(1, 2), (1, 2, 1), (0, 9999, 4), (1, 0, 2)], (1, 2, 1)),
+ ([(1, 2)], (1, 2))
+])
+def test_versioned_item_info_newest_version(versions, out):
+ """...."""
+ versioned = item_infos.VersionedItemInfo(
+ identifier = 'abc',
+ _by_version = Map((ver, 'dummy_info') for ver in versions)
+ )
+
+ assert versioned.newest_version() == out
+
+def test_versioned_item_info_newest_version_bad(monkeypatch):
+ """...."""
+ monkeypatch.setattr(item_infos.VersionedItemInfo, 'newest_version',
+ lambda self: 'dummy_ver1')
+
+ versioned = item_infos.VersionedItemInfo(
+ identifier = 'abc',
+ _by_version = Map(dummy_ver1='dummy_info1', dummy_ver2='dummy_info2')
+ )
+
+ assert versioned.get_newest() == 'dummy_info1'
+
+def test_versioned_item_info_get_by_ver():
+ """...."""
+ versioned = item_infos.VersionedItemInfo(
+ identifier = 'abc',
+ _by_version = Map({(1, 2): 'dummy_info1', (3, 4, 5): 'dummy_info2'})
+ )
+
+ assert versioned.get_by_ver(range(1, 3)) == 'dummy_info1'
+
+@pytest.mark.parametrize('versions, out', [
+ ([(1, 2), (0, 999, 4), (1, 0, 2)], ['(0, 999, 4)', '(1, 0, 2)', '(1, 2)']),
+ ([], [])
+])
+def test_versioned_item_get_all(versions, out):
+ """...."""
+ versioned = item_infos.VersionedItemInfo(
+ identifier = 'abc',
+ _by_version = Map((ver, str(ver)) for ver in versions)
+ )
+
+ assert [*versioned.get_all()] == out
+
+sample_resource_obj = {
+ 'source_name': 'somesource',
+ 'source_copyright': [{'file': 'ABC', 'sha256': 'dummy_sha256'}],
+ 'version': [1, 2, 3, 0],
+ 'identifier': 'someid',
+ 'uuid': None,
+ 'long_name': 'Some Thing',
+ 'required_mappings': [{'identifier': 'required1'}],
+ 'generated_by': {'name': 'sometool', 'version': '1.1.1'},
+ 'revision': 4,
+ 'dependencies': [{'identifier': 'abc'}, {'identifier': 'def'}],
+ 'scripts': [{'file': 'ABC', 'sha256': 'dummy_sha256'}]
+}
+
+sample_mapping_obj = {
+ **sample_resource_obj,
+ 'payloads': {
+ 'https://example.com/': {'identifier': 'someresource'}
+ }
+}
+
+del sample_mapping_obj['dependencies']
+del sample_mapping_obj['scripts']
+
+@pytest.fixture(scope='session')
+def sample_resource_info():
+ """...."""
+ return item_infos.ResourceInfo(
+ repository = 'somerepo',
+ source_name = 'somesource',
+ source_copyright = (item_infos.FileRef('ABC', 'dummy_sha256'),),
+ version = (1, 2, 3),
+ identifier = 'someid',
+ uuid = None,
+ long_name = 'Some Thing',
+ required_mappings = (item_infos.ItemRef('required1'),),
+ generated_by = item_infos.GeneratedBy('sometool', '1.1.1'),
+ revision = 4,
+ dependencies = (item_infos.ItemRef('abc'),
+ item_infos.ItemRef('def')),
+ scripts = (item_infos.FileRef('ABC', 'dummy_sha256'),)
+ )
+
+@pytest.fixture(scope='session')
+def sample_mapping_info():
+ """...."""
+ payloads = Map({'https://example.com/': item_infos.ItemRef('someresource')})
+
+ return item_infos.MappingInfo(
+ repository = 'somerepo',
+ source_name = 'somesource',
+ source_copyright = (item_infos.FileRef('ABC', 'dummy_sha256'),),
+ version = (1, 2, 3),
+ identifier = 'someid',
+ uuid = None,
+ long_name = 'Some Thing',
+ required_mappings = (item_infos.ItemRef('required1'),),
+ generated_by = item_infos.GeneratedBy('sometool', '1.1.1'),
+ payloads = payloads
+ )
+
+@pytest.fixture(scope='session')
+def sample_info_base_init_kwargs(sample_resource_info):
+ kwargs = {}
+ for field_name in item_infos.ItemInfoBase.__annotations__.keys():
+ kwargs[field_name] = getattr(sample_resource_info, field_name)
+
+ return Map(kwargs)
+
+@pytest.fixture
+def mock_version_string(monkeypatch):
+ """...."""
+ def mocked_version_string(version, revision=None):
+ """...."""
+ assert version == (1, 2, 3)
+ assert revision in (None, 4)
+ return '1.2.3' if revision is None else '1.2.3-4'
+
+ monkeypatch.setattr(versions, 'version_string', mocked_version_string)
+
+@pytest.mark.usefixtures('mock_version_string')
+def test_item_info_path(sample_resource_info):
+ """...."""
+ assert sample_resource_info.path_relative_to_type() == 'someid/1.2.3'
+ assert sample_resource_info.path() == 'resource/someid/1.2.3'
+
+@pytest.mark.usefixtures('mock_version_string')
+def test_resource_info_versioned_identifier(sample_resource_info, monkeypatch):
+ """...."""
+ monkeypatch.setattr(item_infos.ItemInfoBase, 'versioned_identifier',
+ lambda self: '<dummy>')
+
+ assert sample_resource_info.versioned_identifier == '<dummy>-4'
+
+@pytest.mark.usefixtures('mock_version_string')
+def test_mapping_info_versioned_identifier(sample_mapping_info):
+ """...."""
+ assert sample_mapping_info.versioned_identifier == 'someid-1.2.3'
+
+@pytest.fixture
+def mock_make_file_refs_seq(monkeypatch):
+ """...."""
+ def mocked_make_file_refs_seq(ref_objs):
+ """...."""
+ assert ref_objs == getattr(
+ mocked_make_file_refs_seq,
+ 'expected',
+ [{'file': 'ABC', 'sha256': 'dummy_sha256'}]
+ )
+
+ return (item_infos.FileRef(name='ABC', sha256='dummy_sha256'),)
+
+ monkeypatch.setattr(item_infos, 'make_file_refs_seq',
+ mocked_make_file_refs_seq)
+
+ return mocked_make_file_refs_seq
+
+@pytest.mark.parametrize('missing_prop', [
+ 'required_mappings',
+ 'generated_by',
+ 'uuid'
+])
+@pytest.mark.usefixtures('mock_make_item_refs_seq', 'mock_make_file_refs_seq')
+def test_item_info_get_base_init_kwargs(
+ missing_prop,
+ monkeypatch,
+ sample_resource_info,
+ sample_info_base_init_kwargs,
+ mock_make_file_refs_seq
+):
+ """...."""
+ monkeypatch.delitem(sample_resource_obj, missing_prop)
+
+ def mocked_normalize_version(version):
+ """...."""
+ assert version == [1, 2, 3, 0]
+
+ return (1, 2, 3)
+
+ monkeypatch.setattr(versions, 'normalize_version', mocked_normalize_version)
+
+ def mocked_make_required_mappings(ref_objs, schema_compat):
+ """...."""
+ if missing_prop == 'required_mappings':
+ assert ref_objs == []
+ else:
+ assert ref_objs == [{'identifier': 'required1'}]
+
+ assert schema_compat == 2
+
+ return (item_infos.ItemRef('required1'),)
+
+ monkeypatch.setattr(item_infos, 'make_required_mappings',
+ mocked_make_required_mappings)
+
+ def mocked_generated_by_make(generated_by_obj):
+ """...."""
+ if missing_prop == 'generated_by':
+ assert generated_by_obj == None
+ else:
+ assert generated_by_obj == {'name': 'sometool', 'version': '1.1.1'}
+
+ return item_infos.GeneratedBy(name='sometool', version='1.1.1')
+
+ monkeypatch.setattr(item_infos.GeneratedBy, 'make',
+ mocked_generated_by_make)
+
+ expected = sample_info_base_init_kwargs
+ if missing_prop == 'uuid':
+ expected = expected.set('uuid', None)
+
+ Base = item_infos.ItemInfoBase
+ assert Base._get_base_init_kwargs(sample_resource_obj, 2, 'somerepo') == \
+ expected
+
+@pytest.fixture
+def mock_get_base_init_kwargs(monkeypatch, sample_info_base_init_kwargs):
+ """...."""
+ def mocked_get_base_init_kwargs(item_obj, schema_compat, repository):
+ """...."""
+ assert schema_compat == 2
+ assert item_obj['identifier'] == 'someid'
+ assert repository == 'somerepo'
+
+ return sample_info_base_init_kwargs
+
+ monkeypatch.setattr(item_infos.ItemInfoBase, '_get_base_init_kwargs',
+ mocked_get_base_init_kwargs)
+
+@pytest.mark.parametrize('missing_prop', ['dependencies', 'scripts'])
+@pytest.mark.usefixtures('mock_get_base_init_kwargs')
+def test_resource_info_make(
+ missing_prop,
+ monkeypatch,
+ sample_resource_info,
+ mock_make_item_refs_seq,
+ mock_make_file_refs_seq
+):
+ """...."""
+ _in = sample_resource_obj
+ monkeypatch.delitem(_in, missing_prop)
+
+ if missing_prop == 'dependencies':
+ mock_make_item_refs_seq.expected = []
+ elif missing_prop == 'scripts':
+ mock_make_file_refs_seq.expected = []
+
+ assert item_infos.ResourceInfo.make(_in, 2, 'somerepo') == \
+ sample_resource_info
+
+@pytest.mark.parametrize('missing_payloads', [True, False])
+@pytest.mark.usefixtures('mock_get_base_init_kwargs', 'mock_make_item_refs_seq')
+def test_mapping_info_make(missing_payloads, monkeypatch, sample_mapping_info):
+ """...."""
+ _in = sample_mapping_obj
+ if missing_payloads:
+ monkeypatch.delitem(_in, 'payloads')
+
+ def mocked_make_payloads(payloads_obj):
+ """...."""
+ if missing_payloads:
+ assert payloads_obj == {}
+ else:
+ assert payloads_obj == \
+ {'https://example.com/': {'identifier': 'someresource'}}
+
+ return Map({'https://example.com/': item_infos.ItemRef('someresource')})
+
+ monkeypatch.setattr(item_infos, 'make_payloads', mocked_make_payloads)
+
+ assert item_infos.MappingInfo.make(_in, 2, 'somerepo') == \
+ sample_mapping_info
+
+@pytest.mark.parametrize('type_name', ['ResourceInfo', 'MappingInfo'])
+def test_make_item_info(type_name, monkeypatch):
+ """...."""
+ info_type = getattr(item_infos, type_name)
+
+ def mocked_load_item_info(_info_type, instance_or_path, repository):
+ """...."""
+ assert _info_type == info_type
+ assert instance_or_path == 'dummy_path'
+ assert repository == 'somerepo'
+ return 'dummy_info'
+
+ monkeypatch.setattr(item_infos, '_load_item_info', mocked_load_item_info)
+
+ assert info_type.load('dummy_path', 'somerepo') == 'dummy_info'
+
+def test_resource_info_hash(sample_resource_info):
+ """...."""
+ hash(sample_resource_info)
+
+def test_mapping_info_hash(sample_mapping_info):
+ """...."""
+ hash(sample_mapping_info)
diff --git a/tests/test_json_instances.py b/tests/test_json_instances.py
new file mode 100644
index 0000000..f5bd270
--- /dev/null
+++ b/tests/test_json_instances.py
@@ -0,0 +1,194 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+import pytest
+import re
+
+from hydrilla import json_instances
+from hydrilla.exceptions import HaketiloException
+
+sample_json_no_comments = '{\n"so/me":\n"//json//"\n}\n'
+
+@pytest.mark.parametrize('_in', [
+ '{\n"so/me":\n"//json//"\n}\n',
+ '{//we\n"so/me"://will\n"//json//"//rock\n}//you\n'
+])
+def test_strip_json_comments(_in):
+ """...."""
+ assert json_instances.strip_json_comments(_in) == sample_json_no_comments
+
+@pytest.mark.parametrize('_in, line, char', [
+ ('/{\n"so/me":\n"//json//"\n}\n', 1, 1),
+ ('{\n"so/me":/\n"//json//"\n}/\n', 2, 9),
+ ('{\n"so/me":/ huehue, I am an invalid comment\n"//json//"\n}\n', 2, 9)
+])
+def test_strip_json_comments_bad(_in, line, char):
+ """...."""
+ error_regex = f'^bad_json_comment_line_{line}_char_{char}$'
+ with pytest.raises(HaketiloException, match=error_regex):
+ json_instances.strip_json_comments(_in)
+
+@pytest.mark.parametrize('schema_name, full_schema_name', [
+ ('package_source-1.0.1.schema.json', 'package_source-1.0.1.schema.json'),
+ ('package_source-1.0.schema.json', 'package_source-1.0.1.schema.json'),
+ ('package_source-1.schema.json', 'package_source-1.0.1.schema.json'),
+ ('package_source-2.schema.json', 'package_source-2.schema.json')
+])
+def test_get_schema(schema_name, full_schema_name):
+ """...."""
+ url_prefix = 'https://hydrilla.koszko.org/schemas/'
+
+ for prefix in ('', url_prefix):
+ schema1 = json_instances._get_schema(prefix + schema_name)
+ assert schema1['$id'] == url_prefix + full_schema_name
+
+ schema2 = json_instances._get_schema(prefix + schema_name)
+ assert schema2 is schema1
+
+@pytest.mark.parametrize('_in', ['dummy_uri', {'$id': 'dummy_uri'}])
+def test_validator_for(_in, monkeypatch):
+ """...."""
+ def mocked_get_schema(schema_id):
+ """...."""
+ assert schema_id == 'dummy_uri'
+ return {'$id': 'dummy_uri'}
+
+ monkeypatch.setattr(json_instances, '_get_schema', mocked_get_schema)
+
+ def MockedRefResolver(base_uri, referrer, handlers):
+ """....<function replaces a class...>"""
+ assert base_uri == referrer['$id']
+ assert referrer == {'$id': 'dummy_uri'}
+ assert handlers == {'https': mocked_get_schema}
+ return 'dummy_resolver'
+
+ monkeypatch.setattr(json_instances, 'RefResolver', MockedRefResolver)
+
+ def MockedDraft7Validator(schema, resolver):
+ """....<same as above>"""
+ assert schema == {'$id': 'dummy_uri'}
+ assert resolver == 'dummy_resolver'
+ return 'dummy_validator'
+
+ monkeypatch.setattr(json_instances, 'Draft7Validator',
+ MockedDraft7Validator)
+
+ assert json_instances.validator_for(_in) == 'dummy_validator'
+
+def test_parse_instance(monkeypatch):
+ """...."""
+ def mocked_strip_json_comments(text):
+ """...."""
+ assert text == 'dummy_commented_json'
+ return '{"dummy": 1}'
+
+ monkeypatch.setattr(json_instances, 'strip_json_comments',
+ mocked_strip_json_comments)
+
+ assert json_instances.parse_instance('dummy_commented_json') == {'dummy': 1}
+
+
+def test_read_instance(monkeypatch, tmpdir):
+ """...."""
+ def mocked_parse_instance(text):
+ """...."""
+ assert text == 'dummy_JSON_text'
+ return {'dummy': 1}
+
+ monkeypatch.setattr(json_instances, 'parse_instance', mocked_parse_instance)
+
+ somepath = tmpdir / 'somefile'
+ somepath.write_text('dummy_JSON_text')
+
+ for instance_or_path in (somepath, str(somepath), {'dummy': 1}):
+ assert json_instances.read_instance(instance_or_path) == {'dummy': 1}
+
+def test_read_instance_bad(monkeypatch, tmpdir):
+ """...."""
+ monkeypatch.setattr(json_instances, 'parse_instance', lambda: 3 / 0)
+
+ somepath = tmpdir / 'somefile'
+ somepath.write_text('dummy_JSON_text')
+
+ error_regex = f'^text_in_{re.escape(str(somepath))}_not_valid_json$'
+ with pytest.raises(HaketiloException, match=error_regex):
+ json_instances.read_instance(somepath)
+
+@pytest.mark.parametrize('instance, ver_str', [
+ ({'$schema': 'a_b_c-1.0.1.0.schema.json'}, '1.0.1.0'),
+ ({'$schema': '9-9-9-10.5.600.schema.json'}, '10.5.600'),
+ ({'$schema': 'https://ab.cd-2.schema.json'}, '2')
+])
+def test_get_schema_version(instance, ver_str, monkeypatch):
+ """...."""
+ def mocked_parse_version(_ver_str):
+ """...."""
+ assert _ver_str == ver_str
+ return 'dummy_version'
+
+ monkeypatch.setattr(json_instances, 'parse_version', mocked_parse_version)
+
+ assert json_instances.get_schema_version(instance) == 'dummy_version'
+
+@pytest.mark.parametrize('instance', [
+ {'$schema': 'https://ab.cd-0.schema.json'},
+ {'$schema': 'https://ab.cd-02.schema.json'},
+ {'$schema': 'https://ab.cd-2.00.schema.json'},
+ {'$schema': 'https://ab.cd-2.01.schema.json'},
+ {'$schema': 'https://ab.cd-2.schema.json5'},
+ {'$schema': 'https://ab.cd-2.schema@json'},
+ {'$schema': 'https://ab.cd_2.schema.json'},
+ {'$schema': '2.schema.json'},
+ {'$schema': 'https://ab.cd-.schema.json'},
+ {'$schema': b'https://ab.cd-2.schema.json'},
+ {},
+ 'not dict'
+])
+def test_get_schema_version_bad(instance):
+ """...."""
+ error_regex = '^no_schema_number_in_instance$'
+ with pytest.raises(HaketiloException, match=error_regex):
+ json_instances.get_schema_version(instance)
+
+def test_get_schema_major_number(monkeypatch):
+ """...."""
+ def mocked_get_schema_version(instance):
+ """...."""
+ assert instance == 'dummy_instance'
+ return (3, 4, 6)
+
+ monkeypatch.setattr(json_instances, 'get_schema_version',
+ mocked_get_schema_version)
+
+ assert json_instances.get_schema_major_number('dummy_instance') == 3
+
+def test_validate_instance(monkeypatch):
+ """...."""
+ def mocked_get_schema_major_number(instance):
+ """...."""
+ assert instance == 'dummy_instance'
+ return 4
+
+ monkeypatch.setattr(json_instances, 'get_schema_major_number',
+ mocked_get_schema_major_number)
+
+ class mocked_validator_for:
+ """....<class instead of function>"""
+ def __init__(self, schema_name):
+ """...."""
+ assert schema_name == 'https://ab.cd/something-4.schema.json'
+
+ def validate(self, instance):
+ """...."""
+ assert instance == 'dummy_instance'
+
+ monkeypatch.setattr(json_instances, 'validator_for', mocked_validator_for)
+
+ schema_name_fmt = 'https://ab.cd/something-{}.schema.json'
+ assert json_instances.validate_instance(
+ 'dummy_instance',
+ schema_name_fmt
+ ) == 4
diff --git a/tests/test_local_apt.py b/tests/test_local_apt.py
new file mode 100644
index 0000000..9122408
--- /dev/null
+++ b/tests/test_local_apt.py
@@ -0,0 +1,754 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+import pytest
+import tempfile
+import re
+import json
+from pathlib import Path, PurePosixPath
+from zipfile import ZipFile
+from tempfile import TemporaryDirectory
+
+from hydrilla.builder import local_apt
+from hydrilla.builder.common_errors import *
+
+here = Path(__file__).resolve().parent
+
+from .helpers import *
+
+@pytest.fixture
+def mock_cache_dir(monkeypatch):
+ """Make local_apt.py cache files to a temporary directory."""
+ with tempfile.TemporaryDirectory() as td:
+ td_path = Path(td)
+ monkeypatch.setattr(local_apt, 'default_apt_cache_dir', td_path)
+ yield td_path
+
+@pytest.fixture
+def mock_gnupg_import(monkeypatch, mock_cache_dir):
+ """Mock gnupg library when imported dynamically."""
+
+ gnupg_mock_dir = mock_cache_dir / 'gnupg_mock'
+ gnupg_mock_dir.mkdir()
+ (gnupg_mock_dir / 'gnupg.py').write_text('GPG = None\n')
+
+ monkeypatch.syspath_prepend(str(gnupg_mock_dir))
+
+ import gnupg
+
+ keyring_path = mock_cache_dir / 'master_keyring.gpg'
+
+ class MockedImportResult:
+ """gnupg.ImportResult replacement"""
+ def __init__(self):
+ """Initialize MockedImportResult object."""
+ self.imported = 1
+
+ class MockedGPG:
+ """GPG replacement that does not really invoke GPG."""
+ def __init__(self, keyring):
+ """Verify the keyring path and initialize MockedGPG."""
+ assert keyring == str(keyring_path)
+
+ self.known_keys = {*keyring_path.read_text().split('\n')} \
+ if keyring_path.exists() else set()
+
+ def recv_keys(self, keyserver, key):
+ """Mock key receiving - record requested key as received."""
+ assert keyserver == local_apt.default_keyserver
+ assert key not in self.known_keys
+
+ self.known_keys.add(key)
+ keyring_path.write_text('\n'.join(self.known_keys))
+
+ return MockedImportResult()
+
+ def list_keys(self, keys=None):
+ """Mock key listing - return a list with dummy items."""
+ if keys is None:
+ return ['dummy'] * len(self.known_keys)
+ else:
+ return ['dummy' for k in keys if k in self.known_keys]
+
+ def export_keys(self, keys, **kwargs):
+ """
+ Mock key export - check that the call has the expected arguments and
+ return a dummy bytes array.
+ """
+ assert kwargs['armor'] == False
+ assert kwargs['minimal'] == True
+ assert {*keys} == self.known_keys
+
+ return b'<dummy keys export>'
+
+ monkeypatch.setattr(gnupg, 'GPG', MockedGPG)
+
+def process_run_args(command, kwargs, expected_command):
+ """
+ Perform assertions common to all mocked subprocess.run() invocations and
+ extract variable parts of the command line (if any).
+ """
+ assert kwargs['env'] == {'LANG': 'en_US'}
+ assert kwargs['capture_output'] == True
+
+ return process_command(command, expected_command)
+
+def run_apt_get_update(command, returncode=0, **kwargs):
+ """
+ Instead of running an 'apt-get update' command just touch some file in apt
+ root to indicate that the call was made.
+ """
+ expected = ['apt-get', '-c', '<conf_path>', 'update']
+ conf_path = Path(process_run_args(command, kwargs, expected)['conf_path'])
+
+ (conf_path.parent / 'update_called').touch()
+
+ return MockedCompletedProcess(command, returncode,
+ text_output=kwargs.get('text'))
+
+"""
+Output of 'apt-get install --yes --just-print libjs-mathjax' on some APT-based
+system.
+"""
+sample_install_stdout = '''\
+NOTE: This is only a simulation!
+ apt-get needs root privileges for real execution.
+ Keep also in mind that locking is deactivated,
+ so don't depend on the relevance to the real current situation!
+Reading package lists...
+Building dependency tree...
+Reading state information...
+The following additional packages will be installed:
+ fonts-mathjax
+Suggested packages:
+ fonts-mathjax-extras fonts-stix libjs-mathjax-doc
+The following NEW packages will be installed:
+ fonts-mathjax libjs-mathjax
+0 upgraded, 2 newly installed, 0 to remove and 0 not upgraded.
+Inst fonts-mathjax (0:2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all])
+Inst libjs-mathjax (0:2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all])
+Conf fonts-mathjax (0:2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all])
+Conf libjs-mathjax (0:2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all])
+'''
+
+def run_apt_get_install(command, returncode=0, **kwargs):
+ """
+ Instead of running an 'apt-get install' command just print a possible
+ output of one.
+ """
+ expected = ['apt-get', '-c', '<conf_path>', 'install',
+ '--yes', '--just-print', 'libjs-mathjax']
+
+ conf_path = Path(process_run_args(command, kwargs, expected)['conf_path'])
+
+ return MockedCompletedProcess(command, returncode,
+ stdout=sample_install_stdout,
+ text_output=kwargs.get('text'))
+
+def run_apt_get_download(command, returncode=0, **kwargs):
+ """
+ Instead of running an 'apt-get download' command just write some dummy
+ .deb to the appropriate directory.
+ """
+ expected = ['apt-get', '-c', '<conf_path>', 'download']
+ if 'libjs-mathjax' in command:
+ expected.append('libjs-mathjax')
+ else:
+ expected.append('fonts-mathjax=0:2.7.9+dfsg-1')
+ expected.append('libjs-mathjax=0:2.7.9+dfsg-1')
+
+ conf_path = Path(process_run_args(command, kwargs, expected)['conf_path'])
+
+ destination = Path(kwargs.get('cwd') or Path.cwd())
+
+ package_name_regex = re.compile(r'^[^=]+-mathjax')
+
+ for word in expected:
+ match = package_name_regex.match(word)
+ if match:
+ filename = f'{match.group(0)}_0%3a2.7.9+dfsg-1_all.deb'
+ deb_path = destination / filename
+ deb_path.write_text(f'dummy {deb_path.name}')
+
+ return MockedCompletedProcess(command, returncode,
+ text_output=kwargs.get('text'))
+
+def run_apt_get_source(command, returncode=0, **kwargs):
+ """
+ Instead of running an 'apt-get source' command just write some dummy
+ "tarballs" to the appropriate directory.
+ """
+ expected = ['apt-get', '-c', '<conf_path>', 'source',
+ '--download-only', 'libjs-mathjax=0:2.7.9+dfsg-1']
+ if 'fonts-mathjax=0:2.7.9+dfsg-1' in command:
+ if command[-1] == 'fonts-mathjax=0:2.7.9+dfsg-1':
+ expected.append('fonts-mathjax=0:2.7.9+dfsg-1')
+ else:
+ expected.insert(-1, 'fonts-mathjax=0:2.7.9+dfsg-1')
+
+ destination = Path(kwargs.get('cwd') or Path.cwd())
+ for filename in [
+ 'mathjax_2.7.9+dfsg-1.debian.tar.xz',
+ 'mathjax_2.7.9+dfsg-1.dsc',
+ 'mathjax_2.7.9+dfsg.orig.tar.xz'
+ ]:
+ (destination / filename).write_text(f'dummy {filename}')
+
+ return MockedCompletedProcess(command, returncode,
+ text_output=kwargs.get('text'))
+
+def make_run_apt_get(**returncodes):
+ """
+ Produce a function that chooses and runs the appropriate one of
+ subprocess_run_apt_get_*() mock functions.
+ """
+ def mock_run(command, **kwargs):
+ """
+ Chooses and runs the appropriate one of subprocess_run_apt_get_*() mock
+ functions.
+ """
+ for subcommand, run in [
+ ('update', run_apt_get_update),
+ ('install', run_apt_get_install),
+ ('download', run_apt_get_download),
+ ('source', run_apt_get_source)
+ ]:
+ if subcommand in command:
+ returncode = returncodes.get(f'{subcommand}_code', 0)
+ return run(command, returncode, **kwargs)
+
+ raise Exception('Unknown command: {}'.format(' '.join(command)))
+
+ return mock_run
+
+@pytest.mark.subprocess_run(local_apt, make_run_apt_get())
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_contextmanager(mock_cache_dir):
+ """
+ Verify that the local_apt() function creates a proper apt environment and
+ that it also properly restores it from cache.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+
+ with local_apt.local_apt(sources_list, local_apt.default_keys) as apt:
+ apt_root = Path(apt.apt_conf).parent.parent
+
+ assert (apt_root / 'etc' / 'trusted.gpg').read_bytes() == \
+ b'<dummy keys export>'
+
+ assert (apt_root / 'etc' / 'update_called').exists()
+
+ assert (apt_root / 'etc' / 'apt.sources.list').read_text() == \
+ 'deb-src sth\ndeb sth'
+
+ conf_lines = (apt_root / 'etc' / 'apt.conf').read_text().split('\n')
+
+ # check mocked keyring
+ assert {*local_apt.default_keys} == \
+ {*(mock_cache_dir / 'master_keyring.gpg').read_text().split('\n')}
+
+ assert not apt_root.exists()
+
+ expected_conf = {
+ 'Architecture': 'amd64',
+ 'Dir': str(apt_root),
+ 'Dir::State': f'{apt_root}/var/lib/apt',
+ 'Dir::State::status': f'{apt_root}/var/lib/dpkg/status',
+ 'Dir::Etc::SourceList': f'{apt_root}/etc/apt.sources.list',
+ 'Dir::Etc::SourceParts': '',
+ 'Dir::Cache': f'{apt_root}/var/cache/apt',
+ 'pkgCacheGen::Essential': 'none',
+ 'Dir::Etc::Trusted': f'{apt_root}/etc/trusted.gpg',
+ }
+
+ conf_regex = re.compile(r'^(?P<key>\S+)\s"(?P<val>\S*)";$')
+ assert dict([(m.group('key'), m.group('val'))
+ for l in conf_lines if l for m in [conf_regex.match(l)]]) == \
+ expected_conf
+
+ with ZipFile(mock_cache_dir / f'apt_{sources_list.identity()}.zip') as zf:
+ # reuse the same APT, its cached zip file should exist now
+ with local_apt.local_apt(sources_list, local_apt.default_keys) as apt:
+ apt_root = Path(apt.apt_conf).parent.parent
+
+ expected_members = {*apt_root.rglob('*')}
+ expected_members.remove(apt_root / 'etc' / 'apt.conf')
+ expected_members.remove(apt_root / 'etc' / 'trusted.gpg')
+
+ names = zf.namelist()
+ assert len(names) == len(expected_members)
+
+ for name in names:
+ path = apt_root / name
+ assert path in expected_members
+ assert zf.read(name) == \
+ (b'' if path.is_dir() else path.read_bytes())
+
+ assert not apt_root.exists()
+
+@pytest.mark.subprocess_run(local_apt, run_missing_executable)
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_missing(mock_cache_dir):
+ """
+ Verify that the local_apt() function raises a proper error when 'apt-get'
+ command is missing.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+
+ with pytest.raises(local_apt.AptError,
+ match='^couldnt_execute_apt-get_is_it_installed$'):
+ with local_apt.local_apt(sources_list, local_apt.default_keys) as apt:
+ pass
+
+@pytest.mark.subprocess_run(local_apt, make_run_apt_get(update_code=1))
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_update_fail(mock_cache_dir):
+ """
+ Verify that the local_apt() function raises a proper error when
+ 'apt-get update' command returns non-0.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+
+ error_regex = """^\
+command_apt-get -c \\S+ update_failed
+
+STDOUT_OUTPUT_heading
+
+some output
+
+STDERR_OUTPUT_heading
+
+some error output\
+$\
+"""
+
+ with pytest.raises(local_apt.AptError, match=error_regex):
+ with local_apt.local_apt(sources_list, local_apt.default_keys) as apt:
+ pass
+
+@pytest.mark.subprocess_run(local_apt, make_run_apt_get())
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_download(mock_cache_dir):
+ """
+ Verify that download_apt_packages() function properly performs the download
+ of .debs and sources.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+ destination = mock_cache_dir / 'destination'
+ destination.mkdir()
+
+ local_apt.download_apt_packages(sources_list, local_apt.default_keys,
+ ['libjs-mathjax'], destination, False)
+
+ libjs_mathjax_path = destination / 'libjs-mathjax_0%3a2.7.9+dfsg-1_all.deb'
+ fonts_mathjax_path = destination / 'fonts-mathjax_0%3a2.7.9+dfsg-1_all.deb'
+
+ source_paths = [
+ destination / 'mathjax_2.7.9+dfsg-1.debian.tar.xz',
+ destination / 'mathjax_2.7.9+dfsg-1.dsc',
+ destination / 'mathjax_2.7.9+dfsg.orig.tar.xz'
+ ]
+
+ assert {*destination.iterdir()} == {libjs_mathjax_path, *source_paths}
+
+ local_apt.download_apt_packages(sources_list, local_apt.default_keys,
+ ['libjs-mathjax'], destination,
+ with_deps=True)
+
+ assert {*destination.iterdir()} == \
+ {libjs_mathjax_path, fonts_mathjax_path, *source_paths}
+
+@pytest.mark.subprocess_run(local_apt, make_run_apt_get(install_code=1))
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_install_fail(mock_cache_dir):
+ """
+ Verify that the download_apt_packages() function raises a proper error when
+ 'apt-get install' command returns non-0.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+ destination = mock_cache_dir / 'destination'
+ destination.mkdir()
+
+ error_regex = f"""^\
+command_apt-get -c \\S+ install --yes --just-print libjs-mathjax_failed
+
+STDOUT_OUTPUT_heading
+
+{re.escape(sample_install_stdout)}
+
+STDERR_OUTPUT_heading
+
+some error output\
+$\
+"""
+
+ with pytest.raises(local_apt.AptError, match=error_regex):
+ local_apt.download_apt_packages(sources_list, local_apt.default_keys,
+ ['libjs-mathjax'], destination,
+ with_deps=True)
+
+ assert [*destination.iterdir()] == []
+
+@pytest.mark.subprocess_run(local_apt, make_run_apt_get(download_code=1))
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_download_fail(mock_cache_dir):
+ """
+ Verify that the download_apt_packages() function raises a proper error when
+ 'apt-get download' command returns non-0.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+ destination = mock_cache_dir / 'destination'
+ destination.mkdir()
+
+ error_regex = """^\
+command_apt-get -c \\S+ download libjs-mathjax_failed
+
+STDOUT_OUTPUT_heading
+
+some output
+
+STDERR_OUTPUT_heading
+
+some error output\
+$\
+"""
+
+ with pytest.raises(local_apt.AptError, match=error_regex):
+ local_apt.download_apt_packages(sources_list, local_apt.default_keys,
+ ['libjs-mathjax'], destination, False)
+
+ assert [*destination.iterdir()] == []
+
+@pytest.fixture
+def mock_bad_deb_file(monkeypatch, mock_subprocess_run):
+ """
+ Make mocked 'apt-get download' command produce an incorrectly-named file.
+ """
+ old_run = local_apt.subprocess.run
+
+ def twice_mocked_run(command, **kwargs):
+ """
+ Create an evil file if needed; then act just like the run() function
+ that got replaced by this one.
+ """
+ if 'download' in command:
+ destination = Path(kwargs.get('cwd') or Path.cwd())
+ (destination / 'arbitrary-name').write_text('anything')
+
+ return old_run(command, **kwargs)
+
+ monkeypatch.setattr(local_apt.subprocess, 'run', twice_mocked_run)
+
+@pytest.mark.subprocess_run(local_apt, make_run_apt_get())
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import',
+ 'mock_bad_deb_file')
+def test_local_apt_download_bad_filename(mock_cache_dir):
+ """
+ Verify that the download_apt_packages() function raises a proper error when
+ 'apt-get download' command produces an incorrectly-named file.
+ """
+ sources_list = local_apt.SourcesList([], 'nabia')
+ destination = mock_cache_dir / 'destination'
+ destination.mkdir()
+
+ error_regex = """^\
+apt_download_gave_bad_filename_arbitrary-name
+
+STDOUT_OUTPUT_heading
+
+some output
+
+STDERR_OUTPUT_heading
+
+some error output\
+$\
+"""
+
+ with pytest.raises(local_apt.AptError, match=error_regex):
+ local_apt.download_apt_packages(sources_list, local_apt.default_keys,
+ ['libjs-mathjax'], destination, False)
+
+ assert [*destination.iterdir()] == []
+
+@pytest.mark.subprocess_run(local_apt, make_run_apt_get(source_code=1))
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_source_fail(mock_cache_dir):
+ """
+ Verify that the download_apt_packages() function raises a proper error when
+ 'apt-get source' command returns non-0.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+ destination = mock_cache_dir / 'destination'
+ destination.mkdir()
+
+ error_regex = """^\
+command_apt-get -c \\S* source --download-only \\S+_failed
+
+STDOUT_OUTPUT_heading
+
+some output
+
+STDERR_OUTPUT_heading
+
+some error output\
+$\
+"""
+
+ with pytest.raises(local_apt.AptError, match=error_regex):
+ local_apt.download_apt_packages(sources_list, local_apt.default_keys,
+ ['libjs-mathjax'], destination, False)
+
+ assert [*destination.iterdir()] == []
+
+def test_sources_list():
+ """Verify that the SourcesList class works properly."""
+ list = local_apt.SourcesList([], 'nabia')
+ assert list.identity() == 'nabia'
+
+ with pytest.raises(local_apt.DistroError, match='^distro_nabiał_unknown$'):
+ local_apt.SourcesList([], 'nabiał')
+
+ list = local_apt.SourcesList(['deb sth', 'deb-src sth'], 'nabia')
+ assert list.identity() == \
+ 'ef28d408b96046eae45c8ab3094ce69b2ac0c02a887e796b1d3d1a4f06fb49f1'
+
+def run_dpkg_deb(command, returncode=0, **kwargs):
+ """
+ Insted of running an 'dpkg-deb -x' command just create some dummy file
+ in the destination directory.
+ """
+ expected = ['dpkg-deb', '-x', '<deb_path>', '<dst_path>']
+
+ variables = process_run_args(command, kwargs, expected)
+ deb_path = Path(variables['deb_path'])
+ dst_path = Path(variables['dst_path'])
+
+ package_name = re.match('^([^_]+)_.*', deb_path.name).group(1)
+ for path in [
+ dst_path / 'etc' / f'dummy_{package_name}_config',
+ dst_path / 'usr/share/doc' / package_name / 'copyright'
+ ]:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ path.write_text(f'dummy {path.name}')
+
+ return MockedCompletedProcess(command, returncode,
+ text_output=kwargs.get('text'))
+
+def download_apt_packages(list, keys, packages, destination_dir,
+ with_deps=False):
+ """
+ Replacement for download_apt_packages() function in local_apt.py, for
+ unit-testing the piggybacked_system() function.
+ """
+ for path in [
+ destination_dir / 'some-bin-package_1.1-2_all.deb',
+ destination_dir / 'another-package_1.1-2_all.deb',
+ destination_dir / 'some-source-package_1.1.orig.tar.gz',
+ destination_dir / 'some-source-package_1.1-1.dsc'
+ ]:
+ path.write_text(f'dummy {path.name}')
+
+ with open(destination_dir / 'test_data.json', 'w') as out:
+ json.dump({
+ 'list_identity': list.identity(),
+ 'keys': keys,
+ 'packages': packages,
+ 'with_deps': with_deps
+ }, out)
+
+@pytest.fixture
+def mock_download_packages(monkeypatch):
+ """Mock the download_apt_packages() function in local_apt.py."""
+ monkeypatch.setattr(local_apt, 'download_apt_packages',
+ download_apt_packages)
+
+@pytest.mark.subprocess_run(local_apt, run_dpkg_deb)
+@pytest.mark.parametrize('params', [
+ {
+ 'with_deps': False,
+ 'base_depends': True,
+ 'identity': 'nabia',
+ 'props': {'distribution': 'nabia', 'dependencies': False},
+ 'all_keys': local_apt.default_keys,
+ 'prepared_directory': False
+ },
+ {
+ 'with_deps': True,
+ 'base_depends': False,
+ 'identity': '38db0b4fa2f6610cd1398b66a2c05d9abb1285f9a055a96eb96dee0f6b72aca8',
+ 'props': {
+ 'sources_list': [f'deb{suf} http://example.com/ stable main'
+ for suf in ('', '-src')],
+ 'trusted_keys': ['AB' * 20],
+ 'dependencies': True,
+ 'depend_on_base_packages': False
+ },
+ 'all_keys': [*local_apt.default_keys, 'AB' * 20],
+ 'prepared_directory': True
+ }
+])
+@pytest.mark.usefixtures('mock_download_packages', 'mock_subprocess_run')
+def test_piggybacked_system_download(params, tmpdir):
+ """
+ Verify that the piggybacked_system() function properly downloads and unpacks
+ APT packages.
+ """
+ foreign_packages_dir = tmpdir if params['prepared_directory'] else None
+
+ with local_apt.piggybacked_system({
+ 'system': 'apt',
+ **params['props'],
+ 'packages': ['some-bin-package', 'another-package=1.1-2']
+ }, foreign_packages_dir) as piggybacked:
+ expected_depends = [{'identifier': 'apt-common-licenses'}] \
+ if params['base_depends'] else []
+ assert piggybacked.resource_must_depend == expected_depends
+
+ archive_files = dict(piggybacked.archive_files())
+
+ archive_names = [
+ 'some-bin-package_1.1-2_all.deb',
+ 'another-package_1.1-2_all.deb',
+ 'some-source-package_1.1.orig.tar.gz',
+ 'some-source-package_1.1-1.dsc',
+ 'test_data.json'
+ ]
+ assert {*archive_files.keys()} == \
+ {PurePosixPath('apt') / n for n in archive_names}
+
+ for path in archive_files.values():
+ if path.name == 'test_data.json':
+ assert json.loads(path.read_text()) == {
+ 'list_identity': params['identity'],
+ 'keys': params['all_keys'],
+ 'packages': ['some-bin-package', 'another-package=1.1-2'],
+ 'with_deps': params['with_deps']
+ }
+ else:
+ assert path.read_text() == f'dummy {path.name}'
+
+ if foreign_packages_dir is not None:
+ assert path.parent == foreign_packages_dir / 'apt'
+
+ license_files = {*piggybacked.package_license_files}
+
+ assert license_files == {
+ PurePosixPath('.apt-root/usr/share/doc/another-package/copyright'),
+ PurePosixPath('.apt-root/usr/share/doc/some-bin-package/copyright')
+ }
+
+ assert ['dummy copyright'] * 2 == \
+ [piggybacked.resolve_file(p).read_text() for p in license_files]
+
+ for name in ['some-bin-package', 'another-package']:
+ path = PurePosixPath(f'.apt-root/etc/dummy_{name}_config')
+ assert piggybacked.resolve_file(path).read_text() == \
+ f'dummy {path.name}'
+
+ assert piggybacked.resolve_file(PurePosixPath('a/b/c')) == None
+ assert piggybacked.resolve_file(PurePosixPath('')) == None
+
+ output_text = 'loading_.apt-root/a/../../../b_outside_piggybacked_dir'
+ with pytest.raises(FileReferenceError,
+ match=f'^{re.escape(output_text)}$'):
+ piggybacked.resolve_file(PurePosixPath('.apt-root/a/../../../b'))
+
+ root = piggybacked.resolve_file(PurePosixPath('.apt-root/dummy')).parent
+ assert root.is_dir()
+
+ assert not root.exists()
+
+ if foreign_packages_dir:
+ assert [*tmpdir.iterdir()] == [tmpdir / 'apt']
+
+@pytest.mark.subprocess_run(local_apt, run_dpkg_deb)
+@pytest.mark.usefixtures('mock_subprocess_run')
+def test_piggybacked_system_no_download():
+ """
+ Verify that the piggybacked_system() function is able to use pre-downloaded
+ APT packages.
+ """
+ archive_names = {
+ f'{package}{rest}'
+ for package in ('some-lib_1:2.3', 'other-lib_4.45.2')
+ for rest in ('-1_all.deb', '.orig.tar.gz', '-1.debian.tar.xz', '-1.dsc')
+ }
+
+ with TemporaryDirectory() as td:
+ td = Path(td)
+ (td / 'apt').mkdir()
+ for name in archive_names:
+ (td / 'apt' / name).write_text(f'dummy {name}')
+
+ with local_apt.piggybacked_system({
+ 'system': 'apt',
+ 'distribution': 'nabia',
+ 'dependencies': True,
+ 'packages': ['whatever', 'whatever2']
+ }, td) as piggybacked:
+ archive_files = dict(piggybacked.archive_files())
+
+ assert {*archive_files.keys()} == \
+ {PurePosixPath('apt') / name for name in archive_names}
+
+ for path in archive_files.values():
+ assert path.read_text() == f'dummy {path.name}'
+
+ assert {*piggybacked.package_license_files} == {
+ PurePosixPath('.apt-root/usr/share/doc/some-lib/copyright'),
+ PurePosixPath('.apt-root/usr/share/doc/other-lib/copyright')
+ }
+
+ for name in ['some-lib', 'other-lib']:
+ path = PurePosixPath(f'.apt-root/etc/dummy_{name}_config')
+ assert piggybacked.resolve_file(path).read_text() == \
+ f'dummy {path.name}'
+
+@pytest.mark.subprocess_run(local_apt, run_missing_executable)
+@pytest.mark.usefixtures('mock_download_packages', 'mock_subprocess_run')
+def test_piggybacked_system_missing():
+ """
+ Verify that the piggybacked_system() function raises a proper error when
+ 'dpkg-deb' is missing.
+ """
+ with pytest.raises(local_apt.AptError,
+ match='^couldnt_execute_dpkg-deb_is_it_installed$'):
+ with local_apt.piggybacked_system({
+ 'system': 'apt',
+ 'distribution': 'nabia',
+ 'packages': ['some-package'],
+ 'dependencies': False
+ }, None) as piggybacked:
+ pass
+
+@pytest.mark.subprocess_run(local_apt, lambda c, **kw: run_dpkg_deb(c, 1, **kw))
+@pytest.mark.usefixtures('mock_download_packages', 'mock_subprocess_run')
+def test_piggybacked_system_fail():
+ """
+ Verify that the piggybacked_system() function raises a proper error when
+ 'dpkg-deb -x' command returns non-0.
+ """
+ error_regex = """^\
+command_dpkg-deb -x \\S+\\.deb \\S+_failed
+
+STDOUT_OUTPUT_heading
+
+some output
+
+STDERR_OUTPUT_heading
+
+some error output\
+$\
+"""
+
+ with pytest.raises(local_apt.AptError, match=error_regex):
+ with local_apt.piggybacked_system({
+ 'system': 'apt',
+ 'distribution': 'nabia',
+ 'packages': ['some-package'],
+ 'dependencies': False
+ }, None) as piggybacked:
+ pass
diff --git a/tests/test_pattern_tree.py b/tests/test_pattern_tree.py
new file mode 100644
index 0000000..4238d66
--- /dev/null
+++ b/tests/test_pattern_tree.py
@@ -0,0 +1,454 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+import pytest
+import re
+import dataclasses as dc
+
+from immutables import Map
+
+from hydrilla import pattern_tree
+
+from .url_patterns_common import *
+
+@pytest.mark.parametrize('_in, out', [
+ (Map(), True),
+ ({'children': Map(non_empty='non_emtpy')}, False),
+ ({'literal_match': 'non-None'}, False),
+ ({'children': Map(non_empty='non_emtpy')}, False),
+ ({'literal_match': 'non-None', 'children': 'non-empty'}, False)
+])
+def test_pattern_tree_node_is_empty(_in, out):
+ """...."""
+ assert pattern_tree.PatternTreeNode(**_in).is_empty() == out
+
+def test_pattern_tree_node_update_literal_match():
+ """...."""
+ node1 = pattern_tree.PatternTreeNode()
+ node2 = node1.update_literal_match('dummy match item')
+
+ assert node1.literal_match is None
+ assert node2.literal_match == 'dummy match item'
+
+def test_pattern_tree_node_get_child():
+ """...."""
+ node = pattern_tree.PatternTreeNode(children=Map(dummy_key='dummy_val'))
+
+ assert node.get_child('dummy_key') == 'dummy_val'
+ assert node.get_child('other_key') is None
+
+def test_pattern_tree_node_remove_child():
+ """...."""
+ node1 = pattern_tree.PatternTreeNode(children=Map(dummy_key='dummy_val'))
+ node2 = node1.remove_child('dummy_key')
+
+ assert node1.children == Map(dummy_key='dummy_val')
+ assert node2.children == Map()
+
+def test_pattern_tree_node_set_child():
+ """...."""
+ node1 = pattern_tree.PatternTreeNode(children=Map(dummy_key='dummy_val'))
+ node2 = node1.set_child('other_key', 'other_val')
+
+ assert node1.children == Map(dummy_key='dummy_val')
+ assert node2.children == Map(dummy_key='dummy_val', other_key='other_val')
+
+@pytest.mark.parametrize('root_empty', [True, False])
+def test_pattern_tree_branch_is_empty(root_empty):
+ """...."""
+ class DummyEmptyRoot:
+ """...."""
+ is_empty = lambda: root_empty
+
+ branch = pattern_tree.PatternTreeBranch(root_node=DummyEmptyRoot)
+ assert branch.is_empty() == root_empty
+
+# def test_pattern_tree_branch_copy():
+# """...."""
+# class DummyRoot:
+# """...."""
+# pass
+
+# branch1 = pattern_tree.PatternTreeBranch(root_node=DummyRoot)
+# branch2 = branch1.copy()
+
+# assert branch1 is not branch2
+# for val_b1, val_b2 in zip(dc.astuple(branch1), dc.astuple(branch2)):
+# assert val_b1 is val_b2
+
+@pytest.fixture
+def empty_branch():
+ """...."""
+ return pattern_tree.PatternTreeBranch(
+ root_node = pattern_tree.PatternTreeNode()
+ )
+
+@pytest.fixture
+def branch_with_a_b():
+ """...."""
+ return pattern_tree.PatternTreeBranch(
+ root_node = pattern_tree.PatternTreeNode(
+ children = Map(
+ a = pattern_tree.PatternTreeNode(
+ children = Map(
+ b = pattern_tree.PatternTreeNode(
+ literal_match = frozenset({'myitem'})
+ )
+ )
+ )
+ )
+ )
+ )
+
+def test_pattern_tree_branch_update_add_first(empty_branch, branch_with_a_b):
+ """...."""
+ updated_branch = empty_branch.update(
+ ['a', 'b'],
+ lambda s: frozenset({*(s or []), 'myitem'})
+ )
+
+ assert updated_branch == branch_with_a_b
+ assert empty_branch.root_node.children == Map()
+
+def test_pattern_tree_branch_update_add_second(branch_with_a_b):
+ """...."""
+ updated_branch = branch_with_a_b.update(
+ ['a', 'b'],
+ lambda s: frozenset({*(s or []), 'myotheritem'})
+ )
+
+ leaf_node = updated_branch.root_node.children['a'].children['b']
+ assert leaf_node.literal_match == frozenset({'myitem', 'myotheritem'})
+
+def test_pattern_tree_branch_update_add_different_path(branch_with_a_b):
+ """...."""
+ updated_branch = branch_with_a_b.update(
+ ['a', 'not_b'],
+ lambda s: frozenset({*(s or []), 'myotheritem'})
+ )
+
+ for segment, item in [('b', 'myitem'), ('not_b', 'myotheritem')]:
+ leaf_node = updated_branch.root_node.children['a'].children[segment]
+ assert leaf_node.literal_match == frozenset({item})
+
+# def test_pattern_tree_branch_update_is_value_copied(branch_with_a_b):
+# """...."""
+# updated_branch = branch_with_a_b.update(['a', 'b'], lambda s: s)
+
+# leaf_node_orig = updated_branch.root_node.children['a'].children['b']
+# leaf_node_new = branch_with_a_b.root_node.children['a'].children['b']
+
+# assert leaf_node_orig.literal_match == leaf_node_new.literal_match
+# assert leaf_node_orig.literal_match is not leaf_node_new.literal_match
+
+def test_pattern_tree_branch_remove(branch_with_a_b, empty_branch):
+ """...."""
+ updated_branch = branch_with_a_b.update(['a', 'b'], lambda s: None)
+
+ assert updated_branch == empty_branch
+
+def test_pattern_tree_branch_search_empty(empty_branch):
+ """...."""
+ assert [*empty_branch.search(['a', 'b'])] == []
+
+@pytest.fixture
+def branch_with_wildcards():
+ """...."""
+ return pattern_tree.PatternTreeBranch(
+ root_node = pattern_tree.PatternTreeNode(
+ children = Map(
+ a = pattern_tree.PatternTreeNode(
+ children = Map(
+ b = pattern_tree.PatternTreeNode(
+ children = Map({
+ 'c': pattern_tree.PatternTreeNode(
+ literal_match = 'dummy/c'
+ ),
+ '*': pattern_tree.PatternTreeNode(
+ literal_match = 'dummy/*'
+ ),
+ '**': pattern_tree.PatternTreeNode(
+ literal_match = 'dummy/**'
+ ),
+ '***': pattern_tree.PatternTreeNode(
+ literal_match = 'dummy/***'
+ )
+ })
+ )
+ )
+ )
+ )
+ )
+ )
+
+@pytest.mark.parametrize('_in, out', [
+ (['a'], []),
+ (['a', 'x', 'y', 'z'], []),
+ (['a', 'b'], ['dummy/***']),
+ (['a', 'b', 'c'], ['dummy/c', 'dummy/*', 'dummy/***']),
+ (['a', 'b', 'u'], ['dummy/*', 'dummy/***']),
+ (['a', 'b', '*'], ['dummy/*', 'dummy/***']),
+ (['a', 'b', '**'], ['dummy/**', 'dummy/*', 'dummy/***']),
+ (['a', 'b', '***'], ['dummy/***', 'dummy/*']),
+ (['a', 'b', 'u', 'l'], ['dummy/**', 'dummy/***']),
+ (['a', 'b', 'u', 'l', 'y'], ['dummy/**', 'dummy/***'])
+])
+def test_pattern_tree_branch_search_wildcards(_in, out, branch_with_wildcards):
+ """...."""
+ assert [*branch_with_wildcards.search(_in)] == out
+
+def test_filter_by_trailing_slash(sample_url_parsed):
+ """...."""
+ sample_url_parsed2 = dc.replace(sample_url_parsed, has_trailing_slash=True)
+ item1 = pattern_tree.StoredTreeItem(sample_url_parsed, 'dummy_it1')
+ item2 = pattern_tree.StoredTreeItem(sample_url_parsed2, 'dummy_it2')
+
+ assert pattern_tree.filter_by_trailing_slash((item1, item2), False) == \
+ frozenset({item1})
+
+ assert pattern_tree.filter_by_trailing_slash((item1, item2), True) == \
+ frozenset({item2})
+
+@pytest.mark.parametrize('register_mode', [True, False])
+@pytest.mark.parametrize('empty_at_start', [True, False])
+@pytest.mark.parametrize('empty_at_end', [True, False])
+def test_pattern_tree_privatemethod_register(
+ register_mode,
+ empty_at_start,
+ empty_at_end,
+ monkeypatch,
+ sample_url_parsed
+):
+ """...."""
+ dummy_it = pattern_tree.StoredTreeItem(sample_url_parsed, 'dummy_it')
+ other_dummy_it = pattern_tree.StoredTreeItem(
+ sample_url_parsed,
+ 'other_dummy_it'
+ )
+
+ class MockedTreeBranch:
+ """...."""
+ def is_empty(self):
+ """...."""
+ return empty_at_end
+
+ def update(self, segments, item_updater):
+ """...."""
+ if segments == ('com', 'example'):
+ return self._update_as_domain_branch(item_updater)
+ else:
+ assert segments == ('aa', 'bb')
+ return self._update_as_path_branch(item_updater)
+
+ def _update_as_domain_branch(self, item_updater):
+ """...."""
+ for updater_input in (None, MockedTreeBranch()):
+ updated = item_updater(updater_input)
+ if empty_at_end:
+ assert updated is None
+ else:
+ assert type(updated) is MockedTreeBranch
+
+ return MockedTreeBranch()
+
+ def _update_as_path_branch(self, item_updater):
+ """...."""
+ set_with_1_item = frozenset()
+ set_with_2_items = frozenset({dummy_it, other_dummy_it})
+ for updater_input in (None, set_with_1_item, set_with_2_items):
+ updated = item_updater(updater_input)
+ if register_mode:
+ assert dummy_it in updated
+ elif updater_input is set_with_2_items:
+ assert dummy_it not in updated
+ else:
+ assert updated is None
+
+ return MockedTreeBranch()
+
+ monkeypatch.setattr(pattern_tree, 'PatternTreeBranch', MockedTreeBranch)
+
+ initial_root = Map() if empty_at_start else \
+ Map({('http', 80): MockedTreeBranch()})
+
+ tree = pattern_tree.PatternTree(_by_scheme_and_port=initial_root)
+
+ new_tree = tree._register(
+ sample_url_parsed,
+ 'dummy_it',
+ register=register_mode
+ )
+
+ assert new_tree is not tree
+
+ if empty_at_end:
+ assert new_tree._by_scheme_and_port == Map()
+ else:
+ assert len(new_tree._by_scheme_and_port) == 1
+ assert type(new_tree._by_scheme_and_port[('http', 80)]) is \
+ MockedTreeBranch
+
+# @pytest.mark.parametrize('register_mode', [True, False])
+# def test_pattern_tree_privatemethod_register(
+# register_mode,
+# monkeypatch,
+# sample_url_parsed
+# ):
+# """...."""
+# registered_count = 0
+
+# def mocked_parse_pattern(url_pattern):
+# """...."""
+# assert url_pattern == 'dummy_pattern'
+
+# for _ in range(2):
+# yield sample_url_parsed
+
+# monkeypatch.setattr(pattern_tree, 'parse_pattern', mocked_parse_pattern)
+
+# def mocked_reconstruct_url(self):
+# """...."""
+# return 'dummy_reconstructed_pattern'
+
+# monkeypatch.setattr(pattern_tree.ParsedUrl, 'reconstruct_url',
+# mocked_reconstruct_url)
+
+# def mocked_register_with_parsed_pattern(
+# self,
+# parsed_pat,
+# wrapped_item,
+# register=True
+# ):
+# """...."""
+# nonlocal registered_count
+
+# assert parsed_pat is sample_url_parsed
+# assert wrapped_item.pattern == 'dummy_reconstructed_pattern'
+# assert register == register_mode
+
+# registered_count += 1
+
+# return 'dummy_new_tree' if registered_count == 2 else dc.replace(self)
+
+# monkeypatch.setattr(
+# pattern_tree.PatternTree,
+# '_register_with_parsed_pattern',
+# mocked_register_with_parsed_pattern
+# )
+
+# pattern_tree = pattern_tree.PatternTree()
+
+# new_tree = pattern_tree._register(
+# 'dummy_pattern',
+# 'dummy_item',
+# register_mode
+# )
+
+# assert new_tree == 'dummy_new_tree'
+
+@pytest.mark.parametrize('method_name, register_mode', [
+ ('register', True),
+ ('deregister', False)
+])
+def test_pattern_tree_register(method_name, register_mode, monkeypatch):
+ """...."""
+ def mocked_privatemethod_register(self, parsed_pat, item, register=True):
+ """...."""
+ assert (parsed_pat, item, register) == \
+ ('dummy_pattern', 'dummy_url', register_mode)
+
+ return 'dummy_new_tree'
+
+ monkeypatch.setattr(
+ pattern_tree.PatternTree,
+ '_register',
+ mocked_privatemethod_register
+ )
+
+ method = getattr(pattern_tree.PatternTree(), method_name)
+ assert method('dummy_pattern', 'dummy_url') == 'dummy_new_tree'
+
+@pytest.fixture
+def mock_parse_url(monkeypatch, sample_url_parsed):
+ """...."""
+ def mocked_parse_url(url):
+ """...."""
+ assert url == 'dummy_url'
+ return dc.replace(
+ sample_url_parsed,
+ **getattr(mocked_parse_url, 'url_mod', {})
+ )
+
+ monkeypatch.setattr(pattern_tree, 'parse_url', mocked_parse_url)
+
+ return mocked_parse_url
+
+@pytest.mark.usefixtures('mock_parse_url')
+def test_pattern_tree_search_empty(sample_url_parsed):
+ """...."""
+ for url in ('dummy_url', sample_url_parsed):
+ assert [*pattern_tree.PatternTree().search(url)] == []
+
+@pytest.mark.parametrize('url_mod, out', [
+ ({},
+ ['dummy_set_A', 'dummy_set_B', 'dummy_set_C']),
+
+ ({'has_trailing_slash': True},
+ ['dummy_set_A_with_slash', 'dummy_set_A',
+ 'dummy_set_B_with_slash', 'dummy_set_B',
+ 'dummy_set_C_with_slash', 'dummy_set_C'])
+])
+def test_pattern_tree_search(
+ url_mod,
+ out,
+ monkeypatch,
+ sample_url_parsed,
+ mock_parse_url,
+):
+ """...."""
+ mock_parse_url.url_mod = url_mod
+
+ dummy_tree_contents = [
+ ['dummy_set_A', 'dummy_set_B'],
+ [],
+ ['dummy_empty_set'] * 3,
+ ['dummy_set_C']
+ ]
+
+ def mocked_filter_by_trailing_slash(items, with_slash):
+ """...."""
+ if items == 'dummy_empty_set':
+ return frozenset()
+
+ return items + ('_with_slash' if with_slash else '')
+
+ monkeypatch.setattr(pattern_tree, 'filter_by_trailing_slash',
+ mocked_filter_by_trailing_slash)
+
+ class MockedDomainBranch:
+ """...."""
+ def search(self, labels):
+ """...."""
+ assert labels == sample_url_parsed.domain_labels
+
+ for item_sets in dummy_tree_contents:
+ class MockedPathBranch:
+ """...."""
+ def search(self, segments, item_sets=item_sets):
+ """...."""
+ assert segments == sample_url_parsed.path_segments
+
+ for dummy_items_set in item_sets:
+ yield dummy_items_set
+
+ yield MockedPathBranch()
+
+ tree = pattern_tree.PatternTree(
+ _by_scheme_and_port = {('http', 80): MockedDomainBranch()}
+ )
+
+ for url in ('dummy_url', mock_parse_url('dummy_url')):
+ assert [*tree.search(url)] == out
diff --git a/tests/test_server.py b/tests/test_server.py
index 02b9742..854b5f0 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -24,9 +24,6 @@
# file's license. Although I request that you do not make use this code
# in a proprietary program, I am not going to enforce this in court.
-# Enable using with Python 3.7.
-from __future__ import annotations
-
import pytest
import sys
import shutil
@@ -42,9 +39,9 @@ from flask.testing import FlaskClient
from markupsafe import escape
from werkzeug import Response
-from hydrilla import util as hydrilla_util
+from hydrilla import _version, json_instances
from hydrilla.builder import Build
-from hydrilla.server import config, _version
+from hydrilla.server import config
from hydrilla.server.serve import HydrillaApp
here = Path(__file__).resolve().parent
@@ -125,7 +122,7 @@ def index_json_modification(modify_index_json):
def handle_index_json(setup):
"""Modify index.json before build."""
index_path = setup.source_dir / 'index.json'
- index_json, _ = hydrilla_util.load_instance_from_file(index_path)
+ index_json, _ = json_instances.read_instance(index_path)
index_json = modify_index_json(index_json) or index_json
@@ -193,8 +190,8 @@ def test_get_newest(setup: Setup, item_type: str) -> None:
assert ('uuid' in definition) == (setup is not uuidless_setup)
- hydrilla_util.validator_for(f'api_{item_type}_description-1.0.1.schema.json')\
- .validate(definition)
+ schema_name = f'api_{item_type}_description-1.0.1.schema.json'
+ json_instances.validator_for(schema_name).validate(definition)
@pytest.mark.parametrize('item_type', ['resource', 'mapping'])
def test_get_nonexistent(item_type: str) -> None:
@@ -241,8 +238,8 @@ def test_empty_query() -> None:
'generated_by': expected_generated_by
}
- hydrilla_util.validator_for('api_query_result-1.0.1.schema.json')\
- .validate(response_object)
+ schema_name = 'api_query_result-1.0.1.schema.json'
+ json_instances.validator_for(schema_name).validate(response_object)
def test_query() -> None:
"""
@@ -264,8 +261,8 @@ def test_query() -> None:
'generated_by': expected_generated_by
}
- hydrilla_util.validator_for('api_query_result-1.schema.json')\
- .validate(response_object)
+ schema_name = 'api_query_result-1.schema.json'
+ json_instances.validator_for(schema_name).validate(response_object)
def test_source() -> None:
"""Verify source descriptions are properly served."""
@@ -282,8 +279,8 @@ def test_source() -> None:
response = def_get(f'/source/hello.zip')
assert sha256(response.data).digest().hex() == zipfile_hash
- hydrilla_util.validator_for('api_source_description-1.schema.json')\
- .validate(description)
+ schema_name = 'api_source_description-1.schema.json'
+ json_instances.validator_for(schema_name).validate(description)
def test_missing_source() -> None:
"""Verify requests for nonexistent sources result in 404."""
@@ -292,8 +289,3 @@ def test_missing_source() -> None:
response = def_get(f'/source/nonexistent.zip')
assert response.status_code == 404
-
-def test_normalize_version():
- assert hydrilla_util.normalize_version([4, 5, 3, 0, 0]) == [4, 5, 3]
- assert hydrilla_util.normalize_version([1, 0, 5, 0]) == [1, 0, 5]
- assert hydrilla_util.normalize_version([3, 3]) == [3, 3]
diff --git a/tests/test_url_patterns.py b/tests/test_url_patterns.py
new file mode 100644
index 0000000..c308f18
--- /dev/null
+++ b/tests/test_url_patterns.py
@@ -0,0 +1,188 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+import pytest
+import re
+import dataclasses as dc
+
+from immutables import Map
+
+from hydrilla import url_patterns
+from hydrilla.exceptions import HaketiloException
+
+from .url_patterns_common import *
+
+# @pytest.mark.parametrize('_in, out', [
+# ({}, sample_url_str),
+# ({'path_segments': ()}, 'http://example.com'),
+# ({'has_trailing_slash': True}, 'http://example.com/aa/bb/'),
+# ({'orig_scheme': 'http*'}, 'http*://example.com/aa/bb'),
+# ({'scheme': 'http_sth'}, 'http://example.com/aa/bb'),
+# ({'port': 443}, 'http://example.com:443/aa/bb'),
+
+# ({'path_segments': (),
+# 'has_trailing_slash': True},
+# 'http://example.com/'),
+
+# ({'orig_scheme': 'https',
+# 'scheme': 'https',
+# 'port': 443},
+# 'https://example.com/aa/bb'),
+
+# ({'orig_scheme': 'ftp',
+# 'scheme': 'ftp',
+# 'port': 21},
+# 'ftp://example.com/aa/bb'),
+
+# ({'orig_scheme': 'file',
+# 'scheme': 'file',
+# 'port': None,
+# 'domain_labels': ()},
+# 'file:///aa/bb')
+# ])
+# def test_reconstruct_parsed_url(_in, out, sample_url_parsed):
+# """Test the reconstruct_url() method of ParsedUrl class."""
+# parsed_url = dc.replace(sample_url_parsed, **_in)
+# assert parsed_url.reconstruct_url() == out
+
+@pytest.mark.parametrize('_in, out', [
+ ({'url': sample_url_str}, {}),
+ ({'url': 'http://example.com:80/aa/bb'}, {}),
+ ({'url': 'http://example.com//aa///bb'}, {}),
+ ({'url': 'http://example...com/aa/bb'}, {}),
+ ({'url': 'http://example.com/aa/bb?c=d#ef'}, {}),
+ ({'url': 'http://example.com'}, {'path_segments': ()}),
+ ({'url': 'http://example.com/aa/bb/'}, {'has_trailing_slash': True}),
+ ({'url': 'http://example.com:443/aa/bb'}, {'port': 443}),
+
+ ({'url': 'http://example.com/'},
+ {'path_segments': (),
+ 'has_trailing_slash': True}),
+
+ ({'url': 'http://example.com/aa/bb',
+ 'is_pattern': True,
+ 'orig_url': 'http*://example.com/aa/bb/'},
+ {}),
+
+ ({'url': 'https://example.com/aa/bb'},
+ {'scheme': 'https',
+ 'port': 443}),
+
+ ({'url': 'ftp://example.com/aa/bb'},
+ {'scheme': 'ftp',
+ 'port': 21}),
+
+ ({'url': 'file:///aa/bb'},
+ {'scheme': 'file',
+ 'port': None,
+ 'domain_labels': ()})
+])
+def test_parse_pattern_or_url(_in, out, sample_url_parsed):
+ """Test normal use (no errors) of the _parse_pattern_or_url() function."""
+ if 'orig_url' not in _in:
+ _in = {**_in, 'orig_url': _in['url']}
+
+ out = {**out, 'orig_url': _in['orig_url']}
+
+ parsed_url = url_patterns._parse_pattern_or_url(**_in)
+ assert parsed_url == dc.replace(sample_url_parsed, **out)
+
+@pytest.mark.parametrize('_in, err', [
+ ({'url': 'file://:78/unexpected/port'}, 'err.url_{}.bad'),
+ ({'url': 'file://unexpected.hostname/'}, 'err.url_{}.bad'),
+ ({'url': 'http:///no/hostname'}, 'err.url_{}.bad'),
+ ({'url': 'invalid?://example.com'}, 'err.url_{}.bad'),
+ ({'url': 'invalid?://example.com',
+ 'orig_url': 'invalid?://example.com',
+ 'is_pattern': True},
+ 'err.url_pattern_{}.bad'),
+
+ ({'url': 'unknown://example.com'}, 'err.url_{}.bad_scheme'),
+ ({'url': 'unknown://example.com',
+ 'orig_url': 'unknown://example.com',
+ 'is_pattern': True},
+ 'err.url_pattern_{}.bad_scheme'),
+
+ ({'url': 'http://example.com:80',
+ 'orig_url': 'http*://example.com:80',
+ 'is_pattern': True},
+ 'err.url_pattern_{}.special_scheme_port'),
+
+ ({'url': 'http://example.com:65536'}, 'err.url_{}.bad_port'),
+ ({'url': 'http://example.com:0'}, 'err.url_{}.bad_port'),
+ ({'url': 'http://example.com:65537',
+ 'orig_url': 'http://example.com:65537',
+ 'is_pattern': True},
+ 'err.url_pattern_{}.bad_port'),
+
+ ({'url': 'http://example.com/?a=b',
+ 'orig_url': 'http://example.com/?a=b',
+ 'is_pattern': True},
+ 'err.url_pattern_{}.has_query'),
+
+ ({'url': 'http://example.com/#abc',
+ 'orig_url': 'http://example.com/#abc',
+ 'is_pattern': True},
+ 'err.url_pattern_{}.has_frag')
+])
+def test_parse_pattern_or_url_err(_in, err, sample_url_parsed):
+ """Test error conditions of the _parse_pattern_or_url() function."""
+ if 'orig_url' not in _in:
+ _in = {**_in, 'orig_url': _in['url']}
+
+ err_url = _in['orig_url']
+ err_regex = err.format(re.escape(err_url))
+
+ with pytest.raises(HaketiloException, match=f'^{err_regex}$'):
+ url_patterns._parse_pattern_or_url(**_in)
+
+def test_parse_pattern_or_url_different_urls():
+ """
+ Verify the _parse_pattern_or_url() function allows passed URLs to be
+ different only when parsing a pattern.
+ """
+ urls = [sample_url_str, sample_url_str.replace('http', 'http*')]
+
+ url_patterns._parse_pattern_or_url(*urls, is_pattern=True)
+
+ with pytest.raises(AssertionError):
+ url_patterns._parse_pattern_or_url(*urls)
+
+@pytest.mark.parametrize('_in, out', [
+ ('http://example.com', ('mocked_pr_http://example.com',)),
+ ('ftp://example.com', ('mocked_pr_ftp://example.com',)),
+ ('http*://example.com', ('mocked_pr_http://example.com',
+ 'mocked_pr_https://example.com'))
+])
+def test_parse_pattern(monkeypatch, _in, out):
+ """...."""
+ def mocked_parse_pattern_or_url(url, orig_url, is_pattern=False):
+ """...."""
+ assert is_pattern
+ assert orig_url == _in
+
+ return f'mocked_pr_{url}'
+
+ monkeypatch.setattr(url_patterns, '_parse_pattern_or_url',
+ mocked_parse_pattern_or_url)
+
+ assert url_patterns.parse_pattern(_in) == out
+
+def test_parse_url(monkeypatch):
+ """...."""
+ def mocked_parse_pattern_or_url(url, orig_url):
+ """...."""
+ return f'mocked_pr_{url}'
+
+ monkeypatch.setattr(url_patterns, '_parse_pattern_or_url',
+ mocked_parse_pattern_or_url)
+
+ assert url_patterns.parse_url('https://example.com') == \
+ 'mocked_pr_https://example.com'
+
+def test_parsed_url_hash(sample_url_parsed):
+ """...."""
+ hash(sample_url_parsed)
diff --git a/tests/test_versions.py b/tests/test_versions.py
new file mode 100644
index 0000000..43a3f33
--- /dev/null
+++ b/tests/test_versions.py
@@ -0,0 +1,41 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+import pytest
+
+from hydrilla import versions
+
+sample_version_tuples = [(4, 5, 3), (1, 0, 5), (3,)]
+sample_version_strings = ['4.5.3', '1.0.5', '3']
+
+sample_versions = [*zip(sample_version_tuples, sample_version_strings)]
+
+@pytest.mark.parametrize('version_tuple', sample_version_tuples)
+def test_normalize_version(version_tuple):
+ """Verify that normalize_version() produces proper results."""
+ assert versions.normalize_version([*version_tuple]) == version_tuple
+ assert versions.normalize_version([*version_tuple, 0]) == version_tuple
+
+@pytest.mark.parametrize('version_tuple, string', sample_versions)
+def test_parse_version(version_tuple, string):
+ """Verify that parse_version() produces proper results."""
+ assert versions.parse_version(string)
+ assert versions.parse_version(string + '.0') == tuple([*version_tuple, 0])
+
+def test_parse_version_bad_string():
+ """Verify that parse_version() raises when passed an invalid string."""
+ with pytest.raises(ValueError):
+ versions.parse_version('i am not a valid version')
+
+@pytest.mark.parametrize('version_tuple, string', sample_versions)
+def test_version_string(version_tuple, string):
+ """Verify that version_string() produces proper results."""
+ for _version_tuple, _string in [
+ (version_tuple, string),
+ (tuple([*version_tuple, 0]), f'{string}.0')
+ ]:
+ assert versions.version_string(_version_tuple) == _string
+ assert versions.version_string(_version_tuple, 5) == f'{_string}-5'
diff --git a/tests/url_patterns_common.py b/tests/url_patterns_common.py
new file mode 100644
index 0000000..de6651d
--- /dev/null
+++ b/tests/url_patterns_common.py
@@ -0,0 +1,23 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+import pytest
+
+from hydrilla import url_patterns
+
+sample_url_str = 'http://example.com/aa/bb'
+
+@pytest.fixture(scope='session')
+def sample_url_parsed():
+ """Generate a simple ParsedUrl object."""
+ return url_patterns.ParsedUrl(
+ orig_url = sample_url_str,
+ scheme = 'http',
+ domain_labels = ('com', 'example'),
+ path_segments = ('aa', 'bb'),
+ has_trailing_slash = False,
+ port = 80
+ )