view src/tmp/wokkel/pubsub.py @ 1414:159d16336f87

core, bridge, jp: management of service discovery extensions (XEP-0128)
author Goffi <goffi@goffi.org>
date Fri, 17 Apr 2015 22:59:35 +0200
parents 9141bde7ff31
children be1fccf4854d
line wrap: on
line source

# -*- test-case-name: wokkel.test.test_pubsub -*-
#
# Copyright (c) Ralph Meijer.
# See LICENSE for details.

"""
XMPP publish-subscribe protocol.

This protocol is specified in
U{XEP-0060<http://xmpp.org/extensions/xep-0060.html>}.
"""

from zope.interface import implements

from twisted.internet import defer
from twisted.python import log
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish

from wokkel import disco, data_form, generic, shim
from wokkel.compat import IQ
from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource

# Iq get and set XPath queries
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'

# Publish-subscribe namespaces
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"
NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config"
NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data"
NS_PUBSUB_SUBSCRIBE_OPTIONS = NS_PUBSUB + "#subscribe_options"

# XPath to match pubsub requests
PUBSUB_REQUEST = '/iq[@type="get" or @type="set"]/' + \
                    'pubsub[@xmlns="' + NS_PUBSUB + '" or ' + \
                           '@xmlns="' + NS_PUBSUB_OWNER + '"]'

class SubscriptionPending(Exception):
    """
    Raised when the requested subscription is pending acceptance.
    """



class SubscriptionUnconfigured(Exception):
    """
    Raised when the requested subscription needs to be configured before
    becoming active.
    """



class PubSubError(error.StanzaError):
    """
    Exception with publish-subscribe specific condition.
    """
    def __init__(self, condition, pubsubCondition, feature=None, text=None):
        appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        if feature:
            appCondition['feature'] = feature
        error.StanzaError.__init__(self, condition,
                                         text=text,
                                         appCondition=appCondition)



class BadRequest(error.StanzaError):
    """
    Bad request stanza error.
    """
    def __init__(self, pubsubCondition=None, text=None):
        if pubsubCondition:
            appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
        else:
            appCondition = None
        error.StanzaError.__init__(self, 'bad-request',
                                         text=text,
                                         appCondition=appCondition)



class Unsupported(PubSubError):
    def __init__(self, feature, text=None):
        self.feature = feature
        PubSubError.__init__(self, 'feature-not-implemented',
                                   'unsupported',
                                   feature,
                                   text)

    def __str__(self):
        message = PubSubError.__str__(self)
        message += ', feature %r' % self.feature
        return message


class Subscription(object):
    """
    A subscription to a node.

    @ivar nodeIdentifier: The identifier of the node subscribed to.  The root
        node is denoted by C{None}.
    @type nodeIdentifier: C{unicode}

    @ivar subscriber: The subscribing entity.
    @type subscriber: L{jid.JID}

    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
                 C{'unconfigured'}.
    @type state: C{unicode}

    @ivar options: Optional list of subscription options.
    @type options: C{dict}

    @ivar subscriptionIdentifier: Optional subscription identifier.
    @type subscriptionIdentifier: C{unicode}
    """

    def __init__(self, nodeIdentifier, subscriber, state, options=None,
                       subscriptionIdentifier=None):
        self.nodeIdentifier = nodeIdentifier
        self.subscriber = subscriber
        self.state = state
        self.options = options or {}
        self.subscriptionIdentifier = subscriptionIdentifier


    @staticmethod
    def fromElement(element):
        return Subscription(
                element.getAttribute('node'),
                jid.JID(element.getAttribute('jid')),
                element.getAttribute('subscription'),
                subscriptionIdentifier=element.getAttribute('subid'))


    def toElement(self, defaultUri=None):
        """
        Return the DOM representation of this subscription.

        @rtype: L{domish.Element}
        """
        element = domish.Element((defaultUri, 'subscription'))
        if self.nodeIdentifier:
            element['node'] = self.nodeIdentifier
        element['jid'] = unicode(self.subscriber)
        element['subscription'] = self.state
        if self.subscriptionIdentifier:
            element['subid'] = self.subscriptionIdentifier
        return element



class Item(domish.Element):
    """
    Publish subscribe item.

    This behaves like an object providing L{domish.IElement}.

    Item payload can be added using C{addChild} or C{addRawXml}, or using the
    C{payload} keyword argument to C{__init__}.
    """

    def __init__(self, id=None, payload=None):
        """
        @param id: optional item identifier
        @type id: C{unicode}
        @param payload: optional item payload. Either as a domish element, or
                        as serialized XML.
        @type payload: object providing L{domish.IElement} or C{unicode}.
        """

        domish.Element.__init__(self, (None, 'item'))
        if id is not None:
            self['id'] = id
        if payload is not None:
            if isinstance(payload, basestring):
                self.addRawXml(payload)
            else:
                self.addChild(payload)



class PubSubRequest(generic.Stanza):
    """
    A publish-subscribe request.

    The set of instance variables used depends on the type of request. If
    a variable is not applicable or not passed in the request, its value is
    C{None}.

    @ivar verb: The type of publish-subscribe request. See C{_requestVerbMap}.
    @type verb: C{str}.

    @ivar affiliations: Affiliations to be modified.
    @type affiliations: C{set}

    @ivar items: The items to be published, as L{domish.Element}s.
    @type items: C{list}

    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
                           retracted.
    @type itemIdentifiers: C{set}

    @ivar maxItems: Maximum number of items to retrieve.
    @type maxItems: C{int}.

    @ivar nodeIdentifier: Identifier of the node the request is about.
    @type nodeIdentifier: C{unicode}

    @ivar nodeType: The type of node that should be created, or for which the
                    configuration is retrieved. C{'leaf'} or C{'collection'}.
    @type nodeType: C{str}

    @ivar options: Configurations options for nodes, subscriptions and publish
                   requests.
    @type options: L{data_form.Form}

    @ivar subscriber: The subscribing entity.
    @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

    @ivar subscriptionIdentifier: Identifier for a specific subscription.
    @type subscriptionIdentifier: C{unicode}

    @ivar subscriptions: Subscriptions to be modified, as a set of
        L{Subscription}.
    @type subscriptions: C{set}

    @ivar affiliations: Affiliations to be modified, as a dictionary of entity
        (L{JID<twisted.words.protocols.jabber.jid.JID>} to affiliation
        (C{unicode}).
    @type affiliations: C{dict}
    """

    verb = None

    affiliations = None
    items = None
    itemIdentifiers = None
    maxItems = None
    nodeIdentifier = None
    nodeType = None
    options = None
    subscriber = None
    subscriptionIdentifier = None
    subscriptions = None
    affiliations = None

    # Map request iq type and subelement name to request verb
    _requestVerbMap = {
        ('set', NS_PUBSUB, 'publish'): 'publish',
        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
        ('get', NS_PUBSUB, 'options'): 'optionsGet',
        ('set', NS_PUBSUB, 'options'): 'optionsSet',
        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
        ('set', NS_PUBSUB, 'create'): 'create',
        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
        ('get', NS_PUBSUB, 'items'): 'items',
        ('set', NS_PUBSUB, 'retract'): 'retract',
        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
    }

    # Map request verb to request iq type and subelement name
    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))

    # Map request verb to parameter handler names
    _parameters = {
        'publish': ['node', 'items'],
        'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'],
        'unsubscribe': ['nodeOrEmpty', 'jid', 'subidOrNone'],
        'optionsGet': ['nodeOrEmpty', 'jid', 'subidOrNone'],
        'optionsSet': ['nodeOrEmpty', 'jid', 'options', 'subidOrNone'],
        'subscriptions': [],
        'affiliations': [],
        'create': ['nodeOrNone', 'configureOrNone'],
        'default': ['default'],
        'configureGet': ['nodeOrEmpty'],
        'configureSet': ['nodeOrEmpty', 'configure'],
        'items': ['node', 'maxItems', 'itemIdentifiers', 'subidOrNone'],
        'retract': ['node', 'itemIdentifiers'],
        'purge': ['node'],
        'delete': ['node'],
        'affiliationsGet': ['nodeOrEmpty'],
        'affiliationsSet': ['nodeOrEmpty', 'affiliations'],
        'subscriptionsGet': ['nodeOrEmpty'],
        'subscriptionsSet': [],
    }

    def __init__(self, verb=None):
        self.verb = verb


    def _parse_node(self, verbElement):
        """
        Parse the required node identifier out of the verbElement.
        """
        try:
            self.nodeIdentifier = verbElement["node"]
        except KeyError:
            raise BadRequest('nodeid-required')


    def _render_node(self, verbElement):
        """
        Render the required node identifier on the verbElement.
        """
        if not self.nodeIdentifier:
            raise Exception("Node identifier is required")

        verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrEmpty(self, verbElement):
        """
        Parse the node identifier out of the verbElement. May be empty.
        """
        self.nodeIdentifier = verbElement.getAttribute("node", '')


    def _render_nodeOrEmpty(self, verbElement):
        """
        Render the node identifier on the verbElement. May be empty.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_nodeOrNone(self, verbElement):
        """
        Parse the optional node identifier out of the verbElement.
        """
        self.nodeIdentifier = verbElement.getAttribute("node")


    def _render_nodeOrNone(self, verbElement):
        """
        Render the optional node identifier on the verbElement.
        """
        if self.nodeIdentifier:
            verbElement['node'] = self.nodeIdentifier


    def _parse_items(self, verbElement):
        """
        Parse items out of the verbElement for publish requests.
        """
        self.items = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                self.items.append(element)


    def _render_items(self, verbElement):
        """
        Render items into the verbElement for publish requests.
        """
        if self.items:
            for item in self.items:
                item.uri = NS_PUBSUB
                verbElement.addChild(item)


    def _parse_jid(self, verbElement):
        """
        Parse subscriber out of the verbElement for un-/subscribe requests.
        """
        try:
            self.subscriber = jid.internJID(verbElement["jid"])
        except KeyError:
            raise BadRequest('jid-required')


    def _render_jid(self, verbElement):
        """
        Render subscriber into the verbElement for un-/subscribe requests.
        """
        verbElement['jid'] = self.subscriber.full()


    def _parse_default(self, verbElement):
        """
        Parse node type out of a request for the default node configuration.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form is not None and form.formType == 'submit':
            values = form.getValues()
            self.nodeType = values.get('pubsub#node_type', 'leaf')
        else:
            self.nodeType = 'leaf'


    def _parse_configure(self, verbElement):
        """
        Parse options out of a request for setting the node configuration.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
        if form is not None:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
        else:
            raise BadRequest(text="Missing configuration form")


    def _parse_configureOrNone(self, verbElement):
        """
        Parse optional node configuration form in create request.
        """
        for element in verbElement.parent.elements():
            if element.uri == NS_PUBSUB and element.name == 'configure':
                form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG)
                if form is not None:
                    if form.formType != 'submit':
                        raise BadRequest(text=u"Unexpected form type '%s'" %
                                              form.formType)
                else:
                    form = data_form.Form('submit',
                                          formNamespace=NS_PUBSUB_NODE_CONFIG)
                self.options = form


    def _render_configureOrNone(self, verbElement):
        """
        Render optional node configuration form in create request.
        """
        if self.options is not None:
            configure = verbElement.parent.addElement('configure')
            configure.addChild(self.options.toElement())


    def _parse_itemIdentifiers(self, verbElement):
        """
        Parse item identifiers out of items and retract requests.
        """
        self.itemIdentifiers = []
        for element in verbElement.elements():
            if element.uri == NS_PUBSUB and element.name == 'item':
                try:
                    self.itemIdentifiers.append(element["id"])
                except KeyError:
                    raise BadRequest()


    def _render_itemIdentifiers(self, verbElement):
        """
        Render item identifiers into items and retract requests.
        """
        if self.itemIdentifiers:
            for itemIdentifier in self.itemIdentifiers:
                item = verbElement.addElement('item')
                item['id'] = itemIdentifier


    def _parse_maxItems(self, verbElement):
        """
        Parse maximum items out of an items request.
        """
        value = verbElement.getAttribute('max_items')

        if value:
            try:
                self.maxItems = int(value)
            except ValueError:
                raise BadRequest(text="Field max_items requires a positive " +
                                      "integer value")


    def _render_maxItems(self, verbElement):
        """
        Render maximum items into an items request.
        """
        if self.maxItems:
            verbElement['max_items'] = unicode(self.maxItems)


    def _parse_subidOrNone(self, verbElement):
        """
        Parse subscription identifier out of a request.
        """
        self.subscriptionIdentifier = verbElement.getAttribute("subid")


    def _render_subidOrNone(self, verbElement):
        """
        Render subscription identifier into a request.
        """
        if self.subscriptionIdentifier:
            verbElement['subid'] = self.subscriptionIdentifier


    def _parse_options(self, verbElement):
        """
        Parse options form out of a subscription options request.
        """
        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
        if form is not None:
            if form.formType in ('submit', 'cancel'):
                self.options = form
            else:
                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
        else:
            raise BadRequest(text="Missing options form")



    def _render_options(self, verbElement):
        verbElement.addChild(self.options.toElement())


    def _parse_optionsWithSubscribe(self, verbElement):
        for element in verbElement.parent.elements():
            if element.name == 'options' and element.uri == NS_PUBSUB:
                form = data_form.findForm(element,
                                          NS_PUBSUB_SUBSCRIBE_OPTIONS)
                if form is not None:
                    if form.formType != 'submit':
                        raise BadRequest(text=u"Unexpected form type '%s'" %
                                              form.formType)
                else:
                    form = data_form.Form('submit',
                                          formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
                self.options = form


    def _render_optionsWithSubscribe(self, verbElement):
        if self.options is not None:
            optionsElement = verbElement.parent.addElement('options')
            self._render_options(optionsElement)


    def _parse_affiliations(self, verbElement):
        self.affiliations = {}
        for element in verbElement.elements():
            if (element.uri == NS_PUBSUB_OWNER and
                element.name == 'affiliation'):
                try:
                    entity = jid.internJID(element['jid']).userhostJID()
                except KeyError:
                    raise BadRequest(text='Missing jid attribute')

                if entity in self.affiliations:
                    raise BadRequest(text='Multiple affiliations for an entity')

                try:
                    affiliation = element['affiliation']
                except KeyError:
                    raise BadRequest(text='Missing affiliation attribute')

                self.affiliations[entity] = affiliation


    def parseElement(self, element):
        """
        Parse the publish-subscribe verb and parameters out of a request.
        """
        generic.Stanza.parseElement(self, element)

        verbs = []
        verbElements = []
        for child in element.pubsub.elements():
            key = (self.stanzaType, child.uri, child.name)
            try:
                verb = self._requestVerbMap[key]
            except KeyError:
                continue

            verbs.append(verb)
            verbElements.append(child)

        if not verbs:
            raise NotImplementedError()

        if len(verbs) > 1:
            if 'optionsSet' in verbs and 'subscribe' in verbs:
                self.verb = 'subscribe'
                verbElement = verbElements[verbs.index('subscribe')]
            else:
                raise NotImplementedError()
        else:
            self.verb = verbs[0]
            verbElement = verbElements[0]

        for parameter in self._parameters[self.verb]:
            getattr(self, '_parse_%s' % parameter)(verbElement)



    def send(self, xs):
        """
        Send this request to its recipient.

        This renders all of the relevant parameters for this specific
        requests into an L{IQ}, and invoke its C{send} method.
        This returns a deferred that fires upon reception of a response. See
        L{IQ} for details.

        @param xs: The XML stream to send the request on.
        @type xs: L{twisted.words.protocols.jabber.xmlstream.XmlStream}
        @rtype: L{defer.Deferred}.
        """

        try:
            (self.stanzaType,
             childURI,
             childName) = self._verbRequestMap[self.verb]
        except KeyError:
            raise NotImplementedError()

        iq = IQ(xs, self.stanzaType)
        iq.addElement((childURI, 'pubsub'))
        verbElement = iq.pubsub.addElement(childName)

        if self.sender:
            iq['from'] = self.sender.full()
        if self.recipient:
            iq['to'] = self.recipient.full()

        for parameter in self._parameters[self.verb]:
            getattr(self, '_render_%s' % parameter)(verbElement)

        return iq.send()



class PubSubEvent(object):
    """
    A publish subscribe event.

    @param sender: The entity from which the notification was received.
    @type sender: L{jid.JID}
    @param recipient: The entity to which the notification was sent.
    @type recipient: L{wokkel.pubsub.ItemsEvent}
    @param nodeIdentifier: Identifier of the node the event pertains to.
    @type nodeIdentifier: C{unicode}
    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
    @type headers: C{dict}
    """

    def __init__(self, sender, recipient, nodeIdentifier, headers):
        self.sender = sender
        self.recipient = recipient
        self.nodeIdentifier = nodeIdentifier
        self.headers = headers



class ItemsEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies new, updated and retracted items.

    @param items: List of received items as domish elements.
    @type items: C{list} of L{domish.Element}
    """

    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
        self.items = items



class DeleteEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the deletion of a node.
    """

    redirectURI = None



class PurgeEvent(PubSubEvent):
    """
    A publish-subscribe event that signifies the purging of a node.
    """



class PubSubClient(XMPPHandler):
    """
    Publish subscribe client protocol.
    """

    implements(IPubSubClient)

    def connectionInitialized(self):
        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
                                   NS_PUBSUB_EVENT, self._onEvent)


    def _onEvent(self, message):
        if message.getAttribute('type') == 'error':
            return

        try:
            sender = jid.JID(message["from"])
            recipient = jid.JID(message["to"])
        except KeyError:
            return

        actionElement = None
        for element in message.event.elements():
            if element.uri == NS_PUBSUB_EVENT:
                actionElement = element

        if not actionElement:
            return

        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)

        if eventHandler:
            headers = shim.extractHeaders(message)
            eventHandler(sender, recipient, actionElement, headers)
            message.handled = True


    def _onEvent_items(self, sender, recipient, action, headers):
        nodeIdentifier = action["node"]

        items = [element for element in action.elements()
                         if element.name in ('item', 'retract')]

        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
        self.itemsReceived(event)


    def _onEvent_delete(self, sender, recipient, action, headers):
        nodeIdentifier = action["node"]
        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
        if action.redirect:
            event.redirectURI = action.redirect.getAttribute('uri')
        self.deleteReceived(event)


    def _onEvent_purge(self, sender, recipient, action, headers):
        nodeIdentifier = action["node"]
        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
        self.purgeReceived(event)


    def itemsReceived(self, event):
        pass


    def deleteReceived(self, event):
        pass


    def purgeReceived(self, event):
        pass


    def createNode(self, service, nodeIdentifier=None, options=None,
                         sender=None):
        """
        Create a publish subscribe node.

        @param service: The publish subscribe service to create the node at.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param nodeIdentifier: Optional suggestion for the id of the node.
        @type nodeIdentifier: C{unicode}
        @param options: Optional node configuration options.
        @type options: C{dict}
        """
        request = PubSubRequest('create')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender

        if options:
            form = data_form.Form(formType='submit',
                                  formNamespace=NS_PUBSUB_NODE_CONFIG)
            form.makeFields(options)
            request.options = form

        def cb(iq):
            try:
                new_node = iq.pubsub.create["node"]
            except AttributeError:
                # the suggested node identifier was accepted
                new_node = nodeIdentifier
            return new_node

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def deleteNode(self, service, nodeIdentifier, sender=None):
        """
        Delete a publish subscribe node.

        @param service: The publish subscribe service to delete the node from.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        """
        request = PubSubRequest('delete')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender
        return request.send(self.xmlstream)


    def subscribe(self, service, nodeIdentifier, subscriber,
                        options=None, sender=None):
        """
        Subscribe to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}

        @param subscriber: The entity to subscribe to the node. This entity
            will get notifications of new published items.
        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @param options: Subscription options.
        @type options: C{dict}

        @return: Deferred that fires with L{Subscription} or errbacks with
            L{SubscriptionPending} or L{SubscriptionUnconfigured}.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('subscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.sender = sender

        if options:
            form = data_form.Form(formType='submit',
                                  formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
            form.makeFields(options)
            request.options = form

        def cb(iq):
            subscription = Subscription.fromElement(iq.pubsub.subscription)

            if subscription.state == 'pending':
                raise SubscriptionPending()
            elif subscription.state == 'unconfigured':
                raise SubscriptionUnconfigured()
            else:
                # we assume subscription == 'subscribed'
                # any other value would be invalid, but that should have
                # yielded a stanza error.
                return subscription

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def unsubscribe(self, service, nodeIdentifier, subscriber,
                          subscriptionIdentifier=None, sender=None):
        """
        Unsubscribe from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}

        @param subscriber: The entity to unsubscribe from the node.
        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @param subscriptionIdentifier: Optional subscription identifier.
        @type subscriptionIdentifier: C{unicode}
        """
        request = PubSubRequest('unsubscribe')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.subscriptionIdentifier = subscriptionIdentifier
        request.sender = sender
        return request.send(self.xmlstream)


    def publish(self, service, nodeIdentifier, items=None, sender=None):
        """
        Publish to a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param items: Optional list of L{Item}s to publish.
        @type items: C{list}
        """
        request = PubSubRequest('publish')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.items = items
        request.sender = sender
        return request.send(self.xmlstream)


    def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None,
              subscriptionIdentifier=None, sender=None):
        """
        Retrieve previously published items from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}

        @param maxItems: Optional limit on the number of retrieved items.
        @type maxItems: C{int}

        @param itemIdentifiers: Identifiers of the items to be retrieved.
        @type itemIdentifiers: C{set}

        @param subscriptionIdentifier: Optional subscription identifier. In
            case the node has been subscribed to multiple times, this narrows
            the results to the specific subscription.
        @type subscriptionIdentifier: C{unicode}
        """
        request = PubSubRequest('items')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        if maxItems:
            request.maxItems = str(int(maxItems))
        request.subscriptionIdentifier = subscriptionIdentifier
        request.sender = sender
        request.itemIdentifiers = itemIdentifiers

        def cb(iq):
            items = []
            for element in iq.pubsub.items.elements():
                if element.uri == NS_PUBSUB and element.name == 'item':
                    items.append(element)
            return items

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d

    def retractItems(self, service, nodeIdentifier, itemIdentifiers, sender=None):
        """
        Retract items from a publish subscribe node.

        @param service: The publish subscribe service to delete the node from.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}
        @param itemIdentifiers: Identifiers of the items to be retracted.
        @type itemIdentifiers: C{set}
        """
        request = PubSubRequest('retract')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.itemIdentifiers = itemIdentifiers
        request.sender = sender
        return request.send(self.xmlstream)

    def getOptions(self, service, nodeIdentifier, subscriber,
                         subscriptionIdentifier=None, sender=None):
        """
        Get subscription options.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}

        @param subscriber: The entity subscribed to the node.
        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @param subscriptionIdentifier: Optional subscription identifier.
        @type subscriptionIdentifier: C{unicode}

        @rtype: L{data_form.Form}
        """
        request = PubSubRequest('optionsGet')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.subscriptionIdentifier = subscriptionIdentifier
        request.sender = sender

        def cb(iq):
            form = data_form.findForm(iq.pubsub.options,
                                      NS_PUBSUB_SUBSCRIBE_OPTIONS)
            form.typeCheck()
            return form

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d


    def setOptions(self, service, nodeIdentifier, subscriber,
                         options, subscriptionIdentifier=None, sender=None):
        """
        Set subscription options.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}

        @param subscriber: The entity subscribed to the node.
        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @param options: Subscription options.
        @type options: C{dict}.

        @param subscriptionIdentifier: Optional subscription identifier.
        @type subscriptionIdentifier: C{unicode}
        """
        request = PubSubRequest('optionsSet')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.subscriber = subscriber
        request.subscriptionIdentifier = subscriptionIdentifier
        request.sender = sender

        form = data_form.Form(formType='submit',
                              formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
        form.makeFields(options)
        request.options = form

        d = request.send(self.xmlstream)
        return d



class PubSubService(XMPPHandler, IQHandlerMixin):
    """
    Protocol implementation for a XMPP Publish Subscribe Service.

    The word Service here is used as taken from the Publish Subscribe
    specification. It is the party responsible for keeping nodes and their
    subscriptions, and sending out notifications.

    Methods from the L{IPubSubService} interface that are called as a result
    of an XMPP request may raise exceptions. Alternatively the deferred
    returned by these methods may have their errback called. These are handled
    as follows:

     - If the exception is an instance of L{error.StanzaError}, an error
       response iq is returned.
     - Any other exception is reported using L{log.msg}. An error response
       with the condition C{internal-server-error} is returned.

    The default implementation of said methods raises an L{Unsupported}
    exception and are meant to be overridden.

    @ivar discoIdentity: Service discovery identity as a dictionary with
                         keys C{'category'}, C{'type'} and C{'name'}.
    @ivar pubSubFeatures: List of supported publish-subscribe features for
                          service discovery, as C{str}.
    @type pubSubFeatures: C{list} or C{None}
    """

    implements(IPubSubService, disco.IDisco)

    iqHandlers = {
            '/*': '_onPubSubRequest',
            }

    _legacyHandlers = {
        'publish': ('publish', ['sender', 'recipient',
                                'nodeIdentifier', 'items']),
        'subscribe': ('subscribe', ['sender', 'recipient',
                                    'nodeIdentifier', 'subscriber']),
        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
                                        'nodeIdentifier', 'subscriber']),
        'subscriptions': ('subscriptions', ['sender', 'recipient']),
        'affiliations': ('affiliations', ['sender', 'recipient']),
        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
        'getConfigurationOptions': ('getConfigurationOptions', []),
        'default': ('getDefaultConfiguration',
                    ['sender', 'recipient', 'nodeType']),
        'configureGet': ('getConfiguration', ['sender', 'recipient',
                                              'nodeIdentifier']),
        'configureSet': ('setConfiguration', ['sender', 'recipient',
                                              'nodeIdentifier', 'options']),
        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
                            'maxItems', 'itemIdentifiers']),
        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
                                'itemIdentifiers']),
        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
    }

    _request_class = PubSubRequest

    hideNodes = False

    def __init__(self, resource=None):
        self.resource = resource
        self.discoIdentity = {'category': 'pubsub',
                              'type': 'service',
                              'name': 'Generic Publish-Subscribe Service'}

        self.pubSubFeatures = []


    def connectionMade(self):
        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)


    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        def toInfo(nodeInfo):
            if not nodeInfo:
                return

            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
            info.append(disco.DiscoIdentity('pubsub', nodeType))
            if metaData:
                form = data_form.Form(formType="result",
                                      formNamespace=NS_PUBSUB_META_DATA)
                form.addField(
                        data_form.Field(
                            var='pubsub#node_type',
                            value=nodeType,
                            label='The type of node (collection or leaf)'
                        )
                )

                for metaDatum in metaData:
                    form.addField(data_form.Field.fromDict(metaDatum))

                info.append(form)

            return

        info = []

        request = PubSubRequest('discoInfo')

        if self.resource is not None:
            resource = self.resource.locateResource(request)
            identity = resource.discoIdentity
            features = resource.features
            getInfo = resource.getInfo
        else:
            category = self.discoIdentity['category']
            idType = self.discoIdentity['type']
            name = self.discoIdentity['name']
            identity = disco.DiscoIdentity(category, idType, name)
            features = self.pubSubFeatures
            getInfo = self.getNodeInfo

        if not nodeIdentifier:
            info.append(identity)
            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
                         for feature in features])

        d = defer.maybeDeferred(getInfo, requestor, target, nodeIdentifier or '')
        d.addCallback(toInfo)
        d.addErrback(log.err)
        d.addCallback(lambda _: info)
        return d


    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        if self.hideNodes:
            d = defer.succeed([])
        elif self.resource is not None:
            request = PubSubRequest('discoInfo')
            resource = self.resource.locateResource(request)
            d = resource.getNodes(requestor, target, nodeIdentifier)
        elif nodeIdentifier:
            d = self.getNodes(requestor, target)
        else:
            d = defer.succeed([])

        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
                                     for node in nodes])
        return d


    def _onPubSubRequest(self, iq):
        request = self._request_class.fromElement(iq)

        if self.resource is not None:
            resource = self.resource.locateResource(request)
        else:
            resource = self

        # Preprocess the request, knowing the handling resource
        try:
            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
        except AttributeError:
            pass
        else:
            request = preProcessor(resource, request)
            if request is None:
                return defer.succeed(None)

        # Process the request itself, 
        if resource is not self:
            try:
                handler = getattr(resource, request.verb)
            except AttributeError:
                text = "Request verb: %s" % request.verb
                return defer.fail(Unsupported('', text))

            d = handler(request)
        else:
            try:
                handlerName, argNames = self._legacyHandlers[request.verb]
            except KeyError:
                text = "Request verb: %s" % request.verb
                return defer.fail(Unsupported('', text))

            handler = getattr(self, handlerName)
            args = [getattr(request, arg) for arg in argNames]
            d = handler(*args)

        # If needed, translate the result into a response
        try:
            cb = getattr(self, '_toResponse_%s' % request.verb)
        except AttributeError:
            pass
        else:
            d.addCallback(cb, resource, request)

        return d


    def _toResponse_subscribe(self, result, resource, request):
        response = domish.Element((NS_PUBSUB, "pubsub"))
        response.addChild(result.toElement(NS_PUBSUB))
        return response


    def _toResponse_subscriptions(self, result, resource, request):
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        subscriptions = response.addElement('subscriptions')
        for subscription in result:
            subscriptions.addChild(subscription.toElement(NS_PUBSUB))
        return response


    def _toResponse_affiliations(self, result, resource, request):
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        affiliations = response.addElement('affiliations')

        for nodeIdentifier, affiliation in result:
            item = affiliations.addElement('affiliation')
            item['node'] = nodeIdentifier
            item['affiliation'] = affiliation

        return response


    def _toResponse_create(self, result, resource, request):
        if not request.nodeIdentifier or request.nodeIdentifier != result:
            response = domish.Element((NS_PUBSUB, 'pubsub'))
            create = response.addElement('create')
            create['node'] = result
            return response
        else:
            return None


    def _formFromConfiguration(self, resource, values):
        fieldDefs = resource.getConfigurationOptions()
        form = data_form.Form(formType="form",
                              formNamespace=NS_PUBSUB_NODE_CONFIG)
        form.makeFields(values, fieldDefs)
        return form


    def _checkConfiguration(self, resource, form):
        fieldDefs = resource.getConfigurationOptions()
        form.typeCheck(fieldDefs, filterUnknown=True)


    def _preProcess_create(self, resource, request):
        if request.options:
            self._checkConfiguration(resource, request.options)
        return request


    def _preProcess_default(self, resource, request):
        if request.nodeType not in ('leaf', 'collection'):
            raise error.StanzaError('not-acceptable')
        else:
            return request


    def _toResponse_default(self, options, resource, request):
        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
        default = response.addElement("default")
        form = self._formFromConfiguration(resource, options)
        default.addChild(form.toElement())
        return response


    def _toResponse_configureGet(self, options, resource, request):
        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
        configure = response.addElement("configure")
        form = self._formFromConfiguration(resource, options)
        configure.addChild(form.toElement())

        if request.nodeIdentifier:
            configure["node"] = request.nodeIdentifier

        return response


    def _preProcess_configureSet(self, resource, request):
        if request.options.formType == 'cancel':
            return None
        else:
            self._checkConfiguration(resource, request.options)
            return request


    def _toResponse_items(self, result, resource, request):
        response = domish.Element((NS_PUBSUB, 'pubsub'))
        items = response.addElement('items')
        items["node"] = request.nodeIdentifier

        for item in result:
            if item.name == 'item':
                item.uri = NS_PUBSUB
                items.addChild(item)

        return response


    def _createNotification(self, eventType, service, nodeIdentifier,
                                  subscriber, subscriptions=None):
        headers = []

        if subscriptions:
            for subscription in subscriptions:
                if nodeIdentifier != subscription.nodeIdentifier:
                    headers.append(('Collection', subscription.nodeIdentifier))

        message = domish.Element((None, "message"))
        message["from"] = service.full()
        message["to"] = subscriber.full()
        event = message.addElement((NS_PUBSUB_EVENT, "event"))

        element = event.addElement(eventType)
        element["node"] = nodeIdentifier

        if headers:
            message.addChild(shim.Headers(headers))

        return message


    def _toResponse_affiliationsGet(self, result, resource, request):
        response = domish.Element((NS_PUBSUB_OWNER, 'pubsub'))
        affiliations = response.addElement('affiliations')

        if request.nodeIdentifier:
            affiliations['node'] = request.nodeIdentifier

        for entity, affiliation in result.iteritems():
            item = affiliations.addElement('affiliation')
            item['jid'] = entity.full()
            item['affiliation'] = affiliation

        return response


    # public methods

    def notifyPublish(self, service, nodeIdentifier, notifications):
        for subscriber, subscriptions, items in notifications:
            message = self._createNotification('items', service,
                                               nodeIdentifier, subscriber,
                                               subscriptions)
            for item in items:
                item.uri = NS_PUBSUB_EVENT
                message.event.items.addChild(item)
            self.send(message)


    def notifyDelete(self, service, nodeIdentifier, subscribers,
                           redirectURI=None):
        for subscriber in subscribers:
            message = self._createNotification('delete', service,
                                               nodeIdentifier,
                                               subscriber)
            if redirectURI:
                redirect = message.event.delete.addElement('redirect')
                redirect['uri'] = redirectURI
            self.send(message)


    def getNodeInfo(self, requestor, service, nodeIdentifier):
        return None


    def getNodes(self, requestor, service):
        return []


    def publish(self, requestor, service, nodeIdentifier, items):
        raise Unsupported('publish')


    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
        raise Unsupported('subscribe')


    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
        raise Unsupported('subscribe')


    def subscriptions(self, requestor, service):
        raise Unsupported('retrieve-subscriptions')


    def affiliations(self, requestor, service):
        raise Unsupported('retrieve-affiliations')


    def create(self, requestor, service, nodeIdentifier):
        raise Unsupported('create-nodes')


    def getConfigurationOptions(self):
        return {}


    def getDefaultConfiguration(self, requestor, service, nodeType):
        raise Unsupported('retrieve-default')


    def getConfiguration(self, requestor, service, nodeIdentifier):
        raise Unsupported('config-node')


    def setConfiguration(self, requestor, service, nodeIdentifier, options):
        raise Unsupported('config-node')


    def items(self, requestor, service, nodeIdentifier, maxItems,
                    itemIdentifiers):
        raise Unsupported('retrieve-items')


    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
        raise Unsupported('retract-items')


    def purge(self, requestor, service, nodeIdentifier):
        raise Unsupported('purge-nodes')


    def delete(self, requestor, service, nodeIdentifier):
        raise Unsupported('delete-nodes')



class PubSubResource(object):

    implements(IPubSubResource)

    features = []
    discoIdentity = disco.DiscoIdentity('pubsub',
                                        'service',
                                        'Publish-Subscribe Service')


    def locateResource(self, request):
        return self


    def getInfo(self, requestor, service, nodeIdentifier):
        return defer.succeed(None)


    def getNodes(self, requestor, service, nodeIdentifier):
        return defer.succeed([])


    def getConfigurationOptions(self):
        return {}


    def publish(self, request):
        return defer.fail(Unsupported('publish'))


    def subscribe(self, request):
        return defer.fail(Unsupported('subscribe'))


    def unsubscribe(self, request):
        return defer.fail(Unsupported('subscribe'))


    def subscriptions(self, request):
        return defer.fail(Unsupported('retrieve-subscriptions'))


    def affiliations(self, request):
        return defer.fail(Unsupported('retrieve-affiliations'))


    def create(self, request):
        return defer.fail(Unsupported('create-nodes'))


    def default(self, request):
        return defer.fail(Unsupported('retrieve-default'))


    def configureGet(self, request):
        return defer.fail(Unsupported('config-node'))


    def configureSet(self, request):
        return defer.fail(Unsupported('config-node'))


    def items(self, request):
        return defer.fail(Unsupported('retrieve-items'))


    def retract(self, request):
        return defer.fail(Unsupported('retract-items'))


    def purge(self, request):
        return defer.fail(Unsupported('purge-nodes'))


    def delete(self, request):
        return defer.fail(Unsupported('delete-nodes'))


    def affiliationsGet(self, request):
        return defer.fail(Unsupported('modify-affiliations'))


    def affiliationsSet(self, request):
        return defer.fail(Unsupported('modify-affiliations'))


    def subscriptionsGet(self, request):
        return defer.fail(Unsupported('manage-subscriptions'))


    def subscriptionsSet(self, request):
        return defer.fail(Unsupported('manage-subscriptions'))