Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0198.py @ 2691:1ecceac3df96
plugin XEP-0198: Stream Management implementation:
- hooks can now be set in stream onElement and send methods
- xmllog refactored to use new hooks
- client.isConnected now uses transport.connected method
- fixed reconnection: SàT will now try to reconnect indefinitely until it succeeds, an unresolvable failure happens (e.g. invalid certificate), or an explicit disconnection is requested (or a plugin changes this behaviour)
- new triggers: "stream_hooks", "disconnecting", "disconnected", and "xml_init" (replaces "XML Initialized")
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 18 Nov 2018 15:49:46 +0100 |
parents | |
children | d0466af33483 |
comparison
equal
deleted
inserted
replaced
2690:56bfe1b79204 | 2691:1ecceac3df96 |
---|---|
1 #!/usr/bin/env python2 | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # SàT plugin for Stream Management (XEP-0198) | |
5 # Copyright (C) 2011 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 from sat.core.i18n import _ | |
21 from sat.core.constants import Const as C | |
22 from sat.core import exceptions | |
23 from sat.core.log import getLogger | |
24 from twisted.words.protocols.jabber import client as jabber_client | |
25 from twisted.words.protocols.jabber import xmlstream | |
26 from twisted.words.xish import domish | |
27 from twisted.internet import defer | |
28 from twisted.internet import task, reactor | |
29 from functools import partial | |
30 from wokkel import disco, iwokkel | |
31 from zope.interface import implements | |
32 import collections | |
33 import time | |
34 | |
# Module-level logger, named after this module.
log = getLogger(__name__)

# Plugin metadata; the class named by C.PI_MAIN is defined further down in
# this module.
# NOTE(review): presumably read by the host's plugin loader -- confirm.
PLUGIN_INFO = {
    C.PI_NAME: "Stream Management",
    C.PI_IMPORT_NAME: "XEP-0198",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0198"],
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "XEP_0198",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(u"""Implementation of Stream Management"""),
}
48 | |
# Namespace of Stream Management elements.
NS_SM = u"urn:xmpp:sm:3"

# Template for the observer queries: matches a top-level element with the given
# name in the Stream Management namespace.
_SM_QUERY = u'/{name}[@xmlns="' + NS_SM + u'"]'

SM_ENABLED = _SM_QUERY.format(name=u"enabled")
SM_RESUMED = _SM_QUERY.format(name=u"resumed")
SM_FAILED = _SM_QUERY.format(name=u"failed")
SM_R_REQUEST = _SM_QUERY.format(name=u"r")
SM_A_REQUEST = _SM_QUERY.format(name=u"a")
SM_H_REQUEST = _SM_QUERY.format(name=u"h")

# Max number of stanza to send before requesting ack
MAX_STANZA_ACK_R = 5
# Max number of seconds before requesting ack
MAX_DELAY_ACK_R = 30
# Upper bound (exclusive) of the stanza counters
MAX_COUNTER = 2 ** 32
# Default resumption delay, in seconds, used when the server gives no usable "max"
RESUME_MAX = 5 * 60
62 | |
63 | |
class ProfileSessionData(object):
    """Stream Management state attached to one client profile.

    Holds the stanza counters, the buffer of sent stanzas, the session
    identifiers used for resumption and the periodic timer which triggers
    the ack check callback.
    """
    # number of stanzas sent / received while the session is enabled
    out_counter = 0
    in_counter = 0
    # session id given by the server (None means resumption is not possible)
    session_id = None
    # (host, port) location for resumption, None if not specified
    location = None
    # resumption delay, in seconds
    session_max = None
    # True when an ack answer is expected
    ack_requested = False
    # timestamp of the last ack request
    last_ack_r = 0
    # timestamp of the last disconnection
    disconnected_time = None

    def __init__(self, callback, **kw):
        """
        @param callback: callable run every MAX_DELAY_ACK_R seconds while the
            session is enabled
        @param kw: keyword arguments given to the callback
        """
        self.callback_data = (callback, kw)
        self.timer = None
        self._enabled = False
        # newest stanzas are on the left of the deque
        self.buffer = collections.deque()
        # counter value corresponding to the oldest stanza still buffered
        self.buffer_idx = 0

    @property
    def enabled(self):
        """True while stream management is active for this session"""
        return self._enabled

    @enabled.setter
    def enabled(self, enabled):
        """Start or stop the periodic callback along with the session

        @raise exceptions.InternalError: the session is already enabled
        """
        if not enabled:
            self._enabled = False
            if self.timer is not None:
                self.timer.stop()
                self.timer = None
            return

        if self._enabled:
            raise exceptions.InternalError(
                u"Stream Management can't be enabled twice")
        self._enabled = True
        callback, kw = self.callback_data
        self.timer = task.LoopingCall(callback, **kw)
        self.timer.start(MAX_DELAY_ACK_R, now=False)

    @property
    def resume_enabled(self):
        """True if a session id is known"""
        return self.session_id is not None

    def reset(self):
        """Disable the session and put counters, buffer and ids back to defaults"""
        self.enabled = False
        self.buffer.clear()
        self.buffer_idx = 0
        self.out_counter = 0
        self.in_counter = 0
        self.location = None
        self.session_id = None
        self.ack_requested = False
        self.last_ack_r = 0
114 | |
115 | |
class XEP_0198(object):
    """Stream Management (XEP-0198) plugin.

    Registers triggers on the host to count stanzas, enable or resume a
    Stream Management session when the XML stream is initialised, and keep
    the session state when the connection is lost so that it can be resumed.
    The per-profile state is stored in client._xep_0198_session.
    """
    # FIXME: location is not handled yet

    def __init__(self, host):
        log.info(_("Plugin Stream Management initialization"))
        self.host = host
        host.registerNamespace(u'sm', NS_SM)
        host.trigger.add("stream_hooks", self.addHooks)
        host.trigger.add("xml_init", self._XMLInitTrigger)
        host.trigger.add("disconnecting", self._disconnectingTrigger)
        host.trigger.add("disconnected", self._disconnectedTrigger)

    def profileConnecting(self, client):
        """Attach a fresh session state to the client"""
        client._xep_0198_session = ProfileSessionData(callback=self.checkAcks,
                                                      client=client)

    def getHandler(self, client):
        return XEP_0198_handler(self)

    def addHooks(self, client, receive_hooks, send_hooks):
        """Add hooks to handle in/out stanzas counters"""
        receive_hooks.append(partial(self.onReceive, client=client))
        send_hooks.append(partial(self.onSend, client=client))
        return True

    def _XMLInitTrigger(self, client):
        """Enable or resume a stream management session

        @return (bool): False when a resumption has been requested, True
            otherwise
        """
        if (NS_SM, u'sm') not in client.xmlstream.features:
            # formatting is done after translation, so the translatable string
            # is the template and not the formatted message
            log.warning(_(
                u"Your server doesn't support stream management ({namespace}), this is "
                u"used to improve connection problems detection (like network outages). "
                u"Please ask your server administrator to enable this feature."
            ).format(namespace=NS_SM))
            return True
        session = client._xep_0198_session

        # a disconnect timer from a previous disconnection may still be active
        try:
            disconnect_timer = session.disconnect_timer
        except AttributeError:
            pass
        else:
            if disconnect_timer.active():
                disconnect_timer.cancel()
            del session.disconnect_timer

        if session.resume_enabled:
            # we are resuming a session
            resume_elt = domish.Element((NS_SM, 'resume'))
            resume_elt['h'] = unicode(session.in_counter)
            resume_elt['previd'] = session.session_id
            client.send(resume_elt)
            session.resuming = True
            # session.enabled will be set on <resumed/> reception
            return False
        else:
            # we start a new session
            assert session.out_counter == 0
            enable_elt = domish.Element((NS_SM, 'enable'))
            enable_elt[u'resume'] = u'true'
            client.send(enable_elt)
            session.enabled = True
            return True

    def _disconnectingTrigger(self, client):
        """Send a last ack and reset the session on requested disconnection"""
        session = client._xep_0198_session
        if session.enabled:
            self.sendAck(client)
        # This is a requested disconnection, so we can reset the session
        # to disable resuming and close normally the stream
        session.reset()
        return True

    def _disconnectedTrigger(self, client, reason):
        """Keep the session for resumption if possible

        @return (bool): False if the session may be resumed (a timer is then
            started to call client.disconnectProfile after session_max
            seconds), True otherwise
        """
        session = client._xep_0198_session
        session.enabled = False
        if session.resume_enabled:
            session.disconnected_time = time.time()
            session.disconnect_timer = reactor.callLater(session.session_max,
                                                         client.disconnectProfile,
                                                         reason)
            # disconnectProfile must not be called at this point
            # because session can be resumed
            return False
        else:
            return True

    def checkAcks(self, client):
        """Request ack if needed

        An ack is requested when none is pending, the buffer is not empty, and
        either MAX_STANZA_ACK_R stanzas are unacknowledged or MAX_DELAY_ACK_R
        seconds have passed since the last request.
        """
        session = client._xep_0198_session
        if session.ack_requested or not session.buffer:
            return
        too_many_unacked = (
            session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R)
        too_old = time.time() - session.last_ack_r >= MAX_DELAY_ACK_R
        if too_many_unacked or too_old:
            self.requestAck(client)
            session.ack_requested = True
            session.last_ack_r = time.time()

    def updateBuffer(self, session, server_acked):
        """Update buffer and buffer_index

        Stanzas acknowledged by the server are removed from the buffer (oldest
        first, they are on the right of the buffer).
        @param session: session data of the profile
        @param server_acked(int): value of the "h" attribute sent by the server
        """
        diff = server_acked - session.buffer_idx
        if diff <= 0:
            return
        buffered = len(session.buffer)
        if diff > buffered:
            # the buffer may hold fewer stanzas than acknowledged (it is emptied
            # when stanzas are resent on resumption): we must not pop from an
            # empty buffer
            log.error(_(
                u"error while cleaning buffer, server acknowledged more stanzas "
                u"than buffered (diff = {diff}, server_acked = {server_acked}, "
                u"buffer_idx = {buffer_idx}, buffered = {buffered})").format(
                    diff=diff, server_acked=server_acked,
                    buffer_idx=session.buffer_idx, buffered=buffered))
        for __ in range(min(diff, buffered)):
            session.buffer.pop()
        session.buffer_idx += diff

    def sendAck(self, client):
        """Send an answer element with current IN counter"""
        a_elt = domish.Element((NS_SM, 'a'))
        a_elt['h'] = unicode(client._xep_0198_session.in_counter)
        client.send(a_elt)

    def requestAck(self, client):
        """Send a request element"""
        r_elt = domish.Element((NS_SM, 'r'))
        client.send(r_elt)

    def _connectionFailed(self, failure_, connector):
        """Restore original host/port when the server given location fails

        The monkey patch done in onEnabled is reverted, then the original
        connectionFailed method of the connector is called.
        """
        normal_host, normal_port = connector.normal_location
        del connector.normal_location
        log.warning(_(
            u"Connection failed using location given by server (host: {host}, port: "
            u"{port}), switching to normal host and port (host: {normal_host}, port: "
            u"{normal_port})").format(host=connector.host, port=connector.port,
                                      normal_host=normal_host,
                                      normal_port=normal_port))
        connector.host, connector.port = normal_host, normal_port
        connector.connectionFailed = connector.connectionFailed_ori
        del connector.connectionFailed_ori
        return connector.connectionFailed(failure_)

    def onEnabled(self, enabled_elt, client):
        """Handle <enabled/>: store session id, location and resumption delay"""
        session = client._xep_0198_session
        session.in_counter = 0

        # we check that resuming is possible and that we have a session id
        # a string default is given so C.bool never gets None when the attribute
        # is missing
        # NOTE(review): assumes C.bool(u'false') is False -- confirm
        resume = C.bool(enabled_elt.getAttribute(u'resume', u'false'))
        session_id = enabled_elt.getAttribute(u'id')
        if not session_id:
            log.warning(_(u'Incorrect <enabled/> element received, no "id" attribute'))
        if not resume or not session_id:
            log.warning(_(
                u"Your server doesn't support session resuming with stream management, "
                u"please contact your server administrator to enable it"))
            return

        session.session_id = session_id

        # XXX: we disable resource binding, which must not be done
        #      when we resume the session.
        client.factory.authenticator.res_binding = False

        # location, in case server want resuming session to be elsewhere
        try:
            location = enabled_elt[u'location']
        except KeyError:
            pass
        else:
            # TODO: handle IPv6 here (in brackets, cf. XEP)
            try:
                domain, port = location.split(':', 1)
                port = int(port)
            except ValueError:
                log.warning(_(u"Invalid location received: {location}")
                            .format(location=location))
            else:
                session.location = (domain, port)
                # we monkey patch connector to use the new location
                connector = client.xmlstream.transport.connector
                connector.normal_location = connector.host, connector.port
                connector.host = domain
                connector.port = port
                connector.connectionFailed_ori = connector.connectionFailed
                connector.connectionFailed = partial(self._connectionFailed,
                                                     connector=connector)

        # resuming time
        try:
            max_s = int(enabled_elt[u'max'])
        except (ValueError, KeyError) as e:
            if isinstance(e, ValueError):
                log.warning(_(u'Invalid "max" attribute'))
            max_s = RESUME_MAX
            log.info(_(u"Using default session max value ({max_s} s).").format(
                max_s=max_s))
            log.info(_(u"Stream Management enabled"))
        else:
            log.info(_(
                u"Stream Management enabled, with a resumption time of {res_m} min"
            ).format(res_m=max_s // 60))
        session.session_max = max_s

    def onResumed(self, enabled_elt, client):
        """Handle <resumed/>: resend unacknowledged stanzas and re-enable session"""
        session = client._xep_0198_session
        assert not session.enabled
        del session.resuming
        server_acked = int(enabled_elt['h'])
        self.updateBuffer(session, server_acked)
        resend_count = len(session.buffer)
        # we resend all stanza which have not been received properly
        # (oldest first, they are on the right of the buffer)
        while session.buffer:
            client.send(session.buffer.pop())
        # now we can continue the session
        session.enabled = True
        d_time = time.time() - session.disconnected_time
        log.info(_(u"Stream session resumed (disconnected for {d_time} s, {count} "
                   u"stanza(s) resent)").format(d_time=int(d_time), count=resend_count))

    def onFailed(self, failed_elt, client):
        """Handle <failed/>: give up stream management or restart a full session"""
        session = client._xep_0198_session
        condition_elt = failed_elt.firstChildElement()
        session.reset()

        try:
            del session.resuming
        except AttributeError:
            # stream management can't be started at all
            msg = _(u"Can't use stream management")
            if condition_elt is None:
                log.error(msg + u'.')
            else:
                log.error(_(u"{msg}: {reason}").format(
                    msg=msg, reason=condition_elt.name))
        else:
            # only stream resumption failed, we can try full session init
            # XXX: we try to start full session init from this point, with many
            #      variables/attributes already initialised with a potentially
            #      different jid. This is experimental and may not be safe. It may
            #      be more secure to abort the connection and restart everything
            #      with a fresh client.
            msg = _(u"stream resumption not possible, restarting full session")

            if condition_elt is None:
                log.warning(u'{msg}.'.format(msg=msg))
            else:
                log.warning(u"{msg}: {reason}".format(
                    msg=msg, reason=condition_elt.name))
            # stream resumption failed, but we still can do normal stream management
            # we restore attributes as if the session was new, and init stream
            # we keep everything initialized, and only do binding, roster request
            # and initial presence sending.
            if client.conn_deferred.called:
                client.conn_deferred = defer.Deferred()
            else:
                log.error(u"conn_deferred should be called at this point")
            # we need to recreate roster
            client.handlers.remove(client.roster)
            client.roster = client.roster.__class__(self.host)
            client.roster.setHandlerParent(client)
            # bind init is not done when resuming is possible, so we have to do it now
            bind_init = jabber_client.BindInitializer(client.xmlstream)
            bind_init.required = True
            d = bind_init.start()
            # we set the jid, which may have changed
            d.addCallback(
                lambda __: setattr(client.factory.authenticator, "jid", client.jid))
            # we call the trigger who will send the <enable/> element
            d.addCallback(lambda __: self._XMLInitTrigger(client))
            # then we have to re-request the roster, as changes may have occurred
            d.addCallback(lambda __: client.roster.requestRoster())
            # we add got_roster to be sure to have roster before sending initial
            # presence
            d.addCallback(lambda __: client.roster.got_roster)
            # initial presence must be sent manually
            d.addCallback(lambda __: client.presence.available())

    def onReceive(self, element, client):
        """Receive hook: count incoming stanzas while the session is enabled"""
        session = client._xep_0198_session
        if session.enabled and element.name.lower() in C.STANZA_NAMES:
            # parentheses are needed: "x += 1 % MAX_COUNTER" never wraps
            # NOTE: comparisons done elsewhere with the counters are not
            #       wrap-aware
            session.in_counter = (session.in_counter + 1) % MAX_COUNTER

    def onSend(self, obj, client):
        """Send hook: count and buffer outgoing stanzas while session is enabled"""
        session = client._xep_0198_session
        if (session.enabled
            and domish.IElement.providedBy(obj)
            and obj.name.lower() in C.STANZA_NAMES):
            # parentheses are needed: "x += 1 % MAX_COUNTER" never wraps
            session.out_counter = (session.out_counter + 1) % MAX_COUNTER
            session.buffer.appendleft(obj)
            self.checkAcks(client)

    def onAckRequest(self, r_elt, client):
        """Handle <r/>: answer with the current IN counter"""
        self.sendAck(client)

    def onAckAnswer(self, a_elt, client):
        """Handle <a/>: drop acknowledged stanzas from the buffer

        Stream management is disabled if the "h" attribute is missing or is
        not an integer, and the session is reset if the server acknowledges
        more stanzas than have been sent.
        """
        session = client._xep_0198_session
        session.ack_requested = False
        try:
            server_acked = int(a_elt['h'])
        except (KeyError, ValueError):
            # KeyError: "h" attribute is missing, ValueError: it's not an integer
            log.warning(_(u"Server returned invalid ack element, disabling stream "
                          u"management: {xml}").format(xml=a_elt.toXml()))
            session.enabled = False
            return

        if server_acked > session.out_counter:
            log.error(_(u"Server acked more stanzas than we have sent, disabling stream "
                        u"management."))
            session.reset()
            return

        self.updateBuffer(session, server_acked)
        self.checkAcks(client)
423 | |
424 | |
class XEP_0198_handler(xmlstream.XMPPHandler):
    """Handler forwarding Stream Management elements to the plugin"""
    implements(iwokkel.IDisco)

    def __init__(self, plugin_parent):
        self.plugin_parent = plugin_parent
        self.host = plugin_parent.host

    def connectionInitialized(self):
        """Register one observer per Stream Management element"""
        plugin = self.plugin_parent
        observers = (
            (SM_ENABLED, plugin.onEnabled),
            (SM_RESUMED, plugin.onResumed),
            (SM_FAILED, plugin.onFailed),
            (SM_R_REQUEST, plugin.onAckRequest),
            (SM_A_REQUEST, plugin.onAckAnswer),
        )
        for query, callback in observers:
            # the client (parent of this handler) is given to each callback
            self.xmlstream.addObserver(query, callback, client=self.parent)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the Stream Management namespace"""
        return [disco.DiscoFeature(NS_SM)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """No disco item is provided by this handler"""
        return []