# SPDX-License-Identifier: CC0-1.0

"""
Haketilo unit tests - modifying requests using webRequest API
"""

# This file is part of Haketilo
#
# Copyright (C) 2021, Wojtek Kosior <koszko@koszko.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# CC0 1.0 Universal License for more details.

import re
from hashlib import sha256
import pytest

from ..script_loader import load_script
from .utils import are_scripts_allowed

# URLs of the sites visited by the tests in this file.
# webrequest_js() registers no rule for this one, so only the mocked
# default_allow setting (value: true) applies to it.
allowed_url = 'https://site.with.scripts.allow.ed/'
# webrequest_js() registers an "~allow" rule with value 0 for this one.
blocked_url = 'https://site.with.scripts.block.ed/'
# webrequest_js() registers both an "~allow" rule with value 1 and the
# "somemapping" mapping (resource "someresource") for this one.
payload_url = 'https://site.with.paylo.ad/'

def webrequest_js():
    """
    Return the code of background/webrequest.js followed by test mocks.

    The appended JavaScript creates a pattern tree, sets `default_allow` to
    true, registers a script-blocking rule for `blocked_url`, registers an
    allow rule plus a "somemapping" payload for `payload_url` and replaces
    stream_filter.apply() with a function that returns the headers it gets.
    """
    script = load_script('background/webrequest.js',
                         '#IMPORT common/patterns_query_tree.js AS pqt')

    # %-formatting leaves the JavaScript braces in the template untouched;
    # only the %(blocked)s and %(payload)s placeholders get substituted.
    mocks_template = ''';
            // Mock pattern tree.
            tree = pqt.make();
            // Mock default allow.
            default_allow = {name: "default_allow", value: true};

            // Rule to block scripts.
            pqt.register(tree, "%(blocked)s***",
                         "~allow", 0);

            // Rule to allow scripts, but overridden by payload assignment.
            pqt.register(tree, "%(payload)s***", "~allow", 1);
            pqt.register(tree, "%(payload)s***", "somemapping",
                         {identifier: "someresource"});

            // Mock stream_filter.
            stream_filter.apply = (details, headers, policy) => headers;
            '''
    urls = {'blocked': blocked_url, 'payload': payload_url}

    return script + mocks_template % urls

def webrequest_js_start_called():
    """
    Return the script produced by webrequest_js() with a call to start()
    appended; the argument passed to start() is "somesecret".
    """
    start_call = ';\nstart("somesecret");'
    return webrequest_js() + start_call

# URL of an extension page and the header name prefix derived from it: the
# prefix is 'X-Haketilo-' followed by the hex-encoded SHA-256 of that URL.
ext_url = 'moz-extension://49de6ce9-49fc-49e1-8102-7ef35286389c/html/settings.html'
prefix = f'X-Haketilo-{sha256(ext_url.encode()).hexdigest()}'

# Prepare a list of headers as could be sent by a website. The site's own
# Content-Security-Policy header sits at a known index so that later fixtures
# can replace exactly that entry.
sample_csp_header_idx = 7
sample_csp_header = {
    'name': 'Content-Security-Policy',
    'value': "script-src 'self';",
}

sample_headers = [
    {'name': header_name, 'value': header_value}
    for header_name, header_value in (
        ('Content-Type',     'text/html;charset=utf-8'),
        ('Content-Length',   '61954'),
        ('Content-Language', 'en'),
        ('Expires',          'Mon, 12 Mar 2012 11:04...'),
        ('Last-Modified',    'Fri, 26 Jul 2013 22:50...'),
        ('Cache-Control',    'max-age=0, s-maxage=86...'),
        ('Age',              '224'),
        ('Server',           'nginx/1.1.19'),
        ('Date',             'Thu, 10 Mar 2022 12:09...'),
    )
]

# Slot the CSP header (the very same dict object) in at its index.
sample_headers[sample_csp_header_idx:sample_csp_header_idx] = [
    sample_csp_header
]

# Prepare a list of headers as would be crafted by Haketilo when there is a
# payload to inject.
# The nonce is 'nonce-' followed by the hex-encoded SHA-256 of the mapping
# name, resource identifier, page URL and secret joined with colons.
nonce_source = f'somemapping:someresource:{payload_url}:somesecret'.encode()
nonce = 'nonce-' + sha256(nonce_source).hexdigest()

payload_csp_header = {
    'name': 'Content-Security-Policy',
    'value': (
        "prefetch-src 'none'; "
        "script-src-attr 'none'; "
        f"script-src '{nonce}' 'unsafe-eval'; "
        f"script-src-elem '{nonce}';"
    ),
}

# Start from a copy of the website's headers...
sample_payload_headers = list(sample_headers)

# ...keep the site's own CSP value, but under a prefixed header name...
sample_payload_headers[sample_csp_header_idx] = {
    'name': f'{prefix}-{sample_csp_header["name"]}',
    'value': sample_csp_header['value'],
}

# ...and finish with the marker header and the nonce-based CSP.
sample_payload_headers += [
    {'name': prefix, 'value': ':)'},
    payload_csp_header,
]

# Prepare a list of headers as would be crafted by Haketilo when scripts are
# blocked: everything from the payload variant except its final (nonce-based)
# CSP header, then the website's original CSP header, then a CSP header that
# uses 'none' as the script source.
sample_blocked_headers = [
    *sample_payload_headers[:-1],
    sample_csp_header,
    {
        'name': 'Content-Security-Policy',
        'value': ("prefetch-src 'none'; script-src-attr 'none'; "
                  "script-src 'none' 'unsafe-eval'; script-src-elem 'none';"),
    },
]

@pytest.mark.get_page('https://gotmyowndoma.in')
@pytest.mark.parametrize('params', [
    (sample_headers,         allowed_url),
    (sample_blocked_headers, blocked_url),
    (sample_payload_headers, payload_url),
])
def test_webrequest_on_headers_received(driver, execute_in_page, params):
    """
    Unit-test the on_headers_received() function.

    `params` is a pair of the headers expected in the result and the URL the
    mocked request is made to. Each of the three sample header lists is fed
    in as the response headers and the result must always be the same
    expected list.
    """
    expected_headers, page_url = params

    # Mock browser object.
    mock_browser_js = '''{
        // Mock browser object.
        const url = arguments[0];
        this.browser = {runtime: {getURL: () => url}};
        }'''
    execute_in_page(mock_browser_js, ext_url)

    execute_in_page(webrequest_js())

    execute_in_page('secret = "somesecret";')

    expected_result = {'responseHeaders': expected_headers}
    input_variants = (
        sample_headers,
        sample_blocked_headers,
        sample_payload_headers,
    )

    for input_headers in input_variants:
        details = {
            'url': page_url,
            'responseHeaders': input_headers,
            'fromCache': True,
        }
        actual_result = execute_in_page(
            'returnval(on_headers_received(arguments[0]));',
            details
        )

        assert actual_result == expected_result

@pytest.mark.ext_data({'background_script': webrequest_js_start_called})
@pytest.mark.usefixtures('webextension')
def test_webrequest_real_pages(driver, execute_in_page):
    """
    Test webRequest-based header modifications by loading actual pages and
    attempting to run scripts within them.

    The blocked page must stop running scripts within a bounded number of
    loads, the allowed page must run scripts and the payload page must only
    run scripts when the expected nonce value is supplied.
    """
    max_attempts = 10

    # Reload the blocked page until scripts are found to be blocked.
    # NOTE(review): presumably the first loads can happen before the
    # extension's background script is ready - confirm.
    for _ in range(max_attempts):
        driver.get(blocked_url)

        if not are_scripts_allowed(driver):
            break
    else:
        pytest.fail(f'scripts still allowed on {blocked_url} after '
                    f'{max_attempts} attempts')

    driver.get(allowed_url)
    assert are_scripts_allowed(driver)

    driver.get(payload_url)
    assert not are_scripts_allowed(driver)

    # The hash source uses the 'index.html' URL rather than the bare
    # `payload_url`.
    # NOTE(review): presumably the second argument of are_scripts_allowed()
    # is the nonce to attach to the probing script - confirm in utils.
    source = f'somemapping:someresource:{payload_url}index.html:somesecret'
    assert are_scripts_allowed(driver, sha256(source.encode()).hexdigest())