Mercurial > libervia-backend
view tests/unit/test_email_gateway.py @ 4351:6a0a081485b8
plugin autocrypt: Autocrypt protocol implementation:
Implementation of autocrypt: `autocrypt` header is checked, and if present and no public
key is known for the peer, the key is imported.
`autocrypt` header is also added to outgoing message (only if an email gateway is
detected).
For the moment, the JID is used as identifier, but the real email used by gateway should be
used in the future.
rel 456
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 28 Feb 2025 09:23:35 +0100 |
parents | 699aa8788d98 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for the email gateway component and its Pubsub service."""

from email.message import EmailMessage
from email.parser import BytesParser
from email.utils import formataddr
from unittest.mock import AsyncMock, MagicMock

import pytest
from pytest_twisted import ensureDeferred as ed
from twisted.words.protocols.jabber import jid
from wokkel import disco

from libervia.backend.plugins.plugin_comp_email_gateway import (
    EmailGatewayComponent,
    SendMailExtra,
)
from libervia.backend.plugins.plugin_comp_email_gateway.models import (
    Credentials,
    UserData,
)
from libervia.backend.plugins.plugin_comp_email_gateway.pubsub_service import (
    EmailGWPubsubService,
    NODE_CONFIG,
    NODE_CONFIG_VALUES,
    NODE_OPTIONS,
)
from libervia.backend.plugins.plugin_xep_0131 import HeadersData, Urgency


@pytest.fixture
def email_gw(host):
    """Return an ``EmailGatewayComponent`` with mocked storage and no users data.

    NOTE(review): the ``host`` fixture is not defined in this module; it is
    presumably provided by a shared ``conftest.py`` — confirm.
    """
    email_gw = EmailGatewayComponent(host)
    email_gw.storage = MagicMock()
    email_gw.users_data = {}
    return email_gw


class TestEmailGatewayComponent:
    """Tests for JID/email conversions and for sending and receiving emails."""

    def test_jid_to_email_gateway_jid(self, email_gw):
        """JID from the gateway is converted to an email address."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        address_jid = jid.JID(r"user\40some-email-domain.example@gateway.example.org")
        credentials = {"user_email": "user@example.org"}
        result = email_gw.jid_to_email(client, address_jid, credentials)
        assert result == "user@some-email-domain.example"

    def test_jid_to_email_non_gateway_jid(self, email_gw):
        """Non-gateway JID is converted to an email address with ``xmpp:``."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        address_jid = jid.JID("external@example.org")
        credentials = {"user_email": "user@example.org"}
        result = email_gw.jid_to_email(client, address_jid, credentials)
        assert result == '"xmpp:external@example.org" <user@example.org>'

    def test_email_to_jid_user_email(self, email_gw):
        """User email returns user JID if no "xmpp:" scheme is used."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        email_name = ""
        email_addr = "user@example.org"
        user_email = "user@example.org"
        user_jid = jid.JID("gw-user@example.org")
        result, name = email_gw.email_to_jid(
            client, user_email, user_jid, email_name, email_addr
        )
        assert result == jid.JID("gw-user@example.org")
        assert name is None

    def test_email_to_jid_xmpp_address(self, email_gw):
        """Email address with XMPP in name part is converted to a JID."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        email_name = "xmpp:user@example.net"
        email_addr = "user@example.org"
        user_email = "user@example.org"
        user_jid = jid.JID("gw-user@example.org")
        result, name = email_gw.email_to_jid(
            client, user_email, user_jid, email_name, email_addr
        )
        assert result == jid.JID("user@example.net")
        assert name is None

    def test_email_to_jid_regular_email(self, email_gw):
        """Regular email address is converted to a JID with escaped local part."""
        client = MagicMock()
        client.jid = jid.JID("gateway.example.org")
        email_name = "User Name"
        email_addr = "user@some-email-domain.example"
        user_email = "user@example.org"
        user_jid = jid.JID("gw-user@example.org")
        result, name = email_gw.email_to_jid(
            client, user_email, user_jid, email_name, email_addr
        )
        assert result == jid.JID(r"user\40some-email-domain.example@gateway.example.org")
        assert name == "User Name"

    @ed
    async def test_on_new_email_single_recipient(self, email_gw):
        """Email with a single recipient is correctly processed and sent as a message."""
        client = MagicMock()
        # The mocked client stands in for its own virtual client.
        client.get_virtual_client = lambda __: client
        client.jid = jid.JID("gateway.example.org")
        client.sendMessage = AsyncMock()
        email_gw.client = client

        email = EmailMessage()
        email["from"] = formataddr(("User Name", "sender@somewhere.example"))
        email["to"] = "user@example.org"
        email.set_content("Hello, world!")
        to_jid = jid.JID("gw-user@example.org")
        user_email = "user@example.org"
        user_data = UserData(Credentials({"user_email": user_email}))

        await email_gw.on_new_email(user_data, to_jid, email)

        # Positional arguments are checked one by one, then the full call
        # (including ``extra``) is verified.
        client.sendMessage.assert_called_once()
        call_args = client.sendMessage.call_args[0]
        assert call_args[0] == to_jid
        assert call_args[1] == {"": "Hello, world!\n"}
        assert call_args[2] is None
        client.sendMessage.assert_called_once_with(
            to_jid, {"": "Hello, world!\n"}, None, extra={}
        )

    @ed
    async def test_on_new_email_multiple_recipients(self, email_gw):
        """Email with multiple recipients is correctly processed and sent."""
        client = MagicMock()
        client.get_virtual_client = lambda __: client
        client.jid = jid.JID("gateway.example.org")
        client.sendMessage = AsyncMock()
        email_gw.client = client

        email = EmailMessage()
        email["from"] = formataddr(("User Name", "user@example.org"))
        email["to"] = "user@example.org"
        email["cc"] = "recipient2@example.org"
        email["bcc"] = "recipient3@example.org"
        email.set_content("Hello, world!")
        user_jid = jid.JID("gw-user@example.org")
        user_email = "user@example.org"
        user_data = UserData(Credentials({"user_email": user_email}))

        await email_gw.on_new_email(user_data, user_jid, email)

        client.sendMessage.assert_called_once_with(
            user_jid,
            {"": "Hello, world!\n"},
            None,
            extra={
                "addresses": {
                    "to": [{"jid": "gw-user@example.org", "delivered": True}],
                    "cc": [
                        {
                            "jid": "recipient2\\40example.org@gateway.example.org",
                            "delivered": True,
                        }
                    ],
                    "bcc": [
                        {
                            "jid": "recipient3\\40example.org@gateway.example.org",
                            "delivered": True,
                        }
                    ],
                }
            },
        )

    @ed
    async def test_send_email_with_headers(self, email_gw, monkeypatch):
        """Email is sent with correct headers."""
        email_gw.client = MagicMock()
        email_gw.client.jid = jid.JID("gateway.example.org")
        email_gw.storage = MagicMock()
        email_gw.storage.get = AsyncMock(
            return_value={
                "user_email": "user@example.org",
                "user_name": "Sender Name",
                "smtp_host": "smtp.example.org",
                "smtp_port": "587",
                "smtp_username": "sender",
                "smtp_password": "password",
            }
        )

        from_jid = jid.JID("user@example.org")
        to_email = "recipient@example.com"
        body = "Hello, world!"
        subject = "Test email"
        headers = HeadersData(keywords="important,urgent", urgency=Urgency.high)

        # Mock the smtp.sendmail function
        sendmail_mock = AsyncMock()
        monkeypatch.setattr("twisted.mail.smtp.sendmail", sendmail_mock)

        await email_gw.send_email(
            from_jid, to_email, body, subject, extra=SendMailExtra(headers=headers)
        )

        sendmail_mock.assert_called_once()

        # Extract the email content from the call arguments
        call_args = sendmail_mock.call_args[0]
        _, _, _, email_content_bytes = call_args

        # Parse the email content
        parser = BytesParser()
        msg = parser.parsebytes(email_content_bytes)

        # Assert the headers are correctly set
        assert msg["Keywords"] == headers.keywords
        assert msg["Importance"] == "high"

    @ed
    async def test_on_new_email_with_headers(self, email_gw):
        """Headers from the email are correctly processed and included in the message."""
        client = MagicMock()
        client.get_virtual_client = lambda __: client
        client.jid = jid.JID("gateway.example.org")
        client.sendMessage = AsyncMock()
        email_gw.client = client

        email = EmailMessage()
        email["from"] = formataddr(("User Name", "sender@somewhere.example"))
        email["to"] = "user@example.org"
        email.set_content("Hello, world!")
        email["Keywords"] = "test, example"
        email["Importance"] = "high"

        to_jid = jid.JID("gw-user@example.org")
        user_email = "user@example.org"
        user_data = UserData(Credentials({"user_email": user_email}))

        await email_gw.on_new_email(user_data, to_jid, email)

        client.sendMessage.assert_called_once_with(
            to_jid,
            {"": "Hello, world!\n"},
            None,
            extra={"headers": {"keywords": "test, example", "urgency": "high"}},
        )


class TestPubsubService:
    """Tests for the Pubsub service exposed by the email gateway.

    NOTE(review): the ``client`` and ``host`` fixtures used below are not defined
    in this module; they are presumably provided by a shared ``conftest.py`` —
    confirm.
    """

    @pytest.fixture
    def pubsub_service(self, email_gw, client):
        """Return an ``EmailGWPubsubService`` bound to the gateway under test."""
        email_gw.client = client
        service = EmailGWPubsubService(email_gw)
        return service

    @pytest.fixture
    def pubsub_resource(self, pubsub_service):
        """Return the ``resource`` attribute of the Pubsub service."""
        return pubsub_service.resource

    def test_getNodes(self, pubsub_resource, client):
        """XEP-0498 well-known node is returned."""
        requestor = client.jid
        service = client.pubsub_service
        nodeIdentifier = "test_node"
        result = pubsub_resource.getNodes(requestor, service, nodeIdentifier)
        assert result.result == [pubsub_resource._pfs.namespace]

    @ed
    async def test_items(self, pubsub_resource, client, host):
        """Items are retrieved from the storage"""
        request = MagicMock()
        request.sender = client.jid
        request.nodeIdentifier = pubsub_resource._pfs.namespace
        files = [
            {
                "id": "1",
                "name": "file1",
                "media_type": "application",
                "media_subtype": "octet-stream",
                "size": 123,
                "hash_algo": "sha-256",
                "file_hash": "0123456789abcdef",
                "created": 123,
            },
            {
                "id": "2",
                "name": "file2",
                "media_type": "application",
                "media_subtype": "octet-stream",
                "size": 456,
                "hash_algo": "sha-256",
                "file_hash": "0123456789abcdef",
                "created": 123,
            },
        ]
        host.memory.get_files = AsyncMock(return_value=files)
        result, _ = await pubsub_resource.items(request)
        assert len(result) == 2

    @ed
    async def test_retract(self, pubsub_resource, client, host):
        """Items are retracted from the storage"""
        request = MagicMock()
        request.sender = client.jid
        request.nodeIdentifier = pubsub_resource._pfs.namespace
        request.itemIdentifiers = ["item_1"]
        host.memory.file_delete = AsyncMock()
        await pubsub_resource.retract(request)
        host.memory.file_delete.assert_called_once()

    def test_getConfigurationOptions(self, pubsub_resource):
        """Configuration options are returned"""
        options = pubsub_resource.getConfigurationOptions()
        assert options == NODE_OPTIONS

    def test_getConfiguration(self, pubsub_resource, client):
        """Configuration values are returned"""
        requestor = client.jid
        service = client.pubsub_service
        nodeIdentifier = "test_node"
        result = pubsub_resource.getConfiguration(requestor, service, nodeIdentifier)
        assert result.result == NODE_CONFIG_VALUES

    def test_getNodeInfo(self, pubsub_resource, client):
        """Node information is returned"""
        requestor = client.jid
        service = client.pubsub_service
        nodeIdentifier = pubsub_resource._pfs.namespace
        info = pubsub_resource.getNodeInfo(requestor, service, nodeIdentifier)
        assert info == {"type": "leaf", "meta-data": NODE_CONFIG}

    @ed
    async def test_getDiscoInfo(self, pubsub_service, client):
        """Disco information is returned"""
        requestor = client.jid
        target = client.pubsub_service
        nodeIdentifier = ""
        result = await pubsub_service.getDiscoInfo(requestor, target, nodeIdentifier)
        assert len(result) > 0
        assert any(isinstance(info, disco.DiscoFeature) for info in result)