diff libervia/backend/plugins/plugin_xep_0198.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents 4b842c1fb686
children
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0198.py	Tue Jun 18 12:06:45 2024 +0200
+++ b/libervia/backend/plugins/plugin_xep_0198.py	Wed Jun 19 18:44:57 2024 +0200
@@ -58,7 +58,7 @@
 # Max number of seconds before requesting ack
 MAX_DELAY_ACK_R = 30
 MAX_COUNTER = 2**32
-RESUME_MAX = 5*60
+RESUME_MAX = 5 * 60
 # if we don't have an answer to ACK REQUEST after this delay, connection is aborted
 ACK_TIMEOUT = 35
 
@@ -92,8 +92,7 @@
     def enabled(self, enabled):
         if enabled:
             if self._enabled:
-                raise exceptions.InternalError(
-                    "Stream Management can't be enabled twice")
+                raise exceptions.InternalError("Stream Management can't be enabled twice")
             self._enabled = True
             callback, kw = self.callback_data
             self.timer = task.LoopingCall(callback, **kw)
@@ -133,25 +132,27 @@
     def __init__(self, host):
         log.info(_("Plugin Stream Management initialization"))
         self.host = host
-        host.register_namespace('sm', NS_SM)
+        host.register_namespace("sm", NS_SM)
         host.trigger.add("stream_hooks", self.add_hooks)
         host.trigger.add("xml_init", self._xml_init_trigger)
         host.trigger.add("disconnecting", self._disconnecting_trigger)
         host.trigger.add("disconnected", self._disconnected_trigger)
         try:
-            self._ack_timeout = int(host.memory.config_get("", "ack_timeout", ACK_TIMEOUT))
+            self._ack_timeout = int(
+                host.memory.config_get("", "ack_timeout", ACK_TIMEOUT)
+            )
         except ValueError:
             log.error(_("Invalid ack_timeout value, please check your configuration"))
             self._ack_timeout = ACK_TIMEOUT
         if not self._ack_timeout:
             log.info(_("Ack timeout disabled"))
         else:
-            log.info(_("Ack timeout set to {timeout}s").format(
-                timeout=self._ack_timeout))
+            log.info(_("Ack timeout set to {timeout}s").format(timeout=self._ack_timeout))
 
     def profile_connecting(self, client):
-        client._xep_0198_session = ProfileSessionData(callback=self.check_acks,
-                                                      client=client)
+        client._xep_0198_session = ProfileSessionData(
+            callback=self.check_acks, client=client
+        )
 
     def get_handler(self, client):
         return XEP_0198_handler(self)
@@ -164,12 +165,16 @@
 
     def _xml_init_trigger(self, client):
         """Enable or resume a stream mangement"""
-        if not (NS_SM, 'sm') in client.xmlstream.features:
-            log.warning(_(
-                "Your server doesn't support stream management ({namespace}), this is "
-                "used to improve connection problems detection (like network outages). "
-                "Please ask your server administrator to enable this feature.".format(
-                namespace=NS_SM)))
+        if not (NS_SM, "sm") in client.xmlstream.features:
+            log.warning(
+                _(
+                    "Your server doesn't support stream management ({namespace}), this is "
+                    "used to improve connection problems detection (like network outages). "
+                    "Please ask your server administrator to enable this feature.".format(
+                        namespace=NS_SM
+                    )
+                )
+            )
             return True
         session = client._xep_0198_session
 
@@ -185,9 +190,9 @@
 
         if session.resume_enabled:
             # we are resuming a session
-            resume_elt = domish.Element((NS_SM, 'resume'))
-            resume_elt['h'] = str(session.in_counter)
-            resume_elt['previd'] = session.session_id
+            resume_elt = domish.Element((NS_SM, "resume"))
+            resume_elt["h"] = str(session.in_counter)
+            resume_elt["previd"] = session.session_id
             client.send(resume_elt)
             session.resuming = True
             # session.enabled will be set on <resumed/> reception
@@ -195,8 +200,8 @@
         else:
             # we start a new session
             assert session.out_counter == 0
-            enable_elt = domish.Element((NS_SM, 'enable'))
-            enable_elt['resume'] = 'true'
+            enable_elt = domish.Element((NS_SM, "enable"))
+            enable_elt["resume"] = "true"
             client.send(enable_elt)
             session.enabled = True
             return True
@@ -217,9 +222,9 @@
         session.enabled = False
         if session.resume_enabled:
             session.disconnected_time = time.time()
-            session.disconnect_timer = reactor.callLater(session.session_max,
-                                                         client.disconnect_profile,
-                                                         reason)
+            session.disconnect_timer = reactor.callLater(
+                session.session_max, client.disconnect_profile, reason
+            )
             # disconnect_profile must not be called at this point
             # because session can be resumed
             return False
@@ -234,8 +239,10 @@
         #             session.buffer_idx))
         if session.ack_requested or not session.buffer:
             return
-        if (session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R
-            or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R):
+        if (
+            session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R
+            or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R
+        ):
             self.request_ack(client)
             session.ack_requested = True
             session.last_ack_r = time.time()
@@ -253,8 +260,9 @@
                     "diff = {diff}\n"
                     "server_acked = {server_acked}\n"
                     "buffer_idx = {buffer_id}".format(
-                        diff=diff, server_acked=server_acked,
-                        buffer_id=session.buffer_idx))
+                        diff=diff, server_acked=server_acked, buffer_id=session.buffer_idx
+                    )
+                )
             session.buffer_idx += diff
 
     def replay_buffer(self, client, buffer_, discard_results=False):
@@ -270,37 +278,47 @@
             except IndexError:
                 break
             else:
-                if ((discard_results
-                     and stanza.name == 'iq'
-                     and stanza.getAttribute('type') == 'result')):
+                if (
+                    discard_results
+                    and stanza.name == "iq"
+                    and stanza.getAttribute("type") == "result"
+                ):
                     continue
                 client.send(stanza)
 
     def send_ack(self, client):
         """Send an answer element with current IN counter"""
-        a_elt = domish.Element((NS_SM, 'a'))
-        a_elt['h'] = str(client._xep_0198_session.in_counter)
+        a_elt = domish.Element((NS_SM, "a"))
+        a_elt["h"] = str(client._xep_0198_session.in_counter)
         client.send(a_elt)
 
     def request_ack(self, client):
         """Send a request element"""
         session = client._xep_0198_session
-        r_elt = domish.Element((NS_SM, 'r'))
+        r_elt = domish.Element((NS_SM, "r"))
         client.send(r_elt)
         if session.req_timer is not None:
             raise exceptions.InternalError("req_timer should not be set")
         if self._ack_timeout:
-            session.req_timer = reactor.callLater(self._ack_timeout, self.on_ack_time_out,
-                                                  client)
+            session.req_timer = reactor.callLater(
+                self._ack_timeout, self.on_ack_time_out, client
+            )
 
     def _connectionFailed(self, failure_, connector):
         normal_host, normal_port = connector.normal_location
         del connector.normal_location
-        log.warning(_(
-            "Connection failed using location given by server (host: {host}, port: "
-            "{port}), switching to normal host and port (host: {normal_host}, port: "
-            "{normal_port})".format(host=connector.host, port=connector.port,
-                                     normal_host=normal_host, normal_port=normal_port)))
+        log.warning(
+            _(
+                "Connection failed using location given by server (host: {host}, port: "
+                "{port}), switching to normal host and port (host: {normal_host}, port: "
+                "{normal_port})".format(
+                    host=connector.host,
+                    port=connector.port,
+                    normal_host=normal_host,
+                    normal_port=normal_port,
+                )
+            )
+        )
         connector.host, connector.port = normal_host, normal_port
         connector.connectionFailed = connector.connectionFailed_ori
         del connector.connectionFailed_ori
@@ -311,14 +329,17 @@
         session.in_counter = 0
 
         # we check that resuming is possible and that we have a session id
-        resume = C.bool(enabled_elt.getAttribute('resume'))
-        session_id = enabled_elt.getAttribute('id')
+        resume = C.bool(enabled_elt.getAttribute("resume"))
+        session_id = enabled_elt.getAttribute("id")
         if not session_id:
             log.warning(_('Incorrect <enabled/> element received, no "id" attribute'))
         if not resume or not session_id:
-            log.warning(_(
-                "You're server doesn't support session resuming with stream management, "
-                "please contact your server administrator to enable it"))
+            log.warning(
+                _(
+                    "You're server doesn't support session resuming with stream management, "
+                    "please contact your server administrator to enable it"
+                )
+            )
             return
 
         session.session_id = session_id
@@ -329,17 +350,18 @@
 
         # location, in case server want resuming session to be elsewhere
         try:
-            location = enabled_elt['location']
+            location = enabled_elt["location"]
         except KeyError:
             pass
         else:
             # TODO: handle IPv6 here (in brackets, cf. XEP)
             try:
-                domain, port = location.split(':', 1)
+                domain, port = location.split(":", 1)
                 port = int(port)
             except ValueError:
-                log.warning(_("Invalid location received: {location}")
-                    .format(location=location))
+                log.warning(
+                    _("Invalid location received: {location}").format(location=location)
+                )
             else:
                 session.location = (domain, port)
                 # we monkey patch connector to use the new location
@@ -348,30 +370,36 @@
                 connector.host = domain
                 connector.port = port
                 connector.connectionFailed_ori = connector.connectionFailed
-                connector.connectionFailed = partial(self._connectionFailed,
-                                                     connector=connector)
+                connector.connectionFailed = partial(
+                    self._connectionFailed, connector=connector
+                )
 
         # resuming time
         try:
-            max_s = int(enabled_elt['max'])
+            max_s = int(enabled_elt["max"])
         except (ValueError, KeyError) as e:
             if isinstance(e, ValueError):
                 log.warning(_('Invalid "max" attribute'))
             max_s = RESUME_MAX
-            log.info(_("Using default session max value ({max_s} s).".format(
-                max_s=max_s)))
+            log.info(
+                _("Using default session max value ({max_s} s).".format(max_s=max_s))
+            )
             log.info(_("Stream Management enabled"))
         else:
-            log.info(_(
-                "Stream Management enabled, with a resumption time of {res_m:.2f} min"
-                .format(res_m = max_s/60)))
+            log.info(
+                _(
+                    "Stream Management enabled, with a resumption time of {res_m:.2f} min".format(
+                        res_m=max_s / 60
+                    )
+                )
+            )
         session.session_max = max_s
 
     def on_resumed(self, enabled_elt, client):
         session = client._xep_0198_session
         assert not session.enabled
         del session.resuming
-        server_acked = int(enabled_elt['h'])
+        server_acked = int(enabled_elt["h"])
         self.update_buffer(session, server_acked)
         resend_count = len(session.buffer)
         # we resend all stanza which have not been received properly
@@ -379,8 +407,12 @@
         # now we can continue the session
         session.enabled = True
         d_time = time.time() - session.disconnected_time
-        log.info(_("Stream session resumed (disconnected for {d_time} s, {count} "
-                   "stanza(s) resent)").format(d_time=int(d_time), count=resend_count))
+        log.info(
+            _(
+                "Stream session resumed (disconnected for {d_time} s, {count} "
+                "stanza(s) resent)"
+            ).format(d_time=int(d_time), count=resend_count)
+        )
 
     def on_failed(self, failed_elt, client):
         session = client._xep_0198_session
@@ -394,10 +426,9 @@
             # stream management can't be started at all
             msg = _("Can't use stream management")
             if condition_elt is None:
-                log.error(msg + '.')
+                log.error(msg + ".")
             else:
-                log.error(_("{msg}: {reason}").format(
-                msg=msg, reason=condition_elt.name))
+                log.error(_("{msg}: {reason}").format(msg=msg, reason=condition_elt.name))
         else:
             # only stream resumption failed, we can try full session init
             # XXX: we try to start full session init from this point, with many
@@ -408,10 +439,9 @@
             msg = _("stream resumption not possible, restarting full session")
 
             if condition_elt is None:
-                log.warning('{msg}.'.format(msg=msg))
+                log.warning("{msg}.".format(msg=msg))
             else:
-                log.warning("{msg}: {reason}".format(
-                    msg=msg, reason=condition_elt.name))
+                log.warning("{msg}: {reason}".format(msg=msg, reason=condition_elt.name))
             # stream resumption failed, but we still can do normal stream management
             # we restore attributes as if the session was new, and init stream
             # we keep everything initialized, and only do binding, roster request
@@ -420,15 +450,15 @@
                 client.conn_deferred = defer.Deferred()
             else:
                 log.error("conn_deferred should be called at this point")
-            plg_0045 = self.host.plugins.get('XEP-0045')
-            plg_0313 = self.host.plugins.get('XEP-0313')
+            plg_0045 = self.host.plugins.get("XEP-0045")
+            plg_0313 = self.host.plugins.get("XEP-0313")
 
             # FIXME: we should call all loaded plugins with generic callbacks
             #        (e.g. prepareResume and resume), so a hot resuming can be done
             #        properly for all plugins.
 
             if plg_0045 is not None:
-                # we have to remove joined rooms
+                # we have to remove joined rooms
                 muc_join_args = plg_0045.pop_rooms(client)
             # we need to recreate roster
             client.handlers.remove(client.roster)
@@ -439,7 +469,9 @@
             bind_init.required = True
             d = bind_init.start()
             # we set the jid, which may have changed
-            d.addCallback(lambda __: setattr(client.factory.authenticator, "jid", client.jid))
+            d.addCallback(
+                lambda __: setattr(client.factory.authenticator, "jid", client.jid)
+            )
             # we call the trigger who will send the <enable/> element
             d.addCallback(lambda __: self._xml_init_trigger(client))
             # then we have to re-request the roster, as changes may have occured
@@ -454,14 +486,14 @@
             if plg_0045 is not None:
                 # we re-join MUC rooms
                 muc_d_list = defer.DeferredList(
-                    [defer.ensureDeferred(plg_0045.join(*args))
-                     for args in muc_join_args]
+                    [defer.ensureDeferred(plg_0045.join(*args)) for args in muc_join_args]
                 )
                 d.addCallback(lambda __: muc_d_list)
             # at the end we replay the buffer, as those stanzas have probably not
             # been received
-            d.addCallback(lambda __: self.replay_buffer(client, buffer_,
-                                                       discard_results=True))
+            d.addCallback(
+                lambda __: self.replay_buffer(client, buffer_, discard_results=True)
+            )
 
     def on_receive(self, element, client):
         if not client.is_component:
@@ -472,9 +504,11 @@
     def on_send(self, obj, client):
         if not client.is_component:
             session = client._xep_0198_session
-            if (session.enabled
+            if (
+                session.enabled
                 and domish.IElement.providedBy(obj)
-                and obj.name.lower() in C.STANZA_NAMES):
+                and obj.name.lower() in C.STANZA_NAMES
+            ):
                 session.out_counter += 1 % MAX_COUNTER
                 session.buffer.appendleft(obj)
                 self.check_acks(client)
@@ -492,16 +526,24 @@
                 session.req_timer.cancel()
                 session.req_timer = None
         try:
-            server_acked = int(a_elt['h'])
+            server_acked = int(a_elt["h"])
         except ValueError:
-            log.warning(_("Server returned invalid ack element, disabling stream "
-                          "management: {xml}").format(xml=a_elt))
+            log.warning(
+                _(
+                    "Server returned invalid ack element, disabling stream "
+                    "management: {xml}"
+                ).format(xml=a_elt)
+            )
             session.enabled = False
             return
 
         if server_acked > session.out_counter:
-            log.error(_("Server acked more stanzas than we have sent, disabling stream "
-                        "management."))
+            log.error(
+                _(
+                    "Server acked more stanzas than we have sent, disabling stream "
+                    "management."
+                )
+            )
             session.reset()
             return