view libervia/backend/memory/memory.py @ 4163:3b3cd9453d9b

plugin XEP-0308: implement Last Message Correction
author Goffi <goffi@goffi.org>
date Tue, 28 Nov 2023 17:38:31 +0100
parents 54b8cf8c8daf
children 730f542e4ad0
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from collections import namedtuple
import copy
from dataclasses import dataclass
from functools import partial
import mimetypes
import os.path
from pathlib import Path
import time
from typing import Dict, Optional, Tuple
from uuid import uuid4

import shortuuid
from twisted.internet import defer, error, reactor
from twisted.python import failure
from twisted.words.protocols.jabber import jid

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.memory.crypto import BlockCipher
from libervia.backend.memory.crypto import PasswordHasher
from libervia.backend.memory.disco import Discovery
from libervia.backend.memory.params import Params
from libervia.backend.memory.persistent import PersistentDict
from libervia.backend.memory.sqla import (
    Notification,
    NotificationPriority,
    NotificationStatus,
    NotificationType,
    Storage,
)
from libervia.backend.tools import config as tools_config
from libervia.backend.tools.common import data_format
from libervia.backend.tools.common import regex


log = getLogger(__name__)


PresenceTuple = namedtuple("PresenceTuple", ("show", "priority", "statuses"))
MSG_NO_SESSION = "Session id doesn't exist or is finished"


class Sessions(object):
    """Sessions are data associated to key used for a temporary moment, with optional profile checking."""

    DEFAULT_TIMEOUT = 600

    def __init__(self, timeout=None, resettable_timeout=True):
        """
        @param timeout (int): nb of seconds before session destruction
        @param resettable_timeout (bool): if True, the timeout is reset on each access
        """
        self._sessions = dict()
        self.timeout = timeout or Sessions.DEFAULT_TIMEOUT
        self.resettable_timeout = resettable_timeout

    def new_session(self, session_data=None, session_id=None, profile=None):
        """Create a new session

        @param session_data: mutable data to use, default to a dict
        @param session_id (str): force the session_id to the given string
        @param profile: if set, the session is owned by the profile,
                        and profile_get must be used instead of __getitem__
        @return: session_id, session_data
        """
        if session_id is None:
            session_id = str(uuid4())
        elif session_id in self._sessions:
            raise exceptions.ConflictError(
                "Session id {} is already used".format(session_id)
            )
        timer = reactor.callLater(self.timeout, self._purge_session, session_id)
        if session_data is None:
            session_data = {}
        self._sessions[session_id] = (
            (timer, session_data) if profile is None else (timer, session_data, profile)
        )
        return session_id, session_data

    def _purge_session(self, session_id):
        try:
            timer, session_data, profile = self._sessions[session_id]
        except ValueError:
            timer, session_data = self._sessions[session_id]
            profile = None
        try:
            timer.cancel()
        except error.AlreadyCalled:
            # if the session is time-outed, the timer has been called
            pass
        del self._sessions[session_id]
        log.debug(
            "Session {} purged{}".format(
                session_id,
                " (profile {})".format(profile) if profile is not None else "",
            )
        )

    def __len__(self):
        return len(self._sessions)

    def __contains__(self, session_id):
        return session_id in self._sessions

    def profile_get(self, session_id, profile):
        try:
            timer, session_data, profile_set = self._sessions[session_id]
        except ValueError:
            raise exceptions.InternalError(
                "You need to use __getitem__ when profile is not set"
            )
        except KeyError:
            raise failure.Failure(KeyError(MSG_NO_SESSION))
        if profile_set != profile:
            raise exceptions.InternalError("current profile differ from set profile !")
        if self.resettable_timeout:
            timer.reset(self.timeout)
        return session_data

    def __getitem__(self, session_id):
        try:
            timer, session_data = self._sessions[session_id]
        except ValueError:
            raise exceptions.InternalError(
                "You need to use profile_get instead of __getitem__ when profile is set"
            )
        except KeyError:
            raise failure.Failure(KeyError(MSG_NO_SESSION))
        if self.resettable_timeout:
            timer.reset(self.timeout)
        return session_data

    def __setitem__(self, key, value):
        raise NotImplementedError("You need do use new_session to create a session")

    def __delitem__(self, session_id):
        """ delete the session data """
        self._purge_session(session_id)

    def keys(self):
        return list(self._sessions.keys())

    def iterkeys(self):
        return iter(self._sessions.keys())


class ProfileSessions(Sessions):
    """ProfileSessions extends the Sessions class, but here the profile can be
    used as the key to retrieve data or delete a session (instead of session id).
    """

    def _profile_get_all_ids(self, profile):
        """Return a list of the sessions ids that are associated to the given profile.

        @param profile: %(doc_profile)s
        @return: a list containing the sessions ids
        """
        ret = []
        for session_id in self._sessions.keys():
            try:
                timer, session_data, profile_set = self._sessions[session_id]
            except ValueError:
                continue
            if profile == profile_set:
                ret.append(session_id)
        return ret

    def profile_get_unique(self, profile):
        """Return the data of the unique session that is associated to the given profile.

        @param profile: %(doc_profile)s
        @return:
            - mutable data (default: dict) of the unique session
            - None if no session is associated to the profile
            - raise an error if more than one session are found
        """
        ids = self._profile_get_all_ids(profile)
        if len(ids) > 1:
            raise exceptions.InternalError(
                "profile_get_unique has been used but more than one session has been found!"
            )
        return (
            self.profile_get(ids[0], profile) if len(ids) == 1 else None
        )  # XXX: timeout might be reset

    def profile_del_unique(self, profile):
        """Delete the unique session that is associated to the given profile.

        @param profile: %(doc_profile)s
        @return: None, but raise an error if more than one session are found
        """
        ids = self._profile_get_all_ids(profile)
        if len(ids) > 1:
            raise exceptions.InternalError(
                "profile_del_unique has been used but more than one session has been found!"
            )
        if len(ids) == 1:
            del self._sessions[ids[0]]


class PasswordSessions(ProfileSessions):

    # FIXME: temporary hack for the user personal key not to be lost. The session
    # must actually be purged and later, when the personal key is needed, the
    # profile password should be asked again in order to decrypt it.
    def __init__(self, timeout=None):
        ProfileSessions.__init__(self, timeout, resettable_timeout=False)

    def _purge_session(self, session_id):
        log.debug(
            "FIXME: PasswordSessions should ask for the profile password after the session expired"
        )


class Memory:
    """This class manage all the persistent information"""

    def __init__(self, host):
        log.info(_("Memory manager init"))
        self.host = host
        self._entities_cache = {}  # XXX: keep presence/last resource/other data in cache
        #     /!\ an entity is not necessarily in roster
        #     main key is bare jid, value is a dict
        #     where main key is resource, or None for bare jid
        self._key_signals = set()  # key which need a signal to frontends when updated
        self.subscriptions = {}
        self.auth_sessions = PasswordSessions()  # remember the authenticated profiles
        self.disco = Discovery(host)
        self.config = tools_config.parse_main_conf(log_filenames=True)
        self._cache_path = Path(self.config_get("", "local_dir"), C.CACHE_DIR)
        self.admins = self.config_get("", "admins_list", [])
        self.admin_jids = set()


    async def initialise(self):
        self.storage = Storage()
        await self.storage.initialise()
        PersistentDict.storage = self.storage
        self.params = Params(self.host, self.storage)
        log.info(_("Loading default params template"))
        self.params.load_default_params()
        await self.load()
        self.memory_data = PersistentDict("memory")
        await self.memory_data.load()
        await self.disco.load()
        for admin in self.admins:
            try:
                admin_jid_s = await self.param_get_a_async(
                    "JabberID", "Connection", profile_key=admin
                )
            except Exception as e:
                log.warning(f"Can't retrieve jid of admin {admin!r}: {e}")
            else:
                if admin_jid_s is not None:
                    try:
                        admin_jid = jid.JID(admin_jid_s).userhostJID()
                    except RuntimeError:
                        log.warning(f"Invalid JID for admin {admin}: {admin_jid_s}")
                    else:
                        self.admin_jids.add(admin_jid)


    ## Configuration ##

    def config_get(self, section, name, default=None):
        """Get the main configuration option

        @param section: section of the config file (None or '' for DEFAULT)
        @param name: name of the option
        @param default: value to use if not found
        @return: str, list or dict
        """
        return tools_config.config_get(self.config, section, name, default)

    def load_xml(self, filename):
        """Load parameters template from xml file

        @param filename (str): input file
        @return: bool: True in case of success
        """
        if not filename:
            return False
        filename = os.path.expanduser(filename)
        if os.path.exists(filename):
            try:
                self.params.load_xml(filename)
                log.debug(_("Parameters loaded from file: %s") % filename)
                return True
            except Exception as e:
                log.error(_("Can't load parameters from file: %s") % e)
        return False

    def save_xml(self, filename):
        """Save parameters template to xml file

        @param filename (str): output file
        @return: bool: True in case of success
        """
        if not filename:
            return False
        # TODO: need to encrypt files (at least passwords !) and set permissions
        filename = os.path.expanduser(filename)
        try:
            self.params.save_xml(filename)
            log.debug(_("Parameters saved to file: %s") % filename)
            return True
        except Exception as e:
            log.error(_("Can't save parameters to file: %s") % e)
        return False

    def load(self):
        """Load parameters and all memory things from db"""
        # parameters data
        return self.params.load_gen_params()

    def load_individual_params(self, profile):
        """Load individual parameters for a profile
        @param profile: %(doc_profile)s"""
        return self.params.load_ind_params(profile)

    ## Profiles/Sessions management ##

    def start_session(self, password, profile):
        """"Iniatialise session for a profile

        @param password(unicode): profile session password
            or empty string is no password is set
        @param profile: %(doc_profile)s
        @raise exceptions.ProfileUnknownError if profile doesn't exists
        @raise exceptions.PasswordError: the password does not match
        """
        profile = self.get_profile_name(profile)

        def create_session(__):
            """Called once params are loaded."""
            self._entities_cache[profile] = {}
            log.info("[{}] Profile session started".format(profile))
            return False

        def backend_initialised(__):
            def do_start_session(__=None):
                if self.is_session_started(profile):
                    log.info("Session already started!")
                    return True
                try:
                    # if there is a value at this point in self._entities_cache,
                    # it is the load_individual_params Deferred, the session is starting
                    session_d = self._entities_cache[profile]
                except KeyError:
                    # else we do request the params
                    session_d = self._entities_cache[profile] = self.load_individual_params(
                        profile
                    )
                    session_d.addCallback(create_session)
                finally:
                    return session_d

            auth_d = defer.ensureDeferred(self.profile_authenticate(password, profile))
            auth_d.addCallback(do_start_session)
            return auth_d

        if self.host.initialised.called:
            return defer.succeed(None).addCallback(backend_initialised)
        else:
            return self.host.initialised.addCallback(backend_initialised)

    def stop_session(self, profile):
        """Delete a profile session

        @param profile: %(doc_profile)s
        """
        if self.host.is_connected(profile):
            log.debug("Disconnecting profile because of session stop")
            self.host.disconnect(profile)
        self.auth_sessions.profile_del_unique(profile)
        try:
            self._entities_cache[profile]
        except KeyError:
            log.warning("Profile was not in cache")

    def _is_session_started(self, profile_key):
        return self.is_session_started(self.get_profile_name(profile_key))

    def is_session_started(self, profile):
        try:
            # XXX: if the value in self._entities_cache is a Deferred,
            #      the session is starting but not started yet
            return not isinstance(self._entities_cache[profile], defer.Deferred)
        except KeyError:
            return False

    async def profile_authenticate(self, password, profile):
        """Authenticate the profile.

        @param password (unicode): the SàT profile password
        @return: None in case of success (an exception is raised otherwise)
        @raise exceptions.PasswordError: the password does not match
        """
        if not password and self.auth_sessions.profile_get_unique(profile):
            # XXX: this allows any frontend to connect with the empty password as soon as
            # the profile has been authenticated at least once before. It is OK as long as
            # submitting a form with empty passwords is restricted to local frontends.
            return

        sat_cipher = await self.param_get_a_async(
            C.PROFILE_PASS_PATH[1], C.PROFILE_PASS_PATH[0], profile_key=profile
        )
        valid = PasswordHasher.verify(password, sat_cipher)
        if not valid:
            log.warning(_("Authentication failure of profile {profile}").format(
                profile=profile))
            raise exceptions.PasswordError("The provided profile password doesn't match.")
        return await self.new_auth_session(password, profile)

    async def new_auth_session(self, key, profile):
        """Start a new session for the authenticated profile.

        If there is already an existing session, no new one is created
        The personal key is loaded encrypted from a PersistentDict before being decrypted.

        @param key: the key to decrypt the personal key
        @param profile: %(doc_profile)s
        """
        data = await PersistentDict(C.MEMORY_CRYPTO_NAMESPACE, profile).load()
        personal_key = BlockCipher.decrypt(key, data[C.MEMORY_CRYPTO_KEY])
        # Create the session for this profile and store the personal key
        session_data = self.auth_sessions.profile_get_unique(profile)
        if not session_data:
            self.auth_sessions.new_session(
                {C.MEMORY_CRYPTO_KEY: personal_key}, profile=profile
            )
            log.debug("auth session created for profile %s" % profile)

    def purge_profile_session(self, profile):
        """Delete cache of data of profile
        @param profile: %(doc_profile)s"""
        log.info(_("[%s] Profile session purge" % profile))
        self.params.purge_profile(profile)
        try:
            del self._entities_cache[profile]
        except KeyError:
            log.error(
                _(
                    "Trying to purge roster status cache for a profile not in memory: [%s]"
                )
                % profile
            )

    def get_profiles_list(self, clients=True, components=False):
        """retrieve profiles list

        @param clients(bool): if True return clients profiles
        @param components(bool): if True return components profiles
        @return (list[unicode]): selected profiles
        """
        if not clients and not components:
            log.warning(_("requesting no profiles at all"))
            return []
        profiles = self.storage.get_profiles_list()
        if clients and components:
            return sorted(profiles)
        is_component = self.storage.profile_is_component
        if clients:
            p_filter = lambda p: not is_component(p)
        else:
            p_filter = lambda p: is_component(p)

        return sorted(p for p in profiles if p_filter(p))

    def get_profile_name(self, profile_key, return_profile_keys=False):
        """Return name of profile from keyword

        @param profile_key: can be the profile name or a keyword (like @DEFAULT@)
        @param return_profile_keys: if True, return unmanaged profile keys (like "@ALL@"). This keys must be managed by the caller
        @return: requested profile name
        @raise exceptions.ProfileUnknownError if profile doesn't exists
        """
        return self.params.get_profile_name(profile_key, return_profile_keys)

    def profile_set_default(self, profile):
        """Set default profile

        @param profile: %(doc_profile)s
        """
        # we want to be sure that the profile exists
        profile = self.get_profile_name(profile)

        self.memory_data["Profile_default"] = profile

    def create_profile(self, name, password, component=None):
        """Create a new profile

        @param name(unicode): profile name
        @param password(unicode): profile password
            Can be empty to disable password
        @param component(None, unicode): set to entry point if this is a component
        @return: Deferred
        @raise exceptions.NotFound: component is not a known plugin import name
        """
        if not name:
            raise ValueError("Empty profile name")
        if name[0] == "@":
            raise ValueError("A profile name can't start with a '@'")
        if "\n" in name:
            raise ValueError("A profile name can't contain line feed ('\\n')")

        if name in self._entities_cache:
            raise exceptions.ConflictError("A session for this profile exists")

        if component:
            if not component in self.host.plugins:
                raise exceptions.NotFound(
                    _(
                        "Can't find component {component} entry point".format(
                            component=component
                        )
                    )
                )
            # FIXME: PLUGIN_INFO is not currently accessible after import, but type shoul be tested here
            #  if self.host.plugins[component].PLUGIN_INFO[u"type"] != C.PLUG_TYPE_ENTRY_POINT:
            #      raise ValueError(_(u"Plugin {component} is not an entry point !".format(
            #          component = component)))

        d = self.params.create_profile(name, component)

        def init_personal_key(__):
            # be sure to call this after checking that the profile doesn't exist yet

            # generated once for all and saved in a PersistentDict
            personal_key = BlockCipher.get_random_key(
                base64=True
            ).decode('utf-8')
            self.auth_sessions.new_session(
                {C.MEMORY_CRYPTO_KEY: personal_key}, profile=name
            )  # will be encrypted by param_set

        def start_fake_session(__):
            # avoid ProfileNotConnected exception in param_set
            self._entities_cache[name] = None
            self.params.load_ind_params(name)

        def stop_fake_session(__):
            del self._entities_cache[name]
            self.params.purge_profile(name)

        d.addCallback(init_personal_key)
        d.addCallback(start_fake_session)
        d.addCallback(
            lambda __: self.param_set(
                C.PROFILE_PASS_PATH[1], password, C.PROFILE_PASS_PATH[0], profile_key=name
            )
        )
        d.addCallback(stop_fake_session)
        d.addCallback(lambda __: self.auth_sessions.profile_del_unique(name))
        return d

    def profile_delete_async(self, name, force=False):
        """Delete an existing profile

        @param name: Name of the profile
        @param force: force the deletion even if the profile is connected.
        To be used for direct calls only (not through the bridge).
        @return: a Deferred instance
        """

        def clean_memory(__):
            self.auth_sessions.profile_del_unique(name)
            try:
                del self._entities_cache[name]
            except KeyError:
                pass

        d = self.params.profile_delete_async(name, force)
        d.addCallback(clean_memory)
        return d

    def is_component(self, profile_name):
        """Tell if a profile is a component

        @param profile_name(unicode): name of the profile
        @return (bool): True if profile is a component
        @raise exceptions.NotFound: profile doesn't exist
        """
        return self.storage.profile_is_component(profile_name)

    def get_entry_point(self, profile_name):
        """Get a component entry point

        @param profile_name(unicode): name of the profile
        @return (bool): True if profile is a component
        @raise exceptions.NotFound: profile doesn't exist
        """
        return self.storage.get_entry_point(profile_name)

    ## History ##

    def add_to_history(self, client, data):
        return self.storage.add_to_history(data, client.profile)

    def _history_get_serialise(self, history_data):
        return [
            (uid, timestamp, from_jid, to_jid, message, subject, mess_type,
             data_format.serialise(extra)) for uid, timestamp, from_jid, to_jid, message,
            subject, mess_type, extra in history_data
        ]

    def _history_get(self, from_jid_s, to_jid_s, limit=C.HISTORY_LIMIT_NONE, between=True,
                    filters=None, profile=C.PROF_KEY_NONE):
        from_jid = jid.JID(from_jid_s) if from_jid_s else None
        to_jid = jid.JID(to_jid_s) if to_jid_s else None
        d = self.history_get(
            from_jid, to_jid, limit, between, filters, profile
        )
        d.addCallback(self._history_get_serialise)
        return d

    def history_get(
        self,
        from_jid: jid.JID|None,
        to_jid: jid.JID|None,
        limit: int = C.HISTORY_LIMIT_NONE,
        between: bool = True,
        filters: dict[str, str]|None = None,
        profile: str = C.PROF_KEY_NONE
    ) -> defer.Deferred[list]:
        """Retrieve messages in history

        @param from_jid: source JID (full, or bare for catchall)
        @param to_jid: dest JID (full, or bare for catchall)
        @param limit: maximum number of messages to get:
            - 0 for no message (returns the empty list)
            - C.HISTORY_LIMIT_NONE or None for unlimited
            - C.HISTORY_LIMIT_DEFAULT to use the HISTORY_LIMIT parameter value
        @param between: confound source and dest (ignore the direction)
        @param filters: pattern to filter the history results
            (see bridge API for details)
        @param profile: %(doc_profile)s
        @return: list of message data as in [message_new]
        """
        assert profile != C.PROF_KEY_NONE
        if limit == C.HISTORY_LIMIT_DEFAULT:
            limit = int(self.param_get_a(C.HISTORY_LIMIT, "General", profile_key=profile))
        elif limit == C.HISTORY_LIMIT_NONE:
            limit = None
        if limit == 0:
            return defer.succeed([])
        return self.storage.history_get(from_jid, to_jid, limit, between, filters, profile)

    ## Statuses ##

    def _get_presence_statuses(self, profile_key):
        ret = self.presence_statuses_get(profile_key)
        return {entity.full(): data for entity, data in ret.items()}

    def presence_statuses_get(self, profile_key):
        """Get all the presence statuses of a profile

        @param profile_key: %(doc_profile_key)s
        @return: presence data: key=entity JID, value=presence data for this entity
        """
        client = self.host.get_client(profile_key)
        profile_cache = self._get_profile_cache(client)
        entities_presence = {}

        for entity_jid, entity_data in profile_cache.items():
            for resource, resource_data in entity_data.items():
                full_jid = copy.copy(entity_jid)
                full_jid.resource = resource
                try:
                    presence_data = self.get_entity_datum(client, full_jid, "presence")
                except KeyError:
                    continue
                entities_presence.setdefault(entity_jid, {})[
                    resource or ""
                ] = presence_data

        return entities_presence

    def set_presence_status(self, entity_jid, show, priority, statuses, profile_key):
        """Change the presence status of an entity

        @param entity_jid: jid.JID of the entity
        @param show: show status
        @param priority: priority
        @param statuses: dictionary of statuses
        @param profile_key: %(doc_profile_key)s
        """
        client = self.host.get_client(profile_key)
        presence_data = PresenceTuple(show, priority, statuses)
        self.update_entity_data(
            client, entity_jid, "presence", presence_data
        )
        if entity_jid.resource and show != C.PRESENCE_UNAVAILABLE:
            # If a resource is available, bare jid should not have presence information
            try:
                self.del_entity_datum(client, entity_jid.userhostJID(), "presence")
            except (KeyError, exceptions.UnknownEntityError):
                pass

    ## Resources ##

    def _get_all_resource(self, jid_s, profile_key):
        client = self.host.get_client(profile_key)
        jid_ = jid.JID(jid_s)
        return self.get_all_resources(client, jid_)

    def get_all_resources(self, client, entity_jid):
        """Return all resource from jid for which we have had data in this session

        @param entity_jid: bare jid of the entity
        return (set[unicode]): set of resources

        @raise exceptions.UnknownEntityError: if entity is not in cache
        @raise ValueError: entity_jid has a resource
        """
        # FIXME: is there a need to keep cache data for resources which are not connected anymore?
        if entity_jid.resource:
            raise ValueError(
                "get_all_resources must be used with a bare jid (got {})".format(entity_jid)
            )
        profile_cache = self._get_profile_cache(client)
        try:
            entity_data = profile_cache[entity_jid.userhostJID()]
        except KeyError:
            raise exceptions.UnknownEntityError(
                "Entity {} not in cache".format(entity_jid)
            )
        resources = set(entity_data.keys())
        resources.discard(None)
        return resources

    def get_available_resources(self, client, entity_jid):
        """Return available resource for entity_jid

        This method differs from get_all_resources by returning only available resources
        @param entity_jid: bare jid of the entit
        return (list[unicode]): list of available resources

        @raise exceptions.UnknownEntityError: if entity is not in cache
        """
        available = []
        for resource in self.get_all_resources(client, entity_jid):
            full_jid = copy.copy(entity_jid)
            full_jid.resource = resource
            try:
                presence_data = self.get_entity_datum(client, full_jid, "presence")
            except KeyError:
                log.debug("Can't get presence data for {}".format(full_jid))
            else:
                if presence_data.show != C.PRESENCE_UNAVAILABLE:
                    available.append(resource)
        return available

    def _get_main_resource(self, jid_s, profile_key):
        client = self.host.get_client(profile_key)
        jid_ = jid.JID(jid_s)
        return self.main_resource_get(client, jid_) or ""

    def main_resource_get(self, client, entity_jid):
        """Return the main resource used by an entity

        @param entity_jid: bare entity jid
        @return (unicode): main resource or None
        """
        if entity_jid.resource:
            raise ValueError(
                "main_resource_get must be used with a bare jid (got {})".format(entity_jid)
            )
        try:
            if self.host.plugins["XEP-0045"].is_joined_room(client, entity_jid):
                return None  # MUC rooms have no main resource
        except KeyError:  # plugin not found
            pass
        try:
            resources = self.get_all_resources(client, entity_jid)
        except exceptions.UnknownEntityError:
            log.warning("Entity is not in cache, we can't find any resource")
            return None
        priority_resources = []
        for resource in resources:
            full_jid = copy.copy(entity_jid)
            full_jid.resource = resource
            try:
                presence_data = self.get_entity_datum(client, full_jid, "presence")
            except KeyError:
                log.debug("No presence information for {}".format(full_jid))
                continue
            priority_resources.append((resource, presence_data.priority))
        try:
            return max(priority_resources, key=lambda res_tuple: res_tuple[1])[0]
        except ValueError:
            log.warning("No resource found at all for {}".format(entity_jid))
            return None

    ## Entities data ##

    def _get_profile_cache(self, client):
        """Check profile validity and return its cache

        @param client: SatXMPPClient
        @return (dict): profile cache
        """
        return self._entities_cache[client.profile]

    def set_signal_on_update(self, key, signal=True):
        """Set a signal flag on the key

        When the key will be updated, a signal will be sent to frontends
        @param key: key to signal
        @param signal(boolean): if True, do the signal
        """
        if signal:
            self._key_signals.add(key)
        else:
            self._key_signals.discard(key)

    def get_all_entities_iter(self, client, with_bare=False):
        """Return an iterator of full jids of all entities in cache

        @param with_bare: if True, include bare jids
        @return (list[unicode]): list of jids
        """
        profile_cache = self._get_profile_cache(client)
        # we construct a list of all known full jids (bare jid of entities x resources)
        for bare_jid, entity_data in profile_cache.items():
            for resource in entity_data.keys():
                if resource is None:
                    continue
                full_jid = copy.copy(bare_jid)
                full_jid.resource = resource
                yield full_jid

    def update_entity_data(
        self, client, entity_jid, key, value, silent=False
    ):
        """Set a misc data for an entity

        If key was registered with set_signal_on_update, a signal will be sent to frontends
        @param entity_jid: JID of the entity, C.ENTITY_ALL_RESOURCES for all resources of
            all entities, C.ENTITY_ALL for all entities (all resources + bare jids)
        @param key: key to set (eg: C.ENTITY_TYPE)
        @param value: value for this key (eg: C.ENTITY_TYPE_MUC)
        @param silent(bool): if True, doesn't send signal to frontend, even if there is a
            signal flag (see set_signal_on_update)
        """
        profile_cache = self._get_profile_cache(client)
        if entity_jid in (C.ENTITY_ALL_RESOURCES, C.ENTITY_ALL):
            entities = self.get_all_entities_iter(client, entity_jid == C.ENTITY_ALL)
        else:
            entities = (entity_jid,)

        for jid_ in entities:
            entity_data = profile_cache.setdefault(jid_.userhostJID(), {}).setdefault(
                jid_.resource, {}
            )

            entity_data[key] = value
            if key in self._key_signals and not silent:
                self.host.bridge.entity_data_updated(
                    jid_.full(),
                    key,
                    data_format.serialise(value),
                    client.profile
                )

    def del_entity_datum(self, client, entity_jid, key):
        """Delete a data for an entity

        @param entity_jid: JID of the entity, C.ENTITY_ALL_RESOURCES for all resources of all entities,
                           C.ENTITY_ALL for all entities (all resources + bare jids)
        @param key: key to delete (eg: C.ENTITY_TYPE)

        @raise exceptions.UnknownEntityError: if entity is not in cache
        @raise KeyError: key is not in cache
        """
        profile_cache = self._get_profile_cache(client)
        if entity_jid in (C.ENTITY_ALL_RESOURCES, C.ENTITY_ALL):
            entities = self.get_all_entities_iter(client, entity_jid == C.ENTITY_ALL)
        else:
            entities = (entity_jid,)

        for jid_ in entities:
            try:
                entity_data = profile_cache[jid_.userhostJID()][jid_.resource]
            except KeyError:
                raise exceptions.UnknownEntityError(
                    "Entity {} not in cache".format(jid_)
                )
            try:
                del entity_data[key]
            except KeyError as e:
                if entity_jid in (C.ENTITY_ALL_RESOURCES, C.ENTITY_ALL):
                    continue  # we ignore KeyError when deleting keys from several entities
                else:
                    raise e

    def _get_entities_data(self, entities_jids, keys_list, profile_key):
        client = self.host.get_client(profile_key)
        ret = self.entities_data_get(
            client, [jid.JID(jid_) for jid_ in entities_jids], keys_list
        )
        return {
            jid_.full(): {k: data_format.serialise(v) for k,v in data.items()}
            for jid_, data in ret.items()
        }

    def entities_data_get(self, client, entities_jids, keys_list=None):
        """Get a list of cached values for several entities at once

        @param entities_jids: jids of the entities, or empty list for all entities in cache
        @param keys_list (iterable,None): list of keys to get, None for everything
        @param profile_key: %(doc_profile_key)s
        @return: dict withs values for each key in keys_list.
                 if there is no value of a given key, resulting dict will
                 have nothing with that key nether
                 if an entity doesn't exist in cache, it will not appear
                 in resulting dict

        @raise exceptions.UnknownEntityError: if entity is not in cache
        """

        def fill_entity_data(entity_cache_data):
            entity_data = {}
            if keys_list is None:
                entity_data = entity_cache_data
            else:
                for key in keys_list:
                    try:
                        entity_data[key] = entity_cache_data[key]
                    except KeyError:
                        continue
            return entity_data

        profile_cache = self._get_profile_cache(client)
        ret_data = {}
        if entities_jids:
            for entity in entities_jids:
                try:
                    entity_cache_data = profile_cache[entity.userhostJID()][
                        entity.resource
                    ]
                except KeyError:
                    continue
                ret_data[entity.full()] = fill_entity_data(entity_cache_data, keys_list)
        else:
            for bare_jid, data in profile_cache.items():
                for resource, entity_cache_data in data.items():
                    full_jid = copy.copy(bare_jid)
                    full_jid.resource = resource
                    ret_data[full_jid] = fill_entity_data(entity_cache_data)

        return ret_data

    def _get_entity_data(self, entity_jid_s, keys_list=None, profile=C.PROF_KEY_NONE):
        return self.entity_data_get(
            self.host.get_client(profile), jid.JID(entity_jid_s), keys_list)

    def entity_data_get(self, client, entity_jid, keys_list=None):
        """Get a list of cached values for entity

        @param entity_jid: JID of the entity
        @param keys_list (iterable,None): list of keys to get, None for everything
        @param profile_key: %(doc_profile_key)s
        @return: dict withs values for each key in keys_list.
                 if there is no value of a given key, resulting dict will
                 have nothing with that key nether

        @raise exceptions.UnknownEntityError: if entity is not in cache
        """
        profile_cache = self._get_profile_cache(client)
        try:
            entity_data = profile_cache[entity_jid.userhostJID()][entity_jid.resource]
        except KeyError:
            raise exceptions.UnknownEntityError(
                "Entity {} not in cache (was requesting {})".format(
                    entity_jid, keys_list
                )
            )
        if keys_list is None:
            return entity_data

        return {key: entity_data[key] for key in keys_list if key in entity_data}

    def get_entity_datum(self, client, entity_jid, key):
        """Get a datum from entity

        @param entity_jid: JID of the entity
        @param key: key to get
        @return: requested value

        @raise exceptions.UnknownEntityError: if entity is not in cache
        @raise KeyError: if there is no value for this key and this entity
        """
        return self.entity_data_get(client, entity_jid, (key,))[key]

    def del_entity_cache(
        self, entity_jid, delete_all_resources=True, profile_key=C.PROF_KEY_NONE
    ):
        """Remove all cached data for entity

        @param entity_jid: JID of the entity to delete
        @param delete_all_resources: if True also delete all known resources from cache (a bare jid must be given in this case)
        @param profile_key: %(doc_profile_key)s

        @raise exceptions.UnknownEntityError: if entity is not in cache
        """
        client = self.host.get_client(profile_key)
        profile_cache = self._get_profile_cache(client)

        if delete_all_resources:
            if entity_jid.resource:
                raise ValueError(_("Need a bare jid to delete all resources"))
            try:
                del profile_cache[entity_jid]
            except KeyError:
                raise exceptions.UnknownEntityError(
                    "Entity {} not in cache".format(entity_jid)
                )
        else:
            try:
                del profile_cache[entity_jid.userhostJID()][entity_jid.resource]
            except KeyError:
                raise exceptions.UnknownEntityError(
                    "Entity {} not in cache".format(entity_jid)
                )

    ## Encryption ##

    def encrypt_value(self, value, profile):
        """Encrypt a value for the given profile. The personal key must be loaded
        already in the profile session, that should be the case if the profile is
        already authenticated.

        @param value (str): the value to encrypt
        @param profile (str): %(doc_profile)s
        @return: the deferred encrypted value
        """
        try:
            personal_key = self.auth_sessions.profile_get_unique(profile)[
                C.MEMORY_CRYPTO_KEY
            ]
        except TypeError:
            raise exceptions.InternalError(
                _("Trying to encrypt a value for %s while the personal key is undefined!")
                % profile
            )
        return BlockCipher.encrypt(personal_key, value)

    def decrypt_value(self, value, profile):
        """Decrypt a value for the given profile. The personal key must be loaded
        already in the profile session, that should be the case if the profile is
        already authenticated.

        @param value (str): the value to decrypt
        @param profile (str): %(doc_profile)s
        @return: the deferred decrypted value
        """
        try:
            personal_key = self.auth_sessions.profile_get_unique(profile)[
                C.MEMORY_CRYPTO_KEY
            ]
        except TypeError:
            raise exceptions.InternalError(
                _("Trying to decrypt a value for %s while the personal key is undefined!")
                % profile
            )
        return BlockCipher.decrypt(personal_key, value)

    def encrypt_personal_data(self, data_key, data_value, crypto_key, profile):
        """Re-encrypt a personal data (saved to a PersistentDict).

        @param data_key: key for the individual PersistentDict instance
        @param data_value: the value to be encrypted
        @param crypto_key: the key to encrypt the value
        @param profile: %(profile_doc)s
        @return: a deferred None value
        """

        def got_ind_memory(data):
            data[data_key] = BlockCipher.encrypt(crypto_key, data_value)
            return data.force(data_key)

        def done(__):
            log.debug(
                _("Personal data (%(ns)s, %(key)s) has been successfuly encrypted")
                % {"ns": C.MEMORY_CRYPTO_NAMESPACE, "key": data_key}
            )

        d = PersistentDict(C.MEMORY_CRYPTO_NAMESPACE, profile).load()
        return d.addCallback(got_ind_memory).addCallback(done)

    ## Subscription requests ##

    def add_waiting_sub(self, type_, entity_jid, profile_key):
        """Called when a subcription request is received"""
        profile = self.get_profile_name(profile_key)
        assert profile
        if profile not in self.subscriptions:
            self.subscriptions[profile] = {}
        self.subscriptions[profile][entity_jid] = type_

    def del_waiting_sub(self, entity_jid, profile_key):
        """Called when a subcription request is finished"""
        profile = self.get_profile_name(profile_key)
        assert profile
        if profile in self.subscriptions and entity_jid in self.subscriptions[profile]:
            del self.subscriptions[profile][entity_jid]

    def sub_waiting_get(self, profile_key):
        """Called to get a list of currently waiting subscription requests"""
        profile = self.get_profile_name(profile_key)
        if not profile:
            log.error(_("Asking waiting subscriptions for a non-existant profile"))
            return {}
        if profile not in self.subscriptions:
            return {}

        return self.subscriptions[profile]

    ## Parameters ##

    def get_string_param_a(self, name, category, attr="value", profile_key=C.PROF_KEY_NONE):
        return self.params.get_string_param_a(name, category, attr, profile_key)

    def param_get_a(self, name, category, attr="value", profile_key=C.PROF_KEY_NONE):
        return self.params.param_get_a(name, category, attr, profile_key=profile_key)

    def param_get_a_async(
        self,
        name,
        category,
        attr="value",
        security_limit=C.NO_SECURITY_LIMIT,
        profile_key=C.PROF_KEY_NONE,
    ):
        return self.params.param_get_a_async(
            name, category, attr, security_limit, profile_key
        )

    def _get_params_values_from_category(
        self, category, security_limit, app, extra_s, profile_key
    ):
        return self.params._get_params_values_from_category(
            category, security_limit, app, extra_s, profile_key
        )

    def async_get_string_param_a(
        self, name, category, attribute="value", security_limit=C.NO_SECURITY_LIMIT,
        profile_key=C.PROF_KEY_NONE):

        profile = self.get_profile_name(profile_key)
        return defer.ensureDeferred(self.params.async_get_string_param_a(
            name, category, attribute, security_limit, profile
        ))

    def _get_params_ui(self, security_limit, app, extra_s, profile_key):
        return self.params._get_params_ui(security_limit, app, extra_s, profile_key)

    def params_categories_get(self):
        return self.params.params_categories_get()

    def param_set(
        self,
        name,
        value,
        category,
        security_limit=C.NO_SECURITY_LIMIT,
        profile_key=C.PROF_KEY_NONE,
    ):
        return self.params.param_set(name, value, category, security_limit, profile_key)

    def update_params(self, xml):
        return self.params.update_params(xml)

    def params_register_app(self, xml, security_limit=C.NO_SECURITY_LIMIT, app=""):
        return self.params.params_register_app(xml, security_limit, app)

    def set_default(self, name, category, callback, errback=None):
        return self.params.set_default(name, category, callback, errback)

    ## Private Data ##

    def _private_data_set(self, namespace, key, data_s, profile_key):
        client = self.host.get_client(profile_key)
        # we accept any type
        data = data_format.deserialise(data_s, type_check=None)
        return defer.ensureDeferred(self.storage.set_private_value(
            namespace, key, data, binary=True, profile=client.profile))

    def _private_data_get(self, namespace, key, profile_key):
        client = self.host.get_client(profile_key)
        d = defer.ensureDeferred(
            self.storage.get_privates(
                namespace, [key], binary=True, profile=client.profile)
        )
        d.addCallback(lambda data_dict: data_format.serialise(data_dict.get(key)))
        return d

    def _private_data_delete(self, namespace, key, profile_key):
        client = self.host.get_client(profile_key)
        return defer.ensureDeferred(self.storage.del_private_value(
            namespace, key, binary=True, profile=client.profile))

    ## Files ##

    def check_file_permission(
            self,
            file_data: dict,
            peer_jid: Optional[jid.JID],
            perms_to_check: Optional[Tuple[str]],
            set_affiliation: bool = False
    ) -> None:
        """Check that an entity has the right permission on a file

        @param file_data: data of one file, as returned by get_files
        @param peer_jid: entity trying to access the file
        @param perms_to_check: permissions to check
            tuple of C.ACCESS_PERM_*
        @param check_parents: if True, also check all parents until root node
        @parma set_affiliation: if True, "affiliation" metadata will be set
        @raise exceptions.PermissionError: peer_jid doesn't have all permission
            in perms_to_check for file_data
        @raise exceptions.InternalError: perms_to_check is invalid
        """
        # TODO: knowing if user is owner is not enough, we need to check permission
        #   to see if user can modify/delete files, and set corresponding affiliation (publisher, member)
        if peer_jid is None and perms_to_check is None:
            return
        peer_jid = peer_jid.userhostJID()
        if peer_jid == file_data["owner"]:
            if set_affiliation:
                file_data['affiliation'] = 'owner'
            # the owner has all rights, nothing to check
            return
        if not C.ACCESS_PERMS.issuperset(perms_to_check):
            raise exceptions.InternalError(_("invalid permission"))

        for perm in perms_to_check:
            # we check each perm and raise PermissionError as soon as one condition is not valid
            # we must never return here, we only return after the loop if nothing was blocking the access
            try:
                perm_data = file_data["access"][perm]
                perm_type = perm_data["type"]
            except KeyError:
                # No permission is set.
                # If we are in a root file/directory, we deny access
                # otherwise, we use public permission, as the parent directory will
                # block anyway, this avoid to have to recursively change permissions for
                # all sub directories/files when modifying a permission
                if not file_data.get('parent'):
                    raise exceptions.PermissionError()
                else:
                    perm_type = C.ACCESS_TYPE_PUBLIC
            if perm_type == C.ACCESS_TYPE_PUBLIC:
                continue
            elif perm_type == C.ACCESS_TYPE_WHITELIST:
                try:
                    jids = perm_data["jids"]
                except KeyError:
                    raise exceptions.PermissionError()
                if peer_jid.full() in jids:
                    continue
                else:
                    raise exceptions.PermissionError()
            else:
                raise exceptions.InternalError(
                    _("unknown access type: {type}").format(type=perm_type)
                )

    async def check_permission_to_root(self, client, file_data, peer_jid, perms_to_check):
        """do check_file_permission on file_data and all its parents until root"""
        current = file_data
        while True:
            self.check_file_permission(current, peer_jid, perms_to_check)
            parent = current["parent"]
            if not parent:
                break
            files_data = await self.get_files(
                client, peer_jid=None, file_id=parent, perms_to_check=None
            )
            try:
                current = files_data[0]
            except IndexError:
                raise exceptions.DataError("Missing parent")

    async def _get_parent_dir(
        self, client, path, parent, namespace, owner, peer_jid, perms_to_check
    ):
        """Retrieve parent node from a path, or last existing directory

        each directory of the path will be retrieved, until the last existing one
        @return (tuple[unicode, list[unicode])): parent, remaining path elements:
            - parent is the id of the last retrieved directory (or u'' for root)
            - remaining path elements are the directories which have not been retrieved
              (i.e. which don't exist)
        """
        # if path is set, we have to retrieve parent directory of the file(s) from it
        if parent is not None:
            raise exceptions.ConflictError(
                _("You can't use path and parent at the same time")
            )
        path_elts = [_f for _f in path.split("/") if _f]
        if {"..", "."}.intersection(path_elts):
            raise ValueError(_('".." or "." can\'t be used in path'))

        # we retrieve all directories from path until we get the parent container
        # non existing directories will be created
        parent = ""
        for idx, path_elt in enumerate(path_elts):
            directories = await self.storage.get_files(
                client,
                parent=parent,
                type_=C.FILE_TYPE_DIRECTORY,
                name=path_elt,
                namespace=namespace,
                owner=owner,
            )
            if not directories:
                return (parent, path_elts[idx:])
                # from this point, directories don't exist anymore, we have to create them
            elif len(directories) > 1:
                raise exceptions.InternalError(
                    _("Several directories found, this should not happen")
                )
            else:
                directory = directories[0]
                self.check_file_permission(directory, peer_jid, perms_to_check)
                parent = directory["id"]
        return (parent, [])

    def get_file_affiliations(self, file_data: dict) -> Dict[jid.JID, str]:
        """Convert file access to pubsub like affiliations"""
        affiliations = {}
        access_data = file_data['access']

        read_data = access_data.get(C.ACCESS_PERM_READ, {})
        if read_data.get('type') == C.ACCESS_TYPE_WHITELIST:
            for entity_jid_s in read_data['jids']:
                entity_jid = jid.JID(entity_jid_s)
                affiliations[entity_jid] = 'member'

        write_data = access_data.get(C.ACCESS_PERM_WRITE, {})
        if write_data.get('type') == C.ACCESS_TYPE_WHITELIST:
            for entity_jid_s in write_data['jids']:
                entity_jid = jid.JID(entity_jid_s)
                affiliations[entity_jid] = 'publisher'

        owner = file_data.get('owner')
        if owner:
            affiliations[owner] = 'owner'

        return affiliations

    def _set_file_affiliations_update(
        self,
        access: dict,
        file_data: dict,
        affiliations: Dict[jid.JID, str]
    ) -> None:
        read_data = access.setdefault(C.ACCESS_PERM_READ, {})
        if read_data.get('type') != C.ACCESS_TYPE_WHITELIST:
            read_data['type'] = C.ACCESS_TYPE_WHITELIST
            if 'jids' not in read_data:
                read_data['jids'] = []
        read_whitelist = read_data['jids']
        write_data = access.setdefault(C.ACCESS_PERM_WRITE, {})
        if write_data.get('type') != C.ACCESS_TYPE_WHITELIST:
            write_data['type'] = C.ACCESS_TYPE_WHITELIST
            if 'jids' not in write_data:
                write_data['jids'] = []
        write_whitelist = write_data['jids']
        for entity_jid, affiliation in affiliations.items():
            entity_jid_s = entity_jid.full()
            if affiliation == "none":
                try:
                    read_whitelist.remove(entity_jid_s)
                except ValueError:
                    log.warning(
                        "removing affiliation from an entity without read permission: "
                        f"{entity_jid}"
                    )
                try:
                    write_whitelist.remove(entity_jid_s)
                except ValueError:
                    pass
            elif affiliation == "publisher":
                if entity_jid_s not in read_whitelist:
                    read_whitelist.append(entity_jid_s)
                if entity_jid_s not in write_whitelist:
                    write_whitelist.append(entity_jid_s)
            elif affiliation == "member":
                if entity_jid_s not in read_whitelist:
                    read_whitelist.append(entity_jid_s)
                try:
                    write_whitelist.remove(entity_jid_s)
                except ValueError:
                    pass
            elif affiliation == "owner":
                raise NotImplementedError('"owner" affiliation can\'t be set')
            else:
                raise ValueError(f"unknown affiliation: {affiliation!r}")

    async def set_file_affiliations(
        self,
        client,
        file_data: dict,
        affiliations: Dict[jid.JID, str]
    ) -> None:
        """Apply pubsub like affiliation to file_data

        Affiliations are converted to access types, then set in a whitelist.
        Affiliation are mapped as follow:
            - "owner" can't be set (for now)
            - "publisher" gives read and write permissions
            - "member" gives read permission only
            - "none" removes both read and write permissions
        """
        file_id = file_data['id']
        await self.file_update(
            file_id,
            'access',
            update_cb=partial(
                self._set_file_affiliations_update,
                file_data=file_data,
                affiliations=affiliations
            ),
        )

    def _set_file_access_model_update(
        self,
        access: dict,
        file_data: dict,
        access_model: str
    ) -> None:
        read_data = access.setdefault(C.ACCESS_PERM_READ, {})
        if access_model == "open":
            requested_type = C.ACCESS_TYPE_PUBLIC
        elif access_model == "whitelist":
            requested_type = C.ACCESS_TYPE_WHITELIST
        else:
            raise ValueError(f"unknown access model: {access_model}")

        read_data['type'] = requested_type
        if requested_type == C.ACCESS_TYPE_WHITELIST and 'jids' not in read_data:
            read_data['jids'] = []

    async def set_file_access_model(
        self,
        client,
        file_data: dict,
        access_model: str,
    ) -> None:
        """Apply pubsub like access_model to file_data

        Only 2 access models are supported so far:
            - "open": set public access to file/dir
            - "whitelist": set whitelist to file/dir
        """
        file_id = file_data['id']
        await self.file_update(
            file_id,
            'access',
            update_cb=partial(
                self._set_file_access_model_update,
                file_data=file_data,
                access_model=access_model
            ),
        )

    def get_files_owner(
            self,
            client,
            owner: Optional[jid.JID],
            peer_jid: Optional[jid.JID],
            file_id: Optional[str] = None,
            parent: Optional[str] = None
    ) -> jid.JID:
        """Get owner to use for a file operation

        if owner is not explicitely set, a suitable one will be used (client.jid for
        clients, peer_jid for components).
        @raise exception.InternalError: we are one a component, and neither owner nor
            peer_jid are set
        """
        if owner is not None:
            return owner.userhostJID()
        if client is None:
            # client may be None when looking for file with public_id
            return None
        if file_id or parent:
            # owner has already been filtered on parent file
            return None
        if not client.is_component:
            return client.jid.userhostJID()
        if peer_jid is None:
            raise exceptions.InternalError(
                "Owner must be set for component if peer_jid is None"
            )
        return peer_jid.userhostJID()

    async def get_files(
        self, client, peer_jid, file_id=None, version=None, parent=None, path=None,
        type_=None, file_hash=None, hash_algo=None, name=None, namespace=None,
        mime_type=None, public_id=None, owner=None, access=None, projection=None,
        unique=False, perms_to_check=(C.ACCESS_PERM_READ,)):
        """Retrieve files with with given filters

        @param peer_jid(jid.JID, None): jid trying to access the file
            needed to check permission.
            Use None to ignore permission (perms_to_check must be None too)
        @param file_id(unicode, None): id of the file
            None to ignore
        @param version(unicode, None): version of the file
            None to ignore
            empty string to look for current version
        @param parent(unicode, None): id of the directory containing the files
            None to ignore
            empty string to look for root files/directories
        @param path(Path, unicode, None): path to the directory containing the files
        @param type_(unicode, None): type of file filter, can be one of C.FILE_TYPE_*
        @param file_hash(unicode, None): hash of the file to retrieve
        @param hash_algo(unicode, None): algorithm use for file_hash
        @param name(unicode, None): name of the file to retrieve
        @param namespace(unicode, None): namespace of the files to retrieve
        @param mime_type(unicode, None): filter on this mime type
        @param public_id(unicode, None): filter on this public id
        @param owner(jid.JID, None): if not None, only get files from this owner
        @param access(dict, None): get file with given access (see [set_file])
        @param projection(list[unicode], None): name of columns to retrieve
            None to retrieve all
        @param unique(bool): if True will remove duplicates
        @param perms_to_check(tuple[unicode],None): permission to check
            must be a tuple of C.ACCESS_PERM_* or None
            if None, permission will no be checked (peer_jid must be None too in this
            case)
        other params are the same as for [set_file]
        @return (list[dict]): files corresponding to filters
        @raise exceptions.NotFound: parent directory not found (when path is specified)
        @raise exceptions.PermissionError: peer_jid can't use perms_to_check for one of
                                           the file
            on the path
        """
        if peer_jid is None and perms_to_check or perms_to_check is None and peer_jid:
            raise exceptions.InternalError(
                "if you want to disable permission check, both peer_jid and "
                "perms_to_check must be None"
            )
        owner = self.get_files_owner(client, owner, peer_jid, file_id, parent)
        if path is not None:
            path = str(path)
            # permission are checked by _get_parent_dir
            parent, remaining_path_elts = await self._get_parent_dir(
                client, path, parent, namespace, owner, peer_jid, perms_to_check
            )
            if remaining_path_elts:
                # if we have remaining path elements,
                # the parent directory is not found
                raise failure.Failure(exceptions.NotFound())
        if parent and peer_jid:
            # if parent is given directly and permission check is requested,
            # we need to check all the parents
            parent_data = await self.storage.get_files(client, file_id=parent)
            try:
                parent_data = parent_data[0]
            except IndexError:
                raise exceptions.DataError("mising parent")
            await self.check_permission_to_root(
                client, parent_data, peer_jid, perms_to_check
            )

        files = await self.storage.get_files(
            client,
            file_id=file_id,
            version=version,
            parent=parent,
            type_=type_,
            file_hash=file_hash,
            hash_algo=hash_algo,
            name=name,
            namespace=namespace,
            mime_type=mime_type,
            public_id=public_id,
            owner=owner,
            access=access,
            projection=projection,
            unique=unique,
        )

        if peer_jid:
            # if permission are checked, we must remove all file that user can't access
            to_remove = []
            for file_data in files:
                try:
                    self.check_file_permission(
                        file_data, peer_jid, perms_to_check, set_affiliation=True
                    )
                except exceptions.PermissionError:
                    to_remove.append(file_data)
            for file_data in to_remove:
                files.remove(file_data)
        return files

    async def set_file(
        self, client, name, file_id=None, version="", parent=None, path=None,
        type_=C.FILE_TYPE_FILE, file_hash=None, hash_algo=None, size=None,
        namespace=None, mime_type=None, public_id=None, created=None, modified=None,
        owner=None, access=None, extra=None, peer_jid=None,
        perms_to_check=(C.ACCESS_PERM_WRITE,)
    ):
        """Set a file metadata

        @param name(unicode): basename of the file
        @param file_id(unicode): unique id of the file
        @param version(unicode): version of this file
            empty string for current version or when there is no versioning
        @param parent(unicode, None): id of the directory containing the files
        @param path(unicode, None): virtual path of the file in the namespace
            if set, parent must be None. All intermediate directories will be created
            if needed, using current access.
        @param type_(str, None): type of file filter, can be one of C.FILE_TYPE_*
        @param file_hash(unicode): unique hash of the payload
        @param hash_algo(unicode): algorithm used for hashing the file (usually sha-256)
        @param size(int): size in bytes
        @param namespace(unicode, None): identifier (human readable is better) to group
                                         files
            For instance, namespace could be used to group files in a specific photo album
        @param mime_type(unicode): MIME type of the file, or None if not known/guessed
        @param public_id(unicode): id used to share publicly the file via HTTP
        @param created(int): UNIX time of creation
        @param modified(int,None): UNIX time of last modification, or None to use
                                   created date
        @param owner(jid.JID, None): jid of the owner of the file (mainly useful for
                                     component)
            will be used to check permission (only bare jid is used, don't use with MUC).
            Use None to ignore permission (perms_to_check must be None too)
        @param access(dict, None): serialisable dictionary with access rules.
            None (or empty dict) to use private access, i.e. allow only profile's jid to
            access the file
            key can be on on C.ACCESS_PERM_*,
            then a sub dictionary with a type key is used (one of C.ACCESS_TYPE_*).
            According to type, extra keys can be used:
                - C.ACCESS_TYPE_PUBLIC: the permission is granted for everybody
                - C.ACCESS_TYPE_WHITELIST: the permission is granted for jids (as unicode)
                  in the 'jids' key
            will be encoded to json in database
        @param extra(dict, None): serialisable dictionary of any extra data
            will be encoded to json in database
        @param perms_to_check(tuple[unicode],None): permission to check
            must be a tuple of C.ACCESS_PERM_* or None
            if None, permission will not be checked (peer_jid must be None too in this
            case)
        @param profile(unicode): profile owning the file
        """
        if "/" in name:
            raise ValueError('name must not contain a slash ("/")')
        if file_id is None:
            file_id = shortuuid.uuid()
        if (
            file_hash is not None
            and hash_algo is None
            or hash_algo is not None
            and file_hash is None
        ):
            raise ValueError("file_hash and hash_algo must be set at the same time")
        if mime_type is None:
            mime_type, __ = mimetypes.guess_type(name)
        else:
            mime_type = mime_type.lower()
        if public_id is not None:
            assert len(public_id)>0
        if created is None:
            created = time.time()
        if namespace is not None:
            namespace = namespace.strip() or None
        if type_ == C.FILE_TYPE_DIRECTORY:
            if any((version, file_hash, size, mime_type)):
                raise ValueError(
                    "version, file_hash, size and mime_type can't be set for a directory"
                )
        owner = self.get_files_owner(client, owner, peer_jid, file_id, parent)

        if path is not None:
            path = str(path)
            # _get_parent_dir will check permissions if peer_jid is set, so we use owner
            parent, remaining_path_elts = await self._get_parent_dir(
                client, path, parent, namespace, owner, owner, perms_to_check
            )
            # if remaining directories don't exist, we have to create them
            for new_dir in remaining_path_elts:
                new_dir_id = shortuuid.uuid()
                await self.storage.set_file(
                    client,
                    name=new_dir,
                    file_id=new_dir_id,
                    version="",
                    parent=parent,
                    type_=C.FILE_TYPE_DIRECTORY,
                    namespace=namespace,
                    created=time.time(),
                    owner=owner,
                    access=access,
                    extra={},
                )
                parent = new_dir_id
        elif parent is None:
            parent = ""

        await self.storage.set_file(
            client,
            file_id=file_id,
            version=version,
            parent=parent,
            type_=type_,
            file_hash=file_hash,
            hash_algo=hash_algo,
            name=name,
            size=size,
            namespace=namespace,
            mime_type=mime_type,
            public_id=public_id,
            created=created,
            modified=modified,
            owner=owner,
            access=access,
            extra=extra,
        )

    async def file_get_used_space(
        self,
        client,
        peer_jid: jid.JID,
        owner: Optional[jid.JID] = None
    ) -> int:
        """Get space taken by all files owned by an entity

        @param peer_jid: entity requesting the size
        @param owner: entity owning the file to check. If None, will be determined by
            get_files_owner
        @return: size of total space used by files of this owner
        """
        owner = self.get_files_owner(client, owner, peer_jid)
        if peer_jid.userhostJID() != owner and client.profile not in self.admins:
            raise exceptions.PermissionError("You are not allowed to check this size")
        return await self.storage.file_get_used_space(client, owner)

    def file_update(self, file_id, column, update_cb):
        """Update a file column taking care of race condition

        access is NOT checked in this method, it must be checked beforehand
        @param file_id(unicode): id of the file to update
        @param column(unicode): one of "access" or "extra"
        @param update_cb(callable): method to update the value of the colum
            the method will take older value as argument, and must update it in place
            Note that the callable must be thread-safe
        """
        return self.storage.file_update(file_id, column, update_cb)

    @defer.inlineCallbacks
    def _delete_file(
        self,
        client,
        peer_jid: jid.JID,
        recursive: bool,
        files_path: Path,
        file_data: dict
    ):
        """Internal method to delete files/directories recursively

        @param peer_jid(jid.JID): entity requesting the deletion (must be owner of files
            to delete)
        @param recursive(boolean): True if recursive deletion is needed
        @param files_path(unicode): path of the directory containing the actual files
        @param file_data(dict): data of the file to delete
        """
        if file_data['owner'] != peer_jid:
            raise exceptions.PermissionError(
                "file {file_name} can't be deleted, {peer_jid} is not the owner"
                .format(file_name=file_data['name'], peer_jid=peer_jid.full()))
        if file_data['type'] == C.FILE_TYPE_DIRECTORY:
            sub_files = yield self.get_files(client, peer_jid, parent=file_data['id'])
            if sub_files and not recursive:
                raise exceptions.DataError(_("Can't delete directory, it is not empty"))
            # we first delete the sub-files
            for sub_file_data in sub_files:
                if sub_file_data['type'] == C.FILE_TYPE_DIRECTORY:
                    sub_file_path = files_path / sub_file_data['name']
                else:
                    sub_file_path = files_path
                yield self._delete_file(
                    client, peer_jid, recursive, sub_file_path, sub_file_data)
            # then the directory itself
            yield self.storage.file_delete(file_data['id'])
        elif file_data['type'] == C.FILE_TYPE_FILE:
            log.info(_("deleting file {name} with hash {file_hash}").format(
                name=file_data['name'], file_hash=file_data['file_hash']))
            yield self.storage.file_delete(file_data['id'])
            references = yield self.get_files(
                client, peer_jid, file_hash=file_data['file_hash'])
            if references:
                log.debug("there are still references to the file, we keep it")
            else:
                file_path = os.path.join(files_path, file_data['file_hash'])
                log.info(_("no reference left to {file_path}, deleting").format(
                    file_path=file_path))
                try:
                    os.unlink(file_path)
                except FileNotFoundError:
                    log.error(f"file at {file_path!r} doesn't exist but it was referenced in files database")
        else:
            raise exceptions.InternalError('Unexpected file type: {file_type}'
                .format(file_type=file_data['type']))

    async def file_delete(self, client, peer_jid, file_id, recursive=False):
        """Delete a single file or a directory and all its sub-files

        @param file_id(unicode): id of the file to delete
        @param peer_jid(jid.JID): entity requesting the deletion,
            must be owner of all files to delete
        @param recursive(boolean): must be True to delete a directory and all sub-files
        """
        # FIXME: we only allow owner of file to delete files for now, but WRITE access
        #        should be checked too
        files_data = await self.get_files(client, peer_jid, file_id)
        if not files_data:
            raise exceptions.NotFound("Can't find the file with id {file_id}".format(
                file_id=file_id))
        file_data = files_data[0]
        if file_data["type"] != C.FILE_TYPE_DIRECTORY and recursive:
            raise ValueError("recursive can only be set for directories")
        files_path = self.host.get_local_path(None, C.FILES_DIR)
        await self._delete_file(client, peer_jid, recursive, files_path, file_data)

    ## Cache ##

    def get_cache_path(self, namespace: str, *args: str) -> Path:
        """Get path to use to get a common path for a namespace

        This can be used by plugins to manage permanent data. It's the responsability
        of plugins to clean this directory from unused data.
        @param namespace: unique namespace to use
        @param args: extra identifier which will be added to the path
        """
        namespace = namespace.strip().lower()
        return Path(
            self._cache_path,
            regex.path_escape(namespace),
            *(regex.path_escape(a) for a in args)
        )

    ## Notifications ##


    def _add_notification(
        self,
        type_: str,
        body_plain: str,
        body_rich: str,
        title: str,
        is_global: bool,
        requires_action: bool,
        priority: str,
        expire_at: float,
        extra_s: str,
        profile_key: str
    ) -> defer.Deferred:
        client = self.host.get_client(profile_key)

        if not client.is_admin:
            raise exceptions.PermissionError("Only admins can add a notification")

        try:
            notification_type = NotificationType[type_]
            notification_priority = NotificationPriority[priority]
        except KeyError as e:
            raise exceptions.DataError(
                f"invalid notification type or priority data: {e}"
            )

        return defer.ensureDeferred(
            self.add_notification(
                client,
                notification_type,
                body_plain,
                body_rich or None,
                title or None,
                is_global,
                requires_action,
                notification_priority,
                expire_at or None,
                data_format.deserialise(extra_s)
            )
        )

    async def add_notification(
        self,
        client: SatXMPPEntity,
        type_: NotificationType,
        body_plain: str,
        body_rich: Optional[str] = None,
        title: Optional[str] = None,
        is_global: bool = False,
        requires_action: bool = False,
        priority: NotificationPriority = NotificationPriority.MEDIUM,
        expire_at: Optional[float] = None,
        extra: Optional[dict] = None,
    ) -> None:
        """Create and broadcast a new notification.

        @param client: client associated with the notification. If None, the notification
            will be global (i.e. for all profiles).
        @param type_: type of the notification.
        @param body_plain: plain text body.
        @param body_rich: rich text (XHTML) body.
        @param title: optional title.
        @param is_global: True if the notification is for all profiles.
        @param requires_action: True if the notification requires user action (e.g. a
            dialog need to be answered).
        @priority: how urgent the notification is
        @param expire_at: expiration timestamp for the notification.
        @param extra: additional data.
        """
        notification = await self.storage.add_notification(
            None if is_global else client, type_, body_plain, body_rich, title,
            requires_action, priority, expire_at, extra
        )
        self.host.bridge.notification_new(
            str(notification.id),
            notification.timestamp,
            type_.value,
            body_plain,
            body_rich or '',
            title or '',
            requires_action,
            priority.value,
            expire_at or 0,
            data_format.serialise(extra) if extra else '',
            C.PROF_KEY_ALL if is_global else client.profile
        )

    def _get_notifications(self, filters_s: str, profile_key: str) -> defer.Deferred:
        """Fetch notifications for bridge with given filters and profile key.

        @param filters_s: serialized filter conditions. Keys can be:
            :type_ (str):
                Filter by type of the notification.
            :status (str):
                Filter by status of the notification.
            :requires_action (bool):
                Filter by notifications that require user action.
            :min_priority (str):
                Filter by minimum priority value.
        @param profile_key: key of the profile to fetch notifications for.
        @return: Deferred which fires with a list of serialised notifications.
        """
        client = self.host.get_client(profile_key)

        filters = data_format.deserialise(filters_s)

        try:
            if 'type' in filters:
                filters['type_'] = NotificationType[filters.pop('type')]
            if 'status' in filters:
                filters['status'] = NotificationStatus[filters['status']]
            if 'min_priority' in filters:
                filters['min_priority'] = NotificationPriority[filters['min_priority']].value
        except KeyError as e:
            raise exceptions.DataError(f"invalid filter data: {e}")

        d = defer.ensureDeferred(self.storage.get_notifications(client, **filters))
        d.addCallback(
            lambda notifications: data_format.serialise(
                [notification.serialise() for notification in notifications]
            )
        )
        return d

    def _delete_notification(
        self,
        id_: str,
        is_global: bool,
        profile_key: str
    ) -> defer.Deferred:
        client = self.host.get_client(profile_key)
        if is_global and not client.is_admin:
            raise exceptions.PermissionError(
                "Only admins can delete global notifications"
            )
        return defer.ensureDeferred(self.delete_notification(client, id_, is_global))

    async def delete_notification(
        self,
        client: SatXMPPEntity,
        id_: str,
        is_global: bool=False
    ) -> None:
        """Delete a notification

        the notification must be from the requesting profile.
        @param id_: ID of the notification
        is_global: if True, a global notification will be removed.
        """
        await self.storage.delete_notification(None if is_global else client, id_)
        self.host.bridge.notification_deleted(
            id_,
            C.PROF_KEY_ALL if is_global else client.profile
        )

    def _notifications_expired_clean(
        self, limit_timestamp: float, profile_key: str
    ) -> defer.Deferred:
        if profile_key == C.PROF_KEY_NONE:
            client = None
        else:
            client = self.host.get_client(profile_key)

        return defer.ensureDeferred(
            self.storage.clean_expired_notifications(
                client,
                None if limit_timestamp == -1.0 else limit_timestamp
            )
        )


    ## Misc ##

    def is_entity_available(self, client, entity_jid):
        """Tell from the presence information if the given entity is available.

        @param entity_jid (JID): the entity to check (if bare jid is used, all resources are tested)
        @return (bool): True if entity is available
        """
        if not entity_jid.resource:
            return bool(
                self.get_available_resources(client, entity_jid)
            )  # is any resource is available, entity is available
        try:
            presence_data = self.get_entity_datum(client, entity_jid, "presence")
        except KeyError:
            log.debug("No presence information for {}".format(entity_jid))
            return False
        return presence_data.show != C.PRESENCE_UNAVAILABLE

    def is_admin(self, profile: str) -> bool:
        """Tell if given profile has administrator privileges"""
        return profile in self.admins

    def is_admin_jid(self, entity: jid.JID) -> bool:
        """Tells if an entity jid correspond to an admin one

        It is sometime not possible to use the profile alone to check if an entity is an
        admin (e.g. a request managed by a component). In this case we check if the JID
        correspond to an admin profile
        """
        return entity.userhostJID() in self.admin_jids