# SPDX-License-Identifier: CC0-1.0

"""
Haketilo unit tests - building pattern tree and putting it in a content script
"""

# This file is part of Haketilo
#
# Copyright (C) 2021,2022 Wojtek Kosior <koszko@koszko.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# CC0 1.0 Universal License for more details.

import pytest
import json
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import TimeoutException

from ..script_loader import load_script

def simple_sample_mapping(patterns, fruit):
    """
    Build a minimal mapping definition to be used as test data.

    `patterns` is either a single URL pattern or a list of URL patterns. Each
    pattern gets a payload whose identifier is '<fruit>-<pattern>' and the
    mapping itself is identified as 'inject-<fruit>'.
    """
    # isinstance() instead of an exact type comparison, so that list subclasses
    # are also understood as "several patterns" rather than being wrapped as
    # one (unhashable) pattern.
    if not isinstance(patterns, list):
        patterns = [patterns]
    payloads = {p: {'identifier': f'{fruit}-{p}'} for p in patterns}
    return {
        'source_copyright': [],
        'type': 'mapping',
        'identifier': f'inject-{fruit}',
        'payloads': payloads
    }

def get_content_script_values(driver, content_script):
    """
    Evaluate the generated content script in the browser with a fresh, empty
    object bound as `this` and return that object. This gives easy access to
    everything the script stored as 'this.something = ...' and at the same time
    checks that the script is syntactically valid.
    """
    # The content script text is pasted into the body of value_holder().
    wrapper_js = '''
        function value_holder() {
            %s;
            return this;
        }
        return value_holder.call({});
        '''
    return driver.execute_script(wrapper_js % content_script)

# Sample mappings shared by the tests below, one per fruit. Only the fields the
# tests care about are present; fields irrelevant for testing are left out of
# these mapping definitions.
sample_mappings = [
    simple_sample_mapping(patterns, fruit)
    for fruit, patterns in {
        'banana': ['https://gotmyowndoma.in/index.html',
                   'http://gotmyowndoma.in/index.html'],
        'orange': ['https://***.gotmyowndoma.in/index.html',
                   'https://**.gotmyowndoma.in/index.html',
                   'https://*.gotmyowndoma.in/index.html',
                   'https://gotmyowndoma.in/index.html'],
        'grape': 'https://gotmyowndoma.in/index.html/***',
        'melon': 'http://gotmyowndoma.in/index.html/***',
        'peach': 'https://gotmyowndoma.in/index.html',
        'pear': 'https://gotmyowndoma.in/*',
        'raspberry': 'https://gotmyowndoma.in/**',
        'strawberry': 'https://gotmyowndoma.in/***',
        'apple': 'https://***.gotmyowndoma.in/index.html',
        'avocado': 'https://***.gotmyowndoma.in/*',
        'papaya': 'https://***.gotmyowndoma.in/**',
        'kiwi': 'https://***.gotmyowndoma.in/***'
    }.items()
]

# Sample blocking/allowing rules: one for every combination of domain wildcard,
# path, path wildcard and URL scheme (scheme varies fastest). Starting with
# the third rule, the rule's index is embedded in the domain name
# ("gotmyown<index>doma.in"); the first two rules use the plain domain.
# Rules at odd indices are "allow" rules, the rest are "block" rules.
sample_blocking = [
    {'pattern': template % (index if index > 1 else ''),
     'allow': index % 2 == 1}
    for index, template in enumerate(
        f'http{tls}://{domain_wildcard}gotmyown%sdoma.in{path}{path_wildcard}'
        for domain_wildcard in ('', '***.', '**.', '*.')
        for path in ('/index.html', '')
        for path_wildcard in ('', '/', '/*')
        for tls in ('', 's')
    )
]

# Even though patterns_query_manager.js is normally meant to run from background
# page, some tests can be as well performed running it from a normal page.
@pytest.mark.get_page('https://gotmyowndoma.in')
def test_pqm_tree_building(driver, execute_in_page):
    """
    patterns_query_manager.js tracks Haketilo's internal database and builds a
    constantly-updated pattern tree based on its contents. Mock the database and
    verify tree building works properly.

    The test goes through these stages:
    1. start with the first 2 sample mappings and first 2 blocking rules,
    2. add all remaining mappings and rules and set "default_allow" to false,
    3. remove the odd-indexed mappings and rules,
    4. remove the even-indexed mappings and rules (tree becomes empty),
    5. set "default_allow" back to true.
    After each change the most recently registered content script is inspected.
    """
    execute_in_page(load_script('background/patterns_query_manager.js'))
    # Mock IndexedDB and build patterns tree.
    # The haketilodb.track.* stubs remember the change callbacks they are given
    # (so the test can fire database changes by hand later) and return the
    # initial items. browser.contentScripts.register is replaced with a mock
    # that records the code of the last registered script in `last_script` and
    # counts unregister() calls in `unregister_called`.
    execute_in_page(
        '''
        const [initial_mappings, initial_blocking] = arguments.slice(0, 2);
        let mappingchange, blockingchange, settingchange;

        haketilodb.track.mapping = function (cb) {
            mappingchange = cb;

            return [{}, initial_mappings];
        }
        haketilodb.track.blocking = function (cb) {
            blockingchange = cb;

            return [{}, initial_blocking];
        }
        haketilodb.track.setting = function (cb) {
            settingchange = cb;

            return [{}, [{name: "default_allow", value: true}]];
        }

        let last_script;
        let unregister_called = 0;
        async function register_mock(injection)
        {
            await new Promise(resolve => setTimeout(resolve, 1));
            last_script = injection.js[0].code;
            return {unregister: () => unregister_called++};
        }
        browser = {contentScripts: {register: register_mock}};

        returnval(start("abracadabra"));
        ''',
        sample_mappings[0:2], sample_blocking[0:2])

    # Query the freshly built tree for a URL and fetch the tree, the registered
    # content script and the number of unregister() calls made so far.
    found, tree, content_script, deregistrations = execute_in_page(
        '''
        returnval([pqt.search(tree, arguments[0]).next().value,
                   tree, last_script, unregister_called]);
        ''',
        'https://gotmyowndoma.in/index.html')
    # The first search result is expected to hold the payloads of both initial
    # mappings for the exact-match pattern, together with an '~allow' entry
    # (presumably coming from the allowing rule sample_blocking[1] - verify
    # against patterns_query_manager.js).
    best_pattern = 'https://gotmyowndoma.in/index.html'
    assert found == \
        dict([('~allow', 1),
              *[(f'inject-{fruit}', {'identifier': f'{fruit}-{best_pattern}'})
                for fruit in ('banana', 'orange')]])
    # The registered content script must carry the secret passed to start(),
    # the very same tree and the "default_allow" setting value.
    cs_values = get_content_script_values(driver, content_script)
    assert cs_values['haketilo_secret']        == 'abracadabra'
    assert cs_values['haketilo_pattern_tree']  == tree
    assert cs_values['haketilo_default_allow'] == True
    assert deregistrations == 0

    # Wait condition: the last registered script has "default_allow" set to
    # false, mentions the numbered domain of every blocking rule with index > 1
    # and mentions the identifier of every sample mapping.
    def condition_all_added(driver):
        last_script = execute_in_page('returnval(last_script);')
        cs_values = get_content_script_values(driver, last_script)
        nums = [i for i in range(len(sample_blocking)) if i > 1]
        return (cs_values['haketilo_default_allow'] == False and
                all([('gotmyown%sdoma' % i) in last_script for i in nums]) and
                all([m['identifier'] in last_script for m in sample_mappings]))

    # Fire the captured change callbacks by hand: flip "default_allow" to false
    # and add all mappings and blocking rules that were not there initially.
    execute_in_page(
        '''{
        const new_setting_val = {name: "default_allow", value: false};
        settingchange({key: "default_allow", new_val: new_setting_val});
        for (const mapping of arguments[0])
            mappingchange({key: mapping.identifier, new_val: mapping});
        for (const blocking of arguments[1])
            blockingchange({key: blocking.pattern, new_val: blocking});
        }''',
        sample_mappings[2:], sample_blocking[2:])
    WebDriverWait(driver, 10).until(condition_all_added)

    # Split the samples by index parity. Note that `1 - i & 1` is parsed as
    # `(1 - i) & 1`, which is nonzero exactly for even `i`.
    odd_mappings = \
        [m['identifier'] for i, m in enumerate(sample_mappings) if i & 1]
    odd_blocking = \
        [b['pattern'] for i, b in enumerate(sample_blocking) if i & 1]
    even_mappings = \
        [m['identifier'] for i, m in enumerate(sample_mappings) if 1 - i & 1]
    even_blocking = \
        [b['pattern'] for i, b in enumerate(sample_blocking) if 1 - i & 1]

    # Wait condition: odd-indexed mappings and rules are gone from the last
    # registered script while even-indexed ones are still there. The first odd
    # blocking pattern is skipped in the absence check; it is the only odd one
    # without a number in its domain and equals a pattern also used by sample
    # mappings, so presumably it may legitimately remain in the script.
    def condition_odd_removed(driver):
        last_script = execute_in_page('returnval(last_script);')
        nums = [i for i in range(len(sample_blocking)) if i > 1 and 1 - i & 1]
        return (all([id not in last_script for id in odd_mappings]) and
                all([id in last_script for id in even_mappings]) and
                all([p not in last_script for p in odd_blocking[1:]]) and
                all([('gotmyown%sdoma' % i) in last_script for i in nums]))

    # Wait condition: the tree in the last registered script is empty.
    def condition_all_removed(driver):
        content_script = execute_in_page('returnval(last_script);')
        cs_values = get_content_script_values(driver, content_script)
        return cs_values['haketilo_pattern_tree'] == {}

    # A change notification without `new_val` is used here to signal removal of
    # the item with the given key.
    execute_in_page(
        '''
        arguments[0].forEach(identifier => mappingchange({key: identifier}));
        arguments[1].forEach(pattern => blockingchange({key: pattern}));
        ''',
        odd_mappings, odd_blocking)

    WebDriverWait(driver, 10).until(condition_odd_removed)

    # Now remove the remaining (even-indexed) items as well.
    execute_in_page(
        '''
        arguments[0].forEach(identifier => mappingchange({key: identifier}));
        arguments[1].forEach(pattern => blockingchange({key: pattern}));
        ''',
        even_mappings, even_blocking)

    WebDriverWait(driver, 10).until(condition_all_removed)

    # Wait condition: the last registered script has "default_allow" set to
    # true again.
    def condition_default_allowed_again(driver):
        content_script = execute_in_page('returnval(last_script);')
        cs_values = get_content_script_values(driver, content_script)
        return cs_values['haketilo_default_allow'] == True

    # Finally, verify that a setting change alone also gets propagated.
    execute_in_page(
        '''{
        const new_setting_val = {name: "default_allow", value: true};
        settingchange({key: "default_allow", new_val: new_setting_val});
        }''')

    WebDriverWait(driver, 10).until(condition_default_allowed_again)

# JavaScript used as the extension's content script in
# test_pqm_script_injection (see its ext_data marker). It defines
# haketilo_content_script_main() which, on its first call only, replaces the
# document contents with a single "tree-json" div and writes the
# JSON-serialized this.haketilo_pattern_tree into it. The function is called
# right away if this.haketilo_pattern_tree is already defined at that point.
content_js = '''
let already_run = false;
this.haketilo_content_script_main = function() {
    if (already_run)
        return;
    already_run = true;
    document.documentElement.innerHTML = "<body><div id='tree-json'>";
    document.getElementById("tree-json").innerText =
        JSON.stringify(this.haketilo_pattern_tree);
}
if (this.haketilo_pattern_tree !== undefined)
    this.haketilo_content_script_main();
'''

def background_js():
    """
    Return the code of the background script for the test extension:
    patterns_query_manager.js loaded together with broadcast_broker.js,
    followed by statements that start the broadcast broker and the patterns
    query manager.
    """
    startup_code = '; broadcast_broker.start(); start();'
    return load_script('background/patterns_query_manager.js',
                       "#IMPORT background/broadcast_broker.js") + startup_code

@pytest.mark.ext_data({
    'content_script':    content_js,
    'background_script': background_js
})
@pytest.mark.usefixtures('webextension')
def test_pqm_script_injection(driver, execute_in_page):
    """
    Verify that the pattern tree seen by the content script follows the
    contents of IndexedDB: it is empty at first, mentions all sample mappings
    once they get saved and is empty again after they get removed.
    """
    # Let's open a normal page in a second window. Window 0 will be used to make
    # changes to IndexedDB and window 1 to test the working of content scripts.
    driver.execute_script('window.open("about:blank", "_blank");')
    WebDriverWait(driver, 10).until(lambda d: len(d.window_handles) == 2)
    windows = [*driver.window_handles]

    def get_tree_json(driver):
        # Text of the element the content script writes the tree to. While the
        # element does not exist the result is falsy, which makes WebDriverWait
        # keep polling.
        return driver.execute_script(
            '''
            return (document.getElementById("tree-json") || {}).innerText;
            ''')

    def run_content_script():
        # Load the test page in window 1 and return the tree JSON written by
        # the content script or None if it did not show up within 10 seconds.
        driver.switch_to.window(windows[1])
        driver.get('https://gotmyowndoma.in/index.html')
        windows[1] = driver.current_window_handle
        try:
            return WebDriverWait(driver, 10).until(get_tree_json)
        except TimeoutException:
            return None

    def expect_tree(is_expected, description):
        # The page is loaded at most twice; the second attempt is there because
        # the first load may presumably still get an outdated content script.
        for _ in range(2):
            tree_json = run_content_script()
            if is_expected(tree_json):
                return
        raise AssertionError(
            f'content script did not report {description}; '
            f'last tree JSON: {tree_json!r}')

    def is_empty_tree(tree_json):
        return bool(tree_json) and json.loads(tree_json) == {}

    def has_all_sample_mappings(tree_json):
        tree_json = tree_json or '{}'
        # The parsed value is not needed; parsing just makes malformed JSON
        # fail the test right away.
        json.loads(tree_json)
        return all(m['identifier'] in tree_json for m in sample_mappings)

    # Nothing has been put in the database yet.
    expect_tree(is_empty_tree, 'an empty pattern tree')

    driver.switch_to.window(windows[0])
    execute_in_page(load_script('common/indexeddb.js'))

    sample_data = {
        'mapping': {sm['identifier']: {'1.0': sm} for sm in sample_mappings},
        'resource': {},
        'file': {}
    }
    execute_in_page('returnval(save_items(arguments[0]));', sample_data)

    expect_tree(has_all_sample_mappings,
                'a pattern tree with all sample mappings')

    driver.switch_to.window(windows[0])
    execute_in_page(
        '''{
        const identifiers = arguments[0];
        async function remove_items()
        {
            const ctx = await start_items_transaction(["mapping"], {});
            for (const id of identifiers)
                await remove_mapping(id, ctx);
            await finalize_transaction(ctx);
        }
        returnval(remove_items());
        }''',
        [sm['identifier'] for sm in sample_mappings])

    expect_tree(is_empty_tree,
                'an empty pattern tree after removal of mappings')