# HG changeset patch # User Goffi # Date 1618590754 -7200 # Node ID d78b5eae912a21886a0ce3e6f9e0b0c7dedbea75 # Parent 73e04040d577a24ed2b047898a43f5b716094593 tests: update following names change diff -r 73e04040d577 -r d78b5eae912a doc/contribuing/testing.rst --- a/doc/contribuing/testing.rst Fri Apr 16 18:32:16 2021 +0200 +++ b/doc/contribuing/testing.rst Fri Apr 16 18:32:34 2021 +0200 @@ -61,14 +61,14 @@ ``--dev-mode`` Shortcut for ``--keep-containers``, ``--keep-profiles`` and ``--keep-vnc``. This is useful, as you guess with its names, for development of tests. User can then log-in into - the ``sat`` container, launch a Python console, and work with the automated browser in + the ``backend`` container, launch a Python console, and work with the automated browser in real-time. Basic commands to launch a browser and log-in with test account are printed at the end of the tests. Note that if you want to have profiles created, or extra tools like the fake SMTP server, you'll have to launch at least one test which require them. - To log-in into the ``sat`` container, you can use the following command, from + To log-in into the ``backend`` container, you can use the following command, from ``/docker`` directory:: - $ docker-compose -f docker-compose_e2e.yml exec sat /bin/bash + $ docker-compose -f docker-compose_e2e.yml exec backend /bin/bash Then run a python console with given instructions @@ -100,7 +100,7 @@ ``test_profiles`` Creates a bunch of test accounts which are available during the whole test session. Those account are destroyed once all the tests are finished (successful or not), except - if you set the ``SAT_TEST_E2E_KEEP_PROFILES`` environment variable (or use the + if you set the ``LIBERVIA_TEST_E2E_KEEP_PROFILES`` environment variable (or use the ``--keep-profiles`` flag in ``run_e2e.py``. The profiles created are in the form ``accountX`` for account on the ``server1.test``, @@ -127,7 +127,7 @@ and ``fake_file.get_dest_hash(dest_file_path)`` will generate its hash once written. ``sent_emails`` - When used, a fake SMTP server (already configured in container's ``sat.conf``) will be + When used, a fake SMTP server (already configured in container's ``libervia.conf``) will be launched if it's not already, and all messages sent to it since the beginning of the test will be available in the given list. Message are subclasses of ``email.message.EmailMessage`` with the extra properties ``from_``, ``to``, ``subject`` @@ -135,21 +135,21 @@ The SMTP server is terminated at the end of the test session. -jp e2e tests ------------- +libervia-cli e2e tests +---------------------- -End-to-end tests for ``jp`` are a good way to tests backend features without having to +End-to-end tests for ``libervia-cli`` are a good way to tests backend features without having to deal with frontends UI. Those tests use extensively the ``sh`` module, which helps -writing ``jp`` commands like if they where methods. +writing ``libervia-cli`` commands like if they where methods. Among the helping fixture (check the various ``conftest.py`` files for details), the following are specially good to know: -``jp_json`` - Set the ``json_raw`` output are parse it. When you use this instead of the normal ``jp``, +``li_json`` + Set the ``json_raw`` output are parse it. When you use this instead of the normal ``libervia-cli``, you'll get a Python object that you can manipulate easily. -``jp_elt`` +``li_elt`` Set the ``xml_raw`` output and parse it as a Twisted ``domish.Element``. When you use a command which can return XML, it is useful to get this object which is easy to manipulate in Python. @@ -161,7 +161,7 @@ method (this code is in a string which will be executed by Python interpreter, where the ``content`` variable is the received text). By default, the text is kept unmodified. - After ``editor`` has been used by the ``jp`` command, you can check its + After ``editor`` has been used by the ``libervia-cli`` command, you can check its ``original_content`` property to see the text that it received, and ``new_content`` property to see the text that has been written after updating the original content with the code set in ``set_filter``. @@ -198,14 +198,14 @@ Following examples have to be run from ``tests/e2e`` directory. -Run all tests for ``jp``:: +Run all tests for ``Libervia CLI``:: - $ ./run_e2e.py -k jp + $ ./run_e2e.py -k libervia-cli -Run all tests for ``Libervia`` with real-time visual feedback (note that you need to have +Run all tests for ``Libervia Web`` with real-time visual feedback (note that you need to have ``vncviewer`` installed and available in path, see above):: - $ ./run_e2e.py -k libervia --visual + $ ./run_e2e.py -k libervia-web --visual Run all tests with verbose mode (useful to know which test is currently running):: @@ -221,10 +221,10 @@ $ ./run_e2e.py -k user_can_create_account --dev-mode -…then to go into the ``sat`` container and work with the browser (to be run in ``docker`` +…then to go into the ``backend`` container and work with the browser (to be run in ``docker`` directory)…:: - $ docker-compose -f docker-compose_e2e.yml exec sat /bin/bash + $ docker-compose -f docker-compose_e2e.yml exec backend /bin/bash …and, inside the container, you can now run ``python3`` and enter instruction prints at the end of the test session. diff -r 73e04040d577 -r d78b5eae912a tests/e2e/conftest.py --- a/tests/e2e/conftest.py Fri Apr 16 18:32:16 2021 +0200 +++ b/tests/e2e/conftest.py Fri Apr 16 18:32:34 2021 +0200 @@ -26,7 +26,7 @@ from aiosmtpd.handlers import Message from email.message import EmailMessage import sh -from sh import jp +from sh import li import pytest @@ -35,7 +35,7 @@ BUF_SIZE = 65535 def __init__(self): - self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="sat_e2e_test_files_") + self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="libervia_e2e_test_files_") self.tmp_dir_path = Path(self.tmp_dir_obj.name) self.source_files = self.tmp_dir_path / "source" self.source_files.mkdir() @@ -157,7 +157,7 @@ profile = f"account{account_idx}{profile_suff}" profiles.append(profile) try: - jp.account.create( + li.account.create( f"account{account_idx}@server{server_idx}.test", "test", profile=profile, @@ -167,14 +167,14 @@ # this is the conlict exit code, this can happen when tests are run # inside a container when --keep-profiles is used with run_e2e.py. pass - jp.profile.modify(profile="account1", default=True, connect=True) - jp.profile.connect(profile="account1_s2", connect=True) + li.profile.modify(profile="account1", default=True, connect=True) + li.profile.connect(profile="account1_s2", connect=True) yield tuple(profiles) # This environment may be used during tests development - if os.getenv("SAT_TEST_E2E_KEEP_PROFILES") == None: + if os.getenv("LIBERVIA_TEST_E2E_KEEP_PROFILES") == None: for profile in profiles: - jp.account.delete(profile=profile, connect=True, force=True) - jp.profile.delete(profile, force=True) + li.account.delete(profile=profile, connect=True, force=True) + li.profile.delete(profile, force=True) @pytest.fixture(scope="class") @@ -183,25 +183,25 @@ Both nodes will be created with "account1" profile, named "test" and have an "open" access model. - One node will account1's PEP, the other one on pubsub.server1.test. + One node will be on account1's PEP, the other one on pubsub.server1.test. """ - jp.pubsub.node.create( + li.pubsub.node.create( "-f", "access_model", "open", node="test", profile="account1", connect=True ) - jp.pubsub.node.create( + li.pubsub.node.create( "-f", "access_model", "open", service="pubsub.server1.test", node="test", profile="account1" ) yield - jp.pubsub.node.delete( + li.pubsub.node.delete( node="test", profile="account1", connect=True, force=True ) - jp.pubsub.node.delete( + li.pubsub.node.delete( service="pubsub.server1.test", node="test", profile="account1", force=True diff -r 73e04040d577 -r d78b5eae912a tests/e2e/jp/conftest.py --- a/tests/e2e/jp/conftest.py Fri Apr 16 18:32:16 2021 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,169 +0,0 @@ -#!/usr/bin/env python3 - -# Libervia: an XMPP client -# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -import sys -import os -import tempfile -from pathlib import Path -from textwrap import dedent -import json -import pytest -from sh import jp - - -class JpJson: - """jp like commands parsing result as JSON""" - - def __init__(self): - self.subcommands = [] - - def __call__(self, *args, **kwargs): - args = self.subcommands + list(args) - self.subcommands.clear() - kwargs['output'] = 'json_raw' - kwargs['_tty_out'] = False - cmd = jp(*args, **kwargs) - return json.loads(cmd.stdout) - - def __getattr__(self, name): - if name.startswith('_'): - # no jp subcommand starts with a "_", - # and pytest uses some attributes with this name scheme - return super().__getattr__(name) - self.subcommands.append(name) - return self - - -class JpElt(JpJson): - """jp like commands parsing result as domishElement""" - - def __init__(self): - super().__init__() - from sat.tools.xml_tools import ElementParser - self.parser = ElementParser() - - def __call__(self, *args, **kwargs): - args = self.subcommands + list(args) - self.subcommands.clear() - kwargs['output'] = 'xml_raw' - kwargs['_tty_out'] = False - cmd = jp(*args, **kwargs) - return self.parser(cmd.stdout.decode().strip()) - - -class Editor: - - def __init__(self): - # temporary directory will be deleted Automatically when this object will be - # destroyed - self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="sat_e2e_test_editor_") - self.tmp_dir_path = Path(self.tmp_dir_obj.name) - if not sys.executable: - raise Exception("Can't find python executable") - self.editor_set = False - self.editor_path = self.tmp_dir_path / "editor.py" - self.ori_content_path = self.tmp_dir_path / "original_content" - self.new_content_path = self.tmp_dir_path / "new_content" - self.base_script = dedent(f"""\ - #!{sys.executable} - import sys - - def content_filter(content): - return {{content_filter}} - - with open(sys.argv[1], 'r+') as f: - original_content = f.read() - f.seek(0) - new_content = content_filter(original_content) - f.write(new_content) - f.truncate() - - with open("{self.ori_content_path}", "w") as f: - f.write(original_content) - - with open("{self.new_content_path}", "w") as f: - f.write(new_content) - """ - ) - self._env = os.environ.copy() - self._env["EDITOR"] = str(self.editor_path) - - def set_filter(self, content_filter: str = "content"): - """Python code to modify original content - - The code will be applied to content received by editor. - The original content received by editor is in the "content" variable. - If filter_ is not specified, original content is written unmodified. - Code must be on a single line. - """ - if '\n' in content_filter: - raise ValueError("new lines can't be used in filter_") - with self.editor_path.open('w') as f: - f.write(self.base_script.format(content_filter=content_filter)) - self.editor_path.chmod(0o700) - self.editor_set = True - - @property - def env(self): - """Get environment variable with the editor set""" - if not self.editor_set: - self.set_filter() - return self._env - - @property - def original_content(self): - """Last content received by editor, before any modification - - returns None if editor has not yet been called - """ - try: - with self.ori_content_path.open() as f: - return f.read() - except FileNotFoundError: - return None - - @property - def new_content(self): - """Last content writen by editor - - This is the final content, after filter has been applied to original content - returns None if editor has not yet been called - """ - try: - with self.new_content_path.open() as f: - return f.read() - except FileNotFoundError: - return None - - -@pytest.fixture(scope="session") -def jp_json(): - """Run jp with "json_raw" output, and returns the parsed value""" - return JpJson() - - -@pytest.fixture(scope="session") -def jp_elt(): - """Run jp with "xml_raw" output, and returns the parsed value""" - return JpElt() - - -@pytest.fixture() -def editor(): - """Create a fake editor to automatise edition from CLI""" - return Editor() diff -r 73e04040d577 -r d78b5eae912a tests/e2e/jp/test_jp.py --- a/tests/e2e/jp/test_jp.py Fri Apr 16 18:32:16 2021 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,231 +0,0 @@ -#!/usr/bin/env python3 - -# Libervia: an XMPP client -# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -import os -import shutil -import pytest -import sh -from sh import jp -from sat.tools.common import uri - - -if os.getenv("SAT_TEST_ENV_E2E") is None: - pytest.skip( - "skipping end-to-end tests, we are not in a test environment", - allow_module_level=True - ) - - -pytestmark = pytest.mark.usefixtures("test_profiles") - - -class TestInstall: - - def test_jp_can_run(self): - jp("--version") - - -class TestJpAccount: - - def test_create_and_delete(self, jp_json): - """Create an account in-band, connect it, then delete it and its profile""" - jp.account.create( - "test_create@server1.test", - "test", - profile="test_create", - host="server1.test" - ) - profiles = jp_json.profile.list() - assert "test_create" in profiles - jp.profile.connect(connect=True, profile="test_create") - jp.account.delete(profile="test_create", force=True) - jp.profile.delete("test_create", force=True) - profiles = jp_json.profile.list() - assert "test_create" not in profiles - - -@pytest.mark.usefixtures("pubsub_nodes") -class TestJpPubsub: - - def test_node_create_info_delete(self): - node_name = "tmp_node" - with pytest.raises(sh.ErrorReturnCode_16): - # the node should not exist - jp.pubsub.node.info(node=node_name) - try: - jp.pubsub.node.create(node=node_name) - # if node exist has expected, following command won't raise an exception - metadata = jp.pubsub.node.info(node=node_name) - assert len(metadata.strip()) - finally: - jp.pubsub.node.delete(node=node_name, force=True) - - with pytest.raises(sh.ErrorReturnCode_16): - # the node should not exist anymore - jp.pubsub.node.info(node=node_name) - - def test_set_get_delete_purge(self, jp_elt): - content = "test item" - payload = f"{content}" - - # we create 3 items and check them - item1_id = jp.pubsub.set(node="test", quiet=True, _in=payload) - item2_id = jp.pubsub.set(node="test", quiet=True, _in=payload) - item3_id = jp.pubsub.set(node="test", quiet=True, _in=payload) - parsed_elt = jp_elt.pubsub.get(node="test", item=item1_id) - payload = parsed_elt.firstChildElement() - assert payload.name == 'test' - assert str(payload) == content - parsed_elt = jp_elt.pubsub.get(node="test", item=item2_id) - payload = parsed_elt.firstChildElement() - assert payload.name == 'test' - assert str(payload) == content - parsed_elt = jp_elt.pubsub.get(node="test", item=item3_id) - payload = parsed_elt.firstChildElement() - assert payload.name == 'test' - assert str(payload) == content - - # deleting first item should work - jp.pubsub.delete(node="test", item=item1_id, force=True) - with pytest.raises(sh.ErrorReturnCode_16): - jp.pubsub.get(node="test", item=item1_id) - - # there must be a least item2 and item3 in the node - node_items = jp_elt.pubsub.get(node="test") - assert len(list(node_items.elements())) >= 2 - - # after purge, node must be empty - jp.pubsub.node.purge(node="test", force=True) - node_items = jp_elt.pubsub.get(node="test") - assert len(list(node_items.elements())) == 0 - - def test_edit(self, editor, jp_elt): - content = "original item" - payload = f"{content}" - item_id = jp.pubsub.set(node="test", quiet=True, _in=payload) - editor.set_filter('content.replace("original", "edited")') - jp.pubsub.edit(node="test", item=item_id, _env=editor.env) - assert "original item" in editor.original_content - parsed_elt = jp_elt.pubsub.get(node="test", item=item_id) - edited_payload = parsed_elt.firstChildElement() - expected_edited_content = content.replace("original", "edited") - assert edited_payload.name == 'test' - assert str(edited_payload) == expected_edited_content - - def test_affiliations(self, jp_json): - affiliations = jp_json.pubsub.affiliations() - assert affiliations["test"] == "owner" - - def test_uri(self): - built_uri = jp.pubsub.uri( - service="pubsub.example.net", node="some_node" - ).strip() - assert built_uri == "xmpp:pubsub.example.net?;node=some_node" - built_uri = jp.pubsub.uri( - service="pubsub.example.net", node="some_node", item="some_item" - ).strip() - assert built_uri == "xmpp:pubsub.example.net?;node=some_node;item=some_item" - - -class TestJpBlog: - MICROBLOG_NS = "urn:xmpp:microblog:0" - - def test_set_get(self, jp_json): - jp.blog.set(_in="markdown **bold** [link](https://example.net)") - item_data = jp_json.blog.get(max=1) - item = item_data[0][0] - metadata = item_data[1] - assert metadata['service'] == "account1@server1.test" - assert metadata['node'] == self.MICROBLOG_NS - assert metadata['rsm'].keys() <= {"first", "last", "index", "count"} - item_id = item['id'] - expected_uri = uri.buildXMPPUri( - 'pubsub', subtype="microblog", path="account1@server1.test", - node=self.MICROBLOG_NS, item=item_id - ) - assert item['uri'] == expected_uri - assert item['content_xhtml'] == ( - '

markdown bold ' - 'link

' - ) - assert isinstance(item['published'], int) - assert isinstance(item['updated'], int) - assert isinstance(item['comments'], list) - assert isinstance(item['tags'], list) - assert item['author'] == 'account1' - assert item['author_jid'] == 'account1@server1.test' - - def test_edit(self, editor, jp_json): - payload_md = "content in **markdown**" - editor.set_filter(repr(payload_md)) - jp.blog.edit(_env=editor.env) - assert len(editor.original_content) == 0 - assert editor.new_content == payload_md - items_data = jp_json.blog.get(max=1) - last_item = items_data[0][0] - last_item_id = last_item['id'] - assert last_item['content'] == "content in markdown" - assert last_item['content_xhtml'] == ( - "

content in markdown

" - ) - editor.set_filter('f"{content} extended"') - jp.blog.edit("--last-item", _env=editor.env) - assert editor.original_content == payload_md - assert editor.new_content == f"{payload_md} extended" - items_data = jp_json.blog.get(max=1) - last_item = items_data[0][0] - # we check that the id hasn't been modified - assert last_item['id'] == last_item_id - assert last_item['content'] == "content in markdown extended" - assert last_item['content_xhtml'] == ( - "

content in markdown extended

" - ) - - -class TestJpFile: - - def test_upload_get(self, fake_file): - source_file = fake_file.size(10240) - source_file_hash = fake_file.get_source_hash(source_file) - upload_url = jp.file.upload(source_file).strip() - - dest_file = fake_file.new_dest_file() - try: - jp.file.get(upload_url, dest_file=dest_file) - dest_file_hash = fake_file.get_dest_hash(dest_file) - finally: - dest_file.unlink() - - assert source_file_hash == dest_file_hash - - def test_send_receive(self, fake_file): - source_file = fake_file.size(10240) - source_file_hash = fake_file.get_source_hash(source_file) - send_cmd = jp.file.send(source_file, "account1@server2.test", _bg=True) - dest_path = fake_file.dest_files / "test_send_receive" - dest_path.mkdir() - try: - jp.file.receive( - "account1@server1.test", profile="account1_s2", path=dest_path) - dest_file = dest_path / source_file.name - dest_file_hash = fake_file.get_dest_hash(dest_file) - finally: - shutil.rmtree(dest_path) - send_cmd.wait() - - assert source_file_hash == dest_file_hash diff -r 73e04040d577 -r d78b5eae912a tests/e2e/libervia-cli/conftest.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/e2e/libervia-cli/conftest.py Fri Apr 16 18:32:34 2021 +0200 @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 + +# Libervia: an XMPP client +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import sys +import os +import tempfile +from pathlib import Path +from textwrap import dedent +import json +import pytest +from sh import li + + +class LiberviaCliJson: + """li like commands parsing result as JSON""" + + def __init__(self): + self.subcommands = [] + + def __call__(self, *args, **kwargs): + args = self.subcommands + list(args) + self.subcommands.clear() + kwargs['output'] = 'json_raw' + kwargs['_tty_out'] = False + cmd = li(*args, **kwargs) + return json.loads(cmd.stdout) + + def __getattr__(self, name): + if name.startswith('_'): + # no li subcommand starts with a "_", + # and pytest uses some attributes with this name scheme + return super().__getattr__(name) + self.subcommands.append(name) + return self + + +class LiberviaCliElt(LiberviaCliJson): + """li like commands parsing result as domishElement""" + + def __init__(self): + super().__init__() + from sat.tools.xml_tools import ElementParser + self.parser = ElementParser() + + def __call__(self, *args, **kwargs): + args = self.subcommands + list(args) + self.subcommands.clear() + kwargs['output'] = 'xml_raw' + kwargs['_tty_out'] = False + cmd = li(*args, **kwargs) + return self.parser(cmd.stdout.decode().strip()) + + +class Editor: + + def __init__(self): + # temporary directory will be deleted Automatically when this object will be + # destroyed + self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="libervia_e2e_test_editor_") + self.tmp_dir_path = Path(self.tmp_dir_obj.name) + if not sys.executable: + raise Exception("Can't find python executable") + self.editor_set = False + self.editor_path = self.tmp_dir_path / "editor.py" + self.ori_content_path = self.tmp_dir_path / "original_content" + self.new_content_path = self.tmp_dir_path / "new_content" + self.base_script = dedent(f"""\ + #!{sys.executable} + import sys + + def content_filter(content): + return {{content_filter}} + + with open(sys.argv[1], 'r+') as f: + original_content = f.read() + f.seek(0) + new_content = content_filter(original_content) + f.write(new_content) + f.truncate() + + with open("{self.ori_content_path}", "w") as f: + f.write(original_content) + + with open("{self.new_content_path}", "w") as f: + f.write(new_content) + """ + ) + self._env = os.environ.copy() + self._env["EDITOR"] = str(self.editor_path) + + def set_filter(self, content_filter: str = "content"): + """Python code to modify original content + + The code will be applied to content received by editor. + The original content received by editor is in the "content" variable. + If filter_ is not specified, original content is written unmodified. + Code must be on a single line. + """ + if '\n' in content_filter: + raise ValueError("new lines can't be used in filter_") + with self.editor_path.open('w') as f: + f.write(self.base_script.format(content_filter=content_filter)) + self.editor_path.chmod(0o700) + self.editor_set = True + + @property + def env(self): + """Get environment variable with the editor set""" + if not self.editor_set: + self.set_filter() + return self._env + + @property + def original_content(self): + """Last content received by editor, before any modification + + returns None if editor has not yet been called + """ + try: + with self.ori_content_path.open() as f: + return f.read() + except FileNotFoundError: + return None + + @property + def new_content(self): + """Last content writen by editor + + This is the final content, after filter has been applied to original content + returns None if editor has not yet been called + """ + try: + with self.new_content_path.open() as f: + return f.read() + except FileNotFoundError: + return None + + +@pytest.fixture(scope="session") +def li_json(): + """Run li with "json_raw" output, and returns the parsed value""" + return LiberviaCliJson() + + +@pytest.fixture(scope="session") +def li_elt(): + """Run li with "xml_raw" output, and returns the parsed value""" + return LiberviaCliElt() + + +@pytest.fixture() +def editor(): + """Create a fake editor to automatise edition from CLI""" + return Editor() diff -r 73e04040d577 -r d78b5eae912a tests/e2e/libervia-cli/test_libervia-cli.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/e2e/libervia-cli/test_libervia-cli.py Fri Apr 16 18:32:34 2021 +0200 @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 + +# Libervia: an XMPP client +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import os +import shutil +import pytest +import sh +from sh import li +from sat.tools.common import uri + + +if os.getenv("LIBERVIA_TEST_ENV_E2E") is None: + pytest.skip( + "skipping end-to-end tests, we are not in a test environment", + allow_module_level=True + ) + + +pytestmark = pytest.mark.usefixtures("test_profiles") + + +class TestInstall: + + def test_li_can_run(self): + li("--version") + + +class TestLiberviaCliAccount: + + def test_create_and_delete(self, li_json): + """Create an account in-band, connect it, then delete it and its profile""" + li.account.create( + "test_create@server1.test", + "test", + profile="test_create", + host="server1.test" + ) + profiles = li_json.profile.list() + assert "test_create" in profiles + li.profile.connect(connect=True, profile="test_create") + li.account.delete(profile="test_create", force=True) + li.profile.delete("test_create", force=True) + profiles = li_json.profile.list() + assert "test_create" not in profiles + + +@pytest.mark.usefixtures("pubsub_nodes") +class TestLiberviaCliPubsub: + + def test_node_create_info_delete(self): + node_name = "tmp_node" + with pytest.raises(sh.ErrorReturnCode_16): + # the node should not exist + li.pubsub.node.info(node=node_name) + try: + li.pubsub.node.create(node=node_name) + # if node exist as expected, following command won't raise an exception + metadata = li.pubsub.node.info(node=node_name) + assert len(metadata.strip()) + finally: + li.pubsub.node.delete(node=node_name, force=True) + + with pytest.raises(sh.ErrorReturnCode_16): + # the node should not exist anymore + li.pubsub.node.info(node=node_name) + + def test_set_get_delete_purge(self, li_elt): + content = "test item" + payload = f"{content}" + + # we create 3 items and check them + item1_id = li.pubsub.set(node="test", quiet=True, _in=payload) + item2_id = li.pubsub.set(node="test", quiet=True, _in=payload) + item3_id = li.pubsub.set(node="test", quiet=True, _in=payload) + parsed_elt = li_elt.pubsub.get(node="test", item=item1_id) + payload = parsed_elt.firstChildElement() + assert payload.name == 'test' + assert str(payload) == content + parsed_elt = li_elt.pubsub.get(node="test", item=item2_id) + payload = parsed_elt.firstChildElement() + assert payload.name == 'test' + assert str(payload) == content + parsed_elt = li_elt.pubsub.get(node="test", item=item3_id) + payload = parsed_elt.firstChildElement() + assert payload.name == 'test' + assert str(payload) == content + + # deleting first item should work + li.pubsub.delete(node="test", item=item1_id, force=True) + with pytest.raises(sh.ErrorReturnCode_16): + li.pubsub.get(node="test", item=item1_id) + + # there must be a least item2 and item3 in the node + node_items = li_elt.pubsub.get(node="test") + assert len(list(node_items.elements())) >= 2 + + # after purge, node must be empty + li.pubsub.node.purge(node="test", force=True) + node_items = li_elt.pubsub.get(node="test") + assert len(list(node_items.elements())) == 0 + + def test_edit(self, editor, li_elt): + content = "original item" + payload = f"{content}" + item_id = li.pubsub.set(node="test", quiet=True, _in=payload) + editor.set_filter('content.replace("original", "edited")') + li.pubsub.edit(node="test", item=item_id, _env=editor.env) + assert "original item" in editor.original_content + parsed_elt = li_elt.pubsub.get(node="test", item=item_id) + edited_payload = parsed_elt.firstChildElement() + expected_edited_content = content.replace("original", "edited") + assert edited_payload.name == 'test' + assert str(edited_payload) == expected_edited_content + + def test_affiliations(self, li_json): + affiliations = li_json.pubsub.affiliations() + assert affiliations["test"] == "owner" + + def test_uri(self): + built_uri = li.pubsub.uri( + service="pubsub.example.net", node="some_node" + ).strip() + assert built_uri == "xmpp:pubsub.example.net?;node=some_node" + built_uri = li.pubsub.uri( + service="pubsub.example.net", node="some_node", item="some_item" + ).strip() + assert built_uri == "xmpp:pubsub.example.net?;node=some_node;item=some_item" + + +class TestLiberviaCliBlog: + MICROBLOG_NS = "urn:xmpp:microblog:0" + + def test_set_get(self, li_json): + li.blog.set(_in="markdown **bold** [link](https://example.net)") + item_data = li_json.blog.get(max=1) + item = item_data[0][0] + metadata = item_data[1] + assert metadata['service'] == "account1@server1.test" + assert metadata['node'] == self.MICROBLOG_NS + assert metadata['rsm'].keys() <= {"first", "last", "index", "count"} + item_id = item['id'] + expected_uri = uri.buildXMPPUri( + 'pubsub', subtype="microblog", path="account1@server1.test", + node=self.MICROBLOG_NS, item=item_id + ) + assert item['uri'] == expected_uri + assert item['content_xhtml'] == ( + '

markdown bold ' + 'link

' + ) + assert isinstance(item['published'], int) + assert isinstance(item['updated'], int) + assert isinstance(item['comments'], list) + assert isinstance(item['tags'], list) + assert item['author'] == 'account1' + assert item['author_jid'] == 'account1@server1.test' + + def test_edit(self, editor, li_json): + payload_md = "content in **markdown**" + editor.set_filter(repr(payload_md)) + li.blog.edit(_env=editor.env) + assert len(editor.original_content) == 0 + assert editor.new_content == payload_md + items_data = li_json.blog.get(max=1) + last_item = items_data[0][0] + last_item_id = last_item['id'] + assert last_item['content'] == "content in markdown" + assert last_item['content_xhtml'] == ( + "

content in markdown

" + ) + editor.set_filter('f"{content} extended"') + li.blog.edit("--last-item", _env=editor.env) + assert editor.original_content == payload_md + assert editor.new_content == f"{payload_md} extended" + items_data = li_json.blog.get(max=1) + last_item = items_data[0][0] + # we check that the id hasn't been modified + assert last_item['id'] == last_item_id + assert last_item['content'] == "content in markdown extended" + assert last_item['content_xhtml'] == ( + "

content in markdown extended

" + ) + + +class TestLiberviaCliFile: + + def test_upload_get(self, fake_file): + source_file = fake_file.size(10240) + source_file_hash = fake_file.get_source_hash(source_file) + upload_url = li.file.upload(source_file).strip() + + dest_file = fake_file.new_dest_file() + try: + li.file.get(upload_url, dest_file=dest_file) + dest_file_hash = fake_file.get_dest_hash(dest_file) + finally: + dest_file.unlink() + + assert source_file_hash == dest_file_hash + + def test_send_receive(self, fake_file): + source_file = fake_file.size(10240) + source_file_hash = fake_file.get_source_hash(source_file) + send_cmd = li.file.send(source_file, "account1@server2.test", _bg=True) + dest_path = fake_file.dest_files / "test_send_receive" + dest_path.mkdir() + try: + li.file.receive( + "account1@server1.test", profile="account1_s2", path=dest_path) + dest_file = dest_path / source_file.name + dest_file_hash = fake_file.get_dest_hash(dest_file) + finally: + shutil.rmtree(dest_path) + send_cmd.wait() + + assert source_file_hash == dest_file_hash diff -r 73e04040d577 -r d78b5eae912a tests/e2e/libervia-web/conftest.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/e2e/libervia-web/conftest.py Fri Apr 16 18:32:34 2021 +0200 @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 + +# Libervia: an XMPP client +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import sys +import os +import socket +import pytest +import time +from datetime import datetime +from pathlib import Path +import helium + + +WEB_HOST = "libervia-web.test" +WEB_PORT_HTTPS = 8443 +BASE_URL = f"https://{WEB_HOST}:{WEB_PORT_HTTPS}" +SIZE_DESKTOP = (1024, 728) +SIZE_MOBILE = (380, 640) +accounts_cookies = {} + + +@pytest.hookimpl(tryfirst=True, hookwrapper=True) +def pytest_runtest_makereport(item, call): + # needed to get test results in request fixture + # cf. https://docs.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures + outcome = yield + rep = outcome.get_result() + setattr(item, "rep_" + rep.when, rep) + + +@pytest.fixture +def screenshot_on_failure(request): + yield + if request.node.rep_setup.passed: + if request.node.rep_call.failed: + report_dir = Path(os.getenv("LIBERVIA_TEST_REPORT_DIR", "/tmp/tests_report")) + dest_dir = report_dir/"screenshots" + dest_dir.mkdir(parents=True, exist_ok=True) + filename = f"{datetime.now().isoformat()}_{request.node.name}.png" + dest_path = dest_dir/filename + helium.get_driver().save_screenshot(str(dest_path)) + print(f"screenshot saved to {dest_path}") + + +def wait_for_socket(host, port, retries=30): + sock = socket.socket() + while True: + try: + sock.connect((host, port)) + except ConnectionRefusedError as e: + retries -= 1 + if retries < 0: + print(f"Can't access server at {host}:{port}", file=sys.stderr) + raise e + else: + print(f"Can't connect to {host}:{port}, retrying ({retries})") + time.sleep(1) + else: + break + + +@pytest.fixture(scope="session") +def browser(): + if os.getenv("LIBERVIA_TEST_E2E_WEB_NO_HEADLESS") is not None: + kwargs = {} + else: + kwargs = {"headless": True} + driver = helium.start_firefox(**kwargs) + driver.set_window_size(*SIZE_DESKTOP) + wait_for_socket(WEB_HOST, WEB_PORT_HTTPS) + yield helium + if os.getenv("LIBERVIA_TEST_E2E_WEB_KEEP_BROWSER") is None: + helium.kill_browser() + + +@pytest.fixture +def nobody_logged_in(browser): + browser.get_driver().delete_all_cookies() + +def log_in(browser, account): + try: + account_cookies = accounts_cookies[account] + except KeyError: + browser.get_driver().delete_all_cookies() + browser.go_to("https://libervia-web.test:8443/login") + browser.write(account, into="login") + browser.write("test", into="password") + browser.click("log in") + accounts_cookies[account] = browser.get_driver().get_cookies()[0] + else: + browser.get_driver().add_cookie(account_cookies) + +@pytest.fixture +def log_in_account1(browser): + log_in(browser, "account1") + +@pytest.fixture +def log_in_account1_s2(browser): + log_in(browser, "account1_s2") + +@pytest.fixture +def mobile_screen(browser): + browser.get_driver().set_window_size(*SIZE_MOBILE) + + +@pytest.fixture +def desktop_screen(browser): + browser.get_driver().set_window_size(*SIZE_DESKTOP) diff -r 73e04040d577 -r d78b5eae912a tests/e2e/libervia-web/test_libervia-web.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/e2e/libervia-web/test_libervia-web.py Fri Apr 16 18:32:34 2021 +0200 @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 + +# Libervia: an XMPP client +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import os +import re +import pytest +from helium import ( + go_to, write, press, click, drag_file, find_all, wait_until, S, Text, Link, Button, + get_driver, ENTER +) + + +if os.getenv("LIBERVIA_TEST_ENV_E2E_WEB") is None: + pytest.skip( + "skipping end-to-end tests, we are not in a test environment for Libervia", + allow_module_level=True + ) + +pytestmark = pytest.mark.usefixtures("test_profiles", "screenshot_on_failure") + + +class TestLogin: + + def test_user_can_create_account(self, nobody_logged_in, sent_emails): + go_to("https://libervia-web.test:8443/login") + click("no account yet") + write("new_account", into="login") + write("some_email@example.net", into="email") + write("testtest", into="password") + click("register new account") + wait_until(lambda: get_driver().current_url.endswith("/login")) + write("testtest", into="password") + click("log in") + wait_until(Text("you are logged").exists) + wait_until(lambda: len(sent_emails) == 2) + if sent_emails[0].to == "admin@server1.test": + admin_email, user_email = sent_emails + else: + user_email, admin_email = sent_emails + assert admin_email.to == "admin@server1.test" + # profile name must be specified in admin email + assert "new_account" in admin_email.body + assert user_email.to == "some_email@example.net" + # user jid must be specified in the email + assert "new_account@server1.test" in user_email.body + # use can now log-in + + def test_user_can_log_in(self, nobody_logged_in): + go_to("https://libervia-web.test:8443/login") + write("account1_s3", into="login") + write("test", into="password") + click("log in") + wait_until(Text("you are logged").exists) + assert Button("Disconnect").exists() + + def test_wrong_password_fails(self, nobody_logged_in): + go_to("https://libervia-web.test:8443/login") + write("account1_s2", into="login") + write("wrong_password", into="password") + click("log in") + assert Text("Your login and/or password is incorrect. Please try again.") + + +class TestPhotos: + ACCOUNT1_ALBUMS_URL = ( + "https://libervia-web.test:8443/photos/album/account1@files.server1.test/albums" + ) + TEST_ALBUM_URL = f"{ACCOUNT1_ALBUMS_URL}/test%20album" + + @pytest.mark.dependency(name="create_album") + def test_user_can_create_album(self, log_in_account1): + go_to("https://libervia-web.test:8443/photos") + wait_until(Link("create").exists) + click("create") + write("test album", into="album name") + click("create") + album_link = Link("test album") + wait_until(album_link.exists) + click(album_link) + wait_until(lambda: S("#file_drop").exists()) + wait_until(lambda: not S("#loading_screen").exists()) + drag_file("/src/sat/tests/_files/test_1.jpg", "drop photos here") + wait_until(lambda: len(find_all(S("div.progress_finished")))==1) + drag_file("/src/sat/tests/_files/test_2.jpg", "drop photos here") + wait_until(lambda: len(find_all(S("div.progress_finished")))==2) + assert S('img[alt="test_1.jpg"]').exists() + assert S('img[alt="test_2.jpg"]').exists() + + @pytest.mark.dependency(depends=["create_album"]) + def test_user_can_slideshow(self, log_in_account1): + go_to(self.TEST_ALBUM_URL) + wait_until(lambda: not S("#loading_screen").exists()) + thumb_1 = S("img[alt='test_1.jpg'].is-photo-thumbnail") + assert thumb_1.exists() + click(thumb_1) + assert S("div.slideshow").exists() + active_slide_1 = S("div.swiper-slide-active") + assert active_slide_1.exists() + # if we don't save the web_element here, the test in wait_until fails + # it seems that Helium is re-using the selector, i.e. we get the other + # slide in active_slide_1. + active_slide_1_elt = active_slide_1.web_element + click(S(".swiper-button-next")) + wait_until( + lambda: + "swiper-slide-active" not in active_slide_1_elt.get_attribute("class") + ) + active_slide_2 = S("div.swiper-slide-active") + assert active_slide_2.exists() + active_slide_2_elt = active_slide_2.web_element + assert active_slide_1_elt != active_slide_2_elt + click(S(".click_to_close")) + assert not S("div.slideshow").exists() + + @pytest.mark.dependency(name="ext_user_no_access", depends=["create_album"]) + def test_external_user_cant_access_album(self, log_in_account1_s2): + go_to(self.TEST_ALBUM_URL) + assert Text("Unauthorized").exists() + assert "Error" in get_driver().title + + @pytest.mark.dependency(name="invite_ext_user", depends=["create_album", "ext_user_no_access"]) + def test_invitation_of_external_user(self, log_in_account1): + """User can invite somebody not in its roster by its full JID""" + go_to(self.TEST_ALBUM_URL) + wait_until(lambda: not S("#loading_screen").exists()) + click("manage invitations") + assert Text("people who can access this page").exists() + contact_input = S("input[name='contact']") + write("account1@server2.test", into=contact_input) + press(ENTER) + assert contact_input.web_element.get_attribute("value") == "" + assert Text("account1@server2.test").exists() + + @pytest.mark.dependency(depends=["create_album", "invite_ext_user"]) + def test_invited_user_can_access_album(self, log_in_account1_s2): + go_to(self.TEST_ALBUM_URL) + assert not Text("Unauthorized").exists() + assert not "Error" in get_driver().title + assert len(find_all(S("img.is-photo-thumbnail"))) == 2 + + @pytest.mark.dependency(name="invite_by_email", depends=["create_album"]) + def test_invitation_by_email(self, log_in_account1, sent_emails, shared_data): + """User can invite somebody without XMPP account by email""" + go_to(self.TEST_ALBUM_URL) + wait_until(lambda: not S("#loading_screen").exists()) + click("manage invitations") + assert Text("people who can access this page").exists() + click("invite by email") + wait_until(Text("Invite somebody by email").exists) + write("somebody@example.net", "email") + write("Some Guest", "name") + click("send invitation") + wait_until(lambda: len(sent_emails) == 1) + invitation_email = sent_emails[0] + assert "Welcome" in invitation_email.body + url_match = re.search(r"https:\/\/.+\/g\/\w+", sent_emails[0].body) + assert url_match is not None + shared_data["invitation_url"] = url_match.group() + + @pytest.mark.dependency(depends=["invite_by_email"]) + def test_email_guest_can_access_album(self, nobody_logged_in, shared_data): + go_to(shared_data["invitation_url"]) + click("test album") + wait_until(lambda: not S("#loading_screen").exists()) + assert len(find_all(S("img.is-photo-thumbnail"))) == 2 diff -r 73e04040d577 -r d78b5eae912a tests/e2e/libervia/conftest.py --- a/tests/e2e/libervia/conftest.py Fri Apr 16 18:32:16 2021 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,123 +0,0 @@ -#!/usr/bin/env python3 - -# Libervia: an XMPP client -# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -import sys -import os -import socket -import pytest -import time -from datetime import datetime -from pathlib import Path -import helium - - -LIBERVIA_HOST = "libervia.test" -LIBERVIA_PORT_HTTPS = 8443 -BASE_URL = f"https://{LIBERVIA_HOST}:{LIBERVIA_PORT_HTTPS}" -SIZE_DESKTOP = (1024, 728) -SIZE_MOBILE = (380, 640) -accounts_cookies = {} - - -@pytest.hookimpl(tryfirst=True, hookwrapper=True) -def pytest_runtest_makereport(item, call): - # needed to get test results in request fixture - # cf. https://docs.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures - outcome = yield - rep = outcome.get_result() - setattr(item, "rep_" + rep.when, rep) - - -@pytest.fixture -def screenshot_on_failure(request): - yield - if request.node.rep_setup.passed: - if request.node.rep_call.failed: - report_dir = Path(os.getenv("SAT_TEST_REPORT_DIR", "/tmp/tests_report")) - dest_dir = report_dir/"screenshots" - dest_dir.mkdir(parents=True, exist_ok=True) - filename = f"{datetime.now().isoformat()}_{request.node.name}.png" - dest_path = dest_dir/filename - helium.get_driver().save_screenshot(str(dest_path)) - print(f"screenshot saved to {dest_path}") - - -def wait_for_socket(host, port, retries=30): - sock = socket.socket() - while True: - try: - sock.connect((host, port)) - except ConnectionRefusedError as e: - retries -= 1 - if retries < 0: - print(f"Can't access server at {host}:{port}", file=sys.stderr) - raise e - else: - print(f"Can't connect to {host}:{port}, retrying ({retries})") - time.sleep(1) - else: - break - - -@pytest.fixture(scope="session") -def browser(): - if os.getenv("SAT_TEST_E2E_LIBERVIA_NO_HEADLESS") is not None: - kwargs = {} - else: - kwargs = {"headless": True} - driver = helium.start_firefox(**kwargs) - driver.set_window_size(*SIZE_DESKTOP) - wait_for_socket(LIBERVIA_HOST, LIBERVIA_PORT_HTTPS) - yield helium - if os.getenv("SAT_TEST_E2E_LIBERVIA_KEEP_BROWSER") is None: - helium.kill_browser() - - -@pytest.fixture -def nobody_logged_in(browser): - browser.get_driver().delete_all_cookies() - -def log_in(browser, account): - try: - account_cookies = accounts_cookies[account] - except KeyError: - browser.get_driver().delete_all_cookies() - browser.go_to("https://libervia.test:8443/login") - browser.write(account, into="login") - browser.write("test", into="password") - browser.click("log in") - accounts_cookies[account] = browser.get_driver().get_cookies()[0] - else: - browser.get_driver().add_cookie(account_cookies) - -@pytest.fixture -def log_in_account1(browser): - log_in(browser, "account1") - -@pytest.fixture -def log_in_account1_s2(browser): - log_in(browser, "account1_s2") - -@pytest.fixture -def mobile_screen(browser): - browser.get_driver().set_window_size(*SIZE_MOBILE) - - -@pytest.fixture -def desktop_screen(browser): - browser.get_driver().set_window_size(*SIZE_DESKTOP) diff -r 73e04040d577 -r d78b5eae912a tests/e2e/libervia/test_libervia.py --- a/tests/e2e/libervia/test_libervia.py Fri Apr 16 18:32:16 2021 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,179 +0,0 @@ -#!/usr/bin/env python3 - -# Libervia: an XMPP client -# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -import os -import re -import pytest -from helium import ( - go_to, write, press, click, drag_file, find_all, wait_until, S, Text, Link, Button, - get_driver, ENTER -) - - -if os.getenv("SAT_TEST_ENV_E2E_LIBERVIA") is None: - pytest.skip( - "skipping end-to-end tests, we are not in a test environment for Libervia", - allow_module_level=True - ) - -pytestmark = pytest.mark.usefixtures("test_profiles", "screenshot_on_failure") - - -class TestLogin: - - def test_user_can_create_account(self, nobody_logged_in, sent_emails): - go_to("https://libervia.test:8443/login") - click("no account yet") - write("new_account", into="login") - write("some_email@example.net", into="email") - write("testtest", into="password") - click("register new account") - wait_until(lambda: get_driver().current_url.endswith("/login")) - write("testtest", into="password") - click("log in") - wait_until(Text("you are logged").exists) - wait_until(lambda: len(sent_emails) == 2) - if sent_emails[0].to == "admin@server1.test": - admin_email, user_email = sent_emails - else: - user_email, admin_email = sent_emails - assert admin_email.to == "admin@server1.test" - # profile name must be specified in admin email - assert "new_account" in admin_email.body - assert user_email.to == "some_email@example.net" - # user jid must be specified in the email - assert "new_account@server1.test" in user_email.body - # use can now log-in - - def test_user_can_log_in(self, nobody_logged_in): - go_to("https://libervia.test:8443/login") - write("account1_s3", into="login") - write("test", into="password") - click("log in") - wait_until(Text("you are logged").exists) - assert Button("Disconnect").exists() - - def test_wrong_password_fails(self, nobody_logged_in): - go_to("https://libervia.test:8443/login") - write("account1_s2", into="login") - write("wrong_password", into="password") - click("log in") - assert Text("Your login and/or password is incorrect. Please try again.") - - -class TestPhotos: - ACCOUNT1_ALBUMS_URL = ( - "https://libervia.test:8443/photos/album/account1@files.server1.test/albums" - ) - TEST_ALBUM_URL = f"{ACCOUNT1_ALBUMS_URL}/test%20album" - - @pytest.mark.dependency(name="create_album") - def test_user_can_create_album(self, log_in_account1): - go_to("https://libervia.test:8443/photos") - wait_until(Link("create").exists) - click("create") - write("test album", into="album name") - click("create") - album_link = Link("test album") - wait_until(album_link.exists) - click(album_link) - wait_until(lambda: not S("#loading_screen").exists()) - drag_file("/src/sat/tests/_files/test_1.jpg", "drop photos here") - wait_until(lambda: len(find_all(S("div.progress_finished")))==1) - drag_file("/src/sat/tests/_files/test_2.jpg", "drop photos here") - wait_until(lambda: len(find_all(S("div.progress_finished")))==2) - assert S('img[alt="test_1.jpg"]').exists() - assert S('img[alt="test_2.jpg"]').exists() - - @pytest.mark.dependency(depends=["create_album"]) - def test_user_can_slideshow(self, log_in_account1): - go_to(self.TEST_ALBUM_URL) - wait_until(lambda: not S("#loading_screen").exists()) - thumb_1 = S("img[alt='test_1.jpg'].is-photo-thumbnail") - assert thumb_1.exists() - click(thumb_1) - assert S("div.slideshow").exists() - active_slide_1 = S("div.swiper-slide-active") - assert active_slide_1.exists() - # if we don't save the web_element here, the test in wait_until fails - # it seems that Helium is re-using the selector, i.e. we get the other - # slide in active_slide_1. - active_slide_1_elt = active_slide_1.web_element - click(S(".swiper-button-next")) - wait_until( - lambda: - "swiper-slide-active" not in active_slide_1_elt.get_attribute("class") - ) - active_slide_2 = S("div.swiper-slide-active") - assert active_slide_2.exists() - active_slide_2_elt = active_slide_2.web_element - assert active_slide_1_elt != active_slide_2_elt - click(S(".click_to_close")) - assert not S("div.slideshow").exists() - - @pytest.mark.dependency(name="ext_user_no_access", depends=["create_album"]) - def test_external_user_cant_access_album(self, log_in_account1_s2): - go_to(self.TEST_ALBUM_URL) - assert Text("Unauthorized").exists() - assert "Error" in get_driver().title - - @pytest.mark.dependency(name="invite_ext_user", depends=["create_album", "ext_user_no_access"]) - def test_invitation_of_external_user(self, log_in_account1): - """User can invite somebody not in its roster by its full JID""" - go_to(self.TEST_ALBUM_URL) - wait_until(lambda: not S("#loading_screen").exists()) - click("manage invitations") - assert Text("people who can access this album").exists() - contact_input = S("input[name='contact']") - write("account1@server2.test", into=contact_input) - press(ENTER) - assert contact_input.web_element.get_attribute("value") == "" - assert Text("account1@server2.test").exists() - - @pytest.mark.dependency(depends=["create_album", "invite_ext_user"]) - def test_invited_user_can_access_album(self, log_in_account1_s2): - go_to(self.TEST_ALBUM_URL) - assert not Text("Unauthorized").exists() - assert not "Error" in get_driver().title - assert len(find_all(S("img.is-photo-thumbnail"))) == 2 - - @pytest.mark.dependency(name="invite_by_email", depends=["create_album"]) - def test_invitation_by_email(self, log_in_account1, sent_emails, shared_data): - """User can invite somebody without XMPP account by email""" - go_to(self.TEST_ALBUM_URL) - wait_until(lambda: not S("#loading_screen").exists()) - click("manage invitations") - assert Text("people who can access this album").exists() - click("invite by email") - wait_until(Text("Invite somebody by email").exists) - write("somebody@example.net", "email") - write("Some Guest", "name") - click("send invitation") - wait_until(lambda: len(sent_emails) == 1) - invitation_email = sent_emails[0] - assert "Welcome" in invitation_email.body - url_match = re.search(r"https:\/\/.+\/g\/\w+", sent_emails[0].body) - assert url_match is not None - shared_data["invitation_url"] = url_match.group() - - @pytest.mark.dependency(depends=["invite_by_email"]) - def test_email_guest_can_access_album(self, nobody_logged_in, shared_data): - go_to(shared_data["invitation_url"]) - click("test album") - wait_until(lambda: not S("#loading_screen").exists()) - assert len(find_all(S("img.is-photo-thumbnail"))) == 2 diff -r 73e04040d577 -r d78b5eae912a tests/e2e/run_e2e.py --- a/tests/e2e/run_e2e.py Fri Apr 16 18:32:16 2021 +0200 +++ b/tests/e2e/run_e2e.py Fri Apr 16 18:32:34 2021 +0200 @@ -25,10 +25,10 @@ from datetime import datetime import sh import io -import re import sat_templates import libervia from sat.core import exceptions +from sat.tools.common import regex import yaml try: from yaml import CLoader as Loader, CDumper as Dumper @@ -48,22 +48,48 @@ from helium import * start_firefox() - go_to("https://libervia.test:8443/login") + go_to("https://libervia-web.test:8443/login") write("account1", "login") write("test", "password") click("log in") """) report_buffer = io.StringIO() +live_out_buf = [] +live_err_buf = [] def live_out(data): - sys.stdout.write(data) + if live_out_buf: + # we may get bytes when buffer is reached and we are in the middle of an unicode + # sequence. In this case we buffer it, and print it when it's complete + if isinstance(data, str): + data = b''.join(live_out_buf).decode() + data + live_out_buf.clear() + else: + live_out_buf.append(data) + return + try: + sys.stdout.write(data) + except TypeError: + live_out_buf.append(data) + return sys.stdout.flush() report_buffer.write(data) def live_err(data): - sys.stderr.write(data) + if live_err_buf: + if isinstance(data, str): + data = b''.join(live_err_buf).decode() + data + live_err_buf.clear() + else: + live_err_buf.append(data) + return + try: + sys.stderr.write(data) + except TypeError: + live_err_buf.append(data) + return sys.stderr.flush() report_buffer.write(data) @@ -82,9 +108,24 @@ def set_env(override, name, value="1"): """Set environement variable""" - environment = override["services"]["sat"].setdefault("environment", {}) + environment = override["services"]["backend"].setdefault("environment", {}) environment[name] = value +def write_report_log(path, log_raw, with_ansi=False): + log_raw = str(log_raw) + if with_ansi: + # we save 2 versions: one with ANSI escape codes + report_ansi = path.with_suffix(".ansi") + with report_ansi.open('w') as f: + f.write(log_raw) + # and one cleaner, without them + report_log = path.with_suffix(".log") + with report_log.open('w') as f: + f.write(regex.RE_ANSI_REMOVE.sub('', log_raw)) + else: + report_log = path.with_suffix(".log") + with report_log.open('w') as f: + f.write(log_raw) def use_e2e_env(): visual = get_opt(OPT_VISUAL) @@ -105,7 +146,7 @@ package_path = p / "sat" docker_path = p / "docker" if package_path.is_dir() and docker_path.is_dir(): - sat_root_path = p + backend_root_path = p break else: raise exceptions.NotFound( @@ -113,14 +154,14 @@ "from the backend repository?" ) - libervia_path = Path(libervia.__file__).parent.resolve() - libervia_root_path = libervia_path.parent - if (libervia_root_path / ".hg").is_dir(): - libervia_source = libervia_root_path - libervia_target = "/src/libervia" + libervia_web_path = Path(libervia.__file__).parent.resolve() + libervia_web_root_path = libervia_web_path.parent + if (libervia_web_root_path / ".hg").is_dir(): + libervia_web_source = libervia_web_root_path + libervia_web_target = "/src/libervia" else: - libervia_source = libervia_path - libervia_target = "/src/libervia/libervia" + libervia_web_source = libervia_web_path + libervia_web_target = "/src/libervia/libervia" sat_templates_path = Path(sat_templates.__file__).parent.resolve() sat_templates_root_path = sat_templates_path.parent @@ -131,9 +172,9 @@ sat_templates_source = sat_templates_path sat_templates_target = "/src/sat_templates/sat_templates" - compose_e2e_path = docker_path / "docker-compose_e2e.yml" + compose_e2e_path = docker_path / "docker-compose-e2e.yml" if not compose_e2e_path.is_file(): - raise exceptions.NotFound('"docker-compose_e2e.yml" file can\'t be found') + raise exceptions.NotFound('"docker-compose-e2e.yml" file can\'t be found') with tempfile.TemporaryDirectory(prefix="sat_test_e2e_") as temp_dir: override_path = Path(temp_dir) / "test_override.yml" @@ -141,21 +182,21 @@ dedent(f"""\ version: "3.6" services: - sat: + backend: volumes: - type: bind - source: {sat_root_path} + source: {backend_root_path} target: /src/sat read_only: true - libervia: + web: volumes: - type: bind - source: {sat_root_path} + source: {backend_root_path} target: /src/sat read_only: true - type: bind - source: {libervia_source} - target: {libervia_target} + source: {libervia_web_source} + target: {libervia_web_target} read_only: true - type: bind source: {sat_templates_source} @@ -167,13 +208,13 @@ ) if keep_profiles: - set_env(override, "SAT_TEST_E2E_KEEP_PROFILES") + set_env(override, "LIBERVIA_TEST_E2E_KEEP_PROFILES") if visual: - set_env(override, "SAT_TEST_E2E_LIBERVIA_NO_HEADLESS") + set_env(override, "LIBERVIA_TEST_E2E_WEB_NO_HEADLESS") if keep_browser: - set_env(override, "SAT_TEST_E2E_LIBERVIA_KEEP_BROWSER") + set_env(override, "LIBERVIA_TEST_E2E_WEB_KEEP_BROWSER") with override_path.open("w") as f: yaml.dump(override, f, Dumper=Dumper) @@ -183,13 +224,13 @@ docker_compose.up("-d") p = docker_compose.exec( - "-T", "--workdir", "/src/sat/tests", "sat", + "-T", "--workdir", "/src/sat/tests", "backend", "pytest", "-o", "cache_dir=/tmp", *sys.argv[1:], color="yes", _in=sys.stdin, _out=live_out, _out_bufsize=0, _err=live_err, _err_bufsize=0, _bg=True ) if visual: - vnc_port = docker_compose.port("sat", "5900").split(':', 1)[1].strip() + vnc_port = docker_compose.port("backend", "5900").split(':', 1)[1].strip() p_vnc = sh.vncviewer( f"localhost:{vnc_port}", _bg=True, @@ -204,22 +245,25 @@ try: p.wait() except sh.ErrorReturnCode as e: - sat_cont_id = docker_compose.ps("-q", "sat").strip() - report_dest = Path(f"reports_{datetime.now().isoformat()}/") + libervia_cont_id = docker_compose.ps("-q", "backend").strip() + report_dest = Path(f"report_{datetime.now().isoformat()}/") # we need to make `report_dest` explicitely local with "./", otherwise # docker parse takes it as a container path due to the presence of ":" # with `isoformat()`. - sh.docker.cp(f"{sat_cont_id}:/reports", f"./{report_dest}") - # we save 2 versions: one with ANSI escape codes - report_ansi = report_dest / "report.ansi" - with report_ansi.open('w') as f: - f.write(report_buffer.getvalue()) - # and one without (cf. https://stackoverflow.com/a/14693789) - ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') - report_log = report_dest / "report.log" - with report_log.open('w') as f: - f.write(ansi_escape.sub('', report_buffer.getvalue())) - + sh.docker.cp(f"{libervia_cont_id}:/reports", f"./{report_dest}") + write_report_log( + report_dest/"report", + report_buffer.getvalue(), + with_ansi=True + ) + write_report_log( + report_dest/"backend", + docker_compose.logs("--no-log-prefix", "backend") + ) + write_report_log( + report_dest/"web", + docker_compose.logs("--no-log-prefix", "web") + ) print(f"report saved to {report_dest}") sys.exit(e.exit_code) finally: