diff src/plugins/plugin_xep_0060.py @ 1449:389357fd79ce

plugin XEP-0060: use of new RTDeferredSession to subscribe many nodes at once + subscribeToMany can now subscribe on separate services
author Goffi <goffi@goffi.org>
date Sat, 15 Aug 2015 22:13:27 +0200
parents e8c8e467964b
children 9b88b19b1ca8
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0060.py	Sat Aug 15 22:13:27 2015 +0200
+++ b/src/plugins/plugin_xep_0060.py	Sat Aug 15 22:13:27 2015 +0200
@@ -22,11 +22,18 @@
 from sat.core.log import getLogger
 log = getLogger(__name__)
 
-from wokkel import disco, pubsub, rsm
+from sat.tools import sat_defer
+
+from twisted.words.protocols.jabber import jid
+from wokkel import disco
+from wokkel import pubsub
+from wokkel import rsm
 from zope.interface import implements
 # from twisted.internet import defer
 import uuid
 
+UNSPECIFIED = "unspecified error"
+
 
 PLUGIN_INFO = {
     "name": "Publish-Subscribe",
@@ -57,6 +64,9 @@
         log.info(_(u"PubSub plugin initialization"))
         self.host = host
         self.managedNodes = []
+        self.rt_sessions = sat_defer.RTDeferredSessions()
+        host.bridge.addMethod("subscribeToMany", ".plugin", in_sign='a(ss)sa{ss}s', out_sign='s', method=self._subscribeToMany)
+        host.bridge.addMethod("getSubscribeRTResult", ".plugin", in_sign='ss', out_sign='(ua(sss))', method=self._manySubscribeRTResult, async=True)
 
     def getHandler(self, profile):
         client = self.host.getClient(profile)
@@ -185,32 +195,62 @@
         client = self.host.getClient(profile_key)
         return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)
 
-    # @defer.inlineCallbacks
-    # def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
-    #     """Massively subscribe to many nodes.
-
-    #     @param service (JID): target service.
-    #     @param nodeIdentifiers (list): the list of node identifiers to subscribe to.
-    #     @param sub_id (str): optional subscription identifier.
-    #     @param options (list): optional list of subscription options
-    #     @param profile_key (str): %(doc_profile_key)s
-    #     @return: list of Deferred instances.
-    #     """
-    #     client = self.host.getClient(profile_key)
-    #     found_nodes = yield self.listNodes(service, profile=client.profile)
-    #     subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
-    #     d_list = []
-    #     for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
-    #         if nodeIdentifier not in found_nodes:
-    #             log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
-    #             continue  # avoid sat-pubsub "SubscriptionExists" error
-    #         d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options))
-    #     defer.returnValue(d_list)
-
     def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE):
         client = self.host.getClient(profile_key)
         return client.pubsub_client.subscriptions(service, nodeIdentifier)
 
+    ## methods to manage several stanzas/jids at once ##
+
+    # subscribe #
+
+    def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
+        """Get real-time results for subcribeToManu session
+
+        @param session_id: id of the real-time deferred session
+        @param return (tuple): (remaining, results) where:
+            - remaining is the number of still expected results
+            - results is a list of tuple(unicode, unicode, bool, unicode) with:
+                - service: pubsub service
+                - and node: pubsub node
+                - failure(unicode): empty string in case of success, error message else
+        @param profile_key: %(doc_profile_key)s
+        """
+        profile = self.host.getClient(profile_key).profile
+        d = self.rt_sessions.getResults(session_id, on_success=lambda result:'', on_error=lambda failure:unicode(failure.value), profile=profile)
+        # we need to convert jid.JID to unicode with full() to serialise it for the bridge
+        d.addCallback(lambda ret: (ret[0], [(service.full(), node, '' if success else failure or UNSPECIFIED)
+                                            for (service, node), (success, failure) in ret[1].iteritems()]))
+        return d
+
+    def _subscribeToMany(self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE):
+        return self.subscribeToMany([(jid.JID(service), unicode(node)) for service, node in node_data], jid.JID(subscriber), options, profile_key)
+
+    def subscribeToMany(self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE):
+        """Subscribe to several nodes at once.
+
+        @param node_data (iterable[tuple]): iterable of tuple (service, node) where:
+            - service (jid.JID) is the pubsub service
+            - node (unicode) is the node to subscribe to
+        @param subscriber (jid.JID): optional subscription identifier.
+        @param options (dict): subscription options
+        @param profile_key (str): %(doc_profile_key)s
+        @return (str): RT Deferred session id
+        """
+        client = self.host.getClient(profile_key)
+        deferreds = {}
+        for service, node in node_data:
+            deferreds[(service, node)] = client.pubsub_client.subscribe(service, node, subscriber, options=options)
+        return self.rt_sessions.newSession(deferreds, client.profile)
+        # found_nodes = yield self.listNodes(service, profile=client.profile)
+        # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
+        # d_list = []
+        # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
+        #     if nodeIdentifier not in found_nodes:
+        #         log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
+        #         continue  # avoid sat-pubsub "SubscriptionExists" error
+        #     d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options))
+        # defer.returnValue(d_list)
+
 
 class SatPubSubClient(rsm.PubSubClient):
     implements(disco.IDisco)