# HG changeset patch # User Goffi # Date 1606491580 -3600 # Node ID d4558f3cbf136d660ff17376261c590d3ca7a0a6 # Parent a6ea53248c14fce265526bf2b46f4bed4f070c57 tests, docker(e2e): added e2e tests for Libervia: - moved jp tests to `e2e/jp` - new fixtures - adapted docker-compose - improved `run_e2e` with several flags + report on failure - doc to come diff -r a6ea53248c14 -r d4558f3cbf13 docker/backend_e2e/sat.conf --- a/docker/backend_e2e/sat.conf Fri Nov 27 16:32:40 2020 +0100 +++ b/docker/backend_e2e/sat.conf Fri Nov 27 16:39:40 2020 +0100 @@ -1,5 +1,8 @@ [DEFAULT] xmpp_domain = server1.test +email_server = sat.test +email_port = 8025 +email_admins_list = admin@server1.test [component file_sharing] tls_certificate = /usr/share/sat/certificates/server1.test.pem diff -r a6ea53248c14 -r d4558f3cbf13 docker/docker-compose_e2e.yml --- a/docker/docker-compose_e2e.yml Fri Nov 27 16:32:40 2020 +0100 +++ b/docker/docker-compose_e2e.yml Fri Nov 27 16:39:40 2020 +0100 @@ -42,6 +42,10 @@ image: salutatoi/sat_e2e environment: SAT_TEST_ENV_E2E: "1" + SAT_TEST_ENV_E2E_LIBERVIA: "1" + ports: + # VNC server for Libervia e2e tests visual mode + - 5900 networks: default: aliases: @@ -57,3 +61,7 @@ ports: - "8080" - "8443" + networks: + default: + aliases: + - libervia.test diff -r a6ea53248c14 -r d4558f3cbf13 tests/e2e/conftest.py --- a/tests/e2e/conftest.py Fri Nov 27 16:32:40 2020 +0100 +++ b/tests/e2e/conftest.py Fri Nov 27 16:39:40 2020 +0100 @@ -16,142 +16,17 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import sys import os import tempfile import string import hashlib import random from pathlib import Path -from textwrap import dedent -import json -import pytest +from aiosmtpd.controller import Controller +from aiosmtpd.handlers import Message +from email.message import EmailMessage from sh import jp - - -class JpJson: - """jp like commands parsing result as JSON""" - - def __init__(self): - self.subcommands = [] - - def __call__(self, *args, **kwargs): - args = self.subcommands + list(args) - self.subcommands.clear() - kwargs['output'] = 'json_raw' - kwargs['_tty_out'] = False - cmd = jp(*args, **kwargs) - return json.loads(cmd.stdout) - - def __getattr__(self, name): - if name.startswith('_'): - # no jp subcommand starts with a "_", - # and pytest uses some attributes with this name scheme - return super().__getattr__(name) - self.subcommands.append(name) - return self - - -class JpElt(JpJson): - """jp like commands parsing result as domishElement""" - - def __init__(self): - super().__init__() - from sat.tools.xml_tools import ElementParser - self.parser = ElementParser() - - def __call__(self, *args, **kwargs): - args = self.subcommands + list(args) - self.subcommands.clear() - kwargs['output'] = 'xml_raw' - kwargs['_tty_out'] = False - cmd = jp(*args, **kwargs) - return self.parser(cmd.stdout.decode().strip()) - - -class Editor: - - def __init__(self): - # temporary directory will be deleted Automatically when this object will be - # destroyed - self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="sat_e2e_test_editor_") - self.tmp_dir_path = Path(self.tmp_dir_obj.name) - if not sys.executable: - raise Exception("Can't find python executable") - self.editor_set = False - self.editor_path = self.tmp_dir_path / "editor.py" - self.ori_content_path = self.tmp_dir_path / "original_content" - self.new_content_path = self.tmp_dir_path / "new_content" - self.base_script = dedent(f"""\ - #!{sys.executable} - import sys - - def content_filter(content): - return {{content_filter}} - - with open(sys.argv[1], 'r+') as f: - original_content = f.read() - f.seek(0) - new_content = content_filter(original_content) - f.write(new_content) - f.truncate() - - with open("{self.ori_content_path}", "w") as f: - f.write(original_content) - - with open("{self.new_content_path}", "w") as f: - f.write(new_content) - """ - ) - self._env = os.environ.copy() - self._env["EDITOR"] = str(self.editor_path) - - def set_filter(self, content_filter: str = "content"): - """Python code to modify original content - - The code will be applied to content received by editor. - The original content received by editor is in the "content" variable. - If filter_ is not specified, original content is written unmodified. - Code must be on a single line. - """ - if '\n' in content_filter: - raise ValueError("new lines can't be used in filter_") - with self.editor_path.open('w') as f: - f.write(self.base_script.format(content_filter=content_filter)) - self.editor_path.chmod(0o700) - self.editor_set = True - - @property - def env(self): - """Get environment variable with the editor set""" - if not self.editor_set: - self.set_filter() - return self._env - - @property - def original_content(self): - """Last content received by editor, before any modification - - returns None if editor has not yet been called - """ - try: - with self.ori_content_path.open() as f: - return f.read() - except FileNotFoundError: - return None - - @property - def new_content(self): - """Last content writen by editor - - This is the final content, after filter has been applied to original content - returns None if editor has not yet been called - """ - try: - with self.new_content_path.open() as f: - return f.read() - except FileNotFoundError: - return None +import pytest class FakeFile: @@ -225,16 +100,33 @@ return hash_.hexdigest() -@pytest.fixture(scope="session") -def jp_json(): - """Run jp with "json_raw" output, and returns the parsed value""" - return JpJson() +class TestMessage(EmailMessage): + + @property + def subject(self): + return self['subject'] + + @property + def from_(self): + return self['from'] + + @property + def to(self): + return self['to'] + + @property + def body(self): + return self.get_payload(decode=True).decode() -@pytest.fixture(scope="session") -def jp_elt(): - """Run jp with "xml_raw" output, and returns the parsed value""" - return JpElt() +class SMTPMessageHandler(Message): + messages = [] + + def __init__(self): + super().__init__(message_class=TestMessage) + + def handle_message(self, message): + self.messages.append(message) @pytest.fixture(scope="session") @@ -272,16 +164,18 @@ jp.profile.modify(profile="account1", default=True, connect=True) jp.profile.connect(profile="account1_s2", connect=True) yield tuple(profiles) - for profile in profiles: - jp.account.delete(profile=profile, connect=True, force=True) - jp.profile.delete(profile, force=True) + # This environment may be used during tests development + if os.getenv("SAT_TEST_E2E_KEEP_PROFILES") == None: + for profile in profiles: + jp.account.delete(profile=profile, connect=True, force=True) + jp.profile.delete(profile, force=True) @pytest.fixture(scope="class") def pubsub_nodes(test_profiles): """Create 2 testing nodes - Both nodes will be created with "account1" profile, named "test" and have and "open" + Both nodes will be created with "account1" profile, named "test" and have an "open" access model. One node will account1's PEP, the other one on pubsub.server1.test. """ @@ -308,13 +202,30 @@ ) -@pytest.fixture() -def editor(): - """Create a fake editor to automatise edition from CLI""" - return Editor() - - @pytest.fixture(scope="session") def fake_file(): """Manage dummy files creation and destination path""" return FakeFile() + + +@pytest.fixture(scope="session") +def test_files(): + """Return a Path to test files directory""" + return Path(__file__).parent.parent / "_files" + + +@pytest.fixture(scope="session") +def fake_smtp(): + """Create a fake STMP server to check sent emails""" + controller = Controller(SMTPMessageHandler()) + controller.hostname = "0.0.0.0" + controller.start() + yield + controller.stop() + + +@pytest.fixture +def sent_emails(fake_smtp): + """Catch email sent during the tests""" + SMTPMessageHandler.messages.clear() + return SMTPMessageHandler.messages diff -r a6ea53248c14 -r d4558f3cbf13 tests/e2e/jp/conftest.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/e2e/jp/conftest.py Fri Nov 27 16:39:40 2020 +0100 @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 + +# SàT: an XMPP client +# Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import sys +import os +import tempfile +from pathlib import Path +from textwrap import dedent +import json +import pytest +from sh import jp + + +class JpJson: + """jp like commands parsing result as JSON""" + + def __init__(self): + self.subcommands = [] + + def __call__(self, *args, **kwargs): + args = self.subcommands + list(args) + self.subcommands.clear() + kwargs['output'] = 'json_raw' + kwargs['_tty_out'] = False + cmd = jp(*args, **kwargs) + return json.loads(cmd.stdout) + + def __getattr__(self, name): + if name.startswith('_'): + # no jp subcommand starts with a "_", + # and pytest uses some attributes with this name scheme + return super().__getattr__(name) + self.subcommands.append(name) + return self + + +class JpElt(JpJson): + """jp like commands parsing result as domishElement""" + + def __init__(self): + super().__init__() + from sat.tools.xml_tools import ElementParser + self.parser = ElementParser() + + def __call__(self, *args, **kwargs): + args = self.subcommands + list(args) + self.subcommands.clear() + kwargs['output'] = 'xml_raw' + kwargs['_tty_out'] = False + cmd = jp(*args, **kwargs) + return self.parser(cmd.stdout.decode().strip()) + + +class Editor: + + def __init__(self): + # temporary directory will be deleted Automatically when this object will be + # destroyed + self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="sat_e2e_test_editor_") + self.tmp_dir_path = Path(self.tmp_dir_obj.name) + if not sys.executable: + raise Exception("Can't find python executable") + self.editor_set = False + self.editor_path = self.tmp_dir_path / "editor.py" + self.ori_content_path = self.tmp_dir_path / "original_content" + self.new_content_path = self.tmp_dir_path / "new_content" + self.base_script = dedent(f"""\ + #!{sys.executable} + import sys + + def content_filter(content): + return {{content_filter}} + + with open(sys.argv[1], 'r+') as f: + original_content = f.read() + f.seek(0) + new_content = content_filter(original_content) + f.write(new_content) + f.truncate() + + with open("{self.ori_content_path}", "w") as f: + f.write(original_content) + + with open("{self.new_content_path}", "w") as f: + f.write(new_content) + """ + ) + self._env = os.environ.copy() + self._env["EDITOR"] = str(self.editor_path) + + def set_filter(self, content_filter: str = "content"): + """Python code to modify original content + + The code will be applied to content received by editor. + The original content received by editor is in the "content" variable. + If filter_ is not specified, original content is written unmodified. + Code must be on a single line. + """ + if '\n' in content_filter: + raise ValueError("new lines can't be used in filter_") + with self.editor_path.open('w') as f: + f.write(self.base_script.format(content_filter=content_filter)) + self.editor_path.chmod(0o700) + self.editor_set = True + + @property + def env(self): + """Get environment variable with the editor set""" + if not self.editor_set: + self.set_filter() + return self._env + + @property + def original_content(self): + """Last content received by editor, before any modification + + returns None if editor has not yet been called + """ + try: + with self.ori_content_path.open() as f: + return f.read() + except FileNotFoundError: + return None + + @property + def new_content(self): + """Last content writen by editor + + This is the final content, after filter has been applied to original content + returns None if editor has not yet been called + """ + try: + with self.new_content_path.open() as f: + return f.read() + except FileNotFoundError: + return None + + +@pytest.fixture(scope="session") +def jp_json(): + """Run jp with "json_raw" output, and returns the parsed value""" + return JpJson() + + +@pytest.fixture(scope="session") +def jp_elt(): + """Run jp with "xml_raw" output, and returns the parsed value""" + return JpElt() + + +@pytest.fixture() +def editor(): + """Create a fake editor to automatise edition from CLI""" + return Editor() diff -r a6ea53248c14 -r d4558f3cbf13 tests/e2e/jp/test_jp.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/e2e/jp/test_jp.py Fri Nov 27 16:39:40 2020 +0100 @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 + +# SàT: an XMPP client +# Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import os +import shutil +import pytest +import sh +from sh import jp +from sat.tools.common import uri + + +if os.getenv("SAT_TEST_ENV_E2E") is None: + pytest.skip( + "skipping end-to-end tests, we are not in a test environment", + allow_module_level=True + ) + + +pytestmark = pytest.mark.usefixtures("test_profiles") + + +class TestInstall: + + def test_jp_can_run(self): + jp("--version") + + +class TestJpAccount: + + def test_create_and_delete(self, jp_json): + """Create an account in-band, connect it, then delete it and its profile""" + jp.account.create( + "test_create@server1.test", + "test", + profile="test_create", + host="server1.test" + ) + profiles = jp_json.profile.list() + assert "test_create" in profiles + jp.profile.connect(connect=True, profile="test_create") + jp.account.delete(profile="test_create", force=True) + jp.profile.delete("test_create", force=True) + profiles = jp_json.profile.list() + assert "test_create" not in profiles + + +@pytest.mark.usefixtures("pubsub_nodes") +class TestJpPubsub: + + def test_node_create_info_delete(self): + node_name = "tmp_node" + with pytest.raises(sh.ErrorReturnCode_16): + # the node should not exist + jp.pubsub.node.info(node=node_name) + try: + jp.pubsub.node.create(node=node_name) + # if node exist has expected, following command won't raise an exception + metadata = jp.pubsub.node.info(node=node_name) + assert len(metadata.strip()) + finally: + jp.pubsub.node.delete(node=node_name, force=True) + + with pytest.raises(sh.ErrorReturnCode_16): + # the node should not exist anymore + jp.pubsub.node.info(node=node_name) + + def test_set_get_delete_purge(self, jp_elt): + content = "test item" + payload = f"{content}" + + # we create 3 items and check them + item1_id = jp.pubsub.set(node="test", quiet=True, _in=payload) + item2_id = jp.pubsub.set(node="test", quiet=True, _in=payload) + item3_id = jp.pubsub.set(node="test", quiet=True, _in=payload) + parsed_elt = jp_elt.pubsub.get(node="test", item=item1_id) + payload = parsed_elt.firstChildElement() + assert payload.name == 'test' + assert str(payload) == content + parsed_elt = jp_elt.pubsub.get(node="test", item=item2_id) + payload = parsed_elt.firstChildElement() + assert payload.name == 'test' + assert str(payload) == content + parsed_elt = jp_elt.pubsub.get(node="test", item=item3_id) + payload = parsed_elt.firstChildElement() + assert payload.name == 'test' + assert str(payload) == content + + # deleting first item should work + jp.pubsub.delete(node="test", item=item1_id, force=True) + with pytest.raises(sh.ErrorReturnCode_16): + jp.pubsub.get(node="test", item=item1_id) + + # there must be a least item2 and item3 in the node + node_items = jp_elt.pubsub.get(node="test") + assert len(list(node_items.elements())) >= 2 + + # after purge, node must be empty + jp.pubsub.node.purge(node="test", force=True) + node_items = jp_elt.pubsub.get(node="test") + assert len(list(node_items.elements())) == 0 + + def test_edit(self, editor, jp_elt): + content = "original item" + payload = f"{content}" + item_id = jp.pubsub.set(node="test", quiet=True, _in=payload) + editor.set_filter('content.replace("original", "edited")') + jp.pubsub.edit(node="test", item=item_id, _env=editor.env) + assert "original item" in editor.original_content + parsed_elt = jp_elt.pubsub.get(node="test", item=item_id) + edited_payload = parsed_elt.firstChildElement() + expected_edited_content = content.replace("original", "edited") + assert edited_payload.name == 'test' + assert str(edited_payload) == expected_edited_content + + def test_affiliations(self, jp_json): + affiliations = jp_json.pubsub.affiliations() + assert affiliations["test"] == "owner" + + def test_uri(self): + built_uri = jp.pubsub.uri( + service="pubsub.example.net", node="some_node" + ).strip() + assert built_uri == "xmpp:pubsub.example.net?;node=some_node" + built_uri = jp.pubsub.uri( + service="pubsub.example.net", node="some_node", item="some_item" + ).strip() + assert built_uri == "xmpp:pubsub.example.net?;node=some_node;item=some_item" + + +class TestJpBlog: + MICROBLOG_NS = "urn:xmpp:microblog:0" + + def test_set_get(self, jp_json): + jp.blog.set(_in="markdown **bold** [link](https://example.net)") + item_data = jp_json.blog.get(max=1) + item = item_data[0][0] + metadata = item_data[1] + assert metadata['service'] == "account1@server1.test" + assert metadata['node'] == self.MICROBLOG_NS + assert metadata['rsm'].keys() <= {"first", "last", "index", "count"} + item_id = item['id'] + expected_uri = uri.buildXMPPUri( + 'pubsub', subtype="microblog", path="account1@server1.test", + node=self.MICROBLOG_NS, item=item_id + ) + assert item['uri'] == expected_uri + assert item['content_xhtml'] == ( + '

markdown bold ' + 'link

' + ) + assert isinstance(item['published'], int) + assert isinstance(item['updated'], int) + assert isinstance(item['comments'], list) + assert isinstance(item['tags'], list) + assert item['author'] == 'account1' + assert item['author_jid'] == 'account1@server1.test' + + def test_edit(self, editor, jp_json): + payload_md = "content in **markdown**" + editor.set_filter(repr(payload_md)) + jp.blog.edit(_env=editor.env) + assert len(editor.original_content) == 0 + assert editor.new_content == payload_md + items_data = jp_json.blog.get(max=1) + last_item = items_data[0][0] + last_item_id = last_item['id'] + assert last_item['content'] == "content in markdown" + assert last_item['content_xhtml'] == ( + "

content in markdown

" + ) + editor.set_filter('f"{content} extended"') + jp.blog.edit("--last-item", _env=editor.env) + assert editor.original_content == payload_md + assert editor.new_content == f"{payload_md} extended" + items_data = jp_json.blog.get(max=1) + last_item = items_data[0][0] + # we check that the id hasn't been modified + assert last_item['id'] == last_item_id + assert last_item['content'] == "content in markdown extended" + assert last_item['content_xhtml'] == ( + "

content in markdown extended

" + ) + + +class TestJpFile: + + def test_upload_get(self, fake_file): + source_file = fake_file.size(10240) + source_file_hash = fake_file.get_source_hash(source_file) + upload_url = jp.file.upload(source_file).strip() + + dest_file = fake_file.new_dest_file() + try: + jp.file.get(upload_url, dest_file=dest_file) + dest_file_hash = fake_file.get_dest_hash(dest_file) + finally: + dest_file.unlink() + + assert source_file_hash == dest_file_hash + + def test_send_receive(self, fake_file): + source_file = fake_file.size(10240) + source_file_hash = fake_file.get_source_hash(source_file) + send_cmd = jp.file.send(source_file, "account1@server2.test", _bg=True) + dest_path = fake_file.dest_files / "test_send_receive" + dest_path.mkdir() + try: + jp.file.receive( + "account1@server1.test", profile="account1_s2", path=dest_path) + dest_file = dest_path / source_file.name + dest_file_hash = fake_file.get_dest_hash(dest_file) + finally: + shutil.rmtree(dest_path) + send_cmd.wait() + + assert source_file_hash == dest_file_hash diff -r a6ea53248c14 -r d4558f3cbf13 tests/e2e/libervia/conftest.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/e2e/libervia/conftest.py Fri Nov 27 16:39:40 2020 +0100 @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 + +# SàT: an XMPP client +# Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import sys +import os +import socket +import pytest +import time +from datetime import datetime +from pathlib import Path +import helium + + +LIBERVIA_HOST = "libervia.test" +LIBERVIA_PORT_HTTPS = 8443 +BASE_URL = f"https://{LIBERVIA_HOST}:{LIBERVIA_PORT_HTTPS}" +SIZE_DESKTOP = (1024, 728) +SIZE_MOBILE = (380, 640) +account_1_cookies = None + + +@pytest.hookimpl(tryfirst=True, hookwrapper=True) +def pytest_runtest_makereport(item, call): + # needed to get test results in request fixture + # cf. https://docs.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures + outcome = yield + rep = outcome.get_result() + setattr(item, "rep_" + rep.when, rep) + + +@pytest.fixture +def screenshot_on_failure(request): + yield + if request.node.rep_setup.passed: + if request.node.rep_call.failed: + report_dir = Path(os.getenv("SAT_TEST_REPORT_DIR", "/tmp/tests_report")) + dest_dir = report_dir/"screenshots" + dest_dir.mkdir(parents=True, exist_ok=True) + filename = f"{datetime.now().isoformat()}_{request.node.name}.png" + dest_path = dest_dir/filename + helium.get_driver().save_screenshot(str(dest_path)) + print(f"screenshot saved to {dest_path}") + + +def wait_for_socket(host, port, retries=30): + sock = socket.socket() + while True: + try: + sock.connect((host, port)) + except ConnectionRefusedError as e: + retries -= 1 + if retries < 0: + print(f"Can't access server at {host}:{port}", file=sys.stderr) + raise e + else: + print(f"Can't connect to {host}:{port}, retrying ({retries})") + time.sleep(1) + else: + break + + +@pytest.fixture(scope="session") +def browser(): + if os.getenv("SAT_TEST_E2E_LIBERVIA_NO_HEADLESS") is not None: + kwargs = {} + else: + kwargs = {"headless": True} + driver = helium.start_firefox(**kwargs) + driver.set_window_size(*SIZE_DESKTOP) + wait_for_socket(LIBERVIA_HOST, LIBERVIA_PORT_HTTPS) + yield helium + if os.getenv("SAT_TEST_E2E_LIBERVIA_KEEP_BROWSER") is None: + helium.kill_browser() + + +@pytest.fixture +def nobody_logged_in(browser): + browser.get_driver().delete_all_cookies() + + +@pytest.fixture +def log_in_account1(browser): + global account_1_cookies + if account_1_cookies is None: + browser.get_driver().delete_all_cookies() + browser.go_to("https://libervia.test:8443/login") + browser.write("account1", into="login") + browser.write("test", into="password") + browser.click("log in") + account_1_cookies = browser.get_driver().get_cookies() + else: + browser.get_driver().add_cookie(account_1_cookies) + + +@pytest.fixture +def mobile_screen(browser): + browser.get_driver().set_window_size(*SIZE_MOBILE) + + +@pytest.fixture +def desktop_screen(browser): + browser.get_driver().set_window_size(*SIZE_DESKTOP) diff -r a6ea53248c14 -r d4558f3cbf13 tests/e2e/libervia/test_libervia.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/e2e/libervia/test_libervia.py Fri Nov 27 16:39:40 2020 +0100 @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 + +# SàT: an XMPP client +# Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import os +import pytest +from helium import ( + go_to, write, click, drag_file, find_all, wait_until, S, Text, Link, get_driver +) + + +if os.getenv("SAT_TEST_ENV_E2E_LIBERVIA") is None: + pytest.skip( + "skipping end-to-end tests, we are not in a test environment for Libervia", + allow_module_level=True + ) + +pytestmark = pytest.mark.usefixtures("test_profiles", "screenshot_on_failure") + + +class TestLogin: + + def test_user_can_create_account(self, nobody_logged_in, sent_emails): + go_to("https://libervia.test:8443/login") + click("no account yet") + write("new_account", into="login") + write("some_email@example.net", into="email") + write("testtest", into="password") + click("register new account") + wait_until(lambda: get_driver().current_url.endswith("/login")) + write("testtest", into="password") + click("log in") + wait_until(Text("you are logged").exists) + wait_until(lambda: len(sent_emails) == 2) + if sent_emails[0].to == "admin@server1.test": + admin_email, user_email = sent_emails + else: + user_email, admin_email = sent_emails + assert admin_email.to == "admin@server1.test" + # profile name must be specified in admin email + assert "new_account" in admin_email.body + assert user_email.to == "some_email@example.net" + # user jid must be specified in the email + assert "new_account@server1.test" in user_email.body + # use can now log-in + + def test_user_can_log_in(self, nobody_logged_in): + go_to("https://libervia.test:8443/login") + write("account1_s2", into="login") + write("test", into="password") + click("log in") + assert Text("you are logged").exists() + + def test_wrong_password_fails(self, nobody_logged_in): + go_to("https://libervia.test:8443/login") + write("account1_s2", into="login") + write("wrong_password", into="password") + click("log in") + assert Text("Your login and/or password is incorrect. Please try again.") + + +class TestPhotos: + + def test_user_can_create_album(self, log_in_account1): + go_to("https://libervia.test:8443/photos") + wait_until(Link("create").exists) + click("create") + write("test album", into="album name") + click("create") + album_link = Link("test album") + wait_until(album_link.exists) + click(album_link) + wait_until(lambda: not S("#loading_screen").exists()) + drag_file("/src/sat/tests/_files/test_1.jpg", "drop photos here") + wait_until(lambda: len(find_all(S("div.progress_finished")))==1) + drag_file("/src/sat/tests/_files/test_2.jpg", "drop photos here") + wait_until(lambda: len(find_all(S("div.progress_finished")))==2) + assert S('img[alt="test_1.jpg"]').exists() + assert S('img[alt="test_2.jpg"]').exists() diff -r a6ea53248c14 -r d4558f3cbf13 tests/e2e/run_e2e.py --- a/tests/e2e/run_e2e.py Fri Nov 27 16:32:40 2020 +0100 +++ b/tests/e2e/run_e2e.py Fri Nov 27 16:39:40 2020 +0100 @@ -22,28 +22,85 @@ from pathlib import Path import tempfile from textwrap import dedent +from datetime import datetime import sh +import io +import re +import sat_templates +import libervia from sat.core import exceptions +import yaml +try: + from yaml import CLoader as Loader, CDumper as Dumper +except ImportError: + from yaml import Loader, Dumper -KEEP_OPT = "--keep" + +OPT_KEEP_CONTAINERS = "--keep-containers" +OPT_KEEP_PROFILES = "--keep-profiles" +OPT_KEEP_VNC = "--keep-vnc" +OPT_KEEP_BROWSER = "--keep-browser" +OPT_VISUAL = "--visual" +OPT_DEV_MODE = "--dev-mode" + +dev_mode_inst = dedent("""\ + Here is a short script to start working with a logged account: + + from helium import * + start_firefox() + go_to("https://libervia.test:8443/login") + write("account1", "login") + write("test", "password") + click("log in") + """) +report_buffer = io.StringIO() def live_out(data): sys.stdout.write(data) sys.stdout.flush() + report_buffer.write(data) def live_err(data): sys.stderr.write(data) sys.stderr.flush() + report_buffer.write(data) + + +def get_opt(opt_name): + """Check is an option flag is set, and remove it for sys.argv + + This allow to have simple flags without interfering with pytest options + """ + if opt_name in sys.argv: + sys.argv.remove(opt_name) + return True + else: + return False + + +def set_env(override, name, value="1"): + """Set environement variable""" + environment = override["services"]["sat"].setdefault("environment", {}) + environment[name] = value def use_e2e_env(): - if KEEP_OPT in sys.argv: + visual = get_opt(OPT_VISUAL) + keep_containers = get_opt(OPT_KEEP_CONTAINERS) + keep_profiles = get_opt(OPT_KEEP_PROFILES) + keep_vnc = get_opt(OPT_KEEP_VNC) + keep_browser = get_opt(OPT_KEEP_BROWSER) + if keep_browser: keep_containers = True - sys.argv.remove(KEEP_OPT) - else: - keep_containers = False + keep_vnc = True + if keep_vnc: + visual = True + dev_mode = get_opt(OPT_DEV_MODE) + if dev_mode: + keep_containers = keep_profiles = keep_vnc = visual = True + for p in Path.cwd().parents: package_path = p / "sat" docker_path = p / "docker" @@ -56,14 +113,32 @@ "from the backend repository?" ) + libervia_path = Path(libervia.__file__).parent.resolve() + libervia_root_path = libervia_path.parent + if (libervia_root_path / ".hg").is_dir(): + libervia_source = libervia_root_path + libervia_target = "/src/libervia" + else: + libervia_source = libervia_path + libervia_target = "/src/libervia/libervia" + + sat_templates_path = Path(sat_templates.__file__).parent.resolve() + sat_templates_root_path = sat_templates_path.parent + if (sat_templates_root_path / ".hg").is_dir(): + sat_templates_source = sat_templates_root_path + sat_templates_target = "/src/sat_templates" + else: + sat_templates_source = sat_templates_path + sat_templates_target = "/src/sat_templates/sat_templates" + compose_e2e_path = docker_path / "docker-compose_e2e.yml" if not compose_e2e_path.is_file(): raise exceptions.NotFound('"docker-compose_e2e.yml" file can\'t be found') - with tempfile.TemporaryDirectory() as temp_dir: + with tempfile.TemporaryDirectory(prefix="sat_test_e2e_") as temp_dir: override_path = Path(temp_dir) / "test_override.yml" - with override_path.open("w") as f: - f.write(dedent(f"""\ + override = yaml.load( + dedent(f"""\ version: "3.6" services: sat: @@ -72,23 +147,89 @@ source: {sat_root_path} target: /src/sat read_only: true - """)) + libervia: + volumes: + - type: bind + source: {sat_root_path} + target: /src/sat + read_only: true + - type: bind + source: {libervia_source} + target: {libervia_target} + read_only: true + - type: bind + source: {sat_templates_source} + target: {sat_templates_target} + read_only: true + """ + ), + Loader=Loader + ) + + if keep_profiles: + set_env(override, "SAT_TEST_E2E_KEEP_PROFILES") + + if visual: + set_env(override, "SAT_TEST_E2E_LIBERVIA_NO_HEADLESS") + + if keep_browser: + set_env(override, "SAT_TEST_E2E_LIBERVIA_KEEP_BROWSER") + + with override_path.open("w") as f: + yaml.dump(override, f, Dumper=Dumper) docker_compose = sh.docker_compose.bake( "-f", compose_e2e_path, "-f", override_path) docker_compose.up("-d") - try: - docker_compose.exec( + p = docker_compose.exec( "-T", "--workdir", "/src/sat/tests", "sat", "pytest", "-o", "cache_dir=/tmp", *sys.argv[1:], color="yes", - _in=sys.stdin, _out=live_out, _out_bufsize=0, _err=live_err, _err_bufsize=0 + _in=sys.stdin, _out=live_out, _out_bufsize=0, _err=live_err, _err_bufsize=0, + _bg=True ) + if visual: + vnc_port = docker_compose.port("sat", "5900").split(':', 1)[1].strip() + p_vnc = sh.vncviewer( + f"localhost:{vnc_port}", + _bg=True, + # vncviewer exits with 1 when we send an SIGTERM to it, and it's printed + # before we can catch it (it's happening in a thread). Thus we set exit + # code 1 as OK to avoid the backtrace. + _ok_code=[0, 1] + ) + else: + p_vnc = None + + try: + p.wait() except sh.ErrorReturnCode as e: + sat_cont_id = docker_compose.ps("-q", "sat").strip() + report_dest = Path(f"reports_{datetime.now().isoformat()}/") + # we need to make `report_dest` explicitely local with "./", otherwise + # docker parse takes it as a container path due to the presence of ":" + # with `isoformat()`. + sh.docker.cp(f"{sat_cont_id}:/reports", f"./{report_dest}") + # we save 2 versions: one with ANSI escape codes + report_ansi = report_dest / "report.ansi" + with report_ansi.open('w') as f: + f.write(report_buffer.getvalue()) + # and one without (cf. https://stackoverflow.com/a/14693789) + ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') + report_log = report_dest / "report.log" + with report_log.open('w') as f: + f.write(ansi_escape.sub('', report_buffer.getvalue())) + + print(f"report saved to {report_dest}") sys.exit(e.exit_code) finally: + if p_vnc is not None and p_vnc.is_alive() and not keep_vnc: + p_vnc.terminate() if not keep_containers: docker_compose.down(volumes=True) + if dev_mode: + print(dev_mode_inst) + if __name__ == "__main__": use_e2e_env() diff -r a6ea53248c14 -r d4558f3cbf13 tests/e2e/test_jp.py --- a/tests/e2e/test_jp.py Fri Nov 27 16:32:40 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,231 +0,0 @@ -#!/usr/bin/env python3 - -# SàT: an XMPP client -# Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -import os -import shutil -import pytest -import sh -from sh import jp -from sat.tools.common import uri - - -if os.getenv("SAT_TEST_ENV_E2E") is None: - pytest.skip( - "skipping end-to-end tests, we are not in a test environment", - allow_module_level=True - ) - - -pytestmark = pytest.mark.usefixtures("test_profiles") - - -class TestInstall: - - def test_jp_can_run(self): - jp("--version") - - -class TestJpAccount: - - def test_create_and_delete(self, jp_json): - """Create an account in-band, connect it, then delete it and its profile""" - jp.account.create( - "test_create@server1.test", - "test", - profile="test_create", - host="server1.test" - ) - profiles = jp_json.profile.list() - assert "test_create" in profiles - jp.profile.connect(connect=True, profile="test_create") - jp.account.delete(profile="test_create", force=True) - jp.profile.delete("test_create", force=True) - profiles = jp_json.profile.list() - assert "test_create" not in profiles - - -@pytest.mark.usefixtures("pubsub_nodes") -class TestJpPubsub: - - def test_node_create_info_delete(self): - node_name = "tmp_node" - with pytest.raises(sh.ErrorReturnCode_16): - # the node should not exist - jp.pubsub.node.info(node=node_name) - try: - jp.pubsub.node.create(node=node_name) - # if node exist has expected, following command won't raise an exception - metadata = jp.pubsub.node.info(node=node_name) - assert len(metadata.strip()) - finally: - jp.pubsub.node.delete(node=node_name, force=True) - - with pytest.raises(sh.ErrorReturnCode_16): - # the node should not exist anymore - jp.pubsub.node.info(node=node_name) - - def test_set_get_delete_purge(self, jp_elt): - content = "test item" - payload = f"{content}" - - # we create 3 items and check them - item1_id = jp.pubsub.set(node="test", quiet=True, _in=payload) - item2_id = jp.pubsub.set(node="test", quiet=True, _in=payload) - item3_id = jp.pubsub.set(node="test", quiet=True, _in=payload) - parsed_elt = jp_elt.pubsub.get(node="test", item=item1_id) - payload = parsed_elt.firstChildElement() - assert payload.name == 'test' - assert str(payload) == content - parsed_elt = jp_elt.pubsub.get(node="test", item=item2_id) - payload = parsed_elt.firstChildElement() - assert payload.name == 'test' - assert str(payload) == content - parsed_elt = jp_elt.pubsub.get(node="test", item=item3_id) - payload = parsed_elt.firstChildElement() - assert payload.name == 'test' - assert str(payload) == content - - # deleting first item should work - jp.pubsub.delete(node="test", item=item1_id, force=True) - with pytest.raises(sh.ErrorReturnCode_16): - jp.pubsub.get(node="test", item=item1_id) - - # there must be a least item2 and item3 in the node - node_items = jp_elt.pubsub.get(node="test") - assert len(list(node_items.elements())) >= 2 - - # after purge, node must be empty - jp.pubsub.node.purge(node="test", force=True) - node_items = jp_elt.pubsub.get(node="test") - assert len(list(node_items.elements())) == 0 - - def test_edit(self, editor, jp_elt): - content = "original item" - payload = f"{content}" - item_id = jp.pubsub.set(node="test", quiet=True, _in=payload) - editor.set_filter('content.replace("original", "edited")') - jp.pubsub.edit(node="test", item=item_id, _env=editor.env) - assert "original item" in editor.original_content - parsed_elt = jp_elt.pubsub.get(node="test", item=item_id) - edited_payload = parsed_elt.firstChildElement() - expected_edited_content = content.replace("original", "edited") - assert edited_payload.name == 'test' - assert str(edited_payload) == expected_edited_content - - def test_affiliations(self, jp_json): - affiliations = jp_json.pubsub.affiliations() - assert affiliations["test"] == "owner" - - def test_uri(self): - built_uri = jp.pubsub.uri( - service="pubsub.example.net", node="some_node" - ).strip() - assert built_uri == "xmpp:pubsub.example.net?;node=some_node" - built_uri = jp.pubsub.uri( - service="pubsub.example.net", node="some_node", item="some_item" - ).strip() - assert built_uri == "xmpp:pubsub.example.net?;node=some_node;item=some_item" - - -class TestJpBlog: - MICROBLOG_NS = "urn:xmpp:microblog:0" - - def test_set_get(self, jp_json): - jp.blog.set(_in="markdown **bold** [link](https://example.net)") - item_data = jp_json.blog.get(max=1) - item = item_data[0][0] - metadata = item_data[1] - assert metadata['service'] == "account1@server1.test" - assert metadata['node'] == self.MICROBLOG_NS - assert metadata['rsm'].keys() <= {"first", "last", "index", "count"} - item_id = item['id'] - expected_uri = uri.buildXMPPUri( - 'pubsub', subtype="microblog", path="account1@server1.test", - node=self.MICROBLOG_NS, item=item_id - ) - assert item['uri'] == expected_uri - assert item['content_xhtml'] == ( - '

markdown bold ' - 'link

' - ) - assert isinstance(item['published'], int) - assert isinstance(item['updated'], int) - assert isinstance(item['comments'], list) - assert isinstance(item['tags'], list) - assert item['author'] == 'account1' - assert item['author_jid'] == 'account1@server1.test' - - def test_edit(self, editor, jp_json): - payload_md = "content in **markdown**" - editor.set_filter(repr(payload_md)) - jp.blog.edit(_env=editor.env) - assert len(editor.original_content) == 0 - assert editor.new_content == payload_md - items_data = jp_json.blog.get(max=1) - last_item = items_data[0][0] - last_item_id = last_item['id'] - assert last_item['content'] == "content in markdown" - assert last_item['content_xhtml'] == ( - "

content in markdown

" - ) - editor.set_filter('f"{content} extended"') - jp.blog.edit("--last-item", _env=editor.env) - assert editor.original_content == payload_md - assert editor.new_content == f"{payload_md} extended" - items_data = jp_json.blog.get(max=1) - last_item = items_data[0][0] - # we check that the id hasn't been modified - assert last_item['id'] == last_item_id - assert last_item['content'] == "content in markdown extended" - assert last_item['content_xhtml'] == ( - "

content in markdown extended

" - ) - - -class TestJpFile: - - def test_upload_get(self, fake_file): - source_file = fake_file.size(10240) - source_file_hash = fake_file.get_source_hash(source_file) - upload_url = jp.file.upload(source_file).strip() - - dest_file = fake_file.new_dest_file() - try: - jp.file.get(upload_url, dest_file=dest_file) - dest_file_hash = fake_file.get_dest_hash(dest_file) - finally: - dest_file.unlink() - - assert source_file_hash == dest_file_hash - - def test_send_receive(self, fake_file): - source_file = fake_file.size(10240) - source_file_hash = fake_file.get_source_hash(source_file) - send_cmd = jp.file.send(source_file, "account1@server2.test", _bg=True) - dest_path = fake_file.dest_files / "test_send_receive" - dest_path.mkdir() - try: - jp.file.receive( - "account1@server1.test", profile="account1_s2", path=dest_path) - dest_file = dest_path / source_file.name - dest_file_hash = fake_file.get_dest_hash(dest_file) - finally: - shutil.rmtree(dest_path) - send_cmd.wait() - - assert source_file_hash == dest_file_hash