changeset 1217:318eab3f93f8

plugin XEP-0060, groupblog: avoid unecessary pubsub errors while doing massive requests: - don't try to retrieve items from non accessible nodes - don't try to subscribe to non accessible or already subscribed nodes
author souliane <souliane@mailoo.org>
date Mon, 22 Sep 2014 20:49:13 +0200
parents 8ad37c3d58a9
children 3b1c5f723c4b
files src/plugins/plugin_misc_groupblog.py src/plugins/plugin_xep_0060.py
diffstat 2 files changed, 181 insertions(+), 100 deletions(-) [+]
line wrap: on
line diff
--- a/src/plugins/plugin_misc_groupblog.py	Mon Sep 22 20:34:29 2014 +0200
+++ b/src/plugins/plugin_misc_groupblog.py	Mon Sep 22 20:49:13 2014 +0200
@@ -481,6 +481,7 @@
     @defer.inlineCallbacks
     def _itemsConstruction(self, items, pub_jid, client):
         """ Transforms items to group blog data and manage comments node
+
         @param items: iterable of items
         @param pub_jid: jid of the publisher or None to use items data
         @param client: SatXMPPClient instance
@@ -494,15 +495,20 @@
             except AttributeError:
                 pass
             ret.append(gbdata)
-            # if there is a comments node, we subscribe to it
-            if "comments_node" in gbdata:
+            # every comments node must be subscribed, except if we are the publisher (we are already subscribed in this case)
+            if "comments_node" in gbdata and pub_jid.userhostJID() != client.jid.userhostJID():
                 try:
-                    # every comments node must be subscribed, except if we are the publisher (we are already subscribed in this case)
-                    if pub_jid.userhostJID() != client.jid.userhostJID():
-                        self.host.plugins["XEP-0060"].subscribe(jid.JID(gbdata["comments_service"]), gbdata["comments_node"],
-                                                                profile_key=client.profile)
+                    service = jid.JID(gbdata["comments_service"])
+                    node = gbdata["comments_node"]
                 except KeyError:
                     log.warning("Missing key for comments")
+                    continue
+                # TODO: see if it is really needed to check for not subscribing twice to the node
+                # It previously worked without this check, but the pubsub service logs were polluted
+                # or, if in debug mode, it made sat-pubsub very difficult to debug.
+                subscribed_nodes = yield self.host.plugins['XEP-0060'].listSubscribedNodes(service, profile=client.profile)
+                if node not in subscribed_nodes:  # avoid sat-pubsub "SubscriptionExists" error
+                    self.host.plugins["XEP-0060"].subscribe(service, node, profile_key=client.profile)
         defer.returnValue(ret)
 
     def __getGroupBlogs(self, pub_jid_s, max_items=10, item_ids=None, profile_key=C.PROF_KEY_NONE):
@@ -645,6 +651,7 @@
             publishers_jids = publishers
         return self.getMassiveLastGroupBlogs(publishers_type, publishers_jids, max_items, profile_key)
 
+    @defer.inlineCallbacks
     def getMassiveLastGroupBlogs(self, publishers_type, publishers, max_items=10, profile_key=C.PROF_KEY_NONE):
         """Get the last published microblogs for a list of groups or jids
         @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL")
@@ -652,58 +659,36 @@
         @param max_items: how many microblogs we want to get
         @param profile_key: profile key
         """
-
-        def sendResult(result):
-            """send result of DeferredList (dict of jid => microblogs) to the calling method"""
-
-            ret = {}
-
-            for (success, value) in result:
-                if success:
-                    source_jid, data = value
-                    ret[source_jid] = data
-
-            return ret
-
-        def initialised(result):
-            profile, client = result
-
-            if publishers_type == "ALL":
-                contacts = client.roster.getItems()
-                jids = [contact.jid.userhostJID() for contact in contacts]
-            elif publishers_type == "GROUP":
-                jids = []
-                for _group in publishers:
-                    jids.extend(client.roster.getJidsFromGroup(_group))
-            elif publishers_type == 'JID':
-                jids = publishers
-            else:
-                raise UnknownType
-
-            mblogs = []
-
-            for jid_ in jids:
-                d = self.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, self.getNodeName(jid_),
-                                                           max_items=max_items, profile_key=profile_key)
-                d.addCallback(self._itemsConstruction, jid_, client)
-                d.addCallback(lambda gbdata, source_jid: (source_jid, gbdata), jid_.full())
-
-                mblogs.append(d)
-            # consume the failure "StanzaError with condition u'item-not-found'"
-            # when the node doesn't exist (e.g that JID hasn't posted any message)
-            dlist = defer.DeferredList(mblogs, consumeErrors=True)
-            dlist.addCallback(sendResult)
-
-            return dlist
-
         #TODO: custom exception
         if publishers_type not in ["GROUP", "JID", "ALL"]:
             raise Exception("Bad call, unknown publishers_type")
         if publishers_type == "ALL" and publishers:
             raise Exception("Publishers list must be empty when getting microblogs for all contacts")
-        return self._initialise(profile_key).addCallback(initialised)
+        profile, client = yield self._initialise(profile_key)
         #TODO: we need to use the server corresponding the the host of the jid
 
+        if publishers_type == "ALL":
+            contacts = client.roster.getItems()
+            jids = [contact.jid.userhostJID() for contact in contacts]
+        elif publishers_type == "GROUP":
+            jids = []
+            for _group in publishers:
+                jids.extend(client.roster.getJidsFromGroup(_group))
+        elif publishers_type == 'JID':
+            jids = publishers
+        else:
+            raise UnknownType
+
+        data = {publisher: self.getNodeName(publisher) for publisher in jids}
+        d_dict = yield self.host.plugins["XEP-0060"].getItemsFromMany(client.item_access_pubsub, data, max_items=max_items, profile_key=profile)
+        for publisher, d in d_dict.items():
+            d.addCallback(self._itemsConstruction, publisher, client)
+            d.addCallback(lambda gbdata: (publisher.full(), gbdata))
+        # consume the failure "StanzaError with condition u'item-not-found'"
+        # when the node doesn't exist (e.g that JID hasn't posted any message)
+        result = yield defer.DeferredList(d_dict.values(), consumeErrors=True)
+        defer.returnValue({value[0]: value[1] for success, value in result if success})
+
     def subscribeGroupBlog(self, pub_jid, profile_key=C.PROF_KEY_NONE):
         def initialised(result):
             profile, client = result
@@ -721,46 +706,40 @@
             publishers_jids = publishers
         return self.massiveSubscribeGroupBlogs(publishers_type, publishers_jids, profile_key)
 
+    @defer.inlineCallbacks
     def massiveSubscribeGroupBlogs(self, publishers_type, publishers, profile_key=C.PROF_KEY_NONE):
         """Subscribe microblogs for a list of groups or jids
         @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL")
         @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids)
         @param profile_key: profile key
         """
-
-        def initialised(result):
-            profile, client = result
-
-            if publishers_type == "ALL":
-                contacts = client.roster.getItems()
-                jids = [contact.jid.userhostJID() for contact in contacts]
-            elif publishers_type == "GROUP":
-                jids = []
-                for _group in publishers:
-                    jids.extend(client.roster.getJidsFromGroup(_group))
-            elif publishers_type == 'JID':
-                jids = publishers
-            else:
-                raise UnknownType
-
-            mblogs = []
-            for jid_ in jids:
-                d = self.host.plugins["XEP-0060"].subscribe(client.item_access_pubsub, self.getNodeName(jid_),
-                                                            profile_key=profile_key)
-                mblogs.append(d)
-            # consume the failure "StanzaError with condition u'item-not-found'"
-            # when the node doesn't exist (e.g that JID hasn't posted any message)
-            dlist = defer.DeferredList(mblogs, consumeErrors=True)
-            return dlist
-
         #TODO: custom exception
         if publishers_type not in ["GROUP", "JID", "ALL"]:
             raise Exception("Bad call, unknown publishers_type")
         if publishers_type == "ALL" and publishers:
             raise Exception("Publishers list must be empty when getting microblogs for all contacts")
-        return self._initialise(profile_key).addCallback(initialised)
+        profile, client = yield self._initialise(profile_key)
         #TODO: we need to use the server corresponding the the host of the jid
 
+        if publishers_type == "ALL":
+            contacts = client.roster.getItems()
+            jids = [contact.jid.userhostJID() for contact in contacts]
+        elif publishers_type == "GROUP":
+            jids = []
+            for _group in publishers:
+                jids.extend(client.roster.getJidsFromGroup(_group))
+        elif publishers_type == 'JID':
+            jids = publishers
+        else:
+            raise UnknownType
+
+        node_ids = [self.getNodeName(publisher) for publisher in jids]
+        d_list = yield self.host.plugins["XEP-0060"].subscribeToMany(client.item_access_pubsub, node_ids, profile_key=profile_key)
+        # consume the failure "StanzaError with condition u'item-not-found'"
+        # when the node doesn't exist (e.g that JID hasn't posted any message)
+        result = yield defer.DeferredList(d_list, consumeErrors=True)
+        defer.returnValue(result)
+
     def deleteAllGroupBlogsAndComments(self, profile_key=C.PROF_KEY_NONE):
         """Delete absolutely all the microblog data that the user has posted"""
         calls = [self.deleteAllGroupBlogs(profile_key), self.deleteAllGroupBlogsComments(profile_key)]
--- a/src/plugins/plugin_xep_0060.py	Mon Sep 22 20:34:29 2014 +0200
+++ b/src/plugins/plugin_xep_0060.py	Mon Sep 22 20:49:13 2014 +0200
@@ -21,9 +21,13 @@
 from sat.core.constants import Const as C
 from sat.core.log import getLogger
 log = getLogger(__name__)
-from wokkel.pubsub import PubSubRequest
+from sat.memory.memory import Sessions
+
 from wokkel import disco, pubsub
+from wokkel.pubsub import PubSubRequest, NS_PUBSUB
 from zope.interface import implements
+from twisted.internet import defer
+
 
 PLUGIN_INFO = {
     "name": "Publish-Subscribe",
@@ -54,17 +58,7 @@
         self.host = host
         self.managedNodes = []
         self.clients = {}
-        """host.bridge.addMethod("getItems", ".plugin", in_sign='ssa{ss}s', out_sign='as', method=self.getItems,
-                              async = True,
-                              doc = { 'summary':'retrieve items',
-                                      'param_0':'service: pubsub service',
-                                      'param_1':'node: node identifier',
-                                      'param_2':'\n'.join(['options: can be:',
-                                          '- max_items: see XEP-0060 #6.5.7',
-                                          '- sub_id: subscription identifier, see XEP-0060 #7.2.2.2']),
-                                      'param_3':'%(doc_profile)s',
-                                      'return':'array of raw XML (content of the items)'
-                                    })"""
+        self.node_cache = Sessions(timeout=30, resettable_timeout=False)
 
     def getHandler(self, profile):
         self.clients[profile] = SatPubSubClient(self.host, self)
@@ -97,6 +91,49 @@
             raise Exception(err_mess)
         return profile, client
 
+    @defer.inlineCallbacks
+    def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
+        """Retrieve the name of the nodes that are accesible on the target service.
+
+        @param service (JID): target service
+        @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes)
+        @param profile (str): %(doc_profile)s
+        @return: list[str]
+        """
+        session_id = profile + "@found@" + service.userhost()
+        # FIXME: this can be called from self.subscribeToMany before the cache has been built by self.getItemsFromMany
+        if session_id in self.node_cache:
+            cache = self.node_cache.profileGet(session_id, profile)
+        else:
+            # FIXME: we arrive here while the cache is already being built...
+            result = yield self.getDiscoItems(service, nodeIdentifier, profile_key=profile)
+            node_names = [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]
+            dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile)
+        defer.returnValue(cache)
+
+    @defer.inlineCallbacks
+    def listSubscribedNodes(self, service, nodeIdentifier='', filter='subscribed', profile=C.PROF_KEY_NONE):
+        """Retrieve the name of the nodes to which the profile is subscribed on the target service.
+
+        @param service (JID): target service
+        @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
+        @param filter (str): filter the result according to the given subscription type:
+            - None: do not filter
+            - 'pending': subscription has been approved yet by the node owner
+            - 'unconfigured': subscription options have not been configured yet
+            - 'subscribed': subscription is complete
+        @param profile (str): %(doc_profile)s
+        @return: list[str]
+        """
+        session_id = profile + "@subscriptions@" + service.userhost()
+        if session_id in self.node_cache:
+            cache = self.node_cache.profileGet(session_id, profile)
+        else:
+            subs = yield self.subscriptions(service, nodeIdentifier, profile_key=profile)
+            node_names = [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter]
+            dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile)
+        defer.returnValue(cache)
+
     def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE):
         profile, client = self.__getClientNProfile(profile_key, 'publish item')
         return client.publish(service, nodeIdentifier, items, client.parent.jid)
@@ -105,6 +142,28 @@
         profile, client = self.__getClientNProfile(profile_key, 'get items')
         return client.items(service, node, max_items, item_ids, sub_id, client.parent.jid)
 
+    @defer.inlineCallbacks
+    def getItemsFromMany(self, service, data, max_items=None, item_ids=None, sub_id=None, profile_key=C.PROF_KEY_NONE):
+        """Massively retrieve pubsub items from many nodes.
+
+        @param service (JID): target service.
+        @param data (dict): dictionnary binding some arbitrary keys to the node identifiers.
+        @param max_items (int): optional limit on the number of retrieved items *per node*.
+        @param item_ids (list[str]): identifiers of the items to be retrieved (should not be used).
+        @param sub_id (str): optional subscription identifier.
+        @param profile_key (str): %(doc_profile_key)s
+        @return: dict binding a subset of the keys of data to Deferred instances.
+        """
+        profile, client = self.__getClientNProfile(profile_key, 'get items')
+        found_nodes = yield self.listNodes(service, profile=profile)
+        d_dict = {}
+        for publisher, node in data.items():
+            if node not in found_nodes:
+                log.info("Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
+                continue  # avoid pubsub "item-not-found" error
+            d_dict[publisher] = client.items(service, node, max_items, item_ids, sub_id, client.parent.jid)
+        defer.returnValue(d_dict)
+
     def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
         profile, client = self.__getClientNProfile(profile_key, 'get options')
         return client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier)
@@ -129,6 +188,36 @@
         profile, client = self.__getClientNProfile(profile_key, 'subscribe node')
         return client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options)
 
+    @defer.inlineCallbacks
+    def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
+        """Massively subscribe to many nodes.
+
+        @param service (JID): target service.
+        @param nodeIdentifiers (list): the list of node identifiers to subscribe to.
+        @param sub_id (str): optional subscription identifier.
+        @param options (list): optional list of subscription options
+        @param profile_key (str): %(doc_profile_key)s
+        @return: dict binding a subset of the keys of data to Deferred instances.
+        """
+        profile, client = self.__getClientNProfile(profile_key, 'subscribe nodes')
+        found_nodes = yield self.listNodes(service, profile=profile)
+        subscribed_nodes = yield self.listSubscribedNodes(service, profile=profile)
+        d_list = []
+        for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
+            if nodeIdentifier not in found_nodes:
+                log.info("Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
+                continue  # avoid sat-pubsub "SubscriptionExists" error
+            d_list.append(client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options))
+        defer.returnValue(d_list)
+
+    def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE):
+        profile, client = self.__getClientNProfile(profile_key, 'retrieve subscriptions')
+        return client.subscriptions(service, nodeIdentifier)
+
+    def getDiscoItems(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE):
+        profile, client = self.__getClientNProfile(profile_key, 'disco items')
+        return client.getDiscoItems(None, service, nodeIdentifier)
+
 
 class SatPubSubClient(pubsub.PubSubClient):
     implements(disco.IDisco)
@@ -158,7 +247,7 @@
         @param maxItems: Optional limit on the number of retrieved items.
         @type maxItems: C{int}
 
-        @param itemIdentifiers: Identifiers of the items to be retracted.
+        @param itemIdentifiers: Identifiers of the items to be retrieved.
         @type itemIdentifiers: C{set}
 
         @param subscriptionIdentifier: Optional subscription identifier. In
@@ -166,8 +255,6 @@
             the results to the specific subscription.
         @type subscriptionIdentifier: C{unicode}
         """
-        NS_PUBSUB = 'http://jabber.org/protocol/pubsub'
-
         request = PubSubRequest('items')
         request.recipient = service
         request.nodeIdentifier = nodeIdentifier
@@ -220,12 +307,27 @@
 
     # def purgeReceived(self, event):
 
-
+    @defer.inlineCallbacks
+    def subscriptions(self, service, nodeIdentifier, sender=None):
+        """Return the list of subscriptions to the given service and node.
 
-    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
-        _disco_info = []
-        self.host.trigger.point("PubSub Disco Info", _disco_info, self.parent.profile)
-        return _disco_info
+        @param service: The publish subscribe service to retrieve the subscriptions from.
+        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
+        @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions).
+        @type nodeIdentifier: C{unicode}
+        """
+        request = PubSubRequest('subscriptions')
+        request.recipient = service
+        request.nodeIdentifier = nodeIdentifier
+        request.sender = sender
+        iq = yield request.send(self.xmlstream)
+        defer.returnValue([sub for sub in iq.pubsub.subscriptions.elements() if
+                           (sub.uri == NS_PUBSUB and sub.name == 'subscription')])
 
-    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
-        return []
+    def getDiscoInfo(self, requestor, service, nodeIdentifier=''):
+        disco_info = []
+        self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile)
+        return disco_info
+
+    def getDiscoItems(self, requestor, service, nodeIdentifier=''):
+        return self.host.getDiscoItems(service, nodeIdentifier, self.parent.profile)