Mercurial > libervia-backend
view libervia/cli/cmd_notifications.py @ 4348:35d41de5b2aa default tip @
doc (component): document use of Gateway Relayed Encryption:
fix 455
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 13 Jan 2025 01:23:22 +0100 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from libervia.backend.core.i18n import _
from libervia.backend.memory.memory import (
    NotificationPriority,
    NotificationStatus,
    NotificationType,
)
from libervia.backend.tools.common import data_format, date_utils
from libervia.cli.constants import Const as C
from rich.live import Live
from rich.table import Table
from rich.text import Text

from . import base

__commands__ = ["Notification"]


class Add(base.CommandBase):
    """Create and broadcast a notification"""

    def __init__(self, host):
        super().__init__(
            host, "add", use_verbose=True, help=_("create and broadcast a notification")
        )

    def add_parser_options(self):
        """Declare the arguments accepted by the ``add`` sub-command."""
        # NOTE(review): this positional argument has no default, so the
        #   "%(default)s" placeholder of its help text has nothing useful to show.
        self.parser.add_argument(
            "type",
            choices=[e.name for e in NotificationType],
            help=_("notification type (default: %(default)s)"),
        )

        self.parser.add_argument(
            "body_plain", help=_("plain text body of the notification")
        )

        # TODO:
        # self.parser.add_argument(
        #     "-r", "--body-rich", default="", help=_("rich text body of the notification")
        # )

        self.parser.add_argument(
            "-t", "--title", default="", help=_("title of the notification")
        )

        self.parser.add_argument(
            "-g",
            "--is-global",
            action="store_true",
            help=_("indicates if the notification is for all profiles"),
        )

        # TODO:
        # self.parser.add_argument(
        #     "--requires-action",
        #     action="store_true",
        #     help=_("indicates if the notification requires action"),
        # )

        self.parser.add_argument(
            "-P",
            "--priority",
            default="MEDIUM",
            choices=[p.name for p in NotificationPriority],
            help=_("priority level of the notification (default: %(default)s)"),
        )

        self.parser.add_argument(
            "-e",
            "--expire-at",
            type=base.date_decoder,
            default=0,
            help=_(
                "expiration timestamp for the notification (optional, can be 0 for none)"
            ),
        )

    async def start(self):
        """Send the notification through the bridge, then quit.

        Any exception raised by the bridge call is displayed as an error and the
        program quits with ``C.EXIT_BRIDGE_ERRBACK``.
        """
        try:
            await self.host.bridge.notification_add(
                self.args.type,
                self.args.body_plain,
                "",  # TODO: self.args.body_rich or "",
                self.args.title or "",
                self.args.is_global,
                False,  # TODO: self.args.requires_action,
                self.args.priority,
                self.args.expire_at,
                "",  # presumably serialised extra data, always empty here (to confirm)
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't add notification: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp("Notification added.")
            self.host.quit()


class Get(base.CommandBase):
    """Get available notifications"""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_LIST_DICT,
            extra_outputs={"default": self.default_output},
            help=_("display notifications"),
        )

    def add_parser_options(self):
        """Declare the filtering and follow options of the ``get`` sub-command."""
        self.parser.add_argument(
            "-f",
            "--follow",
            action="store_true",
            help=_("wait and print incoming notifications"),
        )

        self.parser.add_argument(
            "-t",
            "--type",
            type=str,
            choices=[t.name for t in NotificationType],
            help=_("filter by type of the notification"),
        )

        self.parser.add_argument(
            "-s",
            "--status",
            type=str,
            choices=[s.name for s in NotificationStatus],
            help=_("filter by status of the notification"),
        )

        self.parser.add_argument(
            "-a",
            "--requires-action",
            type=C.bool,
            default=None,
            help=_(
                "filter notifications that require (or not) user action, true by "
                "default, don't filter if omitted"
            ),
        )

        self.parser.add_argument(
            "-P",
            "--min-priority",
            type=str,
            choices=[p.name for p in NotificationPriority],
            help=_("filter notifications with at least the specified priority"),
        )

    def create_table(self):
        """Build the empty, header-less table used to render notifications.

        Columns are declared in the same order as the cells given to ``add_row`` in
        ``default_output``. Headers are hidden, so the names are only labels.
        """
        table = Table(box=None, show_header=False, collapse_padding=True)
        for column in ("is_new", "type", "id", "timestamp", "priority", "body"):
            table.add_column(column)
        return table

    def default_output(self, notifs):
        """Render notifications as a table.

        In follow mode a single table is kept in ``self.table`` and displayed through
        a ``rich`` ``Live`` instance which is refreshed after each batch; otherwise a
        new table is built and printed once.

        @param notifs: iterable of notification dicts. "id", "timestamp", "priority"
            and "body_plain" are required; "type", "title" and "new" are optional.
        """
        if self.args.follow:
            if self.live is None:
                # First batch in follow mode: create the live display once and make
                # sure it is stopped when the program quits.
                self.table = table = self.create_table()
                self.live = Live(table, auto_refresh=False, console=self.console)
                self.host.add_on_quit_callback(self.live.stop)
                self.live.start()
            else:
                table = self.table
        else:
            table = self.create_table()

        # NOTE(review): the icon literals below (and the "new" flag further down) look
        #   like UTF-8 emoji decoded with a wrong charset, with some bytes lost. They
        #   are kept verbatim because the original characters can't be recovered
        #   unambiguously from here; restore them from the upstream repository.
        emoji_mapper = {
            "chat": "π¬",
            "blog": "π",
            "calendar": "π ",
            "file": "π",
            "call": "π",
            "service": "π’",
            "other": "π£",
        }
        fallback_emoji = emoji_mapper["other"]

        for notif in notifs:
            # An unknown type falls back to the "other" icon instead of raising
            # KeyError and aborting the whole listing.
            emoji = emoji_mapper.get(notif.get("type", "other"), fallback_emoji)
            notif_id = Text(notif["id"])
            created = date_utils.date_fmt(notif["timestamp"], tz_info=date_utils.TZ_LOCAL)

            priority_name = NotificationPriority(notif["priority"]).name.lower()
            priority = Text(f"[{priority_name}]", style=f"priority_{priority_name}")

            # The title, when there is one, is shown on its own styled line above the
            # body.
            body_parts = []
            title = notif.get("title")
            if title:
                body_parts.append((f"{title}\n", "notif_title"))
            body_parts.append(notif["body_plain"])
            body = Text.assemble(*body_parts)

            new_flag = "π " if notif.get("new") else ""
            table.add_row(new_flag, emoji, notif_id, created, priority, body)

        if self.args.follow:
            self.live.refresh()
        else:
            self.print(table)

    async def on_notification_new(
        self,
        id_: str,
        timestamp: float,
        type_: str,
        body_plain: str,
        body_rich: str,
        title: str,
        requires_action: bool,
        priority: int,
        expire_at: float,
        extra: str,
        profile: str,
    ) -> None:
        """Callback when a new notification is emitted.

        The signal arguments are gathered in a dict (``extra`` is deserialised and the
        notification is flagged as "new"), which is then sent to the selected output.
        """
        notification_data = {
            "id": id_,
            "timestamp": timestamp,
            "type": type_,
            "body_plain": body_plain,
            "body_rich": body_rich,
            "title": title,
            "requires_action": requires_action,
            "priority": priority,
            "expire_at": expire_at,
            "extra": data_format.deserialise(extra),
            "profile": profile,
            "new": True,
        }

        await self.output([notification_data])

    async def start(self):
        """Retrieve and display the notifications matching the requested filters.

        With ``--follow`` the command then stays alive and displays notifications as
        they are signalled; otherwise it quits once the listing is done. Any exception
        raised while retrieving notifications is displayed as an error and the program
        quits with ``C.EXIT_BRIDGE_ERRBACK``.
        """
        keys = ["type", "status", "requires_action", "min_priority"]
        # Options left out on the command line are None. The test is done against
        # None and not on truthiness, so that an explicit false value for
        # "--requires-action" (C.bool presumably returns False for it) is still sent
        # as a filter instead of being silently dropped.
        filters = {}
        for key in keys:
            value = getattr(self.args, key)
            if value is not None:
                filters[key] = value

        try:
            notifications = data_format.deserialise(
                await self.host.bridge.notifications_get(
                    data_format.serialise(filters),
                    self.profile,
                ),
                type_check=list,
            )
        except Exception as e:
            self.disp(f"can't get notifications: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            # Must be set before the first output: default_output() checks it to know
            # whether the live display has already been created.
            self.live = None
            await self.output(notifications)
            if self.args.follow:
                self.host.bridge.register_signal(
                    "notification_new", self.on_notification_new, "core"
                )
            else:
                self.host.quit()


class Delete(base.CommandBase):
    """Delete a notification"""

    def __init__(self, host):
        super().__init__(
            host, "delete", use_verbose=True, help=_("delete a notification")
        )

    def add_parser_options(self):
        """Declare the arguments accepted by the ``delete`` sub-command."""
        self.parser.add_argument(
            "id",
            help=_("ID of the notification to delete"),
        )

        self.parser.add_argument(
            "-g",
            "--is-global",
            action="store_true",
            help=_("true if the notification is a global one"),
        )

        # NOTE(review): this option is parsed but start() never reads
        #   self.args.profile_key (self.profile is used instead). Kept to avoid
        #   breaking existing command lines; confirm whether it should be used.
        self.parser.add_argument(
            "--profile-key",
            default="@ALL@",
            help=_("Profile key (use '@ALL@' for all profiles, default: %(default)s)"),
        )

    async def start(self):
        """Ask the bridge to delete the notification, then quit.

        Any exception raised by the bridge call is displayed as an error and the
        program quits with ``C.EXIT_BRIDGE_ERRBACK``.
        """
        try:
            await self.host.bridge.notification_delete(
                self.args.id, self.args.is_global, self.profile
            )
        except Exception as e:
            self.disp(f"can't delete notification: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp("Notification deleted.")
            self.host.quit()


class Expire(base.CommandBase):
    """Clean expired notifications"""

    def __init__(self, host):
        super().__init__(
            host, "expire", use_verbose=True, help=_("clean expired notifications")
        )

    def add_parser_options(self):
        """Declare the options accepted by the ``expire`` sub-command."""
        # NOTE(review): the help text below has an unbalanced closing parenthesis; it
        #   is left untouched here because it is a translatable runtime string.
        self.parser.add_argument(
            "-l",
            "--limit",
            type=base.date_decoder,
            metavar="TIME_PATTERN",
            help=_("time limit for older notifications. default: no limit used)"),
        )
        self.parser.add_argument(
            "-a",
            "--all",
            action="store_true",
            help=_(
                "expire notifications for all profiles (default: use current profile)"
            ),
        )

    async def start(self):
        """Ask the bridge to clean expired notifications, then quit.

        ``-1.0`` is sent as limit when ``--limit`` is not used, and
        ``C.PROF_KEY_NONE`` is sent instead of the current profile with ``--all``.
        Any exception raised by the bridge call is displayed as an error and the
        program quits with ``C.EXIT_BRIDGE_ERRBACK``.
        """
        limit = -1.0 if self.args.limit is None else self.args.limit
        profile_key = C.PROF_KEY_NONE if self.args.all else self.profile
        try:
            await self.host.bridge.notifications_expired_clean(limit, profile_key)
        except Exception as e:
            self.disp(f"can't clean expired notifications: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp("Expired notifications cleaned.")
            self.host.quit()


class Notification(base.CommandBase):
    """Parent command grouping the notification sub-commands"""

    subcommands = (Add, Get, Delete, Expire)

    def __init__(self, host):
        super().__init__(
            host, "notification", use_profile=False, help=_("Notifications handling")
        )