changeset 4383:7c1d77efc752

plugin XEP-0277: Make MbData a Pydantic model:

- MbData is now a Pydantic-based model.
- The bridge method has been updated to use `service` and `node` from MbData instead of using separate fields.
- Added `In-Reply-To` to MbData.
- Adapted code using XEP-0277 to work with the changes.

rel 462
author Goffi <goffi@goffi.org>
date Sun, 03 Aug 2025 23:35:21 +0200
parents b897d98b2c51
children 33468e175ade
files libervia/backend/plugins/plugin_misc_groupblog.py libervia/backend/plugins/plugin_xep_0277.py libervia/backend/plugins/plugin_xep_0470.py libervia/cli/cmd_blog.py libervia/frontends/quick_frontend/quick_blog.py
diffstat 5 files changed, 826 insertions(+), 490 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_misc_groupblog.py	Fri Jul 04 12:33:42 2025 +0200
+++ b/libervia/backend/plugins/plugin_misc_groupblog.py	Sun Aug 03 23:35:21 2025 +0200
@@ -17,9 +17,12 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+from twisted.words.xish import domish
+from libervia.backend.core.core_types import SatXMPPClient
 from libervia.backend.core.i18n import _
 from libervia.backend.core.constants import Const as C
 from libervia.backend.core.log import getLogger
+from libervia.backend.plugins.plugin_xep_0277 import MbData
 
 log = getLogger(__name__)
 from twisted.internet import defer
@@ -102,18 +105,31 @@
         config_form = data_form.findForm(item_elt, NS_PUBSUB_ITEM_CONFIG)
         if config_form is None:
             return
+        # FIXME: Legacy code, need to be adapted to new MbData model.
+        log.warning(
+            "Item config is temporarily not supported, it needs to be moved to "
+            "adapted to new model"
+        )
+        return
         access_model = config_form.get(self._p.OPT_ACCESS_MODEL, self._p.ACCESS_OPEN)
         if access_model == self._p.ACCESS_PUBLISHER_ROSTER:
             opt = self._p.OPT_ROSTER_GROUPS_ALLOWED
             microblog_data["groups"] = config_form.fields[opt].values
 
-    def _data_2_entry_trigger(self, client, mb_data, entry_elt, item_elt):
+    def _data_2_entry_trigger(
+        self,
+        client: SatXMPPClient,
+        mb_data: MbData,
+        entry_elt: domish.Element,
+        item_elt: domish.Element
+    ) -> None:
         """Build fine access permission if needed
 
         This trigger check if "group*" key are present,
         and create a fine item config to restrict view to these groups
         """
-        groups = mb_data.get("groups", [])
+        # FIXME: model_dump() is temporarily used while MbData is being converted to a
+        #   Pydantic model.
+        groups = mb_data.model_dump().get("groups", [])
         if not groups:
             return
         if not client.server_groupblog_available:
--- a/libervia/backend/plugins/plugin_xep_0277.py	Fri Jul 04 12:33:42 2025 +0200
+++ b/libervia/backend/plugins/plugin_xep_0277.py	Sun Aug 03 23:35:21 2025 +0200
@@ -22,11 +22,12 @@
 import calendar
 from mimetypes import guess_type
 from secrets import token_urlsafe
-from typing import List, Optional, Dict, Tuple, Any, Dict
+from typing import List, Optional, Dict, Self, Tuple, Any, cast
 from functools import partial
 
+from pydantic import BaseModel, ConfigDict, Field, model_validator
+
 import shortuuid
-
 from twisted.words.protocols.jabber import jid, error
 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
 from twisted.words.xish import domish
@@ -38,11 +39,16 @@
 from wokkel import disco, iwokkel, rsm
 from zope.interface import implementer
 
+from libervia.backend import G
 from libervia.backend.core.i18n import _
 from libervia.backend.core.constants import Const as C
 from libervia.backend.core.log import getLogger
 from libervia.backend.core import exceptions
 from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.models.types import JIDType
+from libervia.backend.plugins.plugin_misc_text_syntaxes import TextSyntaxes
+from libervia.backend.plugins.plugin_pubsub_cache import PubsubCache
+from libervia.backend.plugins.plugin_xep_0060 import XEP_0060
 from libervia.backend.tools import xml_tools
 from libervia.backend.tools import sat_defer
 from libervia.backend.tools import utils
@@ -56,6 +62,7 @@
 
 NS_MICROBLOG = "urn:xmpp:microblog:0"
 NS_ATOM = "http://www.w3.org/2005/Atom"
+NS_ATOM_THREADING = "http://purl.org/syndication/thread/1.0"
 NS_PUBSUB_EVENT = f"{pubsub.NS_PUBSUB}#event"
 NS_COMMENT_PREFIX = f"{NS_MICROBLOG}:comments/"
 
@@ -78,7 +85,659 @@
     pass
 
 
-class XEP_0277(object):
+class Comment(BaseModel):
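+    """Location of the comments node attached to a post (URI and/or service + node)."""
+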
+    uri: str = ""
+    service: JIDType | None = None
+    node: str = ""
+
+    @model_validator(mode='after')
+    def set_uri_if_missing(self) -> Self:
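+        """Build the XMPP URI from service and node when no explicit URI is given."""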
+        if not self.uri and self.service is not None and self.node:
+            self.uri = xmpp_uri.build_xmpp_uri(
+                "pubsub",
+                path=self.service.full(),
+                node=self.node
+            )
+        return self
+
+class InReplyTo(BaseModel):
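+    """Reference to the entry this post replies to (Atom Threading Extension)."""
+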
+    ref: str
+    href: str | None
+    type: str | None = None
+
+class Attachment(BaseModel):
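+    """File attached to a post, or link to external data."""
+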
+    sources: list[dict[str, str]] = Field(default_factory=list)
+    external: bool = Field(
+        default=False,
+        description="If True, this is a link to an external data, such as a website, "
+        "otherwise it's an attached file.",
+    )
+    media_type: str | None = None
+    desc: str | None = None
+    size: int | None = None
+
+
+class AltLink(BaseModel):
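+    """Alternate link to the same content (e.g. an HTTP view of the post)."""
+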
+    url: str
+    media_type: str | None = None
+
+
+class Repeated(BaseModel):
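+    """Information about the repeat (share) when a post is a repeated item."""
+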
+    by: JIDType
+    uri: str
+
+
+class MbExtra(BaseModel):
+    """Extra data for microblog posts that accepts arbitrary keys"""
+
+    model_config = ConfigDict(extra="allow")
+
+    repeated: Repeated | None = None
+    attachments: list[Attachment] = Field(default_factory=list)
+    alt_links: list[AltLink] = Field(default_factory=list)
+    # FIXME: Must be moved to pubsub signing plugin.
+    encrypted: bool = False
+    encrypted_for: dict | None = None
+    signed: bool = False
+
+
+class MbData(BaseModel):
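+    """Microblog post data, as used by XEP-0277."""
+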
+    id: str | None = None
+    atom_id: str | None = None
+    service: JIDType | None
+    node: str = NS_MICROBLOG
+    uri: str | None = None
+
+    user_friendly_id: bool = Field(default=True, exclude=True)
+    user_friendly_id_suffix: bool = Field(default=True, exclude=True)
+
+    published: float | None = None
+    updated: float | None = None
+    language: str | None = None
+
+
+    title: str | None = None
+    title_xhtml: str | None = None
+    title_rich: str | None = Field(default=None, exclude=True)
+    content: str | None = None
+    content_xhtml: str | None = None
+    content_rich: str | None = Field(default=None, exclude=True)
+
+    author: str | None = None
+    author_jid: JIDType | None = None
+    author_email: str | None = None
+    author_jid_verified: bool = False
+
+    allow_comments: bool | None = None
+    comments: list[Comment] = Field(default_factory=list)
+    in_reply_tos: list[InReplyTo] | None = None
+    tags: list[str] = Field(default_factory=list)
+
+    extra: MbExtra = Field(default_factory=MbExtra)
+
+    @classmethod
+    async def from_element(
+        cls,
+        client: SatXMPPEntity,
+        item_elt: domish.Element,
+        service: jid.JID | None = None,
+        # FIXME: node is Optional until all calls to item_2_mb_data properly set service
+        #   and node. Once done, the Optional must be removed here.
+        node: str | None = None,
+    ) -> Self:
+        """Create an MbData instance from a microblog XML element.
+
+        @param item_elt: Microblog item element.
+        @param service: PubSub service where the item has been retrieved. Profile's PEP is
+            used when service is None.
+        @param node: PubSub node where the item has been retrieved.
+            If None, "uri" won't be set.
+        @return: MbData instance.
+        @raise exceptions.DataError: if the XML is malformed or missing required data
+        """
+        text_syntaxes = cast(TextSyntaxes, G.host.plugins["TEXT_SYNTAXES"])
+        if service is None:
+            service = client.jid.userhostJID()
+
+        # there can be no id for transient nodes
+        id_ = item_elt.getAttribute("id") or ""
+        if item_elt.uri not in (pubsub.NS_PUBSUB, pubsub.NS_PUBSUB_EVENT):
+            raise exceptions.DataError(
+                f"Unsupported namespace {item_elt.uri} in pubsub item {id_!r}."
+            )
+
+        try:
+            entry_elt = next(item_elt.elements(NS_ATOM, "entry"))
+        except StopIteration:
+            raise exceptions.DataError(f"No atom entry found in the pubsub item {id_!r}.")
+
+        # Initialize basic data
+        data = {
+            "id": id_,
+            "service": service,
+            "node": node,
+        }
+
+        # Set URI if node is provided
+        # FIXME: node should always be set in the future, check FIXME in method signature.
+        if node is not None:
+            data["uri"] = xmpp_uri.build_xmpp_uri(
+                "pubsub",
+                path=service.full(),
+                node=node,
+                item=id_,
+            )
+
+        # Language.
+        try:
+            data["language"] = entry_elt[(C.NS_XML, "lang")].strip()
+        except KeyError:
+            pass
+
+        # atom:id.
+        try:
+            id_elt = next(entry_elt.elements(NS_ATOM, "id"))
+        except StopIteration:
+            log.warning(
+                f"No atom id found in the pubsub item {id_}, this is not standard!"
+            )
+            data["atom_id"] = ""
+        else:
+            data["atom_id"] = str(id_elt)
+
+        # Title(s) and content(s).
+        # FIXME: ATOM and XEP-0277 only allow 1 <title/> element but in the wild we have
+        #   some blogs with several ones so we don't respect the standard for now (it
+        #   doesn't break anything anyway), and we'll find a better option later.
+        title_elts = list(entry_elt.elements(NS_ATOM, "title"))
+        if not title_elts:
+            raise exceptions.DataError(f"No atom title found in the pubsub item {id_!r}.")
+
+        for title_elt in title_elts:
+            await cls._parse_content_element(title_elt, data)
+
+        # Content(s)
+        # FIXME: As for <title/>, Atom only authorise at most 1 content but XEP-0277
+        #   allows several ones. So for now we handle them as if more than one can be present.
+        for content_elt in entry_elt.elements(NS_ATOM, "content"):
+            await cls._parse_content_element(content_elt, data)
+
+        # Ensure text content exists when only xhtml is provided.
+        for key in ("title", "content"):
+            if key not in data and f"{key}_xhtml" in data:
+                log.warning(f"item {id_!r} provides {key}_xhtml data but not a text one.")
+                data[key] = await text_syntaxes.convert(
+                    data[f"{key}_xhtml"],
+                    text_syntaxes.SYNTAX_XHTML,
+                    text_syntaxes.SYNTAX_TEXT,
+                    False,
+                )
+
+        if "content" not in data:
+            # Use the atom title data as the microblog body content.
+            data["content"] = data["title"]
+            del data["title"]
+            if "title_xhtml" in data:
+                data["content_xhtml"] = data["title_xhtml"]
+                del data["title_xhtml"]
+
+        # Dates.
+        try:
+            updated_elt = next(entry_elt.elements(NS_ATOM, "updated"))
+        except StopIteration:
+            raise exceptions.DataError(
+                f"No atom updated element found in the pubsub item {id_!r}."
+            )
+
+        data["updated"] = calendar.timegm(
+            dateutil.parser.parse(str(updated_elt)).utctimetuple()
+        )
+
+        try:
+            published_elt = next(entry_elt.elements(NS_ATOM, "published"))
+        except StopIteration:
+            data["published"] = data["updated"]
+        else:
+            data["published"] = calendar.timegm(
+                dateutil.parser.parse(str(published_elt)).utctimetuple()
+            )
+
+        # Initialize extra data
+        extra = MbExtra()
+
+        # Links (comments, attachments, etc.)
+        comments = []
+        for link_elt in entry_elt.elements(NS_ATOM, "link"):
+            href = link_elt.getAttribute("href")
+            if not href:
+                log.warning(f"Missing href in <link> element: {link_elt.toXml()}.")
+                continue
+
+            rel = link_elt.getAttribute("rel")
+            if rel == "replies" and link_elt.getAttribute("title") == "comments":
+                try:
+                    comment_service, comment_node = XEP_0277.parse_comment_url(href)
+                except exceptions.DataError as e:
+                    log.exception(f"Can't parse comments url: {e}.")
+                    continue
+                else:
+                    comments.append(
+                        Comment(uri=href, service=comment_service, node=comment_node)
+                    )
+            elif rel == "via":
+                try:
+                    repeater_jid = jid.JID(item_elt["publisher"])
+                except (KeyError, RuntimeError):
+                    # We look for the stanza element, which is at the root, meaning that
+                    #   it has no parent.
+                    top_elt = item_elt.parent
+                    assert top_elt is not None
+                    while top_elt.parent is not None:
+                        top_elt = top_elt.parent
+                    try:
+                        repeater_jid = jid.JID(top_elt["from"])
+                    except (AttributeError, RuntimeError):
+                        log.error(f"Can't find repeater of the post: {item_elt.toXml()}.")
+                        continue
+                extra.repeated = Repeated(by=repeater_jid, uri=href)
+            elif rel in ("related", "enclosure"):
+                extra.attachments.append(cls._parse_attachment(link_elt, href, rel))
+            elif rel == "alternate":
+                media_type = link_elt.getAttribute("type") or guess_type(href)[0]
+                if not media_type:
+                    log.warning(
+                        "Invalid or missing media type for alternate link: "
+                        f"{link_elt.toXml()}."
+                    )
+
+                extra.alt_links.append(AltLink(url=href, media_type=media_type))
+            else:
+                log.warning(f"Unmanaged link element: {link_elt.toXml()}.")
+
+        if comments:
+            data["comments"] = comments
+
+        # Reply-To
+        in_reply_tos = []
+        for in_reply_to_elt in entry_elt.elements(NS_ATOM_THREADING, "in-reply-to"):
+            in_reply_to = InReplyTo(
+                ref=in_reply_to_elt.getAttribute("ref", ""),
+                href=in_reply_to_elt.getAttribute("href"),
+                type=in_reply_to_elt.getAttribute("type")
+            )
+            if not in_reply_to.ref:
+                log.warning(
+                    "No ref in <in-reply-to> element, this is not valid, ignoring: "
+                    f"in_reply_to_elt.toXml()"
+                )
+                continue
+            in_reply_tos.append(in_reply_to)
+
+        if in_reply_tos:
+            data["in_reply_tos"] = in_reply_tos
+
+        # Author information.
+        author_data = cls._parse_author(entry_elt, item_elt, id_, extra)
+        data.update(author_data)
+
+        # Tags/categories
+        data["tags"] = [
+            term
+            for category_elt in entry_elt.elements(NS_ATOM, "category")
+            if (term := category_elt.getAttribute("term"))
+        ]
+
+        data["extra"] = extra
+
+        ## the trigger ##
+        # if other plugins have things to add or change
+        G.host.trigger.point("XEP-0277_item2data", item_elt, entry_elt, data)
+
+        return cls(**data)
+
+    @staticmethod
+    async def _parse_content_element(elem: domish.Element, data: dict[str, Any]) -> None:
+        """Parse title/content elements and add to data dict."""
+        type_ = elem.getAttribute("type")
+        if type_ == "xhtml":
+            data_elt = elem.firstChildElement()
+            if data_elt is None:
+                raise exceptions.DataError(
+                    "XHML content not wrapped in a <div/> element, this is not standard!"
+                )
+            if data_elt.uri != C.NS_XHTML:
+                raise exceptions.DataError(
+                    "Content of type XHTML must declare its namespace!"
+                )
+
+            # We clean the content to avoid anything dangerous.
+            text_syntaxes = cast(TextSyntaxes, G.host.plugins["TEXT_SYNTAXES"])
+            data[f"{elem.name}_xhtml"] = text_syntaxes.clean_xhtml(data_elt.toXml())
+        else:
+            data[elem.name] = str(elem)
+
+    @classmethod
+    def _parse_attachment(
+        cls, link_elt: domish.Element, href: str, rel: str
+    ) -> Attachment:
+        """Parse an attachment (related/enclosure) link."""
+        attachment = Attachment(sources=[{"url": href}], external=(rel == "related"))
+
+        if media_type := link_elt.getAttribute("type"):
+            attachment.media_type = media_type
+        if desc := link_elt.getAttribute("title"):
+            attachment.desc = desc
+        try:
+            attachment.size = int(link_elt["length"])
+        except (KeyError, ValueError):
+            pass
+
+        if not attachment.media_type:
+            if guessed_type := guess_type(href, False)[0]:
+                attachment.media_type = guessed_type
+
+        return attachment
+
+    @classmethod
+    def _parse_author(
+        cls,
+        entry_elt: domish.Element,
+        item_elt: domish.Element,
+        item_id: str,
+        extra: MbExtra,
+    ) -> dict[str, Any]:
+        """Parse author information from the entry."""
+        author_data: dict[str, Any] = {}
+        publisher = item_elt.getAttribute("publisher")
+
+        try:
+            author_elt = next(entry_elt.elements(NS_ATOM, "author"))
+        except StopIteration:
+            log.debug(f"Can't find author element in item {item_id!r}")
+            if publisher:
+                author_data["author_jid"] = publisher
+                author_data["author_jid_verified"] = True
+            return author_data
+
+        # Name.
+        try:
+            name_elt = next(author_elt.elements(NS_ATOM, "name"))
+        except StopIteration:
+            log.warning("No name element found in author element of item {item_id!r}.")
+        else:
+            author_data["author"] = str(name_elt).strip()
+
+        # Parse URI.
+        try:
+            uri_elt = next(author_elt.elements(NS_ATOM, "uri"))
+        except StopIteration:
+            log.debug(f"No uri element found in author element of item {item_id!r}.")
+            if publisher:
+                author_data["author_jid"] = publisher
+                author_data["author_jid_verified"] = True
+            else:
+                iq_elt = xml_tools.find_ancestor(item_elt, "iq", C.NS_STREAM)
+                author_data["author_jid"] = iq_elt["from"]
+        else:
+            uri = str(uri_elt)
+            if uri.startswith("xmpp:"):
+                author_data["author_jid"] = uri[5:] or None
+            else:
+                author_data["author_jid"] = publisher or None
+
+            if publisher:
+                try:
+                    author_data["author_jid_verified"] = (
+                        jid.JID(publisher).userhostJID() == jid.JID(uri).userhostJID()
+                    )
+                except Exception:
+                    author_data["author_jid_verified"] = False
+
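+                # Repeated (shared) posts legitimately have an author URI differing from
+                #   the repeater's "publisher" attribute, so only warn about spoofing
+                #   when the item is not a repeat.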
+                if not author_data["author_jid_verified"] and not extra.repeated is None:
+                    log.warning(
+                        'Item\'s atom:uri differs from "publisher" attribute, spoofing '
+                        f"attempt? {uri=} {publisher=}"
+                    )
+
+        # If no author name but we have JID, use the username.
+        if "author" not in author_data and author_data.get("author_jid"):
+            try:
+                author_data["author"] = jid.JID(author_data["author_jid"]).user
+            except Exception as e:
+                log.warning(f"Couldn't parse author JID: {e}.")
+
+        # Email.
+        try:
+            email_elt = next(author_elt.elements(NS_ATOM, "email"))
+        except StopIteration:
+            pass
+        else:
+            author_data["author_email"] = str(email_elt)
+
+        return author_data
+
+    async def to_element(
+        self,
+        client: SatXMPPEntity,
+    ) -> pubsub.Item:
+        """Convert this MbData instance to a PubSub item Element.
+
+        @param client: Client instance.
+        @return: PubSub Item containing the Atom entry element
+        """
+        text_syntaxes = cast(TextSyntaxes, G.host.plugins["TEXT_SYNTAXES"])
+        entry_elt = domish.Element((NS_ATOM, "entry"))
+
+        ## Language ##
+        if self.language:
+            entry_elt[(C.NS_XML, "lang")] = self.language.strip()
+
+        ## Content and Title ##
+
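+        # Each of "title" and "content" may be given as plain text, rich text (in the
+        #   profile's current syntax) or XHTML; rich text is converted to XHTML before
+        #   publication.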
+        for elem_name in ("title", "content"):
+            for type_ in ["", "_rich", "_xhtml"]:
+                attr = f"{elem_name}{type_}"
+                value = getattr(self, attr)
+                if value is not None:
+                    elem = entry_elt.addElement(elem_name)
+                    if type_:
+                        if type_ == "_rich":
+                            # Convert input from current syntax to XHTML
+                            if getattr(self, f"{elem_name}_xhtml") is not None:
+                                raise failure.Failure(
+                                    exceptions.DataError(
+                                        "Can't have xhtml and rich content at the same "
+                                        "time"
+                                    )
+                                )
+                            content_xhtml = await text_syntaxes.convert(
+                                value,
+                                text_syntaxes.get_current_syntax(client.profile),
+                                "XHTML",
+                            )
+                        else:
+                            content_xhtml = value
+
+                        div_elt = xml_tools.ElementParser()(
+                            content_xhtml, namespace=C.NS_XHTML
+                        )
+                        if (
+                            div_elt.name != "div"
+                            or div_elt.uri != C.NS_XHTML
+                            or div_elt.attributes
+                        ):
+                            # We need a wrapping <div/> at the top with XHTML namespace.
+                            wrap_div_elt = domish.Element((C.NS_XHTML, "div"))
+                            wrap_div_elt.addChild(div_elt)
+                            div_elt = wrap_div_elt
+                        elem.addChild(div_elt)
+                        elem["type"] = "xhtml"
+                        if getattr(self, elem_name) is None:
+                            # There is no text content. It is mandatory so we create one
+                            # from xhtml content.
+                            elem_txt = entry_elt.addElement(elem_name)
+                            text_content = await text_syntaxes.convert(
+                                content_xhtml,
+                                text_syntaxes.SYNTAX_XHTML,
+                                text_syntaxes.SYNTAX_TEXT,
+                                False,
+                            )
+                            elem_txt.addContent(text_content)
+                            elem_txt["type"] = "text"
+
+                    else:
+                        # Raw text only needs to be escaped to get HTML-safe sequence.
+                        elem.addContent(value)
+                        elem["type"] = "text"
+
+        # Ensure we have at least a title.
+        if not any(entry_elt.elements(NS_ATOM, "title")):
+            if any(entry_elt.elements(NS_ATOM, "content")):
+                for elem in entry_elt.elements(NS_ATOM, "content"):
+                    elem.name = "title"
+            else:
+                raise exceptions.DataError(
+                    "Atom entry must have at least a title or content element"
+                )
+
+        ## Attachments ##
+        for attachment in self.extra.attachments:
+            url = None
+            if attachment.sources:
+                for source in attachment.sources:
+                    if "url" in source:
+                        url = source["url"]
+                        break
+
+            if not url:
+                log.warning(f'"url" missing in attachment, ignoring: {attachment}')
+                continue
+
+            if not url.startswith("http"):
+                log.warning(f"Non HTTP URL in attachment, ignoring: {attachment}.")
+                continue
+
+            link_elt = entry_elt.addElement("link")
+            # XXX: "uri" is set in self._manage_comments if not already existing
+            link_elt["href"] = url
+            link_elt["rel"] = "related" if attachment.external else "enclosure"
+
+            if attachment.media_type:
+                link_elt["type"] = attachment.media_type
+            if attachment.desc:
+                link_elt["title"] = attachment.desc
+            if attachment.size:
+                link_elt["length"] = str(attachment.size)
+
+        ## Alternate Links ##
+        for alt_link in self.extra.alt_links:
+            if self.service is None or self.node is None or self.id is None:
+                log.warning(
+                    f"Can't compute alternate link due to missing service, node or ID."
+                )
+                continue
+            link_elt = entry_elt.addElement("link")
+            url = link_elt["href"] = alt_link.url.format(
+                service=quote(self.service.full(), safe=""),
+                node=quote(self.node, safe=""),
+                item=quote(self.id, safe=""),
+            )
+            link_elt["rel"] = "alternate"
+            if alt_link.media_type:
+                media_type = alt_link.media_type
+            else:
+                parsed_url = urlparse(url)
+                if parsed_url.scheme in ["http", "https"]:
+                    media_type = "text/html"
+                else:
+                    media_type = guess_type(url)[0] or "application/octet-stream"
+
+            link_elt["type"] = media_type
+
+        ## Author ##
+        author_elt = entry_elt.addElement("author")
+        author_name = self.author or (self.author_jid.user if self.author_jid else "")
+        author_elt.addElement("name", content=author_name)
+
+        author_jid = self.author_jid or client.jid
+        author_elt.addElement("uri", content=f"xmpp:{author_jid.userhost()}")
+
+        if self.author_email:
+            author_elt.addElement("email", content=self.author_email)
+
+        ## Dates ##
+        current_time = time.time()
+        entry_elt.addElement(
+            "updated",
+            content=utils.xmpp_date(
+                float(self.updated if self.updated else current_time)
+            ),
+        )
+        entry_elt.addElement(
+            "published",
+            content=utils.xmpp_date(
+                float(self.published if self.published else current_time)
+            ),
+        )
+
+        ## Tags ##
+        for tag in self.tags:
+            category_elt = entry_elt.addElement("category")
+            category_elt["term"] = tag
+
+        ## ID ##
+        if self.atom_id is None:
+            if self.id is not None:
+                if self.service is not None:
+                    self.atom_id = xmpp_uri.build_xmpp_uri(
+                        "pubsub",
+                        path=self.service.full(),
+                        node=self.node,
+                        item=self.id or "",
+                    )
+                else:
+                    self.atom_id = self.id
+            else:
+                self.atom_id = shortuuid.uuid()
+        entry_elt.addElement("id", content=self.atom_id)
+
+        ## Comments ##
+        for comment in self.comments:
+            link_elt = entry_elt.addElement("link")
+            # XXX: "uri" is set in self._manage_comments if not already existing.
+            if not comment.uri:
+                log.warning("Missing URI for {comment}, ignoring.")
+                continue
+            link_elt["href"] = comment.uri
+            link_elt["rel"] = "replies"
+            link_elt["title"] = "comments"
+
+        ## In-Reply-Tos ##
+        if self.in_reply_tos is not None:
+            for in_reply_to in self.in_reply_tos:
+                in_reply_to_elt = entry_elt.addElement((NS_ATOM_THREADING, "in-reply-to"))
+                in_reply_to_elt["ref"] = in_reply_to.ref
+                if (href := in_reply_to.href) is not None:
+                    in_reply_to_elt["href"] = href
+                if (type_ := in_reply_to.type) is not None:
+                    in_reply_to_elt["type"] = type_
+
+        ## Reposts ##
+        if self.extra.repeated:
+            link_elt = entry_elt.addElement("link")
+            link_elt["rel"] = "via"
+            link_elt["href"] = self.extra.repeated.uri
+
+        ## Final item building ##
+        item_elt = pubsub.Item(id=self.id, payload=entry_elt)
+
+        ## Trigger ##
+        G.host.trigger.point("XEP-0277_data2entry", client, self, entry_elt, item_elt)
+
+        return item_elt
+
+
+class XEP_0277:
     namespace = NS_MICROBLOG
     NS_ATOM = NS_ATOM
 
@@ -86,10 +745,8 @@
         log.info(_("Microblogging plugin initialization"))
         self.host = host
         host.register_namespace("microblog", NS_MICROBLOG)
-        self._p = self.host.plugins[
-            "XEP-0060"
-        ]  # this facilitate the access to pubsub plugin
-        ps_cache = self.host.plugins.get("PUBSUB_CACHE")
+        self._p = cast(XEP_0060, self.host.plugins["XEP-0060"])
+        ps_cache = cast(PubsubCache, self.host.plugins.get("PUBSUB_CACHE"))
         if ps_cache is not None:
             ps_cache.register_analyser(
                 {
@@ -103,14 +760,12 @@
                 }
             )
         self.rt_sessions = sat_defer.RTDeferredSessions()
-        self.host.plugins["XEP-0060"].add_managed_node(
-            NS_MICROBLOG, items_cb=self._items_received
-        )
+        self._p.add_managed_node(NS_MICROBLOG, items_cb=self._items_received)
 
         host.bridge.add_method(
             "mb_send",
             ".plugin",
-            in_sign="ssss",
+            in_sign="ss",
             out_sign="s",
             method=self._mb_send,
             async_=True,
@@ -260,15 +915,14 @@
 
     ## data/item transformation ##
 
-    @defer.inlineCallbacks
-    def item_2_mb_data(
+    async def item_2_mb_data(
         self,
         client: SatXMPPEntity,
         item_elt: domish.Element,
-        service: Optional[jid.JID],
+        service: jid.JID | None,
         # FIXME: node is Optional until all calls to item_2_mb_data set properly service
         #   and node. Once done, the Optional must be removed here
-        node: Optional[str],
+        node: str | None,
     ) -> dict:
         """Convert an XML Item to microblog data
 
@@ -279,342 +933,21 @@
             if None, "uri" won't be set
         @return: microblog data
         """
-        if service is None:
-            service = client.jid.userhostJID()
-
-        extra: Dict[str, Any] = {}
-        mb_data: Dict[str, Any] = {"service": service.full(), "extra": extra}
-
-        def check_conflict(key, increment=False):
-            """Check if key is already in microblog data
-
-            @param key(unicode): key to check
-            @param increment(bool): if suffix the key with an increment
-                instead of raising an exception
-            @raise exceptions.DataError: the key already exists
-                (not raised if increment is True)
-            """
-            if key in mb_data:
-                if not increment:
-                    raise failure.Failure(
-                        exceptions.DataError(
-                            "key {} is already present for item {}"
-                        ).format(key, item_elt["id"])
-                    )
-                else:
-                    idx = 1  # the idx 0 is the key without suffix
-                    fmt = "{}#{}"
-                    new_key = fmt.format(key, idx)
-                    while new_key in mb_data:
-                        idx += 1
-                        new_key = fmt.format(key, idx)
-                    key = new_key
-            return key
-
-        @defer.inlineCallbacks
-        def parseElement(elem):
-            """Parse title/content elements and fill microblog_data accordingly"""
-            type_ = elem.getAttribute("type")
-            if type_ == "xhtml":
-                data_elt = elem.firstChildElement()
-                if data_elt is None:
-                    raise failure.Failure(
-                        exceptions.DataError(
-                            "XHML content not wrapped in a <div/> element, this is not "
-                            "standard !"
-                        )
-                    )
-                if data_elt.uri != C.NS_XHTML:
-                    raise failure.Failure(
-                        exceptions.DataError(
-                            _("Content of type XHTML must declare its namespace!")
-                        )
-                    )
-                key = check_conflict("{}_xhtml".format(elem.name))
-                data = data_elt.toXml()
-                mb_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].clean_xhtml(data)
-            else:
-                key = check_conflict(elem.name)
-                mb_data[key] = str(elem)
-
-        id_ = item_elt.getAttribute("id", "")  # there can be no id for transient nodes
-        mb_data["id"] = id_
-        if item_elt.uri not in (pubsub.NS_PUBSUB, NS_PUBSUB_EVENT):
-            msg = "Unsupported namespace {ns} in pubsub item {id_}".format(
-                ns=item_elt.uri, id_=id_
-            )
-            log.warning(msg)
-            raise failure.Failure(exceptions.DataError(msg))
-
-        try:
-            entry_elt = next(item_elt.elements(NS_ATOM, "entry"))
-        except StopIteration:
-            msg = "No atom entry found in the pubsub item {}".format(id_)
-            raise failure.Failure(exceptions.DataError(msg))
-
-        # uri
-        # FIXME: node should alway be set in the future, check FIXME in method signature
-        if node is not None:
-            mb_data["node"] = node
-            mb_data["uri"] = xmpp_uri.build_xmpp_uri(
-                "pubsub",
-                path=service.full(),
-                node=node,
-                item=id_,
-            )
-
-        # language
-        try:
-            mb_data["language"] = entry_elt[(C.NS_XML, "lang")].strip()
-        except KeyError:
-            pass
-
-        # atom:id
-        try:
-            id_elt = next(entry_elt.elements(NS_ATOM, "id"))
-        except StopIteration:
-            msg = "No atom id found in the pubsub item {}, this is not standard !".format(
-                id_
-            )
-            log.warning(msg)
-            mb_data["atom_id"] = ""
-        else:
-            mb_data["atom_id"] = str(id_elt)
-
-        # title/content(s)
-
-        # FIXME: ATOM and XEP-0277 only allow 1 <title/> element
-        #        but in the wild we have some blogs with several ones
-        #        so we don't respect the standard for now (it doesn't break
-        #        anything anyway), and we'll find a better option later
-        # try:
-        #     title_elt = entry_elt.elements(NS_ATOM, 'title').next()
-        # except StopIteration:
-        #     msg = u'No atom title found in the pubsub item {}'.format(id_)
-        #     raise failure.Failure(exceptions.DataError(msg))
-        title_elts = list(entry_elt.elements(NS_ATOM, "title"))
-        if not title_elts:
-            msg = "No atom title found in the pubsub item {}".format(id_)
-            raise failure.Failure(exceptions.DataError(msg))
-        for title_elt in title_elts:
-            yield parseElement(title_elt)
-
-        # FIXME: as for <title/>, Atom only authorise at most 1 content
-        #        but XEP-0277 allows several ones. So for no we handle as
-        #        if more than one can be present
-        for content_elt in entry_elt.elements(NS_ATOM, "content"):
-            yield parseElement(content_elt)
-
-        # we check that text content is present
-        for key in ("title", "content"):
-            if key not in mb_data and ("{}_xhtml".format(key)) in mb_data:
-                log.warning(
-                    "item {id_} provide a {key}_xhtml data but not a text one".format(
-                        id_=id_, key=key
-                    )
-                )
-                # ... and do the conversion if it's not
-                mb_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].convert(
-                    mb_data["{}_xhtml".format(key)],
-                    self.host.plugins["TEXT_SYNTAXES"].SYNTAX_XHTML,
-                    self.host.plugins["TEXT_SYNTAXES"].SYNTAX_TEXT,
-                    False,
-                )
-
-        if "content" not in mb_data:
-            # use the atom title data as the microblog body content
-            mb_data["content"] = mb_data["title"]
-            del mb_data["title"]
-            if "title_xhtml" in mb_data:
-                mb_data["content_xhtml"] = mb_data["title_xhtml"]
-                del mb_data["title_xhtml"]
+        mb_data = await MbData.from_element(client, item_elt, service, node)
+        return mb_data.model_dump(exclude_none=True)
 
-        # published/updated dates
-        try:
-            updated_elt = next(entry_elt.elements(NS_ATOM, "updated"))
-        except StopIteration:
-            msg = "No atom updated element found in the pubsub item {}".format(id_)
-            raise failure.Failure(exceptions.DataError(msg))
-        mb_data["updated"] = calendar.timegm(
-            dateutil.parser.parse(str(updated_elt)).utctimetuple()
-        )
-        try:
-            published_elt = next(entry_elt.elements(NS_ATOM, "published"))
-        except StopIteration:
-            mb_data["published"] = mb_data["updated"]
-        else:
-            mb_data["published"] = calendar.timegm(
-                dateutil.parser.parse(str(published_elt)).utctimetuple()
-            )
-
-        # links
-        comments = mb_data["comments"] = []
-        for link_elt in entry_elt.elements(NS_ATOM, "link"):
-            href = link_elt.getAttribute("href")
-            if not href:
-                log.warning(f"missing href in <link> element: {link_elt.toXml()}")
-                continue
-            rel = link_elt.getAttribute("rel")
-            if rel == "replies" and link_elt.getAttribute("title") == "comments":
-                uri = href
-                comments_data = {
-                    "uri": uri,
-                }
-                try:
-                    comment_service, comment_node = self.parse_comment_url(uri)
-                except Exception as e:
-                    log.warning(f"Can't parse comments url: {e}")
-                    continue
-                else:
-                    comments_data["service"] = comment_service.full()
-                    comments_data["node"] = comment_node
-                comments.append(comments_data)
-            elif rel == "via":
-                try:
-                    repeater_jid = jid.JID(item_elt["publisher"])
-                except (KeyError, RuntimeError):
-                    try:
-                        # we look for stanza element which is at the root, meaning that it
-                        # has not parent
-                        top_elt = item_elt.parent
-                        while top_elt.parent is not None:
-                            top_elt = top_elt.parent
-                        repeater_jid = jid.JID(top_elt["from"])
-                    except (AttributeError, RuntimeError):
-                        # we should always have either the "publisher" attribute or the
-                        # stanza available
-                        log.error(f"Can't find repeater of the post: {item_elt.toXml()}")
-                        continue
-
-                extra["repeated"] = {"by": repeater_jid.full(), "uri": href}
-            elif rel in ("related", "enclosure"):
-                attachment: Dict[str, Any] = {"sources": [{"url": href}]}
-                if rel == "related":
-                    attachment["external"] = True
-                for attr, key in (
-                    ("type", "media_type"),
-                    ("title", "desc"),
-                ):
-                    value = link_elt.getAttribute(attr)
-                    if value:
-                        attachment[key] = value
-                try:
-                    attachment["size"] = int(link_elt.attributes["lenght"])
-                except (KeyError, ValueError):
-                    pass
-                if "media_type" not in attachment:
-                    media_type = guess_type(href, False)[0]
-                    if media_type is not None:
-                        attachment["media_type"] = media_type
-
-                attachments = extra.setdefault("attachments", [])
-                attachments.append(attachment)
-            elif rel == "alternate":
-                link_data = {"url": href}
-                media_type = link_elt.getAttribute("type") or guess_type(href)[0]
-                if media_type:
-                    link_data["media_type"] = media_type
-                else:
-                    log.warning(
-                        f"Invalid or missing media type for alternate link: {href}"
-                    )
-                extra.setdefault("alt_links", []).append(link_data)
-            else:
-                log.warning(f"Unmanaged link element: {link_elt.toXml()}")
-
-        # author
-        publisher = item_elt.getAttribute("publisher")
-        try:
-            author_elt = next(entry_elt.elements(NS_ATOM, "author"))
-        except StopIteration:
-            log.debug("Can't find author element in item {}".format(id_))
-        else:
-            # name
-            try:
-                name_elt = next(author_elt.elements(NS_ATOM, "name"))
-            except StopIteration:
-                log.warning(
-                    "No name element found in author element of item {}".format(id_)
-                )
-                author = None
-            else:
-                author = mb_data["author"] = str(name_elt).strip()
-            # uri
-            try:
-                uri_elt = next(author_elt.elements(NS_ATOM, "uri"))
-            except StopIteration:
-                log.debug("No uri element found in author element of item {}".format(id_))
-                if publisher:
-                    mb_data["author_jid"] = publisher
-            else:
-                uri = str(uri_elt)
-                if uri.startswith("xmpp:"):
-                    uri = uri[5:]
-                    mb_data["author_jid"] = uri
-                else:
-                    mb_data["author_jid"] = item_elt.getAttribute("publisher") or ""
-                if not author and mb_data["author_jid"]:
-                    # FIXME: temporary workaround for missing author name, would be
-                    #   better to use directly JID's identity (to be done from frontends?)
-                    try:
-                        mb_data["author"] = jid.JID(mb_data["author_jid"]).user
-                    except Exception as e:
-                        log.warning(
-                            f"No author name found, and can't parse author jid: {e}"
-                        )
-
-                if not publisher:
-                    log.debug("No publisher attribute, we can't verify author jid")
-                    mb_data["author_jid_verified"] = False
-                elif jid.JID(publisher).userhostJID() == jid.JID(uri).userhostJID():
-                    mb_data["author_jid_verified"] = True
-                else:
-                    if "repeated" not in extra:
-                        log.warning(
-                            "item atom:uri differ from publisher attribute, spoofing "
-                            "attempt ? atom:uri = {} publisher = {}".format(
-                                uri, item_elt.getAttribute("publisher")
-                            )
-                        )
-                    mb_data["author_jid_verified"] = False
-            # email
-            try:
-                email_elt = next(author_elt.elements(NS_ATOM, "email"))
-            except StopIteration:
-                pass
-            else:
-                mb_data["author_email"] = str(email_elt)
-
-        if not mb_data.get("author_jid"):
-            if publisher:
-                mb_data["author_jid"] = publisher
-                mb_data["author_jid_verified"] = True
-            else:
-                iq_elt = xml_tools.find_ancestor(item_elt, "iq", C.NS_STREAM)
-                mb_data["author_jid"] = iq_elt["from"]
-                mb_data["author_jid_verified"] = False
-
-        # categories
-        categories = [
-            category_elt.getAttribute("term", "")
-            for category_elt in entry_elt.elements(NS_ATOM, "category")
-        ]
-        mb_data["tags"] = categories
-
-        ## the trigger ##
-        # if other plugins have things to add or change
-        yield self.host.trigger.point("XEP-0277_item2data", item_elt, entry_elt, mb_data)
-
-        return mb_data
-
-    async def mb_data_2_entry_elt(self, client, mb_data, item_id, service, node):
+    async def mb_data_2_entry_elt(
+        self,
+        client: SatXMPPEntity,
+        mb_data: dict,
+    ) -> pubsub.Item:
         """Convert a data dict to en entry usable to create an item
 
         @param mb_data: data dict as given by bridge method.
-        @param item_id(unicode): id of the item to use
-        @param service(jid.JID, None): pubsub service where the item is sent
+        @param item_id: id of the item to use
+        @param service: pubsub service where the item is sent
             Needed to construct Atom id
-        @param node(unicode): pubsub node where the item is sent
+        @param node: pubsub node where the item is sent
             Needed to construct Atom id
         @return: deferred which fire domish.Element
         """
@@ -626,7 +959,7 @@
             entry_elt[(C.NS_XML, "lang")] = mb_data["language"].strip()
 
         ## content and title ##
-        synt = self.host.plugins["TEXT_SYNTAXES"]
+        text_syntaxes = cast(TextSyntaxes, self.host.plugins["TEXT_SYNTAXES"])
 
         for elem_name in ("title", "content"):
             for type_ in ["", "_rich", "_xhtml"]:
@@ -635,9 +968,9 @@
                     elem = entry_elt.addElement(elem_name)
                     if type_:
                         if type_ == "_rich":  # convert input from current syntax to XHTML
-                            xml_content = await synt.convert(
+                            xml_content = await text_syntaxes.convert(
                                 mb_data[attr],
-                                synt.get_current_syntax(client.profile),
+                                text_syntaxes.get_current_syntax(client.profile),
                                 "XHTML",
                             )
                             if f"{elem_name}_xhtml" in mb_data:
@@ -889,45 +1222,45 @@
         )
 
     async def _manage_comments(
-        self, client, mb_data, service, node, item_id, access=None
-    ):
-        """Check comments keys in mb_data and create comments node if necessary
+        self, client: SatXMPPEntity, mb_data: MbData, access: str | None = None
+    ) -> None:
+        """Check comments keys in mb_data and create comments node if necessary.
 
-        if a comments node metadata is set in the mb_data['comments'] list, it is used
+        If a comments node metadata is set in the mb_data['comments'] list, it is used
         otherwise it is generated (if allow_comments is True).
-        @param mb_data(dict): microblog mb_data
-        @param service(jid.JID, None): PubSub service of the parent item
-        @param node(unicode): node of the parent item
-        @param item_id(unicode): id of the parent item
-        @param access(unicode, None): access model
+
+        @param mb_data: Microblog data.
+        @param access: Access model.
             None to use same access model as parent item
         """
-        allow_comments = mb_data.pop("allow_comments", None)
-        if allow_comments is None:
-            if "comments" in mb_data:
-                mb_data["allow_comments"] = True
+        if mb_data.allow_comments is None:
+            if mb_data.comments:
+                mb_data.allow_comments = True
             else:
-                # no comments set or requested, nothing to do
+                # No comments set or requested, nothing to do.
                 return
-        elif allow_comments == False:
-            if "comments" in mb_data:
+        elif mb_data.allow_comments is False:
+            if mb_data.comments:
                 log.warning(
-                    "comments are not allowed but there is already a comments node, "
-                    "it may be lost: {uri}".format(uri=mb_data["comments"])
+                    "Comments are not allowed but there is already a comments node, "
+                    "it may be lost: {mb_data['comments']}."
                 )
-                del mb_data["comments"]
+                mb_data.comments.clear()
             return
 
-        # we have usually a single comment node, but the spec allow several, so we need to
-        # handle this in a list
-        if len(mb_data.setdefault("comments", [])) == 0:
-            # we need at least one comment node
-            mb_data["comments"].append({})
+        # We usually have a single comment node, but the spec allows several, so we
+        # need to handle this in a list.
+        if len(mb_data.comments) == 0:
+            # We need at least one comment node, we set an empty one for now, we'll
+            # complete it below.
+            mb_data.comments.append(Comment())
 
         if access is None:
-            # TODO: cache access models per service/node
+            # TODO: cache access models per service/node.
             try:
-                parent_node_config = await self._p.getConfiguration(client, service, node)
+                parent_node_config = await self._p.getConfiguration(
+                    client, mb_data.service, mb_data.node
+                )
             except error.StanzaError as e:
                 log.debug(f"Can't get parent node configuration: {e}")
                 access = self._p.ACCESS_OPEN
@@ -946,51 +1279,50 @@
             self._p.OPT_PUBLISH_MODEL: self._p.ACCESS_OPEN,
         }
 
-        # if other plugins need to change the options
+        # If other plugins need to change the options.
         self.host.trigger.point("XEP-0277_comments", client, mb_data, options)
 
-        for comments_data in mb_data["comments"]:
-            uri = comments_data.get("uri")
-            comments_node = comments_data.get("node")
-            try:
-                comments_service = jid.JID(comments_data["service"])
-            except KeyError:
-                comments_service = None
-
-            if uri:
-                uri_service, uri_node = self.parse_comment_url(uri)
-                if (comments_node is not None and comments_node != uri_node) or (
-                    comments_service is not None and comments_service != uri_service
+        for comments_data in mb_data.comments:
+            if comments_data.uri:
+                uri_service, uri_node = self.parse_comment_url(comments_data.uri)
+                if (
+                    comments_data.node is not None and comments_data.node != uri_node
+                ) or (
+                    comments_data.service is not None
+                    and comments_data.service != uri_service
                 ):
                     raise ValueError(
-                        f"Incoherence between comments URI ({uri}) and comments_service "
-                        f"({comments_service}) or comments_node ({comments_node})"
+                        f"Incoherence between comments URI ({comments_data.uri}) and "
+                        f"comments_service ({comments_data.service}) or comments_node "
+                        f"({comments_data.node})."
                     )
-                comments_data["service"] = comments_service = uri_service
-                comments_data["node"] = comments_node = uri_node
+                comments_data.service = uri_service
+                comments_data.node = uri_node
             else:
-                if not comments_node:
-                    comments_node = self.get_comments_node(item_id)
-                comments_data["node"] = comments_node
-                if comments_service is None:
-                    comments_service = await self.get_comments_service(client, service)
-                    if comments_service is None:
-                        comments_service = client.jid.userhostJID()
-                comments_data["service"] = comments_service
+                if not comments_data.node:
+                    comments_data.node = self.get_comments_node(mb_data.id)
+                if comments_data.service is None:
+                    comments_data.service = await self.get_comments_service(
+                        client, mb_data.service
+                    )
+                    if comments_data.service is None:
+                        comments_data.service = client.jid.userhostJID()
 
-                comments_data["uri"] = xmpp_uri.build_xmpp_uri(
+                comments_data.uri = xmpp_uri.build_xmpp_uri(
                     "pubsub",
-                    path=comments_service.full(),
-                    node=comments_node,
+                    path=comments_data.service.full(),
+                    node=comments_data.node,
                 )
 
             try:
-                await self._p.createNode(client, comments_service, comments_node, options)
+                await self._p.createNode(
+                    client, comments_data.service, comments_data.node, options
+                )
             except error.StanzaError as e:
                 if e.condition == "conflict":
                     log.info(
                         "node {} already exists on service {}".format(
-                            comments_node, comments_service
+                            comments_data.node, comments_data.service
                         )
                     )
                 else:
@@ -999,7 +1331,7 @@
                 if access == self._p.ACCESS_WHITELIST:
                     # for whitelist access we need to copy affiliations from parent item
                     comments_affiliations = await self._p.get_node_affiliations(
-                        client, service, node
+                        client, mb_data.service, mb_data.node
                     )
                     # …except for "member", that we transform to publisher
                     # because we wants members to be able to write to comments
@@ -1008,96 +1340,84 @@
                             comments_affiliations[jid_] = "publisher"
 
                     await self._p.set_node_affiliations(
-                        client, comments_service, comments_node, comments_affiliations
+                        client,
+                        comments_data.service,
+                        comments_data.node,
+                        comments_affiliations,
                     )
 
     def friendly_id(self, data):
         """Generate a user friendly id from title or content"""
         # TODO: rich content should be converted to plain text
         id_base = regex.url_friendly_text(
-            data.get("title")
-            or data.get("title_rich")
-            or data.get("content")
-            or data.get("content_rich")
-            or ""
+            data.title or data.title_rich or data.content or data.content_rich or ""
         )
-        if not data.get("user_friendly_id_suffix", True):
+        if not data.user_friendly_id_suffix:
             return id_base
         else:
             return f"{id_base}-{token_urlsafe(3)}"
 
-    def _mb_send(self, service, node, data, profile_key):
-        service = jid.JID(service) if service else None
-        node = node if node else NS_MICROBLOG
+    def _mb_send(self, data: str, profile_key: str) -> defer.Deferred[str | None]:
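+        # "data" is the JSON serialisation of an MbData instance, which now carries
+        # service and node itself.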
+        data = MbData.model_validate_json(data)
         client = self.host.get_client(profile_key)
-        data = data_format.deserialise(data)
-        return defer.ensureDeferred(self.send(client, data, service, node))
+        return defer.ensureDeferred(self.send(client, data))
 
     async def send(
         self,
         client: SatXMPPEntity,
-        data: dict,
-        service: Optional[jid.JID] = None,
-        node: Optional[str] = NS_MICROBLOG,
-    ) -> Optional[str]:
-        """Send XEP-0277's microblog data
+        data: MbData,
+    ) -> str | None:
+        """Send XEP-0277's microblog data.
 
-        @param data: microblog data (must include at least a "content" or a "title" key).
-            see http://wiki.goffi.org/wiki/Bridge_API_-_Microblogging/en for details
-        @param service: PubSub service where the microblog must be published
-            None to publish on profile's PEP
-        @param node: PubSub node to use (defaut to microblog NS)
-            None is equivalend as using default value
+        @param data: Microblog data (must include at least a "content" or a "title"
+            field). See http://wiki.goffi.org/wiki/Bridge_API_-_Microblogging/en for
+            details.
         @return: ID of the published item
         """
         # TODO: check that all data keys are used, this would avoid sending publicly a private message
         #       by accident (e.g. if group plugin is not loaded, and "group*" key are not used)
-        if service is None:
-            service = client.jid.userhostJID()
-        if node is None:
-            node = NS_MICROBLOG
-
-        item_id = data.get("id")
-        if item_id is None:
-            if data.get("user_friendly_id", True):
-                item_id = self.friendly_id(data)
-                if not data.get("user_friendly_id_suffix", True):
+        if data.id is None:
+            if data.user_friendly_id:
+                data.id = self.friendly_id(data)
+                if not data.user_friendly_id_suffix:
                     # we have no random suffix, which can lead to conflict, so we check if
                     # the item doesn't already exist, and change ID if it's the case.
                     try:
-                        items, __ = await self._p.get_items(
-                            client, service, node, item_ids=[item_id]
+                        __, __ = await self._p.get_items(
+                            client, data.service, data.node, item_ids=[data.id]
                         )
                     except exceptions.NotFound:
                         pass
                     else:
                         # the item already exists
                         log.info(
-                            f"there is already an item with ID {item_id}, we have to "
-                            ' set the "user_friendly_id_suffix" flag.'
+                            f"there is already an item with ID {data.id!r}, we have to "
+                            'set the "user_friendly_id_suffix" flag.'
                         )
-                        data["user_friendly_id_suffix"] = True
-                        item_id = self.friendly_id(data)
+                        data.user_friendly_id_suffix = True
+                        data.id = self.friendly_id(data)
+
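+        # Default to the profile's PEP service when no service has been specified.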
+        if not data.service:
+            data.service = client.jid.userhostJID()
 
         try:
-            await self._manage_comments(client, data, service, node, item_id, access=None)
+            await self._manage_comments(client, data, access=None)
         except error.StanzaError:
-            log.warning("Can't create comments node for item {}".format(item_id))
-        item = await self.mb_data_2_entry_elt(client, data, item_id, service, node)
+            log.warning("Can't create comments node for item {data.id}")
+        item_elt = await data.to_element(client)
 
         if not await self.host.trigger.async_point(
-            "XEP-0277_send", client, service, node, item, data
+            "XEP-0277_send", client, item_elt, data
         ):
             return None
 
         extra = {}
         for key in ("encrypted", "encrypted_for", "signed"):
-            value = data.get(key)
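+            # Encryption and signature flags now live in MbData.extra; forward them
+            # to the pubsub publish call.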
+            value = getattr(data.extra, key)
             if value is not None:
                 extra[key] = value
 
-        await self._p.publish(client, service, node, [item], extra=extra)
-        return item_id
+        await self._p.publish(client, data.service, data.node, [item_elt], extra=extra)
+        return data.id
 
     def _mb_repeat(
         self, service_s: str, node: str, item: str, extra_s: str, profile_key: str
@@ -1168,36 +1488,30 @@
             client, client.jid.userhostJID(), NS_MICROBLOG, entry_elt
         )
 
-    def _mb_preview(self, service, node, data, profile_key):
-        service = jid.JID(service) if service else None
-        node = node if node else NS_MICROBLOG
+    def _mb_preview(self, mb_data_s: str, profile_key: str) -> defer.Deferred[str]:
         client = self.host.get_client(profile_key)
-        data = data_format.deserialise(data)
-        d = defer.ensureDeferred(self.preview(client, data, service, node))
-        d.addCallback(data_format.serialise)
+        mb_data = MbData.model_validate_json(mb_data_s)
+        d = defer.ensureDeferred(self.preview(client, mb_data))
+        d.addCallback(lambda data: data.model_dump_json())
+        d = cast(defer.Deferred[str], d)
         return d
 
     async def preview(
         self,
         client: SatXMPPEntity,
-        data: dict,
-        service: Optional[jid.JID] = None,
-        node: Optional[str] = NS_MICROBLOG,
-    ) -> dict:
-        """Preview microblog data without publishing them
+        mb_data: MbData,
+    ) -> MbData:
+        """Preview microblog data without publishing them.
 
         params are the same as for [send]
         @return: microblog data as would be retrieved from published item
         """
-        if node is None:
-            node = NS_MICROBLOG
-
-        item_id = data.get("id", "")
-
+        if mb_data.service is None:
+            mb_data.service = client.jid.userhostJID()
         # we have to serialise then deserialise to be sure that all triggers are called
-        item_elt = await self.mb_data_2_entry_elt(client, data, item_id, service, node)
+        item_elt = await mb_data.to_element(client)
         item_elt.uri = pubsub.NS_PUBSUB
-        return await self.item_2_mb_data(client, item_elt, service, node)
+        return await MbData.from_element(client, item_elt, mb_data.service, mb_data.node)
 
     ## retract ##
 
@@ -1320,7 +1634,8 @@
             node = NS_MICROBLOG
         await self._p.rename_item(client, service, node, item_id, new_id)
 
-    def parse_comment_url(self, node_url):
+    @staticmethod
+    def parse_comment_url(node_url: str):
         """Parse a XMPP URI
 
         Determine the fields comments_service and comments_node of a microblog data
@@ -1499,6 +1814,8 @@
                 - items_metadata(dict): metadata as returned by [mb_get]
         @param profile_key: %(doc_profile_key)s
         """
+        # FIXME: check if this code must be removed.
+        raise NotImplementedError("Legacy code to be removed.")
 
         client = self.host.get_client(profile_key)
 
--- a/libervia/backend/plugins/plugin_xep_0470.py	Fri Jul 04 12:33:42 2025 +0200
+++ b/libervia/backend/plugins/plugin_xep_0470.py	Sun Aug 03 23:35:21 2025 +0200
@@ -29,6 +29,7 @@
 from libervia.backend.core.log import getLogger
 from libervia.backend.core.core_types import SatXMPPEntity
 from libervia.backend.core import exceptions
+from libervia.backend.plugins.plugin_xep_0277 import MbData
 from libervia.backend.tools.common import uri, data_format, date_utils
 from libervia.backend.tools.utils import as_deferred, xmpp_date
 
@@ -43,7 +44,7 @@
     C.PI_TYPE: C.PLUG_TYPE_XEP,
     C.PI_MODES: C.PLUG_MODE_BOTH,
     C.PI_PROTOCOLS: [],
-    C.PI_DEPENDENCIES: ["XEP-0060"],
+    C.PI_DEPENDENCIES: ["XEP-0060", "XEP-0277"],
     C.PI_MAIN: "PubsubAttachments",
     C.PI_HANDLER: "yes",
     C.PI_DESCRIPTION: _("""Pubsub Attachments implementation"""),
@@ -152,14 +153,12 @@
     async def on_mb_send(
         self,
         client: SatXMPPEntity,
-        service: jid.JID,
-        node: str,
         item: domish.Element,
-        data: dict,
+        data: MbData,
     ) -> bool:
         """trigger to create attachment node on each publication"""
         await self.create_attachments_node(
-            client, service, node, item["id"], autocreate=True
+            client, data.service, data.node, item["id"], autocreate=True
         )
         return True
 
--- a/libervia/cli/cmd_blog.py	Fri Jul 04 12:33:42 2025 +0200
+++ b/libervia/cli/cmd_blog.py	Sun Aug 03 23:35:21 2025 +0200
@@ -313,6 +313,7 @@
 
         if metadata already exist, it will be overwritten
         """
+        extra = mb_data.setdefault("extra", {})
         if self.args.comments is not None:
             mb_data["allow_comments"] = self.args.comments
         if self.args.tag:
@@ -324,13 +325,13 @@
         if self.args.no_id_suffix:
             mb_data["user_friendly_id_suffix"] = False
         if self.args.alt_links:
-            mb_data.setdefault("extra", {})["alt_links"] = self.args.alt_links
+            extra["alt_links"] = self.args.alt_links
         if self.args.encrypt:
-            mb_data["encrypted"] = True
+            extra["encrypted"] = True
         if self.args.sign:
-            mb_data["signed"] = True
+            extra["signed"] = True
         if self.args.encrypt_for:
-            mb_data["encrypted_for"] = {"targets": self.args.encrypt_for}
+            extra["encrypted_for"] = {"targets": self.args.encrypt_for}
         self.handle_attachments(mb_data)
 
 
@@ -352,7 +353,11 @@
     async def start(self):
         self.current_syntax = await self.get_current_syntax()
         self.pubsub_item = self.args.item
-        mb_data = {}
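+        # Service and node now travel inside the serialised mb_data instead of being
+        # passed as separate bridge arguments.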
+        mb_data = {
+            "service": self.args.service or None,
+        }
+        if self.args.node:
+            mb_data["node"] = self.args.node
         self.set_mb_data_from_args(mb_data)
         if self.pubsub_item:
             mb_data["id"] = self.pubsub_item
@@ -361,8 +366,6 @@
 
         try:
             item_id = await self.host.bridge.mb_send(
-                self.args.service,
-                self.args.node,
                 data_format.serialise(mb_data),
                 self.profile,
             )
@@ -708,6 +711,9 @@
         await asyncio.gather(*coroutines)
 
     async def publish(self, content, mb_data):
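+        # Service and node are carried in mb_data itself; mb_send no longer takes
+        # them as separate arguments.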
+        mb_data["service"] = self.pubsub_service or None
+        if self.pubsub_node:
+            mb_data["node"] = self.pubsub_node
         await self.set_mb_data_content(content, mb_data)
 
         if self.pubsub_item:
@@ -715,9 +721,7 @@
 
         mb_data = data_format.serialise(mb_data)
 
-        await self.host.bridge.mb_send(
-            self.pubsub_service, self.pubsub_node, mb_data, self.profile
-        )
+        await self.host.bridge.mb_send(mb_data, self.profile)
         self.disp("Blog item published")
 
     def get_tmp_suff(self):
--- a/libervia/frontends/quick_frontend/quick_blog.py	Fri Jul 04 12:33:42 2025 +0200
+++ b/libervia/frontends/quick_frontend/quick_blog.py	Sun Aug 03 23:35:21 2025 +0200
@@ -294,9 +294,9 @@
         if self.blog.new_message_target == C.GROUP:
             mb_data["groups"] = list(self.blog.targets)
 
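+        # Target service and node are now part of the serialised mb_data.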
+        mb_data["service"] = self.service
+        mb_data["node"] = self.node
         self.blog.host.bridge.mb_send(
-            str(self.service or ""),
-            self.node or "",
             data_format.serialise(mb_data),
             profile=self.blog.profile,
         )