Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0343.py @ 4231:e11b13418ba6
plugin XEP-0353, XEP-0234, jingle: WebRTC data channel signaling implementation:
Implement XEP-0343: Signaling WebRTC Data Channels in Jingle. The current version of the
XEP (0.3.1) has no implementation and contains some flaws. After discussing this on xsf@,
Daniel (from Conversations) mentioned that they had a sprint with Larma (from Dino) to
work on another version and provided me with this link:
https://gist.github.com/iNPUTmice/6c56f3e948cca517c5fb129016d99e74 . I have used it for my
implementation.
This implementation reuses work done on Jingle A/V call (notably XEP-0176 and XEP-0167
plugins), with adaptations. When used, XEP-0234 will not handle the file itself as it
normally does. This is because WebRTC has several implementations (browser for web
interface, GStreamer for others), and file/data must be handled directly by the frontend.
This is particularly important for web frontends, as the file is not sent from the backend
but from the end-user's browser device.
Among the changes, there are:
- XEP-0343 implementation.
- `file_send` bridge method now use serialised dict as output.
- New `BaseTransportHandler.is_usable` method which get content data and returns a boolean
(default to `True`) to tell if this transport can actually be used in this context (when
we are initiator). Used in webRTC case to see if call data are available.
- Support of `application` media type, and everything necessary to handle data channels.
- Better confirmation message, with file name, size and description when available.
- When file is accepted in preflight, it is specified in following `action_new` signal for
actual file transfer. This way, frontend can avoid the display or 2 confirmation
messages.
- XEP-0166: when not specified, default `content` name is now its index number instead of
a UUID. This follows the behaviour of browsers.
- XEP-0353: better handling of events such as call taken by another device.
- various other updates.
rel 441
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:57:23 +0200 |
parents | |
children | 79c8a70e1813 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0343.py Sat Apr 06 12:57:23 2024 +0200 @@ -0,0 +1,373 @@ +#!/usr/bin/env python3 + +# Libervia plugin +# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import Final + +from twisted.internet import defer, reactor +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.core.xmpp import SatXMPPEntity +from libervia.backend.plugins.plugin_xep_0166.models import BaseTransportHandler, ContentData +from libervia.backend.tools.common import data_format + +from .plugin_xep_0167 import mapping + +log = getLogger(__name__) + + +PLUGIN_INFO = { + C.PI_NAME: "WebRTC datachannels in Jingle", + C.PI_IMPORT_NAME: "XEP-0343", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0167", "XEP-0176", "XEP-0234", "XEP-0320"], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "XEP_0343", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Use WebRTC to create a generic data transport."""), +} +NS_JINGLE_WEBRTC_DATACHANNELS: Final[ + str +] = "urn:xmpp:jingle:transports:webrtc-datachannel:1" + + +class XEP_0343(BaseTransportHandler): + namespace = NS_JINGLE_WEBRTC_DATACHANNELS + + def __init__(self, host): + log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") + self.host = host + self._ice_udp = host.plugins["XEP-0176"] + self._j = host.plugins["XEP-0166"] + self._j.register_transport( + NS_JINGLE_WEBRTC_DATACHANNELS, self._j.TRANSPORT_STREAMING, self, 10000 + ) + self._rtp = host.plugins["XEP-0167"] + host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger) + host.trigger.add( + "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger + ) + host.trigger.add("XEP-0176_jingle_handler_send_buffer", self._on_send_ice_buffer) + host.trigger.add("XEP-0176_ice_candidate_send", self._on_ice_candidate_send) + host.trigger.add( + "XEP-0234_file_jingle_send", self._file_jingle_send + ) + + def get_handler(self, client: SatXMPPEntity): + return XEP0343Handler() + + def is_usable(self, client, content_data: ContentData) -> bool: + try: + return content_data.app_kwargs["extra"]["webrtc"] + except KeyError: + return False + + def _parse_sdp_a_trigger( + self, + attribute: str, + parts: list[str], + call_data: dict, + metadata: dict, + media_type: str, + application_data: dict, + transport_data: dict, + ) -> None: + """Parse "sctp-port" and "max-message-size" attributes""" + try: + if attribute == "sctp-port": + transport_data["sctp-port"] = int(parts[0]) + elif attribute == "max-message-size": + transport_data["max-message-size"] = int(parts[0]) + except ValueError: + log.warning(f"Can't parse value of {attribute}, ignoring: {parts}") + + def _generate_sdp_content_trigger( + self, + session: dict, + local: bool, + idx: int, + content_data: dict, + sdp_lines: list[str], + application_data: dict, + app_data_key: str, + media_data: dict, + media: str + ) -> None: + """Generate "sctp-port" and "max-message-size" attributes""" + transport_data = content_data["transport_data"] + sctp_port = transport_data.get("sctp-port") + if sctp_port is not None: + sdp_lines.append(f"a=sctp-port:{sctp_port}") + + max_message_size = transport_data.get("max-message-size") + if max_message_size is not None: + sdp_lines.append(f"a=max-message-size:{max_message_size}") + + def _wrap_transport_element( + self, + transport_elt: domish.Element + ) -> None: + """Wrap the XEP-0176 transport in a transport with this XEP namespace + + @param transport_elt: ICE UDP <transport>. Must be already a child of a <content> + element. + """ + content_elt = transport_elt.parent + if content_elt is None or not content_elt.name == "content": + raise exceptions.InternalError("Was expecting <content> element.") + content_elt.children.remove(transport_elt) + wrapping_transport_elt = content_elt.addElement( + (NS_JINGLE_WEBRTC_DATACHANNELS, "transport") + ) + wrapping_transport_elt.addChild(transport_elt) + + def _on_send_ice_buffer( + self, + client: SatXMPPEntity, + session: dict, + content_name: str, + content_data: dict, + transport_elt: domish.Element, + iq_elt: domish.Element + ) -> bool: + if content_data["transport"].handler == self: + self._wrap_transport_element(transport_elt) + return True + + def _on_ice_candidate_send( + self, + client: SatXMPPEntity, + session: dict, + media_ice_data: dict[str, dict], + content_name: str, + content_data: dict, + iq_elt: domish.Element + ) -> bool: + if content_data["transport"].handler == self: + transport_elt = iq_elt.jingle.content.transport + if transport_elt.uri != self._ice_udp.namespace: + raise exceptions.InternalError("Was expecting an ICE UDP transport") + self._wrap_transport_element(transport_elt) + return True + + async def _file_jingle_send( + self, + client: SatXMPPEntity, + peer_jid: jid.JID, + content: dict + ) -> None: + call_data = content["app_kwargs"]["extra"].pop("call_data", None) + if call_data: + metadata = self._rtp.parse_call_data(call_data) + try: + application_data = call_data["application"] + except KeyError: + raise exceptions.DataError( + '"call_data" must have an application media.' + ) + try: + content["transport_data"] = { + "sctp-port": metadata["sctp-port"], + "max-message-size": metadata.get("max-message-size", 65536), + "local_ice_data": { + "ufrag": metadata["ice-ufrag"], + "pwd": metadata["ice-pwd"], + "candidates": application_data.pop("ice-candidates"), + "fingerprint": application_data.pop("fingerprint", {}), + } + } + except KeyError as e: + raise exceptions.DataError(f"Mandatory key is missing: {e}") + + async def jingle_session_init( + self, + client: SatXMPPEntity, + session: dict, + content_name: str, + ) -> domish.Element: + content_data = session["contents"][content_name] + transport_data = content_data["transport_data"] + ice_transport_elt = await self._ice_udp.jingle_session_init( + client, + session, + content_name + ) + transport_elt = domish.Element( + (NS_JINGLE_WEBRTC_DATACHANNELS, "transport"), + attribs={ + "sctp-port": str(transport_data["sctp-port"]), + "max-message-size": str(transport_data["max-message-size"]) + } + ) + transport_elt.addChild(ice_transport_elt) + return transport_elt + + async def _call_ice_udp_handler( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + transport_elt: domish.Element, + ): + """Unwrap XEP-0176 <transport> element, and call its Jingle handler with it""" + try: + ice_transport_elt = next( + transport_elt.elements(self._ice_udp.namespace, "transport") + ) + except StopIteration: + raise exceptions.DataError("Missing ICE UDP <transport> element.") + else: + await self._ice_udp.jingle_handler( + client, action, session, content_name, ice_transport_elt + ) + + async def jingle_handler( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + transport_elt: domish.Element, + ) -> domish.Element: + """Handle Jingle requests + + @param client: The SatXMPPEntity instance. + @param action: The action to be performed with the session. + @param session: A dictionary containing the session information. + @param content_name: The name of the content. + @param transport_elt: The domish.Element instance representing the transport + element. + + @return: <transport> element + """ + content_data = session["contents"][content_name] + transport_data = content_data["transport_data"] + if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR): + session.setdefault("metadata", {}) + session.setdefault("peer_metadata", {}) + # we have to set application data despite being a transport handler, + # because the SDP generation needs application data + application_data = content_data["application_data"] + application_data.setdefault("peer_data", {}) + application_data.setdefault("media", "application") + + if action == self._j.A_PREPARE_CONFIRMATION: + await self._call_ice_udp_handler( + client, action, session, content_name, transport_elt + ) + try: + transport_data["sctp-port"] = int(transport_elt["sctp-port"]) + transport_data["max-message-size"] = int( + transport_elt.getAttribute("max-message-size", 65536) + ) + except (KeyError, ValueError): + raise exceptions.DataError( + f"Invalid datachannel signaling element: {transport_elt.toXml()}" + ) + transport_data["webrtc"] = True + elif action in ( + self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR, + self._j.A_TRANSPORT_INFO + ): + await self._call_ice_udp_handler( + client, action, session, content_name, transport_elt + ) + elif action == self._j.A_SESSION_ACCEPT: + await self._call_ice_udp_handler( + client, action, session, content_name, transport_elt + ) + answer_sdp = mapping.generate_sdp_from_session(session) + self.host.bridge.call_setup( + session["id"], + data_format.serialise( + { + "role": session["role"], + "sdp": answer_sdp, + } + ), + client.profile, + ) + elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): + pass + elif action == self._j.A_START: + pass + elif action == self._j.A_SESSION_INITIATE: + # responder side + + sdp = mapping.generate_sdp_from_session(session) + session["answer_sdp_d"] = answer_sdp_d = defer.Deferred() + # we should have the answer long before 2 min + answer_sdp_d.addTimeout(2 * 60, reactor) + + self.host.bridge.call_setup( + session["id"], + data_format.serialise( + { + "role": session["role"], + "sdp": sdp, + } + ), + client.profile, + ) + + answer_sdp = await answer_sdp_d + parsed_answer = mapping.parse_sdp(answer_sdp) + session["metadata"].update(parsed_answer["metadata"]) + contents = session["contents"] + if len(contents) != 1: + raise NotImplementedError( + "Only a singlecontent is supported at the moment." + ) + content = next(iter(contents.values())) + media_data = parsed_answer["application"] + application_data = content["application_data"] + application_data["local_data"] = media_data["application_data"] + transport_data = content["transport_data"] + local_ice_data = media_data["transport_data"] + transport_data["local_ice_data"] = local_ice_data + transport_elt.children.clear() + ice_transport_elt = await self._ice_udp.jingle_handler( + client, action, session, content_name, transport_elt + ) + transport_elt.addChild(ice_transport_elt) + elif action == self._j.A_DESTROY: + # the transport is replaced (fallback ?) + pass + else: + log.warning(f"FIXME: unmanaged action {action}") + + return transport_elt + + +@implementer(iwokkel.IDisco) +class XEP0343Handler(XMPPHandler): + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_JINGLE_WEBRTC_DATACHANNELS)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []