diff options
Diffstat (limited to 'test/haketilo_test')
40 files changed, 6662 insertions, 0 deletions
diff --git a/test/haketilo_test/__init__.py b/test/haketilo_test/__init__.py new file mode 100644 index 0000000..2b351bb --- /dev/null +++ b/test/haketilo_test/__init__.py @@ -0,0 +1,2 @@ +# SPDX-License-Identifier: CC0-1.0 +# Copyright (C) 2021 Wojtek Kosior diff --git a/test/haketilo_test/__main__.py b/test/haketilo_test/__main__.py new file mode 100644 index 0000000..7afda55 --- /dev/null +++ b/test/haketilo_test/__main__.py @@ -0,0 +1,83 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later + +""" +Run a Firefox-type browser with WebDriver attached and Python console open +""" + +# This file is part of Haketilo. +# +# Copyright (C) 2021 jahoti <jahoti@tilde.team> +# Copyright (C) 2021 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use of this code +# in a proprietary program, I am not going to enforce this in court. + +import sys +import time +import code +from rlcompleter import Completer +import readline + +from .server import do_an_internet +from .misc_constants import * +from .profiles import firefox_safe_mode +from .extension_crafting import get_extension_base_url + +def fail(msg, error_code): + print('Error:', msg) + print('Usage:', sys.argv[0], '[--load-haketilo]', '[certificates_directory] [proxy_port]') + sys.exit(error_code) + +load_haketilo = False +argv_idx = 1 +if len(sys.argv) > argv_idx and sys.argv[argv_idx] == '--load-haketilo': + load_haketilo = True + argv_idx += 1 + +certdir = Path(sys.argv[argv_idx]).resolve() if len(sys.argv) > argv_idx \ + else default_cert_dir + +if not certdir.is_dir(): + fail('selected certificate directory does not exist.', 2) + +argv_idx += 1 + +port = sys.argv[argv_idx] if len(sys.argv) > argv_idx \ + else str(default_proxy_port) + +if not port.isnumeric(): + fail('port must be an integer.', 3) + +httpd = do_an_internet(certdir, int(port)) +driver = firefox_safe_mode(proxy_port=int(port)) + +if load_haketilo: + driver.install_addon(str(here.parent / 'mozilla-build.zip'), temporary=True) + driver.get(get_extension_base_url(driver) + 'html/settings.html') + +print("You can now control the browser through 'driver' object") + +# Here we enable readline-enhanced editing: +# https://stackoverflow.com/questions/35115208/is-there-any-way-to-combine-readline-rlcompleter-and-interactiveconsole-in-pytho#answer-35116399 +readline.parse_and_bind('tab: complete'); +console_locals = globals() +readline.set_completer(Completer(console_locals).complete) +code.InteractiveConsole(locals=globals()).interact() + +driver.quit() +httpd.shutdown() diff --git a/test/haketilo_test/conftest.py b/test/haketilo_test/conftest.py new file mode 100644 index 0000000..9103ac8 --- /dev/null +++ b/test/haketilo_test/conftest.py @@ -0,0 +1,187 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +Common fixtures for Haketilo unit tests +""" + +# This file is part of Haketilo. +# +# Copyright (C) 2021 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this file's +# license. Although I request that you do not make use of this code in a +# proprietary program, I am not going to enforce this in court. + +import pytest +from pathlib import Path +from tempfile import TemporaryDirectory +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC + +from .profiles import firefox_safe_mode +from .server import do_an_internet +from .extension_crafting import make_extension +from .world_wide_library import start_serving_script, dump_scripts +from .misc_constants import proj_root + +@pytest.fixture(scope="session") +def proxy(): + httpd = do_an_internet() + yield httpd + httpd.shutdown() + +@pytest.fixture(scope="session") +def _driver(proxy): + with firefox_safe_mode() as driver: + yield driver + driver.quit() + +def close_all_but_one_window(driver): + while len(driver.window_handles) > 1: + driver.switch_to.window(driver.window_handles[-1]) + driver.close() + driver.switch_to.window(driver.window_handles[0]) + +@pytest.fixture() +def driver(_driver, request): + nav_target = request.node.get_closest_marker('get_page') + nav_target = nav_target.args[0] if nav_target else 'about:blank' + + second_driver = request.node.get_closest_marker('second_driver') + + if second_driver: + with firefox_safe_mode() as _driver: + _driver.get(nav_target) + yield _driver + _driver.quit() + else: + close_all_but_one_window(_driver) + _driver.get(nav_target) + _driver.implicitly_wait(0) + yield _driver + +@pytest.fixture() +def webextension(driver, request): + ext_data = request.node.get_closest_marker('ext_data') + if ext_data is None: + raise Exception('"webextension" fixture requires "ext_data" marker to be set') + ext_data = ext_data.args[0].copy() + + navigate_to = ext_data.get('navigate_to') + if navigate_to is not None: + del ext_data['navigate_to'] + + driver.get('https://gotmyowndoma.in/') + ext_path = make_extension(Path(driver.firefox_profile.path), **ext_data) + addon_id = driver.install_addon(str(ext_path), temporary=True) + get_url = lambda d: d.execute_script('return window.ext_page_url;') + ext_page_url = WebDriverWait(driver, 10).until(get_url) + driver.get(ext_page_url) + + if navigate_to is not None: + driver.get(driver.current_url.replace('testpage.html', navigate_to)) + + yield + + # Unloading an extension might cause its windows to vanish. Make sure + # there's at least one window navigated to some other page before + # uninstalling the addon. Otherwise, we could be left with a windowless + # browser :c + driver.switch_to.window(driver.window_handles[-1]) + driver.get('https://gotmyowndoma.in/') + driver.uninstall_addon(addon_id) + ext_path.unlink() + +@pytest.fixture() +def haketilo(driver): + addon_id = driver.install_addon(str(Path.cwd() / 'mozilla-build.zip'), + temporary=True) + + yield + + driver.uninstall_addon(addon_id) + +script_injector_script = '''\ +/* + * Selenium by default executes scripts in some weird one-time context. We want + * separately-loaded scripts to be able to access global variables defined + * before, including those declared with `const` or `let`. To achieve that, we + * run our scripts by injecting them into the page with a <script> tag that runs + * javascript served by our proxy. We use custom properties of the `window` + * object to communicate with injected code. + */ +const inject = async () => { + delete window.haketilo_selenium_return_value; + delete window.haketilo_selenium_exception; + window.returnval = val => window.haketilo_selenium_return_value = val; + + const injectee = document.createElement('script'); + injectee.src = arguments[0]; + injectee.type = "application/javascript"; + injectee.async = true; + const prom = new Promise(cb => injectee.onload = cb); + + window.arguments = arguments[1]; + document.body.append(injectee); + + await prom; + + /* + * To ease debugging, we want this script to signal all exceptions from the + * injectee. + */ + if (window.haketilo_selenium_exception !== false) + throw ['haketilo_selenium_error', + 'Error in injected script! Check your geckodriver.log and ./injected_scripts/!']; + + return window.haketilo_selenium_return_value; +} +return inject(); +''' + +def _execute_in_page_context(driver, script, args): + script = script + '\n;\nwindow.haketilo_selenium_exception = false;' + script_url = start_serving_script(script) + + try: + result = driver.execute_script(script_injector_script, script_url, args) + if type(result) is list and len(result) == 2 and \ + result[0] == 'haketilo_selenium_error': + raise Exception(result[1]) + return result + except Exception as e: + dump_scripts() + raise e from None + +# Some fixtures here just define functions that operate on driver. We should +# consider making them into webdriver wrapper class methods. + +@pytest.fixture() +def execute_in_page(driver): + def do_execute(script, *args): + return _execute_in_page_context(driver, script, args) + + yield do_execute + +@pytest.fixture() +def wait_elem_text(driver): + def do_wait(id, text): + WebDriverWait(driver, 10).until( + EC.text_to_be_present_in_element((By.ID, id), text) + ) + + yield do_wait diff --git a/test/haketilo_test/data/pages/gotmyowndomain.html b/test/haketilo_test/data/pages/gotmyowndomain.html new file mode 100644 index 0000000..390cbcc --- /dev/null +++ b/test/haketilo_test/data/pages/gotmyowndomain.html @@ -0,0 +1,35 @@ +<!DOCTYPE html> +<!-- + SPDX-License-Identifier: AGPL-3.0-or-later + + Sample testing page + + This file is part of Haketilo. + + Copyright (C) 2021 jahoti <jahoti@tilde.team> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + --> +<html> + <head> + <meta name=charset value="latin1"> + <title>Schrodinger's Document</title> + </head> + <body> + A nice, simple page for testing. + <script> + document.write('<p><b>Or so you thought...</b></p>'); + </script> + </body> +</html> diff --git a/test/haketilo_test/data/pages/gotmyowndomain_https.html b/test/haketilo_test/data/pages/gotmyowndomain_https.html new file mode 100644 index 0000000..f602950 --- /dev/null +++ b/test/haketilo_test/data/pages/gotmyowndomain_https.html @@ -0,0 +1,35 @@ +<!DOCTYPE html> +<!-- + SPDX-License-Identifier: AGPL-3.0-or-later + + Sample testing page to serve over HTTPS + + This file is part of Haketilo. + + Copyright (C) 2021 jahoti <jahoti@tilde.team> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + --> +<html> + <head> + <meta name="charset" value="latin1"> + <title>Schrodinger's Document</title> + </head> + <body> + A nice, simple page for testing (using HTTPS). + <script> + document.write('<p><b>Or so you thought...</b></p>'); + </script> + </body> +</html> diff --git a/test/haketilo_test/data/pages/scripts_to_block_1.html b/test/haketilo_test/data/pages/scripts_to_block_1.html new file mode 100644 index 0000000..164979d --- /dev/null +++ b/test/haketilo_test/data/pages/scripts_to_block_1.html @@ -0,0 +1,45 @@ +<!DOCTYPE html> +<!-- + SPDX-License-Identifier: CC0-1.0 + + A testing page with various scripts that need to get blocked. + + This file is part of Haketilo. + + Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> + + This program is free software: you can redistribute it and/or modify + it under the terms of the CC0 1.0 Universal License as published by + the Creative Commons Corporation. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + CC0 1.0 Universal License for more details. + --> +<html> + <head> + <meta name="charset" value="latin1"> + <script> + window.__run = [...(window.__run || []), 'inline']; + </script> + <!-- the one below shall not execute even when blocking is off... --> + <script type="application/json"> + window.__run = [...(window.__run || []), 'json']; + </script> + </head> + <body> + <button id="clickme1" + onclick="window.__run = [...(window.__run || []), 'on'];"> + Click Meee! + </button> + <a id="clickme2" + href="javascript:window.__run = [...(window.__run || []), 'href'];void(0);"> + Click Meee! + </a> + <iframe src="javascript:void(window.parent.__run = [...(window.parent.__run || []), 'src']);"> + </iframe> + <object data="javascript:window.__run = [...(window.__run || []), 'data'];"> + </object> + </body> +</html> diff --git a/test/haketilo_test/default_profiles/icecat_empty/extensions.json b/test/haketilo_test/default_profiles/icecat_empty/extensions.json new file mode 100644 index 0000000..5f74ff3 --- /dev/null +++ b/test/haketilo_test/default_profiles/icecat_empty/extensions.json @@ -0,0 +1 @@ +{"schemaVersion":25,"addons":[{"id":"jid1-KtlZuoiikVfFew@jetpack","location":"app-global","userDisabled":true,"path":"/usr/lib/icecat/browser/extensions/jid1-KtlZuoiikVfFew@jetpack"},{"id":"uBlock0@raymondhill.net","location":"app-global","userDisabled":true,"path":"/usr/lib/icecat/browser/extensions/uBlock0@raymondhill.net.xpi"},{"id":"SubmitMe@0xbeef.coffee","location":"app-global","userDisabled":true,"path":"/usr/lib/icecat/browser/extensions/SubmitMe@0xbeef.coffee"},{"id":"FreeUSPS@0xbeef.coffee","location":"app-global","userDisabled":true,"path":"/usr/lib/icecat/browser/extensions/FreeUSPS@0xbeef.coffee"},{"id":"tortm-browser-button@jeremybenthum","location":"app-global","userDisabled":true,"path":"/usr/lib/icecat/browser/extensions/tortm-browser-button@jeremybenthum"},{"id":"tprb.addon@searxes.danwin1210.me","location":"app-global","userDisabled":true,"path":"/usr/lib/icecat/browser/extensions/tprb.addon@searxes.danwin1210.me"},{"id":"SimpleSumOfUs@0xbeef.coffee","location":"app-global","userDisabled":true,"path":"/usr/lib/icecat/browser/extensions/SimpleSumOfUs@0xbeef.coffee"}]}
\ No newline at end of file diff --git a/test/haketilo_test/extension_crafting.py b/test/haketilo_test/extension_crafting.py new file mode 100644 index 0000000..97f5027 --- /dev/null +++ b/test/haketilo_test/extension_crafting.py @@ -0,0 +1,215 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +Making temporary WebExtensions for use in the test suite +""" + +# This file is part of Haketilo. +# +# Copyright (C) 2021, 2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this file's +# license. Although I request that you do not make use of this code in a +# proprietary program, I am not going to enforce this in court. + +import json +import zipfile +import re +import shutil +import subprocess + +from pathlib import Path +from uuid import uuid4 +from tempfile import TemporaryDirectory + +from selenium.webdriver.support.ui import WebDriverWait +from selenium.common.exceptions import NoSuchElementException + +from .misc_constants import * + +class ManifestTemplateValueToFill: + pass + +def manifest_template(): + return { + 'manifest_version': 2, + 'name': 'Haketilo test extension', + 'version': '1.0', + 'applications': { + 'gecko': { + 'id': ManifestTemplateValueToFill(), + 'strict_min_version': '60.0' + } + }, + 'permissions': [ + 'contextMenus', + 'webRequest', + 'webRequestBlocking', + 'activeTab', + 'notifications', + 'sessions', + 'storage', + 'tabs', + '<all_urls>', + 'unlimitedStorage' + ], + 'content_security_policy': "object-src 'none'; script-src 'self' https://serve.scrip.ts;", + 'web_accessible_resources': ['testpage.html'], + 'options_ui': { + 'page': 'testpage.html', + 'open_in_tab': True + }, + 'background': { + 'persistent': True, + 'scripts': ['__open_test_page.js', 'background.js'] + }, + 'content_scripts': [ + { + 'run_at': 'document_start', + 'matches': ['<all_urls>'], + 'match_about_blank': True, + 'all_frames': True, + 'js': ['content.js'] + } + ] + } + +class ExtraHTML: + def __init__(self, html_path, append={}, wrap_into_htmldoc=True): + self.html_path = html_path + self.append = append + self.wrap_into_htmldoc = wrap_into_htmldoc + + def add_to_xpi(self, xpi, tmpdir=None): + if tmpdir is None: + with TemporaryDirectory() as tmpdir: + return self.add_to_xpi(xpi, tmpdir) + + append_flags = [] + for filename, code in self.append.items(): + append_flags.extend(['-A', f'{filename}:{code}']) + + awk = subprocess.run( + ['awk', '-f', awk_script_name, '--', *unit_test_defines, + *append_flags, '-H', self.html_path, '--write-js-deps', + '--output=files-to-copy', f'--output-dir={tmpdir}'], + stdout=subprocess.PIPE, cwd=proj_root, check=True + ) + + for path in filter(None, awk.stdout.decode().split('\n')): + xpi.write(proj_root / path, path) + + tmpdir = Path(tmpdir) + for path in tmpdir.rglob('*'): + relpath = str(path.relative_to(tmpdir)) + if not path.is_dir() and relpath != self.html_path: + xpi.write(path, relpath) + + with open(tmpdir / self.html_path, 'rt') as html_file: + html = html_file.read() + if self.wrap_into_htmldoc: + html = f'<!DOCTYPE html><html><body>{html}</body></html>' + xpi.writestr(self.html_path, html) + +default_background_script = '' +default_content_script = '' +default_test_page = ''' +<!DOCTYPE html> +<html> + <head> + <title>Extension's options page for testing</title> + </head> + <body> + <h1>Extension's options page for testing</h1> + </body> +</html> +''' + +open_test_page_script = '''(() => { +const page_url = browser.runtime.getURL("testpage.html"); +const execute_details = { + code: `window.wrappedJSObject.ext_page_url=${JSON.stringify(page_url)};` +}; +browser.tabs.query({currentWindow: true, active: true}) + .then(t => browser.tabs.executeScript(t.id, execute_details)); +})();''' + +def make_extension(destination_dir, + background_script=default_background_script, + content_script=default_content_script, + test_page=default_test_page, + extra_files={}, extra_html=[]): + if not hasattr(extra_html, '__iter__'): + extra_html = [extra_html] + manifest = manifest_template() + extension_id = '{%s}' % uuid4() + manifest['applications']['gecko']['id'] = extension_id + files = { + 'manifest.json' : json.dumps(manifest), + '__open_test_page.js': open_test_page_script, + 'background.js' : background_script, + 'content.js' : content_script, + 'testpage.html' : test_page, + **extra_files + } + destination_path = destination_dir / f'{extension_id}.xpi' + with zipfile.ZipFile(destination_path, 'x') as xpi: + for filename, contents in files.items(): + if hasattr(contents, '__call__'): + contents = contents() + xpi.writestr(filename, contents) + for html in extra_html: + html.add_to_xpi(xpi) + + return destination_path + +extract_base_url_re = re.compile(r'^(.*)manifest.json$') + +def get_extension_base_url(driver): + """ + Extension's internall UUID is not directly exposed in Selenium. Instead, we + can navigate to about:debugging and inspect the manifest URL present there + to get the base url like: + moz-extension://b225c78f-d108-4caa-8406-f38b37d8dee5/ + which can then be used to navigate to extension-bundled pages. + """ + # For newer Firefoxes + driver.get('about:debugging#/runtime/this-firefox') + + def get_manifest_link_newer_ff(driver): + try: + return driver.find_element_by_class_name('qa-manifest-url') + except NoSuchElementException: + pass + + try: + details = driver.find_element_by_class_name('error-page-details') + except NoSuchElementException: + return False + + if '#/runtime/this-firefox' in details.text: + return "not_newer_ff" + + manifest_link = WebDriverWait(driver, 10).until(get_manifest_link_newer_ff) + + if manifest_link == "not_newer_ff": + driver.get("about:debugging#addons") + driver.implicitly_wait(10) + manifest_link = driver.find_element_by_class_name('manifest-url') + driver.implicitly_wait(0) + + manifest_url = manifest_link.get_attribute('href') + return extract_base_url_re.match(manifest_url).group(1) diff --git a/test/haketilo_test/misc_constants.py b/test/haketilo_test/misc_constants.py new file mode 100644 index 0000000..9cac9dc --- /dev/null +++ b/test/haketilo_test/misc_constants.py @@ -0,0 +1,82 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later + +""" +Miscellaneous data that were found useful +""" + +# This file is part of Haketilo. +# +# Copyright (C) 2021 jahoti <jahoti@tilde.team> +# Copyright (C) 2021 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use of this code +# in a proprietary program, I am not going to enforce this in court. + +import re +from pathlib import Path + +here = Path(__file__).resolve().parent +proj_root = here.parent.parent +awk_script_name = 'compute_scripts.awk' + +unit_test_defines = ['-D', 'MOZILLA', '-D', 'MV2', '-D', 'TEST', + '-D', 'UNIT_TEST', '-D', 'DEBUG'] + +conf_line_regex = re.compile(r'^([^=]+) = (.*)$') +conf_settings = {} +with open(Path.cwd() / 'record.conf', 'rt') as conf: + for line in conf.readlines(): + match = conf_line_regex.match(line) + if match: + conf_settings[match.group(1).strip()] = match.group(2) + +default_proxy_host = '127.0.0.1' +default_proxy_port = 1337 + +default_cert_dir = proj_root / 'test' / 'certs' + +default_extension_uuid = 'a1291446-be95-48ad-a4c6-a475e389399b' +default_haketilo_id = '{6fe13369-88e9-440f-b837-5012fb3bedec}' + +mime_types = { + "7z": "application/x-7z-compressed", "oga": "audio/ogg", + "abw": "application/x-abiword", "ogv": "video/ogg", + "arc": "application/x-freearc", "ogx": "application/ogg", + "bin": "application/octet-stream", "opus": "audio/opus", + "bz": "application/x-bzip", "otf": "font/otf", + "bz2": "application/x-bzip2", "pdf": "application/pdf", + "css": "text/css", "png": "image/png", + "csv": "text/csv", "sh": "application/x-sh", + "gif": "image/gif", "svg": "image/svg+xml", + "gz": "application/gzip", "tar": "application/x-tar", + "htm": "text/html", "ts": "video/mp2t", + "html": "text/html", "ttf": "font/ttf", + "ico": "image/vnd.microsoft.icon", "txt": "text/plain", + "js": "text/javascript", "wav": "audio/wav", + "jpeg": "image/jpeg", "weba": "audio/webm", + "jpg": "image/jpeg", "webm": "video/webm", + "json": "application/json", "woff": "font/woff", + "mjs": "text/javascript", "woff2": "font/woff2", + "mp3": "audio/mpeg", "xhtml": "application/xhtml+xml", + "mp4": "video/mp4", "zip": "application/zip", + "mpeg": "video/mpeg", + "odp": "application/vnd.oasis.opendocument.presentation", + "ods": "application/vnd.oasis.opendocument.spreadsheet", + "odt": "application/vnd.oasis.opendocument.text", + "xml": "application/xml" # text/xml if readable from casual users +} diff --git a/test/haketilo_test/profiles.py b/test/haketilo_test/profiles.py new file mode 100755 index 0000000..ae997fc --- /dev/null +++ b/test/haketilo_test/profiles.py @@ -0,0 +1,116 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +Browser profiles and Selenium driver initialization +""" + +# This file is part of Haketilo. +# +# Copyright (C) 2021 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this file's +# license. Although I request that you do not make use of this code in a +# proprietary program, I am not going to enforce this in court. + +from selenium import webdriver +from selenium.webdriver.firefox.options import Options +import json +from shutil import rmtree + +from .misc_constants import * + +class HaketiloFirefox(webdriver.Firefox): + """ + This wrapper class around selenium.webdriver.Firefox facilitates removing + the temporary profile directory after Firefox quits. + """ + def quit(self, *args, **kwargs): + profile_path = self.firefox_profile.path + super().quit(*args, **kwargs) + rmtree(profile_path, ignore_errors=True) + +def set_profile_proxy(profile, proxy_host, proxy_port): + """ + Create a Firefox profile that uses the specified HTTP proxy for all + protocols. + """ + # proxy type 1 designates "manual" + profile.set_preference('network.proxy.type', 1) + profile.set_preference('network.proxy.no_proxies_on', '') + profile.set_preference('network.proxy.share_proxy_settings', True) + + for proto in ['http', 'ftp', 'socks', 'ssl']: + profile.set_preference(f'network.proxy.{proto}', proxy_host) + profile.set_preference(f'network.proxy.{proto}_port', proxy_port) + profile.set_preference(f'network.proxy.backup.{proto}', '') + profile.set_preference(f'network.proxy.backup.{proto}_port', 0) + +def set_profile_csp_enabled(profile): + """ + By default, Firefox Driver disables CSP. The extension we're testing uses + CSP extensively, so we use this function to prepare a Firefox profile that + has it enabled. + """ + profile.set_preference('security.csp.enable', True) + +# The function below seems not to work for extensions that are +# temporarily-installed in Firefox safe mode. Testing is needed to see if it +# works with non-temporary extensions (without safe mode). +def set_webextension_uuid(profile, extension_id, uuid=default_extension_uuid): + """ + Firefox would normally assign a unique, random UUID to installed extension. + This UUID is needed to easily navigate to extension's settings page (and + other extension's pages). Since there's no way to learn such UUID with + current WebDriver implementation, this function works around this by telling + Firefox to use a predefined UUID for a certain extension. + """ + profile.set_preference('extensions.webextensions.uuids', + json.dumps({extension_id: uuid})) + +def firefox_safe_mode(firefox_binary=conf_settings['BROWSER_BINARY'], + proxy_host=default_proxy_host, + proxy_port=default_proxy_port): + """ + Initialize a Firefox instance controlled by selenium. The instance is + started in safe mode. + """ + profile = webdriver.FirefoxProfile() + set_profile_proxy(profile, proxy_host, proxy_port) + set_profile_csp_enabled(profile) + + options = Options() + options.add_argument('--safe-mode') + + return HaketiloFirefox(options=options, firefox_profile=profile, + firefox_binary=firefox_binary) + +def firefox_with_profile(firefox_binary=conf_settings['BROWSER_BINARY'], + profile_dir=conf_settings['CLEAN_PROFILE'], + proxy_host=default_proxy_host, + proxy_port=default_proxy_port): + """ + Initialize a Firefox instance controlled by selenium. The instance is + started using an empty profile (either the default one or the one passed to + `configure` script). The empty profile is meant to make Firefox start with + globally-installed extensions disabled. + """ + profile = webdriver.FirefoxProfile(profile_dir) + set_profile_proxy(profile, proxy_host, proxy_port) + set_profile_csp_enabled(profile) + set_webextension_uuid(profile, default_haketilo_id) + + return HaketiloFirefox(firefox_profile=profile, + firefox_binary=firefox_binary) diff --git a/test/haketilo_test/proxy_core.py b/test/haketilo_test/proxy_core.py new file mode 100644 index 0000000..f6bb820 --- /dev/null +++ b/test/haketilo_test/proxy_core.py @@ -0,0 +1,141 @@ +# SPDX-License-Identifier: BSD-3-Clause + +""" +The core for a "virtual network" proxy. +""" + +# This file is part of Haketilo. +# +# Copyright (c) 2015, inaz2 +# Copyright (C) 2021 jahoti <jahoti@tilde.team> +# Copyright (C) 2021 Wojtek Kosior <koszko@koszko.org> +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of proxy2 nor the names of its contributors may be used to +# endorse or promote products derived from this software without specific +# prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this file's +# license. Although I request that you do not make use of this code in a way +# incompliant with the license, I am not going to enforce this in court. + +from pathlib import Path +import socket, ssl, subprocess, sys, threading +from http.server import HTTPServer, BaseHTTPRequestHandler +from socketserver import ThreadingMixIn + +lock = threading.Lock() + +class ProxyRequestHandler(BaseHTTPRequestHandler): + """ + Handles a network request made to the proxy. Configures SSL encryption when + needed. + """ + def __init__(self, *args, **kwargs): + """ + Initialize self. Uses the same arguments as + http.server.BaseHTTPRequestHandler's constructor but also expect a + `certdir` keyword argument with appropriate path. + """ + self.certdir = Path(kwargs.pop('certdir')).resolve() + super().__init__(*args, **kwargs) + + def log_error(self, *args, **kwargs): + """ + Like log_error in http.server.BaseHTTPRequestHandler but suppresses + "Request timed out: timeout('timed out',)". + """ + if not isinstance(args[0], socket.timeout): + super().log_error(*args, **kwargs) + + def get_cert(self, hostname): + """ + If needed, generate a signed x509 certificate for `hostname`. Return + paths to certificate's key file and to certificate itself in a tuple. + """ + root_keyfile = self.certdir / 'rootCA.key' + root_certfile = self.certdir / 'rootCA.pem' + keyfile = self.certdir / 'site.key' + certfile = self.certdir / f'{hostname}.crt' + + with lock: + requestfile = self.certdir / f'{hostname}.csr' + if not certfile.exists(): + subprocess.run([ + 'openssl', 'req', '-new', '-key', str(keyfile), + '-subj', f'/CN={hostname}', '-out', str(requestfile) + ], check=True) + subprocess.run([ + 'openssl', 'x509', '-req', '-in', str(requestfile), + '-CA', str(root_certfile), '-CAkey', str(root_keyfile), + '-CAcreateserial', '-out', str(certfile), '-days', '1024' + ], check=True) + + return keyfile, certfile + + def do_CONNECT(self): + """Wrap the connection with SSL using on-demand signed certificate.""" + hostname = self.path.split(':')[0] + sslargs = {'server_side': True} + sslargs['keyfile'], sslargs['certfile'] = self.get_cert(hostname) + + self.send_response(200) + self.end_headers() + + self.connection = ssl.wrap_socket(self.connection, **sslargs) + self.rfile = self.connection.makefile('rb', self.rbufsize) + self.wfile = self.connection.makefile('wb', self.wbufsize) + + connection_header = self.headers.get('Proxy-Connection', '').lower() + self.close_connection = int(connection_header == 'close') + + def do_GET(self): + content_length = int(self.headers.get('Content-Length', 0)) + req_body = self.rfile.read(content_length) if content_length else None + + if self.path[0] == '/': + secure = 's' if isinstance(self.connection, ssl.SSLSocket) else '' + self.path = f'http{secure}://{self.headers["Host"]}{self.path}' + + self.handle_request(req_body) + + do_OPTIONS = do_DELETE = do_PUT = do_HEAD = do_POST = do_GET + + def handle_request(self, req_body): + """Default handler that does nothing. Please override.""" + pass + + +class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): + """The actual proxy server""" + address_family, daemon_threads = socket.AF_INET6, True + + def handle_error(self, request, client_address): + """ + Like handle_error in http.server.HTTPServer but suppresses socket/ssl + related errors. + """ + cls, e = sys.exc_info()[:2] + if not (cls is socket.error or cls is ssl.SSLError): + return super().handle_error(request, client_address) diff --git a/test/haketilo_test/script_loader.py b/test/haketilo_test/script_loader.py new file mode 100644 index 0000000..66130bb --- /dev/null +++ b/test/haketilo_test/script_loader.py @@ -0,0 +1,72 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +Loading of parts of Haketilo source for testing in browser +""" + +# This file is part of Haketilo. +# +# Copyright (C) 2021 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this file's +# license. Although I request that you do not make use of this code in a +# proprietary program, I am not going to enforce this in court. + +from pathlib import Path +import subprocess, re + +from .misc_constants import * + +def make_relative_path(path): + path = Path(path) + + if path.is_absolute(): + path = path.relative_to(proj_root) + + return path + +script_cache = {} + +def load_script(path, code_to_add=None): + """ + `path` is a .js file path in Haketilo sources. It may be absolute or + specified relative to Haketilo's project directory. `code_to_add` is + optional code to be appended to the end of the main file being imported. + it can contain directives like `#IMPORT`. + + Return a string containing script from `path` together with all other + scripts it depends on. Dependencies are wrapped in the same way Haketilo's + build system wraps them, with imports properly satisfied. The main script + being loaded is wrapped partially - it also has its imports satisfied, but + its code is executed in global scope instead of within an anonymous function + and imported variables are defined with `let` instead of `const` to allow + a dependency to be substituted by a mocked value. + """ + path = make_relative_path(path) + key = (str(path), code_to_add) + if key in script_cache: + return script_cache[key] + + append_flags = () if code_to_add is None else ('-A', ':'.join(key)) + + awk = subprocess.run(['awk', '-f', awk_script_name, '--', + *unit_test_defines, *append_flags, + '--output=amalgamate-js:' + str(path)], + stdout=subprocess.PIPE, cwd=proj_root, check=True) + script = awk.stdout.decode() + script_cache[key] = script + + return script diff --git a/test/haketilo_test/server.py b/test/haketilo_test/server.py new file mode 100755 index 0000000..7dc5e9e --- /dev/null +++ b/test/haketilo_test/server.py @@ -0,0 +1,112 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later + +""" +A modular "virtual network" proxy, +wrapping the classes in proxy_core.py +""" + +# This file is part of Haketilo. +# +# Copyright (C) 2021 jahoti <jahoti@tilde.team> +# Copyright (C) 2021 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use of this code +# in a proprietary program, I am not going to enforce this in court. + +from pathlib import Path +from urllib.parse import parse_qs +from threading import Thread +import traceback + +from .proxy_core import ProxyRequestHandler, ThreadingHTTPServer +from .misc_constants import * +from .world_wide_library import catalog as internet + +class RequestHijacker(ProxyRequestHandler): + def handle_request(self, req_body): + path_components = self.path.split('?', maxsplit=1) + path = path_components[0] + try: + # Response format: (status_code, headers (dict. of strings), + # body as bytes or filename containing body as string) + if path in internet: + info = internet[path] + if type(info) is tuple: + status_code, headers, body_file = info + resp_body = b'' + if body_file is not None: + if 'Content-Type' not in headers: + ext = body_file.suffix[1:] + if ext and ext in mime_types: + headers['Content-Type'] = mime_types[ext] + + with open(body_file, mode='rb') as f: + resp_body = f.read() + else: + # A function to evaluate to get the response + get_params, post_params = {}, {} + if len(path_components) == 2: + get_params = parse_qs(path_components[1]) + + # Parse POST parameters; currently only supports + # application/x-www-form-urlencoded + if req_body: + post_params = parse_qs(req_body.encode()) + + status_code, headers, resp_body = info(self.command, get_params, post_params) + if type(resp_body) == str: + resp_body = resp_body.encode() + + if type(status_code) != int or status_code <= 0: + raise Exception('Invalid status code %r' % status_code) + + for header, header_value in headers.items(): + if type(header) != str: + raise Exception('Invalid header key %r' % header) + + elif type(header_value) != str: + raise Exception('Invalid header value %r' % header_value) + else: + status_code, headers = 404, {'Content-Type': 'text/plain'} + resp_body = b'Handler for this URL not found.' + + except Exception: + status_code = 500 + headers = {'Content-Type': 'text/plain'} + resp_body = b'Internal Error:\n' + traceback.format_exc().encode() + + headers['Content-Length'] = str(len(resp_body)) + self.send_response(status_code) + for header, header_value in headers.items(): + self.send_header(header, header_value) + + self.end_headers() + if resp_body: + self.wfile.write(resp_body) + +def do_an_internet(certdir=Path.cwd() / 'certs', + port=default_proxy_port): + """Start up the proxy/server""" + class RequestHijackerWithCertdir(RequestHijacker): + def __init__(self, *args, **kwargs): + super().__init__(*args, certdir=certdir, **kwargs) + + httpd = ThreadingHTTPServer(('', port), RequestHijackerWithCertdir) + Thread(target=httpd.serve_forever).start() + + return httpd diff --git a/test/haketilo_test/test_integration.py b/test/haketilo_test/test_integration.py new file mode 100644 index 0000000..87d1827 --- /dev/null +++ b/test/haketilo_test/test_integration.py @@ -0,0 +1,47 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo integration tests +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest + +from .extension_crafting import get_extension_base_url + +@pytest.mark.usefixtures('haketilo') +def test_integration(driver): + """ + Verify that the entire extension functions properly. Also verify bunlded + default settings get loaded properly. + """ + base_url = get_extension_base_url(driver) + driver.get(base_url + 'html/settings.html') + + for tab_head_id, item_text in [ + ('resources_head', 'Haketilo demonstrational script'), + ('mappings_head', 'Haketilo demonstrational message'), + ]: + driver.find_element_by_id(tab_head_id).click() + lst = driver.find_element_by_css_selector('.active_tab .item_list') + assert lst.is_displayed() + assert item_text in lst.text + + driver.find_element_by_id('repos_head').click() + lst = driver.find_element_by_css_selector('.active_tab .text_entries') + assert 'https://hydrilla.koszko.org/api_v1' in lst.text + + # TODO: do some more tests, including popup interaction and repository + # querying diff --git a/test/haketilo_test/unit/__init__.py b/test/haketilo_test/unit/__init__.py new file mode 100644 index 0000000..2b351bb --- /dev/null +++ b/test/haketilo_test/unit/__init__.py @@ -0,0 +1,2 @@ +# SPDX-License-Identifier: CC0-1.0 +# Copyright (C) 2021 Wojtek Kosior diff --git a/test/haketilo_test/unit/test_CORS_bypass_server.py b/test/haketilo_test/unit/test_CORS_bypass_server.py new file mode 100644 index 0000000..45e4ebb --- /dev/null +++ b/test/haketilo_test/unit/test_CORS_bypass_server.py @@ -0,0 +1,109 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - routing HTTP requests through background script +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +import json +from selenium.webdriver.support.ui import WebDriverWait + +from ..script_loader import load_script +from ..world_wide_library import some_data + +urls = { + 'resource': 'https://anotherdoma.in/resource/blocked/by/CORS.json', + 'nonexistent': 'https://nxdoma.in/resource.json', + 'invalid': 'w3csucks://invalid.url/' +} + +content_script = '''\ +const urls = %s; + +function fetch_data(url) { + return { + url, + to_get: ["ok", "status"], + to_call: ["text", "json"] + }; +} + +async function fetch_resources() { + const results = {}; + const promises = []; + for (const [name, url] of Object.entries(urls)) { + const sending = browser.runtime.sendMessage(["CORS_bypass", + fetch_data(url)]); + promises.push(sending.then(response => results[name] = response)); + } + + await Promise.all(promises); + + window.wrappedJSObject.haketilo_fetch_results = results; +} + +fetch_resources(); +''' + +content_script = content_script % json.dumps(urls); + +@pytest.mark.ext_data({ + 'content_script': content_script, + 'background_script': + lambda: load_script('background/CORS_bypass_server.js') + '; start();' +}) +@pytest.mark.usefixtures('webextension') +def test_CORS_bypass_server(driver, execute_in_page): + """ + Test if CORS bypassing works and if errors get properly forwarded. + """ + driver.get('https://gotmyowndoma.in/') + + # First, verify that requests without CORS bypass measures fail. + results = execute_in_page( + ''' + const result = {}; + let promises = []; + for (const [name, url] of Object.entries(arguments[0])) { + const [ok_cb, err_cb] = + ["ok", "err"].map(status => () => result[name] = status); + promises.push(fetch(url).then(ok_cb, err_cb)); + } + // Make the promises non-failing. + promises = promises.map(p => new Promise(cb => p.then(cb, cb))); + returnval(Promise.all(promises).then(() => result)); + ''', + {**urls, 'sameorigin': './nonexistent_resource'}) + + assert results == dict([*[(k, 'err') for k in urls.keys()], + ('sameorigin', 'ok')]) + + done = lambda d: d.execute_script('return window.haketilo_fetch_results;') + results = WebDriverWait(driver, 10).until(done) + + assert set(results['invalid'].keys()) == {'error'} + + assert set(results['nonexistent'].keys()) == \ + {'ok', 'status', 'text', 'error_json'} + assert results['nonexistent']['ok'] == False + assert results['nonexistent']['status'] == 404 + assert results['nonexistent']['text'] == 'Handler for this URL not found.' + + assert set(results['resource'].keys()) == {'ok', 'status', 'text', 'json'} + assert results['resource']['ok'] == True + assert results['resource']['status'] == 200 + assert results['resource']['text'] == some_data + assert results['resource']['json'] == json.loads(some_data) diff --git a/test/haketilo_test/unit/test_basic.py b/test/haketilo_test/unit/test_basic.py new file mode 100644 index 0000000..6ec54cc --- /dev/null +++ b/test/haketilo_test/unit/test_basic.py @@ -0,0 +1,77 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - base +""" + +# This file is part of Haketilo +# +# Copyright (C) 2021, Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest + +from ..script_loader import load_script +from ..extension_crafting import ExtraHTML + +def test_driver(driver): + """ + A trivial test case that verifies mocked web pages served by proxy can be + accessed by the browser driven. + """ + for proto in ['http://', 'https://']: + driver.get(proto + 'gotmyowndoma.in') + title = driver.execute_script( + 'return document.getElementsByTagName("title")[0].innerText;' + ) + assert "Schrodinger's Document" in title + +@pytest.mark.get_page('https://gotmyowndoma.in') +def test_script_loader(execute_in_page): + """ + A trivial test case that verifies Haketilo's .js files can be properly + loaded into a test page together with their dependencies. + """ + execute_in_page(load_script('common/indexeddb.js')) + + assert 'mapping' in execute_in_page('returnval(stores.map(s => s[0]));') + +@pytest.mark.ext_data({}) +@pytest.mark.usefixtures('webextension') +def test_webextension(driver): + """ + A trivial test case that verifies a test WebExtension created and installed + by the `webextension` fixture works and redirects specially-constructed URLs + to its test page. + """ + heading = driver.execute_script( + 'return document.getElementsByTagName("h1")[0].innerText;' + ) + assert "Extension's options page for testing" in heading + +@pytest.mark.ext_data({ + 'extra_html': ExtraHTML( + 'html/default_blocking_policy.html', + { + 'html/default_blocking_policy.js': + 'document.body.innerHTML = `ski-ba-bop-ba ${typeof by_id}`;' + } + ), + 'navigate_to': 'html/default_blocking_policy.html' +}) +@pytest.mark.usefixtures('webextension') +def test_extra_html(driver): + """ + A trivial test case of the facility for loading the Haketilo's HTML files + into test WebExtension for unit-testing. + """ + assert driver.execute_script('return document.body.innerText') == \ + 'ski-ba-bop-ba function' diff --git a/test/haketilo_test/unit/test_broadcast.py b/test/haketilo_test/unit/test_broadcast.py new file mode 100644 index 0000000..7c2c051 --- /dev/null +++ b/test/haketilo_test/unit/test_broadcast.py @@ -0,0 +1,175 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - message broadcasting +""" + +# This file is part of Haketilo +# +# Copyright (C) 2021, Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +from selenium.webdriver.support.ui import WebDriverWait + +from ..script_loader import load_script +from .utils import broker_js + +test_page_html = ''' +<!DOCTYPE html> +<script src="/testpage.js"></script> +<h2>d0 (channel `somebodyoncetoldme`)</h2> +<div id="d0"></div> +<h2>d1 (channel `worldisgonnarollme`)</h2> +<div id="d1"></div> +<h2>d2 (both channels)</h2> +<div id="d2"></div> +''' + +@pytest.mark.ext_data({ + 'background_script': broker_js, + 'test_page': test_page_html, + 'extra_files': { + 'testpage.js': lambda: load_script('common/broadcast.js') + } +}) +@pytest.mark.usefixtures('webextension') +def test_broadcast(driver, execute_in_page, wait_elem_text): + """ + A test that verifies the broadcasting system based on WebExtension messaging + API and implemented in `background/broadcast_broker.js` and + `common/broadcast.js` works correctly. + """ + # The broadcast facility is meant to enable message distribution between + # multiple contexts (e.g. different tabs/windows). Let's open the same + # extension's test page in a second window. + driver.execute_script( + ''' + window.open(window.location.href, "_blank"); + window.open(window.location.href, "_blank"); + ''') + WebDriverWait(driver, 10).until(lambda d: len(d.window_handles) == 3) + windows = [*driver.window_handles] + + # Let's first test if a simple message can be successfully broadcasted + driver.switch_to.window(windows[0]) + execute_in_page( + ''' + const divs = [0, 1, 2].map(n => document.getElementById("d" + n)); + let appender = n => (t => divs[n].append("\\n" + `[${t[0]}, ${t[1]}]`)); + let listener0 = listener_connection(appender(0)); + subscribe(listener0, "somebodyoncetoldme"); + ''') + + driver.switch_to.window(windows[1]) + execute_in_page( + ''' + let sender0 = sender_connection(); + out(sender0, "somebodyoncetoldme", "iaintthesharpesttool"); + ''') + + driver.switch_to.window(windows[0]) + wait_elem_text('d0', '[somebodyoncetoldme, iaintthesharpesttool]') + + # Let's add 2 more listeners + driver.switch_to.window(windows[0]) + execute_in_page( + ''' + let listener1 = listener_connection(appender(1)); + subscribe(listener1, "worldisgonnarollme"); + let listener2 = listener_connection(appender(2)); + subscribe(listener2, "worldisgonnarollme"); + subscribe(listener2, "somebodyoncetoldme"); + ''') + + # Let's send one message to one channel and one to the other. Verify they + # were received by the rght listeners. + driver.switch_to.window(windows[1]) + execute_in_page( + ''' + out(sender0, "somebodyoncetoldme", "intheshed"); + out(sender0, "worldisgonnarollme", "shewaslooking"); + ''') + + driver.switch_to.window(windows[0]) + wait_elem_text('d0', 'intheshed') + wait_elem_text('d1', 'shewaslooking') + wait_elem_text('d2', 'intheshed') + wait_elem_text('d2', 'shewaslooking') + + text = execute_in_page('returnval(divs[0].innerText);') + assert 'shewaslooking' not in text + text = execute_in_page('returnval(divs[1].innerText);') + assert 'intheshed' not in text + + # Let's create a second sender in third window and use it to send messages + # with the 'prepare' feature. + driver.switch_to.window(windows[2]) + execute_in_page( + ''' + let sender1 = sender_connection(); + prepare(sender1, "somebodyoncetoldme", "kindadumb"); + out(sender1, "worldisgonnarollme", "withherfinger"); + ''') + + driver.switch_to.window(windows[0]) + wait_elem_text('d1', 'withherfinger') + text = execute_in_page('returnval(divs[0].innerText);') + assert 'kindadumb' not in text + + driver.switch_to.window(windows[2]) + execute_in_page('flush(sender1);') + + driver.switch_to.window(windows[0]) + wait_elem_text('d0', 'kindadumb') + + # Let's verify that prepare()'d messages are properly discarded when + # discard() is called. + driver.switch_to.window(windows[2]) + execute_in_page( + ''' + prepare(sender1, "somebodyoncetoldme", "andherthumb"); + discard(sender1); + prepare(sender1, "somebodyoncetoldme", "andhermiddlefinger"); + flush(sender1); + ''') + + driver.switch_to.window(windows[0]) + wait_elem_text('d0', 'andhermiddlefinger') + text = execute_in_page('returnval(divs[0].innerText);') + assert 'andherthumb' not in text + + # Let's verify prepare()'d messages are properly auto-flushed when the other + # end of the connection gets killed (e.g. because browser tab gets closed). + driver.switch_to.window(windows[2]) + execute_in_page( + ''' + prepare(sender1, "worldisgonnarollme", "intheshape", 500); + ''') + driver.close() + + driver.switch_to.window(windows[0]) + wait_elem_text('d2', 'intheshape') + + # Verify listener's connection gets closed properly. + execute_in_page('close(listener0); close(listener1);') + + driver.switch_to.window(windows[1]) + execute_in_page('out(sender0, "worldisgonnarollme", "ofanL");') + execute_in_page('out(sender0, "somebodyoncetoldme", "forehead");') + + driver.switch_to.window(windows[0]) + wait_elem_text('d2', 'ofanL') + wait_elem_text('d2', 'forehead') + for i in (0, 1): + text = execute_in_page('returnval(divs[arguments[0]].innerText);', i) + assert 'ofanL' not in text + assert 'forehead' not in text diff --git a/test/haketilo_test/unit/test_content.py b/test/haketilo_test/unit/test_content.py new file mode 100644 index 0000000..8220160 --- /dev/null +++ b/test/haketilo_test/unit/test_content.py @@ -0,0 +1,190 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - main content script +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +import json +from selenium.webdriver.support.ui import WebDriverWait + +from ..script_loader import load_script + +# From: +# https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/API/contentScripts/register +# it is unclear whether the dynamically-registered content script is guaranteed +# to be always executed after statically-registered ones. We want to test both +# cases, so we'll make the mocked dynamic content script execute before +# content.js on http:// pages and after it on https:// pages. +dynamic_script = \ + '''; + this.haketilo_secret = "abracadabra"; + this.haketilo_pattern_tree = {}; + this.haketilo_default_allow = false; + + if (this.haketilo_content_script_main) + this.haketilo_content_script_main(); + ''' + +content_script = \ + ''' + /* Mock dynamic content script - case 'before'. */ + if (/dynamic_before/.test(document.URL)) { + %s; + } + + /* Place amalgamated content.js here. */ + %s; + + /* Rest of mocks */ + + function mock_decide_policy() { + nonce = "12345"; + return { + allow: false, + mapping: "what-is-programmers-favorite-drinking-place", + payload: {identifier: "foo-bar"}, + nonce, + csp: "prefetch-src 'none'; script-src-attr 'none'; script-src 'nonce-12345'; script-src-elem 'nonce-12345';" + }; + } + + async function mock_payload_error([type, res_id]) { + if (type === "indexeddb_files") + return {error: {haketilo_error_type: "missing", id: res_id}}; + } + + async function mock_payload_ok([type, res_id]) { + if (type === "indexeddb_files") + return {files: [1, 2].map(n => `window.hak_injected_${n} = ${n};`)}; + } + + if (/payload_error/.test(document.URL)) { + browser.runtime.sendMessage = mock_payload_error; + decide_policy = mock_decide_policy; + } else if (/payload_ok/.test(document.URL)) { + browser.runtime.sendMessage = mock_payload_ok; + decide_policy = mock_decide_policy; + } + /* Otherwise, script blocking policy without payload to inject is used. */ + + const data_to_verify = {}; + function data_set(prop, val) { + data_to_verify[prop] = val; + window.wrappedJSObject.data_to_verify = JSON.stringify(data_to_verify); + } + + repo_query_cacher.start = () => data_set("cacher_started", true); + + enforce_blocking = policy => data_set("enforcing", policy); + + browser.runtime.onMessage.addListener = async function (listener_cb) { + await new Promise(cb => setTimeout(cb, 10)); + + /* Mock a good request. */ + const set_good = val => data_set("good_request_result", val); + data_set("good_request_returned", + !!listener_cb(["page_info"], {}, val => set_good(val))); + + /* Mock a bad request. */ + const set_bad = val => data_set("bad_request_result", val); + data_set("bad_request_returned", + !!listener_cb(["???"], {}, val => set_bad(val))); + } + + /* main() call - normally present in content.js, inside '#IF !UNIT_TEST'. */ + main(); + + /* Mock dynamic content script - case 'after'. */ + if (/#dynamic_after/.test(document.URL)) { + %s; + } + + data_set("script_run_without_errors", true); + ''' % (dynamic_script, load_script('content/content.js'), dynamic_script) + +@pytest.mark.ext_data({'content_script': content_script}) +@pytest.mark.usefixtures('webextension') +@pytest.mark.parametrize('target1', ['dynamic_before'])#, 'dynamic_after']) +@pytest.mark.parametrize('target2', [ + 'scripts_blocked', + 'payload_error', + 'payload_ok' +]) +def test_content_unprivileged_page(driver, execute_in_page, target1, target2): + """ + Test functioning of content.js on an page using unprivileged schema (e.g. + 'https://' and not 'about:'). + """ + driver.get(f'https://gotmyowndoma.in/index.html#{target1}-{target2}') + + def get_data(driver): + data = driver.execute_script('return window.data_to_verify;') + return data if 'good_request_result' in data else False + + data = json.loads(WebDriverWait(driver, 10).until(get_data)) + + assert 'gotmyowndoma.in' in data['good_request_result']['url'] + assert 'bad_request_result' not in data + + assert data['good_request_returned'] == True + assert data['bad_request_returned'] == False + + assert data['cacher_started'] == True + + for obj in (data['good_request_result'], data['enforcing']): + assert obj['allow'] == False + + assert 'error' not in data['enforcing'] + + if target2.startswith('payload'): + for obj in (data['good_request_result'], data['enforcing']): + assert obj['payload']['identifier'] == 'foo-bar' + assert 'mapping' in obj + else: + assert 'payload' not in data['enforcing'] + assert 'mapping' not in data['enforcing'] + + assert data['script_run_without_errors'] == True + + def vars_made_by_payload(driver): + vars_values = driver.execute_script( + 'return [1, 2].map(n => window[`hak_injected_${n}`]);' + ) + if vars_values != [None, None]: + return vars_values + + if target2 == 'payload_error': + assert data['good_request_result']['error'] == { + 'haketilo_error_type': 'missing', + 'id': 'foo-bar' + } + elif target2 == 'payload_ok': + vars_values = WebDriverWait(driver, 10).until(vars_made_by_payload) + assert vars_values == [1, 2] + +@pytest.mark.ext_data({'content_script': content_script}) +@pytest.mark.usefixtures('webextension') +@pytest.mark.parametrize('target', ['dynamic_before', 'dynamic_after']) +def test_content_privileged_page(driver, execute_in_page, target): + """ + Test functioning of content.js on an page considered privileged (e.g. a + directory listing at 'file:///'). + """ + driver.get(f'file:///#{target}') + data = json.loads(driver.execute_script('return window.data_to_verify;')) + + assert data == {'script_run_without_errors': True} diff --git a/test/haketilo_test/unit/test_default_policy_dialog.py b/test/haketilo_test/unit/test_default_policy_dialog.py new file mode 100644 index 0000000..a1c825f --- /dev/null +++ b/test/haketilo_test/unit/test_default_policy_dialog.py @@ -0,0 +1,49 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - default script blocking policy dialog +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022, Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest + +from ..extension_crafting import ExtraHTML +from ..script_loader import load_script +from .utils import broker_js + +@pytest.mark.ext_data({ + 'background_script': broker_js, + 'extra_html': ExtraHTML( + 'html/default_blocking_policy.html', + { + 'html/default_blocking_policy.js': + 'init_default_policy_dialog();' + } + ), + 'navigate_to': 'html/default_blocking_policy.html' +}) +@pytest.mark.usefixtures('webextension') +def test_default_blocking_policy_dialog(driver, wait_elem_text): + """ + A test case for the dialog that facilitates toggling the default policy of + script blocking. + """ + wait_elem_text('current_policy_span', 'block') + + driver.find_element_by_id('toggle_policy_but').click() + wait_elem_text('current_policy_span', 'allow') + + driver.find_element_by_id('toggle_policy_but').click() + wait_elem_text('current_policy_span', 'block') diff --git a/test/haketilo_test/unit/test_dialog.py b/test/haketilo_test/unit/test_dialog.py new file mode 100644 index 0000000..63af79e --- /dev/null +++ b/test/haketilo_test/unit/test_dialog.py @@ -0,0 +1,143 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - showing an error/info/question dalog +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022, Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest + +from ..extension_crafting import ExtraHTML +from ..script_loader import load_script + +@pytest.mark.ext_data({ + 'extra_html': ExtraHTML('html/dialog.html', {}), + 'navigate_to': 'html/dialog.html' +}) +@pytest.mark.usefixtures('webextension') +def test_dialog_show_close(driver, execute_in_page): + """ + A test case of basic dialog showing/closing. + """ + execute_in_page(load_script('html/dialog.js')) + buts = execute_in_page( + ''' + let cb_calls, call_prom; + const dialog_context = make(() => cb_calls.push("show"), + () => cb_calls.push("hide")); + document.body.append(dialog_context.main_div); + const buts = {}; + for (const but of document.getElementsByTagName("button")) + buts[but.textContent] = but; + returnval(buts); + ''') + + for i, (dialog_function, but_text, hidden, expected_result) in enumerate([ + ('info', 'Ok', ['Yes', 'No'], None), + ('error', 'Ok', ['Yes', 'No'], None), + ('error', None, ['Yes', 'No'], None), + ('loader', None, ['Yes', 'No', 'Ok'], None), + ('ask', 'Yes', ['Ok'], True), + ('ask', None, ['Ok'], None), + ('ask', 'No', ['Ok'], False) + ]): + cb_calls, is_shown = execute_in_page( + f''' + cb_calls = []; + call_prom = {dialog_function}(dialog_context, + `sample_text_${{arguments[0]}}`); + returnval([cb_calls, dialog_context.shown]); + ''', + i) + assert cb_calls == ['show'] + assert is_shown == True + + page_source = driver.page_source + assert f'sample_text_{i}' in page_source + assert f'sample_text_{i - 1}' not in page_source + + # Verify the right buttons are displayed. + for text, but in buts.items(): + if text in hidden: + assert not but.is_displayed() + # Verify clicking a hidden button does nothing. + execute_in_page('buts[arguments[0]].click();', text) + assert execute_in_page('returnval(cb_calls);') == cb_calls + else: + assert but.is_displayed() + + if but_text is None: + execute_in_page('close_dialog(dialog_context);') + else: + buts[but_text].click() + + cb_calls, result, is_shown = execute_in_page( + '''{ + const values_cb = r => [cb_calls, r, dialog_context.shown]; + returnval(call_prom.then(values_cb)); + }''') + assert cb_calls == ['show', 'hide'] + assert result == expected_result + assert is_shown == False + +@pytest.mark.ext_data({ + 'extra_html': ExtraHTML('html/dialog.html', {}), + 'navigate_to': 'html/dialog.html' +}) +@pytest.mark.usefixtures('webextension') +def test_dialog_queue(driver, execute_in_page): + """ + A test case of queuing dialog display operations. + """ + execute_in_page(load_script('html/dialog.js')) + execute_in_page( + ''' + let cb_calls = [], call_proms = []; + const dialog_context = make(() => cb_calls.push("show"), + () => cb_calls.push("hide")); + document.body.append(dialog_context.main_div); + ''') + + buts = driver.find_elements_by_tag_name('button') + buts = dict([(but.text, but) for but in buts]) + + for i in range(5): + cb_calls, is_shown, msg_elem = execute_in_page( + ''' + call_proms.push(ask(dialog_context, "somequestion" + arguments[0])); + returnval([cb_calls, dialog_context.shown, dialog_context.msg]); + ''', + i) + assert cb_calls == ['show'] + assert is_shown == True + assert msg_elem.text == 'somequestion0' + + for i in range(5): + buts['Yes' if i & 1 else 'No'].click() + cb_calls, is_shown, msg_elem, result = execute_in_page( + '''{ + const values_cb = + r => [cb_calls, dialog_context.shown, dialog_context.msg, r]; + returnval(call_proms.splice(0, 1)[0].then(values_cb)); + }''') + if i < 4: + assert cb_calls == ['show'] + assert is_shown == True + assert msg_elem.text == f'somequestion{i + 1}' + else: + assert cb_calls == ['show', 'hide'] + assert is_shown == False + + assert result == bool(i & 1) diff --git a/test/haketilo_test/unit/test_indexeddb.py b/test/haketilo_test/unit/test_indexeddb.py new file mode 100644 index 0000000..c2d5427 --- /dev/null +++ b/test/haketilo_test/unit/test_indexeddb.py @@ -0,0 +1,490 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - IndexedDB access +""" + +# This file is part of Haketilo +# +# Copyright (C) 2021, 2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +import json +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.common.exceptions import WebDriverException + +from ..script_loader import load_script +from .utils import * + +# Sample resource definitions. They'd normally contain more fields but here we +# use simplified versions. + +def make_sample_resource(): + return { + 'source_copyright': [ + sample_file_ref('report.spdx'), + sample_file_ref('LICENSES/somelicense.txt') + ], + 'type': 'resource', + 'identifier': 'helloapple', + 'scripts': [sample_file_ref('hello.js'), sample_file_ref('bye.js')] + } + +def make_sample_mapping(): + return { + 'source_copyright': [ + sample_file_ref('report.spdx'), + sample_file_ref('README.md') + ], + 'type': 'mapping', + 'identifier': 'helloapple' + } + +@pytest.mark.get_page('https://gotmyowndoma.in') +def test_haketilodb_item_modifications(driver, execute_in_page): + """ + indexeddb.js facilitates operating on Haketilo's internal database. + Verify database operations on mappings/resources work properly. + """ + execute_in_page(load_script('common/indexeddb.js')) + mock_broadcast(execute_in_page) + + # Start with no database. + clear_indexeddb(execute_in_page) + + sample_item = make_sample_resource() + sample_item['source_copyright'][0]['extra_prop'] = True + + execute_in_page( + '''{ + const promise = start_items_transaction(["resource"], arguments[1]) + .then(ctx => save_item(arguments[0], ctx).then(() => ctx)) + .then(finalize_transaction); + returnval(promise); + }''', + sample_item, {'sha256': sample_files_by_sha256}) + + database_contents = get_db_contents(execute_in_page) + + assert len(database_contents['file']) == 4 + assert all([sample_files_by_sha256[file['sha256']] == file['contents'] + for file in database_contents['file']]) + assert all([len(file) == 2 for file in database_contents['file']]) + + assert len(database_contents['file_uses']) == 4 + assert all([uses['uses'] == 1 for uses in database_contents['file_uses']]) + assert set([uses['sha256'] for uses in database_contents['file_uses']]) \ + == set([file['sha256'] for file in database_contents['file']]) + + assert database_contents['mapping'] == [] + assert database_contents['resource'] == [sample_item] + + # See if trying to add an item without providing all its files ends in an + # exception and aborts the transaction as it should. + sample_item['scripts'].append(sample_file_ref('combined.js')) + incomplete_files = {**sample_files_by_sha256} + incomplete_files.pop(sample_files['combined.js']['sha256']) + exception = execute_in_page( + '''{ + const args = arguments; + async function try_add_item() + { + const context = + await start_items_transaction(["resource"], args[1]); + try { + await save_item(args[0], context); + await finalize_transaction(context); + return; + } catch(e) { + return e; + } + } + returnval(try_add_item()); + }''', + sample_item, {'sha256': incomplete_files}) + + previous_database_contents = database_contents + database_contents = get_db_contents(execute_in_page) + + assert 'file not present' in exception + for key, val in database_contents.items(): + keyfun = lambda item: item.get('sha256') or item['identifier'] + assert sorted(previous_database_contents[key], key=keyfun) \ + == sorted(val, key=keyfun) + + # See if adding another item that partially uses first's files works OK. + sample_item = make_sample_mapping() + database_contents = execute_in_page( + '''{ + const promise = start_items_transaction(["mapping"], arguments[1]) + .then(ctx => save_item(arguments[0], ctx).then(() => ctx)) + .then(finalize_transaction); + returnval(promise); + }''', + sample_item, {'sha256': sample_files_by_sha256}) + + database_contents = get_db_contents(execute_in_page) + + names = ['README.md', 'report.spdx', 'LICENSES/somelicense.txt', 'hello.js', + 'bye.js'] + sample_files_list = [sample_files[name] for name in names] + uses_list = [1, 2, 1, 1, 1] + + uses = dict([(uses['sha256'], uses['uses']) + for uses in database_contents['file_uses']]) + assert uses == dict([(file['sha256'], nr) + for file, nr in zip(sample_files_list, uses_list)]) + + files = dict([(file['sha256'], file['contents']) + for file in database_contents['file']]) + assert files == dict([(file['sha256'], file['contents']) + for file in sample_files_list]) + + del database_contents['resource'][0]['source_copyright'][0]['extra_prop'] + assert database_contents['resource'] == [make_sample_resource()] + assert database_contents['mapping'] == [sample_item] + + # Try removing the items to get an empty database again. + results = [None, None] + for i, item_type in enumerate(['resource', 'mapping']): + execute_in_page( + f'''{{ + const remover = remove_{item_type}; + const promise = + start_items_transaction(["{item_type}"], {{}}) + .then(ctx => remover('helloapple', ctx).then(() => ctx)) + .then(finalize_transaction); + returnval(promise); + }}''') + + results[i] = get_db_contents(execute_in_page) + + names = ['README.md', 'report.spdx'] + sample_files_list = [sample_files[name] for name in names] + uses_list = [1, 1] + + uses = dict([(uses['sha256'], uses['uses']) + for uses in results[0]['file_uses']]) + assert uses == dict([(file['sha256'], 1) for file in sample_files_list]) + + files = dict([(file['sha256'], file['contents']) + for file in results[0]['file']]) + assert files == dict([(file['sha256'], file['contents']) + for file in sample_files_list]) + + assert results[0]['resource'] == [] + assert results[0]['mapping'] == [sample_item] + + assert results[1] == dict([(key, []) for key in results[0].keys()]) + + # Try initializing an empty database with sample initial data object. + sample_resource = make_sample_resource() + sample_mapping = make_sample_mapping() + initial_data = { + 'resource': { + 'helloapple': { + '1.12': sample_resource, + '0.9': 'something_that_should_get_ignored', + '1': 'something_that_should_get_ignored', + '1.1': 'something_that_should_get_ignored', + '1.11.1': 'something_that_should_get_ignored', + } + }, + 'mapping': { + 'helloapple': { + '0.1.1': sample_mapping + } + }, + 'file': { + 'sha256': sample_files_by_sha256 + } + } + + clear_indexeddb(execute_in_page) + execute_in_page('initial_data = arguments[0];', initial_data) + database_contents = get_db_contents(execute_in_page) + + assert database_contents['resource'] == [sample_resource] + assert database_contents['mapping'] == [sample_mapping] + +@pytest.mark.get_page('https://gotmyowndoma.in') +def test_haketilodb_settings(driver, execute_in_page): + """ + indexeddb.js facilitates operating on Haketilo's internal database. + Verify assigning/retrieving values of simple "setting" item works properly. + """ + execute_in_page(load_script('common/indexeddb.js')) + mock_broadcast(execute_in_page) + + # Start with no database. + clear_indexeddb(execute_in_page) + + assert get_db_contents(execute_in_page)['setting'] == [] + + assert execute_in_page('returnval(get_setting("option15"));') == None + + execute_in_page('returnval(set_setting("option15", "disable"));') + assert execute_in_page('returnval(get_setting("option15"));') == 'disable' + + execute_in_page('returnval(set_setting("option15", "enable"));') + assert execute_in_page('returnval(get_setting("option15"));') == 'enable' + +@pytest.mark.get_page('https://gotmyowndoma.in') +def test_haketilodb_allowing(driver, execute_in_page): + """ + indexeddb.js facilitates operating on Haketilo's internal database. + Verify changing the "blocking" configuration for a URL works properly. + """ + execute_in_page(load_script('common/indexeddb.js')) + mock_broadcast(execute_in_page) + + # Start with no database. + clear_indexeddb(execute_in_page) + + assert get_db_contents(execute_in_page)['blocking'] == [] + + def run_with_sample_url(expr): + return execute_in_page(f'returnval({expr});', 'https://example.com/**') + + assert None == run_with_sample_url('get_allowing(arguments[0])') + + run_with_sample_url('set_disallowed(arguments[0])') + assert False == run_with_sample_url('get_allowing(arguments[0])') + + run_with_sample_url('set_allowed(arguments[0])') + assert True == run_with_sample_url('get_allowing(arguments[0])') + + run_with_sample_url('set_default_allowing(arguments[0])') + assert None == run_with_sample_url('get_allowing(arguments[0])') + +@pytest.mark.get_page('https://gotmyowndoma.in') +def test_haketilodb_repos(driver, execute_in_page): + """ + indexeddb.js facilitates operating on Haketilo's internal database. + Verify operations on repositories list work properly. + """ + execute_in_page(load_script('common/indexeddb.js')) + mock_broadcast(execute_in_page) + + # Start with no database. + clear_indexeddb(execute_in_page) + + assert get_db_contents(execute_in_page)['repo'] == [] + + sample_urls = ['https://hdrlla.example.com/', 'https://hdrlla.example.org'] + + assert [] == execute_in_page('returnval(get_repos());') + + execute_in_page('returnval(set_repo(arguments[0]));', sample_urls[0]) + assert [sample_urls[0]] == execute_in_page('returnval(get_repos());') + + execute_in_page('returnval(set_repo(arguments[0]));', sample_urls[1]) + assert set(sample_urls) == set(execute_in_page('returnval(get_repos());')) + + execute_in_page('returnval(del_repo(arguments[0]));', sample_urls[0]) + assert [sample_urls[1]] == execute_in_page('returnval(get_repos());') + +test_page_html = ''' +<!DOCTYPE html> +<script src="/testpage.js"></script> +<body> +</body> +''' + +@pytest.mark.ext_data({ + 'background_script': broker_js, + 'test_page': test_page_html, + 'extra_files': { + 'testpage.js': lambda: load_script('common/indexeddb.js') + } +}) +@pytest.mark.usefixtures('webextension') +def test_haketilodb_track(driver, execute_in_page, wait_elem_text): + """ + Verify IndexedDB object change notifications are correctly broadcasted + through extension's background script and allow for object store contents + to be tracked in any execution context. + """ + # Let's open the same extension's test page in a second window. Window 1 + # will be used to make changes to IndexedDB and window 0 to "track" those + # changes. + driver.execute_script('window.open(window.location.href, "_blank");') + WebDriverWait(driver, 10).until(lambda d: len(d.window_handles) == 2) + windows = [*driver.window_handles] + + # Create elements that will have tracked data inserted under them. + driver.switch_to.window(windows[0]) + execute_in_page( + ''' + for (const store_name of trackable) { + const h2 = document.createElement("h2"); + h2.innerText = store_name; + document.body.append(h2); + + const ul = document.createElement("ul"); + ul.id = store_name; + document.body.append(ul); + } + ''') + + # Mock initial_data. + sample_resource = make_sample_resource() + sample_mapping = make_sample_mapping() + initial_data = { + 'resource': { + 'helloapple': { + '1.0': sample_resource + } + }, + 'mapping': { + 'helloapple': { + '0.1.1': sample_mapping + } + }, + 'file': { + 'sha256': sample_files_by_sha256 + } + } + driver.switch_to.window(windows[1]) + execute_in_page('initial_data = arguments[0];', initial_data) + execute_in_page('returnval(set_setting("option15", "123"));') + execute_in_page('returnval(set_repo("https://hydril.la"));') + execute_in_page('returnval(set_disallowed("file:///*"));') + + # See if track.*() functions properly return the already-existing items. + driver.switch_to.window(windows[0]) + execute_in_page( + ''' + function update_item(store_name, change) + { + const elem_id = `${store_name}_${change.key}`; + let elem = document.getElementById(elem_id); + elem = elem || document.createElement("li"); + elem.id = elem_id; + elem.innerText = JSON.stringify(change.new_val); + document.getElementById(store_name).append(elem); + if (change.new_val === undefined) + elem.remove(); + } + + let resource_tracking, resource_items, mapping_tracking, mapping_items; + + async function start_reporting() + { + const props = new Map(stores.map(([sn, opt]) => [sn, opt.keyPath])); + for (const store_name of trackable) { + [tracking, items] = + await track[store_name](ch => update_item(store_name, ch)); + const prop = props.get(store_name); + for (const item of items) + update_item(store_name, {key: item[prop], new_val: item}); + } + } + + returnval(start_reporting()); + ''') + + item_counts = execute_in_page( + '''{ + const childcount = id => document.getElementById(id).childElementCount; + returnval(trackable.map(childcount)); + }''') + assert item_counts == [1 for _ in item_counts] + for elem_id, json_value in [ + ('resource_helloapple', sample_resource), + ('mapping_helloapple', sample_mapping), + ('setting_option15', {'name': 'option15', 'value': '123'}), + ('repo_https://hydril.la', {'url': 'https://hydril.la'}), + ('blocking_file:///*', {'pattern': 'file:///*', 'allow': False}) + ]: + assert json.loads(driver.find_element_by_id(elem_id).text) == json_value + + # See if item additions get tracked properly. + driver.switch_to.window(windows[1]) + sample_resource2 = make_sample_resource() + sample_resource2['identifier'] = 'helloapple-copy' + sample_mapping2 = make_sample_mapping() + sample_mapping2['identifier'] = 'helloapple-copy' + sample_data = { + 'resource': { + 'helloapple-copy': { + '1.0': sample_resource2 + } + }, + 'mapping': { + 'helloapple-copy': { + '0.1.1': sample_mapping2 + } + }, + 'file': { + 'sha256': sample_files_by_sha256 + }, + 'repo': [ + 'https://hydril2.la/' + ] + } + execute_in_page('returnval(save_items(arguments[0]));', sample_data) + execute_in_page('returnval(set_setting("option22", "abc"));') + execute_in_page('returnval(set_repo("https://hydril3.la/"));') + execute_in_page('returnval(set_allowed("ftp://a.bc/"));') + + driver.switch_to.window(windows[0]) + driver.implicitly_wait(10) + for elem_id, json_value in [ + ('resource_helloapple-copy', sample_resource2), + ('mapping_helloapple-copy', sample_mapping2), + ('setting_option22', {'name': 'option22', 'value': 'abc'}), + ('repo_https://hydril2.la/', {'url': 'https://hydril2.la/'}), + ('repo_https://hydril3.la/', {'url': 'https://hydril3.la/'}), + ('blocking_ftp://a.bc/', {'pattern': 'ftp://a.bc/', 'allow': True}) + ]: + assert json.loads(driver.find_element_by_id(elem_id).text) == json_value + driver.implicitly_wait(0) + + # See if item deletions/modifications get tracked properly. + driver.switch_to.window(windows[1]) + execute_in_page( + '''{ + async function change_remove_items() + { + const store_names = ["resource", "mapping"]; + const ctx = await start_items_transaction(store_names, {}); + await remove_resource("helloapple", ctx); + await remove_mapping("helloapple-copy", ctx); + await finalize_transaction(ctx); + await set_setting("option22", null); + await del_repo("https://hydril.la"); + await set_default_allowing("file:///*"); + await set_disallowed("ftp://a.bc/"); + } + returnval(change_remove_items()); + }''') + + removed_ids = ['mapping_helloapple-copy', 'resource_helloapple', + 'repo_https://hydril.la', 'blocking_file:///*'] + def condition_items_absent_and_changed(driver): + for id in removed_ids: + try: + driver.find_element_by_id(id) + return False + except WebDriverException: + pass + + option_text = driver.find_element_by_id('setting_option22').text + blocking_text = driver.find_element_by_id('blocking_ftp://a.bc/').text + return (json.loads(option_text)['value'] == None and + json.loads(blocking_text)['allow'] == False) + + driver.switch_to.window(windows[0]) + WebDriverWait(driver, 10).until(condition_items_absent_and_changed) diff --git a/test/haketilo_test/unit/test_indexeddb_files_server.py b/test/haketilo_test/unit/test_indexeddb_files_server.py new file mode 100644 index 0000000..6ddfba8 --- /dev/null +++ b/test/haketilo_test/unit/test_indexeddb_files_server.py @@ -0,0 +1,171 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - serving indexeddb resource script files to content scripts +""" + +# This file is part of Haketilo +# +# Copyright (C) 2021,2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +import copy +from uuid import uuid4 +from selenium.webdriver.support.ui import WebDriverWait + +from ..script_loader import load_script +from .utils import * + +""" +How many test resources we're going to have. +""" +count = 15 + +sample_files_list = [(f'file_{n}_{i}', f'contents {n} {i}') + for n in range(count) for i in range(2)] + +sample_files = dict(sample_files_list) + +sample_files, sample_files_by_sha256 = make_sample_files(sample_files) + +def make_sample_resource_with_deps(n): + resource = make_sample_resource(with_files=False) + + resource['identifier'] = f'res-{n}' + resource['dependencies'] = [{'identifier': f'res-{m}'} + for m in range(max(n - 4, 0), n)] + resource['scripts'] = [sample_file_ref(f'file_{n}_{i}', sample_files) + for i in range(2)] + + return resource + +resources = [make_sample_resource_with_deps(n) for n in range(count)] + +sample_data = { + 'resource': sample_data_dict(resources), + 'mapping': {}, + 'file': { + 'sha256': sample_files_by_sha256 + } +} + +def prepare_test_page(initial_indexeddb_data, execute_in_page): + js = load_script('background/indexeddb_files_server.js', + code_to_add='#IMPORT common/broadcast.js') + execute_in_page(js) + + mock_broadcast(execute_in_page) + clear_indexeddb(execute_in_page) + + execute_in_page( + ''' + let registered_listener; + const new_addListener = cb => registered_listener = cb; + + browser = {runtime: {onMessage: {addListener: new_addListener}}}; + + haketilodb.save_items(arguments[0]); + + start(); + ''', + initial_indexeddb_data) + +@pytest.mark.get_page('https://gotmyowndoma.in') +def test_indexeddb_files_server_normal_usage(driver, execute_in_page): + """ + Test querying resource files (with resource dependency resolution) + from IndexedDB and serving them in messages to content scripts. + """ + prepare_test_page(sample_data, execute_in_page) + + # Verify other types of messages are ignored. + function_returned_value = execute_in_page( + ''' + returnval(registered_listener(["???"], {}, + () => location.reload())); + ''') + assert function_returned_value == None + + # Verify single resource's files get properly resolved. + function_returned_value = execute_in_page( + ''' + var result_cb, contents_prom = new Promise(cb => result_cb = cb); + + returnval(registered_listener(["indexeddb_files", "res-0"], + {}, result_cb)); + ''') + assert function_returned_value == True + + assert execute_in_page('returnval(contents_prom);') == \ + {'files': [tuple[1] for tuple in sample_files_list[0:2]]} + + # Verify multiple resources' files get properly resolved. + function_returned_value = execute_in_page( + ''' + var result_cb, contents_prom = new Promise(cb => result_cb = cb); + + returnval(registered_listener(["indexeddb_files", arguments[0]], + {}, result_cb)); + ''', + f'res-{count - 1}') + assert function_returned_value == True + + assert execute_in_page('returnval(contents_prom);') == \ + {'files': [tuple[1] for tuple in sample_files_list]} + +@pytest.mark.get_page('https://gotmyowndoma.in') +@pytest.mark.parametrize('error', [ + 'missing', + 'circular', + 'db', + 'other' +]) +def test_indexeddb_files_server_errors(driver, execute_in_page, error): + """ + Test reporting of errors when querying resource files (with resource + dependency resolution) from IndexedDB and serving them in messages to + content scripts. + """ + sample_data_copy = copy.deepcopy(sample_data) + + if error == 'missing': + del sample_data_copy['resource']['res-3'] + elif error == 'circular': + res3_defs = sample_data_copy['resource']['res-3'].values() + next(iter(res3_defs))['dependencies'].append({'identifier': 'res-8'}) + + prepare_test_page(sample_data_copy, execute_in_page) + + if error == 'db': + execute_in_page('haketilodb.idb_get = t => t.onerror("oooops");') + elif error == 'other': + execute_in_page('haketilodb.idb_get = () => {throw "oooops"};') + + response = execute_in_page( + ''' + var result_cb, contents_prom = new Promise(cb => result_cb = cb); + + registered_listener(["indexeddb_files", arguments[0]], + {}, result_cb); + + returnval(contents_prom); + ''', + f'res-{count - 1}') + + assert response['error']['haketilo_error_type'] == error + + if error == 'missing': + assert response['error']['id'] == 'res-3' + elif error == 'circular': + assert response['error']['id'] in ('res-3', 'res-8') + elif error not in ('db', 'other'): + raise Exception('made a typo in test function params?') diff --git a/test/haketilo_test/unit/test_install.py b/test/haketilo_test/unit/test_install.py new file mode 100644 index 0000000..f4bc483 --- /dev/null +++ b/test/haketilo_test/unit/test_install.py @@ -0,0 +1,423 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - item installation dialog +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +import json +from selenium.webdriver.support.ui import WebDriverWait + +from ..extension_crafting import ExtraHTML +from ..script_loader import load_script +from .utils import * + +def setup_view(driver, execute_in_page): + mock_cacher(execute_in_page) + + execute_in_page(load_script('html/install.js')) + container_ids, containers_objects = execute_in_page( + ''' + const cb_calls = []; + const install_view = new InstallView(0, + () => cb_calls.push("show"), + () => cb_calls.push("hide")); + document.body.append(install_view.main_div); + const ets = () => install_view.item_entries; + const shw = slice => [cb_calls.slice(slice || 0), install_view.shown]; + returnval([container_ids, container_ids.map(cid => install_view[cid])]); + ''') + + containers = dict(zip(container_ids, containers_objects)) + + def assert_container_displayed(container_id): + for cid, cobj in zip(container_ids, containers_objects): + assert (cid == container_id) == cobj.is_displayed() + + return containers, assert_container_displayed + +install_ext_data = { + 'background_script': broker_js, + 'extra_html': ExtraHTML('html/install.html', {}), + 'navigate_to': 'html/install.html' +} + +@pytest.mark.ext_data(install_ext_data) +@pytest.mark.usefixtures('webextension') +@pytest.mark.parametrize('complex_variant', [False, True]) +def test_install_normal_usage(driver, execute_in_page, complex_variant): + """ + Test of the normal package installation procedure with one mapping and, + depending on parameter, one or many resources. + """ + containers, assert_container_displayed = setup_view(driver, execute_in_page) + + assert execute_in_page('returnval(shw());') == [[], False] + + if complex_variant: + # The resource/mapping others depend on. + root_id = 'abcd-defg-ghij' + root_resource_id = f'resource_{root_id}' + root_mapping_id = f'mapping_{root_id}' + # Those ids are used to check the alphabetical ordering. + resource_ids = [f'resource_{letters}' for letters in ( + 'a', 'abcd', root_id, 'b', 'c', + 'd', 'defg', 'e', 'f', + 'g', 'ghij', 'h', 'i', 'j' + )] + files_count = 9 + else: + root_resource_id = f'resource_a' + root_mapping_id = f'mapping_a' + resource_ids = [root_resource_id] + files_count = 0 + + # Preview the installation of a resource, show resource's details, close + # the details and cancel installation. + execute_in_page('returnval(install_view.show(...arguments));', + 'https://hydril.la/', 'resource', root_resource_id) + + assert execute_in_page('returnval(shw());') == [['show'], True] + assert f'{root_resource_id}-2021.11.11-1'\ + in containers['install_preview'].text + assert_container_displayed('install_preview') + + entries = execute_in_page('returnval(ets().map(e => e.main_li.innerText));') + assert len(entries) == len(resource_ids) + # Verify alphabetical ordering. + assert all([id in text for id, text in zip(resource_ids, entries)]) + + assert not execute_in_page('returnval(ets()[0].old_ver);').is_displayed() + execute_in_page('returnval(ets()[0].details_but);').click() + assert 'resource_a' in containers['resource_preview_container'].text + assert_container_displayed('resource_preview_container') + + execute_in_page('returnval(install_view.resource_back_but);').click() + assert_container_displayed('install_preview') + + assert execute_in_page('returnval(shw());') == [['show'], True] + execute_in_page('returnval(install_view.cancel_but);').click() + assert execute_in_page('returnval(shw());') == [['show', 'hide'], False] + + # Preview the installation of a mapping and a resource, show mapping's + # details, close the details and commit the installation. + execute_in_page('returnval(install_view.show(...arguments));', + 'https://hydril.la/', 'mapping', + root_mapping_id, [2022, 5, 10]) + + assert execute_in_page('returnval(shw(2));') == [['show'], True] + assert_container_displayed('install_preview') + + entries = execute_in_page('returnval(ets().map(e => e.main_li.innerText));') + assert len(entries) == len(resource_ids) + 1 + assert f'{root_mapping_id}-2022.5.10' in entries[0] + # Verify alphabetical ordering. + assert all([id in text for id, text in zip(resource_ids, entries[1:])]) + + assert not execute_in_page('returnval(ets()[0].old_ver);').is_displayed() + execute_in_page('returnval(ets()[0].details_but);').click() + assert root_mapping_id in containers['mapping_preview_container'].text + assert_container_displayed('mapping_preview_container') + + execute_in_page('returnval(install_view.mapping_back_but);').click() + assert_container_displayed('install_preview') + + execute_in_page('returnval(install_view.install_but);').click() + installed = lambda d: 'ly installed!' in containers['dialog_container'].text + WebDriverWait(driver, 10).until(installed) + + assert execute_in_page('returnval(shw(2));') == [['show'], True] + execute_in_page('returnval(install_view.dialog_ctx.ok_but);').click() + assert execute_in_page('returnval(shw(2));') == [['show', 'hide'], False] + + # Verify the install + db_contents = get_db_contents(execute_in_page) + for item_type, ids in \ + [('mapping', {root_mapping_id}), ('resource', set(resource_ids))]: + assert set([it['identifier'] for it in db_contents[item_type]]) == ids + + assert all([len(db_contents[store]) == files_count + for store in ('file', 'file_uses')]) + + # Update the installed mapping to a newer version. + execute_in_page('returnval(install_view.show(...arguments));', + 'https://hydril.la/', 'mapping', root_mapping_id) + assert execute_in_page('returnval(shw(4));') == [['show'], True] + # resources are already in the newest versions, hence they should not appear + # in the install preview list. + assert execute_in_page('returnval(ets().length);') == 1 + # Mapping's version update information should be displayed. + assert execute_in_page('returnval(ets()[0].old_ver);').is_displayed() + execute_in_page('returnval(install_view.install_but);').click() + + WebDriverWait(driver, 10).until(installed) + + assert execute_in_page('returnval(shw(4));') == [['show'], True] + execute_in_page('returnval(install_view.dialog_ctx.ok_but);').click() + assert execute_in_page('returnval(shw(4));') == [['show', 'hide'], False] + + # Verify the newer version install. + old_db_contents, db_contents = db_contents, get_db_contents(execute_in_page) + old_db_contents['mapping'][0]['version'][-1] += 1 + assert db_contents['mapping'] == old_db_contents['mapping'] + + # All items are up to date - verify dialog is instead shown in this case. + execute_in_page('install_view.show(...arguments);', + 'https://hydril.la/', 'mapping', root_mapping_id) + + fetched = lambda d: 'Fetching ' not in containers['dialog_container'].text + WebDriverWait(driver, 10).until(fetched) + + assert 'Nothing to do - packages already installed.' \ + in containers['dialog_container'].text + assert_container_displayed('dialog_container') + + assert execute_in_page('returnval(shw(6));') == [['show'], True] + execute_in_page('returnval(install_view.dialog_ctx.ok_but);').click() + assert execute_in_page('returnval(shw(6));') == [['show', 'hide'], False] + +@pytest.mark.ext_data(install_ext_data) +@pytest.mark.usefixtures('webextension') +@pytest.mark.parametrize('message', [ + 'fetching_data', + 'failure_to_communicate_sendmessage', + 'HTTP_code_item', + 'invalid_JSON', + 'newer_API_version', + 'invalid_response_format', + 'indexeddb_error_item', + 'installing', + 'indexeddb_error_file_uses', + 'failure_to_communicate_fetch', + 'HTTP_code_file', + 'not_valid_text', + 'sha256_mismatch', + 'indexeddb_error_write' +]) +def test_install_dialogs(driver, execute_in_page, message): + """ + Test of various error and loading messages used in install view. + """ + containers, assert_container_displayed = setup_view(driver, execute_in_page) + + def dlg_buts(): + return execute_in_page( + '''{ + const dlg = install_view.dialog_ctx; + const ids = ['ask_buts', 'conf_buts']; + returnval(ids.filter(id => !dlg[id].classList.contains("hide"))); + }''') + + def dialog_txt(): + return execute_in_page( + 'returnval(install_view.dialog_ctx.msg.textContent);' + ) + + def assert_dlg(awaited_buttons, expected_msg, hides_install_view=True, + button_to_click='ok_but'): + WebDriverWait(driver, 10).until(lambda d: dlg_buts() == awaited_buttons) + + assert expected_msg == dialog_txt() + + execute_in_page( + f'returnval(install_view.dialog_ctx.{button_to_click});' + ).click() + + if hides_install_view: + assert execute_in_page('returnval(shw());') == \ + [['show', 'hide'], False] + + if message == 'fetching_data': + execute_in_page( + ''' + browser.tabs.sendMessage = () => new Promise(cb => {}); + install_view.show(...arguments); + ''', + 'https://hydril.la/', 'mapping', 'mapping_a') + + assert dlg_buts() == [] + assert dialog_txt() == 'Fetching data from repository...' + elif message == 'failure_to_communicate_sendmessage': + execute_in_page( + ''' + browser.tabs.sendMessage = () => Promise.resolve({error: "sth"}); + install_view.show(...arguments); + ''', + 'https://hydril.la/', 'mapping', 'mapping_a') + + assert_dlg(['conf_buts'], 'Failure to communicate with repository :(') + elif message == 'HTTP_code_item': + execute_in_page( + ''' + const response = {ok: false, status: 404}; + browser.tabs.sendMessage = () => Promise.resolve(response); + install_view.show(...arguments); + ''', + 'https://hydril.la/', 'mapping', 'mapping_a') + + assert_dlg(['conf_buts'], 'Repository sent HTTP code 404 :(') + elif message == 'invalid_JSON': + execute_in_page( + ''' + const response = {ok: true, status: 200, error_json: "sth"}; + browser.tabs.sendMessage = () => Promise.resolve(response); + install_view.show(...arguments); + ''', + 'https://hydril.la/', 'mapping', 'mapping_a') + + assert_dlg(['conf_buts'], "Repository's response is not valid JSON :(") + elif message == 'newer_API_version': + execute_in_page( + ''' + const response = { + ok: true, + status: 200, + json: {$schema: "https://hydrilla.koszko.org/schemas/api_mapping_description-2.1.schema.json"} + }; + browser.tabs.sendMessage = () => Promise.resolve(response); + install_view.show(...arguments); + ''', + 'https://hydril.la/', 'mapping', 'somemapping', [2, 1]) + + assert_dlg(['conf_buts'], + 'Mapping somemapping-2.1 was served using unsupported Hydrilla API version. You might need to update Haketilo.') + elif message == 'invalid_response_format': + execute_in_page( + ''' + const response = { + ok: true, + status: 200, + /* $schema is not a string as it should be. */ + json: {$schema: null} + }; + browser.tabs.sendMessage = () => Promise.resolve(response); + install_view.show(...arguments); + ''', + 'https://hydril.la/', 'resource', 'someresource') + + assert_dlg(['conf_buts'], + 'Resource someresource was served using a nonconforming response format.') + elif message == 'indexeddb_error_item': + execute_in_page( + ''' + haketilodb.idb_get = () => {throw "some error";}; + install_view.show(...arguments); + ''', + 'https://hydril.la/', 'mapping', 'mapping_a') + + assert_dlg(['conf_buts'], + "Error accessing Haketilo's internal database :(") + elif message == 'installing': + execute_in_page( + ''' + haketilodb.save_items = () => new Promise(() => {}); + returnval(install_view.show(...arguments)); + ''', + 'https://hydril.la/', 'mapping', 'mapping_b') + + execute_in_page('returnval(install_view.install_but);').click() + + assert dlg_buts() == [] + assert dialog_txt() == 'Installing...' + elif message == 'indexeddb_error_file_uses': + execute_in_page( + ''' + const old_idb_get = haketilodb.idb_get; + haketilodb.idb_get = function(transaction, store_name, identifier) { + if (store_name === "file_uses") + throw "some error"; + return old_idb_get(...arguments); + } + returnval(install_view.show(...arguments)); + ''', + 'https://hydril.la/', 'mapping', 'mapping_b') + + execute_in_page('returnval(install_view.install_but);').click() + + assert_dlg(['conf_buts'], + "Error accessing Haketilo's internal database :(") + elif message == 'failure_to_communicate_fetch': + execute_in_page( + ''' + fetch = () => {throw "some error";}; + returnval(install_view.show(...arguments)); + ''', + 'https://hydril.la/', 'mapping', 'mapping_b') + + execute_in_page('returnval(install_view.install_but);').click() + + assert_dlg(['conf_buts'], + 'Failure to communicate with repository :(') + elif message == 'HTTP_code_file': + execute_in_page( + ''' + fetch = () => Promise.resolve({ok: false, status: 400}); + returnval(install_view.show(...arguments)); + ''', + 'https://hydril.la/', 'mapping', 'mapping_b') + + execute_in_page('returnval(install_view.install_but);').click() + + assert_dlg(['conf_buts'], 'Repository sent HTTP code 400 :(') + elif message == 'not_valid_text': + execute_in_page( + ''' + const err = () => {throw "some error";}; + fetch = () => Promise.resolve({ok: true, status: 200, text: err}); + returnval(install_view.show(...arguments)); + ''', + 'https://hydril.la/', 'mapping', 'mapping_b') + + execute_in_page('returnval(install_view.install_but);').click() + + assert_dlg(['conf_buts'], "Repository's response is not valid text :(") + elif message == 'sha256_mismatch': + execute_in_page( + ''' + let old_fetch = fetch, url_used; + fetch = async function(url) { + url_used = url; + const response = await old_fetch(...arguments); + const text = () => response.text().then(t => t + ":d"); + return {ok: response.ok, status: response.status, text}; + } + returnval(install_view.show(...arguments)); + ''', + 'https://hydril.la/', 'mapping', 'mapping_b') + + execute_in_page('returnval(install_view.install_but);').click() + + get_url_used = lambda d: execute_in_page('returnval(url_used);') + url_used = WebDriverWait(driver, 10).until(get_url_used) + print ((url_used,)) + + assert dlg_buts() == ['conf_buts'] + assert dialog_txt() == \ + f'{url_used} served a file with different SHA256 cryptographic sum :(' + elif message == 'indexeddb_error_write': + execute_in_page( + ''' + haketilodb.save_items = () => {throw "some error";}; + returnval(install_view.show(...arguments)); + ''', + 'https://hydril.la/', 'mapping', 'mapping_b') + + execute_in_page('returnval(install_view.install_but);').click() + + assert_dlg(['conf_buts'], + "Error writing to Haketilo's internal database :(") + else: + raise Exception('made a typo in test function params?') diff --git a/test/haketilo_test/unit/test_item_list.py b/test/haketilo_test/unit/test_item_list.py new file mode 100644 index 0000000..35ed1d5 --- /dev/null +++ b/test/haketilo_test/unit/test_item_list.py @@ -0,0 +1,280 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - displaying list of resources/mappings +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022, Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +from selenium.webdriver.support.ui import WebDriverWait + +from ..extension_crafting import ExtraHTML +from ..script_loader import load_script +from .utils import * + +def make_sample_resource(identifier, long_name): + return { + 'source_name': 'hello', + 'source_copyright': [ + sample_file_ref('report.spdx'), + sample_file_ref('LICENSES/CC0-1.0.txt') + ], + 'type': 'resource', + 'identifier': identifier, + 'long_name': long_name, + 'uuid': 'a6754dcb-58d8-4b7a-a245-24fd7ad4cd68', + 'version': [2021, 11, 10], + 'revision': 1, + 'description': 'greets an apple', + 'dependencies': [{'identifier': 'hello-message'}], + 'scripts': [ + sample_file_ref('hello.js'), + sample_file_ref('bye.js') + ] + } + +def make_sample_mapping(identifier, long_name): + return { + 'source_name': 'example-org-fixes-new', + 'source_copyright': [ + sample_file_ref('report.spdx'), + sample_file_ref('LICENSES/CC0-1.0.txt') + ], + 'type': 'mapping', + 'identifier': identifier, + 'long_name': long_name, + 'uuid': '54d23bba-472e-42f5-9194-eaa24c0e3ee7', + 'version': [2022, 5, 10], + 'description': 'suckless something something', + 'payloads': { + 'https://example.org/a/*': { + 'identifier': 'some-KISS-resource' + }, + 'https://example.org/t/*': { + 'identifier': 'another-KISS-resource' + } + } + } + +def make_item(item_type, *args): + return make_sample_resource(*args) if item_type == 'resource' \ + else make_sample_mapping(*args) + +@pytest.mark.ext_data({ + 'background_script': broker_js, + 'extra_html': ExtraHTML('html/item_list.html', {}), + 'navigate_to': 'html/item_list.html' +}) +@pytest.mark.usefixtures('webextension') +@pytest.mark.parametrize('item_type', ['resource', 'mapping']) +def test_item_list_ordering(driver, execute_in_page, item_type): + """ + A test case of items list proper ordering. + """ + execute_in_page(load_script('html/item_list.js')) + + # Choose sample long names so as to test automatic sorting of items. + long_names = ['sample', 'sample it', 'Sample it', 'SAMPLE IT', + 'test', 'test it', 'Test it', 'TEST IT'] + # Let's operate on a reverse-sorted copy + long_names_reversed = [*long_names] + long_names_reversed.reverse() + + items = [make_item(item_type, f'it_{hex(2 * i + copy)[-1]}', name) + for i, name in enumerate(long_names_reversed) + for copy in (1, 0)] + # When adding/updating items this item will be updated at the end and this + # last update will be used to verify that a set of opertions completed. + extra_item = make_item(item_type, 'extraitem', 'extra item') + + # After this reversal items are sorted in the exact order they are expected + # to appear in the HTML list. + items.reverse() + + sample_data = { + 'resource': {}, + 'mapping': {}, + 'file': { + 'sha256': sample_files_by_sha256 + } + } + + indexes_added = set() + for iteration, to_include in enumerate([ + set([i for i in range(len(items)) if is_prime(i)]), + set([i for i in range(len(items)) + if not is_prime(i) and i & 1]), + set([i for i in range(len(items)) if i % 3 == 0]), + set([i for i in range(len(items)) + if i % 3 and not i & 1 and not is_prime(i)]), + set(range(len(items))) + ]): + # On the last iteration, re-add ALL items but with changed names. + if len(to_include) == len(items): + for it in items: + it['long_name'] = f'somewhat renamed {it["long_name"]}' + + items_to_inclue = [items[i] for i in sorted(to_include)] + sample_data[item_type] = sample_data_dict(items_to_inclue) + execute_in_page('returnval(haketilodb.save_items(arguments[0]));', + sample_data) + + extra_item['long_name'] = f'{iteration} {extra_item["long_name"]}' + sample_data[item_type] = sample_data_dict([extra_item]) + execute_in_page('returnval(haketilodb.save_items(arguments[0]));', + sample_data) + + if iteration == 0: + execute_in_page( + f''' + let list_ctx; + async function create_list() {{ + list_ctx = await {item_type}_list(); + document.body.append(list_ctx.main_div); + }} + returnval(create_list()); + ''') + + def lis_ready(driver): + return extra_item['long_name'] == execute_in_page( + 'returnval(list_ctx.ul.firstElementChild.textContent);' + ) + + indexes_added.update(to_include) + WebDriverWait(driver, 10).until(lis_ready) + + li_texts = execute_in_page( + ''' + var lis = [...list_ctx.ul.children].slice(1); + returnval(lis.map(li => li.textContent)); + ''') + assert li_texts == [items[i]['long_name'] for i in indexes_added] + + preview_texts = execute_in_page( + '''{ + const get_texts = + li => [li.click(), list_ctx.preview_container.textContent][1]; + returnval(lis.map(get_texts)); + }''') + + for i, text in zip(sorted(indexes_added), preview_texts): + assert items[i]['identifier'] in text + assert items[i]['long_name'] in text + +@pytest.mark.ext_data({ + 'background_script': broker_js, + 'extra_html': ExtraHTML('html/item_list.html', {}), + 'navigate_to': 'html/item_list.html' +}) +@pytest.mark.usefixtures('webextension') +@pytest.mark.parametrize('item_type', ['resource', 'mapping']) +def test_item_list_displaying(driver, execute_in_page, item_type): + """ + A test case of items list interaction with preview and dialog. + """ + execute_in_page(load_script('html/item_list.js')) + + items = [make_item(item_type, f'item{i}', f'Item {i}') for i in range(3)] + + sample_data = { + 'resource': {}, + 'mapping': {}, + 'file': { + 'sha256': sample_files_by_sha256 + } + } + sample_data[item_type] = sample_data_dict(items) + + preview_container, dialog_container, ul = execute_in_page( + f''' + let list_ctx, sample_data = arguments[0]; + async function create_list() {{ + await haketilodb.save_items(sample_data); + list_ctx = await {item_type}_list(); + document.body.append(list_ctx.main_div); + return [list_ctx.preview_container, list_ctx.dialog_container, + list_ctx.ul]; + }} + returnval(create_list()); + ''', + sample_data) + + assert not preview_container.is_displayed() + + # Check that preview is displayed correctly. + for i in range(3): + execute_in_page('list_ctx.ul.children[arguments[0]].click();', i) + assert preview_container.is_displayed() + text = preview_container.text + assert f'item{i}' in text + assert f'Item {i}' in text + + # Check that item removal confirmation dialog is displayed correctly. + execute_in_page('list_ctx.remove_but.click();') + WebDriverWait(driver, 10).until(lambda _: dialog_container.is_displayed()) + assert 'list_disabled' in ul.get_attribute('class') + assert not preview_container.is_displayed() + msg = execute_in_page('returnval(list_ctx.dialog_ctx.msg.textContent);') + assert msg == "Are you sure you want to delete 'item2'?" + + # Check that previewing other item is impossible while dialog is open. + execute_in_page('list_ctx.ul.children[0].click();') + assert dialog_container.is_displayed() + assert 'list_disabled' in ul.get_attribute('class') + assert not preview_container.is_displayed() + + # Check that queuing multiple removal confirmation dialogs is impossible. + execute_in_page('list_ctx.remove_but.click();') + + # Check that answering "No" causes the item not to be removed and unhides + # item preview. + execute_in_page('list_ctx.dialog_ctx.no_but.click();') + WebDriverWait(driver, 10).until(lambda _: preview_container.is_displayed()) + assert not dialog_container.is_displayed() + assert 'list_disabled' not in ul.get_attribute('class') + assert execute_in_page('returnval(list_ctx.ul.children.length);') == 3 + + # Check that item removal works properly. + def remove_current_item(): + execute_in_page('list_ctx.remove_but.click();') + WebDriverWait(driver, 10)\ + .until(lambda _: dialog_container.is_displayed()) + execute_in_page('list_ctx.dialog_ctx.yes_but.click();') + + remove_current_item() + + def item_deleted(driver): + return execute_in_page('returnval(list_ctx.ul.children.length);') == 2 + WebDriverWait(driver, 10).until(item_deleted) + assert not dialog_container.is_displayed() + assert not preview_container.is_displayed() + assert 'list_disabled' not in ul.get_attribute('class') + + execute_in_page('list_ctx.ul.children[1].click();') + + # Check that item removal failure causes the right error dialog to appear. + execute_in_page('haketilodb.finalize_transaction = () => {throw "sth";};') + remove_current_item() + WebDriverWait(driver, 10).until(lambda _: dialog_container.is_displayed()) + msg = execute_in_page('returnval(list_ctx.dialog_ctx.msg.textContent);') + assert msg == "Couldn't remove 'item1' :(" + + # Destroy item list. + assert True == execute_in_page( + ''' + const main_div = list_ctx.main_div; + destroy_list(list_ctx); + returnval(main_div.parentElement === null); + ''') diff --git a/test/haketilo_test/unit/test_item_preview.py b/test/haketilo_test/unit/test_item_preview.py new file mode 100644 index 0000000..fe9a98e --- /dev/null +++ b/test/haketilo_test/unit/test_item_preview.py @@ -0,0 +1,208 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - displaying resources and mappings details +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022, Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +from selenium.webdriver.support.ui import WebDriverWait +from selenium.common.exceptions import NoSuchWindowException + +from ..extension_crafting import ExtraHTML +from ..script_loader import load_script +from .utils import * + +@pytest.mark.ext_data({ + 'extra_html': ExtraHTML('html/item_preview.html', {}), + 'navigate_to': 'html/item_preview.html' +}) +@pytest.mark.usefixtures('webextension') +def test_resource_preview(driver, execute_in_page): + """ + A test case of the resource preview display function. + """ + execute_in_page(load_script('html/item_preview.js')) + + sample_resource = make_sample_resource() + + preview_div = execute_in_page( + ''' + let preview_object = resource_preview(arguments[0]); + document.body.append(preview_object.main_div); + returnval(preview_object.main_div); + ''', + sample_resource) + text = preview_div.text + + assert '...' not in text + + for string in [ + *filter(lambda v: type(v) is str, sample_resource.values()), + *[rr['identifier'] for rr in sample_resource['dependencies']], + *[c['file'] for k in ('source_copyright', 'scripts') + for c in sample_resource[k]], + item_version_string(sample_resource, True) + ]: + assert string in text + + sample_resource['identifier'] = 'hellopear' + sample_resource['long_name'] = 'Hello Pear' + sample_resource['description'] = 'greets a pear' + sample_resource['dependencies'] = [{'identifier': 'hello-msg'}] + for key in ('scripts', 'source_copyright'): + for file_ref in sample_resource[key]: + file_ref['file'] = file_ref['file'].replace('.', '_') + + preview_div = execute_in_page( + ''' + returnval(resource_preview(arguments[0], preview_object).main_div); + ''', + sample_resource) + text = preview_div.text + + for string in ['...', 'pple', 'hello-message', 'report.spdx', + 'LICENSES/CC0-1.0.txt', 'hello.js', 'bye.js']: + assert string not in text + + for string in ['hellopear', 'Hello Pear', 'hello-msg', 'greets a pear', + 'report_spdx', 'LICENSES/CC0-1_0_txt', 'hello_js', 'bye_js']: + assert string in text + +@pytest.mark.ext_data({ + 'extra_html': ExtraHTML('html/item_preview.html', {}), + 'navigate_to': 'html/item_preview.html' +}) +@pytest.mark.usefixtures('webextension') +def test_mapping_preview(driver, execute_in_page): + """ + A test case of the mapping preview display function. + """ + execute_in_page(load_script('html/item_preview.js')) + + sample_mapping = make_sample_mapping() + + preview_div = execute_in_page( + ''' + let preview_object = mapping_preview(arguments[0]); + document.body.append(preview_object.main_div); + returnval(preview_object.main_div); + ''', + sample_mapping) + text = preview_div.text + + assert '...' not in text + + for string in [ + *filter(lambda v: type(v) is str, sample_mapping.values()), + *[p['identifier'] for p in sample_mapping['payloads'].values()], + *[c['file'] for c in sample_mapping['source_copyright']], + item_version_string(sample_mapping) + ]: + assert string in text + + sample_mapping['identifier'] = 'example-org-bloated' + sample_mapping['long_name'] = 'Example.org Bloated', + sample_mapping['payloads'] = dict( + [(pat.replace('.org', '.com'), res_id) + for pat, res_id in sample_mapping['payloads'].items()] + ) + for file_ref in sample_mapping['source_copyright']: + file_ref['file'] = file_ref['file'].replace('.', '_') + + preview_div = execute_in_page( + ''' + returnval(mapping_preview(arguments[0], preview_object).main_div); + ''', + sample_mapping) + text = preview_div.text + + for string in ['...', 'inimal', 'example.org', 'report.spdx', + 'LICENSES/CC0-1.0.txt']: + assert string not in text + + for string in ['example-org-bloated', 'Example.org Bloated', 'example.com', + 'report_spdx', 'LICENSES/CC0-1_0_txt']: + assert string in text + +@pytest.mark.ext_data({ + 'background_script': broker_js, + 'extra_html': [ + ExtraHTML('html/item_preview.html', {}), + ExtraHTML('html/file_preview.html', {}, wrap_into_htmldoc=False) + ], + 'navigate_to': 'html/item_preview.html' +}) +@pytest.mark.usefixtures('webextension') +def test_file_preview_link(driver, execute_in_page): + """ + A test case of <a> links created by preview functions that allow a + referenced file to be previewed. + """ + execute_in_page(load_script('html/item_preview.js')) + + sample_data = make_complete_sample_data() + sample_data['mapping'] = {} + execute_in_page('returnval(haketilodb.save_items(arguments[0]));', + sample_data) + + # Cause the "link" to `bye.js` to be invalid. + sample_resource = make_sample_resource() + sample_resource['scripts'][1]['sha256'] = 'dummy nonexistent hash' + + execute_in_page( + ''' + let resource_preview_object = resource_preview(arguments[0], undefined); + document.body.append(resource_preview_object.main_div); + ''', + sample_resource) + + window0 = driver.window_handles[0] + driver.find_element_by_link_text('hello.js').click() + + def blob_url_navigated(driver): + if len(driver.window_handles) < 2: + return + window1 = [wh for wh in driver.window_handles if wh != window0][0] + driver.switch_to.window(window1) + try: + return driver.current_url.startswith('blob') + except NoSuchWindowException: + pass + + WebDriverWait(driver, 10).until(blob_url_navigated) + + assert sample_files['hello.js']['contents'].strip() \ + in driver.find_element_by_tag_name("pre").text + + driver.close() + driver.switch_to.window(window0) + + driver.find_element_by_link_text('bye.js').click() + + def get_error_span(driver): + if len(driver.window_handles) < 2: + return + window1 = [wh for wh in driver.window_handles if wh != window0][0] + driver.switch_to.window(window1) + try: + return driver.find_element_by_id('error_msg') + except NoSuchWindowException: + pass + + error_span = WebDriverWait(driver, 10).until(get_error_span) + assert error_span.is_displayed() + assert "Couldn't find file in Haketilo's internal database :(" \ + in error_span.text diff --git a/test/haketilo_test/unit/test_patterns.py b/test/haketilo_test/unit/test_patterns.py new file mode 100644 index 0000000..f2eeaf8 --- /dev/null +++ b/test/haketilo_test/unit/test_patterns.py @@ -0,0 +1,152 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - URL patterns +""" + +# This file is part of Haketilo +# +# Copyright (C) 2021, Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest + +from ..script_loader import load_script + +@pytest.mark.get_page('https://gotmyowndoma.in') +def test_regexes(execute_in_page): + """ + patterns.js contains regexes used for URL parsing. + Verify they work properly. + """ + execute_in_page(load_script('common/patterns.js')) + + valid_url = 'https://example.com/a/b?ver=1.2.3#heading2' + valid_url_rest = 'example.com/a/b?ver=1.2.3#heading2' + + # Test matching of URL protocol. + match = execute_in_page('returnval(proto_regex.exec(arguments[0]));', + valid_url) + assert match + assert match[1] == 'https' + assert match[2] == valid_url_rest + + match = execute_in_page('returnval(proto_regex.exec(arguments[0]));', + '://bad-url.missing/protocol') + assert match is None + + # Test matching of http(s) URLs. + match = execute_in_page('returnval(http_regex.exec(arguments[0]));', + valid_url_rest) + assert match + assert match[1] == 'example.com' + assert match[2] == '/a/b' + assert match[3] == '?ver=1.2.3' + + match = execute_in_page('returnval(http_regex.exec(arguments[0]));', + 'another.example.com') + assert match + assert match[1] == 'another.example.com' + assert match[2] == '' + assert match[3] == '' + + match = execute_in_page('returnval(http_regex.exec(arguments[0]));', + '/bad/http/example') + assert match == None + + # Test matching of file URLs. + match = execute_in_page('returnval(file_regex.exec(arguments[0]));', + '/good/file/example') + assert match + assert match[1] == '/good/file/example' + + # Test matching of ftp URLs. + match = execute_in_page('returnval(ftp_regex.exec(arguments[0]));', + 'example.com/a/b#heading2') + assert match + assert match[1] is None + assert match[2] == 'example.com' + assert match[3] == '/a/b' + + match = execute_in_page('returnval(ftp_regex.exec(arguments[0]));', + 'some_user@localhost') + assert match + assert match[1] == 'some_user@' + assert match[2] == 'localhost' + assert match[3] == '' + + match = execute_in_page('returnval(ftp_regex.exec(arguments[0]));', + '@bad.url/') + assert match is None + +@pytest.mark.get_page('https://gotmyowndoma.in') +def test_deconstruct_url(execute_in_page): + """ + patterns.js contains deconstruct_url() function that handles URL parsing. + Verify it works properly. + """ + execute_in_page(load_script('common/patterns.js')) + + deco = execute_in_page('returnval(deconstruct_url(arguments[0]));', + 'https://eXaMpLe.com/a/b?ver=1.2.3#heading2') + assert deco + assert deco['trailing_slash'] == False + assert deco['proto'] == 'https' + assert deco['domain'] == ['example', 'com'] + assert deco['path'] == ['a', 'b'] + + deco = execute_in_page('returnval(deconstruct_url(arguments[0]));', + 'http://**.example.com/') + assert deco + assert deco['trailing_slash'] == True + assert deco['proto'] == 'http' + assert deco['domain'] == ['**', 'example', 'com'] + assert deco['path'] == [] + + deco = execute_in_page('returnval(deconstruct_url(arguments[0]));', + 'ftp://user@ftp.example.com/all///passwords.txt/') + assert deco + assert deco['trailing_slash'] == True + assert deco['proto'] == 'ftp' + assert deco['domain'] == ['ftp', 'example', 'com'] + assert deco['path'] == ['all', 'passwords.txt'] + + deco = execute_in_page('returnval(deconstruct_url(arguments[0]));', + 'ftp://mirror.edu.pl.eu.org') + assert deco + assert deco['trailing_slash'] == False + assert deco['proto'] == 'ftp' + assert deco['domain'] == ['mirror', 'edu', 'pl', 'eu', 'org'] + assert deco['path'] == [] + + deco = execute_in_page('returnval(deconstruct_url(arguments[0]));', + 'file:///mnt/parabola_chroot///etc/passwd') + assert deco + assert deco['trailing_slash'] == False + assert deco['proto'] == 'file' + assert deco['path'] == ['mnt', 'parabola_chroot', 'etc', 'passwd'] + assert 'domain' not in deco + + for bad_url in [ + '://bad-url.missing/protocol', + 'http:/example.com/a/b', + 'unknown://example.com/a/b', + 'idontfancypineapple', + 'ftp://@example.org/', + 'https:///some/path/', + 'file://non-absolute/path' + ]: + with pytest.raises(Exception, match=r'Error in injected script'): + deco = execute_in_page('returnval(deconstruct_url(arguments[0]));', + bad_url) + + # at some point we might also consider testing url deconstruction with + # length limits... diff --git a/test/haketilo_test/unit/test_patterns_query_manager.py b/test/haketilo_test/unit/test_patterns_query_manager.py new file mode 100644 index 0000000..9fbc438 --- /dev/null +++ b/test/haketilo_test/unit/test_patterns_query_manager.py @@ -0,0 +1,307 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - building pattern tree and putting it in a content script +""" + +# This file is part of Haketilo +# +# Copyright (C) 2021,2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +import json +from selenium.webdriver.support.ui import WebDriverWait +from selenium.common.exceptions import TimeoutException + +from ..script_loader import load_script + +def simple_sample_mapping(patterns, fruit): + if type(patterns) is not list: + patterns = [patterns] + payloads = dict([(p, {'identifier': f'{fruit}-{p}'}) for p in patterns]) + return { + 'source_copyright': [], + 'type': 'mapping', + 'identifier': f'inject-{fruit}', + 'payloads': payloads + } + +def get_content_script_values(driver, content_script): + """ + Allow easy extraction of 'this.something = ...' values from generated + content script and verify the content script is syntactically correct. + """ + return driver.execute_script( + ''' + function value_holder() { + %s; + return this; + } + return value_holder.call({}); + ''' % content_script) + +# Fields that are not relevant for testing are omitted from these mapping +# definitions. +sample_mappings = [simple_sample_mapping(pats, fruit) for pats, fruit in [ + (['https://gotmyowndoma.in/index.html', + 'http://gotmyowndoma.in/index.html'], 'banana'), + (['https://***.gotmyowndoma.in/index.html', + 'https://**.gotmyowndoma.in/index.html', + 'https://*.gotmyowndoma.in/index.html', + 'https://gotmyowndoma.in/index.html'], 'orange'), + ('https://gotmyowndoma.in/index.html/***', 'grape'), + ('http://gotmyowndoma.in/index.html/***', 'melon'), + ('https://gotmyowndoma.in/index.html', 'peach'), + ('https://gotmyowndoma.in/*', 'pear'), + ('https://gotmyowndoma.in/**', 'raspberry'), + ('https://gotmyowndoma.in/***', 'strawberry'), + ('https://***.gotmyowndoma.in/index.html', 'apple'), + ('https://***.gotmyowndoma.in/*', 'avocado'), + ('https://***.gotmyowndoma.in/**', 'papaya'), + ('https://***.gotmyowndoma.in/***', 'kiwi') +]] + +sample_blocking = [f'http{s}://{dw}gotmyown%sdoma.in{i}{pw}' + for dw in ('', '***.', '**.', '*.') + for i in ('/index.html', '') + for pw in ('', '/', '/*') + for s in ('', 's')] +sample_blocking = [{'pattern': pattern % (i if i > 1 else ''), + 'allow': bool(i & 1)} + for i, pattern in enumerate(sample_blocking)] + +# Even though patterns_query_manager.js is normally meant to run from background +# page, some tests can be as well performed running it from a normal page. +@pytest.mark.get_page('https://gotmyowndoma.in') +def test_pqm_tree_building(driver, execute_in_page): + """ + patterns_query_manager.js tracks Haketilo's internal database and builds a + constantly-updated pattern tree based on its contents. Mock the database and + verify tree building works properly. + """ + execute_in_page(load_script('background/patterns_query_manager.js')) + # Mock IndexedDB and build patterns tree. + execute_in_page( + ''' + const [initial_mappings, initial_blocking] = arguments.slice(0, 2); + let mappingchange, blockingchange, settingchange; + + haketilodb.track.mapping = function (cb) { + mappingchange = cb; + + return [{}, initial_mappings]; + } + haketilodb.track.blocking = function (cb) { + blockingchange = cb; + + return [{}, initial_blocking]; + } + haketilodb.track.setting = function (cb) { + settingchange = cb; + + return [{}, [{name: "default_allow", value: true}]]; + } + + let last_script; + let unregister_called = 0; + async function register_mock(injection) + { + await new Promise(resolve => setTimeout(resolve, 1)); + last_script = injection.js[0].code; + return {unregister: () => unregister_called++}; + } + browser = {contentScripts: {register: register_mock}}; + + returnval(start("abracadabra")); + ''', + sample_mappings[0:2], sample_blocking[0:2]) + + found, tree, content_script, deregistrations = execute_in_page( + ''' + returnval([pqt.search(tree, arguments[0]).next().value, + tree, last_script, unregister_called]); + ''', + 'https://gotmyowndoma.in/index.html') + best_pattern = 'https://gotmyowndoma.in/index.html' + assert found == \ + dict([('~allow', 1), + *[(f'inject-{fruit}', {'identifier': f'{fruit}-{best_pattern}'}) + for fruit in ('banana', 'orange')]]) + cs_values = get_content_script_values(driver, content_script) + assert cs_values['haketilo_secret'] == 'abracadabra' + assert cs_values['haketilo_pattern_tree'] == tree + assert cs_values['haketilo_default_allow'] == True + assert deregistrations == 0 + + def condition_all_added(driver): + last_script = execute_in_page('returnval(last_script);') + cs_values = get_content_script_values(driver, last_script) + nums = [i for i in range(len(sample_blocking)) if i > 1] + return (cs_values['haketilo_default_allow'] == False and + all([('gotmyown%sdoma' % i) in last_script for i in nums]) and + all([m['identifier'] in last_script for m in sample_mappings])) + + execute_in_page( + '''{ + const new_setting_val = {name: "default_allow", value: false}; + settingchange({key: "default_allow", new_val: new_setting_val}); + for (const mapping of arguments[0]) + mappingchange({key: mapping.identifier, new_val: mapping}); + for (const blocking of arguments[1]) + blockingchange({key: blocking.pattern, new_val: blocking}); + }''', + sample_mappings[2:], sample_blocking[2:]) + WebDriverWait(driver, 10).until(condition_all_added) + + odd_mappings = \ + [m['identifier'] for i, m in enumerate(sample_mappings) if i & 1] + odd_blocking = \ + [b['pattern'] for i, b in enumerate(sample_blocking) if i & 1] + even_mappings = \ + [m['identifier'] for i, m in enumerate(sample_mappings) if 1 - i & 1] + even_blocking = \ + [b['pattern'] for i, b in enumerate(sample_blocking) if 1 - i & 1] + + def condition_odd_removed(driver): + last_script = execute_in_page('returnval(last_script);') + nums = [i for i in range(len(sample_blocking)) if i > 1 and 1 - i & 1] + return (all([id not in last_script for id in odd_mappings]) and + all([id in last_script for id in even_mappings]) and + all([p not in last_script for p in odd_blocking[1:]]) and + all([('gotmyown%sdoma' % i) in last_script for i in nums])) + + def condition_all_removed(driver): + content_script = execute_in_page('returnval(last_script);') + cs_values = get_content_script_values(driver, content_script) + return cs_values['haketilo_pattern_tree'] == {} + + execute_in_page( + ''' + arguments[0].forEach(identifier => mappingchange({key: identifier})); + arguments[1].forEach(pattern => blockingchange({key: pattern})); + ''', + odd_mappings, odd_blocking) + + WebDriverWait(driver, 10).until(condition_odd_removed) + + execute_in_page( + ''' + arguments[0].forEach(identifier => mappingchange({key: identifier})); + arguments[1].forEach(pattern => blockingchange({key: pattern})); + ''', + even_mappings, even_blocking) + + WebDriverWait(driver, 10).until(condition_all_removed) + + def condition_default_allowed_again(driver): + content_script = execute_in_page('returnval(last_script);') + cs_values = get_content_script_values(driver, content_script) + return cs_values['haketilo_default_allow'] == True + + execute_in_page( + '''{ + const new_setting_val = {name: "default_allow", value: true}; + settingchange({key: "default_allow", new_val: new_setting_val}); + }''') + + WebDriverWait(driver, 10).until(condition_default_allowed_again) + +content_js = ''' +let already_run = false; +this.haketilo_content_script_main = function() { + if (already_run) + return; + already_run = true; + document.documentElement.innerHTML = "<body><div id='tree-json'>"; + document.getElementById("tree-json").innerText = + JSON.stringify(this.haketilo_pattern_tree); +} +if (this.haketilo_pattern_tree !== undefined) + this.haketilo_content_script_main(); +''' + +def background_js(): + pqm_js = load_script('background/patterns_query_manager.js', + "#IMPORT background/broadcast_broker.js") + return pqm_js + '; broadcast_broker.start(); start();' + +@pytest.mark.ext_data({ + 'content_script': content_js, + 'background_script': background_js +}) +@pytest.mark.usefixtures('webextension') +def test_pqm_script_injection(driver, execute_in_page): + # Let's open a normal page in a second window. Window 0 will be used to make + # changes to IndexedDB and window 1 to test the working of content scripts. + driver.execute_script('window.open("about:blank", "_blank");') + WebDriverWait(driver, 10).until(lambda d: len(d.window_handles) == 2) + windows = [*driver.window_handles] + + def get_tree_json(driver): + return driver.execute_script( + ''' + return (document.getElementById("tree-json") || {}).innerText; + ''') + + def run_content_script(): + driver.switch_to.window(windows[1]) + driver.get('https://gotmyowndoma.in/index.html') + windows[1] = driver.current_window_handle + try: + return WebDriverWait(driver, 10).until(get_tree_json) + except TimeoutException: + pass + + for attempt in range(2): + json_txt = run_content_script() + if json_txt and json.loads(json_txt) == {}: + break; + assert attempt != 1 + + driver.switch_to.window(windows[0]) + execute_in_page(load_script('common/indexeddb.js')) + + sample_data = { + 'mapping': dict([(sm['identifier'], {'1.0': sm}) + for sm in sample_mappings]), + 'resource': {}, + 'file': {} + } + execute_in_page('returnval(save_items(arguments[0]));', sample_data) + + for attempt in range(2): + tree_json = run_content_script() or '{}' + json.loads(tree_json) + if all([m['identifier'] in tree_json for m in sample_mappings]): + break + assert attempt != 1 + + driver.switch_to.window(windows[0]) + execute_in_page( + '''{ + const identifiers = arguments[0]; + async function remove_items() + { + const ctx = await start_items_transaction(["mapping"], {}); + for (const id of identifiers) + await remove_mapping(id, ctx); + await finalize_transaction(ctx); + } + returnval(remove_items()); + }''', + [sm['identifier'] for sm in sample_mappings]) + + for attempt in range(2): + json_txt = run_content_script() + if json_txt and json.loads(json_txt) == {}: + break; + assert attempt != 1 diff --git a/test/haketilo_test/unit/test_patterns_query_tree.py b/test/haketilo_test/unit/test_patterns_query_tree.py new file mode 100644 index 0000000..80bf554 --- /dev/null +++ b/test/haketilo_test/unit/test_patterns_query_tree.py @@ -0,0 +1,474 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - URL patterns +""" + +# This file is part of Haketilo +# +# Copyright (C) 2021, Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest + +from ..script_loader import load_script + +@pytest.mark.get_page('https://gotmyowndoma.in') +def test_modify_branch(execute_in_page): + """ + patterns_query_tree.js contains Pattern Tree data structure that allows + arrays of string labels to be mapped to items. + Verify operations modifying a single branch of such tree work properly. + """ + execute_in_page(load_script('common/patterns_query_tree.js')) + execute_in_page( + ''' + let items_added; + let items_removed; + + function _item_adder(item, array) + { + items_added++; + return [...(array || []), item]; + } + + function item_adder(item) + { + items_added = 0; + return array => _item_adder(item, array); + } + + function _item_remover(array) + { + if (array !== null) { + items_removed++; + array.pop(); + } + return (array && array.length > 0) ? array : null; + } + + function item_remover() + { + items_removed = 0; + return _item_remover; + }''') + + # Let's construct some tree branch while checking that each addition gives + # the right result. + branch = execute_in_page( + '''{ + const branch = empty_node(); + modify_sequence(branch, ['com', 'example'], item_adder('some_item')); + returnval(branch); + }''') + assert branch == { + 'literal_match': None, + 'wildcard_matches': [None, None, None], + 'children': { + 'com': { + 'literal_match': None, + 'wildcard_matches': [None, None, None], + 'children': { + 'example': { + 'literal_match': ['some_item'], + 'wildcard_matches': [None, None, None], + 'children': { + } + } + } + } + } + } + + branch, items_added = execute_in_page( + '''{ + const branch = arguments[0]; + modify_sequence(branch, ['com', 'example'], item_adder('other_item')); + returnval([branch, items_added]); + }''', branch) + assert items_added == 1 + assert branch['children']['com']['children']['example']['literal_match'] \ + == ['some_item', 'other_item'] + + for i in range(3): + for expected_array in [['third_item'], ['third_item', '4th_item']]: + wildcard = '*' * (i + 1) + branch, items_added = execute_in_page( + '''{ + const branch = arguments[0]; + modify_sequence(branch, ['com', 'sample', arguments[1]], + item_adder(arguments[2])); + returnval([branch, items_added]); + }''', + branch, wildcard, expected_array[-1]) + assert items_added == 2 + sample = branch['children']['com']['children']['sample'] + assert sample['wildcard_matches'][i] == expected_array + assert sample['children'][wildcard]['literal_match'] \ + == expected_array + + branch, items_added = execute_in_page( + '''{ + const branch = arguments[0]; + modify_sequence(branch, ['org', 'koszko', '***', '123'], + item_adder('5th_item')); + returnval([branch, items_added]); + }''', + branch) + assert items_added == 1 + assert branch['children']['org']['children']['koszko']['children']['***']\ + ['children']['123']['literal_match'] == ['5th_item'] + + # Let's verify that removing a nonexistent element doesn't modify the tree. + branch2, items_removed = execute_in_page( + '''{ + const branch = arguments[0]; + modify_sequence(branch, ['com', 'not', 'registered', '*'], + item_remover()); + returnval([branch, items_removed]); + }''', + branch) + assert branch == branch2 + assert items_removed == 0 + + # Let's remove all elements in the tree branch while checking that each + # removal gives the right result. + branch, items_removed = execute_in_page( + '''{ + const branch = arguments[0]; + modify_sequence(branch, ['org', 'koszko', '***', '123'], + item_remover()); + returnval([branch, items_removed]); + }''', + branch) + assert items_removed == 1 + assert 'org' not in branch['children'] + + for i in range(3): + for expected_array in [['third_item'], None]: + wildcard = '*' * (i + 1) + branch, items_removed = execute_in_page( + '''{ + const branch = arguments[0]; + modify_sequence(branch, ['com', 'sample', arguments[1]], + item_remover()); + returnval([branch, items_removed]); + }''', + branch, wildcard) + assert items_removed == 2 + if i == 2 and expected_array == []: + break + sample = branch['children']['com']['children'].get('sample', {}) + assert sample.get('wildcard_matches', [None, None, None])[i] \ + == expected_array + assert sample.get('children', {}).get(wildcard, {})\ + .get('literal_match') == expected_array + + for i in range(2): + branch, items_removed = execute_in_page( + '''{ + const branch = arguments[0]; + modify_sequence(branch, ['com', 'example'], item_remover()); + returnval([branch, items_removed]); + }''', + branch) + assert items_removed == 1 + if i == 0: + assert branch['children']['com']['children']['example']\ + ['literal_match'] == ['some_item'] + else: + assert branch == { + 'literal_match': None, + 'wildcard_matches': [None, None, None], + 'children': { + } + } + +@pytest.mark.get_page('https://gotmyowndoma.in') +def test_search_branch(execute_in_page): + """ + patterns_query_tree.js contains Pattern Tree data structure that allows + arrays of string labels to be mapped to items. + Verify searching a single branch of such tree work properly. + """ + execute_in_page(load_script('common/patterns_query_tree.js')) + execute_in_page( + ''' + const item_adder = item => (array => [...(array || []), item]); + ''') + + # Let's construct some tree branch to test on. + execute_in_page( + ''' + var branch = empty_node(); + + for (const [item, sequence] of [ + ['(root)', []], + ['***', ['***']], + ['**', ['**']], + ['*', ['*']], + + ['a', ['a']], + ['A', ['a']], + ['b', ['b']], + + ['a/***', ['a', '***']], + ['A/***', ['a', '***']], + ['a/**', ['a', '**']], + ['A/**', ['a', '**']], + ['a/*', ['a', '*']], + ['A/*', ['a', '*']], + ['a/sth', ['a', 'sth']], + ['A/sth', ['a', 'sth']], + + ['b/***', ['b', '***']], + ['b/**', ['b', '**']], + ['b/*', ['b', '*']], + ['b/sth', ['b', 'sth']], + ]) + modify_sequence(branch, sequence, item_adder(item)); + ''') + + # Let's make the actual searches on our testing branch. + for sequence, expected in [ + ([], [{'(root)'}, {'***'}]), + (['a'], [{'a', 'A'}, {'a/***', 'A/***'}, {'*'}, {'***'}]), + (['b'], [{'b'}, {'b/***'}, {'*'}, {'***'}]), + (['c'], [ {'*'}, {'***'}]), + (['***'], [{'***'}, {'*'} ]), + (['**'], [{'**'}, {'*'}, {'***'}]), + (['**'], [{'**'}, {'*'}, {'***'}]), + (['*'], [{'*'}, {'***'}]), + + (['a', 'sth'], [{'a/sth', 'A/sth'}, {'a/*', 'A/*'}, {'a/***', 'A/***'}, {'**'}, {'***'}]), + (['b', 'sth'], [{'b/sth'}, {'b/*'}, {'b/***'}, {'**'}, {'***'}]), + (['a', 'hts'], [ {'a/*', 'A/*'}, {'a/***', 'A/***'}, {'**'}, {'***'}]), + (['b', 'hts'], [ {'b/*'}, {'b/***'}, {'**'}, {'***'}]), + (['a', '***'], [{'a/***', 'A/***'}, {'a/*', 'A/*'}, {'**'}, {'***'}]), + (['b', '***'], [{'b/***'}, {'b/*'}, {'**'}, {'***'}]), + (['a', '**'], [{'a/**', 'A/**'}, {'a/*', 'A/*'}, {'a/***', 'A/***'}, {'**'}, {'***'}]), + (['b', '**'], [{'b/**'}, {'b/*'}, {'b/***'}, {'**'}, {'***'}]), + (['a', '*'], [{'a/*', 'A/*'}, {'a/***', 'A/***'}, {'**'}, {'***'}]), + (['b', '*'], [{'b/*'}, {'b/***'}, {'**'}, {'***'}]), + + (['a', 'c', 'd'], [{'a/**', 'A/**'}, {'a/***', 'A/***'}, {'**'}, {'***'}]), + (['b', 'c', 'd'], [{'b/**'}, {'b/***'}, {'**'}, {'***'}]) + ]: + result = execute_in_page( + ''' + returnval([...search_sequence(branch, arguments[0])]); + ''', + sequence) + + try: + assert len(result) == len(expected) + + for expected_set, result_array in zip(expected, result): + assert len(expected_set) == len(result_array) + assert expected_set == set(result_array) + except Exception as e: + import sys + print('sequence:', sequence, '\nexpected:', expected, + '\nresult:', result, file=sys.stderr) + raise e from None + +@pytest.mark.get_page('https://gotmyowndoma.in') +def test_pattern_tree(execute_in_page): + """ + patterns_query_tree.js contains Pattern Tree data structure that allows + arrays of string labels to be mapped to items. + Verify operations on entire such tree work properly. + """ + execute_in_page(load_script('common/patterns_query_tree.js')) + + # Perform tests with all possible patterns for a simple URL. + url = 'https://example.com' + patterns = [ + 'https://example.com', + 'https://example.com/***', + 'https://***.example.com', + 'https://***.example.com/***' + ] + bad_patterns = [ + 'http://example.com', + 'https://a.example.com', + 'https://*.example.com', + 'https://**.example.com', + 'https://example.com/a', + 'https://example.com/*', + 'https://example.com/**', + ] + + expected = [{'key': p} for p in patterns] + + tree, result = execute_in_page( + '''{ + const tree = pattern_tree_make(); + for (const pattern of arguments[0].concat(arguments[1])) { + pattern_tree_register(tree, pattern, 'key', pattern); + pattern_tree_register(tree, pattern + '/', 'key', pattern + '/'); + } + returnval([tree, [...pattern_tree_search(tree, arguments[2])]]); + }''', + patterns, bad_patterns, url) + assert expected == result + + # Also verify that deregistering half of the good patterns works correctly. + patterns_removed = [pattern for i, pattern in enumerate(patterns) if i % 2] + patterns = [pattern for i, pattern in enumerate(patterns) if not (i % 2)] + expected = [{'key': p} for p in patterns] + tree, result = execute_in_page( + '''{ + const tree = arguments[0]; + for (const pattern of arguments[1]) { + pattern_tree_deregister(tree, pattern, 'key'); + pattern_tree_deregister(tree, pattern + '/', 'key'); + } + returnval([tree, [...pattern_tree_search(tree, arguments[2])]]); + }''', + tree, patterns_removed, url) + assert expected == result + + # Also verify that deregistering all the patterns works correctly. + tree = execute_in_page( + '''{ + const tree = arguments[0]; + for (const pattern of arguments[1].concat(arguments[2])) { + pattern_tree_deregister(tree, pattern, 'key'); + pattern_tree_deregister(tree, pattern + '/', 'key'); + } + returnval(tree); + }''', + tree, patterns, bad_patterns) + assert tree == {} + + # Perform tests with all possible patterns for a complex URL. + url = 'http://settings.query.example.com/google/tries/destroy/adblockers//' + patterns = [ + 'http://settings.query.example.com/google/tries/destroy/adblockers', + 'http://settings.query.example.com/google/tries/destroy/adblockers/***', + 'http://settings.query.example.com/google/tries/destroy/*', + 'http://settings.query.example.com/google/tries/destroy/***', + 'http://settings.query.example.com/google/tries/**', + 'http://settings.query.example.com/google/tries/***', + 'http://settings.query.example.com/google/**', + 'http://settings.query.example.com/google/***', + 'http://settings.query.example.com/**', + 'http://settings.query.example.com/***', + + 'http://***.settings.query.example.com/google/tries/destroy/adblockers', + 'http://***.settings.query.example.com/google/tries/destroy/adblockers/***', + 'http://***.settings.query.example.com/google/tries/destroy/*', + 'http://***.settings.query.example.com/google/tries/destroy/***', + 'http://***.settings.query.example.com/google/tries/**', + 'http://***.settings.query.example.com/google/tries/***', + 'http://***.settings.query.example.com/google/**', + 'http://***.settings.query.example.com/google/***', + 'http://***.settings.query.example.com/**', + 'http://***.settings.query.example.com/***', + 'http://*.query.example.com/google/tries/destroy/adblockers', + 'http://*.query.example.com/google/tries/destroy/adblockers/***', + 'http://*.query.example.com/google/tries/destroy/*', + 'http://*.query.example.com/google/tries/destroy/***', + 'http://*.query.example.com/google/tries/**', + 'http://*.query.example.com/google/tries/***', + 'http://*.query.example.com/google/**', + 'http://*.query.example.com/google/***', + 'http://*.query.example.com/**', + 'http://*.query.example.com/***', + 'http://***.query.example.com/google/tries/destroy/adblockers', + 'http://***.query.example.com/google/tries/destroy/adblockers/***', + 'http://***.query.example.com/google/tries/destroy/*', + 'http://***.query.example.com/google/tries/destroy/***', + 'http://***.query.example.com/google/tries/**', + 'http://***.query.example.com/google/tries/***', + 'http://***.query.example.com/google/**', + 'http://***.query.example.com/google/***', + 'http://***.query.example.com/**', + 'http://***.query.example.com/***', + 'http://**.example.com/google/tries/destroy/adblockers', + 'http://**.example.com/google/tries/destroy/adblockers/***', + 'http://**.example.com/google/tries/destroy/*', + 'http://**.example.com/google/tries/destroy/***', + 'http://**.example.com/google/tries/**', + 'http://**.example.com/google/tries/***', + 'http://**.example.com/google/**', + 'http://**.example.com/google/***', + 'http://**.example.com/**', + 'http://**.example.com/***', + 'http://***.example.com/google/tries/destroy/adblockers', + 'http://***.example.com/google/tries/destroy/adblockers/***', + 'http://***.example.com/google/tries/destroy/*', + 'http://***.example.com/google/tries/destroy/***', + 'http://***.example.com/google/tries/**', + 'http://***.example.com/google/tries/***', + 'http://***.example.com/google/**', + 'http://***.example.com/google/***', + 'http://***.example.com/**', + 'http://***.example.com/***' + ] + bad_patterns = [ + 'https://settings.query.example.com/google/tries/destroy/adblockers', + 'http://settings.query.example.com/google/tries/destroy/adblockers/a', + 'http://settings.query.example.com/google/tries/destroy/adblockers/*', + 'http://settings.query.example.com/google/tries/destroy/adblockers/**', + 'http://settings.query.example.com/google/tries/destroy/a', + 'http://settings.query.example.com/google/tries/destroy/**', + 'http://settings.query.example.com/google/tries/*', + 'http://a.settings.query.example.com/google/tries/destroy/adblockers', + 'http://*.settings.query.example.com/google/tries/destroy/adblockers', + 'http://**.settings.query.example.com/google/tries/destroy/adblockers', + 'http://a.query.example.com/google/tries/destroy/adblockers', + 'http://**.query.example.com/google/tries/destroy/adblockers', + 'http://*.example.com/google/tries/destroy/adblockers' + ] + + expected = [{'key': p + s} for p in patterns for s in ['/', '']] + + tree, result = execute_in_page( + '''{ + const tree = pattern_tree_make(); + for (const pattern of arguments[0].concat(arguments[1])) { + pattern_tree_register(tree, pattern, 'key', pattern); + pattern_tree_register(tree, pattern + '/', 'key', pattern + '/'); + } + returnval([tree, [...pattern_tree_search(tree, arguments[2])]]); + }''', + patterns, bad_patterns, url) + assert expected == result + + # Also verify that deregistering all patterns with trailing slash works + # correctly. + expected = [{'key': p} for p in patterns] + tree, result = execute_in_page( + '''{ + const tree = arguments[0]; + for (const pattern of arguments[1]) + pattern_tree_deregister(tree, pattern + '/', 'key'); + returnval([tree, [...pattern_tree_search(tree, arguments[2])]]); + }''', + tree, patterns, url) + assert expected == result + + # Also verify that deregistering all the patterns works correctly. + tree = execute_in_page( + '''{ + const tree = arguments[0]; + for (const pattern of arguments[1]) + pattern_tree_deregister(tree, pattern, 'key'); + for (const pattern of arguments[2]) { + pattern_tree_deregister(tree, pattern, 'key'); + pattern_tree_deregister(tree, pattern + '/', 'key'); + } + returnval(tree); + }''', + tree, patterns, bad_patterns) + assert tree == {} diff --git a/test/haketilo_test/unit/test_payload_create.py b/test/haketilo_test/unit/test_payload_create.py new file mode 100644 index 0000000..9689c37 --- /dev/null +++ b/test/haketilo_test/unit/test_payload_create.py @@ -0,0 +1,248 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - using a form to create simple site payload +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +import re +from hashlib import sha256 + +from selenium.webdriver.support.ui import WebDriverWait + +from ..extension_crafting import ExtraHTML +from ..script_loader import load_script +from .utils import * + +uuidv4_re = re.compile( + r'^[0-9A-F]{8}-[0-9A-F]{4}-4[0-9A-F]{3}-[89AB][0-9A-F]{3}-[0-9A-F]{12}$', + re.IGNORECASE +) + +sample_patterns = ''' +http://example.com/*** + +https://*.example.org/**''' + +sample_form_data = { + 'identifier': 'someid', + 'long_name': 'Some Name', + 'description': 'blah blah blah', + 'patterns': sample_patterns, + 'script': sample_files['hello.js']['contents'] +} + +def fill_form_with_sample_data(execute_in_page, sample_data_override={}, + form_ctx='form_ctx'): + form_data = sample_form_data.copy() + form_data.update(sample_data_override) + execute_in_page( + f''' + for (const [key, value] of Object.entries(arguments[0])) + {form_ctx}[key].value = value; + ''', + form_data) + return form_data + +cleared_form_inputs = { + 'identifier': '', + 'long_name': '', + 'description': '', + 'patterns': 'https://example.com/***', + 'script': 'console.log("Hello, World!");' +} +def assert_form_contents(execute_in_page, inputs=cleared_form_inputs): + inputs_keys = [*inputs.keys()] + values = execute_in_page( + 'returnval(arguments[0].map(i => form_ctx[i].value));', + inputs_keys + ) + for key, value in zip(inputs_keys, values): + assert inputs[key] == value + +@pytest.mark.ext_data({ + 'background_script': broker_js, + 'extra_html': ExtraHTML('html/payload_create.html', {}), + 'navigate_to': 'html/payload_create.html' +}) +@pytest.mark.usefixtures('webextension') +def test_payload_create_normal_usage(driver, execute_in_page): + """ + A test case of normal usage of simple payload creation form. + """ + execute_in_page(load_script('html/payload_create.js')) + + create_but, form_container, dialog_container = execute_in_page( + ''' + const form_ctx = payload_create_form(); + document.body.append(form_ctx.main_div); + returnval([form_ctx.create_but, form_ctx.form_container, + form_ctx.dialog_container]); + ''') + + assert patterns_doc_url == \ + driver.find_element_by_link_text('URL patterns').get_attribute('href') + + assert form_container.is_displayed() + assert not dialog_container.is_displayed() + + assert_form_contents(execute_in_page) + + form_data = fill_form_with_sample_data(execute_in_page) + + create_but.click() + + assert not form_container.is_displayed() + assert dialog_container.is_displayed() + + def success_reported(driver): + return 'Successfully saved payload' in dialog_container.text + + WebDriverWait(driver, 10).until(success_reported) + execute_in_page('form_ctx.dialog_ctx.ok_but.click();') + + assert form_container.is_displayed() + assert not dialog_container.is_displayed() + + def assert_db_contents(): + db_contents = get_db_contents(execute_in_page) + + assert uuidv4_re.match(db_contents['resource'][0]['uuid']) + + localid = f'local-{form_data["identifier"]}' + long_name = form_data['long_name'] or form_data['identifier'] + payloads = dict([(pat, {'identifier': localid}) + for pat in form_data['patterns'].split('\n') if pat]) + + assert db_contents['resource'] == [{ + 'source_name': localid, + 'source_copyright': [], + 'type': 'resource', + 'identifier': localid, + 'uuid': db_contents['resource'][0]['uuid'], + 'version': [1], + 'description': form_data['description'], + 'dependencies': [], + 'long_name': long_name, + 'scripts': [{ + 'file': 'payload.js', + 'sha256': sha256(form_data['script'].encode()).digest().hex() + }] + }] + + assert uuidv4_re.match(db_contents['mapping'][0]['uuid']) + assert db_contents['mapping'] == [{ + 'source_name': localid, + 'source_copyright': [], + 'type': 'mapping', + 'identifier': localid, + 'uuid': db_contents['mapping'][0]['uuid'], + 'version': [1], + 'description': form_data['description'], + 'long_name': long_name, + 'payloads': payloads + }] + + assert_db_contents() + + form_data = fill_form_with_sample_data(execute_in_page, { + 'long_name': '', + 'description': 'bam bam bam', + 'patterns': 'https://new.example.com/***', + 'script': sample_files['bye.js']['contents'] + }) + + create_but.click() + + for type in ('Resource', 'Mapping'): + def override_asked(driver): + return f"{type} 'local-someid' already exists. Override?" \ + in dialog_container.text + WebDriverWait(driver, 10).until(override_asked) + execute_in_page('form_ctx.dialog_ctx.yes_but.click();') + + assert_db_contents() + +@pytest.mark.ext_data({ + 'background_script': broker_js, + 'extra_html': ExtraHTML('html/payload_create.html', {}), + 'navigate_to': 'html/payload_create.html' +}) +@pytest.mark.usefixtures('webextension') +def test_payload_create_errors(driver, execute_in_page): + """ + A test case of various error the simple payload form might show. + """ + execute_in_page(load_script('html/payload_create.js')) + + create_but, dialog_container = execute_in_page( + ''' + const form_ctx = payload_create_form(); + document.body.append(form_ctx.main_div); + returnval([form_ctx.create_but, form_ctx.dialog_container]); + ''') + + for data_override, expected_msg in [ + ({'identifier': ''}, "The 'identifier' field is required!"), + ({'identifier': ':('}, 'Identifier may only contain '), + ({'script': ''}, "The 'script' field is required!"), + ({'patterns': ''}, "The 'URL patterns' field is required!"), + ({'patterns': ':d'}, "':d' is not a valid URL pattern. See here for more details."), + ({'patterns': '\n'.join(['http://example.com'] * 2)}, + "Pattern 'http://example.com' specified multiple times!") + ]: + # Attempt creating the payload + form_data = fill_form_with_sample_data(execute_in_page, data_override) + create_but.click() + # Verify the error message + assert expected_msg in dialog_container.text + + # Verify patterns documentation <a> link. + if expected_msg == {'patterns': ':d'}: + doc_link_elem = driver.find_element_by_link_text('here') + assert doc_link.get_attribute('href') == patterns_doc_url + + # Verify the form was NOT cleared upon failed saving. + execute_in_page('form_ctx.dialog_ctx.ok_but.click();') + assert_form_contents(execute_in_page, form_data) + + # Add a sample item and attempt overriding it. + fill_form_with_sample_data(execute_in_page) + create_but.click() + WebDriverWait(driver, 10).until(lambda _: 'Succes' in dialog_container.text) + execute_in_page('form_ctx.dialog_ctx.ok_but.click();') + + # Verify that denying override leads to saving failure. + form_data = fill_form_with_sample_data(execute_in_page) + create_but.click() + WebDriverWait(driver, 10).until(lambda _: 'Overri' in dialog_container.text) + execute_in_page('form_ctx.dialog_ctx.no_but.click();') + assert 'Failed to save payload :(' in dialog_container.text + execute_in_page('form_ctx.dialog_ctx.ok_but.click();') + assert_form_contents(execute_in_page, form_data) + + # Verify that IndexedDB errors get caught and reported as saving failures. + execute_in_page('haketilodb.get = async () => {throw "someerror";}') + form_data = fill_form_with_sample_data(execute_in_page, {'identifier': 'o'}) + create_but.click() + WebDriverWait(driver, 10).until(lambda _: 'Failed' in dialog_container.text) + execute_in_page('form_ctx.dialog_ctx.ok_but.click();') + assert_form_contents(execute_in_page, form_data) + + # Verify that the loading message gets shown during IndexedDB operations. + execute_in_page('haketilodb.get = () => new Promise(cb => null);') + create_but.click() + assert 'Saving payload...' in dialog_container.text diff --git a/test/haketilo_test/unit/test_policy_deciding.py b/test/haketilo_test/unit/test_policy_deciding.py new file mode 100644 index 0000000..75b35ac --- /dev/null +++ b/test/haketilo_test/unit/test_policy_deciding.py @@ -0,0 +1,135 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - determining what to do on a given web page +""" + +# This file is part of Haketilo +# +# Copyright (C) 2021, Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import re +from hashlib import sha256 +import pytest + +from ..script_loader import load_script + +csp_re = re.compile(r'^\S+\s+\S+;(?:\s+\S+\s+\S+;)*$') +rule_re = re.compile(r'^\s*(?P<src_kind>\S+)\s+(?P<allowed_origins>\S+)$') +def parse_csp(csp): + ''' + Parsing of CSP string into a dict. A simplified format of CSP is assumed. + ''' + assert csp_re.match(csp) + + result = {} + + for rule in csp.split(';')[:-1]: + match = rule_re.match(rule) + result[match.group('src_kind')] = match.group('allowed_origins') + + return result + +@pytest.mark.get_page('https://gotmyowndoma.in') +def test_decide_policy(execute_in_page): + """ + policy.js contains code that, using a Pattern Query Tree instance and a URL, + decides what Haketilo should do on a page opened at that URL, i.e. whether + it should block or allow script execution and whether it should inject its + own scripts and which ones. Test that the policy object gets constructed + properly. + """ + execute_in_page(load_script('common/policy.js')) + + policy = execute_in_page( + ''' + returnval(decide_policy(pqt.make(), "http://unkno.wn/", true, "abcd")); + ''') + assert policy['allow'] == True + for prop in ('mapping', 'payload', 'nonce', 'csp', 'error'): + assert prop not in policy + + policy = execute_in_page( + '''{ + const tree = pqt.make(); + pqt.register(tree, "http://kno.wn", "~allow", 1); + returnval(decide_policy(tree, "http://kno.wn/", false, "abcd")); + }''') + assert policy['allow'] == True + assert policy['mapping'] == '~allow' + for prop in ('payload', 'nonce', 'csp', 'error'): + assert prop not in policy + + policy = execute_in_page( + ''' + returnval(decide_policy(pqt.make(), "http://unkno.wn/", false, "abcd")); + ''' + ) + assert policy['allow'] == False + for prop in ('mapping', 'payload', 'nonce', 'error'): + assert prop not in policy + assert parse_csp(policy['csp']) == { + 'prefetch-src': "'none'", + 'script-src-attr': "'none'", + 'script-src': "'none'", + 'script-src-elem': "'none'" + } + + policy = execute_in_page( + '''{ + const tree = pqt.make(); + pqt.register(tree, "http://kno.wn", "~allow", 0); + returnval(decide_policy(tree, "http://kno.wn/", true, "abcd")); + }''') + assert policy['allow'] == False + assert policy['mapping'] == '~allow' + for prop in ('payload', 'nonce', 'error'): + assert prop not in policy + assert parse_csp(policy['csp']) == { + 'prefetch-src': "'none'", + 'script-src-attr': "'none'", + 'script-src': "'none'", + 'script-src-elem': "'none'" + } + + policy = execute_in_page( + '''{ + const tree = pqt.make(); + pqt.register(tree, "http://kno.wn", "m1", {identifier: "res1"}); + returnval(decide_policy(tree, "http://kno.wn/", true, "abcd")); + }''') + assert policy['allow'] == False + assert policy['mapping'] == 'm1' + assert policy['payload'] == {'identifier': 'res1'} + assert 'error' not in policy + assert policy['nonce'] == \ + sha256('m1:res1:http://kno.wn/:abcd'.encode()).digest().hex() + assert parse_csp(policy['csp']) == { + 'prefetch-src': f"'none'", + 'script-src-attr': f"'none'", + 'script-src': f"'nonce-{policy['nonce']}'", + 'script-src-elem': f"'nonce-{policy['nonce']}'" + } + + policy = execute_in_page( + 'returnval(decide_policy(pqt.make(), "<bad_url>", true, "abcd"));' + ) + assert policy['allow'] == False + assert policy['error'] == {'haketilo_error_type': 'deciding_policy'} + for prop in ('mapping', 'payload', 'nonce'): + assert prop not in policy + assert parse_csp(policy['csp']) == { + 'prefetch-src': "'none'", + 'script-src-attr': "'none'", + 'script-src': "'none'", + 'script-src-elem': "'none'" + } diff --git a/test/haketilo_test/unit/test_policy_enforcing.py b/test/haketilo_test/unit/test_policy_enforcing.py new file mode 100644 index 0000000..4b7c173 --- /dev/null +++ b/test/haketilo_test/unit/test_policy_enforcing.py @@ -0,0 +1,114 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - enforcing script blocking policy from content script +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +import json +import urllib.parse +from selenium.webdriver.support.ui import WebDriverWait + +from ..script_loader import load_script +from .utils import are_scripts_allowed + +# For simplicity, we'll use one nonce in all test cases. +nonce = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' + +allow_policy = {'allow': True} +block_policy = { + 'allow': False, + 'csp': f"prefetch-src 'none'; script-src-attr 'none'; script-src 'none'; script-src-elem 'none'; frame-src http://* https://*;" +} +payload_policy = { + 'mapping': 'somemapping', + 'payload': {'identifier': 'someresource'}, + 'csp': f"prefetch-src 'none'; script-src-attr 'none'; script-src 'nonce-{nonce}'; script-src-elem 'nonce-{nonce}';" +} + +content_script = load_script('content/policy_enforcing.js') + ''';{ +const smuggled_what_to_do = /^[^#]*#?(.*)$/.exec(document.URL)[1]; +const what_to_do = smuggled_what_to_do === "" ? {policy: {allow: true}} : + JSON.parse(decodeURIComponent(smuggled_what_to_do)); + +if (what_to_do.csp_off) { + const orig_DOMParser = window.DOMParser; + window.DOMParser = function() { + const parser = new orig_DOMParser(); + this.parseFromString = () => parser.parseFromString('', 'text/html'); + } +} + +enforce_blocking(what_to_do.policy); +}''' + +def get(driver, page, what_to_do): + driver.get(page + '#' + urllib.parse.quote(json.dumps(what_to_do))) + driver.execute_script('window.before_reload = true; location.reload();') + done = lambda _: not driver.execute_script('return window.before_reload;') + WebDriverWait(driver, 10).until(done) + +@pytest.mark.ext_data({'content_script': content_script}) +@pytest.mark.usefixtures('webextension') +# Under Mozilla we use several mechanisms of script blocking. Some serve as +# fallbacks in case others break. CSP one of those mechanisms. Here we run the +# test once with CSP blocking on and once without it. This allows us to verify +# that the CSP-less blocking approaches by themselves also work. We don't do the +# reverse (CSP on and other mechanisms off) because CSP rules added through +# <meta> injection are not reliable enough - they do not always take effect +# immediately and there's nothing we can do to fix it. +@pytest.mark.parametrize('csp_off_setting', [{}, {'csp_off': True}]) +def test_policy_enforcing_html(driver, execute_in_page, csp_off_setting): + """ + A test case of sanitizing <script>s and intrinsic javascript in pages. + """ + # First, see if scripts run when not blocked. + get(driver, 'https://gotmyowndoma.in/scripts_to_block_1.html', { + 'policy': allow_policy, + **csp_off_setting + }) + + for i in range(1, 3): + driver.find_element_by_id(f'clickme{i}').click() + + assert set(driver.execute_script('return window.__run || [];')) == \ + {'inline', 'on', 'href', 'src', 'data'} + assert are_scripts_allowed(driver) + + # Now, verify scripts don't run when blocked. + get(driver, 'https://gotmyowndoma.in/scripts_to_block_1.html', { + 'policy': block_policy, + **csp_off_setting + }) + + for i in range(1, 3): + driver.find_element_by_id(f'clickme{i}').click() + + assert set(driver.execute_script('return window.__run || [];')) == set() + assert bool(csp_off_setting) == are_scripts_allowed(driver) + + # Now, verify only scripts with nonce can run when payload is injected. + get(driver, 'https://gotmyowndoma.in/scripts_to_block_1.html', { + 'policy': payload_policy, + **csp_off_setting + }) + + for i in range(1, 3): + driver.find_element_by_id(f'clickme{i}').click() + + assert set(driver.execute_script('return window.__run || [];')) == set() + assert bool(csp_off_setting) == are_scripts_allowed(driver) + assert are_scripts_allowed(driver, nonce) diff --git a/test/haketilo_test/unit/test_popup.py b/test/haketilo_test/unit/test_popup.py new file mode 100644 index 0000000..1fc262c --- /dev/null +++ b/test/haketilo_test/unit/test_popup.py @@ -0,0 +1,257 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - repository querying +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +import json +from selenium.webdriver.support.ui import WebDriverWait + +from ..extension_crafting import ExtraHTML +from ..script_loader import load_script +from .utils import * + +unprivileged_page_info = { + 'url': 'https://example_a.com/something', + 'allow': False +} + +mapping_page_info = { + **unprivileged_page_info, + 'mapping': 'm1', + 'payload': {'identifier': 'res1'} +} + +mocked_page_infos = { + 'privileged': { + 'url': 'moz-extension://<some-id>/file.html', + 'privileged': True + }, + 'blocked_default': unprivileged_page_info, + 'allowed_default': { + **unprivileged_page_info, + 'allow': True + }, + 'blocked_rule': { + **unprivileged_page_info, + 'mapping': '~allow' + }, + 'allowed_rule': { + **unprivileged_page_info, + 'allow': True, + 'mapping': '~allow' + }, + 'mapping': mapping_page_info, + 'error_deciding_policy': { + **mapping_page_info, + 'error': {'haketilo_error_type': 'deciding_policy'} + }, + 'error_missing': { + **mapping_page_info, + 'error': {'haketilo_error_type': 'missing', 'id': 'some-missing-res'} + }, + 'error_circular': { + **mapping_page_info, + 'error': {'haketilo_error_type': 'circular', 'id': 'some-circular-res'} + }, + 'error_db': { + **mapping_page_info, + 'error': {'haketilo_error_type': 'db'} + }, + 'error_other': { + **mapping_page_info, + 'error': {'haketilo_error_type': 'other'} + } +} + +tab_mock_js = ''' +; +const mocked_page_info = (%s)[/#mock_page_info-(.*)$/.exec(document.URL)[1]]; +browser.tabs.sendMessage = async function(tab_id, msg) { + const this_tab_id = (await browser.tabs.getCurrent()).id; + if (tab_id !== this_tab_id) + throw `not current tab id (${tab_id} instead of ${this_tab_id})`; + + if (msg[0] === "page_info") { + return mocked_page_info; + } else if (msg[0] === "repo_query") { + const response = await fetch(msg[1]); + if (!response) + return {error: "Something happened :o"}; + + const result = {ok: response.ok, status: response.status}; + try { + result.json = await response.json(); + } catch(e) { + result.error_json = "" + e; + } + return result; + } else { + throw `bad sendMessage message type: '${msg[0]}'`; + } +} + +const old_tabs_query = browser.tabs.query; +browser.tabs.query = async function(query) { + const tabs = await old_tabs_query(query); + tabs.forEach(t => t.url = mocked_page_info.url); + return tabs; +} +''' % json.dumps(mocked_page_infos) + +popup_ext_data = { + 'background_script': broker_js, + 'extra_html': ExtraHTML( + 'html/popup.html', + { + 'common/browser.js': tab_mock_js, + 'common/indexeddb.js': '; set_repo("https://hydril.la/");' + }, + wrap_into_htmldoc=False + ), + 'navigate_to': 'html/popup.html' +} + +@pytest.mark.ext_data(popup_ext_data) +@pytest.mark.usefixtures('webextension') +@pytest.mark.parametrize('page_info_key', ['', *mocked_page_infos.keys()]) +def test_popup_display(driver, execute_in_page, page_info_key): + """ + Test popup viewing while on a page. Test parametrized with different + possible values of page_info object passed in message from the content + script. + """ + initial_url = driver.current_url + driver.get('about:blank') + driver.get(f'{initial_url}#mock_page_info-{page_info_key}') + + by_id = driver.execute_script( + ''' + const nodes = [...document.querySelectorAll("[id]")]; + const reductor = (ob, node) => Object.assign(ob, {[node.id]: node}); + return nodes.reduce(reductor, {}); + ''') + + if page_info_key == '': + error_msg = 'Page info not avaialable. Try reloading the page.' + error_msg_shown = lambda d: by_id['loading_info'].text == error_msg + WebDriverWait(driver, 10).until(error_msg_shown) + return + + WebDriverWait(driver, 10).until(lambda d: by_id['info_form'].is_displayed()) + assert (page_info_key == 'privileged') == \ + by_id['privileged_page_info'].is_displayed() + assert (page_info_key == 'privileged') ^ \ + by_id['unprivileged_page_info'].is_displayed() + assert by_id['page_url'].text == mocked_page_infos[page_info_key]['url'] + assert not by_id['repo_query_container'].is_displayed() + + if 'allow' in page_info_key: + assert by_id['scripts_blocked'].text.lower() == 'no' + elif page_info_key != 'privileged': + assert by_id['scripts_blocked'].text.lower() == 'yes' + + payload_text = by_id['injected_payload'].text + if page_info_key == 'mapping': + assert payload_text == 'res1' + elif page_info_key == 'error_missing': + assert payload_text == \ + "None (error: resource with id 'some-missing-res' missing from the database)" + elif page_info_key == 'error_circular': + assert payload_text == \ + "None (error: circular dependency of resource with id 'some-circular-res' on itself)" + elif page_info_key == 'error_db': + assert payload_text == \ + 'None (error: failure reading Haketilo internal database)' + elif page_info_key == 'error_other': + assert payload_text == \ + 'None (error: unknown failure occured)' + elif page_info_key != 'privileged': + assert payload_text == 'None' + + mapping_text = by_id['mapping_used'].text + + if page_info_key == 'error_deciding_policy': + assert mapping_text == 'None (error occured when determining policy)' + elif page_info_key == 'mapping' or page_info_key.startswith('error'): + assert mapping_text == 'm1' + + if 'allowed' in page_info_key: + assert 'None (scripts allowed by' in mapping_text + elif 'blocked' in page_info_key: + assert 'None (scripts blocked by' in mapping_text + + if 'rule' in page_info_key: + assert 'by a rule)' in mapping_text + elif 'default' in page_info_key: + assert 'by default policy)' in mapping_text + +@pytest.mark.ext_data(popup_ext_data) +@pytest.mark.usefixtures('webextension') +def test_popup_repo_query(driver, execute_in_page): + """ + Test opening and closing the repo query view in popup. + """ + initial_url = driver.current_url + driver.get('about:blank') + driver.get(f'{initial_url}#mock_page_info-blocked_rule') + + search_but = driver.find_element_by_id("search_resources_but") + WebDriverWait(driver, 10).until(lambda d: search_but.is_displayed()) + search_but.click() + + containers = dict([(name, driver.find_element_by_id(f'{name}_container')) + for name in ('page_info', 'repo_query')]) + assert not containers['page_info'].is_displayed() + assert containers['repo_query'].is_displayed() + shown = lambda d: 'https://hydril.la/' in containers['repo_query'].text + WebDriverWait(driver, 10).until(shown) + + # Click the "Show results" button. + selector = '.repo_query_buttons > button:first-child' + driver.find_element_by_css_selector(selector).click() + shown = lambda d: 'MAPPING_A' in containers['repo_query'].text + WebDriverWait(driver, 10).until(shown) + + # Click the "Cancel" button + selector = '.repo_query_bottom_buttons > button' + driver.find_element_by_css_selector(selector).click() + assert containers['page_info'].is_displayed() + assert not containers['repo_query'].is_displayed() + +@pytest.mark.ext_data(popup_ext_data) +@pytest.mark.usefixtures('webextension') +# Under Parabola's Iceweasel 75 the settings page's window opened during this +# test is impossible to close using driver.close() - it raises an exception with +# message 'closeTab() not supported in iceweasel'. To avoid such error during +# test cleanup, we use the mark below to tell our driver fixture to span a +# separate browser instance for this test. +@pytest.mark.second_driver() +def test_popup_settings_opening(driver, execute_in_page): + """ + Test opening the settings page from popup through button click. + """ + driver.find_element_by_id("settings_but").click() + + first_handle = driver.current_window_handle + WebDriverWait(driver, 10).until(lambda d: len(d.window_handles) == 2) + new_handle = [h for h in driver.window_handles if h != first_handle][0] + + driver.switch_to.window(new_handle) + driver.implicitly_wait(10) + assert "Extension's options page for testing" in \ + driver.find_element_by_tag_name("h1").text diff --git a/test/haketilo_test/unit/test_repo_query.py b/test/haketilo_test/unit/test_repo_query.py new file mode 100644 index 0000000..c8c4875 --- /dev/null +++ b/test/haketilo_test/unit/test_repo_query.py @@ -0,0 +1,274 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - repository querying +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +from selenium.webdriver.support.ui import WebDriverWait + +from ..extension_crafting import ExtraHTML +from ..script_loader import load_script +from .utils import * + +repo_urls = [f'https://hydril.la/{s}' for s in ('', '1/', '2/', '3/', '4/')] + +queried_url = 'https://example_a.com/something' + +def setup_view(execute_in_page, repo_urls): + mock_cacher(execute_in_page) + + execute_in_page(load_script('html/repo_query.js')) + execute_in_page( + ''' + const repo_proms = arguments[0].map(url => haketilodb.set_repo(url)); + + const cb_calls = []; + const view = new RepoQueryView(0, + () => cb_calls.push("show"), + () => cb_calls.push("hide")); + document.body.append(view.main_div); + const shw = slice => [cb_calls.slice(slice || 0), view.shown]; + + returnval(Promise.all(repo_proms)); + ''', + repo_urls) + +repo_query_ext_data = { + 'background_script': broker_js, + 'extra_html': ExtraHTML('html/repo_query.html', {}), + 'navigate_to': 'html/repo_query.html' +} + +@pytest.mark.ext_data(repo_query_ext_data) +@pytest.mark.usefixtures('webextension') +def test_repo_query_normal_usage(driver, execute_in_page): + """ + Test of using the repo query view to browse results from repository and to + start installation. + """ + setup_view(execute_in_page, repo_urls) + + assert execute_in_page('returnval(shw());') == [[], False] + + execute_in_page('view.show(arguments[0]);', queried_url) + + assert execute_in_page('returnval(shw());') == [['show'], True] + + def get_repo_entries(driver): + return execute_in_page( + f'returnval((view.repo_entries || []).map({nodes_props_code}));' + ) + + repo_entries = WebDriverWait(driver, 10).until(get_repo_entries) + + assert len(repo_urls) == len(repo_entries) + + for url, entry in reversed(list(zip(repo_urls, repo_entries))): + assert url in entry['main_li'].text + + but_ids = ('show_results_but', 'hide_results_but') + for but_idx in (0, 1, 0): + assert bool(but_idx) == entry['list_container'].is_displayed() + + assert not entry[but_ids[1 - but_idx]].is_displayed() + + entry[but_ids[but_idx]].click() + + def get_mapping_entries(driver): + return execute_in_page( + f'''{{ + const result_entries = (view.repo_entries[0].result_entries || []); + returnval(result_entries.map({nodes_props_code})); + }}''') + + mapping_entries = WebDriverWait(driver, 10).until(get_mapping_entries) + + assert len(mapping_entries) == 3 + + expected_names = ['MAPPING_ABCD', 'MAPPING_ABCD-DEFG-GHIJ', 'MAPPING_A'] + + for name, entry in zip(expected_names, mapping_entries): + assert entry['mapping_name'].text == name + assert entry['mapping_id'].text == f'{name.lower()}-2022.5.11' + + containers = execute_in_page( + '''{ + const reductor = (acc, k) => Object.assign(acc, {[k]: view[k]}); + returnval(container_ids.reduce(reductor, {})); + }''') + + for id, container in containers.items(): + assert (id == 'repos_list_container') == container.is_displayed() + + entry['install_but'].click() + + for id, container in containers.items(): + assert (id == 'install_view_container') == container.is_displayed() + + execute_in_page('returnval(view.install_view.cancel_but);').click() + + for id, container in containers.items(): + assert (id == 'repos_list_container') == container.is_displayed() + + assert execute_in_page('returnval(shw());') == [['show'], True] + execute_in_page('returnval(view.cancel_but);').click() + assert execute_in_page('returnval(shw());') == [['show', 'hide'], False] + +@pytest.mark.ext_data(repo_query_ext_data) +@pytest.mark.usefixtures('webextension') +@pytest.mark.parametrize('message', [ + 'browsing_for', + 'no_repos', + 'failure_to_communicate', + 'HTTP_code', + 'invalid_JSON', + 'newer_API_version', + 'invalid_response_format', + 'querying_repo', + 'no_results' +]) +def test_repo_query_messages(driver, execute_in_page, message): + """ + Test of loading and error messages shown in parts of the repo query view. + """ + def has_msg(message, elem=None): + def has_msg_and_is_visible(dummy_driver): + if elem: + return elem.is_displayed() and message in elem.text + else: + return message in driver.page_source + return has_msg_and_is_visible + + def show_and_wait_for_repo_entry(): + execute_in_page('view.show(arguments[0]);', queried_url) + done = lambda d: execute_in_page('returnval(!!view.repo_entries);') + WebDriverWait(driver, 10).until(done) + execute_in_page( + ''' + if (view.repo_entries.length > 0) + view.repo_entries[0].show_results_but.click(); + ''') + + if message == 'browsing_for': + setup_view(execute_in_page, []) + show_and_wait_for_repo_entry() + + elem = execute_in_page('returnval(view.url_span.parentNode);') + assert has_msg(f'Browsing custom resources for: {queried_url}', elem)(0) + elif message == 'no_repos': + setup_view(execute_in_page, []) + show_and_wait_for_repo_entry() + + elem = execute_in_page('returnval(view.repos_list);') + done = has_msg('You have no repositories configured :(', elem) + WebDriverWait(driver, 10).until(done) + elif message == 'failure_to_communicate': + setup_view(execute_in_page, repo_urls) + execute_in_page( + 'browser.tabs.sendMessage = () => Promise.resolve({error: "sth"});' + ) + show_and_wait_for_repo_entry() + + elem = execute_in_page('returnval(view.repo_entries[0].info_div);') + done = has_msg('Failure to communicate with repository :(', elem) + WebDriverWait(driver, 10).until(done) + elif message == 'HTTP_code': + setup_view(execute_in_page, repo_urls) + execute_in_page( + ''' + const response = {ok: false, status: 405}; + browser.tabs.sendMessage = () => Promise.resolve(response); + ''') + show_and_wait_for_repo_entry() + + elem = execute_in_page('returnval(view.repo_entries[0].info_div);') + done = has_msg('Repository sent HTTP code 405 :(', elem) + WebDriverWait(driver, 10).until(done) + elif message == 'invalid_JSON': + setup_view(execute_in_page, repo_urls) + execute_in_page( + ''' + const response = {ok: true, status: 200, error_json: "sth"}; + browser.tabs.sendMessage = () => Promise.resolve(response); + ''') + show_and_wait_for_repo_entry() + + elem = execute_in_page('returnval(view.repo_entries[0].info_div);') + done = has_msg("Repository's response is not valid JSON :(", elem) + WebDriverWait(driver, 10).until(done) + elif message == 'newer_API_version': + setup_view(execute_in_page, repo_urls) + execute_in_page( + ''' + const response = { + ok: true, + status: 200, + json: {$schema: "https://hydrilla.koszko.org/schemas/api_query_result-3.2.1.schema.json"} + }; + browser.tabs.sendMessage = () => Promise.resolve(response); + ''') + show_and_wait_for_repo_entry() + + elem = execute_in_page('returnval(view.repo_entries[0].info_div);') + msg = 'Results were served using unsupported Hydrilla API version. You might need to update Haketilo.' + WebDriverWait(driver, 10).until(has_msg(msg, elem)) + elif message == 'invalid_response_format': + setup_view(execute_in_page, repo_urls) + execute_in_page( + ''' + const response = { + ok: true, + status: 200, + /* $schema is not a string as it should be. */ + json: {$schema: null} + }; + browser.tabs.sendMessage = () => Promise.resolve(response); + ''') + show_and_wait_for_repo_entry() + + elem = execute_in_page('returnval(view.repo_entries[0].info_div);') + msg = 'Results were served using a nonconforming response format.' + WebDriverWait(driver, 10).until(has_msg(msg, elem)) + elif message == 'querying_repo': + setup_view(execute_in_page, repo_urls) + execute_in_page( + 'browser.tabs.sendMessage = () => new Promise(() => {});' + ) + show_and_wait_for_repo_entry() + + elem = execute_in_page('returnval(view.repo_entries[0].info_div);') + assert has_msg('Querying repository...', elem)(0) + elif message == 'no_results': + setup_view(execute_in_page, repo_urls) + execute_in_page( + ''' + const response = { + ok: true, + status: 200, + json: { + $schema: "https://hydrilla.koszko.org/schemas/api_query_result-1.schema.json", + mappings: [] + } + }; + browser.tabs.sendMessage = () => Promise.resolve(response); + ''') + show_and_wait_for_repo_entry() + + elem = execute_in_page('returnval(view.repo_entries[0].info_div);') + WebDriverWait(driver, 10).until(has_msg('No results :(', elem)) + else: + raise Exception('made a typo in test function params?') diff --git a/test/haketilo_test/unit/test_repo_query_cacher.py b/test/haketilo_test/unit/test_repo_query_cacher.py new file mode 100644 index 0000000..5fbc5cd --- /dev/null +++ b/test/haketilo_test/unit/test_repo_query_cacher.py @@ -0,0 +1,130 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - caching responses from remote repositories +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +import json +from selenium.webdriver.support.ui import WebDriverWait + +from ..script_loader import load_script + +def content_script(): + script = load_script('content/repo_query_cacher.js') + return f'{script}; {tab_id_asker}; start();' + +def bypass_js(): + return load_script('background/CORS_bypass_server.js') + '; start();' + +def fetch_through_cache(driver, tab_id, url): + return driver.execute_script( + ''' + return browser.tabs.sendMessage(arguments[0], + ["repo_query", arguments[1]]); + ''', + tab_id, url) + +""" +tab_id_responder is meant to be appended to background script of a test +extension. +""" +tab_id_responder = ''' +function tell_tab_id(msg, sender, respond_cb) { + if (msg[0] === "learn_tab_id") + respond_cb(sender.tab.id); +} +browser.runtime.onMessage.addListener(tell_tab_id); +''' + +""" +tab_id_asker is meant to be appended to content script of a test extension. +""" +tab_id_asker = ''' +browser.runtime.sendMessage(["learn_tab_id"]) + .then(tid => window.wrappedJSObject.haketilo_tab = tid); +''' + +def run_content_script_in_new_window(driver, url): + """ + Expect an extension to be loaded which had tab_id_responder and tab_id_asker + appended to its background and content scripts, respectively. + Open the provided url in a new tab, find its tab id and return it, with + current window changed back to the initial one. + """ + handle0 = driver.current_window_handle + initial_handles = [*driver.window_handles] + driver.execute_script('window.open(arguments[0], "_blank");', url) + window_added = lambda d: set(d.window_handles) != set(initial_handles) + WebDriverWait(driver, 10).until(window_added) + new_handle = [*set(driver.window_handles).difference(initial_handles)][0] + + driver.switch_to.window(new_handle) + + get_tab_id = lambda d: d.execute_script('return window.haketilo_tab;') + tab_id = WebDriverWait(driver, 10).until(get_tab_id) + + driver.switch_to.window(handle0) + return tab_id + +@pytest.mark.ext_data({ + 'content_script': content_script, + 'background_script': lambda: bypass_js() + ';' + tab_id_responder +}) +@pytest.mark.usefixtures('webextension') +def test_repo_query_cacher_normal_use(driver, execute_in_page): + """ + Test if HTTP requests made through our cacher return correct results. + """ + tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in') + + result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/') + assert set(result.keys()) == {'ok', 'status', 'json'} + counter_initial = result['json']['counter'] + assert type(counter_initial) is int + + for i in range(2): + result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/') + assert result['json']['counter'] == counter_initial + + tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in') + result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/') + assert result['json']['counter'] == counter_initial + 1 + + for i in range(2): + result = fetch_through_cache(driver, tab_id, 'https://nxdoma.in/') + assert set(result.keys()) == {'ok', 'status', 'error_json'} + assert result['ok'] == False + assert result['status'] == 404 + + for i in range(2): + result = fetch_through_cache(driver, tab_id, 'bad://url') + assert set(result.keys()) == {'error'} + +@pytest.mark.ext_data({ + 'content_script': content_script, + 'background_script': tab_id_responder +}) +@pytest.mark.usefixtures('webextension') +def test_repo_query_cacher_bgscript_error(driver): + """ + Test if our cacher properly reports errors in communication with the + background script. + """ + tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in') + + result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/') + assert set(result.keys()) == {'error'} diff --git a/test/haketilo_test/unit/test_settings.py b/test/haketilo_test/unit/test_settings.py new file mode 100644 index 0000000..7cdb76f --- /dev/null +++ b/test/haketilo_test/unit/test_settings.py @@ -0,0 +1,63 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - entire settings page +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +from .utils import * + +from ..extension_crafting import ExtraHTML +from ..script_loader import load_script +from .utils import * + +@pytest.mark.ext_data({ + 'background_script': broker_js, + 'extra_html': ExtraHTML('html/settings.html', wrap_into_htmldoc=False) +}) +@pytest.mark.usefixtures('webextension') +def test_settings_page_tabs(driver, execute_in_page): + """ + Test navigation throught the tabs of the settings page. + """ + # First, put some sample data in IndexedDB. + execute_in_page(load_script('common/indexeddb.js')) + execute_in_page( + ''' + initial_data = arguments[0]; + returnval(get_db().then(() => {})); + ''', + make_complete_sample_data()) + + # Now navigate to settings page. + testpage_url = driver.execute_script('return window.location.href;') + driver.get(testpage_url.replace('testpage.html', 'html/settings.html')) + + names = ['blocking', 'mappings', 'resources', 'new_payload', 'repos'] + tabs = dict([(n, driver.find_element_by_id(f'{n}_tab')) for n in names]) + heads = dict([(n, driver.find_element_by_id(f'{n}_head')) for n in names]) + + for i, tab_name in enumerate(['new_payload', *names]): + if (i > 0): + heads[tab_name].click() + + assert 'active_head' in heads[tab_name].get_attribute('class') + assert 'active_tab' in tabs[tab_name].get_attribute('class') + assert tabs[tab_name].is_displayed() + for tab_name_2 in [n for n in names if n != tab_name]: + assert 'active_head' not in heads[tab_name_2].get_attribute('class') + assert 'active_tab' not in tabs[tab_name_2].get_attribute('class') + assert not tabs[tab_name_2].is_displayed() diff --git a/test/haketilo_test/unit/test_text_entry_list.py b/test/haketilo_test/unit/test_text_entry_list.py new file mode 100644 index 0000000..3135a59 --- /dev/null +++ b/test/haketilo_test/unit/test_text_entry_list.py @@ -0,0 +1,387 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - list of editable entries +""" + +# This file is part of Haketilo +# +# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import pytest +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.common.keys import Keys +import inspect + +from ..extension_crafting import ExtraHTML +from ..script_loader import load_script +from .utils import * + +list_code_template = '(await blocking_allowing_lists(%%s))[%d]' +mode_parameters = [ + #add_action del_action instantiate_code + ('set_repo', 'del_repo', 'await repo_list(%s)'), + ('set_disallowed', 'set_default_allowing', list_code_template % 0), + ('set_disallowed', 'set_allowed', list_code_template % 0), + ('set_allowed', 'set_default_allowing', list_code_template % 1), + ('set_allowed', 'set_disallowed', list_code_template % 1) +] + +def instantiate_list(to_return): + instantiate_code = inspect.stack()[1].frame.f_locals['instantiate_code'] + return inspect.stack()[1].frame.f_locals['execute_in_page']( + f''' + let dialog_ctx = dialog.make(() => {{}}, () => {{}}), list; + async function make_list() {{ + list = {instantiate_code % 'dialog_ctx'}; + document.body.append(list.main_div, dialog_ctx.main_div); + return [{', '.join(to_return)}]; + }} + returnval(make_list()); + ''') + +dialog_html_append = {'html/text_entry_list.html': '#INCLUDE html/dialog.html'} +dialog_html_test_ext_data = { + 'background_script': broker_js, + 'extra_html': ExtraHTML('html/text_entry_list.html', dialog_html_append), + 'navigate_to': 'html/text_entry_list.html' +} + +@pytest.mark.ext_data(dialog_html_test_ext_data) +@pytest.mark.usefixtures('webextension') +@pytest.mark.parametrize('mode', mode_parameters) +def test_text_entry_list_ordering(driver, execute_in_page, mode): + """ + A test case of ordering of repo URLs or URL patterns in the list. + """ + add_action, del_action, instantiate_code = mode + + execute_in_page(load_script('html/text_entry_list.js')) + + endings = ['hyd/', 'hydrilla/', 'Hydrilla/', 'HYDRILLA/', + 'test/', 'test^it/', 'Test^it/', 'TEST^IT/'] + + indexes_added = set() + + for iteration, to_include in enumerate([ + set([i for i in range(len(endings)) if is_prime(i)]), + set([i for i in range(len(endings)) + if not is_prime(i) and i & 1]), + set([i for i in range(len(endings)) if i % 3 == 0]), + set([i for i in range(len(endings)) + if i % 3 and not i & 1 and not is_prime(i)]), + set(range(len(endings))) + ]): + endings_to_include = [endings[i] for i in sorted(to_include)] + urls = [f'https://example.com/{e}' for e in endings_to_include] + + def add_urls(): + execute_in_page( + '''{ + async function add_urls(urls, add_action) { + for (const url of urls) + await haketilodb[add_action](url); + } + returnval(add_urls(...arguments)); + }''', + urls, add_action) + + def wait_for_completed(wait_id): + """ + We add an extra url to IndexedDB and wait for it to appear in the + DOM list. Once this happes, we know other operations must have also + finished. + """ + url = f'https://example.org/{iteration}/{wait_id}' + execute_in_page( + ''' + returnval(haketilodb[arguments[1]](arguments[0])); + ''', + url, add_action) + WebDriverWait(driver, 10).until(lambda _: url in list_div.text) + + def assert_order(indexes_present, empty_entry_expected=False): + entries_texts = execute_in_page( + ''' + returnval([...list.list_div.children].map(n => n.textContent)); + ''') + + if empty_entry_expected: + assert 'example' not in entries_texts[0] + entries_texts.pop(0) + + for i, et in zip(sorted(indexes_present), entries_texts): + assert f'https://example.com/{endings[i]}' in et + + for et in entries_texts[len(indexes_present):]: + assert 'example.org' in et + + add_urls() + + if iteration == 0: + list_div, new_entry_but = \ + instantiate_list(['list.list_div', 'list.new_but']) + + indexes_added.update(to_include) + wait_for_completed(0) + assert_order(indexes_added) + + execute_in_page( + '''{ + async function remove_urls(urls, del_action) { + for (const url of urls) + await haketilodb[del_action](url); + } + returnval(remove_urls(...arguments)); + }''', + urls, del_action) + wait_for_completed(1) + assert_order(indexes_added.difference(to_include)) + + # On the last iteration, add a new editable entry before re-additions. + if len(to_include) == len(endings): + new_entry_but.click() + add_urls() + wait_for_completed(2) + assert_order(indexes_added, empty_entry_expected=True) + else: + add_urls() + +def active(id): + return inspect.stack()[1].frame.f_locals['execute_in_page']\ + (f'returnval(list.active_entry.{id});') +def existing(id, entry_nr=0): + return inspect.stack()[1].frame.f_locals['execute_in_page']( + ''' + returnval(list.entries_by_text.get(list.shown_texts[arguments[0]])\ + [arguments[1]]); + ''', + entry_nr, id) + +@pytest.mark.ext_data(dialog_html_test_ext_data) +@pytest.mark.usefixtures('webextension') +@pytest.mark.parametrize('mode', [mp for mp in mode_parameters + if mp[1] != 'set_default_allowing']) +def test_text_entry_list_editing(driver, execute_in_page, mode): + """ + A test case of editing entries in repo URLs list. + """ + add_action, _, instantiate_code = mode + + execute_in_page(load_script('html/text_entry_list.js')) + + execute_in_page( + ''' + let original_loader = dialog.loader, last_loader_msg; + dialog.loader = (ctx, ...msg) => { + last_loader_msg = msg; + return original_loader(ctx, ...msg); + } + ''') + last_loader_msg = lambda: execute_in_page('returnval(last_loader_msg);') + + list_div, new_entry_but = \ + instantiate_list(['list.list_div', 'list.new_but']) + + if 'allow' in add_action: + assert last_loader_msg() == ['Loading script blocking settings...'] + else: + assert last_loader_msg() == ['Loading repositories...'] + + assert execute_in_page('returnval(dialog_ctx.shown);') == False + + # Test adding new item. Submit via button click. + new_entry_but.click() + assert not active('noneditable_view').is_displayed() + assert not active('save_but').is_displayed() + assert active('add_but').is_displayed() + assert active('cancel_but').is_displayed() + active('input').send_keys('https://example.com///') + active('add_but').click() + WebDriverWait(driver, 10).until(lambda _: 'example.com' in list_div.text) + assert execute_in_page('returnval(list.list_div.children.length);') == 1 + if 'disallow' in add_action: + assert last_loader_msg() == \ + ["Blocking scripts on 'https://example.com/'..."] + elif 'allow' in add_action: + assert last_loader_msg() == \ + ["Allowing scripts on 'https://example.com/'..."] + else: + assert last_loader_msg() == \ + ["Adding repository 'https://example.com/'..."] + + assert not existing('editable_view').is_displayed() + assert existing('text').is_displayed() + assert existing('remove_but').is_displayed() + + # Test editing item. Submit via 'Enter' hit. Also test url pattern + # normalization. + existing('text').click() + assert not active('noneditable_view').is_displayed() + assert not active('add_but').is_displayed() + assert active('save_but').is_displayed() + assert active('cancel_but').is_displayed() + assert active('input.value') == 'https://example.com/' + active('input').send_keys(Keys.BACKSPACE * 30 + 'https://example.org//a//b' + + Keys.ENTER) + WebDriverWait(driver, 10).until(lambda _: 'example.org' in list_div.text) + assert execute_in_page('returnval(list.list_div.children.length);') == 1 + if 'disallow' in add_action: + assert last_loader_msg() == ['Rewriting script blocking rule...'] + elif 'allow' in add_action: + assert last_loader_msg() == ['Rewriting script allowing rule...'] + else: + assert last_loader_msg() == ['Replacing repository...'] + + # Test entry removal. + existing('remove_but').click() + WebDriverWait(driver, 10).until(lambda _: 'xample.org' not in list_div.text) + assert execute_in_page('returnval(list.list_div.children.length);') == 0 + if 'allow' in add_action: + assert last_loader_msg() == \ + ["Setting default scripts blocking policy on 'https://example.org/a/b'..."] + else: + assert last_loader_msg() == ["Removing repository 'https://example.org//a//b/'..."] + + # The rest of this test remains the same regardless of mode. No point + # testing the same thing multiple times. + if 'repo' not in add_action: + return + + # Test that clicking hidden buttons of item not being edited does nothing. + new_entry_but.click() + active('input').send_keys('https://example.foo' + Keys.ENTER) + WebDriverWait(driver, 10).until(lambda _: 'xample.foo/' in list_div.text) + existing('add_but.click()') + existing('save_but.click()') + existing('cancel_but.click()') + assert execute_in_page('returnval(dialog_ctx.shown);') == False + assert execute_in_page('returnval(list.list_div.children.length);') == 1 + assert not existing('editable_view').is_displayed() + + # Test that clicking hidden buttons of item being edited does nothing. + existing('text').click() + active('remove_but.click()') + active('add_but.click()') + assert execute_in_page('returnval(dialog_ctx.shown);') == False + assert execute_in_page('returnval(list.list_div.children.length);') == 1 + assert not active('noneditable_view').is_displayed() + + # Test that creating a new entry makes the other one noneditable again. + new_entry_but.click() + assert existing('text').is_displayed() + + # Test that clicking hidden buttons of new item entry does nothing. + active('remove_but.click()') + active('save_but.click()') + assert execute_in_page('returnval(dialog_ctx.shown);') == False + assert execute_in_page('returnval(list.list_div.children.length);') == 2 + assert not active('noneditable_view').is_displayed() + + # Test that starting edit of another entry removes the new entry. + existing('text').click() + assert existing('editable_view').is_displayed() + assert execute_in_page('returnval(list.list_div.children.length);') == 1 + + # Test that starting edit of another entry cancels edit of the first entry. + new_entry_but.click() + active('input').send_keys('https://example.net' + Keys.ENTER) + WebDriverWait(driver, 10).until(lambda _: 'example.net/' in list_div.text) + assert execute_in_page('returnval(list.list_div.children.length);') == 2 + existing('text', 0).click() + assert existing('editable_view', 0).is_displayed() + assert not existing('editable_view', 1).is_displayed() + existing('text', 1).click() + assert not existing('editable_view', 0).is_displayed() + assert existing('editable_view', 1).is_displayed() + +@pytest.mark.ext_data(dialog_html_test_ext_data) +@pytest.mark.usefixtures('webextension') +@pytest.mark.parametrize('mode', [mp for mp in mode_parameters + if mp[1] != 'set_default_allowing']) +def test_text_entry_list_errors(driver, execute_in_page, mode): + """ + A test case of error dialogs shown by repo URL list. + """ + add_action, _, instantiate_code = mode + + execute_in_page(load_script('html/text_entry_list.js')) + + to_return = ['list.list_div', 'list.new_but', 'dialog_ctx.main_div'] + list_div, new_entry_but, dialog_div = instantiate_list(to_return) + + # Prepare one entry to use later. + new_entry_but.click() + active('input').send_keys('https://example.com' + Keys.ENTER) + + # Check invalid URL errors. + for clickable in (existing('text'), new_entry_but): + clickable.click() + active('input').send_keys(Keys.BACKSPACE * 30 + 'ws://example' + + Keys.ENTER) + execute_in_page('dialog.close(dialog_ctx);') + + if 'allow' in add_action: + assert "'ws://example' is not a valid URL pattern. See here for more details." \ + in dialog_div.text + assert patterns_doc_url == \ + driver.find_element_by_link_text('here').get_attribute('href') + continue + else: + assert 'Repository URLs shoud use https:// schema.' \ + in dialog_div.text + + active('input').send_keys(Keys.BACKSPACE * 30 + 'https://example' + + Keys.ENTER) + assert 'Provided URL is not valid.' in dialog_div.text + execute_in_page('dialog.close(dialog_ctx);') + + # Mock errors to force error messages to appear. + execute_in_page( + ''' + for (const action of [ + "set_repo", "del_repo", "set_allowed", "set_default_allowing" + ]) + haketilodb[action] = () => {throw "reckless, limitless scope";}; + ''') + + # Check database error dialogs. + def check_reported_failure(txt): + fail = lambda _: txt in dialog_div.text + WebDriverWait(driver, 10).until(fail) + execute_in_page('dialog.close(dialog_ctx);') + + existing('text').click() + active('input').send_keys(Keys.BACKSPACE * 30 + 'https://example.org' + + Keys.ENTER) + if 'disallow' in add_action: + check_reported_failure('Failed to rewrite blocking rule :(') + elif 'allow' in add_action: + check_reported_failure('Failed to rewrite allowing rule :(') + else: + check_reported_failure('Failed to replace repository :(') + + active('cancel_but').click() + existing('remove_but').click() + if 'allow' in add_action: + check_reported_failure("Failed to remove rule for 'https://example.com' :(") + else: + check_reported_failure("Failed to remove repository 'https://example.com/' :(") + + new_entry_but.click() + active('input').send_keys('https://example.org' + Keys.ENTER) + if 'disallow' in add_action: + check_reported_failure("Failed to write blocking rule for 'https://example.org' :(") + elif 'allow' in add_action: + check_reported_failure("Failed to write allowing rule for 'https://example.org' :(") + else: + check_reported_failure("Failed to add repository 'https://example.org/' :(") diff --git a/test/haketilo_test/unit/test_webrequest.py b/test/haketilo_test/unit/test_webrequest.py new file mode 100644 index 0000000..fb24b3d --- /dev/null +++ b/test/haketilo_test/unit/test_webrequest.py @@ -0,0 +1,68 @@ +# SPDX-License-Identifier: CC0-1.0 + +""" +Haketilo unit tests - modifying requests using webRequest API +""" + +# This file is part of Haketilo +# +# Copyright (C) 2021, Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the CC0 1.0 Universal License as published by +# the Creative Commons Corporation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# CC0 1.0 Universal License for more details. + +import re +from hashlib import sha256 +import pytest + +from ..script_loader import load_script +from .utils import are_scripts_allowed + +def webrequest_js(): + return (load_script('background/webrequest.js', + '#IMPORT common/patterns_query_tree.js AS pqt') + + '''; + // Mock pattern tree. + tree = pqt.make(); + // Mock default allow. + default_allow = {name: "default_allow", value: true}; + + // Rule to block scripts. + pqt.register(tree, "https://site.with.scripts.block.ed/***", + "~allow", 0); + + // Rule to allow scripts, but overridden by payload assignment. + pqt.register(tree, "https://site.with.paylo.ad/***", "~allow", 1); + pqt.register(tree, "https://site.with.paylo.ad/***", + "somemapping", {identifier: "someresource"}); + + // Mock stream_filter. + stream_filter.apply = (details, headers, policy) => headers; + + // Mock secret and start webrequest operations. + start("somesecret"); + ''') + +@pytest.mark.ext_data({'background_script': webrequest_js}) +@pytest.mark.usefixtures('webextension') +def test_on_headers_received(driver, execute_in_page): + for attempt in range(10): + driver.get('https://site.with.scripts.block.ed/') + + if not are_scripts_allowed(driver): + break + assert attempt != 9 + + driver.get('https://site.with.scripts.allow.ed/') + assert are_scripts_allowed(driver) + + driver.get('https://site.with.paylo.ad/') + assert not are_scripts_allowed(driver) + source = 'somemapping:someresource:https://site.with.paylo.ad/index.html:somesecret' + assert are_scripts_allowed(driver, sha256(source.encode()).digest().hex()) diff --git a/test/haketilo_test/unit/utils.py b/test/haketilo_test/unit/utils.py new file mode 100644 index 0000000..b27a209 --- /dev/null +++ b/test/haketilo_test/unit/utils.py @@ -0,0 +1,293 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +""" +Various functions and objects that can be reused between unit tests +""" + +# This file is part of Haketilo. +# +# Copyright (C) 2021,2022 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this file's +# license. Although I request that you do not make use of this code in a +# proprietary program, I am not going to enforce this in court. + +from hashlib import sha256 +from selenium.webdriver.support.ui import WebDriverWait + +from ..script_loader import load_script + +patterns_doc_url = \ + 'https://hydrillabugs.koszko.org/projects/haketilo/wiki/URL_patterns' + +def sample_file(contents): + return { + 'sha256': sha256(contents.encode()).digest().hex(), + 'contents': contents + } + +def make_sample_files(names_contents): + """ + Take a dict mapping file names to file contents. Return, as a tuple, dicts + mapping file names to file objects (dicts) and file hash keys to file + contents. + """ + sample_files = dict([(name, sample_file(contents)) + for name, contents in names_contents.items()]) + + sample_files_by_sha256 = dict([[file['sha256'], file['contents']] + for file in sample_files.values()]) + + return sample_files, sample_files_by_sha256 + +sample_files, sample_files_by_sha256 = make_sample_files({ + 'report.spdx': '<!-- dummy report -->', + 'LICENSES/somelicense.txt': 'Permission is granted...', + 'LICENSES/CC0-1.0.txt': 'Dummy Commons...', + 'hello.js': 'console.log("uńićódę hello!");\n', + 'bye.js': 'console.log("bye!");\n', + 'combined.js': 'console.log("hello!\\nbye!");\n', + 'README.md': '# Python Frobnicator\n...' +}) + +def sample_file_ref(file_name, sample_files_dict=sample_files): + """ + Return a dictionary suitable for using as file reference in resource/mapping + definition. + """ + return { + 'file': file_name, + 'sha256': sample_files_dict[file_name]['sha256'] + } + +def make_sample_mapping(with_files=True): + """ + Procude a sample mapping definition that can be dumped to JSON and put into + Haketilo's IndexedDB. + """ + return { + '$schema': 'https://hydrilla.koszko.org/schemas/api_mapping_description-1.schema.json', + 'generated_by': { + 'name': 'human', + 'version': 'sapiens-0.8.14' + }, + 'source_name': 'example-org-fixes-new', + 'source_copyright': [ + sample_file_ref('report.spdx'), + sample_file_ref('LICENSES/CC0-1.0.txt') + ] if with_files else [], + 'type': 'mapping', + 'identifier': 'example-org-minimal', + 'long_name': 'Example.org Minimal', + 'uuid': '54d23bba-472e-42f5-9194-eaa24c0e3ee7', + 'version': [2022, 5, 10], + 'description': 'suckless something something', + 'payloads': { + 'https://example.org/a/*': { + 'identifier': 'some-KISS-resource' + }, + 'https://example.org/t/*': { + 'identifier': 'another-KISS-resource' + } + } + } + +def make_sample_resource(with_files=True): + """ + Procude a sample resource definition that can be dumped to JSON and put into + Haketilo's IndexedDB. + """ + return { + '$schema': 'https://hydrilla.koszko.org/schemas/api_resource_description-1.schema.json', + 'generated_by': { + 'name': 'human', + 'version': 'sapiens-0.8.14' + }, + 'source_name': 'hello', + 'source_copyright': [ + sample_file_ref('report.spdx'), + sample_file_ref('LICENSES/CC0-1.0.txt') + ] if with_files else [], + 'type': 'resource', + 'identifier': 'helloapple', + 'long_name': 'Hello Apple', + 'uuid': 'a6754dcb-58d8-4b7a-a245-24fd7ad4cd68', + 'version': [2021, 11, 10], + 'revision': 1, + 'description': 'greets an apple', + 'dependencies': [{'identifier': 'hello-message'}], + 'scripts': [ + sample_file_ref('hello.js'), + sample_file_ref('bye.js') + ] if with_files else [] + } + +def item_version_string(definition, include_revision=False): + """ + Given a resource or mapping definition, read its "version" property (and + also "revision" if applicable) and produce a corresponding version string. + """ + ver = '.'.join([str(num) for num in definition['version']]) + revision = definition.get('revision') if include_revision else None + return f'{ver}-{revision}' if revision is not None else ver + +def sample_data_dict(items): + """ + Some IndexedDB functions expect saved items to be provided in a nested dict + that makes them queryable by identifier by version. This function converts + items list to such dict. + """ + return dict([(it['identifier'], {item_version_string(it): it}) + for it in items]) + +def make_complete_sample_data(): + """ + Craft a JSON data item with 1 sample resource and 1 sample mapping that can + be used to populate IndexedDB. + """ + return { + 'resource': sample_data_dict([make_sample_resource()]), + 'mapping': sample_data_dict([make_sample_mapping()]), + 'file': { + 'sha256': sample_files_by_sha256 + } + } + +def clear_indexeddb(execute_in_page): + """ + Remove Haketilo data from IndexedDB. If variables from common/indexeddb.js + are in the global scope, this function will handle closing the opened + database instance (if any). Otherwise, the caller is responsible for making + sure the database being deleted is not opened anywhere. + """ + execute_in_page( + '''{ + async function delete_db() { + if (typeof db !== "undefined" && db) { + db.close(); + db = null; + } + let resolve, reject; + const result = new Promise((...cbs) => [resolve, reject] = cbs); + const request = indexedDB.deleteDatabase("haketilo"); + [request.onsuccess, request.onerror] = [resolve, reject]; + await result; + } + + returnval(delete_db()); + }''' + ) + +def get_db_contents(execute_in_page): + """ + Retrieve all IndexedDB contents. It is expected that either variables from + common/indexeddb.js are in the global scope or common/indexeddb.js is + imported as haketilodb. + """ + return execute_in_page( + '''{ + async function get_database_contents() + { + const db_getter = + typeof haketilodb === "undefined" ? get_db : haketilodb.get; + const db = await db_getter(); + + const transaction = db.transaction(db.objectStoreNames); + const result = {}; + + for (const store_name of db.objectStoreNames) { + const req = transaction.objectStore(store_name).getAll(); + await new Promise(cb => req.onsuccess = cb); + result[store_name] = req.result; + } + + return result; + } + returnval(get_database_contents()); + }''') + +def is_prime(n): + return n > 1 and all([n % i != 0 for i in range(2, n)]) + +broker_js = lambda: load_script('background/broadcast_broker.js') + ';start();' + +def are_scripts_allowed(driver, nonce=None): + return driver.execute_script( + ''' + document.haketilo_scripts_allowed = false; + const script = document.createElement("script"); + script.innerHTML = "document.haketilo_scripts_allowed = true;"; + if (arguments[0]) + script.setAttribute("nonce", arguments[0]); + document.head.append(script); + return document.haketilo_scripts_allowed; + ''', + nonce) + +def mock_broadcast(execute_in_page): + """ + Make all broadcast operations no-ops (broadcast must be imported). + """ + execute_in_page( + 'Object.keys(broadcast).forEach(k => broadcast[k] = () => {});' + ) + +def mock_cacher(execute_in_page): + """ + Some parts of code depend on content/repo_query_cacher.js and + background/CORS_bypass_server.js running in their appropriate contexts. This + function modifies the relevant browser.runtime.sendMessage function to + perform fetch(), bypassing the cacher. + """ + execute_in_page( + '''{ + const old_sendMessage = browser.tabs.sendMessage, old_fetch = fetch; + async function new_sendMessage(tab_id, msg) { + if (msg[0] !== "repo_query") + return old_sendMessage(tab_id, msg); + + /* Use snapshotted fetch(), allow other test code to override it. */ + const response = await old_fetch(msg[1]); + if (!response) + return {error: "Something happened :o"}; + + const result = {ok: response.ok, status: response.status}; + try { + result.json = await response.json(); + } catch(e) { + result.error_json = "" + e; + } + return result; + } + + browser.tabs.sendMessage = new_sendMessage; + }''') + +""" +Convenience snippet of code to retrieve a copy of given object with only those +properties present which are DOM nodes. This makes it possible to easily access +DOM nodes stored in a javascript object that also happens to contain some +other properties that make it impossible to return from a Selenium script. +""" +nodes_props_code = '''\ +(obj => { + const result = {}; + for (const [key, value] of Object.entries(obj)) { + if (value instanceof Node) + result[key] = value; + } + return result; +})''' diff --git a/test/haketilo_test/world_wide_library.py b/test/haketilo_test/world_wide_library.py new file mode 100644 index 0000000..1eb826e --- /dev/null +++ b/test/haketilo_test/world_wide_library.py @@ -0,0 +1,270 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later + +""" +Our helpful little stand-in for the Internet +""" + +# This file is part of Haketilo. +# +# Copyright (C) 2021 jahoti <jahoti@tilde.team> +# Copyright (C) 2021 Wojtek Kosior <koszko@koszko.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use of this code +# in a proprietary program, I am not going to enforce this in court. + +from hashlib import sha256 +from pathlib import Path +from shutil import rmtree +from threading import Lock +from uuid import uuid4 +import json + +from .misc_constants import here +from .unit.utils import * # sample repo data + +# TODO: instead of having the entire catalog defined here, make it possible to +# add catalog items from within individual test files. + +served_scripts = {} +served_scripts_lock = Lock() + +def start_serving_script(script_text): + """ + Register given script so that it is served at + https://serve.scrip.ts/?sha256=<script's_sha256_sum> + + Returns the URL at which script will be served. + + This function lacks thread safety. Might moght consider fixing this if it + turns + """ + sha256sum = sha256(script_text.encode()).digest().hex() + served_scripts_lock.acquire() + served_scripts[sha256sum] = script_text + served_scripts_lock.release() + + return f'https://serve.scrip.ts/?sha256={sha256sum}' + +def serve_script(command, get_params, post_params): + """ + info() callback to pass to request-handling code in server.py. Facilitates + serving scripts that have been registered with start_serving_script(). + """ + served_scripts_lock.acquire() + try: + script = served_scripts.get(get_params['sha256'][0]) + finally: + served_scripts_lock.release() + if script is None: + return 404, {}, b'' + + return 200, {'Content-Type': 'application/javascript'}, script + +def dump_scripts(directory=(Path.cwd() / 'injected_scripts')): + """ + Write all scripts that have been registered with start_serving_script() + under the provided directory. If the directory already exists, it is wiped + beforehand. If it doesn't exist, it is created. + """ + directory = Path(directory) + rmtree(directory, ignore_errors=True) + directory.mkdir(parents=True) + + served_scripts_lock.acquire() + for sha256, script in served_scripts.items(): + with open(directory / sha256, 'wt') as file: + file.write(script) + served_scripts_lock.release() + +some_data = '{"some": "data"}' + +# used by handler function of https://counterdoma.in +request_counter = 0 + +def serve_counter(command, get_params, post_params): + global request_counter + request_counter += 1 + return ( + 200, + {'Cache-Control': 'private, max-age=0, no-store'}, + json.dumps({'counter': request_counter}) + ) + +# Mock a Hydrilla repository. + +make_handler = lambda txt: lambda c, g, p: (200, {}, txt) + +# Mock files in the repository. +sample_contents = [f'Mi povas manĝi vitron, ĝi ne damaĝas min {i}' + for i in range(9)] +sample_hashes = [sha256(c.encode()).digest().hex() for c in sample_contents] + +file_url = lambda hashed: f'https://hydril.la/file/sha256/{hashed}' + +sample_files_catalog = dict([(file_url(h), make_handler(c)) + for h, c in zip(sample_hashes, sample_contents)]) + +# Mock resources and mappings in the repository. +sample_resource_templates = [] + +for deps in [(0, 1, 2, 3), (3, 4, 5, 6), (6, 7, 8, 9)]: + letters = [chr(ord('a') + i) for i in deps] + sample_resource_templates.append({ + 'id_suffix': ''.join(letters), + 'files_count': deps[0], + 'dependencies': [{'identifier': f'resource_{l}'} for l in letters] + }) + +suffixes = [srt['id_suffix'] for srt in sample_resource_templates] +sample_resource_templates.append({ + 'id_suffix': '-'.join(suffixes), + 'files_count': 2, + 'dependencies': [{'identifier': f'resource_{suf}'} for suf in suffixes] +}) + +for i in range(10): + sample_resource_templates.append({ + 'id_suffix': chr(ord('a') + i), + 'files_count': i, + 'dependencies': [] + }) + +sample_resources_catalog = {} +sample_mappings_catalog = {} +sample_queries = {} + +for srt in sample_resource_templates: + resource = make_sample_resource() + resource['identifier'] = f'resource_{srt["id_suffix"]}' + resource['long_name'] = resource['identifier'].upper() + resource['uuid'] = str(uuid4()) + resource['dependencies'] = srt['dependencies'] + resource['source_copyright'] = [] + resource['scripts'] = [] + for i in range(srt['files_count']): + file_ref = {'file': f'file_{i}', 'sha256': sample_hashes[i]} + resource[('source_copyright', 'scripts')[i & 1]].append(file_ref) + + resource_versions = [resource['version'], resource['version'].copy()] + resource_versions[1][-1] += 1 + + mapping = make_sample_mapping() + mapping['identifier'] = f'mapping_{srt["id_suffix"]}' + mapping['long_name'] = mapping['identifier'].upper() + mapping['uuid'] = str(uuid4()) + mapping['source_copyright'] = resource['source_copyright'] + + mapping_versions = [mapping['version'], mapping['version'].copy()] + mapping_versions[1][-1] += 1 + + sufs = [srt["id_suffix"], *[l for l in srt["id_suffix"] if l.isalpha()]] + patterns = [f'https://example_{suf}.com/*' for suf in set(sufs)] + payloads = {} + + for pat in patterns: + payloads[pat] = {'identifier': resource['identifier']} + + queryable_url = pat.replace('*', 'something') + if queryable_url not in sample_queries: + sample_queries[queryable_url] = [] + + sample_queries[queryable_url].append({ + 'identifier': mapping['identifier'], + 'long_name': mapping['long_name'], + 'version': mapping_versions[1] + }) + + mapping['payloads'] = payloads + + for item, versions, catalog in [ + (resource, resource_versions, sample_resources_catalog), + (mapping, mapping_versions, sample_mappings_catalog) + ]: + fmt = f'https://hydril.la/{item["type"]}/{item["identifier"]}%s.json' + # Make 2 versions of each item so that we can test updates. + for ver in versions: + item['version'] = ver + for fmt_arg in ('', '/' + item_version_string(item)): + catalog[fmt % fmt_arg] = make_handler(json.dumps(item)) + +def serve_query(command, get_params, post_params): + response = { + '$schema': 'https://hydrilla.koszko.org/schemas/api_query_result-1.schema.json', + 'generated_by': { + 'name': 'human', + 'version': 'sapiens-0.8.15' + }, + 'mappings': sample_queries[get_params['url'][0]] + } + + return (200, {}, json.dumps(response)) + +sample_queries_catalog = dict([(f'https://hydril.la/{suf}query', serve_query) + for suf in ('', '1/', '2/', '3/', '4/')]) + +catalog = { + 'http://gotmyowndoma.in': + (302, {'location': 'http://gotmyowndoma.in/index.html'}, None), + 'http://gotmyowndoma.in/': + (302, {'location': 'http://gotmyowndoma.in/index.html'}, None), + 'http://gotmyowndoma.in/index.html': + (200, {}, here / 'data' / 'pages' / 'gotmyowndomain.html'), + + 'https://gotmyowndoma.in': + (302, {'location': 'https://gotmyowndoma.in/index.html'}, None), + 'https://gotmyowndoma.in/': + (302, {'location': 'https://gotmyowndoma.in/index.html'}, None), + 'https://gotmyowndoma.in/index.html': + (200, {}, here / 'data' / 'pages' / 'gotmyowndomain_https.html'), + + 'https://gotmyowndoma.in/scripts_to_block_1.html': + (200, {}, here / 'data' / 'pages' / 'scripts_to_block_1.html'), + + 'https://anotherdoma.in/resource/blocked/by/CORS.json': + lambda command, get_params, post_params: (200, {}, some_data), + + 'https://counterdoma.in/': serve_counter, + + 'https://serve.scrip.ts/': serve_script, + + 'https://site.with.scripts.block.ed': + (302, {'location': 'https://site.with.scripts.block.ed/index.html'}, None), + 'https://site.with.scripts.block.ed/': + (302, {'location': 'https://site.with.scripts.block.ed/index.html'}, None), + 'https://site.with.scripts.block.ed/index.html': + (200, {}, here / 'data' / 'pages' / 'gotmyowndomain_https.html'), + + 'https://site.with.scripts.allow.ed': + (302, {'location': 'https://site.with.scripts.allow.ed/index.html'}, None), + 'https://site.with.scripts.allow.ed/': + (302, {'location': 'https://site.with.scripts.allow.ed/index.html'}, None), + 'https://site.with.scripts.allow.ed/index.html': + (200, {}, here / 'data' / 'pages' / 'gotmyowndomain_https.html'), + + 'https://site.with.paylo.ad': + (302, {'location': 'https://site.with.paylo.ad/index.html'}, None), + 'https://site.with.paylo.ad/': + (302, {'location': 'https://site.with.paylo.ad/index.html'}, None), + 'https://site.with.paylo.ad/index.html': + (200, {}, here / 'data' / 'pages' / 'gotmyowndomain_https.html'), + + **sample_files_catalog, + **sample_resources_catalog, + **sample_mappings_catalog, + **sample_queries_catalog +} |