comparison libervia/backend/test/test_plugin_xep_0033.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/test/test_plugin_xep_0033.py@524856bd7b19
children 0d7bb4df2343
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT: a jabber client
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6 # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)
7
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU Affero General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Affero General Public License for more details.
17
18 # You should have received a copy of the GNU Affero General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20
21 """ Plugin extended addressing stanzas """
22
23 from .constants import Const
24 from libervia.backend.test import helpers
25 from libervia.backend.plugins import plugin_xep_0033 as plugin
26 from libervia.backend.core.exceptions import CancelError
27 from twisted.internet import defer
28 from wokkel.generic import parseXml
29 from twisted.words.protocols.jabber.jid import JID
30
# Index of the profile used by these tests, and the profile it selects.
PROFILE_INDEX = 0
PROFILE = Const.PROFILE[PROFILE_INDEX]

# "from" and "to" attributes of the test stanzas.
JID_STR_FROM = Const.JID_STR[1]
JID_STR_TO = Const.PROFILE_DICT[PROFILE].host

# Extended-addressing recipients: the first three test JIDs are used for the
# "to", "cc" and "bcc" address types respectively.
JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC = (
    Const.JID_STR[index] for index in (0, 1, 2)
)

# Flat sequence of (address type, jid) pairs; it is used as the argument
# tuple of the "%s:%s\n..." and <address type='%s' jid='%s'/> templates.
ADDRS = (
    "to", JID_STR_X_TO,
    "cc", JID_STR_X_CC,
    "bcc", JID_STR_X_BCC,
)
40
41
class XEP_0033Test(helpers.SatTestCase):
    """Tests for the XEP-0033 (Extended Stanza Addressing) plugin."""

    def setUp(self):
        """Create a fake host and instantiate the plugin under test on it."""
        self.host = helpers.FakeSAT()
        self.plugin = plugin.XEP_0033(self.host)

    def test_message_received(self):
        """Check that the <addresses/> element of a received message is
        exported to ``data["extra"]["addresses"]``.

        @return: the treatments Deferred, so that an assertion failure raised
            in its callback is reported by the test runner
        """
        self.host.memory.reinit()
        xml = """
        <message type="chat" from="%s" to="%s" id="test_1">
            <body>test</body>
            <addresses xmlns='http://jabber.org/protocol/address'>
                <address type='to' jid='%s'/>
                <address type='cc' jid='%s'/>
                <address type='bcc' jid='%s'/>
            </addresses>
        </message>
        """ % (
            JID_STR_FROM,
            JID_STR_TO,
            JID_STR_X_TO,
            JID_STR_X_CC,
            JID_STR_X_BCC,
        )
        stanza = parseXml(xml.encode("utf-8"))
        treatments = defer.Deferred()
        self.plugin.message_received_trigger(
            self.host.get_client(PROFILE), stanza, treatments
        )
        data = {"extra": {}}

        def cb(data):
            expected = ("to", JID_STR_X_TO, "cc", JID_STR_X_CC, "bcc", JID_STR_X_BCC)
            msg = "Expected: %s\nGot: %s" % (expected, data["extra"]["addresses"])
            self.assertEqual(
                data["extra"]["addresses"], "%s:%s\n%s:%s\n%s:%s\n" % expected, msg
            )

        treatments.addCallback(cb)
        # Deferred.callback() returns None: fire the chain first, then return
        # the Deferred itself so a failure in cb is not silently swallowed.
        treatments.callback(data)
        return treatments

    def _get_mess_data(self):
        """Build the message data given to the send trigger.

        @return: dict with the recipient, the message type and content, the
            extended addresses in ``extra["address"]`` and the parsed original
            stanza in ``xml``
        """
        mess_data = {
            "to": JID(JID_STR_TO),
            "type": "chat",
            "message": "content",
            "extra": {},
        }
        mess_data["extra"]["address"] = "%s:%s\n%s:%s\n%s:%s\n" % ADDRS
        original_stanza = """
        <message type="chat" from="%s" to="%s" id="test_1">
            <body>content</body>
        </message>
        """ % (
            JID_STR_FROM,
            JID_STR_TO,
        )
        mess_data["xml"] = parseXml(original_stanza.encode("utf-8"))
        return mess_data

    def _assert_addresses(self, mess_data):
        """Check that the addresses element has been added to the stanza.

        The expected stanza is the original one (see _get_mess_data) with an
        <addresses/> child built from ADDRS appended to it.

        @param mess_data: message data after it went through the send trigger
        """
        expected = self._get_mess_data()["xml"]
        addresses_extra = (
            """
            <addresses xmlns='http://jabber.org/protocol/address'>
                <address type='%s' jid='%s'/>
                <address type='%s' jid='%s'/>
                <address type='%s' jid='%s'/>
            </addresses>"""
            % ADDRS
        )
        addresses_element = parseXml(addresses_extra.encode("utf-8"))
        expected.addChild(addresses_element)
        self.assert_equal_xml(
            mess_data["xml"].toXml().encode("utf-8"), expected.toXml().encode("utf-8")
        )

    def _check_sent_and_stored(self):
        """Check that all the recipients got their messages and that the
        history has been filled.

        Warning: see the comments in XEP_0033.send_and_store_message

        @return: Deferred fired once every feature lookup is done and the sent
            and stored messages have been compared to the expected ones
        """
        sent = []
        stored = []
        d_list = []

        def cb(entities, to_jid, host):
            # host is received as an argument rather than read from the
            # enclosing loop, so each callback checks its own server even if
            # the lookup fires after the loop has moved on.
            if host in entities:
                # send the message to the entity offering the feature
                if host not in sent:
                    sent.append(host)
                    stored.append(host)
                stored.append(to_jid)  # store in history for each recipient
            else:  # feature not supported, use normal behavior
                sent.append(to_jid)
                stored.append(to_jid)
            helpers.unmute_logging()

        for to_s in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC):
            to_jid = JID(to_s)
            host = JID(to_jid.host)
            helpers.mute_logging()
            d = self.host.find_features_set(
                [plugin.NS_ADDRESS], jid_=host, profile=PROFILE
            )
            d.addCallback(cb, to_jid, host)
            d_list.append(d)

        def cb_list(__):
            # the backslash is escaped: "\ " is an invalid escape sequence
            msg = "/!\\ see the comments in XEP_0033.send_and_store_message"
            sent_recipients = [
                JID(elt["to"]) for elt in self.host.get_sent_messages(PROFILE_INDEX)
            ]
            self.assert_equal_unsorted_list(sent_recipients, sent, msg)
            self.assert_equal_unsorted_list(self.host.stored_messages, stored, msg)

        return defer.DeferredList(d_list).addCallback(cb_list)

    def _trigger(self, data):
        """Run the plugin's send trigger with logging muted, then check that
        the plugin did its job.

        It should abort sending the message or add the extended addressing
        information to the stanza.

        @param data: the data to be processed by the send trigger
        @return: the post treatments Deferred, with the addresses check as
            callback and a CancelError trap as errback
        """
        pre_treatments = defer.Deferred()
        post_treatments = defer.Deferred()
        helpers.mute_logging()
        try:
            # NOTE(review): the method name below still uses camelCase while
            # message_received_trigger is snake_case; confirm against the
            # plugin that this is the name it actually exposes.
            self.plugin.messageSendTrigger(
                self.host.get_client(PROFILE), data, pre_treatments, post_treatments
            )
            post_treatments.callback(data)
        finally:
            # restore logging even if the trigger raises
            helpers.unmute_logging()
        post_treatments.addCallbacks(
            self._assert_addresses, lambda failure: failure.trap(CancelError)
        )
        return post_treatments

    def test_message_send_trigger_feature_not_supported(self):
        """Run the send trigger when no entity advertises the feature."""
        # feature is not supported, abort the message
        self.host.memory.reinit()
        data = self._get_mess_data()
        return self._trigger(data)

    def test_message_send_trigger_feature_supported(self):
        """Run the send trigger when only the main target server advertises
        the feature."""
        # feature is supported by the main target server
        self.host.reinit()
        self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE)
        data = self._get_mess_data()
        d = self._trigger(data)
        return d.addCallback(lambda __: self._check_sent_and_stored())

    def test_message_send_trigger_feature_fully_supported(self):
        """Run the send trigger when every target server advertises the
        feature."""
        # feature is supported by all target servers
        self.host.reinit()
        self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE)
        for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC):
            self.host.add_feature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE)
        data = self._get_mess_data()
        d = self._trigger(data)
        return d.addCallback(lambda __: self._check_sent_and_stored())

    def test_message_send_trigger_fix_wrong_entity(self):
        """Run the send trigger with a main recipient that differs from the
        one of the original stanza."""
        # check that a wrong recipient entity is fixed by the backend
        self.host.reinit()
        self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE)
        for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC):
            self.host.add_feature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE)
        data = self._get_mess_data()
        data["to"] = JID(JID_STR_X_TO)
        d = self._trigger(data)
        return d.addCallback(lambda __: self._check_sent_and_stored())