comparison sat/plugins/plugin_xep_0060.py @ 4037:524856bd7b19

massive refactoring to switch from camelCase to snake_case: historically, Libervia (SàT before) used camelCase, as allowed by PEP8 for pre-PEP8 code, in order to follow the same coding style as Twisted. However, snake_case is more readable and it's better to follow PEP8 best practices, so it has been decided to move to full snake_case. Because Libervia has a huge codebase, this ended up as an ugly mix of camelCase and snake_case. To fix that, this patch does a big refactoring by renaming every function and method (including bridge ones) that does not come from Twisted or Wokkel, so that they fully use snake_case. This is a massive change, and may result in some bugs.
author Goffi <goffi@goffi.org>
date Sat, 08 Apr 2023 13:54:42 +0200
parents 9456852d3286
children
comparison
equal deleted inserted replaced
4036:c4464d7ae97b 4037:524856bd7b19
109 self.host = host 109 self.host = host
110 self._rsm = host.plugins.get("XEP-0059") 110 self._rsm = host.plugins.get("XEP-0059")
111 self._mam = host.plugins.get("XEP-0313") 111 self._mam = host.plugins.get("XEP-0313")
112 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) 112 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks)
113 self.rt_sessions = sat_defer.RTDeferredSessions() 113 self.rt_sessions = sat_defer.RTDeferredSessions()
114 host.bridge.addMethod( 114 host.bridge.add_method(
115 "psNodeCreate", 115 "ps_node_create",
116 ".plugin", 116 ".plugin",
117 in_sign="ssa{ss}s", 117 in_sign="ssa{ss}s",
118 out_sign="s", 118 out_sign="s",
119 method=self._createNode, 119 method=self._create_node,
120 async_=True, 120 async_=True,
121 ) 121 )
122 host.bridge.addMethod( 122 host.bridge.add_method(
123 "psNodeConfigurationGet", 123 "ps_node_configuration_get",
124 ".plugin", 124 ".plugin",
125 in_sign="sss", 125 in_sign="sss",
126 out_sign="a{ss}", 126 out_sign="a{ss}",
127 method=self._getNodeConfiguration, 127 method=self._get_node_configuration,
128 async_=True, 128 async_=True,
129 ) 129 )
130 host.bridge.addMethod( 130 host.bridge.add_method(
131 "psNodeConfigurationSet", 131 "ps_node_configuration_set",
132 ".plugin", 132 ".plugin",
133 in_sign="ssa{ss}s", 133 in_sign="ssa{ss}s",
134 out_sign="", 134 out_sign="",
135 method=self._setNodeConfiguration, 135 method=self._set_node_configuration,
136 async_=True, 136 async_=True,
137 ) 137 )
138 host.bridge.addMethod( 138 host.bridge.add_method(
139 "psNodeAffiliationsGet", 139 "ps_node_affiliations_get",
140 ".plugin", 140 ".plugin",
141 in_sign="sss", 141 in_sign="sss",
142 out_sign="a{ss}", 142 out_sign="a{ss}",
143 method=self._getNodeAffiliations, 143 method=self._get_node_affiliations,
144 async_=True, 144 async_=True,
145 ) 145 )
146 host.bridge.addMethod( 146 host.bridge.add_method(
147 "psNodeAffiliationsSet", 147 "ps_node_affiliations_set",
148 ".plugin", 148 ".plugin",
149 in_sign="ssa{ss}s", 149 in_sign="ssa{ss}s",
150 out_sign="", 150 out_sign="",
151 method=self._setNodeAffiliations, 151 method=self._set_node_affiliations,
152 async_=True, 152 async_=True,
153 ) 153 )
154 host.bridge.addMethod( 154 host.bridge.add_method(
155 "psNodeSubscriptionsGet", 155 "ps_node_subscriptions_get",
156 ".plugin", 156 ".plugin",
157 in_sign="sss", 157 in_sign="sss",
158 out_sign="a{ss}", 158 out_sign="a{ss}",
159 method=self._getNodeSubscriptions, 159 method=self._get_node_subscriptions,
160 async_=True, 160 async_=True,
161 ) 161 )
162 host.bridge.addMethod( 162 host.bridge.add_method(
163 "psNodeSubscriptionsSet", 163 "ps_node_subscriptions_set",
164 ".plugin", 164 ".plugin",
165 in_sign="ssa{ss}s", 165 in_sign="ssa{ss}s",
166 out_sign="", 166 out_sign="",
167 method=self._setNodeSubscriptions, 167 method=self._set_node_subscriptions,
168 async_=True, 168 async_=True,
169 ) 169 )
170 host.bridge.addMethod( 170 host.bridge.add_method(
171 "psNodePurge", 171 "ps_node_purge",
172 ".plugin", 172 ".plugin",
173 in_sign="sss", 173 in_sign="sss",
174 out_sign="", 174 out_sign="",
175 method=self._purgeNode, 175 method=self._purge_node,
176 async_=True, 176 async_=True,
177 ) 177 )
178 host.bridge.addMethod( 178 host.bridge.add_method(
179 "psNodeDelete", 179 "ps_node_delete",
180 ".plugin", 180 ".plugin",
181 in_sign="sss", 181 in_sign="sss",
182 out_sign="", 182 out_sign="",
183 method=self._deleteNode, 183 method=self._delete_node,
184 async_=True, 184 async_=True,
185 ) 185 )
186 host.bridge.addMethod( 186 host.bridge.add_method(
187 "psNodeWatchAdd", 187 "ps_node_watch_add",
188 ".plugin", 188 ".plugin",
189 in_sign="sss", 189 in_sign="sss",
190 out_sign="", 190 out_sign="",
191 method=self._addWatch, 191 method=self._addWatch,
192 async_=False, 192 async_=False,
193 ) 193 )
194 host.bridge.addMethod( 194 host.bridge.add_method(
195 "psNodeWatchRemove", 195 "ps_node_watch_remove",
196 ".plugin", 196 ".plugin",
197 in_sign="sss", 197 in_sign="sss",
198 out_sign="", 198 out_sign="",
199 method=self._removeWatch, 199 method=self._remove_watch,
200 async_=False, 200 async_=False,
201 ) 201 )
202 host.bridge.addMethod( 202 host.bridge.add_method(
203 "psAffiliationsGet", 203 "ps_affiliations_get",
204 ".plugin", 204 ".plugin",
205 in_sign="sss", 205 in_sign="sss",
206 out_sign="a{ss}", 206 out_sign="a{ss}",
207 method=self._getAffiliations, 207 method=self._get_affiliations,
208 async_=True, 208 async_=True,
209 ) 209 )
210 host.bridge.addMethod( 210 host.bridge.add_method(
211 "psItemsGet", 211 "ps_items_get",
212 ".plugin", 212 ".plugin",
213 in_sign="ssiassss", 213 in_sign="ssiassss",
214 out_sign="s", 214 out_sign="s",
215 method=self._getItems, 215 method=self._get_items,
216 async_=True, 216 async_=True,
217 ) 217 )
218 host.bridge.addMethod( 218 host.bridge.add_method(
219 "psItemSend", 219 "ps_item_send",
220 ".plugin", 220 ".plugin",
221 in_sign="ssssss", 221 in_sign="ssssss",
222 out_sign="s", 222 out_sign="s",
223 method=self._sendItem, 223 method=self._send_item,
224 async_=True, 224 async_=True,
225 ) 225 )
226 host.bridge.addMethod( 226 host.bridge.add_method(
227 "psItemsSend", 227 "ps_items_send",
228 ".plugin", 228 ".plugin",
229 in_sign="ssasss", 229 in_sign="ssasss",
230 out_sign="as", 230 out_sign="as",
231 method=self._sendItems, 231 method=self._send_items,
232 async_=True, 232 async_=True,
233 ) 233 )
234 host.bridge.addMethod( 234 host.bridge.add_method(
235 "psItemRetract", 235 "ps_item_retract",
236 ".plugin", 236 ".plugin",
237 in_sign="sssbs", 237 in_sign="sssbs",
238 out_sign="", 238 out_sign="",
239 method=self._retractItem, 239 method=self._retract_item,
240 async_=True, 240 async_=True,
241 ) 241 )
242 host.bridge.addMethod( 242 host.bridge.add_method(
243 "psItemsRetract", 243 "ps_items_retract",
244 ".plugin", 244 ".plugin",
245 in_sign="ssasbs", 245 in_sign="ssasbs",
246 out_sign="", 246 out_sign="",
247 method=self._retractItems, 247 method=self._retract_items,
248 async_=True, 248 async_=True,
249 ) 249 )
250 host.bridge.addMethod( 250 host.bridge.add_method(
251 "psItemRename", 251 "ps_item_rename",
252 ".plugin", 252 ".plugin",
253 in_sign="sssss", 253 in_sign="sssss",
254 out_sign="", 254 out_sign="",
255 method=self._renameItem, 255 method=self._rename_item,
256 async_=True, 256 async_=True,
257 ) 257 )
258 host.bridge.addMethod( 258 host.bridge.add_method(
259 "psSubscribe", 259 "ps_subscribe",
260 ".plugin", 260 ".plugin",
261 in_sign="ssss", 261 in_sign="ssss",
262 out_sign="s", 262 out_sign="s",
263 method=self._subscribe, 263 method=self._subscribe,
264 async_=True, 264 async_=True,
265 ) 265 )
266 host.bridge.addMethod( 266 host.bridge.add_method(
267 "psUnsubscribe", 267 "ps_unsubscribe",
268 ".plugin", 268 ".plugin",
269 in_sign="sss", 269 in_sign="sss",
270 out_sign="", 270 out_sign="",
271 method=self._unsubscribe, 271 method=self._unsubscribe,
272 async_=True, 272 async_=True,
273 ) 273 )
274 host.bridge.addMethod( 274 host.bridge.add_method(
275 "psSubscriptionsGet", 275 "ps_subscriptions_get",
276 ".plugin", 276 ".plugin",
277 in_sign="sss", 277 in_sign="sss",
278 out_sign="s", 278 out_sign="s",
279 method=self._subscriptions, 279 method=self._subscriptions,
280 async_=True, 280 async_=True,
281 ) 281 )
282 host.bridge.addMethod( 282 host.bridge.add_method(
283 "psSubscribeToMany", 283 "ps_subscribe_to_many",
284 ".plugin", 284 ".plugin",
285 in_sign="a(ss)sa{ss}s", 285 in_sign="a(ss)sa{ss}s",
286 out_sign="s", 286 out_sign="s",
287 method=self._subscribeToMany, 287 method=self._subscribe_to_many,
288 ) 288 )
289 host.bridge.addMethod( 289 host.bridge.add_method(
290 "psGetSubscribeRTResult", 290 "ps_get_subscribe_rt_result",
291 ".plugin", 291 ".plugin",
292 in_sign="ss", 292 in_sign="ss",
293 out_sign="(ua(sss))", 293 out_sign="(ua(sss))",
294 method=self._manySubscribeRTResult, 294 method=self._many_subscribe_rt_result,
295 async_=True, 295 async_=True,
296 ) 296 )
297 host.bridge.addMethod( 297 host.bridge.add_method(
298 "psGetFromMany", 298 "ps_get_from_many",
299 ".plugin", 299 ".plugin",
300 in_sign="a(ss)iss", 300 in_sign="a(ss)iss",
301 out_sign="s", 301 out_sign="s",
302 method=self._getFromMany, 302 method=self._get_from_many,
303 ) 303 )
304 host.bridge.addMethod( 304 host.bridge.add_method(
305 "psGetFromManyRTResult", 305 "ps_get_from_many_rt_result",
306 ".plugin", 306 ".plugin",
307 in_sign="ss", 307 in_sign="ss",
308 out_sign="(ua(sssasa{ss}))", 308 out_sign="(ua(sssasa{ss}))",
309 method=self._getFromManyRTResult, 309 method=self._get_from_many_rt_result,
310 async_=True, 310 async_=True,
311 ) 311 )
312 312
313 #  high level observer method 313 #  high level observer method
314 host.bridge.addSignal( 314 host.bridge.add_signal(
315 "psEvent", ".plugin", signature="ssssss" 315 "ps_event", ".plugin", signature="ssssss"
316 ) # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile 316 ) # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile
317 317
318 # low level observer method, used if service/node is in watching list (see psNodeWatch* methods) 318 # low level observer method, used if service/node is in watching list (see psNodeWatch* methods)
319 host.bridge.addSignal( 319 host.bridge.add_signal(
320 "psEventRaw", ".plugin", signature="sssass" 320 "ps_event_raw", ".plugin", signature="sssass"
321 ) # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile 321 ) # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile
322 322
323 def getHandler(self, client): 323 def get_handler(self, client):
324 client.pubsub_client = SatPubSubClient(self.host, self) 324 client.pubsub_client = SatPubSubClient(self.host, self)
325 return client.pubsub_client 325 return client.pubsub_client
326 326
327 async def profileConnected(self, client): 327 async def profile_connected(self, client):
328 client.pubsub_watching = set() 328 client.pubsub_watching = set()
329 try: 329 try:
330 client.pubsub_service = jid.JID( 330 client.pubsub_service = jid.JID(
331 self.host.memory.getConfig("", "pubsub_service") 331 self.host.memory.config_get("", "pubsub_service")
332 ) 332 )
333 except RuntimeError: 333 except RuntimeError:
334 log.info( 334 log.info(
335 _( 335 _(
336 "Can't retrieve pubsub_service from conf, we'll use first one that " 336 "Can't retrieve pubsub_service from conf, we'll use first one that "
337 "we find" 337 "we find"
338 ) 338 )
339 ) 339 )
340 pubsub_services = await self.host.findServiceEntities( 340 pubsub_services = await self.host.find_service_entities(
341 client, "pubsub", "service" 341 client, "pubsub", "service"
342 ) 342 )
343 for service_jid in pubsub_services: 343 for service_jid in pubsub_services:
344 infos = await self.host.memory.disco.getInfos(client, service_jid) 344 infos = await self.host.memory.disco.get_infos(client, service_jid)
345 if not DEFAULT_PUBSUB_MIN_FEAT.issubset(infos.features): 345 if not DEFAULT_PUBSUB_MIN_FEAT.issubset(infos.features):
346 continue 346 continue
347 names = {(n or "").lower() for n in infos.identities.values()} 347 names = {(n or "").lower() for n in infos.identities.values()}
348 if "libervia pubsub service" in names: 348 if "libervia pubsub service" in names:
349 # this is the name of Libervia's side project pubsub service, we know 349 # this is the name of Libervia's side project pubsub service, we know
365 pubsub_service_str = ( 365 pubsub_service_str = (
366 client.pubsub_service.full() if client.pubsub_service else "PEP" 366 client.pubsub_service.full() if client.pubsub_service else "PEP"
367 ) 367 )
368 log.info(f"default pubsub service: {pubsub_service_str}") 368 log.info(f"default pubsub service: {pubsub_service_str}")
369 369
370 def getFeatures(self, profile): 370 def features_get(self, profile):
371 try: 371 try:
372 client = self.host.getClient(profile) 372 client = self.host.get_client(profile)
373 except exceptions.ProfileNotSetError: 373 except exceptions.ProfileNotSetError:
374 return {} 374 return {}
375 try: 375 try:
376 return { 376 return {
377 "service": client.pubsub_service.full() 377 "service": client.pubsub_service.full()
378 if client.pubsub_service is not None 378 if client.pubsub_service is not None
379 else "" 379 else ""
380 } 380 }
381 except AttributeError: 381 except AttributeError:
382 if self.host.isConnected(profile): 382 if self.host.is_connected(profile):
383 log.debug("Profile is not connected, service is not checked yet") 383 log.debug("Profile is not connected, service is not checked yet")
384 else: 384 else:
385 log.error("Service should be available !") 385 log.error("Service should be available !")
386 return {} 386 return {}
387 387
388 def parseExtra(self, extra): 388 def parse_extra(self, extra):
389 """Parse extra dictionnary 389 """Parse extra dictionnary
390 390
391 used bridge's extra dictionnaries 391 used bridge's extra dictionnaries
392 @param extra(dict): extra data used to configure request 392 @param extra(dict): extra data used to configure request
393 @return(Extra): filled Extra instance 393 @return(Extra): filled Extra instance
405 405
406 # rsm 406 # rsm
407 if self._rsm is None: 407 if self._rsm is None:
408 rsm_request = None 408 rsm_request = None
409 else: 409 else:
410 rsm_request = self._rsm.parseExtra(extra) 410 rsm_request = self._rsm.parse_extra(extra)
411 411
412 # mam 412 # mam
413 if self._mam is None: 413 if self._mam is None:
414 mam_request = None 414 mam_request = None
415 else: 415 else:
416 mam_request = self._mam.parseExtra(extra, with_rsm=False) 416 mam_request = self._mam.parse_extra(extra, with_rsm=False)
417 417
418 if mam_request is not None: 418 if mam_request is not None:
419 assert "mam" not in extra 419 assert "mam" not in extra
420 extra["mam"] = mam_request 420 extra["mam"] = mam_request
421 421
422 return Extra(rsm_request, extra) 422 return Extra(rsm_request, extra)
423 423
424 def addManagedNode( 424 def add_managed_node(
425 self, 425 self,
426 node: str, 426 node: str,
427 priority: int = 0, 427 priority: int = 0,
428 **kwargs: Callable 428 **kwargs: Callable
429 ): 429 ):
447 assert event_name in C.PS_EVENTS 447 assert event_name in C.PS_EVENTS
448 cb_list = callbacks.setdefault(event_name, []) 448 cb_list = callbacks.setdefault(event_name, [])
449 cb_list.append((cb, priority)) 449 cb_list.append((cb, priority))
450 cb_list.sort(key=lambda c: c[1], reverse=True) 450 cb_list.sort(key=lambda c: c[1], reverse=True)
451 451
452 def removeManagedNode(self, node, *args): 452 def remove_managed_node(self, node, *args):
453 """Add a handler for a node 453 """Add a handler for a node
454 454
455 @param node(unicode): node to monitor 455 @param node(unicode): node to monitor
456 @param *args: callback(s) to remove 456 @param *args: callback(s) to remove
457 """ 457 """
488 # @param service (JID): target service 488 # @param service (JID): target service
489 # @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) 489 # @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes)
490 # @param profile (str): %(doc_profile)s 490 # @param profile (str): %(doc_profile)s
491 # @return: deferred which fire a list of nodes 491 # @return: deferred which fire a list of nodes
492 # """ 492 # """
493 # client = self.host.getClient(profile) 493 # client = self.host.get_client(profile)
494 # d = self.host.getDiscoItems(client, service, nodeIdentifier) 494 # d = self.host.getDiscoItems(client, service, nodeIdentifier)
495 # d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]) 495 # d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')])
496 # return d 496 # return d
497 497
498 # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE): 498 # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE):
510 # """ 510 # """
511 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) 511 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
512 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) 512 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
513 # return d 513 # return d
514 514
515 def _sendItem(self, service, nodeIdentifier, payload, item_id=None, extra_ser="", 515 def _send_item(self, service, nodeIdentifier, payload, item_id=None, extra_ser="",
516 profile_key=C.PROF_KEY_NONE): 516 profile_key=C.PROF_KEY_NONE):
517 client = self.host.getClient(profile_key) 517 client = self.host.get_client(profile_key)
518 service = None if not service else jid.JID(service) 518 service = None if not service else jid.JID(service)
519 payload = xml_tools.parse(payload) 519 payload = xml_tools.parse(payload)
520 extra = data_format.deserialise(extra_ser) 520 extra = data_format.deserialise(extra_ser)
521 d = defer.ensureDeferred(self.sendItem( 521 d = defer.ensureDeferred(self.send_item(
522 client, service, nodeIdentifier, payload, item_id or None, extra 522 client, service, nodeIdentifier, payload, item_id or None, extra
523 )) 523 ))
524 d.addCallback(lambda ret: ret or "") 524 d.addCallback(lambda ret: ret or "")
525 return d 525 return d
526 526
527 def _sendItems(self, service, nodeIdentifier, items, extra_ser=None, 527 def _send_items(self, service, nodeIdentifier, items, extra_ser=None,
528 profile_key=C.PROF_KEY_NONE): 528 profile_key=C.PROF_KEY_NONE):
529 client = self.host.getClient(profile_key) 529 client = self.host.get_client(profile_key)
530 service = None if not service else jid.JID(service) 530 service = None if not service else jid.JID(service)
531 try: 531 try:
532 items = [xml_tools.parse(item) for item in items] 532 items = [xml_tools.parse(item) for item in items]
533 except Exception as e: 533 except Exception as e:
534 raise exceptions.DataError(_("Can't parse items: {msg}").format( 534 raise exceptions.DataError(_("Can't parse items: {msg}").format(
535 msg=e)) 535 msg=e))
536 extra = data_format.deserialise(extra_ser) 536 extra = data_format.deserialise(extra_ser)
537 return defer.ensureDeferred(self.sendItems( 537 return defer.ensureDeferred(self.send_items(
538 client, service, nodeIdentifier, items, extra=extra 538 client, service, nodeIdentifier, items, extra=extra
539 )) 539 ))
540 540
541 async def sendItem( 541 async def send_item(
542 self, 542 self,
543 client: SatXMPPClient, 543 client: SatXMPPClient,
544 service: Union[jid.JID, None], 544 service: Union[jid.JID, None],
545 nodeIdentifier: str, 545 nodeIdentifier: str,
546 payload: domish.Element, 546 payload: domish.Element,
559 assert isinstance(payload, domish.Element) 559 assert isinstance(payload, domish.Element)
560 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) 560 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item'))
561 if item_id is not None: 561 if item_id is not None:
562 item_elt['id'] = item_id 562 item_elt['id'] = item_id
563 item_elt.addChild(payload) 563 item_elt.addChild(payload)
564 published_ids = await self.sendItems( 564 published_ids = await self.send_items(
565 client, 565 client,
566 service, 566 service,
567 nodeIdentifier, 567 nodeIdentifier,
568 [item_elt], 568 [item_elt],
569 extra=extra 569 extra=extra
571 try: 571 try:
572 return published_ids[0] 572 return published_ids[0]
573 except IndexError: 573 except IndexError:
574 return item_id 574 return item_id
575 575
576 async def sendItems( 576 async def send_items(
577 self, 577 self,
578 client: SatXMPPEntity, 578 client: SatXMPPEntity,
579 service: Optional[jid.JID], 579 service: Optional[jid.JID],
580 nodeIdentifier: str, 580 nodeIdentifier: str,
581 items: List[domish.Element], 581 items: List[domish.Element],
674 """ 674 """
675 if sender is None: 675 if sender is None:
676 sender = client.jid 676 sender = client.jid
677 if extra is None: 677 if extra is None:
678 extra = {} 678 extra = {}
679 if not await self.host.trigger.asyncPoint( 679 if not await self.host.trigger.async_point(
680 "XEP-0060_publish", client, service, nodeIdentifier, items, options, sender, 680 "XEP-0060_publish", client, service, nodeIdentifier, items, options, sender,
681 extra 681 extra
682 ): 682 ):
683 return extra["iq_result_elt"] 683 return extra["iq_result_elt"]
684 iq_result_elt = await client.pubsub_client.publish( 684 iq_result_elt = await client.pubsub_client.publish(
685 service, nodeIdentifier, items, sender, 685 service, nodeIdentifier, items, sender,
686 options=options 686 options=options
687 ) 687 )
688 return iq_result_elt 688 return iq_result_elt
689 689
690 def _unwrapMAMMessage(self, message_elt): 690 def _unwrap_mam_message(self, message_elt):
691 try: 691 try:
692 item_elt = reduce( 692 item_elt = reduce(
693 lambda elt, ns_name: next(elt.elements(*ns_name)), 693 lambda elt, ns_name: next(elt.elements(*ns_name)),
694 (message_elt, 694 (message_elt,
695 (mam.NS_MAM, "result"), 695 (mam.NS_MAM, "result"),
701 )) 701 ))
702 except StopIteration: 702 except StopIteration:
703 raise exceptions.DataError("Can't find Item in MAM message element") 703 raise exceptions.DataError("Can't find Item in MAM message element")
704 return item_elt 704 return item_elt
705 705
706 def serialiseItems(self, items_data): 706 def serialise_items(self, items_data):
707 items, metadata = items_data 707 items, metadata = items_data
708 metadata['items'] = items 708 metadata['items'] = items
709 return data_format.serialise(metadata) 709 return data_format.serialise(metadata)
710 710
711 def _getItems(self, service="", node="", max_items=10, item_ids=None, sub_id=None, 711 def _get_items(self, service="", node="", max_items=10, item_ids=None, sub_id=None,
712 extra="", profile_key=C.PROF_KEY_NONE): 712 extra="", profile_key=C.PROF_KEY_NONE):
713 """Get items from pubsub node 713 """Get items from pubsub node
714 714
715 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit 715 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
716 """ 716 """
717 client = self.host.getClient(profile_key) 717 client = self.host.get_client(profile_key)
718 service = jid.JID(service) if service else None 718 service = jid.JID(service) if service else None
719 max_items = None if max_items == C.NO_LIMIT else max_items 719 max_items = None if max_items == C.NO_LIMIT else max_items
720 extra = self.parseExtra(data_format.deserialise(extra)) 720 extra = self.parse_extra(data_format.deserialise(extra))
721 d = defer.ensureDeferred(self.getItems( 721 d = defer.ensureDeferred(self.get_items(
722 client, 722 client,
723 service, 723 service,
724 node, 724 node,
725 max_items, 725 max_items,
726 item_ids, 726 item_ids,
727 sub_id or None, 727 sub_id or None,
728 extra.rsm_request, 728 extra.rsm_request,
729 extra.extra, 729 extra.extra,
730 )) 730 ))
731 d.addCallback(self.transItemsData) 731 d.addCallback(self.trans_items_data)
732 d.addCallback(self.serialiseItems) 732 d.addCallback(self.serialise_items)
733 return d 733 return d
734 734
735 async def getItems( 735 async def get_items(
736 self, 736 self,
737 client: SatXMPPEntity, 737 client: SatXMPPEntity,
738 service: Optional[jid.JID], 738 service: Optional[jid.JID],
739 node: str, 739 node: str,
740 max_items: Optional[int] = None, 740 max_items: Optional[int] = None,
765 max_items = None 765 max_items = None
766 if rsm_request and item_ids: 766 if rsm_request and item_ids:
767 raise ValueError("items_id can't be used with rsm") 767 raise ValueError("items_id can't be used with rsm")
768 if extra is None: 768 if extra is None:
769 extra = {} 769 extra = {}
770 cont, ret = await self.host.trigger.asyncReturnPoint( 770 cont, ret = await self.host.trigger.async_return_point(
771 "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id, 771 "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id,
772 rsm_request, extra 772 rsm_request, extra
773 ) 773 )
774 if not cont: 774 if not cont:
775 return ret 775 return ret
786 orderBy = extra.get(C.KEY_ORDER_BY), 786 orderBy = extra.get(C.KEY_ORDER_BY),
787 rsm_request = rsm_request, 787 rsm_request = rsm_request,
788 extra = extra 788 extra = extra
789 )) 789 ))
790 # we have no MAM data here, so we add None 790 # we have no MAM data here, so we add None
791 d.addErrback(sat_defer.stanza2NotFound) 791 d.addErrback(sat_defer.stanza_2_not_found)
792 d.addTimeout(TIMEOUT, reactor) 792 d.addTimeout(TIMEOUT, reactor)
793 items, rsm_response = await d 793 items, rsm_response = await d
794 mam_response = None 794 mam_response = None
795 else: 795 else:
796 # if mam is requested, we have to do a totally different query 796 # if mam is requested, we have to do a totally different query
802 raise exceptions.DataError("items_ids parameter can't be used with MAM") 802 raise exceptions.DataError("items_ids parameter can't be used with MAM")
803 if mam_query.node is None: 803 if mam_query.node is None:
804 mam_query.node = node 804 mam_query.node = node
805 elif mam_query.node != node: 805 elif mam_query.node != node:
806 raise exceptions.DataError( 806 raise exceptions.DataError(
807 "MAM query node is incoherent with getItems's node" 807 "MAM query node is incoherent with get_items's node"
808 ) 808 )
809 if mam_query.rsm is None: 809 if mam_query.rsm is None:
810 mam_query.rsm = rsm_request 810 mam_query.rsm = rsm_request
811 else: 811 else:
812 if mam_query.rsm != rsm_request: 812 if mam_query.rsm != rsm_request:
813 raise exceptions.DataError( 813 raise exceptions.DataError(
814 "Conflict between RSM request and MAM's RSM request" 814 "Conflict between RSM request and MAM's RSM request"
815 ) 815 )
816 items, rsm_response, mam_response = await self._mam.getArchives( 816 items, rsm_response, mam_response = await self._mam.get_archives(
817 client, mam_query, service, self._unwrapMAMMessage 817 client, mam_query, service, self._unwrap_mam_message
818 ) 818 )
819 819
820 try: 820 try:
821 subscribe = C.bool(extra["subscribe"]) 821 subscribe = C.bool(extra["subscribe"])
822 except KeyError: 822 except KeyError:
833 # TODO: handle mam_response 833 # TODO: handle mam_response
834 service_jid = service if service else client.jid.userhostJID() 834 service_jid = service if service else client.jid.userhostJID()
835 metadata = { 835 metadata = {
836 "service": service_jid, 836 "service": service_jid,
837 "node": node, 837 "node": node,
838 "uri": self.getNodeURI(service_jid, node), 838 "uri": self.get_node_uri(service_jid, node),
839 } 839 }
840 if mam_response is not None: 840 if mam_response is not None:
841 # mam_response is a dict with "complete" and "stable" keys 841 # mam_response is a dict with "complete" and "stable" keys
842 # we can put them directly in metadata 842 # we can put them directly in metadata
843 metadata.update(mam_response) 843 metadata.update(mam_response)
873 # - key: a value in (a subset of) data.keys() 873 # - key: a value in (a subset of) data.keys()
874 # - couple (list[dict], dict) containing: 874 # - couple (list[dict], dict) containing:
875 # - list of items 875 # - list of items
876 # - RSM response data 876 # - RSM response data
877 # """ 877 # """
878 # client = self.host.getClient(profile_key) 878 # client = self.host.get_client(profile_key)
879 # found_nodes = yield self.listNodes(service, profile=client.profile) 879 # found_nodes = yield self.listNodes(service, profile=client.profile)
880 # d_dict = {} 880 # d_dict = {}
881 # for publisher, node in data.items(): 881 # for publisher, node in data.items():
882 # if node not in found_nodes: 882 # if node not in found_nodes:
883 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) 883 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
884 # continue # avoid pubsub "item-not-found" error 884 # continue # avoid pubsub "item-not-found" error
885 # d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile) 885 # d_dict[publisher] = self.get_items(service, node, max_items, None, sub_id, rsm, client.profile)
886 # defer.returnValue(d_dict) 886 # defer.returnValue(d_dict)
887 887
888 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, 888 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None,
889 profile_key=C.PROF_KEY_NONE): 889 profile_key=C.PROF_KEY_NONE):
890 client = self.host.getClient(profile_key) 890 client = self.host.get_client(profile_key)
891 return client.pubsub_client.getOptions( 891 return client.pubsub_client.getOptions(
892 service, nodeIdentifier, subscriber, subscriptionIdentifier 892 service, nodeIdentifier, subscriber, subscriptionIdentifier
893 ) 893 )
894 894
895 def setOptions(self, service, nodeIdentifier, subscriber, options, 895 def setOptions(self, service, nodeIdentifier, subscriber, options,
896 subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): 896 subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
897 client = self.host.getClient(profile_key) 897 client = self.host.get_client(profile_key)
898 return client.pubsub_client.setOptions( 898 return client.pubsub_client.setOptions(
899 service, nodeIdentifier, subscriber, options, subscriptionIdentifier 899 service, nodeIdentifier, subscriber, options, subscriptionIdentifier
900 ) 900 )
901 901
902 def _createNode(self, service_s, nodeIdentifier, options, profile_key): 902 def _create_node(self, service_s, nodeIdentifier, options, profile_key):
903 client = self.host.getClient(profile_key) 903 client = self.host.get_client(profile_key)
904 return self.createNode( 904 return self.createNode(
905 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options 905 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
906 ) 906 )
907 907
908 def createNode( 908 def createNode(
922 """ 922 """
923 # TODO: if pubsub service doesn't hande publish-options, configure it in a second time 923 # TODO: if pubsub service doesn't hande publish-options, configure it in a second time
924 return client.pubsub_client.createNode(service, nodeIdentifier, options) 924 return client.pubsub_client.createNode(service, nodeIdentifier, options)
925 925
926 @defer.inlineCallbacks 926 @defer.inlineCallbacks
927 def createIfNewNode(self, client, service, nodeIdentifier, options=None): 927 def create_if_new_node(self, client, service, nodeIdentifier, options=None):
928 """Helper method similar to createNode, but will not fail in case of conflict""" 928 """Helper method similar to createNode, but will not fail in case of conflict"""
929 try: 929 try:
930 yield self.createNode(client, service, nodeIdentifier, options) 930 yield self.createNode(client, service, nodeIdentifier, options)
931 except error.StanzaError as e: 931 except error.StanzaError as e:
932 if e.condition == "conflict": 932 if e.condition == "conflict":
933 pass 933 pass
934 else: 934 else:
935 raise e 935 raise e
936 936
937 def _getNodeConfiguration(self, service_s, nodeIdentifier, profile_key): 937 def _get_node_configuration(self, service_s, nodeIdentifier, profile_key):
938 client = self.host.getClient(profile_key) 938 client = self.host.get_client(profile_key)
939 d = self.getConfiguration( 939 d = self.getConfiguration(
940 client, jid.JID(service_s) if service_s else None, nodeIdentifier 940 client, jid.JID(service_s) if service_s else None, nodeIdentifier
941 ) 941 )
942 942
943 def serialize(form): 943 def serialize(form):
967 formType="submit", formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG 967 formType="submit", formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG
968 ) 968 )
969 form.makeFields(options) 969 form.makeFields(options)
970 return form 970 return form
971 971
972 def _setNodeConfiguration(self, service_s, nodeIdentifier, options, profile_key): 972 def _set_node_configuration(self, service_s, nodeIdentifier, options, profile_key):
973 client = self.host.getClient(profile_key) 973 client = self.host.get_client(profile_key)
974 d = self.setConfiguration( 974 d = self.setConfiguration(
975 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options 975 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
976 ) 976 )
977 return d 977 return d
978 978
985 request.options = form 985 request.options = form
986 986
987 d = request.send(client.xmlstream) 987 d = request.send(client.xmlstream)
988 return d 988 return d
989 989
990 def _getAffiliations(self, service_s, nodeIdentifier, profile_key): 990 def _get_affiliations(self, service_s, nodeIdentifier, profile_key):
991 client = self.host.getClient(profile_key) 991 client = self.host.get_client(profile_key)
992 d = self.getAffiliations( 992 d = self.get_affiliations(
993 client, jid.JID(service_s) if service_s else None, nodeIdentifier or None 993 client, jid.JID(service_s) if service_s else None, nodeIdentifier or None
994 ) 994 )
995 return d 995 return d
996 996
997 def getAffiliations(self, client, service, nodeIdentifier=None): 997 def get_affiliations(self, client, service, nodeIdentifier=None):
998 """Retrieve affiliations of an entity 998 """Retrieve affiliations of an entity
999 999
1000 @param nodeIdentifier(unicode, None): node to get affiliation from 1000 @param nodeIdentifier(unicode, None): node to get affiliation from
1001 None to get all nodes affiliations for this service 1001 None to get all nodes affiliations for this service
1002 """ 1002 """
1029 1029
1030 d = request.send(client.xmlstream) 1030 d = request.send(client.xmlstream)
1031 d.addCallback(cb) 1031 d.addCallback(cb)
1032 return d 1032 return d
1033 1033
1034 def _getNodeAffiliations(self, service_s, nodeIdentifier, profile_key): 1034 def _get_node_affiliations(self, service_s, nodeIdentifier, profile_key):
1035 client = self.host.getClient(profile_key) 1035 client = self.host.get_client(profile_key)
1036 d = self.getNodeAffiliations( 1036 d = self.get_node_affiliations(
1037 client, jid.JID(service_s) if service_s else None, nodeIdentifier 1037 client, jid.JID(service_s) if service_s else None, nodeIdentifier
1038 ) 1038 )
1039 d.addCallback( 1039 d.addCallback(
1040 lambda affiliations: {j.full(): a for j, a in affiliations.items()} 1040 lambda affiliations: {j.full(): a for j, a in affiliations.items()}
1041 ) 1041 )
1042 return d 1042 return d
1043 1043
1044 def getNodeAffiliations(self, client, service, nodeIdentifier): 1044 def get_node_affiliations(self, client, service, nodeIdentifier):
1045 """Retrieve affiliations of a node owned by profile""" 1045 """Retrieve affiliations of a node owned by profile"""
1046 request = pubsub.PubSubRequest("affiliationsGet") 1046 request = pubsub.PubSubRequest("affiliationsGet")
1047 request.recipient = service 1047 request.recipient = service
1048 request.nodeIdentifier = nodeIdentifier 1048 request.nodeIdentifier = nodeIdentifier
1049 1049
1074 1074
1075 d = request.send(client.xmlstream) 1075 d = request.send(client.xmlstream)
1076 d.addCallback(cb) 1076 d.addCallback(cb)
1077 return d 1077 return d
1078 1078
1079 def _setNodeAffiliations( 1079 def _set_node_affiliations(
1080 self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE 1080 self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE
1081 ): 1081 ):
1082 client = self.host.getClient(profile_key) 1082 client = self.host.get_client(profile_key)
1083 affiliations = { 1083 affiliations = {
1084 jid.JID(jid_): affiliation for jid_, affiliation in affiliations.items() 1084 jid.JID(jid_): affiliation for jid_, affiliation in affiliations.items()
1085 } 1085 }
1086 d = self.setNodeAffiliations( 1086 d = self.set_node_affiliations(
1087 client, 1087 client,
1088 jid.JID(service_s) if service_s else None, 1088 jid.JID(service_s) if service_s else None,
1089 nodeIdentifier, 1089 nodeIdentifier,
1090 affiliations, 1090 affiliations,
1091 ) 1091 )
1092 return d 1092 return d
1093 1093
1094 def setNodeAffiliations(self, client, service, nodeIdentifier, affiliations): 1094 def set_node_affiliations(self, client, service, nodeIdentifier, affiliations):
1095 """Update affiliations of a node owned by profile 1095 """Update affiliations of a node owned by profile
1096 1096
1097 @param affiliations(dict[jid.JID, unicode]): affiliations to set 1097 @param affiliations(dict[jid.JID, unicode]): affiliations to set
1098 check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations 1098 check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations
1099 """ 1099 """
1102 request.nodeIdentifier = nodeIdentifier 1102 request.nodeIdentifier = nodeIdentifier
1103 request.affiliations = affiliations 1103 request.affiliations = affiliations
1104 d = request.send(client.xmlstream) 1104 d = request.send(client.xmlstream)
1105 return d 1105 return d
1106 1106
1107 def _purgeNode(self, service_s, nodeIdentifier, profile_key): 1107 def _purge_node(self, service_s, nodeIdentifier, profile_key):
1108 client = self.host.getClient(profile_key) 1108 client = self.host.get_client(profile_key)
1109 return self.purgeNode( 1109 return self.purge_node(
1110 client, jid.JID(service_s) if service_s else None, nodeIdentifier 1110 client, jid.JID(service_s) if service_s else None, nodeIdentifier
1111 ) 1111 )
1112 1112
1113 def purgeNode(self, client, service, nodeIdentifier): 1113 def purge_node(self, client, service, nodeIdentifier):
1114 return client.pubsub_client.purgeNode(service, nodeIdentifier) 1114 return client.pubsub_client.purge_node(service, nodeIdentifier)
1115 1115
1116 def _deleteNode(self, service_s, nodeIdentifier, profile_key): 1116 def _delete_node(self, service_s, nodeIdentifier, profile_key):
1117 client = self.host.getClient(profile_key) 1117 client = self.host.get_client(profile_key)
1118 return self.deleteNode( 1118 return self.deleteNode(
1119 client, jid.JID(service_s) if service_s else None, nodeIdentifier 1119 client, jid.JID(service_s) if service_s else None, nodeIdentifier
1120 ) 1120 )
1121 1121
1122 def deleteNode( 1122 def deleteNode(
1130 def _addWatch(self, service_s, node, profile_key): 1130 def _addWatch(self, service_s, node, profile_key):
1131 """watch modifications on a node 1131 """watch modifications on a node
1132 1132
1133 This method should only be called from bridge 1133 This method should only be called from bridge
1134 """ 1134 """
1135 client = self.host.getClient(profile_key) 1135 client = self.host.get_client(profile_key)
1136 service = jid.JID(service_s) if service_s else client.jid.userhostJID() 1136 service = jid.JID(service_s) if service_s else client.jid.userhostJID()
1137 client.pubsub_watching.add((service, node)) 1137 client.pubsub_watching.add((service, node))
1138 1138
1139 def _removeWatch(self, service_s, node, profile_key): 1139 def _remove_watch(self, service_s, node, profile_key):
1140 """remove a node watch 1140 """remove a node watch
1141 1141
1142 This method should only be called from bridge 1142 This method should only be called from bridge
1143 """ 1143 """
1144 client = self.host.getClient(profile_key) 1144 client = self.host.get_client(profile_key)
1145 service = jid.JID(service_s) if service_s else client.jid.userhostJID() 1145 service = jid.JID(service_s) if service_s else client.jid.userhostJID()
1146 client.pubsub_watching.remove((service, node)) 1146 client.pubsub_watching.remove((service, node))
1147 1147
1148 def _retractItem( 1148 def _retract_item(
1149 self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key 1149 self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key
1150 ): 1150 ):
1151 return self._retractItems( 1151 return self._retract_items(
1152 service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key 1152 service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key
1153 ) 1153 )
1154 1154
1155 def _retractItems( 1155 def _retract_items(
1156 self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key 1156 self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key
1157 ): 1157 ):
1158 client = self.host.getClient(profile_key) 1158 client = self.host.get_client(profile_key)
1159 return self.retractItems( 1159 return self.retract_items(
1160 client, 1160 client,
1161 jid.JID(service_s) if service_s else None, 1161 jid.JID(service_s) if service_s else None,
1162 nodeIdentifier, 1162 nodeIdentifier,
1163 itemIdentifiers, 1163 itemIdentifiers,
1164 notify, 1164 notify,
1165 ) 1165 )
1166 1166
1167 def retractItems( 1167 def retract_items(
1168 self, 1168 self,
1169 client: SatXMPPClient, 1169 client: SatXMPPClient,
1170 service: jid.JID, 1170 service: jid.JID,
1171 nodeIdentifier: str, 1171 nodeIdentifier: str,
1172 itemIdentifiers: Iterable[str], 1172 itemIdentifiers: Iterable[str],
1174 ) -> defer.Deferred: 1174 ) -> defer.Deferred:
1175 return client.pubsub_client.retractItems( 1175 return client.pubsub_client.retractItems(
1176 service, nodeIdentifier, itemIdentifiers, notify=notify 1176 service, nodeIdentifier, itemIdentifiers, notify=notify
1177 ) 1177 )
1178 1178
1179 def _renameItem( 1179 def _rename_item(
1180 self, 1180 self,
1181 service, 1181 service,
1182 node, 1182 node,
1183 item_id, 1183 item_id,
1184 new_id, 1184 new_id,
1185 profile_key=C.PROF_KEY_NONE, 1185 profile_key=C.PROF_KEY_NONE,
1186 ): 1186 ):
1187 client = self.host.getClient(profile_key) 1187 client = self.host.get_client(profile_key)
1188 service = jid.JID(service) if service else None 1188 service = jid.JID(service) if service else None
1189 return defer.ensureDeferred(self.renameItem( 1189 return defer.ensureDeferred(self.rename_item(
1190 client, service, node, item_id, new_id 1190 client, service, node, item_id, new_id
1191 )) 1191 ))
1192 1192
1193 async def renameItem( 1193 async def rename_item(
1194 self, 1194 self,
1195 client: SatXMPPEntity, 1195 client: SatXMPPEntity,
1196 service: Optional[jid.JID], 1196 service: Optional[jid.JID],
1197 node: str, 1197 node: str,
1198 item_id: str, 1198 item_id: str,
1205 """ 1205 """
1206 if not item_id or not new_id: 1206 if not item_id or not new_id:
1207 raise ValueError("item_id and new_id must not be empty") 1207 raise ValueError("item_id and new_id must not be empty")
1208 # retract must be done last, so if something goes wrong, the exception will stop 1208 # retract must be done last, so if something goes wrong, the exception will stop
1209 # the workflow and no accidental delete should happen 1209 # the workflow and no accidental delete should happen
1210 item_elt = (await self.getItems(client, service, node, item_ids=[item_id]))[0][0] 1210 item_elt = (await self.get_items(client, service, node, item_ids=[item_id]))[0][0]
1211 await self.sendItem(client, service, node, item_elt.firstChildElement(), new_id) 1211 await self.send_item(client, service, node, item_elt.firstChildElement(), new_id)
1212 await self.retractItems(client, service, node, [item_id]) 1212 await self.retract_items(client, service, node, [item_id])
1213 1213
1214 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): 1214 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
1215 client = self.host.getClient(profile_key) 1215 client = self.host.get_client(profile_key)
1216 service = None if not service else jid.JID(service) 1216 service = None if not service else jid.JID(service)
1217 d = defer.ensureDeferred( 1217 d = defer.ensureDeferred(
1218 self.subscribe( 1218 self.subscribe(
1219 client, 1219 client,
1220 service, 1220 service,
1234 options: Optional[dict] = None 1234 options: Optional[dict] = None
1235 ) -> pubsub.Subscription: 1235 ) -> pubsub.Subscription:
1236 # TODO: reimplement a subscription cache, checking that we have no subscription before trying to subscribe 1236 # TODO: reimplement a subscription cache, checking that we have no subscription before trying to subscribe
1237 if service is None: 1237 if service is None:
1238 service = client.jid.userhostJID() 1238 service = client.jid.userhostJID()
1239 cont, trigger_sub = await self.host.trigger.asyncReturnPoint( 1239 cont, trigger_sub = await self.host.trigger.async_return_point(
1240 "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options, 1240 "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options,
1241 ) 1241 )
1242 if not cont: 1242 if not cont:
1243 return trigger_sub 1243 return trigger_sub
1244 try: 1244 try:
1252 else: 1252 else:
1253 raise e 1253 raise e
1254 return subscription 1254 return subscription
1255 1255
1256 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): 1256 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
1257 client = self.host.getClient(profile_key) 1257 client = self.host.get_client(profile_key)
1258 service = None if not service else jid.JID(service) 1258 service = None if not service else jid.JID(service)
1259 return defer.ensureDeferred(self.unsubscribe(client, service, nodeIdentifier)) 1259 return defer.ensureDeferred(self.unsubscribe(client, service, nodeIdentifier))
1260 1260
1261 async def unsubscribe( 1261 async def unsubscribe(
1262 self, 1262 self,
1265 nodeIdentifier: str, 1265 nodeIdentifier: str,
1266 sub_jid: Optional[jid.JID] = None, 1266 sub_jid: Optional[jid.JID] = None,
1267 subscriptionIdentifier: Optional[str] = None, 1267 subscriptionIdentifier: Optional[str] = None,
1268 sender: Optional[jid.JID] = None, 1268 sender: Optional[jid.JID] = None,
1269 ) -> None: 1269 ) -> None:
1270 if not await self.host.trigger.asyncPoint( 1270 if not await self.host.trigger.async_point(
1271 "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid, 1271 "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid,
1272 subscriptionIdentifier, sender 1272 subscriptionIdentifier, sender
1273 ): 1273 ):
1274 return 1274 return
1275 try: 1275 try:
1296 self, 1296 self,
1297 service="", 1297 service="",
1298 nodeIdentifier="", 1298 nodeIdentifier="",
1299 profile_key=C.PROF_KEY_NONE 1299 profile_key=C.PROF_KEY_NONE
1300 ) -> str: 1300 ) -> str:
1301 client = self.host.getClient(profile_key) 1301 client = self.host.get_client(profile_key)
1302 service = None if not service else jid.JID(service) 1302 service = None if not service else jid.JID(service)
1303 subs = await self.subscriptions(client, service, nodeIdentifier or None) 1303 subs = await self.subscriptions(client, service, nodeIdentifier or None)
1304 return data_format.serialise(subs) 1304 return data_format.serialise(subs)
1305 1305
1306 async def subscriptions( 1306 async def subscriptions(
1313 1313
1314 @param service(jid.JID): PubSub service 1314 @param service(jid.JID): PubSub service
1315 @param nodeIdentifier(unicode, None): node to check 1315 @param nodeIdentifier(unicode, None): node to check
1316 None to get all subscriptions 1316 None to get all subscriptions
1317 """ 1317 """
1318 cont, ret = await self.host.trigger.asyncReturnPoint( 1318 cont, ret = await self.host.trigger.async_return_point(
1319 "XEP-0060_subscriptions", client, service, node 1319 "XEP-0060_subscriptions", client, service, node
1320 ) 1320 )
1321 if not cont: 1321 if not cont:
1322 return ret 1322 return ret
1323 subs = await client.pubsub_client.subscriptions(service, node) 1323 subs = await client.pubsub_client.subscriptions(service, node)
1334 ret.append(sub_dict) 1334 ret.append(sub_dict)
1335 return ret 1335 return ret
1336 1336
1337 ## misc tools ## 1337 ## misc tools ##
1338 1338
1339 def getNodeURI(self, service, node, item=None): 1339 def get_node_uri(self, service, node, item=None):
1340 """Return XMPP URI of a PubSub node 1340 """Return XMPP URI of a PubSub node
1341 1341
1342 @param service(jid.JID): PubSub service 1342 @param service(jid.JID): PubSub service
1343 @param node(unicode): node 1343 @param node(unicode): node
1344 @return (unicode): URI of the node 1344 @return (unicode): URI of the node
1357 1357
1358 ## methods to manage several stanzas/jids at once ## 1358 ## methods to manage several stanzas/jids at once ##
1359 1359
1360 # generic # 1360 # generic #
1361 1361
1362 def getRTResults( 1362 def get_rt_results(
1363 self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE 1363 self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE
1364 ): 1364 ):
1365 return self.rt_sessions.getResults(session_id, on_success, on_error, profile) 1365 return self.rt_sessions.get_results(session_id, on_success, on_error, profile)
1366 1366
1367 def transItemsData(self, items_data, item_cb=lambda item: item.toXml()): 1367 def trans_items_data(self, items_data, item_cb=lambda item: item.toXml()):
1368 """Helper method to transform result from [getItems] 1368 """Helper method to transform result from [get_items]
1369 1369
1370 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) 1370 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode])
1371 as returned by [getItems]. 1371 as returned by [get_items].
1372 @param items_data(tuple): tuple returned by [getItems] 1372 @param items_data(tuple): tuple returned by [get_items]
1373 @param item_cb(callable): method to transform each item 1373 @param item_cb(callable): method to transform each item
1374 @return (tuple): a serialised form ready to go through the bridge 1374 @return (tuple): a serialised form ready to go through the bridge
1375 """ 1375 """
1376 items, metadata = items_data 1376 items, metadata = items_data
1377 items = [item_cb(item) for item in items] 1377 items = [item_cb(item) for item in items]
1378 1378
1379 return (items, metadata) 1379 return (items, metadata)
1380 1380
1381 def transItemsDataD(self, items_data, item_cb): 1381 def trans_items_data_d(self, items_data, item_cb):
1382 """Helper method to transform result from [getItems], deferred version 1382 """Helper method to transform result from [get_items], deferred version
1383 1383
1384 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) 1384 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode])
1385 as returned by [getItems]. metadata values are then cast to unicode and 1385 as returned by [get_items]. metadata values are then cast to unicode and
1386 each item is passed to item_cb. 1386 each item is passed to item_cb.
1387 An errback is added to item_cb, and when it is fired the value is filtered from 1387 An errback is added to item_cb, and when it is fired the value is filtered from
1388 final items 1388 final items
1389 @param items_data(tuple): tuple returned by [getItems] 1389 @param items_data(tuple): tuple returned by [get_items]
1390 @param item_cb(callable): method to transform each item (must return a deferred) 1390 @param item_cb(callable): method to transform each item (must return a deferred)
1391 @return (tuple): a deferred which fires a dict which can be serialised to go 1391 @return (tuple): a deferred which fires a dict which can be serialised to go
1392 through the bridge 1392 through the bridge
1393 """ 1393 """
1394 items, metadata = items_data 1394 items, metadata = items_data
1401 [i for i in parsed_items if i is not None], 1401 [i for i in parsed_items if i is not None],
1402 metadata 1402 metadata
1403 )) 1403 ))
1404 return d 1404 return d
1405 1405
1406 def serDList(self, results, failure_result=None): 1406 def ser_d_list(self, results, failure_result=None):
1407 """Serialise a DeferredList result 1407 """Serialise a DeferredList result
1408 1408
1409 @param results: DeferredList results 1409 @param results: DeferredList results
1410 @param failure_result: value to use as value for failed Deferred 1410 @param failure_result: value to use as value for failed Deferred
1411 (default: empty tuple) 1411 (default: empty tuple)
1423 ] 1423 ]
1424 1424
1425 # subscribe # 1425 # subscribe #
1426 1426
1427 @utils.ensure_deferred 1427 @utils.ensure_deferred
1428 async def _getNodeSubscriptions( 1428 async def _get_node_subscriptions(
1429 self, 1429 self,
1430 service: str, 1430 service: str,
1431 node: str, 1431 node: str,
1432 profile_key: str 1432 profile_key: str
1433 ) -> Dict[str, str]: 1433 ) -> Dict[str, str]:
1434 client = self.host.getClient(profile_key) 1434 client = self.host.get_client(profile_key)
1435 subs = await self.getNodeSubscriptions( 1435 subs = await self.get_node_subscriptions(
1436 client, jid.JID(service) if service else None, node 1436 client, jid.JID(service) if service else None, node
1437 ) 1437 )
1438 return {j.full(): a for j, a in subs.items()} 1438 return {j.full(): a for j, a in subs.items()}
1439 1439
1440 async def getNodeSubscriptions( 1440 async def get_node_subscriptions(
1441 self, 1441 self,
1442 client: SatXMPPEntity, 1442 client: SatXMPPEntity,
1443 service: Optional[jid.JID], 1443 service: Optional[jid.JID],
1444 nodeIdentifier: str 1444 nodeIdentifier: str
1445 ) -> Dict[jid.JID, str]: 1445 ) -> Dict[jid.JID, str]:
1478 _("Invalid result: bad <subscription> element: {}").format( 1478 _("Invalid result: bad <subscription> element: {}").format(
1479 iq_elt.toXml 1479 iq_elt.toXml
1480 ) 1480 )
1481 ) 1481 )
1482 1482
1483 def _setNodeSubscriptions( 1483 def _set_node_subscriptions(
1484 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE 1484 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE
1485 ): 1485 ):
1486 client = self.host.getClient(profile_key) 1486 client = self.host.get_client(profile_key)
1487 subscriptions = { 1487 subscriptions = {
1488 jid.JID(jid_): subscription 1488 jid.JID(jid_): subscription
1489 for jid_, subscription in subscriptions.items() 1489 for jid_, subscription in subscriptions.items()
1490 } 1490 }
1491 d = self.setNodeSubscriptions( 1491 d = self.set_node_subscriptions(
1492 client, 1492 client,
1493 jid.JID(service_s) if service_s else None, 1493 jid.JID(service_s) if service_s else None,
1494 nodeIdentifier, 1494 nodeIdentifier,
1495 subscriptions, 1495 subscriptions,
1496 ) 1496 )
1497 return d 1497 return d
1498 1498
1499 def setNodeSubscriptions(self, client, service, nodeIdentifier, subscriptions): 1499 def set_node_subscriptions(self, client, service, nodeIdentifier, subscriptions):
1500 """Set or update subscriptions of a node owned by profile 1500 """Set or update subscriptions of a node owned by profile
1501 1501
1502 @param subscriptions(dict[jid.JID, unicode]): subscriptions to set 1502 @param subscriptions(dict[jid.JID, unicode]): subscriptions to set
1503 check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions 1503 check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions
1504 """ 1504 """
1510 for jid_, state in subscriptions.items() 1510 for jid_, state in subscriptions.items()
1511 } 1511 }
1512 d = request.send(client.xmlstream) 1512 d = request.send(client.xmlstream)
1513 return d 1513 return d
1514 1514
1515 def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): 1515 def _many_subscribe_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
1516 """Get real-time results for subscribeToMany session 1516 """Get real-time results for subscribe_to_many session
1517 1517
1518 @param session_id: id of the real-time deferred session 1518 @param session_id: id of the real-time deferred session
1519 @return (tuple): (remaining, results) where: 1519 @return (tuple): (remaining, results) where:
1520 - remaining is the number of still expected results 1520 - remaining is the number of still expected results
1522 - service: pubsub service 1522 - service: pubsub service
1523 - and node: pubsub node 1523 - and node: pubsub node
1524 - failure(unicode): empty string in case of success, error message else 1524 - failure(unicode): empty string in case of success, error message else
1525 @param profile_key: %(doc_profile_key)s 1525 @param profile_key: %(doc_profile_key)s
1526 """ 1526 """
1527 profile = self.host.getClient(profile_key).profile 1527 profile = self.host.get_client(profile_key).profile
1528 d = self.rt_sessions.getResults( 1528 d = self.rt_sessions.get_results(
1529 session_id, 1529 session_id,
1530 on_success=lambda result: "", 1530 on_success=lambda result: "",
1531 on_error=lambda failure: str(failure.value), 1531 on_error=lambda failure: str(failure.value),
1532 profile=profile, 1532 profile=profile,
1533 ) 1533 )
1541 ], 1541 ],
1542 ) 1542 )
1543 ) 1543 )
1544 return d 1544 return d
1545 1545
1546 def _subscribeToMany( 1546 def _subscribe_to_many(
1547 self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE 1547 self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE
1548 ): 1548 ):
1549 return self.subscribeToMany( 1549 return self.subscribe_to_many(
1550 [(jid.JID(service), str(node)) for service, node in node_data], 1550 [(jid.JID(service), str(node)) for service, node in node_data],
1551 jid.JID(subscriber), 1551 jid.JID(subscriber),
1552 options, 1552 options,
1553 profile_key, 1553 profile_key,
1554 ) 1554 )
1555 1555
1556 def subscribeToMany( 1556 def subscribe_to_many(
1557 self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE 1557 self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE
1558 ): 1558 ):
1559 """Subscribe to several nodes at once. 1559 """Subscribe to several nodes at once.
1560 1560
1561 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: 1561 @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
1564 @param subscriber (jid.JID): optional subscription identifier. 1564 @param subscriber (jid.JID): optional subscription identifier.
1565 @param options (dict): subscription options 1565 @param options (dict): subscription options
1566 @param profile_key (str): %(doc_profile_key)s 1566 @param profile_key (str): %(doc_profile_key)s
1567 @return (str): RT Deferred session id 1567 @return (str): RT Deferred session id
1568 """ 1568 """
1569 client = self.host.getClient(profile_key) 1569 client = self.host.get_client(profile_key)
1570 deferreds = {} 1570 deferreds = {}
1571 for service, node in node_data: 1571 for service, node in node_data:
1572 deferreds[(service, node)] = defer.ensureDeferred( 1572 deferreds[(service, node)] = defer.ensureDeferred(
1573 client.pubsub_client.subscribe( 1573 client.pubsub_client.subscribe(
1574 service, node, subscriber, options=options 1574 service, node, subscriber, options=options
1575 ) 1575 )
1576 ) 1576 )
1577 return self.rt_sessions.newSession(deferreds, client.profile) 1577 return self.rt_sessions.new_session(deferreds, client.profile)
1578 # found_nodes = yield self.listNodes(service, profile=client.profile) 1578 # found_nodes = yield self.listNodes(service, profile=client.profile)
1579 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) 1579 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
1580 # d_list = [] 1580 # d_list = []
1581 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): 1581 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
1582 # if nodeIdentifier not in found_nodes: 1582 # if nodeIdentifier not in found_nodes:
1585 # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) 1585 # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options))
1586 # defer.returnValue(d_list) 1586 # defer.returnValue(d_list)
1587 1587
1588 # get # 1588 # get #
1589 1589
1590 def _getFromManyRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): 1590 def _get_from_many_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
1591 """Get real-time results for getFromMany session 1591 """Get real-time results for get_from_many session
1592 1592
1593 @param session_id: id of the real-time deferred session 1593 @param session_id: id of the real-time deferred session
1594 @param profile_key: %(doc_profile_key)s 1594 @param profile_key: %(doc_profile_key)s
1595 @return (tuple): (remaining, results) where: 1595 @return (tuple): (remaining, results) where:
1596 - remaining is the number of still expected results 1596 - remaining is the number of still expected results
1599 - node (unicode): pubsub node 1599 - node (unicode): pubsub node
1600 - failure (unicode): empty string in case of success, error message else 1600 - failure (unicode): empty string in case of success, error message else
1601 - items (list[s]): raw XML of items 1601 - items (list[s]): raw XML of items
1602 - metadata(dict): serialised metadata 1602 - metadata(dict): serialised metadata
1603 """ 1603 """
1604 profile = self.host.getClient(profile_key).profile 1604 profile = self.host.get_client(profile_key).profile
1605 d = self.rt_sessions.getResults( 1605 d = self.rt_sessions.get_results(
1606 session_id, 1606 session_id,
1607 on_success=lambda result: ("", self.transItemsData(result)), 1607 on_success=lambda result: ("", self.trans_items_data(result)),
1608 on_error=lambda failure: (str(failure.value) or UNSPECIFIED, ([], {})), 1608 on_error=lambda failure: (str(failure.value) or UNSPECIFIED, ([], {})),
1609 profile=profile, 1609 profile=profile,
1610 ) 1610 )
1611 d.addCallback( 1611 d.addCallback(
1612 lambda ret: ( 1612 lambda ret: (
1619 ], 1619 ],
1620 ) 1620 )
1621 ) 1621 )
1622 return d 1622 return d
1623 1623
1624 def _getFromMany( 1624 def _get_from_many(
1625 self, node_data, max_item=10, extra="", profile_key=C.PROF_KEY_NONE 1625 self, node_data, max_item=10, extra="", profile_key=C.PROF_KEY_NONE
1626 ): 1626 ):
1627 """ 1627 """
1628 @param max_item(int): maximum number of items to get, C.NO_LIMIT for no limit 1628 @param max_item(int): maximum number of items to get, C.NO_LIMIT for no limit
1629 """ 1629 """
1630 max_item = None if max_item == C.NO_LIMIT else max_item 1630 max_item = None if max_item == C.NO_LIMIT else max_item
1631 extra = self.parseExtra(data_format.deserialise(extra)) 1631 extra = self.parse_extra(data_format.deserialise(extra))
1632 return self.getFromMany( 1632 return self.get_from_many(
1633 [(jid.JID(service), str(node)) for service, node in node_data], 1633 [(jid.JID(service), str(node)) for service, node in node_data],
1634 max_item, 1634 max_item,
1635 extra.rsm_request, 1635 extra.rsm_request,
1636 extra.extra, 1636 extra.extra,
1637 profile_key, 1637 profile_key,
1638 ) 1638 )
1639 1639
1640 def getFromMany(self, node_data, max_item=None, rsm_request=None, extra=None, 1640 def get_from_many(self, node_data, max_item=None, rsm_request=None, extra=None,
1641 profile_key=C.PROF_KEY_NONE): 1641 profile_key=C.PROF_KEY_NONE):
1642 """Get items from many nodes at once 1642 """Get items from many nodes at once
1643 1643
1644 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: 1644 @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
1645 - service (jid.JID) is the pubsub service 1645 - service (jid.JID) is the pubsub service
1647 @param max_items (int): optional limit on the number of retrieved items. 1647 @param max_items (int): optional limit on the number of retrieved items.
1648 @param rsm_request (RSMRequest): RSM request data 1648 @param rsm_request (RSMRequest): RSM request data
1649 @param profile_key (unicode): %(doc_profile_key)s 1649 @param profile_key (unicode): %(doc_profile_key)s
1650 @return (str): RT Deferred session id 1650 @return (str): RT Deferred session id
1651 """ 1651 """
1652 client = self.host.getClient(profile_key) 1652 client = self.host.get_client(profile_key)
1653 deferreds = {} 1653 deferreds = {}
1654 for service, node in node_data: 1654 for service, node in node_data:
1655 deferreds[(service, node)] = defer.ensureDeferred(self.getItems( 1655 deferreds[(service, node)] = defer.ensureDeferred(self.get_items(
1656 client, service, node, max_item, rsm_request=rsm_request, extra=extra 1656 client, service, node, max_item, rsm_request=rsm_request, extra=extra
1657 )) 1657 ))
1658 return self.rt_sessions.newSession(deferreds, client.profile) 1658 return self.rt_sessions.new_session(deferreds, client.profile)
1659 1659
1660 1660
1661 @implementer(disco.IDisco) 1661 @implementer(disco.IDisco)
1662 class SatPubSubClient(rsm.PubSubClient): 1662 class SatPubSubClient(rsm.PubSubClient):
1663 1663
1687 service, nodeIdentifier, maxItems, subscriptionIdentifier, sender, 1687 service, nodeIdentifier, maxItems, subscriptionIdentifier, sender,
1688 itemIdentifiers, orderBy, rsm_request 1688 itemIdentifiers, orderBy, rsm_request
1689 ) 1689 )
1690 # items must be returned, thus this async point can't stop the workflow (but it 1690 # items must be returned, thus this async point can't stop the workflow (but it
1691 # can modify returned items) 1691 # can modify returned items)
1692 await self.host.trigger.asyncPoint( 1692 await self.host.trigger.async_point(
1693 "XEP-0060_items", self.parent, service, nodeIdentifier, items, rsm_response, 1693 "XEP-0060_items", self.parent, service, nodeIdentifier, items, rsm_response,
1694 extra 1694 extra
1695 ) 1695 )
1696 return items, rsm_response 1696 return items, rsm_response
1697 1697
1698 def _getNodeCallbacks(self, node, event): 1698 def _get_node_callbacks(self, node, event):
1699 """Generate callbacks from given node and event 1699 """Generate callbacks from given node and event
1700 1700
1701 @param node(unicode): node used for the item 1701 @param node(unicode): node used for the item
1702 any registered node which prefix the node will match 1702 any registered node which prefix the node will match
1703 @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE 1703 @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE
1710 for callback_data in callbacks_dict[event]: 1710 for callback_data in callbacks_dict[event]:
1711 yield callback_data[0] 1711 yield callback_data[0]
1712 except KeyError: 1712 except KeyError:
1713 continue 1713 continue
1714 1714
1715 async def _callNodeCallbacks(self, client, event: pubsub.ItemsEvent) -> None: 1715 async def _call_node_callbacks(self, client, event: pubsub.ItemsEvent) -> None:
1716 """Call sequencially event callbacks of a node 1716 """Call sequencially event callbacks of a node
1717 1717
1718 Callbacks are called sequencially and not in parallel to be sure to respect 1718 Callbacks are called sequencially and not in parallel to be sure to respect
1719 priority (notably for plugin needing to get old items before they are modified or 1719 priority (notably for plugin needing to get old items before they are modified or
1720 deleted from cache). 1720 deleted from cache).
1721 """ 1721 """
1722 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): 1722 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_ITEMS):
1723 try: 1723 try:
1724 await utils.asDeferred(callback, client, event) 1724 await utils.as_deferred(callback, client, event)
1725 except Exception as e: 1725 except Exception as e:
1726 log.error( 1726 log.error(
1727 f"Error while running items event callback {callback}: {e}" 1727 f"Error while running items event callback {callback}: {e}"
1728 ) 1728 )
1729 1729
1730 def itemsReceived(self, event): 1730 def itemsReceived(self, event):
1731 log.debug("Pubsub items received") 1731 log.debug("Pubsub items received")
1732 client = self.parent 1732 client = self.parent
1733 defer.ensureDeferred(self._callNodeCallbacks(client, event)) 1733 defer.ensureDeferred(self._call_node_callbacks(client, event))
1734 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: 1734 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1735 raw_items = [i.toXml() for i in event.items] 1735 raw_items = [i.toXml() for i in event.items]
1736 self.host.bridge.psEventRaw( 1736 self.host.bridge.ps_event_raw(
1737 event.sender.full(), 1737 event.sender.full(),
1738 event.nodeIdentifier, 1738 event.nodeIdentifier,
1739 C.PS_ITEMS, 1739 C.PS_ITEMS,
1740 raw_items, 1740 raw_items,
1741 client.profile, 1741 client.profile,
1742 ) 1742 )
1743 1743
1744 def deleteReceived(self, event): 1744 def deleteReceived(self, event):
1745 log.debug(("Publish node deleted")) 1745 log.debug(("Publish node deleted"))
1746 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE): 1746 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_DELETE):
1747 d = utils.asDeferred(callback, self.parent, event) 1747 d = utils.as_deferred(callback, self.parent, event)
1748 d.addErrback(lambda f: log.error( 1748 d.addErrback(lambda f: log.error(
1749 f"Error while running delete event callback {callback}: {f}" 1749 f"Error while running delete event callback {callback}: {f}"
1750 )) 1750 ))
1751 client = self.parent 1751 client = self.parent
1752 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: 1752 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1753 self.host.bridge.psEventRaw( 1753 self.host.bridge.ps_event_raw(
1754 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile 1754 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile
1755 ) 1755 )
1756 1756
1757 def purgeReceived(self, event): 1757 def purgeReceived(self, event):
1758 log.debug(("Publish node purged")) 1758 log.debug(("Publish node purged"))
1759 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_PURGE): 1759 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_PURGE):
1760 d = utils.asDeferred(callback, self.parent, event) 1760 d = utils.as_deferred(callback, self.parent, event)
1761 d.addErrback(lambda f: log.error( 1761 d.addErrback(lambda f: log.error(
1762 f"Error while running purge event callback {callback}: {f}" 1762 f"Error while running purge event callback {callback}: {f}"
1763 )) 1763 ))
1764 client = self.parent 1764 client = self.parent
1765 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: 1765 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1766 self.host.bridge.psEventRaw( 1766 self.host.bridge.ps_event_raw(
1767 event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile 1767 event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile
1768 ) 1768 )
1769 1769
1770 def subscriptions(self, service, nodeIdentifier, sender=None): 1770 def subscriptions(self, service, nodeIdentifier, sender=None):
1771 """Return the list of subscriptions to the given service and node. 1771 """Return the list of subscriptions to the given service and node.
1796 subs.append(subscription) 1796 subs.append(subscription)
1797 return subs 1797 return subs
1798 1798
1799 return d.addCallback(cb) 1799 return d.addCallback(cb)
1800 1800
1801 def purgeNode(self, service, nodeIdentifier): 1801 def purge_node(self, service, nodeIdentifier):
1802 """Purge a node (i.e. delete all items from it) 1802 """Purge a node (i.e. delete all items from it)
1803 1803
1804 @param service(jid.JID, None): service to send the item to 1804 @param service(jid.JID, None): service to send the item to
1805 None to use PEP 1805 None to use PEP
1806 @param NodeIdentifier(unicode): PubSub node to use 1806 @param NodeIdentifier(unicode): PubSub node to use