Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0198.py @ 3028:ab2696e34d29
Python 3 port:
/!\ this is a huge commit
/!\ starting from this commit, SàT needs Python 3.6+
/!\ SàT may be unstable or some features may not work anymore, this will improve with time
This patch ports the backend, bridge and frontends to Python 3.
Roughly this has been done this way:
- the 2to3 tool has been applied (with Python 3.7)
- all references to python2 have been replaced with python3 (notably shebangs)
- fixed files not handled by 2to3 (notably the shell script)
- several manual fixes
- fixed issues reported by Python 3 that were not handled in Python 2
- replaced "async" with "async_" when needed (it's a reserved word since Python 3.7)
- replaced zope's "implements" with @implementer decorator
- temporary hack to handle data pickled in database, as str or bytes may be returned,
to be checked later
- fixed hash comparison for password
- removed some code which is not needed anymore with Python 3
- deactivated some code which needs to be checked (notably certificate validation)
- tested with jp, fixed reported issues until some basic commands worked
- ported Primitivus (after porting dependencies like urwid satext)
- more manual fixes
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 13 Aug 2019 19:08:41 +0200 |
parents | af9d71303605 |
children | 87b8808ac49d |
comparison
equal
deleted
inserted
replaced
3027:ff5bcb12ae60 | 3028:ab2696e34d29 |
---|---|
1 #!/usr/bin/env python2 | 1 #!/usr/bin/env python3 |
2 # -*- coding: utf-8 -*- | 2 # -*- coding: utf-8 -*- |
3 | 3 |
4 # SàT plugin for managing raw XML log | 4 # SàT plugin for managing raw XML log |
5 # Copyright (C) 2011 Jérôme Poisson (goffi@goffi.org) | 5 # Copyright (C) 2011 Jérôme Poisson (goffi@goffi.org) |
6 | 6 |
26 from twisted.words.xish import domish | 26 from twisted.words.xish import domish |
27 from twisted.internet import defer | 27 from twisted.internet import defer |
28 from twisted.internet import task, reactor | 28 from twisted.internet import task, reactor |
29 from functools import partial | 29 from functools import partial |
30 from wokkel import disco, iwokkel | 30 from wokkel import disco, iwokkel |
31 from zope.interface import implements | 31 from zope.interface import implementer |
32 import collections | 32 import collections |
33 import time | 33 import time |
34 | 34 |
35 log = getLogger(__name__) | 35 log = getLogger(__name__) |
36 | 36 |
37 PLUGIN_INFO = { | 37 PLUGIN_INFO = { |
38 C.PI_NAME: u"Stream Management", | 38 C.PI_NAME: "Stream Management", |
39 C.PI_IMPORT_NAME: u"XEP-0198", | 39 C.PI_IMPORT_NAME: "XEP-0198", |
40 C.PI_TYPE: u"XEP", | 40 C.PI_TYPE: "XEP", |
41 C.PI_MODES: C.PLUG_MODE_BOTH, | 41 C.PI_MODES: C.PLUG_MODE_BOTH, |
42 C.PI_PROTOCOLS: [u"XEP-0198"], | 42 C.PI_PROTOCOLS: ["XEP-0198"], |
43 C.PI_DEPENDENCIES: [], | 43 C.PI_DEPENDENCIES: [], |
44 C.PI_RECOMMENDATIONS: [u"XEP-0045", u"XEP-0313"], | 44 C.PI_RECOMMENDATIONS: ["XEP-0045", "XEP-0313"], |
45 C.PI_MAIN: u"XEP_0198", | 45 C.PI_MAIN: "XEP_0198", |
46 C.PI_HANDLER: u"yes", | 46 C.PI_HANDLER: "yes", |
47 C.PI_DESCRIPTION: _(u"""Implementation of Stream Management"""), | 47 C.PI_DESCRIPTION: _("""Implementation of Stream Management"""), |
48 } | 48 } |
49 | 49 |
50 NS_SM = u"urn:xmpp:sm:3" | 50 NS_SM = "urn:xmpp:sm:3" |
51 SM_ENABLED = '/enabled[@xmlns="' + NS_SM + '"]' | 51 SM_ENABLED = '/enabled[@xmlns="' + NS_SM + '"]' |
52 SM_RESUMED = '/resumed[@xmlns="' + NS_SM + '"]' | 52 SM_RESUMED = '/resumed[@xmlns="' + NS_SM + '"]' |
53 SM_FAILED = '/failed[@xmlns="' + NS_SM + '"]' | 53 SM_FAILED = '/failed[@xmlns="' + NS_SM + '"]' |
54 SM_R_REQUEST = '/r[@xmlns="' + NS_SM + '"]' | 54 SM_R_REQUEST = '/r[@xmlns="' + NS_SM + '"]' |
55 SM_A_REQUEST = '/a[@xmlns="' + NS_SM + '"]' | 55 SM_A_REQUEST = '/a[@xmlns="' + NS_SM + '"]' |
92 @enabled.setter | 92 @enabled.setter |
93 def enabled(self, enabled): | 93 def enabled(self, enabled): |
94 if enabled: | 94 if enabled: |
95 if self._enabled: | 95 if self._enabled: |
96 raise exceptions.InternalError( | 96 raise exceptions.InternalError( |
97 u"Stream Management can't be enabled twice") | 97 "Stream Management can't be enabled twice") |
98 self._enabled = True | 98 self._enabled = True |
99 callback, kw = self.callback_data | 99 callback, kw = self.callback_data |
100 self.timer = task.LoopingCall(callback, **kw) | 100 self.timer = task.LoopingCall(callback, **kw) |
101 self.timer.start(MAX_DELAY_ACK_R, now=False) | 101 self.timer.start(MAX_DELAY_ACK_R, now=False) |
102 else: | 102 else: |
117 self.session_id = self.location = None | 117 self.session_id = self.location = None |
118 self.ack_requested = False | 118 self.ack_requested = False |
119 self.last_ack_r = 0 | 119 self.last_ack_r = 0 |
120 if self.req_timer is not None: | 120 if self.req_timer is not None: |
121 if self.req_timer.active(): | 121 if self.req_timer.active(): |
122 log.error(u"req_timer has been called/cancelled but not reset") | 122 log.error("req_timer has been called/cancelled but not reset") |
123 else: | 123 else: |
124 self.req_timer.cancel() | 124 self.req_timer.cancel() |
125 self.req_timer = None | 125 self.req_timer = None |
126 | 126 |
127 def getBufferCopy(self): | 127 def getBufferCopy(self): |
132 # FIXME: location is not handled yet | 132 # FIXME: location is not handled yet |
133 | 133 |
134 def __init__(self, host): | 134 def __init__(self, host): |
135 log.info(_("Plugin Stream Management initialization")) | 135 log.info(_("Plugin Stream Management initialization")) |
136 self.host = host | 136 self.host = host |
137 host.registerNamespace(u'sm', NS_SM) | 137 host.registerNamespace('sm', NS_SM) |
138 host.trigger.add("stream_hooks", self.addHooks) | 138 host.trigger.add("stream_hooks", self.addHooks) |
139 host.trigger.add("xml_init", self._XMLInitTrigger) | 139 host.trigger.add("xml_init", self._XMLInitTrigger) |
140 host.trigger.add("disconnecting", self._disconnectingTrigger) | 140 host.trigger.add("disconnecting", self._disconnectingTrigger) |
141 host.trigger.add("disconnected", self._disconnectedTrigger) | 141 host.trigger.add("disconnected", self._disconnectedTrigger) |
142 try: | 142 try: |
143 self._ack_timeout = int(host.memory.getConfig("", "ack_timeout", ACK_TIMEOUT)) | 143 self._ack_timeout = int(host.memory.getConfig("", "ack_timeout", ACK_TIMEOUT)) |
144 except ValueError: | 144 except ValueError: |
145 log.error(_(u"Invalid ack_timeout value, please check your configuration")) | 145 log.error(_("Invalid ack_timeout value, please check your configuration")) |
146 self._ack_timeout = ACK_TIMEOUT | 146 self._ack_timeout = ACK_TIMEOUT |
147 if not self._ack_timeout: | 147 if not self._ack_timeout: |
148 log.info(_(u"Ack timeout disabled")) | 148 log.info(_("Ack timeout disabled")) |
149 else: | 149 else: |
150 log.info(_(u"Ack timeout set to {timeout}s").format( | 150 log.info(_("Ack timeout set to {timeout}s").format( |
151 timeout=self._ack_timeout)) | 151 timeout=self._ack_timeout)) |
152 | 152 |
153 def profileConnecting(self, client): | 153 def profileConnecting(self, client): |
154 client._xep_0198_session = ProfileSessionData(callback=self.checkAcks, | 154 client._xep_0198_session = ProfileSessionData(callback=self.checkAcks, |
155 client=client) | 155 client=client) |
163 send_hooks.append(partial(self.onSend, client=client)) | 163 send_hooks.append(partial(self.onSend, client=client)) |
164 return True | 164 return True |
165 | 165 |
166 def _XMLInitTrigger(self, client): | 166 def _XMLInitTrigger(self, client): |
167 """Enable or resume a stream mangement""" | 167 """Enable or resume a stream mangement""" |
168 if not (NS_SM, u'sm') in client.xmlstream.features: | 168 if not (NS_SM, 'sm') in client.xmlstream.features: |
169 log.warning(_( | 169 log.warning(_( |
170 u"Your server doesn't support stream management ({namespace}), this is " | 170 "Your server doesn't support stream management ({namespace}), this is " |
171 u"used to improve connection problems detection (like network outages). " | 171 "used to improve connection problems detection (like network outages). " |
172 u"Please ask your server administrator to enable this feature.".format( | 172 "Please ask your server administrator to enable this feature.".format( |
173 namespace=NS_SM))) | 173 namespace=NS_SM))) |
174 return True | 174 return True |
175 session = client._xep_0198_session | 175 session = client._xep_0198_session |
176 | 176 |
177 # a disconnect timer from a previous disconnection may still be active | 177 # a disconnect timer from a previous disconnection may still be active |
185 del session.disconnect_timer | 185 del session.disconnect_timer |
186 | 186 |
187 if session.resume_enabled: | 187 if session.resume_enabled: |
188 # we are resuming a session | 188 # we are resuming a session |
189 resume_elt = domish.Element((NS_SM, 'resume')) | 189 resume_elt = domish.Element((NS_SM, 'resume')) |
190 resume_elt['h'] = unicode(session.in_counter) | 190 resume_elt['h'] = str(session.in_counter) |
191 resume_elt['previd'] = session.session_id | 191 resume_elt['previd'] = session.session_id |
192 client.send(resume_elt) | 192 client.send(resume_elt) |
193 session.resuming = True | 193 session.resuming = True |
194 # session.enabled will be set on <resumed/> reception | 194 # session.enabled will be set on <resumed/> reception |
195 return False | 195 return False |
196 else: | 196 else: |
197 # we start a new session | 197 # we start a new session |
198 assert session.out_counter == 0 | 198 assert session.out_counter == 0 |
199 enable_elt = domish.Element((NS_SM, 'enable')) | 199 enable_elt = domish.Element((NS_SM, 'enable')) |
200 enable_elt[u'resume'] = u'true' | 200 enable_elt['resume'] = 'true' |
201 client.send(enable_elt) | 201 client.send(enable_elt) |
202 session.enabled = True | 202 session.enabled = True |
203 return True | 203 return True |
204 | 204 |
205 def _disconnectingTrigger(self, client): | 205 def _disconnectingTrigger(self, client): |
244 def updateBuffer(self, session, server_acked): | 244 def updateBuffer(self, session, server_acked): |
245 """Update buffer and buffer_index""" | 245 """Update buffer and buffer_index""" |
246 if server_acked > session.buffer_idx: | 246 if server_acked > session.buffer_idx: |
247 diff = server_acked - session.buffer_idx | 247 diff = server_acked - session.buffer_idx |
248 try: | 248 try: |
249 for i in xrange(diff): | 249 for i in range(diff): |
250 session.buffer.pop() | 250 session.buffer.pop() |
251 except IndexError: | 251 except IndexError: |
252 log.error( | 252 log.error( |
253 u"error while cleaning buffer, invalid index (buffer is empty):\n" | 253 "error while cleaning buffer, invalid index (buffer is empty):\n" |
254 u"diff = {diff}\n" | 254 "diff = {diff}\n" |
255 u"server_acked = {server_acked}\n" | 255 "server_acked = {server_acked}\n" |
256 u"buffer_idx = {buffer_id}".format( | 256 "buffer_idx = {buffer_id}".format( |
257 diff=diff, server_acked=server_acked, | 257 diff=diff, server_acked=server_acked, |
258 buffer_id=session.buffer_idx)) | 258 buffer_id=session.buffer_idx)) |
259 session.buffer_idx += diff | 259 session.buffer_idx += diff |
260 | 260 |
261 def replayBuffer(self, client, buffer_, discard_results=False): | 261 def replayBuffer(self, client, buffer_, discard_results=False): |
270 stanza = buffer_.pop() | 270 stanza = buffer_.pop() |
271 except IndexError: | 271 except IndexError: |
272 break | 272 break |
273 else: | 273 else: |
274 if ((discard_results | 274 if ((discard_results |
275 and stanza.name == u'iq' | 275 and stanza.name == 'iq' |
276 and stanza.getAttribute(u'type') == 'result')): | 276 and stanza.getAttribute('type') == 'result')): |
277 continue | 277 continue |
278 client.send(stanza) | 278 client.send(stanza) |
279 | 279 |
280 def sendAck(self, client): | 280 def sendAck(self, client): |
281 """Send an answer element with current IN counter""" | 281 """Send an answer element with current IN counter""" |
282 a_elt = domish.Element((NS_SM, 'a')) | 282 a_elt = domish.Element((NS_SM, 'a')) |
283 a_elt['h'] = unicode(client._xep_0198_session.in_counter) | 283 a_elt['h'] = str(client._xep_0198_session.in_counter) |
284 client.send(a_elt) | 284 client.send(a_elt) |
285 | 285 |
286 def requestAck(self, client): | 286 def requestAck(self, client): |
287 """Send a request element""" | 287 """Send a request element""" |
288 session = client._xep_0198_session | 288 session = client._xep_0198_session |
296 | 296 |
297 def _connectionFailed(self, failure_, connector): | 297 def _connectionFailed(self, failure_, connector): |
298 normal_host, normal_port = connector.normal_location | 298 normal_host, normal_port = connector.normal_location |
299 del connector.normal_location | 299 del connector.normal_location |
300 log.warning(_( | 300 log.warning(_( |
301 u"Connection failed using location given by server (host: {host}, port: " | 301 "Connection failed using location given by server (host: {host}, port: " |
302 u"{port}), switching to normal host and port (host: {normal_host}, port: " | 302 "{port}), switching to normal host and port (host: {normal_host}, port: " |
303 u"{normal_port})".format(host=connector.host, port=connector.port, | 303 "{normal_port})".format(host=connector.host, port=connector.port, |
304 normal_host=normal_host, normal_port=normal_port))) | 304 normal_host=normal_host, normal_port=normal_port))) |
305 connector.host, connector.port = normal_host, normal_port | 305 connector.host, connector.port = normal_host, normal_port |
306 connector.connectionFailed = connector.connectionFailed_ori | 306 connector.connectionFailed = connector.connectionFailed_ori |
307 del connector.connectionFailed_ori | 307 del connector.connectionFailed_ori |
308 return connector.connectionFailed(failure_) | 308 return connector.connectionFailed(failure_) |
310 def onEnabled(self, enabled_elt, client): | 310 def onEnabled(self, enabled_elt, client): |
311 session = client._xep_0198_session | 311 session = client._xep_0198_session |
312 session.in_counter = 0 | 312 session.in_counter = 0 |
313 | 313 |
314 # we check that resuming is possible and that we have a session id | 314 # we check that resuming is possible and that we have a session id |
315 resume = C.bool(enabled_elt.getAttribute(u'resume')) | 315 resume = C.bool(enabled_elt.getAttribute('resume')) |
316 session_id = enabled_elt.getAttribute(u'id') | 316 session_id = enabled_elt.getAttribute('id') |
317 if not session_id: | 317 if not session_id: |
318 log.warning(_(u'Incorrect <enabled/> element received, no "id" attribute')) | 318 log.warning(_('Incorrect <enabled/> element received, no "id" attribute')) |
319 if not resume or not session_id: | 319 if not resume or not session_id: |
320 log.warning(_( | 320 log.warning(_( |
321 u"You're server doesn't support session resuming with stream management, " | 321 "You're server doesn't support session resuming with stream management, " |
322 u"please contact your server administrator to enable it")) | 322 "please contact your server administrator to enable it")) |
323 return | 323 return |
324 | 324 |
325 session.session_id = session_id | 325 session.session_id = session_id |
326 | 326 |
327 # XXX: we disable resource binding, which must not be done | 327 # XXX: we disable resource binding, which must not be done |
328 # when we resume the session. | 328 # when we resume the session. |
329 client.factory.authenticator.res_binding = False | 329 client.factory.authenticator.res_binding = False |
330 | 330 |
331 # location, in case server want resuming session to be elsewhere | 331 # location, in case server want resuming session to be elsewhere |
332 try: | 332 try: |
333 location = enabled_elt[u'location'] | 333 location = enabled_elt['location'] |
334 except KeyError: | 334 except KeyError: |
335 pass | 335 pass |
336 else: | 336 else: |
337 # TODO: handle IPv6 here (in brackets, cf. XEP) | 337 # TODO: handle IPv6 here (in brackets, cf. XEP) |
338 try: | 338 try: |
339 domain, port = location.split(':', 1) | 339 domain, port = location.split(':', 1) |
340 port = int(port) | 340 port = int(port) |
341 except ValueError: | 341 except ValueError: |
342 log.warning(_(u"Invalid location received: {location}") | 342 log.warning(_("Invalid location received: {location}") |
343 .format(location=location)) | 343 .format(location=location)) |
344 else: | 344 else: |
345 session.location = (domain, port) | 345 session.location = (domain, port) |
346 # we monkey patch connector to use the new location | 346 # we monkey patch connector to use the new location |
347 connector = client.xmlstream.transport.connector | 347 connector = client.xmlstream.transport.connector |
352 connector.connectionFailed = partial(self._connectionFailed, | 352 connector.connectionFailed = partial(self._connectionFailed, |
353 connector=connector) | 353 connector=connector) |
354 | 354 |
355 # resuming time | 355 # resuming time |
356 try: | 356 try: |
357 max_s = int(enabled_elt[u'max']) | 357 max_s = int(enabled_elt['max']) |
358 except (ValueError, KeyError) as e: | 358 except (ValueError, KeyError) as e: |
359 if isinstance(e, ValueError): | 359 if isinstance(e, ValueError): |
360 log.warning(_(u'Invalid "max" attribute')) | 360 log.warning(_('Invalid "max" attribute')) |
361 max_s = RESUME_MAX | 361 max_s = RESUME_MAX |
362 log.info(_(u"Using default session max value ({max_s} s).".format( | 362 log.info(_("Using default session max value ({max_s} s).".format( |
363 max_s=max_s))) | 363 max_s=max_s))) |
364 log.info(_(u"Stream Management enabled")) | 364 log.info(_("Stream Management enabled")) |
365 else: | 365 else: |
366 log.info(_( | 366 log.info(_( |
367 u"Stream Management enabled, with a resumption time of {res_m} min" | 367 "Stream Management enabled, with a resumption time of {res_m} min" |
368 .format(res_m = max_s/60))) | 368 .format(res_m = max_s/60))) |
369 session.session_max = max_s | 369 session.session_max = max_s |
370 | 370 |
371 def onResumed(self, enabled_elt, client): | 371 def onResumed(self, enabled_elt, client): |
372 session = client._xep_0198_session | 372 session = client._xep_0198_session |
378 # we resend all stanza which have not been received properly | 378 # we resend all stanza which have not been received properly |
379 self.replayBuffer(client, session.buffer) | 379 self.replayBuffer(client, session.buffer) |
380 # now we can continue the session | 380 # now we can continue the session |
381 session.enabled = True | 381 session.enabled = True |
382 d_time = time.time() - session.disconnected_time | 382 d_time = time.time() - session.disconnected_time |
383 log.info(_(u"Stream session resumed (disconnected for {d_time} s, {count} " | 383 log.info(_("Stream session resumed (disconnected for {d_time} s, {count} " |
384 u"stanza(s) resent)").format(d_time=int(d_time), count=resend_count)) | 384 "stanza(s) resent)").format(d_time=int(d_time), count=resend_count)) |
385 | 385 |
386 def onFailed(self, failed_elt, client): | 386 def onFailed(self, failed_elt, client): |
387 session = client._xep_0198_session | 387 session = client._xep_0198_session |
388 condition_elt = failed_elt.firstChildElement() | 388 condition_elt = failed_elt.firstChildElement() |
389 buffer_ = session.getBufferCopy() | 389 buffer_ = session.getBufferCopy() |
391 | 391 |
392 try: | 392 try: |
393 del session.resuming | 393 del session.resuming |
394 except AttributeError: | 394 except AttributeError: |
395 # stream management can't be started at all | 395 # stream management can't be started at all |
396 msg = _(u"Can't use stream management") | 396 msg = _("Can't use stream management") |
397 if condition_elt is None: | 397 if condition_elt is None: |
398 log.error(msg + u'.') | 398 log.error(msg + '.') |
399 else: | 399 else: |
400 log.error(_(u"{msg}: {reason}").format( | 400 log.error(_("{msg}: {reason}").format( |
401 msg=msg, reason=condition_elt.name)) | 401 msg=msg, reason=condition_elt.name)) |
402 else: | 402 else: |
403 # only stream resumption failed, we can try full session init | 403 # only stream resumption failed, we can try full session init |
404 # XXX: we try to start full session init from this point, with many | 404 # XXX: we try to start full session init from this point, with many |
405 # variables/attributes already initialised with a potentially different | 405 # variables/attributes already initialised with a potentially different |
406 # jid. This is experimental and may not be safe. It may be more | 406 # jid. This is experimental and may not be safe. It may be more |
407 # secured to abord the connection and restart everything with a fresh | 407 # secured to abord the connection and restart everything with a fresh |
408 # client. | 408 # client. |
409 msg = _(u"stream resumption not possible, restarting full session") | 409 msg = _("stream resumption not possible, restarting full session") |
410 | 410 |
411 if condition_elt is None: | 411 if condition_elt is None: |
412 log.warning(u'{msg}.'.format(msg=msg)) | 412 log.warning('{msg}.'.format(msg=msg)) |
413 else: | 413 else: |
414 log.warning(u"{msg}: {reason}".format( | 414 log.warning("{msg}: {reason}".format( |
415 msg=msg, reason=condition_elt.name)) | 415 msg=msg, reason=condition_elt.name)) |
416 # stream resumption failed, but we still can do normal stream management | 416 # stream resumption failed, but we still can do normal stream management |
417 # we restore attributes as if the session was new, and init stream | 417 # we restore attributes as if the session was new, and init stream |
418 # we keep everything initialized, and only do binding, roster request | 418 # we keep everything initialized, and only do binding, roster request |
419 # and initial presence sending. | 419 # and initial presence sending. |
420 if client.conn_deferred.called: | 420 if client.conn_deferred.called: |
421 client.conn_deferred = defer.Deferred() | 421 client.conn_deferred = defer.Deferred() |
422 else: | 422 else: |
423 log.error(u"conn_deferred should be called at this point") | 423 log.error("conn_deferred should be called at this point") |
424 plg_0045 = self.host.plugins.get(u'XEP-0045') | 424 plg_0045 = self.host.plugins.get('XEP-0045') |
425 plg_0313 = self.host.plugins.get(u'XEP-0313') | 425 plg_0313 = self.host.plugins.get('XEP-0313') |
426 | 426 |
427 # FIXME: we should call all loaded plugins with generic callbacks | 427 # FIXME: we should call all loaded plugins with generic callbacks |
428 # (e.g. prepareResume and resume), so a hot resuming can be done | 428 # (e.g. prepareResume and resume), so a hot resuming can be done |
429 # properly for all plugins. | 429 # properly for all plugins. |
430 | 430 |
491 session.req_timer.cancel() | 491 session.req_timer.cancel() |
492 session.req_timer = None | 492 session.req_timer = None |
493 try: | 493 try: |
494 server_acked = int(a_elt['h']) | 494 server_acked = int(a_elt['h']) |
495 except ValueError: | 495 except ValueError: |
496 log.warning(_(u"Server returned invalid ack element, disabling stream " | 496 log.warning(_("Server returned invalid ack element, disabling stream " |
497 u"management: {xml}").format(xml=a_elt)) | 497 "management: {xml}").format(xml=a_elt)) |
498 session.enabled = False | 498 session.enabled = False |
499 return | 499 return |
500 | 500 |
501 if server_acked > session.out_counter: | 501 if server_acked > session.out_counter: |
502 log.error(_(u"Server acked more stanzas than we have sent, disabling stream " | 502 log.error(_("Server acked more stanzas than we have sent, disabling stream " |
503 u"management.")) | 503 "management.")) |
504 session.reset() | 504 session.reset() |
505 return | 505 return |
506 | 506 |
507 self.updateBuffer(session, server_acked) | 507 self.updateBuffer(session, server_acked) |
508 self.checkAcks(client) | 508 self.checkAcks(client) |
509 | 509 |
510 def onAckTimeOut(self, client): | 510 def onAckTimeOut(self, client): |
511 """Called when a requested ACK has not been received in time""" | 511 """Called when a requested ACK has not been received in time""" |
512 log.info(_(u"Ack was not received in time, aborting connection")) | 512 log.info(_("Ack was not received in time, aborting connection")) |
513 transport = client.xmlstream.transport | 513 transport = client.xmlstream.transport |
514 if transport is None: | 514 if transport is None: |
515 log.warning(u"transport was already removed") | 515 log.warning("transport was already removed") |
516 else: | 516 else: |
517 transport.abortConnection() | 517 transport.abortConnection() |
518 client._xep_0198_session.req_timer = None | 518 client._xep_0198_session.req_timer = None |
519 | 519 |
520 | 520 |
521 @implementer(iwokkel.IDisco) | |
521 class XEP_0198_handler(xmlstream.XMPPHandler): | 522 class XEP_0198_handler(xmlstream.XMPPHandler): |
522 implements(iwokkel.IDisco) | |
523 | 523 |
524 def __init__(self, plugin_parent): | 524 def __init__(self, plugin_parent): |
525 self.plugin_parent = plugin_parent | 525 self.plugin_parent = plugin_parent |
526 self.host = plugin_parent.host | 526 self.host = plugin_parent.host |
527 | 527 |