diff libervia/backend/plugins/plugin_comp_email_gateway/__init__.py @ 4303:a7ec325246fb

component email-gateway: first draft: Initial implementation of the Email Gateway. This component uses XEP-0100 for registration. Upon registration and subsequent startups, a connection is made to registered IMAP services, and incoming emails (in `INBOX` mailboxes) are immediately forwarded as XMPP messages. In the opposite direction, an SMTP connection is established to send emails on incoming XMPP messages. rel 449
author Goffi <goffi@goffi.org>
date Fri, 06 Sep 2024 18:07:17 +0200
parents
children b56b1eae7994
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py	Fri Sep 06 18:07:17 2024 +0200
@@ -0,0 +1,621 @@
+#!/usr/bin/env python3
+
+# Libervia Email Gateway Component
+# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from dataclasses import dataclass
+from email.header import decode_header
+from email.message import EmailMessage
+from email.mime.text import MIMEText
+from email.utils import formataddr, parseaddr
+from functools import partial
+import re
+from typing import Any, cast
+
+from twisted.internet import defer, reactor
+from twisted.mail import smtp
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber.error import StanzaError
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from wokkel import data_form, disco, iwokkel
+from zope.interface import implementer
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import D_, _
+from libervia.backend.core.log import getLogger
+from libervia.backend.memory.persistent import LazyPersistentBinaryDict
+from libervia.backend.memory.sqla import select
+from libervia.backend.memory.sqla_mapping import PrivateIndBin
+from libervia.backend.models.core import MessageData
+from libervia.backend.plugins.plugin_xep_0077 import XEP_0077
+from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
+from libervia.backend.tools.utils import aio
+
+from .models import Credentials, UserData
+from .imap import IMAPClientFactory
+
+
+log = getLogger(__name__)
+
+IMPORT_NAME = "email-gateway"
+NAME = "Libervia Email Gateway"
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Email Gateway Component",
+    C.PI_IMPORT_NAME: IMPORT_NAME,
+    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
+    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0077", "XEP-0106"],
+    C.PI_RECOMMENDATIONS: [],
+    C.PI_MAIN: "EmailGatewayComponent",
+    C.PI_HANDLER: C.BOOL_TRUE,
+    C.PI_DESCRIPTION: D_(
+        "Gateway to handle email. Usual emails are handled as message, while mailing "
+        "lists are converted to pubsub blogs."
+    ),
+}
+
+CONF_SECTION = f"component {IMPORT_NAME}"
+PREFIX_KEY_CREDENTIALS = "CREDENTIALS_"
+KEY_CREDENTIALS = f"{PREFIX_KEY_CREDENTIALS}{{from_jid}}"
+
+email_pattern = re.compile(r"[^@]+@[^@]+\.[^@]+")
+
+
+class EmailGatewayComponent:
+    IMPORT_NAME = IMPORT_NAME
+    verbose = 0
+
+    def __init__(self, host):
+        self.host = host
+        self.client: SatXMPPEntity | None = None
+        self.initalized = False
+        self.storage: LazyPersistentBinaryDict | None = None
+        self._iq_register = cast(XEP_0077, host.plugins["XEP-0077"])
+        self._iq_register.register_handler(
+            self._on_registration_form, self._on_registration_submit
+        )
+        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
+        # TODO: For the moment, all credentials are kept in cache; we should only keep the
+        #   X latest.
+        self.users_data: dict[jid.JID, UserData] = {}
+        host.trigger.add_with_check(
+            "message_received", self, self._message_received_trigger, priority=-1000
+        )
+
+    async def _init(self) -> None:
+        """Initialisation done after profile is connected"""
+        assert self.client is not None
+        self.client.identities.append(disco.DiscoIdentity("gateway", "smtp", NAME))
+        self.storage = LazyPersistentBinaryDict(IMPORT_NAME, self.client.profile)
+        await self.connect_registered_users()
+
+    @aio
+    async def get_registered_users(self) -> dict[jid.JID, Credentials]:
+        """Retrieve credentials for all registered users
+
+        @return: a mapping from user JID to credentials data.
+        """
+        assert self.client is not None
+        profile_id = self.host.memory.storage.profiles[self.client.profile]
+        async with self.host.memory.storage.session() as session:
+            query = select(PrivateIndBin).where(
+                PrivateIndBin.profile_id == profile_id,
+                PrivateIndBin.namespace == IMPORT_NAME,
+                PrivateIndBin.key.startswith(PREFIX_KEY_CREDENTIALS),
+            )
+            result = await session.execute(query)
+            return {
+                jid.JID(p.key[len(PREFIX_KEY_CREDENTIALS) :]): p.value
+                for p in result.scalars()
+            }
+
+    async def connect_registered_users(self) -> None:
+        """Connected users already registered to the gateway."""
+        registered_data = await self.get_registered_users()
+        for user_jid, credentials in registered_data.items():
+            user_data = self.users_data[user_jid] = UserData(credentials=credentials)
+            if not credentials["imap_success"]:
+                log.warning(
+                    f"Ignoring unsuccessful IMAP credentials of {user_jid}. This user "
+                    "won't receive message from this gateway."
+                )
+            else:
+                try:
+                    await self.connect_imap(user_jid, user_data)
+                except Exception as e:
+                    log.warning(f"Can't connect {user_jid} to IMAP: {e}.")
+                else:
+                    log.debug(f"Connection to IMAP server successful for {user_jid}.")
+
+    def get_handler(self, __) -> XMPPHandler:
+        return EmailGatewayHandler()
+
+    async def profile_connecting(self, client: SatXMPPEntity) -> None:
+        self.client = client
+        if not self.initalized:
+            await self._init()
+            self.initalized = True
+
+    def _message_received_trigger(
+        self,
+        client: SatXMPPEntity,
+        message_elt: domish.Element,
+        post_treat: defer.Deferred,
+    ) -> bool:
+        """add the gateway workflow on post treatment"""
+        if client != self.client:
+            return True
+        post_treat.addCallback(
+            lambda mess_data: defer.ensureDeferred(
+                self.on_message(client, mess_data, message_elt)
+            )
+        )
+        return True
+
+    async def on_message(
+        self, client: SatXMPPEntity, mess_data: MessageData, message_elt: domish.Element
+    ) -> dict:
+        """Called once message has been parsed
+
+        @param client: Client session.
+        @param mess_data: Message data.
+        @return: Message data.
+        """
+        if client != self.client:
+            return mess_data
+        from_jid = mess_data["from"].userhostJID()
+        if mess_data["type"] not in ("chat", "normal"):
+            log.warning(f"ignoring message with unexpected type: {mess_data}")
+            return mess_data
+        if not client.is_local(from_jid):
+            log.warning(f"ignoring non local message: {mess_data}")
+            return mess_data
+        if not mess_data["to"].user:
+            log.warning(f"ignoring message addressed to gateway itself: {mess_data}")
+            return mess_data
+
+        try:
+            to_email = self._e.unescape(mess_data["to"].user)
+        except ValueError:
+            raise exceptions.DataError(
+                f'Invalid "to" JID, can\'t send message: {message_elt.toXml()}.'
+            )
+
+        try:
+            body_lang, body = next(iter(mess_data["message"].items()))
+        except (KeyError, StopIteration):
+            log.warning(f"No body found: {mess_data}")
+            body_lang, body = "", ""
+        try:
+            subject_lang, subject = next(iter(mess_data["subject"].items()))
+        except (KeyError, StopIteration):
+            subject_lang, subject = "", None
+
+        if not body and not subject:
+            log.warning(f"Ignoring empty message: {mess_data}")
+            return mess_data
+
+        try:
+            await self.send_email(
+                from_jid=from_jid,
+                to_email=to_email,
+                body=body,
+                subject=subject,
+            )
+        except exceptions.UnknownEntityError:
+            log.warning(f"Can't send message, user {from_jid} is not registered.")
+            message_error_elt = StanzaError(
+                "subscription-required",
+                text="User need to register to the gateway before sending emails.",
+            ).toResponse(message_elt)
+            await client.a_send(message_error_elt)
+
+            raise exceptions.CancelError("User not registered.")
+
+        return mess_data
+
+    async def send_email(
+        self,
+        from_jid: jid.JID,
+        to_email: str,
+        body: str,
+        subject: str | None,
+    ) -> None:
+        """Send an email using sender credentials.
+
+        Credentials will be retrieve from cache, or database.
+
+        @param from_jid: Bare JID of the sender.
+        @param to_email: Email address of the destinee.
+        @param body: Body of the email.
+        @param subject: Subject of the email.
+
+        @raise exceptions.UnknownEntityError: Credentials for "from_jid" can't be found.
+        """
+        # We need a bare jid.
+        assert self.storage is not None
+        assert not from_jid.resource
+        try:
+            user_data = self.users_data[from_jid]
+        except KeyError:
+            key = KEY_CREDENTIALS.format(from_jid=from_jid)
+            credentials = await self.storage.get(key)
+            if credentials is None:
+                raise exceptions.UnknownEntityError(
+                    f"No credentials found for {from_jid}."
+                )
+            self.users_data[from_jid] = UserData(credentials)
+        else:
+            credentials = user_data.credentials
+
+        msg = MIMEText(body, "plain", "UTF-8")
+        if subject is not None:
+            msg["Subject"] = subject
+        msg["From"] = formataddr(
+            (credentials["user_name"] or None, credentials["user_email"])
+        )
+        msg["To"] = to_email
+
+        sender_domain = credentials["user_email"].split("@", 1)[-1]
+
+        await smtp.sendmail(
+            credentials["smtp_host"].encode(),
+            credentials["user_email"].encode(),
+            [to_email.encode()],
+            msg.as_bytes(),
+            senderDomainName=sender_domain,
+            port=int(credentials["smtp_port"]),
+            username=credentials["smtp_username"].encode(),
+            password=credentials["smtp_password"].encode(),
+            requireAuthentication=True,
+            # TODO: only STARTTLS is supported right now, implicit TLS should be supported
+            #   too.
+            requireTransportSecurity=True,
+        )
+
+    async def _on_registration_form(
+        self, client: SatXMPPEntity, iq_elt: domish.Element
+    ) -> tuple[bool, data_form.Form] | None:
+        if client != self.client:
+            return
+        assert self.storage is not None
+        from_jid = jid.JID(iq_elt["from"])
+        key = KEY_CREDENTIALS.format(from_jid=from_jid.userhost())
+        credentials = await self.storage.get(key) or {}
+
+        form = data_form.Form(formType="form", title="IMAP/SMTP Credentials")
+
+        # Add instructions
+        form.instructions = [
+            D_(
+                "Please provide your IMAP and SMTP credentials to configure the "
+                "connection."
+            )
+        ]
+
+        # Add identity fields
+        form.addField(
+            data_form.Field(
+                fieldType="text-single",
+                var="user_name",
+                label="User Name",
+                desc=D_('The display name to use in the "From" field of sent emails.'),
+                value=credentials.get("user_name"),
+                required=True,
+            )
+        )
+
+        form.addField(
+            data_form.Field(
+                fieldType="text-single",
+                var="user_email",
+                label="User Email",
+                desc=D_('The email address to use in the "From" field of sent emails.'),
+                value=credentials.get("user_email"),
+                required=True,
+            )
+        )
+
+        # Add fields for IMAP credentials
+        form.addField(
+            data_form.Field(
+                fieldType="text-single",
+                var="imap_host",
+                label="IMAP Host",
+                desc=D_("IMAP server hostname or IP address"),
+                value=credentials.get("imap_host"),
+                required=True,
+            )
+        )
+        form.addField(
+            data_form.Field(
+                fieldType="text-single",
+                var="imap_port",
+                label="IMAP Port",
+                desc=D_("IMAP server port (default: 993)"),
+                value=credentials.get("imap_port", "993"),
+            )
+        )
+        form.addField(
+            data_form.Field(
+                fieldType="text-single",
+                var="imap_username",
+                label="IMAP Username",
+                desc=D_("Username for IMAP authentication"),
+                value=credentials.get("imap_username"),
+                required=True,
+            )
+        )
+        form.addField(
+            data_form.Field(
+                fieldType="text-private",
+                var="imap_password",
+                label="IMAP Password",
+                desc=D_("Password for IMAP authentication"),
+                value=credentials.get("imap_password"),
+                required=True,
+            )
+        )
+
+        # Add fields for SMTP credentials
+        form.addField(
+            data_form.Field(
+                fieldType="text-single",
+                var="smtp_host",
+                label="SMTP Host",
+                desc=D_("SMTP server hostname or IP address"),
+                value=credentials.get("smtp_host"),
+                required=True,
+            )
+        )
+        form.addField(
+            data_form.Field(
+                fieldType="text-single",
+                var="smtp_port",
+                label="SMTP Port",
+                desc=D_("SMTP server port (default: 587)"),
+                value=credentials.get("smtp_port", "587"),
+            )
+        )
+        form.addField(
+            data_form.Field(
+                fieldType="text-single",
+                var="smtp_username",
+                label="SMTP Username",
+                desc=D_("Username for SMTP authentication"),
+                value=credentials.get("smtp_username"),
+                required=True,
+            )
+        )
+        form.addField(
+            data_form.Field(
+                fieldType="text-private",
+                var="smtp_password",
+                label="SMTP Password",
+                desc=D_("Password for SMTP authentication"),
+                value=credentials.get("smtp_password"),
+                required=True,
+            )
+        )
+
+        return bool(credentials), form
+
+    def validate_field(
+        self,
+        form: data_form.Form,
+        key: str,
+        field_type: str,
+        min_value: int | None = None,
+        max_value: int | None = None,
+        default: str | int | None = None,
+    ) -> None:
+        """Validate a single field.
+
+        @param form: The form containing the fields.
+        @param key: The key of the field to validate.
+        @param field_type: The expected type of the field value.
+        @param min_value: Optional minimum value for integer fields.
+        @param max_value: Optional maximum value for integer fields.
+        @param default: Default value to use if the field is missing.
+        @raise StanzaError: If the field value is invalid or missing.
+        """
+        field = form.fields.get(key)
+        if field is None:
+            if default is None:
+                raise StanzaError("bad-request", text=f"{key} is required")
+            field = data_form.Field(var=key, value=str(default))
+            form.addField(field)
+
+        value = field.value
+        if field_type == "int":
+            try:
+                value = int(value)
+                if (min_value is not None and value < min_value) or (
+                    max_value is not None and value > max_value
+                ):
+                    raise ValueError
+            except (ValueError, TypeError):
+                raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}")
+        elif field_type == "str":
+            if not isinstance(value, str):
+                raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}")
+
+            # Basic email validation for user_email field
+            if key == "user_email":
+                # XXX: This is a minimal check. A complete email validation is notoriously
+                #   difficult.
+                if not email_pattern.match(value):
+                    raise StanzaError(
+                        "bad-request", text=f"Invalid email address: {value}"
+                    )
+
+    def validate_imap_smtp_form(self, submit_form: data_form.Form) -> None:
+        """Validate the submitted IMAP/SMTP credentials form.
+
+        @param submit_form: The submitted form containing IMAP/SMTP credentials.
+        @raise StanzaError: If any of the values are invalid.
+        """
+        # Validate identity fields
+        self.validate_field(submit_form, "user_name", "str")
+        self.validate_field(submit_form, "user_email", "str")
+
+        # Validate IMAP fields
+        self.validate_field(submit_form, "imap_host", "str")
+        self.validate_field(
+            submit_form, "imap_port", "int", min_value=1, max_value=65535, default=993
+        )
+        self.validate_field(submit_form, "imap_username", "str")
+        self.validate_field(submit_form, "imap_password", "str")
+
+        # Validate SMTP fields
+        self.validate_field(submit_form, "smtp_host", "str")
+        self.validate_field(
+            submit_form, "smtp_port", "int", min_value=1, max_value=65535, default=587
+        )
+        self.validate_field(submit_form, "smtp_username", "str")
+        self.validate_field(submit_form, "smtp_password", "str")
+
+    async def on_new_email(self, to_jid: jid.JID, email: EmailMessage) -> None:
+        """Called when a new message has been received.
+
+        @param to_jid: JID of the recipient.
+        @param email: Parsed email.
+        """
+        assert self.client is not None
+        name, email_addr = parseaddr(email["from"])
+        email_addr = email_addr.lower()
+        from_jid = jid.JID(None, (self._e.escape(email_addr), self.client.jid.host, None))
+
+        # Get the email body
+        body_mime = email.get_body(("plain",))
+        if body_mime is not None:
+            charset = body_mime.get_content_charset() or "utf-8"
+            body = body_mime.get_payload(decode=True).decode(charset, errors="replace")
+        else:
+            log.warning(f"No body found in email:\n{email}")
+            body = ""
+
+        # Decode the subject
+        subject = email.get("subject")
+        if subject:
+            decoded_subject = decode_header(subject)
+            subject = "".join(
+                [
+                    part.decode(encoding or "utf-8") if isinstance(part, bytes) else part
+                    for part, encoding in decoded_subject
+                ]
+            ).strip()
+        else:
+            subject = None
+
+        client = self.client.get_virtual_client(from_jid)
+        await client.sendMessage(to_jid, {"": body}, {"": subject} if subject else None)
+
+    async def connect_imap(self, from_jid: jid.JID, user_data: UserData) -> None:
+        """Connect to IMAP service.
+
+        [self.on_new_email] will be used as callback on new messages.
+
+        @param from_jid: JID of the user associated with given credentials.
+        @param credentials: Email credentials.
+        """
+        credentials = user_data.credentials
+
+        connected = defer.Deferred()
+        factory = IMAPClientFactory(
+            user_data,
+            partial(self.on_new_email, from_jid.userhostJID()),
+            connected,
+        )
+        reactor.connectTCP(
+            credentials["imap_host"], int(credentials["imap_port"]), factory
+        )
+        await connected
+
+    async def _on_registration_submit(
+        self,
+        client: SatXMPPEntity,
+        iq_elt: domish.Element,
+        submit_form: data_form.Form | None,
+    ) -> bool | None:
+        """Handle registration submit request.
+
+        Submit form is validated, and credentials are stored.
+        @param client: client session.
+        iq_elt: IQ stanza of the submission request.
+        submit_form: submit form.
+        @return: True if successful.
+            None if the callback is not relevant for this request.
+        """
+        if client != self.client:
+            return
+        assert self.storage is not None
+        from_jid = jid.JID(iq_elt["from"]).userhostJID()
+
+        if submit_form is None:
+            # This is an unregistration request.
+            try:
+                user_data = self.users_data[from_jid]
+            except KeyError:
+                pass
+            else:
+                if user_data.imap_client is not None:
+                    try:
+                        await user_data.imap_client.logout()
+                    except Exception:
+                        log.exception(f"Can't log out {from_jid} from IMAP server.")
+            key = KEY_CREDENTIALS.format(from_jid=from_jid)
+            await self.storage.adel(key)
+            log.info(f"{from_jid} unregistered from this gateway.")
+            return True
+
+        self.validate_imap_smtp_form(submit_form)
+        credentials = {key: field.value for key, field in submit_form.fields.items()}
+        user_data = self.users_data.get(from_jid)
+        if user_data is None:
+            # The user is not in cache, we cache current credentials.
+            user_data = self.users_data[from_jid] = UserData(credentials=credentials)
+        else:
+            # The user is known, we update credentials.
+            user_data.credentials = credentials
+        key = KEY_CREDENTIALS.format(from_jid=from_jid)
+        try:
+            await self.connect_imap(from_jid, user_data)
+        except Exception as e:
+            log.warning(f"Can't connect to IMAP server for {from_jid}")
+            credentials["imap_success"] = False
+            await self.storage.aset(key, credentials)
+            raise e
+        else:
+            log.debug(f"Connection successful to IMAP server for {from_jid}")
+            credentials["imap_success"] = True
+            await self.storage.aset(key, credentials)
+            return True
+
+
+@implementer(iwokkel.IDisco)
+class EmailGatewayHandler(XMPPHandler):
+
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return []
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []