Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0293.py @ 4151:18026ce0819c
core (xmpp): message reception workflow refactoring:
- Call methods from a root async one instead of using Deferred callbacks chain.
- Use a queue to be sure to process messages in order.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 22 Nov 2023 14:50:35 +0100 |
parents | 4b842c1fb686 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia plugin # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from typing import List from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import disco, iwokkel from zope.interface import implementer from libervia.backend.core.constants import Const as C from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger log = getLogger(__name__) NS_JINGLE_RTP_RTCP_FB = "urn:xmpp:jingle:apps:rtp:rtcp-fb:0" PLUGIN_INFO = { C.PI_NAME: "Jingle RTP Feedback Negotiation", C.PI_IMPORT_NAME: "XEP-0293", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0293"], C.PI_DEPENDENCIES: ["XEP-0092", "XEP-0166", "XEP-0167"], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "XEP_0293", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("""Jingle RTP Feedback Negotiation"""), } RTCP_FB_KEY = "rtcp-fb" class XEP_0293: def __init__(self, host): log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger) host.trigger.add( "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger ) host.trigger.add("XEP-0167_parse_description", self._parse_description_trigger) host.trigger.add( "XEP-0167_parse_description_payload_type", self._parse_description_payload_type_trigger, ) host.trigger.add("XEP-0167_build_description", self._build_description_trigger) host.trigger.add( "XEP-0167_build_description_payload_type", self._build_description_payload_type_trigger, ) def get_handler(self, client): return XEP_0293_handler() ## SDP def _parse_sdp_a_trigger( self, attribute: str, parts: List[str], call_data: dict, metadata: dict, media_type: str, application_data: dict, transport_data: dict, ) -> None: """Parse "rtcp-fb" and "rtcp-fb-trr-int" attributes @param attribute: The attribute being parsed. @param parts: The list of parts in the attribute. @param call_data: The call data dict. @param metadata: The metadata dict. @param media_type: The media type (e.g., audio, video). @param application_data: The application data dict. @param transport_data: The transport data dict. @param payload_map: The payload map dict. """ if attribute == "rtcp-fb": pt_id = parts[0] feedback_type = parts[1] feedback_subtype = None parameters = {} # Check if there are extra parameters if len(parts) > 2: feedback_subtype = parts[2] if len(parts) > 3: for parameter in parts[3:]: name, _, value = parameter.partition("=") parameters[name] = value or None # Check if this feedback is linked to a payload type if pt_id == "*": # Not linked to a payload type, add to application data application_data.setdefault(RTCP_FB_KEY, []).append( (feedback_type, feedback_subtype, parameters) ) else: payload_types = application_data.get("payload_types", {}) try: payload_type = payload_types[int(pt_id)] except KeyError: log.warning( f"Got reference to unknown payload type {pt_id}: " f"{' '.join(parts)}" ) else: # Linked to a payload type, add to payload data payload_type.setdefault(RTCP_FB_KEY, []).append( (feedback_type, feedback_subtype, parameters) ) elif attribute == "rtcp-fb-trr-int": pt_id = parts[0] # Payload type ID interval = int(parts[1]) # Check if this interval is linked to a payload type if pt_id == "*": # Not linked to a payload type, add to application data application_data["rtcp-fb-trr-int"] = interval else: payload_types = application_data.get("payload_types", {}) try: payload_type = payload_types[int(pt_id)] except KeyError: log.warning( f"Got reference to unknown payload type {pt_id}: " f"{' '.join(parts)}" ) else: # Linked to a payload type, add to payload data payload_type["rtcp-fb-trr-int"] = interval def _generate_rtcp_fb_lines( self, data: dict, pt_id: str, sdp_lines: List[str] ) -> None: for type_, subtype, parameters in data.get(RTCP_FB_KEY, []): parameters_strs = [ f"{k}={v}" if v is not None else k for k, v in parameters.items() ] parameters_str = " ".join(parameters_strs) sdp_line = f"a=rtcp-fb:{pt_id} {type_}" if subtype: sdp_line += f" {subtype}" if parameters_str: sdp_line += f" {parameters_str}" sdp_lines.append(sdp_line) def _generate_rtcp_fb_trr_int_lines( self, data: dict, pt_id: str, sdp_lines: List[str] ) -> None: if "rtcp-fb-trr-int" not in data: return sdp_lines.append(f"a=rtcp-fb:{pt_id} trr-int {data['rtcp-fb-trr-int']}") def _generate_sdp_content_trigger( self, session: dict, local: bool, content_name: str, content_data: dict, sdp_lines: List[str], application_data: dict, app_data_key: str, media_data: dict, media: str, ) -> None: """Generate SDP attributes "rtcp-fb" and "rtcp-fb-trr-int" from application data. @param session: The session data. @param local: Whether this is local or remote content. @param content_name: The name of the content. @param content_data: The data of the content. @param sdp_lines: The list of SDP lines to append to. @param application_data: The application data dict. @param app_data_key: The key for the application data. @param media_data: The media data dict. @param media: The media type (e.g., audio, video). """ # Generate lines for application data self._generate_rtcp_fb_lines(application_data, "*", sdp_lines) self._generate_rtcp_fb_trr_int_lines(application_data, "*", sdp_lines) # Generate lines for each payload type for pt_id, payload_data in media_data.get("payload_types", {}).items(): self._generate_rtcp_fb_lines(payload_data, pt_id, sdp_lines) self._generate_rtcp_fb_trr_int_lines(payload_data, pt_id, sdp_lines) ## XML def _parse_rtcp_fb_elements(self, parent_elt: domish.Element, data: dict) -> None: """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements. @param parent_elt: The parent domish.Element. @param data: The data dict to populate. """ for rtcp_fb_elt in parent_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb"): try: type_ = rtcp_fb_elt["type"] subtype = rtcp_fb_elt.getAttribute("subtype") parameters = {} for parameter_elt in rtcp_fb_elt.elements( NS_JINGLE_RTP_RTCP_FB, "parameter" ): parameters[parameter_elt["name"]] = parameter_elt.getAttribute( "value" ) data.setdefault(RTCP_FB_KEY, []).append((type_, subtype, parameters)) except (KeyError, ValueError) as e: log.warning(f"Error while parsing <rtcp-fb>: {e}\n{rtcp_fb_elt.toXml()}") for rtcp_fb_trr_int_elt in parent_elt.elements( NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int" ): try: interval_value = int(rtcp_fb_trr_int_elt["value"]) data.setdefault("rtcp_fb_trr_int", []).append(interval_value) except (KeyError, ValueError) as e: log.warning( f"Error while parsing <rtcp-fb-trr-int>: {e}\n" f"{rtcp_fb_trr_int_elt.toXml()}" ) def _parse_description_trigger( self, desc_elt: domish.Element, media_data: dict ) -> None: """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements from a description. @param desc_elt: The <description> domish.Element. @param media_data: The media data dict to populate. """ self._parse_rtcp_fb_elements(desc_elt, media_data) def _parse_description_payload_type_trigger( self, desc_elt: domish.Element, media_data: dict, payload_type_elt: domish.Element, payload_type_data: dict, ) -> None: """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements from a payload type. @param desc_elt: The <description> domish.Element. @param media_data: The media data dict. @param payload_type_elt: The <payload-type> domish.Element. @param payload_type_data: The payload type data dict to populate. """ self._parse_rtcp_fb_elements(payload_type_elt, payload_type_data) def build_rtcp_fb_elements(self, parent_elt: domish.Element, data: dict) -> None: """Helper method to build the <rtcp-fb> and <rtcp-fb-trr-int> elements""" for type_, subtype, parameters in data.get(RTCP_FB_KEY, []): rtcp_fb_elt = parent_elt.addElement((NS_JINGLE_RTP_RTCP_FB, "rtcp-fb")) rtcp_fb_elt["type"] = type_ if subtype: rtcp_fb_elt["subtype"] = subtype for name, value in parameters.items(): param_elt = rtcp_fb_elt.addElement(name) if value is not None: param_elt.addContent(str(value)) if "rtcp-fb-trr-int" in data: rtcp_fb_trr_int_elt = parent_elt.addElement( (NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int") ) rtcp_fb_trr_int_elt["value"] = str(data["rtcp-fb-trr-int"]) def _build_description_payload_type_trigger( self, desc_elt: domish.Element, media_data: dict, payload_type: dict, payload_type_elt: domish.Element, ) -> None: """Build the <rtcp-fb> and <rtcp-fb-trr-int> elements for a payload type""" self.build_rtcp_fb_elements(payload_type_elt, payload_type) def _build_description_trigger( self, desc_elt: domish.Element, media_data: dict, session: dict ) -> None: """Build the <rtcp-fb> and <rtcp-fb-trr-int> elements for a media description""" self.build_rtcp_fb_elements(desc_elt, media_data) @implementer(iwokkel.IDisco) class XEP_0293_handler(XMPPHandler): def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_JINGLE_RTP_RTCP_FB)] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []