changeset 4111:a8ac5e1e5848

plugin XEP-0166: `jingle_terminate`, session handling and reason parsing: - a new `jingle_terminate` bridge method let frontend terminate a session - methods to manage a session from another plugin - methods to parse `<reason>` element from another plugin - allow session to be created before a first jingle request is received, to prepare for `preflight` methods rel 423
author Goffi <goffi@goffi.org>
date Tue, 08 Aug 2023 23:59:24 +0200
parents b274f0d5c138
children bc60875cb3b8
files libervia/backend/plugins/plugin_xep_0166/__init__.py
diffstat 1 files changed, 175 insertions(+), 53 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0166/__init__.py	Tue Aug 08 23:49:36 2023 +0200
+++ b/libervia/backend/plugins/plugin_xep_0166/__init__.py	Tue Aug 08 23:59:24 2023 +0200
@@ -130,6 +130,14 @@
             XEP_0166.TRANSPORT_DATAGRAM: [],
             XEP_0166.TRANSPORT_STREAMING: [],
         }
+        host.bridge.add_method(
+            "jingle_terminate",
+            ".plugin",
+            in_sign="ssss",
+            out_sign="",
+            method=self._terminate,
+            async_=True,
+        )
 
     def profile_connected(self, client):
         client.jingle_sessions = {}  # key = sid, value = session_data
@@ -152,8 +160,56 @@
                 f"No session with SID {session_id} found"
             )
 
+    def create_session(
+        self,
+        client: SatXMPPEntity,
+        sid: str,
+        role: str,
+        peer_jid: jid.JID,
+        local_jid: jid.JID|None = None,
+        **kwargs
+    ) -> dict:
+        """Create a new jingle session.
 
-    def _del_session(self, client, sid):
+        @param client: The client entity.
+        @param sid: Session ID.
+        @param role: Session role (initiator or responder).
+        @param peer_jid: JID of the peer.
+        @param local_jid: JID of the local entity.
+         If None, defaults to client.jid.
+        @param extra_data: Additional data to be added to the session. Defaults to None.
+
+        @return: The created session.
+
+        @raise ValueError: If the provided role is neither initiator nor responder.
+        """
+        # TODO: session cleaning after timeout ?
+        if role not in [XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER]:
+            raise ValueError(f"Invalid role {role}. Expected initiator or responder.")
+
+
+        session_data = {
+            "id": sid,
+            "state": STATE_PENDING,
+            "initiator": client.jid if role == XEP_0166.ROLE_INITIATOR else peer_jid,
+            "role": role,
+            "local_jid": local_jid or client.jid,
+            "peer_jid": peer_jid,
+            "started": time.time(),
+            "contents": {}
+        }
+
+        # If extra kw args are provided, merge them into the session_data
+        if kwargs:
+            session_data.update(kwargs)
+
+        # Add the session to the client's jingle sessions
+        client.jingle_sessions[sid] = session_data
+
+        return session_data
+
+
+    def delete_session(self, client, sid):
         try:
             del client.jingle_sessions[sid]
         except KeyError:
@@ -191,7 +247,7 @@
         if jingle_condition is not None:
             iq_elt.error.addElement((NS_JINGLE_ERROR, jingle_condition))
         if error.STANZA_CONDITIONS[error_condition]["type"] == "cancel" and sid:
-            self._del_session(client, sid)
+            self.delete_session(client, sid)
             log.warning(
                 "Error while managing jingle session, cancelling: {condition}".format(
                     condition=error_condition
@@ -202,26 +258,64 @@
     def _terminate_eb(self, failure_):
         log.warning(_("Error while terminating session: {msg}").format(msg=failure_))
 
-    def terminate(self, client, reason, session, text=None):
+    def _terminate(
+        self,
+        session_id: str,
+        reason: str,
+        reason_txt: str,
+        profile: str
+    ) -> defer.Deferred:
+        client = self.host.get_client(profile)
+        session = self.get_session(client, session_id)
+        if reason not in ("", "cancel", "decline", "busy"):
+            raise ValueError(
+                'only "cancel", "decline" and "busy" and empty value are allowed'
+            )
+        return self.terminate(
+            client,
+            reason or None,
+            session,
+            text=reason_txt or None
+        )
+
+    def terminate(
+            self,
+            client: SatXMPPEntity,
+            reason: str|list[domish.Element]|None,
+            session: dict,
+            text: str|None = None
+    ) -> defer.Deferred:
         """Terminate the session
 
         send the session-terminate action, and delete the session data
-        @param reason(unicode, list[domish.Element]): if unicode, will be transformed to an element
+        @param reason: if unicode, will be transformed to an element
             if a list of element, add them as children of the <reason/> element
-        @param session(dict): data of the session
+        @param session: data of the session
         """
         iq_elt, jingle_elt = self._build_jingle_elt(
             client, session, XEP_0166.A_SESSION_TERMINATE
         )
-        reason_elt = jingle_elt.addElement("reason")
-        if isinstance(reason, str):
-            reason_elt.addElement(reason)
+        if reason is not None:
+            reason_elt = jingle_elt.addElement("reason")
+            if isinstance(reason, str):
+                reason_elt.addElement(reason)
+            else:
+                for elt in reason:
+                    reason_elt.addChild(elt)
         else:
-            for elt in reason:
-                reason_elt.addChild(elt)
+            reason_elt = None
         if text is not None:
+            if reason_elt is None:
+                raise ValueError(
+                    "You have to specify a reason if text is specified"
+                )
             reason_elt.addElement("text", content=text)
-        self._del_session(client, session["id"])
+        if not self.host.trigger.point(
+            "XEP-0166_terminate",
+            client, session, reason_elt
+        ):
+            return defer.succeed(None)
+        self.delete_session(client, session["id"])
         d = iq_elt.send()
         d.addErrback(self._terminate_eb)
         return d
@@ -239,7 +333,7 @@
                 failure_=failure_.value
             )
         )
-        self._del_session(client, sid)
+        self.delete_session(client, sid)
 
     def _jingle_error_cb(self, failure_, session, request, client):
         """Called when something is going wrong while parsing jingle request
@@ -462,6 +556,7 @@
         peer_jid: jid.JID,
         contents: List[dict],
         encrypted: bool = False,
+        sid: str|None = None,
         **extra_data: Any
     ) -> str:
         """Send a session initiation request
@@ -474,32 +569,27 @@
                 - transport_type(str): type of transport to use (see XEP-0166 §8)
                     default to TRANSPORT_STREAMING
                 - name(str): name of the content
-                - senders(str): One of XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER, both or none
-                    default to BOTH (see XEP-0166 §7.3)
+                - senders(str): One of XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER,
+                    both or none
+                    Defaults to BOTH (see XEP-0166 §7.3)
                 - app_args(list): args to pass to the application plugin
                 - app_kwargs(dict): keyword args to pass to the application plugin
         @param encrypted: if True, session must be encrypted and "encryption" must be set
             to all content data of session
-        @return: jingle session id
+        @param sid: Session ID.
+            If None, one will be generated (and used as return value)
+        @return: Sesson ID
         """
         assert contents  # there must be at least one content
         if (peer_jid == client.jid
             or client.is_component and peer_jid.host == client.jid.host):
             raise ValueError(_("You can't do a jingle session with yourself"))
-        initiator = client.jid
-        sid = str(uuid.uuid4())
-        # TODO: session cleaning after timeout ?
-        session = client.jingle_sessions[sid] = {
-            "id": sid,
-            "state": STATE_PENDING,
-            "initiator": initiator,
-            "role": XEP_0166.ROLE_INITIATOR,
-            "local_jid": client.jid,
-            "peer_jid": peer_jid,
-            "started": time.time(),
-            "contents": {},
-            **extra_data,
-        }
+        if sid is None:
+            sid = str(uuid.uuid4())
+        session = self.create_session(
+            client, sid, XEP_0166.ROLE_INITIATOR, peer_jid, **extra_data
+        )
+        initiator = session["initiator"]
 
         if not await self.host.trigger.async_point(
             "XEP-0166_initiate",
@@ -679,7 +769,8 @@
                 pass
             elif action == XEP_0166.A_SESSION_TERMINATE:
                 log.debug(
-                    "ignoring session terminate action (inexisting session id): {request_id} [{profile}]".format(
+                    "ignoring session terminate action (inexisting session id): "
+                    "{request_id} [{profile}]".format(
                         request_id=sid, profile=client.profile
                     )
                 )
@@ -693,21 +784,19 @@
                 self.sendError(client, "item-not-found", None, request, "unknown-session")
                 return
 
-            session = client.jingle_sessions[sid] = {
-                "id": sid,
-                "state": STATE_PENDING,
-                "initiator": peer_jid,
-                "role": XEP_0166.ROLE_RESPONDER,
-                # we store local_jid using request['to'] because for a component the jid
-                # used may not be client.jid (if a local part is used).
-                "local_jid": jid.JID(request['to']),
-                "peer_jid": peer_jid,
-                "started": time.time(),
-            }
+            try:
+                session = self.get_session(client, sid)
+            except exceptions.NotFound:
+                # XXX: we store local_jid using request['to'] because for a component the
+                # jid used may not be client.jid (if a local part is used).
+                session = self.create_session(
+                    client, sid, XEP_0166.ROLE_RESPONDER, peer_jid, jid.JID(request['to'])
+                )
         else:
             if session["peer_jid"] != peer_jid:
                 log.warning(
-                    "sid conflict ({}), the jid doesn't match. Can be a collision, a hack attempt, or a bad sid generation".format(
+                    "sid conflict ({}), the jid doesn't match. Can be a collision, a "
+                    "hack attempt, or a bad sid generation".format(
                         sid
                     )
                 )
@@ -721,7 +810,7 @@
         if action == XEP_0166.A_SESSION_INITIATE:
             await self.on_session_initiate(client, request, jingle_elt, session)
         elif action == XEP_0166.A_SESSION_TERMINATE:
-            self.on_session_terminate(client, request, jingle_elt, session)
+            await self.on_session_terminate(client, request, jingle_elt, session)
         elif action == XEP_0166.A_SESSION_ACCEPT:
             await self.on_session_accept(client, request, jingle_elt, session)
         elif action == XEP_0166.A_SESSION_INFO:
@@ -969,11 +1058,11 @@
         @param jingle_elt(domish.Element): <jingle> element
         @param session(dict): session data
         """
-        if "contents" in session:
+        contents_dict = session["contents"]
+        if contents_dict:
             raise exceptions.InternalError(
-                "Contents dict should not already exist at this point"
+                "Contents dict should not already be set at this point"
             )
-        session["contents"] = contents_dict = {}
 
         try:
             self._parse_elements(
@@ -1107,14 +1196,47 @@
         d_list.addErrback(self._iq_error, session["id"], client)
         return d_list
 
-    def on_session_terminate(self, client, request, jingle_elt, session):
+    def get_reason_elt(self, parent_elt: domish.Element) -> domish.Element:
+        """Find a <reason> element in parent_elt
+
+        if none is found, add an empty one to the element
+        @return: the <reason> element
+        """
+        try:
+            return next(parent_elt.elements(NS_JINGLE, "reason"))
+        except StopIteration:
+            log.warning("No reason given for session termination")
+            reason_elt = parent_elt.addElement("reason")
+            return reason_elt
+
+    def parse_reason_elt(self, reason_elt: domish.Element) -> tuple[str|None, str|None]:
+        """Parse a <reason> element
+
+        @return: reason found, and text if any
+        """
+        reason, text = None, None
+        for elt in reason_elt.elements():
+            if elt.uri == NS_JINGLE:
+                if elt.name == "text":
+                    text = str(elt)
+                else:
+                    reason = elt.name
+
+        if reason is None:
+            log.debug("no reason specified,")
+
+        return reason, text
+
+    async def on_session_terminate(
+        self,
+        client: SatXMPPEntity,
+        request: domish.Element,
+        jingle_elt: domish.Element,
+        session: dict
+    ) -> None:
         # TODO: check reason, display a message to user if needed
         log.debug(f"Jingle Session {session['id']} terminated")
-        try:
-            reason_elt = next(jingle_elt.elements(NS_JINGLE, "reason"))
-        except StopIteration:
-            log.warning("No reason given for session termination")
-            reason_elt = jingle_elt.addElement("reason")
+        reason_elt = self.get_reason_elt(jingle_elt)
 
         terminate_defers = self._call_plugins(
             client,
@@ -1129,7 +1251,7 @@
         )
         terminate_dlist = defer.DeferredList(terminate_defers)
 
-        terminate_dlist.addCallback(lambda __: self._del_session(client, session["id"]))
+        terminate_dlist.addCallback(lambda __: self.delete_session(client, session["id"]))
         client.send(xmlstream.toResponse(request, "result"))
 
     async def on_session_accept(self, client, request, jingle_elt, session):