view libervia/backend/plugins/plugin_comp_email_gateway/__init__.py @ 4308:472a938a46e3

cli (message/send): add arguments for message addressing: commands have been added to add `to`, `cc` and `bcc` metadata for sending copy of the message to several entities. Metadata such as `reply-to` `reply-room` and `no-reply` are also supported. rel 450
author Goffi <goffi@goffi.org>
date Thu, 26 Sep 2024 16:12:01 +0200
parents a7ec325246fb
children b56b1eae7994
line wrap: on
line source

#!/usr/bin/env python3

# Libervia Email Gateway Component
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from dataclasses import dataclass
from email.header import decode_header
from email.message import EmailMessage
from email.mime.text import MIMEText
from email.utils import formataddr, parseaddr
from functools import partial
import re
from typing import Any, cast

from twisted.internet import defer, reactor
from twisted.mail import smtp
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.error import StanzaError
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import data_form, disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.memory.persistent import LazyPersistentBinaryDict
from libervia.backend.memory.sqla import select
from libervia.backend.memory.sqla_mapping import PrivateIndBin
from libervia.backend.models.core import MessageData
from libervia.backend.plugins.plugin_xep_0077 import XEP_0077
from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
from libervia.backend.tools.utils import aio

from .models import Credentials, UserData
from .imap import IMAPClientFactory


log = getLogger(__name__)

IMPORT_NAME = "email-gateway"
NAME = "Libervia Email Gateway"

PLUGIN_INFO = {
    C.PI_NAME: "Email Gateway Component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0077", "XEP-0106"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "EmailGatewayComponent",
    C.PI_HANDLER: C.BOOL_TRUE,
    C.PI_DESCRIPTION: D_(
        "Gateway to handle email. Usual emails are handled as message, while mailing "
        "lists are converted to pubsub blogs."
    ),
}

CONF_SECTION = f"component {IMPORT_NAME}"
PREFIX_KEY_CREDENTIALS = "CREDENTIALS_"
KEY_CREDENTIALS = f"{PREFIX_KEY_CREDENTIALS}{{from_jid}}"

email_pattern = re.compile(r"[^@]+@[^@]+\.[^@]+")


class EmailGatewayComponent:
    IMPORT_NAME = IMPORT_NAME
    verbose = 0

    def __init__(self, host):
        self.host = host
        self.client: SatXMPPEntity | None = None
        self.initalized = False
        self.storage: LazyPersistentBinaryDict | None = None
        self._iq_register = cast(XEP_0077, host.plugins["XEP-0077"])
        self._iq_register.register_handler(
            self._on_registration_form, self._on_registration_submit
        )
        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
        # TODO: For the moment, all credentials are kept in cache; we should only keep the
        #   X latest.
        self.users_data: dict[jid.JID, UserData] = {}
        host.trigger.add_with_check(
            "message_received", self, self._message_received_trigger, priority=-1000
        )

    async def _init(self) -> None:
        """Initialisation done after profile is connected"""
        assert self.client is not None
        self.client.identities.append(disco.DiscoIdentity("gateway", "smtp", NAME))
        self.storage = LazyPersistentBinaryDict(IMPORT_NAME, self.client.profile)
        await self.connect_registered_users()

    @aio
    async def get_registered_users(self) -> dict[jid.JID, Credentials]:
        """Retrieve credentials for all registered users

        @return: a mapping from user JID to credentials data.
        """
        assert self.client is not None
        profile_id = self.host.memory.storage.profiles[self.client.profile]
        async with self.host.memory.storage.session() as session:
            query = select(PrivateIndBin).where(
                PrivateIndBin.profile_id == profile_id,
                PrivateIndBin.namespace == IMPORT_NAME,
                PrivateIndBin.key.startswith(PREFIX_KEY_CREDENTIALS),
            )
            result = await session.execute(query)
            return {
                jid.JID(p.key[len(PREFIX_KEY_CREDENTIALS) :]): p.value
                for p in result.scalars()
            }

    async def connect_registered_users(self) -> None:
        """Connected users already registered to the gateway."""
        registered_data = await self.get_registered_users()
        for user_jid, credentials in registered_data.items():
            user_data = self.users_data[user_jid] = UserData(credentials=credentials)
            if not credentials["imap_success"]:
                log.warning(
                    f"Ignoring unsuccessful IMAP credentials of {user_jid}. This user "
                    "won't receive message from this gateway."
                )
            else:
                try:
                    await self.connect_imap(user_jid, user_data)
                except Exception as e:
                    log.warning(f"Can't connect {user_jid} to IMAP: {e}.")
                else:
                    log.debug(f"Connection to IMAP server successful for {user_jid}.")

    def get_handler(self, __) -> XMPPHandler:
        return EmailGatewayHandler()

    async def profile_connecting(self, client: SatXMPPEntity) -> None:
        self.client = client
        if not self.initalized:
            await self._init()
            self.initalized = True

    def _message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> bool:
        """add the gateway workflow on post treatment"""
        if client != self.client:
            return True
        post_treat.addCallback(
            lambda mess_data: defer.ensureDeferred(
                self.on_message(client, mess_data, message_elt)
            )
        )
        return True

    async def on_message(
        self, client: SatXMPPEntity, mess_data: MessageData, message_elt: domish.Element
    ) -> dict:
        """Called once message has been parsed

        @param client: Client session.
        @param mess_data: Message data.
        @return: Message data.
        """
        if client != self.client:
            return mess_data
        from_jid = mess_data["from"].userhostJID()
        if mess_data["type"] not in ("chat", "normal"):
            log.warning(f"ignoring message with unexpected type: {mess_data}")
            return mess_data
        if not client.is_local(from_jid):
            log.warning(f"ignoring non local message: {mess_data}")
            return mess_data
        if not mess_data["to"].user:
            log.warning(f"ignoring message addressed to gateway itself: {mess_data}")
            return mess_data

        try:
            to_email = self._e.unescape(mess_data["to"].user)
        except ValueError:
            raise exceptions.DataError(
                f'Invalid "to" JID, can\'t send message: {message_elt.toXml()}.'
            )

        try:
            body_lang, body = next(iter(mess_data["message"].items()))
        except (KeyError, StopIteration):
            log.warning(f"No body found: {mess_data}")
            body_lang, body = "", ""
        try:
            subject_lang, subject = next(iter(mess_data["subject"].items()))
        except (KeyError, StopIteration):
            subject_lang, subject = "", None

        if not body and not subject:
            log.warning(f"Ignoring empty message: {mess_data}")
            return mess_data

        try:
            await self.send_email(
                from_jid=from_jid,
                to_email=to_email,
                body=body,
                subject=subject,
            )
        except exceptions.UnknownEntityError:
            log.warning(f"Can't send message, user {from_jid} is not registered.")
            message_error_elt = StanzaError(
                "subscription-required",
                text="User need to register to the gateway before sending emails.",
            ).toResponse(message_elt)
            await client.a_send(message_error_elt)

            raise exceptions.CancelError("User not registered.")

        return mess_data

    async def send_email(
        self,
        from_jid: jid.JID,
        to_email: str,
        body: str,
        subject: str | None,
    ) -> None:
        """Send an email using sender credentials.

        Credentials will be retrieve from cache, or database.

        @param from_jid: Bare JID of the sender.
        @param to_email: Email address of the destinee.
        @param body: Body of the email.
        @param subject: Subject of the email.

        @raise exceptions.UnknownEntityError: Credentials for "from_jid" can't be found.
        """
        # We need a bare jid.
        assert self.storage is not None
        assert not from_jid.resource
        try:
            user_data = self.users_data[from_jid]
        except KeyError:
            key = KEY_CREDENTIALS.format(from_jid=from_jid)
            credentials = await self.storage.get(key)
            if credentials is None:
                raise exceptions.UnknownEntityError(
                    f"No credentials found for {from_jid}."
                )
            self.users_data[from_jid] = UserData(credentials)
        else:
            credentials = user_data.credentials

        msg = MIMEText(body, "plain", "UTF-8")
        if subject is not None:
            msg["Subject"] = subject
        msg["From"] = formataddr(
            (credentials["user_name"] or None, credentials["user_email"])
        )
        msg["To"] = to_email

        sender_domain = credentials["user_email"].split("@", 1)[-1]

        await smtp.sendmail(
            credentials["smtp_host"].encode(),
            credentials["user_email"].encode(),
            [to_email.encode()],
            msg.as_bytes(),
            senderDomainName=sender_domain,
            port=int(credentials["smtp_port"]),
            username=credentials["smtp_username"].encode(),
            password=credentials["smtp_password"].encode(),
            requireAuthentication=True,
            # TODO: only STARTTLS is supported right now, implicit TLS should be supported
            #   too.
            requireTransportSecurity=True,
        )

    async def _on_registration_form(
        self, client: SatXMPPEntity, iq_elt: domish.Element
    ) -> tuple[bool, data_form.Form] | None:
        if client != self.client:
            return
        assert self.storage is not None
        from_jid = jid.JID(iq_elt["from"])
        key = KEY_CREDENTIALS.format(from_jid=from_jid.userhost())
        credentials = await self.storage.get(key) or {}

        form = data_form.Form(formType="form", title="IMAP/SMTP Credentials")

        # Add instructions
        form.instructions = [
            D_(
                "Please provide your IMAP and SMTP credentials to configure the "
                "connection."
            )
        ]

        # Add identity fields
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="user_name",
                label="User Name",
                desc=D_('The display name to use in the "From" field of sent emails.'),
                value=credentials.get("user_name"),
                required=True,
            )
        )

        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="user_email",
                label="User Email",
                desc=D_('The email address to use in the "From" field of sent emails.'),
                value=credentials.get("user_email"),
                required=True,
            )
        )

        # Add fields for IMAP credentials
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="imap_host",
                label="IMAP Host",
                desc=D_("IMAP server hostname or IP address"),
                value=credentials.get("imap_host"),
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="imap_port",
                label="IMAP Port",
                desc=D_("IMAP server port (default: 993)"),
                value=credentials.get("imap_port", "993"),
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="imap_username",
                label="IMAP Username",
                desc=D_("Username for IMAP authentication"),
                value=credentials.get("imap_username"),
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-private",
                var="imap_password",
                label="IMAP Password",
                desc=D_("Password for IMAP authentication"),
                value=credentials.get("imap_password"),
                required=True,
            )
        )

        # Add fields for SMTP credentials
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="smtp_host",
                label="SMTP Host",
                desc=D_("SMTP server hostname or IP address"),
                value=credentials.get("smtp_host"),
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="smtp_port",
                label="SMTP Port",
                desc=D_("SMTP server port (default: 587)"),
                value=credentials.get("smtp_port", "587"),
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="smtp_username",
                label="SMTP Username",
                desc=D_("Username for SMTP authentication"),
                value=credentials.get("smtp_username"),
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-private",
                var="smtp_password",
                label="SMTP Password",
                desc=D_("Password for SMTP authentication"),
                value=credentials.get("smtp_password"),
                required=True,
            )
        )

        return bool(credentials), form

    def validate_field(
        self,
        form: data_form.Form,
        key: str,
        field_type: str,
        min_value: int | None = None,
        max_value: int | None = None,
        default: str | int | None = None,
    ) -> None:
        """Validate a single field.

        @param form: The form containing the fields.
        @param key: The key of the field to validate.
        @param field_type: The expected type of the field value.
        @param min_value: Optional minimum value for integer fields.
        @param max_value: Optional maximum value for integer fields.
        @param default: Default value to use if the field is missing.
        @raise StanzaError: If the field value is invalid or missing.
        """
        field = form.fields.get(key)
        if field is None:
            if default is None:
                raise StanzaError("bad-request", text=f"{key} is required")
            field = data_form.Field(var=key, value=str(default))
            form.addField(field)

        value = field.value
        if field_type == "int":
            try:
                value = int(value)
                if (min_value is not None and value < min_value) or (
                    max_value is not None and value > max_value
                ):
                    raise ValueError
            except (ValueError, TypeError):
                raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}")
        elif field_type == "str":
            if not isinstance(value, str):
                raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}")

            # Basic email validation for user_email field
            if key == "user_email":
                # XXX: This is a minimal check. A complete email validation is notoriously
                #   difficult.
                if not email_pattern.match(value):
                    raise StanzaError(
                        "bad-request", text=f"Invalid email address: {value}"
                    )

    def validate_imap_smtp_form(self, submit_form: data_form.Form) -> None:
        """Validate the submitted IMAP/SMTP credentials form.

        @param submit_form: The submitted form containing IMAP/SMTP credentials.
        @raise StanzaError: If any of the values are invalid.
        """
        # Validate identity fields
        self.validate_field(submit_form, "user_name", "str")
        self.validate_field(submit_form, "user_email", "str")

        # Validate IMAP fields
        self.validate_field(submit_form, "imap_host", "str")
        self.validate_field(
            submit_form, "imap_port", "int", min_value=1, max_value=65535, default=993
        )
        self.validate_field(submit_form, "imap_username", "str")
        self.validate_field(submit_form, "imap_password", "str")

        # Validate SMTP fields
        self.validate_field(submit_form, "smtp_host", "str")
        self.validate_field(
            submit_form, "smtp_port", "int", min_value=1, max_value=65535, default=587
        )
        self.validate_field(submit_form, "smtp_username", "str")
        self.validate_field(submit_form, "smtp_password", "str")

    async def on_new_email(self, to_jid: jid.JID, email: EmailMessage) -> None:
        """Called when a new message has been received.

        @param to_jid: JID of the recipient.
        @param email: Parsed email.
        """
        assert self.client is not None
        name, email_addr = parseaddr(email["from"])
        email_addr = email_addr.lower()
        from_jid = jid.JID(None, (self._e.escape(email_addr), self.client.jid.host, None))

        # Get the email body
        body_mime = email.get_body(("plain",))
        if body_mime is not None:
            charset = body_mime.get_content_charset() or "utf-8"
            body = body_mime.get_payload(decode=True).decode(charset, errors="replace")
        else:
            log.warning(f"No body found in email:\n{email}")
            body = ""

        # Decode the subject
        subject = email.get("subject")
        if subject:
            decoded_subject = decode_header(subject)
            subject = "".join(
                [
                    part.decode(encoding or "utf-8") if isinstance(part, bytes) else part
                    for part, encoding in decoded_subject
                ]
            ).strip()
        else:
            subject = None

        client = self.client.get_virtual_client(from_jid)
        await client.sendMessage(to_jid, {"": body}, {"": subject} if subject else None)

    async def connect_imap(self, from_jid: jid.JID, user_data: UserData) -> None:
        """Connect to IMAP service.

        [self.on_new_email] will be used as callback on new messages.

        @param from_jid: JID of the user associated with given credentials.
        @param credentials: Email credentials.
        """
        credentials = user_data.credentials

        connected = defer.Deferred()
        factory = IMAPClientFactory(
            user_data,
            partial(self.on_new_email, from_jid.userhostJID()),
            connected,
        )
        reactor.connectTCP(
            credentials["imap_host"], int(credentials["imap_port"]), factory
        )
        await connected

    async def _on_registration_submit(
        self,
        client: SatXMPPEntity,
        iq_elt: domish.Element,
        submit_form: data_form.Form | None,
    ) -> bool | None:
        """Handle registration submit request.

        Submit form is validated, and credentials are stored.
        @param client: client session.
        iq_elt: IQ stanza of the submission request.
        submit_form: submit form.
        @return: True if successful.
            None if the callback is not relevant for this request.
        """
        if client != self.client:
            return
        assert self.storage is not None
        from_jid = jid.JID(iq_elt["from"]).userhostJID()

        if submit_form is None:
            # This is an unregistration request.
            try:
                user_data = self.users_data[from_jid]
            except KeyError:
                pass
            else:
                if user_data.imap_client is not None:
                    try:
                        await user_data.imap_client.logout()
                    except Exception:
                        log.exception(f"Can't log out {from_jid} from IMAP server.")
            key = KEY_CREDENTIALS.format(from_jid=from_jid)
            await self.storage.adel(key)
            log.info(f"{from_jid} unregistered from this gateway.")
            return True

        self.validate_imap_smtp_form(submit_form)
        credentials = {key: field.value for key, field in submit_form.fields.items()}
        user_data = self.users_data.get(from_jid)
        if user_data is None:
            # The user is not in cache, we cache current credentials.
            user_data = self.users_data[from_jid] = UserData(credentials=credentials)
        else:
            # The user is known, we update credentials.
            user_data.credentials = credentials
        key = KEY_CREDENTIALS.format(from_jid=from_jid)
        try:
            await self.connect_imap(from_jid, user_data)
        except Exception as e:
            log.warning(f"Can't connect to IMAP server for {from_jid}")
            credentials["imap_success"] = False
            await self.storage.aset(key, credentials)
            raise e
        else:
            log.debug(f"Connection successful to IMAP server for {from_jid}")
            credentials["imap_success"] = True
            await self.storage.aset(key, credentials)
            return True


@implementer(iwokkel.IDisco)
class EmailGatewayHandler(XMPPHandler):

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return []

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []