Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0060.py @ 4037:524856bd7b19
massive refactoring to switch from camelCase to snake_case:
historically, Libervia (SàT before) was using camelCase as allowed by PEP8 when using a
pre-PEP8 code, to use the same coding style as in Twisted.
However, snake_case is more readable and it's better to follow PEP8 best practices, so it
has been decided to move on full snake_case. Because Libervia has a huge codebase, this
ended with a ugly mix of camelCase and snake_case.
To fix that, this patch does a big refactoring by renaming every function and method
(including bridge) that are not coming from Twisted or Wokkel, to use fully snake_case.
This is a massive change, and may result in some bugs.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 08 Apr 2023 13:54:42 +0200 |
parents | 9456852d3286 |
children |
comparison
equal
deleted
inserted
replaced
4036:c4464d7ae97b | 4037:524856bd7b19 |
---|---|
109 self.host = host | 109 self.host = host |
110 self._rsm = host.plugins.get("XEP-0059") | 110 self._rsm = host.plugins.get("XEP-0059") |
111 self._mam = host.plugins.get("XEP-0313") | 111 self._mam = host.plugins.get("XEP-0313") |
112 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) | 112 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) |
113 self.rt_sessions = sat_defer.RTDeferredSessions() | 113 self.rt_sessions = sat_defer.RTDeferredSessions() |
114 host.bridge.addMethod( | 114 host.bridge.add_method( |
115 "psNodeCreate", | 115 "ps_node_create", |
116 ".plugin", | 116 ".plugin", |
117 in_sign="ssa{ss}s", | 117 in_sign="ssa{ss}s", |
118 out_sign="s", | 118 out_sign="s", |
119 method=self._createNode, | 119 method=self._create_node, |
120 async_=True, | 120 async_=True, |
121 ) | 121 ) |
122 host.bridge.addMethod( | 122 host.bridge.add_method( |
123 "psNodeConfigurationGet", | 123 "ps_node_configuration_get", |
124 ".plugin", | 124 ".plugin", |
125 in_sign="sss", | 125 in_sign="sss", |
126 out_sign="a{ss}", | 126 out_sign="a{ss}", |
127 method=self._getNodeConfiguration, | 127 method=self._get_node_configuration, |
128 async_=True, | 128 async_=True, |
129 ) | 129 ) |
130 host.bridge.addMethod( | 130 host.bridge.add_method( |
131 "psNodeConfigurationSet", | 131 "ps_node_configuration_set", |
132 ".plugin", | 132 ".plugin", |
133 in_sign="ssa{ss}s", | 133 in_sign="ssa{ss}s", |
134 out_sign="", | 134 out_sign="", |
135 method=self._setNodeConfiguration, | 135 method=self._set_node_configuration, |
136 async_=True, | 136 async_=True, |
137 ) | 137 ) |
138 host.bridge.addMethod( | 138 host.bridge.add_method( |
139 "psNodeAffiliationsGet", | 139 "ps_node_affiliations_get", |
140 ".plugin", | 140 ".plugin", |
141 in_sign="sss", | 141 in_sign="sss", |
142 out_sign="a{ss}", | 142 out_sign="a{ss}", |
143 method=self._getNodeAffiliations, | 143 method=self._get_node_affiliations, |
144 async_=True, | 144 async_=True, |
145 ) | 145 ) |
146 host.bridge.addMethod( | 146 host.bridge.add_method( |
147 "psNodeAffiliationsSet", | 147 "ps_node_affiliations_set", |
148 ".plugin", | 148 ".plugin", |
149 in_sign="ssa{ss}s", | 149 in_sign="ssa{ss}s", |
150 out_sign="", | 150 out_sign="", |
151 method=self._setNodeAffiliations, | 151 method=self._set_node_affiliations, |
152 async_=True, | 152 async_=True, |
153 ) | 153 ) |
154 host.bridge.addMethod( | 154 host.bridge.add_method( |
155 "psNodeSubscriptionsGet", | 155 "ps_node_subscriptions_get", |
156 ".plugin", | 156 ".plugin", |
157 in_sign="sss", | 157 in_sign="sss", |
158 out_sign="a{ss}", | 158 out_sign="a{ss}", |
159 method=self._getNodeSubscriptions, | 159 method=self._get_node_subscriptions, |
160 async_=True, | 160 async_=True, |
161 ) | 161 ) |
162 host.bridge.addMethod( | 162 host.bridge.add_method( |
163 "psNodeSubscriptionsSet", | 163 "ps_node_subscriptions_set", |
164 ".plugin", | 164 ".plugin", |
165 in_sign="ssa{ss}s", | 165 in_sign="ssa{ss}s", |
166 out_sign="", | 166 out_sign="", |
167 method=self._setNodeSubscriptions, | 167 method=self._set_node_subscriptions, |
168 async_=True, | 168 async_=True, |
169 ) | 169 ) |
170 host.bridge.addMethod( | 170 host.bridge.add_method( |
171 "psNodePurge", | 171 "ps_node_purge", |
172 ".plugin", | 172 ".plugin", |
173 in_sign="sss", | 173 in_sign="sss", |
174 out_sign="", | 174 out_sign="", |
175 method=self._purgeNode, | 175 method=self._purge_node, |
176 async_=True, | 176 async_=True, |
177 ) | 177 ) |
178 host.bridge.addMethod( | 178 host.bridge.add_method( |
179 "psNodeDelete", | 179 "ps_node_delete", |
180 ".plugin", | 180 ".plugin", |
181 in_sign="sss", | 181 in_sign="sss", |
182 out_sign="", | 182 out_sign="", |
183 method=self._deleteNode, | 183 method=self._delete_node, |
184 async_=True, | 184 async_=True, |
185 ) | 185 ) |
186 host.bridge.addMethod( | 186 host.bridge.add_method( |
187 "psNodeWatchAdd", | 187 "ps_node_watch_add", |
188 ".plugin", | 188 ".plugin", |
189 in_sign="sss", | 189 in_sign="sss", |
190 out_sign="", | 190 out_sign="", |
191 method=self._addWatch, | 191 method=self._addWatch, |
192 async_=False, | 192 async_=False, |
193 ) | 193 ) |
194 host.bridge.addMethod( | 194 host.bridge.add_method( |
195 "psNodeWatchRemove", | 195 "ps_node_watch_remove", |
196 ".plugin", | 196 ".plugin", |
197 in_sign="sss", | 197 in_sign="sss", |
198 out_sign="", | 198 out_sign="", |
199 method=self._removeWatch, | 199 method=self._remove_watch, |
200 async_=False, | 200 async_=False, |
201 ) | 201 ) |
202 host.bridge.addMethod( | 202 host.bridge.add_method( |
203 "psAffiliationsGet", | 203 "ps_affiliations_get", |
204 ".plugin", | 204 ".plugin", |
205 in_sign="sss", | 205 in_sign="sss", |
206 out_sign="a{ss}", | 206 out_sign="a{ss}", |
207 method=self._getAffiliations, | 207 method=self._get_affiliations, |
208 async_=True, | 208 async_=True, |
209 ) | 209 ) |
210 host.bridge.addMethod( | 210 host.bridge.add_method( |
211 "psItemsGet", | 211 "ps_items_get", |
212 ".plugin", | 212 ".plugin", |
213 in_sign="ssiassss", | 213 in_sign="ssiassss", |
214 out_sign="s", | 214 out_sign="s", |
215 method=self._getItems, | 215 method=self._get_items, |
216 async_=True, | 216 async_=True, |
217 ) | 217 ) |
218 host.bridge.addMethod( | 218 host.bridge.add_method( |
219 "psItemSend", | 219 "ps_item_send", |
220 ".plugin", | 220 ".plugin", |
221 in_sign="ssssss", | 221 in_sign="ssssss", |
222 out_sign="s", | 222 out_sign="s", |
223 method=self._sendItem, | 223 method=self._send_item, |
224 async_=True, | 224 async_=True, |
225 ) | 225 ) |
226 host.bridge.addMethod( | 226 host.bridge.add_method( |
227 "psItemsSend", | 227 "ps_items_send", |
228 ".plugin", | 228 ".plugin", |
229 in_sign="ssasss", | 229 in_sign="ssasss", |
230 out_sign="as", | 230 out_sign="as", |
231 method=self._sendItems, | 231 method=self._send_items, |
232 async_=True, | 232 async_=True, |
233 ) | 233 ) |
234 host.bridge.addMethod( | 234 host.bridge.add_method( |
235 "psItemRetract", | 235 "ps_item_retract", |
236 ".plugin", | 236 ".plugin", |
237 in_sign="sssbs", | 237 in_sign="sssbs", |
238 out_sign="", | 238 out_sign="", |
239 method=self._retractItem, | 239 method=self._retract_item, |
240 async_=True, | 240 async_=True, |
241 ) | 241 ) |
242 host.bridge.addMethod( | 242 host.bridge.add_method( |
243 "psItemsRetract", | 243 "ps_items_retract", |
244 ".plugin", | 244 ".plugin", |
245 in_sign="ssasbs", | 245 in_sign="ssasbs", |
246 out_sign="", | 246 out_sign="", |
247 method=self._retractItems, | 247 method=self._retract_items, |
248 async_=True, | 248 async_=True, |
249 ) | 249 ) |
250 host.bridge.addMethod( | 250 host.bridge.add_method( |
251 "psItemRename", | 251 "ps_item_rename", |
252 ".plugin", | 252 ".plugin", |
253 in_sign="sssss", | 253 in_sign="sssss", |
254 out_sign="", | 254 out_sign="", |
255 method=self._renameItem, | 255 method=self._rename_item, |
256 async_=True, | 256 async_=True, |
257 ) | 257 ) |
258 host.bridge.addMethod( | 258 host.bridge.add_method( |
259 "psSubscribe", | 259 "ps_subscribe", |
260 ".plugin", | 260 ".plugin", |
261 in_sign="ssss", | 261 in_sign="ssss", |
262 out_sign="s", | 262 out_sign="s", |
263 method=self._subscribe, | 263 method=self._subscribe, |
264 async_=True, | 264 async_=True, |
265 ) | 265 ) |
266 host.bridge.addMethod( | 266 host.bridge.add_method( |
267 "psUnsubscribe", | 267 "ps_unsubscribe", |
268 ".plugin", | 268 ".plugin", |
269 in_sign="sss", | 269 in_sign="sss", |
270 out_sign="", | 270 out_sign="", |
271 method=self._unsubscribe, | 271 method=self._unsubscribe, |
272 async_=True, | 272 async_=True, |
273 ) | 273 ) |
274 host.bridge.addMethod( | 274 host.bridge.add_method( |
275 "psSubscriptionsGet", | 275 "ps_subscriptions_get", |
276 ".plugin", | 276 ".plugin", |
277 in_sign="sss", | 277 in_sign="sss", |
278 out_sign="s", | 278 out_sign="s", |
279 method=self._subscriptions, | 279 method=self._subscriptions, |
280 async_=True, | 280 async_=True, |
281 ) | 281 ) |
282 host.bridge.addMethod( | 282 host.bridge.add_method( |
283 "psSubscribeToMany", | 283 "ps_subscribe_to_many", |
284 ".plugin", | 284 ".plugin", |
285 in_sign="a(ss)sa{ss}s", | 285 in_sign="a(ss)sa{ss}s", |
286 out_sign="s", | 286 out_sign="s", |
287 method=self._subscribeToMany, | 287 method=self._subscribe_to_many, |
288 ) | 288 ) |
289 host.bridge.addMethod( | 289 host.bridge.add_method( |
290 "psGetSubscribeRTResult", | 290 "ps_get_subscribe_rt_result", |
291 ".plugin", | 291 ".plugin", |
292 in_sign="ss", | 292 in_sign="ss", |
293 out_sign="(ua(sss))", | 293 out_sign="(ua(sss))", |
294 method=self._manySubscribeRTResult, | 294 method=self._many_subscribe_rt_result, |
295 async_=True, | 295 async_=True, |
296 ) | 296 ) |
297 host.bridge.addMethod( | 297 host.bridge.add_method( |
298 "psGetFromMany", | 298 "ps_get_from_many", |
299 ".plugin", | 299 ".plugin", |
300 in_sign="a(ss)iss", | 300 in_sign="a(ss)iss", |
301 out_sign="s", | 301 out_sign="s", |
302 method=self._getFromMany, | 302 method=self._get_from_many, |
303 ) | 303 ) |
304 host.bridge.addMethod( | 304 host.bridge.add_method( |
305 "psGetFromManyRTResult", | 305 "ps_get_from_many_rt_result", |
306 ".plugin", | 306 ".plugin", |
307 in_sign="ss", | 307 in_sign="ss", |
308 out_sign="(ua(sssasa{ss}))", | 308 out_sign="(ua(sssasa{ss}))", |
309 method=self._getFromManyRTResult, | 309 method=self._get_from_many_rt_result, |
310 async_=True, | 310 async_=True, |
311 ) | 311 ) |
312 | 312 |
313 # high level observer method | 313 # high level observer method |
314 host.bridge.addSignal( | 314 host.bridge.add_signal( |
315 "psEvent", ".plugin", signature="ssssss" | 315 "ps_event", ".plugin", signature="ssssss" |
316 ) # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile | 316 ) # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile |
317 | 317 |
318 # low level observer method, used if service/node is in watching list (see psNodeWatch* methods) | 318 # low level observer method, used if service/node is in watching list (see psNodeWatch* methods) |
319 host.bridge.addSignal( | 319 host.bridge.add_signal( |
320 "psEventRaw", ".plugin", signature="sssass" | 320 "ps_event_raw", ".plugin", signature="sssass" |
321 ) # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile | 321 ) # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile |
322 | 322 |
323 def getHandler(self, client): | 323 def get_handler(self, client): |
324 client.pubsub_client = SatPubSubClient(self.host, self) | 324 client.pubsub_client = SatPubSubClient(self.host, self) |
325 return client.pubsub_client | 325 return client.pubsub_client |
326 | 326 |
327 async def profileConnected(self, client): | 327 async def profile_connected(self, client): |
328 client.pubsub_watching = set() | 328 client.pubsub_watching = set() |
329 try: | 329 try: |
330 client.pubsub_service = jid.JID( | 330 client.pubsub_service = jid.JID( |
331 self.host.memory.getConfig("", "pubsub_service") | 331 self.host.memory.config_get("", "pubsub_service") |
332 ) | 332 ) |
333 except RuntimeError: | 333 except RuntimeError: |
334 log.info( | 334 log.info( |
335 _( | 335 _( |
336 "Can't retrieve pubsub_service from conf, we'll use first one that " | 336 "Can't retrieve pubsub_service from conf, we'll use first one that " |
337 "we find" | 337 "we find" |
338 ) | 338 ) |
339 ) | 339 ) |
340 pubsub_services = await self.host.findServiceEntities( | 340 pubsub_services = await self.host.find_service_entities( |
341 client, "pubsub", "service" | 341 client, "pubsub", "service" |
342 ) | 342 ) |
343 for service_jid in pubsub_services: | 343 for service_jid in pubsub_services: |
344 infos = await self.host.memory.disco.getInfos(client, service_jid) | 344 infos = await self.host.memory.disco.get_infos(client, service_jid) |
345 if not DEFAULT_PUBSUB_MIN_FEAT.issubset(infos.features): | 345 if not DEFAULT_PUBSUB_MIN_FEAT.issubset(infos.features): |
346 continue | 346 continue |
347 names = {(n or "").lower() for n in infos.identities.values()} | 347 names = {(n or "").lower() for n in infos.identities.values()} |
348 if "libervia pubsub service" in names: | 348 if "libervia pubsub service" in names: |
349 # this is the name of Libervia's side project pubsub service, we know | 349 # this is the name of Libervia's side project pubsub service, we know |
365 pubsub_service_str = ( | 365 pubsub_service_str = ( |
366 client.pubsub_service.full() if client.pubsub_service else "PEP" | 366 client.pubsub_service.full() if client.pubsub_service else "PEP" |
367 ) | 367 ) |
368 log.info(f"default pubsub service: {pubsub_service_str}") | 368 log.info(f"default pubsub service: {pubsub_service_str}") |
369 | 369 |
370 def getFeatures(self, profile): | 370 def features_get(self, profile): |
371 try: | 371 try: |
372 client = self.host.getClient(profile) | 372 client = self.host.get_client(profile) |
373 except exceptions.ProfileNotSetError: | 373 except exceptions.ProfileNotSetError: |
374 return {} | 374 return {} |
375 try: | 375 try: |
376 return { | 376 return { |
377 "service": client.pubsub_service.full() | 377 "service": client.pubsub_service.full() |
378 if client.pubsub_service is not None | 378 if client.pubsub_service is not None |
379 else "" | 379 else "" |
380 } | 380 } |
381 except AttributeError: | 381 except AttributeError: |
382 if self.host.isConnected(profile): | 382 if self.host.is_connected(profile): |
383 log.debug("Profile is not connected, service is not checked yet") | 383 log.debug("Profile is not connected, service is not checked yet") |
384 else: | 384 else: |
385 log.error("Service should be available !") | 385 log.error("Service should be available !") |
386 return {} | 386 return {} |
387 | 387 |
388 def parseExtra(self, extra): | 388 def parse_extra(self, extra): |
389 """Parse extra dictionnary | 389 """Parse extra dictionnary |
390 | 390 |
391 used bridge's extra dictionnaries | 391 used bridge's extra dictionnaries |
392 @param extra(dict): extra data used to configure request | 392 @param extra(dict): extra data used to configure request |
393 @return(Extra): filled Extra instance | 393 @return(Extra): filled Extra instance |
405 | 405 |
406 # rsm | 406 # rsm |
407 if self._rsm is None: | 407 if self._rsm is None: |
408 rsm_request = None | 408 rsm_request = None |
409 else: | 409 else: |
410 rsm_request = self._rsm.parseExtra(extra) | 410 rsm_request = self._rsm.parse_extra(extra) |
411 | 411 |
412 # mam | 412 # mam |
413 if self._mam is None: | 413 if self._mam is None: |
414 mam_request = None | 414 mam_request = None |
415 else: | 415 else: |
416 mam_request = self._mam.parseExtra(extra, with_rsm=False) | 416 mam_request = self._mam.parse_extra(extra, with_rsm=False) |
417 | 417 |
418 if mam_request is not None: | 418 if mam_request is not None: |
419 assert "mam" not in extra | 419 assert "mam" not in extra |
420 extra["mam"] = mam_request | 420 extra["mam"] = mam_request |
421 | 421 |
422 return Extra(rsm_request, extra) | 422 return Extra(rsm_request, extra) |
423 | 423 |
424 def addManagedNode( | 424 def add_managed_node( |
425 self, | 425 self, |
426 node: str, | 426 node: str, |
427 priority: int = 0, | 427 priority: int = 0, |
428 **kwargs: Callable | 428 **kwargs: Callable |
429 ): | 429 ): |
447 assert event_name in C.PS_EVENTS | 447 assert event_name in C.PS_EVENTS |
448 cb_list = callbacks.setdefault(event_name, []) | 448 cb_list = callbacks.setdefault(event_name, []) |
449 cb_list.append((cb, priority)) | 449 cb_list.append((cb, priority)) |
450 cb_list.sort(key=lambda c: c[1], reverse=True) | 450 cb_list.sort(key=lambda c: c[1], reverse=True) |
451 | 451 |
452 def removeManagedNode(self, node, *args): | 452 def remove_managed_node(self, node, *args): |
453 """Add a handler for a node | 453 """Add a handler for a node |
454 | 454 |
455 @param node(unicode): node to monitor | 455 @param node(unicode): node to monitor |
456 @param *args: callback(s) to remove | 456 @param *args: callback(s) to remove |
457 """ | 457 """ |
488 # @param service (JID): target service | 488 # @param service (JID): target service |
489 # @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) | 489 # @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) |
490 # @param profile (str): %(doc_profile)s | 490 # @param profile (str): %(doc_profile)s |
491 # @return: deferred which fire a list of nodes | 491 # @return: deferred which fire a list of nodes |
492 # """ | 492 # """ |
493 # client = self.host.getClient(profile) | 493 # client = self.host.get_client(profile) |
494 # d = self.host.getDiscoItems(client, service, nodeIdentifier) | 494 # d = self.host.getDiscoItems(client, service, nodeIdentifier) |
495 # d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]) | 495 # d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]) |
496 # return d | 496 # return d |
497 | 497 |
498 # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE): | 498 # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE): |
510 # """ | 510 # """ |
511 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) | 511 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) |
512 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) | 512 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) |
513 # return d | 513 # return d |
514 | 514 |
515 def _sendItem(self, service, nodeIdentifier, payload, item_id=None, extra_ser="", | 515 def _send_item(self, service, nodeIdentifier, payload, item_id=None, extra_ser="", |
516 profile_key=C.PROF_KEY_NONE): | 516 profile_key=C.PROF_KEY_NONE): |
517 client = self.host.getClient(profile_key) | 517 client = self.host.get_client(profile_key) |
518 service = None if not service else jid.JID(service) | 518 service = None if not service else jid.JID(service) |
519 payload = xml_tools.parse(payload) | 519 payload = xml_tools.parse(payload) |
520 extra = data_format.deserialise(extra_ser) | 520 extra = data_format.deserialise(extra_ser) |
521 d = defer.ensureDeferred(self.sendItem( | 521 d = defer.ensureDeferred(self.send_item( |
522 client, service, nodeIdentifier, payload, item_id or None, extra | 522 client, service, nodeIdentifier, payload, item_id or None, extra |
523 )) | 523 )) |
524 d.addCallback(lambda ret: ret or "") | 524 d.addCallback(lambda ret: ret or "") |
525 return d | 525 return d |
526 | 526 |
527 def _sendItems(self, service, nodeIdentifier, items, extra_ser=None, | 527 def _send_items(self, service, nodeIdentifier, items, extra_ser=None, |
528 profile_key=C.PROF_KEY_NONE): | 528 profile_key=C.PROF_KEY_NONE): |
529 client = self.host.getClient(profile_key) | 529 client = self.host.get_client(profile_key) |
530 service = None if not service else jid.JID(service) | 530 service = None if not service else jid.JID(service) |
531 try: | 531 try: |
532 items = [xml_tools.parse(item) for item in items] | 532 items = [xml_tools.parse(item) for item in items] |
533 except Exception as e: | 533 except Exception as e: |
534 raise exceptions.DataError(_("Can't parse items: {msg}").format( | 534 raise exceptions.DataError(_("Can't parse items: {msg}").format( |
535 msg=e)) | 535 msg=e)) |
536 extra = data_format.deserialise(extra_ser) | 536 extra = data_format.deserialise(extra_ser) |
537 return defer.ensureDeferred(self.sendItems( | 537 return defer.ensureDeferred(self.send_items( |
538 client, service, nodeIdentifier, items, extra=extra | 538 client, service, nodeIdentifier, items, extra=extra |
539 )) | 539 )) |
540 | 540 |
541 async def sendItem( | 541 async def send_item( |
542 self, | 542 self, |
543 client: SatXMPPClient, | 543 client: SatXMPPClient, |
544 service: Union[jid.JID, None], | 544 service: Union[jid.JID, None], |
545 nodeIdentifier: str, | 545 nodeIdentifier: str, |
546 payload: domish.Element, | 546 payload: domish.Element, |
559 assert isinstance(payload, domish.Element) | 559 assert isinstance(payload, domish.Element) |
560 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) | 560 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) |
561 if item_id is not None: | 561 if item_id is not None: |
562 item_elt['id'] = item_id | 562 item_elt['id'] = item_id |
563 item_elt.addChild(payload) | 563 item_elt.addChild(payload) |
564 published_ids = await self.sendItems( | 564 published_ids = await self.send_items( |
565 client, | 565 client, |
566 service, | 566 service, |
567 nodeIdentifier, | 567 nodeIdentifier, |
568 [item_elt], | 568 [item_elt], |
569 extra=extra | 569 extra=extra |
571 try: | 571 try: |
572 return published_ids[0] | 572 return published_ids[0] |
573 except IndexError: | 573 except IndexError: |
574 return item_id | 574 return item_id |
575 | 575 |
576 async def sendItems( | 576 async def send_items( |
577 self, | 577 self, |
578 client: SatXMPPEntity, | 578 client: SatXMPPEntity, |
579 service: Optional[jid.JID], | 579 service: Optional[jid.JID], |
580 nodeIdentifier: str, | 580 nodeIdentifier: str, |
581 items: List[domish.Element], | 581 items: List[domish.Element], |
674 """ | 674 """ |
675 if sender is None: | 675 if sender is None: |
676 sender = client.jid | 676 sender = client.jid |
677 if extra is None: | 677 if extra is None: |
678 extra = {} | 678 extra = {} |
679 if not await self.host.trigger.asyncPoint( | 679 if not await self.host.trigger.async_point( |
680 "XEP-0060_publish", client, service, nodeIdentifier, items, options, sender, | 680 "XEP-0060_publish", client, service, nodeIdentifier, items, options, sender, |
681 extra | 681 extra |
682 ): | 682 ): |
683 return extra["iq_result_elt"] | 683 return extra["iq_result_elt"] |
684 iq_result_elt = await client.pubsub_client.publish( | 684 iq_result_elt = await client.pubsub_client.publish( |
685 service, nodeIdentifier, items, sender, | 685 service, nodeIdentifier, items, sender, |
686 options=options | 686 options=options |
687 ) | 687 ) |
688 return iq_result_elt | 688 return iq_result_elt |
689 | 689 |
690 def _unwrapMAMMessage(self, message_elt): | 690 def _unwrap_mam_message(self, message_elt): |
691 try: | 691 try: |
692 item_elt = reduce( | 692 item_elt = reduce( |
693 lambda elt, ns_name: next(elt.elements(*ns_name)), | 693 lambda elt, ns_name: next(elt.elements(*ns_name)), |
694 (message_elt, | 694 (message_elt, |
695 (mam.NS_MAM, "result"), | 695 (mam.NS_MAM, "result"), |
701 )) | 701 )) |
702 except StopIteration: | 702 except StopIteration: |
703 raise exceptions.DataError("Can't find Item in MAM message element") | 703 raise exceptions.DataError("Can't find Item in MAM message element") |
704 return item_elt | 704 return item_elt |
705 | 705 |
706 def serialiseItems(self, items_data): | 706 def serialise_items(self, items_data): |
707 items, metadata = items_data | 707 items, metadata = items_data |
708 metadata['items'] = items | 708 metadata['items'] = items |
709 return data_format.serialise(metadata) | 709 return data_format.serialise(metadata) |
710 | 710 |
711 def _getItems(self, service="", node="", max_items=10, item_ids=None, sub_id=None, | 711 def _get_items(self, service="", node="", max_items=10, item_ids=None, sub_id=None, |
712 extra="", profile_key=C.PROF_KEY_NONE): | 712 extra="", profile_key=C.PROF_KEY_NONE): |
713 """Get items from pubsub node | 713 """Get items from pubsub node |
714 | 714 |
715 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit | 715 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit |
716 """ | 716 """ |
717 client = self.host.getClient(profile_key) | 717 client = self.host.get_client(profile_key) |
718 service = jid.JID(service) if service else None | 718 service = jid.JID(service) if service else None |
719 max_items = None if max_items == C.NO_LIMIT else max_items | 719 max_items = None if max_items == C.NO_LIMIT else max_items |
720 extra = self.parseExtra(data_format.deserialise(extra)) | 720 extra = self.parse_extra(data_format.deserialise(extra)) |
721 d = defer.ensureDeferred(self.getItems( | 721 d = defer.ensureDeferred(self.get_items( |
722 client, | 722 client, |
723 service, | 723 service, |
724 node, | 724 node, |
725 max_items, | 725 max_items, |
726 item_ids, | 726 item_ids, |
727 sub_id or None, | 727 sub_id or None, |
728 extra.rsm_request, | 728 extra.rsm_request, |
729 extra.extra, | 729 extra.extra, |
730 )) | 730 )) |
731 d.addCallback(self.transItemsData) | 731 d.addCallback(self.trans_items_data) |
732 d.addCallback(self.serialiseItems) | 732 d.addCallback(self.serialise_items) |
733 return d | 733 return d |
734 | 734 |
735 async def getItems( | 735 async def get_items( |
736 self, | 736 self, |
737 client: SatXMPPEntity, | 737 client: SatXMPPEntity, |
738 service: Optional[jid.JID], | 738 service: Optional[jid.JID], |
739 node: str, | 739 node: str, |
740 max_items: Optional[int] = None, | 740 max_items: Optional[int] = None, |
765 max_items = None | 765 max_items = None |
766 if rsm_request and item_ids: | 766 if rsm_request and item_ids: |
767 raise ValueError("items_id can't be used with rsm") | 767 raise ValueError("items_id can't be used with rsm") |
768 if extra is None: | 768 if extra is None: |
769 extra = {} | 769 extra = {} |
770 cont, ret = await self.host.trigger.asyncReturnPoint( | 770 cont, ret = await self.host.trigger.async_return_point( |
771 "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id, | 771 "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id, |
772 rsm_request, extra | 772 rsm_request, extra |
773 ) | 773 ) |
774 if not cont: | 774 if not cont: |
775 return ret | 775 return ret |
786 orderBy = extra.get(C.KEY_ORDER_BY), | 786 orderBy = extra.get(C.KEY_ORDER_BY), |
787 rsm_request = rsm_request, | 787 rsm_request = rsm_request, |
788 extra = extra | 788 extra = extra |
789 )) | 789 )) |
790 # we have no MAM data here, so we add None | 790 # we have no MAM data here, so we add None |
791 d.addErrback(sat_defer.stanza2NotFound) | 791 d.addErrback(sat_defer.stanza_2_not_found) |
792 d.addTimeout(TIMEOUT, reactor) | 792 d.addTimeout(TIMEOUT, reactor) |
793 items, rsm_response = await d | 793 items, rsm_response = await d |
794 mam_response = None | 794 mam_response = None |
795 else: | 795 else: |
796 # if mam is requested, we have to do a totally different query | 796 # if mam is requested, we have to do a totally different query |
802 raise exceptions.DataError("items_ids parameter can't be used with MAM") | 802 raise exceptions.DataError("items_ids parameter can't be used with MAM") |
803 if mam_query.node is None: | 803 if mam_query.node is None: |
804 mam_query.node = node | 804 mam_query.node = node |
805 elif mam_query.node != node: | 805 elif mam_query.node != node: |
806 raise exceptions.DataError( | 806 raise exceptions.DataError( |
807 "MAM query node is incoherent with getItems's node" | 807 "MAM query node is incoherent with get_items's node" |
808 ) | 808 ) |
809 if mam_query.rsm is None: | 809 if mam_query.rsm is None: |
810 mam_query.rsm = rsm_request | 810 mam_query.rsm = rsm_request |
811 else: | 811 else: |
812 if mam_query.rsm != rsm_request: | 812 if mam_query.rsm != rsm_request: |
813 raise exceptions.DataError( | 813 raise exceptions.DataError( |
814 "Conflict between RSM request and MAM's RSM request" | 814 "Conflict between RSM request and MAM's RSM request" |
815 ) | 815 ) |
816 items, rsm_response, mam_response = await self._mam.getArchives( | 816 items, rsm_response, mam_response = await self._mam.get_archives( |
817 client, mam_query, service, self._unwrapMAMMessage | 817 client, mam_query, service, self._unwrap_mam_message |
818 ) | 818 ) |
819 | 819 |
820 try: | 820 try: |
821 subscribe = C.bool(extra["subscribe"]) | 821 subscribe = C.bool(extra["subscribe"]) |
822 except KeyError: | 822 except KeyError: |
833 # TODO: handle mam_response | 833 # TODO: handle mam_response |
834 service_jid = service if service else client.jid.userhostJID() | 834 service_jid = service if service else client.jid.userhostJID() |
835 metadata = { | 835 metadata = { |
836 "service": service_jid, | 836 "service": service_jid, |
837 "node": node, | 837 "node": node, |
838 "uri": self.getNodeURI(service_jid, node), | 838 "uri": self.get_node_uri(service_jid, node), |
839 } | 839 } |
840 if mam_response is not None: | 840 if mam_response is not None: |
841 # mam_response is a dict with "complete" and "stable" keys | 841 # mam_response is a dict with "complete" and "stable" keys |
842 # we can put them directly in metadata | 842 # we can put them directly in metadata |
843 metadata.update(mam_response) | 843 metadata.update(mam_response) |
873 # - key: a value in (a subset of) data.keys() | 873 # - key: a value in (a subset of) data.keys() |
874 # - couple (list[dict], dict) containing: | 874 # - couple (list[dict], dict) containing: |
875 # - list of items | 875 # - list of items |
876 # - RSM response data | 876 # - RSM response data |
877 # """ | 877 # """ |
878 # client = self.host.getClient(profile_key) | 878 # client = self.host.get_client(profile_key) |
879 # found_nodes = yield self.listNodes(service, profile=client.profile) | 879 # found_nodes = yield self.listNodes(service, profile=client.profile) |
880 # d_dict = {} | 880 # d_dict = {} |
881 # for publisher, node in data.items(): | 881 # for publisher, node in data.items(): |
882 # if node not in found_nodes: | 882 # if node not in found_nodes: |
883 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) | 883 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) |
884 # continue # avoid pubsub "item-not-found" error | 884 # continue # avoid pubsub "item-not-found" error |
885 # d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile) | 885 # d_dict[publisher] = self.get_items(service, node, max_items, None, sub_id, rsm, client.profile) |
886 # defer.returnValue(d_dict) | 886 # defer.returnValue(d_dict) |
887 | 887 |
888 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, | 888 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, |
889 profile_key=C.PROF_KEY_NONE): | 889 profile_key=C.PROF_KEY_NONE): |
890 client = self.host.getClient(profile_key) | 890 client = self.host.get_client(profile_key) |
891 return client.pubsub_client.getOptions( | 891 return client.pubsub_client.getOptions( |
892 service, nodeIdentifier, subscriber, subscriptionIdentifier | 892 service, nodeIdentifier, subscriber, subscriptionIdentifier |
893 ) | 893 ) |
894 | 894 |
895 def setOptions(self, service, nodeIdentifier, subscriber, options, | 895 def setOptions(self, service, nodeIdentifier, subscriber, options, |
896 subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): | 896 subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): |
897 client = self.host.getClient(profile_key) | 897 client = self.host.get_client(profile_key) |
898 return client.pubsub_client.setOptions( | 898 return client.pubsub_client.setOptions( |
899 service, nodeIdentifier, subscriber, options, subscriptionIdentifier | 899 service, nodeIdentifier, subscriber, options, subscriptionIdentifier |
900 ) | 900 ) |
901 | 901 |
902 def _createNode(self, service_s, nodeIdentifier, options, profile_key): | 902 def _create_node(self, service_s, nodeIdentifier, options, profile_key): |
903 client = self.host.getClient(profile_key) | 903 client = self.host.get_client(profile_key) |
904 return self.createNode( | 904 return self.createNode( |
905 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options | 905 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options |
906 ) | 906 ) |
907 | 907 |
908 def createNode( | 908 def createNode( |
922 """ | 922 """ |
923 # TODO: if pubsub service doesn't hande publish-options, configure it in a second time | 923 # TODO: if pubsub service doesn't hande publish-options, configure it in a second time |
924 return client.pubsub_client.createNode(service, nodeIdentifier, options) | 924 return client.pubsub_client.createNode(service, nodeIdentifier, options) |
925 | 925 |
926 @defer.inlineCallbacks | 926 @defer.inlineCallbacks |
927 def createIfNewNode(self, client, service, nodeIdentifier, options=None): | 927 def create_if_new_node(self, client, service, nodeIdentifier, options=None): |
928 """Helper method similar to createNode, but will not fail in case of conflict""" | 928 """Helper method similar to createNode, but will not fail in case of conflict""" |
929 try: | 929 try: |
930 yield self.createNode(client, service, nodeIdentifier, options) | 930 yield self.createNode(client, service, nodeIdentifier, options) |
931 except error.StanzaError as e: | 931 except error.StanzaError as e: |
932 if e.condition == "conflict": | 932 if e.condition == "conflict": |
933 pass | 933 pass |
934 else: | 934 else: |
935 raise e | 935 raise e |
936 | 936 |
937 def _getNodeConfiguration(self, service_s, nodeIdentifier, profile_key): | 937 def _get_node_configuration(self, service_s, nodeIdentifier, profile_key): |
938 client = self.host.getClient(profile_key) | 938 client = self.host.get_client(profile_key) |
939 d = self.getConfiguration( | 939 d = self.getConfiguration( |
940 client, jid.JID(service_s) if service_s else None, nodeIdentifier | 940 client, jid.JID(service_s) if service_s else None, nodeIdentifier |
941 ) | 941 ) |
942 | 942 |
943 def serialize(form): | 943 def serialize(form): |
967 formType="submit", formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG | 967 formType="submit", formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG |
968 ) | 968 ) |
969 form.makeFields(options) | 969 form.makeFields(options) |
970 return form | 970 return form |
971 | 971 |
972 def _setNodeConfiguration(self, service_s, nodeIdentifier, options, profile_key): | 972 def _set_node_configuration(self, service_s, nodeIdentifier, options, profile_key): |
973 client = self.host.getClient(profile_key) | 973 client = self.host.get_client(profile_key) |
974 d = self.setConfiguration( | 974 d = self.setConfiguration( |
975 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options | 975 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options |
976 ) | 976 ) |
977 return d | 977 return d |
978 | 978 |
985 request.options = form | 985 request.options = form |
986 | 986 |
987 d = request.send(client.xmlstream) | 987 d = request.send(client.xmlstream) |
988 return d | 988 return d |
989 | 989 |
990 def _getAffiliations(self, service_s, nodeIdentifier, profile_key): | 990 def _get_affiliations(self, service_s, nodeIdentifier, profile_key): |
991 client = self.host.getClient(profile_key) | 991 client = self.host.get_client(profile_key) |
992 d = self.getAffiliations( | 992 d = self.get_affiliations( |
993 client, jid.JID(service_s) if service_s else None, nodeIdentifier or None | 993 client, jid.JID(service_s) if service_s else None, nodeIdentifier or None |
994 ) | 994 ) |
995 return d | 995 return d |
996 | 996 |
997 def getAffiliations(self, client, service, nodeIdentifier=None): | 997 def get_affiliations(self, client, service, nodeIdentifier=None): |
998 """Retrieve affiliations of an entity | 998 """Retrieve affiliations of an entity |
999 | 999 |
1000 @param nodeIdentifier(unicode, None): node to get affiliation from | 1000 @param nodeIdentifier(unicode, None): node to get affiliation from |
1001 None to get all nodes affiliations for this service | 1001 None to get all nodes affiliations for this service |
1002 """ | 1002 """ |
1029 | 1029 |
1030 d = request.send(client.xmlstream) | 1030 d = request.send(client.xmlstream) |
1031 d.addCallback(cb) | 1031 d.addCallback(cb) |
1032 return d | 1032 return d |
1033 | 1033 |
1034 def _getNodeAffiliations(self, service_s, nodeIdentifier, profile_key): | 1034 def _get_node_affiliations(self, service_s, nodeIdentifier, profile_key): |
1035 client = self.host.getClient(profile_key) | 1035 client = self.host.get_client(profile_key) |
1036 d = self.getNodeAffiliations( | 1036 d = self.get_node_affiliations( |
1037 client, jid.JID(service_s) if service_s else None, nodeIdentifier | 1037 client, jid.JID(service_s) if service_s else None, nodeIdentifier |
1038 ) | 1038 ) |
1039 d.addCallback( | 1039 d.addCallback( |
1040 lambda affiliations: {j.full(): a for j, a in affiliations.items()} | 1040 lambda affiliations: {j.full(): a for j, a in affiliations.items()} |
1041 ) | 1041 ) |
1042 return d | 1042 return d |
1043 | 1043 |
1044 def getNodeAffiliations(self, client, service, nodeIdentifier): | 1044 def get_node_affiliations(self, client, service, nodeIdentifier): |
1045 """Retrieve affiliations of a node owned by profile""" | 1045 """Retrieve affiliations of a node owned by profile""" |
1046 request = pubsub.PubSubRequest("affiliationsGet") | 1046 request = pubsub.PubSubRequest("affiliationsGet") |
1047 request.recipient = service | 1047 request.recipient = service |
1048 request.nodeIdentifier = nodeIdentifier | 1048 request.nodeIdentifier = nodeIdentifier |
1049 | 1049 |
1074 | 1074 |
1075 d = request.send(client.xmlstream) | 1075 d = request.send(client.xmlstream) |
1076 d.addCallback(cb) | 1076 d.addCallback(cb) |
1077 return d | 1077 return d |
1078 | 1078 |
1079 def _setNodeAffiliations( | 1079 def _set_node_affiliations( |
1080 self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE | 1080 self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE |
1081 ): | 1081 ): |
1082 client = self.host.getClient(profile_key) | 1082 client = self.host.get_client(profile_key) |
1083 affiliations = { | 1083 affiliations = { |
1084 jid.JID(jid_): affiliation for jid_, affiliation in affiliations.items() | 1084 jid.JID(jid_): affiliation for jid_, affiliation in affiliations.items() |
1085 } | 1085 } |
1086 d = self.setNodeAffiliations( | 1086 d = self.set_node_affiliations( |
1087 client, | 1087 client, |
1088 jid.JID(service_s) if service_s else None, | 1088 jid.JID(service_s) if service_s else None, |
1089 nodeIdentifier, | 1089 nodeIdentifier, |
1090 affiliations, | 1090 affiliations, |
1091 ) | 1091 ) |
1092 return d | 1092 return d |
1093 | 1093 |
1094 def setNodeAffiliations(self, client, service, nodeIdentifier, affiliations): | 1094 def set_node_affiliations(self, client, service, nodeIdentifier, affiliations): |
1095 """Update affiliations of a node owned by profile | 1095 """Update affiliations of a node owned by profile |
1096 | 1096 |
1097 @param affiliations(dict[jid.JID, unicode]): affiliations to set | 1097 @param affiliations(dict[jid.JID, unicode]): affiliations to set |
1098 check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations | 1098 check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations |
1099 """ | 1099 """ |
1102 request.nodeIdentifier = nodeIdentifier | 1102 request.nodeIdentifier = nodeIdentifier |
1103 request.affiliations = affiliations | 1103 request.affiliations = affiliations |
1104 d = request.send(client.xmlstream) | 1104 d = request.send(client.xmlstream) |
1105 return d | 1105 return d |
1106 | 1106 |
1107 def _purgeNode(self, service_s, nodeIdentifier, profile_key): | 1107 def _purge_node(self, service_s, nodeIdentifier, profile_key): |
1108 client = self.host.getClient(profile_key) | 1108 client = self.host.get_client(profile_key) |
1109 return self.purgeNode( | 1109 return self.purge_node( |
1110 client, jid.JID(service_s) if service_s else None, nodeIdentifier | 1110 client, jid.JID(service_s) if service_s else None, nodeIdentifier |
1111 ) | 1111 ) |
1112 | 1112 |
1113 def purgeNode(self, client, service, nodeIdentifier): | 1113 def purge_node(self, client, service, nodeIdentifier): |
1114 return client.pubsub_client.purgeNode(service, nodeIdentifier) | 1114 return client.pubsub_client.purge_node(service, nodeIdentifier) |
1115 | 1115 |
1116 def _deleteNode(self, service_s, nodeIdentifier, profile_key): | 1116 def _delete_node(self, service_s, nodeIdentifier, profile_key): |
1117 client = self.host.getClient(profile_key) | 1117 client = self.host.get_client(profile_key) |
1118 return self.deleteNode( | 1118 return self.deleteNode( |
1119 client, jid.JID(service_s) if service_s else None, nodeIdentifier | 1119 client, jid.JID(service_s) if service_s else None, nodeIdentifier |
1120 ) | 1120 ) |
1121 | 1121 |
1122 def deleteNode( | 1122 def deleteNode( |
1130 def _addWatch(self, service_s, node, profile_key): | 1130 def _addWatch(self, service_s, node, profile_key): |
1131 """watch modifications on a node | 1131 """watch modifications on a node |
1132 | 1132 |
1133 This method should only be called from bridge | 1133 This method should only be called from bridge |
1134 """ | 1134 """ |
1135 client = self.host.getClient(profile_key) | 1135 client = self.host.get_client(profile_key) |
1136 service = jid.JID(service_s) if service_s else client.jid.userhostJID() | 1136 service = jid.JID(service_s) if service_s else client.jid.userhostJID() |
1137 client.pubsub_watching.add((service, node)) | 1137 client.pubsub_watching.add((service, node)) |
1138 | 1138 |
1139 def _removeWatch(self, service_s, node, profile_key): | 1139 def _remove_watch(self, service_s, node, profile_key): |
1140 """remove a node watch | 1140 """remove a node watch |
1141 | 1141 |
1142 This method should only be called from bridge | 1142 This method should only be called from bridge |
1143 """ | 1143 """ |
1144 client = self.host.getClient(profile_key) | 1144 client = self.host.get_client(profile_key) |
1145 service = jid.JID(service_s) if service_s else client.jid.userhostJID() | 1145 service = jid.JID(service_s) if service_s else client.jid.userhostJID() |
1146 client.pubsub_watching.remove((service, node)) | 1146 client.pubsub_watching.remove((service, node)) |
1147 | 1147 |
1148 def _retractItem( | 1148 def _retract_item( |
1149 self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key | 1149 self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key |
1150 ): | 1150 ): |
1151 return self._retractItems( | 1151 return self._retract_items( |
1152 service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key | 1152 service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key |
1153 ) | 1153 ) |
1154 | 1154 |
1155 def _retractItems( | 1155 def _retract_items( |
1156 self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key | 1156 self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key |
1157 ): | 1157 ): |
1158 client = self.host.getClient(profile_key) | 1158 client = self.host.get_client(profile_key) |
1159 return self.retractItems( | 1159 return self.retract_items( |
1160 client, | 1160 client, |
1161 jid.JID(service_s) if service_s else None, | 1161 jid.JID(service_s) if service_s else None, |
1162 nodeIdentifier, | 1162 nodeIdentifier, |
1163 itemIdentifiers, | 1163 itemIdentifiers, |
1164 notify, | 1164 notify, |
1165 ) | 1165 ) |
1166 | 1166 |
1167 def retractItems( | 1167 def retract_items( |
1168 self, | 1168 self, |
1169 client: SatXMPPClient, | 1169 client: SatXMPPClient, |
1170 service: jid.JID, | 1170 service: jid.JID, |
1171 nodeIdentifier: str, | 1171 nodeIdentifier: str, |
1172 itemIdentifiers: Iterable[str], | 1172 itemIdentifiers: Iterable[str], |
1174 ) -> defer.Deferred: | 1174 ) -> defer.Deferred: |
1175 return client.pubsub_client.retractItems( | 1175 return client.pubsub_client.retractItems( |
1176 service, nodeIdentifier, itemIdentifiers, notify=notify | 1176 service, nodeIdentifier, itemIdentifiers, notify=notify |
1177 ) | 1177 ) |
1178 | 1178 |
1179 def _renameItem( | 1179 def _rename_item( |
1180 self, | 1180 self, |
1181 service, | 1181 service, |
1182 node, | 1182 node, |
1183 item_id, | 1183 item_id, |
1184 new_id, | 1184 new_id, |
1185 profile_key=C.PROF_KEY_NONE, | 1185 profile_key=C.PROF_KEY_NONE, |
1186 ): | 1186 ): |
1187 client = self.host.getClient(profile_key) | 1187 client = self.host.get_client(profile_key) |
1188 service = jid.JID(service) if service else None | 1188 service = jid.JID(service) if service else None |
1189 return defer.ensureDeferred(self.renameItem( | 1189 return defer.ensureDeferred(self.rename_item( |
1190 client, service, node, item_id, new_id | 1190 client, service, node, item_id, new_id |
1191 )) | 1191 )) |
1192 | 1192 |
1193 async def renameItem( | 1193 async def rename_item( |
1194 self, | 1194 self, |
1195 client: SatXMPPEntity, | 1195 client: SatXMPPEntity, |
1196 service: Optional[jid.JID], | 1196 service: Optional[jid.JID], |
1197 node: str, | 1197 node: str, |
1198 item_id: str, | 1198 item_id: str, |
1205 """ | 1205 """ |
1206 if not item_id or not new_id: | 1206 if not item_id or not new_id: |
1207 raise ValueError("item_id and new_id must not be empty") | 1207 raise ValueError("item_id and new_id must not be empty") |
1208 # retract must be done last, so if something goes wrong, the exception will stop | 1208 # retract must be done last, so if something goes wrong, the exception will stop |
1209 # the workflow and no accidental delete should happen | 1209 # the workflow and no accidental delete should happen |
1210 item_elt = (await self.getItems(client, service, node, item_ids=[item_id]))[0][0] | 1210 item_elt = (await self.get_items(client, service, node, item_ids=[item_id]))[0][0] |
1211 await self.sendItem(client, service, node, item_elt.firstChildElement(), new_id) | 1211 await self.send_item(client, service, node, item_elt.firstChildElement(), new_id) |
1212 await self.retractItems(client, service, node, [item_id]) | 1212 await self.retract_items(client, service, node, [item_id]) |
1213 | 1213 |
1214 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): | 1214 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): |
1215 client = self.host.getClient(profile_key) | 1215 client = self.host.get_client(profile_key) |
1216 service = None if not service else jid.JID(service) | 1216 service = None if not service else jid.JID(service) |
1217 d = defer.ensureDeferred( | 1217 d = defer.ensureDeferred( |
1218 self.subscribe( | 1218 self.subscribe( |
1219 client, | 1219 client, |
1220 service, | 1220 service, |
1234 options: Optional[dict] = None | 1234 options: Optional[dict] = None |
1235 ) -> pubsub.Subscription: | 1235 ) -> pubsub.Subscription: |
1236 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe | 1236 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe |
1237 if service is None: | 1237 if service is None: |
1238 service = client.jid.userhostJID() | 1238 service = client.jid.userhostJID() |
1239 cont, trigger_sub = await self.host.trigger.asyncReturnPoint( | 1239 cont, trigger_sub = await self.host.trigger.async_return_point( |
1240 "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options, | 1240 "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options, |
1241 ) | 1241 ) |
1242 if not cont: | 1242 if not cont: |
1243 return trigger_sub | 1243 return trigger_sub |
1244 try: | 1244 try: |
1252 else: | 1252 else: |
1253 raise e | 1253 raise e |
1254 return subscription | 1254 return subscription |
1255 | 1255 |
1256 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): | 1256 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): |
1257 client = self.host.getClient(profile_key) | 1257 client = self.host.get_client(profile_key) |
1258 service = None if not service else jid.JID(service) | 1258 service = None if not service else jid.JID(service) |
1259 return defer.ensureDeferred(self.unsubscribe(client, service, nodeIdentifier)) | 1259 return defer.ensureDeferred(self.unsubscribe(client, service, nodeIdentifier)) |
1260 | 1260 |
1261 async def unsubscribe( | 1261 async def unsubscribe( |
1262 self, | 1262 self, |
1265 nodeIdentifier: str, | 1265 nodeIdentifier: str, |
1266 sub_jid: Optional[jid.JID] = None, | 1266 sub_jid: Optional[jid.JID] = None, |
1267 subscriptionIdentifier: Optional[str] = None, | 1267 subscriptionIdentifier: Optional[str] = None, |
1268 sender: Optional[jid.JID] = None, | 1268 sender: Optional[jid.JID] = None, |
1269 ) -> None: | 1269 ) -> None: |
1270 if not await self.host.trigger.asyncPoint( | 1270 if not await self.host.trigger.async_point( |
1271 "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid, | 1271 "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid, |
1272 subscriptionIdentifier, sender | 1272 subscriptionIdentifier, sender |
1273 ): | 1273 ): |
1274 return | 1274 return |
1275 try: | 1275 try: |
1296 self, | 1296 self, |
1297 service="", | 1297 service="", |
1298 nodeIdentifier="", | 1298 nodeIdentifier="", |
1299 profile_key=C.PROF_KEY_NONE | 1299 profile_key=C.PROF_KEY_NONE |
1300 ) -> str: | 1300 ) -> str: |
1301 client = self.host.getClient(profile_key) | 1301 client = self.host.get_client(profile_key) |
1302 service = None if not service else jid.JID(service) | 1302 service = None if not service else jid.JID(service) |
1303 subs = await self.subscriptions(client, service, nodeIdentifier or None) | 1303 subs = await self.subscriptions(client, service, nodeIdentifier or None) |
1304 return data_format.serialise(subs) | 1304 return data_format.serialise(subs) |
1305 | 1305 |
1306 async def subscriptions( | 1306 async def subscriptions( |
1313 | 1313 |
1314 @param service(jid.JID): PubSub service | 1314 @param service(jid.JID): PubSub service |
1315 @param nodeIdentifier(unicode, None): node to check | 1315 @param nodeIdentifier(unicode, None): node to check |
1316 None to get all subscriptions | 1316 None to get all subscriptions |
1317 """ | 1317 """ |
1318 cont, ret = await self.host.trigger.asyncReturnPoint( | 1318 cont, ret = await self.host.trigger.async_return_point( |
1319 "XEP-0060_subscriptions", client, service, node | 1319 "XEP-0060_subscriptions", client, service, node |
1320 ) | 1320 ) |
1321 if not cont: | 1321 if not cont: |
1322 return ret | 1322 return ret |
1323 subs = await client.pubsub_client.subscriptions(service, node) | 1323 subs = await client.pubsub_client.subscriptions(service, node) |
1334 ret.append(sub_dict) | 1334 ret.append(sub_dict) |
1335 return ret | 1335 return ret |
1336 | 1336 |
1337 ## misc tools ## | 1337 ## misc tools ## |
1338 | 1338 |
1339 def getNodeURI(self, service, node, item=None): | 1339 def get_node_uri(self, service, node, item=None): |
1340 """Return XMPP URI of a PubSub node | 1340 """Return XMPP URI of a PubSub node |
1341 | 1341 |
1342 @param service(jid.JID): PubSub service | 1342 @param service(jid.JID): PubSub service |
1343 @param node(unicode): node | 1343 @param node(unicode): node |
1344 @return (unicode): URI of the node | 1344 @return (unicode): URI of the node |
1357 | 1357 |
1358 ## methods to manage several stanzas/jids at once ## | 1358 ## methods to manage several stanzas/jids at once ## |
1359 | 1359 |
1360 # generic # | 1360 # generic # |
1361 | 1361 |
1362 def getRTResults( | 1362 def get_rt_results( |
1363 self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE | 1363 self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE |
1364 ): | 1364 ): |
1365 return self.rt_sessions.getResults(session_id, on_success, on_error, profile) | 1365 return self.rt_sessions.get_results(session_id, on_success, on_error, profile) |
1366 | 1366 |
1367 def transItemsData(self, items_data, item_cb=lambda item: item.toXml()): | 1367 def trans_items_data(self, items_data, item_cb=lambda item: item.toXml()): |
1368 """Helper method to transform result from [getItems] | 1368 """Helper method to transform result from [get_items] |
1369 | 1369 |
1370 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) | 1370 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) |
1371 as returned by [getItems]. | 1371 as returned by [get_items]. |
1372 @param items_data(tuple): tuple returned by [getItems] | 1372 @param items_data(tuple): tuple returned by [get_items] |
1373 @param item_cb(callable): method to transform each item | 1373 @param item_cb(callable): method to transform each item |
1374 @return (tuple): a serialised form ready to go throught bridge | 1374 @return (tuple): a serialised form ready to go throught bridge |
1375 """ | 1375 """ |
1376 items, metadata = items_data | 1376 items, metadata = items_data |
1377 items = [item_cb(item) for item in items] | 1377 items = [item_cb(item) for item in items] |
1378 | 1378 |
1379 return (items, metadata) | 1379 return (items, metadata) |
1380 | 1380 |
1381 def transItemsDataD(self, items_data, item_cb): | 1381 def trans_items_data_d(self, items_data, item_cb): |
1382 """Helper method to transform result from [getItems], deferred version | 1382 """Helper method to transform result from [get_items], deferred version |
1383 | 1383 |
1384 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) | 1384 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) |
1385 as returned by [getItems]. metadata values are then casted to unicode and | 1385 as returned by [get_items]. metadata values are then casted to unicode and |
1386 each item is passed to items_cb. | 1386 each item is passed to items_cb. |
1387 An errback is added to item_cb, and when it is fired the value is filtered from | 1387 An errback is added to item_cb, and when it is fired the value is filtered from |
1388 final items | 1388 final items |
1389 @param items_data(tuple): tuple returned by [getItems] | 1389 @param items_data(tuple): tuple returned by [get_items] |
1390 @param item_cb(callable): method to transform each item (must return a deferred) | 1390 @param item_cb(callable): method to transform each item (must return a deferred) |
1391 @return (tuple): a deferred which fire a dict which can be serialised to go | 1391 @return (tuple): a deferred which fire a dict which can be serialised to go |
1392 throught bridge | 1392 throught bridge |
1393 """ | 1393 """ |
1394 items, metadata = items_data | 1394 items, metadata = items_data |
1401 [i for i in parsed_items if i is not None], | 1401 [i for i in parsed_items if i is not None], |
1402 metadata | 1402 metadata |
1403 )) | 1403 )) |
1404 return d | 1404 return d |
1405 | 1405 |
1406 def serDList(self, results, failure_result=None): | 1406 def ser_d_list(self, results, failure_result=None): |
1407 """Serialise a DeferredList result | 1407 """Serialise a DeferredList result |
1408 | 1408 |
1409 @param results: DeferredList results | 1409 @param results: DeferredList results |
1410 @param failure_result: value to use as value for failed Deferred | 1410 @param failure_result: value to use as value for failed Deferred |
1411 (default: empty tuple) | 1411 (default: empty tuple) |
1423 ] | 1423 ] |
1424 | 1424 |
1425 # subscribe # | 1425 # subscribe # |
1426 | 1426 |
1427 @utils.ensure_deferred | 1427 @utils.ensure_deferred |
1428 async def _getNodeSubscriptions( | 1428 async def _get_node_subscriptions( |
1429 self, | 1429 self, |
1430 service: str, | 1430 service: str, |
1431 node: str, | 1431 node: str, |
1432 profile_key: str | 1432 profile_key: str |
1433 ) -> Dict[str, str]: | 1433 ) -> Dict[str, str]: |
1434 client = self.host.getClient(profile_key) | 1434 client = self.host.get_client(profile_key) |
1435 subs = await self.getNodeSubscriptions( | 1435 subs = await self.get_node_subscriptions( |
1436 client, jid.JID(service) if service else None, node | 1436 client, jid.JID(service) if service else None, node |
1437 ) | 1437 ) |
1438 return {j.full(): a for j, a in subs.items()} | 1438 return {j.full(): a for j, a in subs.items()} |
1439 | 1439 |
1440 async def getNodeSubscriptions( | 1440 async def get_node_subscriptions( |
1441 self, | 1441 self, |
1442 client: SatXMPPEntity, | 1442 client: SatXMPPEntity, |
1443 service: Optional[jid.JID], | 1443 service: Optional[jid.JID], |
1444 nodeIdentifier: str | 1444 nodeIdentifier: str |
1445 ) -> Dict[jid.JID, str]: | 1445 ) -> Dict[jid.JID, str]: |
1478 _("Invalid result: bad <subscription> element: {}").format( | 1478 _("Invalid result: bad <subscription> element: {}").format( |
1479 iq_elt.toXml | 1479 iq_elt.toXml |
1480 ) | 1480 ) |
1481 ) | 1481 ) |
1482 | 1482 |
1483 def _setNodeSubscriptions( | 1483 def _set_node_subscriptions( |
1484 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE | 1484 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE |
1485 ): | 1485 ): |
1486 client = self.host.getClient(profile_key) | 1486 client = self.host.get_client(profile_key) |
1487 subscriptions = { | 1487 subscriptions = { |
1488 jid.JID(jid_): subscription | 1488 jid.JID(jid_): subscription |
1489 for jid_, subscription in subscriptions.items() | 1489 for jid_, subscription in subscriptions.items() |
1490 } | 1490 } |
1491 d = self.setNodeSubscriptions( | 1491 d = self.set_node_subscriptions( |
1492 client, | 1492 client, |
1493 jid.JID(service_s) if service_s else None, | 1493 jid.JID(service_s) if service_s else None, |
1494 nodeIdentifier, | 1494 nodeIdentifier, |
1495 subscriptions, | 1495 subscriptions, |
1496 ) | 1496 ) |
1497 return d | 1497 return d |
1498 | 1498 |
1499 def setNodeSubscriptions(self, client, service, nodeIdentifier, subscriptions): | 1499 def set_node_subscriptions(self, client, service, nodeIdentifier, subscriptions): |
1500 """Set or update subscriptions of a node owned by profile | 1500 """Set or update subscriptions of a node owned by profile |
1501 | 1501 |
1502 @param subscriptions(dict[jid.JID, unicode]): subscriptions to set | 1502 @param subscriptions(dict[jid.JID, unicode]): subscriptions to set |
1503 check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions | 1503 check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions |
1504 """ | 1504 """ |
1510 for jid_, state in subscriptions.items() | 1510 for jid_, state in subscriptions.items() |
1511 } | 1511 } |
1512 d = request.send(client.xmlstream) | 1512 d = request.send(client.xmlstream) |
1513 return d | 1513 return d |
1514 | 1514 |
1515 def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): | 1515 def _many_subscribe_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT): |
1516 """Get real-time results for subcribeToManu session | 1516 """Get real-time results for subcribeToManu session |
1517 | 1517 |
1518 @param session_id: id of the real-time deferred session | 1518 @param session_id: id of the real-time deferred session |
1519 @param return (tuple): (remaining, results) where: | 1519 @param return (tuple): (remaining, results) where: |
1520 - remaining is the number of still expected results | 1520 - remaining is the number of still expected results |
1522 - service: pubsub service | 1522 - service: pubsub service |
1523 - and node: pubsub node | 1523 - and node: pubsub node |
1524 - failure(unicode): empty string in case of success, error message else | 1524 - failure(unicode): empty string in case of success, error message else |
1525 @param profile_key: %(doc_profile_key)s | 1525 @param profile_key: %(doc_profile_key)s |
1526 """ | 1526 """ |
1527 profile = self.host.getClient(profile_key).profile | 1527 profile = self.host.get_client(profile_key).profile |
1528 d = self.rt_sessions.getResults( | 1528 d = self.rt_sessions.get_results( |
1529 session_id, | 1529 session_id, |
1530 on_success=lambda result: "", | 1530 on_success=lambda result: "", |
1531 on_error=lambda failure: str(failure.value), | 1531 on_error=lambda failure: str(failure.value), |
1532 profile=profile, | 1532 profile=profile, |
1533 ) | 1533 ) |
1541 ], | 1541 ], |
1542 ) | 1542 ) |
1543 ) | 1543 ) |
1544 return d | 1544 return d |
1545 | 1545 |
1546 def _subscribeToMany( | 1546 def _subscribe_to_many( |
1547 self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE | 1547 self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE |
1548 ): | 1548 ): |
1549 return self.subscribeToMany( | 1549 return self.subscribe_to_many( |
1550 [(jid.JID(service), str(node)) for service, node in node_data], | 1550 [(jid.JID(service), str(node)) for service, node in node_data], |
1551 jid.JID(subscriber), | 1551 jid.JID(subscriber), |
1552 options, | 1552 options, |
1553 profile_key, | 1553 profile_key, |
1554 ) | 1554 ) |
1555 | 1555 |
1556 def subscribeToMany( | 1556 def subscribe_to_many( |
1557 self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE | 1557 self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE |
1558 ): | 1558 ): |
1559 """Subscribe to several nodes at once. | 1559 """Subscribe to several nodes at once. |
1560 | 1560 |
1561 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: | 1561 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: |
1564 @param subscriber (jid.JID): optional subscription identifier. | 1564 @param subscriber (jid.JID): optional subscription identifier. |
1565 @param options (dict): subscription options | 1565 @param options (dict): subscription options |
1566 @param profile_key (str): %(doc_profile_key)s | 1566 @param profile_key (str): %(doc_profile_key)s |
1567 @return (str): RT Deferred session id | 1567 @return (str): RT Deferred session id |
1568 """ | 1568 """ |
1569 client = self.host.getClient(profile_key) | 1569 client = self.host.get_client(profile_key) |
1570 deferreds = {} | 1570 deferreds = {} |
1571 for service, node in node_data: | 1571 for service, node in node_data: |
1572 deferreds[(service, node)] = defer.ensureDeferred( | 1572 deferreds[(service, node)] = defer.ensureDeferred( |
1573 client.pubsub_client.subscribe( | 1573 client.pubsub_client.subscribe( |
1574 service, node, subscriber, options=options | 1574 service, node, subscriber, options=options |
1575 ) | 1575 ) |
1576 ) | 1576 ) |
1577 return self.rt_sessions.newSession(deferreds, client.profile) | 1577 return self.rt_sessions.new_session(deferreds, client.profile) |
1578 # found_nodes = yield self.listNodes(service, profile=client.profile) | 1578 # found_nodes = yield self.listNodes(service, profile=client.profile) |
1579 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) | 1579 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) |
1580 # d_list = [] | 1580 # d_list = [] |
1581 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): | 1581 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): |
1582 # if nodeIdentifier not in found_nodes: | 1582 # if nodeIdentifier not in found_nodes: |
1585 # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) | 1585 # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) |
1586 # defer.returnValue(d_list) | 1586 # defer.returnValue(d_list) |
1587 | 1587 |
1588 # get # | 1588 # get # |
1589 | 1589 |
1590 def _getFromManyRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): | 1590 def _get_from_many_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT): |
1591 """Get real-time results for getFromMany session | 1591 """Get real-time results for get_from_many session |
1592 | 1592 |
1593 @param session_id: id of the real-time deferred session | 1593 @param session_id: id of the real-time deferred session |
1594 @param profile_key: %(doc_profile_key)s | 1594 @param profile_key: %(doc_profile_key)s |
1595 @param return (tuple): (remaining, results) where: | 1595 @param return (tuple): (remaining, results) where: |
1596 - remaining is the number of still expected results | 1596 - remaining is the number of still expected results |
1599 - node (unicode): pubsub node | 1599 - node (unicode): pubsub node |
1600 - failure (unicode): empty string in case of success, error message else | 1600 - failure (unicode): empty string in case of success, error message else |
1601 - items (list[s]): raw XML of items | 1601 - items (list[s]): raw XML of items |
1602 - metadata(dict): serialised metadata | 1602 - metadata(dict): serialised metadata |
1603 """ | 1603 """ |
1604 profile = self.host.getClient(profile_key).profile | 1604 profile = self.host.get_client(profile_key).profile |
1605 d = self.rt_sessions.getResults( | 1605 d = self.rt_sessions.get_results( |
1606 session_id, | 1606 session_id, |
1607 on_success=lambda result: ("", self.transItemsData(result)), | 1607 on_success=lambda result: ("", self.trans_items_data(result)), |
1608 on_error=lambda failure: (str(failure.value) or UNSPECIFIED, ([], {})), | 1608 on_error=lambda failure: (str(failure.value) or UNSPECIFIED, ([], {})), |
1609 profile=profile, | 1609 profile=profile, |
1610 ) | 1610 ) |
1611 d.addCallback( | 1611 d.addCallback( |
1612 lambda ret: ( | 1612 lambda ret: ( |
1619 ], | 1619 ], |
1620 ) | 1620 ) |
1621 ) | 1621 ) |
1622 return d | 1622 return d |
1623 | 1623 |
1624 def _getFromMany( | 1624 def _get_from_many( |
1625 self, node_data, max_item=10, extra="", profile_key=C.PROF_KEY_NONE | 1625 self, node_data, max_item=10, extra="", profile_key=C.PROF_KEY_NONE |
1626 ): | 1626 ): |
1627 """ | 1627 """ |
1628 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit | 1628 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit |
1629 """ | 1629 """ |
1630 max_item = None if max_item == C.NO_LIMIT else max_item | 1630 max_item = None if max_item == C.NO_LIMIT else max_item |
1631 extra = self.parseExtra(data_format.deserialise(extra)) | 1631 extra = self.parse_extra(data_format.deserialise(extra)) |
1632 return self.getFromMany( | 1632 return self.get_from_many( |
1633 [(jid.JID(service), str(node)) for service, node in node_data], | 1633 [(jid.JID(service), str(node)) for service, node in node_data], |
1634 max_item, | 1634 max_item, |
1635 extra.rsm_request, | 1635 extra.rsm_request, |
1636 extra.extra, | 1636 extra.extra, |
1637 profile_key, | 1637 profile_key, |
1638 ) | 1638 ) |
1639 | 1639 |
1640 def getFromMany(self, node_data, max_item=None, rsm_request=None, extra=None, | 1640 def get_from_many(self, node_data, max_item=None, rsm_request=None, extra=None, |
1641 profile_key=C.PROF_KEY_NONE): | 1641 profile_key=C.PROF_KEY_NONE): |
1642 """Get items from many nodes at once | 1642 """Get items from many nodes at once |
1643 | 1643 |
1644 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: | 1644 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: |
1645 - service (jid.JID) is the pubsub service | 1645 - service (jid.JID) is the pubsub service |
1647 @param max_items (int): optional limit on the number of retrieved items. | 1647 @param max_items (int): optional limit on the number of retrieved items. |
1648 @param rsm_request (RSMRequest): RSM request data | 1648 @param rsm_request (RSMRequest): RSM request data |
1649 @param profile_key (unicode): %(doc_profile_key)s | 1649 @param profile_key (unicode): %(doc_profile_key)s |
1650 @return (str): RT Deferred session id | 1650 @return (str): RT Deferred session id |
1651 """ | 1651 """ |
1652 client = self.host.getClient(profile_key) | 1652 client = self.host.get_client(profile_key) |
1653 deferreds = {} | 1653 deferreds = {} |
1654 for service, node in node_data: | 1654 for service, node in node_data: |
1655 deferreds[(service, node)] = defer.ensureDeferred(self.getItems( | 1655 deferreds[(service, node)] = defer.ensureDeferred(self.get_items( |
1656 client, service, node, max_item, rsm_request=rsm_request, extra=extra | 1656 client, service, node, max_item, rsm_request=rsm_request, extra=extra |
1657 )) | 1657 )) |
1658 return self.rt_sessions.newSession(deferreds, client.profile) | 1658 return self.rt_sessions.new_session(deferreds, client.profile) |
1659 | 1659 |
1660 | 1660 |
1661 @implementer(disco.IDisco) | 1661 @implementer(disco.IDisco) |
1662 class SatPubSubClient(rsm.PubSubClient): | 1662 class SatPubSubClient(rsm.PubSubClient): |
1663 | 1663 |
1687 service, nodeIdentifier, maxItems, subscriptionIdentifier, sender, | 1687 service, nodeIdentifier, maxItems, subscriptionIdentifier, sender, |
1688 itemIdentifiers, orderBy, rsm_request | 1688 itemIdentifiers, orderBy, rsm_request |
1689 ) | 1689 ) |
1690 # items must be returned, thus this async point can't stop the workflow (but it | 1690 # items must be returned, thus this async point can't stop the workflow (but it |
1691 # can modify returned items) | 1691 # can modify returned items) |
1692 await self.host.trigger.asyncPoint( | 1692 await self.host.trigger.async_point( |
1693 "XEP-0060_items", self.parent, service, nodeIdentifier, items, rsm_response, | 1693 "XEP-0060_items", self.parent, service, nodeIdentifier, items, rsm_response, |
1694 extra | 1694 extra |
1695 ) | 1695 ) |
1696 return items, rsm_response | 1696 return items, rsm_response |
1697 | 1697 |
1698 def _getNodeCallbacks(self, node, event): | 1698 def _get_node_callbacks(self, node, event): |
1699 """Generate callbacks from given node and event | 1699 """Generate callbacks from given node and event |
1700 | 1700 |
1701 @param node(unicode): node used for the item | 1701 @param node(unicode): node used for the item |
1702 any registered node which prefix the node will match | 1702 any registered node which prefix the node will match |
1703 @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE | 1703 @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE |
1710 for callback_data in callbacks_dict[event]: | 1710 for callback_data in callbacks_dict[event]: |
1711 yield callback_data[0] | 1711 yield callback_data[0] |
1712 except KeyError: | 1712 except KeyError: |
1713 continue | 1713 continue |
1714 | 1714 |
1715 async def _callNodeCallbacks(self, client, event: pubsub.ItemsEvent) -> None: | 1715 async def _call_node_callbacks(self, client, event: pubsub.ItemsEvent) -> None: |
1716 """Call sequencially event callbacks of a node | 1716 """Call sequencially event callbacks of a node |
1717 | 1717 |
1718 Callbacks are called sequencially and not in parallel to be sure to respect | 1718 Callbacks are called sequencially and not in parallel to be sure to respect |
1719 priority (notably for plugin needing to get old items before they are modified or | 1719 priority (notably for plugin needing to get old items before they are modified or |
1720 deleted from cache). | 1720 deleted from cache). |
1721 """ | 1721 """ |
1722 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): | 1722 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_ITEMS): |
1723 try: | 1723 try: |
1724 await utils.asDeferred(callback, client, event) | 1724 await utils.as_deferred(callback, client, event) |
1725 except Exception as e: | 1725 except Exception as e: |
1726 log.error( | 1726 log.error( |
1727 f"Error while running items event callback {callback}: {e}" | 1727 f"Error while running items event callback {callback}: {e}" |
1728 ) | 1728 ) |
1729 | 1729 |
1730 def itemsReceived(self, event): | 1730 def itemsReceived(self, event): |
1731 log.debug("Pubsub items received") | 1731 log.debug("Pubsub items received") |
1732 client = self.parent | 1732 client = self.parent |
1733 defer.ensureDeferred(self._callNodeCallbacks(client, event)) | 1733 defer.ensureDeferred(self._call_node_callbacks(client, event)) |
1734 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | 1734 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: |
1735 raw_items = [i.toXml() for i in event.items] | 1735 raw_items = [i.toXml() for i in event.items] |
1736 self.host.bridge.psEventRaw( | 1736 self.host.bridge.ps_event_raw( |
1737 event.sender.full(), | 1737 event.sender.full(), |
1738 event.nodeIdentifier, | 1738 event.nodeIdentifier, |
1739 C.PS_ITEMS, | 1739 C.PS_ITEMS, |
1740 raw_items, | 1740 raw_items, |
1741 client.profile, | 1741 client.profile, |
1742 ) | 1742 ) |
1743 | 1743 |
1744 def deleteReceived(self, event): | 1744 def deleteReceived(self, event): |
1745 log.debug(("Publish node deleted")) | 1745 log.debug(("Publish node deleted")) |
1746 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE): | 1746 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_DELETE): |
1747 d = utils.asDeferred(callback, self.parent, event) | 1747 d = utils.as_deferred(callback, self.parent, event) |
1748 d.addErrback(lambda f: log.error( | 1748 d.addErrback(lambda f: log.error( |
1749 f"Error while running delete event callback {callback}: {f}" | 1749 f"Error while running delete event callback {callback}: {f}" |
1750 )) | 1750 )) |
1751 client = self.parent | 1751 client = self.parent |
1752 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | 1752 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: |
1753 self.host.bridge.psEventRaw( | 1753 self.host.bridge.ps_event_raw( |
1754 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile | 1754 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile |
1755 ) | 1755 ) |
1756 | 1756 |
1757 def purgeReceived(self, event): | 1757 def purgeReceived(self, event): |
1758 log.debug(("Publish node purged")) | 1758 log.debug(("Publish node purged")) |
1759 for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_PURGE): | 1759 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_PURGE): |
1760 d = utils.asDeferred(callback, self.parent, event) | 1760 d = utils.as_deferred(callback, self.parent, event) |
1761 d.addErrback(lambda f: log.error( | 1761 d.addErrback(lambda f: log.error( |
1762 f"Error while running purge event callback {callback}: {f}" | 1762 f"Error while running purge event callback {callback}: {f}" |
1763 )) | 1763 )) |
1764 client = self.parent | 1764 client = self.parent |
1765 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | 1765 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: |
1766 self.host.bridge.psEventRaw( | 1766 self.host.bridge.ps_event_raw( |
1767 event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile | 1767 event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile |
1768 ) | 1768 ) |
1769 | 1769 |
1770 def subscriptions(self, service, nodeIdentifier, sender=None): | 1770 def subscriptions(self, service, nodeIdentifier, sender=None): |
1771 """Return the list of subscriptions to the given service and node. | 1771 """Return the list of subscriptions to the given service and node. |
1796 subs.append(subscription) | 1796 subs.append(subscription) |
1797 return subs | 1797 return subs |
1798 | 1798 |
1799 return d.addCallback(cb) | 1799 return d.addCallback(cb) |
1800 | 1800 |
1801 def purgeNode(self, service, nodeIdentifier): | 1801 def purge_node(self, service, nodeIdentifier): |
1802 """Purge a node (i.e. delete all items from it) | 1802 """Purge a node (i.e. delete all items from it) |
1803 | 1803 |
1804 @param service(jid.JID, None): service to send the item to | 1804 @param service(jid.JID, None): service to send the item to |
1805 None to use PEP | 1805 None to use PEP |
1806 @param NodeIdentifier(unicode): PubSub node to use | 1806 @param NodeIdentifier(unicode): PubSub node to use |