Mercurial > libervia-backend
comparison sat_frontends/jp/cmd_pubsub.py @ 3040:fee60f17ebac
jp: jp asyncio port:
/!\ this commit is huge. Jp is temporarily not working with `dbus` bridge /!\
This patch implements the port of jp to asyncio, so it is now correctly using the bridge
asynchronously, and it can be used with bridges like `pb`. This also simplifies the code,
notably for things which were previously implemented with many callbacks (like pagination
with RSM).
During the process, some behaviours have been modified/fixed in jp and backends; check
the diff for details.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 25 Sep 2019 08:56:41 +0200 |
parents | ab2696e34d29 |
children | cea52c9ddfd9 |
comparison
equal
deleted
inserted
replaced
3039:a1bc34f90fa5 | 3040:fee60f17ebac |
---|---|
16 | 16 |
17 # You should have received a copy of the GNU Affero General Public License | 17 # You should have received a copy of the GNU Affero General Public License |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 | 20 |
21 import argparse | |
22 import os.path | |
23 import re | |
24 import sys | |
25 import subprocess | |
26 import asyncio | |
21 from . import base | 27 from . import base |
22 from sat.core.i18n import _ | 28 from sat.core.i18n import _ |
23 from sat.core import exceptions | 29 from sat.core import exceptions |
24 from sat_frontends.jp.constants import Const as C | 30 from sat_frontends.jp.constants import Const as C |
25 from sat_frontends.jp import common | 31 from sat_frontends.jp import common |
27 from sat_frontends.jp import xml_tools | 33 from sat_frontends.jp import xml_tools |
28 from functools import partial | 34 from functools import partial |
29 from sat.tools.common import uri | 35 from sat.tools.common import uri |
30 from sat.tools.common.ansi import ANSI as A | 36 from sat.tools.common.ansi import ANSI as A |
31 from sat_frontends.tools import jid, strings | 37 from sat_frontends.tools import jid, strings |
32 import argparse | |
33 import os.path | |
34 import re | |
35 import subprocess | |
36 import sys | |
37 | 38 |
38 __commands__ = ["Pubsub"] | 39 __commands__ = ["Pubsub"] |
39 | 40 |
40 PUBSUB_TMP_DIR = "pubsub" | 41 PUBSUB_TMP_DIR = "pubsub" |
41 PUBSUB_SCHEMA_TMP_DIR = PUBSUB_TMP_DIR + "_schema" | 42 PUBSUB_SCHEMA_TMP_DIR = PUBSUB_TMP_DIR + "_schema" |
53 use_output=C.OUTPUT_DICT, | 54 use_output=C.OUTPUT_DICT, |
54 use_pubsub=True, | 55 use_pubsub=True, |
55 pubsub_flags={C.NODE}, | 56 pubsub_flags={C.NODE}, |
56 help=_("retrieve node configuration"), | 57 help=_("retrieve node configuration"), |
57 ) | 58 ) |
58 self.need_loop = True | |
59 | 59 |
60 def add_parser_options(self): | 60 def add_parser_options(self): |
61 self.parser.add_argument( | 61 self.parser.add_argument( |
62 "-k", | 62 "-k", |
63 "--key", | 63 "--key", |
70 return key[7:] if key.startswith("pubsub#") else key | 70 return key[7:] if key.startswith("pubsub#") else key |
71 | 71 |
72 def filterKey(self, key): | 72 def filterKey(self, key): |
73 return any((key == k or key == "pubsub#" + k) for k in self.args.keys) | 73 return any((key == k or key == "pubsub#" + k) for k in self.args.keys) |
74 | 74 |
75 def psNodeConfigurationGetCb(self, config_dict): | 75 async def start(self): |
76 key_filter = (lambda k: True) if not self.args.keys else self.filterKey | 76 try: |
77 config_dict = { | 77 config_dict = await self.host.bridge.psNodeConfigurationGet( |
78 self.removePrefix(k): v for k, v in config_dict.items() if key_filter(k) | 78 self.args.service, |
79 } | 79 self.args.node, |
80 self.output(config_dict) | 80 self.profile, |
81 self.host.quit() | 81 ) |
82 | 82 except Exception as e: |
83 def psNodeConfigurationGetEb(self, failure_): | 83 self.disp(f"can't get node configuration: {e}", error=True) |
84 self.disp( | 84 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
85 "can't get node configuration: {reason}".format(reason=failure_), error=True | 85 else: |
86 ) | 86 key_filter = (lambda k: True) if not self.args.keys else self.filterKey |
87 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 87 config_dict = { |
88 | 88 self.removePrefix(k): v for k, v in config_dict.items() if key_filter(k) |
89 def start(self): | 89 } |
90 self.host.bridge.psNodeConfigurationGet( | 90 await self.output(config_dict) |
91 self.args.service, | 91 self.host.quit() |
92 self.args.node, | |
93 self.profile, | |
94 callback=self.psNodeConfigurationGetCb, | |
95 errback=self.psNodeConfigurationGetEb, | |
96 ) | |
97 | 92 |
98 | 93 |
99 class NodeCreate(base.CommandBase): | 94 class NodeCreate(base.CommandBase): |
100 def __init__(self, host): | 95 def __init__(self, host): |
101 base.CommandBase.__init__( | 96 base.CommandBase.__init__( |
106 use_pubsub=True, | 101 use_pubsub=True, |
107 pubsub_flags={C.NODE}, | 102 pubsub_flags={C.NODE}, |
108 use_verbose=True, | 103 use_verbose=True, |
109 help=_("create a node"), | 104 help=_("create a node"), |
110 ) | 105 ) |
111 self.need_loop = True | |
112 | 106 |
113 def add_parser_options(self): | 107 def add_parser_options(self): |
114 self.parser.add_argument( | 108 self.parser.add_argument( |
115 "-f", | 109 "-f", |
116 "--field", | 110 "--field", |
126 "--full-prefix", | 120 "--full-prefix", |
127 action="store_true", | 121 action="store_true", |
128 help=_('don\'t prepend "pubsub#" prefix to field names'), | 122 help=_('don\'t prepend "pubsub#" prefix to field names'), |
129 ) | 123 ) |
130 | 124 |
131 def psNodeCreateCb(self, node_id): | 125 async def start(self): |
132 if self.host.verbosity: | |
133 announce = _("node created successfully: ") | |
134 else: | |
135 announce = "" | |
136 self.disp(announce + node_id) | |
137 self.host.quit() | |
138 | |
139 def psNodeCreateEb(self, failure_): | |
140 self.disp("can't create: {reason}".format(reason=failure_), error=True) | |
141 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | |
142 | |
143 def start(self): | |
144 if not self.args.full_prefix: | 126 if not self.args.full_prefix: |
145 options = {"pubsub#" + k: v for k, v in self.args.fields} | 127 options = {"pubsub#" + k: v for k, v in self.args.fields} |
146 else: | 128 else: |
147 options = dict(self.args.fields) | 129 options = dict(self.args.fields) |
148 self.host.bridge.psNodeCreate( | 130 try: |
149 self.args.service, | 131 node_id = await self.host.bridge.psNodeCreate( |
150 self.args.node, | 132 self.args.service, |
151 options, | 133 self.args.node, |
152 self.profile, | 134 options, |
153 callback=self.psNodeCreateCb, | 135 self.profile, |
154 errback=partial( | 136 ) |
155 self.errback, | 137 except Exception as e: |
156 msg=_("can't create node: {}"), | 138 self.disp(msg=_(f"can't create node: {e}"), error=True) |
157 exit_code=C.EXIT_BRIDGE_ERRBACK, | 139 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
158 ), | 140 else: |
159 ) | 141 if self.host.verbosity: |
142 announce = _("node created successfully: ") | |
143 else: | |
144 announce = "" | |
145 self.disp(announce + node_id) | |
146 self.host.quit() | |
160 | 147 |
161 | 148 |
162 class NodePurge(base.CommandBase): | 149 class NodePurge(base.CommandBase): |
163 | 150 |
164 def __init__(self, host): | 151 def __init__(self, host): |
167 "purge", | 154 "purge", |
168 use_pubsub=True, | 155 use_pubsub=True, |
169 pubsub_flags={C.NODE}, | 156 pubsub_flags={C.NODE}, |
170 help=_("purge a node (i.e. remove all items from it)"), | 157 help=_("purge a node (i.e. remove all items from it)"), |
171 ) | 158 ) |
172 self.need_loop = True | |
173 | 159 |
174 def add_parser_options(self): | 160 def add_parser_options(self): |
175 self.parser.add_argument( | 161 self.parser.add_argument( |
176 "-f", | 162 "-f", |
177 "--force", | 163 "--force", |
178 action="store_true", | 164 action="store_true", |
179 help=_("purge node without confirmation"), | 165 help=_("purge node without confirmation"), |
180 ) | 166 ) |
181 | 167 |
182 def psNodePurgeCb(self): | 168 async def start(self): |
183 self.disp(_("node [{node}] purged successfully").format(node=self.args.node)) | |
184 self.host.quit() | |
185 | |
186 def start(self): | |
187 if not self.args.force: | 169 if not self.args.force: |
188 if not self.args.service: | 170 if not self.args.service: |
189 message = _("Are you sure to purge PEP node [{node_id}]? " | 171 message = _( |
190 "This will delete ALL items from it!").format( | 172 f"Are you sure to purge PEP node [{self.args.node}]? This will " |
191 node_id=self.args.node | 173 f"delete ALL items from it!") |
192 ) | |
193 else: | 174 else: |
194 message = _( | 175 message = _( |
195 "Are you sure to delete node [{node_id}] on service [{service}]? " | 176 f"Are you sure to delete node [{self.args.node}] on service " |
196 "This will delete ALL items from it!" | 177 f"[{self.args.service}]? This will delete ALL items from it!") |
197 ).format(node_id=self.args.node, service=self.args.service) | 178 await self.host.confirmOrQuit(message, _("node purge cancelled")) |
198 self.host.confirmOrQuit(message, _("node purge cancelled")) | 179 |
199 | 180 try: |
200 self.host.bridge.psNodePurge( | 181 await self.host.bridge.psNodePurge( |
201 self.args.service, | 182 self.args.service, |
202 self.args.node, | 183 self.args.node, |
203 self.profile, | 184 self.profile, |
204 callback=self.psNodePurgeCb, | 185 ) |
205 errback=partial( | 186 except Exception as e: |
206 self.errback, | 187 self.disp(msg=_(f"can't purge node: {e}"), error=True) |
207 msg=_("can't purge node: {}"), | 188 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
208 exit_code=C.EXIT_BRIDGE_ERRBACK, | 189 else: |
209 ), | 190 self.disp(_(f"node [{self.args.node}] purged successfully")) |
210 ) | 191 self.host.quit() |
211 | 192 |
212 | 193 |
213 class NodeDelete(base.CommandBase): | 194 class NodeDelete(base.CommandBase): |
214 def __init__(self, host): | 195 def __init__(self, host): |
215 base.CommandBase.__init__( | 196 base.CommandBase.__init__( |
218 "delete", | 199 "delete", |
219 use_pubsub=True, | 200 use_pubsub=True, |
220 pubsub_flags={C.NODE}, | 201 pubsub_flags={C.NODE}, |
221 help=_("delete a node"), | 202 help=_("delete a node"), |
222 ) | 203 ) |
223 self.need_loop = True | |
224 | 204 |
225 def add_parser_options(self): | 205 def add_parser_options(self): |
226 self.parser.add_argument( | 206 self.parser.add_argument( |
227 "-f", | 207 "-f", |
228 "--force", | 208 "--force", |
229 action="store_true", | 209 action="store_true", |
230 help=_("delete node without confirmation"), | 210 help=_("delete node without confirmation"), |
231 ) | 211 ) |
232 | 212 |
233 def psNodeDeleteCb(self): | 213 async def start(self): |
234 self.disp(_("node [{node}] deleted successfully").format(node=self.args.node)) | |
235 self.host.quit() | |
236 | |
237 def start(self): | |
238 if not self.args.force: | 214 if not self.args.force: |
239 if not self.args.service: | 215 if not self.args.service: |
240 message = _("Are you sure to delete PEP node [{node_id}] ?").format( | 216 message = _(f"Are you sure to delete PEP node [{self.args.node}] ?") |
241 node_id=self.args.node | |
242 ) | |
243 else: | 217 else: |
244 message = _( | 218 message = _(f"Are you sure to delete node [{self.args.node}] on " |
245 "Are you sure to delete node [{node_id}] on service [{service}] ?" | 219 f"service [{self.args.service}]?") |
246 ).format(node_id=self.args.node, service=self.args.service) | 220 await self.host.confirmOrQuit(message, _("node deletion cancelled")) |
247 self.host.confirmOrQuit(message, _("node deletion cancelled")) | 221 |
248 | 222 try: |
249 self.host.bridge.psNodeDelete( | 223 await self.host.bridge.psNodeDelete( |
250 self.args.service, | 224 self.args.service, |
251 self.args.node, | 225 self.args.node, |
252 self.profile, | 226 self.profile, |
253 callback=self.psNodeDeleteCb, | 227 ) |
254 errback=partial( | 228 except Exception as e: |
255 self.errback, | 229 self.disp(f"can't delete node: {e}", error=True) |
256 msg=_("can't delete node: {}"), | 230 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
257 exit_code=C.EXIT_BRIDGE_ERRBACK, | 231 else: |
258 ), | 232 self.disp(_(f"node [{self.args.node}] deleted successfully")) |
259 ) | 233 self.host.quit() |
260 | 234 |
261 | 235 |
262 class NodeSet(base.CommandBase): | 236 class NodeSet(base.CommandBase): |
263 def __init__(self, host): | 237 def __init__(self, host): |
264 base.CommandBase.__init__( | 238 base.CommandBase.__init__( |
269 use_pubsub=True, | 243 use_pubsub=True, |
270 pubsub_flags={C.NODE}, | 244 pubsub_flags={C.NODE}, |
271 use_verbose=True, | 245 use_verbose=True, |
272 help=_("set node configuration"), | 246 help=_("set node configuration"), |
273 ) | 247 ) |
274 self.need_loop = True | |
275 | 248 |
276 def add_parser_options(self): | 249 def add_parser_options(self): |
277 self.parser.add_argument( | 250 self.parser.add_argument( |
278 "-f", | 251 "-f", |
279 "--field", | 252 "--field", |
282 dest="fields", | 255 dest="fields", |
283 required=True, | 256 required=True, |
284 metavar=("KEY", "VALUE"), | 257 metavar=("KEY", "VALUE"), |
285 help=_("configuration field to set (required)"), | 258 help=_("configuration field to set (required)"), |
286 ) | 259 ) |
287 | 260 self.parser.add_argument( |
288 def psNodeConfigurationSetCb(self): | 261 "-F", |
289 self.disp(_("node configuration successful"), 1) | 262 "--full-prefix", |
290 self.host.quit() | 263 action="store_true", |
291 | 264 help=_('don\'t prepend "pubsub#" prefix to field names'), |
292 def psNodeConfigurationSetEb(self, failure_): | 265 ) |
293 self.disp( | |
294 "can't set node configuration: {reason}".format(reason=failure_), error=True | |
295 ) | |
296 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | |
297 | 266 |
298 def getKeyName(self, k): | 267 def getKeyName(self, k): |
299 if not k.startswith("pubsub#"): | 268 if self.args.full_prefix or k.startswith("pubsub#"): |
269 return k | |
270 else: | |
300 return "pubsub#" + k | 271 return "pubsub#" + k |
301 else: | 272 |
302 return k | 273 async def start(self): |
303 | 274 try: |
304 def start(self): | 275 await self.host.bridge.psNodeConfigurationSet( |
305 self.host.bridge.psNodeConfigurationSet( | 276 self.args.service, |
306 self.args.service, | 277 self.args.node, |
307 self.args.node, | 278 {self.getKeyName(k): v for k, v in self.args.fields}, |
308 {self.getKeyName(k): v for k, v in self.args.fields}, | 279 self.profile, |
309 self.profile, | 280 ) |
310 callback=self.psNodeConfigurationSetCb, | 281 except Exception as e: |
311 errback=self.psNodeConfigurationSetEb, | 282 self.disp(f"can't set node configuration: {e}", error=True) |
312 ) | 283 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
284 else: | |
285 self.disp(_("node configuration successful"), 1) | |
286 self.host.quit() | |
313 | 287 |
314 | 288 |
315 class NodeImport(base.CommandBase): | 289 class NodeImport(base.CommandBase): |
316 | 290 |
317 def __init__(self, host): | 291 def __init__(self, host): |
320 "import", | 294 "import", |
321 use_pubsub=True, | 295 use_pubsub=True, |
322 pubsub_flags={C.NODE}, | 296 pubsub_flags={C.NODE}, |
323 help=_("import raw XML to a node"), | 297 help=_("import raw XML to a node"), |
324 ) | 298 ) |
325 self.need_loop = True | |
326 | 299 |
327 def add_parser_options(self): | 300 def add_parser_options(self): |
328 self.parser.add_argument( | 301 self.parser.add_argument( |
329 "--admin", | 302 "--admin", |
330 action="store_true", | 303 action="store_true", |
335 type=argparse.FileType(), | 308 type=argparse.FileType(), |
336 help=_("path to the XML file with data to import. The file must contain " | 309 help=_("path to the XML file with data to import. The file must contain " |
337 "whole XML of each item to import."), | 310 "whole XML of each item to import."), |
338 ) | 311 ) |
339 | 312 |
340 def psItemsSendCb(self, item_ids): | 313 async def start(self): |
341 self.disp(_('items published with id(s) {item_ids}').format( | |
342 item_ids=', '.join(item_ids))) | |
343 self.host.quit() | |
344 | |
345 def start(self): | |
346 try: | 314 try: |
347 element, etree = xml_tools.etreeParse(self, self.args.import_file, | 315 element, etree = xml_tools.etreeParse(self, self.args.import_file, |
348 reraise=True) | 316 reraise=True) |
349 except Exception as e: | 317 except Exception as e: |
350 from lxml.etree import XMLSyntaxError | 318 from lxml.etree import XMLSyntaxError |
362 if not all([i.tag == '{http://jabber.org/protocol/pubsub}item' for i in element]): | 330 if not all([i.tag == '{http://jabber.org/protocol/pubsub}item' for i in element]): |
363 self.disp( | 331 self.disp( |
364 _("You are not using list of pubsub items, we can't import this file"), | 332 _("You are not using list of pubsub items, we can't import this file"), |
365 error=True) | 333 error=True) |
366 self.host.quit(C.EXIT_DATA_ERROR) | 334 self.host.quit(C.EXIT_DATA_ERROR) |
367 | 335 return |
368 items = [etree.tostring(i, encoding="utf-8") for i in element] | 336 |
337 items = [etree.tostring(i, encoding="unicode") for i in element] | |
369 if self.args.admin: | 338 if self.args.admin: |
370 self.host.bridge.psAdminItemsSend( | 339 method = self.host.bridge.psAdminItemsSend |
340 else: | |
341 self.disp(_("Items are imported without using admin mode, publisher can't " | |
342 "be changed")) | |
343 method = self.host.bridge.psItemsSend | |
344 | |
345 try: | |
346 items_ids = await method( | |
371 self.args.service, | 347 self.args.service, |
372 self.args.node, | 348 self.args.node, |
373 items, | 349 items, |
374 "", | 350 "", |
375 self.profile, | 351 self.profile, |
376 callback=partial(self.psItemsSendCb), | 352 ) |
377 errback=partial( | 353 except Exception as e: |
378 self.errback, | 354 self.disp(f"can't send items: {e}", error=True) |
379 msg=_("can't send item: {}"), | 355 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
380 exit_code=C.EXIT_BRIDGE_ERRBACK, | 356 else: |
381 ), | 357 if items_ids: |
382 ) | 358 self.disp(_('items published with id(s) {items_ids}').format( |
383 else: | 359 items_ids=', '.join(items_ids))) |
384 self.disp(_("Items are imported without using admin mode, publisher can't " | 360 else: |
385 "be changed")) | 361 self.disp(_('items published')) |
386 self.host.bridge.psItemsSend( | 362 self.host.quit() |
387 self.args.service, | |
388 self.args.node, | |
389 items, | |
390 "", | |
391 self.profile, | |
392 callback=partial(self.psItemsSendCb), | |
393 errback=partial( | |
394 self.errback, | |
395 msg=_("can't send item: {}"), | |
396 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
397 ), | |
398 ) | |
399 | 363 |
400 | 364 |
401 class NodeAffiliationsGet(base.CommandBase): | 365 class NodeAffiliationsGet(base.CommandBase): |
402 def __init__(self, host): | 366 def __init__(self, host): |
403 base.CommandBase.__init__( | 367 base.CommandBase.__init__( |
407 use_output=C.OUTPUT_DICT, | 371 use_output=C.OUTPUT_DICT, |
408 use_pubsub=True, | 372 use_pubsub=True, |
409 pubsub_flags={C.NODE}, | 373 pubsub_flags={C.NODE}, |
410 help=_("retrieve node affiliations (for node owner)"), | 374 help=_("retrieve node affiliations (for node owner)"), |
411 ) | 375 ) |
412 self.need_loop = True | |
413 | 376 |
414 def add_parser_options(self): | 377 def add_parser_options(self): |
415 pass | 378 pass |
416 | 379 |
417 def psNodeAffiliationsGetCb(self, affiliations): | 380 async def start(self): |
418 self.output(affiliations) | 381 try: |
419 self.host.quit() | 382 affiliations = await self.host.bridge.psNodeAffiliationsGet( |
420 | 383 self.args.service, |
421 def psNodeAffiliationsGetEb(self, failure_): | 384 self.args.node, |
422 self.disp( | 385 self.profile, |
423 "can't get node affiliations: {reason}".format(reason=failure_), error=True | 386 ) |
424 ) | 387 except Exception as e: |
425 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 388 self.disp(f"can't get node affiliations: {e}", error=True) |
426 | 389 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
427 def start(self): | 390 else: |
428 self.host.bridge.psNodeAffiliationsGet( | 391 await self.output(affiliations) |
429 self.args.service, | 392 self.host.quit() |
430 self.args.node, | |
431 self.profile, | |
432 callback=self.psNodeAffiliationsGetCb, | |
433 errback=self.psNodeAffiliationsGetEb, | |
434 ) | |
435 | 393 |
436 | 394 |
437 class NodeAffiliationsSet(base.CommandBase): | 395 class NodeAffiliationsSet(base.CommandBase): |
438 def __init__(self, host): | 396 def __init__(self, host): |
439 base.CommandBase.__init__( | 397 base.CommandBase.__init__( |
443 use_pubsub=True, | 401 use_pubsub=True, |
444 pubsub_flags={C.NODE}, | 402 pubsub_flags={C.NODE}, |
445 use_verbose=True, | 403 use_verbose=True, |
446 help=_("set affiliations (for node owner)"), | 404 help=_("set affiliations (for node owner)"), |
447 ) | 405 ) |
448 self.need_loop = True | |
449 | 406 |
450 def add_parser_options(self): | 407 def add_parser_options(self): |
451 # XXX: we use optional argument syntax for a required one because list of list of 2 elements | 408 # XXX: we use optional argument syntax for a required one because list of list of 2 elements |
452 # (uses to construct dicts) don't work with positional arguments | 409 # (uses to construct dicts) don't work with positional arguments |
453 self.parser.add_argument( | 410 self.parser.add_argument( |
459 action="append", | 416 action="append", |
460 nargs=2, | 417 nargs=2, |
461 help=_("entity/affiliation couple(s)"), | 418 help=_("entity/affiliation couple(s)"), |
462 ) | 419 ) |
463 | 420 |
464 def psNodeAffiliationsSetCb(self): | 421 async def start(self): |
465 self.disp(_("affiliations have been set"), 1) | |
466 self.host.quit() | |
467 | |
468 def psNodeAffiliationsSetEb(self, failure_): | |
469 self.disp( | |
470 "can't set node affiliations: {reason}".format(reason=failure_), error=True | |
471 ) | |
472 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | |
473 | |
474 def start(self): | |
475 affiliations = dict(self.args.affiliations) | 422 affiliations = dict(self.args.affiliations) |
476 self.host.bridge.psNodeAffiliationsSet( | 423 try: |
477 self.args.service, | 424 await self.host.bridge.psNodeAffiliationsSet( |
478 self.args.node, | 425 self.args.service, |
479 affiliations, | 426 self.args.node, |
480 self.profile, | 427 affiliations, |
481 callback=self.psNodeAffiliationsSetCb, | 428 self.profile, |
482 errback=self.psNodeAffiliationsSetEb, | 429 ) |
483 ) | 430 except Exception as e: |
431 self.disp(f"can't set node affiliations: {e}", error=True) | |
432 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | |
433 else: | |
434 self.disp(_("affiliations have been set"), 1) | |
435 self.host.quit() | |
484 | 436 |
485 | 437 |
486 class NodeAffiliations(base.CommandBase): | 438 class NodeAffiliations(base.CommandBase): |
487 subcommands = (NodeAffiliationsGet, NodeAffiliationsSet) | 439 subcommands = (NodeAffiliationsGet, NodeAffiliationsSet) |
488 | 440 |
504 use_output=C.OUTPUT_DICT, | 456 use_output=C.OUTPUT_DICT, |
505 use_pubsub=True, | 457 use_pubsub=True, |
506 pubsub_flags={C.NODE}, | 458 pubsub_flags={C.NODE}, |
507 help=_("retrieve node subscriptions (for node owner)"), | 459 help=_("retrieve node subscriptions (for node owner)"), |
508 ) | 460 ) |
509 self.need_loop = True | |
510 | 461 |
511 def add_parser_options(self): | 462 def add_parser_options(self): |
512 pass | 463 pass |
513 | 464 |
514 def psNodeSubscriptionsGetCb(self, subscriptions): | 465 async def start(self): |
515 self.output(subscriptions) | 466 try: |
516 self.host.quit() | 467 subscriptions = await self.host.bridge.psNodeSubscriptionsGet( |
517 | 468 self.args.service, |
518 def psNodeSubscriptionsGetEb(self, failure_): | 469 self.args.node, |
519 self.disp( | 470 self.profile, |
520 "can't get node subscriptions: {reason}".format(reason=failure_), error=True | 471 ) |
521 ) | 472 except Exception as e: |
522 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 473 self.disp(f"can't get node subscriptions: {e}", error=True) |
523 | 474 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
524 def start(self): | 475 else: |
525 self.host.bridge.psNodeSubscriptionsGet( | 476 await self.output(subscriptions) |
526 self.args.service, | 477 self.host.quit() |
527 self.args.node, | |
528 self.profile, | |
529 callback=self.psNodeSubscriptionsGetCb, | |
530 errback=self.psNodeSubscriptionsGetEb, | |
531 ) | |
532 | 478 |
533 | 479 |
534 class StoreSubscriptionAction(argparse.Action): | 480 class StoreSubscriptionAction(argparse.Action): |
535 """Action which handle subscription parameter for owner | 481 """Action which handle subscription parameter for owner |
536 | 482 |
564 use_pubsub=True, | 510 use_pubsub=True, |
565 pubsub_flags={C.NODE}, | 511 pubsub_flags={C.NODE}, |
566 use_verbose=True, | 512 use_verbose=True, |
567 help=_("set/modify subscriptions (for node owner)"), | 513 help=_("set/modify subscriptions (for node owner)"), |
568 ) | 514 ) |
569 self.need_loop = True | |
570 | 515 |
571 def add_parser_options(self): | 516 def add_parser_options(self): |
572 # XXX: we use optional argument syntax for a required one because list of list of 2 elements | 517 # XXX: we use optional argument syntax for a required one because list of list of 2 elements |
573 # (uses to construct dicts) don't work with positional arguments | 518 # (uses to construct dicts) don't work with positional arguments |
574 self.parser.add_argument( | 519 self.parser.add_argument( |
581 required=True, | 526 required=True, |
582 action=StoreSubscriptionAction, | 527 action=StoreSubscriptionAction, |
583 help=_("entity/subscription couple(s)"), | 528 help=_("entity/subscription couple(s)"), |
584 ) | 529 ) |
585 | 530 |
586 def psNodeSubscriptionsSetCb(self): | 531 async def start(self): |
587 self.disp(_("subscriptions have been set"), 1) | 532 try: |
588 self.host.quit() | 533 self.host.bridge.psNodeSubscriptionsSet( |
589 | 534 self.args.service, |
590 def psNodeSubscriptionsSetEb(self, failure_): | 535 self.args.node, |
591 self.disp( | 536 self.args.subscriptions, |
592 "can't set node subscriptions: {reason}".format(reason=failure_), error=True | 537 self.profile, |
593 ) | 538 ) |
594 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 539 except Exception as e: |
595 | 540 self.disp(f"can't set node subscriptions: {e}", error=True) |
596 def start(self): | 541 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
597 self.host.bridge.psNodeSubscriptionsSet( | 542 else: |
598 self.args.service, | 543 self.disp(_("subscriptions have been set"), 1) |
599 self.args.node, | 544 self.host.quit() |
600 self.args.subscriptions, | |
601 self.profile, | |
602 callback=self.psNodeSubscriptionsSetCb, | |
603 errback=self.psNodeSubscriptionsSetEb, | |
604 ) | |
605 | 545 |
606 | 546 |
607 class NodeSubscriptions(base.CommandBase): | 547 class NodeSubscriptions(base.CommandBase): |
608 subcommands = (NodeSubscriptionsGet, NodeSubscriptionsSet) | 548 subcommands = (NodeSubscriptionsGet, NodeSubscriptionsSet) |
609 | 549 |
625 use_pubsub=True, | 565 use_pubsub=True, |
626 pubsub_flags={C.NODE}, | 566 pubsub_flags={C.NODE}, |
627 use_verbose=True, | 567 use_verbose=True, |
628 help=_("set/replace a schema"), | 568 help=_("set/replace a schema"), |
629 ) | 569 ) |
630 self.need_loop = True | |
631 | 570 |
632 def add_parser_options(self): | 571 def add_parser_options(self): |
633 self.parser.add_argument("schema", help=_("schema to set (must be XML)")) | 572 self.parser.add_argument("schema", help=_("schema to set (must be XML)")) |
634 | 573 |
635 def psSchemaSetCb(self): | 574 async def start(self): |
636 self.disp(_("schema has been set"), 1) | 575 try: |
637 self.host.quit() | 576 await self.host.bridge.psSchemaSet( |
638 | 577 self.args.service, |
639 def start(self): | 578 self.args.node, |
640 self.host.bridge.psSchemaSet( | 579 self.args.schema, |
641 self.args.service, | 580 self.profile, |
642 self.args.node, | 581 ) |
643 self.args.schema, | 582 except Exception as e: |
644 self.profile, | 583 self.disp(f"can't set schema: {e}", error=True) |
645 callback=self.psSchemaSetCb, | 584 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
646 errback=partial( | 585 else: |
647 self.errback, | 586 self.disp(_("schema has been set"), 1) |
648 msg=_("can't set schema: {}"), | 587 self.host.quit() |
649 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
650 ), | |
651 ) | |
652 | 588 |
653 | 589 |
654 class NodeSchemaEdit(base.CommandBase, common.BaseEdit): | 590 class NodeSchemaEdit(base.CommandBase, common.BaseEdit): |
655 use_items = False | 591 use_items = False |
656 | 592 |
664 use_draft=True, | 600 use_draft=True, |
665 use_verbose=True, | 601 use_verbose=True, |
666 help=_("edit a schema"), | 602 help=_("edit a schema"), |
667 ) | 603 ) |
668 common.BaseEdit.__init__(self, self.host, PUBSUB_SCHEMA_TMP_DIR) | 604 common.BaseEdit.__init__(self, self.host, PUBSUB_SCHEMA_TMP_DIR) |
669 self.need_loop = True | |
670 | 605 |
671 def add_parser_options(self): | 606 def add_parser_options(self): |
672 pass | 607 pass |
673 | 608 |
674 def psSchemaSetCb(self): | 609 async def publish(self, schema): |
675 self.disp(_("schema has been set"), 1) | 610 try: |
676 self.host.quit() | 611 await self.host.bridge.psSchemaSet( |
677 | 612 self.args.service, |
678 def publish(self, schema): | 613 self.args.node, |
679 self.host.bridge.psSchemaSet( | 614 schema, |
680 self.args.service, | 615 self.profile, |
681 self.args.node, | 616 ) |
682 schema, | 617 except Exception as e: |
683 self.profile, | 618 self.disp(f"can't set schema: {e}", error=True) |
684 callback=self.psSchemaSetCb, | 619 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
685 errback=partial( | 620 else: |
686 self.errback, | 621 self.disp(_("schema has been set"), 1) |
687 msg=_("can't set schema: {}"), | 622 self.host.quit() |
688 exit_code=C.EXIT_BRIDGE_ERRBACK, | 623 |
689 ), | 624 async def psSchemaGetCb(self, schema): |
690 ) | |
691 | |
692 def psSchemaGetCb(self, schema): | |
693 try: | 625 try: |
694 from lxml import etree | 626 from lxml import etree |
695 except ImportError: | 627 except ImportError: |
696 self.disp('lxml module must be installed to use edit, please install it ' | 628 self.disp('lxml module must be installed to use edit, please install it ' |
697 'with "pip install lxml"', | 629 'with "pip install lxml"', |
705 schema_elt = etree.fromstring(schema, parser) | 637 schema_elt = etree.fromstring(schema, parser) |
706 content_file_obj.write( | 638 content_file_obj.write( |
707 etree.tostring(schema_elt, encoding="utf-8", pretty_print=True) | 639 etree.tostring(schema_elt, encoding="utf-8", pretty_print=True) |
708 ) | 640 ) |
709 content_file_obj.seek(0) | 641 content_file_obj.seek(0) |
710 self.runEditor("pubsub_schema_editor_args", content_file_path, content_file_obj) | 642 await self.runEditor("pubsub_schema_editor_args", content_file_path, content_file_obj) |
711 | 643 |
712 def start(self): | 644 async def start(self): |
713 self.host.bridge.psSchemaGet( | 645 try: |
714 self.args.service, | 646 schema = await self.host.bridge.psSchemaGet( |
715 self.args.node, | 647 self.args.service, |
716 self.profile, | 648 self.args.node, |
717 callback=self.psSchemaGetCb, | 649 self.profile, |
718 errback=partial( | 650 ) |
719 self.errback, | 651 except Exception as e: |
720 msg=_("can't edit schema: {}"), | 652 self.disp(f"can't edit schema: {e}", error=True) |
721 exit_code=C.EXIT_BRIDGE_ERRBACK, | 653 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
722 ), | 654 else: |
723 ) | 655 await self.psSchemaGetCb(schema) |
724 | 656 |
725 | 657 |
726 class NodeSchemaGet(base.CommandBase): | 658 class NodeSchemaGet(base.CommandBase): |
727 def __init__(self, host): | 659 def __init__(self, host): |
728 base.CommandBase.__init__( | 660 base.CommandBase.__init__( |
733 use_pubsub=True, | 665 use_pubsub=True, |
734 pubsub_flags={C.NODE}, | 666 pubsub_flags={C.NODE}, |
735 use_verbose=True, | 667 use_verbose=True, |
736 help=_("get schema"), | 668 help=_("get schema"), |
737 ) | 669 ) |
738 self.need_loop = True | |
739 | 670 |
740 def add_parser_options(self): | 671 def add_parser_options(self): |
741 pass | 672 pass |
742 | 673 |
743 def psSchemaGetCb(self, schema): | 674 async def start(self): |
744 if not schema: | 675 try: |
745 self.disp(_("no schema found"), 1) | 676 schema = await self.host.bridge.psSchemaGet( |
746 self.host.quit(1) | 677 self.args.service, |
747 self.output(schema) | 678 self.args.node, |
748 self.host.quit() | 679 self.profile, |
749 | 680 ) |
750 def start(self): | 681 except Exception as e: |
751 self.host.bridge.psSchemaGet( | 682 self.disp(f"can't get schema: {e}", error=True) |
752 self.args.service, | 683 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
753 self.args.node, | 684 else: |
754 self.profile, | 685 if schema: |
755 callback=self.psSchemaGetCb, | 686 await self.output(schema) |
756 errback=partial( | 687 self.host.quit() |
757 self.errback, | 688 else: |
758 msg=_("can't get schema: {}"), | 689 self.disp(_("no schema found"), 1) |
759 exit_code=C.EXIT_BRIDGE_ERRBACK, | 690 self.host.quit(1) |
760 ), | |
761 ) | |
762 | 691 |
763 | 692 |
764 class NodeSchema(base.CommandBase): | 693 class NodeSchema(base.CommandBase): |
765 subcommands = (NodeSchemaSet, NodeSchemaEdit, NodeSchemaGet) | 694 subcommands = (NodeSchemaSet, NodeSchemaEdit, NodeSchemaGet) |
766 | 695 |
797 "set", | 726 "set", |
798 use_pubsub=True, | 727 use_pubsub=True, |
799 pubsub_flags={C.NODE}, | 728 pubsub_flags={C.NODE}, |
800 help=_("publish a new item or update an existing one"), | 729 help=_("publish a new item or update an existing one"), |
801 ) | 730 ) |
802 self.need_loop = True | |
803 | 731 |
804 def add_parser_options(self): | 732 def add_parser_options(self): |
805 self.parser.add_argument( | 733 self.parser.add_argument( |
806 "item", | 734 "item", |
807 nargs="?", | 735 nargs="?", |
808 default="", | 736 default="", |
809 help=_("id, URL of the item to update, keyword, or nothing for new item"), | 737 help=_("id, URL of the item to update, keyword, or nothing for new item"), |
810 ) | 738 ) |
811 | 739 |
812 def psItemsSendCb(self, published_id): | 740 async def start(self): |
813 if published_id: | |
814 self.disp("Item published at {pub_id}".format(pub_id=published_id)) | |
815 else: | |
816 self.disp("Item published") | |
817 self.host.quit(C.EXIT_OK) | |
818 | |
819 def start(self): | |
820 element, etree = xml_tools.etreeParse(self, sys.stdin) | 741 element, etree = xml_tools.etreeParse(self, sys.stdin) |
821 element = xml_tools.getPayload(self, element) | 742 element = xml_tools.getPayload(self, element) |
822 payload = etree.tostring(element, encoding="unicode") | 743 payload = etree.tostring(element, encoding="unicode") |
823 | 744 |
824 self.host.bridge.psItemSend( | 745 try: |
825 self.args.service, | 746 published_id = await self.host.bridge.psItemSend( |
826 self.args.node, | 747 self.args.service, |
827 payload, | 748 self.args.node, |
828 self.args.item, | 749 payload, |
829 {}, | 750 self.args.item, |
830 self.profile, | 751 {}, |
831 callback=self.psItemsSendCb, | 752 self.profile, |
832 errback=partial( | 753 ) |
833 self.errback, | 754 except Exception as e: |
834 msg=_("can't send item: {}"), | 755 self.disp(_(f"can't send item: {e}"), error=True) |
835 exit_code=C.EXIT_BRIDGE_ERRBACK, | 756 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
836 ), | 757 else: |
837 ) | 758 if published_id: |
759 self.disp("Item published at {pub_id}".format(pub_id=published_id)) | |
760 else: | |
761 self.disp("Item published") | |
762 self.host.quit(C.EXIT_OK) | |
838 | 763 |
839 | 764 |
840 class Get(base.CommandBase): | 765 class Get(base.CommandBase): |
841 def __init__(self, host): | 766 def __init__(self, host): |
842 base.CommandBase.__init__( | 767 base.CommandBase.__init__( |
846 use_output=C.OUTPUT_LIST_XML, | 771 use_output=C.OUTPUT_LIST_XML, |
847 use_pubsub=True, | 772 use_pubsub=True, |
848 pubsub_flags={C.NODE, C.MULTI_ITEMS}, | 773 pubsub_flags={C.NODE, C.MULTI_ITEMS}, |
849 help=_("get pubsub item(s)"), | 774 help=_("get pubsub item(s)"), |
850 ) | 775 ) |
851 self.need_loop = True | |
852 | 776 |
853 def add_parser_options(self): | 777 def add_parser_options(self): |
854 self.parser.add_argument( | 778 self.parser.add_argument( |
855 "-S", | 779 "-S", |
856 "--sub-id", | 780 "--sub-id", |
858 help=_("subscription id"), | 782 help=_("subscription id"), |
859 ) | 783 ) |
860 # TODO: a key(s) argument to select keys to display | 784 # TODO: a key(s) argument to select keys to display |
861 # TODO: add MAM filters | 785 # TODO: add MAM filters |
862 | 786 |
863 def psItemsGetCb(self, ps_result): | 787 async def start(self): |
864 self.output(ps_result[0]) | 788 try: |
865 self.host.quit(C.EXIT_OK) | 789 ps_result = await self.host.bridge.psItemsGet( |
866 | 790 self.args.service, |
867 def psItemsGetEb(self, failure_): | 791 self.args.node, |
868 self.disp("can't get pubsub items: {reason}".format(reason=failure_), error=True) | 792 self.args.max, |
869 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 793 self.args.items, |
870 | 794 self.args.sub_id, |
871 def start(self): | 795 self.getPubsubExtra(), |
872 self.host.bridge.psItemsGet( | 796 self.profile, |
873 self.args.service, | 797 ) |
874 self.args.node, | 798 except Exception as e: |
875 self.args.max, | 799 self.disp(f"can't get pubsub items: {e}", error=True) |
876 self.args.items, | 800 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
877 self.args.sub_id, | 801 else: |
878 self.getPubsubExtra(), | 802 await self.output(ps_result[0]) |
879 self.profile, | 803 self.host.quit(C.EXIT_OK) |
880 callback=self.psItemsGetCb, | |
881 errback=self.psItemsGetEb, | |
882 ) | |
883 | 804 |
884 | 805 |
885 class Delete(base.CommandBase): | 806 class Delete(base.CommandBase): |
886 def __init__(self, host): | 807 def __init__(self, host): |
887 base.CommandBase.__init__( | 808 base.CommandBase.__init__( |
890 "delete", | 811 "delete", |
891 use_pubsub=True, | 812 use_pubsub=True, |
892 pubsub_flags={C.NODE, C.ITEM, C.SINGLE_ITEM}, | 813 pubsub_flags={C.NODE, C.ITEM, C.SINGLE_ITEM}, |
893 help=_("delete an item"), | 814 help=_("delete an item"), |
894 ) | 815 ) |
895 self.need_loop = True | |
896 | 816 |
897 def add_parser_options(self): | 817 def add_parser_options(self): |
898 self.parser.add_argument( | 818 self.parser.add_argument( |
899 "-f", "--force", action="store_true", help=_("delete without confirmation") | 819 "-f", "--force", action="store_true", help=_("delete without confirmation") |
900 ) | 820 ) |
901 self.parser.add_argument( | 821 self.parser.add_argument( |
902 "-N", "--notify", action="store_true", help=_("notify deletion") | 822 "-N", "--notify", action="store_true", help=_("notify deletion") |
903 ) | 823 ) |
904 | 824 |
905 def psItemsDeleteCb(self): | 825 async def start(self): |
906 self.disp(_("item {item_id} has been deleted").format(item_id=self.args.item)) | |
907 self.host.quit(C.EXIT_OK) | |
908 | |
909 def start(self): | |
910 if not self.args.item: | 826 if not self.args.item: |
911 self.parser.error(_("You need to specify an item to delete")) | 827 self.parser.error(_("You need to specify an item to delete")) |
912 if not self.args.force: | 828 if not self.args.force: |
913 message = _("Are you sure to delete item {item_id} ?").format( | 829 message = _("Are you sure to delete item {item_id} ?").format( |
914 item_id=self.args.item | 830 item_id=self.args.item |
915 ) | 831 ) |
916 self.host.confirmOrQuit(message, _("item deletion cancelled")) | 832 await self.host.confirmOrQuit(message, _("item deletion cancelled")) |
917 self.host.bridge.psRetractItem( | 833 try: |
918 self.args.service, | 834 await self.host.bridge.psRetractItem( |
919 self.args.node, | 835 self.args.service, |
920 self.args.item, | 836 self.args.node, |
921 self.args.notify, | 837 self.args.item, |
922 self.profile, | 838 self.args.notify, |
923 callback=self.psItemsDeleteCb, | 839 self.profile, |
924 errback=partial( | 840 ) |
925 self.errback, | 841 except Exception as e: |
926 msg=_("can't delete item: {}"), | 842 self.disp(_(f"can't delete item: {e}"), error=True) |
927 exit_code=C.EXIT_BRIDGE_ERRBACK, | 843 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
928 ), | 844 else: |
929 ) | 845 self.disp(_(f"item {self.args.item} has been deleted")) |
846 self.host.quit(C.EXIT_OK) | |
930 | 847 |
931 | 848 |
932 class Edit(base.CommandBase, common.BaseEdit): | 849 class Edit(base.CommandBase, common.BaseEdit): |
933 def __init__(self, host): | 850 def __init__(self, host): |
934 base.CommandBase.__init__( | 851 base.CommandBase.__init__( |
944 common.BaseEdit.__init__(self, self.host, PUBSUB_TMP_DIR) | 861 common.BaseEdit.__init__(self, self.host, PUBSUB_TMP_DIR) |
945 | 862 |
946 def add_parser_options(self): | 863 def add_parser_options(self): |
947 pass | 864 pass |
948 | 865 |
949 def edit(self, content_file_path, content_file_obj): | 866 async def publish(self, content): |
950 # we launch editor | 867 published_id = await self.host.bridge.psItemSend( |
951 self.runEditor("pubsub_editor_args", content_file_path, content_file_obj) | |
952 | |
953 def publish(self, content): | |
954 published_id = self.host.bridge.psItemSend( | |
955 self.pubsub_service, | 868 self.pubsub_service, |
956 self.pubsub_node, | 869 self.pubsub_node, |
957 content, | 870 content, |
958 self.pubsub_item or "", | 871 self.pubsub_item or "", |
959 {}, | 872 {}, |
962 if published_id: | 875 if published_id: |
963 self.disp("Item published at {pub_id}".format(pub_id=published_id)) | 876 self.disp("Item published at {pub_id}".format(pub_id=published_id)) |
964 else: | 877 else: |
965 self.disp("Item published") | 878 self.disp("Item published") |
966 | 879 |
967 def getItemData(self, service, node, item): | 880 async def getItemData(self, service, node, item): |
968 try: | 881 try: |
969 from lxml import etree | 882 from lxml import etree |
970 except ImportError: | 883 except ImportError: |
971 self.disp('lxml module must be installed to use edit, please install it ' | 884 self.disp('lxml module must be installed to use edit, please install it ' |
972 'with "pip install lxml"', | 885 'with "pip install lxml"', |
973 error=True, | 886 error=True, |
974 ) | 887 ) |
975 self.host.quit(1) | 888 self.host.quit(1) |
976 items = [item] if item else [] | 889 items = [item] if item else [] |
977 item_raw = self.host.bridge.psItemsGet( | 890 item_raw = (await self.host.bridge.psItemsGet( |
978 service, node, 1, items, "", {}, self.profile | 891 service, node, 1, items, "", {}, self.profile |
979 )[0][0] | 892 ))[0][0] |
980 parser = etree.XMLParser(remove_blank_text=True) | 893 parser = etree.XMLParser(remove_blank_text=True, recover=True) |
981 item_elt = etree.fromstring(item_raw, parser) | 894 item_elt = etree.fromstring(item_raw, parser) |
982 item_id = item_elt.get("id") | 895 item_id = item_elt.get("id") |
983 try: | 896 try: |
984 payload = item_elt[0] | 897 payload = item_elt[0] |
985 except IndexError: | 898 except IndexError: |
986 self.disp(_("Item has not payload"), 1) | 899 self.disp(_("Item has not payload"), 1) |
987 return "" | 900 return "" |
988 return etree.tostring(payload, encoding="unicode", pretty_print=True), item_id | 901 return etree.tostring(payload, encoding="unicode", pretty_print=True), item_id |
989 | 902 |
990 def start(self): | 903 async def start(self): |
991 self.pubsub_service, self.pubsub_node, self.pubsub_item, content_file_path, content_file_obj = ( | 904 (self.pubsub_service, |
992 self.getItemPath() | 905 self.pubsub_node, |
993 ) | 906 self.pubsub_item, |
994 self.edit(content_file_path, content_file_obj) | 907 content_file_path, |
908 content_file_obj) = await self.getItemPath() | |
909 await self.runEditor("pubsub_editor_args", content_file_path, content_file_obj) | |
910 self.host.quit() | |
995 | 911 |
996 | 912 |
997 class Subscribe(base.CommandBase): | 913 class Subscribe(base.CommandBase): |
998 def __init__(self, host): | 914 def __init__(self, host): |
999 base.CommandBase.__init__( | 915 base.CommandBase.__init__( |
1003 use_pubsub=True, | 919 use_pubsub=True, |
1004 pubsub_flags={C.NODE}, | 920 pubsub_flags={C.NODE}, |
1005 use_verbose=True, | 921 use_verbose=True, |
1006 help=_("subscribe to a node"), | 922 help=_("subscribe to a node"), |
1007 ) | 923 ) |
1008 self.need_loop = True | |
1009 | 924 |
1010 def add_parser_options(self): | 925 def add_parser_options(self): |
1011 pass | 926 pass |
1012 | 927 |
1013 def psSubscribeCb(self, sub_id): | 928 async def start(self): |
1014 self.disp(_("subscription done"), 1) | 929 try: |
1015 if sub_id: | 930 sub_id = await self.host.bridge.psSubscribe( |
1016 self.disp(_("subscription id: {sub_id}").format(sub_id=sub_id)) | 931 self.args.service, |
1017 self.host.quit() | 932 self.args.node, |
1018 | 933 {}, |
1019 def start(self): | 934 self.profile, |
1020 self.host.bridge.psSubscribe( | 935 ) |
1021 self.args.service, | 936 except Exception as e: |
1022 self.args.node, | 937 self.disp(_(f"can't subscribe to node: {e}"), error=True) |
1023 {}, | 938 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
1024 self.profile, | 939 else: |
1025 callback=self.psSubscribeCb, | 940 self.disp(_("subscription done"), 1) |
1026 errback=partial( | 941 if sub_id: |
1027 self.errback, | 942 self.disp(_("subscription id: {sub_id}").format(sub_id=sub_id)) |
1028 msg=_("can't subscribe to node: {}"), | 943 self.host.quit() |
1029 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
1030 ), | |
1031 ) | |
1032 | 944 |
1033 | 945 |
1034 class Unsubscribe(base.CommandBase): | 946 class Unsubscribe(base.CommandBase): |
1035 # TODO: voir pourquoi NodeNotFound sur subscribe juste après unsubscribe | 947 # FIXME: check why we get a a NodeNotFound on subscribe just after unsubscribe |
1036 | 948 |
1037 def __init__(self, host): | 949 def __init__(self, host): |
1038 base.CommandBase.__init__( | 950 base.CommandBase.__init__( |
1039 self, | 951 self, |
1040 host, | 952 host, |
1042 use_pubsub=True, | 954 use_pubsub=True, |
1043 pubsub_flags={C.NODE}, | 955 pubsub_flags={C.NODE}, |
1044 use_verbose=True, | 956 use_verbose=True, |
1045 help=_("unsubscribe from a node"), | 957 help=_("unsubscribe from a node"), |
1046 ) | 958 ) |
1047 self.need_loop = True | |
1048 | 959 |
1049 def add_parser_options(self): | 960 def add_parser_options(self): |
1050 pass | 961 pass |
1051 | 962 |
1052 def psUnsubscribeCb(self): | 963 async def start(self): |
1053 self.disp(_("subscription removed"), 1) | 964 try: |
1054 self.host.quit() | 965 await self.host.bridge.psUnsubscribe( |
1055 | 966 self.args.service, |
1056 def start(self): | 967 self.args.node, |
1057 self.host.bridge.psUnsubscribe( | 968 self.profile, |
1058 self.args.service, | 969 ) |
1059 self.args.node, | 970 except Exception as e: |
1060 self.profile, | 971 self.disp(_(f"can't unsubscribe from node: {e}"), error=True) |
1061 callback=self.psUnsubscribeCb, | 972 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
1062 errback=partial( | 973 else: |
1063 self.errback, | 974 self.disp(_("subscription removed"), 1) |
1064 msg=_("can't unsubscribe from node: {}"), | 975 self.host.quit() |
1065 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
1066 ), | |
1067 ) | |
1068 | 976 |
1069 | 977 |
1070 class Subscriptions(base.CommandBase): | 978 class Subscriptions(base.CommandBase): |
1071 def __init__(self, host): | 979 def __init__(self, host): |
1072 base.CommandBase.__init__( | 980 base.CommandBase.__init__( |
1075 "subscriptions", | 983 "subscriptions", |
1076 use_output=C.OUTPUT_LIST_DICT, | 984 use_output=C.OUTPUT_LIST_DICT, |
1077 use_pubsub=True, | 985 use_pubsub=True, |
1078 help=_("retrieve all subscriptions on a service"), | 986 help=_("retrieve all subscriptions on a service"), |
1079 ) | 987 ) |
1080 self.need_loop = True | |
1081 | 988 |
1082 def add_parser_options(self): | 989 def add_parser_options(self): |
1083 pass | 990 pass |
1084 | 991 |
1085 def psSubscriptionsGetCb(self, subscriptions): | 992 async def start(self): |
1086 self.output(subscriptions) | 993 try: |
1087 self.host.quit() | 994 subscriptions = await self.host.bridge.psSubscriptionsGet( |
1088 | 995 self.args.service, |
1089 def start(self): | 996 self.args.node, |
1090 self.host.bridge.psSubscriptionsGet( | 997 self.profile, |
1091 self.args.service, | 998 ) |
1092 self.args.node, | 999 except Exception as e: |
1093 self.profile, | 1000 self.disp(_(f"can't retrieve subscriptions: {e}"), error=True) |
1094 callback=self.psSubscriptionsGetCb, | 1001 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
1095 errback=partial( | 1002 else: |
1096 self.errback, | 1003 await self.output(subscriptions) |
1097 msg=_("can't retrieve subscriptions: {}"), | 1004 self.host.quit() |
1098 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
1099 ), | |
1100 ) | |
1101 | 1005 |
1102 | 1006 |
1103 class Affiliations(base.CommandBase): | 1007 class Affiliations(base.CommandBase): |
1104 def __init__(self, host): | 1008 def __init__(self, host): |
1105 base.CommandBase.__init__( | 1009 base.CommandBase.__init__( |
1108 "affiliations", | 1012 "affiliations", |
1109 use_output=C.OUTPUT_DICT, | 1013 use_output=C.OUTPUT_DICT, |
1110 use_pubsub=True, | 1014 use_pubsub=True, |
1111 help=_("retrieve all affiliations on a service"), | 1015 help=_("retrieve all affiliations on a service"), |
1112 ) | 1016 ) |
1113 self.need_loop = True | |
1114 | 1017 |
1115 def add_parser_options(self): | 1018 def add_parser_options(self): |
1116 pass | 1019 pass |
1117 | 1020 |
1118 def psAffiliationsGetCb(self, affiliations): | 1021 async def start(self): |
1119 self.output(affiliations) | 1022 try: |
1120 self.host.quit() | 1023 affiliations = await self.host.bridge.psAffiliationsGet( |
1121 | 1024 self.args.service, |
1122 def psAffiliationsGetEb(self, failure_): | 1025 self.args.node, |
1123 self.disp( | 1026 self.profile, |
1124 "can't get node affiliations: {reason}".format(reason=failure_), error=True | 1027 ) |
1125 ) | 1028 except Exception as e: |
1126 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | 1029 self.disp( |
1127 | 1030 f"can't get node affiliations: {e}", error=True |
1128 def start(self): | 1031 ) |
1129 self.host.bridge.psAffiliationsGet( | 1032 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
1130 self.args.service, | 1033 else: |
1131 self.args.node, | 1034 await self.output(affiliations) |
1132 self.profile, | 1035 self.host.quit() |
1133 callback=self.psAffiliationsGetCb, | |
1134 errback=self.psAffiliationsGetEb, | |
1135 ) | |
1136 | 1036 |
1137 | 1037 |
1138 class Search(base.CommandBase): | 1038 class Search(base.CommandBase): |
1139 """this command do a search without using MAM | 1039 """This command do a search without using MAM |
1140 | 1040 |
1141 This commands checks every items it finds by itself, | 1041 This commands checks every items it finds by itself, |
1142 so it may be heavy in resources both for server and client | 1042 so it may be heavy in resources both for server and client |
1143 """ | 1043 """ |
1144 | 1044 |
1157 use_pubsub=True, | 1057 use_pubsub=True, |
1158 pubsub_flags={C.MULTI_ITEMS, C.NO_MAX}, | 1058 pubsub_flags={C.MULTI_ITEMS, C.NO_MAX}, |
1159 use_verbose=True, | 1059 use_verbose=True, |
1160 help=_("search items corresponding to filters"), | 1060 help=_("search items corresponding to filters"), |
1161 ) | 1061 ) |
1162 self.need_loop = True | |
1163 | 1062 |
1164 @property | 1063 @property |
1165 def etree(self): | 1064 def etree(self): |
1166 """load lxml.etree only if needed""" | 1065 """load lxml.etree only if needed""" |
1167 if self._etree is None: | 1066 if self._etree is None: |
1315 choices=("print", "exec", "external"), | 1214 choices=("print", "exec", "external"), |
1316 help=_("action to do on found items (DEFAULT: print)"), | 1215 help=_("action to do on found items (DEFAULT: print)"), |
1317 ) | 1216 ) |
1318 self.parser.add_argument("command", nargs=argparse.REMAINDER) | 1217 self.parser.add_argument("command", nargs=argparse.REMAINDER) |
1319 | 1218 |
1320 def psItemsGetEb(self, failure_, service, node): | 1219 async def getItems(self, depth, service, node, items): |
1321 self.disp( | |
1322 "can't get pubsub items at {service} (node: {node}): {reason}".format( | |
1323 service=service, node=node, reason=failure_ | |
1324 ), | |
1325 error=True, | |
1326 ) | |
1327 self.to_get -= 1 | |
1328 | |
1329 def getItems(self, depth, service, node, items): | |
1330 search = partial(self.search, depth=depth) | |
1331 errback = partial(self.psItemsGetEb, service=service, node=node) | |
1332 self.host.bridge.psItemsGet( | |
1333 service, | |
1334 node, | |
1335 self.args.node_max, | |
1336 items, | |
1337 "", | |
1338 self.getPubsubExtra(), | |
1339 self.profile, | |
1340 callback=search, | |
1341 errback=errback, | |
1342 ) | |
1343 self.to_get += 1 | 1220 self.to_get += 1 |
1221 try: | |
1222 items_data = await self.host.bridge.psItemsGet( | |
1223 service, | |
1224 node, | |
1225 self.args.node_max, | |
1226 items, | |
1227 "", | |
1228 self.getPubsubExtra(), | |
1229 self.profile, | |
1230 ) | |
1231 except Exception as e: | |
1232 self.disp( | |
1233 f"can't get pubsub items at {service} (node: {node}): {e}", | |
1234 error=True, | |
1235 ) | |
1236 self.to_get -= 1 | |
1237 else: | |
1238 await self.search(items_data, depth) | |
1344 | 1239 |
1345 def _checkPubsubURL(self, match, found_nodes): | 1240 def _checkPubsubURL(self, match, found_nodes): |
1346 """check that the matched URL is an xmpp: one | 1241 """check that the matched URL is an xmpp: one |
1347 | 1242 |
1348 @param found_nodes(list[unicode]): found_nodes | 1243 @param found_nodes(list[unicode]): found_nodes |
1358 found_node = {"service": url_data["path"], "node": url_data["node"]} | 1253 found_node = {"service": url_data["path"], "node": url_data["node"]} |
1359 if "item" in url_data: | 1254 if "item" in url_data: |
1360 found_node["item"] = url_data["item"] | 1255 found_node["item"] = url_data["item"] |
1361 found_nodes.append(found_node) | 1256 found_nodes.append(found_node) |
1362 | 1257 |
1363 def getSubNodes(self, item, depth): | 1258 async def getSubNodes(self, item, depth): |
1364 """look for pubsub URIs in item, and getItems on the linked nodes""" | 1259 """look for pubsub URIs in item, and getItems on the linked nodes""" |
1365 found_nodes = [] | 1260 found_nodes = [] |
1366 checkURI = partial(self._checkPubsubURL, found_nodes=found_nodes) | 1261 checkURI = partial(self._checkPubsubURL, found_nodes=found_nodes) |
1367 strings.RE_URL.sub(checkURI, item) | 1262 strings.RE_URL.sub(checkURI, item) |
1368 for data in found_nodes: | 1263 for data in found_nodes: |
1369 self.getItems( | 1264 await self.getItems( |
1370 depth + 1, | 1265 depth + 1, |
1371 data["service"], | 1266 data["service"], |
1372 data["node"], | 1267 data["node"], |
1373 [data["item"]] if "item" in data else [], | 1268 [data["item"]] if "item" in data else [], |
1374 ) | 1269 ) |
1450 item = str(item_xml) | 1345 item = str(item_xml) |
1451 item_xml = None | 1346 item_xml = None |
1452 elif type_ == "python": | 1347 elif type_ == "python": |
1453 if item_xml is None: | 1348 if item_xml is None: |
1454 item_xml = self.parseXml(item) | 1349 item_xml = self.parseXml(item) |
1455 cmd_ns = {"item": item, "item_xml": item_xml} | 1350 cmd_ns = { |
1351 "etree": self.etree, | |
1352 "item": item, | |
1353 "item_xml": item_xml | |
1354 } | |
1456 try: | 1355 try: |
1457 keep = eval(value, cmd_ns) | 1356 keep = eval(value, cmd_ns) |
1458 except SyntaxError as e: | 1357 except SyntaxError as e: |
1459 self.disp(str(e), error=True) | 1358 self.disp(str(e), error=True) |
1460 self.host.quit(C.EXIT_BAD_ARG) | 1359 self.host.quit(C.EXIT_BAD_ARG) |
1481 if not keep: | 1380 if not keep: |
1482 return False, item | 1381 return False, item |
1483 | 1382 |
1484 return True, item | 1383 return True, item |
1485 | 1384 |
1486 def doItemAction(self, item, metadata): | 1385 async def doItemAction(self, item, metadata): |
1487 """called when item has been kepts and the action need to be done | 1386 """called when item has been kepts and the action need to be done |
1488 | 1387 |
1489 @param item(unicode): accepted item | 1388 @param item(unicode): accepted item |
1490 """ | 1389 """ |
1491 action = self.args.action | 1390 action = self.args.action |
1492 if action == "print" or self.host.verbosity > 0: | 1391 if action == "print" or self.host.verbosity > 0: |
1493 try: | 1392 try: |
1494 self.output(item) | 1393 await self.output(item) |
1495 except self.etree.XMLSyntaxError: | 1394 except self.etree.XMLSyntaxError: |
1496 # item is not valid XML, but a string | 1395 # item is not valid XML, but a string |
1497 # can happen when --only-matching is used | 1396 # can happen when --only-matching is used |
1498 self.disp(item) | 1397 self.disp(item) |
1499 if action in self.EXEC_ACTIONS: | 1398 if action in self.EXEC_ACTIONS: |
1519 command=" ".join([arg_tools.escape(a) for a in cmd_args]) | 1418 command=" ".join([arg_tools.escape(a) for a in cmd_args]) |
1520 ), | 1419 ), |
1521 2, | 1420 2, |
1522 ) | 1421 ) |
1523 if action == "exec": | 1422 if action == "exec": |
1524 ret = subprocess.call(cmd_args) | 1423 p = await asyncio.create_subprocess_exec(*cmd_args) |
1424 ret = await p.wait() | |
1525 else: | 1425 else: |
1526 p = subprocess.Popen(cmd_args, stdin=subprocess.PIPE) | 1426 p = await asyncio.create_subprocess_exec(*cmd_args, |
1527 p.communicate(item.encode("utf-8")) | 1427 stdin=subprocess.PIPE) |
1528 ret = p.wait() | 1428 await p.communicate(item.encode(sys.getfilesystemencoding())) |
1429 ret = p.returncode | |
1529 if ret != 0: | 1430 if ret != 0: |
1530 self.disp( | 1431 self.disp( |
1531 A.color( | 1432 A.color( |
1532 C.A_FAILURE, | 1433 C.A_FAILURE, |
1533 _("executed command failed with exit code {code}").format( | 1434 _(f"executed command failed with exit code {ret}"), |
1534 code=ret | |
1535 ), | |
1536 ) | 1435 ) |
1537 ) | 1436 ) |
1538 | 1437 |
1539 def search(self, items_data, depth): | 1438 async def search(self, items_data, depth): |
1540 """callback of getItems | 1439 """callback of getItems |
1541 | 1440 |
1542 this method filters items, get sub nodes if needed, | 1441 this method filters items, get sub nodes if needed, |
1543 do the requested action, and exit the command when everything is done | 1442 do the requested action, and exit the command when everything is done |
1544 @param items_data(tuple): result of getItems | 1443 @param items_data(tuple): result of getItems |
1546 0 for first node, 1 for first children, and so on | 1445 0 for first node, 1 for first children, and so on |
1547 """ | 1446 """ |
1548 items, metadata = items_data | 1447 items, metadata = items_data |
1549 for item in items: | 1448 for item in items: |
1550 if depth < self.args.max_depth: | 1449 if depth < self.args.max_depth: |
1551 self.getSubNodes(item, depth) | 1450 await self.getSubNodes(item, depth) |
1552 keep, item = self.filter(item) | 1451 keep, item = self.filter(item) |
1553 if not keep: | 1452 if not keep: |
1554 continue | 1453 continue |
1555 self.doItemAction(item, metadata) | 1454 await self.doItemAction(item, metadata) |
1556 | 1455 |
1557 # we check if we got all getItems results | 1456 # we check if we got all getItems results |
1558 self.to_get -= 1 | 1457 self.to_get -= 1 |
1559 if self.to_get == 0: | 1458 if self.to_get == 0: |
1560 # yes, we can quit | 1459 # yes, we can quit |
1561 self.host.quit() | 1460 self.host.quit() |
1562 assert self.to_get > 0 | 1461 assert self.to_get > 0 |
1563 | 1462 |
1564 def start(self): | 1463 async def start(self): |
1565 if self.args.command: | 1464 if self.args.command: |
1566 if self.args.action not in self.EXEC_ACTIONS: | 1465 if self.args.action not in self.EXEC_ACTIONS: |
1567 self.parser.error( | 1466 self.parser.error( |
1568 _("Command can only be used with {actions} actions").format( | 1467 _("Command can only be used with {actions} actions").format( |
1569 actions=", ".join(self.EXEC_ACTIONS) | 1468 actions=", ".join(self.EXEC_ACTIONS) |
1582 if self.args.filters is None: | 1481 if self.args.filters is None: |
1583 self.args.filters = [] | 1482 self.args.filters = [] |
1584 self.args.namespace = dict( | 1483 self.args.namespace = dict( |
1585 self.args.namespace + [("pubsub", "http://jabber.org/protocol/pubsub")] | 1484 self.args.namespace + [("pubsub", "http://jabber.org/protocol/pubsub")] |
1586 ) | 1485 ) |
1587 self.getItems(0, self.args.service, self.args.node, self.args.items) | 1486 await self.getItems(0, self.args.service, self.args.node, self.args.items) |
1588 | 1487 |
1589 | 1488 |
1590 class Transform(base.CommandBase): | 1489 class Transform(base.CommandBase): |
1591 def __init__(self, host): | 1490 def __init__(self, host): |
1592 base.CommandBase.__init__( | 1491 base.CommandBase.__init__( |
1595 "transform", | 1494 "transform", |
1596 use_pubsub=True, | 1495 use_pubsub=True, |
1597 pubsub_flags={C.NODE, C.MULTI_ITEMS}, | 1496 pubsub_flags={C.NODE, C.MULTI_ITEMS}, |
1598 help=_("modify items of a node using an external command/script"), | 1497 help=_("modify items of a node using an external command/script"), |
1599 ) | 1498 ) |
1600 self.need_loop = True | |
1601 | 1499 |
1602 def add_parser_options(self): | 1500 def add_parser_options(self): |
1603 self.parser.add_argument( | 1501 self.parser.add_argument( |
1604 "--apply", | 1502 "--apply", |
1605 action="store_true", | 1503 action="store_true", |
1628 help=_("path to the command to use. Will be called repetitivly with an " | 1526 help=_("path to the command to use. Will be called repetitivly with an " |
1629 "item as input. Output (full item XML) will be used as new one. " | 1527 "item as input. Output (full item XML) will be used as new one. " |
1630 'Return "DELETE" string to delete the item, and "SKIP" to ignore it'), | 1528 'Return "DELETE" string to delete the item, and "SKIP" to ignore it'), |
1631 ) | 1529 ) |
1632 | 1530 |
1633 def psItemsSendCb(self, item_ids, metadata): | 1531 async def psItemsSendCb(self, item_ids, metadata): |
1634 if item_ids: | 1532 if item_ids: |
1635 self.disp(_('items published with ids {item_ids}').format( | 1533 self.disp(_('items published with ids {item_ids}').format( |
1636 item_ids=', '.join(item_ids))) | 1534 item_ids=', '.join(item_ids))) |
1637 else: | 1535 else: |
1638 self.disp(_('items published')) | 1536 self.disp(_('items published')) |
1639 if self.args.all: | 1537 if self.args.all: |
1640 return self.handleNextPage(metadata) | 1538 return await self.handleNextPage(metadata) |
1641 else: | 1539 else: |
1642 self.host.quit() | 1540 self.host.quit() |
1643 | 1541 |
1644 def handleNextPage(self, metadata): | 1542 async def handleNextPage(self, metadata): |
1645 """Retrieve new page through RSM or quit if we're in the last page | 1543 """Retrieve new page through RSM or quit if we're in the last page |
1646 | 1544 |
1647 use to handle --all option | 1545 use to handle --all option |
1648 @param metadata(dict): metadata as returned by psItemsGet | 1546 @param metadata(dict): metadata as returned by psItemsGet |
1649 """ | 1547 """ |
1670 ) | 1568 ) |
1671 ) | 1569 ) |
1672 | 1570 |
1673 extra = self.getPubsubExtra() | 1571 extra = self.getPubsubExtra() |
1674 extra['rsm_after'] = last | 1572 extra['rsm_after'] = last |
1675 self.host.bridge.psItemsGet( | 1573 try: |
1676 self.args.service, | 1574 ps_result = await self.host.bridge.psItemsGet( |
1677 self.args.node, | 1575 self.args.service, |
1678 self.args.rsm_max, | 1576 self.args.node, |
1679 self.args.items, | 1577 self.args.rsm_max, |
1680 "", | 1578 self.args.items, |
1681 extra, | 1579 "", |
1682 self.profile, | 1580 extra, |
1683 callback=self.psItemsGetCb, | 1581 self.profile, |
1684 errback=partial( | 1582 ) |
1685 self.errback, | 1583 except Exception as e: |
1686 msg=_("can't retrieve items: {}"), | 1584 self.disp( |
1687 exit_code=C.EXIT_BRIDGE_ERRBACK, | 1585 f"can't retrieve items: {e}", error=True |
1688 ), | 1586 ) |
1689 ) | 1587 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
1690 | 1588 else: |
1691 def psItemsGetCb(self, ps_result): | 1589 await self.psItemsGetCb(ps_result) |
1590 | |
1591 async def psItemsGetCb(self, ps_result): | |
1692 items, metadata = ps_result | 1592 items, metadata = ps_result |
1593 encoding = 'utf-8' | |
1693 new_items = [] | 1594 new_items = [] |
1694 | 1595 |
1695 for item in items: | 1596 for item in items: |
1696 if self.check_duplicates: | 1597 if self.check_duplicates: |
1697 # this is used when we are not ordering by creation | 1598 # this is used when we are not ordering by creation |
1705 self.host.quit() | 1606 self.host.quit() |
1706 self.items_ids.append(item_id) | 1607 self.items_ids.append(item_id) |
1707 | 1608 |
1708 # we launch the command to filter the item | 1609 # we launch the command to filter the item |
1709 try: | 1610 try: |
1710 p = subprocess.Popen(self.args.command_path, stdin=subprocess.PIPE, | 1611 p = await asyncio.create_subprocess_exec( |
1711 stdout=subprocess.PIPE) | 1612 self.args.command_path, |
1613 stdin=subprocess.PIPE, | |
1614 stdout=subprocess.PIPE) | |
1712 except OSError as e: | 1615 except OSError as e: |
1713 exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR | 1616 exit_code = C.EXIT_CMD_NOT_FOUND if e.errno == 2 else C.EXIT_ERROR |
1714 e = str(e).decode('utf-8', errors="ignore") | 1617 self.disp(f"Can't execute the command: {e}", error=True) |
1715 self.disp("Can't execute the command: {msg}".format(msg=e), error=True) | |
1716 self.host.quit(exit_code) | 1618 self.host.quit(exit_code) |
1717 cmd_std_out, cmd_std_err = p.communicate(item.encode("utf-8")) | 1619 encoding = "utf-8" |
1718 ret = p.wait() | 1620 cmd_std_out, cmd_std_err = await p.communicate(item.encode(encoding)) |
1621 ret = p.returncode | |
1719 if ret != 0: | 1622 if ret != 0: |
1720 self.disp("The command returned a non zero status while parsing the " | 1623 self.disp(f"The command returned a non zero status while parsing the " |
1721 "following item:\n\n{item}".format(item=item), error=True) | 1624 f"following item:\n\n{item}", error=True) |
1722 if self.args.ignore_errors: | 1625 if self.args.ignore_errors: |
1723 continue | 1626 continue |
1724 else: | 1627 else: |
1725 self.host.quit(C.EXIT_CMD_ERROR) | 1628 self.host.quit(C.EXIT_CMD_ERROR) |
1726 if cmd_std_err is not None: | 1629 if cmd_std_err is not None: |
1727 cmd_std_err = cmd_std_err.decode('utf-8', errors='ignore') | 1630 cmd_std_err = cmd_std_err.decode(encoding, errors='ignore') |
1728 self.disp(cmd_std_err, error=True) | 1631 self.disp(cmd_std_err, error=True) |
1729 cmd_std_out = cmd_std_out.strip() | 1632 cmd_std_out = cmd_std_out.decode(encoding).strip() |
1730 if cmd_std_out == "DELETE": | 1633 if cmd_std_out == "DELETE": |
1731 item_elt, __ = xml_tools.etreeParse(self, item) | 1634 item_elt, __ = xml_tools.etreeParse(self, item) |
1732 item_id = item_elt.get('id') | 1635 item_id = item_elt.get('id') |
1733 self.disp(_("Deleting item {item_id}").format(item_id=item_id)) | 1636 self.disp(_(f"Deleting item {item_id}")) |
1734 if self.args.apply: | 1637 if self.args.apply: |
1735 # FIXME: we don't wait for item to be retracted which can cause | 1638 try: |
1736 # trouble in case of error just before the end of the command | 1639 await self.host.bridge.psRetractItem( |
1737 # (the error message may be missed). | 1640 self.args.service, |
1738 # Once moved to Python 3, we must wait for it by using a | 1641 self.args.node, |
1739 # coroutine. | 1642 item_id, |
1740 self.host.bridge.psRetractItem( | 1643 False, |
1741 self.args.service, | 1644 self.profile, |
1742 self.args.node, | 1645 ) |
1743 item_id, | 1646 except Exception as e: |
1744 False, | 1647 self.disp( |
1745 self.profile, | 1648 f"can't delete item {item_id}: {e}", error=True |
1746 errback=partial( | 1649 ) |
1747 self.errback, | 1650 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
1748 msg=_("can't delete item [%s]: {}" % item_id), | |
1749 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
1750 ), | |
1751 ) | |
1752 continue | 1651 continue |
1753 elif cmd_std_out == "SKIP": | 1652 elif cmd_std_out == "SKIP": |
1754 item_elt, __ = xml_tools.etreeParse(self, item) | 1653 item_elt, __ = xml_tools.etreeParse(self, item) |
1755 item_id = item_elt.get('id') | 1654 item_id = item_elt.get('id') |
1756 self.disp(_("Skipping item {item_id}").format(item_id=item_id)) | 1655 self.disp(_("Skipping item {item_id}").format(item_id=item_id)) |
1772 new_items.append(etree.tostring(element, encoding="unicode")) | 1671 new_items.append(etree.tostring(element, encoding="unicode")) |
1773 | 1672 |
1774 if not self.args.apply: | 1673 if not self.args.apply: |
1775 # on dry run we have nothing to wait for, we can quit | 1674 # on dry run we have nothing to wait for, we can quit |
1776 if self.args.all: | 1675 if self.args.all: |
1777 return self.handleNextPage(metadata) | 1676 return await self.handleNextPage(metadata) |
1778 self.host.quit() | 1677 self.host.quit() |
1779 else: | 1678 else: |
1780 if self.args.admin: | 1679 if self.args.admin: |
1781 self.host.bridge.psAdminItemsSend( | 1680 bridge_method = self.host.bridge.psAdminItemsSend |
1681 else: | |
1682 bridge_method = self.host.bridge.psItemsSend | |
1683 | |
1684 try: | |
1685 ps_result = await bridge_method( | |
1782 self.args.service, | 1686 self.args.service, |
1783 self.args.node, | 1687 self.args.node, |
1784 new_items, | 1688 new_items, |
1785 "", | 1689 "", |
1786 self.profile, | 1690 self.profile, |
1787 callback=partial(self.psItemsSendCb, metadata=metadata), | |
1788 errback=partial( | |
1789 self.errback, | |
1790 msg=_("can't send item: {}"), | |
1791 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
1792 ), | |
1793 ) | 1691 ) |
1692 except Exception as e: | |
1693 self.disp(f"can't send item: {e}", error=True) | |
1694 self.host.quit(C.EXIT_BRIDGE_ERRBACK) | |
1794 else: | 1695 else: |
1795 self.host.bridge.psItemsSend( | 1696 await self.psItemsSendCb(ps_result, metadata=metadata) |
1796 self.args.service, | 1697 |
1797 self.args.node, | 1698 async def start(self): |
1798 new_items, | |
1799 "", | |
1800 self.profile, | |
1801 callback=partial(self.psItemsSendCb, metadata=metadata), | |
1802 errback=partial( | |
1803 self.errback, | |
1804 msg=_("can't send item: {}"), | |
1805 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
1806 ), | |
1807 ) | |
1808 | |
1809 def start(self): | |
1810 if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: | 1699 if self.args.all and self.args.order_by != C.ORDER_BY_CREATION: |
1811 self.check_duplicates = True | 1700 self.check_duplicates = True |
1812 self.items_ids = [] | 1701 self.items_ids = [] |
1813 self.disp(A.color( | 1702 self.disp(A.color( |
1814 A.FG_RED, A.BOLD, | 1703 A.FG_RED, A.BOLD, |
1817 "We'll update items, so order may change during transformation,\n" | 1706 "We'll update items, so order may change during transformation,\n" |
1818 "we'll try to mitigate that by stopping on first duplicate,\n" | 1707 "we'll try to mitigate that by stopping on first duplicate,\n" |
1819 "but this method is not safe, and some items may be missed.\n---\n")) | 1708 "but this method is not safe, and some items may be missed.\n---\n")) |
1820 else: | 1709 else: |
1821 self.check_duplicates = False | 1710 self.check_duplicates = False |
1822 self.host.bridge.psItemsGet( | 1711 |
1823 self.args.service, | 1712 try: |
1824 self.args.node, | 1713 ps_result = await self.host.bridge.psItemsGet( |
1825 self.args.max, | 1714 self.args.service, |
1826 self.args.items, | 1715 self.args.node, |
1827 "", | 1716 self.args.max, |
1828 self.getPubsubExtra(), | 1717 self.args.items, |
1829 self.profile, | 1718 "", |
1830 callback=self.psItemsGetCb, | 1719 self.getPubsubExtra(), |
1831 errback=partial( | 1720 self.profile, |
1832 self.errback, | 1721 ) |
1833 msg=_("can't retrieve items: {}"), | 1722 except Exception as e: |
1834 exit_code=C.EXIT_BRIDGE_ERRBACK, | 1723 self.disp(f"can't retrieve items: {e}", error=True) |
1835 ), | 1724 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
1836 ) | 1725 else: |
1726 await self.psItemsGetCb(ps_result) | |
1837 | 1727 |
1838 | 1728 |
1839 class Uri(base.CommandBase): | 1729 class Uri(base.CommandBase): |
1840 def __init__(self, host): | 1730 def __init__(self, host): |
1841 base.CommandBase.__init__( | 1731 base.CommandBase.__init__( |
1845 use_profile=False, | 1735 use_profile=False, |
1846 use_pubsub=True, | 1736 use_pubsub=True, |
1847 pubsub_flags={C.NODE, C.SINGLE_ITEM}, | 1737 pubsub_flags={C.NODE, C.SINGLE_ITEM}, |
1848 help=_("build URI"), | 1738 help=_("build URI"), |
1849 ) | 1739 ) |
1850 self.need_loop = True | |
1851 | 1740 |
1852 def add_parser_options(self): | 1741 def add_parser_options(self): |
1853 self.parser.add_argument( | 1742 self.parser.add_argument( |
1854 "-p", | 1743 "-p", |
1855 "--profile", | 1744 "--profile", |
1869 if value: | 1758 if value: |
1870 uri_args[key] = value | 1759 uri_args[key] = value |
1871 self.disp(uri.buildXMPPUri("pubsub", **uri_args)) | 1760 self.disp(uri.buildXMPPUri("pubsub", **uri_args)) |
1872 self.host.quit() | 1761 self.host.quit() |
1873 | 1762 |
1874 def start(self): | 1763 async def start(self): |
1875 if not self.args.service: | 1764 if not self.args.service: |
1876 self.host.bridge.asyncGetParamA( | 1765 try: |
1877 "JabberID", | 1766 jid_ = await self.host.bridge.asyncGetParamA( |
1878 "Connection", | 1767 "JabberID", |
1879 profile_key=self.args.profile, | 1768 "Connection", |
1880 callback=self.display_uri, | 1769 profile_key=self.args.profile |
1881 errback=partial( | 1770 ) |
1882 self.errback, | 1771 except Exception as e: |
1883 msg=_("can't retrieve jid: {}"), | 1772 self.disp(f"can't retrieve jid: {e}", error=True) |
1884 exit_code=C.EXIT_BRIDGE_ERRBACK, | 1773 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
1885 ), | 1774 else: |
1886 ) | 1775 self.display_uri(jid_) |
1887 else: | 1776 else: |
1888 self.display_uri(None) | 1777 self.display_uri(None) |
1889 | 1778 |
1890 | 1779 |
1891 class HookCreate(base.CommandBase): | 1780 class HookCreate(base.CommandBase): |
1896 "create", | 1785 "create", |
1897 use_pubsub=True, | 1786 use_pubsub=True, |
1898 pubsub_flags={C.NODE}, | 1787 pubsub_flags={C.NODE}, |
1899 help=_("create a Pubsub hook"), | 1788 help=_("create a Pubsub hook"), |
1900 ) | 1789 ) |
1901 self.need_loop = True | |
1902 | 1790 |
1903 def add_parser_options(self): | 1791 def add_parser_options(self): |
1904 self.parser.add_argument( | 1792 self.parser.add_argument( |
1905 "-t", | 1793 "-t", |
1906 "--type", | 1794 "--type", |
1926 if not os.path.isfile(self.args.hook_arg): | 1814 if not os.path.isfile(self.args.hook_arg): |
1927 self.parser.error( | 1815 self.parser.error( |
1928 _("{path} is not a file").format(path=self.args.hook_arg) | 1816 _("{path} is not a file").format(path=self.args.hook_arg) |
1929 ) | 1817 ) |
1930 | 1818 |
1931 def start(self): | 1819 async def start(self): |
1932 self.checkArgs(self) | 1820 self.checkArgs(self) |
1933 self.host.bridge.psHookAdd( | 1821 try: |
1934 self.args.service, | 1822 await self.host.bridge.psHookAdd( |
1935 self.args.node, | 1823 self.args.service, |
1936 self.args.type, | 1824 self.args.node, |
1937 self.args.hook_arg, | 1825 self.args.type, |
1938 self.args.persistent, | 1826 self.args.hook_arg, |
1939 self.profile, | 1827 self.args.persistent, |
1940 callback=self.host.quit, | 1828 self.profile, |
1941 errback=partial( | 1829 ) |
1942 self.errback, | 1830 except Exception as e: |
1943 msg=_("can't create hook: {}"), | 1831 self.disp(f"can't create hook: {e}", error=True) |
1944 exit_code=C.EXIT_BRIDGE_ERRBACK, | 1832 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
1945 ), | 1833 else: |
1946 ) | 1834 self.host.quit() |
1947 | 1835 |
1948 | 1836 |
1949 class HookDelete(base.CommandBase): | 1837 class HookDelete(base.CommandBase): |
1950 def __init__(self, host): | 1838 def __init__(self, host): |
1951 base.CommandBase.__init__( | 1839 base.CommandBase.__init__( |
1954 "delete", | 1842 "delete", |
1955 use_pubsub=True, | 1843 use_pubsub=True, |
1956 pubsub_flags={C.NODE}, | 1844 pubsub_flags={C.NODE}, |
1957 help=_("delete a Pubsub hook"), | 1845 help=_("delete a Pubsub hook"), |
1958 ) | 1846 ) |
1959 self.need_loop = True | |
1960 | 1847 |
1961 def add_parser_options(self): | 1848 def add_parser_options(self): |
1962 self.parser.add_argument( | 1849 self.parser.add_argument( |
1963 "-t", | 1850 "-t", |
1964 "--type", | 1851 "--type", |
1974 help=_( | 1861 help=_( |
1975 "argument of the hook to remove, empty to remove all (DEFAULT: remove all)" | 1862 "argument of the hook to remove, empty to remove all (DEFAULT: remove all)" |
1976 ), | 1863 ), |
1977 ) | 1864 ) |
1978 | 1865 |
1979 def psHookRemoveCb(self, nb_deleted): | 1866 async def start(self): |
1980 self.disp( | |
1981 _("{nb_deleted} hook(s) have been deleted").format(nb_deleted=nb_deleted) | |
1982 ) | |
1983 self.host.quit() | |
1984 | |
1985 def start(self): | |
1986 HookCreate.checkArgs(self) | 1867 HookCreate.checkArgs(self) |
1987 self.host.bridge.psHookRemove( | 1868 try: |
1988 self.args.service, | 1869 nb_deleted = await self.host.bridge.psHookRemove( |
1989 self.args.node, | 1870 self.args.service, |
1990 self.args.type, | 1871 self.args.node, |
1991 self.args.hook_arg, | 1872 self.args.type, |
1992 self.profile, | 1873 self.args.hook_arg, |
1993 callback=self.psHookRemoveCb, | 1874 self.profile, |
1994 errback=partial( | 1875 ) |
1995 self.errback, | 1876 except Exception as e: |
1996 msg=_("can't delete hook: {}"), | 1877 self.disp(f"can't delete hook: {e}", error=True) |
1997 exit_code=C.EXIT_BRIDGE_ERRBACK, | 1878 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
1998 ), | 1879 else: |
1999 ) | 1880 self.disp(_(f"{nb_deleted} hook(s) have been deleted")) |
1881 self.host.quit() | |
2000 | 1882 |
2001 | 1883 |
2002 class HookList(base.CommandBase): | 1884 class HookList(base.CommandBase): |
2003 def __init__(self, host): | 1885 def __init__(self, host): |
2004 base.CommandBase.__init__( | 1886 base.CommandBase.__init__( |
2006 host, | 1888 host, |
2007 "list", | 1889 "list", |
2008 use_output=C.OUTPUT_LIST_DICT, | 1890 use_output=C.OUTPUT_LIST_DICT, |
2009 help=_("list hooks of a profile"), | 1891 help=_("list hooks of a profile"), |
2010 ) | 1892 ) |
2011 self.need_loop = True | |
2012 | 1893 |
2013 def add_parser_options(self): | 1894 def add_parser_options(self): |
2014 pass | 1895 pass |
2015 | 1896 |
2016 def psHookListCb(self, data): | 1897 async def start(self): |
2017 if not data: | 1898 try: |
2018 self.disp(_("No hook found.")) | 1899 data = await self.host.bridge.psHookList( |
2019 self.output(data) | 1900 self.profile, |
2020 self.host.quit() | 1901 ) |
2021 | 1902 except Exception as e: |
2022 def start(self): | 1903 self.disp(f"can't list hooks: {e}", error=True) |
2023 self.host.bridge.psHookList( | 1904 self.host.quit(C.EXIT_BRIDGE_ERRBACK) |
2024 self.profile, | 1905 else: |
2025 callback=self.psHookListCb, | 1906 if not data: |
2026 errback=partial( | 1907 self.disp(_("No hook found.")) |
2027 self.errback, | 1908 await self.output(data) |
2028 msg=_("can't list hooks: {}"), | 1909 self.host.quit() |
2029 exit_code=C.EXIT_BRIDGE_ERRBACK, | |
2030 ), | |
2031 ) | |
2032 | 1910 |
2033 | 1911 |
2034 class Hook(base.CommandBase): | 1912 class Hook(base.CommandBase): |
2035 subcommands = (HookCreate, HookDelete, HookList) | 1913 subcommands = (HookCreate, HookDelete, HookList) |
2036 | 1914 |