Mercurial > libervia-backend
diff src/plugins/plugin_misc_groupblog.py @ 1268:bb30bf3ae932
plugins XEP-0060, XEP-0277, groupblog: make use of RSM (XEP-0059)
author | souliane <souliane@mailoo.org> |
---|---|
date | Mon, 15 Dec 2014 14:04:19 +0100 |
parents | b4a264915ea9 |
children | 1c90a913fbb9 |
line wrap: on
line diff
--- a/src/plugins/plugin_misc_groupblog.py Mon Dec 15 14:03:13 2014 +0100 +++ b/src/plugins/plugin_misc_groupblog.py Mon Dec 15 14:04:19 2014 +0100 @@ -26,6 +26,7 @@ from twisted.words.xish.domish import Element, generateElementsNamed from sat.core import exceptions from wokkel import disco, data_form, iwokkel +from wokkel import rsm as wokkel_rsm from zope.interface import implements from feed import date import uuid @@ -52,12 +53,17 @@ 'JID': None, #JID is not yet managed } +MAX_ITEMS = 5 +MAX_COMMENTS = 5 +DO_NOT_COUNT_COMMENTS = -1 # must be lower than 0 + PLUGIN_INFO = { "name": "Group blogging throught collections", "import_name": "GROUPBLOG", "type": "MISC", "protocols": [], "dependencies": ["XEP-0277"], + "recommendations": ["XEP-0059"], "main": "GroupBlog", "handler": "yes", "description": _("""Implementation of microblogging with roster access""") @@ -79,7 +85,6 @@ class UnknownType(Exception): pass - class GroupBlog(object): """This class use a SàT PubSub Service to manage access on microblog""" @@ -104,32 +109,27 @@ async=True) host.bridge.addMethod("getGroupBlogs", ".plugin", - in_sign='sass', out_sign='aa{ss}', + in_sign='sasa{ss}bs', out_sign='(aa{ss}a{ss})', method=self.getGroupBlogs, async=True) host.bridge.addMethod("getGroupBlogsWithComments", ".plugin", - in_sign='sass', out_sign='a(a{ss}aa{ss})', + in_sign='sasa{ss}is', out_sign='(a(a{ss}(aa{ss}a{ss}))a{ss})', method=self.getGroupBlogsWithComments, async=True) - host.bridge.addMethod("getLastGroupBlogs", ".plugin", - in_sign='sis', out_sign='aa{ss}', - method=self.getLastGroupBlogs, + host.bridge.addMethod("getGroupBlogsAtom", ".plugin", + in_sign='sa{ss}s', out_sign='s', + method=self.getGroupBlogsAtom, async=True) - host.bridge.addMethod("getLastGroupBlogsAtom", ".plugin", - in_sign='sis', out_sign='s', - method=self.getLastGroupBlogsAtom, - async=True) - - host.bridge.addMethod("getMassiveLastGroupBlogs", ".plugin", - in_sign='sasis', out_sign='a{saa{ss}}', - method=self._getMassiveLastGroupBlogs, + host.bridge.addMethod("getMassiveGroupBlogs", ".plugin", + in_sign='sasa{ss}s', out_sign='a{s(aa{ss}a{ss})}', + method=self._getMassiveGroupBlogs, async=True) host.bridge.addMethod("getGroupBlogComments", ".plugin", - in_sign='sss', out_sign='aa{ss}', + in_sign='ssa{ss}s', out_sign='(aa{ss}a{ss})', method=self.getGroupBlogComments, async=True) @@ -515,78 +515,108 @@ d_list.append(self.item2gbdata(item).addCallback(cb)) return defer.DeferredList(d_list, consumeErrors=True).addCallback(lambda result: [value for (success, value) in result if success]) - def __getGroupBlogs(self, pub_jid_s, max_items=10, item_ids=None, profile_key=C.PROF_KEY_NONE): + def _getOrCountComments(self, items, max=0, profile_key=C.PROF_KEY_NONE): + """Get and/or count the comments of the given items. + + @param items (list): items to consider. + @param max (int): maximum number of comments to get, if 0 only count + them. The count is set to the item data of key "comments_count". + @param profile_key (str): %(doc_profile_key)s + @return: a deferred list of: + - if max == 0: microblog data + - else: couple (dict, (list[dict], dict)) containing: + - microblog data (main item) + - couple (comments data, RSM response data for the comments) + """ + def comments_cb(comments_data, entry): + entry['comments_count'] = comments_data[1]['count'] + return (entry, comments_data) if max > 0 else entry + + assert(max >= 0) + d_list = [] + for entry in items: + if entry.get('comments', False): + comments_rsm = {'max': max} + d = self.getGroupBlogComments(entry['comments_service'], entry['comments_node'], rsm=comments_rsm, profile_key=profile_key) + d.addCallback(comments_cb, entry) + d_list.append(d) + else: + if max > 0: + d_list.append(defer.succeed((entry, ([], {})))) + else: + d_list.append(defer.succeed(entry)) + deferred_list = defer.DeferredList(d_list) + deferred_list.addCallback(lambda result: [value for (success, value) in result if success]) + return deferred_list + + def __getGroupBlogs(self, pub_jid_s, item_ids=None, rsm=None, max_comments=0, profile_key=C.PROF_KEY_NONE): """Retrieve previously published items from a publish subscribe node. + @param pub_jid_s: jid of the publisher - @param max_items: how many microblogs we want to get (see XEP-0060 #6.5.7) @param item_ids: list of microblogs items IDs - @param profile_key: profile key - @return: list of microblog data (dict) + @param rsm (dict): RSM request data + @param max_comments (int): maximum number of comments to retrieve + @param profile_key (str): %(doc_profile_key)s + @return: a deferred couple (list, dict) containing: + - list of: + - if max_comments == 0: microblog data + - else: couple (dict, (list[dict], dict)) containing: + - microblog data (main item) + - couple (comments data, RSM response data for the comments) + - RSM response data """ pub_jid = jid.JID(pub_jid_s) - def initialised(result): - profile, client = result - d = self.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, self.getNodeName(pub_jid), - max_items=max_items, item_ids=item_ids, profile_key=profile_key) - d.addCallback(self._itemsConstruction, pub_jid, client) - d.addErrback(lambda ignore: {}) # TODO: more complete error management (log !) - return d + def cb(items, client): + d = self._itemsConstruction(items, pub_jid, client) + if max_comments == DO_NOT_COUNT_COMMENTS: + return d + return d.addCallback(self._getOrCountComments, max_comments, profile_key) - #TODO: we need to use the server corresponding the the host of the jid - return self._initialise(profile_key).addCallback(initialised) + return DeferredItems(self, cb, None, profile_key).get(self.getNodeName(pub_jid), item_ids, rsm=rsm) - def getGroupBlogs(self, pub_jid_s, item_ids=None, profile_key=C.PROF_KEY_NONE): + def getGroupBlogs(self, pub_jid_s, item_ids=None, rsm=None, count_comments=True, profile_key=C.PROF_KEY_NONE): """Get the published microblogs of the specified IDs. If item_ids is - None, the result would be the same than calling getLastGroupBlogs + None, the result would be the same than calling getGroupBlogs with the default value for the attribute max_items. - @param pub_jid_s: jid of the publisher - @param item_ids: list of microblogs items IDs - @param profile_key: profile key - @return: list of microblog data (dict) - """ - return self.__getGroupBlogs(pub_jid_s, item_ids=item_ids, profile_key=profile_key) - def getGroupBlogsWithComments(self, pub_jid_s, item_ids=None, profile_key=C.PROF_KEY_NONE): - """Get the published microblogs of the specified IDs and their comments. If - item_ids is None, returns the last published microblogs and their comments. @param pub_jid_s: jid of the publisher @param item_ids: list of microblogs items IDs - @param profile_key: profile key - @return: list of couple (microblog data, list of microblog data) + @param rsm (dict): RSM request data + @param count_comments (bool): also count the comments if True + @param profile_key (str): %(doc_profile_key)s + @return: a deferred couple (list, dict) containing: + - list of microblog data + - RSM response data """ - def get_comments(data): - d_list = [] - for entry in data: - if entry.get('comments', False): - d = self.getGroupBlogComments(entry['comments_service'], entry['comments_node'], profile_key=profile_key) - d.addCallback(lambda data: (entry, data)) - d_list.append(d) - else: - d_list.append(defer.succeed((entry, []))) - deferred_list = defer.DeferredList(d_list) - deferred_list.addCallback(lambda result: [value for (success, value) in result if success]) - return deferred_list + max_comments = 0 if count_comments else DO_NOT_COUNT_COMMENTS + return self.__getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm=rsm, max_comments=max_comments, profile_key=profile_key) + + def getGroupBlogsWithComments(self, pub_jid_s, item_ids=None, rsm=None, max_comments=None, profile_key=C.PROF_KEY_NONE): + """Get the published microblogs of the specified IDs and their comments. If + item_ids is None, returns the last published microblogs and their comments. - d = self.__getGroupBlogs(pub_jid_s, item_ids=item_ids, profile_key=profile_key) - d.addCallback(get_comments) - return d + @param pub_jid_s: jid of the publisher + @param item_ids: list of microblogs items IDs + @param rsm (dict): RSM request data + @param max_comments (int): maximum number of comments to retrieve + @param profile_key (str): %(doc_profile_key)s + @return: a deferred couple (list, dict) containing: + - list of couple (dict, (list[dict], dict)) containing: + - microblog data (main item) + - couple (comments data, RSM response data for the comments) + - RSM response data + """ + if max_comments is None: + max_comments = MAX_COMMENTS + assert(max_comments > 0) # otherwise the return signature is not the same + return self.__getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm=rsm, max_comments=max_comments, profile_key=profile_key) - def getLastGroupBlogs(self, pub_jid_s, max_items=10, profile_key=C.PROF_KEY_NONE): - """Get the last published microblogs - @param pub_jid_s: jid of the publisher - @param max_items: how many microblogs we want to get (see XEP-0060 #6.5.7) - @param profile_key: profile key - @return: list of microblog data (dict) - """ - return self.__getGroupBlogs(pub_jid_s, max_items=max_items, profile_key=profile_key) - - def getLastGroupBlogsAtom(self, pub_jid_s, max_items=10, profile_key=C.PROF_KEY_NONE): + def getGroupBlogsAtom(self, pub_jid_s, rsm=None, profile_key=C.PROF_KEY_NONE): """Get the atom feed of the last published microblogs @param pub_jid: jid of the publisher - @param max_items: how many microblogs we want to get (see XEP-0060 #6.5.7) @param profile_key: profile key - @return: atom XML feed (unicode) + @return: a deferred unicode (atom XML feed) """ pub_jid = jid.JID(pub_jid_s) @@ -617,59 +647,41 @@ feed += " " + entry.toXml() + "\n" return feed + "</feed>" - def initialised(result): - profile, client = result - d = self.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, self.getNodeName(pub_jid), - max_items=max_items, profile_key=profile_key) - d.addCallback(items2feed, pub_jid, client) - d.addErrback(lambda ignore: '') # TODO: more complete error management (log !) - return d + def cb(items, client): + return items2feed(items, pub_jid, client) - #TODO: we need to use the server corresponding the the host of the jid - return self._initialise(profile_key).addCallback(initialised) + d = DeferredItems(self, cb, lambda dummy: '', profile_key).get(self.getNodeName(pub_jid), rsm=rsm) + return d.addCallback(lambda res: res[0]) - def getGroupBlogComments(self, service_s, node, profile_key=C.PROF_KEY_NONE): + def getGroupBlogComments(self, service_s, node, rsm=None, profile_key=C.PROF_KEY_NONE): """Get all comments of given node @param service_s: service hosting the node @param node: comments node @param profile_key: profile key - @return: list of microblog data (dict) + @return: a deferred couple (list, dict) containing: + - list of microblog data + - RSM response data """ service = jid.JID(service_s) - def initialised(result): - profile, client = result - d = self.host.plugins["XEP-0060"].getItems(service, node, - profile_key=profile_key) - d.addCallback(self._handleCommentsItems, service, node) - d.addErrback(lambda ignore: {}) # TODO: more complete error management (log !) - return d + def cb(items, client): + return self._handleCommentsItems(items, service, node) - #TODO: we need to use the server corresponding the the host of the jid - return self._initialise(profile_key).addCallback(initialised) + return DeferredItems(self, cb, None, profile_key).get(node, rsm=rsm) - def _getMassiveLastGroupBlogs(self, publishers_type, publishers, max_items=10, profile_key=C.PROF_KEY_NONE): + def _getMassiveGroupBlogs(self, publishers_type, publishers, rsm=None, profile_key=C.PROF_KEY_NONE): if publishers_type == 'JID': publishers_jids = [jid.JID(publisher) for publisher in publishers] else: publishers_jids = publishers - return self.getMassiveLastGroupBlogs(publishers_type, publishers_jids, max_items, profile_key) + return self.getMassiveGroupBlogs(publishers_type, publishers_jids, rsm, profile_key) - @defer.inlineCallbacks - def getMassiveLastGroupBlogs(self, publishers_type, publishers, max_items=10, profile_key=C.PROF_KEY_NONE): - """Get the last published microblogs for a list of groups or jids - @param publishers_type: type of the list of publishers (one of "GROUP" or "JID" or "ALL") - @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids) - @param max_items: how many microblogs we want to get - @param profile_key: profile key - """ + def _getPublishersJIDs(self, publishers_type, publishers, client): #TODO: custom exception if publishers_type not in ["GROUP", "JID", "ALL"]: raise Exception("Bad call, unknown publishers_type") if publishers_type == "ALL" and publishers: raise Exception("Publishers list must be empty when getting microblogs for all contacts") - profile, client = yield self._initialise(profile_key) - #TODO: we need to use the server corresponding the the host of the jid if publishers_type == "ALL": contacts = client.roster.getItems() @@ -682,20 +694,26 @@ jids = publishers else: raise UnknownType - - data = {publisher: self.getNodeName(publisher) for publisher in jids} - d_dict = yield self.host.plugins["XEP-0060"].getItemsFromMany(client.item_access_pubsub, data, max_items=max_items, profile_key=profile) + return jids - def cb(jid): - def res(gbdata): - return (jid.full(), gbdata) - return res + def getMassiveGroupBlogs(self, publishers_type, publishers, rsm=None, profile_key=C.PROF_KEY_NONE): + """Get the last published microblogs for a list of groups or jids + @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL") + @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids) + @param rsm (dict): RSM request data, common to all publishers + @param profile_key: profile key + @return: a deferred dict with: + - key: publisher (unicode) + - value: couple (list[dict], dict) with: + - the microblogs data + - RSM response data + """ + def cb(items, publisher, client): + d = self._itemsConstruction(items, publisher, client) + return d.addCallback(self._getOrCountComments, False, profile_key) - for publisher, d in d_dict.items(): - d.addCallback(self._itemsConstruction, publisher, client) - d.addCallback(cb(publisher)) - result = yield defer.DeferredList(d_dict.values(), consumeErrors=False) - defer.returnValue({value[0]: value[1] for success, value in result if success}) + #TODO: we need to use the server corresponding to the host of the jid + return DeferredItemsFromMany(self, cb, profile_key).get(publishers_type, publishers, rsm=rsm) def subscribeGroupBlog(self, pub_jid, profile_key=C.PROF_KEY_NONE): def initialised(result): @@ -721,30 +739,14 @@ @param publishers: list of publishers, according to "publishers_type" (list of groups or list of jids) @param profile_key: profile key """ - #TODO: custom exception - if publishers_type not in ["GROUP", "JID", "ALL"]: - raise Exception("Bad call, unknown publishers_type") - if publishers_type == "ALL" and publishers: - raise Exception("Publishers list must be empty when getting microblogs for all contacts") profile, client = yield self._initialise(profile_key) #TODO: we need to use the server corresponding the the host of the jid - if publishers_type == "ALL": - contacts = client.roster.getItems() - jids = [contact.jid.userhostJID() for contact in contacts] - elif publishers_type == "GROUP": - jids = [] - for _group in publishers: - jids.extend(client.roster.getJidsFromGroup(_group)) - elif publishers_type == 'JID': - jids = publishers - else: - raise UnknownType - + jids = self._getPublishersJIDs(publishers_type, publishers, client) node_ids = [self.getNodeName(publisher) for publisher in jids] d_list = yield self.host.plugins["XEP-0060"].subscribeToMany(client.item_access_pubsub, node_ids, profile_key=profile_key) result = yield defer.DeferredList(d_list, consumeErrors=False) - defer.returnValue(result) + defer.returnValue(None) def deleteAllGroupBlogsAndComments(self, profile_key=C.PROF_KEY_NONE): """Delete absolutely all the microblog data that the user has posted""" @@ -752,18 +754,26 @@ return defer.DeferredList(calls) def deleteAllGroupBlogs(self, profile_key=C.PROF_KEY_NONE): - """Delete all the main items and their comments that the user has posted + """Delete all the main items that the user has posted and their comments. """ def initialised(result): profile, client = result service = client.item_access_pubsub jid_ = client.jid + main_node = self.getNodeName(jid_) - main_node = self.getNodeName(jid_) - d = self.host.plugins["XEP-0060"].deleteNode(service, main_node, profile_key=profile) + def cb(nodes): + d_list = [] + for node in [node for node in nodes if node.endswith(main_node)]: + d = self.host.plugins["XEP-0060"].deleteNode(service, node, profile_key=profile) + d.addErrback(lambda failure: log.error(_("Deletion of node %(node)s failed: %(message)s") % + {'node': node, 'message': failure.getErrorMessage()})) + d_list.append(d) + return defer.DeferredList(d_list) + + d = self.host.plugins["XEP-0060"].listNodes(service, profile=profile) + d.addCallback(cb) d.addCallback(lambda dummy: log.info(_("All microblog's main items from %s have been deleted!") % jid_.userhost())) - d.addErrback(lambda failure: log.error(_("Deletion of node %(node)s failed: %(message)s") % - {'node': main_node, 'message': failure.getErrorMessage()})) return d return self._initialise(profile_key).addCallback(initialised) @@ -782,9 +792,11 @@ jids = [contact.jid.userhostJID() for contact in client.roster.getItems()] blogs = [] for jid_ in jids: + if jid_ == client.jid.userhostJID(): + continue # do not remove the comments on our own node main_node = self.getNodeName(jid_) d = self.host.plugins["XEP-0060"].getItems(service, main_node, profile_key=profile) - d.addCallback(getComments, client) + d.addCallback(lambda res: getComments(res[0], client)) d.addErrback(lambda failure, main_node: log.error(_("Retrieval of items for node %(node)s failed: %(message)s") % {'node': main_node, 'message': failure.getErrorMessage()}), main_node) blogs.append(d) @@ -807,7 +819,7 @@ href = link.getAttribute('href') service, node = self.host.plugins['XEP-0277'].parseCommentUrl(href) d = self.host.plugins["XEP-0060"].getItems(service, node, profile_key=profile_key) - d.addCallback(lambda items, service, node: (service, node, items), service, node) + d.addCallback(lambda items: (service, node, items[0])) d.addErrback(lambda failure, node: log.error(_("Retrieval of comments for node %(node)s failed: %(message)s") % {'node': node, 'message': failure.getErrorMessage()}), node) comments.append(d) @@ -850,6 +862,116 @@ return self._initialise(profile_key).addCallback(initialised) +class DeferredItems(): + """Helper class to retrieve items using XEP-0060""" + + def __init__(self, parent, cb, eb=None, profile_key=C.PROF_KEY_NONE): + """ + @param parent (GroupBlog): GroupBlog instance + @param cb (callable): callback method to be applied on items + @param eb (callable): errback method to be applied on items + @param profile_key (str): %(doc_profile_key)s + """ + self.parent = parent + self.cb = cb + self.eb = (lambda dummy: []) if eb is None else eb + self.profile_key = profile_key + + def get(self, node, item_ids=None, sub_id=None, rsm=None): + """ + @param node (str): node identifier. + @param item_ids (list[str]): list of items identifiers. + @param sub_id (str): optional subscription identifier. + @param rsm (dict): RSM request data + @return: a deferred couple (list, dict) containing: + - list of microblog data + - RSM response data + """ + if rsm is None: + rsm = {'max': (len(item_ids) if item_ids else MAX_ITEMS)} + + def initialised(result): + profile, client = result + rsm_ = wokkel_rsm.RSMRequest(**rsm) + d = self.parent.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, + node, rsm_.max, + item_ids, sub_id, rsm_, + profile_key=profile) + + def cb(result): + d = defer.maybeDeferred(self.cb, result[0], client) + return d.addCallback(lambda items: (items, result[1])) + + d.addCallbacks(cb, self.eb) + return d + + #TODO: we need to use the server corresponding to the host of the jid + return self.parent._initialise(self.profile_key).addCallback(initialised) + + +class DeferredItemsFromMany(): + def __init__(self, parent, cb, profile_key=C.PROF_KEY_NONE): + """ + @param parent (GroupBlog): GroupBlog instance + @param cb (callable): callback method to be applied on items + @param profile_key (str): %(doc_profile_key)s + """ + self.parent = parent + self.cb = cb + self.profile_key = profile_key + + def __buildData(self, publishers_type, publishers, client): + jids = self.parent._getPublishersJIDs(publishers_type, publishers, client) + return {publisher: self.parent.getNodeName(publisher) for publisher in jids} + + def get(self, publishers_type, publishers, sub_id=None, rsm=None): + """ + @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL") + @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids) + @param sub_id (str): optional subscription identifier. + @param rsm (dict): RSM request data + @return: a deferred dict with: + - key: publisher (unicode) + - value: couple (list[dict], dict) with: + - the microblogs data + - RSM response data + """ + if rsm is None: + rsm = {'max': MAX_ITEMS} + + def initialised(result): + profile, client = result + + data = self.__buildData(publishers_type, publishers, client) + rsm_ = wokkel_rsm.RSMRequest(**rsm) + d = self.parent.host.plugins["XEP-0060"].getItemsFromMany(client.item_access_pubsub, + data, rsm_.max, sub_id, + rsm_, profile_key=profile) + + def cb(publisher): + def callback(result): + d = defer.maybeDeferred(self.cb, result[0], publisher, client) + d.addCallback(lambda items: (publisher.full(), (items, result[1]))) + return d + return callback + + def cb_list(result): + return {value[0]: value[1] for success, value in result if success} + + def main_cb(result): + d_list = [] + for publisher, d_items in result.items(): + # XXX: trick needed as publisher is a loop variable + d_list.append(d_items.addCallback(cb(publisher))) + return defer.DeferredList(d_list, consumeErrors=False).addCallback(cb_list) + + d.addCallback(main_cb) + return d + + #TODO: we need to use the server corresponding to the host of the jid + return self.parent._initialise(self.profile_key).addCallback(initialised) + + class GroupBlog_handler(XMPPHandler): implements(iwokkel.IDisco)