Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0277.py @ 4387:a6270030968d default tip
doc (components): Document the handling of mailing lists in Email Gateway:
fix 462
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 03 Aug 2025 23:45:48 +0200 |
parents | 7c1d77efc752 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # SAT plugin for microblogging over XMPP (xep-0277) # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import time from urllib.parse import quote, urlparse import dateutil import calendar from mimetypes import guess_type from secrets import token_urlsafe from typing import List, Optional, Dict, Self, Tuple, Any, Dict, cast from functools import partial from pydantic import BaseModel, ConfigDict, Field, model_validator import shortuuid from twisted.words.protocols.jabber import jid, error from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from twisted.internet import defer from twisted.python import failure # XXX: sat_tmp.wokkel.pubsub is actually used instead of wokkel version from wokkel import pubsub from wokkel import disco, iwokkel, rsm from zope.interface import implementer from libervia.backend import G from libervia.backend.core.i18n import _ from libervia.backend.core.constants import Const as C from libervia.backend.core.log import getLogger from libervia.backend.core import exceptions from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.models.types import JIDType from libervia.backend.plugins.plugin_misc_text_syntaxes import TextSyntaxes from libervia.backend.plugins.plugin_pubsub_cache import PubsubCache 
from libervia.backend.plugins.plugin_xep_0060 import XEP_0060 from libervia.backend.tools import xml_tools from libervia.backend.tools import sat_defer from libervia.backend.tools import utils from libervia.backend.tools.common import data_format from libervia.backend.tools.common import uri as xmpp_uri from libervia.backend.tools.common import regex log = getLogger(__name__) NS_MICROBLOG = "urn:xmpp:microblog:0" NS_ATOM = "http://www.w3.org/2005/Atom" NS_ATOM_THREADING = "http://purl.org/syndication/thread/1.0" NS_PUBSUB_EVENT = f"{pubsub.NS_PUBSUB}#event" NS_COMMENT_PREFIX = f"{NS_MICROBLOG}:comments/" PLUGIN_INFO = { C.PI_NAME: "Microblogging over XMPP Plugin", C.PI_IMPORT_NAME: "XEP-0277", C.PI_TYPE: "XEP", C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0277"], C.PI_DEPENDENCIES: ["XEP-0163", "XEP-0060", "TEXT_SYNTAXES"], C.PI_RECOMMENDATIONS: ["XEP-0059", "EXTRA-PEP", "PUBSUB_CACHE"], C.PI_MAIN: "XEP_0277", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("""Implementation of microblogging Protocol"""), } class NodeAccessChangeException(Exception): pass class Comment(BaseModel): uri: str = "" service: JIDType | None = None node: str = "" @model_validator(mode='after') def set_uri_if_missing(self) -> Self: if not self.uri and self.service is not None and self.node: self.uri = xmpp_uri.build_xmpp_uri( "pubsub", path=self.service.full(), node=self.node ) return self class InReplyTo(BaseModel): ref: str href: str|None type: str|None = None class Attachment(BaseModel): sources: list[dict[str, str]] = Field(default_factory=list) external: bool = Field( default=False, description="If True, this is a link to an external data, such as a website, " "otherwise it's an attached file.", ) media_type: str | None = None desc: str | None = None size: int | None = None class AltLink(BaseModel): url: str media_type: str | None = None class Repeated(BaseModel): by: JIDType uri: str class MbExtra(BaseModel): """Extra data for microblog posts that accepts arbitrary keys""" 
model_config = ConfigDict(extra="allow") repeated: Repeated | None = None attachments: list[Attachment] = Field(default_factory=list) alt_links: list[AltLink] = Field(default_factory=list) # FIXME: Must be moved to pubsub signing plugin. encrypted: bool = False encrypted_for: dict | None = None signed: bool = False class MbData(BaseModel): id: str | None = None atom_id: str | None = None service: JIDType | None node: str = NS_MICROBLOG uri: str | None = None user_friendly_id: bool = Field(default=True, exclude=True) user_friendly_id_suffix: bool = Field(default=True, exclude=True) published: float | None = None updated: float | None = None language: str | None = None title: str | None = None title_xhtml: str | None = None title_rich: str | None = Field(default=None, exclude=True) content: str | None = None content_xhtml: str | None = None content_rich: str | None = Field(default=None, exclude=True) author: str | None = None author_jid: JIDType | None = None author_email: str | None = None author_jid_verified: bool = False allow_comments: bool | None = None comments: list[Comment] = Field(default_factory=list) in_reply_tos: list[InReplyTo]|None = None tags: list[str] = Field(default_factory=list) extra: MbExtra = Field(default_factory=MbExtra) @classmethod async def from_element( cls, client: SatXMPPEntity, item_elt: domish.Element, service: jid.JID | None = None, # FIXME: node is Optional until all calls to item_2_mb_data set properly service # and node. Once done, the Optional must be removed here node: str | None = None, ) -> Self: """Create an MbData instance from a microblog XML element. @param item_elt: Microblog item element. @param service: PubSub service where the item has been retrieved. Profile's PEP is used when service is None. @param node: PubSub node where the item has been retrieved. If None, "uri" won't be set. @return: MbData instance. 
@raise exceptions.DataError: if the XML is malformed or missing required data """ text_syntaxes = cast(TextSyntaxes, G.host.plugins["TEXT_SYNTAXES"]) if service is None: service = client.jid.userhostJID() # there can be no id for transient nodes id_ = item_elt.getAttribute("id") or "" if item_elt.uri not in (pubsub.NS_PUBSUB, pubsub.NS_PUBSUB_EVENT): raise exceptions.DataError( f"Unsupported namespace {item_elt.uri} in pubsub item {id_!r}." ) try: entry_elt = next(item_elt.elements(NS_ATOM, "entry")) except StopIteration: raise exceptions.DataError(f"No atom entry found in the pubsub item {id_!r}.") # Initialize basic data data = { "id": id_, "service": service, "node": node, } # Set URI if node is provided # FIXME: node should alway be set in the future, check FIXME in method signature. if node is not None: data["uri"] = xmpp_uri.build_xmpp_uri( "pubsub", path=service.full(), node=node, item=id_, ) # Language. try: data["language"] = entry_elt[(C.NS_XML, "lang")].strip() except KeyError: pass # atom:id. try: id_elt = next(entry_elt.elements(NS_ATOM, "id")) except StopIteration: log.warning( f"No atom id found in the pubsub item {id_}, this is not standard!" ) data["atom_id"] = "" else: data["atom_id"] = str(id_elt) # Title(s) and content(s). # FIXME: ATOM and XEP-0277 only allow 1 <title/> element but in the wild we have # some blogs with several ones so we don't respect the standard for now (it # doesn't break anything anyway), and we'll find a better option later. title_elts = list(entry_elt.elements(NS_ATOM, "title")) if not title_elts: raise exceptions.DataError(f"No atom title found in the pubsub item {id_!r}.") for title_elt in title_elts: await cls._parse_content_element(title_elt, data) # Content(s) # FIXME: As for <title/>, Atom only authorise at most 1 content but XEP-0277 # allows several ones. So for no we handle as if more than one can be present. 
for content_elt in entry_elt.elements(NS_ATOM, "content"): await cls._parse_content_element(content_elt, data) # Ensure text content exists when only xhtml is provided. for key in ("title", "content"): if key not in data and f"{key}_xhtml" in data: log.warning(f"item {id_!r} provides {key}_xhtml data but not a text one.") data[key] = await text_syntaxes.convert( data[f"{key}_xhtml"], text_syntaxes.SYNTAX_XHTML, text_syntaxes.SYNTAX_TEXT, False, ) if "content" not in data: # Use the atom title data as the microblog body content. data["content"] = data["title"] del data["title"] if "title_xhtml" in data: data["content_xhtml"] = data["title_xhtml"] del data["title_xhtml"] # Dates. try: updated_elt = next(entry_elt.elements(NS_ATOM, "updated")) except StopIteration: raise exceptions.DataError( f"No atom updated element found in the pubsub item {id_!r}." ) data["updated"] = calendar.timegm( dateutil.parser.parse(str(updated_elt)).utctimetuple() ) try: published_elt = next(entry_elt.elements(NS_ATOM, "published")) except StopIteration: data["published"] = data["updated"] else: data["published"] = calendar.timegm( dateutil.parser.parse(str(published_elt)).utctimetuple() ) # Initialize extra data extra = MbExtra() # Links (comments, attachments, etc.) comments = [] for link_elt in entry_elt.elements(NS_ATOM, "link"): href = link_elt.getAttribute("href") if not href: log.warning(f"Missing href in <link> element: {link_elt.toXml()}.") continue rel = link_elt.getAttribute("rel") if rel == "replies" and link_elt.getAttribute("title") == "comments": try: comment_service, comment_node = XEP_0277.parse_comment_url(href) except exceptions.DataError as e: log.exception(f"Can't parse comments url: {e}.") continue else: comments.append( Comment(uri=href, service=comment_service, node=comment_node) ) elif rel == "via": try: repeater_jid = jid.JID(item_elt["publisher"]) except (KeyError, RuntimeError): # We look for stanza element which is at the root, meaning that it has # not parent. 
top_elt = item_elt.parent assert top_elt is not None while top_elt.parent is not None: top_elt = top_elt.parent try: repeater_jid = jid.JID(top_elt["from"]) except (AttributeError, RuntimeError): log.error(f"Can't find repeater of the post: {item_elt.toXml()}.") continue extra.repeated = Repeated(by=repeater_jid, uri=href) elif rel in ("related", "enclosure"): extra.attachments.append(cls._parse_attachment(link_elt, href, rel)) elif rel == "alternate": media_type = link_elt.getAttribute("type") or guess_type(href)[0] if not media_type: log.warning( "Invalid or missing media type for alternate link: " f"{link_elt.toXml()}." ) extra.alt_links.append(AltLink(url=href, media_type=media_type)) else: log.warning(f"Unmanaged link element: {link_elt.toXml()}.") if comments: data["comments"] = comments # Reply-To in_reply_tos = [] for in_reply_to_elt in entry_elt.elements(NS_ATOM_THREADING, "in-reply-to"): in_reply_to = InReplyTo( ref=in_reply_to_elt.getAttribute("ref", ""), href=in_reply_to_elt.getAttribute("href"), type=in_reply_to_elt.getAttribute("type") ) if not in_reply_to_elt.ref: log.warning( "No ref in <in-reply-to> element, this is not valid, ignoring: " f"in_reply_to_elt.toXml()" ) continue in_reply_tos.append(in_reply_to) if in_reply_tos: data["in_reply_tos"] = in_reply_tos # Author information. 
author_data = cls._parse_author(entry_elt, item_elt, id_, extra) data.update(author_data) # Tags/categories data["tags"] = [ term for category_elt in entry_elt.elements(NS_ATOM, "category") if (term := category_elt.getAttribute("term")) ] data["extra"] = extra ## the trigger ## # if other plugins have things to add or change G.host.trigger.point("XEP-0277_item2data", item_elt, entry_elt, data) return cls(**data) @staticmethod async def _parse_content_element(elem: domish.Element, data: dict[str, Any]) -> None: """Parse title/content elements and add to data dict.""" type_ = elem.getAttribute("type") if type_ == "xhtml": data_elt = elem.firstChildElement() if data_elt is None: raise exceptions.DataError( "XHML content not wrapped in a <div/> element, this is not standard!" ) if data_elt.uri != C.NS_XHTML: raise exceptions.DataError( "Content of type XHTML must declare its namespace!" ) # We clean the content to avoid anything dangerous. text_syntaxes = cast(TextSyntaxes, G.host.plugins["TEXT_SYNTAXES"]) data[f"{elem.name}_xhtml"] = text_syntaxes.clean_xhtml(data_elt.toXml()) else: data[elem.name] = str(elem) @classmethod def _parse_attachment( cls, link_elt: domish.Element, href: str, rel: str ) -> Attachment: """Parse an attachment (related/enclosure) link.""" attachment = Attachment(sources=[{"url": href}], external=(rel == "related")) if media_type := link_elt.getAttribute("type"): attachment.media_type = media_type if desc := link_elt.getAttribute("title"): attachment.desc = desc try: attachment.size = int(link_elt["length"]) except (KeyError, ValueError): pass if not attachment.media_type: if guessed_type := guess_type(href, False)[0]: attachment.media_type = guessed_type return attachment @classmethod def _parse_author( cls, entry_elt: domish.Element, item_elt: domish.Element, item_id: str, extra: MbExtra, ) -> dict[str, Any]: """Parse author information from the entry.""" author_data: dict[str, Any] = {} publisher = item_elt.getAttribute("publisher") try: 
author_elt = next(entry_elt.elements(NS_ATOM, "author")) except StopIteration: log.debug(f"Can't find author element in item {item_id!r}") if publisher: author_data["author_jid"] = publisher author_data["author_jid_verified"] = True return author_data # Name. try: name_elt = next(author_elt.elements(NS_ATOM, "name")) except StopIteration: log.warning("No name element found in author element of item {item_id!r}.") else: author_data["author"] = str(name_elt).strip() # Parse URI. try: uri_elt = next(author_elt.elements(NS_ATOM, "uri")) except StopIteration: log.debug(f"No uri element found in author element of item {item_id!r}.") if publisher: author_data["author_jid"] = publisher author_data["author_jid_verified"] = True else: iq_elt = xml_tools.find_ancestor(item_elt, "iq", C.NS_STREAM) author_data["author_jid"] = iq_elt["from"] else: uri = str(uri_elt) if uri.startswith("xmpp:"): author_data["author_jid"] = uri[5:] or None else: author_data["author_jid"] = publisher or None if publisher: try: author_data["author_jid_verified"] = ( jid.JID(publisher).userhostJID() == jid.JID(uri).userhostJID() ) except Exception: author_data["author_jid_verified"] = False if not author_data["author_jid_verified"] and not extra.repeated is None: log.warning( 'Item\'s atom:uri differs from "publisher" attribute, spoofing ' f"attempt? {uri=} {publisher=}" ) # If no author name but we have JID, use the username. if "author" not in author_data and author_data.get("author_jid"): try: author_data["author"] = jid.JID(author_data["author_jid"]).user except Exception as e: log.warning(f"Couldn't parse author JID: {e}.") # Email. try: email_elt = next(author_elt.elements(NS_ATOM, "email")) except StopIteration: pass else: author_data["author_email"] = str(email_elt) return author_data async def to_element( self, client: SatXMPPEntity, ) -> pubsub.Item: """Convert this MbData instance to a PubSub item Element. @param client: Client instance. 
@return: PubSub Item containing the Atom entry element """ text_syntaxes = cast(TextSyntaxes, G.host.plugins["TEXT_SYNTAXES"]) entry_elt = domish.Element((NS_ATOM, "entry")) ## Language ## if self.language: entry_elt[(C.NS_XML, "lang")] = self.language.strip() ## Content and Title ## for elem_name in ("title", "content"): for type_ in ["", "_rich", "_xhtml"]: attr = f"{elem_name}{type_}" value = getattr(self, attr) if value is not None: elem = entry_elt.addElement(elem_name) if type_: if type_ == "_rich": # Convert input from current syntax to XHTML if getattr(self, f"{elem_name}_xhtml") != None: raise failure.Failure( exceptions.DataError( "Can't have xhtml and rich content at the same " "time" ) ) content_xhtml = await text_syntaxes.convert( value, text_syntaxes.get_current_syntax(client.profile), "XHTML", ) else: content_xhtml = value div_elt = xml_tools.ElementParser()( content_xhtml, namespace=C.NS_XHTML ) if ( div_elt.name != "div" or div_elt.uri != C.NS_XHTML or div_elt.attributes ): # We need a wrapping <div/> at the top with XHTML namespace. wrap_div_elt = domish.Element((C.NS_XHTML, "div")) wrap_div_elt.addChild(div_elt) div_elt = wrap_div_elt elem.addChild(div_elt) elem["type"] = "xhtml" if getattr(self, elem_name) is None: # There is no text content. It is mandatory so we create one # from xhtml content. elem_txt = entry_elt.addElement(elem_name) text_content = await text_syntaxes.convert( content_xhtml, text_syntaxes.SYNTAX_XHTML, text_syntaxes.SYNTAX_TEXT, False, ) elem_txt.addContent(text_content) elem_txt["type"] = "text" else: # Raw text only needs to be escaped to get HTML-safe sequence. elem.addContent(value) elem["type"] = "text" # Ensure we have at least a title. 
if not any(entry_elt.elements(NS_ATOM, "title")): if any(entry_elt.elements(NS_ATOM, "content")): for elem in entry_elt.elements(NS_ATOM, "content"): elem.name = "title" else: raise exceptions.DataError( "Atom entry must have at least a title or content element" ) ## Attachments ## for attachment in self.extra.attachments: url = None if attachment.sources: for source in attachment.sources: if "url" in source: url = source["url"] break if not url: log.warning(f'"url" missing in attachment, ignoring: {attachment}') continue if not url.startswith("http"): log.warning(f"Non HTTP URL in attachment, ignoring: {attachment}.") continue link_elt = entry_elt.addElement("link") # XXX: "uri" is set in self._manage_comments if not already existing link_elt["href"] = url link_elt["rel"] = "related" if attachment.external else "enclosure" if attachment.media_type: link_elt["type"] = attachment.media_type if attachment.desc: link_elt["title"] = attachment.desc if attachment.size: link_elt["length"] = str(attachment.size) ## Alternate Links ## for alt_link in self.extra.alt_links: if self.service is None or self.node is None or self.id is None: log.warning( f"Can't compute alternate link due to missing service, node or ID." 
) continue link_elt = entry_elt.addElement("link") url = link_elt["href"] = alt_link.url.format( service=quote(self.service.full(), safe=""), node=quote(self.node, safe=""), item=quote(self.id, safe=""), ) link_elt["rel"] = "alternate" if alt_link.media_type: media_type = alt_link.media_type else: parsed_url = urlparse(url) if parsed_url.scheme in ["http", "https"]: media_type = "text/html" else: media_type = guess_type(url)[0] or "application/octet-stream" link_elt["type"] = media_type ## Author ## author_elt = entry_elt.addElement("author") author_name = self.author or (self.author_jid.user if self.author_jid else "") author_elt.addElement("name", content=author_name) author_jid = self.author_jid or client.jid author_elt.addElement("uri", content=f"xmpp:{author_jid.userhost()}") if self.author_email: author_elt.addElement("email", content=self.author_email) ## Dates ## current_time = time.time() entry_elt.addElement( "updated", content=utils.xmpp_date( float(self.updated if self.updated else current_time) ), ) entry_elt.addElement( "published", content=utils.xmpp_date( float(self.published if self.published else current_time) ), ) ## Tags ## for tag in self.tags: category_elt = entry_elt.addElement("category") category_elt["term"] = tag ## ID ## if self.atom_id is None: if self.id is not None: if self.service is not None: self.atom_id = xmpp_uri.build_xmpp_uri( "pubsub", path=self.service.full(), node=self.node, item=self.id or "", ) else: self.atom_id = self.id else: self.atom_id = shortuuid.uuid() entry_elt.addElement("id", content=self.atom_id) ## Comments ## for comment in self.comments: link_elt = entry_elt.addElement("link") # XXX: "uri" is set in self._manage_comments if not already existing. 
if not comment.uri: log.warning("Missing URI for {comment}, ignoring.") continue link_elt["href"] = comment.uri link_elt["rel"] = "replies" link_elt["title"] = "comments" ## In-Reply-Tos ### if self.in_reply_tos is not None: for in_reply_to in self.in_reply_tos: in_reply_to_elt = entry_elt.addElement((NS_ATOM_THREADING, "in-reply-to")) in_reply_to_elt["ref"] = in_reply_to.ref if href := in_reply_to.href is not None: in_reply_to_elt["href"] = href if type_ := in_reply_to.type is not None: in_reply_to_elt["type"] = type_ ## Reposts ## if self.extra.repeated: link_elt = entry_elt.addElement("link") link_elt["rel"] = "via" link_elt["href"] = self.extra.repeated.uri ## Final item building ## item_elt = pubsub.Item(id=self.id, payload=entry_elt) ## Trigger ## G.host.trigger.point("XEP-0277_data2entry", client, self, entry_elt, item_elt) return item_elt class XEP_0277: namespace = NS_MICROBLOG NS_ATOM = NS_ATOM def __init__(self, host): log.info(_("Microblogging plugin initialization")) self.host = host host.register_namespace("microblog", NS_MICROBLOG) self._p = cast(XEP_0060, self.host.plugins["XEP-0060"]) ps_cache = cast(PubsubCache, self.host.plugins.get("PUBSUB_CACHE")) if ps_cache is not None: ps_cache.register_analyser( { "name": "XEP-0277", "node": NS_MICROBLOG, "namespace": NS_ATOM, "type": "blog", "to_sync": True, "parser": self.item_2_mb_data, "match_cb": self._cache_node_match_cb, } ) self.rt_sessions = sat_defer.RTDeferredSessions() self._p.add_managed_node(NS_MICROBLOG, items_cb=self._items_received) host.bridge.add_method( "mb_send", ".plugin", in_sign="ss", out_sign="s", method=self._mb_send, async_=True, ) host.bridge.add_method( "mb_repeat", ".plugin", in_sign="sssss", out_sign="s", method=self._mb_repeat, async_=True, ) host.bridge.add_method( "mb_preview", ".plugin", in_sign="ssss", out_sign="s", method=self._mb_preview, async_=True, ) host.bridge.add_method( "mb_retract", ".plugin", in_sign="ssss", out_sign="", method=self._mb_retract, async_=True, ) 
host.bridge.add_method( "mb_get", ".plugin", in_sign="ssiasss", out_sign="s", method=self._mb_get, async_=True, ) host.bridge.add_method( "mb_rename", ".plugin", in_sign="sssss", out_sign="", method=self._mb_rename, async_=True, ) host.bridge.add_method( "mb_access_set", ".plugin", in_sign="ss", out_sign="", method=self.mb_access_set, async_=True, ) host.bridge.add_method( "mb_subscribe_to_many", ".plugin", in_sign="sass", out_sign="s", method=self._mb_subscribe_to_many, ) host.bridge.add_method( "mb_get_from_many_rt_result", ".plugin", in_sign="ss", out_sign="(ua(sssasa{ss}))", method=self._mb_get_from_many_rt_result, async_=True, ) host.bridge.add_method( "mb_get_from_many", ".plugin", in_sign="sasia{ss}s", out_sign="s", method=self._mb_get_from_many, ) host.bridge.add_method( "mb_get_from_many_with_comments_rt_result", ".plugin", in_sign="ss", out_sign="(ua(sssa(sa(sssasa{ss}))a{ss}))", method=self._mb_get_from_many_with_comments_rt_result, async_=True, ) host.bridge.add_method( "mb_get_from_many_with_comments", ".plugin", in_sign="sasiia{ss}a{ss}s", out_sign="s", method=self._mb_get_from_many_with_comments, ) host.bridge.add_method( "mb_is_comment_node", ".plugin", in_sign="s", out_sign="b", method=self.is_comment_node, ) def get_handler(self, client): return XEP_0277_handler() def _cache_node_match_cb( self, client: SatXMPPEntity, analyse: dict, ) -> None: """Check is analysed node is a comment and fill analyse accordingly""" if analyse["node"].startswith(NS_COMMENT_PREFIX): analyse["subtype"] = "comment" def _check_features_cb(self, available): return {"available": C.BOOL_TRUE} def _check_features_eb(self, fail): return {"available": C.BOOL_FALSE} def features_get(self, profile): client = self.host.get_client(profile) d = self.host.check_features(client, [], identity=("pubsub", "pep")) d.addCallbacks(self._check_features_cb, self._check_features_eb) return d ## plugin management methods ## def _items_received(self, client, itemsEvent): """Callback which 
manage items notifications (publish + retract)""" def manage_item(data, event): self.host.bridge.ps_event( C.PS_MICROBLOG, itemsEvent.sender.full(), itemsEvent.nodeIdentifier, event, data_format.serialise(data), client.profile, ) for item in itemsEvent.items: if item.name == C.PS_ITEM: # FIXME: service and node should be used here self.item_2_mb_data(client, item, None, None).addCallbacks( manage_item, lambda failure: None, (C.PS_PUBLISH,) ) elif item.name == C.PS_RETRACT: manage_item({"id": item["id"]}, C.PS_RETRACT) else: raise exceptions.InternalError("Invalid event value") ## data/item transformation ## async def item_2_mb_data( self, client: SatXMPPEntity, item_elt: domish.Element, service: jid.JID | None, # FIXME: node is Optional until all calls to item_2_mb_data set properly service # and node. Once done, the Optional must be removed here node: str | None, ) -> dict: """Convert an XML Item to microblog data @param item_elt: microblog item element @param service: PubSub service where the item has been retrieved profile's PEP is used when service is None @param node: PubSub node where the item has been retrieved if None, "uri" won't be set @return: microblog data """ mb_data = await MbData.from_element(client, item_elt, service, node) return mb_data.model_dump(exclude_none=True) async def mb_data_2_entry_elt( self, client: SatXMPPEntity, mb_data: dict, ) -> pubsub.Item: """Convert a data dict to en entry usable to create an item @param mb_data: data dict as given by bridge method. 
@param item_id: id of the item to use @param service: pubsub service where the item is sent Needed to construct Atom id @param node: pubsub node where the item is sent Needed to construct Atom id @return: deferred which fire domish.Element """ entry_elt = domish.Element((NS_ATOM, "entry")) extra = mb_data.get("extra", {}) ## language ## if "language" in mb_data: entry_elt[(C.NS_XML, "lang")] = mb_data["language"].strip() ## content and title ## text_syntaxes = cast(TextSyntaxes, self.host.plugins["TEXT_SYNTAXES"]) for elem_name in ("title", "content"): for type_ in ["", "_rich", "_xhtml"]: attr = f"{elem_name}{type_}" if attr in mb_data: elem = entry_elt.addElement(elem_name) if type_: if type_ == "_rich": # convert input from current syntax to XHTML xml_content = await text_syntaxes.convert( mb_data[attr], text_syntaxes.get_current_syntax(client.profile), "XHTML", ) if f"{elem_name}_xhtml" in mb_data: raise failure.Failure( exceptions.DataError( _( "Can't have xhtml and rich content at the same time" ) ) ) else: xml_content = mb_data[attr] div_elt = xml_tools.ElementParser()( xml_content, namespace=C.NS_XHTML ) if ( div_elt.name != "div" or div_elt.uri != C.NS_XHTML or div_elt.attributes ): # we need a wrapping <div/> at the top with XHTML namespace wrap_div_elt = domish.Element((C.NS_XHTML, "div")) wrap_div_elt.addChild(div_elt) div_elt = wrap_div_elt elem.addChild(div_elt) elem["type"] = "xhtml" if elem_name not in mb_data: # there is raw text content, which is mandatory # so we create one from xhtml content elem_txt = entry_elt.addElement(elem_name) text_content = await self.host.plugins[ "TEXT_SYNTAXES" ].convert( xml_content, self.host.plugins["TEXT_SYNTAXES"].SYNTAX_XHTML, self.host.plugins["TEXT_SYNTAXES"].SYNTAX_TEXT, False, ) elem_txt.addContent(text_content) elem_txt["type"] = "text" else: # raw text only needs to be escaped to get HTML-safe sequence elem.addContent(mb_data[attr]) elem["type"] = "text" try: next(entry_elt.elements(NS_ATOM, "title")) 
except StopIteration: # we have no title element which is mandatory # so we transform content element to title elems = list(entry_elt.elements(NS_ATOM, "content")) if not elems: raise exceptions.DataError( "There must be at least one content or title element" ) for elem in elems: elem.name = "title" ## attachments ## attachments = extra.get(C.KEY_ATTACHMENTS) if attachments: for attachment in attachments: try: url = attachment["url"] except KeyError: try: url = next(s["url"] for s in attachment["sources"] if "url" in s) except (StopIteration, KeyError): log.warning( f'"url" missing in attachment, ignoring: {attachment}' ) continue if not url.startswith("http"): log.warning(f"non HTTP URL in attachment, ignoring: {attachment}") continue link_elt = entry_elt.addElement("link") # XXX: "uri" is set in self._manage_comments if not already existing link_elt["href"] = url if attachment.get("external", False): # this is a link to an external data such as a website link_elt["rel"] = "related" else: # this is an attached file link_elt["rel"] = "enclosure" for key, attr in ( ("media_type", "type"), ("desc", "title"), ("size", "lenght"), ): value = attachment.get(key) if value: link_elt[attr] = str(value) ## alternate links ## alt_links = extra.get("alt_links") if alt_links: for link_data in alt_links: url_template = link_data["url"] url = url_template.format( service=quote(service.full(), safe=""), node=quote(node, safe=""), item=quote(item_id, safe=""), ) link_elt = entry_elt.addElement("link") link_elt["href"] = url link_elt["rel"] = "alternate" media_type = link_data.get("media_type") if not media_type: parsed_url = urlparse(url) if parsed_url.scheme in ["http", "https"]: media_type = "text/html" else: media_type = guess_type(url)[0] or "application/octet-stream" link_elt["type"] = media_type ## author ## author_elt = entry_elt.addElement("author") try: author_name = mb_data["author"] except KeyError: # FIXME: must use better name author_name = client.jid.user 
author_elt.addElement("name", content=author_name) try: author_jid_s = mb_data["author_jid"] except KeyError: author_jid_s = client.jid.userhost() author_elt.addElement("uri", content="xmpp:{}".format(author_jid_s)) try: author_jid_s = mb_data["author_email"] except KeyError: pass ## published/updated time ## current_time = time.time() entry_elt.addElement( "updated", content=utils.xmpp_date(float(mb_data.get("updated", current_time))), ) entry_elt.addElement( "published", content=utils.xmpp_date(float(mb_data.get("published", current_time))), ) ## categories ## for tag in mb_data.get("tags", []): category_elt = entry_elt.addElement("category") category_elt["term"] = tag ## id ## entry_id = mb_data.get( "id", xmpp_uri.build_xmpp_uri( "pubsub", path=service.full() if service is not None else client.jid.userhost(), node=node, item=item_id, ), ) entry_elt.addElement("id", content=entry_id) # ## comments ## for comments_data in mb_data.get("comments", []): link_elt = entry_elt.addElement("link") # XXX: "uri" is set in self._manage_comments if not already existing try: link_elt["href"] = comments_data["uri"] except KeyError: log.warning(f"missing URI in comments data: {comments_data}") entry_elt.children.remove(link_elt) else: link_elt["rel"] = "replies" link_elt["title"] = "comments" if "repeated" in extra: try: repeated = extra["repeated"] link_elt = entry_elt.addElement("link") link_elt["rel"] = "via" link_elt["href"] = repeated["uri"] except KeyError as e: log.warning(f"invalid repeated element({e}): {extra['repeated']}") ## final item building ## item_elt = pubsub.Item(id=item_id, payload=entry_elt) ## the trigger ## # if other plugins have things to add or change self.host.trigger.point( "XEP-0277_data2entry", client, mb_data, entry_elt, item_elt ) return item_elt ## publish/preview ## def is_comment_node(self, node: str) -> bool: """Indicate if the node is prefixed with comments namespace""" return node.startswith(NS_COMMENT_PREFIX) def get_parent_item(self, 
item_id: str) -> str: """Return parent of a comment node @param item_id: a comment node """ if not self.is_comment_node(item_id): raise ValueError("This node is not a comment node") return item_id[len(NS_COMMENT_PREFIX) :] def get_comments_node(self, item_id): """Generate comment node @param item_id(unicode): id of the parent item @return (unicode): comment node to use """ return f"{NS_COMMENT_PREFIX}{item_id}" def get_comments_service(self, client, parent_service=None): """Get prefered PubSub service to create comment node @param pubsub_service(jid.JID, None): PubSub service of the parent item @param return((D)jid.JID, None): PubSub service to use """ if parent_service is not None: if parent_service.user: # we are on a PEP if parent_service.host == client.jid.host: # it's our server, we use already found client.pubsub_service below pass else: # other server, let's try to find a non PEP service there d = self.host.find_service_entity( client, "pubsub", "service", parent_service ) d.addCallback(lambda entity: entity or parent_service) else: # parent is already on a normal Pubsub service, we re-use it return defer.succeed(parent_service) return defer.succeed( client.pubsub_service if client.pubsub_service is not None else parent_service ) async def _manage_comments( self, client: SatXMPPEntity, mb_data: MbData, access: str | None = None ) -> None: """Check comments keys in mb_data and create comments node if necessary. If a comments node metadata is set in the mb_data['comments'] list, it is used otherwise it is generated (if allow_comments is True). @param mb_data: Microblog data. @param access: Access model. None to use same access model as parent item """ if mb_data.allow_comments is None: if mb_data.comments: mb_data.allow_comments = True else: # No comments set or requested, nothing to do. 
return elif mb_data.allow_comments == False: if mb_data.comments: log.warning( "Comments are not allowed but there is already a comments node, " "it may be lost: {mb_data['comments']}." ) mb_data.comments.clear() return # We have usually a single comment node, but the spec allow several, so we need to # handle this in a list. if len(mb_data.comments) == 0: # We need at least one comment node, we set an empty one for now, we'll # complete it below. mb_data.comments.append(Comment()) if access is None: # TODO: cache access models per service/node. try: parent_node_config = await self._p.getConfiguration( client, mb_data.service, mb_data.node ) except error.StanzaError as e: log.debug(f"Can't get parent node configuration: {e}") access = self._p.ACCESS_OPEN else: access = parent_node_config.get( self._p.OPT_ACCESS_MODEL, self._p.ACCESS_OPEN ) options = { self._p.OPT_ACCESS_MODEL: access, self._p.OPT_MAX_ITEMS: "max", self._p.OPT_PERSIST_ITEMS: 1, self._p.OPT_DELIVER_PAYLOADS: 1, self._p.OPT_SEND_ITEM_SUBSCRIBE: 1, # FIXME: would it make sense to restrict publish model to subscribers? self._p.OPT_PUBLISH_MODEL: self._p.ACCESS_OPEN, } # If other plugins need to change the options. self.host.trigger.point("XEP-0277_comments", client, mb_data, options) for comments_data in mb_data.comments: if comments_data.uri: uri_service, uri_node = self.parse_comment_url(comments_data.uri) if ( comments_data.node is not None and comments_data.node != uri_node ) or ( comments_data.service is not None and comments_data.service != uri_service ): raise ValueError( f"Incoherence between comments URI ({comments_data.uri}) and " f"comments_service ({comments_data.service}) or comments_node " f"({comments_data.node})." 
                    )
                comments_data.service = uri_service
                comments_data.node = uri_node
            else:
                if not comments_data.node:
                    # No node given: derive it from the parent item ID.
                    comments_data.node = self.get_comments_node(mb_data.id)
                if comments_data.service is None:
                    comments_data.service = await self.get_comments_service(
                        client, mb_data.service
                    )
                    if comments_data.service is None:
                        # Last resort: profile's own PEP service.
                        comments_data.service = client.jid.userhostJID()
                comments_data.uri = xmpp_uri.build_xmpp_uri(
                    "pubsub",
                    path=comments_data.service.full(),
                    node=comments_data.node,
                )
            try:
                await self._p.createNode(
                    client, comments_data.service, comments_data.node, options
                )
            except error.StanzaError as e:
                if e.condition == "conflict":
                    # Node already exists: we just re-use it.
                    log.info(
                        "node {} already exists on service {}".format(
                            comments_data.node, comments_data.service
                        )
                    )
                else:
                    raise e
            else:
                if access == self._p.ACCESS_WHITELIST:
                    # for whitelist access we need to copy affiliations from parent item
                    comments_affiliations = await self._p.get_node_affiliations(
                        client, mb_data.service, mb_data.node
                    )
                    # …except for "member", that we transform to publisher
                    # because we want members to be able to write to comments
                    for jid_, affiliation in list(comments_affiliations.items()):
                        if affiliation == "member":
                            comments_affiliations[jid_] = "publisher"

                    await self._p.set_node_affiliations(
                        client,
                        comments_data.service,
                        comments_data.node,
                        comments_affiliations,
                    )

    def friendly_id(self, data):
        """Generate a user friendly id from title or content.

        A short random suffix is appended when ``user_friendly_id_suffix`` is set,
        to lower the risk of ID conflicts.
        """
        # TODO: rich content should be converted to plain text
        id_base = regex.url_friendly_text(
            data.title or data.title_rich or data.content or data.content_rich or ""
        )
        if not data.user_friendly_id_suffix:
            return id_base
        else:
            return f"{id_base}-{token_urlsafe(3)}"

    def _mb_send(self, data, profile_key) -> defer.Deferred[str | None]:
        # Bridge frontend for [send]: deserialise the JSON payload and resolve the
        # profile to a client session.
        data = MbData.model_validate_json(data)
        client = self.host.get_client(profile_key)
        return defer.ensureDeferred(self.send(client, data))

    async def send(
        self,
        client: SatXMPPEntity,
        data: MbData,
    ) -> str | None:
        """Send XEP-0277's microblog data.
@param data: Microblog data (must include at least a "content" or a "title" key). see http://wiki.goffi.org/wiki/Bridge_API_-_Microblogging/en for details. @return: ID of the published item """ # TODO: check that all data keys are used, this would avoid sending publicly a private message # by accident (e.g. if group plugin is not loaded, and "group*" key are not used) if data.id is None: if data.user_friendly_id: data.id = self.friendly_id(data) if not data.user_friendly_id_suffix: # we have no random suffix, which can lead to conflict, so we check if # the item doesn't already exist, and change ID if it's the case. try: __, __ = await self._p.get_items( client, data.service, data.node, item_ids=[data.id] ) except exceptions.NotFound: pass else: # the item already exists log.info( f"there is already an item with ID {data.id!r}, we have to " 'set the "user_friendly_id_suffix" flag.' ) data.user_friendly_id_suffix = True data.id = self.friendly_id(data) if not data.service: data.service = client.jid.userhostJID() try: await self._manage_comments(client, data, access=None) except error.StanzaError: log.warning("Can't create comments node for item {data.id}") item_elt = await data.to_element(client) if not await self.host.trigger.async_point( "XEP-0277_send", client, item_elt, data ): return None extra = {} for key in ("encrypted", "encrypted_for", "signed"): value = getattr(data.extra, key) if value is not None: extra[key] = value await self._p.publish(client, data.service, data.node, [item_elt], extra=extra) return data.id def _mb_repeat( self, service_s: str, node: str, item: str, extra_s: str, profile_key: str ) -> defer.Deferred: service = jid.JID(service_s) if service_s else None node = node if node else NS_MICROBLOG client = self.host.get_client(profile_key) extra = data_format.deserialise(extra_s) d = defer.ensureDeferred(self.repeat(client, item, service, node, extra)) # [repeat] can return None, and we always need a str d.addCallback(lambda ret: ret or "") 
return d async def repeat( self, client: SatXMPPEntity, item: str, service: Optional[jid.JID] = None, node: str = NS_MICROBLOG, extra: Optional[dict] = None, ) -> Optional[str]: """Re-publish a post from somewhere else This is a feature often name "share" or "boost", it is generally used to make a publication more visible by sharing it with our own audience """ if service is None: service = client.jid.userhostJID() # we first get the post to repeat items, __ = await self._p.get_items(client, service, node, item_ids=[item]) if not items: raise exceptions.NotFound( f"no item found at node {node!r} on {service} with ID {item!r}" ) item_elt = items[0] try: entry_elt = next(item_elt.elements(NS_ATOM, "entry")) except StopIteration: raise exceptions.DataError("post to repeat is not a XEP-0277 blog item") # we want to be sure that we have an author element try: author_elt = next(entry_elt.elements(NS_ATOM, "author")) except StopIteration: author_elt = entry_elt.addElement("author") try: next(author_elt.elements(NS_ATOM, "name")) except StopIteration: author_elt.addElement("name", content=service.user) try: next(author_elt.elements(NS_ATOM, "uri")) except StopIteration: entry_elt.addElement( "uri", content=xmpp_uri.build_xmpp_uri(None, path=service.full()) ) # we add the link indicating that it's a repeated post link_elt = entry_elt.addElement("link") link_elt["rel"] = "via" link_elt["href"] = xmpp_uri.build_xmpp_uri( "pubsub", path=service.full(), node=node, item=item ) return await self._p.send_item( client, client.jid.userhostJID(), NS_MICROBLOG, entry_elt ) def _mb_preview(self, mb_data_s: str, profile_key: str) -> defer.Deferred[str]: client = self.host.get_client(profile_key) mb_data = MbData.model_validate_json(mb_data_s) d = defer.ensureDeferred(self.preview(client, mb_data)) d.addCallback(lambda data: data.model_dump_json()) d = cast(defer.Deferred[str], d) return d async def preview( self, client: SatXMPPEntity, mb_data: MbData, ) -> MbData: """Preview microblog 
data without publishing them.

        Params are the same as for [send].
        @return: microblog data as would be retrieved from published item
        """
        if mb_data.service is None:
            mb_data.service = client.jid.userhostJID()

        # we have to serialise then deserialise to be sure that all triggers are called
        item_elt = await mb_data.to_element(client)
        item_elt.uri = pubsub.NS_PUBSUB
        return await MbData.from_element(client, item_elt, mb_data.service, mb_data.node)

    ## retract ##

    def _mb_retract(self, service_jid_s, nodeIdentifier, itemIdentifier, profile_key):
        """Call self._p._retract_item, but use default node if node is empty"""
        return self._p._retract_item(
            service_jid_s,
            nodeIdentifier or NS_MICROBLOG,
            itemIdentifier,
            True,
            profile_key,
        )

    ## get ##

    def _mb_get_serialise(self, data):
        """Merge items into node metadata, then serialise for the bridge."""
        items, metadata = data
        metadata["items"] = items
        return data_format.serialise(metadata)

    def _mb_get(
        self,
        service="",
        node="",
        max_items=10,
        item_ids=None,
        extra="",
        profile_key=C.PROF_KEY_NONE,
    ):
        """
        @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
        @param item_ids (list[unicode]): list of item IDs
        """
        client = self.host.get_client(profile_key)
        service = jid.JID(service) if service else None
        max_items = None if max_items == C.NO_LIMIT else max_items
        extra = self._p.parse_extra(data_format.deserialise(extra))
        d = defer.ensureDeferred(
            self.mb_get(
                client,
                service,
                node or None,
                max_items,
                item_ids,
                extra.rsm_request,
                extra.extra,
            )
        )
        d.addCallback(self._mb_get_serialise)
        return d

    async def mb_get(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID] = None,
        node: Optional[str] = None,
        max_items: Optional[int] = 10,
        item_ids: Optional[List[str]] = None,
        rsm_request: Optional[rsm.RSMRequest] = None,
        extra: Optional[Dict[str, Any]] = None,
    ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
        """Get some microblogs

        @param service(jid.JID, None): jid of the publisher
            None to get profile's PEP
        @param node(unicode, None): node to get (or microblog node if None)
        @param max_items(int): maximum number of item to get, None for no limit
            ignored if rsm_request is set
        @param item_ids (list[unicode]): list of item IDs
        @param rsm_request (rsm.RSMRequest): RSM request data
        @param extra (dict): extra data
        @return: a deferred couple with the list of items and metadatas.
        """
        if node is None:
            node = NS_MICROBLOG
        if rsm_request:
            # RSM pagination supersedes max_items.
            max_items = None
        items_data = await self._p.get_items(
            client,
            service,
            node,
            max_items=max_items,
            item_ids=item_ids,
            rsm_request=rsm_request,
            extra=extra,
        )
        mb_data_list, metadata = await self._p.trans_items_data_d(
            items_data, partial(self.item_2_mb_data, client, service=service, node=node)
        )
        # Re-attach per-item encryption info popped from the node metadata.
        encrypted = metadata.pop("encrypted", None)
        if encrypted is not None:
            for mb_data in mb_data_list:
                try:
                    mb_data["encrypted"] = encrypted[mb_data["id"]]
                except KeyError:
                    pass
        return (mb_data_list, metadata)

    def _mb_rename(self, service, node, item_id, new_id, profile_key):
        # Bridge frontend for [mb_rename].
        return defer.ensureDeferred(
            self.mb_rename(
                self.host.get_client(profile_key),
                jid.JID(service) if service else None,
                node or None,
                item_id,
                new_id,
            )
        )

    async def mb_rename(
        self,
        client: SatXMPPEntity,
        service: Optional[jid.JID],
        node: Optional[str],
        item_id: str,
        new_id: str,
    ) -> None:
        """Rename a microblog item (defaults to the microblog node if none given)."""
        if not node:
            node = NS_MICROBLOG
        await self._p.rename_item(client, service, node, item_id, new_id)

    @staticmethod
    def parse_comment_url(node_url):
        """Parse a XMPP URI

        Determine the fields comments_service and comments_node
        of a microblog data from the href attribute of an entry's link element.
For example this input:
        xmpp:sat-pubsub.example.net?;node=urn%3Axmpp%3Acomments%3A_af43b363-3259-4b2a-ba4c-1bc33aa87634__urn%3Axmpp%3Agroupblog%3Asomebody%40example.net

        will return
        (JID(u'sat-pubsub.example.net'),
        'urn:xmpp:comments:_af43b363-3259-4b2a-ba4c-1bc33aa87634__urn:xmpp:groupblog:somebody@example.net')

        @return (tuple[jid.JID, unicode]): service and node
        """
        try:
            parsed_url = xmpp_uri.parse_xmpp_uri(node_url)
            service = jid.JID(parsed_url["path"])
            node = parsed_url["node"]
        except Exception as e:
            raise exceptions.DataError(f"Invalid comments link: {e}")

        return (service, node)

    ## configure ##

    def mb_access_set(self, access="presence", profile_key=C.PROF_KEY_NONE):
        """Create a microblog node on PEP with given access

        If the node already exists, it change options
        @param access: Node access model, according to xep-0060 #4.5
        @param profile_key: profile key
        """
        # FIXME: check if this method is needed, deprecate it if not
        client = self.host.get_client(profile_key)
        _options = {
            self._p.OPT_ACCESS_MODEL: access,
            self._p.OPT_MAX_ITEMS: "max",
            self._p.OPT_PERSIST_ITEMS: 1,
            self._p.OPT_DELIVER_PAYLOADS: 1,
            self._p.OPT_SEND_ITEM_SUBSCRIBE: 1,
        }

        def cb(result):
            # Node is created with right permission
            log.debug(_("Microblog node has now access %s") % access)

        def fatal_err(s_error):
            # Something went wrong
            log.error(_("Can't set microblog access"))
            raise NodeAccessChangeException()

        def err_cb(s_error):
            # If the node already exists, the condition is "conflict",
            # else we have an unmanaged error
            if s_error.value.condition == "conflict":
                # d = self.host.plugins["XEP-0060"].deleteNode(client, client.jid.userhostJID(), NS_MICROBLOG)
                # d.addCallback(lambda x: create_node().addCallback(cb).addErrback(fatal_err))
                change_node_options().addCallback(cb).addErrback(fatal_err)
            else:
                fatal_err(s_error)

        def create_node():
            # Create the microblog node with the requested options.
            return self._p.createNode(
                client, client.jid.userhostJID(), NS_MICROBLOG, _options
            )

        def change_node_options():
            # Node already exists: only update its options.
            return self._p.setOptions(
                client.jid.userhostJID(),
NS_MICROBLOG, client.jid.userhostJID(), _options, profile_key=profile_key, ) create_node().addCallback(cb).addErrback(err_cb) ## methods to manage several stanzas/jids at once ## # common def _get_client_and_node_data(self, publishers_type, publishers, profile_key): """Helper method to construct node_data from publishers_type/publishers @param publishers_type: type of the list of publishers, one of: C.ALL: get all jids from roster, publishers is not used C.GROUP: get jids from groups C.JID: use publishers directly as list of jids @param publishers: list of publishers, according to "publishers_type" (None, list of groups or list of jids) @param profile_key: %(doc_profile_key)s """ client = self.host.get_client(profile_key) if publishers_type == C.JID: jids_set = set(publishers) else: jids_set = client.roster.get_jids_set(publishers_type, publishers) if publishers_type == C.ALL: try: # display messages from salut-a-toi@libervia.org or other PEP services services = self.host.plugins["EXTRA-PEP"].get_followed_entities( profile_key ) except KeyError: pass # plugin is not loaded else: if services: log.debug( "Extra PEP followed entities: %s" % ", ".join([str(service) for service in services]) ) jids_set.update(services) node_data = [] for jid_ in jids_set: node_data.append((jid_, NS_MICROBLOG)) return client, node_data def _check_publishers(self, publishers_type, publishers): """Helper method to deserialise publishers coming from bridge publishers_type(unicode): type of the list of publishers, one of: publishers: list of publishers according to type @return: deserialised (publishers_type, publishers) tuple """ if publishers_type == C.ALL: if publishers: raise failure.Failure( ValueError( "Can't use publishers with {} type".format(publishers_type) ) ) else: publishers = None elif publishers_type == C.JID: publishers[:] = [jid.JID(publisher) for publisher in publishers] return publishers_type, publishers # subscribe # def _mb_subscribe_to_many(self, publishers_type, 
publishers, profile_key): """ @return (str): session id: Use pubsub.getSubscribeRTResult to get the results """ publishers_type, publishers = self._check_publishers(publishers_type, publishers) return self.mb_subscribe_to_many(publishers_type, publishers, profile_key) def mb_subscribe_to_many(self, publishers_type, publishers, profile_key): """Subscribe microblogs for a list of groups or jids @param publishers_type: type of the list of publishers, one of: C.ALL: get all jids from roster, publishers is not used C.GROUP: get jids from groups C.JID: use publishers directly as list of jids @param publishers: list of publishers, according to "publishers_type" (None, list of groups or list of jids) @param profile: %(doc_profile)s @return (str): session id """ client, node_data = self._get_client_and_node_data( publishers_type, publishers, profile_key ) return self._p.subscribe_to_many( node_data, client.jid.userhostJID(), profile_key=profile_key ) # get # def _mb_get_from_many_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT): """Get real-time results for mb_get_from_many session @param session_id: id of the real-time deferred session @param return (tuple): (remaining, results) where: - remaining is the number of still expected results - results is a list of tuple with - service (unicode): pubsub service - node (unicode): pubsub node - failure (unicode): empty string in case of success, error message else - items_data(list): data as returned by [mb_get] - items_metadata(dict): metadata as returned by [mb_get] @param profile_key: %(doc_profile_key)s """ # FIXME: check if this code must be removed. 
raise NotImplementedError("Legacy code to be removed.") client = self.host.get_client(profile_key) def onSuccess(items_data): """convert items elements to list of microblog data in items_data""" d = self._p.trans_items_data_d( items_data, # FIXME: service and node should be used here partial(self.item_2_mb_data, client), serialise=True, ) d.addCallback(lambda serialised: ("", serialised)) return d d = self._p.get_rt_results( session_id, on_success=onSuccess, on_error=lambda failure: (str(failure.value), ([], {})), profile=client.profile, ) d.addCallback( lambda ret: ( ret[0], [ (service.full(), node, failure, items, metadata) for (service, node), (success, (failure, (items, metadata))) in ret[ 1 ].items() ], ) ) return d def _mb_get_from_many( self, publishers_type, publishers, max_items=10, extra_dict=None, profile_key=C.PROF_KEY_NONE, ): """ @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit """ max_items = None if max_items == C.NO_LIMIT else max_items publishers_type, publishers = self._check_publishers(publishers_type, publishers) extra = self._p.parse_extra(extra_dict) return self.mb_get_from_many( publishers_type, publishers, max_items, extra.rsm_request, extra.extra, profile_key, ) def mb_get_from_many( self, publishers_type, publishers, max_items=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE, ): """Get the published microblogs for a list of groups or jids @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL") @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids) @param max_items (int): optional limit on the number of retrieved items. 
@param rsm_request (rsm.RSMRequest): RSM request data, common to all publishers @param extra (dict): Extra data @param profile_key: profile key @return (str): RT Deferred session id """ # XXX: extra is unused here so far client, node_data = self._get_client_and_node_data( publishers_type, publishers, profile_key ) return self._p.get_from_many( node_data, max_items, rsm_request, profile_key=profile_key ) # comments # def _mb_get_from_many_with_comments_rt_result_serialise(self, data): """Serialisation of result This is probably the longest method name of whole SàT ecosystem ^^ @param data(dict): data as received by rt_sessions @return (tuple): see [_mb_get_from_many_with_comments_rt_result] """ ret = [] data_iter = iter(data[1].items()) for (service, node), (success, (failure_, (items_data, metadata))) in data_iter: items = [] for item, item_metadata in items_data: item = data_format.serialise(item) items.append((item, item_metadata)) ret.append((service.full(), node, failure_, items, metadata)) return data[0], ret def _mb_get_from_many_with_comments_rt_result( self, session_id, profile_key=C.PROF_KEY_DEFAULT ): """Get real-time results for [mb_get_from_many_with_comments] session @param session_id: id of the real-time deferred session @param return (tuple): (remaining, results) where: - remaining is the number of still expected results - results is a list of 5-tuple with - service (unicode): pubsub service - node (unicode): pubsub node - failure (unicode): empty string in case of success, error message else - items(list[tuple(dict, list)]): list of 2-tuple with - item(dict): item microblog data - comments_list(list[tuple]): list of 5-tuple with - service (unicode): pubsub service where the comments node is - node (unicode): comments node - failure (unicode): empty in case of success, else error message - comments(list[dict]): list of microblog data - comments_metadata(dict): metadata of the comment node - metadata(dict): original node metadata @param profile_key: 
%(doc_profile_key)s """ profile = self.host.get_client(profile_key).profile d = self.rt_sessions.get_results(session_id, profile=profile) d.addCallback(self._mb_get_from_many_with_comments_rt_result_serialise) return d def _mb_get_from_many_with_comments( self, publishers_type, publishers, max_items=10, max_comments=C.NO_LIMIT, extra_dict=None, extra_comments_dict=None, profile_key=C.PROF_KEY_NONE, ): """ @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit @param max_comments(int): maximum number of comments to get, C.NO_LIMIT for no limit """ max_items = None if max_items == C.NO_LIMIT else max_items max_comments = None if max_comments == C.NO_LIMIT else max_comments publishers_type, publishers = self._check_publishers(publishers_type, publishers) extra = self._p.parse_extra(extra_dict) extra_comments = self._p.parse_extra(extra_comments_dict) return self.mb_get_from_many_with_comments( publishers_type, publishers, max_items, max_comments or None, extra.rsm_request, extra.extra, extra_comments.rsm_request, extra_comments.extra, profile_key, ) def mb_get_from_many_with_comments( self, publishers_type, publishers, max_items=None, max_comments=None, rsm_request=None, extra=None, rsm_comments=None, extra_comments=None, profile_key=C.PROF_KEY_NONE, ): """Helper method to get the microblogs and their comments in one shot @param publishers_type (str): type of the list of publishers (one of "GROUP" or "JID" or "ALL") @param publishers (list): list of publishers, according to publishers_type (list of groups or list of jids) @param max_items (int): optional limit on the number of retrieved items. 
@param max_comments (int): maximum number of comments to retrieve @param rsm_request (rsm.RSMRequest): RSM request for initial items only @param extra (dict): extra configuration for initial items only @param rsm_comments (rsm.RSMRequest): RSM request for comments only @param extra_comments (dict): extra configuration for comments only @param profile_key: profile key @return (str): RT Deferred session id """ # XXX: this method seems complicated because it do a couple of treatments # to serialise and associate the data, but it make life in frontends side # a lot easier client, node_data = self._get_client_and_node_data( publishers_type, publishers, profile_key ) def get_comments(items_data): """Retrieve comments and add them to the items_data @param items_data: serialised items data @return (defer.Deferred): list of items where each item is associated with a list of comments data (service, node, list of items, metadata) """ items, metadata = items_data items_dlist = [] # deferred list for items for item in items: dlist = [] # deferred list for comments for key, value in item.items(): # we look for comments if key.startswith("comments") and key.endswith("_service"): prefix = key[: key.find("_")] service_s = value service = jid.JID(service_s) node = item["{}{}".format(prefix, "_node")] # time to get the comments d = defer.ensureDeferred( self._p.get_items( client, service, node, max_comments, rsm_request=rsm_comments, extra=extra_comments, ) ) # then serialise d.addCallback( lambda items_data: self._p.trans_items_data_d( items_data, partial( self.item_2_mb_data, client, service=service, node=node, ), serialise=True, ) ) # with failure handling d.addCallback( lambda serialised_items_data: ("",) + serialised_items_data ) d.addErrback(lambda failure: (str(failure.value), [], {})) # and associate with service/node (needed if there are several # comments nodes) d.addCallback( lambda serialised, service_s=service_s, node=node: ( service_s, node, ) + serialised ) 
dlist.append(d) # we get the comments comments_d = defer.gatherResults(dlist) # and add them to the item data comments_d.addCallback( lambda comments_data, item=item: (item, comments_data) ) items_dlist.append(comments_d) # we gather the items + comments in a list items_d = defer.gatherResults(items_dlist) # and add the metadata items_d.addCallback(lambda items_completed: (items_completed, metadata)) return items_d deferreds = {} for service, node in node_data: d = deferreds[(service, node)] = defer.ensureDeferred( self._p.get_items( client, service, node, max_items, rsm_request=rsm_request, extra=extra ) ) d.addCallback( lambda items_data: self._p.trans_items_data_d( items_data, partial(self.item_2_mb_data, client, service=service, node=node), ) ) d.addCallback(get_comments) d.addCallback(lambda items_comments_data: ("", items_comments_data)) d.addErrback(lambda failure: (str(failure.value), ([], {}))) return self.rt_sessions.new_session(deferreds, client.profile) @implementer(iwokkel.IDisco) class XEP_0277_handler(XMPPHandler): def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(NS_MICROBLOG)] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []