Mercurial > libervia-backend
view src/plugins/plugin_misc_groupblog.py @ 1626:63cef4dbf2a4
core, plugins file, XEP-0234, bridge: progression api enhancement:
- progressStarted has a new metadata parameter, useful to know the kind of progression, direction, etc. Check bridge doc
- progressGetAllMetadata can be used to retrieve this data and discover currently running progressions
- progressFinished also has a new metadata parameter, used e.g. to indicate that the hash is checked
- core: fixed progressGetAll
- file, XEP-0234: implemented the API modifications, hash is returned on progressFinished
- file: SatFile.checkSize allows checking the size independently of close (be sure that all the data have been transferred though)
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 19 Nov 2015 18:13:26 +0100 |
parents | e0bde0d0b321 |
children | 9aa2a703e460 |
line wrap: on
line source
#!/usr/bin/python # -*- coding: utf-8 -*- # SAT plugin for microbloging with roster access # Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sat.core.i18n import _ from sat.core.constants import Const as C from sat.core.log import getLogger log = getLogger(__name__) from twisted.internet import defer from twisted.words.protocols.jabber import jid from twisted.words.xish.domish import generateElementsNamed from sat.core import exceptions from wokkel import disco, data_form, iwokkel from wokkel import rsm from zope.interface import implements # import uuid try: from twisted.words.protocols.xmlstream import XMPPHandler except ImportError: from wokkel.subprotocols import XMPPHandler NS_PUBSUB = 'http://jabber.org/protocol/pubsub' NS_GROUPBLOG = 'http://goffi.org/protocol/groupblog' NS_NODE_PREFIX = 'urn:xmpp:groupblog:' #NS_PUBSUB_EXP = 'http://goffi.org/protocol/pubsub' #for non official features NS_PUBSUB_EXP = NS_PUBSUB # XXX: we can't use custom namespace as Wokkel's PubSubService use official NS NS_PUBSUB_ITEM_ACCESS = NS_PUBSUB_EXP + "#item-access" NS_PUBSUB_GROUPBLOG = NS_PUBSUB_EXP + "#groupblog" NS_PUBSUB_CREATOR_JID_CHECK = NS_PUBSUB_EXP + "#creator-jid-check" NS_PUBSUB_ITEM_CONFIG = NS_PUBSUB_EXP + "#item-config" NS_PUBSUB_AUTO_CREATE = NS_PUBSUB + "#auto-create" 
ACCESS_TYPE_MAP = { 'PUBLIC': 'open', 'GROUP': 'roster', 'JID': None, #JID is not yet managed } MAX_ITEMS = 5 MAX_COMMENTS = 5 DO_NOT_COUNT_COMMENTS = -1 # must be lower than 0 PLUGIN_INFO = { "name": "Group blogging throught collections", "import_name": "GROUPBLOG", "type": "MISC", "protocols": [], "dependencies": ["XEP-0277"], "recommendations": ["XEP-0059"], "main": "GroupBlog", "handler": "yes", "description": _("""Implementation of microblogging with roster access""") } class NoCompatiblePubSubServerFound(Exception): pass class BadAccessTypeError(Exception): pass class BadAccessListError(Exception): pass class UnknownType(Exception): pass class GroupBlog(object): """This class use a SàT PubSub Service to manage access on microblog""" def __init__(self, host): log.info(_("Group blog plugin initialization")) self.host = host host.bridge.addMethod("sendGroupBlog", ".plugin", in_sign='sassa{ss}s', out_sign='', method=self.sendGroupBlog, async=True) host.bridge.addMethod("deleteGroupBlog", ".plugin", in_sign='(sss)ss', out_sign='', method=self.deleteGroupBlog, async=True) host.bridge.addMethod("updateGroupBlog", ".plugin", in_sign='(sss)ssa{ss}s', out_sign='', method=self.updateGroupBlog, async=True) host.bridge.addMethod("sendGroupBlogComment", ".plugin", in_sign='ssa{ss}s', out_sign='', method=self.sendGroupBlogComment, async=True) # host.bridge.addMethod("getGroupBlogs", ".plugin", # in_sign='sasa{ss}bs', out_sign='(aa{ss}a{ss})', # method=self.getGroupBlogs, # async=True) host.bridge.addMethod("getGroupBlogsWithComments", ".plugin", in_sign='sasa{ss}is', out_sign='(a(a{ss}(aa{ss}a{ss}))a{ss})', method=self.getGroupBlogsWithComments, async=True) # host.bridge.addMethod("getMassiveGroupBlogs", ".plugin", # in_sign='sasa{ss}s', out_sign='a{s(aa{ss}a{ss})}', # method=self._getMassiveGroupBlogs, # async=True) # host.bridge.addMethod("getGroupBlogComments", ".plugin", # in_sign='ssa{ss}s', out_sign='(aa{ss}a{ss})', # method=self.getGroupBlogComments, # async=True) # 
host.bridge.addMethod("subscribeGroupBlog", ".plugin", in_sign='ss', out_sign='', # method=self.subscribeGroupBlog, # async=True) # host.trigger.add("PubSubItemsReceived", self.pubSubItemsReceivedTrigger) ## plugin management methods ## def getHandler(self, profile): return GroupBlog_handler() @defer.inlineCallbacks def profileConnected(self, profile): client = self.host.getClient(profile) try: yield self.host.checkFeatures((NS_PUBSUB_GROUPBLOG, NS_PUBSUB_AUTO_CREATE), profile=profile) except exceptions.FeatureNotFound: client.server_groupblog_available = False log.warning(_(u"Server is not able to manage item-access pubsub, we can't use group blog")) else: client.server_groupblog_available = True log.info(_(u"Server can manage group blogs")) def getFeatures(self, profile): try: client = self.host.getClient(profile) except exceptions.ProfileNotSetError: return {} try: return {'available': C.boolConst(client.server_groupblog_available)} except AttributeError: if self.host.isConnected(profile): log.debug("Profile is not connected, service is not checked yet") else: log.error("Service should be available !") return {} @defer.inlineCallbacks def _initialise(self, profile_key): """Check that the data for this profile are initialised, and do it else @param profile_key: %(doc_profile)s""" profile = self.host.memory.getProfileName(profile_key) if not profile: raise exceptions.ProfileUnknownError client = self.host.getClient(profile) #we first check that we have a item-access pubsub server if not hasattr(client, "item_access_pubsub"): log.debug(_('Looking for item-access powered pubsub server')) #we don't have any pubsub server featuring item access yet item_access_pubsubs = yield self.host.findFeaturesSet((NS_PUBSUB_AUTO_CREATE, NS_PUBSUB_CREATOR_JID_CHECK), "pubsub", "service", profile=profile) # item_access_pubsubs = yield self.host.findFeaturesSet((NS_PUBSUB_ITEM_ACCESS, NS_PUBSUB_AUTO_CREATE, NS_PUBSUB_CREATOR_JID_CHECK), "pubsub", "service", profile_key=profile) try: 
client.item_access_pubsub = item_access_pubsubs.pop() log.info(_(u"item-access powered pubsub service found: [%s]") % client.item_access_pubsub.full()) except KeyError: client.item_access_pubsub = None if not client.item_access_pubsub: log.error(_(u"No item-access powered pubsub server found, can't use group blog")) raise NoCompatiblePubSubServerFound defer.returnValue((profile, client)) # def pubSubItemsReceivedTrigger(self, event, profile): # """"Trigger which catch groupblogs events""" # if event.nodeIdentifier.startswith(NS_NODE_PREFIX): # # Microblog # publisher = jid.JID(event.nodeIdentifier[len(NS_NODE_PREFIX):]) # origin_host = publisher.host.split('.') # event_host = event.sender.host.split('.') # #FIXME: basic origin check, must be improved # #TODO: automatic security test # if (not (origin_host) # or len(event_host) < len(origin_host) # or event_host[-len(origin_host):] != origin_host): # log.warning(u"Host incoherence between %s and %s (hack attempt ?)" % (unicode(event.sender), # unicode(publisher))) # return False # client = self.host.getClient(profile) # def gbdataManagementMicroblog(gbdata): # for gbdatum in gbdata: # self.host.bridge.personalEvent(publisher.full(), "MICROBLOG", gbdatum, profile) # d = self._itemsConstruction(event.items, publisher, client) # d.addCallback(gbdataManagementMicroblog) # return False # elif event.nodeIdentifier.startswith(NS_COMMENT_PREFIX): # # Comment # def gbdataManagementComments(gbdata): # for gbdatum in gbdata: # publisher = None # FIXME: see below (_handleCommentsItems) # self.host.bridge.personalEvent(publisher.full() if publisher else gbdatum["author"], "MICROBLOG", gbdatum, profile) # d = self._handleCommentsItems(event.items, event.sender, event.nodeIdentifier) # d.addCallback(gbdataManagementComments) # return False # return True ## internal helping methodes ## def _handleCommentsItems(self, items, service, node_identifier): """ Convert comments items to groupblog data, and send them as signals @param 
items: comments items @param service: jid of the PubSub service used @param node_identifier: comments node @return: deferred list of group blog data """ d_list = [] def cb(microblog_data): publisher = "" # FIXME: publisher attribute for item in SàT pubsub is not managed yet, so # publisher is not checked and can be easily spoofed. This need to be fixed # quickly. microblog_data["service"] = service.userhost() microblog_data["node"] = node_identifier microblog_data["verified_publisher"] = "true" if publisher else "false" return microblog_data for item in items: d_list.append(self.item2gbdata(item, "comment").addCallback(cb)) return defer.DeferredList(d_list, consumeErrors=True).addCallback(lambda result: [value for (success, value) in result if success]) def _parseAccessData(self, microblog_data, item): P = self.host.plugins["XEP-0060"] form_elts = [child for child in item.elements() if child.name == "x"] for form_elt in form_elts: form = data_form.Form.fromElement(form_elt) if (form.formNamespace == NS_PUBSUB_ITEM_CONFIG): access_model = form.get(P.OPT_ACCESS_MODEL, 'open') if access_model == "roster": try: # FIXME: groups are xs:string, so they can contain "\n" ! 
This code is bugged microblog_data["groups"] = '\n'.join(form.fields[P.OPT_ROSTER_GROUPS_ALLOWED].values) except KeyError: log.warning("No group found for roster access-model") microblog_data["groups"] = '' break @defer.inlineCallbacks def item2gbdata(self, item, _type="main_item"): """ Convert item to microblog data dictionary + add access data """ microblog_data = yield self.host.plugins["XEP-0277"].item2mbdata(item) microblog_data["type"] = _type self._parseAccessData(microblog_data, item) defer.returnValue(microblog_data) def getNodeName(self, publisher): """Retrieve the name of publisher's node @param publisher: publisher's jid @return: node's name (string) """ return NS_NODE_PREFIX + publisher.userhost() ## publish ## def _publishMblog(self, service, client, access_type, access_list, message, extra): """Actually publish the message on the group blog @param service: jid of the item-access pubsub service @param client: SatXMPPClient of the publisher @param access_type: one of "PUBLIC", "GROUP", "JID" @param access_list: set of entities (empty list for all, groups or jids) allowed to see the item @param message: message to publish @param extra: dict which option name as key, which can be: - allow_comments: True to accept comments, False else (default: False) - rich: if present, contain rich text in currently selected syntax """ node_name = self.getNodeName(client.jid) mblog_data = {'content': message} for attr in ['content_rich', 'title', 'title_rich']: if attr in extra and extra[attr]: mblog_data[attr] = extra[attr] P = self.host.plugins["XEP-0060"] access_model_value = ACCESS_TYPE_MAP[access_type] if extra.get('allow_comments', 'False').lower() == 'true': # XXX: use the item identifier? 
http://bugs.goffi.org/show_bug.cgi?id=63 comments_node = self._fillCommentsElement(mblog_data, None, node_name, service) _options = {P.OPT_ACCESS_MODEL: access_model_value, P.OPT_PERSIST_ITEMS: 1, P.OPT_MAX_ITEMS: -1, P.OPT_DELIVER_PAYLOADS: 1, P.OPT_SEND_ITEM_SUBSCRIBE: 1, P.OPT_PUBLISH_MODEL: "subscribers", # TODO: should be open if *both* node and item access_model are open (public node and item) } if access_model_value == 'roster': _options[P.OPT_ROSTER_GROUPS_ALLOWED] = list(access_list) # FIXME: check comments node creation success, at the moment this is a potential security risk (if the node # already exists, the creation will silently fail, but the comments link will stay the same, linking to a # node owned by somebody else) self.host.plugins["XEP-0060"].createNode(service, comments_node, _options, profile_key=client.profile) def itemCreated(mblog_item): form = data_form.Form('submit', formNamespace=NS_PUBSUB_ITEM_CONFIG) if access_type == "PUBLIC": if access_list: raise BadAccessListError("access_list must be empty for PUBLIC access") access = data_form.Field(None, P.OPT_ACCESS_MODEL, value=access_model_value) form.addField(access) elif access_type == "GROUP": access = data_form.Field(None, P.OPT_ACCESS_MODEL, value=access_model_value) allowed = data_form.Field(None, P.OPT_ROSTER_GROUPS_ALLOWED, values=access_list) form.addField(access) form.addField(allowed) mblog_item.addChild(form.toElement()) elif access_type == "JID": raise NotImplementedError else: log.error(_("Unknown access_type")) raise BadAccessTypeError defer_blog = self.host.plugins["XEP-0060"].publish(service, node_name, items=[mblog_item], profile_key=client.profile) defer_blog.addErrback(self._mblogPublicationFailed) return defer_blog entry_d = self.host.plugins["XEP-0277"].data2entry(mblog_data, client.profile) entry_d.addCallback(itemCreated) return entry_d # def _fillCommentsElement(self, mblog_data, entry_id, node_name, service_jid): # """ # @param mblog_data: dict containing the 
microblog data # @param entry_id: unique identifier of the entry # @param node_name: the pubsub node name # @param service_jid: the JID of the pubsub service # @return: the comments node string # """ # if entry_id is None: # entry_id = unicode(uuid.uuid4()) # comments_node = "%s_%s__%s" % (NS_COMMENT_PREFIX, entry_id, node_name) # mblog_data['comments'] = "xmpp:%(service)s?%(query)s" % {'service': service_jid.userhost(), # 'query': urllib.urlencode([('node', comments_node.encode('utf-8'))])} # return comments_node def _mblogPublicationFailed(self, failure): #TODO return failure def sendGroupBlog(self, access_type, access_list, message, extra, profile_key=C.PROF_KEY_NONE): """Publish a microblog with given item access @param access_type: one of "PUBLIC", "GROUP", "JID" @param access_list: list of authorized entity (empty list for PUBLIC ACCESS, list of groups or list of jids) for this item @param message: microblog @param extra: dict which option name as key, which can be: - allow_comments: True to accept comments, False else (default: False) - rich: if present, contain rich text in currently selected syntax @profile_key: %(doc_profile_key)s """ def initialised(result): profile, client = result if access_type == "PUBLIC": if access_list: raise Exception("Publishers list must be empty when getting microblogs for all contacts") return self._publishMblog(client.item_access_pubsub, client, "PUBLIC", [], message, extra) elif access_type == "GROUP": _groups = set(access_list).intersection(client.roster.getGroups()) # We only keep group which actually exist if not _groups: raise BadAccessListError("No valid group") return self._publishMblog(client.item_access_pubsub, client, "GROUP", _groups, message, extra) elif access_type == "JID": raise NotImplementedError else: log.error(_("Unknown access type")) raise BadAccessTypeError return self._initialise(profile_key).addCallback(initialised) def sendGroupBlogComment(self, node_url, message, extra, profile_key=C.PROF_KEY_NONE): 
"""Publish a comment in the given node @param node url: link to the comments node as specified in XEP-0277 and given in microblog data's comments key @param message: comment @param extra: dict which option name as key, which can be: - allow_comments: True to accept an other level of comments, False else (default: False) - rich: if present, contain rich text in currently selected syntax @profile_key: %(doc_profile)s """ def initialised(result): profile, client = result service, node = self.host.plugins["XEP-0277"].parseCommentUrl(node_url) mblog_data = {'content': message} for attr in ['content_rich', 'title', 'title_rich']: if attr in extra and extra[attr]: mblog_data[attr] = extra[attr] if 'allow_comments' in extra: raise NotImplementedError # TODO entry_d = self.host.plugins["XEP-0277"].data2entry(mblog_data, profile) entry_d.addCallback(lambda mblog_item: self.host.plugins["XEP-0060"].publish(service, node, items=[mblog_item], profile_key=profile)) return entry_d return self._initialise(profile_key).addCallback(initialised) def _itemsConstruction(self, items, pub_jid, client): """ Transforms items to group blog data and manage comments node @param items: iterable of items @param pub_jid: jid of the publisher or None to use items data @param client: SatXMPPClient instance @return: deferred which fire list of group blog data """ # TODO: use items data when pub_jid is None d_list = [] @defer.inlineCallbacks def cb(gbdata): try: gbdata['service'] = client.item_access_pubsub.full() except AttributeError: log.warning(_(u"Pubsub service is unknown for blog entry %s") % gbdata['id']) # every comments node must be subscribed, except if we are the publisher (we are already subscribed in this case) if "comments_node" in gbdata and pub_jid.userhostJID() != client.jid.userhostJID(): try: service = jid.JID(gbdata["comments_service"]) node = gbdata["comments_node"] except KeyError: log.error(_(u"Missing key for blog comment %s") % gbdata['id']) defer.returnValue(gbdata) # 
TODO: see if it is really needed to check for not subscribing twice to the node # It previously worked without this check, but the pubsub service logs were polluted # or, if in debug mode, it made sat-pubsub very difficult to debug. subscribed_nodes = yield self.host.plugins['XEP-0060'].listSubscribedNodes(service, profile=client.profile) if node not in subscribed_nodes: # avoid sat-pubsub "SubscriptionExists" error self.host.plugins["XEP-0060"].subscribe(service, node, profile_key=client.profile) defer.returnValue(gbdata) for item in items: d_list.append(self.item2gbdata(item).addCallback(cb)) return defer.DeferredList(d_list, consumeErrors=True).addCallback(lambda result: [value for (success, value) in result if success]) ## modify ## def updateGroupBlog(self, pub_data, comments, message, extra, profile_key=C.PROF_KEY_NONE): """Modify a microblog node @param pub_data: a tuple (service, node identifier, item identifier) @param comments: comments node identifier (for main item) or empty string @param message: new message @param extra: dict which option name as key, which can be: - allow_comments: True to accept an other level of comments, False else (default: False) - rich: if present, contain rich text in currently selected syntax @param profile_key: %(doc_profile) """ def initialised(result): profile, client = result mblog_data = {'content': message} for attr in ['content_rich', 'title', 'title_rich']: if attr in extra and extra[attr]: mblog_data[attr] = extra[attr] service, node, item_id = pub_data service_jid = jid.JID(service) if service else client.item_access_pubsub if comments or not node: # main item node = self.getNodeName(client.jid) mblog_data['id'] = unicode(item_id) if 'published' in extra: mblog_data['published'] = extra['published'] if extra.get('allow_comments', 'False').lower() == 'true': comments_service, comments_node = self.host.plugins["XEP-0277"].parseCommentUrl(comments) # we could use comments_node directly but it's safer to rebuild it # 
XXX: use the item identifier? http://bugs.goffi.org/show_bug.cgi?id=63 entry_id = comments_node.split('_')[1].split('__')[0] self._fillCommentsElement(mblog_data, entry_id, node, service_jid) entry_d = self.host.plugins["XEP-0277"].data2entry(mblog_data, profile) entry_d.addCallback(lambda mblog_item: self.host.plugins["XEP-0060"].publish(service_jid, node, items=[mblog_item], profile_key=profile)) entry_d.addErrback(lambda failure: log.error(u"Modification of %s failed: %s" % (pub_data, failure.getErrorMessage()))) return entry_d return self._initialise(profile_key).addCallback(initialised) ## get ## def _getOrCountComments(self, items, max_=0, profile_key=C.PROF_KEY_NONE): """Get and/or count the comments of the given items. @param items (list): items to consider. @param max_ (int): maximum number of comments to get, if 0 only count them. The count is set to the item data of key "comments_count". @param profile_key (str): %(doc_profile_key)s @return: a deferred list of: - if max_ == 0: microblog data - else: couple (dict, (list[dict], dict)) containing: - microblog data (main item) - couple (comments data, RSM response data for the comments) """ def comments_cb(comments_data, entry): try: entry['comments_count'] = comments_data[1]['count'] except KeyError: # target pubsub server probably doesn't handle RSM pass return (entry, comments_data) if max_ > 0 else entry assert max_ >= 0 d_list = [] for entry in items: if entry.get('comments', False): comments_rsm = {'max_': max_} d = self.getGroupBlogComments(entry['comments_service'], entry['comments_node'], rsm_data=comments_rsm, profile_key=profile_key) d.addCallback(comments_cb, entry) d_list.append(d) else: if max_ > 0: d_list.append(defer.succeed((entry, ([], {})))) else: d_list.append(defer.succeed(entry)) deferred_list = defer.DeferredList(d_list) deferred_list.addCallback(lambda result: [value for (success, value) in result if success]) return deferred_list def _getGroupBlogs(self, pub_jid_s, item_ids=None, 
rsm_data=None, max_comments=0, profile_key=C.PROF_KEY_NONE): """Retrieve previously published items from a publish subscribe node. @param pub_jid_s: jid of the publisher @param item_ids: list of microblogs items IDs @param rsm_data (dict): RSM request data @param max_comments (int): maximum number of comments to retrieve @param profile_key (str): %(doc_profile_key)s @return: a deferred couple (list, dict) containing: - list of: - if max_comments == 0: microblog data - else: couple (dict, (list[dict], dict)) containing: - microblog data (main item) - couple (comments data, RSM response data for the comments) - RSM response data """ pub_jid = jid.JID(pub_jid_s) def cb(items, client): d = self._itemsConstruction(items, pub_jid, client) if max_comments == DO_NOT_COUNT_COMMENTS: return d return d.addCallback(self._getOrCountComments, max_comments, profile_key) return DeferredItems(self, cb, None, profile_key).get(self.getNodeName(pub_jid), item_ids, rsm_data=rsm_data) # def getGroupBlogs(self, pub_jid_s, item_ids=None, rsm_data=None, count_comments=True, profile_key=C.PROF_KEY_NONE): # """Get the published microblogs of the specified IDs. If item_ids is # None, the result would be the same than calling getGroupBlogs # with the default value for the attribute max_items. 
# @param pub_jid_s: jid of the publisher # @param item_ids: list of microblogs items IDs # @param rsm_data (dict): RSM request data # @param count_comments (bool): also count the comments if True # @param profile_key (str): %(doc_profile_key)s # @return: a deferred couple (list, dict) containing: # - list of microblog data # - RSM response data # """ # max_comments = 0 if count_comments else DO_NOT_COUNT_COMMENTS # return self._getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm_data=rsm_data, max_comments=max_comments, profile_key=profile_key) def getGroupBlogsWithComments(self, pub_jid_s, item_ids=None, rsm_data=None, max_comments=None, profile_key=C.PROF_KEY_NONE): """Get the published microblogs of the specified IDs and their comments. If item_ids is None, returns the last published microblogs and their comments. @param pub_jid_s: jid of the publisher @param item_ids: list of microblogs items IDs @param rsm (dict): RSM request data @param max_comments (int): maximum number of comments to retrieve @param profile_key (str): %(doc_profile_key)s @return: a deferred couple (list, dict) containing: - list of couple (dict, (list[dict], dict)) containing: - microblog data (main item) - couple (comments data, RSM response data for the comments) - RSM response data """ if max_comments is None: max_comments = MAX_COMMENTS assert max_comments > 0 # otherwise the return signature is not the same return self._getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm_data=rsm_data, max_comments=max_comments, profile_key=profile_key) # def _getMassiveGroupBlogs(self, publishers_type, publishers, rsm_data=None, profile_key=C.PROF_KEY_NONE): # if publishers_type == 'JID': # publishers_jids = [jid.JID(publisher) for publisher in publishers] # else: # publishers_jids = publishers # return self.getMassiveGroupBlogs(publishers_type, publishers_jids, rsm_data, profile_key) # def _getPublishersJIDs(self, publishers_type, publishers, client): # #TODO: custom exception # if publishers_type not in 
["GROUP", "JID", "ALL"]: # raise Exception("Bad call, unknown publishers_type") # if publishers_type == "ALL" and publishers: # raise Exception("Publishers list must be empty when getting microblogs for all contacts") # if publishers_type == "ALL": # contacts = client.roster.getItems() # jids = [contact.jid.userhostJID() for contact in contacts] # elif publishers_type == "GROUP": # jids = [] # for _group in publishers: # jids.extend(client.roster.getJidsFromGroup(_group)) # elif publishers_type == 'JID': # jids = publishers # else: # raise UnknownType # return jids # def getMassiveGroupBlogs(self, publishers_type, publishers, rsm_data=None, profile_key=C.PROF_KEY_NONE): # """Get the last published microblogs for a list of groups or jids # @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL") # @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids) # @param rsm_data (dict): RSM request data, common to all publishers # @param profile_key: profile key # @return: a deferred dict with: # - key: publisher (unicode) # - value: couple (list[dict], dict) with: # - the microblogs data # - RSM response data # """ # def cb(items, publisher, client): # d = self._itemsConstruction(items, publisher, client) # return d.addCallback(self._getOrCountComments, False, profile_key) # #TODO: we need to use the server corresponding to the host of the jid # return DeferredItemsFromMany(self, cb, profile_key).get(publishers_type, publishers, rsm_data=rsm_data) ## subscribe ## # def subscribeGroupBlog(self, pub_jid, profile_key=C.PROF_KEY_NONE): # def initialised(result): # profile, client = result # d = self.host.plugins["XEP-0060"].subscribe(client.item_access_pubsub, self.getNodeName(jid.JID(pub_jid)), # profile_key=profile_key) # return d # #TODO: we need to use the server corresponding the the host of the jid # return self._initialise(profile_key).addCallback(initialised) ## delete ## def 
deleteGroupBlog(self, pub_data, comments, profile_key=C.PROF_KEY_NONE): """Delete a microblog item from a node. @param pub_data: a tuple (service, node identifier, item identifier) @param comments: comments node identifier (for main item) or empty string @param profile_key: %(doc_profile_key)s """ def initialised(result): profile, client = result service, node, item_id = pub_data service_jid = jid.JID(service) if service else client.item_access_pubsub if comments or not node: # main item node = self.getNodeName(client.jid) if comments: # remove the associated comments node comments_service, comments_node = self.host.plugins["XEP-0277"].parseCommentUrl(comments) d = self.host.plugins["XEP-0060"].deleteNode(comments_service, comments_node, profile_key=profile) d.addErrback(lambda failure: log.error(u"Deletion of node %s failed: %s" % (comments_node, failure.getErrorMessage()))) # remove the item itself d = self.host.plugins["XEP-0060"].retractItems(service_jid, node, [item_id], profile_key=profile) d.addErrback(lambda failure: log.error(u"Deletion of item %s from %s failed: %s" % (item_id, node, failure.getErrorMessage()))) return d def notify(d): # TODO: this works only on the same host, and notifications for item deletion should be # implemented according to http://xmpp.org/extensions/xep-0060.html#publisher-delete-success-notify # instead. The notification mechanism implemented in sat_pubsub and wokkel have apriori # a problem with retrieving the subscriptions, or something else. 
service, node, item_id = pub_data publisher = self.host.getJidNStream(profile_key)[0] profile = self.host.memory.getProfileName(profile_key) gbdatum = {'id': item_id, 'type': 'main_item' if (comments or not node) else 'comment'} self.host.bridge.personalEvent(publisher.full(), "MICROBLOG_DELETE", gbdatum, profile) return d return self._initialise(profile_key).addCallback(initialised).addCallback(notify) def deleteAllGroupBlogsAndComments(self, profile_key=C.PROF_KEY_NONE): """Delete absolutely all the microblog data that the user has posted""" calls = [self.deleteAllGroupBlogs(profile_key), self.deleteAllGroupBlogsComments(profile_key)] return defer.DeferredList(calls) def deleteAllGroupBlogs(self, profile_key=C.PROF_KEY_NONE): """Delete all the main items that the user has posted and their comments. """ def initialised(result): profile, client = result service = client.item_access_pubsub jid_ = client.jid main_node = self.getNodeName(jid_) def cb(nodes): d_list = [] for node in [node for node in nodes if node.endswith(main_node)]: d = self.host.plugins["XEP-0060"].deleteNode(service, node, profile_key=profile) d.addErrback(lambda failure: log.error(_(u"Deletion of node %(node)s failed: %(message)s") % {'node': node, 'message': failure.getErrorMessage()})) d_list.append(d) return defer.DeferredList(d_list) d = self.host.plugins["XEP-0060"].listNodes(service, profile=profile) d.addCallback(cb) d.addCallback(lambda dummy: log.info(_(u"All microblog's main items from %s have been deleted!") % jid_.userhost())) return d return self._initialise(profile_key).addCallback(initialised) def deleteAllGroupBlogsComments(self, profile_key=C.PROF_KEY_NONE): """Delete all the comments that the user posted on other's main items. We avoid the conversions from item to microblog data as we only need to retrieve some attributes, no need to convert text syntax... 
""" def initialised(result): """Get all the main items from our contact list @return: a DeferredList """ profile, client = result service = client.item_access_pubsub jids = [contact.jid.userhostJID() for contact in client.roster.getItems()] blogs = [] for jid_ in jids: if jid_ == client.jid.userhostJID(): continue # do not remove the comments on our own node main_node = self.getNodeName(jid_) d = self.host.plugins["XEP-0060"].getItems(service, main_node, profile_key=profile) d.addCallback(lambda res: getComments(res[0], client)) d.addErrback(lambda failure, main_node: log.error(_(u"Retrieval of items for node %(node)s failed: %(message)s") % {'node': main_node, 'message': failure.getErrorMessage()}), main_node) blogs.append(d) return defer.DeferredList(blogs) def getComments(items, client): """Get all the comments for a list of items @param items: a list of main items for one user @param client: the client of the user @return: a DeferredList """ comments = [] for item in items: try: entry = generateElementsNamed(item.elements(), 'entry').next() link = generateElementsNamed(entry.elements(), 'link').next() except StopIteration: continue href = link.getAttribute('href') service, node = self.host.plugins['XEP-0277'].parseCommentUrl(href) d = self.host.plugins["XEP-0060"].getItems(service, node, profile_key=profile_key) d.addCallback(lambda items: (service, node, items[0])) d.addErrback(lambda failure, node: log.error(_(u"Retrieval of comments for node %(node)s failed: %(message)s") % {'node': node, 'message': failure.getErrorMessage()}), node) comments.append(d) dlist = defer.DeferredList(comments) dlist.addCallback(deleteComments, client) return dlist def deleteComments(result, client): """Delete all the comments of the user that are found in result @param result: a list of couple (success, value) with success a boolean and value a tuple (service as JID, node_id, comment_items) @param client: the client of the user @return: a DeferredList with the deletions result 
""" user_jid_s = client.jid.userhost() for (success, value) in result: if not success: continue service, node_id, comment_items = value item_ids = [] for comment_item in comment_items: # for all the comments on one post try: entry = generateElementsNamed(comment_item.elements(), 'entry').next() author = generateElementsNamed(entry.elements(), 'author').next() name = generateElementsNamed(author.elements(), 'name').next() except StopIteration: continue if name.children[0] == user_jid_s: item_ids.append(comment_item.getAttribute('id')) deletions = [] if item_ids: # remove the comments of the user on the given post d = self.host.plugins['XEP-0060'].retractItems(service, node_id, item_ids, profile_key=profile_key) d.addCallback(lambda dummy, node_id: log.debug(_(u'Comments of user %(user)s in node %(node)s have been retracted') % {'user': user_jid_s, 'node': node_id}), node_id) d.addErrback(lambda failure, node_id: log.error(_(u"Retraction of comments from %(user)s in node %(node)s failed: %(message)s") % {'user': user_jid_s, 'node': node_id, 'message': failure.getErrorMessage()}), node_id) deletions.append(d) return defer.DeferredList(deletions) return self._initialise(profile_key).addCallback(initialised) ## helper classes to manipulate items ## class DeferredItems(): """Retrieve items using XEP-0060""" def __init__(self, parent, cb, eb=None, profile_key=C.PROF_KEY_NONE): """ @param parent (GroupBlog): GroupBlog instance @param cb (callable): callback method to be applied on items @param eb (callable): errback method to be applied on items @param profile_key (str): %(doc_profile_key)s """ self.parent = parent self.cb = cb self.eb = (lambda dummy: ([], {})) if eb is None else eb self.profile_key = profile_key def get(self, node, item_ids=None, sub_id=None, rsm_data=None): """Retrieve and process a page of pubsub items @param node (str): node identifier. @param item_ids (list[str]): list of items identifiers. @param sub_id (str): optional subscription identifier. 
@param rsm_data (dict): RSM request data @return: a deferred couple (list, dict) containing: - list of microblog data - RSM response data """ if rsm_data is None: rsm_data = {'max_': (len(item_ids) if item_ids else MAX_ITEMS)} def initialised(result): profile, client = result rsm_request = rsm.RSMRequest(**rsm_data) d = self.parent.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, node, rsm_request.max, item_ids, sub_id, rsm_request, profile_key=profile) def cb(result): d = defer.maybeDeferred(self.cb, result[0], client) return d.addCallback(lambda items: (items, result[1])) d.addCallbacks(cb, self.eb) return d #TODO: we need to use the server corresponding to the host of the jid return self.parent._initialise(self.profile_key).addCallback(initialised) class DeferredItemsFromMany(): def __init__(self, parent, cb, profile_key=C.PROF_KEY_NONE): """ @param parent (GroupBlog): GroupBlog instance @param cb (callable): callback method to be applied on items @param profile_key (str): %(doc_profile_key)s """ self.parent = parent self.cb = cb self.profile_key = profile_key def _buildData(self, publishers_type, publishers, client): jids = self.parent._getPublishersJIDs(publishers_type, publishers, client) return {publisher: self.parent.getNodeName(publisher) for publisher in jids} def get(self, publishers_type, publishers, sub_id=None, rsm_data=None): """Retrieve and process a page of pubsub items @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL") @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids) @param sub_id (str): optional subscription identifier. 
@param rsm_data (dict): RSM request data @return: a deferred dict with: - key: publisher (unicode) - value: couple (list[dict], dict) with: - the microblogs data - RSM response data """ if rsm_data is None: rsm_data = {'max_': MAX_ITEMS} def initialised(result): profile, client = result data = self._buildData(publishers_type, publishers, client) rsm_request = rsm.RSMRequest(**rsm_data) d = self.parent.host.plugins["XEP-0060"].getItemsFromMany(client.item_access_pubsub, data, rsm_request.max, sub_id, rsm_request, profile_key=profile) def cb(publisher): def callback(result): d = defer.maybeDeferred(self.cb, result[0], publisher, client) d.addCallback(lambda items: (publisher.full(), (items, result[1]))) return d return callback def cb_list(result): return {value[0]: value[1] for success, value in result if success} def main_cb(result): d_list = [] for publisher, d_items in result.items(): # XXX: trick needed as publisher is a loop variable d_list.append(d_items.addCallback(cb(publisher))) return defer.DeferredList(d_list, consumeErrors=False).addCallback(cb_list) d.addCallback(main_cb) return d #TODO: we need to use the server corresponding to the host of the jid return self.parent._initialise(self.profile_key).addCallback(initialised) class GroupBlog_handler(XMPPHandler): implements(iwokkel.IDisco) def getDiscoInfo(self, requestor, target, nodeIdentifier=''): return [disco.DiscoFeature(NS_GROUPBLOG)] def getDiscoItems(self, requestor, target, nodeIdentifier=''): return []