comparison libervia/backend/plugins/plugin_xep_0198.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents 4b842c1fb686
children
comparison
equal deleted inserted replaced
4269:64a85ce8be70 4270:0d7bb4df2343
56 # Max number of stanza to send before requesting ack 56 # Max number of stanza to send before requesting ack
57 MAX_STANZA_ACK_R = 5 57 MAX_STANZA_ACK_R = 5
58 # Max number of seconds before requesting ack 58 # Max number of seconds before requesting ack
59 MAX_DELAY_ACK_R = 30 59 MAX_DELAY_ACK_R = 30
60 MAX_COUNTER = 2**32 60 MAX_COUNTER = 2**32
61 RESUME_MAX = 5*60 61 RESUME_MAX = 5 * 60
62 # if we don't have an answer to ACK REQUEST after this delay, connection is aborted 62 # if we don't have an answer to ACK REQUEST after this delay, connection is aborted
63 ACK_TIMEOUT = 35 63 ACK_TIMEOUT = 35
64 64
65 65
66 class ProfileSessionData(object): 66 class ProfileSessionData(object):
90 90
91 @enabled.setter 91 @enabled.setter
92 def enabled(self, enabled): 92 def enabled(self, enabled):
93 if enabled: 93 if enabled:
94 if self._enabled: 94 if self._enabled:
95 raise exceptions.InternalError( 95 raise exceptions.InternalError("Stream Management can't be enabled twice")
96 "Stream Management can't be enabled twice")
97 self._enabled = True 96 self._enabled = True
98 callback, kw = self.callback_data 97 callback, kw = self.callback_data
99 self.timer = task.LoopingCall(callback, **kw) 98 self.timer = task.LoopingCall(callback, **kw)
100 self.timer.start(MAX_DELAY_ACK_R, now=False) 99 self.timer.start(MAX_DELAY_ACK_R, now=False)
101 else: 100 else:
131 # FIXME: location is not handled yet 130 # FIXME: location is not handled yet
132 131
133 def __init__(self, host): 132 def __init__(self, host):
134 log.info(_("Plugin Stream Management initialization")) 133 log.info(_("Plugin Stream Management initialization"))
135 self.host = host 134 self.host = host
136 host.register_namespace('sm', NS_SM) 135 host.register_namespace("sm", NS_SM)
137 host.trigger.add("stream_hooks", self.add_hooks) 136 host.trigger.add("stream_hooks", self.add_hooks)
138 host.trigger.add("xml_init", self._xml_init_trigger) 137 host.trigger.add("xml_init", self._xml_init_trigger)
139 host.trigger.add("disconnecting", self._disconnecting_trigger) 138 host.trigger.add("disconnecting", self._disconnecting_trigger)
140 host.trigger.add("disconnected", self._disconnected_trigger) 139 host.trigger.add("disconnected", self._disconnected_trigger)
141 try: 140 try:
142 self._ack_timeout = int(host.memory.config_get("", "ack_timeout", ACK_TIMEOUT)) 141 self._ack_timeout = int(
142 host.memory.config_get("", "ack_timeout", ACK_TIMEOUT)
143 )
143 except ValueError: 144 except ValueError:
144 log.error(_("Invalid ack_timeout value, please check your configuration")) 145 log.error(_("Invalid ack_timeout value, please check your configuration"))
145 self._ack_timeout = ACK_TIMEOUT 146 self._ack_timeout = ACK_TIMEOUT
146 if not self._ack_timeout: 147 if not self._ack_timeout:
147 log.info(_("Ack timeout disabled")) 148 log.info(_("Ack timeout disabled"))
148 else: 149 else:
149 log.info(_("Ack timeout set to {timeout}s").format( 150 log.info(_("Ack timeout set to {timeout}s").format(timeout=self._ack_timeout))
150 timeout=self._ack_timeout))
151 151
152 def profile_connecting(self, client): 152 def profile_connecting(self, client):
153 client._xep_0198_session = ProfileSessionData(callback=self.check_acks, 153 client._xep_0198_session = ProfileSessionData(
154 client=client) 154 callback=self.check_acks, client=client
155 )
155 156
156 def get_handler(self, client): 157 def get_handler(self, client):
157 return XEP_0198_handler(self) 158 return XEP_0198_handler(self)
158 159
159 def add_hooks(self, client, receive_hooks, send_hooks): 160 def add_hooks(self, client, receive_hooks, send_hooks):
162 send_hooks.append(partial(self.on_send, client=client)) 163 send_hooks.append(partial(self.on_send, client=client))
163 return True 164 return True
164 165
165 def _xml_init_trigger(self, client): 166 def _xml_init_trigger(self, client):
166 """Enable or resume a stream mangement""" 167 """Enable or resume a stream mangement"""
167 if not (NS_SM, 'sm') in client.xmlstream.features: 168 if not (NS_SM, "sm") in client.xmlstream.features:
168 log.warning(_( 169 log.warning(
169 "Your server doesn't support stream management ({namespace}), this is " 170 _(
170 "used to improve connection problems detection (like network outages). " 171 "Your server doesn't support stream management ({namespace}), this is "
171 "Please ask your server administrator to enable this feature.".format( 172 "used to improve connection problems detection (like network outages). "
172 namespace=NS_SM))) 173 "Please ask your server administrator to enable this feature.".format(
174 namespace=NS_SM
175 )
176 )
177 )
173 return True 178 return True
174 session = client._xep_0198_session 179 session = client._xep_0198_session
175 180
176 # a disconnect timer from a previous disconnection may still be active 181 # a disconnect timer from a previous disconnection may still be active
177 try: 182 try:
183 disconnect_timer.cancel() 188 disconnect_timer.cancel()
184 del session.disconnect_timer 189 del session.disconnect_timer
185 190
186 if session.resume_enabled: 191 if session.resume_enabled:
187 # we are resuming a session 192 # we are resuming a session
188 resume_elt = domish.Element((NS_SM, 'resume')) 193 resume_elt = domish.Element((NS_SM, "resume"))
189 resume_elt['h'] = str(session.in_counter) 194 resume_elt["h"] = str(session.in_counter)
190 resume_elt['previd'] = session.session_id 195 resume_elt["previd"] = session.session_id
191 client.send(resume_elt) 196 client.send(resume_elt)
192 session.resuming = True 197 session.resuming = True
193 # session.enabled will be set on <resumed/> reception 198 # session.enabled will be set on <resumed/> reception
194 return False 199 return False
195 else: 200 else:
196 # we start a new session 201 # we start a new session
197 assert session.out_counter == 0 202 assert session.out_counter == 0
198 enable_elt = domish.Element((NS_SM, 'enable')) 203 enable_elt = domish.Element((NS_SM, "enable"))
199 enable_elt['resume'] = 'true' 204 enable_elt["resume"] = "true"
200 client.send(enable_elt) 205 client.send(enable_elt)
201 session.enabled = True 206 session.enabled = True
202 return True 207 return True
203 208
204 def _disconnecting_trigger(self, client): 209 def _disconnecting_trigger(self, client):
215 return True 220 return True
216 session = client._xep_0198_session 221 session = client._xep_0198_session
217 session.enabled = False 222 session.enabled = False
218 if session.resume_enabled: 223 if session.resume_enabled:
219 session.disconnected_time = time.time() 224 session.disconnected_time = time.time()
220 session.disconnect_timer = reactor.callLater(session.session_max, 225 session.disconnect_timer = reactor.callLater(
221 client.disconnect_profile, 226 session.session_max, client.disconnect_profile, reason
222 reason) 227 )
223 # disconnect_profile must not be called at this point 228 # disconnect_profile must not be called at this point
224 # because session can be resumed 229 # because session can be resumed
225 return False 230 return False
226 else: 231 else:
227 return True 232 return True
232 # log.debug("check_acks (in_counter={}, out_counter={}, buf len={}, buf idx={})" 237 # log.debug("check_acks (in_counter={}, out_counter={}, buf len={}, buf idx={})"
233 # .format(session.in_counter, session.out_counter, len(session.buffer), 238 # .format(session.in_counter, session.out_counter, len(session.buffer),
234 # session.buffer_idx)) 239 # session.buffer_idx))
235 if session.ack_requested or not session.buffer: 240 if session.ack_requested or not session.buffer:
236 return 241 return
237 if (session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R 242 if (
238 or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R): 243 session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R
244 or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R
245 ):
239 self.request_ack(client) 246 self.request_ack(client)
240 session.ack_requested = True 247 session.ack_requested = True
241 session.last_ack_r = time.time() 248 session.last_ack_r = time.time()
242 249
243 def update_buffer(self, session, server_acked): 250 def update_buffer(self, session, server_acked):
251 log.error( 258 log.error(
252 "error while cleaning buffer, invalid index (buffer is empty):\n" 259 "error while cleaning buffer, invalid index (buffer is empty):\n"
253 "diff = {diff}\n" 260 "diff = {diff}\n"
254 "server_acked = {server_acked}\n" 261 "server_acked = {server_acked}\n"
255 "buffer_idx = {buffer_id}".format( 262 "buffer_idx = {buffer_id}".format(
256 diff=diff, server_acked=server_acked, 263 diff=diff, server_acked=server_acked, buffer_id=session.buffer_idx
257 buffer_id=session.buffer_idx)) 264 )
265 )
258 session.buffer_idx += diff 266 session.buffer_idx += diff
259 267
260 def replay_buffer(self, client, buffer_, discard_results=False): 268 def replay_buffer(self, client, buffer_, discard_results=False):
261 """Resend all stanza in buffer 269 """Resend all stanza in buffer
262 270
268 try: 276 try:
269 stanza = buffer_.pop() 277 stanza = buffer_.pop()
270 except IndexError: 278 except IndexError:
271 break 279 break
272 else: 280 else:
273 if ((discard_results 281 if (
274 and stanza.name == 'iq' 282 discard_results
275 and stanza.getAttribute('type') == 'result')): 283 and stanza.name == "iq"
284 and stanza.getAttribute("type") == "result"
285 ):
276 continue 286 continue
277 client.send(stanza) 287 client.send(stanza)
278 288
279 def send_ack(self, client): 289 def send_ack(self, client):
280 """Send an answer element with current IN counter""" 290 """Send an answer element with current IN counter"""
281 a_elt = domish.Element((NS_SM, 'a')) 291 a_elt = domish.Element((NS_SM, "a"))
282 a_elt['h'] = str(client._xep_0198_session.in_counter) 292 a_elt["h"] = str(client._xep_0198_session.in_counter)
283 client.send(a_elt) 293 client.send(a_elt)
284 294
285 def request_ack(self, client): 295 def request_ack(self, client):
286 """Send a request element""" 296 """Send a request element"""
287 session = client._xep_0198_session 297 session = client._xep_0198_session
288 r_elt = domish.Element((NS_SM, 'r')) 298 r_elt = domish.Element((NS_SM, "r"))
289 client.send(r_elt) 299 client.send(r_elt)
290 if session.req_timer is not None: 300 if session.req_timer is not None:
291 raise exceptions.InternalError("req_timer should not be set") 301 raise exceptions.InternalError("req_timer should not be set")
292 if self._ack_timeout: 302 if self._ack_timeout:
293 session.req_timer = reactor.callLater(self._ack_timeout, self.on_ack_time_out, 303 session.req_timer = reactor.callLater(
294 client) 304 self._ack_timeout, self.on_ack_time_out, client
305 )
295 306
296 def _connectionFailed(self, failure_, connector): 307 def _connectionFailed(self, failure_, connector):
297 normal_host, normal_port = connector.normal_location 308 normal_host, normal_port = connector.normal_location
298 del connector.normal_location 309 del connector.normal_location
299 log.warning(_( 310 log.warning(
300 "Connection failed using location given by server (host: {host}, port: " 311 _(
301 "{port}), switching to normal host and port (host: {normal_host}, port: " 312 "Connection failed using location given by server (host: {host}, port: "
302 "{normal_port})".format(host=connector.host, port=connector.port, 313 "{port}), switching to normal host and port (host: {normal_host}, port: "
303 normal_host=normal_host, normal_port=normal_port))) 314 "{normal_port})".format(
315 host=connector.host,
316 port=connector.port,
317 normal_host=normal_host,
318 normal_port=normal_port,
319 )
320 )
321 )
304 connector.host, connector.port = normal_host, normal_port 322 connector.host, connector.port = normal_host, normal_port
305 connector.connectionFailed = connector.connectionFailed_ori 323 connector.connectionFailed = connector.connectionFailed_ori
306 del connector.connectionFailed_ori 324 del connector.connectionFailed_ori
307 return connector.connectionFailed(failure_) 325 return connector.connectionFailed(failure_)
308 326
309 def on_enabled(self, enabled_elt, client): 327 def on_enabled(self, enabled_elt, client):
310 session = client._xep_0198_session 328 session = client._xep_0198_session
311 session.in_counter = 0 329 session.in_counter = 0
312 330
313 # we check that resuming is possible and that we have a session id 331 # we check that resuming is possible and that we have a session id
314 resume = C.bool(enabled_elt.getAttribute('resume')) 332 resume = C.bool(enabled_elt.getAttribute("resume"))
315 session_id = enabled_elt.getAttribute('id') 333 session_id = enabled_elt.getAttribute("id")
316 if not session_id: 334 if not session_id:
317 log.warning(_('Incorrect <enabled/> element received, no "id" attribute')) 335 log.warning(_('Incorrect <enabled/> element received, no "id" attribute'))
318 if not resume or not session_id: 336 if not resume or not session_id:
319 log.warning(_( 337 log.warning(
320 "You're server doesn't support session resuming with stream management, " 338 _(
321 "please contact your server administrator to enable it")) 339 "You're server doesn't support session resuming with stream management, "
340 "please contact your server administrator to enable it"
341 )
342 )
322 return 343 return
323 344
324 session.session_id = session_id 345 session.session_id = session_id
325 346
326 # XXX: we disable resource binding, which must not be done 347 # XXX: we disable resource binding, which must not be done
327 # when we resume the session. 348 # when we resume the session.
328 client.factory.authenticator.res_binding = False 349 client.factory.authenticator.res_binding = False
329 350
330 # location, in case server want resuming session to be elsewhere 351 # location, in case server want resuming session to be elsewhere
331 try: 352 try:
332 location = enabled_elt['location'] 353 location = enabled_elt["location"]
333 except KeyError: 354 except KeyError:
334 pass 355 pass
335 else: 356 else:
336 # TODO: handle IPv6 here (in brackets, cf. XEP) 357 # TODO: handle IPv6 here (in brackets, cf. XEP)
337 try: 358 try:
338 domain, port = location.split(':', 1) 359 domain, port = location.split(":", 1)
339 port = int(port) 360 port = int(port)
340 except ValueError: 361 except ValueError:
341 log.warning(_("Invalid location received: {location}") 362 log.warning(
342 .format(location=location)) 363 _("Invalid location received: {location}").format(location=location)
364 )
343 else: 365 else:
344 session.location = (domain, port) 366 session.location = (domain, port)
345 # we monkey patch connector to use the new location 367 # we monkey patch connector to use the new location
346 connector = client.xmlstream.transport.connector 368 connector = client.xmlstream.transport.connector
347 connector.normal_location = connector.host, connector.port 369 connector.normal_location = connector.host, connector.port
348 connector.host = domain 370 connector.host = domain
349 connector.port = port 371 connector.port = port
350 connector.connectionFailed_ori = connector.connectionFailed 372 connector.connectionFailed_ori = connector.connectionFailed
351 connector.connectionFailed = partial(self._connectionFailed, 373 connector.connectionFailed = partial(
352 connector=connector) 374 self._connectionFailed, connector=connector
375 )
353 376
354 # resuming time 377 # resuming time
355 try: 378 try:
356 max_s = int(enabled_elt['max']) 379 max_s = int(enabled_elt["max"])
357 except (ValueError, KeyError) as e: 380 except (ValueError, KeyError) as e:
358 if isinstance(e, ValueError): 381 if isinstance(e, ValueError):
359 log.warning(_('Invalid "max" attribute')) 382 log.warning(_('Invalid "max" attribute'))
360 max_s = RESUME_MAX 383 max_s = RESUME_MAX
361 log.info(_("Using default session max value ({max_s} s).".format( 384 log.info(
362 max_s=max_s))) 385 _("Using default session max value ({max_s} s).".format(max_s=max_s))
386 )
363 log.info(_("Stream Management enabled")) 387 log.info(_("Stream Management enabled"))
364 else: 388 else:
365 log.info(_( 389 log.info(
366 "Stream Management enabled, with a resumption time of {res_m:.2f} min" 390 _(
367 .format(res_m = max_s/60))) 391 "Stream Management enabled, with a resumption time of {res_m:.2f} min".format(
392 res_m=max_s / 60
393 )
394 )
395 )
368 session.session_max = max_s 396 session.session_max = max_s
369 397
370 def on_resumed(self, enabled_elt, client): 398 def on_resumed(self, enabled_elt, client):
371 session = client._xep_0198_session 399 session = client._xep_0198_session
372 assert not session.enabled 400 assert not session.enabled
373 del session.resuming 401 del session.resuming
374 server_acked = int(enabled_elt['h']) 402 server_acked = int(enabled_elt["h"])
375 self.update_buffer(session, server_acked) 403 self.update_buffer(session, server_acked)
376 resend_count = len(session.buffer) 404 resend_count = len(session.buffer)
377 # we resend all stanza which have not been received properly 405 # we resend all stanza which have not been received properly
378 self.replay_buffer(client, session.buffer) 406 self.replay_buffer(client, session.buffer)
379 # now we can continue the session 407 # now we can continue the session
380 session.enabled = True 408 session.enabled = True
381 d_time = time.time() - session.disconnected_time 409 d_time = time.time() - session.disconnected_time
382 log.info(_("Stream session resumed (disconnected for {d_time} s, {count} " 410 log.info(
383 "stanza(s) resent)").format(d_time=int(d_time), count=resend_count)) 411 _(
412 "Stream session resumed (disconnected for {d_time} s, {count} "
413 "stanza(s) resent)"
414 ).format(d_time=int(d_time), count=resend_count)
415 )
384 416
385 def on_failed(self, failed_elt, client): 417 def on_failed(self, failed_elt, client):
386 session = client._xep_0198_session 418 session = client._xep_0198_session
387 condition_elt = failed_elt.firstChildElement() 419 condition_elt = failed_elt.firstChildElement()
388 buffer_ = session.get_buffer_copy() 420 buffer_ = session.get_buffer_copy()
392 del session.resuming 424 del session.resuming
393 except AttributeError: 425 except AttributeError:
394 # stream management can't be started at all 426 # stream management can't be started at all
395 msg = _("Can't use stream management") 427 msg = _("Can't use stream management")
396 if condition_elt is None: 428 if condition_elt is None:
397 log.error(msg + '.') 429 log.error(msg + ".")
398 else: 430 else:
399 log.error(_("{msg}: {reason}").format( 431 log.error(_("{msg}: {reason}").format(msg=msg, reason=condition_elt.name))
400 msg=msg, reason=condition_elt.name))
401 else: 432 else:
402 # only stream resumption failed, we can try full session init 433 # only stream resumption failed, we can try full session init
403 # XXX: we try to start full session init from this point, with many 434 # XXX: we try to start full session init from this point, with many
404 # variables/attributes already initialised with a potentially different 435 # variables/attributes already initialised with a potentially different
405 # jid. This is experimental and may not be safe. It may be more 436 # jid. This is experimental and may not be safe. It may be more
406 # secured to abord the connection and restart everything with a fresh 437 # secured to abord the connection and restart everything with a fresh
407 # client. 438 # client.
408 msg = _("stream resumption not possible, restarting full session") 439 msg = _("stream resumption not possible, restarting full session")
409 440
410 if condition_elt is None: 441 if condition_elt is None:
411 log.warning('{msg}.'.format(msg=msg)) 442 log.warning("{msg}.".format(msg=msg))
412 else: 443 else:
413 log.warning("{msg}: {reason}".format( 444 log.warning("{msg}: {reason}".format(msg=msg, reason=condition_elt.name))
414 msg=msg, reason=condition_elt.name))
415 # stream resumption failed, but we still can do normal stream management 445 # stream resumption failed, but we still can do normal stream management
416 # we restore attributes as if the session was new, and init stream 446 # we restore attributes as if the session was new, and init stream
417 # we keep everything initialized, and only do binding, roster request 447 # we keep everything initialized, and only do binding, roster request
418 # and initial presence sending. 448 # and initial presence sending.
419 if client.conn_deferred.called: 449 if client.conn_deferred.called:
420 client.conn_deferred = defer.Deferred() 450 client.conn_deferred = defer.Deferred()
421 else: 451 else:
422 log.error("conn_deferred should be called at this point") 452 log.error("conn_deferred should be called at this point")
423 plg_0045 = self.host.plugins.get('XEP-0045') 453 plg_0045 = self.host.plugins.get("XEP-0045")
424 plg_0313 = self.host.plugins.get('XEP-0313') 454 plg_0313 = self.host.plugins.get("XEP-0313")
425 455
426 # FIXME: we should call all loaded plugins with generic callbacks 456 # FIXME: we should call all loaded plugins with generic callbacks
427 # (e.g. prepareResume and resume), so a hot resuming can be done 457 # (e.g. prepareResume and resume), so a hot resuming can be done
428 # properly for all plugins. 458 # properly for all plugins.
429 459
430 if plg_0045 is not None: 460 if plg_0045 is not None:
431 # we have to remove joined rooms 461 # we have to remove joined rooms
432 muc_join_args = plg_0045.pop_rooms(client) 462 muc_join_args = plg_0045.pop_rooms(client)
433 # we need to recreate roster 463 # we need to recreate roster
434 client.handlers.remove(client.roster) 464 client.handlers.remove(client.roster)
435 client.roster = client.roster.__class__(self.host) 465 client.roster = client.roster.__class__(self.host)
436 client.roster.setHandlerParent(client) 466 client.roster.setHandlerParent(client)
437 # bind init is not done when resuming is possible, so we have to do it now 467 # bind init is not done when resuming is possible, so we have to do it now
438 bind_init = jabber_client.BindInitializer(client.xmlstream) 468 bind_init = jabber_client.BindInitializer(client.xmlstream)
439 bind_init.required = True 469 bind_init.required = True
440 d = bind_init.start() 470 d = bind_init.start()
441 # we set the jid, which may have changed 471 # we set the jid, which may have changed
442 d.addCallback(lambda __: setattr(client.factory.authenticator, "jid", client.jid)) 472 d.addCallback(
473 lambda __: setattr(client.factory.authenticator, "jid", client.jid)
474 )
443 # we call the trigger who will send the <enable/> element 475 # we call the trigger who will send the <enable/> element
444 d.addCallback(lambda __: self._xml_init_trigger(client)) 476 d.addCallback(lambda __: self._xml_init_trigger(client))
445 # then we have to re-request the roster, as changes may have occured 477 # then we have to re-request the roster, as changes may have occured
446 d.addCallback(lambda __: client.roster.request_roster()) 478 d.addCallback(lambda __: client.roster.request_roster())
447 # we add got_roster to be sure to have roster before sending initial presence 479 # we add got_roster to be sure to have roster before sending initial presence
452 # initial presence must be sent manually 484 # initial presence must be sent manually
453 d.addCallback(lambda __: client.presence.available()) 485 d.addCallback(lambda __: client.presence.available())
454 if plg_0045 is not None: 486 if plg_0045 is not None:
455 # we re-join MUC rooms 487 # we re-join MUC rooms
456 muc_d_list = defer.DeferredList( 488 muc_d_list = defer.DeferredList(
457 [defer.ensureDeferred(plg_0045.join(*args)) 489 [defer.ensureDeferred(plg_0045.join(*args)) for args in muc_join_args]
458 for args in muc_join_args]
459 ) 490 )
460 d.addCallback(lambda __: muc_d_list) 491 d.addCallback(lambda __: muc_d_list)
461 # at the end we replay the buffer, as those stanzas have probably not 492 # at the end we replay the buffer, as those stanzas have probably not
462 # been received 493 # been received
463 d.addCallback(lambda __: self.replay_buffer(client, buffer_, 494 d.addCallback(
464 discard_results=True)) 495 lambda __: self.replay_buffer(client, buffer_, discard_results=True)
496 )
465 497
466 def on_receive(self, element, client): 498 def on_receive(self, element, client):
467 if not client.is_component: 499 if not client.is_component:
468 session = client._xep_0198_session 500 session = client._xep_0198_session
469 if session.enabled and element.name.lower() in C.STANZA_NAMES: 501 if session.enabled and element.name.lower() in C.STANZA_NAMES:
470 session.in_counter += 1 % MAX_COUNTER 502 session.in_counter += 1 % MAX_COUNTER
471 503
472 def on_send(self, obj, client): 504 def on_send(self, obj, client):
473 if not client.is_component: 505 if not client.is_component:
474 session = client._xep_0198_session 506 session = client._xep_0198_session
475 if (session.enabled 507 if (
508 session.enabled
476 and domish.IElement.providedBy(obj) 509 and domish.IElement.providedBy(obj)
477 and obj.name.lower() in C.STANZA_NAMES): 510 and obj.name.lower() in C.STANZA_NAMES
511 ):
478 session.out_counter += 1 % MAX_COUNTER 512 session.out_counter += 1 % MAX_COUNTER
479 session.buffer.appendleft(obj) 513 session.buffer.appendleft(obj)
480 self.check_acks(client) 514 self.check_acks(client)
481 515
482 def on_ack_request(self, r_elt, client): 516 def on_ack_request(self, r_elt, client):
490 log.error("req_timer should be set") 524 log.error("req_timer should be set")
491 else: 525 else:
492 session.req_timer.cancel() 526 session.req_timer.cancel()
493 session.req_timer = None 527 session.req_timer = None
494 try: 528 try:
495 server_acked = int(a_elt['h']) 529 server_acked = int(a_elt["h"])
496 except ValueError: 530 except ValueError:
497 log.warning(_("Server returned invalid ack element, disabling stream " 531 log.warning(
498 "management: {xml}").format(xml=a_elt)) 532 _(
533 "Server returned invalid ack element, disabling stream "
534 "management: {xml}"
535 ).format(xml=a_elt)
536 )
499 session.enabled = False 537 session.enabled = False
500 return 538 return
501 539
502 if server_acked > session.out_counter: 540 if server_acked > session.out_counter:
503 log.error(_("Server acked more stanzas than we have sent, disabling stream " 541 log.error(
504 "management.")) 542 _(
543 "Server acked more stanzas than we have sent, disabling stream "
544 "management."
545 )
546 )
505 session.reset() 547 session.reset()
506 return 548 return
507 549
508 self.update_buffer(session, server_acked) 550 self.update_buffer(session, server_acked)
509 self.check_acks(client) 551 self.check_acks(client)