# SPDX-License-Identifier: GPL-3.0-or-later # Proxy web UI package/library management. # # This file is part of Hydrilla&Haketilo. # # Copyright (C) 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use of this # code in a proprietary program, I am not going to enforce this in # court. """ ..... """ import typing as t from urllib.parse import unquote import flask import werkzeug from ... import item_infos from .. import state as st from . import _app bp = flask.Blueprint('items', __package__) @bp.route('/packages') def packages() -> werkzeug.Response: store = _app.get_haketilo_state().mapping_store() html = flask.render_template( 'items/packages.html.jinja', display_infos = store.get_display_infos() ) return flask.make_response(html, 200) @bp.route('/libraries') def libraries() -> werkzeug.Response: store = _app.get_haketilo_state().resource_store() html = flask.render_template( 'items/libraries.html.jinja', display_infos = store.get_display_infos() ) return flask.make_response(html, 200) def item_store(state: st.HaketiloState, item_type: item_infos.ItemType) \ -> t.Union[st.MappingStore, st.ResourceStore]: if item_type == item_infos.ItemType.RESOURCE: return state.resource_store() else: return state.mapping_store() def show_item( item_id: str, item_type: item_infos.ItemType, errors: t.Mapping[str, bool] = {} ) -> werkzeug.Response: try: store = item_store(_app.get_haketilo_state(), item_type) display_info = store.get(str(item_id)).get_display_info() html = flask.render_template( f'items/{item_type.alt_name}_view.html.jinja', display_info = display_info, **errors ) return flask.make_response(html, 200) except st.MissingItemError: flask.abort(404) @bp.route('/libraries/view/<string:item_id>') def show_library(item_id: str) -> werkzeug.Response: return show_item(item_id, item_infos.ItemType.RESOURCE) @bp.route('/packages/view/<string:item_id>') def show_package(item_id: str) -> werkzeug.Response: return show_item(item_id, item_infos.ItemType.MAPPING) def alter_item(item_id: str, item_type: item_infos.ItemType) \ -> werkzeug.Response: form_data = flask.request.form action = form_data['action'] try: store = item_store(_app.get_haketilo_state(), item_type) item_ref = store.get(item_id) if action == 'disable_item': assert isinstance(item_ref, st.MappingRef) item_ref.update_status(st.EnabledStatus.DISABLED) elif action == 'unenable_item': assert isinstance(item_ref, st.MappingRef) item_ref.update_status(st.EnabledStatus.NO_MARK) elif action in ('enable_item', 'unfreeze_item'): assert isinstance(item_ref, st.MappingRef) item_ref.update_status( enabled = st.EnabledStatus.ENABLED, frozen = st.FrozenStatus.NOT_FROZEN, ) elif action == 'freeze_to_repo': assert isinstance(item_ref, st.MappingRef) item_ref.update_status( enabled = st.EnabledStatus.ENABLED, frozen = st.FrozenStatus.REPOSITORY, ) elif action == 'freeze_to_version': assert isinstance(item_ref, st.MappingRef) item_ref.update_status( enabled = st.EnabledStatus.ENABLED, frozen = st.FrozenStatus.EXACT_VERSION, ) else: raise ValueError() except st.RepoCommunicationError: return show_item(item_id, item_type, {'repo_communication_error': True}) except st.FileInstallationError: return show_item(item_id, item_type, {'file_installation_error': True}) except st.ImpossibleSituation: errors = {'impossible_situation_error': True} return show_item(item_id, item_type, errors) except st.MissingItemError: flask.abort(404) return flask.redirect( flask.url_for(f'.show_{item_type.alt_name}', item_id=item_id) ) @bp.route('/libraries/view/<string:item_id>', methods=['POST']) def alter_library(item_id: str) -> werkzeug.Response: return alter_item(item_id, item_infos.ItemType.RESOURCE) @bp.route('/packages/view/<string:item_id>', methods=['POST']) def alter_package(item_id: str) -> werkzeug.Response: return alter_item(item_id, item_infos.ItemType.MAPPING) ItemVersionDisplayInfo = t.Union[ st.MappingVersionDisplayInfo, st.ResourceVersionDisplayInfo ] def item_version_store( state: st.HaketiloState, item_type: item_infos.ItemType ) -> t.Union[st.MappingVersionStore, st.ResourceVersionStore]: if item_type == item_infos.ItemType.RESOURCE: return state.resource_version_store() else: return state.mapping_version_store() def show_item_version( item_version_id: str, item_type: item_infos.ItemType, errors: t.Mapping[str, bool] = {} ) -> werkzeug.Response: state = _app.get_haketilo_state() try: store = item_version_store(state, item_type) version_ref = store.get(item_version_id) display_info = version_ref.get_item_display_info() this_info: t.Optional[ItemVersionDisplayInfo] = None for info in display_info.all_versions: if info.ref == version_ref: this_info = info assert this_info is not None html = flask.render_template( f'items/{item_type.alt_name}_viewversion.html.jinja', display_info = display_info, version_display_info = this_info, **errors ) return flask.make_response(html, 200) except st.MissingItemError: flask.abort(404) @bp.route('/libraries/viewversion/<string:item_version_id>') def show_library_version(item_version_id: str) -> werkzeug.Response: return show_item_version(item_version_id, item_infos.ItemType.RESOURCE) @bp.route('/packages/viewversion/<string:item_version_id>') def show_package_version(item_version_id: str) -> werkzeug.Response: return show_item_version(item_version_id, item_infos.ItemType.MAPPING) def alter_item_version(item_version_id: str, item_type: item_infos.ItemType) \ -> werkzeug.Response: form_data = flask.request.form action = form_data['action'] try: store = item_version_store(_app.get_haketilo_state(), item_type) item_version_ref = store.get(item_version_id) if action == 'disable_item': assert isinstance(item_version_ref, st.MappingVersionRef) item_version_ref.update_mapping_status(st.EnabledStatus.DISABLED) elif action == 'unenable_item': assert isinstance(item_version_ref, st.MappingVersionRef) item_version_ref.update_mapping_status(st.EnabledStatus.NO_MARK) elif action in ('enable_item_version', 'freeze_to_version'): assert isinstance(item_version_ref, st.MappingVersionRef) item_version_ref.update_mapping_status( enabled = st.EnabledStatus.ENABLED, frozen = st.FrozenStatus.EXACT_VERSION, ) elif action == 'unfreeze_item': assert isinstance(item_version_ref, st.MappingVersionRef) item_version_ref.update_mapping_status( enabled = st.EnabledStatus.ENABLED, frozen = st.FrozenStatus.NOT_FROZEN, ) elif action == 'freeze_to_repo': assert isinstance(item_version_ref, st.MappingVersionRef) item_version_ref.update_mapping_status( enabled = st.EnabledStatus.ENABLED, frozen = st.FrozenStatus.REPOSITORY, ) elif action == 'install_item_version': item_version_ref.install() elif action == 'uninstall_item_version': item_version_ref_after = item_version_ref.uninstall() if item_version_ref_after is None: url = flask.url_for(f'.{item_type.alt_name_plural}') return flask.redirect(url) else: return show_item_version(item_version_id, item_type) else: raise ValueError() except st.RepoCommunicationError: return show_item_version( item_version_id = item_version_id, item_type = item_type, errors = {'repo_communication_error': True} ) except st.FileInstallationError: return show_item_version( item_version_id = item_version_id, item_type = item_type, errors = {'file_installation_error': True} ) except st.ImpossibleSituation: return show_item_version( item_version_id = item_version_id, item_type = item_type, errors = {'impossible_situation_error': True} ) except st.MissingItemError: flask.abort(404) return flask.redirect( flask.url_for( f'.show_{item_type.alt_name}_version', item_version_id = item_version_id ) ) @bp.route('/libraries/viewversion/<string:item_version_id>', methods=['POST']) def alter_library_version(item_version_id: str) -> werkzeug.Response: return alter_item_version(item_version_id, item_infos.ItemType.RESOURCE) @bp.route('/packages/viewversion/<string:item_version_id>', methods=['POST']) def alter_package_version(item_version_id: str) -> werkzeug.Response: return alter_item_version(item_version_id, item_infos.ItemType.MAPPING) def show_file( item_version_id: str, item_type: item_infos.ItemType, file_type: str, name: str, ) -> werkzeug.Response: if file_type not in ('license', 'web_resource'): flask.abort(404) try: store = item_version_store(_app.get_haketilo_state(), item_type) item_version_ref = store.get(item_version_id) try: if file_type == 'license': file_data = item_version_ref.get_license_file(name) else: assert isinstance(item_version_ref, st.ResourceVersionRef) file_data = item_version_ref.get_resource_file(name) return werkzeug.Response( file_data.contents, mimetype = file_data.mime_type ) except st.MissingItemError: if file_type == 'license': url = item_version_ref.get_upstream_license_file_url(name) else: assert isinstance(item_version_ref, st.ResourceVersionRef) url = item_version_ref.get_upstream_resource_file_url(name) return flask.redirect(url) except st.MissingItemError: flask.abort(404) @bp.route('/packages/viewversion/<string:item_version_id>/<string:file_type>/<path:name>') def show_mapping_file(item_version_id: str, file_type: str, name: str) \ -> werkzeug.Response: item_type = item_infos.ItemType.MAPPING return show_file(item_version_id, item_type, file_type, name) @bp.route('/libraries/viewversion/<string:item_version_id>/<string:file_type>/<path:name>') def show_resource_file(item_version_id: str, file_type: str, name: str) \ -> werkzeug.Response: item_type = item_infos.ItemType.RESOURCE return show_file(item_version_id, item_type, file_type, name) @bp.route('/libraries/viewdep/<string:item_version_id>/<string:dep_identifier>') def show_library_dep(item_version_id: str, dep_identifier: str) \ -> werkzeug.Response: state = _app.get_haketilo_state() try: store = state.resource_version_store() dep_id = store.get(item_version_id).get_dependency(dep_identifier).id url = flask.url_for('.show_library_version', item_version_id=dep_id) except st.MissingItemError: try: versionless_store = state.resource_store() item_ref = versionless_store.get_by_identifier(dep_identifier) url = flask.url_for('.show_library', item_id=item_ref.id) except st.MissingItemError: flask.abort(404) return flask.redirect(url) @bp.route('/<string:item_type>/viewrequired/<string:item_version_id>/<string:required_identifier>') def show_required_mapping( item_type: str, item_version_id: str, required_identifier: str ) -> werkzeug.Response: state = _app.get_haketilo_state() if item_type not in ('package', 'library'): flask.abort(404) found = False if item_type == 'package': try: ref = state.mapping_version_store().get(item_version_id) mapping_ver_id = ref.get_required_mapping(required_identifier).id url = flask.url_for( '.show_package_version', item_version_id = mapping_ver_id ) found = True except st.MissingItemError: pass if not found: try: versionless_store = state.mapping_store() mapping_ref = versionless_store\ .get_by_identifier(required_identifier) url = flask.url_for('.show_package', item_id=mapping_ref.id) except st.MissingItemError: flask.abort(404) return flask.redirect(url) @bp.route('/package/viewlibrary/<string:item_version_id>/<string:pattern>/<string:lib_identifier>') def show_package_library(item_version_id: str, pattern: str, lib_identifier: str) \ -> werkzeug.Response: state = _app.get_haketilo_state() try: ref = state.mapping_version_store().get(item_version_id) try: resource_ver_ref = \ ref.get_payload_resource(unquote(pattern), lib_identifier) url = flask.url_for( '.show_library_version', item_version_id = resource_ver_ref.id ) except st.MissingItemError: resource_ref = state.resource_store().get_by_identifier( lib_identifier ) url = flask.url_for('.show_library', item_id=resource_ref.id) except st.MissingItemError: flask.abort(404) return flask.redirect(url) @bp.route('/package/viewbypayload/<string:payload_id>/<string:package_identifier>') def show_payload_package(payload_id: str, package_identifier: str) \ -> werkzeug.Response: state = _app.get_haketilo_state() try: ref = state.payload_store().get(payload_id) try: mapping_ver_ref = ref.get_display_info().mapping_info.ref url = flask.url_for( '.show_package_version', item_version_id = mapping_ver_ref.id ) except st.MissingItemError: mapping_ref = state.mapping_store().get_by_identifier( package_identifier ) url = flask.url_for('.show_package', item_id=mapping_ref.id) except st.MissingItemError: flask.abort(404) return flask.redirect(url)