comparison sat/plugins/plugin_xep_0198.py @ 3028:ab2696e34d29

Python 3 port: /!\ this is a huge commit /!\ starting from this commit, SàT is needs Python 3.6+ /!\ SàT maybe be instable or some feature may not work anymore, this will improve with time This patch port backend, bridge and frontends to Python 3. Roughly this has been done this way: - 2to3 tools has been applied (with python 3.7) - all references to python2 have been replaced with python3 (notably shebangs) - fixed files not handled by 2to3 (notably the shell script) - several manual fixes - fixed issues reported by Python 3 that where not handled in Python 2 - replaced "async" with "async_" when needed (it's a reserved word from Python 3.7) - replaced zope's "implements" with @implementer decorator - temporary hack to handle data pickled in database, as str or bytes may be returned, to be checked later - fixed hash comparison for password - removed some code which is not needed anymore with Python 3 - deactivated some code which needs to be checked (notably certificate validation) - tested with jp, fixed reported issues until some basic commands worked - ported Primitivus (after porting dependencies like urwid satext) - more manual fixes
author Goffi <goffi@goffi.org>
date Tue, 13 Aug 2019 19:08:41 +0200
parents af9d71303605
children 87b8808ac49d
comparison
equal deleted inserted replaced
3027:ff5bcb12ae60 3028:ab2696e34d29
1 #!/usr/bin/env python2 1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*- 2 # -*- coding: utf-8 -*-
3 3
4 # SàT plugin for managing raw XML log 4 # SàT plugin for managing raw XML log
5 # Copyright (C) 2011 Jérôme Poisson (goffi@goffi.org) 5 # Copyright (C) 2011 Jérôme Poisson (goffi@goffi.org)
6 6
26 from twisted.words.xish import domish 26 from twisted.words.xish import domish
27 from twisted.internet import defer 27 from twisted.internet import defer
28 from twisted.internet import task, reactor 28 from twisted.internet import task, reactor
29 from functools import partial 29 from functools import partial
30 from wokkel import disco, iwokkel 30 from wokkel import disco, iwokkel
31 from zope.interface import implements 31 from zope.interface import implementer
32 import collections 32 import collections
33 import time 33 import time
34 34
35 log = getLogger(__name__) 35 log = getLogger(__name__)
36 36
37 PLUGIN_INFO = { 37 PLUGIN_INFO = {
38 C.PI_NAME: u"Stream Management", 38 C.PI_NAME: "Stream Management",
39 C.PI_IMPORT_NAME: u"XEP-0198", 39 C.PI_IMPORT_NAME: "XEP-0198",
40 C.PI_TYPE: u"XEP", 40 C.PI_TYPE: "XEP",
41 C.PI_MODES: C.PLUG_MODE_BOTH, 41 C.PI_MODES: C.PLUG_MODE_BOTH,
42 C.PI_PROTOCOLS: [u"XEP-0198"], 42 C.PI_PROTOCOLS: ["XEP-0198"],
43 C.PI_DEPENDENCIES: [], 43 C.PI_DEPENDENCIES: [],
44 C.PI_RECOMMENDATIONS: [u"XEP-0045", u"XEP-0313"], 44 C.PI_RECOMMENDATIONS: ["XEP-0045", "XEP-0313"],
45 C.PI_MAIN: u"XEP_0198", 45 C.PI_MAIN: "XEP_0198",
46 C.PI_HANDLER: u"yes", 46 C.PI_HANDLER: "yes",
47 C.PI_DESCRIPTION: _(u"""Implementation of Stream Management"""), 47 C.PI_DESCRIPTION: _("""Implementation of Stream Management"""),
48 } 48 }
49 49
50 NS_SM = u"urn:xmpp:sm:3" 50 NS_SM = "urn:xmpp:sm:3"
51 SM_ENABLED = '/enabled[@xmlns="' + NS_SM + '"]' 51 SM_ENABLED = '/enabled[@xmlns="' + NS_SM + '"]'
52 SM_RESUMED = '/resumed[@xmlns="' + NS_SM + '"]' 52 SM_RESUMED = '/resumed[@xmlns="' + NS_SM + '"]'
53 SM_FAILED = '/failed[@xmlns="' + NS_SM + '"]' 53 SM_FAILED = '/failed[@xmlns="' + NS_SM + '"]'
54 SM_R_REQUEST = '/r[@xmlns="' + NS_SM + '"]' 54 SM_R_REQUEST = '/r[@xmlns="' + NS_SM + '"]'
55 SM_A_REQUEST = '/a[@xmlns="' + NS_SM + '"]' 55 SM_A_REQUEST = '/a[@xmlns="' + NS_SM + '"]'
92 @enabled.setter 92 @enabled.setter
93 def enabled(self, enabled): 93 def enabled(self, enabled):
94 if enabled: 94 if enabled:
95 if self._enabled: 95 if self._enabled:
96 raise exceptions.InternalError( 96 raise exceptions.InternalError(
97 u"Stream Management can't be enabled twice") 97 "Stream Management can't be enabled twice")
98 self._enabled = True 98 self._enabled = True
99 callback, kw = self.callback_data 99 callback, kw = self.callback_data
100 self.timer = task.LoopingCall(callback, **kw) 100 self.timer = task.LoopingCall(callback, **kw)
101 self.timer.start(MAX_DELAY_ACK_R, now=False) 101 self.timer.start(MAX_DELAY_ACK_R, now=False)
102 else: 102 else:
117 self.session_id = self.location = None 117 self.session_id = self.location = None
118 self.ack_requested = False 118 self.ack_requested = False
119 self.last_ack_r = 0 119 self.last_ack_r = 0
120 if self.req_timer is not None: 120 if self.req_timer is not None:
121 if self.req_timer.active(): 121 if self.req_timer.active():
122 log.error(u"req_timer has been called/cancelled but not reset") 122 log.error("req_timer has been called/cancelled but not reset")
123 else: 123 else:
124 self.req_timer.cancel() 124 self.req_timer.cancel()
125 self.req_timer = None 125 self.req_timer = None
126 126
127 def getBufferCopy(self): 127 def getBufferCopy(self):
132 # FIXME: location is not handled yet 132 # FIXME: location is not handled yet
133 133
134 def __init__(self, host): 134 def __init__(self, host):
135 log.info(_("Plugin Stream Management initialization")) 135 log.info(_("Plugin Stream Management initialization"))
136 self.host = host 136 self.host = host
137 host.registerNamespace(u'sm', NS_SM) 137 host.registerNamespace('sm', NS_SM)
138 host.trigger.add("stream_hooks", self.addHooks) 138 host.trigger.add("stream_hooks", self.addHooks)
139 host.trigger.add("xml_init", self._XMLInitTrigger) 139 host.trigger.add("xml_init", self._XMLInitTrigger)
140 host.trigger.add("disconnecting", self._disconnectingTrigger) 140 host.trigger.add("disconnecting", self._disconnectingTrigger)
141 host.trigger.add("disconnected", self._disconnectedTrigger) 141 host.trigger.add("disconnected", self._disconnectedTrigger)
142 try: 142 try:
143 self._ack_timeout = int(host.memory.getConfig("", "ack_timeout", ACK_TIMEOUT)) 143 self._ack_timeout = int(host.memory.getConfig("", "ack_timeout", ACK_TIMEOUT))
144 except ValueError: 144 except ValueError:
145 log.error(_(u"Invalid ack_timeout value, please check your configuration")) 145 log.error(_("Invalid ack_timeout value, please check your configuration"))
146 self._ack_timeout = ACK_TIMEOUT 146 self._ack_timeout = ACK_TIMEOUT
147 if not self._ack_timeout: 147 if not self._ack_timeout:
148 log.info(_(u"Ack timeout disabled")) 148 log.info(_("Ack timeout disabled"))
149 else: 149 else:
150 log.info(_(u"Ack timeout set to {timeout}s").format( 150 log.info(_("Ack timeout set to {timeout}s").format(
151 timeout=self._ack_timeout)) 151 timeout=self._ack_timeout))
152 152
153 def profileConnecting(self, client): 153 def profileConnecting(self, client):
154 client._xep_0198_session = ProfileSessionData(callback=self.checkAcks, 154 client._xep_0198_session = ProfileSessionData(callback=self.checkAcks,
155 client=client) 155 client=client)
163 send_hooks.append(partial(self.onSend, client=client)) 163 send_hooks.append(partial(self.onSend, client=client))
164 return True 164 return True
165 165
166 def _XMLInitTrigger(self, client): 166 def _XMLInitTrigger(self, client):
167 """Enable or resume a stream mangement""" 167 """Enable or resume a stream mangement"""
168 if not (NS_SM, u'sm') in client.xmlstream.features: 168 if not (NS_SM, 'sm') in client.xmlstream.features:
169 log.warning(_( 169 log.warning(_(
170 u"Your server doesn't support stream management ({namespace}), this is " 170 "Your server doesn't support stream management ({namespace}), this is "
171 u"used to improve connection problems detection (like network outages). " 171 "used to improve connection problems detection (like network outages). "
172 u"Please ask your server administrator to enable this feature.".format( 172 "Please ask your server administrator to enable this feature.".format(
173 namespace=NS_SM))) 173 namespace=NS_SM)))
174 return True 174 return True
175 session = client._xep_0198_session 175 session = client._xep_0198_session
176 176
177 # a disconnect timer from a previous disconnection may still be active 177 # a disconnect timer from a previous disconnection may still be active
185 del session.disconnect_timer 185 del session.disconnect_timer
186 186
187 if session.resume_enabled: 187 if session.resume_enabled:
188 # we are resuming a session 188 # we are resuming a session
189 resume_elt = domish.Element((NS_SM, 'resume')) 189 resume_elt = domish.Element((NS_SM, 'resume'))
190 resume_elt['h'] = unicode(session.in_counter) 190 resume_elt['h'] = str(session.in_counter)
191 resume_elt['previd'] = session.session_id 191 resume_elt['previd'] = session.session_id
192 client.send(resume_elt) 192 client.send(resume_elt)
193 session.resuming = True 193 session.resuming = True
194 # session.enabled will be set on <resumed/> reception 194 # session.enabled will be set on <resumed/> reception
195 return False 195 return False
196 else: 196 else:
197 # we start a new session 197 # we start a new session
198 assert session.out_counter == 0 198 assert session.out_counter == 0
199 enable_elt = domish.Element((NS_SM, 'enable')) 199 enable_elt = domish.Element((NS_SM, 'enable'))
200 enable_elt[u'resume'] = u'true' 200 enable_elt['resume'] = 'true'
201 client.send(enable_elt) 201 client.send(enable_elt)
202 session.enabled = True 202 session.enabled = True
203 return True 203 return True
204 204
205 def _disconnectingTrigger(self, client): 205 def _disconnectingTrigger(self, client):
244 def updateBuffer(self, session, server_acked): 244 def updateBuffer(self, session, server_acked):
245 """Update buffer and buffer_index""" 245 """Update buffer and buffer_index"""
246 if server_acked > session.buffer_idx: 246 if server_acked > session.buffer_idx:
247 diff = server_acked - session.buffer_idx 247 diff = server_acked - session.buffer_idx
248 try: 248 try:
249 for i in xrange(diff): 249 for i in range(diff):
250 session.buffer.pop() 250 session.buffer.pop()
251 except IndexError: 251 except IndexError:
252 log.error( 252 log.error(
253 u"error while cleaning buffer, invalid index (buffer is empty):\n" 253 "error while cleaning buffer, invalid index (buffer is empty):\n"
254 u"diff = {diff}\n" 254 "diff = {diff}\n"
255 u"server_acked = {server_acked}\n" 255 "server_acked = {server_acked}\n"
256 u"buffer_idx = {buffer_id}".format( 256 "buffer_idx = {buffer_id}".format(
257 diff=diff, server_acked=server_acked, 257 diff=diff, server_acked=server_acked,
258 buffer_id=session.buffer_idx)) 258 buffer_id=session.buffer_idx))
259 session.buffer_idx += diff 259 session.buffer_idx += diff
260 260
261 def replayBuffer(self, client, buffer_, discard_results=False): 261 def replayBuffer(self, client, buffer_, discard_results=False):
270 stanza = buffer_.pop() 270 stanza = buffer_.pop()
271 except IndexError: 271 except IndexError:
272 break 272 break
273 else: 273 else:
274 if ((discard_results 274 if ((discard_results
275 and stanza.name == u'iq' 275 and stanza.name == 'iq'
276 and stanza.getAttribute(u'type') == 'result')): 276 and stanza.getAttribute('type') == 'result')):
277 continue 277 continue
278 client.send(stanza) 278 client.send(stanza)
279 279
280 def sendAck(self, client): 280 def sendAck(self, client):
281 """Send an answer element with current IN counter""" 281 """Send an answer element with current IN counter"""
282 a_elt = domish.Element((NS_SM, 'a')) 282 a_elt = domish.Element((NS_SM, 'a'))
283 a_elt['h'] = unicode(client._xep_0198_session.in_counter) 283 a_elt['h'] = str(client._xep_0198_session.in_counter)
284 client.send(a_elt) 284 client.send(a_elt)
285 285
286 def requestAck(self, client): 286 def requestAck(self, client):
287 """Send a request element""" 287 """Send a request element"""
288 session = client._xep_0198_session 288 session = client._xep_0198_session
296 296
297 def _connectionFailed(self, failure_, connector): 297 def _connectionFailed(self, failure_, connector):
298 normal_host, normal_port = connector.normal_location 298 normal_host, normal_port = connector.normal_location
299 del connector.normal_location 299 del connector.normal_location
300 log.warning(_( 300 log.warning(_(
301 u"Connection failed using location given by server (host: {host}, port: " 301 "Connection failed using location given by server (host: {host}, port: "
302 u"{port}), switching to normal host and port (host: {normal_host}, port: " 302 "{port}), switching to normal host and port (host: {normal_host}, port: "
303 u"{normal_port})".format(host=connector.host, port=connector.port, 303 "{normal_port})".format(host=connector.host, port=connector.port,
304 normal_host=normal_host, normal_port=normal_port))) 304 normal_host=normal_host, normal_port=normal_port)))
305 connector.host, connector.port = normal_host, normal_port 305 connector.host, connector.port = normal_host, normal_port
306 connector.connectionFailed = connector.connectionFailed_ori 306 connector.connectionFailed = connector.connectionFailed_ori
307 del connector.connectionFailed_ori 307 del connector.connectionFailed_ori
308 return connector.connectionFailed(failure_) 308 return connector.connectionFailed(failure_)
310 def onEnabled(self, enabled_elt, client): 310 def onEnabled(self, enabled_elt, client):
311 session = client._xep_0198_session 311 session = client._xep_0198_session
312 session.in_counter = 0 312 session.in_counter = 0
313 313
314 # we check that resuming is possible and that we have a session id 314 # we check that resuming is possible and that we have a session id
315 resume = C.bool(enabled_elt.getAttribute(u'resume')) 315 resume = C.bool(enabled_elt.getAttribute('resume'))
316 session_id = enabled_elt.getAttribute(u'id') 316 session_id = enabled_elt.getAttribute('id')
317 if not session_id: 317 if not session_id:
318 log.warning(_(u'Incorrect <enabled/> element received, no "id" attribute')) 318 log.warning(_('Incorrect <enabled/> element received, no "id" attribute'))
319 if not resume or not session_id: 319 if not resume or not session_id:
320 log.warning(_( 320 log.warning(_(
321 u"You're server doesn't support session resuming with stream management, " 321 "You're server doesn't support session resuming with stream management, "
322 u"please contact your server administrator to enable it")) 322 "please contact your server administrator to enable it"))
323 return 323 return
324 324
325 session.session_id = session_id 325 session.session_id = session_id
326 326
327 # XXX: we disable resource binding, which must not be done 327 # XXX: we disable resource binding, which must not be done
328 # when we resume the session. 328 # when we resume the session.
329 client.factory.authenticator.res_binding = False 329 client.factory.authenticator.res_binding = False
330 330
331 # location, in case server want resuming session to be elsewhere 331 # location, in case server want resuming session to be elsewhere
332 try: 332 try:
333 location = enabled_elt[u'location'] 333 location = enabled_elt['location']
334 except KeyError: 334 except KeyError:
335 pass 335 pass
336 else: 336 else:
337 # TODO: handle IPv6 here (in brackets, cf. XEP) 337 # TODO: handle IPv6 here (in brackets, cf. XEP)
338 try: 338 try:
339 domain, port = location.split(':', 1) 339 domain, port = location.split(':', 1)
340 port = int(port) 340 port = int(port)
341 except ValueError: 341 except ValueError:
342 log.warning(_(u"Invalid location received: {location}") 342 log.warning(_("Invalid location received: {location}")
343 .format(location=location)) 343 .format(location=location))
344 else: 344 else:
345 session.location = (domain, port) 345 session.location = (domain, port)
346 # we monkey patch connector to use the new location 346 # we monkey patch connector to use the new location
347 connector = client.xmlstream.transport.connector 347 connector = client.xmlstream.transport.connector
352 connector.connectionFailed = partial(self._connectionFailed, 352 connector.connectionFailed = partial(self._connectionFailed,
353 connector=connector) 353 connector=connector)
354 354
355 # resuming time 355 # resuming time
356 try: 356 try:
357 max_s = int(enabled_elt[u'max']) 357 max_s = int(enabled_elt['max'])
358 except (ValueError, KeyError) as e: 358 except (ValueError, KeyError) as e:
359 if isinstance(e, ValueError): 359 if isinstance(e, ValueError):
360 log.warning(_(u'Invalid "max" attribute')) 360 log.warning(_('Invalid "max" attribute'))
361 max_s = RESUME_MAX 361 max_s = RESUME_MAX
362 log.info(_(u"Using default session max value ({max_s} s).".format( 362 log.info(_("Using default session max value ({max_s} s).".format(
363 max_s=max_s))) 363 max_s=max_s)))
364 log.info(_(u"Stream Management enabled")) 364 log.info(_("Stream Management enabled"))
365 else: 365 else:
366 log.info(_( 366 log.info(_(
367 u"Stream Management enabled, with a resumption time of {res_m} min" 367 "Stream Management enabled, with a resumption time of {res_m} min"
368 .format(res_m = max_s/60))) 368 .format(res_m = max_s/60)))
369 session.session_max = max_s 369 session.session_max = max_s
370 370
371 def onResumed(self, enabled_elt, client): 371 def onResumed(self, enabled_elt, client):
372 session = client._xep_0198_session 372 session = client._xep_0198_session
378 # we resend all stanza which have not been received properly 378 # we resend all stanza which have not been received properly
379 self.replayBuffer(client, session.buffer) 379 self.replayBuffer(client, session.buffer)
380 # now we can continue the session 380 # now we can continue the session
381 session.enabled = True 381 session.enabled = True
382 d_time = time.time() - session.disconnected_time 382 d_time = time.time() - session.disconnected_time
383 log.info(_(u"Stream session resumed (disconnected for {d_time} s, {count} " 383 log.info(_("Stream session resumed (disconnected for {d_time} s, {count} "
384 u"stanza(s) resent)").format(d_time=int(d_time), count=resend_count)) 384 "stanza(s) resent)").format(d_time=int(d_time), count=resend_count))
385 385
386 def onFailed(self, failed_elt, client): 386 def onFailed(self, failed_elt, client):
387 session = client._xep_0198_session 387 session = client._xep_0198_session
388 condition_elt = failed_elt.firstChildElement() 388 condition_elt = failed_elt.firstChildElement()
389 buffer_ = session.getBufferCopy() 389 buffer_ = session.getBufferCopy()
391 391
392 try: 392 try:
393 del session.resuming 393 del session.resuming
394 except AttributeError: 394 except AttributeError:
395 # stream management can't be started at all 395 # stream management can't be started at all
396 msg = _(u"Can't use stream management") 396 msg = _("Can't use stream management")
397 if condition_elt is None: 397 if condition_elt is None:
398 log.error(msg + u'.') 398 log.error(msg + '.')
399 else: 399 else:
400 log.error(_(u"{msg}: {reason}").format( 400 log.error(_("{msg}: {reason}").format(
401 msg=msg, reason=condition_elt.name)) 401 msg=msg, reason=condition_elt.name))
402 else: 402 else:
403 # only stream resumption failed, we can try full session init 403 # only stream resumption failed, we can try full session init
404 # XXX: we try to start full session init from this point, with many 404 # XXX: we try to start full session init from this point, with many
405 # variables/attributes already initialised with a potentially different 405 # variables/attributes already initialised with a potentially different
406 # jid. This is experimental and may not be safe. It may be more 406 # jid. This is experimental and may not be safe. It may be more
407 # secured to abord the connection and restart everything with a fresh 407 # secured to abord the connection and restart everything with a fresh
408 # client. 408 # client.
409 msg = _(u"stream resumption not possible, restarting full session") 409 msg = _("stream resumption not possible, restarting full session")
410 410
411 if condition_elt is None: 411 if condition_elt is None:
412 log.warning(u'{msg}.'.format(msg=msg)) 412 log.warning('{msg}.'.format(msg=msg))
413 else: 413 else:
414 log.warning(u"{msg}: {reason}".format( 414 log.warning("{msg}: {reason}".format(
415 msg=msg, reason=condition_elt.name)) 415 msg=msg, reason=condition_elt.name))
416 # stream resumption failed, but we still can do normal stream management 416 # stream resumption failed, but we still can do normal stream management
417 # we restore attributes as if the session was new, and init stream 417 # we restore attributes as if the session was new, and init stream
418 # we keep everything initialized, and only do binding, roster request 418 # we keep everything initialized, and only do binding, roster request
419 # and initial presence sending. 419 # and initial presence sending.
420 if client.conn_deferred.called: 420 if client.conn_deferred.called:
421 client.conn_deferred = defer.Deferred() 421 client.conn_deferred = defer.Deferred()
422 else: 422 else:
423 log.error(u"conn_deferred should be called at this point") 423 log.error("conn_deferred should be called at this point")
424 plg_0045 = self.host.plugins.get(u'XEP-0045') 424 plg_0045 = self.host.plugins.get('XEP-0045')
425 plg_0313 = self.host.plugins.get(u'XEP-0313') 425 plg_0313 = self.host.plugins.get('XEP-0313')
426 426
427 # FIXME: we should call all loaded plugins with generic callbacks 427 # FIXME: we should call all loaded plugins with generic callbacks
428 # (e.g. prepareResume and resume), so a hot resuming can be done 428 # (e.g. prepareResume and resume), so a hot resuming can be done
429 # properly for all plugins. 429 # properly for all plugins.
430 430
491 session.req_timer.cancel() 491 session.req_timer.cancel()
492 session.req_timer = None 492 session.req_timer = None
493 try: 493 try:
494 server_acked = int(a_elt['h']) 494 server_acked = int(a_elt['h'])
495 except ValueError: 495 except ValueError:
496 log.warning(_(u"Server returned invalid ack element, disabling stream " 496 log.warning(_("Server returned invalid ack element, disabling stream "
497 u"management: {xml}").format(xml=a_elt)) 497 "management: {xml}").format(xml=a_elt))
498 session.enabled = False 498 session.enabled = False
499 return 499 return
500 500
501 if server_acked > session.out_counter: 501 if server_acked > session.out_counter:
502 log.error(_(u"Server acked more stanzas than we have sent, disabling stream " 502 log.error(_("Server acked more stanzas than we have sent, disabling stream "
503 u"management.")) 503 "management."))
504 session.reset() 504 session.reset()
505 return 505 return
506 506
507 self.updateBuffer(session, server_acked) 507 self.updateBuffer(session, server_acked)
508 self.checkAcks(client) 508 self.checkAcks(client)
509 509
510 def onAckTimeOut(self, client): 510 def onAckTimeOut(self, client):
511 """Called when a requested ACK has not been received in time""" 511 """Called when a requested ACK has not been received in time"""
512 log.info(_(u"Ack was not received in time, aborting connection")) 512 log.info(_("Ack was not received in time, aborting connection"))
513 transport = client.xmlstream.transport 513 transport = client.xmlstream.transport
514 if transport is None: 514 if transport is None:
515 log.warning(u"transport was already removed") 515 log.warning("transport was already removed")
516 else: 516 else:
517 transport.abortConnection() 517 transport.abortConnection()
518 client._xep_0198_session.req_timer = None 518 client._xep_0198_session.req_timer = None
519 519
520 520
521 @implementer(iwokkel.IDisco)
521 class XEP_0198_handler(xmlstream.XMPPHandler): 522 class XEP_0198_handler(xmlstream.XMPPHandler):
522 implements(iwokkel.IDisco)
523 523
524 def __init__(self, plugin_parent): 524 def __init__(self, plugin_parent):
525 self.plugin_parent = plugin_parent 525 self.plugin_parent = plugin_parent
526 self.host = plugin_parent.host 526 self.host = plugin_parent.host
527 527