Mercurial > libervia-backend
view tests/unit/test_plugin_xep_0293.py @ 4062:18719058a914
plugin XEP-0294: "Jingle RTP Feedback Negotiation" implementation:
rel 438
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 30 May 2023 17:58:43 +0200 |
parents | fddd76dedc97 |
children | 4b842c1fb686 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia: an XMPP client # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from twisted.words.xish import domish from sat.plugins.plugin_xep_0167.constants import NS_JINGLE_RTP from sat.plugins.plugin_xep_0293 import NS_JINGLE_RTP_RTCP_FB, RTCP_FB_KEY, XEP_0293 from sat.tools import xml_tools class TestXEP0293: def test_parse_sdp_rtcp_fb_general(self, host): """Parsing of a general RTCP feedback SDP line.""" xep_0293 = XEP_0293(host) application_data = {} transport_data = {} # SDP line: a=rtcp-fb:* nack pli attribute = "rtcp-fb" parts = ["*", "nack", "pli"] xep_0293._parse_sdp_a_trigger( attribute=attribute, parts=parts, call_data={}, metadata={}, media_type="video", application_data=application_data, transport_data=transport_data, ) assert application_data[RTCP_FB_KEY][0] == ("nack", "pli", {}) def test_parse_sdp_rtcp_fb_specific(self, host): """Parsing of a payload-specific RTCP feedback SDP line.""" xep_0293 = XEP_0293(host) application_data = { "payload_types": {96: {}} } transport_data = {} # SDP line: a=rtcp-fb:96 nack attribute = "rtcp-fb" parts = ["96", "nack"] xep_0293._parse_sdp_a_trigger( attribute=attribute, parts=parts, call_data={}, metadata={}, media_type="video", application_data=application_data, transport_data=transport_data, ) assert application_data["payload_types"][96][RTCP_FB_KEY][0] == ("nack", None, {}) def test_parse_sdp_rtcp_fb_trr_int_general(self, host): """Parsing of a general RTCP feedback with trr-int SDP line.""" xep_0293 = XEP_0293(host) application_data = {} transport_data = {} # SDP line: a=rtcp-fb-trr-int:* 100 attribute = "rtcp-fb-trr-int" parts = ["*", "100"] xep_0293._parse_sdp_a_trigger( attribute=attribute, parts=parts, call_data={}, metadata={}, media_type="video", application_data=application_data, transport_data=transport_data, ) assert application_data["rtcp-fb-trr-int"] == 100 def test_parse_sdp_rtcp_fb_trr_int_specific(self, host): """Parsing of a payload-specific RTCP feedback with trr-int SDP line.""" xep_0293 = XEP_0293(host) application_data = { "payload_types": {96: {}} } transport_data = {} # SDP line: a=rtcp-fb-trr-int:96 100 attribute = "rtcp-fb-trr-int" parts = ["96", "100"] xep_0293._parse_sdp_a_trigger( attribute=attribute, parts=parts, call_data={}, metadata={}, media_type="video", application_data=application_data, transport_data=transport_data, ) assert application_data["payload_types"][96]["rtcp-fb-trr-int"] == 100 def test_generate_sdp_session(self, host): """Generation of SDP lines for session data.""" xep_0293 = XEP_0293(host) sdp_lines = [] application_data = {RTCP_FB_KEY: [("nack", "pli", {})], "rtcp-fb-trr-int": 100} xep_0293._generate_sdp_content_trigger( session={}, local=True, content_name="test", content_data={}, sdp_lines=sdp_lines, application_data=application_data, app_data_key="test", media_data={}, media="video", ) assert sdp_lines[0] == "a=rtcp-fb:* nack pli" assert sdp_lines[1] == "a=rtcp-fb:* trr-int 100" def test_generate_sdp_payload_type(self, host): """Generation of SDP lines for each payload type.""" xep_0293 = XEP_0293(host) sdp_lines = [] application_data = {} media_data = { "payload_types": {96: {RTCP_FB_KEY: [("nack", None, {})], "rtcp-fb-trr-int": 100}} } xep_0293._generate_sdp_content_trigger( session={}, local=True, content_name="test", content_data={}, sdp_lines=sdp_lines, application_data=application_data, app_data_key="test", media_data=media_data, media="video", ) assert sdp_lines[0] == "a=rtcp-fb:96 nack" assert sdp_lines[1] == "a=rtcp-fb:96 trr-int 100" def test_parse_description(self, host): """Parsing of <rtcp-fb> and <rtcp-fb-trr-int> elements from a description.""" xep_0293 = XEP_0293(host) desc_element = xml_tools.parse( f""" <description xmlns="urn:xmpp:jingle:apps:rtp:1" media="audio"> <rtcp-fb xmlns="{NS_JINGLE_RTP_RTCP_FB}" type="nack" subtype="pli"/> <rtcp-fb-trr-int xmlns="{NS_JINGLE_RTP_RTCP_FB}" value="100"/> </description> """ ) media_data = {} xep_0293._parse_description_trigger(desc_element, media_data) assert media_data[RTCP_FB_KEY][0] == ("nack", "pli", {}) assert media_data["rtcp_fb_trr_int"][0] == 100 def test_parse_description_payload_type(self, host): """Parsing of <rtcp-fb> and <rtcp-fb-trr-int> elements from a payload type.""" xep_0293 = XEP_0293(host) desc_element = xml_tools.parse( f""" <description xmlns="urn:xmpp:jingle:apps:rtp:1" media="audio"> <payload-type id="96"> <rtcp-fb xmlns="{NS_JINGLE_RTP_RTCP_FB}" type="nack" subtype="pli"/> <rtcp-fb-trr-int xmlns="{NS_JINGLE_RTP_RTCP_FB}" value="100"/> </payload-type> </description> """ ) media_data = {} payload_type_elt = desc_element.firstChildElement() payload_type_data = {} xep_0293._parse_description_payload_type_trigger( desc_element, media_data, payload_type_elt, payload_type_data ) assert payload_type_data[RTCP_FB_KEY][0] == ("nack", "pli", {}) assert payload_type_data["rtcp_fb_trr_int"][0] == 100 def test_build_rtcp_fb_elements(self, host): """Building the <rtcp-fb> and <rtcp-fb-trr-int> elements.""" xep_0293 = XEP_0293(host) data = { RTCP_FB_KEY: [("nack", "pli", {})], "rtcp-fb-trr-int": 100 } # Test _build_description_trigger desc_elt = domish.Element((NS_JINGLE_RTP, "description")) xep_0293._build_description_trigger(desc_elt, data, {}) rtcp_fb_elts = list(desc_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb")) assert len(rtcp_fb_elts) == 1 rtcp_fb_elt = rtcp_fb_elts[0] assert rtcp_fb_elt["type"] == "nack" assert rtcp_fb_elt["subtype"] == "pli" rtcp_fb_trr_int_elts = list(desc_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int")) assert len(rtcp_fb_trr_int_elts) == 1 rtcp_fb_trr_int_elt = rtcp_fb_trr_int_elts[0] assert int(rtcp_fb_trr_int_elt["value"]) == 100 # Test _build_description_payload_type_trigger desc_elt = domish.Element((NS_JINGLE_RTP, "description")) payload_type_elt = desc_elt.addElement((NS_JINGLE_RTP, "payload-type")) xep_0293._build_description_payload_type_trigger(desc_elt, {}, data, payload_type_elt) rtcp_fb_elts = list(payload_type_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb")) assert len(rtcp_fb_elts) == 1 rtcp_fb_elt = rtcp_fb_elts[0] assert rtcp_fb_elt["type"] == "nack" assert rtcp_fb_elt["subtype"] == "pli" rtcp_fb_trr_int_elts = list(payload_type_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int")) assert len(rtcp_fb_trr_int_elts) == 1 rtcp_fb_trr_int_elt = rtcp_fb_trr_int_elts[0] assert int(rtcp_fb_trr_int_elt["value"]) == 100