Mercurial > libervia-backend
annotate tests/unit/test_plugin_xep_0272.py @ 4351:6a0a081485b8
plugin autocrypt: Autocrypt protocol implementation:
Implementation of autocrypt: `autocrypt` header is checked, and if present and no public
key is known for the peer, the key is imported.
`autocrypt` header is also added to outgoing message (only if an email gateway is
detected).
For the moment, the JID is use as identifier, but the real email used by gateway should be
used in the future.
rel 456
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 28 Feb 2025 09:23:35 +0100 |
parents | f1d0cde61af7 |
children |
rev | line source |
---|---|
4246 | 1 #!/usr/bin/env python3 |
2 | |
3 # Libervia: an XMPP client | |
4 # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from unittest.mock import MagicMock, patch | |
20 from pytest_twisted import ensureDeferred as ed | |
21 | |
22 from pytest import fixture | |
23 from twisted.internet import defer | |
24 from twisted.words.protocols.jabber import jid | |
25 from twisted.words.xish import domish | |
26 | |
27 from libervia.backend.plugins.plugin_xep_0272 import ( | |
28 NS_MUJI, | |
29 PRESENCE_MUJI, | |
30 XEP_0272, | |
31 XEP_0272_handler, | |
32 ) | |
33 from libervia.backend.tools.common import data_format | |
34 | |
35 TEST_ROOM_JID = jid.JID("room@example.com/user") | |
36 | |
37 | |
38 @fixture | |
39 def xep_0272(host) -> XEP_0272: | |
40 """Fixture for initializing XEP-0272 plugin.""" | |
41 host.plugins = { | |
42 "XEP-0045": MagicMock(), | |
43 "XEP-0166": MagicMock(), | |
44 "XEP-0167": MagicMock(), | |
45 "XEP-0249": MagicMock(), | |
46 } | |
47 return XEP_0272(host) | |
48 | |
49 | |
50 class TestXEP0272: | |
51 def test_get_handler(self, host, client): | |
52 """XEP_0272_handler is instantiated and returned.""" | |
53 xep_0272 = XEP_0272(host) | |
54 handler = xep_0272.get_handler(client) | |
55 assert isinstance(handler, XEP_0272_handler) | |
56 assert handler.plugin_parent == xep_0272 | |
57 | |
58 def test_on_muji_request_preparing_state(self, host, client): | |
59 """try_to_finish_preparation is called when preparing_state is True.""" | |
60 xep_0272 = XEP_0272(host) | |
61 presence_elt = domish.Element((None, "presence")) | |
62 presence_elt["from"] = TEST_ROOM_JID.full() | |
63 muji_elt = presence_elt.addElement((NS_MUJI, "muji")) | |
64 muji_elt.addElement("preparing") | |
65 | |
4285
f1d0cde61af7
tests (unit): fix tests + black reformatting.
Goffi <goffi@goffi.org>
parents:
4246
diff
changeset
|
66 muji_data = {"preparing_jids": set()} |
f1d0cde61af7
tests (unit): fix tests + black reformatting.
Goffi <goffi@goffi.org>
parents:
4246
diff
changeset
|
67 with patch.object(xep_0272._muc, "get_room_user_jid", return_value=TEST_ROOM_JID): |
4246 | 68 with patch.object(xep_0272, "get_muji_data", return_value=muji_data): |
69 with patch.object( | |
70 xep_0272, "try_to_finish_preparation" | |
71 ) as mock_try_to_finish_preparation: | |
72 xep_0272.on_muji_request(presence_elt, client) | |
73 mock_try_to_finish_preparation.assert_called_once() | |
74 | |
75 def test_on_muji_request_not_preparing_state(self, host, client): | |
76 """try_to_finish_preparation is not called when preparing_state is False.""" | |
77 xep_0272 = XEP_0272(host) | |
78 presence_elt = domish.Element((None, "presence")) | |
79 presence_elt["from"] = "room@example.com/user" | |
80 presence_elt.addElement((NS_MUJI, "muji")) | |
81 | |
4285
f1d0cde61af7
tests (unit): fix tests + black reformatting.
Goffi <goffi@goffi.org>
parents:
4246
diff
changeset
|
82 muji_data = {"done_preparing": True, "preparing_jids": set(), "to_call": set()} |
4246 | 83 with patch.object(xep_0272, "get_muji_data", return_value=muji_data): |
84 with patch.object( | |
85 xep_0272, "try_to_finish_preparation" | |
86 ) as mock_try_to_finish_preparation: | |
87 xep_0272.on_muji_request(presence_elt, client) | |
88 mock_try_to_finish_preparation.assert_not_called() | |
89 | |
90 def test_on_muji_request_own_jid(self, host, client): | |
91 """try_to_finish_preparation is not called when the presence is from own JID.""" | |
92 xep_0272 = XEP_0272(host) | |
93 presence_elt = domish.Element((None, "presence")) | |
94 presence_elt["from"] = "room@example.com/user" | |
95 presence_elt.addElement((NS_MUJI, "muji")) | |
96 | |
97 client.jid = jid.JID("room@example.com/user") | |
98 with patch.object( | |
99 xep_0272, "try_to_finish_preparation" | |
100 ) as mock_try_to_finish_preparation: | |
101 xep_0272.on_muji_request(presence_elt, client) | |
102 mock_try_to_finish_preparation.assert_not_called() | |
103 | |
104 def test_try_to_finish_preparation(self, host, client): | |
105 """try_to_finish_preparation sets done_preparing to True.""" | |
106 xep_0272 = XEP_0272(host) | |
107 room = MagicMock() | |
108 muji_data = {"preparing_jids": set(), "to_call": set()} | |
109 | |
110 with patch.object(xep_0272, "get_muji_data", return_value=muji_data): | |
111 xep_0272.try_to_finish_preparation(client, room, muji_data) | |
112 assert muji_data["done_preparing"] is True | |
113 | |
114 @ed | |
115 async def test_call_group_data_set(self, host, client): | |
116 """call_group_data_set sends correct presence and muji data.""" | |
117 xep_0272 = XEP_0272(host) | |
118 room_jid = jid.JID("room@example.com") | |
119 call_data = {"sdp": "sdp_data"} | |
120 | |
121 with patch.object( | |
122 xep_0272, | |
123 "generate_presence_and_muji", | |
124 return_value=(MagicMock(), MagicMock()), | |
125 ): | |
126 with patch.object( | |
127 client, "a_send", return_value=defer.succeed(None) | |
128 ) as mock_a_send: | |
129 await xep_0272.call_group_data_set(client, room_jid, call_data) | |
130 mock_a_send.assert_called() | |
131 | |
132 @ed | |
133 async def test_start_preparation(self, host, client): | |
134 """start_preparation sends correct presence and muji data.""" | |
135 xep_0272 = XEP_0272(host) | |
136 room = MagicMock() | |
137 | |
138 with patch.object( | |
139 xep_0272, | |
140 "generate_presence_and_muji", | |
141 return_value=(MagicMock(), MagicMock()), | |
142 ): | |
143 with patch.object( | |
144 client, "a_send", return_value=defer.succeed(None) | |
145 ) as mock_a_send: | |
146 await xep_0272.start_preparation(client, room) | |
147 mock_a_send.assert_called() | |
148 | |
149 | |
150 class TestXEP0272Handler: | |
151 def test_connectionInitialized(self, host, client): | |
152 """connectionInitialized adds MUJI presence observer.""" | |
153 xep_0272 = XEP_0272(host) | |
154 handler = XEP_0272_handler(xep_0272) | |
155 handler.parent = MagicMock() | |
156 handler.xmlstream = MagicMock() | |
157 | |
158 with patch.object(handler.xmlstream, "addObserver") as mock_addObserver: | |
159 handler.connectionInitialized() | |
160 mock_addObserver.assert_called_once_with( | |
161 PRESENCE_MUJI, xep_0272.on_muji_request, client=handler.parent | |
162 ) | |
163 | |
164 def test_getDiscoInfo(self): | |
165 """getDiscoInfo returns MUJI feature.""" | |
166 handler = XEP_0272_handler(MagicMock()) | |
167 info = handler.getDiscoInfo(MagicMock(), MagicMock()) | |
168 assert len(info) == 1 | |
169 assert info[0] == NS_MUJI | |
170 | |
171 def test_getDiscoItems(self): | |
172 """getDiscoItems returns empty list.""" | |
173 handler = XEP_0272_handler(MagicMock()) | |
174 items = handler.getDiscoItems(MagicMock(), MagicMock()) | |
175 assert items == [] |