# HG changeset patch # User souliane # Date 1413656592 -7200 # Node ID f584fbda4773e75f4369a0ca5ef51d3a869de3c0 # Parent e56dfe0378a132490937bde5042c3a1524e6e0ab plugin XEP-0060: fixes listing the nodes on the pubsub service diff -r e56dfe0378a1 -r f584fbda4773 src/plugins/plugin_xep_0060.py --- a/src/plugins/plugin_xep_0060.py Sat Oct 18 13:02:41 2014 +0200 +++ b/src/plugins/plugin_xep_0060.py Sat Oct 18 20:23:12 2014 +0200 @@ -60,7 +60,7 @@ self.host = host self.managedNodes = [] self.clients = {} - self.node_cache = Sessions(timeout=30, resettable_timeout=False) + self.node_cache = Sessions(timeout=60, resettable_timeout=False) def getHandler(self, profile): self.clients[profile] = SatPubSubClient(self.host, self) @@ -93,48 +93,64 @@ raise Exception(err_mess) return profile, client - @defer.inlineCallbacks + def _getDeferredNodeCache(self, session_id, init, profile): + """Manage a node cache with deferred initialisation and concurrent access. + + @param session_id (string): node cache session ID + @param init (Deferred): deferred list of strings to initialise the cache. + @param profile (str): %(doc_profile)s + @return: Deferred list[str] + """ + if session_id in self.node_cache: + cache = self.node_cache.profileGet(session_id, profile) + if cache['nodes'] is None: + # init is still running + d = defer.Deferred() + cache['waiting'].append(d) + return d + return defer.succeed(cache['nodes']) + + cache = {'init': init, 'waiting': [], 'nodes': None} + self.node_cache.newSession(cache, session_id=session_id, profile=profile) + + def cb(nodes): + cache['nodes'] = nodes + for d in cache['waiting']: + d.callback(nodes) + return nodes + + return init.addCallback(cb) + def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): - """Retrieve the name of the nodes that are accesible on the target service. + """Retrieve the name of the nodes that are accessible on the target service. @param service (JID): target service @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) @param profile (str): %(doc_profile)s - @return: list[str] + @return: Deferred list[str] """ - session_id = profile + "@found@" + service.userhost() - # FIXME: this can be called from self.subscribeToMany before the cache has been built by self.getItemsFromMany - if session_id in self.node_cache: - cache = self.node_cache.profileGet(session_id, profile) - else: - # FIXME: we arrive here while the cache is already being built... - result = yield self.getDiscoItems(service, nodeIdentifier, profile_key=profile) - node_names = [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')] - dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile) - defer.returnValue(cache) + session_id = profile + '@found@' + service.userhost() + d = self.getDiscoItems(service, nodeIdentifier, profile_key=profile) + d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]) + return self._getDeferredNodeCache(session_id, d, profile) - @defer.inlineCallbacks - def listSubscribedNodes(self, service, nodeIdentifier='', filter='subscribed', profile=C.PROF_KEY_NONE): + def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE): """Retrieve the name of the nodes to which the profile is subscribed on the target service. @param service (JID): target service @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) - @param filter (str): filter the result according to the given subscription type: + @param filter_ (str): filter the result according to the given subscription type: - None: do not filter - 'pending': subscription has been approved yet by the node owner - 'unconfigured': subscription options have not been configured yet - 'subscribed': subscription is complete @param profile (str): %(doc_profile)s - @return: list[str] + @return: Deferred list[str] """ - session_id = profile + "@subscriptions@" + service.userhost() - if session_id in self.node_cache: - cache = self.node_cache.profileGet(session_id, profile) - else: - subs = yield self.subscriptions(service, nodeIdentifier, profile_key=profile) - node_names = [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter] - dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile) - defer.returnValue(cache) + session_id = profile + '@subscriptions@' + service.userhost() + d = self.subscriptions(service, nodeIdentifier, profile_key=profile) + d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) + return self._getDeferredNodeCache(session_id, d, profile) def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): profile, client = self.__getClientNProfile(profile_key, 'publish item')