# SPDX-License-Identifier: CC0-1.0 """ Haketilo unit tests - message broadcasting """ # This file is part of Haketilo # # Copyright (C) 2021, Wojtek Kosior # # This program is free software: you can redistribute it and/or modify # it under the terms of the CC0 1.0 Universal License as published by # the Creative Commons Corporation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # CC0 1.0 Universal License for more details. import pytest from selenium.webdriver.support.ui import WebDriverWait from ..script_loader import load_script from .utils import broker_js test_page_html = ''' <!DOCTYPE html> <script src="/testpage.js"></script> <h2>d0 (channel `somebodyoncetoldme`)</h2> <div id="d0"></div> <h2>d1 (channel `worldisgonnarollme`)</h2> <div id="d1"></div> <h2>d2 (both channels)</h2> <div id="d2"></div> ''' @pytest.mark.ext_data({ 'background_script': broker_js, 'test_page': test_page_html, 'extra_files': { 'testpage.js': lambda: load_script('common/broadcast.js') } }) @pytest.mark.usefixtures('webextension') def test_broadcast(driver, execute_in_page, wait_elem_text): """ A test that verifies the broadcasting system based on WebExtension messaging API and implemented in `background/broadcast_broker.js` and `common/broadcast.js` works correctly. """ # The broadcast facility is meant to enable message distribution between # multiple contexts (e.g. different tabs/windows). Let's open the same # extension's test page in a second window. driver.execute_script( ''' window.open(window.location.href, "_blank"); window.open(window.location.href, "_blank"); ''') WebDriverWait(driver, 10).until(lambda d: len(d.window_handles) == 3) windows = [*driver.window_handles] # Let's first test if a simple message can be successfully broadcasted driver.switch_to.window(windows[0]) execute_in_page( ''' const divs = [0, 1, 2].map(n => document.getElementById("d" + n)); let appender = n => (t => divs[n].append("\\n" + `[${t[0]}, ${t[1]}]`)); let listener0 = listener_connection(appender(0)); subscribe(listener0, "somebodyoncetoldme"); ''') driver.switch_to.window(windows[1]) execute_in_page( ''' let sender0 = sender_connection(); out(sender0, "somebodyoncetoldme", "iaintthesharpesttool"); ''') driver.switch_to.window(windows[0]) wait_elem_text('d0', '[somebodyoncetoldme, iaintthesharpesttool]') # Let's add 2 more listeners driver.switch_to.window(windows[0]) execute_in_page( ''' let listener1 = listener_connection(appender(1)); subscribe(listener1, "worldisgonnarollme"); let listener2 = listener_connection(appender(2)); subscribe(listener2, "worldisgonnarollme"); subscribe(listener2, "somebodyoncetoldme"); ''') # Let's send one message to one channel and one to the other. Verify they # were received by the rght listeners. driver.switch_to.window(windows[1]) execute_in_page( ''' out(sender0, "somebodyoncetoldme", "intheshed"); out(sender0, "worldisgonnarollme", "shewaslooking"); ''') driver.switch_to.window(windows[0]) wait_elem_text('d0', 'intheshed') wait_elem_text('d1', 'shewaslooking') wait_elem_text('d2', 'intheshed') wait_elem_text('d2', 'shewaslooking') text = execute_in_page('returnval(divs[0].innerText);') assert 'shewaslooking' not in text text = execute_in_page('returnval(divs[1].innerText);') assert 'intheshed' not in text # Let's create a second sender in third window and use it to send messages # with the 'prepare' feature. driver.switch_to.window(windows[2]) execute_in_page( ''' let sender1 = sender_connection(); prepare(sender1, "somebodyoncetoldme", "kindadumb"); out(sender1, "worldisgonnarollme", "withherfinger"); ''') driver.switch_to.window(windows[0]) wait_elem_text('d1', 'withherfinger') text = execute_in_page('returnval(divs[0].innerText);') assert 'kindadumb' not in text driver.switch_to.window(windows[2]) execute_in_page('flush(sender1);') driver.switch_to.window(windows[0]) wait_elem_text('d0', 'kindadumb') # Let's verify that prepare()'d messages are properly discarded when # discard() is called. driver.switch_to.window(windows[2]) execute_in_page( ''' prepare(sender1, "somebodyoncetoldme", "andherthumb"); discard(sender1); prepare(sender1, "somebodyoncetoldme", "andhermiddlefinger"); flush(sender1); ''') driver.switch_to.window(windows[0]) wait_elem_text('d0', 'andhermiddlefinger') text = execute_in_page('returnval(divs[0].innerText);') assert 'andherthumb' not in text # Let's verify prepare()'d messages are properly auto-flushed when the other # end of the connection gets killed (e.g. because browser tab gets closed). driver.switch_to.window(windows[2]) execute_in_page( ''' prepare(sender1, "worldisgonnarollme", "intheshape", 500); ''') driver.close() driver.switch_to.window(windows[0]) wait_elem_text('d2', 'intheshape') # Verify listener's connection gets closed properly. execute_in_page('close(listener0); close(listener1);') driver.switch_to.window(windows[1]) execute_in_page('out(sender0, "worldisgonnarollme", "ofanL");') execute_in_page('out(sender0, "somebodyoncetoldme", "forehead");') driver.switch_to.window(windows[0]) wait_elem_text('d2', 'ofanL') wait_elem_text('d2', 'forehead') for i in (0, 1): text = execute_in_page('returnval(divs[arguments[0]].innerText);', i) assert 'ofanL' not in text assert 'forehead' not in text