# SPDX-License-Identifier: CC0-1.0

"""
Haketilo unit tests - building pattern tree and putting it in a content script
"""

# This file is part of Haketilo
#
# Copyright (C) 2021,2022 Wojtek Kosior <koszko@koszko.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# CC0 1.0 Universal License for more details.

import pytest
import json
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import TimeoutException

from ..script_loader import load_script

def simple_sample_mapping(patterns, fruit):
    """
    Build a minimal mapping definition for use in tests.

    `patterns` is either a single URL pattern or a list of them. Each pattern
    becomes a key of the resulting 'payloads' dict, with a payload whose
    identifier is '<fruit>-<pattern>'. The mapping itself is identified as
    'inject-<fruit>'.
    """
    # A lone pattern is treated as a one-element list. isinstance() is used so
    # that list subclasses are not wrapped a second time.
    if not isinstance(patterns, list):
        patterns = [patterns]

    payloads = {
        pattern: {'identifier': f'{fruit}-{pattern}'} for pattern in patterns
    }

    return {
        'source_copyright': [],
        'type': 'mapping',
        'identifier': f'inject-{fruit}',
        'payloads': payloads
    }

def get_content_script_values(driver, content_script):
    """
    Allow easy extraction of 'this.something = ...' values from generated
    content script and verify the content script is syntactically correct.

    The script text is pasted into the body of a JavaScript function that is
    then called with a fresh empty object as `this`; that object is returned.
    """
    # The script is followed by a newline so that whatever it ends with cannot
    # swallow the `return this;` statement.
    return driver.execute_script(
        '''
        function value_holder() {
            %s;
            return this;
        }
        return value_holder.call({});
        ''' % content_script)

# Fields that are not relevant for testing are omitted from these mapping
# definitions.
sample_mappings = [simple_sample_mapping(pats, fruit) for pats, fruit in [
    (['https://gotmyowndoma.in/index.html',
      'http://gotmyowndoma.in/index.html'], 'banana'),
    (['https://***.gotmyowndoma.in/index.html',
      'https://**.gotmyowndoma.in/index.html',
      'https://*.gotmyowndoma.in/index.html',
      'https://gotmyowndoma.in/index.html'], 'orange'),
    ('https://gotmyowndoma.in/index.html/***', 'grape'),
    ('http://gotmyowndoma.in/index.html/***', 'melon'),
    ('https://gotmyowndoma.in/index.html', 'peach'),
    ('https://gotmyowndoma.in/*', 'pear'),
    ('https://gotmyowndoma.in/**', 'raspberry'),
    ('https://gotmyowndoma.in/***', 'strawberry'),
    ('https://***.gotmyowndoma.in/index.html', 'apple'),
    ('https://***.gotmyowndoma.in/*', 'avocado'),
    ('https://***.gotmyowndoma.in/**', 'papaya'),
    ('https://***.gotmyowndoma.in/***', 'kiwi')
]]

# Templates of blocking patterns. Each one still contains a '%s' placeholder
# inside the domain name which gets filled in below.
_blocking_templates = [f'http{scheme}://{domain_wildcard}gotmyown%sdoma.in'
                       f'{path}{path_wildcard}'
                       for domain_wildcard in ('', '***.', '**.', '*.')
                       for path in ('/index.html', '')
                       for path_wildcard in ('', '/', '/*')
                       for scheme in ('', 's')]

# The first 2 rules use the real test domain. Every further rule gets its own
# index embedded in the domain name, which makes it possible to look for a
# particular rule in the generated script with a simple substring check.
# Rules at odd indexes are 'allow' rules, the others are 'block' rules.
sample_blocking = [{'pattern': template % (idx if idx > 1 else ''),
                    'allow': bool(idx & 1)}
                   for idx, template in enumerate(_blocking_templates)]

# Even though patterns_query_manager.js is normally meant to run from background
# page, some tests can be as well performed running it from a normal page.
@pytest.mark.get_page('https://gotmyowndoma.in')
def test_pqm_tree_building(driver, execute_in_page):
    """
    patterns_query_manager.js tracks Haketilo's internal database and builds a
    constantly-updated pattern tree based on its contents. Mock the database and
    verify tree building works properly.
    """
    execute_in_page(load_script('background/patterns_query_manager.js'))

    # Mock IndexedDB and build patterns tree. The change callbacks passed to
    # the mocked track functions are kept in page variables so that later
    # snippets can simulate database changes. The mocked
    # contentScripts.register() remembers the most recently registered script
    # and counts unregister() calls.
    execute_in_page(
        '''
        const [initial_mappings, initial_blocking] = arguments.slice(0, 2);
        let mappingchange, blockingchange, settingchange;

        haketilodb.track.mapping = function (cb) {
            mappingchange = cb;

            return [{}, initial_mappings];
        }
        haketilodb.track.blocking = function (cb) {
            blockingchange = cb;

            return [{}, initial_blocking];
        }
        haketilodb.track.setting = function (cb) {
            settingchange = cb;

            return [{}, [{name: "default_allow", value: true}]];
        }

        let last_script;
        let unregister_called = 0;
        async function register_mock(injection)
        {
            await new Promise(resolve => setTimeout(resolve, 1));
            last_script = injection.js[0].code;
            return {unregister: () => unregister_called++};
        }
        browser = {contentScripts: {register: register_mock}};

        returnval(start("abracadabra"));
        ''',
        sample_mappings[0:2], sample_blocking[0:2])

    best_pattern = 'https://gotmyowndoma.in/index.html'
    found, tree, content_script, deregistrations = execute_in_page(
        '''
        returnval([pqt.search(tree, arguments[0]).next().value,
                   tree, last_script, unregister_called]);
        ''',
        best_pattern)

    expected_found = {
        '~allow': 1,
        **{f'inject-{fruit}': {'identifier': f'{fruit}-{best_pattern}'}
           for fruit in ('banana', 'orange')}
    }
    assert found == expected_found

    cs_values = get_content_script_values(driver, content_script)
    assert cs_values['haketilo_secret'] == 'abracadabra'
    assert cs_values['haketilo_pattern_tree'] == tree
    assert cs_values['haketilo_default_allow'] is True
    assert deregistrations == 0

    def condition_all_added(driver):
        """Tell whether the latest script reflects all sample data."""
        last_script = execute_in_page('returnval(last_script);')
        cs_values = get_content_script_values(driver, last_script)
        # Only rules with index above 1 have a number in their domain name.
        numbered = range(2, len(sample_blocking))
        return (cs_values['haketilo_default_allow'] is False and
                all(('gotmyown%sdoma' % i) in last_script
                    for i in numbered) and
                all(m['identifier'] in last_script for m in sample_mappings))

    # Flip the default policy and add all the remaining mappings and rules.
    execute_in_page(
        '''{
        const new_setting_val = {name: "default_allow", value: false};
        settingchange({key: "default_allow", new_val: new_setting_val});
        for (const mapping of arguments[0])
            mappingchange({key: mapping.identifier, new_val: mapping});
        for (const blocking of arguments[1])
            blockingchange({key: blocking.pattern, new_val: blocking});
        }''',
        sample_mappings[2:], sample_blocking[2:])
    WebDriverWait(driver, 10).until(condition_all_added)

    # Split the sample data by index parity so it can be removed in 2 steps.
    odd_mappings = [m['identifier'] for m in sample_mappings[1::2]]
    odd_blocking = [b['pattern'] for b in sample_blocking[1::2]]
    even_mappings = [m['identifier'] for m in sample_mappings[0::2]]
    even_blocking = [b['pattern'] for b in sample_blocking[0::2]]

    def condition_odd_removed(driver):
        """Tell whether exactly the odd-indexed items are gone."""
        last_script = execute_in_page('returnval(last_script);')
        # Even-indexed rules that have a number in their domain name.
        even_numbered = range(2, len(sample_blocking), 2)
        # The first odd blocking pattern is skipped: it is the plain
        # 'https://gotmyowndoma.in/index.html', which is also a pattern of
        # even-indexed mappings and hence remains present in the script.
        return (all(mapping_id not in last_script
                    for mapping_id in odd_mappings) and
                all(mapping_id in last_script
                    for mapping_id in even_mappings) and
                all(p not in last_script for p in odd_blocking[1:]) and
                all(('gotmyown%sdoma' % i) in last_script
                    for i in even_numbered))

    def condition_all_removed(driver):
        """Tell whether the latest script carries an empty pattern tree."""
        content_script = execute_in_page('returnval(last_script);')
        cs_values = get_content_script_values(driver, content_script)
        return cs_values['haketilo_pattern_tree'] == {}

    # A change notification without 'new_val' is how removal is simulated here.
    removal_js = '''
        arguments[0].forEach(identifier => mappingchange({key: identifier}));
        arguments[1].forEach(pattern => blockingchange({key: pattern}));
        '''

    execute_in_page(removal_js, odd_mappings, odd_blocking)
    WebDriverWait(driver, 10).until(condition_odd_removed)

    execute_in_page(removal_js, even_mappings, even_blocking)
    WebDriverWait(driver, 10).until(condition_all_removed)

    def condition_default_allowed_again(driver):
        """Tell whether the latest script has default_allow set again."""
        content_script = execute_in_page('returnval(last_script);')
        cs_values = get_content_script_values(driver, content_script)
        return cs_values['haketilo_default_allow'] is True

    execute_in_page(
        '''{
        const new_setting_val = {name: "default_allow", value: true};
        settingchange({key: "default_allow", new_val: new_setting_val});
        }''')
    WebDriverWait(driver, 10).until(condition_default_allowed_again)

# Content script used by test_pqm_script_injection(). Once the pattern tree is
# available it replaces the page's contents with an element holding the tree
# serialized to JSON. It does so at most once.
content_js = '''
let already_run = false;
this.haketilo_content_script_main = function() {
    if (already_run)
        return;
    already_run = true;
    document.documentElement.innerHTML = "<body><div id='tree-json'>";
    document.getElementById("tree-json").innerText =
        JSON.stringify(this.haketilo_pattern_tree);
}
if (this.haketilo_pattern_tree !== undefined)
    this.haketilo_content_script_main();
'''

def background_js():
    """
    Return background script code: patterns_query_manager.js loaded together
    with broadcast_broker.js, followed by calls that start both of them.
    """
    pqm_js = load_script('background/patterns_query_manager.js',
                         "#IMPORT background/broadcast_broker.js")
    return pqm_js + '; broadcast_broker.start(); start();'

@pytest.mark.ext_data({
    'content_script': content_js,
    'background_script': background_js
})
@pytest.mark.usefixtures('webextension')
def test_pqm_script_injection(driver, execute_in_page):
    """
    Run patterns_query_manager.js in an extension's background script and
    verify the pattern tree seen by the content script on a visited page
    follows the mappings saved to and removed from the database.
    """
    # Let's open a normal page in a second window. Window 0 will be used to make
    # changes to IndexedDB and window 1 to test the working of content scripts.
    driver.execute_script('window.open("about:blank", "_blank");')
    WebDriverWait(driver, 10).until(lambda d: len(d.window_handles) == 2)
    windows = list(driver.window_handles)

    def get_tree_json(driver):
        """Return text of the 'tree-json' element or None if there's none."""
        return driver.execute_script(
            '''
            return (document.getElementById("tree-json") || {}).innerText;
            ''')

    def run_content_script():
        """
        Load the test page in window 1 and return the tree JSON text the
        content script put there, or None if it didn't appear in time.
        """
        driver.switch_to.window(windows[1])
        driver.get('https://gotmyowndoma.in/index.html')
        windows[1] = driver.current_window_handle
        try:
            return WebDriverWait(driver, 10).until(get_tree_json)
        except TimeoutException:
            # Deliberately not an error here - callers retry the page load.
            return None

    def expect_tree(is_expected, description):
        """
        Load the test page up to 2 times and fail the test unless
        `is_expected` accepts the tree JSON text obtained on one of the loads.
        """
        attempts = 2
        for _ in range(attempts):
            if is_expected(run_content_script()):
                return
        raise AssertionError(
            f'{description} not observed in {attempts} page loads'
        )

    def tree_is_empty(json_txt):
        return bool(json_txt) and json.loads(json_txt) == {}

    def tree_has_all_mappings(json_txt):
        tree_json = json_txt or '{}'
        # Result unused - this only verifies the text is valid JSON.
        json.loads(tree_json)
        return all(m['identifier'] in tree_json for m in sample_mappings)

    expect_tree(tree_is_empty, 'initially empty pattern tree')

    driver.switch_to.window(windows[0])
    execute_in_page(load_script('common/indexeddb.js'))

    sample_data = {
        'mapping': {sm['identifier']: {'1.0': sm} for sm in sample_mappings},
        'resource': {},
        'file': {}
    }
    execute_in_page('returnval(save_items(arguments[0]));', sample_data)

    expect_tree(tree_has_all_mappings, 'pattern tree with all sample mappings')

    driver.switch_to.window(windows[0])
    execute_in_page(
        '''{
        const identifiers = arguments[0];
        async function remove_items()
        {
            const ctx = await start_items_transaction(["mapping"], {});
            for (const id of identifiers)
                await remove_mapping(id, ctx);
            await finalize_transaction(ctx);
        }
        returnval(remove_items());
        }''',
        [sm['identifier'] for sm in sample_mappings])

    expect_tree(tree_is_empty, 'pattern tree emptied after mappings removal')