diff libervia/backend/plugins/plugin_xep_0338.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0338.py@08ee0e623e7e
children 52a89ddf3087
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0338.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,156 @@
+#!/usr/bin/env python3
+
+# Libervia plugin
+# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import List
+
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from zope.interface import implementer
+
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.core.core_types import SatXMPPEntity
+
+log = getLogger(__name__)
+
+NS_JINGLE_GROUPING = "urn:xmpp:jingle:apps:grouping:0"
+NS_RFC_5888 = "urn:ietf:rfc:5888"
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Jingle Grouping Framework",
+    C.PI_IMPORT_NAME: "XEP-0338",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0338"],
+    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0167"],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "XEP_0338",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Jingle mapping of RFC 5888 SDP Grouping Framework"""),
+}
+
+
+class XEP_0338:
+    def __init__(self, host):
+        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
+        self._j = host.plugins["XEP-0166"]
+        host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger)
+        host.trigger.add(
+            "XEP-0167_generate_sdp_session", self._generate_sdp_session_trigger
+        )
+        host.trigger.add("XEP-0167_jingle_session_init", self._jingle_session_init_trigger)
+        host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger)
+
+    def get_handler(self, client):
+        return XEP_0338_handler()
+
+    def _parse_sdp_a_trigger(
+        self,
+        attribute: str,
+        parts: List[str],
+        call_data: dict,
+        metadata: dict,
+        media_type: str,
+        application_data: dict,
+        transport_data: dict,
+    ) -> None:
+        """Parse "group" attributes"""
+        if attribute == "group":
+            semantics = parts[0]
+            content_names = parts[1:]
+            metadata.setdefault("group", {})[semantics] = content_names
+
+    def _generate_sdp_session_trigger(
+        self,
+        session: dict,
+        local: bool,
+        sdp_lines: List[str],
+    ) -> None:
+        """Generate "group" attributes"""
+        key = "metadata" if local else "peer_metadata"
+        group_data = session[key].get("group", {})
+
+        for semantics, content_names in group_data.items():
+            sdp_lines.append(f"a=group:{semantics} {' '.join(content_names)}")
+
+    def parse_group_element(
+        self, jingle_elt: domish.Element, session: dict
+    ) -> None:
+        """Parse the <group> and <content> elements"""
+        for group_elt in jingle_elt.elements(NS_JINGLE_GROUPING, "group"):
+            try:
+                metadata = session["peer_metadata"]
+                semantics = group_elt["semantics"]
+                group_content = metadata.setdefault("group", {})[semantics] = []
+                for content_elt in group_elt.elements(NS_JINGLE_GROUPING, "content"):
+                    group_content.append(content_elt["name"])
+            except KeyError as e:
+                log.warning(f"Error while parsing <group>: {e}\n{group_elt.toXml()}")
+
+    def add_group_element(
+        self, jingle_elt: domish.Element, session: dict
+    ) -> None:
+        """Build the <group> and <content> elements if possible"""
+        for semantics, content_names in session["metadata"].get("group", {}).items():
+            group_elt = jingle_elt.addElement((NS_JINGLE_GROUPING, "group"))
+            group_elt["semantics"] = semantics
+            for content_name in content_names:
+                content_elt = group_elt.addElement((NS_JINGLE_GROUPING, "content"))
+                content_elt["name"] = content_name
+
+    def _jingle_session_init_trigger(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        content_name: str,
+        media: str,
+        media_data: dict,
+        desc_elt: domish.Element,
+    ) -> None:
+        jingle_elt = session["jingle_elt"]
+        self.add_group_element(jingle_elt, session)
+
+    def _jingle_handler_trigger(
+        self,
+        client: SatXMPPEntity,
+        action: str,
+        session: dict,
+        content_name: str,
+        desc_elt: domish.Element,
+    ) -> None:
+        # this is a session metadata, so we only generate it on the first content
+        if content_name == next(iter(session["contents"])) and action in (
+            self._j.A_PREPARE_CONFIRMATION,
+            self._j.A_SESSION_INITIATE,
+            self._j.A_PREPARE_INITIATOR,
+        ):
+            jingle_elt = session["jingle_elt"]
+            self.parse_group_element(jingle_elt, session)
+            if action == self._j.A_SESSION_INITIATE:
+                self.add_group_element(jingle_elt, session)
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0338_handler(XMPPHandler):
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_RFC_5888)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []