changeset 3404:26a0af6e32c1

plugin XEP-0166: new trigger point + coroutines + helper methods: - a new `XEP-0166_initiate` async trigger point is available - `initate` is now a coroutine - `jingleSessionInit` in applications and transports handlers are now called using `utils.adDeferred` - new `getApplication` helper method, to retrieve application from its namespace - new `getContentData` helper method to retrieve application and its argument from content
author Goffi <goffi@goffi.org>
date Thu, 12 Nov 2020 14:53:15 +0100 (2020-11-12)
parents 404d4b29de52
children ecdb3728749e
files sat/plugins/plugin_exp_jingle_stream.py sat/plugins/plugin_xep_0166.py sat/plugins/plugin_xep_0234.py
diffstat 3 files changed, 72 insertions(+), 42 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_exp_jingle_stream.py	Thu Nov 12 14:53:15 2020 +0100
+++ b/sat/plugins/plugin_exp_jingle_stream.py	Thu Nov 12 14:53:15 2020 +0100
@@ -17,14 +17,8 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from sat.core.i18n import _, D_
-from sat.core.constants import Const as C
-from sat.core import exceptions
-from sat.core.log import getLogger
-
-log = getLogger(__name__)
-from sat.tools import xml_tools
-from sat.tools import stream
+import errno
+from zope import interface
 from twisted.words.xish import domish
 from twisted.words.protocols.jabber import jid
 from twisted.internet import defer
@@ -33,8 +27,15 @@
 from twisted.internet import reactor
 from twisted.internet import error
 from twisted.internet import interfaces
-from zope import interface
-import errno
+from sat.core.i18n import _, D_
+from sat.core.constants import Const as C
+from sat.core import exceptions
+from sat.core.log import getLogger
+from sat.tools import xml_tools
+from sat.tools import stream
+
+
+log = getLogger(__name__)
 
 NS_STREAM = "http://salut-a-toi.org/protocol/stream"
 SECURITY_LIMIT = 30
@@ -193,10 +194,9 @@
 
     def _streamOut(self, to_jid_s, profile_key):
         client = self.host.getClient(profile_key)
-        return self.streamOut(client, jid.JID(to_jid_s))
+        return defer.ensureDeferred(self.streamOut(client, jid.JID(to_jid_s)))
 
-    @defer.inlineCallbacks
-    def streamOut(self, client, to_jid):
+    async def streamOut(self, client, to_jid):
         """send a stream
 
         @param peer_jid(jid.JID): recipient
@@ -207,7 +207,7 @@
         while True:
             endpoint = endpoints.TCP4ServerEndpoint(reactor, port)
             try:
-                port_listening = yield endpoint.listen(factory)
+                port_listening = await endpoint.listen(factory)
             except error.CannotListenError as e:
                 if e.socketError.errno == errno.EADDRINUSE:
                     port += 1
@@ -216,7 +216,8 @@
             else:
                 factory.port_listening = port_listening
                 break
-        self._j.initiate(
+        # we don't want to wait for IQ result of initiate
+        defer.ensureDeferred(self._j.initiate(
             client,
             to_jid,
             [
@@ -226,8 +227,8 @@
                     "app_kwargs": {"stream_object": factory},
                 }
             ],
-        )
-        defer.returnValue(str(port))
+        ))
+        return str(port)
 
     def jingleSessionInit(self, client, session, content_name, stream_object):
         content_data = session["contents"][content_name]
--- a/sat/plugins/plugin_xep_0166.py	Thu Nov 12 14:53:15 2020 +0100
+++ b/sat/plugins/plugin_xep_0166.py	Thu Nov 12 14:53:15 2020 +0100
@@ -19,6 +19,7 @@
 
 import uuid
 import time
+from typing import Tuple
 from collections import namedtuple
 from zope.interface import implementer
 from twisted.words.protocols.jabber import jid
@@ -336,8 +337,34 @@
         """
         return self._buildJingleElt(client, session, XEP_0166.A_SESSION_INFO)
 
-    @defer.inlineCallbacks
-    def initiate(self, client, peer_jid, contents):
+    def getApplication(self, namespace: str) -> object:
+        """Retreive application corresponding to a namespace
+
+        @raise exceptions.NotFound if application can't be found
+        """
+        try:
+            return self._applications[namespace]
+        except KeyError:
+            raise exceptions.NotFound(
+                f"No application registered for {namespace}"
+            )
+
+    def getContentData(self, content: dict) -> Tuple[object, list, dict, str]:
+        """"Retrieve application and its argument from content"""
+        app_ns = content["app_ns"]
+        try:
+            application = self.getApplication(app_ns)
+        except exceptions.NotFound as e:
+            raise exceptions.InternalError(str(e))
+        app_args = content.get("app_args", [])
+        app_kwargs = content.get("app_kwargs", {})
+        try:
+            content_name = content["name"]
+        except KeyError:
+            content_name = content["name"] = str(uuid.uuid4())
+        return application, app_args, app_kwargs, content_name
+
+    async def initiate(self, client, peer_jid, contents):
         """Send a session initiation request
 
         @param peer_jid(jid.JID): jid to establith session with
@@ -352,7 +379,7 @@
                     default to BOTH (see XEP-0166 ยง7.3)
                 - app_args(list): args to pass to the application plugin
                 - app_kwargs(dict): keyword args to pass to the application plugin
-        @return D(unicode): jingle session id
+        @return (unicode): jingle session id
         """
         assert contents  # there must be at least one content
         if (peer_jid == client.jid
@@ -371,6 +398,13 @@
             "started": time.time(),
             "contents": {},
         }
+
+        if not await self.host.trigger.asyncPoint(
+            "XEP-0166_initiate",
+            client, session, contents
+        ):
+            return
+
         iq_elt, jingle_elt = self._buildJingleElt(
             client, session, XEP_0166.A_SESSION_INITIATE
         )
@@ -380,13 +414,7 @@
 
         for content in contents:
             # we get the application plugin
-            app_ns = content["app_ns"]
-            try:
-                application = self._applications[app_ns]
-            except KeyError:
-                raise exceptions.InternalError(
-                    "No application registered for {}".format(app_ns)
-                )
+            application, app_args, app_kwargs, content_name = self.getContentData(content)
 
             # and the transport plugin
             transport_type = content.get("transport_type", XEP_0166.TRANSPORT_STREAMING)
@@ -406,15 +434,10 @@
                 "creator": XEP_0166.ROLE_INITIATOR,
                 "senders": content.get("senders", "both"),
             }
-            try:
-                content_name = content["name"]
-            except KeyError:
-                content_name = str(uuid.uuid4())
-            else:
-                if content_name in contents_dict:
-                    raise exceptions.InternalError(
-                        "There is already a content with this name"
-                    )
+            if content_name in contents_dict:
+                raise exceptions.InternalError(
+                    "There is already a content with this name"
+                )
             contents_dict[content_name] = content_data
 
             # we construct the content element
@@ -427,21 +450,21 @@
                 pass
 
             # then the description element
-            app_args = content.get("app_args", [])
-            app_kwargs = content.get("app_kwargs", {})
-            desc_elt = yield application.handler.jingleSessionInit(
+            desc_elt = await utils.asDeferred(
+                application.handler.jingleSessionInit,
                 client, session, content_name, *app_args, **app_kwargs
             )
             content_elt.addChild(desc_elt)
 
             # and the transport one
-            transport_elt = yield transport.handler.jingleSessionInit(
+            transport_elt = await utils.asDeferred(
+                transport.handler.jingleSessionInit,
                 client, session, content_name
             )
             content_elt.addChild(transport_elt)
 
         try:
-            yield iq_elt.send()
+            await iq_elt.send()
         except Exception as e:
             failure_ = failure.Failure(e)
             self._iqError(failure_, sid, client)
--- a/sat/plugins/plugin_xep_0234.py	Thu Nov 12 14:53:15 2020 +0100
+++ b/sat/plugins/plugin_xep_0234.py	Thu Nov 12 14:53:15 2020 +0100
@@ -405,6 +405,11 @@
 
     # jingle callbacks
 
+    def jingleDescriptionElt(
+        self, client, session, content_name, filepath, name, extra, progress_id_d
+    ):
+        return domish.Element((NS_JINGLE_FT, "description"))
+
     def jingleSessionInit(
         self, client, session, content_name, filepath, name, extra, progress_id_d
     ):
@@ -423,7 +428,8 @@
         assert "file_path" not in application_data
         application_data["file_path"] = filepath
         file_data = application_data["file_data"] = {}
-        desc_elt = domish.Element((NS_JINGLE_FT, "description"))
+        desc_elt = self.jingleDescriptionElt(
+            client, session, content_name, filepath, name, extra, progress_id_d)
         file_elt = desc_elt.addElement("file")
 
         if content_data["senders"] == self._j.ROLE_INITIATOR: