Mercurial > libervia-backend
diff libervia/backend/test/test_plugin_xep_0033.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/test/test_plugin_xep_0033.py@524856bd7b19 |
children | 0d7bb4df2343 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/test/test_plugin_xep_0033.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 + + +# SAT: a jabber client +# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) +# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" Plugin extended addressing stanzas """ + +from .constants import Const +from libervia.backend.test import helpers +from libervia.backend.plugins import plugin_xep_0033 as plugin +from libervia.backend.core.exceptions import CancelError +from twisted.internet import defer +from wokkel.generic import parseXml +from twisted.words.protocols.jabber.jid import JID + +PROFILE_INDEX = 0 +PROFILE = Const.PROFILE[PROFILE_INDEX] +JID_STR_FROM = Const.JID_STR[1] +JID_STR_TO = Const.PROFILE_DICT[PROFILE].host +JID_STR_X_TO = Const.JID_STR[0] +JID_STR_X_CC = Const.JID_STR[1] +JID_STR_X_BCC = Const.JID_STR[2] + +ADDRS = ("to", JID_STR_X_TO, "cc", JID_STR_X_CC, "bcc", JID_STR_X_BCC) + + +class XEP_0033Test(helpers.SatTestCase): + def setUp(self): + self.host = helpers.FakeSAT() + self.plugin = plugin.XEP_0033(self.host) + + def test_message_received(self): + self.host.memory.reinit() + xml = """ + <message type="chat" from="%s" to="%s" id="test_1"> + <body>test</body> + <addresses xmlns='http://jabber.org/protocol/address'> + <address type='to' jid='%s'/> + <address type='cc' jid='%s'/> + <address type='bcc' jid='%s'/> + </addresses> + </message> + """ % ( + JID_STR_FROM, + JID_STR_TO, + JID_STR_X_TO, + JID_STR_X_CC, + JID_STR_X_BCC, + ) + stanza = parseXml(xml.encode("utf-8")) + treatments = defer.Deferred() + self.plugin.message_received_trigger( + self.host.get_client(PROFILE), stanza, treatments + ) + data = {"extra": {}} + + def cb(data): + expected = ("to", JID_STR_X_TO, "cc", JID_STR_X_CC, "bcc", JID_STR_X_BCC) + msg = "Expected: %s\nGot: %s" % (expected, data["extra"]["addresses"]) + self.assertEqual( + data["extra"]["addresses"], "%s:%s\n%s:%s\n%s:%s\n" % expected, msg + ) + + treatments.addCallback(cb) + return treatments.callback(data) + + def _get_mess_data(self): + mess_data = { + "to": JID(JID_STR_TO), + "type": "chat", + "message": "content", + "extra": {}, + } + mess_data["extra"]["address"] = "%s:%s\n%s:%s\n%s:%s\n" % ADDRS + original_stanza = """ + <message type="chat" from="%s" to="%s" id="test_1"> + <body>content</body> + </message> + """ % ( + JID_STR_FROM, + JID_STR_TO, + ) + mess_data["xml"] = parseXml(original_stanza.encode("utf-8")) + return mess_data + + def _assert_addresses(self, mess_data): + """The mess_data that we got here has been modified by self.plugin.messageSendTrigger, + check that the addresses element has been added to the stanza.""" + expected = self._get_mess_data()["xml"] + addresses_extra = ( + """ + <addresses xmlns='http://jabber.org/protocol/address'> + <address type='%s' jid='%s'/> + <address type='%s' jid='%s'/> + <address type='%s' jid='%s'/> + </addresses>""" + % ADDRS + ) + addresses_element = parseXml(addresses_extra.encode("utf-8")) + expected.addChild(addresses_element) + self.assert_equal_xml( + mess_data["xml"].toXml().encode("utf-8"), expected.toXml().encode("utf-8") + ) + + def _check_sent_and_stored(self): + """Check that all the recipients got their messages and that the history has been filled. + /!\ see the comments in XEP_0033.send_and_store_message""" + sent = [] + stored = [] + d_list = [] + + def cb(entities, to_jid): + if host in entities: + if ( + host not in sent + ): # send the message to the entity offering the feature + sent.append(host) + stored.append(host) + stored.append(to_jid) # store in history for each recipient + else: # feature not supported, use normal behavior + sent.append(to_jid) + stored.append(to_jid) + helpers.unmute_logging() + + for to_s in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC): + to_jid = JID(to_s) + host = JID(to_jid.host) + helpers.mute_logging() + d = self.host.find_features_set([plugin.NS_ADDRESS], jid_=host, profile=PROFILE) + d.addCallback(cb, to_jid) + d_list.append(d) + + def cb_list(__): + msg = "/!\ see the comments in XEP_0033.send_and_store_message" + sent_recipients = [ + JID(elt["to"]) for elt in self.host.get_sent_messages(PROFILE_INDEX) + ] + self.assert_equal_unsorted_list(sent_recipients, sent, msg) + self.assert_equal_unsorted_list(self.host.stored_messages, stored, msg) + + return defer.DeferredList(d_list).addCallback(cb_list) + + def _trigger(self, data): + """Execute self.plugin.messageSendTrigger with a different logging + level to not pollute the output, then check that the plugin did its + job. It should abort sending the message or add the extended + addressing information to the stanza. + @param data: the data to be processed by self.plugin.messageSendTrigger + """ + pre_treatments = defer.Deferred() + post_treatments = defer.Deferred() + helpers.mute_logging() + self.plugin.messageSendTrigger( + self.host.get_client[PROFILE], data, pre_treatments, post_treatments + ) + post_treatments.callback(data) + helpers.unmute_logging() + post_treatments.addCallbacks( + self._assert_addresses, lambda failure: failure.trap(CancelError) + ) + return post_treatments + + def test_message_send_trigger_feature_not_supported(self): + # feature is not supported, abort the message + self.host.memory.reinit() + data = self._get_mess_data() + return self._trigger(data) + + def test_message_send_trigger_feature_supported(self): + # feature is supported by the main target server + self.host.reinit() + self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE) + data = self._get_mess_data() + d = self._trigger(data) + return d.addCallback(lambda __: self._check_sent_and_stored()) + + def test_message_send_trigger_feature_fully_supported(self): + # feature is supported by all target servers + self.host.reinit() + self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE) + for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC): + self.host.add_feature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE) + data = self._get_mess_data() + d = self._trigger(data) + return d.addCallback(lambda __: self._check_sent_and_stored()) + + def test_message_send_trigger_fix_wrong_entity(self): + # check that a wrong recipient entity is fixed by the backend + self.host.reinit() + self.host.add_feature(JID(JID_STR_TO), plugin.NS_ADDRESS, PROFILE) + for dest in (JID_STR_X_TO, JID_STR_X_CC, JID_STR_X_BCC): + self.host.add_feature(JID(JID(dest).host), plugin.NS_ADDRESS, PROFILE) + data = self._get_mess_data() + data["to"] = JID(JID_STR_X_TO) + d = self._trigger(data) + return d.addCallback(lambda __: self._check_sent_and_stored())