view tests/e2e/libervia-cli/test_libervia-cli.py @ 4242:8acf46ed7f36

frontends: remote control implementation: This is the frontends common part of the remote control implementation. It handles the creation of the WebRTC session, and the management of inputs. For now the reception uses the freedesktop.org Desktop portal, and works mostly with Wayland-based Desktop Environments. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:43 +0200
parents f59e9421a650
children 4cd4922de876
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import json
import os
import shutil
from time import sleep

import sh
from sh import li

import pytest
from libervia.backend.plugins.plugin_sec_oxps import NS_OXPS
from libervia.backend.plugins.plugin_sec_pte import NS_PTE
from libervia.backend.plugins.plugin_xep_0277 import NS_ATOM
from libervia.backend.tools.common import uri


# These tests are only run when LIBERVIA_TEST_ENV_E2E is set in the environment;
# otherwise the whole module is skipped at collection time.
if "LIBERVIA_TEST_ENV_E2E" not in os.environ:
    pytest.skip(
        "skipping end-to-end tests, we are not in a test environment",
        allow_module_level=True,
    )


# apply the "test_profiles" fixture to every test of this module
pytestmark = pytest.mark.usefixtures("test_profiles")


class TestInstall:
    """Smoke test of the ``li`` command itself."""

    def test_li_can_run(self):
        """``li --version`` must run without raising an error."""
        version_flag = "--version"
        li(version_flag)


class TestLiberviaCliAccount:
    """Tests of the ``li account`` and ``li profile`` commands."""

    def test_create_and_delete(self, li_json):
        """Create an account in-band, connect it, then delete it and its profile"""
        profile_name = "test_create"

        li.account.create(
            "test_create@server1.test",
            "test",
            profile=profile_name,
            host="server1.test",
        )
        # the profile must be listed once the account is created
        assert profile_name in li_json.profile.list()

        li.profile.connect(connect=True, profile=profile_name)
        li.account.delete(profile=profile_name, force=True)
        li.profile.delete(profile_name, force=True)

        # and it must be gone after deletion
        assert profile_name not in li_json.profile.list()


@pytest.mark.usefixtures("pubsub_nodes")
class TestLiberviaCliPubsub:

    def test_node_create_info_delete(self):
        node_name = "tmp_node"
        with pytest.raises(sh.ErrorReturnCode_16):
            # the node should not exist
            li.pubsub.node.info(node=node_name)
        try:
            li.pubsub.node.create(node=node_name)
            # if node exist as expected, following command won't raise an exception
            metadata = li.pubsub.node.info(node=node_name)
            assert len(metadata.strip())
        finally:
            li.pubsub.node.delete(node=node_name, force=True)

        with pytest.raises(sh.ErrorReturnCode_16):
            # the node should not exist anymore
            li.pubsub.node.info(node=node_name)

    def test_set_get_delete_purge(self, li_elt):
        content = "test item"
        payload = f"<test>{content}</test>"

        # we create 3 items and check them
        item1_id = li.pubsub.set(node="test", quiet=True, _in=payload)
        item2_id = li.pubsub.set(node="test", quiet=True, _in=payload)
        item3_id = li.pubsub.set(node="test", quiet=True, _in=payload)
        parsed_elt = li_elt.pubsub.get(node="test", item=item1_id)
        payload = parsed_elt.firstChildElement()
        assert payload.name == 'test'
        assert str(payload) == content
        parsed_elt = li_elt.pubsub.get(node="test", item=item2_id)
        payload = parsed_elt.firstChildElement()
        assert payload.name == 'test'
        assert str(payload) == content
        parsed_elt = li_elt.pubsub.get(node="test", item=item3_id)
        payload = parsed_elt.firstChildElement()
        assert payload.name == 'test'
        assert str(payload) == content

        # deleting first item should work
        li.pubsub.delete(node="test", item=item1_id, force=True)
        with pytest.raises(sh.ErrorReturnCode_16):
            li.pubsub.get(node="test", item=item1_id)

        # there must be a least item2 and item3 in the node
        node_items = li_elt.pubsub.get(node="test")
        assert len(list(node_items.elements())) >= 2

        # after purge, node must be empty
        li.pubsub.node.purge(node="test", force=True)
        node_items = li_elt.pubsub.get(node="test")
        assert len(list(node_items.elements())) == 0

    def test_edit(self, editor, li_elt):
        content = "original item"
        payload = f"<test>{content}</test>"
        item_id = li.pubsub.set(node="test", quiet=True, _in=payload)
        editor.set_filter('content.replace("original", "edited")')
        li.pubsub.edit(node="test", item=item_id, _env=editor.env)
        assert "original item" in editor.original_content
        parsed_elt = li_elt.pubsub.get(node="test", item=item_id)
        edited_payload = parsed_elt.firstChildElement()
        expected_edited_content = content.replace("original", "edited")
        assert edited_payload.name == 'test'
        assert str(edited_payload) == expected_edited_content

    def test_affiliations(self, li_json):
        affiliations = li_json.pubsub.affiliations()
        assert affiliations["test"] == "owner"

    def test_uri(self):
        built_uri = li.pubsub.uri(
            service="pubsub.example.net", node="some_node"
        ).strip()
        assert built_uri == "xmpp:pubsub.example.net?;node=some_node"
        built_uri = li.pubsub.uri(
            service="pubsub.example.net", node="some_node", item="some_item"
        ).strip()
        assert built_uri == "xmpp:pubsub.example.net?;node=some_node;item=some_item"

    def test_cache_search(self, li_json):
        """A Full-Text Search query can be done"""
        sk_txt = "this is a blog post about Slovakia"
        fr_txt = "this is a blog post about France"
        nc_txt = "this is a blog post about New Caledonia"
        au_txt = "this is a blog post about Australia"
        li.blog.set(
            "-t", "travel", "-t", "europe",
            _in=sk_txt,
            syntax="markdown"
        )
        li.blog.set(
            "-t", "travel", "-t", "europe",
            _in=fr_txt,
            syntax="markdown"
        )
        li.blog.set(
            "-t", "travel", "-t", "south pacific",
            _in=nc_txt,
            syntax="markdown"
        )
        li.blog.set(
            "-t", "travel", "-t", "south pacific",
            _in="this is a blog post about Australia",
            title=au_txt,
            syntax="markdown"
        )
        # we get the blog to activate the cache for it
        li.blog.get(max_items=1)
        # FTS
        found = []
        for __ in range(5):
            found = li_json.pubsub.cache.search(
                type="blog", fts='Slovakia OR "New Caledonia"'
            )
            if found:
                break
            else:
                # retrieving blog triggers the caching, but it's done in parallel
                # thus we may have nothing in cache yet
                sleep(0.5)
        assert len(found) == 2
        assert all(i["content"] in (sk_txt, nc_txt) for i in found)
        # search by field
        found = li_json.pubsub.cache.search(
            "-F", "tags", "overlap", "travel", type="blog"
        )
        assert len(found) == 4
        found = li_json.pubsub.cache.search(
            "-F", "tags", "overlap", "europe", type="blog"
        )
        assert len(found) == 2
        assert all(i["content"] in (sk_txt, fr_txt) for i in found)
        found = li_json.pubsub.cache.search(
            "-F", "tags", "ioverlap", "SOUTH PACIFIC", type="blog"
        )
        assert all(i["content"] in (nc_txt, au_txt) for i in found)


class TestLiberviaCliBlog:
    """Tests of the ``li blog`` commands (publication, retrieval and edition)."""

    MICROBLOG_NS = "urn:xmpp:microblog:0"

    def test_set_get(self, li_json):
        """Publish a blog post, then check the item and metadata returned by get"""
        author_jid = "account1@server1.test"
        allowed_rsm_keys = {"first", "last", "index", "count"}

        li.blog.set(_in="markdown **bold** [link](https://example.net)")
        item_data = li_json.blog.get(max=1, before="")
        item = item_data[0][0]
        metadata = item_data[1]

        # metadata of the request
        assert metadata["service"] == author_jid
        assert metadata["node"] == self.MICROBLOG_NS
        assert metadata["rsm"].keys() <= allowed_rsm_keys

        # the item URI must point to the published item
        expected_uri = uri.build_xmpp_uri(
            "pubsub",
            subtype="microblog",
            path=author_jid,
            node=self.MICROBLOG_NS,
            item=item["id"],
        )
        assert item["uri"] == expected_uri

        expected_xhtml = (
            "<div><p>markdown <strong>bold</strong> "
            '<a href="https://example.net">link</a></p></div>'
        )
        assert item["content_xhtml"] == expected_xhtml

        # type of the remaining fields
        expected_types = (
            ("published", int),
            ("updated", int),
            ("comments", list),
            ("tags", list),
        )
        for key, expected_type in expected_types:
            assert isinstance(item[key], expected_type)

        assert item["author"] == "account1"
        assert item["author_jid"] == author_jid

    def test_edit(self, editor, li_json):
        """Create a post with the editor, then extend it with ``--last-item``"""

        def get_last_item():
            # first item of the first element returned by ``li blog get``
            return li_json.blog.get(max_items=1)[0][0]

        payload_md = "content in **markdown**"

        # first edition: the editor starts empty and the filter sets the content
        editor.set_filter(repr(payload_md))
        li.blog.edit(_env=editor.env)
        assert len(editor.original_content) == 0
        assert editor.new_content == payload_md
        created_item = get_last_item()
        created_item_id = created_item["id"]
        assert created_item["content"] == "content in markdown"
        assert created_item["content_xhtml"] == (
            "<div><p>content in <strong>markdown</strong></p></div>"
        )

        # second edition: the last item is loaded in the editor and extended
        editor.set_filter('f"{content} extended"')
        li.blog.edit("--last-item", _env=editor.env)
        assert editor.original_content == payload_md
        assert editor.new_content == f"{payload_md} extended"
        updated_item = get_last_item()
        # we check that the id hasn't been modified
        assert updated_item["id"] == created_item_id
        assert updated_item["content"] == "content in markdown extended"
        assert updated_item["content_xhtml"] == (
            "<div><p>content in <strong>markdown</strong> extended</p></div>"
        )


class TestLiberviaCliFile:
    """Tests of the ``li file`` commands (upload/get and send/receive)."""

    def _check_send_receive(self, fake_file, dest_dir_name, **send_options):
        """Send a file between the two test accounts and check what is received.

        The file is sent in background, then received with the ``account1_s2``
        profile, and the hashes of source and received files are compared.

        @param fake_file: fixture generating files and computing their hashes
        @param dest_dir_name: name of the directory (created under
            ``fake_file.dest_files``) where the file is received; it is removed
            before returning
        @param send_options: extra options given to ``li file send``
        """
        source_file = fake_file.size(10240)
        source_file_hash = fake_file.get_source_hash(source_file)
        send_cmd = li.file.send(
            source_file, "account1@server2.test", _bg=True, **send_options
        )
        dest_path = fake_file.dest_files / dest_dir_name
        try:
            dest_path.mkdir()
            try:
                li.file.receive(
                    "account1@server1.test", profile="account1_s2", path=dest_path
                )
                dest_file = dest_path / source_file.name
                dest_file_hash = fake_file.get_dest_hash(dest_file)
            finally:
                shutil.rmtree(dest_path)
        except BaseException:
            # reception failed: we don't leave the background sender running
            if send_cmd.is_alive():
                send_cmd.terminate()
            raise
        send_cmd.wait()

        assert source_file_hash == dest_file_hash

    def test_upload_get(self, fake_file):
        """An uploaded file can be downloaded, and its content is unchanged"""
        source_file = fake_file.size(10240)
        source_file_hash = fake_file.get_source_hash(source_file)
        upload_url = li.file.upload(source_file).strip()

        dest_file = fake_file.new_dest_file()
        try:
            li.file.get(upload_url, dest_file=dest_file)
            dest_file_hash = fake_file.get_dest_hash(dest_file)
        finally:
            # the file may not exist if the download failed, and we don't want
            # to hide the original error with a FileNotFoundError
            # NOTE(review): assumes dest_file is a pathlib.Path -- confirm
            dest_file.unlink(missing_ok=True)

        assert source_file_hash == dest_file_hash

    def test_send_receive(self, fake_file):
        """A file is sent and received properly"""
        self._check_send_receive(fake_file, "test_send_receive")

    def test_send_receive_webrtc(self, fake_file):
        """A file is sent and received properly when ``webrtc`` option is used"""
        self._check_send_receive(fake_file, "test_send_receive_webrtc", webrtc=True)



class TestE2EEncryption:
    """Tests of end-to-end encryption features used through ``li``."""

    def test_pubsub_encryption_oxps(self, li_elt):
        """An item encrypted with OXPS is decrypted transparently by its author"""
        secret_blog = "this is a secret blog post"
        node = "e2ee_blog"
        li.blog.set(_in=secret_blog, node=node, item="test_e2ee", encrypt=True)

        # the item should be transparently decrypted
        parsed_decrypted = li_elt.pubsub.get(
            node=node, item="test_e2ee", no_cache=True
        )
        entry_elt = parsed_decrypted.firstChildElement()
        assert entry_elt.name == "entry"
        assert entry_elt.uri == NS_ATOM
        assert secret_blog in parsed_decrypted.toXml()

        # with --no-decrypt, we should have the encrypted item
        parsed_ori_item = li_elt.pubsub.get(
            node=node, item="test_e2ee", no_decrypt=True, no_cache=True
        )
        encrypted_elt = parsed_ori_item.firstChildElement()
        assert encrypted_elt.name == "encrypted"
        assert encrypted_elt.uri == NS_OXPS
        # the body must not be readable in plain text
        assert secret_blog not in parsed_ori_item.toXml()

    def test_pubsub_secrets_sharing_oxps(self, li_elt):
        """An encrypted item can be read by another entity once secrets are shared"""
        secret_blog = "this is a secret blog post"
        node = "secret_sharing"

        li.blog.set(_in=secret_blog, node=node, item="test_e2ee", encrypt=True)

        # the item must not be decrypted for account1_s2 (secret is not known)
        parsed_item = li_elt.pubsub.get(
            service="account1@server1.test", node=node, item="test_e2ee", no_cache=True,
            profile="account1_s2"
        )
        encrypted_elt = parsed_item.firstChildElement()
        assert encrypted_elt.name == "encrypted"
        assert encrypted_elt.uri == NS_OXPS
        # the body must not be readable in plain text
        assert secret_blog not in parsed_item.toXml()

        # we share the secrets
        li.pubsub.secret.share(
            "account1@server2.test", service="account1@server1.test", node=node
        )

        # and get the item again
        parsed_item = li_elt.pubsub.get(
            service="account1@server1.test", node=node, item="test_e2ee", no_cache=True,
            profile="account1_s2"
        )
        # now it should be decrypted
        entry_elt = parsed_item.firstChildElement()
        assert entry_elt.name == "entry"
        assert entry_elt.uri == NS_ATOM
        assert secret_blog in parsed_item.toXml()

    def test_pubsub_signature(self, li_json):
        """A pubsub item can be signed, and the signature can be verified"""
        body = "this message is signed"
        service = "account1@server1.test"
        node = "blog_signing"
        item = "signed_item"
        li.blog.set(_in=body, service=service, node=node, item=item, sign=True)
        attachments = li_json.pubsub.attachments.get(
            service=service, node=node, item=item
        )
        assert len(attachments) == 1
        attachment = attachments[0]
        assert attachment["from"] == "account1@server1.test"
        signature_json = attachment["signature"]
        sign_data = li_json.pubsub.signature.check(
            json.dumps(signature_json), service=service, node=node, item=item,
        )
        assert sign_data["signer"] == "account1@server1.test"
        # the validation flag must be the boolean True, not merely a truthy value
        assert sign_data["validated"] is True
        assert all(t == "undecided" for t in sign_data["trusts"].values())

    def test_jingle_encrypted_transport_jet(self, fake_file):
        """A file is sent and received properly with JET OMEMO"""
        # FIXME: transport should be checked to see if content is actually encrypted.
        #  Maybe we can use tcpdump?
        li.encryption.start("account1@server2.test", name="omemo_legacy")
        try:
            source_file = fake_file.size(10240)
            source_file_hash = fake_file.get_source_hash(source_file)
            send_cmd = li.file.send(
                source_file, "account1@server2.test", encrypt=True, _bg=True,
            )
            dest_path = fake_file.dest_files / "test_send_receive"
            dest_path.mkdir()
            try:
                li.file.receive(
                    "account1@server1.test", profile="account1_s2", path=dest_path,
                )
                dest_file = dest_path / source_file.name
                dest_file_hash = fake_file.get_dest_hash(dest_file)
            finally:
                shutil.rmtree(dest_path)
            send_cmd.wait()

            assert source_file_hash == dest_file_hash
        finally:
            # the encryption session is stopped even if the test fails, so it is
            # not left active for the following tests
            li.encryption.stop("account1@server2.test")

    def test_pubsub_targeted_encryption_pte(self, li_elt):
        """An item is encrypted for specific recipients"""
        secret_blog = "this is a secret blog post"
        node = "e2ee_blog"
        item = "test_pte"
        li.encryption.start("account1@server2.test", name="omemo")
        li.encryption.start(
            "account1@server1.test", name="omemo", profile="account1_s2"
        )
        li.blog.set(
            _in=secret_blog, node=node, item=item,
            encrypt_for="account1@server2.test"
        )

        # the item should be transparently decrypted
        parsed_decrypted = li_elt.pubsub.get(
            service="account1@server1.test", node=node, item=item, no_cache=True,
            profile="account1_s2"
        )
        entry_elt = parsed_decrypted.firstChildElement()
        assert entry_elt.name == "entry"
        assert entry_elt.uri == NS_ATOM
        assert secret_blog in parsed_decrypted.toXml()

        # with --no-decrypt, we should have the encrypted item
        parsed_ori_item = li_elt.pubsub.get(
            node=node, item=item, no_decrypt=True, no_cache=True
        )
        encrypted_elt = parsed_ori_item.firstChildElement()
        assert encrypted_elt.name == "encrypted"
        assert encrypted_elt.uri == NS_PTE
        # the body must not be readable in plain text
        assert secret_blog not in parsed_ori_item.toXml()