aboutsummaryrefslogtreecommitdiff
path: root/test/unit/test_repo_query_cacher.py
blob: b1ce4c84182174ce9f4d34d67cc62b4376570451 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# SPDX-License-Identifier: CC0-1.0

"""
Haketilo unit tests - caching responses from remote repositories
"""

# This file is part of Haketilo
#
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# CC0 1.0 Universal License for more details.

import pytest
import json
from selenium.webdriver.support.ui import WebDriverWait

from ..script_loader import load_script

def content_script():
    """
    Build the content script source for the test extension: the repo query
    cacher code, followed by the tab_id_asker snippet and a start() call.
    """
    cacher_js = load_script('content/repo_query_cacher.js')
    return '; '.join((cacher_js, tab_id_asker, 'start();'))

def bypass_js():
    """
    Build the background script source: the CORS bypass server code followed
    by a start() call.
    """
    server_js = load_script('background/CORS_bypass_server.js')
    return f'{server_js}; start();'

def fetch_through_cache(driver, tab_id, url):
    """
    Send a ["repo_query", url] message to the tab identified by tab_id and
    return whatever driver.execute_script() gives back for that call.
    """
    # arguments[0] is the tab id, arguments[1] is the url being queried.
    send_query_js = '''
        return browser.tabs.sendMessage(arguments[0],
                                        ["repo_query", arguments[1]]);
        '''
    return driver.execute_script(send_query_js, tab_id, url)

"""
tab_id_responder is meant to be appended to background script of a test
extension.
"""
# JavaScript snippet. It registers a browser.runtime.onMessage listener that
# replies with the sending tab's id (sender.tab.id) whenever the first element
# of the received message is "learn_tab_id". For any other message this
# listener does not call the response callback.
tab_id_responder = '''
function tell_tab_id(msg, sender, respond_cb) {
    if (msg[0] === "learn_tab_id")
        respond_cb(sender.tab.id);
}
browser.runtime.onMessage.addListener(tell_tab_id);
'''

"""
tab_id_asker is meant to be appended to content script of a test extension.
"""
# JavaScript snippet. It sends a ["learn_tab_id"] message through
# browser.runtime.sendMessage() and, once the reply arrives, stores it as
# haketilo_tab on window.wrappedJSObject. run_content_script_in_new_window()
# later polls window.haketilo_tab from the page to obtain that value.
tab_id_asker = '''
browser.runtime.sendMessage(["learn_tab_id"])
    .then(tid => window.wrappedJSObject.haketilo_tab = tid);
'''

def run_content_script_in_new_window(driver, url):
    """
    Expect an extension to be loaded which had tab_id_responder and tab_id_asker
    appended to its background and content scripts, respectively.
    Open the provided url in a new tab, find its tab id and return it, with
    current window changed back to the initial one.

    The wait for the new window handle and the wait for the tab id each use a
    10 second WebDriverWait; the timeout error raised by WebDriverWait
    propagates to the caller if either of them expires. The initial window is
    made current again even when waiting for the tab id fails.
    """
    initial_handle = driver.current_window_handle
    known_handles = set(driver.window_handles)
    driver.execute_script('window.open(arguments[0], "_blank");', url)

    # driver.window_handles gives a new list object on each access, so
    # comparing it by identity with an earlier result is always true and would
    # not wait at all. Instead, poll until a handle that was not known before
    # shows up; an empty list is falsy, which keeps WebDriverWait polling.
    def find_new_handles(d):
        return [h for h in d.window_handles if h not in known_handles]

    new_handle = WebDriverWait(driver, 10).until(find_new_handles)[0]

    driver.switch_to.window(new_handle)
    try:
        # window.haketilo_tab is assigned asynchronously by tab_id_asker, so
        # keep polling until the script returns a truthy value.
        return WebDriverWait(driver, 10).until(
            lambda d: d.execute_script('return window.haketilo_tab;')
        )
    finally:
        # Do not leave the driver focused on the new window, even on timeout.
        driver.switch_to.window(initial_handle)

@pytest.mark.ext_data({
    'content_script': content_script,
    'background_script': lambda: bypass_js() + ';' + tab_id_responder
})
@pytest.mark.usefixtures('webextension')
def test_repo_query_cacher_normal_use(driver, execute_in_page):
    """
    Check that HTTP requests routed through the cacher give correct results,
    both for successful queries and for failing ones.
    """
    page_url = 'https://gotmyowndoma.in'
    counter_url = 'https://counterdoma.in/'

    first_tab = run_content_script_in_new_window(driver, page_url)

    # The first query establishes the baseline counter value.
    response = fetch_through_cache(driver, first_tab, counter_url)
    assert set(response) == {'ok', 'status', 'json'}
    baseline = response['json']['counter']
    assert type(baseline) is int

    # Repeating the query from the same tab must give the same counter.
    for _ in range(2):
        response = fetch_through_cache(driver, first_tab, counter_url)
        assert response['json']['counter'] == baseline

    # The same query from a newly opened tab is expected to give a counter
    # greater by one.
    second_tab = run_content_script_in_new_window(driver, page_url)
    response = fetch_through_cache(driver, second_tab, counter_url)
    assert response['json']['counter'] == baseline + 1

    # Queries answered with HTTP 404 are reported with 'error_json'.
    for _ in range(2):
        response = fetch_through_cache(driver, second_tab, 'https://nxdoma.in/')
        assert set(response) == {'ok', 'status', 'error_json'}
        assert response['ok'] == False
        assert response['status'] == 404

    # A malformed URL yields an object holding only 'error'.
    for _ in range(2):
        response = fetch_through_cache(driver, second_tab, 'bad://url')
        assert set(response) == {'error'}

@pytest.mark.ext_data({
    'content_script': content_script,
    'background_script': tab_id_responder
})
@pytest.mark.usefixtures('webextension')
def test_repo_query_cacher_bgscript_error(driver):
    """
    Check that the cacher reports an error when communication with the
    background script fails. Here the background script consists of
    tab_id_responder only, without the CORS bypass server code.
    """
    page_tab = run_content_script_in_new_window(
        driver,
        'https://gotmyowndoma.in'
    )

    response = fetch_through_cache(driver, page_tab, 'https://counterdoma.in/')
    assert set(response) == {'error'}