Mercurial > libervia-backend
view libervia/backend/plugins/plugin_comp_email_gateway/__init__.py @ 4401:ae26233b655f default tip
doc (components): Add message cleaning section to email gateway doc:
fix 464
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 11 Sep 2025 21:17:51 +0200 |
parents | fe09446a09ce |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia Email Gateway Component # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import base64 from email import encoders from email.header import decode_header from email.message import EmailMessage from email.mime.application import MIMEApplication from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.utils import formataddr, formatdate, getaddresses, make_msgid, parseaddr from functools import partial import hashlib from pathlib import Path import re import shutil import tempfile from typing import NamedTuple, TYPE_CHECKING, cast from pydantic import BaseModel from twisted.internet import defer, reactor from twisted.internet.threads import deferToThread from twisted.mail import smtp from twisted.words.protocols.jabber import jid from twisted.words.protocols.jabber import error as jabber_error from twisted.words.protocols.jabber.error import StanzaError from twisted.words.protocols.jabber.xmlstream import XMPPHandler from twisted.words.xish import domish from wokkel import data_form, disco, iwokkel from zope.interface import implementer from libervia.backend import G from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C from libervia.backend.core.core_types import SatXMPPComponent, SatXMPPEntity from libervia.backend.core.i18n import D_, _ from libervia.backend.core.log import getLogger from libervia.backend.memory.persistent import LazyPersistentBinaryDict from libervia.backend.memory.sqla import select from libervia.backend.memory.sqla_mapping import AccessModel, Affiliation, PrivateIndBin, PublishModel, PubsubAffiliation from libervia.backend.models.core import MessageData from libervia.backend.plugins.plugin_comp_email_gateway.pubsub_service import ( EmailGWPubsubService, ) from libervia.backend.plugins.plugin_exp_gre import GRE, GetDataHandler from libervia.backend.plugins.plugin_misc_text_syntaxes import TextSyntaxes from libervia.backend.plugins.plugin_sec_gre_encrypter_openpgp import NS_GRE_OPENPGP from libervia.backend.plugins.plugin_sec_gre_formatter_mime import NS_GRE_MIME from libervia.backend.plugins.plugin_xep_0033 import ( AddressType, AddressesData, RECIPIENT_FIELDS, ) from libervia.backend.plugins.plugin_xep_0077 import XEP_0077 from libervia.backend.plugins.plugin_xep_0106 import XEP_0106 from libervia.backend.plugins.plugin_xep_0131 import HeadersData, Urgency, XEP_0131 from libervia.backend.plugins.plugin_xep_0277 import Comment, MbData, XEP_0277 from libervia.backend.plugins.plugin_xep_0373 import binary_to_ascii_armor from libervia.backend.plugins.plugin_xep_0498 import XEP_0498 from libervia.backend.plugins.plugin_xep_0499 import XEP_0499, ExtDiscoOptions from libervia.backend.tools.common import date_utils, regex, uri from libervia.backend.tools.utils import aio from .cleaning import convert_to_html_and_detect_noise from .imap import IMAPClientFactory from .models import Credentials, UserData if TYPE_CHECKING: from libervia.backend.core.main import LiberviaBackend log = getLogger(__name__) IMPORT_NAME = "email-gateway" NAME = "Libervia Email Gateway" PLUGIN_INFO = { C.PI_NAME: "Email Gateway Component", C.PI_IMPORT_NAME: IMPORT_NAME, C.PI_MODES: [C.PLUG_MODE_COMPONENT], C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT, C.PI_PROTOCOLS: [], C.PI_DEPENDENCIES: [ "XEP-0033", "XEP-0077", "XEP-0106", "XEP-0277", "XEP-0498", "XEP-0499", "GRE", "GRE-MIME", "GRE-OpenPGP", "PUBSUB_CACHE", "TEXT_SYNTAXES" ], C.PI_RECOMMENDATIONS: [], C.PI_MAIN: "EmailGatewayComponent", C.PI_HANDLER: C.BOOL_TRUE, C.PI_DESCRIPTION: D_( "Gateway to handle email. Usual emails are handled as message, while mailing " "lists are converted to pubsub blogs." ), } CONF_SECTION = f"component {IMPORT_NAME}" PREFIX_KEY_CREDENTIALS = "CREDENTIALS_" KEY_CREDENTIALS = f"{PREFIX_KEY_CREDENTIALS}{{from_jid}}" class FileMetadata(NamedTuple): path: Path hash: str size: int class SendMailExtra(BaseModel): addresses: AddressesData | None = None headers: HeadersData | None = None class EmailGatewayComponent(GetDataHandler): IMPORT_NAME = IMPORT_NAME gre_formatters = [NS_GRE_MIME] gre_encrypters = [NS_GRE_OPENPGP] verbose = 0 def __init__(self, host: "LiberviaBackend") -> None: """ Initialize the Email Gateway component. @param host: The Libervia backend instance. """ self.host = host self.client: SatXMPPComponent | None = None self.initalized = False self.storage: LazyPersistentBinaryDict | None = None self._iq_register = cast(XEP_0077, host.plugins["XEP-0077"]) self._iq_register.register_handler( self._on_registration_form, self._on_registration_submit ) self._e = cast(XEP_0106, host.plugins["XEP-0106"]) self._shim = cast(XEP_0131, host.plugins["XEP-0131"]) self._mb = cast(XEP_0277, host.plugins["XEP-0277"]) self._pfs = cast(XEP_0498, host.plugins["XEP-0498"]) self._ext_disco = cast(XEP_0499, host.plugins["XEP-0499"]) self._gre = cast(GRE, host.plugins["GRE"]) self._syntax = cast(TextSyntaxes, host.plugins["TEXT_SYNTAXES"]) # TODO: For the moment, all credentials are kept in cache; we should only keep the # X latest. self.users_data: dict[jid.JID, UserData] = {} self.files_path = self.host.get_local_path(None, C.FILES_DIR) host.trigger.add_with_check( "message_received", self, self._message_received_trigger, priority=-1000 ) async def _init(self) -> None: """Initialisation done after profile is connected""" assert self.client is not None self.client.identities.append(disco.DiscoIdentity("gateway", "smtp", NAME)) self.storage = LazyPersistentBinaryDict(IMPORT_NAME, self.client.profile) await self.connect_registered_users() @aio async def get_registered_users(self) -> dict[jid.JID, Credentials]: """Retrieve credentials for all registered users @return: a mapping from user JID to credentials data. """ assert self.client is not None profile_id = self.host.memory.storage.profiles[self.client.profile] async with self.host.memory.storage.session() as session: query = select(PrivateIndBin).where( PrivateIndBin.profile_id == profile_id, PrivateIndBin.namespace == IMPORT_NAME, PrivateIndBin.key.startswith(PREFIX_KEY_CREDENTIALS), ) result = await session.execute(query) return { jid.JID(p.key[len(PREFIX_KEY_CREDENTIALS) :]): Credentials(**p.value) for p in result.scalars() } async def connect_registered_users(self) -> None: """Connected users already registered to the gateway.""" registered_data = await self.get_registered_users() for user_jid, credentials in registered_data.items(): user_data = self.users_data[user_jid] = UserData(credentials=credentials) if not credentials.imap_success: log.warning( f"Ignoring unsuccessful IMAP credentials of {user_jid}. This user " "won't receive message from this gateway." ) else: try: await self.connect_imap(user_jid, user_data) except Exception as e: log.warning(f"Can't connect {user_jid} to IMAP: {e}.") else: log.debug(f"Connection to IMAP server successful for {user_jid}.") def get_handler(self, __) -> tuple[XMPPHandler, XMPPHandler]: return EmailGatewayHandler(), EmailGWPubsubService(self) async def profile_connecting(self, client: SatXMPPEntity) -> None: assert isinstance(client, SatXMPPComponent) self.client = client self._gre.register_get_data_handler(client, self) if not self.initalized: await self._init() self.initalized = True def _message_received_trigger( self, client: SatXMPPEntity, message_elt: domish.Element, post_treat: defer.Deferred, ) -> bool: """add the gateway workflow on post treatment""" if client != self.client: return True post_treat.addCallback( lambda mess_data: defer.ensureDeferred( self.on_message(client, mess_data, message_elt) ) ) return True async def on_message( self, client: SatXMPPEntity, mess_data: MessageData, message_elt: domish.Element ) -> dict: """Called once message has been parsed @param client: Client session. @param mess_data: Message data. @return: Message data. """ if client != self.client: return mess_data from_jid = mess_data["from"].userhostJID() extra_kw = {} if mess_data["type"] not in ("chat", "normal"): log.warning(f"ignoring message with unexpected type: {mess_data}") return mess_data if not client.is_local(from_jid): log.warning(f"ignoring non local message: {mess_data}") return mess_data if not mess_data["to"].user: addresses = mess_data["extra"].get("addresses") if not addresses: log.warning(f"ignoring message addressed to gateway itself: {mess_data}") return mess_data else: to_email = None extra_kw["addresses"] = addresses else: try: to_email = self._e.unescape(mess_data["to"].user) except ValueError: raise exceptions.DataError( f'Invalid "to" JID, can\'t send message: {message_elt.toXml()}.' ) encrypted_payload = self._gre.get_encrypted_payload(message_elt) try: if encrypted_payload is not None: # We convert the base64 datat to ASCII Armor encrypted_binary = base64.b64decode(encrypted_payload) encrypted_payload = binary_to_ascii_armor(encrypted_binary) assert to_email is not None subject = "This is an encrypted message." outer = MIMEMultipart('encrypted', protocol="application/pgp-encrypted") outer["Subject"] = subject # FIXME: use credentials here. outer["From"] = from_jid.userhost() outer["To"] = to_email outer["Content-Type"] = "multipart/encrypted; protocol=\"application/pgp-encrypted\"" version = MIMEApplication( "Version: 1\n", _subtype='pgp-encrypted', _encoder=encoders.encode_7or8bit ) version["Content-Description"] = "PGP/MIME version identification" encrypted_part = MIMEApplication( encrypted_payload, _subtype='octet-stream', _encoder=encoders.encode_7or8bit ) encrypted_part["Content-Description"] = "OpenPGP encrypted message" encrypted_part["Content-Type"] = "application/octet-stream; name=\"encrypted.asc\"" encrypted_part["Content-Disposition"] = "inline; filename=\"encrypted.asc\"" outer.attach(version) outer.attach(encrypted_part) body = outer.as_bytes() await self.send_encrypted_email( from_jid=from_jid, to_email=to_email, body=body, extra=SendMailExtra(**extra_kw) if extra_kw else None, ) else: self._shim.move_keywords_to_headers(mess_data["extra"]) headers = mess_data["extra"].get("headers") if headers: extra_kw["headers"] = headers try: body_lang, body = next(iter(mess_data["message"].items())) except (KeyError, StopIteration): log.warning(f"No body found: {mess_data}") body_lang, body = "", "" try: subject_lang, subject = next(iter(mess_data["subject"].items())) except (KeyError, StopIteration): subject_lang, subject = "", None if not body and not subject: log.warning(f"Ignoring empty message: {mess_data}") return mess_data await self.send_email( from_jid=from_jid, to_email=to_email, body=body, subject=subject, extra=SendMailExtra(**extra_kw) if extra_kw else None, ) except exceptions.UnknownEntityError: log.warning(f"Can't send message, user {from_jid} is not registered.") message_error_elt = StanzaError( "subscription-required", text="User need to register to the gateway before sending emails.", ).toResponse(message_elt) await client.a_send(message_error_elt) raise exceptions.CancelError("User not registered.") except StanzaError as e: log.warning("Can't send message: {e}") message_error_elt = e.toResponse(message_elt) await client.a_send(message_error_elt) raise exceptions.CancelError("Can't send message: {e}") return mess_data def jid_to_email( self, client: SatXMPPEntity, address_jid: jid.JID, credentials: Credentials ) -> str: """Convert a JID to an email address. If JID is from the gateway, email address will be extracted. Otherwise, the gateway email will be used, with XMPP address specified in name part. @param address_jid: JID of the recipient. @param credentials: Sender credentials. @return: Email address. """ if address_jid and address_jid.host.endswith(str(client.jid)): return self._e.unescape(address_jid.user) else: email_address = credentials.user_email if address_jid: email_address = formataddr((f"xmpp:{address_jid}", email_address)) return email_address async def get_credentials(self, from_jid: jid.JID) -> Credentials: """Retrieve user credentials from a bare JID. @param from_jid: Entity to retrieve credentials from. @return: Credentials. @raise UnknownEntityError: If no credentials are found for the given JID. """ # We need a bare jid. assert self.storage is not None assert not from_jid.resource try: user_data = self.users_data[from_jid] except KeyError: key = KEY_CREDENTIALS.format(from_jid=from_jid) credentials = await self.storage.get(key) if credentials is None: raise exceptions.UnknownEntityError( f"No credentials found for {from_jid}." ) self.users_data[from_jid] = UserData(credentials) else: credentials = user_data.credentials return credentials async def send_encrypted_email( self, from_jid: jid.JID, to_email: str | None, body: bytes, extra: SendMailExtra | None = None, ) -> None: """Send an email using sender credentials. Credentials will be retrieved from cache or database. @param from_jid: Bare JID of the sender. @param to_email: Email address of the recipient. @param body: Encrypted body of the email. @param extra: Extra data. """ assert self.client is not None assert isinstance(body, bytes) credentials = await self.get_credentials(from_jid) sender_domain = credentials.user_email.split("@", 1)[-1] recipients = [] if to_email is not None: recipients.append(to_email.encode()) if extra is not None and extra.addresses is not None: for address in extra.addresses.addresses: recipient_jid = address.jid if recipient_jid is None: continue recipient_email = self.jid_to_email( self.client, recipient_jid, credentials ) recipients.append(recipient_email.encode()) if not recipients: raise exceptions.InternalError("No recipient found.") await smtp.sendmail( credentials.smtp_host.encode(), credentials.user_email.encode(), recipients, body, senderDomainName=sender_domain, port=int(credentials.smtp_port), username=credentials.smtp_username.encode(), password=credentials.smtp_password.encode(), requireAuthentication=True, # TODO: only STARTTLS is supported right now, implicit TLS should be supported # too. requireTransportSecurity=True, ) async def send_email( self, from_jid: jid.JID, to_email: str | None, body: str, subject: str | None, extra: SendMailExtra | None = None, ) -> None: """Send an email using sender credentials. Credentials will be retrieve from cache, or database. @param from_jid: Bare JID of the sender. @param to_email: Email address of the destinee. @param body: Body of the email. @param subject: Subject of the email. @param extra: Extra data. @raise exceptions.UnknownEntityError: Credentials for "from_jid" can't be found. """ assert self.client is not None if extra is None: extra = SendMailExtra() if to_email is None and (extra.addresses is None or not extra.addresses.to): raise exceptions.InternalError( '"to_email" can\'t be None if there is no "to" address!' ) credentials = await self.get_credentials(from_jid) if isinstance(body, bytes): assert to_email is not None sender_domain = credentials.user_email.split("@", 1)[-1] await smtp.sendmail( credentials.smtp_host.encode(), credentials.user_email.encode(), [to_email.encode()], body, senderDomainName=sender_domain, port=int(credentials.smtp_port), username=credentials.smtp_username.encode(), password=credentials.smtp_password.encode(), requireAuthentication=True, # TODO: only STARTTLS is supported right now, implicit TLS should be supported # too. requireTransportSecurity=True, ) return msg = MIMEText(body, "plain", "UTF-8") if subject is not None: msg["Subject"] = subject msg["From"] = formataddr( (credentials.user_name or None, credentials.user_email) ) if extra.addresses: assert extra.addresses.to main_to_address = extra.addresses.to[0] assert main_to_address.jid to_email = self.jid_to_email(self.client, main_to_address.jid, credentials) for field in RECIPIENT_FIELDS: addresses = getattr(extra.addresses, field) if not addresses: continue for address in addresses: if not address.delivered and ( address.jid is None or address.jid.host != str(self.client.jid) ): log.warning( "Received undelivered message to external JID, this is not " "allowed! Cancelling the message sending." ) stanza_err = jabber_error.StanzaError( "forbidden", text="Multicasting (XEP-0033 addresses) can only be used " "with JID from this gateway, not external ones. " f" {address.jid} can't be delivered by this gateway and " "should be delivered by server instead.", ) raise stanza_err email_addresses = [ self.jid_to_email(self.client, address.jid, credentials) for address in addresses if address.jid ] if email_addresses: msg[field.upper()] = ", ".join(email_addresses) else: assert to_email is not None msg["To"] = to_email sender_domain = credentials.user_email.split("@", 1)[-1] if extra.headers: if extra.headers.keywords: msg["Keywords"] = extra.headers.keywords if extra.headers.urgency: urgency = extra.headers.urgency if urgency == Urgency.medium: importance = "normal" else: importance = urgency msg["Importance"] = importance if getattr(extra.headers, "autocrypt", None): msg["Autocrypt"] = extra.headers.autocrypt await smtp.sendmail( credentials.smtp_host.encode(), credentials.user_email.encode(), [to_email.encode()], msg.as_bytes(), senderDomainName=sender_domain, port=int(credentials.smtp_port), username=credentials.smtp_username.encode(), password=credentials.smtp_password.encode(), requireAuthentication=True, # TODO: only STARTTLS is supported right now, implicit TLS should be supported # too. requireTransportSecurity=True, ) async def _on_registration_form( self, client: SatXMPPEntity, iq_elt: domish.Element ) -> tuple[bool, data_form.Form] | None: if client != self.client: return assert self.storage is not None from_jid = jid.JID(iq_elt["from"]) key = KEY_CREDENTIALS.format(from_jid=from_jid.userhost()) credentials = await self.storage.get(key) or {} form = data_form.Form(formType="form", title="IMAP/SMTP Credentials") # Add instructions form.instructions = [ D_( "Please provide your IMAP and SMTP credentials to configure the " "connection." ) ] # Add identity fields form.addField( data_form.Field( fieldType="text-single", var="user_name", label="User Name", desc=D_('The display name to use in the "From" field of sent emails.'), value=credentials.get("user_name"), required=True, ) ) form.addField( data_form.Field( fieldType="text-single", var="user_email", label="User Email", desc=D_('The email address to use in the "From" field of sent emails.'), value=credentials.get("user_email"), required=True, ) ) # Add fields for IMAP credentials form.addField( data_form.Field( fieldType="text-single", var="imap_host", label="IMAP Host", desc=D_("IMAP server hostname or IP address"), value=credentials.get("imap_host"), required=True, ) ) form.addField( data_form.Field( fieldType="text-single", var="imap_port", label="IMAP Port", desc=D_("IMAP server port (default: 993)"), value=credentials.get("imap_port", "993"), ) ) form.addField( data_form.Field( fieldType="text-single", var="imap_username", label="IMAP Username", desc=D_("Username for IMAP authentication"), value=credentials.get("imap_username"), required=True, ) ) form.addField( data_form.Field( fieldType="text-private", var="imap_password", label="IMAP Password", desc=D_("Password for IMAP authentication"), value=credentials.get("imap_password"), required=True, ) ) # Add fields for SMTP credentials form.addField( data_form.Field( fieldType="text-single", var="smtp_host", label="SMTP Host", desc=D_("SMTP server hostname or IP address"), value=credentials.get("smtp_host"), required=True, ) ) form.addField( data_form.Field( fieldType="text-single", var="smtp_port", label="SMTP Port", desc=D_("SMTP server port (default: 587)"), value=credentials.get("smtp_port", "587"), ) ) form.addField( data_form.Field( fieldType="text-single", var="smtp_username", label="SMTP Username", desc=D_("Username for SMTP authentication"), value=credentials.get("smtp_username"), required=True, ) ) form.addField( data_form.Field( fieldType="text-private", var="smtp_password", label="SMTP Password", desc=D_("Password for SMTP authentication"), value=credentials.get("smtp_password"), required=True, ) ) return bool(credentials), form def validate_field( self, form: data_form.Form, key: str, field_type: str, min_value: int | None = None, max_value: int | None = None, default: str | int | None = None, ) -> None: """Validate a single field. @param form: The form containing the fields. @param key: The key of the field to validate. @param field_type: The expected type of the field value. @param min_value: Optional minimum value for integer fields. @param max_value: Optional maximum value for integer fields. @param default: Default value to use if the field is missing. @raise StanzaError: If the field value is invalid or missing. """ field = form.fields.get(key) if field is None: if default is None: raise StanzaError("bad-request", text=f"{key} is required") field = data_form.Field(var=key, value=str(default)) form.addField(field) value = field.value if field_type == "int": try: value = int(value) if (min_value is not None and value < min_value) or ( max_value is not None and value > max_value ): raise ValueError except (ValueError, TypeError): raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}") elif field_type == "str": if not isinstance(value, str): raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}") # Basic email validation for user_email field if key == "user_email": # XXX: This is a minimal check. A complete email validation is notoriously # difficult. if not regex.RE_EMAIL.match(value): raise StanzaError( "bad-request", text=f"Invalid email address: {value}" ) def validate_imap_smtp_form(self, submit_form: data_form.Form) -> None: """Validate the submitted IMAP/SMTP credentials form. @param submit_form: The submitted form containing IMAP/SMTP credentials. @raise StanzaError: If any of the values are invalid. """ # Validate identity fields self.validate_field(submit_form, "user_name", "str") self.validate_field(submit_form, "user_email", "str") # Validate IMAP fields self.validate_field(submit_form, "imap_host", "str") self.validate_field( submit_form, "imap_port", "int", min_value=1, max_value=65535, default=993 ) self.validate_field(submit_form, "imap_username", "str") self.validate_field(submit_form, "imap_password", "str") # Validate SMTP fields self.validate_field(submit_form, "smtp_host", "str") self.validate_field( submit_form, "smtp_port", "int", min_value=1, max_value=65535, default=587 ) self.validate_field(submit_form, "smtp_username", "str") self.validate_field(submit_form, "smtp_password", "str") def email_to_jid( self, client: SatXMPPEntity, user_email: str, user_jid: jid.JID, email_name: str, email_addr: str, ) -> tuple[jid.JID, str | None]: """Convert an email address to a JID and extract the name if present. @param client: Client session. @param user_email: Email address of the gateway user. @param user_jid: JID of the gateway user. @param email_name: Email associated name. @param email_addr: Email address. @return: Tuple of JID and name (if present). """ email_name = email_name.strip() if email_name.startswith("xmpp:"): return jid.JID(email_name[5:]), None elif email_addr == user_email: return (user_jid, None) else: return ( jid.JID(None, (self._e.escape(email_addr), client.jid.host, None)), email_name or None, ) async def on_new_email( self, user_data: UserData, user_jid: jid.JID, email: EmailMessage ) -> None: """Called when a new message has been received. @param user_data: user data, used to map registered user email to corresponding jid. @param user_jid: JID of the recipient. @param email: Parsed email. """ assert self.client is not None user_email = user_data.credentials.user_email author_name, author_email = parseaddr(email["from"]) author_email = author_email.lower() from_jid = jid.JID(None, (self._e.escape(author_email), self.client.jid.host, None)) # Get the email body body_mime = email.get_body(("plain",)) if body_mime is not None: charset = body_mime.get_content_charset() or "utf-8" body = body_mime.get_payload(decode=True).decode(charset, errors="replace") else: log.warning(f"No body found in email:\n{email}") body = "" # Decode the subject subject = email.get("subject") if subject: decoded_subject = decode_header(subject) subject = "".join( [ part.decode(encoding or "utf-8") if isinstance(part, bytes) else part for part, encoding in decoded_subject ] ).strip() else: subject = None # Parse recipient fields kwargs = {} for field in RECIPIENT_FIELDS: email_addresses = email.get_all(field) if email_addresses: jids_and_names = [ self.email_to_jid(self.client, user_email, user_jid, name, addr) for name, addr in getaddresses(email_addresses) ] kwargs[field] = [ AddressType(jid=jid, desc=name) for jid, name in jids_and_names ] # At least "to" header should be set, so kwargs should never be empty assert kwargs addresses_data = AddressesData(**kwargs) # Parse reply-to field reply_to_addresses = email.get_all("reply-to") if reply_to_addresses: jids_with_names = [ self.email_to_jid(self.client, user_email, user_jid, name, addr) for name, addr in getaddresses(reply_to_addresses) ] addresses_data.replyto = [ AddressType(jid=jid, desc=name) for jid, name in jids_with_names ] # Set noreply flag # There is no flag to indicate a no-reply message, so we check common user parts # in from and reply-to headers. from_addresses = [author_email] if reply_to_addresses: from_addresses.extend( addr for a in reply_to_addresses if (addr := parseaddr(a)[1]) ) for from_address in from_addresses: from_user_part = from_address.split("@", 1)[0].lower() if from_user_part in ( "no-reply", "noreply", "do-not-reply", "donotreply", "notification", "notifications", ): addresses_data.noreply = True break extra = {} if ( not addresses_data.replyto and not addresses_data.noreply and not addresses_data.cc and not addresses_data.bcc and addresses_data.to == [AddressType(jid=user_jid)] ): # The main recipient is the only one, and there is no other metadata: there is # no need to add addresses metadata. pass else: for address in addresses_data.addresses: if address.jid and ( address.jid == user_jid or address.jid.host == str(self.client.jid) ): # Those are email address, and have been delivered by the sender, # other JID addresses will have to be delivered by us. address.delivered = True extra["addresses"] = addresses_data.model_dump(mode="json", exclude_none=True) # We look for interesting headers headers = {} keywords_headers = email.get_all("keywords") if keywords_headers: keywords = ",".join(keywords_headers) headers["keywords"] = keywords importance = email["importance"] if importance: # We convert to urgency if importance in ("low", "high"): headers["urgency"] = importance elif importance == "normal": headers["urgency"] = "medium" else: log.warning("Ignoring invalid importance header: {importance!r}") autocrypt = email["autocrypt"] if autocrypt: headers["autocrypt"] = autocrypt if headers: extra["headers"] = HeadersData(**headers).model_dump( mode="json", exclude_none=True ) # Handle attachments for part in email.iter_attachments(): await self.handle_attachment(part, user_jid) client = self.client.get_virtual_client(from_jid) # Now that the message is parsed, we check if it's a mailing list. list_ids = email.get_all("list-id") if list_ids: try: await self.handle_mailing_list( client, user_data, author_name, author_email, list_ids[-1], email, user_jid, body, subject, extra ) except exceptions.DataError as e: log.warning(f"Can't parse mailing list email: {e}.") except Exception: log.exception("Can't parse mailing list email.") else: await client.sendMessage( user_jid, {"": body}, {"": subject} if subject else None, extra=extra, ) def parse_references(self, email: EmailMessage) -> list[str]: """Extract message IDs from the "References" header. @param email: The parsed email message. @returns: Message IDs """ references = [] for header in email.get_all('references', []): for token in header.split(): _, address = parseaddr(token) if address: references.append(address) return references def email_to_mb_data( self, email: EmailMessage, service: jid.JID, node: str, item_id: str, author_name: str, author_email: str ) -> MbData: """Convert an email to blog data. @param email: email to convert. @return: Blog data. """ tags = set() # Title title = email.get('subject', '') def strip_tags(m): value = m.group()[1:-1].strip().lower() if value: tags.add(value) return "" title = re.sub(r"\[.+?\]", strip_tags, title).strip() # Dates date_header = email.get('Date') if date_header: try: published = date_utils.date_parse(date_header) except ValueError: published = None else: published = None # Body content = None content_xhtml = None if email.is_multipart(): # We'll collect possible plain and html content parts plain_parts = [] html_parts = [] for part in email.walk(): content_type = part.get_content_type() content_disposition = part.get('Content-Disposition', '').lower() # Skip attachments if 'attachment' in content_disposition: continue payload = part.get_payload(decode=True) if payload is None: continue payload = cast(bytes, payload) charset = part.get_content_charset() or 'utf-8' try: payload_text = payload.decode(charset, errors='replace') except (LookupError, TypeError): payload_text = payload.decode('utf-8', errors='replace') if content_type == 'text/plain': plain_parts.append(payload_text) elif content_type == 'text/html': html_parts.append(payload_text) # Prefer first valid parts for safety if plain_parts: content = plain_parts[0] if html_parts: content_xhtml = html_parts[0] else: # Single part email payload = email.get_payload(decode=True) if payload is not None: payload = cast(bytes, payload) charset = email.get_content_charset() or 'utf-8' try: content_text = payload.decode(charset, errors='replace') except (LookupError, TypeError): content_text = payload.decode('utf-8', errors='replace') content_type = email.get_content_type() if content_type == 'text/plain': content = content_text elif content_type == 'text/html': content_xhtml = content_text if content_xhtml is None: assert content is not None content_xhtml = convert_to_html_and_detect_noise(content) else: content_xhtml = convert_to_html_and_detect_noise(content_xhtml, is_html=True) content_xhtml = self._syntax.clean_xhtml(content_xhtml) return MbData( service=service, node=node, id=item_id, published=published, title=title, content=content, content_xhtml=content_xhtml, author=author_name, author_email=author_email, tags=list(tags) ) async def mb_data_to_email( self, credentials: Credentials, mb_data: MbData ) -> EmailMessage: """Convert blog data to an email message. @param mb_data: Blog data to convert. @return: Email message. """ email = EmailMessage() # Title title_parts = [] tags = [] for tag in mb_data.tags: if stripped_tag := tag.strip(): title_parts.append(f"[{stripped_tag}]") tags.append(stripped_tag) if mb_data.title: title_parts.append(mb_data.title) email['Subject'] = ' '.join(title_parts) if tags: email['Keywords'] = ', '.join(tags) # From # FIXME: Check email according to sender. if mb_data.author and mb_data.author_email: email['From'] = f"{mb_data.author} <{mb_data.author_email}>" elif mb_data.author: email['From'] = f"{mb_data.author} <{credentials.user_email}>" elif mb_data.author_email: email['From'] = mb_data.author_email else: email['From'] = credentials.user_email # Message ID if mb_data.id: email['Message-ID'] = f"<{mb_data.id}>" else: msg_id = make_msgid() email['Message-ID'] = msg_id mb_data.id = msg_id[1:-1] # In-Reply-To if mb_data.in_reply_tos: # Use the first reply-to reference in_reply_to = mb_data.in_reply_tos[0] if in_reply_to.ref: email['In-Reply-To'] = f"<{in_reply_to.ref}>" # Dates date_set = False if mb_data.published is not None: try: email['Date'] = formatdate(mb_data.published, localtime=True) date_set = True except Exception: pass if not date_set and mb_data.updated is not None: try: email['Date'] = formatdate(mb_data.updated, localtime=True) except Exception: pass # Content if mb_data.content_xhtml: # XHTML content xhtml_content = mb_data.content_xhtml if not mb_data.content: mb_data.content = await self._syntax.convert( xhtml_content, self._syntax.SYNTAX_XHTML, self._syntax.SYNTAX_TEXT, False, ) email.set_content(mb_data.content) email.add_alternative(xhtml_content, subtype='html') elif mb_data.content: email.set_content(mb_data.content) else: email.set_content("") return email async def handle_mailing_list( self, client: SatXMPPEntity, user_data: UserData, author_name: str, author_email: str, list_id_header: str, email: EmailMessage, user_jid: jid.JID, body: str, subject: str|None, extra: dict ) -> None: """Handle emails from mailing lists. Mailing list emails are converted to pubsub blogs. """ assert self.client is not None pubsub_service = self.client.jid message_id_list = email.get_all("message-id") if not message_id_list: raise exceptions.DataError("Missing message ID.") message_id = message_id_list[-1].strip() if message_id.startswith("<") and message_id.endswith(">"): message_id = message_id[1:-1] if not message_id: raise exceptions.DataError("Emtpy message ID.") list_name, list_id = parseaddr(list_id_header, strict=False) list_name = list_name.strip() list_id = list_id.strip() if not list_id: raise exceptions.DataError( f"Mailing list ID is empty, we can't parse id: {list_id_header=}." ) root_node = await G.storage.get_pubsub_node( client, pubsub_service, list_id, with_subscriptions=True, create=True, create_kwargs={ "access_model": AccessModel.whitelist, "publish_model": PublishModel.publishers, "affiliations": [PubsubAffiliation( entity = user_jid, affiliation = Affiliation.owner )], "subscribed": True }, ) assert root_node is not None in_reply_to_value = ''.join(email.get_all("in-reply-to", [])) __, in_reply_to = parseaddr(in_reply_to_value) references = self.parse_references(email) if in_reply_to: if not references: log.warning( '"References" header should not be empty when "In-Reply-To" is set.' ) references = [in_reply_to] else: if references[-1] != in_reply_to: log.warning('Last ID in "References" should be "In-Reply-To".') references.append(in_reply_to) if references: # We check that the top message of the thread has a corresponding item. top_item_id = references[0] parent_node_name = self._mb.get_comments_node(top_item_id) parent_node_uri = uri.build_xmpp_uri( "pubsub", path=root_node.service.full(), node=parent_node_name, ) top_items = await G.storage.get_items(root_node, item_ids=[top_item_id]) if not top_items: # The top item is missing, we make an empty one. empty_item_data = MbData( service=pubsub_service, node=root_node.name, id=top_item_id, title="missing item", content="missing item", author_jid=pubsub_service, comments=[ Comment( uri=parent_node_uri, service=pubsub_service, node=parent_node_name ) ] ) await G.storage.cache_pubsub_items( client, root_node, [await empty_item_data.to_element(client)] ) parent_node = await G.storage.get_pubsub_node( client, pubsub_service, parent_node_name, with_subscriptions=True, create=True, create_kwargs={ "access_model": AccessModel.whitelist, "publish_model": PublishModel.publishers, "subscribed": True, "affiliations": [PubsubAffiliation( entity = user_jid, affiliation = Affiliation.owner )], "parent_node": root_node }, ) else: parent_node = root_node parent_node_name = cast(str, root_node.name) mb_data = self.email_to_mb_data( email, pubsub_service, parent_node_name, message_id, author_name, author_email ) mb_data.comments.append( Comment( service = pubsub_service, node = self._mb.get_comments_node(message_id), ) ) await G.storage.cache_pubsub_items( client, parent_node, [await mb_data.to_element(client)], [mb_data.model_dump(mode="json")] ) async def handle_attachment(self, part: EmailMessage, recipient_jid: jid.JID) -> None: """Handle an attachment from an email. @param part: The object representing the attachment. @param recipient_jid: JID of the recipient to whom the attachment is being sent. """ assert self.client is not None content_type = part.get_content_type() filename = part.get_filename() or "attachment" log.debug(f"Handling attachment: {filename} ({content_type})") file_metadata = await deferToThread(self._save_attachment, part) if file_metadata is not None: log.debug(f"Attachment {filename!r} saved to {file_metadata.path}") try: await self.host.memory.set_file( self.client, filename, file_hash=file_metadata.hash, hash_algo="sha-256", size=file_metadata.size, namespace=PLUGIN_INFO[C.PI_IMPORT_NAME], mime_type=content_type, owner=recipient_jid, ) except Exception: log.exception(f"Failed to register file {filename!r}") def _save_attachment(self, part: EmailMessage) -> FileMetadata | None: """Save the attachment to files path. This method must be executed in a thread with deferToThread to avoid blocking the reactor with IO operations if the attachment is large. @param part: The object representing the attachment. @return: Attachment data, or None if an error occurs. @raises IOError: Can't save the attachment. """ temp_file = None try: with tempfile.NamedTemporaryFile(delete=False) as temp_file: payload = part.get_payload(decode=True) if isinstance(payload, bytes): temp_file.write(payload) file_hash = hashlib.sha256(payload).hexdigest() file_path = self.files_path / file_hash shutil.move(temp_file.name, file_path) file_size = len(payload) return FileMetadata(path=file_path, hash=file_hash, size=file_size) else: log.warning(f"Can't write payload of type {type(payload)}.") return None except Exception as e: raise IOError(f"Failed to save attachment: {e}") finally: if temp_file is not None and Path(temp_file.name).exists(): Path(temp_file.name).unlink() async def connect_imap(self, from_jid: jid.JID, user_data: UserData) -> None: """Connect to IMAP service. [self.on_new_email] will be used as callback on new messages. @param from_jid: JID of the user associated with given credentials. @param credentials: Email credentials. """ credentials = user_data.credentials connected = defer.Deferred() factory = IMAPClientFactory( user_data, partial(self.on_new_email, user_data, from_jid.userhostJID()), connected, ) reactor.connectTCP( credentials.imap_host, int(credentials.imap_port), factory ) await connected async def _on_registration_submit( self, client: SatXMPPEntity, iq_elt: domish.Element, submit_form: data_form.Form | None, ) -> bool | None: """Handle registration submit request. Submit form is validated, and credentials are stored. @param client: client session. iq_elt: IQ stanza of the submission request. submit_form: submit form. @return: True if successful. None if the callback is not relevant for this request. """ if client != self.client: return assert self.storage is not None from_jid = jid.JID(iq_elt["from"]).userhostJID() if submit_form is None: # This is an unregistration request. try: user_data = self.users_data[from_jid] except KeyError: pass else: if user_data.imap_client is not None: try: await user_data.imap_client.logout() except Exception: log.exception(f"Can't log out {from_jid} from IMAP server.") key = KEY_CREDENTIALS.format(from_jid=from_jid) await self.storage.adel(key) log.info(f"{from_jid} unregistered from this gateway.") return True self.validate_imap_smtp_form(submit_form) credentials = Credentials(**{ key: field.value for key, field in submit_form.fields.items() }) user_data = self.users_data.get(from_jid) if user_data is None: # The user is not in cache, we cache current credentials. user_data = self.users_data[from_jid] = UserData(credentials=credentials) else: # The user is known, we update credentials. user_data.credentials = credentials key = KEY_CREDENTIALS.format(from_jid=from_jid) try: await self.connect_imap(from_jid, user_data) except Exception as e: log.warning(f"Can't connect to IMAP server for {from_jid}") credentials.imap_success = False await self.storage.aset(key, credentials) raise e else: log.debug(f"Connection successful to IMAP server for {from_jid}") credentials.imap_success = True await self.storage.aset(key, credentials) return True async def on_relayed_encryption_data( self, client: SatXMPPEntity, iq_elt: domish.Element, form: data_form.Form ) -> None: from_jid = jid.JID(iq_elt["from"]).userhostJID() credentials = await self.get_credentials(from_jid) form.addField(data_form.Field(var="sender_id", value=credentials.user_email)) @implementer(iwokkel.IDisco) class EmailGatewayHandler(XMPPHandler): def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [ data_form.Form( "result", formNamespace="urn:xmpp:data-policy:0", fields=( data_form.Field( "list-single", "auth_data", "plain" ), data_form.Field( "list-single", "data-transmission", "encrypted" ), data_form.Field( "text-single", "encryption_algorithm", "TLS" ), data_form.Field( "text-single", "data_retention", "0", ), ), ), data_form.Form( "result", formNamespace="urn:xmpp:data-policy:identity:gateway:smtp:0", fields=( data_form.Field( "text-multi", "extra_info", "This gateway acts as a relay to external IMAP/SMTP servers. Data policies depend entirely on the external server chosen by the user. This gateway does not store or process user data.", ), ), ), ] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []