Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0353.py @ 4231:e11b13418ba6
plugin XEP-0353, XEP-0234, jingle: WebRTC data channel signaling implementation:
Implement XEP-0343: Signaling WebRTC Data Channels in Jingle. The current version of the
XEP (0.3.1) has no implementation and contains some flaws. After discussing this on xsf@,
Daniel (from Conversations) mentioned that they had a sprint with Larma (from Dino) to
work on another version and provided me with this link:
https://gist.github.com/iNPUTmice/6c56f3e948cca517c5fb129016d99e74 . I have used it for my
implementation.
This implementation reuses work done on Jingle A/V call (notably XEP-0176 and XEP-0167
plugins), with adaptations. When used, XEP-0234 will not handle the file itself as it
normally does. This is because WebRTC has several implementations (browser for web
interface, GStreamer for others), and file/data must be handled directly by the frontend.
This is particularly important for web frontends, as the file is not sent from the backend
but from the end-user's browser device.
Among the changes, there are:
- XEP-0343 implementation.
- `file_send` bridge method now uses a serialised dict as output.
- New `BaseTransportHandler.is_usable` method which gets content data and returns a boolean
(defaults to `True`) to tell if this transport can actually be used in this context (when
we are the initiator). Used in the WebRTC case to see if call data is available.
- Support of `application` media type, and everything necessary to handle data channels.
- Better confirmation message, with file name, size and description when available.
- When a file is accepted in preflight, this is specified in the following `action_new` signal for
the actual file transfer. This way, the frontend can avoid the display of 2 confirmation
messages.
- XEP-0166: when not specified, default `content` name is now its index number instead of
a UUID. This follows the behaviour of browsers.
- XEP-0353: better handling of events such as call taken by another device.
- various other updates.
rel 441
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 12:57:23 +0200 |
parents | 6784d07b99c8 |
children | 79c8a70e1813 |
comparison
equal
deleted
inserted
replaced
4230:314d3c02bb67 | 4231:e11b13418ba6 |
---|---|
27 from libervia.backend.core import exceptions | 27 from libervia.backend.core import exceptions |
28 from libervia.backend.core.constants import Const as C | 28 from libervia.backend.core.constants import Const as C |
29 from libervia.backend.core.core_types import SatXMPPEntity | 29 from libervia.backend.core.core_types import SatXMPPEntity |
30 from libervia.backend.core.i18n import D_, _ | 30 from libervia.backend.core.i18n import D_, _ |
31 from libervia.backend.core.log import getLogger | 31 from libervia.backend.core.log import getLogger |
32 from libervia.backend.tools.xml_tools import element_copy | |
33 | |
34 try: | |
35 from .plugin_xep_0167 import NS_JINGLE_RTP | |
36 except ImportError: | |
37 NS_JINGLE_RTP = None | |
32 | 38 |
33 log = getLogger(__name__) | 39 log = getLogger(__name__) |
34 | 40 |
35 | 41 |
36 NS_JINGLE_MESSAGE = "urn:xmpp:jingle-message:0" | 42 NS_JINGLE_MESSAGE = "urn:xmpp:jingle-message:0" |
53 def __init__(self, reason: str, text: str|None = None): | 59 def __init__(self, reason: str, text: str|None = None): |
54 super().__init__(text) | 60 super().__init__(text) |
55 self.reason = reason | 61 self.reason = reason |
56 | 62 |
57 | 63 |
64 class TakenByOtherDeviceException(exceptions.CancelError): | |
65 reason: str = "taken_by_other_device" | |
66 | |
67 def __init__(self, device_jid: jid.JID): | |
68 super().__init__(device_jid.full()) | |
69 self.device_jid = device_jid | |
70 | |
71 | |
58 class RetractException(exceptions.CancelError): | 72 class RetractException(exceptions.CancelError): |
59 pass | 73 pass |
60 | 74 |
61 | 75 |
62 class XEP_0353: | 76 class XEP_0353: |
68 self._h = host.plugins["XEP-0334"] | 82 self._h = host.plugins["XEP-0334"] |
69 host.trigger.add_with_check( | 83 host.trigger.add_with_check( |
70 "XEP-0166_initiate_elt_built", | 84 "XEP-0166_initiate_elt_built", |
71 self, | 85 self, |
72 self._on_initiate_trigger, | 86 self._on_initiate_trigger, |
73 # this plugin set the resource, we want it to happen first to other trigger | 87 # this plugin set the resource, we want it to happen first so other triggers |
74 # can get the full peer JID | 88 # can get the full peer JID |
75 priority=host.trigger.MAX_PRIORITY, | 89 priority=host.trigger.MAX_PRIORITY, |
76 ) | 90 ) |
77 host.trigger.add_with_check( | 91 host.trigger.add_with_check( |
78 "XEP-0166_terminate", | 92 "XEP-0166_terminate", |
138 mess_data = self.build_message_data(client, peer_jid, "propose", session["id"]) | 152 mess_data = self.build_message_data(client, peer_jid, "propose", session["id"]) |
139 message_elt = mess_data["xml"] | 153 message_elt = mess_data["xml"] |
140 for content_data in session["contents"].values(): | 154 for content_data in session["contents"].values(): |
141 # we get the full element build by the application plugin | 155 # we get the full element build by the application plugin |
142 jingle_description_elt = content_data["application_data"]["desc_elt"] | 156 jingle_description_elt = content_data["application_data"]["desc_elt"] |
143 # and copy it to only keep the root <description> element, no children | 157 |
144 description_elt = domish.Element( | 158 # we need to copy the element |
145 (jingle_description_elt.uri, jingle_description_elt.name), | 159 if jingle_description_elt.uri == NS_JINGLE_RTP: |
146 defaultUri=jingle_description_elt.defaultUri, | 160 # for RTP, we only keep the root <description> element, no children |
147 attribs=jingle_description_elt.attributes, | 161 description_elt = domish.Element( |
148 localPrefixes=jingle_description_elt.localPrefixes, | 162 (jingle_description_elt.uri, jingle_description_elt.name), |
149 ) | 163 defaultUri=jingle_description_elt.defaultUri, |
164 attribs=jingle_description_elt.attributes, | |
165 localPrefixes=jingle_description_elt.localPrefixes, | |
166 ) | |
167 else: | |
168 # Otherwise we keep the children to have application useful data | |
169 description_elt = element_copy(jingle_description_elt, with_parent=False) | |
170 | |
150 message_elt.propose.addChild(description_elt) | 171 message_elt.propose.addChild(description_elt) |
151 response_d = defer.Deferred() | 172 response_d = defer.Deferred() |
152 # we wait for 2 min before cancelling the session init | 173 # we wait for 2 min before cancelling the session init |
153 # FIXME: let's application decide timeout? | 174 # FIXME: let's application decide timeout? |
154 response_d.addTimeout(2 * 60, reactor) | 175 response_d.addTimeout(2 * 60, reactor) |
204 return False | 225 return False |
205 | 226 |
206 async def _on_message_received(self, client, message_elt, post_treat): | 227 async def _on_message_received(self, client, message_elt, post_treat): |
207 for elt in message_elt.elements(): | 228 for elt in message_elt.elements(): |
208 if elt.uri == NS_JINGLE_MESSAGE: | 229 if elt.uri == NS_JINGLE_MESSAGE: |
209 if elt.name == "propose": | 230 # We use ensureDeferred to process the message initiation workflow in |
210 return await self._handle_propose(client, message_elt, elt) | 231 # parallel and to avoid blocking the message queue. |
211 elif elt.name == "retract": | 232 defer.ensureDeferred(self._handle_mess_init(client, message_elt, elt)) |
212 return self._handle_retract(client, message_elt, elt) | 233 return False |
213 elif elt.name == "proceed": | |
214 return self._handle_proceed(client, message_elt, elt) | |
215 elif elt.name == "accept": | |
216 return self._handle_accept(client, message_elt, elt) | |
217 elif elt.name == "reject": | |
218 return self._handle_reject(client, message_elt, elt) | |
219 elif elt.name == "ringing": | |
220 return await self._handle_ringing(client, message_elt, elt) | |
221 else: | |
222 log.warning(f"invalid element: {elt.toXml}") | |
223 return True | |
224 return True | 234 return True |
225 | 235 |
226 def _get_sid_and_response_d( | 236 async def _handle_mess_init( |
237 self, | |
238 client: SatXMPPEntity, | |
239 message_elt: domish.Element, | |
240 mess_init_elt: domish.Element | |
241 ) -> None: | |
242 if mess_init_elt.name == "propose": | |
243 await self._handle_propose(client, message_elt, mess_init_elt) | |
244 elif mess_init_elt.name == "retract": | |
245 self._handle_retract(client, message_elt, mess_init_elt) | |
246 elif mess_init_elt.name == "proceed": | |
247 self._handle_proceed(client, message_elt, mess_init_elt) | |
248 elif mess_init_elt.name == "accept": | |
249 self._handle_accept(client, message_elt, mess_init_elt) | |
250 elif mess_init_elt.name == "reject": | |
251 self._handle_reject(client, message_elt, mess_init_elt) | |
252 elif mess_init_elt.name == "ringing": | |
253 await self._handle_ringing(client, message_elt, mess_init_elt) | |
254 else: | |
255 log.warning(f"invalid element: {mess_init_elt.toXml}") | |
256 | |
257 def _get_sid_and_session_d( | |
227 self, | 258 self, |
228 client: SatXMPPEntity, | 259 client: SatXMPPEntity, |
229 elt: domish.Element | 260 elt: domish.Element |
230 ) -> tuple[str, defer.Deferred]: | 261 ) -> tuple[str, defer.Deferred|list[defer.Deferred]]: |
231 """Retrieve session ID and response_d from response element""" | 262 """Retrieve session ID and deferred or list of deferred from response element""" |
232 try: | 263 try: |
233 session_id = elt["id"] | 264 session_id = elt["id"] |
234 except KeyError as e: | 265 except KeyError as e: |
235 assert elt.parent is not None | 266 assert elt.parent is not None |
236 log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}") | 267 log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}") |
237 raise e | 268 raise e |
238 try: | 269 try: |
239 response_d = client._xep_0353_pending_sessions[session_id] | 270 session_d = client._xep_0353_pending_sessions[session_id] |
240 except KeyError as e: | 271 except KeyError as e: |
241 log.warning( | 272 log.warning( |
242 _( | 273 _( |
243 "no pending session found with id {session_id}, did it timed out?" | 274 "no pending session found with id {session_id}, did it timed out?" |
244 ).format(session_id=session_id) | 275 ).format(session_id=session_id) |
245 ) | 276 ) |
246 raise e | 277 raise e |
278 return session_id, session_d | |
279 | |
280 def _get_sid_and_response_d( | |
281 self, | |
282 client: SatXMPPEntity, | |
283 elt: domish.Element | |
284 ) -> tuple[str, defer.Deferred]: | |
285 """Retrieve session ID and response_d from response element""" | |
286 session_id, response_d = self._get_sid_and_session_d(client, elt) | |
287 assert isinstance(response_d, defer.Deferred) | |
247 return session_id, response_d | 288 return session_id, response_d |
248 | 289 |
249 async def _handle_propose(self, client, message_elt, elt): | 290 def _get_sid_and_preflight_d_list( |
291 self, | |
292 client: SatXMPPEntity, | |
293 elt: domish.Element | |
294 ) -> tuple[str, list[defer.Deferred]]: | |
295 """Retrieve session ID and list of preflight_d from response element""" | |
296 session_id, preflight_d_list = self._get_sid_and_session_d(client, elt) | |
297 assert isinstance(preflight_d_list, list) | |
298 return session_id, preflight_d_list | |
299 | |
300 async def _handle_propose( | |
301 self, | |
302 client: SatXMPPEntity, | |
303 message_elt: domish.Element, | |
304 elt: domish.Element | |
305 ) -> None: | |
250 peer_jid = jid.JID(message_elt["from"]) | 306 peer_jid = jid.JID(message_elt["from"]) |
251 local_jid = jid.JID(message_elt["to"]) | 307 local_jid = jid.JID(message_elt["to"]) |
252 session_id = elt["id"] | 308 session_id = elt["id"] |
253 try: | 309 try: |
254 desc_and_apps = [ | 310 desc_and_apps = [ |
258 ] | 314 ] |
259 if not desc_and_apps: | 315 if not desc_and_apps: |
260 raise AttributeError | 316 raise AttributeError |
261 except AttributeError: | 317 except AttributeError: |
262 log.warning(f"Invalid propose element: {message_elt.toXml()}") | 318 log.warning(f"Invalid propose element: {message_elt.toXml()}") |
263 return False | 319 return |
264 except exceptions.NotFound: | 320 except exceptions.NotFound: |
265 log.warning( | 321 log.warning( |
266 f"There is not registered application to handle this " | 322 f"There is not registered application to handle this " |
267 f"proposal: {elt.toXml()}" | 323 f"proposal: {elt.toXml()}" |
268 ) | 324 ) |
269 return False | 325 return |
270 | 326 |
271 if not desc_and_apps: | 327 if not desc_and_apps: |
272 log.warning("No application specified: {message_elt.toXml()}") | 328 log.warning("No application specified: {message_elt.toXml()}") |
273 return False | 329 return |
274 | 330 |
275 session = self._j.create_session( | 331 session = self._j.create_session( |
276 client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid | 332 client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid |
277 ) | 333 ) |
278 | 334 |
282 # https://xmpp.org/extensions/xep-0353.html#ring , but we only do that if user | 338 # https://xmpp.org/extensions/xep-0353.html#ring , but we only do that if user |
283 # is in roster to avoid presence leak of all our devices. | 339 # is in roster to avoid presence leak of all our devices. |
284 mess_data = self.build_message_data(client, peer_jid, "ringing", session_id) | 340 mess_data = self.build_message_data(client, peer_jid, "ringing", session_id) |
285 await client.send_message_data(mess_data) | 341 await client.send_message_data(mess_data) |
286 | 342 |
287 for description_elt, application in desc_and_apps: | 343 try: |
344 for description_elt, application in desc_and_apps: | |
345 try: | |
346 preflight_d = defer.ensureDeferred( | |
347 application.handler.jingle_preflight( | |
348 client, session, description_elt | |
349 ) | |
350 ) | |
351 client._xep_0353_pending_sessions.setdefault(session_id, []).append( | |
352 preflight_d | |
353 ) | |
354 await preflight_d | |
355 except TakenByOtherDeviceException as e: | |
356 log.info(f"The call has been takend by {e.device_jid}") | |
357 await application.handler.jingle_preflight_cancel(client, session, e) | |
358 self._j.delete_session(client, session_id) | |
359 return | |
360 except exceptions.CancelError as e: | |
361 log.info(f"{client.profile} refused the session: {e}") | |
362 | |
363 if is_in_roster: | |
364 # peer is in our roster, we send reject to them, ou other devices | |
365 # will get carbon copies | |
366 reject_dest_jid = peer_jid | |
367 else: | |
368 # peer is not in our roster, we send the "reject" only to our own | |
369 # devices to make them stop ringing/doing notification, and we | |
370 # don't send anything to peer to avoid presence leak. | |
371 reject_dest_jid = client.jid.userhostJID() | |
372 | |
373 mess_data = self.build_message_data( | |
374 client, reject_dest_jid, "reject", session_id | |
375 ) | |
376 await client.send_message_data(mess_data) | |
377 self._j.delete_session(client, session_id) | |
378 return | |
379 except defer.CancelledError: | |
380 # raised when call is retracted before user can reply | |
381 self._j.delete_session(client, session_id) | |
382 return | |
383 finally: | |
288 try: | 384 try: |
289 await application.handler.jingle_preflight( | 385 del client._xep_0353_pending_sessions[session_id] |
290 client, session, description_elt | 386 except KeyError: |
291 ) | 387 pass |
292 except exceptions.CancelError as e: | |
293 log.info(f"{client.profile} refused the session: {e}") | |
294 | |
295 if is_in_roster: | |
296 # peer is in our roster, we send reject to them, ou other devices will | |
297 # get carbon copies | |
298 reject_dest_jid = peer_jid | |
299 else: | |
300 # peer is not in our roster, we send the "reject" only to our own | |
301 # devices to make them stop ringing/doing notification, and we don't | |
302 # send anything to peer to avoid presence leak. | |
303 reject_dest_jid = client.jid.userhostJID() | |
304 | |
305 mess_data = self.build_message_data( | |
306 client, reject_dest_jid, "reject", session_id | |
307 ) | |
308 await client.send_message_data(mess_data) | |
309 self._j.delete_session(client, session_id) | |
310 | |
311 return False | |
312 except defer.CancelledError: | |
313 # raised when call is retracted before user can reply | |
314 self._j.delete_session(client, session_id) | |
315 return False | |
316 | 388 |
317 if peer_jid.userhostJID() not in client.roster: | 389 if peer_jid.userhostJID() not in client.roster: |
318 await client.presence.available(peer_jid) | 390 await client.presence.available(peer_jid) |
319 | 391 |
320 mess_data = self.build_message_data(client, peer_jid, "proceed", session_id) | 392 mess_data = self.build_message_data(client, peer_jid, "proceed", session_id) |
321 await client.send_message_data(mess_data) | 393 await client.send_message_data(mess_data) |
322 | |
323 return False | |
324 | 394 |
325 def _handle_retract(self, client, message_elt, retract_elt): | 395 def _handle_retract(self, client, message_elt, retract_elt): |
326 try: | 396 try: |
327 session = self._j.get_session(client, retract_elt["id"]) | 397 session = self._j.get_session(client, retract_elt["id"]) |
328 except KeyError: | 398 except KeyError: |
343 else: | 413 else: |
344 for d in cancellable_deferred: | 414 for d in cancellable_deferred: |
345 d.cancel() | 415 d.cancel() |
346 return False | 416 return False |
347 | 417 |
348 def _handle_proceed(self, client, message_elt, proceed_elt): | 418 def _handle_proceed( |
349 try: | 419 self, |
350 __, response_d = self._get_sid_and_response_d(client, proceed_elt) | 420 client: SatXMPPEntity, |
351 except KeyError: | 421 message_elt: domish.Element, |
352 return True | 422 proceed_elt: domish.Element |
353 | 423 ) -> None: |
354 response_d.callback(jid.JID(message_elt["from"])) | 424 from_jid = jid.JID(message_elt["from"]) |
355 return False | 425 # session_d is the deferred of the session, it can be preflight_d or response_d |
426 if from_jid.userhostJID() == client.jid.userhostJID(): | |
427 # an other device took the session | |
428 try: | |
429 sid, preflight_d_list = self._get_sid_and_preflight_d_list( | |
430 client, proceed_elt | |
431 ) | |
432 except KeyError: | |
433 return | |
434 for preflight_d in preflight_d_list: | |
435 if not preflight_d.called: | |
436 preflight_d.errback(TakenByOtherDeviceException(from_jid)) | |
437 | |
438 try: | |
439 session = self._j.get_session(client, sid) | |
440 except exceptions.NotFound: | |
441 log.warning("No session found with sid {sid!r}.") | |
442 else: | |
443 # jingle_preflight_cancel? | |
444 pass | |
445 | |
446 # FIXME: Is preflight cancel handler correctly? Check if preflight_d is always | |
447 # cleaned correctly (use a timeout?) | |
448 | |
449 else: | |
450 try: | |
451 __, response_d = self._get_sid_and_response_d(client, proceed_elt) | |
452 except KeyError: | |
453 return | |
454 # we have a response deferred | |
455 response_d.callback(jid.JID(message_elt["from"])) | |
356 | 456 |
357 def _handle_accept(self, client, message_elt, accept_elt): | 457 def _handle_accept(self, client, message_elt, accept_elt): |
358 pass | 458 pass |
359 | 459 |
360 def _handle_reject(self, client, message_elt, reject_elt): | 460 def _handle_reject(self, client, message_elt, reject_elt): |