Mercurial > libervia-backend
diff sat/plugins/plugin_xep_0293.py @ 4060:fce92ba311f4
plugin XEP-0293: "Jingle RTP Feedback Negotiation" implementation:
rel 437
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 29 May 2023 17:58:03 +0200 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0293.py Mon May 29 17:58:03 2023 +0200 @@ -0,0 +1,312 @@ +#!/usr/bin/env python3 + +# Libervia plugin +# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import List + +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer + +from sat.core.constants import Const as C +from sat.core.i18n import _ +from sat.core.log import getLogger + +log = getLogger(__name__) + +NS_JINGLE_RTP_RTCP_FB = "urn:xmpp:jingle:apps:rtp:rtcp-fb:0" + +PLUGIN_INFO = { + C.PI_NAME: "Jingle RTP Feedback Negotiation", + C.PI_IMPORT_NAME: "XEP-0293", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0293"], + C.PI_DEPENDENCIES: ["XEP-0092", "XEP-0166", "XEP-0167"], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "XEP_0293", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Jingle RTP Feedback Negotiation"""), +} + +RTCP_FB_KEY = "rtcp-fb" + + +class XEP_0293: + def __init__(self, host): + log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") + host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger) + host.trigger.add( + "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger + ) + host.trigger.add("XEP-0167_parse_description", self._parse_description_trigger) + host.trigger.add( + "XEP-0167_parse_description_payload_type", + self._parse_description_payload_type_trigger, + ) + host.trigger.add("XEP-0167_build_description", self._build_description_trigger) + host.trigger.add( + "XEP-0167_build_description_payload_type", + self._build_description_payload_type_trigger, + ) + + def get_handler(self, client): + return XEP_0293_handler() + + ## SDP + + def _parse_sdp_a_trigger( + self, + attribute: str, + parts: List[str], + call_data: dict, + metadata: dict, + media_type: str, + application_data: dict, + transport_data: dict, + ) -> None: + """Parse "rtcp-fb" and "rtcp-fb-trr-int" attributes + + @param attribute: The attribute being parsed. + @param parts: The list of parts in the attribute. + @param call_data: The call data dict. + @param metadata: The metadata dict. + @param media_type: The media type (e.g., audio, video). + @param application_data: The application data dict. + @param transport_data: The transport data dict. + @param payload_map: The payload map dict. + """ + if attribute == "rtcp-fb": + pt_id = parts[0] + feedback_type = parts[1] + + feedback_subtype = None + parameters = {} + + # Check if there are extra parameters + if len(parts) > 2: + feedback_subtype = parts[2] + + if len(parts) > 3: + for parameter in parts[3:]: + name, _, value = parameter.partition("=") + parameters[name] = value or None + + # Check if this feedback is linked to a payload type + if pt_id == "*": + # Not linked to a payload type, add to application data + application_data.setdefault(RTCP_FB_KEY, []).append( + (feedback_type, feedback_subtype, parameters) + ) + else: + payload_types = application_data.get("payload_types", {}) + try: + payload_type = payload_types[int(pt_id)] + except KeyError: + log.warning( + f"Got reference to unknown payload type {pt_id}: " + f"{' '.join(parts)}" + ) + else: + # Linked to a payload type, add to payload data + payload_type.setdefault(RTCP_FB_KEY, []).append( + (feedback_type, feedback_subtype, parameters) + ) + + elif attribute == "rtcp-fb-trr-int": + pt_id = parts[0] # Payload type ID + interval = int(parts[1]) + + # Check if this interval is linked to a payload type + if pt_id == "*": + # Not linked to a payload type, add to application data + application_data["rtcp-fb-trr-int"] = interval + else: + payload_types = application_data.get("payload_types", {}) + try: + payload_type = payload_types[int(pt_id)] + except KeyError: + log.warning( + f"Got reference to unknown payload type {pt_id}: " + f"{' '.join(parts)}" + ) + else: + # Linked to a payload type, add to payload data + payload_type["rtcp-fb-trr-int"] = interval + + def _generate_rtcp_fb_lines( + self, data: dict, pt_id: str, sdp_lines: List[str] + ) -> None: + for type_, subtype, parameters in data.get(RTCP_FB_KEY, []): + parameters_strs = [ + f"{k}={v}" if v is not None else k for k, v in parameters.items() + ] + parameters_str = " ".join(parameters_strs) + + sdp_line = f"a=rtcp-fb:{pt_id} {type_}" + if subtype: + sdp_line += f" {subtype}" + if parameters_str: + sdp_line += f" {parameters_str}" + sdp_lines.append(sdp_line) + + def _generate_rtcp_fb_trr_int_lines( + self, data: dict, pt_id: str, sdp_lines: List[str] + ) -> None: + if "rtcp-fb-trr-int" not in data: + return + sdp_lines.append(f"a=rtcp-fb:{pt_id} trr-int {data['rtcp-fb-trr-int']}") + + def _generate_sdp_content_trigger( + self, + session: dict, + local: bool, + content_name: str, + content_data: dict, + sdp_lines: List[str], + application_data: dict, + app_data_key: str, + media_data: dict, + media: str, + ) -> None: + """Generate SDP attributes "rtcp-fb" and "rtcp-fb-trr-int" from application data. + + @param session: The session data. + @param local: Whether this is local or remote content. + @param content_name: The name of the content. + @param content_data: The data of the content. + @param sdp_lines: The list of SDP lines to append to. + @param application_data: The application data dict. + @param app_data_key: The key for the application data. + @param media_data: The media data dict. + @param media: The media type (e.g., audio, video). + """ + # Generate lines for application data + self._generate_rtcp_fb_lines(application_data, "*", sdp_lines) + self._generate_rtcp_fb_trr_int_lines(application_data, "*", sdp_lines) + + # Generate lines for each payload type + for pt_id, payload_data in media_data.get("payload_types", {}).items(): + self._generate_rtcp_fb_lines(payload_data, pt_id, sdp_lines) + self._generate_rtcp_fb_trr_int_lines(payload_data, pt_id, sdp_lines) + + ## XML + + def _parse_rtcp_fb_elements(self, parent_elt: domish.Element, data: dict) -> None: + """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements. + + @param parent_elt: The parent domish.Element. + @param data: The data dict to populate. + """ + for rtcp_fb_elt in parent_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb"): + try: + type_ = rtcp_fb_elt["type"] + subtype = rtcp_fb_elt.getAttribute("subtype") + + parameters = {} + for parameter_elt in rtcp_fb_elt.elements( + NS_JINGLE_RTP_RTCP_FB, "parameter" + ): + parameters[parameter_elt["name"]] = parameter_elt.getAttribute( + "value" + ) + + data.setdefault(RTCP_FB_KEY, []).append((type_, subtype, parameters)) + except (KeyError, ValueError) as e: + log.warning(f"Error while parsing <rtcp-fb>: {e}\n{rtcp_fb_elt.toXml()}") + + for rtcp_fb_trr_int_elt in parent_elt.elements( + NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int" + ): + try: + interval_value = int(rtcp_fb_trr_int_elt["value"]) + data.setdefault("rtcp_fb_trr_int", []).append(interval_value) + except (KeyError, ValueError) as e: + log.warning( + f"Error while parsing <rtcp-fb-trr-int>: {e}\n" + f"{rtcp_fb_trr_int_elt.toXml()}" + ) + + def _parse_description_trigger( + self, desc_elt: domish.Element, media_data: dict + ) -> None: + """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements from a description. + + @param desc_elt: The <description> domish.Element. + @param media_data: The media data dict to populate. + """ + self._parse_rtcp_fb_elements(desc_elt, media_data) + + def _parse_description_payload_type_trigger( + self, + desc_elt: domish.Element, + media_data: dict, + payload_type_elt: domish.Element, + payload_type_data: dict, + ) -> None: + """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements from a payload type. + + @param desc_elt: The <description> domish.Element. + @param media_data: The media data dict. + @param payload_type_elt: The <payload-type> domish.Element. + @param payload_type_data: The payload type data dict to populate. + """ + self._parse_rtcp_fb_elements(payload_type_elt, payload_type_data) + + def build_rtcp_fb_elements(self, parent_elt: domish.Element, data: dict) -> None: + """Helper method to build the <rtcp-fb> and <rtcp-fb-trr-int> elements""" + for type_, subtype, parameters in data.get(RTCP_FB_KEY, []): + rtcp_fb_elt = parent_elt.addElement((NS_JINGLE_RTP_RTCP_FB, "rtcp-fb")) + rtcp_fb_elt["type"] = type_ + if subtype: + rtcp_fb_elt["subtype"] = subtype + for name, value in parameters.items(): + param_elt = rtcp_fb_elt.addElement(name) + if value is not None: + param_elt.addContent(str(value)) + + if "rtcp-fb-trr-int" in data: + rtcp_fb_trr_int_elt = parent_elt.addElement( + (NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int") + ) + rtcp_fb_trr_int_elt["value"] = str(data["rtcp-fb-trr-int"]) + + def _build_description_payload_type_trigger( + self, + desc_elt: domish.Element, + media_data: dict, + payload_type: dict, + payload_type_elt: domish.Element, + ) -> None: + """Build the <rtcp-fb> and <rtcp-fb-trr-int> elements for a payload type""" + self.build_rtcp_fb_elements(payload_type_elt, payload_type) + + def _build_description_trigger( + self, desc_elt: domish.Element, media_data: dict, session: dict + ) -> None: + """Build the <rtcp-fb> and <rtcp-fb-trr-int> elements for a media description""" + self.build_rtcp_fb_elements(desc_elt, media_data) + + +@implementer(iwokkel.IDisco) +class XEP_0293_handler(XMPPHandler): + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_JINGLE_RTP_RTCP_FB)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []