comparison src/plugins/plugin_misc_groupblog.py @ 1217:318eab3f93f8

plugin XEP-0060, groupblog: avoid unnecessary pubsub errors while doing massive requests: - don't try to retrieve items from non-accessible nodes - don't try to subscribe to non-accessible or already subscribed nodes
author souliane <souliane@mailoo.org>
date Mon, 22 Sep 2014 20:49:13 +0200
parents 4be53c14845e
children 0b87d029f0a3
comparison
equal deleted inserted replaced
1216:8ad37c3d58a9 1217:318eab3f93f8
479 479
480 480
481 @defer.inlineCallbacks 481 @defer.inlineCallbacks
482 def _itemsConstruction(self, items, pub_jid, client): 482 def _itemsConstruction(self, items, pub_jid, client):
483 """ Transforms items to group blog data and manage comments node 483 """ Transforms items to group blog data and manage comments node
484
484 @param items: iterable of items 485 @param items: iterable of items
485 @param pub_jid: jid of the publisher or None to use items data 486 @param pub_jid: jid of the publisher or None to use items data
486 @param client: SatXMPPClient instance 487 @param client: SatXMPPClient instance
487 @return: deferred which fire list of group blog data """ 488 @return: deferred which fire list of group blog data """
488 # TODO: use items data when pub_jid is None 489 # TODO: use items data when pub_jid is None
492 try: 493 try:
493 gbdata['service'] = client.item_access_pubsub.full() 494 gbdata['service'] = client.item_access_pubsub.full()
494 except AttributeError: 495 except AttributeError:
495 pass 496 pass
496 ret.append(gbdata) 497 ret.append(gbdata)
497 # if there is a comments node, we subscribe to it 498 # every comments node must be subscribed, except if we are the publisher (we are already subscribed in this case)
498 if "comments_node" in gbdata: 499 if "comments_node" in gbdata and pub_jid.userhostJID() != client.jid.userhostJID():
499 try: 500 try:
500 # every comments node must be subscribed, except if we are the publisher (we are already subscribed in this case) 501 service = jid.JID(gbdata["comments_service"])
501 if pub_jid.userhostJID() != client.jid.userhostJID(): 502 node = gbdata["comments_node"]
502 self.host.plugins["XEP-0060"].subscribe(jid.JID(gbdata["comments_service"]), gbdata["comments_node"],
503 profile_key=client.profile)
504 except KeyError: 503 except KeyError:
505 log.warning("Missing key for comments") 504 log.warning("Missing key for comments")
505 continue
506 # TODO: see if it is really needed to check for not subscribing twice to the node
507 # It previously worked without this check, but the pubsub service logs were polluted
508 # or, if in debug mode, it made sat-pubsub very difficult to debug.
509 subscribed_nodes = yield self.host.plugins['XEP-0060'].listSubscribedNodes(service, profile=client.profile)
510 if node not in subscribed_nodes: # avoid sat-pubsub "SubscriptionExists" error
511 self.host.plugins["XEP-0060"].subscribe(service, node, profile_key=client.profile)
506 defer.returnValue(ret) 512 defer.returnValue(ret)
507 513
508 def __getGroupBlogs(self, pub_jid_s, max_items=10, item_ids=None, profile_key=C.PROF_KEY_NONE): 514 def __getGroupBlogs(self, pub_jid_s, max_items=10, item_ids=None, profile_key=C.PROF_KEY_NONE):
509 """Retrieve previously published items from a publish subscribe node. 515 """Retrieve previously published items from a publish subscribe node.
510 @param pub_jid_s: jid of the publisher 516 @param pub_jid_s: jid of the publisher
643 publishers_jids = [jid.JID(publisher) for publisher in publishers] 649 publishers_jids = [jid.JID(publisher) for publisher in publishers]
644 else: 650 else:
645 publishers_jids = publishers 651 publishers_jids = publishers
646 return self.getMassiveLastGroupBlogs(publishers_type, publishers_jids, max_items, profile_key) 652 return self.getMassiveLastGroupBlogs(publishers_type, publishers_jids, max_items, profile_key)
647 653
654 @defer.inlineCallbacks
648 def getMassiveLastGroupBlogs(self, publishers_type, publishers, max_items=10, profile_key=C.PROF_KEY_NONE): 655 def getMassiveLastGroupBlogs(self, publishers_type, publishers, max_items=10, profile_key=C.PROF_KEY_NONE):
649 """Get the last published microblogs for a list of groups or jids 656 """Get the last published microblogs for a list of groups or jids
650 @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL") 657 @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL")
651 @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids) 658 @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids)
652 @param max_items: how many microblogs we want to get 659 @param max_items: how many microblogs we want to get
653 @param profile_key: profile key 660 @param profile_key: profile key
654 """ 661 """
655
656 def sendResult(result):
657 """send result of DeferredList (dict of jid => microblogs) to the calling method"""
658
659 ret = {}
660
661 for (success, value) in result:
662 if success:
663 source_jid, data = value
664 ret[source_jid] = data
665
666 return ret
667
668 def initialised(result):
669 profile, client = result
670
671 if publishers_type == "ALL":
672 contacts = client.roster.getItems()
673 jids = [contact.jid.userhostJID() for contact in contacts]
674 elif publishers_type == "GROUP":
675 jids = []
676 for _group in publishers:
677 jids.extend(client.roster.getJidsFromGroup(_group))
678 elif publishers_type == 'JID':
679 jids = publishers
680 else:
681 raise UnknownType
682
683 mblogs = []
684
685 for jid_ in jids:
686 d = self.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, self.getNodeName(jid_),
687 max_items=max_items, profile_key=profile_key)
688 d.addCallback(self._itemsConstruction, jid_, client)
689 d.addCallback(lambda gbdata, source_jid: (source_jid, gbdata), jid_.full())
690
691 mblogs.append(d)
692 # consume the failure "StanzaError with condition u'item-not-found'"
693 # when the node doesn't exist (e.g that JID hasn't posted any message)
694 dlist = defer.DeferredList(mblogs, consumeErrors=True)
695 dlist.addCallback(sendResult)
696
697 return dlist
698
699 #TODO: custom exception 662 #TODO: custom exception
700 if publishers_type not in ["GROUP", "JID", "ALL"]: 663 if publishers_type not in ["GROUP", "JID", "ALL"]:
701 raise Exception("Bad call, unknown publishers_type") 664 raise Exception("Bad call, unknown publishers_type")
702 if publishers_type == "ALL" and publishers: 665 if publishers_type == "ALL" and publishers:
703 raise Exception("Publishers list must be empty when getting microblogs for all contacts") 666 raise Exception("Publishers list must be empty when getting microblogs for all contacts")
704 return self._initialise(profile_key).addCallback(initialised) 667 profile, client = yield self._initialise(profile_key)
705 #TODO: we need to use the server corresponding the the host of the jid 668 #TODO: we need to use the server corresponding the the host of the jid
669
670 if publishers_type == "ALL":
671 contacts = client.roster.getItems()
672 jids = [contact.jid.userhostJID() for contact in contacts]
673 elif publishers_type == "GROUP":
674 jids = []
675 for _group in publishers:
676 jids.extend(client.roster.getJidsFromGroup(_group))
677 elif publishers_type == 'JID':
678 jids = publishers
679 else:
680 raise UnknownType
681
682 data = {publisher: self.getNodeName(publisher) for publisher in jids}
683 d_dict = yield self.host.plugins["XEP-0060"].getItemsFromMany(client.item_access_pubsub, data, max_items=max_items, profile_key=profile)
684 for publisher, d in d_dict.items():
685 d.addCallback(self._itemsConstruction, publisher, client)
686 d.addCallback(lambda gbdata: (publisher.full(), gbdata))
687 # consume the failure "StanzaError with condition u'item-not-found'"
688 # when the node doesn't exist (e.g that JID hasn't posted any message)
689 result = yield defer.DeferredList(d_dict.values(), consumeErrors=True)
690 defer.returnValue({value[0]: value[1] for success, value in result if success})
706 691
707 def subscribeGroupBlog(self, pub_jid, profile_key=C.PROF_KEY_NONE): 692 def subscribeGroupBlog(self, pub_jid, profile_key=C.PROF_KEY_NONE):
708 def initialised(result): 693 def initialised(result):
709 profile, client = result 694 profile, client = result
710 d = self.host.plugins["XEP-0060"].subscribe(client.item_access_pubsub, self.getNodeName(jid.JID(pub_jid)), 695 d = self.host.plugins["XEP-0060"].subscribe(client.item_access_pubsub, self.getNodeName(jid.JID(pub_jid)),
719 publishers_jids = [jid.JID(publisher) for publisher in publishers] 704 publishers_jids = [jid.JID(publisher) for publisher in publishers]
720 else: 705 else:
721 publishers_jids = publishers 706 publishers_jids = publishers
722 return self.massiveSubscribeGroupBlogs(publishers_type, publishers_jids, profile_key) 707 return self.massiveSubscribeGroupBlogs(publishers_type, publishers_jids, profile_key)
723 708
709 @defer.inlineCallbacks
724 def massiveSubscribeGroupBlogs(self, publishers_type, publishers, profile_key=C.PROF_KEY_NONE): 710 def massiveSubscribeGroupBlogs(self, publishers_type, publishers, profile_key=C.PROF_KEY_NONE):
725 """Subscribe microblogs for a list of groups or jids 711 """Subscribe microblogs for a list of groups or jids
726 @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL") 712 @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL")
727 @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids) 713 @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids)
728 @param profile_key: profile key 714 @param profile_key: profile key
729 """ 715 """
730
731 def initialised(result):
732 profile, client = result
733
734 if publishers_type == "ALL":
735 contacts = client.roster.getItems()
736 jids = [contact.jid.userhostJID() for contact in contacts]
737 elif publishers_type == "GROUP":
738 jids = []
739 for _group in publishers:
740 jids.extend(client.roster.getJidsFromGroup(_group))
741 elif publishers_type == 'JID':
742 jids = publishers
743 else:
744 raise UnknownType
745
746 mblogs = []
747 for jid_ in jids:
748 d = self.host.plugins["XEP-0060"].subscribe(client.item_access_pubsub, self.getNodeName(jid_),
749 profile_key=profile_key)
750 mblogs.append(d)
751 # consume the failure "StanzaError with condition u'item-not-found'"
752 # when the node doesn't exist (e.g that JID hasn't posted any message)
753 dlist = defer.DeferredList(mblogs, consumeErrors=True)
754 return dlist
755
756 #TODO: custom exception 716 #TODO: custom exception
757 if publishers_type not in ["GROUP", "JID", "ALL"]: 717 if publishers_type not in ["GROUP", "JID", "ALL"]:
758 raise Exception("Bad call, unknown publishers_type") 718 raise Exception("Bad call, unknown publishers_type")
759 if publishers_type == "ALL" and publishers: 719 if publishers_type == "ALL" and publishers:
760 raise Exception("Publishers list must be empty when getting microblogs for all contacts") 720 raise Exception("Publishers list must be empty when getting microblogs for all contacts")
761 return self._initialise(profile_key).addCallback(initialised) 721 profile, client = yield self._initialise(profile_key)
762 #TODO: we need to use the server corresponding the the host of the jid 722 #TODO: we need to use the server corresponding the the host of the jid
723
724 if publishers_type == "ALL":
725 contacts = client.roster.getItems()
726 jids = [contact.jid.userhostJID() for contact in contacts]
727 elif publishers_type == "GROUP":
728 jids = []
729 for _group in publishers:
730 jids.extend(client.roster.getJidsFromGroup(_group))
731 elif publishers_type == 'JID':
732 jids = publishers
733 else:
734 raise UnknownType
735
736 node_ids = [self.getNodeName(publisher) for publisher in jids]
737 d_list = yield self.host.plugins["XEP-0060"].subscribeToMany(client.item_access_pubsub, node_ids, profile_key=profile_key)
738 # consume the failure "StanzaError with condition u'item-not-found'"
739 # when the node doesn't exist (e.g that JID hasn't posted any message)
740 result = yield defer.DeferredList(d_list, consumeErrors=True)
741 defer.returnValue(result)
763 742
764 def deleteAllGroupBlogsAndComments(self, profile_key=C.PROF_KEY_NONE): 743 def deleteAllGroupBlogsAndComments(self, profile_key=C.PROF_KEY_NONE):
765 """Delete absolutely all the microblog data that the user has posted""" 744 """Delete absolutely all the microblog data that the user has posted"""
766 calls = [self.deleteAllGroupBlogs(profile_key), self.deleteAllGroupBlogsComments(profile_key)] 745 calls = [self.deleteAllGroupBlogs(profile_key), self.deleteAllGroupBlogsComments(profile_key)]
767 return defer.DeferredList(calls) 746 return defer.DeferredList(calls)