Mercurial > libervia-backend
comparison libervia/backend/test/helpers.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | bc7d45dedeb0 |
children |
comparison
equal
deleted
inserted
replaced
4269:64a85ce8be70 | 4270:0d7bb4df2343 |
---|---|
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 | 20 |
21 ## logging configuration for tests ## | 21 ## logging configuration for tests ## |
22 from libervia.backend.core import log_config | 22 from libervia.backend.core import log_config |
23 | |
23 log_config.libervia_configure() | 24 log_config.libervia_configure() |
24 | 25 |
25 import logging | 26 import logging |
26 from libervia.backend.core.log import getLogger | 27 from libervia.backend.core.log import getLogger |
28 | |
27 getLogger().setLevel(logging.WARNING) # put this to DEBUG when needed | 29 getLogger().setLevel(logging.WARNING) # put this to DEBUG when needed |
28 | 30 |
29 from libervia.backend.core import exceptions | 31 from libervia.backend.core import exceptions |
30 from libervia.backend.tools import config as tools_config | 32 from libervia.backend.tools import config as tools_config |
31 from .constants import Const as C | 33 from .constants import Const as C |
47 """Convert a bool to a unicode string used in bridge | 49 """Convert a bool to a unicode string used in bridge |
48 @param value: boolean value | 50 @param value: boolean value |
49 @return: unicode conversion, according to bridge convention | 51 @return: unicode conversion, according to bridge convention |
50 | 52 |
51 """ | 53 """ |
52 return "True" if value else "False" | 54 return "True" if value else "False" |
53 | 55 |
54 | 56 |
55 def mute_logging(): | 57 def mute_logging(): |
56 """Temporarily set the logging level to CRITICAL to not pollute the output with expected errors.""" | 58 """Temporarily set the logging level to CRITICAL to not pollute the output with expected errors.""" |
57 logger = getLogger() | 59 logger = getLogger() |
96 self.stored_messages = [] | 98 self.stored_messages = [] |
97 self.plugins = {} | 99 self.plugins = {} |
98 self.profiles = {} | 100 self.profiles = {} |
99 | 101 |
100 def contact_del(self, to, profile_key): | 102 def contact_del(self, to, profile_key): |
101 #TODO | 103 # TODO |
102 pass | 104 pass |
103 | 105 |
104 def register_callback(self, callback, *args, **kwargs): | 106 def register_callback(self, callback, *args, **kwargs): |
105 pass | 107 pass |
106 | 108 |
107 def message_send(self, to_s, msg, subject=None, mess_type='auto', extra={}, profile_key='@NONE@'): | 109 def message_send( |
110 self, to_s, msg, subject=None, mess_type="auto", extra={}, profile_key="@NONE@" | |
111 ): | |
108 self.send_and_store_message({"to": JID(to_s)}) | 112 self.send_and_store_message({"to": JID(to_s)}) |
109 | 113 |
110 def _send_message_to_stream(self, mess_data, client): | 114 def _send_message_to_stream(self, mess_data, client): |
111 """Save the information to check later to whom messages have been sent. | 115 """Save the information to check later to whom messages have been sent. |
112 | 116 |
113 @param mess_data: message data dictionnary | 117 @param mess_data: message data dictionnary |
114 @param client: profile's client | 118 @param client: profile's client |
115 """ | 119 """ |
116 client.xmlstream.send(mess_data['xml']) | 120 client.xmlstream.send(mess_data["xml"]) |
117 return mess_data | 121 return mess_data |
118 | 122 |
119 def _store_message(self, mess_data, client): | 123 def _store_message(self, mess_data, client): |
120 """Save the information to check later if entries have been added to the history. | 124 """Save the information to check later if entries have been added to the history. |
121 | 125 |
188 been previously initialized with the method FakeSAT.get_client. | 192 been previously initialized with the method FakeSAT.get_client. |
189 @return: XML representation of the sent message for given profile, or None""" | 193 @return: XML representation of the sent message for given profile, or None""" |
190 entry = self.get_sent_message(profile_index) | 194 entry = self.get_sent_message(profile_index) |
191 return entry.toXml() if entry else None | 195 return entry.toXml() if entry else None |
192 | 196 |
193 def find_features_set(self, features, identity=None, jid_=None, profile=C.PROF_KEY_NONE): | 197 def find_features_set( |
198 self, features, identity=None, jid_=None, profile=C.PROF_KEY_NONE | |
199 ): | |
194 """Call self.add_feature from your tests to change the return value. | 200 """Call self.add_feature from your tests to change the return value. |
195 | 201 |
196 @return: a set of entities | 202 @return: a set of entities |
197 """ | 203 """ |
198 client = self.get_client(profile) | 204 client = self.get_client(profile) |
209 """Add a feature to an entity. | 215 """Add a feature to an entity. |
210 | 216 |
211 To be called from your tests when needed. | 217 To be called from your tests when needed. |
212 """ | 218 """ |
213 client = self.get_client(profile_key) | 219 client = self.get_client(profile_key) |
214 if not hasattr(client, 'features'): | 220 if not hasattr(client, "features"): |
215 client.features = {} | 221 client.features = {} |
216 if jid_ not in client.features: | 222 if jid_ not in client.features: |
217 client.features[jid_] = set() | 223 client.features[jid_] = set() |
218 client.features[jid_].add(feature) | 224 client.features[jid_].add(feature) |
219 | 225 |
246 del self.expected_calls[name] | 252 del self.expected_calls[name] |
247 self.expect_call(name, *args, **kwargs) | 253 self.expect_call(name, *args, **kwargs) |
248 | 254 |
249 setattr(self, name, check_call) | 255 setattr(self, name, check_call) |
250 | 256 |
251 def add_method(self, name, int_suffix, in_sign, out_sign, method, async_=False, doc=None): | 257 def add_method( |
258 self, name, int_suffix, in_sign, out_sign, method, async_=False, doc=None | |
259 ): | |
252 pass | 260 pass |
253 | 261 |
254 def add_signal(self, name, int_suffix, signature): | 262 def add_signal(self, name, int_suffix, signature): |
255 pass | 263 pass |
256 | 264 |
269 | 277 |
270 def __init__(self, host, storage): | 278 def __init__(self, host, storage): |
271 Params.__init__(self, host, storage) | 279 Params.__init__(self, host, storage) |
272 self.params = {} # naive simulation of values storage | 280 self.params = {} # naive simulation of values storage |
273 | 281 |
274 def param_set(self, name, value, category, security_limit=-1, profile_key='@NONE@'): | 282 def param_set(self, name, value, category, security_limit=-1, profile_key="@NONE@"): |
275 profile = self.get_profile_name(profile_key) | 283 profile = self.get_profile_name(profile_key) |
276 self.params.setdefault(profile, {}) | 284 self.params.setdefault(profile, {}) |
277 self.params[profile_key][(category, name)] = value | 285 self.params[profile_key][(category, name)] = value |
278 | 286 |
279 def param_get_a(self, name, category, attr="value", profile_key='@NONE@'): | 287 def param_get_a(self, name, category, attr="value", profile_key="@NONE@"): |
280 profile = self.get_profile_name(profile_key) | 288 profile = self.get_profile_name(profile_key) |
281 return self.params[profile][(category, name)] | 289 return self.params[profile][(category, name)] |
282 | 290 |
283 def get_profile_name(self, profile_key, return_profile_keys=False): | 291 def get_profile_name(self, profile_key, return_profile_keys=False): |
284 if profile_key == '@DEFAULT@': | 292 if profile_key == "@DEFAULT@": |
285 return C.PROFILE[0] | 293 return C.PROFILE[0] |
286 elif profile_key == '@NONE@': | 294 elif profile_key == "@NONE@": |
287 raise exceptions.ProfileNotSetError | 295 raise exceptions.ProfileNotSetError |
288 else: | 296 else: |
289 return profile_key | 297 return profile_key |
290 | 298 |
291 def load_ind_params(self, profile, cache=None): | 299 def load_ind_params(self, profile, cache=None): |
313 self.entities_data = {} | 321 self.entities_data = {} |
314 | 322 |
315 def get_profile_name(self, profile_key, return_profile_keys=False): | 323 def get_profile_name(self, profile_key, return_profile_keys=False): |
316 return self.params.get_profile_name(profile_key, return_profile_keys) | 324 return self.params.get_profile_name(profile_key, return_profile_keys) |
317 | 325 |
318 def add_to_history(self, from_jid, to_jid, message, _type='chat', extra=None, timestamp=None, profile="@NONE@"): | 326 def add_to_history( |
319 pass | 327 self, |
320 | 328 from_jid, |
321 def contact_add(self, contact_jid, attributes, groups, profile_key='@DEFAULT@'): | 329 to_jid, |
322 pass | 330 message, |
323 | 331 _type="chat", |
324 def set_presence_status(self, contact_jid, show, priority, statuses, profile_key='@DEFAULT@'): | 332 extra=None, |
333 timestamp=None, | |
334 profile="@NONE@", | |
335 ): | |
336 pass | |
337 | |
338 def contact_add(self, contact_jid, attributes, groups, profile_key="@DEFAULT@"): | |
339 pass | |
340 | |
341 def set_presence_status( | |
342 self, contact_jid, show, priority, statuses, profile_key="@DEFAULT@" | |
343 ): | |
325 pass | 344 pass |
326 | 345 |
327 def add_waiting_sub(self, type_, contact_jid, profile_key): | 346 def add_waiting_sub(self, type_, contact_jid, profile_key): |
328 pass | 347 pass |
329 | 348 |
330 def del_waiting_sub(self, contact_jid, profile_key): | 349 def del_waiting_sub(self, contact_jid, profile_key): |
331 pass | 350 pass |
332 | 351 |
333 def update_entity_data(self, entity_jid, key, value, silent=False, profile_key="@NONE@"): | 352 def update_entity_data( |
353 self, entity_jid, key, value, silent=False, profile_key="@NONE@" | |
354 ): | |
334 self.entities_data.setdefault(entity_jid, {}) | 355 self.entities_data.setdefault(entity_jid, {}) |
335 self.entities_data[entity_jid][key] = value | 356 self.entities_data[entity_jid][key] = value |
336 | 357 |
337 def entity_data_get(self, entity_jid, keys, profile_key): | 358 def entity_data_get(self, entity_jid, keys, profile_key): |
338 result = {} | 359 result = {} |
364 if not args and not kwargs: | 385 if not args and not kwargs: |
365 # defaults values setted for the tests only | 386 # defaults values setted for the tests only |
366 kwargs["subscriptionTo"] = True | 387 kwargs["subscriptionTo"] = True |
367 kwargs["subscriptionFrom"] = True | 388 kwargs["subscriptionFrom"] = True |
368 roster_item = RosterItem(jid, *args, **kwargs) | 389 roster_item = RosterItem(jid, *args, **kwargs) |
369 attrs = {'to': b2s(roster_item.subscriptionTo), 'from': b2s(roster_item.subscriptionFrom), 'ask': b2s(roster_item.pendingOut)} | 390 attrs = { |
391 "to": b2s(roster_item.subscriptionTo), | |
392 "from": b2s(roster_item.subscriptionFrom), | |
393 "ask": b2s(roster_item.pendingOut), | |
394 } | |
370 if roster_item.name: | 395 if roster_item.name: |
371 attrs['name'] = roster_item.name | 396 attrs["name"] = roster_item.name |
372 self.host.bridge.expect_call("contact_new", jid.full(), attrs, roster_item.groups, self.parent.profile) | 397 self.host.bridge.expect_call( |
398 "contact_new", jid.full(), attrs, roster_item.groups, self.parent.profile | |
399 ) | |
373 self._jids[jid] = roster_item | 400 self._jids[jid] = roster_item |
374 self._register_item(roster_item) | 401 self._register_item(roster_item) |
375 | 402 |
376 | 403 |
377 class FakeXmlStream(object): | 404 class FakeXmlStream(object): |
384 """Save the sent messages to compare them later. | 411 """Save the sent messages to compare them later. |
385 | 412 |
386 @param obj (domish.Element, str or unicode): message to send | 413 @param obj (domish.Element, str or unicode): message to send |
387 """ | 414 """ |
388 if not isinstance(obj, domish.Element): | 415 if not isinstance(obj, domish.Element): |
389 assert(isinstance(obj, str) or isinstance(obj, str)) | 416 assert isinstance(obj, str) or isinstance(obj, str) |
390 obj = parseXml(obj) | 417 obj = parseXml(obj) |
391 | 418 |
392 if obj.name == 'iq': | 419 if obj.name == "iq": |
393 # IQ request expects an answer, return the request itself so | 420 # IQ request expects an answer, return the request itself so |
394 # you can check if it has been well built by your plugin. | 421 # you can check if it has been well built by your plugin. |
395 self.iqDeferreds[obj['id']].callback(obj) | 422 self.iqDeferreds[obj["id"]].callback(obj) |
396 | 423 |
397 self.sent.append(obj) | 424 self.sent.append(obj) |
398 return defer.succeed(None) | 425 return defer.succeed(None) |
399 | 426 |
400 def addObserver(self, *argv): | 427 def addObserver(self, *argv): |
422 | 449 |
423 def assert_equal_xml(self, xml, expected, ignore_blank=False): | 450 def assert_equal_xml(self, xml, expected, ignore_blank=False): |
424 def equal_elt(got_elt, exp_elt): | 451 def equal_elt(got_elt, exp_elt): |
425 if ignore_blank: | 452 if ignore_blank: |
426 for elt in got_elt, exp_elt: | 453 for elt in got_elt, exp_elt: |
427 for attr in ('text', 'tail'): | 454 for attr in ("text", "tail"): |
428 value = getattr(elt, attr) | 455 value = getattr(elt, attr) |
429 try: | 456 try: |
430 value = value.strip() or None | 457 value = value.strip() or None |
431 except AttributeError: | 458 except AttributeError: |
432 value = None | 459 value = None |
433 setattr(elt, attr, value) | 460 setattr(elt, attr, value) |
434 if (got_elt.tag != exp_elt.tag): | 461 if got_elt.tag != exp_elt.tag: |
435 print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt)) | 462 print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt)) |
436 print("tag: got [%s] expected: [%s]" % (got_elt.tag, exp_elt.tag)) | 463 print("tag: got [%s] expected: [%s]" % (got_elt.tag, exp_elt.tag)) |
437 return False | 464 return False |
438 if (got_elt.attrib != exp_elt.attrib): | 465 if got_elt.attrib != exp_elt.attrib: |
439 print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt)) | 466 print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt)) |
440 print("attribs: got %s expected %s" % (got_elt.attrib, exp_elt.attrib)) | 467 print("attribs: got %s expected %s" % (got_elt.attrib, exp_elt.attrib)) |
441 return False | 468 return False |
442 if (got_elt.tail != exp_elt.tail or got_elt.text != exp_elt.text): | 469 if got_elt.tail != exp_elt.tail or got_elt.text != exp_elt.text: |
443 print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt)) | 470 print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt)) |
444 print("text: got [%s] expected: [%s]" % (got_elt.text, exp_elt.text)) | 471 print("text: got [%s] expected: [%s]" % (got_elt.text, exp_elt.text)) |
445 print("tail: got [%s] expected: [%s]" % (got_elt.tail, exp_elt.tail)) | 472 print("tail: got [%s] expected: [%s]" % (got_elt.tail, exp_elt.tail)) |
446 return False | 473 return False |
447 if (len(got_elt) != len(exp_elt)): | 474 if len(got_elt) != len(exp_elt): |
448 print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt)) | 475 print("XML are not equals (elt %s/%s):" % (got_elt, exp_elt)) |
449 print("children len: got %d expected: %d" % (len(got_elt), len(exp_elt))) | 476 print("children len: got %d expected: %d" % (len(got_elt), len(exp_elt))) |
450 return False | 477 return False |
451 for idx, child in enumerate(got_elt): | 478 for idx, child in enumerate(got_elt): |
452 if not equal_elt(child, exp_elt[idx]): | 479 if not equal_elt(child, exp_elt[idx]): |
453 return False | 480 return False |
454 return True | 481 return True |
455 | 482 |
456 def remove_blank(xml): | 483 def remove_blank(xml): |
457 lines = [line.strip() for line in re.sub(r'[ \t\r\f\v]+', ' ', xml).split('\n')] | 484 lines = [ |
458 return '\n'.join([line for line in lines if line]) | 485 line.strip() for line in re.sub(r"[ \t\r\f\v]+", " ", xml).split("\n") |
486 ] | |
487 return "\n".join([line for line in lines if line]) | |
459 | 488 |
460 xml_elt = etree.fromstring(remove_blank(xml) if ignore_blank else xml) | 489 xml_elt = etree.fromstring(remove_blank(xml) if ignore_blank else xml) |
461 expected_elt = etree.fromstring(remove_blank(expected) if ignore_blank else expected) | 490 expected_elt = etree.fromstring( |
491 remove_blank(expected) if ignore_blank else expected | |
492 ) | |
462 | 493 |
463 if not equal_elt(xml_elt, expected_elt): | 494 if not equal_elt(xml_elt, expected_elt): |
464 print("---") | 495 print("---") |
465 print("XML are not equals:") | 496 print("XML are not equals:") |
466 print("got:\n-\n%s\n-\n\n" % etree.tostring(xml_elt, encoding='utf-8')) | 497 print("got:\n-\n%s\n-\n\n" % etree.tostring(xml_elt, encoding="utf-8")) |
467 print("was expecting:\n-\n%s\n-\n\n" % etree.tostring(expected_elt, encoding='utf-8')) | 498 print( |
499 "was expecting:\n-\n%s\n-\n\n" | |
500 % etree.tostring(expected_elt, encoding="utf-8") | |
501 ) | |
468 print("---") | 502 print("---") |
469 raise DifferentXMLException | 503 raise DifferentXMLException |
470 | 504 |
471 def assert_equal_unsorted_list(self, a, b, msg): | 505 def assert_equal_unsorted_list(self, a, b, msg): |
472 counter_a = Counter(a) | 506 counter_a = Counter(a) |