Mercurial > libervia-backend
view libervia/backend/plugins/plugin_comp_email_gateway/__init__.py @ 4308:472a938a46e3
cli (message/send): add arguments for message addressing:
commands have been added to add `to`, `cc` and `bcc` metadata for sending copy of the
message to several entities. Metadata such as `reply-to` `reply-room` and `no-reply` are
also supported.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | a7ec325246fb |
children | b56b1eae7994 |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia Email Gateway Component # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from dataclasses import dataclass from email.header import decode_header from email.message import EmailMessage from email.mime.text import MIMEText from email.utils import formataddr, parseaddr from functools import partial import re from typing import Any, cast from twisted.internet import defer, reactor from twisted.mail import smtp from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber.error import StanzaError from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import data_form, disco, iwokkel from zope.interface import implementer from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import D_, _ from libervia.backend.core.log import getLogger from libervia.backend.memory.persistent import LazyPersistentBinaryDict from libervia.backend.memory.sqla import select from libervia.backend.memory.sqla_mapping import PrivateIndBin from libervia.backend.models.core import MessageData from libervia.backend.plugins.plugin_xep_0077 import XEP_0077 from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 from libervia.backend.tools.utils import aio from .models import Credentials, UserData from .imap import IMAPClientFactory log = getLogger(__name__) IMPORT_NAME = "email-gateway" NAME = "Libervia Email Gateway" PLUGIN_INFO = { C.PI_NAME: "Email Gateway Component", C.PI_IMPORT_NAME: IMPORT_NAME, C.PI_MODES: [C.PLUG_MODE_COMPONENT], C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: ["XEP-0077", "XEP-0106"], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "EmailGatewayComponent", C.PI_HANDLER: C.BOOL_TRUE, C.PI_DESCRIPTION: D_( "Gateway to handle email. Usual emails are handled as message, while mailing " "lists are converted to pubsub blogs." ), } CONF_SECTION = f"component {IMPORT_NAME}" PREFIX_KEY_CREDENTIALS = "CREDENTIALS_" KEY_CREDENTIALS = f"{PREFIX_KEY_CREDENTIALS}{{from_jid}}" email_pattern = re.compile(r"[^@]+@[^@]+\.[^@]+") class EmailGatewayComponent: IMPORT_NAME = IMPORT_NAME verbose = 0 def __init__(self, host): self.host = host self.client: SatXMPPEntity | None = None self.initalized = False self.storage: LazyPersistentBinaryDict | None = None self._iq_register = cast(XEP_0077, host.plugins["XEP-0077"]) self._iq_register.register_handler( self._on_registration_form, self._on_registration_submit ) self._e = cast(XEP_0106, host.plugins["XEP-0106"]) # TODO: For the moment, all credentials are kept in cache; we should only keep the # X latest. self.users_data: dict[jid.JID, UserData] = {} host.trigger.add_with_check( "message_received", self, self._message_received_trigger, priority=-1000 ) async def _init(self) -> None: """Initialisation done after profile is connected""" assert self.client is not None self.client.identities.append(disco.DiscoIdentity("gateway", "smtp", NAME)) self.storage = LazyPersistentBinaryDict(IMPORT_NAME, self.client.profile) await self.connect_registered_users() @aio async def get_registered_users(self) -> dict[jid.JID, Credentials]: """Retrieve credentials for all registered users @return: a mapping from user JID to credentials data. """ assert self.client is not None profile_id = self.host.memory.storage.profiles[self.client.profile] async with self.host.memory.storage.session() as session: query = select(PrivateIndBin).where( PrivateIndBin.profile_id == profile_id, PrivateIndBin.namespace == IMPORT_NAME, PrivateIndBin.key.startswith(PREFIX_KEY_CREDENTIALS), ) result = await session.execute(query) return { jid.JID(p.key[len(PREFIX_KEY_CREDENTIALS) :]): p.value for p in result.scalars() } async def connect_registered_users(self) -> None: """Connected users already registered to the gateway.""" registered_data = await self.get_registered_users() for user_jid, credentials in registered_data.items(): user_data = self.users_data[user_jid] = UserData(credentials=credentials) if not credentials["imap_success"]: log.warning( f"Ignoring unsuccessful IMAP credentials of {user_jid}. This user " "won't receive message from this gateway." ) else: try: await self.connect_imap(user_jid, user_data) except Exception as e: log.warning(f"Can't connect {user_jid} to IMAP: {e}.") else: log.debug(f"Connection to IMAP server successful for {user_jid}.") def get_handler(self, __) -> XMPPHandler: return EmailGatewayHandler() async def profile_connecting(self, client: SatXMPPEntity) -> None: self.client = client if not self.initalized: await self._init() self.initalized = True def _message_received_trigger( self, client: SatXMPPEntity, message_elt: domish.Element, post_treat: defer.Deferred, ) -> bool: """add the gateway workflow on post treatment""" if client != self.client: return True post_treat.addCallback( lambda mess_data: defer.ensureDeferred( self.on_message(client, mess_data, message_elt) ) ) return True async def on_message( self, client: SatXMPPEntity, mess_data: MessageData, message_elt: domish.Element ) -> dict: """Called once message has been parsed @param client: Client session. @param mess_data: Message data. @return: Message data. """ if client != self.client: return mess_data from_jid = mess_data["from"].userhostJID() if mess_data["type"] not in ("chat", "normal"): log.warning(f"ignoring message with unexpected type: {mess_data}") return mess_data if not client.is_local(from_jid): log.warning(f"ignoring non local message: {mess_data}") return mess_data if not mess_data["to"].user: log.warning(f"ignoring message addressed to gateway itself: {mess_data}") return mess_data try: to_email = self._e.unescape(mess_data["to"].user) except ValueError: raise exceptions.DataError( f'Invalid "to" JID, can\'t send message: {message_elt.toXml()}.' ) try: body_lang, body = next(iter(mess_data["message"].items())) except (KeyError, StopIteration): log.warning(f"No body found: {mess_data}") body_lang, body = "", "" try: subject_lang, subject = next(iter(mess_data["subject"].items())) except (KeyError, StopIteration): subject_lang, subject = "", None if not body and not subject: log.warning(f"Ignoring empty message: {mess_data}") return mess_data try: await self.send_email( from_jid=from_jid, to_email=to_email, body=body, subject=subject, ) except exceptions.UnknownEntityError: log.warning(f"Can't send message, user {from_jid} is not registered.") message_error_elt = StanzaError( "subscription-required", text="User need to register to the gateway before sending emails.", ).toResponse(message_elt) await client.a_send(message_error_elt) raise exceptions.CancelError("User not registered.") return mess_data async def send_email( self, from_jid: jid.JID, to_email: str, body: str, subject: str | None, ) -> None: """Send an email using sender credentials. Credentials will be retrieve from cache, or database. @param from_jid: Bare JID of the sender. @param to_email: Email address of the destinee. @param body: Body of the email. @param subject: Subject of the email. @raise exceptions.UnknownEntityError: Credentials for "from_jid" can't be found. """ # We need a bare jid. assert self.storage is not None assert not from_jid.resource try: user_data = self.users_data[from_jid] except KeyError: key = KEY_CREDENTIALS.format(from_jid=from_jid) credentials = await self.storage.get(key) if credentials is None: raise exceptions.UnknownEntityError( f"No credentials found for {from_jid}." ) self.users_data[from_jid] = UserData(credentials) else: credentials = user_data.credentials msg = MIMEText(body, "plain", "UTF-8") if subject is not None: msg["Subject"] = subject msg["From"] = formataddr( (credentials["user_name"] or None, credentials["user_email"]) ) msg["To"] = to_email sender_domain = credentials["user_email"].split("@", 1)[-1] await smtp.sendmail( credentials["smtp_host"].encode(), credentials["user_email"].encode(), [to_email.encode()], msg.as_bytes(), senderDomainName=sender_domain, port=int(credentials["smtp_port"]), username=credentials["smtp_username"].encode(), password=credentials["smtp_password"].encode(), requireAuthentication=True, # TODO: only STARTTLS is supported right now, implicit TLS should be supported # too. requireTransportSecurity=True, ) async def _on_registration_form( self, client: SatXMPPEntity, iq_elt: domish.Element ) -> tuple[bool, data_form.Form] | None: if client != self.client: return assert self.storage is not None from_jid = jid.JID(iq_elt["from"]) key = KEY_CREDENTIALS.format(from_jid=from_jid.userhost()) credentials = await self.storage.get(key) or {} form = data_form.Form(formType="form", title="IMAP/SMTP Credentials") # Add instructions form.instructions = [ D_( "Please provide your IMAP and SMTP credentials to configure the " "connection." ) ] # Add identity fields form.addField( data_form.Field( fieldType="text-single", var="user_name", label="User Name", desc=D_('The display name to use in the "From" field of sent emails.'), value=credentials.get("user_name"), required=True, ) ) form.addField( data_form.Field( fieldType="text-single", var="user_email", label="User Email", desc=D_('The email address to use in the "From" field of sent emails.'), value=credentials.get("user_email"), required=True, ) ) # Add fields for IMAP credentials form.addField( data_form.Field( fieldType="text-single", var="imap_host", label="IMAP Host", desc=D_("IMAP server hostname or IP address"), value=credentials.get("imap_host"), required=True, ) ) form.addField( data_form.Field( fieldType="text-single", var="imap_port", label="IMAP Port", desc=D_("IMAP server port (default: 993)"), value=credentials.get("imap_port", "993"), ) ) form.addField( data_form.Field( fieldType="text-single", var="imap_username", label="IMAP Username", desc=D_("Username for IMAP authentication"), value=credentials.get("imap_username"), required=True, ) ) form.addField( data_form.Field( fieldType="text-private", var="imap_password", label="IMAP Password", desc=D_("Password for IMAP authentication"), value=credentials.get("imap_password"), required=True, ) ) # Add fields for SMTP credentials form.addField( data_form.Field( fieldType="text-single", var="smtp_host", label="SMTP Host", desc=D_("SMTP server hostname or IP address"), value=credentials.get("smtp_host"), required=True, ) ) form.addField( data_form.Field( fieldType="text-single", var="smtp_port", label="SMTP Port", desc=D_("SMTP server port (default: 587)"), value=credentials.get("smtp_port", "587"), ) ) form.addField( data_form.Field( fieldType="text-single", var="smtp_username", label="SMTP Username", desc=D_("Username for SMTP authentication"), value=credentials.get("smtp_username"), required=True, ) ) form.addField( data_form.Field( fieldType="text-private", var="smtp_password", label="SMTP Password", desc=D_("Password for SMTP authentication"), value=credentials.get("smtp_password"), required=True, ) ) return bool(credentials), form def validate_field( self, form: data_form.Form, key: str, field_type: str, min_value: int | None = None, max_value: int | None = None, default: str | int | None = None, ) -> None: """Validate a single field. @param form: The form containing the fields. @param key: The key of the field to validate. @param field_type: The expected type of the field value. @param min_value: Optional minimum value for integer fields. @param max_value: Optional maximum value for integer fields. @param default: Default value to use if the field is missing. @raise StanzaError: If the field value is invalid or missing. """ field = form.fields.get(key) if field is None: if default is None: raise StanzaError("bad-request", text=f"{key} is required") field = data_form.Field(var=key, value=str(default)) form.addField(field) value = field.value if field_type == "int": try: value = int(value) if (min_value is not None and value < min_value) or ( max_value is not None and value > max_value ): raise ValueError except (ValueError, TypeError): raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}") elif field_type == "str": if not isinstance(value, str): raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}") # Basic email validation for user_email field if key == "user_email": # XXX: This is a minimal check. A complete email validation is notoriously # difficult. if not email_pattern.match(value): raise StanzaError( "bad-request", text=f"Invalid email address: {value}" ) def validate_imap_smtp_form(self, submit_form: data_form.Form) -> None: """Validate the submitted IMAP/SMTP credentials form. @param submit_form: The submitted form containing IMAP/SMTP credentials. @raise StanzaError: If any of the values are invalid. """ # Validate identity fields self.validate_field(submit_form, "user_name", "str") self.validate_field(submit_form, "user_email", "str") # Validate IMAP fields self.validate_field(submit_form, "imap_host", "str") self.validate_field( submit_form, "imap_port", "int", min_value=1, max_value=65535, default=993 ) self.validate_field(submit_form, "imap_username", "str") self.validate_field(submit_form, "imap_password", "str") # Validate SMTP fields self.validate_field(submit_form, "smtp_host", "str") self.validate_field( submit_form, "smtp_port", "int", min_value=1, max_value=65535, default=587 ) self.validate_field(submit_form, "smtp_username", "str") self.validate_field(submit_form, "smtp_password", "str") async def on_new_email(self, to_jid: jid.JID, email: EmailMessage) -> None: """Called when a new message has been received. @param to_jid: JID of the recipient. @param email: Parsed email. """ assert self.client is not None name, email_addr = parseaddr(email["from"]) email_addr = email_addr.lower() from_jid = jid.JID(None, (self._e.escape(email_addr), self.client.jid.host, None)) # Get the email body body_mime = email.get_body(("plain",)) if body_mime is not None: charset = body_mime.get_content_charset() or "utf-8" body = body_mime.get_payload(decode=True).decode(charset, errors="replace") else: log.warning(f"No body found in email:\n{email}") body = "" # Decode the subject subject = email.get("subject") if subject: decoded_subject = decode_header(subject) subject = "".join( [ part.decode(encoding or "utf-8") if isinstance(part, bytes) else part for part, encoding in decoded_subject ] ).strip() else: subject = None client = self.client.get_virtual_client(from_jid) await client.sendMessage(to_jid, {"": body}, {"": subject} if subject else None) async def connect_imap(self, from_jid: jid.JID, user_data: UserData) -> None: """Connect to IMAP service. [self.on_new_email] will be used as callback on new messages. @param from_jid: JID of the user associated with given credentials. @param credentials: Email credentials. """ credentials = user_data.credentials connected = defer.Deferred() factory = IMAPClientFactory( user_data, partial(self.on_new_email, from_jid.userhostJID()), connected, ) reactor.connectTCP( credentials["imap_host"], int(credentials["imap_port"]), factory ) await connected async def _on_registration_submit( self, client: SatXMPPEntity, iq_elt: domish.Element, submit_form: data_form.Form | None, ) -> bool | None: """Handle registration submit request. Submit form is validated, and credentials are stored. @param client: client session. iq_elt: IQ stanza of the submission request. submit_form: submit form. @return: True if successful. None if the callback is not relevant for this request. """ if client != self.client: return assert self.storage is not None from_jid = jid.JID(iq_elt["from"]).userhostJID() if submit_form is None: # This is an unregistration request. try: user_data = self.users_data[from_jid] except KeyError: pass else: if user_data.imap_client is not None: try: await user_data.imap_client.logout() except Exception: log.exception(f"Can't log out {from_jid} from IMAP server.") key = KEY_CREDENTIALS.format(from_jid=from_jid) await self.storage.adel(key) log.info(f"{from_jid} unregistered from this gateway.") return True self.validate_imap_smtp_form(submit_form) credentials = {key: field.value for key, field in submit_form.fields.items()} user_data = self.users_data.get(from_jid) if user_data is None: # The user is not in cache, we cache current credentials. user_data = self.users_data[from_jid] = UserData(credentials=credentials) else: # The user is known, we update credentials. user_data.credentials = credentials key = KEY_CREDENTIALS.format(from_jid=from_jid) try: await self.connect_imap(from_jid, user_data) except Exception as e: log.warning(f"Can't connect to IMAP server for {from_jid}") credentials["imap_success"] = False await self.storage.aset(key, credentials) raise e else: log.debug(f"Connection successful to IMAP server for {from_jid}") credentials["imap_success"] = True await self.storage.aset(key, credentials) return True @implementer(iwokkel.IDisco) class EmailGatewayHandler(XMPPHandler): def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []