Mercurial > libervia-backend
view sat_frontends/jp/cmd_event.py @ 3956:3cb9ade2ab84
plugin pubsub signing: pubsub items signature implementation:
- this is based on a protoXEP, not yet an official XEP: https://github.com/xsf/xeps/pull/1228
- XEP-0470: `set` attachment handler can now be async
rel 381
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 28 Oct 2022 18:47:17 +0200 |
parents | d8baf92cb921 |
children | 524856bd7b19 |
line wrap: on
line source
#!/usr/bin/env python3


# libervia-cli: Libervia CLI frontend
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""``event`` command of the CLI frontend: get, create and modify events, and
manage their invitees."""

import argparse
import json
import sys

# NOTE(review): ``desc`` is not used in this module; kept in case it is relied
# upon elsewhere, confirm before removing
from sqlalchemy import desc

from sat.core.i18n import _
from sat.tools.common import data_format
from sat.tools.common import date_utils
from sat.tools.common.ansi import ANSI as A
from sat_frontends.jp import common
from sat_frontends.jp.constants import Const as C
from . import base

__commands__ = ["Event"]

OUTPUT_OPT_TABLE = "table"


class Get(base.CommandBase):
    """Retrieve event(s) data through the bridge and display them."""

    def __init__(self, host):
        base.CommandBase.__init__(
            self,
            host,
            "get",
            use_output=C.OUTPUT_LIST_DICT,
            use_pubsub=True,
            pubsub_flags={C.MULTI_ITEMS, C.CACHE},
            use_verbose=True,
            extra_outputs={
                "default": self.default_output,
            },
            help=_("get event(s) data"),
        )

    def add_parser_options(self):
        pass

    async def start(self):
        """Request the events, deserialise them as a list and output them."""
        try:
            events_data_s = await self.host.bridge.eventsGet(
                self.args.service,
                self.args.node,
                self.args.items,
                self.getPubsubExtra(),
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't get events data: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            events_data = data_format.deserialise(events_data_s, type_check=list)
            await self.output(events_data)
            self.host.quit()

    def default_output(self, events):
        """Display one header line per event (start, duration, name).

        When verbosity is set, the first description of the event is displayed
        too.

        @param events: list of event data, each one must have "name", "start"
            and "end" keys
        """
        nb_events = len(events)
        for idx, event in enumerate(events):
            names = event["name"]
            # the name without language (empty key) is preferred, otherwise we
            # use the first available one
            name = names.get("") or next(iter(names.values()))
            start = event["start"]
            start_human = date_utils.date_fmt(
                start, "medium", tz_info=date_utils.TZ_LOCAL
            )
            end = event["end"]
            self.disp(A.color(
                A.BOLD, start_human, A.RESET, " ",
                f"({date_utils.delta2human(start, end)}) ",
                C.A_HEADER, name
            ))
            if self.verbosity > 0:
                descriptions = event.get("descriptions", [])
                if descriptions:
                    self.disp(descriptions[0]["description"])
                # NOTE(review): the nesting of this separator was not
                # recoverable from the reviewed source; it is assumed to belong
                # to the verbose output, confirm
                if idx < (nb_events-1):
                    self.disp("")


class CategoryAction(argparse.Action):
    """Argparse action collecting categories as a list of dicts.

    Each use of the option takes 1 to 3 values (TERM [WIKIDATA_ID] [LANG]) and
    appends a dict with a "term" key and optional "wikidata_id" and "language"
    keys to the destination list.
    """

    def __init__(self, option_strings, dest, nargs=None, metavar=None, **kwargs):
        """
        @raise ValueError: nargs or metavar is specified, they are set by the
            action itself
        """
        if nargs is not None or metavar is not None:
            raise ValueError("nargs and metavar must not be used")
        if "--help" in sys.argv:
            # FIXME: dirty workaround to have correct --help message
            # argparse doesn't normally allow variable number of arguments beside
            # "+" and "*", this workaround show METAVAR as 3 arguments were
            # expected, while we can actually use 1, 2 or 3.
            nargs = 3
            metavar = ("TERM", "[WIKIDATA_ID]", "[LANG]")
        else:
            nargs = "+"

        super().__init__(option_strings, dest, metavar=metavar, nargs=nargs, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        """Build a category from ``values`` and append it to the destination."""
        categories = getattr(namespace, self.dest)
        if categories is None:
            categories = []
            setattr(namespace, self.dest, categories)

        if not values:
            parser.error("category values must be set")

        category = {
            "term": values[0]
        }

        if len(values) == 1:
            pass
        elif len(values) == 2:
            value = values[1]
            # with a single extra value, anything starting with "Q" is taken
            # as a Wikidata ID, anything else as a language
            if value.startswith("Q"):
                category["wikidata_id"] = value
            else:
                category["language"] = value
        elif len(values) == 3:
            __, wd, lang = values
            category["wikidata_id"] = wd
            category["language"] = lang
        else:
            parser.error("Category can't have more than 3 arguments")

        categories.append(category)


class EventBase:
    """Arguments and event data building shared by ``create`` and ``modify``.

    This class uses ``self.parser`` and ``self.args``, it must be mixed with a
    command class providing them.
    """

    def add_parser_options(self):
        self.parser.add_argument(
            "-S", "--start", type=base.date_decoder, metavar="TIME_PATTERN",
            help=_("the start time of the event"))
        end_group = self.parser.add_mutually_exclusive_group()
        end_group.add_argument(
            "-E", "--end", type=base.date_decoder, metavar="TIME_PATTERN",
            help=_("the time of the end of the event"))
        end_group.add_argument(
            "-D", "--duration", help=_("duration of the event"))
        self.parser.add_argument(
            "-H", "--head-picture", help="URL to a picture to use as head-picture"
        )
        self.parser.add_argument(
            "-d", "--description", help="plain text description the event"
        )
        self.parser.add_argument(
            "-C", "--category", action=CategoryAction, dest="categories",
            help="Category of the event"
        )
        self.parser.add_argument(
            "-l", "--location", action="append", nargs="+", metavar="[KEY] VALUE",
            help="Location metadata"
        )
        rsvp_group = self.parser.add_mutually_exclusive_group()
        rsvp_group.add_argument(
            "--rsvp", action="store_true", help=_("RSVP is requested"))
        rsvp_group.add_argument(
            "--rsvp_json", metavar="JSON", help=_("JSON description of RSVP form"))
        for node_type in ("invitees", "comments", "blog", "schedule"):
            self.parser.add_argument(
                f"--{node_type}",
                nargs=2,
                metavar=("JID", "NODE"),
                help=_("link {node_type} pubsub node").format(node_type=node_type)
            )
        self.parser.add_argument(
            "-a", "--attachment", action="append", dest="attachments",
            help=_("attach a file")
        )
        self.parser.add_argument("--website", help=_("website of the event"))
        self.parser.add_argument(
            "--status", choices=["confirmed", "tentative", "cancelled"],
            help=_("status of the event")
        )
        self.parser.add_argument(
            "-T", "--language", metavar="LANG", action="append", dest="languages",
            help=_("main languages spoken at the event")
        )
        self.parser.add_argument(
            "--wheelchair", choices=["full", "partial", "no"],
            help=_("is the location accessible by wheelchair")
        )
        self.parser.add_argument(
            "--external",
            nargs=3,
            metavar=("JID", "NODE", "ITEM"),
            help=_("link to an external event")
        )

    def get_event_data(self):
        """Build the event data dict from parsed arguments.

        If ``--duration`` is used, ``self.args.end`` is set from start time and
        duration. Parsing is stopped with an error if ``--duration`` is used
        without ``--start``, or if ``--rsvp_json`` is not valid JSON.

        @return (dict): event data, with only the keys whose argument is set
        """
        if self.args.duration is not None:
            if self.args.start is None:
                self.parser.error("--start must be set if --duration is used")
            # if duration is used, we simply add it to start time to get end time
            self.args.end = base.date_decoder(
                f"{self.args.start} + {self.args.duration}"
            )

        event = {}
        if self.args.name is not None:
            event["name"] = {"": self.args.name}

        if self.args.start is not None:
            event["start"] = self.args.start

        if self.args.end is not None:
            event["end"] = self.args.end

        if self.args.head_picture:
            event["head-picture"] = {
                "sources": [{
                    "url": self.args.head_picture
                }]
            }
        if self.args.description:
            event["descriptions"] = [
                {
                    "type": "text",
                    "description": self.args.description
                }
            ]
        if self.args.categories:
            event["categories"] = self.args.categories
        if self.args.location is not None:
            location = {}
            for location_data in self.args.location:
                if len(location_data) == 1:
                    # a single value is the description of the location
                    location["description"] = location_data[0]
                else:
                    key, *values = location_data
                    location[key] = " ".join(values)
            event["locations"] = [location]

        if self.args.rsvp:
            event["rsvp"] = [{}]
        elif self.args.rsvp_json:
            # the argument is a JSON string, it must be parsed before use
            try:
                rsvp_data = json.loads(self.args.rsvp_json)
            except ValueError as e:
                self.parser.error(f"invalid JSON used for --rsvp_json: {e}")
            # a single form description is wrapped in a list
            if isinstance(rsvp_data, dict):
                event["rsvp"] = [rsvp_data]
            else:
                event["rsvp"] = rsvp_data

        for node_type in ("invitees", "comments", "blog", "schedule"):
            value = getattr(self.args, node_type)
            if value:
                service, node = value
                event[node_type] = {"service": service, "node": node}

        if self.args.attachments:
            attachments = event["attachments"] = []
            for attachment in self.args.attachments:
                attachments.append({
                    "sources": [{"url": attachment}]
                })

        extra = {}

        for arg in ("website", "status", "languages"):
            value = getattr(self.args, arg)
            if value is not None:
                extra[arg] = value
        if self.args.wheelchair is not None:
            extra["accessibility"] = {"wheelchair": self.args.wheelchair}

        if extra:
            event["extra"] = extra

        if self.args.external:
            ext_jid, ext_node, ext_item = self.args.external
            event["external"] = {
                "jid": ext_jid,
                "node": ext_node,
                "item": ext_item
            }
        return event


class Create(EventBase, base.CommandBase):
    """Create or replace an event, ``name``, start and end are mandatory."""

    def __init__(self, host):
        super().__init__(
            host,
            "create",
            use_pubsub=True,
            help=_("create or replace event"),
        )

    def add_parser_options(self):
        super().add_parser_options()
        self.parser.add_argument(
            "-i",
            "--id",
            default="",
            help=_("ID of the PubSub Item"),
        )
        # name is mandatory here
        self.parser.add_argument("name", help=_("name of the event"))

    async def start(self):
        if self.args.start is None:
            self.parser.error("--start must be set")
        event_data = self.get_event_data()
        # we check self.args.end after get_event_data because it may be set there
        # if --duration is used
        if self.args.end is None:
            self.parser.error("--end or --duration must be set")
        try:
            # NOTE(review): arguments are given as (id, node, service) here while
            # eventModify below is called with (item, service, node); confirm
            # against the bridge signature
            await self.host.bridge.eventCreate(
                data_format.serialise(event_data),
                self.args.id,
                self.args.node,
                self.args.service,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't create event: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("Event created successfully"))
            self.host.quit()


class Modify(EventBase, base.CommandBase):
    """Modify an existing event, only the specified data are sent."""

    def __init__(self, host):
        # EventBase has no __init__ of its own, a single cooperative call is
        # enough (same as in Create)
        super().__init__(
            host,
            "modify",
            use_pubsub=True,
            pubsub_flags={C.SINGLE_ITEM},
            help=_("modify an existing event"),
        )

    def add_parser_options(self):
        super().add_parser_options()
        # name is optional here
        self.parser.add_argument("-N", "--name", help=_("name of the event"))

    async def start(self):
        event_data = self.get_event_data()
        try:
            await self.host.bridge.eventModify(
                data_format.serialise(event_data),
                self.args.item,
                self.args.service,
                self.args.node,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't update event data: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.host.quit()


class InviteeGet(base.CommandBase):
    """Retrieve event attendance data, optionally restricted to some JIDs."""

    def __init__(self, host):
        base.CommandBase.__init__(
            self,
            host,
            "get",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            pubsub_flags={C.SINGLE_ITEM},
            use_verbose=True,
            help=_("get event attendance"),
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "-j", "--jid", action="append", dest="jids", default=[],
            help=_("only retrieve RSVP from those JIDs")
        )

    async def start(self):
        try:
            event_data_s = await self.host.bridge.eventInviteeGet(
                self.args.service,
                self.args.node,
                self.args.item,
                self.args.jids,
                "",
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't get event data: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            event_data = data_format.deserialise(event_data_s)
            await self.output(event_data)
            self.host.quit()


class InviteeSet(base.CommandBase):
    """Set event attendance from KEY/VALUE fields."""

    def __init__(self, host):
        super().__init__(
            host,
            "set",
            use_pubsub=True,
            pubsub_flags={C.SINGLE_ITEM},
            help=_("set event attendance"),
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "-f",
            "--field",
            action="append",
            nargs=2,
            dest="fields",
            metavar=("KEY", "VALUE"),
            help=_("configuration field to set"),
        )

    async def start(self):
        # TODO: handle RSVP with XMLUI in a similar way as for `ad-hoc run`
        fields = dict(self.args.fields) if self.args.fields else {}
        try:
            # the call must be awaited (as every other bridge call of this
            # module), otherwise failures are not caught below and we quit
            # before the request is done
            await self.host.bridge.eventInviteeSet(
                self.args.service,
                self.args.node,
                self.args.item,
                data_format.serialise(fields),
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't set event data: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.host.quit()


class InviteesList(base.CommandBase):
    """List the attendance of invitees, with a summary in default output."""

    def __init__(self, host):
        extra_outputs = {"default": self.default_output}
        base.CommandBase.__init__(
            self,
            host,
            "list",
            use_output=C.OUTPUT_DICT_DICT,
            extra_outputs=extra_outputs,
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("get event attendance"),
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "-m",
            "--missing",
            action="store_true",
            help=_("show missing people (invited but no R.S.V.P. so far)"),
        )
        self.parser.add_argument(
            "-R",
            "--no-rsvp",
            action="store_true",
            help=_("don't show people which gave R.S.V.P."),
        )

    def _attend_filter(self, attend, row):
        """Colorize the attendance value according to the answer."""
        if attend == "yes":
            attend_color = C.A_SUCCESS
        elif attend == "no":
            attend_color = C.A_FAILURE
        else:
            attend_color = A.FG_WHITE
        return A.color(attend_color, attend)

    def _guests_filter(self, guests):
        """Show guests number between parentheses, or nothing if falsy."""
        return "(" + str(guests) + ")" if guests else ""

    def default_output(self, event_data):
        """Display invitees (as a table if requested) followed by a summary.

        @param event_data: invitee JID to attendance data mapping; the "jid" key
            is added to each data dict, and "guests" is blanked for people not
            attending or without answer
        """
        data = []
        attendees_yes = 0
        attendees_maybe = 0
        attendees_no = 0
        attendees_missing = 0
        guests = 0
        guests_maybe = 0
        for jid_, jid_data in event_data.items():
            jid_data["jid"] = jid_
            try:
                guests_int = int(jid_data["guests"])
            except (ValueError, KeyError, TypeError):
                # no usable guests number: we must neither fail on an unbound
                # name nor reuse the value of the previous invitee
                # NOTE(review): the table below uses 1 as default for "guests",
                # confirm whether a missing value should count as 1 instead
                guests_int = 0
            attend = jid_data.get("attend", "")
            if attend == "yes":
                attendees_yes += 1
                guests += guests_int
            elif attend == "maybe":
                attendees_maybe += 1
                guests_maybe += guests_int
            elif attend == "no":
                attendees_no += 1
                jid_data["guests"] = ""
            else:
                attendees_missing += 1
                jid_data["guests"] = ""
            data.append(jid_data)

        show_table = OUTPUT_OPT_TABLE in self.args.output_opts

        table = common.Table.fromListDict(
            self.host,
            data,
            ("nick",)
            + (("jid",) if self.host.verbosity else ())
            + ("attend", "guests"),
            headers=None,
            filters={
                "nick": A.color(C.A_HEADER, "{}" if show_table else "{} "),
                "jid": "{}" if show_table else "{} ",
                "attend": self._attend_filter,
                "guests": "{}" if show_table else self._guests_filter,
            },
            defaults={"nick": "", "attend": "", "guests": 1},
        )
        if show_table:
            table.display()
        else:
            table.display_blank(show_header=False, col_sep="")

        if not self.args.no_rsvp:
            self.disp("")
            self.disp(
                A.color(
                    C.A_SUBHEADER,
                    _("Attendees: "),
                    A.RESET,
                    str(len(data)),
                    _(" ("),
                    C.A_SUCCESS,
                    _("yes: "),
                    str(attendees_yes),
                    A.FG_WHITE,
                    _(", maybe: "),
                    str(attendees_maybe),
                    ", ",
                    C.A_FAILURE,
                    _("no: "),
                    str(attendees_no),
                    A.RESET,
                    ")",
                )
            )
            self.disp(
                A.color(C.A_SUBHEADER, _("confirmed guests: "), A.RESET, str(guests))
            )
            self.disp(
                A.color(
                    C.A_SUBHEADER,
                    _("unconfirmed guests: "),
                    A.RESET,
                    str(guests_maybe),
                )
            )
            self.disp(
                A.color(
                    C.A_SUBHEADER, _("total: "), A.RESET, str(guests + guests_maybe)
                )
            )
        if attendees_missing:
            self.disp("")
            self.disp(
                A.color(
                    C.A_SUBHEADER,
                    _("missing people (no reply): "),
                    A.RESET,
                    str(attendees_missing),
                )
            )

    async def start(self):
        if self.args.no_rsvp and not self.args.missing:
            self.parser.error(_("you need to use --missing if you use --no-rsvp"))
        if not self.args.missing:
            prefilled = {}
        else:
            # we get prefilled data with all people
            try:
                affiliations = await self.host.bridge.psNodeAffiliationsGet(
                    self.args.service,
                    self.args.node,
                    self.profile,
                )
            except Exception as e:
                self.disp(f"can't get node affiliations: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                # we fill all affiliations with empty data, answered one will be
                # filled below. We only consider people with "publisher"
                # affiliation as invited, creators are not, and members can just
                # observe
                prefilled = {
                    jid_: {}
                    for jid_, affiliation in affiliations.items()
                    if affiliation in ("publisher",)
                }

        try:
            event_data = await self.host.bridge.eventInviteesList(
                self.args.service,
                self.args.node,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't get event data: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)

        # we fill nicknames and keep only requested people

        if self.args.no_rsvp:
            for jid_ in event_data:
                # if there is a jid in event_data it must be there in prefilled too
                # otherwise somebody is not on the invitees list
                try:
                    del prefilled[jid_]
                except KeyError:
                    self.disp(
                        A.color(
                            C.A_WARNING,
                            "We got a RSVP from somebody who was not in invitees "
                            f"list: {jid_}",
                        ),
                        error=True,
                    )
        else:
            # we replace empty dicts for existing people with R.S.V.P. data
            prefilled.update(event_data)

        # we get nicknames for everybody, make it easier for organisers
        for jid_, data in prefilled.items():
            id_data = await self.host.bridge.identityGet(jid_, [], True, self.profile)
            id_data = data_format.deserialise(id_data)
            data["nick"] = id_data["nicknames"][0]

        await self.output(prefilled)
        self.host.quit()


class InviteeInvite(base.CommandBase):
    """Invite someone to the event through email."""

    def __init__(self, host):
        base.CommandBase.__init__(
            self,
            host,
            "invite",
            use_pubsub=True,
            pubsub_flags={C.NODE, C.SINGLE_ITEM},
            help=_("invite someone to the event through email"),
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "-e",
            "--email",
            action="append",
            default=[],
            help="email(s) to send the invitation to",
        )
        self.parser.add_argument(
            "-N",
            "--name",
            default="",
            help="name of the invitee",
        )
        self.parser.add_argument(
            "-H",
            "--host-name",
            default="",
            help="name of the host",
        )
        self.parser.add_argument(
            "-l",
            "--lang",
            default="",
            help="main language spoken by the invitee",
        )
        self.parser.add_argument(
            "-U",
            "--url-template",
            default="",
            help="template to construct the URL",
        )
        self.parser.add_argument(
            "-S",
            "--subject",
            default="",
            help="subject of the invitation email (default: generic subject)",
        )
        self.parser.add_argument(
            "-b",
            "--body",
            default="",
            help="body of the invitation email (default: generic body)",
        )

    async def start(self):
        # first email is the main one, the others are given as extra
        email = self.args.email[0] if self.args.email else None
        emails_extra = self.args.email[1:]

        try:
            await self.host.bridge.eventInviteByEmail(
                self.args.service,
                self.args.node,
                self.args.item,
                email,
                emails_extra,
                self.args.name,
                self.args.host_name,
                self.args.lang,
                self.args.url_template,
                self.args.subject,
                self.args.body,
                # self.profile is used for consistency with the other commands
                # of this module
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't create invitation: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.host.quit()


class Invitee(base.CommandBase):
    """Parent command of invitees related subcommands."""

    subcommands = (InviteeGet, InviteeSet, InviteesList, InviteeInvite)

    def __init__(self, host):
        super().__init__(
            host, "invitee", use_profile=False, help=_("manage invities")
        )


class Event(base.CommandBase):
    """Top-level ``event`` command."""

    subcommands = (Get, Create, Modify, Invitee)

    def __init__(self, host):
        super().__init__(
            host, "event", use_profile=False, help=_("event management")
        )