Mercurial > libervia-pubsub
view idavoll/pubsub.py @ 155:5191ba7c4df8
Work towards first release 0.5.0.
- Add licensing information (MIT)
- Improve installation instructions.
- Use new plugins framework in twisted.
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Mon, 21 Aug 2006 16:05:35 +0000 |
parents | bd8e58c73370 |
children | 6fe78048baf9 |
line wrap: on
line source
# Copyright (c) 2003-2006 Ralph Meijer # See LICENSE for details. from twisted.words.protocols.jabber import component,jid from twisted.words.xish import utility, domish from twisted.python import components from twisted.internet import defer from zope.interface import implements import backend import storage import xmpp_error import disco import data_form if issubclass(domish.SerializedXML, str): # Work around bug # in twisted Xish class SerializedXML(unicode): """ Marker class for pre-serialized XML in the DOM. """ domish.SerializedXML = SerializedXML NS_COMPONENT = 'jabber:component:accept' NS_PUBSUB = 'http://jabber.org/protocol/pubsub' NS_PUBSUB_EVENT = NS_PUBSUB + '#event' NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors' NS_PUBSUB_OWNER = NS_PUBSUB + "#owner" IQ_GET = '/iq[@type="get"]' IQ_SET = '/iq[@type="set"]' PUBSUB_ELEMENT = '/pubsub[@xmlns="' + NS_PUBSUB + '"]' PUBSUB_OWNER_ELEMENT = '/pubsub[@xmlns="' + NS_PUBSUB_OWNER + '"]' PUBSUB_GET = IQ_GET + PUBSUB_ELEMENT PUBSUB_SET = IQ_SET + PUBSUB_ELEMENT PUBSUB_OWNER_GET = IQ_GET + PUBSUB_OWNER_ELEMENT PUBSUB_OWNER_SET = IQ_SET + PUBSUB_OWNER_ELEMENT PUBSUB_CREATE = PUBSUB_SET + '/create' PUBSUB_PUBLISH = PUBSUB_SET + '/publish' PUBSUB_SUBSCRIBE = PUBSUB_SET + '/subscribe' PUBSUB_UNSUBSCRIBE = PUBSUB_SET + '/unsubscribe' PUBSUB_OPTIONS_GET = PUBSUB_GET + '/options' PUBSUB_OPTIONS_SET = PUBSUB_SET + '/options' PUBSUB_CONFIGURE_GET = PUBSUB_OWNER_GET + '/configure' PUBSUB_CONFIGURE_SET = PUBSUB_OWNER_SET + '/configure' PUBSUB_SUBSCRIPTIONS = PUBSUB_GET + '/subscriptions' PUBSUB_AFFILIATIONS = PUBSUB_GET + '/affiliations' PUBSUB_ITEMS = PUBSUB_GET + '/items' PUBSUB_RETRACT = PUBSUB_SET + '/retract' PUBSUB_PURGE = PUBSUB_OWNER_SET + '/purge' PUBSUB_DELETE = PUBSUB_OWNER_SET + '/delete' class Error(Exception): pubsub_error = None stanza_error = None msg = '' class NotImplemented(Error): stanza_error = 'feature-not-implemented' class BadRequest(Error): stanza_error = 'bad-request' class OptionsUnavailable(Error): stanza_error = 'feature-not-implemented' pubsub_error = 'subscription-options-unavailable' class NodeNotConfigurable(Error): stanza_error = 'feature-not-implemented' pubsub_error = 'node-not-configurable' error_map = { storage.NodeNotFound: ('item-not-found', None), storage.NodeExists: ('conflict', None), storage.SubscriptionNotFound: ('not-authorized', 'not-subscribed'), backend.NotAuthorized: ('not-authorized', None), backend.NoPayloadAllowed: ('bad-request', None), backend.PayloadExpected: ('bad-request', None), backend.NoInstantNodes: ('not-acceptable', None), backend.NotImplemented: ('feature-not-implemented', None), backend.InvalidConfigurationOption: ('not-acceptable', None), backend.InvalidConfigurationValue: ('not-acceptable', None), } class Service(component.Service): implements(component.IService) def __init__(self, backend): self.backend = backend def error(self, failure, iq): try: e = failure.trap(Error, *error_map.keys()) except: failure.printBriefTraceback() xmpp_error.error_from_iq(iq, 'internal-server-error') return iq else: if e == Error: stanza_error = failure.value.stanza_error pubsub_error = failure.value.pubsub_error msg = '' else: stanza_error, pubsub_error = error_map[e] msg = failure.value.msg xmpp_error.error_from_iq(iq, stanza_error, msg) if pubsub_error: iq.error.addElement((NS_PUBSUB_ERRORS, pubsub_error)) return iq def success(self, result, iq): iq.swapAttributeValues("to", "from") iq["type"] = 'result' iq.children = [] if result: for child in result: iq.addChild(child) return iq def handler_wrapper(self, handler, iq): try: d = handler(iq) except: d = defer.fail() d.addCallback(self.success, iq) d.addErrback(self.error, iq) d.addCallback(self.send) iq.handled = True class ComponentServiceFromService(Service): def __init__(self, backend): Service.__init__(self, backend) self.hide_nodes = False def get_disco_info(self, node): info = [] if not node: info.append(disco.Identity('pubsub', 'generic', 'Generic Pubsub Service')) info.append(disco.Feature(NS_PUBSUB + "#meta-data")) if self.backend.supports_outcast_affiliation(): info.append(disco.Feature(NS_PUBSUB + "#outcast-affiliation")) if self.backend.supports_persistent_items(): info.append(disco.Feature(NS_PUBSUB + "#persistent-items")) if self.backend.supports_publisher_affiliation(): info.append(disco.Feature(NS_PUBSUB + "#publisher-affiliation")) return defer.succeed(info) else: def trap_not_found(result): result.trap(storage.NodeNotFound) return [] d = self.backend.get_node_type(node) d.addCallback(self._add_identity, [], node) d.addErrback(trap_not_found) return d def _add_identity(self, node_type, result_list, node): result_list.append(disco.Identity('pubsub', node_type)) d = self.backend.get_node_meta_data(node) d.addCallback(self._add_meta_data, node_type, result_list) return d def _add_meta_data(self, meta_data, node_type, result_list): form = data_form.Form(type="result", form_type=NS_PUBSUB + "#meta-data") for meta_datum in meta_data: try: del meta_datum['options'] except KeyError: pass form.add_field(**meta_datum) form.add_field("text-single", "pubsub#node_type", "The type of node (collection or leaf)", node_type) result_list.append(form) return result_list def get_disco_items(self, node): if node or self.hide_nodes: return defer.succeed([]) d = self.backend.get_nodes() d.addCallback(lambda nodes: [disco.Item(self.parent.jabberId, node) for node in nodes]) return d components.registerAdapter(ComponentServiceFromService, backend.IBackendService, component.IService) class ComponentServiceFromNotificationService(Service): def componentConnected(self, xmlstream): self.backend.register_notifier(self.notify) def notify(self, object): node_id = object["node_id"] items = object["items"] d = self.backend.get_notification_list(node_id, items) d.addCallback(self._notify, node_id) def _notify(self, list, node_id): for recipient, items in list: self._notify_recipient(recipient, node_id, items) def _notify_recipient(self, recipient, node_id, itemlist): message = domish.Element((NS_COMPONENT, "message")) message["from"] = self.parent.jabberId message["to"] = recipient.full() event = message.addElement((NS_PUBSUB_EVENT, "event")) items = event.addElement("items") items["node"] = node_id items.children.extend(itemlist) self.send(message) components.registerAdapter(ComponentServiceFromNotificationService, backend.INotificationService, component.IService) class ComponentServiceFromPublishService(Service): def componentConnected(self, xmlstream): xmlstream.addObserver(PUBSUB_PUBLISH, self.onPublish) def get_disco_info(self, node): info = [] if not node: info.append(disco.Feature(NS_PUBSUB + "#item-ids")) return defer.succeed(info) def onPublish(self, iq): self.handler_wrapper(self._onPublish, iq) def _onPublish(self, iq): try: node = iq.pubsub.publish["node"] except KeyError: raise BadRequest items = [] for child in iq.pubsub.publish.children: if child.__class__ == domish.Element and child.name == 'item': items.append(child) return self.backend.publish(node, items, jid.internJID(iq["from"]).userhostJID()) components.registerAdapter(ComponentServiceFromPublishService, backend.IPublishService, component.IService) class ComponentServiceFromSubscriptionService(Service): def componentConnected(self, xmlstream): xmlstream.addObserver(PUBSUB_SUBSCRIBE, self.onSubscribe) xmlstream.addObserver(PUBSUB_UNSUBSCRIBE, self.onUnsubscribe) xmlstream.addObserver(PUBSUB_OPTIONS_GET, self.onOptionsGet) xmlstream.addObserver(PUBSUB_OPTIONS_SET, self.onOptionsSet) xmlstream.addObserver(PUBSUB_SUBSCRIPTIONS, self.onSubscriptions) def get_disco_info(self, node): info = [] if not node: info.append(disco.Feature(NS_PUBSUB + '#subscribe')) info.append(disco.Feature(NS_PUBSUB + '#retrieve-subscriptions')) return defer.succeed(info) def onSubscribe(self, iq): self.handler_wrapper(self._onSubscribe, iq) def _onSubscribe(self, iq): try: node_id = iq.pubsub.subscribe["node"] subscriber = jid.internJID(iq.pubsub.subscribe["jid"]) except KeyError: raise BadRequest requestor = jid.internJID(iq["from"]).userhostJID() d = self.backend.subscribe(node_id, subscriber, requestor) d.addCallback(self.return_subscription, subscriber) return d def return_subscription(self, result, subscriber): node, state = result reply = domish.Element((NS_PUBSUB, "pubsub")) subscription = reply.addElement("subscription") subscription["node"] = node subscription["jid"] = subscriber.full() subscription["subscription"] = state return [reply] def onUnsubscribe(self, iq): self.handler_wrapper(self._onUnsubscribe, iq) def _onUnsubscribe(self, iq): try: node_id = iq.pubsub.unsubscribe["node"] subscriber = jid.internJID(iq.pubsub.unsubscribe["jid"]) except KeyError: raise BadRequest requestor = jid.internJID(iq["from"]).userhostJID() return self.backend.unsubscribe(node_id, subscriber, requestor) def onOptionsGet(self, iq): self.handler_wrapper(self._onOptionsGet, iq) def _onOptionsGet(self, iq): raise OptionsUnavailable def onOptionsSet(self, iq): self.handler_wrapper(self._onOptionsSet, iq) def _onOptionsSet(self, iq): raise OptionsUnavailable def onSubscriptions(self, iq): self.handler_wrapper(self._onSubscriptions, iq) def _onSubscriptions(self, iq): entity = jid.internJID(iq["from"]).userhostJID() d = self.backend.get_subscriptions(entity) d.addCallback(self._return_subscriptions_response, iq) return d def _return_subscriptions_response(self, result, iq): reply = domish.Element((NS_PUBSUB, 'pubsub')) subscriptions = reply.addElement('subscriptions') for node, subscriber, state in result: item = subscriptions.addElement('subscription') item['node'] = node item['jid'] = subscriber.full() item['subscription'] = state return [reply] components.registerAdapter(ComponentServiceFromSubscriptionService, backend.ISubscriptionService, component.IService) class ComponentServiceFromNodeCreationService(Service): def componentConnected(self, xmlstream): xmlstream.addObserver(PUBSUB_CREATE, self.onCreate) xmlstream.addObserver(PUBSUB_CONFIGURE_GET, self.onConfigureGet) xmlstream.addObserver(PUBSUB_CONFIGURE_SET, self.onConfigureSet) def get_disco_info(self, node): info = [] if not node: info.append(disco.Feature(NS_PUBSUB + "#create-nodes")) info.append(disco.Feature(NS_PUBSUB + "#config-node")) if self.backend.supports_instant_nodes(): info.append(disco.Feature(NS_PUBSUB + "#instant-nodes")) return defer.succeed(info) def onCreate(self, iq): self.handler_wrapper(self._onCreate, iq) def _onCreate(self, iq): node = iq.pubsub.create.getAttribute("node") owner = jid.internJID(iq["from"]).userhostJID() d = self.backend.create_node(node, owner) d.addCallback(self._return_create_response, iq) return d def _return_create_response(self, result, iq): node_id = iq.pubsub.create.getAttribute("node") if not node_id or node_id != result: reply = domish.Element((NS_PUBSUB, 'pubsub')) entity = reply.addElement('create') entity['node'] = result return [reply] def onConfigureGet(self, iq): self.handler_wrapper(self._onConfigureGet, iq) def _onConfigureGet(self, iq): try: node_id = iq.pubsub.configure["node"] except KeyError: raise NodeNotConfigurable d = self.backend.get_node_configuration(node_id) d.addCallback(self._return_configuration_response, node_id) return d def _return_configuration_response(self, options, node_id): reply = domish.Element((NS_PUBSUB_OWNER, "pubsub")) configure = reply.addElement("configure") if node_id: configure["node"] = node_id form = data_form.Form(type="form", form_type=NS_PUBSUB + "#node_config") for option in options: form.add_field(**option) form.parent = configure configure.addChild(form) return [reply] def onConfigureSet(self, iq): self.handler_wrapper(self._onConfigureSet, iq) def _onConfigureSet(self, iq): try: node_id = iq.pubsub.configure["node"] except KeyError: raise BadRequest requestor = jid.internJID(iq["from"]).userhostJID() for element in iq.pubsub.configure.elements(): if element.name != 'x' or element.uri != data_form.NS_X_DATA: continue type = element.getAttribute("type") if type == "cancel": return None elif type != "submit": continue options = self._get_form_options(element) if options["FORM_TYPE"] == NS_PUBSUB + "#node_config": del options["FORM_TYPE"] return self.backend.set_node_configuration(node_id, options, requestor) raise BadRequest def _get_form_options(self, form): options = {} for element in form.elements(): if element.name == 'field' and element.uri == data_form.NS_X_DATA: try: options[element["var"]] = str(element.value) except (KeyError, AttributeError): raise BadRequest return options components.registerAdapter(ComponentServiceFromNodeCreationService, backend.INodeCreationService, component.IService) class ComponentServiceFromAffiliationsService(Service): def componentConnected(self, xmlstream): xmlstream.addObserver(PUBSUB_AFFILIATIONS, self.onAffiliations) def get_disco_info(self, node): info = [] if not node: info.append(disco.Feature(NS_PUBSUB + "#retrieve-affiliations")) return defer.succeed(info) def onAffiliations(self, iq): self.handler_wrapper(self._onAffiliations, iq) def _onAffiliations(self, iq): entity = jid.internJID(iq["from"]).userhostJID() d = self.backend.get_affiliations(entity) d.addCallback(self._return_affiliations_response, iq) return d def _return_affiliations_response(self, result, iq): reply = domish.Element((NS_PUBSUB, 'pubsub')) affiliations = reply.addElement('affiliations') for node, affiliation in result: item = affiliations.addElement('affiliation') item['node'] = node item['affiliation'] = affiliation return [reply] components.registerAdapter(ComponentServiceFromAffiliationsService, backend.IAffiliationsService, component.IService) class ComponentServiceFromItemRetrievalService(Service): def componentConnected(self, xmlstream): xmlstream.addObserver(PUBSUB_ITEMS, self.onItems) def get_disco_info(self, node): info = [] if not node: info.append(disco.Feature(NS_PUBSUB + "#retrieve-items")) return defer.succeed(info) def onItems(self, iq): self.handler_wrapper(self._onItems, iq) def _onItems(self, iq): try: node_id = iq.pubsub.items["node"] except KeyError: raise BadRequest max_items = iq.pubsub.items.getAttribute('max_items') if max_items: try: max_items = int(max_items) except ValueError: raise BadRequest item_ids = [] for child in iq.pubsub.items.elements(): if child.name == 'item' and child.uri == NS_PUBSUB: try: item_ids.append(child["id"]) except KeyError: raise BadRequest d = self.backend.get_items(node_id, jid.internJID(iq["from"]), max_items, item_ids) d.addCallback(self._return_items_response, node_id) return d def _return_items_response(self, result, node_id): reply = domish.Element((NS_PUBSUB, 'pubsub')) items = reply.addElement('items') items["node"] = node_id for r in result: items.addRawXml(r) return [reply] components.registerAdapter(ComponentServiceFromItemRetrievalService, backend.IItemRetrievalService, component.IService) class ComponentServiceFromRetractionService(Service): def componentConnected(self, xmlstream): xmlstream.addObserver(PUBSUB_RETRACT, self.onRetract) xmlstream.addObserver(PUBSUB_PURGE, self.onPurge) def get_disco_info(self, node): info = [] if not node: info.append(disco.Feature(NS_PUBSUB + "#delete-any")) info.append(disco.Feature(NS_PUBSUB + "#retract-items")) info.append(disco.Feature(NS_PUBSUB + "#purge-nodes")) return defer.succeed(info) def onRetract(self, iq): self.handler_wrapper(self._onRetract, iq) def _onRetract(self, iq): try: node = iq.pubsub.retract["node"] except KeyError: raise BadRequest item_ids = [] for child in iq.pubsub.retract.children: if child.__class__ == domish.Element and child.name == 'item': try: item_ids.append(child["id"]) except KeyError: raise BadRequest requestor = jid.internJID(iq["from"]).userhostJID() return self.backend.retract_item(node, item_ids, requestor) def onPurge(self, iq): self.handler_wrapper(self._onPurge, iq) def _onPurge(self, iq): try: node = iq.pubsub.purge["node"] except KeyError: raise BadRequest return self.backend.purge_node(node, jid.internJID(iq["from"]).userhostJID()) components.registerAdapter(ComponentServiceFromRetractionService, backend.IRetractionService, component.IService) class ComponentServiceFromNodeDeletionService(Service): def __init__(self, backend): Service.__init__(self, backend) self.subscribers = [] def componentConnected(self, xmlstream): self.backend.register_pre_delete(self._pre_delete) xmlstream.addObserver(PUBSUB_DELETE, self.onDelete) def get_disco_info(self, node): info = [] if not node: info.append(disco.Feature(NS_PUBSUB + "#delete-nodes")) return defer.succeed(info) def _pre_delete(self, node_id): d = self.backend.get_subscribers(node_id) d.addCallback(self._return_deferreds, node_id) return d def _return_deferreds(self, subscribers, node_id): d = defer.Deferred() d.addCallback(self._notify, subscribers, node_id) return [d] def _notify(self, result, subscribers, node_id): message = domish.Element((NS_COMPONENT, "message")) message["from"] = self.parent.jabberId event = message.addElement((NS_PUBSUB_EVENT, "event")) event.addElement("delete")["node"] = node_id for subscriber in subscribers: message["to"] = subscriber self.send(message) def onDelete(self, iq): self.handler_wrapper(self._onDelete, iq) def _onDelete(self, iq): try: node = iq.pubsub.delete["node"] except KeyError: raise BadRequest return self.backend.delete_node(node, jid.internJID(iq["from"]).userhostJID()) components.registerAdapter(ComponentServiceFromNodeDeletionService, backend.INodeDeletionService, component.IService)