# HG changeset patch # User Goffi # Date 1622726447 -7200 # Node ID f9a5b810f14d5249988ee1c62ee356acf8ec8a1d # Parent 0985c47ffd96f0c1e93d5854184b2ad11a3e0544 core (memory/storage): backend storage is now based on SQLAlchemy diff -r 0985c47ffd96 -r f9a5b810f14d sat/core/core_types.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/core/core_types.py Thu Jun 03 15:20:47 2021 +0200 @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 + +# Libervia types +# Copyright (C) 2011 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +class SatXMPPEntity: + pass diff -r 0985c47ffd96 -r f9a5b810f14d sat/core/sat_main.py --- a/sat/core/sat_main.py Thu Jun 03 15:15:11 2021 +0200 +++ b/sat/core/sat_main.py Thu Jun 03 15:20:47 2021 +0200 @@ -58,6 +58,7 @@ log = getLogger(__name__) class SAT(service.Service): + def _init(self): # we don't use __init__ to avoid doule initialisation with twistd # this _init is called in startService @@ -175,8 +176,7 @@ self.bridge.register_method("imageResize", self._imageResize) self.bridge.register_method("imageGeneratePreview", self._imageGeneratePreview) self.bridge.register_method("imageConvert", self._imageConvert) - - self.memory.initialized.addCallback(lambda __: defer.ensureDeferred(self._postMemoryInit())) + defer.ensureDeferred(self._postInit()) @property def version(self): @@ -206,8 +206,8 @@ def bridge_name(self): return os.path.splitext(os.path.basename(self.bridge.__file__))[0] - async def _postMemoryInit(self): - """Method called after memory initialization is done""" + async def _postInit(self): + await self.memory.initialise() self.common_cache = cache.Cache(self, None) log.info(_("Memory initialised")) try: diff -r 0985c47ffd96 -r f9a5b810f14d sat/core/xmpp.py --- a/sat/core/xmpp.py Thu Jun 03 15:15:11 2021 +0200 +++ b/sat/core/xmpp.py Thu Jun 03 15:20:47 2021 +0200 @@ -42,6 +42,7 @@ from wokkel import delay from sat.core.log import getLogger from sat.core import exceptions +from sat.core import core_types from sat.memory import encryption from sat.memory import persistent from sat.tools import xml_tools @@ -83,7 +84,7 @@ return partial(getattr(self.plugin, attr), self.client) -class SatXMPPEntity: +class SatXMPPEntity(core_types.SatXMPPEntity): """Common code for Client and Component""" # profile is added there when startConnection begins and removed when it is finished profiles_connecting = set() diff -r 0985c47ffd96 -r f9a5b810f14d sat/memory/memory.py --- a/sat/memory/memory.py Thu Jun 03 15:15:11 2021 +0200 +++ b/sat/memory/memory.py Thu Jun 03 15:20:47 2021 +0200 @@ -33,7 +33,7 @@ from sat.core.log import getLogger from sat.core import exceptions from sat.core.constants import Const as C -from sat.memory.sqlite import SqliteStorage +from sat.memory.sqla import Storage from sat.memory.persistent import PersistentDict from sat.memory.params import Params from sat.memory.disco import Discovery @@ -228,7 +228,6 @@ def __init__(self, host): log.info(_("Memory manager init")) - self.initialized = defer.Deferred() self.host = host self._entities_cache = {} # XXX: keep presence/last resource/other data in cache # /!\ an entity is not necessarily in roster @@ -240,19 +239,22 @@ self.disco = Discovery(host) self.config = tools_config.parseMainConf(log_filenames=True) self._cache_path = Path(self.getConfig("", "local_dir"), C.CACHE_DIR) + + async def initialise(self): database_file = os.path.expanduser( os.path.join(self.getConfig("", "local_dir"), C.SAVEFILE_DATABASE) ) - self.storage = SqliteStorage(database_file, host.version) + self.storage = Storage(database_file, self.host.version) + await self.storage.initialise() PersistentDict.storage = self.storage - self.params = Params(host, self.storage) + self.params = Params(self.host, self.storage) log.info(_("Loading default params template")) self.params.load_default_params() - d = self.storage.initialized.addCallback(lambda ignore: self.load()) + await self.load() self.memory_data = PersistentDict("memory") - d.addCallback(lambda ignore: self.memory_data.load()) - d.addCallback(lambda ignore: self.disco.load()) - d.chainDeferred(self.initialized) + await self.memory_data.load() + await self.disco.load() + ## Configuration ## diff -r 0985c47ffd96 -r f9a5b810f14d sat/memory/sqla.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/memory/sqla.py Thu Jun 03 15:20:47 2021 +0200 @@ -0,0 +1,881 @@ +#!/usr/bin/env python3 + +# Libervia: an XMPP client +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import time +from typing import Dict, List, Tuple, Iterable, Any, Callable, Optional +from urllib.parse import quote +from pathlib import Path +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +from sqlalchemy.exc import IntegrityError, NoResultFound +from sqlalchemy.orm import sessionmaker, subqueryload, contains_eager +from sqlalchemy.future import select +from sqlalchemy.engine import Engine +from sqlalchemy import update, delete, and_, or_, event +from sqlalchemy.sql.functions import coalesce, sum as sum_ +from sqlalchemy.dialects.sqlite import insert +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from sat.core.i18n import _ +from sat.core import exceptions +from sat.core.log import getLogger +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.tools.utils import aio +from sat.memory.sqla_mapping import ( + NOT_IN_EXTRA, + Base, + Profile, + Component, + History, + Message, + Subject, + Thread, + ParamGen, + ParamInd, + PrivateGen, + PrivateInd, + PrivateGenBin, + PrivateIndBin, + File +) + + +log = getLogger(__name__) + + +@event.listens_for(Engine, "connect") +def set_sqlite_pragma(dbapi_connection, connection_record): + cursor = dbapi_connection.cursor() + cursor.execute("PRAGMA foreign_keys=ON") + cursor.close() + + +class Storage: + + def __init__(self, db_filename, sat_version): + self.initialized = defer.Deferred() + self.filename = Path(db_filename) + # we keep cache for the profiles (key: profile name, value: profile id) + # profile id to name + self.profiles: Dict[int, str] = {} + # profile id to component entry point + self.components: Dict[int, str] = {} + + @aio + async def initialise(self): + log.info(_("Connecting database")) + engine = create_async_engine( + f"sqlite+aiosqlite:///{quote(str(self.filename))}", + future=True + ) + self.session = sessionmaker( + engine, expire_on_commit=False, class_=AsyncSession + ) + new_base = not self.filename.exists() + if new_base: + log.info(_("The database is new, creating the tables")) + # the dir may not exist if it's not the XDG recommended one + self.filename.parent.mkdir(0o700, True, True) + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with self.session() as session: + result = await session.execute(select(Profile)) + for p in result.scalars(): + self.profiles[p.name] = p.id + result = await session.execute(select(Component)) + for c in result.scalars(): + self.components[c.profile_id] = c.entry_point + + self.initialized.callback(None) + + ## Profiles + + def getProfilesList(self) -> List[str]: + """"Return list of all registered profiles""" + return list(self.profiles.keys()) + + def hasProfile(self, profile_name: str) -> bool: + """return True if profile_name exists + + @param profile_name: name of the profile to check + """ + return profile_name in self.profiles + + def profileIsComponent(self, profile_name: str) -> bool: + try: + return self.profiles[profile_name] in self.components + except KeyError: + raise exceptions.NotFound("the requested profile doesn't exists") + + def getEntryPoint(self, profile_name: str) -> str: + try: + return self.components[self.profiles[profile_name]] + except KeyError: + raise exceptions.NotFound("the requested profile doesn't exists or is not a component") + + @aio + async def createProfile(self, name: str, component_ep: Optional[str] = None) -> None: + """Create a new profile + + @param name: name of the profile + @param component: if not None, must point to a component entry point + """ + async with self.session() as session: + profile = Profile(name=name) + async with session.begin(): + session.add(profile) + self.profiles[profile.id] = profile.name + if component_ep is not None: + async with session.begin(): + component = Component(profile=profile, entry_point=component_ep) + session.add(component) + self.components[profile.id] = component_ep + return profile + + @aio + async def deleteProfile(self, name: str) -> None: + """Delete profile + + @param name: name of the profile + """ + async with self.session() as session: + result = await session.execute(select(Profile).where(Profile.name == name)) + profile = result.scalar() + await session.delete(profile) + await session.commit() + del self.profiles[profile.id] + if profile.id in self.components: + del self.components[profile.id] + log.info(_("Profile {name!r} deleted").format(name = name)) + + ## Params + + @aio + async def loadGenParams(self, params_gen: dict) -> None: + """Load general parameters + + @param params_gen: dictionary to fill + """ + log.debug(_("loading general parameters from database")) + async with self.session() as session: + result = await session.execute(select(ParamGen)) + for p in result.scalars(): + params_gen[(p.category, p.name)] = p.value + + @aio + async def loadIndParams(self, params_ind: dict, profile: str) -> None: + """Load individual parameters + + @param params_ind: dictionary to fill + @param profile: a profile which *must* exist + """ + log.debug(_("loading individual parameters from database")) + async with self.session() as session: + result = await session.execute( + select(ParamInd).where(ParamInd.profile_id == self.profiles[profile]) + ) + for p in result.scalars(): + params_ind[(p.category, p.name)] = p.value + + @aio + async def getIndParam(self, category: str, name: str, profile: str) -> Optional[str]: + """Ask database for the value of one specific individual parameter + + @param category: category of the parameter + @param name: name of the parameter + @param profile: %(doc_profile)s + """ + async with self.session() as session: + result = await session.execute( + select(ParamInd.value) + .filter_by( + category=category, + name=name, + profile_id=self.profiles[profile] + ) + ) + return result.scalar_one_or_none() + + @aio + async def getIndParamValues(self, category: str, name: str) -> Dict[str, str]: + """Ask database for the individual values of a parameter for all profiles + + @param category: category of the parameter + @param name: name of the parameter + @return dict: profile => value map + """ + async with self.session() as session: + result = await session.execute( + select(ParamInd) + .filter_by( + category=category, + name=name + ) + .options(subqueryload(ParamInd.profile)) + ) + return {param.profile.name: param.value for param in result.scalars()} + + @aio + async def setGenParam(self, category: str, name: str, value: Optional[str]) -> None: + """Save the general parameters in database + + @param category: category of the parameter + @param name: name of the parameter + @param value: value to set + """ + async with self.session() as session: + stmt = insert(ParamGen).values( + category=category, + name=name, + value=value + ).on_conflict_do_update( + index_elements=(ParamGen.category, ParamGen.name), + set_={ + ParamGen.value: value + } + ) + await session.execute(stmt) + await session.commit() + + @aio + async def setIndParam( + self, + category:str, + name: str, + value: Optional[str], + profile: str + ) -> None: + """Save the individual parameters in database + + @param category: category of the parameter + @param name: name of the parameter + @param value: value to set + @param profile: a profile which *must* exist + """ + async with self.session() as session: + stmt = insert(ParamInd).values( + category=category, + name=name, + profile_id=self.profiles[profile], + value=value + ).on_conflict_do_update( + index_elements=(ParamInd.category, ParamInd.name, ParamInd.profile_id), + set_={ + ParamInd.value: value + } + ) + await session.execute(stmt) + await session.commit() + + def _jid_filter(self, jid_: jid.JID, dest: bool = False): + """Generate condition to filter on a JID, using relevant columns + + @param dest: True if it's the destinee JID, otherwise it's the source one + @param jid_: JID to filter by + """ + if jid_.resource: + if dest: + return and_( + History.dest == jid_.userhost(), + History.dest_res == jid_.resource + ) + else: + return and_( + History.source == jid_.userhost(), + History.source_res == jid_.resource + ) + else: + if dest: + return History.dest == jid_.userhost() + else: + return History.source == jid_.userhost() + + @aio + async def historyGet( + self, + from_jid: Optional[jid.JID], + to_jid: Optional[jid.JID], + limit: Optional[int] = None, + between: bool = True, + filters: Optional[Dict[str, str]] = None, + profile: Optional[str] = None, + ) -> List[Tuple[ + str, int, str, str, Dict[str, str], Dict[str, str], str, str, str] + ]: + """Retrieve messages in history + + @param from_jid: source JID (full, or bare for catchall) + @param to_jid: dest JID (full, or bare for catchall) + @param limit: maximum number of messages to get: + - 0 for no message (returns the empty list) + - None for unlimited + @param between: confound source and dest (ignore the direction) + @param filters: pattern to filter the history results + @return: list of messages as in [messageNew], minus the profile which is already + known. + """ + # we have to set a default value to profile because it's last argument + # and thus follow other keyword arguments with default values + # but None should not be used for it + assert profile is not None + if limit == 0: + return [] + if filters is None: + filters = {} + + stmt = ( + select(History) + .filter_by( + profile_id=self.profiles[profile] + ) + .outerjoin(History.messages) + .outerjoin(History.subjects) + .outerjoin(History.thread) + .options( + contains_eager(History.messages), + contains_eager(History.subjects), + contains_eager(History.thread), + ) + .order_by( + # timestamp may be identical for 2 close messages (specially when delay is + # used) that's why we order ties by received_timestamp. We'll reverse the + # order when returning the result. We use DESC here so LIMIT keep the last + # messages + History.timestamp.desc(), + History.received_timestamp.desc() + ) + ) + + + if not from_jid and not to_jid: + # no jid specified, we want all one2one communications + pass + elif between: + if not from_jid or not to_jid: + # we only have one jid specified, we check all messages + # from or to this jid + jid_ = from_jid or to_jid + stmt = stmt.where( + or_( + self._jid_filter(jid_), + self._jid_filter(jid_, dest=True) + ) + ) + else: + # we have 2 jids specified, we check all communications between + # those 2 jids + stmt = stmt.where( + or_( + and_( + self._jid_filter(from_jid), + self._jid_filter(to_jid, dest=True), + ), + and_( + self._jid_filter(to_jid), + self._jid_filter(from_jid, dest=True), + ) + ) + ) + else: + # we want one communication in specific direction (from somebody or + # to somebody). + if from_jid is not None: + stmt = stmt.where(self._jid_filter(from_jid)) + if to_jid is not None: + stmt = stmt.where(self._jid_filter(to_jid, dest=True)) + + if filters: + if 'timestamp_start' in filters: + stmt = stmt.where(History.timestamp >= float(filters['timestamp_start'])) + if 'before_uid' in filters: + # orignially this query was using SQLITE's rowid. This has been changed + # to use coalesce(received_timestamp, timestamp) to be SQL engine independant + stmt = stmt.where( + coalesce( + History.received_timestamp, + History.timestamp + ) < ( + select(coalesce(History.received_timestamp, History.timestamp)) + .filter_by(uid=filters["before_uid"]) + ).scalar_subquery() + ) + if 'body' in filters: + # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html + stmt = stmt.where(Message.message.like(f"%{filters['body']}%")) + if 'search' in filters: + search_term = f"%{filters['search']}%" + stmt = stmt.where(or_( + Message.message.like(search_term), + History.source_res.like(search_term) + )) + if 'types' in filters: + types = filters['types'].split() + stmt = stmt.where(History.type.in_(types)) + if 'not_types' in filters: + types = filters['not_types'].split() + stmt = stmt.where(History.type.not_in(types)) + if 'last_stanza_id' in filters: + # this request get the last message with a "stanza_id" that we + # have in history. This is mainly used to retrieve messages sent + # while we were offline, using MAM (XEP-0313). + if (filters['last_stanza_id'] is not True + or limit != 1): + raise ValueError("Unexpected values for last_stanza_id filter") + stmt = stmt.where(History.stanza_id.is_not(None)) + + if limit is not None: + stmt = stmt.limit(limit) + + async with self.session() as session: + result = await session.execute(stmt) + + result = result.scalars().unique().all() + result.reverse() + return [h.as_tuple() for h in result] + + @aio + async def addToHistory(self, data: dict, profile: str) -> None: + """Store a new message in history + + @param data: message data as build by SatMessageProtocol.onMessage + """ + extra = {k: v for k, v in data["extra"].items() if k not in NOT_IN_EXTRA} + messages = [Message(message=mess, language=lang) + for lang, mess in data["message"].items()] + subjects = [Subject(subject=mess, language=lang) + for lang, mess in data["subject"].items()] + if "thread" in data["extra"]: + thread = Thread(thread_id=data["extra"]["thread"], + parent_id=data["extra"].get["thread_parent"]) + else: + thread = None + try: + async with self.session() as session: + async with session.begin(): + session.add(History( + uid=data["uid"], + stanza_id=data["extra"].get("stanza_id"), + update_uid=data["extra"].get("update_uid"), + profile_id=self.profiles[profile], + source_jid=data["from"], + dest_jid=data["to"], + timestamp=data["timestamp"], + received_timestamp=data.get("received_timestamp"), + type=data["type"], + extra=extra, + messages=messages, + subjects=subjects, + thread=thread, + )) + except IntegrityError as e: + if "unique" in str(e.orig).lower(): + log.debug( + f"message {data['uid']!r} is already in history, not storing it again" + ) + else: + log.error(f"Can't store message {data['uid']!r} in history: {e}") + except Exception as e: + log.critical( + f"Can't store message, unexpected exception (uid: {data['uid']}): {e}" + ) + + ## Private values + + def _getPrivateClass(self, binary, profile): + """Get ORM class to use for private values""" + if profile is None: + return PrivateGenBin if binary else PrivateGen + else: + return PrivateIndBin if binary else PrivateInd + + + @aio + async def getPrivates( + self, + namespace:str, + keys: Optional[Iterable[str]] = None, + binary: bool = False, + profile: Optional[str] = None + ) -> Dict[str, Any]: + """Get private value(s) from databases + + @param namespace: namespace of the values + @param keys: keys of the values to get None to get all keys/values + @param binary: True to deserialise binary values + @param profile: profile to use for individual values + None to use general values + @return: gotten keys/values + """ + if keys is not None: + keys = list(keys) + log.debug( + f"getting {'general' if profile is None else 'individual'}" + f"{' binary' if binary else ''} private values from database for namespace " + f"{namespace}{f' with keys {keys!r}' if keys is not None else ''}" + ) + cls = self._getPrivateClass(binary, profile) + stmt = select(cls).filter_by(namespace=namespace) + if keys: + stmt = stmt.where(cls.key.in_(list(keys))) + if profile is not None: + stmt = stmt.filter_by(profile_id=self.profiles[profile]) + async with self.session() as session: + result = await session.execute(stmt) + return {p.key: p.value for p in result.scalars()} + + @aio + async def setPrivateValue( + self, + namespace: str, + key:str, + value: Any, + binary: bool = False, + profile: Optional[str] = None + ) -> None: + """Set a private value in database + + @param namespace: namespace of the values + @param key: key of the value to set + @param value: value to set + @param binary: True if it's a binary values + binary values need to be serialised, used for everything but strings + @param profile: profile to use for individual value + if None, it's a general value + """ + cls = self._getPrivateClass(binary, profile) + + values = { + "namespace": namespace, + "key": key, + "value": value + } + index_elements = [cls.namespace, cls.key] + + if profile is not None: + values["profile_id"] = self.profiles[profile] + index_elements.append(cls.profile_id) + + async with self.session() as session: + await session.execute( + insert(cls).values(**values).on_conflict_do_update( + index_elements=index_elements, + set_={ + cls.value: value + } + ) + ) + await session.commit() + + @aio + async def delPrivateValue( + self, + namespace: str, + key: str, + binary: bool = False, + profile: Optional[str] = None + ) -> None: + """Delete private value from database + + @param category: category of the privateeter + @param key: key of the private value + @param binary: True if it's a binary values + @param profile: profile to use for individual value + if None, it's a general value + """ + cls = self._getPrivateClass(binary, profile) + + stmt = delete(cls).filter_by(namespace=namespace, key=key) + + if profile is not None: + stmt = stmt.filter_by(profile_id=self.profiles[profile]) + + async with self.session() as session: + await session.execute(stmt) + await session.commit() + + @aio + async def delPrivateNamespace( + self, + namespace: str, + binary: bool = False, + profile: Optional[str] = None + ) -> None: + """Delete all data from a private namespace + + Be really cautious when you use this method, as all data with given namespace are + removed. + Params are the same as for delPrivateValue + """ + cls = self._getPrivateClass(binary, profile) + + stmt = delete(cls).filter_by(namespace=namespace) + + if profile is not None: + stmt = stmt.filter_by(profile_id=self.profiles[profile]) + + async with self.session() as session: + await session.execute(stmt) + await session.commit() + + ## Files + + @aio + async def getFiles( + self, + client: Optional[SatXMPPEntity], + file_id: Optional[str] = None, + version: Optional[str] = '', + parent: Optional[str] = None, + type_: Optional[str] = None, + file_hash: Optional[str] = None, + hash_algo: Optional[str] = None, + name: Optional[str] = None, + namespace: Optional[str] = None, + mime_type: Optional[str] = None, + public_id: Optional[str] = None, + owner: Optional[jid.JID] = None, + access: Optional[dict] = None, + projection: Optional[List[str]] = None, + unique: bool = False + ) -> List[dict]: + """Retrieve files with with given filters + + @param file_id: id of the file + None to ignore + @param version: version of the file + None to ignore + empty string to look for current version + @param parent: id of the directory containing the files + None to ignore + empty string to look for root files/directories + @param projection: name of columns to retrieve + None to retrieve all + @param unique: if True will remove duplicates + other params are the same as for [setFile] + @return: files corresponding to filters + """ + if projection is None: + projection = [ + 'id', 'version', 'parent', 'type', 'file_hash', 'hash_algo', 'name', + 'size', 'namespace', 'media_type', 'media_subtype', 'public_id', + 'created', 'modified', 'owner', 'access', 'extra' + ] + + stmt = select(*[getattr(File, f) for f in projection]) + + if unique: + stmt = stmt.distinct() + + if client is not None: + stmt = stmt.filter_by(profile_id=self.profiles[client.profile]) + else: + if public_id is None: + raise exceptions.InternalError( + "client can only be omitted when public_id is set" + ) + if file_id is not None: + stmt = stmt.filter_by(id=file_id) + if version is not None: + stmt = stmt.filter_by(version=version) + if parent is not None: + stmt = stmt.filter_by(parent=parent) + if type_ is not None: + stmt = stmt.filter_by(type=type_) + if file_hash is not None: + stmt = stmt.filter_by(file_hash=file_hash) + if hash_algo is not None: + stmt = stmt.filter_by(hash_algo=hash_algo) + if name is not None: + stmt = stmt.filter_by(name=name) + if namespace is not None: + stmt = stmt.filter_by(namespace=namespace) + if mime_type is not None: + if '/' in mime_type: + media_type, media_subtype = mime_type.split("/", 1) + stmt = stmt.filter_by(media_type=media_type, media_subtype=media_subtype) + else: + stmt = stmt.filter_by(media_type=mime_type) + if public_id is not None: + stmt = stmt.filter_by(public_id=public_id) + if owner is not None: + stmt = stmt.filter_by(owner=owner) + if access is not None: + raise NotImplementedError('Access check is not implemented yet') + # a JSON comparison is needed here + + async with self.session() as session: + result = await session.execute(stmt) + + return [dict(r) for r in result] + + @aio + async def setFile( + self, + client: SatXMPPEntity, + name: str, + file_id: str, + version: str = "", + parent: str = "", + type_: str = C.FILE_TYPE_FILE, + file_hash: Optional[str] = None, + hash_algo: Optional[str] = None, + size: int = None, + namespace: Optional[str] = None, + mime_type: Optional[str] = None, + public_id: Optional[str] = None, + created: Optional[float] = None, + modified: Optional[float] = None, + owner: Optional[jid.JID] = None, + access: Optional[dict] = None, + extra: Optional[dict] = None + ) -> None: + """Set a file metadata + + @param client: client owning the file + @param name: name of the file (must not contain "/") + @param file_id: unique id of the file + @param version: version of this file + @param parent: id of the directory containing this file + Empty string if it is a root file/directory + @param type_: one of: + - file + - directory + @param file_hash: unique hash of the payload + @param hash_algo: algorithm used for hashing the file (usually sha-256) + @param size: size in bytes + @param namespace: identifier (human readable is better) to group files + for instance, namespace could be used to group files in a specific photo album + @param mime_type: media type of the file, or None if not known/guessed + @param public_id: ID used to server the file publicly via HTTP + @param created: UNIX time of creation + @param modified: UNIX time of last modification, or None to use created date + @param owner: jid of the owner of the file (mainly useful for component) + @param access: serialisable dictionary with access rules. See [memory.memory] for details + @param extra: serialisable dictionary of any extra data + will be encoded to json in database + """ + if mime_type is None: + media_type = media_subtype = None + elif '/' in mime_type: + media_type, media_subtype = mime_type.split('/', 1) + else: + media_type, media_subtype = mime_type, None + + async with self.session() as session: + async with session.begin(): + session.add(File( + id=file_id, + version=version.strip(), + parent=parent, + type=type_, + file_hash=file_hash, + hash_algo=hash_algo, + name=name, + size=size, + namespace=namespace, + media_type=media_type, + media_subtype=media_subtype, + public_id=public_id, + created=time.time() if created is None else created, + modified=modified, + owner=owner, + access=access, + extra=extra, + profile_id=self.profiles[client.profile] + )) + + @aio + async def fileGetUsedSpace(self, client: SatXMPPEntity, owner: jid.JID) -> int: + async with self.session() as session: + result = await session.execute( + select(sum_(File.size)).filter_by( + owner=owner, + type=C.FILE_TYPE_FILE, + profile_id=self.profiles[client.profile] + )) + return result.scalar_one_or_none() or 0 + + @aio + async def fileDelete(self, file_id: str) -> None: + """Delete file metadata from the database + + @param file_id: id of the file to delete + NOTE: file itself must still be removed, this method only handle metadata in + database + """ + async with self.session() as session: + await session.execute(delete(File).filter_by(id=file_id)) + await session.commit() + + @aio + async def fileUpdate( + self, + file_id: str, + column: str, + update_cb: Callable[[dict], None] + ) -> None: + """Update a column value using a method to avoid race conditions + + the older value will be retrieved from database, then update_cb will be applied to + update it, and file will be updated checking that older value has not been changed + meanwhile by an other user. If it has changed, it tries again a couple of times + before failing + @param column: column name (only "access" or "extra" are allowed) + @param update_cb: method to update the value of the colum + the method will take older value as argument, and must update it in place + update_cb must not care about serialization, + it get the deserialized data (i.e. a Python object) directly + @raise exceptions.NotFound: there is not file with this id + """ + if column not in ('access', 'extra'): + raise exceptions.InternalError('bad column name') + orm_col = getattr(File, column) + + for i in range(5): + async with self.session() as session: + try: + value = (await session.execute( + select(orm_col).filter_by(id=file_id) + )).scalar_one() + except NoResultFound: + raise exceptions.NotFound + update_cb(value) + stmt = update(orm_col).filter_by(id=file_id) + if not value: + # because JsonDefaultDict convert NULL to an empty dict, we have to + # test both for empty dict and None when we have and empty dict + stmt = stmt.where((orm_col == None) | (orm_col == value)) + else: + stmt = stmt.where(orm_col == value) + result = await session.execute(stmt) + await session.commit() + + if result.rowcount == 1: + break + + log.warning( + _("table not updated, probably due to race condition, trying again " + "({tries})").format(tries=i+1) + ) + + else: + raise exceptions.DatabaseError( + _("Can't update file {file_id} due to race condition") + .format(file_id=file_id) + ) diff -r 0985c47ffd96 -r f9a5b810f14d sat/memory/sqla_mapping.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/memory/sqla_mapping.py Thu Jun 03 15:20:47 2021 +0200 @@ -0,0 +1,395 @@ +#!/usr/bin/env python3 + +# Libervia: an XMPP client +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import pickle +import json +from sqlalchemy import ( + Column, Integer, Text, Float, Enum, ForeignKey, UniqueConstraint, Index, +) + +from sqlalchemy.orm import declarative_base, relationship +from sqlalchemy.types import TypeDecorator +from twisted.words.protocols.jabber import jid +from datetime import datetime + + +Base = declarative_base() +# keys which are in message data extra but not stored in extra field this is +# because those values are stored in separate fields +NOT_IN_EXTRA = ('stanza_id', 'received_timestamp', 'update_uid') + + +class LegacyPickle(TypeDecorator): + """Handle troubles with data pickled by former version of SàT + + This type is temporary until we do migration to a proper data type + """ + # Blob is used on SQLite but gives errors when used here, while Text works fine + impl = Text + cache_ok = True + + def process_bind_param(self, value, dialect): + if value is None: + return None + return pickle.dumps(value, 0) + + def process_result_value(self, value, dialect): + if value is None: + return None + # value types are inconsistent (probably a consequence of Python 2/3 port + # and/or SQLite dynamic typing) + try: + value = value.encode() + except AttributeError: + pass + # "utf-8" encoding is needed to handle Python 2 pickled data + return pickle.loads(value, encoding="utf-8") + + +class Json(TypeDecorator): + """Handle JSON field in DB independant way""" + # Blob is used on SQLite but gives errors when used here, while Text works fine + impl = Text + cache_ok = True + + def process_bind_param(self, value, dialect): + if value is None: + return None + return json.dumps(value) + + def process_result_value(self, value, dialect): + if value is None: + return None + return json.loads(value) + + +class JsonDefaultDict(Json): + """Json type which convert NULL to empty dict instead of None""" + + def process_result_value(self, value, dialect): + if value is None: + return {} + return json.loads(value) + + +class JID(TypeDecorator): + """Store twisted JID in text fields""" + impl = Text + cache_ok = True + + def process_bind_param(self, value, dialect): + if value is None: + return None + return value.full() + + def process_result_value(self, value, dialect): + if value is None: + return None + return jid.JID(value) + + +class Profile(Base): + __tablename__ = "profiles" + + id = Column(Integer, primary_key=True) + name = Column(Text, unique=True) + + params = relationship("ParamInd", back_populates="profile", passive_deletes=True) + private_data = relationship( + "PrivateInd", back_populates="profile", passive_deletes=True + ) + private_bin_data = relationship( + "PrivateIndBin", back_populates="profile", passive_deletes=True + ) + + +class Component(Base): + __tablename__ = "components" + + profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True) + entry_point = Column(Text, nullable=False) + profile = relationship("Profile") + + +class MessageType(Base): + __tablename__ = "message_types" + + type = Column(Text, primary_key=True) + + +class History(Base): + __tablename__ = "history" + __table_args__ = ( + UniqueConstraint("profile_id", "stanza_id", "source", "dest"), + Index("history__profile_id_timestamp", "profile_id", "timestamp"), + Index( + "history__profile_id_received_timestamp", "profile_id", "received_timestamp" + ) + ) + + uid = Column(Text, primary_key=True) + stanza_id = Column(Text) + update_uid = Column(Text) + profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE")) + source = Column(Text) + dest = Column(Text) + source_res = Column(Text) + dest_res = Column(Text) + timestamp = Column(Float, nullable=False) + received_timestamp = Column(Float) + type = Column(ForeignKey("message_types.type")) + extra = Column(LegacyPickle) + + profile = relationship("Profile") + message_type = relationship("MessageType") + messages = relationship("Message", backref="history", passive_deletes=True) + subjects = relationship("Subject", backref="history", passive_deletes=True) + thread = relationship( + "Thread", uselist=False, back_populates="history", passive_deletes=True + ) + + def __init__(self, *args, **kwargs): + source_jid = kwargs.pop("source_jid", None) + if source_jid is not None: + kwargs["source"] = source_jid.userhost() + kwargs["source_res"] = source_jid.resource + dest_jid = kwargs.pop("dest_jid", None) + if dest_jid is not None: + kwargs["dest"] = dest_jid.userhost() + kwargs["dest_res"] = dest_jid.resource + super().__init__(*args, **kwargs) + + @property + def source_jid(self) -> jid.JID: + return jid.JID(f"{self.source}/{self.source_res or ''}") + + @source_jid.setter + def source_jid(self, source_jid: jid.JID) -> None: + self.source = source_jid.userhost + self.source_res = source_jid.resource + + @property + def dest_jid(self): + return jid.JID(f"{self.dest}/{self.dest_res or ''}") + + @dest_jid.setter + def dest_jid(self, dest_jid: jid.JID) -> None: + self.dest = dest_jid.userhost + self.dest_res = dest_jid.resource + + def __repr__(self): + dt = datetime.fromtimestamp(self.timestamp) + return f"History<{self.source_jid.full()}->{self.dest_jid.full()} [{dt}]>" + + def serialise(self): + extra = self.extra + if self.stanza_id is not None: + extra["stanza_id"] = self.stanza_id + if self.update_uid is not None: + extra["update_uid"] = self.update_uid + if self.received_timestamp is not None: + extra["received_timestamp"] = self.received_timestamp + if self.thread is not None: + extra["thread"] = self.thread.thread_id + if self.thread.parent_id is not None: + extra["thread_parent"] = self.thread.parent_id + + + return { + "from": f"{self.source}/{self.source_res}" if self.source_res + else self.source, + "to": f"{self.dest}/{self.dest_res}" if self.dest_res else self.dest, + "uid": self.uid, + "message": {m.language or '': m.message for m in self.messages}, + "subject": {m.language or '': m.subject for m in self.subjects}, + "type": self.type, + "extra": extra, + "timestamp": self.timestamp, + } + + def as_tuple(self): + d = self.serialise() + return ( + d['uid'], d['timestamp'], d['from'], d['to'], d['message'], d['subject'], + d['type'], d['extra'] + ) + + @staticmethod + def debug_collection(history_collection): + for idx, history in enumerate(history_collection): + history.debug_msg(idx) + + def debug_msg(self, idx=None): + """Print messages""" + dt = datetime.fromtimestamp(self.timestamp) + if idx is not None: + dt = f"({idx}) {dt}" + parts = [] + parts.append(f"[{dt}]<{self.source_jid.full()}->{self.dest_jid.full()}> ") + for message in self.messages: + if message.language: + parts.append(f"[{message.language}] ") + parts.append(f"{message.message}\n") + print("".join(parts)) + + +class Message(Base): + __tablename__ = "message" + + id = Column(Integer, primary_key=True) + history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"), index=True) + message = Column(Text) + language = Column(Text) + + def __repr__(self): + lang_str = f"[{self.language}]" if self.language else "" + msg = f"{self.message[:20]}…" if len(self.message)>20 else self.message + content = f"{lang_str}{msg}" + return f"Message<{content}>" + + +class Subject(Base): + __tablename__ = "subject" + + id = Column(Integer, primary_key=True) + history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"), index=True) + subject = Column(Text) + language = Column(Text) + + def __repr__(self): + lang_str = f"[{self.language}]" if self.language else "" + msg = f"{self.subject[:20]}…" if len(self.subject)>20 else self.subject + content = f"{lang_str}{msg}" + return f"Subject<{content}>" + + +class Thread(Base): + __tablename__ = "thread" + + id = Column(Integer, primary_key=True) + history_uid = Column(ForeignKey("history.uid", ondelete="CASCADE"), index=True) + thread_id = Column(Text) + parent_id = Column(Text) + + history = relationship("History", uselist=False, back_populates="thread") + + def __repr__(self): + return f"Thread<{self.thread_id} [parent: {self.parent_id}]>" + + +class ParamGen(Base): + __tablename__ = "param_gen" + + category = Column(Text, primary_key=True, nullable=False) + name = Column(Text, primary_key=True, nullable=False) + value = Column(Text) + + +class ParamInd(Base): + __tablename__ = "param_ind" + + category = Column(Text, primary_key=True, nullable=False) + name = Column(Text, primary_key=True, nullable=False) + profile_id = Column( + ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True, nullable=False + ) + value = Column(Text) + + profile = relationship("Profile", back_populates="params") + + +class PrivateGen(Base): + __tablename__ = "private_gen" + + namespace = Column(Text, primary_key=True, nullable=False) + key = Column(Text, primary_key=True, nullable=False) + value = Column(Text) + + +class PrivateInd(Base): + __tablename__ = "private_ind" + + namespace = Column(Text, primary_key=True, nullable=False) + key = Column(Text, primary_key=True, nullable=False) + profile_id = Column( + ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True, nullable=False + ) + value = Column(Text) + + profile = relationship("Profile", back_populates="private_data") + + +class PrivateGenBin(Base): + __tablename__ = "private_gen_bin" + + namespace = Column(Text, primary_key=True, nullable=False) + key = Column(Text, primary_key=True, nullable=False) + value = Column(LegacyPickle) + + +class PrivateIndBin(Base): + __tablename__ = "private_ind_bin" + + namespace = Column(Text, primary_key=True, nullable=False) + key = Column(Text, primary_key=True, nullable=False) + profile_id = Column( + ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True, nullable=False + ) + value = Column(LegacyPickle) + + profile = relationship("Profile", back_populates="private_bin_data") + + +class File(Base): + __tablename__ = "files" + __table_args__ = ( + Index("files__profile_id_owner_parent", "profile_id", "owner", "parent"), + Index( + "files__profile_id_owner_media_type_media_subtype", + "profile_id", + "owner", + "media_type", + "media_subtype" + ) + ) + + id = Column(Text, primary_key=True, nullable=False) + public_id = Column(Text, unique=True) + version = Column(Text, primary_key=True, nullable=False) + parent = Column(Text, nullable=False) + type = Column( + Enum("file", "directory", create_constraint=True), + nullable=False, + server_default="file", + # name="file_type", + ) + file_hash = Column(Text) + hash_algo = Column(Text) + name = Column(Text, nullable=False) + size = Column(Integer) + namespace = Column(Text) + media_type = Column(Text) + media_subtype = Column(Text) + created = Column(Float, nullable=False) + modified = Column(Float) + owner = Column(JID) + access = Column(JsonDefaultDict) + extra = Column(JsonDefaultDict) + profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE")) + + profile = relationship("Profile") diff -r 0985c47ffd96 -r f9a5b810f14d sat/memory/sqlite.py --- a/sat/memory/sqlite.py Thu Jun 03 15:15:11 2021 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1766 +0,0 @@ -#!/usr/bin/env python3 - - -# SAT: a jabber client -# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -from sat.core.i18n import _ -from sat.core.constants import Const as C -from sat.core import exceptions -from sat.core.log import getLogger -from sat.memory.crypto import BlockCipher, PasswordHasher -from sat.tools.config import fixConfigOption -from twisted.enterprise import adbapi -from twisted.internet import defer -from twisted.words.protocols.jabber import jid -from twisted.python import failure -from collections import OrderedDict -import sys -import re -import os.path -import pickle as pickle -import hashlib -import sqlite3 -import json - -log = getLogger(__name__) - -CURRENT_DB_VERSION = 9 - -# XXX: DATABASE schemas are used in the following way: -# - 'current' key is for the actual database schema, for a new base -# - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update -# e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed -# a 'current' data dict can contains the keys: -# - 'CREATE': it contains an Ordered dict with table to create as keys, and a len 2 tuple as value, where value[0] are the columns definitions and value[1] are the table constraints -# - 'INSERT': it contains an Ordered dict with table where values have to be inserted, and many tuples containing values to insert in the order of the rows (#TODO: manage named columns) -# - 'INDEX': -# an update data dict (the ones with a number) can contains the keys 'create', 'delete', 'cols create', 'cols delete', 'cols modify', 'insert' or 'specific'. See Updater.generateUpdateData for more infos. This method can be used to autogenerate update_data, to ease the work of the developers. -# TODO: indexes need to be improved - -DATABASE_SCHEMAS = { - "current": {'CREATE': OrderedDict(( - ('profiles', (("id INTEGER PRIMARY KEY ASC", "name TEXT"), - ("UNIQUE (name)",))), - ('components', (("profile_id INTEGER PRIMARY KEY", "entry_point TEXT NOT NULL"), - ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE",))), - ('message_types', (("type TEXT PRIMARY KEY",), - ())), - ('history', (("uid TEXT PRIMARY KEY", "stanza_id TEXT", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", - "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception) - "type TEXT", "extra BLOB"), - ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)", - "UNIQUE (profile_id, stanza_id, source, dest)" # avoid storing 2 times the same message - ))), - ('message', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "message TEXT", "language TEXT"), - ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), - ('subject', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "subject TEXT", "language TEXT"), - ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), - ('thread', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "thread_id TEXT", "parent_id TEXT"),("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), - ('param_gen', (("category TEXT", "name TEXT", "value TEXT"), - ("PRIMARY KEY (category, name)",))), - ('param_ind', (("category TEXT", "name TEXT", "profile_id INTEGER", "value TEXT"), - ("PRIMARY KEY (profile_id, category, name)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))), - ('private_gen', (("namespace TEXT", "key TEXT", "value TEXT"), - ("PRIMARY KEY (namespace, key)",))), - ('private_ind', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value TEXT"), - ("PRIMARY KEY (profile_id, namespace, key)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))), - ('private_gen_bin', (("namespace TEXT", "key TEXT", "value BLOB"), - ("PRIMARY KEY (namespace, key)",))), - ('private_ind_bin', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value BLOB"), - ("PRIMARY KEY (profile_id, namespace, key)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))), - ('files', (("id TEXT NOT NULL", "public_id TEXT", "version TEXT NOT NULL", - "parent TEXT NOT NULL", - "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format( - file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY), - "file_hash TEXT", "hash_algo TEXT", "name TEXT NOT NULL", "size INTEGER", - "namespace TEXT", "media_type TEXT", "media_subtype TEXT", - "created DATETIME NOT NULL", "modified DATETIME", - "owner TEXT", "access TEXT", "extra TEXT", "profile_id INTEGER"), - ("PRIMARY KEY (id, version)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", - "UNIQUE (public_id)"))), - )), - 'INSERT': OrderedDict(( - ('message_types', (("'chat'",), - ("'error'",), - ("'groupchat'",), - ("'headline'",), - ("'normal'",), - ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC - )), - )), - 'INDEX': (('history', (('profile_id', 'timestamp'), - ('profile_id', 'received_timestamp'))), - ('message', ('history_uid',)), - ('subject', ('history_uid',)), - ('thread', ('history_uid',)), - ('files', (('profile_id', 'owner', 'media_type', 'media_subtype'), - ('profile_id', 'owner', 'parent'))), - ) - }, - 9: {'specific': 'update_v9' - }, - 8: {'specific': 'update_v8' - }, - 7: {'specific': 'update_v7' - }, - 6: {'cols create': {'history': ('stanza_id TEXT',)}, - }, - 5: {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL", - "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format( - file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY), - "file_hash TEXT", "hash_algo TEXT", "name TEXT NOT NULL", "size INTEGER", - "namespace TEXT", "mime_type TEXT", - "created DATETIME NOT NULL", "modified DATETIME", - "owner TEXT", "access TEXT", "extra TEXT", "profile_id INTEGER"), - ("PRIMARY KEY (id, version)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))}, - }, - 4: {'create': {'components': (('profile_id INTEGER PRIMARY KEY', 'entry_point TEXT NOT NULL'), ('FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE',))} - }, - 3: {'specific': 'update_v3' - }, - 2: {'specific': 'update2raw_v2' - }, - 1: {'cols create': {'history': ('extra BLOB',)}, - }, - } - -NOT_IN_EXTRA = ('stanza_id', 'received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field - # this is specific to this sqlite storage and for now only used for received_timestamp - # because this value is stored in a separate field - - -class ConnectionPool(adbapi.ConnectionPool): - def _runQuery(self, trans, *args, **kw): - retry = kw.pop('query_retry', 6) - try: - trans.execute(*args, **kw) - except sqlite3.IntegrityError as e: - # Workaround to avoid IntegrityError causing (i)pdb to be - # launched in debug mode - raise failure.Failure(e) - except Exception as e: - # FIXME: in case of error, we retry a couple of times - # this is a workaround, we need to move to better - # Sqlite integration, probably with high level library - retry -= 1 - if retry == 0: - log.error(_('too many db tries, we abandon! Error message: {msg}\n' - 'query was {query}' - .format(msg=e, query=' '.join([str(a) for a in args])))) - raise e - log.warning( - _('exception while running query, retrying ({try_}): {msg}').format( - try_ = 6 - retry, - msg = e)) - kw['query_retry'] = retry - return self._runQuery(trans, *args, **kw) - return trans.fetchall() - - def _runInteraction(self, interaction, *args, **kw): - # sometimes interaction may fail while committing in _runInteraction - # and it may be due to a db lock. So we work around it in a similar way - # as for _runQuery but with only 3 tries - retry = kw.pop('interaction_retry', 4) - try: - return adbapi.ConnectionPool._runInteraction(self, interaction, *args, **kw) - except Exception as e: - retry -= 1 - if retry == 0: - log.error( - _('too many interaction tries, we abandon! Error message: {msg}\n' - 'interaction method was: {interaction}\n' - 'interaction arguments were: {args}' - .format(msg=e, interaction=interaction, - args=', '.join([str(a) for a in args])))) - raise e - log.warning( - _('exception while running interaction, retrying ({try_}): {msg}') - .format(try_ = 4 - retry, msg = e)) - kw['interaction_retry'] = retry - return self._runInteraction(interaction, *args, **kw) - - -class SqliteStorage(object): - """This class manage storage with Sqlite database""" - - def __init__(self, db_filename, sat_version): - """Connect to the given database - - @param db_filename: full path to the Sqlite database - """ - # triggered when memory is fully initialised and ready - self.initialized = defer.Deferred() - # we keep cache for the profiles (key: profile name, value: profile id) - self.profiles = {} - - log.info(_("Connecting database")) - new_base = not os.path.exists(db_filename) # do we have to create the database ? - if new_base: # the dir may not exist if it's not the XDG recommended one - dir_ = os.path.dirname(db_filename) - if not os.path.exists(dir_): - os.makedirs(dir_, 0o700) - - def foreignKeysOn(sqlite): - sqlite.execute('PRAGMA foreign_keys = ON') - - self.dbpool = ConnectionPool("sqlite3", db_filename, cp_openfun=foreignKeysOn, check_same_thread=False, timeout=15) - - def getNewBaseSql(): - log.info(_("The database is new, creating the tables")) - database_creation = ["PRAGMA user_version=%d" % CURRENT_DB_VERSION] - database_creation.extend(Updater.createData2Raw(DATABASE_SCHEMAS['current']['CREATE'])) - database_creation.extend(Updater.insertData2Raw(DATABASE_SCHEMAS['current']['INSERT'])) - database_creation.extend(Updater.indexData2Raw(DATABASE_SCHEMAS['current']['INDEX'])) - return database_creation - - def getUpdateSql(): - updater = Updater(self, sat_version) - return updater.checkUpdates() - - # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done - - init_defer = defer.succeed(None) - - init_defer.addCallback(lambda ignore: getNewBaseSql() if new_base else getUpdateSql()) - init_defer.addCallback(self.commitStatements) - - def fillProfileCache(ignore): - return self.dbpool.runQuery("SELECT profile_id, entry_point FROM components").addCallback(self._cacheComponentsAndProfiles) - - init_defer.addCallback(fillProfileCache) - init_defer.chainDeferred(self.initialized) - - def commitStatements(self, statements): - - if statements is None: - return defer.succeed(None) - log.debug("\n===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements)) - d = self.dbpool.runInteraction(self._updateDb, tuple(statements)) - return d - - def _updateDb(self, interaction, statements): - for statement in statements: - interaction.execute(statement) - - ## Profiles - - def _cacheComponentsAndProfiles(self, components_result): - """Get components results and send requests profiles - - they will be both put in cache in _profilesCache - """ - return self.dbpool.runQuery("SELECT name,id FROM profiles").addCallback( - self._cacheComponentsAndProfiles2, components_result) - - def _cacheComponentsAndProfiles2(self, profiles_result, components): - """Fill the profiles cache - - @param profiles_result: result of the sql profiles query - """ - self.components = dict(components) - for profile in profiles_result: - name, id_ = profile - self.profiles[name] = id_ - - def getProfilesList(self): - """"Return list of all registered profiles""" - return list(self.profiles.keys()) - - def hasProfile(self, profile_name): - """return True if profile_name exists - - @param profile_name: name of the profile to check - """ - return profile_name in self.profiles - - def profileIsComponent(self, profile_name): - try: - return self.profiles[profile_name] in self.components - except KeyError: - raise exceptions.NotFound("the requested profile doesn't exists") - - def getEntryPoint(self, profile_name): - try: - return self.components[self.profiles[profile_name]] - except KeyError: - raise exceptions.NotFound("the requested profile doesn't exists or is not a component") - - def createProfile(self, name, component=None): - """Create a new profile - - @param name(unicode): name of the profile - @param component(None, unicode): if not None, must point to a component entry point - @return: deferred triggered once profile is actually created - """ - - def getProfileId(ignore): - return self.dbpool.runQuery("SELECT (id) FROM profiles WHERE name = ?", (name, )) - - def setComponent(profile_id): - id_ = profile_id[0][0] - d_comp = self.dbpool.runQuery("INSERT INTO components(profile_id, entry_point) VALUES (?, ?)", (id_, component)) - d_comp.addCallback(lambda __: profile_id) - return d_comp - - def profile_created(profile_id): - id_= profile_id[0][0] - self.profiles[name] = id_ # we synchronise the cache - - d = self.dbpool.runQuery("INSERT INTO profiles(name) VALUES (?)", (name, )) - d.addCallback(getProfileId) - if component is not None: - d.addCallback(setComponent) - d.addCallback(profile_created) - return d - - def deleteProfile(self, name): - """Delete profile - - @param name: name of the profile - @return: deferred triggered once profile is actually deleted - """ - def deletionError(failure_): - log.error(_("Can't delete profile [%s]") % name) - return failure_ - - def delete(txn): - profile_id = self.profiles.pop(name) - txn.execute("DELETE FROM profiles WHERE name = ?", (name,)) - # FIXME: the following queries should be done by the ON DELETE CASCADE - # but it seems they are not, so we explicitly do them by security - # this need more investigation - txn.execute("DELETE FROM history WHERE profile_id = ?", (profile_id,)) - txn.execute("DELETE FROM param_ind WHERE profile_id = ?", (profile_id,)) - txn.execute("DELETE FROM private_ind WHERE profile_id = ?", (profile_id,)) - txn.execute("DELETE FROM private_ind_bin WHERE profile_id = ?", (profile_id,)) - txn.execute("DELETE FROM components WHERE profile_id = ?", (profile_id,)) - return None - - d = self.dbpool.runInteraction(delete) - d.addCallback(lambda ignore: log.info(_("Profile [%s] deleted") % name)) - d.addErrback(deletionError) - return d - - ## Params - def loadGenParams(self, params_gen): - """Load general parameters - - @param params_gen: dictionary to fill - @return: deferred - """ - - def fillParams(result): - for param in result: - category, name, value = param - params_gen[(category, name)] = value - log.debug(_("loading general parameters from database")) - return self.dbpool.runQuery("SELECT category,name,value FROM param_gen").addCallback(fillParams) - - def loadIndParams(self, params_ind, profile): - """Load individual parameters - - @param params_ind: dictionary to fill - @param profile: a profile which *must* exist - @return: deferred - """ - - def fillParams(result): - for param in result: - category, name, value = param - params_ind[(category, name)] = value - log.debug(_("loading individual parameters from database")) - d = self.dbpool.runQuery("SELECT category,name,value FROM param_ind WHERE profile_id=?", (self.profiles[profile], )) - d.addCallback(fillParams) - return d - - def getIndParam(self, category, name, profile): - """Ask database for the value of one specific individual parameter - - @param category: category of the parameter - @param name: name of the parameter - @param profile: %(doc_profile)s - @return: deferred - """ - d = self.dbpool.runQuery( - "SELECT value FROM param_ind WHERE category=? AND name=? AND profile_id=?", - (category, name, self.profiles[profile])) - d.addCallback(self.__getFirstResult) - return d - - async def getIndParamValues(self, category, name): - """Ask database for the individual values of a parameter for all profiles - - @param category: category of the parameter - @param name: name of the parameter - @return dict: profile => value map - """ - result = await self.dbpool.runQuery( - "SELECT profiles.name, param_ind.value FROM param_ind JOIN profiles ON " - "param_ind.profile_id = profiles.id WHERE param_ind.category=? " - "and param_ind.name=?", - (category, name)) - return dict(result) - - def setGenParam(self, category, name, value): - """Save the general parameters in database - - @param category: category of the parameter - @param name: name of the parameter - @param value: value to set - @return: deferred""" - d = self.dbpool.runQuery("REPLACE INTO param_gen(category,name,value) VALUES (?,?,?)", (category, name, value)) - d.addErrback(lambda ignore: log.error(_("Can't set general parameter (%(category)s/%(name)s) in database" % {"category": category, "name": name}))) - return d - - def setIndParam(self, category, name, value, profile): - """Save the individual parameters in database - - @param category: category of the parameter - @param name: name of the parameter - @param value: value to set - @param profile: a profile which *must* exist - @return: deferred - """ - d = self.dbpool.runQuery("REPLACE INTO param_ind(category,name,profile_id,value) VALUES (?,?,?,?)", (category, name, self.profiles[profile], value)) - d.addErrback(lambda ignore: log.error(_("Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category": category, "name": name, "profile": profile}))) - return d - - ## History - - def _addToHistoryCb(self, __, data): - # Message metadata were successfuly added to history - # now we can add message and subject - uid = data['uid'] - d_list = [] - for key in ('message', 'subject'): - for lang, value in data[key].items(): - if not value.strip(): - # no need to store empty messages - continue - d = self.dbpool.runQuery( - "INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)" - .format(key=key), - (uid, value, lang or None)) - d.addErrback(lambda __: log.error( - _("Can't save following {key} in history (uid: {uid}, lang:{lang}):" - " {value}").format( - key=key, uid=uid, lang=lang, value=value))) - d_list.append(d) - try: - thread = data['extra']['thread'] - except KeyError: - pass - else: - thread_parent = data['extra'].get('thread_parent') - d = self.dbpool.runQuery( - "INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)", - (uid, thread, thread_parent)) - d.addErrback(lambda __: log.error( - _("Can't save following thread in history (uid: {uid}): thread: " - "{thread}), parent:{parent}").format( - uid=uid, thread=thread, parent=thread_parent))) - d_list.append(d) - return defer.DeferredList(d_list) - - def _addToHistoryEb(self, failure_, data): - failure_.trap(sqlite3.IntegrityError) - sqlite_msg = failure_.value.args[0] - if "UNIQUE constraint failed" in sqlite_msg: - log.debug("message {} is already in history, not storing it again" - .format(data['uid'])) - if 'received_timestamp' not in data: - log.warning( - "duplicate message is not delayed, this is maybe a bug: data={}" - .format(data)) - # we cancel message to avoid sending duplicate message to frontends - raise failure.Failure(exceptions.CancelError("Cancelled duplicated message")) - else: - log.error("Can't store message in history: {}".format(failure_)) - - def _logHistoryError(self, failure_, from_jid, to_jid, data): - if failure_.check(exceptions.CancelError): - # we propagate CancelError to avoid sending message to frontends - raise failure_ - log.error(_( - "Can't save following message in history: from [{from_jid}] to [{to_jid}] " - "(uid: {uid})") - .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])) - - def addToHistory(self, data, profile): - """Store a new message in history - - @param data(dict): message data as build by SatMessageProtocol.onMessage - """ - extra = pickle.dumps({k: v for k, v in data['extra'].items() - if k not in NOT_IN_EXTRA}, 0) - from_jid = data['from'] - to_jid = data['to'] - d = self.dbpool.runQuery( - "INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, " - "source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES " - "(?,?,?,?,?,?,?,?,?,?,?,?)", - (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), - self.profiles[profile], data['from'].userhost(), to_jid.userhost(), - from_jid.resource, to_jid.resource, data['timestamp'], - data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) - d.addCallbacks(self._addToHistoryCb, - self._addToHistoryEb, - callbackArgs=[data], - errbackArgs=[data]) - d.addErrback(self._logHistoryError, from_jid, to_jid, data) - return d - - def sqliteHistoryToList(self, query_result): - """Get SQL query result and return a list of message data dicts""" - result = [] - current = {'uid': None} - for row in reversed(query_result): - (uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, - received_timestamp, type_, extra, message, message_lang, subject, - subject_lang, thread, thread_parent) = row - if uid != current['uid']: - # new message - try: - extra = pickle.loads(extra or b"") - except EOFError: - extra = {} - current = { - 'from': "%s/%s" % (source, source_res) if source_res else source, - 'to': "%s/%s" % (dest, dest_res) if dest_res else dest, - 'uid': uid, - 'message': {}, - 'subject': {}, - 'type': type_, - 'extra': extra, - 'timestamp': timestamp, - } - if stanza_id is not None: - current['extra']['stanza_id'] = stanza_id - if update_uid is not None: - current['extra']['update_uid'] = update_uid - if received_timestamp is not None: - current['extra']['received_timestamp'] = str(received_timestamp) - result.append(current) - - if message is not None: - current['message'][message_lang or ''] = message - - if subject is not None: - current['subject'][subject_lang or ''] = subject - - if thread is not None: - current_extra = current['extra'] - current_extra['thread'] = thread - if thread_parent is not None: - current_extra['thread_parent'] = thread_parent - else: - if thread_parent is not None: - log.error( - "Database inconsistency: thread parent without thread (uid: " - "{uid}, thread_parent: {parent})" - .format(uid=uid, parent=thread_parent)) - - return result - - def listDict2listTuple(self, messages_data): - """Return a list of tuple as used in bridge from a list of messages data""" - ret = [] - for m in messages_data: - ret.append((m['uid'], m['timestamp'], m['from'], m['to'], m['message'], m['subject'], m['type'], m['extra'])) - return ret - - def historyGet(self, from_jid, to_jid, limit=None, between=True, filters=None, profile=None): - """Retrieve messages in history - - @param from_jid (JID): source JID (full, or bare for catchall) - @param to_jid (JID): dest JID (full, or bare for catchall) - @param limit (int): maximum number of messages to get: - - 0 for no message (returns the empty list) - - None for unlimited - @param between (bool): confound source and dest (ignore the direction) - @param filters (dict[unicode, unicode]): pattern to filter the history results - @param profile (unicode): %(doc_profile)s - @return: list of tuple as in [messageNew] - """ - assert profile - if filters is None: - filters = {} - if limit == 0: - return defer.succeed([]) - - query_parts = ["SELECT uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ - type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\ - FROM history LEFT JOIN message ON history.uid = message.history_uid\ - LEFT JOIN subject ON history.uid=subject.history_uid\ - LEFT JOIN thread ON history.uid=thread.history_uid\ - WHERE profile_id=?"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here - values = [self.profiles[profile]] - - def test_jid(type_, jid_): - values.append(jid_.userhost()) - if jid_.resource: - values.append(jid_.resource) - return '({type_}=? AND {type_}_res=?)'.format(type_=type_) - return '{type_}=?'.format(type_=type_) - - if not from_jid and not to_jid: - # not jid specified, we want all one2one communications - pass - elif between: - if not from_jid or not to_jid: - # we only have one jid specified, we check all messages - # from or to this jid - jid_ = from_jid or to_jid - query_parts.append("AND ({source} OR {dest})".format( - source=test_jid('source', jid_), - dest=test_jid('dest' , jid_))) - else: - # we have 2 jids specified, we check all communications between - # those 2 jids - query_parts.append( - "AND (({source_from} AND {dest_to}) " - "OR ({source_to} AND {dest_from}))".format( - source_from=test_jid('source', from_jid), - dest_to=test_jid('dest', to_jid), - source_to=test_jid('source', to_jid), - dest_from=test_jid('dest', from_jid))) - else: - # we want one communication in specific direction (from somebody or - # to somebody). - q = [] - if from_jid is not None: - q.append(test_jid('source', from_jid)) - if to_jid is not None: - q.append(test_jid('dest', to_jid)) - query_parts.append("AND " + " AND ".join(q)) - - if filters: - if 'timestamp_start' in filters: - query_parts.append("AND timestamp>= ?") - values.append(float(filters['timestamp_start'])) - if 'before_uid' in filters: - query_parts.append("AND history.rowid<(select rowid from history where uid=?)") - values.append(filters['before_uid']) - if 'body' in filters: - # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html - query_parts.append("AND message LIKE ?") - values.append("%{}%".format(filters['body'])) - if 'search' in filters: - query_parts.append("AND (message LIKE ? OR source_res LIKE ?)") - values.extend(["%{}%".format(filters['search'])] * 2) - if 'types' in filters: - types = filters['types'].split() - query_parts.append("AND type IN ({})".format(','.join("?"*len(types)))) - values.extend(types) - if 'not_types' in filters: - types = filters['not_types'].split() - query_parts.append("AND type NOT IN ({})".format(','.join("?"*len(types)))) - values.extend(types) - if 'last_stanza_id' in filters: - # this request get the last message with a "stanza_id" that we - # have in history. This is mainly used to retrieve messages sent - # while we were offline, using MAM (XEP-0313). - if (filters['last_stanza_id'] is not True - or limit != 1): - raise ValueError("Unexpected values for last_stanza_id filter") - query_parts.append("AND stanza_id IS NOT NULL") - - - # timestamp may be identical for 2 close messages (specially when delay is - # used) that's why we order ties by received_timestamp - # We'll reverse the order in sqliteHistoryToList - # we use DESC here so LIMIT keep the last messages - query_parts.append("ORDER BY timestamp DESC, history.received_timestamp DESC") - if limit is not None: - query_parts.append("LIMIT ?") - values.append(limit) - - d = self.dbpool.runQuery(" ".join(query_parts), values) - d.addCallback(self.sqliteHistoryToList) - d.addCallback(self.listDict2listTuple) - return d - - ## Private values - - def _privateDataEb(self, failure_, operation, namespace, key=None, profile=None): - """generic errback for data queries""" - log.error(_("Can't {operation} data in database for namespace {namespace}{and_key}{for_profile}: {msg}").format( - operation = operation, - namespace = namespace, - and_key = (" and key " + key) if key is not None else "", - for_profile = (' [' + profile + ']') if profile is not None else '', - msg = failure_)) - - def _load_pickle(self, v): - # FIXME: workaround for Python 3 port, some pickled data are byte while other are strings - try: - return pickle.loads(v) - except TypeError: - data = pickle.loads(v.encode('utf-8')) - log.warning(f"encoding issue in pickled data: {data}") - return data - - def _generateDataDict(self, query_result, binary): - if binary: - return {k: self._load_pickle(v) for k,v in query_result} - else: - return dict(query_result) - - def _getPrivateTable(self, binary, profile): - """Get table to use for private values""" - table = ['private'] - - if profile is None: - table.append('gen') - else: - table.append('ind') - - if binary: - table.append('bin') - - return '_'.join(table) - - def getPrivates(self, namespace, keys=None, binary=False, profile=None): - """Get private value(s) from databases - - @param namespace(unicode): namespace of the values - @param keys(iterable, None): keys of the values to get - None to get all keys/values - @param binary(bool): True to deserialise binary values - @param profile(unicode, None): profile to use for individual values - None to use general values - @return (dict[unicode, object]): gotten keys/values - """ - log.debug(_("getting {type}{binary} private values from database for namespace {namespace}{keys}".format( - type = "general" if profile is None else "individual", - binary = " binary" if binary else "", - namespace = namespace, - keys = " with keys {}".format(", ".join(keys)) if keys is not None else ""))) - table = self._getPrivateTable(binary, profile) - query_parts = ["SELECT key,value FROM", table, "WHERE namespace=?"] - args = [namespace] - - if keys is not None: - placeholders = ','.join(len(keys) * '?') - query_parts.append('AND key IN (' + placeholders + ')') - args.extend(keys) - - if profile is not None: - query_parts.append('AND profile_id=?') - args.append(self.profiles[profile]) - - d = self.dbpool.runQuery(" ".join(query_parts), args) - d.addCallback(self._generateDataDict, binary) - d.addErrback(self._privateDataEb, "get", namespace, profile=profile) - return d - - def setPrivateValue(self, namespace, key, value, binary=False, profile=None): - """Set a private value in database - - @param namespace(unicode): namespace of the values - @param key(unicode): key of the value to set - @param value(object): value to set - @param binary(bool): True if it's a binary values - binary values need to be serialised, used for everything but strings - @param profile(unicode, None): profile to use for individual value - if None, it's a general value - """ - table = self._getPrivateTable(binary, profile) - query_values_names = ['namespace', 'key', 'value'] - query_values = [namespace, key] - - if binary: - value = sqlite3.Binary(pickle.dumps(value, 0)) - - query_values.append(value) - - if profile is not None: - query_values_names.append('profile_id') - query_values.append(self.profiles[profile]) - - query_parts = ["REPLACE INTO", table, '(', ','.join(query_values_names), ')', - "VALUES (", ",".join('?'*len(query_values_names)), ')'] - - d = self.dbpool.runQuery(" ".join(query_parts), query_values) - d.addErrback(self._privateDataEb, "set", namespace, key, profile=profile) - return d - - def delPrivateValue(self, namespace, key, binary=False, profile=None): - """Delete private value from database - - @param category: category of the privateeter - @param key: key of the private value - @param binary(bool): True if it's a binary values - @param profile(unicode, None): profile to use for individual value - if None, it's a general value - """ - table = self._getPrivateTable(binary, profile) - query_parts = ["DELETE FROM", table, "WHERE namespace=? AND key=?"] - args = [namespace, key] - if profile is not None: - query_parts.append("AND profile_id=?") - args.append(self.profiles[profile]) - d = self.dbpool.runQuery(" ".join(query_parts), args) - d.addErrback(self._privateDataEb, "delete", namespace, key, profile=profile) - return d - - def delPrivateNamespace(self, namespace, binary=False, profile=None): - """Delete all data from a private namespace - - Be really cautious when you use this method, as all data with given namespace are - removed. - Params are the same as for delPrivateValue - """ - table = self._getPrivateTable(binary, profile) - query_parts = ["DELETE FROM", table, "WHERE namespace=?"] - args = [namespace] - if profile is not None: - query_parts.append("AND profile_id=?") - args.append(self.profiles[profile]) - d = self.dbpool.runQuery(" ".join(query_parts), args) - d.addErrback(self._privateDataEb, "delete namespace", namespace, profile=profile) - return d - - ## Files - - @defer.inlineCallbacks - def getFiles(self, client, file_id=None, version='', parent=None, type_=None, - file_hash=None, hash_algo=None, name=None, namespace=None, mime_type=None, - public_id=None, owner=None, access=None, projection=None, unique=False): - """retrieve files with with given filters - - @param file_id(unicode, None): id of the file - None to ignore - @param version(unicode, None): version of the file - None to ignore - empty string to look for current version - @param parent(unicode, None): id of the directory containing the files - None to ignore - empty string to look for root files/directories - @param projection(list[unicode], None): name of columns to retrieve - None to retrieve all - @param unique(bool): if True will remove duplicates - other params are the same as for [setFile] - @return (list[dict]): files corresponding to filters - """ - query_parts = ["SELECT"] - if unique: - query_parts.append('DISTINCT') - if projection is None: - projection = ['id', 'version', 'parent', 'type', 'file_hash', 'hash_algo', 'name', - 'size', 'namespace', 'media_type', 'media_subtype', 'public_id', 'created', 'modified', 'owner', - 'access', 'extra'] - query_parts.append(','.join(projection)) - query_parts.append("FROM files WHERE") - if client is not None: - filters = ['profile_id=?'] - args = [self.profiles[client.profile]] - else: - if public_id is None: - raise exceptions.InternalError( - "client can only be omitted when public_id is set") - filters = [] - args = [] - - if file_id is not None: - filters.append('id=?') - args.append(file_id) - if version is not None: - filters.append('version=?') - args.append(version) - if parent is not None: - filters.append('parent=?') - args.append(parent) - if type_ is not None: - filters.append('type=?') - args.append(type_) - if file_hash is not None: - filters.append('file_hash=?') - args.append(file_hash) - if hash_algo is not None: - filters.append('hash_algo=?') - args.append(hash_algo) - if name is not None: - filters.append('name=?') - args.append(name) - if namespace is not None: - filters.append('namespace=?') - args.append(namespace) - if mime_type is not None: - if '/' in mime_type: - filters.extend('media_type=?', 'media_subtype=?') - args.extend(mime_type.split('/', 1)) - else: - filters.append('media_type=?') - args.append(mime_type) - if public_id is not None: - filters.append('public_id=?') - args.append(public_id) - if owner is not None: - filters.append('owner=?') - args.append(owner.full()) - if access is not None: - raise NotImplementedError('Access check is not implemented yet') - # a JSON comparison is needed here - - filters = ' AND '.join(filters) - query_parts.append(filters) - query = ' '.join(query_parts) - - result = yield self.dbpool.runQuery(query, args) - files_data = [dict(list(zip(projection, row))) for row in result] - to_parse = {'access', 'extra'}.intersection(projection) - to_filter = {'owner'}.intersection(projection) - if to_parse or to_filter: - for file_data in files_data: - for key in to_parse: - value = file_data[key] - file_data[key] = {} if value is None else json.loads(value) - owner = file_data.get('owner') - if owner is not None: - file_data['owner'] = jid.JID(owner) - defer.returnValue(files_data) - - def setFile(self, client, name, file_id, version='', parent=None, type_=C.FILE_TYPE_FILE, - file_hash=None, hash_algo=None, size=None, namespace=None, mime_type=None, - public_id=None, created=None, modified=None, owner=None, access=None, extra=None): - """set a file metadata - - @param client(SatXMPPClient): client owning the file - @param name(str): name of the file (must not contain "/") - @param file_id(str): unique id of the file - @param version(str): version of this file - @param parent(str): id of the directory containing this file - None if it is a root file/directory - @param type_(str): one of: - - file - - directory - @param file_hash(str): unique hash of the payload - @param hash_algo(str): algorithm used for hashing the file (usually sha-256) - @param size(int): size in bytes - @param namespace(str, None): identifier (human readable is better) to group files - for instance, namespace could be used to group files in a specific photo album - @param mime_type(str): media type of the file, or None if not known/guessed - @param public_id(str): ID used to server the file publicly via HTTP - @param created(int): UNIX time of creation - @param modified(int,None): UNIX time of last modification, or None to use created date - @param owner(jid.JID, None): jid of the owner of the file (mainly useful for component) - @param access(dict, None): serialisable dictionary with access rules. See [memory.memory] for details - @param extra(dict, None): serialisable dictionary of any extra data - will be encoded to json in database - """ - if extra is not None: - assert isinstance(extra, dict) - - if mime_type is None: - media_type = media_subtype = None - elif '/' in mime_type: - media_type, media_subtype = mime_type.split('/', 1) - else: - media_type, media_subtype = mime_type, None - - query = ('INSERT INTO files(id, version, parent, type, file_hash, hash_algo, name, size, namespace, ' - 'media_type, media_subtype, public_id, created, modified, owner, access, extra, profile_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)') - d = self.dbpool.runQuery(query, (file_id, version.strip(), parent, type_, - file_hash, hash_algo, - name, size, namespace, - media_type, media_subtype, public_id, created, modified, - owner.full() if owner is not None else None, - json.dumps(access) if access else None, - json.dumps(extra) if extra else None, - self.profiles[client.profile])) - d.addErrback(lambda failure: log.error(_("Can't save file metadata for [{profile}]: {reason}".format(profile=client.profile, reason=failure)))) - return d - - async def fileGetUsedSpace(self, client, owner): - """Get space used by owner of file""" - query = "SELECT SUM(size) FROM files WHERE owner=? AND type='file' AND profile_id=?" - ret = await self.dbpool.runQuery( - query, - (owner.userhost(), self.profiles[client.profile]) - ) - return ret[0][0] or 0 - - def _fileUpdate(self, cursor, file_id, column, update_cb): - query = 'SELECT {column} FROM files where id=?'.format(column=column) - for i in range(5): - cursor.execute(query, [file_id]) - try: - older_value_raw = cursor.fetchone()[0] - except TypeError: - raise exceptions.NotFound - if older_value_raw is None: - value = {} - else: - value = json.loads(older_value_raw) - update_cb(value) - value_raw = json.dumps(value) - if older_value_raw is None: - update_query = 'UPDATE files SET {column}=? WHERE id=? AND {column} is NULL'.format(column=column) - update_args = (value_raw, file_id) - else: - update_query = 'UPDATE files SET {column}=? WHERE id=? AND {column}=?'.format(column=column) - update_args = (value_raw, file_id, older_value_raw) - try: - cursor.execute(update_query, update_args) - except sqlite3.Error: - pass - else: - if cursor.rowcount == 1: - break; - log.warning(_("table not updated, probably due to race condition, trying again ({tries})").format(tries=i+1)) - else: - log.error(_("Can't update file table")) - - def fileUpdate(self, file_id, column, update_cb): - """Update a column value using a method to avoid race conditions - - the older value will be retrieved from database, then update_cb will be applied - to update it, and file will be updated checking that older value has not been changed meanwhile - by an other user. If it has changed, it tries again a couple of times before failing - @param column(str): column name (only "access" or "extra" are allowed) - @param update_cb(callable): method to update the value of the colum - the method will take older value as argument, and must update it in place - update_cb must not care about serialization, - it get the deserialized data (i.e. a Python object) directly - Note that the callable must be thread-safe - @raise exceptions.NotFound: there is not file with this id - """ - if column not in ('access', 'extra'): - raise exceptions.InternalError('bad column name') - return self.dbpool.runInteraction(self._fileUpdate, file_id, column, update_cb) - - def fileDelete(self, file_id): - """Delete file metadata from the database - - @param file_id(unicode): id of the file to delete - NOTE: file itself must still be removed, this method only handle metadata in - database - """ - return self.dbpool.runQuery("DELETE FROM files WHERE id = ?", (file_id,)) - - ##Helper methods## - - def __getFirstResult(self, result): - """Return the first result of a database query - Useful when we are looking for one specific value""" - return None if not result else result[0][0] - - -class Updater(object): - stmnt_regex = re.compile(r"[\w/' ]+(?:\(.*?\))?[^,]*") - clean_regex = re.compile(r"^ +|(?<= ) +|(?<=,) +| +$") - CREATE_SQL = "CREATE TABLE %s (%s)" - INSERT_SQL = "INSERT INTO %s VALUES (%s)" - INDEX_SQL = "CREATE INDEX %s ON %s(%s)" - DROP_SQL = "DROP TABLE %s" - ALTER_SQL = "ALTER TABLE %s ADD COLUMN %s" - RENAME_TABLE_SQL = "ALTER TABLE %s RENAME TO %s" - - CONSTRAINTS = ('PRIMARY', 'UNIQUE', 'CHECK', 'FOREIGN') - TMP_TABLE = "tmp_sat_update" - - def __init__(self, sqlite_storage, sat_version): - self._sat_version = sat_version - self.sqlite_storage = sqlite_storage - - @property - def dbpool(self): - return self.sqlite_storage.dbpool - - def getLocalVersion(self): - """ Get local database version - - @return: version (int) - """ - return self.dbpool.runQuery("PRAGMA user_version").addCallback(lambda ret: int(ret[0][0])) - - def _setLocalVersion(self, version): - """ Set local database version - - @param version: version (int) - @return: deferred - """ - return self.dbpool.runOperation("PRAGMA user_version=%d" % version) - - def getLocalSchema(self): - """ return raw local schema - - @return: list of strings with CREATE sql statements for local database - """ - d = self.dbpool.runQuery("select sql from sqlite_master where type = 'table'") - d.addCallback(lambda result: [row[0] for row in result]) - return d - - @defer.inlineCallbacks - def checkUpdates(self): - """ Check if a database schema/content update is needed, according to DATABASE_SCHEMAS - - @return: deferred which fire a list of SQL update statements, or None if no update is needed - """ - # TODO: only "table" type (i.e. "CREATE" statements) is checked, - # "index" should be checked too. - # This may be not relevant is we move to a higher level library (alchimia?) - local_version = yield self.getLocalVersion() - raw_local_sch = yield self.getLocalSchema() - - local_sch = self.rawStatements2data(raw_local_sch) - current_sch = DATABASE_SCHEMAS['current']['CREATE'] - local_hash = self.statementHash(local_sch) - current_hash = self.statementHash(current_sch) - - # Force the update if the schemas are unchanged but a specific update is needed - force_update = local_hash == current_hash and local_version < CURRENT_DB_VERSION \ - and {'index', 'specific'}.intersection(DATABASE_SCHEMAS[CURRENT_DB_VERSION]) - - if local_hash == current_hash and not force_update: - if local_version != CURRENT_DB_VERSION: - log.warning(_("Your local schema is up-to-date, but database versions mismatch, fixing it...")) - yield self._setLocalVersion(CURRENT_DB_VERSION) - else: - # an update is needed - - if local_version == CURRENT_DB_VERSION: - # Database mismatch and we have the latest version - if self._sat_version.endswith('D'): - # we are in a development version - update_data = self.generateUpdateData(local_sch, current_sch, False) - log.warning(_("There is a schema mismatch, but as we are on a dev version, database will be updated")) - update_raw = yield self.update2raw(update_data, True) - defer.returnValue(update_raw) - else: - log.error(_("schema version is up-to-date, but local schema differ from expected current schema")) - update_data = self.generateUpdateData(local_sch, current_sch, True) - update_raw = yield self.update2raw(update_data) - log.warning(_("Here are the commands that should fix the situation, use at your own risk (do a backup before modifying database), you can go to SàT's MUC room at sat@chat.jabberfr.org for help\n### SQL###\n%s\n### END SQL ###\n") % '\n'.join("%s;" % statement for statement in update_raw)) - raise exceptions.DatabaseError("Database mismatch") - else: - if local_version > CURRENT_DB_VERSION: - log.error(_( - "You database version is higher than the one used in this SàT " - "version, are you using several version at the same time? We " - "can't run SàT with this database.")) - sys.exit(1) - - # Database is not up-to-date, we'll do the update - if force_update: - log.info(_("Database content needs a specific processing, local database will be updated")) - else: - log.info(_("Database schema has changed, local database will be updated")) - update_raw = [] - for version in range(local_version + 1, CURRENT_DB_VERSION + 1): - try: - update_data = DATABASE_SCHEMAS[version] - except KeyError: - raise exceptions.InternalError("Missing update definition (version %d)" % version) - if "specific" in update_data and update_raw: - # if we have a specific, we must commit current statements - # because a specific may modify database itself, and the database - # must be in the expected state of the previous version. - yield self.sqlite_storage.commitStatements(update_raw) - del update_raw[:] - update_raw_step = yield self.update2raw(update_data) - if update_raw_step is not None: - # can be None with specifics - update_raw.extend(update_raw_step) - update_raw.append("PRAGMA user_version=%d" % CURRENT_DB_VERSION) - defer.returnValue(update_raw) - - @staticmethod - def createData2Raw(data): - """ Generate SQL statements from statements data - - @param data: dictionary with table as key, and statements data in tuples as value - @return: list of strings with raw statements - """ - ret = [] - for table in data: - defs, constraints = data[table] - assert isinstance(defs, tuple) - assert isinstance(constraints, tuple) - ret.append(Updater.CREATE_SQL % (table, ', '.join(defs + constraints))) - return ret - - @staticmethod - def insertData2Raw(data): - """ Generate SQL statements from statements data - - @param data: dictionary with table as key, and statements data in tuples as value - @return: list of strings with raw statements - """ - ret = [] - for table in data: - values_tuple = data[table] - assert isinstance(values_tuple, tuple) - for values in values_tuple: - assert isinstance(values, tuple) - ret.append(Updater.INSERT_SQL % (table, ', '.join(values))) - return ret - - @staticmethod - def indexData2Raw(data): - """ Generate SQL statements from statements data - - @param data: dictionary with table as key, and statements data in tuples as value - @return: list of strings with raw statements - """ - ret = [] - assert isinstance(data, tuple) - for table, col_data in data: - assert isinstance(table, str) - assert isinstance(col_data, tuple) - for cols in col_data: - if isinstance(cols, tuple): - assert all([isinstance(c, str) for c in cols]) - indexed_cols = ','.join(cols) - elif isinstance(cols, str): - indexed_cols = cols - else: - raise exceptions.InternalError("unexpected index columns value") - index_name = table + '__' + indexed_cols.replace(',', '_') - ret.append(Updater.INDEX_SQL % (index_name, table, indexed_cols)) - return ret - - def statementHash(self, data): - """ Generate hash of template data - - useful to compare schemas - @param data: dictionary of "CREATE" statement, with tables names as key, - and tuples of (col_defs, constraints) as values - @return: hash as string - """ - hash_ = hashlib.sha1() - tables = list(data.keys()) - tables.sort() - - def stmnts2str(stmts): - return ','.join([self.clean_regex.sub('',stmt) for stmt in sorted(stmts)]) - - for table in tables: - col_defs, col_constr = data[table] - hash_.update( - ("%s:%s:%s" % (table, stmnts2str(col_defs), stmnts2str(col_constr))) - .encode('utf-8')) - return hash_.digest() - - def rawStatements2data(self, raw_statements): - """ separate "CREATE" statements into dictionary/tuples data - - @param raw_statements: list of CREATE statements as strings - @return: dictionary with table names as key, and a (col_defs, constraints) tuple - """ - schema_dict = {} - for create_statement in raw_statements: - if not create_statement.startswith("CREATE TABLE "): - log.warning("Unexpected statement, ignoring it") - continue - _create_statement = create_statement[13:] - table, raw_col_stats = _create_statement.split(' ',1) - if raw_col_stats[0] != '(' or raw_col_stats[-1] != ')': - log.warning("Unexpected statement structure, ignoring it") - continue - col_stats = [stmt.strip() for stmt in self.stmnt_regex.findall(raw_col_stats[1:-1])] - col_defs = [] - constraints = [] - for col_stat in col_stats: - name = col_stat.split(' ',1)[0] - if name in self.CONSTRAINTS: - constraints.append(col_stat) - else: - col_defs.append(col_stat) - schema_dict[table] = (tuple(col_defs), tuple(constraints)) - return schema_dict - - def generateUpdateData(self, old_data, new_data, modify=False): - """ Generate data for automatic update between two schema data - - @param old_data: data of the former schema (which must be updated) - @param new_data: data of the current schema - @param modify: if True, always use "cols modify" table, else try to ALTER tables - @return: update data, a dictionary with: - - 'create': dictionary of tables to create - - 'delete': tuple of tables to delete - - 'cols create': dictionary of columns to create (table as key, tuple of columns to create as value) - - 'cols delete': dictionary of columns to delete (table as key, tuple of columns to delete as value) - - 'cols modify': dictionary of columns to modify (table as key, tuple of old columns to transfert as value). With this table, a new table will be created, and content from the old table will be transfered to it, only cols specified in the tuple will be transfered. - """ - - create_tables_data = {} - create_cols_data = {} - modify_cols_data = {} - delete_cols_data = {} - old_tables = set(old_data.keys()) - new_tables = set(new_data.keys()) - - def getChanges(set_olds, set_news): - to_create = set_news.difference(set_olds) - to_delete = set_olds.difference(set_news) - to_check = set_news.intersection(set_olds) - return tuple(to_create), tuple(to_delete), tuple(to_check) - - tables_to_create, tables_to_delete, tables_to_check = getChanges(old_tables, new_tables) - - for table in tables_to_create: - create_tables_data[table] = new_data[table] - - for table in tables_to_check: - old_col_defs, old_constraints = old_data[table] - new_col_defs, new_constraints = new_data[table] - for obj in old_col_defs, old_constraints, new_col_defs, new_constraints: - if not isinstance(obj, tuple): - raise exceptions.InternalError("Columns definitions must be tuples") - defs_create, defs_delete, ignore = getChanges(set(old_col_defs), set(new_col_defs)) - constraints_create, constraints_delete, ignore = getChanges(set(old_constraints), set(new_constraints)) - created_col_names = set([name.split(' ',1)[0] for name in defs_create]) - deleted_col_names = set([name.split(' ',1)[0] for name in defs_delete]) - if (created_col_names.intersection(deleted_col_names or constraints_create or constraints_delete) or - (modify and (defs_create or constraints_create or defs_delete or constraints_delete))): - # we have modified columns, we need to transfer table - # we determinate which columns are in both schema so we can transfer them - old_names = set([name.split(' ',1)[0] for name in old_col_defs]) - new_names = set([name.split(' ',1)[0] for name in new_col_defs]) - modify_cols_data[table] = tuple(old_names.intersection(new_names)); - else: - if defs_create: - create_cols_data[table] = (defs_create) - if defs_delete or constraints_delete: - delete_cols_data[table] = (defs_delete) - - return {'create': create_tables_data, - 'delete': tables_to_delete, - 'cols create': create_cols_data, - 'cols delete': delete_cols_data, - 'cols modify': modify_cols_data - } - - @defer.inlineCallbacks - def update2raw(self, update, dev_version=False): - """ Transform update data to raw SQLite statements - - @param update: update data as returned by generateUpdateData - @param dev_version: if True, update will be done in dev mode: no deletion will be done, instead a message will be shown. This prevent accidental lost of data while working on the code/database. - @return: list of string with SQL statements needed to update the base - """ - ret = self.createData2Raw(update.get('create', {})) - drop = [] - for table in update.get('delete', tuple()): - drop.append(self.DROP_SQL % table) - if dev_version: - if drop: - log.info("Dev version, SQL NOT EXECUTED:\n--\n%s\n--\n" % "\n".join(drop)) - else: - ret.extend(drop) - - cols_create = update.get('cols create', {}) - for table in cols_create: - for col_def in cols_create[table]: - ret.append(self.ALTER_SQL % (table, col_def)) - - cols_delete = update.get('cols delete', {}) - for table in cols_delete: - log.info("Following columns in table [%s] are not needed anymore, but are kept for dev version: %s" % (table, ", ".join(cols_delete[table]))) - - cols_modify = update.get('cols modify', {}) - for table in cols_modify: - ret.append(self.RENAME_TABLE_SQL % (table, self.TMP_TABLE)) - main, extra = DATABASE_SCHEMAS['current']['CREATE'][table] - ret.append(self.CREATE_SQL % (table, ', '.join(main + extra))) - common_cols = ', '.join(cols_modify[table]) - ret.append("INSERT INTO %s (%s) SELECT %s FROM %s" % (table, common_cols, common_cols, self.TMP_TABLE)) - ret.append(self.DROP_SQL % self.TMP_TABLE) - - insert = update.get('insert', {}) - ret.extend(self.insertData2Raw(insert)) - - index = update.get('index', tuple()) - ret.extend(self.indexData2Raw(index)) - - specific = update.get('specific', None) - if specific: - cmds = yield getattr(self, specific)() - ret.extend(cmds or []) - defer.returnValue(ret) - - def update_v9(self): - """Update database from v8 to v9 - - (public_id on file with UNIQUE constraint, files indexes fix, media_type split) - """ - # we have to do a specific update because we can't set UNIQUE constraint when adding a column - # (see https://sqlite.org/lang_altertable.html#alter_table_add_column) - log.info("Database update to v9") - - create = { - 'files': (("id TEXT NOT NULL", "public_id TEXT", "version TEXT NOT NULL", - "parent TEXT NOT NULL", - "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format( - file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY), - "file_hash TEXT", "hash_algo TEXT", "name TEXT NOT NULL", "size INTEGER", - "namespace TEXT", "media_type TEXT", "media_subtype TEXT", - "created DATETIME NOT NULL", "modified DATETIME", - "owner TEXT", "access TEXT", "extra TEXT", "profile_id INTEGER"), - ("PRIMARY KEY (id, version)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", - "UNIQUE (public_id)")), - - } - index = tuple({'files': (('profile_id', 'owner', 'media_type', 'media_subtype'), - ('profile_id', 'owner', 'parent'))}.items()) - # XXX: Sqlite doc recommends to do the other way around (i.e. create new table, - # copy, drop old table then rename), but the RENAME would then add - # "IF NOT EXISTS" which breaks the (admittely fragile) schema comparison. - # TODO: rework sqlite update management, don't try to automatically detect - # update, the database version is now enough. - statements = ["ALTER TABLE files RENAME TO files_old"] - statements.extend(Updater.createData2Raw(create)) - cols = ','.join([col_stmt.split()[0] for col_stmt in create['files'][0] if "public_id" not in col_stmt]) - old_cols = cols[:] - # we need to split mime_type to the new media_type and media_subtype - old_cols = old_cols.replace( - 'media_type,media_subtype', - "substr(mime_type, 0, instr(mime_type,'/')),substr(mime_type, instr(mime_type,'/')+1)" - ) - statements.extend([ - f"INSERT INTO files({cols}) SELECT {old_cols} FROM files_old", - "DROP TABLE files_old", - ]) - statements.extend(Updater.indexData2Raw(index)) - return statements - - def update_v8(self): - """Update database from v7 to v8 (primary keys order changes + indexes)""" - log.info("Database update to v8") - statements = ["PRAGMA foreign_keys = OFF"] - - # here is a copy of create and index data, we can't use "current" table - # because it may change in a future version, which would break the update - # when doing v8 - create = { - 'param_gen': ( - ("category TEXT", "name TEXT", "value TEXT"), - ("PRIMARY KEY (category, name)",)), - 'param_ind': ( - ("category TEXT", "name TEXT", "profile_id INTEGER", "value TEXT"), - ("PRIMARY KEY (profile_id, category, name)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE")), - 'private_ind': ( - ("namespace TEXT", "key TEXT", "profile_id INTEGER", "value TEXT"), - ("PRIMARY KEY (profile_id, namespace, key)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE")), - 'private_ind_bin': ( - ("namespace TEXT", "key TEXT", "profile_id INTEGER", "value BLOB"), - ("PRIMARY KEY (profile_id, namespace, key)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE")), - } - index = ( - ('history', (('profile_id', 'timestamp'), - ('profile_id', 'received_timestamp'))), - ('message', ('history_uid',)), - ('subject', ('history_uid',)), - ('thread', ('history_uid',)), - ('files', ('profile_id', 'mime_type', 'owner', 'parent'))) - - for table in ('param_gen', 'param_ind', 'private_ind', 'private_ind_bin'): - statements.append("ALTER TABLE {0} RENAME TO {0}_old".format(table)) - schema = {table: create[table]} - cols = [d.split()[0] for d in schema[table][0]] - statements.extend(Updater.createData2Raw(schema)) - statements.append("INSERT INTO {table}({cols}) " - "SELECT {cols} FROM {table}_old".format( - table=table, - cols=','.join(cols))) - statements.append("DROP TABLE {}_old".format(table)) - - statements.extend(Updater.indexData2Raw(index)) - statements.append("PRAGMA foreign_keys = ON") - return statements - - @defer.inlineCallbacks - def update_v7(self): - """Update database from v6 to v7 (history unique constraint change)""" - log.info("Database update to v7, this may be long depending on your history " - "size, please be patient.") - - log.info("Some cleaning first") - # we need to fix duplicate stanza_id, as it can result in conflicts with the new schema - # normally database should not contain any, but better safe than sorry. - rows = yield self.dbpool.runQuery( - "SELECT stanza_id, COUNT(*) as c FROM history WHERE stanza_id is not NULL " - "GROUP BY stanza_id HAVING c>1") - if rows: - count = sum([r[1] for r in rows]) - len(rows) - log.info("{count} duplicate stanzas found, cleaning".format(count=count)) - for stanza_id, count in rows: - log.info("cleaning duplicate stanza {stanza_id}".format(stanza_id=stanza_id)) - row_uids = yield self.dbpool.runQuery( - "SELECT uid FROM history WHERE stanza_id = ? LIMIT ?", - (stanza_id, count-1)) - uids = [r[0] for r in row_uids] - yield self.dbpool.runQuery( - "DELETE FROM history WHERE uid IN ({})".format(",".join("?"*len(uids))), - uids) - - def deleteInfo(txn): - # with foreign_keys on, the delete takes ages, so we deactivate it here - # the time to delete info messages from history. - txn.execute("PRAGMA foreign_keys = OFF") - txn.execute("DELETE FROM message WHERE history_uid IN (SELECT uid FROM history WHERE " - "type='info')") - txn.execute("DELETE FROM subject WHERE history_uid IN (SELECT uid FROM history WHERE " - "type='info')") - txn.execute("DELETE FROM thread WHERE history_uid IN (SELECT uid FROM history WHERE " - "type='info')") - txn.execute("DELETE FROM message WHERE history_uid IN (SELECT uid FROM history WHERE " - "type='info')") - txn.execute("DELETE FROM history WHERE type='info'") - # not sure that is is necessary to reactivate here, but in doubt… - txn.execute("PRAGMA foreign_keys = ON") - - log.info('Deleting "info" messages (this can take a while)') - yield self.dbpool.runInteraction(deleteInfo) - - log.info("Cleaning done") - - # we have to rename table we will replace - # tables referencing history need to be replaced to, else reference would - # be to the old table (which will be dropped at the end). This buggy behaviour - # seems to be fixed in new version of Sqlite - yield self.dbpool.runQuery("ALTER TABLE history RENAME TO history_old") - yield self.dbpool.runQuery("ALTER TABLE message RENAME TO message_old") - yield self.dbpool.runQuery("ALTER TABLE subject RENAME TO subject_old") - yield self.dbpool.runQuery("ALTER TABLE thread RENAME TO thread_old") - - # history - query = ("CREATE TABLE history (uid TEXT PRIMARY KEY, stanza_id TEXT, " - "update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, " - "source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, " - "received_timestamp DATETIME, type TEXT, extra BLOB, " - "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, " - "FOREIGN KEY(type) REFERENCES message_types(type), " - "UNIQUE (profile_id, stanza_id, source, dest))") - yield self.dbpool.runQuery(query) - - # message - query = ("CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" - ", message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES " - "history(uid) ON DELETE CASCADE)") - yield self.dbpool.runQuery(query) - - # subject - query = ("CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" - ", subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES " - "history(uid) ON DELETE CASCADE)") - yield self.dbpool.runQuery(query) - - # thread - query = ("CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" - ", thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES " - "history(uid) ON DELETE CASCADE)") - yield self.dbpool.runQuery(query) - - log.info("Now transfering old data to new tables, please be patient.") - - log.info("\nTransfering table history") - query = ("INSERT INTO history (uid, stanza_id, update_uid, profile_id, source, " - "dest, source_res, dest_res, timestamp, received_timestamp, type, extra" - ") SELECT uid, stanza_id, update_uid, profile_id, source, dest, " - "source_res, dest_res, timestamp, received_timestamp, type, extra " - "FROM history_old") - yield self.dbpool.runQuery(query) - - log.info("\nTransfering table message") - query = ("INSERT INTO message (id, history_uid, message, language) SELECT id, " - "history_uid, message, language FROM message_old") - yield self.dbpool.runQuery(query) - - log.info("\nTransfering table subject") - query = ("INSERT INTO subject (id, history_uid, subject, language) SELECT id, " - "history_uid, subject, language FROM subject_old") - yield self.dbpool.runQuery(query) - - log.info("\nTransfering table thread") - query = ("INSERT INTO thread (id, history_uid, thread_id, parent_id) SELECT id" - ", history_uid, thread_id, parent_id FROM thread_old") - yield self.dbpool.runQuery(query) - - log.info("\nRemoving old tables") - # because of foreign keys, tables referencing history_old - # must be deleted first - yield self.dbpool.runQuery("DROP TABLE thread_old") - yield self.dbpool.runQuery("DROP TABLE subject_old") - yield self.dbpool.runQuery("DROP TABLE message_old") - yield self.dbpool.runQuery("DROP TABLE history_old") - log.info("\nReducing database size (this can take a while)") - yield self.dbpool.runQuery("VACUUM") - log.info("Database update done :)") - - @defer.inlineCallbacks - def update_v3(self): - """Update database from v2 to v3 (message refactoring)""" - # XXX: this update do all the messages in one huge transaction - # this is really memory consuming, but was OK on a reasonably - # big database for tests. If issues are happening, we can cut it - # in smaller transactions using LIMIT and by deleting already updated - # messages - log.info("Database update to v3, this may take a while") - - # we need to fix duplicate timestamp, as it can result in conflicts with the new schema - rows = yield self.dbpool.runQuery("SELECT timestamp, COUNT(*) as c FROM history GROUP BY timestamp HAVING c>1") - if rows: - log.info("fixing duplicate timestamp") - fixed = [] - for timestamp, __ in rows: - ids_rows = yield self.dbpool.runQuery("SELECT id from history where timestamp=?", (timestamp,)) - for idx, (id_,) in enumerate(ids_rows): - fixed.append(id_) - yield self.dbpool.runQuery("UPDATE history SET timestamp=? WHERE id=?", (float(timestamp) + idx * 0.001, id_)) - log.info("fixed messages with ids {}".format(', '.join([str(id_) for id_ in fixed]))) - - def historySchema(txn): - log.info("History schema update") - txn.execute("ALTER TABLE history RENAME TO tmp_sat_update") - txn.execute("CREATE TABLE history (uid TEXT PRIMARY KEY, update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, received_timestamp DATETIME, type TEXT, extra BLOB, FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, FOREIGN KEY(type) REFERENCES message_types(type), UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res))") - txn.execute("INSERT INTO history (uid, profile_id, source, dest, source_res, dest_res, timestamp, type, extra) SELECT id, profile_id, source, dest, source_res, dest_res, timestamp, type, extra FROM tmp_sat_update") - - yield self.dbpool.runInteraction(historySchema) - - def newTables(txn): - log.info("Creating new tables") - txn.execute("CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)") - txn.execute("CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)") - txn.execute("CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)") - - yield self.dbpool.runInteraction(newTables) - - log.info("inserting new message type") - yield self.dbpool.runQuery("INSERT INTO message_types VALUES (?)", ('info',)) - - log.info("messages update") - rows = yield self.dbpool.runQuery("SELECT id, timestamp, message, extra FROM tmp_sat_update") - total = len(rows) - - def updateHistory(txn, queries): - for query, args in iter(queries): - txn.execute(query, args) - - queries = [] - for idx, row in enumerate(rows, 1): - if idx % 1000 == 0 or total - idx == 0: - log.info("preparing message {}/{}".format(idx, total)) - id_, timestamp, message, extra = row - try: - extra = pickle.loads(str(extra or "")) - except EOFError: - extra = {} - except Exception: - log.warning("Can't handle extra data for message id {}, ignoring it".format(id_)) - extra = {} - - queries.append(("INSERT INTO message(history_uid, message) VALUES (?,?)", (id_, message))) - - try: - subject = extra.pop('subject') - except KeyError: - pass - else: - try: - subject = subject - except UnicodeEncodeError: - log.warning("Error while decoding subject, ignoring it") - del extra['subject'] - else: - queries.append(("INSERT INTO subject(history_uid, subject) VALUES (?,?)", (id_, subject))) - - received_timestamp = extra.pop('timestamp', None) - try: - del extra['archive'] - except KeyError: - # archive was not used - pass - - queries.append(("UPDATE history SET received_timestamp=?,extra=? WHERE uid=?",(id_, received_timestamp, sqlite3.Binary(pickle.dumps(extra, 0))))) - - yield self.dbpool.runInteraction(updateHistory, queries) - - log.info("Dropping temporary table") - yield self.dbpool.runQuery("DROP TABLE tmp_sat_update") - log.info("Database update finished :)") - - def update2raw_v2(self): - """Update the database from v1 to v2 (add passwords encryptions): - - - the XMPP password value is re-used for the profile password (new parameter) - - the profile password is stored hashed - - the XMPP password is stored encrypted, with the profile password as key - - as there are no other stored passwords yet, it is enough, otherwise we - would need to encrypt the other passwords as it's done for XMPP password - """ - xmpp_pass_path = ('Connection', 'Password') - - def encrypt_values(values): - ret = [] - list_ = [] - - def prepare_queries(result, xmpp_password): - try: - id_ = result[0][0] - except IndexError: - log.error("Profile of id %d is referenced in 'param_ind' but it doesn't exist!" % profile_id) - return defer.succeed(None) - - sat_password = xmpp_password - sat_cipher = PasswordHasher.hash(sat_password) - personal_key = BlockCipher.getRandomKey(base64=True) - personal_cipher = BlockCipher.encrypt(sat_password, personal_key) - xmpp_cipher = BlockCipher.encrypt(personal_key, xmpp_password) - - ret.append("INSERT INTO param_ind(category,name,profile_id,value) VALUES ('%s','%s',%s,'%s')" % - (C.PROFILE_PASS_PATH[0], C.PROFILE_PASS_PATH[1], id_, sat_cipher)) - - ret.append("INSERT INTO private_ind(namespace,key,profile_id,value) VALUES ('%s','%s',%s,'%s')" % - (C.MEMORY_CRYPTO_NAMESPACE, C.MEMORY_CRYPTO_KEY, id_, personal_cipher)) - - ret.append("REPLACE INTO param_ind(category,name,profile_id,value) VALUES ('%s','%s',%s,'%s')" % - (xmpp_pass_path[0], xmpp_pass_path[1], id_, xmpp_cipher)) - - - for profile_id, xmpp_password in values: - d = self.dbpool.runQuery("SELECT id FROM profiles WHERE id=?", (profile_id,)) - d.addCallback(prepare_queries, xmpp_password) - list_.append(d) - - d_list = defer.DeferredList(list_) - d_list.addCallback(lambda __: ret) - return d_list - - def updateLiberviaConf(values): - try: - profile_id = values[0][0] - except IndexError: - return # no profile called "libervia" - - def cb(selected): - try: - password = selected[0][0] - except IndexError: - log.error("Libervia profile exists but no password is set! Update Libervia configuration will be skipped.") - return - fixConfigOption('libervia', 'passphrase', password, False) - d = self.dbpool.runQuery("SELECT value FROM param_ind WHERE category=? AND name=? AND profile_id=?", xmpp_pass_path + (profile_id,)) - return d.addCallback(cb) - - d = self.dbpool.runQuery("SELECT id FROM profiles WHERE name='libervia'") - d.addCallback(updateLiberviaConf) - d.addCallback(lambda __: self.dbpool.runQuery("SELECT profile_id,value FROM param_ind WHERE category=? AND name=?", xmpp_pass_path)) - d.addCallback(encrypt_values) - return d diff -r 0985c47ffd96 -r f9a5b810f14d sat/tools/utils.py --- a/sat/tools/utils.py Thu Jun 03 15:15:11 2021 +0200 +++ b/sat/tools/utils.py Thu Jun 03 15:20:47 2021 +0200 @@ -28,7 +28,7 @@ import inspect import textwrap import functools -from asyncio import iscoroutine +import asyncio from twisted.python import procutils, failure from twisted.internet import defer from sat.core.constants import Const as C @@ -102,7 +102,7 @@ except Exception as e: return defer.fail(failure.Failure(e)) else: - if iscoroutine(ret): + if asyncio.iscoroutine(ret): return defer.ensureDeferred(ret) elif isinstance(ret, defer.Deferred): return ret @@ -112,6 +112,20 @@ return defer.succeed(ret) +def aio(func): + """Decorator to return a Deferred from asyncio coroutine + + Functions with this decorator are run in asyncio context + """ + def wrapper(*args, **kwargs): + return defer.Deferred.fromFuture(asyncio.ensure_future(func(*args, **kwargs))) + return wrapper + + +def as_future(d): + return d.asFuture(asyncio.get_event_loop()) + + def xmpp_date(timestamp=None, with_time=True): """Return date according to XEP-0082 specification diff -r 0985c47ffd96 -r f9a5b810f14d setup.py --- a/setup.py Thu Jun 03 15:15:11 2021 +0200 +++ b/setup.py Thu Jun 03 15:20:47 2021 +0200 @@ -52,6 +52,8 @@ 'omemo >= 0.11.0', 'omemo-backend-signal', 'pyyaml', + 'sqlalchemy >= 1.4', + 'aiosqlite', ] extras_require = { diff -r 0985c47ffd96 -r f9a5b810f14d twisted/plugins/sat_plugin.py --- a/twisted/plugins/sat_plugin.py Thu Jun 03 15:15:11 2021 +0200 +++ b/twisted/plugins/sat_plugin.py Thu Jun 03 15:20:47 2021 +0200 @@ -63,10 +63,11 @@ pass def makeService(self, options): - from twisted.internet import gireactor - gireactor.install() + from twisted.internet import asyncioreactor + asyncioreactor.install() self.setDebugger() - # XXX: SAT must be imported after log configuration, because it write stuff to logs + # XXX: Libervia must be imported after log configuration, + # because it write stuff to logs initialise(options.parent) from sat.core.sat_main import SAT return SAT()