Mercurial > libervia-backend
diff tests/e2e/test_jp.py @ 3415:814e118d9ef3
tests: end-2-end tests first draft:
- e2e tests are launched inside the new docker e2e test environment
- `run_e2e.py` launch the docker container, mount the current code base in it, launch the
e2e tests and print report in real time
- `conftest.py` are pytest fixtures managing many things such as account creation, fake files
management, JSON or Domish.Element parsing, fake editor, etc.
- `test_jp.py` are end-to-end test done with `jp`. `sh` library is used to make tests
writting as user-friendly as possible. The `SAT_TEST_ENV_E2E` environment variable is
checked, and tests will be skipped if it's not set.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 12 Nov 2020 14:53:16 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/e2e/test_jp.py Thu Nov 12 14:53:16 2020 +0100 @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 + +# SàT: an XMPP client +# Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import shutil +import pytest +import sh +from sh import jp +from sat.tools.common import uri + + +if os.getenv("SAT_TEST_ENV_E2E") is None: + pytest.skip( + "skipping end-to-end tests, we are not in a test environment", + allow_module_level=True + ) + + +pytestmark = pytest.mark.usefixtures("test_profiles") + + +class TestInstall: + + def test_jp_can_run(self): + jp("--version") + + +class TestJpAccount: + + def test_create_and_delete(self, jp_json): + """Create an account in-band, connect it, then delete it and its profile""" + jp.account.create( + "test_create@server1.test", + "test", + profile="test_create", + host="server1.test" + ) + profiles = jp_json.profile.list() + assert "test_create" in profiles + jp.profile.connect(connect=True, profile="test_create") + jp.account.delete(profile="test_create", force=True) + jp.profile.delete("test_create", force=True) + profiles = jp_json.profile.list() + assert "test_create" not in profiles + + +@pytest.mark.usefixtures("pubsub_nodes") +class TestJpPubsub: + + def test_node_create_info_delete(self): + node_name = "tmp_node" + with pytest.raises(sh.ErrorReturnCode_16): + # the node should not exist + jp.pubsub.node.info(node=node_name) + try: + jp.pubsub.node.create(node=node_name) + # if node exist has expected, following command won't raise an exception + metadata = jp.pubsub.node.info(node=node_name) + assert len(metadata.strip()) + finally: + jp.pubsub.node.delete(node=node_name, force=True) + + with pytest.raises(sh.ErrorReturnCode_16): + # the node should not exist anymore + jp.pubsub.node.info(node=node_name) + + def test_set_get_delete_purge(self, jp_elt): + content = "test item" + payload = f"<test>{content}</test>" + + # we create 3 items and check them + item1_id = jp.pubsub.set(node="test", quiet=True, _in=payload) + item2_id = jp.pubsub.set(node="test", quiet=True, _in=payload) + item3_id = jp.pubsub.set(node="test", quiet=True, _in=payload) + parsed_elt = jp_elt.pubsub.get(node="test", item=item1_id) + payload = parsed_elt.firstChildElement() + assert payload.name == 'test' + assert str(payload) == content + parsed_elt = jp_elt.pubsub.get(node="test", item=item2_id) + payload = parsed_elt.firstChildElement() + assert payload.name == 'test' + assert str(payload) == content + parsed_elt = jp_elt.pubsub.get(node="test", item=item3_id) + payload = parsed_elt.firstChildElement() + assert payload.name == 'test' + assert str(payload) == content + + # deleting first item should work + jp.pubsub.delete(node="test", item=item1_id, force=True) + with pytest.raises(sh.ErrorReturnCode_16): + jp.pubsub.get(node="test", item=item1_id) + + # there must be a least item2 and item3 in the node + node_items = jp_elt.pubsub.get(node="test") + assert len(list(node_items.elements())) >= 2 + + # after purge, node must be empty + jp.pubsub.node.purge(node="test", force=True) + node_items = jp_elt.pubsub.get(node="test") + assert len(list(node_items.elements())) == 0 + + def test_edit(self, editor, jp_elt): + content = "original item" + payload = f"<test>{content}</test>" + item_id = jp.pubsub.set(node="test", quiet=True, _in=payload) + editor.set_filter('content.replace("original", "edited")') + jp.pubsub.edit(node="test", item=item_id, _env=editor.env) + assert "original item" in editor.original_content + parsed_elt = jp_elt.pubsub.get(node="test", item=item_id) + edited_payload = parsed_elt.firstChildElement() + expected_edited_content = content.replace("original", "edited") + assert edited_payload.name == 'test' + assert str(edited_payload) == expected_edited_content + + def test_affiliations(self, jp_json): + affiliations = jp_json.pubsub.affiliations() + assert affiliations["test"] == "owner" + + def test_uri(self): + built_uri = jp.pubsub.uri( + service="pubsub.example.net", node="some_node" + ).strip() + assert built_uri == "xmpp:pubsub.example.net?;node=some_node" + built_uri = jp.pubsub.uri( + service="pubsub.example.net", node="some_node", item="some_item" + ).strip() + assert built_uri == "xmpp:pubsub.example.net?;node=some_node;item=some_item" + + +class TestJpBlog: + MICROBLOG_NS = "urn:xmpp:microblog:0" + + def test_set_get(self, jp_json): + jp.blog.set(_in="markdown **bold** [link](https://example.net)") + item_data = jp_json.blog.get(max=1) + item = item_data[0][0] + metadata = item_data[1] + assert metadata['service'] == "account1@server1.test" + assert metadata['node'] == self.MICROBLOG_NS + assert metadata['rsm'].keys() <= {"first", "last", "index", "count"} + item_id = item['id'] + expected_uri = uri.buildXMPPUri( + 'pubsub', subtype="microblog", path="account1@server1.test", + node=self.MICROBLOG_NS, item=item_id + ) + assert item['uri'] == expected_uri + assert item['content_xhtml'] == ( + '<div><p>markdown <strong>bold</strong> ' + '<a href="https://example.net">link</a></p></div>' + ) + assert isinstance(item['published'], int) + assert isinstance(item['updated'], int) + assert isinstance(item['comments'], list) + assert isinstance(item['tags'], list) + assert item['author'] == 'account1' + assert item['author_jid'] == 'account1@server1.test' + + def test_edit(self, editor, jp_json): + payload_md = "content in **markdown**" + editor.set_filter(repr(payload_md)) + jp.blog.edit(_env=editor.env) + assert len(editor.original_content) == 0 + assert editor.new_content == payload_md + items_data = jp_json.blog.get(max=1) + last_item = items_data[0][0] + last_item_id = last_item['id'] + assert last_item['content'] == "content in markdown" + assert last_item['content_xhtml'] == ( + "<div><p>content in <strong>markdown</strong></p></div>" + ) + editor.set_filter('f"{content} extended"') + jp.blog.edit("--last-item", _env=editor.env) + assert editor.original_content == payload_md + assert editor.new_content == f"{payload_md} extended" + items_data = jp_json.blog.get(max=1) + last_item = items_data[0][0] + # we check that the id hasn't been modified + assert last_item['id'] == last_item_id + assert last_item['content'] == "content in markdown extended" + assert last_item['content_xhtml'] == ( + "<div><p>content in <strong>markdown</strong> extended</p></div>" + ) + + +class TestJpFile: + + def test_upload_get(self, fake_file): + source_file = fake_file.size(10240) + source_file_hash = fake_file.get_source_hash(source_file) + upload_url = jp.file.upload(source_file).strip() + + dest_file = fake_file.new_dest_file() + try: + jp.file.get(upload_url, dest_file=dest_file) + dest_file_hash = fake_file.get_dest_hash(dest_file) + finally: + dest_file.unlink() + + assert source_file_hash == dest_file_hash + + def test_send_receive(self, fake_file): + source_file = fake_file.size(10240) + source_file_hash = fake_file.get_source_hash(source_file) + send_cmd = jp.file.send(source_file, "account1@server2.test", _bg=True) + dest_path = fake_file.dest_files / "test_send_receive" + dest_path.mkdir() + try: + jp.file.receive( + "account1@server1.test", profile="account1_s2", path=dest_path) + dest_file = dest_path / source_file.name + dest_file_hash = fake_file.get_dest_hash(dest_file) + finally: + shutil.rmtree(dest_path) + send_cmd.wait() + + assert source_file_hash == dest_file_hash