comparison sat/plugins/plugin_xep_0060.py @ 2624:56f94936df1e

code style reformatting using black
author Goffi <goffi@goffi.org>
date Wed, 27 Jun 2018 20:14:46 +0200
parents 26edcf3a30eb
children 189e38fb11ff
comparison
equal deleted inserted replaced
2623:49533de4540b 2624:56f94936df1e
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from sat.core.i18n import _ 20 from sat.core.i18n import _
21 from sat.core.constants import Const as C 21 from sat.core.constants import Const as C
22 from sat.core.log import getLogger 22 from sat.core.log import getLogger
23
23 log = getLogger(__name__) 24 log = getLogger(__name__)
24 from sat.core import exceptions 25 from sat.core import exceptions
25 from sat.tools import sat_defer 26 from sat.tools import sat_defer
26 27
27 from twisted.words.protocols.jabber import jid, error 28 from twisted.words.protocols.jabber import jid, error
31 from zope.interface import implements 32 from zope.interface import implements
32 from collections import namedtuple 33 from collections import namedtuple
33 import urllib 34 import urllib
34 import datetime 35 import datetime
35 from dateutil import tz 36 from dateutil import tz
37
36 # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version 38 # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version
37 # mam and rsm come from sat_tmp.wokkel too 39 # mam and rsm come from sat_tmp.wokkel too
38 from wokkel import pubsub 40 from wokkel import pubsub
39 from wokkel import rsm 41 from wokkel import rsm
40 from wokkel import mam 42 from wokkel import mam
47 C.PI_PROTOCOLS: ["XEP-0060"], 49 C.PI_PROTOCOLS: ["XEP-0060"],
48 C.PI_DEPENDENCIES: [], 50 C.PI_DEPENDENCIES: [],
49 C.PI_RECOMMENDATIONS: ["XEP-0313"], 51 C.PI_RECOMMENDATIONS: ["XEP-0313"],
50 C.PI_MAIN: "XEP_0060", 52 C.PI_MAIN: "XEP_0060",
51 C.PI_HANDLER: "yes", 53 C.PI_HANDLER: "yes",
52 C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol""") 54 C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol"""),
53 } 55 }
54 56
55 UNSPECIFIED = "unspecified error" 57 UNSPECIFIED = "unspecified error"
56 MAM_FILTER = "mam_filter_" 58 MAM_FILTER = "mam_filter_"
57 59
58 60
59 Extra = namedtuple('Extra', ('rsm_request', 'extra')) 61 Extra = namedtuple("Extra", ("rsm_request", "extra"))
60 # rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None 62 # rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None
61 # extra is a potentially empty dict 63 # extra is a potentially empty dict
62 64
63 65
64 class XEP_0060(object): 66 class XEP_0060(object):
65 OPT_ACCESS_MODEL = 'pubsub#access_model' 67 OPT_ACCESS_MODEL = "pubsub#access_model"
66 OPT_PERSIST_ITEMS = 'pubsub#persist_items' 68 OPT_PERSIST_ITEMS = "pubsub#persist_items"
67 OPT_MAX_ITEMS = 'pubsub#max_items' 69 OPT_MAX_ITEMS = "pubsub#max_items"
68 OPT_DELIVER_PAYLOADS = 'pubsub#deliver_payloads' 70 OPT_DELIVER_PAYLOADS = "pubsub#deliver_payloads"
69 OPT_SEND_ITEM_SUBSCRIBE = 'pubsub#send_item_subscribe' 71 OPT_SEND_ITEM_SUBSCRIBE = "pubsub#send_item_subscribe"
70 OPT_NODE_TYPE = 'pubsub#node_type' 72 OPT_NODE_TYPE = "pubsub#node_type"
71 OPT_SUBSCRIPTION_TYPE = 'pubsub#subscription_type' 73 OPT_SUBSCRIPTION_TYPE = "pubsub#subscription_type"
72 OPT_SUBSCRIPTION_DEPTH = 'pubsub#subscription_depth' 74 OPT_SUBSCRIPTION_DEPTH = "pubsub#subscription_depth"
73 OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed' 75 OPT_ROSTER_GROUPS_ALLOWED = "pubsub#roster_groups_allowed"
74 OPT_PUBLISH_MODEL = 'pubsub#publish_model' 76 OPT_PUBLISH_MODEL = "pubsub#publish_model"
75 ACCESS_OPEN = 'open' 77 ACCESS_OPEN = "open"
76 ACCESS_PRESENCE = 'presence' 78 ACCESS_PRESENCE = "presence"
77 ACCESS_ROSTER = 'roster' 79 ACCESS_ROSTER = "roster"
78 ACCESS_PUBLISHER_ROSTER = 'publisher-roster' 80 ACCESS_PUBLISHER_ROSTER = "publisher-roster"
79 ACCESS_AUTHORIZE = 'authorize' 81 ACCESS_AUTHORIZE = "authorize"
80 ACCESS_WHITELIST = 'whitelist' 82 ACCESS_WHITELIST = "whitelist"
81 83
82 def __init__(self, host): 84 def __init__(self, host):
83 log.info(_(u"PubSub plugin initialization")) 85 log.info(_(u"PubSub plugin initialization"))
84 self.host = host 86 self.host = host
85 self._mam = host.plugins.get('XEP-0313') 87 self._mam = host.plugins.get("XEP-0313")
86 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) 88 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks)
87 self.rt_sessions = sat_defer.RTDeferredSessions() 89 self.rt_sessions = sat_defer.RTDeferredSessions()
88 host.bridge.addMethod("psNodeCreate", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self._createNode, async=True) 90 host.bridge.addMethod(
89 host.bridge.addMethod("psNodeConfigurationGet", ".plugin", in_sign='sss', out_sign='a{ss}', method=self._getNodeConfiguration, async=True) 91 "psNodeCreate",
90 host.bridge.addMethod("psNodeConfigurationSet", ".plugin", in_sign='ssa{ss}s', out_sign='', method=self._setNodeConfiguration, async=True) 92 ".plugin",
91 host.bridge.addMethod("psNodeAffiliationsGet", ".plugin", in_sign='sss', out_sign='a{ss}', method=self._getNodeAffiliations, async=True) 93 in_sign="ssa{ss}s",
92 host.bridge.addMethod("psNodeAffiliationsSet", ".plugin", in_sign='ssa{ss}s', out_sign='', method=self._setNodeAffiliations, async=True) 94 out_sign="s",
93 host.bridge.addMethod("psNodeSubscriptionsGet", ".plugin", in_sign='sss', out_sign='a{ss}', method=self._getNodeSubscriptions, async=True) 95 method=self._createNode,
94 host.bridge.addMethod("psNodeSubscriptionsSet", ".plugin", in_sign='ssa{ss}s', out_sign='', method=self._setNodeSubscriptions, async=True) 96 async=True,
95 host.bridge.addMethod("psNodeDelete", ".plugin", in_sign='sss', out_sign='', method=self._deleteNode, async=True) 97 )
96 host.bridge.addMethod("psNodeWatchAdd", ".plugin", in_sign='sss', out_sign='', method=self._addWatch, async=False) 98 host.bridge.addMethod(
97 host.bridge.addMethod("psNodeWatchRemove", ".plugin", in_sign='sss', out_sign='', method=self._removeWatch, async=False) 99 "psNodeConfigurationGet",
98 host.bridge.addMethod("psAffiliationsGet", ".plugin", in_sign='sss', out_sign='a{ss}', method=self._getAffiliations, async=True) 100 ".plugin",
99 host.bridge.addMethod("psItemsGet", ".plugin", in_sign='ssiassa{ss}s', out_sign='(asa{ss})', method=self._getItems, async=True) 101 in_sign="sss",
100 host.bridge.addMethod("psItemSend", ".plugin", in_sign='ssssa{ss}s', out_sign='s', method=self._sendItem, async=True) 102 out_sign="a{ss}",
101 host.bridge.addMethod("psRetractItem", ".plugin", in_sign='sssbs', out_sign='', method=self._retractItem, async=True) 103 method=self._getNodeConfiguration,
102 host.bridge.addMethod("psRetractItems", ".plugin", in_sign='ssasbs', out_sign='', method=self._retractItems, async=True) 104 async=True,
103 host.bridge.addMethod("psSubscribe", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self._subscribe, async=True) 105 )
104 host.bridge.addMethod("psUnsubscribe", ".plugin", in_sign='sss', out_sign='', method=self._unsubscribe, async=True) 106 host.bridge.addMethod(
105 host.bridge.addMethod("psSubscriptionsGet", ".plugin", in_sign='sss', out_sign='aa{ss}', method=self._subscriptions, async=True) 107 "psNodeConfigurationSet",
106 host.bridge.addMethod("psSubscribeToMany", ".plugin", in_sign='a(ss)sa{ss}s', out_sign='s', method=self._subscribeToMany) 108 ".plugin",
107 host.bridge.addMethod("psGetSubscribeRTResult", ".plugin", in_sign='ss', out_sign='(ua(sss))', method=self._manySubscribeRTResult, async=True) 109 in_sign="ssa{ss}s",
108 host.bridge.addMethod("psGetFromMany", ".plugin", in_sign='a(ss)ia{ss}s', out_sign='s', method=self._getFromMany) 110 out_sign="",
109 host.bridge.addMethod("psGetFromManyRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssasa{ss}))', method=self._getFromManyRTResult, async=True) 111 method=self._setNodeConfiguration,
110 112 async=True,
111 # high level observer method 113 )
112 host.bridge.addSignal("psEvent", ".plugin", signature='ssssa{ss}s') # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile 114 host.bridge.addMethod(
115 "psNodeAffiliationsGet",
116 ".plugin",
117 in_sign="sss",
118 out_sign="a{ss}",
119 method=self._getNodeAffiliations,
120 async=True,
121 )
122 host.bridge.addMethod(
123 "psNodeAffiliationsSet",
124 ".plugin",
125 in_sign="ssa{ss}s",
126 out_sign="",
127 method=self._setNodeAffiliations,
128 async=True,
129 )
130 host.bridge.addMethod(
131 "psNodeSubscriptionsGet",
132 ".plugin",
133 in_sign="sss",
134 out_sign="a{ss}",
135 method=self._getNodeSubscriptions,
136 async=True,
137 )
138 host.bridge.addMethod(
139 "psNodeSubscriptionsSet",
140 ".plugin",
141 in_sign="ssa{ss}s",
142 out_sign="",
143 method=self._setNodeSubscriptions,
144 async=True,
145 )
146 host.bridge.addMethod(
147 "psNodeDelete",
148 ".plugin",
149 in_sign="sss",
150 out_sign="",
151 method=self._deleteNode,
152 async=True,
153 )
154 host.bridge.addMethod(
155 "psNodeWatchAdd",
156 ".plugin",
157 in_sign="sss",
158 out_sign="",
159 method=self._addWatch,
160 async=False,
161 )
162 host.bridge.addMethod(
163 "psNodeWatchRemove",
164 ".plugin",
165 in_sign="sss",
166 out_sign="",
167 method=self._removeWatch,
168 async=False,
169 )
170 host.bridge.addMethod(
171 "psAffiliationsGet",
172 ".plugin",
173 in_sign="sss",
174 out_sign="a{ss}",
175 method=self._getAffiliations,
176 async=True,
177 )
178 host.bridge.addMethod(
179 "psItemsGet",
180 ".plugin",
181 in_sign="ssiassa{ss}s",
182 out_sign="(asa{ss})",
183 method=self._getItems,
184 async=True,
185 )
186 host.bridge.addMethod(
187 "psItemSend",
188 ".plugin",
189 in_sign="ssssa{ss}s",
190 out_sign="s",
191 method=self._sendItem,
192 async=True,
193 )
194 host.bridge.addMethod(
195 "psRetractItem",
196 ".plugin",
197 in_sign="sssbs",
198 out_sign="",
199 method=self._retractItem,
200 async=True,
201 )
202 host.bridge.addMethod(
203 "psRetractItems",
204 ".plugin",
205 in_sign="ssasbs",
206 out_sign="",
207 method=self._retractItems,
208 async=True,
209 )
210 host.bridge.addMethod(
211 "psSubscribe",
212 ".plugin",
213 in_sign="ssa{ss}s",
214 out_sign="s",
215 method=self._subscribe,
216 async=True,
217 )
218 host.bridge.addMethod(
219 "psUnsubscribe",
220 ".plugin",
221 in_sign="sss",
222 out_sign="",
223 method=self._unsubscribe,
224 async=True,
225 )
226 host.bridge.addMethod(
227 "psSubscriptionsGet",
228 ".plugin",
229 in_sign="sss",
230 out_sign="aa{ss}",
231 method=self._subscriptions,
232 async=True,
233 )
234 host.bridge.addMethod(
235 "psSubscribeToMany",
236 ".plugin",
237 in_sign="a(ss)sa{ss}s",
238 out_sign="s",
239 method=self._subscribeToMany,
240 )
241 host.bridge.addMethod(
242 "psGetSubscribeRTResult",
243 ".plugin",
244 in_sign="ss",
245 out_sign="(ua(sss))",
246 method=self._manySubscribeRTResult,
247 async=True,
248 )
249 host.bridge.addMethod(
250 "psGetFromMany",
251 ".plugin",
252 in_sign="a(ss)ia{ss}s",
253 out_sign="s",
254 method=self._getFromMany,
255 )
256 host.bridge.addMethod(
257 "psGetFromManyRTResult",
258 ".plugin",
259 in_sign="ss",
260 out_sign="(ua(sssasa{ss}))",
261 method=self._getFromManyRTResult,
262 async=True,
263 )
264
265 #  high level observer method
266 host.bridge.addSignal(
267 "psEvent", ".plugin", signature="ssssa{ss}s"
268 ) # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile
113 269
114 # low level observer method, used if service/node is in watching list (see psNodeWatch* methods) 270 # low level observer method, used if service/node is in watching list (see psNodeWatch* methods)
115 host.bridge.addSignal("psEventRaw", ".plugin", signature='sssass') # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile 271 host.bridge.addSignal(
272 "psEventRaw", ".plugin", signature="sssass"
273 ) # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile
116 274
117 def getHandler(self, client): 275 def getHandler(self, client):
118 client.pubsub_client = SatPubSubClient(self.host, self) 276 client.pubsub_client = SatPubSubClient(self.host, self)
119 return client.pubsub_client 277 return client.pubsub_client
120 278
121 @defer.inlineCallbacks 279 @defer.inlineCallbacks
122 def profileConnected(self, client): 280 def profileConnected(self, client):
123 client.pubsub_watching = set() 281 client.pubsub_watching = set()
124 try: 282 try:
125 client.pubsub_service = jid.JID(self.host.memory.getConfig('', 'pubsub_service')) 283 client.pubsub_service = jid.JID(
284 self.host.memory.getConfig("", "pubsub_service")
285 )
126 except RuntimeError: 286 except RuntimeError:
127 log.info(_(u"Can't retrieve pubsub_service from conf, we'll use first one that we find")) 287 log.info(
128 client.pubsub_service = yield self.host.findServiceEntity(client, "pubsub", "service") 288 _(
289 u"Can't retrieve pubsub_service from conf, we'll use first one that we find"
290 )
291 )
292 client.pubsub_service = yield self.host.findServiceEntity(
293 client, "pubsub", "service"
294 )
129 295
130 def getFeatures(self, profile): 296 def getFeatures(self, profile):
131 try: 297 try:
132 client = self.host.getClient(profile) 298 client = self.host.getClient(profile)
133 except exceptions.ProfileNotSetError: 299 except exceptions.ProfileNotSetError:
134 return {} 300 return {}
135 try: 301 try:
136 return {'service': client.pubsub_service.full() if client.pubsub_service is not None else ''} 302 return {
303 "service": client.pubsub_service.full()
304 if client.pubsub_service is not None
305 else ""
306 }
137 except AttributeError: 307 except AttributeError:
138 if self.host.isConnected(profile): 308 if self.host.isConnected(profile):
139 log.debug("Profile is not connected, service is not checked yet") 309 log.debug("Profile is not connected, service is not checked yet")
140 else: 310 else:
141 log.error("Service should be available !") 311 log.error("Service should be available !")
152 rsm_request = None 322 rsm_request = None
153 extra = {} 323 extra = {}
154 else: 324 else:
155 # rsm 325 # rsm
156 rsm_args = {} 326 rsm_args = {}
157 for arg in ('max', 'after', 'before', 'index'): 327 for arg in ("max", "after", "before", "index"):
158 try: 328 try:
159 argname = "max_" if arg == 'max' else arg 329 argname = "max_" if arg == "max" else arg
160 rsm_args[argname] = extra.pop('rsm_{}'.format(arg)) 330 rsm_args[argname] = extra.pop("rsm_{}".format(arg))
161 except KeyError: 331 except KeyError:
162 continue 332 continue
163 333
164 if rsm_args: 334 if rsm_args:
165 rsm_request = rsm.RSMRequest(**rsm_args) 335 rsm_request = rsm.RSMRequest(**rsm_args)
166 else: 336 else:
167 rsm_request = None 337 rsm_request = None
168 338
169 # mam 339 # mam
170 mam_args = {} 340 mam_args = {}
171 for arg in ('start', 'end'): 341 for arg in ("start", "end"):
172 try: 342 try:
173 mam_args[arg] = datetime.datetime.fromtimestamp(int(extra.pop('{}{}'.format(MAM_FILTER, arg))), tz.tzutc()) 343 mam_args[arg] = datetime.datetime.fromtimestamp(
344 int(extra.pop("{}{}".format(MAM_FILTER, arg))), tz.tzutc()
345 )
174 except (TypeError, ValueError): 346 except (TypeError, ValueError):
175 log.warning(u"Bad value for {} filter".format(arg)) 347 log.warning(u"Bad value for {} filter".format(arg))
176 except KeyError: 348 except KeyError:
177 continue 349 continue
178 350
179 try: 351 try:
180 mam_args['with_jid'] = jid.JID(extra.pop('{}jid'.format(MAM_FILTER))) 352 mam_args["with_jid"] = jid.JID(extra.pop("{}jid".format(MAM_FILTER)))
181 except (jid.InvalidFormat): 353 except (jid.InvalidFormat):
182 log.warning(u"Bad value for jid filter") 354 log.warning(u"Bad value for jid filter")
183 except KeyError: 355 except KeyError:
184 pass 356 pass
185 357
186 for name, value in extra.iteritems(): 358 for name, value in extra.iteritems():
187 if name.startswith(MAM_FILTER): 359 if name.startswith(MAM_FILTER):
188 var = name[len(MAM_FILTER):] 360 var = name[len(MAM_FILTER) :]
189 extra_fields = mam_args.setdefault('extra_fields', []) 361 extra_fields = mam_args.setdefault("extra_fields", [])
190 extra_fields.append(data_form.Field(var=var, value=value)) 362 extra_fields.append(data_form.Field(var=var, value=value))
191 363
192 if mam_args: 364 if mam_args:
193 assert 'mam' not in extra 365 assert "mam" not in extra
194 extra['mam'] = mam.MAMRequest(mam.buildForm(**mam_args)) 366 extra["mam"] = mam.MAMRequest(mam.buildForm(**mam_args))
195 return Extra(rsm_request, extra) 367 return Extra(rsm_request, extra)
196 368
197 def addManagedNode(self, node, **kwargs): 369 def addManagedNode(self, node, **kwargs):
198 """Add a handler for a node 370 """Add a handler for a node
199 371
208 assert kwargs 380 assert kwargs
209 callbacks = self._node_cb.setdefault(node, {}) 381 callbacks = self._node_cb.setdefault(node, {})
210 for event, cb in kwargs.iteritems(): 382 for event, cb in kwargs.iteritems():
211 event_name = event[:-3] 383 event_name = event[:-3]
212 assert event_name in C.PS_EVENTS 384 assert event_name in C.PS_EVENTS
213 callbacks.setdefault(event_name,[]).append(cb) 385 callbacks.setdefault(event_name, []).append(cb)
214 386
215 def removeManagedNode(self, node, *args): 387 def removeManagedNode(self, node, *args):
216 """Add a handler for a node 388 """Add a handler for a node
217 389
218 @param node(unicode): node to monitor 390 @param node(unicode): node to monitor
229 try: 401 try:
230 cb_list.remove(callback) 402 cb_list.remove(callback)
231 except ValueError: 403 except ValueError:
232 pass 404 pass
233 else: 405 else:
234 log.debug(u"removed callback {cb} for event {event} on node {node}".format( 406 log.debug(
235 cb=callback, event=event, node=node)) 407 u"removed callback {cb} for event {event} on node {node}".format(
408 cb=callback, event=event, node=node
409 )
410 )
236 if not cb_list: 411 if not cb_list:
237 del registred_cb[event] 412 del registred_cb[event]
238 if not registred_cb: 413 if not registred_cb:
239 del self._node_cb[node] 414 del self._node_cb[node]
240 return 415 return
241 log.error(u"Trying to remove inexistant callback {cb} for node {node}".format(cb=callback, node=node)) 416 log.error(
417 u"Trying to remove inexistant callback {cb} for node {node}".format(
418 cb=callback, node=node
419 )
420 )
242 421
243 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): 422 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
244 # """Retrieve the name of the nodes that are accessible on the target service. 423 # """Retrieve the name of the nodes that are accessible on the target service.
245 424
246 # @param service (JID): target service 425 # @param service (JID): target service
268 # """ 447 # """
269 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) 448 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
270 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) 449 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
271 # return d 450 # return d
272 451
273 def _sendItem(self, service, nodeIdentifier, payload, item_id=None, extra=None, profile_key=C.PROF_KEY_NONE): 452 def _sendItem(
453 self,
454 service,
455 nodeIdentifier,
456 payload,
457 item_id=None,
458 extra=None,
459 profile_key=C.PROF_KEY_NONE,
460 ):
274 client = self.host.getClient(profile_key) 461 client = self.host.getClient(profile_key)
275 service = None if not service else jid.JID(service) 462 service = None if not service else jid.JID(service)
276 d = self.sendItem(client, service, nodeIdentifier, payload, item_id or None, extra) 463 d = self.sendItem(
277 d.addCallback(lambda ret: ret or u'') 464 client, service, nodeIdentifier, payload, item_id or None, extra
465 )
466 d.addCallback(lambda ret: ret or u"")
278 return d 467 return d
279 468
280 def _getPublishedItemId(self, iq_elt, original_id): 469 def _getPublishedItemId(self, iq_elt, original_id):
281 """return item of published id if found in answer 470 """return item of published id if found in answer
282 471
283 if not found original_id is returned, or empty string if it is None or empty string 472 if not found original_id is returned, or empty string if it is None or empty string
284 """ 473 """
285 try: 474 try:
286 item_id = iq_elt.pubsub.publish.item['id'] 475 item_id = iq_elt.pubsub.publish.item["id"]
287 except (AttributeError, KeyError): 476 except (AttributeError, KeyError):
288 item_id = None 477 item_id = None
289 return item_id or original_id 478 return item_id or original_id
290 479
291 def sendItem(self, client, service, nodeIdentifier, payload, item_id=None, extra=None): 480 def sendItem(
481 self, client, service, nodeIdentifier, payload, item_id=None, extra=None
482 ):
292 """high level method to send one item 483 """high level method to send one item
293 484
294 @param service(jid.JID, None): service to send the item to 485 @param service(jid.JID, None): service to send the item to
295 None to use PEP 486 None to use PEP
296 @param NodeIdentifier(unicode): PubSub node to use 487 @param NodeIdentifier(unicode): PubSub node to use
303 d = self.publish(client, service, nodeIdentifier, [item_elt]) 494 d = self.publish(client, service, nodeIdentifier, [item_elt])
304 d.addCallback(self._getPublishedItemId, item_id) 495 d.addCallback(self._getPublishedItemId, item_id)
305 return d 496 return d
306 497
307 def publish(self, client, service, nodeIdentifier, items=None): 498 def publish(self, client, service, nodeIdentifier, items=None):
308 return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid) 499 return client.pubsub_client.publish(
500 service, nodeIdentifier, items, client.pubsub_client.parent.jid
501 )
309 502
310 def _unwrapMAMMessage(self, message_elt): 503 def _unwrapMAMMessage(self, message_elt):
311 try: 504 try:
312 item_elt = (message_elt.elements(mam.NS_MAM, 'result').next() 505 item_elt = (
313 .elements(C.NS_FORWARD, 'forwarded').next() 506 message_elt.elements(mam.NS_MAM, "result")
314 .elements(C.NS_CLIENT, 'message').next() 507 .next()
315 .elements('http://jabber.org/protocol/pubsub#event', 'event').next() 508 .elements(C.NS_FORWARD, "forwarded")
316 .elements('http://jabber.org/protocol/pubsub#event', 'items').next() 509 .next()
317 .elements('http://jabber.org/protocol/pubsub#event', 'item').next()) 510 .elements(C.NS_CLIENT, "message")
511 .next()
512 .elements("http://jabber.org/protocol/pubsub#event", "event")
513 .next()
514 .elements("http://jabber.org/protocol/pubsub#event", "items")
515 .next()
516 .elements("http://jabber.org/protocol/pubsub#event", "item")
517 .next()
518 )
318 except StopIteration: 519 except StopIteration:
319 raise exceptions.DataError(u"Can't find Item in MAM message element") 520 raise exceptions.DataError(u"Can't find Item in MAM message element")
320 return item_elt 521 return item_elt
321 522
322 def _getItems(self, service='', node='', max_items=10, item_ids=None, sub_id=None, extra_dict=None, profile_key=C.PROF_KEY_NONE): 523 def _getItems(
524 self,
525 service="",
526 node="",
527 max_items=10,
528 item_ids=None,
529 sub_id=None,
530 extra_dict=None,
531 profile_key=C.PROF_KEY_NONE,
532 ):
323 """Get items from pubsub node 533 """Get items from pubsub node
324 534
325 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit 535 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
326 """ 536 """
327 client = self.host.getClient(profile_key) 537 client = self.host.getClient(profile_key)
328 service = jid.JID(service) if service else None 538 service = jid.JID(service) if service else None
329 max_items = None if max_items == C.NO_LIMIT else max_items 539 max_items = None if max_items == C.NO_LIMIT else max_items
330 extra = self.parseExtra(extra_dict) 540 extra = self.parseExtra(extra_dict)
331 d = self.getItems(client, service, node or None, max_items or None, item_ids, sub_id or None, extra.rsm_request, extra.extra) 541 d = self.getItems(
542 client,
543 service,
544 node or None,
545 max_items or None,
546 item_ids,
547 sub_id or None,
548 extra.rsm_request,
549 extra.extra,
550 )
332 d.addCallback(self.serItemsData) 551 d.addCallback(self.serItemsData)
333 return d 552 return d
334 553
335 def getItems(self, client, service, node, max_items=None, item_ids=None, sub_id=None, rsm_request=None, extra=None): 554 def getItems(
555 self,
556 client,
557 service,
558 node,
559 max_items=None,
560 item_ids=None,
561 sub_id=None,
562 rsm_request=None,
563 extra=None,
564 ):
336 """Retrieve pubsub items from a node. 565 """Retrieve pubsub items from a node.
337 566
338 @param service (JID, None): pubsub service. 567 @param service (JID, None): pubsub service.
339 @param node (str): node id. 568 @param node (str): node id.
340 @param max_items (int): optional limit on the number of retrieved items. 569 @param max_items (int): optional limit on the number of retrieved items.
352 if rsm_request and item_ids: 581 if rsm_request and item_ids:
353 raise ValueError(u"items_id can't be used with rsm") 582 raise ValueError(u"items_id can't be used with rsm")
354 if extra is None: 583 if extra is None:
355 extra = {} 584 extra = {}
356 try: 585 try:
357 mam_query = extra['mam'] 586 mam_query = extra["mam"]
358 except KeyError: 587 except KeyError:
359 d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, None, rsm_request) 588 d = client.pubsub_client.items(
589 service, node, max_items, item_ids, sub_id, None, rsm_request
590 )
360 else: 591 else:
361 # if mam is requested, we have to do a totally different query 592 # if mam is requested, we have to do a totally different query
362 if self._mam is None: 593 if self._mam is None:
363 raise exceptions.NotFound(u"MAM (XEP-0313) plugin is not available") 594 raise exceptions.NotFound(u"MAM (XEP-0313) plugin is not available")
364 if max_items is not None: 595 if max_items is not None:
366 if item_ids: 597 if item_ids:
367 raise exceptions.DataError(u"items_ids parameter can't be used with MAM") 598 raise exceptions.DataError(u"items_ids parameter can't be used with MAM")
368 if mam_query.node is None: 599 if mam_query.node is None:
369 mam_query.node = node 600 mam_query.node = node
370 elif mam_query.node != node: 601 elif mam_query.node != node:
371 raise exceptions.DataError(u"MAM query node is incoherent with getItems's node") 602 raise exceptions.DataError(
603 u"MAM query node is incoherent with getItems's node"
604 )
372 if mam_query.rsm is None: 605 if mam_query.rsm is None:
373 mam_query.rsm = rsm_request 606 mam_query.rsm = rsm_request
374 else: 607 else:
375 if mam_query.rsm != rsm_request: 608 if mam_query.rsm != rsm_request:
376 raise exceptions.DataError(u"Conflict between RSM request and MAM's RSM request") 609 raise exceptions.DataError(
610 u"Conflict between RSM request and MAM's RSM request"
611 )
377 d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage) 612 d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage)
378 613
379 try: 614 try:
380 subscribe = C.bool(extra['subscribe']) 615 subscribe = C.bool(extra["subscribe"])
381 except KeyError: 616 except KeyError:
382 subscribe = False 617 subscribe = False
383 618
384 def subscribeEb(failure, service, node): 619 def subscribeEb(failure, service, node):
385 failure.trap(error.StanzaError) 620 failure.trap(error.StanzaError)
386 log.warning(u"Could not subscribe to node {} on service {}: {}".format(node, unicode(service), unicode(failure.value))) 621 log.warning(
622 u"Could not subscribe to node {} on service {}: {}".format(
623 node, unicode(service), unicode(failure.value)
624 )
625 )
387 626
388 def doSubscribe(items): 627 def doSubscribe(items):
389 self.subscribe(service, node, profile_key=client.profile).addErrback(subscribeEb, service, node) 628 self.subscribe(service, node, profile_key=client.profile).addErrback(
629 subscribeEb, service, node
630 )
390 return items 631 return items
391 632
392 if subscribe: 633 if subscribe:
393 d.addCallback(doSubscribe) 634 d.addCallback(doSubscribe)
394 635
395 def addMetadata(result): 636 def addMetadata(result):
396 items, rsm_response = result 637 items, rsm_response = result
397 service_jid = service if service else client.jid.userhostJID() 638 service_jid = service if service else client.jid.userhostJID()
398 metadata = {'service': service_jid, 639 metadata = {
399 'node': node, 640 "service": service_jid,
400 'uri': self.getNodeURI(service_jid, node), 641 "node": node,
401 } 642 "uri": self.getNodeURI(service_jid, node),
643 }
402 if rsm_request is not None and rsm_response is not None: 644 if rsm_request is not None and rsm_response is not None:
403 metadata.update({'rsm_{}'.format(key): value for key, value in rsm_response.toDict().iteritems()}) 645 metadata.update(
646 {
647 "rsm_{}".format(key): value
648 for key, value in rsm_response.toDict().iteritems()
649 }
650 )
404 return (items, metadata) 651 return (items, metadata)
405 652
406 d.addCallback(addMetadata) 653 d.addCallback(addMetadata)
407 return d 654 return d
408 655
429 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) 676 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
430 # continue # avoid pubsub "item-not-found" error 677 # continue # avoid pubsub "item-not-found" error
431 # d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile) 678 # d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile)
432 # defer.returnValue(d_dict) 679 # defer.returnValue(d_dict)
433 680
434 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): 681 def getOptions(
435 client = self.host.getClient(profile_key) 682 self,
436 return client.pubsub_client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) 683 service,
437 684 nodeIdentifier,
438 def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): 685 subscriber,
439 client = self.host.getClient(profile_key) 686 subscriptionIdentifier=None,
440 return client.pubsub_client.setOptions(service, nodeIdentifier, subscriber, options, subscriptionIdentifier) 687 profile_key=C.PROF_KEY_NONE,
688 ):
689 client = self.host.getClient(profile_key)
690 return client.pubsub_client.getOptions(
691 service, nodeIdentifier, subscriber, subscriptionIdentifier
692 )
693
694 def setOptions(
695 self,
696 service,
697 nodeIdentifier,
698 subscriber,
699 options,
700 subscriptionIdentifier=None,
701 profile_key=C.PROF_KEY_NONE,
702 ):
703 client = self.host.getClient(profile_key)
704 return client.pubsub_client.setOptions(
705 service, nodeIdentifier, subscriber, options, subscriptionIdentifier
706 )
441 707
442 def _createNode(self, service_s, nodeIdentifier, options, profile_key): 708 def _createNode(self, service_s, nodeIdentifier, options, profile_key):
443 client = self.host.getClient(profile_key) 709 client = self.host.getClient(profile_key)
444 return self.createNode(client, jid.JID(service_s) if service_s else None, nodeIdentifier, options) 710 return self.createNode(
711 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
712 )
445 713
446 def createNode(self, client, service, nodeIdentifier=None, options=None): 714 def createNode(self, client, service, nodeIdentifier=None, options=None):
447 """Create a new node 715 """Create a new node
448 716
449 @param service(jid.JID): PubSub service, 717 @param service(jid.JID): PubSub service,
459 def createIfNewNode(self, client, service, nodeIdentifier, options=None): 727 def createIfNewNode(self, client, service, nodeIdentifier, options=None):
460 """Helper method similar to createNode, but will not fail in case of conflict""" 728 """Helper method similar to createNode, but will not fail in case of conflict"""
461 try: 729 try:
462 yield self.createNode(client, service, nodeIdentifier, options) 730 yield self.createNode(client, service, nodeIdentifier, options)
463 except error.StanzaError as e: 731 except error.StanzaError as e:
464 if e.condition == 'conflict': 732 if e.condition == "conflict":
465 pass 733 pass
466 else: 734 else:
467 raise e 735 raise e
468 736
469 def _getNodeConfiguration(self, service_s, nodeIdentifier, profile_key): 737 def _getNodeConfiguration(self, service_s, nodeIdentifier, profile_key):
470 client = self.host.getClient(profile_key) 738 client = self.host.getClient(profile_key)
471 d = self.getConfiguration(client, jid.JID(service_s) if service_s else None, nodeIdentifier) 739 d = self.getConfiguration(
740 client, jid.JID(service_s) if service_s else None, nodeIdentifier
741 )
742
472 def serialize(form): 743 def serialize(form):
473 # FIXME: better more generic dataform serialisation should be available in SàT 744 # FIXME: better more generic dataform serialisation should be available in SàT
474 return {f.var: unicode(f.value) for f in form.fields.values()} 745 return {f.var: unicode(f.value) for f in form.fields.values()}
746
475 d.addCallback(serialize) 747 d.addCallback(serialize)
476 return d 748 return d
477 749
478 def getConfiguration(self, client, service, nodeIdentifier): 750 def getConfiguration(self, client, service, nodeIdentifier):
479 request = pubsub.PubSubRequest('configureGet') 751 request = pubsub.PubSubRequest("configureGet")
480 request.recipient = service 752 request.recipient = service
481 request.nodeIdentifier = nodeIdentifier 753 request.nodeIdentifier = nodeIdentifier
482 754
483 def cb(iq): 755 def cb(iq):
484 form = data_form.findForm(iq.pubsub.configure, 756 form = data_form.findForm(iq.pubsub.configure, pubsub.NS_PUBSUB_NODE_CONFIG)
485 pubsub.NS_PUBSUB_NODE_CONFIG)
486 form.typeCheck() 757 form.typeCheck()
487 return form 758 return form
488 759
489 d = request.send(client.xmlstream) 760 d = request.send(client.xmlstream)
490 d.addCallback(cb) 761 d.addCallback(cb)
491 return d 762 return d
492 763
493 def _setNodeConfiguration(self, service_s, nodeIdentifier, options, profile_key): 764 def _setNodeConfiguration(self, service_s, nodeIdentifier, options, profile_key):
494 client = self.host.getClient(profile_key) 765 client = self.host.getClient(profile_key)
495 d = self.setConfiguration(client, jid.JID(service_s) if service_s else None, nodeIdentifier, options) 766 d = self.setConfiguration(
767 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
768 )
496 return d 769 return d
497 770
498 def setConfiguration(self, client, service, nodeIdentifier, options): 771 def setConfiguration(self, client, service, nodeIdentifier, options):
499 request = pubsub.PubSubRequest('configureSet') 772 request = pubsub.PubSubRequest("configureSet")
500 request.recipient = service 773 request.recipient = service
501 request.nodeIdentifier = nodeIdentifier 774 request.nodeIdentifier = nodeIdentifier
502 775
503 form = data_form.Form(formType='submit', 776 form = data_form.Form(
504 formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG) 777 formType="submit", formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG
778 )
505 form.makeFields(options) 779 form.makeFields(options)
506 request.options = form 780 request.options = form
507 781
508 d = request.send(client.xmlstream) 782 d = request.send(client.xmlstream)
509 return d 783 return d
510 784
511 def _getAffiliations(self, service_s, nodeIdentifier, profile_key): 785 def _getAffiliations(self, service_s, nodeIdentifier, profile_key):
512 client = self.host.getClient(profile_key) 786 client = self.host.getClient(profile_key)
513 d = self.getAffiliations(client, jid.JID(service_s) if service_s else None, nodeIdentifier or None) 787 d = self.getAffiliations(
788 client, jid.JID(service_s) if service_s else None, nodeIdentifier or None
789 )
514 return d 790 return d
515 791
516 def getAffiliations(self, client, service, nodeIdentifier=None): 792 def getAffiliations(self, client, service, nodeIdentifier=None):
517 """Retrieve affiliations of an entity 793 """Retrieve affiliations of an entity
518 794
519 @param nodeIdentifier(unicode, None): node to get affiliation from 795 @param nodeIdentifier(unicode, None): node to get affiliation from
520 None to get all nodes affiliations for this service 796 None to get all nodes affiliations for this service
521 """ 797 """
522 request = pubsub.PubSubRequest('affiliations') 798 request = pubsub.PubSubRequest("affiliations")
523 request.recipient = service 799 request.recipient = service
524 request.nodeIdentifier = nodeIdentifier 800 request.nodeIdentifier = nodeIdentifier
525 801
526 def cb(iq_elt): 802 def cb(iq_elt):
527 try: 803 try:
528 affiliations_elt = next(iq_elt.pubsub.elements((pubsub.NS_PUBSUB, 'affiliations'))) 804 affiliations_elt = next(
805 iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "affiliations"))
806 )
529 except StopIteration: 807 except StopIteration:
530 raise ValueError(_(u"Invalid result: missing <affiliations> element: {}").format(iq_elt.toXml)) 808 raise ValueError(
809 _(u"Invalid result: missing <affiliations> element: {}").format(
810 iq_elt.toXml
811 )
812 )
531 try: 813 try:
532 return {e['node']: e['affiliation'] for e in affiliations_elt.elements((pubsub.NS_PUBSUB, 'affiliation'))} 814 return {
815 e["node"]: e["affiliation"]
816 for e in affiliations_elt.elements((pubsub.NS_PUBSUB, "affiliation"))
817 }
533 except KeyError: 818 except KeyError:
534 raise ValueError(_(u"Invalid result: bad <affiliation> element: {}").format(iq_elt.toXml)) 819 raise ValueError(
820 _(u"Invalid result: bad <affiliation> element: {}").format(
821 iq_elt.toXml
822 )
823 )
535 824
536 d = request.send(client.xmlstream) 825 d = request.send(client.xmlstream)
537 d.addCallback(cb) 826 d.addCallback(cb)
538 return d 827 return d
539 828
540 def _getNodeAffiliations(self, service_s, nodeIdentifier, profile_key): 829 def _getNodeAffiliations(self, service_s, nodeIdentifier, profile_key):
541 client = self.host.getClient(profile_key) 830 client = self.host.getClient(profile_key)
542 d = self.getNodeAffiliations(client, jid.JID(service_s) if service_s else None, nodeIdentifier) 831 d = self.getNodeAffiliations(
543 d.addCallback(lambda affiliations: {j.full(): a for j, a in affiliations.iteritems()}) 832 client, jid.JID(service_s) if service_s else None, nodeIdentifier
833 )
834 d.addCallback(
835 lambda affiliations: {j.full(): a for j, a in affiliations.iteritems()}
836 )
544 return d 837 return d
545 838
546 def getNodeAffiliations(self, client, service, nodeIdentifier): 839 def getNodeAffiliations(self, client, service, nodeIdentifier):
547 """Retrieve affiliations of a node owned by profile""" 840 """Retrieve affiliations of a node owned by profile"""
548 request = pubsub.PubSubRequest('affiliationsGet') 841 request = pubsub.PubSubRequest("affiliationsGet")
549 request.recipient = service 842 request.recipient = service
550 request.nodeIdentifier = nodeIdentifier 843 request.nodeIdentifier = nodeIdentifier
551 844
552 def cb(iq_elt): 845 def cb(iq_elt):
553 try: 846 try:
554 affiliations_elt = next(iq_elt.pubsub.elements((pubsub.NS_PUBSUB_OWNER, 'affiliations'))) 847 affiliations_elt = next(
848 iq_elt.pubsub.elements((pubsub.NS_PUBSUB_OWNER, "affiliations"))
849 )
555 except StopIteration: 850 except StopIteration:
556 raise ValueError(_(u"Invalid result: missing <affiliations> element: {}").format(iq_elt.toXml)) 851 raise ValueError(
852 _(u"Invalid result: missing <affiliations> element: {}").format(
853 iq_elt.toXml
854 )
855 )
557 try: 856 try:
558 return {jid.JID(e['jid']): e['affiliation'] for e in affiliations_elt.elements((pubsub.NS_PUBSUB_OWNER, 'affiliation'))} 857 return {
858 jid.JID(e["jid"]): e["affiliation"]
859 for e in affiliations_elt.elements(
860 (pubsub.NS_PUBSUB_OWNER, "affiliation")
861 )
862 }
559 except KeyError: 863 except KeyError:
560 raise ValueError(_(u"Invalid result: bad <affiliation> element: {}").format(iq_elt.toXml)) 864 raise ValueError(
865 _(u"Invalid result: bad <affiliation> element: {}").format(
866 iq_elt.toXml
867 )
868 )
561 869
562 d = request.send(client.xmlstream) 870 d = request.send(client.xmlstream)
563 d.addCallback(cb) 871 d.addCallback(cb)
564 return d 872 return d
565 873
566 def _setNodeAffiliations(self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE): 874 def _setNodeAffiliations(
567 client = self.host.getClient(profile_key) 875 self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE
568 affiliations = {jid.JID(jid_): affiliation for jid_, affiliation in affiliations.iteritems()} 876 ):
569 d = self.setNodeAffiliations(client, jid.JID(service_s) if service_s else None, nodeIdentifier, affiliations) 877 client = self.host.getClient(profile_key)
878 affiliations = {
879 jid.JID(jid_): affiliation for jid_, affiliation in affiliations.iteritems()
880 }
881 d = self.setNodeAffiliations(
882 client,
883 jid.JID(service_s) if service_s else None,
884 nodeIdentifier,
885 affiliations,
886 )
570 return d 887 return d
571 888
572 def setNodeAffiliations(self, client, service, nodeIdentifier, affiliations): 889 def setNodeAffiliations(self, client, service, nodeIdentifier, affiliations):
573 """Update affiliations of a node owned by profile 890 """Update affiliations of a node owned by profile
574 891
575 @param affiliations(dict[jid.JID, unicode]): affiliations to set 892 @param affiliations(dict[jid.JID, unicode]): affiliations to set
576 check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations 893 check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations
577 """ 894 """
578 request = pubsub.PubSubRequest('affiliationsSet') 895 request = pubsub.PubSubRequest("affiliationsSet")
579 request.recipient = service 896 request.recipient = service
580 request.nodeIdentifier = nodeIdentifier 897 request.nodeIdentifier = nodeIdentifier
581 request.affiliations = affiliations 898 request.affiliations = affiliations
582 d = request.send(client.xmlstream) 899 d = request.send(client.xmlstream)
583 return d 900 return d
584 901
585 def _deleteNode(self, service_s, nodeIdentifier, profile_key): 902 def _deleteNode(self, service_s, nodeIdentifier, profile_key):
586 client = self.host.getClient(profile_key) 903 client = self.host.getClient(profile_key)
587 return self.deleteNode(client, jid.JID(service_s) if service_s else None, nodeIdentifier) 904 return self.deleteNode(
905 client, jid.JID(service_s) if service_s else None, nodeIdentifier
906 )
588 907
589 def deleteNode(self, client, service, nodeIdentifier): 908 def deleteNode(self, client, service, nodeIdentifier):
590 return client.pubsub_client.deleteNode(service, nodeIdentifier) 909 return client.pubsub_client.deleteNode(service, nodeIdentifier)
591 910
592 def _addWatch(self, service_s, node, profile_key): 911 def _addWatch(self, service_s, node, profile_key):
605 """ 924 """
606 client = self.host.getClient(profile_key) 925 client = self.host.getClient(profile_key)
607 service = jid.JID(service_s) if service_s else client.jid.userhostJID() 926 service = jid.JID(service_s) if service_s else client.jid.userhostJID()
608 client.pubsub_watching.remove((service, node)) 927 client.pubsub_watching.remove((service, node))
609 928
610 def _retractItem(self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key): 929 def _retractItem(
611 return self._retractItems(service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key) 930 self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key
612 931 ):
613 def _retractItems(self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key): 932 return self._retractItems(
614 return self.retractItems(jid.JID(service_s) if service_s else None, nodeIdentifier, itemIdentifiers, notify, profile_key) 933 service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key
615 934 )
616 def retractItems(self, service, nodeIdentifier, itemIdentifiers, notify=True, profile_key=C.PROF_KEY_NONE): 935
617 client = self.host.getClient(profile_key) 936 def _retractItems(
618 return client.pubsub_client.retractItems(service, nodeIdentifier, itemIdentifiers, notify=True) 937 self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key
938 ):
939 return self.retractItems(
940 jid.JID(service_s) if service_s else None,
941 nodeIdentifier,
942 itemIdentifiers,
943 notify,
944 profile_key,
945 )
946
947 def retractItems(
948 self,
949 service,
950 nodeIdentifier,
951 itemIdentifiers,
952 notify=True,
953 profile_key=C.PROF_KEY_NONE,
954 ):
955 client = self.host.getClient(profile_key)
956 return client.pubsub_client.retractItems(
957 service, nodeIdentifier, itemIdentifiers, notify=True
958 )
619 959
620 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): 960 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
621 client = self.host.getClient(profile_key) 961 client = self.host.getClient(profile_key)
622 service = None if not service else jid.JID(service) 962 service = None if not service else jid.JID(service)
623 d = self.subscribe(client, service, nodeIdentifier, options=options or None) 963 d = self.subscribe(client, service, nodeIdentifier, options=options or None)
624 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or u'') 964 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or u"")
625 return d 965 return d
626 966
627 def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None): 967 def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None):
628 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe 968 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe
629 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options) 969 return client.pubsub_client.subscribe(
970 service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options
971 )
630 972
631 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): 973 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
632 client = self.host.getClient(profile_key) 974 client = self.host.getClient(profile_key)
633 service = None if not service else jid.JID(service) 975 service = None if not service else jid.JID(service)
634 return self.unsubscribe(client, service, nodeIdentifier) 976 return self.unsubscribe(client, service, nodeIdentifier)
635 977
636 def unsubscribe(self, client, service, nodeIdentifier, sub_jid=None, subscriptionIdentifier=None, sender=None): 978 def unsubscribe(
637 return client.pubsub_client.unsubscribe(service, nodeIdentifier, sub_jid or client.jid.userhostJID(), subscriptionIdentifier, sender) 979 self,
638 980 client,
639 def _subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): 981 service,
982 nodeIdentifier,
983 sub_jid=None,
984 subscriptionIdentifier=None,
985 sender=None,
986 ):
987 return client.pubsub_client.unsubscribe(
988 service,
989 nodeIdentifier,
990 sub_jid or client.jid.userhostJID(),
991 subscriptionIdentifier,
992 sender,
993 )
994
995 def _subscriptions(self, service, nodeIdentifier="", profile_key=C.PROF_KEY_NONE):
640 client = self.host.getClient(profile_key) 996 client = self.host.getClient(profile_key)
641 service = None if not service else jid.JID(service) 997 service = None if not service else jid.JID(service)
642 998
643 def gotSubscriptions(subscriptions): 999 def gotSubscriptions(subscriptions):
644 # we replace pubsub.Subscription instance by dict that we can serialize 1000 # we replace pubsub.Subscription instance by dict that we can serialize
645 for idx, sub in enumerate(subscriptions): 1001 for idx, sub in enumerate(subscriptions):
646 sub_dict = {'node': sub.nodeIdentifier, 1002 sub_dict = {
647 'subscriber': sub.subscriber.full(), 1003 "node": sub.nodeIdentifier,
648 'state': sub.state 1004 "subscriber": sub.subscriber.full(),
649 } 1005 "state": sub.state,
1006 }
650 if sub.subscriptionIdentifier is not None: 1007 if sub.subscriptionIdentifier is not None:
651 sub_dict['id'] = sub.subscriptionIdentifier 1008 sub_dict["id"] = sub.subscriptionIdentifier
652 subscriptions[idx] = sub_dict 1009 subscriptions[idx] = sub_dict
653 1010
654 return subscriptions 1011 return subscriptions
655 1012
656 d = self.subscriptions(client, service, nodeIdentifier or None) 1013 d = self.subscriptions(client, service, nodeIdentifier or None)
677 """ 1034 """
678 assert service is not None 1035 assert service is not None
679 # XXX: urllib.urlencode use "&" to separate value, while XMPP URL (cf. RFC 5122) 1036 # XXX: urllib.urlencode use "&" to separate value, while XMPP URL (cf. RFC 5122)
680 # use ";" as a separator. So if more than one value is used in query_data, 1037 # use ";" as a separator. So if more than one value is used in query_data,
681 # urlencode MUST NOT BE USED. 1038 # urlencode MUST NOT BE USED.
682 query_data = [('node', node.encode('utf-8'))] 1039 query_data = [("node", node.encode("utf-8"))]
683 if item is not None: 1040 if item is not None:
684 query_data.append(('item', item.encode('utf-8'))) 1041 query_data.append(("item", item.encode("utf-8")))
685 return "xmpp:{service}?;{query}".format( 1042 return "xmpp:{service}?;{query}".format(
686 service=service.userhost(), 1043 service=service.userhost(), query=urllib.urlencode(query_data)
687 query=urllib.urlencode(query_data) 1044 ).decode("utf-8")
688 ).decode('utf-8')
689 1045
690 ## methods to manage several stanzas/jids at once ## 1046 ## methods to manage several stanzas/jids at once ##
691 1047
692 # generic # 1048 # generic #
693 1049
694 def getRTResults(self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE): 1050 def getRTResults(
1051 self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE
1052 ):
695 return self.rt_sessions.getResults(session_id, on_success, on_error, profile) 1053 return self.rt_sessions.getResults(session_id, on_success, on_error, profile)
696 1054
697 def serItemsData(self, items_data, item_cb=lambda item: item.toXml()): 1055 def serItemsData(self, items_data, item_cb=lambda item: item.toXml()):
698 """Helper method to serialise result from [getItems] 1056 """Helper method to serialise result from [getItems]
699 1057
703 @param items_data(tuple): tuple returned by [getItems] 1061 @param items_data(tuple): tuple returned by [getItems]
704 @param item_cb(callable): method to transform each item 1062 @param item_cb(callable): method to transform each item
705 @return (tuple): a serialised form ready to go throught bridge 1063 @return (tuple): a serialised form ready to go throught bridge
706 """ 1064 """
707 items, metadata = items_data 1065 items, metadata = items_data
708 return [item_cb(item) for item in items], {key: unicode(value) for key, value in metadata.iteritems()} 1066 return (
1067 [item_cb(item) for item in items],
1068 {key: unicode(value) for key, value in metadata.iteritems()},
1069 )
709 1070
710 def serItemsDataD(self, items_data, item_cb): 1071 def serItemsDataD(self, items_data, item_cb):
711 """Helper method to serialise result from [getItems], deferred version 1072 """Helper method to serialise result from [getItems], deferred version
712 1073
713 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) 1074 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode])
717 @param items_data(tuple): tuple returned by [getItems] 1078 @param items_data(tuple): tuple returned by [getItems]
718 @param item_cb(callable): method to transform each item (must return a deferred) 1079 @param item_cb(callable): method to transform each item (must return a deferred)
719 @return (tuple): a deferred which fire a serialised form ready to go throught bridge 1080 @return (tuple): a deferred which fire a serialised form ready to go throught bridge
720 """ 1081 """
721 items, metadata = items_data 1082 items, metadata = items_data
1083
722 def eb(failure): 1084 def eb(failure):
723 log.warning("Error while serialising/parsing item: {}".format(unicode(failure.value))) 1085 log.warning(
1086 "Error while serialising/parsing item: {}".format(unicode(failure.value))
1087 )
1088
724 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) 1089 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items])
1090
725 def finishSerialisation(serialised_items): 1091 def finishSerialisation(serialised_items):
726 return [item for item in serialised_items if item is not None], {key: unicode(value) for key, value in metadata.iteritems()} 1092 return (
1093 [item for item in serialised_items if item is not None],
1094 {key: unicode(value) for key, value in metadata.iteritems()},
1095 )
1096
727 d.addCallback(finishSerialisation) 1097 d.addCallback(finishSerialisation)
728 return d 1098 return d
729 1099
730 def serDList(self, results, failure_result=None): 1100 def serDList(self, results, failure_result=None):
731 """Serialise a DeferredList result 1101 """Serialise a DeferredList result
737 - failure: empty in case of success, else error message 1107 - failure: empty in case of success, else error message
738 - result 1108 - result
739 """ 1109 """
740 if failure_result is None: 1110 if failure_result is None:
741 failure_result = () 1111 failure_result = ()
742 return [('', result) if success else (unicode(result.result) or UNSPECIFIED, failure_result) for success, result in results] 1112 return [
1113 ("", result)
1114 if success
1115 else (unicode(result.result) or UNSPECIFIED, failure_result)
1116 for success, result in results
1117 ]
743 1118
744 # subscribe # 1119 # subscribe #
745 1120
746 def _getNodeSubscriptions(self, service_s, nodeIdentifier, profile_key): 1121 def _getNodeSubscriptions(self, service_s, nodeIdentifier, profile_key):
747 client = self.host.getClient(profile_key) 1122 client = self.host.getClient(profile_key)
748 d = self.getNodeSubscriptions(client, jid.JID(service_s) if service_s else None, nodeIdentifier) 1123 d = self.getNodeSubscriptions(
749 d.addCallback(lambda subscriptions: {j.full(): a for j, a in subscriptions.iteritems()}) 1124 client, jid.JID(service_s) if service_s else None, nodeIdentifier
1125 )
1126 d.addCallback(
1127 lambda subscriptions: {j.full(): a for j, a in subscriptions.iteritems()}
1128 )
750 return d 1129 return d
751 1130
752 def getNodeSubscriptions(self, client, service, nodeIdentifier): 1131 def getNodeSubscriptions(self, client, service, nodeIdentifier):
753 """Retrieve subscriptions to a node 1132 """Retrieve subscriptions to a node
754 1133
755 @param nodeIdentifier(unicode): node to get subscriptions from 1134 @param nodeIdentifier(unicode): node to get subscriptions from
756 """ 1135 """
757 if not nodeIdentifier: 1136 if not nodeIdentifier:
758 raise exceptions.DataError("node identifier can't be empty") 1137 raise exceptions.DataError("node identifier can't be empty")
759 request = pubsub.PubSubRequest('subscriptionsGet') 1138 request = pubsub.PubSubRequest("subscriptionsGet")
760 request.recipient = service 1139 request.recipient = service
761 request.nodeIdentifier = nodeIdentifier 1140 request.nodeIdentifier = nodeIdentifier
762 1141
763 def cb(iq_elt): 1142 def cb(iq_elt):
764 try: 1143 try:
765 subscriptions_elt = next(iq_elt.pubsub.elements((pubsub.NS_PUBSUB, 'subscriptions'))) 1144 subscriptions_elt = next(
1145 iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "subscriptions"))
1146 )
766 except StopIteration: 1147 except StopIteration:
767 raise ValueError(_(u"Invalid result: missing <subscriptions> element: {}").format(iq_elt.toXml)) 1148 raise ValueError(
1149 _(u"Invalid result: missing <subscriptions> element: {}").format(
1150 iq_elt.toXml
1151 )
1152 )
768 except AttributeError as e: 1153 except AttributeError as e:
769 raise ValueError(_(u"Invalid result: {}").format(e)) 1154 raise ValueError(_(u"Invalid result: {}").format(e))
770 try: 1155 try:
771 return {jid.JID(s['jid']): s['subscription'] for s in subscriptions_elt.elements((pubsub.NS_PUBSUB, 'subscription'))} 1156 return {
1157 jid.JID(s["jid"]): s["subscription"]
1158 for s in subscriptions_elt.elements(
1159 (pubsub.NS_PUBSUB, "subscription")
1160 )
1161 }
772 except KeyError: 1162 except KeyError:
773 raise ValueError(_(u"Invalid result: bad <subscription> element: {}").format(iq_elt.toXml)) 1163 raise ValueError(
1164 _(u"Invalid result: bad <subscription> element: {}").format(
1165 iq_elt.toXml
1166 )
1167 )
774 1168
775 d = request.send(client.xmlstream) 1169 d = request.send(client.xmlstream)
776 d.addCallback(cb) 1170 d.addCallback(cb)
777 return d 1171 return d
778 1172
779 def _setNodeSubscriptions(self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE): 1173 def _setNodeSubscriptions(
780 client = self.host.getClient(profile_key) 1174 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE
781 subscriptions = {jid.JID(jid_): subscription for jid_, subscription in subscriptions.iteritems()} 1175 ):
782 d = self.setNodeSubscriptions(client, jid.JID(service_s) if service_s else None, nodeIdentifier, subscriptions) 1176 client = self.host.getClient(profile_key)
1177 subscriptions = {
1178 jid.JID(jid_): subscription
1179 for jid_, subscription in subscriptions.iteritems()
1180 }
1181 d = self.setNodeSubscriptions(
1182 client,
1183 jid.JID(service_s) if service_s else None,
1184 nodeIdentifier,
1185 subscriptions,
1186 )
783 return d 1187 return d
784 1188
785 def setNodeSubscriptions(self, client, service, nodeIdentifier, subscriptions): 1189 def setNodeSubscriptions(self, client, service, nodeIdentifier, subscriptions):
786 """Set or update subscriptions of a node owned by profile 1190 """Set or update subscriptions of a node owned by profile
787 1191
788 @param subscriptions(dict[jid.JID, unicode]): subscriptions to set 1192 @param subscriptions(dict[jid.JID, unicode]): subscriptions to set
789 check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions 1193 check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions
790 """ 1194 """
791 request = pubsub.PubSubRequest('subscriptionsSet') 1195 request = pubsub.PubSubRequest("subscriptionsSet")
792 request.recipient = service 1196 request.recipient = service
793 request.nodeIdentifier = nodeIdentifier 1197 request.nodeIdentifier = nodeIdentifier
794 request.subscriptions = {pubsub.Subscription(nodeIdentifier, jid_, state) for jid_, state in subscriptions.iteritems()} 1198 request.subscriptions = {
1199 pubsub.Subscription(nodeIdentifier, jid_, state)
1200 for jid_, state in subscriptions.iteritems()
1201 }
795 d = request.send(client.xmlstream) 1202 d = request.send(client.xmlstream)
796 return d 1203 return d
797 1204
798 def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): 1205 def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
799 """Get real-time results for subcribeToManu session 1206 """Get real-time results for subcribeToManu session
806 - and node: pubsub node 1213 - and node: pubsub node
807 - failure(unicode): empty string in case of success, error message else 1214 - failure(unicode): empty string in case of success, error message else
808 @param profile_key: %(doc_profile_key)s 1215 @param profile_key: %(doc_profile_key)s
809 """ 1216 """
810 profile = self.host.getClient(profile_key).profile 1217 profile = self.host.getClient(profile_key).profile
811 d = self.rt_sessions.getResults(session_id, on_success=lambda result:'', on_error=lambda failure:unicode(failure.value), profile=profile) 1218 d = self.rt_sessions.getResults(
1219 session_id,
1220 on_success=lambda result: "",
1221 on_error=lambda failure: unicode(failure.value),
1222 profile=profile,
1223 )
812 # we need to convert jid.JID to unicode with full() to serialise it for the bridge 1224 # we need to convert jid.JID to unicode with full() to serialise it for the bridge
813 d.addCallback(lambda ret: (ret[0], [(service.full(), node, '' if success else failure or UNSPECIFIED) 1225 d.addCallback(
814 for (service, node), (success, failure) in ret[1].iteritems()])) 1226 lambda ret: (
815 return d 1227 ret[0],
816 1228 [
817 def _subscribeToMany(self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE): 1229 (service.full(), node, "" if success else failure or UNSPECIFIED)
818 return self.subscribeToMany([(jid.JID(service), unicode(node)) for service, node in node_data], jid.JID(subscriber), options, profile_key) 1230 for (service, node), (success, failure) in ret[1].iteritems()
819 1231 ],
820 def subscribeToMany(self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE): 1232 )
1233 )
1234 return d
1235
1236 def _subscribeToMany(
1237 self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE
1238 ):
1239 return self.subscribeToMany(
1240 [(jid.JID(service), unicode(node)) for service, node in node_data],
1241 jid.JID(subscriber),
1242 options,
1243 profile_key,
1244 )
1245
1246 def subscribeToMany(
1247 self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE
1248 ):
821 """Subscribe to several nodes at once. 1249 """Subscribe to several nodes at once.
822 1250
823 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: 1251 @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
824 - service (jid.JID) is the pubsub service 1252 - service (jid.JID) is the pubsub service
825 - node (unicode) is the node to subscribe to 1253 - node (unicode) is the node to subscribe to
829 @return (str): RT Deferred session id 1257 @return (str): RT Deferred session id
830 """ 1258 """
831 client = self.host.getClient(profile_key) 1259 client = self.host.getClient(profile_key)
832 deferreds = {} 1260 deferreds = {}
833 for service, node in node_data: 1261 for service, node in node_data:
834 deferreds[(service, node)] = client.pubsub_client.subscribe(service, node, subscriber, options=options) 1262 deferreds[(service, node)] = client.pubsub_client.subscribe(
1263 service, node, subscriber, options=options
1264 )
835 return self.rt_sessions.newSession(deferreds, client.profile) 1265 return self.rt_sessions.newSession(deferreds, client.profile)
836 # found_nodes = yield self.listNodes(service, profile=client.profile) 1266 # found_nodes = yield self.listNodes(service, profile=client.profile)
837 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) 1267 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
838 # d_list = [] 1268 # d_list = []
839 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): 1269 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
858 - failure (unicode): empty string in case of success, error message else 1288 - failure (unicode): empty string in case of success, error message else
859 - items (list[s]): raw XML of items 1289 - items (list[s]): raw XML of items
860 - metadata(dict): serialised metadata 1290 - metadata(dict): serialised metadata
861 """ 1291 """
862 profile = self.host.getClient(profile_key).profile 1292 profile = self.host.getClient(profile_key).profile
863 d = self.rt_sessions.getResults(session_id, 1293 d = self.rt_sessions.getResults(
864 on_success=lambda result: ('', self.serItemsData(result)), 1294 session_id,
865 on_error=lambda failure: (unicode(failure.value) or UNSPECIFIED, ([],{})), 1295 on_success=lambda result: ("", self.serItemsData(result)),
866 profile=profile) 1296 on_error=lambda failure: (unicode(failure.value) or UNSPECIFIED, ([], {})),
867 d.addCallback(lambda ret: (ret[0], 1297 profile=profile,
868 [(service.full(), node, failure, items, metadata) 1298 )
869 for (service, node), (success, (failure, (items, metadata))) in ret[1].iteritems()])) 1299 d.addCallback(
870 return d 1300 lambda ret: (
871 1301 ret[0],
872 def _getFromMany(self, node_data, max_item=10, extra_dict=None, profile_key=C.PROF_KEY_NONE): 1302 [
1303 (service.full(), node, failure, items, metadata)
1304 for (service, node), (success, (failure, (items, metadata))) in ret[
1305 1
1306 ].iteritems()
1307 ],
1308 )
1309 )
1310 return d
1311
def _getFromMany(
    self, node_data, max_item=10, extra_dict=None, profile_key=C.PROF_KEY_NONE
):
    """Convert plain arguments and delegate to getFromMany

    @param node_data (iterable[tuple]): iterable of (service, node) pairs;
        each service is parsed with jid.JID and each node is converted
        to unicode before being forwarded
    @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit
    @param extra_dict: value handed to self.parseExtra; the rsm_request and
        extra attributes of the parsed result are forwarded to getFromMany
    @param profile_key: forwarded unchanged to getFromMany
    @return: whatever getFromMany returns
    """
    # getFromMany uses None (not C.NO_LIMIT) to mean "no limit"
    max_item = None if max_item == C.NO_LIMIT else max_item
    extra = self.parseExtra(extra_dict)
    return self.getFromMany(
        [(jid.JID(service), unicode(node)) for service, node in node_data],
        max_item,
        extra.rsm_request,
        extra.extra,
        profile_key,
    )
1327
def getFromMany(
    self,
    node_data,
    max_item=None,
    rsm_request=None,
    extra=None,
    profile_key=C.PROF_KEY_NONE,
):
    """Get items from many nodes at once

    One self.getItems call is started per (service, node) pair and all the
    resulting deferreds are registered together in a new RT session.

    @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
        - service (jid.JID) is the pubsub service
        - node (unicode) is the node to get items from
    @param max_item: forwarded to getItems for every node
    @param rsm_request: forwarded to getItems as the rsm_request keyword
    @param extra: forwarded to getItems as the extra keyword
    @param profile_key: used to retrieve the client with self.host.getClient
    @return (str): RT Deferred session id
    """
    client = self.host.getClient(profile_key)
    # keyed by (service, node): a pair given twice keeps only its last deferred
    deferreds = {}
    for service, node in node_data:
        deferreds[(service, node)] = self.getItems(
            client, service, node, max_item, rsm_request=rsm_request, extra=extra
        )
    return self.rt_sessions.newSession(deferreds, client.profile)
896 1353
897 1354
898 class SatPubSubClient(rsm.PubSubClient): 1355 class SatPubSubClient(rsm.PubSubClient):
899 implements(disco.IDisco) 1356 implements(disco.IDisco)
928 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): 1385 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS):
929 callback(self.parent, event) 1386 callback(self.parent, event)
930 client = self.parent 1387 client = self.parent
931 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: 1388 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
932 raw_items = [i.toXml() for i in event.items] 1389 raw_items = [i.toXml() for i in event.items]
933 self.host.bridge.psEventRaw(event.sender.full(), event.nodeIdentifier, C.PS_ITEMS, raw_items, client.profile) 1390 self.host.bridge.psEventRaw(
1391 event.sender.full(),
1392 event.nodeIdentifier,
1393 C.PS_ITEMS,
1394 raw_items,
1395 client.profile,
1396 )
934 1397
def deleteReceived(self, event):
    """Handle a node deletion notification

    Every callback returned by self._getNodeCallbacks for the event's node
    and C.PS_DELETE is called with (self.parent, event). If the
    (sender, node) pair is in the client's pubsub_watching collection, the
    deletion is also sent through the psEventRaw bridge call with an empty
    item list.

    @param event: deletion event; its nodeIdentifier and sender attributes
        are read here
    """
    log.debug(u"Publish node deleted")
    for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE):
        callback(self.parent, event)
    client = self.parent
    if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
        # a deletion carries no item, hence the empty list
        self.host.bridge.psEventRaw(
            event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile
        )
942 1407
def subscriptions(self, service, nodeIdentifier, sender=None):
    """Return the list of subscriptions to the given service and node.

    @param service: The publish subscribe service to retrieve the subscriptions from.
    @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
    @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions).
    @type nodeIdentifier: C{unicode}
    @param sender: value set as the sender of the request (None by default)
    @return: the deferred obtained from request.send, with a callback chained
        so that its result is a list[pubsub.Subscription]
    """
    request = pubsub.PubSubRequest("subscriptions")
    request.recipient = service
    request.nodeIdentifier = nodeIdentifier
    request.sender = sender
    d = request.send(self.xmlstream)

    def cb(iq):
        # build one pubsub.Subscription per <subscription/> child of the answer
        subs = []
        for subscription_elt in iq.pubsub.subscriptions.elements(
            pubsub.NS_PUBSUB, "subscription"
        ):
            subscription = pubsub.Subscription(
                subscription_elt["node"],
                jid.JID(subscription_elt["jid"]),
                subscription_elt["subscription"],
                # subid is read with getAttribute, so a missing attribute
                # does not raise as the item lookups above would
                subscriptionIdentifier=subscription_elt.getAttribute("subid"),
            )
            subs.append(subscription)
        return subs

    return d.addCallback(cb)
969 1438
def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
    """Collect disco info through the "PubSub Disco Info" trigger point

    A new empty list is handed to the trigger together with the parent's
    profile, and that same list is returned, so anything a trigger handler
    appends to it ends up in the result.

    @param requestor: not used by this implementation
    @param service: not used by this implementation
    @param nodeIdentifier: not used by this implementation
    @return (list): the list that was passed to the trigger point
    """
    disco_info = []
    self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile)
    return disco_info
974 1443
def getDiscoItems(self, requestor, service, nodeIdentifier=""):
    """Return the disco items: always a new empty list

    @param requestor: not used by this implementation
    @param service: not used by this implementation
    @param nodeIdentifier: not used by this implementation
    @return (list): empty list
    """
    return []