Mercurial > libervia-backend
view libervia/backend/plugins/plugin_comp_email_gateway/imap.py @ 4339:699aa8788d98
tests (unit/email gateway): add tests for pubsub service:
rel 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:52:06 +0100 |
parents | a7ec325246fb |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia Email Gateway Component
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from email.message import EmailMessage
from email.parser import BytesParser, Parser
from email import policy
from typing import Awaitable, Callable, cast

from twisted.internet import defer, protocol, reactor
from twisted.internet.base import DelayedCall
from twisted.mail import imap4
from twisted.python.failure import Failure

from libervia.backend.core import exceptions
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger

from .models import UserData


log = getLogger(__name__)


class IMAPClient(imap4.IMAP4Client):
    """IMAP4 client which authenticates, examines INBOX and waits in IDLE mode.

    Once the server greeting is received, the client authenticates with the
    password held by its factory, opens ``INBOX`` read-only and enters IDLE mode.
    When new messages are announced, the last message is fetched, parsed and
    handed to the factory's ``on_new_email`` callback.
    """

    # Delay in seconds after which IDLE mode is left and re-entered.
    IDLE_REFRESH_DELAY: int = 29 * 60

    # True from the moment IDLE is requested until DONE is sent.
    _idling = False
    # Pending call to ``on_idle_timeout``, or None when no IDLE is in progress.
    _idle_timer: DelayedCall | None = None

    def __init__(self, connected: defer.Deferred, *args, **kwargs) -> None:
        """Initialize the client.

        @param connected: Deferred fired with None once authenticated, or errbacked
            on authentication failure or if the connection is lost before that.
        """
        super().__init__(*args, **kwargs)
        self._connected = connected

    def serverGreeting(self, caps: dict) -> None:
        """Handle the server greeting and capabilities.

        @param caps: Server capabilities.
        """
        defer.ensureDeferred(self.on_server_greeting(caps))

    async def on_server_greeting(self, caps: dict) -> None:
        """Async method called when server greeting is received.

        Authenticate, then enter IDLE mode on INBOX if the server supports it.

        @param caps: Server capabilities.
        """
        self.server_capabilities = caps
        try:
            await self.authenticate(self.factory.password.encode())
        except Exception as e:
            log.warning(f"Can't authenticate: {e}")
            self._connected.errback(
                exceptions.PasswordError("Authentication error for IMAP server.")
            )
            return
        log.debug("Authenticated.")
        self._connected.callback(None)

        if b"IDLE" not in caps:
            log.warning(
                '"IDLE" mode is not supported by your server, this gateways needs a '
                "server supporting this mode."
            )
            return

        # We use "examine" for read-only access for now, will probably change in the
        # future.
        await self.examine(b"INBOX")
        log.debug("Activating IDLE mode")
        await self.idle()

    async def idle(self) -> None:
        """Enter the IDLE mode to receive real-time updates from the server.

        Do nothing if IDLE mode has already been requested.
        """
        if self._idling:
            # We are already in idle state.
            return
        self._idling = True
        self._idle_timer = reactor.callLater(
            self.IDLE_REFRESH_DELAY, self.on_idle_timeout
        )
        # NOTE(review): per RFC 2177 the tagged completion of IDLE is only sent after
        # DONE, so this await presumably returns once IDLE mode has been left.
        await self.sendCommand(
            imap4.Command(
                b"IDLE",
                continuation=lambda *a, **kw: log.debug(f"continuation: {a=} {kw=}"),
            )
        )

    def idle_exit(self) -> None:
        """Exit the IDLE mode.

        Must only be called while IDLE mode is active.
        """
        assert self._idling
        assert self._idle_timer is not None
        if not self._idle_timer.called:
            self._idle_timer.cancel()
        self._idle_timer = None
        # Send DONE command to exit IDLE mode.
        self.sendLine(b"DONE")
        self._idling = False
        log.debug("IDLE mode terminated")

    def on_idle_timeout(self):
        """Called when IDLE mode timeout is reached."""
        if self._idling:
            # We've reached 29 min of IDLE mode, we restart it as recommended in the
            # specifications.
            self.idle_exit()
            # The idle flag is set again right away, before the tagged completion of
            # the previous IDLE is received: ``lineReceived`` forwards that completion
            # to the base class so that the new IDLE command can actually be sent.
            defer.ensureDeferred(self.idle())

    def newMessages(self, exists: int | None, recent: int | None):
        """Called when new messages are received.

        @param exists: Number of existing messages.
        @param recent: Number of recent messages.
        """
        defer.ensureDeferred(self.on_new_emails(exists, recent))

    async def on_new_emails(self, exists: int | None, recent: int | None) -> None:
        """Async method called when new messages are received.

        Leave IDLE mode, fetch the last message, parse it and give it to the
        factory's ``on_new_email`` callback, then request IDLE mode again.

        @param exists: Number of existing messages.
        @param recent: Number of recent messages.
        """
        log.debug(f"New messages: {exists}, Recent messages: {recent}")
        log.debug("Retrieving last message.")
        # NOTE(review): ``idle_exit`` asserts that IDLE mode is active; confirm that
        # this coroutine can only be triggered while idling.
        self.idle_exit()
        mess_data = await self.fetchMessage("*")
        for message in mess_data.values():
            try:
                content = message["RFC822"]
            except KeyError:
                log.warning(f"Can't find content for {message}.")
                continue

            # Select the parser matching the type of the raw content.
            if isinstance(content, str):
                parser_method = Parser(policy=policy.default).parsestr
            elif isinstance(content, bytes):
                parser_method = BytesParser(policy=policy.default).parsebytes
            else:
                log.error(f"Invalid content: {content}")
                continue

            try:
                parsed = parser_method(content)
            except Exception as e:
                log.warning(f"Can't parse content of email: {e}")
                continue

            assert self.factory is not None
            factory = cast(IMAPClientFactory, self.factory)
            await factory.on_new_email(parsed)

        defer.ensureDeferred(self.idle())

    def connectionLost(self, reason: Failure) -> None:
        """Called when the connection is lost.

        Cancel the pending IDLE refresh, and report the failure through the
        ``connected`` deferred if it has not fired yet.

        @param reason: The reason for the lost connection.
        """
        log.debug(f"connectionLost {reason=}")
        # Without this the refresh timer would stay scheduled and later try to send
        # "DONE" on a closed connection.
        timer, self._idle_timer = self._idle_timer, None
        if timer is not None and timer.active():
            timer.cancel()
        self._idling = False
        if not self._connected.called:
            self._connected.errback(reason)
        super().connectionLost(reason)

    def lineReceived(self, line: bytes) -> None:
        """Called when a line is received from the server.

        While IDLE mode is requested, lines are handled here instead of by the base
        class, except for tagged responses to known commands.

        @param line: The received line.
        """
        if not self._idling:
            return super().lineReceived(line)

        if line == b"* OK Still here" or line.startswith(b"+"):
            # Server keep-alive, or continuation request acknowledging IDLE (the
            # wording after "+" is server specific).
            return

        if line.startswith(b"* "):
            # Handle unsolicited responses during IDLE
            self._extraInfo([imap4.parseNestedParens(line[2:])])
            return

        parts = line.split(None, 1)
        if parts and parts[0] in self.tags:
            # Tagged response to a command still pending, e.g. the completion of a
            # previous IDLE when IDLE mode has been re-requested right after "DONE".
            # The base class must see it, otherwise the command never completes and
            # queued commands are never sent.
            super().lineReceived(line)
            return

        log.warning(f"Unexpected line received: {line!r}")

    def sendCommand(self, cmd: imap4.Command) -> defer.Deferred:
        """Send a command to the server.

        This method is overridden to stop IDLE mode before sending any command other
        than IDLE itself, and to restart IDLE mode once that command has succeeded.

        @param cmd: The command to send.
        @return: The deferred returned by the base class for this command.
        """
        if not self._idling or cmd.command == b"IDLE":
            return super().sendCommand(cmd)

        self.idle_exit()
        d = super().sendCommand(cmd)

        def restart_idle_mode(ret):
            defer.ensureDeferred(self.idle())
            return ret

        # NOTE(review): IDLE mode is only restarted on success; confirm this is the
        # wanted behaviour when the command fails.
        d.addCallback(restart_idle_mode)
        return d


class IMAPClientFactory(protocol.ClientFactory):
    """Factory building authenticated ``IMAPClient`` protocols for one user."""

    protocol = IMAPClient

    def __init__(
        self,
        user_data: UserData,
        on_new_email: Callable[[EmailMessage], Awaitable[None]],
        connected: defer.Deferred,
    ) -> None:
        """Initialize the IMAP client factory.

        @param user_data: Data of the user; its ``credentials`` must contain the
            "imap_username" and "imap_password" keys.
        @param on_new_email: Callback awaited with each parsed email.
        @param connected: Deferred handed to the protocol, which fires it once
            authenticated, or errbacks it on authentication failure or if the
            connection is lost before that.
        """
        credentials = user_data.credentials
        self.user_data = user_data
        self.username = credentials["imap_username"]
        self.password = credentials["imap_password"]
        self.on_new_email = on_new_email
        self._connected = connected

    def buildProtocol(self, addr) -> IMAPClient:
        """Build the IMAP client protocol.

        The protocol is stored in ``user_data.imap_client`` and gets PLAIN, LOGIN and
        CRAM-MD5 authenticators registered for the user.

        @param addr: Address of the connection (unused).
        @return: The IMAP client protocol.
        """
        assert self.protocol is not None
        assert isinstance(self.protocol, type(IMAPClient))
        protocol_ = self.protocol(self._connected)
        assert isinstance(protocol_, IMAPClient)
        protocol_.factory = self
        self.user_data.imap_client = protocol_
        encoded_username = self.username.encode()
        protocol_.registerAuthenticator(imap4.PLAINAuthenticator(encoded_username))
        protocol_.registerAuthenticator(imap4.LOGINAuthenticator(encoded_username))
        protocol_.registerAuthenticator(
            imap4.CramMD5ClientAuthenticator(encoded_username)
        )
        return protocol_

    def clientConnectionFailed(self, connector, reason: Failure) -> None:
        """Called when the client connection fails.

        @param connector: The connector which failed (unused).
        @param reason: The reason for the failure.
        """
        # NOTE(review): the ``connected`` deferred is not fired here; confirm that
        # callers are notified of the failure by another way.
        log.warning(f"Connection failed: {reason}")