diff options
Diffstat (limited to 'src/hydrilla/proxy/simple_dependency_satisfying.py')
-rw-r--r-- | src/hydrilla/proxy/simple_dependency_satisfying.py | 177 |
1 files changed, 122 insertions, 55 deletions
diff --git a/src/hydrilla/proxy/simple_dependency_satisfying.py b/src/hydrilla/proxy/simple_dependency_satisfying.py index 889ae98..f1371db 100644 --- a/src/hydrilla/proxy/simple_dependency_satisfying.py +++ b/src/hydrilla/proxy/simple_dependency_satisfying.py @@ -34,9 +34,40 @@ from __future__ import annotations import dataclasses as dc import typing as t +from ..exceptions import HaketiloException from .. import item_infos from .. import url_patterns + +class ImpossibleSituation(HaketiloException): + pass + + +@dc.dataclass(frozen=True) +class MappingRequirement: + identifier: str + + def is_fulfilled_by(self, info: item_infos.MappingInfo) -> bool: + return True + +@dc.dataclass(frozen=True) +class MappingRepoRequirement(MappingRequirement): + repo: str + + def is_fulfilled_by(self, info: item_infos.MappingInfo) -> bool: + return info.repo == self.repo + +@dc.dataclass(frozen=True) +class MappingVersionRequirement(MappingRequirement): + version_info: item_infos.MappingInfo + + def __post_init__(self): + assert self.version_info.identifier == self.identifier + + def is_fulfilled_by(self, info: item_infos.MappingInfo) -> bool: + return info == self.version_info + + @dc.dataclass class ComputedPayload: resources: list[item_infos.ResourceInfo] = dc.field(default_factory=list) @@ -44,22 +75,40 @@ class ComputedPayload: allows_eval: bool = False allows_cors_bypass: bool = False -SingleMappingPayloads = t.Mapping[ - url_patterns.ParsedPattern, - ComputedPayload -] +@dc.dataclass +class MappingChoice: + info: item_infos.MappingInfo + required: bool = False + payloads: dict[url_patterns.ParsedPattern, ComputedPayload] = \ + dc.field(default_factory=dict) + -ComputedPayloadsDict = dict[ - item_infos.MappingInfo, - SingleMappingPayloads +MappingsGraph = t.Union[ + t.Mapping[str, set[str]], + t.Mapping[str, frozenset[str]] ] -empty_identifiers_set: set[str] = set() +def _mark_mappings( + identifier: str, + mappings_graph: MappingsGraph, + marked_mappings: set[str] +) -> None: + if identifier in marked_mappings: + return + + marked_mappings.add(identifier) + + for next_mapping in mappings_graph.get(identifier, ()): + _mark_mappings(next_mapping, mappings_graph, marked_mappings) + + +ComputedChoices = dict[str, MappingChoice] @dc.dataclass(frozen=True) -class _ItemsCollection: +class _ComputationData: resources: t.Mapping[str, item_infos.ResourceInfo] mappings: t.Mapping[str, item_infos.MappingInfo] + required: frozenset[str] def _satisfy_payload_resource_rec( self, @@ -108,11 +157,11 @@ class _ItemsCollection: ComputedPayload() ) - def _compute_payloads_no_mapping_requirements(self) -> ComputedPayloadsDict: - computed_result: ComputedPayloadsDict = ComputedPayloadsDict() + def _compute_payloads_no_mapping_requirements(self) -> ComputedChoices: + computed_result: ComputedChoices = ComputedChoices() for mapping_info in self.mappings.values(): - by_pattern: dict[url_patterns.ParsedPattern, ComputedPayload] = {} + mapping_choice = MappingChoice(mapping_info) failure = False @@ -130,63 +179,66 @@ class _ItemsCollection: if mapping_info.allows_cors_bypass: computed_payload.allows_cors_bypass = True - by_pattern[pattern] = computed_payload + mapping_choice.payloads[pattern] = computed_payload if not failure: - computed_result[mapping_info] = by_pattern + computed_result[mapping_info.identifier] = mapping_choice return computed_result - def _mark_mappings_bad( - self, - identifier: str, - reverse_mapping_deps: t.Mapping[str, set[str]], - bad_mappings: set[str] - ) -> None: - if identifier in bad_mappings: - return + def _compute_inter_mapping_deps(self, choices: ComputedChoices) \ + -> dict[str, frozenset[str]]: + mapping_deps: dict[str, frozenset[str]] = {} - bad_mappings.add(identifier) + for mapping_choice in choices.values(): + specs_to_resolve = [*mapping_choice.info.required_mappings] - for requiring in reverse_mapping_deps.get(identifier, ()): - self._mark_mappings_bad( - requiring, - reverse_mapping_deps, - bad_mappings - ) + for computed_payload in mapping_choice.payloads.values(): + for resource_info in computed_payload.resources: + specs_to_resolve.extend(resource_info.required_mappings) - def compute_payloads(self) -> ComputedPayloadsDict: - computed_result = self._compute_payloads_no_mapping_requirements() + depended = frozenset(spec.identifier for spec in specs_to_resolve) + mapping_deps[mapping_choice.info.identifier] = depended - reverse_mapping_deps: dict[str, set[str]] = {} + return mapping_deps - for mapping_info, by_pattern in computed_result.items(): - specs_to_resolve = [*mapping_info.required_mappings] + def compute_payloads(self) -> ComputedChoices: + choices = self._compute_payloads_no_mapping_requirements() - for computed_payload in by_pattern.values(): - for resource_info in computed_payload.resources: - specs_to_resolve.extend(resource_info.required_mappings) + mapping_deps = self._compute_inter_mapping_deps(choices) + + reverse_deps: dict[str, set[str]] = {} - for required_mapping_spec in specs_to_resolve: - identifier = required_mapping_spec.identifier - requiring = reverse_mapping_deps.setdefault(identifier, set()) - requiring.add(mapping_info.identifier) + for depending, depended_set in mapping_deps.items(): + for depended in depended_set: + reverse_deps.setdefault(depended, set()).add(depending) bad_mappings: set[str] = set() - for required_identifier in reverse_mapping_deps.keys(): - if self.mappings.get(required_identifier) not in computed_result: - self._mark_mappings_bad( - required_identifier, - reverse_mapping_deps, - bad_mappings - ) + for depended_identifier in reverse_deps.keys(): + if self.mappings.get(depended_identifier) not in choices: + _mark_mappings(depended_identifier, reverse_deps, bad_mappings) + + if any(identifier in self.required for identifier in bad_mappings): + raise ImpossibleSituation() for identifier in bad_mappings: + if identifier in self.required: + raise ImpossibleSituation() + if identifier in self.mappings: - computed_result.pop(self.mappings[identifier], None) + choices.pop(identifier, None) + + required_mappings: set[str] = set() + + for identifier in self.required: + _mark_mappings(identifier, mapping_deps, required_mappings) + + for identifier in required_mappings: + choices[identifier].required = True + + return choices - return computed_result AnyInfoVar = t.TypeVar( 'AnyInfoVar', @@ -207,10 +259,25 @@ def _choose_newest(infos: t.Iterable[AnyInfoVar]) -> dict[str, AnyInfoVar]: return best_versions def compute_payloads( - resources: t.Iterable[item_infos.ResourceInfo], - mappings: t.Iterable[item_infos.MappingInfo] -) -> ComputedPayloadsDict: + resources: t.Iterable[item_infos.ResourceInfo], + mappings: t.Iterable[item_infos.MappingInfo], + requirements: t.Iterable[MappingRequirement] +) -> ComputedChoices: + reqs_by_identifier = dict((req.identifier, req) for req in requirements) + + filtered_mappings = [] + + for mapping_info in mappings: + req = reqs_by_identifier.get(mapping_info.identifier) + if req is not None and not req.is_fulfilled_by(mapping_info): + continue + + filtered_mappings.append(mapping_info) + best_resources = _choose_newest(resources) - best_mappings = _choose_newest(mappings) + best_mappings = _choose_newest(filtered_mappings) + + required = frozenset(reqs_by_identifier.keys()) - return _ItemsCollection(best_resources, best_mappings).compute_payloads() + return _ComputationData(best_resources, best_mappings, required)\ + .compute_payloads() |