Mercurial > libervia-backend
changeset 1420:7c0acb966fd6
plugins groupblog, xep-0060: first pass of simplification
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 22 Apr 2015 20:21:55 +0200 |
parents | be2df1ddea8e |
children | 16b1ba7ccaaa |
files | src/plugins/plugin_misc_groupblog.py src/plugins/plugin_xep_0060.py |
diffstat | 2 files changed, 133 insertions(+), 139 deletions(-) [+] |
line wrap: on
line diff
--- a/src/plugins/plugin_misc_groupblog.py Wed Apr 22 18:30:28 2015 +0200 +++ b/src/plugins/plugin_misc_groupblog.py Wed Apr 22 20:21:55 2015 +0200 @@ -143,6 +143,8 @@ host.trigger.add("PubSubItemsReceived", self.pubSubItemsReceivedTrigger) + ## plugin management methods ## + def getHandler(self, profile): return GroupBlog_handler() @@ -212,6 +214,8 @@ return False return True + ## internal helping methodes ## + def _handleCommentsItems(self, items, service, node_identifier): """ Convert comments items to groupblog data, and send them as signals @@ -268,6 +272,8 @@ """ return NS_NODE_PREFIX + publisher.userhost() + ## publish ## + def _publishMblog(self, service, client, access_type, access_list, message, extra): """Actually publish the message on the group blog @@ -291,7 +297,7 @@ if extra.get('allow_comments', 'False').lower() == 'true': # XXX: use the item identifier? http://bugs.goffi.org/show_bug.cgi?id=63 - comments_node = self.__fillCommentsElement(mblog_data, None, node_name, service) + comments_node = self._fillCommentsElement(mblog_data, None, node_name, service) _options = {P.OPT_ACCESS_MODEL: access_model_value, P.OPT_PERSIST_ITEMS: 1, P.OPT_MAX_ITEMS: -1, @@ -335,7 +341,7 @@ entry_d.addCallback(itemCreated) return entry_d - def __fillCommentsElement(self, mblog_data, entry_id, node_name, service_jid): + def _fillCommentsElement(self, mblog_data, entry_id, node_name, service_jid): """ @param mblog_data: dict containing the microblog data @param entry_id: unique identifier of the entry @@ -356,6 +362,7 @@ def sendGroupBlog(self, access_type, access_list, message, extra, profile_key=C.PROF_KEY_NONE): """Publish a microblog with given item access + @param access_type: one of "PUBLIC", "GROUP", "JID" @param access_list: list of authorized entity (empty list for PUBLIC ACCESS, list of groups or list of jids) for this item @@ -385,80 +392,6 @@ return self._initialise(profile_key).addCallback(initialised) - def deleteGroupBlog(self, pub_data, comments, profile_key=C.PROF_KEY_NONE): - """Delete a microblog item from a node. - @param pub_data: a tuple (service, node identifier, item identifier) - @param comments: comments node identifier (for main item) or empty string - @param profile_key: %(doc_profile_key)s - """ - - def initialised(result): - profile, client = result - service, node, item_id = pub_data - service_jid = jid.JID(service) if service else client.item_access_pubsub - if comments or not node: # main item - node = self.getNodeName(client.jid) - if comments: - # remove the associated comments node - comments_service, comments_node = self.host.plugins["XEP-0277"].parseCommentUrl(comments) - d = self.host.plugins["XEP-0060"].deleteNode(comments_service, comments_node, profile_key=profile) - d.addErrback(lambda failure: log.error(u"Deletion of node %s failed: %s" % (comments_node, failure.getErrorMessage()))) - # remove the item itself - d = self.host.plugins["XEP-0060"].retractItems(service_jid, node, [item_id], profile_key=profile) - d.addErrback(lambda failure: log.error(u"Deletion of item %s from %s failed: %s" % (item_id, node, failure.getErrorMessage()))) - return d - - def notify(d): - # TODO: this works only on the same host, and notifications for item deletion should be - # implemented according to http://xmpp.org/extensions/xep-0060.html#publisher-delete-success-notify - # instead. The notification mechanism implemented in sat_pubsub and wokkel have apriori - # a problem with retrieving the subscriptions, or something else. - service, node, item_id = pub_data - publisher = self.host.getJidNStream(profile_key)[0] - profile = self.host.memory.getProfileName(profile_key) - gbdatum = {'id': item_id, 'type': 'main_item' if (comments or not node) else 'comment'} - self.host.bridge.personalEvent(publisher.full(), "MICROBLOG_DELETE", gbdatum, profile) - return d - - return self._initialise(profile_key).addCallback(initialised).addCallback(notify) - - def updateGroupBlog(self, pub_data, comments, message, extra, profile_key=C.PROF_KEY_NONE): - """Modify a microblog node - @param pub_data: a tuple (service, node identifier, item identifier) - @param comments: comments node identifier (for main item) or empty string - @param message: new message - @param extra: dict which option name as key, which can be: - - allow_comments: True to accept an other level of comments, False else (default: False) - - rich: if present, contain rich text in currently selected syntax - @param profile_key: %(doc_profile) - """ - - def initialised(result): - profile, client = result - mblog_data = {'content': message} - for attr in ['content_rich', 'title', 'title_rich']: - if attr in extra and extra[attr]: - mblog_data[attr] = extra[attr] - service, node, item_id = pub_data - service_jid = jid.JID(service) if service else client.item_access_pubsub - if comments or not node: # main item - node = self.getNodeName(client.jid) - mblog_data['id'] = unicode(item_id) - if 'published' in extra: - mblog_data['published'] = extra['published'] - if extra.get('allow_comments', 'False').lower() == 'true': - comments_service, comments_node = self.host.plugins["XEP-0277"].parseCommentUrl(comments) - # we could use comments_node directly but it's safer to rebuild it - # XXX: use the item identifier? http://bugs.goffi.org/show_bug.cgi?id=63 - entry_id = comments_node.split('_')[1].split('__')[0] - self.__fillCommentsElement(mblog_data, entry_id, node, service_jid) - entry_d = self.host.plugins["XEP-0277"].data2entry(mblog_data, profile) - entry_d.addCallback(lambda mblog_item: self.host.plugins["XEP-0060"].publish(service_jid, node, items=[mblog_item], profile_key=profile)) - entry_d.addErrback(lambda failure: log.error(u"Modification of %s failed: %s" % (pub_data, failure.getErrorMessage()))) - return entry_d - - return self._initialise(profile_key).addCallback(initialised) - def sendGroupBlogComment(self, node_url, message, extra, profile_key=C.PROF_KEY_NONE): """Publish a comment in the given node @param node url: link to the comments node as specified in XEP-0277 and given in microblog data's comments key @@ -519,6 +452,48 @@ d_list.append(self.item2gbdata(item).addCallback(cb)) return defer.DeferredList(d_list, consumeErrors=True).addCallback(lambda result: [value for (success, value) in result if success]) + ## modify ## + + def updateGroupBlog(self, pub_data, comments, message, extra, profile_key=C.PROF_KEY_NONE): + """Modify a microblog node + + @param pub_data: a tuple (service, node identifier, item identifier) + @param comments: comments node identifier (for main item) or empty string + @param message: new message + @param extra: dict which option name as key, which can be: + - allow_comments: True to accept an other level of comments, False else (default: False) + - rich: if present, contain rich text in currently selected syntax + @param profile_key: %(doc_profile) + """ + + def initialised(result): + profile, client = result + mblog_data = {'content': message} + for attr in ['content_rich', 'title', 'title_rich']: + if attr in extra and extra[attr]: + mblog_data[attr] = extra[attr] + service, node, item_id = pub_data + service_jid = jid.JID(service) if service else client.item_access_pubsub + if comments or not node: # main item + node = self.getNodeName(client.jid) + mblog_data['id'] = unicode(item_id) + if 'published' in extra: + mblog_data['published'] = extra['published'] + if extra.get('allow_comments', 'False').lower() == 'true': + comments_service, comments_node = self.host.plugins["XEP-0277"].parseCommentUrl(comments) + # we could use comments_node directly but it's safer to rebuild it + # XXX: use the item identifier? http://bugs.goffi.org/show_bug.cgi?id=63 + entry_id = comments_node.split('_')[1].split('__')[0] + self._fillCommentsElement(mblog_data, entry_id, node, service_jid) + entry_d = self.host.plugins["XEP-0277"].data2entry(mblog_data, profile) + entry_d.addCallback(lambda mblog_item: self.host.plugins["XEP-0060"].publish(service_jid, node, items=[mblog_item], profile_key=profile)) + entry_d.addErrback(lambda failure: log.error(u"Modification of %s failed: %s" % (pub_data, failure.getErrorMessage()))) + return entry_d + + return self._initialise(profile_key).addCallback(initialised) + + ## get ## + def _getOrCountComments(self, items, max=0, profile_key=C.PROF_KEY_NONE): """Get and/or count the comments of the given items. @@ -553,7 +528,7 @@ deferred_list.addCallback(lambda result: [value for (success, value) in result if success]) return deferred_list - def __getGroupBlogs(self, pub_jid_s, item_ids=None, rsm=None, max_comments=0, profile_key=C.PROF_KEY_NONE): + def _getGroupBlogs(self, pub_jid_s, item_ids=None, rsm=None, max_comments=0, profile_key=C.PROF_KEY_NONE): """Retrieve previously published items from a publish subscribe node. @param pub_jid_s: jid of the publisher @@ -594,7 +569,7 @@ - RSM response data """ max_comments = 0 if count_comments else DO_NOT_COUNT_COMMENTS - return self.__getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm=rsm, max_comments=max_comments, profile_key=profile_key) + return self._getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm=rsm, max_comments=max_comments, profile_key=profile_key) def getGroupBlogsWithComments(self, pub_jid_s, item_ids=None, rsm=None, max_comments=None, profile_key=C.PROF_KEY_NONE): """Get the published microblogs of the specified IDs and their comments. If @@ -614,7 +589,7 @@ if max_comments is None: max_comments = MAX_COMMENTS assert(max_comments > 0) # otherwise the return signature is not the same - return self.__getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm=rsm, max_comments=max_comments, profile_key=profile_key) + return self._getGroupBlogs(pub_jid_s, item_ids=item_ids, rsm=rsm, max_comments=max_comments, profile_key=profile_key) def getGroupBlogsAtom(self, pub_jid_s, rsm=None, profile_key=C.PROF_KEY_NONE): """Get the atom feed of the last published microblogs @@ -719,6 +694,8 @@ #TODO: we need to use the server corresponding to the host of the jid return DeferredItemsFromMany(self, cb, profile_key).get(publishers_type, publishers, rsm=rsm) + ## subscribe ## + def subscribeGroupBlog(self, pub_jid, profile_key=C.PROF_KEY_NONE): def initialised(result): profile, client = result @@ -752,6 +729,46 @@ yield defer.DeferredList(d_list, consumeErrors=False) defer.returnValue(None) + ## delete ## + + def deleteGroupBlog(self, pub_data, comments, profile_key=C.PROF_KEY_NONE): + """Delete a microblog item from a node. + + @param pub_data: a tuple (service, node identifier, item identifier) + @param comments: comments node identifier (for main item) or empty string + @param profile_key: %(doc_profile_key)s + """ + + def initialised(result): + profile, client = result + service, node, item_id = pub_data + service_jid = jid.JID(service) if service else client.item_access_pubsub + if comments or not node: # main item + node = self.getNodeName(client.jid) + if comments: + # remove the associated comments node + comments_service, comments_node = self.host.plugins["XEP-0277"].parseCommentUrl(comments) + d = self.host.plugins["XEP-0060"].deleteNode(comments_service, comments_node, profile_key=profile) + d.addErrback(lambda failure: log.error(u"Deletion of node %s failed: %s" % (comments_node, failure.getErrorMessage()))) + # remove the item itself + d = self.host.plugins["XEP-0060"].retractItems(service_jid, node, [item_id], profile_key=profile) + d.addErrback(lambda failure: log.error(u"Deletion of item %s from %s failed: %s" % (item_id, node, failure.getErrorMessage()))) + return d + + def notify(d): + # TODO: this works only on the same host, and notifications for item deletion should be + # implemented according to http://xmpp.org/extensions/xep-0060.html#publisher-delete-success-notify + # instead. The notification mechanism implemented in sat_pubsub and wokkel have apriori + # a problem with retrieving the subscriptions, or something else. + service, node, item_id = pub_data + publisher = self.host.getJidNStream(profile_key)[0] + profile = self.host.memory.getProfileName(profile_key) + gbdatum = {'id': item_id, 'type': 'main_item' if (comments or not node) else 'comment'} + self.host.bridge.personalEvent(publisher.full(), "MICROBLOG_DELETE", gbdatum, profile) + return d + + return self._initialise(profile_key).addCallback(initialised).addCallback(notify) + def deleteAllGroupBlogsAndComments(self, profile_key=C.PROF_KEY_NONE): """Delete absolutely all the microblog data that the user has posted""" calls = [self.deleteAllGroupBlogs(profile_key), self.deleteAllGroupBlogsComments(profile_key)] @@ -865,9 +882,10 @@ return self._initialise(profile_key).addCallback(initialised) +## helper classes to manipulate items ## class DeferredItems(): - """Helper class to retrieve items using XEP-0060""" + """Retrieve items using XEP-0060""" def __init__(self, parent, cb, eb=None, profile_key=C.PROF_KEY_NONE): """ @@ -883,6 +901,7 @@ def get(self, node, item_ids=None, sub_id=None, rsm=None): """ + @param node (str): node identifier. @param item_ids (list[str]): list of items identifiers. @param sub_id (str): optional subscription identifier. @@ -898,7 +917,7 @@ profile, client = result rsm_ = wokkel_rsm.RSMRequest(**rsm) d = self.parent.host.plugins["XEP-0060"].getItems(client.item_access_pubsub, - node, rsm_.max, + node, rsm_.max_, item_ids, sub_id, rsm_, profile_key=profile) @@ -924,7 +943,7 @@ self.cb = cb self.profile_key = profile_key - def __buildData(self, publishers_type, publishers, client): + def _buildData(self, publishers_type, publishers, client): jids = self.parent._getPublishersJIDs(publishers_type, publishers, client) return {publisher: self.parent.getNodeName(publisher) for publisher in jids} @@ -946,7 +965,7 @@ def initialised(result): profile, client = result - data = self.__buildData(publishers_type, publishers, client) + data = self._buildData(publishers_type, publishers, client) rsm_ = wokkel_rsm.RSMRequest(**rsm) d = self.parent.host.plugins["XEP-0060"].getItemsFromMany(client.item_access_pubsub, data, rsm_.max, sub_id,
--- a/src/plugins/plugin_xep_0060.py Wed Apr 22 18:30:28 2015 +0200 +++ b/src/plugins/plugin_xep_0060.py Wed Apr 22 20:21:55 2015 +0200 @@ -58,46 +58,21 @@ log.info(_(u"PubSub plugin initialization")) self.host = host self.managedNodes = [] - self.clients = {} self.node_cache = Sessions(timeout=60, resettable_timeout=False) def getHandler(self, profile): - self.clients[profile] = SatPubSubClient(self.host, self) - return self.clients[profile] - - def profileDisconnected(self, profile): - try: - del self.clients[profile] - except KeyError: - pass + client = self.host.getClient(profile) + client.pubsub_client = SatPubSubClient(self.host, self) + return client.pubsub_client def addManagedNode(self, node_name, callback): """Add a handler for a namespace + @param namespace: NS of the handler (will appear in disco info) @param callback: method to call when the handler is found @param profile: profile which manage this handler""" self.managedNodes.append((node_name, callback)) - def __getClientNProfile(self, profile_key, action='do pusbsub'): - """Return a tuple of (client, profile) - raise error when the profile doesn't exists - @param profile_key: as usual :) - @param action: text of action to show in case of error""" - profile = self.host.memory.getProfileName(profile_key) - if not profile: - err_mess = _('Trying to %(action)s with an unknown profile key [%(profile_key)s]') % { - 'action': action, - 'profile_key': profile_key} - log.error(err_mess) - raise Exception(err_mess) - try: - client = self.clients[profile] - except KeyError: - err_mess = _('INTERNAL ERROR: no handler for required profile') - log.error(err_mess) - raise Exception(err_mess) - return profile, client - def _getDeferredNodeCache(self, session_id, init, profile): """Manage a node cache with deferred initialisation and concurrent access. @@ -146,7 +121,7 @@ @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) @param filter_ (str): filter the result according to the given subscription type: - None: do not filter - - 'pending': subscription has been approved yet by the node owner + - 'pending': subscription has not been approved yet by the node owner - 'unconfigured': subscription options have not been configured yet - 'subscribed': subscription is complete @param profile (str): %(doc_profile)s @@ -158,8 +133,8 @@ return self._getDeferredNodeCache(session_id, d, profile) def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE): - profile, client = self.__getClientNProfile(profile_key, 'publish item') - return client.publish(service, nodeIdentifier, items, client.parent.jid) + client = self.host.getClient(profile_key) + return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid) def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): """Retrieve pubsub items from a node. @@ -175,10 +150,10 @@ - list of items - RSM response data """ - profile, client = self.__getClientNProfile(profile_key, 'get items') + client = self.host.getClient(profile_key) ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm} if rsm else None - d = client.items(service, node, max_items, item_ids, sub_id, client.parent.jid, ext_data) - d.addCallback(lambda items: (items, client.getRSMResponse(ext_data['id']) if rsm else {})) + d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, ext_data) + d.addCallback(lambda items: (items, client.pubsub_client.getRSMResponse(ext_data['id']) if rsm else {})) return d @defer.inlineCallbacks @@ -197,39 +172,39 @@ - list of items - RSM response data """ - profile, client = self.__getClientNProfile(profile_key, 'get items') - found_nodes = yield self.listNodes(service, profile=profile) + client = self.host.getClient(profile_key) + found_nodes = yield self.listNodes(service, profile=client.profile) d_dict = {} for publisher, node in data.items(): if node not in found_nodes: log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) continue # avoid pubsub "item-not-found" error - d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, profile) + d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile) defer.returnValue(d_dict) def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): - profile, client = self.__getClientNProfile(profile_key, 'get options') - return client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) + client = self.host.getClient(profile_key) + return client.pubsub_client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): - profile, client = self.__getClientNProfile(profile_key, 'set options') - return client.setOptions(service, nodeIdentifier, subscriber, options, subscriptionIdentifier) + client = self.host.getClient(profile_key) + return client.pubsub_client.setOptions(service, nodeIdentifier, subscriber, options, subscriptionIdentifier) def createNode(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): - profile, client = self.__getClientNProfile(profile_key, 'create node') - return client.createNode(service, nodeIdentifier, options) + client = self.host.getClient(profile_key) + return client.pubsub_client.createNode(service, nodeIdentifier, options) def deleteNode(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): - profile, client = self.__getClientNProfile(profile_key, 'delete node') - return client.deleteNode(service, nodeIdentifier) + client = self.host.getClient(profile_key) + return client.pubsub_client.deleteNode(service, nodeIdentifier) def retractItems(self, service, nodeIdentifier, itemIdentifiers, profile_key=C.PROF_KEY_NONE): - profile, client = self.__getClientNProfile(profile_key, 'retract items') - return client.retractItems(service, nodeIdentifier, itemIdentifiers) + client = self.host.getClient(profile_key) + return client.pubsub_client.retractItems(service, nodeIdentifier, itemIdentifiers) def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): - profile, client = self.__getClientNProfile(profile_key, 'subscribe node') - return client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options) + client = self.host.getClient(profile_key) + return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options) @defer.inlineCallbacks def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE): @@ -242,20 +217,20 @@ @param profile_key (str): %(doc_profile_key)s @return: list of Deferred instances. """ - profile, client = self.__getClientNProfile(profile_key, 'subscribe nodes') - found_nodes = yield self.listNodes(service, profile=profile) - subscribed_nodes = yield self.listSubscribedNodes(service, profile=profile) + client = self.host.getClient(profile_key) + found_nodes = yield self.listNodes(service, profile=client.profile) + subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) d_list = [] for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): if nodeIdentifier not in found_nodes: log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) continue # avoid sat-pubsub "SubscriptionExists" error - d_list.append(client.subscribe(service, nodeIdentifier, sub_jid or client.parent.jid.userhostJID(), options=options)) + d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) defer.returnValue(d_list) def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): - profile, client = self.__getClientNProfile(profile_key, 'retrieve subscriptions') - return client.subscriptions(service, nodeIdentifier) + client = self.host.getClient(profile_key) + return client.pubsub_client.subscriptions(service, nodeIdentifier) class SatPubSubClient(rsm.PubSubClient):