# HG changeset patch # User souliane # Date 1411411753 -7200 # Node ID 318eab3f93f8843b2fccad9cde2cdd90e13578c4 # Parent 8ad37c3d58a9b458651c633f5c1955d9973b000c plugin XEP-0060, groupblog: avoid unecessary pubsub errors while doing massive requests: - don't try to retrieve items from non accessible nodes - don't try to subscribe to non accessible or already subscribed nodes diff -r 8ad37c3d58a9 -r 318eab3f93f8 src/plugins/plugin_misc_groupblog.py --- a/src/plugins/plugin_misc_groupblog.py Mon Sep 22 20:34:29 2014 +0200 +++ b/src/plugins/plugin_misc_groupblog.py Mon Sep 22 20:49:13 2014 +0200 @@ -481,6 +481,7 @@ @defer.inlineCallbacks def _itemsConstruction(self, items, pub_jid, client): """ Transforms items to group blog data and manage comments node + @param items: iterable of items @param pub_jid: jid of the publisher or None to use items data @param client: SatXMPPClient instance @@ -494,15 +495,20 @@ except AttributeError: pass ret.append(gbdata) - # if there is a comments node, we subscribe to it - if "comments_node" in gbdata: + # every comments node must be subscribed, except if we are the publisher (we are already subscribed in this case) + if "comments_node" in gbdata and pub_jid.userhostJID() != client.jid.userhostJID(): try: - # every comments node must be subscribed, except if we are the publisher (we are already subscribed in this case) - if pub_jid.userhostJID() != client.jid.userhostJID(): - self.host.plugins["XEP-0060"].subscribe(jid.JID(gbdata["comments_service"]), gbdata["comments_node"], - profile_key=client.profile) + service = jid.JID(gbdata["comments_service"]) + node = gbdata["comments_node"] except KeyError: log.warning("Missing key for comments") + continue + # TODO: see if it is really needed to check for not subscribing twice to the node + # It previously worked without this check, but the pubsub service logs were polluted + # or, if in debug mode, it made sat-pubsub very difficult to debug. + subscribed_nodes = yield self.host.plugins['XEP-0060'].listSubscribedNodes(service, profile=client.profile) + if node not in subscribed_nodes: # avoid sat-pubsub "SubscriptionExists" error + self.host.plugins["XEP-0060"].subscribe(service, node, profile_key=client.profile) defer.returnValue(ret) def __getGroupBlogs(self, pub_jid_s, max_items=10, item_ids=None, profile_key=C.PROF_KEY_NONE): @@ -645,6 +651,7 @@ publishers_jids = publishers return self.getMassiveLastGroupBlogs(publishers_type, publishers_jids, max_items, profile_key) + @defer.inlineCallbacks def getMassiveLastGroupBlogs(self, publishers_type, publishers, max_items=10, profile_key=C.PROF_KEY_NONE): """Get the last published microblogs for a list of groups or jids @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL") @@ -652,58 +659,36 @@ @param max_items: how many microblogs we want to get @param profile_key: profile key """ - - def sendResult(result): - """send result of DeferredList (dict of jid => microblogs) to the calling method""" - - ret = {} - - for (success, value) in result: - if success: - source_jid, data = value - ret[source_jid] = data - - return ret - - def initialised(result): - profile, client = result - - if publishers_type == "ALL": - contacts = client.roster.getItems() - jids = [contact.jid.userhostJID() for contact in contacts] - elif publishers_type == "GROUP": - jids = [] - for _group in publishers: - jids.extend(client.roster.getJidsFromGroup(_group)) - elif publishers_type == 'JID': - jids = publishers - else: - raise UnknownType - - mblogs = [] - - for jid_ in jids: - d = self.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, self.getNodeName(jid_), - max_items=max_items, profile_key=profile_key) - d.addCallback(self._itemsConstruction, jid_, client) - d.addCallback(lambda gbdata, source_jid: (source_jid, gbdata), jid_.full()) - - mblogs.append(d) - # consume the failure "StanzaError with condition u'item-not-found'" - # when the node doesn't exist (e.g that JID hasn't posted any message) - dlist = defer.DeferredList(mblogs, consumeErrors=True) - dlist.addCallback(sendResult) - - return dlist - #TODO: custom exception if publishers_type not in ["GROUP", "JID", "ALL"]: raise Exception("Bad call, unknown publishers_type") if publishers_type == "ALL" and publishers: raise Exception("Publishers list must be empty when getting microblogs for all contacts") - return self._initialise(profile_key).addCallback(initialised) + profile, client = yield self._initialise(profile_key) #TODO: we need to use the server corresponding the the host of the jid + if publishers_type == "ALL": + contacts = client.roster.getItems() + jids = [contact.jid.userhostJID() for contact in contacts] + elif publishers_type == "GROUP": + jids = [] + for _group in publishers: + jids.extend(client.roster.getJidsFromGroup(_group)) + elif publishers_type == 'JID': + jids = publishers + else: + raise UnknownType + + data = {publisher: self.getNodeName(publisher) for publisher in jids} + d_dict = yield self.host.plugins["XEP-0060"].getItemsFromMany(client.item_access_pubsub, data, max_items=max_items, profile_key=profile) + for publisher, d in d_dict.items(): + d.addCallback(self._itemsConstruction, publisher, client) + d.addCallback(lambda gbdata: (publisher.full(), gbdata)) + # consume the failure "StanzaError with condition u'item-not-found'" + # when the node doesn't exist (e.g that JID hasn't posted any message) + result = yield defer.DeferredList(d_dict.values(), consumeErrors=True) + defer.returnValue({value[0]: value[1] for success, value in result if success}) + def subscribeGroupBlog(self, pub_jid, profile_key=C.PROF_KEY_NONE): def initialised(result): profile, client = result @@ -721,46 +706,40 @@ publishers_jids = publishers return self.massiveSubscribeGroupBlogs(publishers_type, publishers_jids, profile_key) + @defer.inlineCallbacks def massiveSubscribeGroupBlogs(self, publishers_type, publishers, profile_key=C.PROF_KEY_NONE): """Subscribe microblogs for a list of groups or jids @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL") @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids) @param profile_key: profile key """ - - def initialised(result): - profile, client = result - - if publishers_type == "ALL": - contacts = client.roster.getItems() - jids = [contact.jid.userhostJID() for contact in contacts] - elif publishers_type == "GROUP": - jids = [] - for _group in publishers: - jids.extend(client.roster.getJidsFromGroup(_group)) - elif publishers_type == 'JID': - jids = publishers - else: - raise UnknownType - - mblogs = [] - for jid_ in jids: - d = self.host.plugins["XEP-0060"].subscribe(client.item_access_pubsub, self.getNodeName(jid_), - profile_key=profile_key) - mblogs.append(d) - # consume the failure "StanzaError with condition u'item-not-found'" - # when the node doesn't exist (e.g that JID hasn't posted any message) - dlist = defer.DeferredList(mblogs, consumeErrors=True) - return dlist - #TODO: custom exception if publishers_type not in ["GROUP", "JID", "ALL"]: raise Exception("Bad call, unknown publishers_type") if publishers_type == "ALL" and publishers: raise Exception("Publishers list must be empty when getting microblogs for all contacts") - return self._initialise(profile_key).addCallback(initialised) + profile, client = yield self._initialise(profile_key) #TODO: we need to use the server corresponding the the host of the jid + if publishers_type == "ALL": + contacts = client.roster.getItems() + jids = [contact.jid.userhostJID() for contact in contacts] + elif publishers_type == "GROUP": + jids = [] + for _group in publishers: + jids.extend(client.roster.getJidsFromGroup(_group)) + elif publishers_type == 'JID': + jids = publishers + else: + raise UnknownType + + node_ids = [self.getNodeName(publisher) for publisher in jids] + d_list = yield self.host.plugins["XEP-0060"].subscribeToMany(client.item_access_pubsub, node_ids, profile_key=profile_key) + # consume the failure "StanzaError with condition u'item-not-found'" + # when the node doesn't exist (e.g that JID hasn't posted any message) + result = yield defer.DeferredList(d_list, consumeErrors=True) + defer.returnValue(result) + def deleteAllGroupBlogsAndComments(self, profile_key=C.PROF_KEY_NONE): """Delete absolutely all the microblog data that the user has posted""" calls = [self.deleteAllGroupBlogs(profile_key), self.deleteAllGroupBlogsComments(profile_key)] diff -r 8ad37c3d58a9 -r 318eab3f93f8 src/plugins/plugin_xep_0060.py --- a/src/plugins/plugin_xep_0060.py Mon Sep 22 20:34:29 2014 +0200 +++ b/src/plugins/plugin_xep_0060.py Mon Sep 22 20:49:13 2014 +0200 @@ -21,9 +21,13 @@ from sat.core.constants import Const as C from sat.core.log import getLogger log = getLogger(__name__) -from wokkel.pubsub import PubSubRequest +from sat.memory.memory import Sessions + from wokkel import disco, pubsub +from wokkel.pubsub import PubSubRequest, NS_PUBSUB from zope.interface import implements +from twisted.internet import defer + PLUGIN_INFO = { "name": "Publish-Subscribe", @@ -54,17 +58,7 @@ self.host = host self.managedNodes = [] self.clients = {} - """host.bridge.addMethod("getItems", ".plugin", in_sign='ssa{ss}s', out_sign='as', method=self.getItems, - async = True, - doc = { 'summary':'retrieve items', - 'param_0':'service: pubsub service', - 'param_1':'node: node identifier', - 'param_2':'\n'.join(['options: can be:', - '- max_items: see XEP-0060 #6.5.7', - '- sub_id: subscription identifier, see XEP-0060 #7.2.2.2']), - 'param_3':'%(doc_profile)s', - 'return':'array of raw XML (content of the items)' - })""" + self.node_cache = Sessions(timeout=30, resettable_timeout=False) def getHandler(self, profile): self.clients[profile] = SatPubSubClient(self.host, self) @@ -97,6 +91,49 @@ raise Exception(err_mess) return profile, client + @defer.inlineCallbacks + def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): + """Retrieve the name of the nodes that are accesible on the target service. + + @param service (JID): target service + @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) + @param profile (str): %(doc_profile)s + @return: list[str] + """ + session_id = profile + "@found@" + service.userhost() + # FIXME: this can be called from self.subscribeToMany before the cache has been built by self.getItemsFromMany + if session_id in self.node_cache: + cache = self.node_cache.profileGet(session_id, profile) + else: + # FIXME: we arrive here while the cache is already being built... + result = yield self.getDiscoItems(service, nodeIdentifier, profile_key=profile) + node_names = [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')] + dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile) + defer.returnValue(cache) + + @defer.inlineCallbacks + def listSubscribedNodes(self, service, nodeIdentifier='', filter='subscribed', profile=C.PROF_KEY_NONE): + """Retrieve the name of the nodes to which the profile is subscribed on the target service. + + @param service (JID): target service + @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) + @param filter (str): filter the result according to the given subscription type: + - None: do not filter + - 'pending': subscription has been approved yet by the node owner + - 'unconfigured': subscription options have not been configured yet + - 'subscribed': subscription is complete + @param profile (str): %(doc_profile)s + @return: list[str] + """ + session_id = profile + "@subscriptions@" + service.userhost() + if session_id in self.node_cache: + cache = self.node_cache.profileGet(session_id, profile) + else: + subs = yield self.subscriptions(service, nodeIdentifier, profile_key=profile) + node_names = [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter] + dummy, cache = self.node_cache.newSession(node_names, session_id=session_id, profile=profile) + defer.returnValue(cache) + def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): profile, client = self.__getClientNProfile(profile_key, 'publish item') return client.publish(service, nodeIdentifier, items, client.parent.jid) @@ -105,6 +142,28 @@ profile, client = self.__getClientNProfile(profile_key, 'get items') return client.items(service, node, max_items, item_ids, sub_id, client.parent.jid) + @defer.inlineCallbacks + def getItemsFromMany(self, service, data, max_items=None, item_ids=None, sub_id=None, profile_key=C.PROF_KEY_NONE): + """Massively retrieve pubsub items from many nodes. + + @param service (JID): target service. + @param data (dict): dictionnary binding some arbitrary keys to the node identifiers. + @param max_items (int): optional limit on the number of retrieved items *per node*. + @param item_ids (list[str]): identifiers of the items to be retrieved (should not be used). + @param sub_id (str): optional subscription identifier. + @param profile_key (str): %(doc_profile_key)s + @return: dict binding a subset of the keys of data to Deferred instances. + """ + profile, client = self.__getClientNProfile(profile_key, 'get items') + found_nodes = yield self.listNodes(service, profile=profile) + d_dict = {} + for publisher, node in data.items(): + if node not in found_nodes: + log.info("Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) + continue # avoid pubsub "item-not-found" error + d_dict[publisher] = client.items(service, node, max_items, item_ids, sub_id, client.parent.jid) + defer.returnValue(d_dict) + def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): profile, client = self.__getClientNProfile(profile_key, 'get options') return client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) @@ -129,6 +188,36 @@ profile, client = self.__getClientNProfile(profile_key, 'subscribe node') return client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options) + @defer.inlineCallbacks + def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): + """Massively subscribe to many nodes. + + @param service (JID): target service. + @param nodeIdentifiers (list): the list of node identifiers to subscribe to. + @param sub_id (str): optional subscription identifier. + @param options (list): optional list of subscription options + @param profile_key (str): %(doc_profile_key)s + @return: dict binding a subset of the keys of data to Deferred instances. + """ + profile, client = self.__getClientNProfile(profile_key, 'subscribe nodes') + found_nodes = yield self.listNodes(service, profile=profile) + subscribed_nodes = yield self.listSubscribedNodes(service, profile=profile) + d_list = [] + for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): + if nodeIdentifier not in found_nodes: + log.info("Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) + continue # avoid sat-pubsub "SubscriptionExists" error + d_list.append(client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options)) + defer.returnValue(d_list) + + def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): + profile, client = self.__getClientNProfile(profile_key, 'retrieve subscriptions') + return client.subscriptions(service, nodeIdentifier) + + def getDiscoItems(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): + profile, client = self.__getClientNProfile(profile_key, 'disco items') + return client.getDiscoItems(None, service, nodeIdentifier) + class SatPubSubClient(pubsub.PubSubClient): implements(disco.IDisco) @@ -158,7 +247,7 @@ @param maxItems: Optional limit on the number of retrieved items. @type maxItems: C{int} - @param itemIdentifiers: Identifiers of the items to be retracted. + @param itemIdentifiers: Identifiers of the items to be retrieved. @type itemIdentifiers: C{set} @param subscriptionIdentifier: Optional subscription identifier. In @@ -166,8 +255,6 @@ the results to the specific subscription. @type subscriptionIdentifier: C{unicode} """ - NS_PUBSUB = 'http://jabber.org/protocol/pubsub' - request = PubSubRequest('items') request.recipient = service request.nodeIdentifier = nodeIdentifier @@ -220,12 +307,27 @@ # def purgeReceived(self, event): - + @defer.inlineCallbacks + def subscriptions(self, service, nodeIdentifier, sender=None): + """Return the list of subscriptions to the given service and node. - def getDiscoInfo(self, requestor, target, nodeIdentifier=''): - _disco_info = [] - self.host.trigger.point("PubSub Disco Info", _disco_info, self.parent.profile) - return _disco_info + @param service: The publish subscribe service to retrieve the subscriptions from. + @type service: L{JID} + @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions). + @type nodeIdentifier: C{unicode} + """ + request = PubSubRequest('subscriptions') + request.recipient = service + request.nodeIdentifier = nodeIdentifier + request.sender = sender + iq = yield request.send(self.xmlstream) + defer.returnValue([sub for sub in iq.pubsub.subscriptions.elements() if + (sub.uri == NS_PUBSUB and sub.name == 'subscription')]) - def getDiscoItems(self, requestor, target, nodeIdentifier=''): - return [] + def getDiscoInfo(self, requestor, service, nodeIdentifier=''): + disco_info = [] + self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile) + return disco_info + + def getDiscoItems(self, requestor, service, nodeIdentifier=''): + return self.host.getDiscoItems(service, nodeIdentifier, self.parent.profile)