view src/core/ @ 1573:6338677f3a89

core (client): added a sendError method to easily build error response from <IQ\> stanza
author Goffi <>
date Wed, 11 Nov 2015 14:56:05 +0100
parents 001b62bed67c
children 5b24d6bf5d15
line wrap: on
line source

# -*- coding: utf-8 -*-

# SAT: a jabber client
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from twisted.internet import task, defer
from twisted.words.protocols.jabber import jid, xmlstream
from twisted.words.protocols.jabber import error
from wokkel import client, disco, xmppim, generic, delay, iwokkel
from sat.core.log import getLogger
log = getLogger(__name__)
from sat.core import exceptions
from calendar import timegm
from zope.interface import implements
from twisted.words.protocols.jabber.xmlstream import XMPPHandler

class SatXMPPClient(client.XMPPClient):

    def __init__(self, host_app, profile, user_jid, password, host=None, port=C.XMPP_C2S_PORT):
        # XXX: DNS SRV records are checked when the host is not specified.
        # If no SRV record is found, the host is directly extracted from the JID.
        client.XMPPClient.__init__(self, user_jid, password, host or None, port or C.XMPP_C2S_PORT)
        self.factory.clientConnectionLost = self.connectionLost
        self.__connected = False
        self.profile = profile
        self.host_app = host_app
        self.conn_deferred = defer.Deferred()
        self._waiting_conf = {}  # callback called when a confirmation is received
        self._progress_cb = {}  # callback called when a progress is requested (key = progress id)

    def getConnectionDeferred(self):
        """Return a deferred which fire when the client is connected"""
        return self.conn_deferred

    def IQ(self, type_=u'set', timeout=None):
        """shortcut to create an IQ element managing deferred

        @param type_(unicode): IQ type ('set' or 'get')
        @param timeout(None, int): timeout in seconds
        iq_elt = xmlstream.IQ(self.xmlstream, type_)
        iq_elt.timeout = timeout
        return iq_elt

    def sendError(self, iq_elt, condition):
        """Send error stanza build from iq_elt

        @param iq_elt(domish.Element): initial IQ element
        @param condition(unicode): error condition
        iq_error_elt = error.StanzaError(condition).toResponse(iq_elt)

    def _authd(self, xmlstream):
        if not self.host_app.trigger.point("XML Initialized", xmlstream, self.profile):
        client.XMPPClient._authd(self, xmlstream)
        self.__connected = True"********** [%s] CONNECTED **********") % self.profile)
        self.host_app.bridge.connected(self.profile, unicode(self.jid))  # we send the signal to the clients

    def streamInitialized(self):
        """Called after _authd"""
        log.debug(_("XML stream is initialized"))
        self.keep_alife = task.LoopingCall(self.xmlstream.send, " ")  # Needed to avoid disconnection (specially with openfire)

        self.disco = SatDiscoProtocol(self)
        self.discoHandler = disco.DiscoHandler()
        disco_d = defer.succeed(None)

        if not self.host_app.trigger.point("Disco handled", disco_d, self.profile):

        def finish_connection(dummy):


    def initializationFailed(self, reason):
        log.error(_(u"ERROR: XMPP connection failed for profile '%(profile)s': %(reason)s" % {'profile': self.profile, 'reason': reason}))
            client.XMPPClient.initializationFailed(self, reason)
            # we already chained an errback, no need to raise an exception

    def isConnected(self):
        return self.__connected

    def connectionLost(self, connector, unused_reason):
        except AttributeError:
            log.debug(_("No keep_alife"))
        if self.__connected:
  "********** [%s] DISCONNECTED **********") % self.profile)
            self.host_app.bridge.disconnected(self.profile)  # we send the signal to the clients
            self.host_app.purgeClient(self.profile)  # and we remove references to this client
        self.__connected = False

class SatMessageProtocol(xmppim.MessageProtocol):

    def __init__(self, host):
        xmppim.MessageProtocol.__init__(self) = host

    def onMessage(self, message):
        if not message.hasAttribute('from'):
            message['from'] =
        log.debug(_(u"got message from: %s") % message["from"])
        post_treat = defer.Deferred() # XXX: plugin can add their treatments to this deferred

        if not"MessageReceived", message, post_treat, profile=self.parent.profile):

        data = {"from": message['from'],
                "to": message['to'],
                "body": "",
                "extra": {}}

        for e in message.elements():
            if == "body":
                data['body'] = e.children[0] if e.children else ""
            elif == "subject" and e.children:
                data['extra']['subject'] = e.children[0]

        data['type'] = message['type'] if message.hasAttribute('type') else 'normal'

        def bridgeSignal(data):
            if data is not None:
      ['from'], data['body'], data['type'], data['to'], data['extra'], profile=self.parent.profile)
            return data

        def addToHistory(data):
            # set message body to empty string by default, and later
            # also forward message without body (chat state notification...)
                _delay = delay.Delay.fromElement(filter(lambda elm: == 'delay', message.elements())[0])
                timestamp = timegm(_delay.stamp.utctimetuple())
                data['extra']['archive'] = str(timestamp)
                if data['type'] != 'groupchat':  # XXX: we don't save delayed messages in history for groupchats
                    #TODO: add delayed messages to history if they aren't already in it
          ['from']), jid.JID(data['to']), data['body'], data['type'], data['extra'], timestamp, profile=self.parent.profile)
            except IndexError:
      ['from']), jid.JID(data['to']), data['body'], data['type'], data['extra'], profile=self.parent.profile)
            return data

        def treatmentsEb(failure):
            return data

        def cancelErrorTrap(failure):
            """A message sending can be cancelled by a plugin treatment"""


class SatRosterProtocol(xmppim.RosterClientProtocol):

    def __init__(self, host):
        xmppim.RosterClientProtocol.__init__(self) = host
        self.got_roster = defer.Deferred() # called when roster is received and ready
        #XXX: the two following dicts keep a local copy of the roster
        self._groups = {}  # map from groups to jids: key=group value=set of jids
        self._jids = None  # map from jids to RosterItem: key=jid value=RosterItem

    def rosterCb(self, roster):
        assert roster is not None # FIXME: must be managed with roster versioning
        self._jids = roster
        for item in roster.itervalues():
            if not item.subscriptionTo and not item.subscriptionFrom and not item.ask:
                #XXX: current behaviour: we don't want contact in our roster list
                # if there is no presence subscription
                # may change in the future
      "Removing contact {} from roster because there is no presence subscription".format(item.jid))
                self.removeItem(item.entity) # FIXME: to be checked

    def _registerItem(self, item):
        """Register item in local cache

        item must be already registered in self._jids before this method is called
        @param item (RosterIem): item added
        log.debug(u"registering item: {}".format(item.entity.full()))
        if item.entity.resource:
            log.warning(u"Received a roster item with a resource, this is not common but not restricted by RFC 6121, this case may be not well tested.")
        if not item.subscriptionTo:
            if not item.subscriptionFrom:
      "There's no subscription between you and [{}]!").format(item.entity.full()))
      "You are not subscribed to [{}]!").format(item.entity.full()))
        if not item.subscriptionFrom:
  "[{}] is not subscribed to you!").format(item.entity.full()))

        for group in item.groups:
            self._groups.setdefault(group, set()).add(item.entity)

    def requestRoster(self):
        """ ask the server for Roster list """
        d = self.getRoster().addCallback(self.rosterCb)

    def removeItem(self, to_jid):
        """Remove a contact from roster list
        @param to_jid: a JID instance
        @return: Deferred
        return xmppim.RosterClientProtocol.removeItem(self, to_jid)

    def getAttributes(self, item):
        """Return dictionary of attributes as used in bridge from a RosterItem
        @param item: RosterItem
        @return: dictionary of attributes"""
        item_attr = {'to': unicode(item.subscriptionTo),
                     'from': unicode(item.subscriptionFrom),
                     'ask': unicode(item.ask)
            item_attr['name'] =
        return item_attr

    def setReceived(self, request):
        #TODO: implement roster versioning (cf RFC 6121 §2.6)
        item = request.item
        try:  # update the cache for the groups the contact has been removed from
            left_groups = set(self._jids[item.entity].groups).difference(item.groups)
            for group in left_groups:
                jids_set = self._groups[group]
                if not jids_set:
                    del self._groups[group]
        except KeyError:
            pass  # no previous item registration (or it's been cleared)
        self._jids[item.entity] = item
        self._registerItem(item), self.getAttributes(item), item.groups, self.parent.profile)

    def removeReceived(self, request):
        entity = request.item.entity"removing %s from roster list" % entity.full())

        # we first remove item from local cache (self._groups and self._jids)
            item = self._jids.pop(entity)
        except KeyError:
            log.error(u"Received a roster remove event for an item not in cache ({})".format(entity))
        for group in item.groups:
                jids_set = self._groups[group]
                if not jids_set:
                    del self._groups[group]
            except KeyError:
                log.warning(u"there is no cache for the group [%(group)s] of the removed roster item [%(jid)s]" %
                            {"group": group, "jid": entity})

        # then we send the bridge signal, self.parent.profile)

    def getGroups(self):
        """Return a list of groups"""
        return self._groups.keys()

    def getItem(self, entity_jid):
        """Return RosterItem for a given jid

        @param entity_jid: jid of the contact
        @return: RosterItem or None if contact is not in cache
        return self._jids.get(entity_jid, None)

    def getJids(self):
        """Return all jids of the roster"""
        return self._jids.keys()

    def isJidInRoster(self, entity_jid):
        """Return True if jid is in roster"""
        return entity_jid in self._jids

    def getItems(self):
        """Return all items of the roster"""
        return self._jids.values()

    def getJidsFromGroup(self, group):
            return self._groups[group]
        except KeyError:
            raise exceptions.UnknownGroupError(group)

    def getJidsSet(self, type_, groups=None):
        """Helper method to get a set of jids

        @param type_(unicode): one of:
            C.ALL: get all jids from roster
            C.GROUP: get jids from groups (listed in "groups")
        @groups(list[unicode]): list of groups used if type_==C.GROUP
        @return (set(jid.JID)): set of selected jids
        if type_ == C.ALL and groups is not None:
            raise ValueError('groups must not be set for {} type'.format(C.ALL))

        if type_ == C.ALL:
            return set(self.getJids())
        elif type_ == C.GROUP:
            jids = set()
            for group in groups:
            return jids
            raise ValueError(u'Unexpected type_ {}'.format(type_))

class SatPresenceProtocol(xmppim.PresenceClientProtocol):

    def __init__(self, host):
        xmppim.PresenceClientProtocol.__init__(self) = host

    def send(self, obj):
        if not"Presence send", self.parent, obj):
        super(SatPresenceProtocol, self).send(obj)

    def availableReceived(self, entity, show=None, statuses=None, priority=0):
        log.debug(_(u"presence update for [%(entity)s] (available, show=%(show)s statuses=%(statuses)s priority=%(priority)d)") % {'entity': entity, C.PRESENCE_SHOW: show, C.PRESENCE_STATUSES: statuses, C.PRESENCE_PRIORITY: priority})

        if not statuses:
            statuses = {}

        if None in statuses:  # we only want string keys
            statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop(None)

        if not"presenceReceived", entity, show, priority, statuses, self.parent.profile):
            return, show or "",
                                           int(priority), statuses,

        # now it's time to notify frontends, show or "",
                                        int(priority), statuses,

    def unavailableReceived(self, entity, statuses=None):
        log.debug(_(u"presence update for [%(entity)s] (unavailable, statuses=%(statuses)s)") % {'entity': entity, C.PRESENCE_STATUSES: statuses})

        if not statuses:
            statuses = {}

        if None in statuses:  # we only want string keys
            statuses[C.PRESENCE_STATUSES_DEFAULT] = statuses.pop(None)

        if not"presenceReceived", entity, "unavailable", 0, statuses, self.parent.profile):
            return, C.PRESENCE_UNAVAILABLE, 0, statuses, self.parent.profile)

        # now it's time to notify frontends, "unavailable", 0, statuses, self.parent.profile)

    def available(self, entity=None, show=None, statuses=None, priority=None):
        """Set a presence and statuses.

        @param entity (jid.JID): entity
        @param show (unicode): value in ('unavailable', '', 'away', 'xa', 'chat', 'dnd')
        @param statuses (dict{unicode: unicode}): multilingual statuses with
            the entry key beeing a language code on 2 characters or "default".
        if priority is None:
                priority = int("Priority", "Connection", profile_key=self.parent.profile))
            except ValueError:
                priority = 0

        if statuses is None:
            statuses = {}

        # default for us is None for wokkel
        # so we must temporarily switch to wokkel's convention...
        if C.PRESENCE_STATUSES_DEFAULT in statuses:
            statuses[None] = statuses.pop(C.PRESENCE_STATUSES_DEFAULT)

        presence_elt = xmppim.AvailablePresence(entity, show, statuses, priority)

        # ... before switching back
        if None in statuses:
            statuses['default'] = statuses.pop(None)

        if not"presence_available", presence_elt, self.parent):

    def subscribed(self, entity):
        xmppim.PresenceClientProtocol.subscribed(self, entity), self.parent.profile)
        item = self.parent.roster.getItem(entity)
        if not item or not item.subscriptionTo:  # we automatically subscribe to 'to' presence
            log.debug(_('sending automatic "from" subscription request'))

    def unsubscribed(self, entity):
        xmppim.PresenceClientProtocol.unsubscribed(self, entity), self.parent.profile)

    def subscribedReceived(self, entity):
        log.debug(_(u"subscription approved for [%s]") % entity.userhost())'subscribed', entity.userhost(), self.parent.profile)

    def unsubscribedReceived(self, entity):
        log.debug(_(u"unsubscription confirmed for [%s]") % entity.userhost())'unsubscribed', entity.userhost(), self.parent.profile)

    def subscribeReceived(self, entity):
        log.debug(_(u"subscription request from [%s]") % entity.userhost())
        item = self.parent.roster.getItem(entity)
        if item and item.subscriptionTo:
            # We automatically accept subscription if we are already subscribed to contact presence
            log.debug(_('sending automatic subscription acceptance'))
  'subscribe', entity.userhost(), self.parent.profile)
  'subscribe', entity.userhost(), self.parent.profile)

    def unsubscribeReceived(self, entity):
        log.debug(_(u"unsubscription asked for [%s]") % entity.userhost())
        item = self.parent.roster.getItem(entity)
        if item and item.subscriptionFrom:  # we automatically remove contact
            log.debug(_('automatic contact deletion'))
  , self.parent.profile)'unsubscribe', entity.userhost(), self.parent.profile)

class SatDiscoProtocol(disco.DiscoClientProtocol):
    def __init__(self, host):

class SatFallbackHandler(generic.FallbackHandler):
    def __init__(self, host):

    def iqFallback(self, iq):
        if iq.handled is True:
        log.debug(u"iqFallback: xml = [%s]" % (iq.toXml()))
        generic.FallbackHandler.iqFallback(self, iq)

class RegisteringAuthenticator(xmlstream.ConnectAuthenticator):

    def __init__(self, host, jabber_host, user_login, user_pass, email, deferred, profile):
        xmlstream.ConnectAuthenticator.__init__(self, jabber_host) = host
        self.jabber_host = jabber_host
        self.user_login = user_login
        self.user_pass = user_pass
        self.user_email = email
        self.deferred = deferred
        self.profile = profile
        log.debug(_(u"Registration asked for %(user)s@%(host)s") % {'user': user_login, 'host': jabber_host})

    def connectionMade(self):
        log.debug(_(u"Connection made with %s" % self.jabber_host))
        self.xmlstream.namespace = "jabber:client"

        iq = xmlstream.IQ(self.xmlstream, 'set')
        iq["to"] = self.jabber_host
        query = iq.addElement(('jabber:iq:register', 'query'))
        _user = query.addElement('username')
        _pass = query.addElement('password')
        if self.user_email:
            _email = query.addElement('email')
        d = iq.send(self.jabber_host).addCallbacks(self.registrationAnswer, self.registrationFailure)

    def registrationAnswer(self, answer):
        log.debug(_(u"Registration answer: %s") % answer.toXml())

    def registrationFailure(self, failure):"Registration failure: %s") % unicode(failure.value))
        raise failure.value

class SatVersionHandler(generic.VersionHandler):

    def getDiscoInfo(self, requestor, target, node):
        #XXX: We need to work around wokkel's behaviour (namespace not added if there is a
        # node) as it cause issues with XEP-0115 & PEP (XEP-0163): there is a node when server
        # ask for disco info, and not when we generate the key, so the hash is used with different
        # disco features, and when the server (seen on ejabberd) generate its own hash for security check
        # it reject our features (resulting in e.g. no notification on PEP)
        return generic.VersionHandler.getDiscoInfo(self, requestor, target, None)

class SatIdentityHandler(XMPPHandler):
    """ Manage disco Identity of SàT. Currently, we use "client/pc/Salut à Toi", but as
    SàT is multi-frontends and can be used on mobile devices, as a bot, with a web frontend,
    etc, we should implement a way to dynamically update identities through the bridge """
    #TODO: dynamic identity update (see docstring). Note that a XMPP entity can have several identities

    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        return [disco.DiscoIdentity(u"client", u"pc", C.APP_NAME)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        return []