summaryrefslogtreecommitdiff
path: root/src/hydrilla/proxy/simple_dependency_satisfying.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/hydrilla/proxy/simple_dependency_satisfying.py')
-rw-r--r--src/hydrilla/proxy/simple_dependency_satisfying.py177
1 files changed, 122 insertions, 55 deletions
diff --git a/src/hydrilla/proxy/simple_dependency_satisfying.py b/src/hydrilla/proxy/simple_dependency_satisfying.py
index 889ae98..f1371db 100644
--- a/src/hydrilla/proxy/simple_dependency_satisfying.py
+++ b/src/hydrilla/proxy/simple_dependency_satisfying.py
@@ -34,9 +34,40 @@ from __future__ import annotations
import dataclasses as dc
import typing as t
+from ..exceptions import HaketiloException
from .. import item_infos
from .. import url_patterns
+
+class ImpossibleSituation(HaketiloException):
+ pass
+
+
+@dc.dataclass(frozen=True)
+class MappingRequirement:
+ identifier: str
+
+ def is_fulfilled_by(self, info: item_infos.MappingInfo) -> bool:
+ return True
+
+@dc.dataclass(frozen=True)
+class MappingRepoRequirement(MappingRequirement):
+ repo: str
+
+ def is_fulfilled_by(self, info: item_infos.MappingInfo) -> bool:
+ return info.repo == self.repo
+
+@dc.dataclass(frozen=True)
+class MappingVersionRequirement(MappingRequirement):
+ version_info: item_infos.MappingInfo
+
+ def __post_init__(self):
+ assert self.version_info.identifier == self.identifier
+
+ def is_fulfilled_by(self, info: item_infos.MappingInfo) -> bool:
+ return info == self.version_info
+
+
@dc.dataclass
class ComputedPayload:
resources: list[item_infos.ResourceInfo] = dc.field(default_factory=list)
@@ -44,22 +75,40 @@ class ComputedPayload:
allows_eval: bool = False
allows_cors_bypass: bool = False
-SingleMappingPayloads = t.Mapping[
- url_patterns.ParsedPattern,
- ComputedPayload
-]
+@dc.dataclass
+class MappingChoice:
+ info: item_infos.MappingInfo
+ required: bool = False
+ payloads: dict[url_patterns.ParsedPattern, ComputedPayload] = \
+ dc.field(default_factory=dict)
+
-ComputedPayloadsDict = dict[
- item_infos.MappingInfo,
- SingleMappingPayloads
+MappingsGraph = t.Union[
+ t.Mapping[str, set[str]],
+ t.Mapping[str, frozenset[str]]
]
-empty_identifiers_set: set[str] = set()
+def _mark_mappings(
+ identifier: str,
+ mappings_graph: MappingsGraph,
+ marked_mappings: set[str]
+) -> None:
+ if identifier in marked_mappings:
+ return
+
+ marked_mappings.add(identifier)
+
+ for next_mapping in mappings_graph.get(identifier, ()):
+ _mark_mappings(next_mapping, mappings_graph, marked_mappings)
+
+
+ComputedChoices = dict[str, MappingChoice]
@dc.dataclass(frozen=True)
-class _ItemsCollection:
+class _ComputationData:
resources: t.Mapping[str, item_infos.ResourceInfo]
mappings: t.Mapping[str, item_infos.MappingInfo]
+ required: frozenset[str]
def _satisfy_payload_resource_rec(
self,
@@ -108,11 +157,11 @@ class _ItemsCollection:
ComputedPayload()
)
- def _compute_payloads_no_mapping_requirements(self) -> ComputedPayloadsDict:
- computed_result: ComputedPayloadsDict = ComputedPayloadsDict()
+ def _compute_payloads_no_mapping_requirements(self) -> ComputedChoices:
+ computed_result: ComputedChoices = ComputedChoices()
for mapping_info in self.mappings.values():
- by_pattern: dict[url_patterns.ParsedPattern, ComputedPayload] = {}
+ mapping_choice = MappingChoice(mapping_info)
failure = False
@@ -130,63 +179,66 @@ class _ItemsCollection:
if mapping_info.allows_cors_bypass:
computed_payload.allows_cors_bypass = True
- by_pattern[pattern] = computed_payload
+ mapping_choice.payloads[pattern] = computed_payload
if not failure:
- computed_result[mapping_info] = by_pattern
+ computed_result[mapping_info.identifier] = mapping_choice
return computed_result
- def _mark_mappings_bad(
- self,
- identifier: str,
- reverse_mapping_deps: t.Mapping[str, set[str]],
- bad_mappings: set[str]
- ) -> None:
- if identifier in bad_mappings:
- return
+ def _compute_inter_mapping_deps(self, choices: ComputedChoices) \
+ -> dict[str, frozenset[str]]:
+ mapping_deps: dict[str, frozenset[str]] = {}
- bad_mappings.add(identifier)
+ for mapping_choice in choices.values():
+ specs_to_resolve = [*mapping_choice.info.required_mappings]
- for requiring in reverse_mapping_deps.get(identifier, ()):
- self._mark_mappings_bad(
- requiring,
- reverse_mapping_deps,
- bad_mappings
- )
+ for computed_payload in mapping_choice.payloads.values():
+ for resource_info in computed_payload.resources:
+ specs_to_resolve.extend(resource_info.required_mappings)
- def compute_payloads(self) -> ComputedPayloadsDict:
- computed_result = self._compute_payloads_no_mapping_requirements()
+ depended = frozenset(spec.identifier for spec in specs_to_resolve)
+ mapping_deps[mapping_choice.info.identifier] = depended
- reverse_mapping_deps: dict[str, set[str]] = {}
+ return mapping_deps
- for mapping_info, by_pattern in computed_result.items():
- specs_to_resolve = [*mapping_info.required_mappings]
+ def compute_payloads(self) -> ComputedChoices:
+ choices = self._compute_payloads_no_mapping_requirements()
- for computed_payload in by_pattern.values():
- for resource_info in computed_payload.resources:
- specs_to_resolve.extend(resource_info.required_mappings)
+ mapping_deps = self._compute_inter_mapping_deps(choices)
+
+ reverse_deps: dict[str, set[str]] = {}
- for required_mapping_spec in specs_to_resolve:
- identifier = required_mapping_spec.identifier
- requiring = reverse_mapping_deps.setdefault(identifier, set())
- requiring.add(mapping_info.identifier)
+ for depending, depended_set in mapping_deps.items():
+ for depended in depended_set:
+ reverse_deps.setdefault(depended, set()).add(depending)
bad_mappings: set[str] = set()
- for required_identifier in reverse_mapping_deps.keys():
- if self.mappings.get(required_identifier) not in computed_result:
- self._mark_mappings_bad(
- required_identifier,
- reverse_mapping_deps,
- bad_mappings
- )
+ for depended_identifier in reverse_deps.keys():
+ if self.mappings.get(depended_identifier) not in choices:
+ _mark_mappings(depended_identifier, reverse_deps, bad_mappings)
+
+ if any(identifier in self.required for identifier in bad_mappings):
+ raise ImpossibleSituation()
for identifier in bad_mappings:
+ if identifier in self.required:
+ raise ImpossibleSituation()
+
if identifier in self.mappings:
- computed_result.pop(self.mappings[identifier], None)
+ choices.pop(identifier, None)
+
+ required_mappings: set[str] = set()
+
+ for identifier in self.required:
+ _mark_mappings(identifier, mapping_deps, required_mappings)
+
+ for identifier in required_mappings:
+ choices[identifier].required = True
+
+ return choices
- return computed_result
AnyInfoVar = t.TypeVar(
'AnyInfoVar',
@@ -207,10 +259,25 @@ def _choose_newest(infos: t.Iterable[AnyInfoVar]) -> dict[str, AnyInfoVar]:
return best_versions
def compute_payloads(
- resources: t.Iterable[item_infos.ResourceInfo],
- mappings: t.Iterable[item_infos.MappingInfo]
-) -> ComputedPayloadsDict:
+ resources: t.Iterable[item_infos.ResourceInfo],
+ mappings: t.Iterable[item_infos.MappingInfo],
+ requirements: t.Iterable[MappingRequirement]
+) -> ComputedChoices:
+ reqs_by_identifier = dict((req.identifier, req) for req in requirements)
+
+ filtered_mappings = []
+
+ for mapping_info in mappings:
+ req = reqs_by_identifier.get(mapping_info.identifier)
+ if req is not None and not req.is_fulfilled_by(mapping_info):
+ continue
+
+ filtered_mappings.append(mapping_info)
+
best_resources = _choose_newest(resources)
- best_mappings = _choose_newest(mappings)
+ best_mappings = _choose_newest(filtered_mappings)
+
+ required = frozenset(reqs_by_identifier.keys())
- return _ItemsCollection(best_resources, best_mappings).compute_payloads()
+ return _ComputationData(best_resources, best_mappings, required)\
+ .compute_payloads()