# SPDX-License-Identifier: GPL-3.0-or-later # Haketilo proxy data and configuration (ResourceStore and MappingStore # implementations). # # This file is part of Hydrilla&Haketilo. # # Copyright (C) 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use this code # in a proprietary program, I am not going to enforce this in court. """ This module provides an interface to interact with mappings, and resources inside Haketilo. """ # Enable using with Python 3.7. from __future__ import annotations import sqlite3 import typing as t import dataclasses as dc from ... import item_infos from .. import state as st from . import base def _set_installed_status(cursor: sqlite3.Cursor, id: str, new_status: str) \ -> None: cursor.execute( 'UPDATE item_versions SET installed = ? WHERE item_version_id = ?;', (new_status, id) ) def _get_statuses(cursor: sqlite3.Cursor, id: str) -> t.Tuple[str, str]: cursor.execute( ''' SELECT installed, active FROM item_versions WHERE item_version_id = ?; ''', (id,) ) rows = cursor.fetchall() if rows == []: raise st.MissingItemError() (installed_status, active_status), = rows return installed_status, active_status VersionRefVar = t.TypeVar( 'VersionRefVar', 'ConcreteResourceVersionRef', 'ConcreteMappingVersionRef' ) def _install_version(ref: VersionRefVar) -> None: with ref.state.cursor(transaction=True) as cursor: installed_status, _ = _get_statuses(cursor, ref.id) if installed_status == 'I': return _set_installed_status(cursor, ref.id, 'I') ref.state.pull_missing_files() def _uninstall_version(ref: VersionRefVar) -> t.Optional[VersionRefVar]: with ref.state.cursor(transaction=True) as cursor: installed_status, active_status = _get_statuses(cursor, ref.id) if installed_status == 'N': return ref _set_installed_status(cursor, ref.id, 'N') ref.state.prune_orphans() if active_status == 'R': ref.state.recompute_dependencies() cursor.execute( 'SELECT COUNT(*) FROM item_versions WHERE item_version_id = ?;', (ref.id,) ) (version_still_present,), = cursor.fetchall() return ref if version_still_present else None @dc.dataclass(frozen=True, unsafe_hash=True) class ConcreteMappingRef(st.MappingRef): state: base.HaketiloStateWithFields = dc.field(hash=False, compare=False) def get_version_display_infos(self) \ -> t.Sequence[st.MappingVersionDisplayInfo]: with self.state.cursor() as cursor: cursor.execute( ''' SELECT ive.item_version_id, ive.definition, ive.repo, ive.repo_iteration, ive.installed, ive.active, ive.is_orphan, ive.is_local, ms.frozen, ms.enabled FROM item_versions_extra AS ive JOIN mapping_statuses AS ms USING (item_id) WHERE ive.item_id = ?; ''', (self.id,) ) rows = cursor.fetchall() if rows == []: raise st.MissingItemError() result = [] for (item_version_id, definition, repo, repo_iteration, installed_status, active_status, is_orphan, is_local, frozen_status, enabled_status) in rows: ref = ConcreteMappingVersionRef(str(item_version_id), self.state) item_info = item_infos.MappingInfo.load( definition, repo, repo_iteration ) display_info = st.MappingVersionDisplayInfo( ref = ref, info = item_info, installed = st.InstalledStatus(installed_status), active = st.ActiveStatus(active_status), is_orphan = is_orphan, is_local = is_local, mapping_enabled = st.EnabledStatus(enabled_status), mapping_frozen = st.FrozenStatus.make(frozen_status) ) result.append(display_info) return sorted(result, key=(lambda di: di.info)) @dc.dataclass(frozen=True) class ConcreteMappingStore(st.MappingStore): state: base.HaketiloStateWithFields def get(self, id: str) -> st.MappingRef: return ConcreteMappingRef(str(int(id)), self.state) def get_display_infos(self) -> t.Sequence[st.MappingDisplayInfo]: with self.state.cursor() as cursor: cursor.execute( ''' WITH available_item_ids AS ( SELECT DISTINCT item_id FROM item_versions ) SELECT i.item_id, i.identifier, ive.item_version_id, ive.definition, ive.repo, ive.repo_iteration, ive.installed, ive.active, ive.is_orphan, ive.is_local, ms.enabled, ms.frozen FROM items AS i JOIN mapping_statuses AS ms USING (item_id) LEFT JOIN item_versions_extra AS ive ON ms.active_version_id = ive.item_version_id WHERE i.item_id IN available_item_ids; ''' ) rows = cursor.fetchall() result = [] for (item_id, identifier, item_version_id, definition, repo, repo_iteration, installed_status, active_status, is_orphan, is_local, enabled_status, frozen_status) in rows: ref = ConcreteMappingRef(str(item_id), self.state) active_version: t.Optional[st.MappingVersionDisplayInfo] = None if item_version_id is not None: active_version_ref = ConcreteMappingVersionRef( id = str(item_version_id), state = self.state ) active_version_info = item_infos.MappingInfo.load( definition, repo, repo_iteration ) active_version = st.MappingVersionDisplayInfo( ref = active_version_ref, info = active_version_info, installed = st.InstalledStatus(installed_status), active = st.ActiveStatus(active_status), is_orphan = is_orphan, is_local = is_local, mapping_enabled = st.EnabledStatus(enabled_status), mapping_frozen = st.FrozenStatus.make(frozen_status) ) display_info = st.MappingDisplayInfo( ref = ref, identifier = identifier, enabled = st.EnabledStatus(enabled_status), frozen = st.FrozenStatus.make(frozen_status), active_version = active_version ) result.append(display_info) return sorted(result, key=(lambda di: di.identifier)) @dc.dataclass(frozen=True, unsafe_hash=True) class ConcreteMappingVersionRef(st.MappingVersionRef): state: base.HaketiloStateWithFields def install(self) -> None: return _install_version(self) def uninstall(self) -> t.Optional['ConcreteMappingVersionRef']: return _uninstall_version(self) def get_all_version_display_infos(self) \ -> t.Sequence[st.MappingVersionDisplayInfo]: with self.state.cursor() as cursor: cursor.execute( ''' SELECT item_id FROM item_versions WHERE item_version_id = ?; ''', (self.id,) ) rows = cursor.fetchall() if rows == []: raise st.MissingItemError() (mapping_id,), = rows mapping_ref = ConcreteMappingRef(str(mapping_id), self.state) return mapping_ref.get_version_display_infos() @dc.dataclass(frozen=True) class ConcreteMappingVersionStore(st.MappingVersionStore): state: base.HaketiloStateWithFields def get(self, id: str) -> st.MappingVersionRef: return ConcreteMappingVersionRef(str(int(id)), self.state) @dc.dataclass(frozen=True, unsafe_hash=True) class ConcreteResourceRef(st.ResourceRef): state: base.HaketiloStateWithFields = dc.field(hash=False, compare=False) def get_version_display_infos(self) \ -> t.Sequence[st.ResourceVersionDisplayInfo]: with self.state.cursor() as cursor: cursor.execute( ''' SELECT ive.item_version_id, ive.definition, ive.repo, ive.repo_iteration, ive.installed, ive.active, ive.is_orphan, ive.is_local FROM item_versions_extra AS ive JOIN items AS i USING (item_id) WHERE i.type = 'R' AND ive.item_id = ?; ''', (self.id,) ) rows = cursor.fetchall() if rows == []: raise st.MissingItemError() result = [] for (item_version_id, definition, repo, repo_iteration, installed_status, active_status, is_orphan, is_local) in rows: ref = ConcreteResourceVersionRef(str(item_version_id), self.state) item_info = item_infos.ResourceInfo.load( definition, repo, repo_iteration ) display_info = st.ResourceVersionDisplayInfo( ref = ref, info = item_info, installed = st.InstalledStatus(installed_status), active = st.ActiveStatus(active_status), is_orphan = is_orphan, is_local = is_local ) result.append(display_info) return sorted(result, key=(lambda di: di.info)) @dc.dataclass(frozen=True) class ConcreteResourceStore(st.ResourceStore): state: base.HaketiloStateWithFields def get(self, id: str) -> st.ResourceRef: return ConcreteResourceRef(str(int(id)), self.state) def get_display_infos(self) -> t.Sequence[st.ResourceDisplayInfo]: with self.state.cursor() as cursor: cursor.execute( "SELECT item_id, identifier FROM items WHERE type = 'R';" ) rows = cursor.fetchall() result = [] for item_id, identifier in rows: ref = ConcreteResourceRef(str(item_id), self.state) result.append(st.ResourceDisplayInfo(ref, identifier)) return sorted(result, key=(lambda di: di.identifier)) @dc.dataclass(frozen=True, unsafe_hash=True) class ConcreteResourceVersionRef(st.ResourceVersionRef): state: base.HaketiloStateWithFields def install(self) -> None: return _install_version(self) def uninstall(self) -> t.Optional['ConcreteResourceVersionRef']: return _uninstall_version(self) def get_all_version_display_infos(self) \ -> t.Sequence[st.ResourceVersionDisplayInfo]: with self.state.cursor() as cursor: cursor.execute( ''' SELECT item_id FROM item_versions WHERE item_version_id = ?; ''', (self.id,) ) rows = cursor.fetchall() if rows == []: raise st.MissingItemError() (resource_id,), = rows resource_ref = ConcreteResourceRef(str(resource_id), self.state) return resource_ref.get_version_display_infos() @dc.dataclass(frozen=True) class ConcreteResourceVersionStore(st.ResourceVersionStore): state: base.HaketiloStateWithFields def get(self, id: str) -> st.ResourceVersionRef: return ConcreteResourceVersionRef(str(int(id)), self.state)