diff libervia/backend/plugins/plugin_comp_email_gateway/imap.py @ 4303:a7ec325246fb

component email-gateway: first draft: Initial implementation of the Email Gateway. This component uses XEP-0100 for registration. Upon registration and subsequent startups, a connection is made to registered IMAP services, and incoming emails (in `INBOX` mailboxes) are immediately forwarded as XMPP messages. In the opposite direction, an SMTP connection is established to send emails on incoming XMPP messages. rel 449
author Goffi <goffi@goffi.org>
date Fri, 06 Sep 2024 18:07:17 +0200
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_comp_email_gateway/imap.py	Fri Sep 06 18:07:17 2024 +0200
@@ -0,0 +1,260 @@
+#!/usr/bin/env python3
+
+# Libervia Email Gateway Component
+# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from email.message import EmailMessage
+from email.parser import BytesParser, Parser
+from email import policy
+from typing import Callable, cast
+from twisted.internet import defer, protocol, reactor
+from twisted.internet.base import DelayedCall
+from twisted.mail import imap4
+from twisted.python.failure import Failure
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from .models import UserData
+
+log = getLogger(__name__)
+
+
+class IMAPClient(imap4.IMAP4Client):
+    _idling = False
+    _idle_timer: DelayedCall | None = None
+
+    def __init__(self, connected: defer.Deferred, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self._connected = connected
+
+    def serverGreeting(self, caps: dict) -> None:
+        """Handle the server greeting and capabilities.
+
+        @param caps: Server capabilities.
+        """
+        defer.ensureDeferred(self.on_server_greeting(caps))
+
+    async def on_server_greeting(self, caps: dict) -> None:
+        """Async method called when server greeting is received.
+
+        @param caps: Server capabilities.
+        """
+        self.server_capabilities = caps
+        try:
+            await self.authenticate(self.factory.password.encode())
+        except Exception as e:
+            log.warning(f"Can't authenticate: {e}")
+            self._connected.errback(
+                exceptions.PasswordError("Authentication error for IMAP server.")
+            )
+            return
+        log.debug("Authenticated.")
+        self._connected.callback(None)
+        if b"IDLE" in caps:
+            # We use "examine" for read-only access for now, will probably change in the
+            # future.
+            await self.examine(b"INBOX")
+            log.debug("Activating IDLE mode")
+            await self.idle()
+        else:
+            log.warning(
+                f'"IDLE" mode is not supported by your server, this gateways needs a '
+                "server supporting this mode."
+            )
+            return
+
+    async def idle(self) -> None:
+        """Enter the IDLE mode to receive real-time updates from the server."""
+        if self._idling:
+            # We are already in idle state.
+            return
+        self._idling = True
+        self._idle_timer = reactor.callLater(29 * 60, self.on_idle_timeout)
+        await self.sendCommand(
+            imap4.Command(
+                b"IDLE",
+                continuation=lambda *a, **kw: log.debug(f"continuation: {a=} {kw=}"),
+            )
+        )
+
+    def idle_exit(self) -> None:
+        """Exit the IDLE mode."""
+        assert self._idling
+        assert self._idle_timer is not None
+        if not self._idle_timer.called:
+            self._idle_timer.cancel()
+        self._idle_timer = None
+        # Send DONE command to exit IDLE mode.
+        self.sendLine(b"DONE")
+        self._idling = False
+        log.debug("IDLE mode terminated")
+
+    def on_idle_timeout(self):
+        """Called when IDLE mode timeout is reached."""
+        if self._idling:
+            # We've reached 29 min of IDLE mode, we restart it as recommended in the
+            # specifications.
+            self.idle_exit()
+            defer.ensureDeferred(self.idle())
+
+    def newMessages(self, exists: int | None, recent: int | None):
+        """Called when new messages are received.
+
+        @param exists: Number of existing messages.
+        @param recent: Number of recent messages.
+        """
+        defer.ensureDeferred(self.on_new_emails(exists, recent))
+
+    async def on_new_emails(self, exists: int | None, recent: int | None) -> None:
+        """Async method called when new messages are received.
+
+        @param exists: Number of existing messages.
+        @param recent: Number of recent messages.
+        """
+        log.debug(f"New messages: {exists}, Recent messages: {recent}")
+        log.debug("Retrieving last message.")
+        self.idle_exit()
+        mess_data = await self.fetchMessage("*")
+        for message in mess_data.values():
+            try:
+                content = message["RFC822"]
+            except KeyError:
+                log.warning(f"Can't find content for {message}.")
+                continue
+            else:
+                if isinstance(content, str):
+                    parser = Parser(policy=policy.default)
+                    parser_method = parser.parsestr
+                elif isinstance(content, bytes):
+                    parser = BytesParser(policy=policy.default)
+                    parser_method = parser.parsebytes
+                else:
+                    log.error(f"Invalid content: {content}")
+                    continue
+                try:
+                    parsed = parser_method(content)
+                except Exception as e:
+                    log.warning(f"Can't parse content of email: {e}")
+                    continue
+                else:
+                    assert self.factory is not None
+                    factory = cast(IMAPClientFactory, self.factory)
+                    await factory.on_new_email(parsed)
+
+        defer.ensureDeferred(self.idle())
+
+    def connectionLost(self, reason: Failure) -> None:
+        """Called when the connection is lost.
+
+        @param reason: The reason for the lost connection.
+        """
+        log.debug(f"connectionLost {reason=}")
+        if not self._connected.called:
+            self._connected.errback(reason)
+        super().connectionLost(reason)
+
+    def lineReceived(self, line: bytes) -> None:
+        """Called when a line is received from the server.
+
+        @param line: The received line.
+        """
+        if self._idling:
+            if line == b"* OK Still here":
+                pass
+            elif line == b"+ idling":
+                pass
+            elif line.startswith(b"* "):
+                # Handle unsolicited responses during IDLE
+                self._extraInfo([imap4.parseNestedParens(line[2:])])
+            else:
+                log.warning(f"Unexpected line received: {line!r}")
+
+            return
+
+        return super().lineReceived(line)
+
+    def sendCommand(self, cmd: imap4.Command) -> defer.Deferred:
+        """Send a command to the server.
+
+        This method is overriden to stop and restart IDLE mode when a command is received.
+
+        @param cmd: The command to send.
+        @return: A deferred that fires when the command is sent.
+        """
+        if self._idling and cmd.command != b"IDLE":
+            self.idle_exit()
+            d = super().sendCommand(cmd)
+
+            def restart_idle_mode(ret):
+                defer.ensureDeferred(self.idle())
+                return ret
+
+            d.addCallback(restart_idle_mode)
+            return d
+        else:
+            return super().sendCommand(cmd)
+
+
+class IMAPClientFactory(protocol.ClientFactory):
+    protocol = IMAPClient
+
+    def __init__(
+        self,
+        user_data: UserData,
+        on_new_email: Callable[[EmailMessage], None],
+        connected: defer.Deferred,
+    ) -> None:
+        """Initialize the IMAP client factory.
+
+        @param username: The username to use for authentication.
+        @param password: The password to use for authentication.
+        """
+        credentials = user_data.credentials
+        self.user_data = user_data
+        self.username = credentials["imap_username"]
+        self.password = credentials["imap_password"]
+        self.on_new_email = on_new_email
+        self._connected = connected
+
+    def buildProtocol(self, addr) -> IMAPClient:
+        """Build the IMAP client protocol.
+
+        @return: The IMAP client protocol.
+        """
+        assert self.protocol is not None
+        assert isinstance(self.protocol, type(IMAPClient))
+        protocol_ = self.protocol(self._connected)
+        protocol_.factory = self
+        self.user_data.imap_client = protocol_
+        assert isinstance(protocol_, IMAPClient)
+        protocol_.factory = self
+        encoded_username = self.username.encode()
+
+        protocol_.registerAuthenticator(imap4.PLAINAuthenticator(encoded_username))
+        protocol_.registerAuthenticator(imap4.LOGINAuthenticator(encoded_username))
+        protocol_.registerAuthenticator(
+            imap4.CramMD5ClientAuthenticator(encoded_username)
+        )
+
+        return protocol_
+
+    def clientConnectionFailed(self, connector, reason: Failure) -> None:
+        """Called when the client connection fails.
+
+        @param reason: The reason for the failure.
+        """
+        log.warning(f"Connection failed: {reason}")