# HG changeset patch # User Goffi # Date 1563348515 -7200 # Node ID 93da7c6f8e0ceb14a0cd5a1f621dc8b33b24b66d # Parent e6806aaab16d9e646153a675e94142a2ce37b555 plugin XEP-0198: retrieve missing messages + send buffered ones on hot reconnection: Missing one2one messages are now retrieved with MAM on hot reconnection, and buffered ones (which have most probably not been received by the server) are resent at the end of the reconnection workflow. IQ results are not re-sent on hot reconnection, as they don't make sense anymore with a new session. fix 330 diff -r e6806aaab16d -r 93da7c6f8e0c sat/plugins/plugin_xep_0198.py --- a/sat/plugins/plugin_xep_0198.py Wed Jul 17 09:28:34 2019 +0200 +++ b/sat/plugins/plugin_xep_0198.py Wed Jul 17 09:28:35 2019 +0200 @@ -41,7 +41,7 @@ C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: [u"XEP-0198"], C.PI_DEPENDENCIES: [], - C.PI_RECOMMENDATIONS: [u"XEP-0045"], + C.PI_RECOMMENDATIONS: [u"XEP-0045", u"XEP-0313"], C.PI_MAIN: u"XEP_0198", C.PI_HANDLER: u"yes", C.PI_DESCRIPTION: _(u"""Implementation of Stream Management"""), @@ -124,6 +124,9 @@ self.req_timer.cancel() self.req_timer = None + def getBufferCopy(self): + return list(self.buffer) + class XEP_0198(object): # FIXME: location is not handled yet @@ -246,6 +249,25 @@ session.buffer.pop() session.buffer_idx += diff + def replayBuffer(self, client, buffer_, discard_results=False): + """Resend all stanza in buffer + + @param buffer_(collection.deque, list): buffer to replay + the buffer will be cleared by this method + @param discard_results(bool): if True, don't replay IQ result stanzas + """ + while True: + try: + stanza = buffer_.pop() + except IndexError: + break + else: + if ((discard_results + and stanza.name == u'iq' + and stanza.getAttribute(u'type') == 'result')): + continue + client.send(stanza) + def sendAck(self, client): """Send an answer element with current IN counter""" a_elt = domish.Element((NS_SM, 'a')) @@ -345,13 +367,7 @@ self.updateBuffer(session, server_acked) resend_count = len(session.buffer) # we resend all stanza which have not been received properly - while True: - try: - stanza = session.buffer.pop() - except IndexError: - break - else: - client.send(stanza) + self.replayBuffer(client, session.buffer) # now we can continue the session session.enabled = True d_time = time.time() - session.disconnected_time @@ -361,6 +377,7 @@ def onFailed(self, failed_elt, client): session = client._xep_0198_session condition_elt = failed_elt.firstChildElement() + buffer_ = session.getBufferCopy() session.reset() try: @@ -396,6 +413,12 @@ else: log.error(u"conn_deferred should be called at this point") plg_0045 = self.host.plugins.get(u'XEP-0045') + plg_0313 = self.host.plugins.get(u'XEP-0313') + + # FIXME: we should call all loaded plugins with generic callbacks + # (e.g. prepareResume and resume), so a hot resuming can be done + # properly for all plugins. + if plg_0045 is not None: # we have to remove joined rooms muc_join_args = plg_0045.popRooms(client) @@ -415,12 +438,20 @@ d.addCallback(lambda __: client.roster.requestRoster()) # we add got_roster to be sure to have roster before sending initial presence d.addCallback(lambda __: client.roster.got_roster) + if plg_0313 is not None: + # we retrieve one2one MAM archives + d.addCallback(lambda __: plg_0313.resume(client)) # initial presence must be sent manually d.addCallback(lambda __: client.presence.available()) if plg_0045 is not None: + # we re-join MUC rooms muc_d_list = defer.DeferredList( [plg_0045.join(*args) for args in muc_join_args]) d.addCallback(lambda __: muc_d_list) + # at the end we replay the buffer, as those stanzas have probably not + # been received + d.addCallback(lambda __: self.replayBuffer(client, buffer_, + discard_results=True)) def onReceive(self, element, client): if not client.is_component: diff -r e6806aaab16d -r 93da7c6f8e0c sat/plugins/plugin_xep_0313.py --- a/sat/plugins/plugin_xep_0313.py Wed Jul 17 09:28:34 2019 +0200 +++ b/sat/plugins/plugin_xep_0313.py Wed Jul 17 09:28:35 2019 +0200 @@ -73,7 +73,8 @@ async=True) @defer.inlineCallbacks - def profileConnected(self, client): + def resume(self, client): + """Retrieve one2one messages received since the last we have in local storage""" stanza_id_data = yield self.host.memory.storage.getPrivates( mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile) stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID) @@ -142,6 +143,9 @@ log.info(_(u"We have received {num_mess} message(s) while offline.") .format(num_mess=count)) + def profileConnected(self, client): + return self.resume(client) + def getHandler(self, client): mam_client = client._mam = SatMAMClient(self) return mam_client