Mercurial > libervia-pubsub
view idavoll/pubsub.py @ 121:4f0113adb7ed
Add Node._check_node_exists() calls to all Node methods, because nodes could have been deleted in between calls.
Add Node.get_subscription().
Only fire deferred (with None) on success of Node.add_subscription().
Fix Node.set_configuration() to actually work and only update the Node object's
configuration when the SQL query has succeeded.
Implement Node.remove_subscription().
Implement Node.is_subscribed().
Implement LeafNode methods (unchecked!).
author | Ralph Meijer <ralphm@ik.nu> |
---|---|
date | Tue, 12 Apr 2005 12:26:05 +0000 |
parents | 7043839982ba |
children | 8527bce09862 |
line wrap: on
line source
# XMPP publish-subscribe component services.
#
# Each ``ComponentServiceFrom*`` class below wraps one backend interface,
# registers observers for the matching pubsub IQ stanzas once the component
# is connected, and translates between stanzas and backend calls.

from twisted.words.protocols.jabber import component,jid
from twisted.xish import utility, domish
from twisted.python import components
from twisted.internet import defer
from zope.interface import implements

import backend
import storage
import xmpp_error
import disco
import data_form

# XML namespaces.
NS_COMPONENT = 'jabber:component:accept'
NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
NS_PUBSUB_EVENT = NS_PUBSUB + '#event'
NS_PUBSUB_ERRORS = NS_PUBSUB + '#errors'
NS_PUBSUB_OWNER = NS_PUBSUB + "#owner"

# XPath-like query fragments used to register stanza observers.
IQ_GET = '/iq[@type="get"]'
IQ_SET = '/iq[@type="set"]'
PUBSUB_ELEMENT = '/pubsub[@xmlns="' + NS_PUBSUB + '"]'
PUBSUB_OWNER_ELEMENT = '/pubsub[@xmlns="' + NS_PUBSUB_OWNER + '"]'
PUBSUB_GET = IQ_GET + PUBSUB_ELEMENT
PUBSUB_SET = IQ_SET + PUBSUB_ELEMENT
PUBSUB_OWNER_GET = IQ_GET + PUBSUB_OWNER_ELEMENT
PUBSUB_OWNER_SET = IQ_SET + PUBSUB_OWNER_ELEMENT
PUBSUB_CREATE = PUBSUB_SET + '/create'
PUBSUB_PUBLISH = PUBSUB_SET + '/publish'
PUBSUB_SUBSCRIBE = PUBSUB_SET + '/subscribe'
PUBSUB_UNSUBSCRIBE = PUBSUB_SET + '/unsubscribe'
PUBSUB_OPTIONS_GET = PUBSUB_GET + '/options'
PUBSUB_OPTIONS_SET = PUBSUB_SET + '/options'
PUBSUB_CONFIGURE_GET = PUBSUB_OWNER_GET + '/configure'
PUBSUB_CONFIGURE_SET = PUBSUB_OWNER_SET + '/configure'
PUBSUB_AFFILIATIONS = PUBSUB_GET + '/affiliations'
PUBSUB_ITEMS = PUBSUB_GET + '/items'
PUBSUB_RETRACT = PUBSUB_SET + '/retract'
PUBSUB_PURGE = PUBSUB_SET + '/purge'
PUBSUB_DELETE = PUBSUB_SET + '/delete'


class Error(Exception):
    """
    Base class for errors raised by the stanza handlers in this module.

    Subclasses override ``stanza_error`` (the stanza error condition) and
    optionally ``pubsub_error`` (a pubsub-specific condition element name);
    Service.error() reads both to build the error reply.
    """
    pubsub_error = None
    stanza_error = None
    msg = ''


# NOTE(review): this class shadows the ``NotImplemented`` builtin inside this
# module; the name is kept because it is part of the module's interface.
class NotImplemented(Error):
    stanza_error = 'feature-not-implemented'


class BadRequest(Error):
    stanza_error = 'bad-request'


class OptionsUnavailable(Error):
    stanza_error = 'feature-not-implemented'
    pubsub_error = 'subscription-options-unavailable'


class NodeNotConfigurable(Error):
    stanza_error = 'feature-not-implemented'
    pubsub_error = 'node-not-configurable'


# Maps storage/backend exception classes to a tuple of
# (stanza error condition, pubsub-specific error condition or None).
error_map = {
    storage.NodeNotFound: ('item-not-found', None),
    storage.NodeExists: ('conflict', None),

    backend.NotAuthorized: ('not-authorized', None),
    backend.NoPayloadAllowed: ('bad-request', None),
    backend.PayloadExpected: ('bad-request', None),
    backend.NoInstantNodes: ('not-acceptable', None),
    backend.NotImplemented: ('feature-not-implemented', None),
    backend.NotSubscribed: ('not-authorized', 'requestor-not-subscribed'),
    backend.InvalidConfigurationOption: ('not-acceptable', None),
    backend.InvalidConfigurationValue: ('not-acceptable', None),
}


class Service(component.Service):
    """
    Common base for the pubsub component services.

    Holds the backend and provides the request/response plumbing shared by
    all handlers: handler_wrapper() runs a handler and routes its outcome
    through success() or error() before sending the reply.
    """

    implements(component.IService)

    def __init__(self, backend):
        self.backend = backend

    def error(self, failure, iq):
        """
        Errback: turn C{failure} into an error reply on C{iq}.

        Failures wrapping an L{Error} (or subclass) use the conditions
        declared on the exception; failures wrapping a key of C{error_map}
        use the mapped conditions. Anything else is logged and reported as
        'internal-server-error'.

        @return: the C{iq} element, after xmpp_error.error_from_iq() has
                 been applied to it.
        """
        try:
            e = failure.trap(Error, *error_map.keys())
        except:
            # trap() re-raises when the failure matches none of the given
            # types. The bare except is deliberate: whatever is raised
            # here must end up as an internal-server-error reply.
            failure.printBriefTraceback()
            xmpp_error.error_from_iq(iq, 'internal-server-error')
            return iq
        else:
            if e == Error:
                stanza_error = failure.value.stanza_error
                pubsub_error = failure.value.pubsub_error
                msg = ''
            else:
                stanza_error, pubsub_error = error_map[e]
                msg = failure.value.msg

            xmpp_error.error_from_iq(iq, stanza_error, msg)
            if pubsub_error:
                iq.error.addElement((NS_PUBSUB_ERRORS, pubsub_error))
            return iq

    def success(self, result, iq):
        """
        Callback: turn C{iq} into a result reply.

        Swaps the addressing, sets the type to 'result' and replaces the
        children of C{iq} with the elements in C{result}, if any.

        @param result: iterable of child elements for the reply, or a false
                       value for an empty reply.
        @return: the modified C{iq} element.
        """
        iq.swapAttributeValues("to", "from")
        iq["type"] = 'result'
        iq.children = []
        if result:
            for child in result:
                iq.addChild(child)
        return iq

    def handler_wrapper(self, handler, iq):
        """
        Run C{handler} on C{iq} and send the resulting reply.

        The handler may return a Deferred, return a plain value (including
        C{None}, as _onConfigureSet does for a cancelled form) or raise.
        maybeDeferred() normalises all three into a Deferred, so a plain
        return value yields an ordinary result reply instead of failing on
        a missing C{addCallback}.
        """
        d = defer.maybeDeferred(handler, iq)
        d.addCallback(self.success, iq)
        d.addErrback(self.error, iq)
        d.addCallback(self.send)
        iq.handled = True


class ComponentServiceFromService(Service):
    """
    Service discovery support on top of the generic backend service.
    """

    def __init__(self, backend):
        Service.__init__(self, backend)
        # When true, get_disco_items() reports no nodes.
        self.hide_nodes = False

    def get_disco_info(self, node):
        """
        Return a Deferred firing with a list of disco info objects.

        Without a node, the list holds the service identity plus the
        features the backend reports support for. With a node, it holds
        the node's identity and meta data form, or is empty when the node
        cannot be looked up.
        """
        info = []

        if not node:
            info.append(disco.Identity('pubsub', 'generic',
                                       'Generic Pubsub Service'))

            if self.backend.supports_publisher_affiliation():
                info.append(
                    disco.Feature(NS_PUBSUB + "#publisher-affiliation"))

            if self.backend.supports_outcast_affiliation():
                info.append(
                    disco.Feature(NS_PUBSUB + "#outcast-affiliation"))

            if self.backend.supports_persistent_items():
                info.append(disco.Feature(NS_PUBSUB + "#persistent-items"))

            return defer.succeed(info)
        else:
            try:
                d = self.backend.get_node_type(node)
                d.addCallback(self._add_identity, [], node)
                # Any failure while collecting node info yields no info.
                d.addErrback(lambda _: [])
            except storage.NodeNotFound:
                return defer.succeed([])
            return d

    def _add_identity(self, node_type, result_list, node):
        """
        Append the node identity to C{result_list}, then fetch the node's
        meta data so _add_meta_data() can complete the list.
        """
        result_list.append(disco.Identity('pubsub', node_type))
        d = self.backend.get_node_meta_data(node)
        d.addCallback(self._add_meta_data, node_type, result_list)
        return d

    def _add_meta_data(self, meta_data, node_type, result_list):
        """
        Append a meta data form to C{result_list} and return the list.

        Each entry of C{meta_data} is passed as keyword arguments to
        Form.add_field_single(); a final field carries the node type.
        """
        form = data_form.Form(type="result",
                              form_type=NS_PUBSUB + "#meta-data")

        for meta_datum in meta_data:
            form.add_field_single(**meta_datum)

        form.add_field_single("text-single",
                              "pubsub#node_type",
                              "The type of node (collection or leaf)",
                              node_type)
        result_list.append(form)
        return result_list

    def get_disco_items(self, node):
        """
        Return a Deferred firing with disco items for all backend nodes.

        The list is empty when a specific node is asked for or when
        C{hide_nodes} is set.
        """
        if node or self.hide_nodes:
            return defer.succeed([])

        d = self.backend.get_nodes()
        d.addCallback(lambda nodes: [disco.Item(self.parent.jabberId,
                                                node_id)
                                     for node_id in nodes])
        return d

components.registerAdapter(ComponentServiceFromService,
                           backend.IBackendService,
                           component.IService)


class ComponentServiceFromNotificationService(Service):
    """
    Sends event notifications for published items to their recipients.
    """

    def componentConnected(self, xmlstream):
        self.backend.register_notifier(self.notify)

    def notify(self, object):
        """
        Notifier registered with the backend.

        @param object: mapping with the keys "node_id" and "items".
        """
        node_id = object["node_id"]
        items = object["items"]
        d = self.backend.get_notification_list(node_id, items)
        d.addCallback(self._notify, node_id)

    def _notify(self, list, node_id):
        """
        Send one notification per (recipient, items) pair in C{list}.
        """
        for recipient, items in list:
            self._notify_recipient(recipient, node_id, items)

    def _notify_recipient(self, recipient, node_id, itemlist):
        """
        Build and send a message carrying an event with C{itemlist}.
        """
        message = domish.Element((NS_COMPONENT, "message"))
        message["from"] = self.parent.jabberId
        message["to"] = recipient.full()
        event = message.addElement((NS_PUBSUB_EVENT, "event"))
        items = event.addElement("items")
        items["node"] = node_id
        items.children.extend(itemlist)
        self.send(message)

components.registerAdapter(ComponentServiceFromNotificationService,
                           backend.INotificationService,
                           component.IService)


class ComponentServiceFromPublishService(Service):
    """
    Handles publish requests.
    """

    def componentConnected(self, xmlstream):
        xmlstream.addObserver(PUBSUB_PUBLISH, self.onPublish)

    def get_disco_info(self, node):
        info = []

        if not node:
            info.append(disco.Feature(NS_PUBSUB + "#item-ids"))

        return defer.succeed(info)

    def onPublish(self, iq):
        self.handler_wrapper(self._onPublish, iq)

    def _onPublish(self, iq):
        """
        Publish the <item/> children of the request to the named node.

        @raise BadRequest: when the node attribute is missing.
        """
        try:
            node = iq.pubsub.publish["node"]
        except KeyError:
            raise BadRequest

        # elements() skips character data between the child elements.
        items = [child for child in iq.pubsub.publish.elements()
                 if child.name == 'item']

        return self.backend.publish(node, items,
                                    jid.JID(iq["from"]).userhostJID())

components.registerAdapter(ComponentServiceFromPublishService,
                           backend.IPublishService,
                           component.IService)


class ComponentServiceFromSubscriptionService(Service):
    """
    Handles subscribe, unsubscribe and subscription option requests.
    """

    def componentConnected(self, xmlstream):
        xmlstream.addObserver(PUBSUB_SUBSCRIBE, self.onSubscribe)
        xmlstream.addObserver(PUBSUB_UNSUBSCRIBE, self.onUnsubscribe)
        xmlstream.addObserver(PUBSUB_OPTIONS_GET, self.onOptionsGet)
        xmlstream.addObserver(PUBSUB_OPTIONS_SET, self.onOptionsSet)

    def get_disco_info(self, node):
        info = []

        if not node:
            info.append(disco.Feature(NS_PUBSUB + '#subscribe'))

        return defer.succeed(info)

    def onSubscribe(self, iq):
        self.handler_wrapper(self._onSubscribe, iq)

    def _onSubscribe(self, iq):
        """
        Subscribe the given JID to the named node.

        @raise BadRequest: when the node or jid attribute is missing.
        """
        try:
            node_id = iq.pubsub.subscribe["node"]
            subscriber = jid.JID(iq.pubsub.subscribe["jid"])
        except KeyError:
            raise BadRequest

        requestor = jid.JID(iq["from"]).userhostJID()
        d = self.backend.subscribe(node_id, subscriber, requestor)
        d.addCallback(self.return_subscription)
        return d

    def return_subscription(self, result):
        """
        Build the reply payload describing the new subscription.

        @param result: mapping with the keys "node", "jid", "affiliation"
                       and "subscription".
        """
        reply = domish.Element((NS_PUBSUB, "pubsub"))
        entity = reply.addElement("entity")
        entity["node"] = result["node"]
        entity["jid"] = result["jid"].full()
        entity["affiliation"] = result["affiliation"] or 'none'
        entity["subscription"] = result["subscription"]
        return [reply]

    def onUnsubscribe(self, iq):
        self.handler_wrapper(self._onUnsubscribe, iq)

    def _onUnsubscribe(self, iq):
        """
        Unsubscribe the given JID from the named node.

        @raise BadRequest: when the node or jid attribute is missing.
        """
        try:
            node_id = iq.pubsub.unsubscribe["node"]
            subscriber = jid.JID(iq.pubsub.unsubscribe["jid"])
        except KeyError:
            raise BadRequest

        requestor = jid.JID(iq["from"]).userhostJID()
        return self.backend.unsubscribe(node_id, subscriber, requestor)

    def onOptionsGet(self, iq):
        self.handler_wrapper(self._onOptionsGet, iq)

    def _onOptionsGet(self, iq):
        # Subscription options are not supported.
        raise OptionsUnavailable

    def onOptionsSet(self, iq):
        self.handler_wrapper(self._onOptionsSet, iq)

    def _onOptionsSet(self, iq):
        # Subscription options are not supported.
        raise OptionsUnavailable

components.registerAdapter(ComponentServiceFromSubscriptionService,
                           backend.ISubscriptionService,
                           component.IService)


class ComponentServiceFromNodeCreationService(Service):
    """
    Handles node creation and node configuration requests.
    """

    def componentConnected(self, xmlstream):
        xmlstream.addObserver(PUBSUB_CREATE, self.onCreate)
        xmlstream.addObserver(PUBSUB_CONFIGURE_GET, self.onConfigureGet)
        xmlstream.addObserver(PUBSUB_CONFIGURE_SET, self.onConfigureSet)

    def get_disco_info(self, node):
        info = []

        if not node:
            info.append(disco.Feature(NS_PUBSUB + "#create-nodes"))
            info.append(disco.Feature(NS_PUBSUB + "#config-node"))

            if self.backend.supports_instant_nodes():
                info.append(disco.Feature(NS_PUBSUB + "#instant-nodes"))

        return defer.succeed(info)

    def onCreate(self, iq):
        self.handler_wrapper(self._onCreate, iq)

    def _onCreate(self, iq):
        """
        Create a node owned by the sender; the node attribute is optional.
        """
        node = iq.pubsub.create.getAttribute("node")
        owner = jid.JID(iq["from"]).userhostJID()

        d = self.backend.create_node(node, owner)
        d.addCallback(self._return_create_response, iq)
        return d

    def _return_create_response(self, result, iq):
        """
        Build the reply payload for a create request.

        The created node identifier is only reported back when the request
        named no node or the backend chose a different identifier.
        Otherwise C{None} is returned, giving an empty result.
        """
        node_id = iq.pubsub.create.getAttribute("node")
        if node_id and node_id == result:
            return None

        reply = domish.Element((NS_PUBSUB, 'pubsub'))
        entity = reply.addElement('create')
        entity['node'] = result
        return [reply]

    def onConfigureGet(self, iq):
        self.handler_wrapper(self._onConfigureGet, iq)

    def _onConfigureGet(self, iq):
        """
        Retrieve the configuration form of the named node.

        @raise NodeNotConfigurable: when the node attribute is missing.
        """
        try:
            node_id = iq.pubsub.configure["node"]
        except KeyError:
            raise NodeNotConfigurable

        d = self.backend.get_node_configuration(node_id)
        d.addCallback(self._return_configuration_response, node_id)
        return d

    def _return_configuration_response(self, options, node_id):
        """
        Build the reply payload holding the node configuration form.

        Each entry of C{options} is passed as keyword arguments to
        Form.add_field_single().
        """
        reply = domish.Element((NS_PUBSUB_OWNER, "pubsub"))
        configure = reply.addElement("configure")
        if node_id:
            configure["node"] = node_id

        form = data_form.Form(type="form",
                              form_type=NS_PUBSUB + "#node_config")
        for option in options:
            form.add_field_single(**option)

        form.parent = configure
        configure.addChild(form)

        return [reply]

    def onConfigureSet(self, iq):
        self.handler_wrapper(self._onConfigureSet, iq)

    def _onConfigureSet(self, iq):
        """
        Apply a submitted node configuration form.

        Looks through the data forms in the request. A form of type
        "cancel" ends processing with C{None} (an empty result reply); the
        first form of type "submit" whose FORM_TYPE is the node
        configuration form type is handed to the backend.

        @raise BadRequest: when the node attribute is missing or no usable
                           form is found.
        """
        try:
            node_id = iq.pubsub.configure["node"]
        except KeyError:
            raise BadRequest

        requestor = jid.JID(iq["from"]).userhostJID()

        for element in iq.pubsub.configure.elements():
            if element.name != 'x' or element.uri != data_form.NS_X_DATA:
                continue

            form_kind = element.getAttribute("type")
            if form_kind == "cancel":
                return None
            elif form_kind != "submit":
                continue

            options = self._get_form_options(element)

            # A submitted form without FORM_TYPE is skipped rather than
            # raising KeyError, so it ends in BadRequest below instead of
            # an internal server error.
            if options.get("FORM_TYPE") == NS_PUBSUB + "#node_config":
                del options["FORM_TYPE"]
                return self.backend.set_node_configuration(node_id,
                                                           options,
                                                           requestor)

        raise BadRequest

    def _get_form_options(self, form):
        """
        Return a dict mapping each field's var attribute to its value.

        @raise BadRequest: when a field lacks the var attribute.
        """
        options = {}

        for element in form.elements():
            if element.name == 'field' and element.uri == data_form.NS_X_DATA:
                try:
                    options[element["var"]] = str(element.value)
                except (KeyError, AttributeError):
                    raise BadRequest

        return options

components.registerAdapter(ComponentServiceFromNodeCreationService,
                           backend.INodeCreationService,
                           component.IService)


class ComponentServiceFromAffiliationsService(Service):
    """
    Handles requests for the sender's affiliations.
    """

    def componentConnected(self, xmlstream):
        xmlstream.addObserver(PUBSUB_AFFILIATIONS, self.onAffiliations)

    def onAffiliations(self, iq):
        self.handler_wrapper(self._onAffiliations, iq)

    def _onAffiliations(self, iq):
        d = self.backend.get_affiliations(jid.JID(iq["from"]).userhostJID())
        d.addCallback(self._return_affiliations_response, iq)
        return d

    def _return_affiliations_response(self, result, iq):
        """
        Build the reply payload listing one entity per entry in C{result}.

        @param result: iterable of mappings with the keys "node", "jid",
                       "affiliation" and "subscription".
        """
        reply = domish.Element((NS_PUBSUB, 'pubsub'))
        affiliations = reply.addElement('affiliations')
        for r in result:
            entity = affiliations.addElement('entity')
            entity['node'] = r['node']
            entity['jid'] = r['jid'].full()
            entity['affiliation'] = r['affiliation'] or 'none'
            entity['subscription'] = r['subscription'] or 'none'
        return [reply]

components.registerAdapter(ComponentServiceFromAffiliationsService,
                           backend.IAffiliationsService,
                           component.IService)


class ComponentServiceFromItemRetrievalService(Service):
    """
    Handles item retrieval requests.
    """

    def componentConnected(self, xmlstream):
        xmlstream.addObserver(PUBSUB_ITEMS, self.onItems)

    def get_disco_info(self, node):
        info = []

        if not node:
            info.append(disco.Feature(NS_PUBSUB + "#retrieve-items"))

        return defer.succeed(info)

    def onItems(self, iq):
        self.handler_wrapper(self._onItems, iq)

    def _onItems(self, iq):
        """
        Retrieve items from the named node.

        @raise BadRequest: when the node attribute is missing, max_items
                           is not an integer, or an <item/> has no id.
        """
        try:
            node_id = iq.pubsub.items["node"]
        except KeyError:
            raise BadRequest

        max_items = iq.pubsub.items.getAttribute('max_items')

        if max_items:
            try:
                max_items = int(max_items)
            except ValueError:
                raise BadRequest

        item_ids = []
        # elements() skips character data, which has no name/uri
        # attributes and would otherwise break the test below.
        for child in iq.pubsub.items.elements():
            if child.name == 'item' and child.uri == NS_PUBSUB:
                try:
                    item_ids.append(child["id"])
                except KeyError:
                    raise BadRequest

        d = self.backend.get_items(node_id, jid.JID(iq["from"]), max_items,
                                   item_ids)
        d.addCallback(self._return_items_response, node_id)
        return d

    def _return_items_response(self, result, node_id):
        """
        Build the reply payload; each entry of C{result} is added to the
        <items/> element as raw XML.
        """
        reply = domish.Element((NS_PUBSUB, 'pubsub'))
        items = reply.addElement('items')
        items["node"] = node_id
        for r in result:
            items.addRawXml(r)

        return [reply]

components.registerAdapter(ComponentServiceFromItemRetrievalService,
                           backend.IItemRetrievalService,
                           component.IService)


class ComponentServiceFromRetractionService(Service):
    """
    Handles item retraction and node purge requests.
    """

    def componentConnected(self, xmlstream):
        xmlstream.addObserver(PUBSUB_RETRACT, self.onRetract)
        xmlstream.addObserver(PUBSUB_PURGE, self.onPurge)

    def get_disco_info(self, node):
        info = []

        if not node:
            info.append(disco.Feature(NS_PUBSUB + "#delete-any"))

        return defer.succeed(info)

    def onRetract(self, iq):
        self.handler_wrapper(self._onRetract, iq)

    def _onRetract(self, iq):
        """
        Retract the items named by the <item/> children of the request.

        @raise BadRequest: when the node attribute is missing or an
                           <item/> has no id.
        """
        try:
            node = iq.pubsub.retract["node"]
        except KeyError:
            raise BadRequest

        item_ids = []
        for child in iq.pubsub.retract.elements():
            if child.name == 'item':
                try:
                    item_ids.append(child["id"])
                except KeyError:
                    raise BadRequest

        return self.backend.retract_item(node, item_ids,
                                         jid.JID(iq["from"]).userhostJID())

    def onPurge(self, iq):
        self.handler_wrapper(self._onPurge, iq)

    def _onPurge(self, iq):
        """
        Purge the named node.

        @raise BadRequest: when the node attribute is missing.
        """
        try:
            node = iq.pubsub.purge["node"]
        except KeyError:
            raise BadRequest

        return self.backend.purge_node(node,
                                       jid.JID(iq["from"]).userhostJID())

components.registerAdapter(ComponentServiceFromRetractionService,
                           backend.IRetractionService,
                           component.IService)


class ComponentServiceFromNodeDeletionService(Service):
    """
    Handles node deletion requests and notifies subscribers of deletion.
    """

    def __init__(self, backend):
        Service.__init__(self, backend)
        # Not read anywhere in this module; kept for compatibility.
        self.subscribers = []

    def componentConnected(self, xmlstream):
        self.backend.register_pre_delete(self._pre_delete)
        xmlstream.addObserver(PUBSUB_DELETE, self.onDelete)

    def _pre_delete(self, node_id):
        """
        Pre-delete hook: collect the node's subscribers while they can
        still be looked up.
        """
        d = self.backend.get_subscribers(node_id)
        d.addCallback(self._return_deferreds, node_id)
        return d

    def _return_deferreds(self, subscribers, node_id):
        """
        Return a list holding one unfired Deferred that sends the delete
        notifications when fired.

        NOTE(review): presumably the backend fires the returned deferreds
        once the node has actually been deleted -- confirm against the
        backend implementation.
        """
        d = defer.Deferred()
        d.addCallback(self._notify, subscribers, node_id)
        return [d]

    def _notify(self, result, subscribers, node_id):
        """
        Send a delete event for C{node_id} to every subscriber.

        The same message element is reused, with only the "to" address
        changed per recipient.
        """
        message = domish.Element((NS_COMPONENT, "message"))
        message["from"] = self.parent.jabberId
        event = message.addElement((NS_PUBSUB_EVENT, "event"))
        event.addElement("delete")["node"] = node_id

        # NOTE(review): subscribers are assigned as-is here, whereas
        # notification recipients elsewhere go through .full() -- confirm
        # what backend.get_subscribers() yields.
        for subscriber in subscribers:
            message["to"] = subscriber
            self.send(message)

    def onDelete(self, iq):
        self.handler_wrapper(self._onDelete, iq)

    def _onDelete(self, iq):
        """
        Delete the named node.

        @raise BadRequest: when the node attribute is missing.
        """
        try:
            node = iq.pubsub.delete["node"]
        except KeyError:
            raise BadRequest

        return self.backend.delete_node(node,
                                        jid.JID(iq["from"]).userhostJID())

components.registerAdapter(ComponentServiceFromNodeDeletionService,
                           backend.INodeDeletionService,
                           component.IService)