diff libervia/backend/plugins/plugin_xep_0166/__init__.py @ 4240:79c8a70e1813

backend, frontend: prepare remote control: This is a series of changes necessary to prepare the implementation of remote control feature: - XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several applications are working in a same session, to know which one must be handled first. Will be used to make Remote Control have precedence over Call content. - XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the benefit to have methods called in parallels is very low, and it cause a lot of trouble as we can't predict order. Methods are now called sequentially so workflow can be predicted. - XEP-0167: fix `senders` XMPP attribute <=> SDP mapping - XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so the same key can be used with other jingle applications. - XEP-0167, XEP-0343: move some method to XEP-0167 - XEP-0353: use new `priority` feature to call preflight methods of applications according to it. - frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism based on Pydantic models. It is now possible to have has many Data Channel as necessary, to have them in addition to A/V streams, to specify manually GStreamer sources and sinks, etc. - frontend (webrtc): rework of the pipeline to reduce latency. - frontend: new `portal_desktop` method. Screenshare portal handling has been moved there, and RemoteDesktop portal has been added. - frontend (webrtc): fix `extract_ufrag_pwd` method. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:41 +0200
parents e11b13418ba6
children 0d7bb4df2343
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0166/__init__.py	Sat May 11 13:25:45 2024 +0200
+++ b/libervia/backend/plugins/plugin_xep_0166/__init__.py	Sat May 11 13:52:41 2024 +0200
@@ -327,29 +327,24 @@
 
     ## errors which doesn't imply a stanza sending ##
 
-    def _iq_error(self, failure_, sid, client):
-        """Called when we got an <iq/> error
-
-        @param failure_(failure.Failure): the exceptions raised
-        @param sid(unicode): jingle session id
-        """
-        log.warning(
-            "Error while sending jingle <iq/> stanza: {failure_}".format(
-                failure_=failure_.value
-            )
-        )
-        self.delete_session(client, sid)
-
-    def _jingle_error_cb(self, failure_, session, request, client):
+    def _jingle_error_cb(
+            self,
+            failure_: failure.Failure|BaseException,
+            session: dict,
+            request: domish.Element,
+            client: SatXMPPEntity
+    ) -> defer.Deferred:
         """Called when something is going wrong while parsing jingle request
 
         The error condition depend of the exceptions raised:
             exceptions.DataError raise a bad-request condition
-        @param fail(failure.Failure): the exceptions raised
-        @param session(dict): data of the session
-        @param request(domsih.Element): jingle request
-        @param client: %(doc_client)s
+        @param fail: the exceptions raised
+        @param session: data of the session
+        @param request: jingle request
+        @param client: SatXMPPEntity instance
         """
+        if not isinstance(failure_, failure.Failure):
+            failure_ = failure.Failure(failure_)
         del session["jingle_elt"]
         log.warning(f"Error while processing jingle request [{client.profile}]")
         if isinstance(failure_.value, defer.FirstError):
@@ -369,7 +364,8 @@
     def register_application(
         self,
         namespace: str,
-        handler: BaseApplicationHandler
+        handler: BaseApplicationHandler,
+        priority: int = 0
     ) -> None:
         """Register an application plugin
 
@@ -390,13 +386,21 @@
                     called on several action to negociate the application or transport
                 - jingle_terminate: called on session terminate, with reason_elt
                     May be used to clean session
+        @param priority: Priority of the application. It is used when several contents
+            from different applications are used to determine in which order methods must
+            be called. An example use case is for remote control: when using remote
+            control, contents with call application may be used at the same time, but the
+            overall session is a remote control one (and so, a remote control confirmation
+            must be requested to the user, not a call one).
+            If two applications have the same priority, methods are called in the same
+            order as the content appears.
         """
         if namespace in self._applications:
             raise exceptions.ConflictError(
                 f"Trying to register already registered namespace {namespace}"
             )
         self._applications[namespace] = ApplicationData(
-            namespace=namespace, handler=handler
+            namespace=namespace, handler=handler, priority=priority
         )
         log.debug("new jingle application registered")
 
@@ -687,10 +691,10 @@
 
         try:
             await iq_elt.send()
-        except Exception as e:
-            failure_ = failure.Failure(e)
-            self._iq_error(failure_, sid, client)
-            raise failure_
+        except Exception:
+            log.exception("Error while sending jingle <iq/> stanza")
+            self.delete_session(client, sid)
+            raise
         return sid
 
     def delayed_content_terminate(self, *args, **kwargs):
@@ -823,13 +827,13 @@
         elif action == XEP_0166.A_SESSION_ACCEPT:
             await self.on_session_accept(client, request, jingle_elt, session)
         elif action == XEP_0166.A_SESSION_INFO:
-            self.on_session_info(client, request, jingle_elt, session)
+            await self.on_session_info(client, request, jingle_elt, session)
         elif action == XEP_0166.A_TRANSPORT_INFO:
             self.on_transport_info(client, request, jingle_elt, session)
         elif action == XEP_0166.A_TRANSPORT_REPLACE:
             await self.on_transport_replace(client, request, jingle_elt, session)
         elif action == XEP_0166.A_TRANSPORT_ACCEPT:
-            self.on_transport_accept(client, request, jingle_elt, session)
+            await self.on_transport_accept(client, request, jingle_elt, session)
         elif action == XEP_0166.A_TRANSPORT_REJECT:
             self.on_transport_reject(client, request, jingle_elt, session)
         else:
@@ -983,7 +987,7 @@
         """
         return elt
 
-    def _call_plugins(
+    async def _call_plugins(
         self,
         client: SatXMPPEntity,
         action: str,
@@ -995,7 +999,7 @@
         delete: bool = True,
         elements: bool = True,
         force_element: Optional[domish.Element] = None
-    ) -> List[defer.Deferred]:
+    ) -> list[Any]:
         """Call application and transport plugin methods for all contents
 
         @param action: jingle action name
@@ -1006,7 +1010,8 @@
             None to ignore
         @param app_default_cb: default callback to use if plugin has not app_method_name
             None to raise an exception instead
-        @param transp_default_cb: default callback to use if plugin has not transp_method_name
+        @param transp_default_cb: default callback to use if plugin has not
+            transp_method_name
             None to raise an exception instead
         @param delete: if True, remove desc_elt and transport_elt from session
             ignored if elements is False
@@ -1015,11 +1020,11 @@
             after a request (i.e. on <iq> result or error)
         @param force_element: if elements is False, it is used as element parameter
             else it is ignored
-        @return : list of launched Deferred
+        @return : list of launched methods results
         @raise exceptions.NotFound: method is not implemented
         """
         contents_dict = session["contents"]
-        defers_list = []
+        results = []
         for content_name, content_data in contents_dict.items():
             for method_name, handler_key, default_cb, elt_name in (
                 (app_method_name, "application", app_default_cb, "desc_elt"),
@@ -1042,12 +1047,12 @@
                     elt = content_data.pop(elt_name) if delete else content_data[elt_name]
                 else:
                     elt = force_element
-                d = utils.as_deferred(
+                result = await utils.as_deferred(
                     method, client, action, session, content_name, elt
                 )
-                defers_list.append(d)
+                results.append(result)
 
-        return defers_list
+        return results
 
     async def on_session_initiate(
         self,
@@ -1097,42 +1102,50 @@
         ):
             return
 
-        await defer.DeferredList(self._call_plugins(
+        await self._call_plugins(
             client,
             XEP_0166.A_PREPARE_CONFIRMATION,
             session,
             delete=False
-        ))
+        )
 
         # we now request each application plugin confirmation
         # and if all are accepted, we can accept the session
-        confirm_defers = self._call_plugins(
-            client,
-            XEP_0166.A_SESSION_INITIATE,
-            session,
-            "jingle_request_confirmation",
-            None,
-            self.jingle_request_confirmation_default,
-            delete=False,
-        )
+        try:
+            confirmations = await self._call_plugins(
+                client,
+                XEP_0166.A_SESSION_INITIATE,
+                session,
+                "jingle_request_confirmation",
+                None,
+                self.jingle_request_confirmation_default,
+                delete=False,
+            )
+        except Exception as e:
+            await self._jingle_error_cb(e, session, jingle_elt, client)
+        else:
+            await self._confirmation_cb(confirmations, session, jingle_elt, client)
 
-        confirm_dlist = defer.gatherResults(confirm_defers)
-        confirm_dlist.addCallback(self._confirmation_cb, session, jingle_elt, client)
-        confirm_dlist.addErrback(self._jingle_error_cb, session, request, client)
-
-    def _confirmation_cb(self, confirm_results, session, jingle_elt, client):
+    async def _confirmation_cb(
+        self,
+        confirmations: list[bool],
+        session: dict,
+        jingle_elt: domish.Element,
+        client: SatXMPPEntity
+    ) -> None:
         """Method called when confirmation from user has been received
 
         This method is only called for the responder
-        @param confirm_results(list[bool]): all True if session is accepted
-        @param session(dict): session data
-        @param jingle_elt(domish.Element): jingle data of this session
-        @param client: %(doc_client)s
+        @param confirm_results: all True if session is accepted
+        @param session: session data
+        @param jingle_elt: jingle data of this session
+        @param client: SatXMPPEntity
         """
         del session["jingle_elt"]
-        confirmed = all(confirm_results)
+        confirmed = all(confirmations)
         if not confirmed:
-            return self.terminate(client, XEP_0166.REASON_DECLINE, session)
+            await self.terminate(client, XEP_0166.REASON_DECLINE, session)
+            return
 
         iq_elt, jingle_elt = self._build_jingle_elt(
             client, session, XEP_0166.A_SESSION_ACCEPT
@@ -1142,68 +1155,56 @@
 
         # contents
 
-        def addElement(domish_elt, content_elt):
-            content_elt.addChild(domish_elt)
-
-        defers_list = []
+        try:
+            for content_name, content_data in session["contents"].items():
+                content_elt = jingle_elt.addElement("content")
+                content_elt["creator"] = XEP_0166.ROLE_INITIATOR
+                content_elt["name"] = content_name
 
-        for content_name, content_data in session["contents"].items():
-            content_elt = jingle_elt.addElement("content")
-            content_elt["creator"] = XEP_0166.ROLE_INITIATOR
-            content_elt["name"] = content_name
-
-            application = content_data["application"]
-            app_session_accept_cb = application.handler.jingle_handler
+                application = content_data["application"]
+                app_session_accept_cb = application.handler.jingle_handler
 
-            app_d = utils.as_deferred(
-                app_session_accept_cb,
-                client,
-                XEP_0166.A_SESSION_INITIATE,
-                session,
-                content_name,
-                content_data.pop("desc_elt"),
-            )
-            app_d.addCallback(addElement, content_elt)
-            defers_list.append(app_d)
-
-            transport = content_data["transport"]
-            transport_session_accept_cb = transport.handler.jingle_handler
+                updated_desc_elt = await utils.as_deferred(
+                    app_session_accept_cb,
+                    client,
+                    XEP_0166.A_SESSION_INITIATE,
+                    session,
+                    content_name,
+                    content_data.pop("desc_elt"),
+                )
+                content_elt.addChild(updated_desc_elt)
 
-            transport_d = utils.as_deferred(
-                transport_session_accept_cb,
-                client,
-                XEP_0166.A_SESSION_INITIATE,
-                session,
-                content_name,
-                content_data.pop("transport_elt"),
-            )
-            transport_d.addCallback(addElement, content_elt)
-            defers_list.append(transport_d)
+                transport = content_data["transport"]
+                transport_session_accept_cb = transport.handler.jingle_handler
 
-        d_list = defer.DeferredList(defers_list)
-        d_list.addCallback(
-            lambda __: self._call_plugins(
+                updated_transport_elt = await utils.as_deferred(
+                    transport_session_accept_cb,
+                    client,
+                    XEP_0166.A_SESSION_INITIATE,
+                    session,
+                    content_name,
+                    content_data.pop("transport_elt"),
+                )
+                content_elt.addChild(updated_transport_elt)
+
+            await self._call_plugins(
                 client,
                 XEP_0166.A_PREPARE_RESPONDER,
                 session,
                 app_method_name=None,
                 elements=False,
             )
-        )
-        d_list.addCallback(lambda __: session.pop("jingle_elt"))
-        d_list.addCallback(lambda __: iq_elt.send())
+            session.pop("jingle_elt")
+            await iq_elt.send()
 
-        def change_state(__, session):
             session["state"] = XEP_0166.STATE_ACTIVE
 
-        d_list.addCallback(change_state, session)
-        d_list.addCallback(
-            lambda __: self._call_plugins(
+            await self._call_plugins(
                 client, XEP_0166.A_ACCEPTED_ACK, session, elements=False
             )
-        )
-        d_list.addErrback(self._iq_error, session["id"], client)
-        return d_list
+        except Exception:
+            log.exception("Error while sending jingle <iq/> stanza")
+            self.delete_session(client, session["id"])
 
     def get_reason_elt(self, parent_elt: domish.Element) -> domish.Element:
         """Find a <reason> element in parent_elt
@@ -1247,7 +1248,7 @@
         log.debug(f"Jingle Session {session['id']} terminated")
         reason_elt = self.get_reason_elt(jingle_elt)
 
-        terminate_defers = self._call_plugins(
+        await self._call_plugins(
             client,
             XEP_0166.A_SESSION_TERMINATE,
             session,
@@ -1258,10 +1259,9 @@
             elements=False,
             force_element=reason_elt,
         )
-        terminate_dlist = defer.DeferredList(terminate_defers)
 
-        terminate_dlist.addCallback(lambda __: self.delete_session(client, session["id"]))
-        client.send(xmlstream.toResponse(request, "result"))
+        self.delete_session(client, session["id"])
+        await client.a_send(xmlstream.toResponse(request, "result"))
 
     async def on_session_accept(self, client, request, jingle_elt, session):
         """Method called once session is accepted
@@ -1285,37 +1285,22 @@
         session["state"] = XEP_0166.STATE_ACTIVE
         session["jingle_elt"] = jingle_elt
 
-        await defer.DeferredList(self._call_plugins(
+        await self._call_plugins(
             client,
             XEP_0166.A_PREPARE_INITIATOR,
             session,
             delete=False
-        ))
+        )
 
-        negociate_defers = []
-        negociate_defers = self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session)
-
-        negociate_dlist = defer.gatherResults(negociate_defers)
+        await self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session)
 
         # after negociations we start the transfer
-        negociate_dlist.addCallback(
-            lambda __: self._call_plugins(
-                client, XEP_0166.A_START, session, app_method_name=None, elements=False
-            )
+        await self._call_plugins(
+            client, XEP_0166.A_START, session, app_method_name=None, elements=False
         )
-        negociate_dlist.addCallback(lambda __: session.pop("jingle_elt"))
+        session.pop("jingle_elt")
 
-    def _on_session_cb(self, result, client, request, jingle_elt, session):
-        client.send(xmlstream.toResponse(request, "result"))
-
-    def _on_session_eb(self, failure_, client, request, jingle_elt, session):
-        log.error("Error while handling on_session_info: {}".format(failure_.value))
-        # XXX: only error managed so far, maybe some applications/transports need more
-        self.sendError(
-            client, "feature-not-implemented", None, request, "unsupported-info"
-        )
-
-    def on_session_info(self, client, request, jingle_elt, session):
+    async def on_session_info(self, client, request, jingle_elt, session):
         """Method called when a session-info action is received from other peer
 
         This method is only called for initiator
@@ -1330,9 +1315,10 @@
             return
 
         try:
-            # XXX: session-info is most likely only used for application, so we don't call transport plugins
-            #      if a future transport use it, this behaviour must be adapted
-            defers = self._call_plugins(
+            # XXX: session-info is most likely only used for application, so we don't call
+            #   transport plugins if a future transport use it, this behaviour must be
+            #   adapted
+            await self._call_plugins(
                 client,
                 XEP_0166.A_SESSION_INFO,
                 session,
@@ -1342,12 +1328,15 @@
                 force_element=jingle_elt,
             )
         except exceptions.NotFound as e:
-            self._on_session_eb(failure.Failure(e), client, request, jingle_elt, session)
-            return
-
-        dlist = defer.DeferredList(defers, fireOnOneErrback=True)
-        dlist.addCallback(self._on_session_cb, client, request, jingle_elt, session)
-        dlist.addErrback(self._on_session_cb, client, request, jingle_elt, session)
+            log.exception("Error while handling on_session_info")
+            # XXX: only error managed so far, maybe some applications/transports need more
+            self.sendError(
+                client, "feature-not-implemented", None, request, "unsupported-info"
+            )
+        except Exception:
+            log.exception("Error while managing session info")
+        else:
+            client.send(xmlstream.toResponse(request, "result"))
 
     async def on_transport_replace(self, client, request, jingle_elt, session):
         """A transport change is requested
@@ -1431,13 +1420,19 @@
 
         iq_elt.send()
 
-    def on_transport_accept(self, client, request, jingle_elt, session):
+    async def on_transport_accept(
+        self,
+        client: SatXMPPEntity,
+        request: domish.Element,
+        jingle_elt: domish.Element,
+        session: dict
+    ) -> None:
         """Method called once transport replacement is accepted
 
-        @param client: %(doc_client)s
-        @param request(domish.Element): full <iq> request
-        @param jingle_elt(domish.Element): the <jingle> element
-        @param session(dict): session data
+        @param client: SatXMPPEntity instance
+        @param request: full <iq> request
+        @param jingle_elt: the <jingle> element
+        @param session: session data
         """
         log.debug("new transport has been accepted")
 
@@ -1449,20 +1444,15 @@
             return
 
         # at this point we can send the <iq/> result to confirm reception of the request
-        client.send(xmlstream.toResponse(request, "result"))
+        await client.a_send(xmlstream.toResponse(request, "result"))
 
-        negociate_defers = []
-        negociate_defers = self._call_plugins(
+        await self._call_plugins(
             client, XEP_0166.A_TRANSPORT_ACCEPT, session, app_method_name=None
         )
 
-        negociate_dlist = defer.DeferredList(negociate_defers)
-
         # after negociations we start the transfer
-        negociate_dlist.addCallback(
-            lambda __: self._call_plugins(
-                client, XEP_0166.A_START, session, app_method_name=None, elements=False
-            )
+        await self._call_plugins(
+            client, XEP_0166.A_START, session, app_method_name=None, elements=False
         )
 
     def on_transport_reject(self, client, request, jingle_elt, session):