Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0198.py @ 3011:93da7c6f8e0c
plugin XEP-0198: retrieve missing messages + send buffered ones on hot reconnection:
Missing one-to-one messages are now retrieved with MAM (XEP-0313) on hot reconnection, and
buffered stanzas (which have most probably not been received by the server) are resent at the
end of the reconnection workflow. IQ results are not resent on hot reconnection, as they no
longer make sense in a new session.
fix 330
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 17 Jul 2019 09:28:35 +0200 |
parents | c8c68a3b0a79 |
children | c9f03b1eb64d |
comparison
equal
deleted
inserted
replaced
3010:e6806aaab16d | 3011:93da7c6f8e0c |
---|---|
39 C.PI_IMPORT_NAME: u"XEP-0198", | 39 C.PI_IMPORT_NAME: u"XEP-0198", |
40 C.PI_TYPE: u"XEP", | 40 C.PI_TYPE: u"XEP", |
41 C.PI_MODES: C.PLUG_MODE_BOTH, | 41 C.PI_MODES: C.PLUG_MODE_BOTH, |
42 C.PI_PROTOCOLS: [u"XEP-0198"], | 42 C.PI_PROTOCOLS: [u"XEP-0198"], |
43 C.PI_DEPENDENCIES: [], | 43 C.PI_DEPENDENCIES: [], |
44 C.PI_RECOMMENDATIONS: [u"XEP-0045"], | 44 C.PI_RECOMMENDATIONS: [u"XEP-0045", u"XEP-0313"], |
45 C.PI_MAIN: u"XEP_0198", | 45 C.PI_MAIN: u"XEP_0198", |
46 C.PI_HANDLER: u"yes", | 46 C.PI_HANDLER: u"yes", |
47 C.PI_DESCRIPTION: _(u"""Implementation of Stream Management"""), | 47 C.PI_DESCRIPTION: _(u"""Implementation of Stream Management"""), |
48 } | 48 } |
49 | 49 |
122 log.error(u"req_timer has been called/cancelled but not reset") | 122 log.error(u"req_timer has been called/cancelled but not reset") |
123 else: | 123 else: |
124 self.req_timer.cancel() | 124 self.req_timer.cancel() |
125 self.req_timer = None | 125 self.req_timer = None |
126 | 126 |
127 def getBufferCopy(self): | |
128 return list(self.buffer) | |
129 | |
127 | 130 |
128 class XEP_0198(object): | 131 class XEP_0198(object): |
129 # FIXME: location is not handled yet | 132 # FIXME: location is not handled yet |
130 | 133 |
131 def __init__(self, host): | 134 def __init__(self, host): |
244 diff = server_acked - session.buffer_idx | 247 diff = server_acked - session.buffer_idx |
245 for i in xrange(diff): | 248 for i in xrange(diff): |
246 session.buffer.pop() | 249 session.buffer.pop() |
247 session.buffer_idx += diff | 250 session.buffer_idx += diff |
248 | 251 |
252 def replayBuffer(self, client, buffer_, discard_results=False): | |
253 """Resend all stanza in buffer | |
254 | |
255 @param buffer_(collection.deque, list): buffer to replay | |
256 the buffer will be cleared by this method | |
257 @param discard_results(bool): if True, don't replay IQ result stanzas | |
258 """ | |
259 while True: | |
260 try: | |
261 stanza = buffer_.pop() | |
262 except IndexError: | |
263 break | |
264 else: | |
265 if ((discard_results | |
266 and stanza.name == u'iq' | |
267 and stanza.getAttribute(u'type') == 'result')): | |
268 continue | |
269 client.send(stanza) | |
270 | |
249 def sendAck(self, client): | 271 def sendAck(self, client): |
250 """Send an answer element with current IN counter""" | 272 """Send an answer element with current IN counter""" |
251 a_elt = domish.Element((NS_SM, 'a')) | 273 a_elt = domish.Element((NS_SM, 'a')) |
252 a_elt['h'] = unicode(client._xep_0198_session.in_counter) | 274 a_elt['h'] = unicode(client._xep_0198_session.in_counter) |
253 client.send(a_elt) | 275 client.send(a_elt) |
343 del session.resuming | 365 del session.resuming |
344 server_acked = int(enabled_elt['h']) | 366 server_acked = int(enabled_elt['h']) |
345 self.updateBuffer(session, server_acked) | 367 self.updateBuffer(session, server_acked) |
346 resend_count = len(session.buffer) | 368 resend_count = len(session.buffer) |
347 # we resend all stanza which have not been received properly | 369 # we resend all stanza which have not been received properly |
348 while True: | 370 self.replayBuffer(client, session.buffer) |
349 try: | |
350 stanza = session.buffer.pop() | |
351 except IndexError: | |
352 break | |
353 else: | |
354 client.send(stanza) | |
355 # now we can continue the session | 371 # now we can continue the session |
356 session.enabled = True | 372 session.enabled = True |
357 d_time = time.time() - session.disconnected_time | 373 d_time = time.time() - session.disconnected_time |
358 log.info(_(u"Stream session resumed (disconnected for {d_time} s, {count} " | 374 log.info(_(u"Stream session resumed (disconnected for {d_time} s, {count} " |
359 u"stanza(s) resent)").format(d_time=int(d_time), count=resend_count)) | 375 u"stanza(s) resent)").format(d_time=int(d_time), count=resend_count)) |
360 | 376 |
361 def onFailed(self, failed_elt, client): | 377 def onFailed(self, failed_elt, client): |
362 session = client._xep_0198_session | 378 session = client._xep_0198_session |
363 condition_elt = failed_elt.firstChildElement() | 379 condition_elt = failed_elt.firstChildElement() |
380 buffer_ = session.getBufferCopy() | |
364 session.reset() | 381 session.reset() |
365 | 382 |
366 try: | 383 try: |
367 del session.resuming | 384 del session.resuming |
368 except AttributeError: | 385 except AttributeError: |
394 if client.conn_deferred.called: | 411 if client.conn_deferred.called: |
395 client.conn_deferred = defer.Deferred() | 412 client.conn_deferred = defer.Deferred() |
396 else: | 413 else: |
397 log.error(u"conn_deferred should be called at this point") | 414 log.error(u"conn_deferred should be called at this point") |
398 plg_0045 = self.host.plugins.get(u'XEP-0045') | 415 plg_0045 = self.host.plugins.get(u'XEP-0045') |
416 plg_0313 = self.host.plugins.get(u'XEP-0313') | |
417 | |
418 # FIXME: we should call all loaded plugins with generic callbacks | |
419 # (e.g. prepareResume and resume), so a hot resuming can be done | |
420 # properly for all plugins. | |
421 | |
399 if plg_0045 is not None: | 422 if plg_0045 is not None: |
400 # we have to remove joined rooms | 423 # we have to remove joined rooms |
401 muc_join_args = plg_0045.popRooms(client) | 424 muc_join_args = plg_0045.popRooms(client) |
402 # we need to recreate roster | 425 # we need to recreate roster |
403 client.handlers.remove(client.roster) | 426 client.handlers.remove(client.roster) |
413 d.addCallback(lambda __: self._XMLInitTrigger(client)) | 436 d.addCallback(lambda __: self._XMLInitTrigger(client)) |
414 # then we have to re-request the roster, as changes may have occured | 437 # then we have to re-request the roster, as changes may have occured |
415 d.addCallback(lambda __: client.roster.requestRoster()) | 438 d.addCallback(lambda __: client.roster.requestRoster()) |
416 # we add got_roster to be sure to have roster before sending initial presence | 439 # we add got_roster to be sure to have roster before sending initial presence |
417 d.addCallback(lambda __: client.roster.got_roster) | 440 d.addCallback(lambda __: client.roster.got_roster) |
441 if plg_0313 is not None: | |
442 # we retrieve one2one MAM archives | |
443 d.addCallback(lambda __: plg_0313.resume(client)) | |
418 # initial presence must be sent manually | 444 # initial presence must be sent manually |
419 d.addCallback(lambda __: client.presence.available()) | 445 d.addCallback(lambda __: client.presence.available()) |
420 if plg_0045 is not None: | 446 if plg_0045 is not None: |
447 # we re-join MUC rooms | |
421 muc_d_list = defer.DeferredList( | 448 muc_d_list = defer.DeferredList( |
422 [plg_0045.join(*args) for args in muc_join_args]) | 449 [plg_0045.join(*args) for args in muc_join_args]) |
423 d.addCallback(lambda __: muc_d_list) | 450 d.addCallback(lambda __: muc_d_list) |
451 # at the end we replay the buffer, as those stanzas have probably not | |
452 # been received | |
453 d.addCallback(lambda __: self.replayBuffer(client, buffer_, | |
454 discard_results=True)) | |
424 | 455 |
425 def onReceive(self, element, client): | 456 def onReceive(self, element, client): |
426 if not client.is_component: | 457 if not client.is_component: |
427 session = client._xep_0198_session | 458 session = client._xep_0198_session |
428 if session.enabled and element.name.lower() in C.STANZA_NAMES: | 459 if session.enabled and element.name.lower() in C.STANZA_NAMES: |