Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0060.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0060.py@524856bd7b19 |
children | 087902fbb77a |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # SàT plugin for Publish-Subscribe (xep-0060) | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 | |
20 from collections import namedtuple | |
21 from functools import reduce | |
22 from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union | |
23 import urllib.error | |
24 import urllib.parse | |
25 import urllib.request | |
26 | |
27 from twisted.internet import defer, reactor | |
28 from twisted.words.protocols.jabber import error, jid | |
29 from twisted.words.xish import domish | |
30 from wokkel import disco | |
31 from wokkel import data_form | |
32 from wokkel import pubsub | |
33 from wokkel import rsm | |
34 from wokkel import mam | |
35 from zope.interface import implementer | |
36 | |
37 from libervia.backend.core import exceptions | |
38 from libervia.backend.core.constants import Const as C | |
39 from libervia.backend.core.core_types import SatXMPPEntity | |
40 from libervia.backend.core.i18n import _ | |
41 from libervia.backend.core.log import getLogger | |
42 from libervia.backend.core.xmpp import SatXMPPClient | |
43 from libervia.backend.tools import utils | |
44 from libervia.backend.tools import sat_defer | |
45 from libervia.backend.tools import xml_tools | |
46 from libervia.backend.tools.common import data_format | |
47 | |
48 | |
49 log = getLogger(__name__) | |
50 | |
51 PLUGIN_INFO = { | |
52 C.PI_NAME: "Publish-Subscribe", | |
53 C.PI_IMPORT_NAME: "XEP-0060", | |
54 C.PI_TYPE: "XEP", | |
55 C.PI_MODES: C.PLUG_MODE_BOTH, | |
56 C.PI_PROTOCOLS: ["XEP-0060"], | |
57 C.PI_DEPENDENCIES: [], | |
58 C.PI_RECOMMENDATIONS: ["XEP-0059", "XEP-0313"], | |
59 C.PI_MAIN: "XEP_0060", | |
60 C.PI_HANDLER: "yes", | |
61 C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol"""), | |
62 } | |
63 | |
64 UNSPECIFIED = "unspecified error" | |
65 | |
66 | |
67 Extra = namedtuple("Extra", ("rsm_request", "extra")) | |
68 # rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None | |
69 # extra is a potentially empty dict | |
70 TIMEOUT = 30 | |
71 # minimum features that a pubsub service must have to be selectable as default | |
72 DEFAULT_PUBSUB_MIN_FEAT = { | |
73 'http://jabber.org/protocol/pubsub#persistent-items', | |
74 'http://jabber.org/protocol/pubsub#publish', | |
75 'http://jabber.org/protocol/pubsub#retract-items', | |
76 } | |
77 | |
78 class XEP_0060(object): | |
79 OPT_ACCESS_MODEL = "pubsub#access_model" | |
80 OPT_PERSIST_ITEMS = "pubsub#persist_items" | |
81 OPT_MAX_ITEMS = "pubsub#max_items" | |
82 OPT_DELIVER_PAYLOADS = "pubsub#deliver_payloads" | |
83 OPT_SEND_ITEM_SUBSCRIBE = "pubsub#send_item_subscribe" | |
84 OPT_NODE_TYPE = "pubsub#node_type" | |
85 OPT_SUBSCRIPTION_TYPE = "pubsub#subscription_type" | |
86 OPT_SUBSCRIPTION_DEPTH = "pubsub#subscription_depth" | |
87 OPT_ROSTER_GROUPS_ALLOWED = "pubsub#roster_groups_allowed" | |
88 OPT_PUBLISH_MODEL = "pubsub#publish_model" | |
89 OPT_OVERWRITE_POLICY = "pubsub#overwrite_policy" | |
90 ACCESS_OPEN = "open" | |
91 ACCESS_PRESENCE = "presence" | |
92 ACCESS_ROSTER = "roster" | |
93 ACCESS_PUBLISHER_ROSTER = "publisher-roster" | |
94 ACCESS_AUTHORIZE = "authorize" | |
95 ACCESS_WHITELIST = "whitelist" | |
96 PUBLISH_MODEL_PUBLISHERS = "publishers" | |
97 PUBLISH_MODEL_SUBSCRIBERS = "subscribers" | |
98 PUBLISH_MODEL_OPEN = "open" | |
99 OWPOL_ORIGINAL = "original_publisher" | |
100 OWPOL_ANY_PUB = "any_publisher" | |
101 ID_SINGLETON = "current" | |
102 EXTRA_PUBLISH_OPTIONS = "publish_options" | |
103 EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met" | |
104 # extra disco needed for RSM, cf. XEP-0060 § 6.5.4 | |
105 DISCO_RSM = "http://jabber.org/protocol/pubsub#rsm" | |
106 | |
107 def __init__(self, host): | |
108 log.info(_("PubSub plugin initialization")) | |
109 self.host = host | |
110 self._rsm = host.plugins.get("XEP-0059") | |
111 self._mam = host.plugins.get("XEP-0313") | |
112 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) | |
113 self.rt_sessions = sat_defer.RTDeferredSessions() | |
114 host.bridge.add_method( | |
115 "ps_node_create", | |
116 ".plugin", | |
117 in_sign="ssa{ss}s", | |
118 out_sign="s", | |
119 method=self._create_node, | |
120 async_=True, | |
121 ) | |
122 host.bridge.add_method( | |
123 "ps_node_configuration_get", | |
124 ".plugin", | |
125 in_sign="sss", | |
126 out_sign="a{ss}", | |
127 method=self._get_node_configuration, | |
128 async_=True, | |
129 ) | |
130 host.bridge.add_method( | |
131 "ps_node_configuration_set", | |
132 ".plugin", | |
133 in_sign="ssa{ss}s", | |
134 out_sign="", | |
135 method=self._set_node_configuration, | |
136 async_=True, | |
137 ) | |
138 host.bridge.add_method( | |
139 "ps_node_affiliations_get", | |
140 ".plugin", | |
141 in_sign="sss", | |
142 out_sign="a{ss}", | |
143 method=self._get_node_affiliations, | |
144 async_=True, | |
145 ) | |
146 host.bridge.add_method( | |
147 "ps_node_affiliations_set", | |
148 ".plugin", | |
149 in_sign="ssa{ss}s", | |
150 out_sign="", | |
151 method=self._set_node_affiliations, | |
152 async_=True, | |
153 ) | |
154 host.bridge.add_method( | |
155 "ps_node_subscriptions_get", | |
156 ".plugin", | |
157 in_sign="sss", | |
158 out_sign="a{ss}", | |
159 method=self._get_node_subscriptions, | |
160 async_=True, | |
161 ) | |
162 host.bridge.add_method( | |
163 "ps_node_subscriptions_set", | |
164 ".plugin", | |
165 in_sign="ssa{ss}s", | |
166 out_sign="", | |
167 method=self._set_node_subscriptions, | |
168 async_=True, | |
169 ) | |
170 host.bridge.add_method( | |
171 "ps_node_purge", | |
172 ".plugin", | |
173 in_sign="sss", | |
174 out_sign="", | |
175 method=self._purge_node, | |
176 async_=True, | |
177 ) | |
178 host.bridge.add_method( | |
179 "ps_node_delete", | |
180 ".plugin", | |
181 in_sign="sss", | |
182 out_sign="", | |
183 method=self._delete_node, | |
184 async_=True, | |
185 ) | |
186 host.bridge.add_method( | |
187 "ps_node_watch_add", | |
188 ".plugin", | |
189 in_sign="sss", | |
190 out_sign="", | |
191 method=self._addWatch, | |
192 async_=False, | |
193 ) | |
194 host.bridge.add_method( | |
195 "ps_node_watch_remove", | |
196 ".plugin", | |
197 in_sign="sss", | |
198 out_sign="", | |
199 method=self._remove_watch, | |
200 async_=False, | |
201 ) | |
202 host.bridge.add_method( | |
203 "ps_affiliations_get", | |
204 ".plugin", | |
205 in_sign="sss", | |
206 out_sign="a{ss}", | |
207 method=self._get_affiliations, | |
208 async_=True, | |
209 ) | |
210 host.bridge.add_method( | |
211 "ps_items_get", | |
212 ".plugin", | |
213 in_sign="ssiassss", | |
214 out_sign="s", | |
215 method=self._get_items, | |
216 async_=True, | |
217 ) | |
218 host.bridge.add_method( | |
219 "ps_item_send", | |
220 ".plugin", | |
221 in_sign="ssssss", | |
222 out_sign="s", | |
223 method=self._send_item, | |
224 async_=True, | |
225 ) | |
226 host.bridge.add_method( | |
227 "ps_items_send", | |
228 ".plugin", | |
229 in_sign="ssasss", | |
230 out_sign="as", | |
231 method=self._send_items, | |
232 async_=True, | |
233 ) | |
234 host.bridge.add_method( | |
235 "ps_item_retract", | |
236 ".plugin", | |
237 in_sign="sssbs", | |
238 out_sign="", | |
239 method=self._retract_item, | |
240 async_=True, | |
241 ) | |
242 host.bridge.add_method( | |
243 "ps_items_retract", | |
244 ".plugin", | |
245 in_sign="ssasbs", | |
246 out_sign="", | |
247 method=self._retract_items, | |
248 async_=True, | |
249 ) | |
250 host.bridge.add_method( | |
251 "ps_item_rename", | |
252 ".plugin", | |
253 in_sign="sssss", | |
254 out_sign="", | |
255 method=self._rename_item, | |
256 async_=True, | |
257 ) | |
258 host.bridge.add_method( | |
259 "ps_subscribe", | |
260 ".plugin", | |
261 in_sign="ssss", | |
262 out_sign="s", | |
263 method=self._subscribe, | |
264 async_=True, | |
265 ) | |
266 host.bridge.add_method( | |
267 "ps_unsubscribe", | |
268 ".plugin", | |
269 in_sign="sss", | |
270 out_sign="", | |
271 method=self._unsubscribe, | |
272 async_=True, | |
273 ) | |
274 host.bridge.add_method( | |
275 "ps_subscriptions_get", | |
276 ".plugin", | |
277 in_sign="sss", | |
278 out_sign="s", | |
279 method=self._subscriptions, | |
280 async_=True, | |
281 ) | |
282 host.bridge.add_method( | |
283 "ps_subscribe_to_many", | |
284 ".plugin", | |
285 in_sign="a(ss)sa{ss}s", | |
286 out_sign="s", | |
287 method=self._subscribe_to_many, | |
288 ) | |
289 host.bridge.add_method( | |
290 "ps_get_subscribe_rt_result", | |
291 ".plugin", | |
292 in_sign="ss", | |
293 out_sign="(ua(sss))", | |
294 method=self._many_subscribe_rt_result, | |
295 async_=True, | |
296 ) | |
297 host.bridge.add_method( | |
298 "ps_get_from_many", | |
299 ".plugin", | |
300 in_sign="a(ss)iss", | |
301 out_sign="s", | |
302 method=self._get_from_many, | |
303 ) | |
304 host.bridge.add_method( | |
305 "ps_get_from_many_rt_result", | |
306 ".plugin", | |
307 in_sign="ss", | |
308 out_sign="(ua(sssasa{ss}))", | |
309 method=self._get_from_many_rt_result, | |
310 async_=True, | |
311 ) | |
312 | |
313 # high level observer method | |
314 host.bridge.add_signal( | |
315 "ps_event", ".plugin", signature="ssssss" | |
316 ) # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile | |
317 | |
318 # low level observer method, used if service/node is in watching list (see psNodeWatch* methods) | |
319 host.bridge.add_signal( | |
320 "ps_event_raw", ".plugin", signature="sssass" | |
321 ) # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile | |
322 | |
323 def get_handler(self, client): | |
324 client.pubsub_client = SatPubSubClient(self.host, self) | |
325 return client.pubsub_client | |
326 | |
327 async def profile_connected(self, client): | |
328 client.pubsub_watching = set() | |
329 try: | |
330 client.pubsub_service = jid.JID( | |
331 self.host.memory.config_get("", "pubsub_service") | |
332 ) | |
333 except RuntimeError: | |
334 log.info( | |
335 _( | |
336 "Can't retrieve pubsub_service from conf, we'll use first one that " | |
337 "we find" | |
338 ) | |
339 ) | |
340 pubsub_services = await self.host.find_service_entities( | |
341 client, "pubsub", "service" | |
342 ) | |
343 for service_jid in pubsub_services: | |
344 infos = await self.host.memory.disco.get_infos(client, service_jid) | |
345 if not DEFAULT_PUBSUB_MIN_FEAT.issubset(infos.features): | |
346 continue | |
347 names = {(n or "").lower() for n in infos.identities.values()} | |
348 if "libervia pubsub service" in names: | |
349 # this is the name of Libervia's side project pubsub service, we know | |
350 # that it is a suitable default pubsub service | |
351 client.pubsub_service = service_jid | |
352 break | |
353 categories = {(i[0] or "").lower() for i in infos.identities.keys()} | |
354 if "gateway" in categories or "gateway" in names: | |
355 # we don't want to use a gateway as default pubsub service | |
356 continue | |
357 if "jabber:iq:register" in infos.features: | |
358 # may be present on gateways, and we don't want a service | |
359 # where registration is needed | |
360 continue | |
361 client.pubsub_service = service_jid | |
362 break | |
363 else: | |
364 client.pubsub_service = None | |
365 pubsub_service_str = ( | |
366 client.pubsub_service.full() if client.pubsub_service else "PEP" | |
367 ) | |
368 log.info(f"default pubsub service: {pubsub_service_str}") | |
369 | |
370 def features_get(self, profile): | |
371 try: | |
372 client = self.host.get_client(profile) | |
373 except exceptions.ProfileNotSetError: | |
374 return {} | |
375 try: | |
376 return { | |
377 "service": client.pubsub_service.full() | |
378 if client.pubsub_service is not None | |
379 else "" | |
380 } | |
381 except AttributeError: | |
382 if self.host.is_connected(profile): | |
383 log.debug("Profile is not connected, service is not checked yet") | |
384 else: | |
385 log.error("Service should be available !") | |
386 return {} | |
387 | |
388 def parse_extra(self, extra): | |
389 """Parse extra dictionnary | |
390 | |
391 used bridge's extra dictionnaries | |
392 @param extra(dict): extra data used to configure request | |
393 @return(Extra): filled Extra instance | |
394 """ | |
395 if extra is None: | |
396 rsm_request = None | |
397 extra = {} | |
398 else: | |
399 # order-by | |
400 if C.KEY_ORDER_BY in extra: | |
401 # FIXME: we temporarily manage only one level of ordering | |
402 # we need to switch to a fully serialised extra data | |
403 # to be able to encode a whole ordered list | |
404 extra[C.KEY_ORDER_BY] = [extra.pop(C.KEY_ORDER_BY)] | |
405 | |
406 # rsm | |
407 if self._rsm is None: | |
408 rsm_request = None | |
409 else: | |
410 rsm_request = self._rsm.parse_extra(extra) | |
411 | |
412 # mam | |
413 if self._mam is None: | |
414 mam_request = None | |
415 else: | |
416 mam_request = self._mam.parse_extra(extra, with_rsm=False) | |
417 | |
418 if mam_request is not None: | |
419 assert "mam" not in extra | |
420 extra["mam"] = mam_request | |
421 | |
422 return Extra(rsm_request, extra) | |
423 | |
424 def add_managed_node( | |
425 self, | |
426 node: str, | |
427 priority: int = 0, | |
428 **kwargs: Callable | |
429 ): | |
430 """Add a handler for a node | |
431 | |
432 @param node: node to monitor | |
433 all node *prefixed* with this one will be triggered | |
434 @param priority: priority of the callback. Callbacks with higher priority will be | |
435 called first. | |
436 @param **kwargs: method(s) to call when the node is found | |
437 the method must be named after PubSub constants in lower case | |
438 and suffixed with "_cb" | |
439 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE | |
440 note: only C.PS_ITEMS and C.PS_DELETE are implemented so far | |
441 """ | |
442 assert node is not None | |
443 assert kwargs | |
444 callbacks = self._node_cb.setdefault(node, {}) | |
445 for event, cb in kwargs.items(): | |
446 event_name = event[:-3] | |
447 assert event_name in C.PS_EVENTS | |
448 cb_list = callbacks.setdefault(event_name, []) | |
449 cb_list.append((cb, priority)) | |
450 cb_list.sort(key=lambda c: c[1], reverse=True) | |
451 | |
452 def remove_managed_node(self, node, *args): | |
453 """Add a handler for a node | |
454 | |
455 @param node(unicode): node to monitor | |
456 @param *args: callback(s) to remove | |
457 """ | |
458 assert args | |
459 try: | |
460 registred_cb = self._node_cb[node] | |
461 except KeyError: | |
462 pass | |
463 else: | |
464 removed = False | |
465 for callback in args: | |
466 for event, cb_list in registred_cb.items(): | |
467 to_remove = [] | |
468 for cb in cb_list: | |
469 if cb[0] == callback: | |
470 to_remove.append(cb) | |
471 for cb in to_remove: | |
472 cb_list.remove(cb) | |
473 if not cb_list: | |
474 del registred_cb[event] | |
475 if not registred_cb: | |
476 del self._node_cb[node] | |
477 removed = True | |
478 break | |
479 | |
480 if not removed: | |
481 log.error( | |
482 f"Trying to remove inexistant callback {callback} for node {node}" | |
483 ) | |
484 | |
485 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): | |
486 # """Retrieve the name of the nodes that are accessible on the target service. | |
487 | |
488 # @param service (JID): target service | |
489 # @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) | |
490 # @param profile (str): %(doc_profile)s | |
491 # @return: deferred which fire a list of nodes | |
492 # """ | |
493 # client = self.host.get_client(profile) | |
494 # d = self.host.getDiscoItems(client, service, nodeIdentifier) | |
495 # d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]) | |
496 # return d | |
497 | |
498 # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE): | |
499 # """Retrieve the name of the nodes to which the profile is subscribed on the target service. | |
500 | |
501 # @param service (JID): target service | |
502 # @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) | |
503 # @param filter_ (str): filter the result according to the given subscription type: | |
504 # - None: do not filter | |
505 # - 'pending': subscription has not been approved yet by the node owner | |
506 # - 'unconfigured': subscription options have not been configured yet | |
507 # - 'subscribed': subscription is complete | |
508 # @param profile (str): %(doc_profile)s | |
509 # @return: Deferred list[str] | |
510 # """ | |
511 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) | |
512 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) | |
513 # return d | |
514 | |
515 def _send_item(self, service, nodeIdentifier, payload, item_id=None, extra_ser="", | |
516 profile_key=C.PROF_KEY_NONE): | |
517 client = self.host.get_client(profile_key) | |
518 service = None if not service else jid.JID(service) | |
519 payload = xml_tools.parse(payload) | |
520 extra = data_format.deserialise(extra_ser) | |
521 d = defer.ensureDeferred(self.send_item( | |
522 client, service, nodeIdentifier, payload, item_id or None, extra | |
523 )) | |
524 d.addCallback(lambda ret: ret or "") | |
525 return d | |
526 | |
527 def _send_items(self, service, nodeIdentifier, items, extra_ser=None, | |
528 profile_key=C.PROF_KEY_NONE): | |
529 client = self.host.get_client(profile_key) | |
530 service = None if not service else jid.JID(service) | |
531 try: | |
532 items = [xml_tools.parse(item) for item in items] | |
533 except Exception as e: | |
534 raise exceptions.DataError(_("Can't parse items: {msg}").format( | |
535 msg=e)) | |
536 extra = data_format.deserialise(extra_ser) | |
537 return defer.ensureDeferred(self.send_items( | |
538 client, service, nodeIdentifier, items, extra=extra | |
539 )) | |
540 | |
541 async def send_item( | |
542 self, | |
543 client: SatXMPPClient, | |
544 service: Union[jid.JID, None], | |
545 nodeIdentifier: str, | |
546 payload: domish.Element, | |
547 item_id: Optional[str] = None, | |
548 extra: Optional[Dict[str, Any]] = None | |
549 ) -> Optional[str]: | |
550 """High level method to send one item | |
551 | |
552 @param service: service to send the item to None to use PEP | |
553 @param NodeIdentifier: PubSub node to use | |
554 @param payload: payload of the item to send | |
555 @param item_id: id to use or None to create one | |
556 @param extra: extra options | |
557 @return: id of the created item | |
558 """ | |
559 assert isinstance(payload, domish.Element) | |
560 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) | |
561 if item_id is not None: | |
562 item_elt['id'] = item_id | |
563 item_elt.addChild(payload) | |
564 published_ids = await self.send_items( | |
565 client, | |
566 service, | |
567 nodeIdentifier, | |
568 [item_elt], | |
569 extra=extra | |
570 ) | |
571 try: | |
572 return published_ids[0] | |
573 except IndexError: | |
574 return item_id | |
575 | |
576 async def send_items( | |
577 self, | |
578 client: SatXMPPEntity, | |
579 service: Optional[jid.JID], | |
580 nodeIdentifier: str, | |
581 items: List[domish.Element], | |
582 sender: Optional[jid.JID] = None, | |
583 extra: Optional[Dict[str, Any]] = None | |
584 ) -> List[str]: | |
585 """High level method to send several items at once | |
586 | |
587 @param service: service to send the item to | |
588 None to use PEP | |
589 @param NodeIdentifier: PubSub node to use | |
590 @param items: whole item elements to send, | |
591 "id" will be used if set | |
592 @param extra: extra options. Key can be: | |
593 - self.EXTRA_PUBLISH_OPTIONS(dict): publish options, cf. XEP-0060 § 7.1.5 | |
594 the dict maps option name to value(s) | |
595 - self.EXTRA_ON_PRECOND_NOT_MET(str): policy to have when publishing is | |
596 failing du to failing precondition. Value can be: | |
597 * raise (default): raise the exception | |
598 * publish_without_options: re-publish without the publish-options. | |
599 A warning will be logged showing that the publish-options could not | |
600 be used | |
601 @return: ids of the created items | |
602 """ | |
603 if extra is None: | |
604 extra = {} | |
605 if service is None: | |
606 service = client.jid.userhostJID() | |
607 parsed_items = [] | |
608 for item in items: | |
609 if item.name != 'item': | |
610 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml())) | |
611 item_id = item.getAttribute("id") | |
612 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) | |
613 publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS) | |
614 try: | |
615 iq_result = await self.publish( | |
616 client, service, nodeIdentifier, parsed_items, options=publish_options, | |
617 sender=sender | |
618 ) | |
619 except error.StanzaError as e: | |
620 if ((e.condition == 'conflict' and e.appCondition | |
621 and e.appCondition.name == 'precondition-not-met' | |
622 and publish_options is not None)): | |
623 # this usually happens when publish-options can't be set | |
624 policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, 'raise') | |
625 if policy == 'raise': | |
626 raise e | |
627 elif policy == 'publish_without_options': | |
628 log.warning(_( | |
629 "Can't use publish-options ({options}) on node {node}, " | |
630 "re-publishing without them: {reason}").format( | |
631 options=', '.join(f'{k} = {v}' | |
632 for k,v in publish_options.items()), | |
633 node=nodeIdentifier, | |
634 reason=e, | |
635 ) | |
636 ) | |
637 iq_result = await self.publish( | |
638 client, service, nodeIdentifier, parsed_items) | |
639 else: | |
640 raise exceptions.InternalError( | |
641 f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: " | |
642 f"{policy}" | |
643 ) | |
644 else: | |
645 raise e | |
646 try: | |
647 return [ | |
648 item['id'] | |
649 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item') | |
650 ] | |
651 except AttributeError: | |
652 return [] | |
653 | |
654 async def publish( | |
655 self, | |
656 client: SatXMPPEntity, | |
657 service: jid.JID, | |
658 nodeIdentifier: str, | |
659 items: Optional[List[domish.Element]] = None, | |
660 options: Optional[dict] = None, | |
661 sender: Optional[jid.JID] = None, | |
662 extra: Optional[Dict[str, Any]] = None | |
663 ) -> domish.Element: | |
664 """Publish pubsub items | |
665 | |
666 @param sender: sender of the request, | |
667 client.jid will be used if nto set | |
668 @param extra: extra data | |
669 not used directly by ``publish``, but may be used in triggers | |
670 @return: IQ result stanza | |
671 @trigger XEP-0060_publish: called just before publication. | |
672 if it returns False, extra must have a "iq_result_elt" key set with | |
673 domish.Element to return. | |
674 """ | |
675 if sender is None: | |
676 sender = client.jid | |
677 if extra is None: | |
678 extra = {} | |
679 if not await self.host.trigger.async_point( | |
680 "XEP-0060_publish", client, service, nodeIdentifier, items, options, sender, | |
681 extra | |
682 ): | |
683 return extra["iq_result_elt"] | |
684 iq_result_elt = await client.pubsub_client.publish( | |
685 service, nodeIdentifier, items, sender, | |
686 options=options | |
687 ) | |
688 return iq_result_elt | |
689 | |
690 def _unwrap_mam_message(self, message_elt): | |
691 try: | |
692 item_elt = reduce( | |
693 lambda elt, ns_name: next(elt.elements(*ns_name)), | |
694 (message_elt, | |
695 (mam.NS_MAM, "result"), | |
696 (C.NS_FORWARD, "forwarded"), | |
697 (C.NS_CLIENT, "message"), | |
698 ("http://jabber.org/protocol/pubsub#event", "event"), | |
699 ("http://jabber.org/protocol/pubsub#event", "items"), | |
700 ("http://jabber.org/protocol/pubsub#event", "item"), | |
701 )) | |
702 except StopIteration: | |
703 raise exceptions.DataError("Can't find Item in MAM message element") | |
704 return item_elt | |
705 | |
706 def serialise_items(self, items_data): | |
707 items, metadata = items_data | |
708 metadata['items'] = items | |
709 return data_format.serialise(metadata) | |
710 | |
711 def _get_items(self, service="", node="", max_items=10, item_ids=None, sub_id=None, | |
712 extra="", profile_key=C.PROF_KEY_NONE): | |
713 """Get items from pubsub node | |
714 | |
715 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit | |
716 """ | |
717 client = self.host.get_client(profile_key) | |
718 service = jid.JID(service) if service else None | |
719 max_items = None if max_items == C.NO_LIMIT else max_items | |
720 extra = self.parse_extra(data_format.deserialise(extra)) | |
721 d = defer.ensureDeferred(self.get_items( | |
722 client, | |
723 service, | |
724 node, | |
725 max_items, | |
726 item_ids, | |
727 sub_id or None, | |
728 extra.rsm_request, | |
729 extra.extra, | |
730 )) | |
731 d.addCallback(self.trans_items_data) | |
732 d.addCallback(self.serialise_items) | |
733 return d | |
734 | |
735 async def get_items( | |
736 self, | |
737 client: SatXMPPEntity, | |
738 service: Optional[jid.JID], | |
739 node: str, | |
740 max_items: Optional[int] = None, | |
741 item_ids: Optional[List[str]] = None, | |
742 sub_id: Optional[str] = None, | |
743 rsm_request: Optional[rsm.RSMRequest] = None, | |
744 extra: Optional[dict] = None | |
745 ) -> Tuple[List[dict], dict]: | |
746 """Retrieve pubsub items from a node. | |
747 | |
748 @param service (JID, None): pubsub service. | |
749 @param node (str): node id. | |
750 @param max_items (int): optional limit on the number of retrieved items. | |
751 @param item_ids (list[str]): identifiers of the items to be retrieved (can't be | |
752 used with rsm_request). If requested items don't exist, they won't be | |
753 returned, meaning that we can have an empty list as result (NotFound | |
754 exception is NOT raised). | |
755 @param sub_id (str): optional subscription identifier. | |
756 @param rsm_request (rsm.RSMRequest): RSM request data | |
757 @return: a deferred couple (list[dict], dict) containing: | |
758 - list of items | |
759 - metadata with the following keys: | |
760 - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index | |
761 value of RSMResponse | |
762 - service, node: service and node used | |
763 """ | |
764 if item_ids and max_items is not None: | |
765 max_items = None | |
766 if rsm_request and item_ids: | |
767 raise ValueError("items_id can't be used with rsm") | |
768 if extra is None: | |
769 extra = {} | |
770 cont, ret = await self.host.trigger.async_return_point( | |
771 "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id, | |
772 rsm_request, extra | |
773 ) | |
774 if not cont: | |
775 return ret | |
776 try: | |
777 mam_query = extra["mam"] | |
778 except KeyError: | |
779 d = defer.ensureDeferred(client.pubsub_client.items( | |
780 service = service, | |
781 nodeIdentifier = node, | |
782 maxItems = max_items, | |
783 subscriptionIdentifier = sub_id, | |
784 sender = None, | |
785 itemIdentifiers = item_ids, | |
786 orderBy = extra.get(C.KEY_ORDER_BY), | |
787 rsm_request = rsm_request, | |
788 extra = extra | |
789 )) | |
790 # we have no MAM data here, so we add None | |
791 d.addErrback(sat_defer.stanza_2_not_found) | |
792 d.addTimeout(TIMEOUT, reactor) | |
793 items, rsm_response = await d | |
794 mam_response = None | |
795 else: | |
796 # if mam is requested, we have to do a totally different query | |
797 if self._mam is None: | |
798 raise exceptions.NotFound("MAM (XEP-0313) plugin is not available") | |
799 if max_items is not None: | |
800 raise exceptions.DataError("max_items parameter can't be used with MAM") | |
801 if item_ids: | |
802 raise exceptions.DataError("items_ids parameter can't be used with MAM") | |
803 if mam_query.node is None: | |
804 mam_query.node = node | |
805 elif mam_query.node != node: | |
806 raise exceptions.DataError( | |
807 "MAM query node is incoherent with get_items's node" | |
808 ) | |
809 if mam_query.rsm is None: | |
810 mam_query.rsm = rsm_request | |
811 else: | |
812 if mam_query.rsm != rsm_request: | |
813 raise exceptions.DataError( | |
814 "Conflict between RSM request and MAM's RSM request" | |
815 ) | |
816 items, rsm_response, mam_response = await self._mam.get_archives( | |
817 client, mam_query, service, self._unwrap_mam_message | |
818 ) | |
819 | |
820 try: | |
821 subscribe = C.bool(extra["subscribe"]) | |
822 except KeyError: | |
823 subscribe = False | |
824 | |
825 if subscribe: | |
826 try: | |
827 await self.subscribe(client, service, node) | |
828 except error.StanzaError as e: | |
829 log.warning( | |
830 f"Could not subscribe to node {node} on service {service}: {e}" | |
831 ) | |
832 | |
833 # TODO: handle mam_response | |
834 service_jid = service if service else client.jid.userhostJID() | |
835 metadata = { | |
836 "service": service_jid, | |
837 "node": node, | |
838 "uri": self.get_node_uri(service_jid, node), | |
839 } | |
840 if mam_response is not None: | |
841 # mam_response is a dict with "complete" and "stable" keys | |
842 # we can put them directly in metadata | |
843 metadata.update(mam_response) | |
844 if rsm_request is not None and rsm_response is not None: | |
845 metadata['rsm'] = rsm_response.toDict() | |
846 if mam_response is None: | |
847 index = rsm_response.index | |
848 count = rsm_response.count | |
849 if index is None or count is None: | |
850 # we don't have enough information to know if the data is complete | |
851 # or not | |
852 metadata["complete"] = None | |
853 else: | |
854 # normally we have a strict equality here but XEP-0059 states | |
855 # that index MAY be approximative, so just in case… | |
856 metadata["complete"] = index + len(items) >= count | |
857 # encrypted metadata can be added by plugins in XEP-0060_items trigger | |
858 if "encrypted" in extra: | |
859 metadata["encrypted"] = extra["encrypted"] | |
860 | |
861 return (items, metadata) | |
862 | |
863 # @defer.inlineCallbacks | |
864 # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): | |
865 # """Massively retrieve pubsub items from many nodes. | |
866 # @param service (JID): target service. | |
867 # @param data (dict): dictionnary binding some arbitrary keys to the node identifiers. | |
868 # @param max_items (int): optional limit on the number of retrieved items *per node*. | |
869 # @param sub_id (str): optional subscription identifier. | |
870 # @param rsm (dict): RSM request data | |
871 # @param profile_key (str): %(doc_profile_key)s | |
872 # @return: a deferred dict with: | |
873 # - key: a value in (a subset of) data.keys() | |
874 # - couple (list[dict], dict) containing: | |
875 # - list of items | |
876 # - RSM response data | |
877 # """ | |
878 # client = self.host.get_client(profile_key) | |
879 # found_nodes = yield self.listNodes(service, profile=client.profile) | |
880 # d_dict = {} | |
881 # for publisher, node in data.items(): | |
882 # if node not in found_nodes: | |
883 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) | |
884 # continue # avoid pubsub "item-not-found" error | |
885 # d_dict[publisher] = self.get_items(service, node, max_items, None, sub_id, rsm, client.profile) | |
886 # defer.returnValue(d_dict) | |
887 | |
888 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, | |
889 profile_key=C.PROF_KEY_NONE): | |
890 client = self.host.get_client(profile_key) | |
891 return client.pubsub_client.getOptions( | |
892 service, nodeIdentifier, subscriber, subscriptionIdentifier | |
893 ) | |
894 | |
895 def setOptions(self, service, nodeIdentifier, subscriber, options, | |
896 subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): | |
897 client = self.host.get_client(profile_key) | |
898 return client.pubsub_client.setOptions( | |
899 service, nodeIdentifier, subscriber, options, subscriptionIdentifier | |
900 ) | |
901 | |
902 def _create_node(self, service_s, nodeIdentifier, options, profile_key): | |
903 client = self.host.get_client(profile_key) | |
904 return self.createNode( | |
905 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options | |
906 ) | |
907 | |
908 def createNode( | |
909 self, | |
910 client: SatXMPPClient, | |
911 service: jid.JID, | |
912 nodeIdentifier: Optional[str] = None, | |
913 options: Optional[Dict[str, str]] = None | |
914 ) -> str: | |
915 """Create a new node | |
916 | |
917 @param service: PubSub service, | |
918 @param NodeIdentifier: node name use None to create instant node (identifier will | |
919 be returned by this method) | |
920 @param option: node configuration options | |
921 @return: identifier of the created node (may be different from requested name) | |
922 """ | |
923 # TODO: if pubsub service doesn't hande publish-options, configure it in a second time | |
924 return client.pubsub_client.createNode(service, nodeIdentifier, options) | |
925 | |
926 @defer.inlineCallbacks | |
927 def create_if_new_node(self, client, service, nodeIdentifier, options=None): | |
928 """Helper method similar to createNode, but will not fail in case of conflict""" | |
929 try: | |
930 yield self.createNode(client, service, nodeIdentifier, options) | |
931 except error.StanzaError as e: | |
932 if e.condition == "conflict": | |
933 pass | |
934 else: | |
935 raise e | |
936 | |
937 def _get_node_configuration(self, service_s, nodeIdentifier, profile_key): | |
938 client = self.host.get_client(profile_key) | |
939 d = self.getConfiguration( | |
940 client, jid.JID(service_s) if service_s else None, nodeIdentifier | |
941 ) | |
942 | |
943 def serialize(form): | |
944 # FIXME: better more generic dataform serialisation should be available in SàT | |
945 return {f.var: str(f.value) for f in list(form.fields.values())} | |
946 | |
947 d.addCallback(serialize) | |
948 return d | |
949 | |
950 def getConfiguration(self, client, service, nodeIdentifier): | |
951 request = pubsub.PubSubRequest("configureGet") | |
952 request.recipient = service | |
953 request.nodeIdentifier = nodeIdentifier | |
954 | |
955 def cb(iq): | |
956 form = data_form.findForm(iq.pubsub.configure, pubsub.NS_PUBSUB_NODE_CONFIG) | |
957 form.typeCheck() | |
958 return form | |
959 | |
960 d = request.send(client.xmlstream) | |
961 d.addCallback(cb) | |
962 return d | |
963 | |
964 def make_configuration_form(self, options: dict) -> data_form.Form: | |
965 """Build a configuration form""" | |
966 form = data_form.Form( | |
967 formType="submit", formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG | |
968 ) | |
969 form.makeFields(options) | |
970 return form | |
971 | |
972 def _set_node_configuration(self, service_s, nodeIdentifier, options, profile_key): | |
973 client = self.host.get_client(profile_key) | |
974 d = self.setConfiguration( | |
975 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options | |
976 ) | |
977 return d | |
978 | |
979 def setConfiguration(self, client, service, nodeIdentifier, options): | |
980 request = pubsub.PubSubRequest("configureSet") | |
981 request.recipient = service | |
982 request.nodeIdentifier = nodeIdentifier | |
983 | |
984 form = self.make_configuration_form(options) | |
985 request.options = form | |
986 | |
987 d = request.send(client.xmlstream) | |
988 return d | |
989 | |
990 def _get_affiliations(self, service_s, nodeIdentifier, profile_key): | |
991 client = self.host.get_client(profile_key) | |
992 d = self.get_affiliations( | |
993 client, jid.JID(service_s) if service_s else None, nodeIdentifier or None | |
994 ) | |
995 return d | |
996 | |
997 def get_affiliations(self, client, service, nodeIdentifier=None): | |
998 """Retrieve affiliations of an entity | |
999 | |
1000 @param nodeIdentifier(unicode, None): node to get affiliation from | |
1001 None to get all nodes affiliations for this service | |
1002 """ | |
1003 request = pubsub.PubSubRequest("affiliations") | |
1004 request.recipient = service | |
1005 request.nodeIdentifier = nodeIdentifier | |
1006 | |
1007 def cb(iq_elt): | |
1008 try: | |
1009 affiliations_elt = next( | |
1010 iq_elt.pubsub.elements(pubsub.NS_PUBSUB, "affiliations") | |
1011 ) | |
1012 except StopIteration: | |
1013 raise ValueError( | |
1014 _("Invalid result: missing <affiliations> element: {}").format( | |
1015 iq_elt.toXml | |
1016 ) | |
1017 ) | |
1018 try: | |
1019 return { | |
1020 e["node"]: e["affiliation"] | |
1021 for e in affiliations_elt.elements(pubsub.NS_PUBSUB, "affiliation") | |
1022 } | |
1023 except KeyError: | |
1024 raise ValueError( | |
1025 _("Invalid result: bad <affiliation> element: {}").format( | |
1026 iq_elt.toXml | |
1027 ) | |
1028 ) | |
1029 | |
1030 d = request.send(client.xmlstream) | |
1031 d.addCallback(cb) | |
1032 return d | |
1033 | |
1034 def _get_node_affiliations(self, service_s, nodeIdentifier, profile_key): | |
1035 client = self.host.get_client(profile_key) | |
1036 d = self.get_node_affiliations( | |
1037 client, jid.JID(service_s) if service_s else None, nodeIdentifier | |
1038 ) | |
1039 d.addCallback( | |
1040 lambda affiliations: {j.full(): a for j, a in affiliations.items()} | |
1041 ) | |
1042 return d | |
1043 | |
1044 def get_node_affiliations(self, client, service, nodeIdentifier): | |
1045 """Retrieve affiliations of a node owned by profile""" | |
1046 request = pubsub.PubSubRequest("affiliationsGet") | |
1047 request.recipient = service | |
1048 request.nodeIdentifier = nodeIdentifier | |
1049 | |
1050 def cb(iq_elt): | |
1051 try: | |
1052 affiliations_elt = next( | |
1053 iq_elt.pubsub.elements(pubsub.NS_PUBSUB_OWNER, "affiliations") | |
1054 ) | |
1055 except StopIteration: | |
1056 raise ValueError( | |
1057 _("Invalid result: missing <affiliations> element: {}").format( | |
1058 iq_elt.toXml | |
1059 ) | |
1060 ) | |
1061 try: | |
1062 return { | |
1063 jid.JID(e["jid"]): e["affiliation"] | |
1064 for e in affiliations_elt.elements( | |
1065 (pubsub.NS_PUBSUB_OWNER, "affiliation") | |
1066 ) | |
1067 } | |
1068 except KeyError: | |
1069 raise ValueError( | |
1070 _("Invalid result: bad <affiliation> element: {}").format( | |
1071 iq_elt.toXml | |
1072 ) | |
1073 ) | |
1074 | |
1075 d = request.send(client.xmlstream) | |
1076 d.addCallback(cb) | |
1077 return d | |
1078 | |
1079 def _set_node_affiliations( | |
1080 self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE | |
1081 ): | |
1082 client = self.host.get_client(profile_key) | |
1083 affiliations = { | |
1084 jid.JID(jid_): affiliation for jid_, affiliation in affiliations.items() | |
1085 } | |
1086 d = self.set_node_affiliations( | |
1087 client, | |
1088 jid.JID(service_s) if service_s else None, | |
1089 nodeIdentifier, | |
1090 affiliations, | |
1091 ) | |
1092 return d | |
1093 | |
1094 def set_node_affiliations(self, client, service, nodeIdentifier, affiliations): | |
1095 """Update affiliations of a node owned by profile | |
1096 | |
1097 @param affiliations(dict[jid.JID, unicode]): affiliations to set | |
1098 check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations | |
1099 """ | |
1100 request = pubsub.PubSubRequest("affiliationsSet") | |
1101 request.recipient = service | |
1102 request.nodeIdentifier = nodeIdentifier | |
1103 request.affiliations = affiliations | |
1104 d = request.send(client.xmlstream) | |
1105 return d | |
1106 | |
1107 def _purge_node(self, service_s, nodeIdentifier, profile_key): | |
1108 client = self.host.get_client(profile_key) | |
1109 return self.purge_node( | |
1110 client, jid.JID(service_s) if service_s else None, nodeIdentifier | |
1111 ) | |
1112 | |
1113 def purge_node(self, client, service, nodeIdentifier): | |
1114 return client.pubsub_client.purge_node(service, nodeIdentifier) | |
1115 | |
1116 def _delete_node(self, service_s, nodeIdentifier, profile_key): | |
1117 client = self.host.get_client(profile_key) | |
1118 return self.deleteNode( | |
1119 client, jid.JID(service_s) if service_s else None, nodeIdentifier | |
1120 ) | |
1121 | |
1122 def deleteNode( | |
1123 self, | |
1124 client: SatXMPPClient, | |
1125 service: jid.JID, | |
1126 nodeIdentifier: str | |
1127 ) -> defer.Deferred: | |
1128 return client.pubsub_client.deleteNode(service, nodeIdentifier) | |
1129 | |
1130 def _addWatch(self, service_s, node, profile_key): | |
1131 """watch modifications on a node | |
1132 | |
1133 This method should only be called from bridge | |
1134 """ | |
1135 client = self.host.get_client(profile_key) | |
1136 service = jid.JID(service_s) if service_s else client.jid.userhostJID() | |
1137 client.pubsub_watching.add((service, node)) | |
1138 | |
1139 def _remove_watch(self, service_s, node, profile_key): | |
1140 """remove a node watch | |
1141 | |
1142 This method should only be called from bridge | |
1143 """ | |
1144 client = self.host.get_client(profile_key) | |
1145 service = jid.JID(service_s) if service_s else client.jid.userhostJID() | |
1146 client.pubsub_watching.remove((service, node)) | |
1147 | |
1148 def _retract_item( | |
1149 self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key | |
1150 ): | |
1151 return self._retract_items( | |
1152 service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key | |
1153 ) | |
1154 | |
1155 def _retract_items( | |
1156 self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key | |
1157 ): | |
1158 client = self.host.get_client(profile_key) | |
1159 return self.retract_items( | |
1160 client, | |
1161 jid.JID(service_s) if service_s else None, | |
1162 nodeIdentifier, | |
1163 itemIdentifiers, | |
1164 notify, | |
1165 ) | |
1166 | |
1167 def retract_items( | |
1168 self, | |
1169 client: SatXMPPClient, | |
1170 service: jid.JID, | |
1171 nodeIdentifier: str, | |
1172 itemIdentifiers: Iterable[str], | |
1173 notify: bool = True, | |
1174 ) -> defer.Deferred: | |
1175 return client.pubsub_client.retractItems( | |
1176 service, nodeIdentifier, itemIdentifiers, notify=notify | |
1177 ) | |
1178 | |
1179 def _rename_item( | |
1180 self, | |
1181 service, | |
1182 node, | |
1183 item_id, | |
1184 new_id, | |
1185 profile_key=C.PROF_KEY_NONE, | |
1186 ): | |
1187 client = self.host.get_client(profile_key) | |
1188 service = jid.JID(service) if service else None | |
1189 return defer.ensureDeferred(self.rename_item( | |
1190 client, service, node, item_id, new_id | |
1191 )) | |
1192 | |
1193 async def rename_item( | |
1194 self, | |
1195 client: SatXMPPEntity, | |
1196 service: Optional[jid.JID], | |
1197 node: str, | |
1198 item_id: str, | |
1199 new_id: str | |
1200 ) -> None: | |
1201 """Rename an item by recreating it then deleting it | |
1202 | |
1203 we have to recreate then delete because there is currently no rename operation | |
1204 with PubSub | |
1205 """ | |
1206 if not item_id or not new_id: | |
1207 raise ValueError("item_id and new_id must not be empty") | |
1208 # retract must be done last, so if something goes wrong, the exception will stop | |
1209 # the workflow and no accidental delete should happen | |
1210 item_elt = (await self.get_items(client, service, node, item_ids=[item_id]))[0][0] | |
1211 await self.send_item(client, service, node, item_elt.firstChildElement(), new_id) | |
1212 await self.retract_items(client, service, node, [item_id]) | |
1213 | |
1214 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): | |
1215 client = self.host.get_client(profile_key) | |
1216 service = None if not service else jid.JID(service) | |
1217 d = defer.ensureDeferred( | |
1218 self.subscribe( | |
1219 client, | |
1220 service, | |
1221 nodeIdentifier, | |
1222 options=data_format.deserialise(options) | |
1223 ) | |
1224 ) | |
1225 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") | |
1226 return d | |
1227 | |
1228 async def subscribe( | |
1229 self, | |
1230 client: SatXMPPEntity, | |
1231 service: Optional[jid.JID], | |
1232 nodeIdentifier: str, | |
1233 sub_jid: Optional[jid.JID] = None, | |
1234 options: Optional[dict] = None | |
1235 ) -> pubsub.Subscription: | |
1236 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe | |
1237 if service is None: | |
1238 service = client.jid.userhostJID() | |
1239 cont, trigger_sub = await self.host.trigger.async_return_point( | |
1240 "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options, | |
1241 ) | |
1242 if not cont: | |
1243 return trigger_sub | |
1244 try: | |
1245 subscription = await client.pubsub_client.subscribe( | |
1246 service, nodeIdentifier, sub_jid or client.jid.userhostJID(), | |
1247 options=options, sender=client.jid.userhostJID() | |
1248 ) | |
1249 except error.StanzaError as e: | |
1250 if e.condition == 'item-not-found': | |
1251 raise exceptions.NotFound(e.text or e.condition) | |
1252 else: | |
1253 raise e | |
1254 return subscription | |
1255 | |
1256 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): | |
1257 client = self.host.get_client(profile_key) | |
1258 service = None if not service else jid.JID(service) | |
1259 return defer.ensureDeferred(self.unsubscribe(client, service, nodeIdentifier)) | |
1260 | |
1261 async def unsubscribe( | |
1262 self, | |
1263 client: SatXMPPEntity, | |
1264 service: jid.JID, | |
1265 nodeIdentifier: str, | |
1266 sub_jid: Optional[jid.JID] = None, | |
1267 subscriptionIdentifier: Optional[str] = None, | |
1268 sender: Optional[jid.JID] = None, | |
1269 ) -> None: | |
1270 if not await self.host.trigger.async_point( | |
1271 "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid, | |
1272 subscriptionIdentifier, sender | |
1273 ): | |
1274 return | |
1275 try: | |
1276 await client.pubsub_client.unsubscribe( | |
1277 service, | |
1278 nodeIdentifier, | |
1279 sub_jid or client.jid.userhostJID(), | |
1280 subscriptionIdentifier, | |
1281 sender, | |
1282 ) | |
1283 except error.StanzaError as e: | |
1284 try: | |
1285 next(e.getElement().elements(pubsub.NS_PUBSUB_ERRORS, "not-subscribed")) | |
1286 except StopIteration: | |
1287 raise e | |
1288 else: | |
1289 log.info( | |
1290 f"{sender.full() if sender else client.jid.full()} was not " | |
1291 f"subscribed to node {nodeIdentifier!s} at {service.full()}" | |
1292 ) | |
1293 | |
1294 @utils.ensure_deferred | |
1295 async def _subscriptions( | |
1296 self, | |
1297 service="", | |
1298 nodeIdentifier="", | |
1299 profile_key=C.PROF_KEY_NONE | |
1300 ) -> str: | |
1301 client = self.host.get_client(profile_key) | |
1302 service = None if not service else jid.JID(service) | |
1303 subs = await self.subscriptions(client, service, nodeIdentifier or None) | |
1304 return data_format.serialise(subs) | |
1305 | |
1306 async def subscriptions( | |
1307 self, | |
1308 client: SatXMPPEntity, | |
1309 service: Optional[jid.JID] = None, | |
1310 node: Optional[str] = None | |
1311 ) -> List[Dict[str, Union[str, bool]]]: | |
1312 """Retrieve subscriptions from a service | |
1313 | |
1314 @param service(jid.JID): PubSub service | |
1315 @param nodeIdentifier(unicode, None): node to check | |
1316 None to get all subscriptions | |
1317 """ | |
1318 cont, ret = await self.host.trigger.async_return_point( | |
1319 "XEP-0060_subscriptions", client, service, node | |
1320 ) | |
1321 if not cont: | |
1322 return ret | |
1323 subs = await client.pubsub_client.subscriptions(service, node) | |
1324 ret = [] | |
1325 for sub in subs: | |
1326 sub_dict = { | |
1327 "service": service.host if service else client.jid.host, | |
1328 "node": sub.nodeIdentifier, | |
1329 "subscriber": sub.subscriber.full(), | |
1330 "state": sub.state, | |
1331 } | |
1332 if sub.subscriptionIdentifier is not None: | |
1333 sub_dict["id"] = sub.subscriptionIdentifier | |
1334 ret.append(sub_dict) | |
1335 return ret | |
1336 | |
1337 ## misc tools ## | |
1338 | |
1339 def get_node_uri(self, service, node, item=None): | |
1340 """Return XMPP URI of a PubSub node | |
1341 | |
1342 @param service(jid.JID): PubSub service | |
1343 @param node(unicode): node | |
1344 @return (unicode): URI of the node | |
1345 """ | |
1346 # FIXME: deprecated, use sat.tools.common.uri instead | |
1347 assert service is not None | |
1348 # XXX: urllib.urlencode use "&" to separate value, while XMPP URL (cf. RFC 5122) | |
1349 # use ";" as a separator. So if more than one value is used in query_data, | |
1350 # urlencode MUST NOT BE USED. | |
1351 query_data = [("node", node.encode("utf-8"))] | |
1352 if item is not None: | |
1353 query_data.append(("item", item.encode("utf-8"))) | |
1354 return "xmpp:{service}?;{query}".format( | |
1355 service=service.userhost(), query=urllib.parse.urlencode(query_data) | |
1356 ) | |
1357 | |
1358 ## methods to manage several stanzas/jids at once ## | |
1359 | |
1360 # generic # | |
1361 | |
1362 def get_rt_results( | |
1363 self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE | |
1364 ): | |
1365 return self.rt_sessions.get_results(session_id, on_success, on_error, profile) | |
1366 | |
1367 def trans_items_data(self, items_data, item_cb=lambda item: item.toXml()): | |
1368 """Helper method to transform result from [get_items] | |
1369 | |
1370 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) | |
1371 as returned by [get_items]. | |
1372 @param items_data(tuple): tuple returned by [get_items] | |
1373 @param item_cb(callable): method to transform each item | |
1374 @return (tuple): a serialised form ready to go throught bridge | |
1375 """ | |
1376 items, metadata = items_data | |
1377 items = [item_cb(item) for item in items] | |
1378 | |
1379 return (items, metadata) | |
1380 | |
1381 def trans_items_data_d(self, items_data, item_cb): | |
1382 """Helper method to transform result from [get_items], deferred version | |
1383 | |
1384 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) | |
1385 as returned by [get_items]. metadata values are then casted to unicode and | |
1386 each item is passed to items_cb. | |
1387 An errback is added to item_cb, and when it is fired the value is filtered from | |
1388 final items | |
1389 @param items_data(tuple): tuple returned by [get_items] | |
1390 @param item_cb(callable): method to transform each item (must return a deferred) | |
1391 @return (tuple): a deferred which fire a dict which can be serialised to go | |
1392 throught bridge | |
1393 """ | |
1394 items, metadata = items_data | |
1395 | |
1396 def eb(failure_): | |
1397 log.warning(f"Error while parsing item: {failure_.value}") | |
1398 | |
1399 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) | |
1400 d.addCallback(lambda parsed_items: ( | |
1401 [i for i in parsed_items if i is not None], | |
1402 metadata | |
1403 )) | |
1404 return d | |
1405 | |
1406 def ser_d_list(self, results, failure_result=None): | |
1407 """Serialise a DeferredList result | |
1408 | |
1409 @param results: DeferredList results | |
1410 @param failure_result: value to use as value for failed Deferred | |
1411 (default: empty tuple) | |
1412 @return (list): list with: | |
1413 - failure: empty in case of success, else error message | |
1414 - result | |
1415 """ | |
1416 if failure_result is None: | |
1417 failure_result = () | |
1418 return [ | |
1419 ("", result) | |
1420 if success | |
1421 else (str(result.result) or UNSPECIFIED, failure_result) | |
1422 for success, result in results | |
1423 ] | |
1424 | |
1425 # subscribe # | |
1426 | |
1427 @utils.ensure_deferred | |
1428 async def _get_node_subscriptions( | |
1429 self, | |
1430 service: str, | |
1431 node: str, | |
1432 profile_key: str | |
1433 ) -> Dict[str, str]: | |
1434 client = self.host.get_client(profile_key) | |
1435 subs = await self.get_node_subscriptions( | |
1436 client, jid.JID(service) if service else None, node | |
1437 ) | |
1438 return {j.full(): a for j, a in subs.items()} | |
1439 | |
1440 async def get_node_subscriptions( | |
1441 self, | |
1442 client: SatXMPPEntity, | |
1443 service: Optional[jid.JID], | |
1444 nodeIdentifier: str | |
1445 ) -> Dict[jid.JID, str]: | |
1446 """Retrieve subscriptions to a node | |
1447 | |
1448 @param nodeIdentifier(unicode): node to get subscriptions from | |
1449 """ | |
1450 if not nodeIdentifier: | |
1451 raise exceptions.DataError("node identifier can't be empty") | |
1452 request = pubsub.PubSubRequest("subscriptionsGet") | |
1453 request.recipient = service | |
1454 request.nodeIdentifier = nodeIdentifier | |
1455 | |
1456 iq_elt = await request.send(client.xmlstream) | |
1457 try: | |
1458 subscriptions_elt = next( | |
1459 iq_elt.pubsub.elements(pubsub.NS_PUBSUB_OWNER, "subscriptions") | |
1460 ) | |
1461 except StopIteration: | |
1462 raise ValueError( | |
1463 _("Invalid result: missing <subscriptions> element: {}").format( | |
1464 iq_elt.toXml | |
1465 ) | |
1466 ) | |
1467 except AttributeError as e: | |
1468 raise ValueError(_("Invalid result: {}").format(e)) | |
1469 try: | |
1470 return { | |
1471 jid.JID(s["jid"]): s["subscription"] | |
1472 for s in subscriptions_elt.elements( | |
1473 (pubsub.NS_PUBSUB, "subscription") | |
1474 ) | |
1475 } | |
1476 except KeyError: | |
1477 raise ValueError( | |
1478 _("Invalid result: bad <subscription> element: {}").format( | |
1479 iq_elt.toXml | |
1480 ) | |
1481 ) | |
1482 | |
1483 def _set_node_subscriptions( | |
1484 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE | |
1485 ): | |
1486 client = self.host.get_client(profile_key) | |
1487 subscriptions = { | |
1488 jid.JID(jid_): subscription | |
1489 for jid_, subscription in subscriptions.items() | |
1490 } | |
1491 d = self.set_node_subscriptions( | |
1492 client, | |
1493 jid.JID(service_s) if service_s else None, | |
1494 nodeIdentifier, | |
1495 subscriptions, | |
1496 ) | |
1497 return d | |
1498 | |
1499 def set_node_subscriptions(self, client, service, nodeIdentifier, subscriptions): | |
1500 """Set or update subscriptions of a node owned by profile | |
1501 | |
1502 @param subscriptions(dict[jid.JID, unicode]): subscriptions to set | |
1503 check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions | |
1504 """ | |
1505 request = pubsub.PubSubRequest("subscriptionsSet") | |
1506 request.recipient = service | |
1507 request.nodeIdentifier = nodeIdentifier | |
1508 request.subscriptions = { | |
1509 pubsub.Subscription(nodeIdentifier, jid_, state) | |
1510 for jid_, state in subscriptions.items() | |
1511 } | |
1512 d = request.send(client.xmlstream) | |
1513 return d | |
1514 | |
1515 def _many_subscribe_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT): | |
1516 """Get real-time results for subcribeToManu session | |
1517 | |
1518 @param session_id: id of the real-time deferred session | |
1519 @param return (tuple): (remaining, results) where: | |
1520 - remaining is the number of still expected results | |
1521 - results is a list of tuple(unicode, unicode, bool, unicode) with: | |
1522 - service: pubsub service | |
1523 - and node: pubsub node | |
1524 - failure(unicode): empty string in case of success, error message else | |
1525 @param profile_key: %(doc_profile_key)s | |
1526 """ | |
1527 profile = self.host.get_client(profile_key).profile | |
1528 d = self.rt_sessions.get_results( | |
1529 session_id, | |
1530 on_success=lambda result: "", | |
1531 on_error=lambda failure: str(failure.value), | |
1532 profile=profile, | |
1533 ) | |
1534 # we need to convert jid.JID to unicode with full() to serialise it for the bridge | |
1535 d.addCallback( | |
1536 lambda ret: ( | |
1537 ret[0], | |
1538 [ | |
1539 (service.full(), node, "" if success else failure or UNSPECIFIED) | |
1540 for (service, node), (success, failure) in ret[1].items() | |
1541 ], | |
1542 ) | |
1543 ) | |
1544 return d | |
1545 | |
1546 def _subscribe_to_many( | |
1547 self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE | |
1548 ): | |
1549 return self.subscribe_to_many( | |
1550 [(jid.JID(service), str(node)) for service, node in node_data], | |
1551 jid.JID(subscriber), | |
1552 options, | |
1553 profile_key, | |
1554 ) | |
1555 | |
1556 def subscribe_to_many( | |
1557 self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE | |
1558 ): | |
1559 """Subscribe to several nodes at once. | |
1560 | |
1561 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: | |
1562 - service (jid.JID) is the pubsub service | |
1563 - node (unicode) is the node to subscribe to | |
1564 @param subscriber (jid.JID): optional subscription identifier. | |
1565 @param options (dict): subscription options | |
1566 @param profile_key (str): %(doc_profile_key)s | |
1567 @return (str): RT Deferred session id | |
1568 """ | |
1569 client = self.host.get_client(profile_key) | |
1570 deferreds = {} | |
1571 for service, node in node_data: | |
1572 deferreds[(service, node)] = defer.ensureDeferred( | |
1573 client.pubsub_client.subscribe( | |
1574 service, node, subscriber, options=options | |
1575 ) | |
1576 ) | |
1577 return self.rt_sessions.new_session(deferreds, client.profile) | |
1578 # found_nodes = yield self.listNodes(service, profile=client.profile) | |
1579 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) | |
1580 # d_list = [] | |
1581 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): | |
1582 # if nodeIdentifier not in found_nodes: | |
1583 # log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) | |
1584 # continue # avoid sat-pubsub "SubscriptionExists" error | |
1585 # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) | |
1586 # defer.returnValue(d_list) | |
1587 | |
1588 # get # | |
1589 | |
1590 def _get_from_many_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT): | |
1591 """Get real-time results for get_from_many session | |
1592 | |
1593 @param session_id: id of the real-time deferred session | |
1594 @param profile_key: %(doc_profile_key)s | |
1595 @param return (tuple): (remaining, results) where: | |
1596 - remaining is the number of still expected results | |
1597 - results is a list of tuple with | |
1598 - service (unicode): pubsub service | |
1599 - node (unicode): pubsub node | |
1600 - failure (unicode): empty string in case of success, error message else | |
1601 - items (list[s]): raw XML of items | |
1602 - metadata(dict): serialised metadata | |
1603 """ | |
1604 profile = self.host.get_client(profile_key).profile | |
1605 d = self.rt_sessions.get_results( | |
1606 session_id, | |
1607 on_success=lambda result: ("", self.trans_items_data(result)), | |
1608 on_error=lambda failure: (str(failure.value) or UNSPECIFIED, ([], {})), | |
1609 profile=profile, | |
1610 ) | |
1611 d.addCallback( | |
1612 lambda ret: ( | |
1613 ret[0], | |
1614 [ | |
1615 (service.full(), node, failure, items, metadata) | |
1616 for (service, node), (success, (failure, (items, metadata))) in ret[ | |
1617 1 | |
1618 ].items() | |
1619 ], | |
1620 ) | |
1621 ) | |
1622 return d | |
1623 | |
1624 def _get_from_many( | |
1625 self, node_data, max_item=10, extra="", profile_key=C.PROF_KEY_NONE | |
1626 ): | |
1627 """ | |
1628 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit | |
1629 """ | |
1630 max_item = None if max_item == C.NO_LIMIT else max_item | |
1631 extra = self.parse_extra(data_format.deserialise(extra)) | |
1632 return self.get_from_many( | |
1633 [(jid.JID(service), str(node)) for service, node in node_data], | |
1634 max_item, | |
1635 extra.rsm_request, | |
1636 extra.extra, | |
1637 profile_key, | |
1638 ) | |
1639 | |
1640 def get_from_many(self, node_data, max_item=None, rsm_request=None, extra=None, | |
1641 profile_key=C.PROF_KEY_NONE): | |
1642 """Get items from many nodes at once | |
1643 | |
1644 @param node_data (iterable[tuple]): iterable of tuple (service, node) where: | |
1645 - service (jid.JID) is the pubsub service | |
1646 - node (unicode) is the node to get items from | |
1647 @param max_items (int): optional limit on the number of retrieved items. | |
1648 @param rsm_request (RSMRequest): RSM request data | |
1649 @param profile_key (unicode): %(doc_profile_key)s | |
1650 @return (str): RT Deferred session id | |
1651 """ | |
1652 client = self.host.get_client(profile_key) | |
1653 deferreds = {} | |
1654 for service, node in node_data: | |
1655 deferreds[(service, node)] = defer.ensureDeferred(self.get_items( | |
1656 client, service, node, max_item, rsm_request=rsm_request, extra=extra | |
1657 )) | |
1658 return self.rt_sessions.new_session(deferreds, client.profile) | |
1659 | |
1660 | |
1661 @implementer(disco.IDisco) | |
1662 class SatPubSubClient(rsm.PubSubClient): | |
1663 | |
1664 def __init__(self, host, parent_plugin): | |
1665 self.host = host | |
1666 self.parent_plugin = parent_plugin | |
1667 rsm.PubSubClient.__init__(self) | |
1668 | |
1669 def connectionInitialized(self): | |
1670 rsm.PubSubClient.connectionInitialized(self) | |
1671 | |
1672 async def items( | |
1673 self, | |
1674 service: Optional[jid.JID], | |
1675 nodeIdentifier: str, | |
1676 maxItems: Optional[int] = None, | |
1677 subscriptionIdentifier: Optional[str] = None, | |
1678 sender: Optional[jid.JID] = None, | |
1679 itemIdentifiers: Optional[Set[str]] = None, | |
1680 orderBy: Optional[List[str]] = None, | |
1681 rsm_request: Optional[rsm.RSMRequest] = None, | |
1682 extra: Optional[Dict[str, Any]] = None, | |
1683 ): | |
1684 if extra is None: | |
1685 extra = {} | |
1686 items, rsm_response = await super().items( | |
1687 service, nodeIdentifier, maxItems, subscriptionIdentifier, sender, | |
1688 itemIdentifiers, orderBy, rsm_request | |
1689 ) | |
1690 # items must be returned, thus this async point can't stop the workflow (but it | |
1691 # can modify returned items) | |
1692 await self.host.trigger.async_point( | |
1693 "XEP-0060_items", self.parent, service, nodeIdentifier, items, rsm_response, | |
1694 extra | |
1695 ) | |
1696 return items, rsm_response | |
1697 | |
1698 def _get_node_callbacks(self, node, event): | |
1699 """Generate callbacks from given node and event | |
1700 | |
1701 @param node(unicode): node used for the item | |
1702 any registered node which prefix the node will match | |
1703 @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE | |
1704 @return (iterator[callable]): callbacks for this node/event | |
1705 """ | |
1706 for registered_node, callbacks_dict in self.parent_plugin._node_cb.items(): | |
1707 if not node.startswith(registered_node): | |
1708 continue | |
1709 try: | |
1710 for callback_data in callbacks_dict[event]: | |
1711 yield callback_data[0] | |
1712 except KeyError: | |
1713 continue | |
1714 | |
1715 async def _call_node_callbacks(self, client, event: pubsub.ItemsEvent) -> None: | |
1716 """Call sequencially event callbacks of a node | |
1717 | |
1718 Callbacks are called sequencially and not in parallel to be sure to respect | |
1719 priority (notably for plugin needing to get old items before they are modified or | |
1720 deleted from cache). | |
1721 """ | |
1722 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_ITEMS): | |
1723 try: | |
1724 await utils.as_deferred(callback, client, event) | |
1725 except Exception as e: | |
1726 log.error( | |
1727 f"Error while running items event callback {callback}: {e}" | |
1728 ) | |
1729 | |
1730 def itemsReceived(self, event): | |
1731 log.debug("Pubsub items received") | |
1732 client = self.parent | |
1733 defer.ensureDeferred(self._call_node_callbacks(client, event)) | |
1734 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | |
1735 raw_items = [i.toXml() for i in event.items] | |
1736 self.host.bridge.ps_event_raw( | |
1737 event.sender.full(), | |
1738 event.nodeIdentifier, | |
1739 C.PS_ITEMS, | |
1740 raw_items, | |
1741 client.profile, | |
1742 ) | |
1743 | |
1744 def deleteReceived(self, event): | |
1745 log.debug(("Publish node deleted")) | |
1746 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_DELETE): | |
1747 d = utils.as_deferred(callback, self.parent, event) | |
1748 d.addErrback(lambda f: log.error( | |
1749 f"Error while running delete event callback {callback}: {f}" | |
1750 )) | |
1751 client = self.parent | |
1752 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | |
1753 self.host.bridge.ps_event_raw( | |
1754 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile | |
1755 ) | |
1756 | |
1757 def purgeReceived(self, event): | |
1758 log.debug(("Publish node purged")) | |
1759 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_PURGE): | |
1760 d = utils.as_deferred(callback, self.parent, event) | |
1761 d.addErrback(lambda f: log.error( | |
1762 f"Error while running purge event callback {callback}: {f}" | |
1763 )) | |
1764 client = self.parent | |
1765 if (event.sender, event.nodeIdentifier) in client.pubsub_watching: | |
1766 self.host.bridge.ps_event_raw( | |
1767 event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile | |
1768 ) | |
1769 | |
1770 def subscriptions(self, service, nodeIdentifier, sender=None): | |
1771 """Return the list of subscriptions to the given service and node. | |
1772 | |
1773 @param service: The publish subscribe service to retrieve the subscriptions from. | |
1774 @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} | |
1775 @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions). | |
1776 @type nodeIdentifier: C{unicode} | |
1777 @return (list[pubsub.Subscription]): list of subscriptions | |
1778 """ | |
1779 request = pubsub.PubSubRequest("subscriptions") | |
1780 request.recipient = service | |
1781 request.nodeIdentifier = nodeIdentifier | |
1782 request.sender = sender | |
1783 d = request.send(self.xmlstream) | |
1784 | |
1785 def cb(iq): | |
1786 subs = [] | |
1787 for subscription_elt in iq.pubsub.subscriptions.elements( | |
1788 pubsub.NS_PUBSUB, "subscription" | |
1789 ): | |
1790 subscription = pubsub.Subscription( | |
1791 subscription_elt["node"], | |
1792 jid.JID(subscription_elt["jid"]), | |
1793 subscription_elt["subscription"], | |
1794 subscriptionIdentifier=subscription_elt.getAttribute("subid"), | |
1795 ) | |
1796 subs.append(subscription) | |
1797 return subs | |
1798 | |
1799 return d.addCallback(cb) | |
1800 | |
1801 def purge_node(self, service, nodeIdentifier): | |
1802 """Purge a node (i.e. delete all items from it) | |
1803 | |
1804 @param service(jid.JID, None): service to send the item to | |
1805 None to use PEP | |
1806 @param NodeIdentifier(unicode): PubSub node to use | |
1807 """ | |
1808 # TODO: propose this upstream and remove it once merged | |
1809 request = pubsub.PubSubRequest('purge') | |
1810 request.recipient = service | |
1811 request.nodeIdentifier = nodeIdentifier | |
1812 return request.send(self.xmlstream) | |
1813 | |
1814 def getDiscoInfo(self, requestor, service, nodeIdentifier=""): | |
1815 disco_info = [] | |
1816 self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile) | |
1817 return disco_info | |
1818 | |
1819 def getDiscoItems(self, requestor, service, nodeIdentifier=""): | |
1820 return [] |