From c242a5833d41fdcee6e2b35cff7af8d445b44946 Mon Sep 17 00:00:00 2001 From: Wojtek Kosior Date: Wed, 17 Aug 2022 21:46:11 +0200 Subject: remove unneeded oversimplified_state_impl.py --- src/hydrilla/proxy/oversimplified_state_impl.py | 392 ------------------------ 1 file changed, 392 deletions(-) delete mode 100644 src/hydrilla/proxy/oversimplified_state_impl.py (limited to 'src') diff --git a/src/hydrilla/proxy/oversimplified_state_impl.py b/src/hydrilla/proxy/oversimplified_state_impl.py deleted file mode 100644 index c082add..0000000 --- a/src/hydrilla/proxy/oversimplified_state_impl.py +++ /dev/null @@ -1,392 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later - -# Haketilo proxy data and configuration (instantiatable HaketiloState subtype). -# -# This file is part of Hydrilla&Haketilo. -# -# Copyright (C) 2022 Wojtek Kosior -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -# -# I, Wojtek Kosior, thereby promise not to sue for violation of this -# file's license. Although I request that you do not make use this code -# in a proprietary program, I am not going to enforce this in court. - -""" -This module contains logic for keeping track of all settings, rules, mappings -and resources. -""" - -# Enable using with Python 3.7. -from __future__ import annotations - -import secrets -import threading -import typing as t -import dataclasses as dc - -from pathlib import Path - -from ..pattern_tree import PatternTree -from .. import url_patterns -from .. import versions -from .. import item_infos -from .simple_dependency_satisfying import ItemsCollection, ComputedPayload -from . import state as st -from . import policies - - -@dc.dataclass(frozen=True, unsafe_hash=True) # type: ignore[misc] -class ConcreteRepoRef(st.RepoRef): - def remove(self, state: st.HaketiloState) -> None: - raise NotImplementedError() - - def update( - self, - state: st.HaketiloState, - *, - name: t.Optional[str] = None, - url: t.Optional[str] = None - ) -> ConcreteRepoRef: - raise NotImplementedError() - - def refresh(self, state: st.HaketiloState) -> ConcreteRepoIterationRef: - raise NotImplementedError() - - -@dc.dataclass(frozen=True, unsafe_hash=True) -class ConcreteRepoIterationRef(st.RepoIterationRef): - pass - - -@dc.dataclass(frozen=True, unsafe_hash=True) -class ConcreteMappingRef(st.MappingRef): - def disable(self, state: st.HaketiloState) -> None: - raise NotImplementedError() - - def forget_enabled(self, state: st.HaketiloState) -> None: - raise NotImplementedError() - - -@dc.dataclass(frozen=True, unsafe_hash=True) -class ConcreteMappingVersionRef(st.MappingVersionRef): - def enable(self, state: st.HaketiloState) -> None: - raise NotImplementedError() - - -@dc.dataclass(frozen=True, unsafe_hash=True) -class ConcreteResourceRef(st.ResourceRef): - pass - - -@dc.dataclass(frozen=True, unsafe_hash=True) -class ConcreteResourceVersionRef(st.ResourceVersionRef): - pass - - -@dc.dataclass(frozen=True, unsafe_hash=True) -class ConcretePayloadRef(st.PayloadRef): - computed_payload: ComputedPayload = dc.field(hash=False, compare=False) - - def get_data(self, state: st.HaketiloState) -> st.PayloadData: - return t.cast(ConcreteHaketiloState, state).payloads_data[self.id] - - def get_mapping(self, state: st.HaketiloState) -> st.MappingVersionRef: - return 'to implement' - - def get_script_paths(self, state: st.HaketiloState) \ - -> t.Iterator[t.Sequence[str]]: - for resource_info in self.computed_payload.resources: - for file_spec in resource_info.scripts: - yield (resource_info.identifier, *file_spec.name.split('/')) - - def get_file_data(self, state: st.HaketiloState, path: t.Sequence[str]) \ - -> t.Optional[st.FileData]: - if len(path) == 0: - raise st.MissingItemError() - - resource_identifier, *file_name_segments = path - - file_name = '/'.join(file_name_segments) - - script_sha256 = '' - - matched_resource_info = False - - for resource_info in self.computed_payload.resources: - if resource_info.identifier == resource_identifier: - matched_resource_info = True - - for script_spec in resource_info.scripts: - if script_spec.name == file_name: - script_sha256 = script_spec.sha256 - - break - - if not matched_resource_info: - raise st.MissingItemError(resource_identifier) - - if script_sha256 == '': - return None - - store_dir_path = t.cast(ConcreteHaketiloState, state).store_dir - files_dir_path = store_dir_path / 'temporary_malcontent' / 'file' - file_path = files_dir_path / 'sha256' / script_sha256 - - return st.FileData( - type = 'application/javascript', - name = file_name, - contents = file_path.read_bytes() - ) - - -# @dc.dataclass(frozen=True, unsafe_hash=True) -# class DummyPayloadRef(ConcretePayloadRef): -# paths = { -# ('someresource', 'somefolder', 'somescript.js'): st.FileData( -# type = 'application/javascript', -# name = 'somefolder/somescript.js', -# contents = b'console.log("hello, mitmproxy")' -# ) -# } - -# def get_data(self, state: st.HaketiloState) -> st.PayloadData: -# parsed_pattern = next(url_patterns.parse_pattern('https://example.com')) - -# return st.PayloadData( -# payload_ref = self, -# mapping_installed = True, -# explicitly_enabled = True, -# unique_token = 'g54v45g456h4r', -# pattern = parsed_pattern, -# eval_allowed = True, -# cors_bypass_allowed = True -# ) - -# def get_mapping(self, state: st.HaketiloState) -> st.MappingVersionRef: -# return ConcreteMappingVersionRef('somemapping') - -# def get_file_paths(self, state: st.HaketiloState) \ -# -> t.Iterable[t.Sequence[str]]: -# return tuple(self.paths.keys()) - -# def get_file_data(self, state: st.HaketiloState, path: t.Sequence[str]) \ -# -> t.Optional[st.FileData]: -# return self.paths[tuple(path)] - - -PolicyTree = PatternTree[policies.PolicyFactory] - -def register_payload( - policy_tree: PolicyTree, - payload_key: st.PayloadKey, - token: str -) -> PolicyTree: - """....""" - payload_policy_factory = policies.PayloadPolicyFactory( - builtin = False, - payload_key = payload_key - ) - - policy_tree = policy_tree.register( - payload_key.pattern, - payload_policy_factory - ) - - resource_policy_factory = policies.PayloadResourcePolicyFactory( - builtin = False, - payload_key = payload_key - ) - - policy_tree = policy_tree.register( - payload_key.pattern.path_append(token, '***'), - resource_policy_factory - ) - - return policy_tree - -DataById = t.Mapping[str, st.PayloadData] - -AnyInfo = t.TypeVar('AnyInfo', item_infos.ResourceInfo, item_infos.MappingInfo) - -@dc.dataclass -class ConcreteHaketiloState(st.HaketiloState): - store_dir: Path - # settings: state.HaketiloGlobalSettings - policy_tree: PolicyTree = PatternTree() - payloads_data: DataById = dc.field(default_factory=dict) - - lock: threading.RLock = dc.field(default_factory=threading.RLock) - - def __post_init__(self) -> None: - def newest_item_path(item_dir: Path) -> t.Optional[Path]: - available_versions = tuple( - versions.parse_normalize_version(ver_path.name) - for ver_path in item_dir.iterdir() - if ver_path.is_file() - ) - - if available_versions == (): - return None - - newest_version = max(available_versions) - - version_path = item_dir / versions.version_string(newest_version) - - assert version_path.is_file() - - return version_path - - def read_items(dir_path: Path, item_class: t.Type[AnyInfo]) \ - -> t.Mapping[str, AnyInfo]: - items: dict[str, AnyInfo] = {} - - for resource_dir in dir_path.iterdir(): - if not resource_dir.is_dir(): - continue - - item_path = newest_item_path(resource_dir) - if item_path is None: - continue - - item = item_class.load(item_path) - - assert versions.version_string(item.version) == item_path.name - assert item.identifier == resource_dir.name - - items[item.identifier] = item - - return items - - malcontent_dir = self.store_dir / 'temporary_malcontent' - - items_collection = ItemsCollection( - read_items(malcontent_dir / 'resource', item_infos.ResourceInfo), - read_items(malcontent_dir / 'mapping', item_infos.MappingInfo) - ) - computed_payloads = items_collection.compute_payloads() - - payloads_data = {} - - for mapping_info, by_pattern in computed_payloads.items(): - for num, (pattern, payload) in enumerate(by_pattern.items()): - payload_id = f'{num}@{mapping_info.identifier}' - - ref = ConcretePayloadRef(payload_id, payload) - - data = st.PayloadData( - payload_ref = ref, - mapping_installed = True, - explicitly_enabled = True, - unique_token = secrets.token_urlsafe(16), - pattern = pattern, - eval_allowed = payload.allows_eval, - cors_bypass_allowed = payload.allows_cors_bypass - ) - - payloads_data[payload_id] = data - - key = st.PayloadKey( - payload_ref = ref, - mapping_identifier = mapping_info.identifier, - pattern = pattern - ) - - self.policy_tree = register_payload( - self.policy_tree, - key, - data.unique_token - ) - - self.payloads_data = payloads_data - - def get_repo(self, repo_id: str) -> st.RepoRef: - return ConcreteRepoRef(repo_id) - - def get_repo_iteration(self, repo_iteration_id: str) -> st.RepoIterationRef: - return ConcreteRepoIterationRef(repo_iteration_id) - - def get_mapping(self, mapping_id: str) -> st.MappingRef: - return ConcreteMappingRef(mapping_id) - - def get_mapping_version(self, mapping_version_id: str) \ - -> st.MappingVersionRef: - return ConcreteMappingVersionRef(mapping_version_id) - - def get_resource(self, resource_id: str) -> st.ResourceRef: - return ConcreteResourceRef(resource_id) - - def get_resource_version(self, resource_version_id: str) \ - -> st.ResourceVersionRef: - return ConcreteResourceVersionRef(resource_version_id) - - def get_payload(self, payload_id: str) -> st.PayloadRef: - return 'not implemented' - - def add_repo(self, name: t.Optional[str], url: t.Optional[str]) \ - -> st.RepoRef: - raise NotImplementedError() - - def get_settings(self) -> st.HaketiloGlobalSettings: - return st.HaketiloGlobalSettings( - mapping_use_mode = st.MappingUseMode.AUTO, - default_allow_scripts = True, - repo_refresh_seconds = 0 - ) - - def update_settings( - self, - *, - mapping_use_mode: t.Optional[st.MappingUseMode] = None, - default_allow_scripts: t.Optional[bool] = None, - repo_refresh_seconds: t.Optional[int] = None - ) -> None: - raise NotImplementedError() - - def select_policy(self, url: url_patterns.ParsedUrl) -> policies.Policy: - """....""" - with self.lock: - policy_tree = self.policy_tree - - try: - best_priority: int = 0 - best_policy: t.Optional[policies.Policy] = None - - for factories_set in policy_tree.search(url): - for stored_factory in sorted(factories_set): - factory = stored_factory.item - - policy = factory.make_policy(self) - - if policy.priority > best_priority: - best_priority = policy.priority - best_policy = policy - except Exception as e: - return policies.ErrorBlockPolicy( - builtin = True, - error = e - ) - - if best_policy is not None: - return best_policy - - if self.get_settings().default_allow_scripts: - return policies.FallbackAllowPolicy() - else: - return policies.FallbackBlockPolicy() - - @staticmethod - def make(store_dir: Path) -> 'ConcreteHaketiloState': - return ConcreteHaketiloState(store_dir=store_dir) -- cgit v1.2.3