diff options
Diffstat (limited to 'src/hydrilla/builder')
-rw-r--r-- | src/hydrilla/builder/_version.py | 4 | ||||
-rw-r--r-- | src/hydrilla/builder/build.py | 398 | ||||
-rw-r--r-- | src/hydrilla/builder/common_errors.py | 67 | ||||
-rw-r--r-- | src/hydrilla/builder/local_apt.py | 432 | ||||
-rw-r--r-- | src/hydrilla/builder/locales/en_US/LC_MESSAGES/hydrilla-messages.po | 100 | ||||
-rw-r--r-- | src/hydrilla/builder/piggybacking.py | 117 |
6 files changed, 936 insertions, 182 deletions
diff --git a/src/hydrilla/builder/_version.py b/src/hydrilla/builder/_version.py index d953eef..2feb153 100644 --- a/src/hydrilla/builder/_version.py +++ b/src/hydrilla/builder/_version.py @@ -1,5 +1,5 @@ # coding: utf-8 # file generated by setuptools_scm # don't change, don't track in version control -version = '1.0' -version_tuple = (1, 0) +version = '1.1b1' +version_tuple = (1, '1b1') diff --git a/src/hydrilla/builder/build.py b/src/hydrilla/builder/build.py index 8eec4a4..89c1f5a 100644 --- a/src/hydrilla/builder/build.py +++ b/src/hydrilla/builder/build.py @@ -30,22 +30,27 @@ from __future__ import annotations import json import re import zipfile -from pathlib import Path +import subprocess +from pathlib import Path, PurePosixPath from hashlib import sha256 from sys import stderr +from contextlib import contextmanager +from tempfile import TemporaryDirectory, TemporaryFile +from typing import Optional, Iterable, Union import jsonschema import click from .. import util from . import _version +from . import local_apt +from .piggybacking import Piggybacked +from .common_errors import * here = Path(__file__).resolve().parent _ = util.translation(here / 'locales').gettext -index_validator = util.validator_for('package_source-1.0.1.schema.json') - schemas_root = 'https://hydrilla.koszko.org/schemas' generated_by = { @@ -53,227 +58,233 @@ generated_by = { 'version': _version.version } -class FileReferenceError(Exception): - """ - Exception used to report various problems concerning files referenced from - source package's index.json. - """ - -class ReuseError(Exception): +class ReuseError(SubprocessError): """ Exception used to report various problems when calling the REUSE tool. """ -class FileBuffer: - """ - Implement a file-like object that buffers data written to it. - """ - def __init__(self): - """ - Initialize FileBuffer. - """ - self.chunks = [] - - def write(self, b): - """ - Buffer 'b', return number of bytes buffered. 
- - 'b' is expected to be an instance of 'bytes' or 'str', in which case it - gets encoded as UTF-8. - """ - if type(b) is str: - b = b.encode() - self.chunks.append(b) - return len(b) - - def flush(self): - """ - A no-op mock of file-like object's flush() method. - """ - pass - - def get_bytes(self): - """ - Return all data written so far concatenated into a single 'bytes' - object. - """ - return b''.join(self.chunks) - -def generate_spdx_report(root): +def generate_spdx_report(root: Path) -> bytes: """ Use REUSE tool to generate an SPDX report for sources under 'root' and return the report's contents as 'bytes'. - 'root' shall be an instance of pathlib.Path. - In case the directory tree under 'root' does not constitute a - REUSE-compliant package, linting report is printed to standard output and - an exception is raised. + REUSE-compliant package, as exception is raised with linting report + included in it. - In case the reuse package is not installed, an exception is also raised. + In case the reuse tool is not installed, an exception is also raised. 
""" - try: - from reuse._main import main as reuse_main - except ModuleNotFoundError: - raise ReuseError(_('couldnt_import_reuse_is_it_installed')) - - mocked_output = FileBuffer() - if reuse_main(args=['--root', str(root), 'lint'], out=mocked_output) != 0: - stderr.write(mocked_output.get_bytes().decode()) - raise ReuseError(_('spdx_report_from_reuse_incompliant')) - - mocked_output = FileBuffer() - if reuse_main(args=['--root', str(root), 'spdx'], out=mocked_output) != 0: - stderr.write(mocked_output.get_bytes().decode()) - raise ReuseError("Couldn't generate an SPDX report for package.") - - return mocked_output.get_bytes() + for command in [ + ['reuse', '--root', str(root), 'lint'], + ['reuse', '--root', str(root), 'spdx'] + ]: + try: + cp = subprocess.run(command, capture_output=True, text=True) + except FileNotFoundError: + msg = _('couldnt_execute_{}_is_it_installed').format('reuse') + raise ReuseError(msg) + + if cp.returncode != 0: + msg = _('command_{}_failed').format(' '.join(command)) + raise ReuseError(msg, cp) + + return cp.stdout.encode() class FileRef: """Represent reference to a file in the package.""" - def __init__(self, path: Path, contents: bytes): + def __init__(self, path: PurePosixPath, contents: bytes) -> None: """Initialize FileRef.""" - self.include_in_distribution = False - self.include_in_zipfile = True - self.path = path - self.contents = contents + self.include_in_distribution = False + self.include_in_source_archive = True + self.path = path + self.contents = contents self.contents_hash = sha256(contents).digest().hex() - def make_ref_dict(self, filename: str): + def make_ref_dict(self) -> dict[str, str]: """ Represent the file reference through a dict that can be included in JSON defintions. 
""" return { - 'file': filename, + 'file': str(self.path), 'sha256': self.contents_hash } +@contextmanager +def piggybacked_system(piggyback_def: Optional[dict], + piggyback_files: Optional[Path]) \ + -> Iterable[Piggybacked]: + """ + Resolve resources from a foreign software packaging system. Optionally, use + package files (.deb's, etc.) from a specified directory instead of resolving + and downloading them. + """ + if piggyback_def is None: + yield Piggybacked() + else: + # apt is the only supported system right now + assert piggyback_def['system'] == 'apt' + + with local_apt.piggybacked_system(piggyback_def, piggyback_files) \ + as piggybacked: + yield piggybacked + class Build: """ Build a Hydrilla package. """ - def __init__(self, srcdir, index_json_path): + def __init__(self, srcdir: Path, index_json_path: Path, + piggyback_files: Optional[Path]=None): """ Initialize a build. All files to be included in a distribution package are loaded into memory, all data gets validated and all necessary computations (e.g. preparing of hashes) are performed. - - 'srcdir' and 'index_json' are expected to be pathlib.Path objects. 
""" self.srcdir = srcdir.resolve() - self.index_json_path = index_json_path + self.piggyback_files = piggyback_files + if piggyback_files is None: + piggyback_default_path = \ + srcdir.parent / f'{srcdir.name}.foreign-packages' + if piggyback_default_path.exists(): + self.piggyback_files = piggyback_default_path self.files_by_path = {} self.resource_list = [] self.mapping_list = [] if not index_json_path.is_absolute(): - self.index_json_path = (self.srcdir / self.index_json_path) + index_json_path = (self.srcdir / index_json_path) - self.index_json_path = self.index_json_path.resolve() + index_obj, major = util.load_instance_from_file(index_json_path) - with open(self.index_json_path, 'rt') as index_file: - index_json_text = index_file.read() + if major not in (1, 2): + msg = _('unknown_schema_package_source_{}')\ + .format(index_json_path) + raise util.UnknownSchemaError(msg) - index_obj = json.loads(util.strip_json_comments(index_json_text)) + index_desired_path = PurePosixPath('index.json') + self.files_by_path[index_desired_path] = \ + FileRef(index_desired_path, index_json_path.read_bytes()) - self.files_by_path[self.srcdir / 'index.json'] = \ - FileRef(self.srcdir / 'index.json', index_json_text.encode()) + self._process_index_json(index_obj, major) - self._process_index_json(index_obj) - - def _process_file(self, filename: str, include_in_distribution: bool=True): + def _process_file(self, filename: Union[str, PurePosixPath], + piggybacked: Piggybacked, + include_in_distribution: bool=True): """ Resolve 'filename' relative to srcdir, load it to memory (if not loaded before), compute its hash and store its information in 'self.files_by_path'. - 'filename' shall represent a relative path using '/' as a separator. + 'filename' shall represent a relative path withing package directory. if 'include_in_distribution' is True it shall cause the file to not only be included in the source package's zipfile, but also written as one of built package's files. 
+ For each file an attempt is made to resolve it using 'piggybacked' + object. If a file is found and pulled from foreign software packaging + system this way, it gets automatically excluded from inclusion in + Hydrilla source package's zipfile. + Return file's reference object that can be included in JSON defintions of various kinds. """ - path = self.srcdir - for segment in filename.split('/'): - path /= segment - - path = path.resolve() - if not path.is_relative_to(self.srcdir): - raise FileReferenceError(_('loading_{}_outside_package_dir') - .format(filename)) - - if str(path.relative_to(self.srcdir)) == 'index.json': - raise FileReferenceError(_('loading_reserved_index_json')) + include_in_source_archive = True + + desired_path = PurePosixPath(filename) + if '..' in desired_path.parts: + msg = _('path_contains_double_dot_{}').format(filename) + raise FileReferenceError(msg) + + path = piggybacked.resolve_file(desired_path) + if path is None: + path = (self.srcdir / desired_path).resolve() + try: + rel_path = path.relative_to(self.srcdir) + except ValueError: + raise FileReferenceError(_('loading_{}_outside_package_dir') + .format(filename)) + + if str(rel_path) == 'index.json': + raise FileReferenceError(_('loading_reserved_index_json')) + else: + include_in_source_archive = False - file_ref = self.files_by_path.get(path) + file_ref = self.files_by_path.get(desired_path) if file_ref is None: - with open(path, 'rb') as file_handle: - contents = file_handle.read() + if not path.is_file(): + msg = _('referenced_file_{}_missing').format(desired_path) + raise FileReferenceError(msg) - file_ref = FileRef(path, contents) - self.files_by_path[path] = file_ref + file_ref = FileRef(desired_path, path.read_bytes()) + self.files_by_path[desired_path] = file_ref if include_in_distribution: file_ref.include_in_distribution = True - return file_ref.make_ref_dict(filename) + if not include_in_source_archive: + file_ref.include_in_source_archive = False + + return 
file_ref.make_ref_dict() - def _prepare_source_package_zip(self, root_dir_name: str): + def _prepare_source_package_zip(self, source_name: str, + piggybacked: Piggybacked) -> str: """ Create and store in memory a .zip archive containing files needed to build this source package. - 'root_dir_name' shall not contain any slashes ('/'). + 'src_dir_name' shall not contain any slashes ('/'). Return zipfile's sha256 sum's hexstring. """ - fb = FileBuffer() - root_dir_path = Path(root_dir_name) + tf = TemporaryFile() + source_dir_path = PurePosixPath(source_name) + piggybacked_dir_path = PurePosixPath(f'{source_name}.foreign-packages') - def zippath(file_path): - file_path = root_dir_path / file_path.relative_to(self.srcdir) - return file_path.as_posix() - - with zipfile.ZipFile(fb, 'w') as xpi: + with zipfile.ZipFile(tf, 'w') as zf: for file_ref in self.files_by_path.values(): - if file_ref.include_in_zipfile: - xpi.writestr(zippath(file_ref.path), file_ref.contents) + if file_ref.include_in_source_archive: + zf.writestr(str(source_dir_path / file_ref.path), + file_ref.contents) + + for desired_path, real_path in piggybacked.archive_files(): + zf.writestr(str(piggybacked_dir_path / desired_path), + real_path.read_bytes()) - self.source_zip_contents = fb.get_bytes() + tf.seek(0) + self.source_zip_contents = tf.read() return sha256(self.source_zip_contents).digest().hex() - def _process_item(self, item_def: dict): + def _process_item(self, as_what: str, item_def: dict, + piggybacked: Piggybacked): """ - Process 'item_def' as definition of a resource/mapping and store in - memory its processed form and files used by it. + Process 'item_def' as definition of a resource or mapping (determined by + 'as_what' param) and store in memory its processed form and files used + by it. Return a minimal item reference suitable for using in source description. 
""" - copy_props = ['type', 'identifier', 'long_name', 'description'] - for prop in ('comment', 'uuid'): - if prop in item_def: - copy_props.append(prop) + resulting_schema_version = [1] + + copy_props = ['identifier', 'long_name', 'description', + *filter(lambda p: p in item_def, ('comment', 'uuid'))] - if item_def['type'] == 'resource': + if as_what == 'resource': item_list = self.resource_list copy_props.append('revision') - script_file_refs = [self._process_file(f['file']) + script_file_refs = [self._process_file(f['file'], piggybacked) for f in item_def.get('scripts', [])] deps = [{'identifier': res_ref['identifier']} for res_ref in item_def.get('dependencies', [])] new_item_obj = { - 'dependencies': deps, + 'dependencies': [*piggybacked.resource_must_depend, *deps], 'scripts': script_file_refs } else: @@ -287,62 +298,126 @@ class Build: 'payloads': payloads } - new_item_obj.update([(p, item_def[p]) for p in copy_props]) - new_item_obj['version'] = util.normalize_version(item_def['version']) - new_item_obj['$schema'] = f'{schemas_root}/api_{item_def["type"]}_description-1.schema.json' + + if as_what == 'mapping' and item_def['type'] == "mapping_and_resource": + new_item_obj['version'].append(item_def['revision']) + + if self.source_schema_ver >= [2]: + # handle 'required_mappings' field + required = [{'identifier': map_ref['identifier']} + for map_ref in item_def.get('required_mappings', [])] + if required: + resulting_schema_version = max(resulting_schema_version, [2]) + new_item_obj['required_mappings'] = required + + # handle 'permissions' field + permissions = item_def.get('permissions', {}) + processed_permissions = {} + + if permissions.get('cors_bypass'): + processed_permissions['cors_bypass'] = True + if permissions.get('eval'): + processed_permissions['eval'] = True + + if processed_permissions: + new_item_obj['permissions'] = processed_permissions + resulting_schema_version = max(resulting_schema_version, [2]) + + # handle 
'{min,max}_haketilo_version' fields + for minmax, default in ('min', [1]), ('max', [65536]): + constraint = item_def.get(f'{minmax}_haketilo_version') + if constraint in (None, default): + continue + + copy_props.append(f'{minmax}_haketilo_version') + resulting_schema_version = max(resulting_schema_version, [2]) + + new_item_obj.update((p, item_def[p]) for p in copy_props) + + new_item_obj['$schema'] = ''.join([ + schemas_root, + f'/api_{as_what}_description', + '-', + util.version_string(resulting_schema_version), + '.schema.json' + ]) + new_item_obj['type'] = as_what new_item_obj['source_copyright'] = self.copyright_file_refs - new_item_obj['source_name'] = self.source_name - new_item_obj['generated_by'] = generated_by + new_item_obj['source_name'] = self.source_name + new_item_obj['generated_by'] = generated_by item_list.append(new_item_obj) props_in_ref = ('type', 'identifier', 'version', 'long_name') return dict([(prop, new_item_obj[prop]) for prop in props_in_ref]) - def _process_index_json(self, index_obj: dict): + def _process_index_json(self, index_obj: dict, + major_schema_version: int) -> None: """ Process 'index_obj' as contents of source package's index.json and store in memory this source package's zipfile as well as package's individual files and computed definitions of the source package and items defined in it. 
""" - index_validator.validate(index_obj) + schema_name = f'package_source-{major_schema_version}.schema.json'; + + util.validator_for(schema_name).validate(index_obj) - schema = f'{schemas_root}/api_source_description-1.schema.json' + match = re.match(r'.*-((([1-9][0-9]*|0)\.)+)schema\.json$', + index_obj['$schema']) + self.source_schema_ver = \ + [int(n) for n in filter(None, match.group(1).split('.'))] + + out_schema = f'{schemas_root}/api_source_description-1.schema.json' self.source_name = index_obj['source_name'] generate_spdx = index_obj.get('reuse_generate_spdx_report', False) if generate_spdx: contents = generate_spdx_report(self.srcdir) - spdx_path = (self.srcdir / 'report.spdx').resolve() + spdx_path = PurePosixPath('report.spdx') spdx_ref = FileRef(spdx_path, contents) - spdx_ref.include_in_zipfile = False + spdx_ref.include_in_source_archive = False self.files_by_path[spdx_path] = spdx_ref - self.copyright_file_refs = \ - [self._process_file(f['file']) for f in index_obj['copyright']] + piggyback_def = None + if self.source_schema_ver >= [1, 1] and 'piggyback_on' in index_obj: + piggyback_def = index_obj['piggyback_on'] - if generate_spdx and not spdx_ref.include_in_distribution: - raise FileReferenceError(_('report_spdx_not_in_copyright_list')) + with piggybacked_system(piggyback_def, self.piggyback_files) \ + as piggybacked: + copyright_to_process = [ + *(file_ref['file'] for file_ref in index_obj['copyright']), + *piggybacked.package_license_files + ] + self.copyright_file_refs = [self._process_file(f, piggybacked) + for f in copyright_to_process] - item_refs = [self._process_item(d) for d in index_obj['definitions']] + if generate_spdx and not spdx_ref.include_in_distribution: + raise FileReferenceError(_('report_spdx_not_in_copyright_list')) - for file_ref in index_obj.get('additional_files', []): - self._process_file(file_ref['file'], include_in_distribution=False) + item_refs = [] + for item_def in index_obj['definitions']: + if 'mapping' in 
item_def['type']: + ref = self._process_item('mapping', item_def, piggybacked) + item_refs.append(ref) + if 'resource' in item_def['type']: + ref = self._process_item('resource', item_def, piggybacked) + item_refs.append(ref) - root_dir_path = Path(self.source_name) + for file_ref in index_obj.get('additional_files', []): + self._process_file(file_ref['file'], piggybacked, + include_in_distribution=False) - source_archives_obj = { - 'zip' : { - 'sha256': self._prepare_source_package_zip(root_dir_path) - } - } + zipfile_sha256 = self._prepare_source_package_zip\ + (self.source_name, piggybacked) + + source_archives_obj = {'zip' : {'sha256': zipfile_sha256}} self.source_description = { - '$schema': schema, + '$schema': out_schema, 'source_name': self.source_name, 'source_copyright': self.copyright_file_refs, 'upstream_url': index_obj['upstream_url'], @@ -398,20 +473,25 @@ class Build: dir_type = click.Path(exists=True, file_okay=False, resolve_path=True) +@click.command(help=_('build_package_from_srcdir_to_dstdir')) @click.option('-s', '--srcdir', default='./', type=dir_type, show_default=True, help=_('source_directory_to_build_from')) @click.option('-i', '--index-json', default='index.json', type=click.Path(), help=_('path_instead_of_index_json')) +@click.option('-p', '--piggyback-files', type=click.Path(), + help=_('path_instead_for_piggyback_files')) @click.option('-d', '--dstdir', type=dir_type, required=True, help=_('built_package_files_destination')) @click.version_option(version=_version.version, prog_name='Hydrilla builder', message=_('%(prog)s_%(version)s_license'), help=_('version_printing')) -def perform(srcdir, index_json, dstdir): - """<this will be replaced by a localized docstring for Click to pick up>""" - build = Build(Path(srcdir), Path(index_json)) - build.write_package_files(Path(dstdir)) - -perform.__doc__ = _('build_package_from_srcdir_to_dstdir') +def perform(srcdir, index_json, piggyback_files, dstdir): + """ + Execute Hydrilla builder to turn 
source package into a distributable one. -perform = click.command()(perform) + This command is meant to be the entry point of hydrilla-builder command + exported by this package. + """ + build = Build(Path(srcdir), Path(index_json), + piggyback_files and Path(piggyback_files)) + build.write_package_files(Path(dstdir)) diff --git a/src/hydrilla/builder/common_errors.py b/src/hydrilla/builder/common_errors.py new file mode 100644 index 0000000..29782e1 --- /dev/null +++ b/src/hydrilla/builder/common_errors.py @@ -0,0 +1,67 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later + +# Error classes. +# +# This file is part of Hydrilla +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +This module defines error types for use in other parts of Hydrilla builder. +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +from pathlib import Path + +from .. import util + +here = Path(__file__).resolve().parent + +_ = util.translation(here / 'locales').gettext + +class DistroError(Exception): + """ + Exception used to report problems when resolving an OS distribution. 
+ """ + +class FileReferenceError(Exception): + """ + Exception used to report various problems concerning files referenced from + source package. + """ + +class SubprocessError(Exception): + """ + Exception used to report problems related to execution of external + processes, includes. various problems when calling apt-* and dpkg-* + commands. + """ + def __init__(self, msg: str, cp: Optional[CP]=None) -> None: + """Initialize this SubprocessError""" + if cp and cp.stdout: + msg = '\n\n'.join([msg, _('STDOUT_OUTPUT_heading'), cp.stdout]) + + if cp and cp.stderr: + msg = '\n\n'.join([msg, _('STDERR_OUTPUT_heading'), cp.stderr]) + + super().__init__(msg) diff --git a/src/hydrilla/builder/local_apt.py b/src/hydrilla/builder/local_apt.py new file mode 100644 index 0000000..0301da2 --- /dev/null +++ b/src/hydrilla/builder/local_apt.py @@ -0,0 +1,432 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later + +# Using a local APT. +# +# This file is part of Hydrilla +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +# Enable using with Python 3.7. 
+from __future__ import annotations + +import zipfile +import shutil +import re +import subprocess +CP = subprocess.CompletedProcess +from pathlib import Path, PurePosixPath +from tempfile import TemporaryDirectory, NamedTemporaryFile +from hashlib import sha256 +from urllib.parse import unquote +from contextlib import contextmanager +from typing import Optional, Iterable + +from .. import util +from .piggybacking import Piggybacked +from .common_errors import * + +here = Path(__file__).resolve().parent + +_ = util.translation(here / 'locales').gettext + +""" +Default cache directory to save APT configurations and downloaded GPG keys in. +""" +default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt' + +""" +Default keyserver to use. +""" +default_keyserver = 'hkps://keyserver.ubuntu.com:443' + +""" +Default keys to download when using a local APT. +""" +default_keys = [ + # Trisquel + 'E6C27099CA21965B734AEA31B4EFB9F38D8AEBF1', + '60364C9869F92450421F0C22B138CA450C05112F', + # Ubuntu + '630239CC130E1A7FD81A27B140976EAF437D05B5', + '790BC7277767219C42C86F933B4FE6ACC0B21F32', + 'F6ECB3762474EDA9D21B7022871920D1991BC93C', + # Debian + '6D33866EDD8FFA41C0143AEDDCC9EFBF77E11517', + '80D15823B7FD1561F9F7BCDDDC30D7C23CBBABEE', + 'AC530D520F2F3269F5E98313A48449044AAD5C5D' +] + +"""sources.list file contents for known distros.""" +default_lists = { + 'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main' + for type in ('deb', 'deb-src') + for suf in ('', '-updates', '-security')] +} + +class GpgError(Exception): + """ + Exception used to report various problems when calling GPG. + """ + +class AptError(SubprocessError): + """ + Exception used to report various problems when calling apt-* and dpkg-* + commands. 
+ """ + +def run(command, **kwargs): + """A wrapped around subprocess.run that sets some default options.""" + return subprocess.run(command, **kwargs, env={'LANG': 'en_US'}, + capture_output=True, text=True) + +class Apt: + """ + This class represents an APT instance and can be used to call apt-get + commands with it. + """ + def __init__(self, apt_conf: str) -> None: + """Initialize this Apt object.""" + self.apt_conf = apt_conf + + def get(self, *args: str, **kwargs) -> CP: + """ + Run apt-get with the specified arguments and raise a meaningful AptError + when something goes wrong. + """ + command = ['apt-get', '-c', self.apt_conf, *args] + try: + cp = run(command, **kwargs) + except FileNotFoundError: + msg = _('couldnt_execute_{}_is_it_installed').format('apt-get') + raise AptError(msg) + + if cp.returncode != 0: + msg = _('command_{}_failed').format(' '.join(command)) + raise AptError(msg, cp) + + return cp + +def cache_dir() -> Path: + """ + Return the directory used to cache data (APT configurations, keyrings) to + speed up repeated operations. + + This function first ensures the directory exists. + """ + default_apt_cache_dir.mkdir(parents=True, exist_ok=True) + return default_apt_cache_dir + +class SourcesList: + """Representation of apt's sources.list contents.""" + def __init__(self, list: [str]=[], codename: Optional[str]=None) -> None: + """Initialize this SourcesList.""" + self.codename = None + self.list = [*list] + self.has_extra_entries = bool(self.list) + + if codename is not None: + if codename not in default_lists: + raise DistroError(_('distro_{}_unknown').format(codename)) + + self.codename = codename + self.list.extend(default_lists[codename]) + + def identity(self) -> str: + """ + Produce a string that uniquely identifies this sources.list contents. 
+ """ + if self.codename and not self.has_extra_entries: + return self.codename + + return sha256('\n'.join(sorted(self.list)).encode()).digest().hex() + +def apt_conf(directory: Path) -> str: + """ + Given local APT's directory, produce a configuration suitable for running + APT there. + + 'directory' must not contain any special characters including quotes and + spaces. + """ + return f''' +Architecture "amd64"; +Dir "{directory}"; +Dir::State "{directory}/var/lib/apt"; +Dir::State::status "{directory}/var/lib/dpkg/status"; +Dir::Etc::SourceList "{directory}/etc/apt.sources.list"; +Dir::Etc::SourceParts ""; +Dir::Cache "{directory}/var/cache/apt"; +pkgCacheGen::Essential "none"; +Dir::Etc::Trusted "{directory}/etc/trusted.gpg"; +''' + +def apt_keyring(keys: [str]) -> bytes: + """ + Download the requested keys if necessary and export them as a keyring + suitable for passing to APT. + + The keyring is returned as a bytes value that should be written to a file. + """ + try: + from gnupg import GPG + except ModuleNotFoundError: + raise GpgError(_('couldnt_import_{}_is_it_installed').format('gnupg')) + + gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg')) + for key in keys: + if gpg.list_keys(keys=[key]) != []: + continue + + if gpg.recv_keys(default_keyserver, key).imported == 0: + raise GpgError(_('gpg_couldnt_recv_key_{}').format(key)) + + return gpg.export_keys(keys, armor=False, minimal=True) + +def cache_apt_root(apt_root: Path, destination_zip: Path) -> None: + """ + Zip an APT root directory for later use and move the zipfile to the + requested destination. 
+ """ + temporary_zip_path = None + try: + tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_', + dir=cache_dir(), delete=False) + temporary_zip_path = Path(tmpfile.name) + + to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'} + + with zipfile.ZipFile(tmpfile, 'w') as zf: + for member in apt_root.rglob('*'): + relative = member.relative_to(apt_root) + if relative not in to_skip: + # This call will also properly add empty folders to zip file + zf.write(member, relative, zipfile.ZIP_DEFLATED) + + shutil.move(temporary_zip_path, destination_zip) + finally: + if temporary_zip_path is not None and temporary_zip_path.exists(): + temporary_zip_path.unlink() + +def setup_local_apt(directory: Path, list: SourcesList, keys: [str]) -> Apt: + """ + Create files and directories necessary for running APT without root rights + inside 'directory'. + + 'directory' must not contain any special characters including quotes and + spaces and must be empty. + + Return an Apt object that can be used to call apt-get commands. 
+ """ + apt_root = directory / 'apt_root' + + conf_text = apt_conf(apt_root) + keyring_bytes = apt_keyring(keys) + + apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip' + if apt_zipfile.exists(): + with zipfile.ZipFile(apt_zipfile) as zf: + zf.extractall(apt_root) + + for to_create in ( + apt_root / 'var' / 'lib' / 'apt' / 'partial', + apt_root / 'var' / 'lib' / 'apt' / 'lists', + apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial', + apt_root / 'etc' / 'apt' / 'preferences.d', + apt_root / 'var' / 'lib' / 'dpkg', + apt_root / 'var' / 'log' / 'apt' + ): + to_create.mkdir(parents=True, exist_ok=True) + + conf_path = apt_root / 'etc' / 'apt.conf' + trusted_path = apt_root / 'etc' / 'trusted.gpg' + status_path = apt_root / 'var' / 'lib' / 'dpkg' / 'status' + list_path = apt_root / 'etc' / 'apt.sources.list' + + conf_path.write_text(conf_text) + trusted_path.write_bytes(keyring_bytes) + status_path.touch() + list_path.write_text('\n'.join(list.list)) + + apt = Apt(str(conf_path)) + apt.get('update') + + cache_apt_root(apt_root, apt_zipfile) + + return apt + +@contextmanager +def local_apt(list: SourcesList, keys: [str]) -> Iterable[Apt]: + """ + Create a temporary directory with proper local APT configuration in it. + Yield an Apt object that can be used to issue apt-get commands. + + This function returns a context manager that will remove the directory on + close. + """ + with TemporaryDirectory() as td: + td = Path(td) + yield setup_local_apt(td, list, keys) + +def download_apt_packages(list: SourcesList, keys: [str], packages: [str], + destination_dir: Path, with_deps: bool) -> [str]: + """ + Set up a local APT, update it using the specified sources.list configuration + and use it to download the specified packages. 
+ + This function downloads .deb files of packages matching the amd64 + architecture (which includes packages with architecture 'all') as well as + all their corresponding source package files and (if requested) the debs + and source files of all their declared dependencies. + + Return value is a list of names of all downloaded files. + """ + install_line_regex = re.compile(r'^Inst (?P<name>\S+) \((?P<version>\S+) ') + + with local_apt(list, keys) as apt: + if with_deps: + cp = apt.get('install', '--yes', '--just-print', *packages) + + lines = cp.stdout.split('\n') + matches = [install_line_regex.match(l) for l in lines] + packages = [f'{m.group("name")}={m.group("version")}' + for m in matches if m] + + if not packages: + raise AptError(_('apt_install_output_not_understood'), cp) + + # Download .debs to indirectly to destination_dir by first placing them + # in a temporary subdirectory. + with TemporaryDirectory(dir=destination_dir) as td: + td = Path(td) + cp = apt.get('download', *packages, cwd=td) + + deb_name_regex = re.compile( + r''' + ^ + (?P<name>[^_]+) + _ + (?P<ver>[^_]+) + _ + .+ # architecture (or 'all') + \.deb + $ + ''', + re.VERBOSE) + + names_vers = [] + downloaded = [] + for deb_file in td.iterdir(): + match = deb_name_regex.match(deb_file.name) + if match is None: + msg = _('apt_download_gave_bad_filename_{}')\ + .format(deb_file.name) + raise AptError(msg, cp) + + names_vers.append(( + unquote(match.group('name')), + unquote(match.group('ver')) + )) + downloaded.append(deb_file.name) + + apt.get('source', '--download-only', + *[f'{n}={v}' for n, v in names_vers], cwd=td) + + for source_file in td.iterdir(): + if source_file.name in downloaded: + continue + + downloaded.append(source_file.name) + + for filename in downloaded: + shutil.move(td / filename, destination_dir / filename) + + return downloaded + +@contextmanager +def piggybacked_system(piggyback_def: dict, foreign_packages: Optional[Path]) \ + -> Iterable[Piggybacked]: + """ + Resolve 
resources from APT. Optionally, use package files (.deb's, etc.)
+    from a specified directory instead of resolving and downloading them.
+
+    The directories and files created for the yielded Piggybacked object shall
+    be deleted when this context manager gets closed.
+    """
+    assert piggyback_def['system'] == 'apt'
+
+    with TemporaryDirectory() as td:
+        td = Path(td)
+        root = td / 'root'
+        root.mkdir()
+
+        if foreign_packages is None:
+            archives = td / 'archives'
+            archives.mkdir()
+        else:
+            archives = foreign_packages / 'apt'
+            archives.mkdir(exist_ok=True)
+
+        if [*archives.glob('*.deb')] == []:
+            sources_list = SourcesList(piggyback_def.get('sources_list', []),
+                                       piggyback_def.get('distribution'))
+            packages = piggyback_def['packages']
+            with_deps = piggyback_def['dependencies']
+            pgp_keys = [
+                *default_keys,
+                *piggyback_def.get('trusted_keys', [])
+            ]
+
+            download_apt_packages(
+                list=sources_list,
+                keys=pgp_keys,
+                packages=packages,
+                destination_dir=archives,
+                with_deps=with_deps
+            )
+
+        for deb in archives.glob('*.deb'):
+            command = ['dpkg-deb', '-x', str(deb), str(root)]
+            try:
+                cp = run(command)
+            except FileNotFoundError:
+                msg = _('couldnt_execute_{}_is_it_installed').format('dpkg-deb')
+                raise AptError(msg)
+
+            if cp.returncode != 0:
+                msg = _('command_{}_failed').format(' '.join(command))
+                raise AptError(msg, cp)
+
+        docs_dir = root / 'usr' / 'share' / 'doc'
+        copyright_paths = [p / 'copyright' for p in docs_dir.iterdir()] \
+            if docs_dir.exists() else []
+        copyright_paths = [PurePosixPath('.apt-root') / p.relative_to(root)
+                           for p in copyright_paths if p.exists()]
+
+        standard_depends = piggyback_def.get('depend_on_base_packages', True)
+        must_depend = [{'identifier': 'apt-common-licenses'}] \
+            if standard_depends else []
+
+        yield Piggybacked(
+            archives={'apt': archives},
+            roots={'.apt-root': root},
+            package_license_files=copyright_paths,
+            resource_must_depend=must_depend
+        )
diff --git 
a/src/hydrilla/builder/locales/en_US/LC_MESSAGES/hydrilla-messages.po b/src/hydrilla/builder/locales/en_US/LC_MESSAGES/hydrilla-messages.po index e3ab525..821f74b 100644 --- a/src/hydrilla/builder/locales/en_US/LC_MESSAGES/hydrilla-messages.po +++ b/src/hydrilla/builder/locales/en_US/LC_MESSAGES/hydrilla-messages.po @@ -7,7 +7,7 @@ msgid "" msgstr "" "Project-Id-Version: hydrilla.builder 0.1.dev16+g4e46d7f.d20220211\n" "Report-Msgid-Bugs-To: koszko@koszko.org\n" -"POT-Creation-Date: 2022-04-19 13:51+0200\n" +"POT-Creation-Date: 2022-05-27 18:49+0200\n" "PO-Revision-Date: 2022-02-12 00:00+0000\n" "Last-Translator: Wojtek Kosior <koszko@koszko.org>\n" "Language: en_US\n" @@ -18,45 +18,73 @@ msgstr "" "Content-Transfer-Encoding: 8bit\n" "Generated-By: Babel 2.8.0\n" -#: src/hydrilla/builder/build.py:118 -msgid "couldnt_import_reuse_is_it_installed" -msgstr "" -"Could not import 'reuse'. Is the tool installed and visible to this " -"Python instance?" +#: src/hydrilla/builder/build.py:93 src/hydrilla/builder/local_apt.py:118 +#: src/hydrilla/builder/local_apt.py:410 +msgid "couldnt_execute_{}_is_it_installed" +msgstr "Could not execute '{}'. Is the tool installed and reachable via PATH?" + +#: src/hydrilla/builder/build.py:97 src/hydrilla/builder/local_apt.py:122 +#: src/hydrilla/builder/local_apt.py:414 +msgid "command_{}_failed" +msgstr "The following command finished execution with a non-zero exit status: {}" -#: src/hydrilla/builder/build.py:123 -msgid "spdx_report_from_reuse_incompliant" -msgstr "Attempt to generate an SPDX report for a REUSE-incompliant package." +#: src/hydrilla/builder/build.py:171 +msgid "unknown_schema_package_source_{}" +msgstr "" +"The provided JSON at '{}' does not use any of the known package source " +"JSON schemas." #: src/hydrilla/builder/build.py:207 +msgid "path_contains_double_dot_{}" +msgstr "" +"Attempt to load '{}' which includes a forbidden parent reference ('..') " +"in the path." 
+
+#: src/hydrilla/builder/build.py:214
 msgid "loading_{}_outside_package_dir"
 msgstr "Attempt to load '{}' which lies outside package source directory."

-#: src/hydrilla/builder/build.py:211
+#: src/hydrilla/builder/build.py:218
 msgid "loading_reserved_index_json"
 msgstr "Attempt to load 'index.json' which is a reserved filename."

-#: src/hydrilla/builder/build.py:329
+#: src/hydrilla/builder/build.py:225
+msgid "referenced_file_{}_missing"
+msgstr "Referenced file '{}' is missing."
+
+#: src/hydrilla/builder/build.py:362
 msgid "report_spdx_not_in_copyright_list"
 msgstr ""
 "Told to generate 'report.spdx' but 'report.spdx' is not listed among "
 "copyright files. Refusing to proceed."

-#: src/hydrilla/builder/build.py:402
+#: src/hydrilla/builder/build.py:433
+msgid "build_package_from_srcdir_to_dstdir"
+msgstr ""
+"Build Hydrilla package from `srcdir` and write the resulting files under "
+"`dstdir`."
+
+#: src/hydrilla/builder/build.py:435
 msgid "source_directory_to_build_from"
 msgstr "Source directory to build from."

-#: src/hydrilla/builder/build.py:404
+#: src/hydrilla/builder/build.py:437
 msgid "path_instead_of_index_json"
 msgstr ""
 "Path to file to be processed instead of index.json (if not absolute, "
 "resolved relative to srcdir)."

-#: src/hydrilla/builder/build.py:406
+#: src/hydrilla/builder/build.py:439
+msgid "path_instead_for_piggyback_files"
+msgstr ""
+"Path to a non-standard directory with foreign packages' archive files to "
+"use."
+
+#: src/hydrilla/builder/build.py:441
 msgid "built_package_files_destination"
 msgstr "Destination directory to write built package files to."

-#: src/hydrilla/builder/build.py:408
+#: src/hydrilla/builder/build.py:443
 #, python-format
 msgid "%(prog)s_%(version)s_license"
 msgstr ""
@@ -67,17 +95,47 @@ msgstr ""
 "This is free software: you are free to change and redistribute it.\n"
 "There is NO WARRANTY, to the extent permitted by law."
 
-#: src/hydrilla/builder/build.py:409 +#: src/hydrilla/builder/build.py:444 msgid "version_printing" msgstr "Print version information and exit." -#: src/hydrilla/builder/build.py:415 -msgid "build_package_from_srcdir_to_dstdir" +#: src/hydrilla/builder/common_errors.py:62 +msgid "STDOUT_OUTPUT_heading" +msgstr "## Command's standard output ##" + +#: src/hydrilla/builder/common_errors.py:65 +msgid "STDERR_OUTPUT_heading" +msgstr "## Command's standard error output ##" + +#: src/hydrilla/builder/local_apt.py:147 +msgid "distro_{}_unknown" +msgstr "Attempt to use an unknown software distribution '{}'." + +#: src/hydrilla/builder/local_apt.py:191 +msgid "couldnt_import_{}_is_it_installed" msgstr "" -"Build Hydrilla package from `scrdir` and write the resulting files under " -"`dstdir`." +"Could not import '{}'. Is the module installed and visible to this Python" +" instance?" + +#: src/hydrilla/builder/local_apt.py:199 +msgid "gpg_couldnt_recv_key_{}" +msgstr "Could not import PGP key '{}'." + +#: src/hydrilla/builder/local_apt.py:313 +msgid "apt_install_output_not_understood" +msgstr "The output of an 'apt-get install' command was not understood." + +#: src/hydrilla/builder/local_apt.py:339 +msgid "apt_download_gave_bad_filename_{}" +msgstr "The 'apt-get download' command produced a file with unexpected name '{}'." + +#: src/hydrilla/builder/piggybacking.py:102 +msgid "loading_{}_outside_piggybacked_dir" +msgstr "" +"Attempt to load '{}' which lies outside piggybacked packages files root " +"directory." -#: src/hydrilla/util/_util.py:79 +#: src/hydrilla/util/_util.py:86 msgid "bad_comment" msgstr "bad comment" diff --git a/src/hydrilla/builder/piggybacking.py b/src/hydrilla/builder/piggybacking.py new file mode 100644 index 0000000..00186bc --- /dev/null +++ b/src/hydrilla/builder/piggybacking.py @@ -0,0 +1,117 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later + +# Handling of software packaged for other distribution systems. 
+# +# This file is part of Hydrilla +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +This module contains definitions that may be reused by multiple piggybacked +software system backends. +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +from pathlib import Path, PurePosixPath +from typing import Optional, Iterable + +from .. import util +from .common_errors import * + +here = Path(__file__).resolve().parent + +_ = util.translation(here / 'locales').gettext + +class Piggybacked: + """ + Store information about foreign resources in use. + + Public attributes: + 'resource_must_depend' (read-only) + 'package_license_files' (read-only) + """ + def __init__(self, archives: dict[str, Path]={}, roots: dict[str, Path]={}, + package_license_files: list[PurePosixPath]=[], + resource_must_depend: list[dict]=[]): + """ + Initialize this Piggybacked object. + + 'archives' maps piggybacked system names to directories that contain + package(s)' archive files. An 'archives' object may look like + {'apt': PosixPath('/path/to/dir/with/debs/and/tarballs')}. 
+ + 'roots' associates directory names to be virtually inserted under + Hydrilla source package directory with paths to real filesystem + directories that hold their desired contents, i.e. unpacked foreign + packages. + + 'package_license_files' lists paths to license files that should be + included with the Haketilo package that will be produced. The paths are + to be resolved using 'roots' dictionary. + + 'resource_must_depend' lists names of Haketilo packages that the + produced resources will additionally depend on. This is meant to help + distribute common licenses with a separate Haketilo package. + """ + self.archives = archives + self.roots = roots + self.package_license_files = package_license_files + self.resource_must_depend = resource_must_depend + + def resolve_file(self, file_ref_name: PurePosixPath) -> Optional[Path]: + """ + 'file_ref_name' is a path as may appear in an index.json file. Check if + the file belongs to one of the roots we have and return either a path + to the relevant file under this root or None. + + It is not being checked whether the file actually exists in the + filesystem. + """ + parts = file_ref_name.parts + root_path = self.roots.get(parts and parts[0]) + path = root_path + if path is None: + return None + + for part in parts[1:]: + path = path / part + + path = path.resolve() + + try: + path.relative_to(root_path) + except ValueError: + raise FileReferenceError(_('loading_{}_outside_piggybacked_dir') + .format(file_ref_name)) + + return path + + def archive_files(self) -> Iterable[tuple[PurePosixPath, Path]]: + """ + Yield all archive files in use. Each yielded tuple holds file's desired + path relative to the piggybacked archives directory to be created and + its current real path. + """ + for system, real_dir in self.archives.items(): + for path in real_dir.rglob('*'): + yield PurePosixPath(system) / path.relative_to(real_dir), path |