diff src/plugins/plugin_xep_0060.py @ 1242:f584fbda4773

plugin XEP-0060: fixes listing the nodes on the pubsub service
author souliane <souliane@mailoo.org>
date Sat, 18 Oct 2014 20:23:12 +0200
parents 16484ebb695b
children b4a264915ea9
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0060.py	Sat Oct 18 13:02:41 2014 +0200
+++ b/src/plugins/plugin_xep_0060.py	Sat Oct 18 20:23:12 2014 +0200
@@ -60,7 +60,7 @@
         self.host = host
         self.managedNodes = []
         self.clients = {}
-        self.node_cache = Sessions(timeout=30, resettable_timeout=False)
+        self.node_cache = Sessions(timeout=60, resettable_timeout=False)
 
     def getHandler(self, profile):
         self.clients[profile] = SatPubSubClient(self.host, self)
@@ -93,48 +93,64 @@
             raise Exception(err_mess)
         return profile, client
 
-    @defer.inlineCallbacks
+    def _getDeferredNodeCache(self, session_id, init, profile):
+        """Manage a node cache with deferred initialisation and concurrent access.
+
+        @param session_id (string): node cache session ID
+        @param init (Deferred): deferred list of strings to initialise the cache.
+        @param profile (str): %(doc_profile)s
+        @return: Deferred list[str]
+        """
+        if session_id in self.node_cache:
+            cache = self.node_cache.profileGet(session_id, profile)
+            if cache['nodes'] is None:
+                # init is still running
+                d = defer.Deferred()
+                cache['waiting'].append(d)
+                return d
+            return defer.succeed(cache['nodes'])
+
+        cache = {'init': init, 'waiting': [], 'nodes': None}
+        self.node_cache.newSession(cache, session_id=session_id, profile=profile)
+
+        def cb(nodes):
+            cache['nodes'] = nodes
+            for d in cache['waiting']:
+                d.callback(nodes)
+            return nodes
+
+        return init.addCallback(cb)
+
     def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
-        """Retrieve the name of the nodes that are accesible on the target service.
+        """Retrieve the name of the nodes that are accessible on the target service.
 
         @param service (JID): target service
         @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes)
         @param profile (str): %(doc_profile)s
-        @return: list[str]
+        @return: Deferred list[str]
         """
-        session_id = profile + "@found@" + service.userhost()
-        # FIXME: this can be called from self.subscribeToMany before the cache has been built by self.getItemsFromMany
-        if session_id in self.node_cache:
-            cache = self.node_cache.profileGet(session_id, profile)
-        else:
-            # FIXME: we arrive here while the cache is already being built...
-            result = yield self.getDiscoItems(service, nodeIdentifier, profile_key=profile)
-            node_names = [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]
-            dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile)
-        defer.returnValue(cache)
+        session_id = profile + '@found@' + service.userhost()
+        d = self.getDiscoItems(service, nodeIdentifier, profile_key=profile)
+        d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')])
+        return self._getDeferredNodeCache(session_id, d, profile)
 
-    @defer.inlineCallbacks
-    def listSubscribedNodes(self, service, nodeIdentifier='', filter='subscribed', profile=C.PROF_KEY_NONE):
+    def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE):
         """Retrieve the name of the nodes to which the profile is subscribed on the target service.
 
         @param service (JID): target service
         @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
-        @param filter (str): filter the result according to the given subscription type:
+        @param filter_ (str): filter the result according to the given subscription type:
             - None: do not filter
             - 'pending': subscription has been approved yet by the node owner
             - 'unconfigured': subscription options have not been configured yet
             - 'subscribed': subscription is complete
         @param profile (str): %(doc_profile)s
-        @return: list[str]
+        @return: Deferred list[str]
         """
-        session_id = profile + "@subscriptions@" + service.userhost()
-        if session_id in self.node_cache:
-            cache = self.node_cache.profileGet(session_id, profile)
-        else:
-            subs = yield self.subscriptions(service, nodeIdentifier, profile_key=profile)
-            node_names = [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter]
-            dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile)
-        defer.returnValue(cache)
+        session_id = profile + '@subscriptions@' + service.userhost()
+        d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
+        d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
+        return self._getDeferredNodeCache(session_id, d, profile)
 
     def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE):
         profile, client = self.__getClientNProfile(profile_key, 'publish item')