Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0198.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0198.py@524856bd7b19 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # SàT plugin for managing Stream-Management | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from libervia.backend.core.i18n import _ | |
20 from libervia.backend.core.constants import Const as C | |
21 from libervia.backend.core import exceptions | |
22 from libervia.backend.core.log import getLogger | |
23 from twisted.words.protocols.jabber import client as jabber_client | |
24 from twisted.words.protocols.jabber import xmlstream | |
25 from twisted.words.xish import domish | |
26 from twisted.internet import defer | |
27 from twisted.internet import task, reactor | |
28 from functools import partial | |
29 from wokkel import disco, iwokkel | |
30 from zope.interface import implementer | |
31 import collections | |
32 import time | |
33 | |
log = getLogger(__name__)

# Plugin metadata consumed by the Libervia plugin loader.
PLUGIN_INFO = {
    C.PI_NAME: "Stream Management",
    C.PI_IMPORT_NAME: "XEP-0198",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0198"],
    C.PI_DEPENDENCIES: [],
    C.PI_RECOMMENDATIONS: ["XEP-0045", "XEP-0313"],
    C.PI_MAIN: "XEP_0198",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Stream Management"""),
}

# XEP-0198 namespace (protocol version 3)
NS_SM = "urn:xmpp:sm:3"
# XPath matchers for the stream management elements received on the stream
SM_ENABLED = '/enabled[@xmlns="' + NS_SM + '"]'
SM_RESUMED = '/resumed[@xmlns="' + NS_SM + '"]'
SM_FAILED = '/failed[@xmlns="' + NS_SM + '"]'
SM_R_REQUEST = '/r[@xmlns="' + NS_SM + '"]'
SM_A_REQUEST = '/a[@xmlns="' + NS_SM + '"]'
# NOTE(review): SM_H_REQUEST is defined but never observed below — confirm
# whether it is intentionally unused.
SM_H_REQUEST = '/h[@xmlns="' + NS_SM + '"]'
# Max number of stanza to send before requesting ack
MAX_STANZA_ACK_R = 5
# Max number of seconds before requesting ack
MAX_DELAY_ACK_R = 30
# Stanza counters wrap at 2**32 as mandated by XEP-0198
MAX_COUNTER = 2**32
# Default maximum resumption time (seconds) if the server doesn't give one
RESUME_MAX = 5*60
# if we don't have an answer to ACK REQUEST after this delay, connection is aborted
ACK_TIMEOUT = 35

65 | |
class ProfileSessionData(object):
    """Per-profile state for a stream management session.

    Holds the in/out stanza counters, the buffer of stanzas not yet acked by
    the server, and the timers used to periodically request acks.
    """
    # number of stanzas we have sent (wraps at MAX_COUNTER)
    out_counter = 0
    # number of stanzas we have received (wraps at MAX_COUNTER)
    in_counter = 0
    # server-provided id needed to resume the session (None if resume disabled)
    session_id = None
    # (host, port) where the server wants us to resume, if any
    location = None
    # maximum resumption time (in s) granted by the server
    session_max = None
    # True when an ack answer is expected
    ack_requested = False
    # timestamp of the last ack request we sent
    last_ack_r = 0
    # timestamp of the last unexpected disconnection
    disconnected_time = None

    def __init__(self, callback, **kw):
        """
        @param callback: method called periodically (with **kw) while the
            session is enabled, used to check if an ack request is needed
        """
        # stanzas are appended on the left, so .pop() returns the oldest one
        self.buffer = collections.deque()
        # absolute counter value corresponding to the oldest buffered stanza
        self.buffer_idx = 0
        self._enabled = False
        self.timer = None
        # time used when doing a ack request
        # when it times out, connection is aborted
        self.req_timer = None
        self.callback_data = (callback, kw)

    @property
    def enabled(self):
        return self._enabled

    @enabled.setter
    def enabled(self, enabled):
        if enabled:
            if self._enabled:
                raise exceptions.InternalError(
                    "Stream Management can't be enabled twice")
            self._enabled = True
            callback, kw = self.callback_data
            # periodically check if we should request an ack
            self.timer = task.LoopingCall(callback, **kw)
            self.timer.start(MAX_DELAY_ACK_R, now=False)
        else:
            self._enabled = False
            if self.timer is not None:
                self.timer.stop()
                self.timer = None

    @property
    def resume_enabled(self):
        # resuming is only possible once the server gave us a session id
        return self.session_id is not None

    def reset(self):
        """Return the session to its pristine state (no resuming possible)."""
        self.enabled = False
        self.buffer.clear()
        self.buffer_idx = 0
        self.in_counter = self.out_counter = 0
        self.session_id = self.location = None
        self.ack_requested = False
        self.last_ack_r = 0
        if self.req_timer is not None:
            # FIX: the branches were inverted — DelayedCall.active() is True
            # while the call is still pending, which is exactly when cancel()
            # is legal; calling cancel() on an inactive call raises
            # AlreadyCalled/AlreadyCancelled.
            if self.req_timer.active():
                self.req_timer.cancel()
            else:
                log.error("req_timer has been called/cancelled but not reset")
            self.req_timer = None

    def get_buffer_copy(self):
        """Return a shallow copy of the un-acked stanza buffer as a list."""
        return list(self.buffer)
128 | |
129 | |
class XEP_0198(object):
    """Stream Management (XEP-0198) implementation.

    Counts incoming/outgoing stanzas, requests/answers acknowledgements, and
    resumes the stream after an unexpected disconnection when the server
    supports it.
    """
    # FIXME: location is not handled yet

    def __init__(self, host):
        log.info(_("Plugin Stream Management initialization"))
        self.host = host
        host.register_namespace('sm', NS_SM)
        host.trigger.add("stream_hooks", self.add_hooks)
        host.trigger.add("xml_init", self._xml_init_trigger)
        host.trigger.add("disconnecting", self._disconnecting_trigger)
        host.trigger.add("disconnected", self._disconnected_trigger)
        try:
            self._ack_timeout = int(host.memory.config_get("", "ack_timeout", ACK_TIMEOUT))
        except ValueError:
            log.error(_("Invalid ack_timeout value, please check your configuration"))
            self._ack_timeout = ACK_TIMEOUT
        if not self._ack_timeout:
            log.info(_("Ack timeout disabled"))
        else:
            log.info(_("Ack timeout set to {timeout}s").format(
                timeout=self._ack_timeout))

    def profile_connecting(self, client):
        """Attach a fresh stream management session to the connecting client."""
        client._xep_0198_session = ProfileSessionData(callback=self.check_acks,
                                                      client=client)

    def get_handler(self, client):
        return XEP_0198_handler(self)

    def add_hooks(self, client, receive_hooks, send_hooks):
        """Add hooks to handle in/out stanzas counters"""
        receive_hooks.append(partial(self.on_receive, client=client))
        send_hooks.append(partial(self.on_send, client=client))
        return True

    def _xml_init_trigger(self, client):
        """Enable or resume a stream management session.

        @return: False when resuming (the normal init flow must be skipped
            until <resumed/> is received), True otherwise.
        """
        if (NS_SM, 'sm') not in client.xmlstream.features:
            log.warning(_(
                "Your server doesn't support stream management ({namespace}), this is "
                "used to improve connection problems detection (like network outages). "
                "Please ask your server administrator to enable this feature.".format(
                namespace=NS_SM)))
            return True
        session = client._xep_0198_session

        # a disconnect timer from a previous disconnection may still be active
        try:
            disconnect_timer = session.disconnect_timer
        except AttributeError:
            pass
        else:
            if disconnect_timer.active():
                disconnect_timer.cancel()
            del session.disconnect_timer

        if session.resume_enabled:
            # we are resuming a session
            resume_elt = domish.Element((NS_SM, 'resume'))
            resume_elt['h'] = str(session.in_counter)
            resume_elt['previd'] = session.session_id
            client.send(resume_elt)
            session.resuming = True
            # session.enabled will be set on <resumed/> reception
            return False
        else:
            # we start a new session
            assert session.out_counter == 0
            enable_elt = domish.Element((NS_SM, 'enable'))
            enable_elt['resume'] = 'true'
            client.send(enable_elt)
            session.enabled = True
            return True

    def _disconnecting_trigger(self, client):
        """On a requested disconnection, flush acks and forget the session."""
        session = client._xep_0198_session
        if session.enabled:
            self.send_ack(client)
        # This is a requested disconnection, so we can reset the session
        # to disable resuming and close normally the stream
        session.reset()
        return True

    def _disconnected_trigger(self, client, reason):
        """On an unexpected disconnection, delay profile disconnection.

        If resuming is possible, keep the session alive for session_max
        seconds so the stream can be resumed.
        """
        if client.is_component:
            return True
        session = client._xep_0198_session
        session.enabled = False
        if session.resume_enabled:
            session.disconnected_time = time.time()
            session.disconnect_timer = reactor.callLater(session.session_max,
                                                         client.disconnect_profile,
                                                         reason)
            # disconnect_profile must not be called at this point
            # because session can be resumed
            return False
        else:
            return True

    def check_acks(self, client):
        """Request ack if needed"""
        session = client._xep_0198_session
        # log.debug("check_acks (in_counter={}, out_counter={}, buf len={}, buf idx={})"
        #     .format(session.in_counter, session.out_counter, len(session.buffer),
        #             session.buffer_idx))
        if session.ack_requested or not session.buffer:
            return
        if (session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R
                or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R):
            self.request_ack(client)
            session.ack_requested = True
            session.last_ack_r = time.time()

    def update_buffer(self, session, server_acked):
        """Update buffer and buffer_index

        Drop from the buffer every stanza the server has acknowledged.
        @param session: stream management session data
        @param server_acked(int): "h" counter value reported by the server
        """
        if server_acked > session.buffer_idx:
            diff = server_acked - session.buffer_idx
            try:
                for i in range(diff):
                    # .pop() removes the oldest stanza (they are appendleft-ed)
                    session.buffer.pop()
            except IndexError:
                log.error(
                    "error while cleaning buffer, invalid index (buffer is empty):\n"
                    "diff = {diff}\n"
                    "server_acked = {server_acked}\n"
                    "buffer_idx = {buffer_id}".format(
                        diff=diff, server_acked=server_acked,
                        buffer_id=session.buffer_idx))
            session.buffer_idx += diff

    def replay_buffer(self, client, buffer_, discard_results=False):
        """Resend all stanza in buffer

        @param buffer_(collection.deque, list): buffer to replay
            the buffer will be cleared by this method
        @param discard_results(bool): if True, don't replay IQ result stanzas
        """
        while True:
            try:
                stanza = buffer_.pop()
            except IndexError:
                break
            else:
                if ((discard_results
                     and stanza.name == 'iq'
                     and stanza.getAttribute('type') == 'result')):
                    continue
                client.send(stanza)

    def send_ack(self, client):
        """Send an answer element with current IN counter"""
        a_elt = domish.Element((NS_SM, 'a'))
        a_elt['h'] = str(client._xep_0198_session.in_counter)
        client.send(a_elt)

    def request_ack(self, client):
        """Send a request element"""
        session = client._xep_0198_session
        r_elt = domish.Element((NS_SM, 'r'))
        client.send(r_elt)
        if session.req_timer is not None:
            raise exceptions.InternalError("req_timer should not be set")
        if self._ack_timeout:
            # if no answer arrives in time, the connection is aborted
            session.req_timer = reactor.callLater(self._ack_timeout, self.on_ack_time_out,
                                                  client)

    def _connectionFailed(self, failure_, connector):
        """Fallback to the normal host/port when the SM location fails."""
        normal_host, normal_port = connector.normal_location
        del connector.normal_location
        log.warning(_(
            "Connection failed using location given by server (host: {host}, port: "
            "{port}), switching to normal host and port (host: {normal_host}, port: "
            "{normal_port})".format(host=connector.host, port=connector.port,
                                    normal_host=normal_host, normal_port=normal_port)))
        connector.host, connector.port = normal_host, normal_port
        # restore the original handler so the retry uses the default behaviour
        connector.connectionFailed = connector.connectionFailed_ori
        del connector.connectionFailed_ori
        return connector.connectionFailed(failure_)

    def on_enabled(self, enabled_elt, client):
        """Handle <enabled/>: record session id, location and resumption time."""
        session = client._xep_0198_session
        session.in_counter = 0

        # we check that resuming is possible and that we have a session id
        resume = C.bool(enabled_elt.getAttribute('resume'))
        session_id = enabled_elt.getAttribute('id')
        if not session_id:
            log.warning(_('Incorrect <enabled/> element received, no "id" attribute'))
        if not resume or not session_id:
            # FIX: "You're" → "Your" in the user-facing message
            log.warning(_(
                "Your server doesn't support session resuming with stream management, "
                "please contact your server administrator to enable it"))
            return

        session.session_id = session_id

        # XXX: we disable resource binding, which must not be done
        #      when we resume the session.
        client.factory.authenticator.res_binding = False

        # location, in case server want resuming session to be elsewhere
        try:
            location = enabled_elt['location']
        except KeyError:
            pass
        else:
            # TODO: handle IPv6 here (in brackets, cf. XEP)
            try:
                domain, port = location.split(':', 1)
                port = int(port)
            except ValueError:
                log.warning(_("Invalid location received: {location}")
                    .format(location=location))
            else:
                session.location = (domain, port)
                # we monkey patch connector to use the new location
                connector = client.xmlstream.transport.connector
                connector.normal_location = connector.host, connector.port
                connector.host = domain
                connector.port = port
                connector.connectionFailed_ori = connector.connectionFailed
                connector.connectionFailed = partial(self._connectionFailed,
                                                     connector=connector)

        # resuming time
        try:
            max_s = int(enabled_elt['max'])
        except (ValueError, KeyError) as e:
            if isinstance(e, ValueError):
                log.warning(_('Invalid "max" attribute'))
            max_s = RESUME_MAX
            log.info(_("Using default session max value ({max_s} s).".format(
                max_s=max_s)))
            log.info(_("Stream Management enabled"))
        else:
            log.info(_(
                "Stream Management enabled, with a resumption time of {res_m:.2f} min"
                .format(res_m = max_s/60)))
        session.session_max = max_s

    def on_resumed(self, enabled_elt, client):
        """Handle <resumed/>: replay unacked stanzas and re-enable the session."""
        session = client._xep_0198_session
        assert not session.enabled
        del session.resuming
        server_acked = int(enabled_elt['h'])
        self.update_buffer(session, server_acked)
        resend_count = len(session.buffer)
        # we resend all stanza which have not been received properly
        self.replay_buffer(client, session.buffer)
        # now we can continue the session
        session.enabled = True
        d_time = time.time() - session.disconnected_time
        log.info(_("Stream session resumed (disconnected for {d_time} s, {count} "
                   "stanza(s) resent)").format(d_time=int(d_time), count=resend_count))

    def on_failed(self, failed_elt, client):
        """Handle <failed/>: either give up on SM or restart a full session."""
        session = client._xep_0198_session
        condition_elt = failed_elt.firstChildElement()
        buffer_ = session.get_buffer_copy()
        session.reset()

        try:
            del session.resuming
        except AttributeError:
            # stream management can't be started at all
            msg = _("Can't use stream management")
            if condition_elt is None:
                log.error(msg + '.')
            else:
                log.error(_("{msg}: {reason}").format(
                    msg=msg, reason=condition_elt.name))
        else:
            # only stream resumption failed, we can try full session init
            # XXX: we try to start full session init from this point, with many
            #      variables/attributes already initialised with a potentially
            #      different jid. This is experimental and may not be safe. It
            #      may be more secured to abort the connection and restart
            #      everything with a fresh client.
            msg = _("stream resumption not possible, restarting full session")

            if condition_elt is None:
                log.warning('{msg}.'.format(msg=msg))
            else:
                log.warning("{msg}: {reason}".format(
                    msg=msg, reason=condition_elt.name))
            # stream resumption failed, but we still can do normal stream management
            # we restore attributes as if the session was new, and init stream
            # we keep everything initialized, and only do binding, roster request
            # and initial presence sending.
            if client.conn_deferred.called:
                client.conn_deferred = defer.Deferred()
            else:
                log.error("conn_deferred should be called at this point")
            plg_0045 = self.host.plugins.get('XEP-0045')
            plg_0313 = self.host.plugins.get('XEP-0313')

            # FIXME: we should call all loaded plugins with generic callbacks
            #        (e.g. prepareResume and resume), so a hot resuming can be
            #        done properly for all plugins.

            if plg_0045 is not None:
                # we have to remove joined rooms
                muc_join_args = plg_0045.pop_rooms(client)
            # we need to recreate roster
            client.handlers.remove(client.roster)
            client.roster = client.roster.__class__(self.host)
            client.roster.setHandlerParent(client)
            # bind init is not done when resuming is possible, so we have to do it now
            bind_init = jabber_client.BindInitializer(client.xmlstream)
            bind_init.required = True
            d = bind_init.start()
            # we set the jid, which may have changed
            d.addCallback(lambda __: setattr(client.factory.authenticator, "jid", client.jid))
            # we call the trigger who will send the <enable/> element
            d.addCallback(lambda __: self._xml_init_trigger(client))
            # then we have to re-request the roster, as changes may have occured
            d.addCallback(lambda __: client.roster.request_roster())
            # we add got_roster to be sure to have roster before sending initial presence
            d.addCallback(lambda __: client.roster.got_roster)
            if plg_0313 is not None:
                # we retrieve one2one MAM archives
                d.addCallback(lambda __: defer.ensureDeferred(plg_0313.resume(client)))
            # initial presence must be sent manually
            d.addCallback(lambda __: client.presence.available())
            if plg_0045 is not None:
                # we re-join MUC rooms
                muc_d_list = defer.DeferredList(
                    [defer.ensureDeferred(plg_0045.join(*args))
                     for args in muc_join_args]
                )
                d.addCallback(lambda __: muc_d_list)
            # at the end we replay the buffer, as those stanzas have probably not
            # been received
            d.addCallback(lambda __: self.replay_buffer(client, buffer_,
                                                        discard_results=True))

    def on_receive(self, element, client):
        """Receive hook: count incoming stanzas."""
        if not client.is_component:
            session = client._xep_0198_session
            if session.enabled and element.name.lower() in C.STANZA_NAMES:
                # FIX: was "in_counter += 1 % MAX_COUNTER", i.e. "+= 1" with no
                # wraparound; XEP-0198 counters must wrap at 2**32
                session.in_counter = (session.in_counter + 1) % MAX_COUNTER

    def on_send(self, obj, client):
        """Send hook: count outgoing stanzas and buffer them until acked."""
        if not client.is_component:
            session = client._xep_0198_session
            if (session.enabled
                    and domish.IElement.providedBy(obj)
                    and obj.name.lower() in C.STANZA_NAMES):
                # FIX: same missing wraparound as in on_receive
                session.out_counter = (session.out_counter + 1) % MAX_COUNTER
                session.buffer.appendleft(obj)
                self.check_acks(client)

    def on_ack_request(self, r_elt, client):
        self.send_ack(client)

    def on_ack_answer(self, a_elt, client):
        """Handle <a/>: cancel the timeout and purge acked stanzas."""
        session = client._xep_0198_session
        session.ack_requested = False
        if self._ack_timeout:
            if session.req_timer is None:
                log.error("req_timer should be set")
            else:
                session.req_timer.cancel()
                session.req_timer = None
        try:
            server_acked = int(a_elt['h'])
        except ValueError:
            log.warning(_("Server returned invalid ack element, disabling stream "
                          "management: {xml}").format(xml=a_elt))
            session.enabled = False
            return

        if server_acked > session.out_counter:
            log.error(_("Server acked more stanzas than we have sent, disabling stream "
                        "management."))
            session.reset()
            return

        self.update_buffer(session, server_acked)
        self.check_acks(client)

    def on_ack_time_out(self, client):
        """Called when a requested ACK has not been received in time"""
        log.info(_("Ack was not received in time, aborting connection"))
        try:
            xmlstream = client.xmlstream
        except AttributeError:
            log.warning("xmlstream has already been terminated")
        else:
            transport = xmlstream.transport
            if transport is None:
                log.warning("transport was already removed")
            else:
                transport.abortConnection()
        client._xep_0198_session.req_timer = None
525 | |
526 | |
@implementer(iwokkel.IDisco)
class XEP_0198_handler(xmlstream.XMPPHandler):
    """XMPP stream handler wiring XEP-0198 elements to the plugin callbacks."""

    def __init__(self, plugin_parent):
        self.plugin_parent = plugin_parent
        self.host = plugin_parent.host

    def connectionInitialized(self):
        """Register observers for every stream management element."""
        parent = self.plugin_parent
        # (xpath matcher, callback) pairs, registered in this exact order
        for matcher, callback in (
            (SM_ENABLED, parent.on_enabled),
            (SM_RESUMED, parent.on_resumed),
            (SM_FAILED, parent.on_failed),
            (SM_R_REQUEST, parent.on_ack_request),
            (SM_A_REQUEST, parent.on_ack_answer),
        ):
            self.xmlstream.addObserver(matcher, callback, client=self.parent)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise stream management support."""
        return [disco.DiscoFeature(NS_SM)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """No disco items are exposed by this handler."""
        return []