aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/proxy/simple_dependency_satisfying.py
diff options
context:
space:
mode:
authorWojtek Kosior <koszko@koszko.org>2022-07-27 15:56:24 +0200
committerWojtek Kosior <koszko@koszko.org>2022-08-10 17:25:05 +0200
commit879c41927171efc8d77d1de2739b18e2eb57580f (patch)
treede0e78afe2ea49e58c9bf2c662657392a00139ee /src/hydrilla/proxy/simple_dependency_satisfying.py
parent52d12a4fa124daa1595529e3e7008276a7986d95 (diff)
downloadhaketilo-hydrilla-879c41927171efc8d77d1de2739b18e2eb57580f.tar.gz
haketilo-hydrilla-879c41927171efc8d77d1de2739b18e2eb57580f.zip
unfinished partial work
Diffstat (limited to 'src/hydrilla/proxy/simple_dependency_satisfying.py')
-rw-r--r--src/hydrilla/proxy/simple_dependency_satisfying.py189
1 files changed, 189 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/simple_dependency_satisfying.py b/src/hydrilla/proxy/simple_dependency_satisfying.py
new file mode 100644
index 0000000..9716fe5
--- /dev/null
+++ b/src/hydrilla/proxy/simple_dependency_satisfying.py
@@ -0,0 +1,189 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Haketilo proxy payloads dependency resolution.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use this code
+# in a proprietary program, I am not going to enforce this in court.
+
+"""
+.....
+"""
+
+# Enable using with Python 3.7.
+from __future__ import annotations
+
+import dataclasses as dc
+import typing as t
+
+from .. import item_infos
+from .. import url_patterns
+
+@dc.dataclass
+class ComputedPayload:
+ resources: list[item_infos.ResourceInfo] = dc.field(default_factory=list)
+
+ allows_eval: bool = False
+ allows_cors_bypass: bool = False
+
+SingleMappingPayloads = t.Mapping[
+ url_patterns.ParsedPattern,
+ ComputedPayload
+]
+
+ComputedPayloadsDict = dict[
+ item_infos.MappingInfo,
+ SingleMappingPayloads
+]
+
+empty_identifiers_set: set[str] = set()
+
+@dc.dataclass(frozen=True)
+class ItemsCollection:
+ resources: t.Mapping[str, item_infos.ResourceInfo]
+ mappings: t.Mapping[str, item_infos.MappingInfo]
+
+ def _satisfy_payload_resource_rec(
+ self,
+ resource_identifier: str,
+ processed_resources: set[str],
+ computed_payload: ComputedPayload
+ ) -> t.Optional[ComputedPayload]:
+ if resource_identifier in processed_resources:
+ # We forbid circular dependencies.
+ return None
+
+ resource_info = self.resources.get(resource_identifier)
+ if resource_info is None:
+ return None
+
+ if resource_info in computed_payload.resources:
+ return computed_payload
+
+ processed_resources.add(resource_identifier)
+
+ if resource_info.allows_eval:
+ computed_payload.allows_eval = True
+
+ if resource_info.allows_cors_bypass:
+ computed_payload.allows_cors_bypass = True
+
+ for dependency_spec in resource_info.dependencies:
+ if self._satisfy_payload_resource_rec(
+ dependency_spec.identifier,
+ processed_resources,
+ computed_payload
+ ) is None:
+ return None
+
+ processed_resources.remove(resource_identifier)
+
+ computed_payload.resources.append(resource_info)
+
+ return computed_payload
+
+ def _satisfy_payload_resource(self, resource_identifier: str) \
+ -> t.Optional[ComputedPayload]:
+ return self._satisfy_payload_resource_rec(
+ resource_identifier,
+ set(),
+ ComputedPayload()
+ )
+
+ def _compute_payloads_no_mapping_requirements(self) -> ComputedPayloadsDict:
+ computed_result: ComputedPayloadsDict = ComputedPayloadsDict()
+
+ for mapping_info in self.mappings.values():
+ by_pattern: dict[url_patterns.ParsedPattern, ComputedPayload] = {}
+
+ failure = False
+
+ for pattern, resource_spec in mapping_info.payloads.items():
+ computed_payload = self._satisfy_payload_resource(
+ resource_spec.identifier
+ )
+ if computed_payload is None:
+ failure = True
+ break
+
+ if mapping_info.allows_eval:
+ computed_payload.allows_eval = True
+
+ if mapping_info.allows_cors_bypass:
+ computed_payload.allows_cors_bypass = True
+
+ by_pattern[pattern] = computed_payload
+
+ if not failure:
+ computed_result[mapping_info] = by_pattern
+
+ return computed_result
+
+ def _mark_mappings_bad(
+ self,
+ identifier: str,
+ reverse_mapping_deps: t.Mapping[str, set[str]],
+ bad_mappings: set[str]
+ ) -> None:
+ if identifier in bad_mappings:
+ return
+
+ bad_mappings.add(identifier)
+
+ for requiring in reverse_mapping_deps.get(identifier, ()):
+ self._mark_mappings_bad(
+ requiring,
+ reverse_mapping_deps,
+ bad_mappings
+ )
+
+ def compute_payloads(self) -> ComputedPayloadsDict:
+ computed_result = self._compute_payloads_no_mapping_requirements()
+
+ reverse_mapping_deps: dict[str, set[str]] = {}
+
+ for mapping_info, by_pattern in computed_result.items():
+ specs_to_resolve = [*mapping_info.required_mappings]
+
+ for computed_payload in by_pattern.values():
+ for resource_info in computed_payload.resources:
+ specs_to_resolve.extend(resource_info.required_mappings)
+
+ for required_mapping_spec in specs_to_resolve:
+ identifier = required_mapping_spec.identifier
+ requiring = reverse_mapping_deps.setdefault(identifier, set())
+ requiring.add(mapping_info.identifier)
+
+ bad_mappings: set[str] = set()
+
+ for required_identifier in reverse_mapping_deps.keys():
+ if self.mappings.get(required_identifier) not in computed_result:
+ self._mark_mappings_bad(
+ required_identifier,
+ reverse_mapping_deps,
+ bad_mappings
+ )
+
+ for identifier in bad_mappings:
+ if identifier in self.mappings:
+ computed_result.pop(self.mappings[identifier], None)
+
+ return computed_result