view src/memory/disco.py @ 1963:a2bc5089c2eb

backend, frontends: message refactoring (huge commit): /!\ several features are temporarily disabled, like notifications in frontends next step in refactoring, with the following changes: - jp: updated jp message to follow changes in backend/bridge - jp: added --lang, --subject, --subject_lang, and --type options to jp message + fixed unicode handling for jid - quick_frontend (QuickApp, QuickChat): - follow backend changes - refactored chat, message are now handled in OrderedDict and uid are kept so they can be updated - Message and Occupant classes handle metadata, so frontend just have to display them - Primitivus (Chat): - follow backend/QuickFrontend changes - info & standard messages are handled in the same MessageWidget class - improved/simplified handling of messages, removed update() method - user joined/left messages are merged when next to each other - a separator is shown when message is received while widget is out of focus, so user can quickly see the new messages - affiliation/role are shown (in a basic way for now) in occupants panel - removed "/me" messages handling, as it will be done by a backend plugin - message language is displayed when available (only one language per message for now) - fixed :history and :search commands - core (constants): new constants for messages type, XML namespace, entity type - core: *Message methods renamed to follow new code sytle (e.g. sendMessageToBridge => messageSendToBridge) - core (messages handling): fixed handling of language - core (messages handling): mes_data['from'] and ['to'] are now jid.JID - core (core.xmpp): reorganised message methods, added getNick() method to client.roster - plugin text commands: fixed plugin and adapted to new messages behaviour. client is now used in arguments instead of profile - plugins: added information for cancellation reason in CancelError calls - plugin XEP-0045: various improvments, but this plugin still need work: - trigger is used to avoid message already handled by the plugin to be handled a second time - changed the way to handle history, the last message from DB is checked and we request only messages since this one, in seconds (thanks Poezio folks :)) - subject reception is waited before sending the roomJoined signal, this way we are sure that everything including history is ready - cmd_* method now follow the new convention with client instead of profile - roomUserJoined and roomUserLeft messages are removed, the events are now handled with info message with a "ROOM_USER_JOINED" info subtype - probably other forgotten stuffs :p
author Goffi <goffi@goffi.org>
date Mon, 20 Jun 2016 18:41:53 +0200
parents 2daf7b4c6756
children efe31f0881fb
line wrap: on
line source

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# SAT: a jabber client
# Copyright (C) 2009-2016 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core import exceptions
from sat.core.log import getLogger
log = getLogger(__name__)
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.error import StanzaError
from twisted.internet import defer
from twisted.internet import reactor
from twisted.python import failure
from sat.core.constants import Const as C
from sat.tools import xml_tools
from sat.memory import persistent
from wokkel import disco
from base64 import b64encode
from hashlib import sha1


TIMEOUT = 15
CAP_HASH_ERROR = 'ERROR'

class HashGenerationError(Exception):
    pass


class ByteIdentity(object):
    """This class manage identity as bytes (needed for i;octet sort), it is used for the hash generation"""

    def __init__(self, identity, lang=None):
        assert isinstance(identity, disco.DiscoIdentity)
        self.category = identity.category.encode('utf-8')
        self.idType = identity.type.encode('utf-8')
        self.name = identity.name.encode('utf-8') if identity.name else ''
        self.lang = lang.encode('utf-8') if lang is not None else ''

    def __str__(self):
        return "%s/%s/%s/%s" % (self.category, self.idType, self.lang, self.name)


class HashManager(object):
    """map object which manage hashes

    persistent storage is update when a new hash is added
    """

    def __init__(self, persistent):
        self.hashes = {
            CAP_HASH_ERROR: disco.DiscoInfo(), # used when we can't get disco infos
            }
        self.persistent = persistent

    def __getitem__(self, key):
        return self.hashes[key]

    def __setitem__(self, hash_, disco_info):
        if hash_ in self.hashes:
            log.debug(u"ignoring hash set: it is already known")
            return
        self.hashes[hash_] = disco_info
        self.persistent[hash_] = disco_info.toElement().toXml()

    def __contains__(self, hash_):
        return self.hashes.__contains__(hash_)

    def load(self):
        def fillHashes(hashes):
            for hash_, xml in hashes.iteritems():
                element = xml_tools.ElementParser()(xml)
                self.hashes[hash_] = disco.DiscoInfo.fromElement(element)
            log.info(u"Disco hashes loaded")
        d = self.persistent.load()
        d.addCallback(fillHashes)
        return d


class Discovery(object):
    """ Manage capabilities of entities """

    def __init__(self, host):
        self.host = host
        # TODO: remove legacy hashes

    def load(self):
        """Load persistent hashes"""
        self.hashes = HashManager(persistent.PersistentDict("disco"))
        return self.hashes.load()

    @defer.inlineCallbacks
    def hasFeature(self, feature, jid_=None, profile=C.PROF_KEY_NONE):
        """Tell if an entity has the required feature

        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param profile: %(doc_profile)s
        @return: a Deferred which fire a boolean (True if feature is available)
        """
        disco_infos = yield self.getInfos(jid_, profile)
        defer.returnValue(feature in disco_infos.features)

    @defer.inlineCallbacks
    def checkFeature(self, feature, jid_=None, profile=C.PROF_KEY_NONE):
        """Like hasFeature, but raise an exception is feature is not Found

        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param profile: %(doc_profile)s

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.getInfos(jid_, profile)
        if not feature in disco_infos.features:
            raise failure.Failure(exceptions.FeatureNotFound)

    @defer.inlineCallbacks
    def checkFeatures(self, features, jid_=None, identity=None, profile=C.PROF_KEY_NONE):
        """Like checkFeature, but check several features at once, and check also identity

        @param features(iterable[unicode]): features to check
        @param jid_(jid.JID): jid of the target, or None for profile's server
        @param identity(None, tuple(unicode, unicode): if not None, the entity must have an identity with this (category, type) tuple
        @param profile: %(doc_profile)s

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.getInfos(jid_, profile)
        if not set(features).issubset(disco_infos.features):
            raise failure.Failure(exceptions.FeatureNotFound())

        if identity is not None and identity not in disco_infos.identities:
            raise failure.Failure(exceptions.FeatureNotFound())

    def getInfos(self, jid_=None, profile=C.PROF_KEY_NONE):
        """get disco infos from jid_, filling capability hash if needed

        @param jid_: jid of the target, or None for profile's server
        @param profile: %(doc_profile)s
        @return: a Deferred which fire disco.DiscoInfo
        """
        client = self.host.getClient(profile)
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        try:
            cap_hash = self.host.memory.getEntityData(jid_, [C.ENTITY_CAP_HASH], client.profile)[C.ENTITY_CAP_HASH]
        except (KeyError, exceptions.UnknownEntityError):
            # capability hash is not available, we'll compute one
            def infosCb(disco_infos):
                cap_hash = self.generateHash(disco_infos)
                self.hashes[cap_hash] = disco_infos
                self.host.memory.updateEntityData(jid_, C.ENTITY_CAP_HASH, cap_hash, profile_key=client.profile)
                return disco_infos
            def infosEb(fail):
                if fail.check(defer.CancelledError):
                    reason = u"request time-out"
                else:
                    try:
                        reason = unicode(fail.value)
                    except AttributeError:
                        reason = unicode(fail)
                    log.warning(u"Error while requesting disco infos from {jid}: {reason}".format(jid=jid_.full(), reason=reason))
                self.host.memory.updateEntityData(jid_, C.ENTITY_CAP_HASH, CAP_HASH_ERROR, profile_key=client.profile)
                disco_infos = self.hashes[CAP_HASH_ERROR]
                return disco_infos
            d = client.disco.requestInfo(jid_)
            d.addCallback(infosCb)
            d.addErrback(infosEb)
            return d
        else:
            disco_infos = self.hashes[cap_hash]
            return defer.succeed(disco_infos)

    @defer.inlineCallbacks
    def getItems(self, jid_=None, nodeIdentifier='', profile=C.PROF_KEY_NONE):
        """get disco items from jid_, cache them for our own server

        @param jid_ (jid.JID): jid of the target, or None for profile's server
        @param nodeIdentifier (str): optional node identifier
        @param profile: %(doc_profile)s
        @return: a Deferred which fire disco.DiscoItems
        """
        client = self.host.getClient(profile)
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
            # we cache items only for our own server
            try:
                items = self.host.memory.getEntityData(jid_, ["DISCO_ITEMS"], client.profile)["DISCO_ITEMS"]
                log.debug(u"[%s] disco items are in cache" % jid_.full())
            except (KeyError, exceptions.UnknownEntityError):
                log.debug(u"Caching [%s] disco items" % jid_.full())
                items = yield client.disco.requestItems(jid_, nodeIdentifier)
                self.host.memory.updateEntityData(jid_, "DISCO_ITEMS", items, profile_key=client.profile)
        else:
            try:
                items = yield client.disco.requestItems(jid_, nodeIdentifier)
            except StanzaError as e:
                log.warning(u"Error while requesting items for {jid}: {reason}"
                    .format(jid=jid_.full(), reason=e.condition))
                items = disco.DiscoItems()

        defer.returnValue(items)


    def _infosEb(self, failure_, entity_jid):
        failure_.trap(StanzaError)
        log.warning(_(u"Error while requesting [%(jid)s]: %(error)s") % {'jid': entity_jid.full(),
                                                                    'error': failure_.getErrorMessage()})

    def findServiceEntities(self, category, type_, jid_=None, profile=C.PROF_KEY_NONE):
        """Return all available items of an entity which correspond to (category, type_)

        @param category: identity's category
        @param type_: identitiy's type
        @param jid_: the jid of the target server (None for profile's server)
        @param profile: %(doc_profile)s
        @return: a set of found entities
        @raise defer.CancelledError: the request timed out
        """
        found_entities = set()

        def infosCb(infos, entity_jid):
            if (category, type_) in infos.identities:
                found_entities.add(entity_jid)

        def gotItems(items):
            defers_list = []
            for item in items:
                info_d = self.getInfos(item.entity, profile)
                info_d.addCallbacks(infosCb, self._infosEb, [item.entity], None, [item.entity])
                defers_list.append(info_d)
            return defer.DeferredList(defers_list)

        d = self.getItems(jid_, profile=profile)
        d.addCallback(gotItems)
        d.addCallback(lambda dummy: found_entities)
        reactor.callLater(TIMEOUT, d.cancel) # FIXME: one bad service make a general timeout
        return d

    def findFeaturesSet(self, features, identity=None, jid_=None, profile=C.PROF_KEY_NONE):
        """Return entities (including jid_ and its items) offering features

        @param features: iterable of features which must be present
        @param identity(None, tuple(unicode, unicode)): if not None, accept only this (category/type) identity
        @param jid_: the jid of the target server (None for profile's server)
        @param profile: %(doc_profile)s
        @return: a set of found entities
        """
        client = self.host.getClient(profile)
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        features = set(features)
        found_entities = set()

        def infosCb(infos, entity):
            if identity is not None and identity not in infos.identities:
                return
            if features.issubset(infos.features):
                found_entities.add(entity)

        def gotItems(items):
            defer_list = []
            for entity in [jid_] + [item.entity for item in items]:
                infos_d = self.getInfos(entity, profile)
                infos_d.addCallbacks(infosCb, self._infosEb, [entity], None, [entity])
                defer_list.append(infos_d)
            return defer.DeferredList(defer_list)

        d = self.getItems(jid_, profile=profile)
        d.addCallback(gotItems)
        d.addCallback(lambda dummy: found_entities)
        reactor.callLater(TIMEOUT, d.cancel) # FIXME: one bad service make a general timeout
        return d

    def generateHash(self, services):
        """ Generate a unique hash for given service

        hash algorithm is the one described in XEP-0115
        @param services: iterable of disco.DiscoIdentity/disco.DiscoFeature, as returned by discoHandler.info

        """
        s = []
        byte_identities = [ByteIdentity(service) for service in services if isinstance(service, disco.DiscoIdentity)]  # FIXME: lang must be managed here
        byte_identities.sort(key=lambda i: i.lang)
        byte_identities.sort(key=lambda i: i.idType)
        byte_identities.sort(key=lambda i: i.category)
        for identity in byte_identities:
            s.append(str(identity))
            s.append('<')
        byte_features = [service.encode('utf-8') for service in services if isinstance(service, disco.DiscoFeature)]
        byte_features.sort()  # XXX: the default sort has the same behaviour as the requested RFC 4790 i;octet sort
        for feature in byte_features:
            s.append(feature)
            s.append('<')
        #TODO: manage XEP-0128 data form here
        cap_hash = b64encode(sha1(''.join(s)).digest())
        log.debug(_(u'Capability hash generated: [%s]') % cap_hash)
        return cap_hash

    @defer.inlineCallbacks
    def _discoInfos(self, entity_jid_s, profile_key=C.PROF_KEY_NONE):
        """ Discovery method for the bridge
        @param entity_jid_s: entity we want to discover

        @return: list of tuples
        """
        profile = self.host.memory.getProfileName(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_infos = yield self.getInfos(entity, profile)
        extensions = {}
        for form_type, form in disco_infos.extensions.items():
            fields = []
            for field in form.fieldList:
                data = {'type': field.fieldType}
                for attr in ('var', 'label', 'desc'):
                    value = getattr(field, attr)
                    if value is not None:
                        data[attr] = value

                values = [field.value] if field.value is not None else field.values
                fields.append((data, values))

            extensions[form_type or ""] = fields

        defer.returnValue((disco_infos.features,
            [(cat, type_, name or '') for (cat, type_), name in disco_infos.identities.items()],
            extensions))

    @defer.inlineCallbacks
    def _discoItems(self, entity_jid_s, profile_key=C.PROF_KEY_NONE):
        """ Discovery method for the bridge
        @param entity_jid_s: entity we want to discover

        @return: list of tuples"""
        profile = self.host.memory.getProfileName(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_items = yield self.getItems(entity, profile=profile)
        defer.returnValue([(item.entity.full(), item.nodeIdentifier or '', item.name or '') for item in disco_items])