Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0343.py @ 4231:e11b13418ba6
plugin XEP-0353, XEP-0234, jingle: WebRTC data channel signaling implementation:
Implement XEP-0343: Signaling WebRTC Data Channels in Jingle. The current version of the
XEP (0.3.1) has no implementation and contains some flaws. After discussing this on xsf@,
Daniel (from Conversations) mentioned that they had a sprint with Larma (from Dino) to
work on another version and provided me with this link:
https://gist.github.com/iNPUTmice/6c56f3e948cca517c5fb129016d99e74 . I have used it for my
implementation.
This implementation reuses work done on Jingle A/V call (notably XEP-0176 and XEP-0167
plugins), with adaptations. When used, XEP-0234 will not handle the file itself as it
normally does. This is because WebRTC has several implementations (browser for web
interface, GStreamer for others), and file/data must be handled directly by the frontend.
This is particularly important for web frontends, as the file is not sent from the backend
but from the end-user's browser device.
Among the changes, there are:
- XEP-0343 implementation.
- `file_send` bridge method now uses a serialised dict as output.
- New `BaseTransportHandler.is_usable` method which gets content data and returns a boolean
(defaults to `True`) to tell if this transport can actually be used in this context (when
we are initiator). Used in the WebRTC case to see if call data are available.
- Support of `application` media type, and everything necessary to handle data channels.
- Better confirmation message, with file name, size and description when available.
- When a file is accepted in preflight, it is specified in the following `action_new` signal
for the actual file transfer. This way, the frontend can avoid the display of 2 confirmation
messages.
- XEP-0166: when not specified, default `content` name is now its index number instead of
a UUID. This follows the behaviour of browsers.
- XEP-0353: better handling of events such as call taken by another device.
- various other updates.
rel 441
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:57:23 +0200 (9 months ago) |
parents | |
children | 79c8a70e1813 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Final

from twisted.internet import defer, reactor
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.xmpp import SatXMPPEntity
from libervia.backend.plugins.plugin_xep_0166.models import (
    BaseTransportHandler,
    ContentData,
)
from libervia.backend.tools.common import data_format

from .plugin_xep_0167 import mapping

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "WebRTC datachannels in Jingle",
    C.PI_IMPORT_NAME: "XEP-0343",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0167", "XEP-0176", "XEP-0234", "XEP-0320"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0343",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Use WebRTC to create a generic data transport."""),
}

NS_JINGLE_WEBRTC_DATACHANNELS: Final[
    str
] = "urn:xmpp:jingle:transports:webrtc-datachannel:1"


class XEP_0343(BaseTransportHandler):
    """Jingle transport handler for WebRTC data channels.

    The transport wraps the ICE-UDP (XEP-0176) ``<transport>`` element inside a
    ``<transport>`` element using this plugin's namespace, and delegates the ICE
    negotiation itself to the XEP-0176 plugin.
    """

    namespace = NS_JINGLE_WEBRTC_DATACHANNELS

    def __init__(self, host):
        """Register the transport with XEP-0166 and hook the needed triggers.

        @param host: Libervia host instance, used to access the other plugins,
            the trigger manager and the bridge.
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._ice_udp = host.plugins["XEP-0176"]
        self._j = host.plugins["XEP-0166"]
        # NOTE(review): the last argument (10000) is presumably the transport
        # priority; confirm against XEP-0166 ``register_transport``.
        self._j.register_transport(
            NS_JINGLE_WEBRTC_DATACHANNELS, self._j.TRANSPORT_STREAMING, self, 10000
        )
        self._rtp = host.plugins["XEP-0167"]
        host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger)
        host.trigger.add(
            "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger
        )
        host.trigger.add("XEP-0176_jingle_handler_send_buffer", self._on_send_ice_buffer)
        host.trigger.add("XEP-0176_ice_candidate_send", self._on_ice_candidate_send)
        host.trigger.add("XEP-0234_file_jingle_send", self._file_jingle_send)

    def get_handler(self, client: SatXMPPEntity):
        """Return the XMPP handler advertising this transport in service discovery."""
        return XEP0343Handler()

    def is_usable(self, client, content_data: ContentData) -> bool:
        """Tell if this transport can be used for the given content.

        @param client: unused here.
        @param content_data: content data whose ``app_kwargs["extra"]["webrtc"]``
            value is checked.
        @return: truthiness of the ``webrtc`` extra value, ``False`` when the
            ``extra`` or ``webrtc`` key is missing.
        """
        try:
            # Coerce to honour the declared ``bool`` return type whatever the
            # frontend put in the "webrtc" extra.
            return bool(content_data.app_kwargs["extra"]["webrtc"])
        except KeyError:
            return False

    def _parse_sdp_a_trigger(
        self,
        attribute: str,
        parts: list[str],
        call_data: dict,
        metadata: dict,
        media_type: str,
        application_data: dict,
        transport_data: dict,
    ) -> None:
        """Parse "sctp-port" and "max-message-size" attributes

        The integer value of a recognised attribute is stored in ``transport_data``
        under the attribute name. A value that is not a valid integer is logged and
        ignored.
        """
        try:
            if attribute == "sctp-port":
                transport_data["sctp-port"] = int(parts[0])
            elif attribute == "max-message-size":
                transport_data["max-message-size"] = int(parts[0])
        except ValueError:
            log.warning(f"Can't parse value of {attribute}, ignoring: {parts}")

    def _generate_sdp_content_trigger(
        self,
        session: dict,
        local: bool,
        idx: int,
        content_data: dict,
        sdp_lines: list[str],
        application_data: dict,
        app_data_key: str,
        media_data: dict,
        media: str,
    ) -> None:
        """Generate "sctp-port" and "max-message-size" attributes

        Lines are appended to ``sdp_lines`` only for the values which are set in
        the content's ``transport_data``.
        """
        transport_data = content_data["transport_data"]
        sctp_port = transport_data.get("sctp-port")
        if sctp_port is not None:
            sdp_lines.append(f"a=sctp-port:{sctp_port}")

        max_message_size = transport_data.get("max-message-size")
        if max_message_size is not None:
            sdp_lines.append(f"a=max-message-size:{max_message_size}")

    def _wrap_transport_element(self, transport_elt: domish.Element) -> None:
        """Wrap the XEP-0176 transport in a transport with this XEP namespace

        @param transport_elt: ICE UDP <transport>. Must be already a child of a
            <content> element.
        @raise exceptions.InternalError: the parent of ``transport_elt`` is not a
            <content> element.
        """
        content_elt = transport_elt.parent
        if content_elt is None or content_elt.name != "content":
            raise exceptions.InternalError("Was expecting <content> element.")
        # Detach the ICE UDP transport from <content>, then re-attach it below the
        # new wrapping <transport> element.
        content_elt.children.remove(transport_elt)
        wrapping_transport_elt = content_elt.addElement(
            (NS_JINGLE_WEBRTC_DATACHANNELS, "transport")
        )
        wrapping_transport_elt.addChild(transport_elt)

    def _on_send_ice_buffer(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
        content_data: dict,
        transport_elt: domish.Element,
        iq_elt: domish.Element,
    ) -> bool:
        """Wrap the ICE UDP transport when the content uses this handler.

        @return: always ``True`` (presumably to let the trigger chain continue;
            confirm against the trigger manager).
        """
        if content_data["transport"].handler == self:
            self._wrap_transport_element(transport_elt)
        return True

    def _on_ice_candidate_send(
        self,
        client: SatXMPPEntity,
        session: dict,
        media_ice_data: dict[str, dict],
        content_name: str,
        content_data: dict,
        iq_elt: domish.Element,
    ) -> bool:
        """Wrap the ICE UDP transport of an outgoing candidate IQ.

        Nothing is done when the content is not using this handler.

        @return: always ``True`` (presumably to let the trigger chain continue;
            confirm against the trigger manager).
        @raise exceptions.InternalError: the <transport> element found in the IQ
            is not an ICE UDP one.
        """
        if content_data["transport"].handler == self:
            transport_elt = iq_elt.jingle.content.transport
            if transport_elt.uri != self._ice_udp.namespace:
                raise exceptions.InternalError("Was expecting an ICE UDP transport")
            self._wrap_transport_element(transport_elt)
        return True

    async def _file_jingle_send(
        self, client: SatXMPPEntity, peer_jid: jid.JID, content: dict
    ) -> None:
        """Build transport data from the "call_data" given with a file sending.

        "call_data" is removed from ``content["app_kwargs"]["extra"]``. When it is
        set, it is parsed with the XEP-0167 plugin and the result is used to fill
        ``content["transport_data"]``.

        @raise exceptions.DataError: "call_data" has no "application" media, or a
            mandatory key is missing.
        """
        call_data = content["app_kwargs"]["extra"].pop("call_data", None)
        if call_data:
            metadata = self._rtp.parse_call_data(call_data)
            try:
                application_data = call_data["application"]
            except KeyError as e:
                raise exceptions.DataError(
                    '"call_data" must have an application media.'
                ) from e
            try:
                content["transport_data"] = {
                    "sctp-port": metadata["sctp-port"],
                    "max-message-size": metadata.get("max-message-size", 65536),
                    "local_ice_data": {
                        "ufrag": metadata["ice-ufrag"],
                        "pwd": metadata["ice-pwd"],
                        "candidates": application_data.pop("ice-candidates"),
                        "fingerprint": application_data.pop("fingerprint", {}),
                    },
                }
            except KeyError as e:
                raise exceptions.DataError(f"Mandatory key is missing: {e}") from e

    async def jingle_session_init(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
    ) -> domish.Element:
        """Create the <transport> element used to initiate a session.

        The element carries the "sctp-port" and "max-message-size" attributes taken
        from the content's ``transport_data``, and wraps the <transport> element
        returned by the XEP-0176 plugin.

        @return: the wrapping <transport> element.
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        ice_transport_elt = await self._ice_udp.jingle_session_init(
            client, session, content_name
        )
        transport_elt = domish.Element(
            (NS_JINGLE_WEBRTC_DATACHANNELS, "transport"),
            attribs={
                "sctp-port": str(transport_data["sctp-port"]),
                "max-message-size": str(transport_data["max-message-size"]),
            },
        )
        transport_elt.addChild(ice_transport_elt)
        return transport_elt

    async def _call_ice_udp_handler(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        transport_elt: domish.Element,
    ):
        """Unwrap XEP-0176 <transport> element, and call its Jingle handler with it

        @raise exceptions.DataError: ``transport_elt`` has no ICE UDP <transport>
            child.
        """
        try:
            ice_transport_elt = next(
                transport_elt.elements(self._ice_udp.namespace, "transport")
            )
        except StopIteration as e:
            raise exceptions.DataError("Missing ICE UDP <transport> element.") from e
        else:
            await self._ice_udp.jingle_handler(
                client, action, session, content_name, ice_transport_elt
            )

    async def jingle_handler(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        transport_elt: domish.Element,
    ) -> domish.Element:
        """Handle Jingle requests

        @param client: The SatXMPPEntity instance.
        @param action: The action to be performed with the session.
        @param session: A dictionary containing the session information.
        @param content_name: The name of the content.
        @param transport_elt: The domish.Element instance representing the transport
            element.
        @return: <transport> element
        @raise exceptions.DataError: the datachannel signaling element is invalid,
            or the wrapped ICE UDP <transport> element is missing.
        @raise NotImplementedError: the session has more than one content when
            handling a session initiation.
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR):
            session.setdefault("metadata", {})
            session.setdefault("peer_metadata", {})
            # we have to set application data despite being a transport handler,
            # because the SDP generation needs application data
            application_data = content_data["application_data"]
            application_data.setdefault("peer_data", {})
            application_data.setdefault("media", "application")

        if action == self._j.A_PREPARE_CONFIRMATION:
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
            try:
                transport_data["sctp-port"] = int(transport_elt["sctp-port"])
                transport_data["max-message-size"] = int(
                    transport_elt.getAttribute("max-message-size", 65536)
                )
            except (KeyError, ValueError) as e:
                raise exceptions.DataError(
                    f"Invalid datachannel signaling element: {transport_elt.toXml()}"
                ) from e
            transport_data["webrtc"] = True

        # A_PREPARE_CONFIRMATION is fully handled by the branch above, so it is not
        # listed again here.
        elif action in (self._j.A_PREPARE_INITIATOR, self._j.A_TRANSPORT_INFO):
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )

        elif action == self._j.A_SESSION_ACCEPT:
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
            answer_sdp = mapping.generate_sdp_from_session(session)
            self.host.bridge.call_setup(
                session["id"],
                data_format.serialise(
                    {
                        "role": session["role"],
                        "sdp": answer_sdp,
                    }
                ),
                client.profile,
            )

        elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
            pass

        elif action == self._j.A_START:
            pass

        elif action == self._j.A_SESSION_INITIATE:
            # responder side
            sdp = mapping.generate_sdp_from_session(session)
            session["answer_sdp_d"] = answer_sdp_d = defer.Deferred()
            # we should have the answer long before 2 min
            answer_sdp_d.addTimeout(2 * 60, reactor)

            self.host.bridge.call_setup(
                session["id"],
                data_format.serialise(
                    {
                        "role": session["role"],
                        "sdp": sdp,
                    }
                ),
                client.profile,
            )

            # The deferred stored in the session is awaited here; it is not fired
            # anywhere in this class.
            answer_sdp = await answer_sdp_d
            parsed_answer = mapping.parse_sdp(answer_sdp)
            session["metadata"].update(parsed_answer["metadata"])
            contents = session["contents"]
            if len(contents) != 1:
                raise NotImplementedError(
                    "Only a singlecontent is supported at the moment."
                )
            content = next(iter(contents.values()))
            media_data = parsed_answer["application"]
            application_data = content["application_data"]
            application_data["local_data"] = media_data["application_data"]
            transport_data = content["transport_data"]
            local_ice_data = media_data["transport_data"]
            transport_data["local_ice_data"] = local_ice_data

            # Replace the children of the received element with the ICE UDP
            # <transport> element returned by the XEP-0176 handler.
            transport_elt.children.clear()
            ice_transport_elt = await self._ice_udp.jingle_handler(
                client, action, session, content_name, transport_elt
            )
            transport_elt.addChild(ice_transport_elt)

        elif action == self._j.A_DESTROY:
            # the transport is replaced (fallback ?)
            pass

        else:
            log.warning(f"FIXME: unmanaged action {action}")

        return transport_elt


@implementer(iwokkel.IDisco)
class XEP0343Handler(XMPPHandler):
    """XMPP handler advertising the WebRTC data channel transport feature."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco feature for the WebRTC data channel namespace."""
        return [disco.DiscoFeature(NS_JINGLE_WEBRTC_DATACHANNELS)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return no disco items."""
        return []