aboutsummaryrefslogtreecommitdiff
path: root/test/unit/test_repo_query_cacher.py
blob: d5c7396d0b4fe5436e07480701f86df3de21b49d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# SPDX-License-Identifier: CC0-1.0

"""
Haketilo unit tests - caching responses from remote repositories
"""

# This file is part of Haketilo
#
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# CC0 1.0 Universal License for more details.

import pytest
import json
from selenium.webdriver.support.ui import WebDriverWait

from ..script_loader import load_script

# JavaScript snippet used as (part of) the background script in the tests
# below. It registers a runtime.onMessage listener that answers a
# ["learn_tab_id"] message with the id of the tab the message came from, which
# lets the content script learn the id of its own tab.
tab_id_responder = '''
function tell_tab_id(msg, sender, respond_cb) {
    if (msg[0] === "learn_tab_id")
        respond_cb(sender.tab.id);
}
browser.runtime.onMessage.addListener(tell_tab_id);
'''

def content_script():
    """
    Build the content script source used by the tests in this file.

    The result is the code of 'content/repo_query_cacher.js' followed by a
    snippet that calls start() and then asks the background script for this
    tab's id, storing the answer as `haketilo_tab` on the page's window object
    so that the test can read it back.
    """
    cacher_js = load_script('content/repo_query_cacher.js')
    startup_js = ''';
    start();
    browser.runtime.sendMessage(["learn_tab_id"])
        .then(tid => window.wrappedJSObject.haketilo_tab = tid);
    '''
    return cacher_js + startup_js

def bypass_js():
    """
    Build background script source: the code of
    'background/CORS_bypass_server.js' followed by a call to its start().
    """
    server_js = load_script('background/CORS_bypass_server.js')
    return server_js + '; start();'

def run_content_script_in_new_window(driver, url):
    """
    Open `url` in a new window and return the tab id reported there.

    The new window is opened with window.open() from the current page. We then
    switch to it, wait until the content script has stored the tab id under
    `window.haketilo_tab` and finally switch back to the window that was
    current when this function got called.

    Each wait is limited to 10 seconds; WebDriverWait raises selenium's
    TimeoutException when the limit is exceeded.
    """
    initial_handle = driver.current_window_handle
    # Snapshot of handles that existed before window.open() was called.
    known_handles = set(driver.window_handles)
    driver.execute_script('window.open(arguments[0], "_blank");', url)

    def find_new_handles(d):
        # `window_handles` yields a fresh list on every access, so an identity
        # check against the old list would succeed immediately. Look for a
        # handle that was not there before instead. An empty list is falsy,
        # which makes WebDriverWait keep polling.
        return [h for h in d.window_handles if h not in known_handles]

    new_handle = WebDriverWait(driver, 10).until(find_new_handles)[0]

    driver.switch_to.window(new_handle)
    try:
        # The content script sets `haketilo_tab` asynchronously (after the
        # background script responds), hence the polling.
        get_tab_id = lambda d: d.execute_script('return window.haketilo_tab;')
        tab_id = WebDriverWait(driver, 10).until(get_tab_id)
    finally:
        # Leave the driver on the original window even if the wait failed.
        driver.switch_to.window(initial_handle)

    return tab_id

def fetch_through_cache(driver, tab_id, url):
    """
    Send a ["repo_query", url] message to the tab identified by `tab_id` using
    browser.tabs.sendMessage() and return what driver.execute_script() gives
    back for that call.
    """
    send_query_js = '''
        return browser.tabs.sendMessage(arguments[0],
                                        ["repo_query", arguments[1]]);
        '''
    return driver.execute_script(send_query_js, tab_id, url)

@pytest.mark.ext_data({
    'content_script': content_script,
    'background_script': lambda: bypass_js() + ';' + tab_id_responder
})
@pytest.mark.usefixtures('webextension')
def test_repo_query_cacher_normal_use(driver, execute_in_page):
    """
    Test if HTTP requests made through our cacher return correct results.
    """
    # NOTE(review): `execute_in_page` is not used in the body; it is kept in
    # the signature in case requesting the fixture has a needed setup effect.
    page_url = 'https://gotmyowndoma.in'
    counter_url = 'https://counterdoma.in/'

    tab_id = run_content_script_in_new_window(driver, page_url)

    # First query: remember the counter value that got returned.
    result = fetch_through_cache(driver, tab_id, counter_url)
    assert set(result) == {'ok', 'status', 'json'}
    counter_initial = result['json']['counter']
    assert type(counter_initial) is int

    # Repeated queries from the same tab are expected to yield the very same
    # counter value as the first one.
    for _ in range(2):
        result = fetch_through_cache(driver, tab_id, counter_url)
        assert result['json']['counter'] == counter_initial

    # A content script instance running in another window is expected to
    # see the next counter value.
    tab_id = run_content_script_in_new_window(driver, page_url)
    result = fetch_through_cache(driver, tab_id, counter_url)
    assert result['json']['counter'] == counter_initial + 1

    # HTTP error responses are expected to be reported the same way on each
    # attempt.
    for _ in range(2):
        result = fetch_through_cache(driver, tab_id, 'https://nxdoma.in/')
        assert set(result) == {'ok', 'status', 'error-json'}
        assert result['ok'] is False
        assert result['status'] == 404

    # A URL that cannot be fetched at all should only give an 'error' key.
    for _ in range(2):
        result = fetch_through_cache(driver, tab_id, 'bad://url')
        assert set(result) == {'error'}

@pytest.mark.ext_data({
    'content_script': content_script,
    'background_script': tab_id_responder
})
@pytest.mark.usefixtures('webextension')
def test_repo_query_cacher_bgscript_error(driver):
    """
    Test that the cacher reports an error when communication with the
    background script fails.

    The background script used here consists of the tab id responder only,
    i.e. it does not include the CORS bypass server code.
    """
    new_tab = run_content_script_in_new_window(driver,
                                               'https://gotmyowndoma.in')

    response = fetch_through_cache(driver, new_tab, 'https://counterdoma.in/')
    reported_keys = set(response.keys())
    assert reported_keys == {'error'}