comparison sat/plugins/plugin_xep_0198.py @ 2691:1ecceac3df96

plugin XEP-0198: Stream Management implementation: - hooks can now be set in the stream onElement and send methods - xmllog refactored to use the new hooks - client.isConnected now uses the transport.connected method - fixed reconnection: SàT will now try to reconnect indefinitely until it succeeds, an unresolvable failure happens (e.g. invalid certificate), or an explicit disconnection is requested (or a plugin changes this behaviour) - new triggers: "stream_hooks", "disconnecting", "disconnected", and "xml_init" (replaces "XML Initialized")
author Goffi <goffi@goffi.org>
date Sun, 18 Nov 2018 15:49:46 +0100
parents
children d0466af33483
comparison
equal deleted inserted replaced
2690:56bfe1b79204 2691:1ecceac3df96
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3
4 # SàT plugin for Stream Management (XEP-0198)
5 # Copyright (C) 2011 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from sat.core.i18n import _
21 from sat.core.constants import Const as C
22 from sat.core import exceptions
23 from sat.core.log import getLogger
24 from twisted.words.protocols.jabber import client as jabber_client
25 from twisted.words.protocols.jabber import xmlstream
26 from twisted.words.xish import domish
27 from twisted.internet import defer
28 from twisted.internet import task, reactor
29 from functools import partial
30 from wokkel import disco, iwokkel
31 from zope.interface import implements
32 import collections
33 import time
34
# Module-level logger, named after this module.
log = getLogger(__name__)

# Plugin metadata; C.PI_MAIN names the plugin class defined in this module.
PLUGIN_INFO = {
    # identification
    C.PI_IMPORT_NAME: "XEP-0198",
    C.PI_NAME: "Stream Management",
    C.PI_DESCRIPTION: _(u"""Implementation of Stream Management"""),
    # classification
    C.PI_TYPE: "XEP",
    C.PI_PROTOCOLS: ["XEP-0198"],
    C.PI_MODES: C.PLUG_MODE_BOTH,
    # wiring
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "XEP_0198",
    C.PI_HANDLER: "yes",
}
48
# Namespace of Stream Management elements (XEP-0198, version 3).
NS_SM = u"urn:xmpp:sm:3"

# XPath-like queries matching the top-level Stream Management elements;
# the first five are registered as stream observers by the handler class.
SM_ENABLED = u'/enabled[@xmlns="%s"]' % NS_SM
SM_RESUMED = u'/resumed[@xmlns="%s"]' % NS_SM
SM_FAILED = u'/failed[@xmlns="%s"]' % NS_SM
SM_R_REQUEST = u'/r[@xmlns="%s"]' % NS_SM
SM_A_REQUEST = u'/a[@xmlns="%s"]' % NS_SM
SM_H_REQUEST = u'/h[@xmlns="%s"]' % NS_SM

# An ack is requested once this many sent stanzas are still unacknowledged...
MAX_STANZA_ACK_R = 5
# ...or once this many seconds have elapsed since the previous ack request.
MAX_DELAY_ACK_R = 30
# Modulus of the stanza counters (2**32).
MAX_COUNTER = 1 << 32
# Default resumption delay in seconds (5 minutes), used when the server
# gives no usable "max" attribute.
RESUME_MAX = 60 * 5
62
63
class ProfileSessionData(object):
    """Stream Management state attached to one client profile.

    The class attributes below are the defaults of a fresh session; instances
    override them as the session progresses.
    """
    # number of stanzas sent / received since the session was enabled
    out_counter = 0
    in_counter = 0
    # id given by the server; resuming is possible only when it is set
    session_id = None
    # (host, port) the server asked us to reconnect to, if any
    location = None
    # resumption delay, in seconds
    session_max = None
    # True when an ack answer is expected
    ack_requested = False
    # timestamp of the last ack request
    last_ack_r = 0
    # timestamp of the last disconnection with a resumable session
    disconnected_time = None

    def __init__(self, callback, **kw):
        """Create a disabled session.

        @param callback(callable): called every MAX_DELAY_ACK_R seconds while
            the session is enabled
        @param **kw: keyword arguments forwarded to callback
        """
        self.callback_data = (callback, kw)
        self.timer = None
        self._enabled = False
        # buffer_idx is the number of sent stanzas already acked by the
        # server; the buffer holds the following ones, newest on the left
        self.buffer_idx = 0
        self.buffer = collections.deque()

    @property
    def enabled(self):
        """True while Stream Management is active on the current stream"""
        return self._enabled

    @enabled.setter
    def enabled(self, enabled):
        """Switch the session on or off, starting/stopping the periodic timer

        @raise exceptions.InternalError: the session is already enabled
        """
        if not enabled:
            self._enabled = False
            running_timer = self.timer
            if running_timer is not None:
                running_timer.stop()
                self.timer = None
            return

        if self._enabled:
            raise exceptions.InternalError(
                u"Stream Management can't be enabled twice")
        self._enabled = True
        periodic_cb, cb_kwargs = self.callback_data
        self.timer = task.LoopingCall(periodic_cb, **cb_kwargs)
        # now=False: the first call happens after a full interval
        self.timer.start(MAX_DELAY_ACK_R, now=False)

    @property
    def resume_enabled(self):
        """True when a session id is known, i.e. the session may be resumed"""
        return self.session_id is not None

    def reset(self):
        """Disable the session and forget counters, buffer and session id"""
        self.enabled = False
        self.buffer.clear()
        self.buffer_idx = 0
        self.in_counter = 0
        self.out_counter = 0
        self.session_id = None
        self.location = None
        self.ack_requested = False
        self.last_ack_r = 0
114
115
class XEP_0198(object):
    """XEP-0198 (Stream Management) plugin.

    Counts stanzas in both directions through stream hooks, keeps sent stanzas
    in a per-profile buffer until the server acknowledges them, answers and
    emits ack requests, and tries to resume the session after a disconnection.
    The per-profile state lives in ``client._xep_0198_session``.
    """
    # FIXME: location is not handled yet
    # NOTE(review): counters wrap at MAX_COUNTER, but the comparisons done in
    #   updateBuffer/onAckAnswer/checkAcks are plain integer comparisons and
    #   are not wrap-aware.

    def __init__(self, host):
        log.info(_("Plugin Stream Management initialization"))
        self.host = host
        host.registerNamespace(u'sm', NS_SM)
        host.trigger.add("stream_hooks", self.addHooks)
        host.trigger.add("xml_init", self._XMLInitTrigger)
        host.trigger.add("disconnecting", self._disconnectingTrigger)
        host.trigger.add("disconnected", self._disconnectedTrigger)

    def profileConnecting(self, client):
        """Attach a fresh session to the client; checkAcks is its timer callback"""
        client._xep_0198_session = ProfileSessionData(callback=self.checkAcks,
                                                      client=client)

    def getHandler(self, client):
        return XEP_0198_handler(self)

    def addHooks(self, client, receive_hooks, send_hooks):
        """Add hooks to handle in/out stanzas counters"""
        receive_hooks.append(partial(self.onReceive, client=client))
        send_hooks.append(partial(self.onSend, client=client))
        return True

    def _XMLInitTrigger(self, client):
        """Enable or resume a stream management session

        @return (bool): False when a resumption has been requested, True
            otherwise
        """
        if (NS_SM, u'sm') not in client.xmlstream.features:
            # the message is translated first, then formatted, so that the
            # translation catalog lookup is done on the template
            log.warning(_(
                u"Your server doesn't support stream management ({namespace}), this is "
                u"used to improve connection problems detection (like network outages). "
                u"Please ask your server administrator to enable this feature.").format(
                namespace=NS_SM))
            return True
        session = client._xep_0198_session

        # a disconnect timer from a previous disconnection may still be active
        try:
            disconnect_timer = session.disconnect_timer
        except AttributeError:
            pass
        else:
            if disconnect_timer.active():
                disconnect_timer.cancel()
            del session.disconnect_timer

        if session.resume_enabled:
            # we are resuming a session
            resume_elt = domish.Element((NS_SM, 'resume'))
            resume_elt['h'] = unicode(session.in_counter)
            resume_elt['previd'] = session.session_id
            client.send(resume_elt)
            # the "resuming" attribute lets onFailed tell a failed resumption
            # from a failed <enable/>
            session.resuming = True
            # session.enabled will be set on <resumed/> reception
            return False
        else:
            # we start a new session
            assert session.out_counter == 0
            enable_elt = domish.Element((NS_SM, 'enable'))
            enable_elt[u'resume'] = u'true'
            client.send(enable_elt)
            session.enabled = True
            return True

    def _disconnectingTrigger(self, client):
        """Send a last ack and reset the session before a requested disconnection"""
        session = client._xep_0198_session
        if session.enabled:
            self.sendAck(client)
        # This is a requested disconnection, so we can reset the session
        # to disable resuming and close normally the stream
        session.reset()
        return True

    def _disconnectedTrigger(self, client, reason):
        """Disable the session and, if it is resumable, delay disconnectProfile

        @return (bool): False when the session may still be resumed, True
            otherwise
        """
        session = client._xep_0198_session
        session.enabled = False
        if session.resume_enabled:
            session.disconnected_time = time.time()
            session.disconnect_timer = reactor.callLater(session.session_max,
                                                         client.disconnectProfile,
                                                         reason)
            # disconnectProfile must not be called at this point
            # because session can be resumed
            return False
        else:
            return True

    def checkAcks(self, client):
        """Request ack if needed

        Nothing is done while an answer is pending or when there is no
        unacknowledged stanza. Otherwise a request is sent as soon as
        MAX_STANZA_ACK_R stanzas are waiting, or MAX_DELAY_ACK_R seconds have
        elapsed since the last request.
        """
        session = client._xep_0198_session
        if session.ack_requested or not session.buffer:
            return
        if (session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R
            or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R):
            self.requestAck(client)
            session.ack_requested = True
            session.last_ack_r = time.time()

    def updateBuffer(self, session, server_acked):
        """Update buffer and buffer_index

        Stanzas acknowledged by the server are dropped from the buffer, oldest
        first (new stanzas are pushed on the left, so the oldest one is on the
        right).
        @param session(ProfileSessionData): session to update
        @param server_acked(int): "h" value received from the server
        """
        if server_acked <= session.buffer_idx:
            # nothing new has been acknowledged
            return
        diff = server_acked - session.buffer_idx
        buffered = len(session.buffer)
        # never pop more than what is buffered: popping an empty deque would
        # raise IndexError
        for __ in range(min(diff, buffered)):
            session.buffer.pop()
        if diff > buffered:
            log.error(_(
                u"Server acked {diff} stanza(s) but only {buffered} were buffered"
                ).format(diff=diff, buffered=buffered))
        session.buffer_idx += diff

    def sendAck(self, client):
        """Send an answer element with current IN counter"""
        a_elt = domish.Element((NS_SM, 'a'))
        a_elt['h'] = unicode(client._xep_0198_session.in_counter)
        client.send(a_elt)

    def requestAck(self, client):
        """Send a request element"""
        r_elt = domish.Element((NS_SM, 'r'))
        client.send(r_elt)

    def _connectionFailed(self, failure_, connector):
        """Fall back to the normal host/port when the server location fails

        Replaces connector.connectionFailed while a server-provided location
        is in use (see onEnabled): the original host, port and
        connectionFailed method are restored, then the failure is forwarded to
        the restored method.
        """
        normal_host, normal_port = connector.normal_location
        del connector.normal_location
        log.warning(_(
            u"Connection failed using location given by server (host: {host}, port: "
            u"{port}), switching to normal host and port (host: {normal_host}, port: "
            u"{normal_port})").format(host=connector.host, port=connector.port,
                                      normal_host=normal_host,
                                      normal_port=normal_port))
        connector.host, connector.port = normal_host, normal_port
        connector.connectionFailed = connector.connectionFailed_ori
        del connector.connectionFailed_ori
        return connector.connectionFailed(failure_)

    def onEnabled(self, enabled_elt, client):
        """Handle <enabled/>: store session id, location and resumption delay"""
        session = client._xep_0198_session
        session.in_counter = 0

        # we check that resuming is possible and that we have a session id
        resume = C.bool(enabled_elt.getAttribute(u'resume'))
        session_id = enabled_elt.getAttribute(u'id')
        if not session_id:
            log.warning(_(u'Incorrect <enabled/> element received, no "id" attribute'))
        if not resume or not session_id:
            log.warning(_(
                u"You're server doesn't support session resuming with stream management, "
                u"please contact your server administrator to enable it"))
            return

        session.session_id = session_id

        # XXX: we disable resource binding, which must not be done
        #      when we resume the session.
        client.factory.authenticator.res_binding = False

        # location, in case server want resuming session to be elsewhere
        try:
            location = enabled_elt[u'location']
        except KeyError:
            pass
        else:
            # TODO: handle IPv6 here (in brackets, cf. XEP)
            try:
                domain, port = location.split(':', 1)
                port = int(port)
            except ValueError:
                log.warning(_(u"Invalid location received: {location}")
                    .format(location=location))
            else:
                session.location = (domain, port)
                # we monkey patch connector to use the new location
                connector = client.xmlstream.transport.connector
                connector.normal_location = connector.host, connector.port
                connector.host = domain
                connector.port = port
                connector.connectionFailed_ori = connector.connectionFailed
                connector.connectionFailed = partial(self._connectionFailed,
                                                     connector=connector)

        # resuming time
        try:
            max_s = int(enabled_elt[u'max'])
        except (ValueError, KeyError) as e:
            if isinstance(e, ValueError):
                log.warning(_(u'Invalid "max" attribute'))
            max_s = RESUME_MAX
            log.info(_(u"Using default session max value ({max_s} s).").format(
                max_s=max_s))
            log.info(_(u"Stream Management enabled"))
        else:
            log.info(_(
                u"Stream Management enabled, with a resumption time of {res_m} min"
                ).format(res_m=max_s / 60))
        session.session_max = max_s

    def onResumed(self, enabled_elt, client):
        """Handle <resumed/>: resend unacknowledged stanzas and re-enable session"""
        session = client._xep_0198_session
        assert not session.enabled
        del session.resuming
        server_acked = int(enabled_elt['h'])
        self.updateBuffer(session, server_acked)
        # An ack request sent on the previous stream can't be answered
        # anymore; without this, checkAcks would return early forever.
        session.ack_requested = False
        # We resend, oldest first, all stanzas which have not been received.
        # They stay in the buffer: they are already counted in out_counter and
        # are still waiting for an ack. The session is disabled at this point,
        # so onSend neither counts nor buffers them a second time.
        unacked = tuple(reversed(session.buffer))
        for stanza in unacked:
            client.send(stanza)
        # now we can continue the session
        session.enabled = True
        d_time = time.time() - session.disconnected_time
        log.info(_(u"Stream session resumed (disconnected for {d_time} s, {count} "
                   u"stanza(s) resent)").format(d_time=int(d_time),
                                                count=len(unacked)))

    def onFailed(self, failed_elt, client):
        """Handle <failed/>, for a refused <enable/> or a refused <resume/>

        After a refused resumption, a full session initialisation (binding,
        stream management, roster, presence) is started on the current stream.
        """
        session = client._xep_0198_session
        condition_elt = failed_elt.firstChildElement()
        session.reset()

        try:
            # "resuming" only exists when a <resume/> has been sent
            del session.resuming
        except AttributeError:
            # stream management can't be started at all
            msg = _(u"Can't use stream management")
            if condition_elt is None:
                log.error(msg + u'.')
            else:
                log.error(_(u"{msg}: {reason}").format(
                msg=msg, reason=condition_elt.name))
        else:
            # only stream resumption failed, we can try full session init
            # XXX: we try to start full session init from this point, with many
            #      variables/attributes already initialised with a potentially
            #      different jid. This is experimental and may not be safe. It may
            #      be more secured to abort the connection and restart everything
            #      with a fresh client.
            msg = _(u"stream resumption not possible, restarting full session")

            if condition_elt is None:
                log.warning(u'{msg}.'.format(msg=msg))
            else:
                log.warning(u"{msg}: {reason}".format(
                    msg=msg, reason=condition_elt.name))
            # stream resumption failed, but we still can do normal stream management
            # we restore attributes as if the session was new, and init stream
            # we keep everything initialized, and only do binding, roster request
            # and initial presence sending.
            if client.conn_deferred.called:
                client.conn_deferred = defer.Deferred()
            else:
                log.error(u"conn_deferred should be called at this point")
            # we need to recreate roster
            client.handlers.remove(client.roster)
            client.roster = client.roster.__class__(self.host)
            client.roster.setHandlerParent(client)
            # bind init is not done when resuming is possible, so we have to do it now
            bind_init = jabber_client.BindInitializer(client.xmlstream)
            bind_init.required = True
            d = bind_init.start()
            # we set the jid, which may have changed
            d.addCallback(lambda __: setattr(client.factory.authenticator, "jid", client.jid))
            # we call the trigger who will send the <enable/> element
            d.addCallback(lambda __: self._XMLInitTrigger(client))
            # then we have to re-request the roster, as changes may have occured
            d.addCallback(lambda __: client.roster.requestRoster())
            # we add got_roster to be sure to have roster before sending initial presence
            d.addCallback(lambda __: client.roster.got_roster)
            # initial presence must be sent manually
            d.addCallback(lambda __: client.presence.available())

    def onReceive(self, element, client):
        """Receive hook: count incoming stanzas while the session is enabled"""
        session = client._xep_0198_session
        if session.enabled and element.name.lower() in C.STANZA_NAMES:
            # the parentheses matter: "+= 1 % MAX_COUNTER" would always add 1
            # and never wrap
            session.in_counter = (session.in_counter + 1) % MAX_COUNTER

    def onSend(self, obj, client):
        """Send hook: count and buffer outgoing stanzas while session is enabled"""
        session = client._xep_0198_session
        if (session.enabled
            and domish.IElement.providedBy(obj)
            and obj.name.lower() in C.STANZA_NAMES):
            session.out_counter = (session.out_counter + 1) % MAX_COUNTER
            session.buffer.appendleft(obj)
            self.checkAcks(client)

    def onAckRequest(self, r_elt, client):
        """Answer a <r/> from the server"""
        self.sendAck(client)

    def onAckAnswer(self, a_elt, client):
        """Handle <a/>: drop acknowledged stanzas from the buffer

        Stream management is disabled when the "h" attribute is missing or is
        not an integer, and the session is reset when the server acknowledges
        more stanzas than we have sent.
        """
        session = client._xep_0198_session
        session.ack_requested = False
        try:
            server_acked = int(a_elt['h'])
        except (KeyError, ValueError):
            # KeyError: no "h" attribute at all; ValueError: not an integer
            log.warning(_(u"Server returned invalid ack element, disabling stream "
                          u"management: {xml}").format(xml=a_elt))
            session.enabled = False
            return

        if server_acked > session.out_counter:
            log.error(_(u"Server acked more stanzas than we have sent, disabling stream "
                        u"management."))
            session.reset()
            return

        self.updateBuffer(session, server_acked)
        self.checkAcks(client)
423
424
class XEP_0198_handler(xmlstream.XMPPHandler):
    """Stream handler forwarding Stream Management elements to the plugin"""
    implements(iwokkel.IDisco)

    def __init__(self, plugin_parent):
        self.plugin_parent = plugin_parent
        self.host = plugin_parent.host

    def connectionInitialized(self):
        """Register one observer per Stream Management element"""
        plugin = self.plugin_parent
        observers = (
            (SM_ENABLED, plugin.onEnabled),
            (SM_RESUMED, plugin.onResumed),
            (SM_FAILED, plugin.onFailed),
            (SM_R_REQUEST, plugin.onAckRequest),
            (SM_A_REQUEST, plugin.onAckAnswer),
        )
        for query, callback in observers:
            # every callback receives the client (this handler's parent)
            self.xmlstream.addObserver(query, callback, client=self.parent)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the Stream Management namespace"""
        features = [disco.DiscoFeature(NS_SM)]
        return features

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """No disco item is exposed"""
        return list()