comparison libervia/backend/test/helpers.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/test/helpers.py@524856bd7b19
children 15055a00162c
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT: a jabber client
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
21 ## logging configuration for tests ##
22 from libervia.backend.core import log_config
23 log_config.sat_configure()
24
25 import logging
26 from libervia.backend.core.log import getLogger
27 getLogger().setLevel(logging.WARNING) # put this to DEBUG when needed
28
29 from libervia.backend.core import exceptions
30 from libervia.backend.tools import config as tools_config
31 from .constants import Const as C
32 from wokkel.xmppim import RosterItem
33 from wokkel.generic import parseXml
34 from libervia.backend.core.xmpp import SatRosterProtocol
35 from libervia.backend.memory.memory import Params, Memory
36 from twisted.trial.unittest import FailTest
37 from twisted.trial import unittest
38 from twisted.internet import defer
39 from twisted.words.protocols.jabber.jid import JID
40 from twisted.words.xish import domish
41 from xml.etree import cElementTree as etree
42 from collections import Counter
43 import re
44
45
def b2s(value):
    """Render a value as the boolean string form used in the bridge.

    @param value: value evaluated for its truthiness
    @return: "True" when value is truthy, "False" otherwise
    """
    if value:
        return "True"
    return "False"
53
54
def mute_logging():
    """Raise the logging threshold to CRITICAL so expected errors stay quiet.

    The level that was effective before the call is stored on the logger
    object as ``original_level`` so that unmute_logging() can restore it.
    """
    root = getLogger()
    previous_level = root.getEffectiveLevel()
    root.original_level = previous_level
    root.setLevel(logging.CRITICAL)
60
61
def unmute_logging():
    """Put back the logging level saved on the logger as ``original_level``.

    The attribute is read as is: it must already have been set (mute_logging()
    does so), otherwise the lookup raises AttributeError.
    """
    root = getLogger()
    saved_level = root.original_level
    root.setLevel(saved_level)
66
67
class DifferentArgsException(FailTest):
    """Test failure raised when a call does not receive the expected arguments."""
70
71
class DifferentXMLException(FailTest):
    """Test failure raised when two XML documents are not equal."""
74
75
class DifferentListException(FailTest):
    """Test failure raised when two unsorted lists do not hold the same items."""
78
79
class FakeSAT(object):
    """Class to simulate a SAT instance.

    It wires together a FakeBridge, a FakeMemory and a FakeTriggerManager, and
    keeps one FakeClient per profile (created on demand by get_client).
    """

    def __init__(self):
        self.bridge = FakeBridge()
        self.memory = FakeMemory(self)
        self.trigger = FakeTriggerManager()
        self.profiles = {}
        self.reinit()

    def reinit(self):
        """Reset the state that a previous test may have modified.

        This can be called by tests that check for sent and stored messages,
        use FakeClient or get/set some other data that needs to be cleaned.
        """
        for client in self.profiles.values():
            client.reinit()
        self.memory.reinit()
        self.stored_messages = []
        self.plugins = {}
        self.profiles = {}

    def contact_del(self, to, profile_key):
        # TODO
        pass

    def register_callback(self, callback, *args, **kwargs):
        pass

    def message_send(self, to_s, msg, subject=None, mess_type='auto', extra=None, profile_key='@NONE@'):
        """Simulate the sending of a message, keeping only its recipient.

        @param to_s: recipient, as a string accepted by JID()
        @param msg: unused
        @param subject: unused
        @param mess_type: unused
        @param extra: unused (defaults to None rather than a shared mutable dict)
        @param profile_key: unused
        """
        # NOTE(review): send_and_store_message is not defined by this class in
        # the visible code; presumably tests or plugins provide it -- confirm.
        self.send_and_store_message({"to": JID(to_s)})

    def _send_message_to_stream(self, mess_data, client):
        """Save the information to check later to whom messages have been sent.

        @param mess_data: message data dictionary, its 'xml' value is sent
        @param client: profile's client
        @return: mess_data, unchanged
        """
        client.xmlstream.send(mess_data['xml'])
        return mess_data

    def _store_message(self, mess_data, client):
        """Save the information to check later if entries have been added to the history.

        @param mess_data: message data dictionary, its "to" value is recorded
        @param client: profile's client
        @return: mess_data, unchanged
        """
        self.stored_messages.append(mess_data["to"])
        return mess_data

    def send_message_to_bridge(self, mess_data, client):
        """Simulate the message being sent to the frontends.

        @param mess_data: message data dictionary
        @param client: profile's client
        @return: mess_data, unchanged
        """
        return mess_data  # TODO

    def get_profile_name(self, profile_key):
        """Get the profile name from the profile_key (returned as is here)"""
        return profile_key

    def get_client(self, profile_key):
        """Convenient method to get client from profile key

        The FakeClient is created and cached the first time a profile is
        requested.

        @return: the FakeClient associated to the profile
        @raise exceptions.ProfileKeyUnknown: the key resolves to an empty profile name
        """
        profile = self.memory.get_profile_name(profile_key)
        if not profile:
            raise exceptions.ProfileKeyUnknown
        if profile not in self.profiles:
            self.profiles[profile] = FakeClient(self, profile)
        return self.profiles[profile]

    def get_jid_n_stream(self, profile_key):
        """Convenient method to get jid and stream from profile key

        @return: tuple (jid, xmlstream) from profile, the stream is always None
        """
        return (C.PROFILE_DICT[profile_key], None)

    def is_connected(self, profile):
        return True

    def get_sent_messages(self, profile_index):
        """Return all the sent messages (in the order they have been sent) and
        empty the list. Called by tests. FakeClient instances associated to each
        profile must have been previously initialized with the method
        FakeSAT.get_client.

        @param profile_index: index of the profile to consider (cf. C.PROFILE)
        @return: the sent messages for given profile, or None if the lookup
            raises IndexError
        """
        # only the lookup is guarded: a missing client (KeyError) still
        # propagates, exactly as before
        try:
            stream = self.profiles[C.PROFILE[profile_index]].xmlstream
        except IndexError:
            return None
        sent, stream.sent = stream.sent, []
        return sent

    def get_sent_message(self, profile_index):
        """Pop and return the sent message in first position (works like a FIFO).
        Called by tests. FakeClient instances associated to each profile must have
        been previously initialized with the method FakeSAT.get_client.

        @param profile_index: index of the profile to consider (cf. C.PROFILE)
        @return: the sent message for given profile, or None if there is none left
        """
        try:
            return self.profiles[C.PROFILE[profile_index]].xmlstream.sent.pop(0)
        except IndexError:
            return None

    def get_sent_message_xml(self, profile_index):
        """Pop and return the sent message in first position (works like a FIFO).
        Called by tests. FakeClient instances associated to each profile must have
        been previously initialized with the method FakeSAT.get_client.

        @param profile_index: index of the profile to consider (cf. C.PROFILE)
        @return: XML representation of the sent message for given profile, or None
        """
        entry = self.get_sent_message(profile_index)
        return entry.toXml() if entry else None

    def find_features_set(self, features, identity=None, jid_=None, profile=C.PROF_KEY_NONE):
        """Call self.add_feature from your tests to change the return value.

        @param features: features which must all be registered for the entity
        @param identity: unused
        @param jid_: entity to check, defaults to the host of the client's jid
        @param profile: profile key used to retrieve the client
        @return: a Deferred firing with a set of entities: {jid_} when every
            feature has been registered for it, an empty set otherwise
        """
        client = self.get_client(profile)
        if jid_ is None:
            jid_ = JID(client.jid.host)
        try:
            if set(features).issubset(client.features[jid_]):
                return defer.succeed({jid_})
        except (TypeError, AttributeError, KeyError):
            # no feature registered yet (for this client or this entity), or
            # features can not be turned into a set: nothing matches
            pass
        return defer.succeed(set())

    def add_feature(self, jid_, feature, profile_key):
        """Add a feature to an entity.

        To be called from your tests when needed.

        @param jid_: entity to which the feature is added
        @param feature: feature to register
        @param profile_key: profile key used to retrieve the client
        """
        client = self.get_client(profile_key)
        if not hasattr(client, 'features'):
            # the attribute is created lazily, FakeClient does not define it
            client.features = {}
        client.features.setdefault(jid_, set()).add(feature)
219
220
class FakeBridge(object):
    """Class to simulate and test bridge calls"""

    def __init__(self):
        # name => FIFO list of (args, kwargs) expectations waiting for their turn
        self.expected_calls = {}

    def expect_call(self, name, *check_args, **check_kwargs):
        """Expect the bridge member ``name`` to be called with the given arguments.

        A checker is installed as attribute ``name``. When an attribute with
        this name already exists, the expectation is queued and gets installed
        once the current checker has been called successfully.

        @param name: name of the bridge method or signal
        @param check_args: positional arguments the call must receive
        @param check_kwargs: keyword arguments the call must receive
        """
        if hasattr(self, name):
            # queue this new call as one already exists
            waiting = self.expected_calls.setdefault(name, [])
            waiting.append((check_args, check_kwargs))
            return

        def check_call(*args, **kwargs):
            mismatch = args != check_args or kwargs != check_kwargs
            if mismatch:
                print("\n\n--------------------")
                print("Args are not equals:")
                print("args\n----\n%s (sent)\n%s (wanted)" % (args, check_args))
                print("kwargs\n------\n%s (sent)\n%s (wanted)" % (kwargs, check_kwargs))
                print("--------------------\n\n")
                raise DifferentArgsException

            # the expectation is fulfilled: free the name
            delattr(self, name)
            if name not in self.expected_calls:
                return

            # register the next queued call
            waiting = self.expected_calls[name]
            next_args, next_kwargs = waiting.pop(0)
            if not waiting:
                del self.expected_calls[name]
            self.expect_call(name, *next_args, **next_kwargs)

        setattr(self, name, check_call)

    def add_method(self, name, int_suffix, in_sign, out_sign, method, async_=False, doc=None):
        """Do nothing: bridge methods are not registered on the fake bridge."""
        return None

    def add_signal(self, name, int_suffix, signature):
        """Do nothing: bridge signals are not registered on the fake bridge."""
        return None

    def add_test_callback(self, name, method):
        """This can be used to register callbacks for bridge methods AND signals.
        Contrary to expect_call, this will not check if the method or signal is
        called/sent with the correct arguments, it will instead run the callback
        of your choice."""
        setattr(self, name, method)
263
264
class FakeParams(Params):
    """Class to simulate and test params object. The methods of Params that could
    not be run (for example those using the storage attribute) must be overwritten
    by a naive simulation of what they should do."""

    def __init__(self, host, storage):
        Params.__init__(self, host, storage)
        self.params = {}  # naive simulation of values storage

    def param_set(self, name, value, category, security_limit=-1, profile_key='@NONE@'):
        """Store a parameter value for a profile.

        @param name: name of the parameter
        @param value: value to store
        @param category: category of the parameter
        @param security_limit: unused
        @param profile_key: profile key, resolved with get_profile_name
        @raise exceptions.ProfileNotSetError: profile_key is '@NONE@'
        """
        profile = self.get_profile_name(profile_key)
        # the value must be stored under the resolved profile name (and not
        # under the raw key) as this is where param_get_a looks for it
        self.params.setdefault(profile, {})[(category, name)] = value

    def param_get_a(self, name, category, attr="value", profile_key='@NONE@'):
        """Retrieve a parameter value previously stored for a profile.

        @param name: name of the parameter
        @param category: category of the parameter
        @param attr: unused
        @param profile_key: profile key, resolved with get_profile_name
        @return: the stored value
        @raise KeyError: nothing has been stored for this profile or parameter
        @raise exceptions.ProfileNotSetError: profile_key is '@NONE@'
        """
        profile = self.get_profile_name(profile_key)
        return self.params[profile][(category, name)]

    def get_profile_name(self, profile_key, return_profile_keys=False):
        """Resolve a profile key to a profile name.

        @param profile_key: '@DEFAULT@', '@NONE@' or a profile name
        @param return_profile_keys: unused
        @return: C.PROFILE[0] for '@DEFAULT@', else profile_key itself
        @raise exceptions.ProfileNotSetError: profile_key is '@NONE@'
        """
        if profile_key == '@DEFAULT@':
            return C.PROFILE[0]
        elif profile_key == '@NONE@':
            raise exceptions.ProfileNotSetError
        else:
            return profile_key

    def load_ind_params(self, profile, cache=None):
        """Simulate the loading of individual parameters with an empty storage.

        @param profile: profile whose stored values are reset
        @param cache: unused
        @return: a Deferred which has already fired with None
        """
        self.params[profile] = {}
        return defer.succeed(None)
294
295
class FakeMemory(Memory):
    """Class to simulate and test memory object"""

    def __init__(self, host):
        # Memory.__init__ is deliberately not called: only the methods which
        # manipulate basic stuff are wanted, the others should be overwritten
        # when needed
        self.host = host
        self.params = FakeParams(host, None)
        self.config = tools_config.parse_main_conf()
        self.reinit()

    def reinit(self):
        """Bring params and entities data back to their initial state.

        Tests that manipulate params, entities, features should re-initialise
        the memory first to not fake the result.
        """
        fake_params = self.params
        fake_params.load_default_params()
        fake_params.params.clear()
        fake_params.frontends_cache = []
        self.entities_data = {}

    def get_profile_name(self, profile_key, return_profile_keys=False):
        """Resolve the profile key through the fake params."""
        return self.params.get_profile_name(profile_key, return_profile_keys)

    def add_to_history(self, from_jid, to_jid, message, _type='chat', extra=None, timestamp=None, profile="@NONE@"):
        """Do nothing: no history is kept."""
        return None

    def contact_add(self, contact_jid, attributes, groups, profile_key='@DEFAULT@'):
        """Do nothing: contacts are not stored."""
        return None

    def set_presence_status(self, contact_jid, show, priority, statuses, profile_key='@DEFAULT@'):
        """Do nothing: presence is not stored."""
        return None

    def add_waiting_sub(self, type_, contact_jid, profile_key):
        """Do nothing: waiting subscriptions are not stored."""
        return None

    def del_waiting_sub(self, contact_jid, profile_key):
        """Do nothing: waiting subscriptions are not stored."""
        return None

    def update_entity_data(self, entity_jid, key, value, silent=False, profile_key="@NONE@"):
        """Set a data of an entity.

        @param entity_jid: entity to update
        @param key: name of the data
        @param value: value to store
        @param silent: unused
        @param profile_key: unused
        """
        self.entities_data.setdefault(entity_jid, {})[key] = value

    def entity_data_get(self, entity_jid, keys, profile_key):
        """Get the requested data of an entity.

        @param entity_jid: entity to look up
        @param keys: names of the data to retrieve
        @param profile_key: unused
        @return: dict mapping each requested key to its stored value
        @raise KeyError: the entity or one of the keys is unknown
        """
        return {key: self.entities_data[entity_jid][key] for key in keys}
342
343
class FakeTriggerManager(object):
    """Trigger manager which registers nothing and never interrupts an action."""

    def add(self, point_name, callback, priority=0):
        """Accept a trigger registration without recording it."""
        return None

    def point(self, point_name, *args, **kwargs):
        """We always return true to continue the action"""
        return True
352
353
class FakeRosterProtocol(SatRosterProtocol):
    """This class is used by FakeClient (one instance per profile)"""

    def __init__(self, host, parent):
        SatRosterProtocol.__init__(self, host)
        self.parent = parent
        self._jids = {}
        # the bare jid of the parent is the first roster item
        self.add_item(parent.jid.userhostJID())

    def add_item(self, jid, *args, **kwargs):
        """Add an item to the roster and expect the matching "contact_new" call.

        @param jid: jid of the roster item
        @param args: positional arguments given to RosterItem
        @param kwargs: keyword arguments given to RosterItem; when neither
            args nor kwargs are given, the subscription is set in both
            directions
        """
        if not (args or kwargs):
            # default values set for the tests only
            kwargs.update(subscriptionTo=True, subscriptionFrom=True)
        item = RosterItem(jid, *args, **kwargs)
        attrs = {
            'to': b2s(item.subscriptionTo),
            'from': b2s(item.subscriptionFrom),
            'ask': b2s(item.pendingOut),
        }
        if item.name:
            attrs['name'] = item.name
        bridge = self.host.bridge
        bridge.expect_call("contact_new", jid.full(), attrs, item.groups, self.parent.profile)
        self._jids[jid] = item
        self._register_item(item)
375
376
class FakeXmlStream(object):
    """This class is used by FakeClient (one instance per profile)"""

    def __init__(self):
        # elements given to send(), in sending order
        self.sent = []

    def send(self, obj):
        """Save the sent messages to compare them later.

        @param obj (domish.Element or str): message to send, a string is
            parsed with parseXml first
        @return: a Deferred which has already fired with None
        """
        if not isinstance(obj, domish.Element):
            assert isinstance(obj, str)
            obj = parseXml(obj)

        if obj.name == 'iq':
            # IQ request expects an answer, return the request itself so
            # you can check if it has been well built by your plugin.
            # NOTE(review): iqDeferreds is not created by this class, it is
            # presumably set on the stream by the IQ machinery -- confirm.
            pending = self.iqDeferreds[obj['id']]
            pending.callback(obj)

        self.sent.append(obj)
        return defer.succeed(None)

    def addObserver(self, *argv):
        """Accept any observer registration without recording it."""
        return None
402
403
class FakeClient(object):
    """Tests involving more than one profile need one instance of this class per profile"""

    def __init__(self, host, profile=None):
        self.host = host
        # fall back to the first test profile when none is given
        self.profile = profile or C.PROFILE[0]
        self.jid = C.PROFILE_DICT[self.profile]
        self.roster = FakeRosterProtocol(host, self)
        self.xmlstream = FakeXmlStream()

    def reinit(self):
        """Replace the stream by a new one, dropping the messages sent so far."""
        self.xmlstream = FakeXmlStream()

    def send(self, obj):
        """Send obj through the stream of this client and return its result."""
        stream = self.xmlstream
        return stream.send(obj)
419
420
class SatTestCase(unittest.TestCase):
    """Test case with additional assertions for XML documents and unsorted lists."""

    def assert_equal_xml(self, xml, expected, ignore_blank=False):
        """Check that two XML documents are equal.

        Tags, attributes, text, tail and children are compared recursively.
        The first difference found is printed, followed by both documents.

        @param xml: serialised XML which has been obtained
        @param expected: serialised XML which was expected
        @param ignore_blank: if True, blank lines and leading/trailing spaces
            are removed before parsing, and text/tail of elements are stripped
            (empty ones being considered as None) before comparison
        @raise DifferentXMLException: the documents differ
        """

        def strip_blank(elt):
            # normalise text and tail in place so blank-only content is None
            for attr in ('text', 'tail'):
                value = getattr(elt, attr)
                try:
                    value = value.strip() or None
                except AttributeError:
                    # value was None: there is nothing to strip
                    value = None
                setattr(elt, attr, value)

        def equal_elt(got_elt, exp_elt):
            if ignore_blank:
                strip_blank(got_elt)
                strip_blank(exp_elt)
            if got_elt.tag != exp_elt.tag:
                print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt))
                print("tag: got [%s] expected: [%s]" % (got_elt.tag, exp_elt.tag))
                return False
            if got_elt.attrib != exp_elt.attrib:
                print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt))
                print("attribs: got %s expected %s" % (got_elt.attrib, exp_elt.attrib))
                return False
            if got_elt.tail != exp_elt.tail or got_elt.text != exp_elt.text:
                print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt))
                print("text: got [%s] expected: [%s]" % (got_elt.text, exp_elt.text))
                print("tail: got [%s] expected: [%s]" % (got_elt.tail, exp_elt.tail))
                return False
            if len(got_elt) != len(exp_elt):
                print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt))
                print("children len: got %d expected: %d" % (len(got_elt), len(exp_elt)))
                return False
            # both elements have the same number of children at this point;
            # all() stops at the first pair which differs
            return all(
                equal_elt(got_child, exp_child)
                for got_child, exp_child in zip(got_elt, exp_elt)
            )

        def remove_blank(xml):
            lines = [line.strip() for line in re.sub(r'[ \t\r\f\v]+', ' ', xml).split('\n')]
            return '\n'.join([line for line in lines if line])

        xml_elt = etree.fromstring(remove_blank(xml) if ignore_blank else xml)
        expected_elt = etree.fromstring(remove_blank(expected) if ignore_blank else expected)

        if not equal_elt(xml_elt, expected_elt):
            # encoding='unicode' makes tostring return text: with a byte
            # encoding the output would be printed as a b'...' representation
            print("---")
            print("XML are not equals:")
            print("got:\n-\n%s\n-\n\n" % etree.tostring(xml_elt, encoding='unicode'))
            print("was expecting:\n-\n%s\n-\n\n" % etree.tostring(expected_elt, encoding='unicode'))
            print("---")
            raise DifferentXMLException

    def assert_equal_unsorted_list(self, a, b, msg=None):
        """Check that two lists hold the same items, whatever their order.

        The number of occurrences of each item is taken into account.

        @param a: list which has been obtained (items must be hashable)
        @param b: list which was expected (items must be hashable)
        @param msg: optional message printed when the lists differ
        @raise DifferentListException: the lists differ
        """
        counter_a = Counter(a)
        counter_b = Counter(b)
        if counter_a != counter_b:
            print("---")
            print("Unsorted lists are not equals:")
            print("got          : %s" % counter_a)
            print("was expecting: %s" % counter_b)
            if msg:
                print(msg)
            print("---")
            raise DifferentListException