comparison sat/plugins/plugin_xep_0060.py @ 3028:ab2696e34d29

Python 3 port: /!\ this is a huge commit /!\ starting from this commit, SàT is needs Python 3.6+ /!\ SàT maybe be instable or some feature may not work anymore, this will improve with time This patch port backend, bridge and frontends to Python 3. Roughly this has been done this way: - 2to3 tools has been applied (with python 3.7) - all references to python2 have been replaced with python3 (notably shebangs) - fixed files not handled by 2to3 (notably the shell script) - several manual fixes - fixed issues reported by Python 3 that where not handled in Python 2 - replaced "async" with "async_" when needed (it's a reserved word from Python 3.7) - replaced zope's "implements" with @implementer decorator - temporary hack to handle data pickled in database, as str or bytes may be returned, to be checked later - fixed hash comparison for password - removed some code which is not needed anymore with Python 3 - deactivated some code which needs to be checked (notably certificate validation) - tested with jp, fixed reported issues until some basic commands worked - ported Primitivus (after porting dependencies like urwid satext) - more manual fixes
author Goffi <goffi@goffi.org>
date Tue, 13 Aug 2019 19:08:41 +0200
parents cd391ea847cb
children 93e8793a735a
comparison
equal deleted inserted replaced
3027:ff5bcb12ae60 3028:ab2696e34d29
1 #!/usr/bin/env python2 1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*- 2 # -*- coding: utf-8 -*-
3 3
4 # SAT plugin for Publish-Subscribe (xep-0060) 4 # SAT plugin for Publish-Subscribe (xep-0060)
5 # Copyright (C) 2009-2019 Jérôme Poisson (goffi@goffi.org) 5 # Copyright (C) 2009-2019 Jérôme Poisson (goffi@goffi.org)
6 6
29 from twisted.words.protocols.jabber import jid, error 29 from twisted.words.protocols.jabber import jid, error
30 from twisted.internet import reactor, defer 30 from twisted.internet import reactor, defer
31 from wokkel import disco 31 from wokkel import disco
32 from wokkel import data_form 32 from wokkel import data_form
33 from wokkel import generic 33 from wokkel import generic
34 from zope.interface import implements 34 from zope.interface import implementer
35 from collections import namedtuple 35 from collections import namedtuple
36 import urllib 36 import urllib.request, urllib.parse, urllib.error
37 37
38 # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version 38 # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version
39 # mam and rsm come from sat_tmp.wokkel too 39 # mam and rsm come from sat_tmp.wokkel too
40 from wokkel import pubsub 40 from wokkel import pubsub
41 from wokkel import rsm 41 from wokkel import rsm
42 from wokkel import mam 42 from wokkel import mam
43 43
44 44
45 PLUGIN_INFO = { 45 PLUGIN_INFO = {
46 C.PI_NAME: u"Publish-Subscribe", 46 C.PI_NAME: "Publish-Subscribe",
47 C.PI_IMPORT_NAME: u"XEP-0060", 47 C.PI_IMPORT_NAME: "XEP-0060",
48 C.PI_TYPE: u"XEP", 48 C.PI_TYPE: "XEP",
49 C.PI_PROTOCOLS: [u"XEP-0060"], 49 C.PI_PROTOCOLS: ["XEP-0060"],
50 C.PI_DEPENDENCIES: [], 50 C.PI_DEPENDENCIES: [],
51 C.PI_RECOMMENDATIONS: [u"XEP-0059", u"XEP-0313"], 51 C.PI_RECOMMENDATIONS: ["XEP-0059", "XEP-0313"],
52 C.PI_MAIN: u"XEP_0060", 52 C.PI_MAIN: "XEP_0060",
53 C.PI_HANDLER: u"yes", 53 C.PI_HANDLER: "yes",
54 C.PI_DESCRIPTION: _(u"""Implementation of PubSub Protocol"""), 54 C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol"""),
55 } 55 }
56 56
57 UNSPECIFIED = "unspecified error" 57 UNSPECIFIED = "unspecified error"
58 58
59 59
80 ACCESS_AUTHORIZE = "authorize" 80 ACCESS_AUTHORIZE = "authorize"
81 ACCESS_WHITELIST = "whitelist" 81 ACCESS_WHITELIST = "whitelist"
82 ID_SINGLETON = "current" 82 ID_SINGLETON = "current"
83 83
84 def __init__(self, host): 84 def __init__(self, host):
85 log.info(_(u"PubSub plugin initialization")) 85 log.info(_("PubSub plugin initialization"))
86 self.host = host 86 self.host = host
87 self._rsm = host.plugins.get(u"XEP-0059") 87 self._rsm = host.plugins.get("XEP-0059")
88 self._mam = host.plugins.get(u"XEP-0313") 88 self._mam = host.plugins.get("XEP-0313")
89 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) 89 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks)
90 self.rt_sessions = sat_defer.RTDeferredSessions() 90 self.rt_sessions = sat_defer.RTDeferredSessions()
91 host.bridge.addMethod( 91 host.bridge.addMethod(
92 "psNodeCreate", 92 "psNodeCreate",
93 ".plugin", 93 ".plugin",
94 in_sign="ssa{ss}s", 94 in_sign="ssa{ss}s",
95 out_sign="s", 95 out_sign="s",
96 method=self._createNode, 96 method=self._createNode,
97 async=True, 97 async_=True,
98 ) 98 )
99 host.bridge.addMethod( 99 host.bridge.addMethod(
100 "psNodeConfigurationGet", 100 "psNodeConfigurationGet",
101 ".plugin", 101 ".plugin",
102 in_sign="sss", 102 in_sign="sss",
103 out_sign="a{ss}", 103 out_sign="a{ss}",
104 method=self._getNodeConfiguration, 104 method=self._getNodeConfiguration,
105 async=True, 105 async_=True,
106 ) 106 )
107 host.bridge.addMethod( 107 host.bridge.addMethod(
108 "psNodeConfigurationSet", 108 "psNodeConfigurationSet",
109 ".plugin", 109 ".plugin",
110 in_sign="ssa{ss}s", 110 in_sign="ssa{ss}s",
111 out_sign="", 111 out_sign="",
112 method=self._setNodeConfiguration, 112 method=self._setNodeConfiguration,
113 async=True, 113 async_=True,
114 ) 114 )
115 host.bridge.addMethod( 115 host.bridge.addMethod(
116 "psNodeAffiliationsGet", 116 "psNodeAffiliationsGet",
117 ".plugin", 117 ".plugin",
118 in_sign="sss", 118 in_sign="sss",
119 out_sign="a{ss}", 119 out_sign="a{ss}",
120 method=self._getNodeAffiliations, 120 method=self._getNodeAffiliations,
121 async=True, 121 async_=True,
122 ) 122 )
123 host.bridge.addMethod( 123 host.bridge.addMethod(
124 "psNodeAffiliationsSet", 124 "psNodeAffiliationsSet",
125 ".plugin", 125 ".plugin",
126 in_sign="ssa{ss}s", 126 in_sign="ssa{ss}s",
127 out_sign="", 127 out_sign="",
128 method=self._setNodeAffiliations, 128 method=self._setNodeAffiliations,
129 async=True, 129 async_=True,
130 ) 130 )
131 host.bridge.addMethod( 131 host.bridge.addMethod(
132 "psNodeSubscriptionsGet", 132 "psNodeSubscriptionsGet",
133 ".plugin", 133 ".plugin",
134 in_sign="sss", 134 in_sign="sss",
135 out_sign="a{ss}", 135 out_sign="a{ss}",
136 method=self._getNodeSubscriptions, 136 method=self._getNodeSubscriptions,
137 async=True, 137 async_=True,
138 ) 138 )
139 host.bridge.addMethod( 139 host.bridge.addMethod(
140 "psNodeSubscriptionsSet", 140 "psNodeSubscriptionsSet",
141 ".plugin", 141 ".plugin",
142 in_sign="ssa{ss}s", 142 in_sign="ssa{ss}s",
143 out_sign="", 143 out_sign="",
144 method=self._setNodeSubscriptions, 144 method=self._setNodeSubscriptions,
145 async=True, 145 async_=True,
146 ) 146 )
147 host.bridge.addMethod( 147 host.bridge.addMethod(
148 "psNodePurge", 148 "psNodePurge",
149 ".plugin", 149 ".plugin",
150 in_sign="sss", 150 in_sign="sss",
151 out_sign="", 151 out_sign="",
152 method=self._purgeNode, 152 method=self._purgeNode,
153 async=True, 153 async_=True,
154 ) 154 )
155 host.bridge.addMethod( 155 host.bridge.addMethod(
156 "psNodeDelete", 156 "psNodeDelete",
157 ".plugin", 157 ".plugin",
158 in_sign="sss", 158 in_sign="sss",
159 out_sign="", 159 out_sign="",
160 method=self._deleteNode, 160 method=self._deleteNode,
161 async=True, 161 async_=True,
162 ) 162 )
163 host.bridge.addMethod( 163 host.bridge.addMethod(
164 "psNodeWatchAdd", 164 "psNodeWatchAdd",
165 ".plugin", 165 ".plugin",
166 in_sign="sss", 166 in_sign="sss",
167 out_sign="", 167 out_sign="",
168 method=self._addWatch, 168 method=self._addWatch,
169 async=False, 169 async_=False,
170 ) 170 )
171 host.bridge.addMethod( 171 host.bridge.addMethod(
172 "psNodeWatchRemove", 172 "psNodeWatchRemove",
173 ".plugin", 173 ".plugin",
174 in_sign="sss", 174 in_sign="sss",
175 out_sign="", 175 out_sign="",
176 method=self._removeWatch, 176 method=self._removeWatch,
177 async=False, 177 async_=False,
178 ) 178 )
179 host.bridge.addMethod( 179 host.bridge.addMethod(
180 "psAffiliationsGet", 180 "psAffiliationsGet",
181 ".plugin", 181 ".plugin",
182 in_sign="sss", 182 in_sign="sss",
183 out_sign="a{ss}", 183 out_sign="a{ss}",
184 method=self._getAffiliations, 184 method=self._getAffiliations,
185 async=True, 185 async_=True,
186 ) 186 )
187 host.bridge.addMethod( 187 host.bridge.addMethod(
188 "psItemsGet", 188 "psItemsGet",
189 ".plugin", 189 ".plugin",
190 in_sign="ssiassa{ss}s", 190 in_sign="ssiassa{ss}s",
191 out_sign="(asa{ss})", 191 out_sign="(asa{ss})",
192 method=self._getItems, 192 method=self._getItems,
193 async=True, 193 async_=True,
194 ) 194 )
195 host.bridge.addMethod( 195 host.bridge.addMethod(
196 "psItemSend", 196 "psItemSend",
197 ".plugin", 197 ".plugin",
198 in_sign="ssssa{ss}s", 198 in_sign="ssssa{ss}s",
199 out_sign="s", 199 out_sign="s",
200 method=self._sendItem, 200 method=self._sendItem,
201 async=True, 201 async_=True,
202 ) 202 )
203 host.bridge.addMethod( 203 host.bridge.addMethod(
204 "psItemsSend", 204 "psItemsSend",
205 ".plugin", 205 ".plugin",
206 in_sign="ssasa{ss}s", 206 in_sign="ssasa{ss}s",
207 out_sign="as", 207 out_sign="as",
208 method=self._sendItems, 208 method=self._sendItems,
209 async=True, 209 async_=True,
210 ) 210 )
211 host.bridge.addMethod( 211 host.bridge.addMethod(
212 "psRetractItem", 212 "psRetractItem",
213 ".plugin", 213 ".plugin",
214 in_sign="sssbs", 214 in_sign="sssbs",
215 out_sign="", 215 out_sign="",
216 method=self._retractItem, 216 method=self._retractItem,
217 async=True, 217 async_=True,
218 ) 218 )
219 host.bridge.addMethod( 219 host.bridge.addMethod(
220 "psRetractItems", 220 "psRetractItems",
221 ".plugin", 221 ".plugin",
222 in_sign="ssasbs", 222 in_sign="ssasbs",
223 out_sign="", 223 out_sign="",
224 method=self._retractItems, 224 method=self._retractItems,
225 async=True, 225 async_=True,
226 ) 226 )
227 host.bridge.addMethod( 227 host.bridge.addMethod(
228 "psSubscribe", 228 "psSubscribe",
229 ".plugin", 229 ".plugin",
230 in_sign="ssa{ss}s", 230 in_sign="ssa{ss}s",
231 out_sign="s", 231 out_sign="s",
232 method=self._subscribe, 232 method=self._subscribe,
233 async=True, 233 async_=True,
234 ) 234 )
235 host.bridge.addMethod( 235 host.bridge.addMethod(
236 "psUnsubscribe", 236 "psUnsubscribe",
237 ".plugin", 237 ".plugin",
238 in_sign="sss", 238 in_sign="sss",
239 out_sign="", 239 out_sign="",
240 method=self._unsubscribe, 240 method=self._unsubscribe,
241 async=True, 241 async_=True,
242 ) 242 )
243 host.bridge.addMethod( 243 host.bridge.addMethod(
244 "psSubscriptionsGet", 244 "psSubscriptionsGet",
245 ".plugin", 245 ".plugin",
246 in_sign="sss", 246 in_sign="sss",
247 out_sign="aa{ss}", 247 out_sign="aa{ss}",
248 method=self._subscriptions, 248 method=self._subscriptions,
249 async=True, 249 async_=True,
250 ) 250 )
251 host.bridge.addMethod( 251 host.bridge.addMethod(
252 "psSubscribeToMany", 252 "psSubscribeToMany",
253 ".plugin", 253 ".plugin",
254 in_sign="a(ss)sa{ss}s", 254 in_sign="a(ss)sa{ss}s",
259 "psGetSubscribeRTResult", 259 "psGetSubscribeRTResult",
260 ".plugin", 260 ".plugin",
261 in_sign="ss", 261 in_sign="ss",
262 out_sign="(ua(sss))", 262 out_sign="(ua(sss))",
263 method=self._manySubscribeRTResult, 263 method=self._manySubscribeRTResult,
264 async=True, 264 async_=True,
265 ) 265 )
266 host.bridge.addMethod( 266 host.bridge.addMethod(
267 "psGetFromMany", 267 "psGetFromMany",
268 ".plugin", 268 ".plugin",
269 in_sign="a(ss)ia{ss}s", 269 in_sign="a(ss)ia{ss}s",
274 "psGetFromManyRTResult", 274 "psGetFromManyRTResult",
275 ".plugin", 275 ".plugin",
276 in_sign="ss", 276 in_sign="ss",
277 out_sign="(ua(sssasa{ss}))", 277 out_sign="(ua(sssasa{ss}))",
278 method=self._getFromManyRTResult, 278 method=self._getFromManyRTResult,
279 async=True, 279 async_=True,
280 ) 280 )
281 281
282 #  high level observer method 282 #  high level observer method
283 host.bridge.addSignal( 283 host.bridge.addSignal(
284 "psEvent", ".plugin", signature="ssssss" 284 "psEvent", ".plugin", signature="ssssss"
301 self.host.memory.getConfig("", "pubsub_service") 301 self.host.memory.getConfig("", "pubsub_service")
302 ) 302 )
303 except RuntimeError: 303 except RuntimeError:
304 log.info( 304 log.info(
305 _( 305 _(
306 u"Can't retrieve pubsub_service from conf, we'll use first one that we find" 306 "Can't retrieve pubsub_service from conf, we'll use first one that we find"
307 ) 307 )
308 ) 308 )
309 client.pubsub_service = yield self.host.findServiceEntity( 309 client.pubsub_service = yield self.host.findServiceEntity(
310 client, "pubsub", "service" 310 client, "pubsub", "service"
311 ) 311 )
357 mam_request = None 357 mam_request = None
358 else: 358 else:
359 mam_request = self._mam.parseExtra(extra, with_rsm=False) 359 mam_request = self._mam.parseExtra(extra, with_rsm=False)
360 360
361 if mam_request is not None: 361 if mam_request is not None:
362 assert u"mam" not in extra 362 assert "mam" not in extra
363 extra[u"mam"] = mam_request 363 extra["mam"] = mam_request
364 364
365 return Extra(rsm_request, extra) 365 return Extra(rsm_request, extra)
366 366
367 def addManagedNode(self, node, **kwargs): 367 def addManagedNode(self, node, **kwargs):
368 """Add a handler for a node 368 """Add a handler for a node
375 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE 375 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE
376 """ 376 """
377 assert node is not None 377 assert node is not None
378 assert kwargs 378 assert kwargs
379 callbacks = self._node_cb.setdefault(node, {}) 379 callbacks = self._node_cb.setdefault(node, {})
380 for event, cb in kwargs.iteritems(): 380 for event, cb in kwargs.items():
381 event_name = event[:-3] 381 event_name = event[:-3]
382 assert event_name in C.PS_EVENTS 382 assert event_name in C.PS_EVENTS
383 callbacks.setdefault(event_name, []).append(cb) 383 callbacks.setdefault(event_name, []).append(cb)
384 384
385 def removeManagedNode(self, node, *args): 385 def removeManagedNode(self, node, *args):
393 registred_cb = self._node_cb[node] 393 registred_cb = self._node_cb[node]
394 except KeyError: 394 except KeyError:
395 pass 395 pass
396 else: 396 else:
397 for callback in args: 397 for callback in args:
398 for event, cb_list in registred_cb.iteritems(): 398 for event, cb_list in registred_cb.items():
399 try: 399 try:
400 cb_list.remove(callback) 400 cb_list.remove(callback)
401 except ValueError: 401 except ValueError:
402 pass 402 pass
403 else: 403 else:
404 log.debug( 404 log.debug(
405 u"removed callback {cb} for event {event} on node {node}".format( 405 "removed callback {cb} for event {event} on node {node}".format(
406 cb=callback, event=event, node=node 406 cb=callback, event=event, node=node
407 ) 407 )
408 ) 408 )
409 if not cb_list: 409 if not cb_list:
410 del registred_cb[event] 410 del registred_cb[event]
411 if not registred_cb: 411 if not registred_cb:
412 del self._node_cb[node] 412 del self._node_cb[node]
413 return 413 return
414 log.error( 414 log.error(
415 u"Trying to remove inexistant callback {cb} for node {node}".format( 415 "Trying to remove inexistant callback {cb} for node {node}".format(
416 cb=callback, node=node 416 cb=callback, node=node
417 ) 417 )
418 ) 418 )
419 419
420 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): 420 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
452 client = self.host.getClient(profile_key) 452 client = self.host.getClient(profile_key)
453 service = None if not service else jid.JID(service) 453 service = None if not service else jid.JID(service)
454 d = self.sendItem( 454 d = self.sendItem(
455 client, service, nodeIdentifier, payload, item_id or None, extra 455 client, service, nodeIdentifier, payload, item_id or None, extra
456 ) 456 )
457 d.addCallback(lambda ret: ret or u"") 457 d.addCallback(lambda ret: ret or "")
458 return d 458 return d
459 459
460 def _sendItems(self, service, nodeIdentifier, items, extra=None, 460 def _sendItems(self, service, nodeIdentifier, items, extra=None,
461 profile_key=C.PROF_KEY_NONE): 461 profile_key=C.PROF_KEY_NONE):
462 client = self.host.getClient(profile_key) 462 client = self.host.getClient(profile_key)
463 service = None if not service else jid.JID(service) 463 service = None if not service else jid.JID(service)
464 try: 464 try:
465 items = [generic.parseXml(item.encode('utf-8')) for item in items] 465 items = [generic.parseXml(item.encode('utf-8')) for item in items]
466 except Exception as e: 466 except Exception as e:
467 raise exceptions.DataError(_(u"Can't parse items: {msg}").format( 467 raise exceptions.DataError(_("Can't parse items: {msg}").format(
468 msg=e)) 468 msg=e))
469 d = self.sendItems( 469 d = self.sendItems(
470 client, service, nodeIdentifier, items, extra 470 client, service, nodeIdentifier, items, extra
471 ) 471 )
472 return d 472 return d
502 502
503 def _publishCb(self, iq_result): 503 def _publishCb(self, iq_result):
504 """Parse publish result, and return ids given by pubsub service""" 504 """Parse publish result, and return ids given by pubsub service"""
505 try: 505 try:
506 item_ids = [item['id'] 506 item_ids = [item['id']
507 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, u'item')] 507 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')]
508 except AttributeError: 508 except AttributeError:
509 return [] 509 return []
510 return item_ids 510 return item_ids
511 511
512 def sendItems(self, client, service, nodeIdentifier, items, extra=None): 512 def sendItems(self, client, service, nodeIdentifier, items, extra=None):
520 @param extra(dict, None): extra option, not used yet 520 @param extra(dict, None): extra option, not used yet
521 @return (list[unicode]): ids of the created items 521 @return (list[unicode]): ids of the created items
522 """ 522 """
523 parsed_items = [] 523 parsed_items = []
524 for item in items: 524 for item in items:
525 if item.name != u'item': 525 if item.name != 'item':
526 raise exceptions.DataError(_(u"Invalid item: {xml}").format(item.toXml())) 526 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml()))
527 item_id = item.getAttribute(u"id") 527 item_id = item.getAttribute("id")
528 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) 528 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement()))
529 d = self.publish(client, service, nodeIdentifier, parsed_items) 529 d = self.publish(client, service, nodeIdentifier, parsed_items)
530 d.addCallback(self._publishCb) 530 d.addCallback(self._publishCb)
531 return d 531 return d
532 532
536 ) 536 )
537 537
538 def _unwrapMAMMessage(self, message_elt): 538 def _unwrapMAMMessage(self, message_elt):
539 try: 539 try:
540 item_elt = ( 540 item_elt = (
541 message_elt.elements(mam.NS_MAM, "result").next() 541 next(message_elt.elements(mam.NS_MAM, "result").next()
542 .elements(C.NS_FORWARD, "forwarded").next() 542 .elements(C.NS_FORWARD, "forwarded").next()
543 .elements(C.NS_CLIENT, "message").next() 543 .elements(C.NS_CLIENT, "message").next()
544 .elements("http://jabber.org/protocol/pubsub#event", "event").next() 544 .elements("http://jabber.org/protocol/pubsub#event", "event").next()
545 .elements("http://jabber.org/protocol/pubsub#event", "items").next() 545 .elements("http://jabber.org/protocol/pubsub#event", "items").next()
546 .elements("http://jabber.org/protocol/pubsub#event", "item").next() 546 .elements("http://jabber.org/protocol/pubsub#event", "item"))
547 ) 547 )
548 except StopIteration: 548 except StopIteration:
549 raise exceptions.DataError(u"Can't find Item in MAM message element") 549 raise exceptions.DataError("Can't find Item in MAM message element")
550 return item_elt 550 return item_elt
551 551
552 def _getItems(self, service="", node="", max_items=10, item_ids=None, sub_id=None, 552 def _getItems(self, service="", node="", max_items=10, item_ids=None, sub_id=None,
553 extra_dict=None, profile_key=C.PROF_KEY_NONE): 553 extra_dict=None, profile_key=C.PROF_KEY_NONE):
554 """Get items from pubsub node 554 """Get items from pubsub node
591 - service, node: service and node used 591 - service, node: service and node used
592 """ 592 """
593 if item_ids and max_items is not None: 593 if item_ids and max_items is not None:
594 max_items = None 594 max_items = None
595 if rsm_request and item_ids: 595 if rsm_request and item_ids:
596 raise ValueError(u"items_id can't be used with rsm") 596 raise ValueError("items_id can't be used with rsm")
597 if extra is None: 597 if extra is None:
598 extra = {} 598 extra = {}
599 try: 599 try:
600 mam_query = extra["mam"] 600 mam_query = extra["mam"]
601 except KeyError: 601 except KeyError:
614 d.addErrback(sat_defer.stanza2NotFound) 614 d.addErrback(sat_defer.stanza2NotFound)
615 d.addTimeout(TIMEOUT, reactor) 615 d.addTimeout(TIMEOUT, reactor)
616 else: 616 else:
617 # if mam is requested, we have to do a totally different query 617 # if mam is requested, we have to do a totally different query
618 if self._mam is None: 618 if self._mam is None:
619 raise exceptions.NotFound(u"MAM (XEP-0313) plugin is not available") 619 raise exceptions.NotFound("MAM (XEP-0313) plugin is not available")
620 if max_items is not None: 620 if max_items is not None:
621 raise exceptions.DataError(u"max_items parameter can't be used with MAM") 621 raise exceptions.DataError("max_items parameter can't be used with MAM")
622 if item_ids: 622 if item_ids:
623 raise exceptions.DataError(u"items_ids parameter can't be used with MAM") 623 raise exceptions.DataError("items_ids parameter can't be used with MAM")
624 if mam_query.node is None: 624 if mam_query.node is None:
625 mam_query.node = node 625 mam_query.node = node
626 elif mam_query.node != node: 626 elif mam_query.node != node:
627 raise exceptions.DataError( 627 raise exceptions.DataError(
628 u"MAM query node is incoherent with getItems's node" 628 "MAM query node is incoherent with getItems's node"
629 ) 629 )
630 if mam_query.rsm is None: 630 if mam_query.rsm is None:
631 mam_query.rsm = rsm_request 631 mam_query.rsm = rsm_request
632 else: 632 else:
633 if mam_query.rsm != rsm_request: 633 if mam_query.rsm != rsm_request:
634 raise exceptions.DataError( 634 raise exceptions.DataError(
635 u"Conflict between RSM request and MAM's RSM request" 635 "Conflict between RSM request and MAM's RSM request"
636 ) 636 )
637 d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage) 637 d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage)
638 638
639 try: 639 try:
640 subscribe = C.bool(extra["subscribe"]) 640 subscribe = C.bool(extra["subscribe"])
642 subscribe = False 642 subscribe = False
643 643
644 def subscribeEb(failure, service, node): 644 def subscribeEb(failure, service, node):
645 failure.trap(error.StanzaError) 645 failure.trap(error.StanzaError)
646 log.warning( 646 log.warning(
647 u"Could not subscribe to node {} on service {}: {}".format( 647 "Could not subscribe to node {} on service {}: {}".format(
648 node, unicode(service), unicode(failure.value) 648 node, str(service), str(failure.value)
649 ) 649 )
650 ) 650 )
651 651
652 def doSubscribe(data): 652 def doSubscribe(data):
653 self.subscribe(client, service, node).addErrback( 653 self.subscribe(client, service, node).addErrback(
668 "uri": self.getNodeURI(service_jid, node), 668 "uri": self.getNodeURI(service_jid, node),
669 } 669 }
670 if rsm_request is not None and rsm_response is not None: 670 if rsm_request is not None and rsm_response is not None:
671 metadata.update( 671 metadata.update(
672 { 672 {
673 u"rsm_" + key: value 673 "rsm_" + key: value
674 for key, value in rsm_response.toDict().iteritems() 674 for key, value in rsm_response.toDict().items()
675 } 675 }
676 ) 676 )
677 if mam_response is not None: 677 if mam_response is not None:
678 for key, value in mam_response.iteritems(): 678 for key, value in mam_response.items():
679 metadata[u"mam_" + key] = value 679 metadata["mam_" + key] = value
680 return (items, metadata) 680 return (items, metadata)
681 681
682 d.addCallback(addMetadata) 682 d.addCallback(addMetadata)
683 return d 683 return d
684 684
756 client, jid.JID(service_s) if service_s else None, nodeIdentifier 756 client, jid.JID(service_s) if service_s else None, nodeIdentifier
757 ) 757 )
758 758
759 def serialize(form): 759 def serialize(form):
760 # FIXME: better more generic dataform serialisation should be available in SàT 760 # FIXME: better more generic dataform serialisation should be available in SàT
761 return {f.var: unicode(f.value) for f in form.fields.values()} 761 return {f.var: str(f.value) for f in list(form.fields.values())}
762 762
763 d.addCallback(serialize) 763 d.addCallback(serialize)
764 return d 764 return d
765 765
766 def getConfiguration(self, client, service, nodeIdentifier): 766 def getConfiguration(self, client, service, nodeIdentifier):
820 affiliations_elt = next( 820 affiliations_elt = next(
821 iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "affiliations")) 821 iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "affiliations"))
822 ) 822 )
823 except StopIteration: 823 except StopIteration:
824 raise ValueError( 824 raise ValueError(
825 _(u"Invalid result: missing <affiliations> element: {}").format( 825 _("Invalid result: missing <affiliations> element: {}").format(
826 iq_elt.toXml 826 iq_elt.toXml
827 ) 827 )
828 ) 828 )
829 try: 829 try:
830 return { 830 return {
831 e["node"]: e["affiliation"] 831 e["node"]: e["affiliation"]
832 for e in affiliations_elt.elements((pubsub.NS_PUBSUB, "affiliation")) 832 for e in affiliations_elt.elements((pubsub.NS_PUBSUB, "affiliation"))
833 } 833 }
834 except KeyError: 834 except KeyError:
835 raise ValueError( 835 raise ValueError(
836 _(u"Invalid result: bad <affiliation> element: {}").format( 836 _("Invalid result: bad <affiliation> element: {}").format(
837 iq_elt.toXml 837 iq_elt.toXml
838 ) 838 )
839 ) 839 )
840 840
841 d = request.send(client.xmlstream) 841 d = request.send(client.xmlstream)
846 client = self.host.getClient(profile_key) 846 client = self.host.getClient(profile_key)
847 d = self.getNodeAffiliations( 847 d = self.getNodeAffiliations(
848 client, jid.JID(service_s) if service_s else None, nodeIdentifier 848 client, jid.JID(service_s) if service_s else None, nodeIdentifier
849 ) 849 )
850 d.addCallback( 850 d.addCallback(
851 lambda affiliations: {j.full(): a for j, a in affiliations.iteritems()} 851 lambda affiliations: {j.full(): a for j, a in affiliations.items()}
852 ) 852 )
853 return d 853 return d
854 854
855 def getNodeAffiliations(self, client, service, nodeIdentifier): 855 def getNodeAffiliations(self, client, service, nodeIdentifier):
856 """Retrieve affiliations of a node owned by profile""" 856 """Retrieve affiliations of a node owned by profile"""
863 affiliations_elt = next( 863 affiliations_elt = next(
864 iq_elt.pubsub.elements((pubsub.NS_PUBSUB_OWNER, "affiliations")) 864 iq_elt.pubsub.elements((pubsub.NS_PUBSUB_OWNER, "affiliations"))
865 ) 865 )
866 except StopIteration: 866 except StopIteration:
867 raise ValueError( 867 raise ValueError(
868 _(u"Invalid result: missing <affiliations> element: {}").format( 868 _("Invalid result: missing <affiliations> element: {}").format(
869 iq_elt.toXml 869 iq_elt.toXml
870 ) 870 )
871 ) 871 )
872 try: 872 try:
873 return { 873 return {
876 (pubsub.NS_PUBSUB_OWNER, "affiliation") 876 (pubsub.NS_PUBSUB_OWNER, "affiliation")
877 ) 877 )
878 } 878 }
879 except KeyError: 879 except KeyError:
880 raise ValueError( 880 raise ValueError(
881 _(u"Invalid result: bad <affiliation> element: {}").format( 881 _("Invalid result: bad <affiliation> element: {}").format(
882 iq_elt.toXml 882 iq_elt.toXml
883 ) 883 )
884 ) 884 )
885 885
886 d = request.send(client.xmlstream) 886 d = request.send(client.xmlstream)
890 def _setNodeAffiliations( 890 def _setNodeAffiliations(
891 self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE 891 self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE
892 ): 892 ):
893 client = self.host.getClient(profile_key) 893 client = self.host.getClient(profile_key)
894 affiliations = { 894 affiliations = {
895 jid.JID(jid_): affiliation for jid_, affiliation in affiliations.iteritems() 895 jid.JID(jid_): affiliation for jid_, affiliation in affiliations.items()
896 } 896 }
897 d = self.setNodeAffiliations( 897 d = self.setNodeAffiliations(
898 client, 898 client,
899 jid.JID(service_s) if service_s else None, 899 jid.JID(service_s) if service_s else None,
900 nodeIdentifier, 900 nodeIdentifier,
984 984
985 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): 985 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
986 client = self.host.getClient(profile_key) 986 client = self.host.getClient(profile_key)
987 service = None if not service else jid.JID(service) 987 service = None if not service else jid.JID(service)
988 d = self.subscribe(client, service, nodeIdentifier, options=options or None) 988 d = self.subscribe(client, service, nodeIdentifier, options=options or None)
989 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or u"") 989 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "")
990 return d 990 return d
991 991
992 def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None): 992 def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None):
993 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe 993 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe
994 return client.pubsub_client.subscribe( 994 return client.pubsub_client.subscribe(
1063 # urlencode MUST NOT BE USED. 1063 # urlencode MUST NOT BE USED.
1064 query_data = [("node", node.encode("utf-8"))] 1064 query_data = [("node", node.encode("utf-8"))]
1065 if item is not None: 1065 if item is not None:
1066 query_data.append(("item", item.encode("utf-8"))) 1066 query_data.append(("item", item.encode("utf-8")))
1067 return "xmpp:{service}?;{query}".format( 1067 return "xmpp:{service}?;{query}".format(
1068 service=service.userhost(), query=urllib.urlencode(query_data) 1068 service=service.userhost(), query=urllib.parse.urlencode(query_data)
1069 ).decode("utf-8") 1069 )
1070 1070
1071 ## methods to manage several stanzas/jids at once ## 1071 ## methods to manage several stanzas/jids at once ##
1072 1072
1073 # generic # 1073 # generic #
1074 1074
1097 else: 1097 else:
1098 items = [item_cb(item) for item in items] 1098 items = [item_cb(item) for item in items]
1099 1099
1100 return ( 1100 return (
1101 items, 1101 items,
1102 {key: unicode(value) for key, value in metadata.iteritems()}, 1102 {key: str(value) for key, value in metadata.items()},
1103 ) 1103 )
1104 1104
1105 def transItemsDataD(self, items_data, item_cb, serialise=False): 1105 def transItemsDataD(self, items_data, item_cb, serialise=False):
1106 """Helper method to transform result from [getItems], deferred version 1106 """Helper method to transform result from [getItems], deferred version
1107 1107
1120 """ 1120 """
1121 items, metadata = items_data 1121 items, metadata = items_data
1122 1122
1123 def eb(failure): 1123 def eb(failure):
1124 log.warning( 1124 log.warning(
1125 "Error while serialising/parsing item: {}".format(unicode(failure.value)) 1125 "Error while serialising/parsing item: {}".format(str(failure.value))
1126 ) 1126 )
1127 1127
1128 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) 1128 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items])
1129 1129
1130 def finishSerialisation(parsed_items): 1130 def finishSerialisation(parsed_items):
1133 else: 1133 else:
1134 items = [i for i in parsed_items if i is not None] 1134 items = [i for i in parsed_items if i is not None]
1135 1135
1136 return ( 1136 return (
1137 items, 1137 items,
1138 {key: unicode(value) for key, value in metadata.iteritems()}, 1138 {key: str(value) for key, value in metadata.items()},
1139 ) 1139 )
1140 1140
1141 d.addCallback(finishSerialisation) 1141 d.addCallback(finishSerialisation)
1142 return d 1142 return d
1143 1143
1154 if failure_result is None: 1154 if failure_result is None:
1155 failure_result = () 1155 failure_result = ()
1156 return [ 1156 return [
1157 ("", result) 1157 ("", result)
1158 if success 1158 if success
1159 else (unicode(result.result) or UNSPECIFIED, failure_result) 1159 else (str(result.result) or UNSPECIFIED, failure_result)
1160 for success, result in results 1160 for success, result in results
1161 ] 1161 ]
1162 1162
1163 # subscribe # 1163 # subscribe #
1164 1164
1166 client = self.host.getClient(profile_key) 1166 client = self.host.getClient(profile_key)
1167 d = self.getNodeSubscriptions( 1167 d = self.getNodeSubscriptions(
1168 client, jid.JID(service_s) if service_s else None, nodeIdentifier 1168 client, jid.JID(service_s) if service_s else None, nodeIdentifier
1169 ) 1169 )
1170 d.addCallback( 1170 d.addCallback(
1171 lambda subscriptions: {j.full(): a for j, a in subscriptions.iteritems()} 1171 lambda subscriptions: {j.full(): a for j, a in subscriptions.items()}
1172 ) 1172 )
1173 return d 1173 return d
1174 1174
1175 def getNodeSubscriptions(self, client, service, nodeIdentifier): 1175 def getNodeSubscriptions(self, client, service, nodeIdentifier):
1176 """Retrieve subscriptions to a node 1176 """Retrieve subscriptions to a node
1188 subscriptions_elt = next( 1188 subscriptions_elt = next(
1189 iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "subscriptions")) 1189 iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "subscriptions"))
1190 ) 1190 )
1191 except StopIteration: 1191 except StopIteration:
1192 raise ValueError( 1192 raise ValueError(
1193 _(u"Invalid result: missing <subscriptions> element: {}").format( 1193 _("Invalid result: missing <subscriptions> element: {}").format(
1194 iq_elt.toXml 1194 iq_elt.toXml
1195 ) 1195 )
1196 ) 1196 )
1197 except AttributeError as e: 1197 except AttributeError as e:
1198 raise ValueError(_(u"Invalid result: {}").format(e)) 1198 raise ValueError(_("Invalid result: {}").format(e))
1199 try: 1199 try:
1200 return { 1200 return {
1201 jid.JID(s["jid"]): s["subscription"] 1201 jid.JID(s["jid"]): s["subscription"]
1202 for s in subscriptions_elt.elements( 1202 for s in subscriptions_elt.elements(
1203 (pubsub.NS_PUBSUB, "subscription") 1203 (pubsub.NS_PUBSUB, "subscription")
1204 ) 1204 )
1205 } 1205 }
1206 except KeyError: 1206 except KeyError:
1207 raise ValueError( 1207 raise ValueError(
1208 _(u"Invalid result: bad <subscription> element: {}").format( 1208 _("Invalid result: bad <subscription> element: {}").format(
1209 iq_elt.toXml 1209 iq_elt.toXml
1210 ) 1210 )
1211 ) 1211 )
1212 1212
1213 d = request.send(client.xmlstream) 1213 d = request.send(client.xmlstream)
1218 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE 1218 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE
1219 ): 1219 ):
1220 client = self.host.getClient(profile_key) 1220 client = self.host.getClient(profile_key)
1221 subscriptions = { 1221 subscriptions = {
1222 jid.JID(jid_): subscription 1222 jid.JID(jid_): subscription
1223 for jid_, subscription in subscriptions.iteritems() 1223 for jid_, subscription in subscriptions.items()
1224 } 1224 }
1225 d = self.setNodeSubscriptions( 1225 d = self.setNodeSubscriptions(
1226 client, 1226 client,
1227 jid.JID(service_s) if service_s else None, 1227 jid.JID(service_s) if service_s else None,
1228 nodeIdentifier, 1228 nodeIdentifier,
1239 request = pubsub.PubSubRequest("subscriptionsSet") 1239 request = pubsub.PubSubRequest("subscriptionsSet")
1240 request.recipient = service 1240 request.recipient = service
1241 request.nodeIdentifier = nodeIdentifier 1241 request.nodeIdentifier = nodeIdentifier
1242 request.subscriptions = { 1242 request.subscriptions = {
1243 pubsub.Subscription(nodeIdentifier, jid_, state) 1243 pubsub.Subscription(nodeIdentifier, jid_, state)
1244 for jid_, state in subscriptions.iteritems() 1244 for jid_, state in subscriptions.items()
1245 } 1245 }
1246 d = request.send(client.xmlstream) 1246 d = request.send(client.xmlstream)
1247 return d 1247 return d
1248 1248
1249 def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): 1249 def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
1260 """ 1260 """
1261 profile = self.host.getClient(profile_key).profile 1261 profile = self.host.getClient(profile_key).profile
1262 d = self.rt_sessions.getResults( 1262 d = self.rt_sessions.getResults(
1263 session_id, 1263 session_id,
1264 on_success=lambda result: "", 1264 on_success=lambda result: "",
1265 on_error=lambda failure: unicode(failure.value), 1265 on_error=lambda failure: str(failure.value),
1266 profile=profile, 1266 profile=profile,
1267 ) 1267 )
1268 # we need to convert jid.JID to unicode with full() to serialise it for the bridge 1268 # we need to convert jid.JID to unicode with full() to serialise it for the bridge
1269 d.addCallback( 1269 d.addCallback(
1270 lambda ret: ( 1270 lambda ret: (
1271 ret[0], 1271 ret[0],
1272 [ 1272 [
1273 (service.full(), node, "" if success else failure or UNSPECIFIED) 1273 (service.full(), node, "" if success else failure or UNSPECIFIED)
1274 for (service, node), (success, failure) in ret[1].iteritems() 1274 for (service, node), (success, failure) in ret[1].items()
1275 ], 1275 ],
1276 ) 1276 )
1277 ) 1277 )
1278 return d 1278 return d
1279 1279
1280 def _subscribeToMany( 1280 def _subscribeToMany(
1281 self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE 1281 self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE
1282 ): 1282 ):
1283 return self.subscribeToMany( 1283 return self.subscribeToMany(
1284 [(jid.JID(service), unicode(node)) for service, node in node_data], 1284 [(jid.JID(service), str(node)) for service, node in node_data],
1285 jid.JID(subscriber), 1285 jid.JID(subscriber),
1286 options, 1286 options,
1287 profile_key, 1287 profile_key,
1288 ) 1288 )
1289 1289
1335 """ 1335 """
1336 profile = self.host.getClient(profile_key).profile 1336 profile = self.host.getClient(profile_key).profile
1337 d = self.rt_sessions.getResults( 1337 d = self.rt_sessions.getResults(
1338 session_id, 1338 session_id,
1339 on_success=lambda result: ("", self.transItemsData(result)), 1339 on_success=lambda result: ("", self.transItemsData(result)),
1340 on_error=lambda failure: (unicode(failure.value) or UNSPECIFIED, ([], {})), 1340 on_error=lambda failure: (str(failure.value) or UNSPECIFIED, ([], {})),
1341 profile=profile, 1341 profile=profile,
1342 ) 1342 )
1343 d.addCallback( 1343 d.addCallback(
1344 lambda ret: ( 1344 lambda ret: (
1345 ret[0], 1345 ret[0],
1346 [ 1346 [
1347 (service.full(), node, failure, items, metadata) 1347 (service.full(), node, failure, items, metadata)
1348 for (service, node), (success, (failure, (items, metadata))) in ret[ 1348 for (service, node), (success, (failure, (items, metadata))) in ret[
1349 1 1349 1
1350 ].iteritems() 1350 ].items()
1351 ], 1351 ],
1352 ) 1352 )
1353 ) 1353 )
1354 return d 1354 return d
1355 1355
1360 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit 1360 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit
1361 """ 1361 """
1362 max_item = None if max_item == C.NO_LIMIT else max_item 1362 max_item = None if max_item == C.NO_LIMIT else max_item
1363 extra = self.parseExtra(extra_dict) 1363 extra = self.parseExtra(extra_dict)
1364 return self.getFromMany( 1364 return self.getFromMany(
1365 [(jid.JID(service), unicode(node)) for service, node in node_data], 1365 [(jid.JID(service), str(node)) for service, node in node_data],
1366 max_item, 1366 max_item,
1367 extra.rsm_request, 1367 extra.rsm_request,
1368 extra.extra, 1368 extra.extra,
1369 profile_key, 1369 profile_key,
1370 ) 1370 )
1388 client, service, node, max_item, rsm_request=rsm_request, extra=extra 1388 client, service, node, max_item, rsm_request=rsm_request, extra=extra
1389 ) 1389 )
1390 return self.rt_sessions.newSession(deferreds, client.profile) 1390 return self.rt_sessions.newSession(deferreds, client.profile)
1391 1391
1392 1392
1393 @implementer(disco.IDisco)
1393 class SatPubSubClient(rsm.PubSubClient): 1394 class SatPubSubClient(rsm.PubSubClient):
1394 implements(disco.IDisco)
1395 1395
1396 def __init__(self, host, parent_plugin): 1396 def __init__(self, host, parent_plugin):
1397 self.host = host 1397 self.host = host
1398 self.parent_plugin = parent_plugin 1398 self.parent_plugin = parent_plugin
1399 rsm.PubSubClient.__init__(self) 1399 rsm.PubSubClient.__init__(self)
1407 @param node(unicode): node used for the item 1407 @param node(unicode): node used for the item
1408 any registered node which prefix the node will match 1408 any registered node which prefix the node will match
1409 @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE 1409 @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE
1410 @return (iterator[callable]): callbacks for this node/event 1410 @return (iterator[callable]): callbacks for this node/event
1411 """ 1411 """
1412 for registered_node, callbacks_dict in self.parent_plugin._node_cb.iteritems(): 1412 for registered_node, callbacks_dict in self.parent_plugin._node_cb.items():
1413 if not node.startswith(registered_node): 1413 if not node.startswith(registered_node):
1414 continue 1414 continue
1415 try: 1415 try:
1416 for callback in callbacks_dict[event]: 1416 for callback in callbacks_dict[event]:
1417 yield callback 1417 yield callback
1418 except KeyError: 1418 except KeyError:
1419 continue 1419 continue
1420 1420
1421 1421
1422 def itemsReceived(self, event): 1422 def itemsReceived(self, event):
1423 log.debug(u"Pubsub items received") 1423 log.debug("Pubsub items received")
1424 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): 1424 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS):
1425 callback(self.parent, event) 1425 callback(self.parent, event)
1426 client = self.parent 1426 client = self.parent
1427 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: 1427 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1428 raw_items = [i.toXml() for i in event.items] 1428 raw_items = [i.toXml() for i in event.items]
1433 raw_items, 1433 raw_items,
1434 client.profile, 1434 client.profile,
1435 ) 1435 )
1436 1436
1437 def deleteReceived(self, event): 1437 def deleteReceived(self, event):
1438 log.debug((u"Publish node deleted")) 1438 log.debug(("Publish node deleted"))
1439 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE): 1439 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE):
1440 callback(self.parent, event) 1440 callback(self.parent, event)
1441 client = self.parent 1441 client = self.parent
1442 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: 1442 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1443 self.host.bridge.psEventRaw( 1443 self.host.bridge.psEventRaw(