view libervia/backend/plugins/plugin_xep_0343.py @ 4240:79c8a70e1813

backend, frontend: prepare remote control: This is a series of changes necessary to prepare the implementation of the remote control feature: - XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several applications are working in the same session, to know which one must be handled first. Will be used to make Remote Control have precedence over Call content. - XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the benefit of having methods called in parallel is very low, and it causes a lot of trouble as we can't predict the order. Methods are now called sequentially so the workflow can be predicted. - XEP-0167: fix `senders` XMPP attribute <=> SDP mapping - XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so the same key can be used with other jingle applications. - XEP-0167, XEP-0343: move some methods to XEP-0167 - XEP-0353: use new `priority` feature to call preflight methods of applications according to it. - frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism based on Pydantic models. It is now possible to have as many Data Channels as necessary, to have them in addition to A/V streams, to specify manually GStreamer sources and sinks, etc. - frontend (webrtc): rework of the pipeline to reduce latency. - frontend: new `portal_desktop` method. Screenshare portal handling has been moved there, and RemoteDesktop portal has been added. - frontend (webrtc): fix `extract_ufrag_pwd` method. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:41 +0200
parents e11b13418ba6
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Final

from twisted.internet import defer, reactor
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.xmpp import SatXMPPEntity
from libervia.backend.plugins.plugin_xep_0166.models import BaseTransportHandler, ContentData
from libervia.backend.tools.common import data_format

from .plugin_xep_0167 import mapping

# Module-level logger, named after this module.
log = getLogger(__name__)


# Plugin metadata.
# NOTE(review): presumably consumed by the Libervia plugin loader — confirm.
PLUGIN_INFO = {
    C.PI_NAME: "WebRTC datachannels in Jingle",
    C.PI_IMPORT_NAME: "XEP-0343",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    # XEP-0166, XEP-0167 and XEP-0176 are retrieved from ``host.plugins`` in
    # ``XEP_0343.__init__``; a trigger of XEP-0234 is also registered there.
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0167", "XEP-0176", "XEP-0234", "XEP-0320"],
    C.PI_RECOMMENDATIONS: [],
    # Name of the main plugin class, defined below in this module.
    C.PI_MAIN: "XEP_0343",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Use WebRTC to create a generic data transport."""),
}
# XML namespace of the <transport> element wrapping the ICE-UDP transport; it is also
# the feature advertised through service discovery by ``XEP0343Handler``.
NS_JINGLE_WEBRTC_DATACHANNELS: Final[
    str
] = "urn:xmpp:jingle:transports:webrtc-datachannel:1"


class XEP_0343(BaseTransportHandler):
    """Jingle transport handler for WebRTC datachannels.

    The transport is an ICE-UDP <transport> element (handled by the XEP-0176 plugin)
    wrapped in a <transport> element using ``NS_JINGLE_WEBRTC_DATACHANNELS``
    namespace, with "sctp-port" and "max-message-size" attributes.
    """

    namespace = NS_JINGLE_WEBRTC_DATACHANNELS
    # Value used for "max-message-size" when neither the frontend call data nor the
    # peer <transport> element specifies one.
    # NOTE(review): presumably the 64K default of the SDP "max-message-size"
    #   attribute (RFC 8841) — confirm.
    DEFAULT_MAX_MESSAGE_SIZE: Final[int] = 65536

    def __init__(self, host):
        """Register the transport and the triggers used to wrap ICE-UDP transport

        @param host: main backend instance, giving access to plugins, triggers and
            bridge.
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._ice_udp = host.plugins["XEP-0176"]
        self._j = host.plugins["XEP-0166"]
        # NOTE(review): last argument is presumably the transport priority — confirm
        #   against XEP-0166 ``register_transport``.
        self._j.register_transport(
            NS_JINGLE_WEBRTC_DATACHANNELS, self._j.TRANSPORT_STREAMING, self, 10000
        )
        self._rtp = host.plugins["XEP-0167"]
        # SDP <=> transport data mapping of SCTP attributes
        host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger)
        host.trigger.add(
            "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger
        )
        # ICE-UDP <transport> elements sent by XEP-0176 must be wrapped when this
        # transport is the one used by the content
        host.trigger.add("XEP-0176_jingle_handler_send_buffer", self._on_send_ice_buffer)
        host.trigger.add("XEP-0176_ice_candidate_send", self._on_ice_candidate_send)
        host.trigger.add("XEP-0234_file_jingle_send", self._file_jingle_send)

    def get_handler(self, client: SatXMPPEntity) -> XMPPHandler:
        """Return the XMPP handler advertising this transport in service discovery

        @param client: profile session (unused here).
        @return: a new ``XEP0343Handler`` instance.
        """
        return XEP0343Handler()

    def is_usable(self, client, content_data: ContentData) -> bool:
        """Tell if this transport can be used with given content

        @param client: profile session (unused here).
        @param content_data: data of the content needing a transport.
        @return: True if ``extra`` application argument has a truthy "webrtc" value,
            False if it is falsy or if the key is missing.
        """
        try:
            # the value is converted so the method honours its ``bool`` annotation
            # whatever is stored in ``extra``
            return bool(content_data.app_kwargs["extra"]["webrtc"])
        except KeyError:
            return False

    def _parse_sdp_a_trigger(
        self,
        attribute: str,
        parts: list[str],
        call_data: dict,
        metadata: dict,
        media_type: str,
        application_data: dict,
        transport_data: dict,
    ) -> None:
        """Parse "sctp-port" and "max-message-size" attributes

        Other attributes are ignored. A missing or non integer value is logged and
        ignored.

        @param attribute: name of the SDP attribute.
        @param parts: values of the attribute, only the first one is used.
        @param transport_data: dictionary updated with the parsed value, using the
            attribute name as key.
        """
        if attribute not in ("sctp-port", "max-message-size"):
            return
        try:
            transport_data[attribute] = int(parts[0])
        except (ValueError, IndexError):
            # IndexError happens when the attribute has no value at all
            log.warning(f"Can't parse value of {attribute}, ignoring: {parts}")

    def _generate_sdp_content_trigger(
        self,
        session: dict,
        local: bool,
        idx: int,
        content_data: dict,
        sdp_lines: list[str],
        application_data: dict,
        app_data_key: str,
        media_data: dict,
        media: str
    ) -> None:
        """Generate "sctp-port" and "max-message-size" attributes

        @param content_data: content data, its "transport_data" is used to get the
            values.
        @param sdp_lines: SDP lines of the content, an ``a=`` line is appended for
            each attribute set in transport data.
        """
        transport_data = content_data["transport_data"]
        for attribute in ("sctp-port", "max-message-size"):
            value = transport_data.get(attribute)
            if value is not None:
                sdp_lines.append(f"a={attribute}:{value}")

    def _wrap_transport_element(
        self,
        transport_elt: domish.Element
    ) -> None:
        """Wrap the XEP-0176 transport in a transport with this XEP namespace

        @param transport_elt: ICE UDP <transport>. Must be already a child of a <content>
            element.
        @raise exceptions.InternalError: the parent of ``transport_elt`` is not a
            <content> element.
        """
        content_elt = transport_elt.parent
        if content_elt is None or content_elt.name != "content":
            raise exceptions.InternalError("Was expecting <content> element.")
        # the ICE-UDP transport is moved from <content> to the wrapping <transport>
        content_elt.children.remove(transport_elt)
        wrapping_transport_elt = content_elt.addElement(
            (NS_JINGLE_WEBRTC_DATACHANNELS, "transport")
        )
        wrapping_transport_elt.addChild(transport_elt)

    def _on_send_ice_buffer(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
        content_data: dict,
        transport_elt: domish.Element,
        iq_elt: domish.Element
    ) -> bool:
        """Wrap buffered ICE-UDP transport when this handler manages the content

        Trigger for "XEP-0176_jingle_handler_send_buffer".

        @param content_data: data of the content, its "transport" handler is compared
            to this instance.
        @param transport_elt: ICE-UDP <transport> element about to be sent.
        @return: always True.
        """
        if content_data["transport"].handler == self:
            self._wrap_transport_element(transport_elt)
        return True

    def _on_ice_candidate_send(
        self,
        client: SatXMPPEntity,
        session: dict,
        media_ice_data: dict[str, dict],
        content_name: str,
        content_data: dict,
        iq_elt: domish.Element
    ) -> bool:
        """Wrap ICE-UDP transport of a candidate when this handler manages the content

        Trigger for "XEP-0176_ice_candidate_send".

        @param content_data: data of the content, its "transport" handler is compared
            to this instance.
        @param iq_elt: <iq> stanza whose ``jingle.content.transport`` element is
            wrapped.
        @return: always True.
        @raise exceptions.InternalError: the <transport> element is not in the
            ICE-UDP namespace.
        """
        if content_data["transport"].handler == self:
            transport_elt = iq_elt.jingle.content.transport
            if transport_elt.uri != self._ice_udp.namespace:
                raise exceptions.InternalError("Was expecting an ICE UDP transport")
            self._wrap_transport_element(transport_elt)
        return True

    async def _file_jingle_send(
        self,
        client: SatXMPPEntity,
        peer_jid: jid.JID,
        content: dict
    ) -> None:
        """Fill transport data of a file content from frontend call data

        Trigger for "XEP-0234_file_jingle_send". Nothing is done if there is no
        "call_data" in ``extra`` application argument.

        @param content: content about to be sent. "call_data" is removed from its
            ``extra`` application argument, and "transport_data" is set from it.
        @raise exceptions.DataError: "application" media or a mandatory key is
            missing in call data.
        """
        call_data = content["app_kwargs"]["extra"].pop("call_data", None)
        if not call_data:
            return
        metadata = self._rtp.parse_call_data(call_data)
        try:
            application_data = call_data["application"]
        except KeyError as e:
            raise exceptions.DataError(
                '"call_data" must have an application media.'
            ) from e
        try:
            content["transport_data"] = {
                "sctp-port": metadata["sctp-port"],
                "max-message-size": metadata.get(
                    "max-message-size", self.DEFAULT_MAX_MESSAGE_SIZE
                ),
                "local_ice_data": {
                    "ufrag": metadata["ice-ufrag"],
                    "pwd": metadata["ice-pwd"],
                    "candidates": application_data.pop("ice-candidates"),
                    "fingerprint": application_data.pop("fingerprint", {}),
                }
            }
        except KeyError as e:
            raise exceptions.DataError(f"Mandatory key is missing: {e}") from e

    async def jingle_session_init(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
    ) -> domish.Element:
        """Build the <transport> element used to initiate a session

        @param client: profile session.
        @param session: Jingle session data.
        @param content_name: name of the content to initiate. Its transport data must
            have "sctp-port" and "max-message-size" set.
        @return: <transport> element with this XEP namespace, wrapping the element
            returned by XEP-0176 ``jingle_session_init``.
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        ice_transport_elt = await self._ice_udp.jingle_session_init(
            client,
            session,
            content_name
        )
        transport_elt = domish.Element(
            (NS_JINGLE_WEBRTC_DATACHANNELS, "transport"),
            attribs={
                "sctp-port": str(transport_data["sctp-port"]),
                "max-message-size": str(transport_data["max-message-size"])
            }
        )
        transport_elt.addChild(ice_transport_elt)
        return transport_elt

    async def _call_ice_udp_handler(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        transport_elt: domish.Element,
    ) -> None:
        """Unwrap XEP-0176 <transport> element, and call its Jingle handler with it

        @param transport_elt: <transport> element with this XEP namespace.
        @raise exceptions.DataError: there is no ICE-UDP <transport> child element.
        """
        ice_transport_elt = next(
            transport_elt.elements(self._ice_udp.namespace, "transport"), None
        )
        if ice_transport_elt is None:
            raise exceptions.DataError("Missing ICE UDP <transport> element.")
        await self._ice_udp.jingle_handler(
            client, action, session, content_name, ice_transport_elt
        )

    async def jingle_handler(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        transport_elt: domish.Element,
    ) -> domish.Element:
        """Handle Jingle requests

        @param client: The SatXMPPEntity instance.
        @param action: The action to be performed with the session.
        @param session: A dictionary containing the session information.
        @param content_name: The name of the content.
        @param transport_elt: The domish.Element instance representing the transport
            element.

        @return: <transport> element (the one received as argument, which may have
            been modified)
        @raise exceptions.DataError: the received element has no ICE-UDP <transport>
            child, or has missing or invalid SCTP attributes.
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR):
            session.setdefault("metadata", {})
            session.setdefault("peer_metadata", {})
            # we have to set application data despite being a transport handler,
            # because the SDP generation needs application data
            application_data = content_data["application_data"]
            application_data.setdefault("peer_data", {})
            application_data.setdefault("media", "application")

        if action == self._j.A_PREPARE_CONFIRMATION:
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
            # "sctp-port" is mandatory, "max-message-size" has a default value
            try:
                transport_data["sctp-port"] = int(transport_elt["sctp-port"])
                transport_data["max-message-size"] = int(
                    transport_elt.getAttribute(
                        "max-message-size", self.DEFAULT_MAX_MESSAGE_SIZE
                    )
                )
            except (KeyError, ValueError) as e:
                raise exceptions.DataError(
                    f"Invalid datachannel signaling element: {transport_elt.toXml()}"
                ) from e
            transport_data["webrtc"] = True
        elif action in (self._j.A_PREPARE_INITIATOR, self._j.A_TRANSPORT_INFO):
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
        elif action == self._j.A_SESSION_ACCEPT:
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
            # the SDP generated from the updated session is sent to the frontend
            answer_sdp = mapping.generate_sdp_from_session(session)
            self.host.bridge.call_setup(
                session["id"],
                data_format.serialise(
                    {
                        "role": session["role"],
                        "sdp": answer_sdp,
                    }
                ),
                client.profile,
            )
        elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
            pass
        elif action == self._j.A_START:
            pass
        elif action == self._j.A_SESSION_INITIATE:
            # responder side

            # answer_sdp_d is a deferred handled in XEP-0167: it is called when the
            # frontend answers with its SDP.
            session["answer_sdp_d"] = answer_sdp_d = defer.Deferred()
            # we should have the answer long before 2 min
            answer_sdp_d.addTimeout(2 * 60, reactor)

            self._rtp.send_answer_sdp(client, session)

            answer_sdp = await answer_sdp_d
            parsed_answer = mapping.parse_sdp(answer_sdp, session["role"])
            session["metadata"].update(parsed_answer["metadata"])
            self._rtp.propagate_data(session, parsed_answer)

            # children of the received element are dropped, and replaced by the
            # <transport> element returned by the ICE-UDP handler
            transport_elt.children.clear()
            ice_transport_elt = await self._ice_udp.jingle_handler(
                client, action, session, content_name, transport_elt
            )
            transport_elt.addChild(ice_transport_elt)
        elif action == self._j.A_DESTROY:
            # the transport is replaced (fallback ?)
            pass
        else:
            log.warning(f"FIXME: unmanaged action {action}")

        return transport_elt


@implementer(iwokkel.IDisco)
class XEP0343Handler(XMPPHandler):
    """Service discovery handler advertising the WebRTC datachannel transport."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco feature of the WebRTC datachannel transport namespace

        The arguments are not used: the same feature is returned for every request.
        """
        datachannel_feature = disco.DiscoFeature(NS_JINGLE_WEBRTC_DATACHANNELS)
        return [datachannel_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return an empty list: this handler exposes no disco item"""
        return list()