Mercurial > libervia-backend
diff libervia/cli/cmd_pubsub.py @ 4075:47401850dec6
refactoring: rename `libervia.frontends.jp` to `libervia.cli`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 14:54:26 +0200 |
parents | libervia/frontends/jp/cmd_pubsub.py@26b7ed2817da |
children | 3f7ca590a5da |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/cli/cmd_pubsub.py Fri Jun 02 14:54:26 2023 +0200 @@ -0,0 +1,3030 @@ +#!/usr/bin/env python3 + + +# Libervia CLI +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import argparse +import os.path +import re +import sys +import subprocess +import asyncio +import json +from . import base +from libervia.backend.core.i18n import _ +from libervia.backend.core import exceptions +from libervia.cli.constants import Const as C +from libervia.cli import common +from libervia.cli import arg_tools +from libervia.cli import xml_tools +from functools import partial +from libervia.backend.tools.common import data_format +from libervia.backend.tools.common import uri +from libervia.backend.tools.common.ansi import ANSI as A +from libervia.backend.tools.common import date_utils +from libervia.frontends.tools import jid, strings +from libervia.frontends.bridge.bridge_frontend import BridgeException + +__commands__ = ["Pubsub"] + +PUBSUB_TMP_DIR = "pubsub" +PUBSUB_SCHEMA_TMP_DIR = PUBSUB_TMP_DIR + "_schema" +ALLOWED_SUBSCRIPTIONS_OWNER = ("subscribed", "pending", "none") + +# TODO: need to split this class in several modules, plugin should handle subcommands + + +class NodeInfo(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "info", + use_output=C.OUTPUT_DICT, + use_pubsub=True, + pubsub_flags={C.NODE}, + help=_("retrieve node configuration"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-k", + "--key", + action="append", + dest="keys", + help=_("data key to filter"), + ) + + def remove_prefix(self, key): + return key[7:] if key.startswith("pubsub#") else key + + def filter_key(self, key): + return any((key == k or key == "pubsub#" + k) for k in self.args.keys) + + async def start(self): + try: + config_dict = await self.host.bridge.ps_node_configuration_get( + self.args.service, + self.args.node, + self.profile, + ) + except BridgeException as e: + if e.condition == "item-not-found": + self.disp( + f"The node {self.args.node} doesn't exist on {self.args.service}", + error=True, + ) + self.host.quit(C.EXIT_NOT_FOUND) + else: + self.disp(f"can't get node configuration: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + except Exception as e: + self.disp(f"Internal error: {e}", error=True) + self.host.quit(C.EXIT_INTERNAL_ERROR) + else: + key_filter = (lambda k: True) if not self.args.keys else self.filter_key + config_dict = { + self.remove_prefix(k): v for k, v in config_dict.items() if key_filter(k) + } + await self.output(config_dict) + self.host.quit() + + +class NodeCreate(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "create", + use_output=C.OUTPUT_DICT, + use_pubsub=True, + pubsub_flags={C.NODE}, + use_verbose=True, + help=_("create a node"), + ) + + @staticmethod + def add_node_config_options(parser): + parser.add_argument( + "-f", + "--field", + action="append", + nargs=2, + dest="fields", + default=[], + metavar=("KEY", "VALUE"), + help=_("configuration field to set"), + ) + parser.add_argument( + "-F", + "--full-prefix", + action="store_true", + help=_('don\'t prepend "pubsub#" prefix to field names'), + ) + + def add_parser_options(self): + self.add_node_config_options(self.parser) + + @staticmethod + def get_config_options(args): + if not args.full_prefix: + return {"pubsub#" + k: v for k, v in args.fields} + else: + return dict(args.fields) + + async def start(self): + options = self.get_config_options(self.args) + try: + node_id = await self.host.bridge.ps_node_create( + self.args.service, + self.args.node, + options, + self.profile, + ) + except Exception as e: + self.disp(msg=_("can't create node: {e}").format(e=e), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + if self.host.verbosity: + announce = _("node created successfully: ") + else: + announce = "" + self.disp(announce + node_id) + self.host.quit() + + +class NodePurge(base.CommandBase): + def __init__(self, host): + super(NodePurge, self).__init__( + host, + "purge", + use_pubsub=True, + pubsub_flags={C.NODE}, + help=_("purge a node (i.e. remove all items from it)"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-f", + "--force", + action="store_true", + help=_("purge node without confirmation"), + ) + + async def start(self): + if not self.args.force: + if not self.args.service: + message = _( + "Are you sure to purge PEP node [{node}]? This will " + "delete ALL items from it!" + ).format(node=self.args.node) + else: + message = _( + "Are you sure to delete node [{node}] on service " + "[{service}]? This will delete ALL items from it!" + ).format(node=self.args.node, service=self.args.service) + await self.host.confirm_or_quit(message, _("node purge cancelled")) + + try: + await self.host.bridge.ps_node_purge( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(msg=_("can't purge node: {e}").format(e=e), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("node [{node}] purged successfully").format(node=self.args.node)) + self.host.quit() + + +class NodeDelete(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "delete", + use_pubsub=True, + pubsub_flags={C.NODE}, + help=_("delete a node"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-f", + "--force", + action="store_true", + help=_("delete node without confirmation"), + ) + + async def start(self): + if not self.args.force: + if not self.args.service: + message = _("Are you sure to delete PEP node [{node}] ?").format( + node=self.args.node + ) + else: + message = _( + "Are you sure to delete node [{node}] on " "service [{service}]?" + ).format(node=self.args.node, service=self.args.service) + await self.host.confirm_or_quit(message, _("node deletion cancelled")) + + try: + await self.host.bridge.ps_node_delete( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(f"can't delete node: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("node [{node}] deleted successfully").format(node=self.args.node)) + self.host.quit() + + +class NodeSet(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "set", + use_output=C.OUTPUT_DICT, + use_pubsub=True, + pubsub_flags={C.NODE}, + use_verbose=True, + help=_("set node configuration"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-f", + "--field", + action="append", + nargs=2, + dest="fields", + required=True, + metavar=("KEY", "VALUE"), + help=_("configuration field to set (required)"), + ) + self.parser.add_argument( + "-F", + "--full-prefix", + action="store_true", + help=_('don\'t prepend "pubsub#" prefix to field names'), + ) + + def get_key_name(self, k): + if self.args.full_prefix or k.startswith("pubsub#"): + return k + else: + return "pubsub#" + k + + async def start(self): + try: + await self.host.bridge.ps_node_configuration_set( + self.args.service, + self.args.node, + {self.get_key_name(k): v for k, v in self.args.fields}, + self.profile, + ) + except Exception as e: + self.disp(f"can't set node configuration: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("node configuration successful"), 1) + self.host.quit() + + +class NodeImport(base.CommandBase): + def __init__(self, host): + super(NodeImport, self).__init__( + host, + "import", + use_pubsub=True, + pubsub_flags={C.NODE}, + help=_("import raw XML to a node"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "--admin", + action="store_true", + help=_("do a pubsub admin request, needed to change publisher"), + ) + self.parser.add_argument( + "import_file", + type=argparse.FileType(), + help=_( + "path to the XML file with data to import. The file must contain " + "whole XML of each item to import." + ), + ) + + async def start(self): + try: + element, etree = xml_tools.etree_parse( + self, self.args.import_file, reraise=True + ) + except Exception as e: + from lxml.etree import XMLSyntaxError + + if isinstance(e, XMLSyntaxError) and e.code == 5: + # we have extra content, this probaby means that item are not wrapped + # so we wrap them here and try again + self.args.import_file.seek(0) + xml_buf = "<import>" + self.args.import_file.read() + "</import>" + element, etree = xml_tools.etree_parse(self, xml_buf) + + # we reverse element as we expect to have most recently published element first + # TODO: make this more explicit and add an option + element[:] = reversed(element) + + if not all([i.tag == "{http://jabber.org/protocol/pubsub}item" for i in element]): + self.disp( + _("You are not using list of pubsub items, we can't import this file"), + error=True, + ) + self.host.quit(C.EXIT_DATA_ERROR) + return + + items = [etree.tostring(i, encoding="unicode") for i in element] + if self.args.admin: + method = self.host.bridge.ps_admin_items_send + else: + self.disp( + _( + "Items are imported without using admin mode, publisher can't " + "be changed" + ) + ) + method = self.host.bridge.ps_items_send + + try: + items_ids = await method( + self.args.service, + self.args.node, + items, + "", + self.profile, + ) + except Exception as e: + self.disp(f"can't send items: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + if items_ids: + self.disp( + _("items published with id(s) {items_ids}").format( + items_ids=", ".join(items_ids) + ) + ) + else: + self.disp(_("items published")) + self.host.quit() + + +class NodeAffiliationsGet(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "get", + use_output=C.OUTPUT_DICT, + use_pubsub=True, + pubsub_flags={C.NODE}, + help=_("retrieve node affiliations (for node owner)"), + ) + + def add_parser_options(self): + pass + + async def start(self): + try: + affiliations = await self.host.bridge.ps_node_affiliations_get( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(f"can't get node affiliations: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.output(affiliations) + self.host.quit() + + +class NodeAffiliationsSet(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "set", + use_pubsub=True, + pubsub_flags={C.NODE}, + use_verbose=True, + help=_("set affiliations (for node owner)"), + ) + + def add_parser_options(self): + # XXX: we use optional argument syntax for a required one because list of list of 2 elements + # (used to construct dicts) don't work with positional arguments + self.parser.add_argument( + "-a", + "--affiliation", + dest="affiliations", + metavar=("JID", "AFFILIATION"), + required=True, + action="append", + nargs=2, + help=_("entity/affiliation couple(s)"), + ) + + async def start(self): + affiliations = dict(self.args.affiliations) + try: + await self.host.bridge.ps_node_affiliations_set( + self.args.service, + self.args.node, + affiliations, + self.profile, + ) + except Exception as e: + self.disp(f"can't set node affiliations: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("affiliations have been set"), 1) + self.host.quit() + + +class NodeAffiliations(base.CommandBase): + subcommands = (NodeAffiliationsGet, NodeAffiliationsSet) + + def __init__(self, host): + super(NodeAffiliations, self).__init__( + host, + "affiliations", + use_profile=False, + help=_("set or retrieve node affiliations"), + ) + + +class NodeSubscriptionsGet(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "get", + use_output=C.OUTPUT_DICT, + use_pubsub=True, + pubsub_flags={C.NODE}, + help=_("retrieve node subscriptions (for node owner)"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "--public", + action="store_true", + help=_("get public subscriptions"), + ) + + async def start(self): + if self.args.public: + method = self.host.bridge.ps_public_node_subscriptions_get + else: + method = self.host.bridge.ps_node_subscriptions_get + try: + subscriptions = await method( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(f"can't get node subscriptions: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.output(subscriptions) + self.host.quit() + + +class StoreSubscriptionAction(argparse.Action): + """Action which handle subscription parameter for owner + + list is given by pairs: jid and subscription state + if subscription state is not specified, it default to "subscribed" + """ + + def __call__(self, parser, namespace, values, option_string): + dest_dict = getattr(namespace, self.dest) + while values: + jid_s = values.pop(0) + try: + subscription = values.pop(0) + except IndexError: + subscription = "subscribed" + if subscription not in ALLOWED_SUBSCRIPTIONS_OWNER: + parser.error( + _("subscription must be one of {}").format( + ", ".join(ALLOWED_SUBSCRIPTIONS_OWNER) + ) + ) + dest_dict[jid_s] = subscription + + +class NodeSubscriptionsSet(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "set", + use_pubsub=True, + pubsub_flags={C.NODE}, + use_verbose=True, + help=_("set/modify subscriptions (for node owner)"), + ) + + def add_parser_options(self): + # XXX: we use optional argument syntax for a required one because list of list of 2 elements + # (uses to construct dicts) don't work with positional arguments + self.parser.add_argument( + "-S", + "--subscription", + dest="subscriptions", + default={}, + nargs="+", + metavar=("JID [SUSBSCRIPTION]"), + required=True, + action=StoreSubscriptionAction, + help=_("entity/subscription couple(s)"), + ) + + async def start(self): + try: + self.host.bridge.ps_node_subscriptions_set( + self.args.service, + self.args.node, + self.args.subscriptions, + self.profile, + ) + except Exception as e: + self.disp(f"can't set node subscriptions: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("subscriptions have been set"), 1) + self.host.quit() + + +class NodeSubscriptions(base.CommandBase): + subcommands = (NodeSubscriptionsGet, NodeSubscriptionsSet) + + def __init__(self, host): + super(NodeSubscriptions, self).__init__( + host, + "subscriptions", + use_profile=False, + help=_("get or modify node subscriptions"), + ) + + +class NodeSchemaSet(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "set", + use_pubsub=True, + pubsub_flags={C.NODE}, + use_verbose=True, + help=_("set/replace a schema"), + ) + + def add_parser_options(self): + self.parser.add_argument("schema", help=_("schema to set (must be XML)")) + + async def start(self): + try: + await self.host.bridge.ps_schema_set( + self.args.service, + self.args.node, + self.args.schema, + self.profile, + ) + except Exception as e: + self.disp(f"can't set schema: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("schema has been set"), 1) + self.host.quit() + + +class NodeSchemaEdit(base.CommandBase, common.BaseEdit): + use_items = False + + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "edit", + use_pubsub=True, + pubsub_flags={C.NODE}, + use_draft=True, + use_verbose=True, + help=_("edit a schema"), + ) + common.BaseEdit.__init__(self, self.host, PUBSUB_SCHEMA_TMP_DIR) + + def add_parser_options(self): + pass + + async def publish(self, schema): + try: + await self.host.bridge.ps_schema_set( + self.args.service, + self.args.node, + schema, + self.profile, + ) + except Exception as e: + self.disp(f"can't set schema: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("schema has been set"), 1) + self.host.quit() + + async def ps_schema_get_cb(self, schema): + try: + from lxml import etree + except ImportError: + self.disp( + "lxml module must be installed to use edit, please install it " + 'with "pip install lxml"', + error=True, + ) + self.host.quit(1) + content_file_obj, content_file_path = self.get_tmp_file() + schema = schema.strip() + if schema: + parser = etree.XMLParser(remove_blank_text=True) + schema_elt = etree.fromstring(schema, parser) + content_file_obj.write( + etree.tostring(schema_elt, encoding="utf-8", pretty_print=True) + ) + content_file_obj.seek(0) + await self.run_editor( + "pubsub_schema_editor_args", content_file_path, content_file_obj + ) + + async def start(self): + try: + schema = await self.host.bridge.ps_schema_get( + self.args.service, + self.args.node, + self.profile, + ) + except BridgeException as e: + if e.condition == "item-not-found" or e.classname == "NotFound": + schema = "" + else: + self.disp(f"can't edit schema: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + + await self.ps_schema_get_cb(schema) + + +class NodeSchemaGet(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "get", + use_output=C.OUTPUT_XML, + use_pubsub=True, + pubsub_flags={C.NODE}, + use_verbose=True, + help=_("get schema"), + ) + + def add_parser_options(self): + pass + + async def start(self): + try: + schema = await self.host.bridge.ps_schema_get( + self.args.service, + self.args.node, + self.profile, + ) + except BridgeException as e: + if e.condition == "item-not-found" or e.classname == "NotFound": + schema = None + else: + self.disp(f"can't get schema: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + + if schema: + await self.output(schema) + self.host.quit() + else: + self.disp(_("no schema found"), 1) + self.host.quit(C.EXIT_NOT_FOUND) + + +class NodeSchema(base.CommandBase): + subcommands = (NodeSchemaSet, NodeSchemaEdit, NodeSchemaGet) + + def __init__(self, host): + super(NodeSchema, self).__init__( + host, "schema", use_profile=False, help=_("data schema manipulation") + ) + + +class Node(base.CommandBase): + subcommands = ( + NodeInfo, + NodeCreate, + NodePurge, + NodeDelete, + NodeSet, + NodeImport, + NodeAffiliations, + NodeSubscriptions, + NodeSchema, + ) + + def __init__(self, host): + super(Node, self).__init__( + host, "node", use_profile=False, help=_("node handling") + ) + + +class CacheGet(base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "get", + use_output=C.OUTPUT_LIST_XML, + use_pubsub=True, + pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE}, + help=_("get pubsub item(s) from cache"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-S", + "--sub-id", + default="", + help=_("subscription id"), + ) + + async def start(self): + try: + ps_result = data_format.deserialise( + await self.host.bridge.ps_cache_get( + self.args.service, + self.args.node, + self.args.max, + self.args.items, + self.args.sub_id, + self.get_pubsub_extra(), + self.profile, + ) + ) + except BridgeException as e: + if e.classname == "NotFound": + self.disp( + f"The node {self.args.node} from {self.args.service} is not in cache " + f"for {self.profile}", + error=True, + ) + self.host.quit(C.EXIT_NOT_FOUND) + else: + self.disp(f"can't get pubsub items from cache: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + except Exception as e: + self.disp(f"Internal error: {e}", error=True) + self.host.quit(C.EXIT_INTERNAL_ERROR) + else: + await self.output(ps_result["items"]) + self.host.quit(C.EXIT_OK) + + +class CacheSync(base.CommandBase): + + def __init__(self, host): + super().__init__( + host, + "sync", + use_pubsub=True, + pubsub_flags={C.NODE}, + help=_("(re)synchronise a pubsub node"), + ) + + def add_parser_options(self): + pass + + async def start(self): + try: + await self.host.bridge.ps_cache_sync( + self.args.service, + self.args.node, + self.profile, + ) + except BridgeException as e: + if e.condition == "item-not-found" or e.classname == "NotFound": + self.disp( + f"The node {self.args.node} doesn't exist on {self.args.service}", + error=True, + ) + self.host.quit(C.EXIT_NOT_FOUND) + else: + self.disp(f"can't synchronise pubsub node: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + except Exception as e: + self.disp(f"Internal error: {e}", error=True) + self.host.quit(C.EXIT_INTERNAL_ERROR) + else: + self.host.quit(C.EXIT_OK) + + +class CachePurge(base.CommandBase): + + def __init__(self, host): + super().__init__( + host, + "purge", + use_profile=False, + help=_("purge (delete) items from cache"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-s", "--service", action="append", metavar="JID", dest="services", + help="purge items only for these services. If not specified, items from ALL " + "services will be purged. May be used several times." + ) + self.parser.add_argument( + "-n", "--node", action="append", dest="nodes", + help="purge items only for these nodes. If not specified, items from ALL " + "nodes will be purged. May be used several times." + ) + self.parser.add_argument( + "-p", "--profile", action="append", dest="profiles", + help="purge items only for these profiles. If not specified, items from ALL " + "profiles will be purged. May be used several times." + ) + self.parser.add_argument( + "-b", "--updated-before", type=base.date_decoder, metavar="TIME_PATTERN", + help="purge items which have been last updated before given time." + ) + self.parser.add_argument( + "-C", "--created-before", type=base.date_decoder, metavar="TIME_PATTERN", + help="purge items which have been last created before given time." + ) + self.parser.add_argument( + "-t", "--type", action="append", dest="types", + help="purge items flagged with TYPE. May be used several times." + ) + self.parser.add_argument( + "-S", "--subtype", action="append", dest="subtypes", + help="purge items flagged with SUBTYPE. May be used several times." + ) + self.parser.add_argument( + "-f", "--force", action="store_true", + help=_("purge items without confirmation") + ) + + async def start(self): + if not self.args.force: + await self.host.confirm_or_quit( + _( + "Are you sure to purge items from cache? You'll have to bypass cache " + "or resynchronise nodes to access deleted items again." + ), + _("Items purgins has been cancelled.") + ) + purge_data = {} + for key in ( + "services", "nodes", "profiles", "updated_before", "created_before", + "types", "subtypes" + ): + value = getattr(self.args, key) + if value is not None: + purge_data[key] = value + try: + await self.host.bridge.ps_cache_purge( + data_format.serialise( + purge_data + ) + ) + except Exception as e: + self.disp(f"Internal error: {e}", error=True) + self.host.quit(C.EXIT_INTERNAL_ERROR) + else: + self.host.quit(C.EXIT_OK) + + +class CacheReset(base.CommandBase): + + def __init__(self, host): + super().__init__( + host, + "reset", + use_profile=False, + help=_("remove everything from cache"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-f", "--force", action="store_true", + help=_("reset cache without confirmation") + ) + + async def start(self): + if not self.args.force: + await self.host.confirm_or_quit( + _( + "Are you sure to reset cache? All nodes and items will be removed " + "from it, then it will be progressively refilled as if it were new. " + "This may be resources intensive." + ), + _("Pubsub cache reset has been cancelled.") + ) + try: + await self.host.bridge.ps_cache_reset() + except Exception as e: + self.disp(f"Internal error: {e}", error=True) + self.host.quit(C.EXIT_INTERNAL_ERROR) + else: + self.host.quit(C.EXIT_OK) + + +class CacheSearch(base.CommandBase): + def __init__(self, host): + extra_outputs = { + "default": self.default_output, + "xml": self.xml_output, + "xml-raw": self.xml_raw_output, + } + super().__init__( + host, + "search", + use_profile=False, + use_output=C.OUTPUT_LIST_DICT, + extra_outputs=extra_outputs, + help=_("search for pubsub items in cache"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-f", "--fts", help=_("Full-Text Search query"), metavar="FTS_QUERY" + ) + self.parser.add_argument( + "-p", "--profile", action="append", dest="profiles", metavar="PROFILE", + help="search items only from these profiles. May be used several times." + ) + self.parser.add_argument( + "-s", "--service", action="append", dest="services", metavar="SERVICE", + help="items must be from specified service. May be used several times." + ) + self.parser.add_argument( + "-n", "--node", action="append", dest="nodes", metavar="NODE", + help="items must be in the specified node. May be used several times." + ) + self.parser.add_argument( + "-t", "--type", action="append", dest="types", metavar="TYPE", + help="items must be of specified type. May be used several times." + ) + self.parser.add_argument( + "-S", "--subtype", action="append", dest="subtypes", metavar="SUBTYPE", + help="items must be of specified subtype. May be used several times." + ) + self.parser.add_argument( + "-P", "--payload", action="store_true", help=_("include item XML payload") + ) + self.parser.add_argument( + "-o", "--order-by", action="append", nargs="+", + metavar=("ORDER", "[FIELD] [DIRECTION]"), + help=_("how items must be ordered. May be used several times.") + ) + self.parser.add_argument( + "-l", "--limit", type=int, help=_("maximum number of items to return") + ) + self.parser.add_argument( + "-i", "--index", type=int, help=_("return results starting from this index") + ) + self.parser.add_argument( + "-F", + "--field", + action="append", + nargs=3, + dest="fields", + default=[], + metavar=("PATH", "OPERATOR", "VALUE"), + help=_("parsed data field filter. May be used several times."), + ) + self.parser.add_argument( + "-k", + "--key", + action="append", + dest="keys", + metavar="KEY", + help=_( + "data key(s) to display. May be used several times. DEFAULT: show all " + "keys" + ), + ) + + async def start(self): + query = {} + for arg in ("fts", "profiles", "services", "nodes", "types", "subtypes"): + value = getattr(self.args, arg) + if value: + if arg in ("types", "subtypes"): + # empty string is used to find items without type and/or subtype + value = [v or None for v in value] + query[arg] = value + for arg in ("limit", "index"): + value = getattr(self.args, arg) + if value is not None: + query[arg] = value + if self.args.order_by is not None: + for order_data in self.args.order_by: + order, *args = order_data + if order == "field": + if not args: + self.parser.error(_("field data must be specified in --order-by")) + elif len(args) == 1: + path = args[0] + direction = "asc" + elif len(args) == 2: + path, direction = args + else: + self.parser.error(_( + "You can't specify more that 2 arguments for a field in " + "--order-by" + )) + try: + path = json.loads(path) + except json.JSONDecodeError: + pass + order_query = { + "path": path, + } + else: + order_query = { + "order": order + } + if not args: + direction = "asc" + elif len(args) == 1: + direction = args[0] + else: + self.parser.error(_( + "there are too many arguments in --order-by option" + )) + if direction.lower() not in ("asc", "desc"): + self.parser.error(_("invalid --order-by direction: {direction!r}")) + order_query["direction"] = direction + query.setdefault("order-by", []).append(order_query) + + if self.args.fields: + parsed = [] + for field in self.args.fields: + path, operator, value = field + try: + path = json.loads(path) + except json.JSONDecodeError: + # this is not a JSON encoded value, we keep it as a string + pass + + if not isinstance(path, list): + path = [path] + + # handling of TP(<time pattern>) + if operator in (">", "gt", "<", "le", "between"): + def datetime_sub(match): + return str(date_utils.date_parse_ext( + match.group(1), default_tz=date_utils.TZ_LOCAL + )) + value = re.sub(r"\bTP\(([^)]+)\)", datetime_sub, value) + + try: + value = json.loads(value) + except json.JSONDecodeError: + # not JSON, as above we keep it as string + pass + + if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"): + if not isinstance(value, list): + value = [value] + + parsed.append({ + "path": path, + "op": operator, + "value": value + }) + + query["parsed"] = parsed + + if self.args.payload or "xml" in self.args.output: + query["with_payload"] = True + if self.args.keys: + self.args.keys.append("item_payload") + try: + found_items = data_format.deserialise( + await self.host.bridge.ps_cache_search( + data_format.serialise(query) + ), + type_check=list, + ) + except BridgeException as e: + self.disp(f"can't search for pubsub items in cache: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + except Exception as e: + self.disp(f"Internal error: {e}", error=True) + self.host.quit(C.EXIT_INTERNAL_ERROR) + else: + if self.args.keys: + found_items = [ + {k: v for k,v in item.items() if k in self.args.keys} + for item in found_items + ] + await self.output(found_items) + self.host.quit(C.EXIT_OK) + + def default_output(self, found_items): + for item in found_items: + for field in ("created", "published", "updated"): + try: + timestamp = item[field] + except KeyError: + pass + else: + try: + item[field] = common.format_time(timestamp) + except ValueError: + pass + self.host._outputs[C.OUTPUT_LIST_DICT]["simple"]["callback"](found_items) + + def xml_output(self, found_items): + """Output prettified item payload""" + cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML]["callback"] + for item in found_items: + cb(item["item_payload"]) + + def xml_raw_output(self, found_items): + """Output item payload without prettifying""" + cb = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML_RAW]["callback"] + for item in found_items: + cb(item["item_payload"]) + + +class Cache(base.CommandBase): + subcommands = ( + CacheGet, + CacheSync, + CachePurge, + CacheReset, + CacheSearch, + ) + + def __init__(self, host): + super(Cache, self).__init__( + host, "cache", use_profile=False, help=_("pubsub cache handling") + ) + + +class Set(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "set", + use_pubsub=True, + use_quiet=True, + pubsub_flags={C.NODE}, + help=_("publish a new item or update an existing one"), + ) + + def add_parser_options(self): + NodeCreate.add_node_config_options(self.parser) + self.parser.add_argument( + "-e", + "--encrypt", + action="store_true", + help=_("end-to-end encrypt the blog item") + ) + self.parser.add_argument( + "--encrypt-for", + metavar="JID", + action="append", + help=_("encrypt a single item for") + ) + self.parser.add_argument( + "-X", + "--sign", + action="store_true", + help=_("cryptographically sign the blog post") + ) + self.parser.add_argument( + "item", + nargs="?", + default="", + help=_("id, URL of the item to update, keyword, or nothing for new item"), + ) + + async def start(self): + element, etree = xml_tools.etree_parse(self, sys.stdin) + element = xml_tools.get_payload(self, element) + payload = etree.tostring(element, encoding="unicode") + extra = {} + if self.args.encrypt: + extra["encrypted"] = True + if self.args.encrypt_for: + extra["encrypted_for"] = {"targets": self.args.encrypt_for} + if self.args.sign: + extra["signed"] = True + publish_options = NodeCreate.get_config_options(self.args) + if publish_options: + extra["publish_options"] = publish_options + + try: + published_id = await self.host.bridge.ps_item_send( + self.args.service, + self.args.node, + payload, + self.args.item, + data_format.serialise(extra), + self.profile, + ) + except Exception as e: + self.disp(_("can't send item: {e}").format(e=e), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + if published_id: + if self.args.quiet: + self.disp(published_id, end="") + else: + self.disp(f"Item published at {published_id}") + else: + self.disp("Item published") + self.host.quit(C.EXIT_OK) + + +class Get(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "get", + use_output=C.OUTPUT_LIST_XML, + use_pubsub=True, + pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE}, + help=_("get pubsub item(s)"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-S", + "--sub-id", + default="", + help=_("subscription id"), + ) + self.parser.add_argument( + "--no-decrypt", + action="store_true", + help=_("don't do automatic decryption of e2ee items"), + ) + # TODO: a key(s) argument to select keys to display + + async def start(self): + extra = {} + if self.args.no_decrypt: + extra["decrypt"] = False + try: + ps_result = data_format.deserialise( + await self.host.bridge.ps_items_get( + self.args.service, + self.args.node, + self.args.max, + self.args.items, + self.args.sub_id, + self.get_pubsub_extra(extra), + self.profile, + ) + ) + except BridgeException as e: + if e.condition == "item-not-found" or e.classname == "NotFound": + self.disp( + f"The node {self.args.node} doesn't exist on {self.args.service}", + error=True, + ) + self.host.quit(C.EXIT_NOT_FOUND) + else: + self.disp(f"can't get pubsub items: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + except Exception as e: + self.disp(f"Internal error: {e}", error=True) + self.host.quit(C.EXIT_INTERNAL_ERROR) + else: + await self.output(ps_result["items"]) + self.host.quit(C.EXIT_OK) + + +class Delete(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "delete", + use_pubsub=True, + pubsub_flags={C.NODE, C.ITEM, C.SINGLE_ITEM}, + help=_("delete an item"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-f", "--force", action="store_true", help=_("delete without confirmation") + ) + self.parser.add_argument( + "--no-notification", dest="notify", action="store_false", + help=_("do not send notification (not recommended)") + ) + + async def start(self): + if not self.args.item: + self.parser.error(_("You need to specify an item to delete")) + if not self.args.force: + message = _("Are you sure to delete item {item_id} ?").format( + item_id=self.args.item + ) + await self.host.confirm_or_quit(message, _("item deletion cancelled")) + try: + await self.host.bridge.ps_item_retract( + self.args.service, + self.args.node, + self.args.item, + self.args.notify, + self.profile, + ) + except Exception as e: + self.disp(_("can't delete item: {e}").format(e=e), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("item {item} has been deleted").format(item=self.args.item)) + self.host.quit(C.EXIT_OK) + + +class Edit(base.CommandBase, common.BaseEdit): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "edit", + use_verbose=True, + use_pubsub=True, + pubsub_flags={C.NODE, C.SINGLE_ITEM}, + use_draft=True, + help=_("edit an existing or new pubsub item"), + ) + common.BaseEdit.__init__(self, self.host, PUBSUB_TMP_DIR) + + def add_parser_options(self): + self.parser.add_argument( + "-e", + "--encrypt", + action="store_true", + help=_("end-to-end encrypt the blog item") + ) + self.parser.add_argument( + "--encrypt-for", + metavar="JID", + action="append", + help=_("encrypt a single item for") + ) + self.parser.add_argument( + "-X", + "--sign", + action="store_true", + help=_("cryptographically sign the blog post") + ) + + async def publish(self, content): + extra = {} + if self.args.encrypt: + extra["encrypted"] = True + if self.args.encrypt_for: + extra["encrypted_for"] = {"targets": self.args.encrypt_for} + if self.args.sign: + extra["signed"] = True + published_id = await self.host.bridge.ps_item_send( + self.pubsub_service, + self.pubsub_node, + content, + self.pubsub_item or "", + data_format.serialise(extra), + self.profile, + ) + if published_id: + self.disp("Item published at {pub_id}".format(pub_id=published_id)) + else: + self.disp("Item published") + + async def get_item_data(self, service, node, item): + try: + from lxml import etree + except ImportError: + self.disp( + "lxml module must be installed to use edit, please install it " + 'with "pip install lxml"', + error=True, + ) + self.host.quit(1) + items = [item] if item else [] + ps_result = data_format.deserialise( + await self.host.bridge.ps_items_get( + service, node, 1, items, "", data_format.serialise({}), self.profile + ) + ) + item_raw = ps_result["items"][0] + parser = etree.XMLParser(remove_blank_text=True, recover=True) + item_elt = etree.fromstring(item_raw, parser) + item_id = item_elt.get("id") + try: + payload = item_elt[0] + except IndexError: + self.disp(_("Item has not payload"), 1) + return "", item_id + return etree.tostring(payload, encoding="unicode", pretty_print=True), item_id + + async def start(self): + ( + self.pubsub_service, + self.pubsub_node, + self.pubsub_item, + content_file_path, + content_file_obj, + ) = await self.get_item_path() + await self.run_editor("pubsub_editor_args", content_file_path, content_file_obj) + self.host.quit() + + +class Rename(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "rename", + use_pubsub=True, + pubsub_flags={C.NODE, C.SINGLE_ITEM}, + help=_("rename a pubsub item"), + ) + + def add_parser_options(self): + self.parser.add_argument("new_id", help=_("new item id to use")) + + async def start(self): + try: + await self.host.bridge.ps_item_rename( + self.args.service, + self.args.node, + self.args.item, + self.args.new_id, + self.profile, + ) + except Exception as e: + self.disp(f"can't rename item: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp("Item renamed") + self.host.quit(C.EXIT_OK) + + +class Subscribe(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "subscribe", + use_pubsub=True, + pubsub_flags={C.NODE}, + use_verbose=True, + help=_("subscribe to a node"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "--public", + action="store_true", + help=_("make the registration visible for everybody"), + ) + + async def start(self): + options = {} + if self.args.public: + namespaces = await self.host.bridge.namespaces_get() + try: + ns_pps = namespaces["pps"] + except KeyError: + self.disp( + "Pubsub Public Subscription plugin is not loaded, can't use --public " + "option, subscription stopped", error=True + ) + self.host.quit(C.EXIT_MISSING_FEATURE) + else: + options[f"{{{ns_pps}}}public"] = True + try: + sub_id = await self.host.bridge.ps_subscribe( + self.args.service, + self.args.node, + data_format.serialise(options), + self.profile, + ) + except Exception as e: + self.disp(_("can't subscribe to node: {e}").format(e=e), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("subscription done"), 1) + if sub_id: + self.disp(_("subscription id: {sub_id}").format(sub_id=sub_id)) + self.host.quit() + + +class Unsubscribe(base.CommandBase): + # FIXME: check why we get a a NodeNotFound on subscribe just after unsubscribe + + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "unsubscribe", + use_pubsub=True, + pubsub_flags={C.NODE}, + use_verbose=True, + help=_("unsubscribe from a node"), + ) + + def add_parser_options(self): + pass + + async def start(self): + try: + await self.host.bridge.ps_unsubscribe( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(_("can't unsubscribe from node: {e}").format(e=e), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(_("subscription removed"), 1) + self.host.quit() + + +class Subscriptions(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "subscriptions", + use_output=C.OUTPUT_LIST_DICT, + use_pubsub=True, + help=_("retrieve all subscriptions on a service"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "--public", + action="store_true", + help=_("get public subscriptions"), + ) + + async def start(self): + if self.args.public: + method = self.host.bridge.ps_public_subscriptions_get + else: + method = self.host.bridge.ps_subscriptions_get + try: + subscriptions = data_format.deserialise( + await method( + self.args.service, + self.args.node, + self.profile, + ), + type_check=list + ) + except Exception as e: + self.disp(_("can't retrieve subscriptions: {e}").format(e=e), error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.output(subscriptions) + self.host.quit() + + +class Affiliations(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "affiliations", + use_output=C.OUTPUT_DICT, + use_pubsub=True, + help=_("retrieve all affiliations on a service"), + ) + + def add_parser_options(self): + pass + + async def start(self): + try: + affiliations = await self.host.bridge.ps_affiliations_get( + self.args.service, + self.args.node, + self.profile, + ) + except Exception as e: + self.disp(f"can't get node affiliations: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.output(affiliations) + self.host.quit() + + +class Reference(base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "reference", + use_pubsub=True, + pubsub_flags={C.NODE, C.SINGLE_ITEM}, + help=_("send a reference/mention to pubsub item"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-t", + "--type", + default="mention", + choices=("data", "mention"), + help=_("type of reference to send (DEFAULT: mention)"), + ) + self.parser.add_argument( + "recipient", + help=_("recipient of the reference") + ) + + async def start(self): + service = self.args.service or await self.host.get_profile_jid() + if self.args.item: + anchor = uri.build_xmpp_uri( + "pubsub", path=service, node=self.args.node, item=self.args.item + ) + else: + anchor = uri.build_xmpp_uri("pubsub", path=service, node=self.args.node) + + try: + await self.host.bridge.reference_send( + self.args.recipient, + anchor, + self.args.type, + "", + self.profile, + ) + except Exception as e: + self.disp(f"can't send reference: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.host.quit() + + +class Search(base.CommandBase): + """This command do a search without using MAM + + This commands checks every items it finds by itself, + so it may be heavy in resources both for server and client + """ + + RE_FLAGS = re.MULTILINE | re.UNICODE + EXEC_ACTIONS = ("exec", "external") + + def __init__(self, host): + # FIXME: C.NO_MAX is not needed here, and this can be globally removed from consts + # the only interest is to change the help string, but this can be explained + # extensively in man pages (max is for each node found) + base.CommandBase.__init__( + self, + host, + "search", + use_output=C.OUTPUT_XML, + use_pubsub=True, + pubsub_flags={C.MULTI_ITEMS, C.NO_MAX}, + use_verbose=True, + help=_("search items corresponding to filters"), + ) + + @property + def etree(self): + """load lxml.etree only if needed""" + if self._etree is None: + from lxml import etree + + self._etree = etree + return self._etree + + def filter_opt(self, value, type_): + return (type_, value) + + def filter_flag(self, value, type_): + value = C.bool(value) + return (type_, value) + + def add_parser_options(self): + self.parser.add_argument( + "-D", + "--max-depth", + type=int, + default=0, + help=_( + "maximum depth of recursion (will search linked nodes if > 0, " + "DEFAULT: 0)" + ), + ) + self.parser.add_argument( + "-M", + "--node-max", + type=int, + default=30, + help=_( + "maximum number of items to get per node ({} to get all items, " + "DEFAULT: 30)".format(C.NO_LIMIT) + ), + ) + self.parser.add_argument( + "-N", + "--namespace", + action="append", + nargs=2, + default=[], + metavar="NAME NAMESPACE", + help=_("namespace to use for xpath"), + ) + + # filters + filter_text = partial(self.filter_opt, type_="text") + filter_re = partial(self.filter_opt, type_="regex") + filter_xpath = partial(self.filter_opt, type_="xpath") + filter_python = partial(self.filter_opt, type_="python") + filters = self.parser.add_argument_group( + _("filters"), + _("only items corresponding to following filters will be kept"), + ) + filters.add_argument( + "-t", + "--text", + action="append", + dest="filters", + type=filter_text, + metavar="TEXT", + help=_("full text filter, item must contain this string (XML included)"), + ) + filters.add_argument( + "-r", + "--regex", + action="append", + dest="filters", + type=filter_re, + metavar="EXPRESSION", + help=_("like --text but using a regular expression"), + ) + filters.add_argument( + "-x", + "--xpath", + action="append", + dest="filters", + type=filter_xpath, + metavar="XPATH", + help=_("filter items which has elements matching this xpath"), + ) + filters.add_argument( + "-P", + "--python", + action="append", + dest="filters", + type=filter_python, + metavar="PYTHON_CODE", + help=_( + "Python expression which much return a bool (True to keep item, " + 'False to reject it). "item" is raw text item, "item_xml" is ' + "lxml's etree.Element" + ), + ) + + # filters flags + flag_case = partial(self.filter_flag, type_="ignore-case") + flag_invert = partial(self.filter_flag, type_="invert") + flag_dotall = partial(self.filter_flag, type_="dotall") + flag_matching = partial(self.filter_flag, type_="only-matching") + flags = self.parser.add_argument_group( + _("filters flags"), + _("filters modifiers (change behaviour of following filters)"), + ) + flags.add_argument( + "-C", + "--ignore-case", + action="append", + dest="filters", + type=flag_case, + const=("ignore-case", True), + nargs="?", + metavar="BOOLEAN", + help=_("(don't) ignore case in following filters (DEFAULT: case sensitive)"), + ) + flags.add_argument( + "-I", + "--invert", + action="append", + dest="filters", + type=flag_invert, + const=("invert", True), + nargs="?", + metavar="BOOLEAN", + help=_("(don't) invert effect of following filters (DEFAULT: don't invert)"), + ) + flags.add_argument( + "-A", + "--dot-all", + action="append", + dest="filters", + type=flag_dotall, + const=("dotall", True), + nargs="?", + metavar="BOOLEAN", + help=_("(don't) use DOTALL option for regex (DEFAULT: don't use)"), + ) + flags.add_argument( + "-k", + "--only-matching", + action="append", + dest="filters", + type=flag_matching, + const=("only-matching", True), + nargs="?", + metavar="BOOLEAN", + help=_("keep only the matching part of the item"), + ) + + # action + self.parser.add_argument( + "action", + default="print", + nargs="?", + choices=("print", "exec", "external"), + help=_("action to do on found items (DEFAULT: print)"), + ) + self.parser.add_argument("command", nargs=argparse.REMAINDER) + + async def get_items(self, depth, service, node, items): + self.to_get += 1 + try: + ps_result = data_format.deserialise( + await self.host.bridge.ps_items_get( + service, + node, + self.args.node_max, + items, + "", + self.get_pubsub_extra(), + self.profile, + ) + ) + except Exception as e: + self.disp( + f"can't get pubsub items at {service} (node: {node}): {e}", + error=True, + ) + self.to_get -= 1 + else: + await self.search(ps_result, depth) + + def _check_pubsub_url(self, match, found_nodes): + """check that the matched URL is an xmpp: one + + @param found_nodes(list[unicode]): found_nodes + this list will be filled while xmpp: URIs are discovered + """ + url = match.group(0) + if url.startswith("xmpp"): + try: + url_data = uri.parse_xmpp_uri(url) + except ValueError: + return + if url_data["type"] == "pubsub": + found_node = {"service": url_data["path"], "node": url_data["node"]} + if "item" in url_data: + found_node["item"] = url_data["item"] + found_nodes.append(found_node) + + async def get_sub_nodes(self, item, depth): + """look for pubsub URIs in item, and get_items on the linked nodes""" + found_nodes = [] + checkURI = partial(self._check_pubsub_url, found_nodes=found_nodes) + strings.RE_URL.sub(checkURI, item) + for data in found_nodes: + await self.get_items( + depth + 1, + data["service"], + data["node"], + [data["item"]] if "item" in data else [], + ) + + def parseXml(self, item): + try: + return self.etree.fromstring(item) + except self.etree.XMLSyntaxError: + self.disp( + _( + "item doesn't looks like XML, you have probably used --only-matching " + "somewhere before and we have no more XML" + ), + error=True, + ) + self.host.quit(C.EXIT_BAD_ARG) + + def filter(self, item): + """apply filters given on command line + + if only-matching is used, item may be modified + @return (tuple[bool, unicode]): a tuple with: + - keep: True if item passed the filters + - item: it is returned in case of modifications + """ + ignore_case = False + invert = False + dotall = False + only_matching = False + item_xml = None + for type_, value in self.args.filters: + keep = True + + ## filters + + if type_ == "text": + if ignore_case: + if value.lower() not in item.lower(): + keep = False + else: + if value not in item: + keep = False + if keep and only_matching: + # doesn't really make sens to keep a fixed string + # so we raise an error + self.host.disp( + _("--only-matching used with fixed --text string, are you sure?"), + error=True, + ) + self.host.quit(C.EXIT_BAD_ARG) + elif type_ == "regex": + flags = self.RE_FLAGS + if ignore_case: + flags |= re.IGNORECASE + if dotall: + flags |= re.DOTALL + match = re.search(value, item, flags) + keep = match != None + if keep and only_matching: + item = match.group() + item_xml = None + elif type_ == "xpath": + if item_xml is None: + item_xml = self.parseXml(item) + try: + elts = item_xml.xpath(value, namespaces=self.args.namespace) + except self.etree.XPathEvalError as e: + self.disp(_("can't use xpath: {reason}").format(reason=e), error=True) + self.host.quit(C.EXIT_BAD_ARG) + keep = bool(elts) + if keep and only_matching: + item_xml = elts[0] + try: + item = self.etree.tostring(item_xml, encoding="unicode") + except TypeError: + # we have a string only, not an element + item = str(item_xml) + item_xml = None + elif type_ == "python": + if item_xml is None: + item_xml = self.parseXml(item) + cmd_ns = {"etree": self.etree, "item": item, "item_xml": item_xml} + try: + keep = eval(value, cmd_ns) + except SyntaxError as e: + self.disp(str(e), error=True) + self.host.quit(C.EXIT_BAD_ARG) + + ## flags + + elif type_ == "ignore-case": + ignore_case = value + elif type_ == "invert": + invert = value + # we need to continue, else loop would end here + continue + elif type_ == "dotall": + dotall = value + elif type_ == "only-matching": + only_matching = value + else: + raise exceptions.InternalError( + _("unknown filter type {type}").format(type=type_) + ) + + if invert: + keep = not keep + if not keep: + return False, item + + return True, item + + async def do_item_action(self, item, metadata): + """called when item has been kepts and the action need to be done + + @param item(unicode): accepted item + """ + action = self.args.action + if action == "print" or self.host.verbosity > 0: + try: + await self.output(item) + except self.etree.XMLSyntaxError: + # item is not valid XML, but a string + # can happen when --only-matching is used + self.disp(item) + if action in self.EXEC_ACTIONS: + item_elt = self.parseXml(item) + if action == "exec": + use = { + "service": metadata["service"], + "node": metadata["node"], + "item": item_elt.get("id"), + "profile": self.profile, + } + # we need to send a copy of self.args.command + # else it would be modified + parser_args, use_args = arg_tools.get_use_args( + self.host, self.args.command, use, verbose=self.host.verbosity > 1 + ) + cmd_args = sys.argv[0:1] + parser_args + use_args + else: + cmd_args = self.args.command + + self.disp( + "COMMAND: {command}".format( + command=" ".join([arg_tools.escape(a) for a in cmd_args]) + ), + 2, + ) + if action == "exec": + p = await asyncio.create_subprocess_exec(*cmd_args) + ret = await p.wait() + else: + p = await asyncio.create_subprocess_exec(*cmd_args, stdin=subprocess.PIPE) + await p.communicate(item.encode(sys.getfilesystemencoding())) + ret = p.returncode + if ret != 0: + self.disp( + A.color( + C.A_FAILURE, + _("executed command failed with exit code {ret}").format(ret=ret), + ) + ) + + async def search(self, ps_result, depth): + """callback of get_items + + this method filters items, get sub nodes if needed, + do the requested action, and exit the command when everything is done + @param items_data(tuple): result of get_items + @param depth(int): current depth level + 0 for first node, 1 for first children, and so on + """ + for item in ps_result["items"]: + if depth < self.args.max_depth: + await self.get_sub_nodes(item, depth) + keep, item = self.filter(item) + if not keep: + continue + await self.do_item_action(item, ps_result) + + # we check if we got all get_items results + self.to_get -= 1 + if self.to_get == 0: + # yes, we can quit + self.host.quit() + assert self.to_get > 0 + + async def start(self): + if self.args.command: + if self.args.action not in self.EXEC_ACTIONS: + self.parser.error( + _("Command can only be used with {actions} actions").format( + actions=", ".join(self.EXEC_ACTIONS) + ) + ) + else: + if self.args.action in self.EXEC_ACTIONS: + self.parser.error(_("you need to specify a command to execute")) + if not self.args.node: + # TODO: handle get service affiliations when node is not set + self.parser.error(_("empty node is not handled yet")) + # to_get is increased on each get and decreased on each answer + # when it reach 0 again, the command is finished + self.to_get = 0 + self._etree = None + if self.args.filters is None: + self.args.filters = [] + self.args.namespace = dict( + self.args.namespace + [("pubsub", "http://jabber.org/protocol/pubsub")] + ) + await self.get_items(0, self.args.service, self.args.node, self.args.items) + + +class Transform(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "transform", + use_pubsub=True, + pubsub_flags={C.NODE, C.MULTI_ITEMS}, + help=_("modify items of a node using an external command/script"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "--apply", + action="store_true", + help=_("apply transformation (DEFAULT: do a dry run)"), + ) + self.parser.add_argument( + "--admin", + action="store_true", + help=_("do a pubsub admin request, needed to change publisher"), + ) + self.parser.add_argument( + "-I", + "--ignore-errors", + action="store_true", + help=_( + "if command return a non zero exit code, ignore the item and continue" + ), + ) + self.parser.add_argument( + "-A", + "--all", + action="store_true", + help=_("get all items by looping over all pages using RSM"), + ) + self.parser.add_argument( + "command_path", + help=_( + "path to the command to use. Will be called repetitivly with an " + "item as input. Output (full item XML) will be used as new one. " + 'Return "DELETE" string to delete the item, and "SKIP" to ignore it' + ), + ) + + async def ps_items_send_cb(self, item_ids, metadata): + if item_ids: + self.disp( + _("items published with ids {item_ids}").format( + item_ids=", ".join(item_ids) + ) + ) + else: + self.disp(_("items published")) + if self.args.all: + return await self.handle_next_page(metadata) + else: + self.host.quit() + + async def handle_next_page(self, metadata): + """Retrieve new page through RSM or quit if we're in the last page + + use to handle --all option + @param metadata(dict): metadata as returned by ps_items_get + """ + try: + last = metadata["rsm"]["last"] + index = int(metadata["rsm"]["index"]) + count = int(metadata["rsm"]["count"]) + except KeyError: + self.disp( + _("Can't retrieve all items, RSM metadata not available"), error=True + ) + self.host.quit(C.EXIT_MISSING_FEATURE) + except ValueError as e: + self.disp( + _("Can't retrieve all items, bad RSM metadata: {msg}").format(msg=e), + error=True, + ) + self.host.quit(C.EXIT_ERROR) + + if index + self.args.rsm_max >= count: + self.disp(_("All items transformed")) + self.host.quit(0) + + self.disp( + _("Retrieving next page ({page_idx}/{page_total})").format( + page_idx=int(index / self.args.rsm_max) + 1, + page_total=int(count / self.args.rsm_max), + ) + ) + + extra = self.get_pubsub_extra() + extra["rsm_after"] = last + try: + ps_result = await data_format.deserialise( + self.host.bridge.ps_items_get( + self.args.service, + self.args.node, + self.args.rsm_max, + self.args.items, + "", + data_format.serialise(extra), + self.profile, + ) + ) + except Exception as e: + self.disp(f"can't retrieve items: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.ps_items_get_cb(ps_result) + + async def ps_items_get_cb(self, ps_result): + encoding = "utf-8" + new_items = [] + + for item in ps_result["items"]: + if self.check_duplicates: + # this is used when we are not ordering by creation + # to avoid infinite loop + item_elt, __ = xml_tools.etree_parse(self, item) + item_id = item_elt.get("id") + if item_id in self.items_ids: + self.disp( + _( + "Duplicate found on item {item_id}, we have probably handled " + "all items." + ).format(item_id=item_id) + ) + self.host.quit() + self.items_ids.append(item_id) + + # we launch the command to filter the item + try: + p = await asyncio.create_subprocess_exec( + self.args.command_path, stdin=subprocess.PIPE, stdout=subprocess.PIPE + ) + except OSError as e: + exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR + self.disp(f"Can't execute the command: {e}", error=True) + self.host.quit(exit_code) + encoding = "utf-8" + cmd_std_out, cmd_std_err = await p.communicate(item.encode(encoding)) + ret = p.returncode + if ret != 0: + self.disp( + f"The command returned a non zero status while parsing the " + f"following item:\n\n{item}", + error=True, + ) + if self.args.ignore_errors: + continue + else: + self.host.quit(C.EXIT_CMD_ERROR) + if cmd_std_err is not None: + cmd_std_err = cmd_std_err.decode(encoding, errors="ignore") + self.disp(cmd_std_err, error=True) + cmd_std_out = cmd_std_out.decode(encoding).strip() + if cmd_std_out == "DELETE": + item_elt, __ = xml_tools.etree_parse(self, item) + item_id = item_elt.get("id") + self.disp(_("Deleting item {item_id}").format(item_id=item_id)) + if self.args.apply: + try: + await self.host.bridge.ps_item_retract( + self.args.service, + self.args.node, + item_id, + False, + self.profile, + ) + except Exception as e: + self.disp(f"can't delete item {item_id}: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + continue + elif cmd_std_out == "SKIP": + item_elt, __ = xml_tools.etree_parse(self, item) + item_id = item_elt.get("id") + self.disp(_("Skipping item {item_id}").format(item_id=item_id)) + continue + element, etree = xml_tools.etree_parse(self, cmd_std_out) + + # at this point command has been run and we have a etree.Element object + if element.tag not in ("item", "{http://jabber.org/protocol/pubsub}item"): + self.disp( + "your script must return a whole item, this is not:\n{xml}".format( + xml=etree.tostring(element, encoding="unicode") + ), + error=True, + ) + self.host.quit(C.EXIT_DATA_ERROR) + + if not self.args.apply: + # we have a dry run, we just display filtered items + serialised = etree.tostring( + element, encoding="unicode", pretty_print=True + ) + self.disp(serialised) + else: + new_items.append(etree.tostring(element, encoding="unicode")) + + if not self.args.apply: + # on dry run we have nothing to wait for, we can quit + if self.args.all: + return await self.handle_next_page(ps_result) + self.host.quit() + else: + if self.args.admin: + bridge_method = self.host.bridge.ps_admin_items_send + else: + bridge_method = self.host.bridge.ps_items_send + + try: + ps_items_send_result = await bridge_method( + self.args.service, + self.args.node, + new_items, + "", + self.profile, + ) + except Exception as e: + self.disp(f"can't send item: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.ps_items_send_cb(ps_items_send_result, metadata=ps_result) + + async def start(self): + if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: + self.check_duplicates = True + self.items_ids = [] + self.disp( + A.color( + A.FG_RED, + A.BOLD, + '/!\\ "--all" should be used with "--order-by creation" /!\\\n', + A.RESET, + "We'll update items, so order may change during transformation,\n" + "we'll try to mitigate that by stopping on first duplicate,\n" + "but this method is not safe, and some items may be missed.\n---\n", + ) + ) + else: + self.check_duplicates = False + + try: + ps_result = data_format.deserialise( + await self.host.bridge.ps_items_get( + self.args.service, + self.args.node, + self.args.max, + self.args.items, + "", + self.get_pubsub_extra(), + self.profile, + ) + ) + except Exception as e: + self.disp(f"can't retrieve items: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.ps_items_get_cb(ps_result) + + +class Uri(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "uri", + use_profile=False, + use_pubsub=True, + pubsub_flags={C.NODE, C.SINGLE_ITEM}, + help=_("build URI"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-p", + "--profile", + default=C.PROF_KEY_DEFAULT, + help=_("profile (used when no server is specified)"), + ) + + def display_uri(self, jid_): + uri_args = {} + if not self.args.service: + self.args.service = jid.JID(jid_).bare + + for key in ("node", "service", "item"): + value = getattr(self.args, key) + if key == "service": + key = "path" + if value: + uri_args[key] = value + self.disp(uri.build_xmpp_uri("pubsub", **uri_args)) + self.host.quit() + + async def start(self): + if not self.args.service: + try: + jid_ = await self.host.bridge.param_get_a_async( + "JabberID", "Connection", profile_key=self.args.profile + ) + except Exception as e: + self.disp(f"can't retrieve jid: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.display_uri(jid_) + else: + self.display_uri(None) + + +class AttachmentGet(base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "get", + use_output=C.OUTPUT_LIST_DICT, + use_pubsub=True, + pubsub_flags={C.SERVICE, C.NODE, C.SINGLE_ITEM}, + help=_("get data attached to an item"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-j", + "--jid", + action="append", + dest="jids", + help=_( + "get attached data published only by those JIDs (DEFAULT: get all " + "attached data)" + ) + ) + + async def start(self): + try: + attached_data, __ = await self.host.bridge.ps_attachments_get( + self.args.service, + self.args.node, + self.args.item, + self.args.jids or [], + "", + self.profile, + ) + except Exception as e: + self.disp(f"can't get attached data: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + attached_data = data_format.deserialise(attached_data, type_check=list) + await self.output(attached_data) + self.host.quit(C.EXIT_OK) + + +class AttachmentSet(base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "set", + use_pubsub=True, + pubsub_flags={C.SERVICE, C.NODE, C.SINGLE_ITEM}, + help=_("attach data to an item"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "--replace", + action="store_true", + help=_( + "replace previous versions of attachments (DEFAULT: update previous " + "version)" + ) + ) + self.parser.add_argument( + "-N", + "--noticed", + metavar="BOOLEAN", + nargs="?", + default="keep", + help=_("mark item as (un)noticed (DEFAULT: keep current value))") + ) + self.parser.add_argument( + "-r", + "--reactions", + # FIXME: to be replaced by "extend" when we stop supporting python 3.7 + action="append", + help=_("emojis to add to react to an item") + ) + self.parser.add_argument( + "-R", + "--reactions-remove", + # FIXME: to be replaced by "extend" when we stop supporting python 3.7 + action="append", + help=_("emojis to remove from reactions to an item") + ) + + async def start(self): + attachments_data = { + "service": self.args.service, + "node": self.args.node, + "id": self.args.item, + "extra": {} + } + operation = "replace" if self.args.replace else "update" + if self.args.noticed != "keep": + if self.args.noticed is None: + self.args.noticed = C.BOOL_TRUE + attachments_data["extra"]["noticed"] = C.bool(self.args.noticed) + + if self.args.reactions or self.args.reactions_remove: + reactions = attachments_data["extra"]["reactions"] = { + "operation": operation + } + if self.args.replace: + reactions["reactions"] = self.args.reactions + else: + reactions["add"] = self.args.reactions + reactions["remove"] = self.args.reactions_remove + + + if not attachments_data["extra"]: + self.parser.error(_("At leat one attachment must be specified.")) + + try: + await self.host.bridge.ps_attachments_set( + data_format.serialise(attachments_data), + self.profile, + ) + except Exception as e: + self.disp(f"can't attach data to item: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp("data attached") + self.host.quit(C.EXIT_OK) + + +class Attachments(base.CommandBase): + subcommands = (AttachmentGet, AttachmentSet) + + def __init__(self, host): + super().__init__( + host, + "attachments", + use_profile=False, + help=_("set or retrieve items attachments"), + ) + + +class SignatureSign(base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "sign", + use_pubsub=True, + pubsub_flags={C.NODE, C.SINGLE_ITEM}, + help=_("sign an item"), + ) + + def add_parser_options(self): + pass + + async def start(self): + attachments_data = { + "service": self.args.service, + "node": self.args.node, + "id": self.args.item, + "extra": { + # we set None to use profile's bare JID + "signature": {"signer": None} + } + } + try: + await self.host.bridge.ps_attachments_set( + data_format.serialise(attachments_data), + self.profile, + ) + except Exception as e: + self.disp(f"can't sign the item: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp(f"item {self.args.item!r} has been signed") + self.host.quit(C.EXIT_OK) + + +class SignatureCheck(base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "check", + use_output=C.OUTPUT_DICT, + use_pubsub=True, + pubsub_flags={C.SERVICE, C.NODE, C.SINGLE_ITEM}, + help=_("check the validity of pubsub signature"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "signature", + metavar="JSON", + help=_("signature data") + ) + + async def start(self): + try: + ret_s = await self.host.bridge.ps_signature_check( + self.args.service, + self.args.node, + self.args.item, + self.args.signature, + self.profile, + ) + except Exception as e: + self.disp(f"can't check signature: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + await self.output(data_format.deserialise((ret_s))) + self.host.quit() + + +class Signature(base.CommandBase): + subcommands = ( + SignatureSign, + SignatureCheck, + ) + + def __init__(self, host): + super().__init__( + host, "signature", use_profile=False, help=_("items signatures") + ) + + +class SecretShare(base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "share", + use_pubsub=True, + pubsub_flags={C.NODE}, + help=_("share a secret to let other entity encrypt or decrypt items"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-k", "--key", metavar="ID", dest="secret_ids", action="append", default=[], + help=_( + "only share secrets with those IDs (default: share all secrets of the " + "node)" + ) + ) + self.parser.add_argument( + "recipient", metavar="JID", help=_("entity who must get the shared secret") + ) + + async def start(self): + try: + await self.host.bridge.ps_secret_share( + self.args.recipient, + self.args.service, + self.args.node, + self.args.secret_ids, + self.profile, + ) + except Exception as e: + self.disp(f"can't share secret: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp("secrets have been shared") + self.host.quit(C.EXIT_OK) + + +class SecretRevoke(base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "revoke", + use_pubsub=True, + pubsub_flags={C.NODE}, + help=_("revoke an encrypted node secret"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "secret_id", help=_("ID of the secrets to revoke") + ) + self.parser.add_argument( + "-r", "--recipient", dest="recipients", metavar="JID", action="append", + default=[], help=_( + "entity who must get the revocation notification (default: send to all " + "entities known to have the shared secret)" + ) + ) + + async def start(self): + try: + await self.host.bridge.ps_secret_revoke( + self.args.service, + self.args.node, + self.args.secret_id, + self.args.recipients, + self.profile, + ) + except Exception as e: + self.disp(f"can't revoke secret: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp("secret {self.args.secret_id} has been revoked.") + self.host.quit(C.EXIT_OK) + + +class SecretRotate(base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "rotate", + use_pubsub=True, + pubsub_flags={C.NODE}, + help=_("revoke existing secrets, create a new one and send notifications"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-r", "--recipient", dest="recipients", metavar="JID", action="append", + default=[], help=_( + "entity who must get the revocation and shared secret notifications " + "(default: send to all entities known to have the shared secret)" + ) + ) + + async def start(self): + try: + await self.host.bridge.ps_secret_rotate( + self.args.service, + self.args.node, + self.args.recipients, + self.profile, + ) + except Exception as e: + self.disp(f"can't rotate secret: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp("secret has been rotated") + self.host.quit(C.EXIT_OK) + + +class SecretList(base.CommandBase): + def __init__(self, host): + super().__init__( + host, + "list", + use_pubsub=True, + use_verbose=True, + pubsub_flags={C.NODE}, + help=_("list known secrets for a pubsub node"), + use_output=C.OUTPUT_LIST_DICT + ) + + def add_parser_options(self): + pass + + async def start(self): + try: + secrets = data_format.deserialise(await self.host.bridge.ps_secrets_list( + self.args.service, + self.args.node, + self.profile, + ), type_check=list) + except Exception as e: + self.disp(f"can't list node secrets: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + if not self.verbosity: + # we don't print key if verbosity is not a least one, to avoid showing it + # on the screen accidentally + for secret in secrets: + del secret["key"] + await self.output(secrets) + self.host.quit(C.EXIT_OK) + + +class Secret(base.CommandBase): + subcommands = (SecretShare, SecretRevoke, SecretRotate, SecretList) + + def __init__(self, host): + super().__init__( + host, + "secret", + use_profile=False, + help=_("handle encrypted nodes secrets"), + ) + + +class HookCreate(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "create", + use_pubsub=True, + pubsub_flags={C.NODE}, + help=_("create a Pubsub hook"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-t", + "--type", + default="python", + choices=("python", "python_file", "python_code"), + help=_("hook type"), + ) + self.parser.add_argument( + "-P", + "--persistent", + action="store_true", + help=_("make hook persistent across restarts"), + ) + self.parser.add_argument( + "hook_arg", + help=_("argument of the hook (depend of the type)"), + ) + + @staticmethod + def check_args(self): + if self.args.type == "python_file": + self.args.hook_arg = os.path.abspath(self.args.hook_arg) + if not os.path.isfile(self.args.hook_arg): + self.parser.error( + _("{path} is not a file").format(path=self.args.hook_arg) + ) + + async def start(self): + self.check_args(self) + try: + await self.host.bridge.ps_hook_add( + self.args.service, + self.args.node, + self.args.type, + self.args.hook_arg, + self.args.persistent, + self.profile, + ) + except Exception as e: + self.disp(f"can't create hook: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.host.quit() + + +class HookDelete(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "delete", + use_pubsub=True, + pubsub_flags={C.NODE}, + help=_("delete a Pubsub hook"), + ) + + def add_parser_options(self): + self.parser.add_argument( + "-t", + "--type", + default="", + choices=("", "python", "python_file", "python_code"), + help=_("hook type to remove, empty to remove all (DEFAULT: remove all)"), + ) + self.parser.add_argument( + "-a", + "--arg", + dest="hook_arg", + default="", + help=_( + "argument of the hook to remove, empty to remove all (DEFAULT: remove all)" + ), + ) + + async def start(self): + HookCreate.check_args(self) + try: + nb_deleted = await self.host.bridge.ps_hook_remove( + self.args.service, + self.args.node, + self.args.type, + self.args.hook_arg, + self.profile, + ) + except Exception as e: + self.disp(f"can't delete hook: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + self.disp( + _("{nb_deleted} hook(s) have been deleted").format(nb_deleted=nb_deleted) + ) + self.host.quit() + + +class HookList(base.CommandBase): + def __init__(self, host): + base.CommandBase.__init__( + self, + host, + "list", + use_output=C.OUTPUT_LIST_DICT, + help=_("list hooks of a profile"), + ) + + def add_parser_options(self): + pass + + async def start(self): + try: + data = await self.host.bridge.ps_hook_list( + self.profile, + ) + except Exception as e: + self.disp(f"can't list hooks: {e}", error=True) + self.host.quit(C.EXIT_BRIDGE_ERRBACK) + else: + if not data: + self.disp(_("No hook found.")) + await self.output(data) + self.host.quit() + + +class Hook(base.CommandBase): + subcommands = (HookCreate, HookDelete, HookList) + + def __init__(self, host): + super(Hook, self).__init__( + host, + "hook", + use_profile=False, + use_verbose=True, + help=_("trigger action on Pubsub notifications"), + ) + + +class Pubsub(base.CommandBase): + subcommands = ( + Set, + Get, + Delete, + Edit, + Rename, + Subscribe, + Unsubscribe, + Subscriptions, + Affiliations, + Reference, + Search, + Transform, + Attachments, + Signature, + Secret, + Hook, + Uri, + Node, + Cache, + ) + + def __init__(self, host): + super(Pubsub, self).__init__( + host, "pubsub", use_profile=False, help=_("PubSub nodes/items management") + )