# HG changeset patch # User Goffi # Date 1542552586 -3600 # Node ID 1ecceac3df96aa75484cb826e2dfc4c33fac4dc1 # Parent 56bfe1b792040cc8d20af506aa8e8470f591b1c8 plugin XEP-0198: Stream Management implementation: - hooks can now be set in stream onElement and send methods - xmllog refactored to use new hooks - client.isConnected now uses transport.connected method - fixed reconnection, SàT will now try to reconnect indefinitely until it success, unresolvable failure happen (e.g. invalid certificate), or explicit disconnection is requested (or a plugin change this behaviour) - new triggers: "stream_hooks", "disconnecting", "disconnected", and "xml_init" (replace "XML Initialized") diff -r 56bfe1b79204 -r 1ecceac3df96 CHANGELOG --- a/CHANGELOG Sat Nov 10 10:16:38 2018 +0100 +++ b/CHANGELOG Sun Nov 18 15:49:46 2018 +0100 @@ -5,6 +5,7 @@ This is also the first "general audience" version. - XEP-0070 implementation (HTTP Auth via XMPP) - XEP-0184 implementation (Delivery Receipts) + - XEP-0198 implementation (Stream Management) - XEP-0199 implementation (XMPP Ping) - XEP-0231 implementation (Bits of Binary) - XEP-0264 implementation (Thumbnails) diff -r 56bfe1b79204 -r 1ecceac3df96 sat/core/constants.py --- a/sat/core/constants.py Sat Nov 10 10:16:38 2018 +0100 +++ b/sat/core/constants.py Sun Nov 18 15:49:46 2018 +0100 @@ -50,7 +50,7 @@ ## Protocol ## XMPP_C2S_PORT = 5222 XMPP_KEEP_ALIFE = 180 - XMPP_MAX_RETRIES = 2 + XMPP_MAX_RETRIES = None # default port used on Prosody, may differ on other servers XMPP_COMPONENT_PORT = 5347 @@ -403,6 +403,11 @@ NO_LIMIT = -1 # used in bridge when a integer value is expected DEFAULT_MAX_AGE = 1209600 # default max age of cached files, in seconds HASH_SHA1_EMPTY = "da39a3ee5e6b4b0d3255bfef95601890afd80709" + STANZA_NAMES = (u"iq", u"message", u"presence") + + # Stream Hooks + STREAM_HOOK_SEND = u"send" + STREAM_HOOK_RECEIVE = u"receive" @classmethod def LOG_OPTIONS(cls): diff -r 56bfe1b79204 -r 1ecceac3df96 sat/core/patches.py --- a/sat/core/patches.py Sat Nov 10 10:16:38 2018 +0100 +++ b/sat/core/patches.py Sun Nov 18 15:49:46 2018 +0100 @@ -1,10 +1,19 @@ -from twisted.words.protocols.jabber import xmlstream +from twisted.words.protocols.jabber import xmlstream, sasl, client as tclient from twisted.internet import ssl from wokkel import client +from sat.core.constants import Const as C +from sat.core.log import getLogger -"""This module apply monkey patches to Twisted and Wokkel to handle certificate validation - during XMPP connection""" +log = getLogger(__name__) +"""This module apply monkey patches to Twisted and Wokkel + First part handle certificate validation during XMPP connectionand are temporary + (until merged upstream). + Second part add a trigger point to send and onElement method of XmlStream + """ + + +## certificate validation patches class TLSInitiatingInitializer(xmlstream.TLSInitiatingInitializer): check_certificate = True @@ -21,13 +30,15 @@ class XMPPClient(client.XMPPClient): - def __init__(self, jid, password, host=None, port=5222, check_certificate=True): + def __init__(self, jid, password, host=None, port=5222, + check_certificate=True): self.jid = jid self.domain = jid.host.encode('idna') self.host = host self.port = port - factory = HybridClientFactory(jid, password, check_certificate) + factory = HybridClientFactory( + jid, password, check_certificate=check_certificate) client.StreamManager.__init__(self, factory) @@ -39,6 +50,7 @@ class HybridAuthenticator(client.HybridAuthenticator): + res_binding = True def __init__(self, jid, password, check_certificate): xmlstream.ConnectAuthenticator.__init__(self, jid.host) @@ -53,11 +65,83 @@ tlsInit.check_certificate = self.check_certificate xs.initializers = [client.client.CheckVersionInitializer(xs), tlsInit, - client.CheckAuthInitializer(xs)] + CheckAuthInitializer(xs, self.res_binding)] + + +# XmlStream triggers + + +class XmlStream(xmlstream.XmlStream): + """XmlStream which allows to add hooks""" + + def __init__(self, authenticator): + xmlstream.XmlStream.__init__(self, authenticator) + # hooks at this level should not modify content + # so it's not needed to handle priority as with triggers + self._onElementHooks = [] + self._sendHooks = [] + + def addHook(self, hook_type, callback): + """Add a send or receive hook""" + conflict_msg = (u"Hook conflict: can't add {hook_type} hook {callback}" + .format(hook_type=hook_type, callback=callback)) + if hook_type == C.STREAM_HOOK_RECEIVE: + if callback not in self._onElementHooks: + self._onElementHooks.append(callback) + else: + log.warning(conflict_msg) + elif hook_type == C.STREAM_HOOK_SEND: + if callback not in self._sendHooks: + self._sendHooks.append(callback) + else: + log.warning(conflict_msg) + else: + raise ValueError(u"Invalid hook type: {hook_type}" + .format(hook_type=hook_type)) + + def onElement(self, element): + for hook in self._onElementHooks: + hook(element) + xmlstream.XmlStream.onElement(self, element) + + def send(self, obj): + for hook in self._sendHooks: + hook(obj) + xmlstream.XmlStream.send(self, obj) + + +# Binding activation (needed for stream management, XEP-0198) + + +class CheckAuthInitializer(client.CheckAuthInitializer): + + def __init__(self, xs, res_binding): + super(CheckAuthInitializer, self).__init__(xs) + self.res_binding = res_binding + + def initialize(self): + # XXX: modification of client.CheckAuthInitializer which has optional + # resource binding, and which doesn't do deprecated + # SessionInitializer + if (sasl.NS_XMPP_SASL, 'mechanisms') in self.xmlstream.features: + inits = [(sasl.SASLInitiatingInitializer, True)] + if self.res_binding: + inits.append((tclient.BindInitializer, True)), + + for initClass, required in inits: + init = initClass(self.xmlstream) + init.required = required + self.xmlstream.initializers.append(init) + elif (tclient.NS_IQ_AUTH_FEATURE, 'auth') in self.xmlstream.features: + self.xmlstream.initializers.append( + tclient.IQAuthInitializer(self.xmlstream)) + else: + raise Exception("No available authentication method found") def apply(): + # certificate validation xmlstream.TLSInitiatingInitializer = TLSInitiatingInitializer client.XMPPClient = XMPPClient - client.HybridClientFactory = HybridClientFactory - client.HybridAuthenticator = HybridAuthenticator + # XmlStream triggers + xmlstream.XmlStreamFactory.protocol = XmlStream diff -r 56bfe1b79204 -r 1ecceac3df96 sat/core/sat_main.py --- a/sat/core/sat_main.py Sat Nov 10 10:16:38 2018 +0100 +++ b/sat/core/sat_main.py Sun Nov 18 15:49:46 2018 +0100 @@ -46,6 +46,7 @@ import os.path import uuid + try: from collections import OrderedDict # only available from python 2.7 except ImportError: diff -r 56bfe1b79204 -r 1ecceac3df96 sat/core/xmpp.py --- a/sat/core/xmpp.py Sat Nov 10 10:16:38 2018 +0100 +++ b/sat/core/xmpp.py Sun Nov 18 15:49:46 2018 +0100 @@ -49,20 +49,20 @@ class SatXMPPEntity(object): """Common code for Client and Component""" - _reason = None # reason of disconnection def __init__(self, host_app, profile, max_retries): - - self.factory.clientConnectionLost = self.connectionLost - self.factory.maxRetries = max_retries - # when self._connected is None, we are not connected + factory = self.factory + factory.maxRetries = max_retries + factory.maxDelay = 30 + # when self._connected_d is None, we are not connected # else, it's a deferred which fire on disconnection - self._connected = None + self._connected_d = None self.profile = profile self.host_app = host_app self.cache = cache.Cache(host_app, profile) self._mess_id_uid = {} # map from message id to uid used in history. # Key: (full_jid,message_id) Value: uid + # this Deferred fire when entity is connected self.conn_deferred = defer.Deferred() self._progress_cb = {} # callback called when a progress is requested # (key = progress id) @@ -145,15 +145,12 @@ "Password", "Connection", profile_key=profile ) entity = host.profiles[profile] = cls( - host, - profile, - jid.JID(host.memory.getParamA("JabberID", "Connection", profile_key=profile)), - password, - host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection", profile_key=profile) - or None, - port, - max_retries, - ) + host, profile, jid.JID(host.memory.getParamA("JabberID", "Connection", + profile_key=profile)), password, + host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection", + profile_key=profile) or None, + port, max_retries, + ) entity._createSubProtocols() @@ -172,7 +169,7 @@ entity.startService() - yield entity.getConnectionDeferred() + yield entity.conn_deferred yield defer.maybeDeferred(entity.entityConnected) @@ -205,40 +202,16 @@ # we finally send our presence entity.presence.available() - def getConnectionDeferred(self): - """Return a deferred which fire when the client is connected""" - return self.conn_deferred - def _disconnectionCb(self, __): - self._connected = None + self._connected_d = None def _disconnectionEb(self, failure_): log.error(_(u"Error while disconnecting: {}".format(failure_))) def _authd(self, xmlstream): - if not self.host_app.trigger.point("XML Initialized", xmlstream, self.profile): - return super(SatXMPPEntity, self)._authd(xmlstream) - - if self._reason is not None: - # if we have had trouble to connect we can reset - # the exception as the connection is now working. - del self._reason - - # the following Deferred is used to know when we are connected - # so we need to be set it to None when connection is lost - self._connected = defer.Deferred() - self._connected.addCallback(self._cleanConnection) - self._connected.addCallback(self._disconnectionCb) - self._connected.addErrback(self._disconnectionEb) - - log.info( - _(u"********** [{profile}] CONNECTED **********").format(profile=self.profile) - ) + log.debug(_(u"{profile} identified").format(profile=self.profile)) self.streamInitialized() - self.host_app.bridge.connected( - self.profile, unicode(self.jid) - ) # we send the signal to the clients def _finish_connection(self, __): self.conn_deferred.callback(None) @@ -246,11 +219,32 @@ def streamInitialized(self): """Called after _authd""" log.debug(_(u"XML stream is initialized")) + if not self.host_app.trigger.point("xml_init", self): + return + self.postStreamInit() + + def postStreamInit(self): + """Workflow after stream initalisation.""" + log.info( + _(u"********** [{profile}] CONNECTED **********").format(profile=self.profile) + ) + + # the following Deferred is used to know when we are connected + # so we need to be set it to None when connection is lost + self._connected_d = defer.Deferred() + self._connected_d.addCallback(self._cleanConnection) + self._connected_d.addCallback(self._disconnectionCb) + self._connected_d.addErrback(self._disconnectionEb) + + self.host_app.bridge.connected( + self.profile, unicode(self.jid) + ) # we send the signal to the clients + + self.keep_alife = task.LoopingCall( self.xmlstream.send, " " ) # Needed to avoid disconnection (specially with openfire) self.keep_alife.start(C.XMPP_KEEP_ALIFE) - self.disco = SatDiscoProtocol(self) self.disco.setHandlerParent(self) self.discoHandler = disco.DiscoHandler() @@ -278,21 +272,27 @@ ## connection ## - def _disconnected(self, reason): - # we have to save the reason of disconnection, otherwise it would be lost - self._reason = reason - super(SatXMPPEntity, self)._disconnected(reason) + def _connected(self, xs): + send_hooks = [] + receive_hooks = [] + self.host_app.trigger.point( + "stream_hooks", self, receive_hooks, send_hooks) + for hook in receive_hooks: + xs.addHook(C.STREAM_HOOK_RECEIVE, hook) + for hook in send_hooks: + xs.addHook(C.STREAM_HOOK_SEND, hook) + super(SatXMPPEntity, self)._connected(xs) - def connectionLost(self, connector, reason): + def disconnectProfile(self, reason): try: self.keep_alife.stop() except AttributeError: log.debug(_("No keep_alife")) - if self._connected is not None: + if self._connected_d is not None: self.host_app.bridge.disconnected( self.profile ) # we send the signal to the clients - self._connected.callback(None) + self._connected_d.callback(None) self.host_app.purgeEntity( self.profile ) # and we remove references to this client @@ -302,12 +302,10 @@ ) ) if not self.conn_deferred.called: - # FIXME: real error is not gotten here (e.g. if jid is not know by Prosody, - # we should have the real error) - if self._reason is None: + if reason is None: err = error.StreamError(u"Server unexpectedly closed the connection") else: - err = self._reason + err = reason try: if err.value.args[0][0][2] == "certificate verify failed": err = exceptions.InvalidCertificate( @@ -316,10 +314,17 @@ u"This should never happen and may indicate that " u"somebody is trying to spy on you.\n" u"Please contact your server administrator.")) + self.factory.continueTrying = 0 except (IndexError, TypeError): pass self.conn_deferred.errback(err) + def _disconnected(self, reason): + super(SatXMPPEntity, self)._disconnected(reason) + if not self.host_app.trigger.point("disconnected", self, reason): + return + self.disconnectProfile(reason) + @defer.inlineCallbacks def _cleanConnection(self, __): """method called on disconnection @@ -333,13 +338,18 @@ yield disconnected_cb(self) def isConnected(self): - return self._connected is not None + try: + return bool(self.xmlstream.transport.connected) + except AttributeError: + return False def entityDisconnect(self): + if not self.host_app.trigger.point("disconnecting", self): + return log.info(_(u"Disconnecting...")) self.stopService() - if self._connected is not None: - return self._connected + if self._connected_d is not None: + return self._connected_d else: return defer.succeed(None) @@ -735,16 +745,8 @@ ) # XXX: set to True from entry plugin to keep messages in history for received # messages - def __init__( - self, - host_app, - profile, - component_jid, - password, - host=None, - port=None, - max_retries=C.XMPP_MAX_RETRIES, - ): + def __init__(self, host_app, profile, component_jid, password, host=None, port=None, + max_retries=C.XMPP_MAX_RETRIES): self.started = time.time() if port is None: port = C.XMPP_COMPONENT_PORT diff -r 56bfe1b79204 -r 1ecceac3df96 sat/plugins/plugin_misc_xmllog.py --- a/sat/plugins/plugin_misc_xmllog.py Sat Nov 10 10:16:38 2018 +0100 +++ b/sat/plugins/plugin_misc_xmllog.py Sun Nov 18 15:49:46 2018 +0100 @@ -23,7 +23,7 @@ log = getLogger(__name__) from twisted.words.xish import domish -from twisted.words.xish import xmlstream +from functools import partial PLUGIN_INFO = { C.PI_NAME: "Raw XML log Plugin", @@ -36,26 +36,6 @@ C.PI_DESCRIPTION: _(u"""Send raw XML logs to bridge"""), } -host = None - - -def send(self, obj): - global host - if isinstance(obj, basestring): - log = unicode(obj) - elif isinstance(obj, domish.Element): - log = obj.toXml() - else: - log.error(_(u"INTERNAL ERROR: Unmanaged XML type")) - host.bridge.xmlLog("OUT", log, self._profile) - return self._original_send(obj) - - -def onElement(self, element): - global host - host.bridge.xmlLog("IN", element.toXml(), self._profile) - return self._original_onElement(element) - class XmlLog(object): @@ -71,30 +51,32 @@ "label_xmllog": _("Activate XML log") } - def __init__(self, host_): + def __init__(self, host): log.info(_("Plugin XML Log initialization")) - global host - host = host_ - - # parameters + self.host = host host.memory.updateParams(self.params) - - # bridge host.bridge.addSignal( "xmlLog", ".plugin", signature="sss" ) # args: direction("IN" or "OUT"), xml_data, profile - self.do_log = host.memory.getParamA("Xml log", "Debug") + host.trigger.add("stream_hooks", self.addHooks) + + def addHooks(self, client, receive_hooks, send_hooks): + self.do_log = self.host.memory.getParamA("Xml log", "Debug") if self.do_log: - XmlStream = xmlstream.XmlStream - XmlStream._original_send = XmlStream.send - XmlStream._original_onElement = XmlStream.onElement - XmlStream.send = send - XmlStream.onElement = onElement - XmlStream._profile = "" - host.trigger.add("XML Initialized", self.setProfile) + receive_hooks.append(partial(self.onReceive, client=client)) + send_hooks.append(partial(self.onSend, client=client)) log.info(_(u"XML log activated")) + return True - def setProfile(self, xmlstream, profile): - xmlstream._profile = profile - return True + def onReceive(self, element, client): + self.host.bridge.xmlLog("IN", element.toXml(), client.profile) + + def onSend(self, obj, client): + if isinstance(obj, basestring): + log = unicode(obj) + elif isinstance(obj, domish.Element): + log = obj.toXml() + else: + log.error(_(u"INTERNAL ERROR: Unmanaged XML type")) + self.host.bridge.xmlLog("OUT", log, client.profile) diff -r 56bfe1b79204 -r 1ecceac3df96 sat/plugins/plugin_xep_0198.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0198.py Sun Nov 18 15:49:46 2018 +0100 @@ -0,0 +1,453 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# SàT plugin for managing raw XML log +# Copyright (C) 2011 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from sat.core.i18n import _ +from sat.core.constants import Const as C +from sat.core import exceptions +from sat.core.log import getLogger +from twisted.words.protocols.jabber import client as jabber_client +from twisted.words.protocols.jabber import xmlstream +from twisted.words.xish import domish +from twisted.internet import defer +from twisted.internet import task, reactor +from functools import partial +from wokkel import disco, iwokkel +from zope.interface import implements +import collections +import time + +log = getLogger(__name__) + +PLUGIN_INFO = { + C.PI_NAME: "Stream Management", + C.PI_IMPORT_NAME: "XEP-0198", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0198"], + C.PI_DEPENDENCIES: [], + C.PI_MAIN: "XEP_0198", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _(u"""Implementation of Stream Management"""), +} + +NS_SM = u"urn:xmpp:sm:3" +SM_ENABLED = '/enabled[@xmlns="' + NS_SM + '"]' +SM_RESUMED = '/resumed[@xmlns="' + NS_SM + '"]' +SM_FAILED = '/failed[@xmlns="' + NS_SM + '"]' +SM_R_REQUEST = '/r[@xmlns="' + NS_SM + '"]' +SM_A_REQUEST = '/a[@xmlns="' + NS_SM + '"]' +SM_H_REQUEST = '/h[@xmlns="' + NS_SM + '"]' +# Max number of stanza to send before requesting ack +MAX_STANZA_ACK_R = 5 +# Max number of seconds before requesting ack +MAX_DELAY_ACK_R = 30 +MAX_COUNTER = 2**32 +RESUME_MAX = 5*60 + + +class ProfileSessionData(object): + out_counter = 0 + in_counter = 0 + session_id = None + location = None + session_max = None + # True when an ack answer is expected + ack_requested = False + last_ack_r = 0 + disconnected_time = None + + def __init__(self, callback, **kw): + self.buffer = collections.deque() + self.buffer_idx = 0 + self._enabled = False + self.timer = None + self.callback_data = (callback, kw) + + @property + def enabled(self): + return self._enabled + + @enabled.setter + def enabled(self, enabled): + if enabled: + if self._enabled: + raise exceptions.InternalError( + u"Stream Management can't be enabled twice") + self._enabled = True + callback, kw = self.callback_data + self.timer = task.LoopingCall(callback, **kw) + self.timer.start(MAX_DELAY_ACK_R, now=False) + else: + self._enabled = False + if self.timer is not None: + self.timer.stop() + self.timer = None + + @property + def resume_enabled(self): + return self.session_id is not None + + def reset(self): + self.enabled = False + self.buffer.clear() + self.buffer_idx = 0 + self.in_counter = self.out_counter = 0 + self.session_id = self.location = None + self.ack_requested = False + self.last_ack_r = 0 + + +class XEP_0198(object): + # FIXME: location is not handled yet + + def __init__(self, host): + log.info(_("Plugin Stream Management initialization")) + self.host = host + host.registerNamespace(u'sm', NS_SM) + host.trigger.add("stream_hooks", self.addHooks) + host.trigger.add("xml_init", self._XMLInitTrigger) + host.trigger.add("disconnecting", self._disconnectingTrigger) + host.trigger.add("disconnected", self._disconnectedTrigger) + + def profileConnecting(self, client): + client._xep_0198_session = ProfileSessionData(callback=self.checkAcks, + client=client) + + def getHandler(self, client): + return XEP_0198_handler(self) + + def addHooks(self, client, receive_hooks, send_hooks): + """Add hooks to handle in/out stanzas counters""" + receive_hooks.append(partial(self.onReceive, client=client)) + send_hooks.append(partial(self.onSend, client=client)) + return True + + def _XMLInitTrigger(self, client): + """Enable or resume a stream mangement""" + if not (NS_SM, u'sm') in client.xmlstream.features: + log.warning(_( + u"Your server doesn't support stream management ({namespace}), this is " + u"used to improve connection problems detection (like network outages). " + u"Please ask your server administrator to enable this feature.".format( + namespace=NS_SM))) + return True + session = client._xep_0198_session + + # a disconnect timer from a previous disconnection may still be active + try: + disconnect_timer = session.disconnect_timer + except AttributeError: + pass + else: + if disconnect_timer.active(): + disconnect_timer.cancel() + del session.disconnect_timer + + if session.resume_enabled: + # we are resuming a session + resume_elt = domish.Element((NS_SM, 'resume')) + resume_elt['h'] = unicode(session.in_counter) + resume_elt['previd'] = session.session_id + client.send(resume_elt) + session.resuming = True + # session.enabled will be set on reception + return False + else: + # we start a new session + assert session.out_counter == 0 + enable_elt = domish.Element((NS_SM, 'enable')) + enable_elt[u'resume'] = u'true' + client.send(enable_elt) + session.enabled = True + return True + + def _disconnectingTrigger(self, client): + session = client._xep_0198_session + if session.enabled: + self.sendAck(client) + # This is a requested disconnection, so we can reset the session + # to disable resuming and close normally the stream + session.reset() + return True + + def _disconnectedTrigger(self, client, reason): + session = client._xep_0198_session + session.enabled = False + if session.resume_enabled: + session.disconnected_time = time.time() + session.disconnect_timer = reactor.callLater(session.session_max, + client.disconnectProfile, + reason) + # disconnectProfile must not be called at this point + # because session can be resumed + return False + else: + return True + + def checkAcks(self, client): + """Request ack if needed""" + session = client._xep_0198_session + # log.debug("checkAcks (in_counter={}, out_counter={}, buf len={}, buf idx={})" + # .format(session.in_counter, session.out_counter, len(session.buffer), + # session.buffer_idx)) + if session.ack_requested or not session.buffer: + return + if (session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R + or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R): + self.requestAck(client) + session.ack_requested = True + session.last_ack_r = time.time() + + def updateBuffer(self, session, server_acked): + """Update buffer and buffer_index""" + if server_acked > session.buffer_idx: + diff = server_acked - session.buffer_idx + for i in xrange(diff): + session.buffer.pop() + session.buffer_idx += diff + + def sendAck(self, client): + """Send an answer element with current IN counter""" + a_elt = domish.Element((NS_SM, 'a')) + a_elt['h'] = unicode(client._xep_0198_session.in_counter) + client.send(a_elt) + + def requestAck(self, client): + """Send a request element""" + r_elt = domish.Element((NS_SM, 'r')) + client.send(r_elt) + + def _connectionFailed(self, failure_, connector): + normal_host, normal_port = connector.normal_location + del connector.normal_location + log.warning(_( + u"Connection failed using location given by server (host: {host}, port: " + u"{port}), switching to normal host and port (host: {normal_host}, port: " + u"{normal_port})".format(host=connector.host, port=connector.port, + normal_host=normal_host, normal_port=normal_port))) + connector.host, connector.port = normal_host, normal_port + connector.connectionFailed = connector.connectionFailed_ori + del connector.connectionFailed_ori + return connector.connectionFailed(failure_) + + def onEnabled(self, enabled_elt, client): + session = client._xep_0198_session + session.in_counter = 0 + + # we check that resuming is possible and that we have a session id + resume = C.bool(enabled_elt.getAttribute(u'resume')) + session_id = enabled_elt.getAttribute(u'id') + if not session_id: + log.warning(_(u'Incorrect element received, no "id" attribute')) + if not resume or not session_id: + log.warning(_( + u"You're server doesn't support session resuming with stream management, " + u"please contact your server administrator to enable it")) + return + + session.session_id = session_id + + # XXX: we disable resource binding, which must not be done + # when we resume the session. + client.factory.authenticator.res_binding = False + + # location, in case server want resuming session to be elsewhere + try: + location = enabled_elt[u'location'] + except KeyError: + pass + else: + # TODO: handle IPv6 here (in brackets, cf. XEP) + try: + domain, port = location.split(':', 1) + port = int(port) + except ValueError: + log.warning(_(u"Invalid location received: {location}") + .format(location=location)) + else: + session.location = (domain, port) + # we monkey patch connector to use the new location + connector = client.xmlstream.transport.connector + connector.normal_location = connector.host, connector.port + connector.host = domain + connector.port = port + connector.connectionFailed_ori = connector.connectionFailed + connector.connectionFailed = partial(self._connectionFailed, + connector=connector) + + # resuming time + try: + max_s = int(enabled_elt[u'max']) + except (ValueError, KeyError) as e: + if isinstance(e, ValueError): + log.warning(_(u'Invalid "max" attribute')) + max_s = RESUME_MAX + log.info(_(u"Using default session max value ({max_s} s).".format( + max_s=max_s))) + log.info(_(u"Stream Management enabled")) + else: + log.info(_( + u"Stream Management enabled, with a resumption time of {res_m} min" + .format(res_m = max_s/60))) + session.session_max = max_s + + def onResumed(self, enabled_elt, client): + session = client._xep_0198_session + assert not session.enabled + del session.resuming + server_acked = int(enabled_elt['h']) + self.updateBuffer(session, server_acked) + resend_count = len(session.buffer) + # we resend all stanza which have not been received properly + while True: + try: + stanza = session.buffer.pop() + except IndexError: + break + else: + client.send(stanza) + # now we can continue the session + session.enabled = True + d_time = time.time() - session.disconnected_time + log.info(_(u"Stream session resumed (disconnected for {d_time} s, {count} " + u"stanza(s) resent)").format(d_time=int(d_time), count=resend_count)) + + def onFailed(self, failed_elt, client): + session = client._xep_0198_session + condition_elt = failed_elt.firstChildElement() + session.reset() + + try: + del session.resuming + except AttributeError: + # stream management can't be started at all + msg = _(u"Can't use stream management") + if condition_elt is None: + log.error(msg + u'.') + else: + log.error(_(u"{msg}: {reason}").format( + msg=msg, reason=condition_elt.name)) + else: + # only stream resumption failed, we can try full session init + # XXX: we try to start full session init from this point, with many + # variables/attributes already initialised with a potentially different + # jid. This is experimental and may not be safe. It may be more + # secured to abord the connection and restart everything with a fresh + # client. + msg = _(u"stream resumption not possible, restarting full session") + + if condition_elt is None: + log.warning(u'{msg}.'.format(msg=msg)) + else: + log.warning(u"{msg}: {reason}".format( + msg=msg, reason=condition_elt.name)) + # stream resumption failed, but we still can do normal stream management + # we restore attributes as if the session was new, and init stream + # we keep everything initialized, and only do binding, roster request + # and initial presence sending. + if client.conn_deferred.called: + client.conn_deferred = defer.Deferred() + else: + log.error(u"conn_deferred should be called at this point") + # we need to recreate roster + client.handlers.remove(client.roster) + client.roster = client.roster.__class__(self.host) + client.roster.setHandlerParent(client) + # bind init is not done when resuming is possible, so we have to do it now + bind_init = jabber_client.BindInitializer(client.xmlstream) + bind_init.required = True + d = bind_init.start() + # we set the jid, which may have changed + d.addCallback(lambda __: setattr(client.factory.authenticator, "jid", client.jid)) + # we call the trigger who will send the element + d.addCallback(lambda __: self._XMLInitTrigger(client)) + # then we have to re-request the roster, as changes may have occured + d.addCallback(lambda __: client.roster.requestRoster()) + # we add got_roster to be sure to have roster before sending initial presence + d.addCallback(lambda __: client.roster.got_roster) + # initial presence must be sent manually + d.addCallback(lambda __: client.presence.available()) + + def onReceive(self, element, client): + session = client._xep_0198_session + if session.enabled and element.name.lower() in C.STANZA_NAMES: + session.in_counter += 1 % MAX_COUNTER + + def onSend(self, obj, client): + session = client._xep_0198_session + if (session.enabled + and domish.IElement.providedBy(obj) + and obj.name.lower() in C.STANZA_NAMES): + session.out_counter += 1 % MAX_COUNTER + session.buffer.appendleft(obj) + self.checkAcks(client) + + def onAckRequest(self, r_elt, client): + self.sendAck(client) + + def onAckAnswer(self, a_elt, client): + session = client._xep_0198_session + session.ack_requested = False + try: + server_acked = int(a_elt['h']) + except ValueError: + log.warning(_(u"Server returned invalid ack element, disabling stream " + u"management: {xml}").format(xml=a_elt)) + session.enabled = False + return + + if server_acked > session.out_counter: + log.error(_(u"Server acked more stanzas than we have sent, disabling stream " + u"management.")) + session.reset() + return + + self.updateBuffer(session, server_acked) + self.checkAcks(client) + + +class XEP_0198_handler(xmlstream.XMPPHandler): + implements(iwokkel.IDisco) + + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + self.host = plugin_parent.host + + def connectionInitialized(self): + self.xmlstream.addObserver( + SM_ENABLED, self.plugin_parent.onEnabled, client=self.parent + ) + self.xmlstream.addObserver( + SM_RESUMED, self.plugin_parent.onResumed, client=self.parent + ) + self.xmlstream.addObserver( + SM_FAILED, self.plugin_parent.onFailed, client=self.parent + ) + self.xmlstream.addObserver( + SM_R_REQUEST, self.plugin_parent.onAckRequest, client=self.parent + ) + self.xmlstream.addObserver( + SM_A_REQUEST, self.plugin_parent.onAckAnswer, client=self.parent + ) + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_SM)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []