diff options
author | Wojtek Kosior <koszko@koszko.org> | 2022-06-13 11:06:49 +0200 |
---|---|---|
committer | Wojtek Kosior <koszko@koszko.org> | 2022-07-16 16:31:44 +0200 |
commit | 52d12a4fa124daa1595529e3e7008276a7986d95 (patch) | |
tree | 9b56fe2d28ff0242f8511aca570be455112ad3df /tests | |
parent | 9dcbfdfe8620cc417438d1727aa1e0c89846e9bf (diff) | |
download | haketilo-hydrilla-52d12a4fa124daa1595529e3e7008276a7986d95.tar.gz haketilo-hydrilla-52d12a4fa124daa1595529e3e7008276a7986d95.zip |
unfinished partial work
Diffstat (limited to 'tests')
-rw-r--r-- | tests/helpers.py | 51 | ||||
-rw-r--r-- | tests/test_build.py | 818 | ||||
-rw-r--r-- | tests/test_item_infos.py | 527 | ||||
-rw-r--r-- | tests/test_json_instances.py | 194 | ||||
-rw-r--r-- | tests/test_local_apt.py | 754 | ||||
-rw-r--r-- | tests/test_pattern_tree.py | 454 | ||||
-rw-r--r-- | tests/test_server.py | 30 | ||||
-rw-r--r-- | tests/test_url_patterns.py | 188 | ||||
-rw-r--r-- | tests/test_versions.py | 41 | ||||
-rw-r--r-- | tests/url_patterns_common.py | 23 |
10 files changed, 3061 insertions, 19 deletions
diff --git a/tests/helpers.py b/tests/helpers.py new file mode 100644 index 0000000..df474b0 --- /dev/null +++ b/tests/helpers.py @@ -0,0 +1,51 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. + +import re + +variable_word_re = re.compile(r'^<(.+)>$') + +def process_command(command, expected_command): + """Validate the command line and extract its variable parts (if any).""" + assert len(command) == len(expected_command) + + extracted = {} + for word, expected_word in zip(command, expected_command): + match = variable_word_re.match(expected_word) + if match: + extracted[match.group(1)] = word + else: + assert word == expected_word + + return extracted + +def run_missing_executable(command, **kwargs): + """ + Instead of running a command, raise FileNotFoundError as if its executable + was missing. + """ + raise FileNotFoundError('dummy') + +class MockedCompletedProcess: + """ + Object with some fields similar to those of subprocess.CompletedProcess. + """ + def __init__(self, args, returncode=0, + stdout='some output', stderr='some error output', + text_output=True): + """ + Initialize MockedCompletedProcess. Convert strings to bytes if needed. + """ + self.args = args + self.returncode = returncode + + if type(stdout) is str and not text_output: + stdout = stdout.encode() + if type(stderr) is str and not text_output: + stderr = stderr.encode() + + self.stdout = stdout + self.stderr = stderr diff --git a/tests/test_build.py b/tests/test_build.py new file mode 100644 index 0000000..868594e --- /dev/null +++ b/tests/test_build.py @@ -0,0 +1,818 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. + +# Enable using with Python 3.7. +from __future__ import annotations + +import pytest +import json +import shutil +import functools as ft + +from tempfile import TemporaryDirectory +from pathlib import Path, PurePosixPath +from hashlib import sha256 +from zipfile import ZipFile +from contextlib import contextmanager + +from jsonschema import ValidationError + +from hydrilla import _version, json_instances, versions +from hydrilla.json_instances import _schema_name_re +from hydrilla.builder import build, local_apt +from hydrilla.builder.common_errors import * + +from .helpers import * + +here = Path(__file__).resolve().parent + +expected_generated_by = { + 'name': 'hydrilla.builder', + 'version': _version.version +} + +orig_srcdir = here / 'source-package-example' + +index_obj = json_instances.read_instance(orig_srcdir / 'index.json') + +def read_files(*file_list): + """ + Take names of files under srcdir and return a dict that maps them to their + contents (as bytes). + """ + return dict((name, (orig_srcdir / name).read_bytes()) for name in file_list) + +dist_files = { + **read_files('LICENSES/CC0-1.0.txt', 'bye.js', 'hello.js', 'message.js'), + 'report.spdx': b'dummy spdx output' +} +src_files = { + **dist_files, + **read_files('README.txt', 'README.txt.license', '.reuse/dep5', + 'index.json') +} +extra_archive_files = { +} + +sha256_hashes = dict((name, sha256(contents).digest().hex()) + for name, contents in src_files.items()) + +del src_files['report.spdx'] + +expected_source_copyright = [{ + 'file': 'report.spdx', + 'sha256': sha256_hashes['report.spdx'] +}, { + 'file': 'LICENSES/CC0-1.0.txt', + 'sha256': sha256_hashes['LICENSES/CC0-1.0.txt'] +}] + +expected_resources = [{ + '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json', + 'source_name': 'hello', + 'source_copyright': expected_source_copyright, + 'type': 'resource', + 'identifier': 'helloapple', + 'long_name': 'Hello Apple', + 'uuid': 'a6754dcb-58d8-4b7a-a245-24fd7ad4cd68', + 'version': [2021, 11, 10], + 'revision': 1, + 'description': 'greets an apple', + 'dependencies': [{'identifier': 'hello-message'}], + 'scripts': [{ + 'file': 'hello.js', + 'sha256': sha256_hashes['hello.js'] + }, { + 'file': 'bye.js', + 'sha256': sha256_hashes['bye.js'] + }], + 'generated_by': expected_generated_by +}, { + '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json', + 'source_name': 'hello', + 'source_copyright': expected_source_copyright, + 'type': 'resource', + 'identifier': 'hello-message', + 'long_name': 'Hello Message', + 'uuid': '1ec36229-298c-4b35-8105-c4f2e1b9811e', + 'version': [2021, 11, 10], + 'revision': 2, + 'description': 'define messages for saying hello and bye', + 'dependencies': [], + 'scripts': [{ + 'file': 'message.js', + 'sha256': sha256_hashes['message.js'] + }], + 'generated_by': expected_generated_by +}] + +expected_mapping = { + '$schema': 'https://hydrilla.koszko.org/schemas/api_mapping_description-1.schema.json', + 'source_name': 'hello', + 'source_copyright': expected_source_copyright, + 'type': 'mapping', + 'identifier': 'helloapple', + 'long_name': 'Hello Apple', + 'uuid': '54d23bba-472e-42f5-9194-eaa24c0e3ee7', + 'version': [2021, 11, 10], + 'description': 'causes apple to get greeted on Hydrillabugs issue tracker', + 'payloads': { + 'https://hydrillabugs.koszko.org/***': { + 'identifier': 'helloapple' + }, + 'https://hachettebugs.koszko.org/***': { + 'identifier': 'helloapple' + } + }, + 'generated_by': expected_generated_by +} + +expected_source_description = { + '$schema': 'https://hydrilla.koszko.org/schemas/api_source_description-1.schema.json', + 'source_name': 'hello', + 'source_copyright': expected_source_copyright, + 'source_archives': { + 'zip': { + 'sha256': '!!!!value to fill during test!!!!', + } + }, + 'upstream_url': 'https://git.koszko.org/hydrilla-source-package-example', + 'definitions': [{ + 'type': 'mapping', + 'identifier': 'helloapple', + 'long_name': 'Hello Apple', + 'version': [2021, 11, 10], + }, { + 'type': 'resource', + 'identifier': 'helloapple', + 'long_name': 'Hello Apple', + 'version': [2021, 11, 10], + }, { + 'type': 'resource', + 'identifier': 'hello-message', + 'long_name': 'Hello Message', + 'version': [2021, 11, 10], + }], + 'generated_by': expected_generated_by +} + +expected = [expected_mapping, *expected_resources, expected_source_description] +expected_items = expected[:3] + +def run_reuse(command, **kwargs): + """ + Instead of running a 'reuse' command, check if 'mock_reuse_missing' file + exists under root directory. If yes, raise FileNotFoundError as if 'reuse' + command was missing. If not, check if 'README.txt.license' file exists + in the requested directory and return zero if it does. + """ + expected = ['reuse', '--root', '<root>', + 'lint' if 'lint' in command else 'spdx'] + + root_path = Path(process_command(command, expected)['root']) + + if (root_path / 'mock_reuse_missing').exists(): + raise FileNotFoundError('dummy') + + is_reuse_compliant = (root_path / 'README.txt.license').exists() + + return MockedCompletedProcess(command, 1 - is_reuse_compliant, + stdout=f'dummy {expected[-1]} output', + text_output=kwargs.get('text')) + +mocked_piggybacked_archives = [ + PurePosixPath('apt/something.deb'), + PurePosixPath('apt/something.orig.tar.gz'), + PurePosixPath('apt/something.debian.tar.xz'), + PurePosixPath('othersystem/other-something.tar.gz') +] + +@pytest.fixture +def mock_piggybacked_apt_system(monkeypatch): + """Make local_apt.piggybacked_system() return a mocked result.""" + # We set 'td' to a temporary dir path further below. + td = None + + class MockedPiggybacked: + """Minimal mock of Piggybacked object.""" + package_license_files = [PurePosixPath('.apt-root/.../copyright')] + resource_must_depend = [{'identifier': 'apt-common-licenses'}] + + def resolve_file(path): + """ + For each path that starts with '.apt-root' return a valid dummy file + path. + """ + if path.parts[0] != '.apt-root': + return None + + (td / path.name).write_text(f'dummy {path.name}') + + return (td / path.name) + + def archive_files(): + """Yield some valid dummy file path tuples.""" + for desired_path in mocked_piggybacked_archives: + real_path = td / desired_path.name + real_path.write_text(f'dummy {desired_path.name}') + + yield desired_path, real_path + + @contextmanager + def mocked_piggybacked_system(piggyback_def, piggyback_files): + """Mock the execution of local_apt.piggybacked_system().""" + assert piggyback_def == { + 'system': 'apt', + 'distribution': 'nabia', + 'packages': ['somelib=1.0'], + 'dependencies': False + } + if piggyback_files is not None: + assert {str(path) for path in mocked_piggybacked_archives} == \ + {path.relative_to(piggyback_files).as_posix() + for path in piggyback_files.rglob('*') if path.is_file()} + + yield MockedPiggybacked + + monkeypatch.setattr(local_apt, 'piggybacked_system', + mocked_piggybacked_system) + + with TemporaryDirectory() as td: + td = Path(td) + yield + +@pytest.fixture +def sample_source(): + """Prepare a directory with sample Haketilo source package.""" + with TemporaryDirectory() as td: + sample_source = Path(td) / 'hello' + for name, contents in src_files.items(): + path = sample_source / name + path.parent.mkdir(parents=True, exist_ok=True) + path.write_bytes(contents) + + yield sample_source + +def collect(list): + """Decorate function by appending it to the specified list.""" + def decorator(function): + """The actual decorator that will be applied.""" + list.append(function) + return function + + return decorator + +variant_makers = [] + +@collect(variant_makers) +def sample_source_change_index_json(monkeypatch, sample_source): + """ + Return a non-standard path for index.json. Ensure parent directories exist. + """ + # Use a path under sample_source so that it gets auto-deleted after the + # test. Use a file under .git because .git is ignored by REUSE. + path = sample_source / '.git' / 'replacement.json' + path.parent.mkdir() + return path + +@collect(variant_makers) +def sample_source_add_comments(monkeypatch, sample_source): + """Add index.json comments that should be preserved.""" + for dictionary in index_obj, *index_obj['definitions'], *expected: + monkeypatch.setitem(dictionary, 'comment', 'index.json comment') + +@collect(variant_makers) +def sample_source_remove_spdx(monkeypatch, sample_source): + """Remove spdx report generation.""" + monkeypatch.delitem(index_obj, 'reuse_generate_spdx_report') + + pred = lambda ref: ref['file'] != 'report.spdx' + copy_refs_in = list(filter(pred, index_obj['copyright'])) + monkeypatch.setitem(index_obj, 'copyright', copy_refs_in) + + copy_refs_out = list(filter(pred, expected_source_copyright)) + for obj in expected: + monkeypatch.setitem(obj, 'source_copyright', copy_refs_out) + + monkeypatch.delitem(dist_files, 'report.spdx') + + # To verify that reuse does not get called now, make mocked subprocess.run() + # raise an error if called. + (sample_source / 'mock_reuse_missing').touch() + +@collect(variant_makers) +def sample_source_remove_additional_files(monkeypatch, sample_source): + """Use default value ([]) for 'additionall_files' property.""" + monkeypatch.delitem(index_obj, 'additional_files') + + for name in 'README.txt', 'README.txt.license', '.reuse/dep5': + monkeypatch.delitem(src_files, name) + +@collect(variant_makers) +def sample_source_remove_script(monkeypatch, sample_source): + """Use default value ([]) for 'scripts' property in one of the resources.""" + monkeypatch.delitem(index_obj['definitions'][2], 'scripts') + + monkeypatch.setitem(expected_resources[1], 'scripts', []) + + for files in dist_files, src_files: + monkeypatch.delitem(files, 'message.js') + +@collect(variant_makers) +def sample_source_remove_payloads(monkeypatch, sample_source): + """Use default value ({}) for 'payloads' property in mapping.""" + monkeypatch.delitem(index_obj['definitions'][0], 'payloads') + + monkeypatch.setitem(expected_mapping, 'payloads', {}) + +@collect(variant_makers) +def sample_source_remove_uuids(monkeypatch, sample_source): + """Don't use UUIDs (they are optional).""" + for definition in index_obj['definitions']: + monkeypatch.delitem(definition, 'uuid') + + for description in expected: + if 'uuid' in description: + monkeypatch.delitem(description, 'uuid') + +@collect(variant_makers) +def sample_source_add_extra_props(monkeypatch, sample_source): + """Add some unrecognized properties that should be stripped.""" + to_process = [index_obj] + while to_process: + processed = to_process.pop() + + if type(processed) is list: + to_process.extend(processed) + elif type(processed) is dict and 'spurious_property' not in processed: + to_process.extend(v for k, v in processed.items() + if k != 'payloads') + monkeypatch.setitem(processed, 'spurious_property', 'some_value') + +@collect(variant_makers) +def sample_source_make_version_2(monkeypatch, sample_source, + expected_documents_to_modify=[]): + """Increase sources' schema version from 1 to 2.""" + for obj in index_obj, *expected_documents_to_modify: + monkeypatch.setitem(obj, '$schema', obj['$schema'].replace('1', '2')) + +permission_variant_makers = [] + +@collect(permission_variant_makers) +def sample_source_bool_perm_ignored(permission, monkeypatch, sample_source, + value=True): + """ + Specify a boolean permissions in sources, but keep sources' schema version + at 1. + """ + for definition in index_obj['definitions']: + monkeypatch.setitem(definition, 'permissions', {permission: value}) + +@collect(permission_variant_makers) +def sample_source_bool_perm(permission, monkeypatch, sample_source): + """Specify a boolean permission in sources.""" + sample_source_bool_perm_ignored(permission, monkeypatch, sample_source) + sample_source_make_version_2(monkeypatch, sample_source, expected_items) + + for obj in expected_items: + monkeypatch.setitem(obj, 'permissions', {permission: True}) + +@collect(permission_variant_makers) +def sample_source_bool_perm_defaults(permission, monkeypatch, sample_source): + """ + Specify a boolean permission in sources but use the default value ("False"). + """ + sample_source_bool_perm_ignored(permission, monkeypatch, sample_source, + value=False) + sample_source_make_version_2(monkeypatch, sample_source) + +for permission in 'cors_bypass', 'eval': + for variant_maker in permission_variant_makers: + variant_makers.append(ft.partial(variant_maker, permission)) + +@collect(variant_makers) +def sample_source_req_mappings_ignored(monkeypatch, sample_source, + value=[{'identifier': 'mapping-dep'}]): + """ + Specify dependencies on mappings, but keep sources' schema version at 1. + """ + for definition in index_obj['definitions']: + monkeypatch.setitem(definition, 'required_mappings', value); + +@collect(variant_makers) +def sample_source_req_mappings(monkeypatch, sample_source): + """Specify dependencies on mappings.""" + sample_source_req_mappings_ignored(monkeypatch, sample_source) + sample_source_make_version_2(monkeypatch, sample_source, expected_items) + + for obj in expected_items: + monkeypatch.setitem(obj, 'required_mappings', + [{'identifier': 'mapping-dep'}]) + +@collect(variant_makers) +def sample_source_req_mappings_defaults(monkeypatch, sample_source): + """Specify dependencies of a mapping, but use the default value ("[]").""" + sample_source_req_mappings_ignored(monkeypatch, sample_source, value=[]) + sample_source_make_version_2(monkeypatch, sample_source) + +@collect(variant_makers) +def sample_source_combined_def(monkeypatch, sample_source): + """Define mapping and resource together.""" + sample_source_make_version_2(monkeypatch, sample_source) + + mapping_def = index_obj['definitions'][0] + resource_defs = index_obj['definitions'][1:3] + + item_defs_shortened = [mapping_def, resource_defs[1]] + monkeypatch.setitem(index_obj, 'definitions', item_defs_shortened) + + monkeypatch.setitem(mapping_def, 'type', 'mapping_and_resource') + + new_mapping_ver = [*expected_mapping['version'], 1] + monkeypatch.setitem(mapping_def, 'revision', 1) + monkeypatch.setitem(expected_mapping, 'version', new_mapping_ver) + + for prop in 'scripts', 'dependencies': + monkeypatch.setitem(mapping_def, prop, resource_defs[0][prop]) + + monkeypatch.setitem(expected_resources[0], 'uuid', mapping_def['uuid']) + monkeypatch.setitem(expected_resources[0], 'description', + mapping_def['description']) + + monkeypatch.setitem(expected_source_description['definitions'][0], + 'version', new_mapping_ver) + +@collect(variant_makers) +def sample_source_minmax_haketilo_ver_ignored(monkeypatch, sample_source, + min_ver=[1, 2], max_ver=[1, 2]): + """ + Specify version constraints on Haketilo, but keep sources' schema version at + 1. + """ + mapping_def = index_obj['definitions'][0] + monkeypatch.setitem(mapping_def, 'min_haketilo_version', min_ver) + monkeypatch.setitem(mapping_def, 'max_haketilo_version', max_ver) + +@collect(variant_makers) +def sample_source_minmax_haketilo_ver(monkeypatch, sample_source): + """Specify version constraints on Haketilo.""" + sample_source_minmax_haketilo_ver_ignored(monkeypatch, sample_source) + sample_source_make_version_2(monkeypatch, sample_source, [expected_mapping]) + + monkeypatch.setitem(expected_mapping, 'min_haketilo_version', [1, 2]) + monkeypatch.setitem(expected_mapping, 'max_haketilo_version', [1, 2]) + +@collect(variant_makers) +def sample_source_minmax_haketilo_ver_default(monkeypatch, sample_source): + """Specify version constraints on Haketilo, but use default values.""" + sample_source_minmax_haketilo_ver_ignored(monkeypatch, sample_source, + min_ver=[1], max_ver=[65536]) + sample_source_make_version_2(monkeypatch, sample_source) + +piggyback_archive_names = [ + 'apt/something.deb', + 'apt/something.orig.tar.gz', + 'apt/something.debian.tar.xz', + 'othersystem/other-something.tar.gz' +] + +@collect(variant_makers) +def sample_source_add_piggyback_ignored(monkeypatch, sample_source, + extra_build_args={}): + """ + Add piggybacked foreign system packages, but keep sources' schema version at + 1. + """ + old_build = build.Build + new_build = lambda *a, **kwa: old_build(*a, **kwa, **extra_build_args) + monkeypatch.setattr(build, 'Build', new_build) + + monkeypatch.setitem(index_obj, 'piggyback_on', { + 'system': 'apt', + 'distribution': 'nabia', + 'packages': ['somelib=1.0'], + 'dependencies': False + }) + +@collect(variant_makers) +def sample_source_add_piggyback(monkeypatch, sample_source, + extra_build_args={}): + """Add piggybacked foreign system packages.""" + sample_source_add_piggyback_ignored\ + (monkeypatch, sample_source, extra_build_args) + + sample_source_make_version_2(monkeypatch, sample_source) + + new_refs = {} + for name in '.apt-root/.../copyright', '.apt-root/.../script.js': + contents = f'dummy {PurePosixPath(name).name}'.encode() + digest = sha256(contents).digest().hex() + monkeypatch.setitem(dist_files, name, contents) + monkeypatch.setitem(sha256_hashes, name, digest) + new_refs[PurePosixPath(name).name] = {'file': name, 'sha256': digest} + + new_list = [*expected_source_copyright, new_refs['copyright']] + for obj in expected: + monkeypatch.setitem(obj, 'source_copyright', new_list) + + for obj in expected_resources: + new_list = [{'identifier': 'apt-common-licenses'}, *obj['dependencies']] + monkeypatch.setitem(obj, 'dependencies', new_list) + + for obj in index_obj['definitions'][1], expected_resources[0]: + new_list = [new_refs['script.js'], *obj['scripts']] + monkeypatch.setitem(obj, 'scripts', new_list) + + for name in piggyback_archive_names: + path = PurePosixPath('hello.foreign-packages') / name + monkeypatch.setitem(extra_archive_files, str(path), + f'dummy {path.name}'.encode()) + +def prepare_foreign_packages_dir(path): + """ + Put some dummy archive in the directory so that it can be passed to + piggybacked_system(). + """ + for name in piggyback_archive_names: + archive_path = path / name + archive_path.parent.mkdir(parents=True, exist_ok=True) + archive_path.write_text(f'dummy {archive_path.name}') + +@collect(variant_makers) +def sample_source_add_piggyback_pass_archives(monkeypatch, sample_source): + """ + Add piggybacked foreign system packages, use pre-downloaded foreign package + archives (have Build() find them in their default directory). + """ + # Dir next to 'sample_source' will also be gc'd by sample_source() fixture. + foreign_packages_dir = sample_source.parent / 'arbitrary-name' + + prepare_foreign_packages_dir(foreign_packages_dir) + + sample_source_add_piggyback(monkeypatch, sample_source, + {'piggyback_files': foreign_packages_dir}) + +@collect(variant_makers) +def sample_source_add_piggyback_find_archives(monkeypatch, sample_source): + """ + Add piggybacked foreign system packages, use pre-downloaded foreign package + archives (specify their directory as argument to Build()). + """ + # Dir next to 'sample_source' will also be gc'd by sample_source() fixture. + foreign_packages_dir = sample_source.parent / 'hello.foreign-packages' + + prepare_foreign_packages_dir(foreign_packages_dir) + + sample_source_add_piggyback(monkeypatch, sample_source) + +@collect(variant_makers) +def sample_source_add_piggyback_no_download(monkeypatch, sample_source, + pass_directory_to_build=False): + """ + Add piggybacked foreign system packages, use pre-downloaded foreign package + archives. + """ + # Use a dir next to 'sample_source'; have it gc'd by sample_source fixture. + if pass_directory_to_build: + foreign_packages_dir = sample_source.parent / 'arbitrary-name' + else: + foreign_packages_dir = sample_source.parent / 'hello.foreign-packages' + + prepare_foreign_packages_dir(foreign_packages_dir) + + sample_source_add_piggyback(monkeypatch, sample_source) + +@pytest.fixture(params=[lambda m, s: None, *variant_makers]) +def sample_source_make_variants(request, monkeypatch, sample_source, + mock_piggybacked_apt_system): + """ + Prepare a directory with sample Haketilo source package in multiple slightly + different versions (all correct). Return an index.json path that should be + used when performing test build. + """ + index_path = request.param(monkeypatch, sample_source) or Path('index.json') + + index_text = json.dumps(index_obj) + + (sample_source / index_path).write_text(index_text) + + monkeypatch.setitem(src_files, 'index.json', index_text.encode()) + + return index_path + +def try_validate(as_what, instance): + """ + Select the right JSON schema. Return without errors only if the instance + validates against it. + """ + schema_fmt = f'{as_what}-{{}}.schema.json' + json_instances.validate_instance(instance, schema_fmt) + +@pytest.mark.subprocess_run(build, run_reuse) +@pytest.mark.usefixtures('mock_subprocess_run') +def test_build(sample_source, sample_source_make_variants, tmpdir): + """Build the sample source package and verify the produced files.""" + index_json_path = sample_source_make_variants + + # First, build the package + build.Build(sample_source, index_json_path).write_package_files(tmpdir) + + # Verify directories under destination directory + assert {'file', 'resource', 'mapping', 'source'} == \ + set([path.name for path in tmpdir.iterdir()]) + + # Verify files under 'file/' + file_dir = tmpdir / 'file' / 'sha256' + + for name, contents in dist_files.items(): + dist_file_path = file_dir / sha256_hashes[name] + assert dist_file_path.is_file() + assert dist_file_path.read_bytes() == contents + + assert {p.name for p in file_dir.iterdir()} == \ + {sha256_hashes[name] for name in dist_files.keys()} + + # Verify files under 'resource/' + resource_dir = tmpdir / 'resource' + + assert {rj['identifier'] for rj in expected_resources} == \ + {path.name for path in resource_dir.iterdir()} + + for resource_json in expected_resources: + subdir = resource_dir / resource_json['identifier'] + ver_str = versions.version_string(resource_json['version']) + assert [ver_str] == [path.name for path in subdir.iterdir()] + + assert json.loads((subdir / ver_str).read_text()) == resource_json + + try_validate('api_resource_description', resource_json) + + # Verify files under 'mapping/' + mapping_dir = tmpdir / 'mapping' + assert ['helloapple'] == [path.name for path in mapping_dir.iterdir()] + + subdir = mapping_dir / 'helloapple' + + ver_str = versions.version_string(expected_mapping['version']) + assert [ver_str] == [path.name for path in subdir.iterdir()] + + assert json.loads((subdir / ver_str).read_text()) == expected_mapping + + try_validate('api_mapping_description', expected_mapping) + + # Verify files under 'source/' + source_dir = tmpdir / 'source' + assert {'hello.json', 'hello.zip'} == \ + {path.name for path in source_dir.iterdir()} + + archive_files = {**dict((f'hello/{name}', contents) + for name, contents in src_files.items()), + **extra_archive_files} + + with ZipFile(source_dir / 'hello.zip', 'r') as archive: + print(archive.namelist()) + assert len(archive.namelist()) == len(archive_files) + + for name, contents in archive_files.items(): + assert archive.read(name) == contents + + zip_ref = expected_source_description['source_archives']['zip'] + zip_contents = (source_dir / 'hello.zip').read_bytes() + zip_ref['sha256'] = sha256(zip_contents).digest().hex() + + assert json.loads((source_dir / 'hello.json').read_text()) == \ + expected_source_description + + try_validate('api_source_description', expected_source_description) + +error_makers = [] + +@collect(error_makers) +def sample_source_error_missing_file(monkeypatch, sample_source): + """ + Modify index.json to expect missing report.spdx file and cause an error. + """ + monkeypatch.delitem(index_obj, 'reuse_generate_spdx_report') + return FileReferenceError, '^referenced_file_report.spdx_missing$' + +@collect(error_makers) +def sample_source_error_index_schema(monkeypatch, sample_source): + """Modify index.json to be incompliant with the schema.""" + monkeypatch.delitem(index_obj, 'definitions') + return ValidationError, + +@collect(error_makers) +def sample_source_error_unknown_index_schema(monkeypatch, sample_source): + """Modify index.json to be use a not-yet-released schema.""" + schema_id = \ + 'https://hydrilla.koszko.org/schemas/package_source-65536.schema.json' + monkeypatch.setitem(index_obj, "$schema", schema_id) + return hydrilla_util.UnknownSchemaError, \ + r'^unknown_schema_package_source_.*/hello/index\.json$' + +@collect(error_makers) +def sample_source_error_bad_comment(monkeypatch, sample_source): + """Modify index.json to have an invalid '/' in it.""" + return json.JSONDecodeError, '^bad_comment: .*', \ + json.dumps(index_obj) + '/something\n' + +@collect(error_makers) +def sample_source_error_bad_json(monkeypatch, sample_source): + """Modify index.json to not be valid json even after comment stripping.""" + return json.JSONDecodeError, '', json.dumps(index_obj) + '???\n' + +@collect(error_makers) +def sample_source_error_missing_reuse(monkeypatch, sample_source): + """Cause mocked reuse process invocation to fail with FileNotFoundError.""" + (sample_source / 'mock_reuse_missing').touch() + return build.ReuseError, '^couldnt_execute_reuse_is_it_installed$' + +@collect(error_makers) +def sample_source_error_missing_license(monkeypatch, sample_source): + """Remove a file to make package REUSE-incompliant.""" + (sample_source / 'README.txt.license').unlink() + + error_regex = """^\ +command_reuse --root \\S+ lint_failed + +STDOUT_OUTPUT_heading + +dummy lint output + +STDERR_OUTPUT_heading + +some error output\ +$\ +""" + + return build.ReuseError, error_regex + +@collect(error_makers) +def sample_source_error_file_outside(monkeypatch, sample_source): + """Make index.json illegally reference a file outside srcdir.""" + new_list = [*index_obj['copyright'], {'file': '../abc'}] + monkeypatch.setitem(index_obj, 'copyright', new_list) + return FileReferenceError, '^path_contains_double_dot_\\.\\./abc$' + +@collect(error_makers) +def sample_source_error_reference_itself(monkeypatch, sample_source): + """Make index.json illegally reference index.json.""" + new_list = [*index_obj['copyright'], {'file': 'index.json'}] + monkeypatch.setitem(index_obj, 'copyright', new_list) + return FileReferenceError, '^loading_reserved_index_json$' + +@collect(error_makers) +def sample_source_error_report_excluded(monkeypatch, sample_source): + """ + Make index.json require generation of report.spdx but don't include it among + copyright files. + """ + new_list = [file_ref for file_ref in index_obj['copyright'] + if file_ref['file'] != 'report.spdx'] + monkeypatch.setitem(index_obj, 'copyright', new_list) + return FileReferenceError, '^report_spdx_not_in_copyright_list$' + +@collect(error_makers) +def sample_source_error_combined_unsupported(monkeypatch, sample_source): + """ + Define mapping and resource together but leave source schema version at 1.x + where this is unsupported. + """ + mapping_def = index_obj['definitions'][0] + monkeypatch.setitem(mapping_def, 'type', 'mapping_and_resource') + + return ValidationError, + +@pytest.fixture(params=error_makers) +def sample_source_make_errors(request, monkeypatch, sample_source): + """ + Prepare a directory with sample Haketilo source package in multiple slightly + broken versions. Return an error type that should be raised when running + test build. + """ + error_type, error_regex, index_text = \ + [*request.param(monkeypatch, sample_source), '', ''][0:3] + + index_text = index_text or json.dumps(index_obj) + + (sample_source / 'index.json').write_text(index_text) + + monkeypatch.setitem(src_files, 'index.json', index_text.encode()) + + return error_type, error_regex + +@pytest.mark.subprocess_run(build, run_reuse) +@pytest.mark.usefixtures('mock_subprocess_run') +def test_build_error(tmpdir, sample_source, sample_source_make_errors): + """Try building the sample source package and verify generated errors.""" + error_type, error_regex = sample_source_make_errors + + dstdir = Path(tmpdir) / 'dstdir' + dstdir.mkdir(exist_ok=True) + + with pytest.raises(error_type, match=error_regex): + build.Build(sample_source, Path('index.json'))\ + .write_package_files(dstdir) diff --git a/tests/test_item_infos.py b/tests/test_item_infos.py new file mode 100644 index 0000000..9de3c96 --- /dev/null +++ b/tests/test_item_infos.py @@ -0,0 +1,527 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. + +import pytest +import pathlib +import re +import dataclasses as dc + +from immutables import Map + +from hydrilla import item_infos, versions, json_instances +from hydrilla.exceptions import HaketiloException + +def test_make_item_refs_seq_empty(): + """....""" + assert item_infos.make_item_refs_seq([]) == () + +def test_get_item_refs_seq_nonempty(): + """....""" + ref_objs = [{'identifier': 'abc'}, {'identifier': 'def'}] + + result = item_infos.make_item_refs_seq(ref_objs) + + assert type(result) is tuple + assert [ref.identifier for ref in result] == ['abc', 'def'] + +@pytest.fixture +def mock_make_item_refs_seq(monkeypatch): + """....""" + def mocked_make_item_refs_seq(ref_objs): + """....""" + assert ref_objs == getattr( + mocked_make_item_refs_seq, + 'expected', + [{'identifier': 'abc'}, {'identifier': 'def'}] + ) + + return (item_infos.ItemRef('abc'), item_infos.ItemRef('def')) + + monkeypatch.setattr(item_infos, 'make_item_refs_seq', + mocked_make_item_refs_seq) + + return mocked_make_item_refs_seq + +def test_make_required_mappings_compat_too_low(): + """....""" + assert item_infos.make_required_mappings('whatever', 1) == () + +@pytest.mark.usefixtures('mock_make_item_refs_seq') +def test_make_required_mappings_compat_ok(): + """....""" + ref_objs = [{'identifier': 'abc'}, {'identifier': 'def'}] + + assert item_infos.make_required_mappings(ref_objs, 2) == \ + (item_infos.ItemRef('abc'), item_infos.ItemRef('def')) + +def test_make_file_refs_seq_empty(): + """....""" + assert item_infos.make_file_refs_seq([]) == () + +def test_make_file_refs_seq_nonempty(): + """....""" + ref_objs = [{'file': 'abc', 'sha256': 'dummy_hash1'}, + {'file': 'def', 'sha256': 'dummy_hash2'}] + + result = item_infos.make_file_refs_seq(ref_objs) + + assert type(result) is tuple + assert [ref.name for ref in result] == ['abc', 'def'] + assert [ref.sha256 for ref in result] == ['dummy_hash1', 'dummy_hash2'] + +def test_generated_by_make_empty(): + """....""" + assert item_infos.GeneratedBy.make(None) == None + +@pytest.mark.parametrize('_in, out_version', [ + ({'name': 'abc'}, None), + ({'name': 'abc', 'version': '1.1.1'}, '1.1.1') +]) +def test_generated_by_make_nonempty(_in, out_version): + """....""" + generated_by = item_infos.GeneratedBy.make(_in) + + assert generated_by.name == 'abc' + assert generated_by.version == out_version + +def test_load_item_info(monkeypatch): + """....""" + def mocked_read_instance(instance_or_path): + """....""" + assert instance_or_path == 'dummy_path' + return 'dummy_instance' + + monkeypatch.setattr(json_instances, 'read_instance', mocked_read_instance) + + def mocked_validate_instance(instance, schema_fmt): + """....""" + assert instance == 'dummy_instance' + assert schema_fmt == 'api_exotictype_description-{}.schema.json' + return 7 + + monkeypatch.setattr(json_instances, 'validate_instance', + mocked_validate_instance) + + class MockedLoadedType: + """....""" + def make(instance, schema_compat, repository): + """....""" + assert instance == 'dummy_instance' + assert schema_compat == 7 + assert repository == 'somerepo' + return 'dummy_item_info' + + type_name = 'exotictype' + + assert item_infos._load_item_info( + MockedLoadedType, + 'dummy_path', + 'somerepo' + ) == 'dummy_item_info' + +def test_make_payloads(monkeypatch): + """....""" + payloads_obj = {'http*://example.com/': {'identifier': 'someresource'}} + + def mocked_parse_pattern(pattern): + """....""" + assert pattern == 'http*://example.com/' + + yield 'dummy_parsed_pattern_1' + yield 'dummy_parsed_pattern_2' + + monkeypatch.setattr(item_infos, 'parse_pattern', mocked_parse_pattern) + + assert item_infos.make_payloads(payloads_obj) == Map({ + 'dummy_parsed_pattern_1': item_infos.ItemRef('someresource'), + 'dummy_parsed_pattern_2': item_infos.ItemRef('someresource') + }) + +@pytest.mark.parametrize('info_mod, in_mod', [ + ({}, {}), + ({'uuid': 'dummy_uuid'}, {}), + ({}, {'uuid': 'dummy_uuid'}), + ({'uuid': 'dummy_uuid'}, {'uuid': 'dummy_uuid'}), + ({}, {'identifier': 'abc', '_initialized': True}), + ({}, {'_by_version': Map({(1, 2): 'dummy_old_info'})}) +]) +def test_versioned_item_info_register(info_mod, in_mod): + """....""" + class DummyInfo: + """....""" + uuid = None + identifier = 'abc' + version = (1, 2) + + for name, value in info_mod.items(): + setattr(DummyInfo, name, value) + + in_fields = { + 'uuid': None, + 'identifier': '<dummy>', + '_by_version': Map(), + '_initialized': False, + **in_mod + } + out_fields = { + 'uuid': DummyInfo.uuid or in_mod.get('uuid'), + 'identifier': DummyInfo.identifier, + '_by_version': Map({(1, 2): DummyInfo}), + '_initialized': True + } + + versioned = item_infos.VersionedItemInfo(**in_fields) + new_versioned = versioned.register(DummyInfo) + + assert dc.asdict(versioned) == in_fields + assert dc.asdict(new_versioned) == out_fields + +def test_versioned_item_info_register_bad_uuid(): + """....""" + versioned = item_infos.VersionedItemInfo( + identifier='abc', + uuid='old_uuid' + ) + + class DummyInfo: + """....""" + uuid = 'new_uuid' + identifier = 'abc' + version = (1, 2) + + with pytest.raises(HaketiloException, match='^uuid_mismatch_abc$'): + versioned.register(DummyInfo) + +@pytest.mark.parametrize('previous_registrations', [ + Map(), + Map({(1, 2): 'dummy_info'}) +]) +def test_versioned_item_info_unregister(previous_registrations): + """....""" + versioned = item_infos.VersionedItemInfo( + identifier = 'abc', + _by_version = previous_registrations + ) + + assert versioned.unregister((1, 2)) == \ + dc.replace(versioned, _by_version=Map()) + +@pytest.mark.parametrize('registrations, out', [ + (Map(), True), + (Map({(1, 2): 'dummy_info'}), False) +]) +def test_versioned_item_info_is_empty(registrations, out): + """....""" + versioned = item_infos.VersionedItemInfo( + identifier = 'abc', + _by_version = registrations + ) + + assert versioned.is_empty() == out + +@pytest.mark.parametrize('versions, out', [ + ([(1, 2), (1, 2, 1), (0, 9999, 4), (1, 0, 2)], (1, 2, 1)), + ([(1, 2)], (1, 2)) +]) +def test_versioned_item_info_newest_version(versions, out): + """....""" + versioned = item_infos.VersionedItemInfo( + identifier = 'abc', + _by_version = Map((ver, 'dummy_info') for ver in versions) + ) + + assert versioned.newest_version() == out + +def test_versioned_item_info_newest_version_bad(monkeypatch): + """....""" + monkeypatch.setattr(item_infos.VersionedItemInfo, 'newest_version', + lambda self: 'dummy_ver1') + + versioned = item_infos.VersionedItemInfo( + identifier = 'abc', + _by_version = Map(dummy_ver1='dummy_info1', dummy_ver2='dummy_info2') + ) + + assert versioned.get_newest() == 'dummy_info1' + +def test_versioned_item_info_get_by_ver(): + """....""" + versioned = item_infos.VersionedItemInfo( + identifier = 'abc', + _by_version = Map({(1, 2): 'dummy_info1', (3, 4, 5): 'dummy_info2'}) + ) + + assert versioned.get_by_ver(range(1, 3)) == 'dummy_info1' + +@pytest.mark.parametrize('versions, out', [ + ([(1, 2), (0, 999, 4), (1, 0, 2)], ['(0, 999, 4)', '(1, 0, 2)', '(1, 2)']), + ([], []) +]) +def test_versioned_item_get_all(versions, out): + """....""" + versioned = item_infos.VersionedItemInfo( + identifier = 'abc', + _by_version = Map((ver, str(ver)) for ver in versions) + ) + + assert [*versioned.get_all()] == out + +sample_resource_obj = { + 'source_name': 'somesource', + 'source_copyright': [{'file': 'ABC', 'sha256': 'dummy_sha256'}], + 'version': [1, 2, 3, 0], + 'identifier': 'someid', + 'uuid': None, + 'long_name': 'Some Thing', + 'required_mappings': [{'identifier': 'required1'}], + 'generated_by': {'name': 'sometool', 'version': '1.1.1'}, + 'revision': 4, + 'dependencies': [{'identifier': 'abc'}, {'identifier': 'def'}], + 'scripts': [{'file': 'ABC', 'sha256': 'dummy_sha256'}] +} + +sample_mapping_obj = { + **sample_resource_obj, + 'payloads': { + 'https://example.com/': {'identifier': 'someresource'} + } +} + +del sample_mapping_obj['dependencies'] +del sample_mapping_obj['scripts'] + +@pytest.fixture(scope='session') +def sample_resource_info(): + """....""" + return item_infos.ResourceInfo( + repository = 'somerepo', + source_name = 'somesource', + source_copyright = (item_infos.FileRef('ABC', 'dummy_sha256'),), + version = (1, 2, 3), + identifier = 'someid', + uuid = None, + long_name = 'Some Thing', + required_mappings = (item_infos.ItemRef('required1'),), + generated_by = item_infos.GeneratedBy('sometool', '1.1.1'), + revision = 4, + dependencies = (item_infos.ItemRef('abc'), + item_infos.ItemRef('def')), + scripts = (item_infos.FileRef('ABC', 'dummy_sha256'),) + ) + +@pytest.fixture(scope='session') +def sample_mapping_info(): + """....""" + payloads = Map({'https://example.com/': item_infos.ItemRef('someresource')}) + + return item_infos.MappingInfo( + repository = 'somerepo', + source_name = 'somesource', + source_copyright = (item_infos.FileRef('ABC', 'dummy_sha256'),), + version = (1, 2, 3), + identifier = 'someid', + uuid = None, + long_name = 'Some Thing', + required_mappings = (item_infos.ItemRef('required1'),), + generated_by = item_infos.GeneratedBy('sometool', '1.1.1'), + payloads = payloads + ) + +@pytest.fixture(scope='session') +def sample_info_base_init_kwargs(sample_resource_info): + kwargs = {} + for field_name in item_infos.ItemInfoBase.__annotations__.keys(): + kwargs[field_name] = getattr(sample_resource_info, field_name) + + return Map(kwargs) + +@pytest.fixture +def mock_version_string(monkeypatch): + """....""" + def mocked_version_string(version, revision=None): + """....""" + assert version == (1, 2, 3) + assert revision in (None, 4) + return '1.2.3' if revision is None else '1.2.3-4' + + monkeypatch.setattr(versions, 'version_string', mocked_version_string) + +@pytest.mark.usefixtures('mock_version_string') +def test_item_info_path(sample_resource_info): + """....""" + assert sample_resource_info.path_relative_to_type() == 'someid/1.2.3' + assert sample_resource_info.path() == 'resource/someid/1.2.3' + +@pytest.mark.usefixtures('mock_version_string') +def test_resource_info_versioned_identifier(sample_resource_info, monkeypatch): + """....""" + monkeypatch.setattr(item_infos.ItemInfoBase, 'versioned_identifier', + lambda self: '<dummy>') + + assert sample_resource_info.versioned_identifier == '<dummy>-4' + +@pytest.mark.usefixtures('mock_version_string') +def test_mapping_info_versioned_identifier(sample_mapping_info): + """....""" + assert sample_mapping_info.versioned_identifier == 'someid-1.2.3' + +@pytest.fixture +def mock_make_file_refs_seq(monkeypatch): + """....""" + def mocked_make_file_refs_seq(ref_objs): + """....""" + assert ref_objs == getattr( + mocked_make_file_refs_seq, + 'expected', + [{'file': 'ABC', 'sha256': 'dummy_sha256'}] + ) + + return (item_infos.FileRef(name='ABC', sha256='dummy_sha256'),) + + monkeypatch.setattr(item_infos, 'make_file_refs_seq', + mocked_make_file_refs_seq) + + return mocked_make_file_refs_seq + +@pytest.mark.parametrize('missing_prop', [ + 'required_mappings', + 'generated_by', + 'uuid' +]) +@pytest.mark.usefixtures('mock_make_item_refs_seq', 'mock_make_file_refs_seq') +def test_item_info_get_base_init_kwargs( + missing_prop, + monkeypatch, + sample_resource_info, + sample_info_base_init_kwargs, + mock_make_file_refs_seq +): + """....""" + monkeypatch.delitem(sample_resource_obj, missing_prop) + + def mocked_normalize_version(version): + """....""" + assert version == [1, 2, 3, 0] + + return (1, 2, 3) + + monkeypatch.setattr(versions, 'normalize_version', mocked_normalize_version) + + def mocked_make_required_mappings(ref_objs, schema_compat): + """....""" + if missing_prop == 'required_mappings': + assert ref_objs == [] + else: + assert ref_objs == [{'identifier': 'required1'}] + + assert schema_compat == 2 + + return (item_infos.ItemRef('required1'),) + + monkeypatch.setattr(item_infos, 'make_required_mappings', + mocked_make_required_mappings) + + def mocked_generated_by_make(generated_by_obj): + """....""" + if missing_prop == 'generated_by': + assert generated_by_obj == None + else: + assert generated_by_obj == {'name': 'sometool', 'version': '1.1.1'} + + return item_infos.GeneratedBy(name='sometool', version='1.1.1') + + monkeypatch.setattr(item_infos.GeneratedBy, 'make', + mocked_generated_by_make) + + expected = sample_info_base_init_kwargs + if missing_prop == 'uuid': + expected = expected.set('uuid', None) + + Base = item_infos.ItemInfoBase + assert Base._get_base_init_kwargs(sample_resource_obj, 2, 'somerepo') == \ + expected + +@pytest.fixture +def mock_get_base_init_kwargs(monkeypatch, sample_info_base_init_kwargs): + """....""" + def mocked_get_base_init_kwargs(item_obj, schema_compat, repository): + """....""" + assert schema_compat == 2 + assert item_obj['identifier'] == 'someid' + assert repository == 'somerepo' + + return sample_info_base_init_kwargs + + monkeypatch.setattr(item_infos.ItemInfoBase, '_get_base_init_kwargs', + mocked_get_base_init_kwargs) + +@pytest.mark.parametrize('missing_prop', ['dependencies', 'scripts']) +@pytest.mark.usefixtures('mock_get_base_init_kwargs') +def test_resource_info_make( + missing_prop, + monkeypatch, + sample_resource_info, + mock_make_item_refs_seq, + mock_make_file_refs_seq +): + """....""" + _in = sample_resource_obj + monkeypatch.delitem(_in, missing_prop) + + if missing_prop == 'dependencies': + mock_make_item_refs_seq.expected = [] + elif missing_prop == 'scripts': + mock_make_file_refs_seq.expected = [] + + assert item_infos.ResourceInfo.make(_in, 2, 'somerepo') == \ + sample_resource_info + +@pytest.mark.parametrize('missing_payloads', [True, False]) +@pytest.mark.usefixtures('mock_get_base_init_kwargs', 'mock_make_item_refs_seq') +def test_mapping_info_make(missing_payloads, monkeypatch, sample_mapping_info): + """....""" + _in = sample_mapping_obj + if missing_payloads: + monkeypatch.delitem(_in, 'payloads') + + def mocked_make_payloads(payloads_obj): + """....""" + if missing_payloads: + assert payloads_obj == {} + else: + assert payloads_obj == \ + {'https://example.com/': {'identifier': 'someresource'}} + + return Map({'https://example.com/': item_infos.ItemRef('someresource')}) + + monkeypatch.setattr(item_infos, 'make_payloads', mocked_make_payloads) + + assert item_infos.MappingInfo.make(_in, 2, 'somerepo') == \ + sample_mapping_info + +@pytest.mark.parametrize('type_name', ['ResourceInfo', 'MappingInfo']) +def test_make_item_info(type_name, monkeypatch): + """....""" + info_type = getattr(item_infos, type_name) + + def mocked_load_item_info(_info_type, instance_or_path, repository): + """....""" + assert _info_type == info_type + assert instance_or_path == 'dummy_path' + assert repository == 'somerepo' + return 'dummy_info' + + monkeypatch.setattr(item_infos, '_load_item_info', mocked_load_item_info) + + assert info_type.load('dummy_path', 'somerepo') == 'dummy_info' + +def test_resource_info_hash(sample_resource_info): + """....""" + hash(sample_resource_info) + +def test_mapping_info_hash(sample_mapping_info): + """....""" + hash(sample_mapping_info) diff --git a/tests/test_json_instances.py b/tests/test_json_instances.py new file mode 100644 index 0000000..f5bd270 --- /dev/null +++ b/tests/test_json_instances.py @@ -0,0 +1,194 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. + +import pytest +import re + +from hydrilla import json_instances +from hydrilla.exceptions import HaketiloException + +sample_json_no_comments = '{\n"so/me":\n"//json//"\n}\n' + +@pytest.mark.parametrize('_in', [ + '{\n"so/me":\n"//json//"\n}\n', + '{//we\n"so/me"://will\n"//json//"//rock\n}//you\n' +]) +def test_strip_json_comments(_in): + """....""" + assert json_instances.strip_json_comments(_in) == sample_json_no_comments + +@pytest.mark.parametrize('_in, line, char', [ + ('/{\n"so/me":\n"//json//"\n}\n', 1, 1), + ('{\n"so/me":/\n"//json//"\n}/\n', 2, 9), + ('{\n"so/me":/ huehue, I am an invalid comment\n"//json//"\n}\n', 2, 9) +]) +def test_strip_json_comments_bad(_in, line, char): + """....""" + error_regex = f'^bad_json_comment_line_{line}_char_{char}$' + with pytest.raises(HaketiloException, match=error_regex): + json_instances.strip_json_comments(_in) + +@pytest.mark.parametrize('schema_name, full_schema_name', [ + ('package_source-1.0.1.schema.json', 'package_source-1.0.1.schema.json'), + ('package_source-1.0.schema.json', 'package_source-1.0.1.schema.json'), + ('package_source-1.schema.json', 'package_source-1.0.1.schema.json'), + ('package_source-2.schema.json', 'package_source-2.schema.json') +]) +def test_get_schema(schema_name, full_schema_name): + """....""" + url_prefix = 'https://hydrilla.koszko.org/schemas/' + + for prefix in ('', url_prefix): + schema1 = json_instances._get_schema(prefix + schema_name) + assert schema1['$id'] == url_prefix + full_schema_name + + schema2 = json_instances._get_schema(prefix + schema_name) + assert schema2 is schema1 + +@pytest.mark.parametrize('_in', ['dummy_uri', {'$id': 'dummy_uri'}]) +def test_validator_for(_in, monkeypatch): + """....""" + def mocked_get_schema(schema_id): + """....""" + assert schema_id == 'dummy_uri' + return {'$id': 'dummy_uri'} + + monkeypatch.setattr(json_instances, '_get_schema', mocked_get_schema) + + def MockedRefResolver(base_uri, referrer, handlers): + """....<function replaces a class...>""" + assert base_uri == referrer['$id'] + assert referrer == {'$id': 'dummy_uri'} + assert handlers == {'https': mocked_get_schema} + return 'dummy_resolver' + + monkeypatch.setattr(json_instances, 'RefResolver', MockedRefResolver) + + def MockedDraft7Validator(schema, resolver): + """....<same as above>""" + assert schema == {'$id': 'dummy_uri'} + assert resolver == 'dummy_resolver' + return 'dummy_validator' + + monkeypatch.setattr(json_instances, 'Draft7Validator', + MockedDraft7Validator) + + assert json_instances.validator_for(_in) == 'dummy_validator' + +def test_parse_instance(monkeypatch): + """....""" + def mocked_strip_json_comments(text): + """....""" + assert text == 'dummy_commented_json' + return '{"dummy": 1}' + + monkeypatch.setattr(json_instances, 'strip_json_comments', + mocked_strip_json_comments) + + assert json_instances.parse_instance('dummy_commented_json') == {'dummy': 1} + + +def test_read_instance(monkeypatch, tmpdir): + """....""" + def mocked_parse_instance(text): + """....""" + assert text == 'dummy_JSON_text' + return {'dummy': 1} + + monkeypatch.setattr(json_instances, 'parse_instance', mocked_parse_instance) + + somepath = tmpdir / 'somefile' + somepath.write_text('dummy_JSON_text') + + for instance_or_path in (somepath, str(somepath), {'dummy': 1}): + assert json_instances.read_instance(instance_or_path) == {'dummy': 1} + +def test_read_instance_bad(monkeypatch, tmpdir): + """....""" + monkeypatch.setattr(json_instances, 'parse_instance', lambda: 3 / 0) + + somepath = tmpdir / 'somefile' + somepath.write_text('dummy_JSON_text') + + error_regex = f'^text_in_{re.escape(str(somepath))}_not_valid_json$' + with pytest.raises(HaketiloException, match=error_regex): + json_instances.read_instance(somepath) + +@pytest.mark.parametrize('instance, ver_str', [ + ({'$schema': 'a_b_c-1.0.1.0.schema.json'}, '1.0.1.0'), + ({'$schema': '9-9-9-10.5.600.schema.json'}, '10.5.600'), + ({'$schema': 'https://ab.cd-2.schema.json'}, '2') +]) +def test_get_schema_version(instance, ver_str, monkeypatch): + """....""" + def mocked_parse_version(_ver_str): + """....""" + assert _ver_str == ver_str + return 'dummy_version' + + monkeypatch.setattr(json_instances, 'parse_version', mocked_parse_version) + + assert json_instances.get_schema_version(instance) == 'dummy_version' + +@pytest.mark.parametrize('instance', [ + {'$schema': 'https://ab.cd-0.schema.json'}, + {'$schema': 'https://ab.cd-02.schema.json'}, + {'$schema': 'https://ab.cd-2.00.schema.json'}, + {'$schema': 'https://ab.cd-2.01.schema.json'}, + {'$schema': 'https://ab.cd-2.schema.json5'}, + {'$schema': 'https://ab.cd-2.schema@json'}, + {'$schema': 'https://ab.cd_2.schema.json'}, + {'$schema': '2.schema.json'}, + {'$schema': 'https://ab.cd-.schema.json'}, + {'$schema': b'https://ab.cd-2.schema.json'}, + {}, + 'not dict' +]) +def test_get_schema_version_bad(instance): + """....""" + error_regex = '^no_schema_number_in_instance$' + with pytest.raises(HaketiloException, match=error_regex): + json_instances.get_schema_version(instance) + +def test_get_schema_major_number(monkeypatch): + """....""" + def mocked_get_schema_version(instance): + """....""" + assert instance == 'dummy_instance' + return (3, 4, 6) + + monkeypatch.setattr(json_instances, 'get_schema_version', + mocked_get_schema_version) + + assert json_instances.get_schema_major_number('dummy_instance') == 3 + +def test_validate_instance(monkeypatch): + """....""" + def mocked_get_schema_major_number(instance): + """....""" + assert instance == 'dummy_instance' + return 4 + + monkeypatch.setattr(json_instances, 'get_schema_major_number', + mocked_get_schema_major_number) + + class mocked_validator_for: + """....<class instead of function>""" + def __init__(self, schema_name): + """....""" + assert schema_name == 'https://ab.cd/something-4.schema.json' + + def validate(self, instance): + """....""" + assert instance == 'dummy_instance' + + monkeypatch.setattr(json_instances, 'validator_for', mocked_validator_for) + + schema_name_fmt = 'https://ab.cd/something-{}.schema.json' + assert json_instances.validate_instance( + 'dummy_instance', + schema_name_fmt + ) == 4 diff --git a/tests/test_local_apt.py b/tests/test_local_apt.py new file mode 100644 index 0000000..9122408 --- /dev/null +++ b/tests/test_local_apt.py @@ -0,0 +1,754 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. + +import pytest +import tempfile +import re +import json +from pathlib import Path, PurePosixPath +from zipfile import ZipFile +from tempfile import TemporaryDirectory + +from hydrilla.builder import local_apt +from hydrilla.builder.common_errors import * + +here = Path(__file__).resolve().parent + +from .helpers import * + +@pytest.fixture +def mock_cache_dir(monkeypatch): + """Make local_apt.py cache files to a temporary directory.""" + with tempfile.TemporaryDirectory() as td: + td_path = Path(td) + monkeypatch.setattr(local_apt, 'default_apt_cache_dir', td_path) + yield td_path + +@pytest.fixture +def mock_gnupg_import(monkeypatch, mock_cache_dir): + """Mock gnupg library when imported dynamically.""" + + gnupg_mock_dir = mock_cache_dir / 'gnupg_mock' + gnupg_mock_dir.mkdir() + (gnupg_mock_dir / 'gnupg.py').write_text('GPG = None\n') + + monkeypatch.syspath_prepend(str(gnupg_mock_dir)) + + import gnupg + + keyring_path = mock_cache_dir / 'master_keyring.gpg' + + class MockedImportResult: + """gnupg.ImportResult replacement""" + def __init__(self): + """Initialize MockedImportResult object.""" + self.imported = 1 + + class MockedGPG: + """GPG replacement that does not really invoke GPG.""" + def __init__(self, keyring): + """Verify the keyring path and initialize MockedGPG.""" + assert keyring == str(keyring_path) + + self.known_keys = {*keyring_path.read_text().split('\n')} \ + if keyring_path.exists() else set() + + def recv_keys(self, keyserver, key): + """Mock key receiving - record requested key as received.""" + assert keyserver == local_apt.default_keyserver + assert key not in self.known_keys + + self.known_keys.add(key) + keyring_path.write_text('\n'.join(self.known_keys)) + + return MockedImportResult() + + def list_keys(self, keys=None): + """Mock key listing - return a list with dummy items.""" + if keys is None: + return ['dummy'] * len(self.known_keys) + else: + return ['dummy' for k in keys if k in self.known_keys] + + def export_keys(self, keys, **kwargs): + """ + Mock key export - check that the call has the expected arguments and + return a dummy bytes array. + """ + assert kwargs['armor'] == False + assert kwargs['minimal'] == True + assert {*keys} == self.known_keys + + return b'<dummy keys export>' + + monkeypatch.setattr(gnupg, 'GPG', MockedGPG) + +def process_run_args(command, kwargs, expected_command): + """ + Perform assertions common to all mocked subprocess.run() invocations and + extract variable parts of the command line (if any). + """ + assert kwargs['env'] == {'LANG': 'en_US'} + assert kwargs['capture_output'] == True + + return process_command(command, expected_command) + +def run_apt_get_update(command, returncode=0, **kwargs): + """ + Instead of running an 'apt-get update' command just touch some file in apt + root to indicate that the call was made. + """ + expected = ['apt-get', '-c', '<conf_path>', 'update'] + conf_path = Path(process_run_args(command, kwargs, expected)['conf_path']) + + (conf_path.parent / 'update_called').touch() + + return MockedCompletedProcess(command, returncode, + text_output=kwargs.get('text')) + +""" +Output of 'apt-get install --yes --just-print libjs-mathjax' on some APT-based +system. +""" +sample_install_stdout = '''\ +NOTE: This is only a simulation! + apt-get needs root privileges for real execution. + Keep also in mind that locking is deactivated, + so don't depend on the relevance to the real current situation! +Reading package lists... +Building dependency tree... +Reading state information... +The following additional packages will be installed: + fonts-mathjax +Suggested packages: + fonts-mathjax-extras fonts-stix libjs-mathjax-doc +The following NEW packages will be installed: + fonts-mathjax libjs-mathjax +0 upgraded, 2 newly installed, 0 to remove and 0 not upgraded. +Inst fonts-mathjax (0:2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all]) +Inst libjs-mathjax (0:2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all]) +Conf fonts-mathjax (0:2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all]) +Conf libjs-mathjax (0:2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all]) +''' + +def run_apt_get_install(command, returncode=0, **kwargs): + """ + Instead of running an 'apt-get install' command just print a possible + output of one. + """ + expected = ['apt-get', '-c', '<conf_path>', 'install', + '--yes', '--just-print', 'libjs-mathjax'] + + conf_path = Path(process_run_args(command, kwargs, expected)['conf_path']) + + return MockedCompletedProcess(command, returncode, + stdout=sample_install_stdout, + text_output=kwargs.get('text')) + +def run_apt_get_download(command, returncode=0, **kwargs): + """ + Instead of running an 'apt-get download' command just write some dummy + .deb to the appropriate directory. + """ + expected = ['apt-get', '-c', '<conf_path>', 'download'] + if 'libjs-mathjax' in command: + expected.append('libjs-mathjax') + else: + expected.append('fonts-mathjax=0:2.7.9+dfsg-1') + expected.append('libjs-mathjax=0:2.7.9+dfsg-1') + + conf_path = Path(process_run_args(command, kwargs, expected)['conf_path']) + + destination = Path(kwargs.get('cwd') or Path.cwd()) + + package_name_regex = re.compile(r'^[^=]+-mathjax') + + for word in expected: + match = package_name_regex.match(word) + if match: + filename = f'{match.group(0)}_0%3a2.7.9+dfsg-1_all.deb' + deb_path = destination / filename + deb_path.write_text(f'dummy {deb_path.name}') + + return MockedCompletedProcess(command, returncode, + text_output=kwargs.get('text')) + +def run_apt_get_source(command, returncode=0, **kwargs): + """ + Instead of running an 'apt-get source' command just write some dummy + "tarballs" to the appropriate directory. + """ + expected = ['apt-get', '-c', '<conf_path>', 'source', + '--download-only', 'libjs-mathjax=0:2.7.9+dfsg-1'] + if 'fonts-mathjax=0:2.7.9+dfsg-1' in command: + if command[-1] == 'fonts-mathjax=0:2.7.9+dfsg-1': + expected.append('fonts-mathjax=0:2.7.9+dfsg-1') + else: + expected.insert(-1, 'fonts-mathjax=0:2.7.9+dfsg-1') + + destination = Path(kwargs.get('cwd') or Path.cwd()) + for filename in [ + 'mathjax_2.7.9+dfsg-1.debian.tar.xz', + 'mathjax_2.7.9+dfsg-1.dsc', + 'mathjax_2.7.9+dfsg.orig.tar.xz' + ]: + (destination / filename).write_text(f'dummy {filename}') + + return MockedCompletedProcess(command, returncode, + text_output=kwargs.get('text')) + +def make_run_apt_get(**returncodes): + """ + Produce a function that chooses and runs the appropriate one of + subprocess_run_apt_get_*() mock functions. + """ + def mock_run(command, **kwargs): + """ + Chooses and runs the appropriate one of subprocess_run_apt_get_*() mock + functions. + """ + for subcommand, run in [ + ('update', run_apt_get_update), + ('install', run_apt_get_install), + ('download', run_apt_get_download), + ('source', run_apt_get_source) + ]: + if subcommand in command: + returncode = returncodes.get(f'{subcommand}_code', 0) + return run(command, returncode, **kwargs) + + raise Exception('Unknown command: {}'.format(' '.join(command))) + + return mock_run + +@pytest.mark.subprocess_run(local_apt, make_run_apt_get()) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_contextmanager(mock_cache_dir): + """ + Verify that the local_apt() function creates a proper apt environment and + that it also properly restores it from cache. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + + with local_apt.local_apt(sources_list, local_apt.default_keys) as apt: + apt_root = Path(apt.apt_conf).parent.parent + + assert (apt_root / 'etc' / 'trusted.gpg').read_bytes() == \ + b'<dummy keys export>' + + assert (apt_root / 'etc' / 'update_called').exists() + + assert (apt_root / 'etc' / 'apt.sources.list').read_text() == \ + 'deb-src sth\ndeb sth' + + conf_lines = (apt_root / 'etc' / 'apt.conf').read_text().split('\n') + + # check mocked keyring + assert {*local_apt.default_keys} == \ + {*(mock_cache_dir / 'master_keyring.gpg').read_text().split('\n')} + + assert not apt_root.exists() + + expected_conf = { + 'Architecture': 'amd64', + 'Dir': str(apt_root), + 'Dir::State': f'{apt_root}/var/lib/apt', + 'Dir::State::status': f'{apt_root}/var/lib/dpkg/status', + 'Dir::Etc::SourceList': f'{apt_root}/etc/apt.sources.list', + 'Dir::Etc::SourceParts': '', + 'Dir::Cache': f'{apt_root}/var/cache/apt', + 'pkgCacheGen::Essential': 'none', + 'Dir::Etc::Trusted': f'{apt_root}/etc/trusted.gpg', + } + + conf_regex = re.compile(r'^(?P<key>\S+)\s"(?P<val>\S*)";$') + assert dict([(m.group('key'), m.group('val')) + for l in conf_lines if l for m in [conf_regex.match(l)]]) == \ + expected_conf + + with ZipFile(mock_cache_dir / f'apt_{sources_list.identity()}.zip') as zf: + # reuse the same APT, its cached zip file should exist now + with local_apt.local_apt(sources_list, local_apt.default_keys) as apt: + apt_root = Path(apt.apt_conf).parent.parent + + expected_members = {*apt_root.rglob('*')} + expected_members.remove(apt_root / 'etc' / 'apt.conf') + expected_members.remove(apt_root / 'etc' / 'trusted.gpg') + + names = zf.namelist() + assert len(names) == len(expected_members) + + for name in names: + path = apt_root / name + assert path in expected_members + assert zf.read(name) == \ + (b'' if path.is_dir() else path.read_bytes()) + + assert not apt_root.exists() + +@pytest.mark.subprocess_run(local_apt, run_missing_executable) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_missing(mock_cache_dir): + """ + Verify that the local_apt() function raises a proper error when 'apt-get' + command is missing. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + + with pytest.raises(local_apt.AptError, + match='^couldnt_execute_apt-get_is_it_installed$'): + with local_apt.local_apt(sources_list, local_apt.default_keys) as apt: + pass + +@pytest.mark.subprocess_run(local_apt, make_run_apt_get(update_code=1)) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_update_fail(mock_cache_dir): + """ + Verify that the local_apt() function raises a proper error when + 'apt-get update' command returns non-0. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + + error_regex = """^\ +command_apt-get -c \\S+ update_failed + +STDOUT_OUTPUT_heading + +some output + +STDERR_OUTPUT_heading + +some error output\ +$\ +""" + + with pytest.raises(local_apt.AptError, match=error_regex): + with local_apt.local_apt(sources_list, local_apt.default_keys) as apt: + pass + +@pytest.mark.subprocess_run(local_apt, make_run_apt_get()) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_download(mock_cache_dir): + """ + Verify that download_apt_packages() function properly performs the download + of .debs and sources. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + destination = mock_cache_dir / 'destination' + destination.mkdir() + + local_apt.download_apt_packages(sources_list, local_apt.default_keys, + ['libjs-mathjax'], destination, False) + + libjs_mathjax_path = destination / 'libjs-mathjax_0%3a2.7.9+dfsg-1_all.deb' + fonts_mathjax_path = destination / 'fonts-mathjax_0%3a2.7.9+dfsg-1_all.deb' + + source_paths = [ + destination / 'mathjax_2.7.9+dfsg-1.debian.tar.xz', + destination / 'mathjax_2.7.9+dfsg-1.dsc', + destination / 'mathjax_2.7.9+dfsg.orig.tar.xz' + ] + + assert {*destination.iterdir()} == {libjs_mathjax_path, *source_paths} + + local_apt.download_apt_packages(sources_list, local_apt.default_keys, + ['libjs-mathjax'], destination, + with_deps=True) + + assert {*destination.iterdir()} == \ + {libjs_mathjax_path, fonts_mathjax_path, *source_paths} + +@pytest.mark.subprocess_run(local_apt, make_run_apt_get(install_code=1)) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_install_fail(mock_cache_dir): + """ + Verify that the download_apt_packages() function raises a proper error when + 'apt-get install' command returns non-0. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + destination = mock_cache_dir / 'destination' + destination.mkdir() + + error_regex = f"""^\ +command_apt-get -c \\S+ install --yes --just-print libjs-mathjax_failed + +STDOUT_OUTPUT_heading + +{re.escape(sample_install_stdout)} + +STDERR_OUTPUT_heading + +some error output\ +$\ +""" + + with pytest.raises(local_apt.AptError, match=error_regex): + local_apt.download_apt_packages(sources_list, local_apt.default_keys, + ['libjs-mathjax'], destination, + with_deps=True) + + assert [*destination.iterdir()] == [] + +@pytest.mark.subprocess_run(local_apt, make_run_apt_get(download_code=1)) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_download_fail(mock_cache_dir): + """ + Verify that the download_apt_packages() function raises a proper error when + 'apt-get download' command returns non-0. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + destination = mock_cache_dir / 'destination' + destination.mkdir() + + error_regex = """^\ +command_apt-get -c \\S+ download libjs-mathjax_failed + +STDOUT_OUTPUT_heading + +some output + +STDERR_OUTPUT_heading + +some error output\ +$\ +""" + + with pytest.raises(local_apt.AptError, match=error_regex): + local_apt.download_apt_packages(sources_list, local_apt.default_keys, + ['libjs-mathjax'], destination, False) + + assert [*destination.iterdir()] == [] + +@pytest.fixture +def mock_bad_deb_file(monkeypatch, mock_subprocess_run): + """ + Make mocked 'apt-get download' command produce an incorrectly-named file. + """ + old_run = local_apt.subprocess.run + + def twice_mocked_run(command, **kwargs): + """ + Create an evil file if needed; then act just like the run() function + that got replaced by this one. + """ + if 'download' in command: + destination = Path(kwargs.get('cwd') or Path.cwd()) + (destination / 'arbitrary-name').write_text('anything') + + return old_run(command, **kwargs) + + monkeypatch.setattr(local_apt.subprocess, 'run', twice_mocked_run) + +@pytest.mark.subprocess_run(local_apt, make_run_apt_get()) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import', + 'mock_bad_deb_file') +def test_local_apt_download_bad_filename(mock_cache_dir): + """ + Verify that the download_apt_packages() function raises a proper error when + 'apt-get download' command produces an incorrectly-named file. + """ + sources_list = local_apt.SourcesList([], 'nabia') + destination = mock_cache_dir / 'destination' + destination.mkdir() + + error_regex = """^\ +apt_download_gave_bad_filename_arbitrary-name + +STDOUT_OUTPUT_heading + +some output + +STDERR_OUTPUT_heading + +some error output\ +$\ +""" + + with pytest.raises(local_apt.AptError, match=error_regex): + local_apt.download_apt_packages(sources_list, local_apt.default_keys, + ['libjs-mathjax'], destination, False) + + assert [*destination.iterdir()] == [] + +@pytest.mark.subprocess_run(local_apt, make_run_apt_get(source_code=1)) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_source_fail(mock_cache_dir): + """ + Verify that the download_apt_packages() function raises a proper error when + 'apt-get source' command returns non-0. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + destination = mock_cache_dir / 'destination' + destination.mkdir() + + error_regex = """^\ +command_apt-get -c \\S* source --download-only \\S+_failed + +STDOUT_OUTPUT_heading + +some output + +STDERR_OUTPUT_heading + +some error output\ +$\ +""" + + with pytest.raises(local_apt.AptError, match=error_regex): + local_apt.download_apt_packages(sources_list, local_apt.default_keys, + ['libjs-mathjax'], destination, False) + + assert [*destination.iterdir()] == [] + +def test_sources_list(): + """Verify that the SourcesList class works properly.""" + list = local_apt.SourcesList([], 'nabia') + assert list.identity() == 'nabia' + + with pytest.raises(local_apt.DistroError, match='^distro_nabiał_unknown$'): + local_apt.SourcesList([], 'nabiał') + + list = local_apt.SourcesList(['deb sth', 'deb-src sth'], 'nabia') + assert list.identity() == \ + 'ef28d408b96046eae45c8ab3094ce69b2ac0c02a887e796b1d3d1a4f06fb49f1' + +def run_dpkg_deb(command, returncode=0, **kwargs): + """ + Insted of running an 'dpkg-deb -x' command just create some dummy file + in the destination directory. + """ + expected = ['dpkg-deb', '-x', '<deb_path>', '<dst_path>'] + + variables = process_run_args(command, kwargs, expected) + deb_path = Path(variables['deb_path']) + dst_path = Path(variables['dst_path']) + + package_name = re.match('^([^_]+)_.*', deb_path.name).group(1) + for path in [ + dst_path / 'etc' / f'dummy_{package_name}_config', + dst_path / 'usr/share/doc' / package_name / 'copyright' + ]: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(f'dummy {path.name}') + + return MockedCompletedProcess(command, returncode, + text_output=kwargs.get('text')) + +def download_apt_packages(list, keys, packages, destination_dir, + with_deps=False): + """ + Replacement for download_apt_packages() function in local_apt.py, for + unit-testing the piggybacked_system() function. + """ + for path in [ + destination_dir / 'some-bin-package_1.1-2_all.deb', + destination_dir / 'another-package_1.1-2_all.deb', + destination_dir / 'some-source-package_1.1.orig.tar.gz', + destination_dir / 'some-source-package_1.1-1.dsc' + ]: + path.write_text(f'dummy {path.name}') + + with open(destination_dir / 'test_data.json', 'w') as out: + json.dump({ + 'list_identity': list.identity(), + 'keys': keys, + 'packages': packages, + 'with_deps': with_deps + }, out) + +@pytest.fixture +def mock_download_packages(monkeypatch): + """Mock the download_apt_packages() function in local_apt.py.""" + monkeypatch.setattr(local_apt, 'download_apt_packages', + download_apt_packages) + +@pytest.mark.subprocess_run(local_apt, run_dpkg_deb) +@pytest.mark.parametrize('params', [ + { + 'with_deps': False, + 'base_depends': True, + 'identity': 'nabia', + 'props': {'distribution': 'nabia', 'dependencies': False}, + 'all_keys': local_apt.default_keys, + 'prepared_directory': False + }, + { + 'with_deps': True, + 'base_depends': False, + 'identity': '38db0b4fa2f6610cd1398b66a2c05d9abb1285f9a055a96eb96dee0f6b72aca8', + 'props': { + 'sources_list': [f'deb{suf} http://example.com/ stable main' + for suf in ('', '-src')], + 'trusted_keys': ['AB' * 20], + 'dependencies': True, + 'depend_on_base_packages': False + }, + 'all_keys': [*local_apt.default_keys, 'AB' * 20], + 'prepared_directory': True + } +]) +@pytest.mark.usefixtures('mock_download_packages', 'mock_subprocess_run') +def test_piggybacked_system_download(params, tmpdir): + """ + Verify that the piggybacked_system() function properly downloads and unpacks + APT packages. + """ + foreign_packages_dir = tmpdir if params['prepared_directory'] else None + + with local_apt.piggybacked_system({ + 'system': 'apt', + **params['props'], + 'packages': ['some-bin-package', 'another-package=1.1-2'] + }, foreign_packages_dir) as piggybacked: + expected_depends = [{'identifier': 'apt-common-licenses'}] \ + if params['base_depends'] else [] + assert piggybacked.resource_must_depend == expected_depends + + archive_files = dict(piggybacked.archive_files()) + + archive_names = [ + 'some-bin-package_1.1-2_all.deb', + 'another-package_1.1-2_all.deb', + 'some-source-package_1.1.orig.tar.gz', + 'some-source-package_1.1-1.dsc', + 'test_data.json' + ] + assert {*archive_files.keys()} == \ + {PurePosixPath('apt') / n for n in archive_names} + + for path in archive_files.values(): + if path.name == 'test_data.json': + assert json.loads(path.read_text()) == { + 'list_identity': params['identity'], + 'keys': params['all_keys'], + 'packages': ['some-bin-package', 'another-package=1.1-2'], + 'with_deps': params['with_deps'] + } + else: + assert path.read_text() == f'dummy {path.name}' + + if foreign_packages_dir is not None: + assert path.parent == foreign_packages_dir / 'apt' + + license_files = {*piggybacked.package_license_files} + + assert license_files == { + PurePosixPath('.apt-root/usr/share/doc/another-package/copyright'), + PurePosixPath('.apt-root/usr/share/doc/some-bin-package/copyright') + } + + assert ['dummy copyright'] * 2 == \ + [piggybacked.resolve_file(p).read_text() for p in license_files] + + for name in ['some-bin-package', 'another-package']: + path = PurePosixPath(f'.apt-root/etc/dummy_{name}_config') + assert piggybacked.resolve_file(path).read_text() == \ + f'dummy {path.name}' + + assert piggybacked.resolve_file(PurePosixPath('a/b/c')) == None + assert piggybacked.resolve_file(PurePosixPath('')) == None + + output_text = 'loading_.apt-root/a/../../../b_outside_piggybacked_dir' + with pytest.raises(FileReferenceError, + match=f'^{re.escape(output_text)}$'): + piggybacked.resolve_file(PurePosixPath('.apt-root/a/../../../b')) + + root = piggybacked.resolve_file(PurePosixPath('.apt-root/dummy')).parent + assert root.is_dir() + + assert not root.exists() + + if foreign_packages_dir: + assert [*tmpdir.iterdir()] == [tmpdir / 'apt'] + +@pytest.mark.subprocess_run(local_apt, run_dpkg_deb) +@pytest.mark.usefixtures('mock_subprocess_run') +def test_piggybacked_system_no_download(): + """ + Verify that the piggybacked_system() function is able to use pre-downloaded + APT packages. + """ + archive_names = { + f'{package}{rest}' + for package in ('some-lib_1:2.3', 'other-lib_4.45.2') + for rest in ('-1_all.deb', '.orig.tar.gz', '-1.debian.tar.xz', '-1.dsc') + } + + with TemporaryDirectory() as td: + td = Path(td) + (td / 'apt').mkdir() + for name in archive_names: + (td / 'apt' / name).write_text(f'dummy {name}') + + with local_apt.piggybacked_system({ + 'system': 'apt', + 'distribution': 'nabia', + 'dependencies': True, + 'packages': ['whatever', 'whatever2'] + }, td) as piggybacked: + archive_files = dict(piggybacked.archive_files()) + + assert {*archive_files.keys()} == \ + {PurePosixPath('apt') / name for name in archive_names} + + for path in archive_files.values(): + assert path.read_text() == f'dummy {path.name}' + + assert {*piggybacked.package_license_files} == { + PurePosixPath('.apt-root/usr/share/doc/some-lib/copyright'), + PurePosixPath('.apt-root/usr/share/doc/other-lib/copyright') + } + + for name in ['some-lib', 'other-lib']: + path = PurePosixPath(f'.apt-root/etc/dummy_{name}_config') + assert piggybacked.resolve_file(path).read_text() == \ + f'dummy {path.name}' + +@pytest.mark.subprocess_run(local_apt, run_missing_executable) +@pytest.mark.usefixtures('mock_download_packages', 'mock_subprocess_run') +def test_piggybacked_system_missing(): + """ + Verify that the piggybacked_system() function raises a proper error when + 'dpkg-deb' is missing. + """ + with pytest.raises(local_apt.AptError, + match='^couldnt_execute_dpkg-deb_is_it_installed$'): + with local_apt.piggybacked_system({ + 'system': 'apt', + 'distribution': 'nabia', + 'packages': ['some-package'], + 'dependencies': False + }, None) as piggybacked: + pass + +@pytest.mark.subprocess_run(local_apt, lambda c, **kw: run_dpkg_deb(c, 1, **kw)) +@pytest.mark.usefixtures('mock_download_packages', 'mock_subprocess_run') +def test_piggybacked_system_fail(): + """ + Verify that the piggybacked_system() function raises a proper error when + 'dpkg-deb -x' command returns non-0. + """ + error_regex = """^\ +command_dpkg-deb -x \\S+\\.deb \\S+_failed + +STDOUT_OUTPUT_heading + +some output + +STDERR_OUTPUT_heading + +some error output\ +$\ +""" + + with pytest.raises(local_apt.AptError, match=error_regex): + with local_apt.piggybacked_system({ + 'system': 'apt', + 'distribution': 'nabia', + 'packages': ['some-package'], + 'dependencies': False + }, None) as piggybacked: + pass diff --git a/tests/test_pattern_tree.py b/tests/test_pattern_tree.py new file mode 100644 index 0000000..4238d66 --- /dev/null +++ b/tests/test_pattern_tree.py @@ -0,0 +1,454 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. + +import pytest +import re +import dataclasses as dc + +from immutables import Map + +from hydrilla import pattern_tree + +from .url_patterns_common import * + +@pytest.mark.parametrize('_in, out', [ + (Map(), True), + ({'children': Map(non_empty='non_emtpy')}, False), + ({'literal_match': 'non-None'}, False), + ({'children': Map(non_empty='non_emtpy')}, False), + ({'literal_match': 'non-None', 'children': 'non-empty'}, False) +]) +def test_pattern_tree_node_is_empty(_in, out): + """....""" + assert pattern_tree.PatternTreeNode(**_in).is_empty() == out + +def test_pattern_tree_node_update_literal_match(): + """....""" + node1 = pattern_tree.PatternTreeNode() + node2 = node1.update_literal_match('dummy match item') + + assert node1.literal_match is None + assert node2.literal_match == 'dummy match item' + +def test_pattern_tree_node_get_child(): + """....""" + node = pattern_tree.PatternTreeNode(children=Map(dummy_key='dummy_val')) + + assert node.get_child('dummy_key') == 'dummy_val' + assert node.get_child('other_key') is None + +def test_pattern_tree_node_remove_child(): + """....""" + node1 = pattern_tree.PatternTreeNode(children=Map(dummy_key='dummy_val')) + node2 = node1.remove_child('dummy_key') + + assert node1.children == Map(dummy_key='dummy_val') + assert node2.children == Map() + +def test_pattern_tree_node_set_child(): + """....""" + node1 = pattern_tree.PatternTreeNode(children=Map(dummy_key='dummy_val')) + node2 = node1.set_child('other_key', 'other_val') + + assert node1.children == Map(dummy_key='dummy_val') + assert node2.children == Map(dummy_key='dummy_val', other_key='other_val') + +@pytest.mark.parametrize('root_empty', [True, False]) +def test_pattern_tree_branch_is_empty(root_empty): + """....""" + class DummyEmptyRoot: + """....""" + is_empty = lambda: root_empty + + branch = pattern_tree.PatternTreeBranch(root_node=DummyEmptyRoot) + assert branch.is_empty() == root_empty + +# def test_pattern_tree_branch_copy(): +# """....""" +# class DummyRoot: +# """....""" +# pass + +# branch1 = pattern_tree.PatternTreeBranch(root_node=DummyRoot) +# branch2 = branch1.copy() + +# assert branch1 is not branch2 +# for val_b1, val_b2 in zip(dc.astuple(branch1), dc.astuple(branch2)): +# assert val_b1 is val_b2 + +@pytest.fixture +def empty_branch(): + """....""" + return pattern_tree.PatternTreeBranch( + root_node = pattern_tree.PatternTreeNode() + ) + +@pytest.fixture +def branch_with_a_b(): + """....""" + return pattern_tree.PatternTreeBranch( + root_node = pattern_tree.PatternTreeNode( + children = Map( + a = pattern_tree.PatternTreeNode( + children = Map( + b = pattern_tree.PatternTreeNode( + literal_match = frozenset({'myitem'}) + ) + ) + ) + ) + ) + ) + +def test_pattern_tree_branch_update_add_first(empty_branch, branch_with_a_b): + """....""" + updated_branch = empty_branch.update( + ['a', 'b'], + lambda s: frozenset({*(s or []), 'myitem'}) + ) + + assert updated_branch == branch_with_a_b + assert empty_branch.root_node.children == Map() + +def test_pattern_tree_branch_update_add_second(branch_with_a_b): + """....""" + updated_branch = branch_with_a_b.update( + ['a', 'b'], + lambda s: frozenset({*(s or []), 'myotheritem'}) + ) + + leaf_node = updated_branch.root_node.children['a'].children['b'] + assert leaf_node.literal_match == frozenset({'myitem', 'myotheritem'}) + +def test_pattern_tree_branch_update_add_different_path(branch_with_a_b): + """....""" + updated_branch = branch_with_a_b.update( + ['a', 'not_b'], + lambda s: frozenset({*(s or []), 'myotheritem'}) + ) + + for segment, item in [('b', 'myitem'), ('not_b', 'myotheritem')]: + leaf_node = updated_branch.root_node.children['a'].children[segment] + assert leaf_node.literal_match == frozenset({item}) + +# def test_pattern_tree_branch_update_is_value_copied(branch_with_a_b): +# """....""" +# updated_branch = branch_with_a_b.update(['a', 'b'], lambda s: s) + +# leaf_node_orig = updated_branch.root_node.children['a'].children['b'] +# leaf_node_new = branch_with_a_b.root_node.children['a'].children['b'] + +# assert leaf_node_orig.literal_match == leaf_node_new.literal_match +# assert leaf_node_orig.literal_match is not leaf_node_new.literal_match + +def test_pattern_tree_branch_remove(branch_with_a_b, empty_branch): + """....""" + updated_branch = branch_with_a_b.update(['a', 'b'], lambda s: None) + + assert updated_branch == empty_branch + +def test_pattern_tree_branch_search_empty(empty_branch): + """....""" + assert [*empty_branch.search(['a', 'b'])] == [] + +@pytest.fixture +def branch_with_wildcards(): + """....""" + return pattern_tree.PatternTreeBranch( + root_node = pattern_tree.PatternTreeNode( + children = Map( + a = pattern_tree.PatternTreeNode( + children = Map( + b = pattern_tree.PatternTreeNode( + children = Map({ + 'c': pattern_tree.PatternTreeNode( + literal_match = 'dummy/c' + ), + '*': pattern_tree.PatternTreeNode( + literal_match = 'dummy/*' + ), + '**': pattern_tree.PatternTreeNode( + literal_match = 'dummy/**' + ), + '***': pattern_tree.PatternTreeNode( + literal_match = 'dummy/***' + ) + }) + ) + ) + ) + ) + ) + ) + +@pytest.mark.parametrize('_in, out', [ + (['a'], []), + (['a', 'x', 'y', 'z'], []), + (['a', 'b'], ['dummy/***']), + (['a', 'b', 'c'], ['dummy/c', 'dummy/*', 'dummy/***']), + (['a', 'b', 'u'], ['dummy/*', 'dummy/***']), + (['a', 'b', '*'], ['dummy/*', 'dummy/***']), + (['a', 'b', '**'], ['dummy/**', 'dummy/*', 'dummy/***']), + (['a', 'b', '***'], ['dummy/***', 'dummy/*']), + (['a', 'b', 'u', 'l'], ['dummy/**', 'dummy/***']), + (['a', 'b', 'u', 'l', 'y'], ['dummy/**', 'dummy/***']) +]) +def test_pattern_tree_branch_search_wildcards(_in, out, branch_with_wildcards): + """....""" + assert [*branch_with_wildcards.search(_in)] == out + +def test_filter_by_trailing_slash(sample_url_parsed): + """....""" + sample_url_parsed2 = dc.replace(sample_url_parsed, has_trailing_slash=True) + item1 = pattern_tree.StoredTreeItem(sample_url_parsed, 'dummy_it1') + item2 = pattern_tree.StoredTreeItem(sample_url_parsed2, 'dummy_it2') + + assert pattern_tree.filter_by_trailing_slash((item1, item2), False) == \ + frozenset({item1}) + + assert pattern_tree.filter_by_trailing_slash((item1, item2), True) == \ + frozenset({item2}) + +@pytest.mark.parametrize('register_mode', [True, False]) +@pytest.mark.parametrize('empty_at_start', [True, False]) +@pytest.mark.parametrize('empty_at_end', [True, False]) +def test_pattern_tree_privatemethod_register( + register_mode, + empty_at_start, + empty_at_end, + monkeypatch, + sample_url_parsed +): + """....""" + dummy_it = pattern_tree.StoredTreeItem(sample_url_parsed, 'dummy_it') + other_dummy_it = pattern_tree.StoredTreeItem( + sample_url_parsed, + 'other_dummy_it' + ) + + class MockedTreeBranch: + """....""" + def is_empty(self): + """....""" + return empty_at_end + + def update(self, segments, item_updater): + """....""" + if segments == ('com', 'example'): + return self._update_as_domain_branch(item_updater) + else: + assert segments == ('aa', 'bb') + return self._update_as_path_branch(item_updater) + + def _update_as_domain_branch(self, item_updater): + """....""" + for updater_input in (None, MockedTreeBranch()): + updated = item_updater(updater_input) + if empty_at_end: + assert updated is None + else: + assert type(updated) is MockedTreeBranch + + return MockedTreeBranch() + + def _update_as_path_branch(self, item_updater): + """....""" + set_with_1_item = frozenset() + set_with_2_items = frozenset({dummy_it, other_dummy_it}) + for updater_input in (None, set_with_1_item, set_with_2_items): + updated = item_updater(updater_input) + if register_mode: + assert dummy_it in updated + elif updater_input is set_with_2_items: + assert dummy_it not in updated + else: + assert updated is None + + return MockedTreeBranch() + + monkeypatch.setattr(pattern_tree, 'PatternTreeBranch', MockedTreeBranch) + + initial_root = Map() if empty_at_start else \ + Map({('http', 80): MockedTreeBranch()}) + + tree = pattern_tree.PatternTree(_by_scheme_and_port=initial_root) + + new_tree = tree._register( + sample_url_parsed, + 'dummy_it', + register=register_mode + ) + + assert new_tree is not tree + + if empty_at_end: + assert new_tree._by_scheme_and_port == Map() + else: + assert len(new_tree._by_scheme_and_port) == 1 + assert type(new_tree._by_scheme_and_port[('http', 80)]) is \ + MockedTreeBranch + +# @pytest.mark.parametrize('register_mode', [True, False]) +# def test_pattern_tree_privatemethod_register( +# register_mode, +# monkeypatch, +# sample_url_parsed +# ): +# """....""" +# registered_count = 0 + +# def mocked_parse_pattern(url_pattern): +# """....""" +# assert url_pattern == 'dummy_pattern' + +# for _ in range(2): +# yield sample_url_parsed + +# monkeypatch.setattr(pattern_tree, 'parse_pattern', mocked_parse_pattern) + +# def mocked_reconstruct_url(self): +# """....""" +# return 'dummy_reconstructed_pattern' + +# monkeypatch.setattr(pattern_tree.ParsedUrl, 'reconstruct_url', +# mocked_reconstruct_url) + +# def mocked_register_with_parsed_pattern( +# self, +# parsed_pat, +# wrapped_item, +# register=True +# ): +# """....""" +# nonlocal registered_count + +# assert parsed_pat is sample_url_parsed +# assert wrapped_item.pattern == 'dummy_reconstructed_pattern' +# assert register == register_mode + +# registered_count += 1 + +# return 'dummy_new_tree' if registered_count == 2 else dc.replace(self) + +# monkeypatch.setattr( +# pattern_tree.PatternTree, +# '_register_with_parsed_pattern', +# mocked_register_with_parsed_pattern +# ) + +# pattern_tree = pattern_tree.PatternTree() + +# new_tree = pattern_tree._register( +# 'dummy_pattern', +# 'dummy_item', +# register_mode +# ) + +# assert new_tree == 'dummy_new_tree' + +@pytest.mark.parametrize('method_name, register_mode', [ + ('register', True), + ('deregister', False) +]) +def test_pattern_tree_register(method_name, register_mode, monkeypatch): + """....""" + def mocked_privatemethod_register(self, parsed_pat, item, register=True): + """....""" + assert (parsed_pat, item, register) == \ + ('dummy_pattern', 'dummy_url', register_mode) + + return 'dummy_new_tree' + + monkeypatch.setattr( + pattern_tree.PatternTree, + '_register', + mocked_privatemethod_register + ) + + method = getattr(pattern_tree.PatternTree(), method_name) + assert method('dummy_pattern', 'dummy_url') == 'dummy_new_tree' + +@pytest.fixture +def mock_parse_url(monkeypatch, sample_url_parsed): + """....""" + def mocked_parse_url(url): + """....""" + assert url == 'dummy_url' + return dc.replace( + sample_url_parsed, + **getattr(mocked_parse_url, 'url_mod', {}) + ) + + monkeypatch.setattr(pattern_tree, 'parse_url', mocked_parse_url) + + return mocked_parse_url + +@pytest.mark.usefixtures('mock_parse_url') +def test_pattern_tree_search_empty(sample_url_parsed): + """....""" + for url in ('dummy_url', sample_url_parsed): + assert [*pattern_tree.PatternTree().search(url)] == [] + +@pytest.mark.parametrize('url_mod, out', [ + ({}, + ['dummy_set_A', 'dummy_set_B', 'dummy_set_C']), + + ({'has_trailing_slash': True}, + ['dummy_set_A_with_slash', 'dummy_set_A', + 'dummy_set_B_with_slash', 'dummy_set_B', + 'dummy_set_C_with_slash', 'dummy_set_C']) +]) +def test_pattern_tree_search( + url_mod, + out, + monkeypatch, + sample_url_parsed, + mock_parse_url, +): + """....""" + mock_parse_url.url_mod = url_mod + + dummy_tree_contents = [ + ['dummy_set_A', 'dummy_set_B'], + [], + ['dummy_empty_set'] * 3, + ['dummy_set_C'] + ] + + def mocked_filter_by_trailing_slash(items, with_slash): + """....""" + if items == 'dummy_empty_set': + return frozenset() + + return items + ('_with_slash' if with_slash else '') + + monkeypatch.setattr(pattern_tree, 'filter_by_trailing_slash', + mocked_filter_by_trailing_slash) + + class MockedDomainBranch: + """....""" + def search(self, labels): + """....""" + assert labels == sample_url_parsed.domain_labels + + for item_sets in dummy_tree_contents: + class MockedPathBranch: + """....""" + def search(self, segments, item_sets=item_sets): + """....""" + assert segments == sample_url_parsed.path_segments + + for dummy_items_set in item_sets: + yield dummy_items_set + + yield MockedPathBranch() + + tree = pattern_tree.PatternTree( + _by_scheme_and_port = {('http', 80): MockedDomainBranch()} + ) + + for url in ('dummy_url', mock_parse_url('dummy_url')): + assert [*tree.search(url)] == out diff --git a/tests/test_server.py b/tests/test_server.py index 02b9742..854b5f0 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -24,9 +24,6 @@ # file's license. Although I request that you do not make use this code # in a proprietary program, I am not going to enforce this in court. -# Enable using with Python 3.7. -from __future__ import annotations - import pytest import sys import shutil @@ -42,9 +39,9 @@ from flask.testing import FlaskClient from markupsafe import escape from werkzeug import Response -from hydrilla import util as hydrilla_util +from hydrilla import _version, json_instances from hydrilla.builder import Build -from hydrilla.server import config, _version +from hydrilla.server import config from hydrilla.server.serve import HydrillaApp here = Path(__file__).resolve().parent @@ -125,7 +122,7 @@ def index_json_modification(modify_index_json): def handle_index_json(setup): """Modify index.json before build.""" index_path = setup.source_dir / 'index.json' - index_json, _ = hydrilla_util.load_instance_from_file(index_path) + index_json, _ = json_instances.read_instance(index_path) index_json = modify_index_json(index_json) or index_json @@ -193,8 +190,8 @@ def test_get_newest(setup: Setup, item_type: str) -> None: assert ('uuid' in definition) == (setup is not uuidless_setup) - hydrilla_util.validator_for(f'api_{item_type}_description-1.0.1.schema.json')\ - .validate(definition) + schema_name = f'api_{item_type}_description-1.0.1.schema.json' + json_instances.validator_for(schema_name).validate(definition) @pytest.mark.parametrize('item_type', ['resource', 'mapping']) def test_get_nonexistent(item_type: str) -> None: @@ -241,8 +238,8 @@ def test_empty_query() -> None: 'generated_by': expected_generated_by } - hydrilla_util.validator_for('api_query_result-1.0.1.schema.json')\ - .validate(response_object) + schema_name = 'api_query_result-1.0.1.schema.json' + json_instances.validator_for(schema_name).validate(response_object) def test_query() -> None: """ @@ -264,8 +261,8 @@ def test_query() -> None: 'generated_by': expected_generated_by } - hydrilla_util.validator_for('api_query_result-1.schema.json')\ - .validate(response_object) + schema_name = 'api_query_result-1.schema.json' + json_instances.validator_for(schema_name).validate(response_object) def test_source() -> None: """Verify source descriptions are properly served.""" @@ -282,8 +279,8 @@ def test_source() -> None: response = def_get(f'/source/hello.zip') assert sha256(response.data).digest().hex() == zipfile_hash - hydrilla_util.validator_for('api_source_description-1.schema.json')\ - .validate(description) + schema_name = 'api_source_description-1.schema.json' + json_instances.validator_for(schema_name).validate(description) def test_missing_source() -> None: """Verify requests for nonexistent sources result in 404.""" @@ -292,8 +289,3 @@ def test_missing_source() -> None: response = def_get(f'/source/nonexistent.zip') assert response.status_code == 404 - -def test_normalize_version(): - assert hydrilla_util.normalize_version([4, 5, 3, 0, 0]) == [4, 5, 3] - assert hydrilla_util.normalize_version([1, 0, 5, 0]) == [1, 0, 5] - assert hydrilla_util.normalize_version([3, 3]) == [3, 3] diff --git a/tests/test_url_patterns.py b/tests/test_url_patterns.py new file mode 100644 index 0000000..c308f18 --- /dev/null +++ b/tests/test_url_patterns.py @@ -0,0 +1,188 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. + +import pytest +import re +import dataclasses as dc + +from immutables import Map + +from hydrilla import url_patterns +from hydrilla.exceptions import HaketiloException + +from .url_patterns_common import * + +# @pytest.mark.parametrize('_in, out', [ +# ({}, sample_url_str), +# ({'path_segments': ()}, 'http://example.com'), +# ({'has_trailing_slash': True}, 'http://example.com/aa/bb/'), +# ({'orig_scheme': 'http*'}, 'http*://example.com/aa/bb'), +# ({'scheme': 'http_sth'}, 'http://example.com/aa/bb'), +# ({'port': 443}, 'http://example.com:443/aa/bb'), + +# ({'path_segments': (), +# 'has_trailing_slash': True}, +# 'http://example.com/'), + +# ({'orig_scheme': 'https', +# 'scheme': 'https', +# 'port': 443}, +# 'https://example.com/aa/bb'), + +# ({'orig_scheme': 'ftp', +# 'scheme': 'ftp', +# 'port': 21}, +# 'ftp://example.com/aa/bb'), + +# ({'orig_scheme': 'file', +# 'scheme': 'file', +# 'port': None, +# 'domain_labels': ()}, +# 'file:///aa/bb') +# ]) +# def test_reconstruct_parsed_url(_in, out, sample_url_parsed): +# """Test the reconstruct_url() method of ParsedUrl class.""" +# parsed_url = dc.replace(sample_url_parsed, **_in) +# assert parsed_url.reconstruct_url() == out + +@pytest.mark.parametrize('_in, out', [ + ({'url': sample_url_str}, {}), + ({'url': 'http://example.com:80/aa/bb'}, {}), + ({'url': 'http://example.com//aa///bb'}, {}), + ({'url': 'http://example...com/aa/bb'}, {}), + ({'url': 'http://example.com/aa/bb?c=d#ef'}, {}), + ({'url': 'http://example.com'}, {'path_segments': ()}), + ({'url': 'http://example.com/aa/bb/'}, {'has_trailing_slash': True}), + ({'url': 'http://example.com:443/aa/bb'}, {'port': 443}), + + ({'url': 'http://example.com/'}, + {'path_segments': (), + 'has_trailing_slash': True}), + + ({'url': 'http://example.com/aa/bb', + 'is_pattern': True, + 'orig_url': 'http*://example.com/aa/bb/'}, + {}), + + ({'url': 'https://example.com/aa/bb'}, + {'scheme': 'https', + 'port': 443}), + + ({'url': 'ftp://example.com/aa/bb'}, + {'scheme': 'ftp', + 'port': 21}), + + ({'url': 'file:///aa/bb'}, + {'scheme': 'file', + 'port': None, + 'domain_labels': ()}) +]) +def test_parse_pattern_or_url(_in, out, sample_url_parsed): + """Test normal use (no errors) of the _parse_pattern_or_url() function.""" + if 'orig_url' not in _in: + _in = {**_in, 'orig_url': _in['url']} + + out = {**out, 'orig_url': _in['orig_url']} + + parsed_url = url_patterns._parse_pattern_or_url(**_in) + assert parsed_url == dc.replace(sample_url_parsed, **out) + +@pytest.mark.parametrize('_in, err', [ + ({'url': 'file://:78/unexpected/port'}, 'err.url_{}.bad'), + ({'url': 'file://unexpected.hostname/'}, 'err.url_{}.bad'), + ({'url': 'http:///no/hostname'}, 'err.url_{}.bad'), + ({'url': 'invalid?://example.com'}, 'err.url_{}.bad'), + ({'url': 'invalid?://example.com', + 'orig_url': 'invalid?://example.com', + 'is_pattern': True}, + 'err.url_pattern_{}.bad'), + + ({'url': 'unknown://example.com'}, 'err.url_{}.bad_scheme'), + ({'url': 'unknown://example.com', + 'orig_url': 'unknown://example.com', + 'is_pattern': True}, + 'err.url_pattern_{}.bad_scheme'), + + ({'url': 'http://example.com:80', + 'orig_url': 'http*://example.com:80', + 'is_pattern': True}, + 'err.url_pattern_{}.special_scheme_port'), + + ({'url': 'http://example.com:65536'}, 'err.url_{}.bad_port'), + ({'url': 'http://example.com:0'}, 'err.url_{}.bad_port'), + ({'url': 'http://example.com:65537', + 'orig_url': 'http://example.com:65537', + 'is_pattern': True}, + 'err.url_pattern_{}.bad_port'), + + ({'url': 'http://example.com/?a=b', + 'orig_url': 'http://example.com/?a=b', + 'is_pattern': True}, + 'err.url_pattern_{}.has_query'), + + ({'url': 'http://example.com/#abc', + 'orig_url': 'http://example.com/#abc', + 'is_pattern': True}, + 'err.url_pattern_{}.has_frag') +]) +def test_parse_pattern_or_url_err(_in, err, sample_url_parsed): + """Test error conditions of the _parse_pattern_or_url() function.""" + if 'orig_url' not in _in: + _in = {**_in, 'orig_url': _in['url']} + + err_url = _in['orig_url'] + err_regex = err.format(re.escape(err_url)) + + with pytest.raises(HaketiloException, match=f'^{err_regex}$'): + url_patterns._parse_pattern_or_url(**_in) + +def test_parse_pattern_or_url_different_urls(): + """ + Verify the _parse_pattern_or_url() function allows passed URLs to be + different only when parsing a pattern. + """ + urls = [sample_url_str, sample_url_str.replace('http', 'http*')] + + url_patterns._parse_pattern_or_url(*urls, is_pattern=True) + + with pytest.raises(AssertionError): + url_patterns._parse_pattern_or_url(*urls) + +@pytest.mark.parametrize('_in, out', [ + ('http://example.com', ('mocked_pr_http://example.com',)), + ('ftp://example.com', ('mocked_pr_ftp://example.com',)), + ('http*://example.com', ('mocked_pr_http://example.com', + 'mocked_pr_https://example.com')) +]) +def test_parse_pattern(monkeypatch, _in, out): + """....""" + def mocked_parse_pattern_or_url(url, orig_url, is_pattern=False): + """....""" + assert is_pattern + assert orig_url == _in + + return f'mocked_pr_{url}' + + monkeypatch.setattr(url_patterns, '_parse_pattern_or_url', + mocked_parse_pattern_or_url) + + assert url_patterns.parse_pattern(_in) == out + +def test_parse_url(monkeypatch): + """....""" + def mocked_parse_pattern_or_url(url, orig_url): + """....""" + return f'mocked_pr_{url}' + + monkeypatch.setattr(url_patterns, '_parse_pattern_or_url', + mocked_parse_pattern_or_url) + + assert url_patterns.parse_url('https://example.com') == \ + 'mocked_pr_https://example.com' + +def test_parsed_url_hash(sample_url_parsed): + """....""" + hash(sample_url_parsed) diff --git a/tests/test_versions.py b/tests/test_versions.py new file mode 100644 index 0000000..43a3f33 --- /dev/null +++ b/tests/test_versions.py @@ -0,0 +1,41 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. + +import pytest + +from hydrilla import versions + +sample_version_tuples = [(4, 5, 3), (1, 0, 5), (3,)] +sample_version_strings = ['4.5.3', '1.0.5', '3'] + +sample_versions = [*zip(sample_version_tuples, sample_version_strings)] + +@pytest.mark.parametrize('version_tuple', sample_version_tuples) +def test_normalize_version(version_tuple): + """Verify that normalize_version() produces proper results.""" + assert versions.normalize_version([*version_tuple]) == version_tuple + assert versions.normalize_version([*version_tuple, 0]) == version_tuple + +@pytest.mark.parametrize('version_tuple, string', sample_versions) +def test_parse_version(version_tuple, string): + """Verify that parse_version() produces proper results.""" + assert versions.parse_version(string) + assert versions.parse_version(string + '.0') == tuple([*version_tuple, 0]) + +def test_parse_version_bad_string(): + """Verify that parse_version() raises when passed an invalid string.""" + with pytest.raises(ValueError): + versions.parse_version('i am not a valid version') + +@pytest.mark.parametrize('version_tuple, string', sample_versions) +def test_version_string(version_tuple, string): + """Verify that version_string() produces proper results.""" + for _version_tuple, _string in [ + (version_tuple, string), + (tuple([*version_tuple, 0]), f'{string}.0') + ]: + assert versions.version_string(_version_tuple) == _string + assert versions.version_string(_version_tuple, 5) == f'{_string}-5' diff --git a/tests/url_patterns_common.py b/tests/url_patterns_common.py new file mode 100644 index 0000000..de6651d --- /dev/null +++ b/tests/url_patterns_common.py @@ -0,0 +1,23 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. + +import pytest + +from hydrilla import url_patterns + +sample_url_str = 'http://example.com/aa/bb' + +@pytest.fixture(scope='session') +def sample_url_parsed(): + """Generate a simple ParsedUrl object.""" + return url_patterns.ParsedUrl( + orig_url = sample_url_str, + scheme = 'http', + domain_labels = ('com', 'example'), + path_segments = ('aa', 'bb'), + has_trailing_slash = False, + port = 80 + ) |