view libervia/cli/cmd_notifications.py @ 4230:314d3c02bb67

core (xmpp): Add a timeout for messages processing to avoid blocking the queue.
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:21:04 +0200
parents 8d361adf0ee1
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3

# Libervia CLI
# Copyright (C) 2009-2023 JΓ©rΓ΄me Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from libervia.backend.core.i18n import _
from libervia.backend.memory.memory import (
    NotificationPriority,
    NotificationStatus,
    NotificationType,
)
from libervia.backend.tools.common import data_format, date_utils
from libervia.cli.constants import Const as C
from rich.live import Live
from rich.table import Table
from rich.text import Text

from . import base

__commands__ = ["Notification"]


class Add(base.CommandBase):
    """Create and broadcast a notification"""

    def __init__(self, host):
        super(Add, self).__init__(
            host, "add", use_verbose=True, help=_("create and broadcast a notification")
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "type",
            choices=[e.name for e in NotificationType],
            help=_("notification type (default: %(default)s)"),
        )

        self.parser.add_argument(
            "body_plain", help=_("plain text body of the notification")
        )

        # TODO:
        # self.parser.add_argument(
        #     "-r", "--body-rich", default="", help=_("rich text body of the notification")
        # )

        self.parser.add_argument(
            "-t", "--title", default="", help=_("title of the notification")
        )

        self.parser.add_argument(
            "-g",
            "--is-global",
            action="store_true",
            help=_("indicates if the notification is for all profiles"),
        )

        # TODO:
        # self.parser.add_argument(
        #     "--requires-action",
        #     action="store_true",
        #     help=_("indicates if the notification requires action"),
        # )

        self.parser.add_argument(
            "-P",
            "--priority",
            default="MEDIUM",
            choices=[p.name for p in NotificationPriority],
            help=_("priority level of the notification (default: %(default)s)"),
        )

        self.parser.add_argument(
            "-e",
            "--expire-at",
            type=base.date_decoder,
            default=0,
            help=_(
                "expiration timestamp for the notification (optional, can be 0 for none)"
            ),
        )

    async def start(self):
        try:
            await self.host.bridge.notification_add(
                self.args.type,
                self.args.body_plain,
                "", # TODO: self.args.body_rich or "",
                self.args.title or "",
                self.args.is_global,
                False, # TODO: self.args.requires_action,
                self.args.priority,
                self.args.expire_at,
                "",
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't add notification: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp("Notification added.")

            self.host.quit()


class Get(base.CommandBase):
    """Get available notifications"""

    def __init__(self, host):
        super(Get, self).__init__(
            host,
            "get",
            use_output=C.OUTPUT_LIST_DICT,
            extra_outputs={"default": self.default_output},
            help=_("display notifications"),
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "-f",
            "--follow",
            action="store_true",
            help=_("wait and print incoming notifications"),
        )

        self.parser.add_argument(
            "-t",
            "--type",
            type=str,
            choices=[t.name for t in NotificationType],
            help=_("filter by type of the notification"),
        )

        self.parser.add_argument(
            "-s",
            "--status",
            type=str,
            choices=[s.name for s in NotificationStatus],
            help=_("filter by status of the notification"),
        )

        self.parser.add_argument(
            "-a",
            "--requires-action",
            type=C.bool,
            default=None,
            help=_(
                "filter notifications that require (or not) user action, true by "
                "default, don't filter if omitted"
            ),
        )

        self.parser.add_argument(
            "-P",
            "--min-priority",
            type=str,
            choices=[p.name for p in NotificationPriority],
            help=_("filter notifications with at least the specified priority"),
        )

    def create_table(self):
        table = Table(box=None, show_header=False, collapse_padding=True)
        table.add_column("is_new")
        table.add_column("type")
        table.add_column("id")
        table.add_column("priority")
        table.add_column("timestamp")
        table.add_column("body")
        return table

    def default_output(self, notifs):
        if self.args.follow:
            if self.live is None:
                self.table = table = self.create_table()
                self.live = Live(table, auto_refresh=False, console=self.console)
                self.host.add_on_quit_callback(self.live.stop)
                self.live.start()
            else:
                table = self.table
        else:
            table = self.create_table()

        for notif in notifs:
            emoji_mapper = {
                "chat": "πŸ’¬",
                "blog": "πŸ“",
                "calendar": "πŸ“…",
                "file": "πŸ“‚",
                "call": "πŸ“ž",
                "service": "πŸ“’",
                "other": "🟣",
            }
            emoji = emoji_mapper[notif.get("type", "other")]
            notif_id = Text(notif["id"])
            created = date_utils.date_fmt(notif["timestamp"], tz_info=date_utils.TZ_LOCAL)

            priority_name = NotificationPriority(notif["priority"]).name.lower()

            priority = Text(f"[{priority_name}]", style=f"priority_{priority_name}")

            body_parts = []
            title = notif.get("title")
            if title:
                body_parts.append((f"{title}\n", "notif_title"))
            body_parts.append(notif["body_plain"])
            body = Text.assemble(*body_parts)

            new_flag = "🌟 " if notif.get("new") else ""
            table.add_row(new_flag, emoji, notif_id, created, priority, body)

        if self.args.follow:
            self.live.refresh()
        else:
            self.print(table)

    async def on_notification_new(
        self,
        id_: str,
        timestamp: float,
        type_: str,
        body_plain: str,
        body_rich: str,
        title: str,
        requires_action: bool,
        priority: int,
        expire_at: float,
        extra: str,
        profile: str,
    ) -> None:
        """Callback when a new notification is emitted."""
        notification_data = {
            "id": id_,
            "timestamp": timestamp,
            "type": type_,
            "body_plain": body_plain,
            "body_rich": body_rich,
            "title": title,
            "requires_action": requires_action,
            "priority": priority,
            "expire_at": expire_at,
            "extra": data_format.deserialise(extra),
            "profile": profile,
            "new": True,
        }

        await self.output([notification_data])

    async def start(self):
        keys = ["type", "status", "requires_action", "min_priority"]
        filters = {
            key: getattr(self.args, key) for key in keys if getattr(self.args, key)
        }
        try:
            notifications = data_format.deserialise(
                await self.host.bridge.notifications_get(
                    data_format.serialise(filters),
                    self.profile,
                ),
                type_check=list,
            )
        except Exception as e:
            self.disp(f"can't get notifications: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.live = None
            await self.output(notifications)
            if self.args.follow:
                self.host.bridge.register_signal(
                    "notification_new", self.on_notification_new, "core"
                )
            else:
                self.host.quit()


class Delete(base.CommandBase):
    """Delete a notification"""

    def __init__(self, host):
        super(Delete, self).__init__(
            host, "delete", use_verbose=True, help=_("delete a notification")
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "id",
            help=_("ID of the notification to delete"),
        )

        self.parser.add_argument(
            "-g", "--is-global",
            action="store_true",
            help=_("true if the notification is a global one"),
        )

        self.parser.add_argument(
            "--profile-key",
            default="@ALL@",
            help=_("Profile key (use '@ALL@' for all profiles, default: %(default)s)"),
        )

    async def start(self):
        try:
            await self.host.bridge.notification_delete(
                self.args.id, self.args.is_global, self.profile
            )
        except Exception as e:
            self.disp(f"can't delete notification: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp("Notification deleted.")
            self.host.quit()


class Expire(base.CommandBase):
    """Clean expired notifications"""

    def __init__(self, host):
        super(Expire, self).__init__(
            host, "expire", use_verbose=True, help=_("clean expired notifications")
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "-l",
            "--limit",
            type=base.date_decoder,
            metavar="TIME_PATTERN",
            help=_("time limit for older notifications. default: no limit used)"),
        )
        self.parser.add_argument(
            "-a",
            "--all",
            action="store_true",
            help=_(
                "expire notifications for all profiles (default: use current profile)"
            ),
        )

    async def start(self):
        try:
            await self.host.bridge.notifications_expired_clean(
                -1.0 if self.args.limit is None else self.args.limit,
                C.PROF_KEY_NONE if self.args.all else self.profile,
            )
        except Exception as e:
            self.disp(f"can't clean expired notifications: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp("Expired notifications cleaned.")
            self.host.quit()


class Notification(base.CommandBase):
    subcommands = (Add, Get, Delete, Expire)

    def __init__(self, host):
        super(Notification, self).__init__(
            host, "notification", use_profile=False, help=_("Notifications handling")
        )