Mercurial > libervia-backend
diff libervia/cli/cmd_event.py @ 4075:47401850dec6
refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 14:54:26 +0200 |
parents | libervia/frontends/jp/cmd_event.py@26b7ed2817da |
children | e9d800b105c1 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/cli/cmd_event.py Fri Jun 02 14:54:26 2023 +0200 @@ -0,0 +1,755 @@ +#!/usr/bin/env python3 + + +# libervia-cli: Libervia CLI frontend +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import argparse +import sys + +from sqlalchemy import desc + +from libervia.backend.core.i18n import _ +from libervia.backend.core.i18n import _ +from libervia.backend.tools.common import data_format +from libervia.backend.tools.common import data_format +from libervia.backend.tools.common import date_utils +from libervia.backend.tools.common.ansi import ANSI as A +from libervia.backend.tools.common.ansi import ANSI as A +from libervia.cli import common +from libervia.cli.constants import Const as C +from libervia.cli.constants import Const as C + +from . import base + +__commands__ = ["Event"] + +OUTPUT_OPT_TABLE = "table" + + +class Get(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "get", + use_output=C.OUTPUT_LIST_DICT, + use_pubsub=True, + pubsub_flags={C.MULTI_ITEMS, C.CACHE}, + use_verbose=True, + extra_outputs={ + "default": self.default_output, + }, + help=_("get event(s) data"), + ) + + def add_parser_options(self): + pass + + async def start(self): + try: + events_data_s = await self.host.bridge.events_get( + self.args.service, + self.args.node, + self.args.items, + self.get_pubsub_extra(), + self.profile, + ) + except Exception as e: + self.disp(f"can't get events data: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + events_data = data_format.deserialise(events_data_s, type_check=list) + await self.output(events_data) + self.host.quit() + + def default_output(self, events): + nb_events = len(events) + for idx, event in enumerate(events): + names = event["name"] + name = names.get("") or next(iter(names.values())) + start = event["start"] + start_human = date_utils.date_fmt( + start, "medium", tz_info=date_utils.TZ_LOCAL + ) + end = event["end"] + self.disp(A.color( + A.BOLD, start_human, A.RESET, " ", + f"({date_utils.delta2human(start, end)}) ", + C.A_HEADER, name + )) + if self.verbosity > 0: + descriptions = event.get("descriptions", []) + if descriptions: + self.disp(descriptions[0]["description"]) + if idx < (nb_events-1): + self.disp("") + + +class CategoryAction(argparse.Action): + + def __init__(self, option_strings, dest, nargs=None, metavar=None, **kwargs): + if nargs is not None or metavar is not None: + raise ValueError("nargs and metavar must not be used") + if metavar is not None: + metavar="TERM WIKIDATA_ID LANG" + if "--help" in sys.argv: + # FIXME: dirty workaround to have correct --help message + # argparse doesn't normally allow variable number of arguments beside "+" + # and "*", this workaround show METAVAR as 3 arguments were expected, while + # we can actuall use 1, 2 or 3. + nargs = 3 + metavar = ("TERM", "[WIKIDATA_ID]", "[LANG]") + else: + nargs = "+" + + super().__init__(option_strings, dest, metavar=metavar, nargs=nargs, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + categories = getattr(namespace, self.dest) + if categories is None: + categories = [] + setattr(namespace, self.dest, categories) + + if not values: + parser.error("category values must be set") + + category = { + "term": values[0] + } + + if len(values) == 1: + pass + elif len(values) == 2: + value = values[1] + if value.startswith("Q"): + category["wikidata_id"] = value + else: + category["language"] = value + elif len(values) == 3: + __, wd, lang = values + category["wikidata_id"] = wd + category["language"] = lang + else: + parser.error("Category can't have more than 3 arguments") + + categories.append(category) + + +class EventBase: + def add_parser_options(self): + self.parser.add_argument( + "-S", "--start", type=base.date_decoder, metavar="TIME_PATTERN", + help=_("the start time of the event")) + end_group = self.parser.add_mutually_exclusive_group() + end_group.add_argument( + "-E", "--end", type=base.date_decoder, metavar="TIME_PATTERN", + help=_("the time of the end of the event")) + end_group.add_argument( + "-D", "--duration", help=_("duration of the event")) + self.parser.add_argument( + "-H", "--head-picture", help="URL to a picture to use as head-picture" + ) + self.parser.add_argument( + "-d", "--description", help="plain text description the event" + ) + self.parser.add_argument( + "-C", "--category", action=CategoryAction, dest="categories", + help="Category of the event" + ) + self.parser.add_argument( + "-l", "--location", action="append", nargs="+", metavar="[KEY] VALUE", + help="Location metadata" + ) + rsvp_group = self.parser.add_mutually_exclusive_group() + rsvp_group.add_argument( + "--rsvp", action="store_true", help=_("RSVP is requested")) + rsvp_group.add_argument( + "--rsvp_json", metavar="JSON", help=_("JSON description of RSVP form")) + for node_type in ("invitees", "comments", "blog", "schedule"): + self.parser.add_argument( + f"--{node_type}", + nargs=2, + metavar=("JID", "NODE"), + help=_("link {node_type} pubsub node").format(node_type=node_type) + ) + self.parser.add_argument( + "-a", "--attachment", action="append", dest="attachments", + help=_("attach a file") + ) + self.parser.add_argument("--website", help=_("website of the event")) + self.parser.add_argument( + "--status", choices=["confirmed", "tentative", "cancelled"], + help=_("status of the event") + ) + self.parser.add_argument( + "-T", "--language", metavar="LANG", action="append", dest="languages", + help=_("main languages spoken at the event") + ) + self.parser.add_argument( + "--wheelchair", choices=["full", "partial", "no"], + help=_("is the location accessible by wheelchair") + ) + self.parser.add_argument( + "--external", + nargs=3, + metavar=("JID", "NODE", "ITEM"), + help=_("link to an external event") + ) + + def get_event_data(self): + if self.args.duration is not None: + if self.args.start is None: + self.parser.error("--start must be send if --duration is used") + # if duration is used, we simply add it to start time to get end time + self.args.end = base.date_decoder(f"{self.args.start} + {self.args.duration}") + + event = {} + if self.args.name is not None: + event["name"] = {"": self.args.name} + + if self.args.start is not None: + event["start"] = self.args.start + + if self.args.end is not None: + event["end"] = self.args.end + + if self.args.head_picture: + event["head-picture"] = { + "sources": [{ + "url": self.args.head_picture + }] + } + if self.args.description: + event["descriptions"] = [ + { + "type": "text", + "description": self.args.description + } + ] + if self.args.categories: + event["categories"] = self.args.categories + if self.args.location is not None: + location = {} + for location_data in self.args.location: + if len(location_data) == 1: + location["description"] = location_data[0] + else: + key, *values = location_data + location[key] = " ".join(values) + event["locations"] = [location] + + if self.args.rsvp: + event["rsvp"] = [{}] + elif self.args.rsvp_json: + if isinstance(self.args.rsvp_elt, dict): + event["rsvp"] = [self.args.rsvp_json] + else: + event["rsvp"] = self.args.rsvp_json + + for node_type in ("invitees", "comments", "blog", "schedule"): + value = getattr(self.args, node_type) + if value: + service, node = value + event[node_type] = {"service": service, "node": node} + + if self.args.attachments: + attachments = event["attachments"] = [] + for attachment in self.args.attachments: + attachments.append({ + "sources": [{"url": attachment}] + }) + + extra = {} + + for arg in ("website", "status", "languages"): + value = getattr(self.args, arg) + if value is not None: + extra[arg] = value + if self.args.wheelchair is not None: + extra["accessibility"] = {"wheelchair": self.args.wheelchair} + + if extra: + event["extra"] = extra + + if self.args.external: + ext_jid, ext_node, ext_item = self.args.external + event["external"] = { + "jid": ext_jid, + "node": ext_node, + "item": ext_item + } + return event + + +class Create(EventBase, base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "create", + use_pubsub=True, + help=_("create or replace event"), + ) + + def add_parser_options(self): + super().add_parser_options() + self.parser.add_argument( + "-i", + "--id", + default="", + help=_("ID of the PubSub Item"), + ) + # name is mandatory here + self.parser.add_argument("name", help=_("name of the event")) + + async def start(self): + if self.args.start is None: + self.parser.error("--start must be set") + event_data = self.get_event_data() + # we check self.args.end after get_event_data because it may be set there id + # --duration is used + if self.args.end is None: + self.parser.error("--end or --duration must be set") + try: + await self.host.bridge.event_create( + data_format.serialise(event_data), + self.args.id, + self.args.node, + self.args.service, + self.profile, + ) + except Exception as e: + self.disp(f"can't create event: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("Event created successfuly)")) + self.host.quit() + + +class Modify(EventBase, base.CommandBase): + def __init__(self, host): + super(Modify, self).__init__( + host, + "modify", + use_pubsub=True, + pubsub_flags={C.SINGLE_ITEM}, + help=_("modify an existing event"), + ) + EventBase.__init__(self) + + def add_parser_options(self): + super().add_parser_options() + # name is optional here + self.parser.add_argument("-N", "--name", help=_("name of the event")) + + async def start(self): + event_data = self.get_event_data() + try: + await self.host.bridge.event_modify( + data_format.serialise(event_data), + self.args.item, + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(f"can't update event data: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.host.quit() + + +class InviteeGet(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "get", + use_output=C.OUTPUT_DICT, + use_pubsub=True, + pubsub_flags={C.SINGLE_ITEM}, + use_verbose=True, + help=_("get event attendance"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-j", "--jid", action="append", dest="jids", default=[], + help=_("only retrieve RSVP from those JIDs") + ) + + async def start(self): + try: + event_data_s = await self.host.bridge.event_invitee_get( + self.args.service, + self.args.node, + self.args.item, + self.args.jids, + "", + self.profile, + ) + except Exception as e: + self.disp(f"can't get event data: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + event_data = data_format.deserialise(event_data_s) + await self.output(event_data) + self.host.quit() + + +class InviteeSet(base.CommandBase): + def __init__(self, host): + super(InviteeSet, self).__init__( + host, + "set", + use_pubsub=True, + pubsub_flags={C.SINGLE_ITEM}, + help=_("set event attendance"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-f", + "--field", + action="append", + nargs=2, + dest="fields", + metavar=("KEY", "VALUE"), + help=_("configuration field to set"), + ) + + async def start(self): + # TODO: handle RSVP with XMLUI in a similar way as for `ad-hoc run` + fields = dict(self.args.fields) if self.args.fields else {} + try: + self.host.bridge.event_invitee_set( + self.args.service, + self.args.node, + self.args.item, + data_format.serialise(fields), + self.profile, + ) + except Exception as e: + self.disp(f"can't set event data: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.host.quit() + + +class InviteesList(base.CommandBase): + def __init__(self, host): + extra_outputs = {"default": self.default_output} + base.CommandBase.__init__( + self, + host, + "list", + use_output=C.OUTPUT_DICT_DICT, + extra_outputs=extra_outputs, + use_pubsub=True, + pubsub_flags={C.NODE}, + use_verbose=True, + help=_("get event attendance"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-m", + "--missing", + action="store_true", + help=_("show missing people (invited but no R.S.V.P. so far)"), + ) + self.parser.add_argument( + "-R", + "--no-rsvp", + action="store_true", + help=_("don't show people which gave R.S.V.P."), + ) + + def _attend_filter(self, attend, row): + if attend == "yes": + attend_color = C.A_SUCCESS + elif attend == "no": + attend_color = C.A_FAILURE + else: + attend_color = A.FG_WHITE + return A.color(attend_color, attend) + + def _guests_filter(self, guests): + return "(" + str(guests) + ")" if guests else "" + + def default_output(self, event_data): + data = [] + attendees_yes = 0 + attendees_maybe = 0 + attendees_no = 0 + attendees_missing = 0 + guests = 0 + guests_maybe = 0 + for jid_, jid_data in event_data.items(): + jid_data["jid"] = jid_ + try: + guests_int = int(jid_data["guests"]) + except (ValueError, KeyError): + pass + attend = jid_data.get("attend", "") + if attend == "yes": + attendees_yes += 1 + guests += guests_int + elif attend == "maybe": + attendees_maybe += 1 + guests_maybe += guests_int + elif attend == "no": + attendees_no += 1 + jid_data["guests"] = "" + else: + attendees_missing += 1 + jid_data["guests"] = "" + data.append(jid_data) + + show_table = OUTPUT_OPT_TABLE in self.args.output_opts + + table = common.Table.from_list_dict( + self.host, + data, + ("nick",) + (("jid",) if self.host.verbosity else ()) + ("attend", "guests"), + headers=None, + filters={ + "nick": A.color(C.A_HEADER, "{}" if show_table else "{} "), + "jid": "{}" if show_table else "{} ", + "attend": self._attend_filter, + "guests": "{}" if show_table else self._guests_filter, + }, + defaults={"nick": "", "attend": "", "guests": 1}, + ) + if show_table: + table.display() + else: + table.display_blank(show_header=False, col_sep="") + + if not self.args.no_rsvp: + self.disp("") + self.disp( + A.color( + C.A_SUBHEADER, + _("Attendees: "), + A.RESET, + str(len(data)), + _(" ("), + C.A_SUCCESS, + _("yes: "), + str(attendees_yes), + A.FG_WHITE, + _(", maybe: "), + str(attendees_maybe), + ", ", + C.A_FAILURE, + _("no: "), + str(attendees_no), + A.RESET, + ")", + ) + ) + self.disp( + A.color(C.A_SUBHEADER, _("confirmed guests: "), A.RESET, str(guests)) + ) + self.disp( + A.color( + C.A_SUBHEADER, + _("unconfirmed guests: "), + A.RESET, + str(guests_maybe), + ) + ) + self.disp( + A.color(C.A_SUBHEADER, _("total: "), A.RESET, str(guests + guests_maybe)) + ) + if attendees_missing: + self.disp("") + self.disp( + A.color( + C.A_SUBHEADER, + _("missing people (no reply): "), + A.RESET, + str(attendees_missing), + ) + ) + + async def start(self): + if self.args.no_rsvp and not self.args.missing: + self.parser.error(_("you need to use --missing if you use --no-rsvp")) + if not self.args.missing: + prefilled = {} + else: + # we get prefilled data with all people + try: + affiliations = await self.host.bridge.ps_node_affiliations_get( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(f"can't get node affiliations: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + # we fill all affiliations with empty data, answered one will be filled + # below. We only consider people with "publisher" affiliation as invited, + # creators are not, and members can just observe + prefilled = { + jid_: {} + for jid_, affiliation in affiliations.items() + if affiliation in ("publisher",) + } + + try: + event_data = await self.host.bridge.event_invitees_list( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(f"can't get event data: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + + # we fill nicknames and keep only requested people + + if self.args.no_rsvp: + for jid_ in event_data: + # if there is a jid in event_data it must be there in prefilled too + # otherwie somebody is not on the invitees list + try: + del prefilled[jid_] + except KeyError: + self.disp( + A.color( + C.A_WARNING, + f"We got a RSVP from somebody who was not in invitees " + f"list: {jid_}", + ), + error=True, + ) + else: + # we replace empty dicts for existing people with R.S.V.P. data + prefilled.update(event_data) + + # we get nicknames for everybody, make it easier for organisers + for jid_, data in prefilled.items(): + id_data = await self.host.bridge.identity_get(jid_, [], True, self.profile) + id_data = data_format.deserialise(id_data) + data["nick"] = id_data["nicknames"][0] + + await self.output(prefilled) + self.host.quit() + + +class InviteeInvite(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "invite", + use_pubsub=True, + pubsub_flags={C.NODE, C.SINGLE_ITEM}, + help=_("invite someone to the event through email"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-e", + "--email", + action="append", + default=[], + help="email(s) to send the invitation to", + ) + self.parser.add_argument( + "-N", + "--name", + default="", + help="name of the invitee", + ) + self.parser.add_argument( + "-H", + "--host-name", + default="", + help="name of the host", + ) + self.parser.add_argument( + "-l", + "--lang", + default="", + help="main language spoken by the invitee", + ) + self.parser.add_argument( + "-U", + "--url-template", + default="", + help="template to construct the URL", + ) + self.parser.add_argument( + "-S", + "--subject", + default="", + help="subject of the invitation email (default: generic subject)", + ) + self.parser.add_argument( + "-b", + "--body", + default="", + help="body of the invitation email (default: generic body)", + ) + + async def start(self): + email = self.args.email[0] if self.args.email else None + emails_extra = self.args.email[1:] + + try: + await self.host.bridge.event_invite_by_email( + self.args.service, + self.args.node, + self.args.item, + email, + emails_extra, + self.args.name, + self.args.host_name, + self.args.lang, + self.args.url_template, + self.args.subject, + self.args.body, + self.args.profile, + ) + except Exception as e: + self.disp(f"can't create invitation: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.host.quit() + + +class Invitee(base.CommandBase): + subcommands = (InviteeGet, InviteeSet, InviteesList, InviteeInvite) + + def __init__(self, host): + super(Invitee, self).__init__( + host, "invitee", use_profile=False, help=_("manage invities") + ) + + +class Event(base.CommandBase): + subcommands = (Get, Create, Modify, Invitee) + + def __init__(self, host): + super(Event, self).__init__( + host, "event", use_profile=False, help=_("event management") + )