Mercurial > libervia-backend
view tests/unit/test_plugin_xep_0293.py @ 4180:b86912d3fd33
plugin IP: fix use of legacy URL + coroutine use:
An https:/salut-a-toi.org URL was used to retrieve external IP, but it's not valid
anymore, resulting in an exception. This feature is currently disabled.
Also moved several methods from legacy inline callbacks to coroutines.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 09 Dec 2023 14:30:54 +0100 |
parents | 4b842c1fb686 |
children | f1d0cde61af7 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia: an XMPP client # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from twisted.words.xish import domish from libervia.backend.plugins.plugin_xep_0167.constants import NS_JINGLE_RTP from libervia.backend.plugins.plugin_xep_0293 import NS_JINGLE_RTP_RTCP_FB, RTCP_FB_KEY, XEP_0293 from libervia.backend.tools import xml_tools class TestXEP0293: def test_parse_sdp_rtcp_fb_general(self, host): """Parsing of a general RTCP feedback SDP line.""" xep_0293 = XEP_0293(host) application_data = {} transport_data = {} # SDP line: a=rtcp-fb:* nack pli attribute = "rtcp-fb" parts = ["*", "nack", "pli"] xep_0293._parse_sdp_a_trigger( attribute=attribute, parts=parts, call_data={}, metadata={}, media_type="video", application_data=application_data, transport_data=transport_data, ) assert application_data[RTCP_FB_KEY][0] == ("nack", "pli", {}) def test_parse_sdp_rtcp_fb_specific(self, host): """Parsing of a payload-specific RTCP feedback SDP line.""" xep_0293 = XEP_0293(host) application_data = { "payload_types": {96: {}} } transport_data = {} # SDP line: a=rtcp-fb:96 nack attribute = "rtcp-fb" parts = ["96", "nack"] xep_0293._parse_sdp_a_trigger( attribute=attribute, parts=parts, call_data={}, metadata={}, media_type="video", application_data=application_data, transport_data=transport_data, ) assert application_data["payload_types"][96][RTCP_FB_KEY][0] == ("nack", None, {}) def test_parse_sdp_rtcp_fb_trr_int_general(self, host): """Parsing of a general RTCP feedback with trr-int SDP line.""" xep_0293 = XEP_0293(host) application_data = {} transport_data = {} # SDP line: a=rtcp-fb-trr-int:* 100 attribute = "rtcp-fb-trr-int" parts = ["*", "100"] xep_0293._parse_sdp_a_trigger( attribute=attribute, parts=parts, call_data={}, metadata={}, media_type="video", application_data=application_data, transport_data=transport_data, ) assert application_data["rtcp-fb-trr-int"] == 100 def test_parse_sdp_rtcp_fb_trr_int_specific(self, host): """Parsing of a payload-specific RTCP feedback with trr-int SDP line.""" xep_0293 = XEP_0293(host) application_data = { "payload_types": {96: {}} } transport_data = {} # SDP line: a=rtcp-fb-trr-int:96 100 attribute = "rtcp-fb-trr-int" parts = ["96", "100"] xep_0293._parse_sdp_a_trigger( attribute=attribute, parts=parts, call_data={}, metadata={}, media_type="video", application_data=application_data, transport_data=transport_data, ) assert application_data["payload_types"][96]["rtcp-fb-trr-int"] == 100 def test_generate_sdp_session(self, host): """Generation of SDP lines for session data.""" xep_0293 = XEP_0293(host) sdp_lines = [] application_data = {RTCP_FB_KEY: [("nack", "pli", {})], "rtcp-fb-trr-int": 100} xep_0293._generate_sdp_content_trigger( session={}, local=True, content_name="test", content_data={}, sdp_lines=sdp_lines, application_data=application_data, app_data_key="test", media_data={}, media="video", ) assert sdp_lines[0] == "a=rtcp-fb:* nack pli" assert sdp_lines[1] == "a=rtcp-fb:* trr-int 100" def test_generate_sdp_payload_type(self, host): """Generation of SDP lines for each payload type.""" xep_0293 = XEP_0293(host) sdp_lines = [] application_data = {} media_data = { "payload_types": {96: {RTCP_FB_KEY: [("nack", None, {})], "rtcp-fb-trr-int": 100}} } xep_0293._generate_sdp_content_trigger( session={}, local=True, content_name="test", content_data={}, sdp_lines=sdp_lines, application_data=application_data, app_data_key="test", media_data=media_data, media="video", ) assert sdp_lines[0] == "a=rtcp-fb:96 nack" assert sdp_lines[1] == "a=rtcp-fb:96 trr-int 100" def test_parse_description(self, host): """Parsing of <rtcp-fb> and <rtcp-fb-trr-int> elements from a description.""" xep_0293 = XEP_0293(host) desc_element = xml_tools.parse( f""" <description xmlns="urn:xmpp:jingle:apps:rtp:1" media="audio"> <rtcp-fb xmlns="{NS_JINGLE_RTP_RTCP_FB}" type="nack" subtype="pli"/> <rtcp-fb-trr-int xmlns="{NS_JINGLE_RTP_RTCP_FB}" value="100"/> </description> """ ) media_data = {} xep_0293._parse_description_trigger(desc_element, media_data) assert media_data[RTCP_FB_KEY][0] == ("nack", "pli", {}) assert media_data["rtcp_fb_trr_int"][0] == 100 def test_parse_description_payload_type(self, host): """Parsing of <rtcp-fb> and <rtcp-fb-trr-int> elements from a payload type.""" xep_0293 = XEP_0293(host) desc_element = xml_tools.parse( f""" <description xmlns="urn:xmpp:jingle:apps:rtp:1" media="audio"> <payload-type id="96"> <rtcp-fb xmlns="{NS_JINGLE_RTP_RTCP_FB}" type="nack" subtype="pli"/> <rtcp-fb-trr-int xmlns="{NS_JINGLE_RTP_RTCP_FB}" value="100"/> </payload-type> </description> """ ) media_data = {} payload_type_elt = desc_element.firstChildElement() payload_type_data = {} xep_0293._parse_description_payload_type_trigger( desc_element, media_data, payload_type_elt, payload_type_data ) assert payload_type_data[RTCP_FB_KEY][0] == ("nack", "pli", {}) assert payload_type_data["rtcp_fb_trr_int"][0] == 100 def test_build_rtcp_fb_elements(self, host): """Building the <rtcp-fb> and <rtcp-fb-trr-int> elements.""" xep_0293 = XEP_0293(host) data = { RTCP_FB_KEY: [("nack", "pli", {})], "rtcp-fb-trr-int": 100 } # Test _build_description_trigger desc_elt = domish.Element((NS_JINGLE_RTP, "description")) xep_0293._build_description_trigger(desc_elt, data, {}) rtcp_fb_elts = list(desc_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb")) assert len(rtcp_fb_elts) == 1 rtcp_fb_elt = rtcp_fb_elts[0] assert rtcp_fb_elt["type"] == "nack" assert rtcp_fb_elt["subtype"] == "pli" rtcp_fb_trr_int_elts = list(desc_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int")) assert len(rtcp_fb_trr_int_elts) == 1 rtcp_fb_trr_int_elt = rtcp_fb_trr_int_elts[0] assert int(rtcp_fb_trr_int_elt["value"]) == 100 # Test _build_description_payload_type_trigger desc_elt = domish.Element((NS_JINGLE_RTP, "description")) payload_type_elt = desc_elt.addElement((NS_JINGLE_RTP, "payload-type")) xep_0293._build_description_payload_type_trigger(desc_elt, {}, data, payload_type_elt) rtcp_fb_elts = list(payload_type_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb")) assert len(rtcp_fb_elts) == 1 rtcp_fb_elt = rtcp_fb_elts[0] assert rtcp_fb_elt["type"] == "nack" assert rtcp_fb_elt["subtype"] == "pli" rtcp_fb_trr_int_elts = list(payload_type_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int")) assert len(rtcp_fb_trr_int_elts) == 1 rtcp_fb_trr_int_elt = rtcp_fb_trr_int_elts[0] assert int(rtcp_fb_trr_int_elt["value"]) == 100