# SPDX-License-Identifier: GPL-3.0-or-later
# Haketilo proxy data and configuration (import of packages from disk files).
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use this code
# in a proprietary program, I am not going to enforce this in court.
"""
....
"""
# Enable using with Python 3.7.
from __future__ import annotations
import io
import hashlib
import dataclasses as dc
import typing as t
from pathlib import Path
import sqlite3
from ...exceptions import HaketiloException
from ...translations import smart_gettext as _
from ... import versions
from ... import item_infos
from . import base
def get_or_make_repo_iteration(cursor: sqlite3.Cursor, repo_name: str) -> int:
cursor.execute(
'''
SELECT
repo_id, next_iteration - 1
FROM
repos
WHERE
name = ?;
''',
(repo_name,)
)
(repo_id, last_iteration), = cursor.fetchall()
cursor.execute(
'''
INSERT OR IGNORE INTO repo_iterations(repo_id, iteration)
VALUES(?, ?);
''',
(repo_id, last_iteration)
)
cursor.execute(
'''
SELECT
repo_iteration_id
FROM
repo_iterations
WHERE
repo_id = ? AND iteration = ?;
''',
(repo_id, last_iteration)
)
(repo_iteration_id,), = cursor.fetchall()
return repo_iteration_id
def get_or_make_item(cursor: sqlite3.Cursor, type: str, identifier: str) -> int:
type_letter = {'resource': 'R', 'mapping': 'M'}[type]
cursor.execute(
'''
INSERT OR IGNORE INTO items(type, identifier)
VALUES(?, ?);
''',
(type_letter, identifier)
)
cursor.execute(
'''
SELECT
item_id
FROM
items
WHERE
type = ? AND identifier = ?;
''',
(type_letter, identifier)
)
(item_id,), = cursor.fetchall()
return item_id
def get_or_make_item_version(
cursor: sqlite3.Cursor,
item_id: int,
repo_iteration_id: int,
version: versions.VerTuple,
definition: str
) -> int:
ver_str = versions.version_string(version)
cursor.execute(
'''
INSERT OR IGNORE INTO item_versions(
item_id,
version,
repo_iteration_id,
definition
)
VALUES(?, ?, ?, ?);
''',
(item_id, ver_str, repo_iteration_id, definition)
)
cursor.execute(
'''
SELECT
item_version_id
FROM
item_versions
WHERE
item_id = ? AND version = ? AND repo_iteration_id = ?;
''',
(item_id, ver_str, repo_iteration_id)
)
(item_version_id,), = cursor.fetchall()
return item_version_id
def make_mapping_status(cursor: sqlite3.Cursor, item_id: int) -> None:
cursor.execute(
'''
INSERT OR IGNORE INTO mapping_statuses(item_id, enabled, frozen)
VALUES(?, 'E', 'R');
''',
(item_id,)
)
def get_or_make_file(cursor: sqlite3.Cursor, sha256: str, file_bytes: bytes) \
-> int:
cursor.execute(
'''
INSERT OR IGNORE INTO files(sha256, data)
VALUES(?, ?)
''',
(sha256, file_bytes)
)
cursor.execute(
'''
SELECT
file_id
FROM
files
WHERE
sha256 = ?;
''',
(sha256,)
)
(file_id,), = cursor.fetchall()
return file_id
def make_file_use(
cursor: sqlite3.Cursor,
item_version_id: int,
file_id: int,
name: str,
type: str,
mime_type: str,
idx: int
) -> None:
cursor.execute(
'''
INSERT OR IGNORE INTO file_uses(
item_version_id,
file_id,
name,
type,
mime_type,
idx
)
VALUES(?, ?, ?, ?, ?, ?);
''',
(item_version_id, file_id, name, type, mime_type, idx)
)
@dc.dataclass(frozen=True)
class _FileInfo:
id: int
is_ascii: bool
def _add_item(
cursor: sqlite3.Cursor,
files_by_sha256_path: Path,
info: item_infos.AnyInfo,
definition: str
) -> None:
repo_iteration_id = get_or_make_repo_iteration(cursor, '')
item_id = get_or_make_item(cursor, info.type_name, info.identifier)
item_version_id = get_or_make_item_version(
cursor,
item_id,
repo_iteration_id,
info.version,
definition
)
if isinstance(info, item_infos.MappingInfo):
make_mapping_status(cursor, item_id)
file_infos = {}
file_specifiers = [*info.source_copyright]
if isinstance(info, item_infos.ResourceInfo):
file_specifiers.extend(info.scripts)
for file_spec in file_specifiers:
file_path = files_by_sha256_path / file_spec.sha256
if not file_path.is_file():
fmt = _('err.proxy.file_missing_{item_identifier}_{file_name}_{sha256}')
msg = fmt.format(
item_identifier = info.identifier,
file_name = file_spec.name,
sha256 = file_spec.sha256
)
raise HaketiloException(msg)
file_bytes = file_path.read_bytes()
sha256 = hashlib.sha256(file_bytes).digest().hex()
if sha256 != file_spec.sha256:
fmt = _('err.proxy.file_hash_mismatched_{item_identifier}_{file_name}_{expected_sha256}_{actual_sha256}')
msg = fmt.format(
item_identifier = info.identifier,
file_name = file_spec.name,
expected_sha256 = file_spec.sha256,
actual_sha256 = sha256
)
raise HaketiloException(msg)
file_id = get_or_make_file(cursor, sha256, file_bytes)
file_infos[sha256] = _FileInfo(file_id, file_bytes.isascii())
for idx, file_spec in enumerate(info.source_copyright):
file_info = file_infos[file_spec.sha256]
if file_info.is_ascii:
mime = 'text/plain'
else:
mime = 'application/octet-stream'
make_file_use(
cursor,
item_version_id = item_version_id,
file_id = file_info.id,
name = file_spec.name,
type = 'L',
mime_type = mime,
idx = idx
)
if isinstance(info, item_infos.MappingInfo):
return
for idx, file_spec in enumerate(info.scripts):
file_info = file_infos[file_spec.sha256]
make_file_use(
cursor,
item_version_id = item_version_id,
file_id = file_info.id,
name = file_spec.name,
type = 'W',
mime_type = 'application/javascript',
idx = idx
)
AnyInfoVar = t.TypeVar(
'AnyInfoVar',
item_infos.ResourceInfo,
item_infos.MappingInfo
)
def _read_items(malcontent_path: Path, item_class: t.Type[AnyInfoVar]) \
-> t.Iterator[tuple[AnyInfoVar, str]]:
item_type_path = malcontent_path / item_class.type_name
if not item_type_path.is_dir():
return
for item_path in item_type_path.iterdir():
if not item_path.is_dir():
continue
for item_version_path in item_path.iterdir():
definition = item_version_path.read_text()
item_info = item_class.load(io.StringIO(definition))
assert item_info.identifier == item_path.name
assert versions.version_string(item_info.version) == \
item_version_path.name
yield item_info, definition
def load_packages(
state: base.HaketiloStateWithFields,
cursor: sqlite3.Cursor,
malcontent_path: Path
) -> None:
files_by_sha256_path = malcontent_path / 'file' / 'sha256'
for info_type in [item_infos.ResourceInfo, item_infos.MappingInfo]:
info: item_infos.AnyInfo
for info, definition in _read_items(
malcontent_path,
info_type # type: ignore
):
_add_item(cursor, files_by_sha256_path, info, definition)