Mercurial > libervia-backend
diff src/tmp/wokkel/pubsub.py @ 1266:9141bde7ff31
use sat.tmp.wokkel as a buffer module until the changes are integrated to wokkel
author | souliane <souliane@mailoo.org> |
---|---|
date | Mon, 15 Dec 2014 12:46:58 +0100 (2014-12-15) |
parents | |
children | be1fccf4854d |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/tmp/wokkel/pubsub.py Mon Dec 15 12:46:58 2014 +0100 @@ -0,0 +1,1568 @@ +# -*- test-case-name: wokkel.test.test_pubsub -*- +# +# Copyright (c) Ralph Meijer. +# See LICENSE for details. + +""" +XMPP publish-subscribe protocol. + +This protocol is specified in +U{XEP-0060<http://xmpp.org/extensions/xep-0060.html>}. +""" + +from zope.interface import implements + +from twisted.internet import defer +from twisted.python import log +from twisted.words.protocols.jabber import jid, error +from twisted.words.xish import domish + +from wokkel import disco, data_form, generic, shim +from wokkel.compat import IQ +from wokkel.subprotocols import IQHandlerMixin, XMPPHandler +from wokkel.iwokkel import IPubSubClient, IPubSubService, IPubSubResource + +# Iq get and set XPath queries +IQ_GET = '/iq[@type="get"]' +IQ_SET = '/iq[@type="set"]' + +# Publish-subscribe namespaces +NS_PUBSUB = 'http://jabber.org/protocol/pubsub' +NS_PUBSUB_EVENT = NS_PUBSUB + '#event' +NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors' +NS_PUBSUB_OWNER = NS_PUBSUB + "#owner" +NS_PUBSUB_NODE_CONFIG = NS_PUBSUB + "#node_config" +NS_PUBSUB_META_DATA = NS_PUBSUB + "#meta-data" +NS_PUBSUB_SUBSCRIBE_OPTIONS = NS_PUBSUB + "#subscribe_options" + +# XPath to match pubsub requests +PUBSUB_REQUEST = '/iq[@type="get" or @type="set"]/' + \ + 'pubsub[@xmlns="' + NS_PUBSUB + '" or ' + \ + '@xmlns="' + NS_PUBSUB_OWNER + '"]' + +class SubscriptionPending(Exception): + """ + Raised when the requested subscription is pending acceptance. + """ + + + +class SubscriptionUnconfigured(Exception): + """ + Raised when the requested subscription needs to be configured before + becoming active. + """ + + + +class PubSubError(error.StanzaError): + """ + Exception with publish-subscribe specific condition. + """ + def __init__(self, condition, pubsubCondition, feature=None, text=None): + appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition)) + if feature: + appCondition['feature'] = feature + error.StanzaError.__init__(self, condition, + text=text, + appCondition=appCondition) + + + +class BadRequest(error.StanzaError): + """ + Bad request stanza error. + """ + def __init__(self, pubsubCondition=None, text=None): + if pubsubCondition: + appCondition = domish.Element((NS_PUBSUB_ERRORS, pubsubCondition)) + else: + appCondition = None + error.StanzaError.__init__(self, 'bad-request', + text=text, + appCondition=appCondition) + + + +class Unsupported(PubSubError): + def __init__(self, feature, text=None): + self.feature = feature + PubSubError.__init__(self, 'feature-not-implemented', + 'unsupported', + feature, + text) + + def __str__(self): + message = PubSubError.__str__(self) + message += ', feature %r' % self.feature + return message + + +class Subscription(object): + """ + A subscription to a node. + + @ivar nodeIdentifier: The identifier of the node subscribed to. The root + node is denoted by C{None}. + @type nodeIdentifier: C{unicode} + + @ivar subscriber: The subscribing entity. + @type subscriber: L{jid.JID} + + @ivar state: The subscription state. One of C{'subscribed'}, C{'pending'}, + C{'unconfigured'}. + @type state: C{unicode} + + @ivar options: Optional list of subscription options. + @type options: C{dict} + + @ivar subscriptionIdentifier: Optional subscription identifier. + @type subscriptionIdentifier: C{unicode} + """ + + def __init__(self, nodeIdentifier, subscriber, state, options=None, + subscriptionIdentifier=None): + self.nodeIdentifier = nodeIdentifier + self.subscriber = subscriber + self.state = state + self.options = options or {} + self.subscriptionIdentifier = subscriptionIdentifier + + + @staticmethod + def fromElement(element): + return Subscription( + element.getAttribute('node'), + jid.JID(element.getAttribute('jid')), + element.getAttribute('subscription'), + subscriptionIdentifier=element.getAttribute('subid')) + + + def toElement(self, defaultUri=None): + """ + Return the DOM representation of this subscription. + + @rtype: L{domish.Element} + """ + element = domish.Element((defaultUri, 'subscription')) + if self.nodeIdentifier: + element['node'] = self.nodeIdentifier + element['jid'] = unicode(self.subscriber) + element['subscription'] = self.state + if self.subscriptionIdentifier: + element['subid'] = self.subscriptionIdentifier + return element + + + +class Item(domish.Element): + """ + Publish subscribe item. + + This behaves like an object providing L{domish.IElement}. + + Item payload can be added using C{addChild} or C{addRawXml}, or using the + C{payload} keyword argument to C{__init__}. + """ + + def __init__(self, id=None, payload=None): + """ + @param id: optional item identifier + @type id: C{unicode} + @param payload: optional item payload. Either as a domish element, or + as serialized XML. + @type payload: object providing L{domish.IElement} or C{unicode}. + """ + + domish.Element.__init__(self, (None, 'item')) + if id is not None: + self['id'] = id + if payload is not None: + if isinstance(payload, basestring): + self.addRawXml(payload) + else: + self.addChild(payload) + + + +class PubSubRequest(generic.Stanza): + """ + A publish-subscribe request. + + The set of instance variables used depends on the type of request. If + a variable is not applicable or not passed in the request, its value is + C{None}. + + @ivar verb: The type of publish-subscribe request. See C{_requestVerbMap}. + @type verb: C{str}. + + @ivar affiliations: Affiliations to be modified. + @type affiliations: C{set} + + @ivar items: The items to be published, as L{domish.Element}s. + @type items: C{list} + + @ivar itemIdentifiers: Identifiers of the items to be retrieved or + retracted. + @type itemIdentifiers: C{set} + + @ivar maxItems: Maximum number of items to retrieve. + @type maxItems: C{int}. + + @ivar nodeIdentifier: Identifier of the node the request is about. + @type nodeIdentifier: C{unicode} + + @ivar nodeType: The type of node that should be created, or for which the + configuration is retrieved. C{'leaf'} or C{'collection'}. + @type nodeType: C{str} + + @ivar options: Configurations options for nodes, subscriptions and publish + requests. + @type options: L{data_form.Form} + + @ivar subscriber: The subscribing entity. + @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>} + + @ivar subscriptionIdentifier: Identifier for a specific subscription. + @type subscriptionIdentifier: C{unicode} + + @ivar subscriptions: Subscriptions to be modified, as a set of + L{Subscription}. + @type subscriptions: C{set} + + @ivar affiliations: Affiliations to be modified, as a dictionary of entity + (L{JID<twisted.words.protocols.jabber.jid.JID>} to affiliation + (C{unicode}). + @type affiliations: C{dict} + """ + + verb = None + + affiliations = None + items = None + itemIdentifiers = None + maxItems = None + nodeIdentifier = None + nodeType = None + options = None + subscriber = None + subscriptionIdentifier = None + subscriptions = None + affiliations = None + + # Map request iq type and subelement name to request verb + _requestVerbMap = { + ('set', NS_PUBSUB, 'publish'): 'publish', + ('set', NS_PUBSUB, 'subscribe'): 'subscribe', + ('set', NS_PUBSUB, 'unsubscribe'): 'unsubscribe', + ('get', NS_PUBSUB, 'options'): 'optionsGet', + ('set', NS_PUBSUB, 'options'): 'optionsSet', + ('get', NS_PUBSUB, 'subscriptions'): 'subscriptions', + ('get', NS_PUBSUB, 'affiliations'): 'affiliations', + ('set', NS_PUBSUB, 'create'): 'create', + ('get', NS_PUBSUB_OWNER, 'default'): 'default', + ('get', NS_PUBSUB_OWNER, 'configure'): 'configureGet', + ('set', NS_PUBSUB_OWNER, 'configure'): 'configureSet', + ('get', NS_PUBSUB, 'items'): 'items', + ('set', NS_PUBSUB, 'retract'): 'retract', + ('set', NS_PUBSUB_OWNER, 'purge'): 'purge', + ('set', NS_PUBSUB_OWNER, 'delete'): 'delete', + ('get', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsGet', + ('set', NS_PUBSUB_OWNER, 'affiliations'): 'affiliationsSet', + ('get', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsGet', + ('set', NS_PUBSUB_OWNER, 'subscriptions'): 'subscriptionsSet', + } + + # Map request verb to request iq type and subelement name + _verbRequestMap = dict(((v, k) for k, v in _requestVerbMap.iteritems())) + + # Map request verb to parameter handler names + _parameters = { + 'publish': ['node', 'items'], + 'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'], + 'unsubscribe': ['nodeOrEmpty', 'jid', 'subidOrNone'], + 'optionsGet': ['nodeOrEmpty', 'jid', 'subidOrNone'], + 'optionsSet': ['nodeOrEmpty', 'jid', 'options', 'subidOrNone'], + 'subscriptions': [], + 'affiliations': [], + 'create': ['nodeOrNone', 'configureOrNone'], + 'default': ['default'], + 'configureGet': ['nodeOrEmpty'], + 'configureSet': ['nodeOrEmpty', 'configure'], + 'items': ['node', 'maxItems', 'itemIdentifiers', 'subidOrNone'], + 'retract': ['node', 'itemIdentifiers'], + 'purge': ['node'], + 'delete': ['node'], + 'affiliationsGet': ['nodeOrEmpty'], + 'affiliationsSet': ['nodeOrEmpty', 'affiliations'], + 'subscriptionsGet': ['nodeOrEmpty'], + 'subscriptionsSet': [], + } + + def __init__(self, verb=None): + self.verb = verb + + + def _parse_node(self, verbElement): + """ + Parse the required node identifier out of the verbElement. + """ + try: + self.nodeIdentifier = verbElement["node"] + except KeyError: + raise BadRequest('nodeid-required') + + + def _render_node(self, verbElement): + """ + Render the required node identifier on the verbElement. + """ + if not self.nodeIdentifier: + raise Exception("Node identifier is required") + + verbElement['node'] = self.nodeIdentifier + + + def _parse_nodeOrEmpty(self, verbElement): + """ + Parse the node identifier out of the verbElement. May be empty. + """ + self.nodeIdentifier = verbElement.getAttribute("node", '') + + + def _render_nodeOrEmpty(self, verbElement): + """ + Render the node identifier on the verbElement. May be empty. + """ + if self.nodeIdentifier: + verbElement['node'] = self.nodeIdentifier + + + def _parse_nodeOrNone(self, verbElement): + """ + Parse the optional node identifier out of the verbElement. + """ + self.nodeIdentifier = verbElement.getAttribute("node") + + + def _render_nodeOrNone(self, verbElement): + """ + Render the optional node identifier on the verbElement. + """ + if self.nodeIdentifier: + verbElement['node'] = self.nodeIdentifier + + + def _parse_items(self, verbElement): + """ + Parse items out of the verbElement for publish requests. + """ + self.items = [] + for element in verbElement.elements(): + if element.uri == NS_PUBSUB and element.name == 'item': + self.items.append(element) + + + def _render_items(self, verbElement): + """ + Render items into the verbElement for publish requests. + """ + if self.items: + for item in self.items: + item.uri = NS_PUBSUB + verbElement.addChild(item) + + + def _parse_jid(self, verbElement): + """ + Parse subscriber out of the verbElement for un-/subscribe requests. + """ + try: + self.subscriber = jid.internJID(verbElement["jid"]) + except KeyError: + raise BadRequest('jid-required') + + + def _render_jid(self, verbElement): + """ + Render subscriber into the verbElement for un-/subscribe requests. + """ + verbElement['jid'] = self.subscriber.full() + + + def _parse_default(self, verbElement): + """ + Parse node type out of a request for the default node configuration. + """ + form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG) + if form is not None and form.formType == 'submit': + values = form.getValues() + self.nodeType = values.get('pubsub#node_type', 'leaf') + else: + self.nodeType = 'leaf' + + + def _parse_configure(self, verbElement): + """ + Parse options out of a request for setting the node configuration. + """ + form = data_form.findForm(verbElement, NS_PUBSUB_NODE_CONFIG) + if form is not None: + if form.formType in ('submit', 'cancel'): + self.options = form + else: + raise BadRequest(text=u"Unexpected form type '%s'" % form.formType) + else: + raise BadRequest(text="Missing configuration form") + + + def _parse_configureOrNone(self, verbElement): + """ + Parse optional node configuration form in create request. + """ + for element in verbElement.parent.elements(): + if element.uri == NS_PUBSUB and element.name == 'configure': + form = data_form.findForm(element, NS_PUBSUB_NODE_CONFIG) + if form is not None: + if form.formType != 'submit': + raise BadRequest(text=u"Unexpected form type '%s'" % + form.formType) + else: + form = data_form.Form('submit', + formNamespace=NS_PUBSUB_NODE_CONFIG) + self.options = form + + + def _render_configureOrNone(self, verbElement): + """ + Render optional node configuration form in create request. + """ + if self.options is not None: + configure = verbElement.parent.addElement('configure') + configure.addChild(self.options.toElement()) + + + def _parse_itemIdentifiers(self, verbElement): + """ + Parse item identifiers out of items and retract requests. + """ + self.itemIdentifiers = [] + for element in verbElement.elements(): + if element.uri == NS_PUBSUB and element.name == 'item': + try: + self.itemIdentifiers.append(element["id"]) + except KeyError: + raise BadRequest() + + + def _render_itemIdentifiers(self, verbElement): + """ + Render item identifiers into items and retract requests. + """ + if self.itemIdentifiers: + for itemIdentifier in self.itemIdentifiers: + item = verbElement.addElement('item') + item['id'] = itemIdentifier + + + def _parse_maxItems(self, verbElement): + """ + Parse maximum items out of an items request. + """ + value = verbElement.getAttribute('max_items') + + if value: + try: + self.maxItems = int(value) + except ValueError: + raise BadRequest(text="Field max_items requires a positive " + + "integer value") + + + def _render_maxItems(self, verbElement): + """ + Render maximum items into an items request. + """ + if self.maxItems: + verbElement['max_items'] = unicode(self.maxItems) + + + def _parse_subidOrNone(self, verbElement): + """ + Parse subscription identifier out of a request. + """ + self.subscriptionIdentifier = verbElement.getAttribute("subid") + + + def _render_subidOrNone(self, verbElement): + """ + Render subscription identifier into a request. + """ + if self.subscriptionIdentifier: + verbElement['subid'] = self.subscriptionIdentifier + + + def _parse_options(self, verbElement): + """ + Parse options form out of a subscription options request. + """ + form = data_form.findForm(verbElement, NS_PUBSUB_SUBSCRIBE_OPTIONS) + if form is not None: + if form.formType in ('submit', 'cancel'): + self.options = form + else: + raise BadRequest(text=u"Unexpected form type '%s'" % form.formType) + else: + raise BadRequest(text="Missing options form") + + + + def _render_options(self, verbElement): + verbElement.addChild(self.options.toElement()) + + + def _parse_optionsWithSubscribe(self, verbElement): + for element in verbElement.parent.elements(): + if element.name == 'options' and element.uri == NS_PUBSUB: + form = data_form.findForm(element, + NS_PUBSUB_SUBSCRIBE_OPTIONS) + if form is not None: + if form.formType != 'submit': + raise BadRequest(text=u"Unexpected form type '%s'" % + form.formType) + else: + form = data_form.Form('submit', + formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS) + self.options = form + + + def _render_optionsWithSubscribe(self, verbElement): + if self.options is not None: + optionsElement = verbElement.parent.addElement('options') + self._render_options(optionsElement) + + + def _parse_affiliations(self, verbElement): + self.affiliations = {} + for element in verbElement.elements(): + if (element.uri == NS_PUBSUB_OWNER and + element.name == 'affiliation'): + try: + entity = jid.internJID(element['jid']).userhostJID() + except KeyError: + raise BadRequest(text='Missing jid attribute') + + if entity in self.affiliations: + raise BadRequest(text='Multiple affiliations for an entity') + + try: + affiliation = element['affiliation'] + except KeyError: + raise BadRequest(text='Missing affiliation attribute') + + self.affiliations[entity] = affiliation + + + def parseElement(self, element): + """ + Parse the publish-subscribe verb and parameters out of a request. + """ + generic.Stanza.parseElement(self, element) + + verbs = [] + verbElements = [] + for child in element.pubsub.elements(): + key = (self.stanzaType, child.uri, child.name) + try: + verb = self._requestVerbMap[key] + except KeyError: + continue + + verbs.append(verb) + verbElements.append(child) + + if not verbs: + raise NotImplementedError() + + if len(verbs) > 1: + if 'optionsSet' in verbs and 'subscribe' in verbs: + self.verb = 'subscribe' + verbElement = verbElements[verbs.index('subscribe')] + else: + raise NotImplementedError() + else: + self.verb = verbs[0] + verbElement = verbElements[0] + + for parameter in self._parameters[self.verb]: + getattr(self, '_parse_%s' % parameter)(verbElement) + + + + def send(self, xs): + """ + Send this request to its recipient. + + This renders all of the relevant parameters for this specific + requests into an L{IQ}, and invoke its C{send} method. + This returns a deferred that fires upon reception of a response. See + L{IQ} for details. + + @param xs: The XML stream to send the request on. + @type xs: L{twisted.words.protocols.jabber.xmlstream.XmlStream} + @rtype: L{defer.Deferred}. + """ + + try: + (self.stanzaType, + childURI, + childName) = self._verbRequestMap[self.verb] + except KeyError: + raise NotImplementedError() + + iq = IQ(xs, self.stanzaType) + iq.addElement((childURI, 'pubsub')) + verbElement = iq.pubsub.addElement(childName) + + if self.sender: + iq['from'] = self.sender.full() + if self.recipient: + iq['to'] = self.recipient.full() + + for parameter in self._parameters[self.verb]: + getattr(self, '_render_%s' % parameter)(verbElement) + + return iq.send() + + + +class PubSubEvent(object): + """ + A publish subscribe event. + + @param sender: The entity from which the notification was received. + @type sender: L{jid.JID} + @param recipient: The entity to which the notification was sent. + @type recipient: L{wokkel.pubsub.ItemsEvent} + @param nodeIdentifier: Identifier of the node the event pertains to. + @type nodeIdentifier: C{unicode} + @param headers: SHIM headers, see L{wokkel.shim.extractHeaders}. + @type headers: C{dict} + """ + + def __init__(self, sender, recipient, nodeIdentifier, headers): + self.sender = sender + self.recipient = recipient + self.nodeIdentifier = nodeIdentifier + self.headers = headers + + + +class ItemsEvent(PubSubEvent): + """ + A publish-subscribe event that signifies new, updated and retracted items. + + @param items: List of received items as domish elements. + @type items: C{list} of L{domish.Element} + """ + + def __init__(self, sender, recipient, nodeIdentifier, items, headers): + PubSubEvent.__init__(self, sender, recipient, nodeIdentifier, headers) + self.items = items + + + +class DeleteEvent(PubSubEvent): + """ + A publish-subscribe event that signifies the deletion of a node. + """ + + redirectURI = None + + + +class PurgeEvent(PubSubEvent): + """ + A publish-subscribe event that signifies the purging of a node. + """ + + + +class PubSubClient(XMPPHandler): + """ + Publish subscribe client protocol. + """ + + implements(IPubSubClient) + + def connectionInitialized(self): + self.xmlstream.addObserver('/message/event[@xmlns="%s"]' % + NS_PUBSUB_EVENT, self._onEvent) + + + def _onEvent(self, message): + if message.getAttribute('type') == 'error': + return + + try: + sender = jid.JID(message["from"]) + recipient = jid.JID(message["to"]) + except KeyError: + return + + actionElement = None + for element in message.event.elements(): + if element.uri == NS_PUBSUB_EVENT: + actionElement = element + + if not actionElement: + return + + eventHandler = getattr(self, "_onEvent_%s" % actionElement.name, None) + + if eventHandler: + headers = shim.extractHeaders(message) + eventHandler(sender, recipient, actionElement, headers) + message.handled = True + + + def _onEvent_items(self, sender, recipient, action, headers): + nodeIdentifier = action["node"] + + items = [element for element in action.elements() + if element.name in ('item', 'retract')] + + event = ItemsEvent(sender, recipient, nodeIdentifier, items, headers) + self.itemsReceived(event) + + + def _onEvent_delete(self, sender, recipient, action, headers): + nodeIdentifier = action["node"] + event = DeleteEvent(sender, recipient, nodeIdentifier, headers) + if action.redirect: + event.redirectURI = action.redirect.getAttribute('uri') + self.deleteReceived(event) + + + def _onEvent_purge(self, sender, recipient, action, headers): + nodeIdentifier = action["node"] + event = PurgeEvent(sender, recipient, nodeIdentifier, headers) + self.purgeReceived(event) + + + def itemsReceived(self, event): + pass + + + def deleteReceived(self, event): + pass + + + def purgeReceived(self, event): + pass + + + def createNode(self, service, nodeIdentifier=None, options=None, + sender=None): + """ + Create a publish subscribe node. + + @param service: The publish subscribe service to create the node at. + @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} + @param nodeIdentifier: Optional suggestion for the id of the node. + @type nodeIdentifier: C{unicode} + @param options: Optional node configuration options. + @type options: C{dict} + """ + request = PubSubRequest('create') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.sender = sender + + if options: + form = data_form.Form(formType='submit', + formNamespace=NS_PUBSUB_NODE_CONFIG) + form.makeFields(options) + request.options = form + + def cb(iq): + try: + new_node = iq.pubsub.create["node"] + except AttributeError: + # the suggested node identifier was accepted + new_node = nodeIdentifier + return new_node + + d = request.send(self.xmlstream) + d.addCallback(cb) + return d + + + def deleteNode(self, service, nodeIdentifier, sender=None): + """ + Delete a publish subscribe node. + + @param service: The publish subscribe service to delete the node from. + @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + """ + request = PubSubRequest('delete') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.sender = sender + return request.send(self.xmlstream) + + + def subscribe(self, service, nodeIdentifier, subscriber, + options=None, sender=None): + """ + Subscribe to a publish subscribe node. + + @param service: The publish subscribe service that keeps the node. + @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} + + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + + @param subscriber: The entity to subscribe to the node. This entity + will get notifications of new published items. + @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>} + + @param options: Subscription options. + @type options: C{dict} + + @return: Deferred that fires with L{Subscription} or errbacks with + L{SubscriptionPending} or L{SubscriptionUnconfigured}. + @rtype: L{defer.Deferred} + """ + request = PubSubRequest('subscribe') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.subscriber = subscriber + request.sender = sender + + if options: + form = data_form.Form(formType='submit', + formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS) + form.makeFields(options) + request.options = form + + def cb(iq): + subscription = Subscription.fromElement(iq.pubsub.subscription) + + if subscription.state == 'pending': + raise SubscriptionPending() + elif subscription.state == 'unconfigured': + raise SubscriptionUnconfigured() + else: + # we assume subscription == 'subscribed' + # any other value would be invalid, but that should have + # yielded a stanza error. + return subscription + + d = request.send(self.xmlstream) + d.addCallback(cb) + return d + + + def unsubscribe(self, service, nodeIdentifier, subscriber, + subscriptionIdentifier=None, sender=None): + """ + Unsubscribe from a publish subscribe node. + + @param service: The publish subscribe service that keeps the node. + @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} + + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + + @param subscriber: The entity to unsubscribe from the node. + @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>} + + @param subscriptionIdentifier: Optional subscription identifier. + @type subscriptionIdentifier: C{unicode} + """ + request = PubSubRequest('unsubscribe') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.subscriber = subscriber + request.subscriptionIdentifier = subscriptionIdentifier + request.sender = sender + return request.send(self.xmlstream) + + + def publish(self, service, nodeIdentifier, items=None, sender=None): + """ + Publish to a publish subscribe node. + + @param service: The publish subscribe service that keeps the node. + @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + @param items: Optional list of L{Item}s to publish. + @type items: C{list} + """ + request = PubSubRequest('publish') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.items = items + request.sender = sender + return request.send(self.xmlstream) + + + def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None, + subscriptionIdentifier=None, sender=None): + """ + Retrieve previously published items from a publish subscribe node. + + @param service: The publish subscribe service that keeps the node. + @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} + + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + + @param maxItems: Optional limit on the number of retrieved items. + @type maxItems: C{int} + + @param itemIdentifiers: Identifiers of the items to be retrieved. + @type itemIdentifiers: C{set} + + @param subscriptionIdentifier: Optional subscription identifier. In + case the node has been subscribed to multiple times, this narrows + the results to the specific subscription. + @type subscriptionIdentifier: C{unicode} + """ + request = PubSubRequest('items') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + if maxItems: + request.maxItems = str(int(maxItems)) + request.subscriptionIdentifier = subscriptionIdentifier + request.sender = sender + request.itemIdentifiers = itemIdentifiers + + def cb(iq): + items = [] + for element in iq.pubsub.items.elements(): + if element.uri == NS_PUBSUB and element.name == 'item': + items.append(element) + return items + + d = request.send(self.xmlstream) + d.addCallback(cb) + return d + + def retractItems(self, service, nodeIdentifier, itemIdentifiers, sender=None): + """ + Retract items from a publish subscribe node. + + @param service: The publish subscribe service to delete the node from. + @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + @param itemIdentifiers: Identifiers of the items to be retracted. + @type itemIdentifiers: C{set} + """ + request = PubSubRequest('retract') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.itemIdentifiers = itemIdentifiers + request.sender = sender + return request.send(self.xmlstream) + + def getOptions(self, service, nodeIdentifier, subscriber, + subscriptionIdentifier=None, sender=None): + """ + Get subscription options. + + @param service: The publish subscribe service that keeps the node. + @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} + + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + + @param subscriber: The entity subscribed to the node. + @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>} + + @param subscriptionIdentifier: Optional subscription identifier. + @type subscriptionIdentifier: C{unicode} + + @rtype: L{data_form.Form} + """ + request = PubSubRequest('optionsGet') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.subscriber = subscriber + request.subscriptionIdentifier = subscriptionIdentifier + request.sender = sender + + def cb(iq): + form = data_form.findForm(iq.pubsub.options, + NS_PUBSUB_SUBSCRIBE_OPTIONS) + form.typeCheck() + return form + + d = request.send(self.xmlstream) + d.addCallback(cb) + return d + + + def setOptions(self, service, nodeIdentifier, subscriber, + options, subscriptionIdentifier=None, sender=None): + """ + Set subscription options. + + @param service: The publish subscribe service that keeps the node. + @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} + + @param nodeIdentifier: The identifier of the node. + @type nodeIdentifier: C{unicode} + + @param subscriber: The entity subscribed to the node. + @type subscriber: L{JID<twisted.words.protocols.jabber.jid.JID>} + + @param options: Subscription options. + @type options: C{dict}. + + @param subscriptionIdentifier: Optional subscription identifier. + @type subscriptionIdentifier: C{unicode} + """ + request = PubSubRequest('optionsSet') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.subscriber = subscriber + request.subscriptionIdentifier = subscriptionIdentifier + request.sender = sender + + form = data_form.Form(formType='submit', + formNamespace=NS_PUBSUB_SUBSCRIBE_OPTIONS) + form.makeFields(options) + request.options = form + + d = request.send(self.xmlstream) + return d + + + +class PubSubService(XMPPHandler, IQHandlerMixin): + """ + Protocol implementation for a XMPP Publish Subscribe Service. + + The word Service here is used as taken from the Publish Subscribe + specification. It is the party responsible for keeping nodes and their + subscriptions, and sending out notifications. + + Methods from the L{IPubSubService} interface that are called as a result + of an XMPP request may raise exceptions. Alternatively the deferred + returned by these methods may have their errback called. These are handled + as follows: + + - If the exception is an instance of L{error.StanzaError}, an error + response iq is returned. + - Any other exception is reported using L{log.msg}. An error response + with the condition C{internal-server-error} is returned. + + The default implementation of said methods raises an L{Unsupported} + exception and are meant to be overridden. + + @ivar discoIdentity: Service discovery identity as a dictionary with + keys C{'category'}, C{'type'} and C{'name'}. + @ivar pubSubFeatures: List of supported publish-subscribe features for + service discovery, as C{str}. + @type pubSubFeatures: C{list} or C{None} + """ + + implements(IPubSubService, disco.IDisco) + + iqHandlers = { + '/*': '_onPubSubRequest', + } + + _legacyHandlers = { + 'publish': ('publish', ['sender', 'recipient', + 'nodeIdentifier', 'items']), + 'subscribe': ('subscribe', ['sender', 'recipient', + 'nodeIdentifier', 'subscriber']), + 'unsubscribe': ('unsubscribe', ['sender', 'recipient', + 'nodeIdentifier', 'subscriber']), + 'subscriptions': ('subscriptions', ['sender', 'recipient']), + 'affiliations': ('affiliations', ['sender', 'recipient']), + 'create': ('create', ['sender', 'recipient', 'nodeIdentifier']), + 'getConfigurationOptions': ('getConfigurationOptions', []), + 'default': ('getDefaultConfiguration', + ['sender', 'recipient', 'nodeType']), + 'configureGet': ('getConfiguration', ['sender', 'recipient', + 'nodeIdentifier']), + 'configureSet': ('setConfiguration', ['sender', 'recipient', + 'nodeIdentifier', 'options']), + 'items': ('items', ['sender', 'recipient', 'nodeIdentifier', + 'maxItems', 'itemIdentifiers']), + 'retract': ('retract', ['sender', 'recipient', 'nodeIdentifier', + 'itemIdentifiers']), + 'purge': ('purge', ['sender', 'recipient', 'nodeIdentifier']), + 'delete': ('delete', ['sender', 'recipient', 'nodeIdentifier']), + } + + _request_class = PubSubRequest + + hideNodes = False + + def __init__(self, resource=None): + self.resource = resource + self.discoIdentity = {'category': 'pubsub', + 'type': 'service', + 'name': 'Generic Publish-Subscribe Service'} + + self.pubSubFeatures = [] + + + def connectionMade(self): + self.xmlstream.addObserver(PUBSUB_REQUEST, self.handleRequest) + + + def getDiscoInfo(self, requestor, target, nodeIdentifier=''): + def toInfo(nodeInfo): + if not nodeInfo: + return + + (nodeType, metaData) = nodeInfo['type'], nodeInfo['meta-data'] + info.append(disco.DiscoIdentity('pubsub', nodeType)) + if metaData: + form = data_form.Form(formType="result", + formNamespace=NS_PUBSUB_META_DATA) + form.addField( + data_form.Field( + var='pubsub#node_type', + value=nodeType, + label='The type of node (collection or leaf)' + ) + ) + + for metaDatum in metaData: + form.addField(data_form.Field.fromDict(metaDatum)) + + info.append(form) + + return + + info = [] + + request = PubSubRequest('discoInfo') + + if self.resource is not None: + resource = self.resource.locateResource(request) + identity = resource.discoIdentity + features = resource.features + getInfo = resource.getInfo + else: + category = self.discoIdentity['category'] + idType = self.discoIdentity['type'] + name = self.discoIdentity['name'] + identity = disco.DiscoIdentity(category, idType, name) + features = self.pubSubFeatures + getInfo = self.getNodeInfo + + if not nodeIdentifier: + info.append(identity) + info.append(disco.DiscoFeature(disco.NS_DISCO_ITEMS)) + info.extend([disco.DiscoFeature("%s#%s" % (NS_PUBSUB, feature)) + for feature in features]) + + d = defer.maybeDeferred(getInfo, requestor, target, nodeIdentifier or '') + d.addCallback(toInfo) + d.addErrback(log.err) + d.addCallback(lambda _: info) + return d + + + def getDiscoItems(self, requestor, target, nodeIdentifier=''): + if self.hideNodes: + d = defer.succeed([]) + elif self.resource is not None: + request = PubSubRequest('discoInfo') + resource = self.resource.locateResource(request) + d = resource.getNodes(requestor, target, nodeIdentifier) + elif nodeIdentifier: + d = self.getNodes(requestor, target) + else: + d = defer.succeed([]) + + d.addCallback(lambda nodes: [disco.DiscoItem(target, node) + for node in nodes]) + return d + + + def _onPubSubRequest(self, iq): + request = self._request_class.fromElement(iq) + + if self.resource is not None: + resource = self.resource.locateResource(request) + else: + resource = self + + # Preprocess the request, knowing the handling resource + try: + preProcessor = getattr(self, '_preProcess_%s' % request.verb) + except AttributeError: + pass + else: + request = preProcessor(resource, request) + if request is None: + return defer.succeed(None) + + # Process the request itself, + if resource is not self: + try: + handler = getattr(resource, request.verb) + except AttributeError: + text = "Request verb: %s" % request.verb + return defer.fail(Unsupported('', text)) + + d = handler(request) + else: + try: + handlerName, argNames = self._legacyHandlers[request.verb] + except KeyError: + text = "Request verb: %s" % request.verb + return defer.fail(Unsupported('', text)) + + handler = getattr(self, handlerName) + args = [getattr(request, arg) for arg in argNames] + d = handler(*args) + + # If needed, translate the result into a response + try: + cb = getattr(self, '_toResponse_%s' % request.verb) + except AttributeError: + pass + else: + d.addCallback(cb, resource, request) + + return d + + + def _toResponse_subscribe(self, result, resource, request): + response = domish.Element((NS_PUBSUB, "pubsub")) + response.addChild(result.toElement(NS_PUBSUB)) + return response + + + def _toResponse_subscriptions(self, result, resource, request): + response = domish.Element((NS_PUBSUB, 'pubsub')) + subscriptions = response.addElement('subscriptions') + for subscription in result: + subscriptions.addChild(subscription.toElement(NS_PUBSUB)) + return response + + + def _toResponse_affiliations(self, result, resource, request): + response = domish.Element((NS_PUBSUB, 'pubsub')) + affiliations = response.addElement('affiliations') + + for nodeIdentifier, affiliation in result: + item = affiliations.addElement('affiliation') + item['node'] = nodeIdentifier + item['affiliation'] = affiliation + + return response + + + def _toResponse_create(self, result, resource, request): + if not request.nodeIdentifier or request.nodeIdentifier != result: + response = domish.Element((NS_PUBSUB, 'pubsub')) + create = response.addElement('create') + create['node'] = result + return response + else: + return None + + + def _formFromConfiguration(self, resource, values): + fieldDefs = resource.getConfigurationOptions() + form = data_form.Form(formType="form", + formNamespace=NS_PUBSUB_NODE_CONFIG) + form.makeFields(values, fieldDefs) + return form + + + def _checkConfiguration(self, resource, form): + fieldDefs = resource.getConfigurationOptions() + form.typeCheck(fieldDefs, filterUnknown=True) + + + def _preProcess_create(self, resource, request): + if request.options: + self._checkConfiguration(resource, request.options) + return request + + + def _preProcess_default(self, resource, request): + if request.nodeType not in ('leaf', 'collection'): + raise error.StanzaError('not-acceptable') + else: + return request + + + def _toResponse_default(self, options, resource, request): + response = domish.Element((NS_PUBSUB_OWNER, "pubsub")) + default = response.addElement("default") + form = self._formFromConfiguration(resource, options) + default.addChild(form.toElement()) + return response + + + def _toResponse_configureGet(self, options, resource, request): + response = domish.Element((NS_PUBSUB_OWNER, "pubsub")) + configure = response.addElement("configure") + form = self._formFromConfiguration(resource, options) + configure.addChild(form.toElement()) + + if request.nodeIdentifier: + configure["node"] = request.nodeIdentifier + + return response + + + def _preProcess_configureSet(self, resource, request): + if request.options.formType == 'cancel': + return None + else: + self._checkConfiguration(resource, request.options) + return request + + + def _toResponse_items(self, result, resource, request): + response = domish.Element((NS_PUBSUB, 'pubsub')) + items = response.addElement('items') + items["node"] = request.nodeIdentifier + + for item in result: + if item.name == 'item': + item.uri = NS_PUBSUB + items.addChild(item) + + return response + + + def _createNotification(self, eventType, service, nodeIdentifier, + subscriber, subscriptions=None): + headers = [] + + if subscriptions: + for subscription in subscriptions: + if nodeIdentifier != subscription.nodeIdentifier: + headers.append(('Collection', subscription.nodeIdentifier)) + + message = domish.Element((None, "message")) + message["from"] = service.full() + message["to"] = subscriber.full() + event = message.addElement((NS_PUBSUB_EVENT, "event")) + + element = event.addElement(eventType) + element["node"] = nodeIdentifier + + if headers: + message.addChild(shim.Headers(headers)) + + return message + + + def _toResponse_affiliationsGet(self, result, resource, request): + response = domish.Element((NS_PUBSUB_OWNER, 'pubsub')) + affiliations = response.addElement('affiliations') + + if request.nodeIdentifier: + affiliations['node'] = request.nodeIdentifier + + for entity, affiliation in result.iteritems(): + item = affiliations.addElement('affiliation') + item['jid'] = entity.full() + item['affiliation'] = affiliation + + return response + + + # public methods + + def notifyPublish(self, service, nodeIdentifier, notifications): + for subscriber, subscriptions, items in notifications: + message = self._createNotification('items', service, + nodeIdentifier, subscriber, + subscriptions) + for item in items: + item.uri = NS_PUBSUB_EVENT + message.event.items.addChild(item) + self.send(message) + + + def notifyDelete(self, service, nodeIdentifier, subscribers, + redirectURI=None): + for subscriber in subscribers: + message = self._createNotification('delete', service, + nodeIdentifier, + subscriber) + if redirectURI: + redirect = message.event.delete.addElement('redirect') + redirect['uri'] = redirectURI + self.send(message) + + + def getNodeInfo(self, requestor, service, nodeIdentifier): + return None + + + def getNodes(self, requestor, service): + return [] + + + def publish(self, requestor, service, nodeIdentifier, items): + raise Unsupported('publish') + + + def subscribe(self, requestor, service, nodeIdentifier, subscriber): + raise Unsupported('subscribe') + + + def unsubscribe(self, requestor, service, nodeIdentifier, subscriber): + raise Unsupported('subscribe') + + + def subscriptions(self, requestor, service): + raise Unsupported('retrieve-subscriptions') + + + def affiliations(self, requestor, service): + raise Unsupported('retrieve-affiliations') + + + def create(self, requestor, service, nodeIdentifier): + raise Unsupported('create-nodes') + + + def getConfigurationOptions(self): + return {} + + + def getDefaultConfiguration(self, requestor, service, nodeType): + raise Unsupported('retrieve-default') + + + def getConfiguration(self, requestor, service, nodeIdentifier): + raise Unsupported('config-node') + + + def setConfiguration(self, requestor, service, nodeIdentifier, options): + raise Unsupported('config-node') + + + def items(self, requestor, service, nodeIdentifier, maxItems, + itemIdentifiers): + raise Unsupported('retrieve-items') + + + def retract(self, requestor, service, nodeIdentifier, itemIdentifiers): + raise Unsupported('retract-items') + + + def purge(self, requestor, service, nodeIdentifier): + raise Unsupported('purge-nodes') + + + def delete(self, requestor, service, nodeIdentifier): + raise Unsupported('delete-nodes') + + + +class PubSubResource(object): + + implements(IPubSubResource) + + features = [] + discoIdentity = disco.DiscoIdentity('pubsub', + 'service', + 'Publish-Subscribe Service') + + + def locateResource(self, request): + return self + + + def getInfo(self, requestor, service, nodeIdentifier): + return defer.succeed(None) + + + def getNodes(self, requestor, service, nodeIdentifier): + return defer.succeed([]) + + + def getConfigurationOptions(self): + return {} + + + def publish(self, request): + return defer.fail(Unsupported('publish')) + + + def subscribe(self, request): + return defer.fail(Unsupported('subscribe')) + + + def unsubscribe(self, request): + return defer.fail(Unsupported('subscribe')) + + + def subscriptions(self, request): + return defer.fail(Unsupported('retrieve-subscriptions')) + + + def affiliations(self, request): + return defer.fail(Unsupported('retrieve-affiliations')) + + + def create(self, request): + return defer.fail(Unsupported('create-nodes')) + + + def default(self, request): + return defer.fail(Unsupported('retrieve-default')) + + + def configureGet(self, request): + return defer.fail(Unsupported('config-node')) + + + def configureSet(self, request): + return defer.fail(Unsupported('config-node')) + + + def items(self, request): + return defer.fail(Unsupported('retrieve-items')) + + + def retract(self, request): + return defer.fail(Unsupported('retract-items')) + + + def purge(self, request): + return defer.fail(Unsupported('purge-nodes')) + + + def delete(self, request): + return defer.fail(Unsupported('delete-nodes')) + + + def affiliationsGet(self, request): + return defer.fail(Unsupported('modify-affiliations')) + + + def affiliationsSet(self, request): + return defer.fail(Unsupported('modify-affiliations')) + + + def subscriptionsGet(self, request): + return defer.fail(Unsupported('manage-subscriptions')) + + + def subscriptionsSet(self, request): + return defer.fail(Unsupported('manage-subscriptions'))