aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/builder
diff options
context:
space:
mode:
authorWojtek Kosior <koszko@koszko.org>2022-05-02 21:26:59 +0200
committerWojtek Kosior <koszko@koszko.org>2022-05-10 12:41:38 +0200
commit61f0aa75c64732063988826400ebc9f8e01ee3bb (patch)
tree3f1fadb196afe06892194eb31c731964c0f62f21 /src/hydrilla/builder
parent9dda3aa988a9482d6292a655f4846f7d4b450315 (diff)
downloadhydrilla-builder-61f0aa75c64732063988826400ebc9f8e01ee3bb.tar.gz
hydrilla-builder-61f0aa75c64732063988826400ebc9f8e01ee3bb.zip
support piggybacking on APT packages
Diffstat (limited to 'src/hydrilla/builder')
-rw-r--r--src/hydrilla/builder/build.py298
-rw-r--r--src/hydrilla/builder/common_errors.py67
-rw-r--r--src/hydrilla/builder/local_apt.py428
-rw-r--r--src/hydrilla/builder/piggybacking.py115
4 files changed, 771 insertions, 137 deletions
diff --git a/src/hydrilla/builder/build.py b/src/hydrilla/builder/build.py
index 8eec4a4..ce4935c 100644
--- a/src/hydrilla/builder/build.py
+++ b/src/hydrilla/builder/build.py
@@ -30,21 +30,28 @@ from __future__ import annotations
import json
import re
import zipfile
-from pathlib import Path
+import subprocess
+from pathlib import Path, PurePosixPath
from hashlib import sha256
from sys import stderr
+from contextlib import contextmanager
+from tempfile import TemporaryDirectory, TemporaryFile
+from typing import Optional, Iterable, Union
import jsonschema
import click
from .. import util
from . import _version
+from . import local_apt
+from .piggybacking import Piggybacked
+from .common_errors import *
here = Path(__file__).resolve().parent
_ = util.translation(here / 'locales').gettext
-index_validator = util.validator_for('package_source-1.0.1.schema.json')
+index_validator = util.validator_for('package_source-2.schema.json')
schemas_root = 'https://hydrilla.koszko.org/schemas'
@@ -53,202 +60,201 @@ generated_by = {
'version': _version.version
}
-class FileReferenceError(Exception):
- """
- Exception used to report various problems concerning files referenced from
- source package's index.json.
- """
-
-class ReuseError(Exception):
+class ReuseError(SubprocessError):
"""
Exception used to report various problems when calling the REUSE tool.
"""
-class FileBuffer:
- """
- Implement a file-like object that buffers data written to it.
- """
- def __init__(self):
- """
- Initialize FileBuffer.
- """
- self.chunks = []
-
- def write(self, b):
- """
- Buffer 'b', return number of bytes buffered.
-
- 'b' is expected to be an instance of 'bytes' or 'str', in which case it
- gets encoded as UTF-8.
- """
- if type(b) is str:
- b = b.encode()
- self.chunks.append(b)
- return len(b)
-
- def flush(self):
- """
- A no-op mock of file-like object's flush() method.
- """
- pass
-
- def get_bytes(self):
- """
- Return all data written so far concatenated into a single 'bytes'
- object.
- """
- return b''.join(self.chunks)
-
-def generate_spdx_report(root):
+def generate_spdx_report(root: Path) -> bytes:
"""
Use REUSE tool to generate an SPDX report for sources under 'root' and
return the report's contents as 'bytes'.
- 'root' shall be an instance of pathlib.Path.
-
In case the directory tree under 'root' does not constitute a
- REUSE-compliant package, linting report is printed to standard output and
- an exception is raised.
+ REUSE-compliant package, an exception is raised with linting report
+ included in it.
- In case the reuse package is not installed, an exception is also raised.
+ In case the reuse tool is not installed, an exception is also raised.
"""
- try:
- from reuse._main import main as reuse_main
- except ModuleNotFoundError:
- raise ReuseError(_('couldnt_import_reuse_is_it_installed'))
+ for command in [
+ ['reuse', '--root', str(root), 'lint'],
+ ['reuse', '--root', str(root), 'spdx']
+ ]:
+ try:
+ cp = subprocess.run(command, capture_output=True, text=True)
+ except FileNotFoundError:
+ raise ReuseError(_('couldnt_execute_reuse_is_it_installed'))
- mocked_output = FileBuffer()
- if reuse_main(args=['--root', str(root), 'lint'], out=mocked_output) != 0:
- stderr.write(mocked_output.get_bytes().decode())
- raise ReuseError(_('spdx_report_from_reuse_incompliant'))
+ if cp.returncode != 0:
+ msg = _('reuse_command_{}_failed').format(' '.join(command))
+ raise ReuseError(msg, cp)
- mocked_output = FileBuffer()
- if reuse_main(args=['--root', str(root), 'spdx'], out=mocked_output) != 0:
- stderr.write(mocked_output.get_bytes().decode())
- raise ReuseError("Couldn't generate an SPDX report for package.")
-
- return mocked_output.get_bytes()
+ return cp.stdout.encode()
class FileRef:
"""Represent reference to a file in the package."""
- def __init__(self, path: Path, contents: bytes):
+ def __init__(self, path: PurePosixPath, contents: bytes) -> None:
"""Initialize FileRef."""
- self.include_in_distribution = False
- self.include_in_zipfile = True
- self.path = path
- self.contents = contents
+ self.include_in_distribution = False
+ self.include_in_source_archive = True
+ self.path = path
+ self.contents = contents
self.contents_hash = sha256(contents).digest().hex()
- def make_ref_dict(self, filename: str):
+ def make_ref_dict(self) -> dict[str, str]:
"""
Represent the file reference through a dict that can be included in JSON
defintions.
"""
return {
- 'file': filename,
+ 'file': str(self.path),
'sha256': self.contents_hash
}
+@contextmanager
+def piggybacked_system(piggyback_def: Optional[dict],
+ piggyback_files: Optional[Path]) \
+ -> Iterable[Piggybacked]:
+ """
+ Resolve resources from a foreign software packaging system. Optionally, use
+ package files (.deb's, etc.) from a specified directory instead of resolving
+ and downloading them.
+ """
+ if piggyback_def is None:
+ yield Piggybacked()
+ else:
+ # apt is the only supported system right now
+ assert piggyback_def['system'] == 'apt'
+
+ with local_apt.piggybacked_system(piggyback_def, piggyback_files) \
+ as piggybacked:
+ yield piggybacked
+
class Build:
"""
Build a Hydrilla package.
"""
- def __init__(self, srcdir, index_json_path):
+ def __init__(self, srcdir: Path, index_json_path: Path,
+ piggyback_files: Optional[Path]=None):
"""
Initialize a build. All files to be included in a distribution package
are loaded into memory, all data gets validated and all necessary
computations (e.g. preparing of hashes) are performed.
-
- 'srcdir' and 'index_json' are expected to be pathlib.Path objects.
"""
self.srcdir = srcdir.resolve()
- self.index_json_path = index_json_path
+ self.piggyback_files = piggyback_files
+ # TODO: the piggyback files we set are ignored for now; use them
+ if piggyback_files is None:
+ piggyback_default_path = \
+ srcdir.parent / f'{srcdir.name}.foreign-packages'
+ if piggyback_default_path.exists():
+ self.piggyback_files = piggyback_default_path
self.files_by_path = {}
self.resource_list = []
self.mapping_list = []
if not index_json_path.is_absolute():
- self.index_json_path = (self.srcdir / self.index_json_path)
-
- self.index_json_path = self.index_json_path.resolve()
+ index_json_path = (self.srcdir / index_json_path)
- with open(self.index_json_path, 'rt') as index_file:
+ with open(index_json_path, 'rt') as index_file:
index_json_text = index_file.read()
index_obj = json.loads(util.strip_json_comments(index_json_text))
- self.files_by_path[self.srcdir / 'index.json'] = \
- FileRef(self.srcdir / 'index.json', index_json_text.encode())
+ index_desired_path = PurePosixPath('index.json')
+ self.files_by_path[index_desired_path] = \
+ FileRef(index_desired_path, index_json_text.encode())
self._process_index_json(index_obj)
- def _process_file(self, filename: str, include_in_distribution: bool=True):
+ def _process_file(self, filename: Union[str, PurePosixPath],
+ piggybacked: Piggybacked,
+ include_in_distribution: bool=True):
"""
Resolve 'filename' relative to srcdir, load it to memory (if not loaded
before), compute its hash and store its information in
'self.files_by_path'.
- 'filename' shall represent a relative path using '/' as a separator.
+ 'filename' shall represent a relative path within the package directory.
if 'include_in_distribution' is True it shall cause the file to not only
be included in the source package's zipfile, but also written as one of
built package's files.
+ For each file an attempt is made to resolve it using 'piggybacked'
+ object. If a file is found and pulled from foreign software packaging
+ system this way, it gets automatically excluded from inclusion in
+ Hydrilla source package's zipfile.
+
Return file's reference object that can be included in JSON defintions
of various kinds.
"""
- path = self.srcdir
- for segment in filename.split('/'):
- path /= segment
-
- path = path.resolve()
- if not path.is_relative_to(self.srcdir):
- raise FileReferenceError(_('loading_{}_outside_package_dir')
- .format(filename))
-
- if str(path.relative_to(self.srcdir)) == 'index.json':
- raise FileReferenceError(_('loading_reserved_index_json'))
+ include_in_source_archive = True
+
+ desired_path = PurePosixPath(filename)
+ if '..' in desired_path.parts:
+ msg = _('path_contains_double_dot_{}').format(filename)
+ raise FileReferenceError(msg)
+
+ path = piggybacked.resolve_file(desired_path)
+ if path is None:
+ path = (self.srcdir / desired_path).resolve()
+ if not path.is_relative_to(self.srcdir):
+ raise FileReferenceError(_('loading_{}_outside_package_dir')
+ .format(filename))
+
+ if str(path.relative_to(self.srcdir)) == 'index.json':
+ raise FileReferenceError(_('loading_reserved_index_json'))
+ else:
+ include_in_source_archive = False
- file_ref = self.files_by_path.get(path)
+ file_ref = self.files_by_path.get(desired_path)
if file_ref is None:
with open(path, 'rb') as file_handle:
contents = file_handle.read()
- file_ref = FileRef(path, contents)
- self.files_by_path[path] = file_ref
+ file_ref = FileRef(desired_path, contents)
+ self.files_by_path[desired_path] = file_ref
if include_in_distribution:
file_ref.include_in_distribution = True
- return file_ref.make_ref_dict(filename)
+ if not include_in_source_archive:
+ file_ref.include_in_source_archive = False
+
+ return file_ref.make_ref_dict()
- def _prepare_source_package_zip(self, root_dir_name: str):
+ def _prepare_source_package_zip(self, source_name: str,
+ piggybacked: Piggybacked) -> str:
"""
Create and store in memory a .zip archive containing files needed to
build this source package.
- 'root_dir_name' shall not contain any slashes ('/').
+ 'source_name' shall not contain any slashes ('/').
Return zipfile's sha256 sum's hexstring.
"""
- fb = FileBuffer()
- root_dir_path = Path(root_dir_name)
+ tf = TemporaryFile()
+ source_dir_path = PurePosixPath(source_name)
+ piggybacked_dir_path = PurePosixPath(f'{source_name}.foreign-packages')
- def zippath(file_path):
- file_path = root_dir_path / file_path.relative_to(self.srcdir)
- return file_path.as_posix()
-
- with zipfile.ZipFile(fb, 'w') as xpi:
+ with zipfile.ZipFile(tf, 'w') as zf:
for file_ref in self.files_by_path.values():
- if file_ref.include_in_zipfile:
- xpi.writestr(zippath(file_ref.path), file_ref.contents)
+ if file_ref.include_in_source_archive:
+ zf.writestr(str(source_dir_path / file_ref.path),
+ file_ref.contents)
+
+ for desired_path, real_path in piggybacked.archive_files():
+ zf.writestr(str(piggybacked_dir_path / desired_path),
+ real_path.read_bytes())
- self.source_zip_contents = fb.get_bytes()
+ tf.seek(0)
+ self.source_zip_contents = tf.read()
return sha256(self.source_zip_contents).digest().hex()
- def _process_item(self, item_def: dict):
+ def _process_item(self, item_def: dict, piggybacked: Piggybacked):
"""
Process 'item_def' as definition of a resource/mapping and store in
memory its processed form and files used by it.
@@ -266,14 +272,14 @@ class Build:
copy_props.append('revision')
- script_file_refs = [self._process_file(f['file'])
+ script_file_refs = [self._process_file(f['file'], piggybacked)
for f in item_def.get('scripts', [])]
deps = [{'identifier': res_ref['identifier']}
for res_ref in item_def.get('dependencies', [])]
new_item_obj = {
- 'dependencies': deps,
+ 'dependencies': [*piggybacked.package_must_depend, *deps],
'scripts': script_file_refs
}
else:
@@ -308,41 +314,54 @@ class Build:
in it.
"""
index_validator.validate(index_obj)
+ match = re.match(r'.*-((([1-9][0-9]*|0)\.)+)schema\.json$',
+ index_obj['$schema'])
+ self.source_schema_ver = \
+ [int(n) for n in filter(None, match.group(1).split('.'))]
- schema = f'{schemas_root}/api_source_description-1.schema.json'
+ out_schema = f'{schemas_root}/api_source_description-1.schema.json'
self.source_name = index_obj['source_name']
generate_spdx = index_obj.get('reuse_generate_spdx_report', False)
if generate_spdx:
contents = generate_spdx_report(self.srcdir)
- spdx_path = (self.srcdir / 'report.spdx').resolve()
+ spdx_path = PurePosixPath('report.spdx')
spdx_ref = FileRef(spdx_path, contents)
- spdx_ref.include_in_zipfile = False
+ spdx_ref.include_in_source_archive = False
self.files_by_path[spdx_path] = spdx_ref
- self.copyright_file_refs = \
- [self._process_file(f['file']) for f in index_obj['copyright']]
+ piggyback_def = None
+ if self.source_schema_ver >= [1, 1] and 'piggyback_on' in index_obj:
+ piggyback_def = index_obj['piggyback_on']
- if generate_spdx and not spdx_ref.include_in_distribution:
- raise FileReferenceError(_('report_spdx_not_in_copyright_list'))
+ with piggybacked_system(piggyback_def, self.piggyback_files) \
+ as piggybacked:
+ copyright_to_process = [
+ *(file_ref['file'] for file_ref in index_obj['copyright']),
+ *piggybacked.package_license_files
+ ]
+ self.copyright_file_refs = [self._process_file(f, piggybacked)
+ for f in copyright_to_process]
- item_refs = [self._process_item(d) for d in index_obj['definitions']]
+ if generate_spdx and not spdx_ref.include_in_distribution:
+ raise FileReferenceError(_('report_spdx_not_in_copyright_list'))
- for file_ref in index_obj.get('additional_files', []):
- self._process_file(file_ref['file'], include_in_distribution=False)
+ item_refs = [self._process_item(d, piggybacked)
+ for d in index_obj['definitions']]
- root_dir_path = Path(self.source_name)
+ for file_ref in index_obj.get('additional_files', []):
+ self._process_file(file_ref['file'], piggybacked,
+ include_in_distribution=False)
- source_archives_obj = {
- 'zip' : {
- 'sha256': self._prepare_source_package_zip(root_dir_path)
- }
- }
+ zipfile_sha256 = self._prepare_source_package_zip\
+ (self.source_name, piggybacked)
+
+ source_archives_obj = {'zip' : {'sha256': zipfile_sha256}}
self.source_description = {
- '$schema': schema,
+ '$schema': out_schema,
'source_name': self.source_name,
'source_copyright': self.copyright_file_refs,
'upstream_url': index_obj['upstream_url'],
@@ -398,20 +417,25 @@ class Build:
dir_type = click.Path(exists=True, file_okay=False, resolve_path=True)
+@click.command(help=_('build_package_from_srcdir_to_dstdir'))
@click.option('-s', '--srcdir', default='./', type=dir_type, show_default=True,
help=_('source_directory_to_build_from'))
@click.option('-i', '--index-json', default='index.json', type=click.Path(),
help=_('path_instead_of_index_json'))
+@click.option('-p', '--piggyback-files', type=click.Path(),
+ help=_('path_instead_for_piggyback_files'))
@click.option('-d', '--dstdir', type=dir_type, required=True,
help=_('built_package_files_destination'))
@click.version_option(version=_version.version, prog_name='Hydrilla builder',
message=_('%(prog)s_%(version)s_license'),
help=_('version_printing'))
-def perform(srcdir, index_json, dstdir):
- """<this will be replaced by a localized docstring for Click to pick up>"""
- build = Build(Path(srcdir), Path(index_json))
- build.write_package_files(Path(dstdir))
-
-perform.__doc__ = _('build_package_from_srcdir_to_dstdir')
+def perform(srcdir, index_json, piggyback_files, dstdir):
+ """
+ Execute Hydrilla builder to turn source package into a distributable one.
-perform = click.command()(perform)
+ This command is meant to be the entry point of hydrilla-builder command
+ exported by this package.
+ """
+ build = Build(Path(srcdir), Path(index_json),
+ piggyback_files and Path(piggyback_files))
+ build.write_package_files(Path(dstdir))
diff --git a/src/hydrilla/builder/common_errors.py b/src/hydrilla/builder/common_errors.py
new file mode 100644
index 0000000..29782e1
--- /dev/null
+++ b/src/hydrilla/builder/common_errors.py
@@ -0,0 +1,67 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+# Error classes.
+#
+# This file is part of Hydrilla
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+This module defines error types for use in other parts of Hydrilla builder.
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+from pathlib import Path
+
+from .. import util
+
+here = Path(__file__).resolve().parent
+
+_ = util.translation(here / 'locales').gettext
+
+class DistroError(Exception):
+ """
+ Exception used to report problems when resolving an OS distribution.
+ """
+
+class FileReferenceError(Exception):
+ """
+ Exception used to report various problems concerning files referenced from
+ source package.
+ """
+
+class SubprocessError(Exception):
+ """
+ Exception used to report problems related to execution of external
+ processes; includes various problems when calling apt-* and dpkg-*
+ commands.
+ """
+ def __init__(self, msg: str, cp: Optional[CP]=None) -> None:
+ """Initialize this SubprocessError"""
+ if cp and cp.stdout:
+ msg = '\n\n'.join([msg, _('STDOUT_OUTPUT_heading'), cp.stdout])
+
+ if cp and cp.stderr:
+ msg = '\n\n'.join([msg, _('STDERR_OUTPUT_heading'), cp.stderr])
+
+ super().__init__(msg)
diff --git a/src/hydrilla/builder/local_apt.py b/src/hydrilla/builder/local_apt.py
new file mode 100644
index 0000000..8382af8
--- /dev/null
+++ b/src/hydrilla/builder/local_apt.py
@@ -0,0 +1,428 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+# Using a local APT.
+#
+# This file is part of Hydrilla
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import zipfile
+import shutil
+import re
+import subprocess
+CP = subprocess.CompletedProcess
+from pathlib import Path, PurePosixPath
+from tempfile import TemporaryDirectory, NamedTemporaryFile
+from hashlib import sha256
+from contextlib import contextmanager
+from typing import Optional, Iterable
+
+from .. import util
+from .piggybacking import Piggybacked
+from .common_errors import *
+
+here = Path(__file__).resolve().parent
+
+_ = util.translation(here / 'locales').gettext
+
+"""
+Default cache directory to save APT configurations and downloaded GPG keys in.
+"""
+default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt'
+
+"""
+Default keyserver to use.
+"""
+default_keyserver = 'hkps://keyserver.ubuntu.com:443'
+
+"""
+Default keys to download when using a local APT.
+"""
+default_keys = [
+ # Trisquel
+ 'E6C27099CA21965B734AEA31B4EFB9F38D8AEBF1',
+ '60364C9869F92450421F0C22B138CA450C05112F',
+ # Ubuntu
+ '630239CC130E1A7FD81A27B140976EAF437D05B5',
+ '790BC7277767219C42C86F933B4FE6ACC0B21F32',
+ 'F6ECB3762474EDA9D21B7022871920D1991BC93C',
+ # Debian
+ '6D33866EDD8FFA41C0143AEDDCC9EFBF77E11517',
+ '80D15823B7FD1561F9F7BCDDDC30D7C23CBBABEE',
+ 'AC530D520F2F3269F5E98313A48449044AAD5C5D'
+]
+
+"""sources.list file contents for known distros."""
+default_lists = {
+ 'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main'
+ for type in ('deb', 'deb-src')
+ for suf in ('', '-updates', '-security')]
+}
+
+class GpgError(Exception):
+ """
+ Exception used to report various problems when calling GPG.
+ """
+
+class AptError(SubprocessError):
+ """
+ Exception used to report various problems when calling apt-* and dpkg-*
+ commands.
+ """
+
+def run(command, **kwargs):
+ """A wrapper around subprocess.run that sets some default options."""
+ return subprocess.run(command, **kwargs, env={'LANG': 'en_US'},
+ capture_output=True, text=True)
+
+class Apt:
+ """
+ This class represents an APT instance and can be used to call apt-get
+ commands with it.
+ """
+ def __init__(self, apt_conf: str) -> None:
+ """Initialize this Apt object."""
+ self.apt_conf = apt_conf
+
+ def get(self, *args: str, **kwargs) -> CP:
+ """
+ Run apt-get with the specified arguments and raise a meaningful AptError
+ when something goes wrong.
+ """
+ command = ['apt-get', '-c', self.apt_conf, *args]
+ try:
+ cp = run(command, **kwargs)
+ except FileNotFoundError:
+ raise AptError(_('couldnt_execute_apt_get_is_it_installed'))
+
+ if cp.returncode != 0:
+ msg = _('apt_get_command_{}_failed').format(' '.join(command))
+ raise AptError(msg, cp)
+
+ return cp
+
+def cache_dir() -> Path:
+ """
+ Return the directory used to cache data (APT configurations, keyrings) to
+ speed up repeated operations.
+
+ This function first ensures the directory exists.
+ """
+ default_apt_cache_dir.mkdir(parents=True, exist_ok=True)
+ return default_apt_cache_dir
+
+class SourcesList:
+ """Representation of apt's sources.list contents."""
+ def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None:
+ """Initialize this SourcesList."""
+ self.codename = None
+ self.list = [*list]
+ self.has_extra_entries = bool(self.list)
+
+ if codename is not None:
+ if codename not in default_lists:
+ raise DistroError(_('distro_{}_unknown').format(codename))
+
+ self.codename = codename
+ self.list.extend(default_lists[codename])
+
+ def identity(self) -> str:
+ """
+ Produce a string that uniquely identifies this sources.list contents.
+ """
+ if self.codename and not self.has_extra_entries:
+ return self.codename
+
+ return sha256('\n'.join(sorted(self.list)).encode()).digest().hex()
+
+def apt_conf(directory: Path) -> str:
+ """
+ Given local APT's directory, produce a configuration suitable for running
+ APT there.
+
+ 'directory' must not contain any special characters including quotes and
+ spaces.
+ """
+ return f'''
+Dir "{directory}";
+Dir::State "{directory}/var/lib/apt";
+Dir::State::status "{directory}/var/lib/dpkg/status";
+Dir::Etc::SourceList "{directory}/etc/apt.sources.list";
+Dir::Etc::SourceParts "";
+Dir::Cache "{directory}/var/cache/apt";
+pkgCacheGen::Essential "none";
+Dir::Etc::Trusted "{directory}/etc/trusted.gpg";
+'''
+
+def apt_keyring(keys: [str]) -> bytes:
+ """
+ Download the requested keys if necessary and export them as a keyring
+ suitable for passing to APT.
+
+ The keyring is returned as a bytes value that should be written to a file.
+ """
+ try:
+ from gnupg import GPG
+ except ModuleNotFoundError:
+ raise GpgError(_('couldnt_import_gnupg_is_it_installed'))
+
+ gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg'))
+ for key in keys:
+ if gpg.list_keys(keys=[key]) != []:
+ continue
+
+ if gpg.recv_keys(default_keyserver, key).imported == 0:
+ raise GpgError(_('gpg_couldnt_recv_key'))
+
+ return gpg.export_keys(keys, armor=False, minimal=True)
+
+def cache_apt_root(apt_root: Path, destination_zip: Path) -> None:
+ """
+ Zip an APT root directory for later use and move the zipfile to the
+ requested destination.
+ """
+ temporary_zip_path = None
+ try:
+ tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_',
+ dir=cache_dir(), delete=False)
+ temporary_zip_path = Path(tmpfile.name)
+
+ to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'}
+
+ with zipfile.ZipFile(tmpfile, 'w') as zf:
+ for member in apt_root.rglob('*'):
+ relative = member.relative_to(apt_root)
+ if relative not in to_skip:
+ # This call will also properly add empty folders to zip file
+ zf.write(member, relative, zipfile.ZIP_DEFLATED)
+
+ shutil.move(temporary_zip_path, destination_zip)
+ finally:
+ if temporary_zip_path is not None and temporary_zip_path.exists():
+ temporary_zip_path.unlink()
+
+def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt:
+ """
+ Create files and directories necessary for running APT without root rights
+ inside 'directory'.
+
+ 'directory' must not contain any special characters including quotes and
+ spaces and must be empty.
+
+ Return an Apt object that can be used to call apt-get commands.
+ """
+ apt_root = directory / 'apt_root'
+
+ conf_text = apt_conf(apt_root)
+ keyring_bytes = apt_keyring(keys)
+
+ apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip'
+ if apt_zipfile.exists():
+ with zipfile.ZipFile(apt_zipfile) as zf:
+ zf.extractall(apt_root)
+
+ for to_create in (
+ apt_root / 'var' / 'lib' / 'apt' / 'partial',
+ apt_root / 'var' / 'lib' / 'apt' / 'lists',
+ apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial',
+ apt_root / 'etc' / 'apt' / 'preferences.d',
+ apt_root / 'var' / 'lib' / 'dpkg',
+ apt_root / 'var' / 'log' / 'apt'
+ ):
+ to_create.mkdir(parents=True, exist_ok=True)
+
+ conf_path = apt_root / 'etc' / 'apt.conf'
+ trusted_path = apt_root / 'etc' / 'trusted.gpg'
+ status_path = apt_root / 'var' / 'lib' / 'dpkg' / 'status'
+ list_path = apt_root / 'etc' / 'apt.sources.list'
+
+ conf_path.write_text(conf_text)
+ trusted_path.write_bytes(keyring_bytes)
+ status_path.touch()
+ list_path.write_text('\n'.join(list.list))
+
+ apt = Apt(str(conf_path))
+ apt.get('update')
+
+ cache_apt_root(apt_root, apt_zipfile)
+
+ return apt
+
+@contextmanager
+def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]:
+ """
+ Create a temporary directory with proper local APT configuration in it.
+ Yield an Apt object that can be used to issue apt-get commands.
+
+ This function returns a context manager that will remove the directory on
+ close.
+ """
+ with TemporaryDirectory() as td:
+ td = Path(td)
+ yield setup_local_apt(td, list, keys)
+
+def download_apt_packages(list: SourcesList, keys: [str], packages: [str],
+ destination_dir: Path, with_deps=False) -> [str]:
+ """
+ Set up a local APT, update it using the specified sources.list configuration
+ and use it to download the specified packages.
+
+ This function downloads a .deb file of the packages matching the current
+ architecture (which includes packages with architecture 'all') as well as
+ all their corresponding source package files and (if requested) the debs
+ and source files of all their declared dependencies.
+
+ Return value is a list of names of all downloaded files.
+ """
+ with local_apt(list, keys) as apt:
+ if with_deps:
+ cp = apt.get('install', '--yes', '--just-print', *packages)
+
+ deps_listing = re.match(
+ r'''
+ .*
+ The\sfollowing\sNEW\spackages\swill\sbe\sinstalled:
+ (.*)
+ 0\supgraded,
+ ''',
+ cp.stdout,
+ re.MULTILINE | re.DOTALL | re.VERBOSE)
+
+ if deps_listing is None:
+ raise AptError(_('apt_install_output_not_understood'), cp)
+
+ packages = deps_listing.group(1).split()
+
+ # Download .debs indirectly to destination_dir by first placing them
+ # in a temporary subdirectory.
+ with TemporaryDirectory(dir=destination_dir) as td:
+ td = Path(td)
+ cp = apt.get('download', *packages, cwd=td)
+
+ deb_name_regex = re.compile(
+ r'''
+ ^
+ (?P<name>[^_]+)
+ _
+ (?P<ver>[^_]+)
+ _
+ .+ # architecture (or 'all')
+ \.deb
+ $
+ ''',
+ re.VERBOSE)
+
+ names_vers = []
+ downloaded = []
+ for deb_file in td.iterdir():
+ match = deb_name_regex.match(deb_file.name)
+ if match is None:
+ msg = _('apt_download_gave_bad_filename_{}')\
+ .format(deb_file.name)
+ raise AptError(msg, cp)
+
+ names_vers.append((match.group('name'), match.group('ver')))
+ downloaded.append(deb_file.name)
+
+ apt.get('source', '--download-only',
+ *[f'{n}={v}' for n, v in names_vers], cwd=td)
+
+ for source_file in td.iterdir():
+ if source_file.name in downloaded:
+ continue
+
+ downloaded.append(source_file.name)
+
+ for filename in downloaded:
+ shutil.move(td / filename, destination_dir / filename)
+
+ return downloaded
+
+@contextmanager
+def piggybacked_system(piggyback_def: dict, foreign_packages: Optional[Path]) \
+ -> Iterable[Piggybacked]:
+ """
+ Resolve resources from APT. Optionally, use package files (.deb's, etc.)
+ from a specified directory instead of resolving and downloading them.
+
+ The directories and files created for the yielded Piggybacked object shall
+ be deleted when this context manager gets closed.
+ """
+ assert piggyback_def['system'] == 'apt'
+
+ with TemporaryDirectory() as td:
+ td = Path(td)
+ root = td / 'root'
+ root.mkdir()
+
+ if foreign_packages is None:
+ archives = td / 'archives'
+ archives.mkdir()
+
+ sources_list = SourcesList(piggyback_def.get('sources_list', []),
+ piggyback_def.get('distribution'))
+ packages = piggyback_def['packages']
+ with_deps = piggyback_def['dependencies']
+ pgp_keys = [
+ *default_keys,
+ *piggyback_def.get('trusted_keys', [])
+ ]
+
+ download_apt_packages(
+ list=sources_list,
+ keys=pgp_keys,
+ packages=packages,
+ destination_dir=archives,
+ with_deps=with_deps
+ )
+ else:
+ archives = foreign_packages / 'apt'
+
+ for deb in archives.glob('*.deb'):
+ command = ['dpkg-deb', '-x', str(deb), str(root)]
+ try:
+ cp = run(command)
+ except FileNotFoundError:
+ raise AptError(_('couldnt_execute_dpkg_deb_is_it_installed'))
+
+ if cp.returncode != 0:
+ msg = _('dpkg_deb_command_{}_failed').format(' '.join(command))
+ raise AptError(msg, cp)
+
+ docs_dir = root / 'usr' / 'share' / 'doc'
+ copyright_paths = [p / 'copyright' for p in docs_dir.iterdir()] \
+ if docs_dir.exists() else []
+ copyright_paths = [PurePosixPath('.apt-root') / p.relative_to(root)
+ for p in copyright_paths if p.exists()]
+
+ standard_depends = piggyback_def.get('depend_on_base_packages', True)
+ must_depend = [{'identifier': 'apt-common-licenses'}] \
+ if standard_depends else []
+
+ yield Piggybacked(
+ archives={'apt': archives},
+ roots={'.apt-root': root},
+ package_license_files=copyright_paths,
+ package_must_depend=must_depend
+ )
diff --git a/src/hydrilla/builder/piggybacking.py b/src/hydrilla/builder/piggybacking.py
new file mode 100644
index 0000000..799422d
--- /dev/null
+++ b/src/hydrilla/builder/piggybacking.py
@@ -0,0 +1,115 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+# Handling of software packaged for other distribution systems.
+#
+# This file is part of Hydrilla
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+This module contains definitions that may be reused by multiple piggybacked
+software system backends.
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+from pathlib import Path, PurePosixPath
+from typing import Optional, Iterable
+
+from .. import util
+from .common_errors import *
+
+here = Path(__file__).resolve().parent
+
+_ = util.translation(here / 'locales').gettext
+
+class Piggybacked:
+ """
+ Store information about foreign resources in use.
+
+ Public attributes:
+ 'package_must_depend' (read-only)
+ 'package_license_files' (read-only)
+ """
+ def __init__(self, archives: dict[str, Path]={}, roots: dict[str, Path]={},
+ package_license_files: list[PurePosixPath]=[],
+ package_must_depend: list[dict]=[]):
+ """
+ Initialize this Piggybacked object.
+
+ 'archives' maps piggybacked system names to directories that contain
+ package(s)' archive files. An 'archives' object may look like
+ {'apt': PosixPath('/path/to/dir/with/debs/and/tarballs')}.
+
+ 'roots' associates directory names to be virtually inserted under
+ Hydrilla source package directory with paths to real filesystem
+ directories that hold their desired contents, i.e. unpacked foreign
+ packages.
+
+ 'package_license_files' lists paths to license files that should be
+ included with the Haketilo package that will be produced. The paths are
+ to be resolved using 'roots' dictionary.
+
+ 'package_must_depend' lists names of Haketilo packages that the produced
+ package will additionally depend on. This is meant to help distribute
+ common licenses with a separate Haketilo package.
+ """
+ self.archives = archives
+ self.roots = roots
+ self.package_license_files = package_license_files
+ self.package_must_depend = package_must_depend
+
+ def resolve_file(self, file_ref_name: PurePosixPath) -> Optional[Path]:
+ """
+ 'file_ref_name' is a path as it may appear in an index.json file. Check if
+ the file belongs to one of the roots we have and return either a path
+ to the relevant file under this root or None.
+
+ It is not being checked whether the file actually exists in the
+ filesystem.
+ """
+ parts = file_ref_name.parts
+ root_path = self.roots.get(parts and parts[0])
+ path = root_path
+ if path is None:
+ return None
+
+ for part in parts[1:]:
+ path = path / part
+
+ path = path.resolve()
+
+ if not path.is_relative_to(root_path):
+ raise FileReferenceError(_('loading_{}_outside_piggybacked_dir')
+ .format(file_ref_name))
+
+ return path
+
+ def archive_files(self) -> Iterable[tuple[PurePosixPath, Path]]:
+ """
+ Yield all archive files in use. Each yielded tuple holds file's desired
+ path relative to the piggybacked archives directory to be created and
+ its current real path.
+ """
+ for system, real_dir in self.archives.items():
+ for path in real_dir.rglob('*'):
+ yield PurePosixPath(system) / path.relative_to(real_dir), path