# SPDX-License-Identifier: GPL-3.0-or-later # Haketilo proxy data and configuration (PayloadRef subtype). # # This file is part of Hydrilla&Haketilo. # # Copyright (C) 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use this code # in a proprietary program, I am not going to enforce this in court. """ This module provides an interface to interact with payloads inside Haketilo. """ # Enable using with Python 3.7. from __future__ import annotations import dataclasses as dc import typing as t from .. import state as st from . 
import base


@dc.dataclass(frozen=True, unsafe_hash=True)
class ConcretePayloadRef(st.PayloadRef):
    """
    Payload reference that resolves its queries against a state object.

    In-memory payload data is looked up in ``state.payloads_data``; script
    paths and file contents are read through cursors obtained from
    ``state.cursor()``.
    """
    # Object providing `payloads_data` and `cursor()`. Excluded from hashing
    # and comparison, so only the remaining fields decide the identity of a
    # ref.
    state: base.HaketiloStateWithFields = dc.field(hash=False, compare=False)

    def get_data(self) -> st.PayloadData:
        """
        Return the data stored for this payload in ``state.payloads_data``.

        Raises ``st.MissingItemError`` when there is no entry for this ref.
        """
        try:
            return self.state.payloads_data[self]
        except KeyError:
            raise st.MissingItemError()

    def get_mapping(self) -> st.MappingVersionRef:
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def get_script_paths(self) \
        -> t.Iterable[t.Sequence[str]]:
        """
        Return the paths of all script files ('W' file uses) of the payload.

        Each path is a sequence whose first element is the identifier of the
        resource that provides the file and whose remaining elements are the
        '/'-separated segments of the file's name. Paths are ordered by
        resource index, then by file index.

        A payload that exists but has no script files yields an empty list.
        Raises ``st.MissingItemError`` when the query finds no payload with
        this ref's id.
        """
        with self.state.cursor() as cursor:
            # The file type test lives in the ON clause on purpose. Putting
            # it in WHERE would compare NULL with 'W' for payloads without
            # script files, discarding their only row and making an existing
            # payload indistinguishable from a missing one.
            cursor.execute(
                '''
                SELECT
                        i.identifier, fu.name
                FROM
                        payloads AS p
                        LEFT JOIN resolved_depended_resources AS rdd
                                USING (payload_id)
                        LEFT JOIN item_versions AS iv
                                ON rdd.resource_item_id = iv.item_version_id
                        LEFT JOIN items AS i
                                USING (item_id)
                        LEFT JOIN file_uses AS fu
                                ON fu.item_version_id = iv.item_version_id AND
                                   fu.type = 'W'
                WHERE
                        p.payload_id = ?
                ORDER BY
                        rdd.idx, fu.idx;
                ''',
                (self.id,)
            )

            rows = cursor.fetchall()

        if not rows:
            # Not even the payload's own row came back: payload not found.
            raise st.MissingItemError()

        # Rows with a NULL file name stand for a payload without resources
        # or a resource without script files; they contribute no path.
        return [
            (resource_identifier, *file_name.split('/'))
            for resource_identifier, file_name in rows
            if file_name is not None
        ]

    def get_file_data(self, path: t.Sequence[str]) \
        -> t.Optional[st.FileData]:
        """
        Return the script file designated by `path`, or None if not found.

        `path` has the shape produced by `get_script_paths()`: the resource
        identifier followed by the segments of the file name.

        Raises ``st.MissingItemError`` when `path` is empty.
        """
        if not path:
            raise st.MissingItemError()

        resource_identifier, *file_name_segments = path

        file_name = '/'.join(file_name_segments)

        with self.state.cursor() as cursor:
            cursor.execute(
                '''
                SELECT
                        f.data, fu.mime_type
                FROM
                        payloads AS p
                        JOIN resolved_depended_resources AS rdd
                                USING (payload_id)
                        JOIN item_versions AS iv
                                ON rdd.resource_item_id = iv.item_version_id
                        JOIN items AS i
                                USING (item_id)
                        JOIN file_uses AS fu
                                USING (item_version_id)
                        JOIN files AS f
                                USING (file_id)
                WHERE
                        p.payload_id = ? AND
                        i.identifier = ? AND
                        fu.name = ? AND
                        fu.type = 'W';
                ''',
                (self.id, resource_identifier, file_name)
            )

            result = cursor.fetchall()

        if not result:
            return None

        # Exactly one matching row is expected; the unpacking raises
        # ValueError should the query ever return more.
        (data, mime_type), = result

        return st.FileData(type=mime_type, name=file_name, contents=data)