changeset 2930:32b6893240e0

plugin XEP-0313: fixed archive retrieval on connection: last stanza-id is now saved for every message received, and not only retrieved from local message history anymore. If we are the sender of a message from archive, it is only added to local history, else it is injected to normal workflow.
author Goffi <goffi@goffi.org>
date Fri, 03 May 2019 13:00:08 +0200
parents e0429ff7f6b6
children b256e90612d0
files sat/plugins/plugin_xep_0313.py
diffstat 1 files changed, 83 insertions(+), 15 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0313.py	Sun Apr 28 09:00:51 2019 +0200
+++ b/sat/plugins/plugin_xep_0313.py	Fri May 03 13:00:08 2019 +0200
@@ -39,9 +39,6 @@
 
 log = getLogger(__name__)
 
-
-MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']"
-
 PLUGIN_INFO = {
     C.PI_NAME: u"Message Archive Management",
     C.PI_IMPORT_NAME: u"XEP-0313",
@@ -55,6 +52,9 @@
 
 MAM_PREFIX = u"mam_"
 FILTER_PREFIX = MAM_PREFIX + "filter_"
+KEY_LAST_STANZA_ID = u"last_stanza_id"
+MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']"
+MESSAGE_STANZA_ID = '/message/stanza-id[@xmlns="{ns_stanza_id}"]'
 
 
 class XEP_0313(object):
@@ -64,6 +64,9 @@
         self.host.registerNamespace(u"mam", mam.NS_MAM)
         self._rsm = host.plugins[u"XEP-0059"]
         self._sid = host.plugins[u"XEP-0359"]
+        # Deferred used to store last stanza id in order of reception
+        self._last_stanza_id_d = defer.Deferred()
+        self._last_stanza_id_d.callback(None)
         host.bridge.addMethod(
             "MAMGet", ".plugin", in_sign='sss',
             out_sign='(a(sdssa{ss}a{ss}sa{ss})a{ss}s)', method=self._getArchives,
@@ -71,14 +74,19 @@
 
     @defer.inlineCallbacks
     def profileConnected(self, client):
-        last_mess = yield self.host.memory.historyGet(
-            None, None, limit=1, filters={u'not_types': C.MESS_TYPE_GROUPCHAT,
-                                          u'last_stanza_id': True},
-            profile=client.profile)
-        if not last_mess:
-            log.info(_(u"It seems that we have no MAM history yet"))
-            return
-        stanza_id = last_mess[0][-1][u'stanza_id']
+        stanza_id_data = yield self.host.memory.storage.getPrivates(
+            mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile)
+        stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID)
+        if stanza_id is None:
+            log.info(u"can't retrieve last stanza ID, checking history")
+            last_mess = yield self.host.memory.historyGet(
+                None, None, limit=1, filters={u'not_types': C.MESS_TYPE_GROUPCHAT,
+                                              u'last_stanza_id': True},
+                profile=client.profile)
+            if not last_mess:
+                log.info(_(u"It seems that we have no MAM history yet"))
+                return
+            stanza_id = last_mess[0][-1][u'stanza_id']
         rsm_req = rsm.RSMRequest(max_=100, after=stanza_id)
         mam_req = mam.MAMRequest(rsm_=rsm_req)
         complete = False
@@ -97,11 +105,36 @@
 
             for mess_elt in elt_list:
                 try:
-                    fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req)
+                    fwd_message_elt = self.getMessageFromResult(
+                        client, mess_elt, mam_req)
                 except exceptions.DataError:
                     continue
 
-                client.messageProt.onMessage(fwd_message_elt)
+                try:
+                    destinee = jid.JID(fwd_message_elt['to'])
+                except KeyError:
+                    log.warning(_(u'missing "to" attribute in forwarded message'))
+                    destinee = client.jid
+                if destinee.userhostJID() == client.jid.userhostJID():
+                    # message to use, we insert the forwarded message in the normal
+                    # workflow
+                    client.xmlstream.dispatch(fwd_message_elt)
+                else:
+                    # this message should be from us, we just add it to history
+                    try:
+                        from_jid = jid.JID(fwd_message_elt['from'])
+                    except KeyError:
+                        log.warning(_(u'missing "from" attribute in forwarded message'))
+                        from_jid = client.jid
+                    if from_jid.userhostJID() != client.jid.userhostJID():
+                        log.warning(_(
+                            u'was expecting a message sent by our jid, but this one if '
+                            u'from {from_jid}, ignoring\n{xml}').format(
+                                from_jid=from_jid.full(), xml=mess_elt.toXml()))
+                        continue
+                    # adding message to history
+                    mess_data = client.messageProt.parseMessage(fwd_message_elt)
+                    yield client.messageProt.addToHistory(mess_data)
 
         if not count:
             log.info(_(u"We have received no message while offline"))
@@ -110,7 +143,7 @@
                 .format(num_mess=count))
 
     def getHandler(self, client):
-        mam_client = client._mam = SatMAMClient()
+        mam_client = client._mam = SatMAMClient(self)
         return mam_client
 
     def parseExtra(self, extra, with_rsm=True):
@@ -198,7 +231,7 @@
         @param mam_req(mam.MAMRequest): request used (needed to get query_id)
         @param service(jid.JID, None): MAM service where the request has been sent
             None if it's user server
-        @return (domish.Element): <message/> that can be used directly with onMessage
+        @return domish.Element): <message/> that can be used directly with onMessage
         """
         if mess_elt.name != u"message":
             log.warning(u"unexpected stanza in archive: {xml}".format(
@@ -376,10 +409,45 @@
         # http://xmpp.org/extensions/xep-0313.html#prefs
         return client._mam.setPrefs(service, default, always, never)
 
+    def onMessageStanzaId(self, message_elt, client):
+        """Called when a message with a stanza-id is received
+
+        the messages' stanza ids are stored when received, so the last one can be used
+        to retrieve missing history on next connection
+        @param message_elt(domish.Element): <message> with a stanza-id
+        """
+        service_jid = client.jid.userhostJID()
+        stanza_id = self._sid.getStanzaId(message_elt, service_jid)
+        if stanza_id is None:
+            log.debug(u"Ignoring <message>, stanza id is not from our server")
+        else:
+            # we use self._last_stanza_id_d do be sure that last_stanza_id is stored in
+            # the order of reception
+            self._last_stanza_id_d.addCallback(
+                lambda __: self.host.memory.storage.setPrivateValue(
+                    namespace=mam.NS_MAM,
+                    key=KEY_LAST_STANZA_ID,
+                    value=stanza_id,
+                    profile=client.profile))
+
 
 class SatMAMClient(mam.MAMClient):
     implements(disco.IDisco)
 
+    def __init__(self, plugin_parent):
+        self.plugin_parent = plugin_parent
+
+    @property
+    def host(self):
+        return self.parent.host_app
+
+    def connectionInitialized(self):
+        observer_xpath = MESSAGE_STANZA_ID.format(
+            ns_stanza_id=self.host.ns_map[u'stanza_id'])
+        self.xmlstream.addObserver(
+            observer_xpath, self.plugin_parent.onMessageStanzaId, client=self.parent
+        )
+
     def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
         return [disco.DiscoFeature(mam.NS_MAM)]