comparison libervia/backend/plugins/plugin_xep_0353.py @ 4231:e11b13418ba6

plugin XEP-0353, XEP-0234, jingle: WebRTC data channel signaling implementation: Implement XEP-0343: Signaling WebRTC Data Channels in Jingle. The current version of the XEP (0.3.1) has no implementation and contains some flaws. After discussing this on xsf@, Daniel (from Conversations) mentioned that they had a sprint with Larma (from Dino) to work on another version and provided me with this link: https://gist.github.com/iNPUTmice/6c56f3e948cca517c5fb129016d99e74 . I have used it for my implementation. This implementation reuses work done on Jingle A/V call (notably XEP-0176 and XEP-0167 plugins), with adaptations. When used, XEP-0234 will not handle the file itself as it normally does. This is because WebRTC has several implementations (browser for web interface, GStreamer for others), and file/data must be handled directly by the frontend. This is particularly important for web frontends, as the file is not sent from the backend but from the end-user's browser device. Among the changes, there are: - XEP-0343 implementation. - `file_send` bridge method now use serialised dict as output. - New `BaseTransportHandler.is_usable` method which get content data and returns a boolean (default to `True`) to tell if this transport can actually be used in this context (when we are initiator). Used in webRTC case to see if call data are available. - Support of `application` media type, and everything necessary to handle data channels. - Better confirmation message, with file name, size and description when available. - When file is accepted in preflight, it is specified in following `action_new` signal for actual file transfer. This way, frontend can avoid the display or 2 confirmation messages. - XEP-0166: when not specified, default `content` name is now its index number instead of a UUID. This follows the behaviour of browsers. - XEP-0353: better handling of events such as call taken by another device. - various other updates. rel 441
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:57:23 +0200
parents 6784d07b99c8
children 79c8a70e1813
comparison
equal deleted inserted replaced
4230:314d3c02bb67 4231:e11b13418ba6
27 from libervia.backend.core import exceptions 27 from libervia.backend.core import exceptions
28 from libervia.backend.core.constants import Const as C 28 from libervia.backend.core.constants import Const as C
29 from libervia.backend.core.core_types import SatXMPPEntity 29 from libervia.backend.core.core_types import SatXMPPEntity
30 from libervia.backend.core.i18n import D_, _ 30 from libervia.backend.core.i18n import D_, _
31 from libervia.backend.core.log import getLogger 31 from libervia.backend.core.log import getLogger
32 from libervia.backend.tools.xml_tools import element_copy
33
34 try:
35 from .plugin_xep_0167 import NS_JINGLE_RTP
36 except ImportError:
37 NS_JINGLE_RTP = None
32 38
33 log = getLogger(__name__) 39 log = getLogger(__name__)
34 40
35 41
36 NS_JINGLE_MESSAGE = "urn:xmpp:jingle-message:0" 42 NS_JINGLE_MESSAGE = "urn:xmpp:jingle-message:0"
53 def __init__(self, reason: str, text: str|None = None): 59 def __init__(self, reason: str, text: str|None = None):
54 super().__init__(text) 60 super().__init__(text)
55 self.reason = reason 61 self.reason = reason
56 62
57 63
64 class TakenByOtherDeviceException(exceptions.CancelError):
65 reason: str = "taken_by_other_device"
66
67 def __init__(self, device_jid: jid.JID):
68 super().__init__(device_jid.full())
69 self.device_jid = device_jid
70
71
58 class RetractException(exceptions.CancelError): 72 class RetractException(exceptions.CancelError):
59 pass 73 pass
60 74
61 75
62 class XEP_0353: 76 class XEP_0353:
68 self._h = host.plugins["XEP-0334"] 82 self._h = host.plugins["XEP-0334"]
69 host.trigger.add_with_check( 83 host.trigger.add_with_check(
70 "XEP-0166_initiate_elt_built", 84 "XEP-0166_initiate_elt_built",
71 self, 85 self,
72 self._on_initiate_trigger, 86 self._on_initiate_trigger,
73 # this plugin set the resource, we want it to happen first to other trigger 87 # this plugin set the resource, we want it to happen first so other triggers
74 # can get the full peer JID 88 # can get the full peer JID
75 priority=host.trigger.MAX_PRIORITY, 89 priority=host.trigger.MAX_PRIORITY,
76 ) 90 )
77 host.trigger.add_with_check( 91 host.trigger.add_with_check(
78 "XEP-0166_terminate", 92 "XEP-0166_terminate",
138 mess_data = self.build_message_data(client, peer_jid, "propose", session["id"]) 152 mess_data = self.build_message_data(client, peer_jid, "propose", session["id"])
139 message_elt = mess_data["xml"] 153 message_elt = mess_data["xml"]
140 for content_data in session["contents"].values(): 154 for content_data in session["contents"].values():
141 # we get the full element build by the application plugin 155 # we get the full element build by the application plugin
142 jingle_description_elt = content_data["application_data"]["desc_elt"] 156 jingle_description_elt = content_data["application_data"]["desc_elt"]
143 # and copy it to only keep the root <description> element, no children 157
144 description_elt = domish.Element( 158 # we need to copy the element
145 (jingle_description_elt.uri, jingle_description_elt.name), 159 if jingle_description_elt.uri == NS_JINGLE_RTP:
146 defaultUri=jingle_description_elt.defaultUri, 160 # for RTP, we only keep the root <description> element, no children
147 attribs=jingle_description_elt.attributes, 161 description_elt = domish.Element(
148 localPrefixes=jingle_description_elt.localPrefixes, 162 (jingle_description_elt.uri, jingle_description_elt.name),
149 ) 163 defaultUri=jingle_description_elt.defaultUri,
164 attribs=jingle_description_elt.attributes,
165 localPrefixes=jingle_description_elt.localPrefixes,
166 )
167 else:
168 # Otherwise we keep the children to have application useful data
169 description_elt = element_copy(jingle_description_elt, with_parent=False)
170
150 message_elt.propose.addChild(description_elt) 171 message_elt.propose.addChild(description_elt)
151 response_d = defer.Deferred() 172 response_d = defer.Deferred()
152 # we wait for 2 min before cancelling the session init 173 # we wait for 2 min before cancelling the session init
153 # FIXME: let's application decide timeout? 174 # FIXME: let's application decide timeout?
154 response_d.addTimeout(2 * 60, reactor) 175 response_d.addTimeout(2 * 60, reactor)
204 return False 225 return False
205 226
206 async def _on_message_received(self, client, message_elt, post_treat): 227 async def _on_message_received(self, client, message_elt, post_treat):
207 for elt in message_elt.elements(): 228 for elt in message_elt.elements():
208 if elt.uri == NS_JINGLE_MESSAGE: 229 if elt.uri == NS_JINGLE_MESSAGE:
209 if elt.name == "propose": 230 # We use ensureDeferred to process the message initiation workflow in
210 return await self._handle_propose(client, message_elt, elt) 231 # parallel and to avoid blocking the message queue.
211 elif elt.name == "retract": 232 defer.ensureDeferred(self._handle_mess_init(client, message_elt, elt))
212 return self._handle_retract(client, message_elt, elt) 233 return False
213 elif elt.name == "proceed":
214 return self._handle_proceed(client, message_elt, elt)
215 elif elt.name == "accept":
216 return self._handle_accept(client, message_elt, elt)
217 elif elt.name == "reject":
218 return self._handle_reject(client, message_elt, elt)
219 elif elt.name == "ringing":
220 return await self._handle_ringing(client, message_elt, elt)
221 else:
222 log.warning(f"invalid element: {elt.toXml}")
223 return True
224 return True 234 return True
225 235
226 def _get_sid_and_response_d( 236 async def _handle_mess_init(
237 self,
238 client: SatXMPPEntity,
239 message_elt: domish.Element,
240 mess_init_elt: domish.Element
241 ) -> None:
242 if mess_init_elt.name == "propose":
243 await self._handle_propose(client, message_elt, mess_init_elt)
244 elif mess_init_elt.name == "retract":
245 self._handle_retract(client, message_elt, mess_init_elt)
246 elif mess_init_elt.name == "proceed":
247 self._handle_proceed(client, message_elt, mess_init_elt)
248 elif mess_init_elt.name == "accept":
249 self._handle_accept(client, message_elt, mess_init_elt)
250 elif mess_init_elt.name == "reject":
251 self._handle_reject(client, message_elt, mess_init_elt)
252 elif mess_init_elt.name == "ringing":
253 await self._handle_ringing(client, message_elt, mess_init_elt)
254 else:
255 log.warning(f"invalid element: {mess_init_elt.toXml}")
256
257 def _get_sid_and_session_d(
227 self, 258 self,
228 client: SatXMPPEntity, 259 client: SatXMPPEntity,
229 elt: domish.Element 260 elt: domish.Element
230 ) -> tuple[str, defer.Deferred]: 261 ) -> tuple[str, defer.Deferred|list[defer.Deferred]]:
231 """Retrieve session ID and response_d from response element""" 262 """Retrieve session ID and deferred or list of deferred from response element"""
232 try: 263 try:
233 session_id = elt["id"] 264 session_id = elt["id"]
234 except KeyError as e: 265 except KeyError as e:
235 assert elt.parent is not None 266 assert elt.parent is not None
236 log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}") 267 log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}")
237 raise e 268 raise e
238 try: 269 try:
239 response_d = client._xep_0353_pending_sessions[session_id] 270 session_d = client._xep_0353_pending_sessions[session_id]
240 except KeyError as e: 271 except KeyError as e:
241 log.warning( 272 log.warning(
242 _( 273 _(
243 "no pending session found with id {session_id}, did it timed out?" 274 "no pending session found with id {session_id}, did it timed out?"
244 ).format(session_id=session_id) 275 ).format(session_id=session_id)
245 ) 276 )
246 raise e 277 raise e
278 return session_id, session_d
279
280 def _get_sid_and_response_d(
281 self,
282 client: SatXMPPEntity,
283 elt: domish.Element
284 ) -> tuple[str, defer.Deferred]:
285 """Retrieve session ID and response_d from response element"""
286 session_id, response_d = self._get_sid_and_session_d(client, elt)
287 assert isinstance(response_d, defer.Deferred)
247 return session_id, response_d 288 return session_id, response_d
248 289
249 async def _handle_propose(self, client, message_elt, elt): 290 def _get_sid_and_preflight_d_list(
291 self,
292 client: SatXMPPEntity,
293 elt: domish.Element
294 ) -> tuple[str, list[defer.Deferred]]:
295 """Retrieve session ID and list of preflight_d from response element"""
296 session_id, preflight_d_list = self._get_sid_and_session_d(client, elt)
297 assert isinstance(preflight_d_list, list)
298 return session_id, preflight_d_list
299
300 async def _handle_propose(
301 self,
302 client: SatXMPPEntity,
303 message_elt: domish.Element,
304 elt: domish.Element
305 ) -> None:
250 peer_jid = jid.JID(message_elt["from"]) 306 peer_jid = jid.JID(message_elt["from"])
251 local_jid = jid.JID(message_elt["to"]) 307 local_jid = jid.JID(message_elt["to"])
252 session_id = elt["id"] 308 session_id = elt["id"]
253 try: 309 try:
254 desc_and_apps = [ 310 desc_and_apps = [
258 ] 314 ]
259 if not desc_and_apps: 315 if not desc_and_apps:
260 raise AttributeError 316 raise AttributeError
261 except AttributeError: 317 except AttributeError:
262 log.warning(f"Invalid propose element: {message_elt.toXml()}") 318 log.warning(f"Invalid propose element: {message_elt.toXml()}")
263 return False 319 return
264 except exceptions.NotFound: 320 except exceptions.NotFound:
265 log.warning( 321 log.warning(
266 f"There is not registered application to handle this " 322 f"There is not registered application to handle this "
267 f"proposal: {elt.toXml()}" 323 f"proposal: {elt.toXml()}"
268 ) 324 )
269 return False 325 return
270 326
271 if not desc_and_apps: 327 if not desc_and_apps:
272 log.warning("No application specified: {message_elt.toXml()}") 328 log.warning("No application specified: {message_elt.toXml()}")
273 return False 329 return
274 330
275 session = self._j.create_session( 331 session = self._j.create_session(
276 client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid 332 client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid
277 ) 333 )
278 334
282 # https://xmpp.org/extensions/xep-0353.html#ring , but we only do that if user 338 # https://xmpp.org/extensions/xep-0353.html#ring , but we only do that if user
283 # is in roster to avoid presence leak of all our devices. 339 # is in roster to avoid presence leak of all our devices.
284 mess_data = self.build_message_data(client, peer_jid, "ringing", session_id) 340 mess_data = self.build_message_data(client, peer_jid, "ringing", session_id)
285 await client.send_message_data(mess_data) 341 await client.send_message_data(mess_data)
286 342
287 for description_elt, application in desc_and_apps: 343 try:
344 for description_elt, application in desc_and_apps:
345 try:
346 preflight_d = defer.ensureDeferred(
347 application.handler.jingle_preflight(
348 client, session, description_elt
349 )
350 )
351 client._xep_0353_pending_sessions.setdefault(session_id, []).append(
352 preflight_d
353 )
354 await preflight_d
355 except TakenByOtherDeviceException as e:
356 log.info(f"The call has been takend by {e.device_jid}")
357 await application.handler.jingle_preflight_cancel(client, session, e)
358 self._j.delete_session(client, session_id)
359 return
360 except exceptions.CancelError as e:
361 log.info(f"{client.profile} refused the session: {e}")
362
363 if is_in_roster:
364 # peer is in our roster, we send reject to them, ou other devices
365 # will get carbon copies
366 reject_dest_jid = peer_jid
367 else:
368 # peer is not in our roster, we send the "reject" only to our own
369 # devices to make them stop ringing/doing notification, and we
370 # don't send anything to peer to avoid presence leak.
371 reject_dest_jid = client.jid.userhostJID()
372
373 mess_data = self.build_message_data(
374 client, reject_dest_jid, "reject", session_id
375 )
376 await client.send_message_data(mess_data)
377 self._j.delete_session(client, session_id)
378 return
379 except defer.CancelledError:
380 # raised when call is retracted before user can reply
381 self._j.delete_session(client, session_id)
382 return
383 finally:
288 try: 384 try:
289 await application.handler.jingle_preflight( 385 del client._xep_0353_pending_sessions[session_id]
290 client, session, description_elt 386 except KeyError:
291 ) 387 pass
292 except exceptions.CancelError as e:
293 log.info(f"{client.profile} refused the session: {e}")
294
295 if is_in_roster:
296 # peer is in our roster, we send reject to them, ou other devices will
297 # get carbon copies
298 reject_dest_jid = peer_jid
299 else:
300 # peer is not in our roster, we send the "reject" only to our own
301 # devices to make them stop ringing/doing notification, and we don't
302 # send anything to peer to avoid presence leak.
303 reject_dest_jid = client.jid.userhostJID()
304
305 mess_data = self.build_message_data(
306 client, reject_dest_jid, "reject", session_id
307 )
308 await client.send_message_data(mess_data)
309 self._j.delete_session(client, session_id)
310
311 return False
312 except defer.CancelledError:
313 # raised when call is retracted before user can reply
314 self._j.delete_session(client, session_id)
315 return False
316 388
317 if peer_jid.userhostJID() not in client.roster: 389 if peer_jid.userhostJID() not in client.roster:
318 await client.presence.available(peer_jid) 390 await client.presence.available(peer_jid)
319 391
320 mess_data = self.build_message_data(client, peer_jid, "proceed", session_id) 392 mess_data = self.build_message_data(client, peer_jid, "proceed", session_id)
321 await client.send_message_data(mess_data) 393 await client.send_message_data(mess_data)
322
323 return False
324 394
325 def _handle_retract(self, client, message_elt, retract_elt): 395 def _handle_retract(self, client, message_elt, retract_elt):
326 try: 396 try:
327 session = self._j.get_session(client, retract_elt["id"]) 397 session = self._j.get_session(client, retract_elt["id"])
328 except KeyError: 398 except KeyError:
343 else: 413 else:
344 for d in cancellable_deferred: 414 for d in cancellable_deferred:
345 d.cancel() 415 d.cancel()
346 return False 416 return False
347 417
348 def _handle_proceed(self, client, message_elt, proceed_elt): 418 def _handle_proceed(
349 try: 419 self,
350 __, response_d = self._get_sid_and_response_d(client, proceed_elt) 420 client: SatXMPPEntity,
351 except KeyError: 421 message_elt: domish.Element,
352 return True 422 proceed_elt: domish.Element
353 423 ) -> None:
354 response_d.callback(jid.JID(message_elt["from"])) 424 from_jid = jid.JID(message_elt["from"])
355 return False 425 # session_d is the deferred of the session, it can be preflight_d or response_d
426 if from_jid.userhostJID() == client.jid.userhostJID():
427 # an other device took the session
428 try:
429 sid, preflight_d_list = self._get_sid_and_preflight_d_list(
430 client, proceed_elt
431 )
432 except KeyError:
433 return
434 for preflight_d in preflight_d_list:
435 if not preflight_d.called:
436 preflight_d.errback(TakenByOtherDeviceException(from_jid))
437
438 try:
439 session = self._j.get_session(client, sid)
440 except exceptions.NotFound:
441 log.warning("No session found with sid {sid!r}.")
442 else:
443 # jingle_preflight_cancel?
444 pass
445
446 # FIXME: Is preflight cancel handler correctly? Check if preflight_d is always
447 # cleaned correctly (use a timeout?)
448
449 else:
450 try:
451 __, response_d = self._get_sid_and_response_d(client, proceed_elt)
452 except KeyError:
453 return
454 # we have a response deferred
455 response_d.callback(jid.JID(message_elt["from"]))
356 456
357 def _handle_accept(self, client, message_elt, accept_elt): 457 def _handle_accept(self, client, message_elt, accept_elt):
358 pass 458 pass
359 459
360 def _handle_reject(self, client, message_elt, reject_elt): 460 def _handle_reject(self, client, message_elt, reject_elt):