diff libervia/backend/plugins/plugin_xep_0060.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0060.py@524856bd7b19
children 087902fbb77a
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_xep_0060.py	Fri Jun 02 11:49:51 2023 +0200
@@ -0,0 +1,1820 @@
+#!/usr/bin/env python3
+
+# SàT plugin for Publish-Subscribe (xep-0060)
+# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+from collections import namedtuple
+from functools import reduce
+from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union
+import urllib.error
+import urllib.parse
+import urllib.request
+
+from twisted.internet import defer, reactor
+from twisted.words.protocols.jabber import error, jid
+from twisted.words.xish import domish
+from wokkel import disco
+from wokkel import data_form
+from wokkel import pubsub
+from wokkel import rsm
+from wokkel import mam
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.core.xmpp import SatXMPPClient
+from libervia.backend.tools import utils
+from libervia.backend.tools import sat_defer
+from libervia.backend.tools import xml_tools
+from libervia.backend.tools.common import data_format
+
+
+log = getLogger(__name__)
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Publish-Subscribe",
+    C.PI_IMPORT_NAME: "XEP-0060",
+    C.PI_TYPE: "XEP",
+    C.PI_MODES: C.PLUG_MODE_BOTH,
+    C.PI_PROTOCOLS: ["XEP-0060"],
+    C.PI_DEPENDENCIES: [],
+    C.PI_RECOMMENDATIONS: ["XEP-0059", "XEP-0313"],
+    C.PI_MAIN: "XEP_0060",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol"""),
+}
+
+UNSPECIFIED = "unspecified error"
+
+
+Extra = namedtuple("Extra", ("rsm_request", "extra"))
+# rsm_request is the rsm.RSMRequest built with rsm_ prefixed keys, or None
+# extra is a potentially empty dict
+TIMEOUT = 30
+# minimum features that a pubsub service must have to be selectable as default
+DEFAULT_PUBSUB_MIN_FEAT = {
+ 'http://jabber.org/protocol/pubsub#persistent-items',
+ 'http://jabber.org/protocol/pubsub#publish',
+ 'http://jabber.org/protocol/pubsub#retract-items',
+}
+
+class XEP_0060(object):
+    OPT_ACCESS_MODEL = "pubsub#access_model"
+    OPT_PERSIST_ITEMS = "pubsub#persist_items"
+    OPT_MAX_ITEMS = "pubsub#max_items"
+    OPT_DELIVER_PAYLOADS = "pubsub#deliver_payloads"
+    OPT_SEND_ITEM_SUBSCRIBE = "pubsub#send_item_subscribe"
+    OPT_NODE_TYPE = "pubsub#node_type"
+    OPT_SUBSCRIPTION_TYPE = "pubsub#subscription_type"
+    OPT_SUBSCRIPTION_DEPTH = "pubsub#subscription_depth"
+    OPT_ROSTER_GROUPS_ALLOWED = "pubsub#roster_groups_allowed"
+    OPT_PUBLISH_MODEL = "pubsub#publish_model"
+    OPT_OVERWRITE_POLICY = "pubsub#overwrite_policy"
+    ACCESS_OPEN = "open"
+    ACCESS_PRESENCE = "presence"
+    ACCESS_ROSTER = "roster"
+    ACCESS_PUBLISHER_ROSTER = "publisher-roster"
+    ACCESS_AUTHORIZE = "authorize"
+    ACCESS_WHITELIST = "whitelist"
+    PUBLISH_MODEL_PUBLISHERS = "publishers"
+    PUBLISH_MODEL_SUBSCRIBERS = "subscribers"
+    PUBLISH_MODEL_OPEN = "open"
+    OWPOL_ORIGINAL = "original_publisher"
+    OWPOL_ANY_PUB = "any_publisher"
+    ID_SINGLETON = "current"
+    EXTRA_PUBLISH_OPTIONS = "publish_options"
+    EXTRA_ON_PRECOND_NOT_MET = "on_precondition_not_met"
+    # extra disco needed for RSM, cf. XEP-0060 § 6.5.4
+    DISCO_RSM = "http://jabber.org/protocol/pubsub#rsm"
+
+    def __init__(self, host):
+        log.info(_("PubSub plugin initialization"))
+        self.host = host
+        self._rsm = host.plugins.get("XEP-0059")
+        self._mam = host.plugins.get("XEP-0313")
+        # per-node callbacks (key: node, value: dict mapping an event name to a
+        # priority-sorted list of (callback, priority) tuples)
+        self._node_cb = {}
+        self.rt_sessions = sat_defer.RTDeferredSessions()
+        host.bridge.add_method(
+            "ps_node_create",
+            ".plugin",
+            in_sign="ssa{ss}s",
+            out_sign="s",
+            method=self._create_node,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_node_configuration_get",
+            ".plugin",
+            in_sign="sss",
+            out_sign="a{ss}",
+            method=self._get_node_configuration,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_node_configuration_set",
+            ".plugin",
+            in_sign="ssa{ss}s",
+            out_sign="",
+            method=self._set_node_configuration,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_node_affiliations_get",
+            ".plugin",
+            in_sign="sss",
+            out_sign="a{ss}",
+            method=self._get_node_affiliations,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_node_affiliations_set",
+            ".plugin",
+            in_sign="ssa{ss}s",
+            out_sign="",
+            method=self._set_node_affiliations,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_node_subscriptions_get",
+            ".plugin",
+            in_sign="sss",
+            out_sign="a{ss}",
+            method=self._get_node_subscriptions,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_node_subscriptions_set",
+            ".plugin",
+            in_sign="ssa{ss}s",
+            out_sign="",
+            method=self._set_node_subscriptions,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_node_purge",
+            ".plugin",
+            in_sign="sss",
+            out_sign="",
+            method=self._purge_node,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_node_delete",
+            ".plugin",
+            in_sign="sss",
+            out_sign="",
+            method=self._delete_node,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_node_watch_add",
+            ".plugin",
+            in_sign="sss",
+            out_sign="",
+            method=self._addWatch,
+            async_=False,
+        )
+        host.bridge.add_method(
+            "ps_node_watch_remove",
+            ".plugin",
+            in_sign="sss",
+            out_sign="",
+            method=self._remove_watch,
+            async_=False,
+        )
+        host.bridge.add_method(
+            "ps_affiliations_get",
+            ".plugin",
+            in_sign="sss",
+            out_sign="a{ss}",
+            method=self._get_affiliations,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_items_get",
+            ".plugin",
+            in_sign="ssiassss",
+            out_sign="s",
+            method=self._get_items,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_item_send",
+            ".plugin",
+            in_sign="ssssss",
+            out_sign="s",
+            method=self._send_item,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_items_send",
+            ".plugin",
+            in_sign="ssasss",
+            out_sign="as",
+            method=self._send_items,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_item_retract",
+            ".plugin",
+            in_sign="sssbs",
+            out_sign="",
+            method=self._retract_item,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_items_retract",
+            ".plugin",
+            in_sign="ssasbs",
+            out_sign="",
+            method=self._retract_items,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_item_rename",
+            ".plugin",
+            in_sign="sssss",
+            out_sign="",
+            method=self._rename_item,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_subscribe",
+            ".plugin",
+            in_sign="ssss",
+            out_sign="s",
+            method=self._subscribe,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_unsubscribe",
+            ".plugin",
+            in_sign="sss",
+            out_sign="",
+            method=self._unsubscribe,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_subscriptions_get",
+            ".plugin",
+            in_sign="sss",
+            out_sign="s",
+            method=self._subscriptions,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_subscribe_to_many",
+            ".plugin",
+            in_sign="a(ss)sa{ss}s",
+            out_sign="s",
+            method=self._subscribe_to_many,
+        )
+        host.bridge.add_method(
+            "ps_get_subscribe_rt_result",
+            ".plugin",
+            in_sign="ss",
+            out_sign="(ua(sss))",
+            method=self._many_subscribe_rt_result,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "ps_get_from_many",
+            ".plugin",
+            in_sign="a(ss)iss",
+            out_sign="s",
+            method=self._get_from_many,
+        )
+        host.bridge.add_method(
+            "ps_get_from_many_rt_result",
+            ".plugin",
+            in_sign="ss",
+            out_sign="(ua(sssasa{ss}))",
+            method=self._get_from_many_rt_result,
+            async_=True,
+        )
+
+        #  high level observer method
+        host.bridge.add_signal(
+            "ps_event", ".plugin", signature="ssssss"
+        )  # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile
+
+        # low level observer method, used if service/node is in watching list (see ps_node_watch_* methods)
+        host.bridge.add_signal(
+            "ps_event_raw", ".plugin", signature="sssass"
+        )  # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile
+
+    def get_handler(self, client):
+        client.pubsub_client = SatPubSubClient(self.host, self)
+        return client.pubsub_client
+
+    async def profile_connected(self, client):
+        client.pubsub_watching = set()
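+        # couples of (service JID, node) added via ps_node_watch_add; events on those
+        # nodes are forwarded raw through the ps_event_raw bridge signal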
+        try:
+            client.pubsub_service = jid.JID(
+                self.host.memory.config_get("", "pubsub_service")
+            )
+        except RuntimeError:
+            log.info(
+                _(
+                    "Can't retrieve pubsub_service from conf, we'll use first one that "
+                    "we find"
+                )
+            )
+            pubsub_services = await self.host.find_service_entities(
+                client, "pubsub", "service"
+            )
+            for service_jid in pubsub_services:
+                infos = await self.host.memory.disco.get_infos(client, service_jid)
+                if not DEFAULT_PUBSUB_MIN_FEAT.issubset(infos.features):
+                    continue
+                names = {(n or "").lower() for n in infos.identities.values()}
+                if "libervia pubsub service" in names:
+                    # this is the name of Libervia's side project pubsub service, we know
+                    # that it is a suitable default pubsub service
+                    client.pubsub_service = service_jid
+                    break
+                categories = {(i[0] or "").lower() for i in infos.identities.keys()}
+                if "gateway" in categories or "gateway" in names:
+                    # we don't want to use a gateway as default pubsub service
+                    continue
+                if "jabber:iq:register" in infos.features:
+                    # may be present on gateways, and we don't want a service
+                    # where registration is needed
+                    continue
+                client.pubsub_service = service_jid
+                break
+            else:
+                client.pubsub_service = None
+            pubsub_service_str = (
+                client.pubsub_service.full() if client.pubsub_service else "PEP"
+            )
+            log.info(f"default pubsub service: {pubsub_service_str}")
+
+    def features_get(self, profile):
+        try:
+            client = self.host.get_client(profile)
+        except exceptions.ProfileNotSetError:
+            return {}
+        try:
+            return {
+                "service": client.pubsub_service.full()
+                if client.pubsub_service is not None
+                else ""
+            }
+        except AttributeError:
+            if not self.host.is_connected(profile):
+                log.debug("Profile is not connected, service is not checked yet")
+            else:
+                log.error("Service should be available!")
+            return {}
+
+    def parse_extra(self, extra):
+        """Parse extra dictionnary
+
+        used bridge's extra dictionnaries
+        @param extra(dict): extra data used to configure request
+        @return(Extra): filled Extra instance
+        """
+        if extra is None:
+            rsm_request = None
+            extra = {}
+        else:
+            # order-by
+            if C.KEY_ORDER_BY in extra:
+                # FIXME: we temporarily manage only one level of ordering
+                #        we need to switch to a fully serialised extra data
+                #        to be able to encode a whole ordered list
+                extra[C.KEY_ORDER_BY] = [extra.pop(C.KEY_ORDER_BY)]
+
+            # rsm
+            if self._rsm is None:
+                rsm_request = None
+            else:
+                rsm_request = self._rsm.parse_extra(extra)
+
+            # mam
+            if self._mam is None:
+                mam_request = None
+            else:
+                mam_request = self._mam.parse_extra(extra, with_rsm=False)
+
+            if mam_request is not None:
+                assert "mam" not in extra
+                extra["mam"] = mam_request
+
+        return Extra(rsm_request, extra)
+
+    def add_managed_node(
+        self,
+        node: str,
+        priority: int = 0,
+        **kwargs: Callable
+    ):
+        """Add a handler for a node
+
+        @param node: node to monitor
+            all nodes *prefixed* with this one will be triggered
+        @param priority: priority of the callback. Callbacks with higher priority will be
+            called first.
+        @param **kwargs: method(s) to call when the node is found
+            the method must be named after PubSub constants in lower case
+            and suffixed with "_cb"
+            e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE
+            note: only C.PS_ITEMS and C.PS_DELETE are implemented so far
+        """
+        assert node is not None
+        assert kwargs
+        callbacks = self._node_cb.setdefault(node, {})
+        for event, cb in kwargs.items():
+            event_name = event[:-3]
+            assert event_name in C.PS_EVENTS
+            cb_list = callbacks.setdefault(event_name, [])
+            cb_list.append((cb, priority))
+            cb_list.sort(key=lambda c: c[1], reverse=True)
+
+    def remove_managed_node(self, node, *args):
+        """Add a handler for a node
+
+        @param node(unicode): node to monitor
+        @param *args: callback(s) to remove
+        """
+        assert args
+        try:
+            registered_cb = self._node_cb[node]
+        except KeyError:
+            pass
+        else:
+            removed = False
+            for callback in args:
+                # iterate over a copy, as entries may be deleted during the loop
+                for event, cb_list in list(registered_cb.items()):
+                    to_remove = [cb for cb in cb_list if cb[0] == callback]
+                    if not to_remove:
+                        continue
+                    for cb in to_remove:
+                        cb_list.remove(cb)
+                    if not cb_list:
+                        del registered_cb[event]
+                    if not registered_cb:
+                        del self._node_cb[node]
+                    removed = True
+
+            if not removed:
+                log.error(
+                    f"Trying to remove nonexistent callback {callback} for node {node}"
+                )
+
+    # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
+    #     """Retrieve the name of the nodes that are accessible on the target service.
+
+    #     @param service (JID): target service
+    #     @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes)
+    #     @param profile (str): %(doc_profile)s
+    #     @return: deferred which fire a list of nodes
+    #     """
+    #     client = self.host.get_client(profile)
+    #     d = self.host.getDiscoItems(client, service, nodeIdentifier)
+    #     d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')])
+    #     return d
+
+    # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE):
+    #     """Retrieve the name of the nodes to which the profile is subscribed on the target service.
+
+    #     @param service (JID): target service
+    #     @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
+    #     @param filter_ (str): filter the result according to the given subscription type:
+    #         - None: do not filter
+    #         - 'pending': subscription has not been approved yet by the node owner
+    #         - 'unconfigured': subscription options have not been configured yet
+    #         - 'subscribed': subscription is complete
+    #     @param profile (str): %(doc_profile)s
+    #     @return: Deferred list[str]
+    #     """
+    #     d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
+    #     d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
+    #     return d
+
+    def _send_item(self, service, nodeIdentifier, payload, item_id=None, extra_ser="",
+                  profile_key=C.PROF_KEY_NONE):
+        client = self.host.get_client(profile_key)
+        service = None if not service else jid.JID(service)
+        payload = xml_tools.parse(payload)
+        extra = data_format.deserialise(extra_ser)
+        d = defer.ensureDeferred(self.send_item(
+            client, service, nodeIdentifier, payload, item_id or None, extra
+        ))
+        d.addCallback(lambda ret: ret or "")
+        return d
+
+    def _send_items(self, service, nodeIdentifier, items, extra_ser=None,
+                  profile_key=C.PROF_KEY_NONE):
+        client = self.host.get_client(profile_key)
+        service = None if not service else jid.JID(service)
+        try:
+            items = [xml_tools.parse(item) for item in items]
+        except Exception as e:
+            raise exceptions.DataError(_("Can't parse items: {msg}").format(
+                msg=e))
+        extra = data_format.deserialise(extra_ser)
+        return defer.ensureDeferred(self.send_items(
+            client, service, nodeIdentifier, items, extra=extra
+        ))
+
+    async def send_item(
+        self,
+        client: SatXMPPClient,
+        service: Union[jid.JID, None],
+        nodeIdentifier: str,
+        payload: domish.Element,
+        item_id: Optional[str] = None,
+        extra: Optional[Dict[str, Any]] = None
+    ) -> Optional[str]:
+        """High level method to send one item
+
+        @param service: service to send the item to, None to use PEP
+        @param nodeIdentifier: PubSub node to use
+        @param payload: payload of the item to send
+        @param item_id: id to use or None to create one
+        @param extra: extra options
+        @return: id of the created item
+        """
+        assert isinstance(payload, domish.Element)
+        item_elt = domish.Element((pubsub.NS_PUBSUB, 'item'))
+        if item_id is not None:
+            item_elt['id'] = item_id
+        item_elt.addChild(payload)
+        published_ids = await self.send_items(
+            client,
+            service,
+            nodeIdentifier,
+            [item_elt],
+            extra=extra
+        )
+        try:
+            return published_ids[0]
+        except IndexError:
+            return item_id
+
+    async def send_items(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        nodeIdentifier: str,
+        items: List[domish.Element],
+        sender: Optional[jid.JID] = None,
+        extra: Optional[Dict[str, Any]] = None
+    ) -> List[str]:
+        """High level method to send several items at once
+
+        @param service: service to send the item to
+            None to use PEP
+        @param nodeIdentifier: PubSub node to use
+        @param items: whole item elements to send,
+            "id" will be used if set
+        @param extra: extra options. Key can be:
+            - self.EXTRA_PUBLISH_OPTIONS(dict): publish options, cf. XEP-0060 § 7.1.5
+                the dict maps option name to value(s)
+            - self.EXTRA_ON_PRECOND_NOT_MET(str): policy to apply when publishing
+                fails due to an unmet precondition. Value can be:
+                * raise (default): raise the exception
+                * publish_without_options: re-publish without the publish-options.
+                    A warning will be logged showing that the publish-options could not
+                    be used
+        @return: ids of the created items
+        """
+        if extra is None:
+            extra = {}
+        if service is None:
+            service = client.jid.userhostJID()
+        parsed_items = []
+        for item in items:
+            if item.name != 'item':
+                raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml()))
+            item_id = item.getAttribute("id")
+            parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement()))
+        publish_options = extra.get(self.EXTRA_PUBLISH_OPTIONS)
+        try:
+            iq_result = await self.publish(
+                client, service, nodeIdentifier, parsed_items, options=publish_options,
+                sender=sender
+            )
+        except error.StanzaError as e:
+            if ((e.condition == 'conflict' and e.appCondition
+                 and e.appCondition.name == 'precondition-not-met'
+                 and publish_options is not None)):
+                # this usually happens when publish-options can't be set
+                policy = extra.get(self.EXTRA_ON_PRECOND_NOT_MET, 'raise')
+                if policy == 'raise':
+                    raise e
+                elif policy == 'publish_without_options':
+                    log.warning(_(
+                        "Can't use publish-options ({options}) on node {node}, "
+                        "re-publishing without them: {reason}").format(
+                            options=', '.join(f'{k} = {v}'
+                                    for k,v in publish_options.items()),
+                            node=nodeIdentifier,
+                            reason=e,
+                        )
+                    )
+                    iq_result = await self.publish(
+                        client, service, nodeIdentifier, parsed_items)
+                else:
+                    raise exceptions.InternalError(
+                        f"Invalid policy in extra's {self.EXTRA_ON_PRECOND_NOT_MET!r}: "
+                        f"{policy}"
+                    )
+            else:
+                raise e
+        try:
+            return [
+                item['id']
+                for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')
+            ]
+        except AttributeError:
+            return []
+
+    async def publish(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        nodeIdentifier: str,
+        items: Optional[List[domish.Element]] = None,
+        options: Optional[dict] = None,
+        sender: Optional[jid.JID] = None,
+        extra: Optional[Dict[str, Any]] = None
+    ) -> domish.Element:
+        """Publish pubsub items
+
+        @param sender: sender of the request,
+            client.jid will be used if not set
+        @param extra: extra data
+            not used directly by ``publish``, but may be used in triggers
+        @return: IQ result stanza
+        @trigger XEP-0060_publish: called just before publication.
+            if it returns False, extra must have an "iq_result_elt" key set with the
+            domish.Element to return.
+        """
+        if sender is None:
+            sender = client.jid
+        if extra is None:
+            extra = {}
+        if not await self.host.trigger.async_point(
+            "XEP-0060_publish", client, service, nodeIdentifier, items, options, sender,
+            extra
+        ):
+            return extra["iq_result_elt"]
+        iq_result_elt = await client.pubsub_client.publish(
+            service, nodeIdentifier, items, sender,
+            options=options
+        )
+        return iq_result_elt
+
+    def _unwrap_mam_message(self, message_elt):
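+        # the wanted <item/> is nested in the MAM wrapping elements
+        # (<result/>/<forwarded/>/<message/>/<event/>/<items/>/<item/>);
+        # walk down that chain, one level per step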
+        try:
+            item_elt = reduce(
+                lambda elt, ns_name: next(elt.elements(*ns_name)),
+                (message_elt,
+                 (mam.NS_MAM, "result"),
+                 (C.NS_FORWARD, "forwarded"),
+                 (C.NS_CLIENT, "message"),
+                 ("http://jabber.org/protocol/pubsub#event", "event"),
+                 ("http://jabber.org/protocol/pubsub#event", "items"),
+                 ("http://jabber.org/protocol/pubsub#event", "item"),
+                ))
+        except StopIteration:
+            raise exceptions.DataError("Can't find Item in MAM message element")
+        return item_elt
+
+    def serialise_items(self, items_data):
+        items, metadata = items_data
+        metadata['items'] = items
+        return data_format.serialise(metadata)
+
+    def _get_items(self, service="", node="", max_items=10, item_ids=None, sub_id=None,
+                  extra="", profile_key=C.PROF_KEY_NONE):
+        """Get items from pubsub node
+
+        @param max_items(int): maximum number of items to get, C.NO_LIMIT for no limit
+        """
+        client = self.host.get_client(profile_key)
+        service = jid.JID(service) if service else None
+        max_items = None if max_items == C.NO_LIMIT else max_items
+        extra = self.parse_extra(data_format.deserialise(extra))
+        d = defer.ensureDeferred(self.get_items(
+            client,
+            service,
+            node,
+            max_items,
+            item_ids,
+            sub_id or None,
+            extra.rsm_request,
+            extra.extra,
+        ))
+        d.addCallback(self.trans_items_data)
+        d.addCallback(self.serialise_items)
+        return d
+
+    async def get_items(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        max_items: Optional[int] = None,
+        item_ids: Optional[List[str]] = None,
+        sub_id: Optional[str] = None,
+        rsm_request: Optional[rsm.RSMRequest] = None,
+        extra: Optional[dict] = None
+    ) -> Tuple[List[dict], dict]:
+        """Retrieve pubsub items from a node.
+
+        @param service (JID, None): pubsub service.
+        @param node (str): node id.
+        @param max_items (int): optional limit on the number of retrieved items.
+        @param item_ids (list[str]): identifiers of the items to be retrieved (can't be
+             used with rsm_request). If requested items don't exist, they won't be
+             returned, meaning that we can have an empty list as result (NotFound
+             exception is NOT raised).
+        @param sub_id (str): optional subscription identifier.
+        @param rsm_request (rsm.RSMRequest): RSM request data
+        @return: a deferred couple (list[dict], dict) containing:
+            - list of items
+            - metadata with the following keys:
+                - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index
+                    value of RSMResponse
+                - service, node: service and node used
+        """
+        if item_ids and max_items is not None:
+            max_items = None
+        if rsm_request and item_ids:
+            raise ValueError("items_id can't be used with rsm")
+        if extra is None:
+            extra = {}
+        cont, ret = await self.host.trigger.async_return_point(
+            "XEP-0060_getItems", client, service, node, max_items, item_ids, sub_id,
+            rsm_request, extra
+        )
+        if not cont:
+            return ret
+        try:
+            mam_query = extra["mam"]
+        except KeyError:
+            d = defer.ensureDeferred(client.pubsub_client.items(
+                service = service,
+                nodeIdentifier = node,
+                maxItems = max_items,
+                subscriptionIdentifier = sub_id,
+                sender = None,
+                itemIdentifiers = item_ids,
+                orderBy = extra.get(C.KEY_ORDER_BY),
+                rsm_request = rsm_request,
+                extra = extra
+            ))
+            # we have no MAM data here, so we add None
+            d.addErrback(sat_defer.stanza_2_not_found)
+            d.addTimeout(TIMEOUT, reactor)
+            items, rsm_response = await d
+            mam_response = None
+        else:
+            # if mam is requested, we have to do a totally different query
+            if self._mam is None:
+                raise exceptions.NotFound("MAM (XEP-0313) plugin is not available")
+            if max_items is not None:
+                raise exceptions.DataError("max_items parameter can't be used with MAM")
+            if item_ids:
+                raise exceptions.DataError("items_ids parameter can't be used with MAM")
+            if mam_query.node is None:
+                mam_query.node = node
+            elif mam_query.node != node:
+                raise exceptions.DataError(
+                    "MAM query node is incoherent with get_items's node"
+                )
+            if mam_query.rsm is None:
+                mam_query.rsm = rsm_request
+            else:
+                if mam_query.rsm != rsm_request:
+                    raise exceptions.DataError(
+                        "Conflict between RSM request and MAM's RSM request"
+                    )
+            items, rsm_response, mam_response = await self._mam.get_archives(
+                client, mam_query, service, self._unwrap_mam_message
+            )
+
+        try:
+            subscribe = C.bool(extra["subscribe"])
+        except KeyError:
+            subscribe = False
+
+        if subscribe:
+            try:
+                await self.subscribe(client, service, node)
+            except error.StanzaError as e:
+                log.warning(
+                    f"Could not subscribe to node {node} on service {service}: {e}"
+                )
+
+        # TODO: handle mam_response
+        service_jid = service if service else client.jid.userhostJID()
+        metadata = {
+            "service": service_jid,
+            "node": node,
+            "uri": self.get_node_uri(service_jid, node),
+        }
+        if mam_response is not None:
+            # mam_response is a dict with "complete" and "stable" keys
+            # we can put them directly in metadata
+            metadata.update(mam_response)
+        if rsm_request is not None and rsm_response is not None:
+            metadata['rsm'] = rsm_response.toDict()
+            if mam_response is None:
+                index = rsm_response.index
+                count = rsm_response.count
+                if index is None or count is None:
+                    # we don't have enough information to know if the data is complete
+                    # or not
+                    metadata["complete"] = None
+                else:
+                    # normally we have a strict equality here but XEP-0059 states
+                    # that index MAY be approximative, so just in case…
+                    metadata["complete"] = index + len(items) >= count
+        # encrypted metadata can be added by plugins in XEP-0060_items trigger
+        if "encrypted" in extra:
+            metadata["encrypted"] = extra["encrypted"]
+
+        return (items, metadata)
+
+    # @defer.inlineCallbacks
+    # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE):
+    #     """Massively retrieve pubsub items from many nodes.
+    #     @param service (JID): target service.
+    #     @param data (dict): dictionnary binding some arbitrary keys to the node identifiers.
+    #     @param max_items (int): optional limit on the number of retrieved items *per node*.
+    #     @param sub_id (str): optional subscription identifier.
+    #     @param rsm (dict): RSM request data
+    #     @param profile_key (str): %(doc_profile_key)s
+    #     @return: a deferred dict with:
+    #         - key: a value in (a subset of) data.keys()
+    #         - couple (list[dict], dict) containing:
+    #             - list of items
+    #             - RSM response data
+    #     """
+    #     client = self.host.get_client(profile_key)
+    #     found_nodes = yield self.listNodes(service, profile=client.profile)
+    #     d_dict = {}
+    #     for publisher, node in data.items():
+    #         if node not in found_nodes:
+    #             log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
+    #             continue  # avoid pubsub "item-not-found" error
+    #         d_dict[publisher] = self.get_items(service, node, max_items, None, sub_id, rsm, client.profile)
+    #     defer.returnValue(d_dict)
+
+    def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None,
+                   profile_key=C.PROF_KEY_NONE):
+        client = self.host.get_client(profile_key)
+        return client.pubsub_client.getOptions(
+            service, nodeIdentifier, subscriber, subscriptionIdentifier
+        )
+
+    def setOptions(self, service, nodeIdentifier, subscriber, options,
+                   subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
+        client = self.host.get_client(profile_key)
+        return client.pubsub_client.setOptions(
+            service, nodeIdentifier, subscriber, options, subscriptionIdentifier
+        )
+
+    def _create_node(self, service_s, nodeIdentifier, options, profile_key):
+        client = self.host.get_client(profile_key)
+        return self.createNode(
+            client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
+        )
+
+    def createNode(
+        self,
+        client: SatXMPPClient,
+        service: jid.JID,
+        nodeIdentifier: Optional[str] = None,
+        options: Optional[Dict[str, str]] = None
+    ) -> str:
+        """Create a new node
+
+        @param service: PubSub service,
+        @param nodeIdentifier: node name, use None to create an instant node (identifier
+            will be returned by this method)
+        @param options: node configuration options
+        @return: identifier of the created node (may be different from requested name)
+        """
+        # TODO: if pubsub service doesn't handle publish-options, configure it in a second step
+        return client.pubsub_client.createNode(service, nodeIdentifier, options)
+
+    @defer.inlineCallbacks
+    def create_if_new_node(self, client, service, nodeIdentifier, options=None):
+        """Helper method similar to createNode, but will not fail in case of conflict"""
+        try:
+            yield self.createNode(client, service, nodeIdentifier, options)
+        except error.StanzaError as e:
+            if e.condition == "conflict":
+                pass
+            else:
+                raise e
+
+    def _get_node_configuration(self, service_s, nodeIdentifier, profile_key):
+        client = self.host.get_client(profile_key)
+        d = self.getConfiguration(
+            client, jid.JID(service_s) if service_s else None, nodeIdentifier
+        )
+
+        def serialize(form):
+            # FIXME: a better, more generic data form serialisation should be available in SàT
+            return {f.var: str(f.value) for f in list(form.fields.values())}
+
+        d.addCallback(serialize)
+        return d
+
+    def getConfiguration(self, client, service, nodeIdentifier):
+        request = pubsub.PubSubRequest("configureGet")
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+
+        def cb(iq):
+            form = data_form.findForm(iq.pubsub.configure, pubsub.NS_PUBSUB_NODE_CONFIG)
+            form.typeCheck()
+            return form
+
+        d = request.send(client.xmlstream)
+        d.addCallback(cb)
+        return d
+
+    def make_configuration_form(self, options: dict) -> data_form.Form:
+        """Build a configuration form"""
+        form = data_form.Form(
+            formType="submit", formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG
+        )
+        form.makeFields(options)
+        return form
+
+    def _set_node_configuration(self, service_s, nodeIdentifier, options, profile_key):
+        client = self.host.get_client(profile_key)
+        d = self.setConfiguration(
+            client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
+        )
+        return d
+
+    def setConfiguration(self, client, service, nodeIdentifier, options):
+        request = pubsub.PubSubRequest("configureSet")
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+
+        form = self.make_configuration_form(options)
+        request.options = form
+
+        d = request.send(client.xmlstream)
+        return d
+
+    def _get_affiliations(self, service_s, nodeIdentifier, profile_key):
+        client = self.host.get_client(profile_key)
+        d = self.get_affiliations(
+            client, jid.JID(service_s) if service_s else None, nodeIdentifier or None
+        )
+        return d
+
+    def get_affiliations(self, client, service, nodeIdentifier=None):
+        """Retrieve affiliations of an entity
+
+        @param nodeIdentifier(unicode, None): node to get affiliation from
+            None to get all nodes affiliations for this service
+        """
+        request = pubsub.PubSubRequest("affiliations")
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+
+        def cb(iq_elt):
+            try:
+                affiliations_elt = next(
+                    iq_elt.pubsub.elements(pubsub.NS_PUBSUB, "affiliations")
+                )
+            except StopIteration:
+                raise ValueError(
+                    _("Invalid result: missing <affiliations> element: {}").format(
+                        iq_elt.toXml()
+                    )
+                )
+            try:
+                return {
+                    e["node"]: e["affiliation"]
+                    for e in affiliations_elt.elements(pubsub.NS_PUBSUB, "affiliation")
+                }
+            except KeyError:
+                raise ValueError(
+                    _("Invalid result: bad <affiliation> element: {}").format(
+                        iq_elt.toXml()
+                    )
+                )
+
+        d = request.send(client.xmlstream)
+        d.addCallback(cb)
+        return d
+
+    def _get_node_affiliations(self, service_s, nodeIdentifier, profile_key):
+        client = self.host.get_client(profile_key)
+        d = self.get_node_affiliations(
+            client, jid.JID(service_s) if service_s else None, nodeIdentifier
+        )
+        d.addCallback(
+            lambda affiliations: {j.full(): a for j, a in affiliations.items()}
+        )
+        return d
+
+    def get_node_affiliations(self, client, service, nodeIdentifier):
+        """Retrieve affiliations of a node owned by profile"""
+        request = pubsub.PubSubRequest("affiliationsGet")
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+
+        def cb(iq_elt):
+            try:
+                affiliations_elt = next(
+                    iq_elt.pubsub.elements(pubsub.NS_PUBSUB_OWNER, "affiliations")
+                )
+            except StopIteration:
+                raise ValueError(
+                    _("Invalid result: missing <affiliations> element: {}").format(
+                        iq_elt.toXml()
+                    )
+                )
+            try:
+                return {
+                    jid.JID(e["jid"]): e["affiliation"]
+                    for e in affiliations_elt.elements(
+                        (pubsub.NS_PUBSUB_OWNER, "affiliation")
+                    )
+                }
+            except KeyError:
+                raise ValueError(
+                    _("Invalid result: bad <affiliation> element: {}").format(
+                        iq_elt.toXml()
+                    )
+                )
+
+        d = request.send(client.xmlstream)
+        d.addCallback(cb)
+        return d
+
+    def _set_node_affiliations(
+        self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE
+    ):
+        client = self.host.get_client(profile_key)
+        affiliations = {
+            jid.JID(jid_): affiliation for jid_, affiliation in affiliations.items()
+        }
+        d = self.set_node_affiliations(
+            client,
+            jid.JID(service_s) if service_s else None,
+            nodeIdentifier,
+            affiliations,
+        )
+        return d
+
+    def set_node_affiliations(self, client, service, nodeIdentifier, affiliations):
+        """Update affiliations of a node owned by profile
+
+        @param affiliations(dict[jid.JID, unicode]): affiliations to set
+            check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations
+        """
+        request = pubsub.PubSubRequest("affiliationsSet")
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        request.affiliations = affiliations
+        d = request.send(client.xmlstream)
+        return d
+
+    def _purge_node(self, service_s, nodeIdentifier, profile_key):
+        client = self.host.get_client(profile_key)
+        return self.purge_node(
+            client, jid.JID(service_s) if service_s else None, nodeIdentifier
+        )
+
+    def purge_node(self, client, service, nodeIdentifier):
+        return client.pubsub_client.purge_node(service, nodeIdentifier)
+
+    def _delete_node(self, service_s, nodeIdentifier, profile_key):
+        client = self.host.get_client(profile_key)
+        return self.deleteNode(
+            client, jid.JID(service_s) if service_s else None, nodeIdentifier
+        )
+
+    def deleteNode(
+        self,
+        client: SatXMPPClient,
+        service: jid.JID,
+        nodeIdentifier: str
+    ) -> defer.Deferred:
+        return client.pubsub_client.deleteNode(service, nodeIdentifier)
+
+    def _addWatch(self, service_s, node, profile_key):
+        """watch modifications on a node
+
+        This method should only be called from bridge
+        """
+        client = self.host.get_client(profile_key)
+        service = jid.JID(service_s) if service_s else client.jid.userhostJID()
+        client.pubsub_watching.add((service, node))
+
+    def _remove_watch(self, service_s, node, profile_key):
+        """remove a node watch
+
+        This method should only be called from bridge
+        """
+        client = self.host.get_client(profile_key)
+        service = jid.JID(service_s) if service_s else client.jid.userhostJID()
+        client.pubsub_watching.remove((service, node))
+
+    def _retract_item(
+        self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key
+    ):
+        return self._retract_items(
+            service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key
+        )
+
+    def _retract_items(
+        self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key
+    ):
+        client = self.host.get_client(profile_key)
+        return self.retract_items(
+            client,
+            jid.JID(service_s) if service_s else None,
+            nodeIdentifier,
+            itemIdentifiers,
+            notify,
+        )
+
+    def retract_items(
+        self,
+        client: SatXMPPClient,
+        service: jid.JID,
+        nodeIdentifier: str,
+        itemIdentifiers: Iterable[str],
+        notify: bool = True,
+    ) -> defer.Deferred:
+        return client.pubsub_client.retractItems(
+            service, nodeIdentifier, itemIdentifiers, notify=notify
+        )
+
+    def _rename_item(
+        self,
+        service,
+        node,
+        item_id,
+        new_id,
+        profile_key=C.PROF_KEY_NONE,
+    ):
+        client = self.host.get_client(profile_key)
+        service = jid.JID(service) if service else None
+        return defer.ensureDeferred(self.rename_item(
+            client, service, node, item_id, new_id
+        ))
+
+    async def rename_item(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        node: str,
+        item_id: str,
+        new_id: str
+    ) -> None:
+        """Rename an item by recreating it then deleting it
+
+        we have to recreate then delete because there is currently no rename operation
+        with PubSub
+        """
+        if not item_id or not new_id:
+            raise ValueError("item_id and new_id must not be empty")
+        # retract must be done last, so if something goes wrong, the exception will stop
+        # the workflow and no accidental delete should happen
+        item_elt = (await self.get_items(client, service, node, item_ids=[item_id]))[0][0]
+        await self.send_item(client, service, node, item_elt.firstChildElement(), new_id)
+        await self.retract_items(client, service, node, [item_id])
+
+    def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
+        client = self.host.get_client(profile_key)
+        service = None if not service else jid.JID(service)
+        d = defer.ensureDeferred(
+            self.subscribe(
+                client,
+                service,
+                nodeIdentifier,
+                options=data_format.deserialise(options)
+            )
+        )
+        d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "")
+        return d
+
+    async def subscribe(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        nodeIdentifier: str,
+        sub_jid: Optional[jid.JID] = None,
+        options: Optional[dict] = None
+    ) -> pubsub.Subscription:
+        # TODO: reimplement a subscription cache, checking that we have no subscription before trying to subscribe
+        if service is None:
+            service = client.jid.userhostJID()
+        cont, trigger_sub = await self.host.trigger.async_return_point(
+            "XEP-0060_subscribe", client, service, nodeIdentifier, sub_jid, options,
+        )
+        if not cont:
+            return trigger_sub
+        try:
+            subscription = await client.pubsub_client.subscribe(
+                service, nodeIdentifier, sub_jid or client.jid.userhostJID(),
+                options=options, sender=client.jid.userhostJID()
+            )
+        except error.StanzaError as e:
+            if e.condition == 'item-not-found':
+                raise exceptions.NotFound(e.text or e.condition)
+            else:
+                raise e
+        return subscription
+
+    def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
+        client = self.host.get_client(profile_key)
+        service = None if not service else jid.JID(service)
+        return defer.ensureDeferred(self.unsubscribe(client, service, nodeIdentifier))
+
+    async def unsubscribe(
+        self,
+        client: SatXMPPEntity,
+        service: jid.JID,
+        nodeIdentifier: str,
+        sub_jid: Optional[jid.JID] = None,
+        subscriptionIdentifier: Optional[str] = None,
+        sender: Optional[jid.JID] = None,
+    ) -> None:
+        if not await self.host.trigger.async_point(
+            "XEP-0060_unsubscribe", client, service, nodeIdentifier, sub_jid,
+            subscriptionIdentifier, sender
+        ):
+            return
+        try:
+            await client.pubsub_client.unsubscribe(
+                service,
+                nodeIdentifier,
+                sub_jid or client.jid.userhostJID(),
+                subscriptionIdentifier,
+                sender,
+            )
+        except error.StanzaError as e:
+            try:
+                next(e.getElement().elements(pubsub.NS_PUBSUB_ERRORS, "not-subscribed"))
+            except StopIteration:
+                raise e
+            else:
+                log.info(
+                    f"{sender.full() if sender else client.jid.full()} was not "
+                    f"subscribed to node {nodeIdentifier!s} at {service.full()}"
+                )
+
+    @utils.ensure_deferred
+    async def _subscriptions(
+        self,
+        service="",
+        nodeIdentifier="",
+        profile_key=C.PROF_KEY_NONE
+    ) -> str:
+        client = self.host.get_client(profile_key)
+        service = None if not service else jid.JID(service)
+        subs = await self.subscriptions(client, service, nodeIdentifier or None)
+        return data_format.serialise(subs)
+
+    async def subscriptions(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID] = None,
+        node: Optional[str] = None
+    ) -> List[Dict[str, Union[str, bool]]]:
+        """Retrieve subscriptions from a service
+
+        @param service(jid.JID): PubSub service
+        @param node(unicode, None): node to check
+            None to get all subscriptions
+        """
+        cont, ret = await self.host.trigger.async_return_point(
+            "XEP-0060_subscriptions", client, service, node
+        )
+        if not cont:
+            return ret
+        subs = await client.pubsub_client.subscriptions(service, node)
+        ret = []
+        for sub in subs:
+            sub_dict = {
+                "service": service.host if service else client.jid.host,
+                "node": sub.nodeIdentifier,
+                "subscriber": sub.subscriber.full(),
+                "state": sub.state,
+            }
+            if sub.subscriptionIdentifier is not None:
+                sub_dict["id"] = sub.subscriptionIdentifier
+            ret.append(sub_dict)
+        return ret
+
+    ## misc tools ##
+
+    def get_node_uri(self, service, node, item=None):
+        """Return XMPP URI of a PubSub node
+
+        @param service(jid.JID): PubSub service
+        @param node(unicode): node
+        @return (unicode): URI of the node
+        """
+        # FIXME: deprecated, use sat.tools.common.uri instead
+        assert service is not None
+        # XXX: urllib.urlencode use "&" to separate value, while XMPP URL (cf. RFC 5122)
+        #      use ";" as a separator. So if more than one value is used in query_data,
+        #      urlencode MUST NOT BE USED.
+        query_data = [("node", node.encode("utf-8"))]
+        if item is not None:
+            query_data.append(("item", item.encode("utf-8")))
+        return "xmpp:{service}?;{query}".format(
+            service=service.userhost(), query=urllib.parse.urlencode(query_data)
+        )
+
+    ## methods to manage several stanzas/jids at once ##
+
+    # generic #
+
+    def get_rt_results(
+        self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE
+    ):
+        return self.rt_sessions.get_results(session_id, on_success, on_error, profile)
+
+    def trans_items_data(self, items_data, item_cb=lambda item: item.toXml()):
+        """Helper method to transform result from [get_items]
+
+        the items_data must be a tuple(list[domish.Element], dict[unicode, unicode])
+        as returned by [get_items].
+        @param items_data(tuple): tuple returned by [get_items]
+        @param item_cb(callable): method to transform each item
+        @return (tuple): a serialised form ready to go through the bridge
+        """
+        items, metadata = items_data
+        items = [item_cb(item) for item in items]
+
+        return (items, metadata)
+
+    def trans_items_data_d(self, items_data, item_cb):
+        """Helper method to transform result from [get_items], deferred version
+
+        the items_data must be a tuple(list[domish.Element], dict[unicode, unicode])
+        as returned by [get_items]. metadata values are then cast to unicode and
+        each item is passed to item_cb.
+        An errback is added to item_cb, and when it fires the item is filtered out of
+            the final items
+        @param items_data(tuple): tuple returned by [get_items]
+        @param item_cb(callable): method to transform each item (must return a deferred)
+        @return (tuple): a deferred which fires a tuple which can be serialised to go
+            through the bridge
+        """
+        items, metadata = items_data
+
+        def eb(failure_):
+            log.warning(f"Error while parsing item: {failure_.value}")
+
+        d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items])
+        d.addCallback(lambda parsed_items: (
+            [i for i in parsed_items if i is not None],
+            metadata
+        ))
+        return d
+
+    def ser_d_list(self, results, failure_result=None):
+        """Serialise a DeferredList result
+
+        @param results: DeferredList results
+        @param failure_result: value to use as value for failed Deferred
+            (default: empty tuple)
+        @return (list): list with:
+            - failure: empty in case of success, else error message
+            - result
+        """
+        if failure_result is None:
+            failure_result = ()
+        return [
+            ("", result)
+            if success
+            else (str(result.result) or UNSPECIFIED, failure_result)
+            for success, result in results
+        ]
+
+    # subscribe #
+
+    @utils.ensure_deferred
+    async def _get_node_subscriptions(
+        self,
+        service: str,
+        node: str,
+        profile_key: str
+    ) -> Dict[str, str]:
+        client = self.host.get_client(profile_key)
+        subs = await self.get_node_subscriptions(
+            client, jid.JID(service) if service else None, node
+        )
+        return {j.full(): a for j, a in subs.items()}
+
+    async def get_node_subscriptions(
+        self,
+        client: SatXMPPEntity,
+        service: Optional[jid.JID],
+        nodeIdentifier: str
+    ) -> Dict[jid.JID, str]:
+        """Retrieve subscriptions to a node
+
+        @param nodeIdentifier(unicode): node to get subscriptions from
+        """
+        if not nodeIdentifier:
+            raise exceptions.DataError("node identifier can't be empty")
+        request = pubsub.PubSubRequest("subscriptionsGet")
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+
+        iq_elt = await request.send(client.xmlstream)
+        try:
+            subscriptions_elt = next(
+                iq_elt.pubsub.elements(pubsub.NS_PUBSUB_OWNER, "subscriptions")
+            )
+        except StopIteration:
+            raise ValueError(
+                _("Invalid result: missing <subscriptions> element: {}").format(
+                    iq_elt.toXml()
+                )
+            )
+        except AttributeError as e:
+            raise ValueError(_("Invalid result: {}").format(e))
+        try:
+            return {
+                jid.JID(s["jid"]): s["subscription"]
+                for s in subscriptions_elt.elements(
+                    (pubsub.NS_PUBSUB, "subscription")
+                )
+            }
+        except KeyError:
+            raise ValueError(
+                _("Invalid result: bad <subscription> element: {}").format(
+                    iq_elt.toXml()
+                )
+            )
+
+    def _set_node_subscriptions(
+        self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE
+    ):
+        client = self.host.get_client(profile_key)
+        subscriptions = {
+            jid.JID(jid_): subscription
+            for jid_, subscription in subscriptions.items()
+        }
+        d = self.set_node_subscriptions(
+            client,
+            jid.JID(service_s) if service_s else None,
+            nodeIdentifier,
+            subscriptions,
+        )
+        return d
+
+    def set_node_subscriptions(self, client, service, nodeIdentifier, subscriptions):
+        """Set or update subscriptions of a node owned by profile
+
+        @param subscriptions(dict[jid.JID, unicode]): subscriptions to set
+            check https://xmpp.org/extensions/xep-0060.html#substates for a list of
+            possible subscription states
+        """
+        request = pubsub.PubSubRequest("subscriptionsSet")
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        request.subscriptions = {
+            pubsub.Subscription(nodeIdentifier, jid_, state)
+            for jid_, state in subscriptions.items()
+        }
+        d = request.send(client.xmlstream)
+        return d
+
+    def _many_subscribe_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
+        """Get real-time results for subcribeToManu session
+
+        @param session_id: id of the real-time deferred session
+        @param return (tuple): (remaining, results) where:
+            - remaining is the number of still expected results
+            - results is a list of tuple(unicode, unicode, bool, unicode) with:
+                - service: pubsub service
+                - and node: pubsub node
+                - failure(unicode): empty string in case of success, error message else
+        @param profile_key: %(doc_profile_key)s
+        """
+        profile = self.host.get_client(profile_key).profile
+        d = self.rt_sessions.get_results(
+            session_id,
+            on_success=lambda result: "",
+            on_error=lambda failure: str(failure.value),
+            profile=profile,
+        )
+        # we need to convert jid.JID to unicode with full() to serialise it for the bridge
+        d.addCallback(
+            lambda ret: (
+                ret[0],
+                [
+                    (service.full(), node, "" if success else failure or UNSPECIFIED)
+                    for (service, node), (success, failure) in ret[1].items()
+                ],
+            )
+        )
+        return d
+
+    def _subscribe_to_many(
+        self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE
+    ):
+        return self.subscribe_to_many(
+            [(jid.JID(service), str(node)) for service, node in node_data],
+            jid.JID(subscriber),
+            options,
+            profile_key,
+        )
+
+    def subscribe_to_many(
+        self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE
+    ):
+        """Subscribe to several nodes at once.
+
+        @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
+            - service (jid.JID) is the pubsub service
+            - node (unicode) is the node to subscribe to
+        @param subscriber (jid.JID): JID to use as the subscriber
+        @param options (dict): subscription options
+        @param profile_key (str): %(doc_profile_key)s
+        @return (str): RT Deferred session id
+        """
+        client = self.host.get_client(profile_key)
+        deferreds = {}
+        for service, node in node_data:
+            deferreds[(service, node)] = defer.ensureDeferred(
+                client.pubsub_client.subscribe(
+                    service, node, subscriber, options=options
+                )
+            )
+        return self.rt_sessions.new_session(deferreds, client.profile)
+        # found_nodes = yield self.listNodes(service, profile=client.profile)
+        # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
+        # d_list = []
+        # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
+        #     if nodeIdentifier not in found_nodes:
+        #         log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
+        #         continue  # avoid sat-pubsub "SubscriptionExists" error
+        #     d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options))
+        # defer.returnValue(d_list)
+
+    # get #
+
+    def _get_from_many_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
+        """Get real-time results for get_from_many session
+
+        @param session_id: id of the real-time deferred session
+        @param profile_key: %(doc_profile_key)s
+        @return (tuple): (remaining, results) where:
+            - remaining is the number of still expected results
+            - results is a list of tuple with
+                - service (unicode): pubsub service
+                - node (unicode): pubsub node
+                - failure (unicode): empty string in case of success, else the error message
+                - items (list[unicode]): raw XML of items
+                - metadata(dict): serialised metadata
+        """
+        profile = self.host.get_client(profile_key).profile
+        d = self.rt_sessions.get_results(
+            session_id,
+            on_success=lambda result: ("", self.trans_items_data(result)),
+            on_error=lambda failure: (str(failure.value) or UNSPECIFIED, ([], {})),
+            profile=profile,
+        )
+        d.addCallback(
+            lambda ret: (
+                ret[0],
+                [
+                    (service.full(), node, failure, items, metadata)
+                    for (service, node), (success, (failure, (items, metadata))) in ret[
+                        1
+                    ].items()
+                ],
+            )
+        )
+        return d
+
+    def _get_from_many(
+        self, node_data, max_item=10, extra="", profile_key=C.PROF_KEY_NONE
+    ):
+        """
+        @param max_item(int): maximum number of items to get, C.NO_LIMIT for no limit
+        """
+        max_item = None if max_item == C.NO_LIMIT else max_item
+        extra = self.parse_extra(data_format.deserialise(extra))
+        return self.get_from_many(
+            [(jid.JID(service), str(node)) for service, node in node_data],
+            max_item,
+            extra.rsm_request,
+            extra.extra,
+            profile_key,
+        )
+
+    def get_from_many(self, node_data, max_item=None, rsm_request=None, extra=None,
+                    profile_key=C.PROF_KEY_NONE):
+        """Get items from many nodes at once
+
+        @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
+            - service (jid.JID) is the pubsub service
+            - node (unicode) is the node to get items from
+        @param max_item (int): optional limit on the number of retrieved items
+        @param rsm_request (RSMRequest): RSM request data
+        @param extra (dict): extra data
+        @param profile_key (unicode): %(doc_profile_key)s
+        @return (str): RT Deferred session id
+        """
+        client = self.host.get_client(profile_key)
+        deferreds = {}
+        for service, node in node_data:
+            deferreds[(service, node)] = defer.ensureDeferred(self.get_items(
+                client, service, node, max_item, rsm_request=rsm_request, extra=extra
+            ))
+        return self.rt_sessions.new_session(deferreds, client.profile)
+
+
+@implementer(disco.IDisco)
+class SatPubSubClient(rsm.PubSubClient):
+
+    def __init__(self, host, parent_plugin):
+        self.host = host
+        self.parent_plugin = parent_plugin
+        rsm.PubSubClient.__init__(self)
+
+    def connectionInitialized(self):
+        rsm.PubSubClient.connectionInitialized(self)
+
+    async def items(
+        self,
+        service: Optional[jid.JID],
+        nodeIdentifier: str,
+        maxItems: Optional[int] = None,
+        subscriptionIdentifier: Optional[str] = None,
+        sender: Optional[jid.JID] = None,
+        itemIdentifiers: Optional[Set[str]] = None,
+        orderBy: Optional[List[str]] = None,
+        rsm_request: Optional[rsm.RSMRequest] = None,
+        extra: Optional[Dict[str, Any]] = None,
+    ):
+        if extra is None:
+            extra = {}
+        items, rsm_response = await super().items(
+            service, nodeIdentifier, maxItems, subscriptionIdentifier, sender,
+            itemIdentifiers, orderBy, rsm_request
+        )
+        # items must be returned, thus this async point can't stop the workflow (but it
+        # can modify returned items)
+        await self.host.trigger.async_point(
+            "XEP-0060_items", self.parent, service, nodeIdentifier, items, rsm_response,
+            extra
+        )
+        return items, rsm_response
+
+    def _get_node_callbacks(self, node, event):
+        """Generate callbacks from given node and event
+
+        @param node(unicode): node used for the item
+            any registered node which is a prefix of this node will match
+        @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE
+        @return (iterator[callable]): callbacks for this node/event
+        """
+        for registered_node, callbacks_dict in self.parent_plugin._node_cb.items():
+            if not node.startswith(registered_node):
+                continue
+            try:
+                for callback_data in callbacks_dict[event]:
+                    yield callback_data[0]
+            except KeyError:
+                continue
+
+    async def _call_node_callbacks(self, client, event: pubsub.ItemsEvent) -> None:
+        """Call sequencially event callbacks of a node
+
+        Callbacks are called sequencially and not in parallel to be sure to respect
+        priority (notably for plugin needing to get old items before they are modified or
+        deleted from cache).
+        """
+        for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_ITEMS):
+            try:
+                await utils.as_deferred(callback, client, event)
+            except Exception as e:
+                log.error(
+                    f"Error while running items event callback {callback}: {e}"
+                )
+
+    def itemsReceived(self, event):
+        log.debug("Pubsub items received")
+        client = self.parent
+        defer.ensureDeferred(self._call_node_callbacks(client, event))
+        if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
+            raw_items = [i.toXml() for i in event.items]
+            self.host.bridge.ps_event_raw(
+                event.sender.full(),
+                event.nodeIdentifier,
+                C.PS_ITEMS,
+                raw_items,
+                client.profile,
+            )
+
+    def deleteReceived(self, event):
+        log.debug(("Publish node deleted"))
+        for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_DELETE):
+            d = utils.as_deferred(callback, self.parent, event)
+            d.addErrback(lambda f: log.error(
+                f"Error while running delete event callback {callback}: {f}"
+            ))
+        client = self.parent
+        if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
+            self.host.bridge.ps_event_raw(
+                event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile
+            )
+
+    def purgeReceived(self, event):
+        log.debug(("Publish node purged"))
+        for callback in self._get_node_callbacks(event.nodeIdentifier, C.PS_PURGE):
+            d = utils.as_deferred(callback, self.parent, event)
+            d.addErrback(lambda f: log.error(
+                f"Error while running purge event callback {callback}: {f}"
+            ))
+        client = self.parent
+        if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
+            self.host.bridge.ps_event_raw(
+                event.sender.full(), event.nodeIdentifier, C.PS_PURGE, [], client.profile
+            )
+
+    def subscriptions(self, service, nodeIdentifier, sender=None):
+        """Return the list of subscriptions to the given service and node.
+
+        @param service: The publish subscribe service to retrieve the subscriptions from.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions).
+        @type nodeIdentifier: C{unicode}
+        @return (list[pubsub.Subscription]): list of subscriptions
+        """
+        request = pubsub.PubSubRequest("subscriptions")
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        request.sender = sender
+        d = request.send(self.xmlstream)
+
+        def cb(iq):
+            subs = []
+            for subscription_elt in iq.pubsub.subscriptions.elements(
+                pubsub.NS_PUBSUB, "subscription"
+            ):
+                subscription = pubsub.Subscription(
+                    subscription_elt["node"],
+                    jid.JID(subscription_elt["jid"]),
+                    subscription_elt["subscription"],
+                    subscriptionIdentifier=subscription_elt.getAttribute("subid"),
+                )
+                subs.append(subscription)
+            return subs
+
+        return d.addCallback(cb)
+
+    def purge_node(self, service, nodeIdentifier):
+        """Purge a node (i.e. delete all items from it)
+
+        @param service(jid.JID, None): pubsub service hosting the node
+            None to use PEP
+        @param nodeIdentifier(unicode): PubSub node to purge
+        """
+        # TODO: propose this upstream and remove it once merged
+        request = pubsub.PubSubRequest('purge')
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        return request.send(self.xmlstream)
+
+    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
+        disco_info = []
+        self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile)
+        return disco_info
+
+    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
+        return []