diff sat/core/xmpp.py @ 2691:1ecceac3df96

plugin XEP-0198: Stream Management implementation: - hooks can now be set in stream onElement and send methods - xmllog refactored to use new hooks - client.isConnected now uses transport.connected method - fixed reconnection, SàT will now try to reconnect indefinitely until it success, unresolvable failure happen (e.g. invalid certificate), or explicit disconnection is requested (or a plugin change this behaviour) - new triggers: "stream_hooks", "disconnecting", "disconnected", and "xml_init" (replace "XML Initialized")
author Goffi <goffi@goffi.org>
date Sun, 18 Nov 2018 15:49:46 +0100
parents 943e78e18882
children f64f1158a26e
line wrap: on
line diff
--- a/sat/core/xmpp.py	Sat Nov 10 10:16:38 2018 +0100
+++ b/sat/core/xmpp.py	Sun Nov 18 15:49:46 2018 +0100
@@ -49,20 +49,20 @@
 
 class SatXMPPEntity(object):
     """Common code for Client and Component"""
-    _reason = None  # reason of disconnection
 
     def __init__(self, host_app, profile, max_retries):
-
-        self.factory.clientConnectionLost = self.connectionLost
-        self.factory.maxRetries = max_retries
-        # when self._connected is None, we are not connected
+        factory = self.factory
+        factory.maxRetries = max_retries
+        factory.maxDelay = 30
+        # when self._connected_d is None, we are not connected
         # else, it's a deferred which fire on disconnection
-        self._connected = None
+        self._connected_d = None
         self.profile = profile
         self.host_app = host_app
         self.cache = cache.Cache(host_app, profile)
         self._mess_id_uid = {}  # map from message id to uid used in history.
                                 # Key: (full_jid,message_id) Value: uid
+        # this Deferred fire when entity is connected
         self.conn_deferred = defer.Deferred()
         self._progress_cb = {}  # callback called when a progress is requested
                                 # (key = progress id)
@@ -145,15 +145,12 @@
             "Password", "Connection", profile_key=profile
         )
         entity = host.profiles[profile] = cls(
-            host,
-            profile,
-            jid.JID(host.memory.getParamA("JabberID", "Connection", profile_key=profile)),
-            password,
-            host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection", profile_key=profile)
-            or None,
-            port,
-            max_retries,
-        )
+            host, profile, jid.JID(host.memory.getParamA("JabberID", "Connection",
+            profile_key=profile)), password,
+            host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection",
+                                  profile_key=profile) or None,
+            port, max_retries,
+            )
 
         entity._createSubProtocols()
 
@@ -172,7 +169,7 @@
 
         entity.startService()
 
-        yield entity.getConnectionDeferred()
+        yield entity.conn_deferred
 
         yield defer.maybeDeferred(entity.entityConnected)
 
@@ -205,40 +202,16 @@
         # we finally send our presence
         entity.presence.available()
 
-    def getConnectionDeferred(self):
-        """Return a deferred which fire when the client is connected"""
-        return self.conn_deferred
-
     def _disconnectionCb(self, __):
-        self._connected = None
+        self._connected_d = None
 
     def _disconnectionEb(self, failure_):
         log.error(_(u"Error while disconnecting: {}".format(failure_)))
 
     def _authd(self, xmlstream):
-        if not self.host_app.trigger.point("XML Initialized", xmlstream, self.profile):
-            return
         super(SatXMPPEntity, self)._authd(xmlstream)
-
-        if self._reason is not None:
-            # if we have had trouble to connect we can reset
-            # the exception as the connection is now working.
-            del self._reason
-
-        # the following Deferred is used to know when we are connected
-        # so we need to be set it to None when connection is lost
-        self._connected = defer.Deferred()
-        self._connected.addCallback(self._cleanConnection)
-        self._connected.addCallback(self._disconnectionCb)
-        self._connected.addErrback(self._disconnectionEb)
-
-        log.info(
-            _(u"********** [{profile}] CONNECTED **********").format(profile=self.profile)
-        )
+        log.debug(_(u"{profile} identified").format(profile=self.profile))
         self.streamInitialized()
-        self.host_app.bridge.connected(
-            self.profile, unicode(self.jid)
-        )  # we send the signal to the clients
 
     def _finish_connection(self, __):
         self.conn_deferred.callback(None)
@@ -246,11 +219,32 @@
     def streamInitialized(self):
         """Called after _authd"""
         log.debug(_(u"XML stream is initialized"))
+        if not self.host_app.trigger.point("xml_init", self):
+            return
+        self.postStreamInit()
+
+    def postStreamInit(self):
+        """Workflow after stream initalisation."""
+        log.info(
+            _(u"********** [{profile}] CONNECTED **********").format(profile=self.profile)
+        )
+
+        # the following Deferred is used to know when we are connected
+        # so we need to be set it to None when connection is lost
+        self._connected_d = defer.Deferred()
+        self._connected_d.addCallback(self._cleanConnection)
+        self._connected_d.addCallback(self._disconnectionCb)
+        self._connected_d.addErrback(self._disconnectionEb)
+
+        self.host_app.bridge.connected(
+            self.profile, unicode(self.jid)
+        )  # we send the signal to the clients
+
+
         self.keep_alife = task.LoopingCall(
             self.xmlstream.send, " "
         )  # Needed to avoid disconnection (specially with openfire)
         self.keep_alife.start(C.XMPP_KEEP_ALIFE)
-
         self.disco = SatDiscoProtocol(self)
         self.disco.setHandlerParent(self)
         self.discoHandler = disco.DiscoHandler()
@@ -278,21 +272,27 @@
 
     ## connection ##
 
-    def _disconnected(self, reason):
-        # we have to save the reason of disconnection, otherwise it would be lost
-        self._reason = reason
-        super(SatXMPPEntity, self)._disconnected(reason)
+    def _connected(self, xs):
+        send_hooks = []
+        receive_hooks = []
+        self.host_app.trigger.point(
+            "stream_hooks", self, receive_hooks, send_hooks)
+        for hook in receive_hooks:
+            xs.addHook(C.STREAM_HOOK_RECEIVE, hook)
+        for hook in send_hooks:
+            xs.addHook(C.STREAM_HOOK_SEND, hook)
+        super(SatXMPPEntity, self)._connected(xs)
 
-    def connectionLost(self, connector, reason):
+    def disconnectProfile(self, reason):
         try:
             self.keep_alife.stop()
         except AttributeError:
             log.debug(_("No keep_alife"))
-        if self._connected is not None:
+        if self._connected_d is not None:
             self.host_app.bridge.disconnected(
                 self.profile
             )  # we send the signal to the clients
-            self._connected.callback(None)
+            self._connected_d.callback(None)
             self.host_app.purgeEntity(
                 self.profile
             )  # and we remove references to this client
@@ -302,12 +302,10 @@
                 )
             )
         if not self.conn_deferred.called:
-            # FIXME: real error is not gotten here (e.g. if jid is not know by Prosody,
-            #        we should have the real error)
-            if self._reason is None:
+            if reason is None:
                 err = error.StreamError(u"Server unexpectedly closed the connection")
             else:
-                err = self._reason
+                err = reason
                 try:
                     if err.value.args[0][0][2] == "certificate verify failed":
                         err = exceptions.InvalidCertificate(
@@ -316,10 +314,17 @@
                               u"This should never happen and may indicate that "
                               u"somebody is trying to spy on you.\n"
                               u"Please contact your server administrator."))
+                        self.factory.continueTrying = 0
                 except (IndexError, TypeError):
                     pass
             self.conn_deferred.errback(err)
 
+    def _disconnected(self, reason):
+        super(SatXMPPEntity, self)._disconnected(reason)
+        if not self.host_app.trigger.point("disconnected", self, reason):
+            return
+        self.disconnectProfile(reason)
+
     @defer.inlineCallbacks
     def _cleanConnection(self, __):
         """method called on disconnection
@@ -333,13 +338,18 @@
                 yield disconnected_cb(self)
 
     def isConnected(self):
-        return self._connected is not None
+        try:
+            return bool(self.xmlstream.transport.connected)
+        except AttributeError:
+            return False
 
     def entityDisconnect(self):
+        if not self.host_app.trigger.point("disconnecting", self):
+            return
         log.info(_(u"Disconnecting..."))
         self.stopService()
-        if self._connected is not None:
-            return self._connected
+        if self._connected_d is not None:
+            return self._connected_d
         else:
             return defer.succeed(None)
 
@@ -735,16 +745,8 @@
     )  # XXX: set to True from entry plugin to keep messages in history for received
        #      messages
 
-    def __init__(
-        self,
-        host_app,
-        profile,
-        component_jid,
-        password,
-        host=None,
-        port=None,
-        max_retries=C.XMPP_MAX_RETRIES,
-    ):
+    def __init__(self, host_app, profile, component_jid, password, host=None, port=None,
+                 max_retries=C.XMPP_MAX_RETRIES):
         self.started = time.time()
         if port is None:
             port = C.XMPP_COMPONENT_PORT