view libervia/backend/plugins/plugin_xep_0343.py @ 4231:e11b13418ba6

plugin XEP-0353, XEP-0234, jingle: WebRTC data channel signaling implementation: Implement XEP-0343: Signaling WebRTC Data Channels in Jingle. The current version of the XEP (0.3.1) has no implementation and contains some flaws. After discussing this on xsf@, Daniel (from Conversations) mentioned that they had a sprint with Larma (from Dino) to work on another version and provided me with this link: https://gist.github.com/iNPUTmice/6c56f3e948cca517c5fb129016d99e74 . I have used it for my implementation. This implementation reuses work done on Jingle A/V call (notably XEP-0176 and XEP-0167 plugins), with adaptations. When used, XEP-0234 will not handle the file itself as it normally does. This is because WebRTC has several implementations (browser for web interface, GStreamer for others), and file/data must be handled directly by the frontend. This is particularly important for web frontends, as the file is not sent from the backend but from the end-user's browser device. Among the changes, there are: - XEP-0343 implementation. - `file_send` bridge method now uses a serialised dict as output. - New `BaseTransportHandler.is_usable` method which gets content data and returns a boolean (defaults to `True`) to tell if this transport can actually be used in this context (when we are initiator). Used in the WebRTC case to see if call data are available. - Support of `application` media type, and everything necessary to handle data channels. - Better confirmation message, with file name, size and description when available. - When file is accepted in preflight, it is specified in following `action_new` signal for actual file transfer. This way, frontend can avoid the display of 2 confirmation messages. - XEP-0166: when not specified, default `content` name is now its index number instead of a UUID. This follows the behaviour of browsers. - XEP-0353: better handling of events such as call taken by another device. - various other updates. rel 441
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:57:23 +0200 (9 months ago)
parents
children 79c8a70e1813
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Final

from twisted.internet import defer, reactor
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.xmpp import SatXMPPEntity
from libervia.backend.plugins.plugin_xep_0166.models import BaseTransportHandler, ContentData
from libervia.backend.tools.common import data_format

from .plugin_xep_0167 import mapping

log = getLogger(__name__)


# Plugin metadata: name, import name, type, supported modes, dependencies, main class
# and description.
PLUGIN_INFO = {
    C.PI_NAME: "WebRTC datachannels in Jingle",
    C.PI_IMPORT_NAME: "XEP-0343",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    # XEP-0166, XEP-0167 and XEP-0176 plugins are retrieved from ``host.plugins`` in
    # ``XEP_0343.__init__``, and a trigger of XEP-0234 is used there too.
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0167", "XEP-0176", "XEP-0234", "XEP-0320"],
    C.PI_RECOMMENDATIONS: [],
    # name of the class below implementing the plugin
    C.PI_MAIN: "XEP_0343",
    # the main class has a ``get_handler`` method
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Use WebRTC to create a generic data transport."""),
}
# Namespace of the WebRTC datachannel transport. It is used to register the transport,
# as namespace of the wrapping <transport> element, and as advertised disco feature.
NS_JINGLE_WEBRTC_DATACHANNELS: Final[
    str
] = "urn:xmpp:jingle:transports:webrtc-datachannel:1"


class XEP_0343(BaseTransportHandler):
    """Jingle transport handler signaling WebRTC data channels (XEP-0343).

    This transport is a wrapper around the ICE-UDP transport of the XEP-0176 plugin:
    outgoing ICE-UDP <transport> elements are wrapped in a <transport> element using
    this XEP namespace, and incoming ones are unwrapped before being given to the
    XEP-0176 plugin. "sctp-port" and "max-message-size" values are carried as
    attributes of the wrapping element and as SDP attributes.
    """

    namespace = NS_JINGLE_WEBRTC_DATACHANNELS
    # value used for "max-message-size" when it is not specified
    DEFAULT_MAX_MESSAGE_SIZE: Final[int] = 65536

    def __init__(self, host):
        """Register this transport and install the triggers used by the plugin.

        @param host: backend instance. Its ``plugins`` mapping, its ``trigger`` manager
            and its ``bridge`` are used by this plugin.
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._ice_udp = host.plugins["XEP-0176"]
        self._j = host.plugins["XEP-0166"]
        self._j.register_transport(
            NS_JINGLE_WEBRTC_DATACHANNELS, self._j.TRANSPORT_STREAMING, self, 10000
        )
        self._rtp = host.plugins["XEP-0167"]
        host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger)
        host.trigger.add(
            "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger
        )
        host.trigger.add("XEP-0176_jingle_handler_send_buffer", self._on_send_ice_buffer)
        host.trigger.add("XEP-0176_ice_candidate_send", self._on_ice_candidate_send)
        host.trigger.add(
            "XEP-0234_file_jingle_send", self._file_jingle_send
        )

    def get_handler(self, client: SatXMPPEntity):
        """Return a new handler advertising this transport in service discovery.

        @param client: unused here.
        @return: a new XEP0343Handler instance.
        """
        return XEP0343Handler()

    def is_usable(self, client, content_data: ContentData) -> bool:
        """Tell if the "webrtc" flag is set in the extra data of the content.

        @param client: unused here.
        @param content_data: content data, the ``app_kwargs["extra"]["webrtc"]`` value
            is checked.
        @return: True if the "webrtc" flag is set to a true value, False if it is false
            or if it can't be found.
        """
        try:
            # the stored value is converted to respect the ``bool`` return type
            return bool(content_data.app_kwargs["extra"]["webrtc"])
        except (KeyError, TypeError):
            # KeyError: "extra" or "webrtc" key is missing
            # TypeError: "extra" is not subscriptable (e.g. None)
            return False

    def _parse_sdp_a_trigger(
        self,
        attribute: str,
        parts: list[str],
        call_data: dict,
        metadata: dict,
        media_type: str,
        application_data: dict,
        transport_data: dict,
    ) -> None:
        """Parse "sctp-port" and "max-message-size" attributes

        The parsed integer is stored in ``transport_data`` with the attribute name as
        key. Other attributes are ignored, and a value which can't be parsed is logged
        then ignored.

        @param attribute: name of the SDP attribute.
        @param parts: values of the attribute, only the first one is used.
        @param transport_data: transport data to update.
        """
        if attribute not in ("sctp-port", "max-message-size"):
            return
        try:
            transport_data[attribute] = int(parts[0])
        except (ValueError, IndexError):
            # IndexError is caught too, so an empty ``parts`` is ignored like an
            # invalid value instead of raising
            log.warning(f"Can't parse value of {attribute}, ignoring: {parts}")

    def _generate_sdp_content_trigger(
        self,
        session: dict,
        local: bool,
        idx: int,
        content_data: dict,
        sdp_lines: list[str],
        application_data: dict,
        app_data_key: str,
        media_data: dict,
        media: str
    ) -> None:
        """Generate "sctp-port" and "max-message-size" attributes

        @param content_data: content data, its "transport_data" is read.
        @param sdp_lines: SDP lines being generated. An "a=" line is appended for each
            of the 2 attributes which has a value in transport data.
        """
        transport_data = content_data["transport_data"]
        for attribute in ("sctp-port", "max-message-size"):
            value = transport_data.get(attribute)
            if value is not None:
                sdp_lines.append(f"a={attribute}:{value}")

    def _wrap_transport_element(
        self,
        transport_elt: domish.Element
    ) -> None:
        """Wrap the XEP-0176 transport in a transport with this XEP namespace

        @param transport_elt: ICE UDP <transport>. Must be already a child of a <content>
            element.
        @raise exceptions.InternalError: the parent of ``transport_elt`` is not a
            <content> element.
        """
        content_elt = transport_elt.parent
        if content_elt is None or content_elt.name != "content":
            raise exceptions.InternalError("Was expecting <content> element.")
        # the ICE UDP <transport> is moved from <content> to the wrapping <transport>
        content_elt.children.remove(transport_elt)
        wrapping_transport_elt = content_elt.addElement(
            (NS_JINGLE_WEBRTC_DATACHANNELS, "transport")
        )
        wrapping_transport_elt.addChild(transport_elt)

    def _on_send_ice_buffer(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
        content_data: dict,
        transport_elt: domish.Element,
        iq_elt: domish.Element
    ) -> bool:
        """Wrap the ICE UDP <transport> when the content uses this transport.

        @param content_data: content data, the handler of its "transport" is compared
            to this instance.
        @param transport_elt: ICE UDP <transport> element to wrap.
        @return: always True.
            NOTE(review): presumably tells the trigger point to continue, confirm
            against the trigger manager.
        """
        if content_data["transport"].handler == self:
            self._wrap_transport_element(transport_elt)
        return True

    def _on_ice_candidate_send(
        self,
        client: SatXMPPEntity,
        session: dict,
        media_ice_data: dict[str, dict],
        content_name: str,
        content_data: dict,
        iq_elt: domish.Element
    ) -> bool:
        """Wrap the ICE UDP <transport> of the request when this transport is used.

        @param content_data: content data, the handler of its "transport" is compared
            to this instance.
        @param iq_elt: request whose <jingle>/<content>/<transport> element is wrapped.
        @return: always True.
            NOTE(review): presumably tells the trigger point to continue, confirm
            against the trigger manager.
        @raise exceptions.InternalError: the <transport> element is not an ICE UDP one.
        """
        if content_data["transport"].handler == self:
            transport_elt = iq_elt.jingle.content.transport
            if transport_elt.uri != self._ice_udp.namespace:
                raise exceptions.InternalError("Was expecting an ICE UDP transport")
            self._wrap_transport_element(transport_elt)
        return True

    async def _file_jingle_send(
        self,
        client: SatXMPPEntity,
        peer_jid: jid.JID,
        content: dict
    ) -> None:
        """Build transport data from the "call_data" found in extra data.

        "call_data" is removed from ``content["app_kwargs"]["extra"]``. If it is set,
        ``content["transport_data"]`` is filled with SCTP and local ICE data.

        @param client: unused here.
        @param peer_jid: unused here.
        @param content: content description, modified in place.
        @raise exceptions.DataError: "call_data" has no "application" media, or a
            mandatory key is missing.
        """
        call_data = content["app_kwargs"]["extra"].pop("call_data", None)
        if not call_data:
            return
        metadata = self._rtp.parse_call_data(call_data)
        try:
            application_data = call_data["application"]
        except KeyError as e:
            raise exceptions.DataError(
                '"call_data" must have an application media.'
            ) from e
        try:
            content["transport_data"] = {
                "sctp-port": metadata["sctp-port"],
                "max-message-size": metadata.get(
                    "max-message-size", self.DEFAULT_MAX_MESSAGE_SIZE
                ),
                "local_ice_data": {
                    "ufrag": metadata["ice-ufrag"],
                    "pwd": metadata["ice-pwd"],
                    "candidates": application_data.pop("ice-candidates"),
                    "fingerprint": application_data.pop("fingerprint", {}),
                }
            }
        except KeyError as e:
            raise exceptions.DataError(f"Mandatory key is missing: {e}") from e

    async def jingle_session_init(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
    ) -> domish.Element:
        """Build the <transport> element used to initiate the session.

        The element generated by the XEP-0176 plugin is put inside a <transport>
        element using this XEP namespace, with "sctp-port" and "max-message-size"
        attributes taken from the transport data of the content.

        @param client: client instance, given to the XEP-0176 plugin.
        @param session: Jingle session data.
        @param content_name: name of the content using this transport.
        @return: wrapping <transport> element.
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        ice_transport_elt = await self._ice_udp.jingle_session_init(
            client,
            session,
            content_name
        )
        transport_elt = domish.Element(
            (NS_JINGLE_WEBRTC_DATACHANNELS, "transport"),
            attribs={
                "sctp-port": str(transport_data["sctp-port"]),
                "max-message-size": str(transport_data["max-message-size"])
            }
        )
        transport_elt.addChild(ice_transport_elt)
        return transport_elt

    async def _call_ice_udp_handler(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        transport_elt: domish.Element,
    ) -> None:
        """Unwrap XEP-0176 <transport> element, and call its Jingle handler with it

        @param transport_elt: <transport> element of this XEP, wrapping the ICE UDP
            one.
        @raise exceptions.DataError: there is no ICE UDP <transport> child element.
        """
        ice_transport_elt = next(
            transport_elt.elements(self._ice_udp.namespace, "transport"), None
        )
        if ice_transport_elt is None:
            raise exceptions.DataError("Missing ICE UDP <transport> element.")
        await self._ice_udp.jingle_handler(
            client, action, session, content_name, ice_transport_elt
        )

    def _call_setup(self, client: SatXMPPEntity, session: dict, sdp: str) -> None:
        """Call the ``call_setup`` bridge method with session role and SDP

        @param client: client instance, its profile is used.
        @param session: Jingle session data, "id" and "role" are used.
        @param sdp: SDP to send.
        """
        self.host.bridge.call_setup(
            session["id"],
            data_format.serialise(
                {
                    "role": session["role"],
                    "sdp": sdp,
                }
            ),
            client.profile,
        )

    async def jingle_handler(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        transport_elt: domish.Element,
    ) -> domish.Element:
        """Handle Jingle requests

        @param client: The SatXMPPEntity instance.
        @param action: The action to be performed with the session.
        @param session: A dictionary containing the session information.
        @param content_name: The name of the content.
        @param transport_elt: The domish.Element instance representing the transport
            element.

        @return: <transport> element
        @raise exceptions.DataError: the datachannel signaling element is invalid, or
            the wrapped ICE UDP <transport> element is missing.
        @raise NotImplementedError: the session has not exactly one content when
            handling a session initiation.
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR):
            session.setdefault("metadata", {})
            session.setdefault("peer_metadata", {})
            # we have to set application data despite being a transport handler,
            # because the SDP generation needs application data
            application_data = content_data["application_data"]
            application_data.setdefault("peer_data", {})
            application_data.setdefault("media", "application")

        if action == self._j.A_PREPARE_CONFIRMATION:
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
            try:
                transport_data["sctp-port"] = int(transport_elt["sctp-port"])
                transport_data["max-message-size"] = int(
                    transport_elt.getAttribute(
                        "max-message-size", self.DEFAULT_MAX_MESSAGE_SIZE
                    )
                )
            except (KeyError, ValueError) as e:
                raise exceptions.DataError(
                    f"Invalid datachannel signaling element: {transport_elt.toXml()}"
                ) from e
            transport_data["webrtc"] = True
        elif action in (self._j.A_PREPARE_INITIATOR, self._j.A_TRANSPORT_INFO):
            # A_PREPARE_CONFIRMATION is fully handled by the branch above
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
        elif action == self._j.A_SESSION_ACCEPT:
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
            answer_sdp = mapping.generate_sdp_from_session(session)
            self._call_setup(client, session, answer_sdp)
        elif action in (
            self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER, self._j.A_START
        ):
            # nothing to do for those actions
            pass
        elif action == self._j.A_SESSION_INITIATE:
            # responder side

            sdp = mapping.generate_sdp_from_session(session)
            # NOTE(review): nothing in this class fires this Deferred, it is presumably
            # called with the answer SDP by the code handling the frontend answer,
            # to be confirmed.
            session["answer_sdp_d"] = answer_sdp_d = defer.Deferred()
            # we should have the answer long before 2 min
            answer_sdp_d.addTimeout(2 * 60, reactor)

            self._call_setup(client, session, sdp)

            answer_sdp = await answer_sdp_d
            parsed_answer = mapping.parse_sdp(answer_sdp)
            session["metadata"].update(parsed_answer["metadata"])
            contents = session["contents"]
            if len(contents) != 1:
                raise NotImplementedError(
                    "Only a single content is supported at the moment."
                )
            content = next(iter(contents.values()))
            media_data = parsed_answer["application"]
            application_data = content["application_data"]
            application_data["local_data"] = media_data["application_data"]
            transport_data = content["transport_data"]
            local_ice_data = media_data["transport_data"]
            transport_data["local_ice_data"] = local_ice_data
            # children of the received element are replaced with the <transport>
            # element returned by the XEP-0176 handler
            transport_elt.children.clear()
            ice_transport_elt = await self._ice_udp.jingle_handler(
                client, action, session, content_name, transport_elt
            )
            transport_elt.addChild(ice_transport_elt)
        elif action == self._j.A_DESTROY:
            # the transport is replaced (fallback ?)
            pass
        else:
            log.warning(f"FIXME: unmanaged action {action}")

        return transport_elt


@implementer(iwokkel.IDisco)
class XEP0343Handler(XMPPHandler):
    """Service discovery handler advertising the WebRTC datachannel transport."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco feature of this transport, whatever the arguments are."""
        datachannel_feature = disco.DiscoFeature(NS_JINGLE_WEBRTC_DATACHANNELS)
        return [datachannel_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return an empty list: no disco item is exposed by this handler."""
        return list()