aboutsummaryrefslogtreecommitdiff
path: root/test/haketilo_test/unit/test_indexeddb.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/haketilo_test/unit/test_indexeddb.py')
-rw-r--r--test/haketilo_test/unit/test_indexeddb.py490
1 files changed, 490 insertions, 0 deletions
diff --git a/test/haketilo_test/unit/test_indexeddb.py b/test/haketilo_test/unit/test_indexeddb.py
new file mode 100644
index 0000000..c2d5427
--- /dev/null
+++ b/test/haketilo_test/unit/test_indexeddb.py
@@ -0,0 +1,490 @@
+# SPDX-License-Identifier: CC0-1.0
+
+"""
+Haketilo unit tests - IndexedDB access
+"""
+
+# This file is part of Haketilo
+#
+# Copyright (C) 2021, 2022 Wojtek Kosior <koszko@koszko.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the CC0 1.0 Universal License as published by
+# the Creative Commons Corporation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# CC0 1.0 Universal License for more details.
+
+import pytest
+import json
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support.ui import WebDriverWait
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.common.exceptions import WebDriverException
+
+from ..script_loader import load_script
+from .utils import *
+
+# Sample resource definitions. They'd normally contain more fields but here we
+# use simplified versions.
+
+def make_sample_resource():
+ return {
+ 'source_copyright': [
+ sample_file_ref('report.spdx'),
+ sample_file_ref('LICENSES/somelicense.txt')
+ ],
+ 'type': 'resource',
+ 'identifier': 'helloapple',
+ 'scripts': [sample_file_ref('hello.js'), sample_file_ref('bye.js')]
+ }
+
+def make_sample_mapping():
+ return {
+ 'source_copyright': [
+ sample_file_ref('report.spdx'),
+ sample_file_ref('README.md')
+ ],
+ 'type': 'mapping',
+ 'identifier': 'helloapple'
+ }
+
+@pytest.mark.get_page('https://gotmyowndoma.in')
+def test_haketilodb_item_modifications(driver, execute_in_page):
+ """
+ indexeddb.js facilitates operating on Haketilo's internal database.
+ Verify database operations on mappings/resources work properly.
+ """
+ execute_in_page(load_script('common/indexeddb.js'))
+ mock_broadcast(execute_in_page)
+
+ # Start with no database.
+ clear_indexeddb(execute_in_page)
+
+ sample_item = make_sample_resource()
+ sample_item['source_copyright'][0]['extra_prop'] = True
+
+ execute_in_page(
+ '''{
+ const promise = start_items_transaction(["resource"], arguments[1])
+ .then(ctx => save_item(arguments[0], ctx).then(() => ctx))
+ .then(finalize_transaction);
+ returnval(promise);
+ }''',
+ sample_item, {'sha256': sample_files_by_sha256})
+
+ database_contents = get_db_contents(execute_in_page)
+
+ assert len(database_contents['file']) == 4
+ assert all([sample_files_by_sha256[file['sha256']] == file['contents']
+ for file in database_contents['file']])
+ assert all([len(file) == 2 for file in database_contents['file']])
+
+ assert len(database_contents['file_uses']) == 4
+ assert all([uses['uses'] == 1 for uses in database_contents['file_uses']])
+ assert set([uses['sha256'] for uses in database_contents['file_uses']]) \
+ == set([file['sha256'] for file in database_contents['file']])
+
+ assert database_contents['mapping'] == []
+ assert database_contents['resource'] == [sample_item]
+
+ # See if trying to add an item without providing all its files ends in an
+ # exception and aborts the transaction as it should.
+ sample_item['scripts'].append(sample_file_ref('combined.js'))
+ incomplete_files = {**sample_files_by_sha256}
+ incomplete_files.pop(sample_files['combined.js']['sha256'])
+ exception = execute_in_page(
+ '''{
+ const args = arguments;
+ async function try_add_item()
+ {
+ const context =
+ await start_items_transaction(["resource"], args[1]);
+ try {
+ await save_item(args[0], context);
+ await finalize_transaction(context);
+ return;
+ } catch(e) {
+ return e;
+ }
+ }
+ returnval(try_add_item());
+ }''',
+ sample_item, {'sha256': incomplete_files})
+
+ previous_database_contents = database_contents
+ database_contents = get_db_contents(execute_in_page)
+
+ assert 'file not present' in exception
+ for key, val in database_contents.items():
+ keyfun = lambda item: item.get('sha256') or item['identifier']
+ assert sorted(previous_database_contents[key], key=keyfun) \
+ == sorted(val, key=keyfun)
+
+ # See if adding another item that partially uses first's files works OK.
+ sample_item = make_sample_mapping()
+ database_contents = execute_in_page(
+ '''{
+ const promise = start_items_transaction(["mapping"], arguments[1])
+ .then(ctx => save_item(arguments[0], ctx).then(() => ctx))
+ .then(finalize_transaction);
+ returnval(promise);
+ }''',
+ sample_item, {'sha256': sample_files_by_sha256})
+
+ database_contents = get_db_contents(execute_in_page)
+
+ names = ['README.md', 'report.spdx', 'LICENSES/somelicense.txt', 'hello.js',
+ 'bye.js']
+ sample_files_list = [sample_files[name] for name in names]
+ uses_list = [1, 2, 1, 1, 1]
+
+ uses = dict([(uses['sha256'], uses['uses'])
+ for uses in database_contents['file_uses']])
+ assert uses == dict([(file['sha256'], nr)
+ for file, nr in zip(sample_files_list, uses_list)])
+
+ files = dict([(file['sha256'], file['contents'])
+ for file in database_contents['file']])
+ assert files == dict([(file['sha256'], file['contents'])
+ for file in sample_files_list])
+
+ del database_contents['resource'][0]['source_copyright'][0]['extra_prop']
+ assert database_contents['resource'] == [make_sample_resource()]
+ assert database_contents['mapping'] == [sample_item]
+
+ # Try removing the items to get an empty database again.
+ results = [None, None]
+ for i, item_type in enumerate(['resource', 'mapping']):
+ execute_in_page(
+ f'''{{
+ const remover = remove_{item_type};
+ const promise =
+ start_items_transaction(["{item_type}"], {{}})
+ .then(ctx => remover('helloapple', ctx).then(() => ctx))
+ .then(finalize_transaction);
+ returnval(promise);
+ }}''')
+
+ results[i] = get_db_contents(execute_in_page)
+
+ names = ['README.md', 'report.spdx']
+ sample_files_list = [sample_files[name] for name in names]
+ uses_list = [1, 1]
+
+ uses = dict([(uses['sha256'], uses['uses'])
+ for uses in results[0]['file_uses']])
+ assert uses == dict([(file['sha256'], 1) for file in sample_files_list])
+
+ files = dict([(file['sha256'], file['contents'])
+ for file in results[0]['file']])
+ assert files == dict([(file['sha256'], file['contents'])
+ for file in sample_files_list])
+
+ assert results[0]['resource'] == []
+ assert results[0]['mapping'] == [sample_item]
+
+ assert results[1] == dict([(key, []) for key in results[0].keys()])
+
+ # Try initializing an empty database with sample initial data object.
+ sample_resource = make_sample_resource()
+ sample_mapping = make_sample_mapping()
+ initial_data = {
+ 'resource': {
+ 'helloapple': {
+ '1.12': sample_resource,
+ '0.9': 'something_that_should_get_ignored',
+ '1': 'something_that_should_get_ignored',
+ '1.1': 'something_that_should_get_ignored',
+ '1.11.1': 'something_that_should_get_ignored',
+ }
+ },
+ 'mapping': {
+ 'helloapple': {
+ '0.1.1': sample_mapping
+ }
+ },
+ 'file': {
+ 'sha256': sample_files_by_sha256
+ }
+ }
+
+ clear_indexeddb(execute_in_page)
+ execute_in_page('initial_data = arguments[0];', initial_data)
+ database_contents = get_db_contents(execute_in_page)
+
+ assert database_contents['resource'] == [sample_resource]
+ assert database_contents['mapping'] == [sample_mapping]
+
+@pytest.mark.get_page('https://gotmyowndoma.in')
+def test_haketilodb_settings(driver, execute_in_page):
+ """
+ indexeddb.js facilitates operating on Haketilo's internal database.
+ Verify assigning/retrieving values of simple "setting" item works properly.
+ """
+ execute_in_page(load_script('common/indexeddb.js'))
+ mock_broadcast(execute_in_page)
+
+ # Start with no database.
+ clear_indexeddb(execute_in_page)
+
+ assert get_db_contents(execute_in_page)['setting'] == []
+
+ assert execute_in_page('returnval(get_setting("option15"));') == None
+
+ execute_in_page('returnval(set_setting("option15", "disable"));')
+ assert execute_in_page('returnval(get_setting("option15"));') == 'disable'
+
+ execute_in_page('returnval(set_setting("option15", "enable"));')
+ assert execute_in_page('returnval(get_setting("option15"));') == 'enable'
+
+@pytest.mark.get_page('https://gotmyowndoma.in')
+def test_haketilodb_allowing(driver, execute_in_page):
+ """
+ indexeddb.js facilitates operating on Haketilo's internal database.
+ Verify changing the "blocking" configuration for a URL works properly.
+ """
+ execute_in_page(load_script('common/indexeddb.js'))
+ mock_broadcast(execute_in_page)
+
+ # Start with no database.
+ clear_indexeddb(execute_in_page)
+
+ assert get_db_contents(execute_in_page)['blocking'] == []
+
+ def run_with_sample_url(expr):
+ return execute_in_page(f'returnval({expr});', 'https://example.com/**')
+
+ assert None == run_with_sample_url('get_allowing(arguments[0])')
+
+ run_with_sample_url('set_disallowed(arguments[0])')
+ assert False == run_with_sample_url('get_allowing(arguments[0])')
+
+ run_with_sample_url('set_allowed(arguments[0])')
+ assert True == run_with_sample_url('get_allowing(arguments[0])')
+
+ run_with_sample_url('set_default_allowing(arguments[0])')
+ assert None == run_with_sample_url('get_allowing(arguments[0])')
+
+@pytest.mark.get_page('https://gotmyowndoma.in')
+def test_haketilodb_repos(driver, execute_in_page):
+ """
+ indexeddb.js facilitates operating on Haketilo's internal database.
+ Verify operations on repositories list work properly.
+ """
+ execute_in_page(load_script('common/indexeddb.js'))
+ mock_broadcast(execute_in_page)
+
+ # Start with no database.
+ clear_indexeddb(execute_in_page)
+
+ assert get_db_contents(execute_in_page)['repo'] == []
+
+ sample_urls = ['https://hdrlla.example.com/', 'https://hdrlla.example.org']
+
+ assert [] == execute_in_page('returnval(get_repos());')
+
+ execute_in_page('returnval(set_repo(arguments[0]));', sample_urls[0])
+ assert [sample_urls[0]] == execute_in_page('returnval(get_repos());')
+
+ execute_in_page('returnval(set_repo(arguments[0]));', sample_urls[1])
+ assert set(sample_urls) == set(execute_in_page('returnval(get_repos());'))
+
+ execute_in_page('returnval(del_repo(arguments[0]));', sample_urls[0])
+ assert [sample_urls[1]] == execute_in_page('returnval(get_repos());')
+
+test_page_html = '''
+<!DOCTYPE html>
+<script src="/testpage.js"></script>
+<body>
+</body>
+'''
+
+@pytest.mark.ext_data({
+ 'background_script': broker_js,
+ 'test_page': test_page_html,
+ 'extra_files': {
+ 'testpage.js': lambda: load_script('common/indexeddb.js')
+ }
+})
+@pytest.mark.usefixtures('webextension')
+def test_haketilodb_track(driver, execute_in_page, wait_elem_text):
+ """
+ Verify IndexedDB object change notifications are correctly broadcasted
+ through extension's background script and allow for object store contents
+ to be tracked in any execution context.
+ """
+ # Let's open the same extension's test page in a second window. Window 1
+ # will be used to make changes to IndexedDB and window 0 to "track" those
+ # changes.
+ driver.execute_script('window.open(window.location.href, "_blank");')
+ WebDriverWait(driver, 10).until(lambda d: len(d.window_handles) == 2)
+ windows = [*driver.window_handles]
+
+ # Create elements that will have tracked data inserted under them.
+ driver.switch_to.window(windows[0])
+ execute_in_page(
+ '''
+ for (const store_name of trackable) {
+ const h2 = document.createElement("h2");
+ h2.innerText = store_name;
+ document.body.append(h2);
+
+ const ul = document.createElement("ul");
+ ul.id = store_name;
+ document.body.append(ul);
+ }
+ ''')
+
+ # Mock initial_data.
+ sample_resource = make_sample_resource()
+ sample_mapping = make_sample_mapping()
+ initial_data = {
+ 'resource': {
+ 'helloapple': {
+ '1.0': sample_resource
+ }
+ },
+ 'mapping': {
+ 'helloapple': {
+ '0.1.1': sample_mapping
+ }
+ },
+ 'file': {
+ 'sha256': sample_files_by_sha256
+ }
+ }
+ driver.switch_to.window(windows[1])
+ execute_in_page('initial_data = arguments[0];', initial_data)
+ execute_in_page('returnval(set_setting("option15", "123"));')
+ execute_in_page('returnval(set_repo("https://hydril.la"));')
+ execute_in_page('returnval(set_disallowed("file:///*"));')
+
+ # See if track.*() functions properly return the already-existing items.
+ driver.switch_to.window(windows[0])
+ execute_in_page(
+ '''
+ function update_item(store_name, change)
+ {
+ const elem_id = `${store_name}_${change.key}`;
+ let elem = document.getElementById(elem_id);
+ elem = elem || document.createElement("li");
+ elem.id = elem_id;
+ elem.innerText = JSON.stringify(change.new_val);
+ document.getElementById(store_name).append(elem);
+ if (change.new_val === undefined)
+ elem.remove();
+ }
+
+ let resource_tracking, resource_items, mapping_tracking, mapping_items;
+
+ async function start_reporting()
+ {
+ const props = new Map(stores.map(([sn, opt]) => [sn, opt.keyPath]));
+ for (const store_name of trackable) {
+ [tracking, items] =
+ await track[store_name](ch => update_item(store_name, ch));
+ const prop = props.get(store_name);
+ for (const item of items)
+ update_item(store_name, {key: item[prop], new_val: item});
+ }
+ }
+
+ returnval(start_reporting());
+ ''')
+
+ item_counts = execute_in_page(
+ '''{
+ const childcount = id => document.getElementById(id).childElementCount;
+ returnval(trackable.map(childcount));
+ }''')
+ assert item_counts == [1 for _ in item_counts]
+ for elem_id, json_value in [
+ ('resource_helloapple', sample_resource),
+ ('mapping_helloapple', sample_mapping),
+ ('setting_option15', {'name': 'option15', 'value': '123'}),
+ ('repo_https://hydril.la', {'url': 'https://hydril.la'}),
+ ('blocking_file:///*', {'pattern': 'file:///*', 'allow': False})
+ ]:
+ assert json.loads(driver.find_element_by_id(elem_id).text) == json_value
+
+ # See if item additions get tracked properly.
+ driver.switch_to.window(windows[1])
+ sample_resource2 = make_sample_resource()
+ sample_resource2['identifier'] = 'helloapple-copy'
+ sample_mapping2 = make_sample_mapping()
+ sample_mapping2['identifier'] = 'helloapple-copy'
+ sample_data = {
+ 'resource': {
+ 'helloapple-copy': {
+ '1.0': sample_resource2
+ }
+ },
+ 'mapping': {
+ 'helloapple-copy': {
+ '0.1.1': sample_mapping2
+ }
+ },
+ 'file': {
+ 'sha256': sample_files_by_sha256
+ },
+ 'repo': [
+ 'https://hydril2.la/'
+ ]
+ }
+ execute_in_page('returnval(save_items(arguments[0]));', sample_data)
+ execute_in_page('returnval(set_setting("option22", "abc"));')
+ execute_in_page('returnval(set_repo("https://hydril3.la/"));')
+ execute_in_page('returnval(set_allowed("ftp://a.bc/"));')
+
+ driver.switch_to.window(windows[0])
+ driver.implicitly_wait(10)
+ for elem_id, json_value in [
+ ('resource_helloapple-copy', sample_resource2),
+ ('mapping_helloapple-copy', sample_mapping2),
+ ('setting_option22', {'name': 'option22', 'value': 'abc'}),
+ ('repo_https://hydril2.la/', {'url': 'https://hydril2.la/'}),
+ ('repo_https://hydril3.la/', {'url': 'https://hydril3.la/'}),
+ ('blocking_ftp://a.bc/', {'pattern': 'ftp://a.bc/', 'allow': True})
+ ]:
+ assert json.loads(driver.find_element_by_id(elem_id).text) == json_value
+ driver.implicitly_wait(0)
+
+ # See if item deletions/modifications get tracked properly.
+ driver.switch_to.window(windows[1])
+ execute_in_page(
+ '''{
+ async function change_remove_items()
+ {
+ const store_names = ["resource", "mapping"];
+ const ctx = await start_items_transaction(store_names, {});
+ await remove_resource("helloapple", ctx);
+ await remove_mapping("helloapple-copy", ctx);
+ await finalize_transaction(ctx);
+ await set_setting("option22", null);
+ await del_repo("https://hydril.la");
+ await set_default_allowing("file:///*");
+ await set_disallowed("ftp://a.bc/");
+ }
+ returnval(change_remove_items());
+ }''')
+
+ removed_ids = ['mapping_helloapple-copy', 'resource_helloapple',
+ 'repo_https://hydril.la', 'blocking_file:///*']
+ def condition_items_absent_and_changed(driver):
+ for id in removed_ids:
+ try:
+ driver.find_element_by_id(id)
+ return False
+ except WebDriverException:
+ pass
+
+ option_text = driver.find_element_by_id('setting_option22').text
+ blocking_text = driver.find_element_by_id('blocking_ftp://a.bc/').text
+ return (json.loads(option_text)['value'] == None and
+ json.loads(blocking_text)['allow'] == False)
+
+ driver.switch_to.window(windows[0])
+ WebDriverWait(driver, 10).until(condition_items_absent_and_changed)