Mercurial > libervia-backend
comparison libervia/backend/test/test_plugin_xep_0033.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/test/test_plugin_xep_0033.py@524856bd7b19 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SAT: a jabber client | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org) | |
7 | |
8 # This program is free software: you can redistribute it and/or modify | |
9 # it under the terms of the GNU Affero General Public License as published by | |
10 # the Free Software Foundation, either version 3 of the License, or | |
11 # (at your option) any later version. | |
12 | |
13 # This program is distributed in the hope that it will be useful, | |
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 # GNU Affero General Public License for more details. | |
17 | |
18 # You should have received a copy of the GNU Affero General Public License | |
19 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
20 | |
21 """ Plugin extended addressing stanzas """ | |
22 | |
23 from .constants import Const | |
24 from libervia.backend.test import helpers | |
25 from libervia.backend.plugins import plugin_xep_0033 as plugin | |
26 from libervia.backend.core.exceptions import CancelError | |
27 from twisted.internet import defer | |
28 from wokkel.generic import parseXml | |
29 from twisted.words.protocols.jabber.jid import JID | |
30 | |
# Profile used by every test of this module.
PROFILE_INDEX = 0
PROFILE = Const.PROFILE[PROFILE_INDEX]

# Envelope of the stanzas built by the tests: sender, and the main target
# (the host of the profile's JID, i.e. the entity expected to relay the message).
JID_STR_FROM = Const.JID_STR[1]
JID_STR_TO = Const.PROFILE_DICT[PROFILE].host

# Recipients listed in the extended <addresses/> element, one per address type.
JID_STR_X_TO = Const.JID_STR[0]
JID_STR_X_CC = Const.JID_STR[1]
JID_STR_X_BCC = Const.JID_STR[2]

# Flat (type, jid, type, jid, ...) sequence, shaped to feed "%s" templates directly.
ADDRS = (
    "to", JID_STR_X_TO,
    "cc", JID_STR_X_CC,
    "bcc", JID_STR_X_BCC,
)
40 | |
41 | |
class XEP_0033Test(helpers.SatTestCase):
    """Tests for the XEP-0033 (extended stanza addressing) plugin.

    Each test runs against a fresh ``helpers.FakeSAT`` host and exercises either
    the reception trigger or the sending trigger of the plugin.
    """

    def setUp(self):
        """Create the fake host and instantiate the plugin on it."""
        self.host = helpers.FakeSAT()
        self.plugin = plugin.XEP_0033(self.host)

    def test_message_received(self):
        """A received <addresses/> element must be copied to data["extra"]["addresses"]."""
        self.host.memory.reinit()
        xml = """
        <message type="chat" from="%s" to="%s" id="test_1">
            <body>test</body>
            <addresses xmlns='http://jabber.org/protocol/address'>
                <address type='to' jid='%s'/>
                <address type='cc' jid='%s'/>
                <address type='bcc' jid='%s'/>
            </addresses>
        </message>
        """ % (
            JID_STR_FROM,
            JID_STR_TO,
            JID_STR_X_TO,
            JID_STR_X_CC,
            JID_STR_X_BCC,
        )
        stanza = parseXml(xml.encode("utf-8"))
        treatments = defer.Deferred()
        self.plugin.message_received_trigger(
            self.host.get_client(PROFILE), stanza, treatments
        )
        data = {"extra": {}}

        def cb(data):
            # ADDRS holds the same (type, jid) pairs that were put in the stanza.
            msg = "Expected: %s\nGot: %s" % (ADDRS, data["extra"]["addresses"])
            self.assertEqual(
                data["extra"]["addresses"], "%s:%s\n%s:%s\n%s:%s\n" % ADDRS, msg
            )

        treatments.addCallback(cb)
        treatments.callback(data)
        # Deferred.callback() returns None: return the Deferred itself, otherwise
        # a failing assertion in ``cb`` would never reach the test runner.
        return treatments

    def _get_mess_data(self):
        """Build the message data handed to the sending trigger.

        @return: dict with the "to", "type", "message", "extra" and "xml" keys;
            "extra" carries the extended addresses serialised as "type:jid" lines
            and "xml" is the parsed original stanza (without <addresses/>).
        """
        mess_data = {
            "to": JID(JID_STR_TO),
            "type": "chat",
            "message": "content",
            "extra": {},
        }
        mess_data["extra"]["address"] = "%s:%s\n%s:%s\n%s:%s\n" % ADDRS
        original_stanza = """
        <message type="chat" from="%s" to="%s" id="test_1">
            <body>content</body>
        </message>
        """ % (
            JID_STR_FROM,
            JID_STR_TO,
        )
        mess_data["xml"] = parseXml(original_stanza.encode("utf-8"))
        return mess_data

    def _assert_addresses(self, mess_data):
        """Check that the <addresses/> element has been added to the stanza.

        The mess_data received here is expected to have been modified by the
        plugin's sending trigger.

        @param mess_data: message data whose "xml" element is compared to the
            original stanza completed with the expected <addresses/> child.
        """
        expected = self._get_mess_data()["xml"]
        addresses_extra = (
            """
        <addresses xmlns='http://jabber.org/protocol/address'>
            <address type='%s' jid='%s'/>
            <address type='%s' jid='%s'/>
            <address type='%s' jid='%s'/>
        </addresses>"""
            % ADDRS
        )
        addresses_element = parseXml(addresses_extra.encode("utf-8"))
        expected.addChild(addresses_element)
        self.assert_equal_xml(
            mess_data["xml"].toXml().encode("utf-8"), expected.toXml().encode("utf-8")
        )

    def _check_sent_and_stored(self):
        r"""Check that all the recipients got their messages and that the history has been filled.

        /!\ see the comments in XEP_0033.send_and_store_message

        @return: a Deferred firing once the sent and stored messages have been
            compared with the expected recipients.
        """
        sent = []
        stored = []
        d_list = []

        def cb(entities, to_jid, host):
            # ``host`` is received as an argument rather than read from the
            # enclosing loop, so a Deferred firing after the loop has moved on
            # still sees the host matching its own recipient.
            if host in entities:
                if (
                    host not in sent
                ):  # send the message to the entity offering the feature
                    sent.append(host)
                    stored.append(host)
                stored.append(to_jid)  # store in history for each recipient
            else:  # feature not supported, use normal behavior
                sent.append(to_jid)
                stored.append(to_jid)
            helpers.unmute_logging()

        for to_s in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC):
            to_jid = JID(to_s)
            host = JID(to_jid.host)
            # logging is muted during the lookup and restored at the end of ``cb``
            helpers.mute_logging()
            d = self.host.find_features_set([plugin.NS_ADDRESS], jid_=host, profile=PROFILE)
            d.addCallback(cb, to_jid, host)
            d_list.append(d)

        def cb_list(__):
            # escaped backslash: same text as before, without the invalid "\ " escape
            msg = "/!\\ see the comments in XEP_0033.send_and_store_message"
            sent_recipients = [
                JID(elt["to"]) for elt in self.host.get_sent_messages(PROFILE_INDEX)
            ]
            self.assert_equal_unsorted_list(sent_recipients, sent, msg)
            self.assert_equal_unsorted_list(self.host.stored_messages, stored, msg)

        return defer.DeferredList(d_list).addCallback(cb_list)

    def _trigger(self, data):
        """Run the plugin's sending trigger and check that it did its job.

        The trigger is executed with logging muted to not pollute the output.
        It should either abort sending the message (CancelError) or add the
        extended addressing information to the stanza.

        @param data: the data to be processed by the sending trigger
        @return: the post-treatments Deferred, with the checks attached
        """
        pre_treatments = defer.Deferred()
        post_treatments = defer.Deferred()
        helpers.mute_logging()
        # NOTE(review): ``messageSendTrigger`` is the only camelCase plugin call
        # left in this module; confirm it matches the plugin's current method name.
        self.plugin.messageSendTrigger(
            # get_client is a method: it must be called, not subscripted
            self.host.get_client(PROFILE), data, pre_treatments, post_treatments
        )
        post_treatments.callback(data)
        helpers.unmute_logging()
        # success: the stanza must carry the addresses; failure: only a
        # CancelError (message aborted by the plugin) is tolerated
        post_treatments.addCallbacks(
            self._assert_addresses, lambda failure: failure.trap(CancelError)
        )
        return post_treatments

    def test_message_send_trigger_feature_not_supported(self):
        # feature is not supported, abort the message
        self.host.memory.reinit()
        data = self._get_mess_data()
        return self._trigger(data)

    def test_message_send_trigger_feature_supported(self):
        # feature is supported by the main target server
        self.host.reinit()
        self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE)
        data = self._get_mess_data()
        d = self._trigger(data)
        return d.addCallback(lambda __: self._check_sent_and_stored())

    def test_message_send_trigger_feature_fully_supported(self):
        # feature is supported by all target servers
        self.host.reinit()
        self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE)
        for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC):
            self.host.add_feature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE)
        data = self._get_mess_data()
        d = self._trigger(data)
        return d.addCallback(lambda __: self._check_sent_and_stored())

    def test_message_send_trigger_fix_wrong_entity(self):
        # check that a wrong recipient entity is fixed by the backend
        self.host.reinit()
        self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE)
        for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC):
            self.host.add_feature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE)
        data = self._get_mess_data()
        # deliberately address the message to a recipient instead of the server
        data["to"] = JID(JID_STR_X_TO)
        d = self._trigger(data)
        return d.addCallback(lambda __: self._check_sent_and_stored())