diff libervia/cli/cmd_pubsub.py @ 4075:47401850dec6

refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 14:54:26 +0200
parents libervia/frontends/jp/cmd_pubsub.py@26b7ed2817da
children 3f7ca590a5da
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/cli/cmd_pubsub.py	Fri Jun 02 14:54:26 2023 +0200
@@ -0,0 +1,3030 @@
+#!/usr/bin/env python3
+
+
+# Libervia CLI
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+import argparse
+import os.path
+import re
+import sys
+import subprocess
+import asyncio
+import json
+from . import base
+from libervia.backend.core.i18n import _
+from libervia.backend.core import exceptions
+from libervia.cli.constants import Const as C
+from libervia.cli import common
+from libervia.cli import arg_tools
+from libervia.cli import xml_tools
+from functools import partial
+from libervia.backend.tools.common import data_format
+from libervia.backend.tools.common import uri
+from libervia.backend.tools.common.ansi import ANSI as A
+from libervia.backend.tools.common import date_utils
+from libervia.frontends.tools import jid, strings
+from libervia.frontends.bridge.bridge_frontend import BridgeException
+
+__commands__ = ["Pubsub"]
+
+PUBSUB_TMP_DIR = "pubsub"
+PUBSUB_SCHEMA_TMP_DIR = PUBSUB_TMP_DIR + "_schema"
+ALLOWED_SUBSCRIPTIONS_OWNER = ("subscribed", "pending", "none")
+
+# TODO: need to split this into several modules; plugins should handle subcommands
+
+
+class NodeInfo(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "info",
+            use_output=C.OUTPUT_DICT,
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("retrieve node configuration"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-k",
+            "--key",
+            action="append",
+            dest="keys",
+            help=_("data key to filter"),
+        )
+
+    def remove_prefix(self, key):
+        return key[7:] if key.startswith("pubsub#") else key
+
+    def filter_key(self, key):
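+        """match a requested key, with or without the "pubsub#" prefix"""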
+        return any((key == k or key == "pubsub#" + k) for k in self.args.keys)
+
+    async def start(self):
+        try:
+            config_dict = await self.host.bridge.ps_node_configuration_get(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except BridgeException as e:
+            if e.condition == "item-not-found":
+                self.disp(
+                    f"The node {self.args.node} doesn't exist on {self.args.service}",
+                    error=True,
+                )
+                self.host.quit(C.EXIT_NOT_FOUND)
+            else:
+                self.disp(f"can't get node configuration: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            key_filter = (lambda k: True) if not self.args.keys else self.filter_key
+            config_dict = {
+                self.remove_prefix(k): v for k, v in config_dict.items() if key_filter(k)
+            }
+            await self.output(config_dict)
+            self.host.quit()
+
+
+class NodeCreate(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "create",
+            use_output=C.OUTPUT_DICT,
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            use_verbose=True,
+            help=_("create a node"),
+        )
+
+    @staticmethod
+    def add_node_config_options(parser):
+        parser.add_argument(
+            "-f",
+            "--field",
+            action="append",
+            nargs=2,
+            dest="fields",
+            default=[],
+            metavar=("KEY", "VALUE"),
+            help=_("configuration field to set"),
+        )
+        parser.add_argument(
+            "-F",
+            "--full-prefix",
+            action="store_true",
+            help=_('don\'t prepend "pubsub#" prefix to field names'),
+        )
+
+    def add_parser_options(self):
+        self.add_node_config_options(self.parser)
+
+    @staticmethod
+    def get_config_options(args):
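+        # e.g. (illustrative) fields [["access_model", "open"]] become
+        # {"pubsub#access_model": "open"} unless --full-prefix is used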
+        if not args.full_prefix:
+            return {"pubsub#" + k: v for k, v in args.fields}
+        else:
+            return dict(args.fields)
+
+    async def start(self):
+        options = self.get_config_options(self.args)
+        try:
+            node_id = await self.host.bridge.ps_node_create(
+                self.args.service,
+                self.args.node,
+                options,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(msg=_("can't create node: {e}").format(e=e), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            if self.host.verbosity:
+                announce = _("node created successfully: ")
+            else:
+                announce = ""
+            self.disp(announce + node_id)
+            self.host.quit()
+
+
+class NodePurge(base.CommandBase):
+    def __init__(self, host):
+        super(NodePurge, self).__init__(
+            host,
+            "purge",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("purge a node (i.e. remove all items from it)"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-f",
+            "--force",
+            action="store_true",
+            help=_("purge node without confirmation"),
+        )
+
+    async def start(self):
+        if not self.args.force:
+            if not self.args.service:
+                message = _(
+                    "Are you sure to purge PEP node [{node}]? This will "
+                    "delete ALL items from it!"
+                ).format(node=self.args.node)
+            else:
+                message = _(
+                    "Are you sure to delete node [{node}] on service "
+                    "[{service}]? This will delete ALL items from it!"
+                ).format(node=self.args.node, service=self.args.service)
+            await self.host.confirm_or_quit(message, _("node purge cancelled"))
+
+        try:
+            await self.host.bridge.ps_node_purge(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(msg=_("can't purge node: {e}").format(e=e), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("node [{node}] purged successfully").format(node=self.args.node))
+            self.host.quit()
+
+
+class NodeDelete(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "delete",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("delete a node"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-f",
+            "--force",
+            action="store_true",
+            help=_("delete node without confirmation"),
+        )
+
+    async def start(self):
+        if not self.args.force:
+            if not self.args.service:
+                message = _("Are you sure to delete PEP node [{node}] ?").format(
+                    node=self.args.node
+                )
+            else:
+                message = _(
+                    "Are you sure to delete node [{node}] on " "service [{service}]?"
+                ).format(node=self.args.node, service=self.args.service)
+            await self.host.confirm_or_quit(message, _("node deletion cancelled"))
+
+        try:
+            await self.host.bridge.ps_node_delete(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't delete node: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("node [{node}] deleted successfully").format(node=self.args.node))
+            self.host.quit()
+
+
+class NodeSet(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "set",
+            use_output=C.OUTPUT_DICT,
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            use_verbose=True,
+            help=_("set node configuration"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-f",
+            "--field",
+            action="append",
+            nargs=2,
+            dest="fields",
+            required=True,
+            metavar=("KEY", "VALUE"),
+            help=_("configuration field to set (required)"),
+        )
+        self.parser.add_argument(
+            "-F",
+            "--full-prefix",
+            action="store_true",
+            help=_('don\'t prepend "pubsub#" prefix to field names'),
+        )
+
+    def get_key_name(self, k):
+        if self.args.full_prefix or k.startswith("pubsub#"):
+            return k
+        else:
+            return "pubsub#" + k
+
+    async def start(self):
+        try:
+            await self.host.bridge.ps_node_configuration_set(
+                self.args.service,
+                self.args.node,
+                {self.get_key_name(k): v for k, v in self.args.fields},
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't set node configuration: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("node configuration successful"), 1)
+            self.host.quit()
+
+
+class NodeImport(base.CommandBase):
+    def __init__(self, host):
+        super(NodeImport, self).__init__(
+            host,
+            "import",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("import raw XML to a node"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "--admin",
+            action="store_true",
+            help=_("do a pubsub admin request, needed to change publisher"),
+        )
+        self.parser.add_argument(
+            "import_file",
+            type=argparse.FileType(),
+            help=_(
+                "path to the XML file with data to import. The file must contain "
+                "whole XML of each item to import."
+            ),
+        )
+
+    async def start(self):
+        try:
+            element, etree = xml_tools.etree_parse(
+                self, self.args.import_file, reraise=True
+            )
+        except Exception as e:
+            from lxml.etree import XMLSyntaxError
+
+            if isinstance(e, XMLSyntaxError) and e.code == 5:
+                # we have extra content, this probably means that items are not wrapped,
+                # so we wrap them here and try again
+                self.args.import_file.seek(0)
+                xml_buf = "<import>" + self.args.import_file.read() + "</import>"
+                element, etree = xml_tools.etree_parse(self, xml_buf)
+            else:
+                self.disp(f"can't parse the file to import: {e}", error=True)
+                self.host.quit(C.EXIT_DATA_ERROR)
+                return
+
+        # we reverse elements as we expect to have the most recently published element
+        # first
+        # TODO: make this more explicit and add an option
+        element[:] = reversed(element)
+
+        if not all([i.tag == "{http://jabber.org/protocol/pubsub}item" for i in element]):
+            self.disp(
+                _("You are not using list of pubsub items, we can't import this file"),
+                error=True,
+            )
+            self.host.quit(C.EXIT_DATA_ERROR)
+            return
+
+        items = [etree.tostring(i, encoding="unicode") for i in element]
+        if self.args.admin:
+            method = self.host.bridge.ps_admin_items_send
+        else:
+            self.disp(
+                _(
+                    "Items are imported without using admin mode, publisher can't "
+                    "be changed"
+                )
+            )
+            method = self.host.bridge.ps_items_send
+
+        try:
+            items_ids = await method(
+                self.args.service,
+                self.args.node,
+                items,
+                "",
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't send items: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            if items_ids:
+                self.disp(
+                    _("items published with id(s) {items_ids}").format(
+                        items_ids=", ".join(items_ids)
+                    )
+                )
+            else:
+                self.disp(_("items published"))
+            self.host.quit()
+
+
+class NodeAffiliationsGet(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "get",
+            use_output=C.OUTPUT_DICT,
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("retrieve node affiliations (for node owner)"),
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def start(self):
+        try:
+            affiliations = await self.host.bridge.ps_node_affiliations_get(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't get node affiliations: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.output(affiliations)
+            self.host.quit()
+
+
+class NodeAffiliationsSet(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "set",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            use_verbose=True,
+            help=_("set affiliations (for node owner)"),
+        )
+
+    def add_parser_options(self):
+        # XXX: we use optional argument syntax for a required one because lists of
+        #      2-element lists (used to construct dicts) don't work with positional
+        #      arguments
+        self.parser.add_argument(
+            "-a",
+            "--affiliation",
+            dest="affiliations",
+            metavar=("JID", "AFFILIATION"),
+            required=True,
+            action="append",
+            nargs=2,
+            help=_("entity/affiliation couple(s)"),
+        )
+
+    async def start(self):
+        affiliations = dict(self.args.affiliations)
+        try:
+            await self.host.bridge.ps_node_affiliations_set(
+                self.args.service,
+                self.args.node,
+                affiliations,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't set node affiliations: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("affiliations have been set"), 1)
+            self.host.quit()
+
+
+class NodeAffiliations(base.CommandBase):
+    subcommands = (NodeAffiliationsGet, NodeAffiliationsSet)
+
+    def __init__(self, host):
+        super(NodeAffiliations, self).__init__(
+            host,
+            "affiliations",
+            use_profile=False,
+            help=_("set or retrieve node affiliations"),
+        )
+
+
+class NodeSubscriptionsGet(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "get",
+            use_output=C.OUTPUT_DICT,
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("retrieve node subscriptions (for node owner)"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "--public",
+            action="store_true",
+            help=_("get public subscriptions"),
+        )
+
+    async def start(self):
+        if self.args.public:
+            method = self.host.bridge.ps_public_node_subscriptions_get
+        else:
+            method = self.host.bridge.ps_node_subscriptions_get
+        try:
+            subscriptions = await method(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't get node subscriptions: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.output(subscriptions)
+            self.host.quit()
+
+
+class StoreSubscriptionAction(argparse.Action):
+    """Action which handle subscription parameter for owner
+
+    list is given by pairs: jid and subscription state
+    if subscription state is not specified, it default to "subscribed"
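+
+    e.g. (illustrative values) "-S user1@example.org pending user2@example.org"
+    stores {"user1@example.org": "pending", "user2@example.org": "subscribed"}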
+    """
+
+    def __call__(self, parser, namespace, values, option_string):
+        dest_dict = getattr(namespace, self.dest)
+        while values:
+            jid_s = values.pop(0)
+            try:
+                subscription = values.pop(0)
+            except IndexError:
+                subscription = "subscribed"
+            if subscription not in ALLOWED_SUBSCRIPTIONS_OWNER:
+                parser.error(
+                    _("subscription must be one of {}").format(
+                        ", ".join(ALLOWED_SUBSCRIPTIONS_OWNER)
+                    )
+                )
+            dest_dict[jid_s] = subscription
+
+
+class NodeSubscriptionsSet(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "set",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            use_verbose=True,
+            help=_("set/modify subscriptions (for node owner)"),
+        )
+
+    def add_parser_options(self):
+        # XXX: we use optional argument syntax for a required one because lists of
+        #      2-element lists (used to construct dicts) don't work with positional
+        #      arguments
+        self.parser.add_argument(
+            "-S",
+            "--subscription",
+            dest="subscriptions",
+            default={},
+            nargs="+",
+            metavar=("JID [SUSBSCRIPTION]"),
+            required=True,
+            action=StoreSubscriptionAction,
+            help=_("entity/subscription couple(s)"),
+        )
+
+    async def start(self):
+        try:
+            await self.host.bridge.ps_node_subscriptions_set(
+                self.args.service,
+                self.args.node,
+                self.args.subscriptions,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't set node subscriptions: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("subscriptions have been set"), 1)
+            self.host.quit()
+
+
+class NodeSubscriptions(base.CommandBase):
+    subcommands = (NodeSubscriptionsGet, NodeSubscriptionsSet)
+
+    def __init__(self, host):
+        super(NodeSubscriptions, self).__init__(
+            host,
+            "subscriptions",
+            use_profile=False,
+            help=_("get or modify node subscriptions"),
+        )
+
+
+class NodeSchemaSet(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "set",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            use_verbose=True,
+            help=_("set/replace a schema"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument("schema", help=_("schema to set (must be XML)"))
+
+    async def start(self):
+        try:
+            await self.host.bridge.ps_schema_set(
+                self.args.service,
+                self.args.node,
+                self.args.schema,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't set schema: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("schema has been set"), 1)
+            self.host.quit()
+
+
+class NodeSchemaEdit(base.CommandBase, common.BaseEdit):
+    use_items = False
+
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "edit",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            use_draft=True,
+            use_verbose=True,
+            help=_("edit a schema"),
+        )
+        common.BaseEdit.__init__(self, self.host, PUBSUB_SCHEMA_TMP_DIR)
+
+    def add_parser_options(self):
+        pass
+
+    async def publish(self, schema):
+        try:
+            await self.host.bridge.ps_schema_set(
+                self.args.service,
+                self.args.node,
+                schema,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't set schema: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("schema has been set"), 1)
+            self.host.quit()
+
+    async def ps_schema_get_cb(self, schema):
+        try:
+            from lxml import etree
+        except ImportError:
+            self.disp(
+                "lxml module must be installed to use edit, please install it "
+                'with "pip install lxml"',
+                error=True,
+            )
+            self.host.quit(1)
+        content_file_obj, content_file_path = self.get_tmp_file()
+        schema = schema.strip()
+        if schema:
+            parser = etree.XMLParser(remove_blank_text=True)
+            schema_elt = etree.fromstring(schema, parser)
+            content_file_obj.write(
+                etree.tostring(schema_elt, encoding="utf-8", pretty_print=True)
+            )
+            content_file_obj.seek(0)
+        await self.run_editor(
+            "pubsub_schema_editor_args", content_file_path, content_file_obj
+        )
+
+    async def start(self):
+        try:
+            schema = await self.host.bridge.ps_schema_get(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except BridgeException as e:
+            if e.condition == "item-not-found" or e.classname == "NotFound":
+                schema = ""
+            else:
+                self.disp(f"can't edit schema: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+
+        await self.ps_schema_get_cb(schema)
+
+
+class NodeSchemaGet(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "get",
+            use_output=C.OUTPUT_XML,
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            use_verbose=True,
+            help=_("get schema"),
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def start(self):
+        try:
+            schema = await self.host.bridge.ps_schema_get(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except BridgeException as e:
+            if e.condition == "item-not-found" or e.classname == "NotFound":
+                schema = None
+            else:
+                self.disp(f"can't get schema: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+
+        if schema:
+            await self.output(schema)
+            self.host.quit()
+        else:
+            self.disp(_("no schema found"), 1)
+            self.host.quit(C.EXIT_NOT_FOUND)
+
+
+class NodeSchema(base.CommandBase):
+    subcommands = (NodeSchemaSet, NodeSchemaEdit, NodeSchemaGet)
+
+    def __init__(self, host):
+        super(NodeSchema, self).__init__(
+            host, "schema", use_profile=False, help=_("data schema manipulation")
+        )
+
+
+class Node(base.CommandBase):
+    subcommands = (
+        NodeInfo,
+        NodeCreate,
+        NodePurge,
+        NodeDelete,
+        NodeSet,
+        NodeImport,
+        NodeAffiliations,
+        NodeSubscriptions,
+        NodeSchema,
+    )
+
+    def __init__(self, host):
+        super(Node, self).__init__(
+            host, "node", use_profile=False, help=_("node handling")
+        )
+
+
+class CacheGet(base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "get",
+            use_output=C.OUTPUT_LIST_XML,
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
+            help=_("get pubsub item(s) from cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-S",
+            "--sub-id",
+            default="",
+            help=_("subscription id"),
+        )
+
+    async def start(self):
+        try:
+            ps_result = data_format.deserialise(
+                await self.host.bridge.ps_cache_get(
+                    self.args.service,
+                    self.args.node,
+                    self.args.max,
+                    self.args.items,
+                    self.args.sub_id,
+                    self.get_pubsub_extra(),
+                    self.profile,
+                )
+            )
+        except BridgeException as e:
+            if e.classname == "NotFound":
+                self.disp(
+                    f"The node {self.args.node} from {self.args.service} is not in cache "
+                    f"for {self.profile}",
+                    error=True,
+                )
+                self.host.quit(C.EXIT_NOT_FOUND)
+            else:
+                self.disp(f"can't get pubsub items from cache: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            await self.output(ps_result["items"])
+            self.host.quit(C.EXIT_OK)
+
+
+class CacheSync(base.CommandBase):
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "sync",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("(re)synchronise a pubsub node"),
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def start(self):
+        try:
+            await self.host.bridge.ps_cache_sync(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except BridgeException as e:
+            if e.condition == "item-not-found" or e.classname == "NotFound":
+                self.disp(
+                    f"The node {self.args.node} doesn't exist on {self.args.service}",
+                    error=True,
+                )
+                self.host.quit(C.EXIT_NOT_FOUND)
+            else:
+                self.disp(f"can't synchronise pubsub node: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            self.host.quit(C.EXIT_OK)
+
+
+class CachePurge(base.CommandBase):
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "purge",
+            use_profile=False,
+            help=_("purge (delete) items from cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-s", "--service", action="append", metavar="JID", dest="services",
+            help="purge items only for these services. If not specified, items from ALL "
+            "services will be purged. May be used several times."
+        )
+        self.parser.add_argument(
+            "-n", "--node", action="append", dest="nodes",
+            help="purge items only for these nodes. If not specified, items from ALL "
+            "nodes will be purged. May be used several times."
+        )
+        self.parser.add_argument(
+            "-p", "--profile", action="append", dest="profiles",
+            help="purge items only for these profiles. If not specified, items from ALL "
+            "profiles will be purged. May be used several times."
+        )
+        self.parser.add_argument(
+            "-b", "--updated-before", type=base.date_decoder, metavar="TIME_PATTERN",
+            help="purge items which have been last updated before given time."
+        )
+        self.parser.add_argument(
+            "-C", "--created-before", type=base.date_decoder, metavar="TIME_PATTERN",
+            help="purge items which have been last created before given time."
+        )
+        self.parser.add_argument(
+            "-t", "--type", action="append", dest="types",
+            help="purge items flagged with TYPE. May be used several times."
+        )
+        self.parser.add_argument(
+            "-S", "--subtype", action="append", dest="subtypes",
+            help="purge items flagged with SUBTYPE. May be used several times."
+        )
+        self.parser.add_argument(
+            "-f", "--force", action="store_true",
+            help=_("purge items without confirmation")
+        )
+
+    async def start(self):
+        if not self.args.force:
+            await self.host.confirm_or_quit(
+                _(
+                    "Are you sure to purge items from cache? You'll have to bypass cache "
+                    "or resynchronise nodes to access deleted items again."
+                ),
+                _("Items purgins has been cancelled.")
+            )
+        purge_data = {}
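+        # only forward the filters that were actually given on the command line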
+        for key in (
+                "services", "nodes", "profiles", "updated_before", "created_before",
+                "types", "subtypes"
+        ):
+            value = getattr(self.args, key)
+            if value is not None:
+                purge_data[key] = value
+        try:
+            await self.host.bridge.ps_cache_purge(
+                data_format.serialise(
+                    purge_data
+                )
+            )
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            self.host.quit(C.EXIT_OK)
+
+
+class CacheReset(base.CommandBase):
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "reset",
+            use_profile=False,
+            help=_("remove everything from cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-f", "--force", action="store_true",
+            help=_("reset cache without confirmation")
+        )
+
+    async def start(self):
+        if not self.args.force:
+            await self.host.confirm_or_quit(
+                _(
+                    "Are you sure to reset cache? All nodes and items will be removed "
+                    "from it, then it will be progressively refilled as if it were new. "
+                    "This may be resources intensive."
+                ),
+                _("Pubsub cache reset has been cancelled.")
+            )
+        try:
+            await self.host.bridge.ps_cache_reset()
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            self.host.quit(C.EXIT_OK)
+
+
+class CacheSearch(base.CommandBase):
+    def __init__(self, host):
+        extra_outputs = {
+            "default": self.default_output,
+            "xml": self.xml_output,
+            "xml-raw": self.xml_raw_output,
+        }
+        super().__init__(
+            host,
+            "search",
+            use_profile=False,
+            use_output=C.OUTPUT_LIST_DICT,
+            extra_outputs=extra_outputs,
+            help=_("search for pubsub items in cache"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-f", "--fts", help=_("Full-Text Search query"), metavar="FTS_QUERY"
+        )
+        self.parser.add_argument(
+            "-p", "--profile", action="append", dest="profiles", metavar="PROFILE",
+            help="search items only from these profiles. May be used several times."
+        )
+        self.parser.add_argument(
+            "-s", "--service", action="append", dest="services", metavar="SERVICE",
+            help="items must be from specified service. May be used several times."
+        )
+        self.parser.add_argument(
+            "-n", "--node", action="append", dest="nodes", metavar="NODE",
+            help="items must be in the specified node. May be used several times."
+        )
+        self.parser.add_argument(
+            "-t", "--type", action="append", dest="types", metavar="TYPE",
+            help="items must be of specified type. May be used several times."
+        )
+        self.parser.add_argument(
+            "-S", "--subtype", action="append", dest="subtypes", metavar="SUBTYPE",
+            help="items must be of specified subtype. May be used several times."
+        )
+        self.parser.add_argument(
+            "-P", "--payload", action="store_true", help=_("include item XML payload")
+        )
+        self.parser.add_argument(
+            "-o", "--order-by", action="append", nargs="+",
+            metavar=("ORDER", "[FIELD] [DIRECTION]"),
+            help=_("how items must be ordered. May be used several times.")
+        )
+        self.parser.add_argument(
+            "-l", "--limit", type=int, help=_("maximum number of items to return")
+        )
+        self.parser.add_argument(
+            "-i", "--index", type=int, help=_("return results starting from this index")
+        )
+        self.parser.add_argument(
+            "-F",
+            "--field",
+            action="append",
+            nargs=3,
+            dest="fields",
+            default=[],
+            metavar=("PATH", "OPERATOR", "VALUE"),
+            help=_("parsed data field filter. May be used several times."),
+        )
+        self.parser.add_argument(
+            "-k",
+            "--key",
+            action="append",
+            dest="keys",
+            metavar="KEY",
+            help=_(
+                "data key(s) to display. May be used several times. DEFAULT: show all "
+                "keys"
+            ),
+        )
+
+    async def start(self):
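+        # build the query mapping which will be serialised and passed to ps_cache_search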
+        query = {}
+        for arg in ("fts", "profiles", "services", "nodes", "types", "subtypes"):
+            value = getattr(self.args, arg)
+            if value:
+                if arg in ("types", "subtypes"):
+                    # empty string is used to find items without type and/or subtype
+                    value = [v or None for v in value]
+                query[arg] = value
+        for arg in ("limit", "index"):
+            value = getattr(self.args, arg)
+            if value is not None:
+                query[arg] = value
+        if self.args.order_by is not None:
+            for order_data in self.args.order_by:
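+                # each --order-by value is either "ORDER [DIRECTION]" or
+                # "field PATH [DIRECTION]", as parsed below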
+                order, *args = order_data
+                if order == "field":
+                    if not args:
+                        self.parser.error(_("field data must be specified in --order-by"))
+                    elif len(args) == 1:
+                        path = args[0]
+                        direction = "asc"
+                    elif len(args) == 2:
+                        path, direction = args
+                    else:
+                        self.parser.error(_(
+                            "You can't specify more that 2 arguments for a field in "
+                            "--order-by"
+                        ))
+                    try:
+                        path = json.loads(path)
+                    except json.JSONDecodeError:
+                        pass
+                    order_query = {
+                        "path": path,
+                    }
+                else:
+                    order_query = {
+                        "order": order
+                    }
+                    if not args:
+                        direction = "asc"
+                    elif len(args) == 1:
+                        direction = args[0]
+                    else:
+                        self.parser.error(_(
+                            "there are too many arguments in --order-by option"
+                        ))
+                if direction.lower() not in ("asc", "desc"):
+                    self.parser.error(_("invalid --order-by direction: {direction!r}"))
+                order_query["direction"] = direction
+                query.setdefault("order-by", []).append(order_query)
+
+        if self.args.fields:
+            parsed = []
+            for field in self.args.fields:
+                path, operator, value = field
+                try:
+                    path = json.loads(path)
+                except json.JSONDecodeError:
+                    # this is not a JSON encoded value, we keep it as a string
+                    pass
+
+                if not isinstance(path, list):
+                    path = [path]
+
+                # handling of TP(<time pattern>)
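+                # i.e. any "TP(...)" occurrence in the value is replaced by the
+                # timestamp parsed by date_utils.date_parse_ext before the value
+                # is used in the filter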
+                if operator in (">", "gt", "<", "le", "between"):
+                    def datetime_sub(match):
+                        return str(date_utils.date_parse_ext(
+                            match.group(1), default_tz=date_utils.TZ_LOCAL
+                        ))
+                    value = re.sub(r"\bTP\(([^)]+)\)", datetime_sub, value)
+
+                try:
+                    value = json.loads(value)
+                except json.JSONDecodeError:
+                    # not JSON, as above we keep it as string
+                    pass
+
+                if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"):
+                    if not isinstance(value, list):
+                        value = [value]
+
+                parsed.append({
+                    "path": path,
+                    "op": operator,
+                    "value": value
+                })
+
+            query["parsed"] = parsed
+
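+        # XML outputs need the raw payload, so request it and keep its key when
+        # a key filter is used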
+        if self.args.payload or "xml" in self.args.output:
+            query["with_payload"] = True
+            if self.args.keys:
+                self.args.keys.append("item_payload")
+        try:
+            found_items = data_format.deserialise(
+                await self.host.bridge.ps_cache_search(
+                    data_format.serialise(query)
+                ),
+                type_check=list,
+            )
+        except BridgeException as e:
+            self.disp(f"can't search for pubsub items in cache: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            if self.args.keys:
+                found_items = [
+                    {k: v for k, v in item.items() if k in self.args.keys}
+                    for item in found_items
+                ]
+            await self.output(found_items)
+            self.host.quit(C.EXIT_OK)
+
+    def default_output(self, found_items):
+        for item in found_items:
+            for field in ("created", "published", "updated"):
+                try:
+                    timestamp = item[field]
+                except KeyError:
+                    pass
+                else:
+                    try:
+                        item[field] = common.format_time(timestamp)
+                    except ValueError:
+                        pass
+        self.host._outputs[C.OUTPUT_LIST_DICT]["simple"]["callback"](found_items)
+
+    def xml_output(self, found_items):
+        """Output prettified item payload"""
+        cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML]["callback"]
+        for item in found_items:
+            cb(item["item_payload"])
+
+    def xml_raw_output(self, found_items):
+        """Output item payload without prettifying"""
+        cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML_RAW]["callback"]
+        for item in found_items:
+            cb(item["item_payload"])
+
+
+class Cache(base.CommandBase):
+    subcommands = (
+        CacheGet,
+        CacheSync,
+        CachePurge,
+        CacheReset,
+        CacheSearch,
+    )
+
+    def __init__(self, host):
+        super(Cache, self).__init__(
+            host, "cache", use_profile=False, help=_("pubsub cache handling")
+        )
+
+
+class Set(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "set",
+            use_pubsub=True,
+            use_quiet=True,
+            pubsub_flags={C.NODE},
+            help=_("publish a new item or update an existing one"),
+        )
+
+    def add_parser_options(self):
+        NodeCreate.add_node_config_options(self.parser)
+        self.parser.add_argument(
+            "-e",
+            "--encrypt",
+            action="store_true",
+            help=_("end-to-end encrypt the blog item")
+        )
+        self.parser.add_argument(
+            "--encrypt-for",
+            metavar="JID",
+            action="append",
+            help=_("encrypt a single item for")
+        )
+        self.parser.add_argument(
+            "-X",
+            "--sign",
+            action="store_true",
+            help=_("cryptographically sign the blog post")
+        )
+        self.parser.add_argument(
+            "item",
+            nargs="?",
+            default="",
+            help=_("id, URL of the item to update, keyword, or nothing for new item"),
+        )
+
+    async def start(self):
+        element, etree = xml_tools.etree_parse(self, sys.stdin)
+        element = xml_tools.get_payload(self, element)
+        payload = etree.tostring(element, encoding="unicode")
+        extra = {}
+        if self.args.encrypt:
+            extra["encrypted"] = True
+        if self.args.encrypt_for:
+            extra["encrypted_for"] = {"targets": self.args.encrypt_for}
+        if self.args.sign:
+            extra["signed"] = True
+        publish_options = NodeCreate.get_config_options(self.args)
+        if publish_options:
+            extra["publish_options"] = publish_options
+
+        try:
+            published_id = await self.host.bridge.ps_item_send(
+                self.args.service,
+                self.args.node,
+                payload,
+                self.args.item,
+                data_format.serialise(extra),
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(_("can't send item: {e}").format(e=e), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            if published_id:
+                if self.args.quiet:
+                    self.disp(published_id, end="")
+                else:
+                    self.disp(f"Item published at {published_id}")
+            else:
+                self.disp("Item published")
+            self.host.quit(C.EXIT_OK)
+
+
+class Get(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "get",
+            use_output=C.OUTPUT_LIST_XML,
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
+            help=_("get pubsub item(s)"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-S",
+            "--sub-id",
+            default="",
+            help=_("subscription id"),
+        )
+        self.parser.add_argument(
+            "--no-decrypt",
+            action="store_true",
+            help=_("don't do automatic decryption of e2ee items"),
+        )
+        #  TODO: a key(s) argument to select keys to display
+
+    async def start(self):
+        extra = {}
+        if self.args.no_decrypt:
+            extra["decrypt"] = False
+        try:
+            ps_result = data_format.deserialise(
+                await self.host.bridge.ps_items_get(
+                    self.args.service,
+                    self.args.node,
+                    self.args.max,
+                    self.args.items,
+                    self.args.sub_id,
+                    self.get_pubsub_extra(extra),
+                    self.profile,
+                )
+            )
+        except BridgeException as e:
+            if e.condition == "item-not-found" or e.classname == "NotFound":
+                self.disp(
+                    f"The node {self.args.node} doesn't exist on {self.args.service}",
+                    error=True,
+                )
+                self.host.quit(C.EXIT_NOT_FOUND)
+            else:
+                self.disp(f"can't get pubsub items: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        except Exception as e:
+            self.disp(f"Internal error: {e}", error=True)
+            self.host.quit(C.EXIT_INTERNAL_ERROR)
+        else:
+            await self.output(ps_result["items"])
+            self.host.quit(C.EXIT_OK)
+
+
+class Delete(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "delete",
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.ITEM, C.SINGLE_ITEM},
+            help=_("delete an item"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-f", "--force", action="store_true", help=_("delete without confirmation")
+        )
+        self.parser.add_argument(
+            "--no-notification", dest="notify", action="store_false",
+            help=_("do not send notification (not recommended)")
+        )
+
+    async def start(self):
+        if not self.args.item:
+            self.parser.error(_("You need to specify an item to delete"))
+        if not self.args.force:
+            message = _("Are you sure to delete item {item_id} ?").format(
+                item_id=self.args.item
+            )
+            await self.host.confirm_or_quit(message, _("item deletion cancelled"))
+        try:
+            await self.host.bridge.ps_item_retract(
+                self.args.service,
+                self.args.node,
+                self.args.item,
+                self.args.notify,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(_("can't delete item: {e}").format(e=e), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("item {item} has been deleted").format(item=self.args.item))
+            self.host.quit(C.EXIT_OK)
+
+
+class Edit(base.CommandBase, common.BaseEdit):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "edit",
+            use_verbose=True,
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.SINGLE_ITEM},
+            use_draft=True,
+            help=_("edit an existing or new pubsub item"),
+        )
+        common.BaseEdit.__init__(self, self.host, PUBSUB_TMP_DIR)
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-e",
+            "--encrypt",
+            action="store_true",
+            help=_("end-to-end encrypt the blog item")
+        )
+        self.parser.add_argument(
+            "--encrypt-for",
+            metavar="JID",
+            action="append",
+            help=_("encrypt a single item for")
+        )
+        self.parser.add_argument(
+            "-X",
+            "--sign",
+            action="store_true",
+            help=_("cryptographically sign the blog post")
+        )
+
+    async def publish(self, content):
+        extra = {}
+        if self.args.encrypt:
+            extra["encrypted"] = True
+        if self.args.encrypt_for:
+            extra["encrypted_for"] = {"targets": self.args.encrypt_for}
+        if self.args.sign:
+            extra["signed"] = True
+        published_id = await self.host.bridge.ps_item_send(
+            self.pubsub_service,
+            self.pubsub_node,
+            content,
+            self.pubsub_item or "",
+            data_format.serialise(extra),
+            self.profile,
+        )
+        if published_id:
+            self.disp("Item published at {pub_id}".format(pub_id=published_id))
+        else:
+            self.disp("Item published")
+
+    async def get_item_data(self, service, node, item):
+        try:
+            from lxml import etree
+        except ImportError:
+            self.disp(
+                "lxml module must be installed to use edit, please install it "
+                'with "pip install lxml"',
+                error=True,
+            )
+            self.host.quit(1)
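+        # retrieve a single item: the requested one, or the latest one if no item id
+        # is given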
+        items = [item] if item else []
+        ps_result = data_format.deserialise(
+            await self.host.bridge.ps_items_get(
+                service, node, 1, items, "", data_format.serialise({}), self.profile
+            )
+        )
+        item_raw = ps_result["items"][0]
+        parser = etree.XMLParser(remove_blank_text=True, recover=True)
+        item_elt = etree.fromstring(item_raw, parser)
+        item_id = item_elt.get("id")
+        try:
+            payload = item_elt[0]
+        except IndexError:
+            self.disp(_("Item has not payload"), 1)
+            return "", item_id
+        return etree.tostring(payload, encoding="unicode", pretty_print=True), item_id
+
+    async def start(self):
+        (
+            self.pubsub_service,
+            self.pubsub_node,
+            self.pubsub_item,
+            content_file_path,
+            content_file_obj,
+        ) = await self.get_item_path()
+        await self.run_editor("pubsub_editor_args", content_file_path, content_file_obj)
+        self.host.quit()
+
+
+class Rename(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "rename",
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.SINGLE_ITEM},
+            help=_("rename a pubsub item"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument("new_id", help=_("new item id to use"))
+
+    async def start(self):
+        try:
+            await self.host.bridge.ps_item_rename(
+                self.args.service,
+                self.args.node,
+                self.args.item,
+                self.args.new_id,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't rename item: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp("Item renamed")
+            self.host.quit(C.EXIT_OK)
+
+
+class Subscribe(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "subscribe",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            use_verbose=True,
+            help=_("subscribe to a node"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "--public",
+            action="store_true",
+            help=_("make the registration visible for everybody"),
+        )
+
+    async def start(self):
+        options = {}
+        if self.args.public:
+            namespaces = await self.host.bridge.namespaces_get()
+            try:
+                ns_pps = namespaces["pps"]
+            except KeyError:
+                self.disp(
+                    "Pubsub Public Subscription plugin is not loaded, can't use --public "
+                    "option, subscription stopped", error=True
+                )
+                self.host.quit(C.EXIT_MISSING_FEATURE)
+            else:
+                options[f"{{{ns_pps}}}public"] = True
+        try:
+            sub_id = await self.host.bridge.ps_subscribe(
+                self.args.service,
+                self.args.node,
+                data_format.serialise(options),
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(_("can't subscribe to node: {e}").format(e=e), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("subscription done"), 1)
+            if sub_id:
+                self.disp(_("subscription id: {sub_id}").format(sub_id=sub_id))
+            self.host.quit()
+
+
+class Unsubscribe(base.CommandBase):
+    # FIXME: check why we get a NodeNotFound on subscribe just after unsubscribe
+
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "unsubscribe",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            use_verbose=True,
+            help=_("unsubscribe from a node"),
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def start(self):
+        try:
+            await self.host.bridge.ps_unsubscribe(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(_("can't unsubscribe from node: {e}").format(e=e), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(_("subscription removed"), 1)
+            self.host.quit()
+
+
+class Subscriptions(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "subscriptions",
+            use_output=C.OUTPUT_LIST_DICT,
+            use_pubsub=True,
+            help=_("retrieve all subscriptions on a service"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "--public",
+            action="store_true",
+            help=_("get public subscriptions"),
+        )
+
+    async def start(self):
+        if self.args.public:
+            method = self.host.bridge.ps_public_subscriptions_get
+        else:
+            method = self.host.bridge.ps_subscriptions_get
+        try:
+            subscriptions = data_format.deserialise(
+                await method(
+                    self.args.service,
+                    self.args.node,
+                    self.profile,
+                ),
+                type_check=list
+            )
+        except Exception as e:
+            self.disp(_("can't retrieve subscriptions: {e}").format(e=e), error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.output(subscriptions)
+            self.host.quit()
+
+
+class Affiliations(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "affiliations",
+            use_output=C.OUTPUT_DICT,
+            use_pubsub=True,
+            help=_("retrieve all affiliations on a service"),
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def start(self):
+        try:
+            affiliations = await self.host.bridge.ps_affiliations_get(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't get node affiliations: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.output(affiliations)
+            self.host.quit()
+
+
+class Reference(base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "reference",
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.SINGLE_ITEM},
+            help=_("send a reference/mention to pubsub item"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-t",
+            "--type",
+            default="mention",
+            choices=("data", "mention"),
+            help=_("type of reference to send (DEFAULT: mention)"),
+        )
+        self.parser.add_argument(
+            "recipient",
+            help=_("recipient of the reference")
+        )
+
+    async def start(self):
+        service = self.args.service or await self.host.get_profile_jid()
+        if self.args.item:
+            anchor = uri.build_xmpp_uri(
+                "pubsub", path=service, node=self.args.node, item=self.args.item
+            )
+        else:
+            anchor = uri.build_xmpp_uri("pubsub", path=service, node=self.args.node)
+
+        try:
+            await self.host.bridge.reference_send(
+                self.args.recipient,
+                anchor,
+                self.args.type,
+                "",
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't send reference: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.host.quit()
+
+
+class Search(base.CommandBase):
+    """This command do a search without using MAM
+
+    This commands checks every items it finds by itself,
+    so it may be heavy in resources both for server and client
+    """
+
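+    # base flags used for every --regex filter (--ignore-case and --dot-all add more)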
+    RE_FLAGS = re.MULTILINE | re.UNICODE
+    EXEC_ACTIONS = ("exec", "external")
+
+    def __init__(self, host):
+        # FIXME: C.NO_MAX is not needed here, and this can be globally removed from consts
+        #        the only interest is to change the help string, but this can be explained
+        #        extensively in man pages (max is for each node found)
+        base.CommandBase.__init__(
+            self,
+            host,
+            "search",
+            use_output=C.OUTPUT_XML,
+            use_pubsub=True,
+            pubsub_flags={C.MULTI_ITEMS, C.NO_MAX},
+            use_verbose=True,
+            help=_("search items corresponding to filters"),
+        )
+
+    @property
+    def etree(self):
+        """load lxml.etree only if needed"""
+        if self._etree is None:
+            from lxml import etree
+
+            self._etree = etree
+        return self._etree
+
+    def filter_opt(self, value, type_):
+        return (type_, value)
+
+    def filter_flag(self, value, type_):
+        value = C.bool(value)
+        return (type_, value)
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-D",
+            "--max-depth",
+            type=int,
+            default=0,
+            help=_(
+                "maximum depth of recursion (will search linked nodes if > 0, "
+                "DEFAULT: 0)"
+            ),
+        )
+        self.parser.add_argument(
+            "-M",
+            "--node-max",
+            type=int,
+            default=30,
+            help=_(
+                "maximum number of items to get per node ({} to get all items, "
+                "DEFAULT: 30)".format(C.NO_LIMIT)
+            ),
+        )
+        self.parser.add_argument(
+            "-N",
+            "--namespace",
+            action="append",
+            nargs=2,
+            default=[],
+            metavar="NAME NAMESPACE",
+            help=_("namespace to use for xpath"),
+        )
+
+        # filters
+        filter_text = partial(self.filter_opt, type_="text")
+        filter_re = partial(self.filter_opt, type_="regex")
+        filter_xpath = partial(self.filter_opt, type_="xpath")
+        filter_python = partial(self.filter_opt, type_="python")
+        filters = self.parser.add_argument_group(
+            _("filters"),
+            _("only items corresponding to following filters will be kept"),
+        )
+        filters.add_argument(
+            "-t",
+            "--text",
+            action="append",
+            dest="filters",
+            type=filter_text,
+            metavar="TEXT",
+            help=_("full text filter, item must contain this string (XML included)"),
+        )
+        filters.add_argument(
+            "-r",
+            "--regex",
+            action="append",
+            dest="filters",
+            type=filter_re,
+            metavar="EXPRESSION",
+            help=_("like --text but using a regular expression"),
+        )
+        filters.add_argument(
+            "-x",
+            "--xpath",
+            action="append",
+            dest="filters",
+            type=filter_xpath,
+            metavar="XPATH",
+            help=_("filter items which has elements matching this xpath"),
+        )
+        filters.add_argument(
+            "-P",
+            "--python",
+            action="append",
+            dest="filters",
+            type=filter_python,
+            metavar="PYTHON_CODE",
+            help=_(
+                "Python expression which much return a bool (True to keep item, "
+                'False to reject it). "item" is raw text item, "item_xml" is '
+                "lxml's etree.Element"
+            ),
+        )
+
+        # filters flags
+        flag_case = partial(self.filter_flag, type_="ignore-case")
+        flag_invert = partial(self.filter_flag, type_="invert")
+        flag_dotall = partial(self.filter_flag, type_="dotall")
+        flag_matching = partial(self.filter_flag, type_="only-matching")
+        flags = self.parser.add_argument_group(
+            _("filters flags"),
+            _("filters modifiers (change behaviour of following filters)"),
+        )
+        flags.add_argument(
+            "-C",
+            "--ignore-case",
+            action="append",
+            dest="filters",
+            type=flag_case,
+            const=("ignore-case", True),
+            nargs="?",
+            metavar="BOOLEAN",
+            help=_("(don't) ignore case in following filters (DEFAULT: case sensitive)"),
+        )
+        flags.add_argument(
+            "-I",
+            "--invert",
+            action="append",
+            dest="filters",
+            type=flag_invert,
+            const=("invert", True),
+            nargs="?",
+            metavar="BOOLEAN",
+            help=_("(don't) invert effect of following filters (DEFAULT: don't invert)"),
+        )
+        flags.add_argument(
+            "-A",
+            "--dot-all",
+            action="append",
+            dest="filters",
+            type=flag_dotall,
+            const=("dotall", True),
+            nargs="?",
+            metavar="BOOLEAN",
+            help=_("(don't) use DOTALL option for regex (DEFAULT: don't use)"),
+        )
+        flags.add_argument(
+            "-k",
+            "--only-matching",
+            action="append",
+            dest="filters",
+            type=flag_matching,
+            const=("only-matching", True),
+            nargs="?",
+            metavar="BOOLEAN",
+            help=_("keep only the matching part of the item"),
+        )
+
+        # action
+        self.parser.add_argument(
+            "action",
+            default="print",
+            nargs="?",
+            choices=("print", "exec", "external"),
+            help=_("action to do on found items (DEFAULT: print)"),
+        )
+        self.parser.add_argument("command", nargs=argparse.REMAINDER)
+
+    async def get_items(self, depth, service, node, items):
+        self.to_get += 1
+        try:
+            ps_result = data_format.deserialise(
+                await self.host.bridge.ps_items_get(
+                    service,
+                    node,
+                    self.args.node_max,
+                    items,
+                    "",
+                    self.get_pubsub_extra(),
+                    self.profile,
+                )
+            )
+        except Exception as e:
+            self.disp(
+                f"can't get pubsub items at {service} (node: {node}): {e}",
+                error=True,
+            )
+            self.to_get -= 1
+        else:
+            await self.search(ps_result, depth)
+
+    def _check_pubsub_url(self, match, found_nodes):
+        """check that the matched URL is an xmpp: one
+
+        @param found_nodes(list[dict]): list of found pubsub nodes
+            this list is filled as pubsub xmpp: URIs are discovered
+        """
+        url = match.group(0)
+        if url.startswith("xmpp"):
+            try:
+                url_data = uri.parse_xmpp_uri(url)
+            except ValueError:
+                return
+            if url_data["type"] == "pubsub":
+                found_node = {"service": url_data["path"], "node": url_data["node"]}
+                if "item" in url_data:
+                    found_node["item"] = url_data["item"]
+                found_nodes.append(found_node)
+
+    async def get_sub_nodes(self, item, depth):
+        """look for pubsub URIs in item, and get_items on the linked nodes"""
+        found_nodes = []
+        checkURI = partial(self._check_pubsub_url, found_nodes=found_nodes)
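+        # NOTE: re.sub is used here only to iterate over URL matches; the callback
+        #       collects pubsub URIs in found_nodes and the substitution result is
+        #       discarded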
+        strings.RE_URL.sub(checkURI, item)
+        for data in found_nodes:
+            await self.get_items(
+                depth + 1,
+                data["service"],
+                data["node"],
+                [data["item"]] if "item" in data else [],
+            )
+
+    def parseXml(self, item):
+        try:
+            return self.etree.fromstring(item)
+        except self.etree.XMLSyntaxError:
+            self.disp(
+                _(
+                    "item doesn't looks like XML, you have probably used --only-matching "
+                    "somewhere before and we have no more XML"
+                ),
+                error=True,
+            )
+            self.host.quit(C.EXIT_BAD_ARG)
+
+    def filter(self, item):
+        """apply filters given on command line
+
+        if only-matching is used, item may be modified
+        @return (tuple[bool, unicode]): a tuple with:
+            - keep: True if item passed the filters
+            - item: it is returned in case of modifications
+        """
+        ignore_case = False
+        invert = False
+        dotall = False
+        only_matching = False
+        item_xml = None
+        for type_, value in self.args.filters:
+            keep = True
+
+            ## filters
+
+            if type_ == "text":
+                if ignore_case:
+                    if value.lower() not in item.lower():
+                        keep = False
+                else:
+                    if value not in item:
+                        keep = False
+                if keep and only_matching:
+                    # it doesn't really make sense to keep a fixed string,
+                    # so we raise an error
+                    self.host.disp(
+                        _("--only-matching used with fixed --text string, are you sure?"),
+                        error=True,
+                    )
+                    self.host.quit(C.EXIT_BAD_ARG)
+            elif type_ == "regex":
+                flags = self.RE_FLAGS
+                if ignore_case:
+                    flags |= re.IGNORECASE
+                if dotall:
+                    flags |= re.DOTALL
+                match = re.search(value, item, flags)
+                keep = match is not None
+                if keep and only_matching:
+                    item = match.group()
+                    item_xml = None
+            elif type_ == "xpath":
+                if item_xml is None:
+                    item_xml = self.parseXml(item)
+                try:
+                    elts = item_xml.xpath(value, namespaces=self.args.namespace)
+                except self.etree.XPathEvalError as e:
+                    self.disp(_("can't use xpath: {reason}").format(reason=e), error=True)
+                    self.host.quit(C.EXIT_BAD_ARG)
+                keep = bool(elts)
+                if keep and only_matching:
+                    item_xml = elts[0]
+                    try:
+                        item = self.etree.tostring(item_xml, encoding="unicode")
+                    except TypeError:
+                        # we have a string only, not an element
+                        item = str(item_xml)
+                        item_xml = None
+            elif type_ == "python":
+                if item_xml is None:
+                    item_xml = self.parseXml(item)
+                cmd_ns = {"etree": self.etree, "item": item, "item_xml": item_xml}
+                try:
+                    keep = eval(value, cmd_ns)
+                except SyntaxError as e:
+                    self.disp(str(e), error=True)
+                    self.host.quit(C.EXIT_BAD_ARG)
+
+            ## flags
+
+            elif type_ == "ignore-case":
+                ignore_case = value
+            elif type_ == "invert":
+                invert = value
+                # we need to continue, else invert would be applied to this flag
+                # itself and the loop would return just below
+                continue
+            elif type_ == "dotall":
+                dotall = value
+            elif type_ == "only-matching":
+                only_matching = value
+            else:
+                raise exceptions.InternalError(
+                    _("unknown filter type {type}").format(type=type_)
+                )
+
+            if invert:
+                keep = not keep
+            if not keep:
+                return False, item
+
+        return True, item
+
+    async def do_item_action(self, item, metadata):
+        """called when item has been kepts and the action need to be done
+
+        @param item(unicode): accepted item
+        """
+        action = self.args.action
+        if action == "print" or self.host.verbosity > 0:
+            try:
+                await self.output(item)
+            except self.etree.XMLSyntaxError:
+                # item is not valid XML, but a string
+                # can happen when --only-matching is used
+                self.disp(item)
+        if action in self.EXEC_ACTIONS:
+            item_elt = self.parseXml(item)
+            if action == "exec":
+                use = {
+                    "service": metadata["service"],
+                    "node": metadata["node"],
+                    "item": item_elt.get("id"),
+                    "profile": self.profile,
+                }
+                # we need to send a copy of self.args.command
+                # else it would be modified
+                parser_args, use_args = arg_tools.get_use_args(
+                    self.host, self.args.command, use, verbose=self.host.verbosity > 1
+                )
+                cmd_args = sys.argv[0:1] + parser_args + use_args
+            else:
+                cmd_args = self.args.command
+
+            self.disp(
+                "COMMAND: {command}".format(
+                    command=" ".join([arg_tools.escape(a) for a in cmd_args])
+                ),
+                2,
+            )
+            if action == "exec":
+                p = await asyncio.create_subprocess_exec(*cmd_args)
+                ret = await p.wait()
+            else:
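+                # "external" action: the raw item is piped to the command's stdin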
+                p = await asyncio.create_subprocess_exec(*cmd_args, stdin=subprocess.PIPE)
+                await p.communicate(item.encode(sys.getfilesystemencoding()))
+                ret = p.returncode
+            if ret != 0:
+                self.disp(
+                    A.color(
+                        C.A_FAILURE,
+                        _("executed command failed with exit code {ret}").format(ret=ret),
+                    )
+                )
+
+    async def search(self, ps_result, depth):
+        """callback of get_items
+
+        this method filters items, gets sub-nodes if needed,
+        does the requested action, and exits the command when everything is done
+        @param ps_result(dict): deserialised result of ps_items_get
+        @param depth(int): current depth level
+            0 for first node, 1 for first children, and so on
+        """
+        for item in ps_result["items"]:
+            if depth < self.args.max_depth:
+                await self.get_sub_nodes(item, depth)
+            keep, item = self.filter(item)
+            if not keep:
+                continue
+            await self.do_item_action(item, ps_result)
+
+        #  we check if we got all get_items results
+        self.to_get -= 1
+        if self.to_get == 0:
+            # yes, we can quit
+            self.host.quit()
+        assert self.to_get > 0
+
+    async def start(self):
+        if self.args.command:
+            if self.args.action not in self.EXEC_ACTIONS:
+                self.parser.error(
+                    _("Command can only be used with {actions} actions").format(
+                        actions=", ".join(self.EXEC_ACTIONS)
+                    )
+                )
+        else:
+            if self.args.action in self.EXEC_ACTIONS:
+                self.parser.error(_("you need to specify a command to execute"))
+        if not self.args.node:
+            # TODO: handle get service affiliations when node is not set
+            self.parser.error(_("empty node is not handled yet"))
+        # to_get is increased on each get and decreased on each answer;
+        # when it reaches 0 again, the command is finished
+        self.to_get = 0
+        self._etree = None
+        if self.args.filters is None:
+            self.args.filters = []
+        self.args.namespace = dict(
+            self.args.namespace + [("pubsub", "http://jabber.org/protocol/pubsub")]
+        )
+        await self.get_items(0, self.args.service, self.args.node, self.args.items)
+
+
+class Transform(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "transform",
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.MULTI_ITEMS},
+            help=_("modify items of a node using an external command/script"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "--apply",
+            action="store_true",
+            help=_("apply transformation (DEFAULT: do a dry run)"),
+        )
+        self.parser.add_argument(
+            "--admin",
+            action="store_true",
+            help=_("do a pubsub admin request, needed to change publisher"),
+        )
+        self.parser.add_argument(
+            "-I",
+            "--ignore-errors",
+            action="store_true",
+            help=_(
+                "if command return a non zero exit code, ignore the item and continue"
+            ),
+        )
+        self.parser.add_argument(
+            "-A",
+            "--all",
+            action="store_true",
+            help=_("get all items by looping over all pages using RSM"),
+        )
+        self.parser.add_argument(
+            "command_path",
+            help=_(
+                "path to the command to use. Will be called repetitivly with an "
+                "item as input. Output (full item XML) will be used as new one. "
+                'Return "DELETE" string to delete the item, and "SKIP" to ignore it'
+            ),
+        )
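+        # A minimal sketch of a transformation script (hypothetical example, not part
+        # of the project): it reads one item on stdin and writes the new item, or one
+        # of the magic words, on stdout:
+        #
+        #   #!/usr/bin/env python3
+        #   import sys
+        #   item = sys.stdin.read()
+        #   if "spam" in item:
+        #       print("DELETE")          # retract this item
+        #   else:
+        #       sys.stdout.write(item)   # republish the item unchanged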
+
+    async def ps_items_send_cb(self, item_ids, metadata):
+        if item_ids:
+            self.disp(
+                _("items published with ids {item_ids}").format(
+                    item_ids=", ".join(item_ids)
+                )
+            )
+        else:
+            self.disp(_("items published"))
+        if self.args.all:
+            return await self.handle_next_page(metadata)
+        else:
+            self.host.quit()
+
+    async def handle_next_page(self, metadata):
+        """Retrieve new page through RSM or quit if we're in the last page
+
+        use to handle --all option
+        @param metadata(dict): metadata as returned by ps_items_get
+        """
+        try:
+            last = metadata["rsm"]["last"]
+            index = int(metadata["rsm"]["index"])
+            count = int(metadata["rsm"]["count"])
+        except KeyError:
+            self.disp(
+                _("Can't retrieve all items, RSM metadata not available"), error=True
+            )
+            self.host.quit(C.EXIT_MISSING_FEATURE)
+        except ValueError as e:
+            self.disp(
+                _("Can't retrieve all items, bad RSM metadata: {msg}").format(msg=e),
+                error=True,
+            )
+            self.host.quit(C.EXIT_ERROR)
+
+        if index + self.args.rsm_max >= count:
+            self.disp(_("All items transformed"))
+            self.host.quit(0)
+
+        self.disp(
+            _("Retrieving next page ({page_idx}/{page_total})").format(
+                page_idx=int(index / self.args.rsm_max) + 1,
+                page_total=int(count / self.args.rsm_max),
+            )
+        )
+
+        extra = self.get_pubsub_extra()
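+        # RSM paging: ask for the items coming just after the last item of the
+        # previous page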
+        extra["rsm_after"] = last
+        try:
+            ps_result = data_format.deserialise(
+                await self.host.bridge.ps_items_get(
+                    self.args.service,
+                    self.args.node,
+                    self.args.rsm_max,
+                    self.args.items,
+                    "",
+                    data_format.serialise(extra),
+                    self.profile,
+                )
+            )
+        except Exception as e:
+            self.disp(f"can't retrieve items: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.ps_items_get_cb(ps_result)
+
+    async def ps_items_get_cb(self, ps_result):
+        encoding = "utf-8"
+        new_items = []
+
+        for item in ps_result["items"]:
+            if self.check_duplicates:
+                # this is used when we are not ordering by creation
+                # to avoid infinite loop
+                item_elt, __ = xml_tools.etree_parse(self, item)
+                item_id = item_elt.get("id")
+                if item_id in self.items_ids:
+                    self.disp(
+                        _(
+                            "Duplicate found on item {item_id}, we have probably handled "
+                            "all items."
+                        ).format(item_id=item_id)
+                    )
+                    self.host.quit()
+                self.items_ids.append(item_id)
+
+            # we launch the command to filter the item
+            try:
+                p = await asyncio.create_subprocess_exec(
+                    self.args.command_path, stdin=subprocess.PIPE, stdout=subprocess.PIPE
+                )
+            except OSError as e:
+                exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR
+                self.disp(f"Can't execute the command: {e}", error=True)
+                self.host.quit(exit_code)
+            encoding = "utf-8"
+            cmd_std_out, cmd_std_err = await p.communicate(item.encode(encoding))
+            ret = p.returncode
+            if ret != 0:
+                self.disp(
+                    f"The command returned a non zero status while parsing the "
+                    f"following item:\n\n{item}",
+                    error=True,
+                )
+                if self.args.ignore_errors:
+                    continue
+                else:
+                    self.host.quit(C.EXIT_CMD_ERROR)
+            if cmd_std_err is not None:
+                cmd_std_err = cmd_std_err.decode(encoding, errors="ignore")
+                self.disp(cmd_std_err, error=True)
+            cmd_std_out = cmd_std_out.decode(encoding).strip()
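+            # "DELETE" and "SKIP" are the magic outputs documented in --help: the
+            # former retracts the item (only when --apply is set), the latter leaves
+            # it untouched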
+            if cmd_std_out == "DELETE":
+                item_elt, __ = xml_tools.etree_parse(self, item)
+                item_id = item_elt.get("id")
+                self.disp(_("Deleting item {item_id}").format(item_id=item_id))
+                if self.args.apply:
+                    try:
+                        await self.host.bridge.ps_item_retract(
+                            self.args.service,
+                            self.args.node,
+                            item_id,
+                            False,
+                            self.profile,
+                        )
+                    except Exception as e:
+                        self.disp(f"can't delete item {item_id}: {e}", error=True)
+                        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+                continue
+            elif cmd_std_out == "SKIP":
+                item_elt, __ = xml_tools.etree_parse(self, item)
+                item_id = item_elt.get("id")
+                self.disp(_("Skipping item {item_id}").format(item_id=item_id))
+                continue
+            element, etree = xml_tools.etree_parse(self, cmd_std_out)
+
+            # at this point the command has been run and we have an etree.Element object
+            if element.tag not in ("item", "{http://jabber.org/protocol/pubsub}item"):
+                self.disp(
+                    "your script must return a whole item, this is not:\n{xml}".format(
+                        xml=etree.tostring(element, encoding="unicode")
+                    ),
+                    error=True,
+                )
+                self.host.quit(C.EXIT_DATA_ERROR)
+
+            if not self.args.apply:
+                # we have a dry run, we just display filtered items
+                serialised = etree.tostring(
+                    element, encoding="unicode", pretty_print=True
+                )
+                self.disp(serialised)
+            else:
+                new_items.append(etree.tostring(element, encoding="unicode"))
+
+        if not self.args.apply:
+            # on dry run we have nothing to wait for, we can quit
+            if self.args.all:
+                return await self.handle_next_page(ps_result)
+            self.host.quit()
+        else:
+            if self.args.admin:
+                bridge_method = self.host.bridge.ps_admin_items_send
+            else:
+                bridge_method = self.host.bridge.ps_items_send
+
+            try:
+                ps_items_send_result = await bridge_method(
+                    self.args.service,
+                    self.args.node,
+                    new_items,
+                    "",
+                    self.profile,
+                )
+            except Exception as e:
+                self.disp(f"can't send item: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+            else:
+                await self.ps_items_send_cb(ps_items_send_result, metadata=ps_result)
+
+    async def start(self):
+        if self.args.all and self.args.order_by != C.ORDER_BY_CREATION:
+            self.check_duplicates = True
+            self.items_ids = []
+            self.disp(
+                A.color(
+                    A.FG_RED,
+                    A.BOLD,
+                    '/!\\ "--all" should be used with "--order-by creation" /!\\\n',
+                    A.RESET,
+                    "We'll update items, so order may change during transformation,\n"
+                    "we'll try to mitigate that by stopping on first duplicate,\n"
+                    "but this method is not safe, and some items may be missed.\n---\n",
+                )
+            )
+        else:
+            self.check_duplicates = False
+
+        try:
+            ps_result = data_format.deserialise(
+                await self.host.bridge.ps_items_get(
+                    self.args.service,
+                    self.args.node,
+                    self.args.max,
+                    self.args.items,
+                    "",
+                    self.get_pubsub_extra(),
+                    self.profile,
+                )
+            )
+        except Exception as e:
+            self.disp(f"can't retrieve items: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.ps_items_get_cb(ps_result)
+
+
+class Uri(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "uri",
+            use_profile=False,
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.SINGLE_ITEM},
+            help=_("build URI"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-p",
+            "--profile",
+            default=C.PROF_KEY_DEFAULT,
+            help=_("profile (used when no server is specified)"),
+        )
+
+    def display_uri(self, jid_):
+        uri_args = {}
+        if not self.args.service:
+            self.args.service = jid.JID(jid_).bare
+
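+        # in an xmpp: URI, the service JID is carried by the "path" component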
+        for key in ("node", "service", "item"):
+            value = getattr(self.args, key)
+            if key == "service":
+                key = "path"
+            if value:
+                uri_args[key] = value
+        self.disp(uri.build_xmpp_uri("pubsub", **uri_args))
+        self.host.quit()
+
+    async def start(self):
+        if not self.args.service:
+            try:
+                jid_ = await self.host.bridge.param_get_a_async(
+                    "JabberID", "Connection", profile_key=self.args.profile
+                )
+            except Exception as e:
+                self.disp(f"can't retrieve jid: {e}", error=True)
+                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+            else:
+                self.display_uri(jid_)
+        else:
+            self.display_uri(None)
+
+
+class AttachmentGet(base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "get",
+            use_output=C.OUTPUT_LIST_DICT,
+            use_pubsub=True,
+            pubsub_flags={C.SERVICE, C.NODE, C.SINGLE_ITEM},
+            help=_("get data attached to an item"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-j",
+            "--jid",
+            action="append",
+            dest="jids",
+            help=_(
+                "get attached data published only by those JIDs (DEFAULT: get all "
+                "attached data)"
+            )
+        )
+
+    async def start(self):
+        try:
+            attached_data, __ = await self.host.bridge.ps_attachments_get(
+                self.args.service,
+                self.args.node,
+                self.args.item,
+                self.args.jids or [],
+                "",
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't get attached data: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            attached_data = data_format.deserialise(attached_data, type_check=list)
+            await self.output(attached_data)
+            self.host.quit(C.EXIT_OK)
+
+
+class AttachmentSet(base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "set",
+            use_pubsub=True,
+            pubsub_flags={C.SERVICE, C.NODE, C.SINGLE_ITEM},
+            help=_("attach data to an item"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "--replace",
+            action="store_true",
+            help=_(
+                "replace previous versions of attachments (DEFAULT: update previous "
+                "version)"
+            )
+        )
+        self.parser.add_argument(
+            "-N",
+            "--noticed",
+            metavar="BOOLEAN",
+            nargs="?",
+            default="keep",
+            help=_("mark item as (un)noticed (DEFAULT: keep current value))")
+        )
+        self.parser.add_argument(
+            "-r",
+            "--reactions",
+            # FIXME: to be replaced by "extend" when we stop supporting python 3.7
+            action="append",
+            help=_("emojis to add to react to an item")
+        )
+        self.parser.add_argument(
+            "-R",
+            "--reactions-remove",
+            # FIXME: to be replaced by "extend" when we stop supporting python 3.7
+            action="append",
+            help=_("emojis to remove from reactions to an item")
+        )
+
+    async def start(self):
+        attachments_data = {
+            "service": self.args.service,
+            "node": self.args.node,
+            "id": self.args.item,
+            "extra": {}
+        }
+        operation = "replace" if self.args.replace else "update"
+        if self.args.noticed != "keep":
+            if self.args.noticed is None:
+                self.args.noticed = C.BOOL_TRUE
+            attachments_data["extra"]["noticed"] = C.bool(self.args.noticed)
+
+        if self.args.reactions or self.args.reactions_remove:
+            reactions = attachments_data["extra"]["reactions"] = {
+                "operation": operation
+            }
+            if self.args.replace:
+                reactions["reactions"] = self.args.reactions
+            else:
+                reactions["add"] = self.args.reactions
+                reactions["remove"] = self.args.reactions_remove
+
+        if not attachments_data["extra"]:
+            self.parser.error(_("At leat one attachment must be specified."))
+
+        try:
+            await self.host.bridge.ps_attachments_set(
+                data_format.serialise(attachments_data),
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't attach data to item: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp("data attached")
+            self.host.quit(C.EXIT_OK)
+
+
+class Attachments(base.CommandBase):
+    subcommands = (AttachmentGet, AttachmentSet)
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "attachments",
+            use_profile=False,
+            help=_("set or retrieve items attachments"),
+        )
+
+
+class SignatureSign(base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "sign",
+            use_pubsub=True,
+            pubsub_flags={C.NODE, C.SINGLE_ITEM},
+            help=_("sign an item"),
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def start(self):
+        attachments_data = {
+            "service": self.args.service,
+            "node": self.args.node,
+            "id": self.args.item,
+            "extra": {
+                # we set None to use profile's bare JID
+                "signature": {"signer": None}
+            }
+        }
+        try:
+            await self.host.bridge.ps_attachments_set(
+                data_format.serialise(attachments_data),
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't sign the item: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(f"item {self.args.item!r} has been signed")
+            self.host.quit(C.EXIT_OK)
+
+
+class SignatureCheck(base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "check",
+            use_output=C.OUTPUT_DICT,
+            use_pubsub=True,
+            pubsub_flags={C.SERVICE, C.NODE, C.SINGLE_ITEM},
+            help=_("check the validity of pubsub signature"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "signature",
+            metavar="JSON",
+            help=_("signature data")
+        )
+
+    async def start(self):
+        try:
+            ret_s = await self.host.bridge.ps_signature_check(
+                self.args.service,
+                self.args.node,
+                self.args.item,
+                self.args.signature,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't check signature: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            await self.output(data_format.deserialise(ret_s))
+            self.host.quit()
+
+
+class Signature(base.CommandBase):
+    subcommands = (
+        SignatureSign,
+        SignatureCheck,
+    )
+
+    def __init__(self, host):
+        super().__init__(
+            host, "signature", use_profile=False, help=_("items signatures")
+        )
+
+
+class SecretShare(base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "share",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("share a secret to let other entity encrypt or decrypt items"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-k", "--key", metavar="ID", dest="secret_ids", action="append", default=[],
+            help=_(
+                "only share secrets with those IDs (default: share all secrets of the "
+                "node)"
+            )
+        )
+        self.parser.add_argument(
+            "recipient", metavar="JID", help=_("entity who must get the shared secret")
+        )
+
+    async def start(self):
+        try:
+            await self.host.bridge.ps_secret_share(
+                self.args.recipient,
+                self.args.service,
+                self.args.node,
+                self.args.secret_ids,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't share secret: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp("secrets have been shared")
+            self.host.quit(C.EXIT_OK)
+
+
+class SecretRevoke(base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "revoke",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("revoke an encrypted node secret"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "secret_id", help=_("ID of the secrets to revoke")
+        )
+        self.parser.add_argument(
+            "-r", "--recipient", dest="recipients", metavar="JID", action="append",
+            default=[], help=_(
+                "entity who must get the revocation notification (default: send to all "
+                "entities known to have the shared secret)"
+            )
+        )
+
+    async def start(self):
+        try:
+            await self.host.bridge.ps_secret_revoke(
+                self.args.service,
+                self.args.node,
+                self.args.secret_id,
+                self.args.recipients,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't revoke secret: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp("secret {self.args.secret_id} has been revoked.")
+            self.host.quit(C.EXIT_OK)
+
+
+class SecretRotate(base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "rotate",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("revoke existing secrets, create a new one and send notifications"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-r", "--recipient", dest="recipients", metavar="JID", action="append",
+            default=[], help=_(
+                "entity who must get the revocation and shared secret notifications "
+                "(default: send to all entities known to have the shared secret)"
+            )
+        )
+
+    async def start(self):
+        try:
+            await self.host.bridge.ps_secret_rotate(
+                self.args.service,
+                self.args.node,
+                self.args.recipients,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't rotate secret: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp("secret has been rotated")
+            self.host.quit(C.EXIT_OK)
+
+
+class SecretList(base.CommandBase):
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "list",
+            use_pubsub=True,
+            use_verbose=True,
+            pubsub_flags={C.NODE},
+            help=_("list known secrets for a pubsub node"),
+            use_output=C.OUTPUT_LIST_DICT
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def start(self):
+        try:
+            secrets = data_format.deserialise(await self.host.bridge.ps_secrets_list(
+                self.args.service,
+                self.args.node,
+                self.profile,
+            ), type_check=list)
+        except Exception as e:
+            self.disp(f"can't list node secrets: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            if not self.verbosity:
+                # we don't print the key if verbosity is not at least one, to avoid showing it
+                # on the screen accidentally
+                for secret in secrets:
+                    del secret["key"]
+            await self.output(secrets)
+            self.host.quit(C.EXIT_OK)
+
+
+class Secret(base.CommandBase):
+    subcommands = (SecretShare, SecretRevoke, SecretRotate, SecretList)
+
+    def __init__(self, host):
+        super().__init__(
+            host,
+            "secret",
+            use_profile=False,
+            help=_("handle encrypted nodes secrets"),
+        )
+
+
+class HookCreate(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "create",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("create a Pubsub hook"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-t",
+            "--type",
+            default="python",
+            choices=("python", "python_file", "python_code"),
+            help=_("hook type"),
+        )
+        self.parser.add_argument(
+            "-P",
+            "--persistent",
+            action="store_true",
+            help=_("make hook persistent across restarts"),
+        )
+        self.parser.add_argument(
+            "hook_arg",
+            help=_("argument of the hook (depend of the type)"),
+        )
+
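+    # NOTE: declared as a staticmethod taking an explicit command instance, so that
+    #       HookDelete can reuse it (cf. HookDelete.start)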
+    @staticmethod
+    def check_args(self):
+        if self.args.type == "python_file":
+            self.args.hook_arg = os.path.abspath(self.args.hook_arg)
+            if not os.path.isfile(self.args.hook_arg):
+                self.parser.error(
+                    _("{path} is not a file").format(path=self.args.hook_arg)
+                )
+
+    async def start(self):
+        self.check_args(self)
+        try:
+            await self.host.bridge.ps_hook_add(
+                self.args.service,
+                self.args.node,
+                self.args.type,
+                self.args.hook_arg,
+                self.args.persistent,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't create hook: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.host.quit()
+
+
+class HookDelete(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "delete",
+            use_pubsub=True,
+            pubsub_flags={C.NODE},
+            help=_("delete a Pubsub hook"),
+        )
+
+    def add_parser_options(self):
+        self.parser.add_argument(
+            "-t",
+            "--type",
+            default="",
+            choices=("", "python", "python_file", "python_code"),
+            help=_("hook type to remove, empty to remove all (DEFAULT: remove all)"),
+        )
+        self.parser.add_argument(
+            "-a",
+            "--arg",
+            dest="hook_arg",
+            default="",
+            help=_(
+                "argument of the hook to remove, empty to remove all (DEFAULT: remove all)"
+            ),
+        )
+
+    async def start(self):
+        HookCreate.check_args(self)
+        try:
+            nb_deleted = await self.host.bridge.ps_hook_remove(
+                self.args.service,
+                self.args.node,
+                self.args.type,
+                self.args.hook_arg,
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't delete hook: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            self.disp(
+                _("{nb_deleted} hook(s) have been deleted").format(nb_deleted=nb_deleted)
+            )
+            self.host.quit()
+
+
+class HookList(base.CommandBase):
+    def __init__(self, host):
+        base.CommandBase.__init__(
+            self,
+            host,
+            "list",
+            use_output=C.OUTPUT_LIST_DICT,
+            help=_("list hooks of a profile"),
+        )
+
+    def add_parser_options(self):
+        pass
+
+    async def start(self):
+        try:
+            data = await self.host.bridge.ps_hook_list(
+                self.profile,
+            )
+        except Exception as e:
+            self.disp(f"can't list hooks: {e}", error=True)
+            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
+        else:
+            if not data:
+                self.disp(_("No hook found."))
+            await self.output(data)
+            self.host.quit()
+
+
+class Hook(base.CommandBase):
+    subcommands = (HookCreate, HookDelete, HookList)
+
+    def __init__(self, host):
+        super(Hook, self).__init__(
+            host,
+            "hook",
+            use_profile=False,
+            use_verbose=True,
+            help=_("trigger action on Pubsub notifications"),
+        )
+
+
+class Pubsub(base.CommandBase):
+    subcommands = (
+        Set,
+        Get,
+        Delete,
+        Edit,
+        Rename,
+        Subscribe,
+        Unsubscribe,
+        Subscriptions,
+        Affiliations,
+        Reference,
+        Search,
+        Transform,
+        Attachments,
+        Signature,
+        Secret,
+        Hook,
+        Uri,
+        Node,
+        Cache,
+    )
+
+    def __init__(self, host):
+        super(Pubsub, self).__init__(
+            host, "pubsub", use_profile=False, help=_("PubSub nodes/items management")
+        )