Mercurial > libervia-backend
comparison sat/memory/disco.py @ 2562:26edcf3a30eb
core, setup: huge cleaning:
- moved directories from src and frontends/src to sat and sat_frontends, which is the recommended naming convention
- move twisted directory to root
- removed all hacks from setup.py, and added missing dependencies, it is now clean
- use https URL for website in setup.py
- removed "Environment :: X11 Applications :: GTK", as wix is deprecated and removed
- renamed sat.sh to sat and fixed its installation
- added python_requires to specify Python version needed
- replaced glib2reactor, which uses deprecated code, with gtk3reactor
sat can now be installed directly from virtualenv without using --system-site-packages anymore \o/
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 02 Apr 2018 19:44:50 +0200 |
parents | src/memory/disco.py@8d82a62fa098 |
children | e70023e84974 |
comparison
equal
deleted
inserted
replaced
2561:bd30dc3ffe5a | 2562:26edcf3a30eb |
---|---|
1 #!/usr/bin/env python2 | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 # SAT: a jabber client | |
5 # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 from sat.core.i18n import _ | |
21 from sat.core import exceptions | |
22 from sat.core.log import getLogger | |
23 log = getLogger(__name__) | |
24 from twisted.words.protocols.jabber import jid | |
25 from twisted.words.protocols.jabber.error import StanzaError | |
26 from twisted.internet import defer | |
27 from twisted.internet import reactor | |
28 from twisted.python import failure | |
29 from sat.core.constants import Const as C | |
30 from sat.tools import xml_tools | |
31 from sat.memory import persistent | |
32 from wokkel import disco | |
33 from base64 import b64encode | |
34 from hashlib import sha1 | |
35 | |
36 | |
# delay handed to reactor.callLater before a pending discovery search is cancelled
TIMEOUT = 15
# special hash key associated with an empty DiscoInfo, used when disco infos can't be retrieved
CAP_HASH_ERROR = 'ERROR'
39 | |
class HashGenerationError(Exception):
    """Exception dedicated to hash generation problems

    NOTE(review): nothing in the code visible here raises it — confirm usage elsewhere
    """
42 | |
43 | |
class ByteIdentity(object):
    """Byte-string view of a disco identity

    Every field is stored UTF-8 encoded so that identities can be ordered
    with the i;octet collation; it is used for the hash generation.
    """

    def __init__(self, identity, lang=None):
        """
        @param identity(disco.DiscoIdentity): identity to wrap
        @param lang(unicode, None): language of the identity, None if unspecified
        """
        assert isinstance(identity, disco.DiscoIdentity)
        self.category = identity.category.encode('utf-8')
        self.idType = identity.type.encode('utf-8')
        # missing name and lang are both represented by an empty string
        if identity.name:
            self.name = identity.name.encode('utf-8')
        else:
            self.name = ''
        if lang is None:
            self.lang = ''
        else:
            self.lang = lang.encode('utf-8')

    def __str__(self):
        # fields are joined as category/type/lang/name
        parts = (self.category, self.idType, self.lang, self.name)
        return "/".join(parts)
56 | |
57 | |
class HashManager(object):
    """Mapping-like store of capability hashes

    The persistent storage is updated each time a new hash is added.
    """

    def __init__(self, persistent):
        """
        @param persistent: storage object; it must support item assignment and
            provide a load() method returning a Deferred
        """
        # CAP_HASH_ERROR entry is used when we can't get disco infos
        self.hashes = {CAP_HASH_ERROR: disco.DiscoInfo()}
        self.persistent = persistent

    def __getitem__(self, key):
        return self.hashes[key]

    def __setitem__(self, hash_, disco_info):
        """Register a new hash, both in memory and in persistent storage

        A hash which is already known is never overwritten.
        """
        if hash_ not in self.hashes:
            self.hashes[hash_] = disco_info
            # disco infos are stored serialised as XML
            self.persistent[hash_] = disco_info.toElement().toXml()
        else:
            log.debug(u"ignoring hash set: it is already known")

    def __contains__(self, hash_):
        return hash_ in self.hashes

    def load(self):
        """Fill the in-memory map from the persistent storage

        @return: the Deferred returned by persistent.load(), with the filling
            callback added
        """
        def restore(stored):
            for hash_, xml in stored.iteritems():
                parsed = xml_tools.ElementParser()(xml)
                self.hashes[hash_] = disco.DiscoInfo.fromElement(parsed)
            log.info(u"Disco hashes loaded")

        return self.persistent.load().addCallback(restore)
92 | |
93 | |
class Discovery(object):
    """Manage capabilities of entities"""

    def __init__(self, host):
        """
        @param host: main backend object; its memory attribute and getClient
            method are used by this class
        """
        self.host = host
        # TODO: remove legacy hashes

    def load(self):
        """Load persistent hashes

        @return: a Deferred which fire when stored hashes have been loaded
        """
        self.hashes = HashManager(persistent.PersistentDict("disco"))
        return self.hashes.load()

    @defer.inlineCallbacks
    def hasFeature(self, client, feature, jid_=None, node=u''):
        """Tell if an entity has the required feature

        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @return: a Deferred which fire a boolean (True if feature is available)
        """
        disco_infos = yield self.getInfos(client, jid_, node)
        defer.returnValue(feature in disco_infos.features)

    @defer.inlineCallbacks
    def checkFeature(self, client, feature, jid_=None, node=u''):
        """Like hasFeature, but raise an exception if feature is not found

        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.getInfos(client, jid_, node)
        if feature not in disco_infos.features:
            # the exception must be instantiated (as in checkFeatures), otherwise
            # the Failure wraps the class object itself and its type is wrong
            raise failure.Failure(exceptions.FeatureNotFound())

    @defer.inlineCallbacks
    def checkFeatures(self, client, features, jid_=None, identity=None, node=u''):
        """Like checkFeature, but check several features at once, and check also identity

        @param features(iterable[unicode]): features to check
        @param jid_(jid.JID): jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param identity(None, tuple(unicode, unicode)): if not None, the entity must
            have an identity with this (category, type) tuple

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.getInfos(client, jid_, node)
        if not set(features).issubset(disco_infos.features):
            raise failure.Failure(exceptions.FeatureNotFound())

        if identity is not None and identity not in disco_infos.identities:
            raise failure.Failure(exceptions.FeatureNotFound())

    def getInfos(self, client, jid_=None, node=u'', use_cache=True):
        """Get disco infos from jid_, filling capability hash if needed

        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param use_cache(bool): if True, use cached data if available
        @return: a Deferred which fire disco.DiscoInfo
            if the request fails, the (empty) infos associated with
            CAP_HASH_ERROR are returned instead of a failure
        """
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        try:
            cap_hash = self.host.memory.getEntityData(jid_, [C.ENTITY_CAP_HASH], client.profile)[C.ENTITY_CAP_HASH]
            if not use_cache:
                # we ignore cache, so we pretend we haven't found it
                raise KeyError
        except (KeyError, exceptions.UnknownEntityError):
            # capability hash is not available, we'll compute one
            def infosCb(disco_infos):
                cap_hash = self.generateHash(disco_infos)
                self.hashes[cap_hash] = disco_infos
                self.host.memory.updateEntityData(jid_, C.ENTITY_CAP_HASH, cap_hash, profile_key=client.profile)
                return disco_infos

            def infosEb(fail):
                if fail.check(defer.CancelledError):
                    reason = u"request time-out"
                else:
                    try:
                        reason = unicode(fail.value)
                    except AttributeError:
                        reason = unicode(fail)
                log.warning(u"Error while requesting disco infos from {jid}: {reason}".format(jid=jid_.full(), reason=reason))
                # the error hash is associated to the entity, so following
                # cached lookups will get the empty infos
                self.host.memory.updateEntityData(jid_, C.ENTITY_CAP_HASH, CAP_HASH_ERROR, profile_key=client.profile)
                disco_infos = self.hashes[CAP_HASH_ERROR]
                return disco_infos

            d = client.disco.requestInfo(jid_, nodeIdentifier=node)
            d.addCallback(infosCb)
            d.addErrback(infosEb)
            return d
        else:
            disco_infos = self.hashes[cap_hash]
            return defer.succeed(disco_infos)

    @defer.inlineCallbacks
    def getItems(self, client, jid_=None, node=u'', use_cache=True):
        """Get disco items from jid_, cache them for our own server

        @param jid_(jid.JID): jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param use_cache(bool): if True, use cached data if available
        @return: a Deferred which fire disco.DiscoItems
        """
        server_jid = jid.JID(client.jid.host)
        if jid_ is None:
            jid_ = server_jid

        if jid_ == server_jid and not node:
            # we cache items only for our own server and if node is not set
            try:
                items = self.host.memory.getEntityData(jid_, ["DISCO_ITEMS"], client.profile)["DISCO_ITEMS"]
                if not use_cache:
                    # we ignore cache, so we pretend we haven't found it
                    raise KeyError
                # logged only once we know that cached data are really used
                log.debug(u"[%s] disco items are in cache" % jid_.full())
            except (KeyError, exceptions.UnknownEntityError):
                log.debug(u"Caching [%s] disco items" % jid_.full())
                items = yield client.disco.requestItems(jid_, nodeIdentifier=node)
                self.host.memory.updateEntityData(jid_, "DISCO_ITEMS", items, profile_key=client.profile)
        else:
            try:
                items = yield client.disco.requestItems(jid_, nodeIdentifier=node)
            except StanzaError as e:
                log.warning(u"Error while requesting items for {jid}: {reason}"
                            .format(jid=jid_.full(), reason=e.condition))
                # an empty items list is returned on stanza error
                items = disco.DiscoItems()

        defer.returnValue(items)

    def _infosEb(self, failure_, entity_jid):
        """Errback logging a stanza error received while requesting an entity

        Failures which are not a StanzaError are re-raised by trap.
        @param failure_(failure.Failure): failure of the request
        @param entity_jid(jid.JID): requested entity
        """
        failure_.trap(StanzaError)
        log.warning(_(u"Error while requesting [%(jid)s]: %(error)s") % {'jid': entity_jid.full(),
                                                                       'error': failure_.getErrorMessage()})

    def findServiceEntity(self, client, category, type_, jid_=None):
        """Helper method to find first available entity from findServiceEntities

        args are the same as for [findServiceEntities]
        @return (jid.JID, None): found entity
        """
        # requested category/type_/jid_ are forwarded (they used to be ignored
        # in favour of a hard-coded "pubsub"/"service" search)
        d = self.findServiceEntities(client, category, type_, jid_)
        d.addCallback(lambda entities: entities.pop() if entities else None)
        return d

    def findServiceEntities(self, client, category, type_, jid_=None):
        """Return all available items of an entity which correspond to (category, type_)

        @param category: identity's category
        @param type_: identity's type
        @param jid_: the jid of the target server (None for profile's server)
        @return: a set of found entities
        @raise defer.CancelledError: the request timed out
        """
        found_entities = set()

        def infosCb(infos, entity_jid):
            if (category, type_) in infos.identities:
                found_entities.add(entity_jid)

        def gotItems(items):
            defers_list = []
            for item in items:
                if item.entity is None:
                    # getInfos would request our own server for a None jid,
                    # and None could end in the result, so we skip the item
                    log.warning(_(u'received an item without jid'))
                    continue
                info_d = self.getInfos(client, item.entity)
                info_d.addCallbacks(infosCb, self._infosEb, [item.entity], None, [item.entity])
                defers_list.append(info_d)
            return defer.DeferredList(defers_list)

        d = self.getItems(client, jid_)
        d.addCallback(gotItems)
        d.addCallback(lambda dummy: found_entities)
        reactor.callLater(TIMEOUT, d.cancel)  # FIXME: one bad service make a general timeout
        return d

    def findFeaturesSet(self, client, features, identity=None, jid_=None):
        """Return entities (including jid_ and its items) offering features

        @param features: iterable of features which must be present
        @param identity(None, tuple(unicode, unicode)): if not None, accept only this (category/type) identity
        @param jid_: the jid of the target server (None for profile's server)
        @return: a set of found entities
        """
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        features = set(features)
        found_entities = set()

        def infosCb(infos, entity):
            if entity is None:
                log.warning(_(u'received an item without jid'))
                return
            if identity is not None and identity not in infos.identities:
                return
            if features.issubset(infos.features):
                found_entities.add(entity)

        def gotItems(items):
            defer_list = []
            # the target itself is checked in addition to its items
            for entity in [jid_] + [item.entity for item in items]:
                infos_d = self.getInfos(client, entity)
                infos_d.addCallbacks(infosCb, self._infosEb, [entity], None, [entity])
                defer_list.append(infos_d)
            return defer.DeferredList(defer_list)

        d = self.getItems(client, jid_)
        d.addCallback(gotItems)
        d.addCallback(lambda dummy: found_entities)
        reactor.callLater(TIMEOUT, d.cancel)  # FIXME: one bad service make a general timeout
        return d

    def generateHash(self, services):
        """Generate a unique hash for given service

        hash algorithm is the one described in XEP-0115
        @param services: iterable of disco.DiscoIdentity/disco.DiscoFeature, as returned by discoHandler.info
            it is iterated twice, so it must not be a one-shot iterator
        @return: base64 encoded SHA-1 digest of the serialised identities and features
        """
        s = []
        byte_identities = [ByteIdentity(service) for service in services if isinstance(service, disco.DiscoIdentity)]  # FIXME: lang must be managed here
        # identities are ordered by category, then type, then lang
        byte_identities.sort(key=lambda i: (i.category, i.idType, i.lang))
        for identity in byte_identities:
            s.append(str(identity))
            s.append('<')
        byte_features = [service.encode('utf-8') for service in services if isinstance(service, disco.DiscoFeature)]
        byte_features.sort()  # XXX: the default sort has the same behaviour as the requested RFC 4790 i;octet sort
        for feature in byte_features:
            s.append(feature)
            s.append('<')
        # TODO: manage XEP-0128 data form here
        cap_hash = b64encode(sha1(''.join(s)).digest())
        log.debug(_(u'Capability hash generated: [%s]') % cap_hash)
        return cap_hash

    @defer.inlineCallbacks
    def _discoInfos(self, entity_jid_s, node=u'', use_cache=True, profile_key=C.PROF_KEY_NONE):
        """Discovery method for the bridge

        @param entity_jid_s: entity we want to discover
        @param node(unicode): optional node to use
        @param use_cache(bool): if True, use cached data if available
        @param profile_key: key used to retrieve the client with host.getClient
        @return: a Deferred which fire a tuple with:
            - features
            - list of (category, type, name) identities, a missing name being replaced by ''
            - dict mapping form type ('' if unset) to a list of (field data, values)
        """
        client = self.host.getClient(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_infos = yield self.getInfos(client, entity, node, use_cache)
        extensions = {}
        for form_type, form in disco_infos.extensions.items():
            fields = []
            for field in form.fieldList:
                data = {'type': field.fieldType}
                # only attributes which are actually set are exported
                for attr in ('var', 'label', 'desc'):
                    value = getattr(field, attr)
                    if value is not None:
                        data[attr] = value

                values = [field.value] if field.value is not None else field.values
                fields.append((data, values))

            extensions[form_type or ""] = fields

        defer.returnValue((disco_infos.features,
                           [(cat, type_, name or '') for (cat, type_), name in disco_infos.identities.items()],
                           extensions))

    def items2tuples(self, disco_items):
        """Convert disco items to tuple of strings

        items without jid are skipped (with a warning)
        @param disco_items(iterable[disco.DiscoItem]): items
        @return G(tuple[unicode,unicode,unicode]): serialised items
        """
        for item in disco_items:
            if not item.entity:
                log.warning(_(u"invalid item (no jid)"))
                continue
            yield (item.entity.full(), item.nodeIdentifier or '', item.name or '')

    @defer.inlineCallbacks
    def _discoItems(self, entity_jid_s, node=u'', use_cache=True, profile_key=C.PROF_KEY_NONE):
        """Discovery method for the bridge

        @param entity_jid_s: entity we want to discover
        @param node(unicode): optional node to use
        @param use_cache(bool): if True, use cached data if available
        @param profile_key: key used to retrieve the client with host.getClient
        @return: list of tuples, as serialised by items2tuples
        """
        client = self.host.getClient(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_items = yield self.getItems(client, entity, node, use_cache)
        ret = list(self.items2tuples(disco_items))
        defer.returnValue(ret)