view libervia/backend/plugins/plugin_xep_0176.py @ 4231:e11b13418ba6

plugin XEP-0353, XEP-0234, jingle: WebRTC data channel signaling implementation: Implement XEP-0343: Signaling WebRTC Data Channels in Jingle. The current version of the XEP (0.3.1) has no implementation and contains some flaws. After discussing this on xsf@, Daniel (from Conversations) mentioned that they had a sprint with Larma (from Dino) to work on another version and provided me with this link: https://gist.github.com/iNPUTmice/6c56f3e948cca517c5fb129016d99e74 . I have used it for my implementation. This implementation reuses work done on Jingle A/V call (notably XEP-0176 and XEP-0167 plugins), with adaptations. When used, XEP-0234 will not handle the file itself as it normally does. This is because WebRTC has several implementations (browser for web interface, GStreamer for others), and file/data must be handled directly by the frontend. This is particularly important for web frontends, as the file is not sent from the backend but from the end-user's browser device. Among the changes, there are: - XEP-0343 implementation. - `file_send` bridge method now use serialised dict as output. - New `BaseTransportHandler.is_usable` method which get content data and returns a boolean (default to `True`) to tell if this transport can actually be used in this context (when we are initiator). Used in webRTC case to see if call data are available. - Support of `application` media type, and everything necessary to handle data channels. - Better confirmation message, with file name, size and description when available. - When file is accepted in preflight, it is specified in following `action_new` signal for actual file transfer. This way, frontend can avoid the display or 2 confirmation messages. - XEP-0166: when not specified, default `content` name is now its index number instead of a UUID. This follows the behaviour of browsers. - XEP-0353: better handling of events such as call taken by another device. - various other updates. rel 441
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:57:23 +0200
parents 23fa52acf72c
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin for Jingle (XEP-0176)
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Dict, List, Optional
import uuid

from twisted.internet import defer
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import data_format

from .plugin_xep_0166 import BaseTransportHandler

log = getLogger(__name__)

NS_JINGLE_ICE_UDP = "urn:xmpp:jingle:transports:ice-udp:1"

PLUGIN_INFO = {
    C.PI_NAME: "Jingle ICE-UDP Transport Method",
    C.PI_IMPORT_NAME: "XEP-0176",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0176"],
    C.PI_DEPENDENCIES: ["XEP-0166"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0176",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Jingle ICE-UDP transport"""),
}


class XEP_0176(BaseTransportHandler):
    namespace = NS_JINGLE_ICE_UDP

    def __init__(self, host):
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._j = host.plugins["XEP-0166"]  # shortcut to access jingle
        self._j.register_transport(
            NS_JINGLE_ICE_UDP, self._j.TRANSPORT_DATAGRAM, self, 100
        )
        host.bridge.add_method(
            "ice_candidates_add",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._ice_candidates_add,
            async_=True,
        )
        host.bridge.add_signal(
            "ice_candidates_new", ".plugin", signature="sss"
        )  # args: jingle_sid, candidates_serialised, profile
        host.bridge.add_signal(
            "ice_restart", ".plugin", signature="sss"
        )  # args: jingle_sid, side ("local" or "peer"), profile

    def get_handler(self, client):
        return XEP_0176_handler()

    def _ice_candidates_add(
        self,
        session_id: str,
        media_ice_data_s: str,
        profile_key: str,
    ):
        client = self.host.get_client(profile_key)
        return defer.ensureDeferred(
            self.ice_candidates_add(
                client,
                session_id,
                data_format.deserialise(media_ice_data_s),
            )
        )

    def build_transport(self, ice_data: dict) -> domish.Element:
        """Generate <transport> element from ICE data

        @param ice_data: a dict containing the following keys:
            - "ufrag" (str): The ICE username fragment.
            - "pwd" (str): The ICE password.
            - "candidates" (List[dict]): A list of ICE candidate dictionaries, each
              containing:
                - "component_id" (int): The component ID.
                - "foundation" (str): The candidate foundation.
                - "address" (str): The candidate IP address.
                - "port" (int): The candidate port.
                - "priority" (int): The candidate priority.
                - "transport" (str): The candidate transport protocol, e.g., "udp".
                - "type" (str): The candidate type, e.g., "host", "srflx", "prflx", or
                  "relay".
                - "generation" (str, optional): The candidate generation. Defaults to "0".
                - "network" (str, optional): The candidate network. Defaults to "0".
                - "rel_addr" (str, optional): The related address for the candidate, if
                  any.
                - "rel_port" (int, optional): The related port for the candidate, if any.

        @return: A <transport> element.
        """
        try:
            ufrag: str = ice_data["ufrag"]
            pwd: str = ice_data["pwd"]
        except KeyError as e:
            raise exceptions.DataError(f"ICE {e} must be provided")
        candidates: List[dict] = ice_data.get("candidates", [])

        candidates.sort(key=lambda c: int(c.get("priority", 0)), reverse=True)
        transport_elt = domish.Element(
            (NS_JINGLE_ICE_UDP, "transport"), attribs={"ufrag": ufrag, "pwd": pwd}
        )

        for candidate in candidates:
            try:
                candidate_elt = transport_elt.addElement("candidate")
                candidate_elt["component"] = str(candidate["component_id"])
                candidate_elt["foundation"] = candidate["foundation"]
                candidate_elt["generation"] = str(candidate.get("generation", "0"))
                candidate_elt["id"] = candidate.get("id") or str(uuid.uuid4())
                candidate_elt["ip"] = candidate["address"]
                candidate_elt["network"] = str(candidate.get("network", "0"))
                candidate_elt["port"] = str(candidate["port"])
                candidate_elt["priority"] = str(candidate["priority"])
                candidate_elt["protocol"] = candidate["transport"]
                candidate_elt["type"] = candidate["type"]
            except KeyError as e:
                raise exceptions.DataError(
                    f"Mandatory ICE candidate attribute {e} is missing"
                )

            if "rel_addr" in candidate and "rel_port" in candidate:
                candidate_elt["rel-addr"] = candidate["rel_addr"]
                candidate_elt["rel-port"] = str(candidate["rel_port"])

        self.host.trigger.point("XEP-0176_build_transport", transport_elt, ice_data)

        return transport_elt

    def parse_transport(self, transport_elt: domish.Element) -> dict:
        """Parse <transport> to a dict

        @param transport_elt: <transport> element
        @return: ICE data (as in [build_transport])
        """
        try:
            ice_data = {"ufrag": transport_elt["ufrag"], "pwd": transport_elt["pwd"]}
        except KeyError as e:
            raise exceptions.DataError(
                f"<transport> is missing mandatory attribute {e}: {transport_elt.toXml()}"
            )
        ice_data["candidates"] = ice_candidates = []

        for candidate_elt in transport_elt.elements(NS_JINGLE_ICE_UDP, "candidate"):
            try:
                candidate = {
                    "component_id": int(candidate_elt["component"]),
                    "foundation": candidate_elt["foundation"],
                    "address": candidate_elt["ip"],
                    "port": int(candidate_elt["port"]),
                    "priority": int(candidate_elt["priority"]),
                    "transport": candidate_elt["protocol"],
                    "type": candidate_elt["type"],
                }
            except KeyError as e:
                raise exceptions.DataError(
                    f"Mandatory attribute {e} is missing in candidate element"
                )

            if candidate_elt.hasAttribute("generation"):
                candidate["generation"] = candidate_elt["generation"]

            if candidate_elt.hasAttribute("network"):
                candidate["network"] = candidate_elt["network"]

            if candidate_elt.hasAttribute("rel-addr"):
                candidate["rel_addr"] = candidate_elt["rel-addr"]

            if candidate_elt.hasAttribute("rel-port"):
                candidate["rel_port"] = int(candidate_elt["rel-port"])

            ice_candidates.append(candidate)

        self.host.trigger.point("XEP-0176_parse_transport", transport_elt, ice_data)

        return ice_data

    async def jingle_session_init(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
    ) -> domish.Element:
        """Create a Jingle session initiation transport element with ICE candidates.

        @param client: SatXMPPEntity object representing the client.
        @param session: Dictionary containing session data.
        @param content_name: Name of the content.
        @param ufrag: ICE username fragment.
        @param pwd: ICE password.
        @param candidates: List of ICE candidate dictionaries parsed from the
            parse_ice_candidate method.

        @return: domish.Element representing the Jingle transport element.

        @raise exceptions.DataError: If mandatory data is missing from the candidates.
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        ice_data = transport_data["local_ice_data"]
        return self.build_transport(ice_data)

    async def jingle_handler(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        transport_elt: domish.Element,
    ) -> domish.Element:
        """Handle Jingle requests

        @param client: The SatXMPPEntity instance.
        @param action: The action to be performed with the session.
        @param session: A dictionary containing the session information.
        @param content_name: The name of the content.
        @param transport_elt: The domish.Element instance representing the transport
            element.

        @return: <transport> element
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR):
            peer_ice_data = self.parse_transport(transport_elt)
            transport_data["peer_ice_data"] = peer_ice_data

        elif action == self._j.A_ACCEPTED_ACK:
            buffer = session.pop("XEP-0176_handler_buffer", None)
            if buffer:
                log.debug("replaying buffered events")
                for args in buffer:
                    await self.jingle_handler(*args)
        elif action == self._j.A_PREPARE_RESPONDER:
            pass

        elif action == self._j.A_SESSION_ACCEPT:
            # we check if we have any buffered ICE candidates, and send them if it's the
            # case
            media_type = content_data["application_data"].get("media")
            try:
                buffer = session["XEP-0176_buffer"]
                buffered_ice_data = buffer.pop(media_type)
            except KeyError:
                pass
            else:
                if not buffer:
                    del session["XEP-0176_buffer"]
                transport_elt = self.build_transport(buffered_ice_data)
                iq_elt, __ = self._j.build_action(
                    client,
                    self._j.A_TRANSPORT_INFO,
                    session,
                    content_name,
                    context_elt=transport_elt,
                )
                self.host.trigger.point(
                    "XEP-0176_jingle_handler_send_buffer",
                    client,
                    session,
                    content_name,
                    content_data,
                    transport_elt,
                    iq_elt
                )
                await iq_elt.send()

        elif action == self._j.A_START:
            pass

        elif action == self._j.A_SESSION_INITIATE:
            # responder side, we give our candidates
            transport_elt = self.build_transport(transport_data["local_ice_data"])
        elif action == self._j.A_TRANSPORT_INFO:
            if session["state"] == self._j.STATE_PENDING:
                # Session is not yet active; we buffer the arguments to replay them
                # when the session becomes active. This makes the frontend's life easier.
                log.debug("session is not active yet, buffering transport-info element")
                buffer = session.setdefault("XEP-0176_handler_buffer", [])
                buffer.append([client, action, session, content_name, transport_elt])
                return transport_elt

            media_type = content_data["application_data"].get("media")
            new_ice_data = self.parse_transport(transport_elt)
            restart = self.update_candidates(transport_data, new_ice_data, local=False)
            if restart:
                log.debug(
                    f"Peer ICE restart detected on session {session['id']} "
                    f"[{client.profile}]"
                )
                self.host.bridge.ice_restart(session["id"], "peer", client.profile)

            self.host.bridge.ice_candidates_new(
                session["id"],
                data_format.serialise({media_type: new_ice_data}),
                client.profile,
            )
        elif action == self._j.A_DESTROY:
            pass
        else:
            log.warning("FIXME: unmanaged action {}".format(action))

        return transport_elt

    def jingle_terminate(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        reason_elt: domish.Element,
    ) -> None:
        log.debug("ICE-UDP session terminated")

    def update_candidates(
        self, transport_data: dict, new_ice_data: dict, local: bool
    ) -> bool:
        """Update ICE candidates when new one are received

        @param transport_data: transport_data of the content linked to the candidates
        @param new_ice_data: new ICE data, in the same format as returned
            by [self.parse_transport]
        @param local: True if it's our candidates, False if it's peer ones
        @return: True if there is a ICE restart
        """
        key = "local_ice_data" if local else "peer_ice_data"
        try:
            ice_data = transport_data[key]
        except KeyError:
            log.warning(f"no {key} available")
            transport_data[key] = new_ice_data
        else:
            if (
                new_ice_data["ufrag"] != ice_data["ufrag"]
                or new_ice_data["pwd"] != ice_data["pwd"]
            ):
                ice_data["ufrag"] = new_ice_data["ufrag"]
                ice_data["pwd"] = new_ice_data["pwd"]
                ice_data["candidates"] = new_ice_data["candidates"]
                return True
        return False

    async def ice_candidates_add(
        self, client: SatXMPPEntity, session_id: str, media_ice_data: Dict[str, dict]
    ) -> None:
        """Called when a new ICE candidates are available for a session

        @param session_id: Session ID
        @param media_ice_data: a map from media type (audio, video) to ICE data
            ICE data must be in the same format as in [self.parse_transport]
        """
        session = self._j.get_session(client, session_id)
        iq_elt: Optional[domish.Element] = None
        content_name: str|None = None
        content_data: dict|None = None

        for media_type, new_ice_data in media_ice_data.items():
            if session["state"] == self._j.STATE_PENDING:
                log.debug(f"session not active, buffering")
                buffer = session.setdefault("XEP-0176_buffer", {})
                media_buffer = buffer.setdefault(media_type, {})

                for key in ["ufrag", "pwd"]:
                    if key not in media_buffer:
                        media_buffer[key] = new_ice_data[key]
                    else:
                        if media_buffer[key] != new_ice_data[key]:
                            log.warning(
                                f"{key} conflict, new value will replace old one\n"
                                f"buffer={media_buffer[key]!r}\n"
                                f"new={new_ice_data[key]!r}"
                            )
                            media_buffer[key] = new_ice_data[key]

                media_buffer.setdefault("candidates", []).extend(
                    new_ice_data["candidates"]
                )
                continue

            for content_name, content_data in session["contents"].items():
                if content_data["application_data"].get("media") == media_type:
                    break
            else:
                log.warning(f"no media of type {media_type} has been found")
                continue
            restart = self.update_candidates(
                content_data["transport_data"], new_ice_data, True
            )
            if restart:
                log.debug(
                    f"Local ICE restart detected on session {session['id']} "
                    f"[{client.profile}]"
                )
                self.host.bridge.ice_restart(session["id"], "local", client.profile)
            transport_elt = self.build_transport(new_ice_data)
            iq_elt, __ = self._j.build_action(
                client,
                self._j.A_TRANSPORT_INFO,
                session,
                content_name,
                iq_elt=iq_elt,
                context_elt=transport_elt,
            )

        if iq_elt is not None:
            assert content_name is not None and content_data is not None
            try:
                self.host.trigger.point(
                    "XEP-0176_ice_candidate_send", client, session, media_ice_data,
                    content_name, content_data, iq_elt
                )
                await iq_elt.send()
            except Exception as e:
                log.warning(f"Could not send new ICE candidates: {e}")


@implementer(iwokkel.IDisco)
class XEP_0176_handler(XMPPHandler):
    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_JINGLE_ICE_UDP)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []