Mercurial > libervia-backend
view libervia/backend/plugins/plugin_comp_email_gateway/__init__.py @ 4318:27bb22eace65
tests (unit/email gateway): add test for XEP-0131 handling:
rel 451
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 28 Sep 2024 15:59:48 +0200 |
parents | 055930cc81f9 |
children | 95792a1f26c7 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia Email Gateway Component
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from email.header import decode_header
from email.message import EmailMessage
from email.mime.text import MIMEText
from email.utils import formataddr, getaddresses, parseaddr
from functools import partial
import re
from typing import cast

from pydantic import BaseModel
from twisted.internet import defer, reactor
from twisted.mail import smtp
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber import error as jabber_error
from twisted.words.protocols.jabber.error import StanzaError
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import data_form, disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.memory.persistent import LazyPersistentBinaryDict
from libervia.backend.memory.sqla import select
from libervia.backend.memory.sqla_mapping import PrivateIndBin
from libervia.backend.models.core import MessageData
from libervia.backend.plugins.plugin_xep_0033 import (
    RECIPIENT_FIELDS,
    AddressType,
    AddressesData,
)
from libervia.backend.plugins.plugin_xep_0077 import XEP_0077
from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
from libervia.backend.plugins.plugin_xep_0131 import XEP_0131, HeadersData, Urgency
from libervia.backend.tools.utils import aio

from .models import Credentials, UserData
from .imap import IMAPClientFactory


log = getLogger(__name__)

IMPORT_NAME = "email-gateway"
NAME = "Libervia Email Gateway"

PLUGIN_INFO = {
    C.PI_NAME: "Email Gateway Component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    # "XEP-0131" is listed because the component reads host.plugins["XEP-0131"] in
    # its constructor.
    C.PI_DEPENDENCIES: ["XEP-0033", "XEP-0077", "XEP-0106", "XEP-0131"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "EmailGatewayComponent",
    C.PI_HANDLER: C.BOOL_TRUE,
    C.PI_DESCRIPTION: D_(
        "Gateway to handle email. Usual emails are handled as message, while mailing "
        "lists are converted to pubsub blogs."
    ),
}

CONF_SECTION = f"component {IMPORT_NAME}"
PREFIX_KEY_CREDENTIALS = "CREDENTIALS_"
KEY_CREDENTIALS = f"{PREFIX_KEY_CREDENTIALS}{{from_jid}}"

email_pattern = re.compile(r"[^@]+@[^@]+\.[^@]+")


class SendMailExtra(BaseModel):
    """Optional metadata accepted by [EmailGatewayComponent.send_email]."""

    # Multicast addresses (XEP-0033 data), if any.
    addresses: AddressesData | None = None
    # Extra headers (XEP-0131 data), if any.
    headers: HeadersData | None = None


class EmailGatewayComponent:
    """Component bridging XMPP messages and emails (SMTP for sending, IMAP for
    receiving)."""

    IMPORT_NAME = IMPORT_NAME
    verbose = 0

    def __init__(self, host):
        """Register registration handlers and the "message_received" trigger.

        @param host: Backend host, giving access to plugins, triggers and memory.
        """
        self.host = host
        self.client: SatXMPPEntity | None = None
        self.initalized = False
        self.storage: LazyPersistentBinaryDict | None = None
        self._iq_register = cast(XEP_0077, host.plugins["XEP-0077"])
        self._iq_register.register_handler(
            self._on_registration_form, self._on_registration_submit
        )
        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
        self._shim = cast(XEP_0131, host.plugins["XEP-0131"])
        # TODO: For the moment, all credentials are kept in cache; we should only keep the
        #   X latest.
        self.users_data: dict[jid.JID, UserData] = {}
        host.trigger.add_with_check(
            "message_received", self, self._message_received_trigger, priority=-1000
        )

    async def _init(self) -> None:
        """Initialisation done after profile is connected"""
        assert self.client is not None
        self.client.identities.append(disco.DiscoIdentity("gateway", "smtp", NAME))
        self.storage = LazyPersistentBinaryDict(IMPORT_NAME, self.client.profile)
        await self.connect_registered_users()

    @aio
    async def get_registered_users(self) -> dict[jid.JID, Credentials]:
        """Retrieve credentials for all registered users

        @return: a mapping from user JID to credentials data.
        """
        assert self.client is not None
        profile_id = self.host.memory.storage.profiles[self.client.profile]
        async with self.host.memory.storage.session() as session:
            query = select(PrivateIndBin).where(
                PrivateIndBin.profile_id == profile_id,
                PrivateIndBin.namespace == IMPORT_NAME,
                PrivateIndBin.key.startswith(PREFIX_KEY_CREDENTIALS),
            )
            result = await session.execute(query)
            # The user JID is what follows the credentials prefix in the storage key.
            return {
                jid.JID(p.key[len(PREFIX_KEY_CREDENTIALS) :]): p.value
                for p in result.scalars()
            }

    async def connect_registered_users(self) -> None:
        """Connect users already registered to the gateway.

        Credentials of every registered user are cached. Users whose credentials are
        flagged as unsuccessful for IMAP are not connected. Connection failures are
        logged and don't stop the processing of other users.
        """
        registered_data = await self.get_registered_users()
        for user_jid, credentials in registered_data.items():
            user_data = self.users_data[user_jid] = UserData(credentials=credentials)
            if not credentials["imap_success"]:
                log.warning(
                    f"Ignoring unsuccessful IMAP credentials of {user_jid}. This user "
                    "won't receive message from this gateway."
                )
            else:
                try:
                    await self.connect_imap(user_jid, user_data)
                except Exception as e:
                    log.warning(f"Can't connect {user_jid} to IMAP: {e}.")
                else:
                    log.debug(f"Connection to IMAP server successful for {user_jid}.")

    def get_handler(self, __) -> XMPPHandler:
        """Return the XMPP handler of this component (argument is unused)."""
        return EmailGatewayHandler()

    async def profile_connecting(self, client: SatXMPPEntity) -> None:
        """Store the client session and run one-time initialisation.

        @param client: Client session of the component.
        """
        self.client = client
        if not self.initalized:
            await self._init()
            self.initalized = True

    def _message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> bool:
        """add the gateway workflow on post treatment

        @param client: Client session which received the message.
        @param message_elt: Received <message> element.
        @param post_treat: Deferred fired with parsed message data.
        @return: Always True.
        """
        if client != self.client:
            return True
        post_treat.addCallback(
            lambda mess_data: defer.ensureDeferred(
                self.on_message(client, mess_data, message_elt)
            )
        )
        return True

    async def on_message(
        self, client: SatXMPPEntity, mess_data: MessageData, message_elt: domish.Element
    ) -> dict:
        """Called once message has been parsed

        The message is converted to an email and sent with sender credentials.

        @param client: Client session.
        @param mess_data: Message data.
        @param message_elt: Original <message> element, used to build error responses.
        @return: Message data.
        @raise exceptions.DataError: The user part of the "to" JID can't be unescaped.
        @raise exceptions.CancelError: The email could not be sent (an error stanza is
            sent back to the sender first).
        """
        if client != self.client:
            return mess_data
        from_jid = mess_data["from"].userhostJID()
        extra_kw = {}
        if mess_data["type"] not in ("chat", "normal"):
            log.warning(f"ignoring message with unexpected type: {mess_data}")
            return mess_data
        if not client.is_local(from_jid):
            log.warning(f"ignoring non local message: {mess_data}")
            return mess_data
        if not mess_data["to"].user:
            # Without user part, the message is only valid if recipients are specified
            # with addresses metadata.
            addresses = mess_data["extra"].get("addresses")
            if not addresses:
                log.warning(f"ignoring message addressed to gateway itself: {mess_data}")
                return mess_data
            else:
                to_email = None
                extra_kw["addresses"] = addresses
        else:
            try:
                to_email = self._e.unescape(mess_data["to"].user)
            except ValueError:
                raise exceptions.DataError(
                    f'Invalid "to" JID, can\'t send message: {message_elt.toXml()}.'
                )

        self._shim.move_keywords_to_headers(mess_data["extra"])
        headers = mess_data["extra"].get("headers")
        if headers:
            extra_kw["headers"] = headers

        # Only the first body and the first subject are used.
        try:
            body = next(iter(mess_data["message"].values()))
        except (KeyError, StopIteration):
            log.warning(f"No body found: {mess_data}")
            body = ""
        try:
            subject = next(iter(mess_data["subject"].values()))
        except (KeyError, StopIteration):
            subject = None

        if not body and not subject:
            log.warning(f"Ignoring empty message: {mess_data}")
            return mess_data

        try:
            await self.send_email(
                from_jid=from_jid,
                to_email=to_email,
                body=body,
                subject=subject,
                extra=SendMailExtra(**extra_kw) if extra_kw else None,
            )
        except exceptions.UnknownEntityError:
            log.warning(f"Can't send message, user {from_jid} is not registered.")
            message_error_elt = StanzaError(
                "subscription-required",
                text="User need to register to the gateway before sending emails.",
            ).toResponse(message_elt)
            await client.a_send(message_error_elt)
            raise exceptions.CancelError("User not registered.")
        except StanzaError as e:
            log.warning(f"Can't send message: {e}")
            message_error_elt = e.toResponse(message_elt)
            await client.a_send(message_error_elt)
            raise exceptions.CancelError(f"Can't send message: {e}")

        return mess_data

    def jid_to_email(
        self, client: SatXMPPEntity, address_jid: jid.JID, credentials: dict[str, str]
    ) -> str:
        """Convert a JID to an email address.

        If JID is from the gateway, email address will be extracted. Otherwise, the
        gateway email will be used, with XMPP address specified in name part.

        @param client: Client session.
        @param address_jid: JID of the recipient.
        @param credentials: Sender credentials.
        @return: Email address.
        """
        if address_jid and address_jid.host.endswith(str(client.jid)):
            return self._e.unescape(address_jid.user)
        else:
            email_address = credentials["user_email"]
            if address_jid:
                email_address = formataddr((f"xmpp:{address_jid}", email_address))
            return email_address

    async def send_email(
        self,
        from_jid: jid.JID,
        to_email: str | None,
        body: str,
        subject: str | None,
        extra: SendMailExtra | None = None,
    ) -> None:
        """Send an email using sender credentials.

        Credentials will be retrieve from cache, or database.

        @param from_jid: Bare JID of the sender.
        @param to_email: Email address of the destinee.
        @param body: Body of the email.
        @param subject: Subject of the email.
        @param extra: Extra data.

        @raise exceptions.UnknownEntityError: Credentials for "from_jid" can't be found.
        @raise exceptions.InternalError: Neither "to_email" nor a "to" address is set.
        @raise StanzaError: An undelivered address is not a JID of this gateway.
        """
        assert self.client is not None
        if extra is None:
            extra = SendMailExtra()
        if to_email is None and (extra.addresses is None or not extra.addresses.to):
            raise exceptions.InternalError(
                '"to_email" can\'t be None if there is no "to" address!'
            )

        # We need a bare jid.
        assert self.storage is not None
        assert not from_jid.resource
        try:
            user_data = self.users_data[from_jid]
        except KeyError:
            key = KEY_CREDENTIALS.format(from_jid=from_jid)
            credentials = await self.storage.get(key)
            if credentials is None:
                raise exceptions.UnknownEntityError(
                    f"No credentials found for {from_jid}."
                )
            self.users_data[from_jid] = UserData(credentials=credentials)
        else:
            credentials = user_data.credentials

        msg = MIMEText(body, "plain", "UTF-8")
        if subject is not None:
            msg["Subject"] = subject
        msg["From"] = formataddr(
            (credentials["user_name"] or None, credentials["user_email"])
        )
        if extra.addresses:
            assert extra.addresses.to
            main_to_address = extra.addresses.to[0]
            assert main_to_address.jid
            to_email = self.jid_to_email(self.client, main_to_address.jid, credentials)
            for field in RECIPIENT_FIELDS:
                addresses = getattr(extra.addresses, field)
                if not addresses:
                    continue
                for address in addresses:
                    if not address.delivered and (
                        address.jid is None or address.jid.host != str(self.client.jid)
                    ):
                        log.warning(
                            "Received undelivered message to external JID, this is not "
                            "allowed! Cancelling the message sending."
                        )
                        stanza_err = jabber_error.StanzaError(
                            "forbidden",
                            text="Multicasting (XEP-0033 addresses) can only be used "
                            "with JID from this gateway, not external ones. "
                            f" {address.jid} can't be delivered by this gateway and "
                            "should be delivered by server instead.",
                        )
                        raise stanza_err
                email_addresses = [
                    self.jid_to_email(self.client, address.jid, credentials)
                    for address in addresses
                    if address.jid
                ]
                if email_addresses:
                    msg[field.upper()] = ", ".join(email_addresses)
        else:
            assert to_email is not None
            msg["To"] = to_email

        sender_domain = credentials["user_email"].split("@", 1)[-1]

        if extra.headers:
            if extra.headers.keywords:
                msg["Keywords"] = extra.headers.keywords
            if extra.headers.urgency:
                urgency = extra.headers.urgency
                # "medium" urgency is named "normal" in the Importance header.
                if urgency == Urgency.medium:
                    importance = "normal"
                else:
                    importance = urgency
                msg["Importance"] = importance

        # NOTE(review): only "to_email" is used as SMTP envelope recipient; addresses
        #   put in other recipient headers above are not added to this list. Confirm
        #   whether this is intended.
        await smtp.sendmail(
            credentials["smtp_host"].encode(),
            credentials["user_email"].encode(),
            [to_email.encode()],
            msg.as_bytes(),
            senderDomainName=sender_domain,
            port=int(credentials["smtp_port"]),
            username=credentials["smtp_username"].encode(),
            password=credentials["smtp_password"].encode(),
            requireAuthentication=True,
            # TODO: only STARTTLS is supported right now, implicit TLS should be supported
            #   too.
            requireTransportSecurity=True,
        )

    async def _on_registration_form(
        self, client: SatXMPPEntity, iq_elt: domish.Element
    ) -> tuple[bool, data_form.Form] | None:
        """Build the registration form, pre-filled with stored credentials if any.

        @param client: Client session.
        @param iq_elt: IQ stanza of the registration form request.
        @return: Tuple with a boolean telling if credentials are already stored, and
            the form.
            None if the callback is not relevant for this request.
        """
        if client != self.client:
            return
        assert self.storage is not None
        from_jid = jid.JID(iq_elt["from"])
        key = KEY_CREDENTIALS.format(from_jid=from_jid.userhost())
        credentials = await self.storage.get(key) or {}

        form = data_form.Form(formType="form", title="IMAP/SMTP Credentials")

        # Add instructions
        form.instructions = [
            D_(
                "Please provide your IMAP and SMTP credentials to configure the "
                "connection."
            )
        ]

        # Add identity fields
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="user_name",
                label="User Name",
                desc=D_('The display name to use in the "From" field of sent emails.'),
                value=credentials.get("user_name"),
                required=True,
            )
        )

        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="user_email",
                label="User Email",
                desc=D_('The email address to use in the "From" field of sent emails.'),
                value=credentials.get("user_email"),
                required=True,
            )
        )

        # Add fields for IMAP credentials
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="imap_host",
                label="IMAP Host",
                desc=D_("IMAP server hostname or IP address"),
                value=credentials.get("imap_host"),
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="imap_port",
                label="IMAP Port",
                desc=D_("IMAP server port (default: 993)"),
                value=credentials.get("imap_port", "993"),
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="imap_username",
                label="IMAP Username",
                desc=D_("Username for IMAP authentication"),
                value=credentials.get("imap_username"),
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-private",
                var="imap_password",
                label="IMAP Password",
                desc=D_("Password for IMAP authentication"),
                value=credentials.get("imap_password"),
                required=True,
            )
        )

        # Add fields for SMTP credentials
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="smtp_host",
                label="SMTP Host",
                desc=D_("SMTP server hostname or IP address"),
                value=credentials.get("smtp_host"),
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="smtp_port",
                label="SMTP Port",
                desc=D_("SMTP server port (default: 587)"),
                value=credentials.get("smtp_port", "587"),
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-single",
                var="smtp_username",
                label="SMTP Username",
                desc=D_("Username for SMTP authentication"),
                value=credentials.get("smtp_username"),
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                fieldType="text-private",
                var="smtp_password",
                label="SMTP Password",
                desc=D_("Password for SMTP authentication"),
                value=credentials.get("smtp_password"),
                required=True,
            )
        )

        return bool(credentials), form

    def validate_field(
        self,
        form: data_form.Form,
        key: str,
        field_type: str,
        min_value: int | None = None,
        max_value: int | None = None,
        default: str | int | None = None,
    ) -> None:
        """Validate a single field.

        @param form: The form containing the fields.
        @param key: The key of the field to validate.
        @param field_type: The expected type of the field value.
        @param min_value: Optional minimum value for integer fields.
        @param max_value: Optional maximum value for integer fields.
        @param default: Default value to use if the field is missing.
        @raise StanzaError: If the field value is invalid or missing.
        """
        field = form.fields.get(key)
        if field is None:
            if default is None:
                raise StanzaError("bad-request", text=f"{key} is required")
            # The missing field is added to the form with the default value.
            field = data_form.Field(var=key, value=str(default))
            form.addField(field)

        value = field.value
        if field_type == "int":
            try:
                value = int(value)
                if (min_value is not None and value < min_value) or (
                    max_value is not None and value > max_value
                ):
                    raise ValueError
            except (ValueError, TypeError):
                raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}")
        elif field_type == "str":
            if not isinstance(value, str):
                raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}")

        # Basic email validation for user_email field
        if key == "user_email":
            # XXX: This is a minimal check. A complete email validation is notoriously
            #   difficult.
            # The whole value must match, otherwise a value with several "@" would be
            # accepted as soon as its beginning matches.
            if not email_pattern.fullmatch(value):
                raise StanzaError(
                    "bad-request", text=f"Invalid email address: {value}"
                )

    def validate_imap_smtp_form(self, submit_form: data_form.Form) -> None:
        """Validate the submitted IMAP/SMTP credentials form.

        @param submit_form: The submitted form containing IMAP/SMTP credentials.
        @raise StanzaError: If any of the values are invalid.
        """
        # Validate identity fields
        self.validate_field(submit_form, "user_name", "str")
        self.validate_field(submit_form, "user_email", "str")

        # Validate IMAP fields
        self.validate_field(submit_form, "imap_host", "str")
        self.validate_field(
            submit_form, "imap_port", "int", min_value=1, max_value=65535, default=993
        )
        self.validate_field(submit_form, "imap_username", "str")
        self.validate_field(submit_form, "imap_password", "str")

        # Validate SMTP fields
        self.validate_field(submit_form, "smtp_host", "str")
        self.validate_field(
            submit_form, "smtp_port", "int", min_value=1, max_value=65535, default=587
        )
        self.validate_field(submit_form, "smtp_username", "str")
        self.validate_field(submit_form, "smtp_password", "str")

    def email_to_jid(
        self,
        client: SatXMPPEntity,
        user_email: str,
        user_jid: jid.JID,
        email_name: str,
        email_addr: str,
    ) -> tuple[jid.JID, str | None]:
        """Convert an email address to a JID and extract the name if present.

        @param client: Client session.
        @param user_email: Email address of the gateway user.
        @param user_jid: JID of the gateway user.
        @param email_name: Email associated name.
        @param email_addr: Email address.
        @return: Tuple of JID and name (if present).
        """
        email_name = email_name.strip()
        if email_name.startswith("xmpp:"):
            # The name part holds an XMPP address, as set by [jid_to_email].
            return jid.JID(email_name[5:]), None
        elif email_addr == user_email:
            return (user_jid, None)
        else:
            return (
                jid.JID(None, (self._e.escape(email_addr), client.jid.host, None)),
                email_name or None,
            )

    async def on_new_email(
        self, user_data: UserData, user_jid: jid.JID, email: EmailMessage
    ) -> None:
        """Called when a new message has been received.

        @param user_data: user data, used to map registered user email to corresponding
            jid.
        @param user_jid: JID of the recipient.
        @param email: Parsed email.
        """
        assert self.client is not None
        user_email = user_data.credentials["user_email"]
        name, email_addr = parseaddr(email["from"])
        email_addr = email_addr.lower()
        from_jid = jid.JID(None, (self._e.escape(email_addr), self.client.jid.host, None))

        # Get the email body
        body_mime = email.get_body(("plain",))
        if body_mime is not None:
            charset = body_mime.get_content_charset() or "utf-8"
            body = body_mime.get_payload(decode=True).decode(charset, errors="replace")
        else:
            log.warning(f"No body found in email:\n{email}")
            body = ""

        # Decode the subject
        subject = email.get("subject")
        if subject:
            decoded_subject = decode_header(subject)
            subject = "".join(
                [
                    (
                        part.decode(encoding or "utf-8", errors="replace")
                        if isinstance(part, bytes)
                        else part
                    )
                    for part, encoding in decoded_subject
                ]
            ).strip()
        else:
            subject = None

        # Parse recipient fields
        kwargs = {}
        for field in RECIPIENT_FIELDS:
            email_addresses = email.get_all(field)
            if email_addresses:
                jids_and_names = [
                    self.email_to_jid(
                        self.client, user_email, user_jid, addr_name, addr
                    )
                    for addr_name, addr in getaddresses(email_addresses)
                ]
                kwargs[field] = [
                    AddressType(jid=address_jid, desc=desc)
                    for address_jid, desc in jids_and_names
                ]

        if not kwargs:
            # No recipient header at all in the received email: the gateway user is
            # used as the only recipient.
            log.debug("No recipient header found in email, using gateway user.")
            kwargs["to"] = [AddressType(jid=user_jid)]

        addresses_data = AddressesData(**kwargs)

        # Parse reply-to field
        reply_to_addresses = email.get_all("reply-to")
        if reply_to_addresses:
            jids_with_names = [
                self.email_to_jid(self.client, user_email, user_jid, addr_name, addr)
                for addr_name, addr in getaddresses(reply_to_addresses)
            ]
            addresses_data.replyto = [
                AddressType(jid=address_jid, desc=desc)
                for address_jid, desc in jids_with_names
            ]

        # Set noreply flag
        # The is no flag to indicate a no-reply message, so we check common user parts in
        # from and reply-to headers.
        from_addresses = [email_addr]
        if reply_to_addresses:
            from_addresses.extend(
                addr for a in reply_to_addresses if (addr := parseaddr(a)[1])
            )
        for from_address in from_addresses:
            from_user_part = from_address.split("@", 1)[0].lower()
            if from_user_part in (
                "no-reply",
                "noreply",
                "do-not-reply",
                "donotreply",
                "notification",
                "notifications",
            ):
                addresses_data.noreply = True
                break

        extra = {}

        if (
            not addresses_data.replyto
            and not addresses_data.noreply
            and not addresses_data.cc
            and not addresses_data.bcc
            and addresses_data.to == [AddressType(jid=user_jid)]
        ):
            # The main recipient is the only one, and there is no other metadata: there is
            # no need to add addresses metadata.
            pass
        else:
            for address in addresses_data.addresses:
                if address.jid and (
                    address.jid == user_jid or address.jid.host == str(self.client.jid)
                ):
                    # Those are email address, and have been delivered by the sender,
                    # other JID addresses will have to be delivered by us.
                    address.delivered = True

            extra["addresses"] = addresses_data.model_dump(mode="json", exclude_none=True)

        # We look for interesting headers
        headers = {}
        keywords_headers = email.get_all("keywords")
        if keywords_headers:
            keywords = ",".join(keywords_headers)
            headers["keywords"] = keywords

        importance = email["importance"]
        if importance:
            # We convert to urgency
            # The value is normalised, so "High" or " normal " are accepted too.
            normalised_importance = str(importance).strip().lower()
            if normalised_importance in ("low", "high"):
                headers["urgency"] = normalised_importance
            elif normalised_importance == "normal":
                headers["urgency"] = "medium"
            else:
                log.warning(f"Ignoring invalid importance header: {importance!r}")

        if headers:
            extra["headers"] = HeadersData(**headers).model_dump(
                mode="json", exclude_none=True
            )

        # The message is sent from the JID built from the email sender address.
        client = self.client.get_virtual_client(from_jid)
        await client.sendMessage(
            user_jid,
            {"": body},
            {"": subject} if subject else None,
            extra=extra,
        )

    async def connect_imap(self, from_jid: jid.JID, user_data: UserData) -> None:
        """Connect to IMAP service.

        [self.on_new_email] will be used as callback on new messages.

        @param from_jid: JID of the user associated with given credentials.
        @param user_data: User data, holding the email credentials.
        """
        credentials = user_data.credentials

        # This deferred is given to the factory, and awaited below.
        connected = defer.Deferred()
        factory = IMAPClientFactory(
            user_data,
            partial(self.on_new_email, user_data, from_jid.userhostJID()),
            connected,
        )
        reactor.connectTCP(
            credentials["imap_host"], int(credentials["imap_port"]), factory
        )
        await connected

    async def _on_registration_submit(
        self,
        client: SatXMPPEntity,
        iq_elt: domish.Element,
        submit_form: data_form.Form | None,
    ) -> bool | None:
        """Handle registration submit request.

        Submit form is validated, and credentials are stored.

        @param client: client session.
        @param iq_elt: IQ stanza of the submission request.
        @param submit_form: submit form.
            None for an unregistration request.
        @return: True if successful.
            None if the callback is not relevant for this request.
        @raise StanzaError: A submitted value is invalid or missing.
        """
        if client != self.client:
            return
        assert self.storage is not None
        from_jid = jid.JID(iq_elt["from"]).userhostJID()

        if submit_form is None:
            # This is an unregistration request.
            # The user is removed from cache too, otherwise cached credentials would
            # still be used to send emails after unregistration.
            user_data = self.users_data.pop(from_jid, None)
            if user_data is not None and user_data.imap_client is not None:
                try:
                    await user_data.imap_client.logout()
                except Exception:
                    log.exception(f"Can't log out {from_jid} from IMAP server.")
            key = KEY_CREDENTIALS.format(from_jid=from_jid)
            await self.storage.adel(key)
            log.info(f"{from_jid} unregistered from this gateway.")
            return True

        self.validate_imap_smtp_form(submit_form)
        credentials = {var: field.value for var, field in submit_form.fields.items()}
        user_data = self.users_data.get(from_jid)
        if user_data is None:
            # The user is not in cache, we cache current credentials.
            user_data = self.users_data[from_jid] = UserData(credentials=credentials)
        else:
            # The user is known, we update credentials.
            user_data.credentials = credentials
        key = KEY_CREDENTIALS.format(from_jid=from_jid)
        try:
            await self.connect_imap(from_jid, user_data)
        except Exception:
            # Credentials are stored even on failure, flagged as unsuccessful.
            log.warning(f"Can't connect to IMAP server for {from_jid}")
            credentials["imap_success"] = False
            await self.storage.aset(key, credentials)
            raise
        else:
            log.debug(f"Connection successful to IMAP server for {from_jid}")
            credentials["imap_success"] = True
            await self.storage.aset(key, credentials)
            return True


@implementer(iwokkel.IDisco)
class EmailGatewayHandler(XMPPHandler):
    """XMPP handler of the gateway, returning empty disco results."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return []

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []