# SPDX-License-Identifier: GPL-3.0-or-later # Proxy web UI packages loading. # # This file is part of Hydrilla&Haketilo. # # Copyright (C) 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use of this # code in a proprietary program, I am not going to enforce this in # court. """ ..... """ # Enable using with Python 3.7. from __future__ import annotations import tempfile import zipfile import typing as t from pathlib import Path import flask import werkzeug from ...exceptions import HaketiloException from ...translations import smart_gettext as _ from ... import item_infos from .. import state as st from . import _app bp = flask.Blueprint('items', __package__) @bp.route('/load_from_disk', methods=['GET']) def load_from_disk(errors: t.Mapping[str, bool] = {}) -> werkzeug.Response: html = flask.render_template('items/load_from_disk.html.jinja', **errors) return flask.make_response(html, 200) @bp.route('/load_from_disk', methods=['POST']) def load_from_disk_post() -> werkzeug.Response: zip_file_storage = flask.request.files.get('items_zipfile') if zip_file_storage is None: return load_from_disk() with tempfile.TemporaryDirectory() as tmpdir_str: tmpdir = Path(tmpdir_str) tmpdir_child = tmpdir / 'childdir' tmpdir_child.mkdir() try: with zipfile.ZipFile(zip_file_storage) as zip_file: zip_file.extractall(tmpdir_child) except: return load_from_disk({'uploaded_file_not_zip': True}) extracted_top_level_files = tuple(tmpdir_child.iterdir()) if extracted_top_level_files == (): return load_from_disk({'invalid_uploaded_malcontent': True}) if len(extracted_top_level_files) == 1 and \ extracted_top_level_files[0].is_dir(): malcontent_dir_path = extracted_top_level_files[0] else: malcontent_dir_path = tmpdir_child try: _app.get_haketilo_state().import_items(malcontent_dir_path) except: return load_from_disk({'invalid_uploaded_malcontent': True}) return flask.redirect(flask.url_for('.packages')) @bp.route('/packages') def packages() -> werkzeug.Response: store = _app.get_haketilo_state().mapping_store() html = flask.render_template( 'items/packages.html.jinja', display_infos = store.get_display_infos() ) return flask.make_response(html, 200) @bp.route('/libraries') def libraries() -> werkzeug.Response: store = _app.get_haketilo_state().resource_store() html = flask.render_template( 'items/libraries.html.jinja', display_infos = store.get_display_infos() ) return flask.make_response(html, 200) def item_store(state: st.HaketiloState, item_type: item_infos.ItemType) \ -> t.Union[st.MappingStore, st.ResourceStore]: if item_type == item_infos.ItemType.RESOURCE: return state.resource_store() else: return state.mapping_store() def show_item(item_id: str, item_type: item_infos.ItemType) \ -> werkzeug.Response: try: store = item_store(_app.get_haketilo_state(), item_type) display_info = store.get(str(item_id)).get_display_info() html = flask.render_template( f'items/{item_type.alt_name}_view.html.jinja', display_info = display_info ) return flask.make_response(html, 200) except st.MissingItemError: flask.abort(404) @bp.route('/libraries/view/') def show_library(item_id: str) -> werkzeug.Response: return show_item(item_id, item_infos.ItemType.RESOURCE) @bp.route('/packages/view/') def show_package(item_id: str) -> werkzeug.Response: return show_item(item_id, item_infos.ItemType.MAPPING) ItemVersionDisplayInfo = t.Union[ st.MappingVersionDisplayInfo, st.ResourceVersionDisplayInfo ] def item_version_store( state: st.HaketiloState, item_type: item_infos.ItemType ) -> t.Union[st.MappingVersionStore, st.ResourceVersionStore]: if item_type == item_infos.ItemType.RESOURCE: return state.resource_version_store() else: return state.mapping_version_store() def show_item_version( item_version_id: str, item_type: item_infos.ItemType, errors: t.Mapping[str, bool] = {} ) -> werkzeug.Response: try: store = item_version_store(_app.get_haketilo_state(), item_type) version_ref = store.get(item_version_id) display_info = version_ref.get_item_display_info() this_info: t.Optional[ItemVersionDisplayInfo] = None for info in display_info.all_versions: if info.ref == version_ref: this_info = info assert this_info is not None html = flask.render_template( f'items/{item_type.alt_name}_viewversion.html.jinja', display_info = display_info, version_display_info = this_info, **errors ) return flask.make_response(html, 200) except st.MissingItemError: flask.abort(404) @bp.route('/libraries/viewversion/') def show_library_version(item_version_id: str) -> werkzeug.Response: return show_item_version(item_version_id, item_infos.ItemType.RESOURCE) @bp.route('/packages/viewversion/') def show_package_version(item_version_id: str) -> werkzeug.Response: return show_item_version(item_version_id, item_infos.ItemType.MAPPING) def alter_item_version(item_version_id: str, item_type: item_infos.ItemType) \ -> werkzeug.Response: form_data = flask.request.form action = form_data['action'] try: store = item_version_store(_app.get_haketilo_state(), item_type) item_version_ref = store.get(item_version_id) if action == 'disable_item': assert isinstance(item_version_ref, st.MappingVersionRef) item_version_ref.update_mapping_status(st.EnabledStatus.DISABLED) elif action == 'unenable_item': assert isinstance(item_version_ref, st.MappingVersionRef) item_version_ref.update_mapping_status(st.EnabledStatus.NO_MARK) elif action == 'enable_item_version': assert isinstance(item_version_ref, st.MappingVersionRef) item_version_ref.update_mapping_status( enabled = st.EnabledStatus.ENABLED, frozen = st.FrozenStatus.EXACT_VERSION, ) elif action == 'unfreeze_item': assert isinstance(item_version_ref, st.MappingVersionRef) item_version_ref.update_mapping_status( enabled = st.EnabledStatus.ENABLED, frozen = st.FrozenStatus.NOT_FROZEN, ) elif action == 'freeze_to_repo': assert isinstance(item_version_ref, st.MappingVersionRef) item_version_ref.update_mapping_status( enabled = st.EnabledStatus.ENABLED, frozen = st.FrozenStatus.REPOSITORY, ) elif action == 'freeze_to_version': assert isinstance(item_version_ref, st.MappingVersionRef) item_version_ref.update_mapping_status( enabled = st.EnabledStatus.ENABLED, frozen = st.FrozenStatus.EXACT_VERSION, ) elif action == 'install_item_version': item_version_ref.install() elif action == 'uninstall_item_version': item_version_ref_after = item_version_ref.uninstall() if item_version_ref_after is None: url = flask.url_for(f'.{item_type.alt_name_plural}') return flask.redirect(url) else: return show_item_version(item_version_id, item_type) else: raise ValueError() except st.RepoCommunicationError: return show_item_version( item_version_id = item_version_id, item_type = item_type, errors = {'repo_communication_error': True} ) except st.FileInstallationError: return show_item_version( item_version_id = item_version_id, item_type = item_type, errors = {'file_installation_error': True} ) except st.ImpossibleSituation: return show_item_version( item_version_id = item_version_id, item_type = item_type, errors = {'uninstall_disallowed': True} ) except st.MissingItemError: flask.abort(404) return flask.redirect( flask.url_for( f'.show_{item_type.alt_name}_version', item_version_id = item_version_id ) ) @bp.route('/libraries/viewversion/', methods=['POST']) def alter_library_version(item_version_id: str) -> werkzeug.Response: return alter_item_version(item_version_id, item_infos.ItemType.RESOURCE) @bp.route('/packages/viewversion/', methods=['POST']) def alter_package_version(item_version_id: str) -> werkzeug.Response: return alter_item_version(item_version_id, item_infos.ItemType.MAPPING)