comparison sat/core/xmpp.py @ 2645:f2cf1daa42cb

core: added async TriggerManager. The async trigger manager adds an asyncPoint method which can be used with Deferred (or sync) methods. This allows a trigger to do some long operations, like doing network requests. MessageReceived and sendMessageData now use asyncPoint.
author Goffi <goffi@goffi.org>
date Sun, 29 Jul 2018 19:22:51 +0200
parents 189e38fb11ff
children 712cb4ff3e13
comparison
equal deleted inserted replaced
2644:e107089d6640 2645:f2cf1daa42cb
419 "subject": subject, 419 "subject": subject,
420 "type": mess_type, 420 "type": mess_type,
421 "extra": extra, 421 "extra": extra,
422 "timestamp": time.time(), 422 "timestamp": time.time(),
423 } 423 }
424 pre_xml_treatments = ( 424 # XXX: plugin can add their pre XML treatments to this deferred
425 defer.Deferred() 425 pre_xml_treatments = defer.Deferred()
426 ) # XXX: plugin can add their pre XML treatments to this deferred 426 # XXX: plugin can add their post XML treatments to this deferred
427 post_xml_treatments = ( 427 post_xml_treatments = defer.Deferred()
428 defer.Deferred()
429 ) # XXX: plugin can add their post XML treatments to this deferred
430 428
431 if data["type"] == C.MESS_TYPE_AUTO: 429 if data["type"] == C.MESS_TYPE_AUTO:
432 # we try to guess the type 430 # we try to guess the type
433 if data["subject"]: 431 if data["subject"]:
434 data["type"] = C.MESS_TYPE_NORMAL 432 data["type"] = C.MESS_TYPE_NORMAL
435 elif not data[ 433 elif not data["to"].resource: # if to JID has a resource,
436 "to" 434 # the type is not 'groupchat'
437 ].resource: # if to JID has a resource, the type is not 'groupchat'
438 # we may have a groupchat message, we check if the we know this jid 435 # we may have a groupchat message, we check if the we know this jid
439 try: 436 try:
440 entity_type = self.host_app.memory.getEntityData( 437 entity_type = self.host_app.memory.getEntityData(
441 data["to"], ["type"], self.profile 438 data["to"], ["type"], self.profile
442 )["type"] 439 )["type"]
630 # encryption is implemented 627 # encryption is implemented
631 #  if not self.host_app.trigger.point("send", self, obj): 628 #  if not self.host_app.trigger.point("send", self, obj):
632 #   return 629 #   return
633 super(SatXMPPClient, self).send(obj) 630 super(SatXMPPClient, self).send(obj)
634 631
632 @defer.inlineCallbacks
635 def sendMessageData(self, mess_data): 633 def sendMessageData(self, mess_data):
636 """Convenient method to send message data to stream 634 """Convenient method to send message data to stream
637 635
638 This method will send mess_data[u'xml'] to stream, but a trigger is there 636 This method will send mess_data[u'xml'] to stream, but a trigger is there
639 The trigger can't be cancelled, it's a good place for e2e encryption which 637 The trigger can't be cancelled, it's a good place for e2e encryption which
640 don't handle full stanza encryption 638 don't handle full stanza encryption
639 This trigger can return a Deferred (it's an asyncPoint)
641 @param mess_data(dict): message data as constructed by onMessage workflow 640 @param mess_data(dict): message data as constructed by onMessage workflow
642 @return (dict): mess_data (so it can be used in a deferred chain) 641 @return (dict): mess_data (so it can be used in a deferred chain)
643 """ 642 """
644 # XXX: This is the last trigger before u"send" (last but one globally) 643 # XXX: This is the last trigger before u"send" (last but one globally)
645 # for sending message. 644 # for sending message.
646 # This is intented for e2e encryption which doesn't do full stanza 645 # This is intented for e2e encryption which doesn't do full stanza
647 # encryption (e.g. OTR) 646 # encryption (e.g. OTR)
648 # This trigger point can't cancel the method 647 # This trigger point can't cancel the method
649 self.host_app.trigger.point("sendMessageData", self, mess_data) 648 yield self.host_app.trigger.asyncPoint("sendMessageData", self, mess_data)
650 self.send(mess_data[u"xml"]) 649 self.send(mess_data[u"xml"])
651 return mess_data 650 defer.returnValue(mess_data)
652 651
653 def feedback(self, to_jid, message): 652 def feedback(self, to_jid, message):
654 """Send message to frontends 653 """Send message to frontends
655 654
656 This message will be an info message, not recorded in history. 655 This message will be an info message, not recorded in history.
859 data["received_timestamp"] = unicode(time.time()) 858 data["received_timestamp"] = unicode(time.time())
860 if parsed_delay.sender: 859 if parsed_delay.sender:
861 data["delay_sender"] = parsed_delay.sender.full() 860 data["delay_sender"] = parsed_delay.sender.full()
862 return data 861 return data
863 862
863 def _onMessageStartWorkflow(self, cont, client, message_elt, post_treat):
864 """Parse message and do post treatments
865
866 It is the first callback called after MessageReceived trigger
867 @param cont(bool): workflow will continue only if this is True
868 @param message_elt(domish.Element): message stanza
869 may have be modified by triggers
870 @param post_treat(defer.Deferred): post parsing treatments
871 """
872 if not cont:
873 return
874 data = self.parseMessage(message_elt, client=client)
875 post_treat.addCallback(self.skipEmptyMessage)
876 post_treat.addCallback(self.addToHistory, client)
877 post_treat.addCallback(self.bridgeSignal, client, data)
878 post_treat.addErrback(self.cancelErrorTrap)
879 post_treat.callback(data)
880
864 def onMessage(self, message_elt): 881 def onMessage(self, message_elt):
865 # TODO: handle threads 882 # TODO: handle threads
866 client = self.parent 883 client = self.parent
867 if not "from" in message_elt.attributes: 884 if not "from" in message_elt.attributes:
868 message_elt["from"] = client.jid.host 885 message_elt["from"] = client.jid.host
869 log.debug(_(u"got message from: {from_}").format(from_=message_elt["from"])) 886 log.debug(_(u"got message from: {from_}").format(from_=message_elt["from"]))
870 post_treat = ( 887
871 defer.Deferred() 888 # plugin can add their treatments to this deferred
872 ) # XXX: plugin can add their treatments to this deferred 889 post_treat = defer.Deferred()
873 890
874 if not self.host.trigger.point( 891 d = self.host.trigger.asyncPoint(
875 "MessageReceived", client, message_elt, post_treat 892 "MessageReceived", client, message_elt, post_treat
876 ): 893 )
877 return 894
878 895 d.addCallback(self._onMessageStartWorkflow, client, message_elt, post_treat)
879 data = self.parseMessage(message_elt, client)
880
881 post_treat.addCallback(self.skipEmptyMessage)
882 post_treat.addCallback(self.addToHistory, client)
883 post_treat.addCallback(self.bridgeSignal, client, data)
884 post_treat.addErrback(self.cancelErrorTrap)
885 post_treat.callback(data)
886 896
887 def skipEmptyMessage(self, data): 897 def skipEmptyMessage(self, data):
888 if not data["message"] and not data["extra"] and not data["subject"]: 898 if not data["message"] and not data["extra"] and not data["subject"]:
889 raise failure.Failure(exceptions.CancelError("Cancelled empty message")) 899 raise failure.Failure(exceptions.CancelError("Cancelled empty message"))
890 return data 900 return data