Mercurial > libervia-backend
comparison src/plugins/plugin_misc_groupblog.py @ 1217:318eab3f93f8
plugin XEP-0060, groupblog: avoid unnecessary pubsub errors while doing massive requests:
- don't try to retrieve items from non accessible nodes
- don't try to subscribe to non accessible or already subscribed nodes
author | souliane <souliane@mailoo.org> |
---|---|
date | Mon, 22 Sep 2014 20:49:13 +0200 |
parents | 4be53c14845e |
children | 0b87d029f0a3 |
comparison
equal
deleted
inserted
replaced
1216:8ad37c3d58a9 | 1217:318eab3f93f8 |
---|---|
479 | 479 |
480 | 480 |
481 @defer.inlineCallbacks | 481 @defer.inlineCallbacks |
482 def _itemsConstruction(self, items, pub_jid, client): | 482 def _itemsConstruction(self, items, pub_jid, client): |
483 """ Transforms items to group blog data and manage comments node | 483 """ Transforms items to group blog data and manage comments node |
484 | |
484 @param items: iterable of items | 485 @param items: iterable of items |
485 @param pub_jid: jid of the publisher or None to use items data | 486 @param pub_jid: jid of the publisher or None to use items data |
486 @param client: SatXMPPClient instance | 487 @param client: SatXMPPClient instance |
487 @return: deferred which fire list of group blog data """ | 488 @return: deferred which fire list of group blog data """ |
488 # TODO: use items data when pub_jid is None | 489 # TODO: use items data when pub_jid is None |
492 try: | 493 try: |
493 gbdata['service'] = client.item_access_pubsub.full() | 494 gbdata['service'] = client.item_access_pubsub.full() |
494 except AttributeError: | 495 except AttributeError: |
495 pass | 496 pass |
496 ret.append(gbdata) | 497 ret.append(gbdata) |
497 # if there is a comments node, we subscribe to it | 498 # every comments node must be subscribed, except if we are the publisher (we are already subscribed in this case) |
498 if "comments_node" in gbdata: | 499 if "comments_node" in gbdata and pub_jid.userhostJID() != client.jid.userhostJID(): |
499 try: | 500 try: |
500 # every comments node must be subscribed, except if we are the publisher (we are already subscribed in this case) | 501 service = jid.JID(gbdata["comments_service"]) |
501 if pub_jid.userhostJID() != client.jid.userhostJID(): | 502 node = gbdata["comments_node"] |
502 self.host.plugins["XEP-0060"].subscribe(jid.JID(gbdata["comments_service"]), gbdata["comments_node"], | |
503 profile_key=client.profile) | |
504 except KeyError: | 503 except KeyError: |
505 log.warning("Missing key for comments") | 504 log.warning("Missing key for comments") |
505 continue | |
506 # TODO: see if it is really needed to check for not subscribing twice to the node | |
507 # It previously worked without this check, but the pubsub service logs were polluted | |
508 # or, if in debug mode, it made sat-pubsub very difficult to debug. | |
509 subscribed_nodes = yield self.host.plugins['XEP-0060'].listSubscribedNodes(service, profile=client.profile) | |
510 if node not in subscribed_nodes: # avoid sat-pubsub "SubscriptionExists" error | |
511 self.host.plugins["XEP-0060"].subscribe(service, node, profile_key=client.profile) | |
506 defer.returnValue(ret) | 512 defer.returnValue(ret) |
507 | 513 |
508 def __getGroupBlogs(self, pub_jid_s, max_items=10, item_ids=None, profile_key=C.PROF_KEY_NONE): | 514 def __getGroupBlogs(self, pub_jid_s, max_items=10, item_ids=None, profile_key=C.PROF_KEY_NONE): |
509 """Retrieve previously published items from a publish subscribe node. | 515 """Retrieve previously published items from a publish subscribe node. |
510 @param pub_jid_s: jid of the publisher | 516 @param pub_jid_s: jid of the publisher |
643 publishers_jids = [jid.JID(publisher) for publisher in publishers] | 649 publishers_jids = [jid.JID(publisher) for publisher in publishers] |
644 else: | 650 else: |
645 publishers_jids = publishers | 651 publishers_jids = publishers |
646 return self.getMassiveLastGroupBlogs(publishers_type, publishers_jids, max_items, profile_key) | 652 return self.getMassiveLastGroupBlogs(publishers_type, publishers_jids, max_items, profile_key) |
647 | 653 |
654 @defer.inlineCallbacks | |
648 def getMassiveLastGroupBlogs(self, publishers_type, publishers, max_items=10, profile_key=C.PROF_KEY_NONE): | 655 def getMassiveLastGroupBlogs(self, publishers_type, publishers, max_items=10, profile_key=C.PROF_KEY_NONE): |
649 """Get the last published microblogs for a list of groups or jids | 656 """Get the last published microblogs for a list of groups or jids |
650 @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL") | 657 @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL") |
651 @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids) | 658 @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids) |
652 @param max_items: how many microblogs we want to get | 659 @param max_items: how many microblogs we want to get |
653 @param profile_key: profile key | 660 @param profile_key: profile key |
654 """ | 661 """ |
655 | |
656 def sendResult(result): | |
657 """send result of DeferredList (dict of jid => microblogs) to the calling method""" | |
658 | |
659 ret = {} | |
660 | |
661 for (success, value) in result: | |
662 if success: | |
663 source_jid, data = value | |
664 ret[source_jid] = data | |
665 | |
666 return ret | |
667 | |
668 def initialised(result): | |
669 profile, client = result | |
670 | |
671 if publishers_type == "ALL": | |
672 contacts = client.roster.getItems() | |
673 jids = [contact.jid.userhostJID() for contact in contacts] | |
674 elif publishers_type == "GROUP": | |
675 jids = [] | |
676 for _group in publishers: | |
677 jids.extend(client.roster.getJidsFromGroup(_group)) | |
678 elif publishers_type == 'JID': | |
679 jids = publishers | |
680 else: | |
681 raise UnknownType | |
682 | |
683 mblogs = [] | |
684 | |
685 for jid_ in jids: | |
686 d = self.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, self.getNodeName(jid_), | |
687 max_items=max_items, profile_key=profile_key) | |
688 d.addCallback(self._itemsConstruction, jid_, client) | |
689 d.addCallback(lambda gbdata, source_jid: (source_jid, gbdata), jid_.full()) | |
690 | |
691 mblogs.append(d) | |
692 # consume the failure "StanzaError with condition u'item-not-found'" | |
693 # when the node doesn't exist (e.g that JID hasn't posted any message) | |
694 dlist = defer.DeferredList(mblogs, consumeErrors=True) | |
695 dlist.addCallback(sendResult) | |
696 | |
697 return dlist | |
698 | |
699 #TODO: custom exception | 662 #TODO: custom exception |
700 if publishers_type not in ["GROUP", "JID", "ALL"]: | 663 if publishers_type not in ["GROUP", "JID", "ALL"]: |
701 raise Exception("Bad call, unknown publishers_type") | 664 raise Exception("Bad call, unknown publishers_type") |
702 if publishers_type == "ALL" and publishers: | 665 if publishers_type == "ALL" and publishers: |
703 raise Exception("Publishers list must be empty when getting microblogs for all contacts") | 666 raise Exception("Publishers list must be empty when getting microblogs for all contacts") |
704 return self._initialise(profile_key).addCallback(initialised) | 667 profile, client = yield self._initialise(profile_key) |
705 #TODO: we need to use the server corresponding the the host of the jid | 668 #TODO: we need to use the server corresponding the the host of the jid |
669 | |
670 if publishers_type == "ALL": | |
671 contacts = client.roster.getItems() | |
672 jids = [contact.jid.userhostJID() for contact in contacts] | |
673 elif publishers_type == "GROUP": | |
674 jids = [] | |
675 for _group in publishers: | |
676 jids.extend(client.roster.getJidsFromGroup(_group)) | |
677 elif publishers_type == 'JID': | |
678 jids = publishers | |
679 else: | |
680 raise UnknownType | |
681 | |
682 data = {publisher: self.getNodeName(publisher) for publisher in jids} | |
683 d_dict = yield self.host.plugins["XEP-0060"].getItemsFromMany(client.item_access_pubsub, data, max_items=max_items, profile_key=profile) | |
684 for publisher, d in d_dict.items(): | |
685 d.addCallback(self._itemsConstruction, publisher, client) | |
686 d.addCallback(lambda gbdata: (publisher.full(), gbdata)) | |
687 # consume the failure "StanzaError with condition u'item-not-found'" | |
688 # when the node doesn't exist (e.g that JID hasn't posted any message) | |
689 result = yield defer.DeferredList(d_dict.values(), consumeErrors=True) | |
690 defer.returnValue({value[0]: value[1] for success, value in result if success}) | |
706 | 691 |
707 def subscribeGroupBlog(self, pub_jid, profile_key=C.PROF_KEY_NONE): | 692 def subscribeGroupBlog(self, pub_jid, profile_key=C.PROF_KEY_NONE): |
708 def initialised(result): | 693 def initialised(result): |
709 profile, client = result | 694 profile, client = result |
710 d = self.host.plugins["XEP-0060"].subscribe(client.item_access_pubsub, self.getNodeName(jid.JID(pub_jid)), | 695 d = self.host.plugins["XEP-0060"].subscribe(client.item_access_pubsub, self.getNodeName(jid.JID(pub_jid)), |
719 publishers_jids = [jid.JID(publisher) for publisher in publishers] | 704 publishers_jids = [jid.JID(publisher) for publisher in publishers] |
720 else: | 705 else: |
721 publishers_jids = publishers | 706 publishers_jids = publishers |
722 return self.massiveSubscribeGroupBlogs(publishers_type, publishers_jids, profile_key) | 707 return self.massiveSubscribeGroupBlogs(publishers_type, publishers_jids, profile_key) |
723 | 708 |
709 @defer.inlineCallbacks | |
724 def massiveSubscribeGroupBlogs(self, publishers_type, publishers, profile_key=C.PROF_KEY_NONE): | 710 def massiveSubscribeGroupBlogs(self, publishers_type, publishers, profile_key=C.PROF_KEY_NONE): |
725 """Subscribe microblogs for a list of groups or jids | 711 """Subscribe microblogs for a list of groups or jids |
726 @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL") | 712 @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL") |
727 @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids) | 713 @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids) |
728 @param profile_key: profile key | 714 @param profile_key: profile key |
729 """ | 715 """ |
730 | |
731 def initialised(result): | |
732 profile, client = result | |
733 | |
734 if publishers_type == "ALL": | |
735 contacts = client.roster.getItems() | |
736 jids = [contact.jid.userhostJID() for contact in contacts] | |
737 elif publishers_type == "GROUP": | |
738 jids = [] | |
739 for _group in publishers: | |
740 jids.extend(client.roster.getJidsFromGroup(_group)) | |
741 elif publishers_type == 'JID': | |
742 jids = publishers | |
743 else: | |
744 raise UnknownType | |
745 | |
746 mblogs = [] | |
747 for jid_ in jids: | |
748 d = self.host.plugins["XEP-0060"].subscribe(client.item_access_pubsub, self.getNodeName(jid_), | |
749 profile_key=profile_key) | |
750 mblogs.append(d) | |
751 # consume the failure "StanzaError with condition u'item-not-found'" | |
752 # when the node doesn't exist (e.g that JID hasn't posted any message) | |
753 dlist = defer.DeferredList(mblogs, consumeErrors=True) | |
754 return dlist | |
755 | |
756 #TODO: custom exception | 716 #TODO: custom exception |
757 if publishers_type not in ["GROUP", "JID", "ALL"]: | 717 if publishers_type not in ["GROUP", "JID", "ALL"]: |
758 raise Exception("Bad call, unknown publishers_type") | 718 raise Exception("Bad call, unknown publishers_type") |
759 if publishers_type == "ALL" and publishers: | 719 if publishers_type == "ALL" and publishers: |
760 raise Exception("Publishers list must be empty when getting microblogs for all contacts") | 720 raise Exception("Publishers list must be empty when getting microblogs for all contacts") |
761 return self._initialise(profile_key).addCallback(initialised) | 721 profile, client = yield self._initialise(profile_key) |
762 #TODO: we need to use the server corresponding the the host of the jid | 722 #TODO: we need to use the server corresponding the the host of the jid |
723 | |
724 if publishers_type == "ALL": | |
725 contacts = client.roster.getItems() | |
726 jids = [contact.jid.userhostJID() for contact in contacts] | |
727 elif publishers_type == "GROUP": | |
728 jids = [] | |
729 for _group in publishers: | |
730 jids.extend(client.roster.getJidsFromGroup(_group)) | |
731 elif publishers_type == 'JID': | |
732 jids = publishers | |
733 else: | |
734 raise UnknownType | |
735 | |
736 node_ids = [self.getNodeName(publisher) for publisher in jids] | |
737 d_list = yield self.host.plugins["XEP-0060"].subscribeToMany(client.item_access_pubsub, node_ids, profile_key=profile_key) | |
738 # consume the failure "StanzaError with condition u'item-not-found'" | |
739 # when the node doesn't exist (e.g that JID hasn't posted any message) | |
740 result = yield defer.DeferredList(d_list, consumeErrors=True) | |
741 defer.returnValue(result) | |
763 | 742 |
764 def deleteAllGroupBlogsAndComments(self, profile_key=C.PROF_KEY_NONE): | 743 def deleteAllGroupBlogsAndComments(self, profile_key=C.PROF_KEY_NONE): |
765 """Delete absolutely all the microblog data that the user has posted""" | 744 """Delete absolutely all the microblog data that the user has posted""" |
766 calls = [self.deleteAllGroupBlogs(profile_key), self.deleteAllGroupBlogsComments(profile_key)] | 745 calls = [self.deleteAllGroupBlogs(profile_key), self.deleteAllGroupBlogsComments(profile_key)] |
767 return defer.DeferredList(calls) | 746 return defer.DeferredList(calls) |