view tests/unit/test_plugin_xep_0167.py @ 4237:a1e7e82a8921 default tip @

core: implement SCRAM-SHA auth algorithm: Twisted auth mechanism are outdated, and as a result, Libervia was not supporting the mandatory SCRAM-SHA auth mechanism. This patch implements it for SCRAM-SHA-1, SCRAM-SHA-256 and SCRAM-SHA-512 variants.
author Goffi <goffi@goffi.org>
date Mon, 08 Apr 2024 12:29:40 +0200
parents 716dd791be46
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
from unittest.mock import MagicMock, patch

from pytest import fixture
from pytest import raises
from twisted.words.protocols.jabber import jid

from libervia.backend.plugins.plugin_xep_0166 import XEP_0166
from libervia.backend.plugins.plugin_xep_0167 import XEP_0167, mapping
from libervia.backend.plugins.plugin_xep_0167.constants import NS_JINGLE_RTP, NS_JINGLE_RTP_INFO
from libervia.backend.tools import xml_tools
from libervia.backend.tools.common import data_format


@fixture(autouse=True)
def no_application_register(monkeypatch):
    """Do not register the application in XEP-0166"""
    monkeypatch.setattr(XEP_0166, "register_application", lambda *a, **kw: None)


class TestXEP0167Mapping:
    @fixture(scope="class", autouse=True)
    def set_mapping_host(self, host):
        mapping.host = host

    def test_senders_to_sdp(self):
        """Senders are mapped to SDP attribute"""
        assert mapping.senders_to_sdp("both", {"role": "initiator"}) == "a=sendrecv"
        assert mapping.senders_to_sdp("none", {"role": "initiator"}) == "a=inactive"
        assert mapping.senders_to_sdp("initiator", {"role": "initiator"}) == "a=sendonly"
        assert mapping.senders_to_sdp("responder", {"role": "initiator"}) == "a=recvonly"

    def test_generate_sdp_from_session(self):
        """SDP is correctly generated from session data"""
        session = {
            "local_jid": jid.JID("toto@example.org/test"),
            "metadata": {},
            "contents": {
                "audio": {
                    "application_data": {
                        "media": "audio",
                        "local_data": {
                            "payload_types": {
                                96: {
                                    "name": "opus",
                                    "clockrate": 48000,
                                    "parameters": {"sprop-stereo": "1"},
                                }
                            }
                        },
                    },
                    "transport_data": {
                        "local_ice_data": {
                            "ufrag": "ufrag",
                            "pwd": "pwd",
                            "candidates": [
                                {
                                    "foundation": "1",
                                    "component_id": 1,
                                    "transport": "UDP",
                                    "priority": 1,
                                    "address": "10.0.0.1",
                                    "port": 12345,
                                    "type": "host",
                                }
                            ],
                        }
                    },
                    "senders": "both",
                }
            },
        }

        expected_sdp = (
            "v=0\r\n"
            f"o={base64.b64encode('toto@example.org/test'.encode()).decode()} 1 1 IN IP4 0.0.0.0\r\n"
            "s=-\r\n"
            "t=0 0\r\n"
            "a=msid-semantic:WMS *\r\n"
            "a=ice-options:trickle\r\n"
            "m=audio 9 UDP/TLS/RTP/SAVPF 96\r\n"
            "c=IN IP4 0.0.0.0\r\n"
            "a=mid:audio\r\n"
            "a=sendrecv\r\n"
            "a=rtpmap:96 opus/48000\r\n"
            "a=fmtp:96 sprop-stereo=1\r\n"
            "a=ice-ufrag:ufrag\r\n"
            "a=ice-pwd:pwd\r\n"
            "a=candidate:1 1 UDP 1 10.0.0.1 12345 typ host\r\n"
        )

        assert mapping.generate_sdp_from_session(session, True) == expected_sdp

    def test_parse_sdp(self):
        """SDP is correctly parsed to session data"""
        sdp = (
            "v=0\r\n"
            "o=toto@example.org/test 1 1 IN IP4 0.0.0.0\r\n"
            "s=-\r\n"
            "t=0 0\r\n"
            "a=sendrecv\r\n"
            "a=msid-semantic:WMS *\r\n"
            "m=audio 9999 UDP/TLS/RTP/SAVPF 96\r\n"
            "c=IN IP4 0.0.0.0\r\n"
            "a=mid:audio\r\n"
            "a=rtpmap:96 opus/48000\r\n"
            "a=fmtp:96 sprop-stereo=1\r\n"
            "a=ice-ufrag:ufrag\r\n"
            "a=ice-pwd:pwd\r\n"
            "a=candidate:1 1 UDP 1 10.0.0.1 12345 typ host\r\n"
        )

        expected_session = {
            "audio": {
                "application_data": {
                    "media": "audio",
                    "payload_types": {
                        96: {
                            "id": 96,
                            "name": "opus",
                            "clockrate": 48000,
                            "parameters": {"sprop-stereo": "1"},
                        }
                    },
                },
                "transport_data": {
                    "port": 9999,
                    "pwd": "pwd",
                    "ufrag": "ufrag",
                    "candidates": [
                        {
                            "foundation": "1",
                            "component_id": 1,
                            "transport": "UDP",
                            "priority": 1,
                            "address": "10.0.0.1",
                            "port": 12345,
                            "type": "host",
                        }
                    ],
                },
                "id": "audio",
            },
            "metadata": {},
        }

        assert mapping.parse_sdp(sdp) == expected_session

    def test_build_description(self):
        """<description> element is generated from media data"""
        session = {"metadata": {}}

        media_data = {
            "payload_types": {
                96: {
                    "channels": "2",
                    "clockrate": "48000",
                    "id": "96",
                    "maxptime": "60",
                    "name": "opus",
                    "ptime": "20",
                    "parameters": {"sprop-stereo": "1"},
                }
            },
            "bandwidth": "AS:40000",
            "rtcp-mux": True,
            "encryption": [
                {
                    "tag": "1",
                    "crypto-suite": "AES_CM_128_HMAC_SHA1_80",
                    "key-params": "inline:DPSKYRle84Ua2MjbScjadpCvFQH5Tutuls2N4/xx",
                    "session-params": "",
                }
            ],
        }

        description_element = mapping.build_description("audio", media_data, session)

        # Assertions
        assert description_element.name == "description"
        assert description_element.uri == NS_JINGLE_RTP
        assert description_element["media"] == "audio"

        # Payload types
        payload_types = list(description_element.elements(NS_JINGLE_RTP, "payload-type"))
        assert len(payload_types) == 1
        assert payload_types[0].name == "payload-type"
        assert payload_types[0]["id"] == "96"
        assert payload_types[0]["channels"] == "2"
        assert payload_types[0]["clockrate"] == "48000"
        assert payload_types[0]["maxptime"] == "60"
        assert payload_types[0]["name"] == "opus"
        assert payload_types[0]["ptime"] == "20"

        # Parameters
        parameters = list(payload_types[0].elements(NS_JINGLE_RTP, "parameter"))
        assert len(parameters) == 1
        assert parameters[0].name == "parameter"
        assert parameters[0]["name"] == "sprop-stereo"
        assert parameters[0]["value"] == "1"

        # Bandwidth
        bandwidth = list(description_element.elements(NS_JINGLE_RTP, "bandwidth"))
        assert len(bandwidth) == 1
        assert bandwidth[0]["type"] == "AS:40000"

        # RTCP-mux
        rtcp_mux = list(description_element.elements(NS_JINGLE_RTP, "rtcp-mux"))
        assert len(rtcp_mux) == 1

        # Encryption
        encryption = list(description_element.elements(NS_JINGLE_RTP, "encryption"))
        assert len(encryption) == 1
        assert encryption[0]["required"] == "1"
        crypto = list(encryption[0].elements("crypto"))
        assert len(crypto) == 1
        assert crypto[0]["tag"] == "1"
        assert crypto[0]["crypto-suite"] == "AES_CM_128_HMAC_SHA1_80"
        assert (
            crypto[0]["key-params"] == "inline:DPSKYRle84Ua2MjbScjadpCvFQH5Tutuls2N4/xx"
        )
        assert crypto[0]["session-params"] == ""

    def test_parse_description(self):
        """Parsing <description> to a dict is successful"""
        description_element = xml_tools.parse(
            """
            <description xmlns="urn:xmpp:jingle:apps:rtp:1" media="audio">
                <payload-type id="96" channels="2" clockrate="48000" maxptime="60" name="opus" ptime="20">
                    <parameter name="sprop-stereo" value="1" />
                </payload-type>
                <bandwidth type="AS:40000" />
                <rtcp-mux />
                <encryption required="1">
                    <crypto tag="1" crypto-suite="AES_CM_128_HMAC_SHA1_80" key-params="inline:DPSKYRle84Ua2MjbScjadpCvFQH5Tutuls2N4/xx" session-params="" />
                </encryption>
            </description>
            """
        )

        parsed_data = mapping.parse_description(description_element)

        # Assertions
        assert parsed_data["payload_types"] == {
            96: {
                "channels": "2",
                "clockrate": "48000",
                "maxptime": "60",
                "name": "opus",
                "ptime": "20",
                "parameters": {"sprop-stereo": "1"},
            }
        }
        assert parsed_data["bandwidth"] == "AS:40000"
        assert parsed_data["rtcp-mux"] is True
        assert parsed_data["encryption_required"] is True
        assert parsed_data["encryption"] == [
            {
                "tag": "1",
                "crypto-suite": "AES_CM_128_HMAC_SHA1_80",
                "key-params": "inline:DPSKYRle84Ua2MjbScjadpCvFQH5Tutuls2N4/xx",
                "session-params": "",
            }
        ]


class TestXEP0167:
    def test_jingle_session_info(self, host, client):
        """Bridge's call_info method is called with correct parameters."""
        xep_0167 = XEP_0167(host)
        session = {"id": "123"}
        mock_call_info = MagicMock()
        host.bridge.call_info = mock_call_info

        jingle_elt = xml_tools.parse(
            """
            <jingle xmlns='urn:xmpp:jingle:1'
                action='session-info'
                initiator='client1@example.org'
                sid='a73sjjvkla37jfea'>
                <mute xmlns="urn:xmpp:jingle:apps:rtp:info:1" name="mute_name"/>
            </jingle>
            """
        )

        xep_0167.jingle_session_info(client, "mute", session, "content_name", jingle_elt)

        mock_call_info.assert_called_with(
            session["id"],
            "mute",
            data_format.serialise({"name": "mute_name"}),
            client.profile,
        )

    def test_jingle_session_info_invalid_actions(self, host, client):
        """When receiving invalid actions, no further action is taken."""
        xep_0167 = XEP_0167(host)
        session = {"id": "123"}
        mock_call_info = MagicMock()
        host.bridge.call_info = mock_call_info

        jingle_elt = xml_tools.parse(
            """
            <jingle xmlns='urn:xmpp:jingle:1'
                action='session-info'
                initiator='client1@example.org'
                sid='a73sjjvkla37jfea'>
                <invalid xmlns="urn:xmpp:jingle:apps:rtp:info:1" name="invalid_name"/>
            </jingle>
            """
        )

        xep_0167.jingle_session_info(
            client, "invalid", session, "content_name", jingle_elt
        )
        mock_call_info.assert_not_called()

    def test_send_info(self, host, client):
        """A jingle element with the correct info is created and sent."""
        xep_0167 = XEP_0167(host)
        session_id = "123"
        extra = {"name": "test"}

        iq_elt = xml_tools.parse(
            """
            <iq from='client1@example.org'
                id='yh3gr714'
                to='client2@example.net'
                type='set'>
                <jingle xmlns='urn:xmpp:jingle:1'
                        action='session-info'
                        initiator='client1@example.org'
                        sid='a73sjjvkla37jfea'>
                    <active xmlns='urn:xmpp:jingle:apps:rtp:info:1'/>
                </jingle>
            </iq>
            """
        )
        jingle_elt = iq_elt.firstChildElement()
        mock_send = MagicMock()
        iq_elt.send = mock_send

        with patch.object(
            xep_0167._j, "build_session_info", return_value=(iq_elt, jingle_elt)
        ):
            xep_0167.send_info(client, session_id, "mute", extra)

        info_elt = jingle_elt.firstChildElement()
        assert info_elt.name == "active"
        assert info_elt.uri == NS_JINGLE_RTP_INFO
        mock_send.assert_called()

    def test_send_info_invalid_actions(self, host, client):
        """When trying to send invalid actions, an error is raised."""
        xep_0167 = XEP_0167(host)
        session_id = "123"
        extra = {"name": "test"}

        with raises(ValueError, match="Unkown info type 'invalid_action'"):
            xep_0167.send_info(client, session_id, "invalid_action", extra)