# SPDX-License-Identifier: GPL-3.0-or-later # Haketilo proxy data and configuration (RepoRef and RepoStore subtypes). # # This file is part of Hydrilla&Haketilo. # # Copyright (C) 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use this code # in a proprietary program, I am not going to enforce this in court. """ This module provides an interface to interact with repositories configured inside Haketilo. """ # Enable using with Python 3.7. from __future__ import annotations import re import typing as t import dataclasses as dc from urllib.parse import urlparse from datetime import datetime import sqlite3 from .. import state as st from . import base from . import prune_packages def validate_repo_url(url: str) -> None: try: parsed = urlparse(url) except: raise st.RepoUrlInvalid() if parsed.scheme not in ('http', 'https'): raise st.RepoUrlInvalid() def ensure_repo_not_deleted(cursor: sqlite3.Cursor, repo_id: str) -> None: cursor.execute( 'SELECT deleted FROM repos WHERE repo_id = ?;', (repo_id,) ) rows = cursor.fetchall() if rows == []: raise st.MissingItemError() (deleted,), = rows if deleted: raise st.MissingItemError() def make_repo_display_info( ref: st.RepoRef, name: str, url: str, deleted: bool, last_refreshed: t.Optional[int], resource_count: int, mapping_count: int ) -> st.RepoDisplayInfo: last_refreshed_converted: t.Optional[datetime] = None if last_refreshed is not None: last_refreshed_converted = datetime.fromtimestamp(last_refreshed) return st.RepoDisplayInfo( ref = ref, is_local_semirepo = ref.id == '1', name = name, url = url, deleted = deleted, last_refreshed = last_refreshed_converted, resource_count = resource_count, mapping_count = mapping_count ) @dc.dataclass(frozen=True, unsafe_hash=True) class ConcreteRepoRef(st.RepoRef): """....""" state: base.HaketiloStateWithFields = dc.field(hash=False, compare=False) def remove(self) -> None: with self.state.cursor(transaction=True) as cursor: ensure_repo_not_deleted(cursor, self.id) cursor.execute( ''' UPDATE repos SET deleted = TRUE, url = '', active_iteration_id = NULL, last_refreshed = NULL WHERE repo_id = ?; ''', (self.id,) ) def update( self, *, name: t.Optional[str] = None, url: t.Optional[str] = None ) -> None: if name is not None: raise NotImplementedError() if url is None: return validate_repo_url(url) with self.state.cursor(transaction=True) as cursor: ensure_repo_not_deleted(cursor, self.id) cursor.execute( 'UPDATE repos SET url = ? WHERE repo_id = ?;', (url, self.id) ) prune_packages.prune(cursor) def refresh(self) -> st.RepoIterationRef: raise NotImplementedError() def get_display_info(self) -> st.RepoDisplayInfo: with self.state.cursor() as cursor: cursor.execute( ''' SELECT name, url, deleted, last_refreshed, resource_count, mapping_count FROM repo_display_infos WHERE repo_id = ?; ''', (self.id,) ) rows = cursor.fetchall() if rows == []: raise st.MissingItemError() row, = rows return make_repo_display_info(self, *row) repo_name_regex = re.compile(r''' ^ (?: []a-zA-Z0-9()<>^&$.!,?@#|;:%"'*{}[/_=+-]+ # allowed non-whitespace characters (?: # optional additional words separated by single spaces [ ] []a-zA-Z0-9()<>^&$.!,?@#|;:%"'*{}[/_=+-]+ )* ) $ ''', re.VERBOSE) @dc.dataclass(frozen=True) class ConcreteRepoStore(st.RepoStore): state: base.HaketiloStateWithFields def get(self, id: str) -> st.RepoRef: return ConcreteRepoRef(id, self.state) def add(self, name: str, url: str) -> st.RepoRef: name = name.strip() if repo_name_regex.match(name) is None: raise st.RepoNameInvalid() validate_repo_url(url) with self.state.cursor(transaction=True) as cursor: cursor.execute( ''' SELECT COUNT(repo_id) FROM repos WHERE NOT deleted AND name = ?; ''', (name,) ) (name_taken,), = cursor.fetchall() if name_taken: raise st.RepoNameTaken() cursor.execute( ''' INSERT INTO repos(name, url, deleted, next_iteration) VALUES (?, ?, FALSE, 1) ON CONFLICT (name) DO UPDATE SET name = excluded.name, url = excluded.url, deleted = FALSE, last_refreshed = NULL; ''', (name, url) ) cursor.execute('SELECT repo_id FROM repos WHERE name = ?;', (name,)) (repo_id,), = cursor.fetchall() return ConcreteRepoRef(str(repo_id), self.state) def get_display_infos(self, include_deleted: bool = False) \ -> t.Iterable[st.RepoDisplayInfo]: with self.state.cursor() as cursor: condition: str = 'TRUE' if include_deleted: condition = 'COALESCE(deleted = FALSE, TRUE)' cursor.execute( f''' SELECT repo_id, name, url, deleted, last_refreshed, resource_count, mapping_count FROM repo_display_infos WHERE {condition} ORDER BY repo_id != 1, name; ''' ) all_rows = cursor.fetchall() assert len(all_rows) > 0 and all_rows[0][0] == 1 result = [] for row in all_rows: repo_id, *rest = row ref = ConcreteRepoRef(str(repo_id), self.state) result.append(make_repo_display_info(ref, *rest)) return result