Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_comp_email_gateway/imap.py @ 4303:a7ec325246fb
component email-gateway: first draft:
Initial implementation of the Email Gateway.
This component uses XEP-0100 for registration. Upon registration and subsequent startups,
a connection is made to registered IMAP services, and incoming emails (in `INBOX`
mailboxes) are immediately forwarded as XMPP messages.
In the opposite direction, an SMTP connection is established to send emails on incoming
XMPP messages.
rel 449
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 06 Sep 2024 18:07:17 +0200 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_comp_email_gateway/imap.py Fri Sep 06 18:07:17 2024 +0200 @@ -0,0 +1,260 @@ +#!/usr/bin/env python3 + +# Libervia Email Gateway Component +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from email.message import EmailMessage +from email.parser import BytesParser, Parser +from email import policy +from typing import Callable, cast +from twisted.internet import defer, protocol, reactor +from twisted.internet.base import DelayedCall +from twisted.mail import imap4 +from twisted.python.failure import Failure + +from libervia.backend.core import exceptions +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from .models import UserData + +log = getLogger(__name__) + + +class IMAPClient(imap4.IMAP4Client): + _idling = False + _idle_timer: DelayedCall | None = None + + def __init__(self, connected: defer.Deferred, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self._connected = connected + + def serverGreeting(self, caps: dict) -> None: + """Handle the server greeting and capabilities. + + @param caps: Server capabilities. + """ + defer.ensureDeferred(self.on_server_greeting(caps)) + + async def on_server_greeting(self, caps: dict) -> None: + """Async method called when server greeting is received. + + @param caps: Server capabilities. + """ + self.server_capabilities = caps + try: + await self.authenticate(self.factory.password.encode()) + except Exception as e: + log.warning(f"Can't authenticate: {e}") + self._connected.errback( + exceptions.PasswordError("Authentication error for IMAP server.") + ) + return + log.debug("Authenticated.") + self._connected.callback(None) + if b"IDLE" in caps: + # We use "examine" for read-only access for now, will probably change in the + # future. + await self.examine(b"INBOX") + log.debug("Activating IDLE mode") + await self.idle() + else: + log.warning( + f'"IDLE" mode is not supported by your server, this gateways needs a ' + "server supporting this mode." + ) + return + + async def idle(self) -> None: + """Enter the IDLE mode to receive real-time updates from the server.""" + if self._idling: + # We are already in idle state. + return + self._idling = True + self._idle_timer = reactor.callLater(29 * 60, self.on_idle_timeout) + await self.sendCommand( + imap4.Command( + b"IDLE", + continuation=lambda *a, **kw: log.debug(f"continuation: {a=} {kw=}"), + ) + ) + + def idle_exit(self) -> None: + """Exit the IDLE mode.""" + assert self._idling + assert self._idle_timer is not None + if not self._idle_timer.called: + self._idle_timer.cancel() + self._idle_timer = None + # Send DONE command to exit IDLE mode. + self.sendLine(b"DONE") + self._idling = False + log.debug("IDLE mode terminated") + + def on_idle_timeout(self): + """Called when IDLE mode timeout is reached.""" + if self._idling: + # We've reached 29 min of IDLE mode, we restart it as recommended in the + # specifications. + self.idle_exit() + defer.ensureDeferred(self.idle()) + + def newMessages(self, exists: int | None, recent: int | None): + """Called when new messages are received. + + @param exists: Number of existing messages. + @param recent: Number of recent messages. + """ + defer.ensureDeferred(self.on_new_emails(exists, recent)) + + async def on_new_emails(self, exists: int | None, recent: int | None) -> None: + """Async method called when new messages are received. + + @param exists: Number of existing messages. + @param recent: Number of recent messages. + """ + log.debug(f"New messages: {exists}, Recent messages: {recent}") + log.debug("Retrieving last message.") + self.idle_exit() + mess_data = await self.fetchMessage("*") + for message in mess_data.values(): + try: + content = message["RFC822"] + except KeyError: + log.warning(f"Can't find content for {message}.") + continue + else: + if isinstance(content, str): + parser = Parser(policy=policy.default) + parser_method = parser.parsestr + elif isinstance(content, bytes): + parser = BytesParser(policy=policy.default) + parser_method = parser.parsebytes + else: + log.error(f"Invalid content: {content}") + continue + try: + parsed = parser_method(content) + except Exception as e: + log.warning(f"Can't parse content of email: {e}") + continue + else: + assert self.factory is not None + factory = cast(IMAPClientFactory, self.factory) + await factory.on_new_email(parsed) + + defer.ensureDeferred(self.idle()) + + def connectionLost(self, reason: Failure) -> None: + """Called when the connection is lost. + + @param reason: The reason for the lost connection. + """ + log.debug(f"connectionLost {reason=}") + if not self._connected.called: + self._connected.errback(reason) + super().connectionLost(reason) + + def lineReceived(self, line: bytes) -> None: + """Called when a line is received from the server. + + @param line: The received line. + """ + if self._idling: + if line == b"* OK Still here": + pass + elif line == b"+ idling": + pass + elif line.startswith(b"* "): + # Handle unsolicited responses during IDLE + self._extraInfo([imap4.parseNestedParens(line[2:])]) + else: + log.warning(f"Unexpected line received: {line!r}") + + return + + return super().lineReceived(line) + + def sendCommand(self, cmd: imap4.Command) -> defer.Deferred: + """Send a command to the server. + + This method is overriden to stop and restart IDLE mode when a command is received. + + @param cmd: The command to send. + @return: A deferred that fires when the command is sent. + """ + if self._idling and cmd.command != b"IDLE": + self.idle_exit() + d = super().sendCommand(cmd) + + def restart_idle_mode(ret): + defer.ensureDeferred(self.idle()) + return ret + + d.addCallback(restart_idle_mode) + return d + else: + return super().sendCommand(cmd) + + +class IMAPClientFactory(protocol.ClientFactory): + protocol = IMAPClient + + def __init__( + self, + user_data: UserData, + on_new_email: Callable[[EmailMessage], None], + connected: defer.Deferred, + ) -> None: + """Initialize the IMAP client factory. + + @param username: The username to use for authentication. + @param password: The password to use for authentication. + """ + credentials = user_data.credentials + self.user_data = user_data + self.username = credentials["imap_username"] + self.password = credentials["imap_password"] + self.on_new_email = on_new_email + self._connected = connected + + def buildProtocol(self, addr) -> IMAPClient: + """Build the IMAP client protocol. + + @return: The IMAP client protocol. + """ + assert self.protocol is not None + assert isinstance(self.protocol, type(IMAPClient)) + protocol_ = self.protocol(self._connected) + protocol_.factory = self + self.user_data.imap_client = protocol_ + assert isinstance(protocol_, IMAPClient) + protocol_.factory = self + encoded_username = self.username.encode() + + protocol_.registerAuthenticator(imap4.PLAINAuthenticator(encoded_username)) + protocol_.registerAuthenticator(imap4.LOGINAuthenticator(encoded_username)) + protocol_.registerAuthenticator( + imap4.CramMD5ClientAuthenticator(encoded_username) + ) + + return protocol_ + + def clientConnectionFailed(self, connector, reason: Failure) -> None: + """Called when the client connection fails. + + @param reason: The reason for the failure. + """ + log.warning(f"Connection failed: {reason}")