comparison: libervia/backend/plugins/plugin_xep_0198.py @ 4270:0d7bb4df2343

Reformatted code base using black.

author   | Goffi <goffi@goffi.org>
date     | Wed, 19 Jun 2024 18:44:57 +0200
parents  | 4b842c1fb686
children |
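Every hunk below is a mechanical reformatting produced by the black formatter, as the commit message states; the plugin's behaviour is unchanged. As a minimal sketch of what that means in practice, the one-line change at old line 61 (`RESUME_MAX = 5*60` becoming `RESUME_MAX = 5 * 60`) can be reproduced with black's Python API. The default `black.Mode()` used here is an assumption; the repository may configure its own target line length:

```python
# Sketch: reproduce one hunk of this changeset with black's Python API.
# Assumes the default black.Mode(); the project may set a different
# line length in its own configuration.
import black

source = "RESUME_MAX = 5*60\n"
formatted = black.format_str(source, mode=black.Mode())
print(formatted, end="")  # RESUME_MAX = 5 * 60
```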
4269:64a85ce8be70 | 4270:0d7bb4df2343 |
56 # Max number of stanza to send before requesting ack | 56 # Max number of stanza to send before requesting ack |
57 MAX_STANZA_ACK_R = 5 | 57 MAX_STANZA_ACK_R = 5 |
58 # Max number of seconds before requesting ack | 58 # Max number of seconds before requesting ack |
59 MAX_DELAY_ACK_R = 30 | 59 MAX_DELAY_ACK_R = 30 |
60 MAX_COUNTER = 2**32 | 60 MAX_COUNTER = 2**32 |
61 RESUME_MAX = 5*60 | 61 RESUME_MAX = 5 * 60 |
62 # if we don't have an answer to ACK REQUEST after this delay, connection is aborted | 62 # if we don't have an answer to ACK REQUEST after this delay, connection is aborted |
63 ACK_TIMEOUT = 35 | 63 ACK_TIMEOUT = 35 |
64 | 64 |
65 | 65 |
66 class ProfileSessionData(object): | 66 class ProfileSessionData(object): |
90 | 90 |
91 @enabled.setter | 91 @enabled.setter |
92 def enabled(self, enabled): | 92 def enabled(self, enabled): |
93 if enabled: | 93 if enabled: |
94 if self._enabled: | 94 if self._enabled: |
95 raise exceptions.InternalError( | 95 raise exceptions.InternalError("Stream Management can't be enabled twice") |
96 "Stream Management can't be enabled twice") | |
97 self._enabled = True | 96 self._enabled = True |
98 callback, kw = self.callback_data | 97 callback, kw = self.callback_data |
99 self.timer = task.LoopingCall(callback, **kw) | 98 self.timer = task.LoopingCall(callback, **kw) |
100 self.timer.start(MAX_DELAY_ACK_R, now=False) | 99 self.timer.start(MAX_DELAY_ACK_R, now=False) |
101 else: | 100 else: |
131 # FIXME: location is not handled yet | 130 # FIXME: location is not handled yet |
132 | 131 |
133 def __init__(self, host): | 132 def __init__(self, host): |
134 log.info(_("Plugin Stream Management initialization")) | 133 log.info(_("Plugin Stream Management initialization")) |
135 self.host = host | 134 self.host = host |
136 host.register_namespace('sm', NS_SM) | 135 host.register_namespace("sm", NS_SM) |
137 host.trigger.add("stream_hooks", self.add_hooks) | 136 host.trigger.add("stream_hooks", self.add_hooks) |
138 host.trigger.add("xml_init", self._xml_init_trigger) | 137 host.trigger.add("xml_init", self._xml_init_trigger) |
139 host.trigger.add("disconnecting", self._disconnecting_trigger) | 138 host.trigger.add("disconnecting", self._disconnecting_trigger) |
140 host.trigger.add("disconnected", self._disconnected_trigger) | 139 host.trigger.add("disconnected", self._disconnected_trigger) |
141 try: | 140 try: |
142 self._ack_timeout = int(host.memory.config_get("", "ack_timeout", ACK_TIMEOUT)) | 141 self._ack_timeout = int( |
142 host.memory.config_get("", "ack_timeout", ACK_TIMEOUT) | |
143 ) | |
143 except ValueError: | 144 except ValueError: |
144 log.error(_("Invalid ack_timeout value, please check your configuration")) | 145 log.error(_("Invalid ack_timeout value, please check your configuration")) |
145 self._ack_timeout = ACK_TIMEOUT | 146 self._ack_timeout = ACK_TIMEOUT |
146 if not self._ack_timeout: | 147 if not self._ack_timeout: |
147 log.info(_("Ack timeout disabled")) | 148 log.info(_("Ack timeout disabled")) |
148 else: | 149 else: |
149 log.info(_("Ack timeout set to {timeout}s").format( | 150 log.info(_("Ack timeout set to {timeout}s").format(timeout=self._ack_timeout)) |
150 timeout=self._ack_timeout)) | |
151 | 151 |
152 def profile_connecting(self, client): | 152 def profile_connecting(self, client): |
153 client._xep_0198_session = ProfileSessionData(callback=self.check_acks, | 153 client._xep_0198_session = ProfileSessionData( |
154 client=client) | 154 callback=self.check_acks, client=client |
155 ) | |
155 | 156 |
156 def get_handler(self, client): | 157 def get_handler(self, client): |
157 return XEP_0198_handler(self) | 158 return XEP_0198_handler(self) |
158 | 159 |
159 def add_hooks(self, client, receive_hooks, send_hooks): | 160 def add_hooks(self, client, receive_hooks, send_hooks): |
162 send_hooks.append(partial(self.on_send, client=client)) | 163 send_hooks.append(partial(self.on_send, client=client)) |
163 return True | 164 return True |
164 | 165 |
165 def _xml_init_trigger(self, client): | 166 def _xml_init_trigger(self, client): |
166 """Enable or resume a stream mangement""" | 167 """Enable or resume a stream mangement""" |
167 if not (NS_SM, 'sm') in client.xmlstream.features: | 168 if not (NS_SM, "sm") in client.xmlstream.features: |
168 log.warning(_( | 169 log.warning( |
169 "Your server doesn't support stream management ({namespace}), this is " | 170 _( |
170 "used to improve connection problems detection (like network outages). " | 171 "Your server doesn't support stream management ({namespace}), this is " |
171 "Please ask your server administrator to enable this feature.".format( | 172 "used to improve connection problems detection (like network outages). " |
172 namespace=NS_SM))) | 173 "Please ask your server administrator to enable this feature.".format( |
174 namespace=NS_SM | |
175 ) | |
176 ) | |
177 ) | |
173 return True | 178 return True |
174 session = client._xep_0198_session | 179 session = client._xep_0198_session |
175 | 180 |
176 # a disconnect timer from a previous disconnection may still be active | 181 # a disconnect timer from a previous disconnection may still be active |
177 try: | 182 try: |
183 disconnect_timer.cancel() | 188 disconnect_timer.cancel() |
184 del session.disconnect_timer | 189 del session.disconnect_timer |
185 | 190 |
186 if session.resume_enabled: | 191 if session.resume_enabled: |
187 # we are resuming a session | 192 # we are resuming a session |
188 resume_elt = domish.Element((NS_SM, 'resume')) | 193 resume_elt = domish.Element((NS_SM, "resume")) |
189 resume_elt['h'] = str(session.in_counter) | 194 resume_elt["h"] = str(session.in_counter) |
190 resume_elt['previd'] = session.session_id | 195 resume_elt["previd"] = session.session_id |
191 client.send(resume_elt) | 196 client.send(resume_elt) |
192 session.resuming = True | 197 session.resuming = True |
193 # session.enabled will be set on <resumed/> reception | 198 # session.enabled will be set on <resumed/> reception |
194 return False | 199 return False |
195 else: | 200 else: |
196 # we start a new session | 201 # we start a new session |
197 assert session.out_counter == 0 | 202 assert session.out_counter == 0 |
198 enable_elt = domish.Element((NS_SM, 'enable')) | 203 enable_elt = domish.Element((NS_SM, "enable")) |
199 enable_elt['resume'] = 'true' | 204 enable_elt["resume"] = "true" |
200 client.send(enable_elt) | 205 client.send(enable_elt) |
201 session.enabled = True | 206 session.enabled = True |
202 return True | 207 return True |
203 | 208 |
204 def _disconnecting_trigger(self, client): | 209 def _disconnecting_trigger(self, client): |
215 return True | 220 return True |
216 session = client._xep_0198_session | 221 session = client._xep_0198_session |
217 session.enabled = False | 222 session.enabled = False |
218 if session.resume_enabled: | 223 if session.resume_enabled: |
219 session.disconnected_time = time.time() | 224 session.disconnected_time = time.time() |
220 session.disconnect_timer = reactor.callLater(session.session_max, | 225 session.disconnect_timer = reactor.callLater( |
221 client.disconnect_profile, | 226 session.session_max, client.disconnect_profile, reason |
222 reason) | 227 ) |
223 # disconnect_profile must not be called at this point | 228 # disconnect_profile must not be called at this point |
224 # because session can be resumed | 229 # because session can be resumed |
225 return False | 230 return False |
226 else: | 231 else: |
227 return True | 232 return True |
232 # log.debug("check_acks (in_counter={}, out_counter={}, buf len={}, buf idx={})" | 237 # log.debug("check_acks (in_counter={}, out_counter={}, buf len={}, buf idx={})" |
233 # .format(session.in_counter, session.out_counter, len(session.buffer), | 238 # .format(session.in_counter, session.out_counter, len(session.buffer), |
234 # session.buffer_idx)) | 239 # session.buffer_idx)) |
235 if session.ack_requested or not session.buffer: | 240 if session.ack_requested or not session.buffer: |
236 return | 241 return |
237 if (session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R | 242 if ( |
238 or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R): | 243 session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R |
244 or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R | |
245 ): | |
239 self.request_ack(client) | 246 self.request_ack(client) |
240 session.ack_requested = True | 247 session.ack_requested = True |
241 session.last_ack_r = time.time() | 248 session.last_ack_r = time.time() |
242 | 249 |
243 def update_buffer(self, session, server_acked): | 250 def update_buffer(self, session, server_acked): |
251 log.error( | 258 log.error( |
252 "error while cleaning buffer, invalid index (buffer is empty):\n" | 259 "error while cleaning buffer, invalid index (buffer is empty):\n" |
253 "diff = {diff}\n" | 260 "diff = {diff}\n" |
254 "server_acked = {server_acked}\n" | 261 "server_acked = {server_acked}\n" |
255 "buffer_idx = {buffer_id}".format( | 262 "buffer_idx = {buffer_id}".format( |
256 diff=diff, server_acked=server_acked, | 263 diff=diff, server_acked=server_acked, buffer_id=session.buffer_idx |
257 buffer_id=session.buffer_idx)) | 264 ) |
265 ) | |
258 session.buffer_idx += diff | 266 session.buffer_idx += diff |
259 | 267 |
260 def replay_buffer(self, client, buffer_, discard_results=False): | 268 def replay_buffer(self, client, buffer_, discard_results=False): |
261 """Resend all stanza in buffer | 269 """Resend all stanza in buffer |
262 | 270 |
268 try: | 276 try: |
269 stanza = buffer_.pop() | 277 stanza = buffer_.pop() |
270 except IndexError: | 278 except IndexError: |
271 break | 279 break |
272 else: | 280 else: |
273 if ((discard_results | 281 if ( |
274 and stanza.name == 'iq' | 282 discard_results |
275 and stanza.getAttribute('type') == 'result')): | 283 and stanza.name == "iq" |
284 and stanza.getAttribute("type") == "result" | |
285 ): | |
276 continue | 286 continue |
277 client.send(stanza) | 287 client.send(stanza) |
278 | 288 |
279 def send_ack(self, client): | 289 def send_ack(self, client): |
280 """Send an answer element with current IN counter""" | 290 """Send an answer element with current IN counter""" |
281 a_elt = domish.Element((NS_SM, 'a')) | 291 a_elt = domish.Element((NS_SM, "a")) |
282 a_elt['h'] = str(client._xep_0198_session.in_counter) | 292 a_elt["h"] = str(client._xep_0198_session.in_counter) |
283 client.send(a_elt) | 293 client.send(a_elt) |
284 | 294 |
285 def request_ack(self, client): | 295 def request_ack(self, client): |
286 """Send a request element""" | 296 """Send a request element""" |
287 session = client._xep_0198_session | 297 session = client._xep_0198_session |
288 r_elt = domish.Element((NS_SM, 'r')) | 298 r_elt = domish.Element((NS_SM, "r")) |
289 client.send(r_elt) | 299 client.send(r_elt) |
290 if session.req_timer is not None: | 300 if session.req_timer is not None: |
291 raise exceptions.InternalError("req_timer should not be set") | 301 raise exceptions.InternalError("req_timer should not be set") |
292 if self._ack_timeout: | 302 if self._ack_timeout: |
293 session.req_timer = reactor.callLater(self._ack_timeout, self.on_ack_time_out, | 303 session.req_timer = reactor.callLater( |
294 client) | 304 self._ack_timeout, self.on_ack_time_out, client |
305 ) | |
295 | 306 |
296 def _connectionFailed(self, failure_, connector): | 307 def _connectionFailed(self, failure_, connector): |
297 normal_host, normal_port = connector.normal_location | 308 normal_host, normal_port = connector.normal_location |
298 del connector.normal_location | 309 del connector.normal_location |
299 log.warning(_( | 310 log.warning( |
300 "Connection failed using location given by server (host: {host}, port: " | 311 _( |
301 "{port}), switching to normal host and port (host: {normal_host}, port: " | 312 "Connection failed using location given by server (host: {host}, port: " |
302 "{normal_port})".format(host=connector.host, port=connector.port, | 313 "{port}), switching to normal host and port (host: {normal_host}, port: " |
303 normal_host=normal_host, normal_port=normal_port))) | 314 "{normal_port})".format( |
315 host=connector.host, | |
316 port=connector.port, | |
317 normal_host=normal_host, | |
318 normal_port=normal_port, | |
319 ) | |
320 ) | |
321 ) | |
304 connector.host, connector.port = normal_host, normal_port | 322 connector.host, connector.port = normal_host, normal_port |
305 connector.connectionFailed = connector.connectionFailed_ori | 323 connector.connectionFailed = connector.connectionFailed_ori |
306 del connector.connectionFailed_ori | 324 del connector.connectionFailed_ori |
307 return connector.connectionFailed(failure_) | 325 return connector.connectionFailed(failure_) |
308 | 326 |
309 def on_enabled(self, enabled_elt, client): | 327 def on_enabled(self, enabled_elt, client): |
310 session = client._xep_0198_session | 328 session = client._xep_0198_session |
311 session.in_counter = 0 | 329 session.in_counter = 0 |
312 | 330 |
313 # we check that resuming is possible and that we have a session id | 331 # we check that resuming is possible and that we have a session id |
314 resume = C.bool(enabled_elt.getAttribute('resume')) | 332 resume = C.bool(enabled_elt.getAttribute("resume")) |
315 session_id = enabled_elt.getAttribute('id') | 333 session_id = enabled_elt.getAttribute("id") |
316 if not session_id: | 334 if not session_id: |
317 log.warning(_('Incorrect <enabled/> element received, no "id" attribute')) | 335 log.warning(_('Incorrect <enabled/> element received, no "id" attribute')) |
318 if not resume or not session_id: | 336 if not resume or not session_id: |
319 log.warning(_( | 337 log.warning( |
320 "You're server doesn't support session resuming with stream management, " | 338 _( |
321 "please contact your server administrator to enable it")) | 339 "You're server doesn't support session resuming with stream management, " |
340 "please contact your server administrator to enable it" | |
341 ) | |
342 ) | |
322 return | 343 return |
323 | 344 |
324 session.session_id = session_id | 345 session.session_id = session_id |
325 | 346 |
326 # XXX: we disable resource binding, which must not be done | 347 # XXX: we disable resource binding, which must not be done |
327 # when we resume the session. | 348 # when we resume the session. |
328 client.factory.authenticator.res_binding = False | 349 client.factory.authenticator.res_binding = False |
329 | 350 |
330 # location, in case server want resuming session to be elsewhere | 351 # location, in case server want resuming session to be elsewhere |
331 try: | 352 try: |
332 location = enabled_elt['location'] | 353 location = enabled_elt["location"] |
333 except KeyError: | 354 except KeyError: |
334 pass | 355 pass |
335 else: | 356 else: |
336 # TODO: handle IPv6 here (in brackets, cf. XEP) | 357 # TODO: handle IPv6 here (in brackets, cf. XEP) |
337 try: | 358 try: |
338 domain, port = location.split(':', 1) | 359 domain, port = location.split(":", 1) |
339 port = int(port) | 360 port = int(port) |
340 except ValueError: | 361 except ValueError: |
341 log.warning(_("Invalid location received: {location}") | 362 log.warning( |
342 .format(location=location)) | 363 _("Invalid location received: {location}").format(location=location) |
364 ) | |
343 else: | 365 else: |
344 session.location = (domain, port) | 366 session.location = (domain, port) |
345 # we monkey patch connector to use the new location | 367 # we monkey patch connector to use the new location |
346 connector = client.xmlstream.transport.connector | 368 connector = client.xmlstream.transport.connector |
347 connector.normal_location = connector.host, connector.port | 369 connector.normal_location = connector.host, connector.port |
348 connector.host = domain | 370 connector.host = domain |
349 connector.port = port | 371 connector.port = port |
350 connector.connectionFailed_ori = connector.connectionFailed | 372 connector.connectionFailed_ori = connector.connectionFailed |
351 connector.connectionFailed = partial(self._connectionFailed, | 373 connector.connectionFailed = partial( |
352 connector=connector) | 374 self._connectionFailed, connector=connector |
375 ) | |
353 | 376 |
354 # resuming time | 377 # resuming time |
355 try: | 378 try: |
356 max_s = int(enabled_elt['max']) | 379 max_s = int(enabled_elt["max"]) |
357 except (ValueError, KeyError) as e: | 380 except (ValueError, KeyError) as e: |
358 if isinstance(e, ValueError): | 381 if isinstance(e, ValueError): |
359 log.warning(_('Invalid "max" attribute')) | 382 log.warning(_('Invalid "max" attribute')) |
360 max_s = RESUME_MAX | 383 max_s = RESUME_MAX |
361 log.info(_("Using default session max value ({max_s} s).".format( | 384 log.info( |
362 max_s=max_s))) | 385 _("Using default session max value ({max_s} s).".format(max_s=max_s)) |
386 ) | |
363 log.info(_("Stream Management enabled")) | 387 log.info(_("Stream Management enabled")) |
364 else: | 388 else: |
365 log.info(_( | 389 log.info( |
366 "Stream Management enabled, with a resumption time of {res_m:.2f} min" | 390 _( |
367 .format(res_m = max_s/60))) | 391 "Stream Management enabled, with a resumption time of {res_m:.2f} min".format( |
392 res_m=max_s / 60 | |
393 ) | |
394 ) | |
395 ) | |
368 session.session_max = max_s | 396 session.session_max = max_s |
369 | 397 |
370 def on_resumed(self, enabled_elt, client): | 398 def on_resumed(self, enabled_elt, client): |
371 session = client._xep_0198_session | 399 session = client._xep_0198_session |
372 assert not session.enabled | 400 assert not session.enabled |
373 del session.resuming | 401 del session.resuming |
374 server_acked = int(enabled_elt['h']) | 402 server_acked = int(enabled_elt["h"]) |
375 self.update_buffer(session, server_acked) | 403 self.update_buffer(session, server_acked) |
376 resend_count = len(session.buffer) | 404 resend_count = len(session.buffer) |
377 # we resend all stanza which have not been received properly | 405 # we resend all stanza which have not been received properly |
378 self.replay_buffer(client, session.buffer) | 406 self.replay_buffer(client, session.buffer) |
379 # now we can continue the session | 407 # now we can continue the session |
380 session.enabled = True | 408 session.enabled = True |
381 d_time = time.time() - session.disconnected_time | 409 d_time = time.time() - session.disconnected_time |
382 log.info(_("Stream session resumed (disconnected for {d_time} s, {count} " | 410 log.info( |
383 "stanza(s) resent)").format(d_time=int(d_time), count=resend_count)) | 411 _( |
412 "Stream session resumed (disconnected for {d_time} s, {count} " | |
413 "stanza(s) resent)" | |
414 ).format(d_time=int(d_time), count=resend_count) | |
415 ) | |
384 | 416 |
385 def on_failed(self, failed_elt, client): | 417 def on_failed(self, failed_elt, client): |
386 session = client._xep_0198_session | 418 session = client._xep_0198_session |
387 condition_elt = failed_elt.firstChildElement() | 419 condition_elt = failed_elt.firstChildElement() |
388 buffer_ = session.get_buffer_copy() | 420 buffer_ = session.get_buffer_copy() |
392 del session.resuming | 424 del session.resuming |
393 except AttributeError: | 425 except AttributeError: |
394 # stream management can't be started at all | 426 # stream management can't be started at all |
395 msg = _("Can't use stream management") | 427 msg = _("Can't use stream management") |
396 if condition_elt is None: | 428 if condition_elt is None: |
397 log.error(msg + '.') | 429 log.error(msg + ".") |
398 else: | 430 else: |
399 log.error(_("{msg}: {reason}").format( | 431 log.error(_("{msg}: {reason}").format(msg=msg, reason=condition_elt.name)) |
400 msg=msg, reason=condition_elt.name)) | |
401 else: | 432 else: |
402 # only stream resumption failed, we can try full session init | 433 # only stream resumption failed, we can try full session init |
403 # XXX: we try to start full session init from this point, with many | 434 # XXX: we try to start full session init from this point, with many |
404 # variables/attributes already initialised with a potentially different | 435 # variables/attributes already initialised with a potentially different |
405 # jid. This is experimental and may not be safe. It may be more | 436 # jid. This is experimental and may not be safe. It may be more |
406 # secured to abord the connection and restart everything with a fresh | 437 # secured to abord the connection and restart everything with a fresh |
407 # client. | 438 # client. |
408 msg = _("stream resumption not possible, restarting full session") | 439 msg = _("stream resumption not possible, restarting full session") |
409 | 440 |
410 if condition_elt is None: | 441 if condition_elt is None: |
411 log.warning('{msg}.'.format(msg=msg)) | 442 log.warning("{msg}.".format(msg=msg)) |
412 else: | 443 else: |
413 log.warning("{msg}: {reason}".format( | 444 log.warning("{msg}: {reason}".format(msg=msg, reason=condition_elt.name)) |
414 msg=msg, reason=condition_elt.name)) | |
415 # stream resumption failed, but we still can do normal stream management | 445 # stream resumption failed, but we still can do normal stream management |
416 # we restore attributes as if the session was new, and init stream | 446 # we restore attributes as if the session was new, and init stream |
417 # we keep everything initialized, and only do binding, roster request | 447 # we keep everything initialized, and only do binding, roster request |
418 # and initial presence sending. | 448 # and initial presence sending. |
419 if client.conn_deferred.called: | 449 if client.conn_deferred.called: |
420 client.conn_deferred = defer.Deferred() | 450 client.conn_deferred = defer.Deferred() |
421 else: | 451 else: |
422 log.error("conn_deferred should be called at this point") | 452 log.error("conn_deferred should be called at this point") |
423 plg_0045 = self.host.plugins.get('XEP-0045') | 453 plg_0045 = self.host.plugins.get("XEP-0045") |
424 plg_0313 = self.host.plugins.get('XEP-0313') | 454 plg_0313 = self.host.plugins.get("XEP-0313") |
425 | 455 |
426 # FIXME: we should call all loaded plugins with generic callbacks | 456 # FIXME: we should call all loaded plugins with generic callbacks |
427 # (e.g. prepareResume and resume), so a hot resuming can be done | 457 # (e.g. prepareResume and resume), so a hot resuming can be done |
428 # properly for all plugins. | 458 # properly for all plugins. |
429 | 459 |
430 if plg_0045 is not None: | 460 if plg_0045 is not None: |
431 # we have to remove joined rooms | 461 # we have to remove joined rooms |
432 muc_join_args = plg_0045.pop_rooms(client) | 462 muc_join_args = plg_0045.pop_rooms(client) |
433 # we need to recreate roster | 463 # we need to recreate roster |
434 client.handlers.remove(client.roster) | 464 client.handlers.remove(client.roster) |
435 client.roster = client.roster.__class__(self.host) | 465 client.roster = client.roster.__class__(self.host) |
436 client.roster.setHandlerParent(client) | 466 client.roster.setHandlerParent(client) |
437 # bind init is not done when resuming is possible, so we have to do it now | 467 # bind init is not done when resuming is possible, so we have to do it now |
438 bind_init = jabber_client.BindInitializer(client.xmlstream) | 468 bind_init = jabber_client.BindInitializer(client.xmlstream) |
439 bind_init.required = True | 469 bind_init.required = True |
440 d = bind_init.start() | 470 d = bind_init.start() |
441 # we set the jid, which may have changed | 471 # we set the jid, which may have changed |
442 d.addCallback(lambda __: setattr(client.factory.authenticator, "jid", client.jid)) | 472 d.addCallback( |
473 lambda __: setattr(client.factory.authenticator, "jid", client.jid) | |
474 ) | |
443 # we call the trigger who will send the <enable/> element | 475 # we call the trigger who will send the <enable/> element |
444 d.addCallback(lambda __: self._xml_init_trigger(client)) | 476 d.addCallback(lambda __: self._xml_init_trigger(client)) |
445 # then we have to re-request the roster, as changes may have occured | 477 # then we have to re-request the roster, as changes may have occured |
446 d.addCallback(lambda __: client.roster.request_roster()) | 478 d.addCallback(lambda __: client.roster.request_roster()) |
447 # we add got_roster to be sure to have roster before sending initial presence | 479 # we add got_roster to be sure to have roster before sending initial presence |
452 # initial presence must be sent manually | 484 # initial presence must be sent manually |
453 d.addCallback(lambda __: client.presence.available()) | 485 d.addCallback(lambda __: client.presence.available()) |
454 if plg_0045 is not None: | 486 if plg_0045 is not None: |
455 # we re-join MUC rooms | 487 # we re-join MUC rooms |
456 muc_d_list = defer.DeferredList( | 488 muc_d_list = defer.DeferredList( |
457 [defer.ensureDeferred(plg_0045.join(*args)) | 489 [defer.ensureDeferred(plg_0045.join(*args)) for args in muc_join_args] |
458 for args in muc_join_args] | |
459 ) | 490 ) |
460 d.addCallback(lambda __: muc_d_list) | 491 d.addCallback(lambda __: muc_d_list) |
461 # at the end we replay the buffer, as those stanzas have probably not | 492 # at the end we replay the buffer, as those stanzas have probably not |
462 # been received | 493 # been received |
463 d.addCallback(lambda __: self.replay_buffer(client, buffer_, | 494 d.addCallback( |
464 discard_results=True)) | 495 lambda __: self.replay_buffer(client, buffer_, discard_results=True) |
496 ) | |
465 | 497 |
466 def on_receive(self, element, client): | 498 def on_receive(self, element, client): |
467 if not client.is_component: | 499 if not client.is_component: |
468 session = client._xep_0198_session | 500 session = client._xep_0198_session |
469 if session.enabled and element.name.lower() in C.STANZA_NAMES: | 501 if session.enabled and element.name.lower() in C.STANZA_NAMES: |
470 session.in_counter += 1 % MAX_COUNTER | 502 session.in_counter += 1 % MAX_COUNTER |
471 | 503 |
472 def on_send(self, obj, client): | 504 def on_send(self, obj, client): |
473 if not client.is_component: | 505 if not client.is_component: |
474 session = client._xep_0198_session | 506 session = client._xep_0198_session |
475 if (session.enabled | 507 if ( |
508 session.enabled | |
476 and domish.IElement.providedBy(obj) | 509 and domish.IElement.providedBy(obj) |
477 and obj.name.lower() in C.STANZA_NAMES): | 510 and obj.name.lower() in C.STANZA_NAMES |
511 ): | |
478 session.out_counter += 1 % MAX_COUNTER | 512 session.out_counter += 1 % MAX_COUNTER |
479 session.buffer.appendleft(obj) | 513 session.buffer.appendleft(obj) |
480 self.check_acks(client) | 514 self.check_acks(client) |
481 | 515 |
482 def on_ack_request(self, r_elt, client): | 516 def on_ack_request(self, r_elt, client): |
490 log.error("req_timer should be set") | 524 log.error("req_timer should be set") |
491 else: | 525 else: |
492 session.req_timer.cancel() | 526 session.req_timer.cancel() |
493 session.req_timer = None | 527 session.req_timer = None |
494 try: | 528 try: |
495 server_acked = int(a_elt['h']) | 529 server_acked = int(a_elt["h"]) |
496 except ValueError: | 530 except ValueError: |
497 log.warning(_("Server returned invalid ack element, disabling stream " | 531 log.warning( |
498 "management: {xml}").format(xml=a_elt)) | 532 _( |
533 "Server returned invalid ack element, disabling stream " | |
534 "management: {xml}" | |
535 ).format(xml=a_elt) | |
536 ) | |
499 session.enabled = False | 537 session.enabled = False |
500 return | 538 return |
501 | 539 |
502 if server_acked > session.out_counter: | 540 if server_acked > session.out_counter: |
503 log.error(_("Server acked more stanzas than we have sent, disabling stream " | 541 log.error( |
504 "management.")) | 542 _( |
543 "Server acked more stanzas than we have sent, disabling stream " | |
544 "management." | |
545 ) | |
546 ) | |
505 session.reset() | 547 session.reset() |
506 return | 548 return |
507 | 549 |
508 self.update_buffer(session, server_acked) | 550 self.update_buffer(session, server_acked) |
509 self.check_acks(client) | 551 self.check_acks(client) |
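For context on the logic left untouched by this reformatting: `request_ack` puts an `<r/>` element on the wire and `send_ack` answers with `<a h='N'/>`, where `h` is the number of stanzas handled so far. A minimal sketch of those two elements built with Twisted's domish follows, assuming `NS_SM` is the XEP-0198 namespace `urn:xmpp:sm:3` (the constant is defined elsewhere in the plugin and is not part of the hunks shown in this comparison):

```python
# Sketch of the stanzas that request_ack() and send_ack() emit.
# NS_SM is assumed to be the XEP-0198 namespace; the plugin defines it
# outside the hunks shown above.
from twisted.words.xish import domish

NS_SM = "urn:xmpp:sm:3"  # assumption, see lead-in

r_elt = domish.Element((NS_SM, "r"))  # ack request
a_elt = domish.Element((NS_SM, "a"))  # ack answer
a_elt["h"] = "42"  # count of stanzas handled by the answering side

print(r_elt.toXml())  # <r xmlns='urn:xmpp:sm:3'/>
print(a_elt.toXml())  # <a xmlns='urn:xmpp:sm:3' h='42'/>
```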