changeset 2645:f2cf1daa42cb

core: added async TriggerManager Async trigger manager add an asyncPoint method which can be used with Deferred (or sync method). This allow a triggers do to some long operations like doing network requests. MessageReceived and sendMessageData now use asyncPoint.
author Goffi <goffi@goffi.org>
date Sun, 29 Jul 2018 19:22:51 +0200
parents e107089d6640
children 712cb4ff3e13
files sat/core/sat_main.py sat/core/xmpp.py sat/tools/async_trigger.py
diffstat 3 files changed, 81 insertions(+), 25 deletions(-) [+]
line wrap: on
line diff
--- a/sat/core/sat_main.py	Sun Jul 29 18:44:49 2018 +0200
+++ b/sat/core/sat_main.py	Sun Jul 29 19:22:51 2018 +0200
@@ -32,7 +32,7 @@
 from sat.core.constants import Const as C
 from sat.memory import memory
 from sat.memory import cache
-from sat.tools import trigger
+from sat.tools import async_trigger as trigger
 from sat.tools import utils
 from sat.tools.common import dynamic_import
 from sat.tools.common import regex
--- a/sat/core/xmpp.py	Sun Jul 29 18:44:49 2018 +0200
+++ b/sat/core/xmpp.py	Sun Jul 29 19:22:51 2018 +0200
@@ -421,20 +421,17 @@
             "extra": extra,
             "timestamp": time.time(),
         }
-        pre_xml_treatments = (
-            defer.Deferred()
-        )  # XXX: plugin can add their pre XML treatments to this deferred
-        post_xml_treatments = (
-            defer.Deferred()
-        )  # XXX: plugin can add their post XML treatments to this deferred
+        # XXX: plugin can add their pre XML treatments to this deferred
+        pre_xml_treatments = defer.Deferred()
+        # XXX: plugin can add their post XML treatments to this deferred
+        post_xml_treatments = defer.Deferred()
 
         if data["type"] == C.MESS_TYPE_AUTO:
             # we try to guess the type
             if data["subject"]:
                 data["type"] = C.MESS_TYPE_NORMAL
-            elif not data[
-                "to"
-            ].resource:  # if to JID has a resource, the type is not 'groupchat'
+            elif not data["to"].resource:  # if to JID has a resource,
+                                           # the type is not 'groupchat'
                 # we may have a groupchat message, we check if the we know this jid
                 try:
                     entity_type = self.host_app.memory.getEntityData(
@@ -632,12 +629,14 @@
         #      return
         super(SatXMPPClient, self).send(obj)
 
+    @defer.inlineCallbacks
     def sendMessageData(self, mess_data):
         """Convenient method to send message data to stream
 
         This method will send mess_data[u'xml'] to stream, but a trigger is there
         The trigger can't be cancelled, it's a good place for e2e encryption which
         don't handle full stanza encryption
+        This trigger can return a Deferred (it's an asyncPoint)
         @param mess_data(dict): message data as constructed by onMessage workflow
         @return (dict): mess_data (so it can be used in a deferred chain)
         """
@@ -646,9 +645,9 @@
         #      This is intented for e2e encryption which doesn't do full stanza
         #      encryption (e.g. OTR)
         #      This trigger point can't cancel the method
-        self.host_app.trigger.point("sendMessageData", self, mess_data)
+        yield self.host_app.trigger.asyncPoint("sendMessageData", self, mess_data)
         self.send(mess_data[u"xml"])
-        return mess_data
+        defer.returnValue(mess_data)
 
     def feedback(self, to_jid, message):
         """Send message to frontends
@@ -861,28 +860,39 @@
                 data["delay_sender"] = parsed_delay.sender.full()
         return data
 
+    def _onMessageStartWorkflow(self, cont, client, message_elt, post_treat):
+        """Parse message and do post treatments
+
+        It is the first callback called after MessageReceived trigger
+        @param cont(bool): workflow will continue only if this is True
+        @param message_elt(domish.Element): message stanza
+            may have be modified by triggers
+        @param post_treat(defer.Deferred): post parsing treatments
+        """
+        if not cont:
+            return
+        data = self.parseMessage(message_elt, client=client)
+        post_treat.addCallback(self.skipEmptyMessage)
+        post_treat.addCallback(self.addToHistory, client)
+        post_treat.addCallback(self.bridgeSignal, client, data)
+        post_treat.addErrback(self.cancelErrorTrap)
+        post_treat.callback(data)
+
     def onMessage(self, message_elt):
         # TODO: handle threads
         client = self.parent
         if not "from" in message_elt.attributes:
             message_elt["from"] = client.jid.host
         log.debug(_(u"got message from: {from_}").format(from_=message_elt["from"]))
-        post_treat = (
-            defer.Deferred()
-        )  # XXX: plugin can add their treatments to this deferred
 
-        if not self.host.trigger.point(
+        # plugin can add their treatments to this deferred
+        post_treat = defer.Deferred()
+
+        d = self.host.trigger.asyncPoint(
             "MessageReceived", client, message_elt, post_treat
-        ):
-            return
-
-        data = self.parseMessage(message_elt, client)
+        )
 
-        post_treat.addCallback(self.skipEmptyMessage)
-        post_treat.addCallback(self.addToHistory, client)
-        post_treat.addCallback(self.bridgeSignal, client, data)
-        post_treat.addErrback(self.cancelErrorTrap)
-        post_treat.callback(data)
+        d.addCallback(self._onMessageStartWorkflow, client, message_elt, post_treat)
 
     def skipEmptyMessage(self, data):
         if not data["message"] and not data["extra"] and not data["subject"]:
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/tools/async_trigger.py	Sun Jul 29 19:22:51 2018 +0200
@@ -0,0 +1,46 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# SAT: a jabber client
+# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+"""Misc usefull classes"""
+
+from sat.tools import trigger as sync_trigger
+from twisted.internet import defer
+
+class TriggerManager(sync_trigger.TriggerManager):
+    """This is a TriggerManager with an new asyncPoint method"""
+
+    @defer.inlineCallbacks
+    def asyncPoint(self, point_name, *args, **kwargs):
+        """This put a trigger point with potentially async Deferred
+
+        All the triggers for that point will be run
+        @param point_name: name of the trigger point
+        @return D(bool): True if the action must be continued, False else
+        """
+        if point_name not in self.__triggers:
+            defer.returnValue(True)
+
+        for priority, trigger in self.__triggers[point_name]:
+            try:
+                cont = yield trigger(*args, **kwargs)
+                if not cont:
+                    defer.returnValue(False)
+            except sync_trigger.SkipOtherTriggers:
+                break
+        defer.returnValue(True)