From 7fc6312d6df526b8eb49288aecf88d04668e7c45 Mon Sep 17 00:00:00 2001 From: Wojtek Kosior Date: Mon, 29 Aug 2022 13:05:35 +0200 Subject: [proxy] make it possible to also view and install/uninstall libraries (resources) through the web UI --- src/hydrilla/proxy/state_impl/items.py | 423 +++++++++++++++++++++++++++++++++ 1 file changed, 423 insertions(+) create mode 100644 src/hydrilla/proxy/state_impl/items.py (limited to 'src/hydrilla/proxy/state_impl/items.py') diff --git a/src/hydrilla/proxy/state_impl/items.py b/src/hydrilla/proxy/state_impl/items.py new file mode 100644 index 0000000..b538dc5 --- /dev/null +++ b/src/hydrilla/proxy/state_impl/items.py @@ -0,0 +1,423 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Haketilo proxy data and configuration (ResourceStore and MappingStore +# implementations). +# +# This file is part of Hydrilla&Haketilo. +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +This module provides an interface to interact with mappings, and resources +inside Haketilo. +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +import sqlite3 +import typing as t +import dataclasses as dc + +from ... import item_infos +from .. import state as st +from . import base + +def _set_installed_status(cursor: sqlite3.Cursor, id: str, new_status: str) \ + -> None: + cursor.execute( + 'UPDATE item_versions SET installed = ? WHERE item_version_id = ?;', + (new_status, id) + ) + +def _get_statuses(cursor: sqlite3.Cursor, id: str) -> t.Tuple[str, str]: + cursor.execute( + ''' + SELECT + installed, active + FROM + item_versions + WHERE + item_version_id = ?; + ''', + (id,) + ) + + rows = cursor.fetchall() + + if rows == []: + raise st.MissingItemError() + + (installed_status, active_status), = rows + + return installed_status, active_status + +VersionRefVar = t.TypeVar( + 'VersionRefVar', + 'ConcreteResourceVersionRef', + 'ConcreteMappingVersionRef' +) + +def _install_version(ref: VersionRefVar) -> None: + with ref.state.cursor(transaction=True) as cursor: + installed_status, _ = _get_statuses(cursor, ref.id) + + if installed_status == 'I': + return + + _set_installed_status(cursor, ref.id, 'I') + + ref.state.pull_missing_files() + +def _uninstall_version(ref: VersionRefVar) -> t.Optional[VersionRefVar]: + with ref.state.cursor(transaction=True) as cursor: + installed_status, active_status = _get_statuses(cursor, ref.id) + + if installed_status == 'N': + return ref + + _set_installed_status(cursor, ref.id, 'N') + + ref.state.prune_orphans() + + if active_status == 'R': + ref.state.recompute_dependencies() + + cursor.execute( + 'SELECT COUNT(*) FROM item_versions WHERE item_version_id = ?;', + (ref.id,) + ) + + (version_still_present,), = cursor.fetchall() + return ref if version_still_present else None + + +@dc.dataclass(frozen=True, unsafe_hash=True) +class ConcreteMappingRef(st.MappingRef): + state: base.HaketiloStateWithFields = dc.field(hash=False, compare=False) + + def get_version_display_infos(self) \ + -> t.Sequence[st.MappingVersionDisplayInfo]: + with self.state.cursor() as cursor: + cursor.execute( + ''' + SELECT + ive.item_version_id, + ive.definition, + ive.repo, + ive.repo_iteration, + ive.installed, + ive.active, + ive.is_orphan, + ive.is_local, + ms.enabled + FROM + item_versions_extra AS ive + JOIN mapping_statuses AS ms USING (item_id) + WHERE + ive.item_id = ?; + ''', + (self.id,) + ) + + rows = cursor.fetchall() + + if rows == []: + raise st.MissingItemError() + + result = [] + + for (item_version_id, definition, repo, repo_iteration, + installed_status, active_status, is_orphan, is_local, + enabled_status) in rows: + ref = ConcreteMappingVersionRef(str(item_version_id), self.state) + + item_info = item_infos.MappingInfo.load( + definition, + repo, + repo_iteration + ) + + display_info = st.MappingVersionDisplayInfo( + ref = ref, + info = item_info, + installed = st.InstalledStatus(installed_status), + active = st.ActiveStatus(active_status), + is_orphan = is_orphan, + is_local = is_local, + mapping_enabled = st.EnabledStatus(enabled_status) + ) + result.append(display_info) + + return sorted(result, key=(lambda di: di.info)) + + +@dc.dataclass(frozen=True) +class ConcreteMappingStore(st.MappingStore): + state: base.HaketiloStateWithFields + + def get(self, id: str) -> st.MappingRef: + return ConcreteMappingRef(str(int(id)), self.state) + + def get_display_infos(self) -> t.Sequence[st.MappingDisplayInfo]: + with self.state.cursor() as cursor: + cursor.execute( + ''' + WITH available_item_ids AS ( + SELECT DISTINCT item_id FROM item_versions + ) + SELECT + i.item_id, + i.identifier, + ive.item_version_id, + ive.definition, + ive.repo, + ive.repo_iteration, + ive.installed, + ive.active, + ive.is_orphan, + ive.is_local, + ms.enabled + FROM + items AS i + JOIN mapping_statuses AS ms + USING (item_id) + LEFT JOIN item_versions_extra AS ive + ON ms.active_version_id = ive.item_version_id + WHERE + i.item_id IN available_item_ids; + ''' + ) + + rows = cursor.fetchall() + + result = [] + + for (item_id, identifier, item_version_id, definition, repo, + repo_iteration, installed_status, active_status, is_orphan, + is_local, enabled_status) in rows: + ref = ConcreteMappingRef(str(item_id), self.state) + + active_version: t.Optional[st.MappingVersionDisplayInfo] = None + + if item_version_id is not None: + active_version_ref = ConcreteMappingVersionRef( + id = str(item_version_id), + state = self.state + ) + + active_version_info = item_infos.MappingInfo.load( + definition, + repo, + repo_iteration + ) + + active_version = st.MappingVersionDisplayInfo( + ref = active_version_ref, + info = active_version_info, + installed = st.InstalledStatus(installed_status), + active = st.ActiveStatus(active_status), + is_orphan = is_orphan, + is_local = is_local, + mapping_enabled = st.EnabledStatus(enabled_status) + ) + + + display_info = st.MappingDisplayInfo( + ref = ref, + identifier = identifier, + enabled = st.EnabledStatus(enabled_status), + active_version = active_version + ) + + result.append(display_info) + + return sorted(result, key=(lambda di: di.identifier)) + + +@dc.dataclass(frozen=True, unsafe_hash=True) +class ConcreteMappingVersionRef(st.MappingVersionRef): + state: base.HaketiloStateWithFields + + def install(self) -> None: + return _install_version(self) + + def uninstall(self) -> t.Optional['ConcreteMappingVersionRef']: + return _uninstall_version(self) + + def get_all_version_display_infos(self) \ + -> t.Sequence[st.MappingVersionDisplayInfo]: + with self.state.cursor() as cursor: + cursor.execute( + ''' + SELECT + item_id + FROM + item_versions + WHERE + item_version_id = ?; + ''', + (self.id,) + ) + + rows = cursor.fetchall() + if rows == []: + raise st.MissingItemError() + + (mapping_id,), = rows + + mapping_ref = ConcreteMappingRef(str(mapping_id), self.state) + + return mapping_ref.get_version_display_infos() + + +@dc.dataclass(frozen=True) +class ConcreteMappingVersionStore(st.MappingVersionStore): + state: base.HaketiloStateWithFields + + def get(self, id: str) -> st.MappingVersionRef: + return ConcreteMappingVersionRef(str(int(id)), self.state) + + +@dc.dataclass(frozen=True, unsafe_hash=True) +class ConcreteResourceRef(st.ResourceRef): + state: base.HaketiloStateWithFields = dc.field(hash=False, compare=False) + + def get_version_display_infos(self) \ + -> t.Sequence[st.ResourceVersionDisplayInfo]: + with self.state.cursor() as cursor: + cursor.execute( + ''' + SELECT + ive.item_version_id, + ive.definition, + ive.repo, + ive.repo_iteration, + ive.installed, + ive.active, + ive.is_orphan, + ive.is_local + FROM + item_versions_extra AS ive + JOIN items AS i USING (item_id) + WHERE + i.type = 'R' AND ive.item_id = ?; + ''', + (self.id,) + ) + + rows = cursor.fetchall() + + if rows == []: + raise st.MissingItemError() + + result = [] + + for (item_version_id, definition, repo, repo_iteration, + installed_status, active_status, is_orphan, is_local) in rows: + ref = ConcreteResourceVersionRef(str(item_version_id), self.state) + + item_info = item_infos.ResourceInfo.load( + definition, + repo, + repo_iteration + ) + + display_info = st.ResourceVersionDisplayInfo( + ref = ref, + info = item_info, + installed = st.InstalledStatus(installed_status), + active = st.ActiveStatus(active_status), + is_orphan = is_orphan, + is_local = is_local + ) + result.append(display_info) + + return sorted(result, key=(lambda di: di.info)) + + +@dc.dataclass(frozen=True) +class ConcreteResourceStore(st.ResourceStore): + state: base.HaketiloStateWithFields + + def get(self, id: str) -> st.ResourceRef: + return ConcreteResourceRef(str(int(id)), self.state) + + def get_display_infos(self) -> t.Sequence[st.ResourceDisplayInfo]: + with self.state.cursor() as cursor: + cursor.execute( + "SELECT item_id, identifier FROM items WHERE type = 'R';" + ) + + rows = cursor.fetchall() + + result = [] + + for item_id, identifier in rows: + ref = ConcreteResourceRef(str(item_id), self.state) + + result.append(st.ResourceDisplayInfo(ref, identifier)) + + return sorted(result, key=(lambda di: di.identifier)) + + +@dc.dataclass(frozen=True, unsafe_hash=True) +class ConcreteResourceVersionRef(st.ResourceVersionRef): + state: base.HaketiloStateWithFields + + def install(self) -> None: + return _install_version(self) + + def uninstall(self) -> t.Optional['ConcreteResourceVersionRef']: + return _uninstall_version(self) + + def get_all_version_display_infos(self) \ + -> t.Sequence[st.ResourceVersionDisplayInfo]: + with self.state.cursor() as cursor: + cursor.execute( + ''' + SELECT + item_id + FROM + item_versions + WHERE + item_version_id = ?; + ''', + (self.id,) + ) + + rows = cursor.fetchall() + if rows == []: + raise st.MissingItemError() + + (resource_id,), = rows + + resource_ref = ConcreteResourceRef(str(resource_id), self.state) + + return resource_ref.get_version_display_infos() + + +@dc.dataclass(frozen=True) +class ConcreteResourceVersionStore(st.ResourceVersionStore): + state: base.HaketiloStateWithFields + + def get(self, id: str) -> st.ResourceVersionRef: + return ConcreteResourceVersionRef(str(int(id)), self.state) -- cgit v1.2.3