view libervia/cli/cmd_pubsub.py @ 4202:b26339343076

core: use a user specific directory for PID file: the default location of the PID file is now specific to the logged-in user; this allows several instances of Libervia to be run by different users on the same machine without PID conflicts.
author Goffi <goffi@goffi.org>
date Sun, 14 Jan 2024 17:48:02 +0100
parents 47401850dec6
children 3f7ca590a5da
line wrap: on
line source

#!/usr/bin/env python3


# Libervia CLI
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import argparse
import os.path
import re
import sys
import subprocess
import asyncio
import json
from . import base
from libervia.backend.core.i18n import _
from libervia.backend.core import exceptions
from libervia.cli.constants import Const as C
from libervia.cli import common
from libervia.cli import arg_tools
from libervia.cli import xml_tools
from functools import partial
from libervia.backend.tools.common import data_format
from libervia.backend.tools.common import uri
from libervia.backend.tools.common.ansi import ANSI as A
from libervia.backend.tools.common import date_utils
from libervia.frontends.tools import jid, strings
from libervia.frontends.bridge.bridge_frontend import BridgeException

__commands__ = ["Pubsub"]

PUBSUB_TMP_DIR = "pubsub"
PUBSUB_SCHEMA_TMP_DIR = PUBSUB_TMP_DIR + "_schema"
ALLOWED_SUBSCRIPTIONS_OWNER = ("subscribed", "pending", "none")

# TODO: need to split this class in several modules, plugin should handle subcommands


class NodeInfo(base.CommandBase):
    """Command "info": retrieve the configuration of a pubsub node and output it."""

    def __init__(self, host):
        super().__init__(
            host,
            "info",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("retrieve node configuration"),
        )

    def add_parser_options(self):
        """Register the repeatable ``-k/--key`` option used to filter the output."""
        self.parser.add_argument(
            "-k",
            "--key",
            dest="keys",
            action="append",
            help=_("data key to filter"),
        )

    def remove_prefix(self, key):
        """Return ``key`` without its leading "pubsub#" prefix, if it has one."""
        prefix = "pubsub#"
        if key.startswith(prefix):
            return key[len(prefix):]
        return key

    def filter_key(self, key):
        """Tell whether ``key`` was requested, with or without "pubsub#" prefix."""
        for wanted in self.args.keys:
            if key in (wanted, "pubsub#" + wanted):
                return True
        return False

    async def start(self):
        """Fetch the node configuration, filter it if keys were given, output it."""
        try:
            config_dict = await self.host.bridge.ps_node_configuration_get(
                self.args.service,
                self.args.node,
                self.profile,
            )
        except BridgeException as e:
            if e.condition != "item-not-found":
                self.disp(f"can't get node configuration: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                self.disp(
                    f"The node {self.args.node} doesn't exist on {self.args.service}",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            if self.args.keys:
                # keep only the fields explicitly requested with --key
                selected = {
                    name: value
                    for name, value in config_dict.items()
                    if self.filter_key(name)
                }
            else:
                selected = config_dict
            await self.output(
                {self.remove_prefix(name): value for name, value in selected.items()}
            )
            self.host.quit()


class NodeCreate(base.CommandBase):
    """Command "create": create a pubsub node with optional configuration fields."""

    def __init__(self, host):
        base.CommandBase.__init__(
            self,
            host,
            "create",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("create a node"),
        )

    @staticmethod
    def add_node_config_options(parser):
        """Add the ``-f/--field`` and ``-F/--full-prefix`` options to ``parser``.

        @param parser: argparse parser (or compatible object) to complete
        """
        parser.add_argument(
            "-f",
            "--field",
            action="append",
            nargs=2,
            dest="fields",
            default=[],
            metavar=("KEY", "VALUE"),
            help=_("configuration field to set"),
        )
        parser.add_argument(
            "-F",
            "--full-prefix",
            action="store_true",
            help=_('don\'t prepend "pubsub#" prefix to field names'),
        )

    def add_parser_options(self):
        self.add_node_config_options(self.parser)

    @staticmethod
    def get_config_options(args):
        """Build the configuration dict from parsed ``--field`` arguments.

        Unless ``args.full_prefix`` is set, "pubsub#" is prepended to field names
        which don't already start with it.

        @param args: parsed arguments with ``fields`` (list of key/value pairs) and
            ``full_prefix`` (bool) attributes
        @return (dict): mapping of field name to value
        """
        if args.full_prefix:
            return dict(args.fields)
        # don't double the prefix when the user already gave the full field name,
        # this is consistent with NodeSet.get_key_name
        return {
            (k if k.startswith("pubsub#") else "pubsub#" + k): v
            for k, v in args.fields
        }

    async def start(self):
        """Create the node and display the identifier returned by the bridge."""
        options = self.get_config_options(self.args)
        try:
            node_id = await self.host.bridge.ps_node_create(
                self.args.service,
                self.args.node,
                options,
                self.profile,
            )
        except Exception as e:
            self.disp(msg=_("can't create node: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            # the announce is only shown when verbosity is set, so that by default
            # only the node id is displayed
            announce = _("node created successfully: ") if self.host.verbosity else ""
            self.disp(announce + node_id)
            self.host.quit()


class NodePurge(base.CommandBase):
    """Command "purge": remove all items from a pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "purge",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("purge a node (i.e. remove all items from it)"),
        )

    def add_parser_options(self):
        """Register the ``-f/--force`` flag which skips the confirmation."""
        self.parser.add_argument(
            "-f",
            "--force",
            action="store_true",
            help=_("purge node without confirmation"),
        )

    async def start(self):
        """Ask for confirmation (unless forced) then purge the node."""
        if not self.args.force:
            if not self.args.service:
                message = _(
                    "Are you sure to purge PEP node [{node}]? This will "
                    "delete ALL items from it!"
                ).format(node=self.args.node)
            else:
                # the node itself is kept, only its items are removed: the message
                # must talk about purge, not deletion
                message = _(
                    "Are you sure to purge node [{node}] on service "
                    "[{service}]? This will delete ALL items from it!"
                ).format(node=self.args.node, service=self.args.service)
            await self.host.confirm_or_quit(message, _("node purge cancelled"))

        try:
            await self.host.bridge.ps_node_purge(
                self.args.service,
                self.args.node,
                self.profile,
            )
        except Exception as e:
            self.disp(msg=_("can't purge node: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("node [{node}] purged successfully").format(node=self.args.node))
            self.host.quit()


class NodeDelete(base.CommandBase):
    """Command "delete": delete a pubsub node, after confirmation unless forced."""

    def __init__(self, host):
        super().__init__(
            host,
            "delete",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("delete a node"),
        )

    def add_parser_options(self):
        """Register the ``-f/--force`` flag which skips the confirmation."""
        self.parser.add_argument(
            "-f",
            "--force",
            help=_("delete node without confirmation"),
            action="store_true",
        )

    async def start(self):
        """Ask for confirmation when needed, then request the node deletion."""
        args = self.args
        if not args.force:
            if args.service:
                message = _(
                    "Are you sure to delete node [{node}] on service [{service}]?"
                ).format(node=args.node, service=args.service)
            else:
                message = _("Are you sure to delete PEP node [{node}] ?").format(
                    node=args.node
                )
            await self.host.confirm_or_quit(message, _("node deletion cancelled"))

        try:
            await self.host.bridge.ps_node_delete(
                self.args.service,
                self.args.node,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't delete node: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            success_msg = _("node [{node}] deleted successfully").format(
                node=self.args.node
            )
            self.disp(success_msg)
            self.host.quit()


class NodeSet(base.CommandBase):
    """Command "set": set configuration fields of a pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "set",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("set node configuration"),
        )

    def add_parser_options(self):
        """Register the required ``-f/--field`` option and ``-F/--full-prefix``."""
        self.parser.add_argument(
            "-f",
            "--field",
            dest="fields",
            metavar=("KEY", "VALUE"),
            nargs=2,
            action="append",
            required=True,
            help=_("configuration field to set (required)"),
        )
        self.parser.add_argument(
            "-F",
            "--full-prefix",
            help=_('don\'t prepend "pubsub#" prefix to field names'),
            action="store_true",
        )

    def get_key_name(self, k):
        """Return the field name to send for ``k``.

        "pubsub#" is prepended unless ``--full-prefix`` is used or ``k`` already
        starts with this prefix.
        """
        needs_prefix = not (self.args.full_prefix or k.startswith("pubsub#"))
        return "pubsub#" + k if needs_prefix else k

    async def start(self):
        """Send the requested configuration fields to the node."""
        try:
            await self.host.bridge.ps_node_configuration_set(
                self.args.service,
                self.args.node,
                {self.get_key_name(name): value for name, value in self.args.fields},
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't set node configuration: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("node configuration successful"), 1)
            self.host.quit()


class NodeImport(base.CommandBase):
    """Command "import": publish items read from a raw XML file to a node."""

    def __init__(self, host):
        super().__init__(
            host,
            "import",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("import raw XML to a node"),
        )

    def add_parser_options(self):
        """Register the ``--admin`` flag and the ``import_file`` positional."""
        self.parser.add_argument(
            "--admin",
            action="store_true",
            help=_("do a pubsub admin request, needed to change publisher"),
        )
        self.parser.add_argument(
            "import_file",
            type=argparse.FileType(),
            help=_(
                "path to the XML file with data to import. The file must contain "
                "whole XML of each item to import."
            ),
        )

    async def start(self):
        """Parse the import file and send its items to the node.

        If the file has several root elements, its content is wrapped in a single
        <import> element and parsed again. Any other parsing error is reported and
        the command exits with ``C.EXIT_DATA_ERROR``.
        """
        try:
            element, etree = xml_tools.etree_parse(
                self, self.args.import_file, reraise=True
            )
        except Exception as e:
            from lxml.etree import XMLSyntaxError

            if not (isinstance(e, XMLSyntaxError) and e.code == 5):
                # we can't recover from this error: without this guard, ``element``
                # would be unbound below and a NameError would hide the real issue
                self.disp(
                    _("can't parse import file: {e}").format(e=e),
                    error=True,
                )
                self.host.quit(C.EXIT_DATA_ERROR)
                return
            # we have extra content, this probably means that items are not wrapped
            # so we wrap them here and try again
            self.args.import_file.seek(0)
            xml_buf = "<import>" + self.args.import_file.read() + "</import>"
            element, etree = xml_tools.etree_parse(self, xml_buf)

        # we reverse elements as we expect to have most recently published element
        # first
        # TODO: make this more explicit and add an option
        element[:] = reversed(element)

        if not all(i.tag == "{http://jabber.org/protocol/pubsub}item" for i in element):
            self.disp(
                _("You are not using list of pubsub items, we can't import this file"),
                error=True,
            )
            self.host.quit(C.EXIT_DATA_ERROR)
            return

        items = [etree.tostring(i, encoding="unicode") for i in element]
        if self.args.admin:
            method = self.host.bridge.ps_admin_items_send
        else:
            self.disp(
                _(
                    "Items are imported without using admin mode, publisher can't "
                    "be changed"
                )
            )
            method = self.host.bridge.ps_items_send

        try:
            items_ids = await method(
                self.args.service,
                self.args.node,
                items,
                "",
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't send items: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            if items_ids:
                self.disp(
                    _("items published with id(s) {items_ids}").format(
                        items_ids=", ".join(items_ids)
                    )
                )
            else:
                self.disp(_("items published"))
            self.host.quit()


class NodeAffiliationsGet(base.CommandBase):
    """Command "affiliations get": output the affiliations of a pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("retrieve node affiliations (for node owner)"),
        )

    def add_parser_options(self):
        """No option specific to this command."""

    async def start(self):
        """Retrieve the affiliations through the bridge and output them."""
        service, node = self.args.service, self.args.node
        try:
            affiliations = await self.host.bridge.ps_node_affiliations_get(
                service, node, self.profile
            )
        except Exception as e:
            self.disp(f"can't get node affiliations: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            await self.output(affiliations)
            self.host.quit()


class NodeAffiliationsSet(base.CommandBase):
    """Command "affiliations set": set the affiliations of entities on a node."""

    def __init__(self, host):
        super().__init__(
            host,
            "set",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("set affiliations (for node owner)"),
        )

    def add_parser_options(self):
        """Register the required, repeatable ``-a/--affiliation JID AFFILIATION``."""
        # XXX: optional argument syntax is used for a required argument, because a
        #      list of 2 elements lists (used to build dicts) doesn't work with
        #      positional arguments
        self.parser.add_argument(
            "-a",
            "--affiliation",
            dest="affiliations",
            action="append",
            nargs=2,
            metavar=("JID", "AFFILIATION"),
            required=True,
            help=_("entity/affiliation couple(s)"),
        )

    async def start(self):
        """Send the entity → affiliation mapping to the bridge."""
        # each --affiliation gives a [jid, affiliation] pair
        jid_to_affiliation = dict(self.args.affiliations)
        try:
            await self.host.bridge.ps_node_affiliations_set(
                self.args.service,
                self.args.node,
                jid_to_affiliation,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't set node affiliations: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("affiliations have been set"), 1)
            self.host.quit()


class NodeAffiliations(base.CommandBase):
    """Command group "affiliations", holding the "get" and "set" subcommands."""

    subcommands = (NodeAffiliationsGet, NodeAffiliationsSet)

    def __init__(self, host):
        description = _("set or retrieve node affiliations")
        super().__init__(host, "affiliations", use_profile=False, help=description)


class NodeSubscriptionsGet(base.CommandBase):
    """Command "subscriptions get": output the subscriptions of a pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("retrieve node subscriptions (for node owner)"),
        )

    def add_parser_options(self):
        """Register the ``--public`` flag."""
        self.parser.add_argument(
            "--public",
            help=_("get public subscriptions"),
            action="store_true",
        )

    async def start(self):
        """Retrieve subscriptions with the bridge method matching ``--public``."""
        bridge = self.host.bridge
        method = (
            bridge.ps_public_node_subscriptions_get
            if self.args.public
            else bridge.ps_node_subscriptions_get
        )
        try:
            subscriptions = await method(
                self.args.service,
                self.args.node,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't get node subscriptions: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            await self.output(subscriptions)
            self.host.quit()


class StoreSubscriptionAction(argparse.Action):
    """argparse action collecting owner-side subscriptions in a dict.

    Values are consumed two by two: a JID, then its subscription state. They are
    stored in the destination mapping as ``{jid: state}``. When the last JID is not
    followed by a state, "subscribed" is used.
    """

    def __call__(self, parser, namespace, values, option_string):
        subscriptions = getattr(namespace, self.dest)
        while values:
            entity = values.pop(0)
            # a trailing JID without explicit state is considered subscribed
            state = values.pop(0) if values else "subscribed"
            if state not in ALLOWED_SUBSCRIPTIONS_OWNER:
                allowed = ", ".join(ALLOWED_SUBSCRIPTIONS_OWNER)
                parser.error(_("subscription must be one of {}").format(allowed))
            subscriptions[entity] = state


class NodeSubscriptionsSet(base.CommandBase):
    """Command "subscriptions set": set or modify the subscriptions of a node."""

    def __init__(self, host):
        base.CommandBase.__init__(
            self,
            host,
            "set",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("set/modify subscriptions (for node owner)"),
        )

    def add_parser_options(self):
        """Register the required ``-S/--subscription JID [SUBSCRIPTION]`` option."""
        # XXX: we use optional argument syntax for a required one because list of list of 2 elements
        #      (used to construct dicts) don't work with positional arguments
        self.parser.add_argument(
            "-S",
            "--subscription",
            dest="subscriptions",
            # StoreSubscriptionAction fills this dict in place
            default={},
            nargs="+",
            metavar=("JID [SUBSCRIPTION]"),
            required=True,
            action=StoreSubscriptionAction,
            help=_("entity/subscription couple(s)"),
        )

    async def start(self):
        """Send the JID → subscription state mapping to the bridge."""
        try:
            # the call must be awaited: otherwise it is not run inside this ``try``,
            # its failures are never caught, and success is always reported
            await self.host.bridge.ps_node_subscriptions_set(
                self.args.service,
                self.args.node,
                self.args.subscriptions,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't set node subscriptions: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("subscriptions have been set"), 1)
            self.host.quit()


class NodeSubscriptions(base.CommandBase):
    """Command group "subscriptions", holding the "get" and "set" subcommands."""

    subcommands = (NodeSubscriptionsGet, NodeSubscriptionsSet)

    def __init__(self, host):
        description = _("get or modify node subscriptions")
        super().__init__(host, "subscriptions", use_profile=False, help=description)


class NodeSchemaSet(base.CommandBase):
    """Command "schema set": set or replace the schema of a pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "set",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("set/replace a schema"),
        )

    def add_parser_options(self):
        """Register the ``schema`` positional argument."""
        schema_help = _("schema to set (must be XML)")
        self.parser.add_argument("schema", help=schema_help)

    async def start(self):
        """Send the schema given on the command line to the bridge."""
        service, node = self.args.service, self.args.node
        try:
            await self.host.bridge.ps_schema_set(
                service, node, self.args.schema, self.profile
            )
        except Exception as e:
            self.disp(f"can't set schema: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("schema has been set"), 1)
            self.host.quit()


class NodeSchemaEdit(base.CommandBase, common.BaseEdit):
    """Command "schema edit": edit the schema of a pubsub node.

    The current schema is retrieved with ``ps_schema_get``, written pretty-printed
    to a temporary file, then handed to ``run_editor``. ``publish`` sends a schema
    to the node with ``ps_schema_set``.
    """

    # NOTE(review): presumably read by common.BaseEdit to know that this command
    #   doesn't work on pubsub items; confirm against BaseEdit
    use_items = False

    def __init__(self, host):
        # both parents are initialised explicitly, CommandBase first:
        # BaseEdit is given ``self.host``, which is read from the instance here
        base.CommandBase.__init__(
            self,
            host,
            "edit",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_draft=True,
            use_verbose=True,
            help=_("edit a schema"),
        )
        common.BaseEdit.__init__(self, self.host, PUBSUB_SCHEMA_TMP_DIR)

    def add_parser_options(self):
        """No option specific to this command."""
        pass

    async def publish(self, schema):
        """Set ``schema`` on the node, then quit.

        @param schema: schema to send to ``ps_schema_set``
        """
        # NOTE(review): presumably called by common.BaseEdit once the edition is
        #   done; confirm against BaseEdit
        try:
            await self.host.bridge.ps_schema_set(
                self.args.service,
                self.args.node,
                schema,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't set schema: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("schema has been set"), 1)
            self.host.quit()

    async def ps_schema_get_cb(self, schema):
        """Write ``schema`` to a temporary file and run the editor on it.

        @param schema(str): current schema as XML, may be empty or blank
        """
        # lxml is imported lazily, it's only needed for this command
        try:
            from lxml import etree
        except ImportError:
            self.disp(
                "lxml module must be installed to use edit, please install it "
                'with "pip install lxml"',
                error=True,
            )
            self.host.quit(1)
        content_file_obj, content_file_path = self.get_tmp_file()
        schema = schema.strip()
        if schema:
            # blank text is removed at parsing so that pretty_print can re-indent
            # the whole document
            parser = etree.XMLParser(remove_blank_text=True)
            schema_elt = etree.fromstring(schema, parser)
            # tostring with encoding="utf-8" returns bytes
            # NOTE(review): assumes the temporary file is opened in binary mode;
            #   confirm against get_tmp_file
            content_file_obj.write(
                etree.tostring(schema_elt, encoding="utf-8", pretty_print=True)
            )
            content_file_obj.seek(0)
        await self.run_editor(
            "pubsub_schema_editor_args", content_file_path, content_file_obj
        )

    async def start(self):
        """Retrieve the current schema (empty if none is found) and edit it."""
        try:
            schema = await self.host.bridge.ps_schema_get(
                self.args.service,
                self.args.node,
                self.profile,
            )
        except BridgeException as e:
            if e.condition == "item-not-found" or e.classname == "NotFound":
                # nothing found: the edition starts from an empty schema
                schema = ""
            else:
                self.disp(f"can't edit schema: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)

        await self.ps_schema_get_cb(schema)


class NodeSchemaGet(base.CommandBase):
    """Command "schema get": output the schema of a pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_XML,
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("get schema"),
        )

    def add_parser_options(self):
        """No option specific to this command."""

    async def start(self):
        """Output the node schema, or quit with ``C.EXIT_NOT_FOUND`` if none."""
        try:
            schema = await self.host.bridge.ps_schema_get(
                self.args.service,
                self.args.node,
                self.profile,
            )
        except BridgeException as e:
            not_found = e.condition == "item-not-found" or e.classname == "NotFound"
            if not not_found:
                self.disp(f"can't get schema: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                schema = None

        if not schema:
            self.disp(_("no schema found"), 1)
            self.host.quit(C.EXIT_NOT_FOUND)
        else:
            await self.output(schema)
            self.host.quit()


class NodeSchema(base.CommandBase):
    """Command group "schema", holding the "set", "edit" and "get" subcommands."""

    subcommands = (NodeSchemaSet, NodeSchemaEdit, NodeSchemaGet)

    def __init__(self, host):
        description = _("data schema manipulation")
        super().__init__(host, "schema", use_profile=False, help=description)


class Node(base.CommandBase):
    """Command group "node", holding all the node related subcommands."""

    subcommands = (
        NodeInfo,
        NodeCreate,
        NodePurge,
        NodeDelete,
        NodeSet,
        NodeImport,
        NodeAffiliations,
        NodeSubscriptions,
        NodeSchema,
    )

    def __init__(self, host):
        description = _("node handling")
        super().__init__(host, "node", use_profile=False, help=description)


class CacheGet(base.CommandBase):
    """Command "cache get": output pubsub items retrieved from the cache."""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_LIST_XML,
            use_pubsub=True,
            pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
            help=_("get pubsub item(s) from cache"),
        )

    def add_parser_options(self):
        """Register the ``-S/--sub-id`` option (empty string by default)."""
        self.parser.add_argument(
            "-S", "--sub-id", default="", help=_("subscription id")
        )

    async def start(self):
        """Get items from the cache, deserialise the result and output the items."""
        try:
            serialised = await self.host.bridge.ps_cache_get(
                self.args.service,
                self.args.node,
                self.args.max,
                self.args.items,
                self.args.sub_id,
                self.get_pubsub_extra(),
                self.profile,
            )
            ps_result = data_format.deserialise(serialised)
        except BridgeException as e:
            if e.classname != "NotFound":
                self.disp(f"can't get pubsub items from cache: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                self.disp(
                    f"The node {self.args.node} from {self.args.service} is not in cache "
                    f"for {self.profile}",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            items = ps_result["items"]
            await self.output(items)
            self.host.quit(C.EXIT_OK)


class CacheSync(base.CommandBase):
    """Command "cache sync": ask the backend to (re)synchronise a pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "sync",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("(re)synchronise a pubsub node"),
        )

    def add_parser_options(self):
        """No option specific to this command."""

    async def start(self):
        """Request the synchronisation and quit with the matching exit code."""
        try:
            await self.host.bridge.ps_cache_sync(
                self.args.service,
                self.args.node,
                self.profile,
            )
        except BridgeException as e:
            node_missing = e.condition == "item-not-found" or e.classname == "NotFound"
            if not node_missing:
                self.disp(f"can't synchronise pubsub node: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                self.disp(
                    f"The node {self.args.node} doesn't exist on {self.args.service}",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            self.host.quit(C.EXIT_OK)


class CachePurge(base.CommandBase):
    """Command "cache purge": delete items matching given criteria from the cache."""

    def __init__(self, host):
        super().__init__(
            host,
            "purge",
            use_profile=False,
            help=_("purge (delete) items from cache"),
        )

    def add_parser_options(self):
        """Register the filtering options and the ``-f/--force`` flag.

        All help strings go through ``_()`` so that they can be translated, like
        the help strings of the other commands.
        """
        self.parser.add_argument(
            "-s", "--service", action="append", metavar="JID", dest="services",
            help=_(
                "purge items only for these services. If not specified, items from ALL "
                "services will be purged. May be used several times."
            )
        )
        self.parser.add_argument(
            "-n", "--node", action="append", dest="nodes",
            help=_(
                "purge items only for these nodes. If not specified, items from ALL "
                "nodes will be purged. May be used several times."
            )
        )
        self.parser.add_argument(
            "-p", "--profile", action="append", dest="profiles",
            help=_(
                "purge items only for these profiles. If not specified, items from ALL "
                "profiles will be purged. May be used several times."
            )
        )
        self.parser.add_argument(
            "-b", "--updated-before", type=base.date_decoder, metavar="TIME_PATTERN",
            help=_("purge items which have been last updated before given time.")
        )
        self.parser.add_argument(
            "-C", "--created-before", type=base.date_decoder, metavar="TIME_PATTERN",
            help=_("purge items which have been last created before given time.")
        )
        self.parser.add_argument(
            "-t", "--type", action="append", dest="types",
            help=_("purge items flagged with TYPE. May be used several times.")
        )
        self.parser.add_argument(
            "-S", "--subtype", action="append", dest="subtypes",
            help=_("purge items flagged with SUBTYPE. May be used several times.")
        )
        self.parser.add_argument(
            "-f", "--force", action="store_true",
            help=_("purge items without confirmation")
        )

    async def start(self):
        """Ask for confirmation (unless forced) then purge the matching items."""
        if not self.args.force:
            await self.host.confirm_or_quit(
                _(
                    "Are you sure to purge items from cache? You'll have to bypass cache "
                    "or resynchronise nodes to access deleted items again."
                ),
                _("Items purging has been cancelled.")
            )
        # only the criteria actually given on the command line are sent
        purge_data = {}
        for key in (
                "services", "nodes", "profiles", "updated_before", "created_before",
                "types", "subtypes"
        ):
            value = getattr(self.args, key)
            if value is not None:
                purge_data[key] = value
        try:
            await self.host.bridge.ps_cache_purge(
                data_format.serialise(
                    purge_data
                )
            )
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            self.host.quit(C.EXIT_OK)


class CacheReset(base.CommandBase):
    """Command "cache reset": remove everything from the pubsub cache."""

    def __init__(self, host):
        description = _("remove everything from cache")
        super().__init__(host, "reset", use_profile=False, help=description)

    def add_parser_options(self):
        """Register the ``-f/--force`` flag which skips the confirmation."""
        self.parser.add_argument(
            "-f",
            "--force",
            help=_("reset cache without confirmation"),
            action="store_true",
        )

    async def start(self):
        """Ask for confirmation (unless forced) then reset the cache."""
        if not self.args.force:
            question = _(
                "Are you sure to reset cache? All nodes and items will be removed "
                "from it, then it will be progressively refilled as if it were new. "
                "This may be resources intensive."
            )
            cancel_message = _("Pubsub cache reset has been cancelled.")
            await self.host.confirm_or_quit(question, cancel_message)
        try:
            await self.host.bridge.ps_cache_reset()
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            self.host.quit(C.EXIT_OK)


class CacheSearch(base.CommandBase):
    """``cache search`` subcommand: look for pubsub items in the cache.

    Command line options are converted to a query dict, which is serialised and
    sent to the backend through ``ps_cache_search``.
    """

    # operators for which ``TP(<time pattern>)`` is expanded in the filter value
    # NOTE(review): "le" was the alias listed next to "<"; "lt" looks like the
    #   intended spelling and has been added — confirm against backend operators
    _TIME_PATTERN_OPERATORS = (">", "gt", "<", "lt", "le", "between")
    # operators whose value is always sent as a list
    _LIST_VALUE_OPERATORS = ("overlap", "ioverlap", "disjoint", "idisjoint")
    _TIME_PATTERN_RE = re.compile(r"\bTP\(([^)]+)\)")

    def __init__(self, host):
        extra_outputs = {
            "default": self.default_output,
            "xml": self.xml_output,
            "xml-raw": self.xml_raw_output,
        }
        super().__init__(
            host,
            "search",
            use_profile=False,
            use_output=C.OUTPUT_LIST_DICT,
            extra_outputs=extra_outputs,
            help=_("search for pubsub items in cache"),
        )

    def add_parser_options(self):
        """Register search criteria, ordering, pagination and display options."""
        self.parser.add_argument(
            "-f", "--fts", help=_("Full-Text Search query"), metavar="FTS_QUERY"
        )
        self.parser.add_argument(
            "-p", "--profile", action="append", dest="profiles", metavar="PROFILE",
            help="search items only from these profiles. May be used several times."
        )
        self.parser.add_argument(
            "-s", "--service", action="append", dest="services", metavar="SERVICE",
            help="items must be from specified service. May be used several times."
        )
        self.parser.add_argument(
            "-n", "--node", action="append", dest="nodes", metavar="NODE",
            help="items must be in the specified node. May be used several times."
        )
        self.parser.add_argument(
            "-t", "--type", action="append", dest="types", metavar="TYPE",
            help="items must be of specified type. May be used several times."
        )
        self.parser.add_argument(
            "-S", "--subtype", action="append", dest="subtypes", metavar="SUBTYPE",
            help="items must be of specified subtype. May be used several times."
        )
        self.parser.add_argument(
            "-P", "--payload", action="store_true", help=_("include item XML payload")
        )
        self.parser.add_argument(
            "-o", "--order-by", action="append", nargs="+",
            metavar=("ORDER", "[FIELD] [DIRECTION]"),
            help=_("how items must be ordered. May be used several times.")
        )
        self.parser.add_argument(
            "-l", "--limit", type=int, help=_("maximum number of items to return")
        )
        self.parser.add_argument(
            "-i", "--index", type=int, help=_("return results starting from this index")
        )
        self.parser.add_argument(
            "-F",
            "--field",
            action="append",
            nargs=3,
            dest="fields",
            default=[],
            metavar=("PATH", "OPERATOR", "VALUE"),
            help=_("parsed data field filter. May be used several times."),
        )
        self.parser.add_argument(
            "-k",
            "--key",
            action="append",
            dest="keys",
            metavar="KEY",
            help=_(
                "data key(s) to display. May be used several times. DEFAULT: show all "
                "keys"
            ),
        )

    @staticmethod
    def _json_or_str(value):
        """Return ``value`` decoded from JSON, or unchanged if it is not valid JSON."""
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            # this is not a JSON encoded value, we keep it as a string
            return value

    @staticmethod
    def _expand_time_pattern(match):
        """Return the string to use in place of a ``TP(<time pattern>)`` match.

        The pattern is parsed with ``date_utils.date_parse_ext`` (local timezone as
        default) and the result is converted with ``str``.
        """
        return str(
            date_utils.date_parse_ext(match.group(1), default_tz=date_utils.TZ_LOCAL)
        )

    def _get_order_by(self):
        """Convert ``--order-by`` occurrences to a list of order queries.

        Each occurrence is ``ORDER [DIRECTION]``, or ``field PATH [DIRECTION]`` when
        ORDER is the ``field`` keyword. DIRECTION defaults to ``asc``.
        Invalid arguments are reported with ``self.parser.error``.
        @return (list[dict]): order queries, empty if ``--order-by`` is not used
        """
        order_by = []
        for order, *args in self.args.order_by or []:
            if order == "field":
                if not args:
                    self.parser.error(_("field data must be specified in --order-by"))
                if len(args) > 2:
                    self.parser.error(_(
                        "You can't specify more that 2 arguments for a field in "
                        "--order-by"
                    ))
                direction = args[1] if len(args) == 2 else "asc"
                order_query = {"path": self._json_or_str(args[0])}
            else:
                if len(args) > 1:
                    self.parser.error(_(
                        "there are too many arguments in --order-by option"
                    ))
                direction = args[0] if args else "asc"
                order_query = {"order": order}

            if direction.lower() not in ("asc", "desc"):
                # the placeholder must be filled explicitly: the message is a plain
                # (translatable) string, not an f-string
                self.parser.error(
                    _("invalid --order-by direction: {direction!r}").format(
                        direction=direction
                    )
                )
            order_query["direction"] = direction
            order_by.append(order_query)
        return order_by

    def _get_parsed_filters(self):
        """Convert ``--field PATH OPERATOR VALUE`` occurrences to parsed data filters.

        PATH and VALUE are decoded from JSON when possible, and kept as strings
        otherwise. PATH is always sent as a list.
        @return (list[dict]): filters with "path", "op" and "value" keys
        """
        parsed = []
        for path, operator, value in self.args.fields:
            path = self._json_or_str(path)
            if not isinstance(path, list):
                path = [path]

            # handling of TP(<time pattern>)
            if operator in self._TIME_PATTERN_OPERATORS:
                value = self._TIME_PATTERN_RE.sub(self._expand_time_pattern, value)

            value = self._json_or_str(value)

            if operator in self._LIST_VALUE_OPERATORS and not isinstance(value, list):
                value = [value]

            parsed.append({"path": path, "op": operator, "value": value})
        return parsed

    async def start(self):
        """Build the query from parsed arguments, run the search, output the result.

        Quits with ``C.EXIT_BRIDGE_ERRBACK`` on ``BridgeException``,
        ``C.EXIT_INTERNAL_ERROR`` on any other exception, ``C.EXIT_OK`` otherwise.
        """
        query = {}
        for arg in ("fts", "profiles", "services", "nodes", "types", "subtypes"):
            value = getattr(self.args, arg)
            if value:
                if arg in ("types", "subtypes"):
                    # empty string is used to find items without type and/or subtype
                    value = [v or None for v in value]
                query[arg] = value
        for arg in ("limit", "index"):
            value = getattr(self.args, arg)
            if value is not None:
                query[arg] = value

        order_by = self._get_order_by()
        if order_by:
            query["order-by"] = order_by

        if self.args.fields:
            query["parsed"] = self._get_parsed_filters()

        if self.args.payload or "xml" in self.args.output:
            query["with_payload"] = True
            if self.args.keys:
                # the payload has been requested, so it must not be filtered out
                self.args.keys.append("item_payload")
        try:
            found_items = data_format.deserialise(
                await self.host.bridge.ps_cache_search(
                    data_format.serialise(query)
                ),
                type_check=list,
            )
        except BridgeException as e:
            self.disp(f"can't search for pubsub items in cache: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            if self.args.keys:
                found_items = [
                    {k: v for k, v in item.items() if k in self.args.keys}
                    for item in found_items
                ]
            await self.output(found_items)
            self.host.quit(C.EXIT_OK)

    def default_output(self, found_items):
        """Format time fields in place, then use the "simple" list of dicts output.

        "created", "published" and "updated" values are replaced by the result of
        ``common.format_time``; a value is left untouched if formatting raises
        ``ValueError``.
        """
        for item in found_items:
            for field in ("created", "published", "updated"):
                try:
                    timestamp = item[field]
                except KeyError:
                    pass
                else:
                    try:
                        item[field] = common.format_time(timestamp)
                    except ValueError:
                        pass
        self.host._outputs[C.OUTPUT_LIST_DICT]["simple"]["callback"](found_items)

    def xml_output(self, found_items):
        """Output prettified item payload"""
        cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML]["callback"]
        for item in found_items:
            cb(item["item_payload"])

    def xml_raw_output(self, found_items):
        """Output item payload without prettifying"""
        cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML_RAW]["callback"]
        for item in found_items:
            cb(item["item_payload"])


class Cache(base.CommandBase):
    """Parent ``cache`` command, grouping the pubsub cache subcommands."""

    subcommands = (CacheGet, CacheSync, CachePurge, CacheReset, CacheSearch)

    def __init__(self, host):
        super().__init__(
            host,
            "cache",
            help=_("pubsub cache handling"),
            use_profile=False,
        )


class Set(base.CommandBase):
    """``set`` command: publish the item read from stdin, or update an existing one."""

    def __init__(self, host):
        super().__init__(
            host,
            "set",
            use_pubsub=True,
            use_quiet=True,
            pubsub_flags={C.NODE},
            help=_("publish a new item or update an existing one"),
        )

    def add_parser_options(self):
        """Register node config options, encryption/signature flags and item id."""
        parser = self.parser
        NodeCreate.add_node_config_options(parser)
        parser.add_argument(
            "-e", "--encrypt", action="store_true",
            help=_("end-to-end encrypt the blog item"),
        )
        parser.add_argument(
            "--encrypt-for", metavar="JID", action="append",
            help=_("encrypt a single item for"),
        )
        parser.add_argument(
            "-X", "--sign", action="store_true",
            help=_("cryptographically sign the blog post"),
        )
        parser.add_argument(
            "item", nargs="?", default="",
            help=_("id, URL of the item to update, keyword, or nothing for new item"),
        )

    async def start(self):
        """Parse the payload from stdin and send it with ``ps_item_send``.

        Quits with ``C.EXIT_BRIDGE_ERRBACK`` if sending raises, ``C.EXIT_OK``
        otherwise.
        """
        parsed_elt, etree = xml_tools.etree_parse(self, sys.stdin)
        payload_elt = xml_tools.get_payload(self, parsed_elt)
        payload = etree.tostring(payload_elt, encoding="unicode")

        # only requested features are put in extra
        extra = {}
        if self.args.encrypt:
            extra["encrypted"] = True
        if self.args.encrypt_for:
            extra["encrypted_for"] = {"targets": self.args.encrypt_for}
        if self.args.sign:
            extra["signed"] = True
        publish_options = NodeCreate.get_config_options(self.args)
        if publish_options:
            extra["publish_options"] = publish_options

        try:
            published_id = await self.host.bridge.ps_item_send(
                self.args.service,
                self.args.node,
                payload,
                self.args.item,
                data_format.serialise(extra),
                self.profile,
            )
        except Exception as e:
            self.disp(_("can't send item: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            return

        if not published_id:
            self.disp("Item published")
        elif self.args.quiet:
            # in quiet mode only the bare id is displayed, without end of line
            self.disp(published_id, end="")
        else:
            self.disp(f"Item published at {published_id}")
        self.host.quit(C.EXIT_OK)


class Get(base.CommandBase):
    """``get`` command: retrieve and output pubsub item(s)."""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_LIST_XML,
            use_pubsub=True,
            pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
            help=_("get pubsub item(s)"),
        )

    def add_parser_options(self):
        """Register the subscription id and ``--no-decrypt`` options."""
        self.parser.add_argument(
            "-S", "--sub-id", default="", help=_("subscription id")
        )
        self.parser.add_argument(
            "--no-decrypt",
            action="store_true",
            help=_("don't do automatic decryption of e2ee items"),
        )
        #  TODO: a key(s) argument to select keys to display

    async def start(self):
        """Get items with ``ps_items_get`` and output them.

        Quits with ``C.EXIT_NOT_FOUND`` when the bridge reports a missing node,
        ``C.EXIT_BRIDGE_ERRBACK`` on other bridge errors, ``C.EXIT_INTERNAL_ERROR``
        on any other exception, ``C.EXIT_OK`` otherwise.
        """
        extra = {"decrypt": False} if self.args.no_decrypt else {}
        try:
            raw_result = await self.host.bridge.ps_items_get(
                self.args.service,
                self.args.node,
                self.args.max,
                self.args.items,
                self.args.sub_id,
                self.get_pubsub_extra(extra),
                self.profile,
            )
            ps_result = data_format.deserialise(raw_result)
        except BridgeException as e:
            node_missing = e.condition == "item-not-found" or e.classname == "NotFound"
            if node_missing:
                self.disp(
                    f"The node {self.args.node} doesn't exist on {self.args.service}",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)
            else:
                self.disp(f"can't get pubsub items: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            await self.output(ps_result["items"])
            self.host.quit(C.EXIT_OK)


class Delete(base.CommandBase):
    """``delete`` command: retract a single item from a node."""

    def __init__(self, host):
        super().__init__(
            host,
            "delete",
            use_pubsub=True,
            pubsub_flags={C.NODE, C.ITEM, C.SINGLE_ITEM},
            help=_("delete an item"),
        )

    def add_parser_options(self):
        """Register ``--force`` and ``--no-notification`` options."""
        self.parser.add_argument(
            "-f",
            "--force",
            action="store_true",
            help=_("delete without confirmation"),
        )
        self.parser.add_argument(
            "--no-notification",
            dest="notify",
            action="store_false",
            help=_("do not send notification (not recommended)"),
        )

    async def start(self):
        """Ask for confirmation unless ``--force`` is set, then retract the item.

        Quits with ``C.EXIT_BRIDGE_ERRBACK`` if the retraction raises,
        ``C.EXIT_OK`` otherwise.
        """
        if not self.args.item:
            self.parser.error(_("You need to specify an item to delete"))

        if not self.args.force:
            question = _("Are you sure to delete item {item_id} ?").format(
                item_id=self.args.item
            )
            await self.host.confirm_or_quit(question, _("item deletion cancelled"))

        try:
            await self.host.bridge.ps_item_retract(
                self.args.service,
                self.args.node,
                self.args.item,
                self.args.notify,
                self.profile,
            )
        except Exception as e:
            self.disp(_("can't delete item: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            return

        self.disp(_("item {item} has been deleted").format(item=self.args.item))
        self.host.quit(C.EXIT_OK)


class Edit(base.CommandBase, common.BaseEdit):
    """``edit`` command: edit an existing or new pubsub item in an editor."""

    def __init__(self, host):
        # both parents are initialised explicitly, as they take different arguments
        base.CommandBase.__init__(
            self,
            host,
            "edit",
            use_verbose=True,
            use_pubsub=True,
            pubsub_flags={C.NODE, C.SINGLE_ITEM},
            use_draft=True,
            help=_("edit an existing or new pubsub item"),
        )
        common.BaseEdit.__init__(self, self.host, PUBSUB_TMP_DIR)

    def add_parser_options(self):
        """Register encryption and signature options."""
        parser = self.parser
        parser.add_argument(
            "-e", "--encrypt", action="store_true",
            help=_("end-to-end encrypt the blog item"),
        )
        parser.add_argument(
            "--encrypt-for", metavar="JID", action="append",
            help=_("encrypt a single item for"),
        )
        parser.add_argument(
            "-X", "--sign", action="store_true",
            help=_("cryptographically sign the blog post"),
        )

    async def publish(self, content):
        """Send ``content`` as item payload with ``ps_item_send``.

        The item is sent to the service, node and item id stored by ``start``.
        """
        extra = {}
        if self.args.encrypt:
            extra["encrypted"] = True
        if self.args.encrypt_for:
            extra["encrypted_for"] = {"targets": self.args.encrypt_for}
        if self.args.sign:
            extra["signed"] = True

        published_id = await self.host.bridge.ps_item_send(
            self.pubsub_service,
            self.pubsub_node,
            content,
            self.pubsub_item or "",
            data_format.serialise(extra),
            self.profile,
        )
        if not published_id:
            self.disp("Item published")
        else:
            self.disp(f"Item published at {published_id}")

    async def get_item_data(self, service, node, item):
        """Retrieve the payload of an item.

        @param item: id of the item to get, a falsy value to request no specific id
        @return (tuple): pretty printed payload (empty string if the item has no
            payload) and id of the retrieved item
        """
        try:
            from lxml import etree
        except ImportError:
            self.disp(
                "lxml module must be installed to use edit, please install it "
                'with "pip install lxml"',
                error=True,
            )
            self.host.quit(1)

        requested_ids = [item] if item else []
        raw_result = await self.host.bridge.ps_items_get(
            service, node, 1, requested_ids, "", data_format.serialise({}), self.profile
        )
        ps_result = data_format.deserialise(raw_result)
        item_elt = etree.fromstring(
            ps_result["items"][0],
            etree.XMLParser(remove_blank_text=True, recover=True),
        )
        item_id = item_elt.get("id")
        try:
            payload_elt = item_elt[0]
        except IndexError:
            # the item element has no child
            self.disp(_("Item has not payload"), 1)
            return "", item_id
        serialised = etree.tostring(payload_elt, encoding="unicode", pretty_print=True)
        return serialised, item_id

    async def start(self):
        """Resolve the item to edit, run the editor on it, then quit."""
        item_path = await self.get_item_path()
        (
            self.pubsub_service,
            self.pubsub_node,
            self.pubsub_item,
            content_file_path,
            content_file_obj,
        ) = item_path
        await self.run_editor("pubsub_editor_args", content_file_path, content_file_obj)
        self.host.quit()


class Rename(base.CommandBase):
    """``rename`` command: give a new id to a pubsub item."""

    def __init__(self, host):
        super().__init__(
            host,
            "rename",
            use_pubsub=True,
            pubsub_flags={C.NODE, C.SINGLE_ITEM},
            help=_("rename a pubsub item"),
        )

    def add_parser_options(self):
        """Register the mandatory ``new_id`` positional argument."""
        self.parser.add_argument("new_id", help=_("new item id to use"))

    async def start(self):
        """Rename the item with ``ps_item_rename``.

        Quits with ``C.EXIT_BRIDGE_ERRBACK`` if renaming raises, ``C.EXIT_OK``
        otherwise.
        """
        try:
            await self.host.bridge.ps_item_rename(
                self.args.service,
                self.args.node,
                self.args.item,
                self.args.new_id,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't rename item: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            return

        self.disp("Item renamed")
        self.host.quit(C.EXIT_OK)


class Subscribe(base.CommandBase):
    """``subscribe`` command: subscribe to a pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "subscribe",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("subscribe to a node"),
        )

    def add_parser_options(self):
        """Register the ``--public`` flag."""
        self.parser.add_argument(
            "--public",
            help=_("make the registration visible for everybody"),
            action="store_true",
        )

    async def start(self):
        """Subscribe to the node, display the subscription id if one is returned.

        With ``--public``, the "pps" namespace must be returned by
        ``namespaces_get``, otherwise the command quits with
        ``C.EXIT_MISSING_FEATURE``. Quits with ``C.EXIT_BRIDGE_ERRBACK`` if the
        subscription raises.
        """
        options = {}
        if self.args.public:
            namespaces = await self.host.bridge.namespaces_get()
            try:
                pps_namespace = namespaces["pps"]
            except KeyError:
                self.disp(
                    "Pubsub Public Subscription plugin is not loaded, can't use --public "
                    "option, subscription stopped", error=True
                )
                self.host.quit(C.EXIT_MISSING_FEATURE)
            else:
                # option name is namespaced: "{<pps namespace>}public"
                options[f"{{{pps_namespace}}}public"] = True

        try:
            sub_id = await self.host.bridge.ps_subscribe(
                self.args.service,
                self.args.node,
                data_format.serialise(options),
                self.profile,
            )
        except Exception as e:
            self.disp(_("can't subscribe to node: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            return

        self.disp(_("subscription done"), 1)
        if sub_id:
            self.disp(_("subscription id: {sub_id}").format(sub_id=sub_id))
        self.host.quit()


class Unsubscribe(base.CommandBase):
    """``unsubscribe`` command: remove the subscription to a pubsub node."""

    # FIXME: check why we get a NodeNotFound on subscribe just after unsubscribe

    def __init__(self, host):
        super().__init__(
            host,
            "unsubscribe",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("unsubscribe from a node"),
        )

    def add_parser_options(self):
        """No option is specific to this command."""
        pass

    async def start(self):
        """Unsubscribe from the node with ``ps_unsubscribe``.

        Quits with ``C.EXIT_BRIDGE_ERRBACK`` if the bridge call raises.
        """
        try:
            await self.host.bridge.ps_unsubscribe(
                self.args.service, self.args.node, self.profile
            )
        except Exception as e:
            self.disp(_("can't unsubscribe from node: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            return

        self.disp(_("subscription removed"), 1)
        self.host.quit()


class Subscriptions(base.CommandBase):
    """``subscriptions`` command: list subscriptions on a service."""

    def __init__(self, host):
        super().__init__(
            host,
            "subscriptions",
            use_output=C.OUTPUT_LIST_DICT,
            use_pubsub=True,
            help=_("retrieve all subscriptions on a service"),
        )

    def add_parser_options(self):
        """Register the ``--public`` flag."""
        self.parser.add_argument(
            "--public", action="store_true", help=_("get public subscriptions")
        )

    async def start(self):
        """Retrieve subscriptions and output them.

        ``ps_public_subscriptions_get`` is used with ``--public``,
        ``ps_subscriptions_get`` otherwise. Quits with ``C.EXIT_BRIDGE_ERRBACK`` if
        retrieval or deserialisation raises.
        """
        bridge = self.host.bridge
        method = (
            bridge.ps_public_subscriptions_get
            if self.args.public
            else bridge.ps_subscriptions_get
        )
        try:
            raw_subscriptions = await method(
                self.args.service, self.args.node, self.profile
            )
            subscriptions = data_format.deserialise(raw_subscriptions, type_check=list)
        except Exception as e:
            self.disp(_("can't retrieve subscriptions: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            await self.output(subscriptions)
            self.host.quit()


class Affiliations(base.CommandBase):
    """``affiliations`` command: output affiliations returned by the bridge."""

    def __init__(self, host):
        super().__init__(
            host,
            "affiliations",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            help=_("retrieve all affiliations on a service"),
        )

    def add_parser_options(self):
        """No option is specific to this command."""
        pass

    async def start(self):
        """Get affiliations with ``ps_affiliations_get`` and output them.

        Quits with ``C.EXIT_BRIDGE_ERRBACK`` if the bridge call raises.
        """
        try:
            affiliations = await self.host.bridge.ps_affiliations_get(
                self.args.service, self.args.node, self.profile
            )
        except Exception as e:
            self.disp(f"can't get node affiliations: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            await self.output(affiliations)
            self.host.quit()


class Reference(base.CommandBase):
    """``reference`` command: send a reference/mention pointing to a pubsub item."""

    def __init__(self, host):
        super().__init__(
            host,
            "reference",
            use_pubsub=True,
            pubsub_flags={C.NODE, C.SINGLE_ITEM},
            help=_("send a reference/mention to pubsub item"),
        )

    def add_parser_options(self):
        """Register the reference type option and the recipient argument."""
        self.parser.add_argument(
            "-t",
            "--type",
            choices=("data", "mention"),
            default="mention",
            help=_("type of reference to send (DEFAULT: mention)"),
        )
        self.parser.add_argument("recipient", help=_("recipient of the reference"))

    async def start(self):
        """Build the xmpp: URI of the node or item and send it as a reference.

        The profile jid is used as service when none is specified.
        Quits with ``C.EXIT_BRIDGE_ERRBACK`` if sending raises.
        """
        service = self.args.service or await self.host.get_profile_jid()

        # the item is only put in the URI when one has been specified
        uri_kwargs = {"path": service, "node": self.args.node}
        if self.args.item:
            uri_kwargs["item"] = self.args.item
        anchor = uri.build_xmpp_uri("pubsub", **uri_kwargs)

        try:
            await self.host.bridge.reference_send(
                self.args.recipient, anchor, self.args.type, "", self.profile
            )
        except Exception as e:
            self.disp(f"can't send reference: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.host.quit()


class Search(base.CommandBase):
    """This command does a search without using MAM

    This command checks every item it finds by itself,
    so it may be heavy on resources for both server and client
    """

    RE_FLAGS = re.MULTILINE | re.UNICODE
    EXEC_ACTIONS = ("exec", "external")

    def __init__(self, host):
        """Register the ``search`` command with XML output and pubsub options."""
        # FIXME: C.NO_MAX is not needed here, and this can be globally removed from consts
        #        the only interest is to change the help string, but this can be explained
        #        extensively in man pages (max is for each node found)
        super().__init__(
            host,
            "search",
            use_output=C.OUTPUT_XML,
            use_pubsub=True,
            pubsub_flags={C.MULTI_ITEMS, C.NO_MAX},
            use_verbose=True,
            help=_("search items corresponding to filters"),
        )

    @property
    def etree(self):
        """load lxml.etree only if needed"""
        if self._etree is None:
            from lxml import etree

            self._etree = etree
        return self._etree

    def filter_opt(self, value, type_):
        """Tag a filter value with its filter type

        @return (tuple): (type_, value) pair
        """
        tagged = type_, value
        return tagged

    def filter_flag(self, value, type_):
        """Tag a flag with its type, after converting its value with ``C.bool``

        @return (tuple): (type_, converted value) pair
        """
        return (type_, C.bool(value))

    def add_parser_options(self):
        """Register search options, filters, filters flags and the action to run

        Filters and filters flags all append a (type, value) tuple to the same
        ``filters`` destination, so their command line order is kept.
        """
        parser = self.parser
        parser.add_argument(
            "-D",
            "--max-depth",
            type=int,
            default=0,
            help=_(
                "maximum depth of recursion (will search linked nodes if > 0, "
                "DEFAULT: 0)"
            ),
        )
        parser.add_argument(
            "-M",
            "--node-max",
            type=int,
            default=30,
            help=_(
                "maximum number of items to get per node ({} to get all items, "
                "DEFAULT: 30)".format(C.NO_LIMIT)
            ),
        )
        parser.add_argument(
            "-N",
            "--namespace",
            action="append",
            nargs=2,
            default=[],
            metavar="NAME NAMESPACE",
            help=_("namespace to use for xpath"),
        )

        # filters
        # each entry is: short option, long option, filter type, metavar, help
        filter_specs = (
            (
                "-t", "--text", "text", "TEXT",
                _("full text filter, item must contain this string (XML included)"),
            ),
            (
                "-r", "--regex", "regex", "EXPRESSION",
                _("like --text but using a regular expression"),
            ),
            (
                "-x", "--xpath", "xpath", "XPATH",
                _("filter items which has elements matching this xpath"),
            ),
            (
                "-P", "--python", "python", "PYTHON_CODE",
                _(
                    "Python expression which much return a bool (True to keep item, "
                    'False to reject it). "item" is raw text item, "item_xml" is '
                    "lxml's etree.Element"
                ),
            ),
        )
        filters_group = parser.add_argument_group(
            _("filters"),
            _("only items corresponding to following filters will be kept"),
        )
        for short_opt, long_opt, filter_type, metavar, help_text in filter_specs:
            filters_group.add_argument(
                short_opt,
                long_opt,
                action="append",
                dest="filters",
                type=partial(self.filter_opt, type_=filter_type),
                metavar=metavar,
                help=help_text,
            )

        # filters flags
        # each entry is: short option, long option, flag type, help
        flag_specs = (
            (
                "-C", "--ignore-case", "ignore-case",
                _("(don't) ignore case in following filters (DEFAULT: case sensitive)"),
            ),
            (
                "-I", "--invert", "invert",
                _("(don't) invert effect of following filters (DEFAULT: don't invert)"),
            ),
            (
                "-A", "--dot-all", "dotall",
                _("(don't) use DOTALL option for regex (DEFAULT: don't use)"),
            ),
            (
                "-k", "--only-matching", "only-matching",
                _("keep only the matching part of the item"),
            ),
        )
        flags_group = parser.add_argument_group(
            _("filters flags"),
            _("filters modifiers (change behaviour of following filters)"),
        )
        for short_opt, long_opt, flag_type, help_text in flag_specs:
            flags_group.add_argument(
                short_opt,
                long_opt,
                action="append",
                dest="filters",
                type=partial(self.filter_flag, type_=flag_type),
                # value used when the flag is given without argument
                const=(flag_type, True),
                nargs="?",
                metavar="BOOLEAN",
                help=help_text,
            )

        # action
        parser.add_argument(
            "action",
            default="print",
            nargs="?",
            choices=("print", "exec", "external"),
            help=_("action to do on found items (DEFAULT: print)"),
        )
        parser.add_argument("command", nargs=argparse.REMAINDER)

    async def get_items(self, depth, service, node, items):
        """Retrieve items from a node and run the search on them

        ``self.to_get`` is incremented before the request, and decremented again
        if the request fails.
        @param depth: depth passed through to ``self.search``
        @param items: ids of the items to request
        """
        self.to_get += 1
        try:
            raw_result = await self.host.bridge.ps_items_get(
                service,
                node,
                self.args.node_max,
                items,
                "",
                self.get_pubsub_extra(),
                self.profile,
            )
            ps_result = data_format.deserialise(raw_result)
        except Exception as e:
            self.disp(
                f"can't get pubsub items at {service} (node: {node}): {e}",
                error=True,
            )
            self.to_get -= 1
            return

        await self.search(ps_result, depth)

    def _check_pubsub_url(self, match, found_nodes):
        """check that the matched URL is an xmpp: one

        @param found_nodes(list[unicode]): found_nodes
            this list will be filled while xmpp: URIs are discovered
        """
        url = match.group(0)
        if url.startswith("xmpp"):
            try:
                url_data = uri.parse_xmpp_uri(url)
            except ValueError:
                return
            if url_data["type"] == "pubsub":
                found_node = {"service": url_data["path"], "node": url_data["node"]}
                if "item" in url_data:
                    found_node["item"] = url_data["item"]
                found_nodes.append(found_node)

    async def get_sub_nodes(self, item, depth):
        """Find pubsub URIs in item, and call get_items on each linked node

        @param item(str): serialised item where URLs are looked for
        @param depth(int): depth of the item; linked nodes are requested with
            depth + 1
        """
        linked_nodes = []
        # the substitution result is not used, we only want the callback to be
        # called on each URL found
        strings.RE_URL.sub(
            lambda url_match: self._check_pubsub_url(
                url_match, found_nodes=linked_nodes
            ),
            item,
        )
        for linked in linked_nodes:
            item_ids = [linked["item"]] if "item" in linked else []
            await self.get_items(
                depth + 1, linked["service"], linked["node"], item_ids
            )

    def parseXml(self, item):
        """Parse a serialised item

        If the item is not valid XML, an error is displayed and quit is requested
        with C.EXIT_BAD_ARG.

        @param item(str): serialised item
        @return: parsed element
        """
        try:
            parsed = self.etree.fromstring(item)
        except self.etree.XMLSyntaxError:
            error_msg = _(
                "item doesn't looks like XML, you have probably used --only-matching "
                "somewhere before and we have no more XML"
            )
            self.disp(error_msg, error=True)
            self.host.quit(C.EXIT_BAD_ARG)
        else:
            return parsed

    def filter(self, item):
        """Apply filters given on command line

        Filters and flags are handled in command line order: a flag only affects
        the filters which come after it.
        If only-matching is used, item may be modified.

        @param item(str): serialised item to check
        @return (tuple[bool, str]): a tuple with:
            - keep: True if item passed the filters
            - item: it is returned in case of modifications
        @raise exceptions.InternalError: an unknown filter type is used
        """
        ignore_case = False
        invert = False
        dotall = False
        only_matching = False
        item_xml = None
        for type_, value in self.args.filters:
            ## flags

            # flags only modify the state used by following filters. They must not
            # reach the "invert" check below: they don't test anything, so an
            # active "invert" would wrongly reject the item.
            if type_ == "ignore-case":
                ignore_case = value
                continue
            if type_ == "invert":
                invert = value
                continue
            if type_ == "dotall":
                dotall = value
                continue
            if type_ == "only-matching":
                only_matching = value
                continue

            ## filters

            keep = True

            if type_ == "text":
                if ignore_case:
                    if value.lower() not in item.lower():
                        keep = False
                else:
                    if value not in item:
                        keep = False
                if keep and only_matching:
                    # doesn't really make sense to keep a fixed string
                    # so we raise an error
                    self.host.disp(
                        _("--only-matching used with fixed --text string, are you sure?"),
                        error=True,
                    )
                    self.host.quit(C.EXIT_BAD_ARG)
            elif type_ == "regex":
                flags = self.RE_FLAGS
                if ignore_case:
                    flags |= re.IGNORECASE
                if dotall:
                    flags |= re.DOTALL
                match = re.search(value, item, flags)
                keep = match is not None
                if keep and only_matching:
                    item = match.group()
                    # item has changed, parsed version is not valid anymore
                    item_xml = None
            elif type_ == "xpath":
                if item_xml is None:
                    item_xml = self.parseXml(item)
                try:
                    elts = item_xml.xpath(value, namespaces=self.args.namespace)
                except self.etree.XPathEvalError as e:
                    self.disp(_("can't use xpath: {reason}").format(reason=e), error=True)
                    self.host.quit(C.EXIT_BAD_ARG)
                keep = bool(elts)
                if keep and only_matching:
                    item_xml = elts[0]
                    try:
                        item = self.etree.tostring(item_xml, encoding="unicode")
                    except TypeError:
                        # we have a string only, not an element
                        item = str(item_xml)
                        item_xml = None
            elif type_ == "python":
                if item_xml is None:
                    item_xml = self.parseXml(item)
                cmd_ns = {"etree": self.etree, "item": item, "item_xml": item_xml}
                try:
                    # the expression comes from the command line arguments
                    keep = eval(value, cmd_ns)
                except SyntaxError as e:
                    self.disp(str(e), error=True)
                    self.host.quit(C.EXIT_BAD_ARG)
            else:
                raise exceptions.InternalError(
                    _("unknown filter type {type}").format(type=type_)
                )

            if invert:
                keep = not keep
            if not keep:
                return False, item

        return True, item

    async def do_item_action(self, item, metadata):
        """Do the requested action on an item which has been kept

        The item is displayed if action is "print" or if verbosity is at least 1.
        For the actions in EXEC_ACTIONS, a command is then executed:
            - "exec": the program itself (first element of sys.argv) is run with
              the arguments built by arg_tools.get_use_args
            - other action: the command given on command line is run, and the
              item is sent to its standard input
        A failure message is displayed if the command returns a non zero code.

        @param item(str): accepted item
        @param metadata(dict): data where "service" and "node" of the item are
            retrieved (used with "exec" action)
        """
        action = self.args.action
        if action == "print" or self.host.verbosity > 0:
            try:
                await self.output(item)
            except self.etree.XMLSyntaxError:
                # item is not valid XML, but a string
                # can happen when --only-matching is used
                self.disp(item)
        if action not in self.EXEC_ACTIONS:
            return

        item_elt = self.parseXml(item)
        is_exec = action == "exec"
        if is_exec:
            use = {
                "service": metadata["service"],
                "node": metadata["node"],
                "item": item_elt.get("id"),
                "profile": self.profile,
            }
            # NOTE(review): self.args.command is given as is, confirm that
            #   get_use_args doesn't modify it
            parser_args, use_args = arg_tools.get_use_args(
                self.host, self.args.command, use, verbose=self.host.verbosity > 1
            )
            cmd_args = sys.argv[0:1] + parser_args + use_args
        else:
            cmd_args = self.args.command

        escaped_args = [arg_tools.escape(a) for a in cmd_args]
        self.disp("COMMAND: {command}".format(command=" ".join(escaped_args)), 2)

        if is_exec:
            process = await asyncio.create_subprocess_exec(*cmd_args)
            ret = await process.wait()
        else:
            process = await asyncio.create_subprocess_exec(
                *cmd_args, stdin=subprocess.PIPE
            )
            await process.communicate(item.encode(sys.getfilesystemencoding()))
            ret = process.returncode

        if ret == 0:
            return
        self.disp(
            A.color(
                C.A_FAILURE,
                _("executed command failed with exit code {ret}").format(ret=ret),
            )
        )

    async def search(self, ps_result, depth):
        """Handle the result of a get_items request

        For each item, sub nodes are requested if max depth is not reached, then
        filters are applied, and the requested action is done on kept items.
        Once all items are handled, the pending requests counter is decreased, and
        the command is ended if no more request is pending.

        @param ps_result(dict): result of get_items, items are in "items" key
        @param depth(int): current depth level
            0 for first node, 1 for first children, and so on
        """
        for raw_item in ps_result["items"]:
            if depth < self.args.max_depth:
                await self.get_sub_nodes(raw_item, depth)
            kept, filtered_item = self.filter(raw_item)
            if kept:
                await self.do_item_action(filtered_item, ps_result)

        # this request is done, we check if we got all get_items results
        self.to_get -= 1
        if self.to_get == 0:
            # yes, we can quit
            self.host.quit()
        assert self.to_get > 0

    async def start(self):
        """Check arguments, initialise search state and request the first node"""
        command_given = bool(self.args.command)
        exec_action = self.args.action in self.EXEC_ACTIONS
        if command_given and not exec_action:
            self.parser.error(
                _("Command can only be used with {actions} actions").format(
                    actions=", ".join(self.EXEC_ACTIONS)
                )
            )
        elif exec_action and not command_given:
            self.parser.error(_("you need to specify a command to execute"))

        if not self.args.node:
            # TODO: handle get service affiliations when node is not set
            self.parser.error(_("empty node is not handled yet"))

        # to_get is increased on each get and decreased on each answer
        # when it reaches 0 again, the command is finished
        self.to_get = 0
        self._etree = None
        if self.args.filters is None:
            self.args.filters = []
        # "pubsub" prefix is always made available in addition to the namespaces
        # given on command line
        namespaces = self.args.namespace + [
            ("pubsub", "http://jabber.org/protocol/pubsub")
        ]
        self.args.namespace = dict(namespaces)
        await self.get_items(0, self.args.service, self.args.node, self.args.items)


class Transform(base.CommandBase):
    """Modify the items of a node by sending each of them to an external command

    Each item is sent to the standard input of the command, and the standard output
    of the command is used as the new item (or as an instruction if it is "DELETE"
    or "SKIP"). Without --apply, the result is only displayed (dry run).
    """

    def __init__(self, host):
        base.CommandBase.__init__(
            self,
            host,
            "transform",
            use_pubsub=True,
            pubsub_flags={C.NODE, C.MULTI_ITEMS},
            help=_("modify items of a node using an external command/script"),
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "--apply",
            action="store_true",
            help=_("apply transformation (DEFAULT: do a dry run)"),
        )
        self.parser.add_argument(
            "--admin",
            action="store_true",
            help=_("do a pubsub admin request, needed to change publisher"),
        )
        self.parser.add_argument(
            "-I",
            "--ignore-errors",
            action="store_true",
            help=_(
                "if command return a non zero exit code, ignore the item and continue"
            ),
        )
        self.parser.add_argument(
            "-A",
            "--all",
            action="store_true",
            help=_("get all items by looping over all pages using RSM"),
        )
        self.parser.add_argument(
            "command_path",
            help=_(
                "path to the command to use. Will be called repetitivly with an "
                "item as input. Output (full item XML) will be used as new one. "
                'Return "DELETE" string to delete the item, and "SKIP" to ignore it'
            ),
        )

    def _get_item_id(self, item):
        """Parse a serialised item and return the value of its "id" attribute"""
        item_elt, __ = xml_tools.etree_parse(self, item)
        return item_elt.get("id")

    async def ps_items_send_cb(self, item_ids, metadata):
        """Called once the transformed items of a page have been published

        @param item_ids(list[str]): IDs of published items, as returned by the
            bridge method
        @param metadata(dict): metadata of the page which has been transformed
            (used to get next page when --all is used)
        """
        if item_ids:
            self.disp(
                _("items published with ids {item_ids}").format(
                    item_ids=", ".join(item_ids)
                )
            )
        else:
            self.disp(_("items published"))
        if self.args.all:
            return await self.handle_next_page(metadata)
        else:
            self.host.quit()

    async def handle_next_page(self, metadata):
        """Retrieve new page through RSM or quit if we're in the last page

        use to handle --all option
        @param metadata(dict): metadata as returned by ps_items_get
        """
        try:
            last = metadata["rsm"]["last"]
            index = int(metadata["rsm"]["index"])
            count = int(metadata["rsm"]["count"])
        except KeyError:
            self.disp(
                _("Can't retrieve all items, RSM metadata not available"), error=True
            )
            self.host.quit(C.EXIT_MISSING_FEATURE)
        except ValueError as e:
            self.disp(
                _("Can't retrieve all items, bad RSM metadata: {msg}").format(msg=e),
                error=True,
            )
            self.host.quit(C.EXIT_ERROR)

        if index + self.args.rsm_max >= count:
            self.disp(_("All items transformed"))
            self.host.quit(0)

        self.disp(
            _("Retrieving next page ({page_idx}/{page_total})").format(
                page_idx=int(index / self.args.rsm_max) + 1,
                page_total=int(count / self.args.rsm_max),
            )
        )

        # NOTE(review): extra is used as a dict and serialised here, while "start"
        #   gives the result of get_pubsub_extra directly to the bridge; confirm
        #   what get_pubsub_extra returns
        extra = self.get_pubsub_extra()
        extra["rsm_after"] = last
        try:
            # the bridge call must be awaited before deserialising: it is its
            # result which is serialised, not the coroutine
            ps_result = data_format.deserialise(
                await self.host.bridge.ps_items_get(
                    self.args.service,
                    self.args.node,
                    self.args.rsm_max,
                    self.args.items,
                    "",
                    data_format.serialise(extra),
                    self.profile,
                )
            )
        except Exception as e:
            self.disp(f"can't retrieve items: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            await self.ps_items_get_cb(ps_result)

    async def ps_items_get_cb(self, ps_result):
        """Run the command on each retrieved item, and handle the results

        On dry run, new items are displayed. With --apply, they are published
        (with an admin request if --admin is used), and items for which the
        command returned "DELETE" are retracted.

        @param ps_result(dict): deserialised result of ps_items_get
        """
        encoding = "utf-8"
        new_items = []

        for item in ps_result["items"]:
            if self.check_duplicates:
                # this is used when we are not ordering by creation
                # to avoid infinite loop
                item_id = self._get_item_id(item)
                if item_id in self.items_ids:
                    self.disp(
                        _(
                            "Duplicate found on item {item_id}, we have probably handled "
                            "all items."
                        ).format(item_id=item_id)
                    )
                    self.host.quit()
                self.items_ids.append(item_id)

            # we launch the command to filter the item
            try:
                p = await asyncio.create_subprocess_exec(
                    self.args.command_path, stdin=subprocess.PIPE, stdout=subprocess.PIPE
                )
            except OSError as e:
                exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR
                self.disp(f"Can't execute the command: {e}", error=True)
                self.host.quit(exit_code)
            cmd_std_out, cmd_std_err = await p.communicate(item.encode(encoding))
            ret = p.returncode
            if ret != 0:
                self.disp(
                    f"The command returned a non zero status while parsing the "
                    f"following item:\n\n{item}",
                    error=True,
                )
                if self.args.ignore_errors:
                    continue
                else:
                    self.host.quit(C.EXIT_CMD_ERROR)
            if cmd_std_err is not None:
                cmd_std_err = cmd_std_err.decode(encoding, errors="ignore")
                self.disp(cmd_std_err, error=True)
            cmd_std_out = cmd_std_out.decode(encoding).strip()
            if cmd_std_out == "DELETE":
                item_id = self._get_item_id(item)
                self.disp(_("Deleting item {item_id}").format(item_id=item_id))
                if self.args.apply:
                    try:
                        await self.host.bridge.ps_item_retract(
                            self.args.service,
                            self.args.node,
                            item_id,
                            False,
                            self.profile,
                        )
                    except Exception as e:
                        self.disp(f"can't delete item {item_id}: {e}", error=True)
                        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
                continue
            elif cmd_std_out == "SKIP":
                item_id = self._get_item_id(item)
                self.disp(_("Skipping item {item_id}").format(item_id=item_id))
                continue
            element, etree = xml_tools.etree_parse(self, cmd_std_out)

            # at this point command has been run and we have a etree.Element object
            if element.tag not in ("item", "{http://jabber.org/protocol/pubsub}item"):
                self.disp(
                    "your script must return a whole item, this is not:\n{xml}".format(
                        xml=etree.tostring(element, encoding="unicode")
                    ),
                    error=True,
                )
                self.host.quit(C.EXIT_DATA_ERROR)

            if not self.args.apply:
                # we have a dry run, we just display filtered items
                serialised = etree.tostring(
                    element, encoding="unicode", pretty_print=True
                )
                self.disp(serialised)
            else:
                new_items.append(etree.tostring(element, encoding="unicode"))

        if not self.args.apply:
            # on dry run we have nothing to wait for, we can quit
            if self.args.all:
                return await self.handle_next_page(ps_result)
            self.host.quit()
        else:
            if self.args.admin:
                bridge_method = self.host.bridge.ps_admin_items_send
            else:
                bridge_method = self.host.bridge.ps_items_send

            try:
                ps_items_send_result = await bridge_method(
                    self.args.service,
                    self.args.node,
                    new_items,
                    "",
                    self.profile,
                )
            except Exception as e:
                self.disp(f"can't send item: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                await self.ps_items_send_cb(ps_items_send_result, metadata=ps_result)

    async def start(self):
        """Retrieve the first page of items and launch the transformation"""
        if self.args.all and self.args.order_by != C.ORDER_BY_CREATION:
            # items order may change while we update them: already handled IDs are
            # stored to stop on first duplicate
            self.check_duplicates = True
            self.items_ids = []
            self.disp(
                A.color(
                    A.FG_RED,
                    A.BOLD,
                    '/!\\ "--all" should be used with "--order-by creation" /!\\\n',
                    A.RESET,
                    "We'll update items, so order may change during transformation,\n"
                    "we'll try to mitigate that by stopping on first duplicate,\n"
                    "but this method is not safe, and some items may be missed.\n---\n",
                )
            )
        else:
            self.check_duplicates = False

        try:
            ps_result = data_format.deserialise(
                await self.host.bridge.ps_items_get(
                    self.args.service,
                    self.args.node,
                    self.args.max,
                    self.args.items,
                    "",
                    self.get_pubsub_extra(),
                    self.profile,
                )
            )
        except Exception as e:
            self.disp(f"can't retrieve items: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            await self.ps_items_get_cb(ps_result)


class Uri(base.CommandBase):
    """Build and display the xmpp: URI of a pubsub node or item"""

    def __init__(self, host):
        super().__init__(
            host,
            "uri",
            use_profile=False,
            use_pubsub=True,
            pubsub_flags={C.NODE, C.SINGLE_ITEM},
            help=_("build URI"),
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "-p",
            "--profile",
            default=C.PROF_KEY_DEFAULT,
            help=_("profile (used when no server is specified)"),
        )

    def display_uri(self, jid_):
        """Display the URI built from command line arguments, then quit

        @param jid_: JID whose bare version is used as service when no service has
            been specified
        """
        if not self.args.service:
            self.args.service = jid.JID(jid_).bare

        # argument name => name of the corresponding URI builder argument
        # ("service" is given as "path")
        args_mapping = (("node", "node"), ("service", "path"), ("item", "item"))
        uri_args = {}
        for arg_name, uri_key in args_mapping:
            arg_value = getattr(self.args, arg_name)
            if arg_value:
                uri_args[uri_key] = arg_value
        self.disp(uri.build_xmpp_uri("pubsub", **uri_args))
        self.host.quit()

    async def start(self):
        if self.args.service:
            self.display_uri(None)
            return

        # no service specified, we use the JID of the profile
        try:
            jid_ = await self.host.bridge.param_get_a_async(
                "JabberID", "Connection", profile_key=self.args.profile
            )
        except Exception as e:
            self.disp(f"can't retrieve jid: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.display_uri(jid_)


class AttachmentGet(base.CommandBase):
    """Retrieve and display the data attached to an item"""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_LIST_DICT,
            use_pubsub=True,
            pubsub_flags={C.SERVICE, C.NODE, C.SINGLE_ITEM},
            help=_("get data attached to an item"),
        )

    def add_parser_options(self):
        jid_help = _(
            "get attached data published only by those JIDs (DEFAULT: get all "
            "attached data)"
        )
        self.parser.add_argument(
            "-j", "--jid", action="append", dest="jids", help=jid_help
        )

    async def start(self):
        # an empty list is sent when no JID has been specified
        publishers = self.args.jids or []
        try:
            attached_data, __ = await self.host.bridge.ps_attachments_get(
                self.args.service,
                self.args.node,
                self.args.item,
                publishers,
                "",
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't get attached data: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            attached_data = data_format.deserialise(attached_data, type_check=list)
            await self.output(attached_data)
            self.host.quit(C.EXIT_OK)


class AttachmentSet(base.CommandBase):
    """Attach data ("noticed" status and/or reactions) to an item"""

    def __init__(self, host):
        super().__init__(
            host,
            "set",
            use_pubsub=True,
            pubsub_flags={C.SERVICE, C.NODE, C.SINGLE_ITEM},
            help=_("attach data to an item"),
        )

    def add_parser_options(self):
        add_argument = self.parser.add_argument
        add_argument(
            "--replace",
            action="store_true",
            help=_(
                "replace previous versions of attachments (DEFAULT: update previous "
                "version)"
            ),
        )
        add_argument(
            "-N",
            "--noticed",
            metavar="BOOLEAN",
            nargs="?",
            default="keep",
            help=_("mark item as (un)noticed (DEFAULT: keep current value))"),
        )
        # FIXME: "append" actions below are to be replaced by "extend" when we stop
        #   supporting python 3.7
        add_argument(
            "-r",
            "--reactions",
            action="append",
            help=_("emojis to add to react to an item"),
        )
        add_argument(
            "-R",
            "--reactions-remove",
            action="append",
            help=_("emojis to remove from reactions to an item"),
        )

    async def start(self):
        extra = {}
        attachments_data = {
            "service": self.args.service,
            "node": self.args.node,
            "id": self.args.item,
            "extra": extra,
        }
        operation = "replace" if self.args.replace else "update"

        if self.args.noticed != "keep":
            if self.args.noticed is None:
                # option used without value
                self.args.noticed = C.BOOL_TRUE
            extra["noticed"] = C.bool(self.args.noticed)

        if self.args.reactions or self.args.reactions_remove:
            if self.args.replace:
                extra["reactions"] = {
                    "operation": operation,
                    "reactions": self.args.reactions,
                }
            else:
                extra["reactions"] = {
                    "operation": operation,
                    "add": self.args.reactions,
                    "remove": self.args.reactions_remove,
                }

        if not extra:
            self.parser.error(_("At leat one attachment must be specified."))

        try:
            await self.host.bridge.ps_attachments_set(
                data_format.serialise(attachments_data),
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't attach data to item: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp("data attached")
            self.host.quit(C.EXIT_OK)


class Attachments(base.CommandBase):
    """Parent command of the attachments subcommands"""

    subcommands = (
        AttachmentGet,
        AttachmentSet,
    )

    def __init__(self, host):
        super().__init__(
            host, "attachments", use_profile=False,
            help=_("set or retrieve items attachments"),
        )


class SignatureSign(base.CommandBase):
    """Sign an item by setting a "signature" attachment on it"""

    def __init__(self, host):
        super().__init__(
            host,
            "sign",
            use_pubsub=True,
            pubsub_flags={C.NODE, C.SINGLE_ITEM},
            help=_("sign an item"),
        )

    def add_parser_options(self):
        pass

    async def start(self):
        # we set None as signer to use profile's bare JID
        signature_data = {"signer": None}
        attachments_data = {
            "service": self.args.service,
            "node": self.args.node,
            "id": self.args.item,
            "extra": {"signature": signature_data},
        }
        serialised_data = data_format.serialise(attachments_data)
        try:
            await self.host.bridge.ps_attachments_set(serialised_data, self.profile)
        except Exception as e:
            self.disp(f"can't sign the item: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(f"item {self.args.item!r} has been signed")
            self.host.quit(C.EXIT_OK)


class SignatureCheck(base.CommandBase):
    """Check the signature of an item and display the result"""

    def __init__(self, host):
        super().__init__(
            host,
            "check",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            pubsub_flags={C.SERVICE, C.NODE, C.SINGLE_ITEM},
            help=_("check the validity of pubsub signature"),
        )

    def add_parser_options(self):
        self.parser.add_argument("signature", metavar="JSON", help=_("signature data"))

    async def start(self):
        try:
            ret_s = await self.host.bridge.ps_signature_check(
                self.args.service,
                self.args.node,
                self.args.item,
                self.args.signature,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't check signature: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            check_result = data_format.deserialise(ret_s)
            await self.output(check_result)
            self.host.quit()


class Signature(base.CommandBase):
    """Parent command of the signature subcommands"""

    subcommands = (SignatureSign, SignatureCheck)

    def __init__(self, host):
        super().__init__(
            host,
            "signature",
            use_profile=False,
            help=_("items signatures"),
        )


class SecretShare(base.CommandBase):
    """Share secrets of a node with another entity"""

    def __init__(self, host):
        super().__init__(
            host,
            "share",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("share a secret to let other entity encrypt or decrypt items"),
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "-k",
            "--key",
            metavar="ID",
            dest="secret_ids",
            action="append",
            default=[],
            help=_(
                "only share secrets with those IDs (default: share all secrets of the "
                "node)"
            ),
        )
        self.parser.add_argument(
            "recipient",
            metavar="JID",
            help=_("entity who must get the shared secret"),
        )

    async def start(self):
        share_args = (
            self.args.recipient,
            self.args.service,
            self.args.node,
            self.args.secret_ids,
            self.profile,
        )
        try:
            await self.host.bridge.ps_secret_share(*share_args)
        except Exception as e:
            self.disp(f"can't share secret: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp("secrets have been shared")
            self.host.quit(C.EXIT_OK)


class SecretRevoke(base.CommandBase):
    """Revoke a secret of an encrypted node"""

    def __init__(self, host):
        super().__init__(
            host,
            "revoke",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("revoke an encrypted node secret"),
        )

    def add_parser_options(self):
        self.parser.add_argument(
            "secret_id", help=_("ID of the secrets to revoke")
        )
        self.parser.add_argument(
            "-r", "--recipient", dest="recipients", metavar="JID", action="append",
            default=[], help=_(
                "entity who must get the revocation notification (default: send to all "
                "entities known to have the shared secret)"
            )
        )

    async def start(self):
        try:
            await self.host.bridge.ps_secret_revoke(
                self.args.service,
                self.args.node,
                self.args.secret_id,
                self.args.recipients,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't revoke secret: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            # this must be an f-string, otherwise the placeholder is displayed
            # verbatim instead of the ID of the revoked secret
            self.disp(f"secret {self.args.secret_id} has been revoked.")
            self.host.quit(C.EXIT_OK)


class SecretRotate(base.CommandBase):
    """Rotate the secret of an encrypted node"""

    def __init__(self, host):
        super().__init__(
            host,
            "rotate",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("revoke existing secrets, create a new one and send notifications"),
        )

    def add_parser_options(self):
        recipient_help = _(
            "entity who must get the revocation and shared secret notifications "
            "(default: send to all entities known to have the shared secret)"
        )
        self.parser.add_argument(
            "-r",
            "--recipient",
            dest="recipients",
            metavar="JID",
            action="append",
            default=[],
            help=recipient_help,
        )

    async def start(self):
        rotate_args = (
            self.args.service,
            self.args.node,
            self.args.recipients,
            self.profile,
        )
        try:
            await self.host.bridge.ps_secret_rotate(*rotate_args)
        except Exception as e:
            self.disp(f"can't rotate secret: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp("secret has been rotated")
            self.host.quit(C.EXIT_OK)


class SecretList(base.CommandBase):
    """Display the known secrets of a node"""

    def __init__(self, host):
        super().__init__(
            host,
            "list",
            use_pubsub=True,
            use_verbose=True,
            pubsub_flags={C.NODE},
            help=_("list known secrets for a pubsub node"),
            use_output=C.OUTPUT_LIST_DICT,
        )

    def add_parser_options(self):
        pass

    async def start(self):
        try:
            secrets_s = await self.host.bridge.ps_secrets_list(
                self.args.service,
                self.args.node,
                self.profile,
            )
            secrets = data_format.deserialise(secrets_s, type_check=list)
        except Exception as e:
            self.disp(f"can't list node secrets: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            if not self.verbosity:
                # keys are removed if verbosity is not at least one, to avoid
                # showing them on the screen accidentally
                for secret in secrets:
                    del secret["key"]
            await self.output(secrets)
            self.host.quit(C.EXIT_OK)


class Secret(base.CommandBase):
    """``secret`` command grouping the share/revoke/rotate/list subcommands."""

    # presumably read by base.CommandBase to register the child commands
    subcommands = (SecretShare, SecretRevoke, SecretRotate, SecretList)

    def __init__(self, host):
        description = _("handle encrypted nodes secrets")
        super().__init__(host, "secret", use_profile=False, help=description)


class HookCreate(base.CommandBase):
    """``create`` subcommand: attach a hook to a pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "create",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("create a Pubsub hook"),
        )

    def add_parser_options(self):
        """Declare the hook type, the persistence flag and the hook argument."""
        add_argument = self.parser.add_argument
        add_argument(
            "-t",
            "--type",
            default="python",
            choices=("python", "python_file", "python_code"),
            help=_("hook type"),
        )
        add_argument(
            "-P",
            "--persistent",
            action="store_true",
            help=_("make hook persistent across restarts"),
        )
        add_argument(
            "hook_arg",
            help=_("argument of the hook (depend of the type)"),
        )

    @staticmethod
    def check_args(self):
        """Normalise and validate the hook argument of a ``python_file`` hook.

        Declared static with an explicit ``self`` so that another command
        (``HookDelete``) can call it with its own instance.

        For the ``python_file`` type, ``self.args.hook_arg`` is replaced by its
        absolute path, and ``self.parser.error`` is called when that path is not an
        existing file. Other types are left untouched.

        @param self: command instance exposing ``args`` and ``parser``
        """
        if self.args.type != "python_file":
            return
        absolute_path = os.path.abspath(self.args.hook_arg)
        self.args.hook_arg = absolute_path
        if os.path.isfile(absolute_path):
            return
        self.parser.error(_("{path} is not a file").format(path=absolute_path))

    async def start(self):
        """Validate arguments, then ask the bridge to add the hook.

        Quits with ``C.EXIT_BRIDGE_ERRBACK`` if the bridge call raises.
        """
        self.check_args(self)
        try:
            args = self.args
            await self.host.bridge.ps_hook_add(
                args.service,
                args.node,
                args.type,
                args.hook_arg,
                args.persistent,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't create hook: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            return

        self.host.quit()


class HookDelete(base.CommandBase):
    """``delete`` subcommand: remove hook(s) attached to a pubsub node."""

    def __init__(self, host):
        base.CommandBase.__init__(
            self,
            host,
            "delete",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("delete a Pubsub hook"),
        )

    def add_parser_options(self):
        """Declare the optional hook type and hook argument filters.

        Both default to an empty string, which the help texts document as
        "remove all".
        """
        self.parser.add_argument(
            "-t",
            "--type",
            default="",
            choices=("", "python", "python_file", "python_code"),
            help=_("hook type to remove, empty to remove all (DEFAULT: remove all)"),
        )
        self.parser.add_argument(
            "-a",
            "--arg",
            dest="hook_arg",
            default="",
            help=_(
                "argument of the hook to remove, empty to remove all (DEFAULT: remove all)"
            ),
        )

    async def start(self):
        """Ask the bridge to remove matching hooks and report how many were deleted.

        Quits with ``C.EXIT_BRIDGE_ERRBACK`` if the bridge call raises.
        """
        # An empty hook argument means "remove all" and must be sent unchanged.
        # Validating it would resolve "" to the current working directory, which
        # is not a file, so "-t python_file" without "--arg" would always be
        # rejected by the parser.
        if self.args.hook_arg:
            HookCreate.check_args(self)
        try:
            nb_deleted = await self.host.bridge.ps_hook_remove(
                self.args.service,
                self.args.node,
                self.args.type,
                self.args.hook_arg,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't delete hook: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(
                _("{nb_deleted} hook(s) have been deleted").format(nb_deleted=nb_deleted)
            )
            self.host.quit()


class HookList(base.CommandBase):
    """``list`` subcommand: output the hooks registered for the profile."""

    def __init__(self, host):
        super().__init__(
            host,
            "list",
            use_output=C.OUTPUT_LIST_DICT,
            help=_("list hooks of a profile"),
        )

    def add_parser_options(self):
        """This command takes no specific option."""

    async def start(self):
        """Fetch the hooks from the bridge and send them to the output.

        A notice is displayed when the result is empty; the (empty) result is
        still passed to the output afterwards.
        Quits with ``C.EXIT_BRIDGE_ERRBACK`` if the bridge call raises.
        """
        try:
            hooks = await self.host.bridge.ps_hook_list(self.profile)
        except Exception as e:
            self.disp(f"can't list hooks: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            return

        if not hooks:
            self.disp(_("No hook found."))
        await self.output(hooks)
        self.host.quit()


class Hook(base.CommandBase):
    """``hook`` command grouping the create/delete/list hook subcommands."""

    # presumably read by base.CommandBase to register the child commands
    subcommands = (HookCreate, HookDelete, HookList)

    def __init__(self, host):
        description = _("trigger action on Pubsub notifications")
        super().__init__(
            host,
            "hook",
            use_profile=False,
            use_verbose=True,
            help=description,
        )


class Pubsub(base.CommandBase):
    """Top-level ``pubsub`` command gathering every pubsub subcommand."""

    # presumably read by base.CommandBase to register the child commands
    subcommands = (
        Set,
        Get,
        Delete,
        Edit,
        Rename,
        Subscribe,
        Unsubscribe,
        Subscriptions,
        Affiliations,
        Reference,
        Search,
        Transform,
        Attachments,
        Signature,
        Secret,
        Hook,
        Uri,
        Node,
        Cache,
    )

    def __init__(self, host):
        description = _("PubSub nodes/items management")
        super().__init__(
            host,
            "pubsub",
            use_profile=False,
            help=description,
        )