Mercurial > libervia-backend
view tests/unit/test_email_gateway.py @ 4340:ea72364131d5 default tip @
doc (components): Update Email Gateway documentation:
A section has been added to explain how attachments are handled.
fix 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:53:18 +0100 |
parents | 699aa8788d98 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for the Email Gateway component and its Pubsub service."""

from email.message import EmailMessage
from email.parser import BytesParser
from email.utils import formataddr
from unittest.mock import AsyncMock, MagicMock

import pytest
from pytest_twisted import ensureDeferred as ed
from twisted.words.protocols.jabber import jid
from wokkel import disco

from libervia.backend.plugins.plugin_comp_email_gateway import (
    EmailGatewayComponent,
    SendMailExtra,
)
from libervia.backend.plugins.plugin_comp_email_gateway.models import (
    Credentials,
    UserData,
)
from libervia.backend.plugins.plugin_comp_email_gateway.pubsub_service import (
    EmailGWPubsubService,
    NODE_CONFIG,
    NODE_CONFIG_VALUES,
    NODE_OPTIONS,
)
from libervia.backend.plugins.plugin_xep_0131 import HeadersData, Urgency


@pytest.fixture
def email_gw(host):
    """Return an ``EmailGatewayComponent`` with mocked storage and no user data."""
    email_gw = EmailGatewayComponent(host)
    email_gw.storage = MagicMock()
    email_gw.users_data = {}
    return email_gw


def _attach_mock_client(email_gw: EmailGatewayComponent) -> MagicMock:
    """Attach a mocked gateway client to ``email_gw`` and return it.

    The mock uses the ``gateway.example.org`` JID, returns itself from
    ``get_virtual_client`` and exposes ``sendMessage`` as an ``AsyncMock`` so that
    tests can inspect the message sent.
    """
    client = MagicMock()
    client.get_virtual_client = lambda __: client
    client.jid = jid.JID("gateway.example.org")
    client.sendMessage = AsyncMock()
    email_gw.client = client
    return client


class TestEmailGatewayComponent:
    """Tests for address conversion and email <=> message handling."""

    def test_jid_to_email_gateway_jid(self, email_gw):
        """JID from the gateway is converted to an email address."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        address_jid = jid.JID(r"user\40some-email-domain.example@gateway.example.org")
        credentials = {"user_email": "user@example.org"}
        result = email_gw.jid_to_email(client, address_jid, credentials)
        assert result == "user@some-email-domain.example"

    def test_jid_to_email_non_gateway_jid(self, email_gw):
        """Non-gateway JID is converted to an email address with ``xmpp:``."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        address_jid = jid.JID("external@example.org")
        credentials = {"user_email": "user@example.org"}
        result = email_gw.jid_to_email(client, address_jid, credentials)
        assert result == '"xmpp:external@example.org" <user@example.org>'

    def test_email_to_jid_user_email(self, email_gw):
        """User email returns user JID if no "xmpp:" scheme is used."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        email_name = ""
        email_addr = "user@example.org"
        user_email = "user@example.org"
        user_jid = jid.JID("gw-user@example.org")
        result, name = email_gw.email_to_jid(
            client, user_email, user_jid, email_name, email_addr
        )
        assert result == jid.JID("gw-user@example.org")
        assert name is None

    def test_email_to_jid_xmpp_address(self, email_gw):
        """Email address with XMPP in name part is converted to a JID."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        email_name = "xmpp:user@example.net"
        email_addr = "user@example.org"
        user_email = "user@example.org"
        user_jid = jid.JID("gw-user@example.org")
        result, name = email_gw.email_to_jid(
            client, user_email, user_jid, email_name, email_addr
        )
        assert result == jid.JID("user@example.net")
        assert name is None

    def test_email_to_jid_regular_email(self, email_gw):
        """Regular email address is converted to a JID with escaped local part."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        email_name = "User Name"
        email_addr = "user@some-email-domain.example"
        user_email = "user@example.org"
        user_jid = jid.JID("gw-user@example.org")
        result, name = email_gw.email_to_jid(
            client, user_email, user_jid, email_name, email_addr
        )
        assert result == jid.JID(r"user\40some-email-domain.example@gateway.example.org")
        assert name == "User Name"

    @ed
    async def test_on_new_email_single_recipient(self, email_gw):
        """Email with a single recipient is correctly processed and sent as a message."""
        client = _attach_mock_client(email_gw)

        email = EmailMessage()
        email["from"] = formataddr(("User Name", "sender@somewhere.example"))
        email["to"] = "user@example.org"
        email.set_content("Hello, world!")

        to_jid = jid.JID("gw-user@example.org")
        user_email = "user@example.org"
        user_data = UserData(Credentials({"user_email": user_email}))

        await email_gw.on_new_email(user_data, to_jid, email)

        # A single assertion checks the call count, the positional arguments
        # (recipient, body, subject) and the ``extra`` keyword argument at once.
        client.sendMessage.assert_called_once_with(
            to_jid, {"": "Hello, world!\n"}, None, extra={}
        )

    @ed
    async def test_on_new_email_multiple_recipients(self, email_gw):
        """Email with multiple recipients is correctly processed and sent."""
        client = _attach_mock_client(email_gw)

        email = EmailMessage()
        email["from"] = formataddr(("User Name", "user@example.org"))
        email["to"] = "user@example.org"
        email["cc"] = "recipient2@example.org"
        email["bcc"] = "recipient3@example.org"
        email.set_content("Hello, world!")

        user_jid = jid.JID("gw-user@example.org")
        user_email = "user@example.org"
        user_data = UserData(Credentials({"user_email": user_email}))

        await email_gw.on_new_email(user_data, user_jid, email)

        client.sendMessage.assert_called_once_with(
            user_jid,
            {"": "Hello, world!\n"},
            None,
            extra={
                "addresses": {
                    "to": [{"jid": "gw-user@example.org", "delivered": True}],
                    "cc": [
                        {
                            "jid": "recipient2\\40example.org@gateway.example.org",
                            "delivered": True,
                        }
                    ],
                    "bcc": [
                        {
                            "jid": "recipient3\\40example.org@gateway.example.org",
                            "delivered": True,
                        }
                    ],
                }
            },
        )

    @ed
    async def test_send_email_with_headers(self, email_gw, monkeypatch):
        """Email is sent with correct headers."""
        email_gw.client = MagicMock()
        email_gw.client.jid = jid.JID("gateway.example.org")
        # ``email_gw.storage`` is already a ``MagicMock`` (set by the fixture); only
        # its ``get`` coroutine needs to return the stored credentials.
        email_gw.storage.get = AsyncMock(
            return_value={
                "user_email": "user@example.org",
                "user_name": "Sender Name",
                "smtp_host": "smtp.example.org",
                "smtp_port": "587",
                "smtp_username": "sender",
                "smtp_password": "password",
            }
        )

        from_jid = jid.JID("user@example.org")
        to_email = "recipient@example.com"
        body = "Hello, world!"
        subject = "Test email"
        headers = HeadersData(keywords="important,urgent", urgency=Urgency.high)

        # Mock the smtp.sendmail function
        sendmail_mock = AsyncMock()
        monkeypatch.setattr("twisted.mail.smtp.sendmail", sendmail_mock)

        await email_gw.send_email(
            from_jid, to_email, body, subject, extra=SendMailExtra(headers=headers)
        )

        sendmail_mock.assert_called_once()

        # Extract the email content from the call arguments
        call_args = sendmail_mock.call_args[0]
        _, _, _, email_content_bytes = call_args

        # Parse the email content
        parser = BytesParser()
        msg = parser.parsebytes(email_content_bytes)

        # Assert the headers are correctly set
        assert msg["Keywords"] == headers.keywords
        assert msg["Importance"] == "high"

    @ed
    async def test_on_new_email_with_headers(self, email_gw):
        """Headers from the email are correctly processed and included in the message."""
        client = _attach_mock_client(email_gw)

        email = EmailMessage()
        email["from"] = formataddr(("User Name", "sender@somewhere.example"))
        email["to"] = "user@example.org"
        email.set_content("Hello, world!")
        email["Keywords"] = "test, example"
        email["Importance"] = "high"

        to_jid = jid.JID("gw-user@example.org")
        user_email = "user@example.org"
        user_data = UserData(Credentials({"user_email": user_email}))

        await email_gw.on_new_email(user_data, to_jid, email)

        client.sendMessage.assert_called_once_with(
            to_jid,
            {"": "Hello, world!\n"},
            None,
            extra={"headers": {"keywords": "test, example", "urgency": "high"}},
        )


class TestPubsubService:
    """Tests for the Pubsub service exposed by the Email Gateway."""

    @pytest.fixture
    def pubsub_service(self, email_gw, client):
        """Return an ``EmailGWPubsubService`` bound to the test client."""
        email_gw.client = client
        service = EmailGWPubsubService(email_gw)
        return service

    @pytest.fixture
    def pubsub_resource(self, pubsub_service):
        """Return the resource of the Pubsub service under test."""
        return pubsub_service.resource

    def test_getNodes(self, pubsub_resource, client):
        """XEP-0498 well-known node is returned."""
        requestor = client.jid
        service = client.pubsub_service
        nodeIdentifier = "test_node"
        result = pubsub_resource.getNodes(requestor, service, nodeIdentifier)
        assert result.result == [pubsub_resource._pfs.namespace]

    @ed
    async def test_items(self, pubsub_resource, client, host):
        """Items are retrieved from the storage"""
        request = MagicMock()
        request.sender = client.jid
        request.nodeIdentifier = pubsub_resource._pfs.namespace
        files = [
            {
                "id": "1",
                "name": "file1",
                "media_type": "application",
                "media_subtype": "octet-stream",
                "size": 123,
                "hash_algo": "sha-256",
                "file_hash": "0123456789abcdef",
                "created": 123,
            },
            {
                "id": "2",
                "name": "file2",
                "media_type": "application",
                "media_subtype": "octet-stream",
                "size": 456,
                "hash_algo": "sha-256",
                "file_hash": "0123456789abcdef",
                "created": 123,
            },
        ]
        host.memory.get_files = AsyncMock(return_value=files)
        result, _ = await pubsub_resource.items(request)
        assert len(result) == 2

    @ed
    async def test_retract(self, pubsub_resource, client, host):
        """Items are retracted from the storage"""
        request = MagicMock()
        request.sender = client.jid
        request.nodeIdentifier = pubsub_resource._pfs.namespace
        request.itemIdentifiers = ["item_1"]
        host.memory.file_delete = AsyncMock()
        await pubsub_resource.retract(request)
        host.memory.file_delete.assert_called_once()

    def test_getConfigurationOptions(self, pubsub_resource):
        """Configuration options are returned"""
        options = pubsub_resource.getConfigurationOptions()
        assert options == NODE_OPTIONS

    def test_getConfiguration(self, pubsub_resource, client):
        """Configuration values are returned"""
        requestor = client.jid
        service = client.pubsub_service
        nodeIdentifier = "test_node"
        result = pubsub_resource.getConfiguration(requestor, service, nodeIdentifier)
        assert result.result == NODE_CONFIG_VALUES

    def test_getNodeInfo(self, pubsub_resource, client):
        """Node information is returned"""
        requestor = client.jid
        service = client.pubsub_service
        nodeIdentifier = pubsub_resource._pfs.namespace
        info = pubsub_resource.getNodeInfo(requestor, service, nodeIdentifier)
        assert info == {"type": "leaf", "meta-data": NODE_CONFIG}

    @ed
    async def test_getDiscoInfo(self, pubsub_service, client):
        """Disco information is returned"""
        requestor = client.jid
        target = client.pubsub_service
        nodeIdentifier = ""
        result = await pubsub_service.getDiscoInfo(requestor, target, nodeIdentifier)
        assert len(result) > 0
        assert any(isinstance(info, disco.DiscoFeature) for info in result)