diff libervia/backend/plugins/plugin_xep_0343.py @ 4231:e11b13418ba6

plugin XEP-0353, XEP-0234, jingle: WebRTC data channel signaling implementation: Implement XEP-0343: Signaling WebRTC Data Channels in Jingle. The current version of the XEP (0.3.1) has no implementation and contains some flaws. After discussing this on xsf@, Daniel (from Conversations) mentioned that they had a sprint with Larma (from Dino) to work on another version and provided me with this link: https://gist.github.com/iNPUTmice/6c56f3e948cca517c5fb129016d99e74 . I have used it for my implementation. This implementation reuses work done on Jingle A/V call (notably XEP-0176 and XEP-0167 plugins), with adaptations. When used, XEP-0234 will not handle the file itself as it normally does. This is because WebRTC has several implementations (browser for web interface, GStreamer for others), and file/data must be handled directly by the frontend. This is particularly important for web frontends, as the file is not sent from the backend but from the end-user's browser device. Among the changes, there are: - XEP-0343 implementation. - `file_send` bridge method now use serialised dict as output. - New `BaseTransportHandler.is_usable` method which get content data and returns a boolean (default to `True`) to tell if this transport can actually be used in this context (when we are initiator). Used in webRTC case to see if call data are available. - Support of `application` media type, and everything necessary to handle data channels. - Better confirmation message, with file name, size and description when available. - When file is accepted in preflight, it is specified in following `action_new` signal for actual file transfer. This way, frontend can avoid the display or 2 confirmation messages. - XEP-0166: when not specified, default `content` name is now its index number instead of a UUID. This follows the behaviour of browsers. - XEP-0353: better handling of events such as call taken by another device. - various other updates. rel 441
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:57:23 +0200
parents
children 79c8a70e1813
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0343.py	Sat Apr 06 12:57:23 2024 +0200
@@ -0,0 +1,373 @@
+#!/usr/bin/env python3
+
+# Libervia plugin
+# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Final
+
+from twisted.internet import defer, reactor
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.core.xmpp import SatXMPPEntity
+from libervia.backend.plugins.plugin_xep_0166.models import BaseTransportHandler, ContentData
+from libervia.backend.tools.common import data_format
+
+from .plugin_xep_0167 import mapping
+
+log = getLogger(__name__)
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "WebRTC datachannels in Jingle",
+    C.PI_IMPORT_NAME: "XEP-0343",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0167", "XEP-0176", "XEP-0234", "XEP-0320"],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "XEP_0343",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Use WebRTC to create a generic data transport."""),
+}
+NS_JINGLE_WEBRTC_DATACHANNELS: Final[
+    str
+] = "urn:xmpp:jingle:transports:webrtc-datachannel:1"
+
+
+class XEP_0343(BaseTransportHandler):
+    namespace = NS_JINGLE_WEBRTC_DATACHANNELS
+
+    def __init__(self, host):
+        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
+        self.host = host
+        self._ice_udp = host.plugins["XEP-0176"]
+        self._j = host.plugins["XEP-0166"]
+        self._j.register_transport(
+            NS_JINGLE_WEBRTC_DATACHANNELS, self._j.TRANSPORT_STREAMING, self, 10000
+        )
+        self._rtp = host.plugins["XEP-0167"]
+        host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger)
+        host.trigger.add(
+            "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger
+        )
+        host.trigger.add("XEP-0176_jingle_handler_send_buffer", self._on_send_ice_buffer)
+        host.trigger.add("XEP-0176_ice_candidate_send", self._on_ice_candidate_send)
+        host.trigger.add(
+            "XEP-0234_file_jingle_send", self._file_jingle_send
+        )
+
+    def get_handler(self, client: SatXMPPEntity):
+        return XEP0343Handler()
+
+    def is_usable(self, client, content_data: ContentData) -> bool:
+        try:
+            return content_data.app_kwargs["extra"]["webrtc"]
+        except KeyError:
+            return False
+
+    def _parse_sdp_a_trigger(
+        self,
+        attribute: str,
+        parts: list[str],
+        call_data: dict,
+        metadata: dict,
+        media_type: str,
+        application_data: dict,
+        transport_data: dict,
+    ) -> None:
+        """Parse "sctp-port" and "max-message-size" attributes"""
+        try:
+            if attribute == "sctp-port":
+                transport_data["sctp-port"] = int(parts[0])
+            elif attribute == "max-message-size":
+                transport_data["max-message-size"] = int(parts[0])
+        except ValueError:
+            log.warning(f"Can't parse value of {attribute}, ignoring: {parts}")
+
+    def _generate_sdp_content_trigger(
+        self,
+        session: dict,
+        local: bool,
+        idx: int,
+        content_data: dict,
+        sdp_lines: list[str],
+        application_data: dict,
+        app_data_key: str,
+        media_data: dict,
+        media: str
+    ) -> None:
+        """Generate "sctp-port" and "max-message-size" attributes"""
+        transport_data = content_data["transport_data"]
+        sctp_port = transport_data.get("sctp-port")
+        if sctp_port is not None:
+            sdp_lines.append(f"a=sctp-port:{sctp_port}")
+
+        max_message_size = transport_data.get("max-message-size")
+        if max_message_size is not None:
+            sdp_lines.append(f"a=max-message-size:{max_message_size}")
+
+    def _wrap_transport_element(
+        self,
+        transport_elt: domish.Element
+    ) -> None:
+        """Wrap the XEP-0176 transport in a transport with this XEP namespace
+
+        @param transport_elt: ICE UDP <transport>. Must be already a child of a <content>
+            element.
+        """
+        content_elt = transport_elt.parent
+        if content_elt is None or not content_elt.name == "content":
+            raise exceptions.InternalError("Was expecting <content> element.")
+        content_elt.children.remove(transport_elt)
+        wrapping_transport_elt = content_elt.addElement(
+            (NS_JINGLE_WEBRTC_DATACHANNELS, "transport")
+        )
+        wrapping_transport_elt.addChild(transport_elt)
+
+    def _on_send_ice_buffer(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        content_name: str,
+        content_data: dict,
+        transport_elt: domish.Element,
+        iq_elt: domish.Element
+    ) -> bool:
+        if content_data["transport"].handler == self:
+            self._wrap_transport_element(transport_elt)
+        return True
+
+    def _on_ice_candidate_send(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        media_ice_data: dict[str, dict],
+        content_name: str,
+        content_data: dict,
+        iq_elt: domish.Element
+    ) -> bool:
+        if content_data["transport"].handler == self:
+            transport_elt = iq_elt.jingle.content.transport
+            if transport_elt.uri != self._ice_udp.namespace:
+                raise exceptions.InternalError("Was expecting an ICE UDP transport")
+            self._wrap_transport_element(transport_elt)
+        return True
+
+    async def _file_jingle_send(
+        self,
+        client: SatXMPPEntity,
+        peer_jid: jid.JID,
+        content: dict
+    ) -> None:
+        call_data = content["app_kwargs"]["extra"].pop("call_data", None)
+        if call_data:
+            metadata = self._rtp.parse_call_data(call_data)
+            try:
+                application_data = call_data["application"]
+            except KeyError:
+                raise exceptions.DataError(
+                    '"call_data" must have an application media.'
+                )
+            try:
+                content["transport_data"] = {
+                    "sctp-port": metadata["sctp-port"],
+                    "max-message-size": metadata.get("max-message-size", 65536),
+                    "local_ice_data": {
+                        "ufrag": metadata["ice-ufrag"],
+                        "pwd": metadata["ice-pwd"],
+                        "candidates": application_data.pop("ice-candidates"),
+                        "fingerprint": application_data.pop("fingerprint", {}),
+                    }
+                }
+            except KeyError as e:
+                raise exceptions.DataError(f"Mandatory key is missing: {e}")
+
+    async def jingle_session_init(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        content_name: str,
+    ) -> domish.Element:
+        content_data = session["contents"][content_name]
+        transport_data = content_data["transport_data"]
+        ice_transport_elt = await self._ice_udp.jingle_session_init(
+            client,
+            session,
+            content_name
+        )
+        transport_elt = domish.Element(
+            (NS_JINGLE_WEBRTC_DATACHANNELS, "transport"),
+            attribs={
+                "sctp-port": str(transport_data["sctp-port"]),
+                "max-message-size": str(transport_data["max-message-size"])
+            }
+        )
+        transport_elt.addChild(ice_transport_elt)
+        return transport_elt
+
+    async def _call_ice_udp_handler(
+        self,
+        client: SatXMPPEntity,
+        action: str,
+        session: dict,
+        content_name: str,
+        transport_elt: domish.Element,
+    ):
+        """Unwrap XEP-0176 <transport> element, and call its Jingle handler with it"""
+        try:
+            ice_transport_elt = next(
+                transport_elt.elements(self._ice_udp.namespace, "transport")
+            )
+        except StopIteration:
+            raise exceptions.DataError("Missing ICE UDP <transport> element.")
+        else:
+            await self._ice_udp.jingle_handler(
+                client, action, session, content_name, ice_transport_elt
+            )
+
+    async def jingle_handler(
+        self,
+        client: SatXMPPEntity,
+        action: str,
+        session: dict,
+        content_name: str,
+        transport_elt: domish.Element,
+    ) -> domish.Element:
+        """Handle Jingle requests
+
+        @param client: The SatXMPPEntity instance.
+        @param action: The action to be performed with the session.
+        @param session: A dictionary containing the session information.
+        @param content_name: The name of the content.
+        @param transport_elt: The domish.Element instance representing the transport
+            element.
+
+        @return: <transport> element
+        """
+        content_data = session["contents"][content_name]
+        transport_data = content_data["transport_data"]
+        if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR):
+            session.setdefault("metadata", {})
+            session.setdefault("peer_metadata", {})
+            # we have to set application data despite being a transport handler,
+            # because the SDP generation needs application data
+            application_data = content_data["application_data"]
+            application_data.setdefault("peer_data", {})
+            application_data.setdefault("media", "application")
+
+        if action == self._j.A_PREPARE_CONFIRMATION:
+            await self._call_ice_udp_handler(
+                client, action, session, content_name, transport_elt
+            )
+            try:
+                transport_data["sctp-port"] = int(transport_elt["sctp-port"])
+                transport_data["max-message-size"] = int(
+                    transport_elt.getAttribute("max-message-size", 65536)
+                )
+            except (KeyError, ValueError):
+                raise exceptions.DataError(
+                    f"Invalid datachannel signaling element: {transport_elt.toXml()}"
+                )
+            transport_data["webrtc"] = True
+        elif action in (
+            self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR,
+            self._j.A_TRANSPORT_INFO
+        ):
+            await self._call_ice_udp_handler(
+                client, action, session, content_name, transport_elt
+            )
+        elif action == self._j.A_SESSION_ACCEPT:
+            await self._call_ice_udp_handler(
+                client, action, session, content_name, transport_elt
+            )
+            answer_sdp = mapping.generate_sdp_from_session(session)
+            self.host.bridge.call_setup(
+                session["id"],
+                data_format.serialise(
+                    {
+                        "role": session["role"],
+                        "sdp": answer_sdp,
+                    }
+                ),
+                client.profile,
+            )
+        elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
+            pass
+        elif action == self._j.A_START:
+            pass
+        elif action == self._j.A_SESSION_INITIATE:
+            # responder side
+
+            sdp = mapping.generate_sdp_from_session(session)
+            session["answer_sdp_d"] = answer_sdp_d = defer.Deferred()
+            # we should have the answer long before 2 min
+            answer_sdp_d.addTimeout(2 * 60, reactor)
+
+            self.host.bridge.call_setup(
+                session["id"],
+                data_format.serialise(
+                    {
+                        "role": session["role"],
+                        "sdp": sdp,
+                    }
+                ),
+                client.profile,
+            )
+
+            answer_sdp = await answer_sdp_d
+            parsed_answer = mapping.parse_sdp(answer_sdp)
+            session["metadata"].update(parsed_answer["metadata"])
+            contents = session["contents"]
+            if len(contents) != 1:
+                raise NotImplementedError(
+                    "Only a singlecontent is supported at the moment."
+                )
+            content = next(iter(contents.values()))
+            media_data = parsed_answer["application"]
+            application_data = content["application_data"]
+            application_data["local_data"] = media_data["application_data"]
+            transport_data = content["transport_data"]
+            local_ice_data = media_data["transport_data"]
+            transport_data["local_ice_data"] = local_ice_data
+            transport_elt.children.clear()
+            ice_transport_elt = await self._ice_udp.jingle_handler(
+                client, action, session, content_name, transport_elt
+            )
+            transport_elt.addChild(ice_transport_elt)
+        elif action == self._j.A_DESTROY:
+            # the transport is replaced (fallback ?)
+            pass
+        else:
+            log.warning(f"FIXME: unmanaged action {action}")
+
+        return transport_elt
+
+
+@implementer(iwokkel.IDisco)
+class XEP0343Handler(XMPPHandler):
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return [disco.DiscoFeature(NS_JINGLE_WEBRTC_DATACHANNELS)]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []