comparison libervia/backend/memory/memory.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/memory/memory.py@524856bd7b19
children 02f0adc745c6
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Libervia: an XMPP client
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 import os.path
20 import copy
21 import shortuuid
22 import mimetypes
23 import time
24 from functools import partial
25 from typing import Optional, Tuple, Dict
26 from pathlib import Path
27 from uuid import uuid4
28 from collections import namedtuple
29 from twisted.python import failure
30 from twisted.internet import defer, reactor, error
31 from twisted.words.protocols.jabber import jid
32 from libervia.backend.core.i18n import _
33 from libervia.backend.core.log import getLogger
34 from libervia.backend.core import exceptions
35 from libervia.backend.core.constants import Const as C
36 from libervia.backend.memory.sqla import Storage
37 from libervia.backend.memory.persistent import PersistentDict
38 from libervia.backend.memory.params import Params
39 from libervia.backend.memory.disco import Discovery
40 from libervia.backend.memory.crypto import BlockCipher
41 from libervia.backend.memory.crypto import PasswordHasher
42 from libervia.backend.tools import config as tools_config
43 from libervia.backend.tools.common import data_format
44 from libervia.backend.tools.common import regex
45
46
47 log = getLogger(__name__)
48
49
50 PresenceTuple = namedtuple("PresenceTuple", ("show", "priority", "statuses"))
51 MSG_NO_SESSION = "Session id doesn't exist or is finished"
52
53
class Sessions(object):
    """Short-lived data indexed by a session id, optionally bound to a profile.

    Every value of ``_sessions`` is a ``(timer, data)`` tuple, or a
    ``(timer, data, profile)`` tuple when the session is owned by a profile.
    """

    DEFAULT_TIMEOUT = 600

    def __init__(self, timeout=None, resettable_timeout=True):
        """
        @param timeout (int): nb of seconds before a session is purged;
            any falsy value selects DEFAULT_TIMEOUT
        @param resettable_timeout (bool): if True, reading a session restarts its timer
        """
        self.resettable_timeout = resettable_timeout
        self.timeout = timeout if timeout else Sessions.DEFAULT_TIMEOUT
        self._sessions = {}

    def new_session(self, session_data=None, session_id=None, profile=None):
        """Register a new session and schedule its purge

        @param session_data: mutable data to store, a new dict is used if None
        @param session_id (str): id to use, a random UUID is generated if None
        @param profile: if set, the session is owned by this profile and must be
            read with profile_get instead of __getitem__
        @return: (session_id, session_data) tuple
        @raise exceptions.ConflictError: the requested session_id is already used
        """
        if session_id is not None:
            if session_id in self._sessions:
                raise exceptions.ConflictError(
                    "Session id {} is already used".format(session_id)
                )
        else:
            session_id = str(uuid4())
        expiry = reactor.callLater(self.timeout, self._purge_session, session_id)
        data = {} if session_data is None else session_data
        entry = (expiry, data)
        if profile is not None:
            entry += (profile,)
        self._sessions[session_id] = entry
        return session_id, data

    def _purge_session(self, session_id):
        """Cancel the timer of a session, forget the session and log the purge"""
        entry = self._sessions[session_id]
        timer = entry[0]
        profile = entry[2] if len(entry) > 2 else None
        try:
            timer.cancel()
        except error.AlreadyCalled:
            # the purge has been triggered by the timer itself
            pass
        del self._sessions[session_id]
        suffix = "" if profile is None else " (profile {})".format(profile)
        log.debug("Session {} purged{}".format(session_id, suffix))

    def __len__(self):
        return len(self._sessions)

    def __contains__(self, session_id):
        return session_id in self._sessions

    def profile_get(self, session_id, profile):
        """Return the data of a session owned by a profile

        @param session_id: id of the session
        @param profile: profile which must own the session
        @return: the session data
        @raise failure.Failure: wraps a KeyError if the session id is unknown
        @raise exceptions.InternalError: the session has no profile, or is owned
            by an other profile
        """
        try:
            entry = self._sessions[session_id]
        except KeyError:
            raise failure.Failure(KeyError(MSG_NO_SESSION))
        try:
            expiry, data, owner = entry
        except ValueError:
            raise exceptions.InternalError(
                "You need to use __getitem__ when profile is not set"
            )
        if owner != profile:
            raise exceptions.InternalError("current profile differ from set profile !")
        if self.resettable_timeout:
            expiry.reset(self.timeout)
        return data

    def __getitem__(self, session_id):
        """Return the data of a session which is not owned by a profile

        @raise failure.Failure: wraps a KeyError if the session id is unknown
        @raise exceptions.InternalError: the session is owned by a profile
        """
        try:
            entry = self._sessions[session_id]
        except KeyError:
            raise failure.Failure(KeyError(MSG_NO_SESSION))
        try:
            expiry, data = entry
        except ValueError:
            raise exceptions.InternalError(
                "You need to use profile_get instead of __getitem__ when profile is set"
            )
        if self.resettable_timeout:
            expiry.reset(self.timeout)
        return data

    def __setitem__(self, key, value):
        raise NotImplementedError("You need do use new_session to create a session")

    def __delitem__(self, session_id):
        """Purge the session (timer and data)"""
        self._purge_session(session_id)

    def keys(self):
        """Return a list of the known session ids"""
        return list(self._sessions)

    def iterkeys(self):
        """Return an iterator over the known session ids"""
        return iter(self._sessions)
156
157
class ProfileSessions(Sessions):
    """Sessions which can also be retrieved or deleted by their owning profile.

    The profile is used as the key (instead of the session id), which only works
    when at most one session is associated to the profile.
    """

    def _profile_get_all_ids(self, profile):
        """Return a list of the sessions ids that are associated to the given profile.

        Sessions without profile (2 items entries) are ignored.

        @param profile: %(doc_profile)s
        @return: a list containing the sessions ids
        """
        return [
            session_id
            for session_id, entry in self._sessions.items()
            if len(entry) == 3 and profile == entry[2]
        ]

    def profile_get_unique(self, profile):
        """Return the data of the unique session that is associated to the given profile.

        @param profile: %(doc_profile)s
        @return:
            - mutable data (default: dict) of the unique session
            - None if no session is associated to the profile
        @raise exceptions.InternalError: more than one session are found
        """
        ids = self._profile_get_all_ids(profile)
        if len(ids) > 1:
            raise exceptions.InternalError(
                "profile_get_unique has been used but more than one session has been found!"
            )
        if not ids:
            return None
        # XXX: timeout might be reset by profile_get
        return self.profile_get(ids[0], profile)

    def profile_del_unique(self, profile):
        """Delete the unique session that is associated to the given profile.

        The pending timer of the session is cancelled before the entry is removed,
        otherwise it would fire later for a session id which doesn't exist anymore.
        The entry is removed directly (not through _purge_session) because
        subclasses may override _purge_session without deleting anything.

        @param profile: %(doc_profile)s
        @return: None
        @raise exceptions.InternalError: more than one session are found
        """
        ids = self._profile_get_all_ids(profile)
        if len(ids) > 1:
            raise exceptions.InternalError(
                "profile_del_unique has been used but more than one session has been found!"
            )
        if not ids:
            return
        session_id = ids[0]
        timer = self._sessions[session_id][0]
        try:
            timer.cancel()
        except (error.AlreadyCalled, error.AlreadyCancelled):
            # nothing is pending anymore, we only have to forget the entry
            pass
        del self._sessions[session_id]
210
211
class PasswordSessions(ProfileSessions):
    """Profile sessions with a non resettable timeout and a purge which deletes nothing."""

    # FIXME: temporary hack for the user personal key not to be lost. The session
    #        must actually be purged and later, when the personal key is needed, the
    #        profile password should be asked again in order to decrypt it.
    def __init__(self, timeout=None):
        super().__init__(timeout, resettable_timeout=False)

    def _purge_session(self, session_id):
        """Only log a reminder: the session data are kept on purpose (see FIXME)"""
        message = (
            "FIXME: PasswordSessions should ask for the profile password after the session expired"
        )
        log.debug(message)
224
225
226 class Memory:
227 """This class manage all the persistent information"""
228
def __init__(self, host):
    """Create the in-memory containers and read the main configuration

    @param host: object stored as self.host and given to Discovery
    """
    log.info(_("Memory manager init"))
    self.host = host
    # XXX: keep presence/last resource/other data in cache
    #      /!\ an entity is not necessarily in roster
    #      main key is bare jid, value is a dict
    #      where main key is resource, or None for bare jid
    self._entities_cache = {}
    # keys which need a signal to frontends when updated
    self._key_signals = set()
    self.subscriptions = {}
    # remember the authenticated profiles
    self.auth_sessions = PasswordSessions()
    self.disco = Discovery(host)
    self.config = tools_config.parse_main_conf(log_filenames=True)
    local_dir = self.config_get("", "local_dir")
    self._cache_path = Path(local_dir, C.CACHE_DIR)
    self.admins = self.config_get("", "admins_list", [])
    self.admin_jids = set()
244
245
async def initialise(self):
    """Set up storage, parameters and persistent data, then resolve admins jids

    For each profile of self.admins, the "JabberID" parameter is read and its
    bare jid is added to self.admin_jids. An admin whose jid can't be retrieved
    or parsed is skipped with a warning, so that a bad value never prevents the
    initialisation to complete.
    """
    self.storage = Storage()
    await self.storage.initialise()
    PersistentDict.storage = self.storage
    self.params = Params(self.host, self.storage)
    log.info(_("Loading default params template"))
    self.params.load_default_params()
    await self.load()
    self.memory_data = PersistentDict("memory")
    await self.memory_data.load()
    await self.disco.load()
    for admin in self.admins:
        try:
            admin_jid_s = await self.param_get_a_async(
                "JabberID", "Connection", profile_key=admin
            )
        except Exception as e:
            log.warning(f"Can't retrieve jid of admin {admin!r}: {e}")
            continue
        if admin_jid_s is None:
            continue
        try:
            admin_jid = jid.JID(admin_jid_s).userhostJID()
        except (RuntimeError, jid.InvalidFormat):
            # RuntimeError is raised for an empty value, InvalidFormat for a
            # string which can't be parsed as a jid
            log.warning(f"Invalid JID for admin {admin}: {admin_jid_s}")
        else:
            self.admin_jids.add(admin_jid)
272
273
274 ## Configuration ##
275
def config_get(self, section, name, default=None):
    """Read an option of the main configuration

    @param section: section of the config file (None or '' for DEFAULT)
    @param name: name of the option
    @param default: value to use if not found
    @return: str, list or dict
    """
    main_config = self.config
    return tools_config.config_get(main_config, section, name, default)
285
def load_xml(self, filename):
    """Load parameters template from xml file

    @param filename (str): input file, "~" is expanded
    @return: bool: True in case of success, False if filename is empty, if the
        file doesn't exist or if the loading failed (the error is logged)
    """
    if not filename:
        return False
    path = os.path.expanduser(filename)
    if not os.path.exists(path):
        return False
    try:
        self.params.load_xml(path)
        log.debug(_("Parameters loaded from file: %s") % path)
    except Exception as e:
        log.error(_("Can't load parameters from file: %s") % e)
        return False
    return True
303
def save_xml(self, filename):
    """Save parameters template to xml file

    @param filename (str): output file, "~" is expanded
    @return: bool: True in case of success, False if filename is empty or if
        the saving failed (the error is logged)
    """
    if not filename:
        return False
    # TODO: need to encrypt files (at least passwords !) and set permissions
    path = os.path.expanduser(filename)
    try:
        self.params.save_xml(path)
        log.debug(_("Parameters saved to file: %s") % path)
    except Exception as e:
        log.error(_("Can't save parameters to file: %s") % e)
        return False
    return True
321
def load(self):
    """Load general parameters

    @return: whatever self.params.load_gen_params returns (awaited by initialise)
    """
    params = self.params
    return params.load_gen_params()
326
def load_individual_params(self, profile):
    """Load individual parameters for a profile

    @param profile: %(doc_profile)s
    @return: whatever self.params.load_ind_params returns
    """
    params = self.params
    return params.load_ind_params(profile)
331
332 ## Profiles/Sessions management ##
333
def start_session(self, password, profile):
    """Initialise session for a profile

    The profile is authenticated once the backend is initialised, then its
    individual parameters are loaded if no session is started or starting.

    @param password(unicode): profile session password
        or empty string is no password is set
    @param profile: %(doc_profile)s
    @return (Deferred): fires with True if the session was already started,
        False when it has been created by this call
    @raise exceptions.ProfileUnknownError if profile doesn't exists
    @raise exceptions.PasswordError: the password does not match
    """
    profile = self.get_profile_name(profile)

    def create_session(__):
        """Called once params are loaded."""
        self._entities_cache[profile] = {}
        log.info("[{}] Profile session started".format(profile))
        return False

    def do_start_session(__=None):
        """Called once the profile is authenticated."""
        if self.is_session_started(profile):
            log.info("Session already started!")
            return True
        try:
            # if there is a value at this point in self._entities_cache,
            # it is the load_individual_params Deferred, the session is starting
            session_d = self._entities_cache[profile]
        except KeyError:
            # else we do request the params
            # no "return" in a "finally" clause here: it would hide any exception
            # raised while requesting the params behind an UnboundLocalError
            session_d = self._entities_cache[profile] = self.load_individual_params(
                profile
            )
            session_d.addCallback(create_session)
        return session_d

    def backend_initialised(__):
        auth_d = defer.ensureDeferred(self.profile_authenticate(password, profile))
        auth_d.addCallback(do_start_session)
        return auth_d

    if self.host.initialised.called:
        return defer.succeed(None).addCallback(backend_initialised)
    return self.host.initialised.addCallback(backend_initialised)
377
def stop_session(self, profile):
    """Stop the session of a profile

    The profile is disconnected if needed, and its authentication session is
    deleted.

    @param profile: %(doc_profile)s
    """
    host = self.host
    if host.is_connected(profile):
        log.debug("Disconnecting profile because of session stop")
        host.disconnect(profile)
    self.auth_sessions.profile_del_unique(profile)
    # NOTE(review): the cache entry is only checked here, it is not deleted;
    #               confirm that the deletion is expected to be done elsewhere
    #               (e.g. purge_profile_session)
    if profile not in self._entities_cache:
        log.warning("Profile was not in cache")
391
392 def _is_session_started(self, profile_key):
393 return self.is_session_started(self.get_profile_name(profile_key))
394
def is_session_started(self, profile):
    """Tell if the session of a profile is started

    @param profile: profile name
    @return (bool): False if the profile is not in cache, or if its cache value
        is still a Deferred (the session is starting but not started yet)
    """
    try:
        cached = self._entities_cache[profile]
    except KeyError:
        return False
    return not isinstance(cached, defer.Deferred)
402
async def profile_authenticate(self, password, profile):
    """Authenticate the profile.

    @param password (unicode): the SàT profile password
    @param profile: name of the profile to authenticate
    @return: None in case of success (an exception is raised otherwise)
    @raise exceptions.PasswordError: the password does not match
    """
    already_authenticated = (
        not password and self.auth_sessions.profile_get_unique(profile)
    )
    if already_authenticated:
        # XXX: this allows any frontend to connect with the empty password as soon as
        # the profile has been authenticated at least once before. It is OK as long as
        # submitting a form with empty passwords is restricted to local frontends.
        return

    sat_cipher = await self.param_get_a_async(
        C.PROFILE_PASS_PATH[1], C.PROFILE_PASS_PATH[0], profile_key=profile
    )
    if PasswordHasher.verify(password, sat_cipher):
        return await self.new_auth_session(password, profile)

    log.warning(_("Authentication failure of profile {profile}").format(
        profile=profile))
    raise exceptions.PasswordError("The provided profile password doesn't match.")
425
async def new_auth_session(self, key, profile):
    """Start a new session for the authenticated profile.

    The encrypted personal key is loaded from a PersistentDict and decrypted with
    key; it is stored in a new authentication session, unless the profile
    already has one with data.

    @param key: the key to decrypt the personal key
    @param profile: %(doc_profile)s
    """
    crypto_store = PersistentDict(C.MEMORY_CRYPTO_NAMESPACE, profile)
    data = await crypto_store.load()
    personal_key = BlockCipher.decrypt(key, data[C.MEMORY_CRYPTO_KEY])
    if self.auth_sessions.profile_get_unique(profile):
        # a session already holds the personal key of this profile
        return
    self.auth_sessions.new_session(
        {C.MEMORY_CRYPTO_KEY: personal_key}, profile=profile
    )
    log.debug("auth session created for profile %s" % profile)
444
def purge_profile_session(self, profile):
    """Delete cache of data of profile

    The parameters of the profile are purged and its entry is removed from the
    entities cache; an error is logged if the profile was not in cache.

    @param profile: %(doc_profile)s
    """
    # the profile is inserted after the translation lookup, otherwise the
    # message id would never match a catalog entry
    log.info(_("[%s] Profile session purge") % profile)
    self.params.purge_profile(profile)
    try:
        del self._entities_cache[profile]
    except KeyError:
        log.error(
            _(
                "Trying to purge roster status cache for a profile not in memory: [%s]"
            )
            % profile
        )
459
def get_profiles_list(self, clients=True, components=False):
    """Retrieve the sorted list of profiles

    @param clients(bool): if True return clients profiles
    @param components(bool): if True return components profiles
    @return (list[unicode]): selected profiles, empty (with a warning logged) if
        neither clients nor components are requested
    """
    if not (clients or components):
        log.warning(_("requesting no profiles at all"))
        return []
    profiles = self.storage.get_profiles_list()
    if clients and components:
        return sorted(profiles)
    is_component = self.storage.profile_is_component
    # exactly one kind is requested at this point
    keep_components = not clients
    return sorted(p for p in profiles if bool(is_component(p)) == keep_components)
480
def get_profile_name(self, profile_key, return_profile_keys=False):
    """Return name of profile from keyword

    @param profile_key: can be the profile name or a keyword (like @DEFAULT@)
    @param return_profile_keys: if True, return unmanaged profile keys (like
        "@ALL@"). This keys must be managed by the caller
    @return: requested profile name
    @raise exceptions.ProfileUnknownError if profile doesn't exists
    """
    params = self.params
    return params.get_profile_name(profile_key, return_profile_keys)
490
def profile_set_default(self, profile):
    """Set default profile

    The resolved profile name is stored in self.memory_data["Profile_default"].

    @param profile: %(doc_profile)s
    """
    # resolving the name makes sure that the profile exists
    checked_profile = self.get_profile_name(profile)
    self.memory_data["Profile_default"] = checked_profile
500
def create_profile(self, name, password, component=None):
    """Create a new profile

    Once the profile is created, a personal key is generated and the password
    parameter is set inside a temporary "fake" session, which is removed at the
    end together with the authentication session.

    @param name(unicode): profile name
    @param password(unicode): profile password
        Can be empty to disable password
    @param component(None, unicode): set to entry point if this is a component
    @return: Deferred
    @raise ValueError: name is empty, starts with "@" or contains a line feed
    @raise exceptions.ConflictError: a session exists for this profile name
    @raise exceptions.NotFound: component is not a known plugin import name
    """
    if not name:
        raise ValueError("Empty profile name")
    if name[0] == "@":
        raise ValueError("A profile name can't start with a '@'")
    if "\n" in name:
        raise ValueError("A profile name can't contain line feed ('\\n')")

    if name in self._entities_cache:
        raise exceptions.ConflictError("A session for this profile exists")

    if component and component not in self.host.plugins:
        # the component is inserted after the translation lookup, otherwise the
        # message id would never match a catalog entry
        raise exceptions.NotFound(
            _("Can't find component {component} entry point").format(
                component=component
            )
        )
    # FIXME: PLUGIN_INFO is not currently accessible after import, but type shoul be tested here
    #  if self.host.plugins[component].PLUGIN_INFO[u"type"] != C.PLUG_TYPE_ENTRY_POINT:
    #      raise ValueError(_(u"Plugin {component} is not an entry point !".format(
    #          component = component)))

    d = self.params.create_profile(name, component)

    def init_personal_key(__):
        # be sure to call this after checking that the profile doesn't exist yet

        # generated once for all and saved in a PersistentDict
        personal_key = BlockCipher.get_random_key(
            base64=True
        ).decode('utf-8')
        self.auth_sessions.new_session(
            {C.MEMORY_CRYPTO_KEY: personal_key}, profile=name
        )  # will be encrypted by param_set

    def start_fake_session(__):
        # avoid ProfileNotConnected exception in param_set
        self._entities_cache[name] = None
        self.params.load_ind_params(name)

    def stop_fake_session(__):
        del self._entities_cache[name]
        self.params.purge_profile(name)

    d.addCallback(init_personal_key)
    d.addCallback(start_fake_session)
    d.addCallback(
        lambda __: self.param_set(
            C.PROFILE_PASS_PATH[1], password, C.PROFILE_PASS_PATH[0], profile_key=name
        )
    )
    d.addCallback(stop_fake_session)
    d.addCallback(lambda __: self.auth_sessions.profile_del_unique(name))
    return d
567
def profile_delete_async(self, name, force=False):
    """Delete an existing profile

    Once the profile is deleted by self.params, its authentication session and
    its entities cache (if any) are removed.

    @param name: Name of the profile
    @param force: force the deletion even if the profile is connected.
        To be used for direct calls only (not through the bridge).
    @return: a Deferred instance
    """

    def clean_memory(__):
        self.auth_sessions.profile_del_unique(name)
        # the profile may have no cache at all
        self._entities_cache.pop(name, None)

    d = self.params.profile_delete_async(name, force)
    d.addCallback(clean_memory)
    return d
587
def is_component(self, profile_name):
    """Tell if a profile is a component

    @param profile_name(unicode): name of the profile
    @return (bool): True if profile is a component
    @raise exceptions.NotFound: profile doesn't exist
    """
    storage = self.storage
    return storage.profile_is_component(profile_name)
596
def get_entry_point(self, profile_name):
    """Get a component entry point

    @param profile_name(unicode): name of the profile
    @return: the value returned by self.storage.get_entry_point for this profile
    @raise exceptions.NotFound: profile doesn't exist
    """
    storage = self.storage
    return storage.get_entry_point(profile_name)
605
606 ## History ##
607
def add_to_history(self, client, data):
    """Store message data in the history of the profile of client

    @param client: object whose "profile" attribute is the profile to use
    @param data: message data, given as is to the storage
    @return: whatever self.storage.add_to_history returns
    """
    profile = client.profile
    return self.storage.add_to_history(data, profile)
610
611 def _history_get_serialise(self, history_data):
612 return [
613 (uid, timestamp, from_jid, to_jid, message, subject, mess_type,
614 data_format.serialise(extra)) for uid, timestamp, from_jid, to_jid, message,
615 subject, mess_type, extra in history_data
616 ]
617
def _history_get(self, from_jid_s, to_jid_s, limit=C.HISTORY_LIMIT_NONE, between=True,
                 filters=None, profile=C.PROF_KEY_NONE):
    """Same as history_get, with jids given as strings and a serialised result

    @return (Deferred): result of history_get with _history_get_serialise
        added as callback
    """
    from_jid = jid.JID(from_jid_s)
    to_jid = jid.JID(to_jid_s)
    d = self.history_get(from_jid, to_jid, limit, between, filters, profile)
    d.addCallback(self._history_get_serialise)
    return d
624
def history_get(self, from_jid, to_jid, limit=C.HISTORY_LIMIT_NONE, between=True,
                filters=None, profile=C.PROF_KEY_NONE):
    """Retrieve messages in history

    @param from_jid (JID): source JID (full, or bare for catchall)
    @param to_jid (JID): dest JID (full, or bare for catchall)
    @param limit (int): maximum number of messages to get:
        - 0 for no message (returns the empty list)
        - C.HISTORY_LIMIT_NONE or None for unlimited
        - C.HISTORY_LIMIT_DEFAULT to use the HISTORY_LIMIT parameter value
    @param between (bool): confound source and dest (ignore the direction)
    @param filters (dict[unicode, unicode]): pattern to filter the history results
        (see bridge API for details)
    @param profile (str): %(doc_profile)s
    @return (D(list)): list of message data as in [message_new]
    """
    assert profile != C.PROF_KEY_NONE
    if limit == C.HISTORY_LIMIT_DEFAULT:
        max_messages = int(
            self.param_get_a(C.HISTORY_LIMIT, "General", profile_key=profile)
        )
    elif limit == C.HISTORY_LIMIT_NONE:
        max_messages = None
    else:
        max_messages = limit
    if max_messages == 0:
        # nothing is requested, no need to query the storage
        return defer.succeed([])
    return self.storage.history_get(
        from_jid, to_jid, max_messages, between, filters, profile
    )
649
650 ## Statuses ##
651
652 def _get_presence_statuses(self, profile_key):
653 ret = self.presence_statuses_get(profile_key)
654 return {entity.full(): data for entity, data in ret.items()}
655
def presence_statuses_get(self, profile_key):
    """Get all the presence statuses of a profile

    Resources without "presence" datum are skipped.

    @param profile_key: %(doc_profile_key)s
    @return: presence data: key=entity JID, value=dict mapping each resource
        ("" for the bare jid) to the presence data of this resource
    """
    client = self.host.get_client(profile_key)
    profile_cache = self._get_profile_cache(client)
    entities_presence = {}

    for entity_jid, entity_data in profile_cache.items():
        for resource in entity_data:
            full_jid = copy.copy(entity_jid)
            full_jid.resource = resource
            try:
                presence_data = self.get_entity_datum(client, full_jid, "presence")
            except KeyError:
                continue
            by_resource = entities_presence.setdefault(entity_jid, {})
            by_resource[resource or ""] = presence_data

    return entities_presence
679
def set_presence_status(self, entity_jid, show, priority, statuses, profile_key):
    """Change the presence status of an entity

    The presence is stored as a PresenceTuple in the "presence" datum of the
    entity.

    @param entity_jid: jid.JID of the entity
    @param show: show status
    @param priority: priority
    @param statuses: dictionary of statuses
    @param profile_key: %(doc_profile_key)s
    """
    client = self.host.get_client(profile_key)
    self.update_entity_data(
        client, entity_jid, "presence", PresenceTuple(show, priority, statuses)
    )
    if not (entity_jid.resource and show != C.PRESENCE_UNAVAILABLE):
        return
    # If a resource is available, bare jid should not have presence information
    try:
        self.del_entity_datum(client, entity_jid.userhostJID(), "presence")
    except (KeyError, exceptions.UnknownEntityError):
        pass
700
701 ## Resources ##
702
def _get_all_resource(self, jid_s, profile_key):
    """Same as get_all_resources, with a jid string and a profile key"""
    client = self.host.get_client(profile_key)
    entity_jid = jid.JID(jid_s)
    return self.get_all_resources(client, entity_jid)
707
def get_all_resources(self, client, entity_jid):
    """Return all resource from jid for which we have had data in this session

    @param entity_jid: bare jid of the entity
    return (set[unicode]): set of resources (the None key used for the bare jid
        is not included)

    @raise exceptions.UnknownEntityError: if entity is not in cache
    @raise ValueError: entity_jid has a resource
    """
    # FIXME: is there a need to keep cache data for resources which are not connected anymore?
    if entity_jid.resource:
        raise ValueError(
            "get_all_resources must be used with a bare jid (got {})".format(entity_jid)
        )
    profile_cache = self._get_profile_cache(client)
    try:
        entity_data = profile_cache[entity_jid.userhostJID()]
    except KeyError:
        raise exceptions.UnknownEntityError(
            "Entity {} not in cache".format(entity_jid)
        )
    resources = set(entity_data)
    resources -= {None}
    return resources
732
def get_available_resources(self, client, entity_jid):
    """Return available resource for entity_jid

    This method differs from get_all_resources by returning only the resources
    with a presence whose "show" is not C.PRESENCE_UNAVAILABLE
    @param entity_jid: bare jid of the entity
    return (list[unicode]): list of available resources

    @raise exceptions.UnknownEntityError: if entity is not in cache
    """
    available = []
    for resource in self.get_all_resources(client, entity_jid):
        full_jid = copy.copy(entity_jid)
        full_jid.resource = resource
        try:
            presence_data = self.get_entity_datum(client, full_jid, "presence")
        except KeyError:
            log.debug("Can't get presence data for {}".format(full_jid))
            continue
        if presence_data.show != C.PRESENCE_UNAVAILABLE:
            available.append(resource)
    return available
754
def _get_main_resource(self, jid_s, profile_key):
    """Same as main_resource_get, with a jid string, "" is returned for no resource"""
    client = self.host.get_client(profile_key)
    entity_jid = jid.JID(jid_s)
    resource = self.main_resource_get(client, entity_jid)
    return resource if resource else ""
759
def main_resource_get(self, client, entity_jid):
    """Return the main resource used by an entity

    The main resource is the one with the highest presence priority.

    @param entity_jid: bare entity jid
    @return (unicode): main resource or None
    @raise ValueError: entity_jid has a resource
    """
    if entity_jid.resource:
        raise ValueError(
            "main_resource_get must be used with a bare jid (got {})".format(entity_jid)
        )
    try:
        in_joined_room = self.host.plugins["XEP-0045"].is_joined_room(
            client, entity_jid
        )
    except KeyError:  # plugin not found
        in_joined_room = False
    if in_joined_room:
        return None  # MUC rooms have no main resource
    try:
        resources = self.get_all_resources(client, entity_jid)
    except exceptions.UnknownEntityError:
        log.warning("Entity is not in cache, we can't find any resource")
        return None
    priority_resources = []
    for resource in resources:
        full_jid = copy.copy(entity_jid)
        full_jid.resource = resource
        try:
            presence_data = self.get_entity_datum(client, full_jid, "presence")
        except KeyError:
            log.debug("No presence information for {}".format(full_jid))
            continue
        priority_resources.append((resource, presence_data.priority))
    if not priority_resources:
        log.warning("No resource found at all for {}".format(entity_jid))
        return None
    return max(priority_resources, key=lambda res_tuple: res_tuple[1])[0]
795
796 ## Entities data ##
797
798 def _get_profile_cache(self, client):
799 """Check profile validity and return its cache
800
801 @param client: SatXMPPClient
802 @return (dict): profile cache
803 """
804 return self._entities_cache[client.profile]
805
def set_signal_on_update(self, key, signal=True):
    """Set or unset the signal flag of a key

    When a flagged key is updated, a signal is sent to frontends
    @param key: key to signal
    @param signal(boolean): if True, set the flag, else remove it (removing a
        flag which is not set is not an error)
    """
    change_flag = self._key_signals.add if signal else self._key_signals.discard
    change_flag(key)
817
def get_all_entities_iter(self, client, with_bare=False):
    """Return an iterator of the jids of all entities in cache

    Each yielded jid is a copy of the bare jid used as cache key, with its
    resource set.

    @param with_bare: if True, bare jids with cached data (None resource) are
        yielded too, else only full jids are yielded
    @return (iterator): jids of the entities (bare jid of entities x resources)
    """
    profile_cache = self._get_profile_cache(client)
    for bare_jid, entity_data in profile_cache.items():
        for resource in entity_data.keys():
            if resource is None and not with_bare:
                # None is the key used for the data of the bare jid
                continue
            full_jid = copy.copy(bare_jid)
            full_jid.resource = resource
            yield full_jid
833
def update_entity_data(
    self, client, entity_jid, key, value, silent=False
):
    """Set a misc data for an entity

    Missing cache entries are created for the entity. If key was registered
    with set_signal_on_update, a signal will be sent to frontends
    @param entity_jid: JID of the entity, C.ENTITY_ALL_RESOURCES for all resources of
        all entities, C.ENTITY_ALL for all entities (all resources + bare jids)
    @param key: key to set (eg: C.ENTITY_TYPE)
    @param value: value for this key (eg: C.ENTITY_TYPE_MUC)
    @param silent(bool): if True, doesn't send signal to frontend, even if there is a
        signal flag (see set_signal_on_update)
    """
    profile_cache = self._get_profile_cache(client)
    if entity_jid in (C.ENTITY_ALL_RESOURCES, C.ENTITY_ALL):
        entities = self.get_all_entities_iter(client, entity_jid == C.ENTITY_ALL)
    else:
        entities = (entity_jid,)

    for jid_ in entities:
        by_resource = profile_cache.setdefault(jid_.userhostJID(), {})
        by_resource.setdefault(jid_.resource, {})[key] = value
        if key not in self._key_signals or silent:
            continue
        self.host.bridge.entity_data_updated(
            jid_.full(),
            key,
            data_format.serialise(value),
            client.profile
        )
866
def del_entity_datum(self, client, entity_jid, key):
    """Delete a data for an entity

    @param entity_jid: JID of the entity, C.ENTITY_ALL_RESOURCES for all resources of all entities,
        C.ENTITY_ALL for all entities (all resources + bare jids)
    @param key: key to delete (eg: C.ENTITY_TYPE)

    @raise exceptions.UnknownEntityError: if entity is not in cache
    @raise KeyError: key is not in cache (only when a single entity is requested)
    """
    profile_cache = self._get_profile_cache(client)
    several_entities = entity_jid in (C.ENTITY_ALL_RESOURCES, C.ENTITY_ALL)
    if several_entities:
        entities = self.get_all_entities_iter(client, entity_jid == C.ENTITY_ALL)
    else:
        entities = (entity_jid,)

    for jid_ in entities:
        try:
            entity_data = profile_cache[jid_.userhostJID()][jid_.resource]
        except KeyError:
            raise exceptions.UnknownEntityError(
                "Entity {} not in cache".format(jid_)
            )
        try:
            del entity_data[key]
        except KeyError:
            # we ignore KeyError when deleting keys from several entities
            if not several_entities:
                raise
897
def _get_entities_data(self, entities_jids, keys_list, profile_key):
    """Serialised variant of entities_data_get

    @param entities_jids: iterable of jids as strings, each one is parsed with
        jid.JID
    @param keys_list: forwarded unchanged to entities_data_get
    @param profile_key: used to retrieve the client with self.host.get_client
    @return (dict): for each key of the entities_data_get result, the value of its
        "full()" method mapped to a dict of key => data_format.serialise(value)
    """
    client = self.host.get_client(profile_key)
    parsed_jids = [jid.JID(raw_jid) for raw_jid in entities_jids]
    cached = self.entities_data_get(client, parsed_jids, keys_list)
    serialised = {}
    for entity, values in cached.items():
        serialised[entity.full()] = {
            name: data_format.serialise(datum) for name, datum in values.items()
        }
    return serialised
907
def entities_data_get(self, client, entities_jids, keys_list=None):
    """Get cached values for several entities at once

    @param entities_jids: jids of the entities, or empty list for all entities in
        cache
    @param keys_list (iterable,None): list of keys to get, None for everything
    @return (dict): entity jid (same kind of object in both modes: the requested
        jid, or a copy of the cached bare jid with its resource set) mapped to a
        dict with the values found for each key of keys_list.
        If there is no value for a given key, the resulting dict will have nothing
        with that key either.
        If an entity doesn't exist in cache, it will not appear in resulting dict
        (no exception is raised).
    """

    def fill_entity_data(entity_cache_data):
        if keys_list is None:
            # no filter requested: the cached dict itself is returned (not a copy)
            return entity_cache_data
        return {
            key: entity_cache_data[key]
            for key in keys_list
            if key in entity_cache_data
        }

    profile_cache = self._get_profile_cache(client)
    ret_data = {}
    if entities_jids:
        for entity in entities_jids:
            try:
                entity_cache_data = profile_cache[entity.userhostJID()][
                    entity.resource
                ]
            except KeyError:
                # unknown entities are silently skipped
                continue
            # the jid object itself is used as key, like in the "all entities"
            # branch below (callers such as _get_entities_data call full() on it)
            ret_data[entity] = fill_entity_data(entity_cache_data)
    else:
        for bare_jid, data in profile_cache.items():
            for resource, entity_cache_data in data.items():
                # the cached key must not be modified, we work on a copy
                full_jid = copy.copy(bare_jid)
                full_jid.resource = resource
                ret_data[full_jid] = fill_entity_data(entity_cache_data)

    return ret_data
954
def _get_entity_data(self, entity_jid_s, keys_list=None, profile=C.PROF_KEY_NONE):
    """Variant of entity_data_get taking the jid as a string and a profile

    @param entity_jid_s: jid of the entity as a string, parsed with jid.JID
    @param keys_list: forwarded unchanged to entity_data_get
    @param profile: used to retrieve the client with self.host.get_client
    @return: result of entity_data_get
    """
    client = self.host.get_client(profile)
    entity_jid = jid.JID(entity_jid_s)
    return self.entity_data_get(client, entity_jid, keys_list)
958
def entity_data_get(self, client, entity_jid, keys_list=None):
    """Get cached values of a single entity

    @param entity_jid: JID of the entity
    @param keys_list (iterable,None): list of keys to get, None for everything
    @return: dict with the value of each key of keys_list.
        If there is no value for a given key, the resulting dict will have nothing
        with that key either.
        When keys_list is None, the cached dict itself is returned (not a copy).

    @raise exceptions.UnknownEntityError: if entity is not in cache
    """
    cache = self._get_profile_cache(client)
    bare_jid = entity_jid.userhostJID()
    try:
        cached = cache[bare_jid][entity_jid.resource]
    except KeyError:
        raise exceptions.UnknownEntityError(
            "Entity {} not in cache (was requesting {})".format(
                entity_jid, keys_list
            )
        )
    if keys_list is None:
        return cached

    found = {}
    for wanted in keys_list:
        if wanted in cached:
            found[wanted] = cached[wanted]
    return found
984
def get_entity_datum(self, client, entity_jid, key):
    """Get a single cached datum of an entity

    @param entity_jid: JID of the entity
    @param key: key to get
    @return: requested value

    @raise exceptions.UnknownEntityError: if entity is not in cache
    @raise KeyError: if there is no value for this key and this entity
    """
    # entity_data_get skips missing keys, the KeyError comes from the lookup below
    entity_data = self.entity_data_get(client, entity_jid, (key,))
    return entity_data[key]
996
def del_entity_cache(
    self, entity_jid, delete_all_resources=True, profile_key=C.PROF_KEY_NONE
):
    """Remove all cached data for entity

    @param entity_jid: JID of the entity to delete
    @param delete_all_resources: if True also delete all known resources from cache
        (a bare jid must be given in this case)
    @param profile_key: %(doc_profile_key)s

    @raise ValueError: delete_all_resources is set but entity_jid has a resource
    @raise exceptions.UnknownEntityError: if entity is not in cache
    """
    client = self.host.get_client(profile_key)
    cache = self._get_profile_cache(client)

    if delete_all_resources and entity_jid.resource:
        raise ValueError(_("Need a bare jid to delete all resources"))

    try:
        if delete_all_resources:
            # the whole entry (i.e. every resource) of the bare jid is dropped
            del cache[entity_jid]
        else:
            del cache[entity_jid.userhostJID()][entity_jid.resource]
    except KeyError:
        raise exceptions.UnknownEntityError(
            "Entity {} not in cache".format(entity_jid)
        )
1027
1028 ## Encryption ##
1029
def encrypt_value(self, value, profile):
    """Encrypt a value with the personal key of the given profile

    The personal key must be loaded already in the profile session, that should
    be the case if the profile is already authenticated.

    @param value (str): the value to encrypt
    @param profile (str): %(doc_profile)s
    @return: result of BlockCipher.encrypt (documented here as the deferred
        encrypted value)
    @raise exceptions.InternalError: the session lookup raised a TypeError
        (reported as an undefined personal key)
    """
    try:
        session_data = self.auth_sessions.profile_get_unique(profile)
        personal_key = session_data[C.MEMORY_CRYPTO_KEY]
    except TypeError:
        message = _(
            "Trying to encrypt a value for %s while the personal key is undefined!"
        )
        raise exceptions.InternalError(message % profile)
    return BlockCipher.encrypt(personal_key, value)
1049
def decrypt_value(self, value, profile):
    """Decrypt a value with the personal key of the given profile

    The personal key must be loaded already in the profile session, that should
    be the case if the profile is already authenticated.

    @param value (str): the value to decrypt
    @param profile (str): %(doc_profile)s
    @return: result of BlockCipher.decrypt (documented here as the deferred
        decrypted value)
    @raise exceptions.InternalError: the session lookup raised a TypeError
        (reported as an undefined personal key)
    """
    try:
        session_data = self.auth_sessions.profile_get_unique(profile)
        personal_key = session_data[C.MEMORY_CRYPTO_KEY]
    except TypeError:
        message = _(
            "Trying to decrypt a value for %s while the personal key is undefined!"
        )
        raise exceptions.InternalError(message % profile)
    return BlockCipher.decrypt(personal_key, value)
1069
def encrypt_personal_data(self, data_key, data_value, crypto_key, profile):
    """Re-encrypt a personal data (saved to a PersistentDict).

    The PersistentDict of the C.MEMORY_CRYPTO_NAMESPACE namespace is loaded for
    the profile, the encrypted value is stored in it, then "force" is called for
    the key.

    @param data_key: key for the individual PersistentDict instance
    @param data_value: the value to be encrypted
    @param crypto_key: the key to encrypt the value
    @param profile: %(profile_doc)s
    @return: a deferred None value
    """

    def store_encrypted(persistent):
        persistent[data_key] = BlockCipher.encrypt(crypto_key, data_value)
        return persistent.force(data_key)

    def log_success(__):
        log.debug(
            _("Personal data (%(ns)s, %(key)s) has been successfuly encrypted")
            % {"ns": C.MEMORY_CRYPTO_NAMESPACE, "key": data_key}
        )

    loading = PersistentDict(C.MEMORY_CRYPTO_NAMESPACE, profile).load()
    return loading.addCallback(store_encrypted).addCallback(log_success)
1092
1093 ## Subscription requests ##
1094
def add_waiting_sub(self, type_, entity_jid, profile_key):
    """Called when a subscription request is received

    @param type_: value stored for the entity in the waiting subscriptions
    @param entity_jid: entity used as key in the waiting subscriptions
    @param profile_key: resolved with get_profile_name, must match a profile
    """
    profile = self.get_profile_name(profile_key)
    assert profile
    # the per-profile dict is created on first use
    self.subscriptions.setdefault(profile, {})[entity_jid] = type_
1102
def del_waiting_sub(self, entity_jid, profile_key):
    """Called when a subscription request is finished

    Nothing is done if there is no waiting subscription for this entity.

    @param entity_jid: entity to remove from the waiting subscriptions
    @param profile_key: resolved with get_profile_name, must match a profile
    """
    profile = self.get_profile_name(profile_key)
    assert profile
    waiting = self.subscriptions.get(profile)
    if waiting is not None and entity_jid in waiting:
        del waiting[entity_jid]
1109
def sub_waiting_get(self, profile_key):
    """Called to get the currently waiting subscription requests

    @param profile_key: resolved with get_profile_name
    @return (dict): waiting subscriptions of the profile (the stored dict itself),
        or an empty dict if the profile is unknown or has no waiting subscription
    """
    profile = self.get_profile_name(profile_key)
    if not profile:
        log.error(_("Asking waiting subscriptions for a non-existant profile"))
        return {}
    return self.subscriptions.get(profile, {})
1120
1121 ## Parameters ##
1122
def get_string_param_a(self, name, category, attr="value", profile_key=C.PROF_KEY_NONE):
    """Proxy to the get_string_param_a method of self.params

    All arguments are forwarded positionally, the result is returned unchanged.
    """
    params = self.params
    return params.get_string_param_a(name, category, attr, profile_key)
1125
def param_get_a(self, name, category, attr="value", profile_key=C.PROF_KEY_NONE):
    """Proxy to the param_get_a method of self.params

    profile_key is forwarded as a keyword argument, other arguments positionally.
    The result is returned unchanged.
    """
    params = self.params
    return params.param_get_a(name, category, attr, profile_key=profile_key)
1128
def param_get_a_async(
    self,
    name,
    category,
    attr="value",
    security_limit=C.NO_SECURITY_LIMIT,
    profile_key=C.PROF_KEY_NONE,
):
    """Proxy to the param_get_a_async method of self.params

    All arguments are forwarded positionally, the result is returned unchanged.
    """
    params = self.params
    return params.param_get_a_async(name, category, attr, security_limit, profile_key)
1140
def _get_params_values_from_category(
    self, category, security_limit, app, extra_s, profile_key
):
    """Proxy to the _get_params_values_from_category method of self.params

    All arguments are forwarded positionally, the result is returned unchanged.
    """
    params = self.params
    return params._get_params_values_from_category(
        category, security_limit, app, extra_s, profile_key
    )
1147
def async_get_string_param_a(
    self, name, category, attribute="value", security_limit=C.NO_SECURITY_LIMIT,
    profile_key=C.PROF_KEY_NONE):
    """Call async_get_string_param_a of self.params and wrap it in a Deferred

    profile_key is first resolved with get_profile_name, and the resulting profile
    is what is given to self.params.

    @return (D): result of defer.ensureDeferred on the call to self.params
    """
    profile = self.get_profile_name(profile_key)
    pending = self.params.async_get_string_param_a(
        name, category, attribute, security_limit, profile
    )
    return defer.ensureDeferred(pending)
1156
def _get_params_ui(self, security_limit, app, extra_s, profile_key):
    """Proxy to the _get_params_ui method of self.params

    All arguments are forwarded positionally, the result is returned unchanged.
    """
    params = self.params
    return params._get_params_ui(security_limit, app, extra_s, profile_key)
1159
def params_categories_get(self):
    """Proxy to the params_categories_get method of self.params

    The result is returned unchanged.
    """
    params = self.params
    return params.params_categories_get()
1162
def param_set(
    self,
    name,
    value,
    category,
    security_limit=C.NO_SECURITY_LIMIT,
    profile_key=C.PROF_KEY_NONE,
):
    """Proxy to the param_set method of self.params

    All arguments are forwarded positionally, the result is returned unchanged.
    """
    params = self.params
    return params.param_set(name, value, category, security_limit, profile_key)
1172
def update_params(self, xml):
    """Proxy to the update_params method of self.params

    @param xml: forwarded unchanged
    @return: result of self.params.update_params
    """
    params = self.params
    return params.update_params(xml)
1175
def params_register_app(self, xml, security_limit=C.NO_SECURITY_LIMIT, app=""):
    """Proxy to the params_register_app method of self.params

    All arguments are forwarded positionally, the result is returned unchanged.
    """
    params = self.params
    return params.params_register_app(xml, security_limit, app)
1178
def set_default(self, name, category, callback, errback=None):
    """Proxy to the set_default method of self.params

    All arguments are forwarded positionally, the result is returned unchanged.
    """
    params = self.params
    return params.set_default(name, category, callback, errback)
1181
1182 ## Private Data ##
1183
def _private_data_set(self, namespace, key, data_s, profile_key):
    """Deserialise a value and store it as a binary private value

    @param namespace: forwarded to self.storage.set_private_value
    @param key: forwarded to self.storage.set_private_value
    @param data_s: serialised data, decoded with data_format.deserialise
    @param profile_key: used to retrieve the client, whose profile is used
    @return (D): result of defer.ensureDeferred on the storage call
    """
    client = self.host.get_client(profile_key)
    # we accept any type
    data = data_format.deserialise(data_s, type_check=None)
    storing = self.storage.set_private_value(
        namespace, key, data, binary=True, profile=client.profile
    )
    return defer.ensureDeferred(storing)
1190
def _private_data_get(self, namespace, key, profile_key):
    """Retrieve a binary private value and serialise it

    @param namespace: forwarded to self.storage.get_privates
    @param key: only key requested from self.storage.get_privates
    @param profile_key: used to retrieve the client, whose profile is used
    @return (D): fires with data_format.serialise applied to the value found for
        key (or to None if key is not in the retrieved dict)
    """
    client = self.host.get_client(profile_key)

    def serialise_found(data_dict):
        return data_format.serialise(data_dict.get(key))

    retrieving = self.storage.get_privates(
        namespace, [key], binary=True, profile=client.profile
    )
    d = defer.ensureDeferred(retrieving)
    d.addCallback(serialise_found)
    return d
1199
def _private_data_delete(self, namespace, key, profile_key):
    """Delete a binary private value

    @param namespace: forwarded to self.storage.del_private_value
    @param key: forwarded to self.storage.del_private_value
    @param profile_key: used to retrieve the client, whose profile is used
    @return (D): result of defer.ensureDeferred on the storage call
    """
    client = self.host.get_client(profile_key)
    deleting = self.storage.del_private_value(
        namespace, key, binary=True, profile=client.profile
    )
    return defer.ensureDeferred(deleting)
1204
1205 ## Files ##
1206
def check_file_permission(
    self,
    file_data: dict,
    peer_jid: Optional[jid.JID],
    perms_to_check: Optional[Tuple[str]],
    set_affiliation: bool = False
) -> None:
    """Check that an entity has the right permission on a file

    Only file_data itself is checked, parents are not walked by this method.

    @param file_data: data of one file, as returned by get_files
    @param peer_jid: entity trying to access the file (its bare jid is used)
    @param perms_to_check: permissions to check
        tuple of C.ACCESS_PERM_*
        if both peer_jid and perms_to_check are None, nothing is checked
    @param set_affiliation: if True, "affiliation" metadata will be set to "owner"
        in file_data when peer_jid is the owner of the file
    @raise exceptions.PermissionError: peer_jid doesn't have all permission
        in perms_to_check for file_data
    @raise exceptions.InternalError: perms_to_check is invalid, or an unknown
        access type is found in file_data
    """
    # TODO: knowing if user is owner is not enough, we need to check permission
    #   to see if user can modify/delete files, and set corresponding affiliation (publisher, member)
    if peer_jid is None and perms_to_check is None:
        return
    peer_jid = peer_jid.userhostJID()
    if peer_jid == file_data["owner"]:
        if set_affiliation:
            file_data['affiliation'] = 'owner'
        # the owner has all rights, nothing to check
        return
    if not C.ACCESS_PERMS.issuperset(perms_to_check):
        raise exceptions.InternalError(_("invalid permission"))

    # each permission is checked, and PermissionError is raised as soon as one
    # condition is not valid. We must never return inside the loop: the access is
    # only granted once every permission has been accepted.
    for perm in perms_to_check:
        try:
            perm_data = file_data["access"][perm]
            perm_type = perm_data["type"]
        except KeyError:
            # No permission is set.
            # If we are in a root file/directory, we deny access
            # otherwise, we use public permission, as the parent directory will
            # block anyway, this avoid to have to recursively change permissions for
            # all sub directories/files when modifying a permission
            if not file_data.get('parent'):
                raise exceptions.PermissionError()
            continue

        if perm_type == C.ACCESS_TYPE_PUBLIC:
            continue
        if perm_type != C.ACCESS_TYPE_WHITELIST:
            raise exceptions.InternalError(
                _("unknown access type: {type}").format(type=perm_type)
            )

        # whitelist: the bare jid of the peer must be explicitly listed
        try:
            allowed_jids = perm_data["jids"]
        except KeyError:
            raise exceptions.PermissionError()
        if peer_jid.full() not in allowed_jids:
            raise exceptions.PermissionError()
1270
async def check_permission_to_root(self, client, file_data, peer_jid, perms_to_check):
    """Do check_file_permission on file_data and all its parents until root

    Parents are retrieved one by one with get_files (with permission check
    disabled), using the "parent" key of each node as file id, until a node with
    an empty "parent" is reached.

    @raise exceptions.DataError: a parent can't be retrieved
    """
    node = file_data
    self.check_file_permission(node, peer_jid, perms_to_check)
    while node["parent"]:
        parents = await self.get_files(
            client, peer_jid=None, file_id=node["parent"], perms_to_check=None
        )
        try:
            node = parents[0]
        except IndexError:
            raise exceptions.DataError("Missing parent")
        self.check_file_permission(node, peer_jid, perms_to_check)
1286
async def _get_parent_dir(
    self, client, path, parent, namespace, owner, peer_jid, perms_to_check
):
    """Retrieve parent node from a path, or last existing directory

    Each directory of the path is retrieved from storage, until the last existing
    one. check_file_permission is called on every directory found. Nothing is
    created by this method.

    @return (tuple[unicode, list[unicode])): parent, remaining path elements:
        - parent is the id of the last retrieved directory (or u'' for root)
        - remaining path elements are the directories which have not been retrieved
          (i.e. which don't exist)
    @raise exceptions.ConflictError: parent is not None
    @raise ValueError: path contains a "." or ".." element
    @raise exceptions.InternalError: several directories match a path element
    """
    # if path is set, we have to retrieve parent directory of the file(s) from it
    if parent is not None:
        raise exceptions.ConflictError(
            _("You can't use path and parent at the same time")
        )
    # empty elements (leading, trailing or doubled slashes) are ignored
    segments = [segment for segment in path.split("/") if segment]
    if {"..", "."}.intersection(segments):
        raise ValueError(_('".." or "." can\'t be used in path'))

    # we walk the path from the root until we get the parent container
    current_id = ""
    for position, segment in enumerate(segments):
        found = await self.storage.get_files(
            client,
            parent=current_id,
            type_=C.FILE_TYPE_DIRECTORY,
            name=segment,
            namespace=namespace,
            owner=owner,
        )
        if not found:
            # from this point, directories don't exist: they are returned to the
            # caller, which can decide to create them
            return (current_id, segments[position:])
        if len(found) > 1:
            raise exceptions.InternalError(
                _("Several directories found, this should not happen")
            )
        directory = found[0]
        self.check_file_permission(directory, peer_jid, perms_to_check)
        current_id = directory["id"]
    return (current_id, [])
1331
def get_file_affiliations(self, file_data: dict) -> Dict[jid.JID, str]:
    """Convert file access to pubsub like affiliations

    Entities of the read whitelist are "member", entities of the write whitelist
    are "publisher" (this overrides "member"), and the owner of the file, if any,
    is "owner" (this overrides the 2 others).

    @param file_data: data of a file, its "access" key is used (and "owner" if
        present)
    @return: affiliation of each entity
    """
    affiliations = {}
    access_data = file_data['access']

    # order matters: later affiliations override the earlier ones
    perm_to_affiliation = (
        (C.ACCESS_PERM_READ, 'member'),
        (C.ACCESS_PERM_WRITE, 'publisher'),
    )
    for perm, affiliation in perm_to_affiliation:
        perm_data = access_data.get(perm, {})
        if perm_data.get('type') != C.ACCESS_TYPE_WHITELIST:
            continue
        for entity_jid_s in perm_data['jids']:
            affiliations[jid.JID(entity_jid_s)] = affiliation

    owner = file_data.get('owner')
    if owner:
        affiliations[owner] = 'owner'

    return affiliations
1354
def _set_file_affiliations_update(
    self,
    access: dict,
    file_data: dict,
    affiliations: Dict[jid.JID, str]
) -> None:
    """Update in place the access data of a file from pubsub like affiliations

    Read and write permissions are both switched to whitelist type (an empty
    whitelist is created when missing), then each affiliation is applied.

    @param access: access data to update in place
    @param file_data: data of the file (not used by this method)
    @param affiliations: affiliation to apply for each entity
    @raise NotImplementedError: "owner" affiliation is requested
    @raise ValueError: an unknown affiliation is requested
    """
    whitelists = []
    for perm in (C.ACCESS_PERM_READ, C.ACCESS_PERM_WRITE):
        perm_data = access.setdefault(perm, {})
        if perm_data.get('type') != C.ACCESS_TYPE_WHITELIST:
            perm_data['type'] = C.ACCESS_TYPE_WHITELIST
        whitelists.append(perm_data.setdefault('jids', []))
    read_whitelist, write_whitelist = whitelists

    for entity_jid, affiliation in affiliations.items():
        entity_jid_s = entity_jid.full()
        if affiliation == "none":
            try:
                read_whitelist.remove(entity_jid_s)
            except ValueError:
                log.warning(
                    "removing affiliation from an entity without read permission: "
                    f"{entity_jid}"
                )
            try:
                write_whitelist.remove(entity_jid_s)
            except ValueError:
                pass
        elif affiliation in ("publisher", "member"):
            # both affiliations give read permission
            if entity_jid_s not in read_whitelist:
                read_whitelist.append(entity_jid_s)
            if affiliation == "publisher":
                if entity_jid_s not in write_whitelist:
                    write_whitelist.append(entity_jid_s)
            else:
                # a member must not keep a previously set write permission
                try:
                    write_whitelist.remove(entity_jid_s)
                except ValueError:
                    pass
        elif affiliation == "owner":
            raise NotImplementedError('"owner" affiliation can\'t be set')
        else:
            raise ValueError(f"unknown affiliation: {affiliation!r}")
1403
async def set_file_affiliations(
    self,
    client,
    file_data: dict,
    affiliations: Dict[jid.JID, str]
) -> None:
    """Apply pubsub like affiliation to file_data

    Affiliations are converted to access types, then set in a whitelist.
    Affiliation are mapped as follow:
        - "owner" can't be set (for now)
        - "publisher" gives read and write permissions
        - "member" gives read permission only
        - "none" removes both read and write permissions

    The "access" column of the file is updated through file_update, using
    _set_file_affiliations_update as update callback.
    """
    file_id = file_data['id']
    update_access = partial(
        self._set_file_affiliations_update,
        file_data=file_data,
        affiliations=affiliations
    )
    await self.file_update(file_id, 'access', update_cb=update_access)
1429
def _set_file_access_model_update(
    self,
    access: dict,
    file_data: dict,
    access_model: str
) -> None:
    """Update in place the read access of a file from a pubsub like access model

    @param access: access data to update in place (the read permission entry is
        created if missing, even when access_model is then rejected)
    @param file_data: data of the file (not used by this method)
    @param access_model: "open" or "whitelist"
    @raise ValueError: access_model is not known
    """
    read_data = access.setdefault(C.ACCESS_PERM_READ, {})
    model_to_type = {
        "open": C.ACCESS_TYPE_PUBLIC,
        "whitelist": C.ACCESS_TYPE_WHITELIST,
    }
    if access_model not in model_to_type:
        raise ValueError(f"unknown access model: {access_model}")
    requested_type = model_to_type[access_model]

    read_data['type'] = requested_type
    if requested_type == C.ACCESS_TYPE_WHITELIST and 'jids' not in read_data:
        # a whitelist always comes with its list of jids
        read_data['jids'] = []
1447
async def set_file_access_model(
    self,
    client,
    file_data: dict,
    access_model: str,
) -> None:
    """Apply pubsub like access_model to file_data

    Only 2 access models are supported so far:
        - "open": set public access to file/dir
        - "whitelist": set whitelist to file/dir

    The "access" column of the file is updated through file_update, using
    _set_file_access_model_update as update callback.
    """
    file_id = file_data['id']
    update_access = partial(
        self._set_file_access_model_update,
        file_data=file_data,
        access_model=access_model
    )
    await self.file_update(file_id, 'access', update_cb=update_access)
1470
def get_files_owner(
        self,
        client,
        owner: Optional[jid.JID],
        peer_jid: Optional[jid.JID],
        file_id: Optional[str] = None,
        parent: Optional[str] = None
) -> jid.JID:
    """Get owner to use for a file operation

    if owner is not explicitely set, a suitable one will be used (client.jid for
    clients, peer_jid for components). Bare jids are always returned.

    @return: owner to use. Note that None is returned when owner is not set and
        either client is None, or file_id or parent is set.
    @raise exception.InternalError: we are one a component, and neither owner nor
        peer_jid are set
    """
    if owner is not None:
        return owner.userhostJID()
    # - client may be None when looking for file with public_id
    # - with file_id or parent, owner has already been filtered on parent file
    if client is None or file_id or parent:
        return None
    if client.is_component:
        if peer_jid is None:
            raise exceptions.InternalError(
                "Owner must be set for component if peer_jid is None"
            )
        return peer_jid.userhostJID()
    return client.jid.userhostJID()
1501
async def get_files(
    self, client, peer_jid, file_id=None, version=None, parent=None, path=None,
    type_=None, file_hash=None, hash_algo=None, name=None, namespace=None,
    mime_type=None, public_id=None, owner=None, access=None, projection=None,
    unique=False, perms_to_check=(C.ACCESS_PERM_READ,)):
    """Retrieve files with with given filters

    @param peer_jid(jid.JID, None): jid trying to access the file
        needed to check permission.
        Use None to ignore permission (perms_to_check must be None too)
    @param file_id(unicode, None): id of the file
        None to ignore
    @param version(unicode, None): version of the file
        None to ignore
        empty string to look for current version
    @param parent(unicode, None): id of the directory containing the files
        None to ignore
        empty string to look for root files/directories
    @param path(Path, unicode, None): path to the directory containing the files
        can't be used together with parent
    @param type_(unicode, None): type of file filter, can be one of C.FILE_TYPE_*
    @param file_hash(unicode, None): hash of the file to retrieve
    @param hash_algo(unicode, None): algorithm use for file_hash
    @param name(unicode, None): name of the file to retrieve
    @param namespace(unicode, None): namespace of the files to retrieve
    @param mime_type(unicode, None): filter on this mime type
    @param public_id(unicode, None): filter on this public id
    @param owner(jid.JID, None): if not None, only get files from this owner
        (if None, the owner is chosen by get_files_owner)
    @param access(dict, None): get file with given access (see [set_file])
    @param projection(list[unicode], None): name of columns to retrieve
        None to retrieve all
    @param unique(bool): if True will remove duplicates
    @param perms_to_check(tuple[unicode],None): permission to check
        must be a tuple of C.ACCESS_PERM_* or None
        if None, permission will no be checked (peer_jid must be None too in this
        case)
    other params are the same as for [set_file]
    @return (list[dict]): files corresponding to filters, without the files that
        peer_jid is not allowed to access
    @raise exceptions.InternalError: only one of peer_jid and perms_to_check is
        None
    @raise exceptions.NotFound: parent directory not found (when path is specified)
    @raise exceptions.DataError: the parent directory can't be retrieved
    @raise exceptions.PermissionError: peer_jid can't use perms_to_check for one of
        the file
        on the path
    """
    if peer_jid is None and perms_to_check or perms_to_check is None and peer_jid:
        raise exceptions.InternalError(
            "if you want to disable permission check, both peer_jid and "
            "perms_to_check must be None"
        )
    owner = self.get_files_owner(client, owner, peer_jid, file_id, parent)
    if path is not None:
        path = str(path)
        # permission are checked by _get_parent_dir
        parent, remaining_path_elts = await self._get_parent_dir(
            client, path, parent, namespace, owner, peer_jid, perms_to_check
        )
        if remaining_path_elts:
            # if we have remaining path elements,
            # the parent directory is not found
            # the exception is raised directly (and not wrapped in a Failure), so
            # coroutines awaiting this method can catch it as documented
            raise exceptions.NotFound()
    if parent and peer_jid:
        # if parent is given directly and permission check is requested,
        # we need to check all the parents
        parent_data = await self.storage.get_files(client, file_id=parent)
        try:
            parent_data = parent_data[0]
        except IndexError:
            raise exceptions.DataError("mising parent")
        await self.check_permission_to_root(
            client, parent_data, peer_jid, perms_to_check
        )

    files = await self.storage.get_files(
        client,
        file_id=file_id,
        version=version,
        parent=parent,
        type_=type_,
        file_hash=file_hash,
        hash_algo=hash_algo,
        name=name,
        namespace=namespace,
        mime_type=mime_type,
        public_id=public_id,
        owner=owner,
        access=access,
        projection=projection,
        unique=unique,
    )

    if peer_jid:
        # if permission are checked, we must only keep files that user can access
        # (a single pass, instead of removing forbidden files one by one)
        allowed_files = []
        for file_data in files:
            try:
                self.check_file_permission(
                    file_data, peer_jid, perms_to_check, set_affiliation=True
                )
            except exceptions.PermissionError:
                continue
            allowed_files.append(file_data)
        files = allowed_files
    return files
1603
async def set_file(
    self, client, name, file_id=None, version="", parent=None, path=None,
    type_=C.FILE_TYPE_FILE, file_hash=None, hash_algo=None, size=None,
    namespace=None, mime_type=None, public_id=None, created=None, modified=None,
    owner=None, access=None, extra=None, peer_jid=None,
    perms_to_check=(C.ACCESS_PERM_WRITE,)
):
    """Set a file metadata

    Arguments are validated and normalised, missing directories of path are
    created, then the metadata are saved with self.storage.set_file.

    @param name(unicode): basename of the file (must not contain a slash)
    @param file_id(unicode): unique id of the file
        if None, a new id is generated with shortuuid
    @param version(unicode): version of this file
        empty string for current version or when there is no versioning
    @param parent(unicode, None): id of the directory containing the files
        if both parent and path are None, the empty string (root) is used
    @param path(unicode, None): virtual path of the file in the namespace
        if set, parent must be None. All intermediate directories will be created
        if needed, using current access.
    @param type_(str, None): type of file filter, can be one of C.FILE_TYPE_*
    @param file_hash(unicode): unique hash of the payload
        must be set if and only if hash_algo is set
    @param hash_algo(unicode): algorithm used for hashing the file (usually sha-256)
    @param size(int): size in bytes
    @param namespace(unicode, None): identifier (human readable is better) to group
        files
        For instance, namespace could be used to group files in a specific photo album
        It is stripped, and a blank namespace is replaced by None
    @param mime_type(unicode): MIME type of the file, or None if not known/guessed
        if None, it is guessed from name, otherwise it is lowercased
    @param public_id(unicode): id used to share publicly the file via HTTP
    @param created(int): UNIX time of creation
        if None, current time is used
    @param modified(int,None): UNIX time of last modification, or None to use
        created date
    @param owner(jid.JID, None): jid of the owner of the file (mainly useful for
        component)
        the owner actually used is the one returned by get_files_owner
    @param access(dict, None): serialisable dictionary with access rules.
        None (or empty dict) to use private access, i.e. allow only profile's jid to
        access the file
        key can be on on C.ACCESS_PERM_*,
        then a sub dictionary with a type key is used (one of C.ACCESS_TYPE_*).
        According to type, extra keys can be used:
            - C.ACCESS_TYPE_PUBLIC: the permission is granted for everybody
            - C.ACCESS_TYPE_WHITELIST: the permission is granted for jids (as unicode)
              in the 'jids' key
        will be encoded to json in database
    @param extra(dict, None): serialisable dictionary of any extra data
        will be encoded to json in database
    @param peer_jid(jid.JID, None): only used here as an argument of
        get_files_owner
    @param perms_to_check(tuple[unicode],None): permission to check
        must be a tuple of C.ACCESS_PERM_* or None
        it is only used for the directories of path (see _get_parent_dir)
    @raise ValueError: name contains a slash, only one of file_hash and hash_algo
        is set, or an argument which can't be used with a directory is set
    """
    if "/" in name:
        raise ValueError('name must not contain a slash ("/")')
    if file_id is None:
        file_id = shortuuid.uuid()
    # file_hash and hash_algo only make sense together
    if (
        file_hash is not None
        and hash_algo is None
        or hash_algo is not None
        and file_hash is None
    ):
        raise ValueError("file_hash and hash_algo must be set at the same time")
    if mime_type is None:
        mime_type, __ = mimetypes.guess_type(name)
    else:
        mime_type = mime_type.lower()
    if public_id is not None:
        assert len(public_id)>0
    if created is None:
        created = time.time()
    if namespace is not None:
        # a blank namespace is the same as no namespace
        namespace = namespace.strip() or None
    if type_ == C.FILE_TYPE_DIRECTORY:
        # NOTE(review): mime_type may have been guessed from name just above, so a
        #   directory whose name has a known extension seems to be rejected here
        #   - TODO confirm
        if any((version, file_hash, size, mime_type)):
            raise ValueError(
                "version, file_hash, size and mime_type can't be set for a directory"
            )
    # NOTE(review): file_id is never empty at this point, and get_files_owner
    #   returns None when file_id is set and owner is None, so the owner seems to
    #   be None unless it is explicitly given - TODO confirm this is intended
    owner = self.get_files_owner(client, owner, peer_jid, file_id, parent)

    if path is not None:
        path = str(path)
        # _get_parent_dir will check permissions if peer_jid is set, so we use owner
        parent, remaining_path_elts = await self._get_parent_dir(
            client, path, parent, namespace, owner, owner, perms_to_check
        )
        # if remaining directories don't exist, we have to create them
        for new_dir in remaining_path_elts:
            new_dir_id = shortuuid.uuid()
            await self.storage.set_file(
                client,
                name=new_dir,
                file_id=new_dir_id,
                version="",
                parent=parent,
                type_=C.FILE_TYPE_DIRECTORY,
                namespace=namespace,
                created=time.time(),
                owner=owner,
                access=access,
                extra={},
            )
            # the directory which has just been created is the parent of the next
            # one (and of the file itself for the last one)
            parent = new_dir_id
    elif parent is None:
        # neither path nor parent: the file is put at the root
        parent = ""

    await self.storage.set_file(
        client,
        file_id=file_id,
        version=version,
        parent=parent,
        type_=type_,
        file_hash=file_hash,
        hash_algo=hash_algo,
        name=name,
        size=size,
        namespace=namespace,
        mime_type=mime_type,
        public_id=public_id,
        created=created,
        modified=modified,
        owner=owner,
        access=access,
        extra=extra,
    )
1728
async def file_get_used_space(
    self,
    client,
    peer_jid: jid.JID,
    owner: Optional[jid.JID] = None
) -> int:
    """Get space taken by all files owned by an entity

    @param peer_jid: entity requesting the size
    @param owner: entity owning the file to check. If None, will be determined by
        get_files_owner
    @return: size of total space used by files of this owner
    @raise exceptions.PermissionError: peer_jid is not the owner, and the profile
        of the client is not in self.admins
    """
    owner = self.get_files_owner(client, owner, peer_jid)
    if peer_jid.userhostJID() != owner:
        # only admins can check the space used by somebody else
        if client.profile not in self.admins:
            raise exceptions.PermissionError("You are not allowed to check this size")
    used_space = await self.storage.file_get_used_space(client, owner)
    return used_space
1746
def file_update(self, file_id, column, update_cb):
    """Update a file column taking care of race condition

    This is a proxy to self.storage.file_update.
    access is NOT checked in this method, it must be checked beforehand

    @param file_id(unicode): id of the file to update
    @param column(unicode): one of "access" or "extra"
    @param update_cb(callable): method to update the value of the colum
        the method will take older value as argument, and must update it in place
        Note that the callable must be thread-safe
    @return: result of self.storage.file_update
    """
    storage = self.storage
    return storage.file_update(file_id, column, update_cb)
1758
@defer.inlineCallbacks
def _delete_file(
    self,
    client,
    peer_jid: jid.JID,
    recursive: bool,
    files_path: Path,
    file_data: dict
):
    """Internal method to delete files/directories recursively

    For a directory, sub-files are deleted first, then the directory itself.
    For a file, metadata are deleted first, then the actual file is removed from
    files_path if get_files finds no other file with the same hash.

    @param peer_jid(jid.JID): entity requesting the deletion (must be owner of files
        to delete)
    @param recursive(boolean): True if recursive deletion is needed
    @param files_path(unicode): path of the directory containing the actual files
    @param file_data(dict): data of the file to delete
    @raise exceptions.PermissionError: peer_jid is not the owner of the file
    @raise exceptions.DataError: directory is not empty and recursive is not set
    @raise exceptions.InternalError: the type of the file is not known
    """
    if file_data['owner'] != peer_jid:
        raise exceptions.PermissionError(
            "file {file_name} can't be deleted, {peer_jid} is not the owner"
            .format(file_name=file_data['name'], peer_jid=peer_jid.full()))

    file_type = file_data['type']
    if file_type == C.FILE_TYPE_DIRECTORY:
        children = yield self.get_files(client, peer_jid, parent=file_data['id'])
        if children and not recursive:
            raise exceptions.DataError(_("Can't delete directory, it is not empty"))
        # we first delete the sub-files
        for child_data in children:
            if child_data['type'] == C.FILE_TYPE_DIRECTORY:
                child_path = files_path / child_data['name']
            else:
                child_path = files_path
            yield self._delete_file(
                client, peer_jid, recursive, child_path, child_data)
        # then the directory itself
        yield self.storage.file_delete(file_data['id'])
    elif file_type == C.FILE_TYPE_FILE:
        file_hash = file_data['file_hash']
        log.info(_("deleting file {name} with hash {file_hash}").format(
            name=file_data['name'], file_hash=file_hash))
        # metadata are deleted before looking for other references, so the file
        # being deleted is not counted
        yield self.storage.file_delete(file_data['id'])
        references = yield self.get_files(client, peer_jid, file_hash=file_hash)
        if references:
            log.debug("there are still references to the file, we keep it")
        else:
            file_path = os.path.join(files_path, file_hash)
            log.info(_("no reference left to {file_path}, deleting").format(
                file_path=file_path))
            try:
                os.unlink(file_path)
            except FileNotFoundError:
                log.error(f"file at {file_path!r} doesn't exist but it was referenced in files database")
    else:
        raise exceptions.InternalError('Unexpected file type: {file_type}'
            .format(file_type=file_type))
1813
async def file_delete(self, client, peer_jid, file_id, recursive=False):
    """Delete a single file, or a directory and all its sub-files

    @param client: client of the profile doing the request
    @param peer_jid(jid.JID): entity requesting the deletion,
        must be owner of all files to delete
    @param file_id(unicode): id of the file to delete
    @param recursive(boolean): must be True to delete a directory and all sub-files
    @raise exceptions.NotFound: no file is found for ``file_id``
    @raise ValueError: ``recursive`` is set but the file is not a directory
    """
    # FIXME: we only allow owner of file to delete files for now, but WRITE access
    #        should be checked too
    found_files = await self.get_files(client, peer_jid, file_id)
    if not found_files:
        raise exceptions.NotFound("Can't find the file with id {file_id}".format(
            file_id=file_id))
    target_data = found_files[0]
    is_directory = target_data["type"] == C.FILE_TYPE_DIRECTORY
    if not is_directory and recursive:
        raise ValueError("recursive can only be set for directories")
    base_path = self.host.get_local_path(None, C.FILES_DIR)
    await self._delete_file(client, peer_jid, recursive, base_path, target_data)
1833
1834 ## Cache ##
1835
def get_cache_path(self, namespace: str, *args: str) -> Path:
    """Build the common path to use for a namespace

    This can be used by plugins to manage permanent data. It's the responsibility
    of plugins to clean this directory from unused data.
    @param namespace: unique namespace to use (it is stripped and lowercased)
    @param args: extra identifiers which will be added to the path
    @return: path made of the cache path, the escaped namespace, then each escaped
        extra identifier
    """
    normalized_ns = namespace.strip().lower()
    segments = [self._cache_path, regex.path_escape(normalized_ns)]
    segments.extend(regex.path_escape(extra) for extra in args)
    return Path(*segments)
1850
1851 ## Misc ##
1852
def is_entity_available(self, client, entity_jid):
    """Tell from the presence information if the given entity is available.

    @param client: client of the profile doing the check
    @param entity_jid (JID): the entity to check (if bare jid is used, all
        resources are tested)
    @return (bool): True if entity is available
    """
    if entity_jid.resource:
        # full jid: we rely on the presence stored for this specific resource
        try:
            presence = self.get_entity_datum(client, entity_jid, "presence")
        except KeyError:
            log.debug("No presence information for {}".format(entity_jid))
            return False
        else:
            return presence.show != C.PRESENCE_UNAVAILABLE

    # bare jid: if any resource is available, the entity is available
    available_resources = self.get_available_resources(client, entity_jid)
    return bool(available_resources)
1869
def is_admin(self, profile: str) -> bool:
    """Tell if the given profile has administrator privileges

    @param profile: name of the profile to check
    @return: True if the profile is found in ``self.admins``
    """
    admin_profiles = self.admins
    return profile in admin_profiles
1873
def is_admin_jid(self, entity: jid.JID) -> bool:
    """Tell if the jid of an entity corresponds to an admin one

    The profile alone is sometimes not enough to know if an entity is an admin
    (e.g. a request managed by a component). In this case the bare jid of the
    entity is looked for in ``self.admin_jids``.
    @param entity: jid of the entity to check (the resource is ignored)
    @return: True if the bare jid of the entity is in ``self.admin_jids``
    """
    bare_jid = entity.userhostJID()
    return bare_jid in self.admin_jids