aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/builder/build.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/hydrilla/builder/build.py')
-rw-r--r--src/hydrilla/builder/build.py375
1 files changed, 375 insertions, 0 deletions
diff --git a/src/hydrilla/builder/build.py b/src/hydrilla/builder/build.py
new file mode 100644
index 0000000..d89ead3
--- /dev/null
+++ b/src/hydrilla/builder/build.py
@@ -0,0 +1,375 @@
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+# Building Hydrilla packages.
+#
+# This file is part of Hydrilla
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+import json
+import re
+import zipfile
+from pathlib import Path
+from hashlib import sha256
+from sys import stderr
+
+import jsonschema
+
+from .. import util
+
# Directory containing this module; the bundled JSON schemas live in its
# 'schemas' subdirectory.
here = Path(__file__).resolve().parent

# Schema used to validate the index.json of a source package. JSON text is
# UTF-8, so the encoding is stated explicitly instead of depending on the
# locale's default encoding.
with open(here / 'schemas' / 'package_source-1.schema.json',
          encoding='utf-8') as schema_file:
    index_json_schema = json.load(schema_file)
+
class FileReferenceError(Exception):
    """
    Raised when a file referenced from a source package's index.json cannot
    be accepted.
    """
+
class ReuseError(Exception):
    """
    Raised when something goes wrong while invoking the REUSE tool.
    """
+
class FileBuffer:
    """
    Implement a write-only file-like object that buffers in memory all data
    written to it.
    """
    def __init__(self):
        """
        Initialize FileBuffer with no data buffered.
        """
        # Chunks in the order they were written; every element is 'bytes'.
        self.chunks = []

    def write(self, b):
        """
        Buffer 'b', return number of bytes buffered.

        'b' is expected to be an instance of 'str' (in which case it gets
        encoded as UTF-8), 'bytes' or another object supporting the buffer
        protocol (e.g. 'bytearray' or 'memoryview').

        A TypeError is raised for objects of any other kind.
        """
        if isinstance(b, str):
            # isinstance() (rather than an exact type check) makes subclasses
            # of 'str' get encoded as well; storing them unencoded would
            # make get_bytes() fail later.
            b = b.encode('utf-8')
        elif not isinstance(b, bytes):
            # Take an immutable snapshot so that later changes made by the
            # caller to a mutable buffer do not alter already-buffered data.
            b = bytes(memoryview(b))

        self.chunks.append(b)
        return len(b)

    def flush(self):
        """
        A no-op mock of file-like object's flush() method.
        """

    def get_bytes(self):
        """
        Return all data written so far concatenated into a single 'bytes'
        object.
        """
        return b''.join(self.chunks)
+
def generate_spdx_report(root):
    """
    Use REUSE tool to generate an SPDX report for sources under 'root' and
    return the report's contents as 'bytes'.

    'root' shall be an instance of pathlib.Path.

    In case the directory tree under 'root' does not constitute a
    REUSE-compliant package, linting report is printed to standard error and
    a ReuseError is raised. The same happens with the tool's output if the
    report generation itself fails.

    In case the reuse package is not installed, a ReuseError is also raised.
    """
    try:
        # Imported here rather than at module level, so the 'reuse' package
        # is only needed when this function actually gets called.
        from reuse._main import main as reuse_main
    except ModuleNotFoundError as import_error:
        # The exception has to be raised, not merely constructed; otherwise
        # execution would continue and fail on the unbound 'reuse_main'.
        raise ReuseError("Could not import 'reuse'. Is the tool installed and visible to this Python instance?") from import_error

    # First make sure the package is REUSE-compliant.
    mocked_output = FileBuffer()
    if reuse_main(args=['--root', str(root), 'lint'], out=mocked_output) != 0:
        stderr.write(mocked_output.get_bytes().decode())
        raise ReuseError('Attempt to generate an SPDX report for a REUSE-incompliant package.')

    # A fresh buffer is used so that the linting output does not end up in
    # the report.
    mocked_output = FileBuffer()
    if reuse_main(args=['--root', str(root), 'spdx'], out=mocked_output) != 0:
        stderr.write(mocked_output.get_bytes().decode())
        raise ReuseError("Couldn't generate an SPDX report for package.")

    return mocked_output.get_bytes()
+
class FileRef:
    """Describe a single file that belongs to the package."""
    def __init__(self, path: Path, contents: bytes):
        """
        Remember 'path' and 'contents' of the file and compute the
        hexadecimal sha256 digest of 'contents'.
        """
        self.path = path
        self.contents = contents
        self.contents_hash = sha256(contents).hexdigest()

        # Initial flag values; they are plain attributes that other code may
        # change after construction.
        self.include_in_zipfile = True
        self.include_in_distribution = False

    def make_ref_dict(self, filename: str) -> dict:
        """
        Return a dict describing this file under the name 'filename',
        suitable for inclusion in JSON definitions.
        """
        return dict(file=filename, sha256=self.contents_hash)
+
class Build:
    """
    Build a Hydrilla package.

    All the work is done in the constructor, which loads and processes the
    source package in memory. The write_*() methods then store the results
    on disk.
    """
    def __init__(self, srcdir, index_json_path):
        """
        Initialize a build. All files to be included in a distribution package
        are loaded into memory, all data gets validated and all necessary
        computations (e.g. preparing of hashes) are performed.

        'srcdir' and 'index_json_path' are expected to be pathlib.Path
        objects. A relative 'index_json_path' is interpreted relative to
        'srcdir'.
        """
        self.srcdir = srcdir.resolve()
        self.index_json_path = index_json_path
        # Maps resolved file paths to the FileRef objects describing them.
        self.files_by_path = {}
        self.resource_list = []
        self.mapping_list = []

        if not index_json_path.is_absolute():
            self.index_json_path = self.srcdir / self.index_json_path

        self.index_json_path = self.index_json_path.resolve()

        # Decode explicitly as UTF-8. The very same text gets re-encoded as
        # UTF-8 below, so relying on the locale's default encoding here
        # could alter (or fail to read) the file's contents.
        with open(self.index_json_path, 'rt', encoding='utf-8') as index_file:
            index_json_text = index_file.read()

        index_obj = json.loads(util.strip_json_comments(index_json_text))

        # Whatever the actual location of the index file, it is registered
        # under the name 'index.json' directly inside the source directory.
        index_ref_path = self.srcdir / 'index.json'
        self.files_by_path[index_ref_path] = \
            FileRef(index_ref_path, index_json_text.encode())

        self._process_index_json(index_obj)

    def _process_file(self, filename: str, include_in_distribution: bool=True):
        """
        Resolve 'filename' relative to srcdir, load it to memory (if not loaded
        before), compute its hash and store its information in
        'self.files_by_path'.

        'filename' shall represent a relative path using '/' as a separator.

        if 'include_in_distribution' is True it shall cause the file to not only
        be included in the source package's zipfile, but also written as one of
        built package's files.

        Return file's reference object that can be included in JSON definitions
        of various kinds.

        FileReferenceError is raised if the file lies outside the source
        directory or is the reserved 'index.json'.
        """
        path = self.srcdir.joinpath(*filename.split('/')).resolve()

        # Resolving first makes '..' segments and symlinks pointing outside
        # of the source directory get caught by this check.
        if not path.is_relative_to(self.srcdir):
            raise FileReferenceError(f"Attempt to load '{filename}' which lies outside package source directory.")

        if str(path.relative_to(self.srcdir)) == 'index.json':
            raise FileReferenceError("Attempt to load 'index.json' which is a reserved filename.")

        file_ref = self.files_by_path.get(path)
        if file_ref is None:
            with open(path, 'rb') as file_handle:
                contents = file_handle.read()

            file_ref = FileRef(path, contents)
            self.files_by_path[path] = file_ref

        # The flag is only ever switched on here: a file requested for
        # distribution once stays marked even if it is later referenced
        # with include_in_distribution=False.
        if include_in_distribution:
            file_ref.include_in_distribution = True

        return file_ref.make_ref_dict(filename)

    def _prepare_source_package_zip(self, root_dir_name: str):
        """
        Create and store in memory a .zip archive containing files needed to
        build this source package.

        'root_dir_name' shall not contain any slashes ('/'). It names the
        directory inside the archive under which all files get placed.

        The archive's bytes are stored in 'self.source_zip_contents'.

        Return zipfile's sha256 sum's hexstring.
        """
        buffer = FileBuffer()
        root_dir_path = Path(root_dir_name)

        def zippath(file_path):
            # Archive member names use '/' separators on every platform.
            file_path = root_dir_path / file_path.relative_to(self.srcdir)
            return file_path.as_posix()

        with zipfile.ZipFile(buffer, 'w') as archive:
            for file_ref in self.files_by_path.values():
                if file_ref.include_in_zipfile:
                    archive.writestr(zippath(file_ref.path), file_ref.contents)

        self.source_zip_contents = buffer.get_bytes()

        return sha256(self.source_zip_contents).hexdigest()

    def _process_item(self, item_def: dict):
        """
        Process 'item_def' as definition of a resource/mapping and store in
        memory its processed form and files used by it.

        Definitions with 'type' equal to 'resource' are appended to
        'self.resource_list', all others to 'self.mapping_list'.

        Return a minimal item reference suitable for using in source
        description.
        """
        copy_props = ['type', 'identifier', 'long_name', 'uuid', 'description']
        if 'comment' in item_def:
            copy_props.append('comment')

        if item_def['type'] == 'resource':
            item_list = self.resource_list

            copy_props.append('revision')

            script_file_refs = [self._process_file(f['file'])
                                for f in item_def.get('scripts', [])]

            new_item_obj = {
                'dependencies': item_def.get('dependencies', []),
                'scripts':      script_file_refs
            }
        else:
            item_list = self.mapping_list

            # Only the identifier of each payload resource gets carried over.
            payloads = {
                pattern: {'identifier': res_ref['identifier']}
                for pattern, res_ref in item_def.get('payloads', {}).items()
            }

            new_item_obj = {
                'payloads': payloads
            }

        new_item_obj.update({prop: item_def[prop] for prop in copy_props})

        new_item_obj['version'] = util.normalize_version(item_def['version'])
        new_item_obj['api_schema_version'] = [1, 0, 1]
        new_item_obj['source_copyright'] = self.copyright_file_refs
        new_item_obj['source_name'] = self.source_name

        item_list.append(new_item_obj)

        return {prop: new_item_obj[prop]
                for prop in ('type', 'identifier', 'version')}

    def _process_index_json(self, index_obj: dict):
        """
        Process 'index_obj' as contents of source package's index.json and store
        in memory this source package's zipfile as well as package's individual
        files and computed definitions of the source package and items defined
        in it.

        'index_obj' gets validated against the index.json schema first.
        FileReferenceError is raised if an SPDX report was requested but
        'report.spdx' is not listed among the copyright files.
        """
        jsonschema.validate(index_obj, index_json_schema)

        self.source_name = index_obj['source_name']

        spdx_ref = None
        generate_spdx = index_obj.get('reuse_generate_spdx_report', False)
        if generate_spdx:
            contents = generate_spdx_report(self.srcdir)
            spdx_path = (self.srcdir / 'report.spdx').resolve()
            spdx_ref = FileRef(spdx_path, contents)

            # The report is generated, so it does not belong in the archive
            # of sources. Registering it here makes later references to
            # 'report.spdx' use the generated contents instead of reading a
            # file from disk.
            spdx_ref.include_in_zipfile = False
            self.files_by_path[spdx_path] = spdx_ref

        self.copyright_file_refs = \
            [self._process_file(f['file']) for f in index_obj['copyright']]

        # Processing the copyright files above marks the report for
        # distribution only if it is listed among them.
        if generate_spdx and not spdx_ref.include_in_distribution:
            raise FileReferenceError("Told to generate 'report.spdx' but 'report.spdx' is not listed among copyright files. Refusing to proceed.")

        item_refs = [self._process_item(d) for d in index_obj['definitions']]

        for file_ref in index_obj.get('additional_files', []):
            self._process_file(file_ref['file'], include_in_distribution=False)

        # The zipfile has to be prepared only now, once all files referenced
        # from index.json have been loaded.
        source_archives_obj = {
            'zip' : {
                'sha256': self._prepare_source_package_zip(self.source_name)
            }
        }

        self.source_description = {
            'api_schema_version': [1, 0, 1],
            'source_name':        self.source_name,
            'source_copyright':   self.copyright_file_refs,
            'upstream_url':       index_obj['upstream_url'],
            'definitions':        item_refs,
            'source_archives':    source_archives_obj
        }

        if 'comment' in index_obj:
            self.source_description['comment'] = index_obj['comment']

    def write_source_package_zip(self, dstpath: Path):
        """
        Write the (already prepared) .zip archive containing files needed to
        build this source package at 'dstpath'.
        """
        with open(dstpath, 'wb') as output:
            output.write(self.source_zip_contents)

    def write_package_files(self, dstpath: Path):
        """
        Write package files under 'dstpath' for distribution.

        The following get created (together with any missing directories):
        - 'file/sha256-<hash>' for every file marked for distribution,
        - 'source/<source_name>.json' and 'source/<source_name>.zip',
        - 'resource/<identifier>/<version>' and
          'mapping/<identifier>/<version>' for every processed item.
        """
        file_dir_path = (dstpath / 'file').resolve()
        file_dir_path.mkdir(parents=True, exist_ok=True)

        for file_ref in self.files_by_path.values():
            if file_ref.include_in_distribution:
                file_name = f'sha256-{file_ref.contents_hash}'
                with open(file_dir_path / file_name, 'wb') as output:
                    output.write(file_ref.contents)

        source_dir_path = (dstpath / 'source').resolve()
        source_dir_path.mkdir(parents=True, exist_ok=True)
        source_name = self.source_description["source_name"]

        with open(source_dir_path / f'{source_name}.json', 'wt',
                  encoding='utf-8') as output:
            json.dump(self.source_description, output)

        with open(source_dir_path / f'{source_name}.zip', 'wb') as output:
            output.write(self.source_zip_contents)

        for item_type, item_list in [
                ('resource', self.resource_list),
                ('mapping',  self.mapping_list)
        ]:
            item_type_dir_path = (dstpath / item_type).resolve()

            for item_def in item_list:
                item_dir_path = item_type_dir_path / item_def['identifier']
                item_dir_path.mkdir(parents=True, exist_ok=True)

                # The definition file is named after the dotted version.
                version = '.'.join(str(n) for n in item_def['version'])
                with open(item_dir_path / version, 'wt',
                          encoding='utf-8') as output:
                    json.dump(item_def, output)