changeset 2691:1ecceac3df96

plugin XEP-0198: Stream Management implementation: - hooks can now be set in stream onElement and send methods - xmllog refactored to use new hooks - client.isConnected now uses transport.connected method - fixed reconnection, SàT will now try to reconnect indefinitely until it succeeds, an unresolvable failure happens (e.g. invalid certificate), or an explicit disconnection is requested (or a plugin changes this behaviour) - new triggers: "stream_hooks", "disconnecting", "disconnected", and "xml_init" (replace "XML Initialized")
author Goffi <goffi@goffi.org>
date Sun, 18 Nov 2018 15:49:46 +0100
parents 56bfe1b79204
children 7d4679ee7ded
files CHANGELOG sat/core/constants.py sat/core/patches.py sat/core/sat_main.py sat/core/xmpp.py sat/plugins/plugin_misc_xmllog.py sat/plugins/plugin_xep_0198.py
diffstat 7 files changed, 643 insertions(+), 115 deletions(-) [+]
line wrap: on
line diff
--- a/CHANGELOG	Sat Nov 10 10:16:38 2018 +0100
+++ b/CHANGELOG	Sun Nov 18 15:49:46 2018 +0100
@@ -5,6 +5,7 @@
     This is also the first "general audience" version.
     - XEP-0070 implementation (HTTP Auth via XMPP)
     - XEP-0184 implementation (Delivery Receipts)
+    - XEP-0198 implementation (Stream Management)
     - XEP-0199 implementation (XMPP Ping)
     - XEP-0231 implementation (Bits of Binary)
     - XEP-0264 implementation (Thumbnails)
--- a/sat/core/constants.py	Sat Nov 10 10:16:38 2018 +0100
+++ b/sat/core/constants.py	Sun Nov 18 15:49:46 2018 +0100
@@ -50,7 +50,7 @@
     ## Protocol ##
     XMPP_C2S_PORT = 5222
     XMPP_KEEP_ALIFE = 180
-    XMPP_MAX_RETRIES = 2
+    XMPP_MAX_RETRIES = None
     # default port used on Prosody, may differ on other servers
     XMPP_COMPONENT_PORT = 5347
 
@@ -403,6 +403,11 @@
     NO_LIMIT = -1  # used in bridge when a integer value is expected
     DEFAULT_MAX_AGE = 1209600  # default max age of cached files, in seconds
     HASH_SHA1_EMPTY = "da39a3ee5e6b4b0d3255bfef95601890afd80709"
+    STANZA_NAMES = (u"iq", u"message", u"presence")
+
+    # Stream Hooks
+    STREAM_HOOK_SEND = u"send"
+    STREAM_HOOK_RECEIVE = u"receive"
 
     @classmethod
     def LOG_OPTIONS(cls):
--- a/sat/core/patches.py	Sat Nov 10 10:16:38 2018 +0100
+++ b/sat/core/patches.py	Sun Nov 18 15:49:46 2018 +0100
@@ -1,10 +1,19 @@
-from twisted.words.protocols.jabber import xmlstream
+from twisted.words.protocols.jabber import xmlstream, sasl, client as tclient
 from twisted.internet import ssl
 from wokkel import client
+from sat.core.constants import Const as C
+from sat.core.log import getLogger
 
-"""This module apply monkey patches to Twisted and Wokkel to handle certificate validation
-   during XMPP connection"""
+log = getLogger(__name__)
 
+"""This module applies monkey patches to Twisted and Wokkel.
+   The first part handles certificate validation during XMPP connection and is
+   temporary (until merged upstream).
+   The second part adds a trigger point to the send and onElement methods of XmlStream.
+   """
+
+
+## certificate validation patches
 
 class TLSInitiatingInitializer(xmlstream.TLSInitiatingInitializer):
     check_certificate = True
@@ -21,13 +30,15 @@
 
 class XMPPClient(client.XMPPClient):
 
-    def __init__(self, jid, password, host=None, port=5222, check_certificate=True):
+    def __init__(self, jid, password, host=None, port=5222,
+                 check_certificate=True):
         self.jid = jid
         self.domain = jid.host.encode('idna')
         self.host = host
         self.port = port
 
-        factory = HybridClientFactory(jid, password, check_certificate)
+        factory = HybridClientFactory(
+            jid, password, check_certificate=check_certificate)
 
         client.StreamManager.__init__(self, factory)
 
@@ -39,6 +50,7 @@
 
 
 class HybridAuthenticator(client.HybridAuthenticator):
+    res_binding = True
 
     def __init__(self, jid, password, check_certificate):
         xmlstream.ConnectAuthenticator.__init__(self, jid.host)
@@ -53,11 +65,83 @@
         tlsInit.check_certificate = self.check_certificate
         xs.initializers = [client.client.CheckVersionInitializer(xs),
                            tlsInit,
-                           client.CheckAuthInitializer(xs)]
+                           CheckAuthInitializer(xs, self.res_binding)]
+
+
+# XmlStream triggers
+
+
+class XmlStream(xmlstream.XmlStream):
+    """XmlStream which allows to add hooks"""
+
+    def __init__(self, authenticator):
+        xmlstream.XmlStream.__init__(self, authenticator)
+        # hooks at this level should not modify content
+        # so it's not needed to handle priority as with triggers
+        self._onElementHooks = []
+        self._sendHooks = []
+
+    def addHook(self, hook_type, callback):
+        """Add a send or receive hook"""
+        conflict_msg = (u"Hook conflict: can't add {hook_type} hook {callback}"
+            .format(hook_type=hook_type, callback=callback))
+        if hook_type == C.STREAM_HOOK_RECEIVE:
+            if callback not in self._onElementHooks:
+                self._onElementHooks.append(callback)
+            else:
+                log.warning(conflict_msg)
+        elif hook_type == C.STREAM_HOOK_SEND:
+            if callback not in self._sendHooks:
+                self._sendHooks.append(callback)
+            else:
+                log.warning(conflict_msg)
+        else:
+            raise ValueError(u"Invalid hook type: {hook_type}"
+                .format(hook_type=hook_type))
+
+    def onElement(self, element):
+        for hook in self._onElementHooks:
+            hook(element)
+        xmlstream.XmlStream.onElement(self, element)
+
+    def send(self, obj):
+        for hook in self._sendHooks:
+            hook(obj)
+        xmlstream.XmlStream.send(self, obj)
+
+
+# Binding activation (needed for stream management, XEP-0198)
+
+
+class CheckAuthInitializer(client.CheckAuthInitializer):
+
+    def __init__(self, xs, res_binding):
+        super(CheckAuthInitializer, self).__init__(xs)
+        self.res_binding = res_binding
+
+    def initialize(self):
+        # XXX: modification of client.CheckAuthInitializer which has optional
+        #      resource binding, and which doesn't do deprecated
+        #      SessionInitializer
+        if (sasl.NS_XMPP_SASL, 'mechanisms') in self.xmlstream.features:
+            inits = [(sasl.SASLInitiatingInitializer, True)]
+            if self.res_binding:
+                inits.append((tclient.BindInitializer, True)),
+
+            for initClass, required in inits:
+                init = initClass(self.xmlstream)
+                init.required = required
+                self.xmlstream.initializers.append(init)
+        elif (tclient.NS_IQ_AUTH_FEATURE, 'auth') in self.xmlstream.features:
+            self.xmlstream.initializers.append(
+                    tclient.IQAuthInitializer(self.xmlstream))
+        else:
+            raise Exception("No available authentication method found")
 
 
 def apply():
+    # certificate validation
     xmlstream.TLSInitiatingInitializer = TLSInitiatingInitializer
     client.XMPPClient = XMPPClient
-    client.HybridClientFactory = HybridClientFactory
-    client.HybridAuthenticator = HybridAuthenticator
+    # XmlStream triggers
+    xmlstream.XmlStreamFactory.protocol = XmlStream
--- a/sat/core/sat_main.py	Sat Nov 10 10:16:38 2018 +0100
+++ b/sat/core/sat_main.py	Sun Nov 18 15:49:46 2018 +0100
@@ -46,6 +46,7 @@
 import os.path
 import uuid
 
+
 try:
     from collections import OrderedDict  # only available from python 2.7
 except ImportError:
--- a/sat/core/xmpp.py	Sat Nov 10 10:16:38 2018 +0100
+++ b/sat/core/xmpp.py	Sun Nov 18 15:49:46 2018 +0100
@@ -49,20 +49,20 @@
 
 class SatXMPPEntity(object):
     """Common code for Client and Component"""
-    _reason = None  # reason of disconnection
 
     def __init__(self, host_app, profile, max_retries):
-
-        self.factory.clientConnectionLost = self.connectionLost
-        self.factory.maxRetries = max_retries
-        # when self._connected is None, we are not connected
+        factory = self.factory
+        factory.maxRetries = max_retries
+        factory.maxDelay = 30
+        # when self._connected_d is None, we are not connected
         # else, it's a deferred which fire on disconnection
-        self._connected = None
+        self._connected_d = None
         self.profile = profile
         self.host_app = host_app
         self.cache = cache.Cache(host_app, profile)
         self._mess_id_uid = {}  # map from message id to uid used in history.
                                 # Key: (full_jid,message_id) Value: uid
+        # this Deferred fire when entity is connected
         self.conn_deferred = defer.Deferred()
         self._progress_cb = {}  # callback called when a progress is requested
                                 # (key = progress id)
@@ -145,15 +145,12 @@
             "Password", "Connection", profile_key=profile
         )
         entity = host.profiles[profile] = cls(
-            host,
-            profile,
-            jid.JID(host.memory.getParamA("JabberID", "Connection", profile_key=profile)),
-            password,
-            host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection", profile_key=profile)
-            or None,
-            port,
-            max_retries,
-        )
+            host, profile, jid.JID(host.memory.getParamA("JabberID", "Connection",
+            profile_key=profile)), password,
+            host.memory.getParamA(C.FORCE_SERVER_PARAM, "Connection",
+                                  profile_key=profile) or None,
+            port, max_retries,
+            )
 
         entity._createSubProtocols()
 
@@ -172,7 +169,7 @@
 
         entity.startService()
 
-        yield entity.getConnectionDeferred()
+        yield entity.conn_deferred
 
         yield defer.maybeDeferred(entity.entityConnected)
 
@@ -205,40 +202,16 @@
         # we finally send our presence
         entity.presence.available()
 
-    def getConnectionDeferred(self):
-        """Return a deferred which fire when the client is connected"""
-        return self.conn_deferred
-
     def _disconnectionCb(self, __):
-        self._connected = None
+        self._connected_d = None
 
     def _disconnectionEb(self, failure_):
         log.error(_(u"Error while disconnecting: {}".format(failure_)))
 
     def _authd(self, xmlstream):
-        if not self.host_app.trigger.point("XML Initialized", xmlstream, self.profile):
-            return
         super(SatXMPPEntity, self)._authd(xmlstream)
-
-        if self._reason is not None:
-            # if we have had trouble to connect we can reset
-            # the exception as the connection is now working.
-            del self._reason
-
-        # the following Deferred is used to know when we are connected
-        # so we need to be set it to None when connection is lost
-        self._connected = defer.Deferred()
-        self._connected.addCallback(self._cleanConnection)
-        self._connected.addCallback(self._disconnectionCb)
-        self._connected.addErrback(self._disconnectionEb)
-
-        log.info(
-            _(u"********** [{profile}] CONNECTED **********").format(profile=self.profile)
-        )
+        log.debug(_(u"{profile} identified").format(profile=self.profile))
         self.streamInitialized()
-        self.host_app.bridge.connected(
-            self.profile, unicode(self.jid)
-        )  # we send the signal to the clients
 
     def _finish_connection(self, __):
         self.conn_deferred.callback(None)
@@ -246,11 +219,32 @@
     def streamInitialized(self):
         """Called after _authd"""
         log.debug(_(u"XML stream is initialized"))
+        if not self.host_app.trigger.point("xml_init", self):
+            return
+        self.postStreamInit()
+
+    def postStreamInit(self):
+        """Workflow after stream initialisation."""
+        log.info(
+            _(u"********** [{profile}] CONNECTED **********").format(profile=self.profile)
+        )
+
+        # the following Deferred is used to know when we are connected
+        # so we need to set it to None when connection is lost
+        self._connected_d = defer.Deferred()
+        self._connected_d.addCallback(self._cleanConnection)
+        self._connected_d.addCallback(self._disconnectionCb)
+        self._connected_d.addErrback(self._disconnectionEb)
+
+        self.host_app.bridge.connected(
+            self.profile, unicode(self.jid)
+        )  # we send the signal to the clients
+
+
         self.keep_alife = task.LoopingCall(
             self.xmlstream.send, " "
         )  # Needed to avoid disconnection (specially with openfire)
         self.keep_alife.start(C.XMPP_KEEP_ALIFE)
-
         self.disco = SatDiscoProtocol(self)
         self.disco.setHandlerParent(self)
         self.discoHandler = disco.DiscoHandler()
@@ -278,21 +272,27 @@
 
     ## connection ##
 
-    def _disconnected(self, reason):
-        # we have to save the reason of disconnection, otherwise it would be lost
-        self._reason = reason
-        super(SatXMPPEntity, self)._disconnected(reason)
+    def _connected(self, xs):
+        send_hooks = []
+        receive_hooks = []
+        self.host_app.trigger.point(
+            "stream_hooks", self, receive_hooks, send_hooks)
+        for hook in receive_hooks:
+            xs.addHook(C.STREAM_HOOK_RECEIVE, hook)
+        for hook in send_hooks:
+            xs.addHook(C.STREAM_HOOK_SEND, hook)
+        super(SatXMPPEntity, self)._connected(xs)
 
-    def connectionLost(self, connector, reason):
+    def disconnectProfile(self, reason):
         try:
             self.keep_alife.stop()
         except AttributeError:
             log.debug(_("No keep_alife"))
-        if self._connected is not None:
+        if self._connected_d is not None:
             self.host_app.bridge.disconnected(
                 self.profile
             )  # we send the signal to the clients
-            self._connected.callback(None)
+            self._connected_d.callback(None)
             self.host_app.purgeEntity(
                 self.profile
             )  # and we remove references to this client
@@ -302,12 +302,10 @@
                 )
             )
         if not self.conn_deferred.called:
-            # FIXME: real error is not gotten here (e.g. if jid is not know by Prosody,
-            #        we should have the real error)
-            if self._reason is None:
+            if reason is None:
                 err = error.StreamError(u"Server unexpectedly closed the connection")
             else:
-                err = self._reason
+                err = reason
                 try:
                     if err.value.args[0][0][2] == "certificate verify failed":
                         err = exceptions.InvalidCertificate(
@@ -316,10 +314,17 @@
                               u"This should never happen and may indicate that "
                               u"somebody is trying to spy on you.\n"
                               u"Please contact your server administrator."))
+                        self.factory.continueTrying = 0
                 except (IndexError, TypeError):
                     pass
             self.conn_deferred.errback(err)
 
+    def _disconnected(self, reason):
+        super(SatXMPPEntity, self)._disconnected(reason)
+        if not self.host_app.trigger.point("disconnected", self, reason):
+            return
+        self.disconnectProfile(reason)
+
     @defer.inlineCallbacks
     def _cleanConnection(self, __):
         """method called on disconnection
@@ -333,13 +338,18 @@
                 yield disconnected_cb(self)
 
     def isConnected(self):
-        return self._connected is not None
+        try:
+            return bool(self.xmlstream.transport.connected)
+        except AttributeError:
+            return False
 
     def entityDisconnect(self):
+        if not self.host_app.trigger.point("disconnecting", self):
+            return
         log.info(_(u"Disconnecting..."))
         self.stopService()
-        if self._connected is not None:
-            return self._connected
+        if self._connected_d is not None:
+            return self._connected_d
         else:
             return defer.succeed(None)
 
@@ -735,16 +745,8 @@
     )  # XXX: set to True from entry plugin to keep messages in history for received
        #      messages
 
-    def __init__(
-        self,
-        host_app,
-        profile,
-        component_jid,
-        password,
-        host=None,
-        port=None,
-        max_retries=C.XMPP_MAX_RETRIES,
-    ):
+    def __init__(self, host_app, profile, component_jid, password, host=None, port=None,
+                 max_retries=C.XMPP_MAX_RETRIES):
         self.started = time.time()
         if port is None:
             port = C.XMPP_COMPONENT_PORT
--- a/sat/plugins/plugin_misc_xmllog.py	Sat Nov 10 10:16:38 2018 +0100
+++ b/sat/plugins/plugin_misc_xmllog.py	Sun Nov 18 15:49:46 2018 +0100
@@ -23,7 +23,7 @@
 
 log = getLogger(__name__)
 from twisted.words.xish import domish
-from twisted.words.xish import xmlstream
+from functools import partial
 
 PLUGIN_INFO = {
     C.PI_NAME: "Raw XML log Plugin",
@@ -36,26 +36,6 @@
     C.PI_DESCRIPTION: _(u"""Send raw XML logs to bridge"""),
 }
 
-host = None
-
-
-def send(self, obj):
-    global host
-    if isinstance(obj, basestring):
-        log = unicode(obj)
-    elif isinstance(obj, domish.Element):
-        log = obj.toXml()
-    else:
-        log.error(_(u"INTERNAL ERROR: Unmanaged XML type"))
-    host.bridge.xmlLog("OUT", log, self._profile)
-    return self._original_send(obj)
-
-
-def onElement(self, element):
-    global host
-    host.bridge.xmlLog("IN", element.toXml(), self._profile)
-    return self._original_onElement(element)
-
 
 class XmlLog(object):
 
@@ -71,30 +51,32 @@
         "label_xmllog": _("Activate XML log")
     }
 
-    def __init__(self, host_):
+    def __init__(self, host):
         log.info(_("Plugin XML Log initialization"))
-        global host
-        host = host_
-
-        # parameters
+        self.host = host
         host.memory.updateParams(self.params)
-
-        # bridge
         host.bridge.addSignal(
             "xmlLog", ".plugin", signature="sss"
         )  # args: direction("IN" or "OUT"), xml_data, profile
 
-        self.do_log = host.memory.getParamA("Xml log", "Debug")
+        host.trigger.add("stream_hooks", self.addHooks)
+
+    def addHooks(self, client, receive_hooks, send_hooks):
+        self.do_log = self.host.memory.getParamA("Xml log", "Debug")
         if self.do_log:
-            XmlStream = xmlstream.XmlStream
-            XmlStream._original_send = XmlStream.send
-            XmlStream._original_onElement = XmlStream.onElement
-            XmlStream.send = send
-            XmlStream.onElement = onElement
-            XmlStream._profile = ""
-            host.trigger.add("XML Initialized", self.setProfile)
+            receive_hooks.append(partial(self.onReceive, client=client))
+            send_hooks.append(partial(self.onSend, client=client))
             log.info(_(u"XML log activated"))
+        return True
 
-    def setProfile(self, xmlstream, profile):
-        xmlstream._profile = profile
-        return True
+    def onReceive(self, element, client):
+        self.host.bridge.xmlLog("IN", element.toXml(), client.profile)
+
+    def onSend(self, obj, client):
+        if isinstance(obj, basestring):
+            log = unicode(obj)
+        elif isinstance(obj, domish.Element):
+            log = obj.toXml()
+        else:
+            log.error(_(u"INTERNAL ERROR: Unmanaged XML type"))
+        self.host.bridge.xmlLog("OUT", log, client.profile)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0198.py	Sun Nov 18 15:49:46 2018 +0100
@@ -0,0 +1,453 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# SàT plugin for Stream Management (XEP-0198)
+# Copyright (C) 2011  Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from sat.core.i18n import _
+from sat.core.constants import Const as C
+from sat.core import exceptions
+from sat.core.log import getLogger
+from twisted.words.protocols.jabber import client as jabber_client
+from twisted.words.protocols.jabber import xmlstream
+from twisted.words.xish import domish
+from twisted.internet import defer
+from twisted.internet import task, reactor
+from functools import partial
+from wokkel import disco, iwokkel
+from zope.interface import implements
+import collections
+import time
+
+log = getLogger(__name__)
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Stream Management",
+    C.PI_IMPORT_NAME: "XEP-0198",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0198"],
+    C.PI_DEPENDENCIES: [],
+    C.PI_MAIN: "XEP_0198",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _(u"""Implementation of Stream Management"""),
+}
+
+NS_SM = u"urn:xmpp:sm:3"
+SM_ENABLED = '/enabled[@xmlns="' + NS_SM + '"]'
+SM_RESUMED = '/resumed[@xmlns="' + NS_SM + '"]'
+SM_FAILED = '/failed[@xmlns="' + NS_SM + '"]'
+SM_R_REQUEST = '/r[@xmlns="' + NS_SM + '"]'
+SM_A_REQUEST = '/a[@xmlns="' + NS_SM + '"]'
+SM_H_REQUEST = '/h[@xmlns="' + NS_SM + '"]'
+# Max number of stanza to send before requesting ack
+MAX_STANZA_ACK_R = 5
+# Max number of seconds before requesting ack
+MAX_DELAY_ACK_R = 30
+MAX_COUNTER = 2**32
+RESUME_MAX = 5*60
+
+
+class ProfileSessionData(object):
+    out_counter = 0
+    in_counter = 0
+    session_id = None
+    location = None
+    session_max = None
+    # True when an ack answer is expected
+    ack_requested = False
+    last_ack_r = 0
+    disconnected_time = None
+
+    def __init__(self, callback, **kw):
+        self.buffer = collections.deque()
+        self.buffer_idx = 0
+        self._enabled = False
+        self.timer = None
+        self.callback_data = (callback, kw)
+
+    @property
+    def enabled(self):
+        return self._enabled
+
+    @enabled.setter
+    def enabled(self, enabled):
+        if enabled:
+            if self._enabled:
+                raise exceptions.InternalError(
+                    u"Stream Management can't be enabled twice")
+            self._enabled = True
+            callback, kw = self.callback_data
+            self.timer = task.LoopingCall(callback, **kw)
+            self.timer.start(MAX_DELAY_ACK_R, now=False)
+        else:
+            self._enabled = False
+            if self.timer is not None:
+                self.timer.stop()
+                self.timer = None
+
+    @property
+    def resume_enabled(self):
+        return self.session_id is not None
+
+    def reset(self):
+        self.enabled = False
+        self.buffer.clear()
+        self.buffer_idx = 0
+        self.in_counter = self.out_counter = 0
+        self.session_id = self.location = None
+        self.ack_requested = False
+        self.last_ack_r = 0
+
+
+class XEP_0198(object):
+    # FIXME: location is not handled yet
+
+    def __init__(self, host):
+        log.info(_("Plugin Stream Management initialization"))
+        self.host = host
+        host.registerNamespace(u'sm', NS_SM)
+        host.trigger.add("stream_hooks", self.addHooks)
+        host.trigger.add("xml_init", self._XMLInitTrigger)
+        host.trigger.add("disconnecting", self._disconnectingTrigger)
+        host.trigger.add("disconnected", self._disconnectedTrigger)
+
+    def profileConnecting(self, client):
+        client._xep_0198_session = ProfileSessionData(callback=self.checkAcks,
+                                                      client=client)
+
+    def getHandler(self, client):
+        return XEP_0198_handler(self)
+
+    def addHooks(self, client, receive_hooks, send_hooks):
+        """Add hooks to handle in/out stanzas counters"""
+        receive_hooks.append(partial(self.onReceive, client=client))
+        send_hooks.append(partial(self.onSend, client=client))
+        return True
+
+    def _XMLInitTrigger(self, client):
+        """Enable or resume stream management"""
+        if not (NS_SM, u'sm') in client.xmlstream.features:
+            log.warning(_(
+                u"Your server doesn't support stream management ({namespace}), this is "
+                u"used to improve connection problems detection (like network outages). "
+                u"Please ask your server administrator to enable this feature.".format(
+                namespace=NS_SM)))
+            return True
+        session = client._xep_0198_session
+
+        # a disconnect timer from a previous disconnection may still be active
+        try:
+            disconnect_timer = session.disconnect_timer
+        except AttributeError:
+            pass
+        else:
+            if disconnect_timer.active():
+                disconnect_timer.cancel()
+            del session.disconnect_timer
+
+        if session.resume_enabled:
+            # we are resuming a session
+            resume_elt = domish.Element((NS_SM, 'resume'))
+            resume_elt['h'] = unicode(session.in_counter)
+            resume_elt['previd'] = session.session_id
+            client.send(resume_elt)
+            session.resuming = True
+            # session.enabled will be set on <resumed/> reception
+            return False
+        else:
+            # we start a new session
+            assert session.out_counter == 0
+            enable_elt = domish.Element((NS_SM, 'enable'))
+            enable_elt[u'resume'] = u'true'
+            client.send(enable_elt)
+            session.enabled = True
+            return True
+
+    def _disconnectingTrigger(self, client):
+        session = client._xep_0198_session
+        if session.enabled:
+            self.sendAck(client)
+        # This is a requested disconnection, so we can reset the session
+        # to disable resuming and close normally the stream
+        session.reset()
+        return True
+
+    def _disconnectedTrigger(self, client, reason):
+        session = client._xep_0198_session
+        session.enabled = False
+        if session.resume_enabled:
+            session.disconnected_time = time.time()
+            session.disconnect_timer = reactor.callLater(session.session_max,
+                                                         client.disconnectProfile,
+                                                         reason)
+            # disconnectProfile must not be called at this point
+            # because session can be resumed
+            return False
+        else:
+            return True
+
+    def checkAcks(self, client):
+        """Request ack if needed"""
+        session = client._xep_0198_session
+        # log.debug("checkAcks (in_counter={}, out_counter={}, buf len={}, buf idx={})"
+        #     .format(session.in_counter, session.out_counter, len(session.buffer),
+        #             session.buffer_idx))
+        if session.ack_requested or not session.buffer:
+            return
+        if (session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R
+            or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R):
+            self.requestAck(client)
+            session.ack_requested = True
+            session.last_ack_r = time.time()
+
+    def updateBuffer(self, session, server_acked):
+        """Update buffer and buffer_index"""
+        if server_acked > session.buffer_idx:
+            diff = server_acked - session.buffer_idx
+            for i in xrange(diff):
+                session.buffer.pop()
+            session.buffer_idx += diff
+
+    def sendAck(self, client):
+        """Send an answer element with current IN counter"""
+        a_elt = domish.Element((NS_SM, 'a'))
+        a_elt['h'] = unicode(client._xep_0198_session.in_counter)
+        client.send(a_elt)
+
+    def requestAck(self, client):
+        """Send a request element"""
+        r_elt = domish.Element((NS_SM, 'r'))
+        client.send(r_elt)
+
+    def _connectionFailed(self, failure_, connector):
+        normal_host, normal_port = connector.normal_location
+        del connector.normal_location
+        log.warning(_(
+            u"Connection failed using location given by server (host: {host}, port: "
+            u"{port}), switching to normal host and port (host: {normal_host}, port: "
+            u"{normal_port})".format(host=connector.host, port=connector.port,
+                                     normal_host=normal_host, normal_port=normal_port)))
+        connector.host, connector.port = normal_host, normal_port
+        connector.connectionFailed = connector.connectionFailed_ori
+        del connector.connectionFailed_ori
+        return connector.connectionFailed(failure_)
+
+    def onEnabled(self, enabled_elt, client):
+        session = client._xep_0198_session
+        session.in_counter = 0
+
+        # we check that resuming is possible and that we have a session id
+        resume = C.bool(enabled_elt.getAttribute(u'resume'))
+        session_id = enabled_elt.getAttribute(u'id')
+        if not session_id:
+            log.warning(_(u'Incorrect <enabled/> element received, no "id" attribute'))
+        if not resume or not session_id:
+            log.warning(_(
+                u"Your server doesn't support session resuming with stream management, "
+                u"please contact your server administrator to enable it"))
+            return
+
+        session.session_id = session_id
+
+        # XXX: we disable resource binding, which must not be done
+        #      when we resume the session.
+        client.factory.authenticator.res_binding = False
+
+        # location, in case server want resuming session to be elsewhere
+        try:
+            location = enabled_elt[u'location']
+        except KeyError:
+            pass
+        else:
+            # TODO: handle IPv6 here (in brackets, cf. XEP)
+            try:
+                domain, port = location.split(':', 1)
+                port = int(port)
+            except ValueError:
+                log.warning(_(u"Invalid location received: {location}")
+                    .format(location=location))
+            else:
+                session.location = (domain, port)
+                # we monkey patch connector to use the new location
+                connector = client.xmlstream.transport.connector
+                connector.normal_location = connector.host, connector.port
+                connector.host = domain
+                connector.port = port
+                connector.connectionFailed_ori = connector.connectionFailed
+                connector.connectionFailed = partial(self._connectionFailed,
+                                                     connector=connector)
+
+        # resuming time
+        try:
+            max_s = int(enabled_elt[u'max'])
+        except (ValueError, KeyError) as e:
+            if isinstance(e, ValueError):
+                log.warning(_(u'Invalid "max" attribute'))
+            max_s = RESUME_MAX
+            log.info(_(u"Using default session max value ({max_s} s).".format(
+                max_s=max_s)))
+            log.info(_(u"Stream Management enabled"))
+        else:
+            log.info(_(
+                u"Stream Management enabled, with a resumption time of {res_m} min"
+                .format(res_m = max_s/60)))
+        session.session_max = max_s
+
+    def onResumed(self, enabled_elt, client):
+        session = client._xep_0198_session
+        assert not session.enabled
+        del session.resuming
+        server_acked = int(enabled_elt['h'])
+        self.updateBuffer(session, server_acked)
+        resend_count = len(session.buffer)
+        # we resend all stanza which have not been received properly
+        while True:
+            try:
+                stanza = session.buffer.pop()
+            except IndexError:
+                break
+            else:
+                client.send(stanza)
+        # now we can continue the session
+        session.enabled = True
+        d_time = time.time() - session.disconnected_time
+        log.info(_(u"Stream session resumed (disconnected for {d_time} s, {count} "
+                   u"stanza(s) resent)").format(d_time=int(d_time), count=resend_count))
+
    def onFailed(self, failed_elt, client):
        """Handle a <failed/> element sent by the server.

        Two cases, distinguished by the presence of ``session.resuming``:
        - attribute absent: the initial <enable/> failed, stream management
          can't be used at all for this session (only logged);
        - attribute present: only stream *resumption* failed, so binding,
          roster request and initial presence are redone on the current
          stream to establish a full session.

        @param failed_elt: <failed/> element received from the server
        @param client: client session concerned
        """
        session = client._xep_0198_session
        # optional child of <failed/> names the failure condition
        condition_elt = failed_elt.firstChildElement()
        session.reset()

        try:
            del session.resuming
        except AttributeError:
            # stream management can't be started at all
            msg = _(u"Can't use stream management")
            if condition_elt is None:
                log.error(msg + u'.')
            else:
                log.error(_(u"{msg}: {reason}").format(
                msg=msg, reason=condition_elt.name))
        else:
            # only stream resumption failed, we can try full session init
            # XXX: we try to start full session init from this point, with many
            #      variables/attributes already initialised with a potentially different
            #      jid. This is experimental and may not be safe. It may be
            #      safer to abort the connection and restart everything with a fresh
            #      client.
            msg = _(u"stream resumption not possible, restarting full session")

            if condition_elt is None:
                log.warning(u'{msg}.'.format(msg=msg))
            else:
                log.warning(u"{msg}: {reason}".format(
                    msg=msg, reason=condition_elt.name))
            # stream resumption failed, but we still can do normal stream management
            # we restore attributes as if the session was new, and init stream
            # we keep everything initialized, and only do binding, roster request
            # and initial presence sending.
            if client.conn_deferred.called:
                client.conn_deferred = defer.Deferred()
            else:
                log.error(u"conn_deferred should be called at this point")
            # we need to recreate roster
            client.handlers.remove(client.roster)
            client.roster = client.roster.__class__(self.host)
            client.roster.setHandlerParent(client)
            # bind init is not done when resuming is possible, so we have to do it now
            bind_init = jabber_client.BindInitializer(client.xmlstream)
            bind_init.required = True
            d = bind_init.start()
            # we set the jid, which may have changed
            d.addCallback(lambda __: setattr(client.factory.authenticator, "jid", client.jid))
            # we call the trigger which will send the <enable/> element
            d.addCallback(lambda __: self._XMLInitTrigger(client))
            # then we have to re-request the roster, as changes may have occurred
            d.addCallback(lambda __: client.roster.requestRoster())
            # we add got_roster to be sure to have roster before sending initial presence
            d.addCallback(lambda __: client.roster.got_roster)
            # initial presence must be sent manually
            d.addCallback(lambda __: client.presence.available())
+
+    def onReceive(self, element, client):
+        session = client._xep_0198_session
+        if session.enabled and element.name.lower() in C.STANZA_NAMES:
+            session.in_counter += 1 % MAX_COUNTER
+
+    def onSend(self, obj, client):
+        session = client._xep_0198_session
+        if (session.enabled
+            and domish.IElement.providedBy(obj)
+            and obj.name.lower() in C.STANZA_NAMES):
+            session.out_counter += 1 % MAX_COUNTER
+            session.buffer.appendleft(obj)
+            self.checkAcks(client)
+
+    def onAckRequest(self, r_elt, client):
+        self.sendAck(client)
+
+    def onAckAnswer(self, a_elt, client):
+        session = client._xep_0198_session
+        session.ack_requested = False
+        try:
+            server_acked = int(a_elt['h'])
+        except ValueError:
+            log.warning(_(u"Server returned invalid ack element, disabling stream "
+                          u"management: {xml}").format(xml=a_elt))
+            session.enabled = False
+            return
+
+        if server_acked > session.out_counter:
+            log.error(_(u"Server acked more stanzas than we have sent, disabling stream "
+                        u"management."))
+            session.reset()
+            return
+
+        self.updateBuffer(session, server_acked)
+        self.checkAcks(client)
+
+
class XEP_0198_handler(xmlstream.XMPPHandler):
    """XMPP handler registering Stream Management observers and disco feature."""
    implements(iwokkel.IDisco)

    def __init__(self, plugin_parent):
        self.plugin_parent = plugin_parent
        self.host = plugin_parent.host

    def connectionInitialized(self):
        # register one observer per Stream Management element, all bound to
        # the owning client session
        observers = (
            (SM_ENABLED, self.plugin_parent.onEnabled),
            (SM_RESUMED, self.plugin_parent.onResumed),
            (SM_FAILED, self.plugin_parent.onFailed),
            (SM_R_REQUEST, self.plugin_parent.onAckRequest),
            (SM_A_REQUEST, self.plugin_parent.onAckAnswer),
        )
        for xpath, observer in observers:
            self.xmlstream.addObserver(xpath, observer, client=self.parent)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        # advertise XEP-0198 support
        return [disco.DiscoFeature(NS_SM)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []