changeset 3011:93da7c6f8e0c

plugin XEP-0198: retrieve missing messages + send buffered ones on hot reconnection: Missing one2one messages are now retrieved with MAM on hot reconnection, and buffered ones (which have most probably not been received by the server) are resent at the end of the reconnection workflow. IQ results are not re-sent on hot reconnection, as they don't make sense anymore with a new session. fix 330
author Goffi <goffi@goffi.org>
date Wed, 17 Jul 2019 09:28:35 +0200
parents e6806aaab16d
children 2224fbbd45dd
files sat/plugins/plugin_xep_0198.py sat/plugins/plugin_xep_0313.py
diffstat 2 files changed, 44 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0198.py	Wed Jul 17 09:28:34 2019 +0200
+++ b/sat/plugins/plugin_xep_0198.py	Wed Jul 17 09:28:35 2019 +0200
@@ -41,7 +41,7 @@
     C.PI_MODES: C.PLUG_MODE_BOTH,
     C.PI_PROTOCOLS: [u"XEP-0198"],
     C.PI_DEPENDENCIES: [],
-    C.PI_RECOMMENDATIONS: [u"XEP-0045"],
+    C.PI_RECOMMENDATIONS: [u"XEP-0045", u"XEP-0313"],
     C.PI_MAIN: u"XEP_0198",
     C.PI_HANDLER: u"yes",
     C.PI_DESCRIPTION: _(u"""Implementation of Stream Management"""),
@@ -124,6 +124,9 @@
                 self.req_timer.cancel()
             self.req_timer = None
 
+    def getBufferCopy(self):
+        return list(self.buffer)
+
 
 class XEP_0198(object):
     # FIXME: location is not handled yet
@@ -246,6 +249,25 @@
                 session.buffer.pop()
             session.buffer_idx += diff
 
+    def replayBuffer(self, client, buffer_, discard_results=False):
+        """Resend all stanza in buffer
+
+        @param buffer_(collection.deque, list): buffer to replay
+            the buffer will be cleared by this method
+        @param discard_results(bool): if True, don't replay IQ result stanzas
+        """
+        while True:
+            try:
+                stanza = buffer_.pop()
+            except IndexError:
+                break
+            else:
+                if ((discard_results
+                     and stanza.name == u'iq'
+                     and stanza.getAttribute(u'type') == 'result')):
+                    continue
+                client.send(stanza)
+
     def sendAck(self, client):
         """Send an answer element with current IN counter"""
         a_elt = domish.Element((NS_SM, 'a'))
@@ -345,13 +367,7 @@
         self.updateBuffer(session, server_acked)
         resend_count = len(session.buffer)
         # we resend all stanza which have not been received properly
-        while True:
-            try:
-                stanza = session.buffer.pop()
-            except IndexError:
-                break
-            else:
-                client.send(stanza)
+        self.replayBuffer(client, session.buffer)
         # now we can continue the session
         session.enabled = True
         d_time = time.time() - session.disconnected_time
@@ -361,6 +377,7 @@
     def onFailed(self, failed_elt, client):
         session = client._xep_0198_session
         condition_elt = failed_elt.firstChildElement()
+        buffer_ = session.getBufferCopy()
         session.reset()
 
         try:
@@ -396,6 +413,12 @@
             else:
                 log.error(u"conn_deferred should be called at this point")
             plg_0045 = self.host.plugins.get(u'XEP-0045')
+            plg_0313 = self.host.plugins.get(u'XEP-0313')
+
+            # FIXME: we should call all loaded plugins with generic callbacks
+            #        (e.g. prepareResume and resume), so a hot resuming can be done
+            #        properly for all plugins.
+
             if plg_0045 is not None:
                 # we have to remove joined rooms
                 muc_join_args = plg_0045.popRooms(client)
@@ -415,12 +438,20 @@
             d.addCallback(lambda __: client.roster.requestRoster())
             # we add got_roster to be sure to have roster before sending initial presence
             d.addCallback(lambda __: client.roster.got_roster)
+            if plg_0313 is not None:
+                # we retrieve one2one MAM archives
+                d.addCallback(lambda __: plg_0313.resume(client))
             # initial presence must be sent manually
             d.addCallback(lambda __: client.presence.available())
             if plg_0045 is not None:
+                # we re-join MUC rooms
                 muc_d_list = defer.DeferredList(
                     [plg_0045.join(*args) for args in muc_join_args])
                 d.addCallback(lambda __: muc_d_list)
+            # at the end we replay the buffer, as those stanzas have probably not
+            # been received
+            d.addCallback(lambda __: self.replayBuffer(client, buffer_,
+                                                       discard_results=True))
 
     def onReceive(self, element, client):
         if not client.is_component:
--- a/sat/plugins/plugin_xep_0313.py	Wed Jul 17 09:28:34 2019 +0200
+++ b/sat/plugins/plugin_xep_0313.py	Wed Jul 17 09:28:35 2019 +0200
@@ -73,7 +73,8 @@
             async=True)
 
     @defer.inlineCallbacks
-    def profileConnected(self, client):
+    def resume(self, client):
+        """Retrieve one2one messages received since the last we have in local storage"""
         stanza_id_data = yield self.host.memory.storage.getPrivates(
             mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile)
         stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID)
@@ -142,6 +143,9 @@
             log.info(_(u"We have received {num_mess} message(s) while offline.")
                 .format(num_mess=count))
 
+    def profileConnected(self, client):
+        return self.resume(client)
+
     def getHandler(self, client):
         mam_client = client._mam = SatMAMClient(self)
         return mam_client