comparison sat/core/xmpp.py @ 2691:1ecceac3df96

plugin XEP-0198: Stream Management implementation: - hooks can now be set in stream onElement and send methods - xmllog refactored to use new hooks - client.isConnected now uses transport.connected method - fixed reconnection: SàT will now try to reconnect indefinitely until it succeeds, an unresolvable failure happens (e.g. invalid certificate), or an explicit disconnection is requested (or a plugin changes this behaviour) - new triggers: "stream_hooks", "disconnecting", "disconnected", and "xml_init" (replaces "XML Initialized")
author Goffi <goffi@goffi.org>
date Sun, 18 Nov 2018 15:49:46 +0100
parents 943e78e18882
children f64f1158a26e
comparison
equal deleted inserted replaced
2690:56bfe1b79204 2691:1ecceac3df96
47 NS_DISCO_INFO = u"http://jabber.org/protocol/disco#info" 47 NS_DISCO_INFO = u"http://jabber.org/protocol/disco#info"
48 48
49 49
50 class SatXMPPEntity(object): 50 class SatXMPPEntity(object):
51 """Common code for Client and Component""" 51 """Common code for Client and Component"""
52 _reason = None # reason of disconnection
53 52
54 def __init__(self, host_app, profile, max_retries): 53 def __init__(self, host_app, profile, max_retries):
55 54 factory = self.factory
56 self.factory.clientConnectionLost = self.connectionLost 55 factory.maxRetries = max_retries
57 self.factory.maxRetries = max_retries 56 factory.maxDelay = 30
58 # when self._connected is None, we are not connected 57 # when self._connected_d is None, we are not connected
59 # else, it's a deferred which fire on disconnection 58 # else, it's a deferred which fire on disconnection
60 self._connected = None 59 self._connected_d = None
61 self.profile = profile 60 self.profile = profile
62 self.host_app = host_app 61 self.host_app = host_app
63 self.cache = cache.Cache(host_app, profile) 62 self.cache = cache.Cache(host_app, profile)
64 self._mess_id_uid = {} # map from message id to uid used in history. 63 self._mess_id_uid = {} # map from message id to uid used in history.
65 # Key: (full_jid,message_id) Value: uid 64 # Key: (full_jid,message_id) Value: uid
65 # this Deferred fire when entity is connected
66 self.conn_deferred = defer.Deferred() 66 self.conn_deferred = defer.Deferred()
67 self._progress_cb = {} # callback called when a progress is requested 67 self._progress_cb = {} # callback called when a progress is requested
68 # (key = progress id) 68 # (key = progress id)
69 self.actions = {} # used to keep track of actions for retrieval (key = action_id) 69 self.actions = {} # used to keep track of actions for retrieval (key = action_id)
70 self.encryption = encryption.EncryptionHandler(self) 70 self.encryption = encryption.EncryptionHandler(self)
143 143
144 password = yield host.memory.asyncGetParamA( 144 password = yield host.memory.asyncGetParamA(
145 "Password", "Connection", profile_key=profile 145 "Password", "Connection", profile_key=profile
146 ) 146 )
147 entity = host.profiles[profile] = cls( 147 entity = host.profiles[profile] = cls(
148 host, 148 host, profile, jid.JID(host.memory.getParamA("JabberID", "Connection",
149 profile, 149 profile_key=profile)), password,
150 jid.JID(host.memory.getParamA("JabberID", "Connection", profile_key=profile)), 150 host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection",
151 password, 151 profile_key=profile) or None,
152 host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection", profile_key=profile) 152 port, max_retries,
153 or None, 153 )
154 port,
155 max_retries,
156 )
157 154
158 entity._createSubProtocols() 155 entity._createSubProtocols()
159 156
160 entity.fallBack = SatFallbackHandler(host) 157 entity.fallBack = SatFallbackHandler(host)
161 entity.fallBack.setHandlerParent(entity) 158 entity.fallBack.setHandlerParent(entity)
170 167
171 plugin_conn_cb = yield entity._callConnectionTriggers() 168 plugin_conn_cb = yield entity._callConnectionTriggers()
172 169
173 entity.startService() 170 entity.startService()
174 171
175 yield entity.getConnectionDeferred() 172 yield entity.conn_deferred
176 173
177 yield defer.maybeDeferred(entity.entityConnected) 174 yield defer.maybeDeferred(entity.entityConnected)
178 175
179 # Call profileConnected callback for all plugins, 176 # Call profileConnected callback for all plugins,
180 # and print error message if any of them fails 177 # and print error message if any of them fails
203 # TODO: mesure launch time of each plugin 200 # TODO: mesure launch time of each plugin
204 201
205 # we finally send our presence 202 # we finally send our presence
206 entity.presence.available() 203 entity.presence.available()
207 204
208 def getConnectionDeferred(self):
209 """Return a deferred which fire when the client is connected"""
210 return self.conn_deferred
211
212 def _disconnectionCb(self, __): 205 def _disconnectionCb(self, __):
213 self._connected = None 206 self._connected_d = None
214 207
215 def _disconnectionEb(self, failure_): 208 def _disconnectionEb(self, failure_):
216 log.error(_(u"Error while disconnecting: {}".format(failure_))) 209 log.error(_(u"Error while disconnecting: {}".format(failure_)))
217 210
218 def _authd(self, xmlstream): 211 def _authd(self, xmlstream):
219 if not self.host_app.trigger.point("XML Initialized", xmlstream, self.profile): 212 super(SatXMPPEntity, self)._authd(xmlstream)
213 log.debug(_(u"{profile} identified").format(profile=self.profile))
214 self.streamInitialized()
215
216 def _finish_connection(self, __):
217 self.conn_deferred.callback(None)
218
219 def streamInitialized(self):
220 """Called after _authd"""
221 log.debug(_(u"XML stream is initialized"))
222 if not self.host_app.trigger.point("xml_init", self):
220 return 223 return
221 super(SatXMPPEntity, self)._authd(xmlstream) 224 self.postStreamInit()
222 225
223 if self._reason is not None: 226 def postStreamInit(self):
224 # if we have had trouble to connect we can reset 227 """Workflow after stream initalisation."""
225 # the exception as the connection is now working.
226 del self._reason
227
228 # the following Deferred is used to know when we are connected
229 # so we need to be set it to None when connection is lost
230 self._connected = defer.Deferred()
231 self._connected.addCallback(self._cleanConnection)
232 self._connected.addCallback(self._disconnectionCb)
233 self._connected.addErrback(self._disconnectionEb)
234
235 log.info( 228 log.info(
236 _(u"********** [{profile}] CONNECTED **********").format(profile=self.profile) 229 _(u"********** [{profile}] CONNECTED **********").format(profile=self.profile)
237 ) 230 )
238 self.streamInitialized() 231
232 # the following Deferred is used to know when we are connected
233 # so we need to be set it to None when connection is lost
234 self._connected_d = defer.Deferred()
235 self._connected_d.addCallback(self._cleanConnection)
236 self._connected_d.addCallback(self._disconnectionCb)
237 self._connected_d.addErrback(self._disconnectionEb)
238
239 self.host_app.bridge.connected( 239 self.host_app.bridge.connected(
240 self.profile, unicode(self.jid) 240 self.profile, unicode(self.jid)
241 ) # we send the signal to the clients 241 ) # we send the signal to the clients
242 242
243 def _finish_connection(self, __): 243
244 self.conn_deferred.callback(None)
245
246 def streamInitialized(self):
247 """Called after _authd"""
248 log.debug(_(u"XML stream is initialized"))
249 self.keep_alife = task.LoopingCall( 244 self.keep_alife = task.LoopingCall(
250 self.xmlstream.send, " " 245 self.xmlstream.send, " "
251 ) # Needed to avoid disconnection (specially with openfire) 246 ) # Needed to avoid disconnection (specially with openfire)
252 self.keep_alife.start(C.XMPP_KEEP_ALIFE) 247 self.keep_alife.start(C.XMPP_KEEP_ALIFE)
253
254 self.disco = SatDiscoProtocol(self) 248 self.disco = SatDiscoProtocol(self)
255 self.disco.setHandlerParent(self) 249 self.disco.setHandlerParent(self)
256 self.discoHandler = disco.DiscoHandler() 250 self.discoHandler = disco.DiscoHandler()
257 self.discoHandler.setHandlerParent(self) 251 self.discoHandler.setHandlerParent(self)
258 disco_d = defer.succeed(None) 252 disco_d = defer.succeed(None)
276 # we already chained an errback, no need to raise an exception 270 # we already chained an errback, no need to raise an exception
277 pass 271 pass
278 272
279 ## connection ## 273 ## connection ##
280 274
281 def _disconnected(self, reason): 275 def _connected(self, xs):
282 # we have to save the reason of disconnection, otherwise it would be lost 276 send_hooks = []
283 self._reason = reason 277 receive_hooks = []
284 super(SatXMPPEntity, self)._disconnected(reason) 278 self.host_app.trigger.point(
285 279 "stream_hooks", self, receive_hooks, send_hooks)
286 def connectionLost(self, connector, reason): 280 for hook in receive_hooks:
281 xs.addHook(C.STREAM_HOOK_RECEIVE, hook)
282 for hook in send_hooks:
283 xs.addHook(C.STREAM_HOOK_SEND, hook)
284 super(SatXMPPEntity, self)._connected(xs)
285
286 def disconnectProfile(self, reason):
287 try: 287 try:
288 self.keep_alife.stop() 288 self.keep_alife.stop()
289 except AttributeError: 289 except AttributeError:
290 log.debug(_("No keep_alife")) 290 log.debug(_("No keep_alife"))
291 if self._connected is not None: 291 if self._connected_d is not None:
292 self.host_app.bridge.disconnected( 292 self.host_app.bridge.disconnected(
293 self.profile 293 self.profile
294 ) # we send the signal to the clients 294 ) # we send the signal to the clients
295 self._connected.callback(None) 295 self._connected_d.callback(None)
296 self.host_app.purgeEntity( 296 self.host_app.purgeEntity(
297 self.profile 297 self.profile
298 ) # and we remove references to this client 298 ) # and we remove references to this client
299 log.info( 299 log.info(
300 _(u"********** [{profile}] DISCONNECTED **********").format( 300 _(u"********** [{profile}] DISCONNECTED **********").format(
301 profile=self.profile 301 profile=self.profile
302 ) 302 )
303 ) 303 )
304 if not self.conn_deferred.called: 304 if not self.conn_deferred.called:
305 # FIXME: real error is not gotten here (e.g. if jid is not know by Prosody, 305 if reason is None:
306 # we should have the real error)
307 if self._reason is None:
308 err = error.StreamError(u"Server unexpectedly closed the connection") 306 err = error.StreamError(u"Server unexpectedly closed the connection")
309 else: 307 else:
310 err = self._reason 308 err = reason
311 try: 309 try:
312 if err.value.args[0][0][2] == "certificate verify failed": 310 if err.value.args[0][0][2] == "certificate verify failed":
313 err = exceptions.InvalidCertificate( 311 err = exceptions.InvalidCertificate(
314 _(u"Your server certificate is not valid " 312 _(u"Your server certificate is not valid "
315 u"(its identity can't be checked).\n\n" 313 u"(its identity can't be checked).\n\n"
316 u"This should never happen and may indicate that " 314 u"This should never happen and may indicate that "
317 u"somebody is trying to spy on you.\n" 315 u"somebody is trying to spy on you.\n"
318 u"Please contact your server administrator.")) 316 u"Please contact your server administrator."))
317 self.factory.continueTrying = 0
319 except (IndexError, TypeError): 318 except (IndexError, TypeError):
320 pass 319 pass
321 self.conn_deferred.errback(err) 320 self.conn_deferred.errback(err)
321
322 def _disconnected(self, reason):
323 super(SatXMPPEntity, self)._disconnected(reason)
324 if not self.host_app.trigger.point("disconnected", self, reason):
325 return
326 self.disconnectProfile(reason)
322 327
323 @defer.inlineCallbacks 328 @defer.inlineCallbacks
324 def _cleanConnection(self, __): 329 def _cleanConnection(self, __):
325 """method called on disconnection 330 """method called on disconnection
326 331
331 disconnected_cb = getattr(plugin, trigger_name, None) 336 disconnected_cb = getattr(plugin, trigger_name, None)
332 if disconnected_cb is not None: 337 if disconnected_cb is not None:
333 yield disconnected_cb(self) 338 yield disconnected_cb(self)
334 339
335 def isConnected(self): 340 def isConnected(self):
336 return self._connected is not None 341 try:
342 return bool(self.xmlstream.transport.connected)
343 except AttributeError:
344 return False
337 345
338 def entityDisconnect(self): 346 def entityDisconnect(self):
347 if not self.host_app.trigger.point("disconnecting", self):
348 return
339 log.info(_(u"Disconnecting...")) 349 log.info(_(u"Disconnecting..."))
340 self.stopService() 350 self.stopService()
341 if self._connected is not None: 351 if self._connected_d is not None:
342 return self._connected 352 return self._connected_d
343 else: 353 else:
344 return defer.succeed(None) 354 return defer.succeed(None)
345 355
346 ## sending ## 356 ## sending ##
347 357
733 sendHistory = ( 743 sendHistory = (
734 False 744 False
735 ) # XXX: set to True from entry plugin to keep messages in history for received 745 ) # XXX: set to True from entry plugin to keep messages in history for received
736 # messages 746 # messages
737 747
738 def __init__( 748 def __init__(self, host_app, profile, component_jid, password, host=None, port=None,
739 self, 749 max_retries=C.XMPP_MAX_RETRIES):
740 host_app,
741 profile,
742 component_jid,
743 password,
744 host=None,
745 port=None,
746 max_retries=C.XMPP_MAX_RETRIES,
747 ):
748 self.started = time.time() 750 self.started = time.time()
749 if port is None: 751 if port is None:
750 port = C.XMPP_COMPONENT_PORT 752 port = C.XMPP_COMPONENT_PORT
751 753
752 ## entry point ## 754 ## entry point ##