Mercurial > libervia-backend
view src/memory/memory.py @ 1063:6ec513ad92c2
frontends: async failures are more detailed (full class name + error message)
author | souliane <souliane@mailoo.org> |
---|---|
date | Sat, 07 Jun 2014 15:39:20 +0200 |
parents | a874a79ad0f5 |
children | 7ee9d9db67b9 |
line wrap: on
line source
#!/usr/bin/python
# -*- coding: utf-8 -*-

# SAT: a jabber client
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _

import os.path
import csv
from ConfigParser import SafeConfigParser, NoOptionError, NoSectionError
from uuid import uuid4
from sat.core.log import getLogger
log = getLogger(__name__)
from twisted.internet import defer, reactor
from twisted.words.protocols.jabber import jid
from sat.core import exceptions
from sat.core.constants import Const as C
from sat.memory.sqlite import SqliteStorage
from sat.memory.persistent import PersistentDict
from sat.memory.params import Params
from sat.memory.disco import Discovery
from sat.memory.crypto import BlockCipher
from sat.tools.config import fixConfigOption


class Sessions(object):
    """Sessions are data associated to key used for a temporary moment, with optional profile checking.

    Each session is stored as a tuple whose first item is the expiration timer:
        - (timer, session_data) when no profile owns the session
        - (timer, session_data, profile) when the session is owned by a profile
    """
    DEFAULT_TIMEOUT = 600

    def __init__(self, timeout=None):
        """
        @param timeout: nb of seconds before session destruction
                        (a false value selects DEFAULT_TIMEOUT)
        """
        self._sessions = dict()
        self.timeout = timeout or Sessions.DEFAULT_TIMEOUT

    def newSession(self, session_data=None, profile=None):
        """ Create a new session
        @param session_data: mutable data to use, default to a dict
        @param profile: if set, the session is owned by the profile,
                        and profileGet must be used instead of __getitem__
        @return: session_id, session_data
        """
        session_id = str(uuid4())
        # the session is purged automatically once the timeout is reached
        timer = reactor.callLater(self.timeout, self._purgeSession, session_id)
        if session_data is None:
            session_data = {}
        if profile is None:
            self._sessions[session_id] = (timer, session_data)
        else:
            self._sessions[session_id] = (timer, session_data, profile)
        return session_id, session_data

    def _purgeSession(self, session_id):
        """Remove the session from the store (the timer is not touched here)

        @param session_id: id of the session to remove
        @raise KeyError: if the session is not stored anymore
        """
        del self._sessions[session_id]
        log.debug("Session [%s] purged" % session_id)

    def __len__(self):
        return len(self._sessions)

    def __contains__(self, session_id):
        return session_id in self._sessions

    def profileGet(self, session_id, profile):
        """Get the data of a session owned by a profile, and restart its timeout

        @param session_id: id of the session
        @param profile: profile which must own the session
        @return: session data
        @raise exceptions.InternalError: if the session has no owner,
            or if it is owned by an other profile
        @raise KeyError: if the session doesn't exist
        """
        try:
            timer, session_data, profile_set = self._sessions[session_id]
        except ValueError:
            # 2 items tuple: the session has been created without profile
            raise exceptions.InternalError("You need to use __getitem__ when profile is not set")
        if profile_set != profile:
            raise exceptions.InternalError("current profile differ from set profile !")
        timer.reset(self.timeout)
        return session_data

    def __getitem__(self, session_id):
        """Get the data of a session without owner, and restart its timeout

        @param session_id: id of the session
        @return: session data
        @raise exceptions.InternalError: if the session is owned by a profile
        @raise KeyError: if the session doesn't exist
        """
        try:
            timer, session_data = self._sessions[session_id]
        except ValueError:
            # 3 items tuple: the session is owned by a profile
            raise exceptions.InternalError("You need to use profileGet instead of __getitem__ when profile is set")
        timer.reset(self.timeout)
        return session_data

    def __setitem__(self, key, value):
        raise NotImplementedError("You need do use newSession to create a session")

    def __delitem__(self, session_id):
        """ Cancel the timer, then actually delete the session data """
        try:
            timer = self._sessions[session_id][0]
        except KeyError:
            log.debug("Session [%s] doesn't exists, timeout expired?" % session_id)
            return
        timer.cancel()
        self._purgeSession(session_id)

    def keys(self):
        return self._sessions.keys()

    def iterkeys(self):
        return self._sessions.iterkeys()


class ProfileSessions(Sessions):
    """ProfileSessions extends the Sessions class, but here the profile can be
    used as the key to retrieve data or delete a session (instead of session id).
    """

    def _profileGetAllIds(self, profile):
        """Return a list of the sessions ids that are associated to the given profile.

        @param profile: %(doc_profile)s
        @return: a list containing the sessions ids
        """
        ret = []
        for session_id, session in self._sessions.iteritems():
            try:
                timer, session_data, profile_set = session
            except ValueError:
                # session without owner
                continue
            if profile == profile_set:
                ret.append(session_id)
        return ret

    def profileGetUnique(self, profile):
        """Return the data of the unique session that is associated to the given profile.

        The timeout of the session is not restarted by this method.

        @param profile: %(doc_profile)s
        @return:
            - mutable data (default: dict) of the unique session
            - None if no session is associated to the profile
            - raise an error if more than one session are found
        """
        ids = self._profileGetAllIds(profile)
        if len(ids) > 1:
            raise exceptions.InternalError('profileGetUnique has been used but more than one session has been found!')
        return self._sessions[ids[0]][1] if len(ids) == 1 else None

    def profileDelUnique(self, profile):
        """Delete the unique session that is associated to the given profile.

        @param profile: %(doc_profile)s
        @return: None, but raise an error if more than one session are found
        """
        ids = self._profileGetAllIds(profile)
        if len(ids) > 1:
            raise exceptions.InternalError('profileDelUnique has been used but more than one session has been found!')
        if len(ids) == 1:
            # go through Sessions.__delitem__ so that the timer is cancelled:
            # removing the entry directly would leave the delayed call pending,
            # and _purgeSession would raise a KeyError when it fires
            del self[ids[0]]


# XXX: tmp update code, will be removed in the future
# When you remove this, please add the default value for
# 'local_dir' in sat.core.constants.Const.DEFAULT_CONFIG
def fixLocalDir(silent=True):
    """Retro-compatibility with the previous local_dir default value.

    If no 'local_dir' is configured and a database file is found in the
    old default directory ('~/.sat'), the old directory is written in the
    configuration with fixConfigOption.

    @param silent (boolean): toggle logging output (must be True when called from sat.sh)
    """
    user_config = SafeConfigParser()
    try:
        user_config.read(C.CONFIG_FILES)
    except Exception:
        pass  # file is readable but its structure is wrong
    try:
        current_value = user_config.get('DEFAULT', 'local_dir')
    except (NoOptionError, NoSectionError):
        current_value = ''
    if current_value:
        return  # nothing to do
    old_default = '~/.sat'
    if os.path.isfile(os.path.expanduser(old_default) + '/' + C.SAVEFILE_DATABASE):
        if not silent:
            log.warning(_("A database has been found in the default local_dir for previous versions (< 0.5)"))
        fixConfigOption('', 'local_dir', old_default, silent)


class Memory(object):
    """This class manage all persistent informations"""

    def __init__(self, host):
        """
        @param host: main SàT instance (its "__version__" and "bridge"
            attributes are used by this class)
        """
        log.info(_("Memory manager init"))
        # fired once storage, general params and memory data are loaded
        self.initialized = defer.Deferred()
        self.host = host
        self._entities_cache = {}  # XXX: keep presence/last resource/other data in cache
                                   #     /!\ an entity is not necessarily in roster
        self.subscriptions = {}
        self.auth_sessions = ProfileSessions()  # remember the authenticated profiles
        self.disco = Discovery(host)
        fixLocalDir(False)  # XXX: tmp update code, will be removed in the future
        self.config = self.parseMainConf()
        database_file = os.path.expanduser(os.path.join(self.getConfig('', 'local_dir'), C.SAVEFILE_DATABASE))
        self.storage = SqliteStorage(database_file, host.__version__)
        PersistentDict.storage = self.storage
        self.params = Params(host, self.storage)
        log.info(_("Loading default params template"))
        self.params.load_default_params()
        d = self.storage.initialized.addCallback(lambda ignore: self.load())
        self.memory_data = PersistentDict("memory")
        d.addCallback(lambda ignore: self.memory_data.load())
        d.chainDeferred(self.initialized)

    def parseMainConf(self):
        """look for main .ini configuration file, and parse it

        @return: a SafeConfigParser instance (with only the default values
            if the configuration files can't be read)
        """
        config = SafeConfigParser(defaults=C.DEFAULT_CONFIG)
        try:
            config.read(C.CONFIG_FILES)
        except Exception:
            log.error(_("Can't read main config !"))
        return config

    def getConfig(self, section, name):
        """Get the main configuration option

        The suffix of the option name selects a post-treatment:
            - "_path" or "_dir": "~" is expanded
            - "_list": the value is parsed as a comma separated list
            - "_dict": the value is parsed as comma separated "key:value" pairs

        @param section: section of the config file (None or '' for DEFAULT)
        @param name: name of the option
        @return: the (post-treated) value, built from '' if the option
            or the section doesn't exist
        """
        if not section:
            section = 'DEFAULT'
        try:
            value = self.config.get(section, name)
        except (NoOptionError, NoSectionError):
            value = ''

        # thx to Brian (http://stackoverflow.com/questions/186857/splitting-a-semicolon-separated-string-to-a-dictionary-in-python/186873#186873)
        if name.endswith(('_path', '_dir')):
            value = os.path.expanduser(value)
        elif name.endswith('_list'):
            value = next(csv.reader([value], delimiter=',', quotechar='"'))
        elif name.endswith('_dict'):
            items = next(csv.reader([value], delimiter=',', quotechar='"'))
            value = dict(next(csv.reader([item], delimiter=':', quotechar='"')) for item in items)
        return value

    def load_xml(self, filename):
        """Load parameters template from xml file

        @param filename (str): input file
        @return: bool: True in case of success
        """
        if not filename:
            return False
        filename = os.path.expanduser(filename)
        if os.path.exists(filename):
            try:
                self.params.load_xml(filename)
                log.debug(_("Parameters loaded from file: %s") % filename)
                return True
            except Exception as e:
                log.error(_("Can't load parameters from file: %s") % e)
        return False

    def load(self):
        """Load parameters and all memory things from db"""
        #parameters data
        return self.params.loadGenParams()

    def loadIndividualParams(self, profile):
        """Load individual parameters for a profile
        @param profile: %(doc_profile)s"""
        return self.params.loadIndParams(profile)

    def startProfileSession(self, profile):
        """Initialise session for a profile
        @param profile: %(doc_profile)s"""
        # interpolation is done after the translation lookup
        log.info(_("[%s] Profile session started") % profile)
        self._entities_cache[profile] = {}

    def newAuthSession(self, key, profile):
        """Start a new session for the authenticated profile.

        The personal key is loaded encrypted from a PersistentDict before being decrypted.

        @param key: the key to decrypt the personal key
        @param profile: %(doc_profile)s
        @return: a deferred None value
        """
        def gotPersonalKey(personal_key):
            """Create the session for this profile and store the personal key"""
            self.auth_sessions.newSession({C.MEMORY_CRYPTO_KEY: personal_key}, profile)
            log.debug('auth session created for profile %s' % profile)

        d = PersistentDict(C.MEMORY_CRYPTO_NAMESPACE, profile).load()
        d.addCallback(lambda data: BlockCipher.decrypt(key, data[C.MEMORY_CRYPTO_KEY]))
        return d.addCallback(gotPersonalKey)

    def purgeProfileSession(self, profile):
        """Delete cache of data of profile
        @param profile: %(doc_profile)s"""
        # interpolation is done after the translation lookup
        log.info(_("[%s] Profile session purge") % profile)
        self.params.purgeProfile(profile)
        try:
            del self._entities_cache[profile]
        except KeyError:
            log.error(_("Trying to purge roster status cache for a profile not in memory: [%s]") % profile)

    def save_xml(self, filename):
        """Save parameters template to xml file

        @param filename (str): output file
        @return: bool: True in case of success
        """
        if not filename:
            return False
        #TODO: need to encrypt files (at least passwords !) and set permissions
        filename = os.path.expanduser(filename)
        try:
            self.params.save_xml(filename)
            log.debug(_("Parameters saved to file: %s") % filename)
            return True
        except Exception as e:
            log.error(_("Can't save parameters to file: %s") % e)
        return False

    def getProfilesList(self):
        return self.storage.getProfilesList()

    def getProfileName(self, profile_key, return_profile_keys=False):
        """Return name of profile from keyword

        @param profile_key: can be the profile name or a keyword (like @DEFAULT@)
        @param return_profile_keys: forwarded as is to Params.getProfileName
        @return: profile name or None if it doesn't exist"""
        return self.params.getProfileName(profile_key, return_profile_keys)

    def asyncCreateProfile(self, name, password=''):
        """Create a new profile

        @param name: profile name
        @param password: profile password
        @return: Deferred
        """
        personal_key = BlockCipher.getRandomKey(base64=True)  # generated once for all and saved in a PersistentDict
        self.auth_sessions.newSession({C.MEMORY_CRYPTO_KEY: personal_key}, name)  # will be encrypted by setParam
        d = self.params.asyncCreateProfile(name)
        d.addCallback(lambda dummy: self.setParam(C.PROFILE_PASS_PATH[1], password, C.PROFILE_PASS_PATH[0], profile_key=name))
        return d

    def asyncDeleteProfile(self, name, force=False):
        """Delete an existing profile

        @param name: Name of the profile
        @param force: force the deletion even if the profile is connected.
        To be used for direct calls only (not through the bridge).
        @return: a Deferred instance
        """
        return self.params.asyncDeleteProfile(name, force)

    def addToHistory(self, from_jid, to_jid, message, type_='chat', extra=None, timestamp=None, profile=C.PROF_KEY_NONE):
        """Store a message in history (see SqliteStorage.addToHistory)

        @param extra: extra data, default to an empty dict
        @param profile: %(doc_profile)s (mandatory)
        @return: result of storage.addToHistory
        """
        assert profile != C.PROF_KEY_NONE
        if extra is None:
            extra = {}
        return self.storage.addToHistory(from_jid, to_jid, message, type_, extra, timestamp, profile)

    def getHistory(self, from_jid, to_jid, limit=0, between=True, profile=C.PROF_KEY_NONE):
        """Retrieve messages from history (see SqliteStorage.getHistory)

        @param from_jid: source jid, converted to jid.JID before the request
        @param to_jid: destination jid, converted to jid.JID before the request
        @param profile: %(doc_profile)s (mandatory)
        @return: result of storage.getHistory
        """
        assert profile != C.PROF_KEY_NONE
        return self.storage.getHistory(jid.JID(from_jid), jid.JID(to_jid), limit, between, profile)

    def _getLastResource(self, jid_s, profile_key):
        """Same as getLastResource, with a string jid as argument and "" instead of None as default"""
        jid_ = jid.JID(jid_s)
        return self.getLastResource(jid_, profile_key) or ""

    def getLastResource(self, entity_jid, profile_key):
        """Return the last resource used by an entity

        @param entity_jid: entity jid
        @param profile_key: %(doc_profile_key)s
        @return: the last resource, or None if it is not in cache
        """
        data = self.getEntityData(entity_jid.userhostJID(), [C.ENTITY_LAST_RESOURCE], profile_key)
        return data.get(C.ENTITY_LAST_RESOURCE)

    def _getPresenceStatuses(self, profile_key):
        """Same as getPresenceStatuses, with full jids (as strings) used for the keys"""
        ret = self.getPresenceStatuses(profile_key)
        return {entity.full(): data for entity, data in ret.iteritems()}

    def getPresenceStatuses(self, profile_key):
        """Get all the presence status of a profile

        @param profile_key: %(doc_profile_key)s
        @return: presence data: key=entity JID, value=presence data for this entity
        @raise exceptions.ProfileUnknownError: if the profile doesn't exist
        """
        profile = self.getProfileName(profile_key)
        if not profile:
            raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile'))
        entities_presence = {}
        for entity, entity_data in self._entities_cache[profile].iteritems():
            if "presence" in entity_data:
                entities_presence[entity] = entity_data["presence"]

        log.debug("Memory getPresenceStatus (%s)" % entities_presence)
        return entities_presence

    def setPresenceStatus(self, entity_jid, show, priority, statuses, profile_key):
        """Change the presence status of an entity

        @param entity_jid: jid.JID of the entity
        @param show: show status
        @param priority: priority
        @param statuses: dictionary of statuses
        @param profile_key: %(doc_profile_key)s
        @raise exceptions.ProfileUnknownError: if the profile doesn't exist
        """
        profile = self.getProfileName(profile_key)
        if not profile:
            raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile'))
        entity_data = self._getEntitiesData(entity_jid, profile)[entity_jid]
        resource = entity_jid.resource
        if resource:
            # the bare jid remembers which resource has been seen last
            self.updateEntityData(entity_jid.userhostJID(), C.ENTITY_LAST_RESOURCE, resource, profile)
        entity_data.setdefault("presence", {})[resource or ''] = (show, priority, statuses)

    def _getEntitiesData(self, entity_jid, profile):
        """Get data dictionary for entities

        An empty cache entry is created for entity_jid if there is none yet.

        @param entity_jid: JID of the entity, or C.ENTITY_ALL for all entities)
        @param profile: %(doc_profile)s
        @return: entities_data (key=jid, value=entity_data)
        @raise: exceptions.ProfileNotInCacheError if profile is not in cache
        """
        if profile not in self._entities_cache:
            raise exceptions.ProfileNotInCacheError
        if entity_jid == C.ENTITY_ALL:
            entities_data = self._entities_cache[profile]
        else:
            entity_data = self._entities_cache[profile].setdefault(entity_jid, {})
            entities_data = {entity_jid: entity_data}
        return entities_data

    def _updateEntityResources(self, entity_jid, profile):
        """Add a known resource to bare entity_jid cache

        @param entity_jid: full entity_jid (with resource)
        @param profile: %(doc_profile)s
        """
        assert(entity_jid.resource)
        bare_jid = entity_jid.userhostJID()
        entity_data = self._getEntitiesData(bare_jid, profile)[bare_jid]
        resources = entity_data.setdefault('resources', set())
        resources.add(entity_jid.resource)

    def updateEntityData(self, entity_jid, key, value, profile_key):
        """Set a misc data for an entity

        The entityDataUpdated bridge signal is sent for string values only.

        @param entity_jid: JID of the entity, or C.ENTITY_ALL to update all entities)
        @param key: key to set (eg: "type")
        @param value: value for this key (eg: "chatroom")
        @param profile_key: %(doc_profile_key)s
        @raise exceptions.ProfileUnknownError: if the profile doesn't exist
        """
        profile = self.getProfileName(profile_key)
        if not profile:
            raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile'))
        entities_data = self._getEntitiesData(entity_jid, profile)
        # C.ENTITY_ALL must be tested first: it is presumably not a jid.JID,
        # so reading its "resource" attribute would fail
        if entity_jid != C.ENTITY_ALL and entity_jid.resource:
            self._updateEntityResources(entity_jid, profile)

        for jid_ in entities_data:
            entity_data = entities_data[jid_]
            # NOTE(review): when value is C.PROF_KEY_NONE and the key is not
            # set yet, the C.PROF_KEY_NONE value itself is stored; check if
            # this is intended
            if value == C.PROF_KEY_NONE and key in entity_data:
                del entity_data[key]
            else:
                entity_data[key] = value
            if isinstance(value, basestring):
                self.host.bridge.entityDataUpdated(jid_.full(), key, value, profile)

    def delEntityData(self, entity_jid, key, profile_key):
        """Delete data for an entity

        @param entity_jid: JID of the entity, or C.ENTITY_ALL to delete data from all entities)
        @param key: key to delete (eg: "type")
        @param profile_key: %(doc_profile_key)s
        @raise exceptions.ProfileUnknownError: if the profile doesn't exist
        """
        # the key is resolved to a profile name, like it is done in the
        # other methods managing entities data
        profile = self.getProfileName(profile_key)
        if not profile:
            raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile'))
        entities_data = self._getEntitiesData(entity_jid, profile)
        for jid_ in entities_data:
            entity_data = entities_data[jid_]
            try:
                del entity_data[key]
            except KeyError:
                log.debug("Key [%s] doesn't exist for [%s] in entities_cache" % (key, jid_.full()))

    def getEntityData(self, entity_jid, keys_list, profile_key):
        """Get a list of cached values for entity

        @param entity_jid: JID of the entity
        @param keys_list (iterable): list of keys to get, empty list for everything
        @param profile_key: %(doc_profile_key)s
        @return: dict with values for each key in keys_list.
            If there is no value for a given key, the resulting dict will not
            have this key either.
            If keys_list is empty, the cached dict itself is returned (not a copy).
        @raise exceptions.ProfileUnknownError: if the profile doesn't exist
        @raise exceptions.ProfileNotInCacheError: if the profile is not in cache
        """
        profile = self.getProfileName(profile_key)
        if not profile:
            raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile'))
        entity_data = self._getEntitiesData(entity_jid, profile)[entity_jid]
        if not keys_list:
            return entity_data
        return {key: entity_data[key] for key in keys_list if key in entity_data}

    def getEntityDatum(self, entity_jid, key, profile_key):
        """Get a datum from entity

        @param entity_jid: JID of the entity
        @param key: key to get
        @param profile_key: %(doc_profile_key)s
        @return: requested value
        @raise exceptions.ProfileUnknownError: if the profile doesn't exist
        @raise exceptions.ProfileNotInCacheError: if the profile is not in cache
        @raise: KeyError if there is no value for this key and this entity
        """
        return self.getEntityData(entity_jid, (key,), profile_key)[key]

    def delEntityCache(self, entity_jid, delete_all_resources=True, profile_key=C.PROF_KEY_NONE):
        """Remove cached data for entity

        @param entity_jid: JID of the entity to delete
        @param delete_all_resources: if True also delete all known resources from cache
        @param profile_key: %(doc_profile_key)s
        @raise exceptions.ProfileUnknownError: if the profile doesn't exist
        @raise ValueError: if delete_all_resources is set with a full jid
        """
        profile = self.getProfileName(profile_key)
        if not profile:
            raise exceptions.ProfileUnknownError(_('Trying to get entity data for a non-existant profile'))
        to_delete = set([entity_jid])

        if delete_all_resources:
            if entity_jid.resource:
                raise ValueError(_("Need a bare jid to delete all resources"))
            entity_data = self._getEntitiesData(entity_jid, profile)[entity_jid]
            resources = entity_data.setdefault('resources', set())
            to_delete.update([jid.JID("%s/%s" % (entity_jid.userhost(), resource)) for resource in resources])

        for entity in to_delete:
            try:
                del self._entities_cache[profile][entity]
            except KeyError:
                log.debug("Can't delete entity [%s]: not in cache" % entity.full())

    def encryptPersonalData(self, data_key, data_value, crypto_key, profile):
        """Re-encrypt a personal data (saved to a PersistentDict).

        @param data_key: key for the individual PersistentDict instance
        @param data_value: the value to be encrypted
        @param crypto_key: the key to encrypt the value
        @param profile: %(doc_profile)s
        @return: a deferred None value
        """

        def gotIndMemory(data):
            """Encrypt the value, then save it in the loaded PersistentDict"""
            d = BlockCipher.encrypt(crypto_key, data_value)

            def cb(cipher):
                data[data_key] = cipher
                return data.force(data_key)

            return d.addCallback(cb)

        def done(dummy):
            log.debug(_('Personal data (%(ns)s, %(key)s) has been successfuly encrypted') %
                      {'ns': C.MEMORY_CRYPTO_NAMESPACE, 'key': data_key})

        d = PersistentDict(C.MEMORY_CRYPTO_NAMESPACE, profile).load()
        return d.addCallback(gotIndMemory).addCallback(done)

    def addWaitingSub(self, type_, entity_jid, profile_key):
        """Called when a subscription request is received"""
        profile = self.getProfileName(profile_key)
        assert profile
        self.subscriptions.setdefault(profile, {})[entity_jid] = type_

    def delWaitingSub(self, entity_jid, profile_key):
        """Called when a subscription request is finished"""
        profile = self.getProfileName(profile_key)
        assert profile
        if profile in self.subscriptions and entity_jid in self.subscriptions[profile]:
            del self.subscriptions[profile][entity_jid]

    def getWaitingSub(self, profile_key):
        """Called to get a list of currently waiting subscription requests

        @param profile_key: %(doc_profile_key)s
        @return: dict of waiting subscriptions (key=entity jid, value=type),
            empty if the profile is unknown or has no waiting subscription
        """
        profile = self.getProfileName(profile_key)
        if not profile:
            log.error(_('Asking waiting subscriptions for a non-existant profile'))
            return {}
        if profile not in self.subscriptions:
            return {}

        return self.subscriptions[profile]

    # The following methods only forward the call to the Params instance

    def getStringParamA(self, name, category, attr="value", profile_key=C.PROF_KEY_NONE):
        return self.params.getStringParamA(name, category, attr, profile_key)

    def getParamA(self, name, category, attr="value", profile_key=C.PROF_KEY_NONE):
        return self.params.getParamA(name, category, attr, profile_key)

    def asyncGetParamA(self, name, category, attr="value", security_limit=C.NO_SECURITY_LIMIT, profile_key=C.PROF_KEY_NONE):
        return self.params.asyncGetParamA(name, category, attr, security_limit, profile_key)

    def asyncGetStringParamA(self, name, category, attr="value", security_limit=C.NO_SECURITY_LIMIT, profile_key=C.PROF_KEY_NONE):
        return self.params.asyncGetStringParamA(name, category, attr, security_limit, profile_key)

    def getParamsUI(self, security_limit=C.NO_SECURITY_LIMIT, app='', profile_key=C.PROF_KEY_NONE):
        return self.params.getParamsUI(security_limit, app, profile_key)

    def getParams(self, security_limit=C.NO_SECURITY_LIMIT, app='', profile_key=C.PROF_KEY_NONE):
        return self.params.getParams(security_limit, app, profile_key)

    def getParamsForCategory(self, category, security_limit=C.NO_SECURITY_LIMIT, app='', profile_key=C.PROF_KEY_NONE):
        return self.params.getParamsForCategory(category, security_limit, app, profile_key)

    def getParamsCategories(self):
        return self.params.getParamsCategories()

    def setParam(self, name, value, category, security_limit=C.NO_SECURITY_LIMIT, profile_key=C.PROF_KEY_NONE):
        return self.params.setParam(name, value, category, security_limit, profile_key)

    def updateParams(self, xml):
        return self.params.updateParams(xml)

    def paramsRegisterApp(self, xml, security_limit=C.NO_SECURITY_LIMIT, app=''):
        return self.params.paramsRegisterApp(xml, security_limit, app)

    def setDefault(self, name, category, callback, errback=None):
        return self.params.setDefault(name, category, callback, errback)