view libervia/backend/plugins/plugin_comp_email_gateway/__init__.py @ 4401:ae26233b655f default tip

doc (components): Add message cleaning section to email gateway doc: fix 464
author Goffi <goffi@goffi.org>
date Thu, 11 Sep 2025 21:17:51 +0200
parents fe09446a09ce
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia Email Gateway Component
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
from email import encoders
from email.header import decode_header
from email.message import EmailMessage
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr, formatdate, getaddresses, make_msgid, parseaddr
from functools import partial
import hashlib
from pathlib import Path
import re
import shutil
import tempfile
from typing import NamedTuple, TYPE_CHECKING, cast

from pydantic import BaseModel
from twisted.internet import defer, reactor
from twisted.internet.threads import deferToThread
from twisted.mail import smtp
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber import error as jabber_error
from twisted.words.protocols.jabber.error import StanzaError
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import data_form, disco, iwokkel
from zope.interface import implementer

from libervia.backend import G
from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPComponent, SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.memory.persistent import LazyPersistentBinaryDict
from libervia.backend.memory.sqla import select
from libervia.backend.memory.sqla_mapping import AccessModel, Affiliation, PrivateIndBin, PublishModel, PubsubAffiliation
from libervia.backend.models.core import MessageData
from libervia.backend.plugins.plugin_comp_email_gateway.pubsub_service import (
    EmailGWPubsubService,
)
from libervia.backend.plugins.plugin_exp_gre import GRE, GetDataHandler
from libervia.backend.plugins.plugin_misc_text_syntaxes import TextSyntaxes
from libervia.backend.plugins.plugin_sec_gre_encrypter_openpgp import NS_GRE_OPENPGP
from libervia.backend.plugins.plugin_sec_gre_formatter_mime import NS_GRE_MIME
from libervia.backend.plugins.plugin_xep_0033 import (
    AddressType,
    AddressesData,
    RECIPIENT_FIELDS,
)
from libervia.backend.plugins.plugin_xep_0077 import XEP_0077
from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
from libervia.backend.plugins.plugin_xep_0131 import HeadersData, Urgency, XEP_0131
from libervia.backend.plugins.plugin_xep_0277 import Comment, MbData, XEP_0277
from libervia.backend.plugins.plugin_xep_0373 import binary_to_ascii_armor
from libervia.backend.plugins.plugin_xep_0498 import XEP_0498
from libervia.backend.plugins.plugin_xep_0499 import XEP_0499, ExtDiscoOptions
from libervia.backend.tools.common import date_utils, regex, uri
from libervia.backend.tools.utils import aio

from .cleaning import convert_to_html_and_detect_noise
from .imap import IMAPClientFactory
from .models import Credentials, UserData

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend


# Module-level logger.
log = getLogger(__name__)

# Import name of the plugin; also used as the persistent storage namespace for the
# component data (see "_init" and "get_registered_users").
IMPORT_NAME = "email-gateway"
# Human readable name, used in the disco identity advertised by the component.
NAME = "Libervia Email Gateway"

# Metadata describing this plugin (mode, type, dependencies, entry point class…).
PLUGIN_INFO = {
    C.PI_NAME: "Email Gateway Component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: [
        "XEP-0033", "XEP-0077", "XEP-0106", "XEP-0277", "XEP-0498", "XEP-0499", "GRE", "GRE-MIME",
        "GRE-OpenPGP", "PUBSUB_CACHE", "TEXT_SYNTAXES"
    ],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "EmailGatewayComponent",
    C.PI_HANDLER: C.BOOL_TRUE,
    C.PI_DESCRIPTION: D_(
        "Gateway to handle email. Usual emails are handled as message, while mailing "
        "lists are converted to pubsub blogs."
    ),
}

# Name of the configuration section of this component.
CONF_SECTION = f"component {IMPORT_NAME}"
# Per-user credentials are stored under keys made of this prefix followed by the
# user's bare JID.
PREFIX_KEY_CREDENTIALS = "CREDENTIALS_"
# Format string for a credentials key; fill it with "from_jid" (bare JID).
KEY_CREDENTIALS = f"{PREFIX_KEY_CREDENTIALS}{{from_jid}}"


# Metadata of a local file: its path, its hash (as a string) and its size.
FileMetadata = NamedTuple(
    "FileMetadata",
    [
        ("path", Path),
        ("hash", str),
        ("size", int),
    ],
)


class SendMailExtra(BaseModel):
    """Optional extra data used when sending an email."""

    # XEP-0033 extended addressing data (multiple recipients), if any.
    addresses: AddressesData | None = None
    # XEP-0131 headers data (e.g. keywords, urgency), if any.
    headers: HeadersData | None = None


class EmailGatewayComponent(GetDataHandler):
    """XMPP component acting as a gateway between XMPP users and their email accounts.

    Registered local users provide IMAP/SMTP credentials. Messages they send to gateway
    JIDs are sent as emails through SMTP, and incoming emails retrieved through IMAP are
    handled by ``on_new_email``.
    """

    # Plugin import name, exposed as a class attribute.
    IMPORT_NAME = IMPORT_NAME
    # GRE formatter (MIME) and encrypter (OpenPGP) namespaces supported by the gateway.
    gre_formatters = [NS_GRE_MIME]
    gre_encrypters = [NS_GRE_OPENPGP]
    verbose = 0

    def __init__(self, host: "LiberviaBackend") -> None:
        """Create the Email Gateway component.

        Plugins used by the component are resolved from ``host.plugins``, the in-band
        registration (XEP-0077) callbacks are registered, and a "message_received"
        trigger is installed with priority -1000.

        @param host: The Libervia backend instance.
        """
        self.host = host
        # Component client, set in "profile_connecting".
        self.client: SatXMPPComponent | None = None
        # NOTE: attribute name kept as is, it is read in "profile_connecting".
        self.initalized = False
        # Persistent storage, created in "_init".
        self.storage: LazyPersistentBinaryDict | None = None

        plugins = host.plugins
        self._iq_register = cast(XEP_0077, plugins["XEP-0077"])
        self._iq_register.register_handler(
            self._on_registration_form, self._on_registration_submit
        )
        # Other plugins the gateway relies on.
        self._e = cast(XEP_0106, plugins["XEP-0106"])
        self._shim = cast(XEP_0131, plugins["XEP-0131"])
        self._mb = cast(XEP_0277, plugins["XEP-0277"])
        self._pfs = cast(XEP_0498, plugins["XEP-0498"])
        self._ext_disco = cast(XEP_0499, plugins["XEP-0499"])
        self._gre = cast(GRE, plugins["GRE"])
        self._syntax = cast(TextSyntaxes, plugins["TEXT_SYNTAXES"])

        # TODO: For the moment, all credentials are kept in cache; we should only keep
        #   the X latest.
        self.users_data: dict[jid.JID, UserData] = {}
        self.files_path = host.get_local_path(None, C.FILES_DIR)
        host.trigger.add_with_check(
            "message_received", self, self._message_received_trigger, priority=-1000
        )

    async def _init(self) -> None:
        """Finish initialisation once the component profile is connected.

        Advertises the gateway disco identity, creates the persistent storage, then
        connects already registered users to their IMAP server.
        """
        client = self.client
        assert client is not None
        gateway_identity = disco.DiscoIdentity("gateway", "smtp", NAME)
        client.identities.append(gateway_identity)
        self.storage = LazyPersistentBinaryDict(IMPORT_NAME, client.profile)
        await self.connect_registered_users()

    @aio
    async def get_registered_users(self) -> dict[jid.JID, Credentials]:
        """Load the credentials of every user registered to the gateway.

        Rows are read from the private binary storage of the component profile, in the
        gateway namespace, using keys starting with ``PREFIX_KEY_CREDENTIALS``; the rest
        of the key is the user JID.

        @return: a mapping from user JID to credentials data.
        """
        assert self.client is not None
        storage = self.host.memory.storage
        profile_id = storage.profiles[self.client.profile]
        prefix_len = len(PREFIX_KEY_CREDENTIALS)
        registered: dict[jid.JID, Credentials] = {}
        async with storage.session() as session:
            result = await session.execute(
                select(PrivateIndBin).where(
                    PrivateIndBin.profile_id == profile_id,
                    PrivateIndBin.namespace == IMPORT_NAME,
                    PrivateIndBin.key.startswith(PREFIX_KEY_CREDENTIALS),
                )
            )
            for row in result.scalars():
                user_jid = jid.JID(row.key[prefix_len:])
                registered[user_jid] = Credentials(**row.value)
        return registered

    async def connect_registered_users(self) -> None:
        """Cache data of every registered user and connect them to their IMAP server.

        Users whose IMAP credentials are not flagged as successful are only cached, with
        a warning. An IMAP connection failure is logged and doesn't stop the loop.
        """
        registered_data = await self.get_registered_users()
        for user_jid, credentials in registered_data.items():
            user_data = UserData(credentials=credentials)
            self.users_data[user_jid] = user_data
            if not credentials.imap_success:
                log.warning(
                    f"Ignoring unsuccessful IMAP credentials of {user_jid}. This user "
                    "won't receive message from this gateway."
                )
                continue
            try:
                await self.connect_imap(user_jid, user_data)
            except Exception as e:
                log.warning(f"Can't connect {user_jid} to IMAP: {e}.")
            else:
                log.debug(f"Connection to IMAP server successful for {user_jid}.")

    def get_handler(self, __) -> tuple[XMPPHandler, XMPPHandler]:
        """Return the XMPP handlers of the component.

        @return: the gateway handler and the pubsub service handler.
        """
        gateway_handler = EmailGatewayHandler()
        pubsub_service = EmailGWPubsubService(self)
        return gateway_handler, pubsub_service

    async def profile_connecting(self, client: SatXMPPEntity) -> None:
        """Store the component client, and run one-time initialisation if needed.

        @param client: Component client being connected.
        """
        assert isinstance(client, SatXMPPComponent)
        self.client = client
        self._gre.register_get_data_handler(client, self)
        if self.initalized:
            return
        await self._init()
        self.initalized = True

    def _message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> bool:
        """Chain the gateway handling to the post-treatment of a received message.

        Messages received by other clients are left untouched.

        @param client: Client session which received the message.
        @param message_elt: Received message element.
        @param post_treat: Post-treatment deferred, fired with the parsed message data.
        @return: Always True.
        """
        if client != self.client:
            return True

        def handle_parsed_message(mess_data: MessageData) -> defer.Deferred:
            return defer.ensureDeferred(
                self.on_message(client, mess_data, message_elt)
            )

        post_treat.addCallback(handle_parsed_message)
        return True

    async def on_message(
        self, client: SatXMPPEntity, mess_data: MessageData, message_elt: domish.Element
    ) -> dict:
        """Called once message has been parsed, send it as an email.

        Only "chat" and "normal" messages from local entities are handled. The message
        must be addressed either to a gateway JID (whose escaped user part is the
        recipient email address), or to the gateway itself with XEP-0033 addresses.
        GRE encrypted payloads are sent as PGP/MIME emails, other messages as plain
        text emails.

        @param client: Client session.
        @param mess_data: Message data.
        @param message_elt: Original message element, used to build error responses.
        @return: Message data.
        @raise exceptions.DataError: The "to" JID can't be converted to an email address.
        @raise exceptions.CancelError: The email can't be sent; an error has been sent
            back to the sender.
        """
        if client != self.client:
            return mess_data
        from_jid = mess_data["from"].userhostJID()
        extra_kw = {}
        if mess_data["type"] not in ("chat", "normal"):
            log.warning(f"ignoring message with unexpected type: {mess_data}")
            return mess_data
        if not client.is_local(from_jid):
            log.warning(f"ignoring non local message: {mess_data}")
            return mess_data
        if not mess_data["to"].user:
            addresses = mess_data["extra"].get("addresses")
            if not addresses:
                log.warning(f"ignoring message addressed to gateway itself: {mess_data}")
                return mess_data
            else:
                to_email = None
                extra_kw["addresses"] = addresses
        else:
            try:
                to_email = self._e.unescape(mess_data["to"].user)
            except ValueError:
                raise exceptions.DataError(
                    f'Invalid "to" JID, can\'t send message: {message_elt.toXml()}.'
                )

        encrypted_payload = self._gre.get_encrypted_payload(message_elt)

        try:
            if encrypted_payload is not None:
                # We convert the base64 data to ASCII Armor.
                encrypted_binary = base64.b64decode(encrypted_payload)
                encrypted_payload = binary_to_ascii_armor(encrypted_binary)

                assert to_email is not None
                subject = "This is an encrypted message."
                # MIMEMultipart already sets the
                # 'multipart/encrypted; protocol="application/pgp-encrypted"'
                # Content-Type header. Assigning it again with msg["Content-Type"] = …
                # would add a second Content-Type header rather than replace it.
                outer = MIMEMultipart('encrypted', protocol="application/pgp-encrypted")
                outer["Subject"] = subject
                # FIXME: use credentials here.
                outer["From"] = from_jid.userhost()
                outer["To"] = to_email
                # RFC 5322 requires an origination date.
                outer["Date"] = formatdate(localtime=True)
                version = MIMEApplication(
                    "Version: 1\n",
                    _subtype='pgp-encrypted',
                    _encoder=encoders.encode_7or8bit
                )
                version["Content-Description"] = "PGP/MIME version identification"
                encrypted_part = MIMEApplication(
                    encrypted_payload,
                    _subtype='octet-stream',
                    _encoder=encoders.encode_7or8bit
                )
                encrypted_part["Content-Description"] = "OpenPGP encrypted message"
                # Add the "name" parameter to the existing Content-Type header
                # ("application/octet-stream") instead of adding a duplicate header.
                encrypted_part.set_param("name", "encrypted.asc")
                encrypted_part["Content-Disposition"] = "inline; filename=\"encrypted.asc\""
                outer.attach(version)
                outer.attach(encrypted_part)
                body = outer.as_bytes()
                await self.send_encrypted_email(
                    from_jid=from_jid,
                    to_email=to_email,
                    body=body,
                    extra=SendMailExtra(**extra_kw) if extra_kw else None,
                )
            else:
                self._shim.move_keywords_to_headers(mess_data["extra"])
                headers = mess_data["extra"].get("headers")
                if headers:
                    extra_kw["headers"] = headers

                try:
                    body_lang, body = next(iter(mess_data["message"].items()))
                except (KeyError, StopIteration):
                    log.warning(f"No body found: {mess_data}")
                    body_lang, body = "", ""
                try:
                    subject_lang, subject = next(iter(mess_data["subject"].items()))
                except (KeyError, StopIteration):
                    subject_lang, subject = "", None

                if not body and not subject:
                    log.warning(f"Ignoring empty message: {mess_data}")
                    return mess_data

                await self.send_email(
                    from_jid=from_jid,
                    to_email=to_email,
                    body=body,
                    subject=subject,
                    extra=SendMailExtra(**extra_kw) if extra_kw else None,
                )
        except exceptions.UnknownEntityError:
            log.warning(f"Can't send message, user {from_jid} is not registered.")
            message_error_elt = StanzaError(
                "subscription-required",
                text="User need to register to the gateway before sending emails.",
            ).toResponse(message_elt)
            await client.a_send(message_error_elt)
            raise exceptions.CancelError("User not registered.")
        except StanzaError as e:
            # f-strings: the error details were previously logged as a literal "{e}".
            log.warning(f"Can't send message: {e}")
            message_error_elt = e.toResponse(message_elt)
            await client.a_send(message_error_elt)
            raise exceptions.CancelError(f"Can't send message: {e}")

        return mess_data

    def jid_to_email(
        self, client: SatXMPPEntity, address_jid: jid.JID, credentials: Credentials
    ) -> str:
        """Convert a JID to an email address.

        If JID is from the gateway (i.e. its domain is exactly the gateway domain), the
        email address is extracted from its escaped user part. Otherwise, the gateway
        user's email address is used, with the XMPP address specified in the name part.

        @param client: Gateway component client session.
        @param address_jid: JID of the recipient.
        @param credentials: Sender credentials.
        @return: Email address.
        """
        # Exact domain comparison: a suffix check would also match unrelated domains
        # merely ending with the gateway domain (e.g. "other-<gateway domain>"). Gateway
        # JIDs are built with the gateway host as domain (see "email_to_jid").
        if address_jid and address_jid.host == client.jid.host:
            return self._e.unescape(address_jid.user)
        email_address = credentials.user_email
        if address_jid:
            email_address = formataddr((f"xmpp:{address_jid}", email_address))
        return email_address

    async def get_credentials(self, from_jid: jid.JID) -> Credentials:
        """Retrieve user credentials from a bare JID.

        Credentials are taken from the in-memory users cache when available. Otherwise
        they are loaded from persistent storage, converted to ``Credentials`` and cached.

        @param from_jid: Bare JID of the entity to retrieve credentials from.
        @return: Credentials.

        @raise UnknownEntityError: If no credentials are found for the given JID.
        """
        # We need a bare jid.
        assert self.storage is not None
        assert not from_jid.resource
        user_data = self.users_data.get(from_jid)
        if user_data is not None:
            return user_data.credentials

        key = KEY_CREDENTIALS.format(from_jid=from_jid.userhost())
        stored = await self.storage.get(key)
        if stored is None:
            raise exceptions.UnknownEntityError(
                f"No credentials found for {from_jid}."
            )
        # Storage holds plain dicts (as used in "get_registered_users" and in the
        # registration form); callers rely on attribute access such as
        # "credentials.user_email", so convert it.
        if isinstance(stored, Credentials):
            credentials = stored
        else:
            credentials = Credentials(**stored)
        self.users_data[from_jid] = UserData(credentials=credentials)
        return credentials

    async def send_encrypted_email(
        self,
        from_jid: jid.JID,
        to_email: str | None,
        body: bytes,
        extra: SendMailExtra | None = None,
    ) -> None:
        """Send an already built (encrypted) email using sender credentials.

        Credentials will be retrieved from cache or database. ``body`` is sent as is.

        @param from_jid: Bare JID of the sender.
        @param to_email: Email address of the recipient.
        @param body: Encrypted body of the email.
        @param extra: Extra data. JIDs from XEP-0033 addresses which belong to this
            gateway are added as recipients; other JIDs are ignored, as they don't map
            to an email address.
        @raise exceptions.UnknownEntityError: Credentials for "from_jid" can't be found.
        @raise exceptions.InternalError: No recipient has been found.
        """
        assert self.client is not None
        assert isinstance(body, bytes)
        credentials = await self.get_credentials(from_jid)

        sender_domain = credentials.user_email.split("@", 1)[-1]
        gateway_host = self.client.jid.host
        recipients = []
        if to_email is not None:
            recipients.append(to_email.encode())
        if extra is not None and extra.addresses is not None:
            for address in extra.addresses.addresses:
                recipient_jid = address.jid
                if recipient_jid is None:
                    continue
                if recipient_jid.host != gateway_host or not recipient_jid.user:
                    # Only gateway JIDs carry an email address usable as SMTP envelope
                    # recipient. For other JIDs, "jid_to_email" returns a display
                    # address pointing to the sender's own mailbox.
                    log.warning(
                        f"Ignoring recipient {recipient_jid} which is not a JID of this "
                        "gateway."
                    )
                    continue
                recipients.append(self._e.unescape(recipient_jid.user).encode())

        if not recipients:
            raise exceptions.InternalError("No recipient found.")

        await smtp.sendmail(
            credentials.smtp_host.encode(),
            credentials.user_email.encode(),
            recipients,
            body,
            senderDomainName=sender_domain,
            port=int(credentials.smtp_port),
            username=credentials.smtp_username.encode(),
            password=credentials.smtp_password.encode(),
            requireAuthentication=True,
            # TODO: only STARTTLS is supported right now, implicit TLS should be supported
            #   too.
            requireTransportSecurity=True,
        )

    async def send_email(
        self,
        from_jid: jid.JID,
        to_email: str | None,
        body: str,
        subject: str | None,
        extra: SendMailExtra | None = None,
    ) -> None:
        """Send an email using sender credentials.

        Credentials will be retrieve from cache, or database.

        When XEP-0033 addresses are set in ``extra``, every address which is a JID of
        this gateway is used as SMTP envelope recipient (not only the first "to" one).
        Recipients are written in the corresponding headers, except blind carbon copy
        ones, which must stay hidden from other recipients.

        @param from_jid: Bare JID of the sender.
        @param to_email: Email address of the destinee. Not used when addresses are set
            in ``extra``.
        @param body: Body of the email. If bytes are given, they are sent as is to
            ``to_email``.
        @param subject: Subject of the email.
        @param extra: Extra data.

        @raise exceptions.UnknownEntityError: Credentials for "from_jid" can't be found.
        @raise exceptions.InternalError: No recipient can be found.
        @raise StanzaError: An undelivered address is not a JID of this gateway.
        """
        assert self.client is not None
        if extra is None:
            extra = SendMailExtra()
        if to_email is None and (extra.addresses is None or not extra.addresses.to):
            raise exceptions.InternalError(
                '"to_email" can\'t be None if there is no "to" address!'
            )

        credentials = await self.get_credentials(from_jid)
        sender_domain = credentials.user_email.split("@", 1)[-1]

        if isinstance(body, bytes):
            assert to_email is not None
            await smtp.sendmail(
                credentials.smtp_host.encode(),
                credentials.user_email.encode(),
                [to_email.encode()],
                body,
                senderDomainName=sender_domain,
                port=int(credentials.smtp_port),
                username=credentials.smtp_username.encode(),
                password=credentials.smtp_password.encode(),
                requireAuthentication=True,
                # TODO: only STARTTLS is supported right now, implicit TLS should be supported
                #   too.
                requireTransportSecurity=True,
            )
            return

        msg = MIMEText(body, "plain", "UTF-8")
        if subject is not None:
            msg["Subject"] = subject
        msg["From"] = formataddr(
            (credentials.user_name or None, credentials.user_email)
        )
        # RFC 5322 requires an origination date, and a Message-ID is expected.
        msg["Date"] = formatdate(localtime=True)
        msg["Message-ID"] = make_msgid(domain=sender_domain)
        if extra.addresses:
            assert extra.addresses.to
            gateway_host = self.client.jid.host
            envelope_recipients: list[str] = []
            for field in RECIPIENT_FIELDS:
                addresses = getattr(extra.addresses, field)
                if not addresses:
                    continue
                for address in addresses:
                    if not address.delivered and (
                        address.jid is None or address.jid.host != gateway_host
                    ):
                        log.warning(
                            "Received undelivered message to external JID, this is not "
                            "allowed! Cancelling the message sending."
                        )
                        stanza_err = jabber_error.StanzaError(
                            "forbidden",
                            text="Multicasting (XEP-0033 addresses) can only be used "
                            "with JID from this gateway, not external ones. "
                            f" {address.jid} can't be delivered by this gateway and "
                            "should be delivered by server instead.",
                        )
                        raise stanza_err
                email_addresses = [
                    self.jid_to_email(self.client, address.jid, credentials)
                    for address in addresses
                    if address.jid
                ]
                # Blind carbon copy recipients must not be visible to other recipients.
                if email_addresses and field.lower() != "bcc":
                    msg[field.upper()] = ", ".join(email_addresses)
                # SMTP delivers to the envelope recipients only, so every gateway
                # address (to, cc and bcc alike) must be listed there.
                for address in addresses:
                    address_jid = address.jid
                    if (
                        address_jid is not None
                        and address_jid.host == gateway_host
                        and address_jid.user
                    ):
                        envelope_recipients.append(self._e.unescape(address_jid.user))
            # Remove duplicates while keeping order.
            recipients = list(dict.fromkeys(envelope_recipients))
        else:
            assert to_email is not None
            msg["To"] = to_email
            recipients = [to_email]

        if not recipients:
            raise exceptions.InternalError("No recipient found.")

        if extra.headers:
            if extra.headers.keywords:
                msg["Keywords"] = extra.headers.keywords
            if extra.headers.urgency:
                urgency = extra.headers.urgency
                if urgency == Urgency.medium:
                    importance = "normal"
                else:
                    importance = urgency
                msg["Importance"] = importance
            if getattr(extra.headers, "autocrypt", None):
                msg["Autocrypt"] = extra.headers.autocrypt

        await smtp.sendmail(
            credentials.smtp_host.encode(),
            credentials.user_email.encode(),
            [recipient.encode() for recipient in recipients],
            msg.as_bytes(),
            senderDomainName=sender_domain,
            port=int(credentials.smtp_port),
            username=credentials.smtp_username.encode(),
            password=credentials.smtp_password.encode(),
            requireAuthentication=True,
            # TODO: only STARTTLS is supported right now, implicit TLS should be supported
            #   too.
            requireTransportSecurity=True,
        )

    async def _on_registration_form(
        self, client: SatXMPPEntity, iq_elt: domish.Element
    ) -> tuple[bool, data_form.Form] | None:
        """Build the in-band registration form asking for IMAP/SMTP credentials.

        Fields are pre-filled with the credentials already stored for the bare JID of
        the requesting entity, if any.

        @param client: Client session; requests received by other clients are ignored.
        @param iq_elt: Registration request element.
        @return: None if ``client`` is not the gateway client. Otherwise, a tuple with a
            boolean (True if credentials are already stored for the entity) and the
            form.
        """
        if client != self.client:
            return None
        assert self.storage is not None
        requester_jid = jid.JID(iq_elt["from"])
        storage_key = KEY_CREDENTIALS.format(from_jid=requester_jid.userhost())
        stored = await self.storage.get(storage_key) or {}

        form = data_form.Form(formType="form", title="IMAP/SMTP Credentials")
        form.instructions = [
            D_(
                "Please provide your IMAP and SMTP credentials to configure the "
                "connection."
            )
        ]

        # (field type, var, label, description, default value, required)
        field_specs = (
            # Identity.
            (
                "text-single",
                "user_name",
                "User Name",
                D_('The display name to use in the "From" field of sent emails.'),
                None,
                True,
            ),
            (
                "text-single",
                "user_email",
                "User Email",
                D_('The email address to use in the "From" field of sent emails.'),
                None,
                True,
            ),
            # IMAP.
            (
                "text-single",
                "imap_host",
                "IMAP Host",
                D_("IMAP server hostname or IP address"),
                None,
                True,
            ),
            (
                "text-single",
                "imap_port",
                "IMAP Port",
                D_("IMAP server port (default: 993)"),
                "993",
                False,
            ),
            (
                "text-single",
                "imap_username",
                "IMAP Username",
                D_("Username for IMAP authentication"),
                None,
                True,
            ),
            (
                "text-private",
                "imap_password",
                "IMAP Password",
                D_("Password for IMAP authentication"),
                None,
                True,
            ),
            # SMTP.
            (
                "text-single",
                "smtp_host",
                "SMTP Host",
                D_("SMTP server hostname or IP address"),
                None,
                True,
            ),
            (
                "text-single",
                "smtp_port",
                "SMTP Port",
                D_("SMTP server port (default: 587)"),
                "587",
                False,
            ),
            (
                "text-single",
                "smtp_username",
                "SMTP Username",
                D_("Username for SMTP authentication"),
                None,
                True,
            ),
            (
                "text-private",
                "smtp_password",
                "SMTP Password",
                D_("Password for SMTP authentication"),
                None,
                True,
            ),
        )

        for field_type, var, label, desc, default, required in field_specs:
            form.addField(
                data_form.Field(
                    fieldType=field_type,
                    var=var,
                    label=label,
                    desc=desc,
                    value=stored.get(var, default),
                    required=required,
                )
            )

        return bool(stored), form

    def validate_field(
        self,
        form: data_form.Form,
        key: str,
        field_type: str,
        min_value: int | None = None,
        max_value: int | None = None,
        default: str | int | None = None,
    ) -> None:
        """Validate a single field.

        When ``default`` is set and the field is missing or submitted with an empty
        value, the field value is set to ``default`` (as a string) before validation.

        @param form: The form containing the fields. It may be modified to add a missing
            field or fill an empty one with its default value.
        @param key: The key of the field to validate.
        @param field_type: The expected type of the field value ("int" or "str" are
            checked).
        @param min_value: Optional minimum value for integer fields.
        @param max_value: Optional maximum value for integer fields.
        @param default: Default value to use if the field is missing or empty.
        @raise StanzaError: If the field value is invalid or missing.
        """
        field = form.fields.get(key)
        if field is None:
            if default is None:
                raise StanzaError("bad-request", text=f"{key} is required")
            field = data_form.Field(var=key, value=str(default))
            form.addField(field)
        elif default is not None and field.value in (None, ""):
            # The form advertises the default (e.g. "default: 993"), so an empty value
            # means "use the default" rather than an invalid value.
            field.value = str(default)

        value = field.value
        if field_type == "int":
            try:
                value = int(value)
                if (min_value is not None and value < min_value) or (
                    max_value is not None and value > max_value
                ):
                    raise ValueError
            except (ValueError, TypeError):
                raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}")
        elif field_type == "str":
            if not isinstance(value, str):
                raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}")

            # Basic email validation for user_email field
            if key == "user_email":
                # XXX: This is a minimal check. A complete email validation is notoriously
                #   difficult.
                if not regex.RE_EMAIL.match(value):
                    raise StanzaError(
                        "bad-request", text=f"Invalid email address: {value}"
                    )

    def validate_imap_smtp_form(self, submit_form: data_form.Form) -> None:
        """Check every field of a submitted IMAP/SMTP credentials form.

        Fields are validated in order (identity, IMAP, then SMTP); the first invalid
        one stops the validation. Missing port fields get their usual default value.

        @param submit_form: The submitted form containing IMAP/SMTP credentials.
        @raise StanzaError: If any of the values are invalid.
        """
        port_range = {"min_value": 1, "max_value": 65535}
        checks = (
            # Identity fields.
            ("user_name", "str", {}),
            ("user_email", "str", {}),
            # IMAP fields.
            ("imap_host", "str", {}),
            ("imap_port", "int", {**port_range, "default": 993}),
            ("imap_username", "str", {}),
            ("imap_password", "str", {}),
            # SMTP fields.
            ("smtp_host", "str", {}),
            ("smtp_port", "int", {**port_range, "default": 587}),
            ("smtp_username", "str", {}),
            ("smtp_password", "str", {}),
        )
        for field_key, expected_type, options in checks:
            self.validate_field(submit_form, field_key, expected_type, **options)

    def email_to_jid(
        self,
        client: SatXMPPEntity,
        user_email: str,
        user_jid: jid.JID,
        email_name: str,
        email_addr: str,
    ) -> tuple[jid.JID, str | None]:
        """Convert an email address to a JID and extract the name if present.

        - If the name is of the form "xmpp:<jid>" with a valid JID, that JID is used.
        - If the address is the gateway user's own address (case-insensitive), the user
          JID is used.
        - Otherwise a gateway JID is built from the escaped email address.

        @param client: Client session.
        @param user_email: Email address of the gateway user.
        @param user_jid: JID of the gateway user.
        @param email_name: Email associated name.
        @param email_addr: Email address.
        @return: Tuple of JID and name (if present).
        """
        email_name = email_name.strip()
        if email_name.startswith("xmpp:"):
            try:
                return jid.JID(email_name[5:]), None
            except jid.InvalidFormat:
                # The name comes from an external email; an invalid value must not
                # abort the processing of the whole email.
                log.warning(
                    f"Invalid JID in email address name {email_name!r}, ignoring it."
                )
        # Email addresses are compared case-insensitively, as incoming addresses may
        # use a different case than the registered one.
        if email_addr.lower() == user_email.lower():
            return (user_jid, None)
        return (
            jid.JID(None, (self._e.escape(email_addr), client.jid.host, None)),
            email_name or None,
        )

    async def on_new_email(
        self, user_data: UserData, user_jid: jid.JID, email: EmailMessage
    ) -> None:
        """Called when a new message has been received.

        The email is sent to ``user_jid`` as an XMPP message from a virtual JID
        representing the author. If the email has a "List-Id" header, it is handled by
        [handle_mailing_list] instead.

        @param user_data: user data, used to map registered user email to corresponding
            jid.
        @param user_jid: JID of the recipient.
        @param email: Parsed email.
        """
        assert self.client is not None

        def decode_text(data: bytes, charset: str | None) -> str:
            """Decode ``data`` with ``charset``, using UTF-8 if unset or unknown."""
            try:
                return data.decode(charset or "utf-8", errors="replace")
            except LookupError:
                return data.decode("utf-8", errors="replace")

        user_email = user_data.credentials.user_email
        author_name, author_email = parseaddr(email["from"])
        author_email = author_email.lower()
        from_jid = jid.JID(None, (self._e.escape(author_email), self.client.jid.host, None))

        # Get the email body
        body_mime = email.get_body(("plain",))
        if body_mime is not None:
            payload = cast(bytes, body_mime.get_payload(decode=True) or b"")
            body = decode_text(payload, body_mime.get_content_charset())
        else:
            log.warning(f"No body found in email:\n{email}")
            body = ""

        # Decode the subject
        raw_subject = email.get("subject")
        if raw_subject:
            subject = "".join(
                decode_text(part, encoding) if isinstance(part, bytes) else part
                for part, encoding in decode_header(raw_subject)
            ).strip()
        else:
            subject = None

        # Parse recipient fields
        kwargs = {}
        for field in RECIPIENT_FIELDS:
            email_addresses = email.get_all(field)
            if not email_addresses:
                continue
            recipients = []
            for name, addr in getaddresses(email_addresses):
                if not addr:
                    # Unparsable entries are returned with an empty address.
                    continue
                address_jid, desc = self.email_to_jid(
                    self.client, user_email, user_jid, name, addr
                )
                recipients.append(AddressType(jid=address_jid, desc=desc))
            if recipients:
                kwargs[field] = recipients

        if not kwargs:
            # At least the "to" header should be set, but the email comes from an
            # external server: if no recipient can be found, we consider that it is
            # addressed to the user only.
            log.warning(f"No recipient found in email headers, using {user_jid}.")
            kwargs["to"] = [AddressType(jid=user_jid)]
        addresses_data = AddressesData(**kwargs)

        # Parse reply-to field
        reply_to_addresses = email.get_all("reply-to")
        if reply_to_addresses:
            reply_to = []
            for name, addr in getaddresses(reply_to_addresses):
                if not addr:
                    continue
                address_jid, desc = self.email_to_jid(
                    self.client, user_email, user_jid, name, addr
                )
                reply_to.append(AddressType(jid=address_jid, desc=desc))
            if reply_to:
                addresses_data.replyto = reply_to

        # Set noreply flag
        # There is no flag to indicate a no-reply message, so we check common user parts
        # in from and reply-to headers.
        from_addresses = [author_email]
        if reply_to_addresses:
            # A single "Reply-To" header may contain several addresses.
            from_addresses.extend(
                addr for __, addr in getaddresses(reply_to_addresses) if addr
            )
        noreply_user_parts = {
            "no-reply",
            "noreply",
            "do-not-reply",
            "donotreply",
            "notification",
            "notifications",
        }
        if any(
            from_address.split("@", 1)[0].lower() in noreply_user_parts
            for from_address in from_addresses
        ):
            addresses_data.noreply = True

        extra = {}

        if (
            not addresses_data.replyto
            and not addresses_data.noreply
            and not addresses_data.cc
            and not addresses_data.bcc
            and addresses_data.to == [AddressType(jid=user_jid)]
        ):
            # The main recipient is the only one, and there is no other metadata: there is
            # no need to add addresses metadata.
            pass
        else:
            for address in addresses_data.addresses:
                if address.jid and (
                    address.jid == user_jid or address.jid.host == str(self.client.jid)
                ):
                    # Those are email address, and have been delivered by the sender,
                    # other JID addresses will have to be delivered by us.
                    address.delivered = True

            extra["addresses"] = addresses_data.model_dump(mode="json", exclude_none=True)

        # We look for interesting headers
        headers = {}
        keywords_headers = email.get_all("keywords")
        if keywords_headers:
            headers["keywords"] = ",".join(keywords_headers)

        importance_header = email["importance"]
        if importance_header:
            # Header values are matched case-insensitively.
            importance = str(importance_header).strip().lower()
            # We convert to urgency
            if importance in ("low", "high"):
                headers["urgency"] = importance
            elif importance == "normal":
                headers["urgency"] = "medium"
            else:
                log.warning(f"Ignoring invalid importance header: {importance_header!r}")

        autocrypt = email["autocrypt"]
        if autocrypt:
            headers["autocrypt"] = autocrypt

        if headers:
            extra["headers"] = HeadersData(**headers).model_dump(
                mode="json", exclude_none=True
            )

        # Handle attachments
        for part in email.iter_attachments():
            await self.handle_attachment(part, user_jid)

        client = self.client.get_virtual_client(from_jid)
        # Now that the message is parsed, we check if it's a mailing list.
        list_ids = email.get_all("list-id")
        if list_ids:
            try:
                await self.handle_mailing_list(
                    client,
                    user_data,
                    author_name,
                    author_email,
                    list_ids[-1],
                    email,
                    user_jid,
                    body,
                    subject,
                    extra,
                )
            except exceptions.DataError as e:
                log.warning(f"Can't parse mailing list email: {e}.")
            except Exception:
                log.exception("Can't parse mailing list email.")
        else:
            await client.sendMessage(
                user_jid,
                {"": body},
                {"": subject} if subject else None,
                extra=extra,
            )

    def parse_references(self, email: EmailMessage) -> list[str]:
        """Extract message IDs from the "References" header(s).

        RFC 5322 allows message IDs to follow each other without any whitespace
        (``<a@example.org><b@example.org>``), so bracketed IDs are extracted one by one
        instead of splitting on whitespace only. Bare tokens without angle brackets are
        still accepted for lenience.

        @param email: The parsed email message.
        @returns: Message IDs (without angle brackets), in header order.
        """
        references = []
        for header in email.get_all("references", []):
            # Either a full "<...>" message ID, or any run of non-space characters.
            for token in re.findall(r"<[^<>]*>|[^\s<>]+", str(header)):
                __, address = parseaddr(token)
                if address:
                    references.append(address)
        return references

    def email_to_mb_data(
        self,
        email: EmailMessage,
        service: jid.JID,
        node: str,
        item_id: str,
        author_name: str,
        author_email: str
    ) -> MbData:
        """Convert an email to blog data.

        Bracketed parts of the subject (e.g. ``[tag]``) are removed from the title and
        kept as lower-cased tags. For multipart emails, the first ``text/plain`` and the
        first ``text/html`` parts which are not attachments are used as content. The
        XHTML content is then passed through noise detection and cleaning.

        @param email: email to convert.
        @param service: pubsub service of the blog item.
        @param node: pubsub node of the blog item.
        @param item_id: ID of the blog item.
        @param author_name: name of the email author.
        @param author_email: email address of the author.
        @return: Blog data.
        @raise exceptions.DataError: the email has neither a text/plain nor a text/html
            body.
        """
        tags = set()

        # Title
        def strip_tags(m: re.Match) -> str:
            """Record the bracketed value as a tag, and remove it from the title."""
            value = m.group()[1:-1].strip().lower()
            if value:
                tags.add(value)
            return ""

        title = re.sub(r"\[.+?\]", strip_tags, email.get('subject', '')).strip()

        # Dates
        date_header = email.get('Date')
        published = None
        if date_header:
            try:
                published = date_utils.date_parse(date_header)
            except ValueError:
                published = None

        # Body
        def decode_payload(part: EmailMessage) -> str | None:
            """Return decoded text of a non-multipart ``part``, or None if no payload."""
            payload = part.get_payload(decode=True)
            if payload is None:
                return None
            payload = cast(bytes, payload)
            charset = part.get_content_charset() or 'utf-8'
            try:
                return payload.decode(charset, errors='replace')
            except (LookupError, TypeError):
                # Unknown or invalid charset.
                return payload.decode('utf-8', errors='replace')

        content = None
        content_xhtml = None

        if email.is_multipart():
            for part in email.walk():
                # Skip attachments
                if 'attachment' in part.get('Content-Disposition', '').lower():
                    continue
                content_type = part.get_content_type()
                if content_type not in ('text/plain', 'text/html'):
                    continue
                payload_text = decode_payload(part)
                if payload_text is None:
                    continue
                # Prefer first valid parts for safety
                if content_type == 'text/plain':
                    if content is None:
                        content = payload_text
                elif content_xhtml is None:
                    content_xhtml = payload_text
        else:
            # Single part email
            content_text = decode_payload(email)
            if content_text is not None:
                content_type = email.get_content_type()
                if content_type == 'text/plain':
                    content = content_text
                elif content_type == 'text/html':
                    content_xhtml = content_text

        if content_xhtml is None:
            if content is None:
                # The email comes from an external server, it may have no text body at
                # all (e.g. only attachments).
                raise exceptions.DataError(
                    "Email has neither a text/plain nor a text/html body."
                )
            content_xhtml = convert_to_html_and_detect_noise(content)
        else:
            content_xhtml = convert_to_html_and_detect_noise(content_xhtml, is_html=True)

        content_xhtml = self._syntax.clean_xhtml(content_xhtml)

        return MbData(
            service=service,
            node=node,
            id=item_id,
            published=published,
            title=title,
            content=content,
            content_xhtml=content_xhtml,
            author=author_name,
            author_email=author_email,
            tags=list(tags)
        )

    async def mb_data_to_email(
        self,
        credentials: Credentials,
        mb_data: MbData
    ) -> EmailMessage:
        """Convert blog data to an email message.

        Tags are prepended to the subject as ``[tag]`` and listed in the "Keywords"
        header. If the blog data has no ID, a new Message-ID is generated and stored back
        in ``mb_data.id``. If only XHTML content is available, a plain-text version is
        generated and stored back in ``mb_data.content``.

        @param credentials: credentials of the gateway user; their email address is
            used as sender address when the blog data has no author email.
        @param mb_data: Blog data to convert.
        @return: Email message.
        """
        email = EmailMessage()

        # Title
        title_parts = []
        tags = []

        for tag in mb_data.tags:
            if stripped_tag := tag.strip():
                title_parts.append(f"[{stripped_tag}]")
                tags.append(stripped_tag)

        if mb_data.title:
            title_parts.append(mb_data.title)

        email['Subject'] = ' '.join(title_parts)

        if tags:
            email['Keywords'] = ', '.join(tags)

        # From
        # FIXME: Check email according to sender.
        sender_address = mb_data.author_email or credentials.user_email
        if mb_data.author:
            try:
                # formataddr quotes or encodes the display name when needed; without
                # it, a name containing e.g. a comma would be parsed as 2 addresses.
                from_value = formataddr((mb_data.author, sender_address))
            except UnicodeError:
                # formataddr only accepts ASCII addresses.
                from_value = f"{mb_data.author} <{sender_address}>"
        else:
            from_value = sender_address
        email['From'] = from_value

        # Message ID
        if mb_data.id:
            email['Message-ID'] = f"<{mb_data.id}>"
        else:
            msg_id = make_msgid()
            email['Message-ID'] = msg_id
            mb_data.id = msg_id[1:-1]

        # In-Reply-To
        if mb_data.in_reply_tos:
            # Use the first reply-to reference
            in_reply_to = mb_data.in_reply_tos[0]
            if in_reply_to.ref:
                email['In-Reply-To'] = f"<{in_reply_to.ref}>"

        # Dates: publication date is preferred, update date is the fallback.
        for timestamp in (mb_data.published, mb_data.updated):
            if timestamp is None:
                continue
            try:
                email['Date'] = formatdate(timestamp, localtime=True)
            except Exception:
                # Best effort: an unusable timestamp just leaves the header unset.
                continue
            break

        # Content
        if mb_data.content_xhtml:
            # XHTML content
            xhtml_content = mb_data.content_xhtml
            if not mb_data.content:
                mb_data.content = await self._syntax.convert(
                    xhtml_content,
                    self._syntax.SYNTAX_XHTML,
                    self._syntax.SYNTAX_TEXT,
                    False,
                )

            email.set_content(mb_data.content)
            email.add_alternative(xhtml_content, subtype='html')
        elif mb_data.content:
            email.set_content(mb_data.content)
        else:
            email.set_content("")

        return email

    async def handle_mailing_list(
        self,
        client: SatXMPPEntity,
        user_data: UserData,
        author_name: str,
        author_email: str,
        list_id_header: str,
        email: EmailMessage,
        user_jid: jid.JID,
        body: str,
        subject: str|None,
        extra: dict
    ) -> None:
        """Handle emails from mailing lists.

        Mailing list emails are converted to pubsub blogs. The list ID is used as the
        root node name. A thread reply (email with "In-Reply-To"/"References" headers)
        is cached in the comments node of the thread's top item. If that top item is
        missing, a placeholder item is created first.

        @param client: virtual client of the email author.
        @param user_data: data of the gateway user who received the email.
        @param author_name: name of the email author.
        @param author_email: email address of the author.
        @param list_id_header: value of the "List-Id" header.
        @param email: parsed email.
        @param user_jid: JID of the gateway user; made owner of created nodes.
        @param body: decoded email body.
        @param subject: decoded email subject.
        @param extra: extra message data.
        @raise exceptions.DataError: Message-ID or List-Id is missing or empty.
        """
        assert self.client is not None
        pubsub_service = self.client.jid
        message_id_list = email.get_all("message-id")
        if not message_id_list:
            raise exceptions.DataError("Missing message ID.")
        message_id = message_id_list[-1].strip()
        if message_id.startswith("<") and message_id.endswith(">"):
            message_id = message_id[1:-1]
        if not message_id:
            raise exceptions.DataError("Empty message ID.")

        # The list name (display part of the header) is not used.
        __, list_id = parseaddr(list_id_header, strict=False)
        list_id = list_id.strip()
        if not list_id:
            raise exceptions.DataError(
                f"Mailing list ID is empty, we can't parse id: {list_id_header=}."
            )
        root_node = await G.storage.get_pubsub_node(
            client,
            pubsub_service,
            list_id,
            with_subscriptions=True,
            create=True,
            create_kwargs={
                "access_model": AccessModel.whitelist,
                "publish_model": PublishModel.publishers,
                "affiliations": [PubsubAffiliation(
                    entity = user_jid,
                    affiliation = Affiliation.owner
                )],
                "subscribed": True
            },
        )
        assert root_node is not None

        in_reply_to_value = ''.join(email.get_all("in-reply-to", []))
        __, in_reply_to = parseaddr(in_reply_to_value)
        references = self.parse_references(email)
        if in_reply_to:
            if not references:
                log.warning(
                    '"References" header should not be empty when "In-Reply-To" is set.'
                )
                references = [in_reply_to]
            elif references[-1] != in_reply_to:
                log.warning('Last ID in "References" should be "In-Reply-To".')
                references.append(in_reply_to)
        if references:
            # We check that the top message of the thread has a corresponding item.
            top_item_id = references[0]
            parent_node_name = self._mb.get_comments_node(top_item_id)
            parent_node_uri = uri.build_xmpp_uri(
                "pubsub",
                path=root_node.service.full(),
                node=parent_node_name,
            )
            top_items = await G.storage.get_items(root_node, item_ids=[top_item_id])
            if not top_items:
                # The top item is missing, we make an empty one.
                empty_item_data = MbData(
                    service=pubsub_service,
                    node=root_node.name,
                    id=top_item_id,
                    title="missing item",
                    content="missing item",
                    author_jid=pubsub_service,
                    comments=[
                        Comment(
                            uri=parent_node_uri,
                            service=pubsub_service,
                            node=parent_node_name
                        )
                    ]
                )
                await G.storage.cache_pubsub_items(
                    client,
                    root_node,
                    [await empty_item_data.to_element(client)]
                )
            parent_node = await G.storage.get_pubsub_node(
                client,
                pubsub_service,
                parent_node_name,
                with_subscriptions=True,
                create=True,
                create_kwargs={
                    "access_model": AccessModel.whitelist,
                    "publish_model": PublishModel.publishers,
                    "subscribed": True,
                    "affiliations": [PubsubAffiliation(
                        entity = user_jid,
                        affiliation = Affiliation.owner
                    )],
                    "parent_node": root_node
                },
            )
        else:
            parent_node = root_node
            parent_node_name = cast(str, root_node.name)

        mb_data = self.email_to_mb_data(
            email,
            pubsub_service,
            parent_node_name,
            message_id,
            author_name,
            author_email
        )
        mb_data.comments.append(
            Comment(
                service = pubsub_service,
                node = self._mb.get_comments_node(message_id),
            )
        )
        await G.storage.cache_pubsub_items(
            client,
            parent_node,
            [await mb_data.to_element(client)],
            [mb_data.model_dump(mode="json")]
        )

    async def handle_attachment(self, part: EmailMessage, recipient_jid: jid.JID) -> None:
        """Save an email attachment and register it as a file of the recipient.

        Saving is done in a thread by [_save_attachment]. When it returns metadata, the
        file is registered with ``recipient_jid`` as owner. Registration errors are
        logged, not raised.

        @param part: MIME part holding the attachment.
        @param recipient_jid: JID of the user to whom the attachment is being sent.
        """
        assert self.client is not None
        mime_type = part.get_content_type()
        name = part.get_filename() or "attachment"
        log.debug(f"Handling attachment: {name} ({mime_type})")

        metadata = await deferToThread(self._save_attachment, part)
        if metadata is None:
            # Nothing could be saved, there is nothing to register.
            return

        log.debug(f"Attachment {name!r} saved to {metadata.path}")
        try:
            await self.host.memory.set_file(
                self.client,
                name,
                file_hash=metadata.hash,
                hash_algo="sha-256",
                size=metadata.size,
                namespace=PLUGIN_INFO[C.PI_IMPORT_NAME],
                mime_type=mime_type,
                owner=recipient_jid,
            )
        except Exception:
            log.exception(f"Failed to register file {name!r}")

    def _save_attachment(self, part: EmailMessage) -> FileMetadata | None:
        """Save the attachment to files path.

        This method must be executed in a thread with deferToThread to avoid blocking the
        reactor with IO operations if the attachment is large.

        The file is stored under its SHA-256 hash. The payload is first written to a
        temporary file created in the destination directory, which is closed (and thus
        flushed) before being renamed. Being on the same filesystem, the rename is
        atomic and never falls back to copying a partially written file.

        @param part: The object representing the attachment.
        @return: Attachment data, or None if the payload is not binary data.
        @raises IOError: Can't save the attachment.
        """
        temp_path: Path | None = None
        try:
            payload = part.get_payload(decode=True)
            if not isinstance(payload, bytes):
                log.warning(f"Can't write payload of type {type(payload)}.")
                return None
            file_hash = hashlib.sha256(payload).hexdigest()
            file_path = self.files_path / file_hash
            with tempfile.NamedTemporaryFile(
                dir=self.files_path, prefix=".tmp-", delete=False
            ) as temp_file:
                temp_path = Path(temp_file.name)
                temp_file.write(payload)
            # The temporary file is closed here, all data has been flushed.
            temp_path.replace(file_path)
            # The file has been moved, there is nothing left to clean.
            temp_path = None
            return FileMetadata(path=file_path, hash=file_hash, size=len(payload))
        except Exception as e:
            raise IOError(f"Failed to save attachment: {e}") from e
        finally:
            if temp_path is not None:
                temp_path.unlink(missing_ok=True)

    async def connect_imap(self, from_jid: jid.JID, user_data: UserData) -> None:
        """Open an IMAP connection for a gateway user.

        New messages are passed to [self.on_new_email], bound to ``user_data`` and to
        the bare JID of ``from_jid``. This method returns once the ``connected``
        deferred given to the IMAP client factory has fired.

        @param from_jid: JID of the user associated with the credentials.
        @param user_data: data of the user; its credentials give IMAP host and port.
        """
        imap_credentials = user_data.credentials
        email_callback = partial(self.on_new_email, user_data, from_jid.userhostJID())
        connected = defer.Deferred()
        factory = IMAPClientFactory(user_data, email_callback, connected)
        # NOTE(review): plain connectTCP is used while the registration form defaults
        # to port 993 (implicit TLS); TLS is presumably handled by IMAPClientFactory —
        # confirm.
        reactor.connectTCP(
            imap_credentials.imap_host, int(imap_credentials.imap_port), factory
        )
        await connected

    async def _on_registration_submit(
        self,
        client: SatXMPPEntity,
        iq_elt: domish.Element,
        submit_form: data_form.Form | None,
    ) -> bool | None:
        """Handle registration submit request.

        Submit form is validated, and credentials are stored. A submission without form
        is an unregistration request: the IMAP session is closed, cached user data is
        dropped and stored credentials are deleted.

        @param client: client session.
        @param iq_elt: IQ stanza of the submission request.
        @param submit_form: submit form, or None for an unregistration request.
        @return: True if successful.
            None if the callback is not relevant for this request.
        @raise Exception: errors from the IMAP connection are re-raised, after the
            credentials have been stored with ``imap_success`` set to False.
        """
        if client != self.client:
            return None
        assert self.storage is not None
        from_jid = jid.JID(iq_elt["from"]).userhostJID()
        key = KEY_CREDENTIALS.format(from_jid=from_jid)

        if submit_form is None:
            # This is an unregistration request. Cached data is dropped too, so that
            # stale credentials can't be used anymore.
            user_data = self.users_data.pop(from_jid, None)
            if user_data is not None and user_data.imap_client is not None:
                try:
                    await user_data.imap_client.logout()
                except Exception:
                    log.exception(f"Can't log out {from_jid} from IMAP server.")
            await self.storage.adel(key)
            log.info(f"{from_jid} unregistered from this gateway.")
            return True

        self.validate_imap_smtp_form(submit_form)
        credentials = Credentials(**{
            field_name: field.value for field_name, field in submit_form.fields.items()
        })
        user_data = self.users_data.get(from_jid)
        if user_data is None:
            # The user is not in cache, we cache current credentials.
            user_data = self.users_data[from_jid] = UserData(credentials=credentials)
        else:
            # The user is known. The previous IMAP session is closed before a new one
            # is opened, otherwise both would deliver the same emails.
            if user_data.imap_client is not None:
                try:
                    await user_data.imap_client.logout()
                except Exception:
                    log.exception(f"Can't log out {from_jid} from IMAP server.")
                user_data.imap_client = None
            # We update credentials.
            user_data.credentials = credentials

        try:
            await self.connect_imap(from_jid, user_data)
        except Exception:
            log.warning(f"Can't connect to IMAP server for {from_jid}")
            credentials.imap_success = False
            await self.storage.aset(key, credentials)
            raise

        log.debug(f"Connection successful to IMAP server for {from_jid}")
        credentials.imap_success = True
        await self.storage.aset(key, credentials)
        return True


    async def on_relayed_encryption_data(
        self,
        client: SatXMPPEntity,
        iq_elt: domish.Element,
        form: data_form.Form
    ) -> None:
        """Complete a relayed encryption data form with the user's email address.

        A ``sender_id`` field is added to ``form`` in place. Its value is the
        ``user_email`` from the credentials of the requesting entity (bare JID of the IQ
        sender).

        @param client: client session (not used here).
        @param iq_elt: IQ stanza of the request; its "from" attribute identifies the user.
        @param form: data form to complete.
        """
        requester = jid.JID(iq_elt["from"]).userhostJID()
        user_credentials = await self.get_credentials(requester)
        sender_field = data_form.Field(var="sender_id", value=user_credentials.user_email)
        form.addField(sender_field)



@implementer(iwokkel.IDisco)
class EmailGatewayHandler(XMPPHandler):
    """Disco handler of the email gateway, advertising its data policy forms."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the data policy forms of the gateway as extra disco info.

        The first form is in the ``urn:xmpp:data-policy:0`` namespace. The second one
        is in ``urn:xmpp:data-policy:identity:gateway:smtp:0`` and carries a free-text
        description.
        """
        policy_fields = (
            data_form.Field("list-single", "auth_data", "plain"),
            data_form.Field("list-single", "data-transmission", "encrypted"),
            data_form.Field("text-single", "encryption_algorithm", "TLS"),
            data_form.Field("text-single", "data_retention", "0"),
        )
        policy_form = data_form.Form(
            "result",
            formNamespace="urn:xmpp:data-policy:0",
            fields=policy_fields,
        )

        # NOTE(review): the sentence below states that no user data is stored, while
        # the component stores user credentials (see registration handling) and saves
        # attachments; the advertised policy may need to be reviewed.
        extra_info = "This gateway acts as a relay to external IMAP/SMTP servers. Data policies depend entirely on the external server chosen by the user. This gateway does not store or process user data."
        identity_form = data_form.Form(
            "result",
            formNamespace="urn:xmpp:data-policy:identity:gateway:smtp:0",
            fields=(data_form.Field("text-multi", "extra_info", extra_info),),
        )

        return [policy_form, identity_form]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """The gateway exposes no disco items."""
        return []