view tests/unit/test_email_gateway.py @ 4340:ea72364131d5 default tip @

doc (components): Update Email Gateway documentation: A section has been added to explain how attachments are handled. fix 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:53:18 +0100
parents 699aa8788d98
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from email.message import EmailMessage
from email.parser import BytesParser
from email.utils import formataddr
from unittest.mock import AsyncMock, MagicMock

import pytest
from pytest_twisted import ensureDeferred as ed
from twisted.words.protocols.jabber import jid
from wokkel import disco

from libervia.backend.plugins.plugin_comp_email_gateway import (
    EmailGatewayComponent,
    SendMailExtra,
)
from libervia.backend.plugins.plugin_comp_email_gateway.models import (
    Credentials,
    UserData,
)
from libervia.backend.plugins.plugin_comp_email_gateway.pubsub_service import (
    EmailGWPubsubService,
    NODE_CONFIG,
    NODE_CONFIG_VALUES,
    NODE_OPTIONS,
)
from libervia.backend.plugins.plugin_xep_0131 import HeadersData, Urgency

@pytest.fixture
def email_gw(host):
    """Return an ``EmailGatewayComponent`` built on ``host`` with test doubles.

    The component's ``storage`` is replaced by a ``MagicMock`` and its
    ``users_data`` starts as an empty dict.
    """
    component = EmailGatewayComponent(host)
    component.storage = MagicMock()
    component.users_data = {}
    return component

class TestEmailGatewayComponent:
    """Tests of ``EmailGatewayComponent``: JID/email mapping and email relaying.

    Every test receives the ``email_gw`` fixture and uses a mocked client whose
    JID is ``gateway.example.org``.
    """

    def test_jid_to_email_gateway_jid(self, email_gw):
        """JID from the gateway is converted to an email address."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        # The local part holds the email address, with "@" escaped as "\40".
        address_jid = jid.JID(r"user\40some-email-domain.example@gateway.example.org")
        credentials = {"user_email": "user@example.org"}
        result = email_gw.jid_to_email(client, address_jid, credentials)
        assert result == "user@some-email-domain.example"

    def test_jid_to_email_non_gateway_jid(self, email_gw):
        """Non-gateway JID is converted to an email address with ``xmpp:``."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        address_jid = jid.JID("external@example.org")
        credentials = {"user_email": "user@example.org"}
        result = email_gw.jid_to_email(client, address_jid, credentials)
        # The JID goes in the display name, the user's own email in the address part.
        assert result == '"xmpp:external@example.org" <user@example.org>'

    def test_email_to_jid_user_email(self, email_gw):
        """User email returns user JID if no "xmpp:" scheme is used."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        email_name = ""
        email_addr = "user@example.org"
        user_email = "user@example.org"
        user_jid = jid.JID("gw-user@example.org")
        result, name = email_gw.email_to_jid(
            client, user_email, user_jid, email_name, email_addr
        )
        assert result == jid.JID("gw-user@example.org")
        assert name is None

    def test_email_to_jid_xmpp_address(self, email_gw):
        """Email address with XMPP in name part is converted to a JID."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        # The display name carries the real XMPP address behind an "xmpp:" prefix.
        email_name = "xmpp:user@example.net"
        email_addr = "user@example.org"
        user_email = "user@example.org"
        user_jid = jid.JID("gw-user@example.org")
        result, name = email_gw.email_to_jid(
            client, user_email, user_jid, email_name, email_addr
        )
        assert result == jid.JID("user@example.net")
        assert name is None

    def test_email_to_jid_regular_email(self, email_gw):
        """Regular email address is converted to a JID with escaped local part."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        email_name = "User Name"
        email_addr = "user@some-email-domain.example"
        user_email = "user@example.org"
        user_jid = jid.JID("gw-user@example.org")
        result, name = email_gw.email_to_jid(
            client, user_email, user_jid, email_name, email_addr
        )
        assert result == jid.JID(r"user\40some-email-domain.example@gateway.example.org")
        assert name == "User Name"

    @ed
    async def test_on_new_email_single_recipient(self, email_gw):
        """Email with a single recipient is correctly processed and sent as a message."""
        client = MagicMock()
        # The virtual client is the same mock, so ``sendMessage`` calls land on it.
        client.get_virtual_client = lambda __: client
        client.jid = jid.JID("gateway.example.org")
        client.sendMessage = AsyncMock()
        email_gw.client = client

        email = EmailMessage()
        email["from"] = formataddr(("User Name", "sender@somewhere.example"))
        email["to"] = "user@example.org"
        email.set_content("Hello, world!")

        to_jid = jid.JID("gw-user@example.org")
        user_email = "user@example.org"
        user_data = UserData(Credentials({"user_email": user_email}))
        await email_gw.on_new_email(user_data, to_jid, email)

        # Positional arguments are checked one by one first, so that a failure
        # points at the faulty argument, then the whole call is verified.
        client.sendMessage.assert_called_once()
        call_args = client.sendMessage.call_args[0]
        assert call_args[0] == to_jid
        assert call_args[1] == {"": "Hello, world!\n"}
        # Identity check: ``== None`` could be satisfied by a custom ``__eq__``.
        assert call_args[2] is None
        client.sendMessage.assert_called_once_with(
            to_jid, {"": "Hello, world!\n"}, None, extra={}
        )

    @ed
    async def test_on_new_email_multiple_recipients(self, email_gw):
        """Email with multiple recipients is correctly processed and sent as a message."""
        client = MagicMock()
        # The virtual client is the same mock, so ``sendMessage`` calls land on it.
        client.get_virtual_client = lambda __: client
        client.jid = jid.JID("gateway.example.org")
        client.sendMessage = AsyncMock()
        email_gw.client = client

        email = EmailMessage()
        email["from"] = formataddr(("User Name", "user@example.org"))
        email["to"] = "user@example.org"
        email["cc"] = "recipient2@example.org"
        email["bcc"] = "recipient3@example.org"
        email.set_content("Hello, world!")

        user_jid = jid.JID("gw-user@example.org")
        user_email = "user@example.org"
        user_data = UserData(Credentials({"user_email": user_email}))
        await email_gw.on_new_email(user_data, user_jid, email)

        # "to" is the user's own email, hence the user JID; "cc" and "bcc" are
        # other addresses, hence escaped JIDs on the gateway domain.
        client.sendMessage.assert_called_once_with(
            user_jid,
            {"": "Hello, world!\n"},
            None,
            extra={
                "addresses": {
                    "to": [{"jid": "gw-user@example.org", "delivered": True}],
                    "cc": [
                        {
                            "jid": "recipient2\\40example.org@gateway.example.org",
                            "delivered": True,
                        }
                    ],
                    "bcc": [
                        {
                            "jid": "recipient3\\40example.org@gateway.example.org",
                            "delivered": True,
                        }
                    ],
                }
            },
        )

    @ed
    async def test_send_email_with_headers(self, email_gw, monkeypatch):
        """Email is sent with correct headers."""
        email_gw.client = MagicMock()
        email_gw.client.jid = jid.JID("gateway.example.org")
        email_gw.storage = MagicMock()
        # Credentials returned by the mocked storage for the sending user.
        email_gw.storage.get = AsyncMock(
            return_value={
                "user_email": "user@example.org",
                "user_name": "Sender Name",
                "smtp_host": "smtp.example.org",
                "smtp_port": "587",
                "smtp_username": "sender",
                "smtp_password": "password",
            }
        )

        from_jid = jid.JID("user@example.org")
        to_email = "recipient@example.com"
        body = "Hello, world!"
        subject = "Test email"
        headers = HeadersData(keywords="important,urgent", urgency=Urgency.high)

        # Mock the smtp.sendmail function, so that nothing is sent over the network.
        sendmail_mock = AsyncMock()
        monkeypatch.setattr("twisted.mail.smtp.sendmail", sendmail_mock)

        await email_gw.send_email(
            from_jid, to_email, body, subject, extra=SendMailExtra(headers=headers)
        )

        sendmail_mock.assert_called_once()

        # Extract the email content from the call arguments: the serialised
        # message is the fourth positional argument.
        call_args = sendmail_mock.call_args[0]
        _, _, _, email_content_bytes = call_args

        # Parse the email content
        parser = BytesParser()
        msg = parser.parsebytes(email_content_bytes)

        # Assert the headers are correctly set
        assert msg["Keywords"] == headers.keywords
        assert msg["Importance"] == "high"

    @ed
    async def test_on_new_email_with_headers(self, email_gw):
        """Headers from the email are correctly processed and included in the message."""
        client = MagicMock()
        # The virtual client is the same mock, so ``sendMessage`` calls land on it.
        client.get_virtual_client = lambda __: client
        client.jid = jid.JID("gateway.example.org")
        client.sendMessage = AsyncMock()
        email_gw.client = client

        email = EmailMessage()
        email["from"] = formataddr(("User Name", "sender@somewhere.example"))
        email["to"] = "user@example.org"
        email.set_content("Hello, world!")
        email["Keywords"] = "test, example"
        email["Importance"] = "high"

        to_jid = jid.JID("gw-user@example.org")
        user_email = "user@example.org"
        user_data = UserData(Credentials({"user_email": user_email}))
        await email_gw.on_new_email(user_data, to_jid, email)

        # "Keywords" and "Importance" email headers end up in ``extra["headers"]``.
        client.sendMessage.assert_called_once_with(
            to_jid,
            {"": "Hello, world!\n"},
            None,
            extra={"headers": {"keywords": "test, example", "urgency": "high"}},
        )


class TestPubsubService:
    """Tests of the pubsub service exposed by the email gateway."""

    @pytest.fixture
    def pubsub_service(self, email_gw, client):
        """Pubsub service built on an email gateway bound to the test client."""
        email_gw.client = client
        return EmailGWPubsubService(email_gw)

    @pytest.fixture
    def pubsub_resource(self, pubsub_service):
        """Resource of the pubsub service under test."""
        return pubsub_service.resource

    def test_getNodes(self, pubsub_resource, client):
        """XEP-0498 well-known node is the only node listed."""
        listing = pubsub_resource.getNodes(client.jid, client.pubsub_service, "test_node")
        assert listing.result == [pubsub_resource._pfs.namespace]

    @ed
    async def test_items(self, pubsub_resource, client, host):
        """One item is returned for each file found in the storage."""

        def file_entry(file_id, name, size):
            # Metadata of a stored file; only id, name and size differ between files.
            return {
                "id": file_id,
                "name": name,
                "media_type": "application",
                "media_subtype": "octet-stream",
                "size": size,
                "hash_algo": "sha-256",
                "file_hash": "0123456789abcdef",
                "created": 123,
            }

        host.memory.get_files = AsyncMock(
            return_value=[file_entry("1", "file1", 123), file_entry("2", "file2", 456)]
        )
        items_request = MagicMock(
            sender=client.jid, nodeIdentifier=pubsub_resource._pfs.namespace
        )
        items, _ = await pubsub_resource.items(items_request)
        assert len(items) == 2

    @ed
    async def test_retract(self, pubsub_resource, client, host):
        """Retracting an item triggers a single file deletion in the storage."""
        host.memory.file_delete = AsyncMock()
        retract_request = MagicMock(
            sender=client.jid,
            nodeIdentifier=pubsub_resource._pfs.namespace,
            itemIdentifiers=["item_1"],
        )
        await pubsub_resource.retract(retract_request)
        host.memory.file_delete.assert_called_once()

    def test_getConfigurationOptions(self, pubsub_resource):
        """Configuration options match ``NODE_OPTIONS``."""
        assert pubsub_resource.getConfigurationOptions() == NODE_OPTIONS

    def test_getConfiguration(self, pubsub_resource, client):
        """Configuration values match ``NODE_CONFIG_VALUES``."""
        configuration = pubsub_resource.getConfiguration(
            client.jid, client.pubsub_service, "test_node"
        )
        assert configuration.result == NODE_CONFIG_VALUES

    def test_getNodeInfo(self, pubsub_resource, client):
        """The well-known node is described as a leaf carrying ``NODE_CONFIG``."""
        node_info = pubsub_resource.getNodeInfo(
            client.jid, client.pubsub_service, pubsub_resource._pfs.namespace
        )
        expected_info = {"type": "leaf", "meta-data": NODE_CONFIG}
        assert node_info == expected_info

    @ed
    async def test_getDiscoInfo(self, pubsub_service, client):
        """Disco information is not empty and advertises at least one feature."""
        disco_infos = await pubsub_service.getDiscoInfo(
            client.jid, client.pubsub_service, ""
        )
        assert len(disco_infos) > 0
        features = [
            entry for entry in disco_infos if isinstance(entry, disco.DiscoFeature)
        ]
        assert features