Mercurial > libervia-backend
view src/plugins/plugin_xep_0060.py @ 1304:1a61b18703c4 frontends_multi_profiles
quick frontend (quick widgets): class' __name__ method is used for classes_map hash because the use of class directly was causing bugs with pyjamas (difficult to find, several MicroblogPanel instances were added only once in Libervia's TabPanel, hash method seemed buggy)
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 06 Feb 2015 19:05:51 +0100 |
parents | faa1129559b8 |
children | f71a0fc26886 |
line wrap: on
line source
#!/usr/bin/python
# -*- coding: utf-8 -*-

# SAT plugin for Publish-Subscribe (xep-0060)
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core.log import getLogger
log = getLogger(__name__)
from sat.memory.memory import Sessions

from wokkel.compat import IQ
from wokkel import disco, pubsub
from zope.interface import implements
from twisted.internet import defer


PLUGIN_INFO = {
    "name": "Publish-Subscribe",
    "import_name": "XEP-0060",
    "type": "XEP",
    "protocols": ["XEP-0060"],
    "dependencies": [],
    "recommendations": ["XEP-0059"],
    "main": "XEP_0060",
    "handler": "yes",
    "description": _("""Implementation of PubSub Protocol""")
}


class XEP_0060(object):
    """Plugin entry point: profile-aware wrappers around the PubSub client.

    One SatPubSubClient is created per profile by getHandler; most public
    methods resolve the profile key and delegate to that client.
    """

    OPT_ACCESS_MODEL = 'pubsub#access_model'
    OPT_PERSIST_ITEMS = 'pubsub#persist_items'
    OPT_MAX_ITEMS = 'pubsub#max_items'
    OPT_DELIVER_PAYLOADS = 'pubsub#deliver_payloads'
    OPT_SEND_ITEM_SUBSCRIBE = 'pubsub#send_item_subscribe'
    OPT_NODE_TYPE = 'pubsub#node_type'
    OPT_SUBSCRIPTION_TYPE = 'pubsub#subscription_type'
    OPT_SUBSCRIPTION_DEPTH = 'pubsub#subscription_depth'
    OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed'
    OPT_PUBLISH_MODEL = 'pubsub#publish_model'

    def __init__(self, host):
        """
        @param host: the SAT instance hosting this plugin
        """
        log.info(_(u"PubSub plugin initialization"))
        self.host = host
        self.managedNodes = []  # list of (node_name, callback) tuples
        self.clients = {}  # profile name => SatPubSubClient
        # cache of node lists, filled and read by _getDeferredNodeCache
        self.node_cache = Sessions(timeout=60, resettable_timeout=False)

    def getHandler(self, profile):
        """Create, register and return the PubSub client of a profile.

        @param profile (str): profile name
        @return: the new SatPubSubClient instance
        """
        self.clients[profile] = SatPubSubClient(self.host, self)
        return self.clients[profile]

    def addManagedNode(self, node_name, callback):
        """Register a callback for the items events of a node.

        @param node_name (str): identifier of the node to watch
        @param callback: method called with (event, profile) when items are
            received on this node (see SatPubSubClient.itemsReceived)
        """
        self.managedNodes.append((node_name, callback))

    def __getClientNProfile(self, profile_key, action='do pubsub'):
        """Return a tuple of (profile, client).

        @param profile_key: as usual :)
        @param action: text of action to show in case of error
        @return: tuple (profile name, SatPubSubClient of this profile)
        @raise Exception: the profile key is unknown, or no client has been
            registered for the profile
        """
        profile = self.host.memory.getProfileName(profile_key)
        if not profile:
            err_mess = _('Trying to %(action)s with an unknown profile key [%(profile_key)s]') % {
                'action': action,
                'profile_key': profile_key}
            log.error(err_mess)
            raise Exception(err_mess)
        try:
            client = self.clients[profile]
        except KeyError:
            err_mess = _('INTERNAL ERROR: no handler for required profile')
            log.error(err_mess)
            raise Exception(err_mess)
        return profile, client

    def _getDeferredNodeCache(self, session_id, init, profile):
        """Manage a node cache with deferred initialisation and concurrent access.

        The first caller for a given session triggers the initialisation;
        callers arriving while it is still running are queued and fired with
        the same result. If the initialisation fails, the queued callers
        receive the failure and the next call retries.

        @param session_id (string): node cache session ID
        @param init (Deferred or callable): Deferred list of strings to
            initialise the cache, or a callable without argument returning
            such a Deferred. A callable is only invoked when the cache really
            has to be (re)initialised, which avoids a useless request on
            cache hit.
        @param profile (str): %(doc_profile)s
        @return: Deferred list[str]
        """
        cache = None
        if session_id in self.node_cache:
            cache = self.node_cache.profileGet(session_id, profile)
            if cache['nodes'] is not None:
                return defer.succeed(cache['nodes'])
            if not cache.get('failed'):
                # init is still running
                waiter = defer.Deferred()
                cache['waiting'].append(waiter)
                return waiter
            # the previous initialisation failed: fall through and retry

        if callable(init):
            # called before touching the cache, so that a synchronous
            # exception doesn't leave a half-initialised entry behind
            init = init()

        if cache is None:
            cache = {'init': init, 'waiting': [], 'nodes': None, 'failed': False}
            self.node_cache.newSession(cache, session_id=session_id, profile=profile)
        else:
            cache['init'] = init
            cache['failed'] = False

        def cb(nodes):
            cache['nodes'] = nodes
            waiting, cache['waiting'] = cache['waiting'], []
            for waiter in waiting:
                waiter.callback(nodes)
            return nodes

        def eb(failure):
            # without this, queued callers would never be fired and later
            # callers would keep queueing on a dead entry
            cache['failed'] = True
            waiting, cache['waiting'] = cache['waiting'], []
            for waiter in waiting:
                waiter.errback(failure)
            return failure

        return init.addCallbacks(cb, eb)

    def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
        """Retrieve the name of the nodes that are accessible on the target service.

        @param service (JID): target service
        @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes)
        @param profile (str): %(doc_profile)s
        @return: Deferred list[str]
        """
        session_id = profile + '@found@' + service.userhost()
        if nodeIdentifier:
            # children of different parent nodes must not share a cache entry
            session_id += '@node@' + nodeIdentifier

        def request():
            d = self.host.getDiscoItems(service, nodeIdentifier, profile_key=profile)
            d.addCallback(lambda result: [item.getAttribute('node')
                                          for item in result.toElement().children
                                          if item.hasAttribute('node')])
            return d

        return self._getDeferredNodeCache(session_id, request, profile)

    def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE):
        """Retrieve the name of the nodes to which the profile is subscribed on the target service.

        @param service (JID): target service
        @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
        @param filter_ (str): filter the result according to the given subscription type:
            - None: do not filter
            - 'pending': subscription has not been approved yet by the node owner
            - 'unconfigured': subscription options have not been configured yet
            - 'subscribed': subscription is complete
        @param profile (str): %(doc_profile)s
        @return: Deferred list[str]
        """
        session_id = profile + '@subscriptions@' + service.userhost()
        # results depend on the node and on the filter, so the cache key must
        # too (the key for the default arguments is left unchanged)
        if nodeIdentifier:
            session_id += '@node@' + nodeIdentifier
        if filter_ != 'subscribed':
            session_id += '@filter@%s' % (filter_,)

        def request():
            d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
            d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs
                                        if filter_ is None
                                        or sub.getAttribute('subscription') == filter_])
            return d

        return self._getDeferredNodeCache(session_id, request, profile)

    def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE):
        """Publish items to a node, using the profile's JID as sender.

        @param service (JID): target service
        @param nodeIdentifier (str): node to publish to
        @param items: items to publish, passed as is to the client
        @param profile_key (str): %(doc_profile_key)s
        @return: the result of the client's publish method
        """
        profile, client = self.__getClientNProfile(profile_key, 'publish item')
        return client.publish(service, nodeIdentifier, items, client.parent.jid)

    def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, profile_key=C.PROF_KEY_NONE):
        """Retrieve the items of a node, using the profile's JID as sender.

        @param service (JID): target service
        @param node (str): node identifier
        @param max_items (int): optional limit on the number of retrieved items
        @param item_ids (list[str]): identifiers of the items to be retrieved
        @param sub_id (str): optional subscription identifier
        @param profile_key (str): %(doc_profile_key)s
        @return: Deferred list of item elements (see SatPubSubClient.items)
        """
        profile, client = self.__getClientNProfile(profile_key, 'get items')
        return client.items(service, node, max_items, item_ids, sub_id, client.parent.jid)

    @defer.inlineCallbacks
    def getItemsFromMany(self, service, data, max_items=None, item_ids=None, sub_id=None, profile_key=C.PROF_KEY_NONE):
        """Massively retrieve pubsub items from many nodes.

        @param service (JID): target service.
        @param data (dict): dictionary binding some arbitrary keys to the node identifiers.
        @param max_items (int): optional limit on the number of retrieved items *per node*.
        @param item_ids (list[str]): identifiers of the items to be retrieved (should not be used).
        @param sub_id (str): optional subscription identifier.
        @param profile_key (str): %(doc_profile_key)s
        @return: dict binding a subset of the keys of data to Deferred instances.
        """
        profile, client = self.__getClientNProfile(profile_key, 'get items')
        found_nodes = yield self.listNodes(service, profile=profile)
        found_nodes = set(found_nodes)  # constant time membership test in the loop
        d_dict = {}
        for publisher, node in data.items():
            if node not in found_nodes:
                log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
                continue  # avoid pubsub "item-not-found" error
            d_dict[publisher] = client.items(service, node, max_items, item_ids, sub_id, client.parent.jid)
        defer.returnValue(d_dict)

    def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
        """Delegate to the getOptions method of the profile's client.

        @param profile_key (str): %(doc_profile_key)s
        """
        profile, client = self.__getClientNProfile(profile_key, 'get options')
        return client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier)

    def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
        """Delegate to the setOptions method of the profile's client.

        @param profile_key (str): %(doc_profile_key)s
        """
        profile, client = self.__getClientNProfile(profile_key, 'set options')
        return client.setOptions(service, nodeIdentifier, subscriber, options, subscriptionIdentifier)

    def createNode(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
        """Delegate to the createNode method of the profile's client.

        @param profile_key (str): %(doc_profile_key)s
        """
        profile, client = self.__getClientNProfile(profile_key, 'create node')
        return client.createNode(service, nodeIdentifier, options)

    def deleteNode(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
        """Delegate to the deleteNode method of the profile's client.

        @param profile_key (str): %(doc_profile_key)s
        """
        profile, client = self.__getClientNProfile(profile_key, 'delete node')
        return client.deleteNode(service, nodeIdentifier)

    def retractItems(self, service, nodeIdentifier, itemIdentifiers, profile_key=C.PROF_KEY_NONE):
        """Delegate to the retractItems method of the profile's client.

        @param profile_key (str): %(doc_profile_key)s
        """
        profile, client = self.__getClientNProfile(profile_key, 'retract items')
        return client.retractItems(service, nodeIdentifier, itemIdentifiers)

    def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
        """Subscribe to a node.

        @param service (JID): target service
        @param nodeIdentifier (str): node to subscribe to
        @param sub_jid (JID): subscriber; defaults to the bare JID of the profile
        @param options: optional subscription options
        @param profile_key (str): %(doc_profile_key)s
        @return: the result of the client's subscribe method
        """
        profile, client = self.__getClientNProfile(profile_key, 'subscribe node')
        return client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options)

    @defer.inlineCallbacks
    def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
        """Massively subscribe to many nodes.

        Nodes which are already subscribed, or which are not found on the
        service, are skipped.

        @param service (JID): target service.
        @param nodeIdentifiers (list): the list of node identifiers to subscribe to.
        @param sub_jid (JID): subscriber; defaults to the bare JID of the profile.
        @param options (list): optional list of subscription options
        @param profile_key (str): %(doc_profile_key)s
        @return: list of Deferred instances.
        """
        profile, client = self.__getClientNProfile(profile_key, 'subscribe nodes')
        found_nodes = yield self.listNodes(service, profile=profile)
        found_nodes = set(found_nodes)  # constant time membership test in the loop
        subscribed_nodes = yield self.listSubscribedNodes(service, profile=profile)
        d_list = []
        for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
            if nodeIdentifier not in found_nodes:
                log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
                continue  # avoid sat-pubsub "SubscriptionExists" error
            d_list.append(client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options))
        defer.returnValue(d_list)

    def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE):
        """Retrieve the subscriptions of the profile on a service.

        @param service (JID): target service
        @param nodeIdentifier (str): node identifier (leave empty to retrieve all subscriptions)
        @param profile_key (str): %(doc_profile_key)s
        @return: Deferred list of subscription elements
            (see SatPubSubClient.subscriptions)
        """
        profile, client = self.__getClientNProfile(profile_key, 'retrieve subscriptions')
        return client.subscriptions(service, nodeIdentifier)


class SatPubSubClient(pubsub.PubSubClient):
    """Wokkel PubSub client with SàT specific additions (RSM, retract, triggers)."""
    implements(disco.IDisco)

    def __init__(self, host, parent_plugin):
        """
        @param host: the SAT instance
        @param parent_plugin (XEP_0060): the plugin instance owning this client
        """
        self.host = host
        self.parent_plugin = parent_plugin
        pubsub.PubSubClient.__init__(self)

    def connectionInitialized(self):
        pubsub.PubSubClient.connectionInitialized(self)

    # FIXME: we have to temporary override this method here just
    # to set the attributes itemIdentifiers which is not used
    # in pubsub.PubSubClient.items + use the XEP-0059
    def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None,
              subscriptionIdentifier=None, sender=None):
        """
        Retrieve previously published items from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}

        @param maxItems: Optional limit on the number of retrieved items.
        @type maxItems: C{int}

        @param itemIdentifiers: Identifiers of the items to be retrieved.
        @type itemIdentifiers: C{set}

        @param subscriptionIdentifier: Optional subscription identifier. In
            case the node has been subscribed to multiple times, this narrows
            the results to the specific subscription.
        @type subscriptionIdentifier: C{unicode}

        @param sender: Optional sender of the request.
        @type sender: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @return: Deferred firing with the list of C{item} elements found in
            the response.
        @rtype: L{defer.Deferred}
        """
        # TODO: add method attributes for RSM: before, after, index
        request = PubSubRequest('items', self.host, {'limit': maxItems} if maxItems else {})
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        if maxItems:
            request.maxItems = str(int(maxItems))
        request.subscriptionIdentifier = subscriptionIdentifier
        request.sender = sender
        request.itemIdentifiers = itemIdentifiers  # XXX: this line has been added

        def cb(iq):
            # TODO: return (items, self.host.plugins['XEP-0059'].extractMetadata(iq)) ??
            return [element for element in iq.pubsub.items.elements()
                    if element.uri == pubsub.NS_PUBSUB and element.name == 'item']

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d

    # FIXME: this should be done in wokkel
    def retractItems(self, service, nodeIdentifier, itemIdentifiers, sender=None):
        """
        Retract items from a publish subscribe node.

        @param service: The publish subscribe service to delete the node from.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}

        @param itemIdentifiers: Identifiers of the items to be retracted.
        @type itemIdentifiers: C{set}

        @param sender: Optional sender of the request.
        @type sender: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('retract')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.itemIdentifiers = itemIdentifiers
        request.sender = sender
        return request.send(self.xmlstream)

    def itemsReceived(self, event):
        """Dispatch an items event to the callbacks registered for its node.

        Nothing is dispatched if the "PubSubItemsReceived" trigger point
        returns a false value.
        """
        if not self.host.trigger.point("PubSubItemsReceived", event, self.parent.profile):
            return
        for node_name, callback in self.parent_plugin.managedNodes:
            if event.nodeIdentifier == node_name:
                callback(event, self.parent.profile)

    def deleteReceived(self, event):
        # TODO: manage delete event
        log.debug(_(u"Publish node deleted"))

    # def purgeReceived(self, event):

    @defer.inlineCallbacks
    def subscriptions(self, service, nodeIdentifier, sender=None):
        """Return the list of subscriptions to the given service and node.

        @param service: The publish subscribe service to retrieve the subscriptions from.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions).
        @type nodeIdentifier: C{unicode}

        @param sender: Optional sender of the request.
        @type sender: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @return: Deferred firing with the list of C{subscription} elements
            found in the response.
        """
        request = PubSubRequest('subscriptions')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender
        iq = yield request.send(self.xmlstream)
        defer.returnValue([sub for sub in iq.pubsub.subscriptions.elements()
                           if (sub.uri == pubsub.NS_PUBSUB and sub.name == 'subscription')])

    def getDiscoInfo(self, requestor, service, nodeIdentifier=''):
        """Return the disco info collected through the "PubSub Disco Info" trigger point."""
        disco_info = []
        self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile)
        return disco_info

    def getDiscoItems(self, requestor, service, nodeIdentifier=''):
        """Return an empty list of disco items."""
        return []


class PubSubRequest(pubsub.PubSubRequest):
    """Wokkel PubSub request with optional RSM (XEP-0059) paging."""

    def __init__(self, verb=None, host=None, page_attrs=None):
        """
        @param verb (str): the type of pubsub request
        @param host (SAT): the SAT instance
        @param page_attrs (dict): options for RSM paging:
            - limit (int): the maximum number of items in the page
            - index (int): the starting index of the requested page
            - after (str, int): the element immediately preceding the page
            - before (str, int): the element immediately following the page
        """
        self.verb = verb
        self.host = host
        self.page_attrs = page_attrs

    # FIXME: the redefinition of this wokkel method is the easiest way I found
    # to handle RSM. We should find a proper solution, maybe just add in wokkel an
    # empty method postProcessMessage, call it before sending and overwrite it here
    # instead of overwriting the whole send method.
    def send(self, xs):
        """
        Send this request to its recipient.

        This renders all of the relevant parameters for this specific
        requests into an L{IQ}, and invoke its C{send} method.
        This returns a deferred that fires upon reception of a response. See
        L{IQ} for details.

        @param xs: The XML stream to send the request on.
        @type xs: L{twisted.words.protocols.jabber.xmlstream.XmlStream}
        @rtype: L{defer.Deferred}.
        @raise NotImplementedError: the verb of this request is unknown.
        """
        try:
            (self.stanzaType,
             childURI,
             childName) = self._verbRequestMap[self.verb]
        except KeyError:
            raise NotImplementedError()

        iq = IQ(xs, self.stanzaType)
        iq.addElement((childURI, 'pubsub'))
        verbElement = iq.pubsub.addElement(childName)

        if self.sender:
            iq['from'] = self.sender.full()
        if self.recipient:
            iq['to'] = self.recipient.full()

        for parameter in self._parameters[self.verb]:
            getattr(self, '_render_%s' % parameter)(verbElement)

        # These lines have been added for RSM
        if self.host and 'XEP-0059' in self.host.plugins and self.page_attrs:
            # work on a copy: the dict given by the caller must not be modified
            page_attrs = dict(self.page_attrs, stanza=iq)
            self.host.plugins['XEP-0059'].requestPage(**page_attrs)

        return iq.send()