Mercurial > libervia-backend
comparison libervia/backend/test/helpers.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/test/helpers.py@524856bd7b19 |
children | 15055a00162c |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SAT: a jabber client | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 | |
21 ## logging configuration for tests ## | |
22 from libervia.backend.core import log_config | |
23 log_config.sat_configure() | |
24 | |
25 import logging | |
26 from libervia.backend.core.log import getLogger | |
27 getLogger().setLevel(logging.WARNING) # put this to DEBUG when needed | |
28 | |
29 from libervia.backend.core import exceptions | |
30 from libervia.backend.tools import config as tools_config | |
31 from .constants import Const as C | |
32 from wokkel.xmppim import RosterItem | |
33 from wokkel.generic import parseXml | |
34 from libervia.backend.core.xmpp import SatRosterProtocol | |
35 from libervia.backend.memory.memory import Params, Memory | |
36 from twisted.trial.unittest import FailTest | |
37 from twisted.trial import unittest | |
38 from twisted.internet import defer | |
39 from twisted.words.protocols.jabber.jid import JID | |
40 from twisted.words.xish import domish | |
41 from xml.etree import cElementTree as etree | |
42 from collections import Counter | |
43 import re | |
44 | |
45 | |
46 def b2s(value): | |
47 """Convert a bool to a unicode string used in bridge | |
48 @param value: boolean value | |
49 @return: unicode conversion, according to bridge convention | |
50 | |
51 """ | |
52 return "True" if value else "False" | |
53 | |
54 | |
55 def mute_logging(): | |
56 """Temporarily set the logging level to CRITICAL to not pollute the output with expected errors.""" | |
57 logger = getLogger() | |
58 logger.original_level = logger.getEffectiveLevel() | |
59 logger.setLevel(logging.CRITICAL) | |
60 | |
61 | |
62 def unmute_logging(): | |
63 """Restore the logging level after it has been temporarily disabled.""" | |
64 logger = getLogger() | |
65 logger.setLevel(logger.original_level) | |
66 | |
67 | |
68 class DifferentArgsException(FailTest): | |
69 pass | |
70 | |
71 | |
72 class DifferentXMLException(FailTest): | |
73 pass | |
74 | |
75 | |
76 class DifferentListException(FailTest): | |
77 pass | |
78 | |
79 | |
80 class FakeSAT(object): | |
81 """Class to simulate a SAT instance""" | |
82 | |
83 def __init__(self): | |
84 self.bridge = FakeBridge() | |
85 self.memory = FakeMemory(self) | |
86 self.trigger = FakeTriggerManager() | |
87 self.profiles = {} | |
88 self.reinit() | |
89 | |
90 def reinit(self): | |
91 """This can be called by tests that check for sent and stored messages, | |
92 uses FakeClient or get/set some other data that need to be cleaned""" | |
93 for profile in self.profiles: | |
94 self.profiles[profile].reinit() | |
95 self.memory.reinit() | |
96 self.stored_messages = [] | |
97 self.plugins = {} | |
98 self.profiles = {} | |
99 | |
100 def contact_del(self, to, profile_key): | |
101 #TODO | |
102 pass | |
103 | |
104 def register_callback(self, callback, *args, **kwargs): | |
105 pass | |
106 | |
107 def message_send(self, to_s, msg, subject=None, mess_type='auto', extra={}, profile_key='@NONE@'): | |
108 self.send_and_store_message({"to": JID(to_s)}) | |
109 | |
110 def _send_message_to_stream(self, mess_data, client): | |
111 """Save the information to check later to whom messages have been sent. | |
112 | |
113 @param mess_data: message data dictionnary | |
114 @param client: profile's client | |
115 """ | |
116 client.xmlstream.send(mess_data['xml']) | |
117 return mess_data | |
118 | |
119 def _store_message(self, mess_data, client): | |
120 """Save the information to check later if entries have been added to the history. | |
121 | |
122 @param mess_data: message data dictionnary | |
123 @param client: profile's client | |
124 """ | |
125 self.stored_messages.append(mess_data["to"]) | |
126 return mess_data | |
127 | |
128 def send_message_to_bridge(self, mess_data, client): | |
129 """Simulate the message being sent to the frontends. | |
130 | |
131 @param mess_data: message data dictionnary | |
132 @param client: profile's client | |
133 """ | |
134 return mess_data # TODO | |
135 | |
136 def get_profile_name(self, profile_key): | |
137 """Get the profile name from the profile_key""" | |
138 return profile_key | |
139 | |
140 def get_client(self, profile_key): | |
141 """Convenient method to get client from profile key | |
142 @return: client or None if it doesn't exist""" | |
143 profile = self.memory.get_profile_name(profile_key) | |
144 if not profile: | |
145 raise exceptions.ProfileKeyUnknown | |
146 if profile not in self.profiles: | |
147 self.profiles[profile] = FakeClient(self, profile) | |
148 return self.profiles[profile] | |
149 | |
150 def get_jid_n_stream(self, profile_key): | |
151 """Convenient method to get jid and stream from profile key | |
152 @return: tuple (jid, xmlstream) from profile, can be None""" | |
153 return (C.PROFILE_DICT[profile_key], None) | |
154 | |
155 def is_connected(self, profile): | |
156 return True | |
157 | |
158 def get_sent_messages(self, profile_index): | |
159 """Return all the sent messages (in the order they have been sent) and | |
160 empty the list. Called by tests. FakeClient instances associated to each | |
161 profile must have been previously initialized with the method | |
162 FakeSAT.get_client. | |
163 | |
164 @param profile_index: index of the profile to consider (cf. C.PROFILE) | |
165 @return: the sent messages for given profile, or None""" | |
166 try: | |
167 tmp = self.profiles[C.PROFILE[profile_index]].xmlstream.sent | |
168 self.profiles[C.PROFILE[profile_index]].xmlstream.sent = [] | |
169 return tmp | |
170 except IndexError: | |
171 return None | |
172 | |
173 def get_sent_message(self, profile_index): | |
174 """Pop and return the sent message in first position (works like a FIFO). | |
175 Called by tests. FakeClient instances associated to each profile must have | |
176 been previously initialized with the method FakeSAT.get_client. | |
177 | |
178 @param profile_index: index of the profile to consider (cf. C.PROFILE) | |
179 @return: the sent message for given profile, or None""" | |
180 try: | |
181 return self.profiles[C.PROFILE[profile_index]].xmlstream.sent.pop(0) | |
182 except IndexError: | |
183 return None | |
184 | |
185 def get_sent_message_xml(self, profile_index): | |
186 """Pop and return the sent message in first position (works like a FIFO). | |
187 Called by tests. FakeClient instances associated to each profile must have | |
188 been previously initialized with the method FakeSAT.get_client. | |
189 @return: XML representation of the sent message for given profile, or None""" | |
190 entry = self.get_sent_message(profile_index) | |
191 return entry.toXml() if entry else None | |
192 | |
193 def find_features_set(self, features, identity=None, jid_=None, profile=C.PROF_KEY_NONE): | |
194 """Call self.add_feature from your tests to change the return value. | |
195 | |
196 @return: a set of entities | |
197 """ | |
198 client = self.get_client(profile) | |
199 if jid_ is None: | |
200 jid_ = JID(client.jid.host) | |
201 try: | |
202 if set(features).issubset(client.features[jid_]): | |
203 return defer.succeed(set([jid_])) | |
204 except (TypeError, AttributeError, KeyError): | |
205 pass | |
206 return defer.succeed(set()) | |
207 | |
208 def add_feature(self, jid_, feature, profile_key): | |
209 """Add a feature to an entity. | |
210 | |
211 To be called from your tests when needed. | |
212 """ | |
213 client = self.get_client(profile_key) | |
214 if not hasattr(client, 'features'): | |
215 client.features = {} | |
216 if jid_ not in client.features: | |
217 client.features[jid_] = set() | |
218 client.features[jid_].add(feature) | |
219 | |
220 | |
221 class FakeBridge(object): | |
222 """Class to simulate and test bridge calls""" | |
223 | |
224 def __init__(self): | |
225 self.expected_calls = {} | |
226 | |
227 def expect_call(self, name, *check_args, **check_kwargs): | |
228 if hasattr(self, name): # queue this new call as one already exists | |
229 self.expected_calls.setdefault(name, []) | |
230 self.expected_calls[name].append((check_args, check_kwargs)) | |
231 return | |
232 | |
233 def check_call(*args, **kwargs): | |
234 if args != check_args or kwargs != check_kwargs: | |
235 print("\n\n--------------------") | |
236 print("Args are not equals:") | |
237 print("args\n----\n%s (sent)\n%s (wanted)" % (args, check_args)) | |
238 print("kwargs\n------\n%s (sent)\n%s (wanted)" % (kwargs, check_kwargs)) | |
239 print("--------------------\n\n") | |
240 raise DifferentArgsException | |
241 delattr(self, name) | |
242 | |
243 if name in self.expected_calls: # register the next call | |
244 args, kwargs = self.expected_calls[name].pop(0) | |
245 if len(self.expected_calls[name]) == 0: | |
246 del self.expected_calls[name] | |
247 self.expect_call(name, *args, **kwargs) | |
248 | |
249 setattr(self, name, check_call) | |
250 | |
251 def add_method(self, name, int_suffix, in_sign, out_sign, method, async_=False, doc=None): | |
252 pass | |
253 | |
254 def add_signal(self, name, int_suffix, signature): | |
255 pass | |
256 | |
257 def add_test_callback(self, name, method): | |
258 """This can be used to register callbacks for bridge methods AND signals. | |
259 Contrary to expect_call, this will not check if the method or signal is | |
260 called/sent with the correct arguments, it will instead run the callback | |
261 of your choice.""" | |
262 setattr(self, name, method) | |
263 | |
264 | |
265 class FakeParams(Params): | |
266 """Class to simulate and test params object. The methods of Params that could | |
267 not be run (for example those using the storage attribute must be overwritten | |
268 by a naive simulation of what they should do.""" | |
269 | |
270 def __init__(self, host, storage): | |
271 Params.__init__(self, host, storage) | |
272 self.params = {} # naive simulation of values storage | |
273 | |
274 def param_set(self, name, value, category, security_limit=-1, profile_key='@NONE@'): | |
275 profile = self.get_profile_name(profile_key) | |
276 self.params.setdefault(profile, {}) | |
277 self.params[profile_key][(category, name)] = value | |
278 | |
279 def param_get_a(self, name, category, attr="value", profile_key='@NONE@'): | |
280 profile = self.get_profile_name(profile_key) | |
281 return self.params[profile][(category, name)] | |
282 | |
283 def get_profile_name(self, profile_key, return_profile_keys=False): | |
284 if profile_key == '@DEFAULT@': | |
285 return C.PROFILE[0] | |
286 elif profile_key == '@NONE@': | |
287 raise exceptions.ProfileNotSetError | |
288 else: | |
289 return profile_key | |
290 | |
291 def load_ind_params(self, profile, cache=None): | |
292 self.params[profile] = {} | |
293 return defer.succeed(None) | |
294 | |
295 | |
296 class FakeMemory(Memory): | |
297 """Class to simulate and test memory object""" | |
298 | |
299 def __init__(self, host): | |
300 # do not call Memory.__init__, we just want to call the methods that are | |
301 # manipulating basic stuff, the others should be overwritten when needed | |
302 self.host = host | |
303 self.params = FakeParams(host, None) | |
304 self.config = tools_config.parse_main_conf() | |
305 self.reinit() | |
306 | |
307 def reinit(self): | |
308 """Tests that manipulate params, entities, features should | |
309 re-initialise the memory first to not fake the result.""" | |
310 self.params.load_default_params() | |
311 self.params.params.clear() | |
312 self.params.frontends_cache = [] | |
313 self.entities_data = {} | |
314 | |
315 def get_profile_name(self, profile_key, return_profile_keys=False): | |
316 return self.params.get_profile_name(profile_key, return_profile_keys) | |
317 | |
318 def add_to_history(self, from_jid, to_jid, message, _type='chat', extra=None, timestamp=None, profile="@NONE@"): | |
319 pass | |
320 | |
321 def contact_add(self, contact_jid, attributes, groups, profile_key='@DEFAULT@'): | |
322 pass | |
323 | |
324 def set_presence_status(self, contact_jid, show, priority, statuses, profile_key='@DEFAULT@'): | |
325 pass | |
326 | |
327 def add_waiting_sub(self, type_, contact_jid, profile_key): | |
328 pass | |
329 | |
330 def del_waiting_sub(self, contact_jid, profile_key): | |
331 pass | |
332 | |
333 def update_entity_data(self, entity_jid, key, value, silent=False, profile_key="@NONE@"): | |
334 self.entities_data.setdefault(entity_jid, {}) | |
335 self.entities_data[entity_jid][key] = value | |
336 | |
337 def entity_data_get(self, entity_jid, keys, profile_key): | |
338 result = {} | |
339 for key in keys: | |
340 result[key] = self.entities_data[entity_jid][key] | |
341 return result | |
342 | |
343 | |
344 class FakeTriggerManager(object): | |
345 | |
346 def add(self, point_name, callback, priority=0): | |
347 pass | |
348 | |
349 def point(self, point_name, *args, **kwargs): | |
350 """We always return true to continue the action""" | |
351 return True | |
352 | |
353 | |
354 class FakeRosterProtocol(SatRosterProtocol): | |
355 """This class is used by FakeClient (one instance per profile)""" | |
356 | |
357 def __init__(self, host, parent): | |
358 SatRosterProtocol.__init__(self, host) | |
359 self.parent = parent | |
360 self._jids = {} | |
361 self.add_item(parent.jid.userhostJID()) | |
362 | |
363 def add_item(self, jid, *args, **kwargs): | |
364 if not args and not kwargs: | |
365 # defaults values setted for the tests only | |
366 kwargs["subscriptionTo"] = True | |
367 kwargs["subscriptionFrom"] = True | |
368 roster_item = RosterItem(jid, *args, **kwargs) | |
369 attrs = {'to': b2s(roster_item.subscriptionTo), 'from': b2s(roster_item.subscriptionFrom), 'ask': b2s(roster_item.pendingOut)} | |
370 if roster_item.name: | |
371 attrs['name'] = roster_item.name | |
372 self.host.bridge.expect_call("contact_new", jid.full(), attrs, roster_item.groups, self.parent.profile) | |
373 self._jids[jid] = roster_item | |
374 self._register_item(roster_item) | |
375 | |
376 | |
377 class FakeXmlStream(object): | |
378 """This class is used by FakeClient (one instance per profile)""" | |
379 | |
380 def __init__(self): | |
381 self.sent = [] | |
382 | |
383 def send(self, obj): | |
384 """Save the sent messages to compare them later. | |
385 | |
386 @param obj (domish.Element, str or unicode): message to send | |
387 """ | |
388 if not isinstance(obj, domish.Element): | |
389 assert(isinstance(obj, str) or isinstance(obj, str)) | |
390 obj = parseXml(obj) | |
391 | |
392 if obj.name == 'iq': | |
393 # IQ request expects an answer, return the request itself so | |
394 # you can check if it has been well built by your plugin. | |
395 self.iqDeferreds[obj['id']].callback(obj) | |
396 | |
397 self.sent.append(obj) | |
398 return defer.succeed(None) | |
399 | |
400 def addObserver(self, *argv): | |
401 pass | |
402 | |
403 | |
404 class FakeClient(object): | |
405 """Tests involving more than one profile need one instance of this class per profile""" | |
406 | |
407 def __init__(self, host, profile=None): | |
408 self.host = host | |
409 self.profile = profile if profile else C.PROFILE[0] | |
410 self.jid = C.PROFILE_DICT[self.profile] | |
411 self.roster = FakeRosterProtocol(host, self) | |
412 self.xmlstream = FakeXmlStream() | |
413 | |
414 def reinit(self): | |
415 self.xmlstream = FakeXmlStream() | |
416 | |
417 def send(self, obj): | |
418 return self.xmlstream.send(obj) | |
419 | |
420 | |
421 class SatTestCase(unittest.TestCase): | |
422 | |
423 def assert_equal_xml(self, xml, expected, ignore_blank=False): | |
424 def equal_elt(got_elt, exp_elt): | |
425 if ignore_blank: | |
426 for elt in got_elt, exp_elt: | |
427 for attr in ('text', 'tail'): | |
428 value = getattr(elt, attr) | |
429 try: | |
430 value = value.strip() or None | |
431 except AttributeError: | |
432 value = None | |
433 setattr(elt, attr, value) | |
434 if (got_elt.tag != exp_elt.tag): | |
435 print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt)) | |
436 print("tag: got [%s] expected: [%s]" % (got_elt.tag, exp_elt.tag)) | |
437 return False | |
438 if (got_elt.attrib != exp_elt.attrib): | |
439 print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt)) | |
440 print("attribs: got %s expected %s" % (got_elt.attrib, exp_elt.attrib)) | |
441 return False | |
442 if (got_elt.tail != exp_elt.tail or got_elt.text != exp_elt.text): | |
443 print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt)) | |
444 print("text: got [%s] expected: [%s]" % (got_elt.text, exp_elt.text)) | |
445 print("tail: got [%s] expected: [%s]" % (got_elt.tail, exp_elt.tail)) | |
446 return False | |
447 if (len(got_elt) != len(exp_elt)): | |
448 print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt)) | |
449 print("children len: got %d expected: %d" % (len(got_elt), len(exp_elt))) | |
450 return False | |
451 for idx, child in enumerate(got_elt): | |
452 if not equal_elt(child, exp_elt[idx]): | |
453 return False | |
454 return True | |
455 | |
456 def remove_blank(xml): | |
457 lines = [line.strip() for line in re.sub(r'[ \t\r\f\v]+', ' ', xml).split('\n')] | |
458 return '\n'.join([line for line in lines if line]) | |
459 | |
460 xml_elt = etree.fromstring(remove_blank(xml) if ignore_blank else xml) | |
461 expected_elt = etree.fromstring(remove_blank(expected) if ignore_blank else expected) | |
462 | |
463 if not equal_elt(xml_elt, expected_elt): | |
464 print("---") | |
465 print("XML are not equals:") | |
466 print("got:\n-\n%s\n-\n\n" % etree.tostring(xml_elt, encoding='utf-8')) | |
467 print("was expecting:\n-\n%s\n-\n\n" % etree.tostring(expected_elt, encoding='utf-8')) | |
468 print("---") | |
469 raise DifferentXMLException | |
470 | |
471 def assert_equal_unsorted_list(self, a, b, msg): | |
472 counter_a = Counter(a) | |
473 counter_b = Counter(b) | |
474 if counter_a != counter_b: | |
475 print("---") | |
476 print("Unsorted lists are not equals:") | |
477 print("got : %s" % counter_a) | |
478 print("was expecting: %s" % counter_b) | |
479 if msg: | |
480 print(msg) | |
481 print("---") | |
482 raise DifferentListException |