view sat/plugins/plugin_xep_0198.py @ 2699:310e41bd6666

core (memory/sqlite): added stanza_id: /!\ database schema change /!\ stanza_id is a new field in history added to prepare the implementation of MAM for messages. A new "last_stanza_id" can be used in filters to retrieve last message with a know stanza id (useful for history synchronisation).
author Goffi <goffi@goffi.org>
date Sat, 01 Dec 2018 10:08:17 +0100
parents 1ecceac3df96
children d0466af33483
line wrap: on
line source

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# SàT plugin for managing raw XML log
# Copyright (C) 2011  Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core import exceptions
from sat.core.log import getLogger
from twisted.words.protocols.jabber import client as jabber_client
from twisted.words.protocols.jabber import xmlstream
from twisted.words.xish import domish
from twisted.internet import defer
from twisted.internet import task, reactor
from functools import partial
from wokkel import disco, iwokkel
from zope.interface import implements
import collections
import time

log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "Stream Management",
    C.PI_IMPORT_NAME: "XEP-0198",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0198"],
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "XEP_0198",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(u"""Implementation of Stream Management"""),
}

NS_SM = u"urn:xmpp:sm:3"
SM_ENABLED = '/enabled[@xmlns="' + NS_SM + '"]'
SM_RESUMED = '/resumed[@xmlns="' + NS_SM + '"]'
SM_FAILED = '/failed[@xmlns="' + NS_SM + '"]'
SM_R_REQUEST = '/r[@xmlns="' + NS_SM + '"]'
SM_A_REQUEST = '/a[@xmlns="' + NS_SM + '"]'
SM_H_REQUEST = '/h[@xmlns="' + NS_SM + '"]'
# Max number of stanza to send before requesting ack
MAX_STANZA_ACK_R = 5
# Max number of seconds before requesting ack
MAX_DELAY_ACK_R = 30
MAX_COUNTER = 2**32
RESUME_MAX = 5*60


class ProfileSessionData(object):
    out_counter = 0
    in_counter = 0
    session_id = None
    location = None
    session_max = None
    # True when an ack answer is expected
    ack_requested = False
    last_ack_r = 0
    disconnected_time = None

    def __init__(self, callback, **kw):
        self.buffer = collections.deque()
        self.buffer_idx = 0
        self._enabled = False
        self.timer = None
        self.callback_data = (callback, kw)

    @property
    def enabled(self):
        return self._enabled

    @enabled.setter
    def enabled(self, enabled):
        if enabled:
            if self._enabled:
                raise exceptions.InternalError(
                    u"Stream Management can't be enabled twice")
            self._enabled = True
            callback, kw = self.callback_data
            self.timer = task.LoopingCall(callback, **kw)
            self.timer.start(MAX_DELAY_ACK_R, now=False)
        else:
            self._enabled = False
            if self.timer is not None:
                self.timer.stop()
                self.timer = None

    @property
    def resume_enabled(self):
        return self.session_id is not None

    def reset(self):
        self.enabled = False
        self.buffer.clear()
        self.buffer_idx = 0
        self.in_counter = self.out_counter = 0
        self.session_id = self.location = None
        self.ack_requested = False
        self.last_ack_r = 0


class XEP_0198(object):
    # FIXME: location is not handled yet

    def __init__(self, host):
        log.info(_("Plugin Stream Management initialization"))
        self.host = host
        host.registerNamespace(u'sm', NS_SM)
        host.trigger.add("stream_hooks", self.addHooks)
        host.trigger.add("xml_init", self._XMLInitTrigger)
        host.trigger.add("disconnecting", self._disconnectingTrigger)
        host.trigger.add("disconnected", self._disconnectedTrigger)

    def profileConnecting(self, client):
        client._xep_0198_session = ProfileSessionData(callback=self.checkAcks,
                                                      client=client)

    def getHandler(self, client):
        return XEP_0198_handler(self)

    def addHooks(self, client, receive_hooks, send_hooks):
        """Add hooks to handle in/out stanzas counters"""
        receive_hooks.append(partial(self.onReceive, client=client))
        send_hooks.append(partial(self.onSend, client=client))
        return True

    def _XMLInitTrigger(self, client):
        """Enable or resume a stream mangement"""
        if not (NS_SM, u'sm') in client.xmlstream.features:
            log.warning(_(
                u"Your server doesn't support stream management ({namespace}), this is "
                u"used to improve connection problems detection (like network outages). "
                u"Please ask your server administrator to enable this feature.".format(
                namespace=NS_SM)))
            return True
        session = client._xep_0198_session

        # a disconnect timer from a previous disconnection may still be active
        try:
            disconnect_timer = session.disconnect_timer
        except AttributeError:
            pass
        else:
            if disconnect_timer.active():
                disconnect_timer.cancel()
            del session.disconnect_timer

        if session.resume_enabled:
            # we are resuming a session
            resume_elt = domish.Element((NS_SM, 'resume'))
            resume_elt['h'] = unicode(session.in_counter)
            resume_elt['previd'] = session.session_id
            client.send(resume_elt)
            session.resuming = True
            # session.enabled will be set on <resumed/> reception
            return False
        else:
            # we start a new session
            assert session.out_counter == 0
            enable_elt = domish.Element((NS_SM, 'enable'))
            enable_elt[u'resume'] = u'true'
            client.send(enable_elt)
            session.enabled = True
            return True

    def _disconnectingTrigger(self, client):
        session = client._xep_0198_session
        if session.enabled:
            self.sendAck(client)
        # This is a requested disconnection, so we can reset the session
        # to disable resuming and close normally the stream
        session.reset()
        return True

    def _disconnectedTrigger(self, client, reason):
        session = client._xep_0198_session
        session.enabled = False
        if session.resume_enabled:
            session.disconnected_time = time.time()
            session.disconnect_timer = reactor.callLater(session.session_max,
                                                         client.disconnectProfile,
                                                         reason)
            # disconnectProfile must not be called at this point
            # because session can be resumed
            return False
        else:
            return True

    def checkAcks(self, client):
        """Request ack if needed"""
        session = client._xep_0198_session
        # log.debug("checkAcks (in_counter={}, out_counter={}, buf len={}, buf idx={})"
        #     .format(session.in_counter, session.out_counter, len(session.buffer),
        #             session.buffer_idx))
        if session.ack_requested or not session.buffer:
            return
        if (session.out_counter - session.buffer_idx >= MAX_STANZA_ACK_R
            or time.time() - session.last_ack_r >= MAX_DELAY_ACK_R):
            self.requestAck(client)
            session.ack_requested = True
            session.last_ack_r = time.time()

    def updateBuffer(self, session, server_acked):
        """Update buffer and buffer_index"""
        if server_acked > session.buffer_idx:
            diff = server_acked - session.buffer_idx
            for i in xrange(diff):
                session.buffer.pop()
            session.buffer_idx += diff

    def sendAck(self, client):
        """Send an answer element with current IN counter"""
        a_elt = domish.Element((NS_SM, 'a'))
        a_elt['h'] = unicode(client._xep_0198_session.in_counter)
        client.send(a_elt)

    def requestAck(self, client):
        """Send a request element"""
        r_elt = domish.Element((NS_SM, 'r'))
        client.send(r_elt)

    def _connectionFailed(self, failure_, connector):
        normal_host, normal_port = connector.normal_location
        del connector.normal_location
        log.warning(_(
            u"Connection failed using location given by server (host: {host}, port: "
            u"{port}), switching to normal host and port (host: {normal_host}, port: "
            u"{normal_port})".format(host=connector.host, port=connector.port,
                                     normal_host=normal_host, normal_port=normal_port)))
        connector.host, connector.port = normal_host, normal_port
        connector.connectionFailed = connector.connectionFailed_ori
        del connector.connectionFailed_ori
        return connector.connectionFailed(failure_)

    def onEnabled(self, enabled_elt, client):
        session = client._xep_0198_session
        session.in_counter = 0

        # we check that resuming is possible and that we have a session id
        resume = C.bool(enabled_elt.getAttribute(u'resume'))
        session_id = enabled_elt.getAttribute(u'id')
        if not session_id:
            log.warning(_(u'Incorrect <enabled/> element received, no "id" attribute'))
        if not resume or not session_id:
            log.warning(_(
                u"You're server doesn't support session resuming with stream management, "
                u"please contact your server administrator to enable it"))
            return

        session.session_id = session_id

        # XXX: we disable resource binding, which must not be done
        #      when we resume the session.
        client.factory.authenticator.res_binding = False

        # location, in case server want resuming session to be elsewhere
        try:
            location = enabled_elt[u'location']
        except KeyError:
            pass
        else:
            # TODO: handle IPv6 here (in brackets, cf. XEP)
            try:
                domain, port = location.split(':', 1)
                port = int(port)
            except ValueError:
                log.warning(_(u"Invalid location received: {location}")
                    .format(location=location))
            else:
                session.location = (domain, port)
                # we monkey patch connector to use the new location
                connector = client.xmlstream.transport.connector
                connector.normal_location = connector.host, connector.port
                connector.host = domain
                connector.port = port
                connector.connectionFailed_ori = connector.connectionFailed
                connector.connectionFailed = partial(self._connectionFailed,
                                                     connector=connector)

        # resuming time
        try:
            max_s = int(enabled_elt[u'max'])
        except (ValueError, KeyError) as e:
            if isinstance(e, ValueError):
                log.warning(_(u'Invalid "max" attribute'))
            max_s = RESUME_MAX
            log.info(_(u"Using default session max value ({max_s} s).".format(
                max_s=max_s)))
            log.info(_(u"Stream Management enabled"))
        else:
            log.info(_(
                u"Stream Management enabled, with a resumption time of {res_m} min"
                .format(res_m = max_s/60)))
        session.session_max = max_s

    def onResumed(self, enabled_elt, client):
        session = client._xep_0198_session
        assert not session.enabled
        del session.resuming
        server_acked = int(enabled_elt['h'])
        self.updateBuffer(session, server_acked)
        resend_count = len(session.buffer)
        # we resend all stanza which have not been received properly
        while True:
            try:
                stanza = session.buffer.pop()
            except IndexError:
                break
            else:
                client.send(stanza)
        # now we can continue the session
        session.enabled = True
        d_time = time.time() - session.disconnected_time
        log.info(_(u"Stream session resumed (disconnected for {d_time} s, {count} "
                   u"stanza(s) resent)").format(d_time=int(d_time), count=resend_count))

    def onFailed(self, failed_elt, client):
        session = client._xep_0198_session
        condition_elt = failed_elt.firstChildElement()
        session.reset()

        try:
            del session.resuming
        except AttributeError:
            # stream management can't be started at all
            msg = _(u"Can't use stream management")
            if condition_elt is None:
                log.error(msg + u'.')
            else:
                log.error(_(u"{msg}: {reason}").format(
                msg=msg, reason=condition_elt.name))
        else:
            # only stream resumption failed, we can try full session init
            # XXX: we try to start full session init from this point, with many
            #      variables/attributes already initialised with a potentially different
            #      jid. This is experimental and may not be safe. It may be more
            #      secured to abord the connection and restart everything with a fresh
            #      client.
            msg = _(u"stream resumption not possible, restarting full session")

            if condition_elt is None:
                log.warning(u'{msg}.'.format(msg=msg))
            else:
                log.warning(u"{msg}: {reason}".format(
                    msg=msg, reason=condition_elt.name))
            # stream resumption failed, but we still can do normal stream management
            # we restore attributes as if the session was new, and init stream
            # we keep everything initialized, and only do binding, roster request
            # and initial presence sending.
            if client.conn_deferred.called:
                client.conn_deferred = defer.Deferred()
            else:
                log.error(u"conn_deferred should be called at this point")
            # we need to recreate roster
            client.handlers.remove(client.roster)
            client.roster = client.roster.__class__(self.host)
            client.roster.setHandlerParent(client)
            # bind init is not done when resuming is possible, so we have to do it now
            bind_init = jabber_client.BindInitializer(client.xmlstream)
            bind_init.required = True
            d = bind_init.start()
            # we set the jid, which may have changed
            d.addCallback(lambda __: setattr(client.factory.authenticator, "jid", client.jid))
            # we call the trigger who will send the <enable/> element
            d.addCallback(lambda __: self._XMLInitTrigger(client))
            # then we have to re-request the roster, as changes may have occured
            d.addCallback(lambda __: client.roster.requestRoster())
            # we add got_roster to be sure to have roster before sending initial presence
            d.addCallback(lambda __: client.roster.got_roster)
            # initial presence must be sent manually
            d.addCallback(lambda __: client.presence.available())

    def onReceive(self, element, client):
        session = client._xep_0198_session
        if session.enabled and element.name.lower() in C.STANZA_NAMES:
            session.in_counter += 1 % MAX_COUNTER

    def onSend(self, obj, client):
        session = client._xep_0198_session
        if (session.enabled
            and domish.IElement.providedBy(obj)
            and obj.name.lower() in C.STANZA_NAMES):
            session.out_counter += 1 % MAX_COUNTER
            session.buffer.appendleft(obj)
            self.checkAcks(client)

    def onAckRequest(self, r_elt, client):
        self.sendAck(client)

    def onAckAnswer(self, a_elt, client):
        session = client._xep_0198_session
        session.ack_requested = False
        try:
            server_acked = int(a_elt['h'])
        except ValueError:
            log.warning(_(u"Server returned invalid ack element, disabling stream "
                          u"management: {xml}").format(xml=a_elt))
            session.enabled = False
            return

        if server_acked > session.out_counter:
            log.error(_(u"Server acked more stanzas than we have sent, disabling stream "
                        u"management."))
            session.reset()
            return

        self.updateBuffer(session, server_acked)
        self.checkAcks(client)


class XEP_0198_handler(xmlstream.XMPPHandler):
    implements(iwokkel.IDisco)

    def __init__(self, plugin_parent):
        self.plugin_parent = plugin_parent
        self.host = plugin_parent.host

    def connectionInitialized(self):
        self.xmlstream.addObserver(
            SM_ENABLED, self.plugin_parent.onEnabled, client=self.parent
        )
        self.xmlstream.addObserver(
            SM_RESUMED, self.plugin_parent.onResumed, client=self.parent
        )
        self.xmlstream.addObserver(
            SM_FAILED, self.plugin_parent.onFailed, client=self.parent
        )
        self.xmlstream.addObserver(
            SM_R_REQUEST, self.plugin_parent.onAckRequest, client=self.parent
        )
        self.xmlstream.addObserver(
            SM_A_REQUEST, self.plugin_parent.onAckAnswer, client=self.parent
        )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_SM)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []