Mercurial > libervia-backend
view src/plugins/plugin_misc_groupblog.py @ 343:6fe6ae70904a
plugin xep 0277: added OPT_DELIVER_PAYLOADS OPT_SEND_ITEM_SUBSCRIBE to microblog options when changing access
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 28 May 2011 20:21:45 +0200 |
parents | 2572351d875a |
children | f964dcec1611 |
line wrap: on
line source
#!/usr/bin/python
# -*- coding: utf-8 -*-

"""
SAT plugin for microbloging with roster access
Copyright (C) 2009, 2010, 2011 Jérôme Poisson (goffi@goffi.org)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""

from logging import debug, info, warning, error
from twisted.internet import protocol, defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber import error as jab_error
import twisted.internet.error
from twisted.words.xish import domish
from sat.tools.xml_tools import ElementParser

from wokkel import disco,pubsub
from feed.atom import Entry, Author
import uuid
from time import time

# Template of the group blog node names; formatted with the node number.
NS_MICROBLOG = 'urn:xmpp:microblog:%02d'
# Number of group blog nodes available per user (nodes are numbered from 1).
MBLOG_NODES_MAX = 20
# Suffix of the root (collection) node name, see GroupBlog._getRootNode.
# NOTE(review): this constant was referenced but not defined in the module;
# the value below is an assumption - confirm it matches the collection node
# actually used on the pubsub service.
MBLOG_COLLECTION = 'MBLOGCOLLECTION'
# Suffix of the configuration node name, see GroupBlog._getConfigNode.
CONFIG_NODE = 'CONFIG'
OPT_ACCESS_MODEL = 'pubsub#access_model'
OPT_PERSIST_ITEMS = 'pubsub#persist_items'
OPT_MAX_ITEMS = 'pubsub#max_items'
OPT_NODE_TYPE = 'pubsub#node_type'
OPT_SUBSCRIPTION_TYPE = 'pubsub#subscription_type'
OPT_SUBSCRIPTION_DEPTH = 'pubsub#subscription_depth'
TYPE_COLLECTION = 'collection'

PLUGIN_INFO = {
"name": "Group blogging throught collections",
"import_name": "groupblog",
"type": "MISC",
"protocols": [],
"dependencies": ["XEP-0277"],
"main": "GroupBlog",
"handler": "no",
"description": _("""Implementation of microblogging with roster access""")
}


class NodeCreationError(Exception):
    """Raised when a group blog node can't be created"""
    pass


class NodeDeletionError(Exception):
    """Raised when a group blog node can't be deleted"""
    pass


class GroupBlog():
    """This class use a PubSub Collection to manage roster access on microblog"""

    def __init__(self, host):
        """Register the PEP events and the bridge methods of the plugin
        @param host: SàT host instance, giving access to plugins, bridge and memory"""
        info(_("Group blog plugin initialization"))
        self.host = host
        self._blog_nodes = {}  # keep association between [profile][node] and [groups]

        # One PEP event is registered per possible group blog node
        for i in range(1, MBLOG_NODES_MAX + 1):
            self.host.plugins["XEP-0163"].addPEPEvent("MICROBLOG_%02d" % i, NS_MICROBLOG % i, self.groupblogCB, None)

        host.bridge.addMethod("cleanBlogCollection", ".communication", in_sign='s', out_sign='',
                              method=self.cleanBlogCollection,
                              doc = {
                                    })

        # "async" is a reserved word from Python 3.7 on, so the keyword is given
        # through a dict: the bridge still receives async=True.
        host.bridge.addMethod("getMblogNodes", ".communication", in_sign='s', out_sign='a{sas}',
                              method=self.getMblogNodes,
                              doc = { 'summary':"retrieve mblog node, and their association with roster's groups",
                                      'param_0':'%(doc_profile)s',
                                      'return':'list of microblog data (dict)'
                                    },
                              **{'async': True})

        host.bridge.addMethod("sendGroupBlog", ".communication", in_sign='asss', out_sign='',
                              method=self.sendGroupBlog,
                              doc = { 'summary':"Send a microblog to a list of groups",
                                      'param_0':'list of groups which can read the microblog',
                                      'param_1':'text to send',
                                      'param_2':'%(doc_profile)s'
                                    })

        host.bridge.addMethod("subscribeGroupBlog", ".communication", in_sign='ss', out_sign='',
                              method=self.subscribeGroupBlog,
                              doc = { 'summary':"Subscribe to the group blog of somebody",
                                      'param_0':'jid of the group node published',
                                      'param_1':'%(doc_profile)s'
                                    })

    def groupblogCB(self, itemsEvent, profile):
        """Called when items are received on one of the registered microblog nodes
        Each item is converted to microblog data and sent through the bridge
        @param itemsEvent: pubsub event holding the received items
        @param profile: profile which received the event"""
        for item in itemsEvent.items:
            microblog_data = self.host.plugins["XEP-0277"]._item2mbdata(item)
            microblog_data["node"] = itemsEvent.nodeIdentifier
            try:
                microblog_data["groups"] = "\n".join(self._blog_nodes[profile].get(itemsEvent.nodeIdentifier, []))
            except KeyError:
                # no node association known for this profile: "groups" is left unset
                pass
            self.host.bridge.personalEvent(itemsEvent.sender.full(), "MICROBLOG", microblog_data, profile)

    def _getRootNode(self, entity):
        """Return the name of the root node of an entity
        @param entity: jid of the owner"""
        return "%(entity)s_%(root_suff)s" % {'entity':entity.userhost(), 'root_suff':MBLOG_COLLECTION}

    def _getNodeName(self, number):
        """Return the node name
        @param number: int number of the node"""
        return NS_MICROBLOG % number

    def _getConfigNode(self, entity):
        """Return the name of the configuration node of an entity
        @param entity: jid of the owner"""
        return "%(entity)s_%(root_suff)s" % {'entity':entity.userhost(), 'root_suff':CONFIG_NODE}

    def _configNodeCb(self, result, callback, profile):
        """Fill the node/groups association from the items of the config node
        @param result: items of the config node, each one holding a "node_association" element
        @param callback: called with the {node: [groups]} dict of the profile
        @param profile: profile owning the config node"""
        self._blog_nodes[profile] = {}
        for item in result:
            node_ass = item.firstChildElement()
            assert(node_ass.name == "node_association")
            node = node_ass['node']
            groups = [unicode(group) for group in node_ass.children]
            self._blog_nodes[profile][node] = groups
        callback(self._blog_nodes[profile])

    def _configNodeFail(self, failure, errback):
        """Report a failure of the config node retrieval/creation
        @param failure: ignored
        @param errback: called with an empty string"""
        errback("") #FIXME

    def _configNodeErr(self, failure, user_jid, pubsub_ent, callback, errback, profile):
        """Errback of the config node retrieval: create the node if it doesn't exist yet
        @param failure: failure of the items request
        @param user_jid: jid of the owner of the config node
        @param pubsub_ent: pubsub service where the node is created
        @param callback: called with the (empty) associations once the node is created
        @param errback: called if the node can't be retrieved nor created
        @param profile: profile owning the config node"""
        # getattr: a failure which is not a stanza error has no condition
        if getattr(failure.value, 'condition', None) == 'item-not-found':
            debug(_('Multiblog config node not found, creating it'))
            _options = {OPT_ACCESS_MODEL:"whitelist", OPT_PERSIST_ITEMS:1, OPT_MAX_ITEMS:-1}
            d = self.host.plugins["XEP-0060"].createNode(pubsub_ent, self._getConfigNode(user_jid), _options, profile_key=profile)
            # a freshly created config node has no item, hence the empty list
            d.addCallback(lambda result: self._configNodeCb([] , callback, profile))
            d.addErrback(self._configNodeFail, errback)
        else:
            self._configNodeFail(failure, errback)

    def getMblogNodes(self, profile_key='@DEFAULT@', callback=None, errback=None):
        """Retrieve the group blog nodes and their association with roster's groups
        @param profile_key: %(doc_profile)s
        @param callback: called with the {node: [groups]} dict
        @param errback: called if the config node can't be retrieved nor created"""
        debug(_('Getting mblog nodes'))
        profile = self.host.memory.getProfileName(profile_key)
        if not profile:
            error(_("Unknown profile"))
            return {}

        def after_init(ignore):
            pubsub_ent = self.host.memory.getServerServiceEntity("pubsub", "service", profile)
            _jid, xmlstream = self.host.getJidNStream(profile_key)
            d = self.host.plugins["XEP-0060"].getItems(pubsub_ent, self._getConfigNode(_jid), profile_key=profile_key)
            d.addCallbacks(self._configNodeCb, self._configNodeErr, callbackArgs=(callback, profile), errbackArgs=(_jid, pubsub_ent, callback, errback, profile))

        client = self.host.getClient(profile)
        if not client:
            error(_('No client for this profile key: %s') % profile_key)
            return
        # the request is only done once the client is initialized
        client.client_initialized.addCallback(after_init)

    def _publishMblog(self, name, message, client):
        """Actually publish the message on the group blog
        @param name: name of the node where we publish
        @param message: message to publish
        @param client: SatXMPPClient of the published"""
        mblog_item = self.host.plugins["XEP-0277"].data2entry({'content':message}, client.profile)
        defer_blog = self.host.plugins["XEP-0060"].publish(client.jid.userhostJID(), name, items=[mblog_item], profile_key=client.profile)
        defer_blog.addErrback(self._mblogPublicationFailed)

    def _groupNodeCreated(self, ignore, groups, name, message, client):
        """A group node as been created, we need to add it to the configure node, and send the message to it
        @param ignore: ignored param, necessary to be added as a deferred callback
        @param groups: list of groups authorized to subscribe to the node
        @param name: unique name of the node
        @param message: message to publish to the group
        @param client: SatXMPPClient"""
        def configNodeUpdated(result):
            # the local association is only updated once the config node is published
            self._blog_nodes[client.profile][name] = groups
            debug(_("Configuration node updated"))

        config_node = self._getConfigNode(client.jid)
        _payload = domish.Element(('','node_association'))
        _payload['node'] = name
        for group in groups:
            _payload.addElement('group',content=group)
        config_item = pubsub.Item(payload=_payload)
        pubsub_ent = self.host.memory.getServerServiceEntity("pubsub", "service", client.profile)
        defer_config = self.host.plugins["XEP-0060"].publish(pubsub_ent, config_node, items=[config_item], profile_key=client.profile)
        defer_config.addCallback(configNodeUpdated)
        defer_config.addErrback(self._configUpdateFailed)

        #Finally, we publish the message
        self._publishMblog(name, message, client)

    def _mblogPublicationFailed(self, failure):
        """Errback of the microblog publication (the failure is currently ignored)"""
        #TODO
        pass

    def _configUpdateFailed(self, failure):
        """Errback of the config node update (the failure is currently ignored)"""
        #TODO
        pass

    def _nodeCreationFailed(self, failure, name, groups, message, client):
        """Errback of the node creation
        If the node already exists (conflict), it is deleted then created again
        @param failure: failure of the node creation
        @param name: name of the node
        @param groups: list of groups than can subscribe to the node
        @param message: message to publish
        @param client: SatXMPPClient
        @raise NodeCreationError: the failure can't be managed"""
        #FIXME: temporary behaviour is to delete the node,
        #user input should be required in the future
        def unmanagedError(failure):
            msg = _("Can't create node")
            error(msg)
            raise NodeCreationError(msg)

        # getattr: a failure which is not a stanza error has no condition
        if getattr(failure.value, 'condition', None) == 'conflict':
            pubsub_elts = [elt for elt in failure.value.children if getattr(elt, 'name', None) == 'pubsub']
            if pubsub_elts:
                create_elts = [elt for elt in pubsub_elts[0].children if getattr(elt, 'name', None) == 'create']
                if create_elts:
                    _from = jid.JID(failure.value.stanza['from'])
                    _node = create_elts[0]['node']
                    d = self.host.plugins["XEP-0060"].deleteNode(_from, _node, client.profile)
                    d.addCallback(self._createNode, name, groups, message, client)
                    d.addErrback(unmanagedError)
                    # the node is being recreated: nothing to report for now
                    return

        # any other case can't be recovered
        unmanagedError(None)

    def _createNode(self, ignore, name, groups, message, client):
        """create a group microblog node
        @param ignore: ignored param, necessary to be added as a deferred callback
        @param name: name of the node
        @param groups: list of group than can subscribe to the node
        @param message: message to publish
        @param client: SatXMPPClient"""
        _options = {OPT_ACCESS_MODEL:"roster", OPT_PERSIST_ITEMS:1, OPT_MAX_ITEMS:-1,
                    'pubsub#roster_groups_allowed':groups}
        d = self.host.plugins["XEP-0060"].createNode(client.jid.userhostJID(), name, _options, client.profile)
        d.addCallback(self._groupNodeCreated, groups, name, message, client)
        d.addErrback(self._nodeCreationFailed, name, groups, message, client)

    def _getNodeForGroups(self, groups, profile):
        """Return node associated with the given list of groups
        @param groups: list of groups
        @param profile: profile of publisher
        @return: name of the node, or None if no node is associated to these groups"""
        wanted = set(groups)
        for node, node_groups in self._blog_nodes[profile].items():
            if set(node_groups) == wanted:
                return node
        return None

    def _getFreeNode(self, entity, profile, exclude=()):
        """Return a free group node name, raise an exception if we have reach limit
        @param entity: jid of the owner (not used)
        @param profile: profile of publisher
        @param exclude: node names to consider as used even if they are not
                        associated yet (e.g. nodes being created)
        @raise NodeCreationError: no free node is available"""
        _all = set(self._getNodeName(idx) for idx in range(1, MBLOG_NODES_MAX + 1))
        _used = set(self._blog_nodes[profile]).union(exclude)
        _free = _all.difference(_used)
        if not _free:
            msg = _("Can't create group node: no more free node available")
            warning(msg)
            raise NodeCreationError(msg)
        # the lowest free node is used, so the allocation is predictable
        return min(_free)

    def sendGroupBlog(self, groups, message, profile_key='@DEFAULT@'):
        """Publish a microblog to the node associated to the groups
        If the node doesn't exist, it is created, then the message is posted
        @param groups: list of groups allowed to retrieve the microblog
        @param message: microblog
        @profile_key: %(doc_profile)s
        """
        profile = self.host.memory.getProfileName(profile_key)
        if not profile:
            error(_("Unknown profile"))
            return

        def after_init(ignore):
            _groups = list(set(groups).intersection(client.roster.getGroups())) #We only keep group which actually exist
            #TODO: send an error signal if user want to post to non existant groups
            _groups.sort()
            # self._blog_nodes is only updated once a created node is registered in
            # the config node, so the names allocated in this loop are tracked here
            # to avoid giving the same node to two groups.
            # NOTE(review): two overlapping calls of sendGroupBlog can still get the
            # same free node - confirm if this needs to be managed.
            allocated = set()
            for group in _groups:
                _node = self._getNodeForGroups([group], profile)
                if not _node:
                    _node_name = self._getFreeNode(client.jid, profile, exclude=allocated)
                    allocated.add(_node_name)
                    self._createNode(None, _node_name, [group], message, client)
                else:
                    self._publishMblog(_node, message, client)

        client = self.host.getClient(profile)
        if not client:
            error(_('No client for this profile key: %s') % profile_key)
            return
        # the publication is only done once the client is initialized
        client.client_initialized.addCallback(after_init)

    def _doCleaning(self, result, pubsub_ent, profile):
        """Compare the node in config node, and the existing nodes, and delete unknown ones
        @param result: result of the DeferredList: config node items first, then
                       disco items of the root node
        @param pubsub_ent: pubsub service where the nodes are deleted
        @param profile: profile owning the nodes"""
        #TODO: manage groups which don't exist anymore
        assert(len(result)==2)
        assert(result[0][0]==True and result[1][0]==True)
        config_nodes = [item.firstChildElement()["node"] for item in result[0][1]]
        existing_nodes = [item.nodeIdentifier for item in result[1][1]._items]
        # only the existing nodes which are not referenced in the config node are
        # deleted: a referenced node which doesn't exist can't be deleted
        to_delete = set(existing_nodes).difference(config_nodes)

        def check_deletion(result):
            for (success, value) in result:
                if not success:
                    msg = _("Can't delete node")
                    error(msg)
                    raise NodeDeletionError(msg)
                    #TODO: log node which was not deleted

        d = defer.DeferredList([self.host.plugins["XEP-0060"].deleteNode(pubsub_ent, node, profile) for node in to_delete])
        d.addCallback(check_deletion)

    def cleanBlogCollection(self, profile_key='@DEFAULT@'):
        """Remove blog nodes not referenced in config node
        @param profile_key: %(doc_profile)s"""
        debug(_('Cleaning mblog nodes'))
        profile = self.host.memory.getProfileName(profile_key)
        if not profile:
            error(_("Unknown profile"))
            return {}

        def after_init(ignore):
            pubsub_ent = self.host.memory.getServerServiceEntity("pubsub", "service", profile)
            _jid, xmlstream = self.host.getJidNStream(profile_key)
            d_config = self.host.plugins["XEP-0060"].getItems(pubsub_ent, self._getConfigNode(_jid), profile_key=profile_key)
            d_root = client.disco.requestItems(pubsub_ent, self._getRootNode(client.jid))
            # the cleaning needs both the config items and the existing nodes
            defer.DeferredList([d_config, d_root]).addCallback(self._doCleaning, pubsub_ent, profile)

        client = self.host.getClient(profile)
        if not client:
            error(_('No client for this profile key: %s') % profile_key)
            return
        client.client_initialized.addCallback(after_init)

    def subscribeGroupBlog(self, pub_jid, profile_key='@DEFAULT@'):
        """Subscribe to the group blog of somebody
        @param pub_jid: jid of the publisher of the group blog
        @param profile_key: %(doc_profile)s"""
        debug(_('subscribing mblog nodes'))
        _pub_jid = jid.JID(pub_jid)
        profile = self.host.memory.getProfileName(profile_key)
        if not profile:
            error(_("Unknown profile"))
            return

        def after_init(ignore):
            pubsub_ent = self.host.memory.getServerServiceEntity("pubsub", "service", profile)
            _options = {OPT_SUBSCRIPTION_TYPE:'items', OPT_SUBSCRIPTION_DEPTH:'1'}
            d = self.host.plugins["XEP-0060"].subscribe(pubsub_ent, self._getRootNode(_pub_jid), options = _options, profile_key=profile)
            d.addCallback(lambda x: debug(_("%(publisher)s's group node subscribed [%(profile)s]") % {'publisher':_pub_jid.userhost(), 'profile': profile}))
            d.addErrback(lambda x: error(_("Can't subscribe group node [%(profile)s]") % {'profile': profile}))

        client = self.host.getClient(profile)
        if not client:
            error(_('No client for this profile key: %s') % profile_key)
            return
        client.client_initialized.addCallback(after_init)