Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0060.py @ 3028:ab2696e34d29
Python 3 port:
/!\ this is a huge commit
/!\ starting from this commit, SàT needs Python 3.6+
/!\ SàT may be unstable or some features may not work anymore; this will improve with time
This patch ports the backend, bridge and frontends to Python 3.
Roughly this has been done this way:
- the 2to3 tool has been applied (with Python 3.7)
- all references to python2 have been replaced with python3 (notably shebangs)
- fixed files not handled by 2to3 (notably the shell script)
- several manual fixes
- fixed issues reported by Python 3 that were not handled in Python 2
- replaced "async" with "async_" when needed (it's a reserved word since Python 3.7)
- replaced zope's "implements" with @implementer decorator
- temporary hack to handle data pickled in database, as str or bytes may be returned,
to be checked later
- fixed hash comparison for password
- removed some code which is not needed anymore with Python 3
- deactivated some code which needs to be checked (notably certificate validation)
- tested with jp, fixed reported issues until some basic commands worked
- ported Primitivus (after porting dependencies like urwid satext)
- more manual fixes
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 13 Aug 2019 19:08:41 +0200 |
parents | cd391ea847cb |
children | 93e8793a735a |
comparison
equal
deleted
inserted
replaced
3027:ff5bcb12ae60 | 3028:ab2696e34d29 |
---|---|
1 #!/usr/bin/env python2 | 1 #!/usr/bin/env python3 |
2 # -*- coding: utf-8 -*- | 2 # -*- coding: utf-8 -*- |
3 | 3 |
4 # SAT plugin for Publish-Subscribe (xep-0060) | 4 # SAT plugin for Publish-Subscribe (xep-0060) |
5 # Copyright (C) 2009-2019 Jérôme Poisson (goffi@goffi.org) | 5 # Copyright (C) 2009-2019 Jérôme Poisson (goffi@goffi.org) |
6 | 6 |
29 from twisted.words.protocols.jabber import jid, error | 29 from twisted.words.protocols.jabber import jid, error |
30 from twisted.internet import reactor, defer | 30 from twisted.internet import reactor, defer |
31 from wokkel import disco | 31 from wokkel import disco |
32 from wokkel import data_form | 32 from wokkel import data_form |
33 from wokkel import generic | 33 from wokkel import generic |
34 from zope.interface import implements | 34 from zope.interface import implementer |
35 from collections import namedtuple | 35 from collections import namedtuple |
36 import urllib | 36 import urllib.request, urllib.parse, urllib.error |
37 | 37 |
38 # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version | 38 # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version |
39 # mam and rsm come from sat_tmp.wokkel too | 39 # mam and rsm come from sat_tmp.wokkel too |
40 from wokkel import pubsub | 40 from wokkel import pubsub |
41 from wokkel import rsm | 41 from wokkel import rsm |
42 from wokkel import mam | 42 from wokkel import mam |
43 | 43 |
44 | 44 |
45 PLUGIN_INFO = { | 45 PLUGIN_INFO = { |
46 C.PI_NAME: u"Publish-Subscribe", | 46 C.PI_NAME: "Publish-Subscribe", |
47 C.PI_IMPORT_NAME: u"XEP-0060", | 47 C.PI_IMPORT_NAME: "XEP-0060", |
48 C.PI_TYPE: u"XEP", | 48 C.PI_TYPE: "XEP", |
49 C.PI_PROTOCOLS: [u"XEP-0060"], | 49 C.PI_PROTOCOLS: ["XEP-0060"], |
50 C.PI_DEPENDENCIES: [], | 50 C.PI_DEPENDENCIES: [], |
51 C.PI_RECOMMENDATIONS: [u"XEP-0059", u"XEP-0313"], | 51 C.PI_RECOMMENDATIONS: ["XEP-0059", "XEP-0313"], |
52 C.PI_MAIN: u"XEP_0060", | 52 C.PI_MAIN: "XEP_0060", |
53 C.PI_HANDLER: u"yes", | 53 C.PI_HANDLER: "yes", |
54 C.PI_DESCRIPTION: _(u"""Implementation of PubSub Protocol"""), | 54 C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol"""), |
55 } | 55 } |
56 | 56 |
57 UNSPECIFIED = "unspecified error" | 57 UNSPECIFIED = "unspecified error" |
58 | 58 |
59 | 59 |
80 ACCESS_AUTHORIZE = "authorize" | 80 ACCESS_AUTHORIZE = "authorize" |
81 ACCESS_WHITELIST = "whitelist" | 81 ACCESS_WHITELIST = "whitelist" |
82 ID_SINGLETON = "current" | 82 ID_SINGLETON = "current" |
83 | 83 |
84 def __init__(self, host): | 84 def __init__(self, host): |
85 log.info(_(u"PubSub plugin initialization")) | 85 log.info(_("PubSub plugin initialization")) |
86 self.host = host | 86 self.host = host |
87 self._rsm = host.plugins.get(u"XEP-0059") | 87 self._rsm = host.plugins.get("XEP-0059") |
88 self._mam = host.plugins.get(u"XEP-0313") | 88 self._mam = host.plugins.get("XEP-0313") |
89 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) | 89 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) |
90 self.rt_sessions = sat_defer.RTDeferredSessions() | 90 self.rt_sessions = sat_defer.RTDeferredSessions() |
91 host.bridge.addMethod( | 91 host.bridge.addMethod( |
92 "psNodeCreate", | 92 "psNodeCreate", |
93 ".plugin", | 93 ".plugin", |
94 in_sign="ssa{ss}s", | 94 in_sign="ssa{ss}s", |
95 out_sign="s", | 95 out_sign="s", |
96 method=self._createNode, | 96 method=self._createNode, |
97 async=True, | 97 async_=True, |
98 ) | 98 ) |
99 host.bridge.addMethod( | 99 host.bridge.addMethod( |
100 "psNodeConfigurationGet", | 100 "psNodeConfigurationGet", |
101 ".plugin", | 101 ".plugin", |
102 in_sign="sss", | 102 in_sign="sss", |
103 out_sign="a{ss}", | 103 out_sign="a{ss}", |
104 method=self._getNodeConfiguration, | 104 method=self._getNodeConfiguration, |
105 async=True, | 105 async_=True, |
106 ) | 106 ) |
107 host.bridge.addMethod( | 107 host.bridge.addMethod( |
108 "psNodeConfigurationSet", | 108 "psNodeConfigurationSet", |
109 ".plugin", | 109 ".plugin", |
110 in_sign="ssa{ss}s", | 110 in_sign="ssa{ss}s", |
111 out_sign="", | 111 out_sign="", |
112 method=self._setNodeConfiguration, | 112 method=self._setNodeConfiguration, |
113 async=True, | 113 async_=True, |
114 ) | 114 ) |
115 host.bridge.addMethod( | 115 host.bridge.addMethod( |
116 "psNodeAffiliationsGet", | 116 "psNodeAffiliationsGet", |
117 ".plugin", | 117 ".plugin", |
118 in_sign="sss", | 118 in_sign="sss", |
119 out_sign="a{ss}", | 119 out_sign="a{ss}", |
120 method=self._getNodeAffiliations, | 120 method=self._getNodeAffiliations, |
121 async=True, | 121 async_=True, |
122 ) | 122 ) |
123 host.bridge.addMethod( | 123 host.bridge.addMethod( |
124 "psNodeAffiliationsSet", | 124 "psNodeAffiliationsSet", |
125 ".plugin", | 125 ".plugin", |
126 in_sign="ssa{ss}s", | 126 in_sign="ssa{ss}s", |
127 out_sign="", | 127 out_sign="", |
128 method=self._setNodeAffiliations, | 128 method=self._setNodeAffiliations, |
129 async=True, | 129 async_=True, |
130 ) | 130 ) |
131 host.bridge.addMethod( | 131 host.bridge.addMethod( |
132 "psNodeSubscriptionsGet", | 132 "psNodeSubscriptionsGet", |
133 ".plugin", | 133 ".plugin", |
134 in_sign="sss", | 134 in_sign="sss", |
135 out_sign="a{ss}", | 135 out_sign="a{ss}", |
136 method=self._getNodeSubscriptions, | 136 method=self._getNodeSubscriptions, |
137 async=True, | 137 async_=True, |
138 ) | 138 ) |
139 host.bridge.addMethod( | 139 host.bridge.addMethod( |
140 "psNodeSubscriptionsSet", | 140 "psNodeSubscriptionsSet", |
141 ".plugin", | 141 ".plugin", |
142 in_sign="ssa{ss}s", | 142 in_sign="ssa{ss}s", |
143 out_sign="", | 143 out_sign="", |
144 method=self._setNodeSubscriptions, | 144 method=self._setNodeSubscriptions, |
145 async=True, | 145 async_=True, |
146 ) | 146 ) |
147 host.bridge.addMethod( | 147 host.bridge.addMethod( |
148 "psNodePurge", | 148 "psNodePurge", |
149 ".plugin", | 149 ".plugin", |
150 in_sign="sss", | 150 in_sign="sss", |
151 out_sign="", | 151 out_sign="", |
152 method=self._purgeNode, | 152 method=self._purgeNode, |
153 async=True, | 153 async_=True, |
154 ) | 154 ) |
155 host.bridge.addMethod( | 155 host.bridge.addMethod( |
156 "psNodeDelete", | 156 "psNodeDelete", |
157 ".plugin", | 157 ".plugin", |
158 in_sign="sss", | 158 in_sign="sss", |
159 out_sign="", | 159 out_sign="", |
160 method=self._deleteNode, | 160 method=self._deleteNode, |
161 async=True, | 161 async_=True, |
162 ) | 162 ) |
163 host.bridge.addMethod( | 163 host.bridge.addMethod( |
164 "psNodeWatchAdd", | 164 "psNodeWatchAdd", |
165 ".plugin", | 165 ".plugin", |
166 in_sign="sss", | 166 in_sign="sss", |
167 out_sign="", | 167 out_sign="", |
168 method=self._addWatch, | 168 method=self._addWatch, |
169 async=False, | 169 async_=False, |
170 ) | 170 ) |
171 host.bridge.addMethod( | 171 host.bridge.addMethod( |
172 "psNodeWatchRemove", | 172 "psNodeWatchRemove", |
173 ".plugin", | 173 ".plugin", |
174 in_sign="sss", | 174 in_sign="sss", |
175 out_sign="", | 175 out_sign="", |
176 method=self._removeWatch, | 176 method=self._removeWatch, |
177 async=False, | 177 async_=False, |
178 ) | 178 ) |
179 host.bridge.addMethod( | 179 host.bridge.addMethod( |
180 "psAffiliationsGet", | 180 "psAffiliationsGet", |
181 ".plugin", | 181 ".plugin", |
182 in_sign="sss", | 182 in_sign="sss", |
183 out_sign="a{ss}", | 183 out_sign="a{ss}", |
184 method=self._getAffiliations, | 184 method=self._getAffiliations, |
185 async=True, | 185 async_=True, |
186 ) | 186 ) |
187 host.bridge.addMethod( | 187 host.bridge.addMethod( |
188 "psItemsGet", | 188 "psItemsGet", |
189 ".plugin", | 189 ".plugin", |
190 in_sign="ssiassa{ss}s", | 190 in_sign="ssiassa{ss}s", |
191 out_sign="(asa{ss})", | 191 out_sign="(asa{ss})", |
192 method=self._getItems, | 192 method=self._getItems, |
193 async=True, | 193 async_=True, |
194 ) | 194 ) |
195 host.bridge.addMethod( | 195 host.bridge.addMethod( |
196 "psItemSend", | 196 "psItemSend", |
197 ".plugin", | 197 ".plugin", |
198 in_sign="ssssa{ss}s", | 198 in_sign="ssssa{ss}s", |
199 out_sign="s", | 199 out_sign="s", |
200 method=self._sendItem, | 200 method=self._sendItem, |
201 async=True, | 201 async_=True, |
202 ) | 202 ) |
203 host.bridge.addMethod( | 203 host.bridge.addMethod( |
204 "psItemsSend", | 204 "psItemsSend", |
205 ".plugin", | 205 ".plugin", |
206 in_sign="ssasa{ss}s", | 206 in_sign="ssasa{ss}s", |
207 out_sign="as", | 207 out_sign="as", |
208 method=self._sendItems, | 208 method=self._sendItems, |
209 async=True, | 209 async_=True, |
210 ) | 210 ) |
211 host.bridge.addMethod( | 211 host.bridge.addMethod( |
212 "psRetractItem", | 212 "psRetractItem", |
213 ".plugin", | 213 ".plugin", |
214 in_sign="sssbs", | 214 in_sign="sssbs", |
215 out_sign="", | 215 out_sign="", |
216 method=self._retractItem, | 216 method=self._retractItem, |
217 async=True, | 217 async_=True, |
218 ) | 218 ) |
219 host.bridge.addMethod( | 219 host.bridge.addMethod( |
220 "psRetractItems", | 220 "psRetractItems", |
221 ".plugin", | 221 ".plugin", |
222 in_sign="ssasbs", | 222 in_sign="ssasbs", |
223 out_sign="", | 223 out_sign="", |
224 method=self._retractItems, | 224 method=self._retractItems, |
225 async=True, | 225 async_=True, |
226 ) | 226 ) |
227 host.bridge.addMethod( | 227 host.bridge.addMethod( |
228 "psSubscribe", | 228 "psSubscribe", |
229 ".plugin", | 229 ".plugin", |
230 in_sign="ssa{ss}s", | 230 in_sign="ssa{ss}s", |
231 out_sign="s", | 231 out_sign="s", |
232 method=self._subscribe, | 232 method=self._subscribe, |
233 async=True, | 233 async_=True, |
234 ) | 234 ) |
235 host.bridge.addMethod( | 235 host.bridge.addMethod( |
236 "psUnsubscribe", | 236 "psUnsubscribe", |
237 ".plugin", | 237 ".plugin", |
238 in_sign="sss", | 238 in_sign="sss", |
239 out_sign="", | 239 out_sign="", |
240 method=self._unsubscribe, | 240 method=self._unsubscribe, |
241 async=True, | 241 async_=True, |
242 ) | 242 ) |
243 host.bridge.addMethod( | 243 host.bridge.addMethod( |
244 "psSubscriptionsGet", | 244 "psSubscriptionsGet", |
245 ".plugin", | 245 ".plugin", |
246 in_sign="sss", | 246 in_sign="sss", |
247 out_sign="aa{ss}", | 247 out_sign="aa{ss}", |
248 method=self._subscriptions, | 248 method=self._subscriptions, |
249 async=True, | 249 async_=True, |
250 ) | 250 ) |
251 host.bridge.addMethod( | 251 host.bridge.addMethod( |
252 "psSubscribeToMany", | 252 "psSubscribeToMany", |
253 ".plugin", | 253 ".plugin", |
254 in_sign="a(ss)sa{ss}s", | 254 in_sign="a(ss)sa{ss}s", |
259 "psGetSubscribeRTResult", | 259 "psGetSubscribeRTResult", |
260 ".plugin", | 260 ".plugin", |
261 in_sign="ss", | 261 in_sign="ss", |
262 out_sign="(ua(sss))", | 262 out_sign="(ua(sss))", |
263 method=self._manySubscribeRTResult, | 263 method=self._manySubscribeRTResult, |
264 async=True, | 264 async_=True, |
265 ) | 265 ) |
266 host.bridge.addMethod( | 266 host.bridge.addMethod( |
267 "psGetFromMany", | 267 "psGetFromMany", |
268 ".plugin", | 268 ".plugin", |
269 in_sign="a(ss)ia{ss}s", | 269 in_sign="a(ss)ia{ss}s", |
274 "psGetFromManyRTResult", | 274 "psGetFromManyRTResult", |
275 ".plugin", | 275 ".plugin", |
276 in_sign="ss", | 276 in_sign="ss", |
277 out_sign="(ua(sssasa{ss}))", | 277 out_sign="(ua(sssasa{ss}))", |
278 method=self._getFromManyRTResult, | 278 method=self._getFromManyRTResult, |
279 async=True, | 279 async_=True, |
280 ) | 280 ) |
281 | 281 |
282 # high level observer method | 282 # high level observer method |
283 host.bridge.addSignal( | 283 host.bridge.addSignal( |
284 "psEvent", ".plugin", signature="ssssss" | 284 "psEvent", ".plugin", signature="ssssss" |
301 self.host.memory.getConfig("", "pubsub_service") | 301 self.host.memory.getConfig("", "pubsub_service") |
302 ) | 302 ) |
303 except RuntimeError: | 303 except RuntimeError: |
304 log.info( | 304 log.info( |
305 _( | 305 _( |
306 u"Can't retrieve pubsub_service from conf, we'll use first one that we find" | 306 "Can't retrieve pubsub_service from conf, we'll use first one that we find" |
307 ) | 307 ) |
308 ) | 308 ) |
309 client.pubsub_service = yield self.host.findServiceEntity( | 309 client.pubsub_service = yield self.host.findServiceEntity( |
310 client, "pubsub", "service" | 310 client, "pubsub", "service" |
311 ) | 311 ) |
357 mam_request = None | 357 mam_request = None |
358 else: | 358 else: |
359 mam_request = self._mam.parseExtra(extra, with_rsm=False) | 359 mam_request = self._mam.parseExtra(extra, with_rsm=False) |
360 | 360 |
361 if mam_request is not None: | 361 if mam_request is not None: |
362 assert u"mam" not in extra | 362 assert "mam" not in extra |
363 extra[u"mam"] = mam_request | 363 extra["mam"] = mam_request |
364 | 364 |
365 return Extra(rsm_request, extra) | 365 return Extra(rsm_request, extra) |
366 | 366 |
367 def addManagedNode(self, node, **kwargs): | 367 def addManagedNode(self, node, **kwargs): |
368 """Add a handler for a node | 368 """Add a handler for a node |
375 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE | 375 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE |
376 """ | 376 """ |
377 assert node is not None | 377 assert node is not None |
378 assert kwargs | 378 assert kwargs |
379 callbacks = self._node_cb.setdefault(node, {}) | 379 callbacks = self._node_cb.setdefault(node, {}) |
380 for event, cb in kwargs.iteritems(): | 380 for event, cb in kwargs.items(): |
381 event_name = event[:-3] | 381 event_name = event[:-3] |
382 assert event_name in C.PS_EVENTS | 382 assert event_name in C.PS_EVENTS |
383 callbacks.setdefault(event_name, []).append(cb) | 383 callbacks.setdefault(event_name, []).append(cb) |
384 | 384 |
385 def removeManagedNode(self, node, *args): | 385 def removeManagedNode(self, node, *args): |
393 registred_cb = self._node_cb[node] | 393 registred_cb = self._node_cb[node] |
394 except KeyError: | 394 except KeyError: |
395 pass | 395 pass |
396 else: | 396 else: |
397 for callback in args: | 397 for callback in args: |
398 for event, cb_list in registred_cb.iteritems(): | 398 for event, cb_list in registred_cb.items(): |
399 try: | 399 try: |
400 cb_list.remove(callback) | 400 cb_list.remove(callback) |
401 except ValueError: | 401 except ValueError: |
402 pass | 402 pass |
403 else: | 403 else: |
404 log.debug( | 404 log.debug( |
405 u"removed callback {cb} for event {event} on node {node}".format( | 405 "removed callback {cb} for event {event} on node {node}".format( |
406 cb=callback, event=event, node=node | 406 cb=callback, event=event, node=node |
407 ) | 407 ) |
408 ) | 408 ) |
409 if not cb_list: | 409 if not cb_list: |
410 del registred_cb[event] | 410 del registred_cb[event] |
411 if not registred_cb: | 411 if not registred_cb: |
412 del self._node_cb[node] | 412 del self._node_cb[node] |
413 return | 413 return |
414 log.error( | 414 log.error( |
415 u"Trying to remove inexistant callback {cb} for node {node}".format( | 415 "Trying to remove inexistant callback {cb} for node {node}".format( |
416 cb=callback, node=node | 416 cb=callback, node=node |
417 ) | 417 ) |
418 ) | 418 ) |
419 | 419 |
420 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): | 420 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): |
452 client = self.host.getClient(profile_key) | 452 client = self.host.getClient(profile_key) |
453 service = None if not service else jid.JID(service) | 453 service = None if not service else jid.JID(service) |
454 d = self.sendItem( | 454 d = self.sendItem( |
455 client, service, nodeIdentifier, payload, item_id or None, extra | 455 client, service, nodeIdentifier, payload, item_id or None, extra |
456 ) | 456 ) |
457 d.addCallback(lambda ret: ret or u"") | 457 d.addCallback(lambda ret: ret or "") |
458 return d | 458 return d |
459 | 459 |
460 def _sendItems(self, service, nodeIdentifier, items, extra=None, | 460 def _sendItems(self, service, nodeIdentifier, items, extra=None, |
461 profile_key=C.PROF_KEY_NONE): | 461 profile_key=C.PROF_KEY_NONE): |
462 client = self.host.getClient(profile_key) | 462 client = self.host.getClient(profile_key) |
463 service = None if not service else jid.JID(service) | 463 service = None if not service else jid.JID(service) |
464 try: | 464 try: |
465 items = [generic.parseXml(item.encode('utf-8')) for item in items] | 465 items = [generic.parseXml(item.encode('utf-8')) for item in items] |
466 except Exception as e: | 466 except Exception as e: |
467 raise exceptions.DataError(_(u"Can't parse items: {msg}").format( | 467 raise exceptions.DataError(_("Can't parse items: {msg}").format( |
468 msg=e)) | 468 msg=e)) |
469 d = self.sendItems( | 469 d = self.sendItems( |
470 client, service, nodeIdentifier, items, extra | 470 client, service, nodeIdentifier, items, extra |
471 ) | 471 ) |
472 return d | 472 return d |
502 | 502 |
503 def _publishCb(self, iq_result): | 503 def _publishCb(self, iq_result): |
504 """Parse publish result, and return ids given by pubsub service""" | 504 """Parse publish result, and return ids given by pubsub service""" |
505 try: | 505 try: |
506 item_ids = [item['id'] | 506 item_ids = [item['id'] |
507 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, u'item')] | 507 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')] |
508 except AttributeError: | 508 except AttributeError: |
509 return [] | 509 return [] |
510 return item_ids | 510 return item_ids |
511 | 511 |
512 def sendItems(self, client, service, nodeIdentifier, items, extra=None): | 512 def sendItems(self, client, service, nodeIdentifier, items, extra=None): |
520 @param extra(dict, None): extra option, not used yet | 520 @param extra(dict, None): extra option, not used yet |
521 @return (list[unicode]): ids of the created items | 521 @return (list[unicode]): ids of the created items |
522 """ | 522 """ |
523 parsed_items = [] | 523 parsed_items = [] |
524 for item in items: | 524 for item in items: |
525 if item.name != u'item': | 525 if item.name != 'item': |
526 raise exceptions.DataError(_(u"Invalid item: {xml}").format(item.toXml())) | 526 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml())) |
527 item_id = item.getAttribute(u"id") | 527 item_id = item.getAttribute("id") |
528 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) | 528 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) |
529 d = self.publish(client, service, nodeIdentifier, parsed_items) | 529 d = self.publish(client, service, nodeIdentifier, parsed_items) |
530 d.addCallback(self._publishCb) | 530 d.addCallback(self._publishCb) |
531 return d | 531 return d |
532 | 532 |
536 ) | 536 ) |
537 | 537 |
538 def _unwrapMAMMessage(self, message_elt): | 538 def _unwrapMAMMessage(self, message_elt): |
539 try: | 539 try: |
540 item_elt = ( | 540 item_elt = ( |
541 message_elt.elements(mam.NS_MAM, "result").next() | 541 next(message_elt.elements(mam.NS_MAM, "result").next() |
542 .elements(C.NS_FORWARD, "forwarded").next() | 542 .elements(C.NS_FORWARD, "forwarded").next() |
543 .elements(C.NS_CLIENT, "message").next() | 543 .elements(C.NS_CLIENT, "message").next() |
544 .elements("http://jabber.org/protocol/pubsub#event", "event").next() | 544 .elements("http://jabber.org/protocol/pubsub#event", "event").next() |
545 .elements("http://jabber.org/protocol/pubsub#event", "items").next() | 545 .elements("http://jabber.org/protocol/pubsub#event", "items").next() |
546 .elements("http://jabber.org/protocol/pubsub#event", "item").next() | 546 .elements("http://jabber.org/protocol/pubsub#event", "item")) |
547 ) | 547 ) |
548 except StopIteration: | 548 except StopIteration: |
549 raise exceptions.DataError(u"Can't find Item in MAM message element") | 549 raise exceptions.DataError("Can't find Item in MAM message element") |
550 return item_elt | 550 return item_elt |
551 | 551 |
552 def _getItems(self, service="", node="", max_items=10, item_ids=None, sub_id=None, | 552 def _getItems(self, service="", node="", max_items=10, item_ids=None, sub_id=None, |
553 extra_dict=None, profile_key=C.PROF_KEY_NONE): | 553 extra_dict=None, profile_key=C.PROF_KEY_NONE): |
554 """Get items from pubsub node | 554 """Get items from pubsub node |
591 - service, node: service and node used | 591 - service, node: service and node used |
592 """ | 592 """ |
593 if item_ids and max_items is not None: | 593 if item_ids and max_items is not None: |
594 max_items = None | 594 max_items = None |
595 if rsm_request and item_ids: | 595 if rsm_request and item_ids: |
596 raise ValueError(u"items_id can't be used with rsm") | 596 raise ValueError("items_id can't be used with rsm") |
597 if extra is None: | 597 if extra is None: |
598 extra = {} | 598 extra = {} |
599 try: | 599 try: |
600 mam_query = extra["mam"] | 600 mam_query = extra["mam"] |
601 except KeyError: | 601 except KeyError: |
614 d.addErrback(sat_defer.stanza2NotFound) | 614 d.addErrback(sat_defer.stanza2NotFound) |
615 d.addTimeout(TIMEOUT, reactor) | 615 d.addTimeout(TIMEOUT, reactor) |
616 else: | 616 else: |
617 # if mam is requested, we have to do a totally different query | 617 # if mam is requested, we have to do a totally different query |
618 if self._mam is None: | 618 if self._mam is None: |
619 raise exceptions.NotFound(u"MAM (XEP-0313) plugin is not available") | 619 raise exceptions.NotFound("MAM (XEP-0313) plugin is not available") |
620 if max_items is not None: | 620 if max_items is not None: |
621 raise exceptions.DataError(u"max_items parameter can't be used with MAM") | 621 raise exceptions.DataError("max_items parameter can't be used with MAM") |
622 if item_ids: | 622 if item_ids: |
623 raise exceptions.DataError(u"items_ids parameter can't be used with MAM") | 623 raise exceptions.DataError("items_ids parameter can't be used with MAM") |
624 if mam_query.node is None: | 624 if mam_query.node is None: |
625 mam_query.node = node | 625 mam_query.node = node |
626 elif mam_query.node != node: | 626 elif mam_query.node != node: |
627 raise exceptions.DataError( | 627 raise exceptions.DataError( |
628 u"MAM query node is incoherent with getItems's node" | 628 "MAM query node is incoherent with getItems's node" |
629 ) | 629 ) |
630 if mam_query.rsm is None: | 630 if mam_query.rsm is None: |
631 mam_query.rsm = rsm_request | 631 mam_query.rsm = rsm_request |
632 else: | 632 else: |
633 if mam_query.rsm != rsm_request: | 633 if mam_query.rsm != rsm_request: |
634 raise exceptions.DataError( | 634 raise exceptions.DataError( |
635 u"Conflict between RSM request and MAM's RSM request" | 635 "Conflict between RSM request and MAM's RSM request" |
636 ) | 636 ) |
637 d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage) | 637 d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage) |
638 | 638 |
639 try: | 639 try: |
640 subscribe = C.bool(extra["subscribe"]) | 640 subscribe = C.bool(extra["subscribe"]) |
642 subscribe = False | 642 subscribe = False |
643 | 643 |
644 def subscribeEb(failure, service, node): | 644 def subscribeEb(failure, service, node): |
645 failure.trap(error.StanzaError) | 645 failure.trap(error.StanzaError) |
646 log.warning( | 646 log.warning( |
647 u"Could not subscribe to node {} on service {}: {}".format( | 647 "Could not subscribe to node {} on service {}: {}".format( |
648 node, unicode(service), unicode(failure.value) | 648 node, str(service), str(failure.value) |
649 ) | 649 ) |
650 ) | 650 ) |
651 | 651 |
652 def doSubscribe(data): | 652 def doSubscribe(data): |
653 self.subscribe(client, service, node).addErrback( | 653 self.subscribe(client, service, node).addErrback( |
668 "uri": self.getNodeURI(service_jid, node), | 668 "uri": self.getNodeURI(service_jid, node), |
669 } | 669 } |
670 if rsm_request is not None and rsm_response is not None: | 670 if rsm_request is not None and rsm_response is not None: |
671 metadata.update( | 671 metadata.update( |
672 { | 672 { |
673 u"rsm_" + key: value | 673 "rsm_" + key: value |
674 for key, value in rsm_response.toDict().iteritems() | 674 for key, value in rsm_response.toDict().items() |
675 } | 675 } |
676 ) | 676 ) |
677 if mam_response is not None: | 677 if mam_response is not None: |
678 for key, value in mam_response.iteritems(): | 678 for key, value in mam_response.items(): |
679 metadata[u"mam_" + key] = value | 679 metadata["mam_" + key] = value |
680 return (items, metadata) | 680 return (items, metadata) |
681 | 681 |
682 d.addCallback(addMetadata) | 682 d.addCallback(addMetadata) |
683 return d | 683 return d |
684 | 684 |
756 client, jid.JID(service_s) if service_s else None, nodeIdentifier | 756 client, jid.JID(service_s) if service_s else None, nodeIdentifier |
757 ) | 757 ) |
758 | 758 |
759 def serialize(form): | 759 def serialize(form): |
760 # FIXME: better more generic dataform serialisation should be available in SàT | 760 # FIXME: better more generic dataform serialisation should be available in SàT |
761 return {f.var: unicode(f.value) for f in form.fields.values()} | 761 return {f.var: str(f.value) for f in list(form.fields.values())} |
762 | 762 |
763 d.addCallback(serialize) | 763 d.addCallback(serialize) |
764 return d | 764 return d |
765 | 765 |
766 def getConfiguration(self, client, service, nodeIdentifier): | 766 def getConfiguration(self, client, service, nodeIdentifier): |
820 affiliations_elt = next( | 820 affiliations_elt = next( |
821 iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "affiliations")) | 821 iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "affiliations")) |
822 ) | 822 ) |
823 except StopIteration: | 823 except StopIteration: |
824 raise ValueError( | 824 raise ValueError( |
825 _(u"Invalid result: missing <affiliations> element: {}").format( | 825 _("Invalid result: missing <affiliations> element: {}").format( |
826 iq_elt.toXml | 826 iq_elt.toXml |
827 ) | 827 ) |
828 ) | 828 ) |
829 try: | 829 try: |
830 return { | 830 return { |
831 e["node"]: e["affiliation"] | 831 e["node"]: e["affiliation"] |
832 for e in affiliations_elt.elements((pubsub.NS_PUBSUB, "affiliation")) | 832 for e in affiliations_elt.elements((pubsub.NS_PUBSUB, "affiliation")) |
833 } | 833 } |
834 except KeyError: | 834 except KeyError: |
835 raise ValueError( | 835 raise ValueError( |
836 _(u"Invalid result: bad <affiliation> element: {}").format( | 836 _("Invalid result: bad <affiliation> element: {}").format( |
837 iq_elt.toXml | 837 iq_elt.toXml |
838 ) | 838 ) |
839 ) | 839 ) |
840 | 840 |
841 d = request.send(client.xmlstream) | 841 d = request.send(client.xmlstream) |
846 client = self.host.getClient(profile_key) | 846 client = self.host.getClient(profile_key) |
847 d = self.getNodeAffiliations( | 847 d = self.getNodeAffiliations( |
848 client, jid.JID(service_s) if service_s else None, nodeIdentifier | 848 client, jid.JID(service_s) if service_s else None, nodeIdentifier |
849 ) | 849 ) |
850 d.addCallback( | 850 d.addCallback( |
851 lambda affiliations: {j.full(): a for j, a in affiliations.iteritems()} | 851 lambda affiliations: {j.full(): a for j, a in affiliations.items()} |
852 ) | 852 ) |
853 return d | 853 return d |
854 | 854 |
855 def getNodeAffiliations(self, client, service, nodeIdentifier): | 855 def getNodeAffiliations(self, client, service, nodeIdentifier): |
856 """Retrieve affiliations of a node owned by profile""" | 856 """Retrieve affiliations of a node owned by profile""" |
863 affiliations_elt = next( | 863 affiliations_elt = next( |
864 iq_elt.pubsub.elements((pubsub.NS_PUBSUB_OWNER, "affiliations")) | 864 iq_elt.pubsub.elements((pubsub.NS_PUBSUB_OWNER, "affiliations")) |
865 ) | 865 ) |
866 except StopIteration: | 866 except StopIteration: |
867 raise ValueError( | 867 raise ValueError( |
868 _(u"Invalid result: missing <affiliations> element: {}").format( | 868 _("Invalid result: missing <affiliations> element: {}").format( |
869 iq_elt.toXml | 869 iq_elt.toXml |
870 ) | 870 ) |
871 ) | 871 ) |
872 try: | 872 try: |
873 return { | 873 return { |
876 (pubsub.NS_PUBSUB_OWNER, "affiliation") | 876 (pubsub.NS_PUBSUB_OWNER, "affiliation") |
877 ) | 877 ) |
878 } | 878 } |
879 except KeyError: | 879 except KeyError: |
880 raise ValueError( | 880 raise ValueError( |
881 _(u"Invalid result: bad <affiliation> element: {}").format( | 881 _("Invalid result: bad <affiliation> element: {}").format( |
882 iq_elt.toXml | 882 iq_elt.toXml |
883 ) | 883 ) |
884 ) | 884 ) |
885 | 885 |
886 d = request.send(client.xmlstream) | 886 d = request.send(client.xmlstream) |
890 def _setNodeAffiliations( | 890 def _setNodeAffiliations( |
891 self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE | 891 self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE |
892 ): | 892 ): |
893 client = self.host.getClient(profile_key) | 893 client = self.host.getClient(profile_key) |
894 affiliations = { | 894 affiliations = { |
895 jid.JID(jid_): affiliation for jid_, affiliation in affiliations.iteritems() | 895 jid.JID(jid_): affiliation for jid_, affiliation in affiliations.items() |
896 } | 896 } |
897 d = self.setNodeAffiliations( | 897 d = self.setNodeAffiliations( |
898 client, | 898 client, |
899 jid.JID(service_s) if service_s else None, | 899 jid.JID(service_s) if service_s else None, |
900 nodeIdentifier, | 900 nodeIdentifier, |
984 | 984 |
985 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): | 985 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): |
986 client = self.host.getClient(profile_key) | 986 client = self.host.getClient(profile_key) |
987 service = None if not service else jid.JID(service) | 987 service = None if not service else jid.JID(service) |
988 d = self.subscribe(client, service, nodeIdentifier, options=options or None) | 988 d = self.subscribe(client, service, nodeIdentifier, options=options or None) |
989 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or u"") | 989 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") |
990 return d | 990 return d |
991 | 991 |
992 def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None): | 992 def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None): |
993 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe | 993 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe |
994 return client.pubsub_client.subscribe( | 994 return client.pubsub_client.subscribe( |
1063 # urlencode MUST NOT BE USED. | 1063 # urlencode MUST NOT BE USED. |
1064 query_data = [("node", node.encode("utf-8"))] | 1064 query_data = [("node", node.encode("utf-8"))] |
1065 if item is not None: | 1065 if item is not None: |
1066 query_data.append(("item", item.encode("utf-8"))) | 1066 query_data.append(("item", item.encode("utf-8"))) |
1067 return "xmpp:{service}?;{query}".format( | 1067 return "xmpp:{service}?;{query}".format( |
1068 service=service.userhost(), query=urllib.urlencode(query_data) | 1068 service=service.userhost(), query=urllib.parse.urlencode(query_data) |
1069 ).decode("utf-8") | 1069 ) |
1070 | 1070 |
1071 ## methods to manage several stanzas/jids at once ## | 1071 ## methods to manage several stanzas/jids at once ## |
1072 | 1072 |
1073 # generic # | 1073 # generic # |
1074 | 1074 |
1097 else: | 1097 else: |
1098 items = [item_cb(item) for item in items] | 1098 items = [item_cb(item) for item in items] |
1099 | 1099 |
1100 return ( | 1100 return ( |
1101 items, | 1101 items, |
1102 {key: unicode(value) for key, value in metadata.iteritems()}, | 1102 {key: str(value) for key, value in metadata.items()}, |
1103 ) | 1103 ) |
1104 | 1104 |
1105 def transItemsDataD(self, items_data, item_cb, serialise=False): | 1105 def transItemsDataD(self, items_data, item_cb, serialise=False): |
1106 """Helper method to transform result from [getItems], deferred version | 1106 """Helper method to transform result from [getItems], deferred version |
1107 | 1107 |
1120 """ | 1120 """ |
1121 items, metadata = items_data | 1121 items, metadata = items_data |
1122 | 1122 |
1123 def eb(failure): | 1123 def eb(failure): |
1124 log.warning( | 1124 log.warning( |
1125 "Error while serialising/parsing item: {}".format(unicode(failure.value)) | 1125 "Error while serialising/parsing item: {}".format(str(failure.value)) |
1126 ) | 1126 ) |
1127 | 1127 |
1128 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) | 1128 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) |
1129 | 1129 |
1130 def finishSerialisation(parsed_items): | 1130 def finishSerialisation(parsed_items): |
1133 else: | 1133 else: |
1134 items = [i for i in parsed_items if i is not None] | 1134 items = [i for i in parsed_items if i is not None] |
1135 | 1135 |
1136 return ( | 1136 return ( |
1137 items, | 1137 items, |
1138 {key: unicode(value) for key, value in metadata.iteritems()}, | 1138 {key: str(value) for key, value in metadata.items()}, |
1139 ) | 1139 ) |
1140 | 1140 |
1141 d.addCallback(finishSerialisation) | 1141 d.addCallback(finishSerialisation) |
1142 return d | 1142 return d |
1143 | 1143 |
1154 if failure_result is None: | 1154 if failure_result is None: |
1155 failure_result = () | 1155 failure_result = () |
1156 return [ | 1156 return [ |
1157 ("", result) | 1157 ("", result) |
1158 if success | 1158 if success |
1159 else (unicode(result.result) or UNSPECIFIED, failure_result) | 1159 else (str(result.result) or UNSPECIFIED, failure_result) |
1160 for success, result in results | 1160 for success, result in results |
1161 ] | 1161 ] |
1162 | 1162 |
1163 # subscribe # | 1163 # subscribe # |
1164 | 1164 |
1166 client = self.host.getClient(profile_key) | 1166 client = self.host.getClient(profile_key) |
1167 d = self.getNodeSubscriptions( | 1167 d = self.getNodeSubscriptions( |
1168 client, jid.JID(service_s) if service_s else None, nodeIdentifier | 1168 client, jid.JID(service_s) if service_s else None, nodeIdentifier |
1169 ) | 1169 ) |
1170 d.addCallback( | 1170 d.addCallback( |
1171 lambda subscriptions: {j.full(): a for j, a in subscriptions.iteritems()} | 1171 lambda subscriptions: {j.full(): a for j, a in subscriptions.items()} |
1172 ) | 1172 ) |
1173 return d | 1173 return d |
1174 | 1174 |
1175 def getNodeSubscriptions(self, client, service, nodeIdentifier): | 1175 def getNodeSubscriptions(self, client, service, nodeIdentifier): |
1176 """Retrieve subscriptions to a node | 1176 """Retrieve subscriptions to a node |
1188 subscriptions_elt = next( | 1188 subscriptions_elt = next( |
1189 iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "subscriptions")) | 1189 iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "subscriptions")) |
1190 ) | 1190 ) |
1191 except StopIteration: | 1191 except StopIteration: |
1192 raise ValueError( | 1192 raise ValueError( |
1193 _(u"Invalid result: missing <subscriptions> element: {}").format( | 1193 _("Invalid result: missing <subscriptions> element: {}").format( |
1194 iq_elt.toXml | 1194 iq_elt.toXml |
1195 ) | 1195 ) |
1196 ) | 1196 ) |
1197 except AttributeError as e: | 1197 except AttributeError as e: |
1198 raise ValueError(_(u"Invalid result: {}").format(e)) | 1198 raise ValueError(_("Invalid result: {}").format(e)) |
1199 try: | 1199 try: |
1200 return { | 1200 return { |
1201 jid.JID(s["jid"]): s["subscription"] | 1201 jid.JID(s["jid"]): s["subscription"] |
1202 for s in subscriptions_elt.elements( | 1202 for s in subscriptions_elt.elements( |
1203 (pubsub.NS_PUBSUB, "subscription") | 1203 (pubsub.NS_PUBSUB, "subscription") |
1204 ) | 1204 ) |
1205 } | 1205 } |
1206 except KeyError: | 1206 except KeyError: |
1207 raise ValueError( | 1207 raise ValueError( |
1208 _(u"Invalid result: bad <subscription> element: {}").format( | 1208 _("Invalid result: bad <subscription> element: {}").format( |
1209 iq_elt.toXml | 1209 iq_elt.toXml |
1210 ) | 1210 ) |
1211 ) | 1211 ) |
1212 | 1212 |
1213 d = request.send(client.xmlstream) | 1213 d = request.send(client.xmlstream) |
1218 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE | 1218 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE |
1219 ): | 1219 ): |
1220 client = self.host.getClient(profile_key) | 1220 client = self.host.getClient(profile_key) |
1221 subscriptions = { | 1221 subscriptions = { |
1222 jid.JID(jid_): subscription | 1222 jid.JID(jid_): subscription |
1223 for jid_, subscription in subscriptions.iteritems() | 1223 for jid_, subscription in subscriptions.items() |
1224 } | 1224 } |
1225 d = self.setNodeSubscriptions( | 1225 d = self.setNodeSubscriptions( |
1226 client, | 1226 client, |
1227 jid.JID(service_s) if service_s else None, | 1227 jid.JID(service_s) if service_s else None, |
1228 nodeIdentifier, | 1228 nodeIdentifier, |
1239 request = pubsub.PubSubRequest("subscriptionsSet") | 1239 request = pubsub.PubSubRequest("subscriptionsSet") |
1240 request.recipient = service | 1240 request.recipient = service |
1241 request.nodeIdentifier = nodeIdentifier | 1241 request.nodeIdentifier = nodeIdentifier |
1242 request.subscriptions = { | 1242 request.subscriptions = { |
1243 pubsub.Subscription(nodeIdentifier, jid_, state) | 1243 pubsub.Subscription(nodeIdentifier, jid_, state) |
1244 for jid_, state in subscriptions.iteritems() | 1244 for jid_, state in subscriptions.items() |
1245 } | 1245 } |
1246 d = request.send(client.xmlstream) | 1246 d = request.send(client.xmlstream) |
1247 return d | 1247 return d |
1248 | 1248 |
1249 def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): | 1249 def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): |
1260 """ | 1260 """ |
1261 profile = self.host.getClient(profile_key).profile | 1261 profile = self.host.getClient(profile_key).profile |
1262 d = self.rt_sessions.getResults( | 1262 d = self.rt_sessions.getResults( |
1263 session_id, | 1263 session_id, |
1264 on_success=lambda result: "", | 1264 on_success=lambda result: "", |
1265 on_error=lambda failure: unicode(failure.value), | 1265 on_error=lambda failure: str(failure.value), |
1266 profile=profile, | 1266 profile=profile, |
1267 ) | 1267 ) |
1268 # we need to convert jid.JID to unicode with full() to serialise it for the bridge | 1268 # we need to convert jid.JID to unicode with full() to serialise it for the bridge |
1269 d.addCallback( | 1269 d.addCallback( |
1270 lambda ret: ( | 1270 lambda ret: ( |
1271 ret[0], | 1271 ret[0], |
1272 [ | 1272 [ |
1273 (service.full(), node, "" if success else failure or UNSPECIFIED) | 1273 (service.full(), node, "" if success else failure or UNSPECIFIED) |
1274 for (service, node), (success, failure) in ret[1].iteritems() | 1274 for (service, node), (success, failure) in ret[1].items() |
1275 ], | 1275 ], |
1276 ) | 1276 ) |
1277 ) | 1277 ) |
1278 return d | 1278 return d |
1279 | 1279 |
1280 def _subscribeToMany( | 1280 def _subscribeToMany( |
1281 self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE | 1281 self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE |
1282 ): | 1282 ): |
1283 return self.subscribeToMany( | 1283 return self.subscribeToMany( |
1284 [(jid.JID(service), unicode(node)) for service, node in node_data], | 1284 [(jid.JID(service), str(node)) for service, node in node_data], |
1285 jid.JID(subscriber), | 1285 jid.JID(subscriber), |
1286 options, | 1286 options, |
1287 profile_key, | 1287 profile_key, |
1288 ) | 1288 ) |
1289 | 1289 |
1335 """ | 1335 """ |
1336 profile = self.host.getClient(profile_key).profile | 1336 profile = self.host.getClient(profile_key).profile |
1337 d = self.rt_sessions.getResults( | 1337 d = self.rt_sessions.getResults( |
1338 session_id, | 1338 session_id, |
1339 on_success=lambda result: ("", self.transItemsData(result)), | 1339 on_success=lambda result: ("", self.transItemsData(result)), |
1340 on_error=lambda failure: (unicode(failure.value) or UNSPECIFIED, ([], {})), | 1340 on_error=lambda failure: (str(failure.value) or UNSPECIFIED, ([], {})), |
1341 profile=profile, | 1341 profile=profile, |
1342 ) | 1342 ) |
1343 d.addCallback( | 1343 d.addCallback( |
1344 lambda ret: ( | 1344 lambda ret: ( |
1345 ret[0], | 1345 ret[0], |
1346 [ | 1346 [ |
1347 (service.full(), node, failure, items, metadata) | 1347 (service.full(), node, failure, items, metadata) |
1348 for (service, node), (success, (failure, (items, metadata))) in ret[ | 1348 for (service, node), (success, (failure, (items, metadata))) in ret[ |
1349 1 | 1349 1 |
1350 ].iteritems() | 1350 ].items() |
1351 ], | 1351 ], |
1352 ) | 1352 ) |
1353 ) | 1353 ) |
1354 return d | 1354 return d |
1355 | 1355 |
1360 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit | 1360 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit |
1361 """ | 1361 """ |
1362 max_item = None if max_item == C.NO_LIMIT else max_item | 1362 max_item = None if max_item == C.NO_LIMIT else max_item |
1363 extra = self.parseExtra(extra_dict) | 1363 extra = self.parseExtra(extra_dict) |
1364 return self.getFromMany( | 1364 return self.getFromMany( |
1365 [(jid.JID(service), unicode(node)) for service, node in node_data], | 1365 [(jid.JID(service), str(node)) for service, node in node_data], |
1366 max_item, | 1366 max_item, |
1367 extra.rsm_request, | 1367 extra.rsm_request, |
1368 extra.extra, | 1368 extra.extra, |
1369 profile_key, | 1369 profile_key, |
1370 ) | 1370 ) |
1388 client, service, node, max_item, rsm_request=rsm_request, extra=extra | 1388 client, service, node, max_item, rsm_request=rsm_request, extra=extra |
1389 ) | 1389 ) |
1390 return self.rt_sessions.newSession(deferreds, client.profile) | 1390 return self.rt_sessions.newSession(deferreds, client.profile) |
1391 | 1391 |
1392 | 1392 |
1393 @implementer(disco.IDisco) | |
1393 class SatPubSubClient(rsm.PubSubClient): | 1394 class SatPubSubClient(rsm.PubSubClient): |
1394 implements(disco.IDisco) | |
1395 | 1395 |
1396 def __init__(self, host, parent_plugin): | 1396 def __init__(self, host, parent_plugin): |
1397 self.host = host | 1397 self.host = host |
1398 self.parent_plugin = parent_plugin | 1398 self.parent_plugin = parent_plugin |
1399 rsm.PubSubClient.__init__(self) | 1399 rsm.PubSubClient.__init__(self) |
1407 @param node(unicode): node used for the item | 1407 @param node(unicode): node used for the item |
1408 any registered node which prefix the node will match | 1408 any registered node which prefix the node will match |
1409 @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE | 1409 @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE |
1410 @return (iterator[callable]): callbacks for this node/event | 1410 @return (iterator[callable]): callbacks for this node/event |
1411 """ | 1411 """ |
1412 for registered_node, callbacks_dict in self.parent_plugin._node_cb.iteritems(): | 1412 for registered_node, callbacks_dict in self.parent_plugin._node_cb.items(): |
1413 if not node.startswith(registered_node): | 1413 if not node.startswith(registered_node): |
1414 continue | 1414 continue |
1415 try: | 1415 try: |
1416 for callback in callbacks_dict[event]: | 1416 for callback in callbacks_dict[event]: |
1417 yield callback | 1417 yield callback |
1418 except KeyError: | 1418 except KeyError: |
1419 continue | 1419 continue |
1420 | 1420 |
1421 | 1421 |
1422 def itemsReceived(self, event): | 1422 def itemsReceived(self, event): |
1423 log.debug(u"Pubsub items received") | 1423 log.debug("Pubsub items received") |
1424 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): | 1424 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): |
1425 callback(self.parent, event) | 1425 callback(self.parent, event) |
1426 client = self.parent | 1426 client = self.parent |
1427 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | 1427 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: |
1428 raw_items = [i.toXml() for i in event.items] | 1428 raw_items = [i.toXml() for i in event.items] |
1433 raw_items, | 1433 raw_items, |
1434 client.profile, | 1434 client.profile, |
1435 ) | 1435 ) |
1436 | 1436 |
1437 def deleteReceived(self, event): | 1437 def deleteReceived(self, event): |
1438 log.debug((u"Publish node deleted")) | 1438 log.debug(("Publish node deleted")) |
1439 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE): | 1439 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE): |
1440 callback(self.parent, event) | 1440 callback(self.parent, event) |
1441 client = self.parent | 1441 client = self.parent |
1442 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | 1442 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: |
1443 self.host.bridge.psEventRaw( | 1443 self.host.bridge.psEventRaw( |