aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/proxy/web_ui/items.py
diff options
context:
space:
mode:
authorWojtek Kosior <koszko@koszko.org>2022-08-29 13:05:35 +0200
committerWojtek Kosior <koszko@koszko.org>2022-09-28 12:54:54 +0200
commit7fc6312d6df526b8eb49288aecf88d04668e7c45 (patch)
treebc9bda05270991892383839379c101515a440576 /src/hydrilla/proxy/web_ui/items.py
parent367ea85057368047a50ae98a3510e0113eadd744 (diff)
downloadhaketilo-hydrilla-7fc6312d6df526b8eb49288aecf88d04668e7c45.tar.gz
haketilo-hydrilla-7fc6312d6df526b8eb49288aecf88d04668e7c45.zip
[proxy] make it possible to also view and install/uninstall libraries (resources) through the web UI
Diffstat (limited to 'src/hydrilla/proxy/web_ui/items.py')
-rw-r--r--src/hydrilla/proxy/web_ui/items.py275
1 files changed, 275 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/web_ui/items.py b/src/hydrilla/proxy/web_ui/items.py
new file mode 100644
index 0000000..03f2f2d
--- /dev/null
+++ b/src/hydrilla/proxy/web_ui/items.py
@@ -0,0 +1,275 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Proxy web UI packages loading.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+.....
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import tempfile
+import zipfile
+import typing as t
+
+from pathlib import Path
+
+import flask
+import werkzeug
+
+from ...exceptions import HaketiloException
+from ...translations import smart_gettext as _
+from ... import item_infos
+from .. import state as st
+from . import _app
+
+
+class InvalidUploadedMalcontent(HaketiloException):
+ def __init__(self):
+ super().__init__(_('err.proxy.uploaded_malcontent_invalid'))
+
+
+bp = flask.Blueprint('items', __package__)
+
+@bp.route('/packages/load_from_disk', methods=['GET'])
+def load_from_disk() -> werkzeug.Response:
+ html = flask.render_template('packages/load_from_disk.html.jinja')
+ return flask.make_response(html, 200)
+
+@bp.route('/packages/load_from_disk', methods=['POST'])
+def load_from_disk_post() -> werkzeug.Response:
+ zip_file_storage = flask.request.files.get('packages_zipfile')
+ if zip_file_storage is None:
+ return load_from_disk()
+
+ with tempfile.TemporaryDirectory() as tmpdir_str:
+ tmpdir = Path(tmpdir_str)
+ tmpdir_child = tmpdir / 'childdir'
+ tmpdir_child.mkdir()
+
+ try:
+ with zipfile.ZipFile(zip_file_storage) as zip_file:
+ zip_file.extractall(tmpdir_child)
+ except:
+ raise HaketiloException(_('err.proxy.uploaded_file_not_zip'))
+
+ extracted_top_level_files = tuple(tmpdir_child.iterdir())
+ if extracted_top_level_files == ():
+ raise InvalidUploadedMalcontent()
+
+ if len(extracted_top_level_files) == 1 and \
+ extracted_top_level_files[0].is_dir():
+ malcontent_dir_path = extracted_top_level_files[0]
+ else:
+ malcontent_dir_path = tmpdir_child
+
+ try:
+ _app.get_haketilo_state().import_items(malcontent_dir_path)
+ except:
+ raise InvalidUploadedMalcontent()
+
+ return flask.redirect(flask.url_for('.packages'))
+
+@bp.route('/packages')
+def packages() -> werkzeug.Response:
+ store = _app.get_haketilo_state().mapping_store()
+
+ html = flask.render_template(
+ 'packages/index.html.jinja',
+ display_infos = store.get_display_infos()
+ )
+ return flask.make_response(html, 200)
+
+@bp.route('/libraries')
+def libraries() -> werkzeug.Response:
+ store = _app.get_haketilo_state().resource_store()
+
+ html = flask.render_template(
+ 'libraries/index.html.jinja',
+ display_infos = store.get_display_infos()
+ )
+ return flask.make_response(html, 200)
+
+def item_store(state: st.HaketiloState, item_type: item_infos.ItemType) \
+ -> t.Union[st.MappingStore, st.ResourceStore]:
+ if item_type == item_infos.ItemType.RESOURCE:
+ return state.resource_store()
+ else:
+ return state.mapping_store()
+
+def show_item(item_id: str, item_type: item_infos.ItemType) \
+ -> werkzeug.Response:
+ try:
+ store = item_store(_app.get_haketilo_state(), item_type)
+ item_ref = store.get(str(item_id))
+ version_display_infos = item_ref.get_version_display_infos()
+
+ display_info: t.Union[st.MappingDisplayInfo, st.ResourceDisplayInfo]
+
+ if isinstance(item_ref, st.ResourceRef):
+ display_info = st.ResourceDisplayInfo(
+ ref = item_ref,
+ identifier = version_display_infos[0].info.identifier
+ )
+ else:
+ version_display_infos = t.cast(
+ t.Sequence[st.MappingVersionDisplayInfo],
+ version_display_infos
+ )
+
+ active_version: t.Optional[st.MappingVersionDisplayInfo] = None
+
+ for info in version_display_infos:
+ if info.active != st.ActiveStatus.NOT_ACTIVE:
+ active_version = info
+
+ display_info = st.MappingDisplayInfo(
+ ref = item_ref,
+ identifier = version_display_infos[0].info.identifier,
+ enabled = version_display_infos[0].mapping_enabled,
+ active_version = active_version
+ )
+
+ html = flask.render_template(
+ f'{item_type.alt_name_plural}/show_single.html.jinja',
+ display_info = display_info,
+ version_display_infos = version_display_infos
+ )
+ return flask.make_response(html, 200)
+ except st.MissingItemError:
+ flask.abort(404)
+
+
+@bp.route('/libraries/view/<string:item_id>')
+def show_library(item_id: str) -> werkzeug.Response:
+ return show_item(item_id, item_infos.ItemType.RESOURCE)
+
+@bp.route('/packages/view/<string:item_id>')
+def show_package(item_id: str) -> werkzeug.Response:
+ return show_item(item_id, item_infos.ItemType.MAPPING)
+
+ItemVersionDisplayInfo = t.Union[
+ st.MappingVersionDisplayInfo,
+ st.ResourceVersionDisplayInfo
+]
+
+def item_version_store(
+ state: st.HaketiloState,
+ item_type: item_infos.ItemType
+) -> t.Union[st.MappingVersionStore, st.ResourceVersionStore]:
+ if item_type == item_infos.ItemType.RESOURCE:
+ return state.resource_version_store()
+ else:
+ return state.mapping_version_store()
+
+def show_item_version(
+ item_version_id: str,
+ item_type: item_infos.ItemType,
+ errors: t.Mapping[str, bool] = {}
+) -> werkzeug.Response:
+ try:
+ store = item_version_store(_app.get_haketilo_state(), item_type)
+ version_ref = store.get(item_version_id)
+ display_infos = version_ref.get_all_version_display_infos()
+
+ other_infos: list[ItemVersionDisplayInfo] = []
+ this_info: t.Optional[ItemVersionDisplayInfo] = None
+
+ for info in display_infos:
+ if info.ref == version_ref:
+ this_info = info
+ else:
+ other_infos.append(info)
+
+ assert this_info is not None
+
+ html = flask.render_template(
+ f'{item_type.alt_name_plural}/show_single_version.html.jinja',
+ display_info = this_info,
+ version_display_infos = other_infos,
+ **errors
+ )
+ return flask.make_response(html, 200)
+ except st.MissingItemError:
+ flask.abort(404)
+
+@bp.route('/libraries/viewversion/<string:item_version_id>')
+def show_library_version(item_version_id: str) -> werkzeug.Response:
+ return show_item_version(item_version_id, item_infos.ItemType.RESOURCE)
+
+@bp.route('/packages/viewversion/<string:item_version_id>')
+def show_package_version(item_version_id: str) -> werkzeug.Response:
+ return show_item_version(item_version_id, item_infos.ItemType.MAPPING)
+
+def alter_item_version(item_version_id: str, item_type: item_infos.ItemType) \
+ -> werkzeug.Response:
+ form_data = flask.request.form
+ action = form_data['action']
+
+ try:
+ store = item_version_store(_app.get_haketilo_state(), item_type)
+ item_version_ref = store.get(item_version_id)
+
+ if action == 'install_item_version':
+ item_version_ref.install()
+ elif action == 'uninstall_item_version':
+ item_version_ref_after = item_version_ref.uninstall()
+ if item_version_ref_after is None:
+ url = flask.url_for(f'.{item_type.alt_name_plural}')
+ return flask.redirect(url)
+ else:
+ return show_item_version(item_version_id, item_type)
+ else:
+ raise ValueError()
+ except st.FileInstallationError:
+ return show_item_version(
+ item_version_id = item_version_id,
+ item_type = item_type,
+ errors = {'file_installation_error': True}
+ )
+ except st.ImpossibleSituation:
+ return show_item_version(
+ item_version_id = item_version_id,
+ item_type = item_type,
+ errors = {'uninstall_disallowed': True}
+ )
+ except st.MissingItemError:
+ flask.abort(404)
+
+ return flask.redirect(
+ flask.url_for(
+ f'.show_{item_type.alt_name}_version',
+ item_version_id = item_version_id
+ )
+ )
+
+@bp.route('/libraries/viewversion/<string:item_version_id>', methods=['POST'])
+def alter_library_version(item_version_id: str) -> werkzeug.Response:
+ return alter_item_version(item_version_id, item_infos.ItemType.RESOURCE)
+
+@bp.route('/packages/viewversion/<string:item_version_id>', methods=['POST'])
+def alter_package_version(item_version_id: str) -> werkzeug.Response:
+ return alter_item_version(item_version_id, item_infos.ItemType.MAPPING)