# HG changeset patch # User Goffi # Date 1697470171 -7200 # Node ID 02f0adc745c6d319d11e09f5dc383a8fdacd4b7a # Parent 51744ad00a42bfe2878c500ddff9964683b78aab core: notifications implementation, first draft: add a new table for notifications, and methods/bridge methods to manipulate them. diff -r 51744ad00a42 -r 02f0adc745c6 libervia/backend/bridge/bridge_constructor/bridge_template.ini --- a/libervia/backend/bridge/bridge_constructor/bridge_template.ini Wed Oct 18 15:30:07 2023 +0200 +++ b/libervia/backend/bridge/bridge_constructor/bridge_template.ini Mon Oct 16 17:29:31 2023 +0200 @@ -147,6 +147,31 @@ doc_param_2=value: New value doc_param_3=%(doc_profile)s +[notification_new] +type=signal +category=core +sig_in=sdssssbidss +doc=A new notification has been emitted +doc_param_0=id: unique identifier of the notification +doc_param_1=timestamp: notification creation time +doc_param_2=type: type of the notification +doc_param_3=body_plain: plain text body of the notification +doc_param_4=body_rich: rich text (XHTML) body of the notification. Optional. +doc_param_5=title: optional title of the notification +doc_param_6=requires_action: True if the notification requires user action (e.g. a dialog needs to be answered), False otherwise +doc_param_7=priority: how urgent the notification is, represented as an enumeration value +doc_param_8=expire_at: expiration timestamp for the notification. Optional. +doc_param_9=extra: additional serialized data associated with the notification +doc_param_10=profile: profile associated with the notification. C.PROF_KEY_ALL can be used for global notifications. + +[notification_deleted] +type=signal +category=core +sig_in=ss +doc=A new notification has been emitted +doc_param_0=id: id of the deleted notification +doc_param_1=profile: profile of the deleted application, or C.PROF_KEY_ALL for a global notification + [progress_started] type=signal category=core @@ -1024,3 +1049,49 @@ doc_param_4=profile_key: either profile_key or empty string to use common cache this parameter is used only when dest is empty doc_return=path to the new converted image + +[notification_add] +type=method +category=core +sig_in=ssssbbsdss +sig_out= +doc=Add a new notification +doc_param_0=type_: Notification type +doc_param_1=body_plain: Plain text body of the notification +doc_param_2=body_rich: Rich text body of the notification (optional, can be empty string for default) +doc_param_3=title: Title of the notification (optional, can be empty string for default) +doc_param_4=is_global: True if the notification is for all profiles +doc_param_5=requires_action: Indicates if the notification requires action +doc_param_7=priority: Priority level of the notification (e.g. MEDIUM, HIGH, etc.) +doc_param_8=expire_at: Expiration timestamp for the notification (optional, can be 0 for none) +doc_param_9=extra: Additional details for the notification as a dictionary (optional, can be empty dictionary) +doc_param_10=%(doc_profile_key)s: Profile key (use "@ALL@" for all profiles) + +[notifications_get] +type=method +category=core +sig_in=ss +sig_out=s +doc=Retrieve notifications based on provided filters +doc_param_0=filters: a dictionary with filter criteria for notifications retrieval +doc_param_1=%(doc_profile_key)s or @ALL@ for all profiles +doc_return=list of Notification objects. The exact structure will depend on your Notification class. + +[notification_delete] +type=method +category=core +sig_in=sbs +sig_out= +doc=Delete a notification +doc_param_0=id_: ID of the notification to delete +doc_param_1=is_global: true if the notification is a global one +doc_param_2=profile_key: Profile key (use "@ALL@" for all profiles) + +[notifications_expired_clean] +type=method +category=core +sig_in=ds +sig_out= +doc=Cleans expired notifications and older profile-specific notifications +doc_param_0=limit_timestamp: Timestamp limit for older notifications. If -1.0, only truly expired notifications are removed. +doc_param_1=profile_key: Profile key (use "@NONE@" to indicate no specific profile, otherwise only notification for given profile will be expired, in addition to truly expired notifications). diff -r 51744ad00a42 -r 02f0adc745c6 libervia/backend/bridge/dbus_bridge.py --- a/libervia/backend/bridge/dbus_bridge.py Wed Oct 18 15:30:07 2023 +0200 +++ b/libervia/backend/bridge/dbus_bridge.py Mon Oct 16 17:29:31 2023 +0200 @@ -124,6 +124,10 @@ Method('message_encryption_stop', arguments='ss', returns=''), Method('message_send', arguments='sa{ss}a{ss}sss', returns=''), Method('namespaces_get', arguments='', returns='a{ss}'), + Method('notification_add', arguments='ssssbbsdss', returns=''), + Method('notification_delete', arguments='sbs', returns=''), + Method('notifications_expired_clean', arguments='ds', returns=''), + Method('notifications_get', arguments='ss', returns='s'), Method('param_get_a', arguments='ssss', returns='s'), Method('param_get_a_async', arguments='sssis', returns='s'), Method('param_set', arguments='sssis', returns=''), @@ -164,6 +168,8 @@ Signal('message_encryption_started', 'sss'), Signal('message_encryption_stopped', 'sa{ss}s'), Signal('message_new', 'sdssa{ss}a{ss}sss'), + Signal('notification_deleted', 'ss'), + Signal('notification_new', 'sdssssbidss'), Signal('param_update', 'ssss'), Signal('presence_update', 'ssia{ss}s'), Signal('progress_error', 'sss'), @@ -304,6 +310,18 @@ def dbus_namespaces_get(self, ): return self._callback("namespaces_get", ) + def dbus_notification_add(self, type_, body_plain, body_rich, title, is_global, requires_action, arg_6, priority, expire_at, extra): + return self._callback("notification_add", type_, body_plain, body_rich, title, is_global, requires_action, arg_6, priority, expire_at, extra) + + def dbus_notification_delete(self, id_, is_global, profile_key): + return self._callback("notification_delete", id_, is_global, profile_key) + + def dbus_notifications_expired_clean(self, limit_timestamp, profile_key): + return self._callback("notifications_expired_clean", limit_timestamp, profile_key) + + def dbus_notifications_get(self, filters, profile_key): + return self._callback("notifications_get", filters, profile_key) + def dbus_param_get_a(self, name, category, attribute="value", profile_key="@DEFAULT@"): return self._callback("param_get_a", name, category, attribute, profile_key) @@ -447,6 +465,12 @@ def message_new(self, uid, timestamp, from_jid, to_jid, message, subject, mess_type, extra, profile): self._obj.emitSignal("message_new", uid, timestamp, from_jid, to_jid, message, subject, mess_type, extra, profile) + def notification_deleted(self, id, profile): + self._obj.emitSignal("notification_deleted", id, profile) + + def notification_new(self, id, timestamp, type, body_plain, body_rich, title, requires_action, priority, expire_at, extra, profile): + self._obj.emitSignal("notification_new", id, timestamp, type, body_plain, body_rich, title, requires_action, priority, expire_at, extra, profile) + def param_update(self, name, value, category, profile): self._obj.emitSignal("param_update", name, value, category, profile) diff -r 51744ad00a42 -r 02f0adc745c6 libervia/backend/bridge/pb.py --- a/libervia/backend/bridge/pb.py Wed Oct 18 15:30:07 2023 +0200 +++ b/libervia/backend/bridge/pb.py Mon Oct 16 17:29:31 2023 +0200 @@ -193,6 +193,12 @@ def message_new(self, uid, timestamp, from_jid, to_jid, message, subject, mess_type, extra, profile): self.send_signal("message_new", uid, timestamp, from_jid, to_jid, message, subject, mess_type, extra, profile) + def notification_deleted(self, id, profile): + self.send_signal("notification_deleted", id, profile) + + def notification_new(self, id, timestamp, type, body_plain, body_rich, title, requires_action, priority, expire_at, extra, profile): + self.send_signal("notification_new", id, timestamp, type, body_plain, body_rich, title, requires_action, priority, expire_at, extra, profile) + def param_update(self, name, value, category, profile): self.send_signal("param_update", name, value, category, profile) diff -r 51744ad00a42 -r 02f0adc745c6 libervia/backend/core/main.py --- a/libervia/backend/core/main.py Wed Oct 18 15:30:07 2023 +0200 +++ b/libervia/backend/core/main.py Mon Oct 16 17:29:31 2023 +0200 @@ -221,6 +221,10 @@ self.bridge.register_method("image_resize", self._image_resize) self.bridge.register_method("image_generate_preview", self._image_generate_preview) self.bridge.register_method("image_convert", self._image_convert) + self.bridge.register_method("notification_add", self.memory._add_notification) + self.bridge.register_method("notifications_get", self.memory._get_notifications) + self.bridge.register_method("notification_delete", self.memory._delete_notification) + self.bridge.register_method("notifications_expired_clean", self.memory._notifications_expired_clean) await self.memory.initialise() diff -r 51744ad00a42 -r 02f0adc745c6 libervia/backend/memory/memory.py --- a/libervia/backend/memory/memory.py Wed Oct 18 15:30:07 2023 +0200 +++ b/libervia/backend/memory/memory.py Mon Oct 16 17:29:31 2023 +0200 @@ -16,29 +16,39 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import os.path +from collections import namedtuple import copy -import shortuuid -import mimetypes -import time +from dataclasses import dataclass from functools import partial -from typing import Optional, Tuple, Dict +import mimetypes +import os.path from pathlib import Path +import time +from typing import Dict, Optional, Tuple from uuid import uuid4 -from collections import namedtuple + +import shortuuid +from twisted.internet import defer, error, reactor from twisted.python import failure -from twisted.internet import defer, reactor, error from twisted.words.protocols.jabber import jid + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import _ from libervia.backend.core.log import getLogger -from libervia.backend.core import exceptions -from libervia.backend.core.constants import Const as C -from libervia.backend.memory.sqla import Storage -from libervia.backend.memory.persistent import PersistentDict -from libervia.backend.memory.params import Params -from libervia.backend.memory.disco import Discovery from libervia.backend.memory.crypto import BlockCipher from libervia.backend.memory.crypto import PasswordHasher +from libervia.backend.memory.disco import Discovery +from libervia.backend.memory.params import Params +from libervia.backend.memory.persistent import PersistentDict +from libervia.backend.memory.sqla import ( + Notification, + NotificationPriority, + NotificationStatus, + NotificationType, + Storage, +) from libervia.backend.tools import config as tools_config from libervia.backend.tools.common import data_format from libervia.backend.tools.common import regex @@ -1848,6 +1858,180 @@ *(regex.path_escape(a) for a in args) ) + ## Notifications ## + + + def _add_notification( + self, + type_: str, + body_plain: str, + body_rich: str, + title: str, + is_global: bool, + requires_action: bool, + priority: str, + expire_at: float, + extra_s: str, + profile_key: str + ) -> defer.Deferred: + client = self.host.get_client(profile_key) + + if not client.is_admin: + raise exceptions.PermissionError("Only admins can add a notification") + + try: + notification_type = NotificationType[type_] + notification_priority = NotificationPriority[priority] + except KeyError as e: + raise exceptions.DataError( + f"invalid notification type or priority data: {e}" + ) + + return defer.ensureDeferred( + self.add_notification( + client, + notification_type, + body_plain, + body_rich or None, + title or None, + is_global, + requires_action, + notification_priority, + expire_at or None, + data_format.deserialise(extra_s) + ) + ) + + async def add_notification( + self, + client: SatXMPPEntity, + type_: NotificationType, + body_plain: str, + body_rich: Optional[str] = None, + title: Optional[str] = None, + is_global: bool = False, + requires_action: bool = False, + priority: NotificationPriority = NotificationPriority.MEDIUM, + expire_at: Optional[float] = None, + extra: Optional[dict] = None, + ) -> None: + """Create and broadcast a new notification. + + @param client: client associated with the notification. If None, the notification + will be global (i.e. for all profiles). + @param type_: type of the notification. + @param body_plain: plain text body. + @param body_rich: rich text (XHTML) body. + @param title: optional title. + @param is_global: True if the notification is for all profiles. + @param requires_action: True if the notification requires user action (e.g. a + dialog need to be answered). + @priority: how urgent the notification is + @param expire_at: expiration timestamp for the notification. + @param extra: additional data. + """ + notification = await self.storage.add_notification( + None if is_global else client, type_, body_plain, body_rich, title, + requires_action, priority, expire_at, extra + ) + self.host.bridge.notification_new( + str(notification.id), + notification.timestamp, + type_.value, + body_plain, + body_rich or '', + title or '', + requires_action, + priority.value, + expire_at or 0, + data_format.serialise(extra) if extra else '', + C.PROF_KEY_ALL if is_global else client.profile + ) + + def _get_notifications(self, filters_s: str, profile_key: str) -> defer.Deferred: + """Fetch notifications for bridge with given filters and profile key. + + @param filters_s: serialized filter conditions. Keys can be: + :type_ (str): + Filter by type of the notification. + :status (str): + Filter by status of the notification. + :requires_action (bool): + Filter by notifications that require user action. + :min_priority (str): + Filter by minimum priority value. + @param profile_key: key of the profile to fetch notifications for. + @return: Deferred which fires with a list of serialised notifications. + """ + client = self.host.get_client(profile_key) + + filters = data_format.deserialise(filters_s) + + try: + if 'type' in filters: + filters['type_'] = NotificationType[filters.pop('type')] + if 'status' in filters: + filters['status'] = NotificationStatus[filters['status']] + if 'min_priority' in filters: + filters['min_priority'] = NotificationPriority[filters['min_priority']].value + except KeyError as e: + raise exceptions.DataError(f"invalid filter data: {e}") + + d = defer.ensureDeferred(self.storage.get_notifications(client, **filters)) + d.addCallback( + lambda notifications: data_format.serialise( + [notification.serialise() for notification in notifications] + ) + ) + return d + + def _delete_notification( + self, + id_: str, + is_global: bool, + profile_key: str + ) -> defer.Deferred: + client = self.host.get_client(profile_key) + if is_global and not client.is_admin: + raise exceptions.PermissionError( + "Only admins can delete global notifications" + ) + return defer.ensureDeferred(self.delete_notification(client, id_, is_global)) + + async def delete_notification( + self, + client: SatXMPPEntity, + id_: str, + is_global: bool=False + ) -> None: + """Delete a notification + + the notification must be from the requesting profile. + @param id_: ID of the notification + is_global: if True, a global notification will be removed. + """ + await self.storage.delete_notification(None if is_global else client, id_) + self.host.bridge.notification_deleted( + id_, + C.PROF_KEY_ALL if is_global else client.profile + ) + + def _notifications_expired_clean( + self, limit_timestamp: float, profile_key: str + ) -> defer.Deferred: + if profile_key == C.PROF_KEY_NONE: + client = None + else: + client = self.host.get_client(profile_key) + + return defer.ensureDeferred( + self.storage.clean_expired_notifications( + client, + None if limit_timestamp == -1.0 else limit_timestamp + ) + ) + + ## Misc ## def is_entity_available(self, client, entity_jid): diff -r 51744ad00a42 -r 02f0adc745c6 libervia/backend/memory/migration/versions/2ab01aa1f686_create_table_for_notifications.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/memory/migration/versions/2ab01aa1f686_create_table_for_notifications.py Mon Oct 16 17:29:31 2023 +0200 @@ -0,0 +1,46 @@ +"""create table for notifications + +Revision ID: 2ab01aa1f686 +Revises: 4b002773cf92 +Create Date: 2023-10-16 12:11:43.507295 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '2ab01aa1f686' +down_revision = '4b002773cf92' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table('notifications', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('timestamp', sa.Float(), nullable=False), + sa.Column('expire_at', sa.Float(), nullable=True), + sa.Column('profile_id', sa.Integer(), nullable=True), + sa.Column('type', sa.Enum('chat', 'blog', 'calendar', 'file', 'call', 'service', 'other', name='notificationtype'), nullable=False), + sa.Column('title', sa.Text(), nullable=True), + sa.Column('body_plain', sa.Text(), nullable=False), + sa.Column('body_rich', sa.Text(), nullable=True), + sa.Column('requires_action', sa.Boolean(), nullable=True), + sa.Column('priority', sa.Integer(), nullable=True), + sa.Column('extra_data', sa.JSON(), nullable=True), + sa.Column('status', sa.Enum('new', 'read', name='notificationstatus'), nullable=True), + sa.ForeignKeyConstraint(['profile_id'], ['profiles.id'], name=op.f('fk_notifications_profile_id_profiles'), ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id', name=op.f('pk_notifications')) + ) + with op.batch_alter_table('notifications', schema=None) as batch_op: + batch_op.create_index(batch_op.f('ix_notifications_profile_id'), ['profile_id'], unique=False) + batch_op.create_index('notifications_profile_id_status', ['profile_id', 'status'], unique=False) + + +def downgrade(): + with op.batch_alter_table('notifications', schema=None) as batch_op: + batch_op.drop_index('notifications_profile_id_status') + batch_op.drop_index(batch_op.f('ix_notifications_profile_id')) + + op.drop_table('notifications') diff -r 51744ad00a42 -r 02f0adc745c6 libervia/backend/memory/sqla.py --- a/libervia/backend/memory/sqla.py Wed Oct 18 15:30:07 2023 +0200 +++ b/libervia/backend/memory/sqla.py Mon Oct 16 17:29:31 2023 +0200 @@ -62,6 +62,10 @@ History, Message, NOT_IN_EXTRA, + Notification, + NotificationPriority, + NotificationStatus, + NotificationType, ParamGen, ParamInd, PrivateGen, @@ -74,6 +78,8 @@ Subject, SyncState, Thread, + get_profile_by_id, + profiles, ) from libervia.backend.tools.common import uri from libervia.backend.tools.utils import aio, as_future @@ -117,13 +123,15 @@ def __init__(self): self.initialized = defer.Deferred() # we keep cache for the profiles (key: profile name, value: profile id) - # profile id to name - self.profiles: Dict[str, int] = {} # profile id to component entry point self.components: Dict[int, str] = {} + @property + def profiles(self): + return profiles + def get_profile_by_id(self, profile_id): - return self.profiles.get(profile_id) + return get_profile_by_id(profile_id) async def migrate_apply(self, *args: str, log_output: bool = False) -> None: """Do a migration command @@ -139,16 +147,18 @@ """ stdout, stderr = 2 * (None,) if log_output else 2 * (PIPE,) proc = await asyncio.create_subprocess_exec( - sys.executable, "-m", "alembic", *args, - stdout=stdout, stderr=stderr, cwd=migration_path + sys.executable, + "-m", + "alembic", + *args, + stdout=stdout, + stderr=stderr, + cwd=migration_path, ) log_out, log_err = await proc.communicate() if proc.returncode != 0: - msg = _( - "Can't {operation} database (exit code {exit_code})" - ).format( - operation=args[0], - exit_code=proc.returncode + msg = _("Can't {operation} database (exit code {exit_code})").format( + operation=args[0], exit_code=proc.returncode ) if log_out or log_err: msg += f":\nstdout: {log_out.decode()}\nstderr: {log_err.decode()}" @@ -216,9 +226,7 @@ async with engine.connect() as conn: await conn.run_sync(self._sqlite_set_journal_mode_wal) - self.session = sessionmaker( - engine, expire_on_commit=False, class_=AsyncSession - ) + self.session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession) async with self.session() as session: result = await session.execute(select(Profile)) @@ -239,9 +247,9 @@ db_cls: DeclarativeMeta, db_id_col: Mapped, id_value: Any, - joined_loads = None + joined_loads=None, ) -> Optional[DeclarativeMeta]: - stmt = select(db_cls).where(db_id_col==id_value) + stmt = select(db_cls).where(db_id_col == id_value) if client is not None: stmt = stmt.filter_by(profile_id=self.profiles[client.profile]) if joined_loads is not None: @@ -264,7 +272,7 @@ async def delete( self, db_obj: Union[DeclarativeMeta, List[DeclarativeMeta]], - session_add: Optional[List[DeclarativeMeta]] = None + session_add: Optional[List[DeclarativeMeta]] = None, ) -> None: """Delete an object from database @@ -289,7 +297,7 @@ ## Profiles def get_profiles_list(self) -> List[str]: - """"Return list of all registered profiles""" + """ "Return list of all registered profiles""" return list(self.profiles.keys()) def has_profile(self, profile_name: str) -> bool: @@ -309,7 +317,9 @@ try: return self.components[self.profiles[profile_name]] except KeyError: - raise exceptions.NotFound("the requested profile doesn't exists or is not a component") + raise exceptions.NotFound( + "the requested profile doesn't exists or is not a component" + ) @aio async def create_profile(self, name: str, component_ep: Optional[str] = None) -> None: @@ -344,7 +354,7 @@ del self.profiles[profile.name] if profile.id in self.components: del self.components[profile.id] - log.info(_("Profile {name!r} deleted").format(name = name)) + log.info(_("Profile {name!r} deleted").format(name=name)) ## Params @@ -376,7 +386,9 @@ params_ind[(p.category, p.name)] = p.value @aio - async def get_ind_param(self, category: str, name: str, profile: str) -> Optional[str]: + async def get_ind_param( + self, category: str, name: str, profile: str + ) -> Optional[str]: """Ask database for the value of one specific individual parameter @param category: category of the parameter @@ -385,11 +397,8 @@ """ async with self.session() as session: result = await session.execute( - select(ParamInd.value) - .filter_by( - category=category, - name=name, - profile_id=self.profiles[profile] + select(ParamInd.value).filter_by( + category=category, name=name, profile_id=self.profiles[profile] ) ) return result.scalar_one_or_none() @@ -405,10 +414,7 @@ async with self.session() as session: result = await session.execute( select(ParamInd) - .filter_by( - category=category, - name=name - ) + .filter_by(category=category, name=name) .options(subqueryload(ParamInd.profile)) ) return {param.profile.name: param.value for param in result.scalars()} @@ -422,26 +428,20 @@ @param value: value to set """ async with self.session() as session: - stmt = insert(ParamGen).values( - category=category, - name=name, - value=value - ).on_conflict_do_update( - index_elements=(ParamGen.category, ParamGen.name), - set_={ - ParamGen.value: value - } + stmt = ( + insert(ParamGen) + .values(category=category, name=name, value=value) + .on_conflict_do_update( + index_elements=(ParamGen.category, ParamGen.name), + set_={ParamGen.value: value}, + ) ) await session.execute(stmt) await session.commit() @aio async def set_ind_param( - self, - category:str, - name: str, - value: Optional[str], - profile: str + self, category: str, name: str, value: Optional[str], profile: str ) -> None: """Save the individual parameters in database @@ -451,16 +451,22 @@ @param profile: a profile which *must* exist """ async with self.session() as session: - stmt = insert(ParamInd).values( - category=category, - name=name, - profile_id=self.profiles[profile], - value=value - ).on_conflict_do_update( - index_elements=(ParamInd.category, ParamInd.name, ParamInd.profile_id), - set_={ - ParamInd.value: value - } + stmt = ( + insert(ParamInd) + .values( + category=category, + name=name, + profile_id=self.profiles[profile], + value=value, + ) + .on_conflict_do_update( + index_elements=( + ParamInd.category, + ParamInd.name, + ParamInd.profile_id, + ), + set_={ParamInd.value: value}, + ) ) await session.execute(stmt) await session.commit() @@ -474,13 +480,11 @@ if jid_.resource: if dest: return and_( - History.dest == jid_.userhost(), - History.dest_res == jid_.resource + History.dest == jid_.userhost(), History.dest_res == jid_.resource ) else: return and_( - History.source == jid_.userhost(), - History.source_res == jid_.resource + History.source == jid_.userhost(), History.source_res == jid_.resource ) else: if dest: @@ -497,9 +501,7 @@ between: bool = True, filters: Optional[Dict[str, str]] = None, profile: Optional[str] = None, - ) -> List[Tuple[ - str, int, str, str, Dict[str, str], Dict[str, str], str, str, str] - ]: + ) -> List[Tuple[str, int, str, str, Dict[str, str], Dict[str, str], str, str, str]]: """Retrieve messages in history @param from_jid: source JID (full, or bare for catchall) @@ -523,9 +525,7 @@ stmt = ( select(History) - .filter_by( - profile_id=self.profiles[profile] - ) + .filter_by(profile_id=self.profiles[profile]) .outerjoin(History.messages) .outerjoin(History.subjects) .outerjoin(History.thread) @@ -540,11 +540,10 @@ # order when returning the result. We use DESC here so LIMIT keep the last # messages History.timestamp.desc(), - History.received_timestamp.desc() + History.received_timestamp.desc(), ) ) - if not from_jid and not to_jid: # no jid specified, we want all one2one communications pass @@ -554,10 +553,7 @@ # from or to this jid jid_ = from_jid or to_jid stmt = stmt.where( - or_( - self._jid_filter(jid_), - self._jid_filter(jid_, dest=True) - ) + or_(self._jid_filter(jid_), self._jid_filter(jid_, dest=True)) ) else: # we have 2 jids specified, we check all communications between @@ -571,7 +567,7 @@ and_( self._jid_filter(to_jid), self._jid_filter(from_jid, dest=True), - ) + ), ) ) else: @@ -583,44 +579,44 @@ stmt = stmt.where(self._jid_filter(to_jid, dest=True)) if filters: - if 'timestamp_start' in filters: - stmt = stmt.where(History.timestamp >= float(filters['timestamp_start'])) - if 'before_uid' in filters: + if "timestamp_start" in filters: + stmt = stmt.where(History.timestamp >= float(filters["timestamp_start"])) + if "before_uid" in filters: # orignially this query was using SQLITE's rowid. This has been changed # to use coalesce(received_timestamp, timestamp) to be SQL engine independant stmt = stmt.where( - coalesce( - History.received_timestamp, - History.timestamp - ) < ( - select(coalesce(History.received_timestamp, History.timestamp)) - .filter_by(uid=filters["before_uid"]) + coalesce(History.received_timestamp, History.timestamp) + < ( + select( + coalesce(History.received_timestamp, History.timestamp) + ).filter_by(uid=filters["before_uid"]) ).scalar_subquery() ) - if 'body' in filters: + if "body" in filters: # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html stmt = stmt.where(Message.message.like(f"%{filters['body']}%")) - if 'search' in filters: + if "search" in filters: search_term = f"%{filters['search']}%" - stmt = stmt.where(or_( - Message.message.like(search_term), - History.source_res.like(search_term) - )) - if 'types' in filters: - types = filters['types'].split() + stmt = stmt.where( + or_( + Message.message.like(search_term), + History.source_res.like(search_term), + ) + ) + if "types" in filters: + types = filters["types"].split() stmt = stmt.where(History.type.in_(types)) - if 'not_types' in filters: - types = filters['not_types'].split() + if "not_types" in filters: + types = filters["not_types"].split() stmt = stmt.where(History.type.not_in(types)) - if 'last_stanza_id' in filters: + if "last_stanza_id" in filters: # this request get the last message with a "stanza_id" that we # have in history. This is mainly used to retrieve messages sent # while we were offline, using MAM (XEP-0313). - if (filters['last_stanza_id'] is not True - or limit != 1): + if filters["last_stanza_id"] is not True or limit != 1: raise ValueError("Unexpected values for last_stanza_id filter") stmt = stmt.where(History.stanza_id.is_not(None)) - if 'origin_id' in filters: + if "origin_id" in filters: stmt = stmt.where(History.origin_id == filters["origin_id"]) if limit is not None: @@ -640,34 +636,40 @@ @param data: message data as build by SatMessageProtocol.onMessage """ extra = {k: v for k, v in data["extra"].items() if k not in NOT_IN_EXTRA} - messages = [Message(message=mess, language=lang) - for lang, mess in data["message"].items()] - subjects = [Subject(subject=mess, language=lang) - for lang, mess in data["subject"].items()] + messages = [ + Message(message=mess, language=lang) for lang, mess in data["message"].items() + ] + subjects = [ + Subject(subject=mess, language=lang) for lang, mess in data["subject"].items() + ] if "thread" in data["extra"]: - thread = Thread(thread_id=data["extra"]["thread"], - parent_id=data["extra"].get["thread_parent"]) + thread = Thread( + thread_id=data["extra"]["thread"], + parent_id=data["extra"].get["thread_parent"], + ) else: thread = None try: async with self.session() as session: async with session.begin(): - session.add(History( - uid=data["uid"], - origin_id=data["extra"].get("origin_id"), - stanza_id=data["extra"].get("stanza_id"), - update_uid=data["extra"].get("update_uid"), - profile_id=self.profiles[profile], - source_jid=data["from"], - dest_jid=data["to"], - timestamp=data["timestamp"], - received_timestamp=data.get("received_timestamp"), - type=data["type"], - extra=extra, - messages=messages, - subjects=subjects, - thread=thread, - )) + session.add( + History( + uid=data["uid"], + origin_id=data["extra"].get("origin_id"), + stanza_id=data["extra"].get("stanza_id"), + update_uid=data["extra"].get("update_uid"), + profile_id=self.profiles[profile], + source_jid=data["from"], + dest_jid=data["to"], + timestamp=data["timestamp"], + received_timestamp=data.get("received_timestamp"), + type=data["type"], + extra=extra, + messages=messages, + subjects=subjects, + thread=thread, + ) + ) except IntegrityError as e: if "unique" in str(e.orig).lower(): log.debug( @@ -689,14 +691,13 @@ else: return PrivateIndBin if binary else PrivateInd - @aio async def get_privates( self, - namespace:str, + namespace: str, keys: Optional[Iterable[str]] = None, binary: bool = False, - profile: Optional[str] = None + profile: Optional[str] = None, ) -> Dict[str, Any]: """Get private value(s) from databases @@ -728,10 +729,10 @@ async def set_private_value( self, namespace: str, - key:str, + key: str, value: Any, binary: bool = False, - profile: Optional[str] = None + profile: Optional[str] = None, ) -> None: """Set a private value in database @@ -745,11 +746,7 @@ """ cls = self._get_private_class(binary, profile) - values = { - "namespace": namespace, - "key": key, - "value": value - } + values = {"namespace": namespace, "key": key, "value": value} index_elements = [cls.namespace, cls.key] if profile is not None: @@ -758,11 +755,10 @@ async with self.session() as session: await session.execute( - insert(cls).values(**values).on_conflict_do_update( - index_elements=index_elements, - set_={ - cls.value: value - } + insert(cls) + .values(**values) + .on_conflict_do_update( + index_elements=index_elements, set_={cls.value: value} ) ) await session.commit() @@ -773,7 +769,7 @@ namespace: str, key: str, binary: bool = False, - profile: Optional[str] = None + profile: Optional[str] = None, ) -> None: """Delete private value from database @@ -796,10 +792,7 @@ @aio async def del_private_namespace( - self, - namespace: str, - binary: bool = False, - profile: Optional[str] = None + self, namespace: str, binary: bool = False, profile: Optional[str] = None ) -> None: """Delete all data from a private namespace @@ -825,7 +818,7 @@ self, client: Optional[SatXMPPEntity], file_id: Optional[str] = None, - version: Optional[str] = '', + version: Optional[str] = "", parent: Optional[str] = None, type_: Optional[str] = None, file_hash: Optional[str] = None, @@ -837,7 +830,7 @@ owner: Optional[jid.JID] = None, access: Optional[dict] = None, projection: Optional[List[str]] = None, - unique: bool = False + unique: bool = False, ) -> List[dict]: """Retrieve files with with given filters @@ -857,9 +850,23 @@ """ if projection is None: projection = [ - 'id', 'version', 'parent', 'type', 'file_hash', 'hash_algo', 'name', - 'size', 'namespace', 'media_type', 'media_subtype', 'public_id', - 'created', 'modified', 'owner', 'access', 'extra' + "id", + "version", + "parent", + "type", + "file_hash", + "hash_algo", + "name", + "size", + "namespace", + "media_type", + "media_subtype", + "public_id", + "created", + "modified", + "owner", + "access", + "extra", ] stmt = select(*[getattr(File, f) for f in projection]) @@ -891,7 +898,7 @@ if namespace is not None: stmt = stmt.filter_by(namespace=namespace) if mime_type is not None: - if '/' in mime_type: + if "/" in mime_type: media_type, media_subtype = mime_type.split("/", 1) stmt = stmt.filter_by(media_type=media_type, media_subtype=media_subtype) else: @@ -901,8 +908,8 @@ if owner is not None: stmt = stmt.filter_by(owner=owner) if access is not None: - raise NotImplementedError('Access check is not implemented yet') - # a JSON comparison is needed here + raise NotImplementedError("Access check is not implemented yet") + # a JSON comparison is needed here async with self.session() as session: result = await session.execute(stmt) @@ -928,7 +935,7 @@ modified: Optional[float] = None, owner: Optional[jid.JID] = None, access: Optional[dict] = None, - extra: Optional[dict] = None + extra: Optional[dict] = None, ) -> None: """Set a file metadata @@ -958,33 +965,35 @@ """ if mime_type is None: media_type = media_subtype = None - elif '/' in mime_type: - media_type, media_subtype = mime_type.split('/', 1) + elif "/" in mime_type: + media_type, media_subtype = mime_type.split("/", 1) else: media_type, media_subtype = mime_type, None async with self.session() as session: async with session.begin(): - session.add(File( - id=file_id, - version=version.strip(), - parent=parent, - type=type_, - file_hash=file_hash, - hash_algo=hash_algo, - name=name, - size=size, - namespace=namespace, - media_type=media_type, - media_subtype=media_subtype, - public_id=public_id, - created=time.time() if created is None else created, - modified=modified, - owner=owner, - access=access, - extra=extra, - profile_id=self.profiles[client.profile] - )) + session.add( + File( + id=file_id, + version=version.strip(), + parent=parent, + type=type_, + file_hash=file_hash, + hash_algo=hash_algo, + name=name, + size=size, + namespace=namespace, + media_type=media_type, + media_subtype=media_subtype, + public_id=public_id, + created=time.time() if created is None else created, + modified=modified, + owner=owner, + access=access, + extra=extra, + profile_id=self.profiles[client.profile], + ) + ) @aio async def file_get_used_space(self, client: SatXMPPEntity, owner: jid.JID) -> int: @@ -993,8 +1002,9 @@ select(sum_(File.size)).filter_by( owner=owner, type=C.FILE_TYPE_FILE, - profile_id=self.profiles[client.profile] - )) + profile_id=self.profiles[client.profile], + ) + ) return result.scalar_one_or_none() or 0 @aio @@ -1011,10 +1021,7 @@ @aio async def file_update( - self, - file_id: str, - column: str, - update_cb: Callable[[dict], None] + self, file_id: str, column: str, update_cb: Callable[[dict], None] ) -> None: """Update a column value using a method to avoid race conditions @@ -1029,16 +1036,16 @@ it get the deserialized data (i.e. a Python object) directly @raise exceptions.NotFound: there is not file with this id """ - if column not in ('access', 'extra'): - raise exceptions.InternalError('bad column name') + if column not in ("access", "extra"): + raise exceptions.InternalError("bad column name") orm_col = getattr(File, column) for i in range(5): async with self.session() as session: try: - value = (await session.execute( - select(orm_col).filter_by(id=file_id) - )).scalar_one() + value = ( + await session.execute(select(orm_col).filter_by(id=file_id)) + ).scalar_one() except NoResultFound: raise exceptions.NotFound old_value = copy.deepcopy(value) @@ -1057,14 +1064,17 @@ break log.warning( - _("table not updated, probably due to race condition, trying again " - "({tries})").format(tries=i+1) + _( + "table not updated, probably due to race condition, trying again " + "({tries})" + ).format(tries=i + 1) ) else: raise exceptions.DatabaseError( - _("Can't update file {file_id} due to race condition") - .format(file_id=file_id) + _("Can't update file {file_id} due to race condition").format( + file_id=file_id + ) ) @aio @@ -1076,7 +1086,7 @@ with_items: bool = False, with_subscriptions: bool = False, create: bool = False, - create_kwargs: Optional[dict] = None + create_kwargs: Optional[dict] = None, ) -> Optional[PubsubNode]: """Retrieve a PubsubNode from DB @@ -1089,22 +1099,15 @@ needs to be created. """ async with self.session() as session: - stmt = ( - select(PubsubNode) - .filter_by( - service=service, - name=name, - profile_id=self.profiles[client.profile], - ) + stmt = select(PubsubNode).filter_by( + service=service, + name=name, + profile_id=self.profiles[client.profile], ) if with_items: - stmt = stmt.options( - joinedload(PubsubNode.items) - ) + stmt = stmt.options(joinedload(PubsubNode.items)) if with_subscriptions: - stmt = stmt.options( - joinedload(PubsubNode.subscriptions) - ) + stmt = stmt.options(joinedload(PubsubNode.subscriptions)) result = await session.execute(stmt) ret = result.unique().scalar_one_or_none() if ret is None and create: @@ -1112,21 +1115,23 @@ if create_kwargs is None: create_kwargs = {} try: - return await as_future(self.set_pubsub_node( - client, service, name, **create_kwargs - )) + return await as_future( + self.set_pubsub_node(client, service, name, **create_kwargs) + ) except IntegrityError as e: if "unique" in str(e.orig).lower(): # the node may already exist, if it has been created just after # get_pubsub_node above log.debug("ignoring UNIQUE constraint error") - cached_node = await as_future(self.get_pubsub_node( - client, - service, - name, - with_items=with_items, - with_subscriptions=with_subscriptions - )) + cached_node = await as_future( + self.get_pubsub_node( + client, + service, + name, + with_items=with_items, + with_subscriptions=with_subscriptions, + ) + ) else: raise e else: @@ -1160,9 +1165,7 @@ @aio async def update_pubsub_node_sync_state( - self, - node: PubsubNode, - state: SyncState + self, node: PubsubNode, state: SyncState ) -> None: async with self.session() as session: async with session.begin(): @@ -1180,7 +1183,7 @@ self, profiles: Optional[List[str]], services: Optional[List[jid.JID]], - names: Optional[List[str]] + names: Optional[List[str]], ) -> None: """Delete items cached for a node @@ -1194,9 +1197,7 @@ stmt = delete(PubsubNode) if profiles is not None: stmt = stmt.where( - PubsubNode.profile.in_( - [self.profiles[p] for p in profiles] - ) + PubsubNode.profile.in_([self.profiles[p] for p in profiles]) ) if services is not None: stmt = stmt.where(PubsubNode.service.in_(services)) @@ -1223,27 +1224,29 @@ async with session.begin(): for idx, item in enumerate(items): parsed = parsed_items[idx] if parsed_items else None - stmt = insert(PubsubItem).values( - node_id = node.id, - name = item["id"], - data = item, - parsed = parsed, - ).on_conflict_do_update( - index_elements=(PubsubItem.node_id, PubsubItem.name), - set_={ - PubsubItem.data: item, - PubsubItem.parsed: parsed, - PubsubItem.updated: now() - } + stmt = ( + insert(PubsubItem) + .values( + node_id=node.id, + name=item["id"], + data=item, + parsed=parsed, + ) + .on_conflict_do_update( + index_elements=(PubsubItem.node_id, PubsubItem.name), + set_={ + PubsubItem.data: item, + PubsubItem.parsed: parsed, + PubsubItem.updated: now(), + }, + ) ) await session.execute(stmt) await session.commit() @aio async def delete_pubsub_items( - self, - node: PubsubNode, - items_names: Optional[List[str]] = None + self, node: PubsubNode, items_names: Optional[List[str]] = None ) -> None: """Delete items cached for a node @@ -1296,10 +1299,8 @@ if values is None: continue sub_q = sub_q.where(getattr(PubsubNode, col).in_(values)) - stmt = ( - stmt - .where(PubsubItem.node_id.in_(sub_q)) - .execution_options(synchronize_session=False) + stmt = stmt.where(PubsubItem.node_id.in_(sub_q)).execution_options( + synchronize_session=False ) if created_before is not None: @@ -1367,11 +1368,7 @@ use_rsm = True from_index = 0 - stmt = ( - select(PubsubItem) - .filter_by(node_id=node.id) - .limit(max_items) - ) + stmt = select(PubsubItem).filter_by(node_id=node.id).limit(max_items) if item_ids is not None: stmt = stmt.where(PubsubItem.name.in_(item_ids)) @@ -1402,7 +1399,7 @@ PubsubItem.id, PubsubItem.name, # row_number starts from 1, but RSM index must start from 0 - (func.row_number().over(order_by=order)-1).label("item_index") + (func.row_number().over(order_by=order) - 1).label("item_index"), ).filter_by(node_id=node.id) row_num_cte = row_num_q.cte() @@ -1412,8 +1409,9 @@ # we need to use row number item_name = before or after row_num_limit_q = ( - select(row_num_cte.c.item_index) - .where(row_num_cte.c.name==item_name) + select(row_num_cte.c.item_index).where( + row_num_cte.c.name == item_name + ) ).scalar_subquery() stmt = ( @@ -1422,22 +1420,16 @@ .limit(max_items) ) if before: - stmt = ( - stmt - .where(row_num_cte.c.item_indexrow_num_limit_q) - .order_by(row_num_cte.c.item_index.asc()) - ) + stmt = stmt.where( + row_num_cte.c.item_index > row_num_limit_q + ).order_by(row_num_cte.c.item_index.asc()) else: - stmt = ( - stmt - .where(row_num_cte.c.item_index>=from_index) - .order_by(row_num_cte.c.item_index.asc()) + stmt = stmt.where(row_num_cte.c.item_index >= from_index).order_by( + row_num_cte.c.item_index.asc() ) # from_index is used @@ -1468,12 +1460,14 @@ last = result[-1][1].name metadata["rsm"] = { - k: v for k, v in { + k: v + for k, v in { "index": index, "count": rows_count, "first": first, "last": last, - }.items() if v is not None + }.items() + if v is not None } metadata["complete"] = (index or 0) + len(result) == rows_count @@ -1487,10 +1481,7 @@ result.reverse() return result, metadata - def _get_sqlite_path( - self, - path: List[Union[str, int]] - ) -> str: + def _get_sqlite_path(self, path: List[Union[str, int]]) -> str: """generate path suitable to query JSON element with SQLite""" return f"${''.join(f'[{p}]' if isinstance(p, int) else f'.{p}' for p in path)}" @@ -1557,19 +1548,20 @@ # Full-Text Search fts = query.get("fts") if fts: - fts_select = text( - "SELECT rowid, rank FROM pubsub_items_fts(:fts_query)" - ).bindparams(fts_query=fts).columns(rowid=Integer).subquery() - stmt = ( - stmt - .select_from(fts_select) - .outerjoin(PubsubItem, fts_select.c.rowid == PubsubItem.id) + fts_select = ( + text("SELECT rowid, rank FROM pubsub_items_fts(:fts_query)") + .bindparams(fts_query=fts) + .columns(rowid=Integer) + .subquery() + ) + stmt = stmt.select_from(fts_select).outerjoin( + PubsubItem, fts_select.c.rowid == PubsubItem.id ) # node related filters profiles = query.get("profiles") - if (profiles - or any(query.get(k) for k in ("nodes", "services", "types", "subtypes")) + if profiles or any( + query.get(k) for k in ("nodes", "services", "types", "subtypes") ): stmt = stmt.join(PubsubNode).options(contains_eager(PubsubItem.node)) if profiles: @@ -1585,7 +1577,7 @@ ("nodes", "name"), ("services", "service"), ("types", "type_"), - ("subtypes", "subtype") + ("subtypes", "subtype"), ): value = query.get(key) if not value: @@ -1596,10 +1588,7 @@ value.remove(None) condition = getattr(PubsubNode, attr).is_(None) if value: - condition = or_( - getattr(PubsubNode, attr).in_(value), - condition - ) + condition = or_(getattr(PubsubNode, attr).in_(value), condition) else: condition = getattr(PubsubNode, attr).in_(value) stmt = stmt.where(condition) @@ -1644,10 +1633,12 @@ try: left, right = value except (ValueError, TypeError): - raise ValueError(_( - "invalid value for \"between\" filter, you must use a 2 items " - "array: {value!r}" - ).format(value=value)) + raise ValueError( + _( + 'invalid value for "between" filter, you must use a 2 items ' + "array: {value!r}" + ).format(value=value) + ) col = func.json_extract(PubsubItem.parsed, sqlite_path) stmt = stmt.where(col.between(left, right)) else: @@ -1662,10 +1653,12 @@ for order_data in order_by: order, path = order_data.get("order"), order_data.get("path") if order and path: - raise ValueError(_( - '"order" and "path" can\'t be used at the same time in ' - '"order-by" data' - )) + raise ValueError( + _( + '"order" and "path" can\'t be used at the same time in ' + '"order-by" data' + ) + ) if order: if order == "creation": col = PubsubItem.id @@ -1702,3 +1695,172 @@ result = await session.execute(stmt) return result.scalars().all() + + # Notifications + + @aio + async def add_notification( + self, + client: Optional[SatXMPPEntity], + type_: NotificationType, + body_plain: str, + body_rich: Optional[str] = None, + title: Optional[str] = None, + requires_action: bool = False, + priority: NotificationPriority = NotificationPriority.MEDIUM, + expire_at: Optional[float] = None, + extra: Optional[dict] = None, + ) -> Notification: + """Add a new notification to the DB. + + @param client: client associated with the notification. If None, the notification + will be global. + @param type_: type of the notification. + @param body_plain: plain text body. + @param body_rich: rich text (XHTML) body. + @param title: optional title. + @param requires_action: True if the notification requires user action (e.g. a + dialog need to be answered). + @priority: how urgent the notification is + @param expire_at: expiration timestamp for the notification. + @param extra: additional data. + @return: created Notification + """ + profile_id = self.profiles[client.profile] if client else None + notification = Notification( + profile_id=profile_id, + type=type_, + body_plain=body_plain, + body_rich=body_rich, + requires_action=requires_action, + priority=priority, + expire_at=expire_at, + title=title, + extra_data=extra, + status=NotificationStatus.new, + ) + async with self.session() as session: + async with session.begin(): + session.add(notification) + return notification + + @aio + async def update_notification( + self, client: SatXMPPEntity, notification_id: int, **kwargs + ) -> None: + """Update an existing notification. + + @param client: client associated with the notification. + @param notification_id: ID of the notification to update. + """ + profile_id = self.profiles[client.profile] + async with self.session() as session: + await session.execute( + update(Notification) + .where( + and_( + Notification.profile_id == profile_id, + Notification.id == notification_id, + ) + ) + .values(**kwargs) + ) + await session.commit() + + @aio + async def get_notifications( + self, + client: SatXMPPEntity, + type_: Optional[NotificationType] = None, + status: Optional[NotificationStatus] = None, + requires_action: Optional[bool] = None, + min_priority: Optional[int] = None + ) -> List[Notification]: + """Retrieve all notifications for a given profile with optional filters. + + @param client: client associated with the notifications. + @param type_: filter by type of the notification. + @param status: filter by status of the notification. + @param requires_action: filter by notifications that require user action. + @param min_priority: filter by minimum priority value. + @return: list of matching Notification instances. + """ + profile_id = self.profiles[client.profile] + filters = [or_(Notification.profile_id == profile_id, Notification.profile_id.is_(None))] + + if type_: + filters.append(Notification.type == type_) + if status: + filters.append(Notification.status == status) + if requires_action is not None: + filters.append(Notification.requires_action == requires_action) + if min_priority: + filters.append(Notification.priority >= min_priority) + + async with self.session() as session: + result = await session.execute( + select(Notification) + .where(and_(*filters)) + .order_by(Notification.id) + ) + return result.scalars().all() + + @aio + async def delete_notification( + self, client: Optional[SatXMPPEntity], notification_id: str + ) -> None: + """Delete a notification by its profile and id. + + @param client: client associated with the notification. If None, profile_id will be NULL. + @param notification_id: ID of the notification to delete. + """ + profile_id = self.profiles[client.profile] if client else None + async with self.session() as session: + await session.execute( + delete(Notification).where( + and_( + Notification.profile_id == profile_id, + Notification.id == int(notification_id), + ) + ) + ) + await session.commit() + + @aio + async def clean_expired_notifications( + self, client: Optional[SatXMPPEntity], limit_timestamp: Optional[float] = None + ) -> None: + """Cleans expired notifications and older profile-specific notifications. + + - Removes all notifications where the expiration timestamp has passed, + irrespective of their profile. + - If a limit_timestamp is provided, removes older notifications with a profile set + (i.e., not global notifications) that do not require user action. If client is + provided, only remove notification for this profile. + + @param client: if provided, only expire notification for this client (in addition + to truly expired notifications for everybody). + @param limit_timestamp: Timestamp limit for older notifications. If None, only + truly expired notifications are removed. + """ + + # Delete truly expired notifications + expired_condition = Notification.expire_at < time.time() + + # Delete older profile-specific notifications (created before the limit_timestamp) + if client is None: + profile_condition = Notification.profile_id.isnot(None) + else: + profile_condition = Notification.profile_id == self.profiles[client.profile] + older_condition = and_( + profile_condition, + Notification.timestamp < limit_timestamp if limit_timestamp else False, + Notification.requires_action == False, + ) + + # Combine the conditions + conditions = or_(expired_condition, older_condition) + + async with self.session() as session: + await session.execute(delete(Notification).where(conditions)) + await session.commit() diff -r 51744ad00a42 -r 02f0adc745c6 libervia/backend/memory/sqla_mapping.py --- a/libervia/backend/memory/sqla_mapping.py Wed Oct 18 15:30:07 2023 +0200 +++ b/libervia/backend/memory/sqla_mapping.py Mon Oct 16 17:29:31 2023 +0200 @@ -16,12 +16,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import Dict, Any from datetime import datetime import enum import json import pickle import time +from typing import Any, Dict from sqlalchemy import ( Boolean, @@ -45,21 +45,52 @@ from twisted.words.protocols.jabber import jid from wokkel import generic +from libervia.backend.core.constants import Const as C + Base = declarative_base( metadata=MetaData( naming_convention={ - "ix": 'ix_%(column_0_label)s', + "ix": "ix_%(column_0_label)s", "uq": "uq_%(table_name)s_%(column_0_name)s", "ck": "ck_%(table_name)s_%(constraint_name)s", "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", - "pk": "pk_%(table_name)s" + "pk": "pk_%(table_name)s", } ) ) # keys which are in message data extra but not stored in extra field this is # because those values are stored in separate fields -NOT_IN_EXTRA = ('origin_id', 'stanza_id', 'received_timestamp', 'update_uid') +NOT_IN_EXTRA = ("origin_id", "stanza_id", "received_timestamp", "update_uid") + + +class Profiles(dict): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.id_to_profile = {v: k for k, v in self.items()} + + def __setitem__(self, key, value): + super().__setitem__(key, value) + self.id_to_profile[value] = key + + def __delitem__(self, key): + del self.id_to_profile[self[key]] + super().__delitem__(key) + + def update(self, *args, **kwargs): + super().update(*args, **kwargs) + self.id_to_profile = {v: k for k, v in self.items()} + + def clear(self): + super().clear() + self.id_to_profile.clear() + + +profiles = Profiles() + + +def get_profile_by_id( profile_id): + return profiles.id_to_profile.get(profile_id) class SyncState(enum.Enum): @@ -78,11 +109,34 @@ PENDING = 2 +class NotificationType(enum.Enum): + chat = "chat" + blog = "blog" + calendar = "calendar" + file = "file" + call = "call" + service = "service" + other = "other" + + +class NotificationStatus(enum.Enum): + new = "new" + read = "read" + + +class NotificationPriority(enum.IntEnum): + LOW = 10 + MEDIUM = 20 + HIGH = 30 + URGENT = 40 + + class LegacyPickle(TypeDecorator): """Handle troubles with data pickled by former version of SàT This type is temporary until we do migration to a proper data type """ + # Blob is used on SQLite but gives errors when used here, while Text works fine impl = Text cache_ok = True @@ -109,12 +163,13 @@ # JSON return pickle.loads( value.replace(b"sat.plugins", b"libervia.backend.plugins"), - encoding="utf-8" + encoding="utf-8", ) class Json(TypeDecorator): """Handle JSON field in DB independant way""" + # Blob is used on SQLite but gives errors when used here, while Text works fine impl = Text cache_ok = True @@ -156,6 +211,7 @@ class JID(TypeDecorator): """Store twisted JID in text fields""" + impl = Text cache_ok = True @@ -193,9 +249,7 @@ __tablename__ = "components" profile_id = Column( - ForeignKey("profiles.id", ondelete="CASCADE"), - nullable=True, - primary_key=True + ForeignKey("profiles.id", ondelete="CASCADE"), nullable=True, primary_key=True ) entry_point = Column(Text, nullable=False) profile = relationship("Profile") @@ -209,7 +263,7 @@ Index("history__profile_id_timestamp", "profile_id", "timestamp"), Index( "history__profile_id_received_timestamp", "profile_id", "received_timestamp" - ) + ), ) uid = Column(Text, primary_key=True) @@ -295,14 +349,14 @@ if self.thread.parent_id is not None: extra["thread_parent"] = self.thread.parent_id - return { - "from": f"{self.source}/{self.source_res}" if self.source_res - else self.source, + "from": f"{self.source}/{self.source_res}" + if self.source_res + else self.source, "to": f"{self.dest}/{self.dest_res}" if self.dest_res else self.dest, "uid": self.uid, - "message": {m.language or '': m.message for m in self.messages}, - "subject": {m.language or '': m.subject for m in self.subjects}, + "message": {m.language or "": m.message for m in self.messages}, + "subject": {m.language or "": m.subject for m in self.subjects}, "type": self.type, "extra": extra, "timestamp": self.timestamp, @@ -311,8 +365,14 @@ def as_tuple(self): d = self.serialise() return ( - d['uid'], d['timestamp'], d['from'], d['to'], d['message'], d['subject'], - d['type'], d['extra'] + d["uid"], + d["timestamp"], + d["from"], + d["to"], + d["message"], + d["subject"], + d["type"], + d["extra"], ) @staticmethod @@ -336,9 +396,7 @@ class Message(Base): __tablename__ = "message" - __table_args__ = ( - Index("message__history_uid", "history_uid"), - ) + __table_args__ = (Index("message__history_uid", "history_uid"),) id = Column( Integer, @@ -358,16 +416,14 @@ def __repr__(self): lang_str = f"[{self.language}]" if self.language else "" - msg = f"{self.message[:20]}…" if len(self.message)>20 else self.message + msg = f"{self.message[:20]}…" if len(self.message) > 20 else self.message content = f"{lang_str}{msg}" return f"Message<{content}>" class Subject(Base): __tablename__ = "subject" - __table_args__ = ( - Index("subject__history_uid", "history_uid"), - ) + __table_args__ = (Index("subject__history_uid", "history_uid"),) id = Column( Integer, @@ -387,16 +443,14 @@ def __repr__(self): lang_str = f"[{self.language}]" if self.language else "" - msg = f"{self.subject[:20]}…" if len(self.subject)>20 else self.subject + msg = f"{self.subject[:20]}…" if len(self.subject) > 20 else self.subject content = f"{lang_str}{msg}" return f"Subject<{content}>" class Thread(Base): __tablename__ = "thread" - __table_args__ = ( - Index("thread__history_uid", "history_uid"), - ) + __table_args__ = (Index("thread__history_uid", "history_uid"),) id = Column( Integer, @@ -412,6 +466,51 @@ return f"Thread<{self.thread_id} [parent: {self.parent_id}]>" +class Notification(Base): + __tablename__ = "notifications" + __table_args__ = (Index("notifications_profile_id_status", "profile_id", "status"),) + + id = Column(Integer, primary_key=True, autoincrement=True) + timestamp = Column(Float, nullable=False, default=time.time) + expire_at = Column(Float, nullable=True) + + profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"), index=True, nullable=True) + profile = relationship("Profile") + + type = Column(Enum(NotificationType), nullable=False) + + title = Column(Text, nullable=True) + body_plain = Column(Text, nullable=False) + body_rich = Column(Text, nullable=True) + + requires_action = Column(Boolean, default=False) + priority = Column(Integer, default=NotificationPriority.MEDIUM.value) + + extra_data = Column(JSON) + status = Column(Enum(NotificationStatus), default=NotificationStatus.new) + + def serialise(self) -> dict[str, str | float | bool | int | dict]: + """ + Serialises the Notification instance to a dictionary. + """ + result = {} + for column in self.__table__.columns: + value = getattr(self, column.name) + if value is not None: + if column.name in ("type", "status"): + result[column.name] = value.name + elif column.name == "id": + result[column.name] = str(value) + elif column.name == "profile_id": + if value is None: + result["profile"] = C.PROF_KEY_ALL + else: + result["profile"] = get_profile_by_id(value) + else: + result[column.name] = value + return result + + class ParamGen(Base): __tablename__ = "param_gen" @@ -425,9 +524,7 @@ category = Column(Text, primary_key=True) name = Column(Text, primary_key=True) - profile_id = Column( - ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True - ) + profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True) value = Column(Text) profile = relationship("Profile", back_populates="params") @@ -446,9 +543,7 @@ namespace = Column(Text, primary_key=True) key = Column(Text, primary_key=True) - profile_id = Column( - ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True - ) + profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True) value = Column(Text) profile = relationship("Profile", back_populates="private_data") @@ -467,9 +562,7 @@ namespace = Column(Text, primary_key=True) key = Column(Text, primary_key=True) - profile_id = Column( - ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True - ) + profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE"), primary_key=True) value = Column(LegacyPickle) profile = relationship("Profile", back_populates="private_bin_data") @@ -484,8 +577,8 @@ "profile_id", "owner", "media_type", - "media_subtype" - ) + "media_subtype", + ), ) id = Column(Text, primary_key=True) @@ -493,11 +586,7 @@ version = Column(Text, primary_key=True) parent = Column(Text, nullable=False) type = Column( - Enum( - "file", "directory", - name="file_type", - create_constraint=True - ), + Enum("file", "directory", name="file_type", create_constraint=True), nullable=False, server_default="file", ) @@ -520,14 +609,10 @@ class PubsubNode(Base): __tablename__ = "pubsub_nodes" - __table_args__ = ( - UniqueConstraint("profile_id", "service", "name"), - ) + __table_args__ = (UniqueConstraint("profile_id", "service", "name"),) id = Column(Integer, primary_key=True) - profile_id = Column( - ForeignKey("profiles.id", ondelete="CASCADE") - ) + profile_id = Column(ForeignKey("profiles.id", ondelete="CASCADE")) service = Column(JID) name = Column(Text, nullable=False) subscribed = Column( @@ -540,19 +625,11 @@ name="sync_state", create_constraint=True, ), - nullable=True - ) - sync_state_updated = Column( - Float, - nullable=False, - default=time.time() + nullable=True, ) - type_ = Column( - Text, name="type", nullable=True - ) - subtype = Column( - Text, nullable=True - ) + sync_state_updated = Column(Float, nullable=False, default=time.time()) + type_ = Column(Text, name="type", nullable=True) + subtype = Column(Text, nullable=True) extra = Column(JSON) items = relationship("PubsubItem", back_populates="node", passive_deletes=True) @@ -567,10 +644,9 @@ Used by components managing a pubsub service """ + __tablename__ = "pubsub_subs" - __table_args__ = ( - UniqueConstraint("node_id", "subscriber"), - ) + __table_args__ = (UniqueConstraint("node_id", "subscriber"),) id = Column(Integer, primary_key=True) node_id = Column(ForeignKey("pubsub_nodes.id", ondelete="CASCADE"), nullable=False) @@ -581,7 +657,7 @@ name="state", create_constraint=True, ), - nullable=True + nullable=True, ) node = relationship("PubsubNode", back_populates="subscriptions") @@ -589,9 +665,7 @@ class PubsubItem(Base): __tablename__ = "pubsub_items" - __table_args__ = ( - UniqueConstraint("node_id", "name"), - ) + __table_args__ = (UniqueConstraint("node_id", "name"),) id = Column(Integer, primary_key=True) node_id = Column(ForeignKey("pubsub_nodes.id", ondelete="CASCADE"), nullable=False) name = Column(Text, nullable=False) @@ -607,6 +681,7 @@ # create + @event.listens_for(PubsubItem.__table__, "after_create") def fts_create(target, connection, **kw): """Full-Text Search table creation""" @@ -626,13 +701,15 @@ " INSERT INTO pubsub_items_fts(pubsub_items_fts, rowid, data) VALUES" "('delete', old.id, old.data);" " INSERT INTO pubsub_items_fts(rowid, data) VALUES(new.id, new.data);" - "END" + "END", ] for q in queries: connection.execute(DDL(q)) + # drop + @event.listens_for(PubsubItem.__table__, "before_drop") def fts_drop(target, connection, **kw): "Full-Text Search table drop" "" diff -r 51744ad00a42 -r 02f0adc745c6 libervia/frontends/bridge/dbus_bridge.py --- a/libervia/frontends/bridge/dbus_bridge.py Wed Oct 18 15:30:07 2023 +0200 +++ b/libervia/frontends/bridge/dbus_bridge.py Mon Oct 16 17:29:31 2023 +0200 @@ -562,6 +562,62 @@ kwargs['error_handler'] = error_handler return self.db_core_iface.namespaces_get(**kwargs) + def notification_add(self, type_, body_plain, body_rich, title, is_global, requires_action, arg_6, priority, expire_at, extra, callback=None, errback=None): + if callback is None: + error_handler = None + else: + if errback is None: + errback = log.error + error_handler = lambda err:errback(dbus_to_bridge_exception(err)) + kwargs={} + if callback is not None: + kwargs['timeout'] = const_TIMEOUT + kwargs['reply_handler'] = callback + kwargs['error_handler'] = error_handler + return self.db_core_iface.notification_add(type_, body_plain, body_rich, title, is_global, requires_action, arg_6, priority, expire_at, extra, **kwargs) + + def notification_delete(self, id_, is_global, profile_key, callback=None, errback=None): + if callback is None: + error_handler = None + else: + if errback is None: + errback = log.error + error_handler = lambda err:errback(dbus_to_bridge_exception(err)) + kwargs={} + if callback is not None: + kwargs['timeout'] = const_TIMEOUT + kwargs['reply_handler'] = callback + kwargs['error_handler'] = error_handler + return self.db_core_iface.notification_delete(id_, is_global, profile_key, **kwargs) + + def notifications_expired_clean(self, limit_timestamp, profile_key, callback=None, errback=None): + if callback is None: + error_handler = None + else: + if errback is None: + errback = log.error + error_handler = lambda err:errback(dbus_to_bridge_exception(err)) + kwargs={} + if callback is not None: + kwargs['timeout'] = const_TIMEOUT + kwargs['reply_handler'] = callback + kwargs['error_handler'] = error_handler + return self.db_core_iface.notifications_expired_clean(limit_timestamp, profile_key, **kwargs) + + def notifications_get(self, filters, profile_key, callback=None, errback=None): + if callback is None: + error_handler = None + else: + if errback is None: + errback = log.error + error_handler = lambda err:errback(dbus_to_bridge_exception(err)) + kwargs={} + if callback is not None: + kwargs['timeout'] = const_TIMEOUT + kwargs['reply_handler'] = callback + kwargs['error_handler'] = error_handler + return str(self.db_core_iface.notifications_get(filters, profile_key, **kwargs)) + def param_get_a(self, name, category, attribute="value", profile_key="@DEFAULT@", callback=None, errback=None): if callback is None: error_handler = None @@ -1271,6 +1327,38 @@ self.db_core_iface.namespaces_get(timeout=const_TIMEOUT, reply_handler=reply_handler, error_handler=error_handler) return fut + def notification_add(self, type_, body_plain, body_rich, title, is_global, requires_action, arg_6, priority, expire_at, extra): + loop = asyncio.get_running_loop() + fut = loop.create_future() + reply_handler = lambda ret=None: loop.call_soon_threadsafe(fut.set_result, ret) + error_handler = lambda err: loop.call_soon_threadsafe(fut.set_exception, dbus_to_bridge_exception(err)) + self.db_core_iface.notification_add(type_, body_plain, body_rich, title, is_global, requires_action, arg_6, priority, expire_at, extra, timeout=const_TIMEOUT, reply_handler=reply_handler, error_handler=error_handler) + return fut + + def notification_delete(self, id_, is_global, profile_key): + loop = asyncio.get_running_loop() + fut = loop.create_future() + reply_handler = lambda ret=None: loop.call_soon_threadsafe(fut.set_result, ret) + error_handler = lambda err: loop.call_soon_threadsafe(fut.set_exception, dbus_to_bridge_exception(err)) + self.db_core_iface.notification_delete(id_, is_global, profile_key, timeout=const_TIMEOUT, reply_handler=reply_handler, error_handler=error_handler) + return fut + + def notifications_expired_clean(self, limit_timestamp, profile_key): + loop = asyncio.get_running_loop() + fut = loop.create_future() + reply_handler = lambda ret=None: loop.call_soon_threadsafe(fut.set_result, ret) + error_handler = lambda err: loop.call_soon_threadsafe(fut.set_exception, dbus_to_bridge_exception(err)) + self.db_core_iface.notifications_expired_clean(limit_timestamp, profile_key, timeout=const_TIMEOUT, reply_handler=reply_handler, error_handler=error_handler) + return fut + + def notifications_get(self, filters, profile_key): + loop = asyncio.get_running_loop() + fut = loop.create_future() + reply_handler = lambda ret=None: loop.call_soon_threadsafe(fut.set_result, ret) + error_handler = lambda err: loop.call_soon_threadsafe(fut.set_exception, dbus_to_bridge_exception(err)) + self.db_core_iface.notifications_get(filters, profile_key, timeout=const_TIMEOUT, reply_handler=reply_handler, error_handler=error_handler) + return fut + def param_get_a(self, name, category, attribute="value", profile_key="@DEFAULT@"): loop = asyncio.get_running_loop() fut = loop.create_future() diff -r 51744ad00a42 -r 02f0adc745c6 libervia/frontends/bridge/pb.py --- a/libervia/frontends/bridge/pb.py Wed Oct 18 15:30:07 2023 +0200 +++ b/libervia/frontends/bridge/pb.py Mon Oct 16 17:29:31 2023 +0200 @@ -489,6 +489,42 @@ else: d.addErrback(self._errback, ori_errback=errback) + def notification_add(self, type_, body_plain, body_rich, title, is_global, requires_action, arg_6, priority, expire_at, extra, callback=None, errback=None): + d = self.root.callRemote("notification_add", type_, body_plain, body_rich, title, is_global, requires_action, arg_6, priority, expire_at, extra) + if callback is not None: + d.addCallback(lambda __: callback()) + if errback is None: + d.addErrback(self._generic_errback) + else: + d.addErrback(self._errback, ori_errback=errback) + + def notification_delete(self, id_, is_global, profile_key, callback=None, errback=None): + d = self.root.callRemote("notification_delete", id_, is_global, profile_key) + if callback is not None: + d.addCallback(lambda __: callback()) + if errback is None: + d.addErrback(self._generic_errback) + else: + d.addErrback(self._errback, ori_errback=errback) + + def notifications_expired_clean(self, limit_timestamp, profile_key, callback=None, errback=None): + d = self.root.callRemote("notifications_expired_clean", limit_timestamp, profile_key) + if callback is not None: + d.addCallback(lambda __: callback()) + if errback is None: + d.addErrback(self._generic_errback) + else: + d.addErrback(self._errback, ori_errback=errback) + + def notifications_get(self, filters, profile_key, callback=None, errback=None): + d = self.root.callRemote("notifications_get", filters, profile_key) + if callback is not None: + d.addCallback(callback) + if errback is None: + d.addErrback(self._generic_errback) + else: + d.addErrback(self._errback, ori_errback=errback) + def param_get_a(self, name, category, attribute="value", profile_key="@DEFAULT@", callback=None, errback=None): d = self.root.callRemote("param_get_a", name, category, attribute, profile_key) if callback is not None: @@ -969,6 +1005,26 @@ d.addErrback(self._errback) return d.asFuture(asyncio.get_event_loop()) + def notification_add(self, type_, body_plain, body_rich, title, is_global, requires_action, arg_6, priority, expire_at, extra): + d = self.root.callRemote("notification_add", type_, body_plain, body_rich, title, is_global, requires_action, arg_6, priority, expire_at, extra) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def notification_delete(self, id_, is_global, profile_key): + d = self.root.callRemote("notification_delete", id_, is_global, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def notifications_expired_clean(self, limit_timestamp, profile_key): + d = self.root.callRemote("notifications_expired_clean", limit_timestamp, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + + def notifications_get(self, filters, profile_key): + d = self.root.callRemote("notifications_get", filters, profile_key) + d.addErrback(self._errback) + return d.asFuture(asyncio.get_event_loop()) + def param_get_a(self, name, category, attribute="value", profile_key="@DEFAULT@"): d = self.root.callRemote("param_get_a", name, category, attribute, profile_key) d.addErrback(self._errback)