view libervia/cli/cmd_list.py @ 4230:314d3c02bb67

core (xmpp): Add a timeout for messages processing to avoid blocking the queue.
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:21:04 +0200
parents 47401850dec6
children 89a0999884ac
line wrap: on
line source

#!/usr/bin/env python3


# Libervia CLI
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import json
import os
from libervia.backend.core.i18n import _
from libervia.backend.tools.common import data_format
from libervia.cli import common
from libervia.cli.constants import Const as C
from . import base

__commands__ = ["List"]

FIELDS_MAP = "mapping"


class Get(base.CommandBase):
    def __init__(self, host):
        base.CommandBase.__init__(
            self,
            host,
            "get",
            use_verbose=True,
            use_pubsub=True,
            pubsub_flags={C.MULTI_ITEMS},
            pubsub_defaults={"service": _("auto"), "node": _("auto")},
            use_output=C.OUTPUT_LIST_XMLUI,
            help=_("get lists"),
        )

    def add_parser_options(self):
        pass

    async def start(self):
        await common.fill_well_known_uri(self, os.getcwd(), "tickets", meta_map={})
        try:
            lists_data = data_format.deserialise(
                await self.host.bridge.list_get(
                    self.args.service,
                    self.args.node,
                    self.args.max,
                    self.args.items,
                    "",
                    self.get_pubsub_extra(),
                    self.profile,
                ),
                type_check=list,
            )
        except Exception as e:
            self.disp(f"can't get lists: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            await self.output(lists_data[0])
            self.host.quit(C.EXIT_OK)


class Set(base.CommandBase):
    def __init__(self, host):
        base.CommandBase.__init__(
            self,
            host,
            "set",
            use_pubsub=True,
            pubsub_defaults={"service": _("auto"), "node": _("auto")},
            help=_("set a list item"),
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "-f",
            "--field",
            action="append",
            nargs="+",
            dest="fields",
            required=True,
            metavar=("NAME", "VALUES"),
            help=_("field(s) to set (required)"),
        )
        self.parser.add_argument(
            "-U",
            "--update",
            choices=("auto", "true", "false"),
            default="auto",
            help=_("update existing item instead of replacing it (DEFAULT: auto)"),
        )
        self.parser.add_argument(
            "item",
            nargs="?",
            default="",
            help=_("id, URL of the item to update, or nothing for new item"),
        )

    async def start(self):
        await common.fill_well_known_uri(self, os.getcwd(), "tickets", meta_map={})
        if self.args.update == "auto":
            # we update if we have a item id specified
            update = bool(self.args.item)
        else:
            update = C.bool(self.args.update)

        values = {}

        for field_data in self.args.fields:
            values.setdefault(field_data[0], []).extend(field_data[1:])

        extra = {"update": update}

        try:
            item_id = await self.host.bridge.list_set(
                self.args.service,
                self.args.node,
                values,
                "",
                self.args.item,
                data_format.serialise(extra),
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't set list item: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(f"item {str(item_id or self.args.item)!r} set successfully")
            self.host.quit(C.EXIT_OK)


class Delete(base.CommandBase):
    def __init__(self, host):
        base.CommandBase.__init__(
            self,
            host,
            "delete",
            use_pubsub=True,
            pubsub_defaults={"service": _("auto"), "node": _("auto")},
            help=_("delete a list item"),
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "-f", "--force", action="store_true", help=_("delete without confirmation")
        )
        self.parser.add_argument(
            "-N", "--notify", action="store_true", help=_("notify deletion")
        )
        self.parser.add_argument(
            "item",
            help=_("id of the item to delete"),
        )

    async def start(self):
        await common.fill_well_known_uri(self, os.getcwd(), "tickets", meta_map={})
        if not self.args.item:
            self.parser.error(_("You need to specify a list item to delete"))
        if not self.args.force:
            message = _("Are you sure to delete list item {item_id} ?").format(
                item_id=self.args.item
            )
            await self.host.confirm_or_quit(message, _("item deletion cancelled"))
        try:
            await self.host.bridge.list_delete_item(
                self.args.service,
                self.args.node,
                self.args.item,
                self.args.notify,
                self.profile,
            )
        except Exception as e:
            self.disp(_("can't delete item: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("item {item} has been deleted").format(item=self.args.item))
            self.host.quit(C.EXIT_OK)


class Import(base.CommandBase):
    # TODO: factorize with blog/import

    def __init__(self, host):
        super().__init__(
            host,
            "import",
            use_progress=True,
            use_verbose=True,
            help=_("import tickets from external software/dataset"),
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "importer",
            nargs="?",
            help=_("importer name, nothing to display importers list"),
        )
        self.parser.add_argument(
            "-o",
            "--option",
            action="append",
            nargs=2,
            default=[],
            metavar=("NAME", "VALUE"),
            help=_("importer specific options (see importer description)"),
        )
        self.parser.add_argument(
            "-m",
            "--map",
            action="append",
            nargs=2,
            default=[],
            metavar=("IMPORTED_FIELD", "DEST_FIELD"),
            help=_(
                "specified field in import data will be put in dest field (default: use "
                "same field name, or ignore if it doesn't exist)"
            ),
        )
        self.parser.add_argument(
            "-s",
            "--service",
            default="",
            metavar="PUBSUB_SERVICE",
            help=_("PubSub service where the items must be uploaded (default: server)"),
        )
        self.parser.add_argument(
            "-n",
            "--node",
            default="",
            metavar="PUBSUB_NODE",
            help=_(
                "PubSub node where the items must be uploaded (default: tickets' "
                "defaults)"
            ),
        )
        self.parser.add_argument(
            "location",
            nargs="?",
            help=_(
                "importer data location (see importer description), nothing to show "
                "importer description"
            ),
        )

    async def on_progress_started(self, metadata):
        self.disp(_("Tickets upload started"), 2)

    async def on_progress_finished(self, metadata):
        self.disp(_("Tickets uploaded successfully"), 2)

    async def on_progress_error(self, error_msg):
        self.disp(
            _("Error while uploading tickets: {error_msg}").format(error_msg=error_msg),
            error=True,
        )

    async def start(self):
        if self.args.location is None:
            # no location, the list of importer or description is requested
            for name in ("option", "service", "node"):
                if getattr(self.args, name):
                    self.parser.error(
                        _(
                            "{name} argument can't be used without location argument"
                        ).format(name=name)
                    )
            if self.args.importer is None:
                self.disp(
                    "\n".join(
                        [
                            f"{name}: {desc}"
                            for name, desc in await self.host.bridge.ticketsImportList()
                        ]
                    )
                )
            else:
                try:
                    short_desc, long_desc = await self.host.bridge.ticketsImportDesc(
                        self.args.importer
                    )
                except Exception as e:
                    self.disp(f"can't get importer description: {e}", error=True)
                    self.host.quit(C.EXIT_BRIDGE_ERRBACK)
                else:
                    self.disp(f"{name}: {short_desc}\n\n{long_desc}")
            self.host.quit()
        else:
            # we have a location, an import is requested

            if self.args.progress:
                # we use a custom progress bar template as we want a counter
                self.pbar_template = [
                    _("Progress: "),
                    ["Percentage"],
                    " ",
                    ["Bar"],
                    " ",
                    ["Counter"],
                    " ",
                    ["ETA"],
                ]

            options = {key: value for key, value in self.args.option}
            fields_map = dict(self.args.map)
            if fields_map:
                if FIELDS_MAP in options:
                    self.parser.error(
                        _(
                            "fields_map must be specified either preencoded in --option or "
                            "using --map, but not both at the same time"
                        )
                    )
                options[FIELDS_MAP] = json.dumps(fields_map)

            try:
                progress_id = await self.host.bridge.ticketsImport(
                    self.args.importer,
                    self.args.location,
                    options,
                    self.args.service,
                    self.args.node,
                    self.profile,
                )
            except Exception as e:
                self.disp(
                    _("Error while trying to import tickets: {e}").format(e=e),
                    error=True,
                )
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                await self.set_progress_id(progress_id)


class List(base.CommandBase):
    subcommands = (Get, Set, Delete, Import)

    def __init__(self, host):
        super(List, self).__init__(
            host, "list", use_profile=False, help=_("pubsub lists handling")
        )