From 879c41927171efc8d77d1de2739b18e2eb57580f Mon Sep 17 00:00:00 2001 From: Wojtek Kosior Date: Wed, 27 Jul 2022 15:56:24 +0200 Subject: unfinished partial work --- src/hydrilla/proxy/simple_dependency_satisfying.py | 189 +++++++++++++++++++++ 1 file changed, 189 insertions(+) create mode 100644 src/hydrilla/proxy/simple_dependency_satisfying.py (limited to 'src/hydrilla/proxy/simple_dependency_satisfying.py') diff --git a/src/hydrilla/proxy/simple_dependency_satisfying.py b/src/hydrilla/proxy/simple_dependency_satisfying.py new file mode 100644 index 0000000..9716fe5 --- /dev/null +++ b/src/hydrilla/proxy/simple_dependency_satisfying.py @@ -0,0 +1,189 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Haketilo proxy payloads dependency resolution. +# +# This file is part of Hydrilla&Haketilo. +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use this code +# in a proprietary program, I am not going to enforce this in court. + +""" +..... +""" + +# Enable using with Python 3.7. +from __future__ import annotations + +import dataclasses as dc +import typing as t + +from .. import item_infos +from .. import url_patterns + +@dc.dataclass +class ComputedPayload: + resources: list[item_infos.ResourceInfo] = dc.field(default_factory=list) + + allows_eval: bool = False + allows_cors_bypass: bool = False + +SingleMappingPayloads = t.Mapping[ + url_patterns.ParsedPattern, + ComputedPayload +] + +ComputedPayloadsDict = dict[ + item_infos.MappingInfo, + SingleMappingPayloads +] + +empty_identifiers_set: set[str] = set() + +@dc.dataclass(frozen=True) +class ItemsCollection: + resources: t.Mapping[str, item_infos.ResourceInfo] + mappings: t.Mapping[str, item_infos.MappingInfo] + + def _satisfy_payload_resource_rec( + self, + resource_identifier: str, + processed_resources: set[str], + computed_payload: ComputedPayload + ) -> t.Optional[ComputedPayload]: + if resource_identifier in processed_resources: + # We forbid circular dependencies. + return None + + resource_info = self.resources.get(resource_identifier) + if resource_info is None: + return None + + if resource_info in computed_payload.resources: + return computed_payload + + processed_resources.add(resource_identifier) + + if resource_info.allows_eval: + computed_payload.allows_eval = True + + if resource_info.allows_cors_bypass: + computed_payload.allows_cors_bypass = True + + for dependency_spec in resource_info.dependencies: + if self._satisfy_payload_resource_rec( + dependency_spec.identifier, + processed_resources, + computed_payload + ) is None: + return None + + processed_resources.remove(resource_identifier) + + computed_payload.resources.append(resource_info) + + return computed_payload + + def _satisfy_payload_resource(self, resource_identifier: str) \ + -> t.Optional[ComputedPayload]: + return self._satisfy_payload_resource_rec( + resource_identifier, + set(), + ComputedPayload() + ) + + def _compute_payloads_no_mapping_requirements(self) -> ComputedPayloadsDict: + computed_result: ComputedPayloadsDict = ComputedPayloadsDict() + + for mapping_info in self.mappings.values(): + by_pattern: dict[url_patterns.ParsedPattern, ComputedPayload] = {} + + failure = False + + for pattern, resource_spec in mapping_info.payloads.items(): + computed_payload = self._satisfy_payload_resource( + resource_spec.identifier + ) + if computed_payload is None: + failure = True + break + + if mapping_info.allows_eval: + computed_payload.allows_eval = True + + if mapping_info.allows_cors_bypass: + computed_payload.allows_cors_bypass = True + + by_pattern[pattern] = computed_payload + + if not failure: + computed_result[mapping_info] = by_pattern + + return computed_result + + def _mark_mappings_bad( + self, + identifier: str, + reverse_mapping_deps: t.Mapping[str, set[str]], + bad_mappings: set[str] + ) -> None: + if identifier in bad_mappings: + return + + bad_mappings.add(identifier) + + for requiring in reverse_mapping_deps.get(identifier, ()): + self._mark_mappings_bad( + requiring, + reverse_mapping_deps, + bad_mappings + ) + + def compute_payloads(self) -> ComputedPayloadsDict: + computed_result = self._compute_payloads_no_mapping_requirements() + + reverse_mapping_deps: dict[str, set[str]] = {} + + for mapping_info, by_pattern in computed_result.items(): + specs_to_resolve = [*mapping_info.required_mappings] + + for computed_payload in by_pattern.values(): + for resource_info in computed_payload.resources: + specs_to_resolve.extend(resource_info.required_mappings) + + for required_mapping_spec in specs_to_resolve: + identifier = required_mapping_spec.identifier + requiring = reverse_mapping_deps.setdefault(identifier, set()) + requiring.add(mapping_info.identifier) + + bad_mappings: set[str] = set() + + for required_identifier in reverse_mapping_deps.keys(): + if self.mappings.get(required_identifier) not in computed_result: + self._mark_mappings_bad( + required_identifier, + reverse_mapping_deps, + bad_mappings + ) + + for identifier in bad_mappings: + if identifier in self.mappings: + computed_result.pop(self.mappings[identifier], None) + + return computed_result -- cgit v1.2.3