-rw-r--r--  .gitmodules                             |   4
-rw-r--r--  conftest.py                             |  25
-rw-r--r--  pyproject.toml                          |   5
-rw-r--r--  src/hydrilla/builder/build.py           | 298
-rw-r--r--  src/hydrilla/builder/common_errors.py   |  67
-rw-r--r--  src/hydrilla/builder/local_apt.py       | 428
-rw-r--r--  src/hydrilla/builder/piggybacking.py    | 115
m---------  src/hydrilla/schemas                    |   0
-rw-r--r--  tests/__init__.py                       |   5
-rw-r--r--  tests/helpers.py                        |  51
-rw-r--r--  tests/test_build.py                     | 674
-rw-r--r--  tests/test_hydrilla_builder.py          | 472
-rw-r--r--  tests/test_local_apt.py                 | 651
13 files changed, 2183 insertions(+), 612 deletions(-)
diff --git a/.gitmodules b/.gitmodules
index 6e47d90..ccb70a3 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -4,9 +4,9 @@
#
# Available under the terms of Creative Commons Zero v1.0 Universal.
-[submodule "src/hydrilla/schemas"]
+[submodule "hydrilla-json-schemas"]
path = src/hydrilla/schemas
url = ../hydrilla-json-schemas
-[submodule "src/test/source-package-example"]
+[submodule "hydrilla-source-package-example"]
path = tests/source-package-example
url = ../hydrilla-source-package-example
diff --git a/conftest.py b/conftest.py
index 1aef80a..141cba5 100644
--- a/conftest.py
+++ b/conftest.py
@@ -7,5 +7,30 @@
import sys
from pathlib import Path
+import pytest
+
here = Path(__file__).resolve().parent
sys.path.insert(0, str(here / 'src'))
+
+@pytest.fixture(autouse=True)
+def no_requests(monkeypatch):
+ """Remove requests.sessions.Session.request for all tests."""
+ monkeypatch.delattr('requests.sessions.Session.request')
+
+@pytest.fixture
+def mock_subprocess_run(monkeypatch, request):
+ """
+ Temporarily replace subprocess.run() with a function supplied through pytest
+ marker 'subprocess_run'.
+
+    The marker expects 2 arguments:
+ * the module inside which the subprocess attribute should be mocked and
+ * a run() function to use.
+ """
+ where, mocked_run = request.node.get_closest_marker('subprocess_run').args
+
+ class MockedSubprocess:
+ """Minimal mocked version of the subprocess module."""
+ run = mocked_run
+
+ monkeypatch.setattr(where, 'subprocess', MockedSubprocess)
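A test opts into this fixture by combining the 'subprocess_run' marker with
'usefixtures'. A minimal sketch of such a test (the test and its mocked run()
below are illustrative, not part of this patch):

    import subprocess
    import pytest

    from hydrilla.builder import build

    def mocked_run(command, **kwargs):
        # Pretend the command executed successfully with no output.
        return subprocess.CompletedProcess(command, returncode=0,
                                           stdout='', stderr='')

    @pytest.mark.subprocess_run(build, mocked_run)
    @pytest.mark.usefixtures('mock_subprocess_run')
    def test_example():
        # Within this test, any subprocess.run() call made by code in
        # hydrilla.builder.build gets dispatched to mocked_run() instead.
        assert build.subprocess.run(['true']).returncode == 0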
diff --git a/pyproject.toml b/pyproject.toml
index 968455f..41eaf49 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,7 +13,10 @@ write_to = "src/hydrilla/builder/_version.py"
[tool.pytest.ini_options]
minversion = "6.0"
-addopts = "-ra -q"
+addopts = "-ra"
testpaths = [
"tests"
]
+markers = [
+ "subprocess_run: define how mocked subprocess.run should behave"
+]
diff --git a/src/hydrilla/builder/build.py b/src/hydrilla/builder/build.py
index 8eec4a4..ce4935c 100644
--- a/src/hydrilla/builder/build.py
+++ b/src/hydrilla/builder/build.py
@@ -30,21 +30,28 @@ from __future__ import annotations
import json
import re
import zipfile
-from pathlib import Path
+import subprocess
+from pathlib import Path, PurePosixPath
from hashlib import sha256
from sys import stderr
+from contextlib import contextmanager
+from tempfile import TemporaryDirectory, TemporaryFile
+from typing import Optional, Iterable, Union
import jsonschema
import click
from .. import util
from . import _version
+from . import local_apt
+from .piggybacking import Piggybacked
+from .common_errors import *
here = Path(__file__).resolve().parent
_ = util.translation(here / 'locales').gettext
-index_validator = util.validator_for('package_source-1.0.1.schema.json')
+index_validator = util.validator_for('package_source-2.schema.json')
schemas_root = 'https://hydrilla.koszko.org/schemas'
@@ -53,202 +60,201 @@ generated_by = {
'version': _version.version
}
-class FileReferenceError(Exception):
- """
- Exception used to report various problems concerning files referenced from
- source package's index.json.
- """
-
-class ReuseError(Exception):
+class ReuseError(SubprocessError):
"""
Exception used to report various problems when calling the REUSE tool.
"""
-class FileBuffer:
- """
- Implement a file-like object that buffers data written to it.
- """
- def __init__(self):
- """
- Initialize FileBuffer.
- """
- self.chunks = []
-
- def write(self, b):
- """
- Buffer 'b', return number of bytes buffered.
-
- 'b' is expected to be an instance of 'bytes' or 'str', in which case it
- gets encoded as UTF-8.
- """
- if type(b) is str:
- b = b.encode()
- self.chunks.append(b)
- return len(b)
-
- def flush(self):
- """
- A no-op mock of file-like object's flush() method.
- """
- pass
-
- def get_bytes(self):
- """
- Return all data written so far concatenated into a single 'bytes'
- object.
- """
- return b''.join(self.chunks)
-
-def generate_spdx_report(root):
+def generate_spdx_report(root: Path) -> bytes:
"""
Use REUSE tool to generate an SPDX report for sources under 'root' and
return the report's contents as 'bytes'.
- 'root' shall be an instance of pathlib.Path.
-
In case the directory tree under 'root' does not constitute a
- REUSE-compliant package, linting report is printed to standard output and
- an exception is raised.
+    REUSE-compliant package, an exception is raised with the linting report
+ included in it.
- In case the reuse package is not installed, an exception is also raised.
+ In case the reuse tool is not installed, an exception is also raised.
"""
- try:
- from reuse._main import main as reuse_main
- except ModuleNotFoundError:
- raise ReuseError(_('couldnt_import_reuse_is_it_installed'))
+ for command in [
+ ['reuse', '--root', str(root), 'lint'],
+ ['reuse', '--root', str(root), 'spdx']
+ ]:
+ try:
+ cp = subprocess.run(command, capture_output=True, text=True)
+ except FileNotFoundError:
+ raise ReuseError(_('couldnt_execute_reuse_is_it_installed'))
- mocked_output = FileBuffer()
- if reuse_main(args=['--root', str(root), 'lint'], out=mocked_output) != 0:
- stderr.write(mocked_output.get_bytes().decode())
- raise ReuseError(_('spdx_report_from_reuse_incompliant'))
+ if cp.returncode != 0:
+ msg = _('reuse_command_{}_failed').format(' '.join(command))
+ raise ReuseError(msg, cp)
- mocked_output = FileBuffer()
- if reuse_main(args=['--root', str(root), 'spdx'], out=mocked_output) != 0:
- stderr.write(mocked_output.get_bytes().decode())
- raise ReuseError("Couldn't generate an SPDX report for package.")
-
- return mocked_output.get_bytes()
+ return cp.stdout.encode()
class FileRef:
"""Represent reference to a file in the package."""
- def __init__(self, path: Path, contents: bytes):
+ def __init__(self, path: PurePosixPath, contents: bytes) -> None:
"""Initialize FileRef."""
- self.include_in_distribution = False
- self.include_in_zipfile = True
- self.path = path
- self.contents = contents
+ self.include_in_distribution = False
+ self.include_in_source_archive = True
+ self.path = path
+ self.contents = contents
self.contents_hash = sha256(contents).digest().hex()
- def make_ref_dict(self, filename: str):
+ def make_ref_dict(self) -> dict[str, str]:
"""
Represent the file reference through a dict that can be included in JSON
definitions.
"""
return {
- 'file': filename,
+ 'file': str(self.path),
'sha256': self.contents_hash
}
+@contextmanager
+def piggybacked_system(piggyback_def: Optional[dict],
+ piggyback_files: Optional[Path]) \
+ -> Iterable[Piggybacked]:
+ """
+ Resolve resources from a foreign software packaging system. Optionally, use
+ package files (.deb's, etc.) from a specified directory instead of resolving
+ and downloading them.
+ """
+ if piggyback_def is None:
+ yield Piggybacked()
+ else:
+ # apt is the only supported system right now
+ assert piggyback_def['system'] == 'apt'
+
+ with local_apt.piggybacked_system(piggyback_def, piggyback_files) \
+ as piggybacked:
+ yield piggybacked
+
class Build:
"""
Build a Hydrilla package.
"""
- def __init__(self, srcdir, index_json_path):
+ def __init__(self, srcdir: Path, index_json_path: Path,
+ piggyback_files: Optional[Path]=None):
"""
Initialize a build. All files to be included in a distribution package
are loaded into memory, all data gets validated and all necessary
computations (e.g. preparing of hashes) are performed.
-
- 'srcdir' and 'index_json' are expected to be pathlib.Path objects.
"""
self.srcdir = srcdir.resolve()
- self.index_json_path = index_json_path
+ self.piggyback_files = piggyback_files
+ if piggyback_files is None:
+ piggyback_default_path = \
+ srcdir.parent / f'{srcdir.name}.foreign-packages'
+ if piggyback_default_path.exists():
+ self.piggyback_files = piggyback_default_path
self.files_by_path = {}
self.resource_list = []
self.mapping_list = []
if not index_json_path.is_absolute():
- self.index_json_path = (self.srcdir / self.index_json_path)
-
- self.index_json_path = self.index_json_path.resolve()
+ index_json_path = (self.srcdir / index_json_path)
- with open(self.index_json_path, 'rt') as index_file:
+ with open(index_json_path, 'rt') as index_file:
index_json_text = index_file.read()
index_obj = json.loads(util.strip_json_comments(index_json_text))
- self.files_by_path[self.srcdir / 'index.json'] = \
- FileRef(self.srcdir / 'index.json', index_json_text.encode())
+ index_desired_path = PurePosixPath('index.json')
+ self.files_by_path[index_desired_path] = \
+ FileRef(index_desired_path, index_json_text.encode())
self._process_index_json(index_obj)
- def _process_file(self, filename: str, include_in_distribution: bool=True):
+ def _process_file(self, filename: Union[str, PurePosixPath],
+ piggybacked: Piggybacked,
+ include_in_distribution: bool=True):
"""
Resolve 'filename' relative to srcdir, load it to memory (if not loaded
before), compute its hash and store its information in
'self.files_by_path'.
- 'filename' shall represent a relative path using '/' as a separator.
+    'filename' shall represent a relative path within the package directory.
If 'include_in_distribution' is True, it shall cause the file to not only
be included in the source package's zipfile, but also written as one of the
built package's files.
+ For each file an attempt is made to resolve it using 'piggybacked'
+ object. If a file is found and pulled from foreign software packaging
+ system this way, it gets automatically excluded from inclusion in
+ Hydrilla source package's zipfile.
+
Return the file's reference object that can be included in JSON definitions
of various kinds.
"""
- path = self.srcdir
- for segment in filename.split('/'):
- path /= segment
-
- path = path.resolve()
- if not path.is_relative_to(self.srcdir):
- raise FileReferenceError(_('loading_{}_outside_package_dir')
- .format(filename))
-
- if str(path.relative_to(self.srcdir)) == 'index.json':
- raise FileReferenceError(_('loading_reserved_index_json'))
+ include_in_source_archive = True
+
+ desired_path = PurePosixPath(filename)
+ if '..' in desired_path.parts:
+ msg = _('path_contains_double_dot_{}').format(filename)
+ raise FileReferenceError(msg)
+
+ path = piggybacked.resolve_file(desired_path)
+ if path is None:
+ path = (self.srcdir / desired_path).resolve()
+ if not path.is_relative_to(self.srcdir):
+ raise FileReferenceError(_('loading_{}_outside_package_dir')
+ .format(filename))
+
+ if str(path.relative_to(self.srcdir)) == 'index.json':
+ raise FileReferenceError(_('loading_reserved_index_json'))
+ else:
+ include_in_source_archive = False
- file_ref = self.files_by_path.get(path)
+ file_ref = self.files_by_path.get(desired_path)
if file_ref is None:
with open(path, 'rb') as file_handle:
contents = file_handle.read()
- file_ref = FileRef(path, contents)
- self.files_by_path[path] = file_ref
+ file_ref = FileRef(desired_path, contents)
+ self.files_by_path[desired_path] = file_ref
if include_in_distribution:
file_ref.include_in_distribution = True
- return file_ref.make_ref_dict(filename)
+ if not include_in_source_archive:
+ file_ref.include_in_source_archive = False
+
+ return file_ref.make_ref_dict()
- def _prepare_source_package_zip(self, root_dir_name: str):
+ def _prepare_source_package_zip(self, source_name: str,
+ piggybacked: Piggybacked) -> str:
"""
Create and store in memory a .zip archive containing files needed to
build this source package.
- 'root_dir_name' shall not contain any slashes ('/').
+    'source_name' shall not contain any slashes ('/').
Return zipfile's sha256 sum's hexstring.
"""
- fb = FileBuffer()
- root_dir_path = Path(root_dir_name)
+ tf = TemporaryFile()
+ source_dir_path = PurePosixPath(source_name)
+ piggybacked_dir_path = PurePosixPath(f'{source_name}.foreign-packages')
- def zippath(file_path):
- file_path = root_dir_path / file_path.relative_to(self.srcdir)
- return file_path.as_posix()
-
- with zipfile.ZipFile(fb, 'w') as xpi:
+ with zipfile.ZipFile(tf, 'w') as zf:
for file_ref in self.files_by_path.values():
- if file_ref.include_in_zipfile:
- xpi.writestr(zippath(file_ref.path), file_ref.contents)
+ if file_ref.include_in_source_archive:
+ zf.writestr(str(source_dir_path / file_ref.path),
+ file_ref.contents)
+
+ for desired_path, real_path in piggybacked.archive_files():
+ zf.writestr(str(piggybacked_dir_path / desired_path),
+ real_path.read_bytes())
- self.source_zip_contents = fb.get_bytes()
+ tf.seek(0)
+ self.source_zip_contents = tf.read()
return sha256(self.source_zip_contents).digest().hex()
- def _process_item(self, item_def: dict):
+ def _process_item(self, item_def: dict, piggybacked: Piggybacked):
"""
Process 'item_def' as definition of a resource/mapping and store in
memory its processed form and files used by it.
@@ -266,14 +272,14 @@ class Build:
copy_props.append('revision')
- script_file_refs = [self._process_file(f['file'])
+ script_file_refs = [self._process_file(f['file'], piggybacked)
for f in item_def.get('scripts', [])]
deps = [{'identifier': res_ref['identifier']}
for res_ref in item_def.get('dependencies', [])]
new_item_obj = {
- 'dependencies': deps,
+ 'dependencies': [*piggybacked.package_must_depend, *deps],
'scripts': script_file_refs
}
else:
@@ -308,41 +314,54 @@ class Build:
in it.
"""
index_validator.validate(index_obj)
+ match = re.match(r'.*-((([1-9][0-9]*|0)\.)+)schema\.json$',
+ index_obj['$schema'])
+ self.source_schema_ver = \
+ [int(n) for n in filter(None, match.group(1).split('.'))]
- schema = f'{schemas_root}/api_source_description-1.schema.json'
+ out_schema = f'{schemas_root}/api_source_description-1.schema.json'
self.source_name = index_obj['source_name']
generate_spdx = index_obj.get('reuse_generate_spdx_report', False)
if generate_spdx:
contents = generate_spdx_report(self.srcdir)
- spdx_path = (self.srcdir / 'report.spdx').resolve()
+ spdx_path = PurePosixPath('report.spdx')
spdx_ref = FileRef(spdx_path, contents)
- spdx_ref.include_in_zipfile = False
+ spdx_ref.include_in_source_archive = False
self.files_by_path[spdx_path] = spdx_ref
- self.copyright_file_refs = \
- [self._process_file(f['file']) for f in index_obj['copyright']]
+ piggyback_def = None
+ if self.source_schema_ver >= [1, 1] and 'piggyback_on' in index_obj:
+ piggyback_def = index_obj['piggyback_on']
- if generate_spdx and not spdx_ref.include_in_distribution:
- raise FileReferenceError(_('report_spdx_not_in_copyright_list'))
+ with piggybacked_system(piggyback_def, self.piggyback_files) \
+ as piggybacked:
+ copyright_to_process = [
+ *(file_ref['file'] for file_ref in index_obj['copyright']),
+ *piggybacked.package_license_files
+ ]
+ self.copyright_file_refs = [self._process_file(f, piggybacked)
+ for f in copyright_to_process]
- item_refs = [self._process_item(d) for d in index_obj['definitions']]
+ if generate_spdx and not spdx_ref.include_in_distribution:
+ raise FileReferenceError(_('report_spdx_not_in_copyright_list'))
- for file_ref in index_obj.get('additional_files', []):
- self._process_file(file_ref['file'], include_in_distribution=False)
+ item_refs = [self._process_item(d, piggybacked)
+ for d in index_obj['definitions']]
- root_dir_path = Path(self.source_name)
+ for file_ref in index_obj.get('additional_files', []):
+ self._process_file(file_ref['file'], piggybacked,
+ include_in_distribution=False)
- source_archives_obj = {
- 'zip' : {
- 'sha256': self._prepare_source_package_zip(root_dir_path)
- }
- }
+ zipfile_sha256 = self._prepare_source_package_zip\
+ (self.source_name, piggybacked)
+
+ source_archives_obj = {'zip' : {'sha256': zipfile_sha256}}
self.source_description = {
- '$schema': schema,
+ '$schema': out_schema,
'source_name': self.source_name,
'source_copyright': self.copyright_file_refs,
'upstream_url': index_obj['upstream_url'],
@@ -398,20 +417,25 @@ class Build:
dir_type = click.Path(exists=True, file_okay=False, resolve_path=True)
+@click.command(help=_('build_package_from_srcdir_to_dstdir'))
@click.option('-s', '--srcdir', default='./', type=dir_type, show_default=True,
help=_('source_directory_to_build_from'))
@click.option('-i', '--index-json', default='index.json', type=click.Path(),
help=_('path_instead_of_index_json'))
+@click.option('-p', '--piggyback-files', type=click.Path(),
+ help=_('path_instead_for_piggyback_files'))
@click.option('-d', '--dstdir', type=dir_type, required=True,
help=_('built_package_files_destination'))
@click.version_option(version=_version.version, prog_name='Hydrilla builder',
message=_('%(prog)s_%(version)s_license'),
help=_('version_printing'))
-def perform(srcdir, index_json, dstdir):
- """<this will be replaced by a localized docstring for Click to pick up>"""
- build = Build(Path(srcdir), Path(index_json))
- build.write_package_files(Path(dstdir))
-
-perform.__doc__ = _('build_package_from_srcdir_to_dstdir')
+def perform(srcdir, index_json, piggyback_files, dstdir):
+ """
+ Execute Hydrilla builder to turn source package into a distributable one.
-perform = click.command()(perform)
+ This command is meant to be the entry point of hydrilla-builder command
+ exported by this package.
+ """
+ build = Build(Path(srcdir), Path(index_json),
+ piggyback_files and Path(piggyback_files))
+ build.write_package_files(Path(dstdir))
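Programmatic use mirrors what perform() does above; a minimal sketch with
placeholder paths:

    from pathlib import Path

    from hydrilla.builder.build import Build

    # When no piggyback_files argument is given, archives are searched for
    # in a sibling 'hello.foreign-packages' directory, if it exists.
    build = Build(srcdir=Path('hello'), index_json_path=Path('index.json'))
    build.write_package_files(Path('/tmp/destination'))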
diff --git a/src/hydrilla/builder/common_errors.py b/src/hydrilla/builder/common_errors.py
new file mode 100644
index 0000000..29782e1
--- /dev/null
+++ b/src/hydrilla/builder/common_errors.py
@@ -0,0 +1,67 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+# Error classes.
+#
+# This file is part of Hydrilla
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use of this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+This module defines error types for use in other parts of Hydrilla builder.
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Optional
+from subprocess import CompletedProcess as CP
+
+from .. import util
+
+here = Path(__file__).resolve().parent
+
+_ = util.translation(here / 'locales').gettext
+
+class DistroError(Exception):
+ """
+ Exception used to report problems when resolving an OS distribution.
+ """
+
+class FileReferenceError(Exception):
+ """
+ Exception used to report various problems concerning files referenced from
+ source package.
+ """
+
+class SubprocessError(Exception):
+ """
+    Exception used to report problems related to the execution of external
+    processes. This includes various problems when calling apt-* and dpkg-*
+    commands.
+ """
+ def __init__(self, msg: str, cp: Optional[CP]=None) -> None:
+ """Initialize this SubprocessError"""
+ if cp and cp.stdout:
+ msg = '\n\n'.join([msg, _('STDOUT_OUTPUT_heading'), cp.stdout])
+
+ if cp and cp.stderr:
+ msg = '\n\n'.join([msg, _('STDERR_OUTPUT_heading'), cp.stderr])
+
+ super().__init__(msg)
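When a CompletedProcess is passed, SubprocessError appends the captured
output to the error message under localized headings; a quick illustration
with dummy values:

    from subprocess import CompletedProcess

    from hydrilla.builder.common_errors import SubprocessError

    cp = CompletedProcess(args=['apt-get', 'update'], returncode=100,
                          stdout='Get:1 ...', stderr='E: dummy failure')

    # str(err) contains 'apt-get command failed' followed by sections
    # built from cp.stdout and cp.stderr.
    err = SubprocessError('apt-get command failed', cp)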
diff --git a/src/hydrilla/builder/local_apt.py b/src/hydrilla/builder/local_apt.py
new file mode 100644
index 0000000..8382af8
--- /dev/null
+++ b/src/hydrilla/builder/local_apt.py
@@ -0,0 +1,428 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+# Using a local APT.
+#
+# This file is part of Hydrilla
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use of this code
+# in a proprietary program, I am not going to enforce this in court.
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import zipfile
+import shutil
+import re
+import subprocess
+CP = subprocess.CompletedProcess
+from pathlib import Path, PurePosixPath
+from tempfile import TemporaryDirectory, NamedTemporaryFile
+from hashlib import sha256
+from contextlib import contextmanager
+from typing import Optional, Iterable
+
+from .. import util
+from .piggybacking import Piggybacked
+from .common_errors import *
+
+here = Path(__file__).resolve().parent
+
+_ = util.translation(here / 'locales').gettext
+
+"""
+Default cache directory to save APT configurations and downloaded GPG keys in.
+"""
+default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt'
+
+"""
+Default keyserver to use.
+"""
+default_keyserver = 'hkps://keyserver.ubuntu.com:443'
+
+"""
+Default keys to download when using a local APT.
+"""
+default_keys = [
+ # Trisquel
+ 'E6C27099CA21965B734AEA31B4EFB9F38D8AEBF1',
+ '60364C9869F92450421F0C22B138CA450C05112F',
+ # Ubuntu
+ '630239CC130E1A7FD81A27B140976EAF437D05B5',
+ '790BC7277767219C42C86F933B4FE6ACC0B21F32',
+ 'F6ECB3762474EDA9D21B7022871920D1991BC93C',
+ # Debian
+ '6D33866EDD8FFA41C0143AEDDCC9EFBF77E11517',
+ '80D15823B7FD1561F9F7BCDDDC30D7C23CBBABEE',
+ 'AC530D520F2F3269F5E98313A48449044AAD5C5D'
+]
+
+"""sources.list file contents for known distros."""
+default_lists = {
+ 'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main'
+ for type in ('deb', 'deb-src')
+ for suf in ('', '-updates', '-security')]
+}
+
+class GpgError(Exception):
+ """
+ Exception used to report various problems when calling GPG.
+ """
+
+class AptError(SubprocessError):
+ """
+ Exception used to report various problems when calling apt-* and dpkg-*
+ commands.
+ """
+
+def run(command, **kwargs):
+ """A wrapped around subprocess.run that sets some default options."""
+ return subprocess.run(command, **kwargs, env={'LANG': 'en_US'},
+ capture_output=True, text=True)
+
+class Apt:
+ """
+ This class represents an APT instance and can be used to call apt-get
+ commands with it.
+ """
+ def __init__(self, apt_conf: str) -> None:
+ """Initialize this Apt object."""
+ self.apt_conf = apt_conf
+
+ def get(self, *args: str, **kwargs) -> CP:
+ """
+ Run apt-get with the specified arguments and raise a meaningful AptError
+ when something goes wrong.
+ """
+ command = ['apt-get', '-c', self.apt_conf, *args]
+ try:
+ cp = run(command, **kwargs)
+ except FileNotFoundError:
+ raise AptError(_('couldnt_execute_apt_get_is_it_installed'))
+
+ if cp.returncode != 0:
+ msg = _('apt_get_command_{}_failed').format(' '.join(command))
+ raise AptError(msg, cp)
+
+ return cp
+
+def cache_dir() -> Path:
+ """
+ Return the directory used to cache data (APT configurations, keyrings) to
+ speed up repeated operations.
+
+ This function first ensures the directory exists.
+ """
+ default_apt_cache_dir.mkdir(parents=True, exist_ok=True)
+ return default_apt_cache_dir
+
+class SourcesList:
+ """Representation of apt's sources.list contents."""
+ def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None:
+ """Initialize this SourcesList."""
+ self.codename = None
+ self.list = [*list]
+ self.has_extra_entries = bool(self.list)
+
+ if codename is not None:
+ if codename not in default_lists:
+ raise DistroError(_('distro_{}_unknown').format(codename))
+
+ self.codename = codename
+ self.list.extend(default_lists[codename])
+
+ def identity(self) -> str:
+ """
+ Produce a string that uniquely identifies this sources.list contents.
+ """
+ if self.codename and not self.has_extra_entries:
+ return self.codename
+
+ return sha256('\n'.join(sorted(self.list)).encode()).digest().hex()
+
+def apt_conf(directory: Path) -> str:
+ """
+ Given local APT's directory, produce a configuration suitable for running
+ APT there.
+
+ 'directory' must not contain any special characters including quotes and
+ spaces.
+ """
+ return f'''
+Dir "{directory}";
+Dir::State "{directory}/var/lib/apt";
+Dir::State::status "{directory}/var/lib/dpkg/status";
+Dir::Etc::SourceList "{directory}/etc/apt.sources.list";
+Dir::Etc::SourceParts "";
+Dir::Cache "{directory}/var/cache/apt";
+pkgCacheGen::Essential "none";
+Dir::Etc::Trusted "{directory}/etc/trusted.gpg";
+'''
+
+def apt_keyring(keys: [str]) -> bytes:
+ """
+ Download the requested keys if necessary and export them as a keyring
+ suitable for passing to APT.
+
+ The keyring is returned as a bytes value that should be written to a file.
+ """
+ try:
+ from gnupg import GPG
+ except ModuleNotFoundError:
+ raise GpgError(_('couldnt_import_gnupg_is_it_installed'))
+
+ gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg'))
+ for key in keys:
+ if gpg.list_keys(keys=[key]) != []:
+ continue
+
+ if gpg.recv_keys(default_keyserver, key).imported == 0:
+ raise GpgError(_('gpg_couldnt_recv_key'))
+
+ return gpg.export_keys(keys, armor=False, minimal=True)
+
+def cache_apt_root(apt_root: Path, destination_zip: Path) -> None:
+ """
+ Zip an APT root directory for later use and move the zipfile to the
+ requested destination.
+ """
+ temporary_zip_path = None
+ try:
+ tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_',
+ dir=cache_dir(), delete=False)
+ temporary_zip_path = Path(tmpfile.name)
+
+ to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'}
+
+ with zipfile.ZipFile(tmpfile, 'w') as zf:
+ for member in apt_root.rglob('*'):
+ relative = member.relative_to(apt_root)
+ if relative not in to_skip:
+                    # This call will also properly add empty folders to the
+                    # zip file.
+ zf.write(member, relative, zipfile.ZIP_DEFLATED)
+
+ shutil.move(temporary_zip_path, destination_zip)
+ finally:
+ if temporary_zip_path is not None and temporary_zip_path.exists():
+ temporary_zip_path.unlink()
+
+def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt:
+ """
+ Create files and directories necessary for running APT without root rights
+ inside 'directory'.
+
+ 'directory' must not contain any special characters including quotes and
+ spaces and must be empty.
+
+ Return an Apt object that can be used to call apt-get commands.
+ """
+ apt_root = directory / 'apt_root'
+
+ conf_text = apt_conf(apt_root)
+ keyring_bytes = apt_keyring(keys)
+
+ apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip'
+ if apt_zipfile.exists():
+ with zipfile.ZipFile(apt_zipfile) as zf:
+ zf.extractall(apt_root)
+
+ for to_create in (
+ apt_root / 'var' / 'lib' / 'apt' / 'partial',
+ apt_root / 'var' / 'lib' / 'apt' / 'lists',
+ apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial',
+ apt_root / 'etc' / 'apt' / 'preferences.d',
+ apt_root / 'var' / 'lib' / 'dpkg',
+ apt_root / 'var' / 'log' / 'apt'
+ ):
+ to_create.mkdir(parents=True, exist_ok=True)
+
+ conf_path = apt_root / 'etc' / 'apt.conf'
+ trusted_path = apt_root / 'etc' / 'trusted.gpg'
+ status_path = apt_root / 'var' / 'lib' / 'dpkg' / 'status'
+ list_path = apt_root / 'etc' / 'apt.sources.list'
+
+ conf_path.write_text(conf_text)
+ trusted_path.write_bytes(keyring_bytes)
+ status_path.touch()
+ list_path.write_text('\n'.join(list.list))
+
+ apt = Apt(str(conf_path))
+ apt.get('update')
+
+ cache_apt_root(apt_root, apt_zipfile)
+
+ return apt
+
+@contextmanager
+def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]:
+ """
+ Create a temporary directory with proper local APT configuration in it.
+ Yield an Apt object that can be used to issue apt-get commands.
+
+ This function returns a context manager that will remove the directory on
+ close.
+ """
+ with TemporaryDirectory() as td:
+ td = Path(td)
+ yield setup_local_apt(td, list, keys)
+
+def download_apt_packages(list: SourcesList, keys: [str], packages: [str],
+ destination_dir: Path, with_deps=False) -> [str]:
+ """
+ Set up a local APT, update it using the specified sources.list configuration
+ and use it to download the specified packages.
+
+    This function downloads the .deb files of the packages matching the
+    current architecture (which includes packages with architecture 'all') as
+    well as all their corresponding source package files and (if requested)
+    the .debs and source files of all their declared dependencies.
+
+ Return value is a list of names of all downloaded files.
+ """
+ with local_apt(list, keys) as apt:
+ if with_deps:
+ cp = apt.get('install', '--yes', '--just-print', *packages)
+
+ deps_listing = re.match(
+ r'''
+ .*
+ The\sfollowing\sNEW\spackages\swill\sbe\sinstalled:
+ (.*)
+ 0\supgraded,
+ ''',
+ cp.stdout,
+ re.MULTILINE | re.DOTALL | re.VERBOSE)
+
+ if deps_listing is None:
+ raise AptError(_('apt_install_output_not_understood'), cp)
+
+ packages = deps_listing.group(1).split()
+
+        # Download .debs indirectly to destination_dir by first placing them
+ # in a temporary subdirectory.
+ with TemporaryDirectory(dir=destination_dir) as td:
+ td = Path(td)
+ cp = apt.get('download', *packages, cwd=td)
+
+ deb_name_regex = re.compile(
+ r'''
+ ^
+ (?P<name>[^_]+)
+ _
+ (?P<ver>[^_]+)
+ _
+ .+ # architecture (or 'all')
+ \.deb
+ $
+ ''',
+ re.VERBOSE)
+
+ names_vers = []
+ downloaded = []
+ for deb_file in td.iterdir():
+ match = deb_name_regex.match(deb_file.name)
+ if match is None:
+ msg = _('apt_download_gave_bad_filename_{}')\
+ .format(deb_file.name)
+ raise AptError(msg, cp)
+
+ names_vers.append((match.group('name'), match.group('ver')))
+ downloaded.append(deb_file.name)
+
+ apt.get('source', '--download-only',
+ *[f'{n}={v}' for n, v in names_vers], cwd=td)
+
+ for source_file in td.iterdir():
+ if source_file.name in downloaded:
+ continue
+
+ downloaded.append(source_file.name)
+
+ for filename in downloaded:
+ shutil.move(td / filename, destination_dir / filename)
+
+ return downloaded
+
+@contextmanager
+def piggybacked_system(piggyback_def: dict, foreign_packages: Optional[Path]) \
+ -> Iterable[Piggybacked]:
+ """
+ Resolve resources from APT. Optionally, use package files (.deb's, etc.)
+ from a specified directory instead of resolving and downloading them.
+
+ The directories and files created for the yielded Piggybacked object shall
+ be deleted when this context manager gets closed.
+ """
+ assert piggyback_def['system'] == 'apt'
+
+ with TemporaryDirectory() as td:
+ td = Path(td)
+ root = td / 'root'
+ root.mkdir()
+
+ if foreign_packages is None:
+ archives = td / 'archives'
+ archives.mkdir()
+
+ sources_list = SourcesList(piggyback_def.get('sources_list', []),
+ piggyback_def.get('distribution'))
+ packages = piggyback_def['packages']
+ with_deps = piggyback_def['dependencies']
+ pgp_keys = [
+ *default_keys,
+ *piggyback_def.get('trusted_keys', [])
+ ]
+
+ download_apt_packages(
+ list=sources_list,
+ keys=pgp_keys,
+ packages=packages,
+ destination_dir=archives,
+ with_deps=with_deps
+ )
+ else:
+ archives = foreign_packages / 'apt'
+
+ for deb in archives.glob('*.deb'):
+ command = ['dpkg-deb', '-x', str(deb), str(root)]
+ try:
+ cp = run(command)
+ except FileNotFoundError:
+ raise AptError(_('couldnt_execute_dpkg_deb_is_it_installed'))
+
+ if cp.returncode != 0:
+ msg = _('dpkg_deb_command_{}_failed').format(' '.join(command))
+ raise AptError(msg, cp)
+
+ docs_dir = root / 'usr' / 'share' / 'doc'
+ copyright_paths = [p / 'copyright' for p in docs_dir.iterdir()] \
+ if docs_dir.exists() else []
+ copyright_paths = [PurePosixPath('.apt-root') / p.relative_to(root)
+ for p in copyright_paths if p.exists()]
+
+ standard_depends = piggyback_def.get('depend_on_base_packages', True)
+ must_depend = [{'identifier': 'apt-common-licenses'}] \
+ if standard_depends else []
+
+ yield Piggybacked(
+ archives={'apt': archives},
+ roots={'.apt-root': root},
+ package_license_files=copyright_paths,
+ package_must_depend=must_depend
+ )
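Reuse of cached APT roots hinges on SourcesList.identity(): a known distro
with no extra entries is keyed by its codename, anything else by a content
hash. A short sketch (the extra repository entry is made up):

    from hydrilla.builder.local_apt import SourcesList

    assert SourcesList([], 'nabia').identity() == 'nabia'

    # Extra entries force a sha256-based identity, so a modified
    # sources.list never reuses another configuration's cached
    # apt_<identity>.zip.
    custom = SourcesList(['deb https://example.org/repo stable main'],
                         'nabia')
    assert custom.identity() != 'nabia'
    assert len(custom.identity()) == 64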
diff --git a/src/hydrilla/builder/piggybacking.py b/src/hydrilla/builder/piggybacking.py
new file mode 100644
index 0000000..799422d
--- /dev/null
+++ b/src/hydrilla/builder/piggybacking.py
@@ -0,0 +1,115 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+# Handling of software packaged for other distribution systems.
+#
+# This file is part of Hydrilla
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use of this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+This module contains definitions that may be reused by multiple piggybacked
+software system backends.
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+from pathlib import Path, PurePosixPath
+from typing import Optional, Iterable
+
+from .. import util
+from .common_errors import *
+
+here = Path(__file__).resolve().parent
+
+_ = util.translation(here / 'locales').gettext
+
+class Piggybacked:
+ """
+ Store information about foreign resources in use.
+
+ Public attributes:
+ 'package_must_depend' (read-only)
+ 'package_license_files' (read-only)
+ """
+ def __init__(self, archives: dict[str, Path]={}, roots: dict[str, Path]={},
+ package_license_files: list[PurePosixPath]=[],
+ package_must_depend: list[dict]=[]):
+ """
+ Initialize this Piggybacked object.
+
+ 'archives' maps piggybacked system names to directories that contain
+ package(s)' archive files. An 'archives' object may look like
+ {'apt': PosixPath('/path/to/dir/with/debs/and/tarballs')}.
+
+ 'roots' associates directory names to be virtually inserted under
+ Hydrilla source package directory with paths to real filesystem
+ directories that hold their desired contents, i.e. unpacked foreign
+ packages.
+
+        'package_license_files' lists paths to license files that should be
+        included with the Haketilo package that will be produced. The paths
+        are to be resolved using the 'roots' dictionary.
+
+ 'package_must_depend' lists names of Haketilo packages that the produced
+ package will additionally depend on. This is meant to help distribute
+ common licenses with a separate Haketilo package.
+ """
+ self.archives = archives
+ self.roots = roots
+ self.package_license_files = package_license_files
+ self.package_must_depend = package_must_depend
+
+ def resolve_file(self, file_ref_name: PurePosixPath) -> Optional[Path]:
+ """
+ 'file_ref_name' is a path as may appear in an index.json file. Check if
+ the file belongs to one of the roots we have and return either a path
+ to the relevant file under this root or None.
+
+        It is not checked whether the file actually exists in the
+        filesystem.
+ """
+ parts = file_ref_name.parts
+ root_path = self.roots.get(parts and parts[0])
+ path = root_path
+ if path is None:
+ return None
+
+ for part in parts[1:]:
+ path = path / part
+
+ path = path.resolve()
+
+ if not path.is_relative_to(root_path):
+ raise FileReferenceError(_('loading_{}_outside_piggybacked_dir')
+ .format(file_ref_name))
+
+ return path
+
+ def archive_files(self) -> Iterable[tuple[PurePosixPath, Path]]:
+ """
+        Yield all archive files in use. Each yielded tuple holds the file's
+        desired path relative to the piggybacked archives directory to be
+        created and its current real path.
+ """
+ for system, real_dir in self.archives.items():
+ for path in real_dir.rglob('*'):
+ yield PurePosixPath(system) / path.relative_to(real_dir), path
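The two lookups are meant to be used together; a minimal sketch with made-up
directories:

    from pathlib import Path, PurePosixPath

    from hydrilla.builder.piggybacking import Piggybacked

    piggybacked = Piggybacked(
        archives={'apt': Path('/tmp/example/archives')},
        roots={'.apt-root': Path('/tmp/example/root')}
    )

    # Paths under a known root resolve to real filesystem locations
    # (here, somewhere under /tmp/example/root)...
    assert piggybacked.resolve_file(
        PurePosixPath('.apt-root/usr/bin/x')) is not None

    # ...while anything else is left for the caller to find under srcdir.
    assert piggybacked.resolve_file(PurePosixPath('hello.js')) is None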
diff --git a/src/hydrilla/schemas b/src/hydrilla/schemas
-Subproject 09634f3446866f712a022327683b1149d8f46bf
+Subproject 4b4da5a02bc311603469eea7b3dfd4f1bbb911f
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..d382ead
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,5 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
diff --git a/tests/helpers.py b/tests/helpers.py
new file mode 100644
index 0000000..df474b0
--- /dev/null
+++ b/tests/helpers.py
@@ -0,0 +1,51 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+import re
+
+variable_word_re = re.compile(r'^<(.+)>$')
+
+def process_command(command, expected_command):
+ """Validate the command line and extract its variable parts (if any)."""
+ assert len(command) == len(expected_command)
+
+ extracted = {}
+ for word, expected_word in zip(command, expected_command):
+ match = variable_word_re.match(expected_word)
+ if match:
+ extracted[match.group(1)] = word
+ else:
+ assert word == expected_word
+
+ return extracted
+
+def run_missing_executable(command, **kwargs):
+ """
+ Instead of running a command, raise FileNotFoundError as if its executable
+ was missing.
+ """
+ raise FileNotFoundError('dummy')
+
+class MockedCompletedProcess:
+ """
+ Object with some fields similar to those of subprocess.CompletedProcess.
+ """
+ def __init__(self, args, returncode=0,
+ stdout='some output', stderr='some error output',
+ text_output=True):
+ """
+ Initialize MockedCompletedProcess. Convert strings to bytes if needed.
+ """
+ self.args = args
+ self.returncode = returncode
+
+ if type(stdout) is str and not text_output:
+ stdout = stdout.encode()
+ if type(stderr) is str and not text_output:
+ stderr = stderr.encode()
+
+ self.stdout = stdout
+ self.stderr = stderr
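process_command() matches command words literally while extracting the
'<variable>' slots; run_reuse() in test_build.py below relies on this. For
example:

    from tests.helpers import process_command

    extracted = process_command(
        ['reuse', '--root', '/tmp/source', 'lint'],
        ['reuse', '--root', '<root>', 'lint'])
    assert extracted == {'root': '/tmp/source'}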
diff --git a/tests/test_build.py b/tests/test_build.py
new file mode 100644
index 0000000..a30cff4
--- /dev/null
+++ b/tests/test_build.py
@@ -0,0 +1,674 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import pytest
+import json
+import shutil
+
+from tempfile import TemporaryDirectory
+from pathlib import Path, PurePosixPath
+from typing import Iterable
+from hashlib import sha256
+from zipfile import ZipFile
+from contextlib import contextmanager
+
+from jsonschema import ValidationError
+
+from hydrilla import util as hydrilla_util
+from hydrilla.builder import build, _version, local_apt
+from hydrilla.builder.common_errors import *
+
+from .helpers import *
+
+here = Path(__file__).resolve().parent
+
+expected_generated_by = {
+ 'name': 'hydrilla.builder',
+ 'version': _version.version
+}
+
+orig_srcdir = here / 'source-package-example'
+
+index_text = (orig_srcdir / 'index.json').read_text()
+index_obj = json.loads(hydrilla_util.strip_json_comments(index_text))
+
+def read_files(*file_list):
+ """
+ Take names of files under srcdir and return a dict that maps them to their
+ contents (as bytes).
+ """
+ return dict((name, (orig_srcdir / name).read_bytes()) for name in file_list)
+
+dist_files = {
+ **read_files('LICENSES/CC0-1.0.txt', 'bye.js', 'hello.js', 'message.js'),
+ 'report.spdx': b'dummy spdx output'
+}
+src_files = {
+ **dist_files,
+ **read_files('README.txt', 'README.txt.license', '.reuse/dep5',
+ 'index.json')
+}
+extra_archive_files = {
+}
+
+sha256_hashes = dict((name, sha256(contents).digest().hex())
+ for name, contents in src_files.items())
+
+del src_files['report.spdx']
+
+expected_resources = [{
+ '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json',
+ 'source_name': 'hello',
+ 'source_copyright': [{
+ 'file': 'report.spdx',
+ 'sha256': sha256_hashes['report.spdx']
+ }, {
+ 'file': 'LICENSES/CC0-1.0.txt',
+ 'sha256': sha256_hashes['LICENSES/CC0-1.0.txt']
+ }],
+ 'type': 'resource',
+ 'identifier': 'helloapple',
+ 'long_name': 'Hello Apple',
+ 'uuid': 'a6754dcb-58d8-4b7a-a245-24fd7ad4cd68',
+ 'version': [2021, 11, 10],
+ 'revision': 1,
+ 'description': 'greets an apple',
+ 'dependencies': [{'identifier': 'hello-message'}],
+ 'scripts': [{
+ 'file': 'hello.js',
+ 'sha256': sha256_hashes['hello.js']
+ }, {
+ 'file': 'bye.js',
+ 'sha256': sha256_hashes['bye.js']
+ }],
+ 'generated_by': expected_generated_by
+}, {
+ '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json',
+ 'source_name': 'hello',
+ 'source_copyright': [{
+ 'file': 'report.spdx',
+ 'sha256': sha256_hashes['report.spdx']
+ }, {
+ 'file': 'LICENSES/CC0-1.0.txt',
+ 'sha256': sha256_hashes['LICENSES/CC0-1.0.txt']
+ }],
+ 'type': 'resource',
+ 'identifier': 'hello-message',
+ 'long_name': 'Hello Message',
+ 'uuid': '1ec36229-298c-4b35-8105-c4f2e1b9811e',
+ 'version': [2021, 11, 10],
+ 'revision': 2,
+ 'description': 'define messages for saying hello and bye',
+ 'dependencies': [],
+ 'scripts': [{
+ 'file': 'message.js',
+ 'sha256': sha256_hashes['message.js']
+ }],
+ 'generated_by': expected_generated_by
+}]
+
+expected_mapping = {
+ '$schema': 'https://hydrilla.koszko.org/schemas/api_mapping_description-1.schema.json',
+ 'source_name': 'hello',
+ 'source_copyright': [{
+ 'file': 'report.spdx',
+ 'sha256': sha256_hashes['report.spdx']
+ }, {
+ 'file': 'LICENSES/CC0-1.0.txt',
+ 'sha256': sha256_hashes['LICENSES/CC0-1.0.txt']
+ }],
+ 'type': 'mapping',
+ 'identifier': 'helloapple',
+ 'long_name': 'Hello Apple',
+ 'uuid': '54d23bba-472e-42f5-9194-eaa24c0e3ee7',
+ 'version': [2021, 11, 10],
+ 'description': 'causes apple to get greeted on Hydrillabugs issue tracker',
+ 'payloads': {
+ 'https://hydrillabugs.koszko.org/***': {
+ 'identifier': 'helloapple'
+ },
+ 'https://hachettebugs.koszko.org/***': {
+ 'identifier': 'helloapple'
+ }
+ },
+ 'generated_by': expected_generated_by
+}
+
+expected_source_description = {
+ '$schema': 'https://hydrilla.koszko.org/schemas/api_source_description-1.schema.json',
+ 'source_name': 'hello',
+ 'source_copyright': [{
+ 'file': 'report.spdx',
+ 'sha256': sha256_hashes['report.spdx']
+ }, {
+ 'file': 'LICENSES/CC0-1.0.txt',
+ 'sha256': sha256_hashes['LICENSES/CC0-1.0.txt']
+ }],
+ 'source_archives': {
+ 'zip': {
+ 'sha256': '!!!!value to fill during test!!!!',
+ }
+ },
+ 'upstream_url': 'https://git.koszko.org/hydrilla-source-package-example',
+ 'definitions': [{
+ 'type': 'resource',
+ 'identifier': 'helloapple',
+ 'long_name': 'Hello Apple',
+ 'version': [2021, 11, 10],
+ }, {
+ 'type': 'resource',
+ 'identifier': 'hello-message',
+ 'long_name': 'Hello Message',
+ 'version': [2021, 11, 10],
+ }, {
+ 'type': 'mapping',
+ 'identifier': 'helloapple',
+ 'long_name': 'Hello Apple',
+ 'version': [2021, 11, 10],
+ }],
+ 'generated_by': expected_generated_by
+}
+
+expected = [*expected_resources, expected_mapping, expected_source_description]
+
+@pytest.fixture
+def tmpdir() -> Iterable[Path]:
+ """
+ Provide test case with a temporary directory that will be automatically
+ deleted after the test.
+ """
+ with TemporaryDirectory() as tmpdir:
+ yield Path(tmpdir)
+
+def run_reuse(command, **kwargs):
+ """
+ Instead of running a 'reuse' command, check if 'mock_reuse_missing' file
+ exists under root directory. If yes, raise FileNotFoundError as if 'reuse'
+ command was missing. If not, check if 'README.txt.license' file exists
+ in the requested directory and return zero if it does.
+ """
+ expected = ['reuse', '--root', '<root>',
+ 'lint' if 'lint' in command else 'spdx']
+
+ root_path = Path(process_command(command, expected)['root'])
+
+ if (root_path / 'mock_reuse_missing').exists():
+ raise FileNotFoundError('dummy')
+
+ is_reuse_compliant = (root_path / 'README.txt.license').exists()
+
+ return MockedCompletedProcess(command, 1 - is_reuse_compliant,
+ stdout=f'dummy {expected[-1]} output',
+ text_output=kwargs.get('text'))
+
+mocked_piggybacked_archives = [
+ PurePosixPath('apt/something.deb'),
+ PurePosixPath('apt/something.orig.tar.gz'),
+ PurePosixPath('apt/something.debian.tar.xz'),
+ PurePosixPath('othersystem/other-something.tar.gz')
+]
+
+@pytest.fixture
+def mock_piggybacked_apt_system(monkeypatch):
+ """Make local_apt.piggybacked_system() return a mocked result."""
+ # We set 'td' to a temporary dir path further below.
+ td = None
+
+ class MockedPiggybacked:
+ """Minimal mock of Piggybacked object."""
+ package_license_files = [PurePosixPath('.apt-root/.../copyright')]
+ package_must_depend = [{'identifier': 'apt-common-licenses'}]
+
+ def resolve_file(path):
+ """
+ For each path that starts with '.apt-root' return a valid
+ dummy file path.
+ """
+ if path.parts[0] != '.apt-root':
+ return None
+
+ (td / path.name).write_text(f'dummy {path.name}')
+
+ return (td / path.name)
+
+ def archive_files():
+ """Yield some valid dummy file path tuples."""
+ for desired_path in mocked_piggybacked_archives:
+ real_path = td / desired_path.name
+ real_path.write_text(f'dummy {desired_path.name}')
+
+ yield desired_path, real_path
+
+ @contextmanager
+ def mocked_piggybacked_system(piggyback_def, piggyback_files):
+ """Mock the execution of local_apt.piggybacked_system()."""
+ assert piggyback_def == {
+ 'system': 'apt',
+ 'distribution': 'nabia',
+ 'packages': ['somelib=1.0'],
+ 'dependencies': False
+ }
+ if piggyback_files is not None:
+ assert {str(path) for path in mocked_piggybacked_archives} == \
+ {path.relative_to(piggyback_files).as_posix()
+ for path in piggyback_files.rglob('*') if path.is_file()}
+
+ yield MockedPiggybacked
+
+ monkeypatch.setattr(local_apt, 'piggybacked_system',
+ mocked_piggybacked_system)
+
+ with TemporaryDirectory() as td:
+ td = Path(td)
+ yield
+
+@pytest.fixture
+def sample_source():
+ """Prepare a directory with sample Haketilo source package."""
+ with TemporaryDirectory() as td:
+ sample_source = Path(td) / 'hello'
+ for name, contents in src_files.items():
+ path = sample_source / name
+ path.parent.mkdir(parents=True, exist_ok=True)
+ path.write_bytes(contents)
+
+ yield sample_source
+
+variant_makers = []
+def variant_maker(function):
+ """Decorate function by placing it in variant_makers array."""
+ variant_makers.append(function)
+ return function
+
+@variant_maker
+def sample_source_change_index_json(monkeypatch, sample_source):
+ """
+ Return a non-standard path for index.json. Ensure parent directories exist.
+ """
+ # Use a path under sample_source so that it gets auto-deleted after the
+ # test. Use a file under .git because .git is ignored by REUSE.
+ path = sample_source / '.git' / 'replacement.json'
+ path.parent.mkdir()
+ return path
+
+@variant_maker
+def sample_source_add_comments(monkeypatch, sample_source):
+ """Add index.json comments that should be preserved."""
+ for dictionary in (index_obj, expected_source_description):
+ monkeypatch.setitem(dictionary, 'comment', 'index.json comment')
+
+ for i, dicts in enumerate(zip(index_obj['definitions'], expected)):
+ for dictionary in dicts:
+ monkeypatch.setitem(dictionary, 'comment', 'index.json comment')
+
+@variant_maker
+def sample_source_remove_spdx(monkeypatch, sample_source):
+ """Remove spdx report generation."""
+ monkeypatch.delitem(index_obj, 'reuse_generate_spdx_report')
+
+ for obj, key in [
+ (index_obj, 'copyright'),
+ *((definition, 'source_copyright') for definition in expected)
+ ]:
+ new_list = [r for r in obj[key] if r['file'] != 'report.spdx']
+ monkeypatch.setitem(obj, key, new_list)
+
+ monkeypatch.delitem(dist_files, 'report.spdx')
+
+ # To verify that reuse does not get called now, make mocked subprocess.run()
+ # raise an error if called.
+ (sample_source / 'mock_reuse_missing').touch()
+
+@variant_maker
+def sample_source_remove_additional_files(monkeypatch, sample_source):
+ """Use default value ([]) for 'additionall_files' property."""
+ monkeypatch.delitem(index_obj, 'additional_files')
+
+ for name in 'README.txt', 'README.txt.license', '.reuse/dep5':
+ monkeypatch.delitem(src_files, name)
+
+@variant_maker
+def sample_source_remove_script(monkeypatch, sample_source):
+ """Use default value ([]) for 'scripts' property in one of the resources."""
+ monkeypatch.delitem(index_obj['definitions'][1], 'scripts')
+
+ monkeypatch.setitem(expected_resources[1], 'scripts', [])
+
+ for files in dist_files, src_files:
+ monkeypatch.delitem(files, 'message.js')
+
+@variant_maker
+def sample_source_remove_payloads(monkeypatch, sample_source):
+ """Use default value ({}) for 'payloads' property in mapping."""
+ monkeypatch.delitem(index_obj['definitions'][2], 'payloads')
+
+ monkeypatch.setitem(expected_mapping, 'payloads', {})
+
+@variant_maker
+def sample_source_remove_uuids(monkeypatch, sample_source):
+ """Don't use UUIDs (they are optional)."""
+ for definition in index_obj['definitions']:
+ monkeypatch.delitem(definition, 'uuid')
+
+ for description in expected:
+ if 'uuid' in description:
+ monkeypatch.delitem(description, 'uuid')
+
+@variant_maker
+def sample_source_add_extra_props(monkeypatch, sample_source):
+ """Add some unrecognized properties that should be stripped."""
+ to_process = [index_obj]
+ while to_process:
+ processed = to_process.pop()
+
+ if type(processed) is list:
+ to_process.extend(processed)
+ elif type(processed) is dict and 'spurious_property' not in processed:
+ to_process.extend(v for k, v in processed.items()
+ if k != 'payloads')
+ monkeypatch.setitem(processed, 'spurious_property', 'some_value')
+
+piggyback_archive_names = [
+ 'apt/something.deb',
+ 'apt/something.orig.tar.gz',
+ 'apt/something.debian.tar.xz',
+ 'othersystem/other-something.tar.gz'
+]
+
+@variant_maker
+def sample_source_add_piggyback(monkeypatch, sample_source,
+ extra_build_args={}):
+ """Add piggybacked foreign system packages."""
+ old_build = build.Build
+ new_build = lambda *a, **kwa: old_build(*a, **kwa, **extra_build_args)
+ monkeypatch.setattr(build, 'Build', new_build)
+
+ monkeypatch.setitem(index_obj, 'piggyback_on', {
+ 'system': 'apt',
+ 'distribution': 'nabia',
+ 'packages': ['somelib=1.0'],
+ 'dependencies': False
+ })
+ schema = 'https://hydrilla.koszko.org/schemas/package_source-2.schema.json'
+ monkeypatch.setitem(index_obj, '$schema', schema)
+
+ new_refs = {}
+ for name in '.apt-root/.../copyright', '.apt-root/.../script.js':
+ contents = f'dummy {PurePosixPath(name).name}'.encode()
+ digest = sha256(contents).digest().hex()
+ monkeypatch.setitem(dist_files, name, contents)
+ monkeypatch.setitem(sha256_hashes, name, digest)
+ new_refs[PurePosixPath(name).name] = {'file': name, 'sha256': digest}
+
+ for obj in expected:
+ new_list = [*obj['source_copyright'], new_refs['copyright']]
+ monkeypatch.setitem(obj, 'source_copyright', new_list)
+
+ for obj in expected_resources:
+ new_list = [{'identifier': 'apt-common-licenses'}, *obj['dependencies']]
+ monkeypatch.setitem(obj, 'dependencies', new_list)
+
+ for obj in index_obj['definitions'][0], expected_resources[0]:
+ new_list = [new_refs['script.js'], *obj['scripts']]
+ monkeypatch.setitem(obj, 'scripts', new_list)
+
+ for name in piggyback_archive_names:
+ path = PurePosixPath('hello.foreign-packages') / name
+ monkeypatch.setitem(extra_archive_files, str(path),
+ f'dummy {path.name}'.encode())
+
+def prepare_foreign_packages_dir(path):
+ """
+ Put some dummy archive in the directory so that it can be passed to
+ piggybacked_system().
+ """
+ for name in piggyback_archive_names:
+ archive_path = path / name
+ archive_path.parent.mkdir(parents=True, exist_ok=True)
+ archive_path.write_text(f'dummy {archive_path.name}')
+
+@variant_maker
+def sample_source_add_piggyback_pass_archives(monkeypatch, sample_source):
+ """
+ Add piggybacked foreign system packages, use pre-downloaded foreign package
+ archives (have Build() find them in their default directory).
+ """
+ # Dir next to 'sample_source' will also be gc'd by sample_source() fixture.
+ foreign_packages_dir = sample_source.parent / 'arbitrary-name'
+
+ prepare_foreign_packages_dir(foreign_packages_dir)
+
+ sample_source_add_piggyback(monkeypatch, sample_source,
+ {'piggyback_files': foreign_packages_dir})
+
+@variant_maker
+def sample_source_add_piggyback_find_archives(monkeypatch, sample_source):
+ """
+ Add piggybacked foreign system packages, use pre-downloaded foreign package
+ archives (specify their directory as argument to Build()).
+ """
+ # Dir next to 'sample_source' will also be gc'd by sample_source() fixture.
+ foreign_packages_dir = sample_source.parent / 'hello.foreign-packages'
+
+ prepare_foreign_packages_dir(foreign_packages_dir)
+
+ sample_source_add_piggyback(monkeypatch, sample_source)
+
+@pytest.fixture(params=[lambda m, s: None, *variant_makers])
+def sample_source_make_variants(request, monkeypatch, sample_source,
+ mock_piggybacked_apt_system):
+ """
+ Prepare a directory with sample Haketilo source package in multiple slightly
+ different versions (all correct). Return an index.json path that should be
+ used when performing test build.
+ """
+ index_path = request.param(monkeypatch, sample_source) or Path('index.json')
+
+ index_text = json.dumps(index_obj)
+
+ (sample_source / index_path).write_text(index_text)
+
+ monkeypatch.setitem(src_files, 'index.json', index_text.encode())
+
+ return index_path
+
+@pytest.mark.subprocess_run(build, run_reuse)
+@pytest.mark.usefixtures('mock_subprocess_run')
+def test_build(sample_source, sample_source_make_variants, tmpdir):
+ """Build the sample source package and verify the produced files."""
+ index_json_path = sample_source_make_variants
+
+ # First, build the package
+ build.Build(sample_source, index_json_path).write_package_files(tmpdir)
+
+ # Verify directories under destination directory
+ assert {'file', 'resource', 'mapping', 'source'} == \
+ set([path.name for path in tmpdir.iterdir()])
+
+ # Verify files under 'file/'
+ file_dir = tmpdir / 'file' / 'sha256'
+
+ for name, contents in dist_files.items():
+ dist_file_path = file_dir / sha256_hashes[name]
+ assert dist_file_path.is_file()
+ assert dist_file_path.read_bytes() == contents
+
+ assert {p.name for p in file_dir.iterdir()} == \
+ {sha256_hashes[name] for name in dist_files.keys()}
+
+ # Verify files under 'resource/'
+ resource_dir = tmpdir / 'resource'
+
+ assert {rj['identifier'] for rj in expected_resources} == \
+ {path.name for path in resource_dir.iterdir()}
+
+ for resource_json in expected_resources:
+ subdir = resource_dir / resource_json['identifier']
+ assert ['2021.11.10'] == [path.name for path in subdir.iterdir()]
+
+ assert json.loads((subdir / '2021.11.10').read_text()) == resource_json
+
+ hydrilla_util.validator_for('api_resource_description-1.0.1.schema.json')\
+ .validate(resource_json)
+
+ # Verify files under 'mapping/'
+ mapping_dir = tmpdir / 'mapping'
+ assert ['helloapple'] == [path.name for path in mapping_dir.iterdir()]
+
+ subdir = mapping_dir / 'helloapple'
+ assert ['2021.11.10'] == [path.name for path in subdir.iterdir()]
+
+ assert json.loads((subdir / '2021.11.10').read_text()) == expected_mapping
+
+ hydrilla_util.validator_for('api_mapping_description-1.0.1.schema.json')\
+ .validate(expected_mapping)
+
+ # Verify files under 'source/'
+ source_dir = tmpdir / 'source'
+ assert {'hello.json', 'hello.zip'} == \
+ {path.name for path in source_dir.iterdir()}
+
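+    # The archive should contain all source files under the 'hello/' prefix,
+    # plus any extra files (e.g. piggybacked foreign package archives).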
+ archive_files = {**dict((f'hello/{name}', contents)
+ for name, contents in src_files.items()),
+ **extra_archive_files}
+
+    with ZipFile(source_dir / 'hello.zip', 'r') as archive:
+        assert len(archive.namelist()) == len(archive_files)
+
+ for name, contents in archive_files.items():
+ assert archive.read(name) == contents
+
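+    # The archive's hash is only known once it has been built; fill it into the
+    # expected source description before comparing.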
+ zip_ref = expected_source_description['source_archives']['zip']
+ zip_contents = (source_dir / 'hello.zip').read_bytes()
+ zip_ref['sha256'] = sha256(zip_contents).digest().hex()
+
+ assert json.loads((source_dir / 'hello.json').read_text()) == \
+ expected_source_description
+
+ hydrilla_util.validator_for('api_source_description-1.0.1.schema.json')\
+ .validate(expected_source_description)
+
+error_makers = []
+def error_maker(function):
+    """Decorate function by registering it in the error_makers list."""
+    error_makers.append(function)
+    return function
+
+@error_maker
+def sample_source_error_missing_file(monkeypatch, sample_source):
+ """
+ Modify index.json to expect missing report.spdx file and cause an error.
+ """
+ monkeypatch.delitem(index_obj, 'reuse_generate_spdx_report')
+ return FileNotFoundError
+
+@error_maker
+def sample_source_error_index_schema(monkeypatch, sample_source):
+ """Modify index.json to be incompliant with the schema."""
+ monkeypatch.delitem(index_obj, 'definitions')
+ return ValidationError
+
+@error_maker
+def sample_source_error_bad_comment(monkeypatch, sample_source):
+ """Modify index.json to have an invalid '/' in it."""
+ return json.JSONDecodeError, json.dumps(index_obj) + '/something\n'
+
+@error_maker
+def sample_source_error_bad_json(monkeypatch, sample_source):
+ """Modify index.json to not be valid json even after comment stripping."""
+ return json.JSONDecodeError, json.dumps(index_obj) + '???/\n'
+
+@error_maker
+def sample_source_error_missing_reuse(monkeypatch, sample_source):
+ """Cause mocked reuse process invocation to fail with FileNotFoundError."""
+ (sample_source / 'mock_reuse_missing').touch()
+ return build.ReuseError
+
+@error_maker
+def sample_source_error_missing_license(monkeypatch, sample_source):
+ """Remove a file to make package REUSE-incompliant."""
+ (sample_source / 'README.txt.license').unlink()
+ return build.ReuseError
+
+@error_maker
+def sample_source_error_file_outside(monkeypatch, sample_source):
+ """Make index.json illegally reference a file outside srcdir."""
+ new_list = [*index_obj['copyright'], {'file': '../abc'}]
+ monkeypatch.setitem(index_obj, 'copyright', new_list)
+ return FileReferenceError
+
+@error_maker
+def sample_source_error_reference_itself(monkeypatch, sample_source):
+ """Make index.json illegally reference index.json."""
+ new_list = [*index_obj['copyright'], {'file': 'index.json'}]
+ monkeypatch.setitem(index_obj, 'copyright', new_list)
+ return FileReferenceError
+
+@error_maker
+def sample_source_error_report_excluded(monkeypatch, sample_source):
+ """
+ Make index.json require generation of report.spdx but don't include it among
+ copyright files.
+ """
+ new_list = [file_ref for file_ref in index_obj['copyright']
+ if file_ref['file'] != 'report.spdx']
+ monkeypatch.setitem(index_obj, 'copyright', new_list)
+ return FileReferenceError
+
+@pytest.fixture(params=error_makers)
+def sample_source_make_errors(request, monkeypatch, sample_source):
+ """
+ Prepare a directory with sample Haketilo source package in multiple slightly
+ broken versions. Return an error type that should be raised when running
+ test build.
+ """
+ index_text = None
+ error_type = request.param(monkeypatch, sample_source)
+ if type(error_type) is tuple:
+ error_type, index_text = error_type
+
+ index_text = index_text or json.dumps(index_obj)
+
+ (sample_source / 'index.json').write_text(index_text)
+
+ monkeypatch.setitem(src_files, 'index.json', index_text.encode())
+
+ return error_type
+
+@pytest.mark.subprocess_run(build, run_reuse)
+@pytest.mark.usefixtures('mock_subprocess_run')
+def test_build_error(tmpdir, sample_source, sample_source_make_errors):
+ """Try building the sample source package and verify generated errors."""
+ error_type = sample_source_make_errors
+
+    dstdir = Path(tmpdir) / 'dstdir'
+    dstdir.mkdir(exist_ok=True)
+
+ with pytest.raises(error_type):
+ build.Build(sample_source, Path('index.json'))\
+ .write_package_files(dstdir)
diff --git a/tests/test_hydrilla_builder.py b/tests/test_hydrilla_builder.py
deleted file mode 100644
index 851b5cd..0000000
--- a/tests/test_hydrilla_builder.py
+++ /dev/null
@@ -1,472 +0,0 @@
-# SPDX-License-Identifier: CC0-1.0
-
-# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
-#
-# Available under the terms of Creative Commons Zero v1.0 Universal.
-
-# Enable using with Python 3.7.
-from __future__ import annotations
-
-import pytest
-import json
-import shutil
-
-from tempfile import TemporaryDirectory
-from pathlib import Path
-from hashlib import sha256, sha1
-from zipfile import ZipFile
-from typing import Callable, Optional, Iterable
-
-from jsonschema import ValidationError
-
-from hydrilla import util as hydrilla_util
-from hydrilla.builder import build, _version
-
-here = Path(__file__).resolve().parent
-
-expected_generated_by = {
- 'name': 'hydrilla.builder',
- 'version': _version.version
-}
-
-default_srcdir = here / 'source-package-example'
-
-default_js_filenames = ['bye.js', 'hello.js', 'message.js']
-default_dist_filenames = [*default_js_filenames, 'LICENSES/CC0-1.0.txt']
-default_src_filenames = [
- *default_dist_filenames,
- 'README.txt', 'README.txt.license', '.reuse/dep5', 'index.json'
-]
-
-default_sha1_hashes = {}
-default_sha256_hashes = {}
-default_contents = {}
-
-for fn in default_src_filenames:
- with open(default_srcdir / fn, 'rb') as file_handle:
- default_contents[fn] = file_handle.read()
- default_sha256_hashes[fn] = sha256(default_contents[fn]).digest().hex()
- default_sha1_hashes[fn] = sha1(default_contents[fn]).digest().hex()
-
-class CaseSettings:
- """Gather parametrized values in a class."""
- def __init__(self):
- """Init CaseSettings with default values."""
- self.srcdir = default_srcdir
- self.index_json_path = Path('index.json')
- self.report_spdx_included = True
-
- self.js_filenames = default_js_filenames.copy()
- self.dist_filenames = default_dist_filenames.copy()
- self.src_filenames = default_src_filenames.copy()
-
- self.sha1_hashes = default_sha1_hashes.copy()
- self.sha256_hashes = default_sha256_hashes.copy()
- self.contents = default_contents.copy()
-
- self.expected_resources = [{
- '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json',
- 'source_name': 'hello',
- 'source_copyright': [{
- 'file': 'report.spdx',
- 'sha256': '!!!!value to fill during test!!!!'
- }, {
- 'file': 'LICENSES/CC0-1.0.txt',
- 'sha256': self.sha256_hashes['LICENSES/CC0-1.0.txt']
- }],
- 'type': 'resource',
- 'identifier': 'helloapple',
- 'long_name': 'Hello Apple',
- 'uuid': 'a6754dcb-58d8-4b7a-a245-24fd7ad4cd68',
- 'version': [2021, 11, 10],
- 'revision': 1,
- 'description': 'greets an apple',
- 'dependencies': [{'identifier': 'hello-message'}],
- 'scripts': [{
- 'file': 'hello.js',
- 'sha256': self.sha256_hashes['hello.js']
- }, {
- 'file': 'bye.js',
- 'sha256': self.sha256_hashes['bye.js']
- }],
- 'generated_by': expected_generated_by
- }, {
- '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json',
- 'source_name': 'hello',
- 'source_copyright': [{
- 'file': 'report.spdx',
- 'sha256': '!!!!value to fill during test!!!!'
- }, {
- 'file': 'LICENSES/CC0-1.0.txt',
- 'sha256': self.sha256_hashes['LICENSES/CC0-1.0.txt']
- }],
- 'type': 'resource',
- 'identifier': 'hello-message',
- 'long_name': 'Hello Message',
- 'uuid': '1ec36229-298c-4b35-8105-c4f2e1b9811e',
- 'version': [2021, 11, 10],
- 'revision': 2,
- 'description': 'define messages for saying hello and bye',
- 'dependencies': [],
- 'scripts': [{
- 'file': 'message.js',
- 'sha256': self.sha256_hashes['message.js']
- }],
- 'generated_by': expected_generated_by
- }]
- self.expected_mapping = {
- '$schema': 'https://hydrilla.koszko.org/schemas/api_mapping_description-1.schema.json',
- 'source_name': 'hello',
- 'source_copyright': [{
- 'file': 'report.spdx',
- 'sha256': '!!!!value to fill during test!!!!'
- }, {
- 'file': 'LICENSES/CC0-1.0.txt',
- 'sha256': self.sha256_hashes['LICENSES/CC0-1.0.txt']
- }],
- 'type': 'mapping',
- 'identifier': 'helloapple',
- 'long_name': 'Hello Apple',
- 'uuid': '54d23bba-472e-42f5-9194-eaa24c0e3ee7',
- 'version': [2021, 11, 10],
- 'description': 'causes apple to get greeted on Hydrillabugs issue tracker',
- 'payloads': {
- 'https://hydrillabugs.koszko.org/***': {
- 'identifier': 'helloapple'
- },
- 'https://hachettebugs.koszko.org/***': {
- 'identifier': 'helloapple'
- }
- },
- 'generated_by': expected_generated_by
- }
- self.expected_source_description = {
- '$schema': 'https://hydrilla.koszko.org/schemas/api_source_description-1.schema.json',
- 'source_name': 'hello',
- 'source_copyright': [{
- 'file': 'report.spdx',
- 'sha256': '!!!!value to fill during test!!!!'
- }, {
- 'file': 'LICENSES/CC0-1.0.txt',
- 'sha256': self.sha256_hashes['LICENSES/CC0-1.0.txt']
- }],
- 'source_archives': {
- 'zip': {
- 'sha256': '!!!!value to fill during test!!!!',
- }
- },
- 'upstream_url': 'https://git.koszko.org/hydrilla-source-package-example',
- 'definitions': [{
- 'type': 'resource',
- 'identifier': 'helloapple',
- 'long_name': 'Hello Apple',
- 'version': [2021, 11, 10],
- }, {
- 'type': 'resource',
- 'identifier': 'hello-message',
- 'long_name': 'Hello Message',
- 'version': [2021, 11, 10],
- }, {
- 'type': 'mapping',
- 'identifier': 'helloapple',
- 'long_name': 'Hello Apple',
- 'version': [2021, 11, 10],
- }],
- 'generated_by': expected_generated_by
- }
-
- def expected(self) -> list[dict]:
- """
- Convenience method to get a list of expected jsons of 2 resources,
- 1 mapping and 1 source description we have.
- """
- return [
- *self.expected_resources,
- self.expected_mapping,
- self.expected_source_description
- ]
-
-ModifyCb = Callable[[CaseSettings, dict], Optional[str]]
-
-def prepare_modified(tmpdir: Path, modify_cb: ModifyCb) -> CaseSettings:
- """
- Use sample source package directory with an alternative, modified
- index.json.
- """
- settings = CaseSettings()
-
- for fn in settings.src_filenames:
- copy_path = tmpdir / 'srcdir_copy' / fn
- copy_path.parent.mkdir(parents=True, exist_ok=True)
- shutil.copy(settings.srcdir / fn, copy_path)
-
- settings.srcdir = tmpdir / 'srcdir_copy'
-
- with open(settings.srcdir / 'index.json', 'rt') as file_handle:
- obj = json.loads(hydrilla_util.strip_json_comments(file_handle.read()))
-
- contents = modify_cb(settings, obj)
-
- # Replace the other index.json with new one
- settings.index_json_path = tmpdir / 'replacement.json'
-
- if contents is None:
- contents = json.dumps(obj)
-
- contents = contents.encode()
-
- settings.contents['index.json'] = contents
-
- settings.sha256_hashes['index.json'] = sha256(contents).digest().hex()
- settings.sha1_hashes['index.json'] = sha1(contents).digest().hex()
-
- with open(settings.index_json_path, 'wb') as file_handle:
- file_handle.write(contents)
-
- return settings
-
-@pytest.fixture()
-def tmpdir() -> Iterable[str]:
- with TemporaryDirectory() as tmpdir:
- yield tmpdir
-
-def prepare_default(tmpdir: Path) -> CaseSettings:
- """Use sample source package directory as exists in VCS."""
- return CaseSettings()
-
-def modify_index_good(settings: CaseSettings, obj: dict) -> None:
- """
- Modify index.json object to make a slightly different but *also correct* one
- that can be used to test some different cases.
- """
- # Add comments that should be preserved.
- for dictionary in (obj, settings.expected_source_description):
- dictionary['comment'] = 'index_json comment'
-
- for i, dicts in enumerate(zip(obj['definitions'], settings.expected())):
- for dictionary in dicts:
- dictionary['comment'] = f'item {i}'
-
- # Remove spdx report generation
- del obj['reuse_generate_spdx_report']
- obj['copyright'].remove({'file': 'report.spdx'})
-
- settings.report_spdx_included = False
-
- for json_description in settings.expected():
- json_description['source_copyright'] = \
- [fr for fr in json_description['source_copyright']
- if fr['file'] != 'report.spdx']
-
- # Use default value ([]) for 'additionall_files' property
- del obj['additional_files']
-
- settings.src_filenames = [*settings.dist_filenames, 'index.json']
-
- # Use default value ([]) for 'scripts' property in one of the resources
- del obj['definitions'][1]['scripts']
-
- settings.expected_resources[1]['scripts'] = []
-
- for prefix in ('js', 'dist', 'src'):
- getattr(settings, f'{prefix}_filenames').remove('message.js')
-
- # Use default value ({}) for 'pyloads' property in mapping
- del obj['definitions'][2]['payloads']
-
- settings.expected_mapping['payloads'] = {}
-
- # Don't use UUIDs (they are optional)
- for definition in obj['definitions']:
- del definition['uuid']
-
- for description in settings.expected():
- if 'uuid' in description:
- del description['uuid']
-
- # Add some unrecognized properties that should be stripped
- to_process = [obj]
- while to_process:
- processed = to_process.pop()
-
- if type(processed) is list:
- to_process.extend(processed)
- elif type(processed) is dict and 'spurious_property' not in processed:
- to_process.extend(processed.values())
- processed['spurious_property'] = 'some value'
-
-@pytest.mark.parametrize('prepare_source_example', [
- prepare_default,
- lambda tmpdir: prepare_modified(tmpdir, modify_index_good)
-])
-def test_build(tmpdir, prepare_source_example):
- """Build the sample source package and verify the produced files."""
- # First, build the package
- dstdir = Path(tmpdir) / 'dstdir'
- tmpdir = Path(tmpdir) / 'example'
-
- dstdir.mkdir(exist_ok=True)
- tmpdir.mkdir(exist_ok=True)
-
- settings = prepare_source_example(tmpdir)
-
- build.Build(settings.srcdir, settings.index_json_path)\
- .write_package_files(dstdir)
-
- # Verify directories under destination directory
- assert {'file', 'resource', 'mapping', 'source'} == \
- set([path.name for path in dstdir.iterdir()])
-
- # Verify files under 'file/'
- file_dir = dstdir / 'file' / 'sha256'
-
- for fn in settings.dist_filenames:
- dist_file_path = file_dir / settings.sha256_hashes[fn]
- assert dist_file_path.is_file()
-
- assert dist_file_path.read_bytes() == settings.contents[fn]
-
- sha256_hashes_set = set([settings.sha256_hashes[fn]
- for fn in settings.dist_filenames])
-
- spdx_report_sha256 = None
-
- for path in file_dir.iterdir():
- if path.name in sha256_hashes_set:
- continue
-
- assert spdx_report_sha256 is None and settings.report_spdx_included
-
- with open(path, 'rt') as file_handle:
- spdx_contents = file_handle.read()
-
- spdx_report_sha256 = sha256(spdx_contents.encode()).digest().hex()
- assert spdx_report_sha256 == path.name
-
- for fn in settings.src_filenames:
- if not any([n in fn.lower() for n in ('license', 'reuse')]):
- assert settings.sha1_hashes[fn]
-
- if settings.report_spdx_included:
- assert spdx_report_sha256
- for obj in settings.expected():
- for file_ref in obj['source_copyright']:
- if file_ref['file'] == 'report.spdx':
- file_ref['sha256'] = spdx_report_sha256
-
- # Verify files under 'resource/'
- resource_dir = dstdir / 'resource'
-
- assert set([rj['identifier'] for rj in settings.expected_resources]) == \
- set([path.name for path in resource_dir.iterdir()])
-
- for resource_json in settings.expected_resources:
- subdir = resource_dir / resource_json['identifier']
- assert ['2021.11.10'] == [path.name for path in subdir.iterdir()]
-
- with open(subdir / '2021.11.10', 'rt') as file_handle:
- assert json.load(file_handle) == resource_json
-
- hydrilla_util.validator_for('api_resource_description-1.0.1.schema.json')\
- .validate(resource_json)
-
- # Verify files under 'mapping/'
- mapping_dir = dstdir / 'mapping'
- assert ['helloapple'] == [path.name for path in mapping_dir.iterdir()]
-
- subdir = mapping_dir / 'helloapple'
- assert ['2021.11.10'] == [path.name for path in subdir.iterdir()]
-
- with open(subdir / '2021.11.10', 'rt') as file_handle:
- assert json.load(file_handle) == settings.expected_mapping
-
- hydrilla_util.validator_for('api_mapping_description-1.0.1.schema.json')\
- .validate(settings.expected_mapping)
-
- # Verify files under 'source/'
- source_dir = dstdir / 'source'
- assert {'hello.json', 'hello.zip'} == \
- set([path.name for path in source_dir.iterdir()])
-
- zip_filenames = [f'hello/{fn}' for fn in settings.src_filenames]
-
- with ZipFile(source_dir / 'hello.zip', 'r') as archive:
- assert set([f.filename for f in archive.filelist]) == set(zip_filenames)
-
- for zip_fn, src_fn in zip(zip_filenames, settings.src_filenames):
- with archive.open(zip_fn, 'r') as zip_file_handle:
- assert zip_file_handle.read() == settings.contents[src_fn]
-
- zip_ref = settings.expected_source_description['source_archives']['zip']
- with open(source_dir / 'hello.zip', 'rb') as file_handle:
- zip_ref['sha256'] = sha256(file_handle.read()).digest().hex()
-
- with open(source_dir / 'hello.json', 'rt') as file_handle:
- assert json.load(file_handle) == settings.expected_source_description
-
- hydrilla_util.validator_for('api_source_description-1.0.1.schema.json')\
- .validate(settings.expected_source_description)
-
-def modify_index_missing_file(dummy: CaseSettings, obj: dict) -> None:
- """
- Modify index.json to expect missing report.spdx file and cause an error.
- """
- del obj['reuse_generate_spdx_report']
-
-def modify_index_schema_error(dummy: CaseSettings, obj: dict) -> None:
- """Modify index.json to be incompliant with the schema."""
- del obj['definitions']
-
-def modify_index_bad_comment(dummy: CaseSettings, obj: dict) -> str:
- """Modify index.json to have an invalid '/' in it."""
- return json.dumps(obj) + '/something\n'
-
-def modify_index_bad_json(dummy: CaseSettings, obj: dict) -> str:
- """Modify index.json to not be valid json even after comment stripping."""
- return json.dumps(obj) + '???/\n'
-
-def modify_index_missing_license(settings: CaseSettings, obj: dict) -> None:
- """Remove a file to make package REUSE-incompliant."""
- (settings.srcdir / 'README.txt.license').unlink()
-
-def modify_index_file_outside(dummy: CaseSettings, obj: dict) -> None:
- """Make index.json illegally reference a file outside srcdir."""
- obj['copyright'].append({'file': '../abc'})
-
-def modify_index_reference_itself(dummy: CaseSettings, obj: dict) -> None:
- """Make index.json illegally reference index.json."""
- obj['copyright'].append({'file': 'index.json'})
-
-def modify_index_report_excluded(dummy: CaseSettings, obj: dict) -> None:
- """
- Make index.json require generation of index.json but not include it among
- copyright files.
- """
- obj['copyright'] = [fr for fr in obj['copyright']
- if fr['file'] != 'report.spdx']
-
-@pytest.mark.parametrize('break_index_json', [
- (modify_index_missing_file, FileNotFoundError),
- (modify_index_schema_error, ValidationError),
- (modify_index_bad_comment, json.JSONDecodeError),
- (modify_index_bad_json, json.JSONDecodeError),
- (modify_index_missing_license, build.ReuseError),
- (modify_index_file_outside, build.FileReferenceError),
- (modify_index_reference_itself, build.FileReferenceError),
- (modify_index_report_excluded, build.FileReferenceError)
-])
-def test_build_error(tmpdir: str, break_index_json: tuple[ModifyCb, type]):
- """Build the sample source package and verify the produced files."""
- dstdir = Path(tmpdir) / 'dstdir'
- tmpdir = Path(tmpdir) / 'example'
-
- dstdir.mkdir(exist_ok=True)
- tmpdir.mkdir(exist_ok=True)
-
- modify_cb, error_type = break_index_json
-
- settings = prepare_modified(tmpdir, modify_cb)
-
- with pytest.raises(error_type):
- build.Build(settings.srcdir, settings.index_json_path)\
- .write_package_files(dstdir)
diff --git a/tests/test_local_apt.py b/tests/test_local_apt.py
new file mode 100644
index 0000000..4f3a831
--- /dev/null
+++ b/tests/test_local_apt.py
@@ -0,0 +1,651 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+import pytest
+import tempfile
+import re
+import json
+from pathlib import Path, PurePosixPath
+from zipfile import ZipFile
+from tempfile import TemporaryDirectory
+
+from hydrilla.builder import local_apt
+from hydrilla.builder.common_errors import *
+
+here = Path(__file__).resolve().parent
+
+from .helpers import *
+
+@pytest.fixture
+def mock_cache_dir(monkeypatch):
+ """Make local_apt.py cache files to a temporary directory."""
+ with tempfile.TemporaryDirectory() as td:
+ td_path = Path(td)
+ monkeypatch.setattr(local_apt, 'default_apt_cache_dir', td_path)
+ yield td_path
+
+@pytest.fixture
+def mock_gnupg_import(monkeypatch, mock_cache_dir):
+ """Mock gnupg library when imported dynamically."""
+
+ gnupg_mock_dir = mock_cache_dir / 'gnupg_mock'
+ gnupg_mock_dir.mkdir()
+ (gnupg_mock_dir / 'gnupg.py').write_text('GPG = None\n')
+
+ monkeypatch.syspath_prepend(str(gnupg_mock_dir))
+
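+    # With the stub prepended to sys.path, this import succeeds even when the
+    # real python-gnupg library is not installed.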
+ import gnupg
+
+ keyring_path = mock_cache_dir / 'master_keyring.gpg'
+
+ class MockedImportResult:
+ """gnupg.ImportResult replacement"""
+ def __init__(self):
+ """Initialize MockedImportResult object."""
+ self.imported = 1
+
+ class MockedGPG:
+ """GPG replacement that does not really invoke GPG."""
+ def __init__(self, keyring):
+ """Verify the keyring path and initialize MockedGPG."""
+ assert keyring == str(keyring_path)
+
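+            # "Received" keys are persisted in a file so that separate
+            # MockedGPG instances share state.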
+ self.known_keys = {*keyring_path.read_text().split('\n')} \
+ if keyring_path.exists() else set()
+
+ def recv_keys(self, keyserver, key):
+ """Mock key receiving - record requested key as received."""
+ assert keyserver == local_apt.default_keyserver
+ assert key not in self.known_keys
+
+ self.known_keys.add(key)
+ keyring_path.write_text('\n'.join(self.known_keys))
+
+ return MockedImportResult()
+
+ def list_keys(self, keys=None):
+ """Mock key listing - return a list with dummy items."""
+ if keys is None:
+ return ['dummy'] * len(self.known_keys)
+ else:
+ return ['dummy' for k in keys if k in self.known_keys]
+
+ def export_keys(self, keys, **kwargs):
+ """
+ Mock key export - check that the call has the expected arguments and
+ return a dummy bytes array.
+ """
+ assert kwargs['armor'] == False
+ assert kwargs['minimal'] == True
+ assert {*keys} == self.known_keys
+
+ return b'<dummy keys export>'
+
+ monkeypatch.setattr(gnupg, 'GPG', MockedGPG)
+
+def process_run_args(command, kwargs, expected_command):
+ """
+ Perform assertions common to all mocked subprocess.run() invocations and
+ extract variable parts of the command line (if any).
+ """
+ assert kwargs['env'] == {'LANG': 'en_US'}
+ assert kwargs['capture_output'] == True
+
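+    # process_command() (defined in tests/helpers.py) asserts the command
+    # matches the expected one and extracts its '<placeholder>' parts.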
+ return process_command(command, expected_command)
+
+def run_apt_get_update(command, returncode=0, **kwargs):
+ """
+ Instead of running an 'apt-get update' command just touch some file in apt
+ root to indicate that the call was made.
+ """
+ expected = ['apt-get', '-c', '<conf_path>', 'update']
+ conf_path = Path(process_run_args(command, kwargs, expected)['conf_path'])
+
+ (conf_path.parent / 'update_called').touch()
+
+ return MockedCompletedProcess(command, returncode,
+ text_output=kwargs.get('text'))
+
+"""
+Output of 'apt-get install --yes --just-print libjs-mathjax' on some APT-based
+system.
+"""
+sample_install_stdout = '''\
+NOTE: This is only a simulation!
+ apt-get needs root privileges for real execution.
+ Keep also in mind that locking is deactivated,
+ so don't depend on the relevance to the real current situation!
+Reading package lists...
+Building dependency tree...
+Reading state information...
+The following additional packages will be installed:
+ fonts-mathjax
+Suggested packages:
+ fonts-mathjax-extras fonts-stix libjs-mathjax-doc
+The following NEW packages will be installed:
+ fonts-mathjax libjs-mathjax
+0 upgraded, 2 newly installed, 0 to remove and 0 not upgraded.
+Inst fonts-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all])
+Inst libjs-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all])
+Conf fonts-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all])
+Conf libjs-mathjax (2.7.9+dfsg-1 Devuan:4.0/stable, Devuan:1.0.0/unstable [all])
+'''
+
+def run_apt_get_install(command, returncode=0, **kwargs):
+ """
+ Instead of running an 'apt-get install' command just print a possible
+ output of one.
+ """
+ expected = ['apt-get', '-c', '<conf_path>', 'install',
+ '--yes', '--just-print', 'libjs-mathjax']
+
+ conf_path = Path(process_run_args(command, kwargs, expected)['conf_path'])
+
+ return MockedCompletedProcess(command, returncode,
+ stdout=sample_install_stdout,
+ text_output=kwargs.get('text'))
+
+def run_apt_get_download(command, returncode=0, **kwargs):
+ """
+ Instead of running an 'apt-get download' command just write some dummy
+ .deb to the appropriate directory.
+ """
+ expected = ['apt-get', '-c', '<conf_path>', 'download', 'libjs-mathjax']
+ if 'fonts-mathjax' in command:
+ expected.insert(-1, 'fonts-mathjax')
+
+ conf_path = Path(process_run_args(command, kwargs, expected)['conf_path'])
+
+ destination = Path(kwargs.get('cwd') or Path.cwd())
+
+ for word in expected:
+ if word.endswith('-mathjax'):
+ deb_path = destination / f'{word}_2.7.9+dfsg-1_all.deb'
+ deb_path.write_text(f'dummy {deb_path.name}')
+
+ return MockedCompletedProcess(command, returncode,
+ text_output=kwargs.get('text'))
+
+def run_apt_get_source(command, returncode=0, **kwargs):
+ """
+ Instead of running an 'apt-get source' command just write some dummy
+ "tarballs" to the appropriate directory.
+ """
+ expected = ['apt-get', '-c', '<conf_path>', 'source',
+ '--download-only', 'libjs-mathjax=2.7.9+dfsg-1']
+ if 'fonts-mathjax=2.7.9+dfsg-1' in command:
+ if command[-1] == 'fonts-mathjax=2.7.9+dfsg-1':
+ expected.append('fonts-mathjax=2.7.9+dfsg-1')
+ else:
+ expected.insert(-1, 'fonts-mathjax=2.7.9+dfsg-1')
+
+ destination = Path(kwargs.get('cwd') or Path.cwd())
+ for filename in [
+ 'mathjax_2.7.9+dfsg-1.debian.tar.xz',
+ 'mathjax_2.7.9+dfsg-1.dsc',
+ 'mathjax_2.7.9+dfsg.orig.tar.xz'
+ ]:
+ (destination / filename).write_text(f'dummy {filename}')
+
+ return MockedCompletedProcess(command, returncode,
+ text_output=kwargs.get('text'))
+
+def make_run_apt_get(**returncodes):
+ """
+ Produce a function that chooses and runs the appropriate one of
+ subprocess_run_apt_get_*() mock functions.
+ """
+ def mock_run(command, **kwargs):
+ """
+ Chooses and runs the appropriate one of subprocess_run_apt_get_*() mock
+ functions.
+ """
+ for subcommand, run in [
+ ('update', run_apt_get_update),
+ ('install', run_apt_get_install),
+ ('download', run_apt_get_download),
+ ('source', run_apt_get_source)
+ ]:
+ if subcommand in command:
+ returncode = returncodes.get(f'{subcommand}_code', 0)
+ return run(command, returncode, **kwargs)
+
+ raise Exception('Unknown command: {}'.format(' '.join(command)))
+
+ return mock_run
+
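+# For example, make_run_apt_get(update_code=1) yields a mock whose
+# 'apt-get update' invocation fails while the other subcommands succeed.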
+@pytest.mark.subprocess_run(local_apt, make_run_apt_get())
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_contextmanager(mock_cache_dir):
+ """
+ Verify that the local_apt() function creates a proper apt environment and
+ that it also properly restores it from cache.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+
+ with local_apt.local_apt(sources_list, local_apt.default_keys) as apt:
+ apt_root = Path(apt.apt_conf).parent.parent
+
+ assert (apt_root / 'etc' / 'trusted.gpg').read_bytes() == \
+ b'<dummy keys export>'
+
+ assert (apt_root / 'etc' / 'update_called').exists()
+
+ assert (apt_root / 'etc' / 'apt.sources.list').read_text() == \
+ 'deb-src sth\ndeb sth'
+
+ conf_lines = (apt_root / 'etc' / 'apt.conf').read_text().split('\n')
+
+ # check mocked keyring
+ assert {*local_apt.default_keys} == \
+ {*(mock_cache_dir / 'master_keyring.gpg').read_text().split('\n')}
+
+ assert not apt_root.exists()
+
+ expected_conf = {
+ 'Dir': str(apt_root),
+ 'Dir::State': f'{apt_root}/var/lib/apt',
+ 'Dir::State::status': f'{apt_root}/var/lib/dpkg/status',
+ 'Dir::Etc::SourceList': f'{apt_root}/etc/apt.sources.list',
+ 'Dir::Etc::SourceParts': '',
+ 'Dir::Cache': f'{apt_root}/var/cache/apt',
+ 'pkgCacheGen::Essential': 'none',
+ 'Dir::Etc::Trusted': f'{apt_root}/etc/trusted.gpg',
+ }
+
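+    # Each relevant apt.conf line has the form: Key "value";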
+ conf_regex = re.compile(r'^(?P<key>\S+)\s"(?P<val>\S*)";$')
+ assert dict([(m.group('key'), m.group('val'))
+ for l in conf_lines if l for m in [conf_regex.match(l)]]) == \
+ expected_conf
+
+ with ZipFile(mock_cache_dir / f'apt_{sources_list.identity()}.zip') as zf:
+ # reuse the same APT, its cached zip file should exist now
+ with local_apt.local_apt(sources_list, local_apt.default_keys) as apt:
+ apt_root = Path(apt.apt_conf).parent.parent
+
+ expected_members = {*apt_root.rglob('*')}
+ expected_members.remove(apt_root / 'etc' / 'apt.conf')
+ expected_members.remove(apt_root / 'etc' / 'trusted.gpg')
+
+ names = zf.namelist()
+ assert len(names) == len(expected_members)
+
+ for name in names:
+ path = apt_root / name
+ assert path in expected_members
+ assert zf.read(name) == \
+ (b'' if path.is_dir() else path.read_bytes())
+
+ assert not apt_root.exists()
+
+@pytest.mark.subprocess_run(local_apt, run_missing_executable)
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_missing(mock_cache_dir):
+ """
+ Verify that the local_apt() function raises a proper error when 'apt-get'
+ command is missing.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+
+ with pytest.raises(local_apt.AptError) as excinfo:
+ with local_apt.local_apt(sources_list, local_apt.default_keys) as apt:
+ pass
+
+ assert len(excinfo.value.args) == 1
+ assert isinstance(excinfo.value.args[0], str)
+ assert '\n' not in excinfo.value.args[0]
+
+@pytest.mark.subprocess_run(local_apt, make_run_apt_get(update_code=1))
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_update_fail(mock_cache_dir):
+ """
+ Verify that the local_apt() function raises a proper error when
+ 'apt-get update' command returns non-0.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+
+ with pytest.raises(local_apt.AptError) as excinfo:
+ with local_apt.local_apt(sources_list, local_apt.default_keys) as apt:
+ pass
+
+ assert len(excinfo.value.args) == 1
+
+ assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output',
+ excinfo.value.args[0])
+
+@pytest.mark.subprocess_run(local_apt, make_run_apt_get())
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_download(mock_cache_dir):
+ """
+ Verify that download_apt_packages() function properly performs the download
+ of .debs and sources.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+ destination = mock_cache_dir / 'destination'
+ destination.mkdir()
+
+ local_apt.download_apt_packages(sources_list, local_apt.default_keys,
+ ['libjs-mathjax'], destination)
+
+ libjs_mathjax_path = destination / 'libjs-mathjax_2.7.9+dfsg-1_all.deb'
+ fonts_mathjax_path = destination / 'fonts-mathjax_2.7.9+dfsg-1_all.deb'
+
+ source_paths = [
+ destination / 'mathjax_2.7.9+dfsg-1.debian.tar.xz',
+ destination / 'mathjax_2.7.9+dfsg-1.dsc',
+ destination / 'mathjax_2.7.9+dfsg.orig.tar.xz'
+ ]
+
+ assert {*destination.iterdir()} == {libjs_mathjax_path, *source_paths}
+
+ local_apt.download_apt_packages(sources_list, local_apt.default_keys,
+ ['libjs-mathjax'], destination,
+ with_deps=True)
+
+ assert {*destination.iterdir()} == \
+ {libjs_mathjax_path, fonts_mathjax_path, *source_paths}
+
+@pytest.mark.subprocess_run(local_apt, make_run_apt_get(install_code=1))
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_install_fail(mock_cache_dir):
+ """
+ Verify that the download_apt_packages() function raises a proper error when
+ 'apt-get install' command returns non-0.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+ destination = mock_cache_dir / 'destination'
+ destination.mkdir()
+
+ with pytest.raises(local_apt.AptError) as excinfo:
+ local_apt.download_apt_packages(sources_list, local_apt.default_keys,
+ ['libjs-mathjax'], destination,
+ with_deps=True)
+
+ assert len(excinfo.value.args) == 1
+
+ assert re.match(r'^.*\n\n.*\n\n', excinfo.value.args[0])
+ assert re.search(r'\n\nsome error output$', excinfo.value.args[0])
+ assert sample_install_stdout in excinfo.value.args[0]
+
+ assert [*destination.iterdir()] == []
+
+@pytest.mark.subprocess_run(local_apt, make_run_apt_get(download_code=1))
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_download_fail(mock_cache_dir):
+ """
+ Verify that the download_apt_packages() function raises a proper error when
+ 'apt-get download' command returns non-0.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+ destination = mock_cache_dir / 'destination'
+ destination.mkdir()
+
+ with pytest.raises(local_apt.AptError) as excinfo:
+ local_apt.download_apt_packages(sources_list, local_apt.default_keys,
+ ['libjs-mathjax'], destination)
+
+ assert len(excinfo.value.args) == 1
+
+ assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output',
+ excinfo.value.args[0])
+
+ assert [*destination.iterdir()] == []
+
+@pytest.mark.subprocess_run(local_apt, make_run_apt_get(source_code=1))
+@pytest.mark.usefixtures('mock_subprocess_run', 'mock_gnupg_import')
+def test_local_apt_source_fail(mock_cache_dir):
+ """
+ Verify that the download_apt_packages() function raises a proper error when
+ 'apt-get source' command returns non-0.
+ """
+ sources_list = local_apt.SourcesList(['deb-src sth', 'deb sth'])
+ destination = mock_cache_dir / 'destination'
+ destination.mkdir()
+
+ with pytest.raises(local_apt.AptError) as excinfo:
+ local_apt.download_apt_packages(sources_list, local_apt.default_keys,
+ ['libjs-mathjax'], destination)
+
+ assert len(excinfo.value.args) == 1
+
+ assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output',
+ excinfo.value.args[0])
+
+ assert [*destination.iterdir()] == []
+
+def test_sources_list():
+ """Verify that the SourcesList class works properly."""
+    sources_list = local_apt.SourcesList([], 'nabia')
+    assert sources_list.identity() == 'nabia'
+
+ with pytest.raises(local_apt.DistroError):
+ local_apt.SourcesList([], 'nabiaƂ')
+
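+    # With actual entries, the identity is a hex digest derived from the
+    # sources list contents.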
+    sources_list = local_apt.SourcesList(['deb sth', 'deb-src sth'], 'nabia')
+    assert sources_list.identity() == \
+        'ef28d408b96046eae45c8ab3094ce69b2ac0c02a887e796b1d3d1a4f06fb49f1'
+
+def run_dpkg_deb(command, returncode=0, **kwargs):
+ """
+ Insted of running an 'dpkg-deb -x' command just create some dummy file
+ in the destination directory.
+ """
+ expected = ['dpkg-deb', '-x', '<deb_path>', '<dst_path>']
+
+ variables = process_run_args(command, kwargs, expected)
+ deb_path = Path(variables['deb_path'])
+ dst_path = Path(variables['dst_path'])
+
+ package_name = re.match('^([^_]+)_.*', deb_path.name).group(1)
+ for path in [
+ dst_path / 'etc' / f'dummy_{package_name}_config',
+ dst_path / 'usr/share/doc' / package_name / 'copyright'
+ ]:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ path.write_text(f'dummy {path.name}')
+
+ return MockedCompletedProcess(command, returncode,
+ text_output=kwargs.get('text'))
+
+def download_apt_packages(list, keys, packages, destination_dir,
+ with_deps=False):
+ """
+ Replacement for download_apt_packages() function in local_apt.py, for
+ unit-testing the piggybacked_system() function.
+ """
+ for path in [
+ destination_dir / 'some-bin-package_1.1-2_all.deb',
+ destination_dir / 'another-package_1.1-2_all.deb',
+ destination_dir / 'some-source-package_1.1.orig.tar.gz',
+ destination_dir / 'some-source-package_1.1-1.dsc'
+ ]:
+ path.write_text(f'dummy {path.name}')
+
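+    # Record the call's arguments so that tests can verify them later.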
+ with open(destination_dir / 'test_data.json', 'w') as out:
+ json.dump({
+ 'list_identity': list.identity(),
+ 'keys': keys,
+ 'packages': packages,
+ 'with_deps': with_deps
+ }, out)
+
+@pytest.fixture
+def mock_download_packages(monkeypatch):
+ """Mock the download_apt_packages() function in local_apt.py."""
+ monkeypatch.setattr(local_apt, 'download_apt_packages',
+ download_apt_packages)
+
+@pytest.mark.subprocess_run(local_apt, run_dpkg_deb)
+@pytest.mark.parametrize('params', [
+ {
+ 'with_deps': False,
+ 'base_depends': True,
+ 'identity': 'nabia',
+ 'props': {'distribution': 'nabia', 'dependencies': False},
+ 'all_keys': local_apt.default_keys
+ },
+ {
+ 'with_deps': True,
+ 'base_depends': False,
+ 'identity': '38db0b4fa2f6610cd1398b66a2c05d9abb1285f9a055a96eb96dee0f6b72aca8',
+ 'props': {
+ 'sources_list': [f'deb{suf} http://example.com/ stable main'
+ for suf in ('', '-src')],
+ 'trusted_keys': ['AB' * 20],
+ 'dependencies': True,
+ 'depend_on_base_packages': False
+ },
+ 'all_keys': [*local_apt.default_keys, 'AB' * 20],
+ }
+])
+@pytest.mark.usefixtures('mock_download_packages', 'mock_subprocess_run')
+def test_piggybacked_system_download(params):
+ """
+ Verify that the piggybacked_system() function properly downloads and unpacks
+ APT packages.
+ """
+ with local_apt.piggybacked_system({
+ 'system': 'apt',
+ **params['props'],
+ 'packages': ['some-bin-package', 'another-package=1.1-2']
+ }, None) as piggybacked:
+ expected_depends = [{'identifier': 'apt-common-licenses'}] \
+ if params['base_depends'] else []
+ assert piggybacked.package_must_depend == expected_depends
+
+ archive_files = dict(piggybacked.archive_files())
+
+ archive_names = [
+ 'some-bin-package_1.1-2_all.deb',
+ 'another-package_1.1-2_all.deb',
+ 'some-source-package_1.1.orig.tar.gz',
+ 'some-source-package_1.1-1.dsc',
+ 'test_data.json'
+ ]
+ assert {*archive_files.keys()} == \
+ {PurePosixPath('apt') / n for n in archive_names}
+
+ for path in archive_files.values():
+ if path.name == 'test_data.json':
+ assert json.loads(path.read_text()) == {
+ 'list_identity': params['identity'],
+ 'keys': params['all_keys'],
+ 'packages': ['some-bin-package', 'another-package=1.1-2'],
+ 'with_deps': params['with_deps']
+ }
+ else:
+ assert path.read_text() == f'dummy {path.name}'
+
+ license_files = {*piggybacked.package_license_files}
+
+ assert license_files == {
+ PurePosixPath('.apt-root/usr/share/doc/another-package/copyright'),
+ PurePosixPath('.apt-root/usr/share/doc/some-bin-package/copyright')
+ }
+
+ assert ['dummy copyright'] * 2 == \
+ [piggybacked.resolve_file(p).read_text() for p in license_files]
+
+ for name in ['some-bin-package', 'another-package']:
+ path = PurePosixPath(f'.apt-root/etc/dummy_{name}_config')
+ assert piggybacked.resolve_file(path).read_text() == \
+ f'dummy {path.name}'
+
+        assert piggybacked.resolve_file(PurePosixPath('a/b/c')) is None
+        assert piggybacked.resolve_file(PurePosixPath('')) is None
+
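+        # Resolving a path that escapes the APT root must be refused.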
+ with pytest.raises(FileReferenceError):
+ piggybacked.resolve_file(PurePosixPath('.apt-root/a/../../../b'))
+
+ root = piggybacked.resolve_file(PurePosixPath('.apt-root/dummy')).parent
+ assert root.is_dir()
+
+ assert not root.exists()
+
+@pytest.mark.subprocess_run(local_apt, run_dpkg_deb)
+@pytest.mark.usefixtures('mock_subprocess_run')
+def test_piggybacked_system_no_download():
+ """
+ Verify that the piggybacked_system() function is able to use pre-downloaded
+ APT packages.
+ """
+ archive_names = {
+ f'{package}{rest}'
+ for package in ('some-lib_1:2.3', 'other-lib_4.45.2')
+ for rest in ('-1_all.deb', '.orig.tar.gz', '-1.debian.tar.xz', '-1.dsc')
+ }
+
+ with TemporaryDirectory() as td:
+ td = Path(td)
+ (td / 'apt').mkdir()
+ for name in archive_names:
+ (td / 'apt' / name).write_text(f'dummy {name}')
+
+ with local_apt.piggybacked_system({
+ 'system': 'apt',
+ 'distribution': 'nabia',
+ 'dependencies': True,
+ 'packages': ['whatever', 'whatever2']
+ }, td) as piggybacked:
+ archive_files = dict(piggybacked.archive_files())
+
+ assert {*archive_files.keys()} == \
+ {PurePosixPath('apt') / name for name in archive_names}
+
+ for path in archive_files.values():
+ assert path.read_text() == f'dummy {path.name}'
+
+ assert {*piggybacked.package_license_files} == {
+ PurePosixPath('.apt-root/usr/share/doc/some-lib/copyright'),
+ PurePosixPath('.apt-root/usr/share/doc/other-lib/copyright')
+ }
+
+ for name in ['some-lib', 'other-lib']:
+ path = PurePosixPath(f'.apt-root/etc/dummy_{name}_config')
+ assert piggybacked.resolve_file(path).read_text() == \
+ f'dummy {path.name}'
+
+@pytest.mark.subprocess_run(local_apt, run_missing_executable)
+@pytest.mark.usefixtures('mock_download_packages', 'mock_subprocess_run')
+def test_piggybacked_system_missing():
+ """
+ Verify that the piggybacked_system() function raises a proper error when
+ 'dpkg-deb' is missing.
+ """
+ with pytest.raises(local_apt.AptError) as excinfo:
+ with local_apt.piggybacked_system({
+ 'system': 'apt',
+ 'distribution': 'nabia',
+ 'packages': ['some-package'],
+ 'dependencies': False
+ }, None) as piggybacked:
+ pass
+
+ assert len(excinfo.value.args) == 1
+
+ assert '\n' not in excinfo.value.args[0]
+
+@pytest.mark.subprocess_run(local_apt, lambda c, **kw: run_dpkg_deb(c, 1, **kw))
+@pytest.mark.usefixtures('mock_download_packages', 'mock_subprocess_run')
+def test_piggybacked_system_fail():
+ """
+ Verify that the piggybacked_system() function raises a proper error when
+ 'dpkg-deb -x' command returns non-0.
+ """
+ with pytest.raises(local_apt.AptError) as excinfo:
+ with local_apt.piggybacked_system({
+ 'system': 'apt',
+ 'distribution': 'nabia',
+ 'packages': ['some-package'],
+ 'dependencies': False
+ }, None) as piggybacked:
+ pass
+
+ assert len(excinfo.value.args) == 1
+
+ assert re.match(r'.*\n\n.*\n\nsome output\n\n.*\n\nsome error output',
+ excinfo.value.args[0])