aboutsummaryrefslogtreecommitdiff
path: root/test/haketilo_test/unit/test_repo_query_cacher.py
blob: 3f0a00d9ecf40f3e672629144d0b2620f487abec (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# SPDX-License-Identifier: CC0-1.0

"""
Haketilo unit tests - caching responses from remote repositories
"""

# This file is part of Haketilo
#
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# CC0 1.0 Universal License for more details.

import pytest
import json
from selenium.webdriver.support.ui import WebDriverWait

from ..script_loader import load_script

def content_script():
    """
    Build the content script source of the test extension: the contents of
    'content/repo_query_cacher.js', followed by tab_id_asker and a call to
    start(), each separated with '; '.
    """
    parts = [
        load_script('content/repo_query_cacher.js'),
        tab_id_asker,
        'start();'
    ]
    return '; '.join(parts)

def bypass_js():
    """
    Build background script source: the contents of
    'background/CORS_bypass_server.js' with a call to start() appended.
    """
    server_js = load_script('background/CORS_bypass_server.js')
    return f'{server_js}; start();'

def fetch_through_cache(driver, tab_id, url):
    """
    Have the driver execute a script that sends a ["repo_query", url] message
    to the tab identified by tab_id through browser.tabs.sendMessage(). Return
    whatever driver.execute_script() returns for that script.
    """
    # tab_id and url get passed to the script as arguments[0] and arguments[1].
    query_js = '''
        return browser.tabs.sendMessage(arguments[0],
                                        ["repo_query", arguments[1]]);
        '''
    return driver.execute_script(query_js, tab_id, url)

"""
tab_id_responder is meant to be appended to background script of a test
extension.
"""
tab_id_responder = '''
function tell_tab_id(msg, sender, respond_cb) {
    if (msg[0] === "learn_tab_id")
        respond_cb(sender.tab.id);
}
browser.runtime.onMessage.addListener(tell_tab_id);
'''

"""
tab_id_asker is meant to be appended to content script of a test extension.
"""
tab_id_asker = '''
browser.runtime.sendMessage(["learn_tab_id"])
    .then(tid => window.wrappedJSObject.haketilo_tab = tid);
'''

def run_content_script_in_new_window(driver, url):
    """
    Expect an extension to be loaded which had tab_id_responder and tab_id_asker
    appended to its background and content scripts, respectively.
    Open the provided url in a new tab, find its tab id and return it, with
    current window changed back to the initial one.

    The current window is changed back to the initial one also when waiting
    for the tab id fails. Exceptions raised by WebDriverWait.until() (e.g. on
    timeout) are propagated to the caller.
    """
    handle0 = driver.current_window_handle
    initial_handles = set(driver.window_handles)
    driver.execute_script('window.open(arguments[0], "_blank");', url)

    # Wait for a handle that was not there before. Merely checking that the set
    # of handles changed would also be satisfied by a window disappearing, in
    # which case there would be no new handle to pick.
    find_new_handles = lambda d: set(d.window_handles) - initial_handles
    new_handles = WebDriverWait(driver, 10).until(find_new_handles)
    new_handle = next(iter(new_handles))

    driver.switch_to.window(new_handle)
    try:
        # window.haketilo_tab is what tab_id_asker assigns once the background
        # script replied; until() keeps polling while the value is falsy.
        get_tab_id = lambda d: d.execute_script('return window.haketilo_tab;')
        return WebDriverWait(driver, 10).until(get_tab_id)
    finally:
        # Restore the initial window no matter if the tab id was obtained.
        driver.switch_to.window(handle0)

@pytest.mark.ext_data({
    'content_script': content_script,
    'background_script': lambda: f'{bypass_js()};{tab_id_responder}'
})
@pytest.mark.usefixtures('webextension')
def test_repo_query_cacher_normal_use(driver):
    """
    Test if HTTP requests made through our cacher return correct results.

    Queries are sent to tabs opened with run_content_script_in_new_window().
    Checked are: the shape of a successful response, that repeated queries
    from one tab give the same counter value while a query from a freshly
    opened tab gives a value greater by one, the 404 status for
    'https://nxdoma.in/' and the TypeError reported for 'bad://url'.
    """
    page_url = 'https://gotmyowndoma.in'
    counter_url = 'https://counterdoma.in/'

    def query(tab, url):
        return fetch_through_cache(driver, tab, url)

    def decoded_body(response):
        # The body is decoded from a hex string before being parsed as JSON.
        return json.loads(bytes.fromhex(response['body']))

    first_tab = run_content_script_in_new_window(driver, page_url)

    response = query(first_tab, counter_url)
    assert set(response) == {'status', 'statusText', 'headers', 'body'}
    counter_initial = decoded_body(response)['counter']
    assert type(counter_initial) is int

    # Asking again from the same tab is expected to give the same counter.
    for _ in range(2):
        assert decoded_body(query(first_tab, counter_url)) \
            == {'counter': counter_initial}

    # A query from another, newly opened tab is expected to give the next
    # counter value.
    second_tab = run_content_script_in_new_window(driver, page_url)
    assert decoded_body(query(second_tab, counter_url)) \
        == {'counter': counter_initial + 1}

    for _ in range(2):
        assert query(second_tab, 'https://nxdoma.in/')['status'] == 404

    for _ in range(2):
        response = query(second_tab, 'bad://url')
        assert set(response) == {'error'}
        assert response['error']['name'] == 'TypeError'

@pytest.mark.ext_data({
    'content_script': content_script,
    'background_script': tab_id_responder
})
@pytest.mark.usefixtures('webextension')
def test_repo_query_cacher_bgscript_error(driver):
    """
    Test if our cacher properly reports errors in communication with the
    background script.

    The background script used here consists of tab_id_responder alone. The
    query is expected to result in an object holding nothing but an 'error'
    entry with the expected set of fields and the expected message.
    """
    tab = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in')
    response = fetch_through_cache(driver, tab, 'https://counterdoma.in/')

    assert set(response) == {'error'}

    error = response['error']
    assert set(error) == {'name', 'message', 'fileName', 'lineNumber'}
    assert error['message'] == "Couldn't communicate with background script."