changeset 0:09e7c32a6a00

use sat.tmp.wokkel as a buffer module until the changes are integrated to wokkel
author souliane <souliane@mailoo.org>
date Mon, 15 Dec 2014 12:46:58 +0100
parents
children 9d35f88168a1
files README __init__.py wokkel/__init__.py wokkel/delay.py wokkel/pubsub.py wokkel/rsm.py
diffstat 4 files changed, 2040 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/README	Mon Dec 15 12:46:58 2014 +0100
@@ -0,0 +1,6 @@
+Use this module to temporary store files that need to be integrated to other
+projects such as Wokkel.
+
+For example, the changeset that introduced this folder adds RSM (XEP-0059)
+support to Wokkel. The files in sat.tmp.wokkel are imported over the initial
+wokkel files from sat.plugins.__init__.py
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/wokkel/delay.py	Mon Dec 15 12:46:58 2014 +0100
@@ -0,0 +1,117 @@
+# -*- test-case-name: wokkel.test.test_delay -*-
+#
+# Copyright (c) Ralph Meijer.
+# See LICENSE for details.
+
+"""
+Delayed Delivery.
+
+Support for comunicating Delayed Delivery information as specified by
+U{XEP-0203<http://xmpp.org/extensions/xep-0203.html>} and its predecessor
+U{XEP-0091<http://xmpp.org/extensions/xep-0091.html>}.
+"""
+
+from dateutil.parser import parse
+from dateutil.tz import tzutc
+
+from twisted.words.protocols.jabber.jid import InvalidFormat, JID
+from twisted.words.xish import domish
+
+NS_DELAY = 'urn:xmpp:delay'
+NS_JABBER_DELAY = 'jabber:x:delay'
+
+class Delay(object):
+    """
+    Delayed Delivery information.
+
+    Instances of this class represent delayed delivery information that can be
+    parsed from and rendered into both XEP-0203 and legacy XEP-0091 formats.
+
+    @ivar stamp: The timestamp the stanza was originally sent.
+    @type stamp: L{datetime.datetime}
+    @ivar sender: The optional entity that originally sent the stanza or
+        delayed its delivery.
+    @type sender: L{JID}
+    """
+
+    def __init__(self, stamp, sender=None):
+        self.stamp = stamp
+        self.sender = sender
+
+
+    def toElement(self, legacy=False):
+        """
+        Render this instance into a domish Element.
+
+        @param legacy: If C{True}, use the legacy XEP-0091 format.
+        @type legacy: C{bool}
+        """
+        if not self.stamp:
+            raise ValueError("stamp is required")
+        if self.stamp.tzinfo is None:
+            raise ValueError("stamp is not offset-aware")
+
+        if legacy:
+            element = domish.Element((NS_JABBER_DELAY, 'x'))
+            stampFormat = '%Y%m%dT%H:%M:%S'
+        else:
+            element = domish.Element((NS_DELAY, 'delay'))
+            stampFormat = '%Y-%m-%dT%H:%M:%SZ'
+
+        stamp = self.stamp.astimezone(tzutc())
+        element['stamp'] = stamp.strftime(stampFormat)
+
+        if self.sender:
+            element['from'] = self.sender.full()
+
+        return element
+
+
+    @staticmethod
+    def fromElement(element):
+        """
+        Create an instance from a domish Element.
+        """
+        try:
+            stamp = parse(element[u'stamp'])
+
+            # Assume UTC if no timezone was given
+            if stamp.tzinfo is None:
+                stamp = stamp.replace(tzinfo=tzutc())
+        except (KeyError, ValueError, TypeError):
+            stamp = None
+
+        try:
+            sender = JID(element[u'from'])
+        except (KeyError, InvalidFormat):
+            sender = None
+
+        delay = Delay(stamp, sender)
+        return delay
+
+
+
+class DelayMixin(object):
+    """
+    Mixin for parsing delayed delivery information from stanzas.
+
+    This can be used as a mixin for subclasses of L{wokkel.generic.Stanza}
+    for parsing delayed delivery information. If both XEP-0203 and XEP-0091
+    formats are present, the former takes precedence.
+    """
+
+    delay = None
+
+    childParsers = {
+            (NS_DELAY, 'delay'): '_childParser_delay',
+            (NS_JABBER_DELAY, 'x'): '_childParser_legacyDelay',
+            }
+
+
+    def _childParser_delay(self, element):
+        self.delay = Delay.fromElement(element)
+
+
+    def _childParser_legacyDelay(self, element):
+        if not self.delay:
+            self.delay = Delay.fromElement(element)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/wokkel/pubsub.py	Mon Dec 15 12:46:58 2014 +0100
@@ -0,0 +1,1568 @@
+# -*- test-case-name: wokkel.test.test_pubsub -*-
+#
+# Copyright (c) Ralph Meijer.
+# See LICENSE for details.
+
+"""
+XMPP publish-subscribe protocol.
+
+This protocol is specified in
+U{XEP-0060<http://xmpp.org/extensions/xep-0060.html>}.
+"""
+
+from zope.interface import implements
+
+from twisted.internet import defer
+from twisted.python import log
+from twisted.words.protocols.jabber import jid, error
+from twisted.words.xish import domish
+
+from wokkel import disco, data_form, generic, shim
+from wokkel.compat import IQ
+from wokkel.subprotocols import IQHandlerMixin, XMPPHandler
+from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource
+
+# Iq get and set XPath queries
+IQ_GET = '/iq[@type="get"]'
+IQ_SET = '/iq[@type="set"]'
+
+# Publish-subscribe namespaces
+NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
+NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
+NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
+NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"
+NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config"
+NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data"
+NS_PUBSUB_SUBSCRIBE_OPTIONS = NS_PUBSUB + "#subscribe_options"
+
+# XPath to match pubsub requests
+PUBSUB_REQUEST = '/iq[@type="get" or @type="set"]/' + \
+                    'pubsub[@xmlns="' + NS_PUBSUB + '" or ' + \
+                           '@xmlns="' + NS_PUBSUB_OWNER + '"]'
+
+class SubscriptionPending(Exception):
+    """
+    Raised when the requested subscription is pending acceptance.
+    """
+
+
+
+class SubscriptionUnconfigured(Exception):
+    """
+    Raised when the requested subscription needs to be configured before
+    becoming active.
+    """
+
+
+
+class PubSubError(error.StanzaError):
+    """
+    Exception with publish-subscribe specific condition.
+    """
+    def __init__(self, condition, pubsubCondition, feature=None, text=None):
+        appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
+        if feature:
+            appCondition['feature'] = feature
+        error.StanzaError.__init__(self, condition,
+                                         text=text,
+                                         appCondition=appCondition)
+
+
+
+class BadRequest(error.StanzaError):
+    """
+    Bad request stanza error.
+    """
+    def __init__(self, pubsubCondition=None, text=None):
+        if pubsubCondition:
+            appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition))
+        else:
+            appCondition = None
+        error.StanzaError.__init__(self, 'bad-request',
+                                         text=text,
+                                         appCondition=appCondition)
+
+
+
+class Unsupported(PubSubError):
+    def __init__(self, feature, text=None):
+        self.feature = feature
+        PubSubError.__init__(self, 'feature-not-implemented',
+                                   'unsupported',
+                                   feature,
+                                   text)
+
+    def __str__(self):
+        message = PubSubError.__str__(self)
+        message += ', feature %r' % self.feature
+        return message
+
+
+class Subscription(object):
+    """
+    A subscription to a node.
+
+    @ivar nodeIdentifier: The identifier of the node subscribed to.  The root
+        node is denoted by C{None}.
+    @type nodeIdentifier: C{unicode}
+
+    @ivar subscriber: The subscribing entity.
+    @type subscriber: L{jid.JID}
+
+    @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
+                 C{'unconfigured'}.
+    @type state: C{unicode}
+
+    @ivar options: Optional list of subscription options.
+    @type options: C{dict}
+
+    @ivar subscriptionIdentifier: Optional subscription identifier.
+    @type subscriptionIdentifier: C{unicode}
+    """
+
+    def __init__(self, nodeIdentifier, subscriber, state, options=None,
+                       subscriptionIdentifier=None):
+        self.nodeIdentifier = nodeIdentifier
+        self.subscriber = subscriber
+        self.state = state
+        self.options = options or {}
+        self.subscriptionIdentifier = subscriptionIdentifier
+
+
+    @staticmethod
+    def fromElement(element):
+        return Subscription(
+                element.getAttribute('node'),
+                jid.JID(element.getAttribute('jid')),
+                element.getAttribute('subscription'),
+                subscriptionIdentifier=element.getAttribute('subid'))
+
+
+    def toElement(self, defaultUri=None):
+        """
+        Return the DOM representation of this subscription.
+
+        @rtype: L{domish.Element}
+        """
+        element = domish.Element((defaultUri, 'subscription'))
+        if self.nodeIdentifier:
+            element['node'] = self.nodeIdentifier
+        element['jid'] = unicode(self.subscriber)
+        element['subscription'] = self.state
+        if self.subscriptionIdentifier:
+            element['subid'] = self.subscriptionIdentifier
+        return element
+
+
+
+class Item(domish.Element):
+    """
+    Publish subscribe item.
+
+    This behaves like an object providing L{domish.IElement}.
+
+    Item payload can be added using C{addChild} or C{addRawXml}, or using the
+    C{payload} keyword argument to C{__init__}.
+    """
+
+    def __init__(self, id=None, payload=None):
+        """
+        @param id: optional item identifier
+        @type id: C{unicode}
+        @param payload: optional item payload. Either as a domish element, or
+                        as serialized XML.
+        @type payload: object providing L{domish.IElement} or C{unicode}.
+        """
+
+        domish.Element.__init__(self, (None, 'item'))
+        if id is not None:
+            self['id'] = id
+        if payload is not None:
+            if isinstance(payload, basestring):
+                self.addRawXml(payload)
+            else:
+                self.addChild(payload)
+
+
+
+class PubSubRequest(generic.Stanza):
+    """
+    A publish-subscribe request.
+
+    The set of instance variables used depends on the type of request. If
+    a variable is not applicable or not passed in the request, its value is
+    C{None}.
+
+    @ivar verb: The type of publish-subscribe request. See C{_requestVerbMap}.
+    @type verb: C{str}.
+
+    @ivar affiliations: Affiliations to be modified.
+    @type affiliations: C{set}
+
+    @ivar items: The items to be published, as L{domish.Element}s.
+    @type items: C{list}
+
+    @ivar itemIdentifiers: Identifiers of the items to be retrieved or
+                           retracted.
+    @type itemIdentifiers: C{set}
+
+    @ivar maxItems: Maximum number of items to retrieve.
+    @type maxItems: C{int}.
+
+    @ivar nodeIdentifier: Identifier of the node the request is about.
+    @type nodeIdentifier: C{unicode}
+
+    @ivar nodeType: The type of node that should be created, or for which the
+                    configuration is retrieved. C{'leaf'} or C{'collection'}.
+    @type nodeType: C{str}
+
+    @ivar options: Configurations options for nodes, subscriptions and publish
+                   requests.
+    @type options: L{data_form.Form}
+
+    @ivar subscriber: The subscribing entity.
+    @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
+
+    @ivar subscriptionIdentifier: Identifier for a specific subscription.
+    @type subscriptionIdentifier: C{unicode}
+
+    @ivar subscriptions: Subscriptions to be modified, as a set of
+        L{Subscription}.
+    @type subscriptions: C{set}
+
+    @ivar affiliations: Affiliations to be modified, as a dictionary of entity
+        (L{JID<twisted.words.protocols.jabber.jid.JID>} to affiliation
+        (C{unicode}).
+    @type affiliations: C{dict}
+    """
+
+    verb = None
+
+    affiliations = None
+    items = None
+    itemIdentifiers = None
+    maxItems = None
+    nodeIdentifier = None
+    nodeType = None
+    options = None
+    subscriber = None
+    subscriptionIdentifier = None
+    subscriptions = None
+    affiliations = None
+
+    # Map request iq type and subelement name to request verb
+    _requestVerbMap = {
+        ('set', NS_PUBSUB, 'publish'): 'publish',
+        ('set', NS_PUBSUB, 'subscribe'): 'subscribe',
+        ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe',
+        ('get', NS_PUBSUB, 'options'): 'optionsGet',
+        ('set', NS_PUBSUB, 'options'): 'optionsSet',
+        ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions',
+        ('get', NS_PUBSUB, 'affiliations'): 'affiliations',
+        ('set', NS_PUBSUB, 'create'): 'create',
+        ('get', NS_PUBSUB_OWNER, 'default'): 'default',
+        ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet',
+        ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet',
+        ('get', NS_PUBSUB, 'items'): 'items',
+        ('set', NS_PUBSUB, 'retract'): 'retract',
+        ('set', NS_PUBSUB_OWNER, 'purge'): 'purge',
+        ('set', NS_PUBSUB_OWNER, 'delete'): 'delete',
+        ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet',
+        ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet',
+        ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet',
+        ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet',
+    }
+
+    # Map request verb to request iq type and subelement name
+    _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems()))
+
+    # Map request verb to parameter handler names
+    _parameters = {
+        'publish': ['node', 'items'],
+        'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'],
+        'unsubscribe': ['nodeOrEmpty', 'jid', 'subidOrNone'],
+        'optionsGet': ['nodeOrEmpty', 'jid', 'subidOrNone'],
+        'optionsSet': ['nodeOrEmpty', 'jid', 'options', 'subidOrNone'],
+        'subscriptions': [],
+        'affiliations': [],
+        'create': ['nodeOrNone', 'configureOrNone'],
+        'default': ['default'],
+        'configureGet': ['nodeOrEmpty'],
+        'configureSet': ['nodeOrEmpty', 'configure'],
+        'items': ['node', 'maxItems', 'itemIdentifiers', 'subidOrNone'],
+        'retract': ['node', 'itemIdentifiers'],
+        'purge': ['node'],
+        'delete': ['node'],
+        'affiliationsGet': ['nodeOrEmpty'],
+        'affiliationsSet': ['nodeOrEmpty', 'affiliations'],
+        'subscriptionsGet': ['nodeOrEmpty'],
+        'subscriptionsSet': [],
+    }
+
+    def __init__(self, verb=None):
+        self.verb = verb
+
+
+    def _parse_node(self, verbElement):
+        """
+        Parse the required node identifier out of the verbElement.
+        """
+        try:
+            self.nodeIdentifier = verbElement["node"]
+        except KeyError:
+            raise BadRequest('nodeid-required')
+
+
+    def _render_node(self, verbElement):
+        """
+        Render the required node identifier on the verbElement.
+        """
+        if not self.nodeIdentifier:
+            raise Exception("Node identifier is required")
+
+        verbElement['node'] = self.nodeIdentifier
+
+
+    def _parse_nodeOrEmpty(self, verbElement):
+        """
+        Parse the node identifier out of the verbElement. May be empty.
+        """
+        self.nodeIdentifier = verbElement.getAttribute("node", '')
+
+
+    def _render_nodeOrEmpty(self, verbElement):
+        """
+        Render the node identifier on the verbElement. May be empty.
+        """
+        if self.nodeIdentifier:
+            verbElement['node'] = self.nodeIdentifier
+
+
+    def _parse_nodeOrNone(self, verbElement):
+        """
+        Parse the optional node identifier out of the verbElement.
+        """
+        self.nodeIdentifier = verbElement.getAttribute("node")
+
+
+    def _render_nodeOrNone(self, verbElement):
+        """
+        Render the optional node identifier on the verbElement.
+        """
+        if self.nodeIdentifier:
+            verbElement['node'] = self.nodeIdentifier
+
+
+    def _parse_items(self, verbElement):
+        """
+        Parse items out of the verbElement for publish requests.
+        """
+        self.items = []
+        for element in verbElement.elements():
+            if element.uri == NS_PUBSUB and element.name == 'item':
+                self.items.append(element)
+
+
+    def _render_items(self, verbElement):
+        """
+        Render items into the verbElement for publish requests.
+        """
+        if self.items:
+            for item in self.items:
+                item.uri = NS_PUBSUB
+                verbElement.addChild(item)
+
+
+    def _parse_jid(self, verbElement):
+        """
+        Parse subscriber out of the verbElement for un-/subscribe requests.
+        """
+        try:
+            self.subscriber = jid.internJID(verbElement["jid"])
+        except KeyError:
+            raise BadRequest('jid-required')
+
+
+    def _render_jid(self, verbElement):
+        """
+        Render subscriber into the verbElement for un-/subscribe requests.
+        """
+        verbElement['jid'] = self.subscriber.full()
+
+
+    def _parse_default(self, verbElement):
+        """
+        Parse node type out of a request for the default node configuration.
+        """
+        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
+        if form is not None and form.formType == 'submit':
+            values = form.getValues()
+            self.nodeType = values.get('pubsub#node_type', 'leaf')
+        else:
+            self.nodeType = 'leaf'
+
+
+    def _parse_configure(self, verbElement):
+        """
+        Parse options out of a request for setting the node configuration.
+        """
+        form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG)
+        if form is not None:
+            if form.formType in ('submit', 'cancel'):
+                self.options = form
+            else:
+                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
+        else:
+            raise BadRequest(text="Missing configuration form")
+
+
+    def _parse_configureOrNone(self, verbElement):
+        """
+        Parse optional node configuration form in create request.
+        """
+        for element in verbElement.parent.elements():
+            if element.uri == NS_PUBSUB and element.name == 'configure':
+                form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG)
+                if form is not None:
+                    if form.formType != 'submit':
+                        raise BadRequest(text=u"Unexpected form type '%s'" %
+                                              form.formType)
+                else:
+                    form = data_form.Form('submit',
+                                          formNamespace=NS_PUBSUB_NODE_CONFIG)
+                self.options = form
+
+
+    def _render_configureOrNone(self, verbElement):
+        """
+        Render optional node configuration form in create request.
+        """
+        if self.options is not None:
+            configure = verbElement.parent.addElement('configure')
+            configure.addChild(self.options.toElement())
+
+
+    def _parse_itemIdentifiers(self, verbElement):
+        """
+        Parse item identifiers out of items and retract requests.
+        """
+        self.itemIdentifiers = []
+        for element in verbElement.elements():
+            if element.uri == NS_PUBSUB and element.name == 'item':
+                try:
+                    self.itemIdentifiers.append(element["id"])
+                except KeyError:
+                    raise BadRequest()
+
+
+    def _render_itemIdentifiers(self, verbElement):
+        """
+        Render item identifiers into items and retract requests.
+        """
+        if self.itemIdentifiers:
+            for itemIdentifier in self.itemIdentifiers:
+                item = verbElement.addElement('item')
+                item['id'] = itemIdentifier
+
+
+    def _parse_maxItems(self, verbElement):
+        """
+        Parse maximum items out of an items request.
+        """
+        value = verbElement.getAttribute('max_items')
+
+        if value:
+            try:
+                self.maxItems = int(value)
+            except ValueError:
+                raise BadRequest(text="Field max_items requires a positive " +
+                                      "integer value")
+
+
+    def _render_maxItems(self, verbElement):
+        """
+        Render maximum items into an items request.
+        """
+        if self.maxItems:
+            verbElement['max_items'] = unicode(self.maxItems)
+
+
+    def _parse_subidOrNone(self, verbElement):
+        """
+        Parse subscription identifier out of a request.
+        """
+        self.subscriptionIdentifier = verbElement.getAttribute("subid")
+
+
+    def _render_subidOrNone(self, verbElement):
+        """
+        Render subscription identifier into a request.
+        """
+        if self.subscriptionIdentifier:
+            verbElement['subid'] = self.subscriptionIdentifier
+
+
+    def _parse_options(self, verbElement):
+        """
+        Parse options form out of a subscription options request.
+        """
+        form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS)
+        if form is not None:
+            if form.formType in ('submit', 'cancel'):
+                self.options = form
+            else:
+                raise BadRequest(text=u"Unexpected form type '%s'" % form.formType)
+        else:
+            raise BadRequest(text="Missing options form")
+
+
+
+    def _render_options(self, verbElement):
+        verbElement.addChild(self.options.toElement())
+
+
+    def _parse_optionsWithSubscribe(self, verbElement):
+        for element in verbElement.parent.elements():
+            if element.name == 'options' and element.uri == NS_PUBSUB:
+                form = data_form.findForm(element,
+                                          NS_PUBSUB_SUBSCRIBE_OPTIONS)
+                if form is not None:
+                    if form.formType != 'submit':
+                        raise BadRequest(text=u"Unexpected form type '%s'" %
+                                              form.formType)
+                else:
+                    form = data_form.Form('submit',
+                                          formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
+                self.options = form
+
+
+    def _render_optionsWithSubscribe(self, verbElement):
+        if self.options is not None:
+            optionsElement = verbElement.parent.addElement('options')
+            self._render_options(optionsElement)
+
+
+    def _parse_affiliations(self, verbElement):
+        self.affiliations = {}
+        for element in verbElement.elements():
+            if (element.uri == NS_PUBSUB_OWNER and
+                element.name == 'affiliation'):
+                try:
+                    entity = jid.internJID(element['jid']).userhostJID()
+                except KeyError:
+                    raise BadRequest(text='Missing jid attribute')
+
+                if entity in self.affiliations:
+                    raise BadRequest(text='Multiple affiliations for an entity')
+
+                try:
+                    affiliation = element['affiliation']
+                except KeyError:
+                    raise BadRequest(text='Missing affiliation attribute')
+
+                self.affiliations[entity] = affiliation
+
+
+    def parseElement(self, element):
+        """
+        Parse the publish-subscribe verb and parameters out of a request.
+        """
+        generic.Stanza.parseElement(self, element)
+
+        verbs = []
+        verbElements = []
+        for child in element.pubsub.elements():
+            key = (self.stanzaType, child.uri, child.name)
+            try:
+                verb = self._requestVerbMap[key]
+            except KeyError:
+                continue
+
+            verbs.append(verb)
+            verbElements.append(child)
+
+        if not verbs:
+            raise NotImplementedError()
+
+        if len(verbs) > 1:
+            if 'optionsSet' in verbs and 'subscribe' in verbs:
+                self.verb = 'subscribe'
+                verbElement = verbElements[verbs.index('subscribe')]
+            else:
+                raise NotImplementedError()
+        else:
+            self.verb = verbs[0]
+            verbElement = verbElements[0]
+
+        for parameter in self._parameters[self.verb]:
+            getattr(self, '_parse_%s' % parameter)(verbElement)
+
+
+
+    def send(self, xs):
+        """
+        Send this request to its recipient.
+
+        This renders all of the relevant parameters for this specific
+        requests into an L{IQ}, and invoke its C{send} method.
+        This returns a deferred that fires upon reception of a response. See
+        L{IQ} for details.
+
+        @param xs: The XML stream to send the request on.
+        @type xs: L{twisted.words.protocols.jabber.xmlstream.XmlStream}
+        @rtype: L{defer.Deferred}.
+        """
+
+        try:
+            (self.stanzaType,
+             childURI,
+             childName) = self._verbRequestMap[self.verb]
+        except KeyError:
+            raise NotImplementedError()
+
+        iq = IQ(xs, self.stanzaType)
+        iq.addElement((childURI, 'pubsub'))
+        verbElement = iq.pubsub.addElement(childName)
+
+        if self.sender:
+            iq['from'] = self.sender.full()
+        if self.recipient:
+            iq['to'] = self.recipient.full()
+
+        for parameter in self._parameters[self.verb]:
+            getattr(self, '_render_%s' % parameter)(verbElement)
+
+        return iq.send()
+
+
+
+class PubSubEvent(object):
+    """
+    A publish subscribe event.
+
+    @param sender: The entity from which the notification was received.
+    @type sender: L{jid.JID}
+    @param recipient: The entity to which the notification was sent.
+    @type recipient: L{wokkel.pubsub.ItemsEvent}
+    @param nodeIdentifier: Identifier of the node the event pertains to.
+    @type nodeIdentifier: C{unicode}
+    @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}.
+    @type headers: C{dict}
+    """
+
+    def __init__(self, sender, recipient, nodeIdentifier, headers):
+        self.sender = sender
+        self.recipient = recipient
+        self.nodeIdentifier = nodeIdentifier
+        self.headers = headers
+
+
+
+class ItemsEvent(PubSubEvent):
+    """
+    A publish-subscribe event that signifies new, updated and retracted items.
+
+    @param items: List of received items as domish elements.
+    @type items: C{list} of L{domish.Element}
+    """
+
+    def __init__(self, sender, recipient, nodeIdentifier, items, headers):
+        PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers)
+        self.items = items
+
+
+
+class DeleteEvent(PubSubEvent):
+    """
+    A publish-subscribe event that signifies the deletion of a node.
+    """
+
+    redirectURI = None
+
+
+
+class PurgeEvent(PubSubEvent):
+    """
+    A publish-subscribe event that signifies the purging of a node.
+    """
+
+
+
+class PubSubClient(XMPPHandler):
+    """
+    Publish subscribe client protocol.
+    """
+
+    implements(IPubSubClient)
+
+    def connectionInitialized(self):
+        self.xmlstream.addObserver('/message/event[@xmlns="%s"]' %
+                                   NS_PUBSUB_EVENT, self._onEvent)
+
+
+    def _onEvent(self, message):
+        if message.getAttribute('type') == 'error':
+            return
+
+        try:
+            sender = jid.JID(message["from"])
+            recipient = jid.JID(message["to"])
+        except KeyError:
+            return
+
+        actionElement = None
+        for element in message.event.elements():
+            if element.uri == NS_PUBSUB_EVENT:
+                actionElement = element
+
+        if not actionElement:
+            return
+
+        eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None)
+
+        if eventHandler:
+            headers = shim.extractHeaders(message)
+            eventHandler(sender, recipient, actionElement, headers)
+            message.handled = True
+
+
+    def _onEvent_items(self, sender, recipient, action, headers):
+        nodeIdentifier = action["node"]
+
+        items = [element for element in action.elements()
+                         if element.name in ('item', 'retract')]
+
+        event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers)
+        self.itemsReceived(event)
+
+
+    def _onEvent_delete(self, sender, recipient, action, headers):
+        nodeIdentifier = action["node"]
+        event = DeleteEvent(sender, recipient, nodeIdentifier, headers)
+        if action.redirect:
+            event.redirectURI = action.redirect.getAttribute('uri')
+        self.deleteReceived(event)
+
+
+    def _onEvent_purge(self, sender, recipient, action, headers):
+        nodeIdentifier = action["node"]
+        event = PurgeEvent(sender, recipient, nodeIdentifier, headers)
+        self.purgeReceived(event)
+
+
+    def itemsReceived(self, event):
+        pass
+
+
+    def deleteReceived(self, event):
+        pass
+
+
+    def purgeReceived(self, event):
+        pass
+
+
+    def createNode(self, service, nodeIdentifier=None, options=None,
+                         sender=None):
+        """
+        Create a publish subscribe node.
+
+        @param service: The publish subscribe service to create the node at.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: Optional suggestion for the id of the node.
+        @type nodeIdentifier: C{unicode}
+        @param options: Optional node configuration options.
+        @type options: C{dict}
+        """
+        request = PubSubRequest('create')
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        request.sender = sender
+
+        if options:
+            form = data_form.Form(formType='submit',
+                                  formNamespace=NS_PUBSUB_NODE_CONFIG)
+            form.makeFields(options)
+            request.options = form
+
+        def cb(iq):
+            try:
+                new_node = iq.pubsub.create["node"]
+            except AttributeError:
+                # the suggested node identifier was accepted
+                new_node = nodeIdentifier
+            return new_node
+
+        d = request.send(self.xmlstream)
+        d.addCallback(cb)
+        return d
+
+
+    def deleteNode(self, service, nodeIdentifier, sender=None):
+        """
+        Delete a publish subscribe node.
+
+        @param service: The publish subscribe service to delete the node from.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: The identifier of the node.
+        @type nodeIdentifier: C{unicode}
+        """
+        request = PubSubRequest('delete')
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        request.sender = sender
+        return request.send(self.xmlstream)
+
+
+    def subscribe(self, service, nodeIdentifier, subscriber,
+                        options=None, sender=None):
+        """
+        Subscribe to a publish subscribe node.
+
+        @param service: The publish subscribe service that keeps the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+
+        @param nodeIdentifier: The identifier of the node.
+        @type nodeIdentifier: C{unicode}
+
+        @param subscriber: The entity to subscribe to the node. This entity
+            will get notifications of new published items.
+        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
+
+        @param options: Subscription options.
+        @type options: C{dict}
+
+        @return: Deferred that fires with L{Subscription} or errbacks with
+            L{SubscriptionPending} or L{SubscriptionUnconfigured}.
+        @rtype: L{defer.Deferred}
+        """
+        request = PubSubRequest('subscribe')
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        request.subscriber = subscriber
+        request.sender = sender
+
+        if options:
+            form = data_form.Form(formType='submit',
+                                  formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
+            form.makeFields(options)
+            request.options = form
+
+        def cb(iq):
+            subscription = Subscription.fromElement(iq.pubsub.subscription)
+
+            if subscription.state == 'pending':
+                raise SubscriptionPending()
+            elif subscription.state == 'unconfigured':
+                raise SubscriptionUnconfigured()
+            else:
+                # we assume subscription == 'subscribed'
+                # any other value would be invalid, but that should have
+                # yielded a stanza error.
+                return subscription
+
+        d = request.send(self.xmlstream)
+        d.addCallback(cb)
+        return d
+
+
+    def unsubscribe(self, service, nodeIdentifier, subscriber,
+                          subscriptionIdentifier=None, sender=None):
+        """
+        Unsubscribe from a publish subscribe node.
+
+        @param service: The publish subscribe service that keeps the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+
+        @param nodeIdentifier: The identifier of the node.
+        @type nodeIdentifier: C{unicode}
+
+        @param subscriber: The entity to unsubscribe from the node.
+        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
+
+        @param subscriptionIdentifier: Optional subscription identifier.
+        @type subscriptionIdentifier: C{unicode}
+        """
+        request = PubSubRequest('unsubscribe')
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        request.subscriber = subscriber
+        request.subscriptionIdentifier = subscriptionIdentifier
+        request.sender = sender
+        return request.send(self.xmlstream)
+
+
+    def publish(self, service, nodeIdentifier, items=None, sender=None):
+        """
+        Publish to a publish subscribe node.
+
+        @param service: The publish subscribe service that keeps the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: The identifier of the node.
+        @type nodeIdentifier: C{unicode}
+        @param items: Optional list of L{Item}s to publish.
+        @type items: C{list}
+        """
+        request = PubSubRequest('publish')
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        request.items = items
+        request.sender = sender
+        return request.send(self.xmlstream)
+
+
+    def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None,
+              subscriptionIdentifier=None, sender=None):
+        """
+        Retrieve previously published items from a publish subscribe node.
+
+        @param service: The publish subscribe service that keeps the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+
+        @param nodeIdentifier: The identifier of the node.
+        @type nodeIdentifier: C{unicode}
+
+        @param maxItems: Optional limit on the number of retrieved items.
+        @type maxItems: C{int}
+
+        @param itemIdentifiers: Identifiers of the items to be retrieved.
+        @type itemIdentifiers: C{set}
+
+        @param subscriptionIdentifier: Optional subscription identifier. In
+            case the node has been subscribed to multiple times, this narrows
+            the results to the specific subscription.
+        @type subscriptionIdentifier: C{unicode}
+        """
+        request = PubSubRequest('items')
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        if maxItems:
+            request.maxItems = str(int(maxItems))
+        request.subscriptionIdentifier = subscriptionIdentifier
+        request.sender = sender
+        request.itemIdentifiers = itemIdentifiers
+
+        def cb(iq):
+            items = []
+            for element in iq.pubsub.items.elements():
+                if element.uri == NS_PUBSUB and element.name == 'item':
+                    items.append(element)
+            return items
+
+        d = request.send(self.xmlstream)
+        d.addCallback(cb)
+        return d
+
+    def retractItems(self, service, nodeIdentifier, itemIdentifiers, sender=None):
+        """
+        Retract items from a publish subscribe node.
+
+        @param service: The publish subscribe service to delete the node from.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: The identifier of the node.
+        @type nodeIdentifier: C{unicode}
+        @param itemIdentifiers: Identifiers of the items to be retracted.
+        @type itemIdentifiers: C{set}
+        """
+        request = PubSubRequest('retract')
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        request.itemIdentifiers = itemIdentifiers
+        request.sender = sender
+        return request.send(self.xmlstream)
+
+    def getOptions(self, service, nodeIdentifier, subscriber,
+                         subscriptionIdentifier=None, sender=None):
+        """
+        Get subscription options.
+
+        @param service: The publish subscribe service that keeps the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+
+        @param nodeIdentifier: The identifier of the node.
+        @type nodeIdentifier: C{unicode}
+
+        @param subscriber: The entity subscribed to the node.
+        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
+
+        @param subscriptionIdentifier: Optional subscription identifier.
+        @type subscriptionIdentifier: C{unicode}
+
+        @rtype: L{data_form.Form}
+        """
+        request = PubSubRequest('optionsGet')
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        request.subscriber = subscriber
+        request.subscriptionIdentifier = subscriptionIdentifier
+        request.sender = sender
+
+        def cb(iq):
+            form = data_form.findForm(iq.pubsub.options,
+                                      NS_PUBSUB_SUBSCRIBE_OPTIONS)
+            form.typeCheck()
+            return form
+
+        d = request.send(self.xmlstream)
+        d.addCallback(cb)
+        return d
+
+
+    def setOptions(self, service, nodeIdentifier, subscriber,
+                         options, subscriptionIdentifier=None, sender=None):
+        """
+        Set subscription options.
+
+        @param service: The publish subscribe service that keeps the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+
+        @param nodeIdentifier: The identifier of the node.
+        @type nodeIdentifier: C{unicode}
+
+        @param subscriber: The entity subscribed to the node.
+        @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>}
+
+        @param options: Subscription options.
+        @type options: C{dict}.
+
+        @param subscriptionIdentifier: Optional subscription identifier.
+        @type subscriptionIdentifier: C{unicode}
+        """
+        request = PubSubRequest('optionsSet')
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        request.subscriber = subscriber
+        request.subscriptionIdentifier = subscriptionIdentifier
+        request.sender = sender
+
+        form = data_form.Form(formType='submit',
+                              formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS)
+        form.makeFields(options)
+        request.options = form
+
+        d = request.send(self.xmlstream)
+        return d
+
+
+
+class PubSubService(XMPPHandler, IQHandlerMixin):
+    """
+    Protocol implementation for a XMPP Publish Subscribe Service.
+
+    The word Service here is used as taken from the Publish Subscribe
+    specification. It is the party responsible for keeping nodes and their
+    subscriptions, and sending out notifications.
+
+    Methods from the L{IPubSubService} interface that are called as a result
+    of an XMPP request may raise exceptions. Alternatively the deferred
+    returned by these methods may have their errback called. These are handled
+    as follows:
+
+     - If the exception is an instance of L{error.StanzaError}, an error
+       response iq is returned.
+     - Any other exception is reported using L{log.msg}. An error response
+       with the condition C{internal-server-error} is returned.
+
+    The default implementation of said methods raises an L{Unsupported}
+    exception and are meant to be overridden.
+
+    @ivar discoIdentity: Service discovery identity as a dictionary with
+                         keys C{'category'}, C{'type'} and C{'name'}.
+    @ivar pubSubFeatures: List of supported publish-subscribe features for
+                          service discovery, as C{str}.
+    @type pubSubFeatures: C{list} or C{None}
+    """
+
+    implements(IPubSubService, disco.IDisco)
+
+    iqHandlers = {
+            '/*': '_onPubSubRequest',
+            }
+
+    _legacyHandlers = {
+        'publish': ('publish', ['sender', 'recipient',
+                                'nodeIdentifier', 'items']),
+        'subscribe': ('subscribe', ['sender', 'recipient',
+                                    'nodeIdentifier', 'subscriber']),
+        'unsubscribe': ('unsubscribe', ['sender', 'recipient',
+                                        'nodeIdentifier', 'subscriber']),
+        'subscriptions': ('subscriptions', ['sender', 'recipient']),
+        'affiliations': ('affiliations', ['sender', 'recipient']),
+        'create': ('create', ['sender', 'recipient', 'nodeIdentifier']),
+        'getConfigurationOptions': ('getConfigurationOptions', []),
+        'default': ('getDefaultConfiguration',
+                    ['sender', 'recipient', 'nodeType']),
+        'configureGet': ('getConfiguration', ['sender', 'recipient',
+                                              'nodeIdentifier']),
+        'configureSet': ('setConfiguration', ['sender', 'recipient',
+                                              'nodeIdentifier', 'options']),
+        'items': ('items', ['sender', 'recipient', 'nodeIdentifier',
+                            'maxItems', 'itemIdentifiers']),
+        'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier',
+                                'itemIdentifiers']),
+        'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']),
+        'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']),
+    }
+
+    _request_class = PubSubRequest
+
+    hideNodes = False
+
+    def __init__(self, resource=None):
+        self.resource = resource
+        self.discoIdentity = {'category': 'pubsub',
+                              'type': 'service',
+                              'name': 'Generic Publish-Subscribe Service'}
+
+        self.pubSubFeatures = []
+
+
+    def connectionMade(self):
+        self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest)
+
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
+        def toInfo(nodeInfo):
+            if not nodeInfo:
+                return
+
+            (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data']
+            info.append(disco.DiscoIdentity('pubsub', nodeType))
+            if metaData:
+                form = data_form.Form(formType="result",
+                                      formNamespace=NS_PUBSUB_META_DATA)
+                form.addField(
+                        data_form.Field(
+                            var='pubsub#node_type',
+                            value=nodeType,
+                            label='The type of node (collection or leaf)'
+                        )
+                )
+
+                for metaDatum in metaData:
+                    form.addField(data_form.Field.fromDict(metaDatum))
+
+                info.append(form)
+
+            return
+
+        info = []
+
+        request = PubSubRequest('discoInfo')
+
+        if self.resource is not None:
+            resource = self.resource.locateResource(request)
+            identity = resource.discoIdentity
+            features = resource.features
+            getInfo = resource.getInfo
+        else:
+            category = self.discoIdentity['category']
+            idType = self.discoIdentity['type']
+            name = self.discoIdentity['name']
+            identity = disco.DiscoIdentity(category, idType, name)
+            features = self.pubSubFeatures
+            getInfo = self.getNodeInfo
+
+        if not nodeIdentifier:
+            info.append(identity)
+            info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS))
+            info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature))
+                         for feature in features])
+
+        d = defer.maybeDeferred(getInfo, requestor, target, nodeIdentifier or '')
+        d.addCallback(toInfo)
+        d.addErrback(log.err)
+        d.addCallback(lambda _: info)
+        return d
+
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
+        if self.hideNodes:
+            d = defer.succeed([])
+        elif self.resource is not None:
+            request = PubSubRequest('discoInfo')
+            resource = self.resource.locateResource(request)
+            d = resource.getNodes(requestor, target, nodeIdentifier)
+        elif nodeIdentifier:
+            d = self.getNodes(requestor, target)
+        else:
+            d = defer.succeed([])
+
+        d.addCallback(lambda nodes: [disco.DiscoItem(target, node)
+                                     for node in nodes])
+        return d
+
+
+    def _onPubSubRequest(self, iq):
+        request = self._request_class.fromElement(iq)
+
+        if self.resource is not None:
+            resource = self.resource.locateResource(request)
+        else:
+            resource = self
+
+        # Preprocess the request, knowing the handling resource
+        try:
+            preProcessor = getattr(self, '_preProcess_%s' % request.verb)
+        except AttributeError:
+            pass
+        else:
+            request = preProcessor(resource, request)
+            if request is None:
+                return defer.succeed(None)
+
+        # Process the request itself, 
+        if resource is not self:
+            try:
+                handler = getattr(resource, request.verb)
+            except AttributeError:
+                text = "Request verb: %s" % request.verb
+                return defer.fail(Unsupported('', text))
+
+            d = handler(request)
+        else:
+            try:
+                handlerName, argNames = self._legacyHandlers[request.verb]
+            except KeyError:
+                text = "Request verb: %s" % request.verb
+                return defer.fail(Unsupported('', text))
+
+            handler = getattr(self, handlerName)
+            args = [getattr(request, arg) for arg in argNames]
+            d = handler(*args)
+
+        # If needed, translate the result into a response
+        try:
+            cb = getattr(self, '_toResponse_%s' % request.verb)
+        except AttributeError:
+            pass
+        else:
+            d.addCallback(cb, resource, request)
+
+        return d
+
+
+    def _toResponse_subscribe(self, result, resource, request):
+        response = domish.Element((NS_PUBSUB, "pubsub"))
+        response.addChild(result.toElement(NS_PUBSUB))
+        return response
+
+
+    def _toResponse_subscriptions(self, result, resource, request):
+        response = domish.Element((NS_PUBSUB, 'pubsub'))
+        subscriptions = response.addElement('subscriptions')
+        for subscription in result:
+            subscriptions.addChild(subscription.toElement(NS_PUBSUB))
+        return response
+
+
+    def _toResponse_affiliations(self, result, resource, request):
+        response = domish.Element((NS_PUBSUB, 'pubsub'))
+        affiliations = response.addElement('affiliations')
+
+        for nodeIdentifier, affiliation in result:
+            item = affiliations.addElement('affiliation')
+            item['node'] = nodeIdentifier
+            item['affiliation'] = affiliation
+
+        return response
+
+
+    def _toResponse_create(self, result, resource, request):
+        if not request.nodeIdentifier or request.nodeIdentifier != result:
+            response = domish.Element((NS_PUBSUB, 'pubsub'))
+            create = response.addElement('create')
+            create['node'] = result
+            return response
+        else:
+            return None
+
+
+    def _formFromConfiguration(self, resource, values):
+        fieldDefs = resource.getConfigurationOptions()
+        form = data_form.Form(formType="form",
+                              formNamespace=NS_PUBSUB_NODE_CONFIG)
+        form.makeFields(values, fieldDefs)
+        return form
+
+
+    def _checkConfiguration(self, resource, form):
+        fieldDefs = resource.getConfigurationOptions()
+        form.typeCheck(fieldDefs, filterUnknown=True)
+
+
+    def _preProcess_create(self, resource, request):
+        if request.options:
+            self._checkConfiguration(resource, request.options)
+        return request
+
+
+    def _preProcess_default(self, resource, request):
+        if request.nodeType not in ('leaf', 'collection'):
+            raise error.StanzaError('not-acceptable')
+        else:
+            return request
+
+
+    def _toResponse_default(self, options, resource, request):
+        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
+        default = response.addElement("default")
+        form = self._formFromConfiguration(resource, options)
+        default.addChild(form.toElement())
+        return response
+
+
+    def _toResponse_configureGet(self, options, resource, request):
+        response = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
+        configure = response.addElement("configure")
+        form = self._formFromConfiguration(resource, options)
+        configure.addChild(form.toElement())
+
+        if request.nodeIdentifier:
+            configure["node"] = request.nodeIdentifier
+
+        return response
+
+
+    def _preProcess_configureSet(self, resource, request):
+        if request.options.formType == 'cancel':
+            return None
+        else:
+            self._checkConfiguration(resource, request.options)
+            return request
+
+
+    def _toResponse_items(self, result, resource, request):
+        response = domish.Element((NS_PUBSUB, 'pubsub'))
+        items = response.addElement('items')
+        items["node"] = request.nodeIdentifier
+
+        for item in result:
+            if item.name == 'item':
+                item.uri = NS_PUBSUB
+                items.addChild(item)
+
+        return response
+
+
+    def _createNotification(self, eventType, service, nodeIdentifier,
+                                  subscriber, subscriptions=None):
+        headers = []
+
+        if subscriptions:
+            for subscription in subscriptions:
+                if nodeIdentifier != subscription.nodeIdentifier:
+                    headers.append(('Collection', subscription.nodeIdentifier))
+
+        message = domish.Element((None, "message"))
+        message["from"] = service.full()
+        message["to"] = subscriber.full()
+        event = message.addElement((NS_PUBSUB_EVENT, "event"))
+
+        element = event.addElement(eventType)
+        element["node"] = nodeIdentifier
+
+        if headers:
+            message.addChild(shim.Headers(headers))
+
+        return message
+
+
+    def _toResponse_affiliationsGet(self, result, resource, request):
+        response = domish.Element((NS_PUBSUB_OWNER, 'pubsub'))
+        affiliations = response.addElement('affiliations')
+
+        if request.nodeIdentifier:
+            affiliations['node'] = request.nodeIdentifier
+
+        for entity, affiliation in result.iteritems():
+            item = affiliations.addElement('affiliation')
+            item['jid'] = entity.full()
+            item['affiliation'] = affiliation
+
+        return response
+
+
+    # public methods
+
+    def notifyPublish(self, service, nodeIdentifier, notifications):
+        for subscriber, subscriptions, items in notifications:
+            message = self._createNotification('items', service,
+                                               nodeIdentifier, subscriber,
+                                               subscriptions)
+            for item in items:
+                item.uri = NS_PUBSUB_EVENT
+                message.event.items.addChild(item)
+            self.send(message)
+
+
+    def notifyDelete(self, service, nodeIdentifier, subscribers,
+                           redirectURI=None):
+        for subscriber in subscribers:
+            message = self._createNotification('delete', service,
+                                               nodeIdentifier,
+                                               subscriber)
+            if redirectURI:
+                redirect = message.event.delete.addElement('redirect')
+                redirect['uri'] = redirectURI
+            self.send(message)
+
+
+    def getNodeInfo(self, requestor, service, nodeIdentifier):
+        return None
+
+
+    def getNodes(self, requestor, service):
+        return []
+
+
+    def publish(self, requestor, service, nodeIdentifier, items):
+        raise Unsupported('publish')
+
+
+    def subscribe(self, requestor, service, nodeIdentifier, subscriber):
+        raise Unsupported('subscribe')
+
+
+    def unsubscribe(self, requestor, service, nodeIdentifier, subscriber):
+        raise Unsupported('subscribe')
+
+
+    def subscriptions(self, requestor, service):
+        raise Unsupported('retrieve-subscriptions')
+
+
+    def affiliations(self, requestor, service):
+        raise Unsupported('retrieve-affiliations')
+
+
+    def create(self, requestor, service, nodeIdentifier):
+        raise Unsupported('create-nodes')
+
+
+    def getConfigurationOptions(self):
+        return {}
+
+
+    def getDefaultConfiguration(self, requestor, service, nodeType):
+        raise Unsupported('retrieve-default')
+
+
+    def getConfiguration(self, requestor, service, nodeIdentifier):
+        raise Unsupported('config-node')
+
+
+    def setConfiguration(self, requestor, service, nodeIdentifier, options):
+        raise Unsupported('config-node')
+
+
+    def items(self, requestor, service, nodeIdentifier, maxItems,
+                    itemIdentifiers):
+        raise Unsupported('retrieve-items')
+
+
+    def retract(self, requestor, service, nodeIdentifier, itemIdentifiers):
+        raise Unsupported('retract-items')
+
+
+    def purge(self, requestor, service, nodeIdentifier):
+        raise Unsupported('purge-nodes')
+
+
+    def delete(self, requestor, service, nodeIdentifier):
+        raise Unsupported('delete-nodes')
+
+
+
+class PubSubResource(object):
+
+    implements(IPubSubResource)
+
+    features = []
+    discoIdentity = disco.DiscoIdentity('pubsub',
+                                        'service',
+                                        'Publish-Subscribe Service')
+
+
+    def locateResource(self, request):
+        return self
+
+
+    def getInfo(self, requestor, service, nodeIdentifier):
+        return defer.succeed(None)
+
+
+    def getNodes(self, requestor, service, nodeIdentifier):
+        return defer.succeed([])
+
+
+    def getConfigurationOptions(self):
+        return {}
+
+
+    def publish(self, request):
+        return defer.fail(Unsupported('publish'))
+
+
+    def subscribe(self, request):
+        return defer.fail(Unsupported('subscribe'))
+
+
+    def unsubscribe(self, request):
+        return defer.fail(Unsupported('subscribe'))
+
+
+    def subscriptions(self, request):
+        return defer.fail(Unsupported('retrieve-subscriptions'))
+
+
+    def affiliations(self, request):
+        return defer.fail(Unsupported('retrieve-affiliations'))
+
+
+    def create(self, request):
+        return defer.fail(Unsupported('create-nodes'))
+
+
+    def default(self, request):
+        return defer.fail(Unsupported('retrieve-default'))
+
+
+    def configureGet(self, request):
+        return defer.fail(Unsupported('config-node'))
+
+
+    def configureSet(self, request):
+        return defer.fail(Unsupported('config-node'))
+
+
+    def items(self, request):
+        return defer.fail(Unsupported('retrieve-items'))
+
+
+    def retract(self, request):
+        return defer.fail(Unsupported('retract-items'))
+
+
+    def purge(self, request):
+        return defer.fail(Unsupported('purge-nodes'))
+
+
+    def delete(self, request):
+        return defer.fail(Unsupported('delete-nodes'))
+
+
+    def affiliationsGet(self, request):
+        return defer.fail(Unsupported('modify-affiliations'))
+
+
+    def affiliationsSet(self, request):
+        return defer.fail(Unsupported('modify-affiliations'))
+
+
+    def subscriptionsGet(self, request):
+        return defer.fail(Unsupported('manage-subscriptions'))
+
+
+    def subscriptionsSet(self, request):
+        return defer.fail(Unsupported('manage-subscriptions'))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/wokkel/rsm.py	Mon Dec 15 12:46:58 2014 +0100
@@ -0,0 +1,349 @@
+# -*- test-case-name: wokkel.test.test_rsm -*-
+#
+# Copyright (c) Adrien Cossa.
+# See LICENSE for details.
+
+"""
+XMPP Result Set Management protocol.
+
+This protocol is specified in
+U{XEP-0059<http://xmpp.org/extensions/xep-0059.html>}.
+"""
+
+from twisted.python import log
+from twisted.words.xish import domish
+
+import pubsub
+import copy
+
+
+# RSM namespace
+NS_RSM = 'http://jabber.org/protocol/rsm'
+
+
+class RSMRequest():
+    """
+    A Result Set Management request.
+
+    @ivar max: limit on the number of retrieved items.
+    @itype max: C{int} or C{unicode}
+
+    @ivar index: starting index of the requested page.
+    @itype index: C{int} or C{unicode}
+
+    @ivar after: ID of the element immediately preceding the page.
+    @itype after: C{unicode}
+
+    @ivar before: ID of the element immediately following the page.
+    @itype before: C{unicode}
+    """
+
+    max = 10
+    index = None
+    after = None
+    before = None
+
+    def __init__(self, max=None, index=None, after=None, before=None):
+        if max is not None:
+            max = int(max)
+            assert(max >= 0)
+            self.max = max
+
+        if index is not None:
+            assert(after is None and before is None)
+            index = int(index)
+            assert(index >= 0)
+            self.index = index
+
+        if after is not None:
+            assert(before is None)
+            assert(isinstance(after, unicode))
+            self.after = after
+
+        if before is not None:
+            assert(isinstance(before, unicode))
+            self.before = before
+
+    @classmethod
+    def parse(cls, element):
+        """Parse the given request element.
+
+        @param element: request containing a set element.
+        @type element: L{domish.Element}
+
+        @return: RSMRequest instance.
+        @rtype: L{RSMRequest}
+        """
+        try:
+            set_elt = domish.generateElementsQNamed(element.elements(),
+                                                    name="set",
+                                                    uri=NS_RSM).next()
+        except StopIteration:
+            return None
+
+        request = RSMRequest()
+        for elt in list(set_elt.elements()):
+            if elt.name in ('before', 'after'):
+                setattr(request, elt.name, ''.join(elt.children))
+            elif elt.name in ('max', 'index'):
+                setattr(request, elt.name, int(''.join(elt.children)))
+
+        if request.max is None:
+            log.err("RSM request is missing its 'max' element!")
+
+        return request
+
+    def render(self, element=None):
+        """Render a RSM page request, eventually embed it in the given element.
+
+        @param element: request element.
+        @type element: L{domish.Element}
+
+        @return: RSM request element.
+        @rtype: L{domish.Element}
+        """
+        if element and element.name == 'pubsub' and hasattr(element, 'items'):
+            element.items.attributes['max_items'] = unicode(self.max)
+
+        set_elt = domish.Element((NS_RSM, 'set'))
+        set_elt.addElement('max').addContent(unicode(self.max))
+
+        if self.index is not None:
+            set_elt.addElement('index').addContent(unicode(self.index))
+
+        if self.before is not None:
+            if self.before == '':  # request the last page
+                set_elt.addElement('before')
+            else:
+                set_elt.addElement('before').addContent(self.before)
+
+        if self.after is not None:
+            set_elt.addElement('after').addContent(self.after)
+
+        if element:
+            element.addChild(set_elt)
+
+        return set_elt
+
+
+class RSMResponse():
+    """
+    A Result Set Management response.
+
+    @ivar count: total number of items.
+    @itype count: C{int}
+
+    @ivar index: starting index of the returned page.
+    @itype index: C{int}
+
+    @ivar first: ID of the first element of the returned page.
+    @itype first: C{unicode}
+
+    @ivar last: ID of the last element of the returned page.
+    @itype last: C{unicode}
+    """
+
+    count = 0
+    index = None
+    first = None
+    last = None
+
+    def __init__(self, count=None, index=None, first=None, last=None):
+        if count is not None:
+            assert(isinstance(count, int) and count >= 0)
+            self.count = count
+
+        if index is not None:
+            assert(isinstance(index, int) and index >= 0)
+            self.index = index
+            assert(isinstance(first, unicode))
+            self.first = first
+            assert(isinstance(last, unicode))
+            self.last = last
+        else:
+            assert(first is None and last is None)
+
+    @classmethod
+    def parse(cls, element):
+        """Parse the given response element.
+
+        @param element: response element.
+        @type element: L{domish.Element}
+
+        @return: RSMResponse instance.
+        @rtype: L{RSMResponse}
+        """
+        try:
+            set_elt = domish.generateElementsQNamed(element.elements(),
+                                                    name="set",
+                                                    uri=NS_RSM).next()
+        except StopIteration:
+            return None
+
+        response = RSMResponse()
+        for elt in list(set_elt.elements()):
+            if elt.name in ('first', 'last'):
+                setattr(response, elt.name, ''.join(elt.children))
+                if elt.name == 'first':
+                    response.index = int(elt.getAttribute("index"))
+            elif elt.name == 'count':
+                response.count = int(''.join(elt.children))
+
+        if response.count is None:
+            log.err("RSM response is missing its 'count' element!")
+
+        return response
+
+    def render(self, parent=None):
+        """Render a RSM page response, eventually embed it in the given element.
+
+        @param element: response element.
+        @type element:  L{domish.Element}
+
+        @return: RSM request element.
+        @rtype: L{domish.Element}
+        """
+        set_elt = domish.Element((NS_RSM, 'set'))
+        set_elt.addElement('count').addContent(unicode(self.count))
+
+        if self.index is not None:
+            first_elt = set_elt.addElement('first')
+            first_elt.addContent(self.first)
+            first_elt['index'] = unicode(self.index)
+
+            set_elt.addElement('last').addContent(self.last)
+
+        if parent:
+            parent.addChild(set_elt)
+
+        return set_elt
+
+    def toDict(self):
+        """Return a dict representation of the object.
+
+        @return: a dict of strings.
+        @rtype: C{dict} binding C{unicode} to C{unicode}
+        """
+        result = {}
+        for attr in ('count', 'index', 'first', 'last'):
+            value = getattr(self, attr)
+            if value is not None:
+                result[attr] = unicode(value)
+        return result
+
+
+class PubSubRequest(pubsub.PubSubRequest):
+    """PubSubRequest extension to handle RSM.
+
+    @ivar rsm: RSM request instance.
+    @type rsm: L{RSMRequest}
+    """
+
+    rsm = None
+
+    def __init__(self, verb=None):
+        pubsub.PubSubRequest.__init__(self, verb)
+        self._parameters = copy.deepcopy(pubsub.PubSubRequest._parameters)
+        self._parameters['items'].append('rsm')
+
+    def _parse_rsm(self, verbElement):
+        self.rsm = RSMRequest.parse(verbElement.parent)
+
+    def _render_rsm(self, verbElement):
+        if self.rsm:
+            self.rsm.render(verbElement.parent)
+
+
+class PubSubClient(pubsub.PubSubClient):
+    """PubSubClient extension to handle RSM."""
+
+    _rsm_responses = {}
+
+    def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None,
+              subscriptionIdentifier=None, sender=None, ext_data=None):
+        """
+        Retrieve previously published items from a publish subscribe node.
+
+        @param service: The publish subscribe service that keeps the node.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+
+        @param nodeIdentifier: The identifier of the node.
+        @type nodeIdentifier: C{unicode}
+
+        @param maxItems: Optional limit on the number of retrieved items.
+        @type maxItems: C{int}
+
+        @param itemIdentifiers: Identifiers of the items to be retrieved.
+        @type itemIdentifiers: C{set}
+
+        @param subscriptionIdentifier: Optional subscription identifier. In
+            case the node has been subscribed to multiple times, this narrows
+            the results to the specific subscription.
+        @type subscriptionIdentifier: C{unicode}
+
+        @param ext_data: extension data.
+        @type ext_data: L{dict}
+
+        @return: a Deferred that fires a C{list} of L{domish.Element}.
+        @rtype: L{defer.Deferred}
+        """
+        request = PubSubRequest('items')  # that's a rsm.PubSubRequest instance
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        if maxItems:
+            request.maxItems = str(int(maxItems))
+        request.subscriptionIdentifier = subscriptionIdentifier
+        request.sender = sender
+        request.itemIdentifiers = itemIdentifiers
+        if ext_data and 'rsm' in ext_data:
+            request.rsm = ext_data['rsm']
+
+        def cb(iq):
+            items = []
+            if iq.pubsub.items:
+                for element in iq.pubsub.items.elements():
+                    if element.uri == pubsub.NS_PUBSUB and element.name == 'item':
+                        items.append(element)
+
+            if request.rsm:
+                response = RSMResponse.parse(iq.pubsub)
+                if response is not None:
+                    self._rsm_responses[ext_data['id']] = response
+            return items
+
+        d = request.send(self.xmlstream)
+        d.addCallback(cb)
+        return d
+
+    def getRSMResponse(self, id):
+        """
+        Post-retrieve the RSM response data after items retrieval is done.
+
+        @param id: extension data ID
+        @type id: C{unicode}
+
+        @return: dict representation of the RSM response.
+        @rtype: C{dict} of C{unicode}
+        """
+        # This method exists to not modify the return value of self.items.
+        if id not in self._rsm_responses:
+            return {}
+        result = self._rsm_responses[id].toDict()
+        del self._rsm_responses[id]
+        return result
+
+
+class PubSubService(pubsub.PubSubService):
+    """PubSubService extension to handle RSM."""
+
+    _request_class = PubSubRequest
+
+    def _toResponse_items(self, result, resource, request):
+        response = pubsub.PubSubService._toResponse_items(self, result,
+                                                          resource, request)
+        set_elts = [elt for elt in result if elt.name == 'set']
+        if set_elts:
+            assert(len(set_elts) == 1)
+            response.addChild(set_elts[0])
+
+        return response