Mercurial > libervia-backend
comparison sat/core/xmpp.py @ 2691:1ecceac3df96
plugin XEP-0198: Stream Management implementation:
- hooks can now be set in stream onElement and send methods
- xmllog refactored to use new hooks
- client.isConnected now uses transport.connected method
- fixed reconnection, SàT will now try to reconnect indefinitely until it success, unresolvable failure happen (e.g. invalid certificate), or explicit disconnection is requested (or a plugin change this behaviour)
- new triggers: "stream_hooks", "disconnecting", "disconnected", and "xml_init" (replace "XML Initialized")
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 18 Nov 2018 15:49:46 +0100 |
parents | 943e78e18882 |
children | f64f1158a26e |
comparison
equal
deleted
inserted
replaced
2690:56bfe1b79204 | 2691:1ecceac3df96 |
---|---|
47 NS_DISCO_INFO = u"http://jabber.org/protocol/disco#info" | 47 NS_DISCO_INFO = u"http://jabber.org/protocol/disco#info" |
48 | 48 |
49 | 49 |
50 class SatXMPPEntity(object): | 50 class SatXMPPEntity(object): |
51 """Common code for Client and Component""" | 51 """Common code for Client and Component""" |
52 _reason = None # reason of disconnection | |
53 | 52 |
54 def __init__(self, host_app, profile, max_retries): | 53 def __init__(self, host_app, profile, max_retries): |
55 | 54 factory = self.factory |
56 self.factory.clientConnectionLost = self.connectionLost | 55 factory.maxRetries = max_retries |
57 self.factory.maxRetries = max_retries | 56 factory.maxDelay = 30 |
58 # when self._connected is None, we are not connected | 57 # when self._connected_d is None, we are not connected |
59 # else, it's a deferred which fire on disconnection | 58 # else, it's a deferred which fire on disconnection |
60 self._connected = None | 59 self._connected_d = None |
61 self.profile = profile | 60 self.profile = profile |
62 self.host_app = host_app | 61 self.host_app = host_app |
63 self.cache = cache.Cache(host_app, profile) | 62 self.cache = cache.Cache(host_app, profile) |
64 self._mess_id_uid = {} # map from message id to uid used in history. | 63 self._mess_id_uid = {} # map from message id to uid used in history. |
65 # Key: (full_jid,message_id) Value: uid | 64 # Key: (full_jid,message_id) Value: uid |
65 # this Deferred fire when entity is connected | |
66 self.conn_deferred = defer.Deferred() | 66 self.conn_deferred = defer.Deferred() |
67 self._progress_cb = {} # callback called when a progress is requested | 67 self._progress_cb = {} # callback called when a progress is requested |
68 # (key = progress id) | 68 # (key = progress id) |
69 self.actions = {} # used to keep track of actions for retrieval (key = action_id) | 69 self.actions = {} # used to keep track of actions for retrieval (key = action_id) |
70 self.encryption = encryption.EncryptionHandler(self) | 70 self.encryption = encryption.EncryptionHandler(self) |
143 | 143 |
144 password = yield host.memory.asyncGetParamA( | 144 password = yield host.memory.asyncGetParamA( |
145 "Password", "Connection", profile_key=profile | 145 "Password", "Connection", profile_key=profile |
146 ) | 146 ) |
147 entity = host.profiles[profile] = cls( | 147 entity = host.profiles[profile] = cls( |
148 host, | 148 host, profile, jid.JID(host.memory.getParamA("JabberID", "Connection", |
149 profile, | 149 profile_key=profile)), password, |
150 jid.JID(host.memory.getParamA("JabberID", "Connection", profile_key=profile)), | 150 host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection", |
151 password, | 151 profile_key=profile) or None, |
152 host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection", profile_key=profile) | 152 port, max_retries, |
153 or None, | 153 ) |
154 port, | |
155 max_retries, | |
156 ) | |
157 | 154 |
158 entity._createSubProtocols() | 155 entity._createSubProtocols() |
159 | 156 |
160 entity.fallBack = SatFallbackHandler(host) | 157 entity.fallBack = SatFallbackHandler(host) |
161 entity.fallBack.setHandlerParent(entity) | 158 entity.fallBack.setHandlerParent(entity) |
170 | 167 |
171 plugin_conn_cb = yield entity._callConnectionTriggers() | 168 plugin_conn_cb = yield entity._callConnectionTriggers() |
172 | 169 |
173 entity.startService() | 170 entity.startService() |
174 | 171 |
175 yield entity.getConnectionDeferred() | 172 yield entity.conn_deferred |
176 | 173 |
177 yield defer.maybeDeferred(entity.entityConnected) | 174 yield defer.maybeDeferred(entity.entityConnected) |
178 | 175 |
179 # Call profileConnected callback for all plugins, | 176 # Call profileConnected callback for all plugins, |
180 # and print error message if any of them fails | 177 # and print error message if any of them fails |
203 # TODO: mesure launch time of each plugin | 200 # TODO: mesure launch time of each plugin |
204 | 201 |
205 # we finally send our presence | 202 # we finally send our presence |
206 entity.presence.available() | 203 entity.presence.available() |
207 | 204 |
208 def getConnectionDeferred(self): | |
209 """Return a deferred which fire when the client is connected""" | |
210 return self.conn_deferred | |
211 | |
212 def _disconnectionCb(self, __): | 205 def _disconnectionCb(self, __): |
213 self._connected = None | 206 self._connected_d = None |
214 | 207 |
215 def _disconnectionEb(self, failure_): | 208 def _disconnectionEb(self, failure_): |
216 log.error(_(u"Error while disconnecting: {}".format(failure_))) | 209 log.error(_(u"Error while disconnecting: {}".format(failure_))) |
217 | 210 |
218 def _authd(self, xmlstream): | 211 def _authd(self, xmlstream): |
219 if not self.host_app.trigger.point("XML Initialized", xmlstream, self.profile): | 212 super(SatXMPPEntity, self)._authd(xmlstream) |
213 log.debug(_(u"{profile} identified").format(profile=self.profile)) | |
214 self.streamInitialized() | |
215 | |
216 def _finish_connection(self, __): | |
217 self.conn_deferred.callback(None) | |
218 | |
219 def streamInitialized(self): | |
220 """Called after _authd""" | |
221 log.debug(_(u"XML stream is initialized")) | |
222 if not self.host_app.trigger.point("xml_init", self): | |
220 return | 223 return |
221 super(SatXMPPEntity, self)._authd(xmlstream) | 224 self.postStreamInit() |
222 | 225 |
223 if self._reason is not None: | 226 def postStreamInit(self): |
224 # if we have had trouble to connect we can reset | 227 """Workflow after stream initalisation.""" |
225 # the exception as the connection is now working. | |
226 del self._reason | |
227 | |
228 # the following Deferred is used to know when we are connected | |
229 # so we need to be set it to None when connection is lost | |
230 self._connected = defer.Deferred() | |
231 self._connected.addCallback(self._cleanConnection) | |
232 self._connected.addCallback(self._disconnectionCb) | |
233 self._connected.addErrback(self._disconnectionEb) | |
234 | |
235 log.info( | 228 log.info( |
236 _(u"********** [{profile}] CONNECTED **********").format(profile=self.profile) | 229 _(u"********** [{profile}] CONNECTED **********").format(profile=self.profile) |
237 ) | 230 ) |
238 self.streamInitialized() | 231 |
232 # the following Deferred is used to know when we are connected | |
233 # so we need to be set it to None when connection is lost | |
234 self._connected_d = defer.Deferred() | |
235 self._connected_d.addCallback(self._cleanConnection) | |
236 self._connected_d.addCallback(self._disconnectionCb) | |
237 self._connected_d.addErrback(self._disconnectionEb) | |
238 | |
239 self.host_app.bridge.connected( | 239 self.host_app.bridge.connected( |
240 self.profile, unicode(self.jid) | 240 self.profile, unicode(self.jid) |
241 ) # we send the signal to the clients | 241 ) # we send the signal to the clients |
242 | 242 |
243 def _finish_connection(self, __): | 243 |
244 self.conn_deferred.callback(None) | |
245 | |
246 def streamInitialized(self): | |
247 """Called after _authd""" | |
248 log.debug(_(u"XML stream is initialized")) | |
249 self.keep_alife = task.LoopingCall( | 244 self.keep_alife = task.LoopingCall( |
250 self.xmlstream.send, " " | 245 self.xmlstream.send, " " |
251 ) # Needed to avoid disconnection (specially with openfire) | 246 ) # Needed to avoid disconnection (specially with openfire) |
252 self.keep_alife.start(C.XMPP_KEEP_ALIFE) | 247 self.keep_alife.start(C.XMPP_KEEP_ALIFE) |
253 | |
254 self.disco = SatDiscoProtocol(self) | 248 self.disco = SatDiscoProtocol(self) |
255 self.disco.setHandlerParent(self) | 249 self.disco.setHandlerParent(self) |
256 self.discoHandler = disco.DiscoHandler() | 250 self.discoHandler = disco.DiscoHandler() |
257 self.discoHandler.setHandlerParent(self) | 251 self.discoHandler.setHandlerParent(self) |
258 disco_d = defer.succeed(None) | 252 disco_d = defer.succeed(None) |
276 # we already chained an errback, no need to raise an exception | 270 # we already chained an errback, no need to raise an exception |
277 pass | 271 pass |
278 | 272 |
279 ## connection ## | 273 ## connection ## |
280 | 274 |
281 def _disconnected(self, reason): | 275 def _connected(self, xs): |
282 # we have to save the reason of disconnection, otherwise it would be lost | 276 send_hooks = [] |
283 self._reason = reason | 277 receive_hooks = [] |
284 super(SatXMPPEntity, self)._disconnected(reason) | 278 self.host_app.trigger.point( |
285 | 279 "stream_hooks", self, receive_hooks, send_hooks) |
286 def connectionLost(self, connector, reason): | 280 for hook in receive_hooks: |
281 xs.addHook(C.STREAM_HOOK_RECEIVE, hook) | |
282 for hook in send_hooks: | |
283 xs.addHook(C.STREAM_HOOK_SEND, hook) | |
284 super(SatXMPPEntity, self)._connected(xs) | |
285 | |
286 def disconnectProfile(self, reason): | |
287 try: | 287 try: |
288 self.keep_alife.stop() | 288 self.keep_alife.stop() |
289 except AttributeError: | 289 except AttributeError: |
290 log.debug(_("No keep_alife")) | 290 log.debug(_("No keep_alife")) |
291 if self._connected is not None: | 291 if self._connected_d is not None: |
292 self.host_app.bridge.disconnected( | 292 self.host_app.bridge.disconnected( |
293 self.profile | 293 self.profile |
294 ) # we send the signal to the clients | 294 ) # we send the signal to the clients |
295 self._connected.callback(None) | 295 self._connected_d.callback(None) |
296 self.host_app.purgeEntity( | 296 self.host_app.purgeEntity( |
297 self.profile | 297 self.profile |
298 ) # and we remove references to this client | 298 ) # and we remove references to this client |
299 log.info( | 299 log.info( |
300 _(u"********** [{profile}] DISCONNECTED **********").format( | 300 _(u"********** [{profile}] DISCONNECTED **********").format( |
301 profile=self.profile | 301 profile=self.profile |
302 ) | 302 ) |
303 ) | 303 ) |
304 if not self.conn_deferred.called: | 304 if not self.conn_deferred.called: |
305 # FIXME: real error is not gotten here (e.g. if jid is not know by Prosody, | 305 if reason is None: |
306 # we should have the real error) | |
307 if self._reason is None: | |
308 err = error.StreamError(u"Server unexpectedly closed the connection") | 306 err = error.StreamError(u"Server unexpectedly closed the connection") |
309 else: | 307 else: |
310 err = self._reason | 308 err = reason |
311 try: | 309 try: |
312 if err.value.args[0][0][2] == "certificate verify failed": | 310 if err.value.args[0][0][2] == "certificate verify failed": |
313 err = exceptions.InvalidCertificate( | 311 err = exceptions.InvalidCertificate( |
314 _(u"Your server certificate is not valid " | 312 _(u"Your server certificate is not valid " |
315 u"(its identity can't be checked).\n\n" | 313 u"(its identity can't be checked).\n\n" |
316 u"This should never happen and may indicate that " | 314 u"This should never happen and may indicate that " |
317 u"somebody is trying to spy on you.\n" | 315 u"somebody is trying to spy on you.\n" |
318 u"Please contact your server administrator.")) | 316 u"Please contact your server administrator.")) |
317 self.factory.continueTrying = 0 | |
319 except (IndexError, TypeError): | 318 except (IndexError, TypeError): |
320 pass | 319 pass |
321 self.conn_deferred.errback(err) | 320 self.conn_deferred.errback(err) |
321 | |
322 def _disconnected(self, reason): | |
323 super(SatXMPPEntity, self)._disconnected(reason) | |
324 if not self.host_app.trigger.point("disconnected", self, reason): | |
325 return | |
326 self.disconnectProfile(reason) | |
322 | 327 |
323 @defer.inlineCallbacks | 328 @defer.inlineCallbacks |
324 def _cleanConnection(self, __): | 329 def _cleanConnection(self, __): |
325 """method called on disconnection | 330 """method called on disconnection |
326 | 331 |
331 disconnected_cb = getattr(plugin, trigger_name, None) | 336 disconnected_cb = getattr(plugin, trigger_name, None) |
332 if disconnected_cb is not None: | 337 if disconnected_cb is not None: |
333 yield disconnected_cb(self) | 338 yield disconnected_cb(self) |
334 | 339 |
335 def isConnected(self): | 340 def isConnected(self): |
336 return self._connected is not None | 341 try: |
342 return bool(self.xmlstream.transport.connected) | |
343 except AttributeError: | |
344 return False | |
337 | 345 |
338 def entityDisconnect(self): | 346 def entityDisconnect(self): |
347 if not self.host_app.trigger.point("disconnecting", self): | |
348 return | |
339 log.info(_(u"Disconnecting...")) | 349 log.info(_(u"Disconnecting...")) |
340 self.stopService() | 350 self.stopService() |
341 if self._connected is not None: | 351 if self._connected_d is not None: |
342 return self._connected | 352 return self._connected_d |
343 else: | 353 else: |
344 return defer.succeed(None) | 354 return defer.succeed(None) |
345 | 355 |
346 ## sending ## | 356 ## sending ## |
347 | 357 |
733 sendHistory = ( | 743 sendHistory = ( |
734 False | 744 False |
735 ) # XXX: set to True from entry plugin to keep messages in history for received | 745 ) # XXX: set to True from entry plugin to keep messages in history for received |
736 # messages | 746 # messages |
737 | 747 |
738 def __init__( | 748 def __init__(self, host_app, profile, component_jid, password, host=None, port=None, |
739 self, | 749 max_retries=C.XMPP_MAX_RETRIES): |
740 host_app, | |
741 profile, | |
742 component_jid, | |
743 password, | |
744 host=None, | |
745 port=None, | |
746 max_retries=C.XMPP_MAX_RETRIES, | |
747 ): | |
748 self.started = time.time() | 750 self.started = time.time() |
749 if port is None: | 751 if port is None: |
750 port = C.XMPP_COMPONENT_PORT | 752 port = C.XMPP_COMPONENT_PORT |
751 | 753 |
752 ## entry point ## | 754 ## entry point ## |