Mercurial > libervia-backend
changeset 3415:814e118d9ef3
tests: end-2-end tests first draft:
- e2e tests are launched inside the new docker e2e test environment
- `run_e2e.py` launch the docker container, mount the current code base in it, launch the
e2e tests and print report in real time
- `conftest.py` are pytest fixtures managing many things such as account creation, fake files
management, JSON or Domish.Element parsing, fake editor, etc.
- `test_jp.py` are end-to-end test done with `jp`. `sh` library is used to make tests
writting as user-friendly as possible. The `SAT_TEST_ENV_E2E` environment variable is
checked, and tests will be skipped if it's not set.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 12 Nov 2020 14:53:16 +0100 |
parents | ffe7a6d6018a |
children | d7cfb031e41f |
files | tests/e2e/conftest.py tests/e2e/run_e2e.py tests/e2e/test_jp.py |
diffstat | 3 files changed, 645 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/e2e/conftest.py Thu Nov 12 14:53:16 2020 +0100 @@ -0,0 +1,320 @@ +#!/usr/bin/env python3 + +# SàT: an XMPP client +# Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import sys +import os +import tempfile +import string +import hashlib +import random +from pathlib import Path +from textwrap import dedent +import json +import pytest +from sh import jp + + +class JpJson: + """jp like commands parsing result as JSON""" + + def __init__(self): + self.subcommands = [] + + def __call__(self, *args, **kwargs): + args = self.subcommands + list(args) + self.subcommands.clear() + kwargs['output'] = 'json_raw' + kwargs['_tty_out'] = False + cmd = jp(*args, **kwargs) + return json.loads(cmd.stdout) + + def __getattr__(self, name): + if name.startswith('_'): + # no jp subcommand starts with a "_", + # and pytest uses some attributes with this name scheme + return super().__getattr__(name) + self.subcommands.append(name) + return self + + +class JpElt(JpJson): + """jp like commands parsing result as domishElement""" + + def __init__(self): + super().__init__() + from sat.tools.xml_tools import ElementParser + self.parser = ElementParser() + + def __call__(self, *args, **kwargs): + args = self.subcommands + list(args) + self.subcommands.clear() + kwargs['output'] = 'xml_raw' + kwargs['_tty_out'] = False + cmd = jp(*args, **kwargs) + return self.parser(cmd.stdout.decode().strip()) + + +class Editor: + + def __init__(self): + # temporary directory will be deleted Automatically when this object will be + # destroyed + self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="sat_e2e_test_editor_") + self.tmp_dir_path = Path(self.tmp_dir_obj.name) + if not sys.executable: + raise Exception("Can't find python executable") + self.editor_set = False + self.editor_path = self.tmp_dir_path / "editor.py" + self.ori_content_path = self.tmp_dir_path / "original_content" + self.new_content_path = self.tmp_dir_path / "new_content" + self.base_script = dedent(f"""\ + #!{sys.executable} + import sys + + def content_filter(content): + return {{content_filter}} + + with open(sys.argv[1], 'r+') as f: + original_content = f.read() + f.seek(0) + new_content = content_filter(original_content) + f.write(new_content) + f.truncate() + + with open("{self.ori_content_path}", "w") as f: + f.write(original_content) + + with open("{self.new_content_path}", "w") as f: + f.write(new_content) + """ + ) + self._env = os.environ.copy() + self._env["EDITOR"] = str(self.editor_path) + + def set_filter(self, content_filter: str = "content"): + """Python code to modify original content + + The code will be applied to content received by editor. + The original content received by editor is in the "content" variable. + If filter_ is not specified, original content is written unmodified. + Code must be on a single line. + """ + if '\n' in content_filter: + raise ValueError("new lines can't be used in filter_") + with self.editor_path.open('w') as f: + f.write(self.base_script.format(content_filter=content_filter)) + self.editor_path.chmod(0o700) + self.editor_set = True + + @property + def env(self): + """Get environment variable with the editor set""" + if not self.editor_set: + self.set_filter() + return self._env + + @property + def original_content(self): + """Last content received by editor, before any modification + + returns None if editor has not yet been called + """ + try: + with self.ori_content_path.open() as f: + return f.read() + except FileNotFoundError: + return None + + @property + def new_content(self): + """Last content writen by editor + + This is the final content, after filter has been applied to original content + returns None if editor has not yet been called + """ + try: + with self.new_content_path.open() as f: + return f.read() + except FileNotFoundError: + return None + + +class FakeFile: + ALPHABET = f"{string.ascii_letters}{string.digits}_" + BUF_SIZE = 65535 + + def __init__(self): + self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="sat_e2e_test_files_") + self.tmp_dir_path = Path(self.tmp_dir_obj.name) + self.source_files = self.tmp_dir_path / "source" + self.source_files.mkdir() + self.dest_files = self.tmp_dir_path / "dest" + self.dest_files.mkdir() + self.hashes = {} + + @property + def dest_path(self): + """Path of a directory where files can be received + + The directory will be deleted at the end of session. + Files from other test can be present, be sure to create a unique subdirectory or + to use a unique destination file name + """ + return self.dest_files + + def new_dest_file(self) -> Path: + """Path to a randomly named destination file + + The file will be in self.dest_path. + The file should be deleted after use. If not, it will be deleted at the end of + session with the whole temporary test files directory. + """ + name = ''.join(random.choices(self.ALPHABET, k=8)) + return self.dest_files / name + + def size(self, size: int, use_cache: bool = True): + """Create a file of requested size, and returns its path + + @param use_cache: if True and a file of this size already exists, it is re-used + """ + dest_path = self.source_files / str(size) + if not use_cache or not dest_path.exists(): + hash_ = hashlib.sha256() + remaining = size + with dest_path.open('wb') as f: + while remaining: + if remaining > self.BUF_SIZE: + to_get = self.BUF_SIZE + else: + to_get = remaining + buf = os.urandom(to_get) + f.write(buf) + hash_.update(buf) + remaining -= to_get + self.hashes[dest_path] = hash_.hexdigest() + return dest_path + + def get_source_hash(self, source_file: Path) -> str: + """Retrieve hash calculated for a generated source file""" + return self.hashes[source_file] + + def get_dest_hash(self, dest_file: Path) -> str: + """Calculate hash of file at given path""" + hash_ = hashlib.sha256() + with dest_file.open('rb') as f: + while True: + buf = f.read(self.BUF_SIZE) + if not buf: + break + hash_.update(buf) + return hash_.hexdigest() + + +@pytest.fixture(scope="session") +def jp_json(): + """Run jp with "json_raw" output, and returns the parsed value""" + return JpJson() + + +@pytest.fixture(scope="session") +def jp_elt(): + """Run jp with "xml_raw" output, and returns the parsed value""" + return JpElt() + + +@pytest.fixture(scope="session") +def test_profiles(): + """Test accounts created using in-band registration + + They will be removed at the end of session. + The number of account per servers is set in the "accounts_by_servers" dict. + Jids are in the form "account[x]@server[y].test". + The profiles used are in the form "account[x]" for server1.test, and + "account[x]_s[y]" for other servers. + Password is "test" for all profiles and XMPP accounts. + "account1" is connected and set as default profile + Profiles created are returned as a tuple + """ + profiles = [] + nb_servers = 3 + accounts_by_servers = { + 1: 1, + 2: 1, + 3: 0, + } + for server_idx in range(1, nb_servers+1): + account_stop = accounts_by_servers[server_idx] + 1 + for account_idx in range(1, account_stop): + profile_suff = f"_s{server_idx}" if server_idx>1 else "" + profile = f"account{account_idx}{profile_suff}" + profiles.append(profile) + jp.account.create( + f"account{account_idx}@server{server_idx}.test", + "test", + profile=profile, + host=f"server{server_idx}.test" + ) + jp.profile.modify(profile="account1", default=True, connect=True) + jp.profile.connect(profile="account1_s2", connect=True) + yield tuple(profiles) + for profile in profiles: + jp.account.delete(profile=profile, connect=True, force=True) + jp.profile.delete(profile, force=True) + + +@pytest.fixture(scope="class") +def pubsub_nodes(test_profiles): + """Create 2 testing nodes + + Both nodes will be created with "account1" profile, named "test" and have and "open" + access model. + One node will account1's PEP, the other one on pubsub.server1.test. + """ + jp.pubsub.node.create( + "-f", "access_model", "open", + node="test", + profile="account1", connect=True + ) + jp.pubsub.node.create( + "-f", "access_model", "open", + service="pubsub.server1.test", node="test", + profile="account1" + ) + yield + jp.pubsub.node.delete( + node="test", + profile="account1", connect=True, + force=True + ) + jp.pubsub.node.delete( + service="pubsub.server1.test", node="test", + profile="account1", + force=True + ) + + +@pytest.fixture() +def editor(): + """Create a fake editor to automatise edition from CLI""" + return Editor() + + +@pytest.fixture(scope="session") +def fake_file(): + """Manage dummy files creation and destination path""" + return FakeFile()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/e2e/run_e2e.py Thu Nov 12 14:53:16 2020 +0100 @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 + +# SàT: an XMPP client +# Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""Run end-to-end tests in appropriate Docker environment""" + +import sys +from pathlib import Path +import tempfile +from textwrap import dedent +import sh +from sat.core import exceptions + +KEEP_OPT = "--keep" + + +def live_out(data): + sys.stdout.write(data) + sys.stdout.flush() + + +def live_err(data): + sys.stderr.write(data) + sys.stderr.flush() + + +def use_e2e_env(): + if KEEP_OPT in sys.argv: + keep_containers = True + sys.argv.remove(KEEP_OPT) + else: + keep_containers = False + for p in Path.cwd().parents: + package_path = p / "sat" + docker_path = p / "docker" + if package_path.is_dir() and docker_path.is_dir(): + sat_root_path = p + break + else: + raise exceptions.NotFound( + "Can't find root of SàT code, are you sure that you are running the test " + "from the backend repository?" + ) + + compose_e2e_path = docker_path / "docker-compose_e2e.yml" + if not compose_e2e_path.is_file(): + raise exceptions.NotFound('"docker-compose_e2e.yml" file can\'t be found') + + with tempfile.TemporaryDirectory() as temp_dir: + override_path = Path(temp_dir) / "test_override.yml" + with override_path.open("w") as f: + f.write(dedent(f"""\ + version: "3.6" + services: + sat: + volumes: + - type: bind + source: {sat_root_path} + target: /src/sat + read_only: true + """)) + + docker_compose = sh.docker_compose.bake( + "-f", compose_e2e_path, "-f", override_path) + docker_compose.up("-d") + + try: + docker_compose.exec( + "-T", "--workdir", "/src/sat/tests", "sat", + "pytest", "-o", "cache_dir=/tmp", *sys.argv[1:], color="yes", + _in=sys.stdin, _out=live_out, _out_bufsize=0, _err=live_err, _err_bufsize=0 + ) + except sh.ErrorReturnCode as e: + sys.exit(e.exit_code) + finally: + if not keep_containers: + docker_compose.down(volumes=True) + +if __name__ == "__main__": + use_e2e_env()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/e2e/test_jp.py Thu Nov 12 14:53:16 2020 +0100 @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 + +# SàT: an XMPP client +# Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import shutil +import pytest +import sh +from sh import jp +from sat.tools.common import uri + + +if os.getenv("SAT_TEST_ENV_E2E") is None: + pytest.skip( + "skipping end-to-end tests, we are not in a test environment", + allow_module_level=True + ) + + +pytestmark = pytest.mark.usefixtures("test_profiles") + + +class TestInstall: + + def test_jp_can_run(self): + jp("--version") + + +class TestJpAccount: + + def test_create_and_delete(self, jp_json): + """Create an account in-band, connect it, then delete it and its profile""" + jp.account.create( + "test_create@server1.test", + "test", + profile="test_create", + host="server1.test" + ) + profiles = jp_json.profile.list() + assert "test_create" in profiles + jp.profile.connect(connect=True, profile="test_create") + jp.account.delete(profile="test_create", force=True) + jp.profile.delete("test_create", force=True) + profiles = jp_json.profile.list() + assert "test_create" not in profiles + + +@pytest.mark.usefixtures("pubsub_nodes") +class TestJpPubsub: + + def test_node_create_info_delete(self): + node_name = "tmp_node" + with pytest.raises(sh.ErrorReturnCode_16): + # the node should not exist + jp.pubsub.node.info(node=node_name) + try: + jp.pubsub.node.create(node=node_name) + # if node exist has expected, following command won't raise an exception + metadata = jp.pubsub.node.info(node=node_name) + assert len(metadata.strip()) + finally: + jp.pubsub.node.delete(node=node_name, force=True) + + with pytest.raises(sh.ErrorReturnCode_16): + # the node should not exist anymore + jp.pubsub.node.info(node=node_name) + + def test_set_get_delete_purge(self, jp_elt): + content = "test item" + payload = f"<test>{content}</test>" + + # we create 3 items and check them + item1_id = jp.pubsub.set(node="test", quiet=True, _in=payload) + item2_id = jp.pubsub.set(node="test", quiet=True, _in=payload) + item3_id = jp.pubsub.set(node="test", quiet=True, _in=payload) + parsed_elt = jp_elt.pubsub.get(node="test", item=item1_id) + payload = parsed_elt.firstChildElement() + assert payload.name == 'test' + assert str(payload) == content + parsed_elt = jp_elt.pubsub.get(node="test", item=item2_id) + payload = parsed_elt.firstChildElement() + assert payload.name == 'test' + assert str(payload) == content + parsed_elt = jp_elt.pubsub.get(node="test", item=item3_id) + payload = parsed_elt.firstChildElement() + assert payload.name == 'test' + assert str(payload) == content + + # deleting first item should work + jp.pubsub.delete(node="test", item=item1_id, force=True) + with pytest.raises(sh.ErrorReturnCode_16): + jp.pubsub.get(node="test", item=item1_id) + + # there must be a least item2 and item3 in the node + node_items = jp_elt.pubsub.get(node="test") + assert len(list(node_items.elements())) >= 2 + + # after purge, node must be empty + jp.pubsub.node.purge(node="test", force=True) + node_items = jp_elt.pubsub.get(node="test") + assert len(list(node_items.elements())) == 0 + + def test_edit(self, editor, jp_elt): + content = "original item" + payload = f"<test>{content}</test>" + item_id = jp.pubsub.set(node="test", quiet=True, _in=payload) + editor.set_filter('content.replace("original", "edited")') + jp.pubsub.edit(node="test", item=item_id, _env=editor.env) + assert "original item" in editor.original_content + parsed_elt = jp_elt.pubsub.get(node="test", item=item_id) + edited_payload = parsed_elt.firstChildElement() + expected_edited_content = content.replace("original", "edited") + assert edited_payload.name == 'test' + assert str(edited_payload) == expected_edited_content + + def test_affiliations(self, jp_json): + affiliations = jp_json.pubsub.affiliations() + assert affiliations["test"] == "owner" + + def test_uri(self): + built_uri = jp.pubsub.uri( + service="pubsub.example.net", node="some_node" + ).strip() + assert built_uri == "xmpp:pubsub.example.net?;node=some_node" + built_uri = jp.pubsub.uri( + service="pubsub.example.net", node="some_node", item="some_item" + ).strip() + assert built_uri == "xmpp:pubsub.example.net?;node=some_node;item=some_item" + + +class TestJpBlog: + MICROBLOG_NS = "urn:xmpp:microblog:0" + + def test_set_get(self, jp_json): + jp.blog.set(_in="markdown **bold** [link](https://example.net)") + item_data = jp_json.blog.get(max=1) + item = item_data[0][0] + metadata = item_data[1] + assert metadata['service'] == "account1@server1.test" + assert metadata['node'] == self.MICROBLOG_NS + assert metadata['rsm'].keys() <= {"first", "last", "index", "count"} + item_id = item['id'] + expected_uri = uri.buildXMPPUri( + 'pubsub', subtype="microblog", path="account1@server1.test", + node=self.MICROBLOG_NS, item=item_id + ) + assert item['uri'] == expected_uri + assert item['content_xhtml'] == ( + '<div><p>markdown <strong>bold</strong> ' + '<a href="https://example.net">link</a></p></div>' + ) + assert isinstance(item['published'], int) + assert isinstance(item['updated'], int) + assert isinstance(item['comments'], list) + assert isinstance(item['tags'], list) + assert item['author'] == 'account1' + assert item['author_jid'] == 'account1@server1.test' + + def test_edit(self, editor, jp_json): + payload_md = "content in **markdown**" + editor.set_filter(repr(payload_md)) + jp.blog.edit(_env=editor.env) + assert len(editor.original_content) == 0 + assert editor.new_content == payload_md + items_data = jp_json.blog.get(max=1) + last_item = items_data[0][0] + last_item_id = last_item['id'] + assert last_item['content'] == "content in markdown" + assert last_item['content_xhtml'] == ( + "<div><p>content in <strong>markdown</strong></p></div>" + ) + editor.set_filter('f"{content} extended"') + jp.blog.edit("--last-item", _env=editor.env) + assert editor.original_content == payload_md + assert editor.new_content == f"{payload_md} extended" + items_data = jp_json.blog.get(max=1) + last_item = items_data[0][0] + # we check that the id hasn't been modified + assert last_item['id'] == last_item_id + assert last_item['content'] == "content in markdown extended" + assert last_item['content_xhtml'] == ( + "<div><p>content in <strong>markdown</strong> extended</p></div>" + ) + + +class TestJpFile: + + def test_upload_get(self, fake_file): + source_file = fake_file.size(10240) + source_file_hash = fake_file.get_source_hash(source_file) + upload_url = jp.file.upload(source_file).strip() + + dest_file = fake_file.new_dest_file() + try: + jp.file.get(upload_url, dest_file=dest_file) + dest_file_hash = fake_file.get_dest_hash(dest_file) + finally: + dest_file.unlink() + + assert source_file_hash == dest_file_hash + + def test_send_receive(self, fake_file): + source_file = fake_file.size(10240) + source_file_hash = fake_file.get_source_hash(source_file) + send_cmd = jp.file.send(source_file, "account1@server2.test", _bg=True) + dest_path = fake_file.dest_files / "test_send_receive" + dest_path.mkdir() + try: + jp.file.receive( + "account1@server1.test", profile="account1_s2", path=dest_path) + dest_file = dest_path / source_file.name + dest_file_hash = fake_file.get_dest_hash(dest_file) + finally: + shutil.rmtree(dest_path) + send_cmd.wait() + + assert source_file_hash == dest_file_hash