Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_comp_email_gateway/__init__.py @ 4303:a7ec325246fb
component email-gateway: first draft:
Initial implementation of the Email Gateway.
This component uses XEP-0100 for registration. Upon registration and subsequent startups,
a connection is made to registered IMAP services, and incoming emails (in `INBOX`
mailboxes) are immediately forwarded as XMPP messages.
In the opposite direction, an SMTP connection is established to send emails on incoming
XMPP messages.
rel 449
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 06 Sep 2024 18:07:17 +0200 |
parents | |
children | b56b1eae7994 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_comp_email_gateway/__init__.py Fri Sep 06 18:07:17 2024 +0200 @@ -0,0 +1,621 @@ +#!/usr/bin/env python3 + +# Libervia Email Gateway Component +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from dataclasses import dataclass +from email.header import decode_header +from email.message import EmailMessage +from email.mime.text import MIMEText +from email.utils import formataddr, parseaddr +from functools import partial +import re +from typing import Any, cast + +from twisted.internet import defer, reactor +from twisted.mail import smtp +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.error import StanzaError +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import data_form, disco, iwokkel +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import D_, _ +from libervia.backend.core.log import getLogger +from libervia.backend.memory.persistent import LazyPersistentBinaryDict +from libervia.backend.memory.sqla import select +from libervia.backend.memory.sqla_mapping import PrivateIndBin +from libervia.backend.models.core import MessageData +from libervia.backend.plugins.plugin_xep_0077 import XEP_0077 +from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 +from libervia.backend.tools.utils import aio + +from .models import Credentials, UserData +from .imap import IMAPClientFactory + + +log = getLogger(__name__) + +IMPORT_NAME = "email-gateway" +NAME = "Libervia Email Gateway" + +PLUGIN_INFO = { + C.PI_NAME: "Email Gateway Component", + C.PI_IMPORT_NAME: IMPORT_NAME, + C.PI_MODES: [C.PLUG_MODE_COMPONENT], + C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0077", "XEP-0106"], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "EmailGatewayComponent", + C.PI_HANDLER: C.BOOL_TRUE, + C.PI_DESCRIPTION: D_( + "Gateway to handle email. Usual emails are handled as message, while mailing " + "lists are converted to pubsub blogs." + ), +} + +CONF_SECTION = f"component {IMPORT_NAME}" +PREFIX_KEY_CREDENTIALS = "CREDENTIALS_" +KEY_CREDENTIALS = f"{PREFIX_KEY_CREDENTIALS}{{from_jid}}" + +email_pattern = re.compile(r"[^@]+@[^@]+\.[^@]+") + + +class EmailGatewayComponent: + IMPORT_NAME = IMPORT_NAME + verbose = 0 + + def __init__(self, host): + self.host = host + self.client: SatXMPPEntity | None = None + self.initalized = False + self.storage: LazyPersistentBinaryDict | None = None + self._iq_register = cast(XEP_0077, host.plugins["XEP-0077"]) + self._iq_register.register_handler( + self._on_registration_form, self._on_registration_submit + ) + self._e = cast(XEP_0106, host.plugins["XEP-0106"]) + # TODO: For the moment, all credentials are kept in cache; we should only keep the + # X latest. + self.users_data: dict[jid.JID, UserData] = {} + host.trigger.add_with_check( + "message_received", self, self._message_received_trigger, priority=-1000 + ) + + async def _init(self) -> None: + """Initialisation done after profile is connected""" + assert self.client is not None + self.client.identities.append(disco.DiscoIdentity("gateway", "smtp", NAME)) + self.storage = LazyPersistentBinaryDict(IMPORT_NAME, self.client.profile) + await self.connect_registered_users() + + @aio + async def get_registered_users(self) -> dict[jid.JID, Credentials]: + """Retrieve credentials for all registered users + + @return: a mapping from user JID to credentials data. + """ + assert self.client is not None + profile_id = self.host.memory.storage.profiles[self.client.profile] + async with self.host.memory.storage.session() as session: + query = select(PrivateIndBin).where( + PrivateIndBin.profile_id == profile_id, + PrivateIndBin.namespace == IMPORT_NAME, + PrivateIndBin.key.startswith(PREFIX_KEY_CREDENTIALS), + ) + result = await session.execute(query) + return { + jid.JID(p.key[len(PREFIX_KEY_CREDENTIALS) :]): p.value + for p in result.scalars() + } + + async def connect_registered_users(self) -> None: + """Connected users already registered to the gateway.""" + registered_data = await self.get_registered_users() + for user_jid, credentials in registered_data.items(): + user_data = self.users_data[user_jid] = UserData(credentials=credentials) + if not credentials["imap_success"]: + log.warning( + f"Ignoring unsuccessful IMAP credentials of {user_jid}. This user " + "won't receive message from this gateway." + ) + else: + try: + await self.connect_imap(user_jid, user_data) + except Exception as e: + log.warning(f"Can't connect {user_jid} to IMAP: {e}.") + else: + log.debug(f"Connection to IMAP server successful for {user_jid}.") + + def get_handler(self, __) -> XMPPHandler: + return EmailGatewayHandler() + + async def profile_connecting(self, client: SatXMPPEntity) -> None: + self.client = client + if not self.initalized: + await self._init() + self.initalized = True + + def _message_received_trigger( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + post_treat: defer.Deferred, + ) -> bool: + """add the gateway workflow on post treatment""" + if client != self.client: + return True + post_treat.addCallback( + lambda mess_data: defer.ensureDeferred( + self.on_message(client, mess_data, message_elt) + ) + ) + return True + + async def on_message( + self, client: SatXMPPEntity, mess_data: MessageData, message_elt: domish.Element + ) -> dict: + """Called once message has been parsed + + @param client: Client session. + @param mess_data: Message data. + @return: Message data. + """ + if client != self.client: + return mess_data + from_jid = mess_data["from"].userhostJID() + if mess_data["type"] not in ("chat", "normal"): + log.warning(f"ignoring message with unexpected type: {mess_data}") + return mess_data + if not client.is_local(from_jid): + log.warning(f"ignoring non local message: {mess_data}") + return mess_data + if not mess_data["to"].user: + log.warning(f"ignoring message addressed to gateway itself: {mess_data}") + return mess_data + + try: + to_email = self._e.unescape(mess_data["to"].user) + except ValueError: + raise exceptions.DataError( + f'Invalid "to" JID, can\'t send message: {message_elt.toXml()}.' + ) + + try: + body_lang, body = next(iter(mess_data["message"].items())) + except (KeyError, StopIteration): + log.warning(f"No body found: {mess_data}") + body_lang, body = "", "" + try: + subject_lang, subject = next(iter(mess_data["subject"].items())) + except (KeyError, StopIteration): + subject_lang, subject = "", None + + if not body and not subject: + log.warning(f"Ignoring empty message: {mess_data}") + return mess_data + + try: + await self.send_email( + from_jid=from_jid, + to_email=to_email, + body=body, + subject=subject, + ) + except exceptions.UnknownEntityError: + log.warning(f"Can't send message, user {from_jid} is not registered.") + message_error_elt = StanzaError( + "subscription-required", + text="User need to register to the gateway before sending emails.", + ).toResponse(message_elt) + await client.a_send(message_error_elt) + + raise exceptions.CancelError("User not registered.") + + return mess_data + + async def send_email( + self, + from_jid: jid.JID, + to_email: str, + body: str, + subject: str | None, + ) -> None: + """Send an email using sender credentials. + + Credentials will be retrieve from cache, or database. + + @param from_jid: Bare JID of the sender. + @param to_email: Email address of the destinee. + @param body: Body of the email. + @param subject: Subject of the email. + + @raise exceptions.UnknownEntityError: Credentials for "from_jid" can't be found. + """ + # We need a bare jid. + assert self.storage is not None + assert not from_jid.resource + try: + user_data = self.users_data[from_jid] + except KeyError: + key = KEY_CREDENTIALS.format(from_jid=from_jid) + credentials = await self.storage.get(key) + if credentials is None: + raise exceptions.UnknownEntityError( + f"No credentials found for {from_jid}." + ) + self.users_data[from_jid] = UserData(credentials) + else: + credentials = user_data.credentials + + msg = MIMEText(body, "plain", "UTF-8") + if subject is not None: + msg["Subject"] = subject + msg["From"] = formataddr( + (credentials["user_name"] or None, credentials["user_email"]) + ) + msg["To"] = to_email + + sender_domain = credentials["user_email"].split("@", 1)[-1] + + await smtp.sendmail( + credentials["smtp_host"].encode(), + credentials["user_email"].encode(), + [to_email.encode()], + msg.as_bytes(), + senderDomainName=sender_domain, + port=int(credentials["smtp_port"]), + username=credentials["smtp_username"].encode(), + password=credentials["smtp_password"].encode(), + requireAuthentication=True, + # TODO: only STARTTLS is supported right now, implicit TLS should be supported + # too. + requireTransportSecurity=True, + ) + + async def _on_registration_form( + self, client: SatXMPPEntity, iq_elt: domish.Element + ) -> tuple[bool, data_form.Form] | None: + if client != self.client: + return + assert self.storage is not None + from_jid = jid.JID(iq_elt["from"]) + key = KEY_CREDENTIALS.format(from_jid=from_jid.userhost()) + credentials = await self.storage.get(key) or {} + + form = data_form.Form(formType="form", title="IMAP/SMTP Credentials") + + # Add instructions + form.instructions = [ + D_( + "Please provide your IMAP and SMTP credentials to configure the " + "connection." + ) + ] + + # Add identity fields + form.addField( + data_form.Field( + fieldType="text-single", + var="user_name", + label="User Name", + desc=D_('The display name to use in the "From" field of sent emails.'), + value=credentials.get("user_name"), + required=True, + ) + ) + + form.addField( + data_form.Field( + fieldType="text-single", + var="user_email", + label="User Email", + desc=D_('The email address to use in the "From" field of sent emails.'), + value=credentials.get("user_email"), + required=True, + ) + ) + + # Add fields for IMAP credentials + form.addField( + data_form.Field( + fieldType="text-single", + var="imap_host", + label="IMAP Host", + desc=D_("IMAP server hostname or IP address"), + value=credentials.get("imap_host"), + required=True, + ) + ) + form.addField( + data_form.Field( + fieldType="text-single", + var="imap_port", + label="IMAP Port", + desc=D_("IMAP server port (default: 993)"), + value=credentials.get("imap_port", "993"), + ) + ) + form.addField( + data_form.Field( + fieldType="text-single", + var="imap_username", + label="IMAP Username", + desc=D_("Username for IMAP authentication"), + value=credentials.get("imap_username"), + required=True, + ) + ) + form.addField( + data_form.Field( + fieldType="text-private", + var="imap_password", + label="IMAP Password", + desc=D_("Password for IMAP authentication"), + value=credentials.get("imap_password"), + required=True, + ) + ) + + # Add fields for SMTP credentials + form.addField( + data_form.Field( + fieldType="text-single", + var="smtp_host", + label="SMTP Host", + desc=D_("SMTP server hostname or IP address"), + value=credentials.get("smtp_host"), + required=True, + ) + ) + form.addField( + data_form.Field( + fieldType="text-single", + var="smtp_port", + label="SMTP Port", + desc=D_("SMTP server port (default: 587)"), + value=credentials.get("smtp_port", "587"), + ) + ) + form.addField( + data_form.Field( + fieldType="text-single", + var="smtp_username", + label="SMTP Username", + desc=D_("Username for SMTP authentication"), + value=credentials.get("smtp_username"), + required=True, + ) + ) + form.addField( + data_form.Field( + fieldType="text-private", + var="smtp_password", + label="SMTP Password", + desc=D_("Password for SMTP authentication"), + value=credentials.get("smtp_password"), + required=True, + ) + ) + + return bool(credentials), form + + def validate_field( + self, + form: data_form.Form, + key: str, + field_type: str, + min_value: int | None = None, + max_value: int | None = None, + default: str | int | None = None, + ) -> None: + """Validate a single field. + + @param form: The form containing the fields. + @param key: The key of the field to validate. + @param field_type: The expected type of the field value. + @param min_value: Optional minimum value for integer fields. + @param max_value: Optional maximum value for integer fields. + @param default: Default value to use if the field is missing. + @raise StanzaError: If the field value is invalid or missing. + """ + field = form.fields.get(key) + if field is None: + if default is None: + raise StanzaError("bad-request", text=f"{key} is required") + field = data_form.Field(var=key, value=str(default)) + form.addField(field) + + value = field.value + if field_type == "int": + try: + value = int(value) + if (min_value is not None and value < min_value) or ( + max_value is not None and value > max_value + ): + raise ValueError + except (ValueError, TypeError): + raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}") + elif field_type == "str": + if not isinstance(value, str): + raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}") + + # Basic email validation for user_email field + if key == "user_email": + # XXX: This is a minimal check. A complete email validation is notoriously + # difficult. + if not email_pattern.match(value): + raise StanzaError( + "bad-request", text=f"Invalid email address: {value}" + ) + + def validate_imap_smtp_form(self, submit_form: data_form.Form) -> None: + """Validate the submitted IMAP/SMTP credentials form. + + @param submit_form: The submitted form containing IMAP/SMTP credentials. + @raise StanzaError: If any of the values are invalid. + """ + # Validate identity fields + self.validate_field(submit_form, "user_name", "str") + self.validate_field(submit_form, "user_email", "str") + + # Validate IMAP fields + self.validate_field(submit_form, "imap_host", "str") + self.validate_field( + submit_form, "imap_port", "int", min_value=1, max_value=65535, default=993 + ) + self.validate_field(submit_form, "imap_username", "str") + self.validate_field(submit_form, "imap_password", "str") + + # Validate SMTP fields + self.validate_field(submit_form, "smtp_host", "str") + self.validate_field( + submit_form, "smtp_port", "int", min_value=1, max_value=65535, default=587 + ) + self.validate_field(submit_form, "smtp_username", "str") + self.validate_field(submit_form, "smtp_password", "str") + + async def on_new_email(self, to_jid: jid.JID, email: EmailMessage) -> None: + """Called when a new message has been received. + + @param to_jid: JID of the recipient. + @param email: Parsed email. + """ + assert self.client is not None + name, email_addr = parseaddr(email["from"]) + email_addr = email_addr.lower() + from_jid = jid.JID(None, (self._e.escape(email_addr), self.client.jid.host, None)) + + # Get the email body + body_mime = email.get_body(("plain",)) + if body_mime is not None: + charset = body_mime.get_content_charset() or "utf-8" + body = body_mime.get_payload(decode=True).decode(charset, errors="replace") + else: + log.warning(f"No body found in email:\n{email}") + body = "" + + # Decode the subject + subject = email.get("subject") + if subject: + decoded_subject = decode_header(subject) + subject = "".join( + [ + part.decode(encoding or "utf-8") if isinstance(part, bytes) else part + for part, encoding in decoded_subject + ] + ).strip() + else: + subject = None + + client = self.client.get_virtual_client(from_jid) + await client.sendMessage(to_jid, {"": body}, {"": subject} if subject else None) + + async def connect_imap(self, from_jid: jid.JID, user_data: UserData) -> None: + """Connect to IMAP service. + + [self.on_new_email] will be used as callback on new messages. + + @param from_jid: JID of the user associated with given credentials. + @param credentials: Email credentials. + """ + credentials = user_data.credentials + + connected = defer.Deferred() + factory = IMAPClientFactory( + user_data, + partial(self.on_new_email, from_jid.userhostJID()), + connected, + ) + reactor.connectTCP( + credentials["imap_host"], int(credentials["imap_port"]), factory + ) + await connected + + async def _on_registration_submit( + self, + client: SatXMPPEntity, + iq_elt: domish.Element, + submit_form: data_form.Form | None, + ) -> bool | None: + """Handle registration submit request. + + Submit form is validated, and credentials are stored. + @param client: client session. + iq_elt: IQ stanza of the submission request. + submit_form: submit form. + @return: True if successful. + None if the callback is not relevant for this request. + """ + if client != self.client: + return + assert self.storage is not None + from_jid = jid.JID(iq_elt["from"]).userhostJID() + + if submit_form is None: + # This is an unregistration request. + try: + user_data = self.users_data[from_jid] + except KeyError: + pass + else: + if user_data.imap_client is not None: + try: + await user_data.imap_client.logout() + except Exception: + log.exception(f"Can't log out {from_jid} from IMAP server.") + key = KEY_CREDENTIALS.format(from_jid=from_jid) + await self.storage.adel(key) + log.info(f"{from_jid} unregistered from this gateway.") + return True + + self.validate_imap_smtp_form(submit_form) + credentials = {key: field.value for key, field in submit_form.fields.items()} + user_data = self.users_data.get(from_jid) + if user_data is None: + # The user is not in cache, we cache current credentials. + user_data = self.users_data[from_jid] = UserData(credentials=credentials) + else: + # The user is known, we update credentials. + user_data.credentials = credentials + key = KEY_CREDENTIALS.format(from_jid=from_jid) + try: + await self.connect_imap(from_jid, user_data) + except Exception as e: + log.warning(f"Can't connect to IMAP server for {from_jid}") + credentials["imap_success"] = False + await self.storage.aset(key, credentials) + raise e + else: + log.debug(f"Connection successful to IMAP server for {from_jid}") + credentials["imap_success"] = True + await self.storage.aset(key, credentials) + return True + + +@implementer(iwokkel.IDisco) +class EmailGatewayHandler(XMPPHandler): + + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []