Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0060.py @ 2624:56f94936df1e
code style reformatting using black
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 27 Jun 2018 20:14:46 +0200 |
parents | 26edcf3a30eb |
children | 189e38fb11ff |
comparison
equal
deleted
inserted
replaced
2623:49533de4540b | 2624:56f94936df1e |
---|---|
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 from sat.core.i18n import _ | 20 from sat.core.i18n import _ |
21 from sat.core.constants import Const as C | 21 from sat.core.constants import Const as C |
22 from sat.core.log import getLogger | 22 from sat.core.log import getLogger |
23 | |
23 log = getLogger(__name__) | 24 log = getLogger(__name__) |
24 from sat.core import exceptions | 25 from sat.core import exceptions |
25 from sat.tools import sat_defer | 26 from sat.tools import sat_defer |
26 | 27 |
27 from twisted.words.protocols.jabber import jid, error | 28 from twisted.words.protocols.jabber import jid, error |
31 from zope.interface import implements | 32 from zope.interface import implements |
32 from collections import namedtuple | 33 from collections import namedtuple |
33 import urllib | 34 import urllib |
34 import datetime | 35 import datetime |
35 from dateutil import tz | 36 from dateutil import tz |
37 | |
36 # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version | 38 # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version |
37 # mam and rsm come from sat_tmp.wokkel too | 39 # mam and rsm come from sat_tmp.wokkel too |
38 from wokkel import pubsub | 40 from wokkel import pubsub |
39 from wokkel import rsm | 41 from wokkel import rsm |
40 from wokkel import mam | 42 from wokkel import mam |
47 C.PI_PROTOCOLS: ["XEP-0060"], | 49 C.PI_PROTOCOLS: ["XEP-0060"], |
48 C.PI_DEPENDENCIES: [], | 50 C.PI_DEPENDENCIES: [], |
49 C.PI_RECOMMENDATIONS: ["XEP-0313"], | 51 C.PI_RECOMMENDATIONS: ["XEP-0313"], |
50 C.PI_MAIN: "XEP_0060", | 52 C.PI_MAIN: "XEP_0060", |
51 C.PI_HANDLER: "yes", | 53 C.PI_HANDLER: "yes", |
52 C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol""") | 54 C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol"""), |
53 } | 55 } |
54 | 56 |
55 UNSPECIFIED = "unspecified error" | 57 UNSPECIFIED = "unspecified error" |
56 MAM_FILTER = "mam_filter_" | 58 MAM_FILTER = "mam_filter_" |
57 | 59 |
58 | 60 |
59 Extra = namedtuple('Extra', ('rsm_request', 'extra')) | 61 Extra = namedtuple("Extra", ("rsm_request", "extra")) |
60 # rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None | 62 # rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None |
61 # extra is a potentially empty dict | 63 # extra is a potentially empty dict |
62 | 64 |
63 | 65 |
64 class XEP_0060(object): | 66 class XEP_0060(object): |
65 OPT_ACCESS_MODEL = 'pubsub#access_model' | 67 OPT_ACCESS_MODEL = "pubsub#access_model" |
66 OPT_PERSIST_ITEMS = 'pubsub#persist_items' | 68 OPT_PERSIST_ITEMS = "pubsub#persist_items" |
67 OPT_MAX_ITEMS = 'pubsub#max_items' | 69 OPT_MAX_ITEMS = "pubsub#max_items" |
68 OPT_DELIVER_PAYLOADS = 'pubsub#deliver_payloads' | 70 OPT_DELIVER_PAYLOADS = "pubsub#deliver_payloads" |
69 OPT_SEND_ITEM_SUBSCRIBE = 'pubsub#send_item_subscribe' | 71 OPT_SEND_ITEM_SUBSCRIBE = "pubsub#send_item_subscribe" |
70 OPT_NODE_TYPE = 'pubsub#node_type' | 72 OPT_NODE_TYPE = "pubsub#node_type" |
71 OPT_SUBSCRIPTION_TYPE = 'pubsub#subscription_type' | 73 OPT_SUBSCRIPTION_TYPE = "pubsub#subscription_type" |
72 OPT_SUBSCRIPTION_DEPTH = 'pubsub#subscription_depth' | 74 OPT_SUBSCRIPTION_DEPTH = "pubsub#subscription_depth" |
73 OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed' | 75 OPT_ROSTER_GROUPS_ALLOWED = "pubsub#roster_groups_allowed" |
74 OPT_PUBLISH_MODEL = 'pubsub#publish_model' | 76 OPT_PUBLISH_MODEL = "pubsub#publish_model" |
75 ACCESS_OPEN = 'open' | 77 ACCESS_OPEN = "open" |
76 ACCESS_PRESENCE = 'presence' | 78 ACCESS_PRESENCE = "presence" |
77 ACCESS_ROSTER = 'roster' | 79 ACCESS_ROSTER = "roster" |
78 ACCESS_PUBLISHER_ROSTER = 'publisher-roster' | 80 ACCESS_PUBLISHER_ROSTER = "publisher-roster" |
79 ACCESS_AUTHORIZE = 'authorize' | 81 ACCESS_AUTHORIZE = "authorize" |
80 ACCESS_WHITELIST = 'whitelist' | 82 ACCESS_WHITELIST = "whitelist" |
81 | 83 |
82 def __init__(self, host): | 84 def __init__(self, host): |
83 log.info(_(u"PubSub plugin initialization")) | 85 log.info(_(u"PubSub plugin initialization")) |
84 self.host = host | 86 self.host = host |
85 self._mam = host.plugins.get('XEP-0313') | 87 self._mam = host.plugins.get("XEP-0313") |
86 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) | 88 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) |
87 self.rt_sessions = sat_defer.RTDeferredSessions() | 89 self.rt_sessions = sat_defer.RTDeferredSessions() |
88 host.bridge.addMethod("psNodeCreate", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self._createNode, async=True) | 90 host.bridge.addMethod( |
89 host.bridge.addMethod("psNodeConfigurationGet", ".plugin", in_sign='sss', out_sign='a{ss}', method=self._getNodeConfiguration, async=True) | 91 "psNodeCreate", |
90 host.bridge.addMethod("psNodeConfigurationSet", ".plugin", in_sign='ssa{ss}s', out_sign='', method=self._setNodeConfiguration, async=True) | 92 ".plugin", |
91 host.bridge.addMethod("psNodeAffiliationsGet", ".plugin", in_sign='sss', out_sign='a{ss}', method=self._getNodeAffiliations, async=True) | 93 in_sign="ssa{ss}s", |
92 host.bridge.addMethod("psNodeAffiliationsSet", ".plugin", in_sign='ssa{ss}s', out_sign='', method=self._setNodeAffiliations, async=True) | 94 out_sign="s", |
93 host.bridge.addMethod("psNodeSubscriptionsGet", ".plugin", in_sign='sss', out_sign='a{ss}', method=self._getNodeSubscriptions, async=True) | 95 method=self._createNode, |
94 host.bridge.addMethod("psNodeSubscriptionsSet", ".plugin", in_sign='ssa{ss}s', out_sign='', method=self._setNodeSubscriptions, async=True) | 96 async=True, |
95 host.bridge.addMethod("psNodeDelete", ".plugin", in_sign='sss', out_sign='', method=self._deleteNode, async=True) | 97 ) |
96 host.bridge.addMethod("psNodeWatchAdd", ".plugin", in_sign='sss', out_sign='', method=self._addWatch, async=False) | 98 host.bridge.addMethod( |
97 host.bridge.addMethod("psNodeWatchRemove", ".plugin", in_sign='sss', out_sign='', method=self._removeWatch, async=False) | 99 "psNodeConfigurationGet", |
98 host.bridge.addMethod("psAffiliationsGet", ".plugin", in_sign='sss', out_sign='a{ss}', method=self._getAffiliations, async=True) | 100 ".plugin", |
99 host.bridge.addMethod("psItemsGet", ".plugin", in_sign='ssiassa{ss}s', out_sign='(asa{ss})', method=self._getItems, async=True) | 101 in_sign="sss", |
100 host.bridge.addMethod("psItemSend", ".plugin", in_sign='ssssa{ss}s', out_sign='s', method=self._sendItem, async=True) | 102 out_sign="a{ss}", |
101 host.bridge.addMethod("psRetractItem", ".plugin", in_sign='sssbs', out_sign='', method=self._retractItem, async=True) | 103 method=self._getNodeConfiguration, |
102 host.bridge.addMethod("psRetractItems", ".plugin", in_sign='ssasbs', out_sign='', method=self._retractItems, async=True) | 104 async=True, |
103 host.bridge.addMethod("psSubscribe", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self._subscribe, async=True) | 105 ) |
104 host.bridge.addMethod("psUnsubscribe", ".plugin", in_sign='sss', out_sign='', method=self._unsubscribe, async=True) | 106 host.bridge.addMethod( |
105 host.bridge.addMethod("psSubscriptionsGet", ".plugin", in_sign='sss', out_sign='aa{ss}', method=self._subscriptions, async=True) | 107 "psNodeConfigurationSet", |
106 host.bridge.addMethod("psSubscribeToMany", ".plugin", in_sign='a(ss)sa{ss}s', out_sign='s', method=self._subscribeToMany) | 108 ".plugin", |
107 host.bridge.addMethod("psGetSubscribeRTResult", ".plugin", in_sign='ss', out_sign='(ua(sss))', method=self._manySubscribeRTResult, async=True) | 109 in_sign="ssa{ss}s", |
108 host.bridge.addMethod("psGetFromMany", ".plugin", in_sign='a(ss)ia{ss}s', out_sign='s', method=self._getFromMany) | 110 out_sign="", |
109 host.bridge.addMethod("psGetFromManyRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssasa{ss}))', method=self._getFromManyRTResult, async=True) | 111 method=self._setNodeConfiguration, |
110 | 112 async=True, |
111 # high level observer method | 113 ) |
112 host.bridge.addSignal("psEvent", ".plugin", signature='ssssa{ss}s') # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile | 114 host.bridge.addMethod( |
115 "psNodeAffiliationsGet", | |
116 ".plugin", | |
117 in_sign="sss", | |
118 out_sign="a{ss}", | |
119 method=self._getNodeAffiliations, | |
120 async=True, | |
121 ) | |
122 host.bridge.addMethod( | |
123 "psNodeAffiliationsSet", | |
124 ".plugin", | |
125 in_sign="ssa{ss}s", | |
126 out_sign="", | |
127 method=self._setNodeAffiliations, | |
128 async=True, | |
129 ) | |
130 host.bridge.addMethod( | |
131 "psNodeSubscriptionsGet", | |
132 ".plugin", | |
133 in_sign="sss", | |
134 out_sign="a{ss}", | |
135 method=self._getNodeSubscriptions, | |
136 async=True, | |
137 ) | |
138 host.bridge.addMethod( | |
139 "psNodeSubscriptionsSet", | |
140 ".plugin", | |
141 in_sign="ssa{ss}s", | |
142 out_sign="", | |
143 method=self._setNodeSubscriptions, | |
144 async=True, | |
145 ) | |
146 host.bridge.addMethod( | |
147 "psNodeDelete", | |
148 ".plugin", | |
149 in_sign="sss", | |
150 out_sign="", | |
151 method=self._deleteNode, | |
152 async=True, | |
153 ) | |
154 host.bridge.addMethod( | |
155 "psNodeWatchAdd", | |
156 ".plugin", | |
157 in_sign="sss", | |
158 out_sign="", | |
159 method=self._addWatch, | |
160 async=False, | |
161 ) | |
162 host.bridge.addMethod( | |
163 "psNodeWatchRemove", | |
164 ".plugin", | |
165 in_sign="sss", | |
166 out_sign="", | |
167 method=self._removeWatch, | |
168 async=False, | |
169 ) | |
170 host.bridge.addMethod( | |
171 "psAffiliationsGet", | |
172 ".plugin", | |
173 in_sign="sss", | |
174 out_sign="a{ss}", | |
175 method=self._getAffiliations, | |
176 async=True, | |
177 ) | |
178 host.bridge.addMethod( | |
179 "psItemsGet", | |
180 ".plugin", | |
181 in_sign="ssiassa{ss}s", | |
182 out_sign="(asa{ss})", | |
183 method=self._getItems, | |
184 async=True, | |
185 ) | |
186 host.bridge.addMethod( | |
187 "psItemSend", | |
188 ".plugin", | |
189 in_sign="ssssa{ss}s", | |
190 out_sign="s", | |
191 method=self._sendItem, | |
192 async=True, | |
193 ) | |
194 host.bridge.addMethod( | |
195 "psRetractItem", | |
196 ".plugin", | |
197 in_sign="sssbs", | |
198 out_sign="", | |
199 method=self._retractItem, | |
200 async=True, | |
201 ) | |
202 host.bridge.addMethod( | |
203 "psRetractItems", | |
204 ".plugin", | |
205 in_sign="ssasbs", | |
206 out_sign="", | |
207 method=self._retractItems, | |
208 async=True, | |
209 ) | |
210 host.bridge.addMethod( | |
211 "psSubscribe", | |
212 ".plugin", | |
213 in_sign="ssa{ss}s", | |
214 out_sign="s", | |
215 method=self._subscribe, | |
216 async=True, | |
217 ) | |
218 host.bridge.addMethod( | |
219 "psUnsubscribe", | |
220 ".plugin", | |
221 in_sign="sss", | |
222 out_sign="", | |
223 method=self._unsubscribe, | |
224 async=True, | |
225 ) | |
226 host.bridge.addMethod( | |
227 "psSubscriptionsGet", | |
228 ".plugin", | |
229 in_sign="sss", | |
230 out_sign="aa{ss}", | |
231 method=self._subscriptions, | |
232 async=True, | |
233 ) | |
234 host.bridge.addMethod( | |
235 "psSubscribeToMany", | |
236 ".plugin", | |
237 in_sign="a(ss)sa{ss}s", | |
238 out_sign="s", | |
239 method=self._subscribeToMany, | |
240 ) | |
241 host.bridge.addMethod( | |
242 "psGetSubscribeRTResult", | |
243 ".plugin", | |
244 in_sign="ss", | |
245 out_sign="(ua(sss))", | |
246 method=self._manySubscribeRTResult, | |
247 async=True, | |
248 ) | |
249 host.bridge.addMethod( | |
250 "psGetFromMany", | |
251 ".plugin", | |
252 in_sign="a(ss)ia{ss}s", | |
253 out_sign="s", | |
254 method=self._getFromMany, | |
255 ) | |
256 host.bridge.addMethod( | |
257 "psGetFromManyRTResult", | |
258 ".plugin", | |
259 in_sign="ss", | |
260 out_sign="(ua(sssasa{ss}))", | |
261 method=self._getFromManyRTResult, | |
262 async=True, | |
263 ) | |
264 | |
265 # high level observer method | |
266 host.bridge.addSignal( | |
267 "psEvent", ".plugin", signature="ssssa{ss}s" | |
268 ) # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile | |
113 | 269 |
114 # low level observer method, used if service/node is in watching list (see psNodeWatch* methods) | 270 # low level observer method, used if service/node is in watching list (see psNodeWatch* methods) |
115 host.bridge.addSignal("psEventRaw", ".plugin", signature='sssass') # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile | 271 host.bridge.addSignal( |
272 "psEventRaw", ".plugin", signature="sssass" | |
273 ) # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile | |
116 | 274 |
117 def getHandler(self, client): | 275 def getHandler(self, client): |
118 client.pubsub_client = SatPubSubClient(self.host, self) | 276 client.pubsub_client = SatPubSubClient(self.host, self) |
119 return client.pubsub_client | 277 return client.pubsub_client |
120 | 278 |
121 @defer.inlineCallbacks | 279 @defer.inlineCallbacks |
122 def profileConnected(self, client): | 280 def profileConnected(self, client): |
123 client.pubsub_watching = set() | 281 client.pubsub_watching = set() |
124 try: | 282 try: |
125 client.pubsub_service = jid.JID(self.host.memory.getConfig('', 'pubsub_service')) | 283 client.pubsub_service = jid.JID( |
284 self.host.memory.getConfig("", "pubsub_service") | |
285 ) | |
126 except RuntimeError: | 286 except RuntimeError: |
127 log.info(_(u"Can't retrieve pubsub_service from conf, we'll use first one that we find")) | 287 log.info( |
128 client.pubsub_service = yield self.host.findServiceEntity(client, "pubsub", "service") | 288 _( |
289 u"Can't retrieve pubsub_service from conf, we'll use first one that we find" | |
290 ) | |
291 ) | |
292 client.pubsub_service = yield self.host.findServiceEntity( | |
293 client, "pubsub", "service" | |
294 ) | |
129 | 295 |
130 def getFeatures(self, profile): | 296 def getFeatures(self, profile): |
131 try: | 297 try: |
132 client = self.host.getClient(profile) | 298 client = self.host.getClient(profile) |
133 except exceptions.ProfileNotSetError: | 299 except exceptions.ProfileNotSetError: |
134 return {} | 300 return {} |
135 try: | 301 try: |
136 return {'service': client.pubsub_service.full() if client.pubsub_service is not None else ''} | 302 return { |
303 "service": client.pubsub_service.full() | |
304 if client.pubsub_service is not None | |
305 else "" | |
306 } | |
137 except AttributeError: | 307 except AttributeError: |
138 if self.host.isConnected(profile): | 308 if self.host.isConnected(profile): |
139 log.debug("Profile is not connected, service is not checked yet") | 309 log.debug("Profile is not connected, service is not checked yet") |
140 else: | 310 else: |
141 log.error("Service should be available !") | 311 log.error("Service should be available !") |
152 rsm_request = None | 322 rsm_request = None |
153 extra = {} | 323 extra = {} |
154 else: | 324 else: |
155 # rsm | 325 # rsm |
156 rsm_args = {} | 326 rsm_args = {} |
157 for arg in ('max', 'after', 'before', 'index'): | 327 for arg in ("max", "after", "before", "index"): |
158 try: | 328 try: |
159 argname = "max_" if arg == 'max' else arg | 329 argname = "max_" if arg == "max" else arg |
160 rsm_args[argname] = extra.pop('rsm_{}'.format(arg)) | 330 rsm_args[argname] = extra.pop("rsm_{}".format(arg)) |
161 except KeyError: | 331 except KeyError: |
162 continue | 332 continue |
163 | 333 |
164 if rsm_args: | 334 if rsm_args: |
165 rsm_request = rsm.RSMRequest(**rsm_args) | 335 rsm_request = rsm.RSMRequest(**rsm_args) |
166 else: | 336 else: |
167 rsm_request = None | 337 rsm_request = None |
168 | 338 |
169 # mam | 339 # mam |
170 mam_args = {} | 340 mam_args = {} |
171 for arg in ('start', 'end'): | 341 for arg in ("start", "end"): |
172 try: | 342 try: |
173 mam_args[arg] = datetime.datetime.fromtimestamp(int(extra.pop('{}{}'.format(MAM_FILTER, arg))), tz.tzutc()) | 343 mam_args[arg] = datetime.datetime.fromtimestamp( |
344 int(extra.pop("{}{}".format(MAM_FILTER, arg))), tz.tzutc() | |
345 ) | |
174 except (TypeError, ValueError): | 346 except (TypeError, ValueError): |
175 log.warning(u"Bad value for {} filter".format(arg)) | 347 log.warning(u"Bad value for {} filter".format(arg)) |
176 except KeyError: | 348 except KeyError: |
177 continue | 349 continue |
178 | 350 |
179 try: | 351 try: |
180 mam_args['with_jid'] = jid.JID(extra.pop('{}jid'.format(MAM_FILTER))) | 352 mam_args["with_jid"] = jid.JID(extra.pop("{}jid".format(MAM_FILTER))) |
181 except (jid.InvalidFormat): | 353 except (jid.InvalidFormat): |
182 log.warning(u"Bad value for jid filter") | 354 log.warning(u"Bad value for jid filter") |
183 except KeyError: | 355 except KeyError: |
184 pass | 356 pass |
185 | 357 |
186 for name, value in extra.iteritems(): | 358 for name, value in extra.iteritems(): |
187 if name.startswith(MAM_FILTER): | 359 if name.startswith(MAM_FILTER): |
188 var = name[len(MAM_FILTER):] | 360 var = name[len(MAM_FILTER) :] |
189 extra_fields = mam_args.setdefault('extra_fields', []) | 361 extra_fields = mam_args.setdefault("extra_fields", []) |
190 extra_fields.append(data_form.Field(var=var, value=value)) | 362 extra_fields.append(data_form.Field(var=var, value=value)) |
191 | 363 |
192 if mam_args: | 364 if mam_args: |
193 assert 'mam' not in extra | 365 assert "mam" not in extra |
194 extra['mam'] = mam.MAMRequest(mam.buildForm(**mam_args)) | 366 extra["mam"] = mam.MAMRequest(mam.buildForm(**mam_args)) |
195 return Extra(rsm_request, extra) | 367 return Extra(rsm_request, extra) |
196 | 368 |
197 def addManagedNode(self, node, **kwargs): | 369 def addManagedNode(self, node, **kwargs): |
198 """Add a handler for a node | 370 """Add a handler for a node |
199 | 371 |
208 assert kwargs | 380 assert kwargs |
209 callbacks = self._node_cb.setdefault(node, {}) | 381 callbacks = self._node_cb.setdefault(node, {}) |
210 for event, cb in kwargs.iteritems(): | 382 for event, cb in kwargs.iteritems(): |
211 event_name = event[:-3] | 383 event_name = event[:-3] |
212 assert event_name in C.PS_EVENTS | 384 assert event_name in C.PS_EVENTS |
213 callbacks.setdefault(event_name,[]).append(cb) | 385 callbacks.setdefault(event_name, []).append(cb) |
214 | 386 |
215 def removeManagedNode(self, node, *args): | 387 def removeManagedNode(self, node, *args): |
216 """Add a handler for a node | 388 """Add a handler for a node |
217 | 389 |
218 @param node(unicode): node to monitor | 390 @param node(unicode): node to monitor |
229 try: | 401 try: |
230 cb_list.remove(callback) | 402 cb_list.remove(callback) |
231 except ValueError: | 403 except ValueError: |
232 pass | 404 pass |
233 else: | 405 else: |
234 log.debug(u"removed callback {cb} for event {event} on node {node}".format( | 406 log.debug( |
235 cb=callback, event=event, node=node)) | 407 u"removed callback {cb} for event {event} on node {node}".format( |
408 cb=callback, event=event, node=node | |
409 ) | |
410 ) | |
236 if not cb_list: | 411 if not cb_list: |
237 del registred_cb[event] | 412 del registred_cb[event] |
238 if not registred_cb: | 413 if not registred_cb: |
239 del self._node_cb[node] | 414 del self._node_cb[node] |
240 return | 415 return |
241 log.error(u"Trying to remove inexistant callback {cb} for node {node}".format(cb=callback, node=node)) | 416 log.error( |
417 u"Trying to remove inexistant callback {cb} for node {node}".format( | |
418 cb=callback, node=node | |
419 ) | |
420 ) | |
242 | 421 |
243 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): | 422 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): |
244 # """Retrieve the name of the nodes that are accessible on the target service. | 423 # """Retrieve the name of the nodes that are accessible on the target service. |
245 | 424 |
246 # @param service (JID): target service | 425 # @param service (JID): target service |
268 # """ | 447 # """ |
269 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) | 448 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) |
270 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) | 449 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) |
271 # return d | 450 # return d |
272 | 451 |
273 def _sendItem(self, service, nodeIdentifier, payload, item_id=None, extra=None, profile_key=C.PROF_KEY_NONE): | 452 def _sendItem( |
453 self, | |
454 service, | |
455 nodeIdentifier, | |
456 payload, | |
457 item_id=None, | |
458 extra=None, | |
459 profile_key=C.PROF_KEY_NONE, | |
460 ): | |
274 client = self.host.getClient(profile_key) | 461 client = self.host.getClient(profile_key) |
275 service = None if not service else jid.JID(service) | 462 service = None if not service else jid.JID(service) |
276 d = self.sendItem(client, service, nodeIdentifier, payload, item_id or None, extra) | 463 d = self.sendItem( |
277 d.addCallback(lambda ret: ret or u'') | 464 client, service, nodeIdentifier, payload, item_id or None, extra |
465 ) | |
466 d.addCallback(lambda ret: ret or u"") | |
278 return d | 467 return d |
279 | 468 |
280 def _getPublishedItemId(self, iq_elt, original_id): | 469 def _getPublishedItemId(self, iq_elt, original_id): |
281 """return item of published id if found in answer | 470 """return item of published id if found in answer |
282 | 471 |
283 if not found original_id is returned, or empty string if it is None or empty string | 472 if not found original_id is returned, or empty string if it is None or empty string |
284 """ | 473 """ |
285 try: | 474 try: |
286 item_id = iq_elt.pubsub.publish.item['id'] | 475 item_id = iq_elt.pubsub.publish.item["id"] |
287 except (AttributeError, KeyError): | 476 except (AttributeError, KeyError): |
288 item_id = None | 477 item_id = None |
289 return item_id or original_id | 478 return item_id or original_id |
290 | 479 |
291 def sendItem(self, client, service, nodeIdentifier, payload, item_id=None, extra=None): | 480 def sendItem( |
481 self, client, service, nodeIdentifier, payload, item_id=None, extra=None | |
482 ): | |
292 """high level method to send one item | 483 """high level method to send one item |
293 | 484 |
294 @param service(jid.JID, None): service to send the item to | 485 @param service(jid.JID, None): service to send the item to |
295 None to use PEP | 486 None to use PEP |
296 @param NodeIdentifier(unicode): PubSub node to use | 487 @param NodeIdentifier(unicode): PubSub node to use |
303 d = self.publish(client, service, nodeIdentifier, [item_elt]) | 494 d = self.publish(client, service, nodeIdentifier, [item_elt]) |
304 d.addCallback(self._getPublishedItemId, item_id) | 495 d.addCallback(self._getPublishedItemId, item_id) |
305 return d | 496 return d |
306 | 497 |
307 def publish(self, client, service, nodeIdentifier, items=None): | 498 def publish(self, client, service, nodeIdentifier, items=None): |
308 return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid) | 499 return client.pubsub_client.publish( |
500 service, nodeIdentifier, items, client.pubsub_client.parent.jid | |
501 ) | |
309 | 502 |
310 def _unwrapMAMMessage(self, message_elt): | 503 def _unwrapMAMMessage(self, message_elt): |
311 try: | 504 try: |
312 item_elt = (message_elt.elements(mam.NS_MAM, 'result').next() | 505 item_elt = ( |
313 .elements(C.NS_FORWARD, 'forwarded').next() | 506 message_elt.elements(mam.NS_MAM, "result") |
314 .elements(C.NS_CLIENT, 'message').next() | 507 .next() |
315 .elements('http://jabber.org/protocol/pubsub#event', 'event').next() | 508 .elements(C.NS_FORWARD, "forwarded") |
316 .elements('http://jabber.org/protocol/pubsub#event', 'items').next() | 509 .next() |
317 .elements('http://jabber.org/protocol/pubsub#event', 'item').next()) | 510 .elements(C.NS_CLIENT, "message") |
511 .next() | |
512 .elements("http://jabber.org/protocol/pubsub#event", "event") | |
513 .next() | |
514 .elements("http://jabber.org/protocol/pubsub#event", "items") | |
515 .next() | |
516 .elements("http://jabber.org/protocol/pubsub#event", "item") | |
517 .next() | |
518 ) | |
318 except StopIteration: | 519 except StopIteration: |
319 raise exceptions.DataError(u"Can't find Item in MAM message element") | 520 raise exceptions.DataError(u"Can't find Item in MAM message element") |
320 return item_elt | 521 return item_elt |
321 | 522 |
322 def _getItems(self, service='', node='', max_items=10, item_ids=None, sub_id=None, extra_dict=None, profile_key=C.PROF_KEY_NONE): | 523 def _getItems( |
524 self, | |
525 service="", | |
526 node="", | |
527 max_items=10, | |
528 item_ids=None, | |
529 sub_id=None, | |
530 extra_dict=None, | |
531 profile_key=C.PROF_KEY_NONE, | |
532 ): | |
323 """Get items from pubsub node | 533 """Get items from pubsub node |
324 | 534 |
325 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit | 535 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit |
326 """ | 536 """ |
327 client = self.host.getClient(profile_key) | 537 client = self.host.getClient(profile_key) |
328 service = jid.JID(service) if service else None | 538 service = jid.JID(service) if service else None |
329 max_items = None if max_items == C.NO_LIMIT else max_items | 539 max_items = None if max_items == C.NO_LIMIT else max_items |
330 extra = self.parseExtra(extra_dict) | 540 extra = self.parseExtra(extra_dict) |
331 d = self.getItems(client, service, node or None, max_items or None, item_ids, sub_id or None, extra.rsm_request, extra.extra) | 541 d = self.getItems( |
542 client, | |
543 service, | |
544 node or None, | |
545 max_items or None, | |
546 item_ids, | |
547 sub_id or None, | |
548 extra.rsm_request, | |
549 extra.extra, | |
550 ) | |
332 d.addCallback(self.serItemsData) | 551 d.addCallback(self.serItemsData) |
333 return d | 552 return d |
334 | 553 |
335 def getItems(self, client, service, node, max_items=None, item_ids=None, sub_id=None, rsm_request=None, extra=None): | 554 def getItems( |
555 self, | |
556 client, | |
557 service, | |
558 node, | |
559 max_items=None, | |
560 item_ids=None, | |
561 sub_id=None, | |
562 rsm_request=None, | |
563 extra=None, | |
564 ): | |
336 """Retrieve pubsub items from a node. | 565 """Retrieve pubsub items from a node. |
337 | 566 |
338 @param service (JID, None): pubsub service. | 567 @param service (JID, None): pubsub service. |
339 @param node (str): node id. | 568 @param node (str): node id. |
340 @param max_items (int): optional limit on the number of retrieved items. | 569 @param max_items (int): optional limit on the number of retrieved items. |
352 if rsm_request and item_ids: | 581 if rsm_request and item_ids: |
353 raise ValueError(u"items_id can't be used with rsm") | 582 raise ValueError(u"items_id can't be used with rsm") |
354 if extra is None: | 583 if extra is None: |
355 extra = {} | 584 extra = {} |
356 try: | 585 try: |
357 mam_query = extra['mam'] | 586 mam_query = extra["mam"] |
358 except KeyError: | 587 except KeyError: |
359 d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, None, rsm_request) | 588 d = client.pubsub_client.items( |
589 service, node, max_items, item_ids, sub_id, None, rsm_request | |
590 ) | |
360 else: | 591 else: |
361 # if mam is requested, we have to do a totally different query | 592 # if mam is requested, we have to do a totally different query |
362 if self._mam is None: | 593 if self._mam is None: |
363 raise exceptions.NotFound(u"MAM (XEP-0313) plugin is not available") | 594 raise exceptions.NotFound(u"MAM (XEP-0313) plugin is not available") |
364 if max_items is not None: | 595 if max_items is not None: |
366 if item_ids: | 597 if item_ids: |
367 raise exceptions.DataError(u"items_ids parameter can't be used with MAM") | 598 raise exceptions.DataError(u"items_ids parameter can't be used with MAM") |
368 if mam_query.node is None: | 599 if mam_query.node is None: |
369 mam_query.node = node | 600 mam_query.node = node |
370 elif mam_query.node != node: | 601 elif mam_query.node != node: |
371 raise exceptions.DataError(u"MAM query node is incoherent with getItems's node") | 602 raise exceptions.DataError( |
603 u"MAM query node is incoherent with getItems's node" | |
604 ) | |
372 if mam_query.rsm is None: | 605 if mam_query.rsm is None: |
373 mam_query.rsm = rsm_request | 606 mam_query.rsm = rsm_request |
374 else: | 607 else: |
375 if mam_query.rsm != rsm_request: | 608 if mam_query.rsm != rsm_request: |
376 raise exceptions.DataError(u"Conflict between RSM request and MAM's RSM request") | 609 raise exceptions.DataError( |
610 u"Conflict between RSM request and MAM's RSM request" | |
611 ) | |
377 d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage) | 612 d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage) |
378 | 613 |
379 try: | 614 try: |
380 subscribe = C.bool(extra['subscribe']) | 615 subscribe = C.bool(extra["subscribe"]) |
381 except KeyError: | 616 except KeyError: |
382 subscribe = False | 617 subscribe = False |
383 | 618 |
384 def subscribeEb(failure, service, node): | 619 def subscribeEb(failure, service, node): |
385 failure.trap(error.StanzaError) | 620 failure.trap(error.StanzaError) |
386 log.warning(u"Could not subscribe to node {} on service {}: {}".format(node, unicode(service), unicode(failure.value))) | 621 log.warning( |
622 u"Could not subscribe to node {} on service {}: {}".format( | |
623 node, unicode(service), unicode(failure.value) | |
624 ) | |
625 ) | |
387 | 626 |
388 def doSubscribe(items): | 627 def doSubscribe(items): |
389 self.subscribe(service, node, profile_key=client.profile).addErrback(subscribeEb, service, node) | 628 self.subscribe(service, node, profile_key=client.profile).addErrback( |
629 subscribeEb, service, node | |
630 ) | |
390 return items | 631 return items |
391 | 632 |
392 if subscribe: | 633 if subscribe: |
393 d.addCallback(doSubscribe) | 634 d.addCallback(doSubscribe) |
394 | 635 |
395 def addMetadata(result): | 636 def addMetadata(result): |
396 items, rsm_response = result | 637 items, rsm_response = result |
397 service_jid = service if service else client.jid.userhostJID() | 638 service_jid = service if service else client.jid.userhostJID() |
398 metadata = {'service': service_jid, | 639 metadata = { |
399 'node': node, | 640 "service": service_jid, |
400 'uri': self.getNodeURI(service_jid, node), | 641 "node": node, |
401 } | 642 "uri": self.getNodeURI(service_jid, node), |
643 } | |
402 if rsm_request is not None and rsm_response is not None: | 644 if rsm_request is not None and rsm_response is not None: |
403 metadata.update({'rsm_{}'.format(key): value for key, value in rsm_response.toDict().iteritems()}) | 645 metadata.update( |
646 { | |
647 "rsm_{}".format(key): value | |
648 for key, value in rsm_response.toDict().iteritems() | |
649 } | |
650 ) | |
404 return (items, metadata) | 651 return (items, metadata) |
405 | 652 |
406 d.addCallback(addMetadata) | 653 d.addCallback(addMetadata) |
407 return d | 654 return d |
408 | 655 |
429 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) | 676 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) |
430 # continue # avoid pubsub "item-not-found" error | 677 # continue # avoid pubsub "item-not-found" error |
431 # d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile) | 678 # d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile) |
432 # defer.returnValue(d_dict) | 679 # defer.returnValue(d_dict) |
433 | 680 |
434 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): | 681 def getOptions( |
435 client = self.host.getClient(profile_key) | 682 self, |
436 return client.pubsub_client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) | 683 service, |
437 | 684 nodeIdentifier, |
438 def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): | 685 subscriber, |
439 client = self.host.getClient(profile_key) | 686 subscriptionIdentifier=None, |
440 return client.pubsub_client.setOptions(service, nodeIdentifier, subscriber, options, subscriptionIdentifier) | 687 profile_key=C.PROF_KEY_NONE, |
688 ): | |
689 client = self.host.getClient(profile_key) | |
690 return client.pubsub_client.getOptions( | |
691 service, nodeIdentifier, subscriber, subscriptionIdentifier | |
692 ) | |
693 | |
694 def setOptions( | |
695 self, | |
696 service, | |
697 nodeIdentifier, | |
698 subscriber, | |
699 options, | |
700 subscriptionIdentifier=None, | |
701 profile_key=C.PROF_KEY_NONE, | |
702 ): | |
703 client = self.host.getClient(profile_key) | |
704 return client.pubsub_client.setOptions( | |
705 service, nodeIdentifier, subscriber, options, subscriptionIdentifier | |
706 ) | |
441 | 707 |
442 def _createNode(self, service_s, nodeIdentifier, options, profile_key): | 708 def _createNode(self, service_s, nodeIdentifier, options, profile_key): |
443 client = self.host.getClient(profile_key) | 709 client = self.host.getClient(profile_key) |
444 return self.createNode(client, jid.JID(service_s) if service_s else None, nodeIdentifier, options) | 710 return self.createNode( |
711 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options | |
712 ) | |
445 | 713 |
446 def createNode(self, client, service, nodeIdentifier=None, options=None): | 714 def createNode(self, client, service, nodeIdentifier=None, options=None): |
447 """Create a new node | 715 """Create a new node |
448 | 716 |
449 @param service(jid.JID): PubSub service, | 717 @param service(jid.JID): PubSub service, |
459 def createIfNewNode(self, client, service, nodeIdentifier, options=None): | 727 def createIfNewNode(self, client, service, nodeIdentifier, options=None): |
460 """Helper method similar to createNode, but will not fail in case of conflict""" | 728 """Helper method similar to createNode, but will not fail in case of conflict""" |
461 try: | 729 try: |
462 yield self.createNode(client, service, nodeIdentifier, options) | 730 yield self.createNode(client, service, nodeIdentifier, options) |
463 except error.StanzaError as e: | 731 except error.StanzaError as e: |
464 if e.condition == 'conflict': | 732 if e.condition == "conflict": |
465 pass | 733 pass |
466 else: | 734 else: |
467 raise e | 735 raise e |
468 | 736 |
469 def _getNodeConfiguration(self, service_s, nodeIdentifier, profile_key): | 737 def _getNodeConfiguration(self, service_s, nodeIdentifier, profile_key): |
470 client = self.host.getClient(profile_key) | 738 client = self.host.getClient(profile_key) |
471 d = self.getConfiguration(client, jid.JID(service_s) if service_s else None, nodeIdentifier) | 739 d = self.getConfiguration( |
740 client, jid.JID(service_s) if service_s else None, nodeIdentifier | |
741 ) | |
742 | |
472 def serialize(form): | 743 def serialize(form): |
473 # FIXME: a better, more generic dataform serialisation should be available in SàT | 744 # FIXME: a better, more generic dataform serialisation should be available in SàT |
474 return {f.var: unicode(f.value) for f in form.fields.values()} | 745 return {f.var: unicode(f.value) for f in form.fields.values()} |
746 | |
475 d.addCallback(serialize) | 747 d.addCallback(serialize) |
476 return d | 748 return d |
477 | 749 |
478 def getConfiguration(self, client, service, nodeIdentifier): | 750 def getConfiguration(self, client, service, nodeIdentifier): |
479 request = pubsub.PubSubRequest('configureGet') | 751 request = pubsub.PubSubRequest("configureGet") |
480 request.recipient = service | 752 request.recipient = service |
481 request.nodeIdentifier = nodeIdentifier | 753 request.nodeIdentifier = nodeIdentifier |
482 | 754 |
483 def cb(iq): | 755 def cb(iq): |
484 form = data_form.findForm(iq.pubsub.configure, | 756 form = data_form.findForm(iq.pubsub.configure, pubsub.NS_PUBSUB_NODE_CONFIG) |
485 pubsub.NS_PUBSUB_NODE_CONFIG) | |
486 form.typeCheck() | 757 form.typeCheck() |
487 return form | 758 return form |
488 | 759 |
489 d = request.send(client.xmlstream) | 760 d = request.send(client.xmlstream) |
490 d.addCallback(cb) | 761 d.addCallback(cb) |
491 return d | 762 return d |
492 | 763 |
493 def _setNodeConfiguration(self, service_s, nodeIdentifier, options, profile_key): | 764 def _setNodeConfiguration(self, service_s, nodeIdentifier, options, profile_key): |
494 client = self.host.getClient(profile_key) | 765 client = self.host.getClient(profile_key) |
495 d = self.setConfiguration(client, jid.JID(service_s) if service_s else None, nodeIdentifier, options) | 766 d = self.setConfiguration( |
767 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options | |
768 ) | |
496 return d | 769 return d |
497 | 770 |
498 def setConfiguration(self, client, service, nodeIdentifier, options): | 771 def setConfiguration(self, client, service, nodeIdentifier, options): |
499 request = pubsub.PubSubRequest('configureSet') | 772 request = pubsub.PubSubRequest("configureSet") |
500 request.recipient = service | 773 request.recipient = service |
501 request.nodeIdentifier = nodeIdentifier | 774 request.nodeIdentifier = nodeIdentifier |
502 | 775 |
503 form = data_form.Form(formType='submit', | 776 form = data_form.Form( |
504 formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG) | 777 formType="submit", formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG |
778 ) | |
505 form.makeFields(options) | 779 form.makeFields(options) |
506 request.options = form | 780 request.options = form |
507 | 781 |
508 d = request.send(client.xmlstream) | 782 d = request.send(client.xmlstream) |
509 return d | 783 return d |
510 | 784 |
511 def _getAffiliations(self, service_s, nodeIdentifier, profile_key): | 785 def _getAffiliations(self, service_s, nodeIdentifier, profile_key): |
512 client = self.host.getClient(profile_key) | 786 client = self.host.getClient(profile_key) |
513 d = self.getAffiliations(client, jid.JID(service_s) if service_s else None, nodeIdentifier or None) | 787 d = self.getAffiliations( |
788 client, jid.JID(service_s) if service_s else None, nodeIdentifier or None | |
789 ) | |
514 return d | 790 return d |
515 | 791 |
516 def getAffiliations(self, client, service, nodeIdentifier=None): | 792 def getAffiliations(self, client, service, nodeIdentifier=None): |
517 """Retrieve affiliations of an entity | 793 """Retrieve affiliations of an entity |
518 | 794 |
519 @param nodeIdentifier(unicode, None): node to get affiliation from | 795 @param nodeIdentifier(unicode, None): node to get affiliation from |
520 None to get all nodes affiliations for this service | 796 None to get all nodes affiliations for this service |
521 """ | 797 """ |
522 request = pubsub.PubSubRequest('affiliations') | 798 request = pubsub.PubSubRequest("affiliations") |
523 request.recipient = service | 799 request.recipient = service |
524 request.nodeIdentifier = nodeIdentifier | 800 request.nodeIdentifier = nodeIdentifier |
525 | 801 |
526 def cb(iq_elt): | 802 def cb(iq_elt): |
527 try: | 803 try: |
528 affiliations_elt = next(iq_elt.pubsub.elements((pubsub.NS_PUBSUB, 'affiliations'))) | 804 affiliations_elt = next( |
805 iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "affiliations")) | |
806 ) | |
529 except StopIteration: | 807 except StopIteration: |
530 raise ValueError(_(u"Invalid result: missing <affiliations> element: {}").format(iq_elt.toXml)) | 808 raise ValueError( |
809 _(u"Invalid result: missing <affiliations> element: {}").format( | |
810 iq_elt.toXml | |
811 ) | |
812 ) | |
531 try: | 813 try: |
532 return {e['node']: e['affiliation'] for e in affiliations_elt.elements((pubsub.NS_PUBSUB, 'affiliation'))} | 814 return { |
815 e["node"]: e["affiliation"] | |
816 for e in affiliations_elt.elements((pubsub.NS_PUBSUB, "affiliation")) | |
817 } | |
533 except KeyError: | 818 except KeyError: |
534 raise ValueError(_(u"Invalid result: bad <affiliation> element: {}").format(iq_elt.toXml)) | 819 raise ValueError( |
820 _(u"Invalid result: bad <affiliation> element: {}").format( | |
821 iq_elt.toXml | |
822 ) | |
823 ) | |
535 | 824 |
536 d = request.send(client.xmlstream) | 825 d = request.send(client.xmlstream) |
537 d.addCallback(cb) | 826 d.addCallback(cb) |
538 return d | 827 return d |
539 | 828 |
540 def _getNodeAffiliations(self, service_s, nodeIdentifier, profile_key): | 829 def _getNodeAffiliations(self, service_s, nodeIdentifier, profile_key): |
541 client = self.host.getClient(profile_key) | 830 client = self.host.getClient(profile_key) |
542 d = self.getNodeAffiliations(client, jid.JID(service_s) if service_s else None, nodeIdentifier) | 831 d = self.getNodeAffiliations( |
543 d.addCallback(lambda affiliations: {j.full(): a for j, a in affiliations.iteritems()}) | 832 client, jid.JID(service_s) if service_s else None, nodeIdentifier |
833 ) | |
834 d.addCallback( | |
835 lambda affiliations: {j.full(): a for j, a in affiliations.iteritems()} | |
836 ) | |
544 return d | 837 return d |
545 | 838 |
546 def getNodeAffiliations(self, client, service, nodeIdentifier): | 839 def getNodeAffiliations(self, client, service, nodeIdentifier): |
547 """Retrieve affiliations of a node owned by profile""" | 840 """Retrieve affiliations of a node owned by profile""" |
548 request = pubsub.PubSubRequest('affiliationsGet') | 841 request = pubsub.PubSubRequest("affiliationsGet") |
549 request.recipient = service | 842 request.recipient = service |
550 request.nodeIdentifier = nodeIdentifier | 843 request.nodeIdentifier = nodeIdentifier |
551 | 844 |
552 def cb(iq_elt): | 845 def cb(iq_elt): |
553 try: | 846 try: |
554 affiliations_elt = next(iq_elt.pubsub.elements((pubsub.NS_PUBSUB_OWNER, 'affiliations'))) | 847 affiliations_elt = next( |
848 iq_elt.pubsub.elements((pubsub.NS_PUBSUB_OWNER, "affiliations")) | |
849 ) | |
555 except StopIteration: | 850 except StopIteration: |
556 raise ValueError(_(u"Invalid result: missing <affiliations> element: {}").format(iq_elt.toXml)) | 851 raise ValueError( |
852 _(u"Invalid result: missing <affiliations> element: {}").format( | |
853 iq_elt.toXml | |
854 ) | |
855 ) | |
557 try: | 856 try: |
558 return {jid.JID(e['jid']): e['affiliation'] for e in affiliations_elt.elements((pubsub.NS_PUBSUB_OWNER, 'affiliation'))} | 857 return { |
858 jid.JID(e["jid"]): e["affiliation"] | |
859 for e in affiliations_elt.elements( | |
860 (pubsub.NS_PUBSUB_OWNER, "affiliation") | |
861 ) | |
862 } | |
559 except KeyError: | 863 except KeyError: |
560 raise ValueError(_(u"Invalid result: bad <affiliation> element: {}").format(iq_elt.toXml)) | 864 raise ValueError( |
865 _(u"Invalid result: bad <affiliation> element: {}").format( | |
866 iq_elt.toXml | |
867 ) | |
868 ) | |
561 | 869 |
562 d = request.send(client.xmlstream) | 870 d = request.send(client.xmlstream) |
563 d.addCallback(cb) | 871 d.addCallback(cb) |
564 return d | 872 return d |
565 | 873 |
566 def _setNodeAffiliations(self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE): | 874 def _setNodeAffiliations( |
567 client = self.host.getClient(profile_key) | 875 self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE |
568 affiliations = {jid.JID(jid_): affiliation for jid_, affiliation in affiliations.iteritems()} | 876 ): |
569 d = self.setNodeAffiliations(client, jid.JID(service_s) if service_s else None, nodeIdentifier, affiliations) | 877 client = self.host.getClient(profile_key) |
878 affiliations = { | |
879 jid.JID(jid_): affiliation for jid_, affiliation in affiliations.iteritems() | |
880 } | |
881 d = self.setNodeAffiliations( | |
882 client, | |
883 jid.JID(service_s) if service_s else None, | |
884 nodeIdentifier, | |
885 affiliations, | |
886 ) | |
570 return d | 887 return d |
571 | 888 |
572 def setNodeAffiliations(self, client, service, nodeIdentifier, affiliations): | 889 def setNodeAffiliations(self, client, service, nodeIdentifier, affiliations): |
573 """Update affiliations of a node owned by profile | 890 """Update affiliations of a node owned by profile |
574 | 891 |
575 @param affiliations(dict[jid.JID, unicode]): affiliations to set | 892 @param affiliations(dict[jid.JID, unicode]): affiliations to set |
576 check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations | 893 check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations |
577 """ | 894 """ |
578 request = pubsub.PubSubRequest('affiliationsSet') | 895 request = pubsub.PubSubRequest("affiliationsSet") |
579 request.recipient = service | 896 request.recipient = service |
580 request.nodeIdentifier = nodeIdentifier | 897 request.nodeIdentifier = nodeIdentifier |
581 request.affiliations = affiliations | 898 request.affiliations = affiliations |
582 d = request.send(client.xmlstream) | 899 d = request.send(client.xmlstream) |
583 return d | 900 return d |
584 | 901 |
585 def _deleteNode(self, service_s, nodeIdentifier, profile_key): | 902 def _deleteNode(self, service_s, nodeIdentifier, profile_key): |
586 client = self.host.getClient(profile_key) | 903 client = self.host.getClient(profile_key) |
587 return self.deleteNode(client, jid.JID(service_s) if service_s else None, nodeIdentifier) | 904 return self.deleteNode( |
905 client, jid.JID(service_s) if service_s else None, nodeIdentifier | |
906 ) | |
588 | 907 |
589 def deleteNode(self, client, service, nodeIdentifier): | 908 def deleteNode(self, client, service, nodeIdentifier): |
590 return client.pubsub_client.deleteNode(service, nodeIdentifier) | 909 return client.pubsub_client.deleteNode(service, nodeIdentifier) |
591 | 910 |
592 def _addWatch(self, service_s, node, profile_key): | 911 def _addWatch(self, service_s, node, profile_key): |
605 """ | 924 """ |
606 client = self.host.getClient(profile_key) | 925 client = self.host.getClient(profile_key) |
607 service = jid.JID(service_s) if service_s else client.jid.userhostJID() | 926 service = jid.JID(service_s) if service_s else client.jid.userhostJID() |
608 client.pubsub_watching.remove((service, node)) | 927 client.pubsub_watching.remove((service, node)) |
609 | 928 |
610 def _retractItem(self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key): | 929 def _retractItem( |
611 return self._retractItems(service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key) | 930 self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key |
612 | 931 ): |
613 def _retractItems(self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key): | 932 return self._retractItems( |
614 return self.retractItems(jid.JID(service_s) if service_s else None, nodeIdentifier, itemIdentifiers, notify, profile_key) | 933 service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key |
615 | 934 ) |
616 def retractItems(self, service, nodeIdentifier, itemIdentifiers, notify=True, profile_key=C.PROF_KEY_NONE): | 935 |
617 client = self.host.getClient(profile_key) | 936 def _retractItems( |
618 return client.pubsub_client.retractItems(service, nodeIdentifier, itemIdentifiers, notify=True) | 937 self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key |
938 ): | |
939 return self.retractItems( | |
940 jid.JID(service_s) if service_s else None, | |
941 nodeIdentifier, | |
942 itemIdentifiers, | |
943 notify, | |
944 profile_key, | |
945 ) | |
946 | |
947 def retractItems( | |
948 self, | |
949 service, | |
950 nodeIdentifier, | |
951 itemIdentifiers, | |
952 notify=True, | |
953 profile_key=C.PROF_KEY_NONE, | |
954 ): | |
955 client = self.host.getClient(profile_key) | |
956 return client.pubsub_client.retractItems( | |
957 service, nodeIdentifier, itemIdentifiers, notify=True | |
958 ) | |
619 | 959 |
620 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): | 960 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): |
621 client = self.host.getClient(profile_key) | 961 client = self.host.getClient(profile_key) |
622 service = None if not service else jid.JID(service) | 962 service = None if not service else jid.JID(service) |
623 d = self.subscribe(client, service, nodeIdentifier, options=options or None) | 963 d = self.subscribe(client, service, nodeIdentifier, options=options or None) |
624 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or u'') | 964 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or u"") |
625 return d | 965 return d |
626 | 966 |
627 def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None): | 967 def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None): |
628 # TODO: reimplement a subscription cache, checking that we have no subscription before trying to subscribe | 968 # TODO: reimplement a subscription cache, checking that we have no subscription before trying to subscribe |
629 return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options) | 969 return client.pubsub_client.subscribe( |
970 service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options | |
971 ) | |
630 | 972 |
631 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): | 973 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): |
632 client = self.host.getClient(profile_key) | 974 client = self.host.getClient(profile_key) |
633 service = None if not service else jid.JID(service) | 975 service = None if not service else jid.JID(service) |
634 return self.unsubscribe(client, service, nodeIdentifier) | 976 return self.unsubscribe(client, service, nodeIdentifier) |
635 | 977 |
636 def unsubscribe(self, client, service, nodeIdentifier, sub_jid=None, subscriptionIdentifier=None, sender=None): | 978 def unsubscribe( |
637 return client.pubsub_client.unsubscribe(service, nodeIdentifier, sub_jid or client.jid.userhostJID(), subscriptionIdentifier, sender) | 979 self, |
638 | 980 client, |
639 def _subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): | 981 service, |
982 nodeIdentifier, | |
983 sub_jid=None, | |
984 subscriptionIdentifier=None, | |
985 sender=None, | |
986 ): | |
987 return client.pubsub_client.unsubscribe( | |
988 service, | |
989 nodeIdentifier, | |
990 sub_jid or client.jid.userhostJID(), | |
991 subscriptionIdentifier, | |
992 sender, | |
993 ) | |
994 | |
995 def _subscriptions(self, service, nodeIdentifier="", profile_key=C.PROF_KEY_NONE): | |
640 client = self.host.getClient(profile_key) | 996 client = self.host.getClient(profile_key) |
641 service = None if not service else jid.JID(service) | 997 service = None if not service else jid.JID(service) |
642 | 998 |
643 def gotSubscriptions(subscriptions): | 999 def gotSubscriptions(subscriptions): |
644 # we replace pubsub.Subscription instance by dict that we can serialize | 1000 # we replace pubsub.Subscription instance by dict that we can serialize |
645 for idx, sub in enumerate(subscriptions): | 1001 for idx, sub in enumerate(subscriptions): |
646 sub_dict = {'node': sub.nodeIdentifier, | 1002 sub_dict = { |
647 'subscriber': sub.subscriber.full(), | 1003 "node": sub.nodeIdentifier, |
648 'state': sub.state | 1004 "subscriber": sub.subscriber.full(), |
649 } | 1005 "state": sub.state, |
1006 } | |
650 if sub.subscriptionIdentifier is not None: | 1007 if sub.subscriptionIdentifier is not None: |
651 sub_dict['id'] = sub.subscriptionIdentifier | 1008 sub_dict["id"] = sub.subscriptionIdentifier |
652 subscriptions[idx] = sub_dict | 1009 subscriptions[idx] = sub_dict |
653 | 1010 |
654 return subscriptions | 1011 return subscriptions |
655 | 1012 |
656 d = self.subscriptions(client, service, nodeIdentifier or None) | 1013 d = self.subscriptions(client, service, nodeIdentifier or None) |
677 """ | 1034 """ |
678 assert service is not None | 1035 assert service is not None |
679 # XXX: urllib.urlencode uses "&" to separate values, while XMPP URLs (cf. RFC 5122) | 1036 # XXX: urllib.urlencode uses "&" to separate values, while XMPP URLs (cf. RFC 5122) |
680 # use ";" as a separator. So if more than one value is used in query_data, | 1037 # use ";" as a separator. So if more than one value is used in query_data, |
681 # urlencode MUST NOT BE USED. | 1038 # urlencode MUST NOT BE USED. |
682 query_data = [('node', node.encode('utf-8'))] | 1039 query_data = [("node", node.encode("utf-8"))] |
683 if item is not None: | 1040 if item is not None: |
684 query_data.append(('item', item.encode('utf-8'))) | 1041 query_data.append(("item", item.encode("utf-8"))) |
685 return "xmpp:{service}?;{query}".format( | 1042 return "xmpp:{service}?;{query}".format( |
686 service=service.userhost(), | 1043 service=service.userhost(), query=urllib.urlencode(query_data) |
687 query=urllib.urlencode(query_data) | 1044 ).decode("utf-8") |
688 ).decode('utf-8') | |
689 | 1045 |
690 ## methods to manage several stanzas/jids at once ## | 1046 ## methods to manage several stanzas/jids at once ## |
691 | 1047 |
692 # generic # | 1048 # generic # |
693 | 1049 |
694 def getRTResults(self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE): | 1050 def getRTResults( |
1051 self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE | |
1052 ): | |
695 return self.rt_sessions.getResults(session_id, on_success, on_error, profile) | 1053 return self.rt_sessions.getResults(session_id, on_success, on_error, profile) |
696 | 1054 |
697 def serItemsData(self, items_data, item_cb=lambda item: item.toXml()): | 1055 def serItemsData(self, items_data, item_cb=lambda item: item.toXml()): |
698 """Helper method to serialise result from [getItems] | 1056 """Helper method to serialise result from [getItems] |
699 | 1057 |
703 @param items_data(tuple): tuple returned by [getItems] | 1061 @param items_data(tuple): tuple returned by [getItems] |
704 @param item_cb(callable): method to transform each item | 1062 @param item_cb(callable): method to transform each item |
705 @return (tuple): a serialised form ready to go through the bridge | 1063 @return (tuple): a serialised form ready to go through the bridge |
706 """ | 1064 """ |
707 items, metadata = items_data | 1065 items, metadata = items_data |
708 return [item_cb(item) for item in items], {key: unicode(value) for key, value in metadata.iteritems()} | 1066 return ( |
1067 [item_cb(item) for item in items], | |
1068 {key: unicode(value) for key, value in metadata.iteritems()}, | |
1069 ) | |
709 | 1070 |
710 def serItemsDataD(self, items_data, item_cb): | 1071 def serItemsDataD(self, items_data, item_cb): |
711 """Helper method to serialise result from [getItems], deferred version | 1072 """Helper method to serialise result from [getItems], deferred version |
712 | 1073 |
713 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) | 1074 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) |
717 @param items_data(tuple): tuple returned by [getItems] | 1078 @param items_data(tuple): tuple returned by [getItems] |
718 @param item_cb(callable): method to transform each item (must return a deferred) | 1079 @param item_cb(callable): method to transform each item (must return a deferred) |
719 @return (tuple): a deferred which fires a serialised form ready to go through the bridge | 1080 @return (tuple): a deferred which fires a serialised form ready to go through the bridge |
720 """ | 1081 """ |
721 items, metadata = items_data | 1082 items, metadata = items_data |
1083 | |
722 def eb(failure): | 1084 def eb(failure): |
723 log.warning("Error while serialising/parsing item: {}".format(unicode(failure.value))) | 1085 log.warning( |
1086 "Error while serialising/parsing item: {}".format(unicode(failure.value)) | |
1087 ) | |
1088 | |
724 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) | 1089 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) |
1090 | |
725 def finishSerialisation(serialised_items): | 1091 def finishSerialisation(serialised_items): |
726 return [item for item in serialised_items if item is not None], {key: unicode(value) for key, value in metadata.iteritems()} | 1092 return ( |
1093 [item for item in serialised_items if item is not None], | |
1094 {key: unicode(value) for key, value in metadata.iteritems()}, | |
1095 ) | |
1096 | |
727 d.addCallback(finishSerialisation) | 1097 d.addCallback(finishSerialisation) |
728 return d | 1098 return d |
729 | 1099 |
730 def serDList(self, results, failure_result=None): | 1100 def serDList(self, results, failure_result=None): |
731 """Serialise a DeferredList result | 1101 """Serialise a DeferredList result |
737 - failure: empty in case of success, else error message | 1107 - failure: empty in case of success, else error message |
738 - result | 1108 - result |
739 """ | 1109 """ |
740 if failure_result is None: | 1110 if failure_result is None: |
741 failure_result = () | 1111 failure_result = () |
742 return [('', result) if success else (unicode(result.result) or UNSPECIFIED, failure_result) for success, result in results] | 1112 return [ |
1113 ("", result) | |
1114 if success | |
1115 else (unicode(result.result) or UNSPECIFIED, failure_result) | |
1116 for success, result in results | |
1117 ] | |
743 | 1118 |
744 # subscribe # | 1119 # subscribe # |
745 | 1120 |
746 def _getNodeSubscriptions(self, service_s, nodeIdentifier, profile_key): | 1121 def _getNodeSubscriptions(self, service_s, nodeIdentifier, profile_key): |
747 client = self.host.getClient(profile_key) | 1122 client = self.host.getClient(profile_key) |
748 d = self.getNodeSubscriptions(client, jid.JID(service_s) if service_s else None, nodeIdentifier) | 1123 d = self.getNodeSubscriptions( |
749 d.addCallback(lambda subscriptions: {j.full(): a for j, a in subscriptions.iteritems()}) | 1124 client, jid.JID(service_s) if service_s else None, nodeIdentifier |
1125 ) | |
1126 d.addCallback( | |
1127 lambda subscriptions: {j.full(): a for j, a in subscriptions.iteritems()} | |
1128 ) | |
750 return d | 1129 return d |
751 | 1130 |
752 def getNodeSubscriptions(self, client, service, nodeIdentifier): | 1131 def getNodeSubscriptions(self, client, service, nodeIdentifier): |
753 """Retrieve subscriptions to a node | 1132 """Retrieve subscriptions to a node |
754 | 1133 |
755 @param nodeIdentifier(unicode): node to get subscriptions from | 1134 @param nodeIdentifier(unicode): node to get subscriptions from |
756 """ | 1135 """ |
757 if not nodeIdentifier: | 1136 if not nodeIdentifier: |
758 raise exceptions.DataError("node identifier can't be empty") | 1137 raise exceptions.DataError("node identifier can't be empty") |
759 request = pubsub.PubSubRequest('subscriptionsGet') | 1138 request = pubsub.PubSubRequest("subscriptionsGet") |
760 request.recipient = service | 1139 request.recipient = service |
761 request.nodeIdentifier = nodeIdentifier | 1140 request.nodeIdentifier = nodeIdentifier |
762 | 1141 |
763 def cb(iq_elt): | 1142 def cb(iq_elt): |
764 try: | 1143 try: |
765 subscriptions_elt = next(iq_elt.pubsub.elements((pubsub.NS_PUBSUB, 'subscriptions'))) | 1144 subscriptions_elt = next( |
1145 iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "subscriptions")) | |
1146 ) | |
766 except StopIteration: | 1147 except StopIteration: |
767 raise ValueError(_(u"Invalid result: missing <subscriptions> element: {}").format(iq_elt.toXml)) | 1148 raise ValueError( |
1149 _(u"Invalid result: missing <subscriptions> element: {}").format( | |
1150 iq_elt.toXml | |
1151 ) | |
1152 ) | |
768 except AttributeError as e: | 1153 except AttributeError as e: |
769 raise ValueError(_(u"Invalid result: {}").format(e)) | 1154 raise ValueError(_(u"Invalid result: {}").format(e)) |
770 try: | 1155 try: |
771 return {jid.JID(s['jid']): s['subscription'] for s in subscriptions_elt.elements((pubsub.NS_PUBSUB, 'subscription'))} | 1156 return { |
1157 jid.JID(s["jid"]): s["subscription"] | |
1158 for s in subscriptions_elt.elements( | |
1159 (pubsub.NS_PUBSUB, "subscription") | |
1160 ) | |
1161 } | |
772 except KeyError: | 1162 except KeyError: |
773 raise ValueError(_(u"Invalid result: bad <subscription> element: {}").format(iq_elt.toXml)) | 1163 raise ValueError( |
1164 _(u"Invalid result: bad <subscription> element: {}").format( | |
1165 iq_elt.toXml | |
1166 ) | |
1167 ) | |
774 | 1168 |
775 d = request.send(client.xmlstream) | 1169 d = request.send(client.xmlstream) |
776 d.addCallback(cb) | 1170 d.addCallback(cb) |
777 return d | 1171 return d |
778 | 1172 |
779 def _setNodeSubscriptions(self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE): | 1173 def _setNodeSubscriptions( |
780 client = self.host.getClient(profile_key) | 1174 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE |
781 subscriptions = {jid.JID(jid_): subscription for jid_, subscription in subscriptions.iteritems()} | 1175 ): |
782 d = self.setNodeSubscriptions(client, jid.JID(service_s) if service_s else None, nodeIdentifier, subscriptions) | 1176 client = self.host.getClient(profile_key) |
1177 subscriptions = { | |
1178 jid.JID(jid_): subscription | |
1179 for jid_, subscription in subscriptions.iteritems() | |
1180 } | |
1181 d = self.setNodeSubscriptions( | |
1182 client, | |
1183 jid.JID(service_s) if service_s else None, | |
1184 nodeIdentifier, | |
1185 subscriptions, | |
1186 ) | |
783 return d | 1187 return d |
784 | 1188 |
785 def setNodeSubscriptions(self, client, service, nodeIdentifier, subscriptions): | 1189 def setNodeSubscriptions(self, client, service, nodeIdentifier, subscriptions): |
786 """Set or update subscriptions of a node owned by profile | 1190 """Set or update subscriptions of a node owned by profile |
787 | 1191 |
788 @param subscriptions(dict[jid.JID, unicode]): subscriptions to set | 1192 @param subscriptions(dict[jid.JID, unicode]): subscriptions to set |
789 check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions | 1193 check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions |
790 """ | 1194 """ |
791 request = pubsub.PubSubRequest('subscriptionsSet') | 1195 request = pubsub.PubSubRequest("subscriptionsSet") |
792 request.recipient = service | 1196 request.recipient = service |
793 request.nodeIdentifier = nodeIdentifier | 1197 request.nodeIdentifier = nodeIdentifier |
794 request.subscriptions = {pubsub.Subscription(nodeIdentifier, jid_, state) for jid_, state in subscriptions.iteritems()} | 1198 request.subscriptions = { |
1199 pubsub.Subscription(nodeIdentifier, jid_, state) | |
1200 for jid_, state in subscriptions.iteritems() | |
1201 } | |
795 d = request.send(client.xmlstream) | 1202 d = request.send(client.xmlstream) |
796 return d | 1203 return d |
797 | 1204 |
798 def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): | 1205 def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): |
799 """Get real-time results for subscribeToMany session | 1206 """Get real-time results for subscribeToMany session |
806 - and node: pubsub node | 1213 - and node: pubsub node |
807 - failure(unicode): empty string in case of success, error message else | 1214 - failure(unicode): empty string in case of success, error message else |
808 @param profile_key: %(doc_profile_key)s | 1215 @param profile_key: %(doc_profile_key)s |
809 """ | 1216 """ |
810 profile = self.host.getClient(profile_key).profile | 1217 profile = self.host.getClient(profile_key).profile |
811 d = self.rt_sessions.getResults(session_id, on_success=lambda result:'', on_error=lambda failure:unicode(failure.value), profile=profile) | 1218 d = self.rt_sessions.getResults( |
1219 session_id, | |
1220 on_success=lambda result: "", | |
1221 on_error=lambda failure: unicode(failure.value), | |
1222 profile=profile, | |
1223 ) | |
812 # we need to convert jid.JID to unicode with full() to serialise it for the bridge | 1224 # we need to convert jid.JID to unicode with full() to serialise it for the bridge |
813 d.addCallback(lambda ret: (ret[0], [(service.full(), node, '' if success else failure or UNSPECIFIED) | 1225 d.addCallback( |
814 for (service, node), (success, failure) in ret[1].iteritems()])) | 1226 lambda ret: ( |
815 return d | 1227 ret[0], |
816 | 1228 [ |
817 def _subscribeToMany(self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE): | 1229 (service.full(), node, "" if success else failure or UNSPECIFIED) |
818 return self.subscribeToMany([(jid.JID(service), unicode(node)) for service, node in node_data], jid.JID(subscriber), options, profile_key) | 1230 for (service, node), (success, failure) in ret[1].iteritems() |
819 | 1231 ], |
820 def subscribeToMany(self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE): | 1232 ) |
1233 ) | |
1234 return d | |
1235 | |
1236 def _subscribeToMany( | |
1237 self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE | |
1238 ): | |
1239 return self.subscribeToMany( | |
1240 [(jid.JID(service), unicode(node)) for service, node in node_data], | |
1241 jid.JID(subscriber), | |
1242 options, | |
1243 profile_key, | |
1244 ) | |
1245 | |
1246 def subscribeToMany( | |
1247 self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE | |
1248 ): | |
821 """Subscribe to several nodes at once. | 1249 """Subscribe to several nodes at once. |
822 | 1250 |
823 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: | 1251 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: |
824 - service (jid.JID) is the pubsub service | 1252 - service (jid.JID) is the pubsub service |
825 - node (unicode) is the node to subscribe to | 1253 - node (unicode) is the node to subscribe to |
829 @return (str): RT Deferred session id | 1257 @return (str): RT Deferred session id |
830 """ | 1258 """ |
831 client = self.host.getClient(profile_key) | 1259 client = self.host.getClient(profile_key) |
832 deferreds = {} | 1260 deferreds = {} |
833 for service, node in node_data: | 1261 for service, node in node_data: |
834 deferreds[(service, node)] = client.pubsub_client.subscribe(service, node, subscriber, options=options) | 1262 deferreds[(service, node)] = client.pubsub_client.subscribe( |
1263 service, node, subscriber, options=options | |
1264 ) | |
835 return self.rt_sessions.newSession(deferreds, client.profile) | 1265 return self.rt_sessions.newSession(deferreds, client.profile) |
836 # found_nodes = yield self.listNodes(service, profile=client.profile) | 1266 # found_nodes = yield self.listNodes(service, profile=client.profile) |
837 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) | 1267 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) |
838 # d_list = [] | 1268 # d_list = [] |
839 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): | 1269 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): |
858 - failure (unicode): empty string in case of success, error message else | 1288 - failure (unicode): empty string in case of success, error message else |
859 - items (list[s]): raw XML of items | 1289 - items (list[s]): raw XML of items |
860 - metadata(dict): serialised metadata | 1290 - metadata(dict): serialised metadata |
861 """ | 1291 """ |
862 profile = self.host.getClient(profile_key).profile | 1292 profile = self.host.getClient(profile_key).profile |
863 d = self.rt_sessions.getResults(session_id, | 1293 d = self.rt_sessions.getResults( |
864 on_success=lambda result: ('', self.serItemsData(result)), | 1294 session_id, |
865 on_error=lambda failure: (unicode(failure.value) or UNSPECIFIED, ([],{})), | 1295 on_success=lambda result: ("", self.serItemsData(result)), |
866 profile=profile) | 1296 on_error=lambda failure: (unicode(failure.value) or UNSPECIFIED, ([], {})), |
867 d.addCallback(lambda ret: (ret[0], | 1297 profile=profile, |
868 [(service.full(), node, failure, items, metadata) | 1298 ) |
869 for (service, node), (success, (failure, (items, metadata))) in ret[1].iteritems()])) | 1299 d.addCallback( |
870 return d | 1300 lambda ret: ( |
871 | 1301 ret[0], |
872 def _getFromMany(self, node_data, max_item=10, extra_dict=None, profile_key=C.PROF_KEY_NONE): | 1302 [ |
1303 (service.full(), node, failure, items, metadata) | |
1304 for (service, node), (success, (failure, (items, metadata))) in ret[ | |
1305 1 | |
1306 ].iteritems() | |
1307 ], | |
1308 ) | |
1309 ) | |
1310 return d | |
1311 | |
1312 def _getFromMany( | |
1313 self, node_data, max_item=10, extra_dict=None, profile_key=C.PROF_KEY_NONE | |
1314 ): | |
873 """ | 1315 """ |
874 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit | 1316 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit |
875 """ | 1317 """ |
876 max_item = None if max_item == C.NO_LIMIT else max_item | 1318 max_item = None if max_item == C.NO_LIMIT else max_item |
877 extra = self.parseExtra(extra_dict) | 1319 extra = self.parseExtra(extra_dict) |
878 return self.getFromMany([(jid.JID(service), unicode(node)) for service, node in node_data], max_item, extra.rsm_request, extra.extra, profile_key) | 1320 return self.getFromMany( |
879 | 1321 [(jid.JID(service), unicode(node)) for service, node in node_data], |
880 def getFromMany(self, node_data, max_item=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE): | 1322 max_item, |
1323 extra.rsm_request, | |
1324 extra.extra, | |
1325 profile_key, | |
1326 ) | |
1327 | |
1328 def getFromMany( | |
1329 self, | |
1330 node_data, | |
1331 max_item=None, | |
1332 rsm_request=None, | |
1333 extra=None, | |
1334 profile_key=C.PROF_KEY_NONE, | |
1335 ): | |
881 """Get items from many nodes at once | 1336 """Get items from many nodes at once |
882 | 1337 |
883 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: | 1338 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: |
884 - service (jid.JID) is the pubsub service | 1339 - service (jid.JID) is the pubsub service |
885 - node (unicode) is the node to get items from | 1340 - node (unicode) is the node to get items from |
889 @return (str): RT Deferred session id | 1344 @return (str): RT Deferred session id |
890 """ | 1345 """ |
891 client = self.host.getClient(profile_key) | 1346 client = self.host.getClient(profile_key) |
892 deferreds = {} | 1347 deferreds = {} |
893 for service, node in node_data: | 1348 for service, node in node_data: |
894 deferreds[(service, node)] = self.getItems(client, service, node, max_item, rsm_request=rsm_request, extra=extra) | 1349 deferreds[(service, node)] = self.getItems( |
1350 client, service, node, max_item, rsm_request=rsm_request, extra=extra | |
1351 ) | |
895 return self.rt_sessions.newSession(deferreds, client.profile) | 1352 return self.rt_sessions.newSession(deferreds, client.profile) |
896 | 1353 |
897 | 1354 |
898 class SatPubSubClient(rsm.PubSubClient): | 1355 class SatPubSubClient(rsm.PubSubClient): |
899 implements(disco.IDisco) | 1356 implements(disco.IDisco) |
928 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): | 1385 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): |
929 callback(self.parent, event) | 1386 callback(self.parent, event) |
930 client = self.parent | 1387 client = self.parent |
931 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | 1388 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: |
932 raw_items = [i.toXml() for i in event.items] | 1389 raw_items = [i.toXml() for i in event.items] |
933 self.host.bridge.psEventRaw(event.sender.full(), event.nodeIdentifier, C.PS_ITEMS, raw_items, client.profile) | 1390 self.host.bridge.psEventRaw( |
1391 event.sender.full(), | |
1392 event.nodeIdentifier, | |
1393 C.PS_ITEMS, | |
1394 raw_items, | |
1395 client.profile, | |
1396 ) | |
934 | 1397 |
935 def deleteReceived(self, event): | 1398 def deleteReceived(self, event): |
936 log.debug((u"Publish node deleted")) | 1399 log.debug((u"Publish node deleted")) |
937 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE): | 1400 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE): |
938 callback(self.parent, event) | 1401 callback(self.parent, event) |
939 client = self.parent | 1402 client = self.parent |
940 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | 1403 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: |
941 self.host.bridge.psEventRaw(event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile) | 1404 self.host.bridge.psEventRaw( |
1405 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile | |
1406 ) | |
942 | 1407 |
943 def subscriptions(self, service, nodeIdentifier, sender=None): | 1408 def subscriptions(self, service, nodeIdentifier, sender=None): |
944 """Return the list of subscriptions to the given service and node. | 1409 """Return the list of subscriptions to the given service and node. |
945 | 1410 |
946 @param service: The publish subscribe service to retrieve the subscriptions from. | 1411 @param service: The publish subscribe service to retrieve the subscriptions from. |
947 @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} | 1412 @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} |
948 @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions). | 1413 @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions). |
949 @type nodeIdentifier: C{unicode} | 1414 @type nodeIdentifier: C{unicode} |
950 @return (list[pubsub.Subscription]): list of subscriptions | 1415 @return (list[pubsub.Subscription]): list of subscriptions |
951 """ | 1416 """ |
952 request = pubsub.PubSubRequest('subscriptions') | 1417 request = pubsub.PubSubRequest("subscriptions") |
953 request.recipient = service | 1418 request.recipient = service |
954 request.nodeIdentifier = nodeIdentifier | 1419 request.nodeIdentifier = nodeIdentifier |
955 request.sender = sender | 1420 request.sender = sender |
956 d = request.send(self.xmlstream) | 1421 d = request.send(self.xmlstream) |
957 | 1422 |
958 def cb(iq): | 1423 def cb(iq): |
959 subs = [] | 1424 subs = [] |
960 for subscription_elt in iq.pubsub.subscriptions.elements(pubsub.NS_PUBSUB, 'subscription'): | 1425 for subscription_elt in iq.pubsub.subscriptions.elements( |
961 subscription = pubsub.Subscription(subscription_elt['node'], | 1426 pubsub.NS_PUBSUB, "subscription" |
962 jid.JID(subscription_elt['jid']), | 1427 ): |
963 subscription_elt['subscription'], | 1428 subscription = pubsub.Subscription( |
964 subscriptionIdentifier=subscription_elt.getAttribute('subid')) | 1429 subscription_elt["node"], |
1430 jid.JID(subscription_elt["jid"]), | |
1431 subscription_elt["subscription"], | |
1432 subscriptionIdentifier=subscription_elt.getAttribute("subid"), | |
1433 ) | |
965 subs.append(subscription) | 1434 subs.append(subscription) |
966 return subs | 1435 return subs |
967 | 1436 |
968 return d.addCallback(cb) | 1437 return d.addCallback(cb) |
969 | 1438 |
970 def getDiscoInfo(self, requestor, service, nodeIdentifier=''): | 1439 def getDiscoInfo(self, requestor, service, nodeIdentifier=""): |
971 disco_info = [] | 1440 disco_info = [] |
972 self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile) | 1441 self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile) |
973 return disco_info | 1442 return disco_info |
974 | 1443 |
975 def getDiscoItems(self, requestor, service, nodeIdentifier=''): | 1444 def getDiscoItems(self, requestor, service, nodeIdentifier=""): |
976 return [] | 1445 return [] |