view src/plugins/plugin_misc_groupblog.py @ 384:785420cd63f7

plugins: In-Band Bytestreams (XEP-0047) implementation
author Goffi <goffi@goffi.org>
date Thu, 29 Sep 2011 12:05:45 +0200
parents f964dcec1611
children cf005701624b
line wrap: on
line source

#!/usr/bin/python
# -*- coding: utf-8 -*-

"""
SAT plugin for microbloging with roster access
Copyright (C) 2009, 2010, 2011  Jérôme Poisson (goffi@goffi.org)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

from logging import debug, info, error
from twisted.internet import protocol, defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber import error as jab_error
import twisted.internet.error
from twisted.words.xish import domish
from sat.tools.xml_tools import ElementParser

from wokkel import disco,pubsub
from feed.atom import Entry, Author
import uuid
from time import time

NS_MICROBLOG = 'urn:xmpp:microblog:%02d'
CONFIG_NODE = 'CONFIG'
OPT_ACCESS_MODEL = 'pubsub#access_model'
OPT_PERSIST_ITEMS = 'pubsub#persist_items'
OPT_MAX_ITEMS = 'pubsub#max_items'
OPT_NODE_TYPE = 'pubsub#node_type'
OPT_SUBSCRIPTION_TYPE = 'pubsub#subscription_type'
OPT_SUBSCRIPTION_DEPTH = 'pubsub#subscription_depth'
TYPE_COLLECTION = 'collection'

PLUGIN_INFO = {
"name": "Group blogging throught collections",
"import_name": "groupblog",
"type": "MISC",
"protocols": [],
"dependencies": ["XEP-0277"],
"main": "GroupBlog",
"handler": "no",
"description": _("""Implementation of microblogging with roster access""")
}

class NodeCreationError(Exception):
    pass

class NodeDeletionError(Exception):
    pass

class GroupBlog():
    """This class use a PubSub Collection to manage roster access on microblog"""

    def __init__(self, host):
        info(_("Group blog plugin initialization"))
        self.host = host
        self._blog_nodes={} #keep association betweek [profile][node] and [groups]
        for i in range(1,21):
            self.host.plugins["XEP-0163"].addPEPEvent("MICROBLOG_%02d" % i, NS_MICROBLOG % i, self.groupblogCB, None)

        host.bridge.addMethod("cleanBlogCollection", ".plugin", in_sign='s', out_sign='',
                               method=self.cleanBlogCollection,
                               doc = {
                                     })
        
        host.bridge.addMethod("getMblogNodes", ".plugin", in_sign='s', out_sign='a{sas}',
                               method=self.getMblogNodes,
                               async = True,
                               doc = { 'summary':"retrieve mblog node, and their association with roster's groups",
                                       'param_0':'%(doc_profile)s',
                                       'return':'list of microblog data (dict)'
                                     })

        host.bridge.addMethod("sendGroupBlog", ".plugin", in_sign='asss', out_sign='',
                               method=self.sendGroupBlog,
                               doc = { 'summary':"Send a microblog to a list of groups",
                                       'param_0':'list of groups which can read the microblog',
                                       'param_1':'text to send',
                                       'param_2':'%(doc_profile)s'
                                     })
        
        host.bridge.addMethod("subscribeGroupBlog", ".plugin", in_sign='ss', out_sign='',
                               method=self.subscribeGroupBlog,
                               doc = { 'summary':"Subscribe to the group blog of somebody",
                                       'param_0':'jid of the group node published',
                                       'param_1':'%(doc_profile)s'
                                     })
    
    def groupblogCB(self, itemsEvent, profile):
        for item in itemsEvent.items:
            microblog_data = self.host.plugins["XEP-0277"]._item2mbdata(item)
            microblog_data["node"] = itemsEvent.nodeIdentifier
            try:
                microblog_data["groups"] = "\n".join(self._blog_nodes[profile].get(itemsEvent.nodeIdentifier, []))
            except KeyError:
                pass 
            self.host.bridge.personalEvent(itemsEvent.sender.full(), "MICROBLOG", microblog_data, profile)

    def _getRootNode(self, entity):
        return "%(entity)s_%(root_suff)s" % {'entity':entity.userhost(), 'root_suff':MBLOG_COLLECTION}

    def _getNodeName(self, number):
        """Return the node name
         @param number: int number of the node
         @param entity: jid of the owner"""
        return NS_MICROBLOG % number 

    def _getConfigNode(self, entity):
        return "%(entity)s_%(root_suff)s" % {'entity':entity.userhost(), 'root_suff':CONFIG_NODE}

    def _configNodeCb(self, result, callback, profile):
        self._blog_nodes[profile] = {}
        for item in result:
            node_ass = item.firstChildElement()
            assert(node_ass.name == "node_association")
            node = node_ass['node']
            groups = [unicode(group) for group in node_ass.children]
            self._blog_nodes[profile][node] = groups
        callback(self._blog_nodes[profile])
    
    def _configNodeFail(self, failure, errback):
        errback("") #FIXME

    def _configNodeErr(self, failure, user_jid, pubsub_ent, callback, errback, profile):
        if failure.value.condition == 'item-not-found':
            debug(_('Multiblog config node not found, creating it'))
            _options = {OPT_ACCESS_MODEL:"whitelist", OPT_PERSIST_ITEMS:1, OPT_MAX_ITEMS:-1}
            d = self.host.plugins["XEP-0060"].createNode(pubsub_ent, self._getConfigNode(user_jid), _options, profile_key=profile)
            d.addCallback(lambda result: self._configNodeCb([] , callback, profile))
            d.addErrback(self._configNodeFail, errback)
        else:
            self._configNodeFail(failure, errback)
    
    def getMblogNodes(self, profile_key='@DEFAULT@', callback=None, errback=None):
        debug(_('Getting mblog nodes'))
        profile = self.host.memory.getProfileName(profile_key)
        if not profile:
            error(_("Unknown profile"))
            return {}
        
        def after_init(ignore):
            pubsub_ent = self.host.memory.getServerServiceEntity("pubsub", "service", profile)
            _jid, xmlstream = self.host.getJidNStream(profile_key)
            d = self.host.plugins["XEP-0060"].getItems(pubsub_ent, self._getConfigNode(_jid), profile_key=profile_key)
            d.addCallbacks(self._configNodeCb, self._configNodeErr, callbackArgs=(callback, profile), errbackArgs=(_jid, pubsub_ent, callback, errback, profile))

        client = self.host.getClient(profile)
        if not client:
            error(_('No client for this profile key: %s') % profile_key)
            return
        client.client_initialized.addCallback(after_init)

    def _publishMblog(self, name, message, client):
        """Actually publish the message on the group blog
        @param name: name of the node where we publish
        @param message: message to publish
        @param client: SatXMPPClient of the published"""
        mblog_item = self.host.plugins["XEP-0277"].data2entry({'content':message}, client.profile)
        defer_blog = self.host.plugins["XEP-0060"].publish(client.jid.userhostJID(), name, items=[mblog_item], profile_key=client.profile)
        defer_blog.addErrback(self._mblogPublicationFailed)

    def _groupNodeCreated(self, ignore, groups, name, message, client):
        """A group node as been created, we need to add it to the configure node, and send the message to it
        @param groups: list of groups authorized to subscribe to the node
        @param name: unique name of the node
        @param message: message to publish to the group
        @param client: SatXMPPClient"""
        def configNodeUpdated(result):
            self._blog_nodes[client.profile][name] = groups
            debug(_("Configuration node updated"))

        config_node = self._getConfigNode(client.jid)
        _payload = domish.Element(('','node_association'))
        _payload['node'] = name
        for group in groups:
            _payload.addElement('group',content=group)
        config_item = pubsub.Item(payload=_payload)
        pubsub_ent = self.host.memory.getServerServiceEntity("pubsub", "service", client.profile)
        defer_config = self.host.plugins["XEP-0060"].publish(pubsub_ent, config_node, items=[config_item], profile_key=client.profile)
        defer_config.addCallback(configNodeUpdated)
        defer_config.addErrback(self._configUpdateFailed)

        #Finally, we publish the message
        self._publishMblog(name, message, client)


    def _mblogPublicationFailed(self, failure):
        #TODO
        pass

    def _configUpdateFailed(self, failure):
        #TODO
        pass

    def _nodeCreationFailed(self, failure, name, groups, message, client):
        #FIXME: temporary behaviour is to delete the node,
        #user input should be required in the future
        def unmanagedError(failure):
            msg = _("Can't create node")
            error(msg)
            raise NodeCreationError(msg)
        
        if failure.value.condition == 'conflict':
            pubsub_elts = filter(lambda elt: elt.name == 'pubsub', failure.value.children)
            if pubsub_elts:
                create_elts = filter(lambda elt: elt.name == 'create', pubsub_elts[0].children)
                if create_elts:
                    _from = jid.JID(failure.value.stanza['from'])
                    _node = create_elts[0]['node']
                    d = self.host.plugins["XEP-0060"].deleteNode(_from, _node, client.profile)
                    d.addCallback(self._createNode, name, groups, message, client)
                    d.addErrback(unmanagedError)
        else:
            unmanagedError(None)
            msg = _("Can't create node")
            error(msg)
            raise NodeCreationError(msg)

    def _createNode(self, ignore, name, groups, message, client):
        """create a group microblog node
        @param ignore: ignored param, necessary to be added as a deferred callback
        @param name: name of the node
        @param groups: list of group than can subscribe to the node
        @param message: message to publish
        @param client: SatXMPPClient"""
        _options = {OPT_ACCESS_MODEL:"roster", OPT_PERSIST_ITEMS:1, OPT_MAX_ITEMS:-1,
                    'pubsub#roster_groups_allowed':groups}
        d = self.host.plugins["XEP-0060"].createNode(client.jid.userhostJID(), name, _options, client.profile)
        d.addCallback(self._groupNodeCreated, groups, name, message, client)
        d.addErrback(self._nodeCreationFailed, name, groups, message, client)
    
    def _getNodeForGroups(self, groups, profile):
        """Return node associated with the given list of groups
        @param groups: list of groups
        @param profile: profile of publisher"""
        for node in self._blog_nodes[profile]:
            node_groups = self._blog_nodes[profile][node]
            if set(node_groups) == set(groups):
                return node
        return None

    def _getFreeNode(self, entity, profile):
        """Return a free group number,
        raise an exception if we have reach limit"""
        _all = set([self._getNodeName(idx) for idx in range(1,21)])
        _used = set(self._blog_nodes[profile].keys())
        _free = _all.difference(_used)
        if not _free:
            msg = _("Can't create group node: no more free node available")
            warning(msg)
            raise NodeCreationError(msg)
        else:
            return _free.pop()

    def sendGroupBlog(self, groups, message, profile_key='@DEFAULT@'):
        """Publish a microblog to the node associated to the groups
        If the node doesn't exist, it is created, then the message is posted
        @param groups: list of groups allowed to retrieve the microblog
        @param message: microblog
        @profile_key: %(doc_profile)s
        """
        profile = self.host.memory.getProfileName(profile_key)
        if not profile:
            error(_("Unknown profile"))
            return
        
        def after_init(ignore):
            _groups = list(set(groups).intersection(client.roster.getGroups())) #We only keep group which actually exist
            #TODO: send an error signal if user want to post to non existant groups
            _groups.sort()
            for group in _groups:
                _node = self._getNodeForGroups([group], profile)
                if not _node:
                    _node_name = self._getFreeNode(client.jid, profile)
                    self._createNode(None, _node_name, [group], message, client)
                else:
                    self._publishMblog(_node, message, client)
        
        client = self.host.getClient(profile)
        if not client:
            error(_('No client for this profile key: %s') % profile_key)
            return
        client.client_initialized.addCallback(after_init)

    def _doCleaning(self, result, pubsub_ent, profile):
        """Compare the node in config node, and the existing nodes, and delete unknown ones"""
        #TODO: manage groups which don't exist anymore
        assert(len(result)==2)
        assert(result[0][0]==True and result[1][0]==True)
        config_nodes = [item.firstChildElement()["node"] for item in result[0][1]]
        existing_nodes = [item.nodeIdentifier for item in result[1][1]._items]
        to_delete = set(config_nodes).symmetric_difference(existing_nodes)
        def check_deletion(result):
            for (success, value) in result:
                if not success:
                    msg = _("Can't delete node")
                    error(msg)
                    raise NodeDeletionError(msg)
                    #TODO: log node which was not deleted

        
        d = defer.DeferredList([self.host.plugins["XEP-0060"].deleteNode(pubsub_ent, node, profile) for node in to_delete])
        d.addCallback(check_deletion)

    def cleanBlogCollection(self, profile_key='@DEFAULT@'):
        """Remove blog nodes not referenced in config node"""
        debug(_('Cleaning mblog nodes'))
        profile = self.host.memory.getProfileName(profile_key)
        if not profile:
            error(_("Unknown profile"))
            return {}
        
        def after_init(ignore):
            pubsub_ent = self.host.memory.getServerServiceEntity("pubsub", "service", profile)
            _jid, xmlstream = self.host.getJidNStream(profile_key)
            d_config = self.host.plugins["XEP-0060"].getItems(pubsub_ent, self._getConfigNode(_jid), profile_key=profile_key)
            d_root = client.disco.requestItems(pubsub_ent, self._getRootNode(client.jid))
            defer.DeferredList([d_config, d_root]).addCallback(self._doCleaning, pubsub_ent, profile)

        client = self.host.getClient(profile)
        if not client:
            error(_('No client for this profile key: %s') % profile_key)
            return
        client.client_initialized.addCallback(after_init)

    def subscribeGroupBlog(self, pub_jid, profile_key='@DEFAULT@'):
        debug(_('subscribing mblog nodes'))
        _pub_jid = jid.JID(pub_jid)
        profile = self.host.memory.getProfileName(profile_key)
        if not profile:
            error(_("Unknown profile"))
            return
        
        def after_init(ignore):
            pubsub_ent = self.host.memory.getServerServiceEntity("pubsub", "service", profile)
            _options = {OPT_SUBSCRIPTION_TYPE:'items', OPT_SUBSCRIPTION_DEPTH:'1'}
            d = self.host.plugins["XEP-0060"].subscribe(pubsub_ent, self._getRootNode(_pub_jid), options = _options, profile_key=profile)
            d.addCallback(lambda x: debug(_("%(publisher)s's group node subscribed [%(profile)s]") % {'publisher':_pub_jid.userhost(), 'profile': profile}))
            d.addErrback(lambda x: error(_("Can't subscribe group node [%(profile)s]") % {'profile': profile}))

        client = self.host.getClient(profile)
        if not client:
            error(_('No client for this profile key: %s') % profile_key)
            return
        client.client_initialized.addCallback(after_init)