# SPDX-License-Identifier: CC0-1.0 """ Haketilo unit tests - caching responses from remote repositories """ # This file is part of Haketilo # # Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org> # # This program is free software: you can redistribute it and/or modify # it under the terms of the CC0 1.0 Universal License as published by # the Creative Commons Corporation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # CC0 1.0 Universal License for more details. import pytest import json from selenium.webdriver.support.ui import WebDriverWait from ..script_loader import load_script def content_script(): script = load_script('content/repo_query_cacher.js') return f'{script}; {tab_id_asker}; start();' def bypass_js(): return load_script('background/CORS_bypass_server.js') + '; start();' def fetch_through_cache(driver, tab_id, url): return driver.execute_script( ''' return browser.tabs.sendMessage(arguments[0], ["repo_query", arguments[1]]); ''', tab_id, url) """ tab_id_responder is meant to be appended to background script of a test extension. """ tab_id_responder = ''' function tell_tab_id(msg, sender, respond_cb) { if (msg[0] === "learn_tab_id") respond_cb(sender.tab.id); } browser.runtime.onMessage.addListener(tell_tab_id); ''' """ tab_id_asker is meant to be appended to content script of a test extension. """ tab_id_asker = ''' browser.runtime.sendMessage(["learn_tab_id"]) .then(tid => window.wrappedJSObject.haketilo_tab = tid); ''' def run_content_script_in_new_window(driver, url): """ Expect an extension to be loaded which had tab_id_responder and tab_id_asker appended to its background and content scripts, respectively. Open the provided url in a new tab, find its tab id and return it, with current window changed back to the initial one. """ handle0 = driver.current_window_handle initial_handles = [*driver.window_handles] driver.execute_script('window.open(arguments[0], "_blank");', url) window_added = lambda d: set(d.window_handles) != set(initial_handles) WebDriverWait(driver, 10).until(window_added) new_handle = [*set(driver.window_handles).difference(initial_handles)][0] driver.switch_to.window(new_handle) get_tab_id = lambda d: d.execute_script('return window.haketilo_tab;') tab_id = WebDriverWait(driver, 10).until(get_tab_id) driver.switch_to.window(handle0) return tab_id @pytest.mark.ext_data({ 'content_script': content_script, 'background_script': lambda: bypass_js() + ';' + tab_id_responder }) @pytest.mark.usefixtures('webextension') def test_repo_query_cacher_normal_use(driver): """ Test if HTTP requests made through our cacher return correct results. """ tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in') result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/') assert set(result.keys()) == {'status', 'statusText', 'headers', 'body'} counter_initial = json.loads(bytes.fromhex(result['body']))['counter'] assert type(counter_initial) is int for i in range(2): result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/') assert json.loads(bytes.fromhex(result['body'])) \ == {'counter': counter_initial} tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in') result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/') assert json.loads(bytes.fromhex(result['body'])) \ == {'counter': counter_initial + 1} for i in range(2): result = fetch_through_cache(driver, tab_id, 'https://nxdoma.in/') assert result['status'] == 404 for i in range(2): result = fetch_through_cache(driver, tab_id, 'bad://url') assert set(result.keys()) == {'error'} assert result['error']['name'] == 'TypeError' @pytest.mark.ext_data({ 'content_script': content_script, 'background_script': tab_id_responder }) @pytest.mark.usefixtures('webextension') def test_repo_query_cacher_bgscript_error(driver): """ Test if our cacher properly reports errors in communication with the background script. """ tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in') result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/') assert set(result.keys()) == {'error'} assert set(result['error'].keys()) == \ {'name', 'message', 'fileName', 'lineNumber'} assert result['error']['message'] == \ "Couldn't communicate with background script."