# -*- coding: utf-8 -*-
# view src/plugins/ @ 1453:d5e72362ee91
#
# plugin XEP-0277: better parsing of atom:author element + item2mbdata minor reorganisation for better readability
# author Goffi <>
# date Sat, 15 Aug 2015 22:22:36 +0200
# parents 5116d70ddd1c
# children 4e2fab4de195
# line wrap: on
# line source

# -*- coding: utf-8 -*-

# SAT plugin for microblogging over XMPP (xep-0277)
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core.log import getLogger
log = getLogger(__name__)
from twisted.words.protocols.jabber import jid
from twisted.internet import defer
from twisted.python import failure
from sat.core import exceptions
from import ElementParser

from wokkel import pubsub
from wokkel import rsm
from feed import atom, date
import uuid
from time import time
import urlparse
from cgi import escape

# namespaces used by this plugin
NS_MICROBLOG = 'urn:xmpp:microblog:0'
# Atom Syndication Format namespace (RFC 4287); an empty value would make every
# elements(NS_ATOM, ...) lookup fail, so no microblog item could be parsed
NS_ATOM = 'http://www.w3.org/2005/Atom'
# namespace required on the child of title/content elements of type "xhtml"
NS_XHTML = 'http://www.w3.org/1999/xhtml'
NS_PUBSUB_EVENT = "{}{}".format(pubsub.NS_PUBSUB, "#event")

# plugin metadata
# NOTE(review): the "PLUGIN_INFO = {" and "}" lines were missing from the reviewed
# copy and have been restored following the usual SAT plugin convention -- verify
PLUGIN_INFO = {
    "name": "Microblogging over XMPP Plugin",
    "import_name": "XEP-0277",
    "type": "XEP",
    "protocols": ["XEP-0277"],
    "dependencies": ["XEP-0163", "XEP-0060", "TEXT-SYNTAXES"],
    "recommendations": ["XEP-0059"],
    "main": "XEP_0277",
    "handler": "no",
    "description": _("""Implementation of microblogging Protocol""")
}

class NodeAccessChangeException(Exception):
    """Raised when the access model of the microblog node can't be set."""

class XEP_0277(object):

    def __init__(self, host):"Microblogging plugin initialization")) = host
        self._p =["XEP-0060"] # this facilitate the access to pubsub plugin["XEP-0163"].addPEPEvent("MICROBLOG", NS_MICROBLOG, self.microblogCB, self.sendMicroblog, notify=False)
        host.bridge.addMethod("getLastMicroblogs", ".plugin",
                              in_sign='sis', out_sign='(aa{ss}a{ss})',
                              doc={'summary': 'retrieve items',
                                   'param_0': 'jid: publisher of wanted microblog',
                                   'param_1': 'max_items: see XEP-0060 #6.5.7',
                                   'param_2': '%(doc_profile)s',
                                   'return': 'list of microblog data (dict)'})
        host.bridge.addMethod("setMicroblogAccess", ".plugin", in_sign='ss', out_sign='',
        host.bridge.addMethod("mBSubscribeToMany", ".plugin", in_sign='sass', out_sign='s',
        host.bridge.addMethod("mBGetFromManyRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssaa{ss}a{ss}))',
                              method=self._mBGetFromManyRTResult, async=True)
        host.bridge.addMethod("mBGetFromMany", ".plugin", in_sign='sasia{ss}s', out_sign='s',

    ## plugin management methods ##

    def microblogCB(self, itemsEvent, profile):
        """Callback to "MICROBLOG" PEP event.

        Every item of the event is converted with item2mbdata and the resulting
        microblog data is sent to the frontends through the bridge. Items which
        can't be converted are logged and skipped (best effort).

        @param itemsEvent: pubsub items event
        @param profile: %(doc_profile)s
        """
        def manageItem(microblog_data):
            # NOTE(review): this call was stripped from the reviewed copy; restored as the
            # bridge's personalEvent signal -- verify
            self.host.bridge.personalEvent(itemsEvent.sender.full(), "MICROBLOG", microblog_data, profile)

        def manageError(failure_):
            # an invalid item must not prevent the other ones from being managed,
            # but the problem should at least be visible in the logs
            log.warning(u"Can't parse microblog item: {!r}".format(failure_.value))

        for item in itemsEvent.items:
            self.item2mbdata(item).addCallbacks(manageItem, manageError)

    ## data/item transformation ##

    def _removeXHTMLMarkups(self, xhtml):
        """Remove XHTML markups from the given string.

        @param xhtml: the XHTML string to be cleaned
        @return: a Deferred instance for the cleaned string
        """
        # NOTE(review): the body was missing from the reviewed copy; restored as an
        # XHTML -> text conversion through the TEXT-SYNTAXES plugin (convert(text,
        # syntax_from, syntax_to, ...) as used in data2entry) -- verify the syntax
        # constants and the final "safe" flag
        synt = self.host.plugins["TEXT-SYNTAXES"]
        return synt.convert(xhtml, synt.SYNTAX_XHTML, synt.SYNTAX_TEXT, False)

    @defer.inlineCallbacks
    def item2mbdata(self, item_elt):
        """Convert an XML Item to microblog data used in bridge API

        @param item_elt: domish.Element of microblog item
        @return: a Deferred which fires with the microblog data (dictionary)
        @raise exceptions.DataError (wrapped in a Failure): the item is not a valid
            microblog item (unsupported namespace, missing atom entry, title or
            updated element, XHTML content without its namespace, duplicate key)
        """
        # there can be no id for transient nodes
        id_ = item_elt.getAttribute('id', '')
        microblog_data = {'id': id_}

        def check_conflict(key, increment=False):
            """Check if key is already in microblog data

            @param key(unicode): key to check
            @param increment(bool): if suffix the key with an increment
                instead of raising an exception
            @return (unicode): key to use (suffixed with "#<n>" if increment is True
                and key was already present)
            @raise exceptions.DataError: the key already exists
                (not raised if increment is True)
            """
            if key in microblog_data:
                if not increment:
                    # id_ is used instead of item_elt['id'] which doesn't exist for transient items
                    raise failure.Failure(exceptions.DataError(u"key {} is already present for item {}".format(key, id_)))
                else:
                    idx = 1  # the idx 0 is the key without suffix
                    fmt = "{}#{}"
                    new_key = fmt.format(key, idx)
                    while new_key in microblog_data:
                        idx += 1
                        new_key = fmt.format(key, idx)
                    key = new_key
            return key

        @defer.inlineCallbacks
        def parseElement(elem):
            """Parse title/content elements and fill microblog_data accordingly

            text content is stored under the element name ("title" or "content"),
            cleaned XHTML content under "<element name>_xhtml"
            """
            type_ = elem.getAttribute('type')
            if type_ == 'xhtml':
                data_elt = elem.firstChildElement()
                if data_elt is None or data_elt.uri != NS_XHTML:
                    raise failure.Failure(exceptions.DataError(_('Content of type XHTML must declare its namespace!')))
                key = check_conflict(u'{}_xhtml'.format(elem.name))
                data = unicode(data_elt)
                microblog_data[key] = yield self.host.plugins["TEXT-SYNTAXES"].clean_xhtml(data)
            else:
                key = check_conflict(elem.name)
                microblog_data[key] = unicode(elem)

        if item_elt.uri not in (pubsub.NS_PUBSUB, NS_PUBSUB_EVENT):
            msg = u"Unsupported namespace {ns} in pubsub item {id_}".format(ns=item_elt.uri, id_=id_)
            raise failure.Failure(exceptions.DataError(msg))

        try:
            entry_elt = item_elt.elements(NS_ATOM, 'entry').next()
        except StopIteration:
            msg = u'No atom entry found in the pubsub item {}'.format(id_)
            raise failure.Failure(exceptions.DataError(msg))

        # atom:id
        try:
            id_elt = entry_elt.elements(NS_ATOM, 'id').next()
        except StopIteration:
            msg = u'No atom id found in the pubsub item {}, this is not standard !'.format(id_)
            log.warning(msg)
            microblog_data['atom_id'] = ""
        else:
            microblog_data['atom_id'] = unicode(id_elt)

        # title/content(s)
        try:
            title_elt = entry_elt.elements(NS_ATOM, 'title').next()
        except StopIteration:
            msg = u'No atom title found in the pubsub item {}'.format(id_)
            raise failure.Failure(exceptions.DataError(msg))

        yield parseElement(title_elt)

        for content_elt in entry_elt.elements(NS_ATOM, 'content'):
            yield parseElement(content_elt)

        # we check that text content is present
        for key in ('title', 'content'):
            if key not in microblog_data and ('{}_xhtml'.format(key)) in microblog_data:
                log.warning(u"item {id_} provide a {key}_xhtml data but not a text one".format(id_=id_, key=key))
                # ... and do the conversion if it's not
                microblog_data[key] = yield self._removeXHTMLMarkups(microblog_data['{}_xhtml'.format(key)])

        if 'content' not in microblog_data:
            # use the atom title data as the microblog body content
            microblog_data['content'] = microblog_data['title']
            del microblog_data['title']
            if 'title_xhtml' in microblog_data:
                microblog_data['content_xhtml'] = microblog_data['title_xhtml']
                del microblog_data['title_xhtml']

        # published/updated dates
        try:
            updated_elt = entry_elt.elements(NS_ATOM, 'updated').next()
        except StopIteration:
            msg = u'No atom updated element found in the pubsub item {}'.format(id_)
            raise failure.Failure(exceptions.DataError(msg))
        microblog_data['updated'] = unicode(date.rfc3339.tf_from_timestamp(unicode(updated_elt)))
        try:
            published_elt = entry_elt.elements(NS_ATOM, 'published').next()
        except StopIteration:
            # no published date: the item is considered published when last updated
            microblog_data['published'] = microblog_data['updated']
        else:
            microblog_data['published'] = unicode(date.rfc3339.tf_from_timestamp(unicode(published_elt)))

        # links
        for link_elt in entry_elt.elements(NS_ATOM, 'link'):
            if link_elt.getAttribute('rel') == 'replies' and link_elt.getAttribute('title') == 'comments':
                # several comments nodes are possible: "comments", "comments#1", ...
                key = check_conflict('comments', True)
                microblog_data[key] = link_elt['href']
                try:
                    service, node = self.parseCommentUrl(microblog_data[key])
                except (Exception, failure.Failure):
                    # parseCommentUrl raises a Failure, which may not derive from Exception
                    log.warning(u"Can't parse url {}".format(microblog_data[key]))
                    del microblog_data[key]
                else:
                    microblog_data['{}_service'.format(key)] = service.full()
                    microblog_data['{}_node'.format(key)] = node
            else:
                rel = link_elt.getAttribute('rel','')
                title = link_elt.getAttribute('title','')
                href = link_elt.getAttribute('href','')
                log.warning(u"Unmanaged link element: rel={rel} title={title} href={href}".format(rel=rel, title=title, href=href))

        # author
        try:
            author_elt = entry_elt.elements(NS_ATOM, 'author').next()
        except StopIteration:
            log.debug(u"Can't find author element in item {}".format(id_))
        else:
            # name
            try:
                name_elt = author_elt.elements(NS_ATOM, 'name').next()
            except StopIteration:
                log.warning(u"No name element found in author element of item {}".format(id_))
            else:
                microblog_data['author'] = unicode(name_elt)
            # uri
            try:
                uri_elt = author_elt.elements(NS_ATOM, 'uri').next()
            except StopIteration:
                log.debug(u"No uri element found in author element of item {}".format(id_))
            else:
                uri = unicode(uri_elt)
                if uri.startswith("xmpp:"):
                    uri = uri[5:]
                    microblog_data['author_uri'] = uri
                # the claimed author is only trusted if it matches the pubsub publisher
                if item_elt.getAttribute("publisher") == uri:
                    microblog_data['author_uri_verified'] = C.BOOL_TRUE
                else:
                    log.warning(u"item atom:uri differ from publisher attribute, spoofing attempt ? atom:uri = {} publisher = {}".format(uri, item_elt.getAttribute("publisher")))
                    microblog_data['author_uri_verified'] = C.BOOL_FALSE
            # email
            try:
                email_elt = author_elt.elements(NS_ATOM, 'email').next()
            except StopIteration:
                pass
            else:
                microblog_data['author_email'] = unicode(email_elt)

        defer.returnValue(microblog_data)


    @defer.inlineCallbacks
    def data2entry(self, data, profile):
        """Convert a data dict to an entry usable to create an item

        @param data: data dict as given by bridge method.
            Recognised keys: title, title_rich, title_xhtml, content, content_rich,
            content_xhtml, author, updated, published, id, comments
        @param profile: %(doc_profile)s
        @return: a Deferred which fires with the pubsub.Item wrapping the atom entry
        @raise exceptions.DataError (wrapped in a Failure): both rich and xhtml
            versions are given for the same field
        """
        #TODO: rewrite this directly with twisted (i.e. without atom / reparsing)
        _uuid = unicode(uuid.uuid1())
        _entry = atom.Entry()
        _entry.title = ''  # reset the default value which is not empty

        elems = {'title': atom.Title, 'content': atom.Content}
        synt = self.host.plugins["TEXT-SYNTAXES"]

        # loop on ('title', 'title_rich', 'title_xhtml', 'content', 'content_rich', 'content_xhtml')
        for key in elems.keys():
            for type_ in ['', 'rich', 'xhtml']:
                attr = "%s_%s" % (key, type_) if type_ else key
                if attr in data:
                    if type_:
                        if type_ == 'rich':  # convert input from current syntax to XHTML
                            converted = yield synt.convert(data[attr], synt.getCurrentSyntax(profile), "XHTML")
                        else:  # clean the XHTML input
                            converted = yield synt.clean_xhtml(data[attr])
                        elem = elems[key]((u'<div xmlns="%s">%s</div>' % (NS_XHTML, converted)).encode('utf-8'))
                        elem.attrs['type'] = 'xhtml'
                        if hasattr(_entry, '%s_xhtml' % key):
                            raise failure.Failure(exceptions.DataError(_("Can't have xhtml and rich content at the same time")))
                        setattr(_entry, '%s_xhtml' % key, elem)
                    else:  # raw text only needs to be escaped to get HTML-safe sequence
                        elem = elems[key](escape(data[attr]).encode('utf-8'))
                        elem.attrs['type'] = 'text'
                        setattr(_entry, key, elem)
            # no text version given: build it from the XHTML one
            if not getattr(_entry, key).text:
                if hasattr(_entry, '%s_xhtml' % key):
                    text = yield self._removeXHTMLMarkups(getattr(_entry, '%s_xhtml' % key).text)
                    setattr(_entry, key, text)
        if not _entry.title.text:  # eventually move the data from content to title
            _entry.title = _entry.content.text
            _entry.title.attrs['type'] = _entry.content.attrs['type']
            _entry.content.text = ''
            _entry.content.attrs['type'] = ''
            if hasattr(_entry, 'content_xhtml'):
                _entry.title_xhtml = atom.Title(_entry.content_xhtml.text)
                _entry.title_xhtml.attrs['type'] = _entry.content_xhtml.attrs['type']
                _entry.content_xhtml.text = ''
                _entry.content_xhtml.attrs['type'] = ''

        # NOTE(review): the author, id and links lines were partly stripped from the
        # reviewed copy and have been restored from context -- verify
        _entry.author = atom.Author()
        _entry.author.name = data.get('author', self.host.getJidNStream(profile)[0].userhost()).encode('utf-8')
        # a single timestamp so that a new entry gets identical updated/published dates
        now = time()
        _entry.updated = float(data.get('updated', now))
        _entry.published = float(data.get('published', now))
        entry_id = data.get('id', _uuid)
        _entry.id = entry_id.encode('utf-8')
        if 'comments' in data:
            link = atom.Link()
            link.attrs['href'] = data['comments']
            link.attrs['rel'] = 'replies'
            link.attrs['title'] = 'comments'
            _entry.links.append(link)
        _entry_elt = ElementParser()(str(_entry).decode('utf-8'))
        item = pubsub.Item(id=entry_id, payload=_entry_elt)
        defer.returnValue(item)

    ## publish ##

    @defer.inlineCallbacks
    def sendMicroblog(self, data, profile):
        """Send XEP-0277's microblog data

        @param data: microblog data, must include a non empty "content" value
            (see data2entry for the other recognised keys)
        @param profile: profile which sends the microblog
        @return: a Deferred which fires with the result of the pubsub publication
        @raise exceptions.DataError (wrapped in a Failure): "content" is missing or empty
        """
        if 'content' not in data:
            log.error("Microblog data must contain at least 'content' key")
            raise failure.Failure(exceptions.DataError('no "content" key found'))
        content = data['content']
        if not content:
            log.error("Microblog data's content value must not be empty")
            raise failure.Failure(exceptions.DataError('empty content'))
        item = yield self.data2entry(data, profile)
        # None as service: the item is published on the profile's PEP node
        ret = yield self._p.publish(None, NS_MICROBLOG, [item], profile_key=profile)
        defer.returnValue(ret)

    ## get ##

    def _getLastMicroblogs(self, pub_jid_s, max_items=10, profile_key=C.PROF_KEY_NONE):
        """Bridge front-end of getLastMicroblogs, taking the publisher as a string.

        @param pub_jid_s(unicode): jid of the publisher
        @param max_items: how many microblogs we want to get
        @param profile_key: profile key
        @return: see getLastMicroblogs
        """
        publisher = jid.JID(pub_jid_s)
        return self.getLastMicroblogs(publisher, max_items, profile_key)

    @defer.inlineCallbacks
    def getLastMicroblogs(self, pub_jid, max_items=10, profile_key=C.PROF_KEY_NONE):
        """Get the last published microblogs

        @param pub_jid(jid.JID): jid of the publisher
        @param max_items: how many microblogs we want to get
        @param profile_key: profile key
        @return: a deferred couple with the list of items (converted with
            item2mbdata) and metadatas.
        """
        items_data = yield self._p.getItems(pub_jid, NS_MICROBLOG, max_items=max_items, profile_key=profile_key)
        serialised = yield self._p.serItemsDataD(items_data, self.item2mbdata)
        defer.returnValue(serialised)

    def parseCommentUrl(self, node_url):
        """Parse a XMPP URI

        Determine the fields comments_service and comments_node of a microblog data
        from the href attribute of an entry's link element. For example this input:
        xmpp:pubsub.example.net?;node=urn%3Axmpp%3Acomments%3Aabc
        will return (JID(u'pubsub.example.net'), u'urn:xmpp:comments:abc')

        @param node_url(unicode): XMPP URI to parse
        @return: a tuple (JID, unicode) with the pubsub service and the node
        @raise exceptions.DataError (wrapped in a Failure): no node found in the URI
        """
        parsed_url = urlparse.urlparse(node_url, 'xmpp')
        service = jid.JID(parsed_url.path)
        # the query looks like ";node=...": every ";"-separated chunk is a
        # percent-encoded key=value pair; parse_qs decodes it into a dict of lists
        # NOTE(review): the loop body was missing from the reviewed copy; restored with
        # parse_qs, consistent with the list-valued lookup below -- verify
        parsed_queries = {}
        for query in parsed_url.query.split(';'):
            parsed_queries.update(urlparse.parse_qs(query))
        node = parsed_queries.get('node', [''])[0]

        if not node:
            raise failure.Failure(exceptions.DataError('Invalid comments link'))

        return (service, node)

    ## configure ##

    def setMicroblogAccess(self, access="presence", profile_key=C.PROF_KEY_NONE):
        """Create a microblog node on PEP with given access

        If the node already exists, its options are changed instead.
        The result is only logged, nothing is returned to the caller.

        @param access: Node access model, according to xep-0060 #4.5
        @param profile_key: profile key
        """
        # NOTE(review): the right-hand side below was stripped from the reviewed copy;
        # restored from context -- verify
        _jid, _xmlstream = self.host.getJidNStream(profile_key)
        if not _jid:
            log.error(_("Can't find profile's jid"))
            # without a jid the node can't be addressed, stop here
            return
        _options = {self._p.OPT_ACCESS_MODEL: access, self._p.OPT_PERSIST_ITEMS: 1, self._p.OPT_MAX_ITEMS: -1, self._p.OPT_DELIVER_PAYLOADS: 1, self._p.OPT_SEND_ITEM_SUBSCRIBE: 1}

        def cb(result):
            #Node is created with right permission
            log.debug(_(u"Microblog node has now access %s") % access)

        def fatal_err(s_error):
            #Something went wrong
            log.error(_("Can't set microblog access"))
            raise NodeAccessChangeException()

        def err_cb(s_error):
            #If the node already exists, the condition is "conflict",
            #else we have an unmanaged error
            # getattr: errors which are not stanza errors have no condition
            if getattr(s_error.value, 'condition', None) == 'conflict':
                change_node_options().addCallback(cb).addErrback(fatal_err)
            else:
                fatal_err(s_error)

        def create_node():
            return self._p.createNode(_jid.userhostJID(), NS_MICROBLOG, _options, profile_key=profile_key)

        def change_node_options():
            return self._p.setOptions(_jid.userhostJID(), NS_MICROBLOG, _jid.userhostJID(), _options, profile_key=profile_key)

        # try to create the node first, and fall back to changing its options on conflict
        create_node().addCallback(cb).addErrback(err_cb)


    ## methods to manage several stanzas/jids at once ##

    # common

    def _getClientAndNodeData(self, publishers_type, publishers, profile_key):
        """Helper method to construct node_data from publishers_type/publishers

        @param publishers_type: type of the list of publishers, one of:
            C.ALL: get all jids from roster, publishers is not used
            C.GROUP: get jids from groups
            C.JID: use publishers directly as list of jids
        @param publishers: list of publishers, according to "publishers_type" (None, list of groups or list of jids)
        @param profile_key: %(doc_profile_key)s
        @return (tuple): (client, node_data) where node_data is a list of
            (jid, NS_MICROBLOG) couples, one per publisher
        """
        # NOTE(review): the right-hand side below was stripped from the reviewed copy;
        # restored from context -- verify
        client = self.host.getClient(profile_key)
        if publishers_type == C.JID:
            jids_set = set(publishers)
        else:
            jids_set = client.roster.getJidsSet(publishers_type, publishers)

        node_data = [(jid_, NS_MICROBLOG) for jid_ in jids_set]
        return client, node_data

    def _checkPublishers(self, publishers_type, publishers):
        """Helper method to deserialise publishers coming from bridge

        @param publishers_type(unicode): type of the list of publishers, one of
            C.ALL, C.GROUP or C.JID
        @param publishers(list): list of publishers according to type
            (must be empty for C.ALL, jids as strings for C.JID)
        @return: deserialised (publishers_type, publishers) tuple
            (publishers is None for C.ALL, a new list of jid.JID for C.JID)
        @raise ValueError (wrapped in a Failure): publishers given with the C.ALL type
        """
        if publishers_type == C.ALL:
            if publishers:
                raise failure.Failure(ValueError("Can't use publishers with {} type".format(publishers_type)))
            publishers = None
        elif publishers_type == C.JID:
            # build a new list instead of modifying the caller's one in place
            publishers = [jid.JID(publisher) for publisher in publishers]
        return publishers_type, publishers

    # subscribe #

    def _mBSubscribeToMany(self, publishers_type, publishers, profile_key):
        """Subscribe microblogs for a list of groups or jids (bridge entry point)

        @param publishers_type(unicode): type of the list of publishers (see mBSubscribeToMany)
        @param publishers(list): publishers as received from the bridge
        @param profile_key: %(doc_profile_key)s
        @return (str): session id: Use pubsub.getSubscribeRTResult to get the results
        """
        publishers_type, publishers = self._checkPublishers(publishers_type, publishers)
        return self.mBSubscribeToMany(publishers_type, publishers, profile_key)

    def mBSubscribeToMany(self, publishers_type, publishers, profile_key):
        """Subscribe microblogs for a list of groups or jids

        @param publishers_type: type of the list of publishers, one of:
            C.ALL: get all jids from roster, publishers is not used
            C.GROUP: get jids from groups
            C.JID: use publishers directly as list of jids
        @param publishers: list of publishers, according to "publishers_type" (None, list of groups or list of jids)
        @param profile_key: %(doc_profile_key)s
        @return (str): session id
        """
        client, node_data = self._getClientAndNodeData(publishers_type, publishers, profile_key)
        # subscriptions are made with the bare jid of the profile
        return self._p.subscribeToMany(node_data, client.jid.userhostJID(), profile_key=profile_key)

    # get #

    def _mBGetFromManyRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
        """Get real-time results for mBGetFromMany session

        @param session_id: id of the real-time deferred session
        @param profile_key: %(doc_profile_key)s
        @return (tuple): (remaining, results) where:
            - remaining is the number of still expected results
            - results is a list of tuple with
                - service (unicode): pubsub service
                - node (unicode): pubsub node
                - failure (unicode): empty string in case of success, error message else
                - items (list[dict]): microblog data, as in getLastMicroblogs
                - metadata (dict): items metadata, as in getLastMicroblogs
        """
        def onSuccess(items_data):
            """convert items elements to list of microblog data in items_data"""
            d = self._p.serItemsDataD(items_data, self.item2mbdata)
            d.addCallback(lambda serialised: ('', serialised))
            return d

        def onError(failure_):
            # same (message, (items, metadata)) shape as onSuccess, with empty data
            return (unicode(failure_.value), ([], {}))

        # NOTE(review): the right-hand side below was stripped from the reviewed copy;
        # restored from context -- verify
        profile = self.host.memory.getProfileName(profile_key)
        d = self._p.getRTResults(session_id,
                                 on_success=onSuccess,
                                 on_error=onError,
                                 profile=profile)
        d.addCallback(lambda ret: (ret[0],
                                   [(service.full(), node, failure_msg, items, metadata)
                                    for (service, node), (success, (failure_msg, (items, metadata))) in ret[1].iteritems()]))
        return d

    def _mBGetFromMany(self, publishers_type, publishers, max_item=10, rsm_dict=None, profile_key=C.PROF_KEY_NONE):
        """Get the published microblogs for a list of groups or jids (bridge entry point)

        @param publishers_type(unicode): type of the list of publishers (one of C.ALL, C.GROUP or C.JID)
        @param publishers(list): publishers as received from the bridge, according to publishers_type
        @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit
        @param rsm_dict(dict, None): RSM request data, used as keyword arguments of rsm.RSMRequest
        @param profile_key: %(doc_profile_key)s
        @return: see mBGetFromMany
        """
        max_item = None if max_item == C.NO_LIMIT else max_item
        publishers_type, publishers = self._checkPublishers(publishers_type, publishers)
        rsm_data = rsm.RSMRequest(**rsm_dict) if rsm_dict else None
        return self.mBGetFromMany(publishers_type, publishers, max_item, rsm_data, profile_key)

    def mBGetFromMany(self, publishers_type, publishers, max_item=None, rsm_data=None, profile_key=C.PROF_KEY_NONE):
        """Get the published microblogs for a list of groups or jids

        @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL")
        @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids)
        @param max_item (int): optional limit on the number of retrieved items.
        @param rsm_data (rsm.RSMRequest): RSM request data, common to all publishers
        @param profile_key: profile key
        @return: the value returned by the pubsub plugin's getFromMany, used as the
            real-time session id for _mBGetFromManyRTResult; each result gives the
            microblogs data and the RSM response data of one publisher
        """
        client, node_data = self._getClientAndNodeData(publishers_type, publishers, profile_key)
        return self._p.getFromMany(node_data, max_item, rsm_data, profile_key=profile_key)