comparison sat/memory/disco.py @ 2562:26edcf3a30eb

core, setup: huge cleaning: - moved directories from src and frontends/src to sat and sat_frontends, which is the recommended naming convention - moved twisted directory to root - removed all hacks from setup.py, and added missing dependencies, it is now clean - use https URL for website in setup.py - removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed - renamed sat.sh to sat and fixed its installation - added python_requires to specify Python version needed - replaced glib2reactor, which uses deprecated code, by gtk3reactor. sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author Goffi <goffi@goffi.org>
date Mon, 02 Apr 2018 19:44:50 +0200
parents src/memory/disco.py@8d82a62fa098
children e70023e84974
comparison
equal deleted inserted replaced
2561:bd30dc3ffe5a 2562:26edcf3a30eb
1 #!/usr/bin/env python2
2 # -*- coding: utf-8 -*-
3
4 # SAT: a jabber client
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from sat.core.i18n import _
21 from sat.core import exceptions
22 from sat.core.log import getLogger
23 log = getLogger(__name__)
24 from twisted.words.protocols.jabber import jid
25 from twisted.words.protocols.jabber.error import StanzaError
26 from twisted.internet import defer
27 from twisted.internet import reactor
28 from twisted.python import failure
29 from sat.core.constants import Const as C
30 from sat.tools import xml_tools
31 from sat.memory import persistent
32 from wokkel import disco
33 from base64 import b64encode
34 from hashlib import sha1
35
36
# Delay, in seconds, after which the entity searches of Discovery
# (findServiceEntities, findFeaturesSet) cancel their pending request
TIMEOUT = 15
# Placeholder capability hash: HashManager maps it to an empty DiscoInfo, and
# Discovery.getInfos stores it for an entity when its disco request failed
CAP_HASH_ERROR = 'ERROR'
39
class HashGenerationError(Exception):
    """Exception type dedicated to capability hash generation problems

    NOTE: nothing raises it in the code visible in this module.
    """
42
43
class ByteIdentity(object):
    """Byte string view of a disco identity

    All fields are kept UTF-8 encoded, which is what the i;octet sort used
    during hash generation needs.
    """

    def __init__(self, identity, lang=None):
        """
        @param identity(disco.DiscoIdentity): identity to convert
        @param lang(unicode, None): language of the identity name, if any
        """
        assert isinstance(identity, disco.DiscoIdentity)
        self.category = identity.category.encode('utf-8')
        self.idType = identity.type.encode('utf-8')
        # a missing or empty name is serialised as an empty string
        if identity.name:
            self.name = identity.name.encode('utf-8')
        else:
            self.name = ''
        if lang is None:
            self.lang = ''
        else:
            self.lang = lang.encode('utf-8')

    def __str__(self):
        """Serialise the identity as category/type/lang/name"""
        fields = (self.category, self.idType, self.lang, self.name)
        return "%s/%s/%s/%s" % fields
56
57
class HashManager(object):
    """Mapping between capability hashes and their disco infos

    The persistent storage is updated each time a new hash is added.
    """

    def __init__(self, persistent):
        """
        @param persistent: dict-like persistent storage, it must also offer a
            load() method returning a Deferred
        """
        self.persistent = persistent
        # CAP_HASH_ERROR gives empty infos, used when we can't get disco infos
        self.hashes = {CAP_HASH_ERROR: disco.DiscoInfo()}

    def __getitem__(self, key):
        return self.hashes[key]

    def __setitem__(self, hash_, disco_info):
        """Register a new hash, in memory and in the persistent storage

        A hash which is already known is never overwritten.
        """
        if hash_ not in self.hashes:
            self.hashes[hash_] = disco_info
            # infos are persisted in their serialised XML form
            self.persistent[hash_] = disco_info.toElement().toXml()
        else:
            log.debug(u"ignoring hash set: it is already known")

    def __contains__(self, hash_):
        return hash_ in self.hashes

    def load(self):
        """Fill the in-memory map with hashes from the persistent storage

        @return: the Deferred of the storage load() method, with the filling
            callback attached
        """
        def fillHashes(stored_hashes):
            for hash_, xml in stored_hashes.iteritems():
                parsed = xml_tools.ElementParser()(xml)
                self.hashes[hash_] = disco.DiscoInfo.fromElement(parsed)
            log.info(u"Disco hashes loaded")

        d = self.persistent.load()
        d.addCallback(fillHashes)
        return d
92
93
class Discovery(object):
    """ Manage capabilities of entities

    Disco infos are cached by capability hash (see [generateHash]); known
    hashes are kept in a persistent storage which is loaded by [load].
    """

    def __init__(self, host):
        """
        @param host: main application object; its "memory" attribute is used
            to read and update entities data
        """
        self.host = host
        # TODO: remove legacy hashes

    def load(self):
        """Load persistent hashes

        self.hashes is only available once this method has been called.
        @return: the Deferred returned by HashManager.load
        """
        self.hashes = HashManager(persistent.PersistentDict("disco"))
        return self.hashes.load()

    @defer.inlineCallbacks
    def hasFeature(self, client, feature, jid_=None, node=u''):
        """Tell if an entity has the required feature

        @param client: client used to do the disco request
        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @return: a Deferred which fire a boolean (True if feature is available)
        """
        disco_infos = yield self.getInfos(client, jid_, node)
        defer.returnValue(feature in disco_infos.features)

    @defer.inlineCallbacks
    def checkFeature(self, client, feature, jid_=None, node=u''):
        """Like hasFeature, but raise an exception if feature is not found

        @param client: client used to do the disco request
        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.getInfos(client, jid_, node)
        if feature not in disco_infos.features:
            # the exception must be instantiated: wrapping the class itself
            # would give a Failure which can't be trapped as FeatureNotFound
            raise failure.Failure(exceptions.FeatureNotFound())

    @defer.inlineCallbacks
    def checkFeatures(self, client, features, jid_=None, identity=None, node=u''):
        """Like checkFeature, but check several features at once, and check also identity

        @param client: client used to do the disco request
        @param features(iterable[unicode]): features to check
        @param jid_(jid.JID): jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param identity(None, tuple(unicode, unicode)): if not None, the entity
            must have an identity with this (category, type) tuple

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.getInfos(client, jid_, node)
        if not set(features).issubset(disco_infos.features):
            raise failure.Failure(exceptions.FeatureNotFound())

        if identity is not None and identity not in disco_infos.identities:
            raise failure.Failure(exceptions.FeatureNotFound())

    def getInfos(self, client, jid_=None, node=u'', use_cache=True):
        """get disco infos from jid_, filling capability hash if needed

        If the request fails, the error is logged, the entity is associated
        with CAP_HASH_ERROR and the (empty) infos stored for this hash are
        returned: the Deferred doesn't fail in this case.
        @param client: client used to do the disco request
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param use_cache(bool): if True, use cached data if available
        @return: a Deferred which fire disco.DiscoInfo
        """
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        try:
            if not use_cache:
                # we ignore cache, so we pretend we haven't found it
                raise KeyError
            cap_hash = self.host.memory.getEntityData(
                jid_, [C.ENTITY_CAP_HASH], client.profile)[C.ENTITY_CAP_HASH]
        except (KeyError, exceptions.UnknownEntityError):
            # capability hash is not available, we'll compute one
            def infosCb(disco_infos):
                cap_hash = self.generateHash(disco_infos)
                self.hashes[cap_hash] = disco_infos
                self.host.memory.updateEntityData(
                    jid_, C.ENTITY_CAP_HASH, cap_hash, profile_key=client.profile)
                return disco_infos

            def infosEb(fail):
                if fail.check(defer.CancelledError):
                    reason = u"request time-out"
                else:
                    try:
                        reason = unicode(fail.value)
                    except AttributeError:
                        reason = unicode(fail)
                log.warning(u"Error while requesting disco infos from {jid}: {reason}".format(
                    jid=jid_.full(), reason=reason))
                self.host.memory.updateEntityData(
                    jid_, C.ENTITY_CAP_HASH, CAP_HASH_ERROR, profile_key=client.profile)
                disco_infos = self.hashes[CAP_HASH_ERROR]
                return disco_infos

            d = client.disco.requestInfo(jid_, nodeIdentifier=node)
            d.addCallback(infosCb)
            # the errback is added after the callback, so it also handles
            # errors happening in infosCb
            d.addErrback(infosEb)
            return d
        else:
            disco_infos = self.hashes[cap_hash]
            return defer.succeed(disco_infos)

    @defer.inlineCallbacks
    def getItems(self, client, jid_=None, node=u'', use_cache=True):
        """get disco items from jid_, cache them for our own server

        For other entities (or if a node is set), nothing is cached and a
        StanzaError results in empty items, with a warning.
        @param client: client used to do the disco request
        @param jid_(jid.JID): jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param use_cache(bool): if True, use cached data if available
        @return: a Deferred which fire disco.DiscoItems
        """
        server_jid = jid.JID(client.jid.host)
        if jid_ is None:
            jid_ = server_jid

        if jid_ == server_jid and not node:
            # we cache items only for our own server and if node is not set
            try:
                if not use_cache:
                    # we ignore cache, so we pretend we haven't found it
                    raise KeyError
                items = self.host.memory.getEntityData(
                    jid_, ["DISCO_ITEMS"], client.profile)["DISCO_ITEMS"]
                log.debug(u"[%s] disco items are in cache" % jid_.full())
            except (KeyError, exceptions.UnknownEntityError):
                log.debug(u"Caching [%s] disco items" % jid_.full())
                items = yield client.disco.requestItems(jid_, nodeIdentifier=node)
                self.host.memory.updateEntityData(
                    jid_, "DISCO_ITEMS", items, profile_key=client.profile)
        else:
            try:
                items = yield client.disco.requestItems(jid_, nodeIdentifier=node)
            except StanzaError as e:
                log.warning(u"Error while requesting items for {jid}: {reason}"
                            .format(jid=jid_.full(), reason=e.condition))
                items = disco.DiscoItems()

        defer.returnValue(items)

    def _infosEb(self, failure_, entity_jid):
        """Log a stanza error received while requesting infos of an entity

        Only StanzaError is handled (and then swallowed), any other failure
        is propagated.
        @param failure_(failure.Failure): error received
        @param entity_jid(jid.JID): entity which was requested
        """
        failure_.trap(StanzaError)
        log.warning(_(u"Error while requesting [%(jid)s]: %(error)s") % {
            'jid': entity_jid.full(),
            'error': failure_.getErrorMessage()})

    def _cancelOnTimeout(self, d):
        """Cancel a Deferred which has not fired after TIMEOUT seconds

        The delayed call is stopped as soon as the Deferred fires, so nothing
        stays pending in the reactor once the result is known.
        @param d(defer.Deferred): deferred to watch
        @return (defer.Deferred): the same deferred
        """
        timer = reactor.callLater(TIMEOUT, d.cancel)

        def stopTimer(result):
            if timer.active():
                timer.cancel()
            return result

        d.addBoth(stopTimer)
        return d

    def findServiceEntity(self, client, category, type_, jid_=None):
        """Helper method to find first available entity from findServiceEntities

        args are the same as for [findServiceEntities]
        @return (jid.JID, None): found entity
        """
        d = self.findServiceEntities(client, category, type_, jid_)
        d.addCallback(lambda entities: entities.pop() if entities else None)
        return d

    def findServiceEntities(self, client, category, type_, jid_=None):
        """Return all available items of an entity which correspond to (category, type_)

        @param client: client used to do the disco requests
        @param category: identity's category
        @param type_: identitiy's type
        @param jid_: the jid of the target server (None for profile's server)
        @return: a set of found entities
        @raise defer.CancelledError: the request timed out
        """
        found_entities = set()

        def infosCb(infos, entity_jid):
            if (category, type_) in infos.identities:
                found_entities.add(entity_jid)

        def gotItems(items):
            defers_list = []
            for item in items:
                entity = item.entity
                if entity is None:
                    # getInfos would request our own server for a None jid
                    log.warning(_(u'received an item without jid'))
                    continue
                info_d = self.getInfos(client, entity)
                info_d.addCallbacks(infosCb, self._infosEb,
                                    callbackArgs=[entity], errbackArgs=[entity])
                defers_list.append(info_d)
            return defer.DeferredList(defers_list)

        d = self.getItems(client, jid_)
        d.addCallback(gotItems)
        d.addCallback(lambda dummy: found_entities)
        # FIXME: one bad service make a general timeout
        return self._cancelOnTimeout(d)

    def findFeaturesSet(self, client, features, identity=None, jid_=None):
        """Return entities (including jid_ and its items) offering features

        @param client: client used to do the disco requests
        @param features: iterable of features which must be present
        @param identity(None, tuple(unicode, unicode)): if not None, accept only
            this (category/type) identity
        @param jid_: the jid of the target server (None for profile's server)
        @return: a set of found entities
        """
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        features = set(features)
        found_entities = set()

        def infosCb(infos, entity):
            if identity is not None and identity not in infos.identities:
                return
            if features.issubset(infos.features):
                found_entities.add(entity)

        def gotItems(items):
            defer_list = []
            for entity in [jid_] + [item.entity for item in items]:
                if entity is None:
                    # getInfos would request our own server for a None jid
                    log.warning(_(u'received an item without jid'))
                    continue
                infos_d = self.getInfos(client, entity)
                infos_d.addCallbacks(infosCb, self._infosEb,
                                     callbackArgs=[entity], errbackArgs=[entity])
                defer_list.append(infos_d)
            return defer.DeferredList(defer_list)

        d = self.getItems(client, jid_)
        d.addCallback(gotItems)
        d.addCallback(lambda dummy: found_entities)
        # FIXME: one bad service make a general timeout
        return self._cancelOnTimeout(d)

    def generateHash(self, services):
        """ Generate a unique hash for given service

        hash algorithm is the one described in XEP-0115
        @param services: iterable of disco.DiscoIdentity/disco.DiscoFeature, as
            returned by discoHandler.info
        @return: base64 encoded SHA-1 digest of the identities and features
        """
        # services is walked twice, so we make sure it can be
        services = list(services)
        # FIXME: lang must be managed here
        byte_identities = [ByteIdentity(service) for service in services
                           if isinstance(service, disco.DiscoIdentity)]
        # identities are ordered by category, then type, then lang
        byte_identities.sort(key=lambda i: (i.category, i.idType, i.lang))
        # XXX: the default sort has the same behaviour as the requested
        #      RFC 4790 i;octet sort
        byte_features = sorted(service.encode('utf-8') for service in services
                               if isinstance(service, disco.DiscoFeature))
        s = []
        for identity in byte_identities:
            s.append(str(identity))
            s.append('<')
        for feature in byte_features:
            s.append(feature)
            s.append('<')
        #TODO: manage XEP-0128 data form here
        cap_hash = b64encode(sha1(''.join(s)).digest())
        log.debug(_(u'Capability hash generated: [%s]') % cap_hash)
        return cap_hash

    @defer.inlineCallbacks
    def _discoInfos(self, entity_jid_s, node=u'', use_cache=True, profile_key=C.PROF_KEY_NONE):
        """ Discovery method for the bridge

        @param entity_jid_s: entity we want to discover
        @param node(unicode): optional node to use
        @param use_cache(bool): if True, use cached data if available
        @param profile_key: key of the profile used to get the client
        @return: a Deferred which fire a tuple with:
            - the features of the entity
            - a list of (category, type, name) identities, where a missing
              name is replaced by an empty string
            - a dict mapping form types of extensions to their fields, each
              field being a (data, values) tuple
        """
        client = self.host.getClient(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_infos = yield self.getInfos(client, entity, node, use_cache)
        extensions = {}
        for form_type, form in disco_infos.extensions.items():
            fields = []
            for field in form.fieldList:
                data = {'type': field.fieldType}
                # only attributes which are actually set are kept
                for attr in ('var', 'label', 'desc'):
                    value = getattr(field, attr)
                    if value is not None:
                        data[attr] = value

                values = [field.value] if field.value is not None else field.values
                fields.append((data, values))

            extensions[form_type or ""] = fields

        identities = [(cat, type_, name or '')
                      for (cat, type_), name in disco_infos.identities.items()]
        defer.returnValue((disco_infos.features, identities, extensions))

    def items2tuples(self, disco_items):
        """convert disco items to tuple of strings

        Items without jid are skipped, with a warning.
        @param disco_items(iterable[disco.DiscoItem]): items
        @return G(tuple[unicode,unicode,unicode]): serialised items
        """
        for item in disco_items:
            if not item.entity:
                log.warning(_(u"invalid item (no jid)"))
                continue
            yield (item.entity.full(), item.nodeIdentifier or '', item.name or '')

    @defer.inlineCallbacks
    def _discoItems(self, entity_jid_s, node=u'', use_cache=True, profile_key=C.PROF_KEY_NONE):
        """ Discovery method for the bridge

        @param entity_jid_s: entity we want to discover
        @param node(unicode): optional node to use
        @param use_cache(bool): if True, use cached data if available
        @param profile_key: key of the profile used to get the client
        @return: a Deferred which fire a list of tuples, as serialised by
            [items2tuples]
        """
        client = self.host.getClient(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_items = yield self.getItems(client, entity, node, use_cache)
        ret = list(self.items2tuples(disco_items))
        defer.returnValue(ret)