Mercurial > libervia-backend
view libervia/backend/test/test_memory.py @ 4151:18026ce0819c
core (xmpp): message reception workflow refactoring:
- Call methods from a root async one instead of using Deferred callbacks chain.
- Use a queue to be sure to process messages in order.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 22 Nov 2023 14:50:35 +0100 |
parents | 4b842c1fb686 |
children | 0d7bb4df2343 |
line wrap: on
line source
#!/usr/bin/env python3 # SAT: a jabber client # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from libervia.backend.core.i18n import _ from libervia.backend.test import helpers from twisted.trial import unittest import traceback from .constants import Const from xml.dom import minidom class MemoryTest(unittest.TestCase): def setUp(self): self.host = helpers.FakeSAT() def _get_param_xml(self, param="1", security_level=None): """Generate XML for testing parameters @param param (str): a subset of "123" @param security_level: security level of the parameters @return (str) """ def get_param(name): return """ <param name="%(param_name)s" label="%(param_label)s" value="true" type="bool" %(security)s/> """ % { "param_name": name, "param_label": _(name), "security": "" if security_level is None else ('security="%d"' % security_level), } params = "" if "1" in param: params += get_param(Const.ENABLE_UNIBOX_PARAM) if "2" in param: params += get_param(Const.PARAM_IN_QUOTES) if "3" in param: params += get_param("Dummy param") return """ <params> <individual> <category name="%(category_name)s" label="%(category_label)s"> %(params)s </category> </individual> </params> """ % { "category_name": Const.COMPOSITION_KEY, "category_label": _(Const.COMPOSITION_KEY), "params": params, } def _param_exists(self, param="1", src=None): """ @param param (str): a character in "12" @param src (DOM element): the top-level element to look in @return: True is the param exists """ if param == "1": name = Const.ENABLE_UNIBOX_PARAM else: name = Const.PARAM_IN_QUOTES category = Const.COMPOSITION_KEY if src is None: src = self.host.memory.params.dom.documentElement for type_node in src.childNodes: # when src comes self.host.memory.params.dom, we have here # some "individual" or "general" elements, when it comes # from Memory.get_params we have here a "params" elements if type_node.nodeName not in ("individual", "general", "params"): continue for cat_node in type_node.childNodes: if ( cat_node.nodeName != "category" or cat_node.getAttribute("name") != category ): continue for param in cat_node.childNodes: if param.nodeName == "param" and param.getAttribute("name") == name: return True return False def assert_param_generic(self, param="1", src=None, exists=True, deferred=False): """ @param param (str): a character in "12" @param src (DOM element): the top-level element to look in @param exists (boolean): True to assert the param exists, False to assert it doesn't @param deferred (boolean): True if this method is called from a Deferred callback """ msg = ( "Expected parameter not found!\n" if exists else "Unexpected parameter found!\n" ) if deferred: # in this stack we can see the line where the error came from, # if limit=5, 6 is not enough you can increase the value msg += "\n".join(traceback.format_stack(limit=5 if exists else 6)) assertion = self._param_exists(param, src) getattr(self, "assert%s" % exists)(assertion, msg) def assert_param_exists(self, param="1", src=None): self.assert_param_generic(param, src, True) def assert_param_not_exists(self, param="1", src=None): self.assert_param_generic(param, src, False) def assert_param_exists_async(self, src, param="1"): """@param src: a deferred result from Memory.get_params""" self.assert_param_generic( param, minidom.parseString(src.encode("utf-8")), True, True ) def assert_param_not_exists_async(self, src, param="1"): """@param src: a deferred result from Memory.get_params""" self.assert_param_generic( param, minidom.parseString(src.encode("utf-8")), False, True ) def _get_params(self, security_limit, app="", profile_key="@NONE@"): """Get the parameters accessible with the given security limit and application name. @param security_limit (int): the security limit @param app (str): empty string or "libervia" @param profile_key """ if profile_key == "@NONE@": profile_key = "@DEFAULT@" return self.host.memory.params.get_params(security_limit, app, profile_key) def test_update_params(self): self.host.memory.reinit() # check if the update works self.host.memory.update_params(self._get_param_xml()) self.assert_param_exists() previous = self.host.memory.params.dom.cloneNode(True) # now check if it is really updated and not duplicated self.host.memory.update_params(self._get_param_xml()) self.assertEqual( previous.toxml().encode("utf-8"), self.host.memory.params.dom.toxml().encode("utf-8"), ) self.host.memory.reinit() # check successive updates (without intersection) self.host.memory.update_params(self._get_param_xml("1")) self.assert_param_exists("1") self.assert_param_not_exists("2") self.host.memory.update_params(self._get_param_xml("2")) self.assert_param_exists("1") self.assert_param_exists("2") previous = self.host.memory.params.dom.cloneNode(True) # save for later self.host.memory.reinit() # check successive updates (with intersection) self.host.memory.update_params(self._get_param_xml("1")) self.assert_param_exists("1") self.assert_param_not_exists("2") self.host.memory.update_params(self._get_param_xml("12")) self.assert_param_exists("1") self.assert_param_exists("2") # successive updates with or without intersection should have the same result self.assertEqual( previous.toxml().encode("utf-8"), self.host.memory.params.dom.toxml().encode("utf-8"), ) self.host.memory.reinit() # one update with two params in a new category self.host.memory.update_params(self._get_param_xml("12")) self.assert_param_exists("1") self.assert_param_exists("2") def test_get_params(self): # tests with no security level on the parameter (most secure) params = self._get_param_xml() self.host.memory.reinit() self.host.memory.update_params(params) self._get_params(Const.NO_SECURITY_LIMIT).addCallback(self.assert_param_exists_async) self._get_params(0).addCallback(self.assert_param_not_exists_async) self._get_params(1).addCallback(self.assert_param_not_exists_async) # tests with security level 0 on the parameter (not secure) params = self._get_param_xml(security_level=0) self.host.memory.reinit() self.host.memory.update_params(params) self._get_params(Const.NO_SECURITY_LIMIT).addCallback(self.assert_param_exists_async) self._get_params(0).addCallback(self.assert_param_exists_async) self._get_params(1).addCallback(self.assert_param_exists_async) # tests with security level 1 on the parameter (more secure) params = self._get_param_xml(security_level=1) self.host.memory.reinit() self.host.memory.update_params(params) self._get_params(Const.NO_SECURITY_LIMIT).addCallback(self.assert_param_exists_async) self._get_params(0).addCallback(self.assert_param_not_exists_async) return self._get_params(1).addCallback(self.assert_param_exists_async) def test_params_register_app(self): def register(xml, security_limit, app): """ @param xml: XML definition of the parameters to be added @param security_limit: -1 means no security, 0 is the maximum security then the higher the less secure @param app: name of the frontend registering the parameters """ helpers.mute_logging() self.host.memory.params_register_app(xml, security_limit, app) helpers.unmute_logging() # tests with no security level on the parameter (most secure) params = self._get_param_xml() self.host.memory.reinit() register(params, Const.NO_SECURITY_LIMIT, Const.APP_NAME) self.assert_param_exists() self.host.memory.reinit() register(params, 0, Const.APP_NAME) self.assert_param_not_exists() self.host.memory.reinit() register(params, 1, Const.APP_NAME) self.assert_param_not_exists() # tests with security level 0 on the parameter (not secure) params = self._get_param_xml(security_level=0) self.host.memory.reinit() register(params, Const.NO_SECURITY_LIMIT, Const.APP_NAME) self.assert_param_exists() self.host.memory.reinit() register(params, 0, Const.APP_NAME) self.assert_param_exists() self.host.memory.reinit() register(params, 1, Const.APP_NAME) self.assert_param_exists() # tests with security level 1 on the parameter (more secure) params = self._get_param_xml(security_level=1) self.host.memory.reinit() register(params, Const.NO_SECURITY_LIMIT, Const.APP_NAME) self.assert_param_exists() self.host.memory.reinit() register(params, 0, Const.APP_NAME) self.assert_param_not_exists() self.host.memory.reinit() register(params, 1, Const.APP_NAME) self.assert_param_exists() # tests with security level 1 and several parameters being registered params = self._get_param_xml("12", security_level=1) self.host.memory.reinit() register(params, Const.NO_SECURITY_LIMIT, Const.APP_NAME) self.assert_param_exists() self.assert_param_exists("2") self.host.memory.reinit() register(params, 0, Const.APP_NAME) self.assert_param_not_exists() self.assert_param_not_exists("2") self.host.memory.reinit() register(params, 1, Const.APP_NAME) self.assert_param_exists() self.assert_param_exists("2") # tests with several parameters being registered in an existing category self.host.memory.reinit() self.host.memory.update_params(self._get_param_xml("3")) register(self._get_param_xml("12"), Const.NO_SECURITY_LIMIT, Const.APP_NAME) self.assert_param_exists() self.assert_param_exists("2") self.host.memory.reinit() def test_params_register_app_get_params(self): # test retrieving the parameter for a specific frontend self.host.memory.reinit() params = self._get_param_xml(security_level=1) self.host.memory.params_register_app(params, 1, Const.APP_NAME) self._get_params(1, "").addCallback(self.assert_param_exists_async) self._get_params(1, Const.APP_NAME).addCallback(self.assert_param_exists_async) self._get_params(1, "another_dummy_frontend").addCallback( self.assert_param_not_exists_async ) # the same with several parameters registered at the same time self.host.memory.reinit() params = self._get_param_xml("12", security_level=0) self.host.memory.params_register_app(params, 5, Const.APP_NAME) self._get_params(5, "").addCallback(self.assert_param_exists_async) self._get_params(5, "").addCallback(self.assert_param_exists_async, "2") self._get_params(5, Const.APP_NAME).addCallback(self.assert_param_exists_async) self._get_params(5, Const.APP_NAME).addCallback(self.assert_param_exists_async, "2") self._get_params(5, "another_dummy_frontend").addCallback( self.assert_param_not_exists_async ) return self._get_params(5, "another_dummy_frontend").addCallback( self.assert_param_not_exists_async, "2" )