changeset 4386:c055042c01e3

component Email gateway: Convert mailing lists to pubsub nodes: - `Credentials` is now a Pydantic-based model. - Mailing list related emails are now detected and saved as pubsub items, using XEP-0277 blog items, and creating suitable nodes. The ID of the mailing list is used for the root node. - Mailing list items are currently restricted to the JID associated with the recipient. - Words in square brackets in the title (`[Like][That]`) are removed and converted to blog categories. - Methods to convert between MbData and email (in both directions) have been created. rel 462
author Goffi <goffi@goffi.org>
date Sun, 03 Aug 2025 23:45:48 +0200
parents a1ac33fe6b97
children a6270030968d
files libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py libervia/backend/plugins/plugin_comp_ap_gateway/pubsub_service.py libervia/backend/plugins/plugin_comp_email_gateway/__init__.py libervia/backend/plugins/plugin_comp_email_gateway/imap.py libervia/backend/plugins/plugin_comp_email_gateway/models.py libervia/backend/plugins/plugin_comp_email_gateway/pubsub_service.py
diffstat 6 files changed, 540 insertions(+), 66 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py	Sun Aug 03 23:45:45 2025 +0200
+++ b/libervia/backend/plugins/plugin_comp_ap_gateway/__init__.py	Sun Aug 03 23:45:48 2025 +0200
@@ -33,6 +33,7 @@
     Set,
     Tuple,
     Union,
+    cast,
     overload,
 )
 from urllib import parse
@@ -59,6 +60,7 @@
 from libervia.backend.core.log import getLogger
 from libervia.backend.memory import persistent
 from libervia.backend.memory.sqla_mapping import History, SubscriptionState
+from libervia.backend.plugins.plugin_xep_0277 import XEP_0277
 from libervia.backend.tools import utils
 from libervia.backend.tools.common import data_format, tls, uri
 from libervia.backend.tools.common.async_utils import async_lru
@@ -157,7 +159,7 @@
         self._p = host.plugins["XEP-0060"]
         self._a = host.plugins["XEP-0084"]
         self._e = host.plugins["XEP-0106"]
-        self._m = host.plugins["XEP-0277"]
+        self._m = cast(XEP_0277, host.plugins["XEP-0277"])
         self._v = host.plugins["XEP-0292"]
         self._refs = host.plugins["XEP-0372"]
         self._r = host.plugins["XEP-0424"]
@@ -526,6 +528,7 @@
 
         for now, only handle XMPP items to convert to AP
         """
+        assert self.client is not None
         url_type, url_args = self.parse_apurl(url)
         if url_type == TYPE_ITEM:
             try:
@@ -2228,6 +2231,7 @@
         @param public: if True, the activity will be addressed to public namespace
         @return: actor_id of the entity deleting the item, activity to send
         """
+        assert self.client is not None
         if node is None:
             node = self._m.namespace
 
@@ -2763,7 +2767,7 @@
                 create_kwargs={"subscribed": True},
             )
         else:
-            # it is a root item (i.e. not a reply to an other item)
+            # it is a root item (i.e. not a reply to another item)
             create = node == self._events.namespace
             cached_node = await self.host.memory.storage.get_pubsub_node(
                 client, service, node, with_subscriptions=True, create=create
--- a/libervia/backend/plugins/plugin_comp_ap_gateway/pubsub_service.py	Sun Aug 03 23:45:45 2025 +0200
+++ b/libervia/backend/plugins/plugin_comp_ap_gateway/pubsub_service.py	Sun Aug 03 23:45:48 2025 +0200
@@ -16,7 +16,7 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from typing import Optional, Tuple, List, Dict, Any, Union
+from typing import Optional, Tuple, List, Dict, Any
 from urllib.parse import urlparse
 from pathlib import Path
 from base64 import b64encode
@@ -25,7 +25,7 @@
 from twisted.internet import defer, threads
 from twisted.words.protocols.jabber import jid, error
 from twisted.words.xish import domish
-from wokkel import rsm, pubsub, disco
+from wokkel import rsm, pubsub
 
 from libervia.backend.core.i18n import _
 from libervia.backend.core import exceptions
@@ -265,7 +265,7 @@
         avatar_data = await self.get_avatar_data(client, requestor_actor_id, ap_account)
         return self.apg._a.build_item_metadata_elt(avatar_data)
 
-    def _blocking_b_6_4_encode_avatar(self, avatar_data: Dict[str, Any]) -> None:
+    def _blocking_b64encode_avatar(self, avatar_data: Dict[str, Any]) -> None:
         with avatar_data["path"].open("rb") as f:
             avatar_data["base64"] = b64encode(f.read()).decode()
 
@@ -287,7 +287,7 @@
             )
             if "base64" not in avatar_data:
                 await threads.deferToThread(
-                    self._blocking_b_6_4_encode_avatar, avatar_data
+                    self._blocking_b64encode_avatar, avatar_data
                 )
         else:
             if len(itemIdentifiers) > 1:
@@ -300,7 +300,7 @@
             if cache_data is None:
                 raise error.StanzaError("item-not-found")
             avatar_data = {"cache_uid": item_id, "path": cache_data["path"]}
-            await threads.deferToThread(self._blocking_b_6_4_encode_avatar, avatar_data)
+            await threads.deferToThread(self._blocking_b64encode_avatar, avatar_data)
 
         return self.apg._a.build_item_data_elt(avatar_data)
 
--- a/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py	Sun Aug 03 23:45:45 2025 +0200
+++ b/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py	Sun Aug 03 23:45:48 2025 +0200
@@ -23,7 +23,7 @@
 from email.mime.application import MIMEApplication
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
-from email.utils import formataddr, getaddresses, parseaddr
+from email.utils import formataddr, formatdate, getaddresses, make_msgid, parseaddr
 from functools import partial
 import hashlib
 from pathlib import Path
@@ -44,6 +44,7 @@
 from wokkel import data_form, disco, iwokkel
 from zope.interface import implementer
 
+from libervia.backend import G
 from libervia.backend.core import exceptions
 from libervia.backend.core.constants import Const as C
 from libervia.backend.core.core_types import SatXMPPComponent, SatXMPPEntity
@@ -51,12 +52,13 @@
 from libervia.backend.core.log import getLogger
 from libervia.backend.memory.persistent import LazyPersistentBinaryDict
 from libervia.backend.memory.sqla import select
-from libervia.backend.memory.sqla_mapping import PrivateIndBin
+from libervia.backend.memory.sqla_mapping import AccessModel, Affiliation, PrivateIndBin, PublishModel, PubsubAffiliation
 from libervia.backend.models.core import MessageData
 from libervia.backend.plugins.plugin_comp_email_gateway.pubsub_service import (
     EmailGWPubsubService,
 )
 from libervia.backend.plugins.plugin_exp_gre import GRE, GetDataHandler
+from libervia.backend.plugins.plugin_misc_text_syntaxes import TextSyntaxes
 from libervia.backend.plugins.plugin_sec_gre_encrypter_openpgp import NS_GRE_OPENPGP
 from libervia.backend.plugins.plugin_sec_gre_formatter_mime import NS_GRE_MIME
 from libervia.backend.plugins.plugin_xep_0033 import (
@@ -67,9 +69,10 @@
 from libervia.backend.plugins.plugin_xep_0077 import XEP_0077
 from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
 from libervia.backend.plugins.plugin_xep_0131 import HeadersData, Urgency, XEP_0131
+from libervia.backend.plugins.plugin_xep_0277 import Comment, MbData, XEP_0277
 from libervia.backend.plugins.plugin_xep_0373 import binary_to_ascii_armor
 from libervia.backend.plugins.plugin_xep_0498 import XEP_0498
-from libervia.backend.tools.common import regex
+from libervia.backend.tools.common import date_utils, regex, uri
 from libervia.backend.tools.utils import aio
 
 from .imap import IMAPClientFactory
@@ -91,7 +94,8 @@
     C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
     C.PI_PROTOCOLS: [],
     C.PI_DEPENDENCIES: [
-        "XEP-0033", "XEP-0077", "XEP-0106", "XEP-0498", "GRE", "GRE-MIME", "GRE-OpenPGP"
+        "XEP-0033", "XEP-0077", "XEP-0106", "XEP-0277", "XEP-0498", "GRE", "GRE-MIME",
+        "GRE-OpenPGP", "PUBSUB_CACHE", "TEXT_SYNTAXES"
     ],
     C.PI_RECOMMENDATIONS: [],
     C.PI_MAIN: "EmailGatewayComponent",
@@ -140,8 +144,10 @@
         )
         self._e = cast(XEP_0106, host.plugins["XEP-0106"])
         self._shim = cast(XEP_0131, host.plugins["XEP-0131"])
+        self._mb = cast(XEP_0277, host.plugins["XEP-0277"])
         self._pfs = cast(XEP_0498, host.plugins["XEP-0498"])
         self._gre = cast(GRE, host.plugins["GRE"])
+        self._syntax = cast(TextSyntaxes, host.plugins["TEXT_SYNTAXES"])
         # TODO: For the moment, all credentials are kept in cache; we should only keep the
         #   X latest.
         self.users_data: dict[jid.JID, UserData] = {}
@@ -173,7 +179,7 @@
             )
             result = await session.execute(query)
             return {
-                jid.JID(p.key[len(PREFIX_KEY_CREDENTIALS) :]): p.value
+                jid.JID(p.key[len(PREFIX_KEY_CREDENTIALS) :]): Credentials(**p.value)
                 for p in result.scalars()
             }
 
@@ -182,7 +188,7 @@
         registered_data = await self.get_registered_users()
         for user_jid, credentials in registered_data.items():
             user_data = self.users_data[user_jid] = UserData(credentials=credentials)
-            if not credentials["imap_success"]:
+            if not credentials.imap_success:
                 log.warning(
                     f"Ignoring unsuccessful IMAP credentials of {user_jid}. This user "
                     "won't receive message from this gateway."
@@ -340,7 +346,7 @@
         return mess_data
 
     def jid_to_email(
-        self, client: SatXMPPEntity, address_jid: jid.JID, credentials: dict[str, str]
+        self, client: SatXMPPEntity, address_jid: jid.JID, credentials: Credentials
     ) -> str:
         """Convert a JID to an email address.
 
@@ -354,7 +360,7 @@
         if address_jid and address_jid.host.endswith(str(client.jid)):
             return self._e.unescape(address_jid.user)
         else:
-            email_address = credentials["user_email"]
+            email_address = credentials.user_email
             if address_jid:
                 email_address = formataddr((f"xmpp:{address_jid}", email_address))
             return email_address
@@ -404,7 +410,7 @@
         assert isinstance(body, bytes)
         credentials = await self.get_credentials(from_jid)
 
-        sender_domain = credentials["user_email"].split("@", 1)[-1]
+        sender_domain = credentials.user_email.split("@", 1)[-1]
         recipients = []
         if to_email is not None:
             recipients.append(to_email.encode())
@@ -422,14 +428,14 @@
             raise exceptions.InternalError("No recipient found.")
 
         await smtp.sendmail(
-            credentials["smtp_host"].encode(),
-            credentials["user_email"].encode(),
+            credentials.smtp_host.encode(),
+            credentials.user_email.encode(),
             recipients,
             body,
             senderDomainName=sender_domain,
-            port=int(credentials["smtp_port"]),
-            username=credentials["smtp_username"].encode(),
-            password=credentials["smtp_password"].encode(),
+            port=int(credentials.smtp_port),
+            username=credentials.smtp_username.encode(),
+            password=credentials.smtp_password.encode(),
             requireAuthentication=True,
             # TODO: only STARTTLS is supported right now, implicit TLS should be supported
             #   too.
@@ -468,16 +474,16 @@
 
         if isinstance(body, bytes):
             assert to_email is not None
-            sender_domain = credentials["user_email"].split("@", 1)[-1]
+            sender_domain = credentials.user_email.split("@", 1)[-1]
             await smtp.sendmail(
-                credentials["smtp_host"].encode(),
-                credentials["user_email"].encode(),
+                credentials.smtp_host.encode(),
+                credentials.user_email.encode(),
                 [to_email.encode()],
                 body,
                 senderDomainName=sender_domain,
-                port=int(credentials["smtp_port"]),
-                username=credentials["smtp_username"].encode(),
-                password=credentials["smtp_password"].encode(),
+                port=int(credentials.smtp_port),
+                username=credentials.smtp_username.encode(),
+                password=credentials.smtp_password.encode(),
                 requireAuthentication=True,
                 # TODO: only STARTTLS is supported right now, implicit TLS should be supported
                 #   too.
@@ -489,7 +495,7 @@
         if subject is not None:
             msg["Subject"] = subject
         msg["From"] = formataddr(
-            (credentials["user_name"] or None, credentials["user_email"])
+            (credentials.user_name or None, credentials.user_email)
         )
         if extra.addresses:
             assert extra.addresses.to
@@ -527,7 +533,7 @@
             assert to_email is not None
             msg["To"] = to_email
 
-        sender_domain = credentials["user_email"].split("@", 1)[-1]
+        sender_domain = credentials.user_email.split("@", 1)[-1]
 
         if extra.headers:
             if extra.headers.keywords:
@@ -543,14 +549,14 @@
                 msg["Autocrypt"] = extra.headers.autocrypt
 
         await smtp.sendmail(
-            credentials["smtp_host"].encode(),
-            credentials["user_email"].encode(),
+            credentials.smtp_host.encode(),
+            credentials.user_email.encode(),
             [to_email.encode()],
             msg.as_bytes(),
             senderDomainName=sender_domain,
-            port=int(credentials["smtp_port"]),
-            username=credentials["smtp_username"].encode(),
-            password=credentials["smtp_password"].encode(),
+            port=int(credentials.smtp_port),
+            username=credentials.smtp_username.encode(),
+            password=credentials.smtp_password.encode(),
             requireAuthentication=True,
             # TODO: only STARTTLS is supported right now, implicit TLS should be supported
             #   too.
@@ -798,10 +804,10 @@
         @param email: Parsed email.
         """
         assert self.client is not None
-        user_email = user_data.credentials["user_email"]
-        name, email_addr = parseaddr(email["from"])
-        email_addr = email_addr.lower()
-        from_jid = jid.JID(None, (self._e.escape(email_addr), self.client.jid.host, None))
+        user_email = user_data.credentials.user_email
+        author_name, author_email = parseaddr(email["from"])
+        author_email = author_email.lower()
+        from_jid = jid.JID(None, (self._e.escape(author_email), self.client.jid.host, None))
 
         # Get the email body
         body_mime = email.get_body(("plain",))
@@ -854,9 +860,9 @@
             ]
 
         # Set noreply flag
-        # The is no flag to indicate a no-reply message, so we check common user parts in
-        # from and reply-to headers.
-        from_addresses = [email_addr]
+        # There is no flag to indicate a no-reply message, so we check common user parts
+        # in from and reply-to headers.
+        from_addresses = [author_email]
         if reply_to_addresses:
             from_addresses.extend(
                 addr for a in reply_to_addresses if (addr := parseaddr(a)[1])
@@ -927,12 +933,382 @@
             await self.handle_attachment(part, user_jid)
 
         client = self.client.get_virtual_client(from_jid)
+        # Now that the message is parsed, we check if it's a mailing list.
+        list_ids   = email.get_all("list-id")
+        if list_ids:
+            try:
+                await self.handle_mailing_list(
+                    client,
+                    user_data,
+                    author_name,
+                    author_email,
+                    list_ids[-1],
+                    email,
+                    user_jid,
+                    body,
+                    subject,
+                    extra
+                )
+            except exceptions.DataError as e:
+                log.warning(f"Can't parse mailing list email: {e}.")
+            except Exception:
+                log.exception("Can't parse mailing list email.")
+        else:
+            await client.sendMessage(
+                user_jid,
+                {"": body},
+                {"": subject} if subject else None,
+                extra=extra,
+            )
+    def parse_references(self, email: EmailMessage) -> list[str]:
+        """Extract message IDs from the "References" header.
 
-        await client.sendMessage(
-            user_jid,
-            {"": body},
-            {"": subject} if subject else None,
-            extra=extra,
+        @param email: The parsed email message.
+        @returns: Message IDs
+        """
+        references = []
+        for header in email.get_all('references', []):
+            for token in header.split():
+                _, address = parseaddr(token)
+                if address:
+                    references.append(address)
+        return references
+
+    def email_to_mb_data(
+        self,
+        email: EmailMessage,
+        service: jid.JID,
+        node: str,
+        item_id: str,
+        author_name: str,
+        author_email: str
+    ) -> MbData:
+        """Convert an email to blog data.
+
+        @param email: email to convert.
+        @return: Blog data.
+        """
+        tags = set()
+
+        # Title
+        title = email.get('subject', '')
+        def strip_tags(m):
+            value = m.group()[1:-1].strip().lower()
+            if value:
+                tags.add(value)
+            return ""
+        title = re.sub(r"\[.+?\]", strip_tags, title).strip()
+
+        # Dates
+        date_header = email.get('Date')
+        if date_header:
+            try:
+                published = date_utils.date_parse(date_header)
+            except ValueError:
+                published = None
+        else:
+            published = None
+
+        # Body
+        content = None
+        content_xhtml = None
+
+        if email.is_multipart():
+            # We'll collect possible plain and html content parts
+            plain_parts = []
+            html_parts = []
+
+            for part in email.walk():
+                content_type = part.get_content_type()
+                content_disposition = part.get('Content-Disposition', '').lower()
+
+                # Skip attachments
+                if 'attachment' in content_disposition:
+                    continue
+
+                payload = part.get_payload(decode=True)
+                if payload is None:
+                    continue
+                payload = cast(bytes, payload)
+
+                charset = part.get_content_charset() or 'utf-8'
+                try:
+                    payload_text = payload.decode(charset, errors='replace')
+                except (LookupError, TypeError):
+                    payload_text = payload.decode('utf-8', errors='replace')
+
+                if content_type == 'text/plain':
+                    plain_parts.append(payload_text)
+                elif content_type == 'text/html':
+                    html_parts.append(payload_text)
+
+            # Prefer first valid parts for safety
+            if plain_parts:
+                content = plain_parts[0]
+            if html_parts:
+                content_xhtml = html_parts[0]
+        else:
+            # Single part email
+            payload = email.get_payload(decode=True)
+            if payload is not None:
+                payload = cast(bytes, payload)
+                charset = email.get_content_charset() or 'utf-8'
+
+                try:
+                    content_text = payload.decode(charset, errors='replace')
+                except (LookupError, TypeError):
+                    content_text = payload.decode('utf-8', errors='replace')
+
+                content_type = email.get_content_type()
+                if content_type == 'text/plain':
+                    content = content_text
+                elif content_type == 'text/html':
+                    content_xhtml = content_text
+
+        if content_xhtml is not None:
+            content_xhtml = self._syntax.clean_xhtml(content_xhtml)
+
+        return MbData(
+            service=service,
+            node=node,
+            id=item_id,
+            published=published,
+            title=title,
+            content=content,
+            content_xhtml=content_xhtml,
+            author=author_name,
+            author_email=author_email,
+            tags=list(tags)
+        )
+
+    async def mb_data_to_email(
+        self,
+        credentials: Credentials,
+        mb_data: MbData
+    ) -> EmailMessage:
+        """Convert blog data to an email message.
+
+        @param mb_data: Blog data to convert.
+        @return: Email message.
+        """
+        email = EmailMessage()
+
+        # Title
+        title_parts = []
+        tags = []
+
+        for tag in mb_data.tags:
+            if stripped_tag := tag.strip():
+                title_parts.append(f"[{stripped_tag}]")
+                tags.append(stripped_tag)
+
+        if mb_data.title:
+            title_parts.append(mb_data.title)
+
+        email['Subject'] = ' '.join(title_parts)
+
+        if tags:
+            email['Keywords'] = ', '.join(tags)
+
+        # From
+        # FIXME: Check email according to sender.
+        if mb_data.author and mb_data.author_email:
+            email['From'] = f"{mb_data.author} <{mb_data.author_email}>"
+        elif mb_data.author:
+            email['From'] = f"{mb_data.author} <{credentials.user_email}>"
+        elif mb_data.author_email:
+            email['From'] = mb_data.author_email
+        else:
+            email['From'] = credentials.user_email
+
+        # Message ID
+        if mb_data.id:
+            email['Message-ID'] = f"<{mb_data.id}>"
+        else:
+            msg_id = make_msgid()
+            email['Message-ID'] = msg_id
+            mb_data.id = msg_id[1:-1]
+
+        # In-Reply-To
+        if mb_data.in_reply_tos:
+            # Use the first reply-to reference
+            in_reply_to = mb_data.in_reply_tos[0]
+            if in_reply_to.ref:
+                email['In-Reply-To'] = f"<{in_reply_to.ref}>"
+
+        # Dates
+        date_set = False
+        if mb_data.published is not None:
+            try:
+                email['Date'] = formatdate(mb_data.published, localtime=True)
+                date_set = True
+            except Exception:
+                pass
+
+        if not date_set and mb_data.updated is not None:
+            try:
+                email['Date'] = formatdate(mb_data.updated, localtime=True)
+            except Exception:
+                pass
+
+        # Content
+        if mb_data.content_xhtml:
+            # XHTML content
+            xhtml_content = mb_data.content_xhtml
+            if not mb_data.content:
+                mb_data.content = await self._syntax.convert(
+                    xhtml_content,
+                    self._syntax.SYNTAX_XHTML,
+                    self._syntax.SYNTAX_TEXT,
+                    False,
+                )
+
+            email.set_content(mb_data.content)
+            email.add_alternative(xhtml_content, subtype='html')
+        elif mb_data.content:
+            email.set_content(mb_data.content)
+        else:
+            email.set_content("")
+
+        return email
+
+    async def handle_mailing_list(
+        self,
+        client: SatXMPPEntity,
+        user_data: UserData,
+        author_name: str,
+        author_email: str,
+        list_id_header: str,
+        email: EmailMessage,
+        user_jid: jid.JID,
+        body: str,
+        subject: str|None,
+        extra: dict
+    ) -> None:
+        """Handle emails from mailing lists.
+
+        Mailing list emails are converted to pubsub blogs.
+        """
+        assert self.client is not None
+        pubsub_service = self.client.jid
+        message_id_list = email.get_all("message-id")
+        if not message_id_list:
+            raise exceptions.DataError("Missing message ID.")
+        message_id = message_id_list[-1].strip()
+        if message_id.startswith("<") and message_id.endswith(">"):
+            message_id = message_id[1:-1]
+        if not message_id:
+            raise exceptions.DataError("Emtpy message ID.")
+
+        list_name, list_id = parseaddr(list_id_header, strict=False)
+        list_name = list_name.strip()
+        list_id = list_id.strip()
+        if not list_id:
+            raise exceptions.DataError(
+                f"Mailing list ID is empty, we can't parse id: {list_id_header=}."
+            )
+        root_node = await G.storage.get_pubsub_node(
+            client,
+            pubsub_service,
+            list_id,
+            with_subscriptions=True,
+            create=True,
+            create_kwargs={
+                "access_model": AccessModel.whitelist,
+                "publish_model": PublishModel.publishers,
+                "affiliations": [PubsubAffiliation(
+                    entity = user_jid,
+                    affiliation = Affiliation.owner
+                )],
+                "subscribed": True
+            },
+        )
+        assert root_node is not None
+
+        in_reply_to_value = ''.join(email.get_all("in-reply-to", []))
+        __, in_reply_to = parseaddr(in_reply_to_value)
+        references = self.parse_references(email)
+        if in_reply_to:
+            if not references:
+                log.warning(
+                    '"References" header should not be empty when "In-Reply-To" is set.'
+                )
+                references = [in_reply_to]
+            else:
+                if references[-1] != in_reply_to:
+                    log.warning('Last ID in "References" should be "In-Reply-To".')
+                    references.append(in_reply_to)
+        if references:
+            # We check that the top message of the thread has a corresponding item.
+            top_item_id = references[0]
+            parent_node_name = self._mb.get_comments_node(top_item_id)
+            parent_node_uri = uri.build_xmpp_uri(
+                "pubsub",
+                path=root_node.service.full(),
+                node=parent_node_name,
+            )
+            top_items = await G.storage.get_items(root_node, item_ids=[top_item_id])
+            if not top_items:
+                # The top item is missing, we make an empty one.
+                empty_item_data = MbData(
+                    service=pubsub_service,
+                    node=root_node.name,
+                    id=top_item_id,
+                    title="missing item",
+                    content="missing item",
+                    author_jid=pubsub_service,
+                    comments=[
+                        Comment(
+                            uri=parent_node_uri,
+                            service=pubsub_service,
+                            node=parent_node_name
+                        )
+                    ]
+                )
+                await G.storage.cache_pubsub_items(
+                    client,
+                    root_node,
+                    [await empty_item_data.to_element(client)]
+                )
+            parent_node = await G.storage.get_pubsub_node(
+                client,
+                pubsub_service,
+                parent_node_name,
+                with_subscriptions=True,
+                create=True,
+                create_kwargs={
+                    "access_model": AccessModel.whitelist,
+                    "publish_model": PublishModel.publishers,
+                    "subscribed": True,
+                    "affiliations": [PubsubAffiliation(
+                        entity = user_jid,
+                        affiliation = Affiliation.owner
+                    )],
+                },
+            )
+        else:
+            parent_node = root_node
+            parent_node_name = cast(str, root_node.name)
+
+        mb_data = self.email_to_mb_data(
+            email,
+            pubsub_service,
+            parent_node_name,
+            message_id,
+            author_name,
+            author_email
+        )
+        mb_data.comments.append(
+            Comment(
+                service = pubsub_service,
+                node = self._mb.get_comments_node(message_id),
+            )
+        )
+        await G.storage.cache_pubsub_items(
+            client,
+            parent_node,
+            [await mb_data.to_element(client)],
+            [mb_data.model_dump(mode="json")]
         )
 
     async def handle_attachment(self, part: EmailMessage, recipient_jid: jid.JID) -> None:
@@ -1009,7 +1385,7 @@
             connected,
         )
         reactor.connectTCP(
-            credentials["imap_host"], int(credentials["imap_port"]), factory
+            credentials.imap_host, int(credentials.imap_port), factory
         )
         await connected
 
@@ -1051,7 +1427,9 @@
             return True
 
         self.validate_imap_smtp_form(submit_form)
-        credentials = {key: field.value for key, field in submit_form.fields.items()}
+        credentials = Credentials(**{
+            key: field.value for key, field in submit_form.fields.items()
+        })
         user_data = self.users_data.get(from_jid)
         if user_data is None:
             # The user is not in cache, we cache current credentials.
@@ -1064,12 +1442,12 @@
             await self.connect_imap(from_jid, user_data)
         except Exception as e:
             log.warning(f"Can't connect to IMAP server for {from_jid}")
-            credentials["imap_success"] = False
+            credentials.imap_success = False
             await self.storage.aset(key, credentials)
             raise e
         else:
             log.debug(f"Connection successful to IMAP server for {from_jid}")
-            credentials["imap_success"] = True
+            credentials.imap_success = True
             await self.storage.aset(key, credentials)
             return True
 
@@ -1082,7 +1460,7 @@
     ) -> None:
         from_jid = jid.JID(iq_elt["from"]).userhostJID()
         credentials = await self.get_credentials(from_jid)
-        form.addField(data_form.Field(var="sender_id", value=credentials["user_email"]))
+        form.addField(data_form.Field(var="sender_id", value=credentials.user_email))
 
 
 
--- a/libervia/backend/plugins/plugin_comp_email_gateway/imap.py	Sun Aug 03 23:45:45 2025 +0200
+++ b/libervia/backend/plugins/plugin_comp_email_gateway/imap.py	Sun Aug 03 23:45:48 2025 +0200
@@ -225,8 +225,8 @@
         """
         credentials = user_data.credentials
         self.user_data = user_data
-        self.username = credentials["imap_username"]
-        self.password = credentials["imap_password"]
+        self.username = credentials.imap_username
+        self.password = credentials.imap_password
         self.on_new_email = on_new_email
         self._connected = connected
 
--- a/libervia/backend/plugins/plugin_comp_email_gateway/models.py	Sun Aug 03 23:45:45 2025 +0200
+++ b/libervia/backend/plugins/plugin_comp_email_gateway/models.py	Sun Aug 03 23:45:48 2025 +0200
@@ -17,11 +17,24 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 from dataclasses import dataclass
-from typing import Any
+from pydantic import BaseModel, ConfigDict
 from twisted.mail import imap4
 
 
-Credentials = dict[str, Any]
+class Credentials(BaseModel):
+    model_config = ConfigDict(extra='forbid')
+    user_name: str
+    user_email: str
+    smtp_host: str
+    smtp_port: int
+    smtp_username: str
+    smtp_password: str
+    imap_success: bool = False
+    imap_host: str
+    imap_port: int
+    imap_username: str
+    imap_password: str
+
 
 
 @dataclass
--- a/libervia/backend/plugins/plugin_comp_email_gateway/pubsub_service.py	Sun Aug 03 23:45:45 2025 +0200
+++ b/libervia/backend/plugins/plugin_comp_email_gateway/pubsub_service.py	Sun Aug 03 23:45:48 2025 +0200
@@ -16,18 +16,22 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, cast
+
 from twisted.internet import defer
-from twisted.words.protocols.jabber import jid, error
+from twisted.words.protocols.jabber import error, jid
 from twisted.words.xish import domish
 from wokkel import data_form, disco, pubsub, rsm
 
+from libervia.backend import G
+from libervia.backend.core import exceptions
+from libervia.backend.core.core_types import SatXMPPComponent
 from libervia.backend.core.i18n import _
-from libervia.backend.core.constants import Const as C
 from libervia.backend.core.log import getLogger
+from libervia.backend.memory.sqla_mapping import AccessModel, Affiliation
+from libervia.backend.plugins.plugin_pubsub_cache import PubsubCache
 from libervia.backend.plugins.plugin_xep_0498 import NodeData
 from libervia.backend.tools.utils import ensure_deferred
-
 if TYPE_CHECKING:
     from . import EmailGatewayComponent
 
@@ -57,31 +61,106 @@
         self.host = self.gateway.host
         self.service = service
         self._pfs = service._pfs
+        self._ps_cache = cast(PubsubCache, G.host.plugins["PUBSUB_CACHE"])
         super().__init__()
 
+    @property
+    def client(self) -> SatXMPPComponent:
+        client = self.gateway.client
+        assert client is not None
+        return client
+
     def getNodes(
         self, requestor: jid.JID, service: jid.JID, nodeIdentifier: str
     ) -> defer.Deferred[list[str]]:
-        return defer.succeed([self._pfs.namespace])
+        return defer.ensureDeferred(self.get_nodes(requestor, service, nodeIdentifier))
+
+    async def get_nodes(
+        self,
+        requestor: jid.JID,
+        service: jid.JID,
+        node_id: str
+    ) -> list[str]:
+        nodes = await G.storage.get_pubsub_nodes(self.client, self.client.jid)
+        return [self._pfs.namespace] + [cast(str, node.name) for node in nodes]
 
     @ensure_deferred
     async def items(
         self,
         request: rsm.PubSubRequest,
     ) -> tuple[list[domish.Element], rsm.RSMResponse | None]:
-        client = self.gateway.client
-        assert client is not None
-        sender = request.sender.userhostJID()
-        if not client.is_local(sender):
+        client = self.client
+        requestor_jid = request.sender.userhostJID()
+        if not client.is_local(requestor_jid):
             raise error.StanzaError("forbidden")
 
         if request.nodeIdentifier != self._pfs.namespace:
-            return [], None
+            return await self.items_from_mailing_list(request, requestor_jid)
 
-        files = await self.host.memory.get_files(client, sender)
+        files = await self.host.memory.get_files(client, requestor_jid)
         node_data = NodeData.from_files_data(client.jid, files)
         return node_data.to_elements(), None
 
+    async def items_from_mailing_list(
+        self,
+        request: rsm.PubSubRequest,
+        requestor_jid: jid.JID
+    ) -> tuple[list[domish.Element], rsm.RSMResponse|None]:
+        """Handle items coming from mailing lists.
+
+        @param request: Pubsub request.
+        @param requestor_jid: Bare jid of the requestor.
+        @return: Items matching request, if allowed, and RSM response.
+        @raise error.StanzaError: One of:
+            - ``item-not-found`` if no corresponding node or item is found
+            - ``forbidden`` if the requestor does not have sufficient privileges
+        """
+        node = request.nodeIdentifier
+        node = await G.storage.get_pubsub_node(
+            self.client,
+            self.client.jid,
+            node,
+            with_affiliations=True
+        )
+        if node is None:
+            raise error.StanzaError("item-not-found")
+
+        match str(node.access_model):
+            case AccessModel.open:
+                pass
+            case AccessModel.whitelist:
+                for affiliation in node.affiliations:
+                    if (
+                        affiliation.entity == requestor_jid
+                        and affiliation.affiliation in {
+                            Affiliation.owner, Affiliation.publisher, Affiliation.member
+                        }
+                    ):
+                        break
+                else:
+                    raise error.StanzaError("forbidden")
+            case _:
+                raise exceptions.InternalError(
+                    f"Unmanaged access model: {node.access_model}"
+                )
+
+        pubsub_items, metadata = await self._ps_cache.get_items_from_cache(
+            self.client,
+            node,
+            request.maxItems,
+            request.itemIdentifiers,
+            request.subscriptionIdentifier,
+            request.rsm
+        )
+        if rsm_data := metadata.get("rsm"):
+            rsm_response = rsm.RSMResponse(**rsm_data)
+        else:
+            rsm_response = None
+        return (
+            [cast(domish.Element, ps_item.data) for ps_item in pubsub_items],
+            rsm_response
+        )
+
     @ensure_deferred
     async def retract(self, request: rsm.PubSubRequest) -> None:
         client = self.gateway.client