# HG changeset patch # User Goffi # Date 1685375885 -7200 # Node ID fddd76dedc97ff8515f20e039e082d9bdd0ef415 # Parent fce92ba311f4ae36dbd5019c07cb9d047b0ff38d tests (units): tests for plugin XEP-0293: fix 437 diff -r fce92ba311f4 -r fddd76dedc97 tests/unit/test_plugin_xep_0293.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/unit/test_plugin_xep_0293.py Mon May 29 17:58:05 2023 +0200 @@ -0,0 +1,241 @@ +#!/usr/bin/env python3 + +# Libervia: an XMPP client +# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from twisted.words.xish import domish + +from sat.plugins.plugin_xep_0167.constants import NS_JINGLE_RTP +from sat.plugins.plugin_xep_0293 import NS_JINGLE_RTP_RTCP_FB, RTCP_FB_KEY, XEP_0293 +from sat.tools import xml_tools + +class TestXEP0293: + + def test_parse_sdp_rtcp_fb_general(self, host): + """Parsing of a general RTCP feedback SDP line.""" + xep_0293 = XEP_0293(host) + + application_data = {} + transport_data = {} + + # SDP line: a=rtcp-fb:* nack pli + attribute = "rtcp-fb" + parts = ["*", "nack", "pli"] + xep_0293._parse_sdp_a_trigger( + attribute=attribute, + parts=parts, + call_data={}, + metadata={}, + media_type="video", + application_data=application_data, + transport_data=transport_data, + ) + assert application_data[RTCP_FB_KEY][0] == ("nack", "pli", {}) + + def test_parse_sdp_rtcp_fb_specific(self, host): + """Parsing of a payload-specific RTCP feedback SDP line.""" + xep_0293 = XEP_0293(host) + + application_data = { + "payload_types": {96: {}} + } + transport_data = {} + + # SDP line: a=rtcp-fb:96 nack + attribute = "rtcp-fb" + parts = ["96", "nack"] + xep_0293._parse_sdp_a_trigger( + attribute=attribute, + parts=parts, + call_data={}, + metadata={}, + media_type="video", + application_data=application_data, + transport_data=transport_data, + ) + assert application_data["payload_types"][96][RTCP_FB_KEY][0] == ("nack", None, {}) + + def test_parse_sdp_rtcp_fb_trr_int_general(self, host): + """Parsing of a general RTCP feedback with trr-int SDP line.""" + xep_0293 = XEP_0293(host) + + application_data = {} + transport_data = {} + + # SDP line: a=rtcp-fb-trr-int:* 100 + attribute = "rtcp-fb-trr-int" + parts = ["*", "100"] + xep_0293._parse_sdp_a_trigger( + attribute=attribute, + parts=parts, + call_data={}, + metadata={}, + media_type="video", + application_data=application_data, + transport_data=transport_data, + ) + assert application_data["rtcp-fb-trr-int"] == 100 + + def test_parse_sdp_rtcp_fb_trr_int_specific(self, host): + """Parsing of a payload-specific RTCP feedback with trr-int SDP line.""" + xep_0293 = XEP_0293(host) + + application_data = { + "payload_types": {96: {}} + } + transport_data = {} + + # SDP line: a=rtcp-fb-trr-int:96 100 + attribute = "rtcp-fb-trr-int" + parts = ["96", "100"] + xep_0293._parse_sdp_a_trigger( + attribute=attribute, + parts=parts, + call_data={}, + metadata={}, + media_type="video", + application_data=application_data, + transport_data=transport_data, + ) + assert application_data["payload_types"][96]["rtcp-fb-trr-int"] == 100 + + def test_generate_sdp_session(self, host): + """Generation of SDP lines for session data.""" + xep_0293 = XEP_0293(host) + sdp_lines = [] + application_data = {RTCP_FB_KEY: [("nack", "pli", {})], "rtcp-fb-trr-int": 100} + + xep_0293._generate_sdp_content_trigger( + session={}, + local=True, + content_name="test", + content_data={}, + sdp_lines=sdp_lines, + application_data=application_data, + app_data_key="test", + media_data={}, + media="video", + ) + + assert sdp_lines[0] == "a=rtcp-fb:* nack pli" + assert sdp_lines[1] == "a=rtcp-fb:* trr-int 100" + + def test_generate_sdp_payload_type(self, host): + """Generation of SDP lines for each payload type.""" + xep_0293 = XEP_0293(host) + sdp_lines = [] + application_data = {} + media_data = { + "payload_types": {96: {RTCP_FB_KEY: [("nack", None, {})], "rtcp-fb-trr-int": 100}} + } + + xep_0293._generate_sdp_content_trigger( + session={}, + local=True, + content_name="test", + content_data={}, + sdp_lines=sdp_lines, + application_data=application_data, + app_data_key="test", + media_data=media_data, + media="video", + ) + + assert sdp_lines[0] == "a=rtcp-fb:96 nack" + assert sdp_lines[1] == "a=rtcp-fb:96 trr-int 100" + + def test_parse_description(self, host): + """Parsing of and elements from a description.""" + xep_0293 = XEP_0293(host) + + desc_element = xml_tools.parse( + f""" + + + + + """ + ) + + media_data = {} + xep_0293._parse_description_trigger(desc_element, media_data) + + assert media_data[RTCP_FB_KEY][0] == ("nack", "pli", {}) + assert media_data["rtcp_fb_trr_int"][0] == 100 + + def test_parse_description_payload_type(self, host): + """Parsing of and elements from a payload type.""" + xep_0293 = XEP_0293(host) + + desc_element = xml_tools.parse( + f""" + + + + + + + """ + ) + + media_data = {} + payload_type_elt = desc_element.firstChildElement() + payload_type_data = {} + xep_0293._parse_description_payload_type_trigger( + desc_element, media_data, payload_type_elt, payload_type_data + ) + + assert payload_type_data[RTCP_FB_KEY][0] == ("nack", "pli", {}) + assert payload_type_data["rtcp_fb_trr_int"][0] == 100 + + def test_build_rtcp_fb_elements(self, host): + """Building the and elements.""" + xep_0293 = XEP_0293(host) + data = { + RTCP_FB_KEY: [("nack", "pli", {})], + "rtcp-fb-trr-int": 100 + } + + # Test _build_description_trigger + desc_elt = domish.Element((NS_JINGLE_RTP, "description")) + xep_0293._build_description_trigger(desc_elt, data, {}) + + rtcp_fb_elts = list(desc_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb")) + assert len(rtcp_fb_elts) == 1 + rtcp_fb_elt = rtcp_fb_elts[0] + assert rtcp_fb_elt["type"] == "nack" + assert rtcp_fb_elt["subtype"] == "pli" + + rtcp_fb_trr_int_elts = list(desc_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int")) + assert len(rtcp_fb_trr_int_elts) == 1 + rtcp_fb_trr_int_elt = rtcp_fb_trr_int_elts[0] + assert int(rtcp_fb_trr_int_elt["value"]) == 100 + + # Test _build_description_payload_type_trigger + desc_elt = domish.Element((NS_JINGLE_RTP, "description")) + payload_type_elt = desc_elt.addElement((NS_JINGLE_RTP, "payload-type")) + xep_0293._build_description_payload_type_trigger(desc_elt, {}, data, payload_type_elt) + + rtcp_fb_elts = list(payload_type_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb")) + assert len(rtcp_fb_elts) == 1 + rtcp_fb_elt = rtcp_fb_elts[0] + assert rtcp_fb_elt["type"] == "nack" + assert rtcp_fb_elt["subtype"] == "pli" + + rtcp_fb_trr_int_elts = list(payload_type_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int")) + assert len(rtcp_fb_trr_int_elts) == 1 + rtcp_fb_trr_int_elt = rtcp_fb_trr_int_elts[0] + assert int(rtcp_fb_trr_int_elt["value"]) == 100