#!/usr/bin/python
# -*- coding: utf-8 -*-
# SAT plugin for microblogging over XMPP (xep-0277)
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from sat.core.i18n import _, D_
from sat.core.constants import Const as C
from sat.core.log import getLogger
log = getLogger(__name__)
from twisted.words.protocols.jabber import jid, error
from twisted.words.xish import domish
from twisted.internet import defer
from twisted.python import failure
from sat.core import exceptions
from sat.tools import xml_tools
from sat.tools import sat_defer
from sat.tools import common
# XXX: tmp.pubsub is actually used instead of the wokkel version
from wokkel import pubsub
try:
from feed.date import rfc3339
except ImportError:
raise exceptions.MissingModule(u"Missing module pyfeed, please download/install it from http://home.avvanta.com/~steveha/pyfeed.html")
import uuid
import time
import urlparse
import urllib
# XML namespaces used by this plugin
NS_MICROBLOG = 'urn:xmpp:microblog:0'
NS_ATOM = 'http://www.w3.org/2005/Atom'
NS_XHTML = 'http://www.w3.org/1999/xhtml'
# namespace of items received through pubsub event notifications
NS_PUBSUB_EVENT = "{ns}#event".format(ns=pubsub.NS_PUBSUB)
# comments node names are built from this prefix followed by the parent item id
NS_COMMENT_PREFIX = NS_MICROBLOG + ':comments/'
# Plugin metadata (presumably read by the plugin loader -- confirm against the core)
PLUGIN_INFO = {
    "name": "Microblogging over XMPP Plugin",
    "import_name": "XEP-0277",
    "type": "XEP",
    "protocols": ["XEP-0277"],
    "dependencies": ["XEP-0163", "XEP-0060", "TEXT-SYNTAXES"],
    "recommendations": ["XEP-0059"],
    "main": "XEP_0277",
    "handler": "no",
    "description": _("Implementation of microblogging Protocol"),
}
class NodeAccessChangeException(Exception):
    """Error related to a change of node access.

    NOTE(review): meaning inferred from the name only, the raising sites are
    not part of this section -- confirm against them.
    """
class XEP_0277(object):
def __init__(self, host):
log.info(_(u"Microblogging plugin initialization"))
self.host = host
self._p = self.host.plugins["XEP-0060"] # this facilitate the access to pubsub plugin
self.rt_sessions = sat_defer.RTDeferredSessions()
self.host.plugins["XEP-0060"].addManagedNode(None, items_cb=self._itemsReceived)
host.bridge.addMethod("mbSend", ".plugin",
in_sign='ssa{ss}s', out_sign='',
method=self._mbSend,
async=True)
host.bridge.addMethod("mbRetract", ".plugin",
in_sign='ssss', out_sign='',
method=self._mbRetract,
async=True)
host.bridge.addMethod("mbGet", ".plugin",
in_sign='ssiasa{ss}s', out_sign='(aa{ss}a{ss})',
method=self._mbGet,
async=True)
host.bridge.addMethod("mbSetAccess", ".plugin", in_sign='ss', out_sign='',
method=self.mbSetAccess,
async=True)
host.bridge.addMethod("mbSetAccess", ".plugin", in_sign='ss', out_sign='',
method=self.mbSetAccess,
async=True)
host.bridge.addMethod("mbSubscribeToMany", ".plugin", in_sign='sass', out_sign='s',
method=self._mbSubscribeToMany)
host.bridge.addMethod("mbGetFromManyRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssaa{ss}a{ss}))',
method=self._mbGetFromManyRTResult, async=True)
host.bridge.addMethod("mbGetFromMany", ".plugin", in_sign='sasia{ss}s', out_sign='s',
method=self._mbGetFromMany)
host.bridge.addMethod("mbGetFromManyWithCommentsRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssa(a{ss}a(sssaa{ss}a{ss}))a{ss}))',
method=self._mbGetFromManyWithCommentsRTResult, async=True)
host.bridge.addMethod("mbGetFromManyWithComments", ".plugin", in_sign='sasiia{ss}a{ss}s', out_sign='s',
method=self._mbGetFromManyWithComments)
host.bridge.addMethod("mbGetAtom", ".plugin", in_sign='ssiasa{ss}s', out_sign='s',
method=self._mbGetAtom, async=True)
def _checkFeaturesCb(self, available):
    """Callback of getFeatures, called when the features check succeeded

    @param available: result of the features check (not used)
    @return (dict): features data with 'available' set to C.BOOL_TRUE
    """
    return dict(available=C.BOOL_TRUE)
def _checkFeaturesEb(self, fail):
    """Errback of getFeatures, called when the features check failed

    @param fail: failure of the features check (not used, the error is consumed)
    @return (dict): features data with 'available' set to C.BOOL_FALSE
    """
    return dict(available=C.BOOL_FALSE)
def getFeatures(self, profile):
d = self.host.checkFeatures([], identity=('pubsub', 'pep'), profile=profile)
d.addCallbacks(self._checkFeaturesCb, self._checkFeaturesEb)
return d
## plugin management methods ##
def _itemsReceived(self, itemsEvent, profile):
    """Callback which manage items notifications (publish + retract)

    Events from nodes whose identifier doesn't start with the microblog
    namespace are ignored. A psEvent signal is sent through the bridge for
    each published item (once converted to microblog data) and for each
    retraction.

    @param itemsEvent: pubsub items event, its sender, nodeIdentifier and
        items attributes are used
    @param profile: profile which received the event
    @raise exceptions.InternalError: an element is neither an item nor a retraction
    """
    if not itemsEvent.nodeIdentifier.startswith(NS_MICROBLOG):
        return

    def manageItem(data, event):
        self.host.bridge.psEvent(C.PS_MICROBLOG, itemsEvent.sender.full(), itemsEvent.nodeIdentifier, event, data, profile)

    def conversionFailed(fail):
        # an invalid item must not prevent the handling of the other ones,
        # so the error is consumed, but it is logged to leave a trace
        # (repr is used so the message can always be formatted)
        log.warning(u"Can't convert item from node {node} to microblog data: {reason!r}".format(
            node=itemsEvent.nodeIdentifier, reason=fail.value))

    for item in itemsEvent.items:
        if item.name == C.PS_ITEM:
            # the third argument holds the extra positional arguments of the callback
            self.item2mbdata(item).addCallbacks(manageItem, conversionFailed, (C.PS_PUBLISH,))
        elif item.name == C.PS_RETRACT:
            manageItem({'id': item['id']}, C.PS_RETRACT)
        else:
            raise exceptions.InternalError("Invalid event value")
## data/item transformation ##
@defer.inlineCallbacks
def item2mbdata(self, item_elt):
    """Convert an XML Item to microblog data used in bridge API

    @param item_elt: domish.Element of microblog item
    @return: deferred which fires the microblog data (dictionary)
    @raise exceptions.DataError: the item can't be converted: unsupported
        namespace, missing atom entry, title or updated element, invalid
        XHTML content or key already present
    """
    microblog_data = {}

    def check_conflict(key, increment=False):
        """Check if key is already in microblog data

        @param key(unicode): key to check
        @param increment(bool): if suffix the key with an increment
            instead of raising an exception
        @return: the key to use (suffixed if needed)
        @raise exceptions.DataError: the key already exists
            (not raised if increment is True)
        """
        if key not in microblog_data:
            return key
        if not increment:
            # the message is formatted before building the exception, and
            # the id is optional so it is not read with item_elt['id']
            raise failure.Failure(exceptions.DataError(
                u"key {} is already present for item {}".format(key, item_elt.getAttribute('id', ''))))
        idx = 1  # the idx 0 is the key without suffix
        fmt = "{}#{}"
        new_key = fmt.format(key, idx)
        while new_key in microblog_data:
            idx += 1
            new_key = fmt.format(key, idx)
        return new_key

    @defer.inlineCallbacks
    def parseElement(elem):
        """Parse title/content elements and fill microblog_data accordingly"""
        type_ = elem.getAttribute('type')
        if type_ == 'xhtml':
            data_elt = elem.firstChildElement()
            if data_elt is None:
                # firstChildElement returns None when there is no child element
                raise failure.Failure(exceptions.DataError(_('Content of type XHTML must contain an element')))
            if data_elt.uri != NS_XHTML:
                raise failure.Failure(exceptions.DataError(_('Content of type XHTML must declare its namespace!')))
            key = check_conflict(u'{}_xhtml'.format(elem.name))
            data = data_elt.toXml()
            microblog_data[key] = yield self.host.plugins["TEXT-SYNTAXES"].clean_xhtml(data)
        else:
            key = check_conflict(elem.name)
            microblog_data[key] = unicode(elem)

    id_ = item_elt.getAttribute('id', '')  # there can be no id for transient nodes
    microblog_data['id'] = id_
    if item_elt.uri not in (pubsub.NS_PUBSUB, NS_PUBSUB_EVENT):
        msg = u"Unsupported namespace {ns} in pubsub item {id_}".format(ns=item_elt.uri, id_=id_)
        log.warning(msg)
        raise failure.Failure(exceptions.DataError(msg))

    try:
        entry_elt = item_elt.elements(NS_ATOM, 'entry').next()
    except StopIteration:
        msg = u'No atom entry found in the pubsub item {}'.format(id_)
        raise failure.Failure(exceptions.DataError(msg))

    # atom:id
    try:
        id_elt = entry_elt.elements(NS_ATOM, 'id').next()
    except StopIteration:
        msg = u'No atom id found in the pubsub item {}, this is not standard !'.format(id_)
        log.warning(msg)
        microblog_data['atom_id'] = ""
    else:
        microblog_data['atom_id'] = unicode(id_elt)

    # title/content(s)

    # FIXME: ATOM and XEP-0277 only allow 1 <title/> element
    #        but in the wild we have some blogs with several ones
    #        so we don't respect the standard for now (it doesn't break
    #        anything anyway), and we'll find a better option later
    title_elts = list(entry_elt.elements(NS_ATOM, 'title'))
    if not title_elts:
        msg = u'No atom title found in the pubsub item {}'.format(id_)
        raise failure.Failure(exceptions.DataError(msg))
    for title_elt in title_elts:
        yield parseElement(title_elt)

    # FIXME: as for <title/>, Atom only authorises at most 1 content
    #        but XEP-0277 allows several ones. So for now we handle as
    #        if more than one can be present
    for content_elt in entry_elt.elements(NS_ATOM, 'content'):
        yield parseElement(content_elt)

    # we check that text content is present
    for key in ('title', 'content'):
        xhtml_key = '{}_xhtml'.format(key)
        if key not in microblog_data and xhtml_key in microblog_data:
            log.warning(u"item {id_} provide a {key}_xhtml data but not a text one".format(id_=id_, key=key))
            # ... and do the conversion if it's not
            synt = self.host.plugins["TEXT-SYNTAXES"]
            microblog_data[key] = yield synt.convert(microblog_data[xhtml_key],
                                                     synt.SYNTAX_XHTML,
                                                     synt.SYNTAX_TEXT,
                                                     False)

    if 'content' not in microblog_data:
        # use the atom title data as the microblog body content
        microblog_data['content'] = microblog_data['title']
        del microblog_data['title']
        if 'title_xhtml' in microblog_data:
            microblog_data['content_xhtml'] = microblog_data['title_xhtml']
            del microblog_data['title_xhtml']

    # published/updated dates
    try:
        updated_elt = entry_elt.elements(NS_ATOM, 'updated').next()
    except StopIteration:
        msg = u'No atom updated element found in the pubsub item {}'.format(id_)
        raise failure.Failure(exceptions.DataError(msg))
    microblog_data['updated'] = unicode(rfc3339.tf_from_timestamp(unicode(updated_elt)))
    try:
        published_elt = entry_elt.elements(NS_ATOM, 'published').next()
    except StopIteration:
        # the published date is optional, we use the updated one by default
        microblog_data['published'] = microblog_data['updated']
    else:
        microblog_data['published'] = unicode(rfc3339.tf_from_timestamp(unicode(published_elt)))

    # links
    for link_elt in entry_elt.elements(NS_ATOM, 'link'):
        if link_elt.getAttribute('rel') == 'replies' and link_elt.getAttribute('title') == 'comments':
            key = check_conflict('comments', True)
            microblog_data[key] = link_elt['href']
            try:
                service, node = self.parseCommentUrl(microblog_data[key])
            except:
                # NOTE(review): kept deliberately broad. Code in this file raises
                # failure.Failure instances, which "except Exception" would not
                # catch; presumably parseCommentUrl does the same -- confirm
                # before narrowing this clause.
                log.warning(u"Can't parse url {}".format(microblog_data[key]))
                del microblog_data[key]
            else:
                microblog_data['{}_service'.format(key)] = service.full()
                microblog_data['{}_node'.format(key)] = node
        else:
            rel = link_elt.getAttribute('rel', '')
            title = link_elt.getAttribute('title', '')
            href = link_elt.getAttribute('href', '')
            log.warning(u"Unmanaged link element: rel={rel} title={title} href={href}".format(rel=rel, title=title, href=href))

    # author
    try:
        author_elt = entry_elt.elements(NS_ATOM, 'author').next()
    except StopIteration:
        log.debug(u"Can't find author element in item {}".format(id_))
    else:
        publisher = item_elt.getAttribute("publisher")
        # name
        try:
            name_elt = author_elt.elements(NS_ATOM, 'name').next()
        except StopIteration:
            log.warning(u"No name element found in author element of item {}".format(id_))
        else:
            microblog_data['author'] = unicode(name_elt)
        # uri
        try:
            uri_elt = author_elt.elements(NS_ATOM, 'uri').next()
        except StopIteration:
            log.debug(u"No uri element found in author element of item {}".format(id_))
            if publisher:
                microblog_data['author_jid'] = publisher
        else:
            uri = unicode(uri_elt)
            if uri.startswith("xmpp:"):
                uri = uri[5:]
                microblog_data['author_jid'] = uri
            else:
                microblog_data['author_jid'] = publisher or ""

            if not publisher:
                log.debug(u"No publisher attribute, we can't verify author jid")
                microblog_data['author_jid_verified'] = C.BOOL_FALSE
            else:
                try:
                    same_entity = jid.JID(publisher).userhostJID() == jid.JID(uri).userhostJID()
                except jid.InvalidFormat:
                    # the uri (or the publisher) is not a valid jid: the author
                    # can't be verified, but the item itself is still usable
                    log.warning(u"Can't parse atom:uri or publisher as a jid: atom:uri = {} publisher = {}".format(uri, publisher))
                    microblog_data['author_jid_verified'] = C.BOOL_FALSE
                else:
                    if same_entity:
                        microblog_data['author_jid_verified'] = C.BOOL_TRUE
                    else:
                        log.warning(u"item atom:uri differ from publisher attribute, spoofing attempt ? atom:uri = {} publisher = {}".format(uri, publisher))
                        microblog_data['author_jid_verified'] = C.BOOL_FALSE
        # email
        try:
            email_elt = author_elt.elements(NS_ATOM, 'email').next()
        except StopIteration:
            pass
        else:
            microblog_data['author_email'] = unicode(email_elt)

    # categories
    categories = (category_elt.getAttribute('term', '') for category_elt in entry_elt.elements(NS_ATOM, 'category'))
    common.iter2dict('tag', categories, microblog_data)

    ## the trigger ##
    # if other plugins have things to add or change
    yield self.host.trigger.point("XEP-0277_item2data", item_elt, entry_elt, microblog_data)

    defer.returnValue(microblog_data)
@defer.inlineCallbacks
def data2entry(self, data, item_id=None, profile=C.PROF_KEY_NONE):
    """Convert a data dict to an entry usable to create an item

    @param data: data dict as given by bridge method.
    @param item_id(unicode, None): id of the item to use
        if None the id will be generated randomly
    @param profile: profile used to get the client and the current syntax
    @return: deferred which fire domish.Element
    @raise exceptions.DataError: rich and XHTML versions of the same element
        are both present, or there is neither title nor content
    """
    if item_id is None:
        item_id = unicode(uuid.uuid4())
    client = self.host.getClient(profile)
    entry_elt = domish.Element((NS_ATOM, 'entry'))

    ## content and title ##
    synt = self.host.plugins["TEXT-SYNTAXES"]

    for elem_name in ('title', 'content'):
        for type_ in ['', '_rich', '_xhtml']:
            attr = "{}{}".format(elem_name, type_)
            if attr not in data:
                continue
            elem = entry_elt.addElement(elem_name)
            if not type_:
                # raw text only needs to be escaped to get HTML-safe sequence
                elem.addContent(data[attr])
                elem['type'] = 'text'
                continue

            if type_ == '_rich':
                # the incompatibility is checked before doing the conversion
                if '{}_xhtml'.format(elem_name) in data:
                    raise failure.Failure(exceptions.DataError(_("Can't have xhtml and rich content at the same time")))
                # convert input from current syntax to XHTML
                converted = yield synt.convert(data[attr], synt.getCurrentSyntax(profile), "XHTML")
            else:
                # clean the XHTML input
                converted = yield synt.clean_xhtml(data[attr])

            # the XHTML payload must be wrapped in an element declaring
            # the XHTML namespace
            xml_content = u'<div xmlns="{ns}">{converted}</div>'.format(
                ns=NS_XHTML,
                converted=converted)
            elem.addChild(xml_tools.ElementParser()(xml_content))
            elem['type'] = 'xhtml'
            if elem_name not in data:
                # there is no raw text content, which is mandatory
                # so we create one from xhtml content
                elem_txt = entry_elt.addElement(elem_name)
                text_content = yield synt.convert(xml_content,
                                                  synt.SYNTAX_XHTML,
                                                  synt.SYNTAX_TEXT,
                                                  False)
                elem_txt.addContent(text_content)
                elem_txt['type'] = 'text'

    try:
        entry_elt.elements(NS_ATOM, 'title').next()
    except StopIteration:
        # we have no title element which is mandatory
        # so we transform content element to title
        elems = list(entry_elt.elements(NS_ATOM, 'content'))
        if not elems:
            raise exceptions.DataError("There must be at least one content or title element")
        for elem in elems:
            elem.name = 'title'

    ## author ##
    author_elt = entry_elt.addElement('author')
    try:
        author_name = data['author']
    except KeyError:
        # FIXME: must use better name
        author_name = client.jid.user
    author_elt.addElement('name', content=author_name)

    try:
        author_jid_s = data['author_jid']
    except KeyError:
        author_jid_s = client.jid.userhost()
    # unicode template: a byte string one would fail on jids with non ASCII characters
    author_elt.addElement('uri', content=u"xmpp:{}".format(author_jid_s))

    ## published/updated time ##
    current_time = time.time()
    entry_elt.addElement('updated',
                         content=rfc3339.timestamp_from_tf(float(data.get('updated', current_time))))
    entry_elt.addElement('published',
                         content=rfc3339.timestamp_from_tf(float(data.get('published', current_time))))

    ## categories ##
    for tag in common.dict2iter("tag", data):
        category_elt = entry_elt.addElement("category")
        category_elt['term'] = tag

    ## id ##
    entry_id = data.get('id', item_id)  # FIXME: use a proper id (see XEP-0277 §7.1)
    entry_elt.addElement('id', content=entry_id)

    ## comments ##
    if 'comments' in data:
        link_elt = entry_elt.addElement('link')
        link_elt['href'] = data['comments']
        link_elt['rel'] = 'replies'
        link_elt['title'] = 'comments'

    ## final item building ##
    item_elt = pubsub.Item(id=item_id, payload=entry_elt)

    ## the trigger ##
    # if other plugins have things to add or change
    yield self.host.trigger.point("XEP-0277_data2entry", client, data, entry_elt, item_elt)

    defer.returnValue(item_elt)
## publish ##
@defer.inlineCallbacks
def _manageComments(self, access, mb_data, service, node, item_id, profile):
"""Check comments keys in mb_data and create comments node if necessary
@param access(unicode): access model
@param mb_data(dict): microblog mb_data
@param service(jid.JID): Pubsub service of the parent item
@param node(unicode): node of the parent item
@param item_id(unicoe): id of the parent item
"""
allow_comments = C.bool(mb_data.pop("allow_comments", "false"))
if not allow_comments:
return
client = self.host.getClient(profile)
options = {self._p.OPT_ACCESS_MODEL: access,
self._p.OPT_PERSIST_ITEMS: 1,
self._p.OPT_MAX_ITEMS: -1,
self._p.OPT_DELIVER_PAYLOADS: 1,
self._p.OPT_SEND_ITEM_SUBSCRIBE: 1,
self._p.OPT_PUBLISH_MODEL: "subscribers", # TODO: should be open if *both* node and item access_model are open (public node and item)
}
# if other plugins need to change the options
yield self.host.trigger.point("XEP-0277_comments", client, mb_data, options)
comments_node_base = u"{}{}".format(NS_COMMENT_PREFIX, item_id)
comments_node = comments_node_base
suffix = None
comments_service = client.pubsub_service if client.pubsub_service is not None else service
# we try to create the comment nodes #
max_tries = 3
for i in xrange(max_tries+1):
try:
yield self._p.createNode(comments_service, comments_node, options, profile_key=profile)
break
except error.StanzaError as e:
if e.condition == 'conflict' and i'+feed_elt.toXml())