# HG changeset patch # User Goffi # Date 1439669607 -7200 # Node ID 389357fd79cee4b046162f722d12f7142d917622 # Parent 227856b13d7aed9554595abd93934633ab675c31 plugin XEP-0060: use of new RTDeferredSession to subscribe many nodes at once + subscribeToMany can now subscribe on separate services diff -r 227856b13d7a -r 389357fd79ce src/plugins/plugin_xep_0060.py --- a/src/plugins/plugin_xep_0060.py Sat Aug 15 22:13:27 2015 +0200 +++ b/src/plugins/plugin_xep_0060.py Sat Aug 15 22:13:27 2015 +0200 @@ -22,11 +22,18 @@ from sat.core.log import getLogger log = getLogger(__name__) -from wokkel import disco, pubsub, rsm +from sat.tools import sat_defer + +from twisted.words.protocols.jabber import jid +from wokkel import disco +from wokkel import pubsub +from wokkel import rsm from zope.interface import implements # from twisted.internet import defer import uuid +UNSPECIFIED = "unspecified error" + PLUGIN_INFO = { "name": "Publish-Subscribe", @@ -57,6 +64,9 @@ log.info(_(u"PubSub plugin initialization")) self.host = host self.managedNodes = [] + self.rt_sessions = sat_defer.RTDeferredSessions() + host.bridge.addMethod("subscribeToMany", ".plugin", in_sign='a(ss)sa{ss}s', out_sign='s', method=self._subscribeToMany) + host.bridge.addMethod("getSubscribeRTResult", ".plugin", in_sign='ss', out_sign='(ua(sss))', method=self._manySubscribeRTResult, async=True) def getHandler(self, profile): client = self.host.getClient(profile) @@ -185,32 +195,62 @@ client = self.host.getClient(profile_key) return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options) - # @defer.inlineCallbacks - # def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): - # """Massively subscribe to many nodes. - - # @param service (JID): target service. - # @param nodeIdentifiers (list): the list of node identifiers to subscribe to. - # @param sub_id (str): optional subscription identifier. - # @param options (list): optional list of subscription options - # @param profile_key (str): %(doc_profile_key)s - # @return: list of Deferred instances. - # """ - # client = self.host.getClient(profile_key) - # found_nodes = yield self.listNodes(service, profile=client.profile) - # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) - # d_list = [] - # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): - # if nodeIdentifier not in found_nodes: - # log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) - # continue # avoid sat-pubsub "SubscriptionExists" error - # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) - # defer.returnValue(d_list) - def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) return client.pubsub_client.subscriptions(service, nodeIdentifier) + ## methods to manage several stanzas/jids at once ## + + # subscribe # + + def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): + """Get real-time results for subcribeToManu session + + @param session_id: id of the real-time deferred session + @param return (tuple): (remaining, results) where: + - remaining is the number of still expected results + - results is a list of tuple(unicode, unicode, bool, unicode) with: + - service: pubsub service + - and node: pubsub node + - failure(unicode): empty string in case of success, error message else + @param profile_key: %(doc_profile_key)s + """ + profile = self.host.getClient(profile_key).profile + d = self.rt_sessions.getResults(session_id, on_success=lambda result:'', on_error=lambda failure:unicode(failure.value), profile=profile) + # we need to convert jid.JID to unicode with full() to serialise it for the bridge + d.addCallback(lambda ret: (ret[0], [(service.full(), node, '' if success else failure or UNSPECIFIED) + for (service, node), (success, failure) in ret[1].iteritems()])) + return d + + def _subscribeToMany(self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE): + return self.subscribeToMany([(jid.JID(service), unicode(node)) for service, node in node_data], jid.JID(subscriber), options, profile_key) + + def subscribeToMany(self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE): + """Subscribe to several nodes at once. + + @param node_data (iterable[tuple]): iterable of tuple (service, node) where: + - service (jid.JID) is the pubsub service + - node (unicode) is the node to subscribe to + @param subscriber (jid.JID): optional subscription identifier. + @param options (dict): subscription options + @param profile_key (str): %(doc_profile_key)s + @return (str): RT Deferred session id + """ + client = self.host.getClient(profile_key) + deferreds = {} + for service, node in node_data: + deferreds[(service, node)] = client.pubsub_client.subscribe(service, node, subscriber, options=options) + return self.rt_sessions.newSession(deferreds, client.profile) + # found_nodes = yield self.listNodes(service, profile=client.profile) + # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) + # d_list = [] + # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): + # if nodeIdentifier not in found_nodes: + # log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) + # continue # avoid sat-pubsub "SubscriptionExists" error + # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) + # defer.returnValue(d_list) + class SatPubSubClient(rsm.PubSubClient): implements(disco.IDisco)