diff options
Diffstat (limited to 'src/hydrilla/proxy/web_ui/items.py')
-rw-r--r-- | src/hydrilla/proxy/web_ui/items.py | 440 |
1 files changed, 440 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/web_ui/items.py b/src/hydrilla/proxy/web_ui/items.py new file mode 100644 index 0000000..d0f0f2e --- /dev/null +++ b/src/hydrilla/proxy/web_ui/items.py @@ -0,0 +1,440 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Proxy web UI package/library management. +# +# This file is part of Hydrilla&Haketilo. +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use of this +# code in a proprietary program, I am not going to enforce this in +# court. + +""" +..... +""" + +import typing as t + +from urllib.parse import unquote + +import flask +import werkzeug + +from ... import item_infos +from .. import state as st +from . import _app + + +bp = flask.Blueprint('items', __package__) + +@bp.route('/packages') +def packages() -> werkzeug.Response: + store = _app.get_haketilo_state().mapping_store() + + html = flask.render_template( + 'items/packages.html.jinja', + display_infos = store.get_display_infos() + ) + return flask.make_response(html, 200) + +@bp.route('/libraries') +def libraries() -> werkzeug.Response: + store = _app.get_haketilo_state().resource_store() + + html = flask.render_template( + 'items/libraries.html.jinja', + display_infos = store.get_display_infos() + ) + return flask.make_response(html, 200) + +def item_store(state: st.HaketiloState, item_type: item_infos.ItemType) \ + -> t.Union[st.MappingStore, st.ResourceStore]: + if item_type == item_infos.ItemType.RESOURCE: + return state.resource_store() + else: + return state.mapping_store() + +def show_item( + item_id: str, + item_type: item_infos.ItemType, + errors: t.Mapping[str, bool] = {} +) -> werkzeug.Response: + try: + store = item_store(_app.get_haketilo_state(), item_type) + display_info = store.get(str(item_id)).get_display_info() + + html = flask.render_template( + f'items/{item_type.alt_name}_view.html.jinja', + display_info = display_info, + **errors + ) + return flask.make_response(html, 200) + except st.MissingItemError: + flask.abort(404) + + +@bp.route('/libraries/view/<string:item_id>') +def show_library(item_id: str) -> werkzeug.Response: + return show_item(item_id, item_infos.ItemType.RESOURCE) + +@bp.route('/packages/view/<string:item_id>') +def show_package(item_id: str) -> werkzeug.Response: + return show_item(item_id, item_infos.ItemType.MAPPING) + +def alter_item(item_id: str, item_type: item_infos.ItemType) \ + -> werkzeug.Response: + form_data = flask.request.form + action = form_data['action'] + + try: + store = item_store(_app.get_haketilo_state(), item_type) + item_ref = store.get(item_id) + + if action == 'disable_item': + assert isinstance(item_ref, st.MappingRef) + item_ref.update_status(st.EnabledStatus.DISABLED) + elif action == 'unenable_item': + assert isinstance(item_ref, st.MappingRef) + item_ref.update_status(st.EnabledStatus.NO_MARK) + elif action in ('enable_item', 'unfreeze_item'): + assert isinstance(item_ref, st.MappingRef) + item_ref.update_status( + enabled = st.EnabledStatus.ENABLED, + frozen = st.FrozenStatus.NOT_FROZEN, + ) + elif action == 'freeze_to_repo': + assert isinstance(item_ref, st.MappingRef) + item_ref.update_status( + enabled = st.EnabledStatus.ENABLED, + frozen = st.FrozenStatus.REPOSITORY, + ) + elif action == 'freeze_to_version': + assert isinstance(item_ref, st.MappingRef) + item_ref.update_status( + enabled = st.EnabledStatus.ENABLED, + frozen = st.FrozenStatus.EXACT_VERSION, + ) + else: + raise ValueError() + except st.RepoCommunicationError: + return show_item(item_id, item_type, {'repo_communication_error': True}) + except st.FileInstallationError: + return show_item(item_id, item_type, {'file_installation_error': True}) + except st.ImpossibleSituation: + errors = {'impossible_situation_error': True} + return show_item(item_id, item_type, errors) + except st.MissingItemError: + flask.abort(404) + + return flask.redirect( + flask.url_for(f'.show_{item_type.alt_name}', item_id=item_id) + ) + +@bp.route('/libraries/view/<string:item_id>', methods=['POST']) +def alter_library(item_id: str) -> werkzeug.Response: + return alter_item(item_id, item_infos.ItemType.RESOURCE) + +@bp.route('/packages/view/<string:item_id>', methods=['POST']) +def alter_package(item_id: str) -> werkzeug.Response: + return alter_item(item_id, item_infos.ItemType.MAPPING) + + +ItemVersionDisplayInfo = t.Union[ + st.MappingVersionDisplayInfo, + st.ResourceVersionDisplayInfo +] + +def item_version_store( + state: st.HaketiloState, + item_type: item_infos.ItemType +) -> t.Union[st.MappingVersionStore, st.ResourceVersionStore]: + if item_type == item_infos.ItemType.RESOURCE: + return state.resource_version_store() + else: + return state.mapping_version_store() + +def show_item_version( + item_version_id: str, + item_type: item_infos.ItemType, + errors: t.Mapping[str, bool] = {} +) -> werkzeug.Response: + state = _app.get_haketilo_state() + + try: + store = item_version_store(state, item_type) + version_ref = store.get(item_version_id) + display_info = version_ref.get_item_display_info() + + this_info: t.Optional[ItemVersionDisplayInfo] = None + + for info in display_info.all_versions: + if info.ref == version_ref: + this_info = info + + assert this_info is not None + + html = flask.render_template( + f'items/{item_type.alt_name}_viewversion.html.jinja', + display_info = display_info, + version_display_info = this_info, + **errors + ) + return flask.make_response(html, 200) + except st.MissingItemError: + flask.abort(404) + +@bp.route('/libraries/viewversion/<string:item_version_id>') +def show_library_version(item_version_id: str) -> werkzeug.Response: + return show_item_version(item_version_id, item_infos.ItemType.RESOURCE) + +@bp.route('/packages/viewversion/<string:item_version_id>') +def show_package_version(item_version_id: str) -> werkzeug.Response: + return show_item_version(item_version_id, item_infos.ItemType.MAPPING) + +def alter_item_version(item_version_id: str, item_type: item_infos.ItemType) \ + -> werkzeug.Response: + form_data = flask.request.form + action = form_data['action'] + + try: + store = item_version_store(_app.get_haketilo_state(), item_type) + item_version_ref = store.get(item_version_id) + + if action == 'disable_item': + assert isinstance(item_version_ref, st.MappingVersionRef) + item_version_ref.update_mapping_status(st.EnabledStatus.DISABLED) + elif action == 'unenable_item': + assert isinstance(item_version_ref, st.MappingVersionRef) + item_version_ref.update_mapping_status(st.EnabledStatus.NO_MARK) + elif action in ('enable_item_version', 'freeze_to_version'): + assert isinstance(item_version_ref, st.MappingVersionRef) + item_version_ref.update_mapping_status( + enabled = st.EnabledStatus.ENABLED, + frozen = st.FrozenStatus.EXACT_VERSION, + ) + elif action == 'unfreeze_item': + assert isinstance(item_version_ref, st.MappingVersionRef) + item_version_ref.update_mapping_status( + enabled = st.EnabledStatus.ENABLED, + frozen = st.FrozenStatus.NOT_FROZEN, + ) + elif action == 'freeze_to_repo': + assert isinstance(item_version_ref, st.MappingVersionRef) + item_version_ref.update_mapping_status( + enabled = st.EnabledStatus.ENABLED, + frozen = st.FrozenStatus.REPOSITORY, + ) + elif action == 'install_item_version': + item_version_ref.install() + elif action == 'uninstall_item_version': + item_version_ref_after = item_version_ref.uninstall() + if item_version_ref_after is None: + url = flask.url_for(f'.{item_type.alt_name_plural}') + return flask.redirect(url) + else: + return show_item_version(item_version_id, item_type) + else: + raise ValueError() + except st.RepoCommunicationError: + return show_item_version( + item_version_id = item_version_id, + item_type = item_type, + errors = {'repo_communication_error': True} + ) + except st.FileInstallationError: + return show_item_version( + item_version_id = item_version_id, + item_type = item_type, + errors = {'file_installation_error': True} + ) + except st.ImpossibleSituation: + return show_item_version( + item_version_id = item_version_id, + item_type = item_type, + errors = {'impossible_situation_error': True} + ) + except st.MissingItemError: + flask.abort(404) + + return flask.redirect( + flask.url_for( + f'.show_{item_type.alt_name}_version', + item_version_id = item_version_id + ) + ) + +@bp.route('/libraries/viewversion/<string:item_version_id>', methods=['POST']) +def alter_library_version(item_version_id: str) -> werkzeug.Response: + return alter_item_version(item_version_id, item_infos.ItemType.RESOURCE) + +@bp.route('/packages/viewversion/<string:item_version_id>', methods=['POST']) +def alter_package_version(item_version_id: str) -> werkzeug.Response: + return alter_item_version(item_version_id, item_infos.ItemType.MAPPING) + +def show_file( + item_version_id: str, + item_type: item_infos.ItemType, + file_type: str, + name: str, +) -> werkzeug.Response: + if file_type not in ('license', 'web_resource'): + flask.abort(404) + + try: + store = item_version_store(_app.get_haketilo_state(), item_type) + item_version_ref = store.get(item_version_id) + + try: + if file_type == 'license': + file_data = item_version_ref.get_license_file(name) + else: + assert isinstance(item_version_ref, st.ResourceVersionRef) + file_data = item_version_ref.get_resource_file(name) + + return werkzeug.Response( + file_data.contents, + mimetype = file_data.mime_type + ) + except st.MissingItemError: + if file_type == 'license': + url = item_version_ref.get_upstream_license_file_url(name) + else: + assert isinstance(item_version_ref, st.ResourceVersionRef) + url = item_version_ref.get_upstream_resource_file_url(name) + + return flask.redirect(url) + + except st.MissingItemError: + flask.abort(404) + +@bp.route('/packages/viewversion/<string:item_version_id>/<string:file_type>/<path:name>') +def show_mapping_file(item_version_id: str, file_type: str, name: str) \ + -> werkzeug.Response: + item_type = item_infos.ItemType.MAPPING + return show_file(item_version_id, item_type, file_type, name) + +@bp.route('/libraries/viewversion/<string:item_version_id>/<string:file_type>/<path:name>') +def show_resource_file(item_version_id: str, file_type: str, name: str) \ + -> werkzeug.Response: + item_type = item_infos.ItemType.RESOURCE + return show_file(item_version_id, item_type, file_type, name) + +@bp.route('/libraries/viewdep/<string:item_version_id>/<string:dep_identifier>') +def show_library_dep(item_version_id: str, dep_identifier: str) \ + -> werkzeug.Response: + state = _app.get_haketilo_state() + + try: + store = state.resource_version_store() + dep_id = store.get(item_version_id).get_dependency(dep_identifier).id + url = flask.url_for('.show_library_version', item_version_id=dep_id) + except st.MissingItemError: + try: + versionless_store = state.resource_store() + item_ref = versionless_store.get_by_identifier(dep_identifier) + url = flask.url_for('.show_library', item_id=item_ref.id) + except st.MissingItemError: + flask.abort(404) + + return flask.redirect(url) + +@bp.route('/<string:item_type>/viewrequired/<string:item_version_id>/<string:required_identifier>') +def show_required_mapping( + item_type: str, + item_version_id: str, + required_identifier: str +) -> werkzeug.Response: + state = _app.get_haketilo_state() + + if item_type not in ('package', 'library'): + flask.abort(404) + + found = False + + if item_type == 'package': + try: + ref = state.mapping_version_store().get(item_version_id) + mapping_ver_id = ref.get_required_mapping(required_identifier).id + url = flask.url_for( + '.show_package_version', + item_version_id = mapping_ver_id + ) + found = True + except st.MissingItemError: + pass + + if not found: + try: + versionless_store = state.mapping_store() + mapping_ref = versionless_store\ + .get_by_identifier(required_identifier) + url = flask.url_for('.show_package', item_id=mapping_ref.id) + except st.MissingItemError: + flask.abort(404) + + return flask.redirect(url) + +@bp.route('/package/viewlibrary/<string:item_version_id>/<string:pattern>/<string:lib_identifier>') +def show_package_library(item_version_id: str, pattern: str, lib_identifier: str) \ + -> werkzeug.Response: + state = _app.get_haketilo_state() + + try: + ref = state.mapping_version_store().get(item_version_id) + + try: + resource_ver_ref = \ + ref.get_payload_resource(unquote(pattern), lib_identifier) + url = flask.url_for( + '.show_library_version', + item_version_id = resource_ver_ref.id + ) + except st.MissingItemError: + resource_ref = state.resource_store().get_by_identifier( + lib_identifier + ) + url = flask.url_for('.show_library', item_id=resource_ref.id) + except st.MissingItemError: + flask.abort(404) + + return flask.redirect(url) + +@bp.route('/package/viewbypayload/<string:payload_id>/<string:package_identifier>') +def show_payload_package(payload_id: str, package_identifier: str) \ + -> werkzeug.Response: + state = _app.get_haketilo_state() + + try: + ref = state.payload_store().get(payload_id) + + try: + mapping_ver_ref = ref.get_display_info().mapping_info.ref + url = flask.url_for( + '.show_package_version', + item_version_id = mapping_ver_ref.id + ) + except st.MissingItemError: + mapping_ref = state.mapping_store().get_by_identifier( + package_identifier + ) + url = flask.url_for('.show_package', item_id=mapping_ref.id) + except st.MissingItemError: + flask.abort(404) + + return flask.redirect(url) |