comparison sat/plugins/plugin_xep_0198.py @ 3011:93da7c6f8e0c

plugin XEP-0198: retrieve missing messages + send buffered ones on hot reconnection: Missing one2one messages are now retrieved with MAM on hot reconnection, and buffered ones (which have most probably not been received by the server) are resent at the end of the reconnection workflow. IQ results are not re-sent on hot reconnection, as they don't make sense anymore with a new session. fix 330
author Goffi <goffi@goffi.org>
date Wed, 17 Jul 2019 09:28:35 +0200
parents c8c68a3b0a79
children c9f03b1eb64d
comparison
equal deleted inserted replaced
3010:e6806aaab16d 3011:93da7c6f8e0c
39 C.PI_IMPORT_NAME: u"XEP-0198", 39 C.PI_IMPORT_NAME: u"XEP-0198",
40 C.PI_TYPE: u"XEP", 40 C.PI_TYPE: u"XEP",
41 C.PI_MODES: C.PLUG_MODE_BOTH, 41 C.PI_MODES: C.PLUG_MODE_BOTH,
42 C.PI_PROTOCOLS: [u"XEP-0198"], 42 C.PI_PROTOCOLS: [u"XEP-0198"],
43 C.PI_DEPENDENCIES: [], 43 C.PI_DEPENDENCIES: [],
44 C.PI_RECOMMENDATIONS: [u"XEP-0045"], 44 C.PI_RECOMMENDATIONS: [u"XEP-0045", u"XEP-0313"],
45 C.PI_MAIN: u"XEP_0198", 45 C.PI_MAIN: u"XEP_0198",
46 C.PI_HANDLER: u"yes", 46 C.PI_HANDLER: u"yes",
47 C.PI_DESCRIPTION: _(u"""Implementation of Stream Management"""), 47 C.PI_DESCRIPTION: _(u"""Implementation of Stream Management"""),
48 } 48 }
49 49
122 log.error(u"req_timer has been called/cancelled but not reset") 122 log.error(u"req_timer has been called/cancelled but not reset")
123 else: 123 else:
124 self.req_timer.cancel() 124 self.req_timer.cancel()
125 self.req_timer = None 125 self.req_timer = None
126 126
127 def getBufferCopy(self):
128 return list(self.buffer)
129
127 130
128 class XEP_0198(object): 131 class XEP_0198(object):
129 # FIXME: location is not handled yet 132 # FIXME: location is not handled yet
130 133
131 def __init__(self, host): 134 def __init__(self, host):
244 diff = server_acked - session.buffer_idx 247 diff = server_acked - session.buffer_idx
245 for i in xrange(diff): 248 for i in xrange(diff):
246 session.buffer.pop() 249 session.buffer.pop()
247 session.buffer_idx += diff 250 session.buffer_idx += diff
248 251
252 def replayBuffer(self, client, buffer_, discard_results=False):
253 """Resend all stanza in buffer
254
255 @param buffer_(collection.deque, list): buffer to replay
256 the buffer will be cleared by this method
257 @param discard_results(bool): if True, don't replay IQ result stanzas
258 """
259 while True:
260 try:
261 stanza = buffer_.pop()
262 except IndexError:
263 break
264 else:
265 if ((discard_results
266 and stanza.name == u'iq'
267 and stanza.getAttribute(u'type') == 'result')):
268 continue
269 client.send(stanza)
270
249 def sendAck(self, client): 271 def sendAck(self, client):
250 """Send an answer element with current IN counter""" 272 """Send an answer element with current IN counter"""
251 a_elt = domish.Element((NS_SM, 'a')) 273 a_elt = domish.Element((NS_SM, 'a'))
252 a_elt['h'] = unicode(client._xep_0198_session.in_counter) 274 a_elt['h'] = unicode(client._xep_0198_session.in_counter)
253 client.send(a_elt) 275 client.send(a_elt)
343 del session.resuming 365 del session.resuming
344 server_acked = int(enabled_elt['h']) 366 server_acked = int(enabled_elt['h'])
345 self.updateBuffer(session, server_acked) 367 self.updateBuffer(session, server_acked)
346 resend_count = len(session.buffer) 368 resend_count = len(session.buffer)
347 # we resend all stanza which have not been received properly 369 # we resend all stanza which have not been received properly
348 while True: 370 self.replayBuffer(client, session.buffer)
349 try:
350 stanza = session.buffer.pop()
351 except IndexError:
352 break
353 else:
354 client.send(stanza)
355 # now we can continue the session 371 # now we can continue the session
356 session.enabled = True 372 session.enabled = True
357 d_time = time.time() - session.disconnected_time 373 d_time = time.time() - session.disconnected_time
358 log.info(_(u"Stream session resumed (disconnected for {d_time} s, {count} " 374 log.info(_(u"Stream session resumed (disconnected for {d_time} s, {count} "
359 u"stanza(s) resent)").format(d_time=int(d_time), count=resend_count)) 375 u"stanza(s) resent)").format(d_time=int(d_time), count=resend_count))
360 376
361 def onFailed(self, failed_elt, client): 377 def onFailed(self, failed_elt, client):
362 session = client._xep_0198_session 378 session = client._xep_0198_session
363 condition_elt = failed_elt.firstChildElement() 379 condition_elt = failed_elt.firstChildElement()
380 buffer_ = session.getBufferCopy()
364 session.reset() 381 session.reset()
365 382
366 try: 383 try:
367 del session.resuming 384 del session.resuming
368 except AttributeError: 385 except AttributeError:
394 if client.conn_deferred.called: 411 if client.conn_deferred.called:
395 client.conn_deferred = defer.Deferred() 412 client.conn_deferred = defer.Deferred()
396 else: 413 else:
397 log.error(u"conn_deferred should be called at this point") 414 log.error(u"conn_deferred should be called at this point")
398 plg_0045 = self.host.plugins.get(u'XEP-0045') 415 plg_0045 = self.host.plugins.get(u'XEP-0045')
416 plg_0313 = self.host.plugins.get(u'XEP-0313')
417
418 # FIXME: we should call all loaded plugins with generic callbacks
419 # (e.g. prepareResume and resume), so a hot resuming can be done
420 # properly for all plugins.
421
399 if plg_0045 is not None: 422 if plg_0045 is not None:
400 # we have to remove joined rooms 423 # we have to remove joined rooms
401 muc_join_args = plg_0045.popRooms(client) 424 muc_join_args = plg_0045.popRooms(client)
402 # we need to recreate roster 425 # we need to recreate roster
403 client.handlers.remove(client.roster) 426 client.handlers.remove(client.roster)
413 d.addCallback(lambda __: self._XMLInitTrigger(client)) 436 d.addCallback(lambda __: self._XMLInitTrigger(client))
414 # then we have to re-request the roster, as changes may have occured 437 # then we have to re-request the roster, as changes may have occured
415 d.addCallback(lambda __: client.roster.requestRoster()) 438 d.addCallback(lambda __: client.roster.requestRoster())
416 # we add got_roster to be sure to have roster before sending initial presence 439 # we add got_roster to be sure to have roster before sending initial presence
417 d.addCallback(lambda __: client.roster.got_roster) 440 d.addCallback(lambda __: client.roster.got_roster)
441 if plg_0313 is not None:
442 # we retrieve one2one MAM archives
443 d.addCallback(lambda __: plg_0313.resume(client))
418 # initial presence must be sent manually 444 # initial presence must be sent manually
419 d.addCallback(lambda __: client.presence.available()) 445 d.addCallback(lambda __: client.presence.available())
420 if plg_0045 is not None: 446 if plg_0045 is not None:
447 # we re-join MUC rooms
421 muc_d_list = defer.DeferredList( 448 muc_d_list = defer.DeferredList(
422 [plg_0045.join(*args) for args in muc_join_args]) 449 [plg_0045.join(*args) for args in muc_join_args])
423 d.addCallback(lambda __: muc_d_list) 450 d.addCallback(lambda __: muc_d_list)
451 # at the end we replay the buffer, as those stanzas have probably not
452 # been received
453 d.addCallback(lambda __: self.replayBuffer(client, buffer_,
454 discard_results=True))
424 455
425 def onReceive(self, element, client): 456 def onReceive(self, element, client):
426 if not client.is_component: 457 if not client.is_component:
427 session = client._xep_0198_session 458 session = client._xep_0198_session
428 if session.enabled and element.name.lower() in C.STANZA_NAMES: 459 if session.enabled and element.name.lower() in C.STANZA_NAMES: