Mercurial > libervia-backend
view src/plugins/plugin_xep_0277.py @ 1596:b7ee113183fc
jp: better profile commands:
- new "profile/default" command
- info doesn't show the password by default anymore; it needs to be explicitly requested
- info and modify don't need to connect anymore
- modify can now set the default profile. As use_profile is set, at least a profile session needs to be started even when it would not technically be mandatory (i.e. when only setting the profile as default is needed). But this option should not be used often, and it's not a big side effect, so I don't feel the need to create a new dedicated command, or to do complicated checks to avoid the session start.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 14 Nov 2015 19:18:10 +0100 |
parents | 94901070478e |
children | 2b8a975ff712 |
line wrap: on
line source
#!/usr/bin/python # -*- coding: utf-8 -*- # SAT plugin for microblogging over XMPP (xep-0277) # Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sat.core.i18n import _ from sat.core.constants import Const as C from sat.core.log import getLogger log = getLogger(__name__) from twisted.words.protocols.jabber import jid, error from twisted.words.xish import domish from twisted.internet import defer from twisted.python import failure from sat.core import exceptions from sat.tools import xml_tools from sat.tools import sat_defer # XXX: tmp.pubsub is actually use instead of wokkel version from wokkel import pubsub try: from feed.date import rfc3339 except ImportError: raise exceptions.MissingModule(u"Missing module pyfeed, please download/install it from http://home.avvanta.com/~steveha/pyfeed.html") import uuid import time import urlparse import urllib NS_MICROBLOG = 'urn:xmpp:microblog:0' NS_ATOM = 'http://www.w3.org/2005/Atom' NS_XHTML = 'http://www.w3.org/1999/xhtml' NS_PUBSUB_EVENT = "{}{}".format(pubsub.NS_PUBSUB, "#event") NS_COMMENT_PREFIX = '{}:comments/'.format(NS_MICROBLOG) PLUGIN_INFO = { "name": "Microblogging over XMPP Plugin", "import_name": "XEP-0277", "type": "XEP", "protocols": ["XEP-0277"], "dependencies": ["XEP-0163", "XEP-0060", "TEXT-SYNTAXES"], "recommendations": 
["XEP-0059"], "main": "XEP_0277", "handler": "no", "description": _("""Implementation of microblogging Protocol""") } class NodeAccessChangeException(Exception): pass class XEP_0277(object): def __init__(self, host): log.info(_("Microblogging plugin initialization")) self.host = host self._p = self.host.plugins["XEP-0060"] # this facilitate the access to pubsub plugin self.rt_sessions = sat_defer.RTDeferredSessions() self.host.plugins["XEP-0060"].addManagedNode(None, items_cb=self._itemsReceived) host.bridge.addMethod("mbSend", ".plugin", in_sign='ssa{ss}s', out_sign='', method=self._mbSend, async=True) host.bridge.addMethod("mbRetract", ".plugin", in_sign='ssss', out_sign='', method=self._mbRetract, async=True) host.bridge.addMethod("mbGet", ".plugin", in_sign='ssiasa{ss}s', out_sign='(aa{ss}a{ss})', method=self._mbGet, async=True) host.bridge.addMethod("mbSetAccess", ".plugin", in_sign='ss', out_sign='', method=self.mbSetAccess, async=True) host.bridge.addMethod("mbSetAccess", ".plugin", in_sign='ss', out_sign='', method=self.mbSetAccess, async=True) host.bridge.addMethod("mbSubscribeToMany", ".plugin", in_sign='sass', out_sign='s', method=self._mbSubscribeToMany) host.bridge.addMethod("mbGetFromManyRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssaa{ss}a{ss}))', method=self._mbGetFromManyRTResult, async=True) host.bridge.addMethod("mbGetFromMany", ".plugin", in_sign='sasia{ss}s', out_sign='s', method=self._mbGetFromMany) host.bridge.addMethod("mbGetFromManyWithCommentsRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssa(a{ss}a(sssaa{ss}a{ss}))a{ss}))', method=self._mbGetFromManyWithCommentsRTResult, async=True) host.bridge.addMethod("mbGetFromManyWithComments", ".plugin", in_sign='sasiia{ss}a{ss}s', out_sign='s', method=self._mbGetFromManyWithComments) host.bridge.addMethod("mbGetAtom", ".plugin", in_sign='ssiasa{ss}s', out_sign='s', method=self._mbGetAtom, async=True) ## plugin management methods ## def _itemsReceived(self, itemsEvent, profile): 
"""Callback which manage items notifications (publish + retract)""" if not itemsEvent.nodeIdentifier.startswith(NS_MICROBLOG): return def manageItem(data, event): self.host.bridge.psEvent(C.PS_MICROBLOG, itemsEvent.sender.full(), itemsEvent.nodeIdentifier, event, data, profile) for item in itemsEvent.items: if item.name == C.PS_ITEM: self.item2mbdata(item).addCallbacks(manageItem, lambda failure: None, (C.PS_PUBLISH,)) elif item.name == C.PS_RETRACT: manageItem({'id': item['id']}, C.PS_RETRACT) else: raise exceptions.InternalError("Invalid event value") ## data/item transformation ## @defer.inlineCallbacks def item2mbdata(self, item_elt): """Convert an XML Item to microblog data used in bridge API @param item_elt: domish.Element of microblog item @return: microblog data (dictionary) """ microblog_data = {} def check_conflict(key, increment=False): """Check if key is already in microblog data @param key(unicode): key to check @param increment(bool): if suffix the key with an increment instead of raising an exception @raise exceptions.DataError: the key already exists (not raised if increment is True) """ if key in microblog_data: if not increment: raise failure.Failure(exceptions.DataError("key {} is already present for item {}").format(key, item_elt['id'])) else: idx=1 # the idx 0 is the key without suffix fmt = "{}#{}" new_key = fmt.format(key, idx) while new_key in microblog_data: idx+=1 new_key = fmt.format(key, idx) key = new_key return key @defer.inlineCallbacks def parseElement(elem): """Parse title/content elements and fill microblog_data accordingly""" type_ = elem.getAttribute('type') if type_ == 'xhtml': data_elt = elem.firstChildElement() if data_elt.uri != NS_XHTML: raise failure.Failure(exceptions.DataError(_('Content of type XHTML must declare its namespace!'))) key = check_conflict(u'{}_xhtml'.format(elem.name)) data = data_elt.toXml() microblog_data[key] = yield self.host.plugins["TEXT-SYNTAXES"].clean_xhtml(data) else: key = 
check_conflict(elem.name) microblog_data[key] = unicode(elem) id_ = item_elt.getAttribute('id', '') # there can be no id for transient nodes microblog_data['id'] = id_ if item_elt.uri not in (pubsub.NS_PUBSUB, NS_PUBSUB_EVENT): msg = u"Unsupported namespace {ns} in pubsub item {id_}".format(ns=item_elt.uri, id_=id_) log.warning(msg) raise failure.Failure(exceptions.DataError(msg)) try: entry_elt = item_elt.elements(NS_ATOM, 'entry').next() except StopIteration: msg = u'No atom entry found in the pubsub item {}'.format(id_) raise failure.Failure(exceptions.DataError(msg)) # atom:id try: id_elt = entry_elt.elements(NS_ATOM, 'id').next() except StopIteration: msg = u'No atom id found in the pubsub item {}, this is not standard !'.format(id_) log.warning(msg) microblog_data['atom_id'] = "" else: microblog_data['atom_id'] = unicode(id_elt) # title/content(s) try: title_elt = entry_elt.elements(NS_ATOM, 'title').next() except StopIteration: msg = u'No atom title found in the pubsub item {}'.format(id_) raise failure.Failure(exceptions.DataError(msg)) yield parseElement(title_elt) for content_elt in entry_elt.elements(NS_ATOM, 'content'): yield parseElement(content_elt) # we check that text content is present for key in ('title', 'content'): if key not in microblog_data and ('{}_xhtml'.format(key)) in microblog_data: log.warning(u"item {id_} provide a {key}_xhtml data but not a text one".format(id_=id_, key=key)) # ... 
and do the conversion if it's not microblog_data[key] = yield self.host.plugins["TEXT-SYNTAXES"].\ convert(microblog_data['{}_xhtml'.format(key)], self.host.plugins["TEXT-SYNTAXES"].SYNTAX_XHTML, self.host.plugins["TEXT-SYNTAXES"].SYNTAX_TEXT, False) if 'content' not in microblog_data: # use the atom title data as the microblog body content microblog_data['content'] = microblog_data['title'] del microblog_data['title'] if 'title_xhtml' in microblog_data: microblog_data['content_xhtml'] = microblog_data['title_xhtml'] del microblog_data['title_xhtml'] # published/updated dates try: updated_elt = entry_elt.elements(NS_ATOM, 'updated').next() except StopIteration: msg = u'No atom updated element found in the pubsub item {}'.format(id_) raise failure.Failure(exceptions.DataError(msg)) microblog_data['updated'] = unicode(rfc3339.tf_from_timestamp(unicode(updated_elt))) try: published_elt = entry_elt.elements(NS_ATOM, 'published').next() except StopIteration: microblog_data['published'] = microblog_data['updated'] else: microblog_data['published'] = unicode(rfc3339.tf_from_timestamp(unicode(published_elt))) # links for link_elt in entry_elt.elements(NS_ATOM, 'link'): if link_elt.getAttribute('rel') == 'replies' and link_elt.getAttribute('title') == 'comments': key = check_conflict('comments', True) microblog_data[key] = link_elt['href'] try: service, node = self.parseCommentUrl(microblog_data[key]) except: log.warning(u"Can't parse url {}".format(microblog_data[key])) del microblog_data[key] else: microblog_data['{}_service'.format(key)] = service.full() microblog_data['{}_node'.format(key)] = node else: rel = link_elt.getAttribute('rel','') title = link_elt.getAttribute('title','') href = link_elt.getAttribute('href','') log.warning(u"Unmanaged link element: rel={rel} title={title} href={href}".format(rel=rel, title=title, href=href)) # author publisher = item_elt.getAttribute("publisher") try: author_elt = entry_elt.elements(NS_ATOM, 'author').next() except 
StopIteration: log.debug("Can't find author element in item {}".format(id_)) else: # name try: name_elt = author_elt.elements(NS_ATOM, 'name').next() except StopIteration: log.warning("No name element found in author element of item {}".format(id_)) else: microblog_data['author'] = unicode(name_elt) # uri try: uri_elt = author_elt.elements(NS_ATOM, 'uri').next() except StopIteration: log.debug("No uri element found in author element of item {}".format(id_)) if publisher: microblog_data['author_jid'] = publisher else: uri = unicode(uri_elt) if uri.startswith("xmpp:"): uri = uri[5:] microblog_data['author_jid'] = uri else: microblog_data['author_jid'] = item_elt.getAttribute("publisher") or "" if not publisher: log.debug("No publisher attribute, we can't verify author jid") microblog_data['author_jid_verified'] = C.BOOL_FALSE elif publisher == uri: microblog_data['author_jid_verified'] = C.BOOL_TRUE else: log.warning("item atom:uri differ from publisher attribute, spoofing attempt ? atom:uri = {} publisher = {}".format(uri, item_elt.getAttribute("publisher"))) microblog_data['author_jid_verified'] = C.BOOL_FALSE # email try: email_elt = author_elt.elements(NS_ATOM, 'email').next() except StopIteration: pass else: microblog_data['author_email'] = unicode(email_elt) defer.returnValue(microblog_data) @defer.inlineCallbacks def data2entry(self, data, item_id=None, profile_key=C.PROF_KEY_NONE): """Convert a data dict to en entry usable to create an item @param data: data dict as given by bridge method. 
@param item_id(unicode, None): id of the item to use if None the id will be generated randomly @return: deferred which fire domish.Element """ if item_id is None: item_id = unicode(uuid.uuid4()) client = self.host.getClient(profile_key) entry_elt = domish.Element((NS_ATOM, 'entry')) ## content and title ## synt = self.host.plugins["TEXT-SYNTAXES"] for elem_name in ('title', 'content'): for type_ in ['', '_rich', '_xhtml']: attr = "{}{}".format(elem_name, type_) if attr in data: elem = entry_elt.addElement(elem_name) if type_: if type_ == '_rich': # convert input from current syntax to XHTML converted = yield synt.convert(data[attr], synt.getCurrentSyntax(profile_key), "XHTML") if '{}_xhtml'.format(elem_name) in data: raise failure.Failure(exceptions.DataError(_("Can't have xhtml and rich content at the same time"))) else: # clean the XHTML input converted = yield synt.clean_xhtml(data[attr]) xml_content = u'<div xmlns="{ns}">{converted}</div>'.format( ns=NS_XHTML, converted=converted) elem.addChild(xml_tools.ElementParser()(xml_content)) elem['type'] = 'xhtml' if elem_name not in data: # there is raw text content, which is mandatory # so we create one from xhtml content elem_txt = entry_elt.addElement(elem_name) text_content = yield self.host.plugins["TEXT-SYNTAXES"].convert(xml_content, self.host.plugins["TEXT-SYNTAXES"].SYNTAX_XHTML, self.host.plugins["TEXT-SYNTAXES"].SYNTAX_TEXT, False) elem_txt.addContent(text_content) elem_txt['type'] = 'text' else: # raw text only needs to be escaped to get HTML-safe sequence elem.addContent(data[attr]) elem['type'] = 'text' try: entry_elt.elements(NS_ATOM, 'title').next() except StopIteration: # we have no title element which is mandatory # so we transform content element to title elems = list(entry_elt.elements(NS_ATOM, 'content')) if not elems: raise exceptions.DataError("There must be at least one content or title element") for elem in elems: elem.name = 'title' ## author ## author_elt = entry_elt.addElement('author') 
try: author_name = data['author'] except KeyError: # FIXME: must use better name author_name = client.jid.user author_elt.addElement('name', content=author_name) try: author_jid_s = data['author_jid'] except KeyError: author_jid_s = client.jid.userhost() author_elt.addElement('uri', content="xmpp:{}".format(author_jid_s)) ## published/updated time ## current_time = time.time() entry_elt.addElement('updated', content=rfc3339.timestamp_from_tf(float(data.get('updated', current_time)))) entry_elt.addElement('published', content=rfc3339.timestamp_from_tf(float(data.get('published', current_time)))) ## id ## entry_id = data.get('id', item_id) # FIXME: use a proper id (see XEP-0277 §7.1) entry_elt.addElement('id', content=entry_id) # ## comments ## if 'comments' in data: link_elt = entry_elt.addElement('link') link_elt['href'] = data['comments'] link_elt['rel'] = 'replies' link_elt['title'] = 'comments' ## final item building ## item_elt = pubsub.Item(id=item_id, payload=entry_elt) defer.returnValue(item_elt) ## publish ## @defer.inlineCallbacks def _manageComments(self, access, mb_data, service, node, item_id, profile): """Check comments keys in mb_data and create comments node if necessary @param access(unicode): access model @param mb_data(dict): microblog mb_data @param service(jid.JID): Pubsub service of the parent item @param node(unicode): node of the parent item @param item_id(unicoe): id of the parent item """ allow_comments = C.bool(mb_data.pop("allow_comments", "false")) if not allow_comments: return client = self.host.getClient(profile) options = {self._p.OPT_ACCESS_MODEL: access, self._p.OPT_PERSIST_ITEMS: 1, self._p.OPT_MAX_ITEMS: -1, self._p.OPT_DELIVER_PAYLOADS: 1, self._p.OPT_SEND_ITEM_SUBSCRIBE: 1, self._p.OPT_PUBLISH_MODEL: "subscribers", # TODO: should be open if *both* node and item access_model are open (public node and item) } comments_node_base = u"{}{}".format(NS_COMMENT_PREFIX, item_id) comments_node = comments_node_base suffix = None 
comments_service = client.pubsub_service if client.pubsub_service is not None else service max_tries = 3 for i in xrange(max_tries+1): try: yield self._p.createNode(comments_service, comments_node, options, profile_key=profile) break except error.StanzaError as e: if e.condition == 'conflict' and i<max_tries: log.warning(u"node {} already exists on service {}".format(comments_node, comments_service)) suffix = 0 if suffix is None else suffix + 1 comments_node = u"{}_{}".format(comments_node_base, suffix) else: raise e if comments_service is None: comments_service = client.jid.userhostJID() mb_data['comments'] = "xmpp:%(service)s?%(query)s" % { 'service': comments_service.userhost(), 'query': urllib.urlencode([('node', comments_node.encode('utf-8'))]) } def _mbSend(self, service, node, data, profile_key): service = jid.JID(service) if service else None node = node if node else NS_MICROBLOG profile = self.host.memory.getProfileName(profile_key) return self.send(service, node, data, profile) @defer.inlineCallbacks def send(self, service=None, node=NS_MICROBLOG, data=None, profile=None): """Send XEP-0277's microblog data @param service(jid.JID, None): PubSub service where the microblog must be published None to publish on profile's PEP @param node(unicode): PubSub node to use (defaut to microblog NS) @param data(dict): microblog data (must include at least a "content" or a "title" key). 
see http://wiki.goffi.org/wiki/Bridge_API_-_Microblogging/en for details @param profile: %(doc_profile)s """ assert profile is not None item_id = data.get('id') or unicode(uuid.uuid4()) try: yield self._manageComments(self._p.ACCESS_OPEN, data, service, node, item_id, profile) except error.StanzaError: log.warning("Can't create comments node for item {}".format(item_id)) item = yield self.data2entry(data, item_id, profile) ret = yield self._p.publish(service, node, [item], profile_key=profile) defer.returnValue(ret) ## retract ## def _mbRetract(self, service_jid_s, nodeIdentifier, itemIdentifier, profile_key): """Call self._p._retractItem, but use default node if node is empty""" return self._p._retractItem(service_jid_s, nodeIdentifier or NS_MICROBLOG, itemIdentifier, True, profile_key) ## get ## def _mbGet(self, service_jid_s, node="", max_items=10, item_ids=None, extra_dict=None, profile_key=C.PROF_KEY_NONE): """ @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit @param item_ids (list[unicode]): list of item IDs """ max_items = None if max_items == C.NO_LIMIT else max_items extra = self._p.parseExtra(extra_dict) return self.mbGet(jid.JID(service_jid_s), node or None, max_items, item_ids, extra.rsm_request, extra.extra, profile_key) @defer.inlineCallbacks def mbGet(self, service_jid, node=None, max_items=None, item_ids=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE): """Get some microblogs @param service_jid(jid.JID): jid of the publisher @param node(unicode, None): node to get (or microblog node if None) @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit @param item_ids (list[unicode]): list of item IDs @param rsm_request (rsm.RSMRequest): RSM request data @param extra (dict): extra data @param profile_key: %(doc_profile_key)s @return: a deferred couple with the list of items and metadatas. 
""" if node is None: node = NS_MICROBLOG items_data = yield self._p.getItems(service_jid, node, max_items=max_items, item_ids=item_ids, rsm_request=rsm_request, extra=extra, profile_key=profile_key) serialised = yield self._p.serItemsDataD(items_data, self.item2mbdata) defer.returnValue(serialised) def parseCommentUrl(self, node_url): """Parse a XMPP URI Determine the fields comments_service and comments_node of a microblog data from the href attribute of an entry's link element. For example this input: xmpp:sat-pubsub.libervia.org?node=urn%3Axmpp%3Acomments%3A_c5c4a142-2279-4b2a-ba4c-1bc33aa87634__urn%3Axmpp%3Agroupblog%3Asouliane%40libervia.org will return (JID(u'sat-pubsub.libervia.org'), 'urn:xmpp:comments:_c5c4a142-2279-4b2a-ba4c-1bc33aa87634__urn:xmpp:groupblog:souliane@libervia.org') @return: a tuple (JID, str) """ parsed_url = urlparse.urlparse(node_url, 'xmpp') service = jid.JID(parsed_url.path) queries = parsed_url.query.split(';') parsed_queries = dict() for query in queries: parsed_queries.update(urlparse.parse_qs(query)) node = parsed_queries.get('node', [''])[0] if not node: raise failure.Failure(exceptions.DataError('Invalid comments link')) return (service, node) ## configure ## def mbSetAccess(self, access="presence", profile_key=C.PROF_KEY_NONE): """Create a microblog node on PEP with given access If the node already exists, it change options @param access: Node access model, according to xep-0060 #4.5 @param profile_key: profile key""" _jid, xmlstream = self.host.getJidNStream(profile_key) if not _jid: log.error(_("Can't find profile's jid")) return _options = {self._p.OPT_ACCESS_MODEL: access, self._p.OPT_PERSIST_ITEMS: 1, self._p.OPT_MAX_ITEMS: -1, self._p.OPT_DELIVER_PAYLOADS: 1, self._p.OPT_SEND_ITEM_SUBSCRIBE: 1} def cb(result): #Node is created with right permission log.debug(_(u"Microblog node has now access %s") % access) def fatal_err(s_error): #Something went wrong log.error(_("Can't set microblog access")) raise 
NodeAccessChangeException() def err_cb(s_error): #If the node already exists, the condition is "conflict", #else we have an unmanaged error if s_error.value.condition == 'conflict': #d = self.host.plugins["XEP-0060"].deleteNode(_jid.userhostJID(), NS_MICROBLOG, profile_key=profile_key) #d.addCallback(lambda x: create_node().addCallback(cb).addErrback(fatal_err)) change_node_options().addCallback(cb).addErrback(fatal_err) else: fatal_err(s_error) def create_node(): return self._p.createNode(_jid.userhostJID(), NS_MICROBLOG, _options, profile_key=profile_key) def change_node_options(): return self._p.setOptions(_jid.userhostJID(), NS_MICROBLOG, _jid.userhostJID(), _options, profile_key=profile_key) create_node().addCallback(cb).addErrback(err_cb) ## methods to manage several stanzas/jids at once ## # common def _getClientAndNodeData(self, publishers_type, publishers, profile_key): """Helper method to construct node_data from publishers_type/publishers @param publishers_type: type of the list of publishers, one of: C.ALL: get all jids from roster, publishers is not used C.GROUP: get jids from groups C.JID: use publishers directly as list of jids @param publishers: list of publishers, according to "publishers_type" (None, list of groups or list of jids) @param profile_key: %(doc_profile_key)s """ client = self.host.getClient(profile_key) if publishers_type == C.JID: jids_set = set(publishers) else: jids_set = client.roster.getJidsSet(publishers_type, publishers) node_data = [] for jid_ in jids_set: node_data.append((jid_, NS_MICROBLOG)) return client, node_data def _checkPublishers(self, publishers_type, publishers): """Helper method to deserialise publishers coming from bridge publishers_type(unicode): type of the list of publishers, one of: publishers: list of publishers according to type @return: deserialised (publishers_type, publishers) tuple """ if publishers_type == C.ALL: if publishers: raise failure.Failure(ValueError("Can't use publishers with {} 
type".format(publishers_type))) else: publishers = None elif publishers_type == C.JID: publishers[:] = [jid.JID(publisher) for publisher in publishers] return publishers_type, publishers # subscribe # def _mbSubscribeToMany(self, publishers_type, publishers, profile_key): """ @return (str): session id: Use pubsub.getSubscribeRTResult to get the results """ publishers_type, publishers = self._checkPublishers(publishers_type, publishers) return self.mbSubscribeToMany(publishers_type, publishers, profile_key) def mbSubscribeToMany(self, publishers_type, publishers, profile_key): """Subscribe microblogs for a list of groups or jids @param publishers_type: type of the list of publishers, one of: C.ALL: get all jids from roster, publishers is not used C.GROUP: get jids from groups C.JID: use publishers directly as list of jids @param publishers: list of publishers, according to "publishers_type" (None, list of groups or list of jids) @param profile: %(doc_profile)s @return (str): session id """ client, node_data = self._getClientAndNodeData(publishers_type, publishers, profile_key) return self._p.subscribeToMany(node_data, client.jid.userhostJID(), profile_key=profile_key) # get # def _mbGetFromManyRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): """Get real-time results for mbGetFromMany session @param session_id: id of the real-time deferred session @param return (tuple): (remaining, results) where: - remaining is the number of still expected results - results is a list of tuple with - service (unicode): pubsub service - node (unicode): pubsub node - failure (unicode): empty string in case of success, error message else - items_data(list): data as returned by [mbGet] - items_metadata(dict): metadata as returned by [mbGet] @param profile_key: %(doc_profile_key)s """ def onSuccess(items_data): """convert items elements to list of microblog data in items_data""" d = self._p.serItemsDataD(items_data, self.item2mbdata) d.addCallback(lambda serialised:('', 
serialised)) return d profile = self.host.getClient(profile_key).profile d = self._p.getRTResults(session_id, on_success = onSuccess, on_error = lambda failure: (unicode(failure.value), ([],{})), profile = profile) d.addCallback(lambda ret: (ret[0], [(service.full(), node, failure, items, metadata) for (service, node), (success, (failure, (items, metadata))) in ret[1].iteritems()])) return d def _mbGetFromMany(self, publishers_type, publishers, max_items=10, extra_dict=None, profile_key=C.PROF_KEY_NONE): """ @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit """ max_items = None if max_items == C.NO_LIMIT else max_items publishers_type, publishers = self._checkPublishers(publishers_type, publishers) extra = self._p.parseExtra(extra_dict) return self.mbGetFromMany(publishers_type, publishers, max_items, extra.rsm_request, extra.extra, profile_key) def mbGetFromMany(self, publishers_type, publishers, max_items=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE): """Get the published microblogs for a list of groups or jids @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL") @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids) @param max_items (int): optional limit on the number of retrieved items. 
@param rsm_request (rsm.RSMRequest): RSM request data, common to all publishers @param extra (dict): Extra data @param profile_key: profile key @return (str): RT Deferred session id """ # XXX: extra is unused here so far client, node_data = self._getClientAndNodeData(publishers_type, publishers, profile_key) return self._p.getFromMany(node_data, max_items, rsm_request, profile_key=profile_key) # comments # def _mbGetFromManyWithCommentsRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): """Get real-time results for [mbGetFromManyWithComments] session @param session_id: id of the real-time deferred session @param return (tuple): (remaining, results) where: - remaining is the number of still expected results - results is a list of 5-tuple with - service (unicode): pubsub service - node (unicode): pubsub node - failure (unicode): empty string in case of success, error message else - items(list[tuple(dict, list)]): list of 2-tuple with - item(dict): item microblog data - comments_list(list[tuple]): list of 5-tuple with - service (unicode): pubsub service where the comments node is - node (unicode): comments node - failure (unicode): empty in case of success, else error message - comments(list[dict]): list of microblog data - comments_metadata(dict): metadata of the comment node - metadata(dict): original node metadata @param profile_key: %(doc_profile_key)s """ profile = self.host.getClient(profile_key).profile d = self.rt_sessions.getResults(session_id, profile=profile) d.addCallback(lambda ret: (ret[0], [(service.full(), node, failure, items, metadata) for (service, node), (success, (failure, (items, metadata))) in ret[1].iteritems()])) return d def _mbGetFromManyWithComments(self, publishers_type, publishers, max_items=10, max_comments=C.NO_LIMIT, extra_dict=None, extra_comments_dict=None, profile_key=C.PROF_KEY_NONE): """ @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit @param max_comments(int): maximum number of comments to 
get, C.NO_LIMIT for no limit """ max_items = None if max_items == C.NO_LIMIT else max_items max_comments = None if max_comments == C.NO_LIMIT else max_comments publishers_type, publishers = self._checkPublishers(publishers_type, publishers) extra = self._p.parseExtra(extra_dict) extra_comments = self._p.parseExtra(extra_comments_dict) return self.mbGetFromManyWithComments(publishers_type, publishers, max_items, max_comments, extra.rsm_request, extra.extra, extra_comments.rsm_request, extra_comments.extra, profile_key) def mbGetFromManyWithComments(self, publishers_type, publishers, max_items=None, max_comments=None, rsm_request=None, extra=None, rsm_comments=None, extra_comments=None, profile_key=C.PROF_KEY_NONE): """Helper method to get the microblogs and their comments in one shot @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL") @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids) @param max_items (int): optional limit on the number of retrieved items. 
@param max_comments (int): maximum number of comments to retrieve @param rsm_request (rsm.RSMRequest): RSM request for initial items only @param extra (dict): extra configuration for initial items only @param rsm_comments (rsm.RSMRequest): RSM request for comments only @param extra_comments (dict): extra configuration for comments only @param profile_key: profile key @return (str): RT Deferred session id """ # XXX: this method seems complicated because it do a couple of treatments # to serialise and associate the data, but it make life in frontends side # a lot easier def getComments(items_data): """Retrieve comments and add them to the items_data @param items_data: serialised items data @return (defer.Deferred): list of items where each item is associated with a list of comments data (service, node, list of items, metadata) """ items, metadata = items_data items_dlist = [] # deferred list for items for item in items: dlist = [] # deferred list for comments for key, value in item.iteritems(): # we look for comments if key.startswith('comments') and key.endswith('_service'): prefix = key[:key.find('_')] service_s = value node = item["{}{}".format(prefix, "_node")] # time to get the comments d = self._p.getItems(jid.JID(service_s), node, max_comments, rsm_request=rsm_comments, extra=extra_comments, profile_key=profile_key) # then serialise d.addCallback(lambda items_data: self._p.serItemsDataD(items_data, self.item2mbdata)) # with failure handling d.addCallback(lambda serialised_items_data: ('',) + serialised_items_data) d.addErrback(lambda failure: (unicode(failure.value), [], {})) # and associate with service/node (needed if there are several comments nodes) d.addCallback(lambda serialised, service_s=service_s, node=node: (service_s, node) + serialised) dlist.append(d) # we get the comments comments_d = defer.gatherResults(dlist) # and add them to the item data comments_d.addCallback(lambda comments_data, item=item: (item, comments_data)) 
items_dlist.append(comments_d) # we gather the items + comments in a list items_d = defer.gatherResults(items_dlist) # and add the metadata items_d.addCallback(lambda items_completed: (items_completed, metadata)) return items_d client, node_data = self._getClientAndNodeData(publishers_type, publishers, profile_key) deferreds = {} for service, node in node_data: d = deferreds[(service, node)] = self._p.getItems(service, node, max_items, rsm_request=rsm_request, extra=extra, profile_key=profile_key) d.addCallback(lambda items_data: self._p.serItemsDataD(items_data, self.item2mbdata)) d.addCallback(getComments) d.addCallback(lambda items_comments_data: ('', items_comments_data)) d.addErrback(lambda failure: (unicode(failure.value), ([],{}))) return self.rt_sessions.newSession(deferreds, client.profile) # atom feed def _mbGetAtom(self, service_jid_s, node="", max_items=10, item_ids=None, extra_dict=None, profile_key=C.PROF_KEY_NONE): """Get the atom feed of the last published microblogs @param service_jid(jid.JID): jid of the publisher @param node(unicode, None): node to get (or microblog node if None) @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit @param item_ids (list[unicode]): list of item IDs @param rsm_request (rsm.RSMRequest): RSM request data @param extra (dict): extra data @param profile_key: %(doc_profile_key)s @return: a deferred unicode (atom XML feed) """ max_items = None if max_items == C.NO_LIMIT else max_items extra = self._p.parseExtra(extra_dict) return self.mbGetAtom(jid.JID(service_jid_s), node or None, max_items, item_ids, extra.rsm_request, extra.extra, profile_key) @defer.inlineCallbacks def mbGetAtom(self, service_jid, node=None, max_items=None, item_ids=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE): """Get the atom feed of the last published microblogs @param service_jid(jid.JID): jid of the publisher @param node(unicode, None): node to get (or microblog node if None) @param max_items(int): 
maximum number of item to get, C.NO_LIMIT for no limit @param item_ids (list[unicode]): list of item IDs @param rsm_request (rsm.RSMRequest): RSM request data @param extra (dict): extra data @param profile_key: %(doc_profile_key)s @return: a deferred couple with the list of items and metadatas. """ if node is None: node = NS_MICROBLOG items, metadata = yield self._p.getItems(service_jid, node, max_items=max_items, item_ids=item_ids, rsm_request=rsm_request, extra=extra, profile_key=profile_key) feed = """<?xml version="1.0" encoding="utf-8"?> <feed xmlns="http://www.w3.org/2005/Atom"> <title>%(user)s's blogposts</title> <link href="%(feed)s" rel="self" /> <link href="%(blog)s" /> <id>%(id)s</id> <updated>%(date)s</updated>\n""" % {'user': service_jid.user, 'feed': 'http://%s/blog/%s/atom.xml' % (service_jid.host, service_jid.user), 'blog': 'http://%s/blog/%s' % (service_jid.host, service_jid.user), 'id': node, 'date': rfc3339.timestamp_from_tf(rfc3339.tf_utc())} def removeAllURIs(element): """Recursively remove the URIs of the element and its children. Without that, the entry would still be valid but not displayed by Firefox nor Thunderbird (and probably more readers)""" element.uri = element.defaultUri = None for child in element.children: if isinstance(child, domish.Element): removeAllURIs(child) for item in items: entry = item.firstChildElement() removeAllURIs(entry) feed += " " + entry.toXml() + "\n" defer.returnValue(feed + "</feed>")