# SPDX-License-Identifier: CC0-1.0

"""
Haketilo unit tests - repository querying
"""

# This file is part of Haketilo
#
# Copyright (C) 2022 Wojtek Kosior <koszko@koszko.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the CC0 1.0 Universal License as published by
# the Creative Commons Corporation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# CC0 1.0 Universal License for more details.

import pytest
from selenium.webdriver.support.ui import WebDriverWait

from ..extension_crafting import ExtraHTML
from ..script_loader import load_script
from .utils import *

# Repository URLs registered in the (mocked) database by most tests below.
repo_urls = [f'https://hydril.la/{s}' for s in ('', '1/', '2/', '3/', '4/')]

# URL of the page for which custom resources are being looked up.
queried_url = 'https://example_a.com/something'


def setup_view(execute_in_page, repo_urls, tab=None):
    """
    Prepare the page for a repo query test.

    Runs the cacher mock, loads 'html/repo_query.js', registers every URL
    from `repo_urls` through `haketilodb.set_repo()` and creates a
    `RepoQueryView` that gets appended to the document body. The view's two
    callbacks record "show" and "hide" in the page-side `cb_calls` array; the
    page-side helper `shw()` reports `[cb_calls.slice(...), view.shown]`.

    Parameters:
      execute_in_page -- callable running a JavaScript snippet (plus optional
                         arguments) in the tested page.
      repo_urls       -- iterable of repository URLs to register.
      tab             -- object passed as the first argument of the
                         `RepoQueryView` constructor; defaults to a fresh
                         `{'id': 0}`.

    NOTE(review): `mock_cacher_code` is presumably provided by the wildcard
    import from `.utils`, and later snippets rely on `view`/`shw` remaining
    reachable between `execute_in_page()` calls - confirm against the fixture.
    """
    # A fresh dict per call; a dict literal in the signature would be one
    # object shared by all calls.
    if tab is None:
        tab = {'id': 0}

    execute_in_page(mock_cacher_code)

    execute_in_page(load_script('html/repo_query.js'))
    execute_in_page(
        '''
        const repo_proms = arguments[0].map(url => haketilodb.set_repo(url));

        const cb_calls = [];
        const view = new RepoQueryView(arguments[1],
                                       () => cb_calls.push("show"),
                                       () => cb_calls.push("hide"));
        document.body.append(view.main_div);
        const shw = slice => [cb_calls.slice(slice || 0), view.shown];

        returnval(Promise.all(repo_proms));
        ''',
        repo_urls, tab)


# Extension configuration shared by both tests in this file.
repo_query_ext_data = {
    'background_script': broker_js,
    'extra_html': ExtraHTML('html/repo_query.html', {}),
    'navigate_to': 'html/repo_query.html'
}


@pytest.mark.ext_data(repo_query_ext_data)
@pytest.mark.usefixtures('webextension')
def test_repo_query_normal_usage(driver, execute_in_page):
    """
    Test of using the repo query view to browse results from repository and to
    start installation.
    """
    def assert_only_displayed(containers, displayed_id):
        # Exactly the container named `displayed_id` must be visible.
        for container_id, container in containers.items():
            assert (container_id == displayed_id) == container.is_displayed()

    setup_view(execute_in_page, repo_urls)

    # Nothing has been shown yet and no callback has fired.
    assert execute_in_page('returnval(shw());') == [[], False]

    execute_in_page('view.show(arguments[0]);', queried_url)

    assert execute_in_page('returnval(shw());') == [['show'], True]

    def get_repo_entries(driver):
        return execute_in_page(
            f'returnval((view.repo_entries || []).map({nodes_props_code}));'
        )

    # An empty list is falsy, so this waits until at least one entry exists.
    repo_entries = WebDriverWait(driver, 10).until(get_repo_entries)

    assert len(repo_urls) == len(repo_entries)

    # Iterating in reverse deliberately leaves `entry` bound to the FIRST
    # repository's entry once the loop finishes; it is used right below.
    for url, entry in reversed(list(zip(repo_urls, repo_entries))):
        assert url in entry['main_li'].text

    # Toggle the results list: show, hide, show again.
    but_ids = ('show_results_but', 'hide_results_but')
    for but_idx in (0, 1, 0):
        assert bool(but_idx) == entry['list_container'].is_displayed()

        assert not entry[but_ids[1 - but_idx]].is_displayed()

        entry[but_ids[but_idx]].click()

    def get_mapping_entries(driver):
        return execute_in_page(
            f'''{{
            const result_entries = (view.repo_entries[0].result_entries || []);
            returnval(result_entries.map({nodes_props_code}));
            }}''')

    mapping_entries = WebDriverWait(driver, 10).until(get_mapping_entries)

    assert len(mapping_entries) == 3

    expected_names = ['MAPPING-ABCD', 'MAPPING-ABCD-DEFG-GHIJ', 'MAPPING-A']

    # After this loop `entry` refers to the LAST mapping entry, whose install
    # button is clicked further down.
    for name, entry in zip(expected_names, mapping_entries):
        assert entry['mapping_name'].text == name
        assert entry['mapping_id'].text == f'{name.lower()}-2022.5.11'

    containers = execute_in_page(
        '''{
        const reductor = (acc, k) => Object.assign(acc, {[k]: view[k]});
        returnval(container_ids.reduce(reductor, {}));
        }''')

    assert_only_displayed(containers, 'repos_list_container')

    entry['install_but'].click()

    assert_only_displayed(containers, 'install_view_container')

    execute_in_page('returnval(view.install_view.cancel_but);').click()

    assert_only_displayed(containers, 'repos_list_container')

    # Cancelling the install view must not have hidden the whole query view.
    assert execute_in_page('returnval(shw());') == [['show'], True]
    execute_in_page('returnval(view.cancel_but);').click()
    assert execute_in_page('returnval(shw());') == [['show', 'hide'], False]


@pytest.mark.ext_data(repo_query_ext_data)
@pytest.mark.usefixtures('webextension')
@pytest.mark.parametrize('message', [
    'browsing_for',
    'no_repos',
    'private_mode',
    'failure_to_communicate',
    'HTTP_code',
    'invalid_JSON',
    'newer_API_version',
    'invalid_response_format',
    'querying_repo',
    'no_results'
])
def test_repo_query_messages(driver, execute_in_page, message):
    """
    Test of loading and error messages shown in parts of the repo query view.
    """
    def has_msg(message, elem=None):
        # Build a WebDriverWait-compatible predicate. With `elem` given, the
        # element must be displayed and contain `message`; otherwise the
        # whole page source is searched.
        def has_msg_and_is_visible(dummy_driver):
            if elem:
                return elem.is_displayed() and message in elem.text
            else:
                return message in driver.page_source
        return has_msg_and_is_visible

    def show_and_wait_for_repo_entry():
        # Show the view, wait until `view.repo_entries` gets set and, if
        # there is at least one repo entry, expand the first one's results.
        execute_in_page('view.show(arguments[0]);', queried_url)
        done = lambda d: execute_in_page('returnval(!!view.repo_entries);')
        WebDriverWait(driver, 10).until(done)
        execute_in_page(
            '''
            if (view.repo_entries.length > 0)
                view.repo_entries[0].show_results_but.click();
            ''')

    if message == 'browsing_for':
        setup_view(execute_in_page, [])
        show_and_wait_for_repo_entry()
        elem = execute_in_page('returnval(view.url_span.parentNode);')
        assert has_msg(f'Browsing custom resources for: {queried_url}', elem)(0)
    elif message == 'no_repos':
        setup_view(execute_in_page, [])
        show_and_wait_for_repo_entry()
        elem = execute_in_page('returnval(view.repos_list);')
        assert has_msg('You have no repositories configured :(', elem)(0)
    elif message == 'private_mode':
        setup_view(execute_in_page, repo_urls, tab={'id': 0, 'incognito': True})
        show_and_wait_for_repo_entry()
        elem = execute_in_page('returnval(view.top_text);')
        assert has_msg('when in Private Browsing mode', elem)(0)
    elif message == 'failure_to_communicate':
        setup_view(execute_in_page, repo_urls)
        execute_in_page(
            '''
            window.mock_cacher_fetch =
                () => {throw new Error("Something happened :o")};
            ''')
        show_and_wait_for_repo_entry()
        elem = execute_in_page('returnval(view.repo_entries[0].info_div);')
        done = has_msg('Failure to communicate with repository :(', elem)
        WebDriverWait(driver, 10).until(done)
    elif message == 'HTTP_code':
        setup_view(execute_in_page, repo_urls)
        execute_in_page(
            '''
            const response = new Response("", {status: 405});
            window.mock_cacher_fetch = () => Promise.resolve(response);
            ''')
        show_and_wait_for_repo_entry()
        elem = execute_in_page('returnval(view.repo_entries[0].info_div);')
        done = has_msg('Repository sent HTTP code 405 :(', elem)
        WebDriverWait(driver, 10).until(done)
    elif message == 'invalid_JSON':
        setup_view(execute_in_page, repo_urls)
        execute_in_page(
            '''
            const response = new Response("sth", {status: 200});
            window.mock_cacher_fetch = () => Promise.resolve(response);
            ''')
        show_and_wait_for_repo_entry()
        elem = execute_in_page('returnval(view.repo_entries[0].info_div);')
        done = has_msg("Repository's response is not valid JSON :(", elem)
        WebDriverWait(driver, 10).until(done)
    elif message == 'newer_API_version':
        setup_view(execute_in_page, repo_urls)
        execute_in_page(
            '''
            const newer_schema_url =
                "https://hydrilla.koszko.org/schemas/api_query_result-255.2.1.schema.json";
            const mocked_json_data = JSON.stringify({$schema: newer_schema_url});
            const response = new Response(mocked_json_data, {status: 200});
            window.mock_cacher_fetch = () => Promise.resolve(response);
            ''')
        show_and_wait_for_repo_entry()
        elem = execute_in_page('returnval(view.repo_entries[0].info_div);')
        msg = 'Results were served using unsupported Hydrilla API version. You might need to update Haketilo.'
        WebDriverWait(driver, 10).until(has_msg(msg, elem))
    elif message == 'invalid_response_format':
        setup_view(execute_in_page, repo_urls)
        execute_in_page(
            '''
            window.mock_cacher_fetch = async function(...args) {
                const response = await fetch(...args);
                const json = await response.json();

                /* $schema is no longer a string as it should be. */
                json.$schema = null;

                return new Response(JSON.stringify(json), {
                    status: response.status,
                    statusText: response.statusText,
                    headers: [...response.headers.entries()]
                });
            }
            ''')
        show_and_wait_for_repo_entry()
        elem = execute_in_page('returnval(view.repo_entries[0].info_div);')
        msg = 'Results were served using a nonconforming response format.'
        WebDriverWait(driver, 10).until(has_msg(msg, elem))
    elif message == 'querying_repo':
        setup_view(execute_in_page, repo_urls)
        # The returned promise never settles, so the view stays in its
        # "querying" state.
        execute_in_page(
            'window.mock_cacher_fetch = () => new Promise(cb => {});'
        )
        show_and_wait_for_repo_entry()
        elem = execute_in_page('returnval(view.repo_entries[0].info_div);')
        assert has_msg('Querying repository...', elem)(0)
    elif message == 'no_results':
        setup_view(execute_in_page, repo_urls)
        execute_in_page(
            '''
            const schema_url =
                "https://hydrilla.koszko.org/schemas/api_query_result-1.schema.json";
            const mocked_json_data =
                JSON.stringify({$schema: schema_url, mappings: []});
            const response = new Response(mocked_json_data, {status: 200});
            window.mock_cacher_fetch = () => Promise.resolve(response);
            ''')
        show_and_wait_for_repo_entry()
        elem = execute_in_page('returnval(view.repo_entries[0].info_div);')
        WebDriverWait(driver, 10).until(has_msg('No results :(', elem))
    else:
        raise Exception('made a typo in test function params?')