comparison sat_frontends/jp/cmd_pubsub.py @ 3040:fee60f17ebac

jp: jp asyncio port: /!\ this commit is huge. Jp is temporarily not working with `dbus` bridge /!\ This patch implements the port of jp to asyncio, so it is now correctly using the bridge asynchronously, and it can be used with bridges like `pb`. This also simplify the code, notably for things which were previously implemented with many callbacks (like pagination with RSM). During the process, some behaviours have been modified/fixed, in jp and backends, check diff for details.
author Goffi <goffi@goffi.org>
date Wed, 25 Sep 2019 08:56:41 +0200
parents ab2696e34d29
children cea52c9ddfd9
comparison
equal deleted inserted replaced
3039:a1bc34f90fa5 3040:fee60f17ebac
16 16
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 20
21 import argparse
22 import os.path
23 import re
24 import sys
25 import subprocess
26 import asyncio
21 from . import base 27 from . import base
22 from sat.core.i18n import _ 28 from sat.core.i18n import _
23 from sat.core import exceptions 29 from sat.core import exceptions
24 from sat_frontends.jp.constants import Const as C 30 from sat_frontends.jp.constants import Const as C
25 from sat_frontends.jp import common 31 from sat_frontends.jp import common
27 from sat_frontends.jp import xml_tools 33 from sat_frontends.jp import xml_tools
28 from functools import partial 34 from functools import partial
29 from sat.tools.common import uri 35 from sat.tools.common import uri
30 from sat.tools.common.ansi import ANSI as A 36 from sat.tools.common.ansi import ANSI as A
31 from sat_frontends.tools import jid, strings 37 from sat_frontends.tools import jid, strings
32 import argparse
33 import os.path
34 import re
35 import subprocess
36 import sys
37 38
38 __commands__ = ["Pubsub"] 39 __commands__ = ["Pubsub"]
39 40
40 PUBSUB_TMP_DIR = "pubsub" 41 PUBSUB_TMP_DIR = "pubsub"
41 PUBSUB_SCHEMA_TMP_DIR = PUBSUB_TMP_DIR + "_schema" 42 PUBSUB_SCHEMA_TMP_DIR = PUBSUB_TMP_DIR + "_schema"
53 use_output=C.OUTPUT_DICT, 54 use_output=C.OUTPUT_DICT,
54 use_pubsub=True, 55 use_pubsub=True,
55 pubsub_flags={C.NODE}, 56 pubsub_flags={C.NODE},
56 help=_("retrieve node configuration"), 57 help=_("retrieve node configuration"),
57 ) 58 )
58 self.need_loop = True
59 59
60 def add_parser_options(self): 60 def add_parser_options(self):
61 self.parser.add_argument( 61 self.parser.add_argument(
62 "-k", 62 "-k",
63 "--key", 63 "--key",
70 return key[7:] if key.startswith("pubsub#") else key 70 return key[7:] if key.startswith("pubsub#") else key
71 71
72 def filterKey(self, key): 72 def filterKey(self, key):
73 return any((key == k or key == "pubsub#" + k) for k in self.args.keys) 73 return any((key == k or key == "pubsub#" + k) for k in self.args.keys)
74 74
75 def psNodeConfigurationGetCb(self, config_dict): 75 async def start(self):
76 key_filter = (lambda k: True) if not self.args.keys else self.filterKey 76 try:
77 config_dict = { 77 config_dict = await self.host.bridge.psNodeConfigurationGet(
78 self.removePrefix(k): v for k, v in config_dict.items() if key_filter(k) 78 self.args.service,
79 } 79 self.args.node,
80 self.output(config_dict) 80 self.profile,
81 self.host.quit() 81 )
82 82 except Exception as e:
83 def psNodeConfigurationGetEb(self, failure_): 83 self.disp(f"can't get node configuration: {e}", error=True)
84 self.disp( 84 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
85 "can't get node configuration: {reason}".format(reason=failure_), error=True 85 else:
86 ) 86 key_filter = (lambda k: True) if not self.args.keys else self.filterKey
87 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 87 config_dict = {
88 88 self.removePrefix(k): v for k, v in config_dict.items() if key_filter(k)
89 def start(self): 89 }
90 self.host.bridge.psNodeConfigurationGet( 90 await self.output(config_dict)
91 self.args.service, 91 self.host.quit()
92 self.args.node,
93 self.profile,
94 callback=self.psNodeConfigurationGetCb,
95 errback=self.psNodeConfigurationGetEb,
96 )
97 92
98 93
99 class NodeCreate(base.CommandBase): 94 class NodeCreate(base.CommandBase):
100 def __init__(self, host): 95 def __init__(self, host):
101 base.CommandBase.__init__( 96 base.CommandBase.__init__(
106 use_pubsub=True, 101 use_pubsub=True,
107 pubsub_flags={C.NODE}, 102 pubsub_flags={C.NODE},
108 use_verbose=True, 103 use_verbose=True,
109 help=_("create a node"), 104 help=_("create a node"),
110 ) 105 )
111 self.need_loop = True
112 106
113 def add_parser_options(self): 107 def add_parser_options(self):
114 self.parser.add_argument( 108 self.parser.add_argument(
115 "-f", 109 "-f",
116 "--field", 110 "--field",
126 "--full-prefix", 120 "--full-prefix",
127 action="store_true", 121 action="store_true",
128 help=_('don\'t prepend "pubsub#" prefix to field names'), 122 help=_('don\'t prepend "pubsub#" prefix to field names'),
129 ) 123 )
130 124
131 def psNodeCreateCb(self, node_id): 125 async def start(self):
132 if self.host.verbosity:
133 announce = _("node created successfully: ")
134 else:
135 announce = ""
136 self.disp(announce + node_id)
137 self.host.quit()
138
139 def psNodeCreateEb(self, failure_):
140 self.disp("can't create: {reason}".format(reason=failure_), error=True)
141 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
142
143 def start(self):
144 if not self.args.full_prefix: 126 if not self.args.full_prefix:
145 options = {"pubsub#" + k: v for k, v in self.args.fields} 127 options = {"pubsub#" + k: v for k, v in self.args.fields}
146 else: 128 else:
147 options = dict(self.args.fields) 129 options = dict(self.args.fields)
148 self.host.bridge.psNodeCreate( 130 try:
149 self.args.service, 131 node_id = await self.host.bridge.psNodeCreate(
150 self.args.node, 132 self.args.service,
151 options, 133 self.args.node,
152 self.profile, 134 options,
153 callback=self.psNodeCreateCb, 135 self.profile,
154 errback=partial( 136 )
155 self.errback, 137 except Exception as e:
156 msg=_("can't create node: {}"), 138 self.disp(msg=_(f"can't create node: {e}"), error=True)
157 exit_code=C.EXIT_BRIDGE_ERRBACK, 139 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
158 ), 140 else:
159 ) 141 if self.host.verbosity:
142 announce = _("node created successfully: ")
143 else:
144 announce = ""
145 self.disp(announce + node_id)
146 self.host.quit()
160 147
161 148
162 class NodePurge(base.CommandBase): 149 class NodePurge(base.CommandBase):
163 150
164 def __init__(self, host): 151 def __init__(self, host):
167 "purge", 154 "purge",
168 use_pubsub=True, 155 use_pubsub=True,
169 pubsub_flags={C.NODE}, 156 pubsub_flags={C.NODE},
170 help=_("purge a node (i.e. remove all items from it)"), 157 help=_("purge a node (i.e. remove all items from it)"),
171 ) 158 )
172 self.need_loop = True
173 159
174 def add_parser_options(self): 160 def add_parser_options(self):
175 self.parser.add_argument( 161 self.parser.add_argument(
176 "-f", 162 "-f",
177 "--force", 163 "--force",
178 action="store_true", 164 action="store_true",
179 help=_("purge node without confirmation"), 165 help=_("purge node without confirmation"),
180 ) 166 )
181 167
182 def psNodePurgeCb(self): 168 async def start(self):
183 self.disp(_("node [{node}] purged successfully").format(node=self.args.node))
184 self.host.quit()
185
186 def start(self):
187 if not self.args.force: 169 if not self.args.force:
188 if not self.args.service: 170 if not self.args.service:
189 message = _("Are you sure to purge PEP node [{node_id}]? " 171 message = _(
190 "This will delete ALL items from it!").format( 172 f"Are you sure to purge PEP node [{self.args.node}]? This will "
191 node_id=self.args.node 173 f"delete ALL items from it!")
192 )
193 else: 174 else:
194 message = _( 175 message = _(
195 "Are you sure to delete node [{node_id}] on service [{service}]? " 176 f"Are you sure to delete node [{self.args.node}] on service "
196 "This will delete ALL items from it!" 177 f"[{self.args.service}]? This will delete ALL items from it!")
197 ).format(node_id=self.args.node, service=self.args.service) 178 await self.host.confirmOrQuit(message, _("node purge cancelled"))
198 self.host.confirmOrQuit(message, _("node purge cancelled")) 179
199 180 try:
200 self.host.bridge.psNodePurge( 181 await self.host.bridge.psNodePurge(
201 self.args.service, 182 self.args.service,
202 self.args.node, 183 self.args.node,
203 self.profile, 184 self.profile,
204 callback=self.psNodePurgeCb, 185 )
205 errback=partial( 186 except Exception as e:
206 self.errback, 187 self.disp(msg=_(f"can't purge node: {e}"), error=True)
207 msg=_("can't purge node: {}"), 188 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
208 exit_code=C.EXIT_BRIDGE_ERRBACK, 189 else:
209 ), 190 self.disp(_(f"node [{self.args.node}] purged successfully"))
210 ) 191 self.host.quit()
211 192
212 193
213 class NodeDelete(base.CommandBase): 194 class NodeDelete(base.CommandBase):
214 def __init__(self, host): 195 def __init__(self, host):
215 base.CommandBase.__init__( 196 base.CommandBase.__init__(
218 "delete", 199 "delete",
219 use_pubsub=True, 200 use_pubsub=True,
220 pubsub_flags={C.NODE}, 201 pubsub_flags={C.NODE},
221 help=_("delete a node"), 202 help=_("delete a node"),
222 ) 203 )
223 self.need_loop = True
224 204
225 def add_parser_options(self): 205 def add_parser_options(self):
226 self.parser.add_argument( 206 self.parser.add_argument(
227 "-f", 207 "-f",
228 "--force", 208 "--force",
229 action="store_true", 209 action="store_true",
230 help=_("delete node without confirmation"), 210 help=_("delete node without confirmation"),
231 ) 211 )
232 212
233 def psNodeDeleteCb(self): 213 async def start(self):
234 self.disp(_("node [{node}] deleted successfully").format(node=self.args.node))
235 self.host.quit()
236
237 def start(self):
238 if not self.args.force: 214 if not self.args.force:
239 if not self.args.service: 215 if not self.args.service:
240 message = _("Are you sure to delete PEP node [{node_id}] ?").format( 216 message = _(f"Are you sure to delete PEP node [{self.args.node}] ?")
241 node_id=self.args.node
242 )
243 else: 217 else:
244 message = _( 218 message = _(f"Are you sure to delete node [{self.args.node}] on "
245 "Are you sure to delete node [{node_id}] on service [{service}] ?" 219 f"service [{self.args.service}]?")
246 ).format(node_id=self.args.node, service=self.args.service) 220 await self.host.confirmOrQuit(message, _("node deletion cancelled"))
247 self.host.confirmOrQuit(message, _("node deletion cancelled")) 221
248 222 try:
249 self.host.bridge.psNodeDelete( 223 await self.host.bridge.psNodeDelete(
250 self.args.service, 224 self.args.service,
251 self.args.node, 225 self.args.node,
252 self.profile, 226 self.profile,
253 callback=self.psNodeDeleteCb, 227 )
254 errback=partial( 228 except Exception as e:
255 self.errback, 229 self.disp(f"can't delete node: {e}", error=True)
256 msg=_("can't delete node: {}"), 230 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
257 exit_code=C.EXIT_BRIDGE_ERRBACK, 231 else:
258 ), 232 self.disp(_(f"node [{self.args.node}] deleted successfully"))
259 ) 233 self.host.quit()
260 234
261 235
262 class NodeSet(base.CommandBase): 236 class NodeSet(base.CommandBase):
263 def __init__(self, host): 237 def __init__(self, host):
264 base.CommandBase.__init__( 238 base.CommandBase.__init__(
269 use_pubsub=True, 243 use_pubsub=True,
270 pubsub_flags={C.NODE}, 244 pubsub_flags={C.NODE},
271 use_verbose=True, 245 use_verbose=True,
272 help=_("set node configuration"), 246 help=_("set node configuration"),
273 ) 247 )
274 self.need_loop = True
275 248
276 def add_parser_options(self): 249 def add_parser_options(self):
277 self.parser.add_argument( 250 self.parser.add_argument(
278 "-f", 251 "-f",
279 "--field", 252 "--field",
282 dest="fields", 255 dest="fields",
283 required=True, 256 required=True,
284 metavar=("KEY", "VALUE"), 257 metavar=("KEY", "VALUE"),
285 help=_("configuration field to set (required)"), 258 help=_("configuration field to set (required)"),
286 ) 259 )
287 260 self.parser.add_argument(
288 def psNodeConfigurationSetCb(self): 261 "-F",
289 self.disp(_("node configuration successful"), 1) 262 "--full-prefix",
290 self.host.quit() 263 action="store_true",
291 264 help=_('don\'t prepend "pubsub#" prefix to field names'),
292 def psNodeConfigurationSetEb(self, failure_): 265 )
293 self.disp(
294 "can't set node configuration: {reason}".format(reason=failure_), error=True
295 )
296 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
297 266
298 def getKeyName(self, k): 267 def getKeyName(self, k):
299 if not k.startswith("pubsub#"): 268 if self.args.full_prefix or k.startswith("pubsub#"):
269 return k
270 else:
300 return "pubsub#" + k 271 return "pubsub#" + k
301 else: 272
302 return k 273 async def start(self):
303 274 try:
304 def start(self): 275 await self.host.bridge.psNodeConfigurationSet(
305 self.host.bridge.psNodeConfigurationSet( 276 self.args.service,
306 self.args.service, 277 self.args.node,
307 self.args.node, 278 {self.getKeyName(k): v for k, v in self.args.fields},
308 {self.getKeyName(k): v for k, v in self.args.fields}, 279 self.profile,
309 self.profile, 280 )
310 callback=self.psNodeConfigurationSetCb, 281 except Exception as e:
311 errback=self.psNodeConfigurationSetEb, 282 self.disp(f"can't set node configuration: {e}", error=True)
312 ) 283 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
284 else:
285 self.disp(_("node configuration successful"), 1)
286 self.host.quit()
313 287
314 288
315 class NodeImport(base.CommandBase): 289 class NodeImport(base.CommandBase):
316 290
317 def __init__(self, host): 291 def __init__(self, host):
320 "import", 294 "import",
321 use_pubsub=True, 295 use_pubsub=True,
322 pubsub_flags={C.NODE}, 296 pubsub_flags={C.NODE},
323 help=_("import raw XML to a node"), 297 help=_("import raw XML to a node"),
324 ) 298 )
325 self.need_loop = True
326 299
327 def add_parser_options(self): 300 def add_parser_options(self):
328 self.parser.add_argument( 301 self.parser.add_argument(
329 "--admin", 302 "--admin",
330 action="store_true", 303 action="store_true",
335 type=argparse.FileType(), 308 type=argparse.FileType(),
336 help=_("path to the XML file with data to import. The file must contain " 309 help=_("path to the XML file with data to import. The file must contain "
337 "whole XML of each item to import."), 310 "whole XML of each item to import."),
338 ) 311 )
339 312
340 def psItemsSendCb(self, item_ids): 313 async def start(self):
341 self.disp(_('items published with id(s) {item_ids}').format(
342 item_ids=', '.join(item_ids)))
343 self.host.quit()
344
345 def start(self):
346 try: 314 try:
347 element, etree = xml_tools.etreeParse(self, self.args.import_file, 315 element, etree = xml_tools.etreeParse(self, self.args.import_file,
348 reraise=True) 316 reraise=True)
349 except Exception as e: 317 except Exception as e:
350 from lxml.etree import XMLSyntaxError 318 from lxml.etree import XMLSyntaxError
362 if not all([i.tag == '{http://jabber.org/protocol/pubsub}item' for i in element]): 330 if not all([i.tag == '{http://jabber.org/protocol/pubsub}item' for i in element]):
363 self.disp( 331 self.disp(
364 _("You are not using list of pubsub items, we can't import this file"), 332 _("You are not using list of pubsub items, we can't import this file"),
365 error=True) 333 error=True)
366 self.host.quit(C.EXIT_DATA_ERROR) 334 self.host.quit(C.EXIT_DATA_ERROR)
367 335 return
368 items = [etree.tostring(i, encoding="utf-8") for i in element] 336
337 items = [etree.tostring(i, encoding="unicode") for i in element]
369 if self.args.admin: 338 if self.args.admin:
370 self.host.bridge.psAdminItemsSend( 339 method = self.host.bridge.psAdminItemsSend
340 else:
341 self.disp(_("Items are imported without using admin mode, publisher can't "
342 "be changed"))
343 method = self.host.bridge.psItemsSend
344
345 try:
346 items_ids = await method(
371 self.args.service, 347 self.args.service,
372 self.args.node, 348 self.args.node,
373 items, 349 items,
374 "", 350 "",
375 self.profile, 351 self.profile,
376 callback=partial(self.psItemsSendCb), 352 )
377 errback=partial( 353 except Exception as e:
378 self.errback, 354 self.disp(f"can't send items: {e}", error=True)
379 msg=_("can't send item: {}"), 355 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
380 exit_code=C.EXIT_BRIDGE_ERRBACK, 356 else:
381 ), 357 if items_ids:
382 ) 358 self.disp(_('items published with id(s) {items_ids}').format(
383 else: 359 items_ids=', '.join(items_ids)))
384 self.disp(_("Items are imported without using admin mode, publisher can't " 360 else:
385 "be changed")) 361 self.disp(_('items published'))
386 self.host.bridge.psItemsSend( 362 self.host.quit()
387 self.args.service,
388 self.args.node,
389 items,
390 "",
391 self.profile,
392 callback=partial(self.psItemsSendCb),
393 errback=partial(
394 self.errback,
395 msg=_("can't send item: {}"),
396 exit_code=C.EXIT_BRIDGE_ERRBACK,
397 ),
398 )
399 363
400 364
401 class NodeAffiliationsGet(base.CommandBase): 365 class NodeAffiliationsGet(base.CommandBase):
402 def __init__(self, host): 366 def __init__(self, host):
403 base.CommandBase.__init__( 367 base.CommandBase.__init__(
407 use_output=C.OUTPUT_DICT, 371 use_output=C.OUTPUT_DICT,
408 use_pubsub=True, 372 use_pubsub=True,
409 pubsub_flags={C.NODE}, 373 pubsub_flags={C.NODE},
410 help=_("retrieve node affiliations (for node owner)"), 374 help=_("retrieve node affiliations (for node owner)"),
411 ) 375 )
412 self.need_loop = True
413 376
414 def add_parser_options(self): 377 def add_parser_options(self):
415 pass 378 pass
416 379
417 def psNodeAffiliationsGetCb(self, affiliations): 380 async def start(self):
418 self.output(affiliations) 381 try:
419 self.host.quit() 382 affiliations = await self.host.bridge.psNodeAffiliationsGet(
420 383 self.args.service,
421 def psNodeAffiliationsGetEb(self, failure_): 384 self.args.node,
422 self.disp( 385 self.profile,
423 "can't get node affiliations: {reason}".format(reason=failure_), error=True 386 )
424 ) 387 except Exception as e:
425 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 388 self.disp(f"can't get node affiliations: {e}", error=True)
426 389 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
427 def start(self): 390 else:
428 self.host.bridge.psNodeAffiliationsGet( 391 await self.output(affiliations)
429 self.args.service, 392 self.host.quit()
430 self.args.node,
431 self.profile,
432 callback=self.psNodeAffiliationsGetCb,
433 errback=self.psNodeAffiliationsGetEb,
434 )
435 393
436 394
437 class NodeAffiliationsSet(base.CommandBase): 395 class NodeAffiliationsSet(base.CommandBase):
438 def __init__(self, host): 396 def __init__(self, host):
439 base.CommandBase.__init__( 397 base.CommandBase.__init__(
443 use_pubsub=True, 401 use_pubsub=True,
444 pubsub_flags={C.NODE}, 402 pubsub_flags={C.NODE},
445 use_verbose=True, 403 use_verbose=True,
446 help=_("set affiliations (for node owner)"), 404 help=_("set affiliations (for node owner)"),
447 ) 405 )
448 self.need_loop = True
449 406
450 def add_parser_options(self): 407 def add_parser_options(self):
451 # XXX: we use optional argument syntax for a required one because list of list of 2 elements 408 # XXX: we use optional argument syntax for a required one because list of list of 2 elements
452 # (uses to construct dicts) don't work with positional arguments 409 # (uses to construct dicts) don't work with positional arguments
453 self.parser.add_argument( 410 self.parser.add_argument(
459 action="append", 416 action="append",
460 nargs=2, 417 nargs=2,
461 help=_("entity/affiliation couple(s)"), 418 help=_("entity/affiliation couple(s)"),
462 ) 419 )
463 420
464 def psNodeAffiliationsSetCb(self): 421 async def start(self):
465 self.disp(_("affiliations have been set"), 1)
466 self.host.quit()
467
468 def psNodeAffiliationsSetEb(self, failure_):
469 self.disp(
470 "can't set node affiliations: {reason}".format(reason=failure_), error=True
471 )
472 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
473
474 def start(self):
475 affiliations = dict(self.args.affiliations) 422 affiliations = dict(self.args.affiliations)
476 self.host.bridge.psNodeAffiliationsSet( 423 try:
477 self.args.service, 424 await self.host.bridge.psNodeAffiliationsSet(
478 self.args.node, 425 self.args.service,
479 affiliations, 426 self.args.node,
480 self.profile, 427 affiliations,
481 callback=self.psNodeAffiliationsSetCb, 428 self.profile,
482 errback=self.psNodeAffiliationsSetEb, 429 )
483 ) 430 except Exception as e:
431 self.disp(f"can't set node affiliations: {e}", error=True)
432 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
433 else:
434 self.disp(_("affiliations have been set"), 1)
435 self.host.quit()
484 436
485 437
486 class NodeAffiliations(base.CommandBase): 438 class NodeAffiliations(base.CommandBase):
487 subcommands = (NodeAffiliationsGet, NodeAffiliationsSet) 439 subcommands = (NodeAffiliationsGet, NodeAffiliationsSet)
488 440
504 use_output=C.OUTPUT_DICT, 456 use_output=C.OUTPUT_DICT,
505 use_pubsub=True, 457 use_pubsub=True,
506 pubsub_flags={C.NODE}, 458 pubsub_flags={C.NODE},
507 help=_("retrieve node subscriptions (for node owner)"), 459 help=_("retrieve node subscriptions (for node owner)"),
508 ) 460 )
509 self.need_loop = True
510 461
511 def add_parser_options(self): 462 def add_parser_options(self):
512 pass 463 pass
513 464
514 def psNodeSubscriptionsGetCb(self, subscriptions): 465 async def start(self):
515 self.output(subscriptions) 466 try:
516 self.host.quit() 467 subscriptions = await self.host.bridge.psNodeSubscriptionsGet(
517 468 self.args.service,
518 def psNodeSubscriptionsGetEb(self, failure_): 469 self.args.node,
519 self.disp( 470 self.profile,
520 "can't get node subscriptions: {reason}".format(reason=failure_), error=True 471 )
521 ) 472 except Exception as e:
522 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 473 self.disp(f"can't get node subscriptions: {e}", error=True)
523 474 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
524 def start(self): 475 else:
525 self.host.bridge.psNodeSubscriptionsGet( 476 await self.output(subscriptions)
526 self.args.service, 477 self.host.quit()
527 self.args.node,
528 self.profile,
529 callback=self.psNodeSubscriptionsGetCb,
530 errback=self.psNodeSubscriptionsGetEb,
531 )
532 478
533 479
534 class StoreSubscriptionAction(argparse.Action): 480 class StoreSubscriptionAction(argparse.Action):
535 """Action which handle subscription parameter for owner 481 """Action which handle subscription parameter for owner
536 482
564 use_pubsub=True, 510 use_pubsub=True,
565 pubsub_flags={C.NODE}, 511 pubsub_flags={C.NODE},
566 use_verbose=True, 512 use_verbose=True,
567 help=_("set/modify subscriptions (for node owner)"), 513 help=_("set/modify subscriptions (for node owner)"),
568 ) 514 )
569 self.need_loop = True
570 515
571 def add_parser_options(self): 516 def add_parser_options(self):
572 # XXX: we use optional argument syntax for a required one because list of list of 2 elements 517 # XXX: we use optional argument syntax for a required one because list of list of 2 elements
573 # (uses to construct dicts) don't work with positional arguments 518 # (uses to construct dicts) don't work with positional arguments
574 self.parser.add_argument( 519 self.parser.add_argument(
581 required=True, 526 required=True,
582 action=StoreSubscriptionAction, 527 action=StoreSubscriptionAction,
583 help=_("entity/subscription couple(s)"), 528 help=_("entity/subscription couple(s)"),
584 ) 529 )
585 530
586 def psNodeSubscriptionsSetCb(self): 531 async def start(self):
587 self.disp(_("subscriptions have been set"), 1) 532 try:
588 self.host.quit() 533 self.host.bridge.psNodeSubscriptionsSet(
589 534 self.args.service,
590 def psNodeSubscriptionsSetEb(self, failure_): 535 self.args.node,
591 self.disp( 536 self.args.subscriptions,
592 "can't set node subscriptions: {reason}".format(reason=failure_), error=True 537 self.profile,
593 ) 538 )
594 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 539 except Exception as e:
595 540 self.disp(f"can't set node subscriptions: {e}", error=True)
596 def start(self): 541 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
597 self.host.bridge.psNodeSubscriptionsSet( 542 else:
598 self.args.service, 543 self.disp(_("subscriptions have been set"), 1)
599 self.args.node, 544 self.host.quit()
600 self.args.subscriptions,
601 self.profile,
602 callback=self.psNodeSubscriptionsSetCb,
603 errback=self.psNodeSubscriptionsSetEb,
604 )
605 545
606 546
607 class NodeSubscriptions(base.CommandBase): 547 class NodeSubscriptions(base.CommandBase):
608 subcommands = (NodeSubscriptionsGet, NodeSubscriptionsSet) 548 subcommands = (NodeSubscriptionsGet, NodeSubscriptionsSet)
609 549
625 use_pubsub=True, 565 use_pubsub=True,
626 pubsub_flags={C.NODE}, 566 pubsub_flags={C.NODE},
627 use_verbose=True, 567 use_verbose=True,
628 help=_("set/replace a schema"), 568 help=_("set/replace a schema"),
629 ) 569 )
630 self.need_loop = True
631 570
632 def add_parser_options(self): 571 def add_parser_options(self):
633 self.parser.add_argument("schema", help=_("schema to set (must be XML)")) 572 self.parser.add_argument("schema", help=_("schema to set (must be XML)"))
634 573
635 def psSchemaSetCb(self): 574 async def start(self):
636 self.disp(_("schema has been set"), 1) 575 try:
637 self.host.quit() 576 await self.host.bridge.psSchemaSet(
638 577 self.args.service,
639 def start(self): 578 self.args.node,
640 self.host.bridge.psSchemaSet( 579 self.args.schema,
641 self.args.service, 580 self.profile,
642 self.args.node, 581 )
643 self.args.schema, 582 except Exception as e:
644 self.profile, 583 self.disp(f"can't set schema: {e}", error=True)
645 callback=self.psSchemaSetCb, 584 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
646 errback=partial( 585 else:
647 self.errback, 586 self.disp(_("schema has been set"), 1)
648 msg=_("can't set schema: {}"), 587 self.host.quit()
649 exit_code=C.EXIT_BRIDGE_ERRBACK,
650 ),
651 )
652 588
653 589
654 class NodeSchemaEdit(base.CommandBase, common.BaseEdit): 590 class NodeSchemaEdit(base.CommandBase, common.BaseEdit):
655 use_items = False 591 use_items = False
656 592
664 use_draft=True, 600 use_draft=True,
665 use_verbose=True, 601 use_verbose=True,
666 help=_("edit a schema"), 602 help=_("edit a schema"),
667 ) 603 )
668 common.BaseEdit.__init__(self, self.host, PUBSUB_SCHEMA_TMP_DIR) 604 common.BaseEdit.__init__(self, self.host, PUBSUB_SCHEMA_TMP_DIR)
669 self.need_loop = True
670 605
671 def add_parser_options(self): 606 def add_parser_options(self):
672 pass 607 pass
673 608
674 def psSchemaSetCb(self): 609 async def publish(self, schema):
675 self.disp(_("schema has been set"), 1) 610 try:
676 self.host.quit() 611 await self.host.bridge.psSchemaSet(
677 612 self.args.service,
678 def publish(self, schema): 613 self.args.node,
679 self.host.bridge.psSchemaSet( 614 schema,
680 self.args.service, 615 self.profile,
681 self.args.node, 616 )
682 schema, 617 except Exception as e:
683 self.profile, 618 self.disp(f"can't set schema: {e}", error=True)
684 callback=self.psSchemaSetCb, 619 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
685 errback=partial( 620 else:
686 self.errback, 621 self.disp(_("schema has been set"), 1)
687 msg=_("can't set schema: {}"), 622 self.host.quit()
688 exit_code=C.EXIT_BRIDGE_ERRBACK, 623
689 ), 624 async def psSchemaGetCb(self, schema):
690 )
691
692 def psSchemaGetCb(self, schema):
693 try: 625 try:
694 from lxml import etree 626 from lxml import etree
695 except ImportError: 627 except ImportError:
696 self.disp('lxml module must be installed to use edit, please install it ' 628 self.disp('lxml module must be installed to use edit, please install it '
697 'with "pip install lxml"', 629 'with "pip install lxml"',
705 schema_elt = etree.fromstring(schema, parser) 637 schema_elt = etree.fromstring(schema, parser)
706 content_file_obj.write( 638 content_file_obj.write(
707 etree.tostring(schema_elt, encoding="utf-8", pretty_print=True) 639 etree.tostring(schema_elt, encoding="utf-8", pretty_print=True)
708 ) 640 )
709 content_file_obj.seek(0) 641 content_file_obj.seek(0)
710 self.runEditor("pubsub_schema_editor_args", content_file_path, content_file_obj) 642 await self.runEditor("pubsub_schema_editor_args", content_file_path, content_file_obj)
711 643
712 def start(self): 644 async def start(self):
713 self.host.bridge.psSchemaGet( 645 try:
714 self.args.service, 646 schema = await self.host.bridge.psSchemaGet(
715 self.args.node, 647 self.args.service,
716 self.profile, 648 self.args.node,
717 callback=self.psSchemaGetCb, 649 self.profile,
718 errback=partial( 650 )
719 self.errback, 651 except Exception as e:
720 msg=_("can't edit schema: {}"), 652 self.disp(f"can't edit schema: {e}", error=True)
721 exit_code=C.EXIT_BRIDGE_ERRBACK, 653 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
722 ), 654 else:
723 ) 655 await self.psSchemaGetCb(schema)
724 656
725 657
726 class NodeSchemaGet(base.CommandBase): 658 class NodeSchemaGet(base.CommandBase):
727 def __init__(self, host): 659 def __init__(self, host):
728 base.CommandBase.__init__( 660 base.CommandBase.__init__(
733 use_pubsub=True, 665 use_pubsub=True,
734 pubsub_flags={C.NODE}, 666 pubsub_flags={C.NODE},
735 use_verbose=True, 667 use_verbose=True,
736 help=_("get schema"), 668 help=_("get schema"),
737 ) 669 )
738 self.need_loop = True
739 670
740 def add_parser_options(self): 671 def add_parser_options(self):
741 pass 672 pass
742 673
743 def psSchemaGetCb(self, schema): 674 async def start(self):
744 if not schema: 675 try:
745 self.disp(_("no schema found"), 1) 676 schema = await self.host.bridge.psSchemaGet(
746 self.host.quit(1) 677 self.args.service,
747 self.output(schema) 678 self.args.node,
748 self.host.quit() 679 self.profile,
749 680 )
750 def start(self): 681 except Exception as e:
751 self.host.bridge.psSchemaGet( 682 self.disp(f"can't get schema: {e}", error=True)
752 self.args.service, 683 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
753 self.args.node, 684 else:
754 self.profile, 685 if schema:
755 callback=self.psSchemaGetCb, 686 await self.output(schema)
756 errback=partial( 687 self.host.quit()
757 self.errback, 688 else:
758 msg=_("can't get schema: {}"), 689 self.disp(_("no schema found"), 1)
759 exit_code=C.EXIT_BRIDGE_ERRBACK, 690 self.host.quit(1)
760 ),
761 )
762 691
763 692
764 class NodeSchema(base.CommandBase): 693 class NodeSchema(base.CommandBase):
765 subcommands = (NodeSchemaSet, NodeSchemaEdit, NodeSchemaGet) 694 subcommands = (NodeSchemaSet, NodeSchemaEdit, NodeSchemaGet)
766 695
797 "set", 726 "set",
798 use_pubsub=True, 727 use_pubsub=True,
799 pubsub_flags={C.NODE}, 728 pubsub_flags={C.NODE},
800 help=_("publish a new item or update an existing one"), 729 help=_("publish a new item or update an existing one"),
801 ) 730 )
802 self.need_loop = True
803 731
804 def add_parser_options(self): 732 def add_parser_options(self):
805 self.parser.add_argument( 733 self.parser.add_argument(
806 "item", 734 "item",
807 nargs="?", 735 nargs="?",
808 default="", 736 default="",
809 help=_("id, URL of the item to update, keyword, or nothing for new item"), 737 help=_("id, URL of the item to update, keyword, or nothing for new item"),
810 ) 738 )
811 739
812 def psItemsSendCb(self, published_id): 740 async def start(self):
813 if published_id:
814 self.disp("Item published at {pub_id}".format(pub_id=published_id))
815 else:
816 self.disp("Item published")
817 self.host.quit(C.EXIT_OK)
818
819 def start(self):
820 element, etree = xml_tools.etreeParse(self, sys.stdin) 741 element, etree = xml_tools.etreeParse(self, sys.stdin)
821 element = xml_tools.getPayload(self, element) 742 element = xml_tools.getPayload(self, element)
822 payload = etree.tostring(element, encoding="unicode") 743 payload = etree.tostring(element, encoding="unicode")
823 744
824 self.host.bridge.psItemSend( 745 try:
825 self.args.service, 746 published_id = await self.host.bridge.psItemSend(
826 self.args.node, 747 self.args.service,
827 payload, 748 self.args.node,
828 self.args.item, 749 payload,
829 {}, 750 self.args.item,
830 self.profile, 751 {},
831 callback=self.psItemsSendCb, 752 self.profile,
832 errback=partial( 753 )
833 self.errback, 754 except Exception as e:
834 msg=_("can't send item: {}"), 755 self.disp(_(f"can't send item: {e}"), error=True)
835 exit_code=C.EXIT_BRIDGE_ERRBACK, 756 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
836 ), 757 else:
837 ) 758 if published_id:
759 self.disp("Item published at {pub_id}".format(pub_id=published_id))
760 else:
761 self.disp("Item published")
762 self.host.quit(C.EXIT_OK)
838 763
839 764
840 class Get(base.CommandBase): 765 class Get(base.CommandBase):
841 def __init__(self, host): 766 def __init__(self, host):
842 base.CommandBase.__init__( 767 base.CommandBase.__init__(
846 use_output=C.OUTPUT_LIST_XML, 771 use_output=C.OUTPUT_LIST_XML,
847 use_pubsub=True, 772 use_pubsub=True,
848 pubsub_flags={C.NODE, C.MULTI_ITEMS}, 773 pubsub_flags={C.NODE, C.MULTI_ITEMS},
849 help=_("get pubsub item(s)"), 774 help=_("get pubsub item(s)"),
850 ) 775 )
851 self.need_loop = True
852 776
853 def add_parser_options(self): 777 def add_parser_options(self):
854 self.parser.add_argument( 778 self.parser.add_argument(
855 "-S", 779 "-S",
856 "--sub-id", 780 "--sub-id",
858 help=_("subscription id"), 782 help=_("subscription id"),
859 ) 783 )
860 #  TODO: a key(s) argument to select keys to display 784 #  TODO: a key(s) argument to select keys to display
861 # TODO: add MAM filters 785 # TODO: add MAM filters
862 786
863 def psItemsGetCb(self, ps_result): 787 async def start(self):
864 self.output(ps_result[0]) 788 try:
865 self.host.quit(C.EXIT_OK) 789 ps_result = await self.host.bridge.psItemsGet(
866 790 self.args.service,
867 def psItemsGetEb(self, failure_): 791 self.args.node,
868 self.disp("can't get pubsub items: {reason}".format(reason=failure_), error=True) 792 self.args.max,
869 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 793 self.args.items,
870 794 self.args.sub_id,
871 def start(self): 795 self.getPubsubExtra(),
872 self.host.bridge.psItemsGet( 796 self.profile,
873 self.args.service, 797 )
874 self.args.node, 798 except Exception as e:
875 self.args.max, 799 self.disp(f"can't get pubsub items: {e}", error=True)
876 self.args.items, 800 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
877 self.args.sub_id, 801 else:
878 self.getPubsubExtra(), 802 await self.output(ps_result[0])
879 self.profile, 803 self.host.quit(C.EXIT_OK)
880 callback=self.psItemsGetCb,
881 errback=self.psItemsGetEb,
882 )
883 804
884 805
885 class Delete(base.CommandBase): 806 class Delete(base.CommandBase):
886 def __init__(self, host): 807 def __init__(self, host):
887 base.CommandBase.__init__( 808 base.CommandBase.__init__(
890 "delete", 811 "delete",
891 use_pubsub=True, 812 use_pubsub=True,
892 pubsub_flags={C.NODE, C.ITEM, C.SINGLE_ITEM}, 813 pubsub_flags={C.NODE, C.ITEM, C.SINGLE_ITEM},
893 help=_("delete an item"), 814 help=_("delete an item"),
894 ) 815 )
895 self.need_loop = True
896 816
897 def add_parser_options(self): 817 def add_parser_options(self):
898 self.parser.add_argument( 818 self.parser.add_argument(
899 "-f", "--force", action="store_true", help=_("delete without confirmation") 819 "-f", "--force", action="store_true", help=_("delete without confirmation")
900 ) 820 )
901 self.parser.add_argument( 821 self.parser.add_argument(
902 "-N", "--notify", action="store_true", help=_("notify deletion") 822 "-N", "--notify", action="store_true", help=_("notify deletion")
903 ) 823 )
904 824
905 def psItemsDeleteCb(self): 825 async def start(self):
906 self.disp(_("item {item_id} has been deleted").format(item_id=self.args.item))
907 self.host.quit(C.EXIT_OK)
908
909 def start(self):
910 if not self.args.item: 826 if not self.args.item:
911 self.parser.error(_("You need to specify an item to delete")) 827 self.parser.error(_("You need to specify an item to delete"))
912 if not self.args.force: 828 if not self.args.force:
913 message = _("Are you sure to delete item {item_id} ?").format( 829 message = _("Are you sure to delete item {item_id} ?").format(
914 item_id=self.args.item 830 item_id=self.args.item
915 ) 831 )
916 self.host.confirmOrQuit(message, _("item deletion cancelled")) 832 await self.host.confirmOrQuit(message, _("item deletion cancelled"))
917 self.host.bridge.psRetractItem( 833 try:
918 self.args.service, 834 await self.host.bridge.psRetractItem(
919 self.args.node, 835 self.args.service,
920 self.args.item, 836 self.args.node,
921 self.args.notify, 837 self.args.item,
922 self.profile, 838 self.args.notify,
923 callback=self.psItemsDeleteCb, 839 self.profile,
924 errback=partial( 840 )
925 self.errback, 841 except Exception as e:
926 msg=_("can't delete item: {}"), 842 self.disp(_(f"can't delete item: {e}"), error=True)
927 exit_code=C.EXIT_BRIDGE_ERRBACK, 843 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
928 ), 844 else:
929 ) 845 self.disp(_(f"item {self.args.item} has been deleted"))
846 self.host.quit(C.EXIT_OK)
930 847
931 848
932 class Edit(base.CommandBase, common.BaseEdit): 849 class Edit(base.CommandBase, common.BaseEdit):
933 def __init__(self, host): 850 def __init__(self, host):
934 base.CommandBase.__init__( 851 base.CommandBase.__init__(
944 common.BaseEdit.__init__(self, self.host, PUBSUB_TMP_DIR) 861 common.BaseEdit.__init__(self, self.host, PUBSUB_TMP_DIR)
945 862
946 def add_parser_options(self): 863 def add_parser_options(self):
947 pass 864 pass
948 865
949 def edit(self, content_file_path, content_file_obj): 866 async def publish(self, content):
950 # we launch editor 867 published_id = await self.host.bridge.psItemSend(
951 self.runEditor("pubsub_editor_args", content_file_path, content_file_obj)
952
953 def publish(self, content):
954 published_id = self.host.bridge.psItemSend(
955 self.pubsub_service, 868 self.pubsub_service,
956 self.pubsub_node, 869 self.pubsub_node,
957 content, 870 content,
958 self.pubsub_item or "", 871 self.pubsub_item or "",
959 {}, 872 {},
962 if published_id: 875 if published_id:
963 self.disp("Item published at {pub_id}".format(pub_id=published_id)) 876 self.disp("Item published at {pub_id}".format(pub_id=published_id))
964 else: 877 else:
965 self.disp("Item published") 878 self.disp("Item published")
966 879
967 def getItemData(self, service, node, item): 880 async def getItemData(self, service, node, item):
968 try: 881 try:
969 from lxml import etree 882 from lxml import etree
970 except ImportError: 883 except ImportError:
971 self.disp('lxml module must be installed to use edit, please install it ' 884 self.disp('lxml module must be installed to use edit, please install it '
972 'with "pip install lxml"', 885 'with "pip install lxml"',
973 error=True, 886 error=True,
974 ) 887 )
975 self.host.quit(1) 888 self.host.quit(1)
976 items = [item] if item else [] 889 items = [item] if item else []
977 item_raw = self.host.bridge.psItemsGet( 890 item_raw = (await self.host.bridge.psItemsGet(
978 service, node, 1, items, "", {}, self.profile 891 service, node, 1, items, "", {}, self.profile
979 )[0][0] 892 ))[0][0]
980 parser = etree.XMLParser(remove_blank_text=True) 893 parser = etree.XMLParser(remove_blank_text=True, recover=True)
981 item_elt = etree.fromstring(item_raw, parser) 894 item_elt = etree.fromstring(item_raw, parser)
982 item_id = item_elt.get("id") 895 item_id = item_elt.get("id")
983 try: 896 try:
984 payload = item_elt[0] 897 payload = item_elt[0]
985 except IndexError: 898 except IndexError:
986 self.disp(_("Item has not payload"), 1) 899 self.disp(_("Item has not payload"), 1)
987 return "" 900 return ""
988 return etree.tostring(payload, encoding="unicode", pretty_print=True), item_id 901 return etree.tostring(payload, encoding="unicode", pretty_print=True), item_id
989 902
990 def start(self): 903 async def start(self):
991 self.pubsub_service, self.pubsub_node, self.pubsub_item, content_file_path, content_file_obj = ( 904 (self.pubsub_service,
992 self.getItemPath() 905 self.pubsub_node,
993 ) 906 self.pubsub_item,
994 self.edit(content_file_path, content_file_obj) 907 content_file_path,
908 content_file_obj) = await self.getItemPath()
909 await self.runEditor("pubsub_editor_args", content_file_path, content_file_obj)
910 self.host.quit()
995 911
996 912
997 class Subscribe(base.CommandBase): 913 class Subscribe(base.CommandBase):
998 def __init__(self, host): 914 def __init__(self, host):
999 base.CommandBase.__init__( 915 base.CommandBase.__init__(
1003 use_pubsub=True, 919 use_pubsub=True,
1004 pubsub_flags={C.NODE}, 920 pubsub_flags={C.NODE},
1005 use_verbose=True, 921 use_verbose=True,
1006 help=_("subscribe to a node"), 922 help=_("subscribe to a node"),
1007 ) 923 )
1008 self.need_loop = True
1009 924
1010 def add_parser_options(self): 925 def add_parser_options(self):
1011 pass 926 pass
1012 927
1013 def psSubscribeCb(self, sub_id): 928 async def start(self):
1014 self.disp(_("subscription done"), 1) 929 try:
1015 if sub_id: 930 sub_id = await self.host.bridge.psSubscribe(
1016 self.disp(_("subscription id: {sub_id}").format(sub_id=sub_id)) 931 self.args.service,
1017 self.host.quit() 932 self.args.node,
1018 933 {},
1019 def start(self): 934 self.profile,
1020 self.host.bridge.psSubscribe( 935 )
1021 self.args.service, 936 except Exception as e:
1022 self.args.node, 937 self.disp(_(f"can't subscribe to node: {e}"), error=True)
1023 {}, 938 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1024 self.profile, 939 else:
1025 callback=self.psSubscribeCb, 940 self.disp(_("subscription done"), 1)
1026 errback=partial( 941 if sub_id:
1027 self.errback, 942 self.disp(_("subscription id: {sub_id}").format(sub_id=sub_id))
1028 msg=_("can't subscribe to node: {}"), 943 self.host.quit()
1029 exit_code=C.EXIT_BRIDGE_ERRBACK,
1030 ),
1031 )
1032 944
1033 945
1034 class Unsubscribe(base.CommandBase): 946 class Unsubscribe(base.CommandBase):
1035 # TODO: voir pourquoi NodeNotFound sur subscribe juste après unsubscribe 947 # FIXME: check why we get a a NodeNotFound on subscribe just after unsubscribe
1036 948
1037 def __init__(self, host): 949 def __init__(self, host):
1038 base.CommandBase.__init__( 950 base.CommandBase.__init__(
1039 self, 951 self,
1040 host, 952 host,
1042 use_pubsub=True, 954 use_pubsub=True,
1043 pubsub_flags={C.NODE}, 955 pubsub_flags={C.NODE},
1044 use_verbose=True, 956 use_verbose=True,
1045 help=_("unsubscribe from a node"), 957 help=_("unsubscribe from a node"),
1046 ) 958 )
1047 self.need_loop = True
1048 959
1049 def add_parser_options(self): 960 def add_parser_options(self):
1050 pass 961 pass
1051 962
1052 def psUnsubscribeCb(self): 963 async def start(self):
1053 self.disp(_("subscription removed"), 1) 964 try:
1054 self.host.quit() 965 await self.host.bridge.psUnsubscribe(
1055 966 self.args.service,
1056 def start(self): 967 self.args.node,
1057 self.host.bridge.psUnsubscribe( 968 self.profile,
1058 self.args.service, 969 )
1059 self.args.node, 970 except Exception as e:
1060 self.profile, 971 self.disp(_(f"can't unsubscribe from node: {e}"), error=True)
1061 callback=self.psUnsubscribeCb, 972 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1062 errback=partial( 973 else:
1063 self.errback, 974 self.disp(_("subscription removed"), 1)
1064 msg=_("can't unsubscribe from node: {}"), 975 self.host.quit()
1065 exit_code=C.EXIT_BRIDGE_ERRBACK,
1066 ),
1067 )
1068 976
1069 977
1070 class Subscriptions(base.CommandBase): 978 class Subscriptions(base.CommandBase):
1071 def __init__(self, host): 979 def __init__(self, host):
1072 base.CommandBase.__init__( 980 base.CommandBase.__init__(
1075 "subscriptions", 983 "subscriptions",
1076 use_output=C.OUTPUT_LIST_DICT, 984 use_output=C.OUTPUT_LIST_DICT,
1077 use_pubsub=True, 985 use_pubsub=True,
1078 help=_("retrieve all subscriptions on a service"), 986 help=_("retrieve all subscriptions on a service"),
1079 ) 987 )
1080 self.need_loop = True
1081 988
1082 def add_parser_options(self): 989 def add_parser_options(self):
1083 pass 990 pass
1084 991
1085 def psSubscriptionsGetCb(self, subscriptions): 992 async def start(self):
1086 self.output(subscriptions) 993 try:
1087 self.host.quit() 994 subscriptions = await self.host.bridge.psSubscriptionsGet(
1088 995 self.args.service,
1089 def start(self): 996 self.args.node,
1090 self.host.bridge.psSubscriptionsGet( 997 self.profile,
1091 self.args.service, 998 )
1092 self.args.node, 999 except Exception as e:
1093 self.profile, 1000 self.disp(_(f"can't retrieve subscriptions: {e}"), error=True)
1094 callback=self.psSubscriptionsGetCb, 1001 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1095 errback=partial( 1002 else:
1096 self.errback, 1003 await self.output(subscriptions)
1097 msg=_("can't retrieve subscriptions: {}"), 1004 self.host.quit()
1098 exit_code=C.EXIT_BRIDGE_ERRBACK,
1099 ),
1100 )
1101 1005
1102 1006
1103 class Affiliations(base.CommandBase): 1007 class Affiliations(base.CommandBase):
1104 def __init__(self, host): 1008 def __init__(self, host):
1105 base.CommandBase.__init__( 1009 base.CommandBase.__init__(
1108 "affiliations", 1012 "affiliations",
1109 use_output=C.OUTPUT_DICT, 1013 use_output=C.OUTPUT_DICT,
1110 use_pubsub=True, 1014 use_pubsub=True,
1111 help=_("retrieve all affiliations on a service"), 1015 help=_("retrieve all affiliations on a service"),
1112 ) 1016 )
1113 self.need_loop = True
1114 1017
1115 def add_parser_options(self): 1018 def add_parser_options(self):
1116 pass 1019 pass
1117 1020
1118 def psAffiliationsGetCb(self, affiliations): 1021 async def start(self):
1119 self.output(affiliations) 1022 try:
1120 self.host.quit() 1023 affiliations = await self.host.bridge.psAffiliationsGet(
1121 1024 self.args.service,
1122 def psAffiliationsGetEb(self, failure_): 1025 self.args.node,
1123 self.disp( 1026 self.profile,
1124 "can't get node affiliations: {reason}".format(reason=failure_), error=True 1027 )
1125 ) 1028 except Exception as e:
1126 self.host.quit(C.EXIT_BRIDGE_ERRBACK) 1029 self.disp(
1127 1030 f"can't get node affiliations: {e}", error=True
1128 def start(self): 1031 )
1129 self.host.bridge.psAffiliationsGet( 1032 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1130 self.args.service, 1033 else:
1131 self.args.node, 1034 await self.output(affiliations)
1132 self.profile, 1035 self.host.quit()
1133 callback=self.psAffiliationsGetCb,
1134 errback=self.psAffiliationsGetEb,
1135 )
1136 1036
1137 1037
1138 class Search(base.CommandBase): 1038 class Search(base.CommandBase):
1139 """this command do a search without using MAM 1039 """This command do a search without using MAM
1140 1040
1141 This commands checks every items it finds by itself, 1041 This commands checks every items it finds by itself,
1142 so it may be heavy in resources both for server and client 1042 so it may be heavy in resources both for server and client
1143 """ 1043 """
1144 1044
1157 use_pubsub=True, 1057 use_pubsub=True,
1158 pubsub_flags={C.MULTI_ITEMS, C.NO_MAX}, 1058 pubsub_flags={C.MULTI_ITEMS, C.NO_MAX},
1159 use_verbose=True, 1059 use_verbose=True,
1160 help=_("search items corresponding to filters"), 1060 help=_("search items corresponding to filters"),
1161 ) 1061 )
1162 self.need_loop = True
1163 1062
1164 @property 1063 @property
1165 def etree(self): 1064 def etree(self):
1166 """load lxml.etree only if needed""" 1065 """load lxml.etree only if needed"""
1167 if self._etree is None: 1066 if self._etree is None:
1315 choices=("print", "exec", "external"), 1214 choices=("print", "exec", "external"),
1316 help=_("action to do on found items (DEFAULT: print)"), 1215 help=_("action to do on found items (DEFAULT: print)"),
1317 ) 1216 )
1318 self.parser.add_argument("command", nargs=argparse.REMAINDER) 1217 self.parser.add_argument("command", nargs=argparse.REMAINDER)
1319 1218
1320 def psItemsGetEb(self, failure_, service, node): 1219 async def getItems(self, depth, service, node, items):
1321 self.disp(
1322 "can't get pubsub items at {service} (node: {node}): {reason}".format(
1323 service=service, node=node, reason=failure_
1324 ),
1325 error=True,
1326 )
1327 self.to_get -= 1
1328
1329 def getItems(self, depth, service, node, items):
1330 search = partial(self.search, depth=depth)
1331 errback = partial(self.psItemsGetEb, service=service, node=node)
1332 self.host.bridge.psItemsGet(
1333 service,
1334 node,
1335 self.args.node_max,
1336 items,
1337 "",
1338 self.getPubsubExtra(),
1339 self.profile,
1340 callback=search,
1341 errback=errback,
1342 )
1343 self.to_get += 1 1220 self.to_get += 1
1221 try:
1222 items_data = await self.host.bridge.psItemsGet(
1223 service,
1224 node,
1225 self.args.node_max,
1226 items,
1227 "",
1228 self.getPubsubExtra(),
1229 self.profile,
1230 )
1231 except Exception as e:
1232 self.disp(
1233 f"can't get pubsub items at {service} (node: {node}): {e}",
1234 error=True,
1235 )
1236 self.to_get -= 1
1237 else:
1238 await self.search(items_data, depth)
1344 1239
1345 def _checkPubsubURL(self, match, found_nodes): 1240 def _checkPubsubURL(self, match, found_nodes):
1346 """check that the matched URL is an xmpp: one 1241 """check that the matched URL is an xmpp: one
1347 1242
1348 @param found_nodes(list[unicode]): found_nodes 1243 @param found_nodes(list[unicode]): found_nodes
1358 found_node = {"service": url_data["path"], "node": url_data["node"]} 1253 found_node = {"service": url_data["path"], "node": url_data["node"]}
1359 if "item" in url_data: 1254 if "item" in url_data:
1360 found_node["item"] = url_data["item"] 1255 found_node["item"] = url_data["item"]
1361 found_nodes.append(found_node) 1256 found_nodes.append(found_node)
1362 1257
1363 def getSubNodes(self, item, depth): 1258 async def getSubNodes(self, item, depth):
1364 """look for pubsub URIs in item, and getItems on the linked nodes""" 1259 """look for pubsub URIs in item, and getItems on the linked nodes"""
1365 found_nodes = [] 1260 found_nodes = []
1366 checkURI = partial(self._checkPubsubURL, found_nodes=found_nodes) 1261 checkURI = partial(self._checkPubsubURL, found_nodes=found_nodes)
1367 strings.RE_URL.sub(checkURI, item) 1262 strings.RE_URL.sub(checkURI, item)
1368 for data in found_nodes: 1263 for data in found_nodes:
1369 self.getItems( 1264 await self.getItems(
1370 depth + 1, 1265 depth + 1,
1371 data["service"], 1266 data["service"],
1372 data["node"], 1267 data["node"],
1373 [data["item"]] if "item" in data else [], 1268 [data["item"]] if "item" in data else [],
1374 ) 1269 )
1450 item = str(item_xml) 1345 item = str(item_xml)
1451 item_xml = None 1346 item_xml = None
1452 elif type_ == "python": 1347 elif type_ == "python":
1453 if item_xml is None: 1348 if item_xml is None:
1454 item_xml = self.parseXml(item) 1349 item_xml = self.parseXml(item)
1455 cmd_ns = {"item": item, "item_xml": item_xml} 1350 cmd_ns = {
1351 "etree": self.etree,
1352 "item": item,
1353 "item_xml": item_xml
1354 }
1456 try: 1355 try:
1457 keep = eval(value, cmd_ns) 1356 keep = eval(value, cmd_ns)
1458 except SyntaxError as e: 1357 except SyntaxError as e:
1459 self.disp(str(e), error=True) 1358 self.disp(str(e), error=True)
1460 self.host.quit(C.EXIT_BAD_ARG) 1359 self.host.quit(C.EXIT_BAD_ARG)
1481 if not keep: 1380 if not keep:
1482 return False, item 1381 return False, item
1483 1382
1484 return True, item 1383 return True, item
1485 1384
1486 def doItemAction(self, item, metadata): 1385 async def doItemAction(self, item, metadata):
1487 """called when item has been kepts and the action need to be done 1386 """called when item has been kepts and the action need to be done
1488 1387
1489 @param item(unicode): accepted item 1388 @param item(unicode): accepted item
1490 """ 1389 """
1491 action = self.args.action 1390 action = self.args.action
1492 if action == "print" or self.host.verbosity > 0: 1391 if action == "print" or self.host.verbosity > 0:
1493 try: 1392 try:
1494 self.output(item) 1393 await self.output(item)
1495 except self.etree.XMLSyntaxError: 1394 except self.etree.XMLSyntaxError:
1496 # item is not valid XML, but a string 1395 # item is not valid XML, but a string
1497 # can happen when --only-matching is used 1396 # can happen when --only-matching is used
1498 self.disp(item) 1397 self.disp(item)
1499 if action in self.EXEC_ACTIONS: 1398 if action in self.EXEC_ACTIONS:
1519 command=" ".join([arg_tools.escape(a) for a in cmd_args]) 1418 command=" ".join([arg_tools.escape(a) for a in cmd_args])
1520 ), 1419 ),
1521 2, 1420 2,
1522 ) 1421 )
1523 if action == "exec": 1422 if action == "exec":
1524 ret = subprocess.call(cmd_args) 1423 p = await asyncio.create_subprocess_exec(*cmd_args)
1424 ret = await p.wait()
1525 else: 1425 else:
1526 p = subprocess.Popen(cmd_args, stdin=subprocess.PIPE) 1426 p = await asyncio.create_subprocess_exec(*cmd_args,
1527 p.communicate(item.encode("utf-8")) 1427 stdin=subprocess.PIPE)
1528 ret = p.wait() 1428 await p.communicate(item.encode(sys.getfilesystemencoding()))
1429 ret = p.returncode
1529 if ret != 0: 1430 if ret != 0:
1530 self.disp( 1431 self.disp(
1531 A.color( 1432 A.color(
1532 C.A_FAILURE, 1433 C.A_FAILURE,
1533 _("executed command failed with exit code {code}").format( 1434 _(f"executed command failed with exit code {ret}"),
1534 code=ret
1535 ),
1536 ) 1435 )
1537 ) 1436 )
1538 1437
1539 def search(self, items_data, depth): 1438 async def search(self, items_data, depth):
1540 """callback of getItems 1439 """callback of getItems
1541 1440
1542 this method filters items, get sub nodes if needed, 1441 this method filters items, get sub nodes if needed,
1543 do the requested action, and exit the command when everything is done 1442 do the requested action, and exit the command when everything is done
1544 @param items_data(tuple): result of getItems 1443 @param items_data(tuple): result of getItems
1546 0 for first node, 1 for first children, and so on 1445 0 for first node, 1 for first children, and so on
1547 """ 1446 """
1548 items, metadata = items_data 1447 items, metadata = items_data
1549 for item in items: 1448 for item in items:
1550 if depth < self.args.max_depth: 1449 if depth < self.args.max_depth:
1551 self.getSubNodes(item, depth) 1450 await self.getSubNodes(item, depth)
1552 keep, item = self.filter(item) 1451 keep, item = self.filter(item)
1553 if not keep: 1452 if not keep:
1554 continue 1453 continue
1555 self.doItemAction(item, metadata) 1454 await self.doItemAction(item, metadata)
1556 1455
1557 #  we check if we got all getItems results 1456 #  we check if we got all getItems results
1558 self.to_get -= 1 1457 self.to_get -= 1
1559 if self.to_get == 0: 1458 if self.to_get == 0:
1560 # yes, we can quit 1459 # yes, we can quit
1561 self.host.quit() 1460 self.host.quit()
1562 assert self.to_get > 0 1461 assert self.to_get > 0
1563 1462
1564 def start(self): 1463 async def start(self):
1565 if self.args.command: 1464 if self.args.command:
1566 if self.args.action not in self.EXEC_ACTIONS: 1465 if self.args.action not in self.EXEC_ACTIONS:
1567 self.parser.error( 1466 self.parser.error(
1568 _("Command can only be used with {actions} actions").format( 1467 _("Command can only be used with {actions} actions").format(
1569 actions=", ".join(self.EXEC_ACTIONS) 1468 actions=", ".join(self.EXEC_ACTIONS)
1582 if self.args.filters is None: 1481 if self.args.filters is None:
1583 self.args.filters = [] 1482 self.args.filters = []
1584 self.args.namespace = dict( 1483 self.args.namespace = dict(
1585 self.args.namespace + [("pubsub", "http://jabber.org/protocol/pubsub")] 1484 self.args.namespace + [("pubsub", "http://jabber.org/protocol/pubsub")]
1586 ) 1485 )
1587 self.getItems(0, self.args.service, self.args.node, self.args.items) 1486 await self.getItems(0, self.args.service, self.args.node, self.args.items)
1588 1487
1589 1488
1590 class Transform(base.CommandBase): 1489 class Transform(base.CommandBase):
1591 def __init__(self, host): 1490 def __init__(self, host):
1592 base.CommandBase.__init__( 1491 base.CommandBase.__init__(
1595 "transform", 1494 "transform",
1596 use_pubsub=True, 1495 use_pubsub=True,
1597 pubsub_flags={C.NODE, C.MULTI_ITEMS}, 1496 pubsub_flags={C.NODE, C.MULTI_ITEMS},
1598 help=_("modify items of a node using an external command/script"), 1497 help=_("modify items of a node using an external command/script"),
1599 ) 1498 )
1600 self.need_loop = True
1601 1499
1602 def add_parser_options(self): 1500 def add_parser_options(self):
1603 self.parser.add_argument( 1501 self.parser.add_argument(
1604 "--apply", 1502 "--apply",
1605 action="store_true", 1503 action="store_true",
1628 help=_("path to the command to use. Will be called repetitivly with an " 1526 help=_("path to the command to use. Will be called repetitivly with an "
1629 "item as input. Output (full item XML) will be used as new one. " 1527 "item as input. Output (full item XML) will be used as new one. "
1630 'Return "DELETE" string to delete the item, and "SKIP" to ignore it'), 1528 'Return "DELETE" string to delete the item, and "SKIP" to ignore it'),
1631 ) 1529 )
1632 1530
1633 def psItemsSendCb(self, item_ids, metadata): 1531 async def psItemsSendCb(self, item_ids, metadata):
1634 if item_ids: 1532 if item_ids:
1635 self.disp(_('items published with ids {item_ids}').format( 1533 self.disp(_('items published with ids {item_ids}').format(
1636 item_ids=', '.join(item_ids))) 1534 item_ids=', '.join(item_ids)))
1637 else: 1535 else:
1638 self.disp(_('items published')) 1536 self.disp(_('items published'))
1639 if self.args.all: 1537 if self.args.all:
1640 return self.handleNextPage(metadata) 1538 return await self.handleNextPage(metadata)
1641 else: 1539 else:
1642 self.host.quit() 1540 self.host.quit()
1643 1541
1644 def handleNextPage(self, metadata): 1542 async def handleNextPage(self, metadata):
1645 """Retrieve new page through RSM or quit if we're in the last page 1543 """Retrieve new page through RSM or quit if we're in the last page
1646 1544
1647 use to handle --all option 1545 use to handle --all option
1648 @param metadata(dict): metadata as returned by psItemsGet 1546 @param metadata(dict): metadata as returned by psItemsGet
1649 """ 1547 """
1670 ) 1568 )
1671 ) 1569 )
1672 1570
1673 extra = self.getPubsubExtra() 1571 extra = self.getPubsubExtra()
1674 extra['rsm_after'] = last 1572 extra['rsm_after'] = last
1675 self.host.bridge.psItemsGet( 1573 try:
1676 self.args.service, 1574 ps_result = await self.host.bridge.psItemsGet(
1677 self.args.node, 1575 self.args.service,
1678 self.args.rsm_max, 1576 self.args.node,
1679 self.args.items, 1577 self.args.rsm_max,
1680 "", 1578 self.args.items,
1681 extra, 1579 "",
1682 self.profile, 1580 extra,
1683 callback=self.psItemsGetCb, 1581 self.profile,
1684 errback=partial( 1582 )
1685 self.errback, 1583 except Exception as e:
1686 msg=_("can't retrieve items: {}"), 1584 self.disp(
1687 exit_code=C.EXIT_BRIDGE_ERRBACK, 1585 f"can't retrieve items: {e}", error=True
1688 ), 1586 )
1689 ) 1587 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1690 1588 else:
1691 def psItemsGetCb(self, ps_result): 1589 await self.psItemsGetCb(ps_result)
1590
1591 async def psItemsGetCb(self, ps_result):
1692 items, metadata = ps_result 1592 items, metadata = ps_result
1593 encoding = 'utf-8'
1693 new_items = [] 1594 new_items = []
1694 1595
1695 for item in items: 1596 for item in items:
1696 if self.check_duplicates: 1597 if self.check_duplicates:
1697 # this is used when we are not ordering by creation 1598 # this is used when we are not ordering by creation
1705 self.host.quit() 1606 self.host.quit()
1706 self.items_ids.append(item_id) 1607 self.items_ids.append(item_id)
1707 1608
1708 # we launch the command to filter the item 1609 # we launch the command to filter the item
1709 try: 1610 try:
1710 p = subprocess.Popen(self.args.command_path, stdin=subprocess.PIPE, 1611 p = await asyncio.create_subprocess_exec(
1711 stdout=subprocess.PIPE) 1612 self.args.command_path,
1613 stdin=subprocess.PIPE,
1614 stdout=subprocess.PIPE)
1712 except OSError as e: 1615 except OSError as e:
1713 exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR 1616 exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR
1714 e = str(e).decode('utf-8', errors="ignore") 1617 self.disp(f"Can't execute the command: {e}", error=True)
1715 self.disp("Can't execute the command: {msg}".format(msg=e), error=True)
1716 self.host.quit(exit_code) 1618 self.host.quit(exit_code)
1717 cmd_std_out, cmd_std_err = p.communicate(item.encode("utf-8")) 1619 encoding = "utf-8"
1718 ret = p.wait() 1620 cmd_std_out, cmd_std_err = await p.communicate(item.encode(encoding))
1621 ret = p.returncode
1719 if ret != 0: 1622 if ret != 0:
1720 self.disp("The command returned a non zero status while parsing the " 1623 self.disp(f"The command returned a non zero status while parsing the "
1721 "following item:\n\n{item}".format(item=item), error=True) 1624 f"following item:\n\n{item}", error=True)
1722 if self.args.ignore_errors: 1625 if self.args.ignore_errors:
1723 continue 1626 continue
1724 else: 1627 else:
1725 self.host.quit(C.EXIT_CMD_ERROR) 1628 self.host.quit(C.EXIT_CMD_ERROR)
1726 if cmd_std_err is not None: 1629 if cmd_std_err is not None:
1727 cmd_std_err = cmd_std_err.decode('utf-8', errors='ignore') 1630 cmd_std_err = cmd_std_err.decode(encoding, errors='ignore')
1728 self.disp(cmd_std_err, error=True) 1631 self.disp(cmd_std_err, error=True)
1729 cmd_std_out = cmd_std_out.strip() 1632 cmd_std_out = cmd_std_out.decode(encoding).strip()
1730 if cmd_std_out == "DELETE": 1633 if cmd_std_out == "DELETE":
1731 item_elt, __ = xml_tools.etreeParse(self, item) 1634 item_elt, __ = xml_tools.etreeParse(self, item)
1732 item_id = item_elt.get('id') 1635 item_id = item_elt.get('id')
1733 self.disp(_("Deleting item {item_id}").format(item_id=item_id)) 1636 self.disp(_(f"Deleting item {item_id}"))
1734 if self.args.apply: 1637 if self.args.apply:
1735 # FIXME: we don't wait for item to be retracted which can cause 1638 try:
1736 # trouble in case of error just before the end of the command 1639 await self.host.bridge.psRetractItem(
1737 # (the error message may be missed). 1640 self.args.service,
1738 # Once moved to Python 3, we must wait for it by using a 1641 self.args.node,
1739 # coroutine. 1642 item_id,
1740 self.host.bridge.psRetractItem( 1643 False,
1741 self.args.service, 1644 self.profile,
1742 self.args.node, 1645 )
1743 item_id, 1646 except Exception as e:
1744 False, 1647 self.disp(
1745 self.profile, 1648 f"can't delete item {item_id}: {e}", error=True
1746 errback=partial( 1649 )
1747 self.errback, 1650 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1748 msg=_("can't delete item [%s]: {}" % item_id),
1749 exit_code=C.EXIT_BRIDGE_ERRBACK,
1750 ),
1751 )
1752 continue 1651 continue
1753 elif cmd_std_out == "SKIP": 1652 elif cmd_std_out == "SKIP":
1754 item_elt, __ = xml_tools.etreeParse(self, item) 1653 item_elt, __ = xml_tools.etreeParse(self, item)
1755 item_id = item_elt.get('id') 1654 item_id = item_elt.get('id')
1756 self.disp(_("Skipping item {item_id}").format(item_id=item_id)) 1655 self.disp(_("Skipping item {item_id}").format(item_id=item_id))
1772 new_items.append(etree.tostring(element, encoding="unicode")) 1671 new_items.append(etree.tostring(element, encoding="unicode"))
1773 1672
1774 if not self.args.apply: 1673 if not self.args.apply:
1775 # on dry run we have nothing to wait for, we can quit 1674 # on dry run we have nothing to wait for, we can quit
1776 if self.args.all: 1675 if self.args.all:
1777 return self.handleNextPage(metadata) 1676 return await self.handleNextPage(metadata)
1778 self.host.quit() 1677 self.host.quit()
1779 else: 1678 else:
1780 if self.args.admin: 1679 if self.args.admin:
1781 self.host.bridge.psAdminItemsSend( 1680 bridge_method = self.host.bridge.psAdminItemsSend
1681 else:
1682 bridge_method = self.host.bridge.psItemsSend
1683
1684 try:
1685 ps_result = await bridge_method(
1782 self.args.service, 1686 self.args.service,
1783 self.args.node, 1687 self.args.node,
1784 new_items, 1688 new_items,
1785 "", 1689 "",
1786 self.profile, 1690 self.profile,
1787 callback=partial(self.psItemsSendCb, metadata=metadata),
1788 errback=partial(
1789 self.errback,
1790 msg=_("can't send item: {}"),
1791 exit_code=C.EXIT_BRIDGE_ERRBACK,
1792 ),
1793 ) 1691 )
1692 except Exception as e:
1693 self.disp(f"can't send item: {e}", error=True)
1694 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1794 else: 1695 else:
1795 self.host.bridge.psItemsSend( 1696 await self.psItemsSendCb(ps_result, metadata=metadata)
1796 self.args.service, 1697
1797 self.args.node, 1698 async def start(self):
1798 new_items,
1799 "",
1800 self.profile,
1801 callback=partial(self.psItemsSendCb, metadata=metadata),
1802 errback=partial(
1803 self.errback,
1804 msg=_("can't send item: {}"),
1805 exit_code=C.EXIT_BRIDGE_ERRBACK,
1806 ),
1807 )
1808
1809 def start(self):
1810 if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: 1699 if self.args.all and self.args.order_by != C.ORDER_BY_CREATION:
1811 self.check_duplicates = True 1700 self.check_duplicates = True
1812 self.items_ids = [] 1701 self.items_ids = []
1813 self.disp(A.color( 1702 self.disp(A.color(
1814 A.FG_RED, A.BOLD, 1703 A.FG_RED, A.BOLD,
1817 "We'll update items, so order may change during transformation,\n" 1706 "We'll update items, so order may change during transformation,\n"
1818 "we'll try to mitigate that by stopping on first duplicate,\n" 1707 "we'll try to mitigate that by stopping on first duplicate,\n"
1819 "but this method is not safe, and some items may be missed.\n---\n")) 1708 "but this method is not safe, and some items may be missed.\n---\n"))
1820 else: 1709 else:
1821 self.check_duplicates = False 1710 self.check_duplicates = False
1822 self.host.bridge.psItemsGet( 1711
1823 self.args.service, 1712 try:
1824 self.args.node, 1713 ps_result = await self.host.bridge.psItemsGet(
1825 self.args.max, 1714 self.args.service,
1826 self.args.items, 1715 self.args.node,
1827 "", 1716 self.args.max,
1828 self.getPubsubExtra(), 1717 self.args.items,
1829 self.profile, 1718 "",
1830 callback=self.psItemsGetCb, 1719 self.getPubsubExtra(),
1831 errback=partial( 1720 self.profile,
1832 self.errback, 1721 )
1833 msg=_("can't retrieve items: {}"), 1722 except Exception as e:
1834 exit_code=C.EXIT_BRIDGE_ERRBACK, 1723 self.disp(f"can't retrieve items: {e}", error=True)
1835 ), 1724 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1836 ) 1725 else:
1726 await self.psItemsGetCb(ps_result)
1837 1727
1838 1728
1839 class Uri(base.CommandBase): 1729 class Uri(base.CommandBase):
1840 def __init__(self, host): 1730 def __init__(self, host):
1841 base.CommandBase.__init__( 1731 base.CommandBase.__init__(
1845 use_profile=False, 1735 use_profile=False,
1846 use_pubsub=True, 1736 use_pubsub=True,
1847 pubsub_flags={C.NODE, C.SINGLE_ITEM}, 1737 pubsub_flags={C.NODE, C.SINGLE_ITEM},
1848 help=_("build URI"), 1738 help=_("build URI"),
1849 ) 1739 )
1850 self.need_loop = True
1851 1740
1852 def add_parser_options(self): 1741 def add_parser_options(self):
1853 self.parser.add_argument( 1742 self.parser.add_argument(
1854 "-p", 1743 "-p",
1855 "--profile", 1744 "--profile",
1869 if value: 1758 if value:
1870 uri_args[key] = value 1759 uri_args[key] = value
1871 self.disp(uri.buildXMPPUri("pubsub", **uri_args)) 1760 self.disp(uri.buildXMPPUri("pubsub", **uri_args))
1872 self.host.quit() 1761 self.host.quit()
1873 1762
1874 def start(self): 1763 async def start(self):
1875 if not self.args.service: 1764 if not self.args.service:
1876 self.host.bridge.asyncGetParamA( 1765 try:
1877 "JabberID", 1766 jid_ = await self.host.bridge.asyncGetParamA(
1878 "Connection", 1767 "JabberID",
1879 profile_key=self.args.profile, 1768 "Connection",
1880 callback=self.display_uri, 1769 profile_key=self.args.profile
1881 errback=partial( 1770 )
1882 self.errback, 1771 except Exception as e:
1883 msg=_("can't retrieve jid: {}"), 1772 self.disp(f"can't retrieve jid: {e}", error=True)
1884 exit_code=C.EXIT_BRIDGE_ERRBACK, 1773 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1885 ), 1774 else:
1886 ) 1775 self.display_uri(jid_)
1887 else: 1776 else:
1888 self.display_uri(None) 1777 self.display_uri(None)
1889 1778
1890 1779
1891 class HookCreate(base.CommandBase): 1780 class HookCreate(base.CommandBase):
1896 "create", 1785 "create",
1897 use_pubsub=True, 1786 use_pubsub=True,
1898 pubsub_flags={C.NODE}, 1787 pubsub_flags={C.NODE},
1899 help=_("create a Pubsub hook"), 1788 help=_("create a Pubsub hook"),
1900 ) 1789 )
1901 self.need_loop = True
1902 1790
1903 def add_parser_options(self): 1791 def add_parser_options(self):
1904 self.parser.add_argument( 1792 self.parser.add_argument(
1905 "-t", 1793 "-t",
1906 "--type", 1794 "--type",
1926 if not os.path.isfile(self.args.hook_arg): 1814 if not os.path.isfile(self.args.hook_arg):
1927 self.parser.error( 1815 self.parser.error(
1928 _("{path} is not a file").format(path=self.args.hook_arg) 1816 _("{path} is not a file").format(path=self.args.hook_arg)
1929 ) 1817 )
1930 1818
1931 def start(self): 1819 async def start(self):
1932 self.checkArgs(self) 1820 self.checkArgs(self)
1933 self.host.bridge.psHookAdd( 1821 try:
1934 self.args.service, 1822 await self.host.bridge.psHookAdd(
1935 self.args.node, 1823 self.args.service,
1936 self.args.type, 1824 self.args.node,
1937 self.args.hook_arg, 1825 self.args.type,
1938 self.args.persistent, 1826 self.args.hook_arg,
1939 self.profile, 1827 self.args.persistent,
1940 callback=self.host.quit, 1828 self.profile,
1941 errback=partial( 1829 )
1942 self.errback, 1830 except Exception as e:
1943 msg=_("can't create hook: {}"), 1831 self.disp(f"can't create hook: {e}", error=True)
1944 exit_code=C.EXIT_BRIDGE_ERRBACK, 1832 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1945 ), 1833 else:
1946 ) 1834 self.host.quit()
1947 1835
1948 1836
1949 class HookDelete(base.CommandBase): 1837 class HookDelete(base.CommandBase):
1950 def __init__(self, host): 1838 def __init__(self, host):
1951 base.CommandBase.__init__( 1839 base.CommandBase.__init__(
1954 "delete", 1842 "delete",
1955 use_pubsub=True, 1843 use_pubsub=True,
1956 pubsub_flags={C.NODE}, 1844 pubsub_flags={C.NODE},
1957 help=_("delete a Pubsub hook"), 1845 help=_("delete a Pubsub hook"),
1958 ) 1846 )
1959 self.need_loop = True
1960 1847
1961 def add_parser_options(self): 1848 def add_parser_options(self):
1962 self.parser.add_argument( 1849 self.parser.add_argument(
1963 "-t", 1850 "-t",
1964 "--type", 1851 "--type",
1974 help=_( 1861 help=_(
1975 "argument of the hook to remove, empty to remove all (DEFAULT: remove all)" 1862 "argument of the hook to remove, empty to remove all (DEFAULT: remove all)"
1976 ), 1863 ),
1977 ) 1864 )
1978 1865
1979 def psHookRemoveCb(self, nb_deleted): 1866 async def start(self):
1980 self.disp(
1981 _("{nb_deleted} hook(s) have been deleted").format(nb_deleted=nb_deleted)
1982 )
1983 self.host.quit()
1984
1985 def start(self):
1986 HookCreate.checkArgs(self) 1867 HookCreate.checkArgs(self)
1987 self.host.bridge.psHookRemove( 1868 try:
1988 self.args.service, 1869 nb_deleted = await self.host.bridge.psHookRemove(
1989 self.args.node, 1870 self.args.service,
1990 self.args.type, 1871 self.args.node,
1991 self.args.hook_arg, 1872 self.args.type,
1992 self.profile, 1873 self.args.hook_arg,
1993 callback=self.psHookRemoveCb, 1874 self.profile,
1994 errback=partial( 1875 )
1995 self.errback, 1876 except Exception as e:
1996 msg=_("can't delete hook: {}"), 1877 self.disp(f"can't delete hook: {e}", error=True)
1997 exit_code=C.EXIT_BRIDGE_ERRBACK, 1878 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
1998 ), 1879 else:
1999 ) 1880 self.disp(_(f"{nb_deleted} hook(s) have been deleted"))
1881 self.host.quit()
2000 1882
2001 1883
2002 class HookList(base.CommandBase): 1884 class HookList(base.CommandBase):
2003 def __init__(self, host): 1885 def __init__(self, host):
2004 base.CommandBase.__init__( 1886 base.CommandBase.__init__(
2006 host, 1888 host,
2007 "list", 1889 "list",
2008 use_output=C.OUTPUT_LIST_DICT, 1890 use_output=C.OUTPUT_LIST_DICT,
2009 help=_("list hooks of a profile"), 1891 help=_("list hooks of a profile"),
2010 ) 1892 )
2011 self.need_loop = True
2012 1893
2013 def add_parser_options(self): 1894 def add_parser_options(self):
2014 pass 1895 pass
2015 1896
2016 def psHookListCb(self, data): 1897 async def start(self):
2017 if not data: 1898 try:
2018 self.disp(_("No hook found.")) 1899 data = await self.host.bridge.psHookList(
2019 self.output(data) 1900 self.profile,
2020 self.host.quit() 1901 )
2021 1902 except Exception as e:
2022 def start(self): 1903 self.disp(f"can't list hooks: {e}", error=True)
2023 self.host.bridge.psHookList( 1904 self.host.quit(C.EXIT_BRIDGE_ERRBACK)
2024 self.profile, 1905 else:
2025 callback=self.psHookListCb, 1906 if not data:
2026 errback=partial( 1907 self.disp(_("No hook found."))
2027 self.errback, 1908 await self.output(data)
2028 msg=_("can't list hooks: {}"), 1909 self.host.quit()
2029 exit_code=C.EXIT_BRIDGE_ERRBACK,
2030 ),
2031 )
2032 1910
2033 1911
2034 class Hook(base.CommandBase): 1912 class Hook(base.CommandBase):
2035 subcommands = (HookCreate, HookDelete, HookList) 1913 subcommands = (HookCreate, HookDelete, HookList)
2036 1914