Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0060.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0060.py@524856bd7b19 |
children | 087902fbb77a |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0060.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,1820 @@ +#!/usr/bin/env python3 + +# SàT plugin for Publish-Subscribe (xep-0060) +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +from collections import namedtuple +from functools import reduce +from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union +import urllib.error +import urllib.parse +import urllib.request + +from twisted.internet import defer, reactor +from twisted.words.protocols.jabber import error, jid +from twisted.words.xish import domish +from wokkel import disco +from wokkel import data_form +from wokkel import pubsub +from wokkel import rsm +from wokkel import mam +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.core.xmpp import SatXMPPClient +from libervia.backend.tools import utils +from libervia.backend.tools import sat_defer +from libervia.backend.tools import xml_tools +from libervia.backend.tools.common import data_format + + +log = getLogger(__name__) + +PLUGIN_INFO = { + C.PI_NAME: "Publish-Subscribe", + C.PI_IMPORT_NAME: "XEP-0060", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0060"], + C.PI_DEPENDENCIES: [], + C.PI_RECOMMENDATIONS: ["XEP-0059", "XEP-0313"], + C.PI_MAIN: "XEP_0060", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol"""), +} + +UNSPECIFIED = "unspecified error" + + +Extra = namedtuple("Extra", ("rsm_request", "extra")) +# rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None +# extra is a potentially empty dict +TIMEOUT = 30 +# minimum features that a pubsub service must have to be selectable as default +DEFAULT_PUBSUB_MIN_FEAT = { + 'http://jabber.org/protocol/pubsub#persistent-items', + 'http://jabber.org/protocol/pubsub#publish', + 'http://jabber.org/protocol/pubsub#retract-items', +} + +class XEP_0060(object): + OPT_ACCESS_MODEL = "pubsub#access_model" + OPT_PERSIST_ITEMS = "pubsub#persist_items" + OPT_MAX_ITEMS = "pubsub#max_items" + OPT_DELIVER_PAYLOADS = "pubsub#deliver_payloads" + OPT_SEND_ITEM_SUBSCRIBE = "pubsub#send_item_subscribe" + OPT_NODE_TYPE = "pubsub#node_type" + OPT_SUBSCRIPTION_TYPE = "pubsub#subscription_type" + OPT_SUBSCRIPTION_DEPTH = "pubsub#subscription_depth" + OPT_ROSTER_GROUPS_ALLOWED = "pubsub#roster_groups_allowed" + OPT_PUBLISH_MODEL = "pubsub#publish_model" + OPT_OVERWRITE_POLICY = "pubsub#overwrite_policy" + ACCESS_OPEN = "open" + ACCESS_PRESENCE = "presence" + ACCESS_ROSTER = "roster" + ACCESS_PUBLISHER_ROSTER = "publisher-roster" + ACCESS_AUTHORIZE = "authorize" + ACCESS_WHITELIST = "whitelist" + PUBLISH_MODEL_PUBLISHERS = "publishers" + PUBLISH_MODEL_SUBSCRIBERS = "subscribers" + PUBLISH_MODEL_OPEN = "open" + OWPOL_ORIGINAL = "original_publisher" + OWPOL_ANY_PUB = "any_publisher" + ID_SINGLETON = "current" + EXTRA_PUBLISH_OPTIONS = "publish_options" + EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met" + # extra disco needed for RSM, cf. XEP-0060 § 6.5.4 + DISCO_RSM = "http://jabber.org/protocol/pubsub#rsm" + + def __init__(self, host): + log.info(_("PubSub plugin initialization")) + self.host = host + self._rsm = host.plugins.get("XEP-0059") + self._mam = host.plugins.get("XEP-0313") + self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) + self.rt_sessions = sat_defer.RTDeferredSessions() + host.bridge.add_method( + "ps_node_create", + ".plugin", + in_sign="ssa{ss}s", + out_sign="s", + method=self._create_node, + async_=True, + ) + host.bridge.add_method( + "ps_node_configuration_get", + ".plugin", + in_sign="sss", + out_sign="a{ss}", + method=self._get_node_configuration, + async_=True, + ) + host.bridge.add_method( + "ps_node_configuration_set", + ".plugin", + in_sign="ssa{ss}s", + out_sign="", + method=self._set_node_configuration, + async_=True, + ) + host.bridge.add_method( + "ps_node_affiliations_get", + ".plugin", + in_sign="sss", + out_sign="a{ss}", + method=self._get_node_affiliations, + async_=True, + ) + host.bridge.add_method( + "ps_node_affiliations_set", + ".plugin", + in_sign="ssa{ss}s", + out_sign="", + method=self._set_node_affiliations, + async_=True, + ) + host.bridge.add_method( + "ps_node_subscriptions_get", + ".plugin", + in_sign="sss", + out_sign="a{ss}", + method=self._get_node_subscriptions, + async_=True, + ) + host.bridge.add_method( + "ps_node_subscriptions_set", + ".plugin", + in_sign="ssa{ss}s", + out_sign="", + method=self._set_node_subscriptions, + async_=True, + ) + host.bridge.add_method( + "ps_node_purge", + ".plugin", + in_sign="sss", + out_sign="", + method=self._purge_node, + async_=True, + ) + host.bridge.add_method( + "ps_node_delete", + ".plugin", + in_sign="sss", + out_sign="", + method=self._delete_node, + async_=True, + ) + host.bridge.add_method( + "ps_node_watch_add", + ".plugin", + in_sign="sss", + out_sign="", + method=self._addWatch, + async_=False, + ) + host.bridge.add_method( + "ps_node_watch_remove", + ".plugin", + in_sign="sss", + out_sign="", + method=self._remove_watch, + async_=False, + ) + host.bridge.add_method( + "ps_affiliations_get", + ".plugin", + in_sign="sss", + out_sign="a{ss}", + method=self._get_affiliations, + async_=True, + ) + host.bridge.add_method( + "ps_items_get", + ".plugin", + in_sign="ssiassss", + out_sign="s", + method=self._get_items, + async_=True, + ) + host.bridge.add_method( + "ps_item_send", + ".plugin", + in_sign="ssssss", + out_sign="s", + method=self._send_item, + async_=True, + ) + host.bridge.add_method( + "ps_items_send", + ".plugin", + in_sign="ssasss", + out_sign="as", + method=self._send_items, + async_=True, + ) + host.bridge.add_method( + "ps_item_retract", + ".plugin", + in_sign="sssbs", + out_sign="", + method=self._retract_item, + async_=True, + ) + host.bridge.add_method( + "ps_items_retract", + ".plugin", + in_sign="ssasbs", + out_sign="", + method=self._retract_items, + async_=True, + ) + host.bridge.add_method( + "ps_item_rename", + ".plugin", + in_sign="sssss", + out_sign="", + method=self._rename_item, + async_=True, + ) + host.bridge.add_method( + "ps_subscribe", + ".plugin", + in_sign="ssss", + out_sign="s", + method=self._subscribe, + async_=True, + ) + host.bridge.add_method( + "ps_unsubscribe", + ".plugin", + in_sign="sss", + out_sign="", + method=self._unsubscribe, + async_=True, + ) + host.bridge.add_method( + "ps_subscriptions_get", + ".plugin", + in_sign="sss", + out_sign="s", + method=self._subscriptions, + async_=True, + ) + host.bridge.add_method( + "ps_subscribe_to_many", + ".plugin", + in_sign="a(ss)sa{ss}s", + out_sign="s", + method=self._subscribe_to_many, + ) + host.bridge.add_method( + "ps_get_subscribe_rt_result", + ".plugin", + in_sign="ss", + out_sign="(ua(sss))", + method=self._many_subscribe_rt_result, + async_=True, + ) + host.bridge.add_method( + "ps_get_from_many", + ".plugin", + in_sign="a(ss)iss", + out_sign="s", + method=self._get_from_many, + ) + host.bridge.add_method( + "ps_get_from_many_rt_result", + ".plugin", + in_sign="ss", + out_sign="(ua(sssasa{ss}))", + method=self._get_from_many_rt_result, + async_=True, + ) + + # high level observer method + host.bridge.add_signal( + "ps_event", ".plugin", signature="ssssss" + ) # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile + + # low level observer method, used if service/node is in watching list (see psNodeWatch* methods) + host.bridge.add_signal( + "ps_event_raw", ".plugin", signature="sssass" + ) # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile + + def get_handler(self, client): + client.pubsub_client = SatPubSubClient(self.host, self) + return client.pubsub_client + + async def profile_connected(self, client): + client.pubsub_watching = set() + try: + client.pubsub_service = jid.JID( + self.host.memory.config_get("", "pubsub_service") + ) + except RuntimeError: + log.info( + _( + "Can't retrieve pubsub_service from conf, we'll use first one that " + "we find" + ) + ) + pubsub_services = await self.host.find_service_entities( + client, "pubsub", "service" + ) + for service_jid in pubsub_services: + infos = await self.host.memory.disco.get_infos(client, service_jid) + if not DEFAULT_PUBSUB_MIN_FEAT.issubset(infos.features): + continue + names = {(n or "").lower() for n in infos.identities.values()} + if "libervia pubsub service" in names: + # this is the name of Libervia's side project pubsub service, we know + # that it is a suitable default pubsub service + client.pubsub_service = service_jid + break + categories = {(i[0] or "").lower() for i in infos.identities.keys()} + if "gateway" in categories or "gateway" in names: + # we don't want to use a gateway as default pubsub service + continue + if "jabber:iq:register" in infos.features: + # may be present on gateways, and we don't want a service + # where registration is needed + continue + client.pubsub_service = service_jid + break + else: + client.pubsub_service = None + pubsub_service_str = ( + client.pubsub_service.full() if client.pubsub_service else "PEP" + ) + log.info(f"default pubsub service: {pubsub_service_str}") + + def features_get(self, profile): + try: + client = self.host.get_client(profile) + except exceptions.ProfileNotSetError: + return {} + try: + return { + "service": client.pubsub_service.full() + if client.pubsub_service is not None + else "" + } + except AttributeError: + if self.host.is_connected(profile): + log.debug("Profile is not connected, service is not checked yet") + else: + log.error("Service should be available !") + return {} + + def parse_extra(self, extra): + """Parse extra dictionnary + + used bridge's extra dictionnaries + @param extra(dict): extra data used to configure request + @return(Extra): filled Extra instance + """ + if extra is None: + rsm_request = None + extra = {} + else: + # order-by + if C.KEY_ORDER_BY in extra: + # FIXME: we temporarily manage only one level of ordering + # we need to switch to a fully serialised extra data + # to be able to encode a whole ordered list + extra[C.KEY_ORDER_BY] = [extra.pop(C.KEY_ORDER_BY)] + + # rsm + if self._rsm is None: + rsm_request = None + else: + rsm_request = self._rsm.parse_extra(extra) + + # mam + if self._mam is None: + mam_request = None + else: + mam_request = self._mam.parse_extra(extra, with_rsm=False) + + if mam_request is not None: + assert "mam" not in extra + extra["mam"] = mam_request + + return Extra(rsm_request, extra) + + def add_managed_node( + self, + node: str, + priority: int = 0, + **kwargs: Callable + ): + """Add a handler for a node + + @param node: node to monitor + all node *prefixed* with this one will be triggered + @param priority: priority of the callback. Callbacks with higher priority will be + called first. + @param **kwargs: method(s) to call when the node is found + the method must be named after PubSub constants in lower case + and suffixed with "_cb" + e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE + note: only C.PS_ITEMS and C.PS_DELETE are implemented so far + """ + assert node is not None + assert kwargs + callbacks = self._node_cb.setdefault(node, {}) + for event, cb in kwargs.items(): + event_name = event[:-3] + assert event_name in C.PS_EVENTS + cb_list = callbacks.setdefault(event_name, []) + cb_list.append((cb, priority)) + cb_list.sort(key=lambda c: c[1], reverse=True) + + def remove_managed_node(self, node, *args): + """Add a handler for a node + + @param node(unicode): node to monitor + @param *args: callback(s) to remove + """ + assert args + try: + registred_cb = self._node_cb[node] + except KeyError: + pass + else: + removed = False + for callback in args: + for event, cb_list in registred_cb.items(): + to_remove = [] + for cb in cb_list: + if cb[0] == callback: + to_remove.append(cb) + for cb in to_remove: + cb_list.remove(cb) + if not cb_list: + del registred_cb[event] + if not registred_cb: + del self._node_cb[node] + removed = True + break + + if not removed: + log.error( + f"Trying to remove inexistant callback {callback} for node {node}" + ) + + # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): + # """Retrieve the name of the nodes that are accessible on the target service. + + # @param service (JID): target service + # @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) + # @param profile (str): %(doc_profile)s + # @return: deferred which fire a list of nodes + # """ + # client = self.host.get_client(profile) + # d = self.host.getDiscoItems(client, service, nodeIdentifier) + # d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]) + # return d + + # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE): + # """Retrieve the name of the nodes to which the profile is subscribed on the target service. + + # @param service (JID): target service + # @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) + # @param filter_ (str): filter the result according to the given subscription type: + # - None: do not filter + # - 'pending': subscription has not been approved yet by the node owner + # - 'unconfigured': subscription options have not been configured yet + # - 'subscribed': subscription is complete + # @param profile (str): %(doc_profile)s + # @return: Deferred list[str] + # """ + # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) + # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) + # return d + + def _send_item(self, service, nodeIdentifier, payload, item_id=None, extra_ser="", + profile_key=C.PROF_KEY_NONE): + client = self.host.get_client(profile_key) + service = None if not service else jid.JID(service) + payload = xml_tools.parse(payload) + extra = data_format.deserialise(extra_ser) + d = defer.ensureDeferred(self.send_item( + client, service, nodeIdentifier, payload, item_id or None, extra + )) + d.addCallback(lambda ret: ret or "") + return d + + def _send_items(self, service, nodeIdentifier, items, extra_ser=None, + profile_key=C.PROF_KEY_NONE): + client = self.host.get_client(profile_key) + service = None if not service else jid.JID(service) + try: + items = [xml_tools.parse(item) for item in items] + except Exception as e: + raise exceptions.DataError(_("Can't parse items: {msg}").format( + msg=e)) + extra = data_format.deserialise(extra_ser) + return defer.ensureDeferred(self.send_items( + client, service, nodeIdentifier, items, extra=extra + )) + + async def send_item( + self, + client: SatXMPPClient, + service: Union[jid.JID, None], + nodeIdentifier: str, + payload: domish.Element, + item_id: Optional[str] = None, + extra: Optional[Dict[str, Any]] = None + ) -> Optional[str]: + """High level method to send one item + + @param service: service to send the item to None to use PEP + @param NodeIdentifier: PubSub node to use + @param payload: payload of the item to send + @param item_id: id to use or None to create one + @param extra: extra options + @return: id of the created item + """ + assert isinstance(payload, domish.Element) + item_elt = domish.Element((pubsub.NS_PUBSUB, 'item')) + if item_id is not None: + item_elt['id'] = item_id + item_elt.addChild(payload) + published_ids = await self.send_items( + client, + service, + nodeIdentifier, + [item_elt], + extra=extra + ) + try: + return published_ids[0] + except IndexError: + return item_id + + async def send_items( + self, + client: SatXMPPEntity, + service: Optional[jid.JID], + nodeIdentifier: str, + items: List[domish.Element], + sender: Optional[jid.JID] = None, + extra: Optional[Dict[str, Any]] = None + ) -> List[str]: + """High level method to send several items at once + + @param service: service to send the item to + None to use PEP + @param NodeIdentifier: PubSub node to use + @param items: whole item elements to send, + "id" will be used if set + @param extra: extra options. Key can be: + - self.EXTRA_PUBLISH_OPTIONS(dict): publish options, cf. XEP-0060 § 7.1.5 + the dict maps option name to value(s) + - self.EXTRA_ON_PRECOND_NOT_MET(str): policy to have when publishing is + failing du to failing precondition. Value can be: + * raise (default): raise the exception + * publish_without_options: re-publish without the publish-options. + A warning will be logged showing that the publish-options could not + be used + @return: ids of the created items + """ + if extra is None: + extra = {} + if service is None: + service = client.jid.userhostJID() + parsed_items = [] + for item in items: + if item.name != 'item': + raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml())) + item_id = item.getAttribute("id") + parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) + publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS) + try: + iq_result = await self.publish( + client, service, nodeIdentifier, parsed_items, options=publish_options, + sender=sender + ) + except error.StanzaError as e: + if ((e.condition == 'conflict' and e.appCondition + and e.appCondition.name == 'precondition-not-met' + and publish_options is not None)): + # this usually happens when publish-options can't be set + policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, 'raise') + if policy == 'raise': + raise e + elif policy == 'publish_without_options': + log.warning(_( + "Can't use publish-options ({options}) on node {node}, " + "re-publishing without them: {reason}").format( + options=', '.join(f'{k} = {v}' + for k,v in publish_options.items()), + node=nodeIdentifier, + reason=e, + ) + ) + iq_result = await self.publish( + client, service, nodeIdentifier, parsed_items) + else: + raise exceptions.InternalError( + f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: " + f"{policy}" + ) + else: + raise e + try: + return [ + item['id'] + for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item') + ] + except AttributeError: + return [] + + async def publish( + self, + client: SatXMPPEntity, + service: jid.JID, + nodeIdentifier: str, + items: Optional[List[domish.Element]] = None, + options: Optional[dict] = None, + sender: Optional[jid.JID] = None, + extra: Optional[Dict[str, Any]] = None + ) -> domish.Element: + """Publish pubsub items + + @param sender: sender of the request, + client.jid will be used if nto set + @param extra: extra data + not used directly by ``publish``, but may be used in triggers + @return: IQ result stanza + @trigger XEP-0060_publish: called just before publication. + if it returns False, extra must have a "iq_result_elt" key set with + domish.Element to return. + """ + if sender is None: + sender = client.jid + if extra is None: + extra = {} + if not await self.host.trigger.async_point( + "XEP-0060_publish", client, service, nodeIdentifier, items, options, sender, + extra + ): + return extra["iq_result_elt"] + iq_result_elt = await client.pubsub_client.publish( + service, nodeIdentifier, items, sender, + options=options + ) + return iq_result_elt + + def _unwrap_mam_message(self, message_elt): + try: + item_elt = reduce( + lambda elt, ns_name: next(elt.elements(*ns_name)), + (message_elt, + (mam.NS_MAM, "result"), + (C.NS_FORWARD, "forwarded"), + (C.NS_CLIENT, "message"), + ("http://jabber.org/protocol/pubsub#event", "event"), + ("http://jabber.org/protocol/pubsub#event", "items"), + ("http://jabber.org/protocol/pubsub#event", "item"), + )) + except StopIteration: + raise exceptions.DataError("Can't find Item in MAM message element") + return item_elt + + def serialise_items(self, items_data): + items, metadata = items_data + metadata['items'] = items + return data_format.serialise(metadata) + + def _get_items(self, service="", node="", max_items=10, item_ids=None, sub_id=None, + extra="", profile_key=C.PROF_KEY_NONE): + """Get items from pubsub node + + @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit + """ + client = self.host.get_client(profile_key) + service = jid.JID(service) if service else None + max_items = None if max_items == C.NO_LIMIT else max_items + extra = self.parse_extra(data_format.deserialise(extra)) + d = defer.ensureDeferred(self.get_items( + client, + service, + node, + max_items, + item_ids, + sub_id or None, + extra.rsm_request, + extra.extra, + )) + d.addCallback(self.trans_items_data) + d.addCallback(self.serialise_items) + return d + + async def get_items( + self, + client: SatXMPPEntity, + service: Optional[jid.JID], + node: str, + max_items: Optional[int] = None, + item_ids: Optional[List[str]] = None, + sub_id: Optional[str] = None, + rsm_request: Optional[rsm.RSMRequest] = None, + extra: Optional[dict] = None + ) -> Tuple[List[dict], dict]: + """Retrieve pubsub items from a node. + + @param service (JID, None): pubsub service. + @param node (str): node id. + @param max_items (int): optional limit on the number of retrieved items. + @param item_ids (list[str]): identifiers of the items to be retrieved (can't be + used with rsm_request). If requested items don't exist, they won't be + returned, meaning that we can have an empty list as result (NotFound + exception is NOT raised). + @param sub_id (str): optional subscription identifier. + @param rsm_request (rsm.RSMRequest): RSM request data + @return: a deferred couple (list[dict], dict) containing: + - list of items + - metadata with the following keys: + - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index + value of RSMResponse + - service, node: service and node used + """ + if item_ids and max_items is not None: + max_items = None + if rsm_request and item_ids: + raise ValueError("items_id can't be used with rsm") + if extra is None: + extra = {} + cont, ret = await self.host.trigger.async_return_point( + "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id, + rsm_request, extra + ) + if not cont: + return ret + try: + mam_query = extra["mam"] + except KeyError: + d = defer.ensureDeferred(client.pubsub_client.items( + service = service, + nodeIdentifier = node, + maxItems = max_items, + subscriptionIdentifier = sub_id, + sender = None, + itemIdentifiers = item_ids, + orderBy = extra.get(C.KEY_ORDER_BY), + rsm_request = rsm_request, + extra = extra + )) + # we have no MAM data here, so we add None + d.addErrback(sat_defer.stanza_2_not_found) + d.addTimeout(TIMEOUT, reactor) + items, rsm_response = await d + mam_response = None + else: + # if mam is requested, we have to do a totally different query + if self._mam is None: + raise exceptions.NotFound("MAM (XEP-0313) plugin is not available") + if max_items is not None: + raise exceptions.DataError("max_items parameter can't be used with MAM") + if item_ids: + raise exceptions.DataError("items_ids parameter can't be used with MAM") + if mam_query.node is None: + mam_query.node = node + elif mam_query.node != node: + raise exceptions.DataError( + "MAM query node is incoherent with get_items's node" + ) + if mam_query.rsm is None: + mam_query.rsm = rsm_request + else: + if mam_query.rsm != rsm_request: + raise exceptions.DataError( + "Conflict between RSM request and MAM's RSM request" + ) + items, rsm_response, mam_response = await self._mam.get_archives( + client, mam_query, service, self._unwrap_mam_message + ) + + try: + subscribe = C.bool(extra["subscribe"]) + except KeyError: + subscribe = False + + if subscribe: + try: + await self.subscribe(client, service, node) + except error.StanzaError as e: + log.warning( + f"Could not subscribe to node {node} on service {service}: {e}" + ) + + # TODO: handle mam_response + service_jid = service if service else client.jid.userhostJID() + metadata = { + "service": service_jid, + "node": node, + "uri": self.get_node_uri(service_jid, node), + } + if mam_response is not None: + # mam_response is a dict with "complete" and "stable" keys + # we can put them directly in metadata + metadata.update(mam_response) + if rsm_request is not None and rsm_response is not None: + metadata['rsm'] = rsm_response.toDict() + if mam_response is None: + index = rsm_response.index + count = rsm_response.count + if index is None or count is None: + # we don't have enough information to know if the data is complete + # or not + metadata["complete"] = None + else: + # normally we have a strict equality here but XEP-0059 states + # that index MAY be approximative, so just in case… + metadata["complete"] = index + len(items) >= count + # encrypted metadata can be added by plugins in XEP-0060_items trigger + if "encrypted" in extra: + metadata["encrypted"] = extra["encrypted"] + + return (items, metadata) + + # @defer.inlineCallbacks + # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): + # """Massively retrieve pubsub items from many nodes. + # @param service (JID): target service. + # @param data (dict): dictionnary binding some arbitrary keys to the node identifiers. + # @param max_items (int): optional limit on the number of retrieved items *per node*. + # @param sub_id (str): optional subscription identifier. + # @param rsm (dict): RSM request data + # @param profile_key (str): %(doc_profile_key)s + # @return: a deferred dict with: + # - key: a value in (a subset of) data.keys() + # - couple (list[dict], dict) containing: + # - list of items + # - RSM response data + # """ + # client = self.host.get_client(profile_key) + # found_nodes = yield self.listNodes(service, profile=client.profile) + # d_dict = {} + # for publisher, node in data.items(): + # if node not in found_nodes: + # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) + # continue # avoid pubsub "item-not-found" error + # d_dict[publisher] = self.get_items(service, node, max_items, None, sub_id, rsm, client.profile) + # defer.returnValue(d_dict) + + def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, + profile_key=C.PROF_KEY_NONE): + client = self.host.get_client(profile_key) + return client.pubsub_client.getOptions( + service, nodeIdentifier, subscriber, subscriptionIdentifier + ) + + def setOptions(self, service, nodeIdentifier, subscriber, options, + subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): + client = self.host.get_client(profile_key) + return client.pubsub_client.setOptions( + service, nodeIdentifier, subscriber, options, subscriptionIdentifier + ) + + def _create_node(self, service_s, nodeIdentifier, options, profile_key): + client = self.host.get_client(profile_key) + return self.createNode( + client, jid.JID(service_s) if service_s else None, nodeIdentifier, options + ) + + def createNode( + self, + client: SatXMPPClient, + service: jid.JID, + nodeIdentifier: Optional[str] = None, + options: Optional[Dict[str, str]] = None + ) -> str: + """Create a new node + + @param service: PubSub service, + @param NodeIdentifier: node name use None to create instant node (identifier will + be returned by this method) + @param option: node configuration options + @return: identifier of the created node (may be different from requested name) + """ + # TODO: if pubsub service doesn't hande publish-options, configure it in a second time + return client.pubsub_client.createNode(service, nodeIdentifier, options) + + @defer.inlineCallbacks + def create_if_new_node(self, client, service, nodeIdentifier, options=None): + """Helper method similar to createNode, but will not fail in case of conflict""" + try: + yield self.createNode(client, service, nodeIdentifier, options) + except error.StanzaError as e: + if e.condition == "conflict": + pass + else: + raise e + + def _get_node_configuration(self, service_s, nodeIdentifier, profile_key): + client = self.host.get_client(profile_key) + d = self.getConfiguration( + client, jid.JID(service_s) if service_s else None, nodeIdentifier + ) + + def serialize(form): + # FIXME: better more generic dataform serialisation should be available in SàT + return {f.var: str(f.value) for f in list(form.fields.values())} + + d.addCallback(serialize) + return d + + def getConfiguration(self, client, service, nodeIdentifier): + request = pubsub.PubSubRequest("configureGet") + request.recipient = service + request.nodeIdentifier = nodeIdentifier + + def cb(iq): + form = data_form.findForm(iq.pubsub.configure, pubsub.NS_PUBSUB_NODE_CONFIG) + form.typeCheck() + return form + + d = request.send(client.xmlstream) + d.addCallback(cb) + return d + + def make_configuration_form(self, options: dict) -> data_form.Form: + """Build a configuration form""" + form = data_form.Form( + formType="submit", formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG + ) + form.makeFields(options) + return form + + def _set_node_configuration(self, service_s, nodeIdentifier, options, profile_key): + client = self.host.get_client(profile_key) + d = self.setConfiguration( + client, jid.JID(service_s) if service_s else None, nodeIdentifier, options + ) + return d + + def setConfiguration(self, client, service, nodeIdentifier, options): + request = pubsub.PubSubRequest("configureSet") + request.recipient = service + request.nodeIdentifier = nodeIdentifier + + form = self.make_configuration_form(options) + request.options = form + + d = request.send(client.xmlstream) + return d + + def _get_affiliations(self, service_s, nodeIdentifier, profile_key): + client = self.host.get_client(profile_key) + d = self.get_affiliations( + client, jid.JID(service_s) if service_s else None, nodeIdentifier or None + ) + return d + + def get_affiliations(self, client, service, nodeIdentifier=None): + """Retrieve affiliations of an entity + + @param nodeIdentifier(unicode, None): node to get affiliation from + None to get all nodes affiliations for this service + """ + request = pubsub.PubSubRequest("affiliations") + request.recipient = service + request.nodeIdentifier = nodeIdentifier + + def cb(iq_elt): + try: + affiliations_elt = next( + iq_elt.pubsub.elements(pubsub.NS_PUBSUB, "affiliations") + ) + except StopIteration: + raise ValueError( + _("Invalid result: missing <affiliations> element: {}").format( + iq_elt.toXml + ) + ) + try: + return { + e["node"]: e["affiliation"] + for e in affiliations_elt.elements(pubsub.NS_PUBSUB, "affiliation") + } + except KeyError: + raise ValueError( + _("Invalid result: bad <affiliation> element: {}").format( + iq_elt.toXml + ) + ) + + d = request.send(client.xmlstream) + d.addCallback(cb) + return d + + def _get_node_affiliations(self, service_s, nodeIdentifier, profile_key): + client = self.host.get_client(profile_key) + d = self.get_node_affiliations( + client, jid.JID(service_s) if service_s else None, nodeIdentifier + ) + d.addCallback( + lambda affiliations: {j.full(): a for j, a in affiliations.items()} + ) + return d + + def get_node_affiliations(self, client, service, nodeIdentifier): + """Retrieve affiliations of a node owned by profile""" + request = pubsub.PubSubRequest("affiliationsGet") + request.recipient = service + request.nodeIdentifier = nodeIdentifier + + def cb(iq_elt): + try: + affiliations_elt = next( + iq_elt.pubsub.elements(pubsub.NS_PUBSUB_OWNER, "affiliations") + ) + except StopIteration: + raise ValueError( + _("Invalid result: missing <affiliations> element: {}").format( + iq_elt.toXml + ) + ) + try: + return { + jid.JID(e["jid"]): e["affiliation"] + for e in affiliations_elt.elements( + (pubsub.NS_PUBSUB_OWNER, "affiliation") + ) + } + except KeyError: + raise ValueError( + _("Invalid result: bad <affiliation> element: {}").format( + iq_elt.toXml + ) + ) + + d = request.send(client.xmlstream) + d.addCallback(cb) + return d + + def _set_node_affiliations( + self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE + ): + client = self.host.get_client(profile_key) + affiliations = { + jid.JID(jid_): affiliation for jid_, affiliation in affiliations.items() + } + d = self.set_node_affiliations( + client, + jid.JID(service_s) if service_s else None, + nodeIdentifier, + affiliations, + ) + return d + + def set_node_affiliations(self, client, service, nodeIdentifier, affiliations): + """Update affiliations of a node owned by profile + + @param affiliations(dict[jid.JID, unicode]): affiliations to set + check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations + """ + request = pubsub.PubSubRequest("affiliationsSet") + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.affiliations = affiliations + d = request.send(client.xmlstream) + return d + + def _purge_node(self, service_s, nodeIdentifier, profile_key): + client = self.host.get_client(profile_key) + return self.purge_node( + client, jid.JID(service_s) if service_s else None, nodeIdentifier + ) + + def purge_node(self, client, service, nodeIdentifier): + return client.pubsub_client.purge_node(service, nodeIdentifier) + + def _delete_node(self, service_s, nodeIdentifier, profile_key): + client = self.host.get_client(profile_key) + return self.deleteNode( + client, jid.JID(service_s) if service_s else None, nodeIdentifier + ) + + def deleteNode( + self, + client: SatXMPPClient, + service: jid.JID, + nodeIdentifier: str + ) -> defer.Deferred: + return client.pubsub_client.deleteNode(service, nodeIdentifier) + + def _addWatch(self, service_s, node, profile_key): + """watch modifications on a node + + This method should only be called from bridge + """ + client = self.host.get_client(profile_key) + service = jid.JID(service_s) if service_s else client.jid.userhostJID() + client.pubsub_watching.add((service, node)) + + def _remove_watch(self, service_s, node, profile_key): + """remove a node watch + + This method should only be called from bridge + """ + client = self.host.get_client(profile_key) + service = jid.JID(service_s) if service_s else client.jid.userhostJID() + client.pubsub_watching.remove((service, node)) + + def _retract_item( + self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key + ): + return self._retract_items( + service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key + ) + + def _retract_items( + self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key + ): + client = self.host.get_client(profile_key) + return self.retract_items( + client, + jid.JID(service_s) if service_s else None, + nodeIdentifier, + itemIdentifiers, + notify, + ) + + def retract_items( + self, + client: SatXMPPClient, + service: jid.JID, + nodeIdentifier: str, + itemIdentifiers: Iterable[str], + notify: bool = True, + ) -> defer.Deferred: + return client.pubsub_client.retractItems( + service, nodeIdentifier, itemIdentifiers, notify=notify + ) + + def _rename_item( + self, + service, + node, + item_id, + new_id, + profile_key=C.PROF_KEY_NONE, + ): + client = self.host.get_client(profile_key) + service = jid.JID(service) if service else None + return defer.ensureDeferred(self.rename_item( + client, service, node, item_id, new_id + )) + + async def rename_item( + self, + client: SatXMPPEntity, + service: Optional[jid.JID], + node: str, + item_id: str, + new_id: str + ) -> None: + """Rename an item by recreating it then deleting it + + we have to recreate then delete because there is currently no rename operation + with PubSub + """ + if not item_id or not new_id: + raise ValueError("item_id and new_id must not be empty") + # retract must be done last, so if something goes wrong, the exception will stop + # the workflow and no accidental delete should happen + item_elt = (await self.get_items(client, service, node, item_ids=[item_id]))[0][0] + await self.send_item(client, service, node, item_elt.firstChildElement(), new_id) + await self.retract_items(client, service, node, [item_id]) + + def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): + client = self.host.get_client(profile_key) + service = None if not service else jid.JID(service) + d = defer.ensureDeferred( + self.subscribe( + client, + service, + nodeIdentifier, + options=data_format.deserialise(options) + ) + ) + d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") + return d + + async def subscribe( + self, + client: SatXMPPEntity, + service: Optional[jid.JID], + nodeIdentifier: str, + sub_jid: Optional[jid.JID] = None, + options: Optional[dict] = None + ) -> pubsub.Subscription: + # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe + if service is None: + service = client.jid.userhostJID() + cont, trigger_sub = await self.host.trigger.async_return_point( + "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options, + ) + if not cont: + return trigger_sub + try: + subscription = await client.pubsub_client.subscribe( + service, nodeIdentifier, sub_jid or client.jid.userhostJID(), + options=options, sender=client.jid.userhostJID() + ) + except error.StanzaError as e: + if e.condition == 'item-not-found': + raise exceptions.NotFound(e.text or e.condition) + else: + raise e + return subscription + + def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): + client = self.host.get_client(profile_key) + service = None if not service else jid.JID(service) + return defer.ensureDeferred(self.unsubscribe(client, service, nodeIdentifier)) + + async def unsubscribe( + self, + client: SatXMPPEntity, + service: jid.JID, + nodeIdentifier: str, + sub_jid: Optional[jid.JID] = None, + subscriptionIdentifier: Optional[str] = None, + sender: Optional[jid.JID] = None, + ) -> None: + if not await self.host.trigger.async_point( + "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid, + subscriptionIdentifier, sender + ): + return + try: + await client.pubsub_client.unsubscribe( + service, + nodeIdentifier, + sub_jid or client.jid.userhostJID(), + subscriptionIdentifier, + sender, + ) + except error.StanzaError as e: + try: + next(e.getElement().elements(pubsub.NS_PUBSUB_ERRORS, "not-subscribed")) + except StopIteration: + raise e + else: + log.info( + f"{sender.full() if sender else client.jid.full()} was not " + f"subscribed to node {nodeIdentifier!s} at {service.full()}" + ) + + @utils.ensure_deferred + async def _subscriptions( + self, + service="", + nodeIdentifier="", + profile_key=C.PROF_KEY_NONE + ) -> str: + client = self.host.get_client(profile_key) + service = None if not service else jid.JID(service) + subs = await self.subscriptions(client, service, nodeIdentifier or None) + return data_format.serialise(subs) + + async def subscriptions( + self, + client: SatXMPPEntity, + service: Optional[jid.JID] = None, + node: Optional[str] = None + ) -> List[Dict[str, Union[str, bool]]]: + """Retrieve subscriptions from a service + + @param service(jid.JID): PubSub service + @param nodeIdentifier(unicode, None): node to check + None to get all subscriptions + """ + cont, ret = await self.host.trigger.async_return_point( + "XEP-0060_subscriptions", client, service, node + ) + if not cont: + return ret + subs = await client.pubsub_client.subscriptions(service, node) + ret = [] + for sub in subs: + sub_dict = { + "service": service.host if service else client.jid.host, + "node": sub.nodeIdentifier, + "subscriber": sub.subscriber.full(), + "state": sub.state, + } + if sub.subscriptionIdentifier is not None: + sub_dict["id"] = sub.subscriptionIdentifier + ret.append(sub_dict) + return ret + + ## misc tools ## + + def get_node_uri(self, service, node, item=None): + """Return XMPP URI of a PubSub node + + @param service(jid.JID): PubSub service + @param node(unicode): node + @return (unicode): URI of the node + """ + # FIXME: deprecated, use sat.tools.common.uri instead + assert service is not None + # XXX: urllib.urlencode use "&" to separate value, while XMPP URL (cf. RFC 5122) + # use ";" as a separator. So if more than one value is used in query_data, + # urlencode MUST NOT BE USED. + query_data = [("node", node.encode("utf-8"))] + if item is not None: + query_data.append(("item", item.encode("utf-8"))) + return "xmpp:{service}?;{query}".format( + service=service.userhost(), query=urllib.parse.urlencode(query_data) + ) + + ## methods to manage several stanzas/jids at once ## + + # generic # + + def get_rt_results( + self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE + ): + return self.rt_sessions.get_results(session_id, on_success, on_error, profile) + + def trans_items_data(self, items_data, item_cb=lambda item: item.toXml()): + """Helper method to transform result from [get_items] + + the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) + as returned by [get_items]. + @param items_data(tuple): tuple returned by [get_items] + @param item_cb(callable): method to transform each item + @return (tuple): a serialised form ready to go throught bridge + """ + items, metadata = items_data + items = [item_cb(item) for item in items] + + return (items, metadata) + + def trans_items_data_d(self, items_data, item_cb): + """Helper method to transform result from [get_items], deferred version + + the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) + as returned by [get_items]. metadata values are then casted to unicode and + each item is passed to items_cb. + An errback is added to item_cb, and when it is fired the value is filtered from + final items + @param items_data(tuple): tuple returned by [get_items] + @param item_cb(callable): method to transform each item (must return a deferred) + @return (tuple): a deferred which fire a dict which can be serialised to go + throught bridge + """ + items, metadata = items_data + + def eb(failure_): + log.warning(f"Error while parsing item: {failure_.value}") + + d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) + d.addCallback(lambda parsed_items: ( + [i for i in parsed_items if i is not None], + metadata + )) + return d + + def ser_d_list(self, results, failure_result=None): + """Serialise a DeferredList result + + @param results: DeferredList results + @param failure_result: value to use as value for failed Deferred + (default: empty tuple) + @return (list): list with: + - failure: empty in case of success, else error message + - result + """ + if failure_result is None: + failure_result = () + return [ + ("", result) + if success + else (str(result.result) or UNSPECIFIED, failure_result) + for success, result in results + ] + + # subscribe # + + @utils.ensure_deferred + async def _get_node_subscriptions( + self, + service: str, + node: str, + profile_key: str + ) -> Dict[str, str]: + client = self.host.get_client(profile_key) + subs = await self.get_node_subscriptions( + client, jid.JID(service) if service else None, node + ) + return {j.full(): a for j, a in subs.items()} + + async def get_node_subscriptions( + self, + client: SatXMPPEntity, + service: Optional[jid.JID], + nodeIdentifier: str + ) -> Dict[jid.JID, str]: + """Retrieve subscriptions to a node + + @param nodeIdentifier(unicode): node to get subscriptions from + """ + if not nodeIdentifier: + raise exceptions.DataError("node identifier can't be empty") + request = pubsub.PubSubRequest("subscriptionsGet") + request.recipient = service + request.nodeIdentifier = nodeIdentifier + + iq_elt = await request.send(client.xmlstream) + try: + subscriptions_elt = next( + iq_elt.pubsub.elements(pubsub.NS_PUBSUB_OWNER, "subscriptions") + ) + except StopIteration: + raise ValueError( + _("Invalid result: missing <subscriptions> element: {}").format( + iq_elt.toXml + ) + ) + except AttributeError as e: + raise ValueError(_("Invalid result: {}").format(e)) + try: + return { + jid.JID(s["jid"]): s["subscription"] + for s in subscriptions_elt.elements( + (pubsub.NS_PUBSUB, "subscription") + ) + } + except KeyError: + raise ValueError( + _("Invalid result: bad <subscription> element: {}").format( + iq_elt.toXml + ) + ) + + def _set_node_subscriptions( + self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE + ): + client = self.host.get_client(profile_key) + subscriptions = { + jid.JID(jid_): subscription + for jid_, subscription in subscriptions.items() + } + d = self.set_node_subscriptions( + client, + jid.JID(service_s) if service_s else None, + nodeIdentifier, + subscriptions, + ) + return d + + def set_node_subscriptions(self, client, service, nodeIdentifier, subscriptions): + """Set or update subscriptions of a node owned by profile + + @param subscriptions(dict[jid.JID, unicode]): subscriptions to set + check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions + """ + request = pubsub.PubSubRequest("subscriptionsSet") + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.subscriptions = { + pubsub.Subscription(nodeIdentifier, jid_, state) + for jid_, state in subscriptions.items() + } + d = request.send(client.xmlstream) + return d + + def _many_subscribe_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT): + """Get real-time results for subcribeToManu session + + @param session_id: id of the real-time deferred session + @param return (tuple): (remaining, results) where: + - remaining is the number of still expected results + - results is a list of tuple(unicode, unicode, bool, unicode) with: + - service: pubsub service + - and node: pubsub node + - failure(unicode): empty string in case of success, error message else + @param profile_key: %(doc_profile_key)s + """ + profile = self.host.get_client(profile_key).profile + d = self.rt_sessions.get_results( + session_id, + on_success=lambda result: "", + on_error=lambda failure: str(failure.value), + profile=profile, + ) + # we need to convert jid.JID to unicode with full() to serialise it for the bridge + d.addCallback( + lambda ret: ( + ret[0], + [ + (service.full(), node, "" if success else failure or UNSPECIFIED) + for (service, node), (success, failure) in ret[1].items() + ], + ) + ) + return d + + def _subscribe_to_many( + self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE + ): + return self.subscribe_to_many( + [(jid.JID(service), str(node)) for service, node in node_data], + jid.JID(subscriber), + options, + profile_key, + ) + + def subscribe_to_many( + self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE + ): + """Subscribe to several nodes at once. + + @param node_data (iterable[tuple]): iterable of tuple (service, node) where: + - service (jid.JID) is the pubsub service + - node (unicode) is the node to subscribe to + @param subscriber (jid.JID): optional subscription identifier. + @param options (dict): subscription options + @param profile_key (str): %(doc_profile_key)s + @return (str): RT Deferred session id + """ + client = self.host.get_client(profile_key) + deferreds = {} + for service, node in node_data: + deferreds[(service, node)] = defer.ensureDeferred( + client.pubsub_client.subscribe( + service, node, subscriber, options=options + ) + ) + return self.rt_sessions.new_session(deferreds, client.profile) + # found_nodes = yield self.listNodes(service, profile=client.profile) + # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) + # d_list = [] + # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): + # if nodeIdentifier not in found_nodes: + # log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) + # continue # avoid sat-pubsub "SubscriptionExists" error + # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) + # defer.returnValue(d_list) + + # get # + + def _get_from_many_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT): + """Get real-time results for get_from_many session + + @param session_id: id of the real-time deferred session + @param profile_key: %(doc_profile_key)s + @param return (tuple): (remaining, results) where: + - remaining is the number of still expected results + - results is a list of tuple with + - service (unicode): pubsub service + - node (unicode): pubsub node + - failure (unicode): empty string in case of success, error message else + - items (list[s]): raw XML of items + - metadata(dict): serialised metadata + """ + profile = self.host.get_client(profile_key).profile + d = self.rt_sessions.get_results( + session_id, + on_success=lambda result: ("", self.trans_items_data(result)), + on_error=lambda failure: (str(failure.value) or UNSPECIFIED, ([], {})), + profile=profile, + ) + d.addCallback( + lambda ret: ( + ret[0], + [ + (service.full(), node, failure, items, metadata) + for (service, node), (success, (failure, (items, metadata))) in ret[ + 1 + ].items() + ], + ) + ) + return d + + def _get_from_many( + self, node_data, max_item=10, extra="", profile_key=C.PROF_KEY_NONE + ): + """ + @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit + """ + max_item = None if max_item == C.NO_LIMIT else max_item + extra = self.parse_extra(data_format.deserialise(extra)) + return self.get_from_many( + [(jid.JID(service), str(node)) for service, node in node_data], + max_item, + extra.rsm_request, + extra.extra, + profile_key, + ) + + def get_from_many(self, node_data, max_item=None, rsm_request=None, extra=None, + profile_key=C.PROF_KEY_NONE): + """Get items from many nodes at once + + @param node_data (iterable[tuple]): iterable of tuple (service, node) where: + - service (jid.JID) is the pubsub service + - node (unicode) is the node to get items from + @param max_items (int): optional limit on the number of retrieved items. + @param rsm_request (RSMRequest): RSM request data + @param profile_key (unicode): %(doc_profile_key)s + @return (str): RT Deferred session id + """ + client = self.host.get_client(profile_key) + deferreds = {} + for service, node in node_data: + deferreds[(service, node)] = defer.ensureDeferred(self.get_items( + client, service, node, max_item, rsm_request=rsm_request, extra=extra + )) + return self.rt_sessions.new_session(deferreds, client.profile) + + +@implementer(disco.IDisco) +class SatPubSubClient(rsm.PubSubClient): + + def __init__(self, host, parent_plugin): + self.host = host + self.parent_plugin = parent_plugin + rsm.PubSubClient.__init__(self) + + def connectionInitialized(self): + rsm.PubSubClient.connectionInitialized(self) + + async def items( + self, + service: Optional[jid.JID], + nodeIdentifier: str, + maxItems: Optional[int] = None, + subscriptionIdentifier: Optional[str] = None, + sender: Optional[jid.JID] = None, + itemIdentifiers: Optional[Set[str]] = None, + orderBy: Optional[List[str]] = None, + rsm_request: Optional[rsm.RSMRequest] = None, + extra: Optional[Dict[str, Any]] = None, + ): + if extra is None: + extra = {} + items, rsm_response = await super().items( + service, nodeIdentifier, maxItems, subscriptionIdentifier, sender, + itemIdentifiers, orderBy, rsm_request + ) + # items must be returned, thus this async point can't stop the workflow (but it + # can modify returned items) + await self.host.trigger.async_point( + "XEP-0060_items", self.parent, service, nodeIdentifier, items, rsm_response, + extra + ) + return items, rsm_response + + def _get_node_callbacks(self, node, event): + """Generate callbacks from given node and event + + @param node(unicode): node used for the item + any registered node which prefix the node will match + @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE + @return (iterator[callable]): callbacks for this node/event + """ + for registered_node, callbacks_dict in self.parent_plugin._node_cb.items(): + if not node.startswith(registered_node): + continue + try: + for callback_data in callbacks_dict[event]: + yield callback_data[0] + except KeyError: + continue + + async def _call_node_callbacks(self, client, event: pubsub.ItemsEvent) -> None: + """Call sequencially event callbacks of a node + + Callbacks are called sequencially and not in parallel to be sure to respect + priority (notably for plugin needing to get old items before they are modified or + deleted from cache). + """ + for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_ITEMS): + try: + await utils.as_deferred(callback, client, event) + except Exception as e: + log.error( + f"Error while running items event callback {callback}: {e}" + ) + + def itemsReceived(self, event): + log.debug("Pubsub items received") + client = self.parent + defer.ensureDeferred(self._call_node_callbacks(client, event)) + if (event.sender, event.nodeIdentifier) in client.pubsub_watching: + raw_items = [i.toXml() for i in event.items] + self.host.bridge.ps_event_raw( + event.sender.full(), + event.nodeIdentifier, + C.PS_ITEMS, + raw_items, + client.profile, + ) + + def deleteReceived(self, event): + log.debug(("Publish node deleted")) + for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_DELETE): + d = utils.as_deferred(callback, self.parent, event) + d.addErrback(lambda f: log.error( + f"Error while running delete event callback {callback}: {f}" + )) + client = self.parent + if (event.sender, event.nodeIdentifier) in client.pubsub_watching: + self.host.bridge.ps_event_raw( + event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile + ) + + def purgeReceived(self, event): + log.debug(("Publish node purged")) + for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_PURGE): + d = utils.as_deferred(callback, self.parent, event) + d.addErrback(lambda f: log.error( + f"Error while running purge event callback {callback}: {f}" + )) + client = self.parent + if (event.sender, event.nodeIdentifier) in client.pubsub_watching: + self.host.bridge.ps_event_raw( + event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile + ) + + def subscriptions(self, service, nodeIdentifier, sender=None): + """Return the list of subscriptions to the given service and node. + + @param service: The publish subscribe service to retrieve the subscriptions from. + @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} + @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions). + @type nodeIdentifier: C{unicode} + @return (list[pubsub.Subscription]): list of subscriptions + """ + request = pubsub.PubSubRequest("subscriptions") + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.sender = sender + d = request.send(self.xmlstream) + + def cb(iq): + subs = [] + for subscription_elt in iq.pubsub.subscriptions.elements( + pubsub.NS_PUBSUB, "subscription" + ): + subscription = pubsub.Subscription( + subscription_elt["node"], + jid.JID(subscription_elt["jid"]), + subscription_elt["subscription"], + subscriptionIdentifier=subscription_elt.getAttribute("subid"), + ) + subs.append(subscription) + return subs + + return d.addCallback(cb) + + def purge_node(self, service, nodeIdentifier): + """Purge a node (i.e. delete all items from it) + + @param service(jid.JID, None): service to send the item to + None to use PEP + @param NodeIdentifier(unicode): PubSub node to use + """ + # TODO: propose this upstream and remove it once merged + request = pubsub.PubSubRequest('purge') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + return request.send(self.xmlstream) + + def getDiscoInfo(self, requestor, service, nodeIdentifier=""): + disco_info = [] + self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile) + return disco_info + + def getDiscoItems(self, requestor, service, nodeIdentifier=""): + return []