comparison libervia/backend/plugins/plugin_xep_0060.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0060.py@524856bd7b19
children 087902fbb77a
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # SàT plugin for Publish-Subscribe (xep-0060)
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20 from collections import namedtuple
21 from functools import reduce
22 from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union
23 import urllib.error
24 import urllib.parse
25 import urllib.request
26
27 from twisted.internet import defer, reactor
28 from twisted.words.protocols.jabber import error, jid
29 from twisted.words.xish import domish
30 from wokkel import disco
31 from wokkel import data_form
32 from wokkel import pubsub
33 from wokkel import rsm
34 from wokkel import mam
35 from zope.interface import implementer
36
37 from libervia.backend.core import exceptions
38 from libervia.backend.core.constants import Const as C
39 from libervia.backend.core.core_types import SatXMPPEntity
40 from libervia.backend.core.i18n import _
41 from libervia.backend.core.log import getLogger
42 from libervia.backend.core.xmpp import SatXMPPClient
43 from libervia.backend.tools import utils
44 from libervia.backend.tools import sat_defer
45 from libervia.backend.tools import xml_tools
46 from libervia.backend.tools.common import data_format
47
48
49 log = getLogger(__name__)
50
51 PLUGIN_INFO = {
52 C.PI_NAME: "Publish-Subscribe",
53 C.PI_IMPORT_NAME: "XEP-0060",
54 C.PI_TYPE: "XEP",
55 C.PI_MODES: C.PLUG_MODE_BOTH,
56 C.PI_PROTOCOLS: ["XEP-0060"],
57 C.PI_DEPENDENCIES: [],
58 C.PI_RECOMMENDATIONS: ["XEP-0059", "XEP-0313"],
59 C.PI_MAIN: "XEP_0060",
60 C.PI_HANDLER: "yes",
61 C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol"""),
62 }
63
64 UNSPECIFIED = "unspecified error"
65
66
67 Extra = namedtuple("Extra", ("rsm_request", "extra"))
68 # rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None
69 # extra is a potentially empty dict
70 TIMEOUT = 30
71 # minimum features that a pubsub service must have to be selectable as default
72 DEFAULT_PUBSUB_MIN_FEAT = {
73 'http://jabber.org/protocol/pubsub#persistent-items',
74 'http://jabber.org/protocol/pubsub#publish',
75 'http://jabber.org/protocol/pubsub#retract-items',
76 }
77
78 class XEP_0060(object):
79 OPT_ACCESS_MODEL = "pubsub#access_model"
80 OPT_PERSIST_ITEMS = "pubsub#persist_items"
81 OPT_MAX_ITEMS = "pubsub#max_items"
82 OPT_DELIVER_PAYLOADS = "pubsub#deliver_payloads"
83 OPT_SEND_ITEM_SUBSCRIBE = "pubsub#send_item_subscribe"
84 OPT_NODE_TYPE = "pubsub#node_type"
85 OPT_SUBSCRIPTION_TYPE = "pubsub#subscription_type"
86 OPT_SUBSCRIPTION_DEPTH = "pubsub#subscription_depth"
87 OPT_ROSTER_GROUPS_ALLOWED = "pubsub#roster_groups_allowed"
88 OPT_PUBLISH_MODEL = "pubsub#publish_model"
89 OPT_OVERWRITE_POLICY = "pubsub#overwrite_policy"
90 ACCESS_OPEN = "open"
91 ACCESS_PRESENCE = "presence"
92 ACCESS_ROSTER = "roster"
93 ACCESS_PUBLISHER_ROSTER = "publisher-roster"
94 ACCESS_AUTHORIZE = "authorize"
95 ACCESS_WHITELIST = "whitelist"
96 PUBLISH_MODEL_PUBLISHERS = "publishers"
97 PUBLISH_MODEL_SUBSCRIBERS = "subscribers"
98 PUBLISH_MODEL_OPEN = "open"
99 OWPOL_ORIGINAL = "original_publisher"
100 OWPOL_ANY_PUB = "any_publisher"
101 ID_SINGLETON = "current"
102 EXTRA_PUBLISH_OPTIONS = "publish_options"
103 EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met"
104 # extra disco needed for RSM, cf. XEP-0060 § 6.5.4
105 DISCO_RSM = "http://jabber.org/protocol/pubsub#rsm"
106
107 def __init__(self, host):
108 log.info(_("PubSub plugin initialization"))
109 self.host = host
110 self._rsm = host.plugins.get("XEP-0059")
111 self._mam = host.plugins.get("XEP-0313")
112 self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks)
113 self.rt_sessions = sat_defer.RTDeferredSessions()
114 host.bridge.add_method(
115 "ps_node_create",
116 ".plugin",
117 in_sign="ssa{ss}s",
118 out_sign="s",
119 method=self._create_node,
120 async_=True,
121 )
122 host.bridge.add_method(
123 "ps_node_configuration_get",
124 ".plugin",
125 in_sign="sss",
126 out_sign="a{ss}",
127 method=self._get_node_configuration,
128 async_=True,
129 )
130 host.bridge.add_method(
131 "ps_node_configuration_set",
132 ".plugin",
133 in_sign="ssa{ss}s",
134 out_sign="",
135 method=self._set_node_configuration,
136 async_=True,
137 )
138 host.bridge.add_method(
139 "ps_node_affiliations_get",
140 ".plugin",
141 in_sign="sss",
142 out_sign="a{ss}",
143 method=self._get_node_affiliations,
144 async_=True,
145 )
146 host.bridge.add_method(
147 "ps_node_affiliations_set",
148 ".plugin",
149 in_sign="ssa{ss}s",
150 out_sign="",
151 method=self._set_node_affiliations,
152 async_=True,
153 )
154 host.bridge.add_method(
155 "ps_node_subscriptions_get",
156 ".plugin",
157 in_sign="sss",
158 out_sign="a{ss}",
159 method=self._get_node_subscriptions,
160 async_=True,
161 )
162 host.bridge.add_method(
163 "ps_node_subscriptions_set",
164 ".plugin",
165 in_sign="ssa{ss}s",
166 out_sign="",
167 method=self._set_node_subscriptions,
168 async_=True,
169 )
170 host.bridge.add_method(
171 "ps_node_purge",
172 ".plugin",
173 in_sign="sss",
174 out_sign="",
175 method=self._purge_node,
176 async_=True,
177 )
178 host.bridge.add_method(
179 "ps_node_delete",
180 ".plugin",
181 in_sign="sss",
182 out_sign="",
183 method=self._delete_node,
184 async_=True,
185 )
186 host.bridge.add_method(
187 "ps_node_watch_add",
188 ".plugin",
189 in_sign="sss",
190 out_sign="",
191 method=self._addWatch,
192 async_=False,
193 )
194 host.bridge.add_method(
195 "ps_node_watch_remove",
196 ".plugin",
197 in_sign="sss",
198 out_sign="",
199 method=self._remove_watch,
200 async_=False,
201 )
202 host.bridge.add_method(
203 "ps_affiliations_get",
204 ".plugin",
205 in_sign="sss",
206 out_sign="a{ss}",
207 method=self._get_affiliations,
208 async_=True,
209 )
210 host.bridge.add_method(
211 "ps_items_get",
212 ".plugin",
213 in_sign="ssiassss",
214 out_sign="s",
215 method=self._get_items,
216 async_=True,
217 )
218 host.bridge.add_method(
219 "ps_item_send",
220 ".plugin",
221 in_sign="ssssss",
222 out_sign="s",
223 method=self._send_item,
224 async_=True,
225 )
226 host.bridge.add_method(
227 "ps_items_send",
228 ".plugin",
229 in_sign="ssasss",
230 out_sign="as",
231 method=self._send_items,
232 async_=True,
233 )
234 host.bridge.add_method(
235 "ps_item_retract",
236 ".plugin",
237 in_sign="sssbs",
238 out_sign="",
239 method=self._retract_item,
240 async_=True,
241 )
242 host.bridge.add_method(
243 "ps_items_retract",
244 ".plugin",
245 in_sign="ssasbs",
246 out_sign="",
247 method=self._retract_items,
248 async_=True,
249 )
250 host.bridge.add_method(
251 "ps_item_rename",
252 ".plugin",
253 in_sign="sssss",
254 out_sign="",
255 method=self._rename_item,
256 async_=True,
257 )
258 host.bridge.add_method(
259 "ps_subscribe",
260 ".plugin",
261 in_sign="ssss",
262 out_sign="s",
263 method=self._subscribe,
264 async_=True,
265 )
266 host.bridge.add_method(
267 "ps_unsubscribe",
268 ".plugin",
269 in_sign="sss",
270 out_sign="",
271 method=self._unsubscribe,
272 async_=True,
273 )
274 host.bridge.add_method(
275 "ps_subscriptions_get",
276 ".plugin",
277 in_sign="sss",
278 out_sign="s",
279 method=self._subscriptions,
280 async_=True,
281 )
282 host.bridge.add_method(
283 "ps_subscribe_to_many",
284 ".plugin",
285 in_sign="a(ss)sa{ss}s",
286 out_sign="s",
287 method=self._subscribe_to_many,
288 )
289 host.bridge.add_method(
290 "ps_get_subscribe_rt_result",
291 ".plugin",
292 in_sign="ss",
293 out_sign="(ua(sss))",
294 method=self._many_subscribe_rt_result,
295 async_=True,
296 )
297 host.bridge.add_method(
298 "ps_get_from_many",
299 ".plugin",
300 in_sign="a(ss)iss",
301 out_sign="s",
302 method=self._get_from_many,
303 )
304 host.bridge.add_method(
305 "ps_get_from_many_rt_result",
306 ".plugin",
307 in_sign="ss",
308 out_sign="(ua(sssasa{ss}))",
309 method=self._get_from_many_rt_result,
310 async_=True,
311 )
312
313 #  high level observer method
314 host.bridge.add_signal(
315 "ps_event", ".plugin", signature="ssssss"
316 ) # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile
317
318 # low level observer method, used if service/node is in watching list (see psNodeWatch* methods)
319 host.bridge.add_signal(
320 "ps_event_raw", ".plugin", signature="sssass"
321 ) # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile
322
323 def get_handler(self, client):
324 client.pubsub_client = SatPubSubClient(self.host, self)
325 return client.pubsub_client
326
327 async def profile_connected(self, client):
328 client.pubsub_watching = set()
329 try:
330 client.pubsub_service = jid.JID(
331 self.host.memory.config_get("", "pubsub_service")
332 )
333 except RuntimeError:
334 log.info(
335 _(
336 "Can't retrieve pubsub_service from conf, we'll use first one that "
337 "we find"
338 )
339 )
340 pubsub_services = await self.host.find_service_entities(
341 client, "pubsub", "service"
342 )
343 for service_jid in pubsub_services:
344 infos = await self.host.memory.disco.get_infos(client, service_jid)
345 if not DEFAULT_PUBSUB_MIN_FEAT.issubset(infos.features):
346 continue
347 names = {(n or "").lower() for n in infos.identities.values()}
348 if "libervia pubsub service" in names:
349 # this is the name of Libervia's side project pubsub service, we know
350 # that it is a suitable default pubsub service
351 client.pubsub_service = service_jid
352 break
353 categories = {(i[0] or "").lower() for i in infos.identities.keys()}
354 if "gateway" in categories or "gateway" in names:
355 # we don't want to use a gateway as default pubsub service
356 continue
357 if "jabber:iq:register" in infos.features:
358 # may be present on gateways, and we don't want a service
359 # where registration is needed
360 continue
361 client.pubsub_service = service_jid
362 break
363 else:
364 client.pubsub_service = None
365 pubsub_service_str = (
366 client.pubsub_service.full() if client.pubsub_service else "PEP"
367 )
368 log.info(f"default pubsub service: {pubsub_service_str}")
369
370 def features_get(self, profile):
371 try:
372 client = self.host.get_client(profile)
373 except exceptions.ProfileNotSetError:
374 return {}
375 try:
376 return {
377 "service": client.pubsub_service.full()
378 if client.pubsub_service is not None
379 else ""
380 }
381 except AttributeError:
382 if self.host.is_connected(profile):
383 log.debug("Profile is not connected, service is not checked yet")
384 else:
385 log.error("Service should be available !")
386 return {}
387
388 def parse_extra(self, extra):
389 """Parse extra dictionnary
390
391 used bridge's extra dictionnaries
392 @param extra(dict): extra data used to configure request
393 @return(Extra): filled Extra instance
394 """
395 if extra is None:
396 rsm_request = None
397 extra = {}
398 else:
399 # order-by
400 if C.KEY_ORDER_BY in extra:
401 # FIXME: we temporarily manage only one level of ordering
402 # we need to switch to a fully serialised extra data
403 # to be able to encode a whole ordered list
404 extra[C.KEY_ORDER_BY] = [extra.pop(C.KEY_ORDER_BY)]
405
406 # rsm
407 if self._rsm is None:
408 rsm_request = None
409 else:
410 rsm_request = self._rsm.parse_extra(extra)
411
412 # mam
413 if self._mam is None:
414 mam_request = None
415 else:
416 mam_request = self._mam.parse_extra(extra, with_rsm=False)
417
418 if mam_request is not None:
419 assert "mam" not in extra
420 extra["mam"] = mam_request
421
422 return Extra(rsm_request, extra)
423
424 def add_managed_node(
425 self,
426 node: str,
427 priority: int = 0,
428 **kwargs: Callable
429 ):
430 """Add a handler for a node
431
432 @param node: node to monitor
433 all node *prefixed* with this one will be triggered
434 @param priority: priority of the callback. Callbacks with higher priority will be
435 called first.
436 @param **kwargs: method(s) to call when the node is found
437 the method must be named after PubSub constants in lower case
438 and suffixed with "_cb"
439 e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE
440 note: only C.PS_ITEMS and C.PS_DELETE are implemented so far
441 """
442 assert node is not None
443 assert kwargs
444 callbacks = self._node_cb.setdefault(node, {})
445 for event, cb in kwargs.items():
446 event_name = event[:-3]
447 assert event_name in C.PS_EVENTS
448 cb_list = callbacks.setdefault(event_name, [])
449 cb_list.append((cb, priority))
450 cb_list.sort(key=lambda c: c[1], reverse=True)
451
452 def remove_managed_node(self, node, *args):
453 """Add a handler for a node
454
455 @param node(unicode): node to monitor
456 @param *args: callback(s) to remove
457 """
458 assert args
459 try:
460 registred_cb = self._node_cb[node]
461 except KeyError:
462 pass
463 else:
464 removed = False
465 for callback in args:
466 for event, cb_list in registred_cb.items():
467 to_remove = []
468 for cb in cb_list:
469 if cb[0] == callback:
470 to_remove.append(cb)
471 for cb in to_remove:
472 cb_list.remove(cb)
473 if not cb_list:
474 del registred_cb[event]
475 if not registred_cb:
476 del self._node_cb[node]
477 removed = True
478 break
479
480 if not removed:
481 log.error(
482 f"Trying to remove inexistant callback {callback} for node {node}"
483 )
484
485 # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
486 # """Retrieve the name of the nodes that are accessible on the target service.
487
488 # @param service (JID): target service
489 # @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes)
490 # @param profile (str): %(doc_profile)s
491 # @return: deferred which fire a list of nodes
492 # """
493 # client = self.host.get_client(profile)
494 # d = self.host.getDiscoItems(client, service, nodeIdentifier)
495 # d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')])
496 # return d
497
498 # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE):
499 # """Retrieve the name of the nodes to which the profile is subscribed on the target service.
500
501 # @param service (JID): target service
502 # @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
503 # @param filter_ (str): filter the result according to the given subscription type:
504 # - None: do not filter
505 # - 'pending': subscription has not been approved yet by the node owner
506 # - 'unconfigured': subscription options have not been configured yet
507 # - 'subscribed': subscription is complete
508 # @param profile (str): %(doc_profile)s
509 # @return: Deferred list[str]
510 # """
511 # d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
512 # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
513 # return d
514
515 def _send_item(self, service, nodeIdentifier, payload, item_id=None, extra_ser="",
516 profile_key=C.PROF_KEY_NONE):
517 client = self.host.get_client(profile_key)
518 service = None if not service else jid.JID(service)
519 payload = xml_tools.parse(payload)
520 extra = data_format.deserialise(extra_ser)
521 d = defer.ensureDeferred(self.send_item(
522 client, service, nodeIdentifier, payload, item_id or None, extra
523 ))
524 d.addCallback(lambda ret: ret or "")
525 return d
526
527 def _send_items(self, service, nodeIdentifier, items, extra_ser=None,
528 profile_key=C.PROF_KEY_NONE):
529 client = self.host.get_client(profile_key)
530 service = None if not service else jid.JID(service)
531 try:
532 items = [xml_tools.parse(item) for item in items]
533 except Exception as e:
534 raise exceptions.DataError(_("Can't parse items: {msg}").format(
535 msg=e))
536 extra = data_format.deserialise(extra_ser)
537 return defer.ensureDeferred(self.send_items(
538 client, service, nodeIdentifier, items, extra=extra
539 ))
540
541 async def send_item(
542 self,
543 client: SatXMPPClient,
544 service: Union[jid.JID, None],
545 nodeIdentifier: str,
546 payload: domish.Element,
547 item_id: Optional[str] = None,
548 extra: Optional[Dict[str, Any]] = None
549 ) -> Optional[str]:
550 """High level method to send one item
551
552 @param service: service to send the item to None to use PEP
553 @param NodeIdentifier: PubSub node to use
554 @param payload: payload of the item to send
555 @param item_id: id to use or None to create one
556 @param extra: extra options
557 @return: id of the created item
558 """
559 assert isinstance(payload, domish.Element)
560 item_elt = domish.Element((pubsub.NS_PUBSUB, 'item'))
561 if item_id is not None:
562 item_elt['id'] = item_id
563 item_elt.addChild(payload)
564 published_ids = await self.send_items(
565 client,
566 service,
567 nodeIdentifier,
568 [item_elt],
569 extra=extra
570 )
571 try:
572 return published_ids[0]
573 except IndexError:
574 return item_id
575
576 async def send_items(
577 self,
578 client: SatXMPPEntity,
579 service: Optional[jid.JID],
580 nodeIdentifier: str,
581 items: List[domish.Element],
582 sender: Optional[jid.JID] = None,
583 extra: Optional[Dict[str, Any]] = None
584 ) -> List[str]:
585 """High level method to send several items at once
586
587 @param service: service to send the item to
588 None to use PEP
589 @param NodeIdentifier: PubSub node to use
590 @param items: whole item elements to send,
591 "id" will be used if set
592 @param extra: extra options. Key can be:
593 - self.EXTRA_PUBLISH_OPTIONS(dict): publish options, cf. XEP-0060 § 7.1.5
594 the dict maps option name to value(s)
595 - self.EXTRA_ON_PRECOND_NOT_MET(str): policy to have when publishing is
596 failing du to failing precondition. Value can be:
597 * raise (default): raise the exception
598 * publish_without_options: re-publish without the publish-options.
599 A warning will be logged showing that the publish-options could not
600 be used
601 @return: ids of the created items
602 """
603 if extra is None:
604 extra = {}
605 if service is None:
606 service = client.jid.userhostJID()
607 parsed_items = []
608 for item in items:
609 if item.name != 'item':
610 raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml()))
611 item_id = item.getAttribute("id")
612 parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement()))
613 publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS)
614 try:
615 iq_result = await self.publish(
616 client, service, nodeIdentifier, parsed_items, options=publish_options,
617 sender=sender
618 )
619 except error.StanzaError as e:
620 if ((e.condition == 'conflict' and e.appCondition
621 and e.appCondition.name == 'precondition-not-met'
622 and publish_options is not None)):
623 # this usually happens when publish-options can't be set
624 policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, 'raise')
625 if policy == 'raise':
626 raise e
627 elif policy == 'publish_without_options':
628 log.warning(_(
629 "Can't use publish-options ({options}) on node {node}, "
630 "re-publishing without them: {reason}").format(
631 options=', '.join(f'{k} = {v}'
632 for k,v in publish_options.items()),
633 node=nodeIdentifier,
634 reason=e,
635 )
636 )
637 iq_result = await self.publish(
638 client, service, nodeIdentifier, parsed_items)
639 else:
640 raise exceptions.InternalError(
641 f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: "
642 f"{policy}"
643 )
644 else:
645 raise e
646 try:
647 return [
648 item['id']
649 for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')
650 ]
651 except AttributeError:
652 return []
653
654 async def publish(
655 self,
656 client: SatXMPPEntity,
657 service: jid.JID,
658 nodeIdentifier: str,
659 items: Optional[List[domish.Element]] = None,
660 options: Optional[dict] = None,
661 sender: Optional[jid.JID] = None,
662 extra: Optional[Dict[str, Any]] = None
663 ) -> domish.Element:
664 """Publish pubsub items
665
666 @param sender: sender of the request,
667 client.jid will be used if nto set
668 @param extra: extra data
669 not used directly by ``publish``, but may be used in triggers
670 @return: IQ result stanza
671 @trigger XEP-0060_publish: called just before publication.
672 if it returns False, extra must have a "iq_result_elt" key set with
673 domish.Element to return.
674 """
675 if sender is None:
676 sender = client.jid
677 if extra is None:
678 extra = {}
679 if not await self.host.trigger.async_point(
680 "XEP-0060_publish", client, service, nodeIdentifier, items, options, sender,
681 extra
682 ):
683 return extra["iq_result_elt"]
684 iq_result_elt = await client.pubsub_client.publish(
685 service, nodeIdentifier, items, sender,
686 options=options
687 )
688 return iq_result_elt
689
690 def _unwrap_mam_message(self, message_elt):
691 try:
692 item_elt = reduce(
693 lambda elt, ns_name: next(elt.elements(*ns_name)),
694 (message_elt,
695 (mam.NS_MAM, "result"),
696 (C.NS_FORWARD, "forwarded"),
697 (C.NS_CLIENT, "message"),
698 ("http://jabber.org/protocol/pubsub#event", "event"),
699 ("http://jabber.org/protocol/pubsub#event", "items"),
700 ("http://jabber.org/protocol/pubsub#event", "item"),
701 ))
702 except StopIteration:
703 raise exceptions.DataError("Can't find Item in MAM message element")
704 return item_elt
705
706 def serialise_items(self, items_data):
707 items, metadata = items_data
708 metadata['items'] = items
709 return data_format.serialise(metadata)
710
711 def _get_items(self, service="", node="", max_items=10, item_ids=None, sub_id=None,
712 extra="", profile_key=C.PROF_KEY_NONE):
713 """Get items from pubsub node
714
715 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
716 """
717 client = self.host.get_client(profile_key)
718 service = jid.JID(service) if service else None
719 max_items = None if max_items == C.NO_LIMIT else max_items
720 extra = self.parse_extra(data_format.deserialise(extra))
721 d = defer.ensureDeferred(self.get_items(
722 client,
723 service,
724 node,
725 max_items,
726 item_ids,
727 sub_id or None,
728 extra.rsm_request,
729 extra.extra,
730 ))
731 d.addCallback(self.trans_items_data)
732 d.addCallback(self.serialise_items)
733 return d
734
735 async def get_items(
736 self,
737 client: SatXMPPEntity,
738 service: Optional[jid.JID],
739 node: str,
740 max_items: Optional[int] = None,
741 item_ids: Optional[List[str]] = None,
742 sub_id: Optional[str] = None,
743 rsm_request: Optional[rsm.RSMRequest] = None,
744 extra: Optional[dict] = None
745 ) -> Tuple[List[dict], dict]:
746 """Retrieve pubsub items from a node.
747
748 @param service (JID, None): pubsub service.
749 @param node (str): node id.
750 @param max_items (int): optional limit on the number of retrieved items.
751 @param item_ids (list[str]): identifiers of the items to be retrieved (can't be
752 used with rsm_request). If requested items don't exist, they won't be
753 returned, meaning that we can have an empty list as result (NotFound
754 exception is NOT raised).
755 @param sub_id (str): optional subscription identifier.
756 @param rsm_request (rsm.RSMRequest): RSM request data
757 @return: a deferred couple (list[dict], dict) containing:
758 - list of items
759 - metadata with the following keys:
760 - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index
761 value of RSMResponse
762 - service, node: service and node used
763 """
764 if item_ids and max_items is not None:
765 max_items = None
766 if rsm_request and item_ids:
767 raise ValueError("items_id can't be used with rsm")
768 if extra is None:
769 extra = {}
770 cont, ret = await self.host.trigger.async_return_point(
771 "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id,
772 rsm_request, extra
773 )
774 if not cont:
775 return ret
776 try:
777 mam_query = extra["mam"]
778 except KeyError:
779 d = defer.ensureDeferred(client.pubsub_client.items(
780 service = service,
781 nodeIdentifier = node,
782 maxItems = max_items,
783 subscriptionIdentifier = sub_id,
784 sender = None,
785 itemIdentifiers = item_ids,
786 orderBy = extra.get(C.KEY_ORDER_BY),
787 rsm_request = rsm_request,
788 extra = extra
789 ))
790 # we have no MAM data here, so we add None
791 d.addErrback(sat_defer.stanza_2_not_found)
792 d.addTimeout(TIMEOUT, reactor)
793 items, rsm_response = await d
794 mam_response = None
795 else:
796 # if mam is requested, we have to do a totally different query
797 if self._mam is None:
798 raise exceptions.NotFound("MAM (XEP-0313) plugin is not available")
799 if max_items is not None:
800 raise exceptions.DataError("max_items parameter can't be used with MAM")
801 if item_ids:
802 raise exceptions.DataError("items_ids parameter can't be used with MAM")
803 if mam_query.node is None:
804 mam_query.node = node
805 elif mam_query.node != node:
806 raise exceptions.DataError(
807 "MAM query node is incoherent with get_items's node"
808 )
809 if mam_query.rsm is None:
810 mam_query.rsm = rsm_request
811 else:
812 if mam_query.rsm != rsm_request:
813 raise exceptions.DataError(
814 "Conflict between RSM request and MAM's RSM request"
815 )
816 items, rsm_response, mam_response = await self._mam.get_archives(
817 client, mam_query, service, self._unwrap_mam_message
818 )
819
820 try:
821 subscribe = C.bool(extra["subscribe"])
822 except KeyError:
823 subscribe = False
824
825 if subscribe:
826 try:
827 await self.subscribe(client, service, node)
828 except error.StanzaError as e:
829 log.warning(
830 f"Could not subscribe to node {node} on service {service}: {e}"
831 )
832
833 # TODO: handle mam_response
834 service_jid = service if service else client.jid.userhostJID()
835 metadata = {
836 "service": service_jid,
837 "node": node,
838 "uri": self.get_node_uri(service_jid, node),
839 }
840 if mam_response is not None:
841 # mam_response is a dict with "complete" and "stable" keys
842 # we can put them directly in metadata
843 metadata.update(mam_response)
844 if rsm_request is not None and rsm_response is not None:
845 metadata['rsm'] = rsm_response.toDict()
846 if mam_response is None:
847 index = rsm_response.index
848 count = rsm_response.count
849 if index is None or count is None:
850 # we don't have enough information to know if the data is complete
851 # or not
852 metadata["complete"] = None
853 else:
854 # normally we have a strict equality here but XEP-0059 states
855 # that index MAY be approximative, so just in case…
856 metadata["complete"] = index + len(items) >= count
857 # encrypted metadata can be added by plugins in XEP-0060_items trigger
858 if "encrypted" in extra:
859 metadata["encrypted"] = extra["encrypted"]
860
861 return (items, metadata)
862
863 # @defer.inlineCallbacks
864 # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE):
865 # """Massively retrieve pubsub items from many nodes.
866 # @param service (JID): target service.
867 # @param data (dict): dictionnary binding some arbitrary keys to the node identifiers.
868 # @param max_items (int): optional limit on the number of retrieved items *per node*.
869 # @param sub_id (str): optional subscription identifier.
870 # @param rsm (dict): RSM request data
871 # @param profile_key (str): %(doc_profile_key)s
872 # @return: a deferred dict with:
873 # - key: a value in (a subset of) data.keys()
874 # - couple (list[dict], dict) containing:
875 # - list of items
876 # - RSM response data
877 # """
878 # client = self.host.get_client(profile_key)
879 # found_nodes = yield self.listNodes(service, profile=client.profile)
880 # d_dict = {}
881 # for publisher, node in data.items():
882 # if node not in found_nodes:
883 # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
884 # continue # avoid pubsub "item-not-found" error
885 # d_dict[publisher] = self.get_items(service, node, max_items, None, sub_id, rsm, client.profile)
886 # defer.returnValue(d_dict)
887
888 def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None,
889 profile_key=C.PROF_KEY_NONE):
890 client = self.host.get_client(profile_key)
891 return client.pubsub_client.getOptions(
892 service, nodeIdentifier, subscriber, subscriptionIdentifier
893 )
894
895 def setOptions(self, service, nodeIdentifier, subscriber, options,
896 subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
897 client = self.host.get_client(profile_key)
898 return client.pubsub_client.setOptions(
899 service, nodeIdentifier, subscriber, options, subscriptionIdentifier
900 )
901
902 def _create_node(self, service_s, nodeIdentifier, options, profile_key):
903 client = self.host.get_client(profile_key)
904 return self.createNode(
905 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
906 )
907
908 def createNode(
909 self,
910 client: SatXMPPClient,
911 service: jid.JID,
912 nodeIdentifier: Optional[str] = None,
913 options: Optional[Dict[str, str]] = None
914 ) -> str:
915 """Create a new node
916
917 @param service: PubSub service,
918 @param NodeIdentifier: node name use None to create instant node (identifier will
919 be returned by this method)
920 @param option: node configuration options
921 @return: identifier of the created node (may be different from requested name)
922 """
923 # TODO: if pubsub service doesn't hande publish-options, configure it in a second time
924 return client.pubsub_client.createNode(service, nodeIdentifier, options)
925
926 @defer.inlineCallbacks
927 def create_if_new_node(self, client, service, nodeIdentifier, options=None):
928 """Helper method similar to createNode, but will not fail in case of conflict"""
929 try:
930 yield self.createNode(client, service, nodeIdentifier, options)
931 except error.StanzaError as e:
932 if e.condition == "conflict":
933 pass
934 else:
935 raise e
936
937 def _get_node_configuration(self, service_s, nodeIdentifier, profile_key):
938 client = self.host.get_client(profile_key)
939 d = self.getConfiguration(
940 client, jid.JID(service_s) if service_s else None, nodeIdentifier
941 )
942
943 def serialize(form):
944 # FIXME: better more generic dataform serialisation should be available in SàT
945 return {f.var: str(f.value) for f in list(form.fields.values())}
946
947 d.addCallback(serialize)
948 return d
949
950 def getConfiguration(self, client, service, nodeIdentifier):
951 request = pubsub.PubSubRequest("configureGet")
952 request.recipient = service
953 request.nodeIdentifier = nodeIdentifier
954
955 def cb(iq):
956 form = data_form.findForm(iq.pubsub.configure, pubsub.NS_PUBSUB_NODE_CONFIG)
957 form.typeCheck()
958 return form
959
960 d = request.send(client.xmlstream)
961 d.addCallback(cb)
962 return d
963
964 def make_configuration_form(self, options: dict) -> data_form.Form:
965 """Build a configuration form"""
966 form = data_form.Form(
967 formType="submit", formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG
968 )
969 form.makeFields(options)
970 return form
971
972 def _set_node_configuration(self, service_s, nodeIdentifier, options, profile_key):
973 client = self.host.get_client(profile_key)
974 d = self.setConfiguration(
975 client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
976 )
977 return d
978
979 def setConfiguration(self, client, service, nodeIdentifier, options):
980 request = pubsub.PubSubRequest("configureSet")
981 request.recipient = service
982 request.nodeIdentifier = nodeIdentifier
983
984 form = self.make_configuration_form(options)
985 request.options = form
986
987 d = request.send(client.xmlstream)
988 return d
989
990 def _get_affiliations(self, service_s, nodeIdentifier, profile_key):
991 client = self.host.get_client(profile_key)
992 d = self.get_affiliations(
993 client, jid.JID(service_s) if service_s else None, nodeIdentifier or None
994 )
995 return d
996
997 def get_affiliations(self, client, service, nodeIdentifier=None):
998 """Retrieve affiliations of an entity
999
1000 @param nodeIdentifier(unicode, None): node to get affiliation from
1001 None to get all nodes affiliations for this service
1002 """
1003 request = pubsub.PubSubRequest("affiliations")
1004 request.recipient = service
1005 request.nodeIdentifier = nodeIdentifier
1006
1007 def cb(iq_elt):
1008 try:
1009 affiliations_elt = next(
1010 iq_elt.pubsub.elements(pubsub.NS_PUBSUB, "affiliations")
1011 )
1012 except StopIteration:
1013 raise ValueError(
1014 _("Invalid result: missing <affiliations> element: {}").format(
1015 iq_elt.toXml
1016 )
1017 )
1018 try:
1019 return {
1020 e["node"]: e["affiliation"]
1021 for e in affiliations_elt.elements(pubsub.NS_PUBSUB, "affiliation")
1022 }
1023 except KeyError:
1024 raise ValueError(
1025 _("Invalid result: bad <affiliation> element: {}").format(
1026 iq_elt.toXml
1027 )
1028 )
1029
1030 d = request.send(client.xmlstream)
1031 d.addCallback(cb)
1032 return d
1033
1034 def _get_node_affiliations(self, service_s, nodeIdentifier, profile_key):
1035 client = self.host.get_client(profile_key)
1036 d = self.get_node_affiliations(
1037 client, jid.JID(service_s) if service_s else None, nodeIdentifier
1038 )
1039 d.addCallback(
1040 lambda affiliations: {j.full(): a for j, a in affiliations.items()}
1041 )
1042 return d
1043
1044 def get_node_affiliations(self, client, service, nodeIdentifier):
1045 """Retrieve affiliations of a node owned by profile"""
1046 request = pubsub.PubSubRequest("affiliationsGet")
1047 request.recipient = service
1048 request.nodeIdentifier = nodeIdentifier
1049
1050 def cb(iq_elt):
1051 try:
1052 affiliations_elt = next(
1053 iq_elt.pubsub.elements(pubsub.NS_PUBSUB_OWNER, "affiliations")
1054 )
1055 except StopIteration:
1056 raise ValueError(
1057 _("Invalid result: missing <affiliations> element: {}").format(
1058 iq_elt.toXml
1059 )
1060 )
1061 try:
1062 return {
1063 jid.JID(e["jid"]): e["affiliation"]
1064 for e in affiliations_elt.elements(
1065 (pubsub.NS_PUBSUB_OWNER, "affiliation")
1066 )
1067 }
1068 except KeyError:
1069 raise ValueError(
1070 _("Invalid result: bad <affiliation> element: {}").format(
1071 iq_elt.toXml
1072 )
1073 )
1074
1075 d = request.send(client.xmlstream)
1076 d.addCallback(cb)
1077 return d
1078
1079 def _set_node_affiliations(
1080 self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE
1081 ):
1082 client = self.host.get_client(profile_key)
1083 affiliations = {
1084 jid.JID(jid_): affiliation for jid_, affiliation in affiliations.items()
1085 }
1086 d = self.set_node_affiliations(
1087 client,
1088 jid.JID(service_s) if service_s else None,
1089 nodeIdentifier,
1090 affiliations,
1091 )
1092 return d
1093
1094 def set_node_affiliations(self, client, service, nodeIdentifier, affiliations):
1095 """Update affiliations of a node owned by profile
1096
1097 @param affiliations(dict[jid.JID, unicode]): affiliations to set
1098 check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations
1099 """
1100 request = pubsub.PubSubRequest("affiliationsSet")
1101 request.recipient = service
1102 request.nodeIdentifier = nodeIdentifier
1103 request.affiliations = affiliations
1104 d = request.send(client.xmlstream)
1105 return d
1106
1107 def _purge_node(self, service_s, nodeIdentifier, profile_key):
1108 client = self.host.get_client(profile_key)
1109 return self.purge_node(
1110 client, jid.JID(service_s) if service_s else None, nodeIdentifier
1111 )
1112
1113 def purge_node(self, client, service, nodeIdentifier):
1114 return client.pubsub_client.purge_node(service, nodeIdentifier)
1115
1116 def _delete_node(self, service_s, nodeIdentifier, profile_key):
1117 client = self.host.get_client(profile_key)
1118 return self.deleteNode(
1119 client, jid.JID(service_s) if service_s else None, nodeIdentifier
1120 )
1121
1122 def deleteNode(
1123 self,
1124 client: SatXMPPClient,
1125 service: jid.JID,
1126 nodeIdentifier: str
1127 ) -> defer.Deferred:
1128 return client.pubsub_client.deleteNode(service, nodeIdentifier)
1129
1130 def _addWatch(self, service_s, node, profile_key):
1131 """watch modifications on a node
1132
1133 This method should only be called from bridge
1134 """
1135 client = self.host.get_client(profile_key)
1136 service = jid.JID(service_s) if service_s else client.jid.userhostJID()
1137 client.pubsub_watching.add((service, node))
1138
1139 def _remove_watch(self, service_s, node, profile_key):
1140 """remove a node watch
1141
1142 This method should only be called from bridge
1143 """
1144 client = self.host.get_client(profile_key)
1145 service = jid.JID(service_s) if service_s else client.jid.userhostJID()
1146 client.pubsub_watching.remove((service, node))
1147
1148 def _retract_item(
1149 self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key
1150 ):
1151 return self._retract_items(
1152 service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key
1153 )
1154
1155 def _retract_items(
1156 self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key
1157 ):
1158 client = self.host.get_client(profile_key)
1159 return self.retract_items(
1160 client,
1161 jid.JID(service_s) if service_s else None,
1162 nodeIdentifier,
1163 itemIdentifiers,
1164 notify,
1165 )
1166
1167 def retract_items(
1168 self,
1169 client: SatXMPPClient,
1170 service: jid.JID,
1171 nodeIdentifier: str,
1172 itemIdentifiers: Iterable[str],
1173 notify: bool = True,
1174 ) -> defer.Deferred:
1175 return client.pubsub_client.retractItems(
1176 service, nodeIdentifier, itemIdentifiers, notify=notify
1177 )
1178
1179 def _rename_item(
1180 self,
1181 service,
1182 node,
1183 item_id,
1184 new_id,
1185 profile_key=C.PROF_KEY_NONE,
1186 ):
1187 client = self.host.get_client(profile_key)
1188 service = jid.JID(service) if service else None
1189 return defer.ensureDeferred(self.rename_item(
1190 client, service, node, item_id, new_id
1191 ))
1192
1193 async def rename_item(
1194 self,
1195 client: SatXMPPEntity,
1196 service: Optional[jid.JID],
1197 node: str,
1198 item_id: str,
1199 new_id: str
1200 ) -> None:
1201 """Rename an item by recreating it then deleting it
1202
1203 we have to recreate then delete because there is currently no rename operation
1204 with PubSub
1205 """
1206 if not item_id or not new_id:
1207 raise ValueError("item_id and new_id must not be empty")
1208 # retract must be done last, so if something goes wrong, the exception will stop
1209 # the workflow and no accidental delete should happen
1210 item_elt = (await self.get_items(client, service, node, item_ids=[item_id]))[0][0]
1211 await self.send_item(client, service, node, item_elt.firstChildElement(), new_id)
1212 await self.retract_items(client, service, node, [item_id])
1213
1214 def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
1215 client = self.host.get_client(profile_key)
1216 service = None if not service else jid.JID(service)
1217 d = defer.ensureDeferred(
1218 self.subscribe(
1219 client,
1220 service,
1221 nodeIdentifier,
1222 options=data_format.deserialise(options)
1223 )
1224 )
1225 d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "")
1226 return d
1227
1228 async def subscribe(
1229 self,
1230 client: SatXMPPEntity,
1231 service: Optional[jid.JID],
1232 nodeIdentifier: str,
1233 sub_jid: Optional[jid.JID] = None,
1234 options: Optional[dict] = None
1235 ) -> pubsub.Subscription:
1236 # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe
1237 if service is None:
1238 service = client.jid.userhostJID()
1239 cont, trigger_sub = await self.host.trigger.async_return_point(
1240 "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options,
1241 )
1242 if not cont:
1243 return trigger_sub
1244 try:
1245 subscription = await client.pubsub_client.subscribe(
1246 service, nodeIdentifier, sub_jid or client.jid.userhostJID(),
1247 options=options, sender=client.jid.userhostJID()
1248 )
1249 except error.StanzaError as e:
1250 if e.condition == 'item-not-found':
1251 raise exceptions.NotFound(e.text or e.condition)
1252 else:
1253 raise e
1254 return subscription
1255
1256 def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
1257 client = self.host.get_client(profile_key)
1258 service = None if not service else jid.JID(service)
1259 return defer.ensureDeferred(self.unsubscribe(client, service, nodeIdentifier))
1260
1261 async def unsubscribe(
1262 self,
1263 client: SatXMPPEntity,
1264 service: jid.JID,
1265 nodeIdentifier: str,
1266 sub_jid: Optional[jid.JID] = None,
1267 subscriptionIdentifier: Optional[str] = None,
1268 sender: Optional[jid.JID] = None,
1269 ) -> None:
1270 if not await self.host.trigger.async_point(
1271 "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid,
1272 subscriptionIdentifier, sender
1273 ):
1274 return
1275 try:
1276 await client.pubsub_client.unsubscribe(
1277 service,
1278 nodeIdentifier,
1279 sub_jid or client.jid.userhostJID(),
1280 subscriptionIdentifier,
1281 sender,
1282 )
1283 except error.StanzaError as e:
1284 try:
1285 next(e.getElement().elements(pubsub.NS_PUBSUB_ERRORS, "not-subscribed"))
1286 except StopIteration:
1287 raise e
1288 else:
1289 log.info(
1290 f"{sender.full() if sender else client.jid.full()} was not "
1291 f"subscribed to node {nodeIdentifier!s} at {service.full()}"
1292 )
1293
1294 @utils.ensure_deferred
1295 async def _subscriptions(
1296 self,
1297 service="",
1298 nodeIdentifier="",
1299 profile_key=C.PROF_KEY_NONE
1300 ) -> str:
1301 client = self.host.get_client(profile_key)
1302 service = None if not service else jid.JID(service)
1303 subs = await self.subscriptions(client, service, nodeIdentifier or None)
1304 return data_format.serialise(subs)
1305
1306 async def subscriptions(
1307 self,
1308 client: SatXMPPEntity,
1309 service: Optional[jid.JID] = None,
1310 node: Optional[str] = None
1311 ) -> List[Dict[str, Union[str, bool]]]:
1312 """Retrieve subscriptions from a service
1313
1314 @param service(jid.JID): PubSub service
1315 @param nodeIdentifier(unicode, None): node to check
1316 None to get all subscriptions
1317 """
1318 cont, ret = await self.host.trigger.async_return_point(
1319 "XEP-0060_subscriptions", client, service, node
1320 )
1321 if not cont:
1322 return ret
1323 subs = await client.pubsub_client.subscriptions(service, node)
1324 ret = []
1325 for sub in subs:
1326 sub_dict = {
1327 "service": service.host if service else client.jid.host,
1328 "node": sub.nodeIdentifier,
1329 "subscriber": sub.subscriber.full(),
1330 "state": sub.state,
1331 }
1332 if sub.subscriptionIdentifier is not None:
1333 sub_dict["id"] = sub.subscriptionIdentifier
1334 ret.append(sub_dict)
1335 return ret
1336
1337 ## misc tools ##
1338
1339 def get_node_uri(self, service, node, item=None):
1340 """Return XMPP URI of a PubSub node
1341
1342 @param service(jid.JID): PubSub service
1343 @param node(unicode): node
1344 @return (unicode): URI of the node
1345 """
1346 # FIXME: deprecated, use sat.tools.common.uri instead
1347 assert service is not None
1348 # XXX: urllib.urlencode use "&" to separate value, while XMPP URL (cf. RFC 5122)
1349 # use ";" as a separator. So if more than one value is used in query_data,
1350 # urlencode MUST NOT BE USED.
1351 query_data = [("node", node.encode("utf-8"))]
1352 if item is not None:
1353 query_data.append(("item", item.encode("utf-8")))
1354 return "xmpp:{service}?;{query}".format(
1355 service=service.userhost(), query=urllib.parse.urlencode(query_data)
1356 )
1357
1358 ## methods to manage several stanzas/jids at once ##
1359
1360 # generic #
1361
1362 def get_rt_results(
1363 self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE
1364 ):
1365 return self.rt_sessions.get_results(session_id, on_success, on_error, profile)
1366
1367 def trans_items_data(self, items_data, item_cb=lambda item: item.toXml()):
1368 """Helper method to transform result from [get_items]
1369
1370 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode])
1371 as returned by [get_items].
1372 @param items_data(tuple): tuple returned by [get_items]
1373 @param item_cb(callable): method to transform each item
1374 @return (tuple): a serialised form ready to go throught bridge
1375 """
1376 items, metadata = items_data
1377 items = [item_cb(item) for item in items]
1378
1379 return (items, metadata)
1380
1381 def trans_items_data_d(self, items_data, item_cb):
1382 """Helper method to transform result from [get_items], deferred version
1383
1384 the items_data must be a tuple(list[domish.Element], dict[unicode, unicode])
1385 as returned by [get_items]. metadata values are then casted to unicode and
1386 each item is passed to items_cb.
1387 An errback is added to item_cb, and when it is fired the value is filtered from
1388 final items
1389 @param items_data(tuple): tuple returned by [get_items]
1390 @param item_cb(callable): method to transform each item (must return a deferred)
1391 @return (tuple): a deferred which fire a dict which can be serialised to go
1392 throught bridge
1393 """
1394 items, metadata = items_data
1395
1396 def eb(failure_):
1397 log.warning(f"Error while parsing item: {failure_.value}")
1398
1399 d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items])
1400 d.addCallback(lambda parsed_items: (
1401 [i for i in parsed_items if i is not None],
1402 metadata
1403 ))
1404 return d
1405
1406 def ser_d_list(self, results, failure_result=None):
1407 """Serialise a DeferredList result
1408
1409 @param results: DeferredList results
1410 @param failure_result: value to use as value for failed Deferred
1411 (default: empty tuple)
1412 @return (list): list with:
1413 - failure: empty in case of success, else error message
1414 - result
1415 """
1416 if failure_result is None:
1417 failure_result = ()
1418 return [
1419 ("", result)
1420 if success
1421 else (str(result.result) or UNSPECIFIED, failure_result)
1422 for success, result in results
1423 ]
1424
1425 # subscribe #
1426
1427 @utils.ensure_deferred
1428 async def _get_node_subscriptions(
1429 self,
1430 service: str,
1431 node: str,
1432 profile_key: str
1433 ) -> Dict[str, str]:
1434 client = self.host.get_client(profile_key)
1435 subs = await self.get_node_subscriptions(
1436 client, jid.JID(service) if service else None, node
1437 )
1438 return {j.full(): a for j, a in subs.items()}
1439
1440 async def get_node_subscriptions(
1441 self,
1442 client: SatXMPPEntity,
1443 service: Optional[jid.JID],
1444 nodeIdentifier: str
1445 ) -> Dict[jid.JID, str]:
1446 """Retrieve subscriptions to a node
1447
1448 @param nodeIdentifier(unicode): node to get subscriptions from
1449 """
1450 if not nodeIdentifier:
1451 raise exceptions.DataError("node identifier can't be empty")
1452 request = pubsub.PubSubRequest("subscriptionsGet")
1453 request.recipient = service
1454 request.nodeIdentifier = nodeIdentifier
1455
1456 iq_elt = await request.send(client.xmlstream)
1457 try:
1458 subscriptions_elt = next(
1459 iq_elt.pubsub.elements(pubsub.NS_PUBSUB_OWNER, "subscriptions")
1460 )
1461 except StopIteration:
1462 raise ValueError(
1463 _("Invalid result: missing <subscriptions> element: {}").format(
1464 iq_elt.toXml
1465 )
1466 )
1467 except AttributeError as e:
1468 raise ValueError(_("Invalid result: {}").format(e))
1469 try:
1470 return {
1471 jid.JID(s["jid"]): s["subscription"]
1472 for s in subscriptions_elt.elements(
1473 (pubsub.NS_PUBSUB, "subscription")
1474 )
1475 }
1476 except KeyError:
1477 raise ValueError(
1478 _("Invalid result: bad <subscription> element: {}").format(
1479 iq_elt.toXml
1480 )
1481 )
1482
1483 def _set_node_subscriptions(
1484 self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE
1485 ):
1486 client = self.host.get_client(profile_key)
1487 subscriptions = {
1488 jid.JID(jid_): subscription
1489 for jid_, subscription in subscriptions.items()
1490 }
1491 d = self.set_node_subscriptions(
1492 client,
1493 jid.JID(service_s) if service_s else None,
1494 nodeIdentifier,
1495 subscriptions,
1496 )
1497 return d
1498
1499 def set_node_subscriptions(self, client, service, nodeIdentifier, subscriptions):
1500 """Set or update subscriptions of a node owned by profile
1501
1502 @param subscriptions(dict[jid.JID, unicode]): subscriptions to set
1503 check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions
1504 """
1505 request = pubsub.PubSubRequest("subscriptionsSet")
1506 request.recipient = service
1507 request.nodeIdentifier = nodeIdentifier
1508 request.subscriptions = {
1509 pubsub.Subscription(nodeIdentifier, jid_, state)
1510 for jid_, state in subscriptions.items()
1511 }
1512 d = request.send(client.xmlstream)
1513 return d
1514
1515 def _many_subscribe_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
1516 """Get real-time results for subcribeToManu session
1517
1518 @param session_id: id of the real-time deferred session
1519 @param return (tuple): (remaining, results) where:
1520 - remaining is the number of still expected results
1521 - results is a list of tuple(unicode, unicode, bool, unicode) with:
1522 - service: pubsub service
1523 - and node: pubsub node
1524 - failure(unicode): empty string in case of success, error message else
1525 @param profile_key: %(doc_profile_key)s
1526 """
1527 profile = self.host.get_client(profile_key).profile
1528 d = self.rt_sessions.get_results(
1529 session_id,
1530 on_success=lambda result: "",
1531 on_error=lambda failure: str(failure.value),
1532 profile=profile,
1533 )
1534 # we need to convert jid.JID to unicode with full() to serialise it for the bridge
1535 d.addCallback(
1536 lambda ret: (
1537 ret[0],
1538 [
1539 (service.full(), node, "" if success else failure or UNSPECIFIED)
1540 for (service, node), (success, failure) in ret[1].items()
1541 ],
1542 )
1543 )
1544 return d
1545
1546 def _subscribe_to_many(
1547 self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE
1548 ):
1549 return self.subscribe_to_many(
1550 [(jid.JID(service), str(node)) for service, node in node_data],
1551 jid.JID(subscriber),
1552 options,
1553 profile_key,
1554 )
1555
1556 def subscribe_to_many(
1557 self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE
1558 ):
1559 """Subscribe to several nodes at once.
1560
1561 @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
1562 - service (jid.JID) is the pubsub service
1563 - node (unicode) is the node to subscribe to
1564 @param subscriber (jid.JID): optional subscription identifier.
1565 @param options (dict): subscription options
1566 @param profile_key (str): %(doc_profile_key)s
1567 @return (str): RT Deferred session id
1568 """
1569 client = self.host.get_client(profile_key)
1570 deferreds = {}
1571 for service, node in node_data:
1572 deferreds[(service, node)] = defer.ensureDeferred(
1573 client.pubsub_client.subscribe(
1574 service, node, subscriber, options=options
1575 )
1576 )
1577 return self.rt_sessions.new_session(deferreds, client.profile)
1578 # found_nodes = yield self.listNodes(service, profile=client.profile)
1579 # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
1580 # d_list = []
1581 # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
1582 # if nodeIdentifier not in found_nodes:
1583 # log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
1584 # continue # avoid sat-pubsub "SubscriptionExists" error
1585 # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options))
1586 # defer.returnValue(d_list)
1587
1588 # get #
1589
1590 def _get_from_many_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
1591 """Get real-time results for get_from_many session
1592
1593 @param session_id: id of the real-time deferred session
1594 @param profile_key: %(doc_profile_key)s
1595 @param return (tuple): (remaining, results) where:
1596 - remaining is the number of still expected results
1597 - results is a list of tuple with
1598 - service (unicode): pubsub service
1599 - node (unicode): pubsub node
1600 - failure (unicode): empty string in case of success, error message else
1601 - items (list[s]): raw XML of items
1602 - metadata(dict): serialised metadata
1603 """
1604 profile = self.host.get_client(profile_key).profile
1605 d = self.rt_sessions.get_results(
1606 session_id,
1607 on_success=lambda result: ("", self.trans_items_data(result)),
1608 on_error=lambda failure: (str(failure.value) or UNSPECIFIED, ([], {})),
1609 profile=profile,
1610 )
1611 d.addCallback(
1612 lambda ret: (
1613 ret[0],
1614 [
1615 (service.full(), node, failure, items, metadata)
1616 for (service, node), (success, (failure, (items, metadata))) in ret[
1617 1
1618 ].items()
1619 ],
1620 )
1621 )
1622 return d
1623
1624 def _get_from_many(
1625 self, node_data, max_item=10, extra="", profile_key=C.PROF_KEY_NONE
1626 ):
1627 """
1628 @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit
1629 """
1630 max_item = None if max_item == C.NO_LIMIT else max_item
1631 extra = self.parse_extra(data_format.deserialise(extra))
1632 return self.get_from_many(
1633 [(jid.JID(service), str(node)) for service, node in node_data],
1634 max_item,
1635 extra.rsm_request,
1636 extra.extra,
1637 profile_key,
1638 )
1639
1640 def get_from_many(self, node_data, max_item=None, rsm_request=None, extra=None,
1641 profile_key=C.PROF_KEY_NONE):
1642 """Get items from many nodes at once
1643
1644 @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
1645 - service (jid.JID) is the pubsub service
1646 - node (unicode) is the node to get items from
1647 @param max_items (int): optional limit on the number of retrieved items.
1648 @param rsm_request (RSMRequest): RSM request data
1649 @param profile_key (unicode): %(doc_profile_key)s
1650 @return (str): RT Deferred session id
1651 """
1652 client = self.host.get_client(profile_key)
1653 deferreds = {}
1654 for service, node in node_data:
1655 deferreds[(service, node)] = defer.ensureDeferred(self.get_items(
1656 client, service, node, max_item, rsm_request=rsm_request, extra=extra
1657 ))
1658 return self.rt_sessions.new_session(deferreds, client.profile)
1659
1660
1661 @implementer(disco.IDisco)
1662 class SatPubSubClient(rsm.PubSubClient):
1663
1664 def __init__(self, host, parent_plugin):
1665 self.host = host
1666 self.parent_plugin = parent_plugin
1667 rsm.PubSubClient.__init__(self)
1668
1669 def connectionInitialized(self):
1670 rsm.PubSubClient.connectionInitialized(self)
1671
1672 async def items(
1673 self,
1674 service: Optional[jid.JID],
1675 nodeIdentifier: str,
1676 maxItems: Optional[int] = None,
1677 subscriptionIdentifier: Optional[str] = None,
1678 sender: Optional[jid.JID] = None,
1679 itemIdentifiers: Optional[Set[str]] = None,
1680 orderBy: Optional[List[str]] = None,
1681 rsm_request: Optional[rsm.RSMRequest] = None,
1682 extra: Optional[Dict[str, Any]] = None,
1683 ):
1684 if extra is None:
1685 extra = {}
1686 items, rsm_response = await super().items(
1687 service, nodeIdentifier, maxItems, subscriptionIdentifier, sender,
1688 itemIdentifiers, orderBy, rsm_request
1689 )
1690 # items must be returned, thus this async point can't stop the workflow (but it
1691 # can modify returned items)
1692 await self.host.trigger.async_point(
1693 "XEP-0060_items", self.parent, service, nodeIdentifier, items, rsm_response,
1694 extra
1695 )
1696 return items, rsm_response
1697
1698 def _get_node_callbacks(self, node, event):
1699 """Generate callbacks from given node and event
1700
1701 @param node(unicode): node used for the item
1702 any registered node which prefix the node will match
1703 @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE
1704 @return (iterator[callable]): callbacks for this node/event
1705 """
1706 for registered_node, callbacks_dict in self.parent_plugin._node_cb.items():
1707 if not node.startswith(registered_node):
1708 continue
1709 try:
1710 for callback_data in callbacks_dict[event]:
1711 yield callback_data[0]
1712 except KeyError:
1713 continue
1714
1715 async def _call_node_callbacks(self, client, event: pubsub.ItemsEvent) -> None:
1716 """Call sequencially event callbacks of a node
1717
1718 Callbacks are called sequencially and not in parallel to be sure to respect
1719 priority (notably for plugin needing to get old items before they are modified or
1720 deleted from cache).
1721 """
1722 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_ITEMS):
1723 try:
1724 await utils.as_deferred(callback, client, event)
1725 except Exception as e:
1726 log.error(
1727 f"Error while running items event callback {callback}: {e}"
1728 )
1729
1730 def itemsReceived(self, event):
1731 log.debug("Pubsub items received")
1732 client = self.parent
1733 defer.ensureDeferred(self._call_node_callbacks(client, event))
1734 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1735 raw_items = [i.toXml() for i in event.items]
1736 self.host.bridge.ps_event_raw(
1737 event.sender.full(),
1738 event.nodeIdentifier,
1739 C.PS_ITEMS,
1740 raw_items,
1741 client.profile,
1742 )
1743
1744 def deleteReceived(self, event):
1745 log.debug(("Publish node deleted"))
1746 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_DELETE):
1747 d = utils.as_deferred(callback, self.parent, event)
1748 d.addErrback(lambda f: log.error(
1749 f"Error while running delete event callback {callback}: {f}"
1750 ))
1751 client = self.parent
1752 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1753 self.host.bridge.ps_event_raw(
1754 event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile
1755 )
1756
1757 def purgeReceived(self, event):
1758 log.debug(("Publish node purged"))
1759 for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_PURGE):
1760 d = utils.as_deferred(callback, self.parent, event)
1761 d.addErrback(lambda f: log.error(
1762 f"Error while running purge event callback {callback}: {f}"
1763 ))
1764 client = self.parent
1765 if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
1766 self.host.bridge.ps_event_raw(
1767 event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile
1768 )
1769
1770 def subscriptions(self, service, nodeIdentifier, sender=None):
1771 """Return the list of subscriptions to the given service and node.
1772
1773 @param service: The publish subscribe service to retrieve the subscriptions from.
1774 @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
1775 @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions).
1776 @type nodeIdentifier: C{unicode}
1777 @return (list[pubsub.Subscription]): list of subscriptions
1778 """
1779 request = pubsub.PubSubRequest("subscriptions")
1780 request.recipient = service
1781 request.nodeIdentifier = nodeIdentifier
1782 request.sender = sender
1783 d = request.send(self.xmlstream)
1784
1785 def cb(iq):
1786 subs = []
1787 for subscription_elt in iq.pubsub.subscriptions.elements(
1788 pubsub.NS_PUBSUB, "subscription"
1789 ):
1790 subscription = pubsub.Subscription(
1791 subscription_elt["node"],
1792 jid.JID(subscription_elt["jid"]),
1793 subscription_elt["subscription"],
1794 subscriptionIdentifier=subscription_elt.getAttribute("subid"),
1795 )
1796 subs.append(subscription)
1797 return subs
1798
1799 return d.addCallback(cb)
1800
1801 def purge_node(self, service, nodeIdentifier):
1802 """Purge a node (i.e. delete all items from it)
1803
1804 @param service(jid.JID, None): service to send the item to
1805 None to use PEP
1806 @param NodeIdentifier(unicode): PubSub node to use
1807 """
1808 # TODO: propose this upstream and remove it once merged
1809 request = pubsub.PubSubRequest('purge')
1810 request.recipient = service
1811 request.nodeIdentifier = nodeIdentifier
1812 return request.send(self.xmlstream)
1813
1814 def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
1815 disco_info = []
1816 self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile)
1817 return disco_info
1818
1819 def getDiscoItems(self, requestor, service, nodeIdentifier=""):
1820 return []