aboutsummaryrefslogtreecommitdiff
# SPDX-License-Identifier: CC0-1.0

"""
Haketilo unit tests - message broadcasting
"""

# This file is part of Haketilo
#
# Copyright (C) 2021, Wojtek Kosior
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# CC0 1.0 Universal License for more details.

import pytest
from selenium.webdriver.support.ui import WebDriverWait

from ..script_loader import load_script
from .utils import broker_js

test_page_html = '''
<!DOCTYPE html>
<script src="/testpage.js"></script>
<h2>d0 (channel `somebodyoncetoldme`)</h2>
<div id="d0"></div>
<h2>d1 (channel `worldisgonnarollme`)</h2>
<div id="d1"></div>
<h2>d2 (both channels)</h2>
<div id="d2"></div>
'''

@pytest.mark.ext_data({
    'background_script': broker_js,
    'test_page':         test_page_html,
    'extra_files': {
        'testpage.js':   lambda: load_script('common/broadcast.js')
    }
})
@pytest.mark.usefixtures('webextension')
def test_broadcast(driver, execute_in_page, wait_elem_text):
    """
    A test that verifies the broadcasting system based on WebExtension messaging
    API and implemented in `background/broadcast_broker.js` and
    `common/broadcast.js` works correctly.
    """
    # The broadcast facility is meant to enable message distribution between
    # multiple contexts (e.g. different tabs/windows). Let's open the same
    # extension's test page in a second window.
    driver.execute_script(
        '''
        window.open(window.location.href, "_blank");
        window.open(window.location.href, "_blank");
        ''')
    WebDriverWait(driver, 10).until(lambda d: len(d.window_handles) == 3)
    windows = [*driver.window_handles]

    # Let's first test if a simple message can be successfully broadcasted
    driver.switch_to.window(windows[0])
    execute_in_page(
        '''
        const divs = [0, 1, 2].map(n => document.getElementById("d" + n));
        let appender = n => (t => divs[n].append("\\n" + `[${t[0]}, ${t[1]}]`));
        let listener0 = listener_connection(appender(0));
        subscribe(listener0, "somebodyoncetoldme");
        ''')

    driver.switch_to.window(windows[1])
    execute_in_page(
        '''
        let sender0 = sender_connection();
        out(sender0, "somebodyoncetoldme", "iaintthesharpesttool");
        ''')

    driver.switch_to.window(windows[0])
    wait_elem_text('d0', '[somebodyoncetoldme, iaintthesharpesttool]')

    # Let's add 2 more listeners
    driver.switch_to.window(windows[0])
    execute_in_page(
        '''
        let listener1 = listener_connection(appender(1));
        subscribe(listener1, "worldisgonnarollme");
        let listener2 = listener_connection(appender(2));
        subscribe(listener2, "worldisgonnarollme");
        subscribe(listener2, "somebodyoncetoldme");
        ''')

    # Let's send one message to one channel and one to the other. Verify they
    # were received by the rght listeners.
    driver.switch_to.window(windows[1])
    execute_in_page(
        '''
        out(sender0, "somebodyoncetoldme", "intheshed");
        out(sender0, "worldisgonnarollme", "shewaslooking");
        ''')

    driver.switch_to.window(windows[0])
    wait_elem_text('d0', 'intheshed')
    wait_elem_text('d1', 'shewaslooking')
    wait_elem_text('d2', 'intheshed')
    wait_elem_text('d2', 'shewaslooking')

    text = execute_in_page('returnval(divs[0].innerText);')
    assert 'shewaslooking' not in text
    text = execute_in_page('returnval(divs[1].innerText);')
    assert 'intheshed' not in text

    # Let's create a second sender in third window and use it to send messages
    # with the 'prepare' feature.
    driver.switch_to.window(windows[2])
    execute_in_page(
        '''
        let sender1 = sender_connection();
        prepare(sender1, "somebodyoncetoldme", "kindadumb");
        out(sender1, "worldisgonnarollme", "withherfinger");
        ''')

    driver.switch_to.window(windows[0])
    wait_elem_text('d1', 'withherfinger')
    text = execute_in_page('returnval(divs[0].innerText);')
    assert 'kindadumb' not in text

    driver.switch_to.window(windows[2])
    execute_in_page('flush(sender1);')

    driver.switch_to.window(windows[0])
    wait_elem_text('d0', 'kindadumb')

    # Let's verify that prepare()'d messages are properly discarded when
    # discard() is called.
    driver.switch_to.window(windows[2])
    execute_in_page(
        '''
        prepare(sender1, "somebodyoncetoldme", "andherthumb");
        discard(sender1);
        prepare(sender1, "somebodyoncetoldme", "andhermiddlefinger");
        flush(sender1);
        ''')

    driver.switch_to.window(windows[0])
    wait_elem_text('d0', 'andhermiddlefinger')
    text = execute_in_page('returnval(divs[0].innerText);')
    assert 'andherthumb' not in text

    # Let's verify prepare()'d messages are properly auto-flushed when the other
    # end of the connection gets killed (e.g. because browser tab gets closed).
    driver.switch_to.window(windows[2])
    execute_in_page(
        '''
        prepare(sender1, "worldisgonnarollme", "intheshape", 500);
        ''')
    driver.close()

    driver.switch_to.window(windows[0])
    wait_elem_text('d2', 'intheshape')

    # Verify listener's connection gets closed properly.
    execute_in_page('close(listener0); close(listener1);')

    driver.switch_to.window(windows[1])
    execute_in_page('out(sender0, "worldisgonnarollme", "ofanL");')
    execute_in_page('out(sender0, "somebodyoncetoldme", "forehead");')

    driver.switch_to.window(windows[0])
    wait_elem_text('d2', 'ofanL')
    wait_elem_text('d2', 'forehead')
    for i in (0, 1):
        text = execute_in_page('returnval(divs[arguments[0]].innerText);', i)
        assert 'ofanL' not in text
        assert 'forehead' not in text