# SPDX-License-Identifier: GPL-3.0-or-later # Haketilo proxy data and configuration. # # This file is part of Hydrilla&Haketilo. # # Copyright (C) 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use this code # in a proprietary program, I am not going to enforce this in court. """ This module contains logic for keeping track of all settings, rules, mappings and resources. """ # Enable using with Python 3.7. from __future__ import annotations import secrets import threading import typing as t import dataclasses as dc from pathlib import Path from immutables import Map from ..url_patterns import ParsedUrl from ..pattern_tree import PatternTree from .store import HaketiloStore from . import state from . import policies PolicyTree = PatternTree[policies.PolicyFactory] def register_builtin_policies(policy_tree: PolicyTree) -> PolicyTree: """....""" # TODO: implement pass def register_payload( policy_tree: PolicyTree, payload_ref: state.PayloadRef, token: str ) -> tuple[PolicyTree, t.Iterable[policies.PolicyFactory]]: """....""" payload_policy_factory = policies.PayloadPolicyFactory( builtin = False, payload_ref = payload_ref ) policy_tree = policy_tree.register( payload_ref.pattern, payload_policy_factory ) resource_policy_factory = policies.PayloadResourcePolicyFactory( builtin = False, payload_ref = payload_ref ) policy_tree = policy_tree.register( payload_ref.pattern.path_append(token, '***'), resource_policy_factory ) return policy_tree, (payload_policy_factory, resource_policy_factory) def register_mapping( policy_tree: PolicyTree, payload_refs: t.Iterable[state.PayloadRef], token: str ) -> tuple[PolicyTree, t.Iterable[policies.PolicyFactory]]: """....""" policy_factories: list[policies.PolicyFactory] = [] for payload_ref in payload_refs: policy_tree, factories = register_payload( policy_tree, payload_ref, token ) policy_factories.extend(factories) return policy_tree, policy_factories @dc.dataclass(frozen=True) class RichMappingData(state.MappingData): """....""" policy_factories: t.Iterable[policies.PolicyFactory] # @dc.dataclass(frozen=True) # class HaketiloData: # settings: state.HaketiloGlobalSettings # policy_tree: PolicyTree # mappings_data: Map[state.MappingRef, RichMappingData] = Map() MonitoredMethodType = t.TypeVar('MonitoredMethodType', bound=t.Callable) def with_lock(wrapped_fun: MonitoredMethodType) -> MonitoredMethodType: """....""" def wrapper(self: 'ConcreteHaketiloState', *args, **kwargs): """....""" with self.lock: return wrapped_fun(self, *args, **kwargs) return t.cast(MonitoredMethodType, wrapper) @dc.dataclass class ConcreteHaketiloState(state.HaketiloState): """....""" store: HaketiloStore settings: state.HaketiloGlobalSettings policy_tree: PolicyTree = PatternTree() mappings_data: Map[state.MappingRef, RichMappingData] = Map() lock: threading.RLock = dc.field(default_factory=threading.RLock) def __post_init__(self) -> None: """....""" self.policy_tree = register_builtin_policies(self.policy_tree) self._init_mappings() def _init_mappings(self) -> None: """....""" store_mappings_data = self.store.load_installed_mappings_data() payload_items = self.store.load_payloads_data().items() for mapping_ref, payload_refs in payload_items: installed = True enabled_status = store_mappings_data.get(mapping_ref) if enabled_status is None: installed = False enabled_status = state.EnabledStatus.NO_MARK self._register_mapping( mapping_ref, payload_refs, enabled = enabled_status, installed = installed ) @with_lock def _register_mapping( self, mapping_ref: state.MappingRef, payload_refs: t.Iterable[state.PayloadRef], enabled: state.EnabledStatus, installed: bool ) -> None: """....""" token = secrets.token_urlsafe(8) self.policy_tree, factories = register_mapping( self.policy_tree, payload_refs, token ) runtime_data = RichMappingData( mapping_ref = mapping_ref, enabled_status = enabled, unique_token = token, policy_factories = factories ) self.mappings_data = self.mappings_data.set(mapping_ref, runtime_data) @with_lock def get_mapping_data(self, mapping_ref: state.MappingRef) \ -> state.MappingData: try: return self.mappings_data[mapping_ref] except KeyError: raise state.MissingItemError('no such mapping') @with_lock def get_payload_data(self, payload_ref: state.PayloadRef) \ -> state.PayloadData: # TODO!!! try: return t.cast(state.PayloadData, None) except: raise state.MissingItemError('no such payload') @with_lock def get_file_paths(self, payload_ref: state.PayloadRef) \ -> t.Iterable[t.Sequence[str]]: # TODO!!! return [] @with_lock def get_file_data( self, payload_ref: state.PayloadRef, file_path: t.Sequence[str] ) -> t.Optional[state.FileData]: if len(file_path) == 0: raise state.MissingItemError('empty file path') path_str = '/'.join(file_path[1:]) return self.store.load_file_data(payload_ref, file_path[0], path_str) @with_lock def ensure_installed(self, mapping: state.MappingRef) -> None: # TODO!!! pass @with_lock def get_settings(self) -> state.HaketiloGlobalSettings: return self.settings @with_lock def update_settings(self, updater: state.SettingsUpdater) -> None: new_settings = updater(self.settings) self.store.write_global_settings(new_settings) self.settings = new_settings def select_policy(self, url: str) -> policies.Policy: """....""" with self.lock: policy_tree = self.policy_tree try: best_priority: int = 0 best_policy: t.Optional[policies.Policy] = None for factories_set in policy_tree.search(url): for stored_factory in sorted(factories_set): factory = stored_factory.item policy = factory.make_policy(self) policy_priority = policy.priority() if policy_priority > best_priority: best_priority = policy_priority best_policy = policy except Exception as e: return policies.ErrorBlockPolicy( builtin = True, error = e ) if best_policy is not None: return best_policy if self.get_settings().default_allow_scripts: return policies.FallbackAllowPolicy() else: return policies.FallbackBlockPolicy() @staticmethod def make(store_dir: Path) -> 'ConcreteHaketiloState': """....""" store = HaketiloStore(store_dir) settings = store.load_global_settings() return ConcreteHaketiloState(store=store, settings=settings)