Mercurial > libervia-backend
comparison libervia/backend/memory/disco.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/memory/disco.py@524856bd7b19 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SAT: a jabber client | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 from typing import Optional | |
21 from libervia.backend.core.i18n import _ | |
22 from libervia.backend.core import exceptions | |
23 from libervia.backend.core.log import getLogger | |
24 from libervia.backend.core.core_types import SatXMPPEntity | |
25 | |
26 from twisted.words.protocols.jabber import jid | |
27 from twisted.words.protocols.jabber.error import StanzaError | |
28 from twisted.internet import defer | |
29 from twisted.internet import reactor | |
30 from twisted.python import failure | |
31 from libervia.backend.core.constants import Const as C | |
32 from libervia.backend.tools import xml_tools | |
33 from libervia.backend.memory import persistent | |
34 from wokkel import disco | |
35 from base64 import b64encode | |
36 from hashlib import sha1 | |
37 | |
38 | |
log = getLogger(__name__)


# timeout (in seconds) before a whole disco items/infos request is cancelled
# (see find_service_entities / find_features_set)
TIMEOUT = 15
# sentinel capability hash: maps to an empty DiscoInfo in HashManager, used to
# cache entities whose disco request failed (avoids retrying on each lookup)
CAP_HASH_ERROR = "ERROR"
44 | |
45 | |
class HashGenerationError(Exception):
    """Raised when a capability hash can't be generated."""
48 | |
49 | |
class ByteIdentity(object):
    """Identity with every field pre-encoded as bytes.

    Byte representation is needed for the i;octet sort used during
    capability hash generation.
    """

    def __init__(self, identity, lang=None):
        assert isinstance(identity, disco.DiscoIdentity)
        self.category = identity.category.encode("utf-8")
        self.idType = identity.type.encode("utf-8")
        if identity.name:
            self.name = identity.name.encode("utf-8")
        else:
            self.name = b""
        if lang is None:
            self.lang = b""
        else:
            self.lang = lang.encode("utf-8")

    def __bytes__(self):
        # "category/type/lang/name", as mandated by XEP-0115
        return b"/".join((self.category, self.idType, self.lang, self.name))
62 | |
63 | |
class HashManager(object):
    """Dict-like object mapping capability hashes to disco infos.

    Persistent storage is updated whenever a new hash is added.
    """

    def __init__(self, persistent):
        # CAP_HASH_ERROR maps to an empty DiscoInfo; it is used when disco
        # infos could not be retrieved for an entity
        self.hashes = {CAP_HASH_ERROR: disco.DiscoInfo()}
        self.persistent = persistent

    def __getitem__(self, key):
        return self.hashes[key]

    def __setitem__(self, hash_, disco_info):
        if hash_ in self.hashes:
            log.debug("ignoring hash set: it is already known")
            return
        self.hashes[hash_] = disco_info
        # new hash: serialise the disco element to persistent storage
        self.persistent[hash_] = disco_info.toElement().toXml()

    def __contains__(self, hash_):
        return hash_ in self.hashes

    def load(self):
        """Fill the in-memory map from persistent storage.

        @return: a Deferred which fires once all hashes are loaded
        """

        def fill_hashes(hashes):
            for hash_, xml in hashes.items():
                element = xml_tools.ElementParser()(xml)
                disco_info = disco.DiscoInfo.fromElement(element)
                # wokkel doesn't call typeCheck on reception, so we do it here
                for ext_form in disco_info.extensions.values():
                    ext_form.typeCheck()
                if disco_info.features or disco_info.identities:
                    self.hashes[hash_] = disco_info
                else:
                    log.warning(
                        _(
                            "no feature/identity found in disco element (hash: {cap_hash}), ignoring: {xml}"
                        ).format(cap_hash=hash_, xml=xml)
                    )

            log.info("Disco hashes loaded")

        d = self.persistent.load()
        d.addCallback(fill_hashes)
        return d
111 | |
112 | |
class Discovery(object):
    """Manage capabilities (disco infos/items) of XMPP entities.

    Disco infos are cached per capability hash (XEP-0115) through HashManager;
    disco items are cached only for the profile's own server.
    """

    def __init__(self, host):
        self.host = host
        # TODO: remove legacy hashes

    def load(self):
        """Load persistent hashes.

        @return: a Deferred which fires once hashes are loaded
        """
        self.hashes = HashManager(persistent.PersistentDict("disco"))
        return self.hashes.load()

    @defer.inlineCallbacks
    def hasFeature(self, client, feature, jid_=None, node=""):
        """Tell if an entity has the required feature.

        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @return: a Deferred which fires a boolean (True if feature is available)
        """
        disco_infos = yield self.get_infos(client, jid_, node)
        defer.returnValue(feature in disco_infos.features)

    @defer.inlineCallbacks
    def check_feature(self, client, feature, jid_=None, node=""):
        """Like hasFeature, but raise an exception if the feature is not found.

        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.get_infos(client, jid_, node)
        if not feature in disco_infos.features:
            raise failure.Failure(exceptions.FeatureNotFound())

    @defer.inlineCallbacks
    def check_features(self, client, features, jid_=None, identity=None, node=""):
        """Like check_feature, but check several features at once, and check also identity.

        @param features(iterable[unicode]): features to check
        @param jid_(jid.JID): jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param identity(None, tuple(unicode, unicode)): if not None, the entity
            must have an identity with this (category, type) tuple

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.get_infos(client, jid_, node)
        if not set(features).issubset(disco_infos.features):
            raise failure.Failure(exceptions.FeatureNotFound())

        if identity is not None and identity not in disco_infos.identities:
            raise failure.Failure(exceptions.FeatureNotFound())

    async def has_identity(
        self,
        client: SatXMPPEntity,
        category: str,
        type_: str,
        jid_: Optional[jid.JID] = None,
        node: str = ""
    ) -> bool:
        """Tell if an entity has the requested identity.

        @param category: identity category
        @param type_: identity type
        @param jid_: jid of the target, or None for profile's server
        @param node: optional node to use for disco request
        @return: True if the entity has the given identity
        """
        disco_infos = await self.get_infos(client, jid_, node)
        return (category, type_) in disco_infos.identities

    def get_infos(self, client, jid_=None, node="", use_cache=True):
        """Get disco infos from jid_, filling capability hash if needed.

        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param use_cache(bool): if True, use cached data if available
        @return: a Deferred which fires a disco.DiscoInfo
        """
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        try:
            if not use_cache:
                # we ignore cache, so we pretend we haven't found it
                raise KeyError
            cap_hash = self.host.memory.entity_data_get(
                client, jid_, [C.ENTITY_CAP_HASH]
            )[C.ENTITY_CAP_HASH]
        except (KeyError, exceptions.UnknownEntityError):
            # capability hash is not available, we'll compute one
            def infos_cb(disco_infos):
                cap_hash = self.generate_hash(disco_infos)
                for ext_form in disco_infos.extensions.values():
                    # wokkel doesn't call typeCheck on reception, so we do it here
                    # to avoid ending up with incorrect types. We have to do it after
                    # the hash has been generated (str value is needed to compute the
                    # hash)
                    ext_form.typeCheck()
                # cache both the disco infos (per hash) and the entity's hash
                self.hashes[cap_hash] = disco_infos
                self.host.memory.update_entity_data(
                    client, jid_, C.ENTITY_CAP_HASH, cap_hash
                )
                return disco_infos

            def infos_eb(fail):
                if fail.check(defer.CancelledError):
                    # the request Deferred has been cancelled (time-out)
                    reason = "request time-out"
                    fail = failure.Failure(exceptions.TimeOutError(str(fail.value)))
                else:
                    try:
                        reason = str(fail.value)
                    except AttributeError:
                        reason = str(fail)

                log.warning(
                    "can't request disco infos from {jid}: {reason}".format(
                        jid=jid_.full(), reason=reason
                    )
                )

                # XXX we set empty disco in cache, to avoid getting an error or waiting
                # for a timeout again the next time
                self.host.memory.update_entity_data(
                    client, jid_, C.ENTITY_CAP_HASH, CAP_HASH_ERROR
                )
                raise fail

            d = client.disco.requestInfo(jid_, nodeIdentifier=node)
            d.addCallback(infos_cb)
            d.addErrback(infos_eb)
            return d
        else:
            # hash is known: serve the disco infos from the hash cache
            disco_infos = self.hashes[cap_hash]
            return defer.succeed(disco_infos)

    @defer.inlineCallbacks
    def get_items(self, client, jid_=None, node="", use_cache=True):
        """Get disco items from jid_, cache them for our own server.

        @param jid_(jid.JID): jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param use_cache(bool): if True, use cached data if available
        @return: a Deferred which fires a disco.DiscoItems
        """
        if jid_ is None:
            jid_ = client.server_jid

        if jid_ == client.server_jid and not node:
            # we cache items only for our own server and if node is not set
            try:
                items = self.host.memory.entity_data_get(
                    client, jid_, ["DISCO_ITEMS"]
                )["DISCO_ITEMS"]
                log.debug("[%s] disco items are in cache" % jid_.full())
                # NOTE(review): use_cache is checked *after* the cache lookup,
                # unlike get_infos; the lookup and "in cache" log above are done
                # even when the cache is to be ignored — confirm if intentional
                if not use_cache:
                    # we ignore cache, so we pretend we haven't found it
                    raise KeyError
            except (KeyError, exceptions.UnknownEntityError):
                log.debug("Caching [%s] disco items" % jid_.full())
                items = yield client.disco.requestItems(jid_, nodeIdentifier=node)
                self.host.memory.update_entity_data(
                    client, jid_, "DISCO_ITEMS", items
                )
        else:
            try:
                items = yield client.disco.requestItems(jid_, nodeIdentifier=node)
            except StanzaError as e:
                # best-effort: log and return empty items instead of failing
                log.warning(
                    "Error while requesting items for {jid}: {reason}".format(
                        jid=jid_.full(), reason=e.condition
                    )
                )
                items = disco.DiscoItems()

        defer.returnValue(items)

    def _infos_eb(self, failure_, entity_jid):
        """Log (and swallow) a StanzaError from a disco infos request.

        @param failure_: the failure to handle (anything but StanzaError is re-raised)
        @param entity_jid(jid.JID): entity the request was sent to
        """
        failure_.trap(StanzaError)
        log.warning(
            _("Error while requesting [%(jid)s]: %(error)s")
            % {"jid": entity_jid.full(), "error": failure_.getErrorMessage()}
        )

    def find_service_entity(self, client, category, type_, jid_=None):
        """Helper method to find first available entity from find_service_entities.

        args are the same as for [find_service_entities]
        @return (jid.JID, None): found entity
        """
        # NOTE(review): jid_ is accepted but not forwarded to
        # find_service_entities — confirm whether it should be passed on
        d = self.host.find_service_entities(client, category, type_)
        d.addCallback(lambda entities: entities.pop() if entities else None)
        return d

    def find_service_entities(self, client, category, type_, jid_=None):
        """Return all available items of an entity which correspond to (category, type_).

        @param category: identity's category
        @param type_: identitiy's type
        @param jid_: the jid of the target server (None for profile's server)
        @return: a set of found entities
        @raise defer.CancelledError: the request timed out
        """
        found_entities = set()

        def infos_cb(infos, entity_jid):
            if (category, type_) in infos.identities:
                found_entities.add(entity_jid)

        def got_items(items):
            # request infos of every item in parallel
            defers_list = []
            for item in items:
                info_d = self.get_infos(client, item.entity)
                info_d.addCallbacks(
                    infos_cb, self._infos_eb, [item.entity], None, [item.entity]
                )
                defers_list.append(info_d)
            return defer.DeferredList(defers_list)

        d = self.get_items(client, jid_)
        d.addCallback(got_items)
        d.addCallback(lambda __: found_entities)
        reactor.callLater(
            TIMEOUT, d.cancel
        )  # FIXME: one bad service make a general timeout
        return d

    def find_features_set(self, client, features, identity=None, jid_=None):
        """Return entities (including jid_ and its items) offering features.

        @param features: iterable of features which must be present
        @param identity(None, tuple(unicode, unicode)): if not None, accept only this
            (category/type) identity
        @param jid_: the jid of the target server (None for profile's server)
        @param profile: %(doc_profile)s
        @return: a set of found entities
        """
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        features = set(features)
        found_entities = set()

        def infos_cb(infos, entity):
            if entity is None:
                log.warning(_("received an item without jid"))
                return
            if identity is not None and identity not in infos.identities:
                return
            if features.issubset(infos.features):
                found_entities.add(entity)

        def got_items(items):
            # check the target server itself plus each of its items
            defer_list = []
            for entity in [jid_] + [item.entity for item in items]:
                infos_d = self.get_infos(client, entity)
                infos_d.addCallbacks(infos_cb, self._infos_eb, [entity], None, [entity])
                defer_list.append(infos_d)
            return defer.DeferredList(defer_list)

        d = self.get_items(client, jid_)
        d.addCallback(got_items)
        d.addCallback(lambda __: found_entities)
        reactor.callLater(
            TIMEOUT, d.cancel
        )  # FIXME: one bad service make a general timeout
        return d

    def generate_hash(self, services):
        """Generate a unique hash for given service.

        hash algorithm is the one described in XEP-0115
        @param services(disco.DiscoInfo): disco infos as returned by
            discoHandler.info (iterated for identities/features, and its
            ``extensions`` attribute is read for data forms)
        @return (unicode): the base64-encoded SHA-1 capability hash
        """
        s = []
        # identities: sorted by category, then type, then lang (stable sorts
        # applied in reverse order of significance)
        byte_identities = [
            ByteIdentity(service)
            for service in services
            if isinstance(service, disco.DiscoIdentity)
        ]  # FIXME: lang must be managed here
        byte_identities.sort(key=lambda i: i.lang)
        byte_identities.sort(key=lambda i: i.idType)
        byte_identities.sort(key=lambda i: i.category)
        for identity in byte_identities:
            s.append(bytes(identity))
            s.append(b"<")
        # features
        byte_features = [
            service.encode("utf-8")
            for service in services
            if isinstance(service, disco.DiscoFeature)
        ]
        byte_features.sort()  # XXX: the default sort has the same behaviour as the requested RFC 4790 i;octet sort
        for feature in byte_features:
            s.append(feature)
            s.append(b"<")

        # extensions (XEP-0128 data forms), sorted by FORM_TYPE namespace
        ext = list(services.extensions.values())
        ext.sort(key=lambda f: f.formNamespace.encode('utf-8'))
        for extension in ext:
            s.append(extension.formNamespace.encode('utf-8'))
            s.append(b"<")
            # NOTE(review): this sorts the form's fieldList in place — confirm
            # mutating the cached form is harmless
            fields = extension.fieldList
            fields.sort(key=lambda f: f.var.encode('utf-8'))
            for field in fields:
                s.append(field.var.encode('utf-8'))
                s.append(b"<")
                values = [v.encode('utf-8') for v in field.values]
                values.sort()
                for value in values:
                    s.append(value)
                    s.append(b"<")

        cap_hash = b64encode(sha1(b"".join(s)).digest()).decode('utf-8')
        log.debug(_("Capability hash generated: [{cap_hash}]").format(cap_hash=cap_hash))
        return cap_hash

    @defer.inlineCallbacks
    def _disco_infos(
        self, entity_jid_s, node="", use_cache=True, profile_key=C.PROF_KEY_NONE
    ):
        """Discovery method for the bridge.

        @param entity_jid_s: entity we want to discover
        @param use_cache(bool): if True, use cached data if available
        @param node(unicode): optional node to use

        @return: tuple of (features, identities, extensions) serialised with
            bridge-compatible types
        """
        client = self.host.get_client(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_infos = yield self.get_infos(client, entity, node, use_cache)
        extensions = {}
        # FIXME: should extensions be serialised using tools.common.data_format?
        for form_type, form in list(disco_infos.extensions.items()):
            fields = []
            for field in form.fieldList:
                data = {"type": field.fieldType}
                for attr in ("var", "label", "desc"):
                    value = getattr(field, attr)
                    if value is not None:
                        data[attr] = value

                # single-value fields use .value, multi-value ones .values
                values = [field.value] if field.value is not None else field.values
                if field.fieldType == "boolean":
                    values = [C.bool_const(v) for v in values]
                fields.append((data, values))

            extensions[form_type or ""] = fields

        defer.returnValue((
            [str(f) for f in disco_infos.features],
            [(cat, type_, name or "")
             for (cat, type_), name in list(disco_infos.identities.items())],
            extensions))

    def items2tuples(self, disco_items):
        """Convert disco items to tuples of strings.

        Items without an entity jid are logged and skipped.

        @param disco_items(iterable[disco.DiscoItem]): items
        @return G(tuple[unicode,unicode,unicode]): serialised items
        """
        for item in disco_items:
            if not item.entity:
                log.warning(_("invalid item (no jid)"))
                continue
            yield (item.entity.full(), item.nodeIdentifier or "", item.name or "")

    @defer.inlineCallbacks
    def _disco_items(
        self, entity_jid_s, node="", use_cache=True, profile_key=C.PROF_KEY_NONE
    ):
        """Discovery method for the bridge.

        @param entity_jid_s: entity we want to discover
        @param node(unicode): optional node to use
        @param use_cache(bool): if True, use cached data if available
        @return: list of tuples"""
        client = self.host.get_client(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_items = yield self.get_items(client, entity, node, use_cache)
        ret = list(self.items2tuples(disco_items))
        defer.returnValue(ret)