aboutsummaryrefslogtreecommitdiff
# SPDX-License-Identifier: CC0-1.0

"""
Haketilo unit tests - IndexedDB access
"""

# This file is part of Haketilo
#
# Copyright (C) 2021, 2022 Wojtek Kosior <koszko@koszko.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# CC0 1.0 Universal License for more details.

import pytest
import json
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import WebDriverException

from ..script_loader import load_script
from .utils import *

# Sample resource definitions. They'd normally contain more fields but here we
# use simplified versions.

def make_sample_resource():
    return {
        'source_copyright': [
            sample_file_ref('report.spdx'),
            sample_file_ref('LICENSES/somelicense.txt')
        ],
        'type': 'resource',
        'identifier': 'helloapple',
        'scripts': [sample_file_ref('hello.js'), sample_file_ref('bye.js')]
    }

def make_sample_mapping():
    return {
        'source_copyright': [
            sample_file_ref('report.spdx'),
            sample_file_ref('README.md')
        ],
        'type': 'mapping',
        'identifier': 'helloapple'
    }

@pytest.mark.get_page('https://gotmyowndoma.in')
def test_haketilodb_update_1_0_1(driver, execute_in_page):
    """
    indexeddb.js modifies data when updating to database version 1.0.1. Verify
    the update procedure works properly.
    """
    execute_in_page(load_script('common/indexeddb.js'))
    execute_in_page('db_version_nr = version_nr([1, 0, 0]);')
    mock_broadcast(execute_in_page)

    # Start with no database.
    clear_indexeddb(execute_in_page)

    v1_url = 'https://hydrilla.koszko.org/api_v1/'
    v2_url = 'https://hydrilla.koszko.org/api_v2/'
    urls_v1 = {'https://sample.url/abc/', v1_url}
    urls_v2 = {'https://sample.url/abc/', v2_url}

    for url in urls_v1:
        execute_in_page('returnval(set_repo(arguments[0]));', url)

    assert urls_v1 == set(execute_in_page('returnval(get_repos());'))

    # Verify that url gets updated to v2 upon database update.
    driver.get(driver.execute_script('return window.location.href;'))
    execute_in_page(load_script('common/indexeddb.js'))
    mock_broadcast(execute_in_page)

    database_contents = get_db_contents(execute_in_page)

    assert set(execute_in_page('returnval(get_repos());')) == urls_v2

    # Verify that url does not get updated when there is no database update.
    execute_in_page('returnval(del_repo(arguments[0]));', v2_url)
    execute_in_page('returnval(set_repo(arguments[0]));', v1_url)

    driver.get(driver.execute_script('return window.location.href;'))
    execute_in_page(load_script('common/indexeddb.js'))
    mock_broadcast(execute_in_page)

    database_contents = get_db_contents(execute_in_page)

    assert set(execute_in_page('returnval(get_repos());')) == urls_v1

@pytest.mark.get_page('https://gotmyowndoma.in')
def test_haketilodb_item_modifications(driver, execute_in_page):
    """
    indexeddb.js facilitates operating on Haketilo's internal database.
    Verify database operations on mappings/resources work properly.
    """
    execute_in_page(load_script('common/indexeddb.js'))
    mock_broadcast(execute_in_page)

    # Start with no database.
    clear_indexeddb(execute_in_page)

    sample_item = make_sample_resource()
    sample_item['source_copyright'][0]['extra_prop'] = True

    execute_in_page(
        '''{
        const promise = start_items_transaction(["resource"], arguments[1])
            .then(ctx => save_item(arguments[0], ctx).then(() => ctx))
            .then(finalize_transaction);
        returnval(promise);
        }''',
        sample_item, {'sha256': sample_files_by_sha256})

    database_contents = get_db_contents(execute_in_page)

    assert len(database_contents['file']) == 4
    assert all([sample_files_by_sha256[file['sha256']] == file['contents']
                for file in database_contents['file']])
    assert all([len(file) == 2 for file in database_contents['file']])

    assert len(database_contents['file_uses']) == 4
    assert all([uses['uses'] == 1 for uses in database_contents['file_uses']])
    assert set([uses['sha256'] for uses in database_contents['file_uses']]) \
        == set([file['sha256'] for file in database_contents['file']])

    assert database_contents['mapping'] == []
    assert database_contents['resource'] == [sample_item]

    # See if trying to add an item without providing all its files ends in an
    # exception and aborts the transaction as it should.
    sample_item['scripts'].append(sample_file_ref('combined.js'))
    incomplete_files = {**sample_files_by_sha256}
    incomplete_files.pop(sample_files['combined.js']['sha256'])
    exception = execute_in_page(
        '''{
        const args = arguments;
        async function try_add_item()
        {
            const context =
                await start_items_transaction(["resource"], args[1]);
            try {
                await save_item(args[0], context);
                await finalize_transaction(context);
                return;
            } catch(e) {
                return e;
            }
        }
        returnval(try_add_item());
        }''',
        sample_item, {'sha256': incomplete_files})

    previous_database_contents = database_contents
    database_contents = get_db_contents(execute_in_page)

    assert 'file not present' in exception
    for key, val in database_contents.items():
        keyfun = lambda item: item.get('sha256') or item['identifier']
        assert sorted(previous_database_contents[key], key=keyfun) \
            == sorted(val,                             key=keyfun)

    # See if adding another item that partially uses first's files works OK.
    sample_item = make_sample_mapping()
    database_contents = execute_in_page(
        '''{
        const promise = start_items_transaction(["mapping"], arguments[1])
            .then(ctx => save_item(arguments[0], ctx).then(() => ctx))
            .then(finalize_transaction);
        returnval(promise);
        }''',
        sample_item, {'sha256': sample_files_by_sha256})

    database_contents = get_db_contents(execute_in_page)

    names = ['README.md', 'report.spdx', 'LICENSES/somelicense.txt', 'hello.js',
             'bye.js']
    sample_files_list = [sample_files[name] for name in names]
    uses_list = [1, 2, 1, 1, 1]

    uses = dict([(uses['sha256'], uses['uses'])
                 for uses in database_contents['file_uses']])
    assert uses  == dict([(file['sha256'], nr)
                          for file, nr in zip(sample_files_list, uses_list)])

    files = dict([(file['sha256'], file['contents'])
                  for file in database_contents['file']])
    assert files == dict([(file['sha256'], file['contents'])
                          for file in sample_files_list])

    del database_contents['resource'][0]['source_copyright'][0]['extra_prop']
    assert database_contents['resource'] == [make_sample_resource()]
    assert database_contents['mapping']  == [sample_item]

    # Try removing the items to get an empty database again.
    results = [None, None]
    for i, item_type in enumerate(['resource', 'mapping']):
         execute_in_page(
            f'''{{
            const remover = remove_{item_type};
            const promise =
                start_items_transaction(["{item_type}"], {{}})
                .then(ctx => remover('helloapple', ctx).then(() => ctx))
                .then(finalize_transaction);
            returnval(promise);
            }}''')

         results[i] = get_db_contents(execute_in_page)

    names = ['README.md', 'report.spdx']
    sample_files_list = [sample_files[name] for name in names]
    uses_list = [1, 1]

    uses = dict([(uses['sha256'], uses['uses'])
                 for uses in results[0]['file_uses']])
    assert uses  == dict([(file['sha256'], 1) for file in sample_files_list])

    files = dict([(file['sha256'], file['contents'])
                  for file in results[0]['file']])
    assert files == dict([(file['sha256'], file['contents'])
                          for file in sample_files_list])

    assert results[0]['resource'] == []
    assert results[0]['mapping'] == [sample_item]

    assert results[1] == dict([(key, []) for key in  results[0].keys()])

    # Try initializing an empty database with sample initial data object.
    sample_resource = make_sample_resource()
    sample_mapping = make_sample_mapping()
    initial_data = {
        'resource': {
            'helloapple': {
                '1.12':   sample_resource,
                '0.9':    'something_that_should_get_ignored',
                '1':      'something_that_should_get_ignored',
                '1.1':    'something_that_should_get_ignored',
                '1.11.1': 'something_that_should_get_ignored',
            }
        },
        'mapping': {
            'helloapple': {
                '0.1.1': sample_mapping
            }
        },
        'file': {
            'sha256': sample_files_by_sha256
        }
    }

    clear_indexeddb(execute_in_page)
    execute_in_page('initial_data = arguments[0];', initial_data)
    database_contents = get_db_contents(execute_in_page)

    assert database_contents['resource'] == [sample_resource]
    assert database_contents['mapping']  == [sample_mapping]

@pytest.mark.get_page('https://gotmyowndoma.in')
def test_haketilodb_settings(driver, execute_in_page):
    """
    indexeddb.js facilitates operating on Haketilo's internal database.
    Verify assigning/retrieving values of simple "setting" item works properly.
    """
    execute_in_page(load_script('common/indexeddb.js'))
    mock_broadcast(execute_in_page)

    # Start with no database.
    clear_indexeddb(execute_in_page)

    assert get_db_contents(execute_in_page)['setting'] == []

    assert execute_in_page('returnval(get_setting("option15"));') == None

    execute_in_page('returnval(set_setting("option15", "disable"));')
    assert execute_in_page('returnval(get_setting("option15"));') == 'disable'

    execute_in_page('returnval(set_setting("option15", "enable"));')
    assert execute_in_page('returnval(get_setting("option15"));') == 'enable'

@pytest.mark.get_page('https://gotmyowndoma.in')
def test_haketilodb_allowing(driver, execute_in_page):
    """
    indexeddb.js facilitates operating on Haketilo's internal database.
    Verify changing the "blocking" configuration for a URL works properly.
    """
    execute_in_page(load_script('common/indexeddb.js'))
    mock_broadcast(execute_in_page)

    # Start with no database.
    clear_indexeddb(execute_in_page)

    assert get_db_contents(execute_in_page)['blocking'] == []

    def run_with_sample_url(expr):
        return execute_in_page(f'returnval({expr});', 'https://example.com/**')

    assert None == run_with_sample_url('get_allowing(arguments[0])')

    run_with_sample_url('set_disallowed(arguments[0])')
    assert False == run_with_sample_url('get_allowing(arguments[0])')

    run_with_sample_url('set_allowed(arguments[0])')
    assert True == run_with_sample_url('get_allowing(arguments[0])')

    run_with_sample_url('set_default_allowing(arguments[0])')
    assert None == run_with_sample_url('get_allowing(arguments[0])')

@pytest.mark.get_page('https://gotmyowndoma.in')
def test_haketilodb_repos(driver, execute_in_page):
    """
    indexeddb.js facilitates operating on Haketilo's internal database.
    Verify operations on repositories list work properly.
    """
    execute_in_page(load_script('common/indexeddb.js'))
    mock_broadcast(execute_in_page)

    # Start with no database.
    clear_indexeddb(execute_in_page)

    assert get_db_contents(execute_in_page)['repo'] == []

    sample_urls = ['https://hdrlla.example.com/', 'https://hdrlla.example.org']

    assert [] == execute_in_page('returnval(get_repos());')

    execute_in_page('returnval(set_repo(arguments[0]));', sample_urls[0])
    assert [sample_urls[0]] == execute_in_page('returnval(get_repos());')

    execute_in_page('returnval(set_repo(arguments[0]));', sample_urls[1])
    assert set(sample_urls) == set(execute_in_page('returnval(get_repos());'))

    execute_in_page('returnval(del_repo(arguments[0]));', sample_urls[0])
    assert [sample_urls[1]] == execute_in_page('returnval(get_repos());')

test_page_html = '''
<!DOCTYPE html>
<script src="/testpage.js"></script>
<body>
</body>
'''

@pytest.mark.ext_data({
    'background_script': broker_js,
    'test_page':         test_page_html,
    'extra_files': {
        'testpage.js':   lambda: load_script('common/indexeddb.js')
    }
})
@pytest.mark.usefixtures('webextension')
def test_haketilodb_track(driver, execute_in_page, wait_elem_text):
    """
    Verify IndexedDB object change notifications are correctly broadcasted
    through extension's background script and allow for object store contents
    to be tracked in any execution context.
    """
    # Let's open the same extension's test page in a second window. Window 1
    # will be used to make changes to IndexedDB and window 0 to "track" those
    # changes.
    driver.execute_script('window.open(window.location.href, "_blank");')
    WebDriverWait(driver, 10).until(lambda d: len(d.window_handles) == 2)
    windows = [*driver.window_handles]

    # Create elements that will have tracked data inserted under them.
    driver.switch_to.window(windows[0])
    execute_in_page(
        '''
        for (const store_name of trackable) {
            const h2 = document.createElement("h2");
            h2.innerText = store_name;
            document.body.append(h2);

            const ul = document.createElement("ul");
            ul.id = store_name;
            document.body.append(ul);
        }
        ''')

    # Mock initial_data.
    sample_resource = make_sample_resource()
    sample_mapping = make_sample_mapping()
    initial_data = {
        'resource': {
            'helloapple': {
                '1.0': sample_resource
            }
        },
        'mapping': {
            'helloapple': {
                '0.1.1': sample_mapping
            }
        },
        'file': {
            'sha256': sample_files_by_sha256
        }
    }
    driver.switch_to.window(windows[1])
    execute_in_page('initial_data = arguments[0];', initial_data)
    execute_in_page('returnval(set_setting("option15", "123"));')
    execute_in_page('returnval(set_repo("https://hydril.la"));')
    execute_in_page('returnval(set_disallowed("file:///*"));')

    # See if track.*() functions properly return the already-existing items.
    driver.switch_to.window(windows[0])
    execute_in_page(
        '''
        function update_item(store_name, change)
        {
            const elem_id = `${store_name}_${change.key}`;
            let elem = document.getElementById(elem_id);
            elem = elem || document.createElement("li");
            elem.id = elem_id;
            elem.innerText = JSON.stringify(change.new_val);
            document.getElementById(store_name).append(elem);
            if (change.new_val === undefined)
                elem.remove();
        }

        let resource_tracking, resource_items, mapping_tracking, mapping_items;

        async function start_reporting()
        {
            const props = new Map(stores.map(([sn, opt]) => [sn, opt.keyPath]));
            for (const store_name of trackable) {
                [tracking, items] =
                    await track[store_name](ch => update_item(store_name, ch));
                const prop = props.get(store_name);
                for (const item of items)
                    update_item(store_name, {key: item[prop], new_val: item});
            }
        }

        returnval(start_reporting());
        ''')

    item_counts = execute_in_page(
        '''{
        const childcount = id => document.getElementById(id).childElementCount;
        returnval(trackable.map(childcount));
        }''')
    assert item_counts == [1 for _ in item_counts]
    for elem_id, json_value in [
            ('resource_helloapple', sample_resource),
            ('mapping_helloapple', sample_mapping),
            ('setting_option15', {'name': 'option15', 'value': '123'}),
            ('repo_https://hydril.la', {'url': 'https://hydril.la'}),
            ('blocking_file:///*', {'pattern': 'file:///*', 'allow': False})
    ]:
        assert json.loads(driver.find_element_by_id(elem_id).text) == json_value

    # See if item additions get tracked properly.
    driver.switch_to.window(windows[1])
    sample_resource2 = make_sample_resource()
    sample_resource2['identifier'] = 'helloapple-copy'
    sample_mapping2 = make_sample_mapping()
    sample_mapping2['identifier'] = 'helloapple-copy'
    sample_data = {
        'resource': {
            'helloapple-copy': {
                '1.0': sample_resource2
            }
        },
        'mapping': {
            'helloapple-copy': {
                '0.1.1': sample_mapping2
            }
        },
        'file': {
            'sha256': sample_files_by_sha256
        },
        'repo': [
            'https://hydril2.la/'
        ]
    }
    execute_in_page('returnval(save_items(arguments[0]));', sample_data)
    execute_in_page('returnval(set_setting("option22", "abc"));')
    execute_in_page('returnval(set_repo("https://hydril3.la/"));')
    execute_in_page('returnval(set_allowed("ftp://a.bc/"));')

    driver.switch_to.window(windows[0])
    driver.implicitly_wait(10)
    for elem_id, json_value in [
            ('resource_helloapple-copy', sample_resource2),
            ('mapping_helloapple-copy', sample_mapping2),
            ('setting_option22', {'name': 'option22', 'value': 'abc'}),
            ('repo_https://hydril2.la/', {'url': 'https://hydril2.la/'}),
            ('repo_https://hydril3.la/', {'url': 'https://hydril3.la/'}),
            ('blocking_ftp://a.bc/', {'pattern': 'ftp://a.bc/', 'allow': True})
    ]:
        assert json.loads(driver.find_element_by_id(elem_id).text) == json_value
    driver.implicitly_wait(0)

    # See if item deletions/modifications get tracked properly.
    driver.switch_to.window(windows[1])
    execute_in_page(
        '''{
        async function change_remove_items()
        {
            const store_names = ["resource", "mapping"];
            const ctx = await start_items_transaction(store_names, {});
            await remove_resource("helloapple", ctx);
            await remove_mapping("helloapple-copy", ctx);
            await finalize_transaction(ctx);
            await set_setting("option22", null);
            await del_repo("https://hydril.la");
            await set_default_allowing("file:///*");
            await set_disallowed("ftp://a.bc/");
        }
        returnval(change_remove_items());
        }''')

    removed_ids = ['mapping_helloapple-copy', 'resource_helloapple',
                   'repo_https://hydril.la', 'blocking_file:///*']
    def condition_items_absent_and_changed(driver):
        for id in removed_ids:
            try:
                driver.find_element_by_id(id)
                return False
            except WebDriverException:
                pass

        option_text = driver.find_element_by_id('setting_option22').text
        blocking_text = driver.find_element_by_id('blocking_ftp://a.bc/').text
        return (json.loads(option_text)['value'] == None and
                json.loads(blocking_text)['allow'] == False)

    driver.switch_to.window(windows[0])
    WebDriverWait(driver, 10).until(condition_items_absent_and_changed)