Mercurial > libervia-backend
annotate tests/unit/test_plugin_xep_0272.py @ 4294:a0ed5c976bf8
component conferences, plugin XEP-0167, XEP-0298: add stream user metadata:
A/V conference now adds user metadata about the stream it is forwarding through XEP-0298.
This is parsed and added to metadata during confirmation on client side.
rel 448
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 06 Aug 2024 23:43:11 +0200 |
parents | f1d0cde61af7 |
children |
rev | line source |
---|---|
4246 | 1 #!/usr/bin/env python3 |
2 | |
3 # Libervia: an XMPP client | |
4 # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from unittest.mock import MagicMock, patch | |
20 from pytest_twisted import ensureDeferred as ed | |
21 | |
22 from pytest import fixture | |
23 from twisted.internet import defer | |
24 from twisted.words.protocols.jabber import jid | |
25 from twisted.words.xish import domish | |
26 | |
27 from libervia.backend.plugins.plugin_xep_0272 import ( | |
28 NS_MUJI, | |
29 PRESENCE_MUJI, | |
30 XEP_0272, | |
31 XEP_0272_handler, | |
32 ) | |
33 from libervia.backend.tools.common import data_format | |
34 | |
# Full JID (room address plus "user" resource) shared by the presence tests below.
TEST_ROOM_JID = jid.JID("room@example.com/user")
36 | |
37 | |
@fixture
def xep_0272(host) -> XEP_0272:
    """Return an XEP_0272 plugin built on a host whose plugins are all mocks.

    ``host.plugins`` is replaced by a mapping holding one independent
    ``MagicMock`` per plugin name listed below.
    """
    mocked_names = ("XEP-0045", "XEP-0166", "XEP-0167", "XEP-0249")
    host.plugins = {plugin_name: MagicMock() for plugin_name in mocked_names}
    return XEP_0272(host)
48 | |
49 | |
class TestXEP0272:
    """Unit tests for the ``XEP_0272`` plugin object."""

    def test_get_handler(self, host, client):
        """get_handler returns an XEP_0272_handler whose parent is the plugin."""
        plugin = XEP_0272(host)
        handler = plugin.get_handler(client)
        assert isinstance(handler, XEP_0272_handler)
        assert handler.plugin_parent == plugin

    def test_on_muji_request_preparing_state(self, host, client):
        """try_to_finish_preparation is called when preparing_state is True."""
        plugin = XEP_0272(host)

        # Presence coming from the room occupant, with a MUJI payload that
        # contains a "preparing" child element.
        presence = domish.Element((None, "presence"))
        presence["from"] = TEST_ROOM_JID.full()
        presence.addElement((NS_MUJI, "muji")).addElement("preparing")

        muji_data = {"preparing_jids": set()}
        # Patchers are only activated when entered by the ``with`` below, in
        # this left-to-right order.
        room_jid_patch = patch.object(
            plugin._muc, "get_room_user_jid", return_value=TEST_ROOM_JID
        )
        data_patch = patch.object(plugin, "get_muji_data", return_value=muji_data)
        finish_patch = patch.object(plugin, "try_to_finish_preparation")
        with room_jid_patch, data_patch, finish_patch as finish_mock:
            plugin.on_muji_request(presence, client)
        finish_mock.assert_called_once()

    def test_on_muji_request_not_preparing_state(self, host, client):
        """try_to_finish_preparation is not called when preparing_state is False."""
        plugin = XEP_0272(host)

        # MUJI payload without any "preparing" child element.
        presence = domish.Element((None, "presence"))
        presence["from"] = "room@example.com/user"
        presence.addElement((NS_MUJI, "muji"))

        muji_data = {
            "done_preparing": True,
            "preparing_jids": set(),
            "to_call": set(),
        }
        data_patch = patch.object(plugin, "get_muji_data", return_value=muji_data)
        finish_patch = patch.object(plugin, "try_to_finish_preparation")
        with data_patch, finish_patch as finish_mock:
            plugin.on_muji_request(presence, client)
        finish_mock.assert_not_called()

    def test_on_muji_request_own_jid(self, host, client):
        """try_to_finish_preparation is not called when the presence is from own JID."""
        plugin = XEP_0272(host)

        presence = domish.Element((None, "presence"))
        presence["from"] = "room@example.com/user"
        presence.addElement((NS_MUJI, "muji"))

        # Make the client's JID identical to the sender of the presence.
        client.jid = jid.JID("room@example.com/user")
        finish_patch = patch.object(plugin, "try_to_finish_preparation")
        with finish_patch as finish_mock:
            plugin.on_muji_request(presence, client)
        finish_mock.assert_not_called()

    def test_try_to_finish_preparation(self, host, client):
        """try_to_finish_preparation sets done_preparing to True."""
        plugin = XEP_0272(host)
        room = MagicMock()
        muji_data = {"preparing_jids": set(), "to_call": set()}

        data_patch = patch.object(plugin, "get_muji_data", return_value=muji_data)
        with data_patch:
            plugin.try_to_finish_preparation(client, room, muji_data)
        assert muji_data["done_preparing"] is True

    @ed
    async def test_call_group_data_set(self, host, client):
        """call_group_data_set sends something through client.a_send."""
        plugin = XEP_0272(host)
        room_jid = jid.JID("room@example.com")
        call_data = {"sdp": "sdp_data"}

        # Stub the stanza builder with a pair of mocks, and the client's
        # a_send with an already-fired Deferred.
        generate_patch = patch.object(
            plugin,
            "generate_presence_and_muji",
            return_value=(MagicMock(), MagicMock()),
        )
        send_patch = patch.object(client, "a_send", return_value=defer.succeed(None))
        with generate_patch, send_patch as send_mock:
            await plugin.call_group_data_set(client, room_jid, call_data)
        send_mock.assert_called()

    @ed
    async def test_start_preparation(self, host, client):
        """start_preparation sends something through client.a_send."""
        plugin = XEP_0272(host)
        room = MagicMock()

        generate_patch = patch.object(
            plugin,
            "generate_presence_and_muji",
            return_value=(MagicMock(), MagicMock()),
        )
        send_patch = patch.object(client, "a_send", return_value=defer.succeed(None))
        with generate_patch, send_patch as send_mock:
            await plugin.start_preparation(client, room)
        send_mock.assert_called()
148 | |
149 | |
class TestXEP0272Handler:
    """Unit tests for the ``XEP_0272_handler`` protocol handler."""

    def test_connectionInitialized(self, host, client):
        """connectionInitialized adds MUJI presence observer."""
        plugin = XEP_0272(host)
        handler = XEP_0272_handler(plugin)
        handler.parent = MagicMock()
        handler.xmlstream = MagicMock()

        observer_patch = patch.object(handler.xmlstream, "addObserver")
        with observer_patch as add_observer:
            handler.connectionInitialized()
        # The observer must be registered exactly once, with the handler's
        # parent passed as the ``client`` keyword argument.
        add_observer.assert_called_once_with(
            PRESENCE_MUJI, plugin.on_muji_request, client=handler.parent
        )

    def test_getDiscoInfo(self):
        """getDiscoInfo returns MUJI feature."""
        handler = XEP_0272_handler(MagicMock())
        features = handler.getDiscoInfo(MagicMock(), MagicMock())
        assert len(features) == 1
        assert features[0] == NS_MUJI

    def test_getDiscoItems(self):
        """getDiscoItems returns empty list."""
        handler = XEP_0272_handler(MagicMock())
        disco_items = handler.getDiscoItems(MagicMock(), MagicMock())
        assert disco_items == []