view libervia/backend/plugins/plugin_comp_email_gateway/__init__.py @ 4338:7c0b7ecb816f

component email gateway: Add a pubsub service: a pubsub service is implemented to retrieve and manage attachments using XEP-0498. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:13:23 +0100
parents 95792a1f26c7
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia Email Gateway Component
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from email.header import decode_header
from email.message import EmailMessage
from email.mime.text import MIMEText
from email.utils import formataddr, getaddresses, parseaddr
from functools import partial
import hashlib
from pathlib import Path
import re
import shutil
import tempfile
from typing import TYPE_CHECKING, NamedTuple, cast

from pydantic import BaseModel
from twisted.internet import defer, reactor
from twisted.internet.threads import deferToThread
from twisted.mail import smtp
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber import error as jabber_error
from twisted.words.protocols.jabber.error import StanzaError
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import data_form, disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPComponent, SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.memory.persistent import LazyPersistentBinaryDict
from libervia.backend.memory.sqla import select
from libervia.backend.memory.sqla_mapping import PrivateIndBin
from libervia.backend.models.core import MessageData
from libervia.backend.plugins.plugin_comp_email_gateway.pubsub_service import (
    EmailGWPubsubService,
)
from libervia.backend.plugins.plugin_xep_0033 import (
    AddressType,
    AddressesData,
    RECIPIENT_FIELDS,
)
from libervia.backend.plugins.plugin_xep_0077 import XEP_0077
from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
from libervia.backend.plugins.plugin_xep_0131 import HeadersData, Urgency, XEP_0131
from libervia.backend.plugins.plugin_xep_0498 import XEP_0498
from libervia.backend.tools.utils import aio

from .imap import IMAPClientFactory
from .models import Credentials, UserData

if TYPE_CHECKING:
    from libervia.backend.core.main import LiberviaBackend


log = getLogger(__name__)

IMPORT_NAME = "email-gateway"
NAME = "Libervia Email Gateway"

PLUGIN_INFO = {
    C.PI_NAME: "Email Gateway Component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0033", "XEP-0077", "XEP-0106", "XEP-0498"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "EmailGatewayComponent",
    C.PI_HANDLER: C.BOOL_TRUE,
    C.PI_DESCRIPTION: D_(
        "Gateway to handle email. Usual emails are handled as message, while mailing "
        "lists are converted to pubsub blogs."
    ),
}

CONF_SECTION = f"component {IMPORT_NAME}"
PREFIX_KEY_CREDENTIALS = "CREDENTIALS_"
KEY_CREDENTIALS = f"{PREFIX_KEY_CREDENTIALS}{{from_jid}}"

email_pattern = re.compile(r"[^@]+@[^@]+\.[^@]+")


class FileMetadata(NamedTuple):
    path: Path
    hash: str
    size: int


class SendMailExtra(BaseModel):
    addresses: AddressesData | None = None
    headers: HeadersData | None = None


class EmailGatewayComponent:
    IMPORT_NAME = IMPORT_NAME
    verbose = 0

    def __init__(self, host: "LiberviaBackend") -> None:
        self.host = host
        self.client: SatXMPPComponent | None = None
        self.initalized = False
        self.storage: LazyPersistentBinaryDict | None = None
        self._iq_register = cast(XEP_0077, host.plugins["XEP-0077"])
        self._iq_register.register_handler(
            self._on_registration_form, self._on_registration_submit
        )
        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
        self._shim = cast(XEP_0131, host.plugins["XEP-0131"])
        self._pfs = cast(XEP_0498, host.plugins["XEP-0498"])
        # TODO: For the moment, all credentials are kept in cache; we should only keep the
        #   X latest.
        self.users_data: dict[jid.JID, UserData] = {}
        self.files_path = self.host.get_local_path(None, C.FILES_DIR)
        host.trigger.add_with_check(
            "message_received", self, self._message_received_trigger, priority=-1000
        )

    async def _init(self) -> None:
        """Initialisation done after profile is connected"""
        assert self.client is not None
        self.client.identities.append(disco.DiscoIdentity("gateway", "smtp", NAME))
        self.storage = LazyPersistentBinaryDict(IMPORT_NAME, self.client.profile)
        await self.connect_registered_users()

    @aio
    async def get_registered_users(self) -> dict[jid.JID, Credentials]:
        """Retrieve credentials for all registered users

        @return: a mapping from user JID to credentials data.
        """
        assert self.client is not None
        profile_id = self.host.memory.storage.profiles[self.client.profile]
        async with self.host.memory.storage.session() as session:
            query = select(PrivateIndBin).where(
                PrivateIndBin.profile_id == profile_id,
                PrivateIndBin.namespace == IMPORT_NAME,
                PrivateIndBin.key.startswith(PREFIX_KEY_CREDENTIALS),
            )
            result = await session.execute(query)
            return {
                jid.JID(p.key[len(PREFIX_KEY_CREDENTIALS) :]): p.value
                for p in result.scalars()
            }

    async def connect_registered_users(self) -> None:
        """Connected users already registered to the gateway."""
        registered_data = await self.get_registered_users()
        for user_jid, credentials in registered_data.items():
            user_data = self.users_data[user_jid] = UserData(credentials=credentials)
            if not credentials["imap_success"]:
                log.warning(
                    f"Ignoring unsuccessful IMAP credentials of {user_jid}. This user "
                    "won't receive message from this gateway."
                )
            else:
                try:
                    await self.connect_imap(user_jid, user_data)
                except Exception as e:
                    log.warning(f"Can't connect {user_jid} to IMAP: {e}.")
                else:
                    log.debug(f"Connection to IMAP server successful for {user_jid}.")

    def get_handler(self, __) -> tuple[XMPPHandler, XMPPHandler]:
        return EmailGatewayHandler(), EmailGWPubsubService(self)

    async def profile_connecting(self, client: SatXMPPEntity) -> None:
        assert isinstance(client, SatXMPPComponent)
        self.client = client
        if not self.initalized:
            await self._init()
            self.initalized = True

    def _message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> bool:
        """add the gateway workflow on post treatment"""
        if client != self.client:
            return True
        post_treat.addCallback(
            lambda mess_data: defer.ensureDeferred(
                self.on_message(client, mess_data, message_elt)
            )
        )
        return True

    async def on_message(
        self, client: SatXMPPEntity, mess_data: MessageData, message_elt: domish.Element
    ) -> dict:
        """Called once message has been parsed

        @param client: Client session.
        @param mess_data: Message data.
        @return: Message data.
        """
        if client != self.client:
            return mess_data
        from_jid = mess_data["from"].userhostJID()
        extra_kw = {}
        if mess_data["type"] not in ("chat", "normal"):
            log.warning(f"ignoring message with unexpected type: {mess_data}")
            return mess_data
        if not client.is_local(from_jid):
            log.warning(f"ignoring non local message: {mess_data}")
            return mess_data
        if not mess_data["to"].user:
            addresses = mess_data["extra"].get("addresses")
            if not addresses:
                log.warning(f"ignoring message addressed to gateway itself: {mess_data}")
                return mess_data
            else:
                to_email = None
                extra_kw["addresses"] = addresses
        else:
            try:
                to_email = self._e.unescape(mess_data["to"].user)
            except ValueError:
                raise exceptions.DataError(
                    f'Invalid "to" JID, can\'t send message: {message_elt.toXml()}.'
                )

        self._shim.move_keywords_to_headers(mess_data["extra"])
        headers = mess_data["extra"].get("headers")
        if headers:
            extra_kw["headers"] = headers

        try:
            body_lang, body = next(iter(mess_data["message"].items()))
        except (KeyError, StopIteration):
            log.warning(f"No body found: {mess_data}")
            body_lang, body = "", ""
        try:
            subject_lang, subject = next(iter(mess_data["subject"].items()))
        except (KeyError, StopIteration):
            subject_lang, subject = "", None

        if not body and not subject:
            log.warning(f"Ignoring empty message: {mess_data}")
            return mess_data

        try:
            await self.send_email(
                from_jid=from_jid,
                to_email=to_email,
                body=body,
                subject=subject,
                extra=SendMailExtra(**extra_kw) if extra_kw else None,
            )
        except exceptions.UnknownEntityError:
            log.warning(f"Can't send message, user {from_jid} is not registered.")
            message_error_elt = StanzaError(
                "subscription-required",
                text="User need to register to the gateway before sending emails.",
            ).toResponse(message_elt)
            await client.a_send(message_error_elt)
            raise exceptions.CancelError("User not registered.")
        except StanzaError as e:
            log.warning("Can't send message: {e}")
            message_error_elt = e.toResponse(message_elt)
            await client.a_send(message_error_elt)
            raise exceptions.CancelError("Can't send message: {e}")

        return mess_data

    def jid_to_email(
        self, client: SatXMPPEntity, address_jid: jid.JID, credentials: dict[str, str]
    ) -> str:
        """Convert a JID to an email address.

        If JID is from the gateway, email address will be extracted. Otherwise, the
        gateway email will be used, with XMPP address specified in name part.

        @param address_jid: JID of the recipient.
        @param credentials: Sender credentials.
        @return: Email address.
        """
        if address_jid and address_jid.host.endswith(str(client.jid)):
            return self._e.unescape(address_jid.user)
        else:
            email_address = credentials["user_email"]
            if address_jid:
                email_address = formataddr((f"xmpp:{address_jid}", email_address))
            return email_address

    async def send_email(
        self,
        from_jid: jid.JID,
        to_email: str | None,
        body: str,
        subject: str | None,
        extra: SendMailExtra | None = None,
    ) -> None:
        """Send an email using sender credentials.

        Credentials will be retrieve from cache, or database.

        @param from_jid: Bare JID of the sender.
        @param to_email: Email address of the destinee.
        @param body: Body of the email.
        @param subject: Subject of the email.
        @param extra: Extra data.

        @raise exceptions.UnknownEntityError: Credentials for "from_jid" can't be found.
        """
        assert self.client is not None
        if extra is None:
            extra = SendMailExtra()
        if to_email is None and (extra.addresses is None or not extra.addresses.to):
            raise exceptions.InternalError(
                '"to_email" can\'t be None if there is no "to" address!'
            )

        # We need a bare jid.
        assert self.storage is not None
        assert not from_jid.resource
        try:
            user_data = self.users_data[from_jid]
        except KeyError:
            key = KEY_CREDENTIALS.format(from_jid=from_jid)
            credentials = await self.storage.get(key)
            if credentials is None:
                raise exceptions.UnknownEntityError(
                    f"No credentials found for {from_jid}."
                )
            self.users_data[from_jid] = UserData(credentials)
        else:
            credentials = user_data.credentials

        msg = MIMEText(body, "plain", "UTF-8")
        if subject is not None:
            msg["Subject"] = subject
        msg["From"] = formataddr(
            (credentials["user_name"] or None, credentials["user_email"])
        )
        if extra.addresses:
            assert extra.addresses.to
            main_to_address = extra.addresses.to[0]
            assert main_to_address.jid
            to_email = self.jid_to_email(self.client, main_to_address.jid, credentials)
            for field in RECIPIENT_FIELDS:
                addresses = getattr(extra.addresses, field)
                if not addresses:
                    continue
                for address in addresses:
                    if not address.delivered and (
                        address.jid is None or address.jid.host != str(self.client.jid)
                    ):
                        log.warning(
                            "Received undelivered message to external JID, this is not "
                            "allowed! Cancelling the message sending."
                        )
                        stanza_err = jabber_error.StanzaError(
                            "forbidden",
                            text="Multicasting (XEP-0033 addresses) can only be used "
                            "with JID from this gateway, not external ones. "
                            f" {address.jid} can't be delivered by this gateway and "
                            "should be delivered by server instead.",
                        )
                        raise stanza_err
                email_addresses = [
                    self.jid_to_email(self.client, address.jid, credentials)
                    for address in addresses
                    if address.jid
                ]
                if email_addresses:
                    msg[field.upper()] = ", ".join(email_addresses)
        else:
            assert to_email is not None
            msg["To"] = to_email

        sender_domain = credentials["user_email"].split("@", 1)[-1]

        if extra.headers:
            if extra.headers.keywords:
                msg["Keywords"] = extra.headers.keywords
            if extra.headers.urgency:
                urgency = extra.headers.urgency
                if urgency == Urgency.medium:
                    importance = "normal"
                else:
                    importance = urgency
                msg["Importance"] = importance

        await smtp.sendmail(
            credentials["smtp_host"].encode(),
            credentials["user_email"].encode(),
            [to_email.encode()],
            msg.as_bytes(),
            senderDomainName=sender_domain,
            port=int(credentials["smtp_port"]),
            username=credentials["smtp_username"].encode(),
            password=credentials["smtp_password"].encode(),
            requireAuthentication=True,
            # TODO: only STARTTLS is supported right now, implicit TLS should be supported
            #   too.
            requireTransportSecurity=True,
        )

    async def _on_registration_form(
        self, client: SatXMPPEntity, iq_elt: domish.Element
    ) -> tuple[bool, data_form.Form] | None:
        if client != self.client:
            return
        assert self.storage is not None
        from_jid = jid.JID(iq_elt["from"])
        key = KEY_CREDENTIALS.format(from_jid=from_jid.userhost())
        credentials = await self.storage.get(key) or {}

        form = data_form.Form(formType="form", title="IMAP/SMTP Credentials")

        # Add instructions
        form.instructions = [
            D_(
                "Please provide your IMAP and SMTP credentials to configure the "
                "connection."
            )
        ]

        # Add identity fields
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="user_name",
                label="User Name",
                desc=D_('The display name to use in the "From" field of sent emails.'),
                value=credentials.get("user_name"),
                required=True,
            )
        )

        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="user_email",
                label="User Email",
                desc=D_('The email address to use in the "From" field of sent emails.'),
                value=credentials.get("user_email"),
                required=True,
            )
        )

        # Add fields for IMAP credentials
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="imap_host",
                label="IMAP Host",
                desc=D_("IMAP server hostname or IP address"),
                value=credentials.get("imap_host"),
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="imap_port",
                label="IMAP Port",
                desc=D_("IMAP server port (default: 993)"),
                value=credentials.get("imap_port", "993"),
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="imap_username",
                label="IMAP Username",
                desc=D_("Username for IMAP authentication"),
                value=credentials.get("imap_username"),
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-private",
                var="imap_password",
                label="IMAP Password",
                desc=D_("Password for IMAP authentication"),
                value=credentials.get("imap_password"),
                required=True,
            )
        )

        # Add fields for SMTP credentials
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="smtp_host",
                label="SMTP Host",
                desc=D_("SMTP server hostname or IP address"),
                value=credentials.get("smtp_host"),
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="smtp_port",
                label="SMTP Port",
                desc=D_("SMTP server port (default: 587)"),
                value=credentials.get("smtp_port", "587"),
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="smtp_username",
                label="SMTP Username",
                desc=D_("Username for SMTP authentication"),
                value=credentials.get("smtp_username"),
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-private",
                var="smtp_password",
                label="SMTP Password",
                desc=D_("Password for SMTP authentication"),
                value=credentials.get("smtp_password"),
                required=True,
            )
        )

        return bool(credentials), form

    def validate_field(
        self,
        form: data_form.Form,
        key: str,
        field_type: str,
        min_value: int | None = None,
        max_value: int | None = None,
        default: str | int | None = None,
    ) -> None:
        """Validate a single field.

        @param form: The form containing the fields.
        @param key: The key of the field to validate.
        @param field_type: The expected type of the field value.
        @param min_value: Optional minimum value for integer fields.
        @param max_value: Optional maximum value for integer fields.
        @param default: Default value to use if the field is missing.
        @raise StanzaError: If the field value is invalid or missing.
        """
        field = form.fields.get(key)
        if field is None:
            if default is None:
                raise StanzaError("bad-request", text=f"{key} is required")
            field = data_form.Field(var=key, value=str(default))
            form.addField(field)

        value = field.value
        if field_type == "int":
            try:
                value = int(value)
                if (min_value is not None and value < min_value) or (
                    max_value is not None and value > max_value
                ):
                    raise ValueError
            except (ValueError, TypeError):
                raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}")
        elif field_type == "str":
            if not isinstance(value, str):
                raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}")

            # Basic email validation for user_email field
            if key == "user_email":
                # XXX: This is a minimal check. A complete email validation is notoriously
                #   difficult.
                if not email_pattern.match(value):
                    raise StanzaError(
                        "bad-request", text=f"Invalid email address: {value}"
                    )

    def validate_imap_smtp_form(self, submit_form: data_form.Form) -> None:
        """Validate the submitted IMAP/SMTP credentials form.

        @param submit_form: The submitted form containing IMAP/SMTP credentials.
        @raise StanzaError: If any of the values are invalid.
        """
        # Validate identity fields
        self.validate_field(submit_form, "user_name", "str")
        self.validate_field(submit_form, "user_email", "str")

        # Validate IMAP fields
        self.validate_field(submit_form, "imap_host", "str")
        self.validate_field(
            submit_form, "imap_port", "int", min_value=1, max_value=65535, default=993
        )
        self.validate_field(submit_form, "imap_username", "str")
        self.validate_field(submit_form, "imap_password", "str")

        # Validate SMTP fields
        self.validate_field(submit_form, "smtp_host", "str")
        self.validate_field(
            submit_form, "smtp_port", "int", min_value=1, max_value=65535, default=587
        )
        self.validate_field(submit_form, "smtp_username", "str")
        self.validate_field(submit_form, "smtp_password", "str")

    def email_to_jid(
        self,
        client: SatXMPPEntity,
        user_email: str,
        user_jid: jid.JID,
        email_name: str,
        email_addr: str,
    ) -> tuple[jid.JID, str | None]:
        """Convert an email address to a JID and extract the name if present.

        @param client: Client session.
        @param user_email: Email address of the gateway user.
        @param user_jid: JID of the gateway user.
        @param email_name: Email associated name.
        @param email_addr: Email address.
        @return: Tuple of JID and name (if present).
        """
        email_name = email_name.strip()
        if email_name.startswith("xmpp:"):
            return jid.JID(email_name[5:]), None
        elif email_addr == user_email:
            return (user_jid, None)
        else:
            return (
                jid.JID(None, (self._e.escape(email_addr), client.jid.host, None)),
                email_name or None,
            )

    async def on_new_email(
        self, user_data: UserData, user_jid: jid.JID, email: EmailMessage
    ) -> None:
        """Called when a new message has been received.

        @param user_data: user data, used to map registered user email to corresponding
            jid.
        @param user_jid: JID of the recipient.
        @param email: Parsed email.
        """
        assert self.client is not None
        user_email = user_data.credentials["user_email"]
        name, email_addr = parseaddr(email["from"])
        email_addr = email_addr.lower()
        from_jid = jid.JID(None, (self._e.escape(email_addr), self.client.jid.host, None))

        # Get the email body
        body_mime = email.get_body(("plain",))
        if body_mime is not None:
            charset = body_mime.get_content_charset() or "utf-8"
            body = body_mime.get_payload(decode=True).decode(charset, errors="replace")
        else:
            log.warning(f"No body found in email:\n{email}")
            body = ""

        # Decode the subject
        subject = email.get("subject")
        if subject:
            decoded_subject = decode_header(subject)
            subject = "".join(
                [
                    part.decode(encoding or "utf-8") if isinstance(part, bytes) else part
                    for part, encoding in decoded_subject
                ]
            ).strip()
        else:
            subject = None

        # Parse recipient fields
        kwargs = {}
        for field in RECIPIENT_FIELDS:
            email_addresses = email.get_all(field)
            if email_addresses:
                jids_and_names = [
                    self.email_to_jid(self.client, user_email, user_jid, name, addr)
                    for name, addr in getaddresses(email_addresses)
                ]
                kwargs[field] = [
                    AddressType(jid=jid, desc=name) for jid, name in jids_and_names
                ]

        # At least "to" header should be set, so kwargs should never be empty
        assert kwargs
        addresses_data = AddressesData(**kwargs)

        # Parse reply-to field
        reply_to_addresses = email.get_all("reply-to")
        if reply_to_addresses:
            jids_with_names = [
                self.email_to_jid(self.client, user_email, user_jid, name, addr)
                for name, addr in getaddresses(reply_to_addresses)
            ]
            addresses_data.replyto = [
                AddressType(jid=jid, desc=name) for jid, name in jids_with_names
            ]

        # Set noreply flag
        # The is no flag to indicate a no-reply message, so we check common user parts in
        # from and reply-to headers.
        from_addresses = [email_addr]
        if reply_to_addresses:
            from_addresses.extend(
                addr for a in reply_to_addresses if (addr := parseaddr(a)[1])
            )
        for from_address in from_addresses:
            from_user_part = from_address.split("@", 1)[0].lower()
            if from_user_part in (
                "no-reply",
                "noreply",
                "do-not-reply",
                "donotreply",
                "notification",
                "notifications",
            ):
                addresses_data.noreply = True
                break
        extra = {}

        if (
            not addresses_data.replyto
            and not addresses_data.noreply
            and not addresses_data.cc
            and not addresses_data.bcc
            and addresses_data.to == [AddressType(jid=user_jid)]
        ):
            # The main recipient is the only one, and there is no other metadata: there is
            # no need to add addresses metadata.
            pass
        else:
            for address in addresses_data.addresses:
                if address.jid and (
                    address.jid == user_jid or address.jid.host == str(self.client.jid)
                ):
                    # Those are email address, and have been delivered by the sender,
                    # other JID addresses will have to be delivered by us.
                    address.delivered = True

            extra["addresses"] = addresses_data.model_dump(mode="json", exclude_none=True)

        # We look for interesting headers
        headers = {}
        keywords_headers = email.get_all("keywords")
        if keywords_headers:
            keywords = ",".join(keywords_headers)
            headers["keywords"] = keywords

        importance = email["importance"]
        if importance:
            # We convert to urgency
            if importance in ("low", "high"):
                headers["urgency"] = importance
            elif importance == "normal":
                headers["urgency"] = "medium"
            else:
                log.warning("Ignoring invalid importance header: {importance!r}")

        if headers:
            extra["headers"] = HeadersData(**headers).model_dump(
                mode="json", exclude_none=True
            )

        # Handle attachments
        for part in email.iter_attachments():
            await self.handle_attachment(part, user_jid)

        client = self.client.get_virtual_client(from_jid)

        await client.sendMessage(
            user_jid,
            {"": body},
            {"": subject} if subject else None,
            extra=extra,
        )

    async def handle_attachment(self, part: EmailMessage, recipient_jid: jid.JID) -> None:
        """Handle an attachment from an email.

        @param part: The object representing the attachment.
        @param recipient_jid: JID of the recipient to whom the attachment is being sent.
        """
        assert self.client is not None
        content_type = part.get_content_type()
        filename = part.get_filename() or "attachment"
        log.debug(f"Handling attachment: {filename} ({content_type})")
        file_metadata = await deferToThread(self._save_attachment, part)
        if file_metadata is not None:
            log.debug(f"Attachment {filename!r} saved to {file_metadata.path}")
            try:
                await self.host.memory.set_file(
                    self.client,
                    filename,
                    file_hash=file_metadata.hash,
                    hash_algo="sha-256",
                    size=file_metadata.size,
                    namespace=PLUGIN_INFO[C.PI_IMPORT_NAME],
                    mime_type=content_type,
                    owner=recipient_jid,
                )
            except Exception:
                log.exception(f"Failed to register file {filename!r}")

    def _save_attachment(self, part: EmailMessage) -> FileMetadata | None:
        """Save the attachment to files path.

        This method must be executed in a thread with deferToThread to avoid blocking the
        reactor with IO operations if the attachment is large.

        @param part: The object representing the attachment.
        @return: Attachment data, or None if an error occurs.
        @raises IOError: Can't save the attachment.
        """
        temp_file = None
        try:
            with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                payload = part.get_payload(decode=True)
                if isinstance(payload, bytes):
                    temp_file.write(payload)
                    file_hash = hashlib.sha256(payload).hexdigest()
                    file_path = self.files_path / file_hash
                    shutil.move(temp_file.name, file_path)
                    file_size = len(payload)
                    return FileMetadata(path=file_path, hash=file_hash, size=file_size)
                else:
                    log.warning(f"Can't write payload of type {type(payload)}.")
                    return None
        except Exception as e:
            raise IOError(f"Failed to save attachment: {e}")
        finally:
            if temp_file is not None and Path(temp_file.name).exists():
                Path(temp_file.name).unlink()

    async def connect_imap(self, from_jid: jid.JID, user_data: UserData) -> None:
        """Connect to IMAP service.

        [self.on_new_email] will be used as callback on new messages.

        @param from_jid: JID of the user associated with given credentials.
        @param credentials: Email credentials.
        """
        credentials = user_data.credentials

        connected = defer.Deferred()
        factory = IMAPClientFactory(
            user_data,
            partial(self.on_new_email, user_data, from_jid.userhostJID()),
            connected,
        )
        reactor.connectTCP(
            credentials["imap_host"], int(credentials["imap_port"]), factory
        )
        await connected

    async def _on_registration_submit(
        self,
        client: SatXMPPEntity,
        iq_elt: domish.Element,
        submit_form: data_form.Form | None,
    ) -> bool | None:
        """Handle registration submit request.

        Submit form is validated, and credentials are stored.
        @param client: client session.
        iq_elt: IQ stanza of the submission request.
        submit_form: submit form.
        @return: True if successful.
            None if the callback is not relevant for this request.
        """
        if client != self.client:
            return
        assert self.storage is not None
        from_jid = jid.JID(iq_elt["from"]).userhostJID()

        if submit_form is None:
            # This is an unregistration request.
            try:
                user_data = self.users_data[from_jid]
            except KeyError:
                pass
            else:
                if user_data.imap_client is not None:
                    try:
                        await user_data.imap_client.logout()
                    except Exception:
                        log.exception(f"Can't log out {from_jid} from IMAP server.")
            key = KEY_CREDENTIALS.format(from_jid=from_jid)
            await self.storage.adel(key)
            log.info(f"{from_jid} unregistered from this gateway.")
            return True

        self.validate_imap_smtp_form(submit_form)
        credentials = {key: field.value for key, field in submit_form.fields.items()}
        user_data = self.users_data.get(from_jid)
        if user_data is None:
            # The user is not in cache, we cache current credentials.
            user_data = self.users_data[from_jid] = UserData(credentials=credentials)
        else:
            # The user is known, we update credentials.
            user_data.credentials = credentials
        key = KEY_CREDENTIALS.format(from_jid=from_jid)
        try:
            await self.connect_imap(from_jid, user_data)
        except Exception as e:
            log.warning(f"Can't connect to IMAP server for {from_jid}")
            credentials["imap_success"] = False
            await self.storage.aset(key, credentials)
            raise e
        else:
            log.debug(f"Connection successful to IMAP server for {from_jid}")
            credentials["imap_success"] = True
            await self.storage.aset(key, credentials)
            return True


@implementer(iwokkel.IDisco)
class EmailGatewayHandler(XMPPHandler):

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return []

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []