view tests/unit/test_plugin_xep_0294.py @ 4141:ba8ddfdd334f

cli (loops): run GLib loop in same thread as asyncio: use the new `install_glib_asyncio_iteration` to run GLib in the same thread as asyncio. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 14:05:53 +0100
parents 4b842c1fb686
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


from twisted.words.xish import domish

from libervia.backend.plugins.plugin_xep_0294 import NS_JINGLE_RTP_HDREXT, XEP_0294
from libervia.backend.tools.xml_tools import parse


class TestXEP0294:
    def test_extmap_attribute(self, host):
        """Session level 'extmap' SDP attribute are set to application_data"""
        xep_0294 = XEP_0294(host)

        call_data = {}
        # "a=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level"
        parts = ["1", "urn:ietf:params:rtp-hdrext:ssrc-audio-level"]

        # Call the method with application_data as None to make is a session level
        # attribute
        xep_0294._parse_sdp_a_trigger("extmap", parts, call_data, {}, "audio", None, {})

        # Call the method again with application_data set
        application_data = {}
        xep_0294._parse_sdp_a_trigger(
            "", [], call_data, {}, "audio", application_data, {}
        )

        # Assert data has been correctly transferred
        assert application_data == {
            "rtp-hdrext": {
                "1": {
                    "id": "1",
                    "uri": "urn:ietf:params:rtp-hdrext:ssrc-audio-level",
                    "senders": "both",
                }
            }
        }

    def test_extmap_allow_mixed_attribute(self, host):
        """Session level 'extmap-allow-mixed' SDP attribute are set to application_data"""
        xep_0294 = XEP_0294(host)

        call_data = {}

        # Call the method with application_data as None to make is a session level
        # attribute
        xep_0294._parse_sdp_a_trigger(
            "extmap-allow-mixed", [], call_data, {}, "audio", None, {}
        )

        # Call the method again with application_data set
        application_data = {}
        xep_0294._parse_sdp_a_trigger(
            "", [], call_data, {}, "audio", application_data, {}
        )

        # Assert value has been correctly transferred
        assert application_data == {"extmap-allow-mixed": True}

    def test_generate_sdp_content(self, host):
        """SDP for 'extmap' and 'extmap-allow-mixed' attributes is correctly generated"""
        xep_0294 = XEP_0294(host)

        session = {}
        local = False
        idx = 0
        content_data = {}
        sdp_lines = []
        application_data = {}
        app_data_key = "rtp-hdrext"
        media_data = {
            "rtp-hdrext": {
                "1": {
                    "id": "1",
                    "uri": "urn:ietf:params:rtp-hdrext:ssrc-audio-level",
                    "senders": "both",
                    "parameters": {"param1": "value1", "param2": "value2"},
                },
                "2": {
                    "id": "2",
                    "uri": "urn:ietf:params:rtp-hdrext:time-offset",
                    "senders": "initiator",
                },
                "3": {
                    "id": "3",
                    "uri": "urn:ietf:params:rtp-hdrext:toffset",
                    "senders": "none",
                },
            },
            "extmap-allow-mixed": True,
        }
        media = "audio"

        xep_0294._generate_sdp_content_trigger(
            session,
            local,
            idx,
            content_data,
            sdp_lines,
            application_data,
            app_data_key,
            media_data,
            media,
        )

        assert sdp_lines == [
            "a=extmap:1/sendrecv urn:ietf:params:rtp-hdrext:ssrc-audio-level param1=value1 param2=value2",
            "a=extmap:2/sendonly urn:ietf:params:rtp-hdrext:time-offset",
            "a=extmap:3/inactive urn:ietf:params:rtp-hdrext:toffset",
            "a=extmap-allow-mixed",
        ]

    def test_parse_description(self, host):
        """'rtp-hdrext' and 'extmap-allow-mixed' elements are correctly parsed"""
        xep_0294 = XEP_0294(host)
        media_data = {}

        desc_elt_str = """
        <description>
            <rtp-hdrext xmlns='urn:xmpp:jingle:apps:rtp:rtp-hdrext:0'
                        id='1'
                        uri='urn:ietf:params:rtp-hdrext:ssrc-audio-level'
                        senders='both'>
                <parameter xmlns='urn:xmpp:jingle:apps:rtp:rtp-hdrext:0'
                           name='vad'
                           value='on'/>
            </rtp-hdrext>
            <rtp-hdrext xmlns='urn:xmpp:jingle:apps:rtp:rtp-hdrext:0'
                        id='2'
                        uri='urn:ietf:params:rtp-hdrext:toffset'
                        senders='initiator'/>
            <extmap-allow-mixed xmlns='urn:xmpp:jingle:apps:rtp:rtp-hdrext:0'/>
        </description>
        """
        desc_elt = parse(desc_elt_str)

        xep_0294._parse_description_trigger(desc_elt, media_data)

        assert media_data == {
            "rtp-hdrext": {
                "1": {
                    "id": "1",
                    "uri": "urn:ietf:params:rtp-hdrext:ssrc-audio-level",
                    "senders": "both",
                    "parameters": {"vad": "on"},
                },
                "2": {
                    "id": "2",
                    "uri": "urn:ietf:params:rtp-hdrext:toffset",
                    "senders": "initiator",
                },
            },
            "extmap-allow-mixed": True,
        }

    def test_build_description(self, host):
        """'rtp-hdrext' and 'extmap-allow-mixed' elements are correctly built"""
        xep_0294 = XEP_0294(host)

        desc_elt = domish.Element((None, "description"))
        media_data = {
            "rtp-hdrext": {
                "1": {
                    "id": "1",
                    "uri": "urn:ietf:params:rtp-hdrext:ssrc-audio-level",
                    "senders": "both",
                    "parameters": {"vad": "on"},
                },
                "2": {
                    "id": "2",
                    "uri": "urn:ietf:params:rtp-hdrext:toffset",
                    "senders": "initiator",
                },
            },
            "extmap-allow-mixed": True,
        }

        xep_0294._build_description_trigger(desc_elt, media_data, {})

        rtp_hdrext_elts = list(desc_elt.elements(NS_JINGLE_RTP_HDREXT, "rtp-hdrext"))
        assert len(rtp_hdrext_elts) == 2

        rtp_hdrext_elt1, rtp_hdrext_elt2 = rtp_hdrext_elts

        assert rtp_hdrext_elt1["id"] == "1"
        assert rtp_hdrext_elt1["uri"] == "urn:ietf:params:rtp-hdrext:ssrc-audio-level"

        # both is default and should not be set
        assert "senders" not in rtp_hdrext_elt1.attributes

        param_elt1 = list(rtp_hdrext_elt1.elements(NS_JINGLE_RTP_HDREXT, "parameter"))[0]
        assert param_elt1["name"] == "vad"
        assert param_elt1["value"] == "on"

        assert rtp_hdrext_elt2["id"] == "2"
        assert rtp_hdrext_elt2["uri"] == "urn:ietf:params:rtp-hdrext:toffset"
        assert rtp_hdrext_elt2["senders"] == "initiator"

        extmap_allow_mixed_elts = list(
            desc_elt.elements(NS_JINGLE_RTP_HDREXT, "extmap-allow-mixed")
        )
        assert len(extmap_allow_mixed_elts) == 1