diff libervia/backend/plugins/plugin_xep_0166/__init__.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents 79c8a70e1813
children f46891f2c9cb
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0166/__init__.py	Tue Jun 18 12:06:45 2024 +0200
+++ b/libervia/backend/plugins/plugin_xep_0166/__init__.py	Wed Jun 19 18:44:57 2024 +0200
@@ -51,15 +51,15 @@
 log = getLogger(__name__)
 
 
-IQ_SET : Final = '/iq[@type="set"]'
-NS_JINGLE : Final = "urn:xmpp:jingle:1"
-NS_JINGLE_ERROR : Final = "urn:xmpp:jingle:errors:1"
-JINGLE_REQUEST : Final = f'{IQ_SET}/jingle[@xmlns="{NS_JINGLE}"]'
-CONFIRM_TXT : Final = D_(
+IQ_SET: Final = '/iq[@type="set"]'
+NS_JINGLE: Final = "urn:xmpp:jingle:1"
+NS_JINGLE_ERROR: Final = "urn:xmpp:jingle:errors:1"
+JINGLE_REQUEST: Final = f'{IQ_SET}/jingle[@xmlns="{NS_JINGLE}"]'
+CONFIRM_TXT: Final = D_(
     "{entity} want to start a jingle session with you, do you accept ?"
 )
 
-PLUGIN_INFO : Final = {
+PLUGIN_INFO: Final = {
     C.PI_NAME: "Jingle",
     C.PI_IMPORT_NAME: "XEP-0166",
     C.PI_TYPE: "XEP",
@@ -72,54 +72,50 @@
 
 
 class XEP_0166:
-    namespace : Final = NS_JINGLE
+    namespace: Final = NS_JINGLE
 
-    ROLE_INITIATOR : Final = "initiator"
-    ROLE_RESPONDER : Final = "responder"
+    ROLE_INITIATOR: Final = "initiator"
+    ROLE_RESPONDER: Final = "responder"
 
-    TRANSPORT_DATAGRAM : Final = "UDP"
-    TRANSPORT_STREAMING : Final = "TCP"
+    TRANSPORT_DATAGRAM: Final = "UDP"
+    TRANSPORT_STREAMING: Final = "TCP"
 
-    REASON_SUCCESS : Final = "success"
-    REASON_DECLINE : Final = "decline"
-    REASON_FAILED_APPLICATION : Final = "failed-application"
-    REASON_FAILED_TRANSPORT : Final = "failed-transport"
-    REASON_CONNECTIVITY_ERROR : Final = "connectivity-error"
+    REASON_SUCCESS: Final = "success"
+    REASON_DECLINE: Final = "decline"
+    REASON_FAILED_APPLICATION: Final = "failed-application"
+    REASON_FAILED_TRANSPORT: Final = "failed-transport"
+    REASON_CONNECTIVITY_ERROR: Final = "connectivity-error"
 
-    STATE_PENDING : Final = "PENDING"
-    STATE_ACTIVE : Final = "ACTIVE"
-    STATE_ENDED : Final = "ENDED"
+    STATE_PENDING: Final = "PENDING"
+    STATE_ACTIVE: Final = "ACTIVE"
+    STATE_ENDED: Final = "ENDED"
 
     # standard actions
 
-    A_SESSION_INITIATE : Final = "session-initiate"
-    A_SESSION_ACCEPT : Final = "session-accept"
-    A_SESSION_TERMINATE : Final = "session-terminate"
-    A_SESSION_INFO : Final = "session-info"
-    A_TRANSPORT_REPLACE : Final = "transport-replace"
-    A_TRANSPORT_ACCEPT : Final = "transport-accept"
-    A_TRANSPORT_REJECT : Final = "transport-reject"
-    A_TRANSPORT_INFO : Final = "transport-info"
+    A_SESSION_INITIATE: Final = "session-initiate"
+    A_SESSION_ACCEPT: Final = "session-accept"
+    A_SESSION_TERMINATE: Final = "session-terminate"
+    A_SESSION_INFO: Final = "session-info"
+    A_TRANSPORT_REPLACE: Final = "transport-replace"
+    A_TRANSPORT_ACCEPT: Final = "transport-accept"
+    A_TRANSPORT_REJECT: Final = "transport-reject"
+    A_TRANSPORT_INFO: Final = "transport-info"
 
     # non standard actions
 
     #: called before the confirmation request, first event for responder, useful for
     #: parsing
-    A_PREPARE_CONFIRMATION : Final = "prepare-confirmation"
+    A_PREPARE_CONFIRMATION: Final = "prepare-confirmation"
     #: initiator must prepare tranfer
-    A_PREPARE_INITIATOR : Final = "prepare-initiator"
+    A_PREPARE_INITIATOR: Final = "prepare-initiator"
     #: responder must prepare tranfer
-    A_PREPARE_RESPONDER : Final = "prepare-responder"
-    #; session accepted ack has been received from initiator
-    A_ACCEPTED_ACK : Final = (
-        "accepted-ack"
-    )
-    A_START : Final = "start"  # application can start
+    A_PREPARE_RESPONDER: Final = "prepare-responder"
+    # ; session accepted ack has been received from initiator
+    A_ACCEPTED_ACK: Final = "accepted-ack"
+    A_START: Final = "start"  # application can start
     #: called when a transport is destroyed (e.g. because it is remplaced). Used to do
     #: cleaning operations
-    A_DESTROY : Final = (
-        "destroy"
-    )
+    A_DESTROY: Final = "destroy"
 
     def __init__(self, host):
         log.info(_("plugin Jingle initialization"))
@@ -157,9 +153,7 @@
         try:
             return client.jingle_sessions[session_id]
         except KeyError:
-            raise exceptions.NotFound(
-                f"No session with SID {session_id} found"
-            )
+            raise exceptions.NotFound(f"No session with SID {session_id} found")
 
     def create_session(
         self,
@@ -167,8 +161,8 @@
         sid: str,
         role: str,
         peer_jid: jid.JID,
-        local_jid: jid.JID|None = None,
-        **kwargs
+        local_jid: jid.JID | None = None,
+        **kwargs,
     ) -> dict:
         """Create a new jingle session.
 
@@ -192,7 +186,6 @@
         if role not in [XEP_0166.ROLE_INITIATOR, XEP_0166.ROLE_RESPONDER]:
             raise ValueError(f"Invalid role {role}. Expected initiator or responder.")
 
-
         session_data = {
             "id": sid,
             "state": XEP_0166.STATE_PENDING,
@@ -201,7 +194,7 @@
             "local_jid": local_jid or client.jid,
             "peer_jid": peer_jid,
             "started": time.time(),
-            "contents": {}
+            "contents": {},
         }
 
         # If extra kw args are provided, merge them into the session_data
@@ -213,27 +206,24 @@
 
         return session_data
 
-
     def delete_session(self, client, sid):
         try:
             del client.jingle_sessions[sid]
         except KeyError:
             log.debug(
                 f"Jingle session id {sid!r} is unknown, nothing to delete "
-                f"[{client.profile}]")
+                f"[{client.profile}]"
+            )
         else:
             log.debug(f"Jingle session id {sid!r} deleted [{client.profile}]")
 
     ## helpers methods to build stanzas ##
 
     def _build_jingle_elt(
-        self,
-        client: SatXMPPEntity,
-        session: dict,
-        action: str
+        self, client: SatXMPPEntity, session: dict, action: str
     ) -> Tuple[xmlstream.IQ, domish.Element]:
         iq_elt = client.IQ("set")
-        iq_elt["from"] = session['local_jid'].full()
+        iq_elt["from"] = session["local_jid"].full()
         iq_elt["to"] = session["peer_jid"].full()
         jingle_elt = iq_elt.addElement("jingle", NS_JINGLE)
         jingle_elt["sid"] = session["id"]
@@ -264,11 +254,7 @@
         log.warning(_("Error while terminating session: {msg}").format(msg=failure_))
 
     def _terminate(
-        self,
-        session_id: str,
-        reason: str,
-        reason_txt: str,
-        profile: str
+        self, session_id: str, reason: str, reason_txt: str, profile: str
     ) -> defer.Deferred:
         client = self.host.get_client(profile)
         session = self.get_session(client, session_id)
@@ -276,19 +262,14 @@
             raise ValueError(
                 'only "cancel", "decline" and "busy" and empty value are allowed'
             )
-        return self.terminate(
-            client,
-            reason or None,
-            session,
-            text=reason_txt or None
-        )
+        return self.terminate(client, reason or None, session, text=reason_txt or None)
 
     def terminate(
-            self,
-            client: SatXMPPEntity,
-            reason: str|list[domish.Element]|None,
-            session: dict,
-            text: str|None = None
+        self,
+        client: SatXMPPEntity,
+        reason: str | list[domish.Element] | None,
+        session: dict,
+        text: str | None = None,
     ) -> defer.Deferred:
         """Terminate the session
 
@@ -311,14 +292,9 @@
             reason_elt = None
         if text is not None:
             if reason_elt is None:
-                raise ValueError(
-                    "You have to specify a reason if text is specified"
-                )
+                raise ValueError("You have to specify a reason if text is specified")
             reason_elt.addElement("text", content=text)
-        if not self.host.trigger.point(
-            "XEP-0166_terminate",
-            client, session, reason_elt
-        ):
+        if not self.host.trigger.point("XEP-0166_terminate", client, session, reason_elt):
             return defer.succeed(None)
         self.delete_session(client, session["id"])
         d = iq_elt.send()
@@ -328,11 +304,11 @@
     ## errors which doesn't imply a stanza sending ##
 
     def _jingle_error_cb(
-            self,
-            failure_: failure.Failure|BaseException,
-            session: dict,
-            request: domish.Element,
-            client: SatXMPPEntity
+        self,
+        failure_: failure.Failure | BaseException,
+        session: dict,
+        request: domish.Element,
+        client: SatXMPPEntity,
     ) -> defer.Deferred:
         """Called when something is going wrong while parsing jingle request
 
@@ -352,20 +328,19 @@
         if isinstance(failure_, exceptions.DataError):
             return self.sendError(client, "bad-request", session["id"], request)
         elif isinstance(failure_, error.StanzaError):
-            return self.terminate(client, self.REASON_FAILED_APPLICATION, session,
-                                  text=str(failure_))
+            return self.terminate(
+                client, self.REASON_FAILED_APPLICATION, session, text=str(failure_)
+            )
         else:
             log.error(f"Unmanaged jingle exception: {failure_}")
-            return self.terminate(client, self.REASON_FAILED_APPLICATION, session,
-                                  text=str(failure_))
+            return self.terminate(
+                client, self.REASON_FAILED_APPLICATION, session, text=str(failure_)
+            )
 
     ## methods used by other plugins ##
 
     def register_application(
-        self,
-        namespace: str,
-        handler: BaseApplicationHandler,
-        priority: int = 0
+        self, namespace: str, handler: BaseApplicationHandler, priority: int = 0
     ) -> None:
         """Register an application plugin
 
@@ -405,11 +380,11 @@
         log.debug("new jingle application registered")
 
     def register_transport(
-            self,
-            namespace: str,
-            transport_type: str,
-            handler: BaseTransportHandler,
-            priority: int = 0
+        self,
+        namespace: str,
+        transport_type: str,
+        handler: BaseTransportHandler,
+        priority: int = 0,
     ) -> None:
         """Register a transport plugin
 
@@ -466,7 +441,9 @@
         content_elt["name"] = content_name
         content_elt["creator"] = content_data["creator"]
 
-        transport_elt = transport.handler.jingle_session_init(client, session, content_name)
+        transport_elt = transport.handler.jingle_session_init(
+            client, session, content_name
+        )
         content_elt.addChild(transport_elt)
         iq_elt.send()
 
@@ -477,7 +454,7 @@
         session: dict,
         content_name: str,
         iq_elt: Optional[xmlstream.IQ] = None,
-        context_elt: Optional[domish.Element] = None
+        context_elt: Optional[domish.Element] = None,
     ) -> Tuple[xmlstream.IQ, domish.Element]:
         """Build an element according to requested action
 
@@ -534,12 +511,10 @@
         try:
             return self._applications[namespace]
         except KeyError:
-            raise exceptions.NotFound(
-                f"No application registered for {namespace}"
-            )
+            raise exceptions.NotFound(f"No application registered for {namespace}")
 
     def get_content_data(self, content: dict, content_idx: int) -> ContentData:
-        """"Retrieve application and its argument from content"""
+        """ "Retrieve application and its argument from content"""
         app_ns = content["app_ns"]
         try:
             application = self.get_application(app_ns)
@@ -553,11 +528,7 @@
         except KeyError:
             content_name = content["name"] = str(content_idx)
         return ContentData(
-            application,
-            app_args,
-            app_kwargs,
-            transport_data,
-            content_name
+            application, app_args, app_kwargs, transport_data, content_name
         )
 
     async def initiate(
@@ -566,8 +537,8 @@
         peer_jid: jid.JID,
         contents: List[dict],
         encrypted: bool = False,
-        sid: str|None = None,
-        **extra_data: Any
+        sid: str | None = None,
+        **extra_data: Any,
     ) -> str:
         """Send a session initiation request
 
@@ -591,8 +562,11 @@
         @return: Sesson ID
         """
         assert contents  # there must be at least one content
-        if (peer_jid == client.jid
-            or client.is_component and peer_jid.host == client.jid.host):
+        if (
+            peer_jid == client.jid
+            or client.is_component
+            and peer_jid.host == client.jid.host
+        ):
             raise ValueError(_("You can't do a jingle session with yourself"))
         if sid is None:
             sid = str(uuid.uuid4())
@@ -602,8 +576,7 @@
         initiator = session["initiator"]
 
         if not await self.host.trigger.async_point(
-            "XEP-0166_initiate",
-            client, session, contents
+            "XEP-0166_initiate", client, session, contents
         ):
             return sid
 
@@ -658,21 +631,25 @@
             # then the description element
             application_data["desc_elt"] = desc_elt = await utils.as_deferred(
                 content_data.application.handler.jingle_session_init,
-                client, session, content_data.content_name,
-                *content_data.app_args, **content_data.app_kwargs
+                client,
+                session,
+                content_data.content_name,
+                *content_data.app_args,
+                **content_data.app_kwargs,
             )
             content_elt.addChild(desc_elt)
 
             # and the transport one
             transport_data["transport_elt"] = transport_elt = await utils.as_deferred(
                 transport.handler.jingle_session_init,
-                client, session, content_data.content_name,
+                client,
+                session,
+                content_data.content_name,
             )
             content_elt.addChild(transport_elt)
 
         if not await self.host.trigger.async_point(
-            "XEP-0166_initiate_elt_built",
-            client, session, iq_elt, jingle_elt
+            "XEP-0166_initiate_elt_built", client, session, iq_elt, jingle_elt
         ):
             return sid
 
@@ -737,9 +714,7 @@
         defer.ensureDeferred(self.on_jingle_request(client, request))
 
     async def on_jingle_request(
-        self,
-        client: SatXMPPEntity,
-        request: domish.Element
+        self, client: SatXMPPEntity, request: domish.Element
     ) -> None:
         """Called when any jingle request is received
 
@@ -803,15 +778,13 @@
                 # XXX: we store local_jid using request['to'] because for a component the
                 # jid used may not be client.jid (if a local part is used).
                 session = self.create_session(
-                    client, sid, XEP_0166.ROLE_RESPONDER, peer_jid, jid.JID(request['to'])
+                    client, sid, XEP_0166.ROLE_RESPONDER, peer_jid, jid.JID(request["to"])
                 )
         else:
             if session["peer_jid"] != peer_jid:
                 log.warning(
                     "sid conflict ({}), the jid doesn't match. Can be a collision, a "
-                    "hack attempt, or a bad sid generation".format(
-                        sid
-                    )
+                    "hack attempt, or a bad sid generation".format(sid)
                 )
                 self.sendError(client, "service-unavailable", sid, request)
                 return
@@ -849,7 +822,7 @@
         client: SatXMPPEntity,
         new: bool = False,
         creator: str = ROLE_INITIATOR,
-        with_application: bool =True,
+        with_application: bool = True,
         with_transport: bool = True,
         store_in_session: bool = True,
     ) -> Dict[str, dict]:
@@ -926,9 +899,7 @@
                     try:
                         application = self._applications[app_ns]
                     except KeyError:
-                        log.warning(
-                            "Unmanaged application namespace [{}]".format(app_ns)
-                        )
+                        log.warning("Unmanaged application namespace [{}]".format(app_ns))
                         self.sendError(
                             client, "service-unavailable", session["id"], request
                         )
@@ -998,7 +969,7 @@
         transp_default_cb: Optional[Callable] = None,
         delete: bool = True,
         elements: bool = True,
-        force_element: Optional[domish.Element] = None
+        force_element: Optional[domish.Element] = None,
     ) -> list[Any]:
         """Call application and transport plugin methods for all contents
 
@@ -1059,7 +1030,7 @@
         client: SatXMPPEntity,
         request: domish.Element,
         jingle_elt: domish.Element,
-        session: Dict[str, Any]
+        session: Dict[str, Any],
     ) -> None:
         """Called on session-initiate action
 
@@ -1093,20 +1064,15 @@
         # at this point we can send the <iq/> result to confirm reception of the request
         client.send(xmlstream.toResponse(request, "result"))
 
-
         assert "jingle_elt" not in session
         session["jingle_elt"] = jingle_elt
         if not await self.host.trigger.async_point(
-            "XEP-0166_on_session_initiate",
-            client, session, request, jingle_elt
+            "XEP-0166_on_session_initiate", client, session, request, jingle_elt
         ):
             return
 
         await self._call_plugins(
-            client,
-            XEP_0166.A_PREPARE_CONFIRMATION,
-            session,
-            delete=False
+            client, XEP_0166.A_PREPARE_CONFIRMATION, session, delete=False
         )
 
         # we now request each application plugin confirmation
@@ -1131,7 +1097,7 @@
         confirmations: list[bool],
         session: dict,
         jingle_elt: domish.Element,
-        client: SatXMPPEntity
+        client: SatXMPPEntity,
     ) -> None:
         """Method called when confirmation from user has been received
 
@@ -1150,7 +1116,7 @@
         iq_elt, jingle_elt = self._build_jingle_elt(
             client, session, XEP_0166.A_SESSION_ACCEPT
         )
-        jingle_elt["responder"] = session['local_jid'].full()
+        jingle_elt["responder"] = session["local_jid"].full()
         session["jingle_elt"] = jingle_elt
 
         # contents
@@ -1219,7 +1185,9 @@
             reason_elt = parent_elt.addElement("reason")
             return reason_elt
 
-    def parse_reason_elt(self, reason_elt: domish.Element) -> tuple[str|None, str|None]:
+    def parse_reason_elt(
+        self, reason_elt: domish.Element
+    ) -> tuple[str | None, str | None]:
         """Parse a <reason> element
 
         @return: reason found, and text if any
@@ -1242,7 +1210,7 @@
         client: SatXMPPEntity,
         request: domish.Element,
         jingle_elt: domish.Element,
-        session: dict
+        session: dict,
     ) -> None:
         # TODO: check reason, display a message to user if needed
         log.debug(f"Jingle Session {session['id']} terminated")
@@ -1286,10 +1254,7 @@
         session["jingle_elt"] = jingle_elt
 
         await self._call_plugins(
-            client,
-            XEP_0166.A_PREPARE_INITIATOR,
-            session,
-            delete=False
+            client, XEP_0166.A_PREPARE_INITIATOR, session, delete=False
         )
 
         await self._call_plugins(client, XEP_0166.A_SESSION_ACCEPT, session)
@@ -1398,7 +1363,11 @@
             # we can now actually replace the transport
             await utils.as_deferred(
                 content_data["transport"].handler.jingle_handler,
-                client, XEP_0166.A_DESTROY, session, content_name, None
+                client,
+                XEP_0166.A_DESTROY,
+                session,
+                content_name,
+                None,
             )
             content_data["transport"] = transport
             content_data["transport_data"].clear()
@@ -1409,13 +1378,21 @@
             # we notify the transport and insert its <transport/> in the answer
             accept_transport_elt = await utils.as_deferred(
                 transport.handler.jingle_handler,
-                client, XEP_0166.A_TRANSPORT_REPLACE, session, content_name, transport_elt
+                client,
+                XEP_0166.A_TRANSPORT_REPLACE,
+                session,
+                content_name,
+                transport_elt,
             )
             content_elt.addChild(accept_transport_elt)
             # there is no confirmation needed here, so we can directly prepare it
             await utils.as_deferred(
                 transport.handler.jingle_handler,
-                client, XEP_0166.A_PREPARE_RESPONDER, session, content_name, None
+                client,
+                XEP_0166.A_PREPARE_RESPONDER,
+                session,
+                content_name,
+                None,
             )
 
         iq_elt.send()
@@ -1425,7 +1402,7 @@
         client: SatXMPPEntity,
         request: domish.Element,
         jingle_elt: domish.Element,
-        session: dict
+        session: dict,
     ) -> None:
         """Method called once transport replacement is accepted
 
@@ -1472,7 +1449,7 @@
         client: SatXMPPEntity,
         request: domish.Element,
         jingle_elt: domish.Element,
-        session: dict
+        session: dict,
     ) -> None:
         """Method called when a transport-info action is received from other peer
 
@@ -1487,8 +1464,12 @@
 
         try:
             parsed_contents = self._parse_elements(
-                jingle_elt, session, request, client, with_application=False,
-                store_in_session=False
+                jingle_elt,
+                session,
+                request,
+                client,
+                with_application=False,
+                store_in_session=False,
             )
         except exceptions.CancelError:
             return