view sat/plugins/plugin_xep_0060.py @ 3028:ab2696e34d29

Python 3 port: /!\ this is a huge commit /!\ starting from this commit, SàT is needs Python 3.6+ /!\ SàT maybe be instable or some feature may not work anymore, this will improve with time This patch port backend, bridge and frontends to Python 3. Roughly this has been done this way: - 2to3 tools has been applied (with python 3.7) - all references to python2 have been replaced with python3 (notably shebangs) - fixed files not handled by 2to3 (notably the shell script) - several manual fixes - fixed issues reported by Python 3 that where not handled in Python 2 - replaced "async" with "async_" when needed (it's a reserved word from Python 3.7) - replaced zope's "implements" with @implementer decorator - temporary hack to handle data pickled in database, as str or bytes may be returned, to be checked later - fixed hash comparison for password - removed some code which is not needed anymore with Python 3 - deactivated some code which needs to be checked (notably certificate validation) - tested with jp, fixed reported issues until some basic commands worked - ported Primitivus (after porting dependencies like urwid satext) - more manual fixes
author Goffi <goffi@goffi.org>
date Tue, 13 Aug 2019 19:08:41 +0200
parents cd391ea847cb
children 93e8793a735a
line wrap: on
line source

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# SAT plugin for Publish-Subscribe (xep-0060)
# Copyright (C) 2009-2019 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core.log import getLogger

log = getLogger(__name__)
from sat.core import exceptions
from sat.tools import sat_defer
from sat.tools.common import data_format

from twisted.words.protocols.jabber import jid, error
from twisted.internet import reactor, defer
from wokkel import disco
from wokkel import data_form
from wokkel import generic
from zope.interface import implementer
from collections import namedtuple
import urllib.request, urllib.parse, urllib.error

# XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version
#      mam and rsm come from sat_tmp.wokkel too
from wokkel import pubsub
from wokkel import rsm
from wokkel import mam


PLUGIN_INFO = {
    C.PI_NAME: "Publish-Subscribe",
    C.PI_IMPORT_NAME: "XEP-0060",
    C.PI_TYPE: "XEP",
    C.PI_PROTOCOLS: ["XEP-0060"],
    C.PI_DEPENDENCIES: [],
    C.PI_RECOMMENDATIONS: ["XEP-0059", "XEP-0313"],
    C.PI_MAIN: "XEP_0060",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol"""),
}

UNSPECIFIED = "unspecified error"


Extra = namedtuple("Extra", ("rsm_request", "extra"))
# rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None
# extra is a potentially empty dict
TIMEOUT = 30

class XEP_0060(object):
    OPT_ACCESS_MODEL = "pubsub#access_model"
    OPT_PERSIST_ITEMS = "pubsub#persist_items"
    OPT_MAX_ITEMS = "pubsub#max_items"
    OPT_DELIVER_PAYLOADS = "pubsub#deliver_payloads"
    OPT_SEND_ITEM_SUBSCRIBE = "pubsub#send_item_subscribe"
    OPT_NODE_TYPE = "pubsub#node_type"
    OPT_SUBSCRIPTION_TYPE = "pubsub#subscription_type"
    OPT_SUBSCRIPTION_DEPTH = "pubsub#subscription_depth"
    OPT_ROSTER_GROUPS_ALLOWED = "pubsub#roster_groups_allowed"
    OPT_PUBLISH_MODEL = "pubsub#publish_model"
    ACCESS_OPEN = "open"
    ACCESS_PRESENCE = "presence"
    ACCESS_ROSTER = "roster"
    ACCESS_PUBLISHER_ROSTER = "publisher-roster"
    ACCESS_AUTHORIZE = "authorize"
    ACCESS_WHITELIST = "whitelist"
    ID_SINGLETON = "current"

    def __init__(self, host):
        log.info(_("PubSub plugin initialization"))
        self.host = host
        self._rsm = host.plugins.get("XEP-0059")
        self._mam = host.plugins.get("XEP-0313")
        self._node_cb = {}  # dictionnary of callbacks for node (key: node, value: list of callbacks)
        self.rt_sessions = sat_defer.RTDeferredSessions()
        host.bridge.addMethod(
            "psNodeCreate",
            ".plugin",
            in_sign="ssa{ss}s",
            out_sign="s",
            method=self._createNode,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeConfigurationGet",
            ".plugin",
            in_sign="sss",
            out_sign="a{ss}",
            method=self._getNodeConfiguration,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeConfigurationSet",
            ".plugin",
            in_sign="ssa{ss}s",
            out_sign="",
            method=self._setNodeConfiguration,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeAffiliationsGet",
            ".plugin",
            in_sign="sss",
            out_sign="a{ss}",
            method=self._getNodeAffiliations,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeAffiliationsSet",
            ".plugin",
            in_sign="ssa{ss}s",
            out_sign="",
            method=self._setNodeAffiliations,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeSubscriptionsGet",
            ".plugin",
            in_sign="sss",
            out_sign="a{ss}",
            method=self._getNodeSubscriptions,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeSubscriptionsSet",
            ".plugin",
            in_sign="ssa{ss}s",
            out_sign="",
            method=self._setNodeSubscriptions,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodePurge",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._purgeNode,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeDelete",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._deleteNode,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeWatchAdd",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._addWatch,
            async_=False,
        )
        host.bridge.addMethod(
            "psNodeWatchRemove",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._removeWatch,
            async_=False,
        )
        host.bridge.addMethod(
            "psAffiliationsGet",
            ".plugin",
            in_sign="sss",
            out_sign="a{ss}",
            method=self._getAffiliations,
            async_=True,
        )
        host.bridge.addMethod(
            "psItemsGet",
            ".plugin",
            in_sign="ssiassa{ss}s",
            out_sign="(asa{ss})",
            method=self._getItems,
            async_=True,
        )
        host.bridge.addMethod(
            "psItemSend",
            ".plugin",
            in_sign="ssssa{ss}s",
            out_sign="s",
            method=self._sendItem,
            async_=True,
        )
        host.bridge.addMethod(
            "psItemsSend",
            ".plugin",
            in_sign="ssasa{ss}s",
            out_sign="as",
            method=self._sendItems,
            async_=True,
        )
        host.bridge.addMethod(
            "psRetractItem",
            ".plugin",
            in_sign="sssbs",
            out_sign="",
            method=self._retractItem,
            async_=True,
        )
        host.bridge.addMethod(
            "psRetractItems",
            ".plugin",
            in_sign="ssasbs",
            out_sign="",
            method=self._retractItems,
            async_=True,
        )
        host.bridge.addMethod(
            "psSubscribe",
            ".plugin",
            in_sign="ssa{ss}s",
            out_sign="s",
            method=self._subscribe,
            async_=True,
        )
        host.bridge.addMethod(
            "psUnsubscribe",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._unsubscribe,
            async_=True,
        )
        host.bridge.addMethod(
            "psSubscriptionsGet",
            ".plugin",
            in_sign="sss",
            out_sign="aa{ss}",
            method=self._subscriptions,
            async_=True,
        )
        host.bridge.addMethod(
            "psSubscribeToMany",
            ".plugin",
            in_sign="a(ss)sa{ss}s",
            out_sign="s",
            method=self._subscribeToMany,
        )
        host.bridge.addMethod(
            "psGetSubscribeRTResult",
            ".plugin",
            in_sign="ss",
            out_sign="(ua(sss))",
            method=self._manySubscribeRTResult,
            async_=True,
        )
        host.bridge.addMethod(
            "psGetFromMany",
            ".plugin",
            in_sign="a(ss)ia{ss}s",
            out_sign="s",
            method=self._getFromMany,
        )
        host.bridge.addMethod(
            "psGetFromManyRTResult",
            ".plugin",
            in_sign="ss",
            out_sign="(ua(sssasa{ss}))",
            method=self._getFromManyRTResult,
            async_=True,
        )

        #  high level observer method
        host.bridge.addSignal(
            "psEvent", ".plugin", signature="ssssss"
        )  # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile

        # low level observer method, used if service/node is in watching list (see psNodeWatch* methods)
        host.bridge.addSignal(
            "psEventRaw", ".plugin", signature="sssass"
        )  # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile

    def getHandler(self, client):
        client.pubsub_client = SatPubSubClient(self.host, self)
        return client.pubsub_client

    @defer.inlineCallbacks
    def profileConnected(self, client):
        client.pubsub_watching = set()
        try:
            client.pubsub_service = jid.JID(
                self.host.memory.getConfig("", "pubsub_service")
            )
        except RuntimeError:
            log.info(
                _(
                    "Can't retrieve pubsub_service from conf, we'll use first one that we find"
                )
            )
            client.pubsub_service = yield self.host.findServiceEntity(
                client, "pubsub", "service"
            )

    def getFeatures(self, profile):
        try:
            client = self.host.getClient(profile)
        except exceptions.ProfileNotSetError:
            return {}
        try:
            return {
                "service": client.pubsub_service.full()
                if client.pubsub_service is not None
                else ""
            }
        except AttributeError:
            if self.host.isConnected(profile):
                log.debug("Profile is not connected, service is not checked yet")
            else:
                log.error("Service should be available !")
            return {}

    def parseExtra(self, extra):
        """Parse extra dictionnary

        used bridge's extra dictionnaries
        @param extra(dict): extra data used to configure request
        @return(Extra): filled Extra instance
        """
        if extra is None:
            rsm_request = None
            extra = {}
        else:
            # order-by
            if C.KEY_ORDER_BY in extra:
                # FIXME: we temporarily manage only one level of ordering
                #        we need to switch to a fully serialised extra data
                #        to be able to encode a whole ordered list
                extra[C.KEY_ORDER_BY] = [extra.pop(C.KEY_ORDER_BY)]

            # rsm
            if self._rsm is None:
                rsm_request = None
            else:
                rsm_request = self._rsm.parseExtra(extra)

            # mam
            if self._mam is None:
                mam_request = None
            else:
                mam_request = self._mam.parseExtra(extra, with_rsm=False)

            if mam_request is not None:
                assert "mam" not in extra
                extra["mam"] = mam_request

        return Extra(rsm_request, extra)

    def addManagedNode(self, node, **kwargs):
        """Add a handler for a node

        @param node(unicode): node to monitor
            all node *prefixed* with this one will be triggered
        @param **kwargs: method(s) to call when the node is found
            the method must be named after PubSub constants in lower case
            and suffixed with "_cb"
            e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE
        """
        assert node is not None
        assert kwargs
        callbacks = self._node_cb.setdefault(node, {})
        for event, cb in kwargs.items():
            event_name = event[:-3]
            assert event_name in C.PS_EVENTS
            callbacks.setdefault(event_name, []).append(cb)

    def removeManagedNode(self, node, *args):
        """Add a handler for a node

        @param node(unicode): node to monitor
        @param *args: callback(s) to remove
        """
        assert args
        try:
            registred_cb = self._node_cb[node]
        except KeyError:
            pass
        else:
            for callback in args:
                for event, cb_list in registred_cb.items():
                    try:
                        cb_list.remove(callback)
                    except ValueError:
                        pass
                    else:
                        log.debug(
                            "removed callback {cb} for event {event} on node {node}".format(
                                cb=callback, event=event, node=node
                            )
                        )
                        if not cb_list:
                            del registred_cb[event]
                        if not registred_cb:
                            del self._node_cb[node]
                        return
        log.error(
            "Trying to remove inexistant callback {cb} for node {node}".format(
                cb=callback, node=node
            )
        )

    # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
    #     """Retrieve the name of the nodes that are accessible on the target service.

    #     @param service (JID): target service
    #     @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes)
    #     @param profile (str): %(doc_profile)s
    #     @return: deferred which fire a list of nodes
    #     """
    #     client = self.host.getClient(profile)
    #     d = self.host.getDiscoItems(client, service, nodeIdentifier)
    #     d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')])
    #     return d

    # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE):
    #     """Retrieve the name of the nodes to which the profile is subscribed on the target service.

    #     @param service (JID): target service
    #     @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
    #     @param filter_ (str): filter the result according to the given subscription type:
    #         - None: do not filter
    #         - 'pending': subscription has not been approved yet by the node owner
    #         - 'unconfigured': subscription options have not been configured yet
    #         - 'subscribed': subscription is complete
    #     @param profile (str): %(doc_profile)s
    #     @return: Deferred list[str]
    #     """
    #     d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
    #     d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
    #     return d

    def _sendItem(self, service, nodeIdentifier, payload, item_id=None, extra=None,
                  profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        service = None if not service else jid.JID(service)
        d = self.sendItem(
            client, service, nodeIdentifier, payload, item_id or None, extra
        )
        d.addCallback(lambda ret: ret or "")
        return d

    def _sendItems(self, service, nodeIdentifier, items, extra=None,
                  profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        service = None if not service else jid.JID(service)
        try:
            items = [generic.parseXml(item.encode('utf-8')) for item in items]
        except Exception as e:
            raise exceptions.DataError(_("Can't parse items: {msg}").format(
                msg=e))
        d = self.sendItems(
            client, service, nodeIdentifier, items, extra
        )
        return d

    def _getPublishedItemId(self, iq_elt, original_id):
        """return item of published id if found in answer

        if not found original_id is returned, or empty string if it is None or empty
            string
        """
        try:
            item_id = iq_elt.pubsub.publish.item["id"]
        except (AttributeError, KeyError):
            item_id = None
        return item_id or original_id

    def sendItem(self, client, service, nodeIdentifier, payload, item_id=None,
                 extra=None):
        """High level method to send one item

        @param service(jid.JID, None): service to send the item to
            None to use PEP
        @param NodeIdentifier(unicode): PubSub node to use
        @param payload(domish.Element, unicode): payload of the item to send
        @param item_id(unicode, None): id to use or None to create one
        @param extra(dict, None): extra option, not used yet
        @return (unicode, None): id of the created item
        """
        item_elt = pubsub.Item(id=item_id, payload=payload)
        d = self.publish(client, service, nodeIdentifier, [item_elt])
        d.addCallback(self._getPublishedItemId, item_id)
        return d

    def _publishCb(self, iq_result):
        """Parse publish result, and return ids given by pubsub service"""
        try:
            item_ids = [item['id']
                for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')]
        except AttributeError:
            return []
        return item_ids

    def sendItems(self, client, service, nodeIdentifier, items, extra=None):
        """High level method to send several items at once

        @param service(jid.JID, None): service to send the item to
            None to use PEP
        @param NodeIdentifier(unicode): PubSub node to use
        @param items(list[domish.Element]): whole item elements to send,
            "id" will be used if set
        @param extra(dict, None): extra option, not used yet
        @return (list[unicode]): ids of the created items
        """
        parsed_items = []
        for item in items:
            if item.name != 'item':
                raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml()))
            item_id = item.getAttribute("id")
            parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement()))
        d = self.publish(client, service, nodeIdentifier, parsed_items)
        d.addCallback(self._publishCb)
        return d

    def publish(self, client, service, nodeIdentifier, items=None):
        return client.pubsub_client.publish(
            service, nodeIdentifier, items, client.pubsub_client.parent.jid
        )

    def _unwrapMAMMessage(self, message_elt):
        try:
            item_elt = (
                next(message_elt.elements(mam.NS_MAM, "result").next()
                .elements(C.NS_FORWARD, "forwarded").next()
                .elements(C.NS_CLIENT, "message").next()
                .elements("http://jabber.org/protocol/pubsub#event", "event").next()
                .elements("http://jabber.org/protocol/pubsub#event", "items").next()
                .elements("http://jabber.org/protocol/pubsub#event", "item"))
            )
        except StopIteration:
            raise exceptions.DataError("Can't find Item in MAM message element")
        return item_elt

    def _getItems(self, service="", node="", max_items=10, item_ids=None, sub_id=None,
                  extra_dict=None, profile_key=C.PROF_KEY_NONE):
        """Get items from pubsub node

        @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
        """
        client = self.host.getClient(profile_key)
        service = jid.JID(service) if service else None
        max_items = None if max_items == C.NO_LIMIT else max_items
        extra = self.parseExtra(extra_dict)
        d = self.getItems(
            client,
            service,
            node or None,
            max_items or None,
            item_ids,
            sub_id or None,
            extra.rsm_request,
            extra.extra,
        )
        d.addCallback(self.transItemsData)
        return d

    def getItems(self, client, service, node, max_items=None, item_ids=None, sub_id=None,
                 rsm_request=None, extra=None):
        """Retrieve pubsub items from a node.

        @param service (JID, None): pubsub service.
        @param node (str): node id.
        @param max_items (int): optional limit on the number of retrieved items.
        @param item_ids (list[str]): identifiers of the items to be retrieved (can't be
                                     used with rsm_request).
        @param sub_id (str): optional subscription identifier.
        @param rsm_request (rsm.RSMRequest): RSM request data
        @return: a deferred couple (list[dict], dict) containing:
            - list of items
            - metadata with the following keys:
                - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index
                    value of RSMResponse
                - service, node: service and node used
        """
        if item_ids and max_items is not None:
            max_items = None
        if rsm_request and item_ids:
            raise ValueError("items_id can't be used with rsm")
        if extra is None:
            extra = {}
        try:
            mam_query = extra["mam"]
        except KeyError:
            d = client.pubsub_client.items(
                service = service,
                nodeIdentifier = node,
                maxItems = max_items,
                subscriptionIdentifier = sub_id,
                sender = None,
                itemIdentifiers = item_ids,
                orderBy = extra.get(C.KEY_ORDER_BY),
                rsm_request = rsm_request
            )
            # we have no MAM data here, so we add None
            d.addCallback(lambda data: data + (None,))
            d.addErrback(sat_defer.stanza2NotFound)
            d.addTimeout(TIMEOUT, reactor)
        else:
            # if mam is requested, we have to do a totally different query
            if self._mam is None:
                raise exceptions.NotFound("MAM (XEP-0313) plugin is not available")
            if max_items is not None:
                raise exceptions.DataError("max_items parameter can't be used with MAM")
            if item_ids:
                raise exceptions.DataError("items_ids parameter can't be used with MAM")
            if mam_query.node is None:
                mam_query.node = node
            elif mam_query.node != node:
                raise exceptions.DataError(
                    "MAM query node is incoherent with getItems's node"
                )
            if mam_query.rsm is None:
                mam_query.rsm = rsm_request
            else:
                if mam_query.rsm != rsm_request:
                    raise exceptions.DataError(
                        "Conflict between RSM request and MAM's RSM request"
                    )
            d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage)

        try:
            subscribe = C.bool(extra["subscribe"])
        except KeyError:
            subscribe = False

        def subscribeEb(failure, service, node):
            failure.trap(error.StanzaError)
            log.warning(
                "Could not subscribe to node {} on service {}: {}".format(
                    node, str(service), str(failure.value)
                )
            )

        def doSubscribe(data):
            self.subscribe(client, service, node).addErrback(
                subscribeEb, service, node
            )
            return data

        if subscribe:
            d.addCallback(doSubscribe)

        def addMetadata(result):
            # TODO: handle the third argument (mam_response)
            items, rsm_response, mam_response = result
            service_jid = service if service else client.jid.userhostJID()
            metadata = {
                "service": service_jid,
                "node": node,
                "uri": self.getNodeURI(service_jid, node),
            }
            if rsm_request is not None and rsm_response is not None:
                metadata.update(
                    {
                        "rsm_" + key: value
                        for key, value in rsm_response.toDict().items()
                    }
                )
            if mam_response is not None:
                for key, value in mam_response.items():
                    metadata["mam_" + key] = value
            return (items, metadata)

        d.addCallback(addMetadata)
        return d

    # @defer.inlineCallbacks
    # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE):
    #     """Massively retrieve pubsub items from many nodes.
    #     @param service (JID): target service.
    #     @param data (dict): dictionnary binding some arbitrary keys to the node identifiers.
    #     @param max_items (int): optional limit on the number of retrieved items *per node*.
    #     @param sub_id (str): optional subscription identifier.
    #     @param rsm (dict): RSM request data
    #     @param profile_key (str): %(doc_profile_key)s
    #     @return: a deferred dict with:
    #         - key: a value in (a subset of) data.keys()
    #         - couple (list[dict], dict) containing:
    #             - list of items
    #             - RSM response data
    #     """
    #     client = self.host.getClient(profile_key)
    #     found_nodes = yield self.listNodes(service, profile=client.profile)
    #     d_dict = {}
    #     for publisher, node in data.items():
    #         if node not in found_nodes:
    #             log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
    #             continue  # avoid pubsub "item-not-found" error
    #         d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile)
    #     defer.returnValue(d_dict)

    def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None,
                   profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        return client.pubsub_client.getOptions(
            service, nodeIdentifier, subscriber, subscriptionIdentifier
        )

    def setOptions(self, service, nodeIdentifier, subscriber, options,
                   subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        return client.pubsub_client.setOptions(
            service, nodeIdentifier, subscriber, options, subscriptionIdentifier
        )

    def _createNode(self, service_s, nodeIdentifier, options, profile_key):
        client = self.host.getClient(profile_key)
        return self.createNode(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
        )

    def createNode(self, client, service, nodeIdentifier=None, options=None):
        """Create a new node

        @param service(jid.JID): PubSub service,
        @param NodeIdentifier(unicode, None): node name
           use None to create instant node (identifier will be returned by this method)
        @param option(dict[unicode, unicode], None): node configuration options
        @return (unicode): identifier of the created node (may be different from requested name)
        """
        # TODO: if pubsub service doesn't hande publish-options, configure it in a second time
        return client.pubsub_client.createNode(service, nodeIdentifier, options)

    @defer.inlineCallbacks
    def createIfNewNode(self, client, service, nodeIdentifier, options=None):
        """Helper method similar to createNode, but will not fail in case of conflict"""
        try:
            yield self.createNode(client, service, nodeIdentifier, options)
        except error.StanzaError as e:
            if e.condition == "conflict":
                pass
            else:
                raise e

    def _getNodeConfiguration(self, service_s, nodeIdentifier, profile_key):
        client = self.host.getClient(profile_key)
        d = self.getConfiguration(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier
        )

        def serialize(form):
            # FIXME: better more generic dataform serialisation should be available in SàT
            return {f.var: str(f.value) for f in list(form.fields.values())}

        d.addCallback(serialize)
        return d

    def getConfiguration(self, client, service, nodeIdentifier):
        request = pubsub.PubSubRequest("configureGet")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier

        def cb(iq):
            form = data_form.findForm(iq.pubsub.configure, pubsub.NS_PUBSUB_NODE_CONFIG)
            form.typeCheck()
            return form

        d = request.send(client.xmlstream)
        d.addCallback(cb)
        return d

    def _setNodeConfiguration(self, service_s, nodeIdentifier, options, profile_key):
        client = self.host.getClient(profile_key)
        d = self.setConfiguration(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier, options
        )
        return d

    def setConfiguration(self, client, service, nodeIdentifier, options):
        request = pubsub.PubSubRequest("configureSet")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier

        form = data_form.Form(
            formType="submit", formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG
        )
        form.makeFields(options)
        request.options = form

        d = request.send(client.xmlstream)
        return d

    def _getAffiliations(self, service_s, nodeIdentifier, profile_key):
        client = self.host.getClient(profile_key)
        d = self.getAffiliations(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier or None
        )
        return d

    def getAffiliations(self, client, service, nodeIdentifier=None):
        """Retrieve affiliations of an entity

        @param nodeIdentifier(unicode, None): node to get affiliation from
            None to get all nodes affiliations for this service
        """
        request = pubsub.PubSubRequest("affiliations")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier

        def cb(iq_elt):
            try:
                affiliations_elt = next(
                    iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "affiliations"))
                )
            except StopIteration:
                raise ValueError(
                    _("Invalid result: missing <affiliations> element: {}").format(
                        iq_elt.toXml
                    )
                )
            try:
                return {
                    e["node"]: e["affiliation"]
                    for e in affiliations_elt.elements((pubsub.NS_PUBSUB, "affiliation"))
                }
            except KeyError:
                raise ValueError(
                    _("Invalid result: bad <affiliation> element: {}").format(
                        iq_elt.toXml
                    )
                )

        d = request.send(client.xmlstream)
        d.addCallback(cb)
        return d

    def _getNodeAffiliations(self, service_s, nodeIdentifier, profile_key):
        client = self.host.getClient(profile_key)
        d = self.getNodeAffiliations(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier
        )
        d.addCallback(
            lambda affiliations: {j.full(): a for j, a in affiliations.items()}
        )
        return d

    def getNodeAffiliations(self, client, service, nodeIdentifier):
        """Retrieve affiliations of a node owned by profile"""
        request = pubsub.PubSubRequest("affiliationsGet")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier

        def cb(iq_elt):
            try:
                affiliations_elt = next(
                    iq_elt.pubsub.elements((pubsub.NS_PUBSUB_OWNER, "affiliations"))
                )
            except StopIteration:
                raise ValueError(
                    _("Invalid result: missing <affiliations> element: {}").format(
                        iq_elt.toXml
                    )
                )
            try:
                return {
                    jid.JID(e["jid"]): e["affiliation"]
                    for e in affiliations_elt.elements(
                        (pubsub.NS_PUBSUB_OWNER, "affiliation")
                    )
                }
            except KeyError:
                raise ValueError(
                    _("Invalid result: bad <affiliation> element: {}").format(
                        iq_elt.toXml
                    )
                )

        d = request.send(client.xmlstream)
        d.addCallback(cb)
        return d

    def _setNodeAffiliations(
        self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE
    ):
        client = self.host.getClient(profile_key)
        affiliations = {
            jid.JID(jid_): affiliation for jid_, affiliation in affiliations.items()
        }
        d = self.setNodeAffiliations(
            client,
            jid.JID(service_s) if service_s else None,
            nodeIdentifier,
            affiliations,
        )
        return d

    def setNodeAffiliations(self, client, service, nodeIdentifier, affiliations):
        """Update affiliations of a node owned by profile

        @param affiliations(dict[jid.JID, unicode]): affiliations to set
            check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations
        """
        request = pubsub.PubSubRequest("affiliationsSet")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.affiliations = affiliations
        d = request.send(client.xmlstream)
        return d

    def _purgeNode(self, service_s, nodeIdentifier, profile_key):
        client = self.host.getClient(profile_key)
        return self.purgeNode(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier
        )

    def purgeNode(self, client, service, nodeIdentifier):
        return client.pubsub_client.purgeNode(service, nodeIdentifier)

    def _deleteNode(self, service_s, nodeIdentifier, profile_key):
        client = self.host.getClient(profile_key)
        return self.deleteNode(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier
        )

    def deleteNode(self, client, service, nodeIdentifier):
        return client.pubsub_client.deleteNode(service, nodeIdentifier)

    def _addWatch(self, service_s, node, profile_key):
        """watch modifications on a node

        This method should only be called from bridge
        """
        client = self.host.getClient(profile_key)
        service = jid.JID(service_s) if service_s else client.jid.userhostJID()
        client.pubsub_watching.add((service, node))

    def _removeWatch(self, service_s, node, profile_key):
        """remove a node watch

        This method should only be called from bridge
        """
        client = self.host.getClient(profile_key)
        service = jid.JID(service_s) if service_s else client.jid.userhostJID()
        client.pubsub_watching.remove((service, node))

    def _retractItem(
        self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key
    ):
        return self._retractItems(
            service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key
        )

    def _retractItems(
        self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key
    ):
        return self.retractItems(
            jid.JID(service_s) if service_s else None,
            nodeIdentifier,
            itemIdentifiers,
            notify,
            profile_key,
        )

    def retractItems(
        self,
        service,
        nodeIdentifier,
        itemIdentifiers,
        notify=True,
        profile_key=C.PROF_KEY_NONE,
    ):
        client = self.host.getClient(profile_key)
        return client.pubsub_client.retractItems(
            service, nodeIdentifier, itemIdentifiers, notify=True
        )

    def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        service = None if not service else jid.JID(service)
        d = self.subscribe(client, service, nodeIdentifier, options=options or None)
        d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "")
        return d

    def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None):
        # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe
        return client.pubsub_client.subscribe(
            service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options
        )

    def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        service = None if not service else jid.JID(service)
        return self.unsubscribe(client, service, nodeIdentifier)

    def unsubscribe(
        self,
        client,
        service,
        nodeIdentifier,
        sub_jid=None,
        subscriptionIdentifier=None,
        sender=None,
    ):
        return client.pubsub_client.unsubscribe(
            service,
            nodeIdentifier,
            sub_jid or client.jid.userhostJID(),
            subscriptionIdentifier,
            sender,
        )

    def _subscriptions(self, service, nodeIdentifier="", profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        service = None if not service else jid.JID(service)

        def gotSubscriptions(subscriptions):
            # we replace pubsub.Subscription instance by dict that we can serialize
            for idx, sub in enumerate(subscriptions):
                sub_dict = {
                    "node": sub.nodeIdentifier,
                    "subscriber": sub.subscriber.full(),
                    "state": sub.state,
                }
                if sub.subscriptionIdentifier is not None:
                    sub_dict["id"] = sub.subscriptionIdentifier
                subscriptions[idx] = sub_dict

            return subscriptions

        d = self.subscriptions(client, service, nodeIdentifier or None)
        d.addCallback(gotSubscriptions)
        return d

    def subscriptions(self, client, service, nodeIdentifier=None):
        """retrieve subscriptions from a service

        @param service(jid.JID): PubSub service
        @param nodeIdentifier(unicode, None): node to check
            None to get all subscriptions
        """
        return client.pubsub_client.subscriptions(service, nodeIdentifier)

    ## misc tools ##

    def getNodeURI(self, service, node, item=None):
        """Return XMPP URI of a PubSub node

        @param service(jid.JID): PubSub service
        @param node(unicode): node
        @return (unicode): URI of the node
        """
        assert service is not None
        # XXX: urllib.urlencode use "&" to separate value, while XMPP URL (cf. RFC 5122)
        #      use ";" as a separator. So if more than one value is used in query_data,
        #      urlencode MUST NOT BE USED.
        query_data = [("node", node.encode("utf-8"))]
        if item is not None:
            query_data.append(("item", item.encode("utf-8")))
        return "xmpp:{service}?;{query}".format(
            service=service.userhost(), query=urllib.parse.urlencode(query_data)
        )

    ## methods to manage several stanzas/jids at once ##

    # generic #

    def getRTResults(
        self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE
    ):
        return self.rt_sessions.getResults(session_id, on_success, on_error, profile)

    def transItemsData(self, items_data, item_cb=lambda item: item.toXml(),
            serialise=False):
        """Helper method to transform result from [getItems]

        the items_data must be a tuple(list[domish.Element], dict[unicode, unicode])
        as returned by [getItems]. metadata values are then casted to unicode and
        each item is passed to items_cb then optionally serialised with
            data_format.serialise.
        @param items_data(tuple): tuple returned by [getItems]
        @param item_cb(callable): method to transform each item
        @param serialise(bool): if True do a data_format.serialise
            after applying item_cb
        @return (tuple): a serialised form ready to go throught bridge
        """
        items, metadata = items_data
        if serialise:
            items = [data_format.serialise(item_cb(item)) for item in items]
        else:
            items = [item_cb(item) for item in items]

        return (
            items,
            {key: str(value) for key, value in metadata.items()},
        )

    def transItemsDataD(self, items_data, item_cb, serialise=False):
        """Helper method to transform result from [getItems], deferred version

        the items_data must be a tuple(list[domish.Element], dict[unicode, unicode])
        as returned by [getItems]. metadata values are then casted to unicode and
        each item is passed to items_cb then optionally serialised with
            data_format.serialise.
        An errback is added to item_cb, and when it is fired the value is filtered from
            final items
        @param items_data(tuple): tuple returned by [getItems]
        @param item_cb(callable): method to transform each item (must return a deferred)
        @param serialise(bool): if True do a data_format.serialise
            after applying item_cb
        @return (tuple): a deferred which fire a serialised form ready to go throught
            bridge
        """
        items, metadata = items_data

        def eb(failure):
            log.warning(
                "Error while serialising/parsing item: {}".format(str(failure.value))
            )

        d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items])

        def finishSerialisation(parsed_items):
            if serialise:
                items = [data_format.serialise(i) for i in parsed_items if i is not None]
            else:
                items = [i for i in parsed_items if i is not None]

            return (
                items,
                {key: str(value) for key, value in metadata.items()},
            )

        d.addCallback(finishSerialisation)
        return d

    def serDList(self, results, failure_result=None):
        """Serialise a DeferredList result

        @param results: DeferredList results
        @param failure_result: value to use as value for failed Deferred
            (default: empty tuple)
        @return (list): list with:
            - failure: empty in case of success, else error message
            - result
        """
        if failure_result is None:
            failure_result = ()
        return [
            ("", result)
            if success
            else (str(result.result) or UNSPECIFIED, failure_result)
            for success, result in results
        ]

    # subscribe #

    def _getNodeSubscriptions(self, service_s, nodeIdentifier, profile_key):
        client = self.host.getClient(profile_key)
        d = self.getNodeSubscriptions(
            client, jid.JID(service_s) if service_s else None, nodeIdentifier
        )
        d.addCallback(
            lambda subscriptions: {j.full(): a for j, a in subscriptions.items()}
        )
        return d

    def getNodeSubscriptions(self, client, service, nodeIdentifier):
        """Retrieve subscriptions to a node

        @param nodeIdentifier(unicode): node to get subscriptions from
        """
        if not nodeIdentifier:
            raise exceptions.DataError("node identifier can't be empty")
        request = pubsub.PubSubRequest("subscriptionsGet")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier

        def cb(iq_elt):
            try:
                subscriptions_elt = next(
                    iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "subscriptions"))
                )
            except StopIteration:
                raise ValueError(
                    _("Invalid result: missing <subscriptions> element: {}").format(
                        iq_elt.toXml
                    )
                )
            except AttributeError as e:
                raise ValueError(_("Invalid result: {}").format(e))
            try:
                return {
                    jid.JID(s["jid"]): s["subscription"]
                    for s in subscriptions_elt.elements(
                        (pubsub.NS_PUBSUB, "subscription")
                    )
                }
            except KeyError:
                raise ValueError(
                    _("Invalid result: bad <subscription> element: {}").format(
                        iq_elt.toXml
                    )
                )

        d = request.send(client.xmlstream)
        d.addCallback(cb)
        return d

    def _setNodeSubscriptions(
        self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE
    ):
        client = self.host.getClient(profile_key)
        subscriptions = {
            jid.JID(jid_): subscription
            for jid_, subscription in subscriptions.items()
        }
        d = self.setNodeSubscriptions(
            client,
            jid.JID(service_s) if service_s else None,
            nodeIdentifier,
            subscriptions,
        )
        return d

    def setNodeSubscriptions(self, client, service, nodeIdentifier, subscriptions):
        """Set or update subscriptions of a node owned by profile

        @param subscriptions(dict[jid.JID, unicode]): subscriptions to set
            check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions
        """
        request = pubsub.PubSubRequest("subscriptionsSet")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriptions = {
            pubsub.Subscription(nodeIdentifier, jid_, state)
            for jid_, state in subscriptions.items()
        }
        d = request.send(client.xmlstream)
        return d

    def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
        """Get real-time results for subcribeToManu session

        @param session_id: id of the real-time deferred session
        @param return (tuple): (remaining, results) where:
            - remaining is the number of still expected results
            - results is a list of tuple(unicode, unicode, bool, unicode) with:
                - service: pubsub service
                - and node: pubsub node
                - failure(unicode): empty string in case of success, error message else
        @param profile_key: %(doc_profile_key)s
        """
        profile = self.host.getClient(profile_key).profile
        d = self.rt_sessions.getResults(
            session_id,
            on_success=lambda result: "",
            on_error=lambda failure: str(failure.value),
            profile=profile,
        )
        # we need to convert jid.JID to unicode with full() to serialise it for the bridge
        d.addCallback(
            lambda ret: (
                ret[0],
                [
                    (service.full(), node, "" if success else failure or UNSPECIFIED)
                    for (service, node), (success, failure) in ret[1].items()
                ],
            )
        )
        return d

    def _subscribeToMany(
        self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE
    ):
        return self.subscribeToMany(
            [(jid.JID(service), str(node)) for service, node in node_data],
            jid.JID(subscriber),
            options,
            profile_key,
        )

    def subscribeToMany(
        self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE
    ):
        """Subscribe to several nodes at once.

        @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
            - service (jid.JID) is the pubsub service
            - node (unicode) is the node to subscribe to
        @param subscriber (jid.JID): optional subscription identifier.
        @param options (dict): subscription options
        @param profile_key (str): %(doc_profile_key)s
        @return (str): RT Deferred session id
        """
        client = self.host.getClient(profile_key)
        deferreds = {}
        for service, node in node_data:
            deferreds[(service, node)] = client.pubsub_client.subscribe(
                service, node, subscriber, options=options
            )
        return self.rt_sessions.newSession(deferreds, client.profile)
        # found_nodes = yield self.listNodes(service, profile=client.profile)
        # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
        # d_list = []
        # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
        #     if nodeIdentifier not in found_nodes:
        #         log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
        #         continue  # avoid sat-pubsub "SubscriptionExists" error
        #     d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options))
        # defer.returnValue(d_list)

    # get #

    def _getFromManyRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
        """Get real-time results for getFromMany session

        @param session_id: id of the real-time deferred session
        @param profile_key: %(doc_profile_key)s
        @param return (tuple): (remaining, results) where:
            - remaining is the number of still expected results
            - results is a list of tuple with
                - service (unicode): pubsub service
                - node (unicode): pubsub node
                - failure (unicode): empty string in case of success, error message else
                - items (list[s]): raw XML of items
                - metadata(dict): serialised metadata
        """
        profile = self.host.getClient(profile_key).profile
        d = self.rt_sessions.getResults(
            session_id,
            on_success=lambda result: ("", self.transItemsData(result)),
            on_error=lambda failure: (str(failure.value) or UNSPECIFIED, ([], {})),
            profile=profile,
        )
        d.addCallback(
            lambda ret: (
                ret[0],
                [
                    (service.full(), node, failure, items, metadata)
                    for (service, node), (success, (failure, (items, metadata))) in ret[
                        1
                    ].items()
                ],
            )
        )
        return d

    def _getFromMany(
        self, node_data, max_item=10, extra_dict=None, profile_key=C.PROF_KEY_NONE
    ):
        """
        @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit
        """
        max_item = None if max_item == C.NO_LIMIT else max_item
        extra = self.parseExtra(extra_dict)
        return self.getFromMany(
            [(jid.JID(service), str(node)) for service, node in node_data],
            max_item,
            extra.rsm_request,
            extra.extra,
            profile_key,
        )

    def getFromMany(self, node_data, max_item=None, rsm_request=None, extra=None,
                    profile_key=C.PROF_KEY_NONE):
        """Get items from many nodes at once

        @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
            - service (jid.JID) is the pubsub service
            - node (unicode) is the node to get items from
        @param max_items (int): optional limit on the number of retrieved items.
        @param rsm_request (RSMRequest): RSM request data
        @param profile_key (unicode): %(doc_profile_key)s
        @return (str): RT Deferred session id
        """
        client = self.host.getClient(profile_key)
        deferreds = {}
        for service, node in node_data:
            deferreds[(service, node)] = self.getItems(
                client, service, node, max_item, rsm_request=rsm_request, extra=extra
            )
        return self.rt_sessions.newSession(deferreds, client.profile)


@implementer(disco.IDisco)
class SatPubSubClient(rsm.PubSubClient):

    def __init__(self, host, parent_plugin):
        self.host = host
        self.parent_plugin = parent_plugin
        rsm.PubSubClient.__init__(self)

    def connectionInitialized(self):
        rsm.PubSubClient.connectionInitialized(self)

    def _getNodeCallbacks(self, node, event):
        """Generate callbacks from given node and event

        @param node(unicode): node used for the item
            any registered node which prefix the node will match
        @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE
        @return (iterator[callable]): callbacks for this node/event
        """
        for registered_node, callbacks_dict in self.parent_plugin._node_cb.items():
            if not node.startswith(registered_node):
                continue
            try:
                for callback in callbacks_dict[event]:
                    yield callback
            except KeyError:
                continue


    def itemsReceived(self, event):
        log.debug("Pubsub items received")
        for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS):
            callback(self.parent, event)
        client = self.parent
        if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
            raw_items = [i.toXml() for i in event.items]
            self.host.bridge.psEventRaw(
                event.sender.full(),
                event.nodeIdentifier,
                C.PS_ITEMS,
                raw_items,
                client.profile,
            )

    def deleteReceived(self, event):
        log.debug(("Publish node deleted"))
        for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE):
            callback(self.parent, event)
        client = self.parent
        if (event.sender, event.nodeIdentifier) in client.pubsub_watching:
            self.host.bridge.psEventRaw(
                event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile
            )

    def subscriptions(self, service, nodeIdentifier, sender=None):
        """Return the list of subscriptions to the given service and node.

        @param service: The publish subscribe service to retrieve the subscriptions from.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions).
        @type nodeIdentifier: C{unicode}
        @return (list[pubsub.Subscription]): list of subscriptions
        """
        request = pubsub.PubSubRequest("subscriptions")
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender
        d = request.send(self.xmlstream)

        def cb(iq):
            subs = []
            for subscription_elt in iq.pubsub.subscriptions.elements(
                pubsub.NS_PUBSUB, "subscription"
            ):
                subscription = pubsub.Subscription(
                    subscription_elt["node"],
                    jid.JID(subscription_elt["jid"]),
                    subscription_elt["subscription"],
                    subscriptionIdentifier=subscription_elt.getAttribute("subid"),
                )
                subs.append(subscription)
            return subs

        return d.addCallback(cb)

    def purgeNode(self, service, nodeIdentifier):
        """Purge a node (i.e. delete all items from it)

        @param service(jid.JID, None): service to send the item to
            None to use PEP
        @param NodeIdentifier(unicode): PubSub node to use
        """
        # TODO: propose this upstream and remove it once merged
        request = pubsub.PubSubRequest('purge')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        return request.send(self.xmlstream)

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        disco_info = []
        self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile)
        return disco_info

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        return []