diff options
author | Wojtek Kosior <koszko@koszko.org> | 2022-05-02 21:26:59 +0200 |
---|---|---|
committer | Wojtek Kosior <koszko@koszko.org> | 2022-05-10 12:41:38 +0200 |
commit | 61f0aa75c64732063988826400ebc9f8e01ee3bb (patch) | |
tree | 3f1fadb196afe06892194eb31c731964c0f62f21 | |
parent | 9dda3aa988a9482d6292a655f4846f7d4b450315 (diff) | |
download | hydrilla-builder-61f0aa75c64732063988826400ebc9f8e01ee3bb.tar.gz hydrilla-builder-61f0aa75c64732063988826400ebc9f8e01ee3bb.zip |
support piggybacking on APT packages
-rw-r--r-- | .gitmodules | 4 | ||||
-rw-r--r-- | conftest.py | 25 | ||||
-rw-r--r-- | pyproject.toml | 5 | ||||
-rw-r--r-- | src/hydrilla/builder/build.py | 298 | ||||
-rw-r--r-- | src/hydrilla/builder/common_errors.py | 67 | ||||
-rw-r--r-- | src/hydrilla/builder/local_apt.py | 428 | ||||
-rw-r--r-- | src/hydrilla/builder/piggybacking.py | 115 | ||||
m--------- | src/hydrilla/schemas | 0 | ||||
-rw-r--r-- | tests/__init__.py | 5 | ||||
-rw-r--r-- | tests/helpers.py | 51 | ||||
-rw-r--r-- | tests/test_build.py | 674 | ||||
-rw-r--r-- | tests/test_hydrilla_builder.py | 472 | ||||
-rw-r--r-- | tests/test_local_apt.py | 651 |
13 files changed, 2183 insertions, 612 deletions
diff --git a/.gitmodules b/.gitmodules index 6e47d90..ccb70a3 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,9 +4,9 @@ # # Available under the terms of Creative Commons Zero v1.0 Universal. -[submodule "src/hydrilla/schemas"] +[submodule "hydrilla-json-schemas"] path = src/hydrilla/schemas url = ../hydrilla-json-schemas -[submodule "src/test/source-package-example"] +[submodule "hydrilla-source-package-example"] path = tests/source-package-example url = ../hydrilla-source-package-example diff --git a/conftest.py b/conftest.py index 1aef80a..141cba5 100644 --- a/conftest.py +++ b/conftest.py @@ -7,5 +7,30 @@ import sys from pathlib import Path +import pytest + here = Path(__file__).resolve().parent sys.path.insert(0, str(here / 'src')) + +@pytest.fixture(autouse=True) +def no_requests(monkeypatch): + """Remove requests.sessions.Session.request for all tests.""" + monkeypatch.delattr('requests.sessions.Session.request') + +@pytest.fixture +def mock_subprocess_run(monkeypatch, request): + """ + Temporarily replace subprocess.run() with a function supplied through pytest + marker 'subprocess_run'. + + The marker excepts 2 arguments: + * the module inside which the subprocess attribute should be mocked and + * a run() function to use. + """ + where, mocked_run = request.node.get_closest_marker('subprocess_run').args + + class MockedSubprocess: + """Minimal mocked version of the subprocess module.""" + run = mocked_run + + monkeypatch.setattr(where, 'subprocess', MockedSubprocess) diff --git a/pyproject.toml b/pyproject.toml index 968455f..41eaf49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,10 @@ write_to = "src/hydrilla/builder/_version.py" [tool.pytest.ini_options] minversion = "6.0" -addopts = "-ra -q" +addopts = "-ra" testpaths = [ "tests" ] +markers = [ + "subprocess_run: define how mocked subprocess.run should behave" +] diff --git a/src/hydrilla/builder/build.py b/src/hydrilla/builder/build.py index 8eec4a4..ce4935c 100644 --- a/src/hydrilla/builder/build.py +++ b/src/hydrilla/builder/build.py @@ -30,21 +30,28 @@ from __future__ import annotations import json import re import zipfile -from pathlib import Path +import subprocess +from pathlib import Path, PurePosixPath from hashlib import sha256 from sys import stderr +from contextlib import contextmanager +from tempfile import TemporaryDirectory, TemporaryFile +from typing import Optional, Iterable, Union import jsonschema import click from .. import util from . import _version +from . import local_apt +from .piggybacking import Piggybacked +from .common_errors import * here = Path(__file__).resolve().parent _ = util.translation(here / 'locales').gettext -index_validator = util.validator_for('package_source-1.0.1.schema.json') +index_validator = util.validator_for('package_source-2.schema.json') schemas_root = 'https://hydrilla.koszko.org/schemas' @@ -53,202 +60,201 @@ generated_by = { 'version': _version.version } -class FileReferenceError(Exception): - """ - Exception used to report various problems concerning files referenced from - source package's index.json. - """ - -class ReuseError(Exception): +class ReuseError(SubprocessError): """ Exception used to report various problems when calling the REUSE tool. """ -class FileBuffer: - """ - Implement a file-like object that buffers data written to it. - """ - def __init__(self): - """ - Initialize FileBuffer. - """ - self.chunks = [] - - def write(self, b): - """ - Buffer 'b', return number of bytes buffered. - - 'b' is expected to be an instance of 'bytes' or 'str', in which case it - gets encoded as UTF-8. - """ - if type(b) is str: - b = b.encode() - self.chunks.append(b) - return len(b) - - def flush(self): - """ - A no-op mock of file-like object's flush() method. - """ - pass - - def get_bytes(self): - """ - Return all data written so far concatenated into a single 'bytes' - object. - """ - return b''.join(self.chunks) - -def generate_spdx_report(root): +def generate_spdx_report(root: Path) -> bytes: """ Use REUSE tool to generate an SPDX report for sources under 'root' and return the report's contents as 'bytes'. - 'root' shall be an instance of pathlib.Path. - In case the directory tree under 'root' does not constitute a - REUSE-compliant package, linting report is printed to standard output and - an exception is raised. + REUSE-compliant package, as exception is raised with linting report + included in it. - In case the reuse package is not installed, an exception is also raised. + In case the reuse tool is not installed, an exception is also raised. """ - try: - from reuse._main import main as reuse_main - except ModuleNotFoundError: - raise ReuseError(_('couldnt_import_reuse_is_it_installed')) + for command in [ + ['reuse', '--root', str(root), 'lint'], + ['reuse', '--root', str(root), 'spdx'] + ]: + try: + cp = subprocess.run(command, capture_output=True, text=True) + except FileNotFoundError: + raise ReuseError(_('couldnt_execute_reuse_is_it_installed')) - mocked_output = FileBuffer() - if reuse_main(args=['--root', str(root), 'lint'], out=mocked_output) != 0: - stderr.write(mocked_output.get_bytes().decode()) - raise ReuseError(_('spdx_report_from_reuse_incompliant')) + if cp.returncode != 0: + msg = _('reuse_command_{}_failed').format(' '.join(command)) + raise ReuseError(msg, cp) - mocked_output = FileBuffer() - if reuse_main(args=['--root', str(root), 'spdx'], out=mocked_output) != 0: - stderr.write(mocked_output.get_bytes().decode()) - raise ReuseError("Couldn't generate an SPDX report for package.") - - return mocked_output.get_bytes() + return cp.stdout.encode() class FileRef: """Represent reference to a file in the package.""" - def __init__(self, path: Path, contents: bytes): + def __init__(self, path: PurePosixPath, contents: bytes) -> None: """Initialize FileRef.""" - self.include_in_distribution = False - self.include_in_zipfile = True - self.path = path - self.contents = contents + self.include_in_distribution = False + self.include_in_source_archive = True + self.path = path + self.contents = contents self.contents_hash = sha256(contents).digest().hex() - def make_ref_dict(self, filename: str): + def make_ref_dict(self) -> dict[str, str]: """ Represent the file reference through a dict that can be included in JSON defintions. """ return { - 'file': filename, + 'file': str(self.path), 'sha256': self.contents_hash } +@contextmanager +def piggybacked_system(piggyback_def: Optional[dict], + piggyback_files: Optional[Path]) \ + -> Iterable[Piggybacked]: + """ + Resolve resources from a foreign software packaging system. Optionally, use + package files (.deb's, etc.) from a specified directory instead of resolving + and downloading them. + """ + if piggyback_def is None: + yield Piggybacked() + else: + # apt is the only supported system right now + assert piggyback_def['system'] == 'apt' + + with local_apt.piggybacked_system(piggyback_def, piggyback_files) \ + as piggybacked: + yield piggybacked + class Build: """ Build a Hydrilla package. """ - def __init__(self, srcdir, index_json_path): + def __init__(self, srcdir: Path, index_json_path: Path, + piggyback_files: Optional[Path]=None): """ Initialize a build. All files to be included in a distribution package are loaded into memory, all data gets validated and all necessary computations (e.g. preparing of hashes) are performed. - - 'srcdir' and 'index_json' are expected to be pathlib.Path objects. """ self.srcdir = srcdir.resolve() - self.index_json_path = index_json_path + self.piggyback_files = piggyback_files + # TODO: the piggyback files we set are ignored for now; use them + if piggyback_files is None: + piggyback_default_path = \ + srcdir.parent / f'{srcdir.name}.foreign-packages' + if piggyback_default_path.exists(): + self.piggyback_files = piggyback_default_path self.files_by_path = {} self.resource_list = [] self.mapping_list = [] if not index_json_path.is_absolute(): - self.index_json_path = (self.srcdir / self.index_json_path) - - self.index_json_path = self.index_json_path.resolve() + index_json_path = (self.srcdir / index_json_path) - with open(self.index_json_path, 'rt') as index_file: + with open(index_json_path, 'rt') as index_file: index_json_text = index_file.read() index_obj = json.loads(util.strip_json_comments(index_json_text)) - self.files_by_path[self.srcdir / 'index.json'] = \ - FileRef(self.srcdir / 'index.json', index_json_text.encode()) + index_desired_path = PurePosixPath('index.json') + self.files_by_path[index_desired_path] = \ + FileRef(index_desired_path, index_json_text.encode()) self._process_index_json(index_obj) - def _process_file(self, filename: str, include_in_distribution: bool=True): + def _process_file(self, filename: Union[str, PurePosixPath], + piggybacked: Piggybacked, + include_in_distribution: bool=True): """ Resolve 'filename' relative to srcdir, load it to memory (if not loaded before), compute its hash and store its information in 'self.files_by_path'. - 'filename' shall represent a relative path using '/' as a separator. + 'filename' shall represent a relative path withing package directory. if 'include_in_distribution' is True it shall cause the file to not only be included in the source package's zipfile, but also written as one of built package's files. + For each file an attempt is made to resolve it using 'piggybacked' + object. If a file is found and pulled from foreign software packaging + system this way, it gets automatically excluded from inclusion in + Hydrilla source package's zipfile. + Return file's reference object that can be included in JSON defintions of various kinds. """ - path = self.srcdir - for segment in filename.split('/'): - path /= segment - - path = path.resolve() - if not path.is_relative_to(self.srcdir): - raise FileReferenceError(_('loading_{}_outside_package_dir') - .format(filename)) - - if str(path.relative_to(self.srcdir)) == 'index.json': - raise FileReferenceError(_('loading_reserved_index_json')) + include_in_source_archive = True + + desired_path = PurePosixPath(filename) + if '..' in desired_path.parts: + msg = _('path_contains_double_dot_{}').format(filename) + raise FileReferenceError(msg) + + path = piggybacked.resolve_file(desired_path) + if path is None: + path = (self.srcdir / desired_path).resolve() + if not path.is_relative_to(self.srcdir): + raise FileReferenceError(_('loading_{}_outside_package_dir') + .format(filename)) + + if str(path.relative_to(self.srcdir)) == 'index.json': + raise FileReferenceError(_('loading_reserved_index_json')) + else: + include_in_source_archive = False - file_ref = self.files_by_path.get(path) + file_ref = self.files_by_path.get(desired_path) if file_ref is None: with open(path, 'rb') as file_handle: contents = file_handle.read() - file_ref = FileRef(path, contents) - self.files_by_path[path] = file_ref + file_ref = FileRef(desired_path, contents) + self.files_by_path[desired_path] = file_ref if include_in_distribution: file_ref.include_in_distribution = True - return file_ref.make_ref_dict(filename) + if not include_in_source_archive: + file_ref.include_in_source_archive = False + + return file_ref.make_ref_dict() - def _prepare_source_package_zip(self, root_dir_name: str): + def _prepare_source_package_zip(self, source_name: str, + piggybacked: Piggybacked) -> str: """ Create and store in memory a .zip archive containing files needed to build this source package. - 'root_dir_name' shall not contain any slashes ('/'). + 'src_dir_name' shall not contain any slashes ('/'). Return zipfile's sha256 sum's hexstring. """ - fb = FileBuffer() - root_dir_path = Path(root_dir_name) + tf = TemporaryFile() + source_dir_path = PurePosixPath(source_name) + piggybacked_dir_path = PurePosixPath(f'{source_name}.foreign-packages') - def zippath(file_path): - file_path = root_dir_path / file_path.relative_to(self.srcdir) - return file_path.as_posix() - - with zipfile.ZipFile(fb, 'w') as xpi: + with zipfile.ZipFile(tf, 'w') as zf: for file_ref in self.files_by_path.values(): - if file_ref.include_in_zipfile: - xpi.writestr(zippath(file_ref.path), file_ref.contents) + if file_ref.include_in_source_archive: + zf.writestr(str(source_dir_path / file_ref.path), + file_ref.contents) + + for desired_path, real_path in piggybacked.archive_files(): + zf.writestr(str(piggybacked_dir_path / desired_path), + real_path.read_bytes()) - self.source_zip_contents = fb.get_bytes() + tf.seek(0) + self.source_zip_contents = tf.read() return sha256(self.source_zip_contents).digest().hex() - def _process_item(self, item_def: dict): + def _process_item(self, item_def: dict, piggybacked: Piggybacked): """ Process 'item_def' as definition of a resource/mapping and store in memory its processed form and files used by it. @@ -266,14 +272,14 @@ class Build: copy_props.append('revision') - script_file_refs = [self._process_file(f['file']) + script_file_refs = [self._process_file(f['file'], piggybacked) for f in item_def.get('scripts', [])] deps = [{'identifier': res_ref['identifier']} for res_ref in item_def.get('dependencies', [])] new_item_obj = { - 'dependencies': deps, + 'dependencies': [*piggybacked.package_must_depend, *deps], 'scripts': script_file_refs } else: @@ -308,41 +314,54 @@ class Build: in it. """ index_validator.validate(index_obj) + match = re.match(r'.*-((([1-9][0-9]*|0)\.)+)schema\.json$', + index_obj['$schema']) + self.source_schema_ver = \ + [int(n) for n in filter(None, match.group(1).split('.'))] - schema = f'{schemas_root}/api_source_description-1.schema.json' + out_schema = f'{schemas_root}/api_source_description-1.schema.json' self.source_name = index_obj['source_name'] generate_spdx = index_obj.get('reuse_generate_spdx_report', False) if generate_spdx: contents = generate_spdx_report(self.srcdir) - spdx_path = (self.srcdir / 'report.spdx').resolve() + spdx_path = PurePosixPath('report.spdx') spdx_ref = FileRef(spdx_path, contents) - spdx_ref.include_in_zipfile = False + spdx_ref.include_in_source_archive = False self.files_by_path[spdx_path] = spdx_ref - self.copyright_file_refs = \ - [self._process_file(f['file']) for f in index_obj['copyright']] + piggyback_def = None + if self.source_schema_ver >= [1, 1] and 'piggyback_on' in index_obj: + piggyback_def = index_obj['piggyback_on'] - if generate_spdx and not spdx_ref.include_in_distribution: - raise FileReferenceError(_('report_spdx_not_in_copyright_list')) + with piggybacked_system(piggyback_def, self.piggyback_files) \ + as piggybacked: + copyright_to_process = [ + *(file_ref['file'] for file_ref in index_obj['copyright']), + *piggybacked.package_license_files + ] + self.copyright_file_refs = [self._process_file(f, piggybacked) + for f in copyright_to_process] - item_refs = [self._process_item(d) for d in index_obj['definitions']] + if generate_spdx and not spdx_ref.include_in_distribution: + raise FileReferenceError(_('report_spdx_not_in_copyright_list')) - for file_ref in index_obj.get('additional_files', []): - self._process_file(file_ref['file'], include_in_distribution=False) + item_refs = [self._process_item(d, piggybacked) + for d in index_obj['definitions']] - root_dir_path = Path(self.source_name) + for file_ref in index_obj.get('additional_files', []): + self._process_file(file_ref['file'], piggybacked, + include_in_distribution=False) - source_archives_obj = { - 'zip' : { - 'sha256': self._prepare_source_package_zip(root_dir_path) - } - } + zipfile_sha256 = self._prepare_source_package_zip\ + (self.source_name, piggybacked) + + source_archives_obj = {'zip' : {'sha256': zipfile_sha256}} self.source_description = { - '$schema': schema, + '$schema': out_schema, 'source_name': self.source_name, 'source_copyright': self.copyright_file_refs, 'upstream_url': index_obj['upstream_url'], @@ -398,20 +417,25 @@ class Build: dir_type = click.Path(exists=True, file_okay=False, resolve_path=True) +@click.command(help=_('build_package_from_srcdir_to_dstdir')) @click.option('-s', '--srcdir', default='./', type=dir_type, show_default=True, help=_('source_directory_to_build_from')) @click.option('-i', '--index-json', default='index.json', type=click.Path(), help=_('path_instead_of_index_json')) +@click.option('-p', '--piggyback-files', type=click.Path(), + help=_('path_instead_for_piggyback_files')) @click.option('-d', '--dstdir', type=dir_type, required=True, help=_('built_package_files_destination')) @click.version_option(version=_version.version, prog_name='Hydrilla builder', message=_('%(prog)s_%(version)s_license'), help=_('version_printing')) -def perform(srcdir, index_json, dstdir): - """<this will be replaced by a localized docstring for Click to pick up>""" - build = Build(Path(srcdir), Path(index_json)) - build.write_package_files(Path(dstdir)) - -perform.__doc__ = _('build_package_from_srcdir_to_dstdir') +def perform(srcdir, index_json, piggyback_files, dstdir): + """ + Execute Hydrilla builder to turn source package into a distributable one. -perform = click.command()(perform) + This command is meant to be the entry point of hydrilla-builder command + exported by this package. + """ + build = Build(Path(srcdir), Path(index_json), + piggyback_files and Path(piggyback_files)) + build.write_package_files(Path(dstdir)) diff --git a/src/hydrilla/builder/common_errors.py b/src/hydrilla/builder/common_errors.py new file mode 100644 index 0000000..29782e1 --- /dev/null +++ b/src/hydrilla/builder/common_errors.py @@ -0,0 +1,67 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later + +# Error classes. +# +# This file is part of Hydrilla +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +This module defines error types for use in other parts of Hydrilla builder. +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +from pathlib import Path + +from .. import util + +here = Path(__file__).resolve().parent + +_ = util.translation(here / 'locales').gettext + +class DistroError(Exception): + """ + Exception used to report problems when resolving an OS distribution. + """ + +class FileReferenceError(Exception): + """ + Exception used to report various problems concerning files referenced from + source package. + """ + +class SubprocessError(Exception): + """ + Exception used to report problems related to execution of external + processes, includes. various problems when calling apt-* and dpkg-* + commands. + """ + def __init__(self, msg: str, cp: Optional[CP]=None) -> None: + """Initialize this SubprocessError""" + if cp and cp.stdout: + msg = '\n\n'.join([msg, _('STDOUT_OUTPUT_heading'), cp.stdout]) + + if cp and cp.stderr: + msg = '\n\n'.join([msg, _('STDERR_OUTPUT_heading'), cp.stderr]) + + super().__init__(msg) diff --git a/src/hydrilla/builder/local_apt.py b/src/hydrilla/builder/local_apt.py new file mode 100644 index 0000000..8382af8 --- /dev/null +++ b/src/hydrilla/builder/local_apt.py @@ -0,0 +1,428 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later + +# Using a local APT. +# +# This file is part of Hydrilla +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +# Enable using with Python 3.7. +from __future__ import annotations + +import zipfile +import shutil +import re +import subprocess +CP = subprocess.CompletedProcess +from pathlib import Path, PurePosixPath +from tempfile import TemporaryDirectory, NamedTemporaryFile +from hashlib import sha256 +from contextlib import contextmanager +from typing import Optional, Iterable + +from .. import util +from .piggybacking import Piggybacked +from .common_errors import * + +here = Path(__file__).resolve().parent + +_ = util.translation(here / 'locales').gettext + +""" +Default cache directory to save APT configurations and downloaded GPG keys in. +""" +default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt' + +""" +Default keyserver to use. +""" +default_keyserver = 'hkps://keyserver.ubuntu.com:443' + +""" +Default keys to download when using a local APT. +""" +default_keys = [ + # Trisquel + 'E6C27099CA21965B734AEA31B4EFB9F38D8AEBF1', + '60364C9869F92450421F0C22B138CA450C05112F', + # Ubuntu + '630239CC130E1A7FD81A27B140976EAF437D05B5', + '790BC7277767219C42C86F933B4FE6ACC0B21F32', + 'F6ECB3762474EDA9D21B7022871920D1991BC93C', + # Debian + '6D33866EDD8FFA41C0143AEDDCC9EFBF77E11517', + '80D15823B7FD1561F9F7BCDDDC30D7C23CBBABEE', + 'AC530D520F2F3269F5E98313A48449044AAD5C5D' +] + +"""sources.list file contents for known distros.""" +default_lists = { + 'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main' + for type in ('deb', 'deb-src') + for suf in ('', '-updates', '-security')] +} + +class GpgError(Exception): + """ + Exception used to report various problems when calling GPG. + """ + +class AptError(SubprocessError): + """ + Exception used to report various problems when calling apt-* and dpkg-* + commands. + """ + +def run(command, **kwargs): + """A wrapped around subprocess.run that sets some default options.""" + return subprocess.run(command, **kwargs, env={'LANG': 'en_US'}, + capture_output=True, text=True) + +class Apt: + """ + This class represents an APT instance and can be used to call apt-get + commands with it. + """ + def __init__(self, apt_conf: str) -> None: + """Initialize this Apt object.""" + self.apt_conf = apt_conf + + def get(self, *args: str, **kwargs) -> CP: + """ + Run apt-get with the specified arguments and raise a meaningful AptError + when something goes wrong. + """ + command = ['apt-get', '-c', self.apt_conf, *args] + try: + cp = run(command, **kwargs) + except FileNotFoundError: + raise AptError(_('couldnt_execute_apt_get_is_it_installed')) + + if cp.returncode != 0: + msg = _('apt_get_command_{}_failed').format(' '.join(command)) + raise AptError(msg, cp) + + return cp + +def cache_dir() -> Path: + """ + Return the directory used to cache data (APT configurations, keyrings) to + speed up repeated operations. + + This function first ensures the directory exists. + """ + default_apt_cache_dir.mkdir(parents=True, exist_ok=True) + return default_apt_cache_dir + +class SourcesList: + """Representation of apt's sources.list contents.""" + def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None: + """Initialize this SourcesList.""" + self.codename = None + self.list = [*list] + self.has_extra_entries = bool(self.list) + + if codename is not None: + if codename not in default_lists: + raise DistroError(_('distro_{}_unknown').format(codename)) + + self.codename = codename + self.list.extend(default_lists[codename]) + + def identity(self) -> str: + """ + Produce a string that uniquely identifies this sources.list contents. + """ + if self.codename and not self.has_extra_entries: + return self.codename + + return sha256('\n'.join(sorted(self.list)).encode()).digest().hex() + +def apt_conf(directory: Path) -> str: + """ + Given local APT's directory, produce a configuration suitable for running + APT there. + + 'directory' must not contain any special characters including quotes and + spaces. + """ + return f''' +Dir "{directory}"; +Dir::State "{directory}/var/lib/apt"; +Dir::State::status "{directory}/var/lib/dpkg/status"; +Dir::Etc::SourceList "{directory}/etc/apt.sources.list"; +Dir::Etc::SourceParts ""; +Dir::Cache "{directory}/var/cache/apt"; +pkgCacheGen::Essential "none"; +Dir::Etc::Trusted "{directory}/etc/trusted.gpg"; +''' + +def apt_keyring(keys: [str]) -> bytes: + """ + Download the requested keys if necessary and export them as a keyring + suitable for passing to APT. + + The keyring is returned as a bytes value that should be written to a file. + """ + try: + from gnupg import GPG + except ModuleNotFoundError: + raise GpgError(_('couldnt_import_gnupg_is_it_installed')) + + gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg')) + for key in keys: + if gpg.list_keys(keys=[key]) != []: + continue + + if gpg.recv_keys(default_keyserver, key).imported == 0: + raise GpgError(_('gpg_couldnt_recv_key')) + + return gpg.export_keys(keys, armor=False, minimal=True) + +def cache_apt_root(apt_root: Path, destination_zip: Path) -> None: + """ + Zip an APT root directory for later use and move the zipfile to the + requested destination. + """ + temporary_zip_path = None + try: + tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_', + dir=cache_dir(), delete=False) + temporary_zip_path = Path(tmpfile.name) + + to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'} + + with zipfile.ZipFile(tmpfile, 'w') as zf: + for member in apt_root.rglob('*'): + relative = member.relative_to(apt_root) + if relative not in to_skip: + # This call will also properly add empty folders to zip file + zf.write(member, relative, zipfile.ZIP_DEFLATED) + + shutil.move(temporary_zip_path, destination_zip) + finally: + if temporary_zip_path is not None and temporary_zip_path.exists(): + temporary_zip_path.unlink() + +def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt: + """ + Create files and directories necessary for running APT without root rights + inside 'directory'. + + 'directory' must not contain any special characters including quotes and + spaces and must be empty. + + Return an Apt object that can be used to call apt-get commands. + """ + apt_root = directory / 'apt_root' + + conf_text = apt_conf(apt_root) + keyring_bytes = apt_keyring(keys) + + apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip' + if apt_zipfile.exists(): + with zipfile.ZipFile(apt_zipfile) as zf: + zf.extractall(apt_root) + + for to_create in ( + apt_root / 'var' / 'lib' / 'apt' / 'partial', + apt_root / 'var' / 'lib' / 'apt' / 'lists', + apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial', + apt_root / 'etc' / 'apt' / 'preferences.d', + apt_root / 'var' / 'lib' / 'dpkg', + apt_root / 'var' / 'log' / 'apt' + ): + to_create.mkdir(parents=True, exist_ok=True) + + conf_path = apt_root / 'etc' / 'apt.conf' + trusted_path = apt_root / 'etc' / 'trusted.gpg' + status_path = apt_root / 'var' / 'lib' / 'dpkg' / 'status' + list_path = apt_root / 'etc' / 'apt.sources.list' + + conf_path.write_text(conf_text) + trusted_path.write_bytes(keyring_bytes) + status_path.touch() + list_path.write_text('\n'.join(list.list)) + + apt = Apt(str(conf_path)) + apt.get('update') + + cache_apt_root(apt_root, apt_zipfile) + + return apt + +@contextmanager +def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]: + """ + Create a temporary directory with proper local APT configuration in it. + Yield an Apt object that can be used to issue apt-get commands. + + This function returns a context manager that will remove the directory on + close. + """ + with TemporaryDirectory() as td: + td = Path(td) + yield setup_local_apt(td, list, keys) + +def download_apt_packages(list: SourcesList, keys: [str], packages: [str], + destination_dir: Path, with_deps=False) -> [str]: + """ + Set up a local APT, update it using the specified sources.list configuration + and use it to download the specified packages. + + This function downloads a .deb file of the packages matching the current + architecture (which includes packages with architecture 'all') as well as + all theis corresponding source package files and (if requested) the debs + and source files of all their declared dependencies. + + Return value is a list of names of all downloaded files. + """ + with local_apt(list, keys) as apt: + if with_deps: + cp = apt.get('install', '--yes', '--just-print', *packages) + + deps_listing = re.match( + r''' + .* + The\sfollowing\sNEW\spackages\swill\sbe\sinstalled: + (.*) + 0\supgraded, + ''', + cp.stdout, + re.MULTILINE | re.DOTALL | re.VERBOSE) + + if deps_listing is None: + raise AptError(_('apt_install_output_not_understood'), cp) + + packages = deps_listing.group(1).split() + + # Download .debs to indirectly to destination_dir by first placing them + # in a temporary subdirectory. + with TemporaryDirectory(dir=destination_dir) as td: + td = Path(td) + cp = apt.get('download', *packages, cwd=td) + + deb_name_regex = re.compile( + r''' + ^ + (?P<name>[^_]+) + _ + (?P<ver>[^_]+) + _ + .+ # architecture (or 'all') + \.deb + $ + ''', + re.VERBOSE) + + names_vers = [] + downloaded = [] + for deb_file in td.iterdir(): + match = deb_name_regex.match(deb_file.name) + if match is None: + msg = _('apt_download_gave_bad_filename_{}')\ + .format(deb_file.name) + raise AptError(msg, cp) + + names_vers.append((match.group('name'), match.group('ver'))) + downloaded.append(deb_file.name) + + apt.get('source', '--download-only', + *[f'{n}={v}' for n, v in names_vers], cwd=td) + + for source_file in td.iterdir(): + if source_file.name in downloaded: + continue + + downloaded.append(source_file.name) + + for filename in downloaded: + shutil.move(td / filename, destination_dir / filename) + + return downloaded + +@contextmanager +def piggybacked_system(piggyback_def: dict, foreign_packages: Optional[Path]) \ + -> Iterable[Piggybacked]: + """ + Resolve resources from APT. Optionally, use package files (.deb's, etc.) + from a specified directory instead of resolving and downloading them. + + The directories and files created for the yielded Piggybacked object shall + be deleted when this context manager gets closed. + """ + assert piggyback_def['system'] == 'apt' + + with TemporaryDirectory() as td: + td = Path(td) + root = td / 'root' + root.mkdir() + + if foreign_packages is None: + archives = td / 'archives' + archives.mkdir() + + sources_list = SourcesList(piggyback_def.get('sources_list', []), + piggyback_def.get('distribution')) + packages = piggyback_def['packages'] + with_deps = piggyback_def['dependencies'] + pgp_keys = [ + *default_keys, + *piggyback_def.get('trusted_keys', []) + ] + + download_apt_packages( + list=sources_list, + keys=pgp_keys, + packages=packages, + destination_dir=archives, + with_deps=with_deps + ) + else: + archives = foreign_packages / 'apt' + + for deb in archives.glob('*.deb'): + command = ['dpkg-deb', '-x', str(deb), str(root)] + try: + cp = run(command) + except FileNotFoundError: + raise AptError(_('couldnt_execute_dpkg_deb_is_it_installed')) + + if cp.returncode != 0: + msg = _('dpkg_deb_command_{}_failed').format(' '.join(command)) + raise AptError(msg, cp) + + docs_dir = root / 'usr' / 'share' / 'doc' + copyright_paths = [p / 'copyright' for p in docs_dir.iterdir()] \ + if docs_dir.exists() else [] + copyright_paths = [PurePosixPath('.apt-root') / p.relative_to(root) + for p in copyright_paths if p.exists()] + + standard_depends = piggyback_def.get('depend_on_base_packages', True) + must_depend = [{'identifier': 'apt-common-licenses'}] \ + if standard_depends else [] + + yield Piggybacked( + archives={'apt': archives}, + roots={'.apt-root': root}, + package_license_files=copyright_paths, + package_must_depend=must_depend + ) diff --git a/src/hydrilla/builder/piggybacking.py b/src/hydrilla/builder/piggybacking.py new file mode 100644 index 0000000..799422d --- /dev/null +++ b/src/hydrilla/builder/piggybacking.py @@ -0,0 +1,115 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later + +# Handling of software packaged for other distribution systems. +# +# This file is part of Hydrilla +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +This module contains definitions that may be reused by multiple piggybacked +software system backends. +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +from pathlib import Path, PurePosixPath +from typing import Optional, Iterable + +from .. import util +from .common_errors import * + +here = Path(__file__).resolve().parent + +_ = util.translation(here / 'locales').gettext + +class Piggybacked: + """ + Store information about foreign resources in use. + + Public attributes: + 'package_must_depend' (read-only) + 'package_license_files' (read-only) + """ + def __init__(self, archives: dict[str, Path]={}, roots: dict[str, Path]={}, + package_license_files: list[PurePosixPath]=[], + package_must_depend: list[dict]=[]): + """ + Initialize this Piggybacked object. + + 'archives' maps piggybacked system names to directories that contain + package(s)' archive files. An 'archives' object may look like + {'apt': PosixPath('/path/to/dir/with/debs/and/tarballs')}. + + 'roots' associates directory names to be virtually inserted under + Hydrilla source package directory with paths to real filesystem + directories that hold their desired contents, i.e. unpacked foreign + packages. + + 'package_license_files' lists paths to license files that should be + included with the Haketilo package that will be produced. The paths are + to be resolved using 'roots' dictionary. + + 'package_must_depend' lists names of Haketilo packages that the produced + package will additionally depend on. This is meant to help distribute + common licenses with a separate Haketilo package. + """ + self.archives = archives + self.roots = roots + self.package_license_files = package_license_files + self.package_must_depend = package_must_depend + + def resolve_file(self, file_ref_name: PurePosixPath) -> Optional[Path]: + """ + 'file_ref_name' is a path as may appear in an index.json file. Check if + the file belongs to one of the roots we have and return either a path + to the relevant file under this root or None. + + It is not being checked whether the file actually exists in the + filesystem. + """ + parts = file_ref_name.parts + root_path = self.roots.get(parts and parts[0]) + path = root_path + if path is None: + return None + + for part in parts[1:]: + path = path / part + + path = path.resolve() + + if not path.is_relative_to(root_path): + raise FileReferenceError(_('loading_{}_outside_piggybacked_dir') + .format(file_ref_name)) + + return path + + def archive_files(self) -> Iterable[tuple[PurePosixPath, Path]]: + """ + Yield all archive files in use. Each yielded tuple holds file's desired + path relative to the piggybacked archives directory to be created and + its current real path. + """ + for system, real_dir in self.archives.items(): + for path in real_dir.rglob('*'): + yield PurePosixPath(system) / path.relative_to(real_dir), path diff --git a/src/hydrilla/schemas b/src/hydrilla/schemas -Subproject 09634f3446866f712a022327683b1149d8f46bf +Subproject 4b4da5a02bc311603469eea7b3dfd4f1bbb911f diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..d382ead --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,5 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. diff --git a/tests/helpers.py b/tests/helpers.py new file mode 100644 index 0000000..df474b0 --- /dev/null +++ b/tests/helpers.py @@ -0,0 +1,51 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. + +import re + +variable_word_re = re.compile(r'^<(.+)>$') + +def process_command(command, expected_command): + """Validate the command line and extract its variable parts (if any).""" + assert len(command) == len(expected_command) + + extracted = {} + for word, expected_word in zip(command, expected_command): + match = variable_word_re.match(expected_word) + if match: + extracted[match.group(1)] = word + else: + assert word == expected_word + + return extracted + +def run_missing_executable(command, **kwargs): + """ + Instead of running a command, raise FileNotFoundError as if its executable + was missing. + """ + raise FileNotFoundError('dummy') + +class MockedCompletedProcess: + """ + Object with some fields similar to those of subprocess.CompletedProcess. + """ + def __init__(self, args, returncode=0, + stdout='some output', stderr='some error output', + text_output=True): + """ + Initialize MockedCompletedProcess. Convert strings to bytes if needed. + """ + self.args = args + self.returncode = returncode + + if type(stdout) is str and not text_output: + stdout = stdout.encode() + if type(stderr) is str and not text_output: + stderr = stderr.encode() + + self.stdout = stdout + self.stderr = stderr diff --git a/tests/test_build.py b/tests/test_build.py new file mode 100644 index 0000000..a30cff4 --- /dev/null +++ b/tests/test_build.py @@ -0,0 +1,674 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. + +# Enable using with Python 3.7. +from __future__ import annotations + +import pytest +import json +import shutil + +from tempfile import TemporaryDirectory +from pathlib import Path, PurePosixPath +from hashlib import sha256 +from zipfile import ZipFile +from contextlib import contextmanager + +from jsonschema import ValidationError + +from hydrilla import util as hydrilla_util +from hydrilla.builder import build, _version, local_apt +from hydrilla.builder.common_errors import * + +from .helpers import * + +here = Path(__file__).resolve().parent + +expected_generated_by = { + 'name': 'hydrilla.builder', + 'version': _version.version +} + +orig_srcdir = here / 'source-package-example' + +index_text = (orig_srcdir / 'index.json').read_text() +index_obj = json.loads(hydrilla_util.strip_json_comments(index_text)) + +def read_files(*file_list): + """ + Take names of files under srcdir and return a dict that maps them to their + contents (as bytes). + """ + return dict((name, (orig_srcdir / name).read_bytes()) for name in file_list) + +dist_files = { + **read_files('LICENSES/CC0-1.0.txt', 'bye.js', 'hello.js', 'message.js'), + 'report.spdx': b'dummy spdx output' +} +src_files = { + **dist_files, + **read_files('README.txt', 'README.txt.license', '.reuse/dep5', + 'index.json') +} +extra_archive_files = { +} + +sha256_hashes = dict((name, sha256(contents).digest().hex()) + for name, contents in src_files.items()) + +del src_files['report.spdx'] + +expected_resources = [{ + '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json', + 'source_name': 'hello', + 'source_copyright': [{ + 'file': 'report.spdx', + 'sha256': sha256_hashes['report.spdx'] + }, { + 'file': 'LICENSES/CC0-1.0.txt', + 'sha256': sha256_hashes['LICENSES/CC0-1.0.txt'] + }], + 'type': 'resource', + 'identifier': 'helloapple', + 'long_name': 'Hello Apple', + 'uuid': 'a6754dcb-58d8-4b7a-a245-24fd7ad4cd68', + 'version': [2021, 11, 10], + 'revision': 1, + 'description': 'greets an apple', + 'dependencies': [{'identifier': 'hello-message'}], + 'scripts': [{ + 'file': 'hello.js', + 'sha256': sha256_hashes['hello.js'] + }, { + 'file': 'bye.js', + 'sha256': sha256_hashes['bye.js'] + }], + 'generated_by': expected_generated_by +}, { + '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json', + 'source_name': 'hello', + 'source_copyright': [{ + 'file': 'report.spdx', + 'sha256': sha256_hashes['report.spdx'] + }, { + 'file': 'LICENSES/CC0-1.0.txt', + 'sha256': sha256_hashes['LICENSES/CC0-1.0.txt'] + }], + 'type': 'resource', + 'identifier': 'hello-message', + 'long_name': 'Hello Message', + 'uuid': '1ec36229-298c-4b35-8105-c4f2e1b9811e', + 'version': [2021, 11, 10], + 'revision': 2, + 'description': 'define messages for saying hello and bye', + 'dependencies': [], + 'scripts': [{ + 'file': 'message.js', + 'sha256': sha256_hashes['message.js'] + }], + 'generated_by': expected_generated_by +}] + +expected_mapping = { + '$schema': 'https://hydrilla.koszko.org/schemas/api_mapping_description-1.schema.json', + 'source_name': 'hello', + 'source_copyright': [{ + 'file': 'report.spdx', + 'sha256': sha256_hashes['report.spdx'] + }, { + 'file': 'LICENSES/CC0-1.0.txt', + 'sha256': sha256_hashes['LICENSES/CC0-1.0.txt'] + }], + 'type': 'mapping', + 'identifier': 'helloapple', + 'long_name': 'Hello Apple', + 'uuid': '54d23bba-472e-42f5-9194-eaa24c0e3ee7', + 'version': [2021, 11, 10], + 'description': 'causes apple to get greeted on Hydrillabugs issue tracker', + 'payloads': { + 'https://hydrillabugs.koszko.org/***': { + 'identifier': 'helloapple' + }, + 'https://hachettebugs.koszko.org/***': { + 'identifier': 'helloapple' + } + }, + 'generated_by': expected_generated_by +} + +expected_source_description = { + '$schema': 'https://hydrilla.koszko.org/schemas/api_source_description-1.schema.json', + 'source_name': 'hello', + 'source_copyright': [{ + 'file': 'report.spdx', + 'sha256': sha256_hashes['report.spdx'] + }, { + 'file': 'LICENSES/CC0-1.0.txt', + 'sha256': sha256_hashes['LICENSES/CC0-1.0.txt'] + }], + 'source_archives': { + 'zip': { + 'sha256': '!!!!value to fill during test!!!!', + } + }, + 'upstream_url': 'https://git.koszko.org/hydrilla-source-package-example', + 'definitions': [{ + 'type': 'resource', + 'identifier': 'helloapple', + 'long_name': 'Hello Apple', + 'version': [2021, 11, 10], + }, { + 'type': 'resource', + 'identifier': 'hello-message', + 'long_name': 'Hello Message', + 'version': [2021, 11, 10], + }, { + 'type': 'mapping', + 'identifier': 'helloapple', + 'long_name': 'Hello Apple', + 'version': [2021, 11, 10], + }], + 'generated_by': expected_generated_by +} + +expected = [*expected_resources, expected_mapping, expected_source_description] + +@pytest.fixture +def tmpdir() -> Iterable[str]: + """ + Provide test case with a temporary directory that will be automatically + deleted after the test. + """ + with TemporaryDirectory() as tmpdir: + yield Path(tmpdir) + +def run_reuse(command, **kwargs): + """ + Instead of running a 'reuse' command, check if 'mock_reuse_missing' file + exists under root directory. If yes, raise FileNotFoundError as if 'reuse' + command was missing. If not, check if 'README.txt.license' file exists + in the requested directory and return zero if it does. + """ + expected = ['reuse', '--root', '<root>', + 'lint' if 'lint' in command else 'spdx'] + + root_path = Path(process_command(command, expected)['root']) + + if (root_path / 'mock_reuse_missing').exists(): + raise FileNotFoundError('dummy') + + is_reuse_compliant = (root_path / 'README.txt.license').exists() + + return MockedCompletedProcess(command, 1 - is_reuse_compliant, + stdout=f'dummy {expected[-1]} output', + text_output=kwargs.get('text')) + +mocked_piggybacked_archives = [ + PurePosixPath('apt/something.deb'), + PurePosixPath('apt/something.orig.tar.gz'), + PurePosixPath('apt/something.debian.tar.xz'), + PurePosixPath('othersystem/other-something.tar.gz') +] + +@pytest.fixture +def mock_piggybacked_apt_system(monkeypatch): + """Make local_apt.piggybacked_system() return a mocked result.""" + # We set 'td' to a temporary dir path further below. + td = None + + class MockedPiggybacked: + """Minimal mock of Piggybacked object.""" + package_license_files = [PurePosixPath('.apt-root/.../copyright')] + package_must_depend = [{'identifier': 'apt-common-licenses'}] + + def resolve_file(path): + """ + For each path that starts with '.apt-root' return a valid + dummy file path. + """ + if path.parts[0] != '.apt-root': + return None + + (td / path.name).write_text(f'dummy {path.name}') + + return (td / path.name) + + def archive_files(): + """Yield some valid dummy file path tuples.""" + for desired_path in mocked_piggybacked_archives: + real_path = td / desired_path.name + real_path.write_text(f'dummy {desired_path.name}') + + yield desired_path, real_path + + @contextmanager + def mocked_piggybacked_system(piggyback_def, piggyback_files): + """Mock the execution of local_apt.piggybacked_system().""" + assert piggyback_def == { + 'system': 'apt', + 'distribution': 'nabia', + 'packages': ['somelib=1.0'], + 'dependencies': False + } + if piggyback_files is not None: + assert {str(path) for path in mocked_piggybacked_archives} == \ + {path.relative_to(piggyback_files).as_posix() + for path in piggyback_files.rglob('*') if path.is_file()} + + yield MockedPiggybacked + + monkeypatch.setattr(local_apt, 'piggybacked_system', + mocked_piggybacked_system) + + with TemporaryDirectory() as td: + td = Path(td) + yield + +@pytest.fixture +def sample_source(): + """Prepare a directory with sample Haketilo source package.""" + with TemporaryDirectory() as td: + sample_source = Path(td) / 'hello' + for name, contents in src_files.items(): + path = sample_source / name + path.parent.mkdir(parents=True, exist_ok=True) + path.write_bytes(contents) + + yield sample_source + +variant_makers = [] +def variant_maker(function): + """Decorate function by placing it in variant_makers array.""" + variant_makers.append(function) + return function + +@variant_maker +def sample_source_change_index_json(monkeypatch, sample_source): + """ + Return a non-standard path for index.json. Ensure parent directories exist. + """ + # Use a path under sample_source so that it gets auto-deleted after the + # test. Use a file under .git because .git is ignored by REUSE. + path = sample_source / '.git' / 'replacement.json' + path.parent.mkdir() + return path + +@variant_maker +def sample_source_add_comments(monkeypatch, sample_source): + """Add index.json comments that should be preserved.""" + for dictionary in (index_obj, expected_source_description): + monkeypatch.setitem(dictionary, 'comment', 'index.json comment') + + for i, dicts in enumerate(zip(index_obj['definitions'], expected)): + for dictionary in dicts: + monkeypatch.setitem(dictionary, 'comment', 'index.json comment') + +@variant_maker +def sample_source_remove_spdx(monkeypatch, sample_source): + """Remove spdx report generation.""" + monkeypatch.delitem(index_obj, 'reuse_generate_spdx_report') + + for obj, key in [ + (index_obj, 'copyright'), + *((definition, 'source_copyright') for definition in expected) + ]: + new_list = [r for r in obj[key] if r['file'] != 'report.spdx'] + monkeypatch.setitem(obj, key, new_list) + + monkeypatch.delitem(dist_files, 'report.spdx') + + # To verify that reuse does not get called now, make mocked subprocess.run() + # raise an error if called. + (sample_source / 'mock_reuse_missing').touch() + +@variant_maker +def sample_source_remove_additional_files(monkeypatch, sample_source): + """Use default value ([]) for 'additionall_files' property.""" + monkeypatch.delitem(index_obj, 'additional_files') + + for name in 'README.txt', 'README.txt.license', '.reuse/dep5': + monkeypatch.delitem(src_files, name) + +@variant_maker +def sample_source_remove_script(monkeypatch, sample_source): + """Use default value ([]) for 'scripts' property in one of the resources.""" + monkeypatch.delitem(index_obj['definitions'][1], 'scripts') + + monkeypatch.setitem(expected_resources[1], 'scripts', []) + + for files in dist_files, src_files: + monkeypatch.delitem(files, 'message.js') + +@variant_maker +def sample_source_remove_payloads(monkeypatch, sample_source): + """Use default value ({}) for 'payloads' property in mapping.""" + monkeypatch.delitem(index_obj['definitions'][2], 'payloads') + + monkeypatch.setitem(expected_mapping, 'payloads', {}) + +@variant_maker +def sample_source_remove_uuids(monkeypatch, sample_source): + """Don't use UUIDs (they are optional).""" + for definition in index_obj['definitions']: + monkeypatch.delitem(definition, 'uuid') + + for description in expected: + if 'uuid' in description: + monkeypatch.delitem(description, 'uuid') + +@variant_maker +def sample_source_add_extra_props(monkeypatch, sample_source): + """Add some unrecognized properties that should be stripped.""" + to_process = [index_obj] + while to_process: + processed = to_process.pop() + + if type(processed) is list: + to_process.extend(processed) + elif type(processed) is dict and 'spurious_property' not in processed: + to_process.extend(v for k, v in processed.items() + if k != 'payloads') + monkeypatch.setitem(processed, 'spurious_property', 'some_value') + +piggyback_archive_names = [ + 'apt/something.deb', + 'apt/something.orig.tar.gz', + 'apt/something.debian.tar.xz', + 'othersystem/other-something.tar.gz' +] + +@variant_maker +def sample_source_add_piggyback(monkeypatch, sample_source, + extra_build_args={}): + """Add piggybacked foreign system packages.""" + old_build = build.Build + new_build = lambda *a, **kwa: old_build(*a, **kwa, **extra_build_args) + monkeypatch.setattr(build, 'Build', new_build) + + monkeypatch.setitem(index_obj, 'piggyback_on', { + 'system': 'apt', + 'distribution': 'nabia', + 'packages': ['somelib=1.0'], + 'dependencies': False + }) + schema = 'https://hydrilla.koszko.org/schemas/package_source-2.schema.json' + monkeypatch.setitem(index_obj, '$schema', schema) + + new_refs = {} + for name in '.apt-root/.../copyright', '.apt-root/.../script.js': + contents = f'dummy {PurePosixPath(name).name}'.encode() + digest = sha256(contents).digest().hex() + monkeypatch.setitem(dist_files, name, contents) + monkeypatch.setitem(sha256_hashes, name, digest) + new_refs[PurePosixPath(name).name] = {'file': name, 'sha256': digest} + + for obj in expected: + new_list = [*obj['source_copyright'], new_refs['copyright']] + monkeypatch.setitem(obj, 'source_copyright', new_list) + + for obj in expected_resources: + new_list = [{'identifier': 'apt-common-licenses'}, *obj['dependencies']] + monkeypatch.setitem(obj, 'dependencies', new_list) + + for obj in index_obj['definitions'][0], expected_resources[0]: + new_list = [new_refs['script.js'], *obj['scripts']] + monkeypatch.setitem(obj, 'scripts', new_list) + + for name in piggyback_archive_names: + path = PurePosixPath('hello.foreign-packages') / name + monkeypatch.setitem(extra_archive_files, str(path), + f'dummy {path.name}'.encode()) + +def prepare_foreign_packages_dir(path): + """ + Put some dummy archive in the directory so that it can be passed to + piggybacked_system(). + """ + for name in piggyback_archive_names: + archive_path = path / name + archive_path.parent.mkdir(parents=True, exist_ok=True) + archive_path.write_text(f'dummy {archive_path.name}') + +@variant_maker +def sample_source_add_piggyback_pass_archives(monkeypatch, sample_source): + """ + Add piggybacked foreign system packages, use pre-downloaded foreign package + archives (have Build() find them in their default directory). + """ + # Dir next to 'sample_source' will also be gc'd by sample_source() fixture. + foreign_packages_dir = sample_source.parent / 'arbitrary-name' + + prepare_foreign_packages_dir(foreign_packages_dir) + + sample_source_add_piggyback(monkeypatch, sample_source, + {'piggyback_files': foreign_packages_dir}) + +@variant_maker +def sample_source_add_piggyback_find_archives(monkeypatch, sample_source): + """ + Add piggybacked foreign system packages, use pre-downloaded foreign package + archives (specify their directory as argument to Build()). + """ + # Dir next to 'sample_source' will also be gc'd by sample_source() fixture. + foreign_packages_dir = sample_source.parent / 'hello.foreign-packages' + + prepare_foreign_packages_dir(foreign_packages_dir) + + sample_source_add_piggyback(monkeypatch, sample_source) + +@variant_maker +def sample_source_add_piggyback_no_download(monkeypatch, sample_source, + pass_directory_to_build=False): + """ + Add piggybacked foreign system packages, use pre-downloaded foreign package + archives. + """ + # Use a dir next to 'sample_source'; have it gc'd by sample_source fixture. + if pass_directory_to_build: + foreign_packages_dir = sample_source.parent / 'arbitrary-name' + else: + foreign_packages_dir = sample_source.parent / 'hello.foreign-packages' + + prepare_foreign_packages_dir(foreign_packages_dir) + + sample_source_add_piggyback(monkeypatch, sample_source) + +@pytest.fixture(params=[lambda m, s: None, *variant_makers]) +def sample_source_make_variants(request, monkeypatch, sample_source, + mock_piggybacked_apt_system): + """ + Prepare a directory with sample Haketilo source package in multiple slightly + different versions (all correct). Return an index.json path that should be + used when performing test build. + """ + index_path = request.param(monkeypatch, sample_source) or Path('index.json') + + index_text = json.dumps(index_obj) + + (sample_source / index_path).write_text(index_text) + + monkeypatch.setitem(src_files, 'index.json', index_text.encode()) + + return index_path + +@pytest.mark.subprocess_run(build, run_reuse) +@pytest.mark.usefixtures('mock_subprocess_run') +def test_build(sample_source, sample_source_make_variants, tmpdir): + """Build the sample source package and verify the produced files.""" + index_json_path = sample_source_make_variants + + # First, build the package + build.Build(sample_source, index_json_path).write_package_files(tmpdir) + + # Verify directories under destination directory + assert {'file', 'resource', 'mapping', 'source'} == \ + set([path.name for path in tmpdir.iterdir()]) + + # Verify files under 'file/' + file_dir = tmpdir / 'file' / 'sha256' + + for name, contents in dist_files.items(): + dist_file_path = file_dir / sha256_hashes[name] + assert dist_file_path.is_file() + assert dist_file_path.read_bytes() == contents + + assert {p.name for p in file_dir.iterdir()} == \ + {sha256_hashes[name] for name in dist_files.keys()} + + # Verify files under 'resource/' + resource_dir = tmpdir / 'resource' + + assert {rj['identifier'] for rj in expected_resources} == \ + {path.name for path in resource_dir.iterdir()} + + for resource_json in expected_resources: + subdir = resource_dir / resource_json['identifier'] + assert ['2021.11.10'] == [path.name for path in subdir.iterdir()] + + assert json.loads((subdir / '2021.11.10').read_text()) == resource_json + + hydrilla_util.validator_for('api_resource_description-1.0.1.schema.json')\ + .validate(resource_json) + + # Verify files under 'mapping/' + mapping_dir = tmpdir / 'mapping' + assert ['helloapple'] == [path.name for path in mapping_dir.iterdir()] + + subdir = mapping_dir / 'helloapple' + assert ['2021.11.10'] == [path.name for path in subdir.iterdir()] + + assert json.loads((subdir / '2021.11.10').read_text()) == expected_mapping + + hydrilla_util.validator_for('api_mapping_description-1.0.1.schema.json')\ + .validate(expected_mapping) + + # Verify files under 'source/' + source_dir = tmpdir / 'source' + assert {'hello.json', 'hello.zip'} == \ + {path.name for path in source_dir.iterdir()} + + archive_files = {**dict((f'hello/{name}', contents) + for name, contents in src_files.items()), + **extra_archive_files} + + with ZipFile(source_dir / 'hello.zip', 'r') as archive: + print(archive.namelist()) + assert len(archive.namelist()) == len(archive_files) + + for name, contents in archive_files.items(): + assert archive.read(name) == contents + + zip_ref = expected_source_description['source_archives']['zip'] + zip_contents = (source_dir / 'hello.zip').read_bytes() + zip_ref['sha256'] = sha256(zip_contents).digest().hex() + + assert json.loads((source_dir / 'hello.json').read_text()) == \ + expected_source_description + + hydrilla_util.validator_for('api_source_description-1.0.1.schema.json')\ + .validate(expected_source_description) + +error_makers = [] +def error_maker(function): + """Decorate function by placing it in error_makers array.""" + error_makers.append(function) + +@error_maker +def sample_source_error_missing_file(monkeypatch, sample_source): + """ + Modify index.json to expect missing report.spdx file and cause an error. + """ + monkeypatch.delitem(index_obj, 'reuse_generate_spdx_report') + return FileNotFoundError + +@error_maker +def sample_source_error_index_schema(monkeypatch, sample_source): + """Modify index.json to be incompliant with the schema.""" + monkeypatch.delitem(index_obj, 'definitions') + return ValidationError + +@error_maker +def sample_source_error_bad_comment(monkeypatch, sample_source): + """Modify index.json to have an invalid '/' in it.""" + return json.JSONDecodeError, json.dumps(index_obj) + '/something\n' + +@error_maker +def sample_source_error_bad_json(monkeypatch, sample_source): + """Modify index.json to not be valid json even after comment stripping.""" + return json.JSONDecodeError, json.dumps(index_obj) + '???/\n' + +@error_maker +def sample_source_error_missing_reuse(monkeypatch, sample_source): + """Cause mocked reuse process invocation to fail with FileNotFoundError.""" + (sample_source / 'mock_reuse_missing').touch() + return build.ReuseError + +@error_maker +def sample_source_error_missing_license(monkeypatch, sample_source): + """Remove a file to make package REUSE-incompliant.""" + (sample_source / 'README.txt.license').unlink() + return build.ReuseError + +@error_maker +def sample_source_error_file_outside(monkeypatch, sample_source): + """Make index.json illegally reference a file outside srcdir.""" + new_list = [*index_obj['copyright'], {'file': '../abc'}] + monkeypatch.setitem(index_obj, 'copyright', new_list) + return FileReferenceError + +@error_maker +def sample_source_error_reference_itself(monkeypatch, sample_source): + """Make index.json illegally reference index.json.""" + new_list = [*index_obj['copyright'], {'file': 'index.json'}] + monkeypatch.setitem(index_obj, 'copyright', new_list) + return FileReferenceError + +@error_maker +def sample_source_error_report_excluded(monkeypatch, sample_source): + """ + Make index.json require generation of report.spdx but don't include it among + copyright files. + """ + new_list = [file_ref for file_ref in index_obj['copyright'] + if file_ref['file'] != 'report.spdx'] + monkeypatch.setitem(index_obj, 'copyright', new_list) + return FileReferenceError + +@pytest.fixture(params=error_makers) +def sample_source_make_errors(request, monkeypatch, sample_source): + """ + Prepare a directory with sample Haketilo source package in multiple slightly + broken versions. Return an error type that should be raised when running + test build. + """ + index_text = None + error_type = request.param(monkeypatch, sample_source) + if type(error_type) is tuple: + error_type, index_text = error_type + + index_text = index_text or json.dumps(index_obj) + + (sample_source / 'index.json').write_text(index_text) + + monkeypatch.setitem(src_files, 'index.json', index_text.encode()) + + return error_type + +@pytest.mark.subprocess_run(build, run_reuse) +@pytest.mark.usefixtures('mock_subprocess_run') +def test_build_error(tmpdir, sample_source, sample_source_make_errors): + """Try building the sample source package and verify generated errors.""" + error_type = sample_source_make_errors + + dstdir = Path(tmpdir) / 'dstdir' + tmpdir = Path(tmpdir) / 'example' + + dstdir.mkdir(exist_ok=True) + tmpdir.mkdir(exist_ok=True) + + with pytest.raises(error_type): + build.Build(sample_source, Path('index.json'))\ + .write_package_files(dstdir) diff --git a/tests/test_hydrilla_builder.py b/tests/test_hydrilla_builder.py deleted file mode 100644 index 851b5cd..0000000 --- a/tests/test_hydrilla_builder.py +++ /dev/null @@ -1,472 +0,0 @@ -# SPDX-License-Identifier: CC0-1.0 - -# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> -# -# Available under the terms of Creative Commons Zero v1.0 Universal. - -# Enable using with Python 3.7. -from __future__ import annotations - -import pytest -import json -import shutil - -from tempfile import TemporaryDirectory -from pathlib import Path -from hashlib import sha256, sha1 -from zipfile import ZipFile -from typing import Callable, Optional, Iterable - -from jsonschema import ValidationError - -from hydrilla import util as hydrilla_util -from hydrilla.builder import build, _version - -here = Path(__file__).resolve().parent - -expected_generated_by = { - 'name': 'hydrilla.builder', - 'version': _version.version -} - -default_srcdir = here / 'source-package-example' - -default_js_filenames = ['bye.js', 'hello.js', 'message.js'] -default_dist_filenames = [*default_js_filenames, 'LICENSES/CC0-1.0.txt'] -default_src_filenames = [ - *default_dist_filenames, - 'README.txt', 'README.txt.license', '.reuse/dep5', 'index.json' -] - -default_sha1_hashes = {} -default_sha256_hashes = {} -default_contents = {} - -for fn in default_src_filenames: - with open(default_srcdir / fn, 'rb') as file_handle: - default_contents[fn] = file_handle.read() - default_sha256_hashes[fn] = sha256(default_contents[fn]).digest().hex() - default_sha1_hashes[fn] = sha1(default_contents[fn]).digest().hex() - -class CaseSettings: - """Gather parametrized values in a class.""" - def __init__(self): - """Init CaseSettings with default values.""" - self.srcdir = default_srcdir - self.index_json_path = Path('index.json') - self.report_spdx_included = True - - self.js_filenames = default_js_filenames.copy() - self.dist_filenames = default_dist_filenames.copy() - self.src_filenames = default_src_filenames.copy() - - self.sha1_hashes = default_sha1_hashes.copy() - self.sha256_hashes = default_sha256_hashes.copy() - self.contents = default_contents.copy() - - self.expected_resources = [{ - '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json', - 'source_name': 'hello', - 'source_copyright': [{ - 'file': 'report.spdx', - 'sha256': '!!!!value to fill during test!!!!' - }, { - 'file': 'LICENSES/CC0-1.0.txt', - 'sha256': self.sha256_hashes['LICENSES/CC0-1.0.txt'] - }], - 'type': 'resource', - 'identifier': 'helloapple', - 'long_name': 'Hello Apple', - 'uuid': 'a6754dcb-58d8-4b7a-a245-24fd7ad4cd68', - 'version': [2021, 11, 10], - 'revision': 1, - 'description': 'greets an apple', - 'dependencies': [{'identifier': 'hello-message'}], - 'scripts': [{ - 'file': 'hello.js', - 'sha256': self.sha256_hashes['hello.js'] - }, { - 'file': 'bye.js', - 'sha256': self.sha256_hashes['bye.js'] - }], - 'generated_by': expected_generated_by - }, { - '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json', - 'source_name': 'hello', - 'source_copyright': [{ - 'file': 'report.spdx', - 'sha256': '!!!!value to fill during test!!!!' - }, { - 'file': 'LICENSES/CC0-1.0.txt', - 'sha256': self.sha256_hashes['LICENSES/CC0-1.0.txt'] - }], - 'type': 'resource', - 'identifier': 'hello-message', - 'long_name': 'Hello Message', - 'uuid': '1ec36229-298c-4b35-8105-c4f2e1b9811e', - 'version': [2021, 11, 10], - 'revision': 2, - 'description': 'define messages for saying hello and bye', - 'dependencies': [], - 'scripts': [{ - 'file': 'message.js', - 'sha256': self.sha256_hashes['message.js'] - }], - 'generated_by': expected_generated_by - }] - self.expected_mapping = { - '$schema': 'https://hydrilla.koszko.org/schemas/api_mapping_description-1.schema.json', - 'source_name': 'hello', - 'source_copyright': [{ - 'file': 'report.spdx', - 'sha256': '!!!!value to fill during test!!!!' - }, { - 'file': 'LICENSES/CC0-1.0.txt', - 'sha256': self.sha256_hashes['LICENSES/CC0-1.0.txt'] - }], - 'type': 'mapping', - 'identifier': 'helloapple', - 'long_name': 'Hello Apple', - 'uuid': '54d23bba-472e-42f5-9194-eaa24c0e3ee7', - 'version': [2021, 11, 10], - 'description': 'causes apple to get greeted on Hydrillabugs issue tracker', - 'payloads': { - 'https://hydrillabugs.koszko.org/***': { - 'identifier': 'helloapple' - }, - 'https://hachettebugs.koszko.org/***': { - 'identifier': 'helloapple' - } - }, - 'generated_by': expected_generated_by - } - self.expected_source_description = { - '$schema': 'https://hydrilla.koszko.org/schemas/api_source_description-1.schema.json', - 'source_name': 'hello', - 'source_copyright': [{ - 'file': 'report.spdx', - 'sha256': '!!!!value to fill during test!!!!' - }, { - 'file': 'LICENSES/CC0-1.0.txt', - 'sha256': self.sha256_hashes['LICENSES/CC0-1.0.txt'] - }], - 'source_archives': { - 'zip': { - 'sha256': '!!!!value to fill during test!!!!', - } - }, - 'upstream_url': 'https://git.koszko.org/hydrilla-source-package-example', - 'definitions': [{ - 'type': 'resource', - 'identifier': 'helloapple', - 'long_name': 'Hello Apple', - 'version': [2021, 11, 10], - }, { - 'type': 'resource', - 'identifier': 'hello-message', - 'long_name': 'Hello Message', - 'version': [2021, 11, 10], - }, { - 'type': 'mapping', - 'identifier': 'helloapple', - 'long_name': 'Hello Apple', - 'version': [2021, 11, 10], - }], - 'generated_by': expected_generated_by - } - - def expected(self) -> list[dict]: - """ - Convenience method to get a list of expected jsons of 2 resources, - 1 mapping and 1 source description we have. - """ - return [ - *self.expected_resources, - self.expected_mapping, - self.expected_source_description - ] - -ModifyCb = Callable[[CaseSettings, dict], Optional[str]] - -def prepare_modified(tmpdir: Path, modify_cb: ModifyCb) -> CaseSettings: - """ - Use sample source package directory with an alternative, modified - index.json. - """ - settings = CaseSettings() - - for fn in settings.src_filenames: - copy_path = tmpdir / 'srcdir_copy' / fn - copy_path.parent.mkdir(parents=True, exist_ok=True) - shutil.copy(settings.srcdir / fn, copy_path) - - settings.srcdir = tmpdir / 'srcdir_copy' - - with open(settings.srcdir / 'index.json', 'rt') as file_handle: - obj = json.loads(hydrilla_util.strip_json_comments(file_handle.read())) - - contents = modify_cb(settings, obj) - - # Replace the other index.json with new one - settings.index_json_path = tmpdir / 'replacement.json' - - if contents is None: - contents = json.dumps(obj) - - contents = contents.encode() - - settings.contents['index.json'] = contents - - settings.sha256_hashes['index.json'] = sha256(contents).digest().hex() - settings.sha1_hashes['index.json'] = sha1(contents).digest().hex() - - with open(settings.index_json_path, 'wb') as file_handle: - file_handle.write(contents) - - return settings - -@pytest.fixture() -def tmpdir() -> Iterable[str]: - with TemporaryDirectory() as tmpdir: - yield tmpdir - -def prepare_default(tmpdir: Path) -> CaseSettings: - """Use sample source package directory as exists in VCS.""" - return CaseSettings() - -def modify_index_good(settings: CaseSettings, obj: dict) -> None: - """ - Modify index.json object to make a slightly different but *also correct* one - that can be used to test some different cases. - """ - # Add comments that should be preserved. - for dictionary in (obj, settings.expected_source_description): - dictionary['comment'] = 'index_json comment' - - for i, dicts in enumerate(zip(obj['definitions'], settings.expected())): - for dictionary in dicts: - dictionary['comment'] = f'item {i}' - - # Remove spdx report generation - del obj['reuse_generate_spdx_report'] - obj['copyright'].remove({'file': 'report.spdx'}) - - settings.report_spdx_included = False - - for json_description in settings.expected(): - json_description['source_copyright'] = \ - [fr for fr in json_description['source_copyright'] - if fr['file'] != 'report.spdx'] - - # Use default value ([]) for 'additionall_files' property - del obj['additional_files'] - - settings.src_filenames = [*settings.dist_filenames, 'index.json'] - - # Use default value ([]) for 'scripts' property in one of the resources - del obj['definitions'][1]['scripts'] - - settings.expected_resources[1]['scripts'] = [] - - for prefix in ('js', 'dist', 'src'): - getattr(settings, f'{prefix}_filenames').remove('message.js') - - # Use default value ({}) for 'pyloads' property in mapping - del obj['definitions'][2]['payloads'] - - settings.expected_mapping['payloads'] = {} - - # Don't use UUIDs (they are optional) - for definition in obj['definitions']: - del definition['uuid'] - - for description in settings.expected(): - if 'uuid' in description: - del description['uuid'] - - # Add some unrecognized properties that should be stripped - to_process = [obj] - while to_process: - processed = to_process.pop() - - if type(processed) is list: - to_process.extend(processed) - elif type(processed) is dict and 'spurious_property' not in processed: - to_process.extend(processed.values()) - processed['spurious_property'] = 'some value' - -@pytest.mark.parametrize('prepare_source_example', [ - prepare_default, - lambda tmpdir: prepare_modified(tmpdir, modify_index_good) -]) -def test_build(tmpdir, prepare_source_example): - """Build the sample source package and verify the produced files.""" - # First, build the package - dstdir = Path(tmpdir) / 'dstdir' - tmpdir = Path(tmpdir) / 'example' - - dstdir.mkdir(exist_ok=True) - tmpdir.mkdir(exist_ok=True) - - settings = prepare_source_example(tmpdir) - - build.Build(settings.srcdir, settings.index_json_path)\ - .write_package_files(dstdir) - - # Verify directories under destination directory - assert {'file', 'resource', 'mapping', 'source'} == \ - set([path.name for path in dstdir.iterdir()]) - - # Verify files under 'file/' - file_dir = dstdir / 'file' / 'sha256' - - for fn in settings.dist_filenames: - dist_file_path = file_dir / settings.sha256_hashes[fn] - assert dist_file_path.is_file() - - assert dist_file_path.read_bytes() == settings.contents[fn] - - sha256_hashes_set = set([settings.sha256_hashes[fn] - for fn in settings.dist_filenames]) - - spdx_report_sha256 = None - - for path in file_dir.iterdir(): - if path.name in sha256_hashes_set: - continue - - assert spdx_report_sha256 is None and settings.report_spdx_included - - with open(path, 'rt') as file_handle: - spdx_contents = file_handle.read() - - spdx_report_sha256 = sha256(spdx_contents.encode()).digest().hex() - assert spdx_report_sha256 == path.name - - for fn in settings.src_filenames: - if not any([n in fn.lower() for n in ('license', 'reuse')]): - assert settings.sha1_hashes[fn] - - if settings.report_spdx_included: - assert spdx_report_sha256 - for obj in settings.expected(): - for file_ref in obj['source_copyright']: - if file_ref['file'] == 'report.spdx': - file_ref['sha256'] = spdx_report_sha256 - - # Verify files under 'resource/' - resource_dir = dstdir / 'resource' - - assert set([rj['identifier'] for rj in settings.expected_resources]) == \ - set([path.name for path in resource_dir.iterdir()]) - - for resource_json in settings.expected_resources: - subdir = resource_dir / resource_json['identifier'] - assert ['2021.11.10'] == [path.name for path in subdir.iterdir()] - - with open(subdir / '2021.11.10', 'rt') as file_handle: - assert json.load(file_handle) == resource_json - - hydrilla_util.validator_for('api_resource_description-1.0.1.schema.json')\ - .validate(resource_json) - - # Verify files under 'mapping/' - mapping_dir = dstdir / 'mapping' - assert ['helloapple'] == [path.name for path in mapping_dir.iterdir()] - - subdir = mapping_dir / 'helloapple' - assert ['2021.11.10'] == [path.name for path in subdir.iterdir()] - - with open(subdir / '2021.11.10', 'rt') as file_handle: - assert json.load(file_handle) == settings.expected_mapping - - hydrilla_util.validator_for('api_mapping_description-1.0.1.schema.json')\ - .validate(settings.expected_mapping) - - # Verify files under 'source/' - source_dir = dstdir / 'source' - assert {'hello.json', 'hello.zip'} == \ - set([path.name for path in source_dir.iterdir()]) - - zip_filenames = [f'hello/{fn}' for fn in settings.src_filenames] - - with ZipFile(source_dir / 'hello.zip', 'r') as archive: - assert set([f.filename for f in archive.filelist]) == set(zip_filenames) - - for zip_fn, src_fn in zip(zip_filenames, settings.src_filenames): - with archive.open(zip_fn, 'r') as zip_file_handle: - assert zip_file_handle.read() == settings.contents[src_fn] - - zip_ref = settings.expected_source_description['source_archives']['zip'] - with open(source_dir / 'hello.zip', 'rb') as file_handle: - zip_ref['sha256'] = sha256(file_handle.read()).digest().hex() - - with open(source_dir / 'hello.json', 'rt') as file_handle: - assert json.load(file_handle) == settings.expected_source_description - - hydrilla_util.validator_for('api_source_description-1.0.1.schema.json')\ - .validate(settings.expected_source_description) - -def modify_index_missing_file(dummy: CaseSettings, obj: dict) -> None: - """ - Modify index.json to expect missing report.spdx file and cause an error. - """ - del obj['reuse_generate_spdx_report'] - -def modify_index_schema_error(dummy: CaseSettings, obj: dict) -> None: - """Modify index.json to be incompliant with the schema.""" - del obj['definitions'] - -def modify_index_bad_comment(dummy: CaseSettings, obj: dict) -> str: - """Modify index.json to have an invalid '/' in it.""" - return json.dumps(obj) + '/something\n' - -def modify_index_bad_json(dummy: CaseSettings, obj: dict) -> str: - """Modify index.json to not be valid json even after comment stripping.""" - return json.dumps(obj) + '???/\n' - -def modify_index_missing_license(settings: CaseSettings, obj: dict) -> None: - """Remove a file to make package REUSE-incompliant.""" - (settings.srcdir / 'README.txt.license').unlink() - -def modify_index_file_outside(dummy: CaseSettings, obj: dict) -> None: - """Make index.json illegally reference a file outside srcdir.""" - obj['copyright'].append({'file': '../abc'}) - -def modify_index_reference_itself(dummy: CaseSettings, obj: dict) -> None: - """Make index.json illegally reference index.json.""" - obj['copyright'].append({'file': 'index.json'}) - -def modify_index_report_excluded(dummy: CaseSettings, obj: dict) -> None: - """ - Make index.json require generation of index.json but not include it among - copyright files. - """ - obj['copyright'] = [fr for fr in obj['copyright'] - if fr['file'] != 'report.spdx'] - -@pytest.mark.parametrize('break_index_json', [ - (modify_index_missing_file, FileNotFoundError), - (modify_index_schema_error, ValidationError), - (modify_index_bad_comment, json.JSONDecodeError), - (modify_index_bad_json, json.JSONDecodeError), - (modify_index_missing_license, build.ReuseError), - (modify_index_file_outside, build.FileReferenceError), - (modify_index_reference_itself, build.FileReferenceError), - (modify_index_report_excluded, build.FileReferenceError) -]) -def test_build_error(tmpdir: str, break_index_json: tuple[ModifyCb, type]): - """Build the sample source package and verify the produced files.""" - dstdir = Path(tmpdir) / 'dstdir' - tmpdir = Path(tmpdir) / 'example' - - dstdir.mkdir(exist_ok=True) - tmpdir.mkdir(exist_ok=True) - - modify_cb, error_type = break_index_json - - settings = prepare_modified(tmpdir, modify_cb) - - with pytest.raises(error_type): - build.Build(settings.srcdir, settings.index_json_path)\ - .write_package_files(dstdir) diff --git a/tests/test_local_apt.py b/tests/test_local_apt.py new file mode 100644 index 0000000..4f3a831 --- /dev/null +++ b/tests/test_local_apt.py @@ -0,0 +1,651 @@ +# SPDX-License-Identifier: CC0-1.0 + +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# Available under the terms of Creative Commons Zero v1.0 Universal. + +import pytest +import tempfile +import re +import json +from pathlib import Path, PurePosixPath +from zipfile import ZipFile +from tempfile import TemporaryDirectory + +from hydrilla.builder import local_apt +from hydrilla.builder.common_errors import * + +here = Path(__file__).resolve().parent + +from .helpers import * + +@pytest.fixture +def mock_cache_dir(monkeypatch): + """Make local_apt.py cache files to a temporary directory.""" + with tempfile.TemporaryDirectory() as td: + td_path = Path(td) + monkeypatch.setattr(local_apt, 'default_apt_cache_dir', td_path) + yield td_path + +@pytest.fixture +def mock_gnupg_import(monkeypatch, mock_cache_dir): + """Mock gnupg library when imported dynamically.""" + + gnupg_mock_dir = mock_cache_dir / 'gnupg_mock' + gnupg_mock_dir.mkdir() + (gnupg_mock_dir / 'gnupg.py').write_text('GPG = None\n') + + monkeypatch.syspath_prepend(str(gnupg_mock_dir)) + + import gnupg + + keyring_path = mock_cache_dir / 'master_keyring.gpg' + + class MockedImportResult: + """gnupg.ImportResult replacement""" + def __init__(self): + """Initialize MockedImportResult object.""" + self.imported = 1 + + class MockedGPG: + """GPG replacement that does not really invoke GPG.""" + def __init__(self, keyring): + """Verify the keyring path and initialize MockedGPG.""" + assert keyring == str(keyring_path) + + self.known_keys = {*keyring_path.read_text().split('\n')} \ + if keyring_path.exists() else set() + + def recv_keys(self, keyserver, key): + """Mock key receiving - record requested key as received.""" + assert keyserver == local_apt.default_keyserver + assert key not in self.known_keys + + self.known_keys.add(key) + keyring_path.write_text('\n'.join(self.known_keys)) + + return MockedImportResult() + + def list_keys(self, keys=None): + """Mock key listing - return a list with dummy items.""" + if keys is None: + return ['dummy'] * len(self.known_keys) + else: + return ['dummy' for k in keys if k in self.known_keys] + + def export_keys(self, keys, **kwargs): + """ + Mock key export - check that the call has the expected arguments and + return a dummy bytes array. + """ + assert kwargs['armor'] == False + assert kwargs['minimal'] == True + assert {*keys} == self.known_keys + + return b'<dummy keys export>' + + monkeypatch.setattr(gnupg, 'GPG', MockedGPG) + +def process_run_args(command, kwargs, expected_command): + """ + Perform assertions common to all mocked subprocess.run() invocations and + extract variable parts of the command line (if any). + """ + assert kwargs['env'] == {'LANG': 'en_US'} + assert kwargs['capture_output'] == True + + return process_command(command, expected_command) + +def run_apt_get_update(command, returncode=0, **kwargs): + """ + Instead of running an 'apt-get update' command just touch some file in apt + root to indicate that the call was made. + """ + expected = ['apt-get', '-c', '<conf_path>', 'update'] + conf_path = Path(process_run_args(command, kwargs, expected)['conf_path']) + + (conf_path.parent / 'update_called').touch() + + return MockedCompletedProcess(command, returncode, + text_output=kwargs.get('text')) + +""" +Output of 'apt-get install --yes --just-print libjs-mathjax' on some APT-based +system. +""" +sample_install_stdout = '''\ +NOTE: This is only a simulation! + apt-get needs root privileges for real execution. + Keep also in mind that locking is deactivated, + so don't depend on the relevance to the real current situation! +Reading package lists... +Building dependency tree... +Reading state information... +The following additional packages will be installed: + fonts-mathjax +Suggested packages: + fonts-mathjax-extras fonts-stix libjs-mathjax-doc +The following NEW packages will be installed: + fonts-mathjax libjs-mathjax +0 upgraded, 2 newly installed, 0 to remove and 0 not upgraded. +Inst fonts-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all]) +Inst libjs-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all]) +Conf fonts-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all]) +Conf libjs-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all]) +''' + +def run_apt_get_install(command, returncode=0, **kwargs): + """ + Instead of running an 'apt-get install' command just print a possible + output of one. + """ + expected = ['apt-get', '-c', '<conf_path>', 'install', + '--yes', '--just-print', 'libjs-mathjax'] + + conf_path = Path(process_run_args(command, kwargs, expected)['conf_path']) + + return MockedCompletedProcess(command, returncode, + stdout=sample_install_stdout, + text_output=kwargs.get('text')) + +def run_apt_get_download(command, returncode=0, **kwargs): + """ + Instead of running an 'apt-get download' command just write some dummy + .deb to the appropriate directory. + """ + expected = ['apt-get', '-c', '<conf_path>', 'download', 'libjs-mathjax'] + if 'fonts-mathjax' in command: + expected.insert(-1, 'fonts-mathjax') + + conf_path = Path(process_run_args(command, kwargs, expected)['conf_path']) + + destination = Path(kwargs.get('cwd') or Path.cwd()) + + for word in expected: + if word.endswith('-mathjax'): + deb_path = destination / f'{word}_2.7.9+dfsg-1_all.deb' + deb_path.write_text(f'dummy {deb_path.name}') + + return MockedCompletedProcess(command, returncode, + text_output=kwargs.get('text')) + +def run_apt_get_source(command, returncode=0, **kwargs): + """ + Instead of running an 'apt-get source' command just write some dummy + "tarballs" to the appropriate directory. + """ + expected = ['apt-get', '-c', '<conf_path>', 'source', + '--download-only', 'libjs-mathjax=2.7.9+dfsg-1'] + if 'fonts-mathjax=2.7.9+dfsg-1' in command: + if command[-1] == 'fonts-mathjax=2.7.9+dfsg-1': + expected.append('fonts-mathjax=2.7.9+dfsg-1') + else: + expected.insert(-1, 'fonts-mathjax=2.7.9+dfsg-1') + + destination = Path(kwargs.get('cwd') or Path.cwd()) + for filename in [ + 'mathjax_2.7.9+dfsg-1.debian.tar.xz', + 'mathjax_2.7.9+dfsg-1.dsc', + 'mathjax_2.7.9+dfsg.orig.tar.xz' + ]: + (destination / filename).write_text(f'dummy {filename}') + + return MockedCompletedProcess(command, returncode, + text_output=kwargs.get('text')) + +def make_run_apt_get(**returncodes): + """ + Produce a function that chooses and runs the appropriate one of + subprocess_run_apt_get_*() mock functions. + """ + def mock_run(command, **kwargs): + """ + Chooses and runs the appropriate one of subprocess_run_apt_get_*() mock + functions. + """ + for subcommand, run in [ + ('update', run_apt_get_update), + ('install', run_apt_get_install), + ('download', run_apt_get_download), + ('source', run_apt_get_source) + ]: + if subcommand in command: + returncode = returncodes.get(f'{subcommand}_code', 0) + return run(command, returncode, **kwargs) + + raise Exception('Unknown command: {}'.format(' '.join(command))) + + return mock_run + +@pytest.mark.subprocess_run(local_apt, make_run_apt_get()) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_contextmanager(mock_cache_dir): + """ + Verify that the local_apt() function creates a proper apt environment and + that it also properly restores it from cache. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + + with local_apt.local_apt(sources_list, local_apt.default_keys) as apt: + apt_root = Path(apt.apt_conf).parent.parent + + assert (apt_root / 'etc' / 'trusted.gpg').read_bytes() == \ + b'<dummy keys export>' + + assert (apt_root / 'etc' / 'update_called').exists() + + assert (apt_root / 'etc' / 'apt.sources.list').read_text() == \ + 'deb-src sth\ndeb sth' + + conf_lines = (apt_root / 'etc' / 'apt.conf').read_text().split('\n') + + # check mocked keyring + assert {*local_apt.default_keys} == \ + {*(mock_cache_dir / 'master_keyring.gpg').read_text().split('\n')} + + assert not apt_root.exists() + + expected_conf = { + 'Dir': str(apt_root), + 'Dir::State': f'{apt_root}/var/lib/apt', + 'Dir::State::status': f'{apt_root}/var/lib/dpkg/status', + 'Dir::Etc::SourceList': f'{apt_root}/etc/apt.sources.list', + 'Dir::Etc::SourceParts': '', + 'Dir::Cache': f'{apt_root}/var/cache/apt', + 'pkgCacheGen::Essential': 'none', + 'Dir::Etc::Trusted': f'{apt_root}/etc/trusted.gpg', + } + + conf_regex = re.compile(r'^(?P<key>\S+)\s"(?P<val>\S*)";$') + assert dict([(m.group('key'), m.group('val')) + for l in conf_lines if l for m in [conf_regex.match(l)]]) == \ + expected_conf + + with ZipFile(mock_cache_dir / f'apt_{sources_list.identity()}.zip') as zf: + # reuse the same APT, its cached zip file should exist now + with local_apt.local_apt(sources_list, local_apt.default_keys) as apt: + apt_root = Path(apt.apt_conf).parent.parent + + expected_members = {*apt_root.rglob('*')} + expected_members.remove(apt_root / 'etc' / 'apt.conf') + expected_members.remove(apt_root / 'etc' / 'trusted.gpg') + + names = zf.namelist() + assert len(names) == len(expected_members) + + for name in names: + path = apt_root / name + assert path in expected_members + assert zf.read(name) == \ + (b'' if path.is_dir() else path.read_bytes()) + + assert not apt_root.exists() + +@pytest.mark.subprocess_run(local_apt, run_missing_executable) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_missing(mock_cache_dir): + """ + Verify that the local_apt() function raises a proper error when 'apt-get' + command is missing. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + + with pytest.raises(local_apt.AptError) as excinfo: + with local_apt.local_apt(sources_list, local_apt.default_keys) as apt: + pass + + assert len(excinfo.value.args) == 1 + assert isinstance(excinfo.value.args[0], str) + assert '\n' not in excinfo.value.args[0] + +@pytest.mark.subprocess_run(local_apt, make_run_apt_get(update_code=1)) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_update_fail(mock_cache_dir): + """ + Verify that the local_apt() function raises a proper error when + 'apt-get update' command returns non-0. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + + with pytest.raises(local_apt.AptError) as excinfo: + with local_apt.local_apt(sources_list, local_apt.default_keys) as apt: + pass + + assert len(excinfo.value.args) == 1 + + assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output', + excinfo.value.args[0]) + +@pytest.mark.subprocess_run(local_apt, make_run_apt_get()) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_download(mock_cache_dir): + """ + Verify that download_apt_packages() function properly performs the download + of .debs and sources. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + destination = mock_cache_dir / 'destination' + destination.mkdir() + + local_apt.download_apt_packages(sources_list, local_apt.default_keys, + ['libjs-mathjax'], destination) + + libjs_mathjax_path = destination / 'libjs-mathjax_2.7.9+dfsg-1_all.deb' + fonts_mathjax_path = destination / 'fonts-mathjax_2.7.9+dfsg-1_all.deb' + + source_paths = [ + destination / 'mathjax_2.7.9+dfsg-1.debian.tar.xz', + destination / 'mathjax_2.7.9+dfsg-1.dsc', + destination / 'mathjax_2.7.9+dfsg.orig.tar.xz' + ] + + assert {*destination.iterdir()} == {libjs_mathjax_path, *source_paths} + + local_apt.download_apt_packages(sources_list, local_apt.default_keys, + ['libjs-mathjax'], destination, + with_deps=True) + + assert {*destination.iterdir()} == \ + {libjs_mathjax_path, fonts_mathjax_path, *source_paths} + +@pytest.mark.subprocess_run(local_apt, make_run_apt_get(install_code=1)) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_install_fail(mock_cache_dir): + """ + Verify that the download_apt_packages() function raises a proper error when + 'apt-get install' command returns non-0. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + destination = mock_cache_dir / 'destination' + destination.mkdir() + + with pytest.raises(local_apt.AptError) as excinfo: + local_apt.download_apt_packages(sources_list, local_apt.default_keys, + ['libjs-mathjax'], destination, + with_deps=True) + + assert len(excinfo.value.args) == 1 + + assert re.match(r'^.*\n\n.*\n\n', excinfo.value.args[0]) + assert re.search(r'\n\nsome error output$', excinfo.value.args[0]) + assert sample_install_stdout in excinfo.value.args[0] + + assert [*destination.iterdir()] == [] + +@pytest.mark.subprocess_run(local_apt, make_run_apt_get(download_code=1)) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_download_fail(mock_cache_dir): + """ + Verify that the download_apt_packages() function raises a proper error when + 'apt-get download' command returns non-0. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + destination = mock_cache_dir / 'destination' + destination.mkdir() + + with pytest.raises(local_apt.AptError) as excinfo: + local_apt.download_apt_packages(sources_list, local_apt.default_keys, + ['libjs-mathjax'], destination) + + assert len(excinfo.value.args) == 1 + + assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output', + excinfo.value.args[0]) + + assert [*destination.iterdir()] == [] + +@pytest.mark.subprocess_run(local_apt, make_run_apt_get(source_code=1)) +@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import') +def test_local_apt_source_fail(mock_cache_dir): + """ + Verify that the download_apt_packages() function raises a proper error when + 'apt-get source' command returns non-0. + """ + sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth']) + destination = mock_cache_dir / 'destination' + destination.mkdir() + + with pytest.raises(local_apt.AptError) as excinfo: + local_apt.download_apt_packages(sources_list, local_apt.default_keys, + ['libjs-mathjax'], destination) + + assert len(excinfo.value.args) == 1 + + assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output', + excinfo.value.args[0]) + + assert [*destination.iterdir()] == [] + +def test_sources_list(): + """Verify that the SourcesList class works properly.""" + list = local_apt.SourcesList([], 'nabia') + assert list.identity() == 'nabia' + + with pytest.raises(local_apt.DistroError): + local_apt.SourcesList([], 'nabiaĆ') + + list = local_apt.SourcesList(['deb sth', 'deb-src sth'], 'nabia') + assert list.identity() == \ + 'ef28d408b96046eae45c8ab3094ce69b2ac0c02a887e796b1d3d1a4f06fb49f1' + +def run_dpkg_deb(command, returncode=0, **kwargs): + """ + Insted of running an 'dpkg-deb -x' command just create some dummy file + in the destination directory. + """ + expected = ['dpkg-deb', '-x', '<deb_path>', '<dst_path>'] + + variables = process_run_args(command, kwargs, expected) + deb_path = Path(variables['deb_path']) + dst_path = Path(variables['dst_path']) + + package_name = re.match('^([^_]+)_.*', deb_path.name).group(1) + for path in [ + dst_path / 'etc' / f'dummy_{package_name}_config', + dst_path / 'usr/share/doc' / package_name / 'copyright' + ]: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(f'dummy {path.name}') + + return MockedCompletedProcess(command, returncode, + text_output=kwargs.get('text')) + +def download_apt_packages(list, keys, packages, destination_dir, + with_deps=False): + """ + Replacement for download_apt_packages() function in local_apt.py, for + unit-testing the piggybacked_system() function. + """ + for path in [ + destination_dir / 'some-bin-package_1.1-2_all.deb', + destination_dir / 'another-package_1.1-2_all.deb', + destination_dir / 'some-source-package_1.1.orig.tar.gz', + destination_dir / 'some-source-package_1.1-1.dsc' + ]: + path.write_text(f'dummy {path.name}') + + with open(destination_dir / 'test_data.json', 'w') as out: + json.dump({ + 'list_identity': list.identity(), + 'keys': keys, + 'packages': packages, + 'with_deps': with_deps + }, out) + +@pytest.fixture +def mock_download_packages(monkeypatch): + """Mock the download_apt_packages() function in local_apt.py.""" + monkeypatch.setattr(local_apt, 'download_apt_packages', + download_apt_packages) + +@pytest.mark.subprocess_run(local_apt, run_dpkg_deb) +@pytest.mark.parametrize('params', [ + { + 'with_deps': False, + 'base_depends': True, + 'identity': 'nabia', + 'props': {'distribution': 'nabia', 'dependencies': False}, + 'all_keys': local_apt.default_keys + }, + { + 'with_deps': True, + 'base_depends': False, + 'identity': '38db0b4fa2f6610cd1398b66a2c05d9abb1285f9a055a96eb96dee0f6b72aca8', + 'props': { + 'sources_list': [f'deb{suf} http://example.com/ stable main' + for suf in ('', '-src')], + 'trusted_keys': ['AB' * 20], + 'dependencies': True, + 'depend_on_base_packages': False + }, + 'all_keys': [*local_apt.default_keys, 'AB' * 20], + } +]) +@pytest.mark.usefixtures('mock_download_packages', 'mock_subprocess_run') +def test_piggybacked_system_download(params): + """ + Verify that the piggybacked_system() function properly downloads and unpacks + APT packages. + """ + with local_apt.piggybacked_system({ + 'system': 'apt', + **params['props'], + 'packages': ['some-bin-package', 'another-package=1.1-2'] + }, None) as piggybacked: + expected_depends = [{'identifier': 'apt-common-licenses'}] \ + if params['base_depends'] else [] + assert piggybacked.package_must_depend == expected_depends + + archive_files = dict(piggybacked.archive_files()) + + archive_names = [ + 'some-bin-package_1.1-2_all.deb', + 'another-package_1.1-2_all.deb', + 'some-source-package_1.1.orig.tar.gz', + 'some-source-package_1.1-1.dsc', + 'test_data.json' + ] + assert {*archive_files.keys()} == \ + {PurePosixPath('apt') / n for n in archive_names} + + for path in archive_files.values(): + if path.name == 'test_data.json': + assert json.loads(path.read_text()) == { + 'list_identity': params['identity'], + 'keys': params['all_keys'], + 'packages': ['some-bin-package', 'another-package=1.1-2'], + 'with_deps': params['with_deps'] + } + else: + assert path.read_text() == f'dummy {path.name}' + + license_files = {*piggybacked.package_license_files} + + assert license_files == { + PurePosixPath('.apt-root/usr/share/doc/another-package/copyright'), + PurePosixPath('.apt-root/usr/share/doc/some-bin-package/copyright') + } + + assert ['dummy copyright'] * 2 == \ + [piggybacked.resolve_file(p).read_text() for p in license_files] + + for name in ['some-bin-package', 'another-package']: + path = PurePosixPath(f'.apt-root/etc/dummy_{name}_config') + assert piggybacked.resolve_file(path).read_text() == \ + f'dummy {path.name}' + + assert piggybacked.resolve_file(PurePosixPath('a/b/c')) == None + assert piggybacked.resolve_file(PurePosixPath('')) == None + + with pytest.raises(FileReferenceError): + piggybacked.resolve_file(PurePosixPath('.apt-root/a/../../../b')) + + root = piggybacked.resolve_file(PurePosixPath('.apt-root/dummy')).parent + assert root.is_dir() + + assert not root.exists() + +@pytest.mark.subprocess_run(local_apt, run_dpkg_deb) +@pytest.mark.usefixtures('mock_subprocess_run') +def test_piggybacked_system_no_download(): + """ + Verify that the piggybacked_system() function is able to use pre-downloaded + APT packages. + """ + archive_names = { + f'{package}{rest}' + for package in ('some-lib_1:2.3', 'other-lib_4.45.2') + for rest in ('-1_all.deb', '.orig.tar.gz', '-1.debian.tar.xz', '-1.dsc') + } + + with TemporaryDirectory() as td: + td = Path(td) + (td / 'apt').mkdir() + for name in archive_names: + (td / 'apt' / name).write_text(f'dummy {name}') + + with local_apt.piggybacked_system({ + 'system': 'apt', + 'distribution': 'nabia', + 'dependencies': True, + 'packages': ['whatever', 'whatever2'] + }, td) as piggybacked: + archive_files = dict(piggybacked.archive_files()) + + assert {*archive_files.keys()} == \ + {PurePosixPath('apt') / name for name in archive_names} + + for path in archive_files.values(): + assert path.read_text() == f'dummy {path.name}' + + assert {*piggybacked.package_license_files} == { + PurePosixPath('.apt-root/usr/share/doc/some-lib/copyright'), + PurePosixPath('.apt-root/usr/share/doc/other-lib/copyright') + } + + for name in ['some-lib', 'other-lib']: + path = PurePosixPath(f'.apt-root/etc/dummy_{name}_config') + assert piggybacked.resolve_file(path).read_text() == \ + f'dummy {path.name}' + +@pytest.mark.subprocess_run(local_apt, run_missing_executable) +@pytest.mark.usefixtures('mock_download_packages', 'mock_subprocess_run') +def test_piggybacked_system_missing(): + """ + Verify that the piggybacked_system() function raises a proper error when + 'dpkg-deb' is missing. + """ + with pytest.raises(local_apt.AptError) as excinfo: + with local_apt.piggybacked_system({ + 'system': 'apt', + 'distribution': 'nabia', + 'packages': ['some-package'], + 'dependencies': False + }, None) as piggybacked: + pass + + assert len(excinfo.value.args) == 1 + + assert '\n' not in excinfo.value.args[0] + + +@pytest.mark.subprocess_run(local_apt, lambda c, **kw: run_dpkg_deb(c, 1, **kw)) +@pytest.mark.usefixtures('mock_download_packages', 'mock_subprocess_run') +def test_piggybacked_system_fail(): + """ + Verify that the piggybacked_system() function raises a proper error when + 'dpkg-deb -x' command returns non-0. + """ + with pytest.raises(local_apt.AptError) as excinfo: + with local_apt.piggybacked_system({ + 'system': 'apt', + 'distribution': 'nabia', + 'packages': ['some-package'], + 'dependencies': False + }, None) as piggybacked: + pass + + assert len(excinfo.value.args) == 1 + + assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output', + excinfo.value.args[0]) |