view libervia/backend/plugins/plugin_comp_email_gateway/imap.py @ 4309:b56b1eae7994

component email gateway: add multicasting: XEP-0033 multicasting is now supported both for incoming and outgoing messages. XEP-0033 metadata are converted to suitable Email headers and vice versa. Email address and JID are both supported, and delivery is done by the gateway when suitable on incoming messages. rel 450
author Goffi <goffi@goffi.org>
date Thu, 26 Sep 2024 16:12:01 +0200
parents a7ec325246fb
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia Email Gateway Component
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from email.message import EmailMessage
from email.parser import BytesParser, Parser
from email import policy
from typing import Callable, cast
from twisted.internet import defer, protocol, reactor
from twisted.internet.base import DelayedCall
from twisted.mail import imap4
from twisted.python.failure import Failure

from libervia.backend.core import exceptions
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from .models import UserData

log = getLogger(__name__)


class IMAPClient(imap4.IMAP4Client):
    _idling = False
    _idle_timer: DelayedCall | None = None

    def __init__(self, connected: defer.Deferred, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._connected = connected

    def serverGreeting(self, caps: dict) -> None:
        """Handle the server greeting and capabilities.

        @param caps: Server capabilities.
        """
        defer.ensureDeferred(self.on_server_greeting(caps))

    async def on_server_greeting(self, caps: dict) -> None:
        """Async method called when server greeting is received.

        @param caps: Server capabilities.
        """
        self.server_capabilities = caps
        try:
            await self.authenticate(self.factory.password.encode())
        except Exception as e:
            log.warning(f"Can't authenticate: {e}")
            self._connected.errback(
                exceptions.PasswordError("Authentication error for IMAP server.")
            )
            return
        log.debug("Authenticated.")
        self._connected.callback(None)
        if b"IDLE" in caps:
            # We use "examine" for read-only access for now, will probably change in the
            # future.
            await self.examine(b"INBOX")
            log.debug("Activating IDLE mode")
            await self.idle()
        else:
            log.warning(
                f'"IDLE" mode is not supported by your server, this gateways needs a '
                "server supporting this mode."
            )
            return

    async def idle(self) -> None:
        """Enter the IDLE mode to receive real-time updates from the server."""
        if self._idling:
            # We are already in idle state.
            return
        self._idling = True
        self._idle_timer = reactor.callLater(29 * 60, self.on_idle_timeout)
        await self.sendCommand(
            imap4.Command(
                b"IDLE",
                continuation=lambda *a, **kw: log.debug(f"continuation: {a=} {kw=}"),
            )
        )

    def idle_exit(self) -> None:
        """Exit the IDLE mode."""
        assert self._idling
        assert self._idle_timer is not None
        if not self._idle_timer.called:
            self._idle_timer.cancel()
        self._idle_timer = None
        # Send DONE command to exit IDLE mode.
        self.sendLine(b"DONE")
        self._idling = False
        log.debug("IDLE mode terminated")

    def on_idle_timeout(self):
        """Called when IDLE mode timeout is reached."""
        if self._idling:
            # We've reached 29 min of IDLE mode, we restart it as recommended in the
            # specifications.
            self.idle_exit()
            defer.ensureDeferred(self.idle())

    def newMessages(self, exists: int | None, recent: int | None):
        """Called when new messages are received.

        @param exists: Number of existing messages.
        @param recent: Number of recent messages.
        """
        defer.ensureDeferred(self.on_new_emails(exists, recent))

    async def on_new_emails(self, exists: int | None, recent: int | None) -> None:
        """Async method called when new messages are received.

        @param exists: Number of existing messages.
        @param recent: Number of recent messages.
        """
        log.debug(f"New messages: {exists}, Recent messages: {recent}")
        log.debug("Retrieving last message.")
        self.idle_exit()
        mess_data = await self.fetchMessage("*")
        for message in mess_data.values():
            try:
                content = message["RFC822"]
            except KeyError:
                log.warning(f"Can't find content for {message}.")
                continue
            else:
                if isinstance(content, str):
                    parser = Parser(policy=policy.default)
                    parser_method = parser.parsestr
                elif isinstance(content, bytes):
                    parser = BytesParser(policy=policy.default)
                    parser_method = parser.parsebytes
                else:
                    log.error(f"Invalid content: {content}")
                    continue
                try:
                    parsed = parser_method(content)
                except Exception as e:
                    log.warning(f"Can't parse content of email: {e}")
                    continue
                else:
                    assert self.factory is not None
                    factory = cast(IMAPClientFactory, self.factory)
                    await factory.on_new_email(parsed)

        defer.ensureDeferred(self.idle())

    def connectionLost(self, reason: Failure) -> None:
        """Called when the connection is lost.

        @param reason: The reason for the lost connection.
        """
        log.debug(f"connectionLost {reason=}")
        if not self._connected.called:
            self._connected.errback(reason)
        super().connectionLost(reason)

    def lineReceived(self, line: bytes) -> None:
        """Called when a line is received from the server.

        @param line: The received line.
        """
        if self._idling:
            if line == b"* OK Still here":
                pass
            elif line == b"+ idling":
                pass
            elif line.startswith(b"* "):
                # Handle unsolicited responses during IDLE
                self._extraInfo([imap4.parseNestedParens(line[2:])])
            else:
                log.warning(f"Unexpected line received: {line!r}")

            return

        return super().lineReceived(line)

    def sendCommand(self, cmd: imap4.Command) -> defer.Deferred:
        """Send a command to the server.

        This method is overriden to stop and restart IDLE mode when a command is received.

        @param cmd: The command to send.
        @return: A deferred that fires when the command is sent.
        """
        if self._idling and cmd.command != b"IDLE":
            self.idle_exit()
            d = super().sendCommand(cmd)

            def restart_idle_mode(ret):
                defer.ensureDeferred(self.idle())
                return ret

            d.addCallback(restart_idle_mode)
            return d
        else:
            return super().sendCommand(cmd)


class IMAPClientFactory(protocol.ClientFactory):
    protocol = IMAPClient

    def __init__(
        self,
        user_data: UserData,
        on_new_email: Callable[[EmailMessage], None],
        connected: defer.Deferred,
    ) -> None:
        """Initialize the IMAP client factory.

        @param username: The username to use for authentication.
        @param password: The password to use for authentication.
        """
        credentials = user_data.credentials
        self.user_data = user_data
        self.username = credentials["imap_username"]
        self.password = credentials["imap_password"]
        self.on_new_email = on_new_email
        self._connected = connected

    def buildProtocol(self, addr) -> IMAPClient:
        """Build the IMAP client protocol.

        @return: The IMAP client protocol.
        """
        assert self.protocol is not None
        assert isinstance(self.protocol, type(IMAPClient))
        protocol_ = self.protocol(self._connected)
        protocol_.factory = self
        self.user_data.imap_client = protocol_
        assert isinstance(protocol_, IMAPClient)
        protocol_.factory = self
        encoded_username = self.username.encode()

        protocol_.registerAuthenticator(imap4.PLAINAuthenticator(encoded_username))
        protocol_.registerAuthenticator(imap4.LOGINAuthenticator(encoded_username))
        protocol_.registerAuthenticator(
            imap4.CramMD5ClientAuthenticator(encoded_username)
        )

        return protocol_

    def clientConnectionFailed(self, connector, reason: Failure) -> None:
        """Called when the client connection fails.

        @param reason: The reason for the failure.
        """
        log.warning(f"Connection failed: {reason}")