comparison libervia/backend/memory/disco.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/memory/disco.py@524856bd7b19
children 0d7bb4df2343
#!/usr/bin/env python3


# SAT: a jabber client
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Optional
from libervia.backend.core.i18n import _
from libervia.backend.core import exceptions
from libervia.backend.core.log import getLogger
from libervia.backend.core.core_types import SatXMPPEntity

from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.error import StanzaError
from twisted.internet import defer
from twisted.internet import reactor
from twisted.python import failure
from libervia.backend.core.constants import Const as C
from libervia.backend.tools import xml_tools
from libervia.backend.memory import persistent
from wokkel import disco
from base64 import b64encode
from hashlib import sha1


log = getLogger(__name__)


TIMEOUT = 15
CAP_HASH_ERROR = "ERROR"


class HashGenerationError(Exception):
    pass


class ByteIdentity(object):
    """This class manages identities as bytes (needed for i;octet sort); it is used for the hash generation"""

    def __init__(self, identity, lang=None):
        assert isinstance(identity, disco.DiscoIdentity)
        self.category = identity.category.encode("utf-8")
        self.idType = identity.type.encode("utf-8")
        self.name = identity.name.encode("utf-8") if identity.name else b""
        self.lang = lang.encode("utf-8") if lang is not None else b""

    def __bytes__(self):
        return b"%s/%s/%s/%s" % (self.category, self.idType, self.lang, self.name)
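    # Illustration (identity values borrowed from the XEP-0115 example, given here
    # only to show the format): for an identity with category "client", type "pc",
    # no xml:lang and name "Exodus 0.9.1",
    #
    #     bytes(ByteIdentity(identity))  # == b"client/pc//Exodus 0.9.1"
    #
    # i.e. the "category/type/lang/name" string that generate_hash() concatenates.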


class HashManager(object):
    """map-like object which manages hashes

    persistent storage is updated when a new hash is added
    """

    def __init__(self, persistent):
        self.hashes = {
            CAP_HASH_ERROR: disco.DiscoInfo()  # used when we can't get disco infos
        }
        self.persistent = persistent

    def __getitem__(self, key):
        return self.hashes[key]

    def __setitem__(self, hash_, disco_info):
        if hash_ in self.hashes:
            log.debug("ignoring hash set: it is already known")
            return
        self.hashes[hash_] = disco_info
        self.persistent[hash_] = disco_info.toElement().toXml()

    def __contains__(self, hash_):
        return self.hashes.__contains__(hash_)

    def load(self):
        def fill_hashes(hashes):
            for hash_, xml in hashes.items():
                element = xml_tools.ElementParser()(xml)
                disco_info = disco.DiscoInfo.fromElement(element)
                for ext_form in disco_info.extensions.values():
                    # wokkel doesn't call typeCheck on reception, so we do it here
                    ext_form.typeCheck()
                if not disco_info.features and not disco_info.identities:
                    log.warning(
                        _(
                            "no feature/identity found in disco element (hash: {cap_hash}), ignoring: {xml}"
                        ).format(cap_hash=hash_, xml=xml)
                    )
                else:
                    self.hashes[hash_] = disco_info

            log.info("Disco hashes loaded")

        d = self.persistent.load()
        d.addCallback(fill_hashes)
        return d
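    # Storage note (derived from __setitem__ and load above): each capability hash is
    # kept in the PersistentDict as the serialised disco#info <query/> element
    # (disco_info.toElement().toXml()); load() parses those strings back into wokkel
    # DiscoInfo instances when Discovery.load() is called below.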


class Discovery(object):
    """ Manage capabilities of entities """

    def __init__(self, host):
        self.host = host
        # TODO: remove legacy hashes

    def load(self):
        """Load persistent hashes"""
        self.hashes = HashManager(persistent.PersistentDict("disco"))
        return self.hashes.load()

    @defer.inlineCallbacks
    def hasFeature(self, client, feature, jid_=None, node=""):
        """Tell if an entity has the required feature

        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @return: a Deferred which fires a boolean (True if the feature is available)
        """
        disco_infos = yield self.get_infos(client, jid_, node)
        defer.returnValue(feature in disco_infos.features)

    @defer.inlineCallbacks
    def check_feature(self, client, feature, jid_=None, node=""):
        """Like hasFeature, but raise an exception if the feature is not found

        @param feature: feature namespace
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.get_infos(client, jid_, node)
        if feature not in disco_infos.features:
            raise failure.Failure(exceptions.FeatureNotFound())

    @defer.inlineCallbacks
    def check_features(self, client, features, jid_=None, identity=None, node=""):
        """Like check_feature, but check several features at once, and also check identity

        @param features(iterable[unicode]): features to check
        @param jid_(jid.JID): jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param identity(None, tuple(unicode, unicode)): if not None, the entity must have an identity with this (category, type) tuple

        @raise: exceptions.FeatureNotFound
        """
        disco_infos = yield self.get_infos(client, jid_, node)
        if not set(features).issubset(disco_infos.features):
            raise failure.Failure(exceptions.FeatureNotFound())

        if identity is not None and identity not in disco_infos.identities:
            raise failure.Failure(exceptions.FeatureNotFound())

    async def has_identity(
        self,
        client: SatXMPPEntity,
        category: str,
        type_: str,
        jid_: Optional[jid.JID] = None,
        node: str = ""
    ) -> bool:
        """Tell if an entity has the requested identity

        @param category: identity category
        @param type_: identity type
        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @return: True if the entity has the given identity
        """
        disco_infos = await self.get_infos(client, jid_, node)
        return (category, type_) in disco_infos.identities
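    # Usage sketch (hypothetical plugin code, names and namespaces assumed): plugins
    # usually reach this class through host.memory.disco, e.g. inside an
    # @defer.inlineCallbacks method:
    #
    #     yield self.host.memory.disco.check_features(
    #         client,
    #         ("urn:xmpp:mam:2", "http://jabber.org/protocol/pubsub"),
    #         jid_=service_jid,
    #         identity=("pubsub", "service"),
    #     )
    #
    # this raises exceptions.FeatureNotFound if any feature or the (category, type)
    # identity is missing, and simply returns otherwise.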

    def get_infos(self, client, jid_=None, node="", use_cache=True):
        """get disco infos from jid_, filling capability hash if needed

        @param jid_: jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param use_cache(bool): if True, use cached data if available
        @return: a Deferred which fires disco.DiscoInfo
        """
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        try:
            if not use_cache:
                # we ignore cache, so we pretend we haven't found it
                raise KeyError
            cap_hash = self.host.memory.entity_data_get(
                client, jid_, [C.ENTITY_CAP_HASH]
            )[C.ENTITY_CAP_HASH]
        except (KeyError, exceptions.UnknownEntityError):
            # capability hash is not available, we'll compute one
            def infos_cb(disco_infos):
                cap_hash = self.generate_hash(disco_infos)
                for ext_form in disco_infos.extensions.values():
                    # wokkel doesn't call typeCheck on reception, so we do it here
                    # to avoid ending up with incorrect types. We have to do it after
                    # the hash has been generated (str value is needed to compute the
                    # hash)
                    ext_form.typeCheck()
                self.hashes[cap_hash] = disco_infos
                self.host.memory.update_entity_data(
                    client, jid_, C.ENTITY_CAP_HASH, cap_hash
                )
                return disco_infos

            def infos_eb(fail):
                if fail.check(defer.CancelledError):
                    reason = "request time-out"
                    fail = failure.Failure(exceptions.TimeOutError(str(fail.value)))
                else:
                    try:
                        reason = str(fail.value)
                    except AttributeError:
                        reason = str(fail)

                log.warning(
                    "can't request disco infos from {jid}: {reason}".format(
                        jid=jid_.full(), reason=reason
                    )
                )

                # XXX: we set an empty disco in cache, to avoid getting an error or
                # waiting for a timeout again the next time
                self.host.memory.update_entity_data(
                    client, jid_, C.ENTITY_CAP_HASH, CAP_HASH_ERROR
                )
                raise fail

            d = client.disco.requestInfo(jid_, nodeIdentifier=node)
            d.addCallback(infos_cb)
            d.addErrback(infos_eb)
            return d
        else:
            disco_infos = self.hashes[cap_hash]
            return defer.succeed(disco_infos)
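    # Caching sketch (hypothetical call, to illustrate the flow above):
    #
    #     disco_infos = yield self.host.memory.disco.get_infos(client, some_jid)
    #
    # the first call sends a real disco#info request, computes the XEP-0115 hash,
    # stores it in the entity data and the DiscoInfo in self.hashes; later calls
    # for an entity whose hash is already known are served from that cache, unless
    # use_cache=False forces a new request.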

    @defer.inlineCallbacks
    def get_items(self, client, jid_=None, node="", use_cache=True):
        """get disco items from jid_, cache them for our own server

        @param jid_(jid.JID): jid of the target, or None for profile's server
        @param node(unicode): optional node to use for disco request
        @param use_cache(bool): if True, use cached data if available
        @return: a Deferred which fires disco.DiscoItems
        """
        if jid_ is None:
            jid_ = client.server_jid

        if jid_ == client.server_jid and not node:
            # we cache items only for our own server and if node is not set
            try:
                items = self.host.memory.entity_data_get(
                    client, jid_, ["DISCO_ITEMS"]
                )["DISCO_ITEMS"]
                log.debug("[%s] disco items are in cache" % jid_.full())
                if not use_cache:
                    # we ignore cache, so we pretend we haven't found it
                    raise KeyError
            except (KeyError, exceptions.UnknownEntityError):
                log.debug("Caching [%s] disco items" % jid_.full())
                items = yield client.disco.requestItems(jid_, nodeIdentifier=node)
                self.host.memory.update_entity_data(
                    client, jid_, "DISCO_ITEMS", items
                )
        else:
            try:
                items = yield client.disco.requestItems(jid_, nodeIdentifier=node)
            except StanzaError as e:
                log.warning(
                    "Error while requesting items for {jid}: {reason}".format(
                        jid=jid_.full(), reason=e.condition
                    )
                )
                items = disco.DiscoItems()

        defer.returnValue(items)

    def _infos_eb(self, failure_, entity_jid):
        failure_.trap(StanzaError)
        log.warning(
            _("Error while requesting [%(jid)s]: %(error)s")
            % {"jid": entity_jid.full(), "error": failure_.getErrorMessage()}
        )

    def find_service_entity(self, client, category, type_, jid_=None):
        """Helper method to find the first available entity from find_service_entities

        args are the same as for [find_service_entities]
        @return (jid.JID, None): found entity
        """
        d = self.host.find_service_entities(client, category, type_)
        d.addCallback(lambda entities: entities.pop() if entities else None)
        return d

    def find_service_entities(self, client, category, type_, jid_=None):
        """Return all available items of an entity which correspond to (category, type_)

        @param category: identity's category
        @param type_: identity's type
        @param jid_: the jid of the target server (None for profile's server)
        @return: a set of found entities
        @raise defer.CancelledError: the request timed out
        """
        found_entities = set()

        def infos_cb(infos, entity_jid):
            if (category, type_) in infos.identities:
                found_entities.add(entity_jid)

        def got_items(items):
            defers_list = []
            for item in items:
                info_d = self.get_infos(client, item.entity)
                info_d.addCallbacks(
                    infos_cb, self._infos_eb, [item.entity], None, [item.entity]
                )
                defers_list.append(info_d)
            return defer.DeferredList(defers_list)

        d = self.get_items(client, jid_)
        d.addCallback(got_items)
        d.addCallback(lambda __: found_entities)
        reactor.callLater(
            TIMEOUT, d.cancel
        )  # FIXME: one bad service makes a general timeout
        return d
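    # Usage sketch (hypothetical, identity values from XEP-0065): a file-transfer
    # plugin looking for a SOCKS5 bytestreams proxy could do something like
    #
    #     proxy_jid = yield self.host.memory.disco.find_service_entity(
    #         client, "proxy", "bytestreams"
    #     )
    #
    # while find_service_entities would return the whole set of matching entities.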

    def find_features_set(self, client, features, identity=None, jid_=None):
        """Return entities (including jid_ and its items) offering features

        @param features: iterable of features which must be present
        @param identity(None, tuple(unicode, unicode)): if not None, accept only this
            (category/type) identity
        @param jid_: the jid of the target server (None for profile's server)
        @return: a set of found entities
        """
        if jid_ is None:
            jid_ = jid.JID(client.jid.host)
        features = set(features)
        found_entities = set()

        def infos_cb(infos, entity):
            if entity is None:
                log.warning(_("received an item without jid"))
                return
            if identity is not None and identity not in infos.identities:
                return
            if features.issubset(infos.features):
                found_entities.add(entity)

        def got_items(items):
            defer_list = []
            for entity in [jid_] + [item.entity for item in items]:
                infos_d = self.get_infos(client, entity)
                infos_d.addCallbacks(infos_cb, self._infos_eb, [entity], None, [entity])
                defer_list.append(infos_d)
            return defer.DeferredList(defer_list)

        d = self.get_items(client, jid_)
        d.addCallback(got_items)
        d.addCallback(lambda __: found_entities)
        reactor.callLater(
            TIMEOUT, d.cancel
        )  # FIXME: one bad service makes a general timeout
        return d

    def generate_hash(self, services):
        """ Generate a unique hash for the given services

        hash algorithm is the one described in XEP-0115
        @param services: iterable of disco.DiscoIdentity/disco.DiscoFeature, as returned by discoHandler.info

        """
        s = []
        # identities
        byte_identities = [
            ByteIdentity(service)
            for service in services
            if isinstance(service, disco.DiscoIdentity)
        ]  # FIXME: lang must be managed here
        byte_identities.sort(key=lambda i: i.lang)
        byte_identities.sort(key=lambda i: i.idType)
        byte_identities.sort(key=lambda i: i.category)
        for identity in byte_identities:
            s.append(bytes(identity))
            s.append(b"<")
        # features
        byte_features = [
            service.encode("utf-8")
            for service in services
            if isinstance(service, disco.DiscoFeature)
        ]
        byte_features.sort()  # XXX: the default sort has the same behaviour as the requested RFC 4790 i;octet sort
        for feature in byte_features:
            s.append(feature)
            s.append(b"<")

        # extensions
        ext = list(services.extensions.values())
        ext.sort(key=lambda f: f.formNamespace.encode('utf-8'))
        for extension in ext:
            s.append(extension.formNamespace.encode('utf-8'))
            s.append(b"<")
            fields = extension.fieldList
            fields.sort(key=lambda f: f.var.encode('utf-8'))
            for field in fields:
                s.append(field.var.encode('utf-8'))
                s.append(b"<")
                values = [v.encode('utf-8') for v in field.values]
                values.sort()
                for value in values:
                    s.append(value)
                    s.append(b"<")

        cap_hash = b64encode(sha1(b"".join(s)).digest()).decode('utf-8')
        log.debug(_("Capability hash generated: [{cap_hash}]").format(cap_hash=cap_hash))
        return cap_hash
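    # Worked example, following the "Simple Generation Example" of XEP-0115: a client
    # advertising the single identity client/pc//Exodus 0.9.1 and the features caps,
    # disco#info, disco#items and muc concatenates
    #
    #     client/pc//Exodus 0.9.1<http://jabber.org/protocol/caps<http://jabber.org/protocol/disco#info<http://jabber.org/protocol/disco#items<http://jabber.org/protocol/muc<
    #
    # and the base64-encoded SHA-1 digest of that string is the cap hash
    # "QgayPKawpkPSDYmwT/WM94uAlu0=".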

    @defer.inlineCallbacks
    def _disco_infos(
        self, entity_jid_s, node="", use_cache=True, profile_key=C.PROF_KEY_NONE
    ):
        """Discovery method for the bridge
        @param entity_jid_s: entity we want to discover
        @param use_cache(bool): if True, use cached data if available
        @param node(unicode): optional node to use

        @return: list of tuples
        """
        client = self.host.get_client(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_infos = yield self.get_infos(client, entity, node, use_cache)
        extensions = {}
        # FIXME: should extensions be serialised using tools.common.data_format?
        for form_type, form in list(disco_infos.extensions.items()):
            fields = []
            for field in form.fieldList:
                data = {"type": field.fieldType}
                for attr in ("var", "label", "desc"):
                    value = getattr(field, attr)
                    if value is not None:
                        data[attr] = value

                values = [field.value] if field.value is not None else field.values
                if field.fieldType == "boolean":
                    values = [C.bool_const(v) for v in values]
                fields.append((data, values))

            extensions[form_type or ""] = fields

        defer.returnValue((
            [str(f) for f in disco_infos.features],
            [(cat, type_, name or "")
             for (cat, type_), name in list(disco_infos.identities.items())],
            extensions))
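    # Shape of the value returned to the bridge (reconstructed from the code above,
    # content purely illustrative):
    #
    #     (
    #         ["http://jabber.org/protocol/disco#info", ...],        # features
    #         [("server", "im", "ejabberd"), ...],                   # (category, type, name)
    #         {"urn:xmpp:dataforms:softwareinfo": [({...}, [...])]}, # extensions
    #     )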

    def items2tuples(self, disco_items):
        """convert disco items to tuples of strings

        @param disco_items(iterable[disco.DiscoItem]): items
        @return G(tuple[unicode,unicode,unicode]): serialised items
        """
        for item in disco_items:
            if not item.entity:
                log.warning(_("invalid item (no jid)"))
                continue
            yield (item.entity.full(), item.nodeIdentifier or "", item.name or "")

    @defer.inlineCallbacks
    def _disco_items(
        self, entity_jid_s, node="", use_cache=True, profile_key=C.PROF_KEY_NONE
    ):
        """ Discovery method for the bridge

        @param entity_jid_s: entity we want to discover
        @param node(unicode): optional node to use
        @param use_cache(bool): if True, use cached data if available
        @return: list of tuples"""
        client = self.host.get_client(profile_key)
        entity = jid.JID(entity_jid_s)
        disco_items = yield self.get_items(client, entity, node, use_cache)
        ret = list(self.items2tuples(disco_items))
        defer.returnValue(ret)