view src/memory/memory.py @ 943:71926ec2114d

core (memory): entities cache improvments: - entities cache is no more limited to bare jid - resources are now automatically updated in bare jid cache
author Goffi <goffi@goffi.org>
date Fri, 28 Mar 2014 18:07:17 +0100
parents 5b2d2f1f05d0
children e1842ebcb2f3
line wrap: on
line source

#!/usr/bin/python
# -*- coding: utf-8 -*-

# SAT: a jabber client
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _

import os.path
import csv
from xdg import BaseDirectory
from ConfigParser import SafeConfigParser, NoOptionError, NoSectionError
from uuid import uuid4
from logging import debug, info, warning, error
from twisted.internet import defer, reactor
from twisted.words.protocols.jabber import jid
from sat.core import exceptions
from sat.core.constants import Const as C
from sat.memory.sqlite import SqliteStorage
from sat.memory.persistent import PersistentDict
from sat.memory.params import Params


class Sessions(object):
    DEFAULT_TIMEOUT = 600

    def __init__(self, timeout = None):
        """
        @param timeout: nb of seconds before session destruction
        """
        self._sessions = dict()
        self.timeout = timeout or Sessions.DEFAULT_TIMEOUT

    def newSession(self, session_data=None, profile=None):
        """ Create a new session
        @param session_data: mutable data to use, default to a dict
        @param profile: if set, the session is owned by the profile,
                        and profileGet must be used instead of __getitem__
        @return: session_id, session_data
        """
        session_id = str(uuid4())
        timer = reactor.callLater(self.timeout, self._purgeSession, session_id)
        if session_data is None:
            session_data = {}
        self._sessions[session_id] = (timer, session_data) if profile is None else (timer, session_data, profile)
        return session_id, session_data

    def _purgeSession(self, session_id):
        del self._sessions[session_id]
        debug("Session [%s] purged" % session_id)

    def __len__(self):
        return len(self._sessions)

    def __contains__(self, session_id):
        return session_id in self._sessions

    def profileGet(self, session_id, profile):
        try:
            timer, session_data, profile_set = self._sessions[session_id]
        except ValueError:
            raise exceptions.InternalError("You need to use __getitem__ when profile is not set")
        if profile_set != profile:
            raise exceptions.InternalError("current profile differ from set profile !")
        timer.reset(self.timeout)
        return session_data

    def __getitem__(self, session_id):
        try:
            timer, session_data = self._sessions[session_id]
        except ValueError:
            raise exceptions.InternalError("You need to use profileGet instead of __getitem__ when profile is set")
        timer.reset(self.timeout)
        return session_data

    def __setitem__(self, key, value):
        raise NotImplementedError("You need do use newSession to create a session")

    def __delitem__(self, session_id):
        """ Cancel the timer, then actually delete the session data """
        try:
            timer = self._sessions[session_id][0]
            timer.cancel()
            self._purgeSession(session_id)
        except KeyError:
            debug ("Session [%s] doesn't exists, timeout expired ?" % session_id)

    def keys(self):
        return self._sessions.keys()

    def iterkeys(self):
        return self._sessions.iterkeys()


class Memory(object):
    """This class manage all persistent informations"""

    def __init__(self, host):
        info(_("Memory manager init"))
        self.initialized = defer.Deferred()
        self.host = host
        self._entities_cache = {} # XXX: keep presence/last resource/other data in cache
                                  #     /!\ an entity is not necessarily in roster
        self.subscriptions = {}
        self.server_features = {}  # used to store discovery's informations
        self.server_identities = {}
        self.config = self.parseMainConf()
        self.__fixLocalDir()
        database_file = os.path.expanduser(os.path.join(self.getConfig('', 'local_dir'), C.SAVEFILE_DATABASE))
        self.storage = SqliteStorage(database_file, host.__version__)
        PersistentDict.storage = self.storage
        self.params = Params(host, self.storage)
        info(_("Loading default params template"))
        self.params.load_default_params()
        d = self.storage.initialized.addCallback(lambda ignore: self.load())
        self.memory_data = PersistentDict("memory")
        d.addCallback(lambda ignore: self.memory_data.load())
        d.chainDeferred(self.initialized)

    def parseMainConf(self):
        """look for main .ini configuration file, and parse it"""
        config = SafeConfigParser(defaults=C.DEFAULT_CONFIG)
        try:
            config.read(C.CONFIG_FILES)
        except:
            error(_("Can't read main config !"))
        return config

    # XXX: tmp update code, will be removed in the future
    # When you remove this, please also remove sat.core.constants.Const.DEFAULT_LOCAL_DIR
    # and add the default value for 'local_dir' in sat.core.constants.Const.DEFAULT_CONFIG
    def __fixLocalDir(self):
        """Retro-compatibility with the previous local_dir default value."""
        if self.getConfig('', 'local_dir'):
            return  # nothing to do
        old_default = '~/.sat'
        if os.path.isfile(os.path.expanduser(old_default) + '/' + C.SAVEFILE_DATABASE):
            warning(_("A database has been found in the default local_dir for previous versions (< 0.5)"))
            config = SafeConfigParser()
            target_file = None
            for file_ in C.CONFIG_FILES[::-1]:
                # we will eventually update the existing file with the highest priority, if it's a user personal file...
                if os.path.isfile(file_):
                    if file_.startswith(os.path.expanduser('~')):
                        config.read([file_])
                        target_file = file_
                    break
            if not target_file:
                # ... otherwise we create a new config file for that user
                target_file = BaseDirectory.save_config_path('sat') + '/sat.conf'
            config.set('', 'local_dir', old_default)
            with open(target_file, 'wb') as configfile:
                config.write(configfile)  # for the next time that user launches sat
            warning(_("Auto-update: local_dir set to %(path)s in the file %(config_file)s") % {'path': old_default, 'config_file': file_})
            default = old_default
        else:  # use the new default local_dir
            default = C.DEFAULT_LOCAL_DIR
        self.config.set('', 'local_dir', default)  # for the currently running instance

    def getConfig(self, section, name):
        """Get the main configuration option
        @param section: section of the config file (None or '' for DEFAULT)
        @param name: name of the option
        """
        if not section:
            section = 'DEFAULT'
        try:
            value = self.config.get(section, name)
        except (NoOptionError, NoSectionError):
            value = ''

        if name.endswith('_path') or name.endswith('_dir'):
            value = os.path.expanduser(value)
        # thx to Brian (http://stackoverflow.com/questions/186857/splitting-a-semicolon-separated-string-to-a-dictionary-in-python/186873#186873)
        elif name.endswith('_list'):
            value = csv.reader([value], delimiter=',', quotechar='"').next()
        elif name.endswith('_dict'):
            value = dict(csv.reader([item], delimiter=':', quotechar='"').next()
                         for item in csv.reader([value], delimiter=',', quotechar='"').next())
        return value

    def load_xml(self, filename):
        """Load parameters template from xml file"""
        if filename is None:
            return False
        filename = os.path.expanduser(filename)
        if os.path.exists(filename):
            try:
                self.params.load_xml(filename)
                debug(_("Parameters loaded from file: %s") % filename)
                return True
            except Exception as e:
                error(_("Can't load parameters from file: %s") % e)
        return False

    def load(self):
        """Load parameters and all memory things from db"""
        #parameters data
        return self.params.loadGenParams()

    def loadIndividualParams(self, profile):
        """Load individual parameters for a profile
        @param profile: %(doc_profile)s"""
        return self.params.loadIndParams(profile)

    def startProfileSession(self, profile):
        """"Iniatialise session for a profile
        @param profile: %(doc_profile)s"""
        info(_("[%s] Profile session started" % profile))
        self._entities_cache[profile] = {}

    def purgeProfileSession(self, profile):
        """Delete cache of data of profile
        @param profile: %(doc_profile)s"""
        info(_("[%s] Profile session purge" % profile))
        self.params.purgeProfile(profile)
        try:
            del self._entities_cache[profile]
        except KeyError:
            error(_("Trying to purge roster status cache for a profile not in memory: [%s]") % profile)

    def save_xml(self, filename=None):
        """Save parameters template to xml file"""
        if filename is None:
            return False
        #TODO: need to encrypt files (at least passwords !) and set permissions
        filename = os.path.expanduser(filename)
        try:
            self.params.save_xml(filename)
            debug(_("Parameters saved to file: %s") % filename)
            return True
        except Exception as e:
            error(_("Can't save parameters to file: %s") % e)
        return False

    def getProfilesList(self):
        return self.storage.getProfilesList()

    def getProfileName(self, profile_key, return_profile_keys = False):
        """Return name of profile from keyword
        @param profile_key: can be the profile name or a keywork (like @DEFAULT@)
        @return: profile name or None if it doesn't exist"""
        return self.params.getProfileName(profile_key, return_profile_keys)

    def asyncCreateProfile(self, name):
        """Create a new profile
        @param name: Profile name
        """
        return self.params.asyncCreateProfile(name)

    def asyncDeleteProfile(self, name, force=False):
        """Delete an existing profile
        @param name: Name of the profile
        @param force: force the deletion even if the profile is connected.
        To be used for direct calls only (not through the bridge).
        @return: a Deferred instance
        """
        return self.params.asyncDeleteProfile(name, force)

    def addToHistory(self, from_jid, to_jid, message, type_='chat', extra=None, timestamp=None, profile=C.PROF_KEY_NONE):
        assert profile != C.PROF_KEY_NONE
        if extra is None:
            extra = {}
        return self.storage.addToHistory(from_jid, to_jid, message, type_, extra, timestamp, profile)

    def getHistory(self, from_jid, to_jid, limit=0, between=True, profile=C.PROF_KEY_NONE):
        assert profile != C.PROF_KEY_NONE
        return self.storage.getHistory(jid.JID(from_jid), jid.JID(to_jid), limit, between, profile)

    def addServerFeature(self, feature, jid_, profile):
        """Add a feature discovered from server
        @param feature: string of the feature
        @param jid_: the jid of the target server
        @param profile: which profile asked this server?"""
        if profile not in self.server_features:
            self.server_features[profile] = {}
        features = self.server_features[profile].setdefault(jid_, [])
        features.append(feature)

    def addServerIdentity(self, category, type_, entity, jid_, profile):
        """Add an identity discovered from server
        @param feature: string of the feature
        @param jid_: the jid of the target server
        @param profile: which profile asked this server?"""
        if not profile in self.server_identities:
            self.server_identities[profile] = {}
        identities = self.server_identities[profile].setdefault(jid_, {})
        if (category, type_) not in identities:
            identities[(category, type_)] = set()
        identities[(category, type_)].add(entity)

    def getServerServiceEntities(self, category, type_, jid_=None, profile=None):
        """Return all available entities of a server for the service (category, type_)
        @param category: identity's category
        @param type_: identitiy's type
        @param jid_: the jid of the target server (None for profile's server)
        @param profile: which profile is asking this server?
        @return: a set of entities or None if no cached data were found
        """
        if jid_ is None:
            jid_ = self.host.getClientHostJid(profile)
        if profile in self.server_identities and jid_ in self.server_identities[profile]:
            return self.server_identities[profile][jid_].get((category, type_), set())
        else:
            return None

    def getServerServiceEntity(self, category, type_, jid_=None, profile=None):
        """Helper method to get first available entity of a server for the service (category, type_)
        @param category: identity's category
        @param type_: identitiy's type
        @param jid_: the jid of the target server (None for profile's server)
        @param profile: which profile is asking this server?
        @return: the first found entity or None if no cached data were found
        """
        entities = self.getServerServiceEntities(category, type_, jid_, profile)
        if entities is None:
            warning(_("Entities (%(category)s/%(type)s) of %(server)s not available, maybe they haven't been asked yet?")
                    % {"category": category, "type": type_, "server": jid_})
            return None
        else:
            return list(entities)[0] if entities else None

    def getAllServerIdentities(self, jid_, profile):
        """Helper method to get all identities of a server
        @param jid_: the jid of the target server (None for profile's server)
        @param profile: which profile is asking this server?
        @return: a set of entities or None if no cached data were found
        """
        if jid_ is None:
            jid_ = self.host.getClientHostJid(profile)
        if jid_ not in self.server_identities[profile]:
            return None
        entities = set()
        for set_ in self.server_identities[profile][jid_].values():
            entities.update(set_)
        return entities

    def hasServerFeature(self, feature, jid_=None, profile_key=C.PROF_KEY_NONE):
        """Tell if the specified server has the required feature
        @param feature: requested feature
        @param jid_: the jid of the target server (None for profile's server)
        @param profile_key: %(doc_profile_key)s
        """
        profile = self.getProfileName(profile_key)
        if not profile:
            error(_('Trying find server feature for a non-existant profile'))
            return None
        assert profile in self.server_features
        if jid_ is None:
            jid_ = self.host.getClientHostJid(profile)
        if jid_ in self.server_features[profile]:
            return feature in self.server_features[profile][jid_]
        else:
            warning(_("Features of %s not available, maybe they haven't been asked yet?") % jid_)
            return None

    def _getLastResource(self, jid_s, profile_key):
        jid_ = jid.JID(jid_s)
        return self.getLastResource(jid_, profile_key) or ""


    def getLastResource(self, entity_jid, profile_key):
        """Return the last resource used by an entity
        @param entity_jid: entity jid
        @param profile_key: %(doc_profile_key)s"""
        data = self.getEntityData(entity_jid.userhostJID(), [C.ENTITY_LAST_RESOURCE], profile_key)
        try:
            return data[C.ENTITY_LAST_RESOURCE]
        except KeyError:
            return None

    def _getPresenceStatuses(self, profile_key):
        ret = self.getPresenceStatuses(profile_key)
        return {entity.full():data for entity, data in ret.iteritems()}

    def getPresenceStatuses(self, profile_key):
        """Get all the presence status of a profile
        @param profile_key: %(doc_profile_key)s
        @return: presence data: key=entity JID, value=presence data for this entity
        """
        profile = self.getProfileName(profile_key)
        if not profile:
            raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile'))
        entities_presence = {}
        for entity in self._entities_cache[profile]:
            if "presence" in self._entities_cache[profile][entity]:
                entities_presence[entity] = self._entities_cache[profile][entity]["presence"]

        debug("Memory getPresenceStatus (%s)", entities_presence)
        return entities_presence

    def setPresenceStatus(self, entity_jid, show, priority, statuses, profile_key):
        """Change the presence status of an entity
        @param entity_jid: jid.JID of the entity
        @param show: show status
        @param priotity: priotity
        @param statuses: dictionary of statuses
        @param profile_key: %(doc_profile_key)s
        """
        profile = self.getProfileName(profile_key)
        if not profile:
            raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile'))
        entity_data = self._getEntitiesData(entity_jid, profile)[entity_jid]
        resource = entity_jid.resource
        if resource:
            entity_data[C.ENTITY_LAST_RESOURCE] = resource
        entity_data.setdefault("presence", {})[resource or ''] = (show, priority, statuses)

    def _getEntitiesData(self, entity_jid, profile):
        """Get data dictionary for entities
        @param entity_jid: JID of the entity, or C.ENTITY_ALL for all entities)
        @param profile: %(doc_profile)s
        @return: entities_data (key=jid, value=entity_data)
        @raise: exceptions.ProfileNotInCacheError if profile is not in cache
        """
        if not profile in self._entities_cache:
            raise exceptions.ProfileNotInCacheError
        if entity_jid == C.ENTITY_ALL:
            entities_data = self._entities_cache[profile]
        else:
            entity_data = self._entities_cache[profile].setdefault(entity_jid, {})
            entities_data = {entity_jid: entity_data}
        return entities_data

    def _updateEntityResources(self, entity_jid, profile):
        """Add a known resource to bare entity_jid cache
        @param entity_jid: full entity_jid (with resource)
        @param profile: %(doc_profile)s
        """
        assert(entity_jid.resource)
        entity_data = self._getEntitiesData(entity_jid.userhostJID(), profile)[entity_jid.userhostJID()]
        resources = entity_data.setdefault('resources', set())
        resources.add(entity_jid.resource)

    def updateEntityData(self, entity_jid, key, value, profile_key):
        """Set a misc data for an entity
        @param entity_jid: JID of the entity, or C.ENTITY_ALL to update all entities)
        @param key: key to set (eg: "type")
        @param value: value for this key (eg: "chatroom")
        @param profile_key: %(doc_profile_key)s
        """
        profile = self.getProfileName(profile_key)
        if not profile:
            raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile'))
        entities_data = self._getEntitiesData(entity_jid, profile)
        if entity_jid.resource and entity_jid != C.ENTITY_ALL:
            self._updateEntityResources(entity_jid, profile)

        for jid_ in entities_data:
            entity_data = entities_data[jid_]
            if value == C.PROF_KEY_NONE and key in entity_data:
                del entity_data[key]
            else:
                entity_data[key] = value
            if isinstance(value, basestring):
                self.host.bridge.entityDataUpdated(jid_.full(), key, value, profile)

    def delEntityData(self, entity_jid, key, profile_key):
        """Delete data for an entity
        @param entity_jid: JID of the entity, or C.ENTITY_ALL to delete data from all entities)
        @param key: key to delete (eg: "type")
        @param profile_key: %(doc_profile_key)s
        """
        entities_data = self._getEntitiesData(entity_jid, profile_key)
        for entity_jid in entities_data:
            entity_data = entities_data[entity_jid]
            try:
                del entity_data[key]
            except KeyError:
                debug("Key [%s] doesn't exist for [%s] in entities_cache" % (key, entity_jid.full()))

    def getEntityData(self, entity_jid, keys_list, profile_key):
        """Get a list of cached values for entity
        @param entity_jid: JID of the entity
        @param keys_list: list of keys to get, empty list for everything
        @param profile_key: %(doc_profile_key)s
        @return: dict withs values for each key in keys_list.
                 if there is no value of a given key, resulting dict will
                 have nothing with that key nether
        @raise: exceptions.UnknownEntityError if entity is not in cache
        """
        profile = self.getProfileName(profile_key)
        if not profile:
            raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile'))
        entity_data = self._getEntitiesData(entity_jid, profile)[entity_jid]
        if not keys_list:
            return entity_data
        ret = {}
        for key in keys_list:
            if key in entity_data:
                ret[key] = entity_data[key]
        return ret

    def delEntityCache(self, entity_jid, delete_all_resources=True, profile_key=C.PROF_KEY_NONE):
        """Remove cached data for entity
        @param entity_jid: JID of the entity to delete
        @param delete_all_resources: if True also delete all known resources form cache
        @param profile_key: %(doc_profile_key)s
        """
        profile = self.getProfileName(profile_key)
        if not profile:
            raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile'))
        to_delete = set([entity_jid])

        if delete_all_resources:
            if entity_jid.resource:
                raise ValueError(_("Need a bare jid to delete all resources"))
            entity_data = self._getEntitiesData(entity_jid, profile)[entity_jid]
            resources = entity_data.setdefault('resources', set())
            to_delete.update([jid.JID("%s/%s" % (entity_jid.userhost(), resource)) for resource in resources])

        for entity in to_delete:
            try:
                del self._entities_cache[profile][entity]
            except KeyError:
                debug("Can't delete entity [%s]: not in cache" % entity.full())

    def addWaitingSub(self, type_, entity_jid, profile_key):
        """Called when a subcription request is received"""
        profile = self.getProfileName(profile_key)
        assert profile
        if profile not in self.subscriptions:
            self.subscriptions[profile] = {}
        self.subscriptions[profile][entity_jid] = type_

    def delWaitingSub(self, entity_jid, profile_key):
        """Called when a subcription request is finished"""
        profile = self.getProfileName(profile_key)
        assert profile
        if profile in self.subscriptions and entity_jid in self.subscriptions[profile]:
            del self.subscriptions[profile][entity_jid]

    def getWaitingSub(self, profile_key):
        """Called to get a list of currently waiting subscription requests"""
        profile = self.getProfileName(profile_key)
        if not profile:
            error(_('Asking waiting subscriptions for a non-existant profile'))
            return {}
        if profile not in self.subscriptions:
            return {}

        return self.subscriptions[profile]

    def getStringParamA(self, name, category, attr="value", profile_key=C.PROF_KEY_NONE):
        return self.params.getStringParamA(name, category, attr, profile_key)

    def getParamA(self, name, category, attr="value", profile_key=C.PROF_KEY_NONE):
        return self.params.getParamA(name, category, attr, profile_key)

    def asyncGetParamA(self, name, category, attr="value", security_limit=C.NO_SECURITY_LIMIT, profile_key=C.PROF_KEY_NONE):
        return self.params.asyncGetParamA(name, category, attr, security_limit, profile_key)

    def asyncGetStringParamA(self, name, category, attr="value", security_limit=C.NO_SECURITY_LIMIT, profile_key=C.PROF_KEY_NONE):
        return self.params.asyncGetStringParamA(name, category, attr, security_limit, profile_key)

    def getParamsUI(self, security_limit=C.NO_SECURITY_LIMIT, app='', profile_key=C.PROF_KEY_NONE):
        return self.params.getParamsUI(security_limit, app, profile_key)

    def getParams(self, security_limit=C.NO_SECURITY_LIMIT, app='', profile_key=C.PROF_KEY_NONE):
        return self.params.getParams(security_limit, app, profile_key)

    def getParamsForCategory(self, category, security_limit=C.NO_SECURITY_LIMIT, app='', profile_key=C.PROF_KEY_NONE):
        return self.params.getParamsForCategory(category, security_limit, app, profile_key)

    def getParamsCategories(self):
        return self.params.getParamsCategories()

    def setParam(self, name, value, category, security_limit=C.NO_SECURITY_LIMIT, profile_key=C.PROF_KEY_NONE):
        return self.params.setParam(name, value, category, security_limit, profile_key)

    def updateParams(self, xml):
        return self.params.updateParams(xml)

    def paramsRegisterApp(self, xml, security_limit=C.NO_SECURITY_LIMIT, app=''):
        return self.params.paramsRegisterApp(xml, security_limit, app)

    def setDefault(self, name, category, callback, errback=None):
        return self.params.setDefault(name, category, callback, errback)