# SPDX-License-Identifier: GPL-3.0-or-later # Haketilo proxy payloads dependency resolution. # # This file is part of Hydrilla&Haketilo. # # Copyright (C) 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use this code # in a proprietary program, I am not going to enforce this in court. """ ..... """ # Enable using with Python 3.7. from __future__ import annotations import dataclasses as dc import typing as t from .. import item_infos from .. import url_patterns @dc.dataclass class ComputedPayload: resources: list[item_infos.ResourceInfo] = dc.field(default_factory=list) allows_eval: bool = False allows_cors_bypass: bool = False SingleMappingPayloads = t.Mapping[ url_patterns.ParsedPattern, ComputedPayload ] ComputedPayloadsDict = dict[ item_infos.MappingInfo, SingleMappingPayloads ] empty_identifiers_set: set[str] = set() @dc.dataclass(frozen=True) class _ItemsCollection: resources: t.Mapping[str, item_infos.ResourceInfo] mappings: t.Mapping[str, item_infos.MappingInfo] def _satisfy_payload_resource_rec( self, resource_identifier: str, processed_resources: set[str], computed_payload: ComputedPayload ) -> t.Optional[ComputedPayload]: if resource_identifier in processed_resources: # We forbid circular dependencies. return None resource_info = self.resources.get(resource_identifier) if resource_info is None: return None if resource_info in computed_payload.resources: return computed_payload processed_resources.add(resource_identifier) if resource_info.allows_eval: computed_payload.allows_eval = True if resource_info.allows_cors_bypass: computed_payload.allows_cors_bypass = True for dependency_spec in resource_info.dependencies: if self._satisfy_payload_resource_rec( dependency_spec.identifier, processed_resources, computed_payload ) is None: return None processed_resources.remove(resource_identifier) computed_payload.resources.append(resource_info) return computed_payload def _satisfy_payload_resource(self, resource_identifier: str) \ -> t.Optional[ComputedPayload]: return self._satisfy_payload_resource_rec( resource_identifier, set(), ComputedPayload() ) def _compute_payloads_no_mapping_requirements(self) -> ComputedPayloadsDict: computed_result: ComputedPayloadsDict = ComputedPayloadsDict() for mapping_info in self.mappings.values(): by_pattern: dict[url_patterns.ParsedPattern, ComputedPayload] = {} failure = False for pattern, resource_spec in mapping_info.payloads.items(): computed_payload = self._satisfy_payload_resource( resource_spec.identifier ) if computed_payload is None: failure = True break if mapping_info.allows_eval: computed_payload.allows_eval = True if mapping_info.allows_cors_bypass: computed_payload.allows_cors_bypass = True by_pattern[pattern] = computed_payload if not failure: computed_result[mapping_info] = by_pattern return computed_result def _mark_mappings_bad( self, identifier: str, reverse_mapping_deps: t.Mapping[str, set[str]], bad_mappings: set[str] ) -> None: if identifier in bad_mappings: return bad_mappings.add(identifier) for requiring in reverse_mapping_deps.get(identifier, ()): self._mark_mappings_bad( requiring, reverse_mapping_deps, bad_mappings ) def compute_payloads(self) -> ComputedPayloadsDict: computed_result = self._compute_payloads_no_mapping_requirements() reverse_mapping_deps: dict[str, set[str]] = {} for mapping_info, by_pattern in computed_result.items(): specs_to_resolve = [*mapping_info.required_mappings] for computed_payload in by_pattern.values(): for resource_info in computed_payload.resources: specs_to_resolve.extend(resource_info.required_mappings) for required_mapping_spec in specs_to_resolve: identifier = required_mapping_spec.identifier requiring = reverse_mapping_deps.setdefault(identifier, set()) requiring.add(mapping_info.identifier) bad_mappings: set[str] = set() for required_identifier in reverse_mapping_deps.keys(): if self.mappings.get(required_identifier) not in computed_result: self._mark_mappings_bad( required_identifier, reverse_mapping_deps, bad_mappings ) for identifier in bad_mappings: if identifier in self.mappings: computed_result.pop(self.mappings[identifier], None) return computed_result AnyInfoVar = t.TypeVar( 'AnyInfoVar', item_infos.ResourceInfo, item_infos.MappingInfo ) def _choose_newest(infos: t.Iterable[AnyInfoVar]) -> dict[str, AnyInfoVar]: best_versions: dict[str, AnyInfoVar] = {} for info in infos: other = best_versions.setdefault(info.identifier, info) if (other.version, other.repo, info.repo_iteration) < \ (info.version, info.repo, other.repo_iteration): best_versions[info.identifier] = info return best_versions def compute_payloads( resources: t.Iterable[item_infos.ResourceInfo], mappings: t.Iterable[item_infos.MappingInfo] ) -> ComputedPayloadsDict: best_resources = _choose_newest(resources) best_mappings = _choose_newest(mappings) return _ItemsCollection(best_resources, best_mappings).compute_payloads()