# SPDX-License-Identifier: GPL-3.0-or-later # Haketilo proxy data and configuration (import of packages from disk files). # # This file is part of Hydrilla&Haketilo. # # Copyright (C) 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use this code # in a proprietary program, I am not going to enforce this in court. """ .... """ # Enable using with Python 3.7. from __future__ import annotations import io import hashlib import dataclasses as dc import typing as t from pathlib import Path import sqlite3 from ...exceptions import HaketiloException from ...translations import smart_gettext as _ from ... import versions from ... import item_infos from . import base def get_or_make_repo_iteration(cursor: sqlite3.Cursor, repo_name: str) -> int: cursor.execute( ''' SELECT repo_id, next_iteration - 1 FROM repos WHERE name = ?; ''', (repo_name,) ) (repo_id, last_iteration), = cursor.fetchall() cursor.execute( ''' INSERT OR IGNORE INTO repo_iterations(repo_id, iteration) VALUES(?, ?); ''', (repo_id, last_iteration) ) cursor.execute( ''' SELECT repo_iteration_id FROM repo_iterations WHERE repo_id = ? AND iteration = ?; ''', (repo_id, last_iteration) ) (repo_iteration_id,), = cursor.fetchall() return repo_iteration_id def get_or_make_item(cursor: sqlite3.Cursor, type: str, identifier: str) -> int: type_letter = {'resource': 'R', 'mapping': 'M'}[type] cursor.execute( ''' INSERT OR IGNORE INTO items(type, identifier) VALUES(?, ?); ''', (type_letter, identifier) ) cursor.execute( ''' SELECT item_id FROM items WHERE type = ? AND identifier = ?; ''', (type_letter, identifier) ) (item_id,), = cursor.fetchall() return item_id def get_or_make_item_version( cursor: sqlite3.Cursor, item_id: int, repo_iteration_id: int, version: versions.VerTuple, definition: str ) -> int: ver_str = versions.version_string(version) cursor.execute( ''' INSERT OR IGNORE INTO item_versions( item_id, version, repo_iteration_id, definition ) VALUES(?, ?, ?, ?); ''', (item_id, ver_str, repo_iteration_id, definition) ) cursor.execute( ''' SELECT item_version_id FROM item_versions WHERE item_id = ? AND version = ? AND repo_iteration_id = ?; ''', (item_id, ver_str, repo_iteration_id) ) (item_version_id,), = cursor.fetchall() return item_version_id def make_mapping_status(cursor: sqlite3.Cursor, item_id: int) -> None: cursor.execute( ''' INSERT OR IGNORE INTO mapping_statuses(item_id, enabled, frozen) VALUES(?, 'E', 'R'); ''', (item_id,) ) def get_or_make_file(cursor: sqlite3.Cursor, sha256: str, file_bytes: bytes) \ -> int: cursor.execute( ''' INSERT OR IGNORE INTO files(sha256, data) VALUES(?, ?) ''', (sha256, file_bytes) ) cursor.execute( ''' SELECT file_id FROM files WHERE sha256 = ?; ''', (sha256,) ) (file_id,), = cursor.fetchall() return file_id def make_file_use( cursor: sqlite3.Cursor, item_version_id: int, file_id: int, name: str, type: str, mime_type: str, idx: int ) -> None: cursor.execute( ''' INSERT OR IGNORE INTO file_uses( item_version_id, file_id, name, type, mime_type, idx ) VALUES(?, ?, ?, ?, ?, ?); ''', (item_version_id, file_id, name, type, mime_type, idx) ) @dc.dataclass(frozen=True) class _FileInfo: id: int is_ascii: bool def _add_item( cursor: sqlite3.Cursor, files_by_sha256_path: Path, info: item_infos.AnyInfo, definition: str ) -> None: repo_iteration_id = get_or_make_repo_iteration(cursor, '') item_id = get_or_make_item(cursor, info.type_name, info.identifier) item_version_id = get_or_make_item_version( cursor, item_id, repo_iteration_id, info.version, definition ) if isinstance(info, item_infos.MappingInfo): make_mapping_status(cursor, item_id) file_infos = {} file_specifiers = [*info.source_copyright] if isinstance(info, item_infos.ResourceInfo): file_specifiers.extend(info.scripts) for file_spec in file_specifiers: file_path = files_by_sha256_path / file_spec.sha256 if not file_path.is_file(): fmt = _('err.proxy.file_missing_{item_identifier}_{file_name}_{sha256}') msg = fmt.format( item_identifier = info.identifier, file_name = file_spec.name, sha256 = file_spec.sha256 ) raise HaketiloException(msg) file_bytes = file_path.read_bytes() sha256 = hashlib.sha256(file_bytes).digest().hex() if sha256 != file_spec.sha256: fmt = _('err.proxy.file_hash_mismatched_{item_identifier}_{file_name}_{expected_sha256}_{actual_sha256}') msg = fmt.format( item_identifier = info.identifier, file_name = file_spec.name, expected_sha256 = file_spec.sha256, actual_sha256 = sha256 ) raise HaketiloException(msg) file_id = get_or_make_file(cursor, sha256, file_bytes) file_infos[sha256] = _FileInfo(file_id, file_bytes.isascii()) for idx, file_spec in enumerate(info.source_copyright): file_info = file_infos[file_spec.sha256] if file_info.is_ascii: mime = 'text/plain' else: mime = 'application/octet-stream' make_file_use( cursor, item_version_id = item_version_id, file_id = file_info.id, name = file_spec.name, type = 'L', mime_type = mime, idx = idx ) if isinstance(info, item_infos.MappingInfo): return for idx, file_spec in enumerate(info.scripts): file_info = file_infos[file_spec.sha256] make_file_use( cursor, item_version_id = item_version_id, file_id = file_info.id, name = file_spec.name, type = 'W', mime_type = 'application/javascript', idx = idx ) AnyInfoVar = t.TypeVar( 'AnyInfoVar', item_infos.ResourceInfo, item_infos.MappingInfo ) def _read_items(malcontent_path: Path, item_class: t.Type[AnyInfoVar]) \ -> t.Iterator[tuple[AnyInfoVar, str]]: item_type_path = malcontent_path / item_class.type_name if not item_type_path.is_dir(): return for item_path in item_type_path.iterdir(): if not item_path.is_dir(): continue for item_version_path in item_path.iterdir(): definition = item_version_path.read_text() item_info = item_class.load(io.StringIO(definition)) assert item_info.identifier == item_path.name assert versions.version_string(item_info.version) == \ item_version_path.name yield item_info, definition def load_packages( state: base.HaketiloStateWithFields, cursor: sqlite3.Cursor, malcontent_path: Path ) -> None: files_by_sha256_path = malcontent_path / 'file' / 'sha256' for info_type in [item_infos.ResourceInfo, item_infos.MappingInfo]: info: item_infos.AnyInfo for info, definition in _read_items( malcontent_path, info_type # type: ignore ): _add_item(cursor, files_by_sha256_path, info, definition)