diff libervia/cli/cmd_event.py @ 4075:47401850dec6

refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 14:54:26 +0200
parents libervia/frontends/jp/cmd_event.py@26b7ed2817da
children e9d800b105c1
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/cli/cmd_event.py	Fri Jun 02 14:54:26 2023 +0200
@@ -0,0 +1,755 @@
+#!/usr/bin/env python3
+
+
+# libervia-cli: Libervia CLI frontend
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+import argparse
+import sys
+
+from sqlalchemy import desc
+
+from libervia.backend.core.i18n import _
+from libervia.backend.core.i18n import _
+from libervia.backend.tools.common import data_format
+from libervia.backend.tools.common import data_format
+from libervia.backend.tools.common import date_utils
+from libervia.backend.tools.common.ansi import ANSI as A
+from libervia.backend.tools.common.ansi import ANSI as A
+from libervia.cli import common
+from libervia.cli.constants import Const as C
+from libervia.cli.constants import Const as C
+
+from . import base
+
+__commands__ = ["Event"]
+
+OUTPUT_OPT_TABLE = "table"
+
+
+class Get(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "get",
+            use_output=C.OUTPUT_LIST_DICT,
+            use_pubsub=True,
+            pubsub_flags={C.MULTI_ITEMS, C.CACHE},
+            use_verbose=True,
+            extra_outputs={
+                "default": self.default_output,
+            },
+            help=_("get event(s) data"),
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def start(self):
+        try:
+            events_data_s = await self.host.bridge.events_get(
+                self.args.service,
+                self.args.node,
+                self.args.items,
+                self.get_pubsub_extra(),
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't get events data: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            events_data = data_format.deserialise(events_data_s, type_check=list)
+            await self.output(events_data)
+            self.host.quit()
+
+    def default_output(self, events):
+        nb_events = len(events)
+        for idx, event in enumerate(events):
+            names = event["name"]
+            name = names.get("") or next(iter(names.values()))
+            start = event["start"]
+            start_human = date_utils.date_fmt(
+                start, "medium", tz_info=date_utils.TZ_LOCAL
+            )
+            end = event["end"]
+            self.disp(A.color(
+                A.BOLD, start_human, A.RESET, " ",
+                f"({date_utils.delta2human(start, end)}) ",
+                C.A_HEADER, name
+            ))
+            if self.verbosity > 0:
+                descriptions = event.get("descriptions", [])
+                if descriptions:
+                    self.disp(descriptions[0]["description"])
+            if idx < (nb_events-1):
+                self.disp("")
+
+
+class CategoryAction(argparse.Action):
+
+    def __init__(self, option_strings, dest, nargs=None, metavar=None, **kwargs):
+        if nargs is not None or metavar is not None:
+            raise ValueError("nargs and metavar must not be used")
+        if metavar is not None:
+            metavar="TERM WIKIDATA_ID LANG"
+        if "--help" in sys.argv:
+            # FIXME: dirty workaround to have correct --help message
+            #   argparse doesn't normally allow variable number of arguments beside "+"
+            #   and "*", this workaround show METAVAR as 3 arguments were expected, while
+            #   we can actuall use 1, 2 or 3.
+            nargs = 3
+            metavar = ("TERM", "[WIKIDATA_ID]", "[LANG]")
+        else:
+            nargs = "+"
+
+        super().__init__(option_strings, dest, metavar=metavar, nargs=nargs, **kwargs)
+
+    def __call__(self, parser, namespace, values, option_string=None):
+        categories = getattr(namespace, self.dest)
+        if categories is None:
+            categories = []
+            setattr(namespace, self.dest, categories)
+
+        if not values:
+            parser.error("category values must be set")
+
+        category = {
+            "term": values[0]
+        }
+
+        if len(values) == 1:
+            pass
+        elif len(values) == 2:
+            value = values[1]
+            if value.startswith("Q"):
+                category["wikidata_id"] = value
+            else:
+                category["language"] = value
+        elif len(values) == 3:
+            __, wd, lang = values
+            category["wikidata_id"] = wd
+            category["language"] = lang
+        else:
+            parser.error("Category can't have more than 3 arguments")
+
+        categories.append(category)
+
+
+class EventBase:
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-S", "--start", type=base.date_decoder, metavar="TIME_PATTERN",
+            help=_("the start time of the event"))
+        end_group = self.parser.add_mutually_exclusive_group()
+        end_group.add_argument(
+            "-E", "--end", type=base.date_decoder, metavar="TIME_PATTERN",
+            help=_("the time of the end of the event"))
+        end_group.add_argument(
+            "-D", "--duration", help=_("duration of the event"))
+        self.parser.add_argument(
+            "-H", "--head-picture", help="URL to a picture to use as head-picture"
+        )
+        self.parser.add_argument(
+            "-d", "--description", help="plain text description the event"
+        )
+        self.parser.add_argument(
+            "-C", "--category", action=CategoryAction, dest="categories",
+            help="Category of the event"
+        )
+        self.parser.add_argument(
+            "-l", "--location", action="append", nargs="+", metavar="[KEY] VALUE",
+            help="Location metadata"
+        )
+        rsvp_group = self.parser.add_mutually_exclusive_group()
+        rsvp_group.add_argument(
+            "--rsvp", action="store_true", help=_("RSVP is requested"))
+        rsvp_group.add_argument(
+            "--rsvp_json", metavar="JSON", help=_("JSON description of RSVP form"))
+        for node_type in ("invitees", "comments", "blog", "schedule"):
+            self.parser.add_argument(
+                f"--{node_type}",
+                nargs=2,
+                metavar=("JID", "NODE"),
+                help=_("link {node_type} pubsub node").format(node_type=node_type)
+            )
+        self.parser.add_argument(
+            "-a", "--attachment", action="append", dest="attachments",
+            help=_("attach a file")
+        )
+        self.parser.add_argument("--website", help=_("website of the event"))
+        self.parser.add_argument(
+            "--status", choices=["confirmed", "tentative", "cancelled"],
+            help=_("status of the event")
+        )
+        self.parser.add_argument(
+            "-T", "--language", metavar="LANG", action="append", dest="languages",
+            help=_("main languages spoken at the event")
+        )
+        self.parser.add_argument(
+            "--wheelchair", choices=["full", "partial", "no"],
+            help=_("is the location accessible by wheelchair")
+        )
+        self.parser.add_argument(
+            "--external",
+            nargs=3,
+            metavar=("JID", "NODE", "ITEM"),
+            help=_("link to an external event")
+        )
+
+    def get_event_data(self):
+        if self.args.duration is not None:
+            if self.args.start is None:
+                self.parser.error("--start must be send if --duration is used")
+            # if duration is used, we simply add it to start time to get end time
+            self.args.end = base.date_decoder(f"{self.args.start} + {self.args.duration}")
+
+        event = {}
+        if self.args.name is not None:
+            event["name"] = {"": self.args.name}
+
+        if self.args.start is not None:
+            event["start"] = self.args.start
+
+        if self.args.end is not None:
+            event["end"] = self.args.end
+
+        if self.args.head_picture:
+            event["head-picture"] = {
+                "sources": [{
+                    "url": self.args.head_picture
+                }]
+            }
+        if self.args.description:
+            event["descriptions"] = [
+                {
+                    "type": "text",
+                    "description": self.args.description
+                }
+            ]
+        if self.args.categories:
+            event["categories"] = self.args.categories
+        if self.args.location is not None:
+            location = {}
+            for location_data in self.args.location:
+                if len(location_data) == 1:
+                    location["description"] = location_data[0]
+                else:
+                    key, *values = location_data
+                    location[key] = " ".join(values)
+            event["locations"] = [location]
+
+        if self.args.rsvp:
+            event["rsvp"] = [{}]
+        elif self.args.rsvp_json:
+            if isinstance(self.args.rsvp_elt, dict):
+                event["rsvp"] = [self.args.rsvp_json]
+            else:
+                event["rsvp"] = self.args.rsvp_json
+
+        for node_type in ("invitees", "comments", "blog", "schedule"):
+            value = getattr(self.args, node_type)
+            if value:
+                service, node = value
+                event[node_type] = {"service": service, "node": node}
+
+        if self.args.attachments:
+            attachments = event["attachments"] = []
+            for attachment in self.args.attachments:
+                attachments.append({
+                    "sources": [{"url": attachment}]
+                })
+
+        extra = {}
+
+        for arg in ("website", "status", "languages"):
+            value = getattr(self.args, arg)
+            if value is not None:
+                extra[arg] = value
+        if self.args.wheelchair is not None:
+            extra["accessibility"] = {"wheelchair": self.args.wheelchair}
+
+        if extra:
+            event["extra"] = extra
+
+        if self.args.external:
+            ext_jid, ext_node, ext_item = self.args.external
+            event["external"] = {
+                "jid": ext_jid,
+                "node": ext_node,
+                "item": ext_item
+            }
+        return event
+
+
+class Create(EventBase, base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "create",
+            use_pubsub=True,
+            help=_("create or replace event"),
+        )
+
+    def add_parser_options(self):
+        super().add_parser_options()
+        self.parser.add_argument(
+            "-i",
+            "--id",
+            default="",
+            help=_("ID of the PubSub Item"),
+        )
+        # name is mandatory here
+        self.parser.add_argument("name", help=_("name of the event"))
+
+    async def start(self):
+        if self.args.start is None:
+            self.parser.error("--start must be set")
+        event_data = self.get_event_data()
+        # we check self.args.end after get_event_data because it may be set there id
+        # --duration is used
+        if self.args.end is None:
+            self.parser.error("--end or --duration must be set")
+        try:
+            await self.host.bridge.event_create(
+                data_format.serialise(event_data),
+                self.args.id,
+                self.args.node,
+                self.args.service,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't create event: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("Event created successfuly)"))
+            self.host.quit()
+
+
+class Modify(EventBase, base.CommandBase):
+    def __init__(self, host):
+        super(Modify, self).__init__(
+            host,
+            "modify",
+            use_pubsub=True,
+            pubsub_flags={C.SINGLE_ITEM},
+            help=_("modify an existing event"),
+        )
+        EventBase.__init__(self)
+
+    def add_parser_options(self):
+        super().add_parser_options()
+        # name is optional here
+        self.parser.add_argument("-N", "--name", help=_("name of the event"))
+
+    async def start(self):
+        event_data = self.get_event_data()
+        try:
+            await self.host.bridge.event_modify(
+                data_format.serialise(event_data),
+                self.args.item,
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't update event data: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.host.quit()
+
+
+class InviteeGet(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "get",
+            use_output=C.OUTPUT_DICT,
+            use_pubsub=True,
+            pubsub_flags={C.SINGLE_ITEM},
+            use_verbose=True,
+            help=_("get event attendance"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-j", "--jid", action="append", dest="jids", default=[],
+            help=_("only retrieve RSVP from those JIDs")
+        )
+
+    async def start(self):
+        try:
+            event_data_s = await self.host.bridge.event_invitee_get(
+                self.args.service,
+                self.args.node,
+                self.args.item,
+                self.args.jids,
+                "",
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't get event data: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            event_data = data_format.deserialise(event_data_s)
+            await self.output(event_data)
+            self.host.quit()
+
+
+class InviteeSet(base.CommandBase):
+    def __init__(self, host):
+        super(InviteeSet, self).__init__(
+            host,
+            "set",
+            use_pubsub=True,
+            pubsub_flags={C.SINGLE_ITEM},
+            help=_("set event attendance"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-f",
+            "--field",
+            action="append",
+            nargs=2,
+            dest="fields",
+            metavar=("KEY", "VALUE"),
+            help=_("configuration field to set"),
+        )
+
+    async def start(self):
+        # TODO: handle RSVP with XMLUI in a similar way as for `ad-hoc run`
+        fields = dict(self.args.fields) if self.args.fields else {}
+        try:
+            self.host.bridge.event_invitee_set(
+                self.args.service,
+                self.args.node,
+                self.args.item,
+                data_format.serialise(fields),
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't set event data: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.host.quit()
+
+
+class InviteesList(base.CommandBase):
+    def __init__(self, host):
+        extra_outputs = {"default": self.default_output}
+        base.CommandBase.__init__(
+            self,
+            host,
+            "list",
+            use_output=C.OUTPUT_DICT_DICT,
+            extra_outputs=extra_outputs,
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            use_verbose=True,
+            help=_("get event attendance"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-m",
+            "--missing",
+            action="store_true",
+            help=_("show missing people (invited but no R.S.V.P. so far)"),
+        )
+        self.parser.add_argument(
+            "-R",
+            "--no-rsvp",
+            action="store_true",
+            help=_("don't show people which gave R.S.V.P."),
+        )
+
+    def _attend_filter(self, attend, row):
+        if attend == "yes":
+            attend_color = C.A_SUCCESS
+        elif attend == "no":
+            attend_color = C.A_FAILURE
+        else:
+            attend_color = A.FG_WHITE
+        return A.color(attend_color, attend)
+
+    def _guests_filter(self, guests):
+        return "(" + str(guests) + ")" if guests else ""
+
+    def default_output(self, event_data):
+        data = []
+        attendees_yes = 0
+        attendees_maybe = 0
+        attendees_no = 0
+        attendees_missing = 0
+        guests = 0
+        guests_maybe = 0
+        for jid_, jid_data in event_data.items():
+            jid_data["jid"] = jid_
+            try:
+                guests_int = int(jid_data["guests"])
+            except (ValueError, KeyError):
+                pass
+            attend = jid_data.get("attend", "")
+            if attend == "yes":
+                attendees_yes += 1
+                guests += guests_int
+            elif attend == "maybe":
+                attendees_maybe += 1
+                guests_maybe += guests_int
+            elif attend == "no":
+                attendees_no += 1
+                jid_data["guests"] = ""
+            else:
+                attendees_missing += 1
+                jid_data["guests"] = ""
+            data.append(jid_data)
+
+        show_table = OUTPUT_OPT_TABLE in self.args.output_opts
+
+        table = common.Table.from_list_dict(
+            self.host,
+            data,
+            ("nick",) + (("jid",) if self.host.verbosity else ()) + ("attend", "guests"),
+            headers=None,
+            filters={
+                "nick": A.color(C.A_HEADER, "{}" if show_table else "{} "),
+                "jid": "{}" if show_table else "{} ",
+                "attend": self._attend_filter,
+                "guests": "{}" if show_table else self._guests_filter,
+            },
+            defaults={"nick": "", "attend": "", "guests": 1},
+        )
+        if show_table:
+            table.display()
+        else:
+            table.display_blank(show_header=False, col_sep="")
+
+        if not self.args.no_rsvp:
+            self.disp("")
+            self.disp(
+                A.color(
+                    C.A_SUBHEADER,
+                    _("Attendees: "),
+                    A.RESET,
+                    str(len(data)),
+                    _(" ("),
+                    C.A_SUCCESS,
+                    _("yes: "),
+                    str(attendees_yes),
+                    A.FG_WHITE,
+                    _(", maybe: "),
+                    str(attendees_maybe),
+                    ", ",
+                    C.A_FAILURE,
+                    _("no: "),
+                    str(attendees_no),
+                    A.RESET,
+                    ")",
+                )
+            )
+            self.disp(
+                A.color(C.A_SUBHEADER, _("confirmed guests: "), A.RESET, str(guests))
+            )
+            self.disp(
+                A.color(
+                    C.A_SUBHEADER,
+                    _("unconfirmed guests: "),
+                    A.RESET,
+                    str(guests_maybe),
+                )
+            )
+            self.disp(
+                A.color(C.A_SUBHEADER, _("total: "), A.RESET, str(guests + guests_maybe))
+            )
+        if attendees_missing:
+            self.disp("")
+            self.disp(
+                A.color(
+                    C.A_SUBHEADER,
+                    _("missing people (no reply): "),
+                    A.RESET,
+                    str(attendees_missing),
+                )
+            )
+
+    async def start(self):
+        if self.args.no_rsvp and not self.args.missing:
+            self.parser.error(_("you need to use --missing if you use --no-rsvp"))
+        if not self.args.missing:
+            prefilled = {}
+        else:
+            # we get prefilled data with all people
+            try:
+                affiliations = await self.host.bridge.ps_node_affiliations_get(
+                    self.args.service,
+                    self.args.node,
+                    self.profile,
+                )
+            except Exception as e:
+                self.disp(f"can't get node affiliations: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+            else:
+                # we fill all affiliations with empty data, answered one will be filled
+                # below. We only consider people with "publisher" affiliation as invited,
+                # creators are not, and members can just observe
+                prefilled = {
+                    jid_: {}
+                    for jid_, affiliation in affiliations.items()
+                    if affiliation in ("publisher",)
+                }
+
+        try:
+            event_data = await self.host.bridge.event_invitees_list(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't get event data: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+
+            # we fill nicknames and keep only requested people
+
+        if self.args.no_rsvp:
+            for jid_ in event_data:
+                # if there is a jid in event_data it must be there in prefilled too
+                # otherwie somebody is not on the invitees list
+                try:
+                    del prefilled[jid_]
+                except KeyError:
+                    self.disp(
+                        A.color(
+                            C.A_WARNING,
+                            f"We got a RSVP from somebody who was not in invitees "
+                            f"list: {jid_}",
+                        ),
+                        error=True,
+                    )
+        else:
+            # we replace empty dicts for existing people with R.S.V.P. data
+            prefilled.update(event_data)
+
+            # we get nicknames for everybody, make it easier for organisers
+        for jid_, data in prefilled.items():
+            id_data = await self.host.bridge.identity_get(jid_, [], True, self.profile)
+            id_data = data_format.deserialise(id_data)
+            data["nick"] = id_data["nicknames"][0]
+
+        await self.output(prefilled)
+        self.host.quit()
+
+
+class InviteeInvite(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "invite",
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.SINGLE_ITEM},
+            help=_("invite someone to the event through email"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-e",
+            "--email",
+            action="append",
+            default=[],
+            help="email(s) to send the invitation to",
+        )
+        self.parser.add_argument(
+            "-N",
+            "--name",
+            default="",
+            help="name of the invitee",
+        )
+        self.parser.add_argument(
+            "-H",
+            "--host-name",
+            default="",
+            help="name of the host",
+        )
+        self.parser.add_argument(
+            "-l",
+            "--lang",
+            default="",
+            help="main language spoken by the invitee",
+        )
+        self.parser.add_argument(
+            "-U",
+            "--url-template",
+            default="",
+            help="template to construct the URL",
+        )
+        self.parser.add_argument(
+            "-S",
+            "--subject",
+            default="",
+            help="subject of the invitation email (default: generic subject)",
+        )
+        self.parser.add_argument(
+            "-b",
+            "--body",
+            default="",
+            help="body of the invitation email (default: generic body)",
+        )
+
+    async def start(self):
+        email = self.args.email[0] if self.args.email else None
+        emails_extra = self.args.email[1:]
+
+        try:
+            await self.host.bridge.event_invite_by_email(
+                self.args.service,
+                self.args.node,
+                self.args.item,
+                email,
+                emails_extra,
+                self.args.name,
+                self.args.host_name,
+                self.args.lang,
+                self.args.url_template,
+                self.args.subject,
+                self.args.body,
+                self.args.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't create invitation: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.host.quit()
+
+
+class Invitee(base.CommandBase):
+    subcommands = (InviteeGet, InviteeSet, InviteesList, InviteeInvite)
+
+    def __init__(self, host):
+        super(Invitee, self).__init__(
+            host, "invitee", use_profile=False, help=_("manage invities")
+        )
+
+
+class Event(base.CommandBase):
+    subcommands = (Get, Create, Modify, Invitee)
+
+    def __init__(self, host):
+        super(Event, self).__init__(
+            host, "event", use_profile=False, help=_("event management")
+        )