summaryrefslogtreecommitdiff
path: root/src/hydrilla_builder/build.py
diff options
context:
space:
mode:
authorWojtek Kosior <koszko@koszko.org>2022-02-07 16:51:11 +0100
committerWojtek Kosior <koszko@koszko.org>2022-02-07 16:51:11 +0100
commit16eaeb86948349141b1e6072eb6540c7cece10b6 (patch)
treea5d69a3dbc448b15f016316db31c8280a13a1b10 /src/hydrilla_builder/build.py
parentb5eb89e10762fb7e71c2e8e0cf28ab7c5206884a (diff)
downloadhydrilla-builder-16eaeb86948349141b1e6072eb6540c7cece10b6.tar.gz
hydrilla-builder-16eaeb86948349141b1e6072eb6540c7cece10b6.zip
move to a namespace package under 'hydrilla'
Diffstat (limited to 'src/hydrilla_builder/build.py')
-rw-r--r--src/hydrilla_builder/build.py434
1 files changed, 0 insertions, 434 deletions
diff --git a/src/hydrilla_builder/build.py b/src/hydrilla_builder/build.py
deleted file mode 100644
index 652e537..0000000
--- a/src/hydrilla_builder/build.py
+++ /dev/null
@@ -1,434 +0,0 @@
-# SPDX-License-Identifier: AGPL-3.0-or-later
-
-# Building Hydrilla packages.
-#
-# This file is part of Hydrilla
-#
-# Copyright (C) 2021,2022 Wojtek Kosior
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-#
-#
-# I, Wojtek Kosior, thereby promise not to sue for violation of this
-# file's license. Although I request that you do not make use of this code
-# in a proprietary program, I am not going to enforce this in court.
-
-
-import json
-import re
-import zipfile
-from pathlib import Path
-from hashlib import sha256
-from sys import stderr
-
-import jsonschema
-
-here = Path(__file__).resolve().parent
-with open(here / 'schemas' / 'package_source-1.schema.json') as schema_file:
- index_json_schema = json.load(schema_file)
-
class FileReferenceError(Exception):
    """
    Raised when a file referenced from a source package's index.json is
    missing, reserved or otherwise problematic.
    """
-
class ReuseError(Exception):
    """
    Raised when invoking the REUSE tool fails or the tool reports a problem.
    """
-
strip_comment_re = re.compile(r'''
^ # match from the beginning of each line
( # catch the part before '//' comment
  (?: # this group matches either a string or a single out-of-string character
    [^"/] |
    "
    (?: # this group matches any in-a-string character
      [^"\\] |          # match any normal character
      \\[^u] |          # match any escaped character like '\f' or '\n'
      \\u[a-fA-F0-9]{4} # match an escape
    )*
    "
  )*
)
# expect either end-of-line or a comment:
# * unterminated strings will cause matching to fail
# * bad comment (with '/' instead of '//') will be indicated by second group
#   having length 1 instead of 2 or 0
(//?|$)
''', re.VERBOSE)

def strip_json_comments(text):
    """
    Accept JSON text with optional C++-style ('//') comments and return the
    text with comments removed. Consecutive slashes inside strings are handled
    properly. A spurious single slash ('/') shall generate an error. Errors in
    JSON itself shall be ignored.
    """
    output_lines = []
    offset = 0

    for line in text.split('\n'):
        found = strip_comment_re.match(line)

        if found is None:
            # Unterminated string - keep the line untouched and let the json
            # module report the error later.
            output_lines.append(line)
        elif found[2] == '/':
            # A lone '/' is neither valid JSON nor a valid comment opener.
            raise json.JSONDecodeError('bad comment', text,
                                       offset + len(found[1]))
        else:
            output_lines.append(found[1])

        # +1 accounts for the '\n' consumed by split().
        offset += len(line) + 1

    return '\n'.join(output_lines)
-
def normalize_version(ver):
    '''
    'ver' is an array of integers. Return a *new* array with right-most zeroes
    stripped from 'ver'. The argument is not modified.
    '''
    end = len(ver)
    # Walk back over trailing zero components.
    while end > 0 and ver[end - 1] == 0:
        end -= 1

    return ver[:end]
-
class FileBuffer:
    """
    A minimal in-memory file-like object that accumulates written data.
    """
    def __init__(self):
        """Start with an empty buffer."""
        self.chunks = []

    def write(self, b):
        """
        Buffer 'b' and return the number of bytes buffered.

        'b' is expected to be an instance of 'bytes' or 'str', in which case
        it gets encoded as UTF-8.
        """
        data = b.encode() if type(b) is str else b
        self.chunks.append(data)
        return len(data)

    def flush(self):
        """A no-op mock of file-like object's flush() method."""

    def get_bytes(self):
        """
        Return all data written so far concatenated into a single 'bytes'
        object.
        """
        return b''.join(self.chunks)
-
def generate_spdx_report(root):
    """
    Use REUSE tool to generate an SPDX report for sources under 'root' and
    return the report's contents as 'bytes'.

    'root' shall be an instance of pathlib.Path.

    In case the directory tree under 'root' does not constitute a
    REUSE-compliant package, linting report is printed to standard output and
    a ReuseError is raised.

    In case the reuse package is not installed, a ReuseError is also raised.
    """
    try:
        from reuse._main import main as reuse_main
    except ModuleNotFoundError as err:
        # BUGFIX: the original constructed this exception without raising it,
        # so a missing 'reuse' package fell through to a NameError below.
        raise ReuseError("Could not import 'reuse'. Is the tool installed and visible to this Python instance?") from err

    # First lint the tree; a non-zero exit means it is not REUSE-compliant.
    mocked_output = FileBuffer()
    if reuse_main(args=['--root', str(root), 'lint'], out=mocked_output) != 0:
        stderr.write(mocked_output.get_bytes().decode())
        raise ReuseError('Attempt to generate an SPDX report for a REUSE-incompliant package.')

    # Then produce the actual SPDX document into a fresh buffer.
    mocked_output = FileBuffer()
    if reuse_main(args=['--root', str(root), 'spdx'], out=mocked_output) != 0:
        stderr.write(mocked_output.get_bytes().decode())
        raise ReuseError("Couldn't generate an SPDX report for package.")

    return mocked_output.get_bytes()
-
class FileRef:
    """Represent reference to a file in the package."""
    def __init__(self, path: Path, contents: bytes):
        """Record the file's path, raw contents and SHA-256 hexdigest."""
        # Distribution/zipfile membership flags; callers may flip these.
        self.include_in_distribution = False
        self.include_in_zipfile = True
        self.path = path
        self.contents = contents
        self.contents_hash = sha256(contents).hexdigest()

    def make_ref_dict(self, filename: str):
        """
        Represent the file reference through a dict that can be included in
        JSON definitions.
        """
        return {'file': filename, 'sha256': self.contents_hash}
-
class Build:
    """
    Build a Hydrilla package.

    Instantiating this class loads all files to be included in a distribution
    package into memory, validates all data and performs all necessary
    computations (e.g. preparing of hashes). The write_*() methods then dump
    the results to disk.
    """
    def __init__(self, srcdir, index_json_path):
        """
        Initialize a build. All files to be included in a distribution package
        are loaded into memory, all data gets validated and all necessary
        computations (e.g. preparing of hashes) are performed.

        'srcdir' and 'index_json_path' are expected to be pathlib.Path
        objects.
        """
        self.srcdir = srcdir.resolve()
        self.index_json_path = index_json_path
        self.files_by_path = {}
        self.resource_list = []
        self.mapping_list = []

        # A relative index.json path is interpreted relative to srcdir.
        if not index_json_path.is_absolute():
            self.index_json_path = (self.srcdir / self.index_json_path)

        self.index_json_path = self.index_json_path.resolve()

        with open(self.index_json_path, 'rt') as index_file:
            index_json_text = index_file.read()

        index_obj = json.loads(strip_json_comments(index_json_text))

        # The index file is always stored in the source archive under the
        # canonical name 'index.json', regardless of its original location.
        self.files_by_path[self.srcdir / 'index.json'] = \
            FileRef(self.srcdir / 'index.json', index_json_text.encode())

        self._process_index_json(index_obj)

    def _process_file(self, filename: str, include_in_distribution: bool=True):
        """
        Resolve 'filename' relative to srcdir, load it to memory (if not
        loaded before), compute its hash and store its information in
        'self.files_by_path'.

        'filename' shall represent a relative path using '/' as a separator.

        If 'include_in_distribution' is True it shall cause the file to not
        only be included in the source package's zipfile, but also written as
        one of built package's files.

        Return file's reference object that can be included in JSON
        definitions of various kinds.
        """
        path = self.srcdir
        for segment in filename.split('/'):
            path /= segment

        path = path.resolve()
        # Guard against path traversal escaping the source directory.
        if not path.is_relative_to(self.srcdir):
            # BUGFIX: include the offending filename in the message (the
            # original f-string contained no placeholder).
            raise FileReferenceError(f"Attempt to load '{filename}' which lies outside package source directory.")

        if str(path.relative_to(self.srcdir)) == 'index.json':
            raise FileReferenceError("Attempt to load 'index.json' which is a reserved filename.")

        file_ref = self.files_by_path.get(path)
        if file_ref is None:
            with open(path, 'rb') as file_handle:
                contents = file_handle.read()

            file_ref = FileRef(path, contents)
            self.files_by_path[path] = file_ref

        # Never clear the flag here - another reference may have already
        # requested distribution of this file.
        if include_in_distribution:
            file_ref.include_in_distribution = True

        return file_ref.make_ref_dict(filename)

    def _prepare_source_package_zip(self, root_dir_name: str):
        """
        Create and store in memory a .zip archive containing files needed to
        build this source package.

        'root_dir_name' shall not contain any slashes ('/').

        Return zipfile's sha256 sum's hexstring.
        """
        fb = FileBuffer()
        root_dir_path = Path(root_dir_name)

        def zippath(file_path):
            # Re-root the file under 'root_dir_name' inside the archive.
            file_path = root_dir_path / file_path.relative_to(self.srcdir)
            return file_path.as_posix()

        with zipfile.ZipFile(fb, 'w') as xpi:
            for file_ref in self.files_by_path.values():
                if file_ref.include_in_zipfile:
                    xpi.writestr(zippath(file_ref.path), file_ref.contents)

        self.source_zip_contents = fb.get_bytes()

        return sha256(self.source_zip_contents).digest().hex()

    def _process_item(self, item_def: dict):
        """
        Process 'item_def' as definition of a resource/mapping and store in
        memory its processed form and files used by it.

        Return a minimal item reference suitable for using in source
        description.
        """
        copy_props = ['type', 'identifier', 'long_name', 'uuid', 'description']
        if 'comment' in item_def:
            copy_props.append('comment')

        if item_def['type'] == 'resource':
            item_list = self.resource_list

            copy_props.append('revision')

            # Loading script files registers them for distribution as well.
            script_file_refs = [self._process_file(f['file'])
                                for f in item_def.get('scripts', [])]

            new_item_obj = {
                'dependencies': item_def.get('dependencies', []),
                'scripts': script_file_refs
            }
        else:
            item_list = self.mapping_list

            # Mappings only reference resources by identifier.
            payloads = {}
            for pat, res_ref in item_def.get('payloads', {}).items():
                payloads[pat] = {'identifier': res_ref['identifier']}

            new_item_obj = {
                'payloads': payloads
            }

        new_item_obj.update([(p, item_def[p]) for p in copy_props])

        new_item_obj['version'] = normalize_version(item_def['version'])
        new_item_obj['api_schema_version'] = [1, 0, 1]
        new_item_obj['source_copyright'] = self.copyright_file_refs
        new_item_obj['source_name'] = self.source_name

        item_list.append(new_item_obj)

        return dict([(prop, new_item_obj[prop])
                     for prop in ('type', 'identifier', 'version')])

    def _process_index_json(self, index_obj: dict):
        """
        Process 'index_obj' as contents of source package's index.json and
        store in memory this source package's zipfile as well as package's
        individual files and computed definitions of the source package and
        items defined in it.
        """
        jsonschema.validate(index_obj, index_json_schema)

        self.source_name = index_obj['source_name']

        generate_spdx = index_obj.get('reuse_generate_spdx_report', False)
        if generate_spdx:
            contents = generate_spdx_report(self.srcdir)
            spdx_path = (self.srcdir / 'report.spdx').resolve()
            spdx_ref = FileRef(spdx_path, contents)

            # The generated report is a build artifact, not a source file.
            spdx_ref.include_in_zipfile = False
            self.files_by_path[spdx_path] = spdx_ref

        self.copyright_file_refs = \
            [self._process_file(f['file']) for f in index_obj['copyright']]

        # _process_file() above sets include_in_distribution for files listed
        # among copyright files; a generated report must be one of them.
        if generate_spdx and not spdx_ref.include_in_distribution:
            raise FileReferenceError("Told to generate 'report.spdx' but 'report.spdx' is not listed among copyright files. Refusing to proceed.")

        item_refs = [self._process_item(d) for d in index_obj['definitions']]

        for file_ref in index_obj.get('additional_files', []):
            self._process_file(file_ref['file'], include_in_distribution=False)

        root_dir_path = Path(self.source_name)

        source_archives_obj = {
            'zip' : {
                'sha256': self._prepare_source_package_zip(root_dir_path)
            }
        }

        self.source_description = {
            'api_schema_version': [1, 0, 1],
            'source_name': self.source_name,
            'source_copyright': self.copyright_file_refs,
            'upstream_url': index_obj['upstream_url'],
            'definitions': item_refs,
            'source_archives': source_archives_obj
        }

        if 'comment' in index_obj:
            self.source_description['comment'] = index_obj['comment']

    def write_source_package_zip(self, dstpath: Path):
        """
        Create a .zip archive containing files needed to build this source
        package and write it at 'dstpath'.
        """
        with open(dstpath, 'wb') as output:
            output.write(self.source_zip_contents)

    def write_package_files(self, dstpath: Path):
        """Write package files under 'dstpath' for distribution."""
        # Distributed files live under 'file/', keyed by their hash.
        file_dir_path = (dstpath / 'file').resolve()
        file_dir_path.mkdir(parents=True, exist_ok=True)

        for file_ref in self.files_by_path.values():
            if file_ref.include_in_distribution:
                file_name = f'sha256-{file_ref.contents_hash}'
                with open(file_dir_path / file_name, 'wb') as output:
                    output.write(file_ref.contents)

        source_dir_path = (dstpath / 'source').resolve()
        source_dir_path.mkdir(parents=True, exist_ok=True)
        source_name = self.source_description["source_name"]

        with open(source_dir_path / f'{source_name}.json', 'wt') as output:
            json.dump(self.source_description, output)

        with open(source_dir_path / f'{source_name}.zip', 'wb') as output:
            output.write(self.source_zip_contents)

        # Item definitions are written as '<type>/<identifier>/<version>'.
        for item_type, item_list in [
                ('resource', self.resource_list),
                ('mapping', self.mapping_list)
        ]:
            item_type_dir_path = (dstpath / item_type).resolve()

            for item_def in item_list:
                item_dir_path = item_type_dir_path / item_def['identifier']
                item_dir_path.mkdir(parents=True, exist_ok=True)

                version = '.'.join([str(n) for n in item_def['version']])
                with open(item_dir_path / version, 'wt') as output:
                    json.dump(item_def, output)