Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0343.py @ 4231:e11b13418ba6
plugin XEP-0353, XEP-0234, jingle: WebRTC data channel signaling implementation:
Implement XEP-0343: Signaling WebRTC Data Channels in Jingle. The current version of the
XEP (0.3.1) has no implementation and contains some flaws. After discussing this on xsf@,
Daniel (from Conversations) mentioned that they had a sprint with Larma (from Dino) to
work on another version and provided me with this link:
https://gist.github.com/iNPUTmice/6c56f3e948cca517c5fb129016d99e74 . I have used it for my
implementation.
This implementation reuses work done on Jingle A/V call (notably XEP-0176 and XEP-0167
plugins), with adaptations. When used, XEP-0234 will not handle the file itself as it
normally does. This is because WebRTC has several implementations (browser for web
interface, GStreamer for others), and file/data must be handled directly by the frontend.
This is particularly important for web frontends, as the file is not sent from the backend
but from the end-user's browser device.
Among the changes, there are:
- XEP-0343 implementation.
- `file_send` bridge method now use serialised dict as output.
- New `BaseTransportHandler.is_usable` method which get content data and returns a boolean
(default to `True`) to tell if this transport can actually be used in this context (when
we are initiator). Used in webRTC case to see if call data are available.
- Support of `application` media type, and everything necessary to handle data channels.
- Better confirmation message, with file name, size and description when available.
- When file is accepted in preflight, it is specified in following `action_new` signal for
actual file transfer. This way, frontend can avoid the display or 2 confirmation
messages.
- XEP-0166: when not specified, default `content` name is now its index number instead of
a UUID. This follows the behaviour of browsers.
- XEP-0353: better handling of events such as call taken by another device.
- various other updates.
rel 441
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:57:23 +0200 |
parents | |
children | 79c8a70e1813 |
comparison
equal
deleted
inserted
replaced
4230:314d3c02bb67 | 4231:e11b13418ba6 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin | |
4 # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from typing import Final | |
20 | |
21 from twisted.internet import defer, reactor | |
22 from twisted.words.protocols.jabber import jid | |
23 from twisted.words.protocols.jabber.xmlstream import XMPPHandler | |
24 from twisted.words.xish import domish | |
25 from wokkel import disco, iwokkel | |
26 from zope.interface import implementer | |
27 | |
28 from libervia.backend.core import exceptions | |
29 from libervia.backend.core.constants import Const as C | |
30 from libervia.backend.core.i18n import _ | |
31 from libervia.backend.core.log import getLogger | |
32 from libervia.backend.core.xmpp import SatXMPPEntity | |
33 from libervia.backend.plugins.plugin_xep_0166.models import BaseTransportHandler, ContentData | |
34 from libervia.backend.tools.common import data_format | |
35 | |
36 from .plugin_xep_0167 import mapping | |
37 | |
38 log = getLogger(__name__) | |
39 | |
40 | |
41 PLUGIN_INFO = { | |
42 C.PI_NAME: "WebRTC datachannels in Jingle", | |
43 C.PI_IMPORT_NAME: "XEP-0343", | |
44 C.PI_TYPE: "XEP", | |
45 C.PI_MODES: C.PLUG_MODE_BOTH, | |
46 C.PI_PROTOCOLS: [], | |
47 C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0167", "XEP-0176", "XEP-0234", "XEP-0320"], | |
48 C.PI_RECOMMENDATIONS: [], | |
49 C.PI_MAIN: "XEP_0343", | |
50 C.PI_HANDLER: "yes", | |
51 C.PI_DESCRIPTION: _("""Use WebRTC to create a generic data transport."""), | |
52 } | |
53 NS_JINGLE_WEBRTC_DATACHANNELS: Final[ | |
54 str | |
55 ] = "urn:xmpp:jingle:transports:webrtc-datachannel:1" | |
56 | |
57 | |
58 class XEP_0343(BaseTransportHandler): | |
59 namespace = NS_JINGLE_WEBRTC_DATACHANNELS | |
60 | |
61 def __init__(self, host): | |
62 log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") | |
63 self.host = host | |
64 self._ice_udp = host.plugins["XEP-0176"] | |
65 self._j = host.plugins["XEP-0166"] | |
66 self._j.register_transport( | |
67 NS_JINGLE_WEBRTC_DATACHANNELS, self._j.TRANSPORT_STREAMING, self, 10000 | |
68 ) | |
69 self._rtp = host.plugins["XEP-0167"] | |
70 host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger) | |
71 host.trigger.add( | |
72 "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger | |
73 ) | |
74 host.trigger.add("XEP-0176_jingle_handler_send_buffer", self._on_send_ice_buffer) | |
75 host.trigger.add("XEP-0176_ice_candidate_send", self._on_ice_candidate_send) | |
76 host.trigger.add( | |
77 "XEP-0234_file_jingle_send", self._file_jingle_send | |
78 ) | |
79 | |
80 def get_handler(self, client: SatXMPPEntity): | |
81 return XEP0343Handler() | |
82 | |
83 def is_usable(self, client, content_data: ContentData) -> bool: | |
84 try: | |
85 return content_data.app_kwargs["extra"]["webrtc"] | |
86 except KeyError: | |
87 return False | |
88 | |
89 def _parse_sdp_a_trigger( | |
90 self, | |
91 attribute: str, | |
92 parts: list[str], | |
93 call_data: dict, | |
94 metadata: dict, | |
95 media_type: str, | |
96 application_data: dict, | |
97 transport_data: dict, | |
98 ) -> None: | |
99 """Parse "sctp-port" and "max-message-size" attributes""" | |
100 try: | |
101 if attribute == "sctp-port": | |
102 transport_data["sctp-port"] = int(parts[0]) | |
103 elif attribute == "max-message-size": | |
104 transport_data["max-message-size"] = int(parts[0]) | |
105 except ValueError: | |
106 log.warning(f"Can't parse value of {attribute}, ignoring: {parts}") | |
107 | |
108 def _generate_sdp_content_trigger( | |
109 self, | |
110 session: dict, | |
111 local: bool, | |
112 idx: int, | |
113 content_data: dict, | |
114 sdp_lines: list[str], | |
115 application_data: dict, | |
116 app_data_key: str, | |
117 media_data: dict, | |
118 media: str | |
119 ) -> None: | |
120 """Generate "sctp-port" and "max-message-size" attributes""" | |
121 transport_data = content_data["transport_data"] | |
122 sctp_port = transport_data.get("sctp-port") | |
123 if sctp_port is not None: | |
124 sdp_lines.append(f"a=sctp-port:{sctp_port}") | |
125 | |
126 max_message_size = transport_data.get("max-message-size") | |
127 if max_message_size is not None: | |
128 sdp_lines.append(f"a=max-message-size:{max_message_size}") | |
129 | |
130 def _wrap_transport_element( | |
131 self, | |
132 transport_elt: domish.Element | |
133 ) -> None: | |
134 """Wrap the XEP-0176 transport in a transport with this XEP namespace | |
135 | |
136 @param transport_elt: ICE UDP <transport>. Must be already a child of a <content> | |
137 element. | |
138 """ | |
139 content_elt = transport_elt.parent | |
140 if content_elt is None or not content_elt.name == "content": | |
141 raise exceptions.InternalError("Was expecting <content> element.") | |
142 content_elt.children.remove(transport_elt) | |
143 wrapping_transport_elt = content_elt.addElement( | |
144 (NS_JINGLE_WEBRTC_DATACHANNELS, "transport") | |
145 ) | |
146 wrapping_transport_elt.addChild(transport_elt) | |
147 | |
148 def _on_send_ice_buffer( | |
149 self, | |
150 client: SatXMPPEntity, | |
151 session: dict, | |
152 content_name: str, | |
153 content_data: dict, | |
154 transport_elt: domish.Element, | |
155 iq_elt: domish.Element | |
156 ) -> bool: | |
157 if content_data["transport"].handler == self: | |
158 self._wrap_transport_element(transport_elt) | |
159 return True | |
160 | |
161 def _on_ice_candidate_send( | |
162 self, | |
163 client: SatXMPPEntity, | |
164 session: dict, | |
165 media_ice_data: dict[str, dict], | |
166 content_name: str, | |
167 content_data: dict, | |
168 iq_elt: domish.Element | |
169 ) -> bool: | |
170 if content_data["transport"].handler == self: | |
171 transport_elt = iq_elt.jingle.content.transport | |
172 if transport_elt.uri != self._ice_udp.namespace: | |
173 raise exceptions.InternalError("Was expecting an ICE UDP transport") | |
174 self._wrap_transport_element(transport_elt) | |
175 return True | |
176 | |
177 async def _file_jingle_send( | |
178 self, | |
179 client: SatXMPPEntity, | |
180 peer_jid: jid.JID, | |
181 content: dict | |
182 ) -> None: | |
183 call_data = content["app_kwargs"]["extra"].pop("call_data", None) | |
184 if call_data: | |
185 metadata = self._rtp.parse_call_data(call_data) | |
186 try: | |
187 application_data = call_data["application"] | |
188 except KeyError: | |
189 raise exceptions.DataError( | |
190 '"call_data" must have an application media.' | |
191 ) | |
192 try: | |
193 content["transport_data"] = { | |
194 "sctp-port": metadata["sctp-port"], | |
195 "max-message-size": metadata.get("max-message-size", 65536), | |
196 "local_ice_data": { | |
197 "ufrag": metadata["ice-ufrag"], | |
198 "pwd": metadata["ice-pwd"], | |
199 "candidates": application_data.pop("ice-candidates"), | |
200 "fingerprint": application_data.pop("fingerprint", {}), | |
201 } | |
202 } | |
203 except KeyError as e: | |
204 raise exceptions.DataError(f"Mandatory key is missing: {e}") | |
205 | |
206 async def jingle_session_init( | |
207 self, | |
208 client: SatXMPPEntity, | |
209 session: dict, | |
210 content_name: str, | |
211 ) -> domish.Element: | |
212 content_data = session["contents"][content_name] | |
213 transport_data = content_data["transport_data"] | |
214 ice_transport_elt = await self._ice_udp.jingle_session_init( | |
215 client, | |
216 session, | |
217 content_name | |
218 ) | |
219 transport_elt = domish.Element( | |
220 (NS_JINGLE_WEBRTC_DATACHANNELS, "transport"), | |
221 attribs={ | |
222 "sctp-port": str(transport_data["sctp-port"]), | |
223 "max-message-size": str(transport_data["max-message-size"]) | |
224 } | |
225 ) | |
226 transport_elt.addChild(ice_transport_elt) | |
227 return transport_elt | |
228 | |
229 async def _call_ice_udp_handler( | |
230 self, | |
231 client: SatXMPPEntity, | |
232 action: str, | |
233 session: dict, | |
234 content_name: str, | |
235 transport_elt: domish.Element, | |
236 ): | |
237 """Unwrap XEP-0176 <transport> element, and call its Jingle handler with it""" | |
238 try: | |
239 ice_transport_elt = next( | |
240 transport_elt.elements(self._ice_udp.namespace, "transport") | |
241 ) | |
242 except StopIteration: | |
243 raise exceptions.DataError("Missing ICE UDP <transport> element.") | |
244 else: | |
245 await self._ice_udp.jingle_handler( | |
246 client, action, session, content_name, ice_transport_elt | |
247 ) | |
248 | |
249 async def jingle_handler( | |
250 self, | |
251 client: SatXMPPEntity, | |
252 action: str, | |
253 session: dict, | |
254 content_name: str, | |
255 transport_elt: domish.Element, | |
256 ) -> domish.Element: | |
257 """Handle Jingle requests | |
258 | |
259 @param client: The SatXMPPEntity instance. | |
260 @param action: The action to be performed with the session. | |
261 @param session: A dictionary containing the session information. | |
262 @param content_name: The name of the content. | |
263 @param transport_elt: The domish.Element instance representing the transport | |
264 element. | |
265 | |
266 @return: <transport> element | |
267 """ | |
268 content_data = session["contents"][content_name] | |
269 transport_data = content_data["transport_data"] | |
270 if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR): | |
271 session.setdefault("metadata", {}) | |
272 session.setdefault("peer_metadata", {}) | |
273 # we have to set application data despite being a transport handler, | |
274 # because the SDP generation needs application data | |
275 application_data = content_data["application_data"] | |
276 application_data.setdefault("peer_data", {}) | |
277 application_data.setdefault("media", "application") | |
278 | |
279 if action == self._j.A_PREPARE_CONFIRMATION: | |
280 await self._call_ice_udp_handler( | |
281 client, action, session, content_name, transport_elt | |
282 ) | |
283 try: | |
284 transport_data["sctp-port"] = int(transport_elt["sctp-port"]) | |
285 transport_data["max-message-size"] = int( | |
286 transport_elt.getAttribute("max-message-size", 65536) | |
287 ) | |
288 except (KeyError, ValueError): | |
289 raise exceptions.DataError( | |
290 f"Invalid datachannel signaling element: {transport_elt.toXml()}" | |
291 ) | |
292 transport_data["webrtc"] = True | |
293 elif action in ( | |
294 self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR, | |
295 self._j.A_TRANSPORT_INFO | |
296 ): | |
297 await self._call_ice_udp_handler( | |
298 client, action, session, content_name, transport_elt | |
299 ) | |
300 elif action == self._j.A_SESSION_ACCEPT: | |
301 await self._call_ice_udp_handler( | |
302 client, action, session, content_name, transport_elt | |
303 ) | |
304 answer_sdp = mapping.generate_sdp_from_session(session) | |
305 self.host.bridge.call_setup( | |
306 session["id"], | |
307 data_format.serialise( | |
308 { | |
309 "role": session["role"], | |
310 "sdp": answer_sdp, | |
311 } | |
312 ), | |
313 client.profile, | |
314 ) | |
315 elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): | |
316 pass | |
317 elif action == self._j.A_START: | |
318 pass | |
319 elif action == self._j.A_SESSION_INITIATE: | |
320 # responder side | |
321 | |
322 sdp = mapping.generate_sdp_from_session(session) | |
323 session["answer_sdp_d"] = answer_sdp_d = defer.Deferred() | |
324 # we should have the answer long before 2 min | |
325 answer_sdp_d.addTimeout(2 * 60, reactor) | |
326 | |
327 self.host.bridge.call_setup( | |
328 session["id"], | |
329 data_format.serialise( | |
330 { | |
331 "role": session["role"], | |
332 "sdp": sdp, | |
333 } | |
334 ), | |
335 client.profile, | |
336 ) | |
337 | |
338 answer_sdp = await answer_sdp_d | |
339 parsed_answer = mapping.parse_sdp(answer_sdp) | |
340 session["metadata"].update(parsed_answer["metadata"]) | |
341 contents = session["contents"] | |
342 if len(contents) != 1: | |
343 raise NotImplementedError( | |
344 "Only a singlecontent is supported at the moment." | |
345 ) | |
346 content = next(iter(contents.values())) | |
347 media_data = parsed_answer["application"] | |
348 application_data = content["application_data"] | |
349 application_data["local_data"] = media_data["application_data"] | |
350 transport_data = content["transport_data"] | |
351 local_ice_data = media_data["transport_data"] | |
352 transport_data["local_ice_data"] = local_ice_data | |
353 transport_elt.children.clear() | |
354 ice_transport_elt = await self._ice_udp.jingle_handler( | |
355 client, action, session, content_name, transport_elt | |
356 ) | |
357 transport_elt.addChild(ice_transport_elt) | |
358 elif action == self._j.A_DESTROY: | |
359 # the transport is replaced (fallback ?) | |
360 pass | |
361 else: | |
362 log.warning(f"FIXME: unmanaged action {action}") | |
363 | |
364 return transport_elt | |
365 | |
366 | |
367 @implementer(iwokkel.IDisco) | |
368 class XEP0343Handler(XMPPHandler): | |
369 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): | |
370 return [disco.DiscoFeature(NS_JINGLE_WEBRTC_DATACHANNELS)] | |
371 | |
372 def getDiscoItems(self, requestor, target, nodeIdentifier=""): | |
373 return [] |