comparison libervia/frontends/jp/cmd_pubsub.py @ 4074:26b7ed2817da

refactoring: rename `sat_frontends` to `libervia.frontends`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 14:12:38 +0200
parents sat_frontends/jp/cmd_pubsub.py@4b842c1fb686
children
comparison
equal deleted inserted replaced
4073:7c5654c54fed 4074:26b7ed2817da
1 #!/usr/bin/env python3
2
3
4 # jp: a SàT command line tool
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
21 import argparse
22 import os.path
23 import re
24 import sys
25 import subprocess
26 import asyncio
27 import json
28 from . import base
29 from libervia.backend.core.i18n import _
30 from libervia.backend.core import exceptions
31 from libervia.frontends.jp.constants import Const as C
32 from libervia.frontends.jp import common
33 from libervia.frontends.jp import arg_tools
34 from libervia.frontends.jp import xml_tools
35 from functools import partial
36 from libervia.backend.tools.common import data_format
37 from libervia.backend.tools.common import uri
38 from libervia.backend.tools.common.ansi import ANSI as A
39 from libervia.backend.tools.common import date_utils
40 from libervia.frontends.tools import jid, strings
41 from libervia.frontends.bridge.bridge_frontend import BridgeException
42
43 __commands__ = ["Pubsub"]
44
45 PUBSUB_TMP_DIR = "pubsub"
46 PUBSUB_SCHEMA_TMP_DIR = PUBSUB_TMP_DIR + "_schema"
47 ALLOWED_SUBSCRIPTIONS_OWNER = ("subscribed", "pending", "none")
48
49 # TODO: need to split this class in several modules, plugin should handle subcommands
50
51
class NodeInfo(base.CommandBase):
    """``info`` command: retrieve and display the configuration of a node."""

    def __init__(self, host):
        super().__init__(
            host,
            "info",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("retrieve node configuration"),
        )

    def add_parser_options(self):
        """Register ``-k/--key``, which may be repeated to select keys."""
        self.parser.add_argument(
            "-k", "--key", action="append", dest="keys", help=_("data key to filter")
        )

    def remove_prefix(self, key):
        """Return ``key`` without its leading ``pubsub#`` prefix, if any."""
        prefix = "pubsub#"
        if key.startswith(prefix):
            return key[len(prefix):]
        return key

    def filter_key(self, key):
        """Tell whether ``key`` matches a requested key, prefixed or not."""
        for wanted in self.args.keys:
            if key in (wanted, "pubsub#" + wanted):
                return True
        return False

    async def start(self):
        """Fetch the node configuration, filter it and send it to the output."""
        try:
            config_dict = await self.host.bridge.ps_node_configuration_get(
                self.args.service, self.args.node, self.profile
            )
        except BridgeException as e:
            if e.condition != "item-not-found":
                self.disp(f"can't get node configuration: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                self.disp(
                    f"The node {self.args.node} doesn't exist on {self.args.service}",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            # without any --key option, every configuration entry is kept
            requested = self.args.keys
            filtered = {
                self.remove_prefix(name): value
                for name, value in config_dict.items()
                if not requested or self.filter_key(name)
            }
            await self.output(filtered)
            self.host.quit()
106
107
class NodeCreate(base.CommandBase):
    """``create`` command: create a node, optionally with configuration fields."""

    def __init__(self, host):
        super().__init__(
            host,
            "create",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("create a node"),
        )

    @staticmethod
    def add_node_config_options(parser):
        """Add the ``--field`` and ``--full-prefix`` options to ``parser``.

        Kept static so that other commands can reuse the same options.
        """
        parser.add_argument(
            "-f",
            "--field",
            dest="fields",
            action="append",
            nargs=2,
            default=[],
            metavar=("KEY", "VALUE"),
            help=_("configuration field to set"),
        )
        parser.add_argument(
            "-F",
            "--full-prefix",
            action="store_true",
            help=_('don\'t prepend "pubsub#" prefix to field names'),
        )

    def add_parser_options(self):
        self.add_node_config_options(self.parser)

    @staticmethod
    def get_config_options(args):
        """Build the configuration dict from the parsed ``--field`` pairs.

        Keys are prefixed with ``pubsub#`` unless ``--full-prefix`` was given.
        """
        if args.full_prefix:
            return dict(args.fields)
        return {"pubsub#" + name: value for name, value in args.fields}

    async def start(self):
        """Create the node and print its identifier."""
        options = self.get_config_options(self.args)
        try:
            node_id = await self.host.bridge.ps_node_create(
                self.args.service, self.args.node, options, self.profile
            )
        except Exception as e:
            self.disp(msg=_("can't create node: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            # the announcement is only shown in verbose mode
            announce = _("node created successfully: ") if self.host.verbosity else ""
            self.disp(announce + node_id)
            self.host.quit()
169
170
class NodePurge(base.CommandBase):
    """``purge`` command: remove all items from a node, keeping the node itself."""

    def __init__(self, host):
        super().__init__(
            host,
            "purge",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("purge a node (i.e. remove all items from it)"),
        )

    def add_parser_options(self):
        """Register ``-f/--force`` to skip the confirmation prompt."""
        self.parser.add_argument(
            "-f",
            "--force",
            action="store_true",
            help=_("purge node without confirmation"),
        )

    async def start(self):
        """Ask for confirmation (unless forced), then purge the node."""
        if not self.args.force:
            if not self.args.service:
                message = _(
                    "Are you sure to purge PEP node [{node}]? This will "
                    "delete ALL items from it!"
                ).format(node=self.args.node)
            else:
                # This prompt used to say "delete node", which wrongly
                # suggested that the node itself would be removed.
                message = _(
                    "Are you sure to purge node [{node}] on service "
                    "[{service}]? This will delete ALL items from it!"
                ).format(node=self.args.node, service=self.args.service)
            await self.host.confirm_or_quit(message, _("node purge cancelled"))

        try:
            await self.host.bridge.ps_node_purge(
                self.args.service,
                self.args.node,
                self.profile,
            )
        except Exception as e:
            self.disp(msg=_("can't purge node: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("node [{node}] purged successfully").format(node=self.args.node))
            self.host.quit()
215
216
class NodeDelete(base.CommandBase):
    """``delete`` command: delete a node, after confirmation unless forced."""

    def __init__(self, host):
        super().__init__(
            host,
            "delete",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("delete a node"),
        )

    def add_parser_options(self):
        """Register ``-f/--force`` to skip the confirmation prompt."""
        self.parser.add_argument(
            "-f", "--force", action="store_true",
            help=_("delete node without confirmation"),
        )

    async def start(self):
        """Ask for confirmation when needed, then delete the node."""
        node, service = self.args.node, self.args.service
        if not self.args.force:
            if service:
                message = _(
                    "Are you sure to delete node [{node}] on service [{service}]?"
                ).format(node=node, service=service)
            else:
                # an empty service means that a PEP node is targeted
                message = _("Are you sure to delete PEP node [{node}] ?").format(
                    node=node
                )
            await self.host.confirm_or_quit(message, _("node deletion cancelled"))

        try:
            await self.host.bridge.ps_node_delete(
                self.args.service, self.args.node, self.profile
            )
        except Exception as e:
            self.disp(f"can't delete node: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("node [{node}] deleted successfully").format(node=self.args.node))
            self.host.quit()
260
261
class NodeSet(base.CommandBase):
    """``set`` command: change configuration fields of an existing node."""

    def __init__(self, host):
        super().__init__(
            host,
            "set",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("set node configuration"),
        )

    def add_parser_options(self):
        """Register the required ``--field`` option and ``--full-prefix``."""
        self.parser.add_argument(
            "-f",
            "--field",
            dest="fields",
            action="append",
            nargs=2,
            required=True,
            metavar=("KEY", "VALUE"),
            help=_("configuration field to set (required)"),
        )
        self.parser.add_argument(
            "-F",
            "--full-prefix",
            action="store_true",
            help=_('don\'t prepend "pubsub#" prefix to field names'),
        )

    def get_key_name(self, k):
        """Return the field name to send for ``k``.

        ``pubsub#`` is prepended unless ``--full-prefix`` is set or the name
        already starts with it.
        """
        keep_as_is = self.args.full_prefix or k.startswith("pubsub#")
        return k if keep_as_is else "pubsub#" + k

    async def start(self):
        """Send the new configuration to the node."""
        try:
            # the mapping is built inside the ``try`` so that any failure
            # is reported like a bridge error
            new_config = {
                self.get_key_name(name): value for name, value in self.args.fields
            }
            await self.host.bridge.ps_node_configuration_set(
                self.args.service, self.args.node, new_config, self.profile
            )
        except Exception as e:
            self.disp(f"can't set node configuration: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("node configuration successful"), 1)
            self.host.quit()
313
314
class NodeImport(base.CommandBase):
    """``import`` command: publish pubsub items read from a raw XML file."""

    def __init__(self, host):
        super().__init__(
            host,
            "import",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("import raw XML to a node"),
        )

    def add_parser_options(self):
        """Register ``--admin`` and the positional ``import_file`` argument."""
        self.parser.add_argument(
            "--admin",
            action="store_true",
            help=_("do a pubsub admin request, needed to change publisher"),
        )
        self.parser.add_argument(
            "import_file",
            type=argparse.FileType(),
            help=_(
                "path to the XML file with data to import. The file must contain "
                "whole XML of each item to import."
            ),
        )

    async def start(self):
        """Parse the file, check that it only holds pubsub items and send them."""
        try:
            element, etree = xml_tools.etree_parse(
                self, self.args.import_file, reraise=True
            )
        except Exception as e:
            from lxml.etree import XMLSyntaxError

            if not (isinstance(e, XMLSyntaxError) and e.code == 5):
                # Any other parsing failure used to be silently ignored, which
                # led to a NameError on ``element`` below: report it instead.
                self.disp(f"can't parse the file to import: {e}", error=True)
                self.host.quit(C.EXIT_DATA_ERROR)
                return

            # we have extra content, this probably means that items are not
            # wrapped, so we wrap them here and try again
            self.args.import_file.seek(0)
            xml_buf = "<import>" + self.args.import_file.read() + "</import>"
            element, etree = xml_tools.etree_parse(self, xml_buf)

        # we reverse element as we expect to have most recently published element first
        # TODO: make this more explicit and add an option
        element[:] = reversed(element)

        if not all(i.tag == "{http://jabber.org/protocol/pubsub}item" for i in element):
            self.disp(
                _("You are not using list of pubsub items, we can't import this file"),
                error=True,
            )
            self.host.quit(C.EXIT_DATA_ERROR)
            return

        items = [etree.tostring(i, encoding="unicode") for i in element]
        if self.args.admin:
            method = self.host.bridge.ps_admin_items_send
        else:
            self.disp(
                _(
                    "Items are imported without using admin mode, publisher can't "
                    "be changed"
                )
            )
            method = self.host.bridge.ps_items_send

        try:
            items_ids = await method(
                self.args.service,
                self.args.node,
                items,
                "",
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't send items: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            if items_ids:
                self.disp(
                    _("items published with id(s) {items_ids}").format(
                        items_ids=", ".join(items_ids)
                    )
                )
            else:
                self.disp(_("items published"))
            self.host.quit()
400
401
class NodeAffiliationsGet(base.CommandBase):
    """``affiliations get`` command: display the affiliations of a node."""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("retrieve node affiliations (for node owner)"),
        )

    def add_parser_options(self):
        # this command has no option of its own
        pass

    async def start(self):
        """Retrieve the affiliations through the bridge and output them."""
        get_affiliations = self.host.bridge.ps_node_affiliations_get
        try:
            affiliations = await get_affiliations(
                self.args.service, self.args.node, self.profile
            )
        except Exception as e:
            self.disp(f"can't get node affiliations: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            await self.output(affiliations)
            self.host.quit()
430
431
class NodeAffiliationsSet(base.CommandBase):
    """``affiliations set`` command: set entity affiliations on a node."""

    def __init__(self, host):
        super().__init__(
            host,
            "set",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("set affiliations (for node owner)"),
        )

    def add_parser_options(self):
        # XXX: optional argument syntax is used for a required argument because
        #   a list of 2-element lists (used to build dicts) doesn't work with
        #   positional arguments
        self.parser.add_argument(
            "-a",
            "--affiliation",
            dest="affiliations",
            action="append",
            nargs=2,
            required=True,
            metavar=("JID", "AFFILIATION"),
            help=_("entity/affiliation couple(s)"),
        )

    async def start(self):
        """Send the JID -> affiliation mapping to the bridge."""
        # each -a option yields a [JID, AFFILIATION] pair
        jid_to_affiliation = dict(self.args.affiliations)
        try:
            await self.host.bridge.ps_node_affiliations_set(
                self.args.service, self.args.node, jid_to_affiliation, self.profile
            )
        except Exception as e:
            self.disp(f"can't set node affiliations: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("affiliations have been set"), 1)
            self.host.quit()
473
474
class NodeAffiliations(base.CommandBase):
    """``affiliations`` command group: holds the ``get`` and ``set`` commands."""

    subcommands = (NodeAffiliationsGet, NodeAffiliationsSet)

    def __init__(self, host):
        super().__init__(
            host,
            "affiliations",
            use_profile=False,
            help=_("set or retrieve node affiliations"),
        )
485
486
class NodeSubscriptionsGet(base.CommandBase):
    """``subscriptions get`` command: display the subscriptions of a node."""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("retrieve node subscriptions (for node owner)"),
        )

    def add_parser_options(self):
        """Register ``--public`` to request public subscriptions instead."""
        self.parser.add_argument(
            "--public", action="store_true", help=_("get public subscriptions")
        )

    async def start(self):
        """Call the bridge method selected by ``--public`` and output the result."""
        bridge = self.host.bridge
        method = (
            bridge.ps_public_node_subscriptions_get
            if self.args.public
            else bridge.ps_node_subscriptions_get
        )
        try:
            subscriptions = await method(
                self.args.service, self.args.node, self.profile
            )
        except Exception as e:
            self.disp(f"can't get node subscriptions: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            await self.output(subscriptions)
            self.host.quit()
523
524
class StoreSubscriptionAction(argparse.Action):
    """Action which handles the subscription parameter for a node owner.

    Values are given by pairs: a JID followed by its subscription state.
    If the subscription state of the last JID is not specified, it defaults to
    "subscribed".
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Store the parsed ``{jid: subscription}`` pairs on ``namespace``.

        @param parser: parser in use; ``parser.error`` is called when a
            subscription state is not in ``ALLOWED_SUBSCRIPTIONS_OWNER``
        @param namespace: namespace receiving the resulting dict in ``self.dest``
        @param values: flat list alternating JIDs and subscription states
        @param option_string: option used on the command line (unused)
        """
        # Work on a copy: the dict found on the namespace may be the ``default``
        # object registered on the parser, which must not be mutated in place.
        # Pairs from earlier uses of the option are kept.
        subscriptions = dict(getattr(namespace, self.dest, None) or {})
        # ``values`` is read by index so the list owned by argparse is untouched
        pairs = list(values)
        for idx in range(0, len(pairs), 2):
            jid_s = pairs[idx]
            try:
                subscription = pairs[idx + 1]
            except IndexError:
                subscription = "subscribed"
            if subscription not in ALLOWED_SUBSCRIPTIONS_OWNER:
                parser.error(
                    _("subscription must be one of {}").format(
                        ", ".join(ALLOWED_SUBSCRIPTIONS_OWNER)
                    )
                )
            subscriptions[jid_s] = subscription
        setattr(namespace, self.dest, subscriptions)
547
548
class NodeSubscriptionsSet(base.CommandBase):
    """``subscriptions set`` command: set or modify subscriptions of a node."""

    def __init__(self, host):
        super().__init__(
            host,
            "set",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("set/modify subscriptions (for node owner)"),
        )

    def add_parser_options(self):
        # XXX: we use optional argument syntax for a required one because list of
        #   list of 2 elements (used to construct dicts) don't work with
        #   positional arguments
        self.parser.add_argument(
            "-S",
            "--subscription",
            dest="subscriptions",
            default={},
            nargs="+",
            metavar=("JID [SUBSCRIPTION]"),
            required=True,
            action=StoreSubscriptionAction,
            help=_("entity/subscription couple(s)"),
        )

    async def start(self):
        """Send the ``{jid: subscription}`` mapping to the bridge."""
        try:
            # The call must be awaited: otherwise its outcome is never
            # observed, and success is reported whatever happens.
            await self.host.bridge.ps_node_subscriptions_set(
                self.args.service,
                self.args.node,
                self.args.subscriptions,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't set node subscriptions: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("subscriptions have been set"), 1)
            self.host.quit()
590
591
class NodeSubscriptions(base.CommandBase):
    """``subscriptions`` command group: holds the ``get`` and ``set`` commands."""

    subcommands = (NodeSubscriptionsGet, NodeSubscriptionsSet)

    def __init__(self, host):
        super().__init__(
            host,
            "subscriptions",
            use_profile=False,
            help=_("get or modify node subscriptions"),
        )
602
603
class NodeSchemaSet(base.CommandBase):
    """``schema set`` command: set or replace the schema of a node."""

    def __init__(self, host):
        super().__init__(
            host,
            "set",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("set/replace a schema"),
        )

    def add_parser_options(self):
        """Register the positional ``schema`` argument."""
        self.parser.add_argument("schema", help=_("schema to set (must be XML)"))

    async def start(self):
        """Send the schema given on the command line to the bridge."""
        set_schema = self.host.bridge.ps_schema_set
        try:
            await set_schema(
                self.args.service, self.args.node, self.args.schema, self.profile
            )
        except Exception as e:
            self.disp(f"can't set schema: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("schema has been set"), 1)
            self.host.quit()
633
634
class NodeSchemaEdit(base.CommandBase, common.BaseEdit):
    """``schema edit`` command: edit the schema of a node in an editor."""

    use_items = False

    def __init__(self, host):
        # both parents are initialised explicitly, command first
        base.CommandBase.__init__(
            self,
            host,
            "edit",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_draft=True,
            use_verbose=True,
            help=_("edit a schema"),
        )
        common.BaseEdit.__init__(self, self.host, PUBSUB_SCHEMA_TMP_DIR)

    def add_parser_options(self):
        # this command has no option of its own
        pass

    async def publish(self, schema):
        """Send ``schema`` to the bridge as the new schema of the node.

        NOTE(review): presumably called by ``common.BaseEdit`` once the editor
        is closed -- confirm.
        """
        try:
            await self.host.bridge.ps_schema_set(
                self.args.service, self.args.node, schema, self.profile
            )
        except Exception as e:
            self.disp(f"can't set schema: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("schema has been set"), 1)
            self.host.quit()

    async def ps_schema_get_cb(self, schema):
        """Write ``schema`` pretty-printed to a temporary file and run the editor.

        @param schema: current schema as an XML string, empty if there is none
        """
        try:
            from lxml import etree
        except ImportError:
            self.disp(
                "lxml module must be installed to use edit, please install it "
                'with "pip install lxml"',
                error=True,
            )
            self.host.quit(1)
        tmp_file, tmp_path = self.get_tmp_file()
        schema = schema.strip()
        if schema:
            # blank text is dropped so that pretty printing can re-indent
            xml_parser = etree.XMLParser(remove_blank_text=True)
            root = etree.fromstring(schema, xml_parser)
            pretty = etree.tostring(root, encoding="utf-8", pretty_print=True)
            tmp_file.write(pretty)
            tmp_file.seek(0)
        await self.run_editor("pubsub_schema_editor_args", tmp_path, tmp_file)

    async def start(self):
        """Fetch the current schema (empty if none is found) and edit it."""
        try:
            schema = await self.host.bridge.ps_schema_get(
                self.args.service, self.args.node, self.profile
            )
        except BridgeException as e:
            if e.condition != "item-not-found" and e.classname != "NotFound":
                self.disp(f"can't edit schema: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                # a missing schema is edited as an empty one
                schema = ""

        await self.ps_schema_get_cb(schema)
707
708
class NodeSchemaGet(base.CommandBase):
    """``schema get`` command: display the schema of a node."""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_XML,
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("get schema"),
        )

    def add_parser_options(self):
        # this command has no option of its own
        pass

    async def start(self):
        """Output the schema, or exit with ``C.EXIT_NOT_FOUND`` if there is none."""
        try:
            schema = await self.host.bridge.ps_schema_get(
                self.args.service, self.args.node, self.profile
            )
        except BridgeException as e:
            if e.condition != "item-not-found" and e.classname != "NotFound":
                self.disp(f"can't get schema: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                schema = None

        if not schema:
            self.disp(_("no schema found"), 1)
            self.host.quit(C.EXIT_NOT_FOUND)
        else:
            await self.output(schema)
            self.host.quit()
745
746
class NodeSchema(base.CommandBase):
    """``schema`` command group: holds the ``set``, ``edit`` and ``get`` commands."""

    subcommands = (NodeSchemaSet, NodeSchemaEdit, NodeSchemaGet)

    def __init__(self, host):
        super().__init__(
            host,
            "schema",
            use_profile=False,
            help=_("data schema manipulation"),
        )
754
755
class Node(base.CommandBase):
    """``node`` command group: holds every node-related command."""

    subcommands = (
        NodeInfo,
        NodeCreate,
        NodePurge,
        NodeDelete,
        NodeSet,
        NodeImport,
        NodeAffiliations,
        NodeSubscriptions,
        NodeSchema,
    )

    def __init__(self, host):
        super().__init__(
            host,
            "node",
            use_profile=False,
            help=_("node handling"),
        )
773
774
class CacheGet(base.CommandBase):
    """``cache get`` command: retrieve pubsub item(s) from the cache."""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_LIST_XML,
            use_pubsub=True,
            pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
            help=_("get pubsub item(s) from cache"),
        )

    def add_parser_options(self):
        """Register ``-S/--sub-id`` (empty string by default)."""
        self.parser.add_argument(
            "-S", "--sub-id", default="", help=_("subscription id")
        )

    async def start(self):
        """Query the cache through the bridge and output the ``items`` found."""
        try:
            serialised = await self.host.bridge.ps_cache_get(
                self.args.service,
                self.args.node,
                self.args.max,
                self.args.items,
                self.args.sub_id,
                self.get_pubsub_extra(),
                self.profile,
            )
            ps_result = data_format.deserialise(serialised)
        except BridgeException as e:
            if e.classname != "NotFound":
                self.disp(f"can't get pubsub items from cache: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                self.disp(
                    f"The node {self.args.node} from {self.args.service} is not in cache "
                    f"for {self.profile}",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            await self.output(ps_result["items"])
            self.host.quit(C.EXIT_OK)
824
825
class CacheSync(base.CommandBase):
    """``cache sync`` command: (re)synchronise a pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "sync",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("(re)synchronise a pubsub node"),
        )

    def add_parser_options(self):
        # this command has no option of its own
        pass

    async def start(self):
        """Ask the bridge to synchronise the node and report any failure."""
        try:
            await self.host.bridge.ps_cache_sync(
                self.args.service, self.args.node, self.profile
            )
        except BridgeException as e:
            if e.condition != "item-not-found" and e.classname != "NotFound":
                self.disp(f"can't synchronise pubsub node: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            else:
                self.disp(
                    f"The node {self.args.node} doesn't exist on {self.args.service}",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            self.host.quit(C.EXIT_OK)
862
863
class CachePurge(base.CommandBase):
    """``cache purge`` command: delete items matching given filters from cache."""

    def __init__(self, host):
        super().__init__(
            host,
            "purge",
            use_profile=False,
            help=_("purge (delete) items from cache"),
        )

    def add_parser_options(self):
        """Register the purge filters and ``-f/--force``.

        Help strings are wrapped in ``_()`` like everywhere else in this file,
        so that they can be translated.
        """
        self.parser.add_argument(
            "-s", "--service", action="append", metavar="JID", dest="services",
            help=_(
                "purge items only for these services. If not specified, items from ALL "
                "services will be purged. May be used several times."
            ),
        )
        self.parser.add_argument(
            "-n", "--node", action="append", dest="nodes",
            help=_(
                "purge items only for these nodes. If not specified, items from ALL "
                "nodes will be purged. May be used several times."
            ),
        )
        self.parser.add_argument(
            "-p", "--profile", action="append", dest="profiles",
            help=_(
                "purge items only for these profiles. If not specified, items from ALL "
                "profiles will be purged. May be used several times."
            ),
        )
        self.parser.add_argument(
            "-b", "--updated-before", type=base.date_decoder, metavar="TIME_PATTERN",
            help=_("purge items which have been last updated before given time."),
        )
        self.parser.add_argument(
            "-C", "--created-before", type=base.date_decoder, metavar="TIME_PATTERN",
            help=_("purge items which have been last created before given time."),
        )
        self.parser.add_argument(
            "-t", "--type", action="append", dest="types",
            help=_("purge items flagged with TYPE. May be used several times."),
        )
        self.parser.add_argument(
            "-S", "--subtype", action="append", dest="subtypes",
            help=_("purge items flagged with SUBTYPE. May be used several times."),
        )
        self.parser.add_argument(
            "-f", "--force", action="store_true",
            help=_("purge items without confirmation"),
        )

    async def start(self):
        """Ask for confirmation (unless forced), then purge the matching items."""
        if not self.args.force:
            await self.host.confirm_or_quit(
                _(
                    "Are you sure to purge items from cache? You'll have to bypass cache "
                    "or resynchronise nodes to access deleted items again."
                ),
                _("Items purging has been cancelled."),
            )
        filter_keys = (
            "services", "nodes", "profiles", "updated_before", "created_before",
            "types", "subtypes",
        )
        # only the filters actually given on the command line are sent
        purge_data = {
            key: getattr(self.args, key)
            for key in filter_keys
            if getattr(self.args, key) is not None
        }
        try:
            await self.host.bridge.ps_cache_purge(data_format.serialise(purge_data))
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            self.host.quit(C.EXIT_OK)
939
940
class CacheReset(base.CommandBase):
    """``cache reset`` command: remove everything from the pubsub cache."""

    def __init__(self, host):
        super().__init__(
            host,
            "reset",
            use_profile=False,
            help=_("remove everything from cache"),
        )

    def add_parser_options(self):
        """Register ``-f/--force`` to skip the confirmation prompt."""
        self.parser.add_argument(
            "-f",
            "--force",
            action="store_true",
            help=_("reset cache without confirmation"),
        )

    async def start(self):
        """Ask for confirmation (unless forced), then reset the cache."""
        if not self.args.force:
            question = _(
                "Are you sure to reset cache? All nodes and items will be removed "
                "from it, then it will be progressively refilled as if it were new. "
                "This may be resources intensive."
            )
            cancelled = _("Pubsub cache reset has been cancelled.")
            await self.host.confirm_or_quit(question, cancelled)
        try:
            await self.host.bridge.ps_cache_reset()
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            self.host.quit(C.EXIT_OK)
974
975
976 class CacheSearch(base.CommandBase):
977 def __init__(self, host):
978 extra_outputs = {
979 "default": self.default_output,
980 "xml": self.xml_output,
981 "xml-raw": self.xml_raw_output,
982 }
983 super().__init__(
984 host,
985 "search",
986 use_profile=False,
987 use_output=C.OUTPUT_LIST_DICT,
988 extra_outputs=extra_outputs,
989 help=_("search for pubsub items in cache"),
990 )
991
992 def add_parser_options(self):
993 self.parser.add_argument(
994 "-f", "--fts", help=_("Full-Text Search query"), metavar="FTS_QUERY"
995 )
996 self.parser.add_argument(
997 "-p", "--profile", action="append", dest="profiles", metavar="PROFILE",
998 help="search items only from these profiles. May be used several times."
999 )
1000 self.parser.add_argument(
1001 "-s", "--service", action="append", dest="services", metavar="SERVICE",
1002 help="items must be from specified service. May be used several times."
1003 )
1004 self.parser.add_argument(
1005 "-n", "--node", action="append", dest="nodes", metavar="NODE",
1006 help="items must be in the specified node. May be used several times."
1007 )
1008 self.parser.add_argument(
1009 "-t", "--type", action="append", dest="types", metavar="TYPE",
1010 help="items must be of specified type. May be used several times."
1011 )
1012 self.parser.add_argument(
1013 "-S", "--subtype", action="append", dest="subtypes", metavar="SUBTYPE",
1014 help="items must be of specified subtype. May be used several times."
1015 )
1016 self.parser.add_argument(
1017 "-P", "--payload", action="store_true", help=_("include item XML payload")
1018 )
1019 self.parser.add_argument(
1020 "-o", "--order-by", action="append", nargs="+",
1021 metavar=("ORDER", "[FIELD] [DIRECTION]"),
1022 help=_("how items must be ordered. May be used several times.")
1023 )
1024 self.parser.add_argument(
1025 "-l", "--limit", type=int, help=_("maximum number of items to return")
1026 )
1027 self.parser.add_argument(
1028 "-i", "--index", type=int, help=_("return results starting from this index")
1029 )
1030 self.parser.add_argument(
1031 "-F",
1032 "--field",
1033 action="append",
1034 nargs=3,
1035 dest="fields",
1036 default=[],
1037 metavar=("PATH", "OPERATOR", "VALUE"),
1038 help=_("parsed data field filter. May be used several times."),
1039 )
1040 self.parser.add_argument(
1041 "-k",
1042 "--key",
1043 action="append",
1044 dest="keys",
1045 metavar="KEY",
1046 help=_(
1047 "data key(s) to display. May be used several times. DEFAULT: show all "
1048 "keys"
1049 ),
1050 )
1051
1052 async def start(self):
1053 query = {}
1054 for arg in ("fts", "profiles", "services", "nodes", "types", "subtypes"):
1055 value = getattr(self.args, arg)
1056 if value:
1057 if arg in ("types", "subtypes"):
1058 # empty string is used to find items without type and/or subtype
1059 value = [v or None for v in value]
1060 query[arg] = value
1061 for arg in ("limit", "index"):
1062 value = getattr(self.args, arg)
1063 if value is not None:
1064 query[arg] = value
1065 if self.args.order_by is not None:
1066 for order_data in self.args.order_by:
1067 order, *args = order_data
1068 if order == "field":
1069 if not args:
1070 self.parser.error(_("field data must be specified in --order-by"))
1071 elif len(args) == 1:
1072 path = args[0]
1073 direction = "asc"
1074 elif len(args) == 2:
1075 path, direction = args
1076 else:
1077 self.parser.error(_(
1078 "You can't specify more that 2 arguments for a field in "
1079 "--order-by"
1080 ))
1081 try:
1082 path = json.loads(path)
1083 except json.JSONDecodeError:
1084 pass
1085 order_query = {
1086 "path": path,
1087 }
1088 else:
1089 order_query = {
1090 "order": order
1091 }
1092 if not args:
1093 direction = "asc"
1094 elif len(args) == 1:
1095 direction = args[0]
1096 else:
1097 self.parser.error(_(
1098 "there are too many arguments in --order-by option"
1099 ))
1100 if direction.lower() not in ("asc", "desc"):
1101 self.parser.error(_("invalid --order-by direction: {direction!r}"))
1102 order_query["direction"] = direction
1103 query.setdefault("order-by", []).append(order_query)
1104
1105 if self.args.fields:
1106 parsed = []
1107 for field in self.args.fields:
1108 path, operator, value = field
1109 try:
1110 path = json.loads(path)
1111 except json.JSONDecodeError:
1112 # this is not a JSON encoded value, we keep it as a string
1113 pass
1114
1115 if not isinstance(path, list):
1116 path = [path]
1117
1118 # handling of TP(<time pattern>)
1119 if operator in (">", "gt", "<", "le", "between"):
1120 def datetime_sub(match):
1121 return str(date_utils.date_parse_ext(
1122 match.group(1), default_tz=date_utils.TZ_LOCAL
1123 ))
1124 value = re.sub(r"\bTP\(([^)]+)\)", datetime_sub, value)
1125
1126 try:
1127 value = json.loads(value)
1128 except json.JSONDecodeError:
1129 # not JSON, as above we keep it as string
1130 pass
1131
1132 if operator in ("overlap", "ioverlap", "disjoint", "idisjoint"):
1133 if not isinstance(value, list):
1134 value = [value]
1135
1136 parsed.append({
1137 "path": path,
1138 "op": operator,
1139 "value": value
1140 })
1141
1142 query["parsed"] = parsed
1143
1144 if self.args.payload or "xml" in self.args.output:
1145 query["with_payload"] = True
1146 if self.args.keys:
1147 self.args.keys.append("item_payload")
1148 try:
1149 found_items = data_format.deserialise(
1150 await self.host.bridge.ps_cache_search(
1151 data_format.serialise(query)
1152 ),
1153 type_check=list,
1154 )
1155 except BridgeException as e:
1156 self.disp(f"can't search for pubsub items in cache: {e}", error=True)
1157 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1158 except Exception as e:
1159 self.disp(f"Internal error: {e}", error=True)
1160 self.host.quit(C.EXIT_INTERNAL_ERROR)
1161 else:
1162 if self.args.keys:
1163 found_items = [
1164 {k: v for k,v in item.items() if k in self.args.keys}
1165 for item in found_items
1166 ]
1167 await self.output(found_items)
1168 self.host.quit(C.EXIT_OK)
1169
def default_output(self, found_items):
    """Display found items with the "simple" list-of-dicts output

    The "created", "published" and "updated" values, when present, are replaced
    in place by the result of common.format_time before being displayed.
    @param found_items(list[dict]): items to display
    """
    time_fields = ("created", "published", "updated")
    for entry in found_items:
        for name in time_fields:
            if name not in entry:
                continue
            try:
                entry[name] = common.format_time(entry[name])
            except ValueError:
                # value can't be formatted, the raw one is displayed
                continue
    render = self.host._outputs[C.OUTPUT_LIST_DICT]["simple"]["callback"]
    render(found_items)
1183
def xml_output(self, found_items):
    """Output prettified item payload

    @param found_items(list[dict]): items, each must have an "item_payload" key
    """
    render = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML]["callback"]
    for payload in (found["item_payload"] for found in found_items):
        render(payload)
1189
def xml_raw_output(self, found_items):
    """Output item payload without prettifying

    @param found_items(list[dict]): items, each must have an "item_payload" key
    """
    render = self.host._outputs[C.OUTPUT_XML][C.OUTPUT_NAME_XML_RAW]["callback"]
    for payload in (found["item_payload"] for found in found_items):
        render(payload)
1195
1196
class Cache(base.CommandBase):
    """Parent command grouping the pubsub cache subcommands"""

    subcommands = (CacheGet, CacheSync, CachePurge, CacheReset, CacheSearch)

    def __init__(self, host):
        super().__init__(
            host,
            "cache",
            use_profile=False,
            help=_("pubsub cache handling"),
        )
1210
1211
class Set(base.CommandBase):
    """Publish the XML payload read from stdin as a pubsub item"""

    def __init__(self, host):
        super().__init__(
            host,
            "set",
            use_pubsub=True,
            use_quiet=True,
            pubsub_flags={C.NODE},
            help=_("publish a new item or update an existing one"),
        )

    def add_parser_options(self):
        """Declare node config, encryption, signature and item arguments"""
        parser = self.parser
        NodeCreate.add_node_config_options(parser)
        parser.add_argument(
            "-e",
            "--encrypt",
            action="store_true",
            help=_("end-to-end encrypt the blog item"),
        )
        parser.add_argument(
            "--encrypt-for",
            metavar="JID",
            action="append",
            help=_("encrypt a single item for"),
        )
        parser.add_argument(
            "-X",
            "--sign",
            action="store_true",
            help=_("cryptographically sign the blog post"),
        )
        parser.add_argument(
            "item",
            nargs="?",
            default="",
            help=_("id, URL of the item to update, keyword, or nothing for new item"),
        )

    async def start(self):
        """Parse stdin, build the "extra" data and send the item through the bridge"""
        root, etree = xml_tools.etree_parse(self, sys.stdin)
        payload_elt = xml_tools.get_payload(self, root)
        payload = etree.tostring(payload_elt, encoding="unicode")

        args = self.args
        extra = {}
        if args.encrypt:
            extra["encrypted"] = True
        if args.encrypt_for:
            extra["encrypted_for"] = {"targets": args.encrypt_for}
        if args.sign:
            extra["signed"] = True
        publish_options = NodeCreate.get_config_options(args)
        if publish_options:
            extra["publish_options"] = publish_options

        try:
            published_id = await self.host.bridge.ps_item_send(
                args.service,
                args.node,
                payload,
                args.item,
                data_format.serialise(extra),
                self.profile,
            )
        except Exception as e:
            self.disp(_("can't send item: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            return

        if not published_id:
            self.disp("Item published")
        elif args.quiet:
            # quiet mode: only the raw id is printed, without trailing new line
            self.disp(published_id, end="")
        else:
            self.disp(f"Item published at {published_id}")
        self.host.quit(C.EXIT_OK)
1287
1288
class Get(base.CommandBase):
    """Retrieve item(s) from a pubsub node and output them"""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_LIST_XML,
            use_pubsub=True,
            pubsub_flags={C.NODE, C.MULTI_ITEMS, C.CACHE},
            help=_("get pubsub item(s)"),
        )

    def add_parser_options(self):
        """Declare subscription id and decryption arguments"""
        parser = self.parser
        parser.add_argument(
            "-S",
            "--sub-id",
            default="",
            help=_("subscription id"),
        )
        parser.add_argument(
            "--no-decrypt",
            action="store_true",
            help=_("don't do automatic decryption of e2ee items"),
        )
        # TODO: a key(s) argument to select keys to display

    async def start(self):
        """Request the items through the bridge and output the "items" of the result"""
        args = self.args
        extra = {"decrypt": False} if args.no_decrypt else {}
        try:
            raw_result = await self.host.bridge.ps_items_get(
                args.service,
                args.node,
                args.max,
                args.items,
                args.sub_id,
                self.get_pubsub_extra(extra),
                self.profile,
            )
            ps_result = data_format.deserialise(raw_result)
        except BridgeException as e:
            node_missing = e.condition == "item-not-found" or e.classname == "NotFound"
            if node_missing:
                self.disp(
                    f"The node {self.args.node} doesn't exist on {self.args.service}",
                    error=True,
                )
                self.host.quit(C.EXIT_NOT_FOUND)
            else:
                self.disp(f"can't get pubsub items: {e}", error=True)
                self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        except Exception as e:
            self.disp(f"Internal error: {e}", error=True)
            self.host.quit(C.EXIT_INTERNAL_ERROR)
        else:
            await self.output(ps_result["items"])
            self.host.quit(C.EXIT_OK)
1347
1348
class Delete(base.CommandBase):
    """Retract an item from a pubsub node, after confirmation unless forced"""

    def __init__(self, host):
        super().__init__(
            host,
            "delete",
            use_pubsub=True,
            pubsub_flags={C.NODE, C.ITEM, C.SINGLE_ITEM},
            help=_("delete an item"),
        )

    def add_parser_options(self):
        """Declare the --force and --no-notification arguments"""
        parser = self.parser
        parser.add_argument(
            "-f",
            "--force",
            action="store_true",
            help=_("delete without confirmation"),
        )
        parser.add_argument(
            "--no-notification",
            dest="notify",
            action="store_false",
            help=_("do not send notification (not recommended)"),
        )

    async def start(self):
        """Ask for confirmation if needed, then retract the item"""
        args = self.args
        if not args.item:
            self.parser.error(_("You need to specify an item to delete"))
        if not args.force:
            question = _("Are you sure to delete item {item_id} ?").format(
                item_id=args.item
            )
            await self.host.confirm_or_quit(question, _("item deletion cancelled"))
        try:
            await self.host.bridge.ps_item_retract(
                args.service,
                args.node,
                args.item,
                args.notify,
                self.profile,
            )
        except Exception as e:
            self.disp(_("can't delete item: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("item {item} has been deleted").format(item=args.item))
            self.host.quit(C.EXIT_OK)
1391
1392
class Edit(base.CommandBase, common.BaseEdit):
    """Edit a pubsub item with an editor, then publish the result"""

    def __init__(self, host):
        base.CommandBase.__init__(
            self,
            host,
            "edit",
            use_verbose=True,
            use_pubsub=True,
            pubsub_flags={C.NODE, C.SINGLE_ITEM},
            use_draft=True,
            help=_("edit an existing or new pubsub item"),
        )
        common.BaseEdit.__init__(self, self.host, PUBSUB_TMP_DIR)

    def add_parser_options(self):
        """Declare encryption and signature arguments"""
        parser = self.parser
        parser.add_argument(
            "-e",
            "--encrypt",
            action="store_true",
            help=_("end-to-end encrypt the blog item"),
        )
        parser.add_argument(
            "--encrypt-for",
            metavar="JID",
            action="append",
            help=_("encrypt a single item for"),
        )
        parser.add_argument(
            "-X",
            "--sign",
            action="store_true",
            help=_("cryptographically sign the blog post"),
        )

    async def publish(self, content):
        """Send the edited content as an item of the node being edited

        @param content(unicode): item payload to publish
        """
        args = self.args
        extra = {}
        if args.encrypt:
            extra["encrypted"] = True
        if args.encrypt_for:
            extra["encrypted_for"] = {"targets": args.encrypt_for}
        if args.sign:
            extra["signed"] = True
        published_id = await self.host.bridge.ps_item_send(
            self.pubsub_service,
            self.pubsub_node,
            content,
            self.pubsub_item or "",
            data_format.serialise(extra),
            self.profile,
        )
        if not published_id:
            self.disp("Item published")
        else:
            self.disp(f"Item published at {published_id}")

    async def get_item_data(self, service, node, item):
        """Retrieve the payload of an item as pretty-printed XML

        @param item: id of the item to get, the request is done without id if falsy
        @return (tuple): (serialised payload, item id)
            the payload is an empty string if the item has no child element
        """
        try:
            from lxml import etree
        except ImportError:
            self.disp(
                "lxml module must be installed to use edit, please install it "
                'with "pip install lxml"',
                error=True,
            )
            self.host.quit(1)
        requested = [item] if item else []
        raw_result = await self.host.bridge.ps_items_get(
            service, node, 1, requested, "", data_format.serialise({}), self.profile
        )
        ps_result = data_format.deserialise(raw_result)
        xml_parser = etree.XMLParser(remove_blank_text=True, recover=True)
        item_elt = etree.fromstring(ps_result["items"][0], xml_parser)
        item_id = item_elt.get("id")
        try:
            payload_elt = item_elt[0]
        except IndexError:
            self.disp(_("Item has not payload"), 1)
            return "", item_id
        serialised = etree.tostring(payload_elt, encoding="unicode", pretty_print=True)
        return serialised, item_id

    async def start(self):
        """Resolve the item to edit, run the editor on it, then quit"""
        item_path = await self.get_item_path()
        (
            self.pubsub_service,
            self.pubsub_node,
            self.pubsub_item,
            tmp_path,
            tmp_file,
        ) = item_path
        await self.run_editor("pubsub_editor_args", tmp_path, tmp_file)
        self.host.quit()
1485
1486
class Rename(base.CommandBase):
    """Change the id of a pubsub item"""

    def __init__(self, host):
        super().__init__(
            host,
            "rename",
            use_pubsub=True,
            pubsub_flags={C.NODE, C.SINGLE_ITEM},
            help=_("rename a pubsub item"),
        )

    def add_parser_options(self):
        """Declare the positional new item id"""
        self.parser.add_argument("new_id", help=_("new item id to use"))

    async def start(self):
        """Ask the bridge to rename the item, then quit"""
        args = self.args
        try:
            await self.host.bridge.ps_item_rename(
                args.service,
                args.node,
                args.item,
                args.new_id,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't rename item: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp("Item renamed")
            self.host.quit(C.EXIT_OK)
1516
1517
class Subscribe(base.CommandBase):
    """Subscribe to a pubsub node, optionally with a public subscription"""

    def __init__(self, host):
        super().__init__(
            host,
            "subscribe",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("subscribe to a node"),
        )

    def add_parser_options(self):
        """Declare the --public argument"""
        self.parser.add_argument(
            "--public",
            action="store_true",
            help=_("make the registration visible for everybody"),
        )

    async def start(self):
        """Build subscription options and subscribe through the bridge"""
        options = {}
        if self.args.public:
            namespaces = await self.host.bridge.namespaces_get()
            if "pps" in namespaces:
                ns_pps = namespaces["pps"]
                # the option key is the "public" name qualified with the namespace
                options[f"{{{ns_pps}}}public"] = True
            else:
                self.disp(
                    "Pubsub Public Subscription plugin is not loaded, can't use --public "
                    "option, subscription stopped", error=True
                )
                self.host.quit(C.EXIT_MISSING_FEATURE)
        try:
            sub_id = await self.host.bridge.ps_subscribe(
                self.args.service,
                self.args.node,
                data_format.serialise(options),
                self.profile,
            )
        except Exception as e:
            self.disp(_("can't subscribe to node: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("subscription done"), 1)
            if sub_id:
                self.disp(_("subscription id: {sub_id}").format(sub_id=sub_id))
            self.host.quit()
1566
1567
class Unsubscribe(base.CommandBase):
    """Remove the subscription to a pubsub node"""

    # FIXME: check why we get a NodeNotFound on subscribe just after unsubscribe

    def __init__(self, host):
        super().__init__(
            host,
            "unsubscribe",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            use_verbose=True,
            help=_("unsubscribe from a node"),
        )

    def add_parser_options(self):
        """No specific argument for this command"""

    async def start(self):
        """Unsubscribe through the bridge, then quit"""
        bridge = self.host.bridge
        try:
            await bridge.ps_unsubscribe(
                self.args.service,
                self.args.node,
                self.profile,
            )
        except Exception as e:
            self.disp(_("can't unsubscribe from node: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(_("subscription removed"), 1)
            self.host.quit()
1598
1599
class Subscriptions(base.CommandBase):
    """List subscriptions, either the regular or the public ones"""

    def __init__(self, host):
        super().__init__(
            host,
            "subscriptions",
            use_output=C.OUTPUT_LIST_DICT,
            use_pubsub=True,
            help=_("retrieve all subscriptions on a service"),
        )

    def add_parser_options(self):
        """Declare the --public argument"""
        self.parser.add_argument(
            "--public",
            action="store_true",
            help=_("get public subscriptions"),
        )

    async def start(self):
        """Call the bridge method selected by --public and output its result"""
        bridge = self.host.bridge
        method = (
            bridge.ps_public_subscriptions_get
            if self.args.public
            else bridge.ps_subscriptions_get
        )
        try:
            raw_subscriptions = await method(
                self.args.service,
                self.args.node,
                self.profile,
            )
            subscriptions = data_format.deserialise(raw_subscriptions, type_check=list)
        except Exception as e:
            self.disp(_("can't retrieve subscriptions: {e}").format(e=e), error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            await self.output(subscriptions)
            self.host.quit()
1638
1639
class Affiliations(base.CommandBase):
    """Output the affiliations returned by the bridge for a service/node"""

    def __init__(self, host):
        super().__init__(
            host,
            "affiliations",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            help=_("retrieve all affiliations on a service"),
        )

    def add_parser_options(self):
        """No specific argument for this command"""

    async def start(self):
        """Get affiliations through the bridge and output them"""
        bridge = self.host.bridge
        try:
            affiliations = await bridge.ps_affiliations_get(
                self.args.service,
                self.args.node,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't get node affiliations: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            await self.output(affiliations)
            self.host.quit()
1667
1668
class Reference(base.CommandBase):
    """Send a reference whose anchor is the XMPP URI of a pubsub node or item"""

    def __init__(self, host):
        super().__init__(
            host,
            "reference",
            use_pubsub=True,
            pubsub_flags={C.NODE, C.SINGLE_ITEM},
            help=_("send a reference/mention to pubsub item"),
        )

    def add_parser_options(self):
        """Declare reference type and recipient arguments"""
        parser = self.parser
        parser.add_argument(
            "-t",
            "--type",
            default="mention",
            choices=("data", "mention"),
            help=_("type of reference to send (DEFAULT: mention)"),
        )
        parser.add_argument("recipient", help=_("recipient of the reference"))

    async def start(self):
        """Build the anchor URI and send the reference through the bridge"""
        # profile's own jid is used when no service is specified
        service = self.args.service or await self.host.get_profile_jid()
        uri_kwargs = {"path": service, "node": self.args.node}
        if self.args.item:
            # the item is only part of the URI when one has been specified
            uri_kwargs["item"] = self.args.item
        anchor = uri.build_xmpp_uri("pubsub", **uri_kwargs)

        try:
            await self.host.bridge.reference_send(
                self.args.recipient,
                anchor,
                self.args.type,
                "",
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't send reference: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.host.quit()
1714
1715
1716 class Search(base.CommandBase):
1717 """This command do a search without using MAM
1718
1719 This commands checks every items it finds by itself,
1720 so it may be heavy in resources both for server and client
1721 """
1722
1723 RE_FLAGS = re.MULTILINE | re.UNICODE
1724 EXEC_ACTIONS = ("exec", "external")
1725
def __init__(self, host):
    """Register the "search" command"""
    # FIXME: C.NO_MAX is not needed here, and this can be globally removed from consts
    #        the only interest is to change the help string, but this can be explained
    #        extensively in man pages (max is for each node found)
    options = {
        "use_output": C.OUTPUT_XML,
        "use_pubsub": True,
        "pubsub_flags": {C.MULTI_ITEMS, C.NO_MAX},
        "use_verbose": True,
        "help": _("search items corresponding to filters"),
    }
    base.CommandBase.__init__(self, host, "search", **options)
1740
@property
def etree(self):
    """lxml.etree module, imported on first access only"""
    cached = self._etree
    if cached is not None:
        return cached
    from lxml import etree as lxml_etree

    self._etree = lxml_etree
    return lxml_etree
1749
def filter_opt(self, value, type_):
    """Tag a filter value with its filter type

    @return (tuple): (type_, value)
    """
    tagged = (type_, value)
    return tagged
1752
def filter_flag(self, value, type_):
    """Tag a flag value, converted with C.bool, with its flag type

    @return (tuple): (type_, converted value)
    """
    return (type_, C.bool(value))
1756
def add_parser_options(self):
    """Declare search arguments: limits, namespaces, filters, flags and action

    Filters and flags all append a (type, value) tuple to the same "filters"
    destination, so that they are stored in the order given on the command line.
    """
    parser = self.parser
    parser.add_argument(
        "-D",
        "--max-depth",
        type=int,
        default=0,
        help=_(
            "maximum depth of recursion (will search linked nodes if > 0, "
            "DEFAULT: 0)"
        ),
    )
    parser.add_argument(
        "-M",
        "--node-max",
        type=int,
        default=30,
        # the string is formatted *after* translation: formatting it inside _()
        # would make the lookup use a msgid which can't be in the catalogue
        help=_(
            "maximum number of items to get per node ({} to get all items, "
            "DEFAULT: 30)"
        ).format(C.NO_LIMIT),
    )
    parser.add_argument(
        "-N",
        "--namespace",
        action="append",
        nargs=2,
        default=[],
        metavar="NAME NAMESPACE",
        help=_("namespace to use for xpath"),
    )

    # filters
    filters = parser.add_argument_group(
        _("filters"),
        _("only items corresponding to following filters will be kept"),
    )
    filter_specs = (
        (
            "-t",
            "--text",
            "text",
            "TEXT",
            _("full text filter, item must contain this string (XML included)"),
        ),
        (
            "-r",
            "--regex",
            "regex",
            "EXPRESSION",
            _("like --text but using a regular expression"),
        ),
        (
            "-x",
            "--xpath",
            "xpath",
            "XPATH",
            _("filter items which has elements matching this xpath"),
        ),
        (
            "-P",
            "--python",
            "python",
            "PYTHON_CODE",
            _(
                "Python expression which much return a bool (True to keep item, "
                'False to reject it). "item" is raw text item, "item_xml" is '
                "lxml's etree.Element"
            ),
        ),
    )
    for short_opt, long_opt, filter_type, metavar, help_text in filter_specs:
        filters.add_argument(
            short_opt,
            long_opt,
            action="append",
            dest="filters",
            type=partial(self.filter_opt, type_=filter_type),
            metavar=metavar,
            help=help_text,
        )

    # filters flags
    flags = parser.add_argument_group(
        _("filters flags"),
        _("filters modifiers (change behaviour of following filters)"),
    )
    flag_specs = (
        (
            "-C",
            "--ignore-case",
            "ignore-case",
            _("(don't) ignore case in following filters (DEFAULT: case sensitive)"),
        ),
        (
            "-I",
            "--invert",
            "invert",
            _("(don't) invert effect of following filters (DEFAULT: don't invert)"),
        ),
        (
            "-A",
            "--dot-all",
            "dotall",
            _("(don't) use DOTALL option for regex (DEFAULT: don't use)"),
        ),
        (
            "-k",
            "--only-matching",
            "only-matching",
            _("keep only the matching part of the item"),
        ),
    )
    for short_opt, long_opt, flag_type, help_text in flag_specs:
        flags.add_argument(
            short_opt,
            long_opt,
            action="append",
            dest="filters",
            type=partial(self.filter_flag, type_=flag_type),
            # value used when the flag is given without explicit boolean
            const=(flag_type, True),
            nargs="?",
            metavar="BOOLEAN",
            help=help_text,
        )

    # action
    parser.add_argument(
        "action",
        default="print",
        nargs="?",
        choices=("print", "exec", "external"),
        help=_("action to do on found items (DEFAULT: print)"),
    )
    parser.add_argument("command", nargs=argparse.REMAINDER)
1901
async def get_items(self, depth, service, node, items):
    """Retrieve items of a node and run the search on them

    self.to_get is increased before the request and decreased once it has been
    handled (by self.search on success, here on failure).
    @param depth(int): current depth level, forwarded to self.search
    @param service: pubsub service to request
    @param node: pubsub node to request
    @param items(list): ids of the items to retrieve
    """
    self.to_get += 1
    try:
        ps_result = data_format.deserialise(
            await self.host.bridge.ps_items_get(
                service,
                node,
                self.args.node_max,
                items,
                "",
                self.get_pubsub_extra(),
                self.profile,
            )
        )
    except Exception as e:
        self.disp(
            f"can't get pubsub items at {service} (node: {node}): {e}",
            error=True,
        )
        self.to_get -= 1
        if self.to_get == 0:
            # no other request is pending, so nothing else would end the
            # command: quit here like the other error handlers of this file
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
    else:
        await self.search(ps_result, depth)
1924
def _check_pubsub_url(self, match, found_nodes):
    """Store the matched URL in found_nodes if it is an xmpp: pubsub one

    @param match: regex match whose whole matched text is the URL to check
    @param found_nodes(list[dict]): found nodes
        a dict with "service", "node" and, if present in the URI, "item" is
        appended for each pubsub URI
    """
    url = match.group(0)
    if not url.startswith("xmpp"):
        return
    try:
        url_data = uri.parse_xmpp_uri(url)
    except ValueError:
        # not a parsable XMPP URI, it is ignored
        return
    if url_data["type"] != "pubsub":
        return
    node_data = {"service": url_data["path"], "node": url_data["node"]}
    if "item" in url_data:
        node_data["item"] = url_data["item"]
    found_nodes.append(node_data)
1942
async def get_sub_nodes(self, item, depth):
    """Look for pubsub URIs in item, and call get_items on each linked node

    @param item(unicode): raw item where URLs are looked for
    @param depth(int): depth of the current item, linked nodes use depth + 1
    """
    linked_nodes = []
    collect = partial(self._check_pubsub_url, found_nodes=linked_nodes)
    # sub() is only used to call "collect" on every URL, its result is ignored
    strings.RE_URL.sub(collect, item)
    for node_data in linked_nodes:
        requested = [node_data["item"]] if "item" in node_data else []
        await self.get_items(
            depth + 1,
            node_data["service"],
            node_data["node"],
            requested,
        )
1955
def parseXml(self, item):
    """Parse item with self.etree, display an error and quit if it is not XML

    @param item(unicode): serialised XML
    @return: element returned by self.etree.fromstring
    """
    try:
        parsed = self.etree.fromstring(item)
    except self.etree.XMLSyntaxError:
        self.disp(
            _(
                "item doesn't looks like XML, you have probably used --only-matching "
                "somewhere before and we have no more XML"
            ),
            error=True,
        )
        self.host.quit(C.EXIT_BAD_ARG)
    else:
        return parsed
1968
def filter(self, item):
    """Apply filters given on command line

    Filters and flags are handled in the order of self.args.filters: a flag
    changes the behaviour of the filters following it, and the item is rejected
    as soon as one filter doesn't keep it.
    If only-matching is used, item may be modified.
    @param item(unicode): raw item to check
    @return (tuple[bool, unicode]): a tuple with:
        - keep: True if item passed the filters
        - item: it is returned in case of modifications
    """
    ignore_case = False
    invert = False
    dotall = False
    only_matching = False
    item_xml = None
    for type_, value in self.args.filters:
        ## flags

        # a flag only updates the state used by following filters, it must
        # never be evaluated as a filter itself: else an active "invert" would
        # reject the item as soon as any other flag is found
        if type_ == "ignore-case":
            ignore_case = value
            continue
        if type_ == "invert":
            invert = value
            continue
        if type_ == "dotall":
            dotall = value
            continue
        if type_ == "only-matching":
            only_matching = value
            continue

        ## filters

        if type_ == "text":
            if ignore_case:
                keep = value.lower() in item.lower()
            else:
                keep = value in item
            if keep and only_matching:
                # doesn't really make sense to keep a fixed string
                # so we raise an error
                self.host.disp(
                    _("--only-matching used with fixed --text string, are you sure?"),
                    error=True,
                )
                self.host.quit(C.EXIT_BAD_ARG)
        elif type_ == "regex":
            flags = self.RE_FLAGS
            if ignore_case:
                flags |= re.IGNORECASE
            if dotall:
                flags |= re.DOTALL
            match = re.search(value, item, flags)
            keep = match is not None
            if keep and only_matching:
                item = match.group()
                # item has changed, parsed XML is not valid anymore
                item_xml = None
        elif type_ == "xpath":
            if item_xml is None:
                item_xml = self.parseXml(item)
            try:
                elts = item_xml.xpath(value, namespaces=self.args.namespace)
            except self.etree.XPathEvalError as e:
                self.disp(_("can't use xpath: {reason}").format(reason=e), error=True)
                self.host.quit(C.EXIT_BAD_ARG)
            keep = bool(elts)
            if keep and only_matching:
                item_xml = elts[0]
                try:
                    item = self.etree.tostring(item_xml, encoding="unicode")
                except TypeError:
                    # we have a string only, not an element
                    item = str(item_xml)
                    item_xml = None
        elif type_ == "python":
            if item_xml is None:
                item_xml = self.parseXml(item)
            cmd_ns = {"etree": self.etree, "item": item, "item_xml": item_xml}
            try:
                # NOTE: the expression comes from the --python argument and is
                #   evaluated as is, it must never come from an untrusted source
                keep = eval(value, cmd_ns)
            except SyntaxError as e:
                self.disp(str(e), error=True)
                self.host.quit(C.EXIT_BAD_ARG)
        else:
            raise exceptions.InternalError(
                _("unknown filter type {type}").format(type=type_)
            )

        if invert:
            keep = not keep
        if not keep:
            return False, item

    return True, item
2063
async def do_item_action(self, item, metadata):
    """Called when item has been kept and the action needs to be done

    The item is output for the "print" action or when verbosity is > 0, then
    for "exec" and "external" actions a command is run on it.
    @param item(unicode): accepted item
    @param metadata(dict): data where "service" and "node" of the item are read
        for the "exec" action
    """
    action = self.args.action
    if action == "print" or self.host.verbosity > 0:
        try:
            await self.output(item)
        except self.etree.XMLSyntaxError:
            # item is not valid XML, but a string
            # can happen when --only-matching is used
            self.disp(item)
    if action not in self.EXEC_ACTIONS:
        return

    item_elt = self.parseXml(item)
    if action == "exec":
        use = {
            "service": metadata["service"],
            "node": metadata["node"],
            "item": item_elt.get("id"),
            "profile": self.profile,
        }
        # we need to send a copy of self.args.command
        # else it would be modified, and it is reused for each found item
        parser_args, use_args = arg_tools.get_use_args(
            self.host, list(self.args.command), use, verbose=self.host.verbosity > 1
        )
        cmd_args = sys.argv[0:1] + parser_args + use_args
    else:
        cmd_args = self.args.command

    self.disp(
        "COMMAND: {command}".format(
            command=" ".join([arg_tools.escape(a) for a in cmd_args])
        ),
        2,
    )
    if action == "exec":
        p = await asyncio.create_subprocess_exec(*cmd_args)
        ret = await p.wait()
    else:
        # "external" action: the item is sent to the command's standard input
        p = await asyncio.create_subprocess_exec(*cmd_args, stdin=subprocess.PIPE)
        await p.communicate(item.encode(sys.getfilesystemencoding()))
        ret = p.returncode
    if ret != 0:
        self.disp(
            A.color(
                C.A_FAILURE,
                _("executed command failed with exit code {ret}").format(ret=ret),
            )
        )
2115
async def search(self, ps_result, depth):
    """Handle the result of a get_items request

    this method filters items, get sub nodes if needed,
    do the requested action, and exit the command when everything is done
    @param ps_result(dict): result of the items request, items are read from
        its "items" key
    @param depth(int): current depth level
        0 for first node, 1 for first children, and so on
    """
    for raw_item in ps_result["items"]:
        if depth < self.args.max_depth:
            await self.get_sub_nodes(raw_item, depth)
        kept, filtered_item = self.filter(raw_item)
        if kept:
            await self.do_item_action(filtered_item, ps_result)

    # this request is handled, we check if it was the last pending one
    self.to_get -= 1
    if self.to_get == 0:
        # yes, we can quit
        self.host.quit()
    assert self.to_get > 0
2139
async def start(self):
    """Check arguments, initialise the search state and request the first node"""
    args = self.args
    exec_requested = args.action in self.EXEC_ACTIONS
    if args.command and not exec_requested:
        self.parser.error(
            _("Command can only be used with {actions} actions").format(
                actions=", ".join(self.EXEC_ACTIONS)
            )
        )
    elif not args.command and exec_requested:
        self.parser.error(_("you need to specify a command to execute"))
    if not args.node:
        # TODO: handle get service affiliations when node is not set
        self.parser.error(_("empty node is not handled yet"))
    # to_get is increased on each get and decreased on each answer
    # when it reach 0 again, the command is finished
    self.to_get = 0
    self._etree = None
    if args.filters is None:
        args.filters = []
    # the "pubsub" prefix is always made available for xpath
    namespaces = args.namespace + [("pubsub", "http://jabber.org/protocol/pubsub")]
    args.namespace = dict(namespaces)
    await self.get_items(0, args.service, args.node, args.items)
2164
2165
2166 class Transform(base.CommandBase):
def __init__(self, host):
    """Register the "transform" command"""
    options = {
        "use_pubsub": True,
        "pubsub_flags": {C.NODE, C.MULTI_ITEMS},
        "help": _("modify items of a node using an external command/script"),
    }
    base.CommandBase.__init__(self, host, "transform", **options)
2176
def add_parser_options(self):
    """Declare the transformation flags and the path of the command to run"""
    parser = self.parser
    switches = (
        (("--apply",), _("apply transformation (DEFAULT: do a dry run)")),
        (("--admin",), _("do a pubsub admin request, needed to change publisher")),
        (
            ("-I", "--ignore-errors"),
            _("if command return a non zero exit code, ignore the item and continue"),
        ),
        (("-A", "--all"), _("get all items by looping over all pages using RSM")),
    )
    for option_strings, help_text in switches:
        parser.add_argument(*option_strings, action="store_true", help=help_text)
    parser.add_argument(
        "command_path",
        help=_(
            "path to the command to use. Will be called repetitivly with an "
            "item as input. Output (full item XML) will be used as new one. "
            'Return "DELETE" string to delete the item, and "SKIP" to ignore it'
        ),
    )
2210
async def ps_items_send_cb(self, item_ids, metadata):
    """Report published items, then handle the next page or quit

    @param item_ids(list[unicode]): ids of the published items, may be empty
    @param metadata(dict): metadata given to handle_next_page when --all is used
    """
    if not item_ids:
        self.disp(_("items published"))
    else:
        joined_ids = ", ".join(item_ids)
        self.disp(_("items published with ids {item_ids}").format(item_ids=joined_ids))
    if not self.args.all:
        self.host.quit()
        return None
    return await self.handle_next_page(metadata)
2224
async def handle_next_page(self, metadata):
    """Retrieve new page through RSM or quit if we're in the last page

    use to handle --all option
    @param metadata(dict): metadata as returned by ps_items_get
        "last", "index" and "count" are read from its "rsm" value
    """
    try:
        rsm = metadata["rsm"]
        last = rsm["last"]
        index = int(rsm["index"])
        count = int(rsm["count"])
    except KeyError:
        self.disp(
            _("Can't retrieve all items, RSM metadata not available"), error=True
        )
        self.host.quit(C.EXIT_MISSING_FEATURE)
    except ValueError as e:
        self.disp(
            _("Can't retrieve all items, bad RSM metadata: {msg}").format(msg=e),
            error=True,
        )
        self.host.quit(C.EXIT_ERROR)

    if index + self.args.rsm_max >= count:
        self.disp(_("All items transformed"))
        self.host.quit(0)

    self.disp(
        _("Retrieving next page ({page_idx}/{page_total})").format(
            page_idx=int(index / self.args.rsm_max) + 1,
            page_total=int(count / self.args.rsm_max),
        )
    )

    extra = self.get_pubsub_extra()
    if isinstance(extra, str):
        # NOTE(review): other commands of this file send get_pubsub_extra()
        #   result directly to the bridge, so it is presumably already
        #   serialised; it is deserialised to be completed - confirm
        extra = data_format.deserialise(extra)
    extra["rsm_after"] = last
    try:
        # the bridge call is what must be awaited: its result is the
        # serialised data to give to deserialise
        ps_result = data_format.deserialise(
            await self.host.bridge.ps_items_get(
                self.args.service,
                self.args.node,
                self.args.rsm_max,
                self.args.items,
                "",
                data_format.serialise(extra),
                self.profile,
            )
        )
    except Exception as e:
        self.disp(f"can't retrieve items: {e}", error=True)
        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
    else:
        await self.ps_items_get_cb(ps_result)
2277
async def ps_items_get_cb(self, ps_result):
    """Run the filter command on every retrieved item and act on its output.

    Each item of ``ps_result["items"]`` is written to the standard input of
    ``self.args.command_path``; the stripped standard output decides what
    happens next:

    - ``DELETE``: the item is retracted (only when ``--apply`` is set);
    - ``SKIP``: the item is left untouched;
    - anything else is parsed as XML and must be a whole ``<item>`` element,
      which is displayed (dry run) or collected to be published again.

    @param ps_result: deserialised result of ``ps_items_get``; only its
        ``"items"`` key is read here, the whole dict is forwarded as metadata
        to ``handle_next_page``/``ps_items_send_cb``
    """
    encoding = "utf-8"
    # transformed items, serialised, to publish once the whole page is handled
    new_items = []

    for item in ps_result["items"]:
        if self.check_duplicates:
            # this is used when we are not ordering by creation
            # to avoid infinite loop
            item_elt, __ = xml_tools.etree_parse(self, item)
            item_id = item_elt.get("id")
            if item_id in self.items_ids:
                self.disp(
                    _(
                        "Duplicate found on item {item_id}, we have probably handled "
                        "all items."
                    ).format(item_id=item_id)
                )
                self.host.quit()
            self.items_ids.append(item_id)

        # we launch the command to filter the item
        try:
            p = await asyncio.create_subprocess_exec(
                self.args.command_path, stdin=subprocess.PIPE, stdout=subprocess.PIPE
            )
        except OSError as e:
            # errno 2 is ENOENT ("No such file or directory")
            exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR
            self.disp(f"Can't execute the command: {e}", error=True)
            self.host.quit(exit_code)
        # NOTE(review): redundant, "encoding" is already set before the loop
        encoding = "utf-8"
        cmd_std_out, cmd_std_err = await p.communicate(item.encode(encoding))
        ret = p.returncode
        if ret != 0:
            self.disp(
                f"The command returned a non zero status while parsing the "
                f"following item:\n\n{item}",
                error=True,
            )
            if self.args.ignore_errors:
                continue
            else:
                self.host.quit(C.EXIT_CMD_ERROR)
        # NOTE(review): stderr is not piped when the subprocess is created above,
        # so cmd_std_err is presumably always None here — confirm
        if cmd_std_err is not None:
            cmd_std_err = cmd_std_err.decode(encoding, errors="ignore")
            self.disp(cmd_std_err, error=True)
        cmd_std_out = cmd_std_out.decode(encoding).strip()
        if cmd_std_out == "DELETE":
            item_elt, __ = xml_tools.etree_parse(self, item)
            item_id = item_elt.get("id")
            self.disp(_("Deleting item {item_id}").format(item_id=item_id))
            # on dry run the deletion is only announced, not performed
            if self.args.apply:
                try:
                    await self.host.bridge.ps_item_retract(
                        self.args.service,
                        self.args.node,
                        item_id,
                        False,
                        self.profile,
                    )
                except Exception as e:
                    self.disp(f"can't delete item {item_id}: {e}", error=True)
                    self.host.quit(C.EXIT_BRIDGE_ERRBACK)
            continue
        elif cmd_std_out == "SKIP":
            item_elt, __ = xml_tools.etree_parse(self, item)
            item_id = item_elt.get("id")
            self.disp(_("Skipping item {item_id}").format(item_id=item_id))
            continue
        # any other output is expected to be the transformed item itself
        element, etree = xml_tools.etree_parse(self, cmd_std_out)

        # at this point command has been run and we have a etree.Element object
        if element.tag not in ("item", "{http://jabber.org/protocol/pubsub}item"):
            self.disp(
                "your script must return a whole item, this is not:\n{xml}".format(
                    xml=etree.tostring(element, encoding="unicode")
                ),
                error=True,
            )
            self.host.quit(C.EXIT_DATA_ERROR)

        if not self.args.apply:
            # we have a dry run, we just display filtered items
            serialised = etree.tostring(
                element, encoding="unicode", pretty_print=True
            )
            self.disp(serialised)
        else:
            new_items.append(etree.tostring(element, encoding="unicode"))

    if not self.args.apply:
        # on dry run we have nothing to wait for, we can quit
        if self.args.all:
            return await self.handle_next_page(ps_result)
        self.host.quit()
    else:
        # "--admin" selects the admin variant of the bridge method used to send
        # the items
        if self.args.admin:
            bridge_method = self.host.bridge.ps_admin_items_send
        else:
            bridge_method = self.host.bridge.ps_items_send

        try:
            ps_items_send_result = await bridge_method(
                self.args.service,
                self.args.node,
                new_items,
                "",
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't send item: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            await self.ps_items_send_cb(ps_items_send_result, metadata=ps_result)
2391
async def start(self):
    """Retrieve the first page of items and hand it to ``ps_items_get_cb``.

    When "--all" is used without ordering by creation, duplicate detection is
    activated and a warning is displayed, because updating items may change
    their order while pages are being retrieved.
    """
    args = self.args
    self.check_duplicates = bool(
        args.all and args.order_by != C.ORDER_BY_CREATION
    )
    if self.check_duplicates:
        # IDs of already handled items, used to stop on the first duplicate
        self.items_ids = []
        warning = A.color(
            A.FG_RED,
            A.BOLD,
            '/!\\ "--all" should be used with "--order-by creation" /!\\\n',
            A.RESET,
            "We'll update items, so order may change during transformation,\n"
            "we'll try to mitigate that by stopping on first duplicate,\n"
            "but this method is not safe, and some items may be missed.\n---\n",
        )
        self.disp(warning)

    try:
        serialised_result = await self.host.bridge.ps_items_get(
            args.service,
            args.node,
            args.max,
            args.items,
            "",
            self.get_pubsub_extra(),
            self.profile,
        )
        ps_result = data_format.deserialise(serialised_result)
    except Exception as err:
        self.disp(f"can't retrieve items: {err}", error=True)
        self.host.quit(C.EXIT_BRIDGE_ERRBACK)
    else:
        await self.ps_items_get_cb(ps_result)
2427
2428
class Uri(base.CommandBase):
    """Build and display the XMPP URI of a pubsub service, node or item."""

    def __init__(self, host):
        super().__init__(
            host,
            "uri",
            use_profile=False,
            use_pubsub=True,
            pubsub_flags={C.NODE, C.SINGLE_ITEM},
            help=_("build URI"),
        )

    def add_parser_options(self):
        """Add the profile option, used to find a default service."""
        profile_help = _("profile (used when no server is specified)")
        self.parser.add_argument(
            "-p", "--profile", default=C.PROF_KEY_DEFAULT, help=profile_help
        )

    def display_uri(self, jid_):
        """Display the URI built from command arguments, then quit.

        @param jid_: JID whose bare form is used as service when none has been
            specified in arguments (not read otherwise)
        """
        if not self.args.service:
            self.args.service = jid.JID(jid_).bare

        # (argument name, URI key) pairs: the service is the path of the URI
        arg_to_uri_key = (("node", "node"), ("service", "path"), ("item", "item"))
        uri_args = {}
        for arg_name, uri_key in arg_to_uri_key:
            arg_value = getattr(self.args, arg_name)
            if arg_value:
                uri_args[uri_key] = arg_value

        self.disp(uri.build_xmpp_uri("pubsub", **uri_args))
        self.host.quit()

    async def start(self):
        """Display the URI, retrieving the profile's JID if no service is set."""
        if self.args.service:
            self.display_uri(None)
            return

        try:
            jid_ = await self.host.bridge.param_get_a_async(
                "JabberID", "Connection", profile_key=self.args.profile
            )
        except Exception as err:
            self.disp(f"can't retrieve jid: {err}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.display_uri(jid_)
2476
2477
class AttachmentGet(base.CommandBase):
    """Retrieve and output the data attached to a pubsub item."""

    def __init__(self, host):
        super().__init__(
            host,
            "get",
            use_output=C.OUTPUT_LIST_DICT,
            use_pubsub=True,
            pubsub_flags={C.SERVICE, C.NODE, C.SINGLE_ITEM},
            help=_("get data attached to an item"),
        )

    def add_parser_options(self):
        """Add the repeatable option restricting results to some publishers."""
        jid_help = _(
            "get attached data published only by those JIDs (DEFAULT: get all "
            "attached data)"
        )
        self.parser.add_argument(
            "-j", "--jid", action="append", dest="jids", help=jid_help
        )

    async def start(self):
        """Get the attachments from the bridge and send them to the output."""
        args = self.args
        try:
            # the second element of the returned pair is not used
            serialised, __ = await self.host.bridge.ps_attachments_get(
                args.service,
                args.node,
                args.item,
                args.jids or [],
                "",
                self.profile,
            )
        except Exception as err:
            self.disp(f"can't get attached data: {err}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            attachments = data_format.deserialise(serialised, type_check=list)
            await self.output(attachments)
            self.host.quit(C.EXIT_OK)
2518
2519
class AttachmentSet(base.CommandBase):
    """Attach data (noticed flag and/or reactions) to a pubsub item."""

    def __init__(self, host):
        super().__init__(
            host,
            "set",
            use_pubsub=True,
            pubsub_flags={C.SERVICE, C.NODE, C.SINGLE_ITEM},
            help=_("attach data to an item"),
        )

    def add_parser_options(self):
        """Add options selecting the attachments to set and how to merge them."""
        self.parser.add_argument(
            "--replace",
            action="store_true",
            help=_(
                "replace previous versions of attachments (DEFAULT: update previous "
                "version)"
            )
        )
        self.parser.add_argument(
            "-N",
            "--noticed",
            metavar="BOOLEAN",
            nargs="?",
            default="keep",
            help=_("mark item as (un)noticed (DEFAULT: keep current value)")
        )
        self.parser.add_argument(
            "-r",
            "--reactions",
            # FIXME: to be replaced by "extend" when we stop supporting python 3.7
            action="append",
            help=_("emojis to add to react to an item")
        )
        self.parser.add_argument(
            "-R",
            "--reactions-remove",
            # FIXME: to be replaced by "extend" when we stop supporting python 3.7
            action="append",
            help=_("emojis to remove from reactions to an item")
        )

    async def start(self):
        """Build the attachments data from arguments and send it to the bridge.

        The parser exits with an error if no attachment has been requested.
        """
        attachments_data = {
            "service": self.args.service,
            "node": self.args.node,
            "id": self.args.item,
            "extra": {}
        }
        operation = "replace" if self.args.replace else "update"
        if self.args.noticed != "keep":
            # "--noticed" used without value gives None (nargs="?" without
            # const): it then means "mark as noticed"
            if self.args.noticed is None:
                self.args.noticed = C.BOOL_TRUE
            attachments_data["extra"]["noticed"] = C.bool(self.args.noticed)

        if self.args.reactions or self.args.reactions_remove:
            reactions = attachments_data["extra"]["reactions"] = {
                "operation": operation
            }
            # NOTE(review): an option which has not been used is sent as None
            #   ("append" action without default) — confirm that the backend
            #   handles it
            if self.args.replace:
                reactions["reactions"] = self.args.reactions
            else:
                reactions["add"] = self.args.reactions
                reactions["remove"] = self.args.reactions_remove

        if not attachments_data["extra"]:
            self.parser.error(_("At least one attachment must be specified."))

        try:
            await self.host.bridge.ps_attachments_set(
                data_format.serialise(attachments_data),
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't attach data to item: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp("data attached")
            self.host.quit(C.EXIT_OK)
2600
2601
class Attachments(base.CommandBase):
    """Parent command grouping the subcommands handling item attachments."""

    subcommands = (AttachmentGet, AttachmentSet)

    def __init__(self, host):
        description = _("set or retrieve items attachments")
        super().__init__(host, "attachments", use_profile=False, help=description)
2612
2613
class SignatureSign(base.CommandBase):
    """Sign a pubsub item by attaching a signature to it."""

    def __init__(self, host):
        super().__init__(
            host,
            "sign",
            use_pubsub=True,
            pubsub_flags={C.NODE, C.SINGLE_ITEM},
            help=_("sign an item"),
        )

    def add_parser_options(self):
        """No option besides the common pubsub ones."""

    async def start(self):
        """Request the signature through the attachments bridge method."""
        # we set None to use profile's bare JID
        signature = {"signer": None}
        attachments_data = {
            "service": self.args.service,
            "node": self.args.node,
            "id": self.args.item,
            "extra": {"signature": signature},
        }
        serialised = data_format.serialise(attachments_data)
        try:
            await self.host.bridge.ps_attachments_set(serialised, self.profile)
        except Exception as err:
            self.disp(f"can't sign the item: {err}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(f"item {self.args.item!r} has been signed")
            self.host.quit(C.EXIT_OK)
2648
2649
class SignatureCheck(base.CommandBase):
    """Check the signature of a pubsub item and output the result."""

    def __init__(self, host):
        super().__init__(
            host,
            "check",
            use_output=C.OUTPUT_DICT,
            use_pubsub=True,
            pubsub_flags={C.SERVICE, C.NODE, C.SINGLE_ITEM},
            help=_("check the validity of pubsub signature"),
        )

    def add_parser_options(self):
        """Add the positional argument holding the signature data."""
        self.parser.add_argument("signature", metavar="JSON", help=_("signature data"))

    async def start(self):
        """Ask the bridge to check the signature, then output its answer."""
        args = self.args
        try:
            serialised_result = await self.host.bridge.ps_signature_check(
                args.service,
                args.node,
                args.item,
                args.signature,
                self.profile,
            )
        except Exception as err:
            self.disp(f"can't check signature: {err}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            check_result = data_format.deserialise(serialised_result)
            await self.output(check_result)
            self.host.quit()
2683
2684
class Signature(base.CommandBase):
    """Parent command grouping the subcommands handling items signatures."""

    subcommands = (SignatureSign, SignatureCheck)

    def __init__(self, host):
        description = _("items signatures")
        super().__init__(host, "signature", use_profile=False, help=description)
2695
2696
class SecretShare(base.CommandBase):
    """Share the secrets of an encrypted pubsub node with another entity."""

    def __init__(self, host):
        super().__init__(
            host,
            "share",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("share a secret to let other entity encrypt or decrypt items"),
        )

    def add_parser_options(self):
        """Add the secrets selection option and the recipient argument."""
        key_help = _(
            "only share secrets with those IDs (default: share all secrets of the "
            "node)"
        )
        self.parser.add_argument(
            "-k",
            "--key",
            metavar="ID",
            dest="secret_ids",
            action="append",
            default=[],
            help=key_help,
        )
        recipient_help = _("entity who must get the shared secret")
        self.parser.add_argument("recipient", metavar="JID", help=recipient_help)

    async def start(self):
        """Share the secrets through the bridge and report the outcome."""
        args = self.args
        try:
            await self.host.bridge.ps_secret_share(
                args.recipient,
                args.service,
                args.node,
                args.secret_ids,
                self.profile,
            )
        except Exception as err:
            self.disp(f"can't share secret: {err}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp("secrets have been shared")
            self.host.quit(C.EXIT_OK)
2734
2735
class SecretRevoke(base.CommandBase):
    """Revoke a secret of an encrypted pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "revoke",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("revoke an encrypted node secret"),
        )

    def add_parser_options(self):
        """Add the secret ID argument and the recipients option."""
        self.parser.add_argument(
            "secret_id", help=_("ID of the secrets to revoke")
        )
        self.parser.add_argument(
            "-r", "--recipient", dest="recipients", metavar="JID", action="append",
            default=[], help=_(
                "entity who must get the revocation notification (default: send to all "
                "entities known to have the shared secret)"
            )
        )

    async def start(self):
        """Revoke the secret through the bridge and report the outcome."""
        try:
            await self.host.bridge.ps_secret_revoke(
                self.args.service,
                self.args.node,
                self.args.secret_id,
                self.args.recipients,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't revoke secret: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            # the "f" prefix is required, otherwise the placeholder is displayed
            # verbatim instead of the ID of the revoked secret
            self.disp(f"secret {self.args.secret_id} has been revoked.")
            self.host.quit(C.EXIT_OK)
2773
2774
class SecretRotate(base.CommandBase):
    """Rotate the secret of an encrypted pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "rotate",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("revoke existing secrets, create a new one and send notifications"),
        )

    def add_parser_options(self):
        """Add the repeatable option selecting who must be notified."""
        recipient_help = _(
            "entity who must get the revocation and shared secret notifications "
            "(default: send to all entities known to have the shared secret)"
        )
        self.parser.add_argument(
            "-r",
            "--recipient",
            dest="recipients",
            metavar="JID",
            action="append",
            default=[],
            help=recipient_help,
        )

    async def start(self):
        """Rotate the secret through the bridge and report the outcome."""
        args = self.args
        try:
            await self.host.bridge.ps_secret_rotate(
                args.service, args.node, args.recipients, self.profile
            )
        except Exception as err:
            self.disp(f"can't rotate secret: {err}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp("secret has been rotated")
            self.host.quit(C.EXIT_OK)
2808
2809
class SecretList(base.CommandBase):
    """List the secrets known for an encrypted pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "list",
            use_pubsub=True,
            use_verbose=True,
            pubsub_flags={C.NODE},
            help=_("list known secrets for a pubsub node"),
            use_output=C.OUTPUT_LIST_DICT,
        )

    def add_parser_options(self):
        """No option besides the common ones."""

    async def start(self):
        """Retrieve the secrets and output them, hiding keys unless verbose."""
        try:
            serialised = await self.host.bridge.ps_secrets_list(
                self.args.service, self.args.node, self.profile
            )
            secrets = data_format.deserialise(serialised, type_check=list)
        except Exception as err:
            self.disp(f"can't list node secrets: {err}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            show_keys = bool(self.verbosity)
            if not show_keys:
                # the key is not printed if verbosity is not at least one, to
                # avoid showing it on the screen accidentally
                for secret in secrets:
                    del secret["key"]
            await self.output(secrets)
            self.host.quit(C.EXIT_OK)
2843
2844
class Secret(base.CommandBase):
    """Parent command grouping the subcommands handling nodes secrets."""

    subcommands = (SecretShare, SecretRevoke, SecretRotate, SecretList)

    def __init__(self, host):
        description = _("handle encrypted nodes secrets")
        super().__init__(host, "secret", use_profile=False, help=description)
2855
2856
class HookCreate(base.CommandBase):
    """Create a hook on a pubsub node."""

    def __init__(self, host):
        super().__init__(
            host,
            "create",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("create a Pubsub hook"),
        )

    def add_parser_options(self):
        """Add the hook type, persistence flag and hook argument."""
        add_argument = self.parser.add_argument
        add_argument(
            "-t",
            "--type",
            default="python",
            choices=("python", "python_file", "python_code"),
            help=_("hook type"),
        )
        add_argument(
            "-P",
            "--persistent",
            action="store_true",
            help=_("make hook persistent across restarts"),
        )
        add_argument("hook_arg", help=_("argument of the hook (depend of the type)"))

    @staticmethod
    def check_args(self):
        """Make the hook argument absolute and check it for "python_file" hooks.

        This is a static method taking the command instance explicitly, so it
        can be called with an instance of another command.

        @param self: command instance whose ``args`` are checked and updated;
            its parser exits with an error if the path is not an existing file
        """
        if self.args.type != "python_file":
            return
        hook_path = os.path.abspath(self.args.hook_arg)
        self.args.hook_arg = hook_path
        if not os.path.isfile(hook_path):
            self.parser.error(_("{path} is not a file").format(path=hook_path))

    async def start(self):
        """Validate the arguments then add the hook through the bridge."""
        self.check_args(self)
        args = self.args
        try:
            await self.host.bridge.ps_hook_add(
                args.service,
                args.node,
                args.type,
                args.hook_arg,
                args.persistent,
                self.profile,
            )
        except Exception as err:
            self.disp(f"can't create hook: {err}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.host.quit()
2912
2913
class HookDelete(base.CommandBase):
    """Delete hooks of a pubsub node, optionally filtered by type and argument."""

    def __init__(self, host):
        base.CommandBase.__init__(
            self,
            host,
            "delete",
            use_pubsub=True,
            pubsub_flags={C.NODE},
            help=_("delete a Pubsub hook"),
        )

    def add_parser_options(self):
        """Add the optional hook type and hook argument filters."""
        self.parser.add_argument(
            "-t",
            "--type",
            default="",
            choices=("", "python", "python_file", "python_code"),
            help=_("hook type to remove, empty to remove all (DEFAULT: remove all)"),
        )
        self.parser.add_argument(
            "-a",
            "--arg",
            dest="hook_arg",
            default="",
            help=_(
                "argument of the hook to remove, empty to remove all (DEFAULT: remove all)"
            ),
        )

    async def start(self):
        """Remove the matching hooks and display how many have been deleted."""
        # An empty argument means "remove all": it must be sent as is. Checking
        # it would turn "" into the current directory for "python_file" hooks,
        # and abort with a "is not a file" error.
        if self.args.hook_arg:
            HookCreate.check_args(self)
        try:
            nb_deleted = await self.host.bridge.ps_hook_remove(
                self.args.service,
                self.args.node,
                self.args.type,
                self.args.hook_arg,
                self.profile,
            )
        except Exception as e:
            self.disp(f"can't delete hook: {e}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            self.disp(
                _("{nb_deleted} hook(s) have been deleted").format(nb_deleted=nb_deleted)
            )
            self.host.quit()
2961
2962
class HookList(base.CommandBase):
    """List the hooks registered for a profile."""

    def __init__(self, host):
        super().__init__(
            host,
            "list",
            use_output=C.OUTPUT_LIST_DICT,
            help=_("list hooks of a profile"),
        )

    def add_parser_options(self):
        """No option besides the common ones."""

    async def start(self):
        """Retrieve the hooks from the bridge and send them to the output."""
        try:
            hooks = await self.host.bridge.ps_hook_list(self.profile)
        except Exception as err:
            self.disp(f"can't list hooks: {err}", error=True)
            self.host.quit(C.EXIT_BRIDGE_ERRBACK)
        else:
            if not hooks:
                self.disp(_("No hook found."))
            # the data are sent to the output even when there is no hook
            await self.output(hooks)
            self.host.quit()
2989
2990
class Hook(base.CommandBase):
    """Parent command grouping the subcommands handling pubsub hooks."""

    subcommands = (HookCreate, HookDelete, HookList)

    def __init__(self, host):
        description = _("trigger action on Pubsub notifications")
        super().__init__(
            host, "hook", use_profile=False, use_verbose=True, help=description
        )
3002
3003
class Pubsub(base.CommandBase):
    """Top-level "pubsub" command, grouping every pubsub subcommand."""

    # fmt: off
    subcommands = (
        Set, Get, Delete, Edit, Rename,
        Subscribe, Unsubscribe, Subscriptions, Affiliations,
        Reference, Search, Transform,
        Attachments, Signature, Secret, Hook,
        Uri, Node, Cache,
    )
    # fmt: on

    def __init__(self, host):
        description = _("PubSub nodes/items management")
        super().__init__(host, "pubsub", use_profile=False, help=description)