Mercurial > libervia-backend
view src/memory/memory.py @ 972:07b817f5a197
core: better plugin initialisation sequence:
- profileConnected are now executed in a deferredList, instead of one after the other
- failure are collected, and a global message show which plugins failed and the failure message
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 02 Apr 2014 12:31:23 +0200 |
parents | fe181994246a |
children | 301b342c697a |
line wrap: on
line source
#!/usr/bin/python # -*- coding: utf-8 -*- # SAT: a jabber client # Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sat.core.i18n import _ import os.path import csv from xdg import BaseDirectory from ConfigParser import SafeConfigParser, NoOptionError, NoSectionError from uuid import uuid4 from logging import debug, info, warning, error from twisted.internet import defer, reactor from twisted.words.protocols.jabber import jid from sat.core import exceptions from sat.core.constants import Const as C from sat.memory.sqlite import SqliteStorage from sat.memory.persistent import PersistentDict from sat.memory.params import Params from sat.memory.disco import Discovery class Sessions(object): DEFAULT_TIMEOUT = 600 def __init__(self, timeout = None): """ @param timeout: nb of seconds before session destruction """ self._sessions = dict() self.timeout = timeout or Sessions.DEFAULT_TIMEOUT def newSession(self, session_data=None, profile=None): """ Create a new session @param session_data: mutable data to use, default to a dict @param profile: if set, the session is owned by the profile, and profileGet must be used instead of __getitem__ @return: session_id, session_data """ session_id = str(uuid4()) timer = reactor.callLater(self.timeout, self._purgeSession, session_id) if session_data is None: session_data = {} self._sessions[session_id] = (timer, session_data) if profile is None else (timer, session_data, profile) return session_id, session_data def _purgeSession(self, session_id): del self._sessions[session_id] debug("Session [%s] purged" % session_id) def __len__(self): return len(self._sessions) def __contains__(self, session_id): return session_id in self._sessions def profileGet(self, session_id, profile): try: timer, session_data, profile_set = self._sessions[session_id] except ValueError: raise exceptions.InternalError("You need to use __getitem__ when profile is not set") if profile_set != profile: raise exceptions.InternalError("current profile differ from set profile !") timer.reset(self.timeout) return session_data def __getitem__(self, session_id): try: timer, session_data = self._sessions[session_id] except ValueError: raise exceptions.InternalError("You need to use profileGet instead of __getitem__ when profile is set") timer.reset(self.timeout) return session_data def __setitem__(self, key, value): raise NotImplementedError("You need do use newSession to create a session") def __delitem__(self, session_id): """ Cancel the timer, then actually delete the session data """ try: timer = self._sessions[session_id][0] timer.cancel() self._purgeSession(session_id) except KeyError: debug ("Session [%s] doesn't exists, timeout expired ?" % session_id) def keys(self): return self._sessions.keys() def iterkeys(self): return self._sessions.iterkeys() class Memory(object): """This class manage all persistent informations""" def __init__(self, host): info(_("Memory manager init")) self.initialized = defer.Deferred() self.host = host self._entities_cache = {} # XXX: keep presence/last resource/other data in cache # /!\ an entity is not necessarily in roster self.subscriptions = {} self.disco = Discovery(host) self.config = self.parseMainConf() self.__fixLocalDir() database_file = os.path.expanduser(os.path.join(self.getConfig('', 'local_dir'), C.SAVEFILE_DATABASE)) self.storage = SqliteStorage(database_file, host.__version__) PersistentDict.storage = self.storage self.params = Params(host, self.storage) info(_("Loading default params template")) self.params.load_default_params() d = self.storage.initialized.addCallback(lambda ignore: self.load()) self.memory_data = PersistentDict("memory") d.addCallback(lambda ignore: self.memory_data.load()) d.chainDeferred(self.initialized) def parseMainConf(self): """look for main .ini configuration file, and parse it""" config = SafeConfigParser(defaults=C.DEFAULT_CONFIG) try: config.read(C.CONFIG_FILES) except: error(_("Can't read main config !")) return config # XXX: tmp update code, will be removed in the future # When you remove this, please also remove sat.core.constants.Const.DEFAULT_LOCAL_DIR # and add the default value for 'local_dir' in sat.core.constants.Const.DEFAULT_CONFIG def __fixLocalDir(self): """Retro-compatibility with the previous local_dir default value.""" if self.getConfig('', 'local_dir'): return # nothing to do old_default = '~/.sat' if os.path.isfile(os.path.expanduser(old_default) + '/' + C.SAVEFILE_DATABASE): warning(_("A database has been found in the default local_dir for previous versions (< 0.5)")) config = SafeConfigParser() target_file = None for file_ in C.CONFIG_FILES[::-1]: # we will eventually update the existing file with the highest priority, if it's a user personal file... if os.path.isfile(file_): if file_.startswith(os.path.expanduser('~')): config.read([file_]) target_file = file_ break if not target_file: # ... otherwise we create a new config file for that user target_file = BaseDirectory.save_config_path('sat') + '/sat.conf' config.set('', 'local_dir', old_default) with open(target_file, 'wb') as configfile: config.write(configfile) # for the next time that user launches sat warning(_("Auto-update: local_dir set to %(path)s in the file %(config_file)s") % {'path': old_default, 'config_file': file_}) default = old_default else: # use the new default local_dir default = C.DEFAULT_LOCAL_DIR self.config.set('', 'local_dir', default) # for the currently running instance def getConfig(self, section, name): """Get the main configuration option @param section: section of the config file (None or '' for DEFAULT) @param name: name of the option """ if not section: section = 'DEFAULT' try: value = self.config.get(section, name) except (NoOptionError, NoSectionError): value = '' if name.endswith('_path') or name.endswith('_dir'): value = os.path.expanduser(value) # thx to Brian (http://stackoverflow.com/questions/186857/splitting-a-semicolon-separated-string-to-a-dictionary-in-python/186873#186873) elif name.endswith('_list'): value = csv.reader([value], delimiter=',', quotechar='"').next() elif name.endswith('_dict'): value = dict(csv.reader([item], delimiter=':', quotechar='"').next() for item in csv.reader([value], delimiter=',', quotechar='"').next()) return value def load_xml(self, filename): """Load parameters template from xml file""" if filename is None: return False filename = os.path.expanduser(filename) if os.path.exists(filename): try: self.params.load_xml(filename) debug(_("Parameters loaded from file: %s") % filename) return True except Exception as e: error(_("Can't load parameters from file: %s") % e) return False def load(self): """Load parameters and all memory things from db""" #parameters data return self.params.loadGenParams() def loadIndividualParams(self, profile): """Load individual parameters for a profile @param profile: %(doc_profile)s""" return self.params.loadIndParams(profile) def startProfileSession(self, profile): """"Iniatialise session for a profile @param profile: %(doc_profile)s""" info(_("[%s] Profile session started" % profile)) self._entities_cache[profile] = {} def purgeProfileSession(self, profile): """Delete cache of data of profile @param profile: %(doc_profile)s""" info(_("[%s] Profile session purge" % profile)) self.params.purgeProfile(profile) try: del self._entities_cache[profile] except KeyError: error(_("Trying to purge roster status cache for a profile not in memory: [%s]") % profile) def save_xml(self, filename=None): """Save parameters template to xml file""" if filename is None: return False #TODO: need to encrypt files (at least passwords !) and set permissions filename = os.path.expanduser(filename) try: self.params.save_xml(filename) debug(_("Parameters saved to file: %s") % filename) return True except Exception as e: error(_("Can't save parameters to file: %s") % e) return False def getProfilesList(self): return self.storage.getProfilesList() def getProfileName(self, profile_key, return_profile_keys = False): """Return name of profile from keyword @param profile_key: can be the profile name or a keywork (like @DEFAULT@) @return: profile name or None if it doesn't exist""" return self.params.getProfileName(profile_key, return_profile_keys) def asyncCreateProfile(self, name): """Create a new profile @param name: Profile name """ return self.params.asyncCreateProfile(name) def asyncDeleteProfile(self, name, force=False): """Delete an existing profile @param name: Name of the profile @param force: force the deletion even if the profile is connected. To be used for direct calls only (not through the bridge). @return: a Deferred instance """ return self.params.asyncDeleteProfile(name, force) def addToHistory(self, from_jid, to_jid, message, type_='chat', extra=None, timestamp=None, profile=C.PROF_KEY_NONE): assert profile != C.PROF_KEY_NONE if extra is None: extra = {} return self.storage.addToHistory(from_jid, to_jid, message, type_, extra, timestamp, profile) def getHistory(self, from_jid, to_jid, limit=0, between=True, profile=C.PROF_KEY_NONE): assert profile != C.PROF_KEY_NONE return self.storage.getHistory(jid.JID(from_jid), jid.JID(to_jid), limit, between, profile) def _getLastResource(self, jid_s, profile_key): jid_ = jid.JID(jid_s) return self.getLastResource(jid_, profile_key) or "" def getLastResource(self, entity_jid, profile_key): """Return the last resource used by an entity @param entity_jid: entity jid @param profile_key: %(doc_profile_key)s""" data = self.getEntityData(entity_jid.userhostJID(), [C.ENTITY_LAST_RESOURCE], profile_key) try: return data[C.ENTITY_LAST_RESOURCE] except KeyError: return None def _getPresenceStatuses(self, profile_key): ret = self.getPresenceStatuses(profile_key) return {entity.full():data for entity, data in ret.iteritems()} def getPresenceStatuses(self, profile_key): """Get all the presence status of a profile @param profile_key: %(doc_profile_key)s @return: presence data: key=entity JID, value=presence data for this entity """ profile = self.getProfileName(profile_key) if not profile: raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile')) entities_presence = {} for entity in self._entities_cache[profile]: if "presence" in self._entities_cache[profile][entity]: entities_presence[entity] = self._entities_cache[profile][entity]["presence"] debug("Memory getPresenceStatus (%s)", entities_presence) return entities_presence def setPresenceStatus(self, entity_jid, show, priority, statuses, profile_key): """Change the presence status of an entity @param entity_jid: jid.JID of the entity @param show: show status @param priotity: priotity @param statuses: dictionary of statuses @param profile_key: %(doc_profile_key)s """ profile = self.getProfileName(profile_key) if not profile: raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile')) entity_data = self._getEntitiesData(entity_jid, profile)[entity_jid] resource = entity_jid.resource if resource: self.updateEntityData(entity_jid.userhostJID(), C.ENTITY_LAST_RESOURCE, resource, profile) entity_data.setdefault("presence", {})[resource or ''] = (show, priority, statuses) def _getEntitiesData(self, entity_jid, profile): """Get data dictionary for entities @param entity_jid: JID of the entity, or C.ENTITY_ALL for all entities) @param profile: %(doc_profile)s @return: entities_data (key=jid, value=entity_data) @raise: exceptions.ProfileNotInCacheError if profile is not in cache """ if not profile in self._entities_cache: raise exceptions.ProfileNotInCacheError if entity_jid == C.ENTITY_ALL: entities_data = self._entities_cache[profile] else: entity_data = self._entities_cache[profile].setdefault(entity_jid, {}) entities_data = {entity_jid: entity_data} return entities_data def _updateEntityResources(self, entity_jid, profile): """Add a known resource to bare entity_jid cache @param entity_jid: full entity_jid (with resource) @param profile: %(doc_profile)s """ assert(entity_jid.resource) entity_data = self._getEntitiesData(entity_jid.userhostJID(), profile)[entity_jid.userhostJID()] resources = entity_data.setdefault('resources', set()) resources.add(entity_jid.resource) def updateEntityData(self, entity_jid, key, value, profile_key): """Set a misc data for an entity @param entity_jid: JID of the entity, or C.ENTITY_ALL to update all entities) @param key: key to set (eg: "type") @param value: value for this key (eg: "chatroom") @param profile_key: %(doc_profile_key)s """ profile = self.getProfileName(profile_key) if not profile: raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile')) entities_data = self._getEntitiesData(entity_jid, profile) if entity_jid.resource and entity_jid != C.ENTITY_ALL: self._updateEntityResources(entity_jid, profile) for jid_ in entities_data: entity_data = entities_data[jid_] if value == C.PROF_KEY_NONE and key in entity_data: del entity_data[key] else: entity_data[key] = value if isinstance(value, basestring): self.host.bridge.entityDataUpdated(jid_.full(), key, value, profile) def delEntityData(self, entity_jid, key, profile_key): """Delete data for an entity @param entity_jid: JID of the entity, or C.ENTITY_ALL to delete data from all entities) @param key: key to delete (eg: "type") @param profile_key: %(doc_profile_key)s """ entities_data = self._getEntitiesData(entity_jid, profile_key) for entity_jid in entities_data: entity_data = entities_data[entity_jid] try: del entity_data[key] except KeyError: debug("Key [%s] doesn't exist for [%s] in entities_cache" % (key, entity_jid.full())) def getEntityData(self, entity_jid, keys_list, profile_key): """Get a list of cached values for entity @param entity_jid: JID of the entity @param keys_list (iterable): list of keys to get, empty list for everything @param profile_key: %(doc_profile_key)s @return: dict withs values for each key in keys_list. if there is no value of a given key, resulting dict will have nothing with that key nether @raise: exceptions.UnknownEntityError if entity is not in cache """ profile = self.getProfileName(profile_key) if not profile: raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile')) entity_data = self._getEntitiesData(entity_jid, profile)[entity_jid] if not keys_list: return entity_data ret = {} for key in keys_list: if key in entity_data: ret[key] = entity_data[key] return ret def getEntityDatum(self, entity_jid, key, profile_key): """Get a datum from entity @param entity_jid: JID of the entity @param keys: key to get @param profile_key: %(doc_profile_key)s @return: requested value @raise: exceptions.UnknownEntityError if entity is not in cache @raise: KeyError if there is no value for this key and this entity """ return self.getEntityData(entity_jid, (key,), profile_key)[key] def delEntityCache(self, entity_jid, delete_all_resources=True, profile_key=C.PROF_KEY_NONE): """Remove cached data for entity @param entity_jid: JID of the entity to delete @param delete_all_resources: if True also delete all known resources form cache @param profile_key: %(doc_profile_key)s """ profile = self.getProfileName(profile_key) if not profile: raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile')) to_delete = set([entity_jid]) if delete_all_resources: if entity_jid.resource: raise ValueError(_("Need a bare jid to delete all resources")) entity_data = self._getEntitiesData(entity_jid, profile)[entity_jid] resources = entity_data.setdefault('resources', set()) to_delete.update([jid.JID("%s/%s" % (entity_jid.userhost(), resource)) for resource in resources]) for entity in to_delete: try: del self._entities_cache[profile][entity] except KeyError: debug("Can't delete entity [%s]: not in cache" % entity.full()) def addWaitingSub(self, type_, entity_jid, profile_key): """Called when a subcription request is received""" profile = self.getProfileName(profile_key) assert profile if profile not in self.subscriptions: self.subscriptions[profile] = {} self.subscriptions[profile][entity_jid] = type_ def delWaitingSub(self, entity_jid, profile_key): """Called when a subcription request is finished""" profile = self.getProfileName(profile_key) assert profile if profile in self.subscriptions and entity_jid in self.subscriptions[profile]: del self.subscriptions[profile][entity_jid] def getWaitingSub(self, profile_key): """Called to get a list of currently waiting subscription requests""" profile = self.getProfileName(profile_key) if not profile: error(_('Asking waiting subscriptions for a non-existant profile')) return {} if profile not in self.subscriptions: return {} return self.subscriptions[profile] def getStringParamA(self, name, category, attr="value", profile_key=C.PROF_KEY_NONE): return self.params.getStringParamA(name, category, attr, profile_key) def getParamA(self, name, category, attr="value", profile_key=C.PROF_KEY_NONE): return self.params.getParamA(name, category, attr, profile_key) def asyncGetParamA(self, name, category, attr="value", security_limit=C.NO_SECURITY_LIMIT, profile_key=C.PROF_KEY_NONE): return self.params.asyncGetParamA(name, category, attr, security_limit, profile_key) def asyncGetStringParamA(self, name, category, attr="value", security_limit=C.NO_SECURITY_LIMIT, profile_key=C.PROF_KEY_NONE): return self.params.asyncGetStringParamA(name, category, attr, security_limit, profile_key) def getParamsUI(self, security_limit=C.NO_SECURITY_LIMIT, app='', profile_key=C.PROF_KEY_NONE): return self.params.getParamsUI(security_limit, app, profile_key) def getParams(self, security_limit=C.NO_SECURITY_LIMIT, app='', profile_key=C.PROF_KEY_NONE): return self.params.getParams(security_limit, app, profile_key) def getParamsForCategory(self, category, security_limit=C.NO_SECURITY_LIMIT, app='', profile_key=C.PROF_KEY_NONE): return self.params.getParamsForCategory(category, security_limit, app, profile_key) def getParamsCategories(self): return self.params.getParamsCategories() def setParam(self, name, value, category, security_limit=C.NO_SECURITY_LIMIT, profile_key=C.PROF_KEY_NONE): return self.params.setParam(name, value, category, security_limit, profile_key) def updateParams(self, xml): return self.params.updateParams(xml) def paramsRegisterApp(self, xml, security_limit=C.NO_SECURITY_LIMIT, app=''): return self.params.paramsRegisterApp(xml, security_limit, app) def setDefault(self, name, category, callback, errback=None): return self.params.setDefault(name, category, callback, errback)