1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# SPDX-License-Identifier: CC0-1.0
"""
Haketilo unit tests - caching responses from remote repositories
"""
# This file is part of Haketilo
#
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# CC0 1.0 Universal License for more details.
import pytest
import json
from selenium.webdriver.support.ui import WebDriverWait
from ..script_loader import load_script
def content_script():
script = load_script('content/repo_query_cacher.js')
return f'{script}; {tab_id_asker}; start();'
def bypass_js():
return load_script('background/CORS_bypass_server.js') + '; start();'
def fetch_through_cache(driver, tab_id, url):
return driver.execute_script(
'''
return browser.tabs.sendMessage(arguments[0],
["repo_query", arguments[1]]);
''',
tab_id, url)
"""
tab_id_responder is meant to be appended to background script of a test
extension.
"""
tab_id_responder = '''
function tell_tab_id(msg, sender, respond_cb) {
if (msg[0] === "learn_tab_id")
respond_cb(sender.tab.id);
}
browser.runtime.onMessage.addListener(tell_tab_id);
'''
"""
tab_id_asker is meant to be appended to content script of a test extension.
"""
tab_id_asker = '''
browser.runtime.sendMessage(["learn_tab_id"])
.then(tid => window.wrappedJSObject.haketilo_tab = tid);
'''
def run_content_script_in_new_window(driver, url):
"""
Expect an extension to be loaded which had tab_id_responder and tab_id_asker
appended to its background and content scripts, respectively.
Open the provided url in a new tab, find its tab id and return it, with
current window changed back to the initial one.
"""
initial_handle = driver.current_window_handle
handles = driver.window_handles
driver.execute_script('window.open(arguments[0], "_blank");', url)
WebDriverWait(driver, 10).until(lambda d: d.window_handles is not handles)
new_handle = [h for h in driver.window_handles if h not in handles][0]
driver.switch_to.window(new_handle)
get_tab_id = lambda d: d.execute_script('return window.haketilo_tab;')
tab_id = WebDriverWait(driver, 10).until(get_tab_id)
driver.switch_to.window(initial_handle)
return tab_id
@pytest.mark.ext_data({
'content_script': content_script,
'background_script': lambda: bypass_js() + ';' + tab_id_responder
})
@pytest.mark.usefixtures('webextension')
def test_repo_query_cacher_normal_use(driver, execute_in_page):
"""
Test if HTTP requests made through our cacher return correct results.
"""
tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in')
result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/')
assert set(result.keys()) == {'ok', 'status', 'json'}
counter_initial = result['json']['counter']
assert type(counter_initial) is int
for i in range(2):
result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/')
assert result['json']['counter'] == counter_initial
tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in')
result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/')
assert result['json']['counter'] == counter_initial + 1
for i in range(2):
result = fetch_through_cache(driver, tab_id, 'https://nxdoma.in/')
assert set(result.keys()) == {'ok', 'status', 'error_json'}
assert result['ok'] == False
assert result['status'] == 404
for i in range(2):
result = fetch_through_cache(driver, tab_id, 'bad://url')
assert set(result.keys()) == {'error'}
@pytest.mark.ext_data({
'content_script': content_script,
'background_script': tab_id_responder
})
@pytest.mark.usefixtures('webextension')
def test_repo_query_cacher_bgscript_error(driver):
"""
Test if our cacher properly reports errors in communication with the
background script.
"""
tab_id = run_content_script_in_new_window(driver, 'https://gotmyowndoma.in')
result = fetch_through_cache(driver, tab_id, 'https://counterdoma.in/')
assert set(result.keys()) == {'error'}
|