# SPDX-License-Identifier: GPL-3.0-or-later # Haketilo proxy payloads dependency resolution. # # This file is part of Hydrilla&Haketilo. # # Copyright (C) 2022 Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # # I, Wojtek Kosior, thereby promise not to sue for violation of this # file's license. Although I request that you do not make use this code # in a proprietary program, I am not going to enforce this in court. """ ..... """ # Enable using with Python 3.7. from __future__ import annotations import dataclasses as dc import typing as t from ..exceptions import HaketiloException from .. import item_infos from .. import url_patterns class ImpossibleSituation(HaketiloException): pass @dc.dataclass(frozen=True) class MappingRequirement: identifier: str def is_fulfilled_by(self, info: item_infos.MappingInfo) -> bool: return True @dc.dataclass(frozen=True) class MappingRepoRequirement(MappingRequirement): repo: str def is_fulfilled_by(self, info: item_infos.MappingInfo) -> bool: return info.repo == self.repo @dc.dataclass(frozen=True) class MappingVersionRequirement(MappingRequirement): version_info: item_infos.MappingInfo def __post_init__(self): assert self.version_info.identifier == self.identifier def is_fulfilled_by(self, info: item_infos.MappingInfo) -> bool: return info == self.version_info @dc.dataclass class ComputedPayload: resources: list[item_infos.ResourceInfo] = dc.field(default_factory=list) allows_eval: bool = False allows_cors_bypass: bool = False @dc.dataclass class MappingChoice: info: item_infos.MappingInfo required: bool = False payloads: dict[url_patterns.ParsedPattern, ComputedPayload] = \ dc.field(default_factory=dict) MappingsGraph = t.Union[ t.Mapping[str, set[str]], t.Mapping[str, frozenset[str]] ] def _mark_mappings( identifier: str, mappings_graph: MappingsGraph, marked_mappings: set[str] ) -> None: if identifier in marked_mappings: return marked_mappings.add(identifier) for next_mapping in mappings_graph.get(identifier, ()): _mark_mappings(next_mapping, mappings_graph, marked_mappings) ComputedChoices = dict[str, MappingChoice] @dc.dataclass(frozen=True) class _ComputationData: resources: t.Mapping[str, item_infos.ResourceInfo] mappings: t.Mapping[str, item_infos.MappingInfo] required: frozenset[str] def _satisfy_payload_resource_rec( self, resource_identifier: str, processed_resources: set[str], computed_payload: ComputedPayload ) -> t.Optional[ComputedPayload]: if resource_identifier in processed_resources: # We forbid circular dependencies. return None resource_info = self.resources.get(resource_identifier) if resource_info is None: return None if resource_info in computed_payload.resources: return computed_payload processed_resources.add(resource_identifier) if resource_info.allows_eval: computed_payload.allows_eval = True if resource_info.allows_cors_bypass: computed_payload.allows_cors_bypass = True for dependency_spec in resource_info.dependencies: if self._satisfy_payload_resource_rec( dependency_spec.identifier, processed_resources, computed_payload ) is None: return None processed_resources.remove(resource_identifier) computed_payload.resources.append(resource_info) return computed_payload def _satisfy_payload_resource(self, resource_identifier: str) \ -> t.Optional[ComputedPayload]: return self._satisfy_payload_resource_rec( resource_identifier, set(), ComputedPayload() ) def _compute_payloads_no_mapping_requirements(self) -> ComputedChoices: computed_result: ComputedChoices = ComputedChoices() for mapping_info in self.mappings.values(): mapping_choice = MappingChoice(mapping_info) failure = False for pattern, resource_spec in mapping_info.payloads.items(): computed_payload = self._satisfy_payload_resource( resource_spec.identifier ) if computed_payload is None: failure = True break if mapping_info.allows_eval: computed_payload.allows_eval = True if mapping_info.allows_cors_bypass: computed_payload.allows_cors_bypass = True mapping_choice.payloads[pattern] = computed_payload if not failure: computed_result[mapping_info.identifier] = mapping_choice return computed_result def _compute_inter_mapping_deps(self, choices: ComputedChoices) \ -> dict[str, frozenset[str]]: mapping_deps: dict[str, frozenset[str]] = {} for mapping_choice in choices.values(): specs_to_resolve = [*mapping_choice.info.required_mappings] for computed_payload in mapping_choice.payloads.values(): for resource_info in computed_payload.resources: specs_to_resolve.extend(resource_info.required_mappings) depended = frozenset(spec.identifier for spec in specs_to_resolve) mapping_deps[mapping_choice.info.identifier] = depended return mapping_deps def compute_payloads(self) -> ComputedChoices: choices = self._compute_payloads_no_mapping_requirements() mapping_deps = self._compute_inter_mapping_deps(choices) reverse_deps: dict[str, set[str]] = {} for depending, depended_set in mapping_deps.items(): for depended in depended_set: reverse_deps.setdefault(depended, set()).add(depending) bad_mappings: set[str] = set() for depended_identifier in reverse_deps.keys(): if self.mappings.get(depended_identifier) not in choices: _mark_mappings(depended_identifier, reverse_deps, bad_mappings) if any(identifier in self.required for identifier in bad_mappings): raise ImpossibleSituation() for identifier in bad_mappings: if identifier in self.required: raise ImpossibleSituation() if identifier in self.mappings: choices.pop(identifier, None) required_mappings: set[str] = set() for identifier in self.required: _mark_mappings(identifier, mapping_deps, required_mappings) for identifier in required_mappings: choices[identifier].required = True return choices AnyInfoVar = t.TypeVar( 'AnyInfoVar', item_infos.ResourceInfo, item_infos.MappingInfo ) def _choose_newest(infos: t.Iterable[AnyInfoVar]) -> dict[str, AnyInfoVar]: best_versions: dict[str, AnyInfoVar] = {} for info in infos: other = best_versions.setdefault(info.identifier, info) if (other.version, other.repo, info.repo_iteration) < \ (info.version, info.repo, other.repo_iteration): best_versions[info.identifier] = info return best_versions def compute_payloads( resources: t.Iterable[item_infos.ResourceInfo], mappings: t.Iterable[item_infos.MappingInfo], requirements: t.Iterable[MappingRequirement] ) -> ComputedChoices: reqs_by_identifier = dict((req.identifier, req) for req in requirements) filtered_mappings = [] for mapping_info in mappings: req = reqs_by_identifier.get(mapping_info.identifier) if req is not None and not req.is_fulfilled_by(mapping_info): continue filtered_mappings.append(mapping_info) best_resources = _choose_newest(resources) best_mappings = _choose_newest(filtered_mappings) required = frozenset(reqs_by_identifier.keys()) return _ComputationData(best_resources, best_mappings, required)\ .compute_payloads()