diff sat/core/xmpp.py @ 2645:f2cf1daa42cb

core: added async TriggerManager Async trigger manager add an asyncPoint method which can be used with Deferred (or sync method). This allow a triggers do to some long operations like doing network requests. MessageReceived and sendMessageData now use asyncPoint.
author Goffi <goffi@goffi.org>
date Sun, 29 Jul 2018 19:22:51 +0200
parents 189e38fb11ff
children 712cb4ff3e13
line wrap: on
line diff
--- a/sat/core/xmpp.py	Sun Jul 29 18:44:49 2018 +0200
+++ b/sat/core/xmpp.py	Sun Jul 29 19:22:51 2018 +0200
@@ -421,20 +421,17 @@
             "extra": extra,
             "timestamp": time.time(),
         }
-        pre_xml_treatments = (
-            defer.Deferred()
-        )  # XXX: plugin can add their pre XML treatments to this deferred
-        post_xml_treatments = (
-            defer.Deferred()
-        )  # XXX: plugin can add their post XML treatments to this deferred
+        # XXX: plugin can add their pre XML treatments to this deferred
+        pre_xml_treatments = defer.Deferred()
+        # XXX: plugin can add their post XML treatments to this deferred
+        post_xml_treatments = defer.Deferred()
 
         if data["type"] == C.MESS_TYPE_AUTO:
             # we try to guess the type
             if data["subject"]:
                 data["type"] = C.MESS_TYPE_NORMAL
-            elif not data[
-                "to"
-            ].resource:  # if to JID has a resource, the type is not 'groupchat'
+            elif not data["to"].resource:  # if to JID has a resource,
+                                           # the type is not 'groupchat'
                 # we may have a groupchat message, we check if the we know this jid
                 try:
                     entity_type = self.host_app.memory.getEntityData(
@@ -632,12 +629,14 @@
         #      return
         super(SatXMPPClient, self).send(obj)
 
+    @defer.inlineCallbacks
     def sendMessageData(self, mess_data):
         """Convenient method to send message data to stream
 
         This method will send mess_data[u'xml'] to stream, but a trigger is there
         The trigger can't be cancelled, it's a good place for e2e encryption which
         don't handle full stanza encryption
+        This trigger can return a Deferred (it's an asyncPoint)
         @param mess_data(dict): message data as constructed by onMessage workflow
         @return (dict): mess_data (so it can be used in a deferred chain)
         """
@@ -646,9 +645,9 @@
         #      This is intented for e2e encryption which doesn't do full stanza
         #      encryption (e.g. OTR)
         #      This trigger point can't cancel the method
-        self.host_app.trigger.point("sendMessageData", self, mess_data)
+        yield self.host_app.trigger.asyncPoint("sendMessageData", self, mess_data)
         self.send(mess_data[u"xml"])
-        return mess_data
+        defer.returnValue(mess_data)
 
     def feedback(self, to_jid, message):
         """Send message to frontends
@@ -861,28 +860,39 @@
                 data["delay_sender"] = parsed_delay.sender.full()
         return data
 
+    def _onMessageStartWorkflow(self, cont, client, message_elt, post_treat):
+        """Parse message and do post treatments
+
+        It is the first callback called after MessageReceived trigger
+        @param cont(bool): workflow will continue only if this is True
+        @param message_elt(domish.Element): message stanza
+            may have be modified by triggers
+        @param post_treat(defer.Deferred): post parsing treatments
+        """
+        if not cont:
+            return
+        data = self.parseMessage(message_elt, client=client)
+        post_treat.addCallback(self.skipEmptyMessage)
+        post_treat.addCallback(self.addToHistory, client)
+        post_treat.addCallback(self.bridgeSignal, client, data)
+        post_treat.addErrback(self.cancelErrorTrap)
+        post_treat.callback(data)
+
     def onMessage(self, message_elt):
         # TODO: handle threads
         client = self.parent
         if not "from" in message_elt.attributes:
             message_elt["from"] = client.jid.host
         log.debug(_(u"got message from: {from_}").format(from_=message_elt["from"]))
-        post_treat = (
-            defer.Deferred()
-        )  # XXX: plugin can add their treatments to this deferred
 
-        if not self.host.trigger.point(
+        # plugin can add their treatments to this deferred
+        post_treat = defer.Deferred()
+
+        d = self.host.trigger.asyncPoint(
             "MessageReceived", client, message_elt, post_treat
-        ):
-            return
-
-        data = self.parseMessage(message_elt, client)
+        )
 
-        post_treat.addCallback(self.skipEmptyMessage)
-        post_treat.addCallback(self.addToHistory, client)
-        post_treat.addCallback(self.bridgeSignal, client, data)
-        post_treat.addErrback(self.cancelErrorTrap)
-        post_treat.callback(data)
+        d.addCallback(self._onMessageStartWorkflow, client, message_elt, post_treat)
 
     def skipEmptyMessage(self, data):
         if not data["message"] and not data["extra"] and not data["subject"]: