# SPDX-License-Identifier: CC0-1.0

"""
Haketilo unit tests - caching responses from remote repositories
"""

# This file is part of Haketilo
#
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# CC0 1.0 Universal License for more details.

import pytest
import json
from selenium.webdriver.support.ui import WebDriverWait

from ..script_loader import load_script

def content_script():
    """
    Build the content script of the test extension: the repo query cacher
    source, followed by the tab_id_asker snippet and a call to start().
    """
    cacher_js = load_script('content/repo_query_cacher.js')
    return '{}; {}; start();'.format(cacher_js, tab_id_asker)

def bypass_js():
    """
    Build background script code: the CORS bypass server source followed by a
    call to start().
    """
    server_js = load_script('background/CORS_bypass_server.js')
    return server_js + '; start();'

def fetch_through_cache(driver, tab_id, url):
    """
    Make the driver execute a script that sends a ["repo_query", url] message
    to the tab identified by tab_id through browser.tabs.sendMessage() and
    return whatever that script execution yields.
    """
    # tab_id and url reach the script as arguments[0] and arguments[1].
    send_query_js = '''
        return browser.tabs.sendMessage(arguments[0],
                                        ["repo_query", arguments[1]]);
        '''
    return driver.execute_script(send_query_js, tab_id, url)

"""
tab_id_responder is meant to be appended to the background script of a test
extension.
"""
# JavaScript snippet. It registers a browser.runtime.onMessage listener which,
# for a message whose first element is "learn_tab_id", replies with the id of
# the tab the message came from (sender.tab.id). The listener does not respond
# to any other message.
tab_id_responder = '''
function tell_tab_id(msg, sender, respond_cb) {
    if (msg[0] === "learn_tab_id")
        respond_cb(sender.tab.id);
}
browser.runtime.onMessage.addListener(tell_tab_id);
'''

"""
tab_id_asker is meant to be appended to the content script of a test extension.
"""
# JavaScript snippet. It sends a ["learn_tab_id"] message through
# browser.runtime.sendMessage() and stores the reply in
# window.wrappedJSObject.haketilo_tab. run_content_script_in_new_window()
# later reads that value back from the page as window.haketilo_tab.
tab_id_asker = '''
browser.runtime.sendMessage(["learn_tab_id"])
    .then(tid => window.wrappedJSObject.haketilo_tab = tid);
'''

def run_content_script_in_new_window(driver, url):
    """
    Expect an extension to be loaded which had tab_id_responder and tab_id_asker
    appended to its background and content scripts, respectively.
    Open the provided url in a new tab, find its tab id and return it, with
    current window changed back to the initial one.

    The initial window is made current again even if waiting for the tab id
    fails, so an error here does not leave the driver on the new window.
    Exceptions raised by WebDriverWait (e.g. on timeout) are propagated.
    """
    initial_handle = driver.current_window_handle
    known_handles = set(driver.window_handles)
    driver.execute_script('window.open(arguments[0], "_blank");', url)

    # until() returns the first truthy value produced by the callable, so we
    # get the set of newly appeared handles straight from the wait. Waiting for
    # a non-empty difference (rather than for "handles changed") guarantees
    # there is a new handle to pick and avoids re-reading window_handles.
    def new_handles(d):
        return set(d.window_handles) - known_handles

    new_handle = next(iter(WebDriverWait(driver, 10).until(new_handles)))

    driver.switch_to.window(new_handle)
    try:
        # The content script stores the tab id on the page's window object
        # asynchronously; poll until it becomes available.
        def read_tab_id(d):
            return d.execute_script('return window.haketilo_tab;')

        return WebDriverWait(driver, 10).until(read_tab_id)
    finally:
        # Honor the documented contract on both success and failure.
        driver.switch_to.window(initial_handle)

@pytest.mark.ext_data({
    'content_script': content_script,
    'background_script': lambda: bypass_js() + ';' + tab_id_responder
})
@pytest.mark.usefixtures('webextension')
def test_repo_query_cacher_normal_use(driver):
    """
    Verify results of HTTP requests made through the cacher: repeated queries
    from one tab must give the same counter value, a query from a newly opened
    tab must give the next one, and failures (404, bad URL) must be reported.
    """
    page_url = 'https://gotmyowndoma.in'
    counter_url = 'https://counterdoma.in/'

    def decoded_body(response):
        # The 'body' field holds hex-encoded bytes which are JSON here.
        return json.loads(bytes.fromhex(response['body']))

    tab_id = run_content_script_in_new_window(driver, page_url)

    first = fetch_through_cache(driver, tab_id, counter_url)
    assert set(first.keys()) == {'status', 'statusText', 'headers', 'body'}
    counter_initial = decoded_body(first)['counter']
    assert type(counter_initial) is int

    # Same tab: the counter is expected not to advance.
    for _ in range(2):
        repeated = fetch_through_cache(driver, tab_id, counter_url)
        assert decoded_body(repeated) == {'counter': counter_initial}

    # New tab: the counter is expected to have advanced by exactly one.
    tab_id = run_content_script_in_new_window(driver, page_url)
    fresh = fetch_through_cache(driver, tab_id, counter_url)
    assert decoded_body(fresh) == {'counter': counter_initial + 1}

    for _ in range(2):
        missing = fetch_through_cache(driver, tab_id, 'https://nxdoma.in/')
        assert missing['status'] == 404

    for _ in range(2):
        failed = fetch_through_cache(driver, tab_id, 'bad://url')
        assert set(failed.keys()) == {'error'}
        assert failed['error']['name'] == 'TypeError'

@pytest.mark.ext_data({
    'content_script': content_script,
    'background_script': tab_id_responder
})
@pytest.mark.usefixtures('webextension')
def test_repo_query_cacher_bgscript_error(driver):
    """
    Verify that the cacher reports an error when it cannot communicate with
    the background script (here the background script consists of
    tab_id_responder only).
    """
    tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in')

    response = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/')
    assert set(response.keys()) == {'error'}

    expected_fields = {'name', 'message', 'fileName', 'lineNumber'}
    assert set(response['error'].keys()) == expected_fields

    expected_message = "Couldn't communicate with background script."
    assert response['error']['message'] == expected_message