Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0343.py @ 4232:0fbe5c605eb6
tests (unit/webrtc,XEP-0176, XEP-0234): Fix tests and add webrtc file transfer tests:
fix 441
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:59:50 +0200 |
parents | e11b13418ba6 |
children | 79c8a70e1813 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Final

from twisted.internet import defer, reactor
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.xmpp import SatXMPPEntity
from libervia.backend.plugins.plugin_xep_0166.models import BaseTransportHandler, ContentData
from libervia.backend.tools.common import data_format

from .plugin_xep_0167 import mapping


log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "WebRTC datachannels in Jingle",
    C.PI_IMPORT_NAME: "XEP-0343",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0167", "XEP-0176", "XEP-0234", "XEP-0320"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0343",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Use WebRTC to create a generic data transport."""),
}

# Namespace of the <transport> element wrapping the ICE-UDP one.
NS_JINGLE_WEBRTC_DATACHANNELS: Final[str] = (
    "urn:xmpp:jingle:transports:webrtc-datachannel:1"
)


class XEP_0343(BaseTransportHandler):
    """Jingle transport handler for WebRTC datachannels.

    The transport is signalled as a <transport> element in the
    ``NS_JINGLE_WEBRTC_DATACHANNELS`` namespace which wraps the ICE-UDP
    (XEP-0176) <transport> element. Most of the work is delegated to the
    XEP-0176 plugin; this class adds the "sctp-port" and "max-message-size"
    attributes and does the wrapping/unwrapping.
    """

    namespace = NS_JINGLE_WEBRTC_DATACHANNELS

    def __init__(self, host):
        """Register the transport and hook the triggers of related plugins

        @param host: object giving access to ``plugins``, ``trigger`` and ``bridge``.
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._ice_udp = host.plugins["XEP-0176"]
        self._j = host.plugins["XEP-0166"]
        # NOTE(review): 10000 is presumably the transport priority; confirm
        #   against XEP-0166 ``register_transport`` signature.
        self._j.register_transport(
            NS_JINGLE_WEBRTC_DATACHANNELS, self._j.TRANSPORT_STREAMING, self, 10000
        )
        self._rtp = host.plugins["XEP-0167"]
        host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger)
        host.trigger.add(
            "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger
        )
        host.trigger.add("XEP-0176_jingle_handler_send_buffer", self._on_send_ice_buffer)
        host.trigger.add("XEP-0176_ice_candidate_send", self._on_ice_candidate_send)
        host.trigger.add(
            "XEP-0234_file_jingle_send", self._file_jingle_send
        )

    def get_handler(self, client: SatXMPPEntity):
        """Return a new disco handler advertising this transport

        @param client: profile session (not used here).
        @return: a new XEP0343Handler instance.
        """
        return XEP0343Handler()

    def is_usable(self, client, content_data: ContentData) -> bool:
        """Tell if this transport can be used for the given content

        @param client: profile session (not used here).
        @param content_data: content whose ``app_kwargs["extra"]["webrtc"]`` flag
            is checked.
        @return: True if the "webrtc" flag is set and truthy, False if the flag
            (or the "extra" mapping) is missing.
        """
        try:
            # the stored value is coerced so that the declared return type holds
            return bool(content_data.app_kwargs["extra"]["webrtc"])
        except KeyError:
            return False

    def _parse_sdp_a_trigger(
        self,
        attribute: str,
        parts: list[str],
        call_data: dict,
        metadata: dict,
        media_type: str,
        application_data: dict,
        transport_data: dict,
    ) -> None:
        """Parse "sctp-port" and "max-message-size" attributes

        The first item of ``parts`` is converted to int and stored in
        ``transport_data`` under the attribute name. Other attributes are ignored,
        and a value which is not an integer is logged and skipped.

        @param attribute: name of the SDP attribute.
        @param parts: values of the attribute.
        @param transport_data: mapping updated in place.
        """
        if attribute not in ("sctp-port", "max-message-size"):
            return
        try:
            transport_data[attribute] = int(parts[0])
        except ValueError:
            log.warning(f"Can't parse value of {attribute}, ignoring: {parts}")

    def _generate_sdp_content_trigger(
        self,
        session: dict,
        local: bool,
        idx: int,
        content_data: dict,
        sdp_lines: list[str],
        application_data: dict,
        app_data_key: str,
        media_data: dict,
        media: str
    ) -> None:
        """Generate "sctp-port" and "max-message-size" attributes

        One "a=" line is appended to ``sdp_lines`` for each of those keys which is
        set (i.e. not None) in ``content_data["transport_data"]``.

        @param content_data: content whose "transport_data" is read.
        @param sdp_lines: SDP lines, extended in place.
        """
        transport_data = content_data["transport_data"]
        # order matters: "sctp-port" is emitted before "max-message-size"
        for attribute in ("sctp-port", "max-message-size"):
            value = transport_data.get(attribute)
            if value is not None:
                sdp_lines.append(f"a={attribute}:{value}")

    def _wrap_transport_element(
        self,
        transport_elt: domish.Element
    ) -> None:
        """Wrap the XEP-0176 transport in a transport with this XEP namespace

        @param transport_elt: ICE UDP <transport>. Must be already a child of a
            <content> element.
        @raise exceptions.InternalError: the parent of ``transport_elt`` is not a
            <content> element.
        """
        content_elt = transport_elt.parent
        if content_elt is None or content_elt.name != "content":
            raise exceptions.InternalError("Was expecting <content> element.")
        # the ICE UDP transport is moved from <content> to the wrapping element
        content_elt.children.remove(transport_elt)
        wrapping_transport_elt = content_elt.addElement(
            (NS_JINGLE_WEBRTC_DATACHANNELS, "transport")
        )
        wrapping_transport_elt.addChild(transport_elt)

    def _on_send_ice_buffer(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
        content_data: dict,
        transport_elt: domish.Element,
        iq_elt: domish.Element
    ) -> bool:
        """Wrap the buffered ICE UDP transport when this handler is in use

        @param content_data: content whose transport handler is checked.
        @param transport_elt: ICE UDP <transport> element to wrap.
        @return: always True.
        """
        if content_data["transport"].handler == self:
            self._wrap_transport_element(transport_elt)
        return True

    def _on_ice_candidate_send(
        self,
        client: SatXMPPEntity,
        session: dict,
        media_ice_data: dict[str, dict],
        content_name: str,
        content_data: dict,
        iq_elt: domish.Element
    ) -> bool:
        """Wrap the ICE UDP transport of a candidate IQ when this handler is in use

        @param content_data: content whose transport handler is checked.
        @param iq_elt: IQ stanza holding <jingle><content><transport>.
        @return: always True.
        @raise exceptions.InternalError: the transport found in the stanza is not
            in the ICE UDP namespace.
        """
        if content_data["transport"].handler == self:
            transport_elt = iq_elt.jingle.content.transport
            if transport_elt.uri != self._ice_udp.namespace:
                raise exceptions.InternalError("Was expecting an ICE UDP transport")
            self._wrap_transport_element(transport_elt)
        return True

    async def _file_jingle_send(
        self,
        client: SatXMPPEntity,
        peer_jid: jid.JID,
        content: dict
    ) -> None:
        """Build transport data from "call_data" when sending a file

        "call_data" is removed from ``content["app_kwargs"]["extra"]``. When it is
        present and not empty, ``content["transport_data"]`` is set from it.

        @param content: content description, modified in place.
        @raise exceptions.DataError: "call_data" has no "application" media, or a
            mandatory key is missing.
        """
        call_data = content["app_kwargs"]["extra"].pop("call_data", None)
        if not call_data:
            return

        metadata = self._rtp.parse_call_data(call_data)
        try:
            application_data = call_data["application"]
        except KeyError as e:
            raise exceptions.DataError(
                '"call_data" must have an application media.'
            ) from e
        try:
            content["transport_data"] = {
                "sctp-port": metadata["sctp-port"],
                "max-message-size": metadata.get("max-message-size", 65536),
                "local_ice_data": {
                    "ufrag": metadata["ice-ufrag"],
                    "pwd": metadata["ice-pwd"],
                    "candidates": application_data.pop("ice-candidates"),
                    "fingerprint": application_data.pop("fingerprint", {}),
                }
            }
        except KeyError as e:
            raise exceptions.DataError(f"Mandatory key is missing: {e}") from e

    async def jingle_session_init(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
    ) -> domish.Element:
        """Create the <transport> element used to initiate a session

        @param client: profile session.
        @param session: Jingle session data.
        @param content_name: name of the content to initialise.
        @return: <transport> element of this XEP, with the "sctp-port" and
            "max-message-size" attributes, wrapping the ICE UDP <transport>.
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        ice_transport_elt = await self._ice_udp.jingle_session_init(
            client,
            session,
            content_name
        )
        transport_elt = domish.Element(
            (NS_JINGLE_WEBRTC_DATACHANNELS, "transport"),
            attribs={
                "sctp-port": str(transport_data["sctp-port"]),
                "max-message-size": str(transport_data["max-message-size"])
            }
        )
        transport_elt.addChild(ice_transport_elt)
        return transport_elt

    async def _call_ice_udp_handler(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        transport_elt: domish.Element,
    ):
        """Unwrap XEP-0176 <transport> element, and call its Jingle handler with it

        @param transport_elt: <transport> element of this XEP.
        @raise exceptions.DataError: there is no ICE UDP <transport> child.
        """
        try:
            ice_transport_elt = next(
                transport_elt.elements(self._ice_udp.namespace, "transport")
            )
        except StopIteration:
            # the StopIteration itself carries no useful information
            raise exceptions.DataError(
                "Missing ICE UDP <transport> element."
            ) from None
        else:
            await self._ice_udp.jingle_handler(
                client, action, session, content_name, ice_transport_elt
            )

    async def jingle_handler(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        transport_elt: domish.Element,
    ) -> domish.Element:
        """Handle Jingle requests

        @param client: The SatXMPPEntity instance.
        @param action: The action to be performed with the session.
        @param session: A dictionary containing the session information.
        @param content_name: The name of the content.
        @param transport_elt: The domish.Element instance representing the transport
            element.
        @return: <transport> element
        @raise exceptions.DataError: the datachannel attributes of
            ``transport_elt`` are missing or invalid, or the ICE UDP transport is
            missing.
        @raise NotImplementedError: the session has more than one content when
            handling session initiation.
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR):
            session.setdefault("metadata", {})
            session.setdefault("peer_metadata", {})
            # we have to set application data despite being a transport handler,
            # because the SDP generation needs application data
            application_data = content_data["application_data"]
            application_data.setdefault("peer_data", {})
            application_data.setdefault("media", "application")

        if action == self._j.A_PREPARE_CONFIRMATION:
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
            try:
                transport_data["sctp-port"] = int(transport_elt["sctp-port"])
                transport_data["max-message-size"] = int(
                    transport_elt.getAttribute("max-message-size", 65536)
                )
            except (KeyError, ValueError) as e:
                raise exceptions.DataError(
                    f"Invalid datachannel signaling element: {transport_elt.toXml()}"
                ) from e
            transport_data["webrtc"] = True
        elif action in (self._j.A_PREPARE_INITIATOR, self._j.A_TRANSPORT_INFO):
            # A_PREPARE_CONFIRMATION is fully handled by the branch above, only
            # those two actions can reach this point
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
        elif action == self._j.A_SESSION_ACCEPT:
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
            answer_sdp = mapping.generate_sdp_from_session(session)
            self.host.bridge.call_setup(
                session["id"],
                data_format.serialise(
                    {
                        "role": session["role"],
                        "sdp": answer_sdp,
                    }
                ),
                client.profile,
            )
        elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER):
            pass
        elif action == self._j.A_START:
            pass
        elif action == self._j.A_SESSION_INITIATE:
            # responder side
            sdp = mapping.generate_sdp_from_session(session)
            # this Deferred is awaited below; it is stored in the session,
            # presumably so that the answer SDP can be delivered from elsewhere
            session["answer_sdp_d"] = answer_sdp_d = defer.Deferred()
            # we should have the answer long before 2 min
            answer_sdp_d.addTimeout(2 * 60, reactor)

            self.host.bridge.call_setup(
                session["id"],
                data_format.serialise(
                    {
                        "role": session["role"],
                        "sdp": sdp,
                    }
                ),
                client.profile,
            )

            answer_sdp = await answer_sdp_d
            parsed_answer = mapping.parse_sdp(answer_sdp)
            session["metadata"].update(parsed_answer["metadata"])
            contents = session["contents"]
            if len(contents) != 1:
                raise NotImplementedError(
                    "Only a singlecontent is supported at the moment."
                )
            content = next(iter(contents.values()))
            media_data = parsed_answer["application"]
            application_data = content["application_data"]
            application_data["local_data"] = media_data["application_data"]
            transport_data = content["transport_data"]
            local_ice_data = media_data["transport_data"]
            transport_data["local_ice_data"] = local_ice_data
            # children are dropped, then the element returned by the ICE UDP
            # handler is added as the only child
            transport_elt.children.clear()
            ice_transport_elt = await self._ice_udp.jingle_handler(
                client, action, session, content_name, transport_elt
            )
            transport_elt.addChild(ice_transport_elt)
        elif action == self._j.A_DESTROY:
            # the transport is replaced (fallback ?)
            pass
        else:
            log.warning(f"FIXME: unmanaged action {action}")

        return transport_elt


@implementer(iwokkel.IDisco)
class XEP0343Handler(XMPPHandler):
    """Service discovery handler advertising WebRTC datachannels support"""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco feature of this XEP

        @return: list with a single DiscoFeature for the datachannel namespace.
        """
        return [disco.DiscoFeature(NS_JINGLE_WEBRTC_DATACHANNELS)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return disco items

        @return: an empty list.
        """
        return []