view tests/e2e/libervia-cli/test_libervia-cli.py @ 4240:79c8a70e1813

backend, frontend: prepare remote control: This is a series of changes necessary to prepare the implementation of the remote control feature: - XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several applications are working in the same session, to know which one must be handled first. Will be used to make Remote Control have precedence over Call content. - XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the benefit of having methods called in parallel is very low, and it causes a lot of trouble as we can't predict the order. Methods are now called sequentially so the workflow can be predicted. - XEP-0167: fix `senders` XMPP attribute <=> SDP mapping - XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so the same key can be used with other jingle applications. - XEP-0167, XEP-0343: move some methods to XEP-0167 - XEP-0353: use new `priority` feature to call preflight methods of applications according to it. - frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism based on Pydantic models. It is now possible to have as many Data Channels as necessary, to have them in addition to A/V streams, to specify manually GStreamer sources and sinks, etc. - frontend (webrtc): rework of the pipeline to reduce latency. - frontend: new `portal_desktop` method. Screenshare portal handling has been moved there, and RemoteDesktop portal has been added. - frontend (webrtc): fix `extract_ufrag_pwd` method. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:41 +0200
parents f59e9421a650
children 4cd4922de876
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import json
import os
import shutil
from time import sleep

import sh
from sh import li

import pytest
from libervia.backend.plugins.plugin_sec_oxps import NS_OXPS
from libervia.backend.plugins.plugin_sec_pte import NS_PTE
from libervia.backend.plugins.plugin_xep_0277 import NS_ATOM
from libervia.backend.tools.common import uri


# The whole module is skipped unless the LIBERVIA_TEST_ENV_E2E environment
# variable is set (whatever its value).
if "LIBERVIA_TEST_ENV_E2E" not in os.environ:
    pytest.skip(
        "skipping end-to-end tests, we are not in a test environment",
        allow_module_level=True,
    )


# request the "test_profiles" fixture for every test of this module
pytestmark = pytest.mark.usefixtures("test_profiles")


class TestInstall:
    """Sanity check of the ``li`` command itself."""

    def test_li_can_run(self):
        """``li --version`` can be executed without error"""
        li("--version")


class TestLiberviaCliAccount:
    """Tests of the ``li account`` and ``li profile`` commands."""

    def test_create_and_delete(self, li_json):
        """Create an account in-band, connect it, then delete it and its profile"""
        profile_name = "test_create"

        li.account.create(
            "test_create@server1.test",
            "test",
            profile=profile_name,
            host="server1.test",
        )
        # the new profile must now be listed
        assert profile_name in li_json.profile.list()

        li.profile.connect(connect=True, profile=profile_name)
        li.account.delete(profile=profile_name, force=True)
        li.profile.delete(profile_name, force=True)

        # and it must be gone once deleted
        assert profile_name not in li_json.profile.list()


@pytest.mark.usefixtures("pubsub_nodes")
class TestLiberviaCliPubsub:

    def test_node_create_info_delete(self):
        node_name = "tmp_node"
        with pytest.raises(sh.ErrorReturnCode_16):
            # the node should not exist
            li.pubsub.node.info(node=node_name)
        try:
            li.pubsub.node.create(node=node_name)
            # if node exist as expected, following command won't raise an exception
            metadata = li.pubsub.node.info(node=node_name)
            assert len(metadata.strip())
        finally:
            li.pubsub.node.delete(node=node_name, force=True)

        with pytest.raises(sh.ErrorReturnCode_16):
            # the node should not exist anymore
            li.pubsub.node.info(node=node_name)

    def test_set_get_delete_purge(self, li_elt):
        content = "test item"
        payload = f"<test>{content}</test>"

        # we create 3 items and check them
        item1_id = li.pubsub.set(node="test", quiet=True, _in=payload)
        item2_id = li.pubsub.set(node="test", quiet=True, _in=payload)
        item3_id = li.pubsub.set(node="test", quiet=True, _in=payload)
        parsed_elt = li_elt.pubsub.get(node="test", item=item1_id)
        payload = parsed_elt.firstChildElement()
        assert payload.name == 'test'
        assert str(payload) == content
        parsed_elt = li_elt.pubsub.get(node="test", item=item2_id)
        payload = parsed_elt.firstChildElement()
        assert payload.name == 'test'
        assert str(payload) == content
        parsed_elt = li_elt.pubsub.get(node="test", item=item3_id)
        payload = parsed_elt.firstChildElement()
        assert payload.name == 'test'
        assert str(payload) == content

        # deleting first item should work
        li.pubsub.delete(node="test", item=item1_id, force=True)
        with pytest.raises(sh.ErrorReturnCode_16):
            li.pubsub.get(node="test", item=item1_id)

        # there must be a least item2 and item3 in the node
        node_items = li_elt.pubsub.get(node="test")
        assert len(list(node_items.elements())) >= 2

        # after purge, node must be empty
        li.pubsub.node.purge(node="test", force=True)
        node_items = li_elt.pubsub.get(node="test")
        assert len(list(node_items.elements())) == 0

    def test_edit(self, editor, li_elt):
        content = "original item"
        payload = f"<test>{content}</test>"
        item_id = li.pubsub.set(node="test", quiet=True, _in=payload)
        editor.set_filter('content.replace("original", "edited")')
        li.pubsub.edit(node="test", item=item_id, _env=editor.env)
        assert "original item" in editor.original_content
        parsed_elt = li_elt.pubsub.get(node="test", item=item_id)
        edited_payload = parsed_elt.firstChildElement()
        expected_edited_content = content.replace("original", "edited")
        assert edited_payload.name == 'test'
        assert str(edited_payload) == expected_edited_content

    def test_affiliations(self, li_json):
        affiliations = li_json.pubsub.affiliations()
        assert affiliations["test"] == "owner"

    def test_uri(self):
        built_uri = li.pubsub.uri(
            service="pubsub.example.net", node="some_node"
        ).strip()
        assert built_uri == "xmpp:pubsub.example.net?;node=some_node"
        built_uri = li.pubsub.uri(
            service="pubsub.example.net", node="some_node", item="some_item"
        ).strip()
        assert built_uri == "xmpp:pubsub.example.net?;node=some_node;item=some_item"

    def test_cache_search(self, li_json):
        """A Full-Text Search query can be done"""
        sk_txt = "this is a blog post about Slovakia"
        fr_txt = "this is a blog post about France"
        nc_txt = "this is a blog post about New Caledonia"
        au_txt = "this is a blog post about Australia"
        li.blog.set(
            "-t", "travel", "-t", "europe",
            _in=sk_txt,
            syntax="markdown"
        )
        li.blog.set(
            "-t", "travel", "-t", "europe",
            _in=fr_txt,
            syntax="markdown"
        )
        li.blog.set(
            "-t", "travel", "-t", "south pacific",
            _in=nc_txt,
            syntax="markdown"
        )
        li.blog.set(
            "-t", "travel", "-t", "south pacific",
            _in="this is a blog post about Australia",
            title=au_txt,
            syntax="markdown"
        )
        # we get the blog to activate the cache for it
        li.blog.get(max_items=1)
        # FTS
        found = []
        for __ in range(5):
            found = li_json.pubsub.cache.search(
                type="blog", fts='Slovakia OR "New Caledonia"'
            )
            if found:
                break
            else:
                # retrieving blog triggers the caching, but it's done in parallel
                # thus we may have nothing in cache yet
                sleep(0.5)
        assert len(found) == 2
        assert all(i["content"] in (sk_txt, nc_txt) for i in found)
        # search by field
        found = li_json.pubsub.cache.search(
            "-F", "tags", "overlap", "travel", type="blog"
        )
        assert len(found) == 4
        found = li_json.pubsub.cache.search(
            "-F", "tags", "overlap", "europe", type="blog"
        )
        assert len(found) == 2
        assert all(i["content"] in (sk_txt, fr_txt) for i in found)
        found = li_json.pubsub.cache.search(
            "-F", "tags", "ioverlap", "SOUTH PACIFIC", type="blog"
        )
        assert all(i["content"] in (nc_txt, au_txt) for i in found)


class TestLiberviaCliBlog:
    """Tests of the ``li blog`` commands."""

    MICROBLOG_NS = "urn:xmpp:microblog:0"

    def test_set_get(self, li_json):
        """A Markdown post can be published, then retrieved with its metadata"""
        owner_jid = "account1@server1.test"

        li.blog.set(_in="markdown **bold** [link](https://example.net)")
        blog_data = li_json.blog.get(max=1, before="")
        # first element holds the items, second one the metadata
        post, metadata = blog_data[0][0], blog_data[1]

        assert metadata["service"] == owner_jid
        assert metadata["node"] == self.MICROBLOG_NS
        assert metadata["rsm"].keys() <= {"first", "last", "index", "count"}

        expected_uri = uri.build_xmpp_uri(
            "pubsub",
            subtype="microblog",
            path=owner_jid,
            node=self.MICROBLOG_NS,
            item=post["id"],
        )
        assert post["uri"] == expected_uri

        expected_xhtml = (
            "<div><p>markdown <strong>bold</strong> "
            '<a href="https://example.net">link</a></p></div>'
        )
        assert post["content_xhtml"] == expected_xhtml

        expected_types = (
            ("published", int),
            ("updated", int),
            ("comments", list),
            ("tags", list),
        )
        for key, expected_type in expected_types:
            assert isinstance(post[key], expected_type)

        assert post["author"] == "account1"
        assert post["author_jid"] == owner_jid

    def test_edit(self, editor, li_json):
        """A post can be created with the editor, then its last item extended"""
        markdown_src = "content in **markdown**"

        # first edition: nothing exists yet, the editor writes the whole post
        editor.set_filter(repr(markdown_src))
        li.blog.edit(_env=editor.env)
        assert len(editor.original_content) == 0
        assert editor.new_content == markdown_src

        created = li_json.blog.get(max_items=1)[0][0]
        created_id = created["id"]
        assert created["content"] == "content in markdown"
        assert created["content_xhtml"] == (
            "<div><p>content in <strong>markdown</strong></p></div>"
        )

        # second edition: the last item is loaded in the editor and extended
        editor.set_filter('f"{content} extended"')
        li.blog.edit("--last-item", _env=editor.env)
        assert editor.original_content == markdown_src
        assert editor.new_content == f"{markdown_src} extended"

        updated = li_json.blog.get(max_items=1)[0][0]
        # we check that the id hasn't been modified
        assert updated["id"] == created_id
        assert updated["content"] == "content in markdown extended"
        assert updated["content_xhtml"] == (
            "<div><p>content in <strong>markdown</strong> extended</p></div>"
        )


class TestLiberviaCliFile:
    """Tests of the ``li file`` commands (upload/get and send/receive)."""

    def _check_send_receive(self, fake_file, **send_options):
        """Send a file to ``account1@server2.test`` and check what is received

        @param fake_file: fixture used to generate the file and compute hashes
        @param send_options: extra options given to ``li file send``
        """
        source_file = fake_file.size(10240)
        source_file_hash = fake_file.get_source_hash(source_file)
        # the sender runs in background while the other profile receives
        send_cmd = li.file.send(
            source_file, "account1@server2.test", _bg=True, **send_options
        )
        dest_path = fake_file.dest_files / "test_send_receive"
        dest_path.mkdir()
        try:
            li.file.receive(
                "account1@server1.test", profile="account1_s2", path=dest_path
            )
            dest_file_hash = fake_file.get_dest_hash(dest_path / source_file.name)
        finally:
            shutil.rmtree(dest_path)
        send_cmd.wait()

        assert source_file_hash == dest_file_hash

    def test_upload_get(self, fake_file):
        """An uploaded file can be downloaded again, with identical content"""
        uploaded = fake_file.size(10240)
        uploaded_hash = fake_file.get_source_hash(uploaded)
        url = li.file.upload(uploaded).strip()

        downloaded = fake_file.new_dest_file()
        try:
            li.file.get(url, dest_file=downloaded)
            downloaded_hash = fake_file.get_dest_hash(downloaded)
        finally:
            downloaded.unlink()

        assert uploaded_hash == downloaded_hash

    def test_send_receive(self, fake_file):
        """A file sent with default options is received unmodified"""
        self._check_send_receive(fake_file)

    def test_send_receive_webrtc(self, fake_file):
        """A file sent with the ``webrtc`` option is received unmodified"""
        self._check_send_receive(fake_file, webrtc=True)



class TestE2EEncryption:
    """Tests of end-to-end encryption features used through the CLI."""

    def test_pubsub_encryption_oxps(self, li_elt):
        """An encrypted blog item is decrypted on retrieval, unless asked not to"""
        secret_blog = "this is a secret blog post"
        node = "e2ee_blog"
        li.blog.set(_in=secret_blog, node=node, item="test_e2ee", encrypt=True)

        # the item should be transparently decrypted
        parsed_decrypted = li_elt.pubsub.get(
            node=node, item="test_e2ee", no_cache=True
        )
        entry_elt = parsed_decrypted.firstChildElement()
        assert entry_elt.name == "entry"
        assert entry_elt.uri == NS_ATOM
        assert secret_blog in parsed_decrypted.toXml()

        # with --no-decrypt, we should have the encrypted item
        parsed_ori_item = li_elt.pubsub.get(
            node=node, item="test_e2ee", no_decrypt=True, no_cache=True
        )
        encrypted_elt = parsed_ori_item.firstChildElement()
        assert encrypted_elt.name == "encrypted"
        assert encrypted_elt.uri == NS_OXPS
        # the body must not be readable in plain text
        assert secret_blog not in parsed_ori_item.toXml()

    def test_pubsub_secrets_sharing_oxps(self, li_elt):
        """An encrypted item becomes readable by a peer once secrets are shared"""
        secret_blog = "this is a secret blog post"
        node = "secret_sharing"

        li.blog.set(_in=secret_blog, node=node, item="test_e2ee", encrypt=True)

        # the item must not be decrypted for account1_s2 (secret is not known)
        parsed_item = li_elt.pubsub.get(
            service="account1@server1.test", node=node, item="test_e2ee", no_cache=True,
            profile="account1_s2"
        )
        encrypted_elt = parsed_item.firstChildElement()
        assert encrypted_elt.name == "encrypted"
        assert encrypted_elt.uri == NS_OXPS
        # the body must not be readable in plain text
        assert secret_blog not in parsed_item.toXml()

        # we share the secrets
        li.pubsub.secret.share(
            "account1@server2.test", service="account1@server1.test", node=node
        )

        # and get the item again
        parsed_item = li_elt.pubsub.get(
            service="account1@server1.test", node=node, item="test_e2ee", no_cache=True,
            profile="account1_s2"
        )
        # now it should be decrypted
        entry_elt = parsed_item.firstChildElement()
        assert entry_elt.name == "entry"
        assert entry_elt.uri == NS_ATOM
        assert secret_blog in parsed_item.toXml()

    def test_pubsub_signature(self, li_json):
        """A pubsub item can be signed, and the signature can be verified"""
        body = "this message is signed"
        service = "account1@server1.test"
        node = "blog_signing"
        item = "signed_item"
        li.blog.set(_in=body, service=service, node=node, item=item, sign=True)
        attachments = li_json.pubsub.attachments.get(
            service=service, node=node, item=item
        )
        assert len(attachments) == 1
        attachment = attachments[0]
        assert attachment["from"] == "account1@server1.test"
        signature_json = attachment["signature"]
        # the signature is given back to the CLI serialised as JSON
        sign_data = li_json.pubsub.signature.check(
            json.dumps(signature_json), service=service, node=node, item=item,
        )
        assert sign_data["signer"] == "account1@server1.test"
        # identity check: only a real boolean ``True`` is accepted
        assert sign_data["validated"] is True
        assert all(t == "undecided" for t in sign_data["trusts"].values())

    def test_jingle_encrypted_transport_jet(self, fake_file):
        """A file is sent and received properly with JET OMEMO"""
        # FIXME: transport should be checked to see if content is actually encrypted.
        #  Maybe we can use tcpdump?
        li.encryption.start("account1@server2.test", name="omemo_legacy")
        try:
            source_file = fake_file.size(10240)
            source_file_hash = fake_file.get_source_hash(source_file)
            # the sender runs in background while the other profile receives
            send_cmd = li.file.send(
                source_file, "account1@server2.test", encrypt=True, _bg=True,
            )
            dest_path = fake_file.dest_files / "test_send_receive"
            dest_path.mkdir()
            try:
                li.file.receive(
                    "account1@server1.test", profile="account1_s2", path=dest_path,
                )
                dest_file = dest_path / source_file.name
                dest_file_hash = fake_file.get_dest_hash(dest_file)
            finally:
                shutil.rmtree(dest_path)
            send_cmd.wait()

            assert source_file_hash == dest_file_hash
        finally:
            # the encryption session is stopped even if the transfer or the check
            # failed, so it doesn't stay active for the following tests
            li.encryption.stop("account1@server2.test")

    def test_pubsub_targeted_encryption_pte(self, li_elt):
        """An item is encrypted for specific recipients"""
        secret_blog = "this is a secret blog post"
        node = "e2ee_blog"
        item = "test_pte"
        # an OMEMO session is started in both directions before publishing
        li.encryption.start("account1@server2.test", name="omemo")
        li.encryption.start(
            "account1@server1.test", name="omemo", profile="account1_s2"
        )
        li.blog.set(
            _in=secret_blog, node=node, item=item,
            encrypt_for="account1@server2.test"
        )

        # the item should be transparently decrypted
        parsed_decrypted = li_elt.pubsub.get(
            service="account1@server1.test", node=node, item=item, no_cache=True,
            profile="account1_s2"
        )
        entry_elt = parsed_decrypted.firstChildElement()
        assert entry_elt.name == "entry"
        assert entry_elt.uri == NS_ATOM
        assert secret_blog in parsed_decrypted.toXml()

        # with --no-decrypt, we should have the encrypted item
        parsed_ori_item = li_elt.pubsub.get(
            node=node, item=item, no_decrypt=True, no_cache=True
        )
        encrypted_elt = parsed_ori_item.firstChildElement()
        assert encrypted_elt.name == "encrypted"
        assert encrypted_elt.uri == NS_PTE
        # the body must not be readable in plain text
        assert secret_blog not in parsed_ori_item.toXml()