# SPDX-License-Identifier: CC0-1.0

"""
Haketilo unit tests - IndexedDB access
"""

# This file is part of Haketilo
#
# Copyright (C) 2021, 2022 Wojtek Kosior <koszko@koszko.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# CC0 1.0 Universal License for more details.

import pytest
import json
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import WebDriverException

from ..script_loader import load_script
from .utils import *

# Sample resource definitions. They'd normally contain more fields but here we
# use simplified versions.

def make_sample_resource():
    """
    Return a freshly built dict describing the sample 'helloapple' resource.

    A new object is built on every call so that tests can mutate the result
    without affecting each other.
    """
    return {
        'source_copyright': [
            sample_file_ref('report.spdx'),
            sample_file_ref('LICENSES/somelicense.txt')
        ],
        'type': 'resource',
        'identifier': 'helloapple',
        'scripts': [sample_file_ref('hello.js'), sample_file_ref('bye.js')]
    }

def make_sample_mapping():
    """
    Return a freshly built dict describing the sample 'helloapple' mapping.

    A new object is built on every call so that tests can mutate the result
    without affecting each other.
    """
    return {
        'source_copyright': [
            sample_file_ref('report.spdx'),
            sample_file_ref('README.md')
        ],
        'type': 'mapping',
        'identifier': 'helloapple'
    }

def _uses_by_sha256(file_uses):
    """Map the 'sha256' of each 'file_uses' entry to its 'uses' counter."""
    return {entry['sha256']: entry['uses'] for entry in file_uses}

def _contents_by_sha256(files):
    """Map the 'sha256' of each file entry to its 'contents'."""
    return {file['sha256']: file['contents'] for file in files}

@pytest.mark.get_page('https://gotmyowndoma.in')
def test_haketilodb_update_1_0_1(driver, execute_in_page):
    """
    indexeddb.js modifies data when updating to database version 1.0.1. Verify
    the update procedure works properly.
    """
    execute_in_page(load_script('common/indexeddb.js'))
    execute_in_page('db_version_nr = version_nr([1, 0, 0]);')
    mock_broadcast(execute_in_page)

    # Start with no database.
    clear_indexeddb(execute_in_page)

    v1_url = 'https://hydrilla.koszko.org/api_v1/'
    v2_url = 'https://hydrilla.koszko.org/api_v2/'

    urls_v1 = {'https://sample.url/abc/', v1_url}
    urls_v2 = {'https://sample.url/abc/', v2_url}

    for url in urls_v1:
        execute_in_page('returnval(set_repo(arguments[0]));', url)

    assert urls_v1 == set(execute_in_page('returnval(get_repos());'))

    # Verify that url gets updated to v2 upon database update.
    driver.get(driver.execute_script('return window.location.href;'))
    execute_in_page(load_script('common/indexeddb.js'))
    mock_broadcast(execute_in_page)

    # The contents are read back but not inspected; the call itself is kept.
    get_db_contents(execute_in_page)

    assert set(execute_in_page('returnval(get_repos());')) == urls_v2

    # Verify that url does not get updated when there is no database update.
    execute_in_page('returnval(del_repo(arguments[0]));', v2_url)
    execute_in_page('returnval(set_repo(arguments[0]));', v1_url)

    driver.get(driver.execute_script('return window.location.href;'))
    execute_in_page(load_script('common/indexeddb.js'))
    mock_broadcast(execute_in_page)

    # The contents are read back but not inspected; the call itself is kept.
    get_db_contents(execute_in_page)

    assert set(execute_in_page('returnval(get_repos());')) == urls_v1

@pytest.mark.get_page('https://gotmyowndoma.in')
def test_haketilodb_item_modifications(driver, execute_in_page):
    """
    indexeddb.js facilitates operating on Haketilo's internal database.
    Verify database operations on mappings/resources work properly.
    """
    execute_in_page(load_script('common/indexeddb.js'))
    mock_broadcast(execute_in_page)

    # Start with no database.
    clear_indexeddb(execute_in_page)

    sample_item = make_sample_resource()
    sample_item['source_copyright'][0]['extra_prop'] = True

    execute_in_page(
        '''{
        const promise = start_items_transaction(["resource"], arguments[1])
            .then(ctx => save_item(arguments[0], ctx).then(() => ctx))
            .then(finalize_transaction);
        returnval(promise);
        }''',
        sample_item, {'sha256': sample_files_by_sha256})

    database_contents = get_db_contents(execute_in_page)

    assert len(database_contents['file']) == 4
    assert all(sample_files_by_sha256[file['sha256']] == file['contents']
               for file in database_contents['file'])
    assert all(len(file) == 2 for file in database_contents['file'])

    assert len(database_contents['file_uses']) == 4
    assert all(entry['uses'] == 1
               for entry in database_contents['file_uses'])
    assert {entry['sha256'] for entry in database_contents['file_uses']} \
        == {file['sha256'] for file in database_contents['file']}

    assert database_contents['mapping'] == []
    assert database_contents['resource'] == [sample_item]

    # See if trying to add an item without providing all its files ends in an
    # exception and aborts the transaction as it should.
    sample_item['scripts'].append(sample_file_ref('combined.js'))
    incomplete_files = {**sample_files_by_sha256}
    incomplete_files.pop(sample_files['combined.js']['sha256'])
    exception = execute_in_page(
        '''{
        const args = arguments;
        async function try_add_item()
        {
            const context =
                await start_items_transaction(["resource"], args[1]);
            try {
                await save_item(args[0], context);
                await finalize_transaction(context);
                return;
            } catch(e) {
                return e;
            }
        }
        returnval(try_add_item());
        }''',
        sample_item, {'sha256': incomplete_files})

    previous_database_contents = database_contents
    database_contents = get_db_contents(execute_in_page)

    assert 'file not present' in exception

    def item_key(item):
        # Entries are ordered by 'sha256' when they have a truthy one and by
        # 'identifier' otherwise, so both snapshots compare independent of
        # the order in which the stores returned them.
        return item.get('sha256') or item['identifier']

    for key, val in database_contents.items():
        assert sorted(previous_database_contents[key], key=item_key) \
            == sorted(val, key=item_key)

    # See if adding another item that partially uses first's files works OK.
    sample_item = make_sample_mapping()
    execute_in_page(
        '''{
        const promise = start_items_transaction(["mapping"], arguments[1])
            .then(ctx => save_item(arguments[0], ctx).then(() => ctx))
            .then(finalize_transaction);
        returnval(promise);
        }''',
        sample_item, {'sha256': sample_files_by_sha256})

    database_contents = get_db_contents(execute_in_page)

    names = ['README.md', 'report.spdx', 'LICENSES/somelicense.txt',
             'hello.js', 'bye.js']
    sample_files_list = [sample_files[name] for name in names]
    # 'report.spdx' is referenced by both the resource and the mapping.
    uses_list = [1, 2, 1, 1, 1]

    assert _uses_by_sha256(database_contents['file_uses']) \
        == {file['sha256']: nr
            for file, nr in zip(sample_files_list, uses_list)}

    assert _contents_by_sha256(database_contents['file']) \
        == _contents_by_sha256(sample_files_list)

    del database_contents['resource'][0]['source_copyright'][0]['extra_prop']
    assert database_contents['resource'] == [make_sample_resource()]
    assert database_contents['mapping'] == [sample_item]

    # Try removing the items to get an empty database again.
    results = []
    for item_type in ['resource', 'mapping']:
        execute_in_page(
            f'''{{
            const remover = remove_{item_type};
            const promise =
                start_items_transaction(["{item_type}"], {{}})
                .then(ctx => remover('helloapple', ctx).then(() => ctx))
                .then(finalize_transaction);
            returnval(promise);
            }}''')

        results.append(get_db_contents(execute_in_page))

    # After removing the resource only the mapping's files should remain.
    names = ['README.md', 'report.spdx']
    sample_files_list = [sample_files[name] for name in names]

    assert _uses_by_sha256(results[0]['file_uses']) \
        == {file['sha256']: 1 for file in sample_files_list}

    assert _contents_by_sha256(results[0]['file']) \
        == _contents_by_sha256(sample_files_list)

    assert results[0]['resource'] == []
    assert results[0]['mapping'] == [sample_item]

    # After removing the mapping as well, every store should be empty.
    assert results[1] == {key: [] for key in results[0]}

    # Try initializing an empty database with sample initial data object.
    sample_resource = make_sample_resource()
    sample_mapping = make_sample_mapping()
    initial_data = {
        'resource': {
            'helloapple': {
                '1.12': sample_resource,
                '0.9': 'something_that_should_get_ignored',
                '1': 'something_that_should_get_ignored',
                '1.1': 'something_that_should_get_ignored',
                '1.11.1': 'something_that_should_get_ignored',
            }
        },
        'mapping': {
            'helloapple': {
                '0.1.1': sample_mapping
            }
        },
        'file': {
            'sha256': sample_files_by_sha256
        }
    }

    clear_indexeddb(execute_in_page)
    execute_in_page('initial_data = arguments[0];', initial_data)
    database_contents = get_db_contents(execute_in_page)

    assert database_contents['resource'] == [sample_resource]
    assert database_contents['mapping'] == [sample_mapping]

@pytest.mark.get_page('https://gotmyowndoma.in')
def test_haketilodb_settings(driver, execute_in_page):
    """
    indexeddb.js facilitates operating on Haketilo's internal database.
    Verify assigning/retrieving values of simple "setting" item works properly.
    """
    execute_in_page(load_script('common/indexeddb.js'))
    mock_broadcast(execute_in_page)

    # Start with no database.
    clear_indexeddb(execute_in_page)

    assert get_db_contents(execute_in_page)['setting'] == []

    assert execute_in_page('returnval(get_setting("option15"));') is None

    execute_in_page('returnval(set_setting("option15", "disable"));')
    assert execute_in_page('returnval(get_setting("option15"));') == 'disable'

    execute_in_page('returnval(set_setting("option15", "enable"));')
    assert execute_in_page('returnval(get_setting("option15"));') == 'enable'

@pytest.mark.get_page('https://gotmyowndoma.in')
def test_haketilodb_allowing(driver, execute_in_page):
    """
    indexeddb.js facilitates operating on Haketilo's internal database.
    Verify changing the "blocking" configuration for a URL works properly.
    """
    execute_in_page(load_script('common/indexeddb.js'))
    mock_broadcast(execute_in_page)

    # Start with no database.
    clear_indexeddb(execute_in_page)

    assert get_db_contents(execute_in_page)['blocking'] == []

    def run_with_sample_url(expr):
        # Evaluate the JS expression with the sample pattern as arguments[0].
        return execute_in_page(f'returnval({expr});', 'https://example.com/**')

    assert run_with_sample_url('get_allowing(arguments[0])') is None

    run_with_sample_url('set_disallowed(arguments[0])')
    assert False == run_with_sample_url('get_allowing(arguments[0])')

    run_with_sample_url('set_allowed(arguments[0])')
    assert True == run_with_sample_url('get_allowing(arguments[0])')

    run_with_sample_url('set_default_allowing(arguments[0])')
    assert run_with_sample_url('get_allowing(arguments[0])') is None

@pytest.mark.get_page('https://gotmyowndoma.in')
def test_haketilodb_repos(driver, execute_in_page):
    """
    indexeddb.js facilitates operating on Haketilo's internal database.
    Verify operations on repositories list work properly.
    """
    execute_in_page(load_script('common/indexeddb.js'))
    mock_broadcast(execute_in_page)

    # Start with no database.
    clear_indexeddb(execute_in_page)

    assert get_db_contents(execute_in_page)['repo'] == []

    sample_urls = ['https://hdrlla.example.com/', 'https://hdrlla.example.org']

    assert [] == execute_in_page('returnval(get_repos());')

    execute_in_page('returnval(set_repo(arguments[0]));', sample_urls[0])
    assert [sample_urls[0]] == execute_in_page('returnval(get_repos());')

    execute_in_page('returnval(set_repo(arguments[0]));', sample_urls[1])
    assert set(sample_urls) == set(execute_in_page('returnval(get_repos());'))

    execute_in_page('returnval(del_repo(arguments[0]));', sample_urls[0])
    assert [sample_urls[1]] == execute_in_page('returnval(get_repos());')

test_page_html = '''
<!DOCTYPE html>
<script src="/testpage.js"></script>
<body>
</body>
'''

@pytest.mark.ext_data({
    'background_script': broker_js,
    'test_page':         test_page_html,
    'extra_files': {
        'testpage.js':   lambda: load_script('common/indexeddb.js')
    }
})
@pytest.mark.usefixtures('webextension')
def test_haketilodb_track(driver, execute_in_page, wait_elem_text):
    """
    Verify IndexedDB object change notifications are correctly broadcasted
    through extension's background script and allow for object store contents
    to be tracked in any execution context.
    """
    # Let's open the same extension's test page in a second window. Window 1
    # will be used to make changes to IndexedDB and window 0 to "track" those
    # changes.
    driver.execute_script('window.open(window.location.href, "_blank");')
    WebDriverWait(driver, 10).until(lambda d: len(d.window_handles) == 2)
    windows = [*driver.window_handles]

    # Create elements that will have tracked data inserted under them.
    driver.switch_to.window(windows[0])
    execute_in_page(
        '''
        for (const store_name of trackable) {
            const h2 = document.createElement("h2");
            h2.innerText = store_name;
            document.body.append(h2);

            const ul = document.createElement("ul");
            ul.id = store_name;
            document.body.append(ul);
        }
        ''')

    # Mock initial_data.
    sample_resource = make_sample_resource()
    sample_mapping = make_sample_mapping()
    initial_data = {
        'resource': {
            'helloapple': {
                '1.0': sample_resource
            }
        },
        'mapping': {
            'helloapple': {
                '0.1.1': sample_mapping
            }
        },
        'file': {
            'sha256': sample_files_by_sha256
        }
    }
    driver.switch_to.window(windows[1])
    execute_in_page('initial_data = arguments[0];', initial_data)
    execute_in_page('returnval(set_setting("option15", "123"));')
    execute_in_page('returnval(set_repo("https://hydril.la"));')
    execute_in_page('returnval(set_disallowed("file:///*"));')

    # See if track.*() functions properly return the already-existing items.
    driver.switch_to.window(windows[0])
    execute_in_page(
        '''
        function update_item(store_name, change)
        {
            const elem_id = `${store_name}_${change.key}`;
            let elem = document.getElementById(elem_id);
            elem = elem || document.createElement("li");
            elem.id = elem_id;
            elem.innerText = JSON.stringify(change.new_val);
            document.getElementById(store_name).append(elem);
            if (change.new_val === undefined)
                elem.remove();
        }

        let resource_tracking, resource_items, mapping_tracking, mapping_items;

        async function start_reporting()
        {
            const props = new Map(stores.map(([sn, opt]) => [sn, opt.keyPath]));
            for (const store_name of trackable) {
                [tracking, items] =
                    await track[store_name](ch => update_item(store_name, ch));
                const prop = props.get(store_name);
                for (const item of items)
                    update_item(store_name, {key: item[prop], new_val: item});
            }
        }

        returnval(start_reporting());
        ''')

    item_counts = execute_in_page(
        '''{
        const childcount = id => document.getElementById(id).childElementCount;
        returnval(trackable.map(childcount));
        }''')
    assert item_counts == [1 for _ in item_counts]

    # find_element(By.ID, ...) is used throughout because the
    # find_element_by_id() shortcut no longer exists in current Selenium.
    for elem_id, json_value in [
            ('resource_helloapple', sample_resource),
            ('mapping_helloapple', sample_mapping),
            ('setting_option15', {'name': 'option15', 'value': '123'}),
            ('repo_https://hydril.la', {'url': 'https://hydril.la'}),
            ('blocking_file:///*', {'pattern': 'file:///*', 'allow': False})
    ]:
        assert json.loads(driver.find_element(By.ID, elem_id).text) \
            == json_value

    # See if item additions get tracked properly.
    driver.switch_to.window(windows[1])
    sample_resource2 = make_sample_resource()
    sample_resource2['identifier'] = 'helloapple-copy'
    sample_mapping2 = make_sample_mapping()
    sample_mapping2['identifier'] = 'helloapple-copy'
    sample_data = {
        'resource': {
            'helloapple-copy': {
                '1.0': sample_resource2
            }
        },
        'mapping': {
            'helloapple-copy': {
                '0.1.1': sample_mapping2
            }
        },
        'file': {
            'sha256': sample_files_by_sha256
        },
        'repo': [
            'https://hydril2.la/'
        ]
    }
    execute_in_page('returnval(save_items(arguments[0]));', sample_data)
    execute_in_page('returnval(set_setting("option22", "abc"));')
    execute_in_page('returnval(set_repo("https://hydril3.la/"));')
    execute_in_page('returnval(set_allowed("ftp://a.bc/"));')

    driver.switch_to.window(windows[0])
    # Give change notifications time to arrive; always restore the implicit
    # wait so a failing assertion cannot leave the driver with a 10 s wait.
    driver.implicitly_wait(10)
    try:
        for elem_id, json_value in [
                ('resource_helloapple-copy', sample_resource2),
                ('mapping_helloapple-copy', sample_mapping2),
                ('setting_option22', {'name': 'option22', 'value': 'abc'}),
                ('repo_https://hydril2.la/', {'url': 'https://hydril2.la/'}),
                ('repo_https://hydril3.la/', {'url': 'https://hydril3.la/'}),
                ('blocking_ftp://a.bc/',
                 {'pattern': 'ftp://a.bc/', 'allow': True})
        ]:
            assert json.loads(driver.find_element(By.ID, elem_id).text) \
                == json_value
    finally:
        driver.implicitly_wait(0)

    # See if item deletions/modifications get tracked properly.
    driver.switch_to.window(windows[1])
    execute_in_page(
        '''{
        async function change_remove_items()
        {
            const store_names = ["resource", "mapping"];
            const ctx = await start_items_transaction(store_names, {});
            await remove_resource("helloapple", ctx);
            await remove_mapping("helloapple-copy", ctx);
            await finalize_transaction(ctx);
            await set_setting("option22", null);
            await del_repo("https://hydril.la");
            await set_default_allowing("file:///*");
            await set_disallowed("ftp://a.bc/");
        }
        returnval(change_remove_items());
        }''')

    removed_ids = ['mapping_helloapple-copy', 'resource_helloapple',
                   'repo_https://hydril.la', 'blocking_file:///*']

    def condition_items_absent_and_changed(driver):
        # Every removed item must be gone from the page...
        for removed_id in removed_ids:
            try:
                driver.find_element(By.ID, removed_id)
            except WebDriverException:
                continue
            return False

        # ...and the modified items must show their new values.
        option_text = driver.find_element(By.ID, 'setting_option22').text
        blocking_text = driver.find_element(By.ID, 'blocking_ftp://a.bc/').text
        return (json.loads(option_text)['value'] is None and
                json.loads(blocking_text)['allow'] == False)

    driver.switch_to.window(windows[0])
    WebDriverWait(driver, 10).until(condition_items_absent_and_changed)