Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0277.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0277.py@524856bd7b19 |
children | 0e48181d50ab |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0277.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,1724 @@ +#!/usr/bin/env python3 + +# SAT plugin for microblogging over XMPP (xep-0277) +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import time +import dateutil +import calendar +from mimetypes import guess_type +from secrets import token_urlsafe +from typing import List, Optional, Dict, Tuple, Any, Dict +from functools import partial + +import shortuuid + +from twisted.words.protocols.jabber import jid, error +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from twisted.internet import defer +from twisted.python import failure + +# XXX: sat_tmp.wokkel.pubsub is actually used instead of wokkel version +from wokkel import pubsub +from wokkel import disco, iwokkel, rsm +from zope.interface import implementer + +from libervia.backend.core.i18n import _ +from libervia.backend.core.constants import Const as C +from libervia.backend.core.log import getLogger +from libervia.backend.core import exceptions +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.tools import xml_tools +from libervia.backend.tools import sat_defer +from libervia.backend.tools import utils +from libervia.backend.tools.common import data_format +from libervia.backend.tools.common import uri as xmpp_uri +from libervia.backend.tools.common import regex + + +log = getLogger(__name__) + + +NS_MICROBLOG = "urn:xmpp:microblog:0" +NS_ATOM = "http://www.w3.org/2005/Atom" +NS_PUBSUB_EVENT = f"{pubsub.NS_PUBSUB}#event" +NS_COMMENT_PREFIX = f"{NS_MICROBLOG}:comments/" + + +PLUGIN_INFO = { + C.PI_NAME: "Microblogging over XMPP Plugin", + C.PI_IMPORT_NAME: "XEP-0277", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: ["XEP-0277"], + C.PI_DEPENDENCIES: ["XEP-0163", "XEP-0060", "TEXT_SYNTAXES"], + C.PI_RECOMMENDATIONS: ["XEP-0059", "EXTRA-PEP", "PUBSUB_CACHE"], + C.PI_MAIN: "XEP_0277", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Implementation of microblogging Protocol"""), +} + + +class NodeAccessChangeException(Exception): + pass + + +class XEP_0277(object): + namespace = NS_MICROBLOG + NS_ATOM = NS_ATOM + + def __init__(self, host): + log.info(_("Microblogging plugin initialization")) + self.host = host + host.register_namespace("microblog", NS_MICROBLOG) + self._p = self.host.plugins[ + "XEP-0060" + ] # this facilitate the access to pubsub plugin + ps_cache = self.host.plugins.get("PUBSUB_CACHE") + if ps_cache is not None: + ps_cache.register_analyser( + { + "name": "XEP-0277", + "node": NS_MICROBLOG, + "namespace": NS_ATOM, + "type": "blog", + "to_sync": True, + "parser": self.item_2_mb_data, + "match_cb": self._cache_node_match_cb, + } + ) + self.rt_sessions = sat_defer.RTDeferredSessions() + self.host.plugins["XEP-0060"].add_managed_node( + NS_MICROBLOG, items_cb=self._items_received + ) + + host.bridge.add_method( + "mb_send", + ".plugin", + in_sign="ssss", + out_sign="s", + method=self._mb_send, + async_=True, + ) + host.bridge.add_method( + "mb_repeat", + ".plugin", + in_sign="sssss", + out_sign="s", + method=self._mb_repeat, + async_=True, + ) + host.bridge.add_method( + "mb_preview", + ".plugin", + in_sign="ssss", + out_sign="s", + method=self._mb_preview, + async_=True, + ) + host.bridge.add_method( + "mb_retract", + ".plugin", + in_sign="ssss", + out_sign="", + method=self._mb_retract, + async_=True, + ) + host.bridge.add_method( + "mb_get", + ".plugin", + in_sign="ssiasss", + out_sign="s", + method=self._mb_get, + async_=True, + ) + host.bridge.add_method( + "mb_rename", + ".plugin", + in_sign="sssss", + out_sign="", + method=self._mb_rename, + async_=True, + ) + host.bridge.add_method( + "mb_access_set", + ".plugin", + in_sign="ss", + out_sign="", + method=self.mb_access_set, + async_=True, + ) + host.bridge.add_method( + "mb_subscribe_to_many", + ".plugin", + in_sign="sass", + out_sign="s", + method=self._mb_subscribe_to_many, + ) + host.bridge.add_method( + "mb_get_from_many_rt_result", + ".plugin", + in_sign="ss", + out_sign="(ua(sssasa{ss}))", + method=self._mb_get_from_many_rt_result, + async_=True, + ) + host.bridge.add_method( + "mb_get_from_many", + ".plugin", + in_sign="sasia{ss}s", + out_sign="s", + method=self._mb_get_from_many, + ) + host.bridge.add_method( + "mb_get_from_many_with_comments_rt_result", + ".plugin", + in_sign="ss", + out_sign="(ua(sssa(sa(sssasa{ss}))a{ss}))", + method=self._mb_get_from_many_with_comments_rt_result, + async_=True, + ) + host.bridge.add_method( + "mb_get_from_many_with_comments", + ".plugin", + in_sign="sasiia{ss}a{ss}s", + out_sign="s", + method=self._mb_get_from_many_with_comments, + ) + host.bridge.add_method( + "mb_is_comment_node", + ".plugin", + in_sign="s", + out_sign="b", + method=self.is_comment_node, + ) + + def get_handler(self, client): + return XEP_0277_handler() + + def _cache_node_match_cb( + self, + client: SatXMPPEntity, + analyse: dict, + ) -> None: + """Check is analysed node is a comment and fill analyse accordingly""" + if analyse["node"].startswith(NS_COMMENT_PREFIX): + analyse["subtype"] = "comment" + + def _check_features_cb(self, available): + return {"available": C.BOOL_TRUE} + + def _check_features_eb(self, fail): + return {"available": C.BOOL_FALSE} + + def features_get(self, profile): + client = self.host.get_client(profile) + d = self.host.check_features(client, [], identity=("pubsub", "pep")) + d.addCallbacks(self._check_features_cb, self._check_features_eb) + return d + + ## plugin management methods ## + + def _items_received(self, client, itemsEvent): + """Callback which manage items notifications (publish + retract)""" + + def manage_item(data, event): + self.host.bridge.ps_event( + C.PS_MICROBLOG, + itemsEvent.sender.full(), + itemsEvent.nodeIdentifier, + event, + data_format.serialise(data), + client.profile, + ) + + for item in itemsEvent.items: + if item.name == C.PS_ITEM: + # FIXME: service and node should be used here + self.item_2_mb_data(client, item, None, None).addCallbacks( + manage_item, lambda failure: None, (C.PS_PUBLISH,) + ) + elif item.name == C.PS_RETRACT: + manage_item({"id": item["id"]}, C.PS_RETRACT) + else: + raise exceptions.InternalError("Invalid event value") + + ## data/item transformation ## + + @defer.inlineCallbacks + def item_2_mb_data( + self, + client: SatXMPPEntity, + item_elt: domish.Element, + service: Optional[jid.JID], + # FIXME: node is Optional until all calls to item_2_mb_data set properly service + # and node. Once done, the Optional must be removed here + node: Optional[str] + ) -> dict: + """Convert an XML Item to microblog data + + @param item_elt: microblog item element + @param service: PubSub service where the item has been retrieved + profile's PEP is used when service is None + @param node: PubSub node where the item has been retrieved + if None, "uri" won't be set + @return: microblog data + """ + if service is None: + service = client.jid.userhostJID() + + extra: Dict[str, Any] = {} + microblog_data: Dict[str, Any] = { + "service": service.full(), + "extra": extra + } + + def check_conflict(key, increment=False): + """Check if key is already in microblog data + + @param key(unicode): key to check + @param increment(bool): if suffix the key with an increment + instead of raising an exception + @raise exceptions.DataError: the key already exists + (not raised if increment is True) + """ + if key in microblog_data: + if not increment: + raise failure.Failure( + exceptions.DataError( + "key {} is already present for item {}" + ).format(key, item_elt["id"]) + ) + else: + idx = 1 # the idx 0 is the key without suffix + fmt = "{}#{}" + new_key = fmt.format(key, idx) + while new_key in microblog_data: + idx += 1 + new_key = fmt.format(key, idx) + key = new_key + return key + + @defer.inlineCallbacks + def parseElement(elem): + """Parse title/content elements and fill microblog_data accordingly""" + type_ = elem.getAttribute("type") + if type_ == "xhtml": + data_elt = elem.firstChildElement() + if data_elt is None: + raise failure.Failure( + exceptions.DataError( + "XHML content not wrapped in a <div/> element, this is not " + "standard !" + ) + ) + if data_elt.uri != C.NS_XHTML: + raise failure.Failure( + exceptions.DataError( + _("Content of type XHTML must declare its namespace!") + ) + ) + key = check_conflict("{}_xhtml".format(elem.name)) + data = data_elt.toXml() + microblog_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].clean_xhtml( + data + ) + else: + key = check_conflict(elem.name) + microblog_data[key] = str(elem) + + id_ = item_elt.getAttribute("id", "") # there can be no id for transient nodes + microblog_data["id"] = id_ + if item_elt.uri not in (pubsub.NS_PUBSUB, NS_PUBSUB_EVENT): + msg = "Unsupported namespace {ns} in pubsub item {id_}".format( + ns=item_elt.uri, id_=id_ + ) + log.warning(msg) + raise failure.Failure(exceptions.DataError(msg)) + + try: + entry_elt = next(item_elt.elements(NS_ATOM, "entry")) + except StopIteration: + msg = "No atom entry found in the pubsub item {}".format(id_) + raise failure.Failure(exceptions.DataError(msg)) + + # uri + # FIXME: node should alway be set in the future, check FIXME in method signature + if node is not None: + microblog_data["node"] = node + microblog_data['uri'] = xmpp_uri.build_xmpp_uri( + "pubsub", + path=service.full(), + node=node, + item=id_, + ) + + # language + try: + microblog_data["language"] = entry_elt[(C.NS_XML, "lang")].strip() + except KeyError: + pass + + # atom:id + try: + id_elt = next(entry_elt.elements(NS_ATOM, "id")) + except StopIteration: + msg = ("No atom id found in the pubsub item {}, this is not standard !" + .format(id_)) + log.warning(msg) + microblog_data["atom_id"] = "" + else: + microblog_data["atom_id"] = str(id_elt) + + # title/content(s) + + # FIXME: ATOM and XEP-0277 only allow 1 <title/> element + # but in the wild we have some blogs with several ones + # so we don't respect the standard for now (it doesn't break + # anything anyway), and we'll find a better option later + # try: + # title_elt = entry_elt.elements(NS_ATOM, 'title').next() + # except StopIteration: + # msg = u'No atom title found in the pubsub item {}'.format(id_) + # raise failure.Failure(exceptions.DataError(msg)) + title_elts = list(entry_elt.elements(NS_ATOM, "title")) + if not title_elts: + msg = "No atom title found in the pubsub item {}".format(id_) + raise failure.Failure(exceptions.DataError(msg)) + for title_elt in title_elts: + yield parseElement(title_elt) + + # FIXME: as for <title/>, Atom only authorise at most 1 content + # but XEP-0277 allows several ones. So for no we handle as + # if more than one can be present + for content_elt in entry_elt.elements(NS_ATOM, "content"): + yield parseElement(content_elt) + + # we check that text content is present + for key in ("title", "content"): + if key not in microblog_data and ("{}_xhtml".format(key)) in microblog_data: + log.warning( + "item {id_} provide a {key}_xhtml data but not a text one".format( + id_=id_, key=key + ) + ) + # ... and do the conversion if it's not + microblog_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].convert( + microblog_data["{}_xhtml".format(key)], + self.host.plugins["TEXT_SYNTAXES"].SYNTAX_XHTML, + self.host.plugins["TEXT_SYNTAXES"].SYNTAX_TEXT, + False, + ) + + if "content" not in microblog_data: + # use the atom title data as the microblog body content + microblog_data["content"] = microblog_data["title"] + del microblog_data["title"] + if "title_xhtml" in microblog_data: + microblog_data["content_xhtml"] = microblog_data["title_xhtml"] + del microblog_data["title_xhtml"] + + # published/updated dates + try: + updated_elt = next(entry_elt.elements(NS_ATOM, "updated")) + except StopIteration: + msg = "No atom updated element found in the pubsub item {}".format(id_) + raise failure.Failure(exceptions.DataError(msg)) + microblog_data["updated"] = calendar.timegm( + dateutil.parser.parse(str(updated_elt)).utctimetuple() + ) + try: + published_elt = next(entry_elt.elements(NS_ATOM, "published")) + except StopIteration: + microblog_data["published"] = microblog_data["updated"] + else: + microblog_data["published"] = calendar.timegm( + dateutil.parser.parse(str(published_elt)).utctimetuple() + ) + + # links + comments = microblog_data['comments'] = [] + for link_elt in entry_elt.elements(NS_ATOM, "link"): + href = link_elt.getAttribute("href") + if not href: + log.warning( + f'missing href in <link> element: {link_elt.toXml()}' + ) + continue + rel = link_elt.getAttribute("rel") + if (rel == "replies" and link_elt.getAttribute("title") == "comments"): + uri = href + comments_data = { + "uri": uri, + } + try: + comment_service, comment_node = self.parse_comment_url(uri) + except Exception as e: + log.warning(f"Can't parse comments url: {e}") + continue + else: + comments_data["service"] = comment_service.full() + comments_data["node"] = comment_node + comments.append(comments_data) + elif rel == "via": + try: + repeater_jid = jid.JID(item_elt["publisher"]) + except (KeyError, RuntimeError): + try: + # we look for stanza element which is at the root, meaning that it + # has not parent + top_elt = item_elt.parent + while top_elt.parent is not None: + top_elt = top_elt.parent + repeater_jid = jid.JID(top_elt["from"]) + except (AttributeError, RuntimeError): + # we should always have either the "publisher" attribute or the + # stanza available + log.error( + f"Can't find repeater of the post: {item_elt.toXml()}" + ) + continue + + extra["repeated"] = { + "by": repeater_jid.full(), + "uri": href + } + elif rel in ("related", "enclosure"): + attachment: Dict[str, Any] = { + "sources": [{"url": href}] + } + if rel == "related": + attachment["external"] = True + for attr, key in ( + ("type", "media_type"), + ("title", "desc"), + ): + value = link_elt.getAttribute(attr) + if value: + attachment[key] = value + try: + attachment["size"] = int(link_elt.attributes["lenght"]) + except (KeyError, ValueError): + pass + if "media_type" not in attachment: + media_type = guess_type(href, False)[0] + if media_type is not None: + attachment["media_type"] = media_type + + attachments = extra.setdefault("attachments", []) + attachments.append(attachment) + else: + log.warning( + f"Unmanaged link element: {link_elt.toXml()}" + ) + + # author + publisher = item_elt.getAttribute("publisher") + try: + author_elt = next(entry_elt.elements(NS_ATOM, "author")) + except StopIteration: + log.debug("Can't find author element in item {}".format(id_)) + else: + # name + try: + name_elt = next(author_elt.elements(NS_ATOM, "name")) + except StopIteration: + log.warning( + "No name element found in author element of item {}".format(id_) + ) + author = None + else: + author = microblog_data["author"] = str(name_elt).strip() + # uri + try: + uri_elt = next(author_elt.elements(NS_ATOM, "uri")) + except StopIteration: + log.debug( + "No uri element found in author element of item {}".format(id_) + ) + if publisher: + microblog_data["author_jid"] = publisher + else: + uri = str(uri_elt) + if uri.startswith("xmpp:"): + uri = uri[5:] + microblog_data["author_jid"] = uri + else: + microblog_data["author_jid"] = ( + item_elt.getAttribute("publisher") or "" + ) + if not author and microblog_data["author_jid"]: + # FIXME: temporary workaround for missing author name, would be + # better to use directly JID's identity (to be done from frontends?) + try: + microblog_data["author"] = jid.JID(microblog_data["author_jid"]).user + except Exception as e: + log.warning(f"No author name found, and can't parse author jid: {e}") + + if not publisher: + log.debug("No publisher attribute, we can't verify author jid") + microblog_data["author_jid_verified"] = False + elif jid.JID(publisher).userhostJID() == jid.JID(uri).userhostJID(): + microblog_data["author_jid_verified"] = True + else: + if "repeated" not in extra: + log.warning( + "item atom:uri differ from publisher attribute, spoofing " + "attempt ? atom:uri = {} publisher = {}".format( + uri, item_elt.getAttribute("publisher") + ) + ) + microblog_data["author_jid_verified"] = False + # email + try: + email_elt = next(author_elt.elements(NS_ATOM, "email")) + except StopIteration: + pass + else: + microblog_data["author_email"] = str(email_elt) + + if not microblog_data.get("author_jid"): + if publisher: + microblog_data["author_jid"] = publisher + microblog_data["author_jid_verified"] = True + else: + iq_elt = xml_tools.find_ancestor(item_elt, "iq", C.NS_STREAM) + microblog_data["author_jid"] = iq_elt["from"] + microblog_data["author_jid_verified"] = False + + # categories + categories = [ + category_elt.getAttribute("term", "") + for category_elt in entry_elt.elements(NS_ATOM, "category") + ] + microblog_data["tags"] = categories + + ## the trigger ## + # if other plugins have things to add or change + yield self.host.trigger.point( + "XEP-0277_item2data", item_elt, entry_elt, microblog_data + ) + + defer.returnValue(microblog_data) + + async def mb_data_2_entry_elt(self, client, mb_data, item_id, service, node): + """Convert a data dict to en entry usable to create an item + + @param mb_data: data dict as given by bridge method. + @param item_id(unicode): id of the item to use + @param service(jid.JID, None): pubsub service where the item is sent + Needed to construct Atom id + @param node(unicode): pubsub node where the item is sent + Needed to construct Atom id + @return: deferred which fire domish.Element + """ + entry_elt = domish.Element((NS_ATOM, "entry")) + extra = mb_data.get("extra", {}) + + ## language ## + if "language" in mb_data: + entry_elt[(C.NS_XML, "lang")] = mb_data["language"].strip() + + ## content and title ## + synt = self.host.plugins["TEXT_SYNTAXES"] + + for elem_name in ("title", "content"): + for type_ in ["", "_rich", "_xhtml"]: + attr = f"{elem_name}{type_}" + if attr in mb_data: + elem = entry_elt.addElement(elem_name) + if type_: + if type_ == "_rich": # convert input from current syntax to XHTML + xml_content = await synt.convert( + mb_data[attr], synt.get_current_syntax(client.profile), "XHTML" + ) + if f"{elem_name}_xhtml" in mb_data: + raise failure.Failure( + exceptions.DataError( + _( + "Can't have xhtml and rich content at the same time" + ) + ) + ) + else: + xml_content = mb_data[attr] + + div_elt = xml_tools.ElementParser()( + xml_content, namespace=C.NS_XHTML + ) + if ( + div_elt.name != "div" + or div_elt.uri != C.NS_XHTML + or div_elt.attributes + ): + # we need a wrapping <div/> at the top with XHTML namespace + wrap_div_elt = domish.Element((C.NS_XHTML, "div")) + wrap_div_elt.addChild(div_elt) + div_elt = wrap_div_elt + elem.addChild(div_elt) + elem["type"] = "xhtml" + if elem_name not in mb_data: + # there is raw text content, which is mandatory + # so we create one from xhtml content + elem_txt = entry_elt.addElement(elem_name) + text_content = await self.host.plugins[ + "TEXT_SYNTAXES" + ].convert( + xml_content, + self.host.plugins["TEXT_SYNTAXES"].SYNTAX_XHTML, + self.host.plugins["TEXT_SYNTAXES"].SYNTAX_TEXT, + False, + ) + elem_txt.addContent(text_content) + elem_txt["type"] = "text" + + else: # raw text only needs to be escaped to get HTML-safe sequence + elem.addContent(mb_data[attr]) + elem["type"] = "text" + + try: + next(entry_elt.elements(NS_ATOM, "title")) + except StopIteration: + # we have no title element which is mandatory + # so we transform content element to title + elems = list(entry_elt.elements(NS_ATOM, "content")) + if not elems: + raise exceptions.DataError( + "There must be at least one content or title element" + ) + for elem in elems: + elem.name = "title" + + ## attachments ## + attachments = extra.get(C.KEY_ATTACHMENTS) + if attachments: + for attachment in attachments: + try: + url = attachment["url"] + except KeyError: + try: + url = next( + s['url'] for s in attachment["sources"] if 'url' in s + ) + except (StopIteration, KeyError): + log.warning( + f'"url" missing in attachment, ignoring: {attachment}' + ) + continue + + if not url.startswith("http"): + log.warning(f"non HTTP URL in attachment, ignoring: {attachment}") + continue + link_elt = entry_elt.addElement("link") + # XXX: "uri" is set in self._manage_comments if not already existing + link_elt["href"] = url + if attachment.get("external", False): + # this is a link to an external data such as a website + link_elt["rel"] = "related" + else: + # this is an attached file + link_elt["rel"] = "enclosure" + for key, attr in ( + ("media_type", "type"), + ("desc", "title"), + ("size", "lenght") + ): + value = attachment.get(key) + if value: + link_elt[attr] = str(value) + + ## author ## + author_elt = entry_elt.addElement("author") + try: + author_name = mb_data["author"] + except KeyError: + # FIXME: must use better name + author_name = client.jid.user + author_elt.addElement("name", content=author_name) + + try: + author_jid_s = mb_data["author_jid"] + except KeyError: + author_jid_s = client.jid.userhost() + author_elt.addElement("uri", content="xmpp:{}".format(author_jid_s)) + + try: + author_jid_s = mb_data["author_email"] + except KeyError: + pass + + ## published/updated time ## + current_time = time.time() + entry_elt.addElement( + "updated", content=utils.xmpp_date(float(mb_data.get("updated", current_time))) + ) + entry_elt.addElement( + "published", + content=utils.xmpp_date(float(mb_data.get("published", current_time))), + ) + + ## categories ## + for tag in mb_data.get('tags', []): + category_elt = entry_elt.addElement("category") + category_elt["term"] = tag + + ## id ## + entry_id = mb_data.get( + "id", + xmpp_uri.build_xmpp_uri( + "pubsub", + path=service.full() if service is not None else client.jid.userhost(), + node=node, + item=item_id, + ), + ) + entry_elt.addElement("id", content=entry_id) # + + ## comments ## + for comments_data in mb_data.get('comments', []): + link_elt = entry_elt.addElement("link") + # XXX: "uri" is set in self._manage_comments if not already existing + link_elt["href"] = comments_data["uri"] + link_elt["rel"] = "replies" + link_elt["title"] = "comments" + + if "repeated" in extra: + try: + repeated = extra["repeated"] + link_elt = entry_elt.addElement("link") + link_elt["rel"] = "via" + link_elt["href"] = repeated["uri"] + except KeyError as e: + log.warning( + f"invalid repeated element({e}): {extra['repeated']}" + ) + + ## final item building ## + item_elt = pubsub.Item(id=item_id, payload=entry_elt) + + ## the trigger ## + # if other plugins have things to add or change + self.host.trigger.point( + "XEP-0277_data2entry", client, mb_data, entry_elt, item_elt + ) + + return item_elt + + ## publish/preview ## + + def is_comment_node(self, node: str) -> bool: + """Indicate if the node is prefixed with comments namespace""" + return node.startswith(NS_COMMENT_PREFIX) + + def get_parent_item(self, item_id: str) -> str: + """Return parent of a comment node + + @param item_id: a comment node + """ + if not self.is_comment_node(item_id): + raise ValueError("This node is not a comment node") + return item_id[len(NS_COMMENT_PREFIX):] + + def get_comments_node(self, item_id): + """Generate comment node + + @param item_id(unicode): id of the parent item + @return (unicode): comment node to use + """ + return f"{NS_COMMENT_PREFIX}{item_id}" + + def get_comments_service(self, client, parent_service=None): + """Get prefered PubSub service to create comment node + + @param pubsub_service(jid.JID, None): PubSub service of the parent item + @param return((D)jid.JID, None): PubSub service to use + """ + if parent_service is not None: + if parent_service.user: + # we are on a PEP + if parent_service.host == client.jid.host: + # it's our server, we use already found client.pubsub_service below + pass + else: + # other server, let's try to find a non PEP service there + d = self.host.find_service_entity( + client, "pubsub", "service", parent_service + ) + d.addCallback(lambda entity: entity or parent_service) + else: + # parent is already on a normal Pubsub service, we re-use it + return defer.succeed(parent_service) + + return defer.succeed( + client.pubsub_service if client.pubsub_service is not None else parent_service + ) + + async def _manage_comments(self, client, mb_data, service, node, item_id, access=None): + """Check comments keys in mb_data and create comments node if necessary + + if a comments node metadata is set in the mb_data['comments'] list, it is used + otherwise it is generated (if allow_comments is True). + @param mb_data(dict): microblog mb_data + @param service(jid.JID, None): PubSub service of the parent item + @param node(unicode): node of the parent item + @param item_id(unicode): id of the parent item + @param access(unicode, None): access model + None to use same access model as parent item + """ + allow_comments = mb_data.pop("allow_comments", None) + if allow_comments is None: + if "comments" in mb_data: + mb_data["allow_comments"] = True + else: + # no comments set or requested, nothing to do + return + elif allow_comments == False: + if "comments" in mb_data: + log.warning( + "comments are not allowed but there is already a comments node, " + "it may be lost: {uri}".format( + uri=mb_data["comments"] + ) + ) + del mb_data["comments"] + return + + # we have usually a single comment node, but the spec allow several, so we need to + # handle this in a list + if len(mb_data.setdefault('comments', [])) == 0: + # we need at least one comment node + comments_data = {} + mb_data['comments'].append({}) + + if access is None: + # TODO: cache access models per service/node + parent_node_config = await self._p.getConfiguration(client, service, node) + access = parent_node_config.get(self._p.OPT_ACCESS_MODEL, self._p.ACCESS_OPEN) + + options = { + self._p.OPT_ACCESS_MODEL: access, + self._p.OPT_MAX_ITEMS: "max", + self._p.OPT_PERSIST_ITEMS: 1, + self._p.OPT_DELIVER_PAYLOADS: 1, + self._p.OPT_SEND_ITEM_SUBSCRIBE: 1, + # FIXME: would it make sense to restrict publish model to subscribers? + self._p.OPT_PUBLISH_MODEL: self._p.ACCESS_OPEN, + } + + # if other plugins need to change the options + self.host.trigger.point("XEP-0277_comments", client, mb_data, options) + + for comments_data in mb_data['comments']: + uri = comments_data.get('uri') + comments_node = comments_data.get('node') + try: + comments_service = jid.JID(comments_data["service"]) + except KeyError: + comments_service = None + + if uri: + uri_service, uri_node = self.parse_comment_url(uri) + if ((comments_node is not None and comments_node!=uri_node) + or (comments_service is not None and comments_service!=uri_service)): + raise ValueError( + f"Incoherence between comments URI ({uri}) and comments_service " + f"({comments_service}) or comments_node ({comments_node})") + comments_data['service'] = comments_service = uri_service + comments_data['node'] = comments_node = uri_node + else: + if not comments_node: + comments_node = self.get_comments_node(item_id) + comments_data['node'] = comments_node + if comments_service is None: + comments_service = await self.get_comments_service(client, service) + if comments_service is None: + comments_service = client.jid.userhostJID() + comments_data['service'] = comments_service + + comments_data['uri'] = xmpp_uri.build_xmpp_uri( + "pubsub", + path=comments_service.full(), + node=comments_node, + ) + + try: + await self._p.createNode(client, comments_service, comments_node, options) + except error.StanzaError as e: + if e.condition == "conflict": + log.info( + "node {} already exists on service {}".format( + comments_node, comments_service + ) + ) + else: + raise e + else: + if access == self._p.ACCESS_WHITELIST: + # for whitelist access we need to copy affiliations from parent item + comments_affiliations = await self._p.get_node_affiliations( + client, service, node + ) + # …except for "member", that we transform to publisher + # because we wants members to be able to write to comments + for jid_, affiliation in list(comments_affiliations.items()): + if affiliation == "member": + comments_affiliations[jid_] == "publisher" + + await self._p.set_node_affiliations( + client, comments_service, comments_node, comments_affiliations + ) + + def friendly_id(self, data): + """Generate a user friendly id from title or content""" + # TODO: rich content should be converted to plain text + id_base = regex.url_friendly_text( + data.get('title') + or data.get('title_rich') + or data.get('content') + or data.get('content_rich') + or '' + ) + return f"{id_base}-{token_urlsafe(3)}" + + def _mb_send(self, service, node, data, profile_key): + service = jid.JID(service) if service else None + node = node if node else NS_MICROBLOG + client = self.host.get_client(profile_key) + data = data_format.deserialise(data) + return defer.ensureDeferred(self.send(client, data, service, node)) + + async def send( + self, + client: SatXMPPEntity, + data: dict, + service: Optional[jid.JID] = None, + node: Optional[str] = NS_MICROBLOG + ) -> Optional[str]: + """Send XEP-0277's microblog data + + @param data: microblog data (must include at least a "content" or a "title" key). + see http://wiki.goffi.org/wiki/Bridge_API_-_Microblogging/en for details + @param service: PubSub service where the microblog must be published + None to publish on profile's PEP + @param node: PubSub node to use (defaut to microblog NS) + None is equivalend as using default value + @return: ID of the published item + """ + # TODO: check that all data keys are used, this would avoid sending publicly a private message + # by accident (e.g. if group plugin is not loaded, and "group*" key are not used) + if service is None: + service = client.jid.userhostJID() + if node is None: + node = NS_MICROBLOG + + item_id = data.get("id") + if item_id is None: + if data.get("user_friendly_id", True): + item_id = self.friendly_id(data) + else: + item_id = str(shortuuid.uuid()) + + try: + await self._manage_comments(client, data, service, node, item_id, access=None) + except error.StanzaError: + log.warning("Can't create comments node for item {}".format(item_id)) + item = await self.mb_data_2_entry_elt(client, data, item_id, service, node) + + if not await self.host.trigger.async_point( + "XEP-0277_send", client, service, node, item, data + ): + return None + + extra = {} + for key in ("encrypted", "encrypted_for", "signed"): + value = data.get(key) + if value is not None: + extra[key] = value + + await self._p.publish(client, service, node, [item], extra=extra) + return item_id + + def _mb_repeat( + self, + service_s: str, + node: str, + item: str, + extra_s: str, + profile_key: str + ) -> defer.Deferred: + service = jid.JID(service_s) if service_s else None + node = node if node else NS_MICROBLOG + client = self.host.get_client(profile_key) + extra = data_format.deserialise(extra_s) + d = defer.ensureDeferred( + self.repeat(client, item, service, node, extra) + ) + # [repeat] can return None, and we always need a str + d.addCallback(lambda ret: ret or "") + return d + + async def repeat( + self, + client: SatXMPPEntity, + item: str, + service: Optional[jid.JID] = None, + node: str = NS_MICROBLOG, + extra: Optional[dict] = None, + ) -> Optional[str]: + """Re-publish a post from somewhere else + + This is a feature often name "share" or "boost", it is generally used to make a + publication more visible by sharing it with our own audience + """ + if service is None: + service = client.jid.userhostJID() + + # we first get the post to repeat + items, __ = await self._p.get_items( + client, + service, + node, + item_ids = [item] + ) + if not items: + raise exceptions.NotFound( + f"no item found at node {node!r} on {service} with ID {item!r}" + ) + item_elt = items[0] + try: + entry_elt = next(item_elt.elements(NS_ATOM, "entry")) + except StopIteration: + raise exceptions.DataError( + "post to repeat is not a XEP-0277 blog item" + ) + + # we want to be sure that we have an author element + try: + author_elt = next(entry_elt.elements(NS_ATOM, "author")) + except StopIteration: + author_elt = entry_elt.addElement("author") + + try: + next(author_elt.elements(NS_ATOM, "name")) + except StopIteration: + author_elt.addElement("name", content=service.user) + + try: + next(author_elt.elements(NS_ATOM, "uri")) + except StopIteration: + entry_elt.addElement( + "uri", content=xmpp_uri.build_xmpp_uri(None, path=service.full()) + ) + + # we add the link indicating that it's a repeated post + link_elt = entry_elt.addElement("link") + link_elt["rel"] = "via" + link_elt["href"] = xmpp_uri.build_xmpp_uri( + "pubsub", path=service.full(), node=node, item=item + ) + + return await self._p.send_item( + client, + client.jid.userhostJID(), + NS_MICROBLOG, + entry_elt + ) + + def _mb_preview(self, service, node, data, profile_key): + service = jid.JID(service) if service else None + node = node if node else NS_MICROBLOG + client = self.host.get_client(profile_key) + data = data_format.deserialise(data) + d = defer.ensureDeferred(self.preview(client, data, service, node)) + d.addCallback(data_format.serialise) + return d + + async def preview( + self, + client: SatXMPPEntity, + data: dict, + service: Optional[jid.JID] = None, + node: Optional[str] = NS_MICROBLOG + ) -> dict: + """Preview microblog data without publishing them + + params are the same as for [send] + @return: microblog data as would be retrieved from published item + """ + if node is None: + node = NS_MICROBLOG + + item_id = data.get("id", "") + + # we have to serialise then deserialise to be sure that all triggers are called + item_elt = await self.mb_data_2_entry_elt(client, data, item_id, service, node) + item_elt.uri = pubsub.NS_PUBSUB + return await self.item_2_mb_data(client, item_elt, service, node) + + + ## retract ## + + def _mb_retract(self, service_jid_s, nodeIdentifier, itemIdentifier, profile_key): + """Call self._p._retract_item, but use default node if node is empty""" + return self._p._retract_item( + service_jid_s, + nodeIdentifier or NS_MICROBLOG, + itemIdentifier, + True, + profile_key, + ) + + ## get ## + + def _mb_get_serialise(self, data): + items, metadata = data + metadata['items'] = items + return data_format.serialise(metadata) + + def _mb_get(self, service="", node="", max_items=10, item_ids=None, extra="", + profile_key=C.PROF_KEY_NONE): + """ + @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit + @param item_ids (list[unicode]): list of item IDs + """ + client = self.host.get_client(profile_key) + service = jid.JID(service) if service else None + max_items = None if max_items == C.NO_LIMIT else max_items + extra = self._p.parse_extra(data_format.deserialise(extra)) + d = defer.ensureDeferred( + self.mb_get(client, service, node or None, max_items, item_ids, + extra.rsm_request, extra.extra) + ) + d.addCallback(self._mb_get_serialise) + return d + + async def mb_get( + self, + client: SatXMPPEntity, + service: Optional[jid.JID] = None, + node: Optional[str] = None, + max_items: Optional[int] = 10, + item_ids: Optional[List[str]] = None, + rsm_request: Optional[rsm.RSMRequest] = None, + extra: Optional[Dict[str, Any]] = None + ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: + """Get some microblogs + + @param service(jid.JID, None): jid of the publisher + None to get profile's PEP + @param node(unicode, None): node to get (or microblog node if None) + @param max_items(int): maximum number of item to get, None for no limit + ignored if rsm_request is set + @param item_ids (list[unicode]): list of item IDs + @param rsm_request (rsm.RSMRequest): RSM request data + @param extra (dict): extra data + + @return: a deferred couple with the list of items and metadatas. + """ + if node is None: + node = NS_MICROBLOG + if rsm_request: + max_items = None + items_data = await self._p.get_items( + client, + service, + node, + max_items=max_items, + item_ids=item_ids, + rsm_request=rsm_request, + extra=extra, + ) + mb_data_list, metadata = await self._p.trans_items_data_d( + items_data, partial(self.item_2_mb_data, client, service=service, node=node)) + encrypted = metadata.pop("encrypted", None) + if encrypted is not None: + for mb_data in mb_data_list: + try: + mb_data["encrypted"] = encrypted[mb_data["id"]] + except KeyError: + pass + return (mb_data_list, metadata) + + def _mb_rename(self, service, node, item_id, new_id, profile_key): + return defer.ensureDeferred(self.mb_rename( + self.host.get_client(profile_key), + jid.JID(service) if service else None, + node or None, + item_id, + new_id + )) + + async def mb_rename( + self, + client: SatXMPPEntity, + service: Optional[jid.JID], + node: Optional[str], + item_id: str, + new_id: str + ) -> None: + if not node: + node = NS_MICROBLOG + await self._p.rename_item(client, service, node, item_id, new_id) + + def parse_comment_url(self, node_url): + """Parse a XMPP URI + + Determine the fields comments_service and comments_node of a microblog data + from the href attribute of an entry's link element. For example this input: + xmpp:sat-pubsub.example.net?;node=urn%3Axmpp%3Acomments%3A_af43b363-3259-4b2a-ba4c-1bc33aa87634__urn%3Axmpp%3Agroupblog%3Asomebody%40example.net + will return(JID(u'sat-pubsub.example.net'), 'urn:xmpp:comments:_af43b363-3259-4b2a-ba4c-1bc33aa87634__urn:xmpp:groupblog:somebody@example.net') + @return (tuple[jid.JID, unicode]): service and node + """ + try: + parsed_url = xmpp_uri.parse_xmpp_uri(node_url) + service = jid.JID(parsed_url["path"]) + node = parsed_url["node"] + except Exception as e: + raise exceptions.DataError(f"Invalid comments link: {e}") + + return (service, node) + + ## configure ## + + def mb_access_set(self, access="presence", profile_key=C.PROF_KEY_NONE): + """Create a microblog node on PEP with given access + + If the node already exists, it change options + @param access: Node access model, according to xep-0060 #4.5 + @param profile_key: profile key + """ + # FIXME: check if this mehtod is need, deprecate it if not + client = self.host.get_client(profile_key) + + _options = { + self._p.OPT_ACCESS_MODEL: access, + self._p.OPT_MAX_ITEMS: "max", + self._p.OPT_PERSIST_ITEMS: 1, + self._p.OPT_DELIVER_PAYLOADS: 1, + self._p.OPT_SEND_ITEM_SUBSCRIBE: 1, + } + + def cb(result): + # Node is created with right permission + log.debug(_("Microblog node has now access %s") % access) + + def fatal_err(s_error): + # Something went wrong + log.error(_("Can't set microblog access")) + raise NodeAccessChangeException() + + def err_cb(s_error): + # If the node already exists, the condition is "conflict", + # else we have an unmanaged error + if s_error.value.condition == "conflict": + # d = self.host.plugins["XEP-0060"].deleteNode(client, client.jid.userhostJID(), NS_MICROBLOG) + # d.addCallback(lambda x: create_node().addCallback(cb).addErrback(fatal_err)) + change_node_options().addCallback(cb).addErrback(fatal_err) + else: + fatal_err(s_error) + + def create_node(): + return self._p.createNode( + client, client.jid.userhostJID(), NS_MICROBLOG, _options + ) + + def change_node_options(): + return self._p.setOptions( + client.jid.userhostJID(), + NS_MICROBLOG, + client.jid.userhostJID(), + _options, + profile_key=profile_key, + ) + + create_node().addCallback(cb).addErrback(err_cb) + + ## methods to manage several stanzas/jids at once ## + + # common + + def _get_client_and_node_data(self, publishers_type, publishers, profile_key): + """Helper method to construct node_data from publishers_type/publishers + + @param publishers_type: type of the list of publishers, one of: + C.ALL: get all jids from roster, publishers is not used + C.GROUP: get jids from groups + C.JID: use publishers directly as list of jids + @param publishers: list of publishers, according to "publishers_type" (None, + list of groups or list of jids) + @param profile_key: %(doc_profile_key)s + """ + client = self.host.get_client(profile_key) + if publishers_type == C.JID: + jids_set = set(publishers) + else: + jids_set = client.roster.get_jids_set(publishers_type, publishers) + if publishers_type == C.ALL: + try: + # display messages from salut-a-toi@libervia.org or other PEP services + services = self.host.plugins["EXTRA-PEP"].get_followed_entities( + profile_key + ) + except KeyError: + pass # plugin is not loaded + else: + if services: + log.debug( + "Extra PEP followed entities: %s" + % ", ".join([str(service) for service in services]) + ) + jids_set.update(services) + + node_data = [] + for jid_ in jids_set: + node_data.append((jid_, NS_MICROBLOG)) + return client, node_data + + def _check_publishers(self, publishers_type, publishers): + """Helper method to deserialise publishers coming from bridge + + publishers_type(unicode): type of the list of publishers, one of: + publishers: list of publishers according to type + @return: deserialised (publishers_type, publishers) tuple + """ + if publishers_type == C.ALL: + if publishers: + raise failure.Failure( + ValueError( + "Can't use publishers with {} type".format(publishers_type) + ) + ) + else: + publishers = None + elif publishers_type == C.JID: + publishers[:] = [jid.JID(publisher) for publisher in publishers] + return publishers_type, publishers + + # subscribe # + + def _mb_subscribe_to_many(self, publishers_type, publishers, profile_key): + """ + + @return (str): session id: Use pubsub.getSubscribeRTResult to get the results + """ + publishers_type, publishers = self._check_publishers(publishers_type, publishers) + return self.mb_subscribe_to_many(publishers_type, publishers, profile_key) + + def mb_subscribe_to_many(self, publishers_type, publishers, profile_key): + """Subscribe microblogs for a list of groups or jids + + @param publishers_type: type of the list of publishers, one of: + C.ALL: get all jids from roster, publishers is not used + C.GROUP: get jids from groups + C.JID: use publishers directly as list of jids + @param publishers: list of publishers, according to "publishers_type" (None, list + of groups or list of jids) + @param profile: %(doc_profile)s + @return (str): session id + """ + client, node_data = self._get_client_and_node_data( + publishers_type, publishers, profile_key + ) + return self._p.subscribe_to_many( + node_data, client.jid.userhostJID(), profile_key=profile_key + ) + + # get # + + def _mb_get_from_many_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT): + """Get real-time results for mb_get_from_many session + + @param session_id: id of the real-time deferred session + @param return (tuple): (remaining, results) where: + - remaining is the number of still expected results + - results is a list of tuple with + - service (unicode): pubsub service + - node (unicode): pubsub node + - failure (unicode): empty string in case of success, error message else + - items_data(list): data as returned by [mb_get] + - items_metadata(dict): metadata as returned by [mb_get] + @param profile_key: %(doc_profile_key)s + """ + + client = self.host.get_client(profile_key) + + def onSuccess(items_data): + """convert items elements to list of microblog data in items_data""" + d = self._p.trans_items_data_d( + items_data, + # FIXME: service and node should be used here + partial(self.item_2_mb_data, client), + serialise=True + ) + d.addCallback(lambda serialised: ("", serialised)) + return d + + d = self._p.get_rt_results( + session_id, + on_success=onSuccess, + on_error=lambda failure: (str(failure.value), ([], {})), + profile=client.profile, + ) + d.addCallback( + lambda ret: ( + ret[0], + [ + (service.full(), node, failure, items, metadata) + for (service, node), (success, (failure, (items, metadata))) in ret[ + 1 + ].items() + ], + ) + ) + return d + + def _mb_get_from_many(self, publishers_type, publishers, max_items=10, extra_dict=None, + profile_key=C.PROF_KEY_NONE): + """ + @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit + """ + max_items = None if max_items == C.NO_LIMIT else max_items + publishers_type, publishers = self._check_publishers(publishers_type, publishers) + extra = self._p.parse_extra(extra_dict) + return self.mb_get_from_many( + publishers_type, + publishers, + max_items, + extra.rsm_request, + extra.extra, + profile_key, + ) + + def mb_get_from_many(self, publishers_type, publishers, max_items=None, rsm_request=None, + extra=None, profile_key=C.PROF_KEY_NONE): + """Get the published microblogs for a list of groups or jids + + @param publishers_type (str): type of the list of publishers (one of "GROUP" or + "JID" or "ALL") + @param publishers (list): list of publishers, according to publishers_type (list + of groups or list of jids) + @param max_items (int): optional limit on the number of retrieved items. + @param rsm_request (rsm.RSMRequest): RSM request data, common to all publishers + @param extra (dict): Extra data + @param profile_key: profile key + @return (str): RT Deferred session id + """ + # XXX: extra is unused here so far + client, node_data = self._get_client_and_node_data( + publishers_type, publishers, profile_key + ) + return self._p.get_from_many( + node_data, max_items, rsm_request, profile_key=profile_key + ) + + # comments # + + def _mb_get_from_many_with_comments_rt_result_serialise(self, data): + """Serialisation of result + + This is probably the longest method name of whole SàT ecosystem ^^ + @param data(dict): data as received by rt_sessions + @return (tuple): see [_mb_get_from_many_with_comments_rt_result] + """ + ret = [] + data_iter = iter(data[1].items()) + for (service, node), (success, (failure_, (items_data, metadata))) in data_iter: + items = [] + for item, item_metadata in items_data: + item = data_format.serialise(item) + items.append((item, item_metadata)) + ret.append(( + service.full(), + node, + failure_, + items, + metadata)) + + return data[0], ret + + def _mb_get_from_many_with_comments_rt_result(self, session_id, + profile_key=C.PROF_KEY_DEFAULT): + """Get real-time results for [mb_get_from_many_with_comments] session + + @param session_id: id of the real-time deferred session + @param return (tuple): (remaining, results) where: + - remaining is the number of still expected results + - results is a list of 5-tuple with + - service (unicode): pubsub service + - node (unicode): pubsub node + - failure (unicode): empty string in case of success, error message else + - items(list[tuple(dict, list)]): list of 2-tuple with + - item(dict): item microblog data + - comments_list(list[tuple]): list of 5-tuple with + - service (unicode): pubsub service where the comments node is + - node (unicode): comments node + - failure (unicode): empty in case of success, else error message + - comments(list[dict]): list of microblog data + - comments_metadata(dict): metadata of the comment node + - metadata(dict): original node metadata + @param profile_key: %(doc_profile_key)s + """ + profile = self.host.get_client(profile_key).profile + d = self.rt_sessions.get_results(session_id, profile=profile) + d.addCallback(self._mb_get_from_many_with_comments_rt_result_serialise) + return d + + def _mb_get_from_many_with_comments(self, publishers_type, publishers, max_items=10, + max_comments=C.NO_LIMIT, extra_dict=None, + extra_comments_dict=None, profile_key=C.PROF_KEY_NONE): + """ + @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit + @param max_comments(int): maximum number of comments to get, C.NO_LIMIT for no + limit + """ + max_items = None if max_items == C.NO_LIMIT else max_items + max_comments = None if max_comments == C.NO_LIMIT else max_comments + publishers_type, publishers = self._check_publishers(publishers_type, publishers) + extra = self._p.parse_extra(extra_dict) + extra_comments = self._p.parse_extra(extra_comments_dict) + return self.mb_get_from_many_with_comments( + publishers_type, + publishers, + max_items, + max_comments or None, + extra.rsm_request, + extra.extra, + extra_comments.rsm_request, + extra_comments.extra, + profile_key, + ) + + def mb_get_from_many_with_comments(self, publishers_type, publishers, max_items=None, + max_comments=None, rsm_request=None, extra=None, + rsm_comments=None, extra_comments=None, + profile_key=C.PROF_KEY_NONE): + """Helper method to get the microblogs and their comments in one shot + + @param publishers_type (str): type of the list of publishers (one of "GROUP" or + "JID" or "ALL") + @param publishers (list): list of publishers, according to publishers_type (list + of groups or list of jids) + @param max_items (int): optional limit on the number of retrieved items. + @param max_comments (int): maximum number of comments to retrieve + @param rsm_request (rsm.RSMRequest): RSM request for initial items only + @param extra (dict): extra configuration for initial items only + @param rsm_comments (rsm.RSMRequest): RSM request for comments only + @param extra_comments (dict): extra configuration for comments only + @param profile_key: profile key + @return (str): RT Deferred session id + """ + # XXX: this method seems complicated because it do a couple of treatments + # to serialise and associate the data, but it make life in frontends side + # a lot easier + + client, node_data = self._get_client_and_node_data( + publishers_type, publishers, profile_key + ) + + def get_comments(items_data): + """Retrieve comments and add them to the items_data + + @param items_data: serialised items data + @return (defer.Deferred): list of items where each item is associated + with a list of comments data (service, node, list of items, metadata) + """ + items, metadata = items_data + items_dlist = [] # deferred list for items + for item in items: + dlist = [] # deferred list for comments + for key, value in item.items(): + # we look for comments + if key.startswith("comments") and key.endswith("_service"): + prefix = key[: key.find("_")] + service_s = value + service = jid.JID(service_s) + node = item["{}{}".format(prefix, "_node")] + # time to get the comments + d = defer.ensureDeferred( + self._p.get_items( + client, + service, + node, + max_comments, + rsm_request=rsm_comments, + extra=extra_comments, + ) + ) + # then serialise + d.addCallback( + lambda items_data: self._p.trans_items_data_d( + items_data, + partial( + self.item_2_mb_data, client, service=service, node=node + ), + serialise=True + ) + ) + # with failure handling + d.addCallback( + lambda serialised_items_data: ("",) + serialised_items_data + ) + d.addErrback(lambda failure: (str(failure.value), [], {})) + # and associate with service/node (needed if there are several + # comments nodes) + d.addCallback( + lambda serialised, service_s=service_s, node=node: ( + service_s, + node, + ) + + serialised + ) + dlist.append(d) + # we get the comments + comments_d = defer.gatherResults(dlist) + # and add them to the item data + comments_d.addCallback( + lambda comments_data, item=item: (item, comments_data) + ) + items_dlist.append(comments_d) + # we gather the items + comments in a list + items_d = defer.gatherResults(items_dlist) + # and add the metadata + items_d.addCallback(lambda items_completed: (items_completed, metadata)) + return items_d + + deferreds = {} + for service, node in node_data: + d = deferreds[(service, node)] = defer.ensureDeferred(self._p.get_items( + client, service, node, max_items, rsm_request=rsm_request, extra=extra + )) + d.addCallback( + lambda items_data: self._p.trans_items_data_d( + items_data, + partial(self.item_2_mb_data, client, service=service, node=node), + ) + ) + d.addCallback(get_comments) + d.addCallback(lambda items_comments_data: ("", items_comments_data)) + d.addErrback(lambda failure: (str(failure.value), ([], {}))) + + return self.rt_sessions.new_session(deferreds, client.profile) + + +@implementer(iwokkel.IDisco) +class XEP_0277_handler(XMPPHandler): + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_MICROBLOG)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []