Mercurial > libervia-backend
comparison sat/core/xmpp.py @ 2645:f2cf1daa42cb
core: added async TriggerManager
The async trigger manager adds an asyncPoint method which can be used with Deferred (or sync methods).
This allows a trigger to do some long operations, like doing network requests.
MessageReceived and sendMessageData now use asyncPoint.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 29 Jul 2018 19:22:51 +0200 |
parents | 189e38fb11ff |
children | 712cb4ff3e13 |
comparison
equal
deleted
inserted
replaced
2644:e107089d6640 | 2645:f2cf1daa42cb |
---|---|
419 "subject": subject, | 419 "subject": subject, |
420 "type": mess_type, | 420 "type": mess_type, |
421 "extra": extra, | 421 "extra": extra, |
422 "timestamp": time.time(), | 422 "timestamp": time.time(), |
423 } | 423 } |
424 pre_xml_treatments = ( | 424 # XXX: plugin can add their pre XML treatments to this deferred |
425 defer.Deferred() | 425 pre_xml_treatments = defer.Deferred() |
426 ) # XXX: plugin can add their pre XML treatments to this deferred | 426 # XXX: plugin can add their post XML treatments to this deferred |
427 post_xml_treatments = ( | 427 post_xml_treatments = defer.Deferred() |
428 defer.Deferred() | |
429 ) # XXX: plugin can add their post XML treatments to this deferred | |
430 | 428 |
431 if data["type"] == C.MESS_TYPE_AUTO: | 429 if data["type"] == C.MESS_TYPE_AUTO: |
432 # we try to guess the type | 430 # we try to guess the type |
433 if data["subject"]: | 431 if data["subject"]: |
434 data["type"] = C.MESS_TYPE_NORMAL | 432 data["type"] = C.MESS_TYPE_NORMAL |
435 elif not data[ | 433 elif not data["to"].resource: # if to JID has a resource, |
436 "to" | 434 # the type is not 'groupchat' |
437 ].resource: # if to JID has a resource, the type is not 'groupchat' | |
438 # we may have a groupchat message, we check if the we know this jid | 435 # we may have a groupchat message, we check if the we know this jid |
439 try: | 436 try: |
440 entity_type = self.host_app.memory.getEntityData( | 437 entity_type = self.host_app.memory.getEntityData( |
441 data["to"], ["type"], self.profile | 438 data["to"], ["type"], self.profile |
442 )["type"] | 439 )["type"] |
630 # encryption is implemented | 627 # encryption is implemented |
631 # if not self.host_app.trigger.point("send", self, obj): | 628 # if not self.host_app.trigger.point("send", self, obj): |
632 # return | 629 # return |
633 super(SatXMPPClient, self).send(obj) | 630 super(SatXMPPClient, self).send(obj) |
634 | 631 |
632 @defer.inlineCallbacks | |
635 def sendMessageData(self, mess_data): | 633 def sendMessageData(self, mess_data): |
636 """Convenient method to send message data to stream | 634 """Convenient method to send message data to stream |
637 | 635 |
638 This method will send mess_data[u'xml'] to stream, but a trigger is there | 636 This method will send mess_data[u'xml'] to stream, but a trigger is there |
639 The trigger can't be cancelled, it's a good place for e2e encryption which | 637 The trigger can't be cancelled, it's a good place for e2e encryption which |
640 don't handle full stanza encryption | 638 don't handle full stanza encryption |
639 This trigger can return a Deferred (it's an asyncPoint) | |
641 @param mess_data(dict): message data as constructed by onMessage workflow | 640 @param mess_data(dict): message data as constructed by onMessage workflow |
642 @return (dict): mess_data (so it can be used in a deferred chain) | 641 @return (dict): mess_data (so it can be used in a deferred chain) |
643 """ | 642 """ |
644 # XXX: This is the last trigger before u"send" (last but one globally) | 643 # XXX: This is the last trigger before u"send" (last but one globally) |
645 # for sending message. | 644 # for sending message. |
646 # This is intented for e2e encryption which doesn't do full stanza | 645 # This is intented for e2e encryption which doesn't do full stanza |
647 # encryption (e.g. OTR) | 646 # encryption (e.g. OTR) |
648 # This trigger point can't cancel the method | 647 # This trigger point can't cancel the method |
649 self.host_app.trigger.point("sendMessageData", self, mess_data) | 648 yield self.host_app.trigger.asyncPoint("sendMessageData", self, mess_data) |
650 self.send(mess_data[u"xml"]) | 649 self.send(mess_data[u"xml"]) |
651 return mess_data | 650 defer.returnValue(mess_data) |
652 | 651 |
653 def feedback(self, to_jid, message): | 652 def feedback(self, to_jid, message): |
654 """Send message to frontends | 653 """Send message to frontends |
655 | 654 |
656 This message will be an info message, not recorded in history. | 655 This message will be an info message, not recorded in history. |
859 data["received_timestamp"] = unicode(time.time()) | 858 data["received_timestamp"] = unicode(time.time()) |
860 if parsed_delay.sender: | 859 if parsed_delay.sender: |
861 data["delay_sender"] = parsed_delay.sender.full() | 860 data["delay_sender"] = parsed_delay.sender.full() |
862 return data | 861 return data |
863 | 862 |
863 def _onMessageStartWorkflow(self, cont, client, message_elt, post_treat): | |
864 """Parse message and do post treatments | |
865 | |
866 It is the first callback called after MessageReceived trigger | |
867 @param cont(bool): workflow will continue only if this is True | |
868 @param message_elt(domish.Element): message stanza | |
869 may have be modified by triggers | |
870 @param post_treat(defer.Deferred): post parsing treatments | |
871 """ | |
872 if not cont: | |
873 return | |
874 data = self.parseMessage(message_elt, client=client) | |
875 post_treat.addCallback(self.skipEmptyMessage) | |
876 post_treat.addCallback(self.addToHistory, client) | |
877 post_treat.addCallback(self.bridgeSignal, client, data) | |
878 post_treat.addErrback(self.cancelErrorTrap) | |
879 post_treat.callback(data) | |
880 | |
864 def onMessage(self, message_elt): | 881 def onMessage(self, message_elt): |
865 # TODO: handle threads | 882 # TODO: handle threads |
866 client = self.parent | 883 client = self.parent |
867 if not "from" in message_elt.attributes: | 884 if not "from" in message_elt.attributes: |
868 message_elt["from"] = client.jid.host | 885 message_elt["from"] = client.jid.host |
869 log.debug(_(u"got message from: {from_}").format(from_=message_elt["from"])) | 886 log.debug(_(u"got message from: {from_}").format(from_=message_elt["from"])) |
870 post_treat = ( | 887 |
871 defer.Deferred() | 888 # plugin can add their treatments to this deferred |
872 ) # XXX: plugin can add their treatments to this deferred | 889 post_treat = defer.Deferred() |
873 | 890 |
874 if not self.host.trigger.point( | 891 d = self.host.trigger.asyncPoint( |
875 "MessageReceived", client, message_elt, post_treat | 892 "MessageReceived", client, message_elt, post_treat |
876 ): | 893 ) |
877 return | 894 |
878 | 895 d.addCallback(self._onMessageStartWorkflow, client, message_elt, post_treat) |
879 data = self.parseMessage(message_elt, client) | |
880 | |
881 post_treat.addCallback(self.skipEmptyMessage) | |
882 post_treat.addCallback(self.addToHistory, client) | |
883 post_treat.addCallback(self.bridgeSignal, client, data) | |
884 post_treat.addErrback(self.cancelErrorTrap) | |
885 post_treat.callback(data) | |
886 | 896 |
887 def skipEmptyMessage(self, data): | 897 def skipEmptyMessage(self, data): |
888 if not data["message"] and not data["extra"] and not data["subject"]: | 898 if not data["message"] and not data["extra"] and not data["subject"]: |
889 raise failure.Failure(exceptions.CancelError("Cancelled empty message")) | 899 raise failure.Failure(exceptions.CancelError("Cancelled empty message")) |
890 return data | 900 return data |