# SPDX-License-Identifier: GPL-3.0-or-later
# Proxy web UI packages loading.
#
# This file is part of Hydrilla&Haketilo.
#
# Copyright (C) 2022 Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
#
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use this code
# in a proprietary program, I am not going to enforce this in court.
"""
.....
"""
# Enable using with Python 3.7.
from __future__ import annotations
import tempfile
import zipfile
import typing as t
from pathlib import Path
import flask
import werkzeug
from ...exceptions import HaketiloException
from ...translations import smart_gettext as _
from ... import item_infos
from .. import state as st
from . import _app
bp = flask.Blueprint('items', __package__)
@bp.route('/load_from_disk', methods=['GET'])
def load_from_disk(errors: t.Mapping[str, bool] = {}) -> werkzeug.Response:
html = flask.render_template('items/load_from_disk.html.jinja', **errors)
return flask.make_response(html, 200)
@bp.route('/load_from_disk', methods=['POST'])
def load_from_disk_post() -> werkzeug.Response:
zip_file_storage = flask.request.files.get('items_zipfile')
if zip_file_storage is None:
return load_from_disk()
with tempfile.TemporaryDirectory() as tmpdir_str:
tmpdir = Path(tmpdir_str)
tmpdir_child = tmpdir / 'childdir'
tmpdir_child.mkdir()
try:
with zipfile.ZipFile(zip_file_storage) as zip_file:
zip_file.extractall(tmpdir_child)
except:
return load_from_disk({'uploaded_file_not_zip': True})
extracted_top_level_files = tuple(tmpdir_child.iterdir())
if extracted_top_level_files == ():
return load_from_disk({'invalid_uploaded_malcontent': True})
if len(extracted_top_level_files) == 1 and \
extracted_top_level_files[0].is_dir():
malcontent_dir_path = extracted_top_level_files[0]
else:
malcontent_dir_path = tmpdir_child
try:
_app.get_haketilo_state().import_items(malcontent_dir_path)
except:
return load_from_disk({'invalid_uploaded_malcontent': True})
return flask.redirect(flask.url_for('.packages'))
@bp.route('/packages')
def packages() -> werkzeug.Response:
store = _app.get_haketilo_state().mapping_store()
html = flask.render_template(
'items/packages.html.jinja',
display_infos = store.get_display_infos()
)
return flask.make_response(html, 200)
@bp.route('/libraries')
def libraries() -> werkzeug.Response:
store = _app.get_haketilo_state().resource_store()
html = flask.render_template(
'items/libraries.html.jinja',
display_infos = store.get_display_infos()
)
return flask.make_response(html, 200)
def item_store(state: st.HaketiloState, item_type: item_infos.ItemType) \
-> t.Union[st.MappingStore, st.ResourceStore]:
if item_type == item_infos.ItemType.RESOURCE:
return state.resource_store()
else:
return state.mapping_store()
def show_item(item_id: str, item_type: item_infos.ItemType) \
-> werkzeug.Response:
try:
store = item_store(_app.get_haketilo_state(), item_type)
display_info = store.get(str(item_id)).get_display_info()
html = flask.render_template(
f'items/{item_type.alt_name}_view.html.jinja',
display_info = display_info
)
return flask.make_response(html, 200)
except st.MissingItemError:
flask.abort(404)
@bp.route('/libraries/view/')
def show_library(item_id: str) -> werkzeug.Response:
return show_item(item_id, item_infos.ItemType.RESOURCE)
@bp.route('/packages/view/')
def show_package(item_id: str) -> werkzeug.Response:
return show_item(item_id, item_infos.ItemType.MAPPING)
ItemVersionDisplayInfo = t.Union[
st.MappingVersionDisplayInfo,
st.ResourceVersionDisplayInfo
]
def item_version_store(
state: st.HaketiloState,
item_type: item_infos.ItemType
) -> t.Union[st.MappingVersionStore, st.ResourceVersionStore]:
if item_type == item_infos.ItemType.RESOURCE:
return state.resource_version_store()
else:
return state.mapping_version_store()
def show_item_version(
item_version_id: str,
item_type: item_infos.ItemType,
errors: t.Mapping[str, bool] = {}
) -> werkzeug.Response:
try:
store = item_version_store(_app.get_haketilo_state(), item_type)
version_ref = store.get(item_version_id)
display_info = version_ref.get_item_display_info()
this_info: t.Optional[ItemVersionDisplayInfo] = None
for info in display_info.all_versions:
if info.ref == version_ref:
this_info = info
assert this_info is not None
html = flask.render_template(
f'items/{item_type.alt_name}_viewversion.html.jinja',
display_info = display_info,
version_display_info = this_info,
**errors
)
return flask.make_response(html, 200)
except st.MissingItemError:
flask.abort(404)
@bp.route('/libraries/viewversion/')
def show_library_version(item_version_id: str) -> werkzeug.Response:
return show_item_version(item_version_id, item_infos.ItemType.RESOURCE)
@bp.route('/packages/viewversion/')
def show_package_version(item_version_id: str) -> werkzeug.Response:
return show_item_version(item_version_id, item_infos.ItemType.MAPPING)
def alter_item_version(item_version_id: str, item_type: item_infos.ItemType) \
-> werkzeug.Response:
form_data = flask.request.form
action = form_data['action']
try:
store = item_version_store(_app.get_haketilo_state(), item_type)
item_version_ref = store.get(item_version_id)
if action == 'disable_item':
assert isinstance(item_version_ref, st.MappingVersionRef)
item_version_ref.update_mapping_status(st.EnabledStatus.DISABLED)
elif action == 'unenable_item':
assert isinstance(item_version_ref, st.MappingVersionRef)
item_version_ref.update_mapping_status(st.EnabledStatus.NO_MARK)
elif action == 'enable_item_version':
assert isinstance(item_version_ref, st.MappingVersionRef)
item_version_ref.update_mapping_status(
enabled = st.EnabledStatus.ENABLED,
frozen = st.FrozenStatus.EXACT_VERSION,
)
elif action == 'unfreeze_item':
assert isinstance(item_version_ref, st.MappingVersionRef)
item_version_ref.update_mapping_status(
enabled = st.EnabledStatus.ENABLED,
frozen = st.FrozenStatus.NOT_FROZEN,
)
elif action == 'freeze_to_repo':
assert isinstance(item_version_ref, st.MappingVersionRef)
item_version_ref.update_mapping_status(
enabled = st.EnabledStatus.ENABLED,
frozen = st.FrozenStatus.REPOSITORY,
)
elif action == 'freeze_to_version':
assert isinstance(item_version_ref, st.MappingVersionRef)
item_version_ref.update_mapping_status(
enabled = st.EnabledStatus.ENABLED,
frozen = st.FrozenStatus.EXACT_VERSION,
)
elif action == 'install_item_version':
item_version_ref.install()
elif action == 'uninstall_item_version':
item_version_ref_after = item_version_ref.uninstall()
if item_version_ref_after is None:
url = flask.url_for(f'.{item_type.alt_name_plural}')
return flask.redirect(url)
else:
return show_item_version(item_version_id, item_type)
else:
raise ValueError()
except st.RepoCommunicationError:
return show_item_version(
item_version_id = item_version_id,
item_type = item_type,
errors = {'repo_communication_error': True}
)
except st.FileInstallationError:
return show_item_version(
item_version_id = item_version_id,
item_type = item_type,
errors = {'file_installation_error': True}
)
except st.ImpossibleSituation:
return show_item_version(
item_version_id = item_version_id,
item_type = item_type,
errors = {'uninstall_disallowed': True}
)
except st.MissingItemError:
flask.abort(404)
return flask.redirect(
flask.url_for(
f'.show_{item_type.alt_name}_version',
item_version_id = item_version_id
)
)
@bp.route('/libraries/viewversion/', methods=['POST'])
def alter_library_version(item_version_id: str) -> werkzeug.Response:
return alter_item_version(item_version_id, item_infos.ItemType.RESOURCE)
@bp.route('/packages/viewversion/', methods=['POST'])
def alter_package_version(item_version_id: str) -> werkzeug.Response:
return alter_item_version(item_version_id, item_infos.ItemType.MAPPING)