Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0198.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 4b842c1fb686 |
children |
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0198.py Tue Jun 18 12:06:45 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0198.py Wed Jun 19 18:44:57 2024 +0200 @@ -58,7 +58,7 @@ # Max number of seconds before requesting ack MAX_DELAY_ACK_R = 30 MAX_COUNTER = 2**32 -RESUME_MAX = 5*60 +RESUME_MAX = 5 * 60 # if we don't have an answer to ACK REQUEST after this delay, connection is aborted ACK_TIMEOUT = 35 @@ -92,8 +92,7 @@ def enabled(self, enabled): if enabled: if self._enabled: - raise exceptions.InternalError( - "Stream Management can't be enabled twice") + raise exceptions.InternalError("Stream Management can't be enabled twice") self._enabled = True callback, kw = self.callback_data self.timer = task.LoopingCall(callback, **kw) @@ -133,25 +132,27 @@ def __init__(self, host): log.info(_("Plugin Stream Management initialization")) self.host = host - host.register_namespace('sm', NS_SM) + host.register_namespace("sm", NS_SM) host.trigger.add("stream_hooks", self.add_hooks) host.trigger.add("xml_init", self._xml_init_trigger) host.trigger.add("disconnecting", self._disconnecting_trigger) host.trigger.add("disconnected", self._disconnected_trigger) try: - self._ack_timeout = int(host.memory.config_get("", "ack_timeout", ACK_TIMEOUT)) + self._ack_timeout = int( + host.memory.config_get("", "ack_timeout", ACK_TIMEOUT) + ) except ValueError: log.error(_("Invalid ack_timeout value, please check your configuration")) self._ack_timeout = ACK_TIMEOUT if not self._ack_timeout: log.info(_("Ack timeout disabled")) else: - log.info(_("Ack timeout set to {timeout}s").format( - timeout=self._ack_timeout)) + log.info(_("Ack timeout set to {timeout}s").format(timeout=self._ack_timeout)) def profile_connecting(self, client): - client._xep_0198_session = ProfileSessionData(callback=self.check_acks, - client=client) + client._xep_0198_session = ProfileSessionData( + callback=self.check_acks, client=client + ) def get_handler(self, client): return XEP_0198_handler(self) @@ -164,12 +165,16 @@ def _xml_init_trigger(self, client): """Enable or resume a stream mangement""" - if not (NS_SM, 'sm') in client.xmlstream.features: - log.warning(_( - "Your server doesn't support stream management ({namespace}), this is " - "used to improve connection problems detection (like network outages). " - "Please ask your server administrator to enable this feature.".format( - namespace=NS_SM))) + if not (NS_SM, "sm") in client.xmlstream.features: + log.warning( + _( + "Your server doesn't support stream management ({namespace}), this is " + "used to improve connection problems detection (like network outages). " + "Please ask your server administrator to enable this feature.".format( + namespace=NS_SM + ) + ) + ) return True session = client._xep_0198_session @@ -185,9 +190,9 @@ if session.resume_enabled: # we are resuming a session - resume_elt = domish.Element((NS_SM, 'resume')) - resume_elt['h'] = str(session.in_counter) - resume_elt['previd'] = session.session_id + resume_elt = domish.Element((NS_SM, "resume")) + resume_elt["h"] = str(session.in_counter) + resume_elt["previd"] = session.session_id client.send(resume_elt) session.resuming = True # session.enabled will be set on <resumed/> reception @@ -195,8 +200,8 @@ else: # we start a new session assert session.out_counter == 0 - enable_elt = domish.Element((NS_SM, 'enable')) - enable_elt['resume'] = 'true' + enable_elt = domish.Element((NS_SM, "enable")) + enable_elt["resume"] = "true" client.send(enable_elt) session.enabled = True return True @@ -217,9 +222,9 @@ session.enabled = False if session.resume_enabled: session.disconnected_time = time.time() - session.disconnect_timer = reactor.callLater(session.session_max, - client.disconnect_profile, - reason) + session.disconnect_timer = reactor.callLater( + session.session_max, client.disconnect_profile, reason + ) # disconnect_profile must not be called at this point # because session can be resumed return False @@ -234,8 +239,10 @@ # session.buffer_idx)) if session.ack_requested or not session.buffer: return - if (session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R - or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R): + if ( + session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R + or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R + ): self.request_ack(client) session.ack_requested = True session.last_ack_r = time.time() @@ -253,8 +260,9 @@ "diff = {diff}\n" "server_acked = {server_acked}\n" "buffer_idx = {buffer_id}".format( - diff=diff, server_acked=server_acked, - buffer_id=session.buffer_idx)) + diff=diff, server_acked=server_acked, buffer_id=session.buffer_idx + ) + ) session.buffer_idx += diff def replay_buffer(self, client, buffer_, discard_results=False): @@ -270,37 +278,47 @@ except IndexError: break else: - if ((discard_results - and stanza.name == 'iq' - and stanza.getAttribute('type') == 'result')): + if ( + discard_results + and stanza.name == "iq" + and stanza.getAttribute("type") == "result" + ): continue client.send(stanza) def send_ack(self, client): """Send an answer element with current IN counter""" - a_elt = domish.Element((NS_SM, 'a')) - a_elt['h'] = str(client._xep_0198_session.in_counter) + a_elt = domish.Element((NS_SM, "a")) + a_elt["h"] = str(client._xep_0198_session.in_counter) client.send(a_elt) def request_ack(self, client): """Send a request element""" session = client._xep_0198_session - r_elt = domish.Element((NS_SM, 'r')) + r_elt = domish.Element((NS_SM, "r")) client.send(r_elt) if session.req_timer is not None: raise exceptions.InternalError("req_timer should not be set") if self._ack_timeout: - session.req_timer = reactor.callLater(self._ack_timeout, self.on_ack_time_out, - client) + session.req_timer = reactor.callLater( + self._ack_timeout, self.on_ack_time_out, client + ) def _connectionFailed(self, failure_, connector): normal_host, normal_port = connector.normal_location del connector.normal_location - log.warning(_( - "Connection failed using location given by server (host: {host}, port: " - "{port}), switching to normal host and port (host: {normal_host}, port: " - "{normal_port})".format(host=connector.host, port=connector.port, - normal_host=normal_host, normal_port=normal_port))) + log.warning( + _( + "Connection failed using location given by server (host: {host}, port: " + "{port}), switching to normal host and port (host: {normal_host}, port: " + "{normal_port})".format( + host=connector.host, + port=connector.port, + normal_host=normal_host, + normal_port=normal_port, + ) + ) + ) connector.host, connector.port = normal_host, normal_port connector.connectionFailed = connector.connectionFailed_ori del connector.connectionFailed_ori @@ -311,14 +329,17 @@ session.in_counter = 0 # we check that resuming is possible and that we have a session id - resume = C.bool(enabled_elt.getAttribute('resume')) - session_id = enabled_elt.getAttribute('id') + resume = C.bool(enabled_elt.getAttribute("resume")) + session_id = enabled_elt.getAttribute("id") if not session_id: log.warning(_('Incorrect <enabled/> element received, no "id" attribute')) if not resume or not session_id: - log.warning(_( - "You're server doesn't support session resuming with stream management, " - "please contact your server administrator to enable it")) + log.warning( + _( + "You're server doesn't support session resuming with stream management, " + "please contact your server administrator to enable it" + ) + ) return session.session_id = session_id @@ -329,17 +350,18 @@ # location, in case server want resuming session to be elsewhere try: - location = enabled_elt['location'] + location = enabled_elt["location"] except KeyError: pass else: # TODO: handle IPv6 here (in brackets, cf. XEP) try: - domain, port = location.split(':', 1) + domain, port = location.split(":", 1) port = int(port) except ValueError: - log.warning(_("Invalid location received: {location}") - .format(location=location)) + log.warning( + _("Invalid location received: {location}").format(location=location) + ) else: session.location = (domain, port) # we monkey patch connector to use the new location @@ -348,30 +370,36 @@ connector.host = domain connector.port = port connector.connectionFailed_ori = connector.connectionFailed - connector.connectionFailed = partial(self._connectionFailed, - connector=connector) + connector.connectionFailed = partial( + self._connectionFailed, connector=connector + ) # resuming time try: - max_s = int(enabled_elt['max']) + max_s = int(enabled_elt["max"]) except (ValueError, KeyError) as e: if isinstance(e, ValueError): log.warning(_('Invalid "max" attribute')) max_s = RESUME_MAX - log.info(_("Using default session max value ({max_s} s).".format( - max_s=max_s))) + log.info( + _("Using default session max value ({max_s} s).".format(max_s=max_s)) + ) log.info(_("Stream Management enabled")) else: - log.info(_( - "Stream Management enabled, with a resumption time of {res_m:.2f} min" - .format(res_m = max_s/60))) + log.info( + _( + "Stream Management enabled, with a resumption time of {res_m:.2f} min".format( + res_m=max_s / 60 + ) + ) + ) session.session_max = max_s def on_resumed(self, enabled_elt, client): session = client._xep_0198_session assert not session.enabled del session.resuming - server_acked = int(enabled_elt['h']) + server_acked = int(enabled_elt["h"]) self.update_buffer(session, server_acked) resend_count = len(session.buffer) # we resend all stanza which have not been received properly @@ -379,8 +407,12 @@ # now we can continue the session session.enabled = True d_time = time.time() - session.disconnected_time - log.info(_("Stream session resumed (disconnected for {d_time} s, {count} " - "stanza(s) resent)").format(d_time=int(d_time), count=resend_count)) + log.info( + _( + "Stream session resumed (disconnected for {d_time} s, {count} " + "stanza(s) resent)" + ).format(d_time=int(d_time), count=resend_count) + ) def on_failed(self, failed_elt, client): session = client._xep_0198_session @@ -394,10 +426,9 @@ # stream management can't be started at all msg = _("Can't use stream management") if condition_elt is None: - log.error(msg + '.') + log.error(msg + ".") else: - log.error(_("{msg}: {reason}").format( - msg=msg, reason=condition_elt.name)) + log.error(_("{msg}: {reason}").format(msg=msg, reason=condition_elt.name)) else: # only stream resumption failed, we can try full session init # XXX: we try to start full session init from this point, with many @@ -408,10 +439,9 @@ msg = _("stream resumption not possible, restarting full session") if condition_elt is None: - log.warning('{msg}.'.format(msg=msg)) + log.warning("{msg}.".format(msg=msg)) else: - log.warning("{msg}: {reason}".format( - msg=msg, reason=condition_elt.name)) + log.warning("{msg}: {reason}".format(msg=msg, reason=condition_elt.name)) # stream resumption failed, but we still can do normal stream management # we restore attributes as if the session was new, and init stream # we keep everything initialized, and only do binding, roster request @@ -420,15 +450,15 @@ client.conn_deferred = defer.Deferred() else: log.error("conn_deferred should be called at this point") - plg_0045 = self.host.plugins.get('XEP-0045') - plg_0313 = self.host.plugins.get('XEP-0313') + plg_0045 = self.host.plugins.get("XEP-0045") + plg_0313 = self.host.plugins.get("XEP-0313") # FIXME: we should call all loaded plugins with generic callbacks # (e.g. prepareResume and resume), so a hot resuming can be done # properly for all plugins. if plg_0045 is not None: - # we have to remove joined rooms + # we have to remove joined rooms muc_join_args = plg_0045.pop_rooms(client) # we need to recreate roster client.handlers.remove(client.roster) @@ -439,7 +469,9 @@ bind_init.required = True d = bind_init.start() # we set the jid, which may have changed - d.addCallback(lambda __: setattr(client.factory.authenticator, "jid", client.jid)) + d.addCallback( + lambda __: setattr(client.factory.authenticator, "jid", client.jid) + ) # we call the trigger who will send the <enable/> element d.addCallback(lambda __: self._xml_init_trigger(client)) # then we have to re-request the roster, as changes may have occured @@ -454,14 +486,14 @@ if plg_0045 is not None: # we re-join MUC rooms muc_d_list = defer.DeferredList( - [defer.ensureDeferred(plg_0045.join(*args)) - for args in muc_join_args] + [defer.ensureDeferred(plg_0045.join(*args)) for args in muc_join_args] ) d.addCallback(lambda __: muc_d_list) # at the end we replay the buffer, as those stanzas have probably not # been received - d.addCallback(lambda __: self.replay_buffer(client, buffer_, - discard_results=True)) + d.addCallback( + lambda __: self.replay_buffer(client, buffer_, discard_results=True) + ) def on_receive(self, element, client): if not client.is_component: @@ -472,9 +504,11 @@ def on_send(self, obj, client): if not client.is_component: session = client._xep_0198_session - if (session.enabled + if ( + session.enabled and domish.IElement.providedBy(obj) - and obj.name.lower() in C.STANZA_NAMES): + and obj.name.lower() in C.STANZA_NAMES + ): session.out_counter += 1 % MAX_COUNTER session.buffer.appendleft(obj) self.check_acks(client) @@ -492,16 +526,24 @@ session.req_timer.cancel() session.req_timer = None try: - server_acked = int(a_elt['h']) + server_acked = int(a_elt["h"]) except ValueError: - log.warning(_("Server returned invalid ack element, disabling stream " - "management: {xml}").format(xml=a_elt)) + log.warning( + _( + "Server returned invalid ack element, disabling stream " + "management: {xml}" + ).format(xml=a_elt) + ) session.enabled = False return if server_acked > session.out_counter: - log.error(_("Server acked more stanzas than we have sent, disabling stream " - "management.")) + log.error( + _( + "Server acked more stanzas than we have sent, disabling stream " + "management." + ) + ) session.reset() return