# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Our helpful little stand-in for the Internet
"""

# This file is part of Haketilo. Although I request that you do not make use of
# this code in a proprietary program, I am not going to enforce this in court.

from hashlib import sha256
from pathlib import Path
from shutil import rmtree
from threading import Lock
from uuid import uuid4
import json

from .misc_constants import here
from .unit.utils import * # sample repo data

# TODO: instead of having the entire catalog defined here, make it possible to
# add catalog items from within individual test files.

# Registry of scripts served under https://serve.scrip.ts/, keyed by the hex
# SHA-256 digest of the script text. Every access goes through
# served_scripts_lock.
served_scripts = {}
served_scripts_lock = Lock()

def start_serving_script(script_text):
    """
    Register given script so that it is served at
    https://serve.scrip.ts/?sha256=<hex SHA-256 digest of the script text>

    Returns the URL at which script will be served.

    The registry is modified while holding served_scripts_lock.
    """
    sha256sum = sha256(script_text.encode()).hexdigest()

    # The context manager releases the lock even if the assignment raises.
    with served_scripts_lock:
        served_scripts[sha256sum] = script_text

    return f'https://serve.scrip.ts/?sha256={sha256sum}'

def serve_script(command, get_params, post_params):
    """
    Request handler callback that serves scripts previously registered with
    start_serving_script().

    The script is looked up by the first value of the 'sha256' entry of
    get_params. Returns a (status, headers, body) tuple: 404 with an empty
    bytes body when no script is registered under that digest, otherwise 200
    with the script text as the body.
    """
    with served_scripts_lock:
        script = served_scripts.get(get_params['sha256'][0])

    if script is None:
        return 404, {}, b''

    return 200, {'Content-Type': 'application/javascript'}, script

def dump_scripts(directory='./injected_scripts'):
    """
    Write all scripts that have been registered with start_serving_script()
    under the provided directory. If the directory already exists, it is wiped
    beforehand. If it doesn't exist, it is created.

    Each script is written to a file named after its hex SHA-256 digest.
    """
    directory = Path(directory)
    rmtree(directory, ignore_errors=True)
    directory.mkdir(parents=True)

    # Hold the lock for the whole dump so the registry cannot change while it
    # is being iterated. Using `with` guarantees the lock is released even
    # when writing one of the files fails.
    with served_scripts_lock:
        for digest, script in served_scripts.items():
            with open(directory / digest, 'wt') as file:
                file.write(script)

some_data = '{"some": "data"}'

# used by handler function of https://counterdoma.in
request_counter = 0

def serve_counter(command, get_params, post_params):
    """
    Request handler callback that increments the module-level request_counter
    and returns its new value as JSON, with headers that forbid caching.
    """
    global request_counter

    request_counter += 1

    return (
        200,
        {'Cache-Control': 'private, max-age=0, no-store'},
        json.dumps({'counter': request_counter})
    )

# Mock a Hydrilla repository.

# Mock files in the repository.
sample_contents = [f'Mi povas manĝi vitron, ĝi ne damaĝas min {i}'
                   for i in range(9)]
sample_hashes = [sha256(c.encode()).hexdigest() for c in sample_contents]

def file_url(hashed):
    """Return the mock repository URL of the file with the given hex digest."""
    return f'https://hydril.la/file/sha256-{hashed}'

def file_handler(contents):
    """Return a request handler that always answers 200 with given contents."""
    return lambda c, g, p: (200, {}, contents)

sample_files_catalog = {
    file_url(h): file_handler(c)
    for h, c in zip(sample_hashes, sample_contents)
}

# Mock resources and mappings in the repository.
sample_resource_templates = []

# Resources that each depend on four single-letter resources.
for deps in [(0, 1, 2, 3), (3, 4, 5, 6), (6, 7, 8, 9)]:
    letters = [chr(ord('a') + i) for i in deps]
    sample_resource_templates.append({
        'id_suffix':    ''.join(letters),
        'files_count':  deps[0],
        'dependencies': [f'resource_{l}' for l in letters]
    })

# One resource that depends on all of the resources templated so far.
suffixes = [srt['id_suffix'] for srt in sample_resource_templates]
sample_resource_templates.append({
    'id_suffix':    '-'.join(suffixes),
    'files_count':  2,
    'dependencies': [f'resource_{suffix}' for suffix in suffixes]
})

# Ten dependency-free resources, 'a' through 'j', with 0 through 9 files.
sample_resource_templates.extend(
    {
        'id_suffix':    chr(ord('a') + i),
        'files_count':  i,
        'dependencies': []
    }
    for i in range(10)
)

sample_resources_catalog = {}
sample_mappings_catalog = {}

def make_handler(txt):
    """Return a request handler that always answers 200 with the given text."""
    return lambda c, g, p: (200, {}, txt)

for srt in sample_resource_templates:
    resource_id = f'resource_{srt["id_suffix"]}'
    copyright_refs = []
    script_refs = []

    resource = make_sample_resource()
    resource.update({
        'api_schema_version':  [1],
        'api_schema_revision': 1,
        'identifier':          resource_id,
        'long_name':           resource_id.upper(),
        'uuid':                str(uuid4()),
        'dependencies':        srt['dependencies'],
        'source_copyright':    copyright_refs,
        'scripts':             script_refs
    })

    # Files with even indices become copyright files, odd ones - scripts.
    for i in range(srt['files_count']):
        file_ref = {'file': f'file_{i}', 'sha256': sample_hashes[i]}
        target_key = 'scripts' if i % 2 else 'source_copyright'
        resource[target_key].append(file_ref)

    # Keeping it simple - just make one corresponding mapping for each resource.
    payloads = {'https://example.com/*': {'identifier': resource_id}}
    mapping_id = f'mapping_{srt["id_suffix"]}'

    mapping = make_sample_mapping()
    mapping.update({
        'api_schema_version':  [1],
        'api_schema_revision': 1,
        'identifier':          mapping_id,
        'long_name':           mapping_id.upper(),
        'uuid':                str(uuid4()),
        'source_copyright':    copyright_refs,
        'payloads':            payloads
    })

    for item, item_catalog in (
            (resource, sample_resources_catalog),
            (mapping,  sample_mappings_catalog)
    ):
        fmt = f'https://hydril.la/{item["type"]}/{item["identifier"]}%s.json'

        # Make 2 versions of each item so that we can test updates. Each
        # version is reachable both with and without the version in the URL;
        # the unversioned URL ends up pointing at the later version.
        for _ in range(2):
            serialized = json.dumps(item)
            for fmt_arg in ('', '/' + item_version_string(item)):
                item_catalog[fmt % fmt_arg] = make_handler(serialized)

            item['version'][-1] += 1

def _redirect(location):
    """Return a static catalog entry: a 302 redirect to the given location."""
    return (302, {'location': location}, None)

def _page(file_name):
    """Return a static catalog entry: 200 with a file from data/pages."""
    return (200, {}, here / 'data' / 'pages' / file_name)

catalog = {
    'http://gotmyowndoma.in':
    _redirect('http://gotmyowndoma.in/index.html'),
    'http://gotmyowndoma.in/':
    _redirect('http://gotmyowndoma.in/index.html'),
    'http://gotmyowndoma.in/index.html':
    _page('gotmyowndomain.html'),

    'https://gotmyowndoma.in':
    _redirect('https://gotmyowndoma.in/index.html'),
    'https://gotmyowndoma.in/':
    _redirect('https://gotmyowndoma.in/index.html'),
    'https://gotmyowndoma.in/index.html':
    _page('gotmyowndomain_https.html'),

    'https://gotmyowndoma.in/scripts_to_block_1.html':
    _page('scripts_to_block_1.html'),

    'https://anotherdoma.in/resource/blocked/by/CORS.json':
    lambda command, get_params, post_params: (200, {}, some_data),

    'https://counterdoma.in/': serve_counter,

    'https://serve.scrip.ts/': serve_script,

    'https://site.with.scripts.block.ed':
    _redirect('https://site.with.scripts.block.ed/index.html'),
    'https://site.with.scripts.block.ed/':
    _redirect('https://site.with.scripts.block.ed/index.html'),
    'https://site.with.scripts.block.ed/index.html':
    _page('gotmyowndomain_https.html'),

    'https://site.with.scripts.allow.ed':
    _redirect('https://site.with.scripts.allow.ed/index.html'),
    'https://site.with.scripts.allow.ed/':
    _redirect('https://site.with.scripts.allow.ed/index.html'),
    'https://site.with.scripts.allow.ed/index.html':
    _page('gotmyowndomain_https.html'),

    'https://site.with.paylo.ad':
    _redirect('https://site.with.paylo.ad/index.html'),
    'https://site.with.paylo.ad/':
    _redirect('https://site.with.paylo.ad/index.html'),
    'https://site.with.paylo.ad/index.html':
    _page('gotmyowndomain_https.html'),

    **sample_files_catalog,
    **sample_resources_catalog,
    **sample_mappings_catalog
}