# SPDX-License-Identifier: GPL-3.0-or-later

# Haketilo proxy data and configuration (import of packages from disk files).
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use this code
# in a proprietary program, I am not going to enforce this in court.

"""
Import of packages (resources and mappings) from a directory tree on disk
into the SQLite database used by Haketilo proxy.

The entry point is load_packages().  The remaining public functions are
small "get or make" helpers that each touch a single database table.
"""

# Enable using with Python 3.7.
from __future__ import annotations

import io
import hashlib
import dataclasses as dc
import typing as t

from pathlib import Path
from abc import ABC, abstractmethod

import sqlite3

from ....exceptions import HaketiloException
from ....translations import smart_gettext as _
from .... import versions
from .... import item_infos


def make_repo_iteration(cursor: sqlite3.Cursor, repo_id: int) -> int:
    """
    Register a new iteration of the repository identified by `repo_id`.

    The repository's `next_iteration` counter is read and then bumped by
    one, and a `repo_iterations` row carrying the value that was read is
    inserted.

    Returns the `repo_iteration_id` of the newly inserted row.

    Raises ValueError (from tuple unpacking) if a query does not yield
    exactly one row, e.g. when no repository with `repo_id` exists.
    """
    cursor.execute(
        '''
        SELECT
                next_iteration
        FROM
                repos
        WHERE
                repo_id = ?;
        ''',
        (repo_id,)
    )

    # Exactly one row with exactly one column is expected here.
    (next_iteration,), = cursor.fetchall()

    cursor.execute(
        '''
        UPDATE
                repos
        SET
                next_iteration = ?
        WHERE
                repo_id = ?;
        ''',
        (next_iteration + 1, repo_id)
    )

    cursor.execute(
        '''
        INSERT INTO repo_iterations(repo_id, iteration)
        VALUES(?, ?);
        ''',
        (repo_id, next_iteration)
    )

    cursor.execute(
        '''
        SELECT
                repo_iteration_id
        FROM
                repo_iterations
        WHERE
                repo_id = ? AND iteration = ?;
        ''',
        (repo_id, next_iteration)
    )

    (repo_iteration_id,), = cursor.fetchall()

    return repo_iteration_id


def get_or_make_item(cursor: sqlite3.Cursor, type: str, identifier: str) \
    -> int:
    """
    Make sure the `items` table has a row for the given item and return
    that row's `item_id`.

    `type` must be either 'resource' or 'mapping'; it is stored as the
    single letter 'R' or 'M' respectively.  Any other value raises
    KeyError.
    """
    type_letter = {'resource': 'R', 'mapping': 'M'}[type]

    # NOTE(review): OR IGNORE presumably relies on a uniqueness constraint
    # over (type, identifier); the schema is not visible from this file.
    cursor.execute(
        '''
        INSERT OR IGNORE INTO items(type, identifier)
        VALUES(?, ?);
        ''',
        (type_letter, identifier)
    )

    cursor.execute(
        '''
        SELECT
                item_id
        FROM
                items
        WHERE
                type = ? AND identifier = ?;
        ''',
        (type_letter, identifier)
    )

    (item_id,), = cursor.fetchall()

    return item_id


def get_or_make_item_version(
        cursor:            sqlite3.Cursor,
        item_id:           int,
        repo_iteration_id: int,
        version:           versions.VerTuple,
        definition:        bytes
) -> int:
    """
    Make sure the `item_versions` table has a row for the given item,
    version and repository iteration and return that row's
    `item_version_id`.

    `definition` is only written when the INSERT is not ignored, i.e. it
    does not overwrite the definition of a row that is already there.
    """
    ver_str = versions.version_string(version)

    cursor.execute(
        '''
        INSERT OR IGNORE INTO item_versions(
                item_id,
                version,
                repo_iteration_id,
                definition
        )
        VALUES(?, ?, ?, ?);
        ''',
        (item_id, ver_str, repo_iteration_id, definition)
    )

    cursor.execute(
        '''
        SELECT
                item_version_id
        FROM
                item_versions
        WHERE
                item_id = ? AND version = ? AND repo_iteration_id = ?;
        ''',
        (item_id, ver_str, repo_iteration_id)
    )

    (item_version_id,), = cursor.fetchall()

    return item_version_id


def make_mapping_status(cursor: sqlite3.Cursor, item_id: int) -> None:
    """
    Insert a `mapping_statuses` row for `item_id` with `enabled` set to
    'N' and `required` set to FALSE.  The INSERT is issued with OR IGNORE,
    so a conflicting existing row is left untouched.
    """
    cursor.execute(
        '''
        INSERT OR IGNORE INTO mapping_statuses(item_id, enabled, required)
        VALUES(?, 'N', FALSE);
        ''',
        (item_id,)
    )


def get_or_make_file(cursor: sqlite3.Cursor, sha256: str, file_bytes: bytes) \
    -> int:
    """
    Make sure the `files` table has a row for the file with the given
    hex-encoded SHA-256 sum and return that row's `file_id`.

    `file_bytes` is only written when the INSERT is not ignored.  This
    function does not verify that `sha256` matches `file_bytes`.
    """
    cursor.execute(
        '''
        INSERT OR IGNORE INTO files(sha256, data)
        VALUES(?, ?)
        ''',
        (sha256, file_bytes)
    )

    cursor.execute(
        '''
        SELECT
                file_id
        FROM
                files
        WHERE
                sha256 = ?;
        ''',
        (sha256,)
    )

    (file_id,), = cursor.fetchall()

    return file_id


def make_file_use(
        cursor:          sqlite3.Cursor,
        item_version_id: int,
        file_id:         int,
        name:            str,
        type:            str,
        mime_type:       str,
        idx:             int
) -> None:
    """
    Record in the `file_uses` table that the item version identified by
    `item_version_id` uses the file identified by `file_id`.

    `name`, `type`, `mime_type` and `idx` are stored verbatim.  The INSERT
    is issued with OR IGNORE, so a conflicting existing row is left
    untouched.
    """
    cursor.execute(
        '''
        INSERT OR IGNORE INTO file_uses(
                item_version_id,
                file_id,
                name,
                type,
                mime_type,
                idx
        )
        VALUES(?, ?, ?, ?, ?, ?);
        ''',
        (item_version_id, file_id, name, type, mime_type, idx)
    )


@dc.dataclass(frozen=True)
class _FileInfo:
    """What _add_item() remembers about a file it has already stored."""
    # Value of `file_id` of the file's row in the `files` table.
    id:       int
    # Whether all bytes of the file are ASCII (result of bytes.isascii()).
    is_ascii: bool


class FileResolver(ABC):
    """Interface of objects that supply file contents by SHA-256 sum."""
    @abstractmethod
    def by_sha256(self, sha256: str) -> bytes:
        """Return contents of the file with the given hex-encoded sum."""
        ...


def _add_item(
        cursor:                sqlite3.Cursor,
        package_file_resolver: FileResolver,
        info:                  item_infos.AnyInfo,
        definition:            bytes,
        repo_iteration_id:     int
) -> None:
    """
    Store a single item (resource or mapping) together with the files it
    references.

    Rows are created as needed in `items`, `item_versions`, `files` and
    `file_uses`, plus in `mapping_statuses` when `info` is a mapping.
    Every referenced file is obtained from `package_file_resolver` and its
    actual SHA-256 sum is compared against the declared one.

    Raises HaketiloException when a file's actual hash differs from the
    one declared in `info`.
    """
    item_id = get_or_make_item(cursor, info.type_name, info.identifier)
    item_version_id = get_or_make_item_version(
        cursor,
        item_id,
        repo_iteration_id,
        info.version,
        definition
    )

    if isinstance(info, item_infos.MappingInfo):
        make_mapping_status(cursor, item_id)

    # Maps verified hex-encoded SHA-256 sums to information on stored files.
    file_infos: t.Dict[str, _FileInfo] = {}

    file_specifiers = [*info.source_copyright]
    if isinstance(info, item_infos.ResourceInfo):
        file_specifiers.extend(info.scripts)

    for file_spec in file_specifiers:
        if file_spec.sha256 in file_infos:
            # The same contents have already been verified and stored for
            # this item; avoid resolving and hashing them again.
            continue

        file_bytes = package_file_resolver.by_sha256(file_spec.sha256)

        sha256 = hashlib.sha256(file_bytes).hexdigest()
        if sha256 != file_spec.sha256:
            fmt = _('err.proxy.file_hash_mismatched_{item_identifier}_{file_name}_{expected_sha256}_{actual_sha256}')
            msg = fmt.format(
                item_identifier = info.identifier,
                file_name       = file_spec.name,
                expected_sha256 = file_spec.sha256,
                actual_sha256   = sha256
            )
            raise HaketiloException(msg)

        file_id = get_or_make_file(cursor, sha256, file_bytes)

        file_infos[sha256] = _FileInfo(file_id, file_bytes.isascii())

    # Files listed under `source_copyright` are recorded with type 'L'.
    for idx, file_spec in enumerate(info.source_copyright):
        file_info = file_infos[file_spec.sha256]
        if file_info.is_ascii:
            mime = 'text/plain'
        else:
            mime = 'application/octet-stream'

        make_file_use(
            cursor,
            item_version_id = item_version_id,
            file_id         = file_info.id,
            name            = file_spec.name,
            type            = 'L',
            mime_type       = mime,
            idx             = idx
        )

    # Only resources have scripts; there is nothing more to do for mappings.
    if isinstance(info, item_infos.MappingInfo):
        return

    # Files listed under `scripts` are recorded with type 'W'.
    for idx, file_spec in enumerate(info.scripts):
        file_info = file_infos[file_spec.sha256]
        make_file_use(
            cursor,
            item_version_id = item_version_id,
            file_id         = file_info.id,
            name            = file_spec.name,
            type            = 'W',
            mime_type       = 'application/javascript',
            idx             = idx
        )


AnyInfoVar = t.TypeVar(
    'AnyInfoVar',
    item_infos.ResourceInfo,
    item_infos.MappingInfo
)

def _read_items(malcontent_path: Path, item_class: t.Type[AnyInfoVar]) \
    -> t.Iterator[tuple[AnyInfoVar, bytes]]:
    """
    Iterate over item definitions of one type found under
    `malcontent_path`.

    Definitions are looked for in files at
    `<malcontent_path>/<item_class.type_name>/<identifier>/<version>`.
    Nothing is yielded if the item type directory does not exist.  Entries
    of the item type directory that are not directories are skipped.

    Yields pairs of the loaded item info object and the raw bytes it was
    loaded from.
    """
    item_type_path = malcontent_path / item_class.type_name
    if not item_type_path.is_dir():
        return

    for item_path in item_type_path.iterdir():
        if not item_path.is_dir():
            continue

        for item_version_path in item_path.iterdir():
            definition = item_version_path.read_bytes()
            item_info = item_class.load(definition)

            # Sanity checks: the definition must agree with the path it was
            # found under.  NOTE: being `assert` statements, these checks
            # are not performed when Python runs with the -O option.
            assert item_info.identifier == item_path.name
            assert versions.version_string(item_info.version) == \
                item_version_path.name

            yield item_info, definition


@dc.dataclass(frozen=True)
class MalcontentFileResolver(FileResolver):
    """
    File resolver that reads files from
    `<malcontent_dir_path>/file/sha256/<sha256>`.
    """
    malcontent_dir_path: Path

    def by_sha256(self, sha256: str) -> bytes:
        """
        Return contents of the file stored under the given hex-encoded sum.

        Raises HaketiloException if there is no regular file at the
        expected path.
        """
        file_path = self.malcontent_dir_path / 'file' / 'sha256' / sha256
        if not file_path.is_file():
            fmt = _('err.proxy.file_missing_{sha256}')
            raise HaketiloException(fmt.format(sha256=sha256))

        return file_path.read_bytes()


def load_packages(
        cursor:                sqlite3.Cursor,
        malcontent_path:       Path,
        repo_id:               int,
        package_file_resolver: t.Optional[FileResolver] = None
) -> int:
    """
    Load all resources and mappings found under `malcontent_path` into the
    database as a new iteration of the repository identified by `repo_id`.

    Files referenced by the items are obtained from
    `package_file_resolver`.  When it is None, a MalcontentFileResolver
    reading from `malcontent_path` is used.

    Returns the `repo_iteration_id` of the newly created repository
    iteration.

    Raises HaketiloException when a referenced file is missing (with the
    default resolver) or its hash does not match the declared one.
    """
    if package_file_resolver is None:
        package_file_resolver = MalcontentFileResolver(malcontent_path)

    repo_iteration_id = make_repo_iteration(cursor, repo_id)

    types: t.Iterable[t.Type[item_infos.AnyInfo]] = \
        [item_infos.ResourceInfo, item_infos.MappingInfo]

    for info_type in types:
        info: item_infos.AnyInfo
        for info, definition in _read_items( # type: ignore
                malcontent_path,
                info_type
        ):
            _add_item(
                cursor,
                package_file_resolver,
                info,
                definition,
                repo_iteration_id
            )

    return repo_iteration_id