view tests/unit/test_email_gateway.py @ 4351:6a0a081485b8

plugin autocrypt: Autocrypt protocol implementation: Implementation of autocrypt: the `autocrypt` header is checked, and if it is present and no public key is known for the peer, the key is imported. The `autocrypt` header is also added to outgoing messages (only if an email gateway is detected). For the moment, the JID is used as identifier, but the real email used by the gateway should be used in the future. rel 456
author Goffi <goffi@goffi.org>
date Fri, 28 Feb 2025 09:23:35 +0100
parents 699aa8788d98
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from email.message import EmailMessage
from email.parser import BytesParser
from email.utils import formataddr
from unittest.mock import AsyncMock, MagicMock

import pytest
from pytest_twisted import ensureDeferred as ed
from twisted.words.protocols.jabber import jid
from wokkel import disco

from libervia.backend.plugins.plugin_comp_email_gateway import (
    EmailGatewayComponent,
    SendMailExtra,
)
from libervia.backend.plugins.plugin_comp_email_gateway.models import (
    Credentials,
    UserData,
)
from libervia.backend.plugins.plugin_comp_email_gateway.pubsub_service import (
    EmailGWPubsubService,
    NODE_CONFIG,
    NODE_CONFIG_VALUES,
    NODE_OPTIONS,
)
from libervia.backend.plugins.plugin_xep_0131 import HeadersData, Urgency

@pytest.fixture
def email_gw(host):
    """Provide an ``EmailGatewayComponent`` with mocked storage and no users data."""
    gateway = EmailGatewayComponent(host)
    # Replace persistence with a mock and start from an empty users mapping.
    gateway.storage = MagicMock()
    gateway.users_data = {}
    return gateway

class TestEmailGatewayComponent:
    """Tests for JID/email conversion and email <-> message bridging."""

    @staticmethod
    def _gateway_client():
        """Return a mocked client whose JID is ``gateway.example.org``."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        return client

    @classmethod
    def _attach_sending_client(cls, email_gw):
        """Set a mocked client on ``email_gw`` and return it.

        The client is returned by its own ``get_virtual_client`` and its
        ``sendMessage`` is an ``AsyncMock`` so that calls can be inspected.
        """
        client = cls._gateway_client()
        client.get_virtual_client = lambda __: client
        client.sendMessage = AsyncMock()
        email_gw.client = client
        return client

    def test_jid_to_email_gateway_jid(self, email_gw):
        """JID from the gateway is converted to an email address."""
        client = self._gateway_client()
        address_jid = jid.JID(r"user\40some-email-domain.example@gateway.example.org")
        credentials = {"user_email": "user@example.org"}
        result = email_gw.jid_to_email(client, address_jid, credentials)
        assert result == "user@some-email-domain.example"

    def test_jid_to_email_non_gateway_jid(self, email_gw):
        """Non-gateway JID is converted to an email address with ``xmpp:``."""
        client = self._gateway_client()
        address_jid = jid.JID("external@example.org")
        credentials = {"user_email": "user@example.org"}
        result = email_gw.jid_to_email(client, address_jid, credentials)
        assert result == '"xmpp:external@example.org" <user@example.org>'

    def test_email_to_jid_user_email(self, email_gw):
        """User email returns user JID if no "xmpp:" scheme is used."""
        client = self._gateway_client()
        email_name = ""
        email_addr = "user@example.org"
        user_email = "user@example.org"
        user_jid = jid.JID("gw-user@example.org")
        result, name = email_gw.email_to_jid(
            client, user_email, user_jid, email_name, email_addr
        )
        assert result == jid.JID("gw-user@example.org")
        assert name is None

    def test_email_to_jid_xmpp_address(self, email_gw):
        """Email address with XMPP in name part is converted to a JID."""
        client = self._gateway_client()
        email_name = "xmpp:user@example.net"
        email_addr = "user@example.org"
        user_email = "user@example.org"
        user_jid = jid.JID("gw-user@example.org")
        result, name = email_gw.email_to_jid(
            client, user_email, user_jid, email_name, email_addr
        )
        assert result == jid.JID("user@example.net")
        assert name is None

    def test_email_to_jid_regular_email(self, email_gw):
        """Regular email address is converted to a JID with escaped local part."""
        client = self._gateway_client()
        email_name = "User Name"
        email_addr = "user@some-email-domain.example"
        user_email = "user@example.org"
        user_jid = jid.JID("gw-user@example.org")
        result, name = email_gw.email_to_jid(
            client, user_email, user_jid, email_name, email_addr
        )
        assert result == jid.JID(r"user\40some-email-domain.example@gateway.example.org")
        assert name == "User Name"

    @ed
    async def test_on_new_email_single_recipient(self, email_gw):
        """Email with a single recipient is correctly processed and sent as a message."""
        client = self._attach_sending_client(email_gw)

        email = EmailMessage()
        email["from"] = formataddr(("User Name", "sender@somewhere.example"))
        email["to"] = "user@example.org"
        email.set_content("Hello, world!")

        to_jid = jid.JID("gw-user@example.org")
        user_email = "user@example.org"
        user_data = UserData(Credentials({"user_email": user_email}))
        await email_gw.on_new_email(user_data, to_jid, email)

        # Check positional arguments one by one first, so that a failure points
        # at the exact argument, then check the whole call including ``extra``.
        client.sendMessage.assert_called_once()
        sent_args = client.sendMessage.call_args.args
        assert sent_args[0] == to_jid
        assert sent_args[1] == {"": "Hello, world!\n"}
        assert sent_args[2] is None
        client.sendMessage.assert_called_once_with(
            to_jid, {"": "Hello, world!\n"}, None, extra={}
        )

    @ed
    async def test_on_new_email_multiple_recipients(self, email_gw):
        """Email with multiple recipients is correctly processed and sent."""
        client = self._attach_sending_client(email_gw)

        email = EmailMessage()
        email["from"] = formataddr(("User Name", "user@example.org"))
        email["to"] = "user@example.org"
        email["cc"] = "recipient2@example.org"
        email["bcc"] = "recipient3@example.org"
        email.set_content("Hello, world!")

        user_jid = jid.JID("gw-user@example.org")
        user_email = "user@example.org"
        user_data = UserData(Credentials({"user_email": user_email}))
        await email_gw.on_new_email(user_data, user_jid, email)

        client.sendMessage.assert_called_once_with(
            user_jid,
            {"": "Hello, world!\n"},
            None,
            extra={
                "addresses": {
                    "to": [{"jid": "gw-user@example.org", "delivered": True}],
                    "cc": [
                        {
                            "jid": "recipient2\\40example.org@gateway.example.org",
                            "delivered": True,
                        }
                    ],
                    "bcc": [
                        {
                            "jid": "recipient3\\40example.org@gateway.example.org",
                            "delivered": True,
                        }
                    ],
                }
            },
        )

    @ed
    async def test_send_email_with_headers(self, email_gw, monkeypatch):
        """Email is sent with correct headers."""
        email_gw.client = self._gateway_client()
        email_gw.storage = MagicMock()
        # Credentials returned by the mocked storage for the sending user.
        email_gw.storage.get = AsyncMock(
            return_value={
                "user_email": "user@example.org",
                "user_name": "Sender Name",
                "smtp_host": "smtp.example.org",
                "smtp_port": "587",
                "smtp_username": "sender",
                "smtp_password": "password",
            }
        )

        from_jid = jid.JID("user@example.org")
        to_email = "recipient@example.com"
        body = "Hello, world!"
        subject = "Test email"
        headers = HeadersData(keywords="important,urgent", urgency=Urgency.high)

        # Mock the smtp.sendmail function so that nothing is sent on the network.
        sendmail_mock = AsyncMock()
        monkeypatch.setattr("twisted.mail.smtp.sendmail", sendmail_mock)

        await email_gw.send_email(
            from_jid, to_email, body, subject, extra=SendMailExtra(headers=headers)
        )

        sendmail_mock.assert_called_once()

        # The serialised email is the fourth (and last) positional argument;
        # unpacking also checks that exactly four positional arguments were given.
        _, _, _, email_content_bytes = sendmail_mock.call_args.args

        # Parse the email content
        msg = BytesParser().parsebytes(email_content_bytes)

        # Assert the headers are correctly set
        assert msg["Keywords"] == headers.keywords
        assert msg["Importance"] == "high"

    @ed
    async def test_on_new_email_with_headers(self, email_gw):
        """Headers from the email are correctly processed and included in the message."""
        client = self._attach_sending_client(email_gw)

        email = EmailMessage()
        email["from"] = formataddr(("User Name", "sender@somewhere.example"))
        email["to"] = "user@example.org"
        email.set_content("Hello, world!")
        email["Keywords"] = "test, example"
        email["Importance"] = "high"

        to_jid = jid.JID("gw-user@example.org")
        user_email = "user@example.org"
        user_data = UserData(Credentials({"user_email": user_email}))
        await email_gw.on_new_email(user_data, to_jid, email)

        client.sendMessage.assert_called_once_with(
            to_jid,
            {"": "Hello, world!\n"},
            None,
            extra={"headers": {"keywords": "test, example", "urgency": "high"}},
        )


class TestPubsubService:
    """Tests for the pubsub service exposed by the email gateway."""

    @pytest.fixture
    def pubsub_service(self, email_gw, client):
        """Pubsub service built on the gateway, with ``client`` set as its client."""
        email_gw.client = client
        return EmailGWPubsubService(email_gw)

    @pytest.fixture
    def pubsub_resource(self, pubsub_service):
        """Resource object attached to the pubsub service."""
        return pubsub_service.resource

    def test_getNodes(self, pubsub_resource, client):
        """XEP-0498 well-known node is returned."""
        outcome = pubsub_resource.getNodes(
            client.jid, client.pubsub_service, "test_node"
        )
        expected_nodes = [pubsub_resource._pfs.namespace]
        assert outcome.result == expected_nodes

    @ed
    async def test_items(self, pubsub_resource, client, host):
        """Items are retrieved from the storage"""
        # Two stored files which only differ by id, name and size.
        stored_files = [
            {
                "id": file_id,
                "name": file_name,
                "media_type": "application",
                "media_subtype": "octet-stream",
                "size": file_size,
                "hash_algo": "sha-256",
                "file_hash": "0123456789abcdef",
                "created": 123,
            }
            for file_id, file_name, file_size in (
                ("1", "file1", 123),
                ("2", "file2", 456),
            )
        ]
        host.memory.get_files = AsyncMock(return_value=stored_files)

        items_request = MagicMock()
        items_request.sender = client.jid
        items_request.nodeIdentifier = pubsub_resource._pfs.namespace

        items, _ = await pubsub_resource.items(items_request)
        assert len(items) == 2

    @ed
    async def test_retract(self, pubsub_resource, client, host):
        """Items are retracted from the storage"""
        host.memory.file_delete = AsyncMock()

        retract_request = MagicMock()
        retract_request.sender = client.jid
        retract_request.nodeIdentifier = pubsub_resource._pfs.namespace
        retract_request.itemIdentifiers = ["item_1"]

        await pubsub_resource.retract(retract_request)
        host.memory.file_delete.assert_called_once()

    def test_getConfigurationOptions(self, pubsub_resource):
        """Configuration options are returned"""
        assert pubsub_resource.getConfigurationOptions() == NODE_OPTIONS

    def test_getConfiguration(self, pubsub_resource, client):
        """Configuration values are returned"""
        outcome = pubsub_resource.getConfiguration(
            client.jid, client.pubsub_service, "test_node"
        )
        assert outcome.result == NODE_CONFIG_VALUES

    def test_getNodeInfo(self, pubsub_resource, client):
        """Node information is returned"""
        node_info = pubsub_resource.getNodeInfo(
            client.jid, client.pubsub_service, pubsub_resource._pfs.namespace
        )
        expected_info = {"type": "leaf", "meta-data": NODE_CONFIG}
        assert node_info == expected_info

    @ed
    async def test_getDiscoInfo(self, pubsub_service, client):
        """Disco information is returned"""
        # An empty node identifier is used for the request.
        disco_infos = await pubsub_service.getDiscoInfo(
            client.jid, client.pubsub_service, ""
        )
        assert len(disco_infos) > 0
        # At least one of the returned elements must be a disco feature.
        has_feature = any(
            isinstance(element, disco.DiscoFeature) for element in disco_infos
        )
        assert has_feature