Mercurial > libervia-backend
diff sat/core/xmpp.py @ 2691:1ecceac3df96
plugin XEP-0198: Stream Management implementation:
- hooks can now be set in stream onElement and send methods
- xmllog refactored to use new hooks
- client.isConnected now uses transport.connected method
- fixed reconnection, SàT will now try to reconnect indefinitely until it success, unresolvable failure happen (e.g. invalid certificate), or explicit disconnection is requested (or a plugin change this behaviour)
- new triggers: "stream_hooks", "disconnecting", "disconnected", and "xml_init" (replace "XML Initialized")
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 18 Nov 2018 15:49:46 +0100 |
parents | 943e78e18882 |
children | f64f1158a26e |
line wrap: on
line diff
--- a/sat/core/xmpp.py Sat Nov 10 10:16:38 2018 +0100 +++ b/sat/core/xmpp.py Sun Nov 18 15:49:46 2018 +0100 @@ -49,20 +49,20 @@ class SatXMPPEntity(object): """Common code for Client and Component""" - _reason = None # reason of disconnection def __init__(self, host_app, profile, max_retries): - - self.factory.clientConnectionLost = self.connectionLost - self.factory.maxRetries = max_retries - # when self._connected is None, we are not connected + factory = self.factory + factory.maxRetries = max_retries + factory.maxDelay = 30 + # when self._connected_d is None, we are not connected # else, it's a deferred which fire on disconnection - self._connected = None + self._connected_d = None self.profile = profile self.host_app = host_app self.cache = cache.Cache(host_app, profile) self._mess_id_uid = {} # map from message id to uid used in history. # Key: (full_jid,message_id) Value: uid + # this Deferred fire when entity is connected self.conn_deferred = defer.Deferred() self._progress_cb = {} # callback called when a progress is requested # (key = progress id) @@ -145,15 +145,12 @@ "Password", "Connection", profile_key=profile ) entity = host.profiles[profile] = cls( - host, - profile, - jid.JID(host.memory.getParamA("JabberID", "Connection", profile_key=profile)), - password, - host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection", profile_key=profile) - or None, - port, - max_retries, - ) + host, profile, jid.JID(host.memory.getParamA("JabberID", "Connection", + profile_key=profile)), password, + host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection", + profile_key=profile) or None, + port, max_retries, + ) entity._createSubProtocols() @@ -172,7 +169,7 @@ entity.startService() - yield entity.getConnectionDeferred() + yield entity.conn_deferred yield defer.maybeDeferred(entity.entityConnected) @@ -205,40 +202,16 @@ # we finally send our presence entity.presence.available() - def getConnectionDeferred(self): - """Return a deferred which fire when the client is connected""" - return self.conn_deferred - def _disconnectionCb(self, __): - self._connected = None + self._connected_d = None def _disconnectionEb(self, failure_): log.error(_(u"Error while disconnecting: {}".format(failure_))) def _authd(self, xmlstream): - if not self.host_app.trigger.point("XML Initialized", xmlstream, self.profile): - return super(SatXMPPEntity, self)._authd(xmlstream) - - if self._reason is not None: - # if we have had trouble to connect we can reset - # the exception as the connection is now working. - del self._reason - - # the following Deferred is used to know when we are connected - # so we need to be set it to None when connection is lost - self._connected = defer.Deferred() - self._connected.addCallback(self._cleanConnection) - self._connected.addCallback(self._disconnectionCb) - self._connected.addErrback(self._disconnectionEb) - - log.info( - _(u"********** [{profile}] CONNECTED **********").format(profile=self.profile) - ) + log.debug(_(u"{profile} identified").format(profile=self.profile)) self.streamInitialized() - self.host_app.bridge.connected( - self.profile, unicode(self.jid) - ) # we send the signal to the clients def _finish_connection(self, __): self.conn_deferred.callback(None) @@ -246,11 +219,32 @@ def streamInitialized(self): """Called after _authd""" log.debug(_(u"XML stream is initialized")) + if not self.host_app.trigger.point("xml_init", self): + return + self.postStreamInit() + + def postStreamInit(self): + """Workflow after stream initalisation.""" + log.info( + _(u"********** [{profile}] CONNECTED **********").format(profile=self.profile) + ) + + # the following Deferred is used to know when we are connected + # so we need to be set it to None when connection is lost + self._connected_d = defer.Deferred() + self._connected_d.addCallback(self._cleanConnection) + self._connected_d.addCallback(self._disconnectionCb) + self._connected_d.addErrback(self._disconnectionEb) + + self.host_app.bridge.connected( + self.profile, unicode(self.jid) + ) # we send the signal to the clients + + self.keep_alife = task.LoopingCall( self.xmlstream.send, " " ) # Needed to avoid disconnection (specially with openfire) self.keep_alife.start(C.XMPP_KEEP_ALIFE) - self.disco = SatDiscoProtocol(self) self.disco.setHandlerParent(self) self.discoHandler = disco.DiscoHandler() @@ -278,21 +272,27 @@ ## connection ## - def _disconnected(self, reason): - # we have to save the reason of disconnection, otherwise it would be lost - self._reason = reason - super(SatXMPPEntity, self)._disconnected(reason) + def _connected(self, xs): + send_hooks = [] + receive_hooks = [] + self.host_app.trigger.point( + "stream_hooks", self, receive_hooks, send_hooks) + for hook in receive_hooks: + xs.addHook(C.STREAM_HOOK_RECEIVE, hook) + for hook in send_hooks: + xs.addHook(C.STREAM_HOOK_SEND, hook) + super(SatXMPPEntity, self)._connected(xs) - def connectionLost(self, connector, reason): + def disconnectProfile(self, reason): try: self.keep_alife.stop() except AttributeError: log.debug(_("No keep_alife")) - if self._connected is not None: + if self._connected_d is not None: self.host_app.bridge.disconnected( self.profile ) # we send the signal to the clients - self._connected.callback(None) + self._connected_d.callback(None) self.host_app.purgeEntity( self.profile ) # and we remove references to this client @@ -302,12 +302,10 @@ ) ) if not self.conn_deferred.called: - # FIXME: real error is not gotten here (e.g. if jid is not know by Prosody, - # we should have the real error) - if self._reason is None: + if reason is None: err = error.StreamError(u"Server unexpectedly closed the connection") else: - err = self._reason + err = reason try: if err.value.args[0][0][2] == "certificate verify failed": err = exceptions.InvalidCertificate( @@ -316,10 +314,17 @@ u"This should never happen and may indicate that " u"somebody is trying to spy on you.\n" u"Please contact your server administrator.")) + self.factory.continueTrying = 0 except (IndexError, TypeError): pass self.conn_deferred.errback(err) + def _disconnected(self, reason): + super(SatXMPPEntity, self)._disconnected(reason) + if not self.host_app.trigger.point("disconnected", self, reason): + return + self.disconnectProfile(reason) + @defer.inlineCallbacks def _cleanConnection(self, __): """method called on disconnection @@ -333,13 +338,18 @@ yield disconnected_cb(self) def isConnected(self): - return self._connected is not None + try: + return bool(self.xmlstream.transport.connected) + except AttributeError: + return False def entityDisconnect(self): + if not self.host_app.trigger.point("disconnecting", self): + return log.info(_(u"Disconnecting...")) self.stopService() - if self._connected is not None: - return self._connected + if self._connected_d is not None: + return self._connected_d else: return defer.succeed(None) @@ -735,16 +745,8 @@ ) # XXX: set to True from entry plugin to keep messages in history for received # messages - def __init__( - self, - host_app, - profile, - component_jid, - password, - host=None, - port=None, - max_retries=C.XMPP_MAX_RETRIES, - ): + def __init__(self, host_app, profile, component_jid, password, host=None, port=None, + max_retries=C.XMPP_MAX_RETRIES): self.started = time.time() if port is None: port = C.XMPP_COMPONENT_PORT