view tests/unit/test_plugin_xep_0176.py @ 4237:a1e7e82a8921 default tip @

core: implement SCRAM-SHA auth algorithm: Twisted auth mechanism are outdated, and as a result, Libervia was not supporting the mandatory SCRAM-SHA auth mechanism. This patch implements it for SCRAM-SHA-1, SCRAM-SHA-256 and SCRAM-SHA-512 variants.
author Goffi <goffi@goffi.org>
date Mon, 08 Apr 2024 12:29:40 +0200
parents 0fbe5c605eb6
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from unittest.mock import AsyncMock, MagicMock

from pytest import fixture
from pytest_twisted import ensureDeferred as ed

from libervia.backend.plugins.plugin_xep_0166 import XEP_0166
from libervia.backend.plugins.plugin_xep_0176 import NS_JINGLE_ICE_UDP, XEP_0176
from libervia.backend.tools import xml_tools


@fixture(autouse=True)
def no_transport_register(monkeypatch):
    """Do not register the transport in XEP-0166"""
    monkeypatch.setattr(XEP_0166, "register_transport", lambda *a, **kw: None)


class TestXEP0176:
    def create_mock_session(self, content_name):
        return {
            "id": "test-session-id",
            "state": XEP_0166.STATE_ACTIVE,
            "contents": {
                content_name: {
                    "application_data": {"media": "audio"},
                    "transport": MagicMock(),
                    "transport_data": {
                        "local_ice_data": {
                            "ufrag": "testufrag",
                            "pwd": "testpwd",
                            "candidates": [
                                {
                                    "id": "candidate_1",
                                    "component_id": 1,
                                    "foundation": "1",
                                    "address": "192.0.2.1",
                                    "port": 1234,
                                    "priority": 1,
                                    "transport": "udp",
                                    "type": "host",
                                }
                            ],
                        }
                    },
                }
            },
        }

    def test_build_transport(self, host, monkeypatch):
        """ICE data is correctly transformed into transport element"""
        xep_0176 = XEP_0176(host)

        ice_data = {
            "ufrag": "user1",
            "pwd": "password1",
            "candidates": [
                {
                    "component_id": 1,
                    "foundation": "1",
                    "address": "192.168.0.1",
                    "port": 1234,
                    "priority": 100,
                    "transport": "udp",
                    "type": "host",
                    "generation": "0",
                    "network": "0",
                },
                {
                    "component_id": 2,
                    "foundation": "1",
                    "address": "192.168.0.2",
                    "port": 5678,
                    "priority": 100,
                    "transport": "udp",
                    "type": "host",
                    "generation": "0",
                    "network": "0",
                    "rel_addr": "10.0.0.1",
                    "rel_port": 9012,
                },
            ],
        }

        transport_elt = xep_0176.build_transport(ice_data)

        assert transport_elt.name == "transport"
        assert transport_elt.uri == NS_JINGLE_ICE_UDP
        assert transport_elt.getAttribute("ufrag") == "user1"
        assert transport_elt.getAttribute("pwd") == "password1"

        candidates = list(transport_elt.elements(NS_JINGLE_ICE_UDP, "candidate"))

        assert len(candidates) == len(ice_data["candidates"])

        for i, candidate_elt in enumerate(candidates):
            ice_candidate = ice_data["candidates"][i]
            assert (
                int(candidate_elt.getAttribute("component"))
                == ice_candidate["component_id"]
            )
            assert candidate_elt.getAttribute("foundation") == ice_candidate["foundation"]
            assert candidate_elt.getAttribute("ip") == ice_candidate["address"]
            assert int(candidate_elt.getAttribute("port")) == ice_candidate["port"]
            assert (
                int(candidate_elt.getAttribute("priority")) == ice_candidate["priority"]
            )
            assert candidate_elt.getAttribute("protocol") == ice_candidate["transport"]
            assert candidate_elt.getAttribute("type") == ice_candidate["type"]
            assert candidate_elt.getAttribute("generation") == str(
                ice_candidate.get("generation", "0")
            )
            assert candidate_elt.getAttribute("network") == str(
                ice_candidate.get("network", "0")
            )

            if "rel_addr" in ice_candidate:
                assert candidate_elt.getAttribute("rel-addr") == ice_candidate["rel_addr"]
                assert (
                    int(candidate_elt.getAttribute("rel-port"))
                    == ice_candidate["rel_port"]
                )
            else:
                assert candidate_elt.getAttribute("rel-addr") is None
                assert candidate_elt.getAttribute("rel-port") is None

    def test_parse_transport(self, host):
        """Transport element is correctly parsed into ICE data"""
        xep_0176 = XEP_0176(host)

        transport_elt = xml_tools.parse(
            """
            <transport xmlns="urn:xmpp:jingle:transports:ice-udp:1"
                       pwd="password1"
                       ufrag="user1">
                <candidate component="1"
                           foundation="1"
                           generation="0"
                           id="uuid1"
                           ip="192.168.0.1"
                           network="0"
                           port="1234"
                           priority="100"
                           protocol="udp"
                           type="host" />
                <candidate component="2"
                           foundation="1"
                           generation="0"
                           id="uuid2"
                           ip="192.168.0.2"
                           network="0"
                           port="5678"
                           priority="100"
                           protocol="udp"
                           type="host"
                           rel-addr="10.0.0.1"
                           rel-port="9012" />
            </transport>
            """
        )

        ice_data = xep_0176.parse_transport(transport_elt)

        assert transport_elt.getAttribute("ufrag") == "user1"
        assert transport_elt.getAttribute("pwd") == "password1"

        candidates = list(transport_elt.elements(NS_JINGLE_ICE_UDP, "candidate"))
        assert len(candidates) == len(ice_data["candidates"])

        for i, candidate_elt in enumerate(candidates):
            ice_candidate = ice_data["candidates"][i]
            assert (
                int(candidate_elt.getAttribute("component"))
                == ice_candidate["component_id"]
            )
            assert candidate_elt.getAttribute("foundation") == ice_candidate["foundation"]
            assert candidate_elt.getAttribute("ip") == ice_candidate["address"]
            assert int(candidate_elt.getAttribute("port")) == ice_candidate["port"]
            assert (
                int(candidate_elt.getAttribute("priority")) == ice_candidate["priority"]
            )
            assert candidate_elt.getAttribute("protocol") == ice_candidate["transport"]
            assert candidate_elt.getAttribute("type") == ice_candidate["type"]
            assert candidate_elt.getAttribute("generation") == str(
                ice_candidate.get("generation", "0")
            )
            assert candidate_elt.getAttribute("network") == str(
                ice_candidate.get("network", "0")
            )

            if "rel_addr" in ice_candidate:
                assert candidate_elt.getAttribute("rel-addr") == ice_candidate["rel_addr"]
                assert (
                    int(candidate_elt.getAttribute("rel-port"))
                    == ice_candidate["rel_port"]
                )
            else:
                assert candidate_elt.getAttribute("rel-addr") is None
                assert candidate_elt.getAttribute("rel-port") is None

    @ed
    async def test_jingle_session_init(self, host, client):
        """<transport/> element is built on initiator side during init"""
        xep_0176 = XEP_0176(host)

        content_name = "test-content"
        session = self.create_mock_session(content_name)

        transport_elt = await xep_0176.jingle_session_init(client, session, content_name)

        expected_transport_elt = xep_0176.build_transport(
            session["contents"][content_name]["transport_data"]["local_ice_data"]
        )

        assert transport_elt.toXml() == expected_transport_elt.toXml()

    @ed
    async def test_jingle_handler(self, host, client):
        """<transport/> element is built on responder side during init"""
        xep_0176 = XEP_0176(host)

        content_name = "test-content"
        action = "session-initiate"
        session = self.create_mock_session(content_name)
        transport_elt = xml_tools.parse("<transport/>")

        returned_transport_elt = await xep_0176.jingle_handler(
            client, action, session, content_name, transport_elt
        )

        expected_transport_elt = xep_0176.build_transport(
            session["contents"][content_name]["transport_data"]["local_ice_data"]
        )

        assert returned_transport_elt.toXml() == expected_transport_elt.toXml()

    @ed
    async def test_ice_candidates_add(self, host, client):
        """local new ICE candidates are added, IQ is sent, bridge signal emitted"""
        xep_0176 = XEP_0176(host)

        content_name = "test-content"
        session_id = "test-session-id"
        media_ice_data_s = {
            "audio": {
                # we use different "ufrag" and "pwd" than in "create_mock_session" to
                # trigger an ICE restart
                "ufrag": "new_testufrag",
                "pwd": "new_testpwd",
                "candidates": [
                    {
                        "component_id": 2,
                        "foundation": "2",
                        "address": "192.0.2.2",
                        "port": 1235,
                        "priority": 2,
                        "transport": "udp",
                        "type": "host",
                    }
                ],
            }
        }

        mock_session = self.create_mock_session(content_name)
        xep_0176._j.get_session = MagicMock(return_value=mock_session)
        xep_0176._j.build_action = MagicMock()
        iq_elt = AsyncMock()
        xep_0176._j.build_action.return_value = (iq_elt, None)
        xep_0176.host.bridge.ice_restart = MagicMock()

        await xep_0176.ice_candidates_add(client, session_id, media_ice_data_s)

        xep_0176._j.build_action.assert_called()
        iq_elt.send.assert_called()
        xep_0176.host.bridge.ice_restart.assert_called()

        # Checking that local ICE data is updated correctly
        updated_local_ice_data = mock_session["contents"][content_name]["transport_data"][
            "local_ice_data"
        ]
        assert updated_local_ice_data["ufrag"] == "new_testufrag"
        assert updated_local_ice_data["pwd"] == "new_testpwd"
        assert (
            updated_local_ice_data["candidates"]
            == media_ice_data_s["audio"]["candidates"]
        )