Mercurial > libervia-backend
view src/memory/disco.py @ 2470:8084066ac95b
tools (utils): new "partial" function which is a hack to workaround issue with functools.partial while using it with dbus module.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 12 Jan 2018 15:45:37 +0100 |
parents | a9fac72825a2 |
children | 0046283a285d |
line wrap: on
line source
#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# SAT: a jabber client
# Copyright (C) 2009-2017 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core import exceptions
from sat.core.log import getLogger
log = getLogger(__name__)
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.error import StanzaError
from twisted.internet import defer
from twisted.internet import reactor
from twisted.python import failure
from sat.core.constants import Const as C
from sat.tools import xml_tools
from sat.memory import persistent
from wokkel import disco
from base64 import b64encode
from hashlib import sha1


# delay (in seconds) after which a multi-entities discovery is cancelled
TIMEOUT = 15
# key of the empty DiscoInfo used when disco infos can't be retrieved
CAP_HASH_ERROR = 'ERROR'


class HashGenerationError(Exception):
    pass


class ByteIdentity(object):
    """Identity stored as bytes (needed for i;octet sort)

    It is used for the capability hash generation
    """

    def __init__(self, identity, lang=None):
        """
        @param identity(disco.DiscoIdentity): identity to convert
        @param lang(unicode, None): language of the identity name, if any
        """
        assert isinstance(identity, disco.DiscoIdentity)
        self.category = identity.category.encode('utf-8')
        self.idType = identity.type.encode('utf-8')
        self.name = identity.name.encode('utf-8') if identity.name else ''
        self.lang = lang.encode('utf-8') if lang is not None else ''

    def __str__(self):
        # "category/type/lang/name", as concatenated by generateHash
        return "%s/%s/%s/%s" % (self.category, self.idType, self.lang, self.name)


class HashManager(object):
    """Map-like object which manages hashes

    The persistent storage is updated when a new hash is added
    """

    def __init__(self, persistent):
        """
        @param persistent: dict-like persistent storage, it must have a "load" method
            returning a Deferred firing the stored hash => XML mapping
        """
        self.hashes = {
            CAP_HASH_ERROR: disco.DiscoInfo(),  # used when we can't get disco infos
            }
        self.persistent = persistent

    def __getitem__(self, key):
        return self.hashes[key]

    def __setitem__(self, hash_, disco_info):
        if hash_ in self.hashes:
            log.debug(u"ignoring hash set: it is already known")
            return
        self.hashes[hash_] = disco_info
        # disco infos are serialised to XML for the persistent storage
        self.persistent[hash_] = disco_info.toElement().toXml()

    def __contains__(self, hash_):
        return self.hashes.__contains__(hash_)

    def load(self):
        """Fill the in-memory map from the persistent storage

        @return: a Deferred fired once all stored hashes have been parsed
        """
        def fillHashes(hashes):
            for hash_, xml in hashes.iteritems():
                element = xml_tools.ElementParser()(xml)
                self.hashes[hash_] = disco.DiscoInfo.fromElement(element)
            log.info(u"Disco hashes loaded")

        d = self.persistent.load()
        d.addCallback(fillHashes)
        return d


class Discovery(object):
    """ Manage capabilities of entities """

    def __init__(self, host):
        self.host = host
        # TODO: remove legacy hashes

    def load(self):
        """Load persistent hashes"""
        self.hashes = HashManager(persistent.PersistentDict("disco"))
        return self.hashes.load()

    @defer.inlineCallbacks
    def hasFeature(self, client, feature, jid_=None, node=u''):
        """Tell if an entity has the required feature

        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @return: a Deferred which fire a boolean (True if feature is available)
        """
        disco_infos = yield self.getInfos(client, jid_, node)
        defer.returnValue(feature in disco_infos.features)

    @defer.inlineCallbacks
    def checkFeature(self, client, feature, jid_=None, node=u''):
        """Like hasFeature, but raise an exception if feature is not found

        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.getInfos(client, jid_, node)
        if feature not in disco_infos.features:
            # the exception must be instantiated: a Failure built from the class
            # itself would get "type" as failure type, and could not be trapped
            # with exceptions.FeatureNotFound
            raise failure.Failure(exceptions.FeatureNotFound())

    @defer.inlineCallbacks
    def checkFeatures(self, client, features, jid_=None, identity=None, node=u''):
        """Like checkFeature, but check several features at once, and check also identity

        @param features(iterable[unicode]): features to check
        @param jid_(jid.JID): jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param identity(None, tuple(unicode, unicode): if not None, the entity must have an identity with this (category, type) tuple

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.getInfos(client, jid_, node)
        if not set(features).issubset(disco_infos.features):
            raise failure.Failure(exceptions.FeatureNotFound())

        if identity is not None and identity not in disco_infos.identities:
            raise failure.Failure(exceptions.FeatureNotFound())

    def getInfos(self, client, jid_=None, node=u'', use_cache=True):
        """get disco infos from jid_, filling capability hash if needed

        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param use_cache(bool): if True, use cached data if available
        @return: a Deferred which fire disco.DiscoInfo
        """
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        try:
            if not use_cache:
                # we ignore cache, so we pretend we haven't found it
                raise KeyError
            cap_hash = self.host.memory.getEntityData(jid_, [C.ENTITY_CAP_HASH], client.profile)[C.ENTITY_CAP_HASH]
            # the lookup is done inside the try block, so a hash known for the
            # entity but missing from our store triggers a request instead of
            # raising a KeyError to the caller
            disco_infos = self.hashes[cap_hash]
        except (KeyError, exceptions.UnknownEntityError):
            # capability hash is not available, we'll compute one
            def infosCb(disco_infos):
                cap_hash = self.generateHash(disco_infos)
                self.hashes[cap_hash] = disco_infos
                self.host.memory.updateEntityData(jid_, C.ENTITY_CAP_HASH, cap_hash, profile_key=client.profile)
                return disco_infos

            def infosEb(fail):
                if fail.check(defer.CancelledError):
                    reason = u"request time-out"
                else:
                    try:
                        reason = unicode(fail.value)
                    except AttributeError:
                        reason = unicode(fail)
                log.warning(u"Error while requesting disco infos from {jid}: {reason}".format(jid=jid_.full(), reason=reason))
                # the entity is associated with the error hash, i.e. with empty disco infos
                self.host.memory.updateEntityData(jid_, C.ENTITY_CAP_HASH, CAP_HASH_ERROR, profile_key=client.profile)
                disco_infos = self.hashes[CAP_HASH_ERROR]
                return disco_infos

            d = client.disco.requestInfo(jid_, nodeIdentifier=node)
            d.addCallback(infosCb)
            d.addErrback(infosEb)
            return d
        else:
            return defer.succeed(disco_infos)

    @defer.inlineCallbacks
    def getItems(self, client, jid_=None, node=u'', use_cache=True):
        """get disco items from jid_, cache them for our own server

        @param jid_(jid.JID): jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param use_cache(bool): if True, use cached data if available
        @return: a Deferred which fire disco.DiscoItems
        """
        server_jid = jid.JID(client.jid.host)
        if jid_ is None:
            jid_ = server_jid

        if jid_ == server_jid and not node:
            # we cache items only for our own server and if node is not set
            try:
                if not use_cache:
                    # we ignore cache, so we pretend we haven't found it
                    raise KeyError
                items = self.host.memory.getEntityData(jid_, ["DISCO_ITEMS"], client.profile)["DISCO_ITEMS"]
                log.debug(u"[%s] disco items are in cache" % jid_.full())
            except (KeyError, exceptions.UnknownEntityError):
                log.debug(u"Caching [%s] disco items" % jid_.full())
                items = yield client.disco.requestItems(jid_, nodeIdentifier=node)
                self.host.memory.updateEntityData(jid_, "DISCO_ITEMS", items, profile_key=client.profile)
        else:
            try:
                items = yield client.disco.requestItems(jid_, nodeIdentifier=node)
            except StanzaError as e:
                log.warning(u"Error while requesting items for {jid}: {reason}"
                    .format(jid=jid_.full(), reason=e.condition))
                # on stanza error, empty items are returned
                items = disco.DiscoItems()

        defer.returnValue(items)

    def _infosEb(self, failure_, entity_jid):
        """Log stanza errors got while requesting infos of an entity

        Failures which are not StanzaError are not handled here.
        """
        failure_.trap(StanzaError)
        log.warning(_(u"Error while requesting [%(jid)s]: %(error)s") % {'jid': entity_jid.full(),
                                                                    'error': failure_.getErrorMessage()})

    def _cancelOnTimeout(self, d):
        """Cancel a Deferred if it has not fired after TIMEOUT seconds

        The delayed call is stopped as soon as the Deferred fires, so it is not
        left pending in the reactor, and it can't cancel later a Deferred chained
        by the caller.
        @param d(defer.Deferred): Deferred to watch
        @return (defer.Deferred): the same Deferred
        """
        timer = reactor.callLater(TIMEOUT, d.cancel)

        def stopTimer(result):
            # the timer is not active anymore if it is the one which cancelled "d"
            if timer.active():
                timer.cancel()
            return result

        d.addBoth(stopTimer)
        return d

    def findServiceEntity(self, client, category, type_, jid_=None):
        """Helper method to find first available entity from findServiceEntities

        args are the same as for [findServiceEntities]
        @return (jid.JID, None): found entity
        """
        # requested arguments are forwarded (the search was previously always done
        # with "pubsub"/"service" on profile's server, whatever the arguments were)
        d = self.findServiceEntities(client, category, type_, jid_)
        d.addCallback(lambda entities: entities.pop() if entities else None)
        return d

    def findServiceEntities(self, client, category, type_, jid_=None):
        """Return all available items of an entity which correspond to (category, type_)

        @param category: identity's category
        @param type_: identitiy's type
        @param jid_: the jid of the target server (None for profile's server)
        @return: a set of found entities
        @raise defer.CancelledError: the request timed out
        """
        found_entities = set()

        def infosCb(infos, entity_jid):
            if (category, type_) in infos.identities:
                found_entities.add(entity_jid)

        def gotItems(items):
            defers_list = []
            for item in items:
                info_d = self.getInfos(client, item.entity)
                info_d.addCallbacks(infosCb, self._infosEb, [item.entity], None, [item.entity])
                defers_list.append(info_d)
            return defer.DeferredList(defers_list)

        d = self.getItems(client, jid_)
        d.addCallback(gotItems)
        d.addCallback(lambda dummy: found_entities)
        # FIXME: one bad service make a general timeout
        return self._cancelOnTimeout(d)

    def findFeaturesSet(self, client, features, identity=None, jid_=None):
        """Return entities (including jid_ and its items) offering features

        @param features: iterable of features which must be present
        @param identity(None, tuple(unicode, unicode)): if not None, accept only this (category/type) identity
        @param jid_: the jid of the target server (None for profile's server)
        @return: a set of found entities
        """
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        features = set(features)
        found_entities = set()

        def infosCb(infos, entity):
            if identity is not None and identity not in infos.identities:
                return
            if features.issubset(infos.features):
                found_entities.add(entity)

        def gotItems(items):
            defer_list = []
            # the target itself is checked in addition to its items
            for entity in [jid_] + [item.entity for item in items]:
                infos_d = self.getInfos(client, entity)
                infos_d.addCallbacks(infosCb, self._infosEb, [entity], None, [entity])
                defer_list.append(infos_d)
            return defer.DeferredList(defer_list)

        d = self.getItems(client, jid_)
        d.addCallback(gotItems)
        d.addCallback(lambda dummy: found_entities)
        # FIXME: one bad service make a general timeout
        return self._cancelOnTimeout(d)

    def generateHash(self, services):
        """ Generate a unique hash for given service

        hash algorithm is the one described in XEP-0115
        @param services: iterable of disco.DiscoIdentity/disco.DiscoFeature, as returned by discoHandler.info
        @return (str): base64 encoded SHA-1 digest
        """
        s = []
        byte_identities = [ByteIdentity(service) for service in services if isinstance(service, disco.DiscoIdentity)]  # FIXME: lang must be managed here
        # identities are sorted by category, then by type, then by lang
        byte_identities.sort(key=lambda i: (i.category, i.idType, i.lang))
        for identity in byte_identities:
            s.append(str(identity))
            s.append('<')
        byte_features = [service.encode('utf-8') for service in services if isinstance(service, disco.DiscoFeature)]
        byte_features.sort()  # XXX: the default sort has the same behaviour as the requested RFC 4790 i;octet sort
        for feature in byte_features:
            s.append(feature)
            s.append('<')
        #TODO: manage XEP-0128 data form here
        cap_hash = b64encode(sha1(''.join(s)).digest())
        log.debug(_(u'Capability hash generated: [%s]') % cap_hash)
        return cap_hash

    @defer.inlineCallbacks
    def _discoInfos(self, entity_jid_s, node=u'', use_cache=True, profile_key=C.PROF_KEY_NONE):
        """ Discovery method for the bridge

        @param entity_jid_s: entity we want to discover
        @param use_cache(bool): if True, use cached data if available
        @param node(unicode): optional node to use

        @return: tuple with features, identities as (category, type, name) tuples,
            and extensions (form type => list of (field data, values) tuples)
        """
        client = self.host.getClient(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_infos = yield self.getInfos(client, entity, node, use_cache)
        extensions = {}
        for form_type, form in disco_infos.extensions.items():
            fields = []
            for field in form.fieldList:
                data = {'type': field.fieldType}
                # only attributes which are set are kept
                for attr in ('var', 'label', 'desc'):
                    value = getattr(field, attr)
                    if value is not None:
                        data[attr] = value

                values = [field.value] if field.value is not None else field.values
                fields.append((data, values))

            extensions[form_type or ""] = fields

        defer.returnValue((disco_infos.features,
                          [(cat, type_, name or '') for (cat, type_), name in disco_infos.identities.items()],
                          extensions))

    def items2tuples(self, disco_items):
        """convert disco items to tuple of strings

        items without jid are skipped (a warning is logged)
        @param disco_items(iterable[disco.DiscoItem]): items
        @return G(tuple[unicode,unicode,unicode]): serialised items
        """
        for item in disco_items:
            if not item.entity:
                log.warning(_(u"invalid item (no jid)"))
                continue
            yield (item.entity.full(), item.nodeIdentifier or '', item.name or '')

    @defer.inlineCallbacks
    def _discoItems(self, entity_jid_s, node=u'', use_cache=True, profile_key=C.PROF_KEY_NONE):
        """ Discovery method for the bridge

        @param entity_jid_s: entity we want to discover
        @param node(unicode): optional node to use
        @param use_cache(bool): if True, use cached data if available
        @return: list of tuples"""
        client = self.host.getClient(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_items = yield self.getItems(client, entity, node, use_cache)
        ret = list(self.items2tuples(disco_items))
        defer.returnValue(ret)