view sat/memory/disco.py @ 2701:2ea2369ae7de

plugin XEP-0313: implementation of MAM for messages: - (core/xmpp): new messageGetBridgeArgs to easily retrieve arguments used in bridge from message data - : parseMessage is not static anymore - : new "message_parse" trigger point - (xep-0313) : new "MAMGet" bridge method to retrieve history from MAM instead of local one - : on profileConnected, if previous MAM message is found (i.e. message with a stanza_id), message received while offline are retrieved and injected in message workflow. In other words, one2one history is synchronised on connection. - : new "parseExtra" method which parse MAM (and optionally RSM) option from extra dictionary used in bridge.
author Goffi <goffi@goffi.org>
date Sat, 01 Dec 2018 10:33:43 +0100
parents 56f94936df1e
children c3f59c1dcb0a
line wrap: on
line source

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# SAT: a jabber client
# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core import exceptions
from sat.core.log import getLogger

log = getLogger(__name__)
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.error import StanzaError
from twisted.internet import defer
from twisted.internet import reactor
from twisted.python import failure
from sat.core.constants import Const as C
from sat.tools import xml_tools
from sat.memory import persistent
from wokkel import disco
from base64 import b64encode
from hashlib import sha1


TIMEOUT = 15
CAP_HASH_ERROR = "ERROR"


class HashGenerationError(Exception):
    pass


class ByteIdentity(object):
    """This class manage identity as bytes (needed for i;octet sort), it is used for the hash generation"""

    def __init__(self, identity, lang=None):
        assert isinstance(identity, disco.DiscoIdentity)
        self.category = identity.category.encode("utf-8")
        self.idType = identity.type.encode("utf-8")
        self.name = identity.name.encode("utf-8") if identity.name else ""
        self.lang = lang.encode("utf-8") if lang is not None else ""

    def __str__(self):
        return "%s/%s/%s/%s" % (self.category, self.idType, self.lang, self.name)


class HashManager(object):
    """map object which manage hashes

    persistent storage is update when a new hash is added
    """

    def __init__(self, persistent):
        self.hashes = {
            CAP_HASH_ERROR: disco.DiscoInfo()  # used when we can't get disco infos
        }
        self.persistent = persistent

    def __getitem__(self, key):
        return self.hashes[key]

    def __setitem__(self, hash_, disco_info):
        if hash_ in self.hashes:
            log.debug(u"ignoring hash set: it is already known")
            return
        self.hashes[hash_] = disco_info
        self.persistent[hash_] = disco_info.toElement().toXml()

    def __contains__(self, hash_):
        return self.hashes.__contains__(hash_)

    def load(self):
        def fillHashes(hashes):
            for hash_, xml in hashes.iteritems():
                element = xml_tools.ElementParser()(xml)
                disco_info = disco.DiscoInfo.fromElement(element)
                if not disco_info.features and not disco_info.identities:
                    log.warning(
                        _(
                            u"no feature/identity found in disco element (hash: {cap_hash}), ignoring: {xml}"
                        ).format(cap_hash=hash_, xml=xml)
                    )
                else:
                    self.hashes[hash_] = disco_info

            log.info(u"Disco hashes loaded")

        d = self.persistent.load()
        d.addCallback(fillHashes)
        return d


class Discovery(object):
    """ Manage capabilities of entities """

    def __init__(self, host):
        self.host = host
        # TODO: remove legacy hashes

    def load(self):
        """Load persistent hashes"""
        self.hashes = HashManager(persistent.PersistentDict("disco"))
        return self.hashes.load()

    @defer.inlineCallbacks
    def hasFeature(self, client, feature, jid_=None, node=u""):
        """Tell if an entity has the required feature

        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @return: a Deferred which fire a boolean (True if feature is available)
        """
        disco_infos = yield self.getInfos(client, jid_, node)
        defer.returnValue(feature in disco_infos.features)

    @defer.inlineCallbacks
    def checkFeature(self, client, feature, jid_=None, node=u""):
        """Like hasFeature, but raise an exception is feature is not Found

        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.getInfos(client, jid_, node)
        if not feature in disco_infos.features:
            raise failure.Failure(exceptions.FeatureNotFound)

    @defer.inlineCallbacks
    def checkFeatures(self, client, features, jid_=None, identity=None, node=u""):
        """Like checkFeature, but check several features at once, and check also identity

        @param features(iterable[unicode]): features to check
        @param jid_(jid.JID): jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param identity(None, tuple(unicode, unicode): if not None, the entity must have an identity with this (category, type) tuple

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.getInfos(client, jid_, node)
        if not set(features).issubset(disco_infos.features):
            raise failure.Failure(exceptions.FeatureNotFound())

        if identity is not None and identity not in disco_infos.identities:
            raise failure.Failure(exceptions.FeatureNotFound())

    def getInfos(self, client, jid_=None, node=u"", use_cache=True):
        """get disco infos from jid_, filling capability hash if needed

        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param use_cache(bool): if True, use cached data if available
        @return: a Deferred which fire disco.DiscoInfo
        """
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        try:
            if not use_cache:
                # we ignore cache, so we pretend we haven't found it
                raise KeyError
            cap_hash = self.host.memory.getEntityData(
                jid_, [C.ENTITY_CAP_HASH], client.profile
            )[C.ENTITY_CAP_HASH]
        except (KeyError, exceptions.UnknownEntityError):
            # capability hash is not available, we'll compute one
            def infosCb(disco_infos):
                cap_hash = self.generateHash(disco_infos)
                self.hashes[cap_hash] = disco_infos
                self.host.memory.updateEntityData(
                    jid_, C.ENTITY_CAP_HASH, cap_hash, profile_key=client.profile
                )
                return disco_infos

            def infosEb(fail):
                if fail.check(defer.CancelledError):
                    reason = u"request time-out"
                else:
                    try:
                        reason = unicode(fail.value)
                    except AttributeError:
                        reason = unicode(fail)
                    log.warning(
                        u"Error while requesting disco infos from {jid}: {reason}".format(
                            jid=jid_.full(), reason=reason
                        )
                    )
                self.host.memory.updateEntityData(
                    jid_, C.ENTITY_CAP_HASH, CAP_HASH_ERROR, profile_key=client.profile
                )
                disco_infos = self.hashes[CAP_HASH_ERROR]
                return disco_infos

            d = client.disco.requestInfo(jid_, nodeIdentifier=node)
            d.addCallback(infosCb)
            d.addErrback(infosEb)
            return d
        else:
            disco_infos = self.hashes[cap_hash]
            return defer.succeed(disco_infos)

    @defer.inlineCallbacks
    def getItems(self, client, jid_=None, node=u"", use_cache=True):
        """get disco items from jid_, cache them for our own server

        @param jid_(jid.JID): jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param use_cache(bool): if True, use cached data if available
        @return: a Deferred which fire disco.DiscoItems
        """
        server_jid = jid.JID(client.jid.host)
        if jid_ is None:
            jid_ = server_jid

        if jid_ == server_jid and not node:
            # we cache items only for our own server and if node is not set
            try:
                items = self.host.memory.getEntityData(
                    jid_, ["DISCO_ITEMS"], client.profile
                )["DISCO_ITEMS"]
                log.debug(u"[%s] disco items are in cache" % jid_.full())
                if not use_cache:
                    # we ignore cache, so we pretend we haven't found it
                    raise KeyError
            except (KeyError, exceptions.UnknownEntityError):
                log.debug(u"Caching [%s] disco items" % jid_.full())
                items = yield client.disco.requestItems(jid_, nodeIdentifier=node)
                self.host.memory.updateEntityData(
                    jid_, "DISCO_ITEMS", items, profile_key=client.profile
                )
        else:
            try:
                items = yield client.disco.requestItems(jid_, nodeIdentifier=node)
            except StanzaError as e:
                log.warning(
                    u"Error while requesting items for {jid}: {reason}".format(
                        jid=jid_.full(), reason=e.condition
                    )
                )
                items = disco.DiscoItems()

        defer.returnValue(items)

    def _infosEb(self, failure_, entity_jid):
        failure_.trap(StanzaError)
        log.warning(
            _(u"Error while requesting [%(jid)s]: %(error)s")
            % {"jid": entity_jid.full(), "error": failure_.getErrorMessage()}
        )

    def findServiceEntity(self, client, category, type_, jid_=None):
        """Helper method to find first available entity from findServiceEntities

        args are the same as for [findServiceEntities]
        @return (jid.JID, None): found entity
        """
        d = self.host.findServiceEntities(client, "pubsub", "service")
        d.addCallback(lambda entities: entities.pop() if entities else None)
        return d

    def findServiceEntities(self, client, category, type_, jid_=None):
        """Return all available items of an entity which correspond to (category, type_)

        @param category: identity's category
        @param type_: identitiy's type
        @param jid_: the jid of the target server (None for profile's server)
        @return: a set of found entities
        @raise defer.CancelledError: the request timed out
        """
        found_entities = set()

        def infosCb(infos, entity_jid):
            if (category, type_) in infos.identities:
                found_entities.add(entity_jid)

        def gotItems(items):
            defers_list = []
            for item in items:
                info_d = self.getInfos(client, item.entity)
                info_d.addCallbacks(
                    infosCb, self._infosEb, [item.entity], None, [item.entity]
                )
                defers_list.append(info_d)
            return defer.DeferredList(defers_list)

        d = self.getItems(client, jid_)
        d.addCallback(gotItems)
        d.addCallback(lambda dummy: found_entities)
        reactor.callLater(
            TIMEOUT, d.cancel
        )  # FIXME: one bad service make a general timeout
        return d

    def findFeaturesSet(self, client, features, identity=None, jid_=None):
        """Return entities (including jid_ and its items) offering features

        @param features: iterable of features which must be present
        @param identity(None, tuple(unicode, unicode)): if not None, accept only this (category/type) identity
        @param jid_: the jid of the target server (None for profile's server)
        @param profile: %(doc_profile)s
        @return: a set of found entities
        """
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        features = set(features)
        found_entities = set()

        def infosCb(infos, entity):
            if entity is None:
                log.warning(_(u"received an item without jid"))
                return
            if identity is not None and identity not in infos.identities:
                return
            if features.issubset(infos.features):
                found_entities.add(entity)

        def gotItems(items):
            defer_list = []
            for entity in [jid_] + [item.entity for item in items]:
                infos_d = self.getInfos(client, entity)
                infos_d.addCallbacks(infosCb, self._infosEb, [entity], None, [entity])
                defer_list.append(infos_d)
            return defer.DeferredList(defer_list)

        d = self.getItems(client, jid_)
        d.addCallback(gotItems)
        d.addCallback(lambda dummy: found_entities)
        reactor.callLater(
            TIMEOUT, d.cancel
        )  # FIXME: one bad service make a general timeout
        return d

    def generateHash(self, services):
        """ Generate a unique hash for given service

        hash algorithm is the one described in XEP-0115
        @param services: iterable of disco.DiscoIdentity/disco.DiscoFeature, as returned by discoHandler.info

        """
        s = []
        byte_identities = [
            ByteIdentity(service)
            for service in services
            if isinstance(service, disco.DiscoIdentity)
        ]  # FIXME: lang must be managed here
        byte_identities.sort(key=lambda i: i.lang)
        byte_identities.sort(key=lambda i: i.idType)
        byte_identities.sort(key=lambda i: i.category)
        for identity in byte_identities:
            s.append(str(identity))
            s.append("<")
        byte_features = [
            service.encode("utf-8")
            for service in services
            if isinstance(service, disco.DiscoFeature)
        ]
        byte_features.sort()  # XXX: the default sort has the same behaviour as the requested RFC 4790 i;octet sort
        for feature in byte_features:
            s.append(feature)
            s.append("<")
        # TODO: manage XEP-0128 data form here
        cap_hash = b64encode(sha1("".join(s)).digest())
        log.debug(_(u"Capability hash generated: [%s]") % cap_hash)
        return cap_hash

    @defer.inlineCallbacks
    def _discoInfos(
        self, entity_jid_s, node=u"", use_cache=True, profile_key=C.PROF_KEY_NONE
    ):
        """ Discovery method for the bridge
        @param entity_jid_s: entity we want to discover
        @param use_cache(bool): if True, use cached data if available
        @param node(unicode): optional node to use

        @return: list of tuples
        """
        client = self.host.getClient(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_infos = yield self.getInfos(client, entity, node, use_cache)
        extensions = {}
        for form_type, form in disco_infos.extensions.items():
            fields = []
            for field in form.fieldList:
                data = {"type": field.fieldType}
                for attr in ("var", "label", "desc"):
                    value = getattr(field, attr)
                    if value is not None:
                        data[attr] = value

                values = [field.value] if field.value is not None else field.values
                fields.append((data, values))

            extensions[form_type or ""] = fields

        defer.returnValue(
            (
                disco_infos.features,
                [
                    (cat, type_, name or "")
                    for (cat, type_), name in disco_infos.identities.items()
                ],
                extensions,
            )
        )

    def items2tuples(self, disco_items):
        """convert disco items to tuple of strings

        @param disco_items(iterable[disco.DiscoItem]): items
        @return G(tuple[unicode,unicode,unicode]): serialised items
        """
        for item in disco_items:
            if not item.entity:
                log.warning(_(u"invalid item (no jid)"))
                continue
            yield (item.entity.full(), item.nodeIdentifier or "", item.name or "")

    @defer.inlineCallbacks
    def _discoItems(
        self, entity_jid_s, node=u"", use_cache=True, profile_key=C.PROF_KEY_NONE
    ):
        """ Discovery method for the bridge

        @param entity_jid_s: entity we want to discover
        @param node(unicode): optional node to use
        @param use_cache(bool): if True, use cached data if available
        @return: list of tuples"""
        client = self.host.getClient(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_items = yield self.getItems(client, entity, node, use_cache)
        ret = list(self.items2tuples(disco_items))
        defer.returnValue(ret)