From 52d12a4fa124daa1595529e3e7008276a7986d95 Mon Sep 17 00:00:00 2001
From: Wojtek Kosior
Date: Mon, 13 Jun 2022 11:06:49 +0200
Subject: unfinished partial work

---
 src/hydrilla/builder/__init__.py      |   7 +
 src/hydrilla/builder/__main__.py      |   9 +
 src/hydrilla/builder/_version.py      |   5 +
 src/hydrilla/builder/build.py         | 485 ++++++++++++++++++++++++++++++++++
 src/hydrilla/builder/common_errors.py |  65 +++++
 src/hydrilla/builder/local_apt.py     | 432 ++++++++++++++++++++++++++++++
 src/hydrilla/builder/piggybacking.py  | 117 ++++++++
 7 files changed, 1120 insertions(+)
 create mode 100644 src/hydrilla/builder/__init__.py
 create mode 100644 src/hydrilla/builder/__main__.py
 create mode 100644 src/hydrilla/builder/_version.py
 create mode 100644 src/hydrilla/builder/build.py
 create mode 100644 src/hydrilla/builder/common_errors.py
 create mode 100644 src/hydrilla/builder/local_apt.py
 create mode 100644 src/hydrilla/builder/piggybacking.py

diff --git a/src/hydrilla/builder/__init__.py b/src/hydrilla/builder/__init__.py
new file mode 100644
index 0000000..73dc579
--- /dev/null
+++ b/src/hydrilla/builder/__init__.py
@@ -0,0 +1,7 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+from .build import Build
diff --git a/src/hydrilla/builder/__main__.py b/src/hydrilla/builder/__main__.py
new file mode 100644
index 0000000..87dc9e2
--- /dev/null
+++ b/src/hydrilla/builder/__main__.py
@@ -0,0 +1,9 @@
+# SPDX-License-Identifier: CC0-1.0
+
+# Copyright (C) 2022 Wojtek Kosior
+#
+# Available under the terms of Creative Commons Zero v1.0 Universal.
+
+from . import build
+
+build.perform()
diff --git a/src/hydrilla/builder/_version.py b/src/hydrilla/builder/_version.py
new file mode 100644
index 0000000..2feb153
--- /dev/null
+++ b/src/hydrilla/builder/_version.py
@@ -0,0 +1,5 @@
+# coding: utf-8
+# file generated by setuptools_scm
+# don't change, don't track in version control
+version = '1.1b1'
+version_tuple = (1, '1b1')
diff --git a/src/hydrilla/builder/build.py b/src/hydrilla/builder/build.py
new file mode 100644
index 0000000..acc6576
--- /dev/null
+++ b/src/hydrilla/builder/build.py
@@ -0,0 +1,485 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Building Hydrilla packages.
+#
+# This file is part of Hydrilla
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use of this code
+# in a proprietary program, I am not going to enforce this in court.
+
+# Enable using with Python 3.7.
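+# (Postponed evaluation of annotations makes the 'dict[str, str]'-style hints
+# used below parse under 3.7, where subscripting built-in types would
+# otherwise fail at runtime.)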
+from __future__ import annotations
+
+import json
+import re
+import zipfile
+import subprocess
+from pathlib import Path, PurePosixPath
+from hashlib import sha256
+from sys import stderr
+from contextlib import contextmanager
+from tempfile import TemporaryDirectory, TemporaryFile
+from typing import Optional, Iterable, Iterator, Union
+
+import jsonschema # type: ignore
+import click
+
+from .. import _version, json_instances, versions
+from ..translations import smart_gettext as _
+from . import local_apt
+from .piggybacking import Piggybacked
+from .common_errors import *
+
+here = Path(__file__).resolve().parent
+
+schemas_root = 'https://hydrilla.koszko.org/schemas'
+
+generated_by = {
+    'name': 'hydrilla.builder',
+    'version': _version.version
+}
+
+class ReuseError(SubprocessError):
+    """
+    Exception used to report various problems when calling the REUSE tool.
+    """
+
+def generate_spdx_report(root: Path) -> bytes:
+    """
+    Use the REUSE tool to generate an SPDX report for sources under 'root' and
+    return the report's contents as 'bytes'.
+
+    In case the directory tree under 'root' does not constitute a
+    REUSE-compliant package, an exception is raised with the linting report
+    included in it.
+
+    In case the reuse tool is not installed, an exception is also raised.
+    """
+    for command in [
+            ['reuse', '--root', str(root), 'lint'],
+            ['reuse', '--root', str(root), 'spdx']
+    ]:
+        try:
+            cp = subprocess.run(command, capture_output=True, text=True)
+        except FileNotFoundError:
+            msg = _('couldnt_execute_{}_is_it_installed').format('reuse')
+            raise ReuseError(msg)
+
+        if cp.returncode != 0:
+            msg = _('command_{}_failed').format(' '.join(command))
+            raise ReuseError(msg, cp)
+
+    return cp.stdout.encode()
+
+class FileRef:
+    """Represent a reference to a file in the package."""
+    def __init__(self, path: PurePosixPath, contents: bytes) -> None:
+        """Initialize FileRef."""
+        self.include_in_distribution = False
+        self.include_in_source_archive = True
+        self.path = path
+        self.contents = contents
+
+        self.contents_hash = sha256(contents).digest().hex()
+
+    def make_ref_dict(self) -> dict[str, str]:
+        """
+        Represent the file reference through a dict that can be included in
+        JSON definitions.
+        """
+        return {
+            'file': str(self.path),
+            'sha256': self.contents_hash
+        }
+
+@contextmanager
+def piggybacked_system(piggyback_def: Optional[dict],
+                       piggyback_files: Optional[Path]) \
+                       -> Iterator[Piggybacked]:
+    """
+    Resolve resources from a foreign software packaging system. Optionally,
+    use package files (.deb's, etc.) from a specified directory instead of
+    resolving and downloading them.
+    """
+    if piggyback_def is None:
+        yield Piggybacked()
+    else:
+        # apt is the only supported system right now
+        assert piggyback_def['system'] == 'apt'
+
+        with local_apt.piggybacked_system(piggyback_def, piggyback_files) \
+             as piggybacked:
+            yield piggybacked
+
+class Build:
+    """
+    Build a Hydrilla package.
+    """
+    def __init__(self, srcdir: Path, index_json_path: Path,
+                 piggyback_files: Optional[Path]=None):
+        """
+        Initialize a build. All files to be included in a distribution package
+        are loaded into memory, all data gets validated and all necessary
+        computations (e.g. the preparation of hashes) are performed.
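+
+        A minimal usage sketch (directory and file names here are
+        hypothetical, chosen only for illustration):
+
+            build = Build(Path('./somepkg'), Path('index.json'))
+            build.write_package_files(Path('/tmp/hydrilla-packages'))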
+ """ + self.srcdir = srcdir.resolve() + self.piggyback_files = piggyback_files + if piggyback_files is None: + piggyback_default_path = \ + srcdir.parent / f'{srcdir.name}.foreign-packages' + if piggyback_default_path.exists(): + self.piggyback_files = piggyback_default_path + + self.files_by_path: dict[PurePosixPath, FileRef] = {} + self.resource_list: list[dict] = [] + self.mapping_list: list[dict] = [] + + if not index_json_path.is_absolute(): + index_json_path = (self.srcdir / index_json_path) + + index_obj = json_instances.read_instance(index_json_path) + schema_fmt = 'package_source-{}.schema.json' + major = json_instances.validate_instance(index_obj, schema_fmt) + + index_desired_path = PurePosixPath('index.json') + self.files_by_path[index_desired_path] = \ + FileRef(index_desired_path, index_json_path.read_bytes()) + + self._process_index_json(index_obj, major) + + def _process_file(self, filename: Union[str, PurePosixPath], + piggybacked: Piggybacked, + include_in_distribution: bool=True): + """ + Resolve 'filename' relative to srcdir, load it to memory (if not loaded + before), compute its hash and store its information in + 'self.files_by_path'. + + 'filename' shall represent a relative path withing package directory. + + if 'include_in_distribution' is True it shall cause the file to not only + be included in the source package's zipfile, but also written as one of + built package's files. + + For each file an attempt is made to resolve it using 'piggybacked' + object. If a file is found and pulled from foreign software packaging + system this way, it gets automatically excluded from inclusion in + Hydrilla source package's zipfile. + + Return file's reference object that can be included in JSON defintions + of various kinds. + """ + include_in_source_archive = True + + desired_path = PurePosixPath(filename) + if '..' in desired_path.parts: + msg = _('path_contains_double_dot_{}').format(filename) + raise FileReferenceError(msg) + + path = piggybacked.resolve_file(desired_path) + if path is None: + path = (self.srcdir / desired_path).resolve() + if not path.is_relative_to(self.srcdir): + raise FileReferenceError(_('loading_{}_outside_package_dir') + .format(filename)) + + if str(path.relative_to(self.srcdir)) == 'index.json': + raise FileReferenceError(_('loading_reserved_index_json')) + else: + include_in_source_archive = False + + file_ref = self.files_by_path.get(desired_path) + if file_ref is None: + if not path.is_file(): + msg = _('referenced_file_{}_missing').format(desired_path) + raise FileReferenceError(msg) + + file_ref = FileRef(desired_path, path.read_bytes()) + self.files_by_path[desired_path] = file_ref + + if include_in_distribution: + file_ref.include_in_distribution = True + + if not include_in_source_archive: + file_ref.include_in_source_archive = False + + return file_ref.make_ref_dict() + + def _prepare_source_package_zip(self, source_name: str, + piggybacked: Piggybacked) -> str: + """ + Create and store in memory a .zip archive containing files needed to + build this source package. + + 'src_dir_name' shall not contain any slashes ('/'). + + Return zipfile's sha256 sum's hexstring. 
+ """ + tf = TemporaryFile() + source_dir_path = PurePosixPath(source_name) + piggybacked_dir_path = PurePosixPath(f'{source_name}.foreign-packages') + + with zipfile.ZipFile(tf, 'w') as zf: + for file_ref in self.files_by_path.values(): + if file_ref.include_in_source_archive: + zf.writestr(str(source_dir_path / file_ref.path), + file_ref.contents) + + for desired_path, real_path in piggybacked.archive_files(): + zf.writestr(str(piggybacked_dir_path / desired_path), + real_path.read_bytes()) + + tf.seek(0) + self.source_zip_contents = tf.read() + + return sha256(self.source_zip_contents).digest().hex() + + def _process_item(self, as_what: str, item_def: dict, + piggybacked: Piggybacked): + """ + Process 'item_def' as definition of a resource or mapping (determined by + 'as_what' param) and store in memory its processed form and files used + by it. + + Return a minimal item reference suitable for using in source + description. + """ + resulting_schema_version = [1] + + copy_props = ['identifier', 'long_name', 'description', + *filter(lambda p: p in item_def, ('comment', 'uuid'))] + + new_item_obj: dict = {} + + if as_what == 'resource': + item_list = self.resource_list + + copy_props.append('revision') + + script_file_refs = [self._process_file(f['file'], piggybacked) + for f in item_def.get('scripts', [])] + + deps = [{'identifier': res_ref['identifier']} + for res_ref in item_def.get('dependencies', [])] + + new_item_obj['dependencies'] = \ + [*piggybacked.resource_must_depend, *deps] + new_item_obj['scripts'] = script_file_refs + else: + item_list = self.mapping_list + + payloads = {} + for pat, res_ref in item_def.get('payloads', {}).items(): + payloads[pat] = {'identifier': res_ref['identifier']} + + new_item_obj['payloads'] = payloads + + new_item_obj['version'] = \ + versions.normalize_version(item_def['version']) + + if as_what == 'mapping' and item_def['type'] == "mapping_and_resource": + new_item_obj['version'].append(item_def['revision']) + + if self.source_schema_ver >= [2]: + # handle 'required_mappings' field + required = [{'identifier': map_ref['identifier']} + for map_ref in item_def.get('required_mappings', [])] + if required: + resulting_schema_version = max(resulting_schema_version, [2]) + new_item_obj['required_mappings'] = required + + # handle 'permissions' field + permissions = item_def.get('permissions', {}) + processed_permissions = {} + + if permissions.get('cors_bypass'): + processed_permissions['cors_bypass'] = True + if permissions.get('eval'): + processed_permissions['eval'] = True + + if processed_permissions: + new_item_obj['permissions'] = processed_permissions + resulting_schema_version = max(resulting_schema_version, [2]) + + # handle '{min,max}_haketilo_version' fields + for minmax, default in ('min', [1]), ('max', [65536]): + constraint = item_def.get(f'{minmax}_haketilo_version') + if constraint in (None, default): + continue + + copy_props.append(f'{minmax}_haketilo_version') + resulting_schema_version = max(resulting_schema_version, [2]) + + new_item_obj.update((p, item_def[p]) for p in copy_props) + + new_item_obj['$schema'] = ''.join([ + schemas_root, + f'/api_{as_what}_description', + '-', + versions.version_string(resulting_schema_version), + '.schema.json' + ]) + new_item_obj['type'] = as_what + new_item_obj['source_copyright'] = self.copyright_file_refs + new_item_obj['source_name'] = self.source_name + new_item_obj['generated_by'] = generated_by + + item_list.append(new_item_obj) + + props_in_ref = ('type', 'identifier', 'version', 
+                        'long_name')
+        return dict([(prop, new_item_obj[prop]) for prop in props_in_ref])
+
+    def _process_index_json(self, index_obj: dict,
+                            major_schema_version: int) -> None:
+        """
+        Process 'index_obj' as the contents of the source package's
+        index.json. Store in memory the source package's zipfile, the
+        package's individual files and the computed definitions of the source
+        package and of the items defined in it.
+        """
+        self.source_schema_ver = versions.normalize_version(
+            json_instances.get_schema_version(index_obj))
+
+        out_schema = f'{schemas_root}/api_source_description-1.schema.json'
+
+        self.source_name = index_obj['source_name']
+
+        generate_spdx = index_obj.get('reuse_generate_spdx_report', False)
+        if generate_spdx:
+            contents = generate_spdx_report(self.srcdir)
+            spdx_path = PurePosixPath('report.spdx')
+            spdx_ref = FileRef(spdx_path, contents)
+
+            spdx_ref.include_in_source_archive = False
+            self.files_by_path[spdx_path] = spdx_ref
+
+        piggyback_def = None
+        if self.source_schema_ver >= [2] and 'piggyback_on' in index_obj:
+            piggyback_def = index_obj['piggyback_on']
+
+        with piggybacked_system(piggyback_def, self.piggyback_files) \
+             as piggybacked:
+            copyright_to_process = [
+                *(file_ref['file'] for file_ref in index_obj['copyright']),
+                *piggybacked.package_license_files
+            ]
+            self.copyright_file_refs = [self._process_file(f, piggybacked)
+                                        for f in copyright_to_process]
+
+            if generate_spdx and not spdx_ref.include_in_distribution:
+                raise FileReferenceError(_('report_spdx_not_in_copyright_list'))
+
+            item_refs = []
+            for item_def in index_obj['definitions']:
+                if 'mapping' in item_def['type']:
+                    ref = self._process_item('mapping', item_def, piggybacked)
+                    item_refs.append(ref)
+                if 'resource' in item_def['type']:
+                    ref = self._process_item('resource', item_def, piggybacked)
+                    item_refs.append(ref)
+
+            for file_ref in index_obj.get('additional_files', []):
+                self._process_file(file_ref['file'], piggybacked,
+                                   include_in_distribution=False)
+
+            zipfile_sha256 = self._prepare_source_package_zip\
+                             (self.source_name, piggybacked)
+
+            source_archives_obj = {'zip' : {'sha256': zipfile_sha256}}
+
+            self.source_description = {
+                '$schema': out_schema,
+                'source_name': self.source_name,
+                'source_copyright': self.copyright_file_refs,
+                'upstream_url': index_obj['upstream_url'],
+                'definitions': item_refs,
+                'source_archives': source_archives_obj,
+                'generated_by': generated_by
+            }
+
+            if 'comment' in index_obj:
+                self.source_description['comment'] = index_obj['comment']
+
+    def write_source_package_zip(self, dstpath: Path):
+        """
+        Create a .zip archive containing files needed to build this source
+        package and write it at 'dstpath'.
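+
+        A hypothetical call, reusing the 'build' object from __init__'s
+        docstring example:
+
+            build.write_source_package_zip(Path('/tmp/somepkg.zip'))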
+ """ + with open(dstpath, 'wb') as output: + output.write(self.source_zip_contents) + + def write_package_files(self, dstpath: Path): + """Write package files under 'dstpath' for distribution.""" + file_dir_path = (dstpath / 'file' / 'sha256').resolve() + file_dir_path.mkdir(parents=True, exist_ok=True) + + for file_ref in self.files_by_path.values(): + if file_ref.include_in_distribution: + file_path = file_dir_path / file_ref.contents_hash + file_path.write_bytes(file_ref.contents) + + source_dir_path = (dstpath / 'source').resolve() + source_dir_path.mkdir(parents=True, exist_ok=True) + source_name = self.source_description["source_name"] + + with open(source_dir_path / f'{source_name}.json', 'wt') as out_str: + json.dump(self.source_description, out_str) + + with open(source_dir_path / f'{source_name}.zip', 'wb') as out_bin: + out_bin.write(self.source_zip_contents) + + for item_type, item_list in [ + ('resource', self.resource_list), + ('mapping', self.mapping_list) + ]: + item_type_dir_path = (dstpath / item_type).resolve() + + for item_def in item_list: + item_dir_path = item_type_dir_path / item_def['identifier'] + item_dir_path.mkdir(parents=True, exist_ok=True) + + version = '.'.join([str(n) for n in item_def['version']]) + with open(item_dir_path / version, 'wt') as output: + json.dump(item_def, output) + +dir_type = click.Path(exists=True, file_okay=False, resolve_path=True) + +@click.command(help=_('build_package_from_srcdir_to_dstdir')) +@click.option('-s', '--srcdir', default='./', type=dir_type, show_default=True, + help=_('source_directory_to_build_from')) +@click.option('-i', '--index-json', default='index.json', type=click.Path(), + help=_('path_instead_of_index_json')) +@click.option('-p', '--piggyback-files', type=click.Path(), + help=_('path_instead_for_piggyback_files')) +@click.option('-d', '--dstdir', type=dir_type, required=True, + help=_('built_package_files_destination')) +@click.version_option(version=_version.version, prog_name='Hydrilla builder', + message=_('%(prog)s_%(version)s_license'), + help=_('version_printing')) +def perform(srcdir, index_json, piggyback_files, dstdir): + """ + Execute Hydrilla builder to turn source package into a distributable one. + + This command is meant to be the entry point of hydrilla-builder command + exported by this package. + """ + build = Build(Path(srcdir), Path(index_json), + piggyback_files and Path(piggyback_files)) + build.write_package_files(Path(dstdir)) diff --git a/src/hydrilla/builder/common_errors.py b/src/hydrilla/builder/common_errors.py new file mode 100644 index 0000000..ed4d0d2 --- /dev/null +++ b/src/hydrilla/builder/common_errors.py @@ -0,0 +1,65 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Error classes. +# +# This file is part of Hydrilla +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. 
+# Although I request that you do not make use of this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+This module defines error types for use in other parts of Hydrilla builder.
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Optional
+from subprocess import CompletedProcess as CP
+
+from ..translations import smart_gettext as _
+
+class DistroError(Exception):
+    """
+    Exception used to report problems when resolving an OS distribution.
+    """
+
+class FileReferenceError(Exception):
+    """
+    Exception used to report various problems concerning files referenced from
+    a source package.
+    """
+
+class SubprocessError(Exception):
+    """
+    Exception used to report problems related to the execution of external
+    processes. This includes various problems when calling apt-* and dpkg-*
+    commands.
+    """
+    def __init__(self, msg: str, cp: Optional[CP]=None) -> None:
+        """Initialize this SubprocessError"""
+        if cp and cp.stdout:
+            msg = '\n\n'.join([msg, _('STDOUT_OUTPUT_heading'), cp.stdout])
+
+        if cp and cp.stderr:
+            msg = '\n\n'.join([msg, _('STDERR_OUTPUT_heading'), cp.stderr])
+
+        super().__init__(msg)
diff --git a/src/hydrilla/builder/local_apt.py b/src/hydrilla/builder/local_apt.py
new file mode 100644
index 0000000..bdfc76f
--- /dev/null
+++ b/src/hydrilla/builder/local_apt.py
@@ -0,0 +1,432 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Using a local APT.
+#
+# This file is part of Hydrilla
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use of this code
+# in a proprietary program, I am not going to enforce this in court.
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import zipfile
+import shutil
+import re
+import subprocess
+CP = subprocess.CompletedProcess
+from pathlib import Path, PurePosixPath
+from tempfile import TemporaryDirectory, NamedTemporaryFile
+from hashlib import sha256
+from urllib.parse import unquote
+from contextlib import contextmanager
+from typing import Optional, Iterable, Iterator
+
+from ..translations import smart_gettext as _
+from .piggybacking import Piggybacked
+from .common_errors import *
+
+here = Path(__file__).resolve().parent
+
+"""
+Default cache directory to save APT configurations and downloaded GPG keys in.
+"""
+default_apt_cache_dir = Path.home() / '.cache' / 'hydrilla' / 'builder' / 'apt'
+
+"""
+Default keyserver to use.
+"""
+default_keyserver = 'hkps://keyserver.ubuntu.com:443'
+
+"""
+Default keys to download when using a local APT.
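+Each entry is a full OpenPGP key fingerprint; keys missing from the local
+keyring cache are fetched from 'default_keyserver' by apt_keyring() below.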
+""" +default_keys = [ + # Trisquel + 'E6C27099CA21965B734AEA31B4EFB9F38D8AEBF1', + '60364C9869F92450421F0C22B138CA450C05112F', + # Ubuntu + '630239CC130E1A7FD81A27B140976EAF437D05B5', + '790BC7277767219C42C86F933B4FE6ACC0B21F32', + 'F6ECB3762474EDA9D21B7022871920D1991BC93C', + # Debian + '6D33866EDD8FFA41C0143AEDDCC9EFBF77E11517', + '80D15823B7FD1561F9F7BCDDDC30D7C23CBBABEE', + 'AC530D520F2F3269F5E98313A48449044AAD5C5D' +] + +"""sources.list file contents for known distros.""" +default_lists = { + 'nabia': [f'{type} http://archive.trisquel.info/trisquel/ nabia{suf} main' + for type in ('deb', 'deb-src') + for suf in ('', '-updates', '-security')] +} + +class GpgError(Exception): + """ + Exception used to report various problems when calling GPG. + """ + +class AptError(SubprocessError): + """ + Exception used to report various problems when calling apt-* and dpkg-* + commands. + """ + +def run(command, **kwargs): + """A wrapped around subprocess.run that sets some default options.""" + return subprocess.run(command, **kwargs, env={'LANG': 'en_US'}, + capture_output=True, text=True) + +class Apt: + """ + This class represents an APT instance and can be used to call apt-get + commands with it. + """ + def __init__(self, apt_conf: str) -> None: + """Initialize this Apt object.""" + self.apt_conf = apt_conf + + def get(self, *args: str, **kwargs) -> CP: + """ + Run apt-get with the specified arguments and raise a meaningful AptError + when something goes wrong. + """ + command = ['apt-get', '-c', self.apt_conf, *args] + try: + cp = run(command, **kwargs) + except FileNotFoundError: + msg = _('couldnt_execute_{}_is_it_installed').format('apt-get') + raise AptError(msg) + + if cp.returncode != 0: + msg = _('command_{}_failed').format(' '.join(command)) + raise AptError(msg, cp) + + return cp + +def cache_dir() -> Path: + """ + Return the directory used to cache data (APT configurations, keyrings) to + speed up repeated operations. + + This function first ensures the directory exists. + """ + default_apt_cache_dir.mkdir(parents=True, exist_ok=True) + return default_apt_cache_dir + +class SourcesList: + """Representation of apt's sources.list contents.""" + def __init__(self, list: list[str]=[], + codename: Optional[str]=None) -> None: + """Initialize this SourcesList.""" + self.codename = None + self.list = [*list] + self.has_extra_entries = bool(self.list) + + if codename is not None: + if codename not in default_lists: + raise DistroError(_('distro_{}_unknown').format(codename)) + + self.codename = codename + self.list.extend(default_lists[codename]) + + def identity(self) -> str: + """ + Produce a string that uniquely identifies this sources.list contents. + """ + if self.codename and not self.has_extra_entries: + return self.codename + + return sha256('\n'.join(sorted(self.list)).encode()).digest().hex() + +def apt_conf(directory: Path) -> str: + """ + Given local APT's directory, produce a configuration suitable for running + APT there. + + 'directory' must not contain any special characters including quotes and + spaces. 
+ """ + return f''' +Architecture "amd64"; +Dir "{directory}"; +Dir::State "{directory}/var/lib/apt"; +Dir::State::status "{directory}/var/lib/dpkg/status"; +Dir::Etc::SourceList "{directory}/etc/apt.sources.list"; +Dir::Etc::SourceParts ""; +Dir::Cache "{directory}/var/cache/apt"; +pkgCacheGen::Essential "none"; +Dir::Etc::Trusted "{directory}/etc/trusted.gpg"; +''' + +def apt_keyring(keys: list[str]) -> bytes: + """ + Download the requested keys if necessary and export them as a keyring + suitable for passing to APT. + + The keyring is returned as a bytes value that should be written to a file. + """ + try: + from gnupg import GPG # type: ignore + except ModuleNotFoundError: + raise GpgError(_('couldnt_import_{}_is_it_installed').format('gnupg')) + + gpg = GPG(keyring=str(cache_dir() / 'master_keyring.gpg')) + for key in keys: + if gpg.list_keys(keys=[key]) != []: + continue + + if gpg.recv_keys(default_keyserver, key).imported == 0: + raise GpgError(_('gpg_couldnt_recv_key_{}').format(key)) + + return gpg.export_keys(keys, armor=False, minimal=True) + +def cache_apt_root(apt_root: Path, destination_zip: Path) -> None: + """ + Zip an APT root directory for later use and move the zipfile to the + requested destination. + """ + temporary_zip_path = None + try: + tmpfile = NamedTemporaryFile(suffix='.zip', prefix='tmp_', + dir=cache_dir(), delete=False) + temporary_zip_path = Path(tmpfile.name) + + to_skip = {Path('etc') / 'apt.conf', Path('etc') / 'trusted.gpg'} + + with zipfile.ZipFile(tmpfile, 'w') as zf: + for member in apt_root.rglob('*'): + relative = member.relative_to(apt_root) + if relative not in to_skip: + # This call will also properly add empty folders to zip file + zf.write(member, relative, zipfile.ZIP_DEFLATED) + + shutil.move(temporary_zip_path, destination_zip) + finally: + if temporary_zip_path is not None and temporary_zip_path.exists(): + temporary_zip_path.unlink() + +def setup_local_apt(directory: Path, list: SourcesList, keys: list[str]) -> Apt: + """ + Create files and directories necessary for running APT without root rights + inside 'directory'. + + 'directory' must not contain any special characters including quotes and + spaces and must be empty. + + Return an Apt object that can be used to call apt-get commands. + """ + apt_root = directory / 'apt_root' + + conf_text = apt_conf(apt_root) + keyring_bytes = apt_keyring(keys) + + apt_zipfile = cache_dir() / f'apt_{list.identity()}.zip' + if apt_zipfile.exists(): + with zipfile.ZipFile(apt_zipfile) as zf: + zf.extractall(apt_root) + + for to_create in ( + apt_root / 'var' / 'lib' / 'apt' / 'partial', + apt_root / 'var' / 'lib' / 'apt' / 'lists', + apt_root / 'var' / 'cache' / 'apt' / 'archives' / 'partial', + apt_root / 'etc' / 'apt' / 'preferences.d', + apt_root / 'var' / 'lib' / 'dpkg', + apt_root / 'var' / 'log' / 'apt' + ): + to_create.mkdir(parents=True, exist_ok=True) + + conf_path = apt_root / 'etc' / 'apt.conf' + trusted_path = apt_root / 'etc' / 'trusted.gpg' + status_path = apt_root / 'var' / 'lib' / 'dpkg' / 'status' + list_path = apt_root / 'etc' / 'apt.sources.list' + + conf_path.write_text(conf_text) + trusted_path.write_bytes(keyring_bytes) + status_path.touch() + list_path.write_text('\n'.join(list.list)) + + apt = Apt(str(conf_path)) + apt.get('update') + + cache_apt_root(apt_root, apt_zipfile) + + return apt + +@contextmanager +def local_apt(list: SourcesList, keys: list[str]) -> Iterator[Apt]: + """ + Create a temporary directory with proper local APT configuration in it. 
+    Yield an Apt object that can be used to issue apt-get commands.
+
+    This function returns a context manager that will remove the directory on
+    close.
+    """
+    with TemporaryDirectory() as td_str:
+        td = Path(td_str)
+        yield setup_local_apt(td, list, keys)
+
+def download_apt_packages(list: SourcesList, keys: list[str],
+                          packages: list[str], destination_dir: Path,
+                          with_deps: bool) -> list[str]:
+    """
+    Set up a local APT, update it using the specified sources.list
+    configuration and use it to download the specified packages.
+
+    This function downloads .deb files of packages matching the amd64
+    architecture (which includes packages with architecture 'all') as well as
+    all their corresponding source package files and (if requested) the debs
+    and source files of all their declared dependencies.
+
+    Return value is a list of names of all downloaded files.
+    """
+    install_line_regex = re.compile(r'^Inst (?P<name>\S+) \((?P<version>\S+) ')
+
+    with local_apt(list, keys) as apt:
+        if with_deps:
+            cp = apt.get('install', '--yes', '--just-print', *packages)
+
+            lines = cp.stdout.split('\n')
+            matches = [install_line_regex.match(l) for l in lines]
+            packages = [f'{m.group("name")}={m.group("version")}'
+                        for m in matches if m]
+
+            if not packages:
+                raise AptError(_('apt_install_output_not_understood'), cp)
+
+        # Download .debs indirectly to destination_dir by first placing them
+        # in a temporary subdirectory.
+        with TemporaryDirectory(dir=destination_dir) as td_str:
+            td = Path(td_str)
+            cp = apt.get('download', *packages, cwd=td)
+
+            deb_name_regex = re.compile(
+                r'''
+                ^
+                (?P<name>[^_]+)
+                _
+                (?P<ver>[^_]+)
+                _
+                .+ # architecture (or 'all')
+                \.deb
+                $
+                ''',
+                re.VERBOSE)
+
+            names_vers = []
+            downloaded = []
+            for deb_file in td.iterdir():
+                match = deb_name_regex.match(deb_file.name)
+                if match is None:
+                    msg = _('apt_download_gave_bad_filename_{}')\
+                        .format(deb_file.name)
+                    raise AptError(msg, cp)
+
+                names_vers.append((
+                    unquote(match.group('name')),
+                    unquote(match.group('ver'))
+                ))
+                downloaded.append(deb_file.name)
+
+            apt.get('source', '--download-only',
+                    *[f'{n}={v}' for n, v in names_vers], cwd=td)
+
+            for source_file in td.iterdir():
+                if source_file.name in downloaded:
+                    continue
+
+                downloaded.append(source_file.name)
+
+            for filename in downloaded:
+                shutil.move(td / filename, destination_dir / filename)
+
+    return downloaded
+
+@contextmanager
+def piggybacked_system(piggyback_def: dict, foreign_packages: Optional[Path]) \
+        -> Iterator[Piggybacked]:
+    """
+    Resolve resources from APT. Optionally, use package files (.deb's, etc.)
+    from a specified directory instead of resolving and downloading them.
+
+    The directories and files created for the yielded Piggybacked object shall
+    be deleted when this context manager gets closed.
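+
+    A 'piggyback_def' driving this function could look like this (values are
+    hypothetical):
+
+        {
+            'system': 'apt',
+            'distribution': 'nabia',
+            'packages': ['somelib=1.0'],
+            'dependencies': False
+        }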
+ """ + assert piggyback_def['system'] == 'apt' + + with TemporaryDirectory() as td_str: + td = Path(td_str) + root = td / 'root' + root.mkdir() + + if foreign_packages is None: + archives = td / 'archives' + archives.mkdir() + else: + archives = foreign_packages / 'apt' + archives.mkdir(exist_ok=True) + + if [*archives.glob('*.deb')] == []: + sources_list = SourcesList(piggyback_def.get('sources_list', []), + piggyback_def.get('distribution')) + packages = piggyback_def['packages'] + with_deps = piggyback_def['dependencies'] + pgp_keys = [ + *default_keys, + *piggyback_def.get('trusted_keys', []) + ] + + download_apt_packages( + list=sources_list, + keys=pgp_keys, + packages=packages, + destination_dir=archives, + with_deps=with_deps + ) + + for deb in archives.glob('*.deb'): + command = ['dpkg-deb', '-x', str(deb), str(root)] + try: + cp = run(command) + except FileNotFoundError: + msg = _('couldnt_execute_{}_is_it_installed'.format('dpkg-deb')) + raise AptError(msg) + + if cp.returncode != 0: + msg = _('command_{}_failed').format(' '.join(command)) + raise AptError(msg, cp) + + docs_dir = root / 'usr' / 'share' / 'doc' + copyright_paths = [p / 'copyright' for p in docs_dir.iterdir()] \ + if docs_dir.exists() else [] + copyright_pure_paths = [PurePosixPath('.apt-root') / p.relative_to(root) + for p in copyright_paths if p.exists()] + + standard_depends = piggyback_def.get('depend_on_base_packages', True) + must_depend = [{'identifier': 'apt-common-licenses'}] \ + if standard_depends else [] + + yield Piggybacked( + archives={'apt': archives}, + roots={'.apt-root': root}, + package_license_files=copyright_pure_paths, + resource_must_depend=must_depend + ) diff --git a/src/hydrilla/builder/piggybacking.py b/src/hydrilla/builder/piggybacking.py new file mode 100644 index 0000000..5813509 --- /dev/null +++ b/src/hydrilla/builder/piggybacking.py @@ -0,0 +1,117 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Handling of software packaged for other distribution systems. +# +# This file is part of Hydrilla +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +This module contains definitions that may be reused by multiple piggybacked +software system backends. +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +from pathlib import Path, PurePosixPath +from typing import Optional, Iterable + +from ..translations import smart_gettext as _ +from .common_errors import * + +here = Path(__file__).resolve().parent + +class Piggybacked: + """ + Store information about foreign resources in use. 
+
+    Public attributes:
+        'resource_must_depend' (read-only)
+        'package_license_files' (read-only)
+    """
+    def __init__(self, archives: dict[str, Path]={}, roots: dict[str, Path]={},
+                 package_license_files: list[PurePosixPath]=[],
+                 resource_must_depend: list[dict]=[]):
+        """
+        Initialize this Piggybacked object.
+
+        'archives' maps piggybacked system names to directories that contain
+        the package(s)' archive files. An 'archives' object may look like
+        {'apt': PosixPath('/path/to/dir/with/debs/and/tarballs')}.
+
+        'roots' associates directory names to be virtually inserted under the
+        Hydrilla source package directory with paths to real filesystem
+        directories that hold their desired contents, i.e. unpacked foreign
+        packages.
+
+        'package_license_files' lists paths to license files that should be
+        included with the Haketilo package that will be produced. The paths
+        are to be resolved using the 'roots' dictionary.
+
+        'resource_must_depend' lists names of Haketilo packages that the
+        produced resources will additionally depend on. This is meant to help
+        distribute common licenses with a separate Haketilo package.
+        """
+        self.archives = archives
+        self.roots = roots
+        self.package_license_files = package_license_files
+        self.resource_must_depend = resource_must_depend
+
+    def resolve_file(self, file_ref_name: PurePosixPath) -> Optional[Path]:
+        """
+        'file_ref_name' is a path as it may appear in an index.json file.
+        Check if the file belongs to one of the roots we have and return
+        either a path to the relevant file under this root or None.
+
+        This function does not check whether the file actually exists in the
+        filesystem.
+        """
+        parts = file_ref_name.parts
+        if not parts:
+            return None
+
+        root_path = self.roots.get(parts[0])
+        if root_path is None:
+            return None
+
+        path = root_path
+
+        for part in parts[1:]:
+            path = path / part
+
+        path = path.resolve()
+
+        if not path.is_relative_to(root_path):
+            raise FileReferenceError(_('loading_{}_outside_piggybacked_dir')
+                                     .format(file_ref_name))
+
+        return path
+
+    def archive_files(self) -> Iterable[tuple[PurePosixPath, Path]]:
+        """
+        Yield all archive files in use. Each yielded tuple holds the file's
+        desired path relative to the piggybacked archives directory to be
+        created and its current real path.
+        """
+        for system, real_dir in self.archives.items():
+            for path in real_dir.rglob('*'):
+                yield PurePosixPath(system) / path.relative_to(real_dir), path
-- 
cgit v1.2.3