1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
# SPDX-License-Identifier: CC0-1.0
"""
Haketilo unit tests - modifying requests using webRequest API
"""
# This file is part of Haketilo
#
# Copyright (C) 2021, Wojtek Kosior <koszko@koszko.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# CC0 1.0 Universal License for more details.
import re
from hashlib import sha256
import pytest
from ..script_loader import load_script
from .utils import are_scripts_allowed
# Page URLs exercised by the tests below. The mock pattern tree built in
# webrequest_js() registers rules for the blocked and payload URLs only.
allowed_url: str = 'https://site.with.scripts.allow.ed/'  # no rule registered
blocked_url: str = 'https://site.with.scripts.block.ed/'  # "~allow" set to 0
payload_url: str = 'https://site.with.paylo.ad/'          # has "somemapping"
def webrequest_js():
    """
    Return the source of background/webrequest.js with test mocks appended.

    The appended JavaScript overrides `tree`, `default_allow` and
    `stream_filter.apply` with mocks. The mock tree gets a script-blocking
    rule for `blocked_url` and both an allowing rule and a mapping for
    `payload_url`.
    """
    script = load_script('background/webrequest.js',
                         '#IMPORT common/patterns_query_tree.js AS pqt')

    # The JavaScript below is a %-format template; only the URL patterns are
    # substituted. Its text is part of what gets executed in the browser.
    mocks = ''';
// Mock pattern tree.
tree = pqt.make();
// Mock default allow.
default_allow = {name: "default_allow", value: true};
// Rule to block scripts.
pqt.register(tree, "%(blocked)s***",
"~allow", 0);
// Rule to allow scripts, but overridden by payload assignment.
pqt.register(tree, "%(payload)s***", "~allow", 1);
pqt.register(tree, "%(payload)s***", "somemapping",
{identifier: "someresource"});
// Mock stream_filter.
stream_filter.apply = (details, headers, policy) => headers;
''' % {'blocked': blocked_url, 'payload': payload_url}

    return script + mocks
def webrequest_js_start_called():
    """
    Return the mocked webrequest.js source followed by a call to
    start("somesecret").
    """
    mocked_script = webrequest_js()
    start_call = ';\nstart("somesecret");'
    return mocked_script + start_call
# URL reported by the mocked browser.runtime.getURL() in the unit test below.
ext_url = 'moz-extension://49de6ce9-49fc-49e1-8102-7ef35286389c/html/settings.html'
# Header name prefix: 'X-Haketilo-' followed by the hex SHA-256 of ext_url.
prefix = 'X-Haketilo-' + sha256(ext_url.encode()).hexdigest()
# Prepare a list of headers as could be sent by a website.
sample_csp_header = {
    'name': 'Content-Security-Policy',
    'value': "script-src 'self';"
}
# Position of the CSP header within sample_headers.
sample_csp_header_idx = 7

sample_headers = [
    {'name': header_name, 'value': header_value}
    for header_name, header_value in (
        ('Content-Type', 'text/html;charset=utf-8'),
        ('Content-Length', '61954'),
        ('Content-Language', 'en'),
        ('Expires', 'Mon, 12 Mar 2012 11:04...'),
        ('Last-Modified', 'Fri, 26 Jul 2013 22:50...'),
        ('Cache-Control', 'max-age=0, s-maxage=86...'),
        ('Age', '224'),
        ('Server', 'nginx/1.1.19'),
        ('Date', 'Thu, 10 Mar 2022 12:09...'),
    )
]
# Put the CSP header among the other headers rather than at either end.
sample_headers.insert(sample_csp_header_idx, sample_csp_header)
# Prepare a list of headers as would be crafted by Haketilo when there is a
# payload to inject.

# The expected nonce is the hex SHA-256 of the mapping name, resource
# identifier, page URL and secret joined with colons.
nonce_source = f'somemapping:someresource:{payload_url}:somesecret'.encode()
nonce = f'nonce-{sha256(nonce_source).hexdigest()}'

payload_csp_header = {
    'name': 'Content-Security-Policy',
    'value': ("prefetch-src 'none'; script-src-attr 'none'; "
              f"script-src '{nonce}'; script-src-elem '{nonce}';")
}

# Expected output: the site's headers followed by a marker header named with
# the prefix and by the nonce-based CSP header.
sample_payload_headers = [
    *sample_headers,
    {'name': prefix, 'value': ':)'},
    payload_csp_header
]
# The site's own CSP header keeps its value but gets its name prefixed.
sample_payload_headers[sample_csp_header_idx] = {
    **sample_csp_header,
    'name': f'{prefix}-{sample_csp_header["name"]}',
}
# Prepare a list of headers as would be crafted by Haketilo when scripts are
# blocked.
# Same as the payload case except for the tail: the nonce-based CSP header is
# dropped and replaced by the site's CSP header followed by a CSP header
# that forbids all scripts.
sample_blocked_headers = [
    *sample_payload_headers[:-1],
    sample_csp_header,
    {
        'name': 'Content-Security-Policy',
        'value': ("prefetch-src 'none'; script-src-attr 'none'; "
                  "script-src 'none'; script-src-elem 'none';")
    },
]
@pytest.mark.get_page('https://gotmyowndoma.in')
@pytest.mark.parametrize('params', [
    (sample_headers, allowed_url),
    (sample_blocked_headers, blocked_url),
    (sample_payload_headers, payload_url),
])
def test_webrequest_on_headers_received(driver, execute_in_page, params):
    """
    Unit-test the on_headers_received() function.

    `params` is an (expected response headers, request URL) pair. For the
    given URL the same expected headers must come out whichever of the three
    sample header lists is passed in.
    """
    expected_headers, request_url = params
    expected_result = {'responseHeaders': expected_headers}

    # Make browser.runtime.getURL() return the fixed extension URL.
    mock_browser_js = '''{
// Mock browser object.
const url = arguments[0];
this.browser = {runtime: {getURL: () => url}};
}'''
    execute_in_page(mock_browser_js, ext_url)

    execute_in_page(webrequest_js())
    execute_in_page('secret = "somesecret";')

    call_js = 'returnval(on_headers_received(arguments[0]));'
    incoming_variants = (
        sample_headers,
        sample_blocked_headers,
        sample_payload_headers,
    )
    for incoming_headers in incoming_variants:
        request_details = {
            'url': request_url,
            'responseHeaders': incoming_headers,
            'fromCache': True,
        }
        result = execute_in_page(call_js, request_details)

        assert result == expected_result
@pytest.mark.ext_data({'background_script': webrequest_js_start_called})
@pytest.mark.usefixtures('webextension')
def test_webrequest_real_pages(driver, execute_in_page):
    """
    Test webRequest-based header modifications by loading actual pages and
    attempting to run scripts within them.
    """
    # Blocking is allowed to be ineffective on the first load(s), so retry a
    # bounded number of times before giving up.
    # NOTE(review): presumably the background script needs time to start -
    # confirm.
    max_attempts = 10
    for _ in range(max_attempts):
        driver.get(blocked_url)
        if not are_scripts_allowed(driver):
            break
    else:
        raise AssertionError(
            f'scripts still allowed on {blocked_url} '
            f'after {max_attempts} attempts'
        )

    driver.get(allowed_url)
    assert are_scripts_allowed(driver)

    driver.get(payload_url)
    # Without the extra argument the scripts must not run on the payload page.
    assert not are_scripts_allowed(driver)

    # NOTE(review): the second argument is presumably the expected script
    # nonce; it is derived like the unit-test nonce but from the index.html
    # URL - confirm against utils.are_scripts_allowed().
    source = f'somemapping:someresource:{payload_url}index.html:somesecret'
    assert are_scripts_allowed(driver, sha256(source.encode()).hexdigest())
|