diff tests/unit/test_email_gateway.py @ 4318:27bb22eace65

tests (unit/email gateway): add test for XEP-0131 handling: rel 451
author Goffi <goffi@goffi.org>
date Sat, 28 Sep 2024 15:59:48 +0200
parents d27228b3c704
children
line wrap: on
line diff
--- a/tests/unit/test_email_gateway.py	Sat Sep 28 15:59:12 2024 +0200
+++ b/tests/unit/test_email_gateway.py	Sat Sep 28 15:59:48 2024 +0200
@@ -16,17 +16,24 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+from email.message import EmailMessage
+from email.parser import BytesParser
 from email.utils import formataddr
 from unittest.mock import AsyncMock, MagicMock
+
 import pytest
 from pytest_twisted import ensureDeferred as ed
 from twisted.words.protocols.jabber import jid
-from email.message import EmailMessage
-from libervia.backend.plugins.plugin_comp_email_gateway import EmailGatewayComponent
+
+from libervia.backend.plugins.plugin_comp_email_gateway import (
+    EmailGatewayComponent,
+    SendMailExtra,
+)
 from libervia.backend.plugins.plugin_comp_email_gateway.models import (
     Credentials,
     UserData,
 )
+from libervia.backend.plugins.plugin_xep_0131 import HeadersData, Urgency
 
 
 class TestEmailGatewayComponent:
@@ -122,7 +129,7 @@
         assert call_args[1] == {"": "Hello, world!\n"}
         assert call_args[2] == None
         client.sendMessage.assert_called_once_with(
-            to_jid, {"": "Hello, world!\n"}, None, extra=None
+            to_jid, {"": "Hello, world!\n"}, None, extra={}
         )
 
     @ed
@@ -168,3 +175,76 @@
                 }
             },
         )
+
+    @ed
+    async def test_send_email_with_headers(self, email_gw, monkeypatch):
+        """Email is sent with correct headers."""
+        email_gw.client = MagicMock()
+        email_gw.client.jid = jid.JID("gateway.example.org")
+        email_gw.storage = MagicMock()
+        email_gw.storage.get = AsyncMock(
+            return_value={
+                "user_email": "user@example.org",
+                "user_name": "Sender Name",
+                "smtp_host": "smtp.example.org",
+                "smtp_port": "587",
+                "smtp_username": "sender",
+                "smtp_password": "password",
+            }
+        )
+
+        from_jid = jid.JID("user@example.org")
+        to_email = "recipient@example.com"
+        body = "Hello, world!"
+        subject = "Test email"
+        headers = HeadersData(keywords="important,urgent", urgency=Urgency.high)
+
+        # Mock the smtp.sendmail function
+        sendmail_mock = AsyncMock()
+        monkeypatch.setattr("twisted.mail.smtp.sendmail", sendmail_mock)
+
+        await email_gw.send_email(
+            from_jid, to_email, body, subject, extra=SendMailExtra(headers=headers)
+        )
+
+        sendmail_mock.assert_called_once()
+
+        # Extract the email content from the call arguments
+        call_args = sendmail_mock.call_args[0]
+        _, _, _, email_content_bytes = call_args
+
+        # Parse the email content
+        parser = BytesParser()
+        msg = parser.parsebytes(email_content_bytes)
+
+        # Assert the headers are correctly set
+        assert msg["Keywords"] == headers.keywords
+        assert msg["Importance"] == "high"
+
+    @ed
+    async def test_on_new_email_with_headers(self, email_gw):
+        """Headers from the email are correctly processed and included in the message."""
+        client = MagicMock()
+        client.get_virtual_client = lambda __: client
+        client.jid = jid.JID("gateway.example.org")
+        client.sendMessage = AsyncMock()
+        email_gw.client = client
+
+        email = EmailMessage()
+        email["from"] = formataddr(("User Name", "sender@somewhere.example"))
+        email["to"] = "user@example.org"
+        email.set_content("Hello, world!")
+        email["Keywords"] = "test, example"
+        email["Importance"] = "high"
+
+        to_jid = jid.JID("gw-user@example.org")
+        user_email = "user@example.org"
+        user_data = UserData(Credentials({"user_email": user_email}))
+        await email_gw.on_new_email(user_data, to_jid, email)
+
+        client.sendMessage.assert_called_once_with(
+            to_jid,
+            {"": "Hello, world!\n"},
+            None,
+            extra={"headers": {"keywords": "test, example", "urgency": "high"}},
+        )