diff libervia/backend/plugins/plugin_xep_0353.py @ 4231:e11b13418ba6

plugin XEP-0353, XEP-0234, jingle: WebRTC data channel signaling implementation: Implement XEP-0343: Signaling WebRTC Data Channels in Jingle. The current version of the XEP (0.3.1) has no implementation and contains some flaws. After discussing this on xsf@, Daniel (from Conversations) mentioned that they had a sprint with Larma (from Dino) to work on another version and provided me with this link: https://gist.github.com/iNPUTmice/6c56f3e948cca517c5fb129016d99e74 . I have used it for my implementation. This implementation reuses work done on Jingle A/V call (notably XEP-0176 and XEP-0167 plugins), with adaptations. When used, XEP-0234 will not handle the file itself as it normally does. This is because WebRTC has several implementations (browser for web interface, GStreamer for others), and file/data must be handled directly by the frontend. This is particularly important for web frontends, as the file is not sent from the backend but from the end-user's browser device. Among the changes, there are: - XEP-0343 implementation. - `file_send` bridge method now use serialised dict as output. - New `BaseTransportHandler.is_usable` method which get content data and returns a boolean (default to `True`) to tell if this transport can actually be used in this context (when we are initiator). Used in webRTC case to see if call data are available. - Support of `application` media type, and everything necessary to handle data channels. - Better confirmation message, with file name, size and description when available. - When file is accepted in preflight, it is specified in following `action_new` signal for actual file transfer. This way, frontend can avoid the display or 2 confirmation messages. - XEP-0166: when not specified, default `content` name is now its index number instead of a UUID. This follows the behaviour of browsers. - XEP-0353: better handling of events such as call taken by another device. - various other updates. rel 441
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:57:23 +0200
parents 6784d07b99c8
children 79c8a70e1813
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0353.py	Sat Apr 06 12:21:04 2024 +0200
+++ b/libervia/backend/plugins/plugin_xep_0353.py	Sat Apr 06 12:57:23 2024 +0200
@@ -29,6 +29,12 @@
 from libervia.backend.core.core_types import SatXMPPEntity
 from libervia.backend.core.i18n import D_, _
 from libervia.backend.core.log import getLogger
+from libervia.backend.tools.xml_tools import element_copy
+
+try:
+    from .plugin_xep_0167 import NS_JINGLE_RTP
+except ImportError:
+    NS_JINGLE_RTP = None
 
 log = getLogger(__name__)
 
@@ -55,6 +61,14 @@
         self.reason = reason
 
 
+class TakenByOtherDeviceException(exceptions.CancelError):
+    reason: str = "taken_by_other_device"
+
+    def __init__(self, device_jid: jid.JID):
+        super().__init__(device_jid.full())
+        self.device_jid = device_jid
+
+
 class RetractException(exceptions.CancelError):
     pass
 
@@ -70,7 +84,7 @@
             "XEP-0166_initiate_elt_built",
             self,
             self._on_initiate_trigger,
-            # this plugin set the resource, we want it to happen first to other trigger
+            # this plugin set the resource, we want it to happen first so other triggers
             # can get the full peer JID
             priority=host.trigger.MAX_PRIORITY,
         )
@@ -140,13 +154,20 @@
         for content_data in session["contents"].values():
             # we get the full element build by the application plugin
             jingle_description_elt = content_data["application_data"]["desc_elt"]
-            # and copy it to only keep the root <description> element, no children
-            description_elt = domish.Element(
-                (jingle_description_elt.uri, jingle_description_elt.name),
-                defaultUri=jingle_description_elt.defaultUri,
-                attribs=jingle_description_elt.attributes,
-                localPrefixes=jingle_description_elt.localPrefixes,
-            )
+
+            # we need to copy the element
+            if jingle_description_elt.uri == NS_JINGLE_RTP:
+                # for RTP, we only keep the root <description> element, no children
+                description_elt = domish.Element(
+                    (jingle_description_elt.uri, jingle_description_elt.name),
+                    defaultUri=jingle_description_elt.defaultUri,
+                    attribs=jingle_description_elt.attributes,
+                    localPrefixes=jingle_description_elt.localPrefixes,
+                )
+            else:
+                # Otherwise we keep the children to have application useful data
+                description_elt = element_copy(jingle_description_elt, with_parent=False)
+
             message_elt.propose.addChild(description_elt)
         response_d = defer.Deferred()
         # we wait for 2 min before cancelling the session init
@@ -206,47 +227,82 @@
     async def _on_message_received(self, client, message_elt, post_treat):
         for elt in message_elt.elements():
             if elt.uri == NS_JINGLE_MESSAGE:
-                if elt.name == "propose":
-                    return await self._handle_propose(client, message_elt, elt)
-                elif elt.name == "retract":
-                    return self._handle_retract(client, message_elt, elt)
-                elif elt.name == "proceed":
-                    return self._handle_proceed(client, message_elt, elt)
-                elif elt.name == "accept":
-                    return self._handle_accept(client, message_elt, elt)
-                elif elt.name == "reject":
-                    return self._handle_reject(client, message_elt, elt)
-                elif elt.name == "ringing":
-                    return await self._handle_ringing(client, message_elt, elt)
-                else:
-                    log.warning(f"invalid element: {elt.toXml}")
-                    return True
+                # We use ensureDeferred to process the message initiation workflow in
+                # parallel and to avoid blocking the message queue.
+                defer.ensureDeferred(self._handle_mess_init(client, message_elt, elt))
+                return False
         return True
 
+    async def _handle_mess_init(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        mess_init_elt: domish.Element
+    ) -> None:
+        if mess_init_elt.name == "propose":
+            await self._handle_propose(client, message_elt, mess_init_elt)
+        elif mess_init_elt.name == "retract":
+            self._handle_retract(client, message_elt, mess_init_elt)
+        elif mess_init_elt.name == "proceed":
+            self._handle_proceed(client, message_elt, mess_init_elt)
+        elif mess_init_elt.name == "accept":
+            self._handle_accept(client, message_elt, mess_init_elt)
+        elif mess_init_elt.name == "reject":
+            self._handle_reject(client, message_elt, mess_init_elt)
+        elif mess_init_elt.name == "ringing":
+            await self._handle_ringing(client, message_elt, mess_init_elt)
+        else:
+            log.warning(f"invalid element: {mess_init_elt.toXml}")
+
+    def _get_sid_and_session_d(
+        self,
+        client: SatXMPPEntity,
+        elt: domish.Element
+    ) -> tuple[str, defer.Deferred|list[defer.Deferred]]:
+        """Retrieve session ID and deferred or list of deferred from response element"""
+        try:
+            session_id = elt["id"]
+        except KeyError as e:
+            assert elt.parent is not None
+            log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}")
+            raise e
+        try:
+            session_d = client._xep_0353_pending_sessions[session_id]
+        except KeyError as e:
+            log.warning(
+                _(
+                    "no pending session found with id {session_id}, did it timed out?"
+                ).format(session_id=session_id)
+            )
+            raise e
+        return session_id, session_d
+
     def _get_sid_and_response_d(
         self,
         client: SatXMPPEntity,
         elt: domish.Element
     ) -> tuple[str, defer.Deferred]:
         """Retrieve session ID and response_d from response element"""
-        try:
-            session_id = elt["id"]
-        except KeyError as e:
-            assert elt.parent is not None
-            log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}")
-            raise e
-        try:
-            response_d = client._xep_0353_pending_sessions[session_id]
-        except KeyError as e:
-            log.warning(
-                _(
-                    "no pending session found with id {session_id}, did it timed out?"
-                ).format(session_id=session_id)
-            )
-            raise e
+        session_id, response_d = self._get_sid_and_session_d(client, elt)
+        assert isinstance(response_d, defer.Deferred)
         return session_id, response_d
 
-    async def _handle_propose(self, client, message_elt, elt):
+    def _get_sid_and_preflight_d_list(
+        self,
+        client: SatXMPPEntity,
+        elt: domish.Element
+    ) -> tuple[str, list[defer.Deferred]]:
+        """Retrieve session ID and list of preflight_d from response element"""
+        session_id, preflight_d_list = self._get_sid_and_session_d(client, elt)
+        assert isinstance(preflight_d_list, list)
+        return session_id, preflight_d_list
+
+    async def _handle_propose(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        elt: domish.Element
+    ) -> None:
         peer_jid = jid.JID(message_elt["from"])
         local_jid = jid.JID(message_elt["to"])
         session_id = elt["id"]
@@ -260,17 +316,17 @@
                 raise AttributeError
         except AttributeError:
             log.warning(f"Invalid propose element: {message_elt.toXml()}")
-            return False
+            return
         except exceptions.NotFound:
             log.warning(
                 f"There is not registered application to handle this "
                 f"proposal: {elt.toXml()}"
             )
-            return False
+            return
 
         if not desc_and_apps:
             log.warning("No application specified: {message_elt.toXml()}")
-            return False
+            return
 
         session = self._j.create_session(
             client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid
@@ -284,35 +340,51 @@
             mess_data = self.build_message_data(client, peer_jid, "ringing", session_id)
             await client.send_message_data(mess_data)
 
-        for description_elt, application in desc_and_apps:
-            try:
-                await application.handler.jingle_preflight(
-                    client, session, description_elt
-                )
-            except exceptions.CancelError as e:
-                log.info(f"{client.profile} refused the session: {e}")
+        try:
+            for description_elt, application in desc_and_apps:
+                try:
+                    preflight_d = defer.ensureDeferred(
+                        application.handler.jingle_preflight(
+                            client, session, description_elt
+                        )
+                    )
+                    client._xep_0353_pending_sessions.setdefault(session_id, []).append(
+                        preflight_d
+                    )
+                    await preflight_d
+                except TakenByOtherDeviceException as e:
+                    log.info(f"The call has been takend by {e.device_jid}")
+                    await application.handler.jingle_preflight_cancel(client, session, e)
+                    self._j.delete_session(client, session_id)
+                    return
+                except exceptions.CancelError as e:
+                    log.info(f"{client.profile} refused the session: {e}")
 
-                if is_in_roster:
-                    # peer is in our roster, we send reject to them, ou other devices will
-                    # get carbon copies
-                    reject_dest_jid = peer_jid
-                else:
-                    # peer is not in our roster, we send the "reject" only to our own
-                    # devices to make them stop ringing/doing notification, and we don't
-                    # send anything to peer to avoid presence leak.
-                    reject_dest_jid = client.jid.userhostJID()
+                    if is_in_roster:
+                        # peer is in our roster, we send reject to them, ou other devices
+                        # will get carbon copies
+                        reject_dest_jid = peer_jid
+                    else:
+                        # peer is not in our roster, we send the "reject" only to our own
+                        # devices to make them stop ringing/doing notification, and we
+                        # don't send anything to peer to avoid presence leak.
+                        reject_dest_jid = client.jid.userhostJID()
 
-                mess_data = self.build_message_data(
-                    client, reject_dest_jid, "reject", session_id
-                )
-                await client.send_message_data(mess_data)
-                self._j.delete_session(client, session_id)
-
-                return False
-            except defer.CancelledError:
-                # raised when call is retracted before user can reply
-                self._j.delete_session(client, session_id)
-                return False
+                    mess_data = self.build_message_data(
+                        client, reject_dest_jid, "reject", session_id
+                    )
+                    await client.send_message_data(mess_data)
+                    self._j.delete_session(client, session_id)
+                    return
+                except defer.CancelledError:
+                    # raised when call is retracted before user can reply
+                    self._j.delete_session(client, session_id)
+                    return
+        finally:
+            try:
+                del client._xep_0353_pending_sessions[session_id]
+            except KeyError:
+                pass
 
         if peer_jid.userhostJID() not in client.roster:
             await client.presence.available(peer_jid)
@@ -320,8 +392,6 @@
         mess_data = self.build_message_data(client, peer_jid, "proceed", session_id)
         await client.send_message_data(mess_data)
 
-        return False
-
     def _handle_retract(self, client, message_elt, retract_elt):
         try:
             session = self._j.get_session(client, retract_elt["id"])
@@ -345,14 +415,44 @@
                 d.cancel()
         return False
 
-    def _handle_proceed(self, client, message_elt, proceed_elt):
-        try:
-            __, response_d = self._get_sid_and_response_d(client, proceed_elt)
-        except KeyError:
-            return True
+    def _handle_proceed(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        proceed_elt: domish.Element
+    ) -> None:
+        from_jid = jid.JID(message_elt["from"])
+        # session_d is the deferred of the session, it can be preflight_d or response_d
+        if from_jid.userhostJID() == client.jid.userhostJID():
+            # an other device took the session
+            try:
+                sid, preflight_d_list = self._get_sid_and_preflight_d_list(
+                    client, proceed_elt
+                )
+            except KeyError:
+                return
+            for preflight_d in preflight_d_list:
+                if not preflight_d.called:
+                    preflight_d.errback(TakenByOtherDeviceException(from_jid))
 
-        response_d.callback(jid.JID(message_elt["from"]))
-        return False
+                try:
+                    session = self._j.get_session(client, sid)
+                except exceptions.NotFound:
+                    log.warning("No session found with sid {sid!r}.")
+                else:
+                    # jingle_preflight_cancel?
+                    pass
+
+            # FIXME: Is preflight cancel handler correctly? Check if preflight_d is always
+            #   cleaned correctly (use a timeout?)
+
+        else:
+            try:
+                __, response_d = self._get_sid_and_response_d(client, proceed_elt)
+            except KeyError:
+                return
+            # we have a response deferred
+            response_d.callback(jid.JID(message_elt["from"]))
 
     def _handle_accept(self, client, message_elt, accept_elt):
         pass