Mercurial > libervia-backend
view src/memory/disco.py @ 2092:5e79ba00c1e2
bridbe(pb): Perspective Broker bridge, first draft
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 13 Dec 2016 22:27:49 +0100 |
parents | 2daf7b4c6756 |
children | efe31f0881fb |
line wrap: on
line source
#!/usr/bin/env python2 # -*- coding: utf-8 -*- # SAT: a jabber client # Copyright (C) 2009-2016 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sat.core.i18n import _ from sat.core import exceptions from sat.core.log import getLogger log = getLogger(__name__) from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber.error import StanzaError from twisted.internet import defer from twisted.internet import reactor from twisted.python import failure from sat.core.constants import Const as C from sat.tools import xml_tools from sat.memory import persistent from wokkel import disco from base64 import b64encode from hashlib import sha1 TIMEOUT = 15 CAP_HASH_ERROR = 'ERROR' class HashGenerationError(Exception): pass class ByteIdentity(object): """This class manage identity as bytes (needed for i;octet sort), it is used for the hash generation""" def __init__(self, identity, lang=None): assert isinstance(identity, disco.DiscoIdentity) self.category = identity.category.encode('utf-8') self.idType = identity.type.encode('utf-8') self.name = identity.name.encode('utf-8') if identity.name else '' self.lang = lang.encode('utf-8') if lang is not None else '' def __str__(self): return "%s/%s/%s/%s" % (self.category, self.idType, self.lang, self.name) class HashManager(object): """map object which manage hashes persistent storage is update when a new hash is added """ def __init__(self, persistent): self.hashes = { CAP_HASH_ERROR: disco.DiscoInfo(), # used when we can't get disco infos } self.persistent = persistent def __getitem__(self, key): return self.hashes[key] def __setitem__(self, hash_, disco_info): if hash_ in self.hashes: log.debug(u"ignoring hash set: it is already known") return self.hashes[hash_] = disco_info self.persistent[hash_] = disco_info.toElement().toXml() def __contains__(self, hash_): return self.hashes.__contains__(hash_) def load(self): def fillHashes(hashes): for hash_, xml in hashes.iteritems(): element = xml_tools.ElementParser()(xml) self.hashes[hash_] = disco.DiscoInfo.fromElement(element) log.info(u"Disco hashes loaded") d = self.persistent.load() d.addCallback(fillHashes) return d class Discovery(object): """ Manage capabilities of entities """ def __init__(self, host): self.host = host # TODO: remove legacy hashes def load(self): """Load persistent hashes""" self.hashes = HashManager(persistent.PersistentDict("disco")) return self.hashes.load() @defer.inlineCallbacks def hasFeature(self, feature, jid_=None, profile=C.PROF_KEY_NONE): """Tell if an entity has the required feature @param feature: feature namespace @param jid_: jid of the target, or None for profile's server @param profile: %(doc_profile)s @return: a Deferred which fire a boolean (True if feature is available) """ disco_infos = yield self.getInfos(jid_, profile) defer.returnValue(feature in disco_infos.features) @defer.inlineCallbacks def checkFeature(self, feature, jid_=None, profile=C.PROF_KEY_NONE): """Like hasFeature, but raise an exception is feature is not Found @param feature: feature namespace @param jid_: jid of the target, or None for profile's server @param profile: %(doc_profile)s @raise: exceptions.FeatureNotFound """ disco_infos = yield self.getInfos(jid_, profile) if not feature in disco_infos.features: raise failure.Failure(exceptions.FeatureNotFound) @defer.inlineCallbacks def checkFeatures(self, features, jid_=None, identity=None, profile=C.PROF_KEY_NONE): """Like checkFeature, but check several features at once, and check also identity @param features(iterable[unicode]): features to check @param jid_(jid.JID): jid of the target, or None for profile's server @param identity(None, tuple(unicode, unicode): if not None, the entity must have an identity with this (category, type) tuple @param profile: %(doc_profile)s @raise: exceptions.FeatureNotFound """ disco_infos = yield self.getInfos(jid_, profile) if not set(features).issubset(disco_infos.features): raise failure.Failure(exceptions.FeatureNotFound()) if identity is not None and identity not in disco_infos.identities: raise failure.Failure(exceptions.FeatureNotFound()) def getInfos(self, jid_=None, profile=C.PROF_KEY_NONE): """get disco infos from jid_, filling capability hash if needed @param jid_: jid of the target, or None for profile's server @param profile: %(doc_profile)s @return: a Deferred which fire disco.DiscoInfo """ client = self.host.getClient(profile) if jid_ is None: jid_ = jid.JID(client.jid.host) try: cap_hash = self.host.memory.getEntityData(jid_, [C.ENTITY_CAP_HASH], client.profile)[C.ENTITY_CAP_HASH] except (KeyError, exceptions.UnknownEntityError): # capability hash is not available, we'll compute one def infosCb(disco_infos): cap_hash = self.generateHash(disco_infos) self.hashes[cap_hash] = disco_infos self.host.memory.updateEntityData(jid_, C.ENTITY_CAP_HASH, cap_hash, profile_key=client.profile) return disco_infos def infosEb(fail): if fail.check(defer.CancelledError): reason = u"request time-out" else: try: reason = unicode(fail.value) except AttributeError: reason = unicode(fail) log.warning(u"Error while requesting disco infos from {jid}: {reason}".format(jid=jid_.full(), reason=reason)) self.host.memory.updateEntityData(jid_, C.ENTITY_CAP_HASH, CAP_HASH_ERROR, profile_key=client.profile) disco_infos = self.hashes[CAP_HASH_ERROR] return disco_infos d = client.disco.requestInfo(jid_) d.addCallback(infosCb) d.addErrback(infosEb) return d else: disco_infos = self.hashes[cap_hash] return defer.succeed(disco_infos) @defer.inlineCallbacks def getItems(self, jid_=None, nodeIdentifier='', profile=C.PROF_KEY_NONE): """get disco items from jid_, cache them for our own server @param jid_ (jid.JID): jid of the target, or None for profile's server @param nodeIdentifier (str): optional node identifier @param profile: %(doc_profile)s @return: a Deferred which fire disco.DiscoItems """ client = self.host.getClient(profile) if jid_ is None: jid_ = jid.JID(client.jid.host) # we cache items only for our own server try: items = self.host.memory.getEntityData(jid_, ["DISCO_ITEMS"], client.profile)["DISCO_ITEMS"] log.debug(u"[%s] disco items are in cache" % jid_.full()) except (KeyError, exceptions.UnknownEntityError): log.debug(u"Caching [%s] disco items" % jid_.full()) items = yield client.disco.requestItems(jid_, nodeIdentifier) self.host.memory.updateEntityData(jid_, "DISCO_ITEMS", items, profile_key=client.profile) else: try: items = yield client.disco.requestItems(jid_, nodeIdentifier) except StanzaError as e: log.warning(u"Error while requesting items for {jid}: {reason}" .format(jid=jid_.full(), reason=e.condition)) items = disco.DiscoItems() defer.returnValue(items) def _infosEb(self, failure_, entity_jid): failure_.trap(StanzaError) log.warning(_(u"Error while requesting [%(jid)s]: %(error)s") % {'jid': entity_jid.full(), 'error': failure_.getErrorMessage()}) def findServiceEntities(self, category, type_, jid_=None, profile=C.PROF_KEY_NONE): """Return all available items of an entity which correspond to (category, type_) @param category: identity's category @param type_: identitiy's type @param jid_: the jid of the target server (None for profile's server) @param profile: %(doc_profile)s @return: a set of found entities @raise defer.CancelledError: the request timed out """ found_entities = set() def infosCb(infos, entity_jid): if (category, type_) in infos.identities: found_entities.add(entity_jid) def gotItems(items): defers_list = [] for item in items: info_d = self.getInfos(item.entity, profile) info_d.addCallbacks(infosCb, self._infosEb, [item.entity], None, [item.entity]) defers_list.append(info_d) return defer.DeferredList(defers_list) d = self.getItems(jid_, profile=profile) d.addCallback(gotItems) d.addCallback(lambda dummy: found_entities) reactor.callLater(TIMEOUT, d.cancel) # FIXME: one bad service make a general timeout return d def findFeaturesSet(self, features, identity=None, jid_=None, profile=C.PROF_KEY_NONE): """Return entities (including jid_ and its items) offering features @param features: iterable of features which must be present @param identity(None, tuple(unicode, unicode)): if not None, accept only this (category/type) identity @param jid_: the jid of the target server (None for profile's server) @param profile: %(doc_profile)s @return: a set of found entities """ client = self.host.getClient(profile) if jid_ is None: jid_ = jid.JID(client.jid.host) features = set(features) found_entities = set() def infosCb(infos, entity): if identity is not None and identity not in infos.identities: return if features.issubset(infos.features): found_entities.add(entity) def gotItems(items): defer_list = [] for entity in [jid_] + [item.entity for item in items]: infos_d = self.getInfos(entity, profile) infos_d.addCallbacks(infosCb, self._infosEb, [entity], None, [entity]) defer_list.append(infos_d) return defer.DeferredList(defer_list) d = self.getItems(jid_, profile=profile) d.addCallback(gotItems) d.addCallback(lambda dummy: found_entities) reactor.callLater(TIMEOUT, d.cancel) # FIXME: one bad service make a general timeout return d def generateHash(self, services): """ Generate a unique hash for given service hash algorithm is the one described in XEP-0115 @param services: iterable of disco.DiscoIdentity/disco.DiscoFeature, as returned by discoHandler.info """ s = [] byte_identities = [ByteIdentity(service) for service in services if isinstance(service, disco.DiscoIdentity)] # FIXME: lang must be managed here byte_identities.sort(key=lambda i: i.lang) byte_identities.sort(key=lambda i: i.idType) byte_identities.sort(key=lambda i: i.category) for identity in byte_identities: s.append(str(identity)) s.append('<') byte_features = [service.encode('utf-8') for service in services if isinstance(service, disco.DiscoFeature)] byte_features.sort() # XXX: the default sort has the same behaviour as the requested RFC 4790 i;octet sort for feature in byte_features: s.append(feature) s.append('<') #TODO: manage XEP-0128 data form here cap_hash = b64encode(sha1(''.join(s)).digest()) log.debug(_(u'Capability hash generated: [%s]') % cap_hash) return cap_hash @defer.inlineCallbacks def _discoInfos(self, entity_jid_s, profile_key=C.PROF_KEY_NONE): """ Discovery method for the bridge @param entity_jid_s: entity we want to discover @return: list of tuples """ profile = self.host.memory.getProfileName(profile_key) entity = jid.JID(entity_jid_s) disco_infos = yield self.getInfos(entity, profile) extensions = {} for form_type, form in disco_infos.extensions.items(): fields = [] for field in form.fieldList: data = {'type': field.fieldType} for attr in ('var', 'label', 'desc'): value = getattr(field, attr) if value is not None: data[attr] = value values = [field.value] if field.value is not None else field.values fields.append((data, values)) extensions[form_type or ""] = fields defer.returnValue((disco_infos.features, [(cat, type_, name or '') for (cat, type_), name in disco_infos.identities.items()], extensions)) @defer.inlineCallbacks def _discoItems(self, entity_jid_s, profile_key=C.PROF_KEY_NONE): """ Discovery method for the bridge @param entity_jid_s: entity we want to discover @return: list of tuples""" profile = self.host.memory.getProfileName(profile_key) entity = jid.JID(entity_jid_s) disco_items = yield self.getItems(entity, profile=profile) defer.returnValue([(item.entity.full(), item.nodeIdentifier or '', item.name or '') for item in disco_items])