comparison libervia/backend/plugins/plugin_xep_0198.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0198.py@524856bd7b19
children
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # SàT plugin for managing Stream-Management
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from libervia.backend.core.i18n import _
20 from libervia.backend.core.constants import Const as C
21 from libervia.backend.core import exceptions
22 from libervia.backend.core.log import getLogger
23 from twisted.words.protocols.jabber import client as jabber_client
24 from twisted.words.protocols.jabber import xmlstream
25 from twisted.words.xish import domish
26 from twisted.internet import defer
27 from twisted.internet import task, reactor
28 from functools import partial
29 from wokkel import disco, iwokkel
30 from zope.interface import implementer
31 import collections
32 import time
33
log = getLogger(__name__)

# Plugin metadata, keyed by the PI_* constants of the backend.
PLUGIN_INFO = {
    C.PI_NAME: "Stream Management",
    C.PI_IMPORT_NAME: "XEP-0198",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0198"],
    C.PI_DEPENDENCIES: [],
    C.PI_RECOMMENDATIONS: ["XEP-0045", "XEP-0313"],
    C.PI_MAIN: "XEP_0198",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("Implementation of Stream Management"),
}

# Namespace of Stream Management, and XPath-like queries matching its elements
NS_SM = "urn:xmpp:sm:3"
SM_ENABLED = '/enabled[@xmlns="{}"]'.format(NS_SM)
SM_RESUMED = '/resumed[@xmlns="{}"]'.format(NS_SM)
SM_FAILED = '/failed[@xmlns="{}"]'.format(NS_SM)
SM_R_REQUEST = '/r[@xmlns="{}"]'.format(NS_SM)
SM_A_REQUEST = '/a[@xmlns="{}"]'.format(NS_SM)
SM_H_REQUEST = '/h[@xmlns="{}"]'.format(NS_SM)
# Max number of stanza to send before requesting ack
MAX_STANZA_ACK_R = 5
# Max number of seconds before requesting ack
MAX_DELAY_ACK_R = 30
# Modulo applied to the stanza counters (2**32)
MAX_COUNTER = 1 << 32
# Default resumption time in seconds, used when the server gives no valid "max"
RESUME_MAX = 5 * 60
# if we don't have an answer to ACK REQUEST after this delay, connection is aborted
ACK_TIMEOUT = 35
64
65
class ProfileSessionData(object):
    """Stream Management state kept for one client profile.

    Holds the in/out stanza counters, the buffer of sent stanzas which have not
    been acknowledged yet, the data needed to resume a session and the timers
    used to request acks.
    """
    # number of stanzas sent since the session was enabled
    out_counter = 0
    # number of stanzas received since the session was enabled
    in_counter = 0
    # session identifier given by the server; None when resuming is not possible
    session_id = None
    # (host, port) tuple where the server wants the session to be resumed, if any
    location = None
    # delay in seconds during which a disconnected session may still be resumed
    session_max = None
    # True when an ack answer is expected
    ack_requested = False
    # time of the last ack request (seconds since epoch, 0 if never requested)
    last_ack_r = 0
    # time of the last disconnection (seconds since epoch)
    disconnected_time = None

    def __init__(self, callback, **kw):
        """
        @param callback(callable): called periodically while the session is enabled
        @param kw: keyword arguments used when calling ``callback``
        """
        # newest stanzas are added on the left, oldest are popped from the right
        self.buffer = collections.deque()
        # number of stanzas already acknowledged by the server
        self.buffer_idx = 0
        self._enabled = False
        # LoopingCall running ``callback`` while the session is enabled
        self.timer = None
        # time used when doing a ack request
        # when it times out, connection is aborted
        self.req_timer = None
        self.callback_data = (callback, kw)

    @property
    def enabled(self):
        """True while Stream Management is active for this session"""
        return self._enabled

    @enabled.setter
    def enabled(self, enabled):
        """Start or stop the periodic callback along with the session

        @raise exceptions.InternalError: the session is already enabled
        """
        if enabled:
            if self._enabled:
                raise exceptions.InternalError(
                    "Stream Management can't be enabled twice")
            self._enabled = True
            callback, kw = self.callback_data
            self.timer = task.LoopingCall(callback, **kw)
            self.timer.start(MAX_DELAY_ACK_R, now=False)
        else:
            self._enabled = False
            if self.timer is not None:
                self.timer.stop()
                self.timer = None

    @property
    def resume_enabled(self):
        """True when a session id is known, i.e. the session may be resumed"""
        return self.session_id is not None

    def reset(self):
        """Disable the session and forget counters, buffer and resumption data"""
        self.enabled = False
        self.buffer.clear()
        self.buffer_idx = 0
        self.in_counter = self.out_counter = 0
        self.session_id = self.location = None
        self.ack_requested = False
        self.last_ack_r = 0
        if self.req_timer is not None:
            if self.req_timer.active():
                # the ack request is still pending: cancel it so the connection
                # is not aborted after the session has been reset
                self.req_timer.cancel()
            else:
                # cancelling a timer which is no longer active would raise
                log.error("req_timer has been called/cancelled but not reset")
            self.req_timer = None

    def get_buffer_copy(self):
        """Return a list copy of the buffer (newest stanza first)"""
        return list(self.buffer)
128
129
class XEP_0198(object):
    """Stream Management (XEP-0198) plugin.

    Counts sent and received stanzas, keeps sent stanzas in a buffer until the
    server acknowledges them, requests acks regularly, and tries to resume the
    session after an unexpected disconnection.
    """
    # FIXME: location is not handled yet

    def __init__(self, host):
        """Register namespace and triggers, and read the ack timeout setting

        @param host: object giving access to triggers, memory and plugins
        """
        log.info(_("Plugin Stream Management initialization"))
        self.host = host
        host.register_namespace('sm', NS_SM)
        host.trigger.add("stream_hooks", self.add_hooks)
        host.trigger.add("xml_init", self._xml_init_trigger)
        host.trigger.add("disconnecting", self._disconnecting_trigger)
        host.trigger.add("disconnected", self._disconnected_trigger)
        try:
            self._ack_timeout = int(
                host.memory.config_get("", "ack_timeout", ACK_TIMEOUT))
        except ValueError:
            log.error(_("Invalid ack_timeout value, please check your configuration"))
            self._ack_timeout = ACK_TIMEOUT
        # a falsy value (0) disables the timeout
        if not self._ack_timeout:
            log.info(_("Ack timeout disabled"))
        else:
            log.info(_("Ack timeout set to {timeout}s").format(
                timeout=self._ack_timeout))

    def profile_connecting(self, client):
        """Attach a fresh Stream Management session to the client"""
        client._xep_0198_session = ProfileSessionData(callback=self.check_acks,
                                                      client=client)

    def get_handler(self, client):
        """Return the XMPP handler observing Stream Management elements"""
        return XEP_0198_handler(self)

    def add_hooks(self, client, receive_hooks, send_hooks):
        """Add hooks to handle in/out stanzas counters"""
        receive_hooks.append(partial(self.on_receive, client=client))
        send_hooks.append(partial(self.on_send, client=client))
        return True

    @staticmethod
    def _next_counter(value, modulo):
        """Return ``value`` incremented by one, wrapping to 0 when reaching ``modulo``"""
        return (value + 1) % modulo

    def _xml_init_trigger(self, client):
        """Enable or resume a stream management

        @return (bool): False when a resumption has been requested, True otherwise
        """
        if (NS_SM, 'sm') not in client.xmlstream.features:
            log.warning(_(
                "Your server doesn't support stream management ({namespace}), this is "
                "used to improve connection problems detection (like network outages). "
                "Please ask your server administrator to enable this feature.").format(
                namespace=NS_SM))
            return True
        session = client._xep_0198_session

        # a disconnect timer from a previous disconnection may still be active
        try:
            disconnect_timer = session.disconnect_timer
        except AttributeError:
            pass
        else:
            if disconnect_timer.active():
                disconnect_timer.cancel()
            del session.disconnect_timer

        if session.resume_enabled:
            # we are resuming a session
            resume_elt = domish.Element((NS_SM, 'resume'))
            resume_elt['h'] = str(session.in_counter)
            resume_elt['previd'] = session.session_id
            client.send(resume_elt)
            # this marker is removed by on_resumed or on_failed
            session.resuming = True
            # session.enabled will be set on <resumed/> reception
            return False
        else:
            # we start a new session
            assert session.out_counter == 0
            enable_elt = domish.Element((NS_SM, 'enable'))
            enable_elt['resume'] = 'true'
            client.send(enable_elt)
            session.enabled = True
            return True

    def _disconnecting_trigger(self, client):
        """Send a last ack and reset the session on requested disconnection"""
        session = client._xep_0198_session
        if session.enabled:
            self.send_ack(client)
        # This is a requested disconnection, so we can reset the session
        # to disable resuming and close normally the stream
        session.reset()
        return True

    def _disconnected_trigger(self, client, reason):
        """Keep the profile connected for a while when the session can be resumed

        @return (bool): False when the session can be resumed (the profile
            disconnection is then delayed by ``session_max`` seconds), True otherwise
        """
        if client.is_component:
            return True
        session = client._xep_0198_session
        session.enabled = False
        if session.resume_enabled:
            session.disconnected_time = time.time()
            session.disconnect_timer = reactor.callLater(session.session_max,
                                                         client.disconnect_profile,
                                                         reason)
            # disconnect_profile must not be called at this point
            # because session can be resumed
            return False
        else:
            return True

    def check_acks(self, client):
        """Request ack if needed

        An ack is requested when no request is pending, the buffer is not empty,
        and either MAX_STANZA_ACK_R stanzas are waiting for an ack or the last
        request is older than MAX_DELAY_ACK_R seconds.
        """
        session = client._xep_0198_session
        # log.debug("check_acks (in_counter={}, out_counter={}, buf len={}, buf idx={})"
        #     .format(session.in_counter, session.out_counter, len(session.buffer),
        #             session.buffer_idx))
        if session.ack_requested or not session.buffer:
            return
        if (session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R
                or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R):
            self.request_ack(client)
            session.ack_requested = True
            session.last_ack_r = time.time()

    def update_buffer(self, session, server_acked):
        """Update buffer and buffer_index

        @param session: session whose ``buffer`` and ``buffer_idx`` are updated
        @param server_acked(int): number of stanzas acknowledged by the server
        """
        if server_acked > session.buffer_idx:
            diff = server_acked - session.buffer_idx
            try:
                # oldest stanzas are on the right side of the buffer
                for __ in range(diff):
                    session.buffer.pop()
            except IndexError:
                log.error(
                    "error while cleaning buffer, invalid index (buffer is empty):\n"
                    "diff = {diff}\n"
                    "server_acked = {server_acked}\n"
                    "buffer_idx = {buffer_id}".format(
                        diff=diff, server_acked=server_acked,
                        buffer_id=session.buffer_idx))
            session.buffer_idx += diff

    def replay_buffer(self, client, buffer_, discard_results=False):
        """Resend all stanza in buffer

        @param buffer_(collection.deque, list): buffer to replay
            the buffer will be cleared by this method
        @param discard_results(bool): if True, don't replay IQ result stanzas
        """
        while True:
            try:
                stanza = buffer_.pop()
            except IndexError:
                break
            else:
                if ((discard_results
                     and stanza.name == 'iq'
                     and stanza.getAttribute('type') == 'result')):
                    continue
                client.send(stanza)

    def send_ack(self, client):
        """Send an answer element with current IN counter"""
        a_elt = domish.Element((NS_SM, 'a'))
        a_elt['h'] = str(client._xep_0198_session.in_counter)
        client.send(a_elt)

    def request_ack(self, client):
        """Send a request element

        When an ack timeout is configured, a timer aborting the connection is
        started.
        @raise exceptions.InternalError: a previous request timer is still set
        """
        session = client._xep_0198_session
        r_elt = domish.Element((NS_SM, 'r'))
        client.send(r_elt)
        if session.req_timer is not None:
            raise exceptions.InternalError("req_timer should not be set")
        if self._ack_timeout:
            session.req_timer = reactor.callLater(self._ack_timeout,
                                                  self.on_ack_time_out, client)

    def _connectionFailed(self, failure_, connector):
        """Restore the normal host and port when the server location can't be used

        This replaces ``connector.connectionFailed`` while a location given by
        the server is in use (see on_enabled); the original method is restored
        and called.
        """
        normal_host, normal_port = connector.normal_location
        del connector.normal_location
        log.warning(_(
            "Connection failed using location given by server (host: {host}, port: "
            "{port}), switching to normal host and port (host: {normal_host}, port: "
            "{normal_port})").format(host=connector.host, port=connector.port,
                                     normal_host=normal_host,
                                     normal_port=normal_port))
        connector.host, connector.port = normal_host, normal_port
        connector.connectionFailed = connector.connectionFailed_ori
        del connector.connectionFailed_ori
        return connector.connectionFailed(failure_)

    def on_enabled(self, enabled_elt, client):
        """Handle <enabled/>: store session id, location and resumption time"""
        session = client._xep_0198_session
        session.in_counter = 0

        # we check that resuming is possible and that we have a session id
        resume = C.bool(enabled_elt.getAttribute('resume'))
        session_id = enabled_elt.getAttribute('id')
        if not session_id:
            log.warning(_('Incorrect <enabled/> element received, no "id" attribute'))
        if not resume or not session_id:
            log.warning(_(
                "You're server doesn't support session resuming with stream management, "
                "please contact your server administrator to enable it"))
            return

        session.session_id = session_id

        # XXX: we disable resource binding, which must not be done
        #      when we resume the session.
        client.factory.authenticator.res_binding = False

        # location, in case server want resuming session to be elsewhere
        try:
            location = enabled_elt['location']
        except KeyError:
            pass
        else:
            # TODO: handle IPv6 here (in brackets, cf. XEP)
            try:
                domain, port = location.split(':', 1)
                port = int(port)
            except ValueError:
                log.warning(_("Invalid location received: {location}")
                            .format(location=location))
            else:
                session.location = (domain, port)
                # we monkey patch connector to use the new location
                connector = client.xmlstream.transport.connector
                connector.normal_location = connector.host, connector.port
                connector.host = domain
                connector.port = port
                connector.connectionFailed_ori = connector.connectionFailed
                connector.connectionFailed = partial(self._connectionFailed,
                                                     connector=connector)

        # resuming time
        try:
            max_s = int(enabled_elt['max'])
        except (ValueError, KeyError) as e:
            if isinstance(e, ValueError):
                log.warning(_('Invalid "max" attribute'))
            max_s = RESUME_MAX
            log.info(_("Using default session max value ({max_s} s).").format(
                max_s=max_s))
            log.info(_("Stream Management enabled"))
        else:
            log.info(_(
                "Stream Management enabled, with a resumption time of {res_m:.2f} min"
            ).format(res_m=max_s / 60))
        session.session_max = max_s

    def on_resumed(self, enabled_elt, client):
        """Handle <resumed/>: resend unacknowledged stanzas and continue the session

        @param enabled_elt(domish.Element): the received <resumed/> element
        """
        session = client._xep_0198_session
        assert not session.enabled
        del session.resuming
        server_acked = int(enabled_elt['h'])
        self.update_buffer(session, server_acked)
        resend_count = len(session.buffer)
        # we resend all stanza which have not been received properly
        self.replay_buffer(client, session.buffer)
        # now we can continue the session
        session.enabled = True
        d_time = time.time() - session.disconnected_time
        log.info(_("Stream session resumed (disconnected for {d_time} s, {count} "
                   "stanza(s) resent)").format(d_time=int(d_time), count=resend_count))

    def on_failed(self, failed_elt, client):
        """Handle <failed/>: give up, or restart a full session after a failed resume"""
        session = client._xep_0198_session
        condition_elt = failed_elt.firstChildElement()
        # the buffer is copied before the reset clears it
        buffer_ = session.get_buffer_copy()
        session.reset()

        try:
            del session.resuming
        except AttributeError:
            # stream management can't be started at all
            msg = _("Can't use stream management")
            if condition_elt is None:
                log.error(msg + '.')
            else:
                log.error(_("{msg}: {reason}").format(
                    msg=msg, reason=condition_elt.name))
        else:
            # only stream resumption failed, we can try full session init
            # XXX: we try to start full session init from this point, with many
            #      variables/attributes already initialised with a potentially different
            #      jid. This is experimental and may not be safe. It may be more
            #      secured to abord the connection and restart everything with a fresh
            #      client.
            msg = _("stream resumption not possible, restarting full session")

            if condition_elt is None:
                log.warning('{msg}.'.format(msg=msg))
            else:
                log.warning("{msg}: {reason}".format(
                    msg=msg, reason=condition_elt.name))
            # stream resumption failed, but we still can do normal stream management
            # we restore attributes as if the session was new, and init stream
            # we keep everything initialized, and only do binding, roster request
            # and initial presence sending.
            if client.conn_deferred.called:
                client.conn_deferred = defer.Deferred()
            else:
                log.error("conn_deferred should be called at this point")
            plg_0045 = self.host.plugins.get('XEP-0045')
            plg_0313 = self.host.plugins.get('XEP-0313')

            # FIXME: we should call all loaded plugins with generic callbacks
            #        (e.g. prepareResume and resume), so a hot resuming can be done
            #        properly for all plugins.

            if plg_0045 is not None:
                # we have to remove joined rooms
                muc_join_args = plg_0045.pop_rooms(client)
            # we need to recreate roster
            client.handlers.remove(client.roster)
            client.roster = client.roster.__class__(self.host)
            client.roster.setHandlerParent(client)
            # bind init is not done when resuming is possible, so we have to do it now
            bind_init = jabber_client.BindInitializer(client.xmlstream)
            bind_init.required = True
            d = bind_init.start()
            # we set the jid, which may have changed
            d.addCallback(
                lambda __: setattr(client.factory.authenticator, "jid", client.jid))
            # we call the trigger who will send the <enable/> element
            d.addCallback(lambda __: self._xml_init_trigger(client))
            # then we have to re-request the roster, as changes may have occured
            d.addCallback(lambda __: client.roster.request_roster())
            # we add got_roster to be sure to have roster before sending initial presence
            d.addCallback(lambda __: client.roster.got_roster)
            if plg_0313 is not None:
                # we retrieve one2one MAM archives
                d.addCallback(lambda __: defer.ensureDeferred(plg_0313.resume(client)))
            # initial presence must be sent manually
            d.addCallback(lambda __: client.presence.available())
            if plg_0045 is not None:
                # we re-join MUC rooms
                # the joins are started from the callback chain, so they happen
                # after binding and initial presence and not while this method
                # is still running
                d.addCallback(lambda __: defer.DeferredList(
                    [defer.ensureDeferred(plg_0045.join(*args))
                     for args in muc_join_args]
                ))
            # at the end we replay the buffer, as those stanzas have probably not
            # been received
            d.addCallback(lambda __: self.replay_buffer(client, buffer_,
                                                        discard_results=True))

    def on_receive(self, element, client):
        """Receive hook: count incoming stanzas while the session is enabled"""
        if not client.is_component:
            session = client._xep_0198_session
            if session.enabled and element.name.lower() in C.STANZA_NAMES:
                # the counter wraps around when reaching MAX_COUNTER
                session.in_counter = self._next_counter(
                    session.in_counter, MAX_COUNTER)

    def on_send(self, obj, client):
        """Send hook: count and buffer outgoing stanzas while the session is enabled"""
        if not client.is_component:
            session = client._xep_0198_session
            if (session.enabled
                    and domish.IElement.providedBy(obj)
                    and obj.name.lower() in C.STANZA_NAMES):
                # the counter wraps around when reaching MAX_COUNTER
                session.out_counter = self._next_counter(
                    session.out_counter, MAX_COUNTER)
                session.buffer.appendleft(obj)
                self.check_acks(client)

    def on_ack_request(self, r_elt, client):
        """Handle <r/>: answer with the current IN counter"""
        self.send_ack(client)

    def on_ack_answer(self, a_elt, client):
        """Handle <a/>: stop the request timer and clean the buffer

        Stream management is disabled if the "h" attribute is missing or invalid,
        and the session is reset if the server acknowledges more stanzas than sent.
        """
        session = client._xep_0198_session
        session.ack_requested = False
        if self._ack_timeout:
            if session.req_timer is None:
                log.error("req_timer should be set")
            else:
                session.req_timer.cancel()
                session.req_timer = None
        try:
            server_acked = int(a_elt['h'])
        except (ValueError, KeyError):
            # KeyError: the "h" attribute is missing
            log.warning(_("Server returned invalid ack element, disabling stream "
                          "management: {xml}").format(xml=a_elt))
            session.enabled = False
            return

        if server_acked > session.out_counter:
            log.error(_("Server acked more stanzas than we have sent, disabling stream "
                        "management."))
            session.reset()
            return

        self.update_buffer(session, server_acked)
        self.check_acks(client)

    def on_ack_time_out(self, client):
        """Called when a requested ACK has not been received in time"""
        log.info(_("Ack was not received in time, aborting connection"))
        try:
            xml_stream = client.xmlstream
        except AttributeError:
            log.warning("xmlstream has already been terminated")
        else:
            transport = xml_stream.transport
            if transport is None:
                log.warning("transport was already removed")
            else:
                transport.abortConnection()
        # the timer has fired, it must not be cancelled anymore
        client._xep_0198_session.req_timer = None
525
526
@implementer(iwokkel.IDisco)
class XEP_0198_handler(xmlstream.XMPPHandler):
    """XMPP handler forwarding Stream Management elements to the plugin"""

    def __init__(self, plugin_parent):
        self.plugin_parent = plugin_parent
        self.host = plugin_parent.host

    def connectionInitialized(self):
        """Observe every Stream Management element handled by the plugin"""
        plugin = self.plugin_parent
        observers = (
            (SM_ENABLED, plugin.on_enabled),
            (SM_RESUMED, plugin.on_resumed),
            (SM_FAILED, plugin.on_failed),
            (SM_R_REQUEST, plugin.on_ack_request),
            (SM_A_REQUEST, plugin.on_ack_answer),
        )
        for query, callback in observers:
            self.xmlstream.addObserver(query, callback, client=self.parent)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the Stream Management namespace"""
        return [disco.DiscoFeature(NS_SM)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """No disco item is exposed by this handler"""
        return []