# SPDX-License-Identifier: CC0-1.0

"""
Haketilo unit tests - IndexedDB access
"""

# This file is part of Haketilo
#
# Copyright (C) 2021, Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# CC0 1.0 Universal License for more details.

import pytest
import json
from hashlib import sha256
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import WebDriverException

from ..script_loader import load_script


def indexeddb_js():
    """Return what load_script() yields for 'common/indexeddb.js'."""
    return load_script('common/indexeddb.js')


def broker_js():
    """
    Return the 'background/broadcast_broker.js' script with a trailing
    ';start();' call appended.
    """
    broker_source = load_script('background/broadcast_broker.js')
    return broker_source + ';start();'


def sample_file(contents):
    """
    Build a sample file record for the text `contents`.

    The record holds the text under 'contents' and, under 'hash_key', the
    string 'sha256-' followed by the hex SHA-256 digest of the encoded text.
    """
    hex_digest = sha256(contents.encode()).hexdigest()
    return {
        'hash_key': f'sha256-{hex_digest}',
        'contents': contents
    }


# (file name, file text) pairs the sample records are generated from.
_sample_file_texts = (
    ('report.spdx',              ''),
    ('LICENSES/somelicense.txt', 'Permission is granted...'),
    ('hello.js',                 'console.log("hello!");\n'),
    ('bye.js',                   'console.log("bye!");\n'),
    ('combined.js',              'console.log("hello!\\nbye!");\n'),
    ('README.md',                '# Python Frobnicator\n...')
)

# Sample file records indexed by file name.
sample_files = {name: sample_file(text) for name, text in _sample_file_texts}

# The same files' contents indexed by their hash keys.
sample_files_by_hash = {
    record['hash_key']: record['contents']
    for record in sample_files.values()
}

# Sample resource definitions. They'd normally contain more fields but here we
# use simplified versions.
def make_sample_resource():
    """
    Return a new simplified sample resource definition ('helloapple').

    A fresh dict is built on each call, so callers may mutate the result.
    """
    return {
        'source_copyright': [
            file_ref('report.spdx'),
            file_ref('LICENSES/somelicense.txt')
        ],
        'type': 'resource',
        'identifier': 'helloapple',
        'scripts': [file_ref('hello.js'), file_ref('bye.js')]
    }


def make_sample_mapping():
    """
    Return a new simplified sample mapping definition ('helloapple').

    A fresh dict is built on each call, so callers may mutate the result.
    """
    return {
        'source_copyright': [
            file_ref('report.spdx'),
            file_ref('README.md')
        ],
        'type': 'mapping',
        'identifier': 'helloapple'
    }


def file_ref(file_name):
    """
    Return a reference to the sample file `file_name`: a dict with the name
    under 'file' and that file's 'hash_key' taken from `sample_files`.

    Raises KeyError when `file_name` is not one of the sample files.
    """
    return {'file': file_name, 'hash_key': sample_files[file_name]['hash_key']}


def _by_hash(records, field):
    """Map the 'hash_key' of each dict in `records` to its `field` value."""
    return {record['hash_key']: record[field] for record in records}


def _item_sort_key(item):
    """Sort key for database records: 'hash_key' if truthy, else 'identifier'."""
    return item.get('hash_key') or item['identifier']


@pytest.mark.get_page('https://gotmyowndoma.in')
def test_haketilodb_save_remove(execute_in_page):
    """
    indexeddb.js facilitates operating on Haketilo's internal database.
    Verify database operations work properly.
    """
    execute_in_page(indexeddb_js())
    # Mock some unwanted imports.
    execute_in_page(
        '''{
        initial_data = {};

        const broadcast_mock = {};
        const nop = () => {};
        for (const key in broadcast)
            broadcast_mock[key] = nop;
        broadcast = broadcast_mock;
        }''')

    # Start with no database.
    execute_in_page(
        '''
        async function delete_db() {
            if (db) {
                db.close();
                db = null;
            }
            let resolve, reject;
            const result = new Promise((...cbs) => [resolve, reject] = cbs);
            const request = indexedDB.deleteDatabase("haketilo");
            [request.onsuccess, request.onerror] = [resolve, reject];
            await result;
        }

        returnval(delete_db());
        '''
    )

    # Facilitate retrieving all IndexedDB contents.
    execute_in_page(
        '''
        async function get_database_contents()
        {
            const db = await get_db();

            const transaction = db.transaction(db.objectStoreNames);
            const store_names_reqs = [...db.objectStoreNames]
                .map(sn => [sn, transaction.objectStore(sn).getAll()])

            const promises = store_names_reqs
                .map(([_, req]) => wait_request(req));
            await Promise.all(promises);

            const result = {};
            store_names_reqs.forEach(([sn, req]) => result[sn] = req.result);
            return result;
        }
        ''')

    # Save a single resource. The extra property on one of its file references
    # is expected to come back unchanged as part of the stored resource.
    sample_item = make_sample_resource()
    sample_item['source_copyright'][0]['extra_prop'] = True

    database_contents = execute_in_page(
        '''{
        const promise = start_items_transaction(["resources"], arguments[1])
            .then(ctx => save_item(arguments[0], ctx).then(() => ctx))
            .then(finalize_items_transaction)
            .then(get_database_contents);
        returnval(promise);
        }''',
        sample_item, sample_files_by_hash)

    stored_files = database_contents['files']
    stored_uses = database_contents['file_uses']

    assert len(stored_files) == 4
    assert all(sample_files_by_hash[file['hash_key']] == file['contents']
               for file in stored_files)
    # Each stored file record has exactly two properties.
    assert all(len(file) == 2 for file in stored_files)

    assert len(stored_uses) == 4
    assert all(uses['uses'] == 1 for uses in stored_uses)
    assert {uses['hash_key'] for uses in stored_uses} \
        == {file['hash_key'] for file in stored_files}

    assert database_contents['mappings'] == []
    assert database_contents['resources'] == [sample_item]

    # See if trying to add an item without providing all its files ends in an
    # exception and aborts the transaction as it should.
    sample_item['scripts'].append(file_ref('combined.js'))
    incomplete_files = {**sample_files_by_hash}
    incomplete_files.pop(sample_files['combined.js']['hash_key'])
    result = execute_in_page(
        '''{
        const promise = (async () => {
            const context =
                await start_items_transaction(["resources"], arguments[1]);
            try {
                await save_item(arguments[0], context);
                await finalize_items_transaction(context);
                return {};
            } catch(e) {
                var exception = e;
            }

            return {exception, db_contents: await get_database_contents()};
        })();
        returnval(promise);
        }''',
        sample_item, incomplete_files)

    assert result
    assert 'file not present' in result['exception']
    # The database must look exactly as it did before the failed attempt.
    for key, val in database_contents.items():
        assert sorted(result['db_contents'][key], key=_item_sort_key) \
            == sorted(val, key=_item_sort_key)

    # See if adding another item that partially uses first's files works OK.
    sample_item = make_sample_mapping()
    database_contents = execute_in_page(
        '''{
        const promise = start_items_transaction(["mappings"], arguments[1])
            .then(ctx => save_item(arguments[0], ctx).then(() => ctx))
            .then(finalize_items_transaction)
            .then(get_database_contents);
        returnval(promise);
        }''',
        sample_item, sample_files_by_hash)

    names = ['README.md', 'report.spdx', 'LICENSES/somelicense.txt',
             'hello.js', 'bye.js']
    sample_files_list = [sample_files[name] for name in names]
    # 'report.spdx' is referenced by both the resource and the mapping.
    uses_list = [1, 2, 1, 1, 1]

    uses = _by_hash(database_contents['file_uses'], 'uses')
    assert uses == {file['hash_key']: nr
                    for file, nr in zip(sample_files_list, uses_list)}

    files = _by_hash(database_contents['files'], 'contents')
    assert files == _by_hash(sample_files_list, 'contents')

    del database_contents['resources'][0]['source_copyright'][0]['extra_prop']
    assert database_contents['resources'] == [make_sample_resource()]
    assert database_contents['mappings'] == [sample_item]

    # Try removing the items to get an empty database again.
    results = [
        execute_in_page(
            f'''{{
            const remover = remove_{item_type};

            const promise =
                start_items_transaction(["{item_type}s"], {{}})
                .then(ctx => remover('helloapple', ctx).then(() => ctx))
                .then(finalize_items_transaction)
                .then(get_database_contents);
            returnval(promise);
            }}''')
        for item_type in ('resource', 'mapping')
    ]

    # After the resource is gone only the mapping's files remain, each with a
    # single use.
    names = ['README.md', 'report.spdx']
    sample_files_list = [sample_files[name] for name in names]

    uses = _by_hash(results[0]['file_uses'], 'uses')
    assert uses == {file['hash_key']: 1 for file in sample_files_list}

    files = _by_hash(results[0]['files'], 'contents')
    assert files == _by_hash(sample_files_list, 'contents')

    assert results[0]['resources'] == []
    assert results[0]['mappings'] == [sample_item]

    # After the mapping is gone as well, every object store is empty.
    assert results[1] == {key: [] for key in results[0]}

    # Try initializing an empty database with sample initial data object.
    sample_resource = make_sample_resource()
    sample_mapping = make_sample_mapping()
    initial_data = {
        'resources': {
            'helloapple': {
                '1.12':   sample_resource,
                '0.9':    'something_that_should_get_ignored',
                '1':      'something_that_should_get_ignored',
                '1.1':    'something_that_should_get_ignored',
                '1.11.1': 'something_that_should_get_ignored',
            }
        },
        'mappings': {
            'helloapple': {
                '0.1.1': sample_mapping
            }
        },
        'files': sample_files_by_hash
    }
    database_contents = execute_in_page(
        '''
        initial_data = arguments[0];
        returnval(delete_db().then(() => get_database_contents()));
        ''',
        initial_data)
    assert database_contents['resources'] == [sample_resource]
    assert database_contents['mappings'] == [sample_mapping]


# Page used by test_haketilodb_track(). The test's update_item() appends <li>
# elements to the elements with ids "resources" and "mappings", and the test
# calls functions from the extension's 'testpage.js' file, so the page has to
# provide both lists and load that script.
test_page_html = '''
<!DOCTYPE html>
<script src="/testpage.js"></script>
<body>
  <h2>resources</h2>
  <ul id="resources"></ul>

  <h2>mappings</h2>
  <ul id="mappings"></ul>
</body>
'''


@pytest.mark.ext_data({
    'background_script': broker_js,
    'test_page':         test_page_html,
    'extra_files': {
        'testpage.js':   indexeddb_js
    }
})
@pytest.mark.usefixtures('webextension')
def test_haketilodb_track(driver, execute_in_page, wait_elem_text):
    """
    Verify IndexedDB object change notifications are correctly broadcasted
    through extension's background script and allow for object store contents
    to be tracked in any execution context.
    """
    # Let's open the same extension's test page in a second window. Window 1
    # will be used to make changes to IndexedDB and window 0 to "track" those
    # changes.
    driver.execute_script('window.open(window.location.href, "_blank");')
    windows = [*driver.window_handles]
    assert len(windows) == 2

    # Mock initial_data.
    sample_resource = make_sample_resource()
    sample_mapping = make_sample_mapping()
    initial_data = {
        'resources': {
            'helloapple': {
                '1.0': sample_resource
            }
        },
        'mappings': {
            'helloapple': {
                '0.1.1': sample_mapping
            }
        },
        'files': sample_files_by_hash
    }
    # Iterating in reverse leaves window 0 selected once the loop is done.
    for window in reversed(windows):
        driver.switch_to.window(window)
        execute_in_page('initial_data = arguments[0];', initial_data)

    # See if track_*() functions properly return the already-existing items.
    execute_in_page(
        '''
        function update_item(store_name, change)
        {
            console.log('update', ...arguments);
            const elem_id = `${store_name}_${change.identifier}`;

            let elem = document.getElementById(elem_id);
            elem = elem || document.createElement("li");
            elem.id = elem_id;
            elem.innerText = JSON.stringify(change.new_val);
            document.getElementById(store_name).append(elem);
            if (change.new_val === undefined)
                elem.remove();
        }

        let resource_tracking, resource_items, mapping_tracking, mapping_items;

        async function start_tracking()
        {
            const update_resource = change => update_item("resources", change);
            const update_mapping = change => update_item("mappings", change);

            [resource_tracking, resource_items] =
                await track_resources(update_resource);
            [mapping_tracking, mapping_items] =
                await track_mappings(update_mapping);

            for (const item of resource_items)
                update_resource({identifier: item.identifier, new_val: item});
            for (const item of mapping_items)
                update_mapping({identifier: item.identifier, new_val: item});
        }

        returnval(start_tracking());
        ''')

    item_counts = driver.execute_script(
        '''
        const childcount = id => document.getElementById(id).childElementCount;
        return ["resources", "mappings"].map(childcount);
        ''')
    assert item_counts == [1, 1]

    # find_element(By.ID, ...) is used instead of find_element_by_id(), which
    # newer Selenium releases no longer provide.
    resource_json = driver.find_element(By.ID, 'resources_helloapple').text
    mapping_json = driver.find_element(By.ID, 'mappings_helloapple').text
    assert json.loads(resource_json) == sample_resource
    assert json.loads(mapping_json) == sample_mapping

    # See if item additions get tracked properly.
    driver.switch_to.window(windows[1])
    sample_resource2 = make_sample_resource()
    sample_resource2['identifier'] = 'helloapple-copy'
    sample_mapping2 = make_sample_mapping()
    sample_mapping2['identifier'] = 'helloapple-copy'
    sample_data = {
        'resources': {
            'helloapple-copy': {
                '1.0': sample_resource2
            }
        },
        'mappings': {
            'helloapple-copy': {
                '0.1.1': sample_mapping2
            }
        },
        'files': sample_files_by_hash
    }
    execute_in_page('returnval(save_items(arguments[0]));', sample_data)

    driver.switch_to.window(windows[0])
    # Give the change notifications time to arrive. Reset the implicit wait
    # even when a lookup fails so it cannot slow down later polling.
    driver.implicitly_wait(10)
    try:
        resource_json = \
            driver.find_element(By.ID, 'resources_helloapple-copy').text
        mapping_json = \
            driver.find_element(By.ID, 'mappings_helloapple-copy').text
    finally:
        driver.implicitly_wait(0)

    assert json.loads(resource_json) == sample_resource2
    assert json.loads(mapping_json) == sample_mapping2

    # See if item deletions get tracked properly.
    driver.switch_to.window(windows[1])
    execute_in_page(
        '''{
        async function remove_items()
        {
            const store_names = ["resources", "mappings"];
            const ctx = await start_items_transaction(store_names, {});
            await remove_resource("helloapple", ctx);
            await remove_mapping("helloapple-copy", ctx);
            await finalize_items_transaction(ctx);
        }
        returnval(remove_items());
        }''')

    removed_ids = ['mappings_helloapple-copy', 'resources_helloapple']

    def condition_items_absent(driver):
        """Return True once none of the removed items' elements can be found."""
        for elem_id in removed_ids:
            try:
                driver.find_element(By.ID, elem_id)
            except WebDriverException:
                continue
            # The element is still there - keep waiting.
            return False
        return True

    driver.switch_to.window(windows[0])
    WebDriverWait(driver, 10).until(condition_items_absent)