Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0338.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0338.py@08ee0e623e7e |
children | 52a89ddf3087 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0338.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 + +# Libervia plugin +# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import List + +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer + +from libervia.backend.core.constants import Const as C +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.core.core_types import SatXMPPEntity + +log = getLogger(__name__) + +NS_JINGLE_GROUPING = "urn:xmpp:jingle:apps:grouping:0" +NS_RFC_5888 = "urn:ietf:rfc:5888" + +PLUGIN_INFO = { + C.PI_NAME: "Jingle Grouping Framework", + C.PI_IMPORT_NAME: "XEP-0338", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0338"], + C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0167"], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "XEP_0338", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Jingle mapping of RFC 5888 SDP Grouping Framework"""), +} + + +class XEP_0338: + def __init__(self, host): + log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") + self._j = host.plugins["XEP-0166"] + host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger) + host.trigger.add( + "XEP-0167_generate_sdp_session", self._generate_sdp_session_trigger + ) + host.trigger.add("XEP-0167_jingle_session_init", self._jingle_session_init_trigger) + host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger) + + def get_handler(self, client): + return XEP_0338_handler() + + def _parse_sdp_a_trigger( + self, + attribute: str, + parts: List[str], + call_data: dict, + metadata: dict, + media_type: str, + application_data: dict, + transport_data: dict, + ) -> None: + """Parse "group" attributes""" + if attribute == "group": + semantics = parts[0] + content_names = parts[1:] + metadata.setdefault("group", {})[semantics] = content_names + + def _generate_sdp_session_trigger( + self, + session: dict, + local: bool, + sdp_lines: List[str], + ) -> None: + """Generate "group" attributes""" + key = "metadata" if local else "peer_metadata" + group_data = session[key].get("group", {}) + + for semantics, content_names in group_data.items(): + sdp_lines.append(f"a=group:{semantics} {' '.join(content_names)}") + + def parse_group_element( + self, jingle_elt: domish.Element, session: dict + ) -> None: + """Parse the <group> and <content> elements""" + for group_elt in jingle_elt.elements(NS_JINGLE_GROUPING, "group"): + try: + metadata = session["peer_metadata"] + semantics = group_elt["semantics"] + group_content = metadata.setdefault("group", {})[semantics] = [] + for content_elt in group_elt.elements(NS_JINGLE_GROUPING, "content"): + group_content.append(content_elt["name"]) + except KeyError as e: + log.warning(f"Error while parsing <group>: {e}\n{group_elt.toXml()}") + + def add_group_element( + self, jingle_elt: domish.Element, session: dict + ) -> None: + """Build the <group> and <content> elements if possible""" + for semantics, content_names in session["metadata"].get("group", {}).items(): + group_elt = jingle_elt.addElement((NS_JINGLE_GROUPING, "group")) + group_elt["semantics"] = semantics + for content_name in content_names: + content_elt = group_elt.addElement((NS_JINGLE_GROUPING, "content")) + content_elt["name"] = content_name + + def _jingle_session_init_trigger( + self, + client: SatXMPPEntity, + session: dict, + content_name: str, + media: str, + media_data: dict, + desc_elt: domish.Element, + ) -> None: + jingle_elt = session["jingle_elt"] + self.add_group_element(jingle_elt, session) + + def _jingle_handler_trigger( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + desc_elt: domish.Element, + ) -> None: + # this is a session metadata, so we only generate it on the first content + if content_name == next(iter(session["contents"])) and action in ( + self._j.A_PREPARE_CONFIRMATION, + self._j.A_SESSION_INITIATE, + self._j.A_PREPARE_INITIATOR, + ): + jingle_elt = session["jingle_elt"] + self.parse_group_element(jingle_elt, session) + if action == self._j.A_SESSION_INITIATE: + self.add_group_element(jingle_elt, session) + + +@implementer(iwokkel.IDisco) +class XEP_0338_handler(XMPPHandler): + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_RFC_5888)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []