view libervia/backend/plugins/plugin_xep_0343.py @ 4236:f59e9421a650

test (unit/cli): Add a file send/receive test for WebRTC: fix 442
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 15:21:00 +0200
parents e11b13418ba6
children 79c8a70e1813
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Final

from twisted.internet import defer, reactor
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.xmpp import SatXMPPEntity
from libervia.backend.plugins.plugin_xep_0166.models import BaseTransportHandler, ContentData
from libervia.backend.tools.common import data_format

from .plugin_xep_0167 import mapping

log = getLogger(__name__)


# Plugin metadata: display name, import name, dependencies and main class name.
PLUGIN_INFO = {
    C.PI_NAME: "WebRTC datachannels in Jingle",
    C.PI_IMPORT_NAME: "XEP-0343",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: [
        "XEP-0166",
        "XEP-0167",
        "XEP-0176",
        "XEP-0234",
        "XEP-0320",
    ],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0343",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("Use WebRTC to create a generic data transport."),
}

# XML namespace of the Jingle WebRTC datachannel <transport> element.
NS_JINGLE_WEBRTC_DATACHANNELS: Final[str] = (
    "urn:xmpp:jingle:transports:webrtc-datachannel:1"
)


class XEP_0343(BaseTransportHandler):
    namespace = NS_JINGLE_WEBRTC_DATACHANNELS

    def __init__(self, host):
        """Register this transport with the Jingle plugin and hook the needed triggers.

        @param host: backend host object; its ``plugins`` mapping and ``trigger``
            manager are used here, and it is kept as ``self.host``.
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host

        # plugins this transport relies on
        self._ice_udp = host.plugins["XEP-0176"]
        self._j = host.plugins["XEP-0166"]
        self._j.register_transport(
            NS_JINGLE_WEBRTC_DATACHANNELS, self._j.TRANSPORT_STREAMING, self, 10000
        )
        self._rtp = host.plugins["XEP-0167"]

        # trigger point name → callback, registered in this order
        trigger_callbacks = (
            ("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger),
            ("XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger),
            ("XEP-0176_jingle_handler_send_buffer", self._on_send_ice_buffer),
            ("XEP-0176_ice_candidate_send", self._on_ice_candidate_send),
            ("XEP-0234_file_jingle_send", self._file_jingle_send),
        )
        for point_name, callback in trigger_callbacks:
            host.trigger.add(point_name, callback)

    def get_handler(self, client: SatXMPPEntity):
        """Build the protocol handler associated with this plugin.

        @param client: not used here.
        @return: a new XEP0343Handler instance
        """
        handler = XEP0343Handler()
        return handler

    def is_usable(self, client, content_data: ContentData) -> bool:
        try:
            return content_data.app_kwargs["extra"]["webrtc"]
        except KeyError:
            return False

    def _parse_sdp_a_trigger(
        self,
        attribute: str,
        parts: list[str],
        call_data: dict,
        metadata: dict,
        media_type: str,
        application_data: dict,
        transport_data: dict,
    ) -> None:
        """Parse "sctp-port" and "max-message-size" attributes"""
        try:
            if attribute == "sctp-port":
                transport_data["sctp-port"] = int(parts[0])
            elif attribute == "max-message-size":
                transport_data["max-message-size"] = int(parts[0])
        except ValueError:
            log.warning(f"Can't parse value of {attribute}, ignoring: {parts}")

    def _generate_sdp_content_trigger(
        self,
        session: dict,
        local: bool,
        idx: int,
        content_data: dict,
        sdp_lines: list[str],
        application_data: dict,
        app_data_key: str,
        media_data: dict,
        media: str
    ) -> None:
        """Generate "sctp-port" and "max-message-size" attributes"""
        transport_data = content_data["transport_data"]
        sctp_port = transport_data.get("sctp-port")
        if sctp_port is not None:
            sdp_lines.append(f"a=sctp-port:{sctp_port}")

        max_message_size = transport_data.get("max-message-size")
        if max_message_size is not None:
            sdp_lines.append(f"a=max-message-size:{max_message_size}")

    def _wrap_transport_element(
        self,
        transport_elt: domish.Element
    ) -> None:
        """Move the XEP-0176 transport inside a <transport> using this XEP namespace

        The given element is detached from its <content> parent, a new <transport>
        element with the datachannel namespace is added to that <content>, and the
        original element becomes its child.

        @param transport_elt: ICE UDP <transport>. Must be already a child of a <content>
            element.
        @raise exceptions.InternalError: the element has no parent, or its parent is not
            named "content".
        """
        parent_elt = transport_elt.parent
        if parent_elt is None or parent_elt.name != "content":
            raise exceptions.InternalError("Was expecting <content> element.")

        # detach the ICE transport, then re-attach it under the new wrapper
        parent_elt.children.remove(transport_elt)
        wrapper_qname = (NS_JINGLE_WEBRTC_DATACHANNELS, "transport")
        wrapper_elt = parent_elt.addElement(wrapper_qname)
        wrapper_elt.addChild(transport_elt)

    def _on_send_ice_buffer(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
        content_data: dict,
        transport_elt: domish.Element,
        iq_elt: domish.Element
    ) -> bool:
        if content_data["transport"].handler == self:
            self._wrap_transport_element(transport_elt)
        return True

    def _on_ice_candidate_send(
        self,
        client: SatXMPPEntity,
        session: dict,
        media_ice_data: dict[str, dict],
        content_name: str,
        content_data: dict,
        iq_elt: domish.Element
    ) -> bool:
        if content_data["transport"].handler == self:
            transport_elt = iq_elt.jingle.content.transport
            if transport_elt.uri != self._ice_udp.namespace:
                raise exceptions.InternalError("Was expecting an ICE UDP transport")
            self._wrap_transport_element(transport_elt)
        return True

    async def _file_jingle_send(
        self,
        client: SatXMPPEntity,
        peer_jid: jid.JID,
        content: dict
    ) -> None:
        call_data = content["app_kwargs"]["extra"].pop("call_data", None)
        if call_data:
            metadata = self._rtp.parse_call_data(call_data)
            try:
                application_data = call_data["application"]
            except KeyError:
                raise exceptions.DataError(
                    '"call_data" must have an application media.'
                )
            try:
                content["transport_data"] = {
                    "sctp-port": metadata["sctp-port"],
                    "max-message-size": metadata.get("max-message-size", 65536),
                    "local_ice_data": {
                        "ufrag": metadata["ice-ufrag"],
                        "pwd": metadata["ice-pwd"],
                        "candidates": application_data.pop("ice-candidates"),
                        "fingerprint": application_data.pop("fingerprint", {}),
                    }
                }
            except KeyError as e:
                raise exceptions.DataError(f"Mandatory key is missing: {e}")

    async def jingle_session_init(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
    ) -> domish.Element:
        """Build the datachannel <transport> element used to initiate the session

        The ICE UDP <transport> returned by the XEP-0176 plugin is put inside a
        <transport> element using this XEP namespace, which carries "sctp-port" and
        "max-message-size" attributes taken from the content transport data.

        @param client: client passed through to the ICE UDP plugin.
        @param session: Jingle session data.
        @param content_name: name of the content to initialise.
        @return: the wrapping <transport> element
        """
        transport_data = session["contents"][content_name]["transport_data"]
        ice_transport_elt = await self._ice_udp.jingle_session_init(
            client, session, content_name
        )
        # attribute values are read once the ICE transport has been initialised
        attribs = {
            key: str(transport_data[key]) for key in ("sctp-port", "max-message-size")
        }
        wrapper_elt = domish.Element(
            (NS_JINGLE_WEBRTC_DATACHANNELS, "transport"), attribs=attribs
        )
        wrapper_elt.addChild(ice_transport_elt)
        return wrapper_elt

    async def _call_ice_udp_handler(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        transport_elt: domish.Element,
    ):
        """Unwrap XEP-0176 <transport> element, and call its Jingle handler with it"""
        try:
            ice_transport_elt = next(
                transport_elt.elements(self._ice_udp.namespace, "transport")
            )
        except StopIteration:
            raise exceptions.DataError("Missing ICE UDP <transport> element.")
        else:
            await self._ice_udp.jingle_handler(
                client, action, session, content_name, ice_transport_elt
            )

    async def jingle_handler(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        transport_elt: domish.Element,
    ) -> domish.Element:
        """Handle Jingle requests

        @param client: The SatXMPPEntity instance.
        @param action: The action to be performed with the session.
        @param session: A dictionary containing the session information.
        @param content_name: The name of the content.
        @param transport_elt: The domish.Element instance representing the transport
            element.

        @return: <transport> element
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR):
            session.setdefault("metadata", {})
            session.setdefault("peer_metadata", {})
            # we have to set application data despite being a transport handler,
            # because the SDP generation needs application data
            application_data = content_data["application_data"]
            application_data.setdefault("peer_data", {})
            application_data.setdefault("media", "application")

        if action == self._j.A_PREPARE_CONFIRMATION:
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
            try:
                transport_data["sctp-port"] = int(transport_elt["sctp-port"])
                transport_data["max-message-size"] = int(
                    transport_elt.getAttribute("max-message-size", 65536)
                )
            except (KeyError, ValueError):
                raise exceptions.DataError(
                    f"Invalid datachannel signaling element: {transport_elt.toXml()}"
                )
            transport_data["webrtc"] = True
        elif action in (
            self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR,
            self._j.A_TRANSPORT_INFO
        ):
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
        elif action == self._j.A_SESSION_ACCEPT:
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
            answer_sdp = mapping.generate_sdp_from_session(session)
            self.host.bridge.call_setup(
                session["id"],
                data_format.serialise(
                    {
                        "role": session["role"],
                        "sdp": answer_sdp,
                    }
                ),
                client.profile,
            )
        elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
            pass
        elif action == self._j.A_START:
            pass
        elif action == self._j.A_SESSION_INITIATE:
            # responder side

            sdp = mapping.generate_sdp_from_session(session)
            session["answer_sdp_d"] = answer_sdp_d = defer.Deferred()
            # we should have the answer long before 2 min
            answer_sdp_d.addTimeout(2 * 60, reactor)

            self.host.bridge.call_setup(
                session["id"],
                data_format.serialise(
                    {
                        "role": session["role"],
                        "sdp": sdp,
                    }
                ),
                client.profile,
            )

            answer_sdp = await answer_sdp_d
            parsed_answer = mapping.parse_sdp(answer_sdp)
            session["metadata"].update(parsed_answer["metadata"])
            contents = session["contents"]
            if len(contents) != 1:
                raise NotImplementedError(
                    "Only a singlecontent is supported at the moment."
                )
            content = next(iter(contents.values()))
            media_data = parsed_answer["application"]
            application_data = content["application_data"]
            application_data["local_data"] = media_data["application_data"]
            transport_data = content["transport_data"]
            local_ice_data = media_data["transport_data"]
            transport_data["local_ice_data"] = local_ice_data
            transport_elt.children.clear()
            ice_transport_elt = await self._ice_udp.jingle_handler(
                client, action, session, content_name, transport_elt
            )
            transport_elt.addChild(ice_transport_elt)
        elif action == self._j.A_DESTROY:
            # the transport is replaced (fallback ?)
            pass
        else:
            log.warning(f"FIXME: unmanaged action {action}")

        return transport_elt


@implementer(iwokkel.IDisco)
class XEP0343Handler(XMPPHandler):
    """Service discovery handler announcing the WebRTC datachannel transport."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco feature for the datachannel transport namespace.

        The arguments are not used: the same single feature is always returned.
        """
        feature = disco.DiscoFeature(NS_JINGLE_WEBRTC_DATACHANNELS)
        return [feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return an empty list: no disco item is exposed by this handler."""
        return []