Mercurial > libervia-backend
changeset 4067:3f62c2445df1
plugin XEP-0339: "Source-Specific Media Attributes in Jingle" implementation:
rel 439
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 01 Jun 2023 20:53:33 +0200 |
parents | e75827204fe0 |
children | 5241267a92b3 |
files | sat/plugins/plugin_xep_0339.py |
diffstat | 1 files changed, 199 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0339.py Thu Jun 01 20:53:33 2023 +0200 @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 + +# Libervia plugin +# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import List + +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import _ +from sat.core.log import getLogger +from sat.tools import xml_tools + +log = getLogger(__name__) + +NS_JINGLE_RTP_SSMA = "urn:xmpp:jingle:apps:rtp:ssma:0" + +PLUGIN_INFO = { + C.PI_NAME: "Source-Specific Media Attributes in Jingle", + C.PI_IMPORT_NAME: "XEP-0339", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0339"], + C.PI_DEPENDENCIES: ["XEP-0092", "XEP-0167"], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "XEP_0339", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Source-Specific Media Attributes in Jingle"""), +} + + +class XEP_0339: + def __init__(self, host): + log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") + self.host = host + host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger) + host.trigger.add( + "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger + ) + host.trigger.add("XEP-0167_parse_description", self._parse_description_trigger) + host.trigger.add("XEP-0167_build_description", self._build_description_trigger) + + def get_handler(self, client): + return XEP_0339_handler() + + def _parse_sdp_a_trigger( + self, + attribute: str, + parts: List[str], + call_data: dict, + metadata: dict, + media_type: str, + application_data: dict, + transport_data: dict, + ) -> None: + """Parse "ssrc" attributes""" + if attribute == "ssrc": + assert application_data is not None + ssrc_id = int(parts[0]) + + if len(parts) > 1: + name, *values = " ".join(parts[1:]).split(":", 1) + if values: + value = values[0] or None + else: + value = None + application_data.setdefault("ssrc", {}).setdefault(ssrc_id, {})[ + name + ] = value + else: + log.warning(f"no attribute in ssrc: {' '.join(parts)}") + application_data.setdefault("ssrc", {}).setdefault(ssrc_id, {}) + elif attribute == "ssrc-group": + assert application_data is not None + semantics, *ssrc_ids = parts + ssrc_ids = [int(ssrc_id) for ssrc_id in ssrc_ids] + application_data.setdefault("ssrc-group", {})[semantics] = ssrc_ids + elif attribute == "msid": + assert application_data is not None + application_data["msid"] = " ".join(parts) + + + def _generate_sdp_content_trigger( + self, + session: dict, + local: bool, + idx: int, + content_data: dict, + sdp_lines: List[str], + application_data: dict, + app_data_key: str, + media_data: dict, + media: str + ) -> None: + """Generate "msid" and "ssrc" attributes""" + if "msid" in media_data: + sdp_lines.append(f"a=msid:{media_data['msid']}") + + ssrc_data = media_data.get("ssrc", {}) + ssrc_group_data = media_data.get("ssrc-group", {}) + + for ssrc_id, attributes in ssrc_data.items(): + if not attributes: + # there are no attributes for this SSRC ID, we add a simple line with only + # the SSRC ID + sdp_lines.append(f"a=ssrc:{ssrc_id}") + else: + for attr_name, attr_value in attributes.items(): + if attr_value is not None: + sdp_lines.append(f"a=ssrc:{ssrc_id} {attr_name}:{attr_value}") + else: + sdp_lines.append(f"a=ssrc:{ssrc_id} {attr_name}") + for semantics, ssrc_ids in ssrc_group_data.items(): + ssrc_lines = " ".join(str(ssrc_id) for ssrc_id in ssrc_ids) + sdp_lines.append(f"a=ssrc-group:{semantics} {ssrc_lines}") + + def _parse_description_trigger( + self, desc_elt: domish.Element, media_data: dict + ) -> bool: + """Parse the <source> and <ssrc-group> elements""" + for source_elt in desc_elt.elements(NS_JINGLE_RTP_SSMA, "source"): + try: + ssrc_id = int(source_elt["ssrc"]) + media_data.setdefault("ssrc", {})[ssrc_id] = {} + for param_elt in source_elt.elements(NS_JINGLE_RTP_SSMA, "parameter"): + name = param_elt["name"] + value = param_elt.getAttribute("value") + media_data["ssrc"][ssrc_id][name] = value + if name == "msid" and "msid" not in media_data: + media_data["msid"] = value + except (KeyError, ValueError) as e: + log.warning(f"Error while parsing <source>: {e}\n{source_elt.toXml()}") + + for ssrc_group_elt in desc_elt.elements(NS_JINGLE_RTP_SSMA, "ssrc-group"): + try: + semantics = ssrc_group_elt["semantics"] + semantic_ids = media_data.setdefault("ssrc-group", {})[semantics] = [] + for source_elt in ssrc_group_elt.elements(NS_JINGLE_RTP_SSMA, "source"): + semantic_ids.append( + int(source_elt["ssrc"]) + ) + except (KeyError, ValueError) as e: + log.warning( + f"Error while parsing <ssrc-group>: {e}\n{ssrc_group_elt.toXml()}" + ) + + return True + + def _build_description_trigger( + self, desc_elt: domish.Element, media_data: dict, session: dict + ) -> bool: + """Build the <source> and <ssrc-group> elements if possible""" + for ssrc_id, parameters in media_data.get("ssrc", {}).items(): + if "msid" not in parameters and "msid" in media_data: + parameters["msid"] = media_data["msid"] + source_elt = desc_elt.addElement((NS_JINGLE_RTP_SSMA, "source")) + source_elt["ssrc"] = str(ssrc_id) + for name, value in parameters.items(): + param_elt = source_elt.addElement((NS_JINGLE_RTP_SSMA, "parameter")) + param_elt["name"] = name + if value is not None: + param_elt["value"] = value + + for semantics, ssrc_ids in media_data.get("ssrc-group", {}).items(): + ssrc_group_elt = desc_elt.addElement((NS_JINGLE_RTP_SSMA, "ssrc-group")) + ssrc_group_elt["semantics"] = semantics + for ssrc_id in ssrc_ids: + source_elt = ssrc_group_elt.addElement((NS_JINGLE_RTP_SSMA, "source")) + source_elt["ssrc"] = str(ssrc_id) + + return True + + +@implementer(iwokkel.IDisco) +class XEP_0339_handler(XMPPHandler): + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_JINGLE_RTP_SSMA)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []