view libervia/backend/plugins/plugin_xep_0060.py @ 4334:111dce64dcb5

plugins XEP-0300, XEP-0446, XEP-0447, XEP0448 and others: Refactoring to use Pydantic: Pydantic models are used more and more in Libervia, for the bridge API, and also to convert `domish.Element` to internal representation. Type hints have also been added in many places. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:12:38 +0100
parents 554a87ae17a6
children
line wrap: on
line source

#!/usr/bin/env python3

# SàT plugin for Publish-Subscribe (xep-0060)
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


from collections import namedtuple
from functools import reduce
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union
import urllib.error
import urllib.parse
import urllib.request

from twisted.internet import defer, reactor
from twisted.words.protocols.jabber import error, jid
from twisted.words.xish import domish
from wokkel import disco
from wokkel import data_form
from wokkel import pubsub
from wokkel import rsm
from wokkel import mam
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.xmpp import SatXMPPClient
from libervia.backend.tools import utils
from libervia.backend.tools import sat_defer
from libervia.backend.tools import xml_tools
from libervia.backend.tools.common import data_format


# module-level logger of this plugin
log = getLogger(__name__)

# plugin metadata, keys are constants of ``Const`` (imported as ``C``)
PLUGIN_INFO = {
    C.PI_NAME: "Publish-Subscribe",
    C.PI_IMPORT_NAME: "XEP-0060",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0060"],
    C.PI_DEPENDENCIES: [],
    # XEP-0059 and XEP-0313 are looked up in ``host.plugins`` by ``XEP_0060.__init__``
    # and may be missing
    C.PI_RECOMMENDATIONS: ["XEP-0059", "XEP-0313"],
    C.PI_MAIN: "XEP_0060",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol"""),
}

UNSPECIFIED = "unspecified error"


# value returned by ``XEP_0060.parse_extra``
Extra = namedtuple("Extra", ("rsm_request", "extra"))
# rsm_request is the rsm.RSMRequest built with rsm_ prefixed keys, or None
# extra is a potentially empty dict
# timeout given to ``Deferred.addTimeout`` when items are requested in ``get_items``
TIMEOUT = 30
# minimum features that a pubsub service must have to be selectable as default
DEFAULT_PUBSUB_MIN_FEAT = {
    "http://jabber.org/protocol/pubsub#persistent-items",
    "http://jabber.org/protocol/pubsub#publish",
    "http://jabber.org/protocol/pubsub#retract-items",
}


class XEP_0060(object):
    """Plugin implementing Publish-Subscribe (XEP-0060)."""

    # names of "pubsub#" prefixed option fields
    # NOTE(review): presumably node configuration/publish-options field names as defined
    #   by XEP-0060, to be confirmed against the specification
    OPT_ACCESS_MODEL = "pubsub#access_model"
    OPT_PERSIST_ITEMS = "pubsub#persist_items"
    OPT_MAX_ITEMS = "pubsub#max_items"
    OPT_DELIVER_PAYLOADS = "pubsub#deliver_payloads"
    OPT_SEND_ITEM_SUBSCRIBE = "pubsub#send_item_subscribe"
    OPT_NODE_TYPE = "pubsub#node_type"
    OPT_SUBSCRIPTION_TYPE = "pubsub#subscription_type"
    OPT_SUBSCRIPTION_DEPTH = "pubsub#subscription_depth"
    OPT_ROSTER_GROUPS_ALLOWED = "pubsub#roster_groups_allowed"
    OPT_PUBLISH_MODEL = "pubsub#publish_model"
    OPT_OVERWRITE_POLICY = "pubsub#overwrite_policy"
    # presumably the values usable with OPT_ACCESS_MODEL, to be confirmed
    ACCESS_OPEN = "open"
    ACCESS_PRESENCE = "presence"
    ACCESS_ROSTER = "roster"
    ACCESS_PUBLISHER_ROSTER = "publisher-roster"
    ACCESS_AUTHORIZE = "authorize"
    ACCESS_WHITELIST = "whitelist"
    # presumably the values usable with OPT_PUBLISH_MODEL, to be confirmed
    PUBLISH_MODEL_PUBLISHERS = "publishers"
    PUBLISH_MODEL_SUBSCRIBERS = "subscribers"
    PUBLISH_MODEL_OPEN = "open"
    # presumably the values usable with OPT_OVERWRITE_POLICY, to be confirmed
    OWPOL_ORIGINAL = "original_publisher"
    OWPOL_ANY_PUB = "any_publisher"
    ID_SINGLETON = "current"
    # keys of the ``extra`` dict handled by ``send_items``
    EXTRA_PUBLISH_OPTIONS = "publish_options"
    EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met"
    EXTRA_AUTOCREATE = "autocreate"
    # extra disco needed for RSM, cf. XEP-0060 § 6.5.4
    DISCO_RSM = "http://jabber.org/protocol/pubsub#rsm"

    def __init__(self, host):
        """Initialise plugin state and expose the PubSub API on the bridge

        @param host: backend instance, its ``plugins`` mapping and its ``bridge`` are
            used to find optional plugins and to register methods and signals
        """
        log.info(_("PubSub plugin initialization"))
        self.host = host
        # recommended plugins, None is stored when they are not available
        self._rsm = host.plugins.get("XEP-0059")
        self._mam = host.plugins.get("XEP-0313")
        # callbacks registered with add_managed_node
        # (key: node, value: dict mapping an event name to a list of
        # (callback, priority) tuples)
        self._node_cb = {}
        self.rt_sessions = sat_defer.RTDeferredSessions()

        # bridge methods, registered in this order
        # each row is: (name, in_sign, out_sign, method, async_)
        # a None ``async_`` means that the argument is not given to ``add_method``
        bridge_methods = (
            ("ps_node_create", "ssa{ss}s", "s", self._create_node, True),
            (
                "ps_node_configuration_get", "sss", "a{ss}",
                self._get_node_configuration, True,
            ),
            (
                "ps_node_configuration_set", "ssa{ss}s", "",
                self._set_node_configuration, True,
            ),
            (
                "ps_node_affiliations_get", "sss", "a{ss}",
                self._get_node_affiliations, True,
            ),
            (
                "ps_node_affiliations_set", "ssa{ss}s", "",
                self._set_node_affiliations, True,
            ),
            (
                "ps_node_subscriptions_get", "sss", "a{ss}",
                self._get_node_subscriptions, True,
            ),
            (
                "ps_node_subscriptions_set", "ssa{ss}s", "",
                self._set_node_subscriptions, True,
            ),
            ("ps_node_purge", "sss", "", self._purge_node, True),
            ("ps_node_delete", "sss", "", self._delete_node, True),
            ("ps_node_watch_add", "sss", "", self._addWatch, False),
            ("ps_node_watch_remove", "sss", "", self._remove_watch, False),
            ("ps_affiliations_get", "sss", "a{ss}", self._get_affiliations, True),
            ("ps_items_get", "ssiassss", "s", self._get_items, True),
            ("ps_item_send", "ssssss", "s", self._send_item, True),
            ("ps_items_send", "ssasss", "as", self._send_items, True),
            ("ps_item_retract", "sssbs", "", self._retract_item, True),
            ("ps_items_retract", "ssasbs", "", self._retract_items, True),
            ("ps_item_rename", "sssss", "", self._rename_item, True),
            ("ps_subscribe", "ssss", "s", self._subscribe, True),
            ("ps_unsubscribe", "sss", "", self._unsubscribe, True),
            ("ps_subscriptions_get", "sss", "s", self._subscriptions, True),
            ("ps_subscribe_to_many", "a(ss)sa{ss}s", "s", self._subscribe_to_many, None),
            (
                "ps_get_subscribe_rt_result", "ss", "(ua(sss))",
                self._many_subscribe_rt_result, True,
            ),
            ("ps_get_from_many", "a(ss)iss", "s", self._get_from_many, None),
            (
                "ps_get_from_many_rt_result", "ss", "(ua(sssasa{ss}))",
                self._get_from_many_rt_result, True,
            ),
        )
        for name, in_sign, out_sign, method, async_ in bridge_methods:
            optional_kwargs = {} if async_ is None else {"async_": async_}
            host.bridge.add_method(
                name,
                ".plugin",
                in_sign=in_sign,
                out_sign=out_sign,
                method=method,
                **optional_kwargs,
            )

        # high level observer signal
        # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data,
        # profile
        host.bridge.add_signal("ps_event", ".plugin", signature="ssssss")

        # low level observer signal, used if service/node is in watching list (see
        # ps_node_watch_* bridge methods)
        # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml,
        # profile
        host.bridge.add_signal("ps_event_raw", ".plugin", signature="sssass")

    def get_handler(self, client):
        """Create the pubsub handler of a client

        The handler is also stored in ``client.pubsub_client``.
        @param client: client to attach the handler to
        @return (SatPubSubClient): the newly created handler
        """
        handler = client.pubsub_client = SatPubSubClient(self.host, self)
        return handler

    def profile_connecting(self, client):
        client.pubsub_watching = set()

    async def profile_connected(self, client):
        """Set the default pubsub service of a client

        The ``pubsub_service`` configuration option is used when a JID can be built
        from it. If building the JID raises RuntimeError, the pubsub services found
        with service discovery are checked in order, and the first suitable one is
        used. ``client.pubsub_service`` is set to None if no suitable service is found.
        @param client: client whose ``pubsub_service`` attribute must be set
        """
        try:
            client.pubsub_service = jid.JID(
                self.host.memory.config_get("", "pubsub_service")
            )
        except RuntimeError:
            log.info(
                _(
                    "Can't retrieve pubsub_service from conf, we'll use first one that "
                    "we find"
                )
            )
        else:
            # the service has been set from configuration, nothing to discover
            return

        selected = None
        candidates = await self.host.find_service_entities(client, "pubsub", "service")
        for candidate in candidates:
            infos = await self.host.memory.disco.get_infos(client, candidate)
            if not DEFAULT_PUBSUB_MIN_FEAT.issubset(infos.features):
                continue
            names = {(name or "").lower() for name in infos.identities.values()}
            # "libervia pubsub service" is the name of Libervia's side project pubsub
            # service, we know that it is a suitable default pubsub service, other
            # checks are then skipped
            if "libervia pubsub service" not in names:
                categories = {
                    (identity[0] or "").lower() for identity in infos.identities.keys()
                }
                if (
                    # we don't want to use a gateway as default pubsub service
                    "gateway" in categories
                    or "gateway" in names
                    # may be present on gateways, and we don't want a service
                    # where registration is needed
                    or "jabber:iq:register" in infos.features
                ):
                    continue
            selected = candidate
            break

        client.pubsub_service = selected
        if client.pubsub_service:
            pubsub_service_str = client.pubsub_service.full()
        else:
            pubsub_service_str = "PEP"
        log.info(f"default pubsub service: {pubsub_service_str}")

    def features_get(self, profile):
        try:
            client = self.host.get_client(profile)
        except exceptions.ProfileNotSetError:
            return {}
        try:
            return {
                "service": (
                    client.pubsub_service.full()
                    if client.pubsub_service is not None
                    else ""
                )
            }
        except AttributeError:
            if self.host.is_connected(profile):
                log.debug("Profile is not connected, service is not checked yet")
            else:
                log.error("Service should be available !")
            return {}

    def parse_extra(self, extra):
        """Parse extra dictionary

        used with bridge's extra dictionaries
        @param extra(dict, None): extra data used to configure request
            the dict is modified in place: the C.KEY_ORDER_BY value is wrapped in a
            list, and a "mam" key is added when a MAM request is found
        @return(Extra): filled Extra instance
        """
        if extra is None:
            return Extra(None, {})

        # order-by
        if C.KEY_ORDER_BY in extra:
            # FIXME: we temporarily manage only one level of ordering
            #        we need to switch to a fully serialised extra data
            #        to be able to encode a whole ordered list
            extra[C.KEY_ORDER_BY] = [extra.pop(C.KEY_ORDER_BY)]

        # rsm
        rsm_request = None if self._rsm is None else self._rsm.parse_extra(extra)

        # mam
        if self._mam is not None:
            mam_request = self._mam.parse_extra(extra, with_rsm=False)
            if mam_request is not None:
                assert "mam" not in extra
                extra["mam"] = mam_request

        return Extra(rsm_request, extra)

    def add_managed_node(self, node: str, priority: int = 0, **kwargs: Callable):
        """Register callback(s) for a node

        @param node: node to monitor
            all node *prefixed* with this one will be triggered
        @param priority: priority of the callback. Callbacks with higher priority will be
            called first.
        @param **kwargs: method(s) to call when the node is found
            the method must be named after PubSub constants in lower case
            and suffixed with "_cb"
            e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE
            note: only C.PS_ITEMS and C.PS_DELETE are implemented so far
        """
        assert node is not None
        assert kwargs
        node_callbacks = self._node_cb.setdefault(node, {})
        for kwarg_name, callback in kwargs.items():
            # the 3 last characters (the "_cb" suffix) are stripped
            event_name = kwarg_name[:-3]
            assert event_name in C.PS_EVENTS
            registered = node_callbacks.setdefault(event_name, [])
            registered.append((callback, priority))
            # highest priority first
            registered.sort(key=lambda registration: registration[1], reverse=True)

    def remove_managed_node(self, node, *args):
        """Add a handler for a node

        @param node(unicode): node to monitor
        @param *args: callback(s) to remove
        """
        assert args
        try:
            registred_cb = self._node_cb[node]
        except KeyError:
            pass
        else:
            removed = False
            for callback in args:
                for event, cb_list in registred_cb.items():
                    to_remove = []
                    for cb in cb_list:
                        if cb[0] == callback:
                            to_remove.append(cb)
                            for cb in to_remove:
                                cb_list.remove(cb)
                            if not cb_list:
                                del registred_cb[event]
                            if not registred_cb:
                                del self._node_cb[node]
                            removed = True
                            break

            if not removed:
                log.error(
                    f"Trying to remove inexistant callback {callback} for node {node}"
                )

    # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
    #     """Retrieve the name of the nodes that are accessible on the target service.

    #     @param service (JID): target service
    #     @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes)
    #     @param profile (str): %(doc_profile)s
    #     @return: deferred which fire a list of nodes
    #     """
    #     client = self.host.get_client(profile)
    #     d = self.host.getDiscoItems(client, service, nodeIdentifier)
    #     d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')])
    #     return d

    # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE):
    #     """Retrieve the name of the nodes to which the profile is subscribed on the target service.

    #     @param service (JID): target service
    #     @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
    #     @param filter_ (str): filter the result according to the given subscription type:
    #         - None: do not filter
    #         - 'pending': subscription has not been approved yet by the node owner
    #         - 'unconfigured': subscription options have not been configured yet
    #         - 'subscribed': subscription is complete
    #     @param profile (str): %(doc_profile)s
    #     @return: Deferred list[str]
    #     """
    #     d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
    #     d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
    #     return d

    def _send_item(
        self,
        service,
        nodeIdentifier,
        payload,
        item_id=None,
        extra_ser="",
        profile_key=C.PROF_KEY_NONE,
    ):
        """Bridge frontend of send_item

        @param service(str): JID of the pubsub service, empty value to use None
        @param nodeIdentifier(str): pubsub node to use
        @param payload(str): XML of the payload
        @param item_id(str, None): id of the item, empty value to use None
        @param extra_ser(str): serialised extra data
        @return (Deferred): fires the id returned by send_item, or an empty string if
            there is none
        """

        def id_or_empty(published_id):
            return published_id or ""

        client = self.host.get_client(profile_key)
        service_jid = jid.JID(service) if service else None
        payload_elt = xml_tools.parse(payload)
        extra = data_format.deserialise(extra_ser)
        send_d = defer.ensureDeferred(
            self.send_item(
                client, service_jid, nodeIdentifier, payload_elt, item_id or None, extra
            )
        )
        send_d.addCallback(id_or_empty)
        return send_d

    def _send_items(
        self, service, nodeIdentifier, items, extra_ser=None, profile_key=C.PROF_KEY_NONE
    ):
        """Bridge frontend of send_items

        @param service(str): JID of the pubsub service, empty value to use None
        @param nodeIdentifier(str): pubsub node to use
        @param items(list[str]): XML of the whole item elements
        @param extra_ser(str, None): serialised extra data
        @return (Deferred): fires the result of send_items
        @raise exceptions.DataError: an item can't be parsed
        """
        client = self.host.get_client(profile_key)
        service_jid = jid.JID(service) if service else None
        try:
            item_elts = [xml_tools.parse(item_xml) for item_xml in items]
        except Exception as e:
            raise exceptions.DataError(_("Can't parse items: {msg}").format(msg=e))
        extra = data_format.deserialise(extra_ser)
        send_coro = self.send_items(
            client, service_jid, nodeIdentifier, item_elts, extra=extra
        )
        return defer.ensureDeferred(send_coro)

    async def send_item(
        self,
        client: SatXMPPClient,
        service: Union[jid.JID, None],
        nodeIdentifier: str,
        payload: domish.Element,
        item_id: Optional[str] = None,
        extra: Optional[Dict[str, Any]] = None,
    ) -> Optional[str]:
        """High level method to send one item

        The payload is wrapped in an <item/> element, then sent with send_items.
        @param service: service to send the item to, None to use PEP
        @param nodeIdentifier: PubSub node to use
        @param payload: payload of the item to send
        @param item_id: id to use or None to create one
        @param extra: extra options, given as is to send_items
        @return: first id returned by send_items, or ``item_id`` if no id is returned
        """
        assert isinstance(payload, domish.Element)
        wrapper_elt = domish.Element((pubsub.NS_PUBSUB, "item"))
        if item_id is not None:
            wrapper_elt["id"] = item_id
        wrapper_elt.addChild(payload)
        ids = await self.send_items(
            client, service, nodeIdentifier, [wrapper_elt], extra=extra
        )
        # the ids list is empty when the service doesn't return the published items
        return ids[0] if ids else item_id

    async def send_items(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        nodeIdentifier: str,
        items: List[domish.Element],
        sender: Optional[jid.JID] = None,
        extra: Optional[Dict[str, Any]] = None,
    ) -> List[str]:
        """High level method to send several items at once

        @param service: service to send the item to
            None to use PEP
        @param nodeIdentifier: PubSub node to use
        @param items: whole item elements to send,
            "id" will be used if set
        @param sender: sender of the request, given as is to ``publish``
        @param extra: extra options. Key can be:
            - self.EXTRA_PUBLISH_OPTIONS(dict): publish options, cf. XEP-0060 § 7.1.5
                the dict maps option name to value(s)
            - self.EXTRA_ON_PRECOND_NOT_MET(str): policy to have when publishing is
                failing due to failing precondition. Value can be:
                * raise (default): raise the exception
                * force: try to re-configure the node with the given option. Be sure to
                    use this option carefully, and not give public access to a node
                    which has not on purpose.
                * publish_without_options: re-publish without the publish-options.
                    A warning will be logged showing that the publish-options could not
                    be used
            - self.EXTRA_AUTOCREATE(bool): Create the node if it's not found, and the
              service doesn't do autocreate itself.
        @return: ids of the created items
            the list is empty if the ids can't be found in the IQ result
        @raise exceptions.DataError: an element of ``items`` is not an <item/>
        @raise exceptions.InternalError: the policy set with
            self.EXTRA_ON_PRECOND_NOT_MET is unknown
        @raise error.StanzaError: publication failed and the failure is not handled by
            the options set in ``extra``
        """
        if extra is None:
            extra = {}
        if service is None:
            # PEP: the bare JID of the client is the service
            service = client.jid.userhostJID()
        parsed_items = []
        for item in items:
            if item.name != "item":
                raise exceptions.DataError(
                    _("Invalid item: {xml}").format(xml=item.toXml())
                )
            item_id = item.getAttribute("id")
            parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement()))
        publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS)
        try:
            iq_result = await self.publish(
                client,
                service,
                nodeIdentifier,
                parsed_items,
                options=publish_options,
                sender=sender,
            )
        except error.StanzaError as e:
            if e.condition == "item-not-found":
                if not extra.get(self.EXTRA_AUTOCREATE, False):
                    raise
                # Autocreate is requested, we create the requested node.
                await self.createNode(client, service, nodeIdentifier, publish_options)
                # And we try again.
                iq_result = await self.publish(
                    client,
                    service,
                    nodeIdentifier,
                    parsed_items,
                    options=publish_options,
                    sender=sender,
                )

            elif (
                e.condition == "conflict"
                and e.appCondition
                and e.appCondition.name == "precondition-not-met"
                and publish_options is not None
            ):
                # this usually happens when publish-options can't be set
                policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, "raise")
                if policy == "raise":
                    raise
                elif policy == "force":
                    log.debug(
                        f"[{client.profile}] Force configuration for {nodeIdentifier!r} "
                        f"at {service}: {publish_options}"
                    )
                    await self.setConfiguration(
                        client, service, nodeIdentifier, publish_options
                    )
                    log.debug("Configuration updated.")
                    # the whole process is restarted with the re-configured node
                    return await self.send_items(
                        client, service, nodeIdentifier, items, sender, extra
                    )

                elif policy == "publish_without_options":
                    log.warning(
                        _(
                            "Can't use publish-options ({options}) on node {node}, "
                            "re-publishing without them: {reason}"
                        ).format(
                            options=", ".join(
                                f"{k} = {v}" for k, v in publish_options.items()
                            ),
                            node=nodeIdentifier,
                            reason=e,
                        )
                    )
                    iq_result = await self.publish(
                        client, service, nodeIdentifier, parsed_items
                    )
                else:
                    raise exceptions.InternalError(
                        f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: "
                        f"{policy}"
                    )
            else:
                raise
        try:
            return [
                item["id"]
                for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, "item")
            ]
        except AttributeError:
            # the IQ result has no <pubsub/> or <publish/> child
            return []

    async def publish(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        items: list[pubsub.Item] | None = None,
        options: dict | None = None,
        sender: jid.JID | None = None,
        extra: dict[str, Any] | None = None,
    ) -> domish.Element:
        """Publish pubsub items

        @param sender: sender of the request,
            client.jid will be used if not set
        @param extra: extra data
            not used directly by ``publish``, but may be used in triggers
        @return: IQ result stanza
        @trigger XEP-0060_publish: called just before publication.
            if it returns False, extra must have a "iq_result_elt" key set with
            domish.Element to return.
        """
        if sender is None:
            sender = client.jid
        if extra is None:
            extra = {}
        if not await self.host.trigger.async_point(
            "XEP-0060_publish",
            client,
            service,
            nodeIdentifier,
            items,
            options,
            sender,
            extra,
        ):
            return extra["iq_result_elt"]
        iq_result_elt = await client.pubsub_client.publish(
            service, nodeIdentifier, items, sender, options=options
        )
        return iq_result_elt

    def _unwrap_mam_message(self, message_elt):
        """Retrieve the pubsub item wrapped in a MAM message

        The first child matching each step of the following path is followed:
        result → forwarded → message → event → items → item
        @param message_elt(domish.Element): message element to unwrap
        @return (domish.Element): the <item/> element
        @raise exceptions.DataError: an element of the path is missing
        """
        ns_event = "http://jabber.org/protocol/pubsub#event"
        path = (
            (mam.NS_MAM, "result"),
            (C.NS_FORWARD, "forwarded"),
            (C.NS_CLIENT, "message"),
            (ns_event, "event"),
            (ns_event, "items"),
            (ns_event, "item"),
        )
        current_elt = message_elt
        try:
            for uri, name in path:
                # StopIteration is raised if there is no matching child
                current_elt = next(current_elt.elements(uri, name))
        except StopIteration:
            raise exceptions.DataError("Can't find Item in MAM message element")
        return current_elt

    def serialise_items(self, items_data):
        """Serialise items with their metadata

        @param items_data(tuple): (items, metadata) tuple
            the metadata dict is modified in place: items are added to it with the
            "items" key
        @return: metadata serialised with data_format.serialise
        """
        item_list, serialisable = items_data
        serialisable["items"] = item_list
        return data_format.serialise(serialisable)

    def _get_items(
        self,
        service="",
        node="",
        max_items=10,
        item_ids=None,
        sub_id=None,
        extra="",
        profile_key=C.PROF_KEY_NONE,
    ):
        """Get items from pubsub node

        @param service(str): JID of the pubsub service, empty value to use None
        @param node(str): node to get the items from
        @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
        @param item_ids(list[str], None): ids of the items to retrieve
        @param sub_id(str, None): subscription identifier, empty value to use None
        @param extra(str): serialised extra data, parsed with parse_extra
        @return (Deferred): fires the result of get_items, after it has been processed
            by trans_items_data then by serialise_items
        """
        client = self.host.get_client(profile_key)
        service_jid = jid.JID(service) if service else None
        limit = None if max_items == C.NO_LIMIT else max_items
        parsed_extra = self.parse_extra(data_format.deserialise(extra))
        items_d = defer.ensureDeferred(
            self.get_items(
                client,
                service_jid,
                node,
                limit,
                item_ids,
                sub_id or None,
                parsed_extra.rsm_request,
                parsed_extra.extra,
            )
        )
        for callback in (self.trans_items_data, self.serialise_items):
            items_d.addCallback(callback)
        return items_d

    async def get_items(
        self,
        client: SatXMPPEntity,
        service: jid.JID | None,
        node: str,
        max_items: int | None = None,
        item_ids: list[str] | None = None,
        sub_id: str | None = None,
        rsm_request: rsm.RSMRequest | None = None,
        extra: dict | None = None,
    ) -> tuple[list[domish.Element], dict]:
        """Retrieve pubsub items from a node.

        @param service: pubsub service.
        @param node: node id.
        @param max_items: optional limit on the number of retrieved items.
        @param item_ids (list[str]): identifiers of the items to be retrieved (can't be
             used with rsm_request). If requested items don't exist, they won't be
             returned, meaning that we can have an empty list as result (NotFound
             exception is NOT raised).
        @param sub_id : optional subscription identifier.
        @param rsm_request: RSM request data
        @return: a deferred tuple containing:
            - list of items
            - metadata with the following keys:
                - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index
                    value of RSMResponse
                - service, node: service and node used
        """
        if item_ids and max_items is not None:
            max_items = None
        if rsm_request and item_ids:
            raise ValueError("items_id can't be used with rsm")
        if extra is None:
            extra = {}
        cont, ret = await self.host.trigger.async_return_point(
            "XEP-0060_getItems",
            client,
            service,
            node,
            max_items,
            item_ids,
            sub_id,
            rsm_request,
            extra,
        )
        if not cont:
            return ret
        try:
            mam_query = extra["mam"]
        except KeyError:
            d = defer.ensureDeferred(
                client.pubsub_client.items(
                    service=service,
                    nodeIdentifier=node,
                    maxItems=max_items,
                    subscriptionIdentifier=sub_id,
                    sender=None,
                    itemIdentifiers=item_ids,
                    orderBy=extra.get(C.KEY_ORDER_BY),
                    rsm_request=rsm_request,
                    extra=extra,
                )
            )
            # we have no MAM data here, so we add None
            d.addErrback(sat_defer.stanza_2_not_found)
            d.addTimeout(TIMEOUT, reactor)
            items, rsm_response = await d
            mam_response = None
        else:
            # if mam is requested, we have to do a totally different query
            if self._mam is None:
                raise exceptions.NotFound("MAM (XEP-0313) plugin is not available")
            if max_items is not None:
                raise exceptions.DataError("max_items parameter can't be used with MAM")
            if item_ids:
                raise exceptions.DataError("items_ids parameter can't be used with MAM")
            if mam_query.node is None:
                mam_query.node = node
            elif mam_query.node != node:
                raise exceptions.DataError(
                    "MAM query node is incoherent with get_items's node"
                )
            if mam_query.rsm is None:
                mam_query.rsm = rsm_request
            else:
                if mam_query.rsm != rsm_request:
                    raise exceptions.DataError(
                        "Conflict between RSM request and MAM's RSM request"
                    )
            items, rsm_response, mam_response = await self._mam.get_archives(
                client, mam_query, service, self._unwrap_mam_message
            )

        try:
            subscribe = C.bool(extra["subscribe"])
        except KeyError:
            subscribe = False

        if subscribe:
            try:
                await self.subscribe(client, service, node)
            except error.StanzaError as e:
                log.warning(
                    f"Could not subscribe to node {node} on service {service}: {e}"
                )

        # TODO: handle mam_response
        service_jid = service if service else client.jid.userhostJID()
        metadata = {
            "service": service_jid,
            "node": node,
            "uri": self.get_node_uri(service_jid, node),
        }
        if mam_response is not None:
            # mam_response is a dict with "complete" and "stable" keys
            # we can put them directly in metadata
            metadata.update(mam_response)
        if rsm_request is not None and rsm_response is not None:
            metadata["rsm"] = rsm_response.toDict()
            if mam_response is None:
                index = rsm_response.index
                count = rsm_response.count
                if index is None or count is None:
                    # we don't have enough information to know if the data is complete
                    # or not
                    metadata["complete"] = None
                else:
                    # normally we have a strict equality here but XEP-0059 states
                    # that index MAY be approximative, so just in case…
                    metadata["complete"] = index + len(items) >= count
        # encrypted metadata can be added by plugins in XEP-0060_items trigger
        if "encrypted" in extra:
            metadata["encrypted"] = extra["encrypted"]

        return (items, metadata)

    # @defer.inlineCallbacks
    # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE):
    #     """Massively retrieve pubsub items from many nodes.
    #     @param service (JID): target service.
    #     @param data (dict): dictionnary binding some arbitrary keys to the node identifiers.
    #     @param max_items (int): optional limit on the number of retrieved items *per node*.
    #     @param sub_id (str): optional subscription identifier.
    #     @param rsm (dict): RSM request data
    #     @param profile_key (str): %(doc_profile_key)s
    #     @return: a deferred dict with:
    #         - key: a value in (a subset of) data.keys()
    #         - couple (list[dict], dict) containing:
    #             - list of items
    #             - RSM response data
    #     """
    #     client = self.host.get_client(profile_key)
    #     found_nodes = yield self.listNodes(service, profile=client.profile)
    #     d_dict = {}
    #     for publisher, node in data.items():
    #         if node not in found_nodes:
    #             log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
    #             continue  # avoid pubsub "item-not-found" error
    #         d_dict[publisher] = self.get_items(service, node, max_items, None, sub_id, rsm, client.profile)
    #     defer.returnValue(d_dict)

    def getOptions(
        self,
        service,
        nodeIdentifier,
        subscriber,
        subscriptionIdentifier=None,
        profile_key=C.PROF_KEY_NONE,
    ):
        """Forward a ``getOptions`` request to the pubsub client of the profile

        @param service: pubsub service
        @param nodeIdentifier: node id
        @param subscriber: subscriber whose options are requested
        @param subscriptionIdentifier: optional subscription identifier
        @param profile_key: %(doc_profile_key)s
        @return: result of the pubsub client's ``getOptions``
        """
        pubsub_client = self.host.get_client(profile_key).pubsub_client
        return pubsub_client.getOptions(
            service, nodeIdentifier, subscriber, subscriptionIdentifier
        )

    def setOptions(
        self,
        service,
        nodeIdentifier,
        subscriber,
        options,
        subscriptionIdentifier=None,
        profile_key=C.PROF_KEY_NONE,
    ):
        """Forward a ``setOptions`` request to the pubsub client of the profile

        @param service: pubsub service
        @param nodeIdentifier: node id
        @param subscriber: subscriber whose options are set
        @param options: options to set
        @param subscriptionIdentifier: optional subscription identifier
        @param profile_key: %(doc_profile_key)s
        @return: result of the pubsub client's ``setOptions``
        """
        pubsub_client = self.host.get_client(profile_key).pubsub_client
        return pubsub_client.setOptions(
            service, nodeIdentifier, subscriber, options, subscriptionIdentifier
        )

    def _create_node(self, service_s, nodeIdentifier, options, profile_key):
        client = self.host.get_client(profile_key)
        return self.createNode(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
        )

    def createNode(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: Optional[str] = None,
        options: Optional[Dict[str, str]] = None,
    ) -> defer.Deferred[str]:
        """Create a new node

        The request is delegated to the pubsub client attached to ``client``.

        @param client: client used to send the request
        @param service: PubSub service,
        @param nodeIdentifier: node name, use None to create an instant node
            (identifier will be returned by this method)
        @param options: node configuration options
        @return: identifier of the created node (may be different from requested name)
        """
        # TODO: if pubsub service doesn't handle publish-options, configure it in a
        #   second time
        pubsub_client = client.pubsub_client
        return pubsub_client.createNode(service, nodeIdentifier, options)

    @defer.inlineCallbacks
    def create_if_new_node(self, client, service, nodeIdentifier, options=None):
        """Helper method similar to createNode, but will not fail in case of conflict

        A StanzaError whose condition is "conflict" is swallowed, any other
        StanzaError is re-raised.
        """
        try:
            yield self.createNode(client, service, nodeIdentifier, options)
        except error.StanzaError as stanza_err:
            if stanza_err.condition != "conflict":
                raise stanza_err

    def _get_node_configuration(self, service_s, nodeIdentifier, profile_key):
        client = self.host.get_client(profile_key)
        d = self.getConfiguration(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier
        )

        def serialize(form):
            # FIXME: better more generic dataform serialisation should be available in SàT
            return {f.var: str(f.value) for f in list(form.fields.values())}

        d.addCallback(serialize)
        return d

    def getConfiguration(self, client, service, nodeIdentifier):
        """Request the configuration form of a node

        @param client: client whose xmlstream is used to send the request
        @param service: pubsub service (recipient of the request)
        @param nodeIdentifier: node id
        @return: deferred firing with the type-checked node configuration form
        """

        def extract_form(iq):
            config_form = data_form.findForm(
                iq.pubsub.configure, pubsub.NS_PUBSUB_NODE_CONFIG
            )
            config_form.typeCheck()
            return config_form

        request = pubsub.PubSubRequest("configureGet")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier

        d = request.send(client.xmlstream)
        d.addCallback(extract_form)
        return d

    def make_configuration_form(self, options: dict) -> data_form.Form:
        """Build a configuration form

        @param options: values used to create the fields of the form
        @return: "submit" form using the pubsub node configuration namespace
        """
        config_form = data_form.Form(
            formType="submit", formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG
        )
        config_form.makeFields(options)
        return config_form

    def _set_node_configuration(self, service_s, nodeIdentifier, options, profile_key):
        client = self.host.get_client(profile_key)
        d = self.setConfiguration(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
        )
        return d

    def setConfiguration(self, client, service, nodeIdentifier, options):
        """Send new configuration options for a node

        @param client: client whose xmlstream is used to send the request
        @param service: pubsub service (recipient of the request)
        @param nodeIdentifier: node id
        @param options: options used to build the configuration form
        @return: deferred returned when sending the request
        """
        request = pubsub.PubSubRequest("configureSet")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.options = self.make_configuration_form(options)
        return request.send(client.xmlstream)

    def _get_affiliations(self, service_s, nodeIdentifier, profile_key):
        client = self.host.get_client(profile_key)
        d = self.get_affiliations(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier or None
        )
        return d

    def get_affiliations(self, client, service, nodeIdentifier=None):
        """Retrieve affiliations of an entity

        @param client: client whose xmlstream is used to send the request
        @param service: pubsub service (recipient of the request)
        @param nodeIdentifier(unicode, None): node to get affiliation from
            None to get all nodes affiliations for this service
        @return: deferred firing with a dict mapping node to affiliation
        @raise ValueError: the result has no <affiliations> element, or an
            <affiliation> element misses its "node" or "affiliation" attribute
        """
        request = pubsub.PubSubRequest("affiliations")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier

        def cb(iq_elt):
            try:
                affiliations_elt = next(
                    iq_elt.pubsub.elements(pubsub.NS_PUBSUB, "affiliations")
                )
            except StopIteration:
                # toXml must be called: formatting the bound method would only put
                # its repr in the message instead of the received stanza
                raise ValueError(
                    _("Invalid result: missing <affiliations> element: {}").format(
                        iq_elt.toXml()
                    )
                )
            try:
                return {
                    e["node"]: e["affiliation"]
                    for e in affiliations_elt.elements(pubsub.NS_PUBSUB, "affiliation")
                }
            except KeyError:
                raise ValueError(
                    _("Invalid result: bad <affiliation> element: {}").format(
                        iq_elt.toXml()
                    )
                )

        d = request.send(client.xmlstream)
        d.addCallback(cb)
        return d

    def _get_node_affiliations(self, service_s, nodeIdentifier, profile_key):
        client = self.host.get_client(profile_key)
        d = self.get_node_affiliations(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier
        )
        d.addCallback(lambda affiliations: {j.full(): a for j, a in affiliations.items()})
        return d

    def get_node_affiliations(self, client, service, nodeIdentifier):
        """Retrieve affiliations of a node owned by profile

        @param client: client whose xmlstream is used to send the request
        @param service: pubsub service (recipient of the request)
        @param nodeIdentifier: node to get affiliations from
        @return: deferred firing with a dict mapping entity jid to affiliation
        @raise ValueError: the result has no <affiliations> element, or an
            <affiliation> element misses its "jid" or "affiliation" attribute
        """
        request = pubsub.PubSubRequest("affiliationsGet")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier

        def cb(iq_elt):
            try:
                affiliations_elt = next(
                    iq_elt.pubsub.elements(pubsub.NS_PUBSUB_OWNER, "affiliations")
                )
            except StopIteration:
                # toXml must be called to get the stanza in the message
                raise ValueError(
                    _("Invalid result: missing <affiliations> element: {}").format(
                        iq_elt.toXml()
                    )
                )
            try:
                # uri and name are 2 distinct arguments of "elements": when a single
                # tuple is given, name stays None and no filtering is done at all
                return {
                    jid.JID(e["jid"]): e["affiliation"]
                    for e in affiliations_elt.elements(
                        pubsub.NS_PUBSUB_OWNER, "affiliation"
                    )
                }
            except KeyError:
                raise ValueError(
                    _("Invalid result: bad <affiliation> element: {}").format(
                        iq_elt.toXml()
                    )
                )

        d = request.send(client.xmlstream)
        d.addCallback(cb)
        return d

    def _set_node_affiliations(
        self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE
    ):
        """String-based front-end of [set_node_affiliations]

        @param service_s: pubsub service as a string, a falsy value is converted to None
        @param nodeIdentifier: node id
        @param affiliations: dict mapping entity jid (as a string) to affiliation
        @param profile_key: %(doc_profile_key)s
        @return: result of [set_node_affiliations]
        """
        client = self.host.get_client(profile_key)
        jid_affiliations = {}
        for jid_s, affiliation in affiliations.items():
            jid_affiliations[jid.JID(jid_s)] = affiliation
        service = jid.JID(service_s) if service_s else None
        return self.set_node_affiliations(
            client, service, nodeIdentifier, jid_affiliations
        )

    def set_node_affiliations(self, client, service, nodeIdentifier, affiliations):
        """Update affiliations of a node owned by profile

        @param client: client whose xmlstream is used to send the request
        @param service: pubsub service (recipient of the request)
        @param nodeIdentifier: node to update
        @param affiliations(dict[jid.JID, unicode]): affiliations to set
            check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations
        @return: deferred returned when sending the request
        """
        affiliations_request = pubsub.PubSubRequest("affiliationsSet")
        affiliations_request.recipient = service
        affiliations_request.nodeIdentifier = nodeIdentifier
        affiliations_request.affiliations = affiliations
        return affiliations_request.send(client.xmlstream)

    def _purge_node(self, service_s, nodeIdentifier, profile_key):
        client = self.host.get_client(profile_key)
        return self.purge_node(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier
        )

    def purge_node(self, client, service, nodeIdentifier):
        """Forward a ``purge_node`` request to the pubsub client of ``client``

        @param client: client whose pubsub client is used
        @param service: pubsub service
        @param nodeIdentifier: node id
        @return: result of the pubsub client's ``purge_node``
        """
        pubsub_client = client.pubsub_client
        return pubsub_client.purge_node(service, nodeIdentifier)

    def _delete_node(self, service_s, nodeIdentifier, profile_key):
        client = self.host.get_client(profile_key)
        return self.deleteNode(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier
        )

    def deleteNode(
        self, client: SatXMPPClient, service: jid.JID, nodeIdentifier: str
    ) -> defer.Deferred:
        """Forward a ``deleteNode`` request to the pubsub client of ``client``

        @param client: client whose pubsub client is used
        @param service: pubsub service
        @param nodeIdentifier: node id
        @return: result of the pubsub client's ``deleteNode``
        """
        pubsub_client = client.pubsub_client
        return pubsub_client.deleteNode(service, nodeIdentifier)

    def _addWatch(self, service_s, node, profile_key):
        """watch modifications on a node

        This method should only be called from bridge
        """
        client = self.host.get_client(profile_key)
        service = jid.JID(service_s) if service_s else client.jid.userhostJID()
        client.pubsub_watching.add((service, node))

    def _remove_watch(self, service_s, node, profile_key):
        """remove a node watch

        This method should only be called from bridge
        """
        client = self.host.get_client(profile_key)
        service = jid.JID(service_s) if service_s else client.jid.userhostJID()
        client.pubsub_watching.remove((service, node))

    def _retract_item(
        self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key
    ):
        return self._retract_items(
            service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key
        )

    def _retract_items(
        self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key
    ):
        client = self.host.get_client(profile_key)
        return self.retract_items(
            client,
            jid.JID(service_s) if service_s else None,
            nodeIdentifier,
            itemIdentifiers,
            notify,
        )

    def retract_items(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        nodeIdentifier: str,
        itemIdentifiers: Iterable[str],
        notify: bool = True,
    ) -> defer.Deferred:
        """Forward a ``retractItems`` request to the pubsub client of ``client``

        @param client: client whose pubsub client is used
        @param service: pubsub service
        @param nodeIdentifier: node id
        @param itemIdentifiers: ids of the items to retract
        @param notify: passed as ``notify`` keyword to the pubsub client
        @return: result of the pubsub client's ``retractItems``
        """
        pubsub_client = client.pubsub_client
        return pubsub_client.retractItems(
            service, nodeIdentifier, itemIdentifiers, notify=notify
        )

    def _rename_item(
        self,
        service,
        node,
        item_id,
        new_id,
        profile_key=C.PROF_KEY_NONE,
    ):
        """String-based front-end of [rename_item]

        @param service: pubsub service as a string, a falsy value is converted to None
        @param node: node id
        @param item_id: current id of the item
        @param new_id: new id of the item
        @param profile_key: %(doc_profile_key)s
        @return: deferred wrapping the [rename_item] coroutine
        """
        client = self.host.get_client(profile_key)
        service_jid = jid.JID(service) if service else None
        rename_coro = self.rename_item(client, service_jid, node, item_id, new_id)
        return defer.ensureDeferred(rename_coro)

    async def rename_item(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: str,
        item_id: str,
        new_id: str,
    ) -> None:
        """Rename an item by recreating it then deleting it

        we have to recreate then delete because there is currently no rename operation
        with PubSub

        @param client: client used for the requests
        @param service: pubsub service
        @param node: node holding the item
        @param item_id: current id of the item
        @param new_id: new id of the item
        @raise ValueError: item_id or new_id is empty
        @raise exceptions.NotFound: no item with id ``item_id`` has been retrieved
        """
        if not item_id or not new_id:
            raise ValueError("item_id and new_id must not be empty")
        if item_id == new_id:
            # nothing to rename, and going further would republish the item under the
            # same id then retract this very id, i.e. delete the item
            return
        # retract must be done last, so if something goes wrong, the exception will stop
        # the workflow and no accidental delete should happen
        items = (await self.get_items(client, service, node, item_ids=[item_id]))[0]
        if not items:
            # get_items returns an empty list when the requested item doesn't exist
            raise exceptions.NotFound(
                f"item {item_id!r} has not been found in node {node!r}"
            )
        item_elt = items[0]
        await self.send_item(client, service, node, item_elt.firstChildElement(), new_id)
        await self.retract_items(client, service, node, [item_id])

    def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
        """String-based front-end of [subscribe]

        @param service: pubsub service as a string, a falsy value is converted to None
        @param nodeIdentifier: node to subscribe to
        @param options: serialised subscription options
        @param profile_key: %(doc_profile_key)s
        @return: deferred firing with the subscription identifier, or an empty string
            if the subscription has none
        """
        client = self.host.get_client(profile_key)
        service_jid = jid.JID(service) if service else None
        sub_options = data_format.deserialise(options)

        def get_sub_id(subscription):
            return subscription.subscriptionIdentifier or ""

        d = defer.ensureDeferred(
            self.subscribe(client, service_jid, nodeIdentifier, options=sub_options)
        )
        d.addCallback(get_sub_id)
        return d

    async def subscribe(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        nodeIdentifier: str,
        sub_jid: Optional[jid.JID] = None,
        options: Optional[dict] = None,
    ) -> pubsub.Subscription:
        """Subscribe an entity to a node

        @param client: client used for the request
        @param service: pubsub service, None to use the bare jid of the profile
        @param nodeIdentifier: node to subscribe to
        @param sub_jid: entity to subscribe, None to use the bare jid of the profile
        @param options: subscription options
        @return: the subscription, or the value given by the "XEP-0060_subscribe"
            trigger when it stops the workflow
        @raise exceptions.NotFound: the request failed with "item-not-found" condition
        """
        # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe
        if service is None:
            service = client.jid.userhostJID()
        trigger_args = (client, service, nodeIdentifier, sub_jid, options)
        cont, trigger_sub = await self.host.trigger.async_return_point(
            "XEP-0060_subscribe", *trigger_args
        )
        if not cont:
            return trigger_sub
        try:
            return await client.pubsub_client.subscribe(
                service,
                nodeIdentifier,
                sub_jid or client.jid.userhostJID(),
                options=options,
                sender=client.jid.userhostJID(),
            )
        except error.StanzaError as e:
            if e.condition != "item-not-found":
                raise e
            raise exceptions.NotFound(e.text or e.condition)

    def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
        """String-based front-end of [unsubscribe]

        @param service: pubsub service as a string, a falsy value is converted to None
        @param nodeIdentifier: node to unsubscribe from
        @param profile_key: %(doc_profile_key)s
        @return: deferred wrapping the [unsubscribe] coroutine
        """
        client = self.host.get_client(profile_key)
        service_jid = jid.JID(service) if service else None
        unsubscribe_coro = self.unsubscribe(client, service_jid, nodeIdentifier)
        return defer.ensureDeferred(unsubscribe_coro)

    async def unsubscribe(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        nodeIdentifier: str,
        sub_jid: Optional[jid.JID] = None,
        subscriptionIdentifier: Optional[str] = None,
        sender: Optional[jid.JID] = None,
    ) -> None:
        """Unsubscribe an entity from a node

        A "not-subscribed" error is only logged, any other StanzaError is re-raised.

        @param client: client used for the request
        @param service: pubsub service, may be None ([_unsubscribe] gives None when no
            service is specified)
        @param nodeIdentifier: node to unsubscribe from
        @param sub_jid: entity to unsubscribe, None to use the bare jid of the profile
        @param subscriptionIdentifier: optional subscription identifier
        @param sender: passed as is to the pubsub client
        """
        if not await self.host.trigger.async_point(
            "XEP-0060_unsubscribe",
            client,
            service,
            nodeIdentifier,
            sub_jid,
            subscriptionIdentifier,
            sender,
        ):
            return
        try:
            await client.pubsub_client.unsubscribe(
                service,
                nodeIdentifier,
                sub_jid or client.jid.userhostJID(),
                subscriptionIdentifier,
                sender,
            )
        except error.StanzaError as e:
            not_subscribed_elt = next(
                e.getElement().elements(pubsub.NS_PUBSUB_ERRORS, "not-subscribed"),
                None,
            )
            if not_subscribed_elt is None:
                raise e
            # service can be None here, we must not call a method on it blindly or the
            # handled error would be replaced by an AttributeError
            service_s = (
                service.full() if service is not None else client.jid.userhost()
            )
            log.info(
                f"{sender.full() if sender else client.jid.full()} was not "
                f"subscribed to node {nodeIdentifier!s} at {service_s}"
            )
    @utils.ensure_deferred
    async def _subscriptions(
        self, service="", nodeIdentifier="", profile_key=C.PROF_KEY_NONE
    ) -> str:
        """String-based front-end of [subscriptions]

        @param service: pubsub service as a string, a falsy value is converted to None
        @param nodeIdentifier: node to check, a falsy value is converted to None
        @param profile_key: %(doc_profile_key)s
        @return: serialised result of [subscriptions]
        """
        client = self.host.get_client(profile_key)
        service_jid = jid.JID(service) if service else None
        node = nodeIdentifier or None
        return data_format.serialise(
            await self.subscriptions(client, service_jid, node)
        )

    async def subscriptions(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID] = None,
        node: Optional[str] = None,
    ) -> List[Dict[str, Union[str, bool]]]:
        """Retrieve subscriptions from a service

        @param client: client used for the request
        @param service: PubSub service
        @param node: node to check
            None to get all subscriptions
        @return: one dict per subscription with "service", "node", "subscriber" and
            "state" keys, plus "id" when the subscription has an identifier.
            If the "XEP-0060_subscriptions" trigger stops the workflow, the value it
            gives is returned instead.
        """
        cont, ret = await self.host.trigger.async_return_point(
            "XEP-0060_subscriptions", client, service, node
        )
        if not cont:
            return ret

        def to_dict(sub):
            sub_dict = {
                "service": service.host if service else client.jid.host,
                "node": sub.nodeIdentifier,
                "subscriber": sub.subscriber.full(),
                "state": sub.state,
            }
            sub_id = sub.subscriptionIdentifier
            if sub_id is not None:
                sub_dict["id"] = sub_id
            return sub_dict

        subs = await client.pubsub_client.subscriptions(service, node)
        return [to_dict(sub) for sub in subs]

    ## misc tools ##

    def get_node_uri(self, service, node, item=None):
        """Return XMPP URI of a PubSub node

        @param service(jid.JID): PubSub service
        @param node(unicode): node
        @param item(unicode, None): id of an item to add to the URI
            None to build the URI of the node itself
        @return (unicode): URI of the node
        """
        # FIXME: deprecated, use sat.tools.common.uri instead
        assert service is not None
        query_data = [("node", node.encode("utf-8"))]
        if item is not None:
            query_data.append(("item", item.encode("utf-8")))
        # XXX: urllib.urlencode use "&" to separate value, while XMPP URL (cf. RFC 5122)
        #      use ";" as a separator. So each key/value pair is encoded on its own,
        #      and the pairs are then joined with ";".
        query = ";".join(urllib.parse.urlencode([pair]) for pair in query_data)
        return "xmpp:{service}?;{query}".format(
            service=service.userhost(), query=query
        )

    ## methods to manage several stanzas/jids at once ##

    # generic #

    def get_rt_results(
        self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE
    ):
        """Forward to ``get_results`` of the real-time sessions manager

        @param session_id: id of the real-time deferred session
        @param on_success: passed as is to ``rt_sessions.get_results``
        @param on_error: passed as is to ``rt_sessions.get_results``
        @param profile: passed as is to ``rt_sessions.get_results``
        @return: result of ``rt_sessions.get_results``
        """
        rt_sessions = self.rt_sessions
        return rt_sessions.get_results(session_id, on_success, on_error, profile)

    def trans_items_data(self, items_data, item_cb=lambda item: item.toXml()):
        """Helper method to transform result from [get_items]

        the items_data must be a tuple (items, metadata) as returned by [get_items].
        @param items_data(tuple): tuple returned by [get_items]
        @param item_cb(callable): method to transform each item
            (by default, the ``toXml`` method of the item is called)
        @return (tuple): list of transformed items, and metadata left untouched
        """
        items, metadata = items_data
        return (list(map(item_cb, items)), metadata)

    def trans_items_data_d(self, items_data, item_cb):
        """Helper method to transform result from [get_items], deferred version

        the items_data must be a tuple (items, metadata) as returned by [get_items].
        Each item is passed to item_cb; metadata are returned untouched.
        An errback is added to the deferred returned by item_cb: on failure a warning
        is logged and the item is filtered out from the final items.
        @param items_data(tuple): tuple returned by [get_items]
        @param item_cb(callable): method to transform each item (must return a deferred)
        @return: a deferred firing with a tuple (parsed items, metadata)
        """
        items, metadata = items_data

        def eb(failure_):
            # returning None here marks the item as to be filtered out
            log.warning(f"Error while parsing item: {failure_.value}")

        def filter_failed(parsed_items):
            kept = [parsed for parsed in parsed_items if parsed is not None]
            return (kept, metadata)

        item_deferreds = []
        for item in items:
            item_deferreds.append(item_cb(item).addErrback(eb))

        d = defer.gatherResults(item_deferreds)
        d.addCallback(filter_failed)
        return d

    def ser_d_list(self, results, failure_result=None):
        """Serialise a DeferredList result

        @param results: DeferredList results, as an iterable of (success, result)
        @param failure_result: value to use as value for failed Deferred
            (default: empty tuple)
        @return (list): list of tuples with:
            - failure: empty in case of success, else error message
            - result
        """
        if failure_result is None:
            failure_result = ()
        serialised = []
        for success, result in results:
            if success:
                serialised.append(("", result))
            else:
                # NOTE(review): the message is read from the "result" attribute;
                #   twisted Failure instances expose "value" — confirm this is the
                #   intended attribute
                serialised.append(
                    (str(result.result) or UNSPECIFIED, failure_result)
                )
        return serialised

    # subscribe #

    @utils.ensure_deferred
    async def _get_node_subscriptions(
        self, service: str, node: str, profile_key: str
    ) -> Dict[str, str]:
        """String-based front-end of [get_node_subscriptions]

        @param service: pubsub service as a string, a falsy value is converted to None
        @param node: node to get subscriptions from
        @param profile_key: %(doc_profile_key)s
        @return: subscriptions with each key replaced by the result of its ``full()``
        """
        client = self.host.get_client(profile_key)
        service_jid = jid.JID(service) if service else None
        subs = await self.get_node_subscriptions(client, service_jid, node)
        serialised = {}
        for entity, state in subs.items():
            serialised[entity.full()] = state
        return serialised

    async def get_node_subscriptions(
        self, client: SatXMPPEntity, service: Optional[jid.JID], nodeIdentifier: str
    ) -> Dict[jid.JID, str]:
        """Retrieve subscriptions to a node

        @param client: client whose xmlstream is used to send the request
        @param service: pubsub service (recipient of the request)
        @param nodeIdentifier(unicode): node to get subscriptions from
        @return: dict mapping subscriber jid to subscription state
        @raise exceptions.DataError: nodeIdentifier is empty
        @raise ValueError: the result has no usable <subscriptions> element, or a
            <subscription> element misses its "jid" or "subscription" attribute
        """
        if not nodeIdentifier:
            raise exceptions.DataError("node identifier can't be empty")
        request = pubsub.PubSubRequest("subscriptionsGet")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier

        iq_elt = await request.send(client.xmlstream)
        try:
            subscriptions_elt = next(
                iq_elt.pubsub.elements(pubsub.NS_PUBSUB_OWNER, "subscriptions")
            )
        except StopIteration:
            # toXml must be called to get the stanza in the message
            raise ValueError(
                _("Invalid result: missing <subscriptions> element: {}").format(
                    iq_elt.toXml()
                )
            )
        except AttributeError as e:
            raise ValueError(_("Invalid result: {}").format(e))
        try:
            # <subscription> elements are children of the owner <subscriptions>
            # element, thus they are in the owner namespace too: looking for them in
            # the base pubsub namespace would never match anything
            return {
                jid.JID(s["jid"]): s["subscription"]
                for s in subscriptions_elt.elements(
                    pubsub.NS_PUBSUB_OWNER, "subscription"
                )
            }
        except KeyError:
            raise ValueError(
                _("Invalid result: bad <subscription> element: {}").format(
                    iq_elt.toXml()
                )
            )

    def _set_node_subscriptions(
        self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE
    ):
        """String-based front-end of [set_node_subscriptions]

        @param service_s: pubsub service as a string, a falsy value is converted to None
        @param nodeIdentifier: node id
        @param subscriptions: dict mapping subscriber jid (as a string) to subscription
        @param profile_key: %(doc_profile_key)s
        @return: result of [set_node_subscriptions]
        """
        client = self.host.get_client(profile_key)
        jid_subscriptions = {}
        for jid_s, subscription in subscriptions.items():
            jid_subscriptions[jid.JID(jid_s)] = subscription
        service = jid.JID(service_s) if service_s else None
        return self.set_node_subscriptions(
            client, service, nodeIdentifier, jid_subscriptions
        )

    def set_node_subscriptions(self, client, service, nodeIdentifier, subscriptions):
        """Set or update subscriptions of a node owned by profile

        @param client: client whose xmlstream is used to send the request
        @param service: pubsub service (recipient of the request)
        @param nodeIdentifier: node to update
        @param subscriptions(dict[jid.JID, unicode]): subscriptions to set
            check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions
        @return: deferred returned when sending the request
        """
        subscriptions_set = set()
        for jid_, state in subscriptions.items():
            subscriptions_set.add(pubsub.Subscription(nodeIdentifier, jid_, state))

        request = pubsub.PubSubRequest("subscriptionsSet")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriptions = subscriptions_set
        return request.send(client.xmlstream)

    def _many_subscribe_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
        """Get real-time results for subscribe_to_many session

        @param session_id: id of the real-time deferred session
        @param profile_key: %(doc_profile_key)s
        @return: deferred firing with a tuple (remaining, results) where:
            - remaining is the number of still expected results
            - results is a list of tuple with:
                - service: pubsub service
                - node: pubsub node
                - failure(unicode): empty string in case of success, error message else
        """
        profile = self.host.get_client(profile_key).profile

        def serialise(ret):
            # we need to convert jid.JID to unicode with full() to serialise it for
            # the bridge
            remaining = ret[0]
            results = []
            for (service, node), (success, failure) in ret[1].items():
                failure_msg = "" if success else failure or UNSPECIFIED
                results.append((service.full(), node, failure_msg))
            return (remaining, results)

        d = self.rt_sessions.get_results(
            session_id,
            on_success=lambda result: "",
            on_error=lambda failure: str(failure.value),
            profile=profile,
        )
        d.addCallback(serialise)
        return d

    def _subscribe_to_many(
        self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE
    ):
        """String-based front-end of [subscribe_to_many]

        @param node_data: iterable of (service, node) where service is a string
        @param subscriber: jid of the subscriber as a string
            a falsy value means the bare jid of the profile
        @param options: subscription options
        @param profile_key: %(doc_profile_key)s
        @return: result of [subscribe_to_many]
        """
        if subscriber:
            subscriber_jid = jid.JID(subscriber)
        else:
            # jid.JID can't be built from None or an empty string, we use the same
            # default subscriber as [subscribe]
            client = self.host.get_client(profile_key)
            subscriber_jid = client.jid.userhostJID()
        return self.subscribe_to_many(
            [(jid.JID(service), str(node)) for service, node in node_data],
            subscriber_jid,
            options,
            profile_key,
        )

    def subscribe_to_many(
        self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE
    ):
        """Subscribe to several nodes at once.

        One subscription request is launched per (service, node) couple, and all the
        resulting deferreds are registered in a new real-time session.

        @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
            - service (jid.JID) is the pubsub service
            - node (unicode) is the node to subscribe to
        @param subscriber (jid.JID): entity to subscribe
        @param options (dict): subscription options
        @param profile_key (str): %(doc_profile_key)s
        @return (str): RT Deferred session id
        """
        client = self.host.get_client(profile_key)
        deferreds = {
            (service, node): defer.ensureDeferred(
                client.pubsub_client.subscribe(service, node, subscriber, options=options)
            )
            for service, node in node_data
        }
        return self.rt_sessions.new_session(deferreds, client.profile)
        # found_nodes = yield self.listNodes(service, profile=client.profile)
        # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
        # d_list = []
        # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
        #     if nodeIdentifier not in found_nodes:
        #         log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
        #         continue  # avoid sat-pubsub "SubscriptionExists" error
        #     d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options))
        # defer.returnValue(d_list)

    # get #

    def _get_from_many_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
        """Get real-time results for get_from_many session

        @param session_id: id of the real-time deferred session
        @param profile_key: %(doc_profile_key)s
        @return: deferred firing with a tuple (remaining, results) where:
            - remaining is the number of still expected results
            - results is a list of tuple with
                - service (unicode): pubsub service
                - node (unicode): pubsub node
                - failure (unicode): empty string in case of success, error message else
                - items (list[s]): raw XML of items
                - metadata(dict): serialised metadata
        """
        profile = self.host.get_client(profile_key).profile

        def on_success(result):
            return ("", self.trans_items_data(result))

        def on_error(failure):
            return (str(failure.value) or UNSPECIFIED, ([], {}))

        def serialise(ret):
            remaining = ret[0]
            results = []
            for (service, node), (success, (failure, (items, metadata))) in ret[
                1
            ].items():
                results.append((service.full(), node, failure, items, metadata))
            return (remaining, results)

        d = self.rt_sessions.get_results(
            session_id,
            on_success=on_success,
            on_error=on_error,
            profile=profile,
        )
        d.addCallback(serialise)
        return d

    def _get_from_many(
        self, node_data, max_item=10, extra="", profile_key=C.PROF_KEY_NONE
    ):
        """String-based front-end of [get_from_many]

        @param node_data: iterable of (service, node) where service is a string
        @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit
        @param extra: serialised extra data, deserialised then given to [parse_extra]
        @param profile_key: %(doc_profile_key)s
        @return: result of [get_from_many]
        """
        if max_item == C.NO_LIMIT:
            max_item = None
        parsed_extra = self.parse_extra(data_format.deserialise(extra))
        jid_node_data = [(jid.JID(service), str(node)) for service, node in node_data]
        return self.get_from_many(
            jid_node_data,
            max_item,
            parsed_extra.rsm_request,
            parsed_extra.extra,
            profile_key,
        )

    def get_from_many(
        self,
        node_data,
        max_item=None,
        rsm_request=None,
        extra=None,
        profile_key=C.PROF_KEY_NONE,
    ):
        """Get items from many nodes at once

        One [get_items] request is launched per (service, node) couple, and all the
        resulting deferreds are registered in a new real-time session.

        @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
            - service (jid.JID) is the pubsub service
            - node (unicode) is the node to get items from
        @param max_item (int): optional limit on the number of retrieved items.
        @param rsm_request (RSMRequest): RSM request data
        @param extra: extra data passed as is to [get_items]
        @param profile_key (unicode): %(doc_profile_key)s
        @return (str): RT Deferred session id
        """
        client = self.host.get_client(profile_key)
        deferreds = {
            (service, node): defer.ensureDeferred(
                self.get_items(
                    client, service, node, max_item, rsm_request=rsm_request, extra=extra
                )
            )
            for service, node in node_data
        }
        return self.rt_sessions.new_session(deferreds, client.profile)


@implementer(disco.IDisco)
class SatPubSubClient(rsm.PubSubClient):

    def __init__(self, host, parent_plugin):
        """Keep references to the host and to the plugin owning this handler

        @param host: object stored as ``self.host``
        @param parent_plugin: object stored as ``self.parent_plugin``
        """
        self.host = host
        self.parent_plugin = parent_plugin
        super().__init__()

    def connectionInitialized(self):
        """Delegate to the parent class implementation, nothing is added here"""
        super().connectionInitialized()

    async def items(
        self,
        service: Optional[jid.JID],
        nodeIdentifier: str,
        maxItems: Optional[int] = None,
        subscriptionIdentifier: Optional[str] = None,
        sender: Optional[jid.JID] = None,
        itemIdentifiers: Optional[Set[str]] = None,
        orderBy: Optional[List[str]] = None,
        rsm_request: Optional[rsm.RSMRequest] = None,
        extra: Optional[Dict[str, Any]] = None,
    ):
        """Retrieve items then run the "XEP-0060_items" trigger point on them

        All arguments except ``extra`` are given unchanged to the parent class
        ``items`` method.

        @param extra: data given to the trigger point only; an empty dict is used when
            None
        @return (tuple): (items, rsm_response) as returned by the parent method
        """
        trigger_extra = {} if extra is None else extra
        result = await super().items(
            service,
            nodeIdentifier,
            maxItems,
            subscriptionIdentifier,
            sender,
            itemIdentifiers,
            orderBy,
            rsm_request,
        )
        items, rsm_response = result
        # the result of the async point is ignored on purpose: items must always be
        # returned, so triggers can't stop the workflow (but they can modify the
        # returned items)
        await self.host.trigger.async_point(
            "XEP-0060_items",
            self.parent,
            service,
            nodeIdentifier,
            items,
            rsm_response,
            trigger_extra,
        )
        return items, rsm_response

    def _get_node_callbacks(self, node, event):
        """Generate callbacks from given node and event

        @param node(unicode): node used for the item
            any registered node which prefix the node will match
        @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE
        @return (iterator[callable]): callbacks for this node/event
        """
        for registered_node, callbacks_dict in self.parent_plugin._node_cb.items():
            if not node.startswith(registered_node):
                continue
            try:
                for callback_data in callbacks_dict[event]:
                    yield callback_data[0]
            except KeyError:
                continue

    async def _call_node_callbacks(self, client, event: pubsub.ItemsEvent) -> None:
        """Run the items callbacks of the event's node, one after the other

        Each callback is awaited before the next one is started (no parallel run), to
        be sure to respect priority (notably for plugins needing to get old items
        before they are modified or deleted from cache).
        An exception raised by a callback is logged, then the next callback is run.

        @param client: given as first argument to each callback
        @param event: items event, given as second argument to each callback
        """
        node_callbacks = self._get_node_callbacks(event.nodeIdentifier, C.PS_ITEMS)
        for node_callback in node_callbacks:
            try:
                await utils.as_deferred(node_callback, client, event)
            except Exception as exc:
                log.error(
                    f"Error while running items event callback {node_callback}: {exc}"
                )

    def itemsReceived(self, event):
        """Dispatch an items event to node callbacks and, if watched, to the bridge

        Node callbacks are launched without being waited for. The raw items are sent
        with ``ps_event_raw`` only when the (sender, node) couple is in the client's
        ``pubsub_watching``.

        @param event: items event; its ``sender``, ``nodeIdentifier`` and ``items``
            attributes are used here
        """
        log.debug("Pubsub items received")
        client = self.parent
        defer.ensureDeferred(self._call_node_callbacks(client, event))

        watched_key = (event.sender, event.nodeIdentifier)
        if watched_key not in client.pubsub_watching:
            return

        serialised_items = [item_elt.toXml() for item_elt in event.items]
        self.host.bridge.ps_event_raw(
            event.sender.full(),
            event.nodeIdentifier,
            C.PS_ITEMS,
            serialised_items,
            client.profile,
        )

    def deleteReceived(self, event):
        """Dispatch a node deletion event to node callbacks and, if watched, to the bridge

        Every callback registered for C.PS_DELETE on a matching node is launched
        without being waited for; a failure is logged with the callback which raised
        it. The event is sent with ``ps_event_raw`` only when the (sender, node)
        couple is in the client's ``pubsub_watching``.

        @param event: delete event; its ``sender`` and ``nodeIdentifier`` attributes
            are used here
        """
        log.debug("Publish node deleted")
        client = self.parent

        def log_failure(failure_, callback):
            log.error(
                f"Error while running delete event callback {callback}: {failure_}"
            )

        for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_DELETE):
            d = utils.as_deferred(callback, client, event)
            # the callback is given as an errback argument instead of being read from
            # a closure: a closure over the loop variable is late-bound, so a failure
            # happening after the loop would be reported with the last callback
            d.addErrback(log_failure, callback)

        if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
            self.host.bridge.ps_event_raw(
                event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile
            )

    def purgeReceived(self, event):
        """Dispatch a node purge event to node callbacks and, if watched, to the bridge

        Every callback registered for C.PS_PURGE on a matching node is launched
        without being waited for; a failure is logged with the callback which raised
        it. The event is sent with ``ps_event_raw`` only when the (sender, node)
        couple is in the client's ``pubsub_watching``.

        @param event: purge event; its ``sender`` and ``nodeIdentifier`` attributes
            are used here
        """
        log.debug("Publish node purged")
        client = self.parent

        def log_failure(failure_, callback):
            log.error(
                f"Error while running purge event callback {callback}: {failure_}"
            )

        for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_PURGE):
            d = utils.as_deferred(callback, client, event)
            # the callback is given as an errback argument instead of being read from
            # a closure: a closure over the loop variable is late-bound, so a failure
            # happening after the loop would be reported with the last callback
            d.addErrback(log_failure, callback)

        if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
            self.host.bridge.ps_event_raw(
                event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile
            )

    def subscriptions(self, service, nodeIdentifier, sender=None):
        """Request the subscriptions to the given service and node.

        @param service: The publish subscribe service to retrieve the subscriptions from.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param nodeIdentifier: The identifier of the node (leave empty to retrieve all
            subscriptions).
        @type nodeIdentifier: C{str}
        @param sender: value set as sender of the request (None by default)
        @return: Deferred firing with the list of subscriptions
            (list[pubsub.Subscription])
        """

        def parse_subscriptions(iq_elt):
            # build one Subscription per <subscription/> child of the result
            subscription_elts = iq_elt.pubsub.subscriptions.elements(
                pubsub.NS_PUBSUB, "subscription"
            )
            return [
                pubsub.Subscription(
                    elt["node"],
                    jid.JID(elt["jid"]),
                    elt["subscription"],
                    subscriptionIdentifier=elt.getAttribute("subid"),
                )
                for elt in subscription_elts
            ]

        subscriptions_request = pubsub.PubSubRequest("subscriptions")
        subscriptions_request.recipient = service
        subscriptions_request.nodeIdentifier = nodeIdentifier
        subscriptions_request.sender = sender
        d = subscriptions_request.send(self.xmlstream)
        d.addCallback(parse_subscriptions)
        return d

    def purge_node(self, service, nodeIdentifier):
        """Purge a node (i.e. delete all items from it)

        @param service(jid.JID, None): service hosting the node
            None to use PEP
        @param nodeIdentifier(str): PubSub node to purge
        @return: result of the "purge" request ``send`` call
        """
        # TODO: propose this upstream and remove it once merged
        purge_request = pubsub.PubSubRequest("purge")
        purge_request.nodeIdentifier = nodeIdentifier
        purge_request.recipient = service
        return purge_request.send(self.xmlstream)

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        disco_info = []
        self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile)
        return disco_info

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        return []