Mercurial > libervia-backend
view tests/e2e/libervia-cli/test_libervia-cli.py @ 4242:8acf46ed7f36
frontends: remote control implementation:
This is the frontends' common part of the remote control implementation. It handles the
creation of the WebRTC session and the management of inputs. For now, reception uses the
freedesktop.org Desktop portal, and works mostly with Wayland-based Desktop Environments.
rel 436
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 11 May 2024 13:52:43 +0200 |
parents | f59e9421a650 |
children | 4cd4922de876 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia: an XMPP client # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import json import os import shutil from time import sleep import sh from sh import li import pytest from libervia.backend.plugins.plugin_sec_oxps import NS_OXPS from libervia.backend.plugins.plugin_sec_pte import NS_PTE from libervia.backend.plugins.plugin_xep_0277 import NS_ATOM from libervia.backend.tools.common import uri if os.getenv("LIBERVIA_TEST_ENV_E2E") is None: pytest.skip( "skipping end-to-end tests, we are not in a test environment", allow_module_level=True ) pytestmark = pytest.mark.usefixtures("test_profiles") class TestInstall: def test_li_can_run(self): li("--version") class TestLiberviaCliAccount: def test_create_and_delete(self, li_json): """Create an account in-band, connect it, then delete it and its profile""" li.account.create( "test_create@server1.test", "test", profile="test_create", host="server1.test" ) profiles = li_json.profile.list() assert "test_create" in profiles li.profile.connect(connect=True, profile="test_create") li.account.delete(profile="test_create", force=True) li.profile.delete("test_create", force=True) profiles = li_json.profile.list() assert "test_create" not in profiles @pytest.mark.usefixtures("pubsub_nodes") class TestLiberviaCliPubsub: def 
test_node_create_info_delete(self): node_name = "tmp_node" with pytest.raises(sh.ErrorReturnCode_16): # the node should not exist li.pubsub.node.info(node=node_name) try: li.pubsub.node.create(node=node_name) # if node exist as expected, following command won't raise an exception metadata = li.pubsub.node.info(node=node_name) assert len(metadata.strip()) finally: li.pubsub.node.delete(node=node_name, force=True) with pytest.raises(sh.ErrorReturnCode_16): # the node should not exist anymore li.pubsub.node.info(node=node_name) def test_set_get_delete_purge(self, li_elt): content = "test item" payload = f"<test>{content}</test>" # we create 3 items and check them item1_id = li.pubsub.set(node="test", quiet=True, _in=payload) item2_id = li.pubsub.set(node="test", quiet=True, _in=payload) item3_id = li.pubsub.set(node="test", quiet=True, _in=payload) parsed_elt = li_elt.pubsub.get(node="test", item=item1_id) payload = parsed_elt.firstChildElement() assert payload.name == 'test' assert str(payload) == content parsed_elt = li_elt.pubsub.get(node="test", item=item2_id) payload = parsed_elt.firstChildElement() assert payload.name == 'test' assert str(payload) == content parsed_elt = li_elt.pubsub.get(node="test", item=item3_id) payload = parsed_elt.firstChildElement() assert payload.name == 'test' assert str(payload) == content # deleting first item should work li.pubsub.delete(node="test", item=item1_id, force=True) with pytest.raises(sh.ErrorReturnCode_16): li.pubsub.get(node="test", item=item1_id) # there must be a least item2 and item3 in the node node_items = li_elt.pubsub.get(node="test") assert len(list(node_items.elements())) >= 2 # after purge, node must be empty li.pubsub.node.purge(node="test", force=True) node_items = li_elt.pubsub.get(node="test") assert len(list(node_items.elements())) == 0 def test_edit(self, editor, li_elt): content = "original item" payload = f"<test>{content}</test>" item_id = li.pubsub.set(node="test", quiet=True, _in=payload) 
editor.set_filter('content.replace("original", "edited")') li.pubsub.edit(node="test", item=item_id, _env=editor.env) assert "original item" in editor.original_content parsed_elt = li_elt.pubsub.get(node="test", item=item_id) edited_payload = parsed_elt.firstChildElement() expected_edited_content = content.replace("original", "edited") assert edited_payload.name == 'test' assert str(edited_payload) == expected_edited_content def test_affiliations(self, li_json): affiliations = li_json.pubsub.affiliations() assert affiliations["test"] == "owner" def test_uri(self): built_uri = li.pubsub.uri( service="pubsub.example.net", node="some_node" ).strip() assert built_uri == "xmpp:pubsub.example.net?;node=some_node" built_uri = li.pubsub.uri( service="pubsub.example.net", node="some_node", item="some_item" ).strip() assert built_uri == "xmpp:pubsub.example.net?;node=some_node;item=some_item" def test_cache_search(self, li_json): """A Full-Text Search query can be done""" sk_txt = "this is a blog post about Slovakia" fr_txt = "this is a blog post about France" nc_txt = "this is a blog post about New Caledonia" au_txt = "this is a blog post about Australia" li.blog.set( "-t", "travel", "-t", "europe", _in=sk_txt, syntax="markdown" ) li.blog.set( "-t", "travel", "-t", "europe", _in=fr_txt, syntax="markdown" ) li.blog.set( "-t", "travel", "-t", "south pacific", _in=nc_txt, syntax="markdown" ) li.blog.set( "-t", "travel", "-t", "south pacific", _in="this is a blog post about Australia", title=au_txt, syntax="markdown" ) # we get the blog to activate the cache for it li.blog.get(max_items=1) # FTS found = [] for __ in range(5): found = li_json.pubsub.cache.search( type="blog", fts='Slovakia OR "New Caledonia"' ) if found: break else: # retrieving blog triggers the caching, but it's done in parallel # thus we may have nothing in cache yet sleep(0.5) assert len(found) == 2 assert all(i["content"] in (sk_txt, nc_txt) for i in found) # search by field found = 
li_json.pubsub.cache.search( "-F", "tags", "overlap", "travel", type="blog" ) assert len(found) == 4 found = li_json.pubsub.cache.search( "-F", "tags", "overlap", "europe", type="blog" ) assert len(found) == 2 assert all(i["content"] in (sk_txt, fr_txt) for i in found) found = li_json.pubsub.cache.search( "-F", "tags", "ioverlap", "SOUTH PACIFIC", type="blog" ) assert all(i["content"] in (nc_txt, au_txt) for i in found) class TestLiberviaCliBlog: MICROBLOG_NS = "urn:xmpp:microblog:0" def test_set_get(self, li_json): li.blog.set(_in="markdown **bold** [link](https://example.net)") item_data = li_json.blog.get(max=1, before="") item = item_data[0][0] metadata = item_data[1] assert metadata['service'] == "account1@server1.test" assert metadata['node'] == self.MICROBLOG_NS assert metadata['rsm'].keys() <= {"first", "last", "index", "count"} item_id = item['id'] expected_uri = uri.build_xmpp_uri( 'pubsub', subtype="microblog", path="account1@server1.test", node=self.MICROBLOG_NS, item=item_id ) assert item['uri'] == expected_uri assert item['content_xhtml'] == ( '<div><p>markdown <strong>bold</strong> ' '<a href="https://example.net">link</a></p></div>' ) assert isinstance(item['published'], int) assert isinstance(item['updated'], int) assert isinstance(item['comments'], list) assert isinstance(item['tags'], list) assert item['author'] == 'account1' assert item['author_jid'] == 'account1@server1.test' def test_edit(self, editor, li_json): payload_md = "content in **markdown**" editor.set_filter(repr(payload_md)) li.blog.edit(_env=editor.env) assert len(editor.original_content) == 0 assert editor.new_content == payload_md items_data = li_json.blog.get(max_items=1) last_item = items_data[0][0] last_item_id = last_item['id'] assert last_item['content'] == "content in markdown" assert last_item['content_xhtml'] == ( "<div><p>content in <strong>markdown</strong></p></div>" ) editor.set_filter('f"{content} extended"') li.blog.edit("--last-item", _env=editor.env) assert 
editor.original_content == payload_md assert editor.new_content == f"{payload_md} extended" items_data = li_json.blog.get(max_items=1) last_item = items_data[0][0] # we check that the id hasn't been modified assert last_item['id'] == last_item_id assert last_item['content'] == "content in markdown extended" assert last_item['content_xhtml'] == ( "<div><p>content in <strong>markdown</strong> extended</p></div>" ) class TestLiberviaCliFile: def test_upload_get(self, fake_file): source_file = fake_file.size(10240) source_file_hash = fake_file.get_source_hash(source_file) upload_url = li.file.upload(source_file).strip() dest_file = fake_file.new_dest_file() try: li.file.get(upload_url, dest_file=dest_file) dest_file_hash = fake_file.get_dest_hash(dest_file) finally: dest_file.unlink() assert source_file_hash == dest_file_hash def test_send_receive(self, fake_file): source_file = fake_file.size(10240) source_file_hash = fake_file.get_source_hash(source_file) send_cmd = li.file.send(source_file, "account1@server2.test", _bg=True) dest_path = fake_file.dest_files / "test_send_receive" dest_path.mkdir() try: li.file.receive( "account1@server1.test", profile="account1_s2", path=dest_path) dest_file = dest_path / source_file.name dest_file_hash = fake_file.get_dest_hash(dest_file) finally: shutil.rmtree(dest_path) send_cmd.wait() assert source_file_hash == dest_file_hash def test_send_receive_webrtc(self, fake_file): source_file = fake_file.size(10240) source_file_hash = fake_file.get_source_hash(source_file) send_cmd = li.file.send( source_file, "account1@server2.test", webrtc=True, _bg=True ) dest_path = fake_file.dest_files / "test_send_receive" dest_path.mkdir() try: li.file.receive( "account1@server1.test", profile="account1_s2", path=dest_path) dest_file = dest_path / source_file.name dest_file_hash = fake_file.get_dest_hash(dest_file) finally: shutil.rmtree(dest_path) send_cmd.wait() assert source_file_hash == dest_file_hash class TestE2EEncryption: def 
test_pubsub_encryption_oxps(self, li_elt): secret_blog = "this is a secret blog post" node = "e2ee_blog" li.blog.set(_in=secret_blog, node="e2ee_blog", item="test_e2ee", encrypt=True) # the item should be transparently decrypted parsed_decrypted = li_elt.pubsub.get( node=node, item="test_e2ee", no_cache=True ) entry_elt = parsed_decrypted.firstChildElement() assert entry_elt.name == "entry" assert entry_elt.uri == NS_ATOM assert secret_blog in parsed_decrypted.toXml() # with --no-decrypt, we should have the encrypted item parsed_ori_item = li_elt.pubsub.get( node=node, item="test_e2ee", no_decrypt=True, no_cache=True ) encrypted_elt = parsed_ori_item.firstChildElement() assert encrypted_elt.name == "encrypted" assert encrypted_elt.uri == NS_OXPS # the body must not be readable in plain text assert secret_blog not in parsed_ori_item.toXml() def test_pubsub_secrets_sharing_oxps(self, li_elt): secret_blog = "this is a secret blog post" node="secret_sharing" li.blog.set(_in=secret_blog, node=node, item="test_e2ee", encrypt=True) # the item must not be decrypted for account1_s2 (secret is not known) parsed_item = li_elt.pubsub.get( service="account1@server1.test", node=node, item="test_e2ee", no_cache=True, profile="account1_s2" ) encrypted_elt = parsed_item.firstChildElement() assert encrypted_elt.name == "encrypted" assert encrypted_elt.uri == NS_OXPS # the body must not be readable in plain text assert secret_blog not in parsed_item.toXml() # we share the secrets li.pubsub.secret.share("account1@server2.test", service="account1@server1.test", node=node) # and get the item again parsed_item = li_elt.pubsub.get( service="account1@server1.test", node=node, item="test_e2ee", no_cache=True, profile="account1_s2" ) # now it should be decrypted entry_elt = parsed_item.firstChildElement() assert entry_elt.name == "entry" assert entry_elt.uri == NS_ATOM assert secret_blog in parsed_item.toXml() def test_pubsub_signature(self, li_json): """A pubsub item can be signed, and the 
signature can be verified""" body = "this message is signed" service="account1@server1.test" node ="blog_signing" item="signed_item" li.blog.set(_in=body, service=service, node=node, item=item, sign=True) attachments = li_json.pubsub.attachments.get( service=service, node=node, item=item ) assert len(attachments) == 1 attachment = attachments[0] assert attachment["from"] == "account1@server1.test" signature_json = attachment["signature"] sign_data = li_json.pubsub.signature.check( json.dumps(signature_json), service=service, node=node, item=item, ) assert sign_data["signer"] == "account1@server1.test" assert sign_data["validated"] == True assert all(t == "undecided" for t in sign_data["trusts"].values()) def test_jingle_encrypted_transport_jet(self, fake_file): """A file is sent and received properly with JET OMEMO""" # FIXME: transport should be checked to see if content is actually encrypted. # Maybe we can use tcpdump? li.encryption.start("account1@server2.test", name="omemo_legacy") source_file = fake_file.size(10240) source_file_hash = fake_file.get_source_hash(source_file) send_cmd = li.file.send( source_file, "account1@server2.test", encrypt=True, _bg=True, ) dest_path = fake_file.dest_files / "test_send_receive" dest_path.mkdir() try: li.file.receive( "account1@server1.test", profile="account1_s2", path=dest_path, ) dest_file = dest_path / source_file.name dest_file_hash = fake_file.get_dest_hash(dest_file) finally: shutil.rmtree(dest_path) send_cmd.wait() assert source_file_hash == dest_file_hash li.encryption.stop("account1@server2.test") def test_pubsub_targeted_encryption_pte(self, li_elt): """An item is encrypted for specific recipients""" secret_blog = "this is a secret blog post" node = "e2ee_blog" item = "test_pte" li.encryption.start("account1@server2.test", name="omemo") li.encryption.start( "account1@server1.test", name="omemo", profile="account1_s2" ) li.blog.set( _in=secret_blog, node="e2ee_blog", item=item, encrypt_for="account1@server2.test" 
) # the item should be transparently decrypted parsed_decrypted = li_elt.pubsub.get( service="account1@server1.test", node=node, item=item, no_cache=True, profile="account1_s2" ) entry_elt = parsed_decrypted.firstChildElement() assert entry_elt.name == "entry" assert entry_elt.uri == NS_ATOM assert secret_blog in parsed_decrypted.toXml() # with --no-decrypt, we should have the encrypted item parsed_ori_item = li_elt.pubsub.get( node=node, item=item, no_decrypt=True, no_cache=True ) encrypted_elt = parsed_ori_item.firstChildElement() assert encrypted_elt.name == "encrypted" assert encrypted_elt.uri == NS_PTE # the body must not be readable in plain text assert secret_blog not in parsed_ori_item.toXml()