# HG changeset patch # User Goffi # Date 1532884971 -7200 # Node ID f2cf1daa42cb96ebf4067d15c74b79ee51c5e5f2 # Parent e107089d6640b4bd802b372513fcf6cae74bbe1e core: added async TriggerManager Async trigger manager add an asyncPoint method which can be used with Deferred (or sync method). This allow a triggers do to some long operations like doing network requests. MessageReceived and sendMessageData now use asyncPoint. diff -r e107089d6640 -r f2cf1daa42cb sat/core/sat_main.py --- a/sat/core/sat_main.py Sun Jul 29 18:44:49 2018 +0200 +++ b/sat/core/sat_main.py Sun Jul 29 19:22:51 2018 +0200 @@ -32,7 +32,7 @@ from sat.core.constants import Const as C from sat.memory import memory from sat.memory import cache -from sat.tools import trigger +from sat.tools import async_trigger as trigger from sat.tools import utils from sat.tools.common import dynamic_import from sat.tools.common import regex diff -r e107089d6640 -r f2cf1daa42cb sat/core/xmpp.py --- a/sat/core/xmpp.py Sun Jul 29 18:44:49 2018 +0200 +++ b/sat/core/xmpp.py Sun Jul 29 19:22:51 2018 +0200 @@ -421,20 +421,17 @@ "extra": extra, "timestamp": time.time(), } - pre_xml_treatments = ( - defer.Deferred() - ) # XXX: plugin can add their pre XML treatments to this deferred - post_xml_treatments = ( - defer.Deferred() - ) # XXX: plugin can add their post XML treatments to this deferred + # XXX: plugin can add their pre XML treatments to this deferred + pre_xml_treatments = defer.Deferred() + # XXX: plugin can add their post XML treatments to this deferred + post_xml_treatments = defer.Deferred() if data["type"] == C.MESS_TYPE_AUTO: # we try to guess the type if data["subject"]: data["type"] = C.MESS_TYPE_NORMAL - elif not data[ - "to" - ].resource: # if to JID has a resource, the type is not 'groupchat' + elif not data["to"].resource: # if to JID has a resource, + # the type is not 'groupchat' # we may have a groupchat message, we check if the we know this jid try: entity_type = self.host_app.memory.getEntityData( @@ -632,12 +629,14 @@ #   return super(SatXMPPClient, self).send(obj) + @defer.inlineCallbacks def sendMessageData(self, mess_data): """Convenient method to send message data to stream This method will send mess_data[u'xml'] to stream, but a trigger is there The trigger can't be cancelled, it's a good place for e2e encryption which don't handle full stanza encryption + This trigger can return a Deferred (it's an asyncPoint) @param mess_data(dict): message data as constructed by onMessage workflow @return (dict): mess_data (so it can be used in a deferred chain) """ @@ -646,9 +645,9 @@ # This is intented for e2e encryption which doesn't do full stanza # encryption (e.g. OTR) # This trigger point can't cancel the method - self.host_app.trigger.point("sendMessageData", self, mess_data) + yield self.host_app.trigger.asyncPoint("sendMessageData", self, mess_data) self.send(mess_data[u"xml"]) - return mess_data + defer.returnValue(mess_data) def feedback(self, to_jid, message): """Send message to frontends @@ -861,28 +860,39 @@ data["delay_sender"] = parsed_delay.sender.full() return data + def _onMessageStartWorkflow(self, cont, client, message_elt, post_treat): + """Parse message and do post treatments + + It is the first callback called after MessageReceived trigger + @param cont(bool): workflow will continue only if this is True + @param message_elt(domish.Element): message stanza + may have be modified by triggers + @param post_treat(defer.Deferred): post parsing treatments + """ + if not cont: + return + data = self.parseMessage(message_elt, client=client) + post_treat.addCallback(self.skipEmptyMessage) + post_treat.addCallback(self.addToHistory, client) + post_treat.addCallback(self.bridgeSignal, client, data) + post_treat.addErrback(self.cancelErrorTrap) + post_treat.callback(data) + def onMessage(self, message_elt): # TODO: handle threads client = self.parent if not "from" in message_elt.attributes: message_elt["from"] = client.jid.host log.debug(_(u"got message from: {from_}").format(from_=message_elt["from"])) - post_treat = ( - defer.Deferred() - ) # XXX: plugin can add their treatments to this deferred - if not self.host.trigger.point( + # plugin can add their treatments to this deferred + post_treat = defer.Deferred() + + d = self.host.trigger.asyncPoint( "MessageReceived", client, message_elt, post_treat - ): - return - - data = self.parseMessage(message_elt, client) + ) - post_treat.addCallback(self.skipEmptyMessage) - post_treat.addCallback(self.addToHistory, client) - post_treat.addCallback(self.bridgeSignal, client, data) - post_treat.addErrback(self.cancelErrorTrap) - post_treat.callback(data) + d.addCallback(self._onMessageStartWorkflow, client, message_elt, post_treat) def skipEmptyMessage(self, data): if not data["message"] and not data["extra"] and not data["subject"]: diff -r e107089d6640 -r f2cf1daa42cb sat/tools/async_trigger.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/tools/async_trigger.py Sun Jul 29 19:22:51 2018 +0200 @@ -0,0 +1,46 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# SAT: a jabber client +# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +"""Misc usefull classes""" + +from sat.tools import trigger as sync_trigger +from twisted.internet import defer + +class TriggerManager(sync_trigger.TriggerManager): + """This is a TriggerManager with an new asyncPoint method""" + + @defer.inlineCallbacks + def asyncPoint(self, point_name, *args, **kwargs): + """This put a trigger point with potentially async Deferred + + All the triggers for that point will be run + @param point_name: name of the trigger point + @return D(bool): True if the action must be continued, False else + """ + if point_name not in self.__triggers: + defer.returnValue(True) + + for priority, trigger in self.__triggers[point_name]: + try: + cont = yield trigger(*args, **kwargs) + if not cont: + defer.returnValue(False) + except sync_trigger.SkipOtherTriggers: + break + defer.returnValue(True)