comparison libervia/backend/plugins/plugin_xep_0343.py @ 4231:e11b13418ba6

plugin XEP-0353, XEP-0234, jingle: WebRTC data channel signaling implementation: Implement XEP-0343: Signaling WebRTC Data Channels in Jingle. The current version of the XEP (0.3.1) has no implementation and contains some flaws. After discussing this on xsf@, Daniel (from Conversations) mentioned that they had a sprint with Larma (from Dino) to work on another version and provided me with this link: https://gist.github.com/iNPUTmice/6c56f3e948cca517c5fb129016d99e74 . I have used it for my implementation. This implementation reuses work done on Jingle A/V call (notably XEP-0176 and XEP-0167 plugins), with adaptations. When used, XEP-0234 will not handle the file itself as it normally does. This is because WebRTC has several implementations (browser for web interface, GStreamer for others), and file/data must be handled directly by the frontend. This is particularly important for web frontends, as the file is not sent from the backend but from the end-user's browser device. Among the changes, there are: - XEP-0343 implementation. - `file_send` bridge method now uses a serialised dict as output. - New `BaseTransportHandler.is_usable` method which gets content data and returns a boolean (defaults to `True`) to tell if this transport can actually be used in this context (when we are initiator). Used in the WebRTC case to see if call data are available. - Support of `application` media type, and everything necessary to handle data channels. - Better confirmation message, with file name, size and description when available. - When a file is accepted in preflight, it is specified in the following `action_new` signal for the actual file transfer. This way, the frontend can avoid the display of 2 confirmation messages. - XEP-0166: when not specified, the default `content` name is now its index number instead of a UUID. This follows the behaviour of browsers. - XEP-0353: better handling of events such as a call taken by another device. - various other updates. rel 441
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:57:23 +0200
parents
children 79c8a70e1813
comparison
equal deleted inserted replaced
4230:314d3c02bb67 4231:e11b13418ba6
1 #!/usr/bin/env python3
2
3 # Libervia plugin
4 # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from typing import Final
20
21 from twisted.internet import defer, reactor
22 from twisted.words.protocols.jabber import jid
23 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
24 from twisted.words.xish import domish
25 from wokkel import disco, iwokkel
26 from zope.interface import implementer
27
28 from libervia.backend.core import exceptions
29 from libervia.backend.core.constants import Const as C
30 from libervia.backend.core.i18n import _
31 from libervia.backend.core.log import getLogger
32 from libervia.backend.core.xmpp import SatXMPPEntity
33 from libervia.backend.plugins.plugin_xep_0166.models import BaseTransportHandler, ContentData
34 from libervia.backend.tools.common import data_format
35
36 from .plugin_xep_0167 import mapping
37
# Module-level logger, named after this module.
log = getLogger(__name__)


# Plugin metadata, keyed by the ``C.PI_*`` constants.
PLUGIN_INFO = {
    C.PI_NAME: "WebRTC datachannels in Jingle",
    C.PI_IMPORT_NAME: "XEP-0343",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    # ICE-UDP (XEP-0176) and RTP/SDP helpers (XEP-0167) are used directly by the
    # transport handler defined in this module.
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0167", "XEP-0176", "XEP-0234", "XEP-0320"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0343",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Use WebRTC to create a generic data transport."""),
}

# Namespace of the Jingle <transport> element wrapping the ICE-UDP transport.
NS_JINGLE_WEBRTC_DATACHANNELS: Final[str] = (
    "urn:xmpp:jingle:transports:webrtc-datachannel:1"
)
56
57
class XEP_0343(BaseTransportHandler):
    """Jingle transport handler for WebRTC data channels (XEP-0343).

    The ICE-UDP (XEP-0176) ``<transport>`` element is wrapped in a ``<transport>``
    element using this plugin namespace. ICE handling itself is delegated to the
    XEP-0176 plugin, and SDP generation/parsing to the XEP-0167 ``mapping`` module.
    """

    namespace = NS_JINGLE_WEBRTC_DATACHANNELS
    # Value used when "max-message-size" is not specified (by the peer's
    # <transport> element or by the frontend call data).
    DEFAULT_MAX_MESSAGE_SIZE: Final[int] = 65536

    def __init__(self, host):
        """Register this transport with XEP-0166 and hook the needed triggers

        @param host: backend instance; its ``plugins``, ``trigger`` and ``bridge``
            attributes are used by this plugin.
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._ice_udp = host.plugins["XEP-0176"]
        self._j = host.plugins["XEP-0166"]
        # NOTE(review): the last argument is presumably the transport priority,
        #   confirm against XEP-0166's ``register_transport``.
        self._j.register_transport(
            NS_JINGLE_WEBRTC_DATACHANNELS, self._j.TRANSPORT_STREAMING, self, 10000
        )
        self._rtp = host.plugins["XEP-0167"]
        host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger)
        host.trigger.add(
            "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger
        )
        host.trigger.add("XEP-0176_jingle_handler_send_buffer", self._on_send_ice_buffer)
        host.trigger.add("XEP-0176_ice_candidate_send", self._on_ice_candidate_send)
        host.trigger.add("XEP-0234_file_jingle_send", self._file_jingle_send)

    def get_handler(self, client: SatXMPPEntity):
        """Return the XMPP handler advertising this transport in service discovery"""
        return XEP0343Handler()

    def is_usable(self, client, content_data: ContentData) -> bool:
        """Tell if this transport can be used for the given content

        @param client: unused here.
        @param content_data: content data; ``app_kwargs["extra"]["webrtc"]`` is
            checked.
        @return: truthiness of the "webrtc" extra value, False if "extra" or
            "webrtc" key is missing.
        """
        try:
            # bool() so that the declared return type is honoured whatever the
            # frontend has put in the extra data
            return bool(content_data.app_kwargs["extra"]["webrtc"])
        except KeyError:
            return False

    def _parse_sdp_a_trigger(
        self,
        attribute: str,
        parts: list[str],
        call_data: dict,
        metadata: dict,
        media_type: str,
        application_data: dict,
        transport_data: dict,
    ) -> None:
        """Parse "sctp-port" and "max-message-size" attributes

        The integer value is stored in ``transport_data`` under the attribute name.
        Unparsable or missing values are logged and ignored.
        """
        if attribute not in ("sctp-port", "max-message-size"):
            return
        try:
            transport_data[attribute] = int(parts[0])
        except (ValueError, IndexError):
            # IndexError: the attribute has no value at all (empty ``parts``)
            log.warning(f"Can't parse value of {attribute}, ignoring: {parts}")

    def _generate_sdp_content_trigger(
        self,
        session: dict,
        local: bool,
        idx: int,
        content_data: dict,
        sdp_lines: list[str],
        application_data: dict,
        app_data_key: str,
        media_data: dict,
        media: str,
    ) -> None:
        """Generate "sctp-port" and "max-message-size" attributes

        An ``a=`` line is appended to ``sdp_lines`` for each of those values found
        (and not None) in the content's transport data.
        """
        transport_data = content_data["transport_data"]
        for name in ("sctp-port", "max-message-size"):
            value = transport_data.get(name)
            if value is not None:
                sdp_lines.append(f"a={name}:{value}")

    def _wrap_transport_element(self, transport_elt: domish.Element) -> None:
        """Wrap the XEP-0176 transport in a transport with this XEP namespace

        @param transport_elt: ICE UDP <transport>. Must be already a child of a <content>
            element.
        @raise exceptions.InternalError: the parent is not a <content> element.
        """
        content_elt = transport_elt.parent
        if content_elt is None or content_elt.name != "content":
            raise exceptions.InternalError("Was expecting <content> element.")
        # the ICE UDP transport is moved from <content> to the wrapping <transport>
        content_elt.children.remove(transport_elt)
        wrapping_transport_elt = content_elt.addElement(
            (NS_JINGLE_WEBRTC_DATACHANNELS, "transport")
        )
        wrapping_transport_elt.addChild(transport_elt)

    def _on_send_ice_buffer(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
        content_data: dict,
        transport_elt: domish.Element,
        iq_elt: domish.Element,
    ) -> bool:
        """Trigger for "XEP-0176_jingle_handler_send_buffer"

        Wrap the ICE UDP transport when this handler is in charge of the content.

        @return: always True.
        """
        if content_data["transport"].handler == self:
            self._wrap_transport_element(transport_elt)
        return True

    def _on_ice_candidate_send(
        self,
        client: SatXMPPEntity,
        session: dict,
        media_ice_data: dict[str, dict],
        content_name: str,
        content_data: dict,
        iq_elt: domish.Element,
    ) -> bool:
        """Trigger for "XEP-0176_ice_candidate_send"

        Wrap the ICE UDP transport found in ``iq_elt`` when this handler is in charge
        of the content.

        @return: always True.
        @raise exceptions.InternalError: the transport found is not an ICE UDP one.
        """
        if content_data["transport"].handler == self:
            transport_elt = iq_elt.jingle.content.transport
            if transport_elt.uri != self._ice_udp.namespace:
                raise exceptions.InternalError("Was expecting an ICE UDP transport")
            self._wrap_transport_element(transport_elt)
        return True

    async def _file_jingle_send(
        self, client: SatXMPPEntity, peer_jid: jid.JID, content: dict
    ) -> None:
        """Trigger for "XEP-0234_file_jingle_send"

        When "call_data" is present in the content's extra data, it is removed from
        there and used to fill ``content["transport_data"]``.

        @param content: content data; ``content["app_kwargs"]["extra"]`` is read.
        @raise exceptions.DataError: "application" media or a mandatory key is missing
            from call data.
        """
        call_data = content["app_kwargs"]["extra"].pop("call_data", None)
        if not call_data:
            return
        metadata = self._rtp.parse_call_data(call_data)
        try:
            application_data = call_data["application"]
        except KeyError as e:
            raise exceptions.DataError(
                '"call_data" must have an application media.'
            ) from e
        try:
            content["transport_data"] = {
                "sctp-port": metadata["sctp-port"],
                "max-message-size": metadata.get(
                    "max-message-size", self.DEFAULT_MAX_MESSAGE_SIZE
                ),
                "local_ice_data": {
                    "ufrag": metadata["ice-ufrag"],
                    "pwd": metadata["ice-pwd"],
                    "candidates": application_data.pop("ice-candidates"),
                    "fingerprint": application_data.pop("fingerprint", {}),
                },
            }
        except KeyError as e:
            raise exceptions.DataError(f"Mandatory key is missing: {e}") from e

    async def jingle_session_init(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
    ) -> domish.Element:
        """Build the <transport> element used to initiate the session

        The element built by the XEP-0176 plugin is added as a child of a
        <transport> element with this plugin namespace, which holds the "sctp-port"
        and "max-message-size" attributes taken from the content's transport data.

        @return: wrapping <transport> element
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        ice_transport_elt = await self._ice_udp.jingle_session_init(
            client, session, content_name
        )
        transport_elt = domish.Element(
            (NS_JINGLE_WEBRTC_DATACHANNELS, "transport"),
            attribs={
                "sctp-port": str(transport_data["sctp-port"]),
                "max-message-size": str(transport_data["max-message-size"]),
            },
        )
        transport_elt.addChild(ice_transport_elt)
        return transport_elt

    async def _call_ice_udp_handler(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        transport_elt: domish.Element,
    ):
        """Unwrap XEP-0176 <transport> element, and call its Jingle handler with it

        @raise exceptions.DataError: there is no ICE UDP <transport> child.
        """
        ice_transport_elt = next(
            transport_elt.elements(self._ice_udp.namespace, "transport"), None
        )
        if ice_transport_elt is None:
            raise exceptions.DataError("Missing ICE UDP <transport> element.")
        await self._ice_udp.jingle_handler(
            client, action, session, content_name, ice_transport_elt
        )

    def _send_call_setup(
        self, client: SatXMPPEntity, session: dict, sdp: str
    ) -> None:
        """Send session role and SDP through the ``call_setup`` bridge method"""
        self.host.bridge.call_setup(
            session["id"],
            data_format.serialise(
                {
                    "role": session["role"],
                    "sdp": sdp,
                }
            ),
            client.profile,
        )

    async def jingle_handler(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        transport_elt: domish.Element,
    ) -> domish.Element:
        """Handle Jingle requests

        @param client: The SatXMPPEntity instance.
        @param action: The action to be performed with the session.
        @param session: A dictionary containing the session information.
        @param content_name: The name of the content.
        @param transport_elt: The domish.Element instance representing the transport
            element.

        @return: <transport> element
        @raise exceptions.DataError: the <transport> element is invalid (missing ICE
            UDP child, missing or non integer "sctp-port"/"max-message-size").
        @raise NotImplementedError: on session initiate, the session has not exactly
            one content.
        """
        content_data = session["contents"][content_name]
        transport_data = content_data["transport_data"]
        if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR):
            session.setdefault("metadata", {})
            session.setdefault("peer_metadata", {})
            # we have to set application data despite being a transport handler,
            # because the SDP generation needs application data
            application_data = content_data["application_data"]
            application_data.setdefault("peer_data", {})
            application_data.setdefault("media", "application")

            if action == self._j.A_PREPARE_CONFIRMATION:
                await self._call_ice_udp_handler(
                    client, action, session, content_name, transport_elt
                )
                try:
                    transport_data["sctp-port"] = int(transport_elt["sctp-port"])
                    transport_data["max-message-size"] = int(
                        transport_elt.getAttribute(
                            "max-message-size", self.DEFAULT_MAX_MESSAGE_SIZE
                        )
                    )
                except (KeyError, ValueError) as e:
                    raise exceptions.DataError(
                        f"Invalid datachannel signaling element: {transport_elt.toXml()}"
                    ) from e
                transport_data["webrtc"] = True
        elif action == self._j.A_TRANSPORT_INFO:
            # NOTE(review): this branch used to also list A_PREPARE_CONFIRMATION and
            #   A_PREPARE_INITIATOR, but both are already caught by the first
            #   condition and could never reach it. If the ICE UDP handler was meant
            #   to be called for A_PREPARE_INITIATOR, it must be done in the first
            #   branch.
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
        elif action == self._j.A_SESSION_ACCEPT:
            await self._call_ice_udp_handler(
                client, action, session, content_name, transport_elt
            )
            answer_sdp = mapping.generate_sdp_from_session(session)
            self._send_call_setup(client, session, answer_sdp)
        elif action in (
            self._j.A_ACCEPTED_ACK,
            self._j.A_PREPARE_RESPONDER,
            self._j.A_START,
        ):
            # nothing to do for those actions
            pass
        elif action == self._j.A_SESSION_INITIATE:
            # responder side

            sdp = mapping.generate_sdp_from_session(session)
            # this Deferred is stored in session, and awaited below to get the
            # answer SDP
            session["answer_sdp_d"] = answer_sdp_d = defer.Deferred()
            # we should have the answer long before 2 min
            answer_sdp_d.addTimeout(2 * 60, reactor)

            self._send_call_setup(client, session, sdp)

            answer_sdp = await answer_sdp_d
            parsed_answer = mapping.parse_sdp(answer_sdp)
            session["metadata"].update(parsed_answer["metadata"])
            contents = session["contents"]
            if len(contents) != 1:
                raise NotImplementedError(
                    "Only a singlecontent is supported at the moment."
                )
            content = next(iter(contents.values()))
            media_data = parsed_answer["application"]
            application_data = content["application_data"]
            application_data["local_data"] = media_data["application_data"]
            transport_data = content["transport_data"]
            local_ice_data = media_data["transport_data"]
            transport_data["local_ice_data"] = local_ice_data
            # children of the received element are replaced by the element
            # returned by the ICE UDP handler
            transport_elt.children.clear()
            ice_transport_elt = await self._ice_udp.jingle_handler(
                client, action, session, content_name, transport_elt
            )
            transport_elt.addChild(ice_transport_elt)
        elif action == self._j.A_DESTROY:
            # the transport is replaced (fallback ?)
            pass
        else:
            log.warning(f"FIXME: unmanaged action {action}")

        return transport_elt
365
366
@implementer(iwokkel.IDisco)
class XEP0343Handler(XMPPHandler):
    """Service discovery handler advertising the WebRTC data channel transport."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco feature for the WebRTC data channel namespace

        The arguments are not used: the same single feature is always returned.
        """
        webrtc_feature = disco.DiscoFeature(NS_JINGLE_WEBRTC_DATACHANNELS)
        return [webrtc_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return an empty list: no disco item is exposed by this handler"""
        return []