Mercurial > libervia-backend
view tests/e2e/libervia-cli/test_libervia-cli.py @ 4265:2417ad1d0f23
core (xmpp): fix message workflow interruption from trigger.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 12 Jun 2024 22:37:04 +0200 |
parents | f59e9421a650 |
children | 4cd4922de876 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia: an XMPP client # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import json import os import shutil from time import sleep import sh from sh import li import pytest from libervia.backend.plugins.plugin_sec_oxps import NS_OXPS from libervia.backend.plugins.plugin_sec_pte import NS_PTE from libervia.backend.plugins.plugin_xep_0277 import NS_ATOM from libervia.backend.tools.common import uri if os.getenv("LIBERVIA_TEST_ENV_E2E") is None: pytest.skip( "skipping end-to-end tests, we are not in a test environment", allow_module_level=True ) pytestmark = pytest.mark.usefixtures("test_profiles") class TestInstall: def test_li_can_run(self): li("--version") class TestLiberviaCliAccount: def test_create_and_delete(self, li_json): """Create an account in-band, connect it, then delete it and its profile""" li.account.create( "test_create@server1.test", "test", profile="test_create", host="server1.test" ) profiles = li_json.profile.list() assert "test_create" in profiles li.profile.connect(connect=True, profile="test_create") li.account.delete(profile="test_create", force=True) li.profile.delete("test_create", force=True) profiles = li_json.profile.list() assert "test_create" not in profiles @pytest.mark.usefixtures("pubsub_nodes") class TestLiberviaCliPubsub: def test_node_create_info_delete(self): node_name = "tmp_node" with pytest.raises(sh.ErrorReturnCode_16): # the node should not exist li.pubsub.node.info(node=node_name) try: li.pubsub.node.create(node=node_name) # if node exist as expected, following command won't raise an exception metadata = li.pubsub.node.info(node=node_name) assert len(metadata.strip()) finally: li.pubsub.node.delete(node=node_name, force=True) with pytest.raises(sh.ErrorReturnCode_16): # the node should not exist anymore li.pubsub.node.info(node=node_name) def test_set_get_delete_purge(self, li_elt): content = "test item" payload = f"<test>{content}</test>" # we create 3 items and check them item1_id = li.pubsub.set(node="test", quiet=True, _in=payload) item2_id = li.pubsub.set(node="test", quiet=True, _in=payload) item3_id = li.pubsub.set(node="test", quiet=True, _in=payload) parsed_elt = li_elt.pubsub.get(node="test", item=item1_id) payload = parsed_elt.firstChildElement() assert payload.name == 'test' assert str(payload) == content parsed_elt = li_elt.pubsub.get(node="test", item=item2_id) payload = parsed_elt.firstChildElement() assert payload.name == 'test' assert str(payload) == content parsed_elt = li_elt.pubsub.get(node="test", item=item3_id) payload = parsed_elt.firstChildElement() assert payload.name == 'test' assert str(payload) == content # deleting first item should work li.pubsub.delete(node="test", item=item1_id, force=True) with pytest.raises(sh.ErrorReturnCode_16): li.pubsub.get(node="test", item=item1_id) # there must be a least item2 and item3 in the node node_items = li_elt.pubsub.get(node="test") assert len(list(node_items.elements())) >= 2 # after purge, node must be empty li.pubsub.node.purge(node="test", force=True) node_items = li_elt.pubsub.get(node="test") assert len(list(node_items.elements())) == 0 def test_edit(self, editor, li_elt): content = "original item" payload = f"<test>{content}</test>" item_id = li.pubsub.set(node="test", quiet=True, _in=payload) editor.set_filter('content.replace("original", "edited")') li.pubsub.edit(node="test", item=item_id, _env=editor.env) assert "original item" in editor.original_content parsed_elt = li_elt.pubsub.get(node="test", item=item_id) edited_payload = parsed_elt.firstChildElement() expected_edited_content = content.replace("original", "edited") assert edited_payload.name == 'test' assert str(edited_payload) == expected_edited_content def test_affiliations(self, li_json): affiliations = li_json.pubsub.affiliations() assert affiliations["test"] == "owner" def test_uri(self): built_uri = li.pubsub.uri( service="pubsub.example.net", node="some_node" ).strip() assert built_uri == "xmpp:pubsub.example.net?;node=some_node" built_uri = li.pubsub.uri( service="pubsub.example.net", node="some_node", item="some_item" ).strip() assert built_uri == "xmpp:pubsub.example.net?;node=some_node;item=some_item" def test_cache_search(self, li_json): """A Full-Text Search query can be done""" sk_txt = "this is a blog post about Slovakia" fr_txt = "this is a blog post about France" nc_txt = "this is a blog post about New Caledonia" au_txt = "this is a blog post about Australia" li.blog.set( "-t", "travel", "-t", "europe", _in=sk_txt, syntax="markdown" ) li.blog.set( "-t", "travel", "-t", "europe", _in=fr_txt, syntax="markdown" ) li.blog.set( "-t", "travel", "-t", "south pacific", _in=nc_txt, syntax="markdown" ) li.blog.set( "-t", "travel", "-t", "south pacific", _in="this is a blog post about Australia", title=au_txt, syntax="markdown" ) # we get the blog to activate the cache for it li.blog.get(max_items=1) # FTS found = [] for __ in range(5): found = li_json.pubsub.cache.search( type="blog", fts='Slovakia OR "New Caledonia"' ) if found: break else: # retrieving blog triggers the caching, but it's done in parallel # thus we may have nothing in cache yet sleep(0.5) assert len(found) == 2 assert all(i["content"] in (sk_txt, nc_txt) for i in found) # search by field found = li_json.pubsub.cache.search( "-F", "tags", "overlap", "travel", type="blog" ) assert len(found) == 4 found = li_json.pubsub.cache.search( "-F", "tags", "overlap", "europe", type="blog" ) assert len(found) == 2 assert all(i["content"] in (sk_txt, fr_txt) for i in found) found = li_json.pubsub.cache.search( "-F", "tags", "ioverlap", "SOUTH PACIFIC", type="blog" ) assert all(i["content"] in (nc_txt, au_txt) for i in found) class TestLiberviaCliBlog: MICROBLOG_NS = "urn:xmpp:microblog:0" def test_set_get(self, li_json): li.blog.set(_in="markdown **bold** [link](https://example.net)") item_data = li_json.blog.get(max=1, before="") item = item_data[0][0] metadata = item_data[1] assert metadata['service'] == "account1@server1.test" assert metadata['node'] == self.MICROBLOG_NS assert metadata['rsm'].keys() <= {"first", "last", "index", "count"} item_id = item['id'] expected_uri = uri.build_xmpp_uri( 'pubsub', subtype="microblog", path="account1@server1.test", node=self.MICROBLOG_NS, item=item_id ) assert item['uri'] == expected_uri assert item['content_xhtml'] == ( '<div><p>markdown <strong>bold</strong> ' '<a href="https://example.net">link</a></p></div>' ) assert isinstance(item['published'], int) assert isinstance(item['updated'], int) assert isinstance(item['comments'], list) assert isinstance(item['tags'], list) assert item['author'] == 'account1' assert item['author_jid'] == 'account1@server1.test' def test_edit(self, editor, li_json): payload_md = "content in **markdown**" editor.set_filter(repr(payload_md)) li.blog.edit(_env=editor.env) assert len(editor.original_content) == 0 assert editor.new_content == payload_md items_data = li_json.blog.get(max_items=1) last_item = items_data[0][0] last_item_id = last_item['id'] assert last_item['content'] == "content in markdown" assert last_item['content_xhtml'] == ( "<div><p>content in <strong>markdown</strong></p></div>" ) editor.set_filter('f"{content} extended"') li.blog.edit("--last-item", _env=editor.env) assert editor.original_content == payload_md assert editor.new_content == f"{payload_md} extended" items_data = li_json.blog.get(max_items=1) last_item = items_data[0][0] # we check that the id hasn't been modified assert last_item['id'] == last_item_id assert last_item['content'] == "content in markdown extended" assert last_item['content_xhtml'] == ( "<div><p>content in <strong>markdown</strong> extended</p></div>" ) class TestLiberviaCliFile: def test_upload_get(self, fake_file): source_file = fake_file.size(10240) source_file_hash = fake_file.get_source_hash(source_file) upload_url = li.file.upload(source_file).strip() dest_file = fake_file.new_dest_file() try: li.file.get(upload_url, dest_file=dest_file) dest_file_hash = fake_file.get_dest_hash(dest_file) finally: dest_file.unlink() assert source_file_hash == dest_file_hash def test_send_receive(self, fake_file): source_file = fake_file.size(10240) source_file_hash = fake_file.get_source_hash(source_file) send_cmd = li.file.send(source_file, "account1@server2.test", _bg=True) dest_path = fake_file.dest_files / "test_send_receive" dest_path.mkdir() try: li.file.receive( "account1@server1.test", profile="account1_s2", path=dest_path) dest_file = dest_path / source_file.name dest_file_hash = fake_file.get_dest_hash(dest_file) finally: shutil.rmtree(dest_path) send_cmd.wait() assert source_file_hash == dest_file_hash def test_send_receive_webrtc(self, fake_file): source_file = fake_file.size(10240) source_file_hash = fake_file.get_source_hash(source_file) send_cmd = li.file.send( source_file, "account1@server2.test", webrtc=True, _bg=True ) dest_path = fake_file.dest_files / "test_send_receive" dest_path.mkdir() try: li.file.receive( "account1@server1.test", profile="account1_s2", path=dest_path) dest_file = dest_path / source_file.name dest_file_hash = fake_file.get_dest_hash(dest_file) finally: shutil.rmtree(dest_path) send_cmd.wait() assert source_file_hash == dest_file_hash class TestE2EEncryption: def test_pubsub_encryption_oxps(self, li_elt): secret_blog = "this is a secret blog post" node = "e2ee_blog" li.blog.set(_in=secret_blog, node="e2ee_blog", item="test_e2ee", encrypt=True) # the item should be transparently decrypted parsed_decrypted = li_elt.pubsub.get( node=node, item="test_e2ee", no_cache=True ) entry_elt = parsed_decrypted.firstChildElement() assert entry_elt.name == "entry" assert entry_elt.uri == NS_ATOM assert secret_blog in parsed_decrypted.toXml() # with --no-decrypt, we should have the encrypted item parsed_ori_item = li_elt.pubsub.get( node=node, item="test_e2ee", no_decrypt=True, no_cache=True ) encrypted_elt = parsed_ori_item.firstChildElement() assert encrypted_elt.name == "encrypted" assert encrypted_elt.uri == NS_OXPS # the body must not be readable in plain text assert secret_blog not in parsed_ori_item.toXml() def test_pubsub_secrets_sharing_oxps(self, li_elt): secret_blog = "this is a secret blog post" node="secret_sharing" li.blog.set(_in=secret_blog, node=node, item="test_e2ee", encrypt=True) # the item must not be decrypted for account1_s2 (secret is not known) parsed_item = li_elt.pubsub.get( service="account1@server1.test", node=node, item="test_e2ee", no_cache=True, profile="account1_s2" ) encrypted_elt = parsed_item.firstChildElement() assert encrypted_elt.name == "encrypted" assert encrypted_elt.uri == NS_OXPS # the body must not be readable in plain text assert secret_blog not in parsed_item.toXml() # we share the secrets li.pubsub.secret.share("account1@server2.test", service="account1@server1.test", node=node) # and get the item again parsed_item = li_elt.pubsub.get( service="account1@server1.test", node=node, item="test_e2ee", no_cache=True, profile="account1_s2" ) # now it should be decrypted entry_elt = parsed_item.firstChildElement() assert entry_elt.name == "entry" assert entry_elt.uri == NS_ATOM assert secret_blog in parsed_item.toXml() def test_pubsub_signature(self, li_json): """A pubsub item can be signed, and the signature can be verified""" body = "this message is signed" service="account1@server1.test" node ="blog_signing" item="signed_item" li.blog.set(_in=body, service=service, node=node, item=item, sign=True) attachments = li_json.pubsub.attachments.get( service=service, node=node, item=item ) assert len(attachments) == 1 attachment = attachments[0] assert attachment["from"] == "account1@server1.test" signature_json = attachment["signature"] sign_data = li_json.pubsub.signature.check( json.dumps(signature_json), service=service, node=node, item=item, ) assert sign_data["signer"] == "account1@server1.test" assert sign_data["validated"] == True assert all(t == "undecided" for t in sign_data["trusts"].values()) def test_jingle_encrypted_transport_jet(self, fake_file): """A file is sent and received properly with JET OMEMO""" # FIXME: transport should be checked to see if content is actually encrypted. # Maybe we can use tcpdump? li.encryption.start("account1@server2.test", name="omemo_legacy") source_file = fake_file.size(10240) source_file_hash = fake_file.get_source_hash(source_file) send_cmd = li.file.send( source_file, "account1@server2.test", encrypt=True, _bg=True, ) dest_path = fake_file.dest_files / "test_send_receive" dest_path.mkdir() try: li.file.receive( "account1@server1.test", profile="account1_s2", path=dest_path, ) dest_file = dest_path / source_file.name dest_file_hash = fake_file.get_dest_hash(dest_file) finally: shutil.rmtree(dest_path) send_cmd.wait() assert source_file_hash == dest_file_hash li.encryption.stop("account1@server2.test") def test_pubsub_targeted_encryption_pte(self, li_elt): """An item is encrypted for specific recipients""" secret_blog = "this is a secret blog post" node = "e2ee_blog" item = "test_pte" li.encryption.start("account1@server2.test", name="omemo") li.encryption.start( "account1@server1.test", name="omemo", profile="account1_s2" ) li.blog.set( _in=secret_blog, node="e2ee_blog", item=item, encrypt_for="account1@server2.test" ) # the item should be transparently decrypted parsed_decrypted = li_elt.pubsub.get( service="account1@server1.test", node=node, item=item, no_cache=True, profile="account1_s2" ) entry_elt = parsed_decrypted.firstChildElement() assert entry_elt.name == "entry" assert entry_elt.uri == NS_ATOM assert secret_blog in parsed_decrypted.toXml() # with --no-decrypt, we should have the encrypted item parsed_ori_item = li_elt.pubsub.get( node=node, item=item, no_decrypt=True, no_cache=True ) encrypted_elt = parsed_ori_item.firstChildElement() assert encrypted_elt.name == "encrypted" assert encrypted_elt.uri == NS_PTE # the body must not be readable in plain text assert secret_blog not in parsed_ori_item.toXml()