view src/plugins/plugin_xep_0277.py @ 1626:63cef4dbf2a4

core, plugins file, XEP-0234, bridge: progression api enhancement: - progressStarted have a new metadata parameter, useful to know the kind of progression, direction, etc. Check bridge doc - progressGetAllMetadata can be used to retrieve this data and discover on currently running progressions - progressFinished also have a new metadata parameter, used to e.g. indicate that hash is checked - core: fixed progressGetAll - file, XEP-0234: implemented the API modifications, hash is returned on progressFinished - file: SatFile.checkSize allows to check size independently of close (be sure that all the data have been transfered though)
author Goffi <goffi@goffi.org>
date Thu, 19 Nov 2015 18:13:26 +0100
parents 94901070478e
children 2b8a975ff712
line wrap: on
line source

#!/usr/bin/python
# -*- coding: utf-8 -*-

# SAT plugin for microblogging over XMPP (xep-0277)
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core.log import getLogger
log = getLogger(__name__)
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from twisted.internet import defer
from twisted.python import failure
from sat.core import exceptions
from sat.tools import xml_tools
from sat.tools import sat_defer

# XXX: tmp.pubsub is actually used instead of the wokkel version
from wokkel import pubsub
try:
    from feed.date import rfc3339
except ImportError:
    raise exceptions.MissingModule(u"Missing module pyfeed, please download/install it from http://home.avvanta.com/~steveha/pyfeed.html")
import uuid
import time
import urlparse
import urllib

# microblog namespace, also used in this module as the default microblog node name
NS_MICROBLOG = 'urn:xmpp:microblog:0'
# namespaces used when parsing and building Atom entries and their XHTML content
NS_ATOM = 'http://www.w3.org/2005/Atom'
NS_XHTML = 'http://www.w3.org/1999/xhtml'
# pubsub namespace suffixed with "#event": items with this namespace are accepted by item2mbdata
NS_PUBSUB_EVENT = "{}{}".format(pubsub.NS_PUBSUB, "#event")
# prefix of the comments node names (the id of the parent item is appended to it)
NS_COMMENT_PREFIX = '{}:comments/'.format(NS_MICROBLOG)


# Plugin metadata. "main" matches the name of the plugin class defined in this module,
# "dependencies" lists the plugins accessed through host.plugins by this class.
# NOTE(review): presumably read by the SAT plugin loader, which is not visible here
PLUGIN_INFO = {
    "name": "Microblogging over XMPP Plugin",
    "import_name": "XEP-0277",
    "type": "XEP",
    "protocols": ["XEP-0277"],
    "dependencies": ["XEP-0163", "XEP-0060", "TEXT-SYNTAXES"],
    "recommendations": ["XEP-0059"],
    "main": "XEP_0277",
    "handler": "no",
    "description": _("""Implementation of microblogging Protocol""")
}


class NodeAccessChangeException(Exception):
    """Raised when the access model of the microblog node can't be set"""


class XEP_0277(object):

    def __init__(self, host):
        log.info(_("Microblogging plugin initialization"))
        self.host = host
        self._p = self.host.plugins["XEP-0060"] # this facilitate the access to pubsub plugin
        self.rt_sessions = sat_defer.RTDeferredSessions()
        self.host.plugins["XEP-0060"].addManagedNode(None, items_cb=self._itemsReceived)

        host.bridge.addMethod("mbSend", ".plugin",
                              in_sign='ssa{ss}s', out_sign='',
                              method=self._mbSend,
                              async=True)
        host.bridge.addMethod("mbRetract", ".plugin",
                              in_sign='ssss', out_sign='',
                              method=self._mbRetract,
                              async=True)
        host.bridge.addMethod("mbGet", ".plugin",
                              in_sign='ssiasa{ss}s', out_sign='(aa{ss}a{ss})',
                              method=self._mbGet,
                              async=True)
        host.bridge.addMethod("mbSetAccess", ".plugin", in_sign='ss', out_sign='',
                              method=self.mbSetAccess,
                              async=True)
        host.bridge.addMethod("mbSetAccess", ".plugin", in_sign='ss', out_sign='',
                              method=self.mbSetAccess,
                              async=True)
        host.bridge.addMethod("mbSubscribeToMany", ".plugin", in_sign='sass', out_sign='s',
                              method=self._mbSubscribeToMany)
        host.bridge.addMethod("mbGetFromManyRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssaa{ss}a{ss}))',
                              method=self._mbGetFromManyRTResult, async=True)
        host.bridge.addMethod("mbGetFromMany", ".plugin", in_sign='sasia{ss}s', out_sign='s',
                              method=self._mbGetFromMany)
        host.bridge.addMethod("mbGetFromManyWithCommentsRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssa(a{ss}a(sssaa{ss}a{ss}))a{ss}))',
                              method=self._mbGetFromManyWithCommentsRTResult, async=True)
        host.bridge.addMethod("mbGetFromManyWithComments", ".plugin", in_sign='sasiia{ss}a{ss}s', out_sign='s',
                              method=self._mbGetFromManyWithComments)
        host.bridge.addMethod("mbGetAtom", ".plugin", in_sign='ssiasa{ss}s', out_sign='s',
                              method=self._mbGetAtom, async=True)

    ## plugin management methods ##

    def _itemsReceived(self, itemsEvent, profile):
        """Callback which manage items notifications (publish + retract)

        Events coming from a node whose name doesn't start with the microblog
        namespace are ignored.
        @param itemsEvent: pubsub items event; its sender, nodeIdentifier and items are read
        @param profile: profile forwarded to the psEvent bridge signal
        @raise exceptions.InternalError: an element which is neither an item
            nor a retraction has been found in the event
        """
        if not itemsEvent.nodeIdentifier.startswith(NS_MICROBLOG):
            return

        def manageItem(data, event):
            self.host.bridge.psEvent(C.PS_MICROBLOG, itemsEvent.sender.full(), itemsEvent.nodeIdentifier, event, data, profile)

        def parsingFailed(failure_, item_id):
            # the invalid item is still skipped, but the reason is reported
            # instead of being silently dropped
            log.warning(u"Can't parse microblog item [{id_}] of node {node}: {reason}".format(
                id_=item_id,
                node=itemsEvent.nodeIdentifier,
                reason=unicode(failure_.value)))

        for item in itemsEvent.items:
            if item.name == C.PS_ITEM:
                d = self.item2mbdata(item)
                d.addCallbacks(manageItem, parsingFailed,
                               callbackArgs=(C.PS_PUBLISH,),
                               errbackArgs=(item.getAttribute('id', ''),))
            elif item.name == C.PS_RETRACT:
                manageItem({'id': item['id']}, C.PS_RETRACT)
            else:
                raise exceptions.InternalError("Invalid event value")


    ## data/item transformation ##

    @defer.inlineCallbacks
    def item2mbdata(self, item_elt):
        """Convert an XML Item to microblog data used in bridge API

        @param item_elt: domish.Element of microblog item
        @return: deferred which fire microblog data (dictionary)
        @raise exceptions.DataError: the item is invalid: unsupported namespace,
            missing atom entry, title or updated element, invalid XHTML content
            or duplicated title/content key
        """
        microblog_data = {}
        id_ = item_elt.getAttribute('id', '') # there can be no id for transient nodes

        def check_conflict(key, increment=False):
            """Check if key is already in microblog data

            @param key(unicode): key to check
            @param increment(bool): if suffix the key with an increment
                instead of raising an exception
            @return (unicode): the key to use (suffixed if needed)
            @raise exceptions.DataError: the key already exists
                (not raised if increment is True)
            """
            if key in microblog_data:
                if not increment:
                    # the message is formatted *before* building the exception,
                    # and id_ is used because the "id" attribute may be missing
                    raise failure.Failure(exceptions.DataError(u"key {} is already present for item {}".format(key, id_)))
                else:
                    idx=1 # the idx 0 is the key without suffix
                    fmt = "{}#{}"
                    new_key = fmt.format(key, idx)
                    while new_key in microblog_data:
                        idx+=1
                        new_key = fmt.format(key, idx)
                    key = new_key
            return key

        @defer.inlineCallbacks
        def parseElement(elem):
            """Parse title/content elements and fill microblog_data accordingly"""
            type_ = elem.getAttribute('type')
            if type_ == 'xhtml':
                data_elt = elem.firstChildElement()
                if data_elt is None:
                    # without this check, the namespace test would fail with an AttributeError
                    raise failure.Failure(exceptions.DataError(_('Content of type XHTML must be wrapped in an element!')))
                if data_elt.uri != NS_XHTML:
                    raise failure.Failure(exceptions.DataError(_('Content of type XHTML must declare its namespace!')))
                key = check_conflict(u'{}_xhtml'.format(elem.name))
                data = data_elt.toXml()
                microblog_data[key] = yield self.host.plugins["TEXT-SYNTAXES"].clean_xhtml(data)
            else:
                key = check_conflict(elem.name)
                microblog_data[key] = unicode(elem)


        microblog_data['id'] = id_
        if item_elt.uri not in (pubsub.NS_PUBSUB, NS_PUBSUB_EVENT):
            msg = u"Unsupported namespace {ns} in pubsub item {id_}".format(ns=item_elt.uri, id_=id_)
            log.warning(msg)
            raise failure.Failure(exceptions.DataError(msg))

        try:
            entry_elt = item_elt.elements(NS_ATOM, 'entry').next()
        except StopIteration:
            msg = u'No atom entry found in the pubsub item {}'.format(id_)
            raise failure.Failure(exceptions.DataError(msg))

        # atom:id
        try:
            id_elt = entry_elt.elements(NS_ATOM, 'id').next()
        except StopIteration:
            msg = u'No atom id found in the pubsub item {}, this is not standard !'.format(id_)
            log.warning(msg)
            microblog_data['atom_id'] = ""
        else:
            microblog_data['atom_id'] = unicode(id_elt)

        # title/content(s)
        try:
            title_elt = entry_elt.elements(NS_ATOM, 'title').next()
        except StopIteration:
            msg = u'No atom title found in the pubsub item {}'.format(id_)
            raise failure.Failure(exceptions.DataError(msg))

        yield parseElement(title_elt)

        for content_elt in entry_elt.elements(NS_ATOM, 'content'):
            yield parseElement(content_elt)

        # we check that text content is present
        for key in ('title', 'content'):
            if key not in microblog_data and ('{}_xhtml'.format(key)) in microblog_data:
                log.warning(u"item {id_} provide a {key}_xhtml data but not a text one".format(id_=id_, key=key))
                # ... and do the conversion if it's not
                microblog_data[key] = yield self.host.plugins["TEXT-SYNTAXES"].\
                                            convert(microblog_data['{}_xhtml'.format(key)],
                                            self.host.plugins["TEXT-SYNTAXES"].SYNTAX_XHTML,
                                            self.host.plugins["TEXT-SYNTAXES"].SYNTAX_TEXT,
                                            False)

        if 'content' not in microblog_data:
            # use the atom title data as the microblog body content
            microblog_data['content'] = microblog_data['title']
            del microblog_data['title']
            if 'title_xhtml' in microblog_data:
                microblog_data['content_xhtml'] = microblog_data['title_xhtml']
                del microblog_data['title_xhtml']

        # published/updated dates
        try:
            updated_elt = entry_elt.elements(NS_ATOM, 'updated').next()
        except StopIteration:
            msg = u'No atom updated element found in the pubsub item {}'.format(id_)
            raise failure.Failure(exceptions.DataError(msg))
        microblog_data['updated'] = unicode(rfc3339.tf_from_timestamp(unicode(updated_elt)))
        try:
            published_elt = entry_elt.elements(NS_ATOM, 'published').next()
        except StopIteration:
            # no publication date: the update date is used for both
            microblog_data['published'] = microblog_data['updated']
        else:
            microblog_data['published'] = unicode(rfc3339.tf_from_timestamp(unicode(published_elt)))

        # links
        for link_elt in entry_elt.elements(NS_ATOM, 'link'):
            if link_elt.getAttribute('rel') == 'replies' and link_elt.getAttribute('title') == 'comments':
                key = check_conflict('comments', True)
                # a missing href is handled like an unparsable one (the link is
                # ignored) instead of making the whole item fail with a KeyError
                href = link_elt.getAttribute('href', '')
                try:
                    service, node = self.parseCommentUrl(href)
                except:
                    # catch-all kept on purpose: parseCommentUrl raises a failure.Failure
                    # instance, and whether it derives from Exception presumably depends
                    # on the Twisted version in use (to be confirmed)
                    log.warning(u"Can't parse url {}".format(href))
                else:
                    microblog_data[key] = href
                    microblog_data['{}_service'.format(key)] = service.full()
                    microblog_data['{}_node'.format(key)] = node
            else:
                rel = link_elt.getAttribute('rel','')
                title = link_elt.getAttribute('title','')
                href = link_elt.getAttribute('href','')
                log.warning(u"Unmanaged link element: rel={rel} title={title} href={href}".format(rel=rel, title=title, href=href))

        # author
        # NOTE: the log messages below use unicode format strings, a byte string
        #       would raise UnicodeEncodeError with non ASCII ids or jids
        publisher = item_elt.getAttribute("publisher")
        try:
            author_elt = entry_elt.elements(NS_ATOM, 'author').next()
        except StopIteration:
            log.debug(u"Can't find author element in item {}".format(id_))
        else:
            # name
            try:
                name_elt = author_elt.elements(NS_ATOM, 'name').next()
            except StopIteration:
                log.warning(u"No name element found in author element of item {}".format(id_))
            else:
                microblog_data['author'] = unicode(name_elt)
            # uri
            try:
                uri_elt = author_elt.elements(NS_ATOM, 'uri').next()
            except StopIteration:
                log.debug(u"No uri element found in author element of item {}".format(id_))
                if publisher:
                    microblog_data['author_jid'] =  publisher
            else:
                uri = unicode(uri_elt)
                if uri.startswith("xmpp:"):
                    uri = uri[5:]
                    microblog_data['author_jid'] = uri
                else:
                    microblog_data['author_jid'] = publisher or ""

                # the jid is verified only if it is the same as the "publisher" attribute
                if not publisher:
                    log.debug("No publisher attribute, we can't verify author jid")
                    microblog_data['author_jid_verified'] = C.BOOL_FALSE
                elif publisher == uri:
                    microblog_data['author_jid_verified'] = C.BOOL_TRUE
                else:
                    log.warning(u"item atom:uri differ from publisher attribute, spoofing attempt ? atom:uri = {} publisher = {}".format(uri, publisher))
                    microblog_data['author_jid_verified'] = C.BOOL_FALSE
            # email
            try:
                email_elt = author_elt.elements(NS_ATOM, 'email').next()
            except StopIteration:
                pass
            else:
                microblog_data['author_email'] = unicode(email_elt)

        defer.returnValue(microblog_data)

    @defer.inlineCallbacks
    def data2entry(self, data, item_id=None, profile_key=C.PROF_KEY_NONE):
        """Convert a data dict to en entry usable to create an item

        @param data: data dict as given by bridge method.
        @param item_id(unicode, None): id of the item to use
            if None the id will be generated randomly
        @param profile_key: profile used to get the client (default author)
            and the current rich syntax
        @return: deferred which fire domish.Element
        @raise exceptions.DataError: rich and XHTML values are both given
            for the same element, or there is no title nor content at all
        """
        if item_id is None:
            item_id = unicode(uuid.uuid4())
        client = self.host.getClient(profile_key)
        entry_elt = domish.Element((NS_ATOM, 'entry'))

        ## content and title ##
        synt = self.host.plugins["TEXT-SYNTAXES"]

        for elem_name in ('title', 'content'):
            for type_ in ['', '_rich', '_xhtml']:
                attr = "{}{}".format(elem_name, type_)
                if attr in data:
                    if type_ == '_rich' and '{}_xhtml'.format(elem_name) in data:
                        # checked before doing any (useless) conversion
                        raise failure.Failure(exceptions.DataError(_("Can't have xhtml and rich content at the same time")))
                    elem = entry_elt.addElement(elem_name)
                    if type_:
                        if type_ == '_rich':  # convert input from current syntax to XHTML
                            converted = yield synt.convert(data[attr], synt.getCurrentSyntax(profile_key), "XHTML")
                        else:  # clean the XHTML input
                            converted = yield synt.clean_xhtml(data[attr])

                        xml_content = u'<div xmlns="{ns}">{converted}</div>'.format(
                                        ns=NS_XHTML,
                                        converted=converted)
                        elem.addChild(xml_tools.ElementParser()(xml_content))
                        elem['type'] = 'xhtml'
                        if elem_name not in data:
                            # there is no raw text content, which is mandatory
                            # so we create one from xhtml content
                            elem_txt = entry_elt.addElement(elem_name)
                            text_content = yield synt.convert(xml_content,
                                synt.SYNTAX_XHTML,
                                synt.SYNTAX_TEXT,
                                False)
                            elem_txt.addContent(text_content)
                            elem_txt['type'] = 'text'

                    else:  # raw text only needs to be escaped to get HTML-safe sequence
                        elem.addContent(data[attr])
                        elem['type'] = 'text'

        try:
            entry_elt.elements(NS_ATOM, 'title').next()
        except StopIteration:
            # we have no title element which is mandatory
            # so we transform content element to title
            elems = list(entry_elt.elements(NS_ATOM, 'content'))
            if not elems:
                raise exceptions.DataError("There must be at least one content or title element")
            for elem in elems:
                elem.name = 'title'

        ## author ##
        author_elt = entry_elt.addElement('author')
        try:
            author_name = data['author']
        except KeyError:
            # FIXME: must use better name
            author_name = client.jid.user
        author_elt.addElement('name', content=author_name)

        try:
            author_jid_s = data['author_jid']
        except KeyError:
            author_jid_s = client.jid.userhost()
        # unicode format string: a byte string would fail with a non ASCII jid
        author_elt.addElement('uri', content=u"xmpp:{}".format(author_jid_s))

        ## published/updated time ##
        # the current time is used for the dates which are not specified in data
        current_time = time.time()
        entry_elt.addElement('updated',
            content=rfc3339.timestamp_from_tf(float(data.get('updated', current_time))))
        entry_elt.addElement('published',
            content=rfc3339.timestamp_from_tf(float(data.get('published', current_time))))

        ## id ##
        # an empty "id" value falls back to the item id, like it is done in [send]
        entry_id = data.get('id') or item_id # FIXME: use a proper id (see XEP-0277 §7.1)
        entry_elt.addElement('id', content=entry_id) #

        ## comments ##
        if 'comments' in data:
            link_elt = entry_elt.addElement('link')
            link_elt['href'] = data['comments']
            link_elt['rel'] = 'replies'
            link_elt['title'] = 'comments'

        ## final item building ##
        item_elt = pubsub.Item(id=item_id, payload=entry_elt)
        defer.returnValue(item_elt)

    ## publish ##

    @defer.inlineCallbacks
    def _manageComments(self, access, mb_data, service, node, item_id, profile):
        """Create the comments node of an item when its data ask for it

        The "allow_comments" key is removed from mb_data; if its value is true,
        a comments node is created and its XMPP URI is put in mb_data['comments'].
        @param access(unicode): access model
        @param mb_data(dict): microblog data, modified in place
        @param service(jid.JID, None): Pubsub service of the parent item
        @param node(unicode): node of the parent item (not used in this method)
        @param item_id(unicode): id of the parent item
        @param profile: profile used to get the client and to create the node
        @raise error.StanzaError: the node can't be created
            (a conflict is raised only once all tries have failed)
        """
        if not C.bool(mb_data.pop("allow_comments", "false")):
            return

        client = self.host.getClient(profile)

        node_options = {
            self._p.OPT_ACCESS_MODEL: access,
            self._p.OPT_PERSIST_ITEMS: 1,
            self._p.OPT_MAX_ITEMS: -1,
            self._p.OPT_DELIVER_PAYLOADS: 1,
            self._p.OPT_SEND_ITEM_SUBSCRIBE: 1,
            self._p.OPT_PUBLISH_MODEL: "subscribers",  # TODO: should be open if *both* node and item access_model are open (public node and item)
        }

        base_name = u"{}{}".format(NS_COMMENT_PREFIX, item_id)
        comments_node = base_name

        # the pubsub service of the client is used in priority, if there is one
        if client.pubsub_service is None:
            comments_service = service
        else:
            comments_service = client.pubsub_service

        max_tries = 3
        suffix = None

        for attempt in xrange(max_tries+1):
            try:
                yield self._p.createNode(comments_service, comments_node, node_options, profile_key=profile)
            except error.StanzaError as e:
                if e.condition != 'conflict' or attempt >= max_tries:
                    raise e
                # the name is already used, we try again with a numeric suffix
                log.warning(u"node {} already exists on service {}".format(comments_node, comments_service))
                if suffix is None:
                    suffix = 0
                else:
                    suffix = suffix + 1
                comments_node = u"{}_{}".format(base_name, suffix)
            else:
                break

        if comments_service is None:
            comments_service = client.jid.userhostJID()

        service_s = comments_service.userhost()
        query_s = urllib.urlencode([('node', comments_node.encode('utf-8'))])
        mb_data['comments'] = "xmpp:%(service)s?%(query)s" % {'service': service_s, 'query': query_s}

    def _mbSend(self, service, node, data, profile_key):
        """Bridge entry point of [send]

        @param service(unicode): jid of the pubsub service, None is used if it is empty
        @param node(unicode): node name, the microblog namespace is used if it is empty
        @param data(dict): microblog data, given as is to [send]
        @param profile_key: key resolved with host.memory.getProfileName
        @return: the value returned by [send]
        """
        if service:
            service = jid.JID(service)
        else:
            service = None
        if not node:
            node = NS_MICROBLOG
        return self.send(service, node, data, self.host.memory.getProfileName(profile_key))

    @defer.inlineCallbacks
    def send(self, service=None, node=NS_MICROBLOG, data=None, profile=None):
        """Send XEP-0277's microblog data

        @param service(jid.JID, None): PubSub service where the microblog must be published
            None to publish on profile's PEP
        @param node(unicode): PubSub node to use (defaut to microblog NS)
        @param data(dict): microblog data (must include at least a "content" or a "title" key).
            see http://wiki.goffi.org/wiki/Bridge_API_-_Microblogging/en for details
        @param profile: %(doc_profile)s
        @return: deferred which fire the result of the pubsub publication
        """
        assert profile is not None
        if data is None:
            # the missing title/content is then reported by data2entry,
            # instead of failing here with an AttributeError
            data = {}

        item_id = data.get('id') or unicode(uuid.uuid4())
        try:
            yield self._manageComments(self._p.ACCESS_OPEN, data, service, node, item_id, profile)
        except error.StanzaError:
            # the item is published anyway, without comments node
            # (unicode format string: item_id may contain non ASCII characters)
            log.warning(u"Can't create comments node for item {}".format(item_id))
        item = yield self.data2entry(data, item_id, profile)
        ret = yield self._p.publish(service, node, [item], profile_key=profile)
        defer.returnValue(ret)


    ## retract ##

    def _mbRetract(self, service_jid_s, nodeIdentifier, itemIdentifier, profile_key):
        """Retract an item with self._p._retractItem

        The microblog namespace is used as node name if nodeIdentifier is empty.
        @return: the value returned by self._p._retractItem
        """
        if not nodeIdentifier:
            nodeIdentifier = NS_MICROBLOG
        return self._p._retractItem(service_jid_s, nodeIdentifier, itemIdentifier, True, profile_key)

    ## get ##

    def _mbGet(self, service_jid_s, node="", max_items=10, item_ids=None, extra_dict=None, profile_key=C.PROF_KEY_NONE):
        """Bridge entry point of [mbGet]

        @param service_jid_s(unicode): jid of the service, converted to jid.JID
        @param node(unicode): node to get, an empty value is replaced by None
        @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
        @param item_ids (list[unicode]): list of item IDs
        @param extra_dict(dict): extra data, parsed with self._p.parseExtra
        @return: the value returned by [mbGet]
        """
        if max_items == C.NO_LIMIT:
            max_items = None
        parsed = self._p.parseExtra(extra_dict)
        service_jid = jid.JID(service_jid_s)
        if not node:
            node = None
        return self.mbGet(service_jid, node, max_items, item_ids, parsed.rsm_request, parsed.extra, profile_key)

    @defer.inlineCallbacks
    def mbGet(self, service_jid, node=None, max_items=None, item_ids=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE):
        """Retrieve microblog items from a node and serialise them

        @param service_jid(jid.JID): jid of the publisher
        @param node(unicode, None): node to get (or microblog node if None)
        @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
        @param item_ids (list[unicode]): list of item IDs
        @param rsm_request (rsm.RSMRequest): RSM request data
        @param extra (dict): extra data
        @param profile_key: %(doc_profile_key)s

        @return: a deferred couple with the list of items and metadatas.
        """
        node_name = NS_MICROBLOG if node is None else node
        raw_items = yield self._p.getItems(service_jid,
                                           node_name,
                                           max_items=max_items,
                                           item_ids=item_ids,
                                           rsm_request=rsm_request,
                                           extra=extra,
                                           profile_key=profile_key)
        # each item is converted with item2mbdata during the serialisation
        mb_items = yield self._p.serItemsDataD(raw_items, self.item2mbdata)
        defer.returnValue(mb_items)

    def parseCommentUrl(self, node_url):
        """Extract the pubsub service and the node from a XMPP URI

        The URI is the href attribute of a comments link element. For example this input:
        xmpp:sat-pubsub.libervia.org?node=urn%3Axmpp%3Acomments%3A_c5c4a142-2279-4b2a-ba4c-1bc33aa87634__urn%3Axmpp%3Agroupblog%3Asouliane%40libervia.org
        will return (JID(u'sat-pubsub.libervia.org'), 'urn:xmpp:comments:_c5c4a142-2279-4b2a-ba4c-1bc33aa87634__urn:xmpp:groupblog:souliane@libervia.org')
        @param node_url: URI to parse, "xmpp" is the default scheme
        @return: a tuple (JID, str)
        @raise exceptions.DataError: there is no (or an empty) "node" in the query
        """
        url = urlparse.urlparse(node_url, 'xmpp')
        service = jid.JID(url.path)

        # the query parts are separated with ";", each one is parsed on its own
        params = {}
        for chunk in url.query.split(';'):
            params.update(urlparse.parse_qs(chunk))

        node_values = params.get('node', [''])
        node = node_values[0]
        if node:
            return (service, node)

        raise failure.Failure(exceptions.DataError('Invalid comments link'))

    ## configure ##

    def mbSetAccess(self, access="presence", profile_key=C.PROF_KEY_NONE):
        """Create a microblog node on PEP with given access

        If the node already exists, it change options
        @param access: Node access model, according to xep-0060 #4.5
        @param profile_key: profile key
        @return: deferred fired (with None) once the node is created or its options
            changed. It is returned because this method is registered as an
            asynchronous bridge method, like the other methods returning a deferred.
        @raise NodeAccessChangeException: (through the deferred) the profile's jid
            can't be found, or the node can't be created nor configured
        """

        _jid, xmlstream = self.host.getJidNStream(profile_key)
        if not _jid:
            log.error(_("Can't find profile's jid"))
            return defer.fail(NodeAccessChangeException())
        _options = {self._p.OPT_ACCESS_MODEL: access, self._p.OPT_PERSIST_ITEMS: 1, self._p.OPT_MAX_ITEMS: -1, self._p.OPT_DELIVER_PAYLOADS: 1, self._p.OPT_SEND_ITEM_SUBSCRIBE: 1}

        def cb(result):
            #Node is created with right permission
            log.debug(_(u"Microblog node has now access %s") % access)

        def fatal_err(s_error):
            #Something went wrong
            log.error(_("Can't set microblog access"))
            raise NodeAccessChangeException()

        def err_cb(s_error):
            #If the node already exists, the condition is "conflict",
            #else we have an unmanaged error
            # getattr is used because the failure may wrap something else than a stanza error
            if getattr(s_error.value, 'condition', None) == 'conflict':
                #d = self.host.plugins["XEP-0060"].deleteNode(_jid.userhostJID(), NS_MICROBLOG, profile_key=profile_key)
                #d.addCallback(lambda x: create_node().addCallback(cb).addErrback(fatal_err))
                # the deferred is returned, so the result waits for the options change
                return change_node_options().addCallback(cb).addErrback(fatal_err)
            else:
                fatal_err(s_error)

        def create_node():
            return self._p.createNode(_jid.userhostJID(), NS_MICROBLOG, _options, profile_key=profile_key)

        def change_node_options():
            return self._p.setOptions(_jid.userhostJID(), NS_MICROBLOG, _jid.userhostJID(), _options, profile_key=profile_key)

        return create_node().addCallback(cb).addErrback(err_cb)

    ## methods to manage several stanzas/jids at once ##

    # common

    def _getClientAndNodeData(self, publishers_type, publishers, profile_key):
        """Build the (jid, node) couples to use for the requested publishers

        @param publishers_type: type of the list of publishers, one of:
            C.ALL: get all jids from roster, publishers is not used
            C.GROUP: get jids from groups
            C.JID: use publishers directly as list of jids
        @param publishers: list of publishers, according to "publishers_type" (None, list of groups or list of jids)
        @param profile_key: %(doc_profile_key)s
        @return (tuple): (client, node_data) where node_data is a list of
            (jid, microblog namespace) couples, one per publisher jid
        """
        client = self.host.getClient(profile_key)
        if publishers_type != C.JID:
            # the roster is used to get the jids of the requested publishers
            jids = client.roster.getJidsSet(publishers_type, publishers)
        else:
            jids = set(publishers)

        return client, [(jid_, NS_MICROBLOG) for jid_ in jids]

    def _checkPublishers(self, publishers_type, publishers):
        """Validate and deserialise the publishers received from the bridge

        With C.JID type, the publishers list is modified in place: each value
        is replaced by a jid.JID instance.
        @param publishers_type(unicode): type of the list of publishers
        @param publishers: list of publishers according to type
        @return: deserialised (publishers_type, publishers) tuple
            publishers is None with C.ALL type
        @raise ValueError: (wrapped in a failure.Failure) publishers are given with C.ALL type
        """
        if publishers_type == C.ALL:
            if publishers:
                raise failure.Failure(ValueError("Can't use publishers with {} type".format(publishers_type)))
            return publishers_type, None

        if publishers_type == C.JID:
            parsed_jids = [jid.JID(raw_jid) for raw_jid in publishers]
            publishers[:] = parsed_jids

        return publishers_type, publishers



    # subscribe #

    def _mbSubscribeToMany(self, publishers_type, publishers, profile_key):
        """Bridge entry point of [mbSubscribeToMany]

        The publishers are deserialised with _checkPublishers first.
        @return (str): session id: Use pubsub.getSubscribeRTResult to get the results
        """
        checked_type, checked_publishers = self._checkPublishers(publishers_type, publishers)
        return self.mbSubscribeToMany(checked_type, checked_publishers, profile_key)

    def mbSubscribeToMany(self, publishers_type, publishers, profile_key):
        """Subscribe to the microblogs of several publishers at once

        @param publishers_type: type of the list of publishers, one of:
            C.ALL: get all jids from roster, publishers is not used
            C.GROUP: get jids from groups
            C.JID: use publishers directly as list of jids
        @param publishers: list of publishers, according to "publishers_type" (None, list of groups or list of jids)
        @param profile_key: %(doc_profile_key)s
        @return (str): session id
        """
        client, node_data = self._getClientAndNodeData(publishers_type, publishers, profile_key)
        # the bare jid of the client is the subscriber
        subscriber = client.jid.userhostJID()
        return self._p.subscribeToMany(node_data, subscriber, profile_key=profile_key)

    # get #

    def _mbGetFromManyRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
        """Get real-time results for mbGetFromMany session

        @param session_id: id of the real-time deferred session
        @param profile_key: %(doc_profile_key)s
        @return: deferred which fire a tuple (remaining, results) where:
            - remaining is the number of still expected results
            - results is a list of tuple with
                - service (unicode): pubsub service
                - node (unicode): pubsub node
                - failure (unicode): empty string in case of success, error message else
                - items_data(list): data as returned by [mbGet]
                - items_metadata(dict): metadata as returned by [mbGet]
        """
        def serialise(items_data):
            """Convert items elements to microblog data, with an empty failure message"""
            serialised_d = self._p.serItemsDataD(items_data, self.item2mbdata)
            return serialised_d.addCallback(lambda serialised: ('', serialised))

        def reportError(failure_):
            """Use the error message with empty items and metadata"""
            return (unicode(failure_.value), ([], {}))

        def flatten(ret):
            """Flatten the results mapping to a list of tuples usable by the bridge"""
            remaining = ret[0]
            results = []
            for (service, node), (success, (failure_msg, (items, metadata))) in ret[1].iteritems():
                results.append((service.full(), node, failure_msg, items, metadata))
            return (remaining, results)

        profile = self.host.getClient(profile_key).profile
        d = self._p.getRTResults(session_id,
                                 on_success = serialise,
                                 on_error = reportError,
                                 profile = profile)
        d.addCallback(flatten)
        return d

    def _mbGetFromMany(self, publishers_type, publishers, max_items=10, extra_dict=None, profile_key=C.PROF_KEY_NONE):
        """Normalise the arguments then delegate to [mbGetFromMany]

        @param publishers_type: type of publishers, checked with _checkPublishers
        @param publishers: publishers, checked with _checkPublishers
        @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
        @param extra_dict: extra data, parsed to get the RSM request and the extra dict
        @param profile_key: profile key, transmitted unchanged
        @return: the value returned by [mbGetFromMany]
        """
        if max_items == C.NO_LIMIT:
            # mbGetFromMany uses None to express the absence of limit
            max_items = None
        publishers_type, publishers = self._checkPublishers(publishers_type, publishers)
        parsed = self._p.parseExtra(extra_dict)
        return self.mbGetFromMany(publishers_type,
                                  publishers,
                                  max_items,
                                  parsed.rsm_request,
                                  parsed.extra,
                                  profile_key)

    def mbGetFromMany(self, publishers_type, publishers, max_items=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE):
        """Retrieve the microblogs published by a list of groups or jids

        @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL")
        @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids)
        @param max_items (int): optional limit on the number of retrieved items.
        @param rsm_request (rsm.RSMRequest): RSM request data, common to all publishers
        @param extra (dict): Extra data (currently ignored)
        @param profile_key: profile key
        @return (str): RT Deferred session id
        """
        # XXX: extra is unused here so far
        # only the node data is needed, the client is not used in this method
        _client, nodes = self._getClientAndNodeData(publishers_type, publishers, profile_key)
        session_id = self._p.getFromMany(nodes, max_items, rsm_request, profile_key=profile_key)
        return session_id

    # comments #

    def _mbGetFromManyWithCommentsRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
        """Get real-time results for [mbGetFromManyWithComments] session

        @param session_id: id of the real-time deferred session
        @param profile_key: %(doc_profile_key)s
        @return (defer.Deferred): fires with a tuple (remaining, results) where:
            - remaining is the number of still expected results
            - results is a list of 5-tuple with
                - service (unicode): pubsub service
                - node (unicode): pubsub node
                - failure (unicode): empty string in case of success, error message else
                - items(list[tuple(dict, list)]): list of 2-tuple with
                    - item(dict): item microblog data
                    - comments_list(list[tuple]): list of 5-tuple with
                        - service (unicode): pubsub service where the comments node is
                        - node (unicode): comments node
                        - failure (unicode): empty in case of success, else error message
                        - comments(list[dict]): list of microblog data
                        - comments_metadata(dict): metadata of the comment node
                - metadata(dict): original node metadata
        """
        def flattenResults(rt_results):
            """Turn the (service, node) mapping into a list of flat 5-tuples"""
            remaining = rt_results[0]
            flat = []
            for (service, node), (success, (error_msg, (items, metadata))) in rt_results[1].iteritems():
                flat.append((service.full(), node, error_msg, items, metadata))
            return (remaining, flat)

        client = self.host.getClient(profile_key)
        results_d = self.rt_sessions.getResults(session_id, profile=client.profile)
        results_d.addCallback(flattenResults)
        return results_d

    def _mbGetFromManyWithComments(self, publishers_type, publishers, max_items=10, max_comments=C.NO_LIMIT, extra_dict=None, extra_comments_dict=None, profile_key=C.PROF_KEY_NONE):
        """Normalise the arguments then delegate to [mbGetFromManyWithComments]

        @param publishers_type: type of publishers, checked with _checkPublishers
        @param publishers: publishers, checked with _checkPublishers
        @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
        @param max_comments(int): maximum number of comments to get, C.NO_LIMIT for no limit
        @param extra_dict: extra data for the items, parsed to get RSM request and extra dict
        @param extra_comments_dict: extra data for the comments, parsed the same way
        @param profile_key: profile key, transmitted unchanged
        @return: the value returned by [mbGetFromManyWithComments]
        """
        # mbGetFromManyWithComments uses None to express the absence of limit
        if max_items == C.NO_LIMIT:
            max_items = None
        if max_comments == C.NO_LIMIT:
            max_comments = None
        publishers_type, publishers = self._checkPublishers(publishers_type, publishers)
        items_extra = self._p.parseExtra(extra_dict)
        comments_extra = self._p.parseExtra(extra_comments_dict)
        return self.mbGetFromManyWithComments(
            publishers_type,
            publishers,
            max_items,
            max_comments,
            items_extra.rsm_request,
            items_extra.extra,
            comments_extra.rsm_request,
            comments_extra.extra,
            profile_key)

    def mbGetFromManyWithComments(self, publishers_type, publishers, max_items=None, max_comments=None, rsm_request=None, extra=None, rsm_comments=None, extra_comments=None, profile_key=C.PROF_KEY_NONE):
        """Helper method to get the microblogs and their comments in one shot

        For each (service, node) couple obtained from the publishers, the items
        are requested and serialised, then the comments nodes referenced by
        each item are requested and serialised in turn. One deferred per
        (service, node) couple is registered in a new real-time session.

        @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL")
        @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids)
        @param max_items (int): optional limit on the number of retrieved items.
        @param max_comments (int): maximum number of comments to retrieve
        @param rsm_request (rsm.RSMRequest): RSM request for initial items only
        @param extra (dict): extra configuration for initial items only
        @param rsm_comments (rsm.RSMRequest): RSM request for comments only
        @param extra_comments (dict): extra configuration for comments only
        @param profile_key: profile key
        @return (str): RT Deferred session id
        """
        # XXX: this method looks complicated because it does several treatments
        #      to serialise and associate the data, but it makes life a lot
        #      easier on the frontends side

        def getComments(items_data):
            """Retrieve comments and add them to the items_data

            @param items_data: serialised items data, a couple (items, metadata)
            @return (defer.Deferred): fires with a couple (items_completed, metadata)
                where items_completed is a list of couples (item, comments_data),
                and comments_data is a list of tuples
                (service, node, failure, list of items, metadata), one per
                comments node found in the item
            """
            items, metadata = items_data
            items_dlist = [] # deferred list for items
            for item in items:
                dlist = [] # deferred list for comments
                for key, value in item.iteritems():
                    # we look for comments: a comments node is identified by a
                    # "comments[...]_service" key, with a "comments[...]_node" sibling
                    if key.startswith('comments') and key.endswith('_service'):
                        # the prefix is everything before the first underscore
                        prefix = key[:key.find('_')]
                        service_s = value
                        node = item["{}{}".format(prefix, "_node")]
                        # time to get the comments
                        d = self._p.getItems(jid.JID(service_s), node, max_comments, rsm_request=rsm_comments, extra=extra_comments, profile_key=profile_key)
                        # then serialise
                        d.addCallback(lambda items_data: self._p.serItemsDataD(items_data, self.item2mbdata))
                        # with failure handling: an empty string is prepended on success,
                        # on error the message is used with empty items and metadata,
                        # so both cases result in a 3-tuple
                        d.addCallback(lambda serialised_items_data: ('',) + serialised_items_data)
                        d.addErrback(lambda failure: (unicode(failure.value), [], {}))
                        # and associate with service/node (needed if there are several comments nodes)
                        # service_s and node are bound as default values to keep
                        # the values of the current iteration
                        d.addCallback(lambda serialised, service_s=service_s, node=node: (service_s, node) + serialised)
                        dlist.append(d)
                # we get the comments
                comments_d = defer.gatherResults(dlist)
                # and add them to the item data
                # (item is bound as default value to keep the current iteration one)
                comments_d.addCallback(lambda comments_data, item=item: (item, comments_data))
                items_dlist.append(comments_d)
            # we gather the items + comments in a list
            items_d = defer.gatherResults(items_dlist)
            # and add the metadata
            items_d.addCallback(lambda items_completed: (items_completed, metadata))
            return items_d

        client, node_data = self._getClientAndNodeData(publishers_type, publishers, profile_key)
        # one deferred per (service, node) couple, used as key of the RT session
        deferreds = {}
        for service, node in node_data:
            d = deferreds[(service, node)] = self._p.getItems(service, node, max_items, rsm_request=rsm_request, extra=extra, profile_key=profile_key)
            d.addCallback(lambda items_data: self._p.serItemsDataD(items_data, self.item2mbdata))
            d.addCallback(getComments)
            # success and error results share the same shape:
            # (failure message, (items, metadata)), with an empty message on success
            d.addCallback(lambda items_comments_data: ('', items_comments_data))
            d.addErrback(lambda failure: (unicode(failure.value), ([],{})))

        return self.rt_sessions.newSession(deferreds, client.profile)

    # atom feed

    def _mbGetAtom(self, service_jid_s, node="", max_items=10, item_ids=None, extra_dict=None, profile_key=C.PROF_KEY_NONE):
        """Normalise the arguments then delegate to [mbGetAtom]

        @param service_jid_s(unicode): jid of the publisher, parsed with jid.JID
        @param node(unicode): node to get, an empty value is transmitted as None
        @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
        @param item_ids (list[unicode]): list of item IDs
        @param extra_dict: extra data, parsed to get the RSM request and the extra dict
        @param profile_key: %(doc_profile_key)s
        @return: the value returned by [mbGetAtom]
        """
        if max_items == C.NO_LIMIT:
            # mbGetAtom receives None when there is no limit
            max_items = None
        parsed = self._p.parseExtra(extra_dict)
        service_jid = jid.JID(service_jid_s)
        return self.mbGetAtom(service_jid,
                              node or None,
                              max_items,
                              item_ids,
                              parsed.rsm_request,
                              parsed.extra,
                              profile_key)

    @defer.inlineCallbacks
    def mbGetAtom(self, service_jid, node=None, max_items=None, item_ids=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE):
        """Get the atom feed of the last published microblogs

        @param service_jid(jid.JID): jid of the publisher
        @param node(unicode, None): node to get (or microblog node if None)
        @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
        @param item_ids (list[unicode]): list of item IDs
        @param rsm_request (rsm.RSMRequest): RSM request data
        @param extra (dict): extra data
        @param profile_key: %(doc_profile_key)s

        @return: a deferred couple with the list of items and metadatas.
        """
        if node is None:
            node = NS_MICROBLOG
        items, metadata = yield self._p.getItems(service_jid, node, max_items=max_items, item_ids=item_ids, rsm_request=rsm_request, extra=extra, profile_key=profile_key)

        feed = """<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
    <title>%(user)s's blogposts</title>
    <link href="%(feed)s" rel="self" />
    <link href="%(blog)s" />
    <id>%(id)s</id>
    <updated>%(date)s</updated>\n""" % {'user': service_jid.user,
                                        'feed': 'http://%s/blog/%s/atom.xml' % (service_jid.host, service_jid.user),
                                        'blog': 'http://%s/blog/%s' % (service_jid.host, service_jid.user),
                                        'id': node,
                                        'date': rfc3339.timestamp_from_tf(rfc3339.tf_utc())}

        def removeAllURIs(element):
            """Recursively remove the URIs of the element and its children.
            Without that, the entry would still be valid but not displayed
            by Firefox nor Thunderbird (and probably more readers)"""
            element.uri = element.defaultUri = None
            for child in element.children:
                if isinstance(child, domish.Element):
                    removeAllURIs(child)

        for item in items:
            entry = item.firstChildElement()
            removeAllURIs(entry)
            feed += "    " + entry.toXml() + "\n"
        defer.returnValue(feed + "</feed>")