Mercurial > libervia-backend
view tests/e2e/libervia-cli/test_libervia-cli.py @ 4094:c3b68fdc2de7
component AP gateway: fix handling of XMPP comments authors:
the gateway was supposing that comments where emitted from PEP of author. While this is
the case for most blog posts, it's not for comments. Instead the component is now using
`author_jid` which is retrieved by XEP-0277 plugin, and reject the item if the auhor is
not verified (i.e. if `publisher` attribute is not set by XMPP service).
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 12 Jun 2023 14:50:43 +0200 |
parents | 4b842c1fb686 |
children | dfccc90cacc6 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia: an XMPP client # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import json import os import shutil from time import sleep import sh from sh import li import pytest from libervia.backend.plugins.plugin_sec_oxps import NS_OXPS from libervia.backend.plugins.plugin_sec_pte import NS_PTE from libervia.backend.plugins.plugin_xep_0277 import NS_ATOM from libervia.backend.tools.common import uri if os.getenv("LIBERVIA_TEST_ENV_E2E") is None: pytest.skip( "skipping end-to-end tests, we are not in a test environment", allow_module_level=True ) pytestmark = pytest.mark.usefixtures("test_profiles") class TestInstall: def test_li_can_run(self): li("--version") class TestLiberviaCliAccount: def test_create_and_delete(self, li_json): """Create an account in-band, connect it, then delete it and its profile""" li.account.create( "test_create@server1.test", "test", profile="test_create", host="server1.test" ) profiles = li_json.profile.list() assert "test_create" in profiles li.profile.connect(connect=True, profile="test_create") li.account.delete(profile="test_create", force=True) li.profile.delete("test_create", force=True) profiles = li_json.profile.list() assert "test_create" not in profiles @pytest.mark.usefixtures("pubsub_nodes") class TestLiberviaCliPubsub: def test_node_create_info_delete(self): node_name = "tmp_node" with pytest.raises(sh.ErrorReturnCode_16): # the node should not exist li.pubsub.node.info(node=node_name) try: li.pubsub.node.create(node=node_name) # if node exist as expected, following command won't raise an exception metadata = li.pubsub.node.info(node=node_name) assert len(metadata.strip()) finally: li.pubsub.node.delete(node=node_name, force=True) with pytest.raises(sh.ErrorReturnCode_16): # the node should not exist anymore li.pubsub.node.info(node=node_name) def test_set_get_delete_purge(self, li_elt): content = "test item" payload = f"<test>{content}</test>" # we create 3 items and check them item1_id = li.pubsub.set(node="test", quiet=True, _in=payload) item2_id = li.pubsub.set(node="test", quiet=True, _in=payload) item3_id = li.pubsub.set(node="test", quiet=True, _in=payload) parsed_elt = li_elt.pubsub.get(node="test", item=item1_id) payload = parsed_elt.firstChildElement() assert payload.name == 'test' assert str(payload) == content parsed_elt = li_elt.pubsub.get(node="test", item=item2_id) payload = parsed_elt.firstChildElement() assert payload.name == 'test' assert str(payload) == content parsed_elt = li_elt.pubsub.get(node="test", item=item3_id) payload = parsed_elt.firstChildElement() assert payload.name == 'test' assert str(payload) == content # deleting first item should work li.pubsub.delete(node="test", item=item1_id, force=True) with pytest.raises(sh.ErrorReturnCode_16): li.pubsub.get(node="test", item=item1_id) # there must be a least item2 and item3 in the node node_items = li_elt.pubsub.get(node="test") assert len(list(node_items.elements())) >= 2 # after purge, node must be empty li.pubsub.node.purge(node="test", force=True) node_items = li_elt.pubsub.get(node="test") assert len(list(node_items.elements())) == 0 def test_edit(self, editor, li_elt): content = "original item" payload = f"<test>{content}</test>" item_id = li.pubsub.set(node="test", quiet=True, _in=payload) editor.set_filter('content.replace("original", "edited")') li.pubsub.edit(node="test", item=item_id, _env=editor.env) assert "original item" in editor.original_content parsed_elt = li_elt.pubsub.get(node="test", item=item_id) edited_payload = parsed_elt.firstChildElement() expected_edited_content = content.replace("original", "edited") assert edited_payload.name == 'test' assert str(edited_payload) == expected_edited_content def test_affiliations(self, li_json): affiliations = li_json.pubsub.affiliations() assert affiliations["test"] == "owner" def test_uri(self): built_uri = li.pubsub.uri( service="pubsub.example.net", node="some_node" ).strip() assert built_uri == "xmpp:pubsub.example.net?;node=some_node" built_uri = li.pubsub.uri( service="pubsub.example.net", node="some_node", item="some_item" ).strip() assert built_uri == "xmpp:pubsub.example.net?;node=some_node;item=some_item" def test_cache_search(self, li_json): """A Full-Text Search query can be done""" sk_txt = "this is a blog post about Slovakia" fr_txt = "this is a blog post about France" nc_txt = "this is a blog post about New Caledonia" au_txt = "this is a blog post about Australia" li.blog.set( "-t", "travel", "-t", "europe", _in=sk_txt, syntax="markdown" ) li.blog.set( "-t", "travel", "-t", "europe", _in=fr_txt, syntax="markdown" ) li.blog.set( "-t", "travel", "-t", "south pacific", _in=nc_txt, syntax="markdown" ) li.blog.set( "-t", "travel", "-t", "south pacific", _in="this is a blog post about Australia", title=au_txt, syntax="markdown" ) # we get the blog to activate the cache for it li.blog.get(max_items=1) # FTS found = [] for __ in range(5): found = li_json.pubsub.cache.search( type="blog", fts='Slovakia OR "New Caledonia"' ) if found: break else: # retrieving blog triggers the caching, but it's done in parallel # thus we may have nothing in cache yet sleep(0.5) assert len(found) == 2 assert all(i["content"] in (sk_txt, nc_txt) for i in found) # search by field found = li_json.pubsub.cache.search( "-F", "tags", "overlap", "travel", type="blog" ) assert len(found) == 4 found = li_json.pubsub.cache.search( "-F", "tags", "overlap", "europe", type="blog" ) assert len(found) == 2 assert all(i["content"] in (sk_txt, fr_txt) for i in found) found = li_json.pubsub.cache.search( "-F", "tags", "ioverlap", "SOUTH PACIFIC", type="blog" ) assert all(i["content"] in (nc_txt, au_txt) for i in found) class TestLiberviaCliBlog: MICROBLOG_NS = "urn:xmpp:microblog:0" def test_set_get(self, li_json): li.blog.set(_in="markdown **bold** [link](https://example.net)") item_data = li_json.blog.get(max=1, before="") item = item_data[0][0] metadata = item_data[1] assert metadata['service'] == "account1@server1.test" assert metadata['node'] == self.MICROBLOG_NS assert metadata['rsm'].keys() <= {"first", "last", "index", "count"} item_id = item['id'] expected_uri = uri.build_xmpp_uri( 'pubsub', subtype="microblog", path="account1@server1.test", node=self.MICROBLOG_NS, item=item_id ) assert item['uri'] == expected_uri assert item['content_xhtml'] == ( '<div><p>markdown <strong>bold</strong> ' '<a href="https://example.net">link</a></p></div>' ) assert isinstance(item['published'], int) assert isinstance(item['updated'], int) assert isinstance(item['comments'], list) assert isinstance(item['tags'], list) assert item['author'] == 'account1' assert item['author_jid'] == 'account1@server1.test' def test_edit(self, editor, li_json): payload_md = "content in **markdown**" editor.set_filter(repr(payload_md)) li.blog.edit(_env=editor.env) assert len(editor.original_content) == 0 assert editor.new_content == payload_md items_data = li_json.blog.get(max_items=1) last_item = items_data[0][0] last_item_id = last_item['id'] assert last_item['content'] == "content in markdown" assert last_item['content_xhtml'] == ( "<div><p>content in <strong>markdown</strong></p></div>" ) editor.set_filter('f"{content} extended"') li.blog.edit("--last-item", _env=editor.env) assert editor.original_content == payload_md assert editor.new_content == f"{payload_md} extended" items_data = li_json.blog.get(max_items=1) last_item = items_data[0][0] # we check that the id hasn't been modified assert last_item['id'] == last_item_id assert last_item['content'] == "content in markdown extended" assert last_item['content_xhtml'] == ( "<div><p>content in <strong>markdown</strong> extended</p></div>" ) class TestLiberviaCliFile: def test_upload_get(self, fake_file): source_file = fake_file.size(10240) source_file_hash = fake_file.get_source_hash(source_file) upload_url = li.file.upload(source_file).strip() dest_file = fake_file.new_dest_file() try: li.file.get(upload_url, dest_file=dest_file) dest_file_hash = fake_file.get_dest_hash(dest_file) finally: dest_file.unlink() assert source_file_hash == dest_file_hash def test_send_receive(self, fake_file): source_file = fake_file.size(10240) source_file_hash = fake_file.get_source_hash(source_file) send_cmd = li.file.send(source_file, "account1@server2.test", _bg=True) dest_path = fake_file.dest_files / "test_send_receive" dest_path.mkdir() try: li.file.receive( "account1@server1.test", profile="account1_s2", path=dest_path) dest_file = dest_path / source_file.name dest_file_hash = fake_file.get_dest_hash(dest_file) finally: shutil.rmtree(dest_path) send_cmd.wait() assert source_file_hash == dest_file_hash class TestE2EEncryption: def test_pubsub_encryption_oxps(self, li_elt): secret_blog = "this is a secret blog post" node = "e2ee_blog" li.blog.set(_in=secret_blog, node="e2ee_blog", item="test_e2ee", encrypt=True) # the item should be transparently decrypted parsed_decrypted = li_elt.pubsub.get( node=node, item="test_e2ee", no_cache=True ) entry_elt = parsed_decrypted.firstChildElement() assert entry_elt.name == "entry" assert entry_elt.uri == NS_ATOM assert secret_blog in parsed_decrypted.toXml() # with --no-decrypt, we should have the encrypted item parsed_ori_item = li_elt.pubsub.get( node=node, item="test_e2ee", no_decrypt=True, no_cache=True ) encrypted_elt = parsed_ori_item.firstChildElement() assert encrypted_elt.name == "encrypted" assert encrypted_elt.uri == NS_OXPS # the body must not be readable in plain text assert secret_blog not in parsed_ori_item.toXml() def test_pubsub_secrets_sharing_oxps(self, li_elt): secret_blog = "this is a secret blog post" node="secret_sharing" li.blog.set(_in=secret_blog, node=node, item="test_e2ee", encrypt=True) # the item must not be decrypted for account1_s2 (secret is not known) parsed_item = li_elt.pubsub.get( service="account1@server1.test", node=node, item="test_e2ee", no_cache=True, profile="account1_s2" ) encrypted_elt = parsed_item.firstChildElement() assert encrypted_elt.name == "encrypted" assert encrypted_elt.uri == NS_OXPS # the body must not be readable in plain text assert secret_blog not in parsed_item.toXml() # we share the secrets li.pubsub.secret.share("account1@server2.test", service="account1@server1.test", node=node) # and get the item again parsed_item = li_elt.pubsub.get( service="account1@server1.test", node=node, item="test_e2ee", no_cache=True, profile="account1_s2" ) # now it should be decrypted entry_elt = parsed_item.firstChildElement() assert entry_elt.name == "entry" assert entry_elt.uri == NS_ATOM assert secret_blog in parsed_item.toXml() def test_pubsub_signature(self, li_json): """A pubsub item can be signed, and the signature can be verified""" body = "this message is signed" service="account1@server1.test" node ="blog_signing" item="signed_item" li.blog.set(_in=body, service=service, node=node, item=item, sign=True) attachments = li_json.pubsub.attachments.get( service=service, node=node, item=item ) assert len(attachments) == 1 attachment = attachments[0] assert attachment["from"] == "account1@server1.test" signature_json = attachment["signature"] sign_data = li_json.pubsub.signature.check( json.dumps(signature_json), service=service, node=node, item=item, ) assert sign_data["signer"] == "account1@server1.test" assert sign_data["validated"] == True assert all(t == "undecided" for t in sign_data["trusts"].values()) def test_jingle_encrypted_transport_jet(self, fake_file): """A file is sent and received properly with JET OMEMO""" li.encryption.start("account1@server2.test", name="oldmemo") source_file = fake_file.size(10240) source_file_hash = fake_file.get_source_hash(source_file) send_cmd = li.file.send( source_file, "account1@server2.test", encrypt=True, _bg=True ) dest_path = fake_file.dest_files / "test_send_receive" dest_path.mkdir() try: li.file.receive( "account1@server1.test", profile="account1_s2", path=dest_path) dest_file = dest_path / source_file.name dest_file_hash = fake_file.get_dest_hash(dest_file) finally: shutil.rmtree(dest_path) send_cmd.wait() assert source_file_hash == dest_file_hash li.encryption.stop("account1@server2.test") def test_pubsub_targeted_encryption_pte(self, li_elt): """An item is encrypted for specific recipients""" secret_blog = "this is a secret blog post" node = "e2ee_blog" item = "test_pte" li.encryption.start("account1@server2.test", name="twomemo") li.encryption.start( "account1@server1.test", name="twomemo", profile="account1_s2" ) li.blog.set( _in=secret_blog, node="e2ee_blog", item=item, encrypt_for="account1@server2.test" ) # the item should be transparently decrypted parsed_decrypted = li_elt.pubsub.get( service="account1@server1.test", node=node, item=item, no_cache=True, profile="account1_s2" ) entry_elt = parsed_decrypted.firstChildElement() assert entry_elt.name == "entry" assert entry_elt.uri == NS_ATOM assert secret_blog in parsed_decrypted.toXml() # with --no-decrypt, we should have the encrypted item parsed_ori_item = li_elt.pubsub.get( node=node, item=item, no_decrypt=True, no_cache=True ) encrypted_elt = parsed_ori_item.firstChildElement() assert encrypted_elt.name == "encrypted" assert encrypted_elt.uri == NS_PTE # the body must not be readable in plain text assert secret_blog not in parsed_ori_item.toXml()