comparison libervia/backend/plugins/plugin_comp_email_gateway/__init__.py @ 4303:a7ec325246fb

component email-gateway: first draft: Initial implementation of the Email Gateway. This component uses XEP-0100 for registration. Upon registration and subsequent startups, a connection is made to registered IMAP services, and incoming emails (in `INBOX` mailboxes) are immediately forwarded as XMPP messages. In the opposite direction, an SMTP connection is established to send emails on incoming XMPP messages. rel 449
author Goffi <goffi@goffi.org>
date Fri, 06 Sep 2024 18:07:17 +0200
parents
children b56b1eae7994
comparison
equal deleted inserted replaced
4302:9e7ea54b93ee 4303:a7ec325246fb
1 #!/usr/bin/env python3
2
3 # Libervia Email Gateway Component
4 # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from dataclasses import dataclass
20 from email.header import decode_header
21 from email.message import EmailMessage
22 from email.mime.text import MIMEText
23 from email.utils import formataddr, parseaddr
24 from functools import partial
25 import re
26 from typing import Any, cast
27
28 from twisted.internet import defer, reactor
29 from twisted.mail import smtp
30 from twisted.words.protocols.jabber import jid
31 from twisted.words.protocols.jabber.error import StanzaError
32 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
33 from twisted.words.xish import domish
34 from wokkel import data_form, disco, iwokkel
35 from zope.interface import implementer
36
37 from libervia.backend.core import exceptions
38 from libervia.backend.core.constants import Const as C
39 from libervia.backend.core.core_types import SatXMPPEntity
40 from libervia.backend.core.i18n import D_, _
41 from libervia.backend.core.log import getLogger
42 from libervia.backend.memory.persistent import LazyPersistentBinaryDict
43 from libervia.backend.memory.sqla import select
44 from libervia.backend.memory.sqla_mapping import PrivateIndBin
45 from libervia.backend.models.core import MessageData
46 from libervia.backend.plugins.plugin_xep_0077 import XEP_0077
47 from libervia.backend.plugins.plugin_xep_0106 import XEP_0106
48 from libervia.backend.tools.utils import aio
49
50 from .models import Credentials, UserData
51 from .imap import IMAPClientFactory
52
53
# Module-level logger, named after this module.
log = getLogger(__name__)

# Identifiers of this component: the import name is reused below for the plugin
# metadata, the configuration section and the storage namespace.
IMPORT_NAME = "email-gateway"
NAME = "Libervia Email Gateway"

# Plugin metadata (presumably consumed by the backend plugin loader — the consumer is
# not visible in this file).
PLUGIN_INFO = {
    C.PI_NAME: "Email Gateway Component",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_MODES: [C.PLUG_MODE_COMPONENT],
    C.PI_TYPE: C.PLUG_TYPE_ENTRY_POINT,
    C.PI_PROTOCOLS: [],
    # Both plugins are fetched from ``host.plugins`` by the component constructor.
    C.PI_DEPENDENCIES: ["XEP-0077", "XEP-0106"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "EmailGatewayComponent",
    C.PI_HANDLER: C.BOOL_TRUE,
    C.PI_DESCRIPTION: D_(
        "Gateway to handle email. Usual emails are handled as message, while mailing "
        "lists are converted to pubsub blogs."
    ),
}

# Name of the configuration section associated with this component.
CONF_SECTION = f"component {IMPORT_NAME}"

# Credentials are stored under one key per user: the prefix followed by the bare JID.
PREFIX_KEY_CREDENTIALS = "CREDENTIALS_"
# Template to be completed with ``KEY_CREDENTIALS.format(from_jid=...)``.
KEY_CREDENTIALS = PREFIX_KEY_CREDENTIALS + "{from_jid}"

# Minimal "local@domain.tld" shape check; this is NOT a full email address validation.
email_pattern = re.compile(r"[^@]+@[^@]+\.[^@]+")
80
81
class EmailGatewayComponent:
    """Gateway component forwarding XMPP messages to emails and emails to XMPP.

    Users register their IMAP/SMTP credentials through the registration handlers hooked
    on the XEP-0077 plugin. Incoming XMPP messages addressed to an escaped email
    address (XEP-0106) are sent with SMTP using the sender credentials, and emails
    reported by the IMAP client factory are forwarded as XMPP messages.
    """

    IMPORT_NAME = IMPORT_NAME
    verbose = 0

    def __init__(self, host):
        """Retrieve plugin dependencies and register handlers and triggers.

        @param host: Backend instance, used to access plugins, memory and triggers.
        """
        self.host = host
        self.client: SatXMPPEntity | None = None
        # NOTE(review): the attribute name is misspelled ("initalized"); it is kept
        # as-is because other code may read it.
        self.initalized = False
        self.storage: LazyPersistentBinaryDict | None = None
        self._iq_register = cast(XEP_0077, host.plugins["XEP-0077"])
        self._iq_register.register_handler(
            self._on_registration_form, self._on_registration_submit
        )
        # XEP-0106 plugin, used to (un)escape email addresses in JID local parts.
        self._e = cast(XEP_0106, host.plugins["XEP-0106"])
        # TODO: For the moment, all credentials are kept in cache; we should only keep the
        # X latest.
        self.users_data: dict[jid.JID, UserData] = {}
        host.trigger.add_with_check(
            "message_received", self, self._message_received_trigger, priority=-1000
        )

    async def _init(self) -> None:
        """Initialisation done after profile is connected.

        The gateway disco identity is declared, the persistent storage is created and
        already registered users are connected to their IMAP service.
        """
        assert self.client is not None
        self.client.identities.append(disco.DiscoIdentity("gateway", "smtp", NAME))
        self.storage = LazyPersistentBinaryDict(IMPORT_NAME, self.client.profile)
        await self.connect_registered_users()

    @aio
    async def get_registered_users(self) -> dict[jid.JID, Credentials]:
        """Retrieve credentials for all registered users.

        Stored values whose key starts with the credentials prefix are read for the
        profile of the component; the remainder of each key is parsed as the user JID.

        @return: a mapping from user JID to credentials data.
        """
        assert self.client is not None
        profile_id = self.host.memory.storage.profiles[self.client.profile]
        async with self.host.memory.storage.session() as session:
            query = select(PrivateIndBin).where(
                PrivateIndBin.profile_id == profile_id,
                PrivateIndBin.namespace == IMPORT_NAME,
                PrivateIndBin.key.startswith(PREFIX_KEY_CREDENTIALS),
            )
            result = await session.execute(query)
            return {
                jid.JID(p.key[len(PREFIX_KEY_CREDENTIALS) :]): p.value
                for p in result.scalars()
            }

    async def connect_registered_users(self) -> None:
        """Connect users already registered to the gateway.

        Credentials of every registered user are put in cache. An IMAP connection is
        attempted only for credentials flagged as successful; a failure is logged and
        doesn't prevent the other users from being connected.
        """
        registered_data = await self.get_registered_users()
        for user_jid, credentials in registered_data.items():
            user_data = self.users_data[user_jid] = UserData(credentials=credentials)
            # ``.get`` is used so that a stored entry without the flag is ignored
            # instead of aborting the whole initialisation with a KeyError.
            if not credentials.get("imap_success"):
                log.warning(
                    f"Ignoring unsuccessful IMAP credentials of {user_jid}. This user "
                    "won't receive message from this gateway."
                )
                continue
            try:
                await self.connect_imap(user_jid, user_data)
            except Exception as e:
                log.warning(f"Can't connect {user_jid} to IMAP: {e}.")
            else:
                log.debug(f"Connection to IMAP server successful for {user_jid}.")

    def get_handler(self, __) -> XMPPHandler:
        """Return the XMPP handler associated with this component."""
        return EmailGatewayHandler()

    async def profile_connecting(self, client: SatXMPPEntity) -> None:
        """Store the client session and run the one-time initialisation.

        @param client: Client session of the connecting profile.
        """
        self.client = client
        if not self.initalized:
            await self._init()
            self.initalized = True

    def _message_received_trigger(
        self,
        client: SatXMPPEntity,
        message_elt: domish.Element,
        post_treat: defer.Deferred,
    ) -> bool:
        """Add the gateway workflow on post treatment.

        @param client: Client session which received the message.
        @param message_elt: Received <message/> stanza.
        @param post_treat: Deferred fired with the parsed message data.
        @return: Always True, the trigger never stops the normal workflow itself.
        """
        if client != self.client:
            return True
        post_treat.addCallback(
            lambda mess_data: defer.ensureDeferred(
                self.on_message(client, mess_data, message_elt)
            )
        )
        return True

    async def on_message(
        self, client: SatXMPPEntity, mess_data: MessageData, message_elt: domish.Element
    ) -> dict:
        """Called once message has been parsed.

        The message is sent as an email when it is a "chat" or "normal" message coming
        from a local user and addressed to an escaped email address.

        @param client: Client session.
        @param mess_data: Message data.
        @param message_elt: Original <message/> stanza, used to build error responses.
        @return: Message data.
        @raise exceptions.DataError: The local part of the "to" JID can't be unescaped.
        @raise exceptions.CancelError: The sender is not registered to the gateway.
        """
        if client != self.client:
            return mess_data
        from_jid = mess_data["from"].userhostJID()
        if mess_data["type"] not in ("chat", "normal"):
            log.warning(f"ignoring message with unexpected type: {mess_data}")
            return mess_data
        if not client.is_local(from_jid):
            log.warning(f"ignoring non local message: {mess_data}")
            return mess_data
        if not mess_data["to"].user:
            log.warning(f"ignoring message addressed to gateway itself: {mess_data}")
            return mess_data

        try:
            to_email = self._e.unescape(mess_data["to"].user)
        except ValueError as e:
            raise exceptions.DataError(
                f'Invalid "to" JID, can\'t send message: {message_elt.toXml()}.'
            ) from e

        # Only the first body and the first subject are used, whatever their language.
        try:
            __, body = next(iter(mess_data["message"].items()))
        except (KeyError, StopIteration):
            log.warning(f"No body found: {mess_data}")
            body = ""
        try:
            __, subject = next(iter(mess_data["subject"].items()))
        except (KeyError, StopIteration):
            subject = None

        if not body and not subject:
            log.warning(f"Ignoring empty message: {mess_data}")
            return mess_data

        try:
            await self.send_email(
                from_jid=from_jid,
                to_email=to_email,
                body=body,
                subject=subject,
            )
        except exceptions.UnknownEntityError:
            log.warning(f"Can't send message, user {from_jid} is not registered.")
            message_error_elt = StanzaError(
                "subscription-required",
                text="User need to register to the gateway before sending emails.",
            ).toResponse(message_elt)
            await client.a_send(message_error_elt)

            raise exceptions.CancelError("User not registered.")

        return mess_data

    async def send_email(
        self,
        from_jid: jid.JID,
        to_email: str,
        body: str,
        subject: str | None,
    ) -> None:
        """Send an email using sender credentials.

        Credentials will be retrieved from cache, or from storage if they are not
        cached yet.

        @param from_jid: Bare JID of the sender.
        @param to_email: Email address of the recipient.
        @param body: Body of the email.
        @param subject: Subject of the email.

        @raise exceptions.UnknownEntityError: Credentials for "from_jid" can't be found.
        """
        # We need a bare jid.
        assert self.storage is not None
        assert not from_jid.resource
        try:
            user_data = self.users_data[from_jid]
        except KeyError:
            key = KEY_CREDENTIALS.format(from_jid=from_jid)
            credentials = await self.storage.get(key)
            if credentials is None:
                raise exceptions.UnknownEntityError(
                    f"No credentials found for {from_jid}."
                )
            self.users_data[from_jid] = UserData(credentials)
        else:
            credentials = user_data.credentials

        msg = MIMEText(body, "plain", "UTF-8")
        if subject is not None:
            msg["Subject"] = subject
        msg["From"] = formataddr(
            (credentials["user_name"] or None, credentials["user_email"])
        )
        msg["To"] = to_email

        # Domain part of the sender address.
        sender_domain = credentials["user_email"].split("@", 1)[-1]

        await smtp.sendmail(
            credentials["smtp_host"].encode(),
            credentials["user_email"].encode(),
            [to_email.encode()],
            msg.as_bytes(),
            senderDomainName=sender_domain,
            port=int(credentials["smtp_port"]),
            username=credentials["smtp_username"].encode(),
            password=credentials["smtp_password"].encode(),
            requireAuthentication=True,
            # TODO: only STARTTLS is supported right now, implicit TLS should be supported
            # too.
            requireTransportSecurity=True,
        )

    async def _on_registration_form(
        self, client: SatXMPPEntity, iq_elt: domish.Element
    ) -> tuple[bool, data_form.Form] | None:
        """Build the registration form, pre-filled with stored credentials if any.

        @param client: Client session.
        @param iq_elt: IQ stanza of the registration form request.
        @return: A tuple telling if credentials are already stored for the requester,
            and the form to send.
            None if the callback is not relevant for this request.
        """
        if client != self.client:
            return None
        assert self.storage is not None
        from_jid = jid.JID(iq_elt["from"])
        key = KEY_CREDENTIALS.format(from_jid=from_jid.userhost())
        credentials = await self.storage.get(key) or {}

        form = data_form.Form(formType="form", title="IMAP/SMTP Credentials")

        # Add instructions
        form.instructions = [
            D_(
                "Please provide your IMAP and SMTP credentials to configure the "
                "connection."
            )
        ]

        # One entry per field, in display order:
        # (var, field type, label, description, default value, required)
        field_specs = (
            # Identity fields
            (
                "user_name",
                "text-single",
                "User Name",
                D_('The display name to use in the "From" field of sent emails.'),
                None,
                True,
            ),
            (
                "user_email",
                "text-single",
                "User Email",
                D_('The email address to use in the "From" field of sent emails.'),
                None,
                True,
            ),
            # IMAP credentials
            (
                "imap_host",
                "text-single",
                "IMAP Host",
                D_("IMAP server hostname or IP address"),
                None,
                True,
            ),
            (
                "imap_port",
                "text-single",
                "IMAP Port",
                D_("IMAP server port (default: 993)"),
                "993",
                False,
            ),
            (
                "imap_username",
                "text-single",
                "IMAP Username",
                D_("Username for IMAP authentication"),
                None,
                True,
            ),
            (
                "imap_password",
                "text-private",
                "IMAP Password",
                D_("Password for IMAP authentication"),
                None,
                True,
            ),
            # SMTP credentials
            (
                "smtp_host",
                "text-single",
                "SMTP Host",
                D_("SMTP server hostname or IP address"),
                None,
                True,
            ),
            (
                "smtp_port",
                "text-single",
                "SMTP Port",
                D_("SMTP server port (default: 587)"),
                "587",
                False,
            ),
            (
                "smtp_username",
                "text-single",
                "SMTP Username",
                D_("Username for SMTP authentication"),
                None,
                True,
            ),
            (
                "smtp_password",
                "text-private",
                "SMTP Password",
                D_("Password for SMTP authentication"),
                None,
                True,
            ),
        )
        for var, field_type, label, desc, default, required in field_specs:
            form.addField(
                data_form.Field(
                    fieldType=field_type,
                    var=var,
                    label=label,
                    desc=desc,
                    # Stored value if any, otherwise the default of the field.
                    value=credentials.get(var, default),
                    required=required,
                )
            )

        return bool(credentials), form

    def validate_field(
        self,
        form: data_form.Form,
        key: str,
        field_type: str,
        min_value: int | None = None,
        max_value: int | None = None,
        default: str | int | None = None,
    ) -> None:
        """Validate a single field.

        If the field is missing and a default is given, a field holding the default
        value (as a string) is added to the form.

        @param form: The form containing the fields.
        @param key: The key of the field to validate.
        @param field_type: The expected type of the field value ("int" or "str").
        @param min_value: Optional minimum value for integer fields.
        @param max_value: Optional maximum value for integer fields.
        @param default: Default value to use if the field is missing.
        @raise StanzaError: If the field value is invalid or missing.
        """
        field = form.fields.get(key)
        if field is None:
            if default is None:
                raise StanzaError("bad-request", text=f"{key} is required")
            field = data_form.Field(var=key, value=str(default))
            form.addField(field)

        value = field.value
        if field_type == "int":
            # The converted value is only used for the check, the field is unchanged.
            try:
                value = int(value)
                if (min_value is not None and value < min_value) or (
                    max_value is not None and value > max_value
                ):
                    raise ValueError
            except (ValueError, TypeError):
                raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}")
        elif field_type == "str":
            if not isinstance(value, str):
                raise StanzaError("bad-request", text=f"Invalid value for {key}: {value}")

            # Basic email validation for user_email field
            if key == "user_email":
                # XXX: This is a minimal check. A complete email validation is notoriously
                # difficult.
                # ``fullmatch`` is required: ``match`` only anchors the start of the
                # string and would accept trailing garbage such as "a@b.c@evil".
                if not email_pattern.fullmatch(value):
                    raise StanzaError(
                        "bad-request", text=f"Invalid email address: {value}"
                    )

    def validate_imap_smtp_form(self, submit_form: data_form.Form) -> None:
        """Validate the submitted IMAP/SMTP credentials form.

        Missing port fields are added to the form with their default value.

        @param submit_form: The submitted form containing IMAP/SMTP credentials.
        @raise StanzaError: If any of the values are invalid.
        """
        # Validate identity fields
        self.validate_field(submit_form, "user_name", "str")
        self.validate_field(submit_form, "user_email", "str")

        # Validate IMAP fields
        self.validate_field(submit_form, "imap_host", "str")
        self.validate_field(
            submit_form, "imap_port", "int", min_value=1, max_value=65535, default=993
        )
        self.validate_field(submit_form, "imap_username", "str")
        self.validate_field(submit_form, "imap_password", "str")

        # Validate SMTP fields
        self.validate_field(submit_form, "smtp_host", "str")
        self.validate_field(
            submit_form, "smtp_port", "int", min_value=1, max_value=65535, default=587
        )
        self.validate_field(submit_form, "smtp_username", "str")
        self.validate_field(submit_form, "smtp_password", "str")

    @staticmethod
    def _decode_bytes(data: bytes, charset: str | None) -> str:
        """Decode bytes leniently, falling back to UTF-8 on unknown charset.

        @param data: Raw bytes to decode.
        @param charset: Declared charset, UTF-8 is used if it is empty.
        @return: Decoded text, undecodable bytes are replaced.
        """
        try:
            return data.decode(charset or "utf-8", errors="replace")
        except LookupError:
            # The declared charset is not known by Python.
            return data.decode("utf-8", errors="replace")

    async def on_new_email(self, to_jid: jid.JID, email: EmailMessage) -> None:
        """Called when a new message has been received.

        The email is forwarded as an XMPP message sent from a JID made of the escaped
        sender address and the domain of the gateway.

        @param to_jid: JID of the recipient.
        @param email: Parsed email.
        """
        assert self.client is not None
        __, email_addr = parseaddr(email["from"])
        email_addr = email_addr.lower()
        from_jid = jid.JID(None, (self._e.escape(email_addr), self.client.jid.host, None))

        # Get the email body
        body_mime = email.get_body(("plain",))
        if body_mime is not None:
            body = self._decode_bytes(
                body_mime.get_payload(decode=True), body_mime.get_content_charset()
            )
        else:
            log.warning(f"No body found in email:\n{email}")
            body = ""

        # Decode the subject
        subject = email.get("subject")
        if subject:
            # Encoded parts are decoded leniently, like the body: a badly encoded
            # subject must not prevent the email from being forwarded.
            subject = "".join(
                self._decode_bytes(part, encoding) if isinstance(part, bytes) else part
                for part, encoding in decode_header(subject)
            ).strip()
        else:
            subject = None

        client = self.client.get_virtual_client(from_jid)
        await client.sendMessage(to_jid, {"": body}, {"": subject} if subject else None)

    async def connect_imap(self, from_jid: jid.JID, user_data: UserData) -> None:
        """Connect to IMAP service.

        [self.on_new_email] will be used as callback on new messages.

        @param from_jid: JID of the user associated with given credentials.
        @param user_data: User data holding the email credentials to use.
        """
        credentials = user_data.credentials

        # Deferred handed to the factory, awaited below to wait for the connection.
        connected = defer.Deferred()
        factory = IMAPClientFactory(
            user_data,
            partial(self.on_new_email, from_jid.userhostJID()),
            connected,
        )
        # NOTE(review): a plain TCP connection is requested here; presumably TLS is
        # handled by the factory/protocol — to be confirmed.
        reactor.connectTCP(
            credentials["imap_host"], int(credentials["imap_port"]), factory
        )
        await connected

    async def _on_registration_submit(
        self,
        client: SatXMPPEntity,
        iq_elt: domish.Element,
        submit_form: data_form.Form | None,
    ) -> bool | None:
        """Handle registration submit request.

        Submit form is validated, and credentials are stored. Without submit form, the
        user is unregistered: IMAP session is closed and credentials are deleted.

        @param client: client session.
        @param iq_elt: IQ stanza of the submission request.
        @param submit_form: submit form, None for an unregistration request.
        @return: True if successful.
            None if the callback is not relevant for this request.
        @raise StanzaError: The submitted form is invalid.
        """
        if client != self.client:
            return None
        assert self.storage is not None
        from_jid = jid.JID(iq_elt["from"]).userhostJID()
        key = KEY_CREDENTIALS.format(from_jid=from_jid)

        if submit_form is None:
            # This is an unregistration request.
            user_data = self.users_data.get(from_jid)
            if user_data is not None and user_data.imap_client is not None:
                try:
                    await user_data.imap_client.logout()
                except Exception:
                    log.exception(f"Can't log out {from_jid} from IMAP server.")
            await self.storage.adel(key)
            # The cache entry is removed once credentials are deleted, otherwise
            # [send_email] would keep using credentials of the unregistered user.
            self.users_data.pop(from_jid, None)
            log.info(f"{from_jid} unregistered from this gateway.")
            return True

        self.validate_imap_smtp_form(submit_form)
        credentials = {var: field.value for var, field in submit_form.fields.items()}
        user_data = self.users_data.get(from_jid)
        if user_data is None:
            # The user is not in cache, we cache current credentials.
            user_data = self.users_data[from_jid] = UserData(credentials=credentials)
        else:
            # The user is known, we update credentials.
            # NOTE(review): an IMAP session possibly opened with the previous
            # credentials doesn't seem to be closed before reconnecting — to be
            # confirmed.
            user_data.credentials = credentials
        try:
            await self.connect_imap(from_jid, user_data)
        except Exception:
            log.warning(f"Can't connect to IMAP server for {from_jid}")
            # Credentials are stored anyway, flagged as failing.
            credentials["imap_success"] = False
            await self.storage.aset(key, credentials)
            raise
        else:
            log.debug(f"Connection successful to IMAP server for {from_jid}")
            credentials["imap_success"] = True
            await self.storage.aset(key, credentials)
            return True
612
613
@implementer(iwokkel.IDisco)
class EmailGatewayHandler(XMPPHandler):
    """XMPP handler of the email gateway.

    It is declared as a disco provider, but contributes nothing by itself: both
    methods return an empty list.
    """

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco info elements of this handler: none.

        @param requestor: Entity which sent the request (unused).
        @param target: Entity the request was sent to (unused).
        @param nodeIdentifier: Requested node (unused).
        @return: An empty list.
        """
        return []

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return the disco items of this handler: none.

        @param requestor: Entity which sent the request (unused).
        @param target: Entity the request was sent to (unused).
        @param nodeIdentifier: Requested node (unused).
        @return: An empty list.
        """
        return []