Mercurial > libervia-backend
view libervia/backend/memory/sqla.py @ 4303:a7ec325246fb
component email-gateway: first draft:
Initial implementation of the Email Gateway.
This component uses XEP-0100 for registration. Upon registration and subsequent startups,
a connection is made to registered IMAP services, and incoming emails (in `INBOX`
mailboxes) are immediately forwarded as XMPP messages.
In the opposite direction, an SMTP connection is established to send emails on incoming
XMPP messages.
rel 449
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 06 Sep 2024 18:07:17 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia: an XMPP client # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import asyncio from asyncio.subprocess import PIPE import copy from datetime import datetime import json from pathlib import Path import sys import time from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union from alembic import config as al_config, script as al_script from alembic.runtime import migration as al_migration from sqlalchemy import and_, delete, event, func, or_, update from sqlalchemy import Integer, literal_column, text from sqlalchemy.dialects.sqlite import insert from sqlalchemy.engine import Connection, Engine from sqlalchemy.exc import IntegrityError, NoResultFound from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine from sqlalchemy.future import select from sqlalchemy.orm import ( contains_eager, joinedload, selectinload, sessionmaker, subqueryload, ) from sqlalchemy.orm.attributes import Mapped from sqlalchemy.orm.decl_api import DeclarativeMeta from sqlalchemy.sql.functions import coalesce, count, now, sum as sum_ from twisted.internet import defer from twisted.words.protocols.jabber import jid from twisted.words.xish import domish from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger from libervia.backend.memory import migration from libervia.backend.memory import sqla_config from libervia.backend.memory.sqla_mapping import ( Base, Component, File, History, Message, NOT_IN_EXTRA, Notification, NotificationPriority, NotificationStatus, NotificationType, ParamGen, ParamInd, PrivateGen, PrivateGenBin, PrivateInd, PrivateIndBin, Profile, PubsubItem, PubsubNode, Subject, SyncState, Thread, get_profile_by_id, profiles, ) from libervia.backend.tools.common import uri from libervia.backend.tools.utils import aio, as_future log = getLogger(__name__) migration_path = Path(migration.__file__).parent #: mapping of Libervia search query operators to SQLAlchemy method name OP_MAP = { "==": "__eq__", "eq": "__eq__", "!=": "__ne__", "ne": "__ne__", ">": "__gt__", "gt": "__gt__", "<": "__le__", "le": "__le__", "between": "between", "in": "in_", "not_in": "not_in", "overlap": "in_", "ioverlap": "in_", "disjoint": "in_", "idisjoint": "in_", "like": "like", "ilike": "ilike", "not_like": "notlike", "not_ilike": "notilike", } @event.listens_for(Engine, "connect") def set_sqlite_pragma(dbapi_connection, connection_record): cursor = dbapi_connection.cursor() cursor.execute("PRAGMA foreign_keys=ON") cursor.close() class Storage: def __init__(self): self.initialized = defer.Deferred() # we keep cache for the profiles (key: profile name, value: profile id) # profile id to component entry point self.components: Dict[int, str] = {} @property def profiles(self): return profiles def get_profile_by_id(self, profile_id): return get_profile_by_id(profile_id) async def migrate_apply(self, *args: str, log_output: bool = False) -> None: """Do a migration command Commands are applied by running Alembic in a subprocess. Arguments are alembic executables commands @param log_output: manage stdout and stderr: - if False, stdout and stderr are buffered, and logged only in case of error - if True, stdout and stderr will be logged during the command execution @raise exceptions.DatabaseError: something went wrong while running the process """ stdout, stderr = 2 * (None,) if log_output else 2 * (PIPE,) proc = await asyncio.create_subprocess_exec( sys.executable, "-m", "alembic", *args, stdout=stdout, stderr=stderr, cwd=migration_path, ) log_out, log_err = await proc.communicate() if proc.returncode != 0: msg = _("Can't {operation} database (exit code {exit_code})").format( operation=args[0], exit_code=proc.returncode ) if log_out or log_err: msg += f":\nstdout: {log_out.decode()}\nstderr: {log_err.decode()}" log.error(msg) raise exceptions.DatabaseError(msg) async def create_db(self, engine: AsyncEngine, db_config: dict) -> None: """Create a new database The database is generated from SQLAlchemy model, then stamped by Alembic """ # the dir may not exist if it's not the XDG recommended one db_config["path"].parent.mkdir(0o700, True, True) async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) log.debug("stamping the database") await self.migrate_apply("stamp", "head") log.debug("stamping done") def _check_db_is_up_to_date(self, conn: Connection) -> bool: al_ini_path = migration_path / "alembic.ini" al_cfg = al_config.Config(al_ini_path) directory = al_script.ScriptDirectory.from_config(al_cfg) context = al_migration.MigrationContext.configure(conn) return set(context.get_current_heads()) == set(directory.get_heads()) def _sqlite_set_journal_mode_wal(self, conn: Connection) -> None: """Check if journal mode is WAL, and set it if necesssary""" result = conn.execute(text("PRAGMA journal_mode")) if result.scalar() != "wal": log.info("WAL mode not activated, activating it") conn.execute(text("PRAGMA journal_mode=WAL")) async def check_and_update_db(self, engine: AsyncEngine, db_config: dict) -> None: """Check that database is up-to-date, and update if necessary""" async with engine.connect() as conn: up_to_date = await conn.run_sync(self._check_db_is_up_to_date) if up_to_date: log.debug("Database is up-to-date") else: log.info("Database needs to be updated") log.info("updating…") await self.migrate_apply("upgrade", "head", log_output=True) log.info("Database is now up-to-date") @aio async def initialise(self) -> None: log.info(_("Connecting database")) db_config = sqla_config.get_db_config() engine = create_async_engine( db_config["url"], future=True, json_serializer=lambda obj: json.dumps(obj, ensure_ascii=False), ) new_base = not db_config["path"].exists() if new_base: log.info(_("The database is new, creating the tables")) await self.create_db(engine, db_config) else: await self.check_and_update_db(engine, db_config) async with engine.connect() as conn: await conn.run_sync(self._sqlite_set_journal_mode_wal) self.session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession) async with self.session() as session: result = await session.execute(select(Profile)) for p in result.scalars(): self.profiles[p.name] = p.id result = await session.execute(select(Component)) for c in result.scalars(): self.components[c.profile_id] = c.entry_point self.initialized.callback(None) ## Generic @aio async def get( self, client: SatXMPPEntity, db_cls: DeclarativeMeta, db_id_col: Mapped, id_value: Any, joined_loads=None, ) -> Optional[DeclarativeMeta]: stmt = select(db_cls).where(db_id_col == id_value) if client is not None: stmt = stmt.filter_by(profile_id=self.profiles[client.profile]) if joined_loads is not None: for joined_load in joined_loads: stmt = stmt.options(joinedload(joined_load)) async with self.session() as session: result = await session.execute(stmt) if joined_loads is not None: result = result.unique() return result.scalar_one_or_none() @aio async def add(self, db_obj: DeclarativeMeta) -> None: """Add an object to database""" async with self.session() as session: async with session.begin(): session.add(db_obj) @aio async def delete( self, db_obj: Union[DeclarativeMeta, List[DeclarativeMeta]], session_add: Optional[List[DeclarativeMeta]] = None, ) -> None: """Delete an object from database @param db_obj: object to delete or list of objects to delete @param session_add: other objects to add to session. This is useful when parents of deleted objects needs to be updated too, or if other objects needs to be updated in the same transaction. """ if not db_obj: return if not isinstance(db_obj, list): db_obj = [db_obj] async with self.session() as session: async with session.begin(): if session_add is not None: for obj in session_add: session.add(obj) for obj in db_obj: await session.delete(obj) await session.commit() ## Profiles def get_profiles_list(self) -> List[str]: """ "Return list of all registered profiles""" return list(self.profiles.keys()) def has_profile(self, profile_name: str) -> bool: """return True if profile_name exists @param profile_name: name of the profile to check """ return profile_name in self.profiles def profile_is_component(self, profile_name: str) -> bool: try: return self.profiles[profile_name] in self.components except KeyError: raise exceptions.NotFound("the requested profile doesn't exists") def get_entry_point(self, profile_name: str) -> str: try: return self.components[self.profiles[profile_name]] except KeyError: raise exceptions.NotFound( "the requested profile doesn't exists or is not a component" ) @aio async def create_profile(self, name: str, component_ep: Optional[str] = None) -> None: """Create a new profile @param name: name of the profile @param component: if not None, must point to a component entry point """ async with self.session() as session: profile = Profile(name=name) async with session.begin(): session.add(profile) self.profiles[profile.name] = profile.id if component_ep is not None: async with session.begin(): component = Component(profile=profile, entry_point=component_ep) session.add(component) self.components[profile.id] = component_ep return profile @aio async def delete_profile(self, name: str) -> None: """Delete profile @param name: name of the profile """ async with self.session() as session: result = await session.execute(select(Profile).where(Profile.name == name)) profile = result.scalar() await session.delete(profile) await session.commit() del self.profiles[profile.name] if profile.id in self.components: del self.components[profile.id] log.info(_("Profile {name!r} deleted").format(name=name)) ## Params @aio async def load_gen_params(self, params_gen: dict) -> None: """Load general parameters @param params_gen: dictionary to fill """ log.debug(_("loading general parameters from database")) async with self.session() as session: result = await session.execute(select(ParamGen)) for p in result.scalars(): params_gen[(p.category, p.name)] = p.value @aio async def load_ind_params(self, params_ind: dict, profile: str) -> None: """Load individual parameters @param params_ind: dictionary to fill @param profile: a profile which *must* exist """ log.debug(_("loading individual parameters from database")) async with self.session() as session: result = await session.execute( select(ParamInd).where(ParamInd.profile_id == self.profiles[profile]) ) for p in result.scalars(): params_ind[(p.category, p.name)] = p.value @aio async def get_ind_param( self, category: str, name: str, profile: str ) -> Optional[str]: """Ask database for the value of one specific individual parameter @param category: category of the parameter @param name: name of the parameter @param profile: %(doc_profile)s """ async with self.session() as session: result = await session.execute( select(ParamInd.value).filter_by( category=category, name=name, profile_id=self.profiles[profile] ) ) return result.scalar_one_or_none() @aio async def get_ind_param_values(self, category: str, name: str) -> Dict[str, str]: """Ask database for the individual values of a parameter for all profiles @param category: category of the parameter @param name: name of the parameter @return dict: profile => value map """ async with self.session() as session: result = await session.execute( select(ParamInd) .filter_by(category=category, name=name) .options(subqueryload(ParamInd.profile)) ) return {param.profile.name: param.value for param in result.scalars()} @aio async def set_gen_param(self, category: str, name: str, value: Optional[str]) -> None: """Save the general parameters in database @param category: category of the parameter @param name: name of the parameter @param value: value to set """ async with self.session() as session: stmt = ( insert(ParamGen) .values(category=category, name=name, value=value) .on_conflict_do_update( index_elements=(ParamGen.category, ParamGen.name), set_={ParamGen.value: value}, ) ) await session.execute(stmt) await session.commit() @aio async def set_ind_param( self, category: str, name: str, value: Optional[str], profile: str ) -> None: """Save the individual parameters in database @param category: category of the parameter @param name: name of the parameter @param value: value to set @param profile: a profile which *must* exist """ async with self.session() as session: stmt = ( insert(ParamInd) .values( category=category, name=name, profile_id=self.profiles[profile], value=value, ) .on_conflict_do_update( index_elements=( ParamInd.category, ParamInd.name, ParamInd.profile_id, ), set_={ParamInd.value: value}, ) ) await session.execute(stmt) await session.commit() def _jid_filter(self, jid_: jid.JID, dest: bool = False): """Generate condition to filter on a JID, using relevant columns @param dest: True if it's the destinee JID, otherwise it's the source one @param jid_: JID to filter by """ if jid_.resource: if dest: return and_( History.dest == jid_.userhost(), History.dest_res == jid_.resource ) else: return and_( History.source == jid_.userhost(), History.source_res == jid_.resource ) else: if dest: return History.dest == jid_.userhost() else: return History.source == jid_.userhost() @aio async def history_get( self, from_jid: Optional[jid.JID], to_jid: Optional[jid.JID], limit: Optional[int] = None, between: bool = True, filters: Optional[Dict[str, str]] = None, profile: Optional[str] = None, ) -> List[Tuple[str, int, str, str, Dict[str, str], Dict[str, str], str, str, str]]: """Retrieve messages in history @param from_jid: source JID (full, or bare for catchall) @param to_jid: dest JID (full, or bare for catchall) @param limit: maximum number of messages to get: - 0 for no message (returns the empty list) - None for unlimited @param between: confound source and dest (ignore the direction) @param filters: pattern to filter the history results @return: list of messages as in [message_new], minus the profile which is already known. """ # we have to set a default value to profile because it's last argument # and thus follow other keyword arguments with default values # but None should not be used for it assert profile is not None if limit == 0: return [] if filters is None: filters = {} stmt = ( select(History) .filter_by(profile_id=self.profiles[profile]) .outerjoin(History.messages) .outerjoin(History.subjects) .outerjoin(History.thread) .options( contains_eager(History.messages), contains_eager(History.subjects), contains_eager(History.thread), ) .order_by( # timestamp may be identical for 2 close messages (specially when delay is # used) that's why we order ties by received_timestamp. We'll reverse the # order when returning the result. We use DESC here so LIMIT keep the last # messages History.timestamp.desc(), History.received_timestamp.desc(), ) ) if not from_jid and not to_jid: # no jid specified, we want all one2one communications pass elif between: if not from_jid or not to_jid: # we only have one jid specified, we check all messages # from or to this jid jid_ = from_jid or to_jid stmt = stmt.where( or_(self._jid_filter(jid_), self._jid_filter(jid_, dest=True)) ) else: # we have 2 jids specified, we check all communications between # those 2 jids stmt = stmt.where( or_( and_( self._jid_filter(from_jid), self._jid_filter(to_jid, dest=True), ), and_( self._jid_filter(to_jid), self._jid_filter(from_jid, dest=True), ), ) ) else: # we want one communication in specific direction (from somebody or # to somebody). if from_jid is not None: stmt = stmt.where(self._jid_filter(from_jid)) if to_jid is not None: stmt = stmt.where(self._jid_filter(to_jid, dest=True)) if filters: if "timestamp_start" in filters: stmt = stmt.where(History.timestamp >= float(filters["timestamp_start"])) if "before_uid" in filters: # orignially this query was using SQLITE's rowid. This has been changed # to use coalesce(received_timestamp, timestamp) to be SQL engine independant stmt = stmt.where( coalesce(History.received_timestamp, History.timestamp) < ( select( coalesce(History.received_timestamp, History.timestamp) ).filter_by(uid=filters["before_uid"]) ).scalar_subquery() ) if "body" in filters: # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html stmt = stmt.where(Message.message.like(f"%{filters['body']}%")) if "search" in filters: search_term = f"%{filters['search']}%" stmt = stmt.where( or_( Message.message.like(search_term), History.source_res.like(search_term), ) ) if "types" in filters: types = filters["types"].split() stmt = stmt.where(History.type.in_(types)) if "not_types" in filters: types = filters["not_types"].split() stmt = stmt.where(History.type.not_in(types)) if "last_stanza_id" in filters: # this request get the last message with a "stanza_id" that we # have in history. This is mainly used to retrieve messages sent # while we were offline, using MAM (XEP-0313). if filters["last_stanza_id"] is not True or limit != 1: raise ValueError("Unexpected values for last_stanza_id filter") stmt = stmt.where(History.stanza_id.is_not(None)) if "id" in filters: stmt = stmt.where(History.uid == filters["id"]) if "origin_id" in filters: stmt = stmt.where(History.origin_id == filters["origin_id"]) if limit is not None: stmt = stmt.limit(limit) async with self.session() as session: result = await session.execute(stmt) result = result.scalars().unique().all() result.reverse() return [h.as_tuple() for h in result] @aio async def add_to_history(self, data: dict, profile: str) -> None: """Store a new message in history @param data: message data as build by SatMessageProtocol.onMessage """ extra = {k: v for k, v in data["extra"].items() if k not in NOT_IN_EXTRA} messages = [ Message(message=mess, language=lang) for lang, mess in data["message"].items() ] subjects = [ Subject(subject=mess, language=lang) for lang, mess in data["subject"].items() ] if "thread" in data["extra"]: thread = Thread( thread_id=data["extra"]["thread"], parent_id=data["extra"].get["thread_parent"], ) else: thread = None try: async with self.session() as session: async with session.begin(): session.add( History( uid=data["uid"], origin_id=data["extra"].get("origin_id"), stanza_id=data["extra"].get("stanza_id"), update_uid=data["extra"].get("update_uid"), profile_id=self.profiles[profile], source_jid=data["from"], dest_jid=data["to"], timestamp=data["timestamp"], received_timestamp=data.get("received_timestamp"), type=data["type"], extra=extra, messages=messages, subjects=subjects, thread=thread, ) ) except IntegrityError as e: if "unique" in str(e.orig).lower(): log.debug( f"message {data['uid']!r} is already in history, not storing it again" ) else: log.error(f"Can't store message {data['uid']!r} in history: {e}") except Exception as e: log.critical( f"Can't store message, unexpected exception (uid: {data['uid']}): {e}" ) ## Private values def _get_private_class(self, binary, profile): """Get ORM class to use for private values""" if profile is None: return PrivateGenBin if binary else PrivateGen else: return PrivateIndBin if binary else PrivateInd @aio async def get_privates( self, namespace: str, keys: Optional[Iterable[str]] = None, binary: bool = False, profile: Optional[str] = None, ) -> Dict[str, Any]: """Get private value(s) from databases @param namespace: namespace of the values @param keys: keys of the values to get None to get all keys/values @param binary: True to deserialise binary values @param profile: profile to use for individual values None to use general values @return: gotten keys/values """ if keys is not None: keys = list(keys) log.debug( f"getting {'general' if profile is None else 'individual'}" f"{' binary' if binary else ''} private values from database for namespace " f"{namespace}{f' with keys {keys!r}' if keys is not None else ''}" ) cls = self._get_private_class(binary, profile) stmt = select(cls).filter_by(namespace=namespace) if keys: stmt = stmt.where(cls.key.in_(list(keys))) if profile is not None: stmt = stmt.filter_by(profile_id=self.profiles[profile]) async with self.session() as session: result = await session.execute(stmt) return {p.key: p.value for p in result.scalars()} @aio async def set_private_value( self, namespace: str, key: str, value: Any, binary: bool = False, profile: Optional[str] = None, ) -> None: """Set a private value in database @param namespace: namespace of the values @param key: key of the value to set @param value: value to set @param binary: True if it's a binary values binary values need to be serialised, used for everything but strings @param profile: profile to use for individual value if None, it's a general value """ cls = self._get_private_class(binary, profile) values = {"namespace": namespace, "key": key, "value": value} index_elements = [cls.namespace, cls.key] if profile is not None: values["profile_id"] = self.profiles[profile] index_elements.append(cls.profile_id) async with self.session() as session: await session.execute( insert(cls) .values(**values) .on_conflict_do_update( index_elements=index_elements, set_={cls.value: value} ) ) await session.commit() @aio async def del_private_value( self, namespace: str, key: str, binary: bool = False, profile: Optional[str] = None, ) -> None: """Delete private value from database @param category: category of the privateeter @param key: key of the private value @param binary: True if it's a binary values @param profile: profile to use for individual value if None, it's a general value """ cls = self._get_private_class(binary, profile) stmt = delete(cls).filter_by(namespace=namespace, key=key) if profile is not None: stmt = stmt.filter_by(profile_id=self.profiles[profile]) async with self.session() as session: await session.execute(stmt) await session.commit() @aio async def del_private_namespace( self, namespace: str, binary: bool = False, profile: Optional[str] = None ) -> None: """Delete all data from a private namespace Be really cautious when you use this method, as all data with given namespace are removed. Params are the same as for del_private_value """ cls = self._get_private_class(binary, profile) stmt = delete(cls).filter_by(namespace=namespace) if profile is not None: stmt = stmt.filter_by(profile_id=self.profiles[profile]) async with self.session() as session: await session.execute(stmt) await session.commit() ## Files @aio async def get_files( self, client: Optional[SatXMPPEntity], file_id: Optional[str] = None, version: Optional[str] = "", parent: Optional[str] = None, type_: Optional[str] = None, file_hash: Optional[str] = None, hash_algo: Optional[str] = None, name: Optional[str] = None, namespace: Optional[str] = None, mime_type: Optional[str] = None, public_id: Optional[str] = None, owner: Optional[jid.JID] = None, access: Optional[dict] = None, projection: Optional[List[str]] = None, unique: bool = False, ) -> List[dict]: """Retrieve files with with given filters @param file_id: id of the file None to ignore @param version: version of the file None to ignore empty string to look for current version @param parent: id of the directory containing the files None to ignore empty string to look for root files/directories @param projection: name of columns to retrieve None to retrieve all @param unique: if True will remove duplicates other params are the same as for [set_file] @return: files corresponding to filters """ if projection is None: projection = [ "id", "version", "parent", "type", "file_hash", "hash_algo", "name", "size", "namespace", "media_type", "media_subtype", "public_id", "created", "modified", "owner", "access", "extra", ] stmt = select(*[getattr(File, f) for f in projection]) if unique: stmt = stmt.distinct() if client is not None: stmt = stmt.filter_by(profile_id=self.profiles[client.profile]) else: if public_id is None: raise exceptions.InternalError( "client can only be omitted when public_id is set" ) if file_id is not None: stmt = stmt.filter_by(id=file_id) if version is not None: stmt = stmt.filter_by(version=version) if parent is not None: stmt = stmt.filter_by(parent=parent) if type_ is not None: stmt = stmt.filter_by(type=type_) if file_hash is not None: stmt = stmt.filter_by(file_hash=file_hash) if hash_algo is not None: stmt = stmt.filter_by(hash_algo=hash_algo) if name is not None: stmt = stmt.filter_by(name=name) if namespace is not None: stmt = stmt.filter_by(namespace=namespace) if mime_type is not None: if "/" in mime_type: media_type, media_subtype = mime_type.split("/", 1) stmt = stmt.filter_by(media_type=media_type, media_subtype=media_subtype) else: stmt = stmt.filter_by(media_type=mime_type) if public_id is not None: stmt = stmt.filter_by(public_id=public_id) if owner is not None: stmt = stmt.filter_by(owner=owner) if access is not None: raise NotImplementedError("Access check is not implemented yet") # a JSON comparison is needed here async with self.session() as session: result = await session.execute(stmt) return [r._asdict() for r in result] @aio async def set_file( self, client: SatXMPPEntity, name: str, file_id: str, version: str = "", parent: str = "", type_: str = C.FILE_TYPE_FILE, file_hash: Optional[str] = None, hash_algo: Optional[str] = None, size: int = None, namespace: Optional[str] = None, mime_type: Optional[str] = None, public_id: Optional[str] = None, created: Optional[float] = None, modified: Optional[float] = None, owner: Optional[jid.JID] = None, access: Optional[dict] = None, extra: Optional[dict] = None, ) -> None: """Set a file metadata @param client: client owning the file @param name: name of the file (must not contain "/") @param file_id: unique id of the file @param version: version of this file @param parent: id of the directory containing this file Empty string if it is a root file/directory @param type_: one of: - file - directory @param file_hash: unique hash of the payload @param hash_algo: algorithm used for hashing the file (usually sha-256) @param size: size in bytes @param namespace: identifier (human readable is better) to group files for instance, namespace could be used to group files in a specific photo album @param mime_type: media type of the file, or None if not known/guessed @param public_id: ID used to server the file publicly via HTTP @param created: UNIX time of creation @param modified: UNIX time of last modification, or None to use created date @param owner: jid of the owner of the file (mainly useful for component) @param access: serialisable dictionary with access rules. See [memory.memory] for details @param extra: serialisable dictionary of any extra data will be encoded to json in database """ if mime_type is None: media_type = media_subtype = None elif "/" in mime_type: media_type, media_subtype = mime_type.split("/", 1) else: media_type, media_subtype = mime_type, None async with self.session() as session: async with session.begin(): session.add( File( id=file_id, version=version.strip(), parent=parent, type=type_, file_hash=file_hash, hash_algo=hash_algo, name=name, size=size, namespace=namespace, media_type=media_type, media_subtype=media_subtype, public_id=public_id, created=time.time() if created is None else created, modified=modified, owner=owner, access=access, extra=extra, profile_id=self.profiles[client.profile], ) ) @aio async def file_get_used_space(self, client: SatXMPPEntity, owner: jid.JID) -> int: async with self.session() as session: result = await session.execute( select(sum_(File.size)).filter_by( owner=owner, type=C.FILE_TYPE_FILE, profile_id=self.profiles[client.profile], ) ) return result.scalar_one_or_none() or 0 @aio async def file_delete(self, file_id: str) -> None: """Delete file metadata from the database @param file_id: id of the file to delete NOTE: file itself must still be removed, this method only handle metadata in database """ async with self.session() as session: await session.execute(delete(File).filter_by(id=file_id)) await session.commit() @aio async def file_update( self, file_id: str, column: str, update_cb: Callable[[dict], None] ) -> None: """Update a column value using a method to avoid race conditions the older value will be retrieved from database, then update_cb will be applied to update it, and file will be updated checking that older value has not been changed meanwhile by an other user. If it has changed, it tries again a couple of times before failing @param column: column name (only "access" or "extra" are allowed) @param update_cb: method to update the value of the colum the method will take older value as argument, and must update it in place update_cb must not care about serialization, it get the deserialized data (i.e. a Python object) directly @raise exceptions.NotFound: there is not file with this id """ if column not in ("access", "extra"): raise exceptions.InternalError("bad column name") orm_col = getattr(File, column) for i in range(5): async with self.session() as session: try: value = ( await session.execute(select(orm_col).filter_by(id=file_id)) ).scalar_one() except NoResultFound: raise exceptions.NotFound old_value = copy.deepcopy(value) update_cb(value) stmt = update(File).filter_by(id=file_id).values({column: value}) if not old_value: # because JsonDefaultDict convert NULL to an empty dict, we have to # test both for empty dict and None when we have an empty dict stmt = stmt.where((orm_col == None) | (orm_col == old_value)) else: stmt = stmt.where(orm_col == old_value) result = await session.execute(stmt) await session.commit() if result.rowcount == 1: break log.warning( _( "table not updated, probably due to race condition, trying again " "({tries})" ).format(tries=i + 1) ) else: raise exceptions.DatabaseError( _("Can't update file {file_id} due to race condition").format( file_id=file_id ) ) @aio async def get_pubsub_node( self, client: SatXMPPEntity, service: jid.JID, name: str, with_items: bool = False, with_subscriptions: bool = False, create: bool = False, create_kwargs: Optional[dict] = None, ) -> Optional[PubsubNode]: """Retrieve a PubsubNode from DB @param service: service hosting the node @param name: node's name @param with_items: retrieve items in the same query @param with_subscriptions: retrieve subscriptions in the same query @param create: if the node doesn't exist in DB, create it @param create_kwargs: keyword arguments to use with ``set_pubsub_node`` if the node needs to be created. """ async with self.session() as session: stmt = select(PubsubNode).filter_by( service=service, name=name, profile_id=self.profiles[client.profile], ) if with_items: stmt = stmt.options(joinedload(PubsubNode.items)) if with_subscriptions: stmt = stmt.options(joinedload(PubsubNode.subscriptions)) result = await session.execute(stmt) ret = result.unique().scalar_one_or_none() if ret is None and create: # we auto-create the node if create_kwargs is None: create_kwargs = {} try: return await as_future( self.set_pubsub_node(client, service, name, **create_kwargs) ) except IntegrityError as e: if "unique" in str(e.orig).lower(): # the node may already exist, if it has been created just after # get_pubsub_node above log.debug("ignoring UNIQUE constraint error") cached_node = await as_future( self.get_pubsub_node( client, service, name, with_items=with_items, with_subscriptions=with_subscriptions, ) ) else: raise e else: return ret @aio async def set_pubsub_node( self, client: SatXMPPEntity, service: jid.JID, name: str, analyser: Optional[str] = None, type_: Optional[str] = None, subtype: Optional[str] = None, subscribed: bool = False, ) -> PubsubNode: node = PubsubNode( profile_id=self.profiles[client.profile], service=service, name=name, subscribed=subscribed, analyser=analyser, type_=type_, subtype=subtype, subscriptions=[], ) async with self.session() as session: async with session.begin(): session.add(node) return node @aio async def update_pubsub_node_sync_state( self, node: PubsubNode, state: SyncState ) -> None: async with self.session() as session: async with session.begin(): await session.execute( update(PubsubNode) .filter_by(id=node.id) .values( sync_state=state, sync_state_updated=time.time(), ) ) @aio async def delete_pubsub_node( self, profiles: Optional[List[str]], services: Optional[List[jid.JID]], names: Optional[List[str]], ) -> None: """Delete items cached for a node @param profiles: profile names from which nodes must be deleted. None to remove nodes from ALL profiles @param services: JIDs of pubsub services from which nodes must be deleted. None to remove nodes from ALL services @param names: names of nodes which must be deleted. None to remove ALL nodes whatever is their names """ stmt = delete(PubsubNode) if profiles is not None: stmt = stmt.where( PubsubNode.profile.in_([self.profiles[p] for p in profiles]) ) if services is not None: stmt = stmt.where(PubsubNode.service.in_(services)) if names is not None: stmt = stmt.where(PubsubNode.name.in_(names)) async with self.session() as session: await session.execute(stmt) await session.commit() @aio async def cache_pubsub_items( self, client: SatXMPPEntity, node: PubsubNode, items: List[domish.Element], parsed_items: Optional[List[dict]] = None, ) -> None: """Add items to database, using an upsert taking care of "updated" field""" if parsed_items is not None and len(items) != len(parsed_items): raise exceptions.InternalError( "parsed_items must have the same lenght as items" ) async with self.session() as session: async with session.begin(): for idx, item in enumerate(items): parsed = parsed_items[idx] if parsed_items else None stmt = ( insert(PubsubItem) .values( node_id=node.id, name=item["id"], data=item, parsed=parsed, ) .on_conflict_do_update( index_elements=(PubsubItem.node_id, PubsubItem.name), set_={ PubsubItem.data: item, PubsubItem.parsed: parsed, PubsubItem.updated: now(), }, ) ) await session.execute(stmt) await session.commit() @aio async def delete_pubsub_items( self, node: PubsubNode, items_names: Optional[List[str]] = None ) -> None: """Delete items cached for a node @param node: node from which items must be deleted @param items_names: names of items to delete if None, ALL items will be deleted """ stmt = delete(PubsubItem) if node is not None: if isinstance(node, list): stmt = stmt.where(PubsubItem.node_id.in_([n.id for n in node])) else: stmt = stmt.filter_by(node_id=node.id) if items_names is not None: stmt = stmt.where(PubsubItem.name.in_(items_names)) async with self.session() as session: await session.execute(stmt) await session.commit() @aio async def purge_pubsub_items( self, services: Optional[List[jid.JID]] = None, names: Optional[List[str]] = None, types: Optional[List[str]] = None, subtypes: Optional[List[str]] = None, profiles: Optional[List[str]] = None, created_before: Optional[datetime] = None, updated_before: Optional[datetime] = None, ) -> None: """Delete items cached for a node @param node: node from which items must be deleted @param items_names: names of items to delete if None, ALL items will be deleted """ stmt = delete(PubsubItem) node_fields = { "service": services, "name": names, "type_": types, "subtype": subtypes, } if profiles is not None: node_fields["profile_id"] = [self.profiles[p] for p in profiles] if any(x is not None for x in node_fields.values()): sub_q = select(PubsubNode.id) for col, values in node_fields.items(): if values is None: continue sub_q = sub_q.where(getattr(PubsubNode, col).in_(values)) stmt = stmt.where(PubsubItem.node_id.in_(sub_q)).execution_options( synchronize_session=False ) if created_before is not None: stmt = stmt.where(PubsubItem.created < created_before) if updated_before is not None: stmt = stmt.where(PubsubItem.updated < updated_before) async with self.session() as session: await session.execute(stmt) await session.commit() @aio async def get_items( self, node: PubsubNode, max_items: Optional[int] = None, item_ids: Optional[list[str]] = None, before: Optional[str] = None, after: Optional[str] = None, from_index: Optional[int] = None, order_by: Optional[List[str]] = None, desc: bool = True, force_rsm: bool = False, ) -> Tuple[List[PubsubItem], dict]: """Get Pubsub Items from cache @param node: retrieve items from this node (must be synchronised) @param max_items: maximum number of items to retrieve @param before: get items which are before the item with this name in given order empty string is not managed here, use desc order to reproduce RSM behaviour. @param after: get items which are after the item with this name in given order @param from_index: get items with item index (as defined in RSM spec) starting from this number @param order_by: sorting order of items (one of C.ORDER_BY_*) @param desc: direction or ordering @param force_rsm: if True, force the use of RSM worklow. RSM workflow is automatically used if any of before, after or from_index is used, but if only RSM max_items is used, it won't be used by default. This parameter let's use RSM workflow in this case. Note that in addition to RSM metadata, the result will not be the same (max_items without RSM will returns most recent items, i.e. last items in modification order, while max_items with RSM will return the oldest ones (i.e. first items in modification order). to be used when max_items is used from RSM """ metadata = { "service": node.service, "node": node.name, "uri": uri.build_xmpp_uri( "pubsub", path=node.service.full(), node=node.name, ), } if max_items is None: max_items = 20 use_rsm = any((before, after, from_index is not None)) if force_rsm and not use_rsm: # use_rsm = True from_index = 0 stmt = select(PubsubItem).filter_by(node_id=node.id).limit(max_items) if item_ids is not None: stmt = stmt.where(PubsubItem.name.in_(item_ids)) if not order_by: order_by = [C.ORDER_BY_MODIFICATION] order = [] for order_type in order_by: if order_type == C.ORDER_BY_MODIFICATION: if desc: order.extend((PubsubItem.updated.desc(), PubsubItem.id.desc())) else: order.extend((PubsubItem.updated.asc(), PubsubItem.id.asc())) elif order_type == C.ORDER_BY_CREATION: if desc: order.append(PubsubItem.id.desc()) else: order.append(PubsubItem.id.asc()) else: raise exceptions.InternalError(f"Unknown order type {order_type!r}") stmt = stmt.order_by(*order) if use_rsm: # CTE to have result row numbers row_num_q = select( PubsubItem.id, PubsubItem.name, # row_number starts from 1, but RSM index must start from 0 (func.row_number().over(order_by=order) - 1).label("item_index"), ).filter_by(node_id=node.id) row_num_cte = row_num_q.cte() if max_items > 0: # as we can't simply use PubsubItem.id when we order by modification, # we need to use row number item_name = before or after row_num_limit_q = ( select(row_num_cte.c.item_index).where( row_num_cte.c.name == item_name ) ).scalar_subquery() stmt = ( select(row_num_cte.c.item_index, PubsubItem) .join(row_num_cte, PubsubItem.id == row_num_cte.c.id) .limit(max_items) ) if before: stmt = stmt.where( row_num_cte.c.item_index < row_num_limit_q ).order_by(row_num_cte.c.item_index.desc()) elif after: stmt = stmt.where( row_num_cte.c.item_index > row_num_limit_q ).order_by(row_num_cte.c.item_index.asc()) else: stmt = stmt.where(row_num_cte.c.item_index >= from_index).order_by( row_num_cte.c.item_index.asc() ) # from_index is used async with self.session() as session: if max_items == 0: items = result = [] else: result = await session.execute(stmt) result = result.all() if before: result.reverse() items = [row[-1] for row in result] rows_count = ( await session.execute(row_num_q.with_only_columns(count())) ).scalar_one() try: index = result[0][0] except IndexError: index = None try: first = result[0][1].name except IndexError: first = None last = None else: last = result[-1][1].name metadata["rsm"] = { k: v for k, v in { "index": index, "count": rows_count, "first": first, "last": last, }.items() if v is not None } metadata["complete"] = (index or 0) + len(result) == rows_count return items, metadata async with self.session() as session: result = await session.execute(stmt) result = result.scalars().all() if desc: result.reverse() return result, metadata def _get_sqlite_path(self, path: List[Union[str, int]]) -> str: """generate path suitable to query JSON element with SQLite""" return f"${''.join(f'[{p}]' if isinstance(p, int) else f'.{p}' for p in path)}" @aio async def search_pubsub_items( self, query: dict, ) -> Tuple[List[PubsubItem]]: """Search for pubsub items in cache @param query: search terms. Keys can be: :fts (str): Full-Text Search query. Currently SQLite FT5 engine is used, its query syntax can be used, see `FTS5 Query documentation <https://sqlite.org/fts5.html#full_text_query_syntax>`_ :profiles (list[str]): filter on nodes linked to those profiles :nodes (list[str]): filter on nodes with those names :services (list[jid.JID]): filter on nodes from those services :types (list[str|None]): filter on nodes with those types. None can be used to filter on nodes with no type set :subtypes (list[str|None]): filter on nodes with those subtypes. None can be used to filter on nodes with no subtype set :names (list[str]): filter on items with those names :parsed (list[dict]): Filter on a parsed data field. The dict must contain 3 keys: ``path`` which is a list of str or int giving the path to the field of interest (str for a dict key, int for a list index), ``operator`` with indicate the operator to use to check the condition, and ``value`` which depends of field type and operator. See documentation for details on operators (it's currently explained at ``doc/libervia-cli/pubsub_cache.rst`` in ``search`` command documentation). :order-by (list[dict]): Indicates how to order results. The dict can contain either a ``order`` for a well-know order or a ``path`` for a parsed data field path (``order`` and ``path`` can't be used at the same time), an an optional ``direction`` which can be ``asc`` or ``desc``. See documentation for details on well-known orders (it's currently explained at ``doc/libervia-cli/pubsub_cache.rst`` in ``search`` command documentation). :index (int): starting index of items to return from the query result. It's translated to SQL's OFFSET :limit (int): maximum number of items to return. It's translated to SQL's LIMIT. @result: found items (the ``node`` attribute will be filled with suitable PubsubNode) """ # TODO: FTS and parsed data filters use SQLite specific syntax # when other DB engines will be used, this will have to be adapted stmt = select(PubsubItem) # Full-Text Search fts = query.get("fts") if fts: fts_select = ( text("SELECT rowid, rank FROM pubsub_items_fts(:fts_query)") .bindparams(fts_query=fts) .columns(rowid=Integer) .subquery() ) stmt = stmt.select_from(fts_select).outerjoin( PubsubItem, fts_select.c.rowid == PubsubItem.id ) # node related filters profiles = query.get("profiles") if profiles or any( query.get(k) for k in ("nodes", "services", "types", "subtypes") ): stmt = stmt.join(PubsubNode).options(contains_eager(PubsubItem.node)) if profiles: try: stmt = stmt.where( PubsubNode.profile_id.in_(self.profiles[p] for p in profiles) ) except KeyError as e: raise exceptions.ProfileUnknownError( f"This profile doesn't exist: {e.args[0]!r}" ) for key, attr in ( ("nodes", "name"), ("services", "service"), ("types", "type_"), ("subtypes", "subtype"), ): value = query.get(key) if not value: continue if key in ("types", "subtypes") and None in value: # NULL can't be used with SQL's IN, so we have to add a condition with # IS NULL, and use a OR if there are other values to check value.remove(None) condition = getattr(PubsubNode, attr).is_(None) if value: condition = or_(getattr(PubsubNode, attr).in_(value), condition) else: condition = getattr(PubsubNode, attr).in_(value) stmt = stmt.where(condition) else: stmt = stmt.options(selectinload(PubsubItem.node)) # names names = query.get("names") if names: stmt = stmt.where(PubsubItem.name.in_(names)) # parsed data filters parsed = query.get("parsed", []) for filter_ in parsed: try: path = filter_["path"] operator = filter_["op"] value = filter_["value"] except KeyError as e: raise ValueError( f'missing mandatory key {e.args[0]!r} in "parsed" filter' ) try: op_attr = OP_MAP[operator] except KeyError: raise ValueError(f"invalid operator: {operator!r}") sqlite_path = self._get_sqlite_path(path) if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"): col = literal_column("json_each.value") if operator[0] == "i": col = func.lower(col) value = [str(v).lower() for v in value] condition = ( select(1) .select_from(func.json_each(PubsubItem.parsed, sqlite_path)) .where(col.in_(value)) ).scalar_subquery() if operator in ("disjoint", "idisjoint"): condition = condition.is_(None) stmt = stmt.where(condition) elif operator == "between": try: left, right = value except (ValueError, TypeError): raise ValueError( _( 'invalid value for "between" filter, you must use a 2 items ' "array: {value!r}" ).format(value=value) ) col = func.json_extract(PubsubItem.parsed, sqlite_path) stmt = stmt.where(col.between(left, right)) else: # we use func.json_extract instead of generic JSON way because SQLAlchemy # add a JSON_QUOTE to the value, and we want SQL value col = func.json_extract(PubsubItem.parsed, sqlite_path) stmt = stmt.where(getattr(col, op_attr)(value)) # order order_by = query.get("order-by") or [{"order": "creation"}] for order_data in order_by: order, path = order_data.get("order"), order_data.get("path") if order and path: raise ValueError( _( '"order" and "path" can\'t be used at the same time in ' '"order-by" data' ) ) if order: if order == "creation": col = PubsubItem.id elif order == "modification": col = PubsubItem.updated elif order == "item_id": col = PubsubItem.name elif order == "rank": if not fts: raise ValueError( "'rank' order can only be used with Full-Text Search (fts)" ) col = literal_column("rank") else: raise NotImplementedError(f"Unknown {order!r} order") else: # we have a JSON path # sqlite_path = self._get_sqlite_path(path) col = PubsubItem.parsed[path] direction = order_data.get("direction", "ASC").lower() if not direction in ("asc", "desc"): raise ValueError(f"Invalid order-by direction: {direction!r}") stmt = stmt.order_by(getattr(col, direction)()) # offset, limit index = query.get("index") if index: stmt = stmt.offset(index) limit = query.get("limit") if limit: stmt = stmt.limit(limit) async with self.session() as session: result = await session.execute(stmt) return result.scalars().all() # Notifications @aio async def add_notification( self, client: Optional[SatXMPPEntity], type_: NotificationType, body_plain: str, body_rich: Optional[str] = None, title: Optional[str] = None, requires_action: bool = False, priority: NotificationPriority = NotificationPriority.MEDIUM, expire_at: Optional[float] = None, extra: Optional[dict] = None, ) -> Notification: """Add a new notification to the DB. @param client: client associated with the notification. If None, the notification will be global. @param type_: type of the notification. @param body_plain: plain text body. @param body_rich: rich text (XHTML) body. @param title: optional title. @param requires_action: True if the notification requires user action (e.g. a dialog need to be answered). @priority: how urgent the notification is @param expire_at: expiration timestamp for the notification. @param extra: additional data. @return: created Notification """ profile_id = self.profiles[client.profile] if client else None notification = Notification( profile_id=profile_id, type=type_, body_plain=body_plain, body_rich=body_rich, requires_action=requires_action, priority=priority, expire_at=expire_at, title=title, extra_data=extra, status=NotificationStatus.new, ) async with self.session() as session: async with session.begin(): session.add(notification) return notification @aio async def update_notification( self, client: SatXMPPEntity, notification_id: int, **kwargs ) -> None: """Update an existing notification. @param client: client associated with the notification. @param notification_id: ID of the notification to update. """ profile_id = self.profiles[client.profile] async with self.session() as session: await session.execute( update(Notification) .where( and_( Notification.profile_id == profile_id, Notification.id == notification_id, ) ) .values(**kwargs) ) await session.commit() @aio async def get_notifications( self, client: SatXMPPEntity, type_: Optional[NotificationType] = None, status: Optional[NotificationStatus] = None, requires_action: Optional[bool] = None, min_priority: Optional[int] = None, ) -> List[Notification]: """Retrieve all notifications for a given profile with optional filters. @param client: client associated with the notifications. @param type_: filter by type of the notification. @param status: filter by status of the notification. @param requires_action: filter by notifications that require user action. @param min_priority: filter by minimum priority value. @return: list of matching Notification instances. """ profile_id = self.profiles[client.profile] filters = [ or_(Notification.profile_id == profile_id, Notification.profile_id.is_(None)) ] if type_: filters.append(Notification.type == type_) if status: filters.append(Notification.status == status) if requires_action is not None: filters.append(Notification.requires_action == requires_action) if min_priority: filters.append(Notification.priority >= min_priority) async with self.session() as session: result = await session.execute( select(Notification).where(and_(*filters)).order_by(Notification.id) ) return result.scalars().all() @aio async def delete_notification( self, client: Optional[SatXMPPEntity], notification_id: str ) -> None: """Delete a notification by its profile and id. @param client: client associated with the notification. If None, profile_id will be NULL. @param notification_id: ID of the notification to delete. """ profile_id = self.profiles[client.profile] if client else None async with self.session() as session: await session.execute( delete(Notification).where( and_( Notification.profile_id == profile_id, Notification.id == int(notification_id), ) ) ) await session.commit() @aio async def clean_expired_notifications( self, client: Optional[SatXMPPEntity], limit_timestamp: Optional[float] = None ) -> None: """Cleans expired notifications and older profile-specific notifications. - Removes all notifications where the expiration timestamp has passed, irrespective of their profile. - If a limit_timestamp is provided, removes older notifications with a profile set (i.e., not global notifications) that do not require user action. If client is provided, only remove notification for this profile. @param client: if provided, only expire notification for this client (in addition to truly expired notifications for everybody). @param limit_timestamp: Timestamp limit for older notifications. If None, only truly expired notifications are removed. """ # Delete truly expired notifications expired_condition = Notification.expire_at < time.time() # Delete older profile-specific notifications (created before the limit_timestamp) if client is None: profile_condition = Notification.profile_id.isnot(None) else: profile_condition = Notification.profile_id == self.profiles[client.profile] older_condition = and_( profile_condition, Notification.timestamp < limit_timestamp if limit_timestamp else False, Notification.requires_action == False, ) # Combine the conditions conditions = or_(expired_condition, older_condition) async with self.session() as session: await session.execute(delete(Notification).where(conditions)) await session.commit()