comparison libervia/backend/test/test_memory.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/test/test_memory.py@524856bd7b19
children
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT: a jabber client
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from libervia.backend.core.i18n import _
21 from libervia.backend.test import helpers
22 from twisted.trial import unittest
23 import traceback
24 from .constants import Const
25 from xml.dom import minidom
26
27
28 class MemoryTest(unittest.TestCase):
29 def setUp(self):
30 self.host = helpers.FakeSAT()
31
32 def _get_param_xml(self, param="1", security_level=None):
33 """Generate XML for testing parameters
34
35 @param param (str): a subset of "123"
36 @param security_level: security level of the parameters
37 @return (str)
38 """
39
40 def get_param(name):
41 return """
42 <param name="%(param_name)s" label="%(param_label)s" value="true" type="bool" %(security)s/>
43 """ % {
44 "param_name": name,
45 "param_label": _(name),
46 "security": ""
47 if security_level is None
48 else ('security="%d"' % security_level),
49 }
50
51 params = ""
52 if "1" in param:
53 params += get_param(Const.ENABLE_UNIBOX_PARAM)
54 if "2" in param:
55 params += get_param(Const.PARAM_IN_QUOTES)
56 if "3" in param:
57 params += get_param("Dummy param")
58 return """
59 <params>
60 <individual>
61 <category name="%(category_name)s" label="%(category_label)s">
62 %(params)s
63 </category>
64 </individual>
65 </params>
66 """ % {
67 "category_name": Const.COMPOSITION_KEY,
68 "category_label": _(Const.COMPOSITION_KEY),
69 "params": params,
70 }
71
72 def _param_exists(self, param="1", src=None):
73 """
74
75 @param param (str): a character in "12"
76 @param src (DOM element): the top-level element to look in
77 @return: True is the param exists
78 """
79 if param == "1":
80 name = Const.ENABLE_UNIBOX_PARAM
81 else:
82 name = Const.PARAM_IN_QUOTES
83 category = Const.COMPOSITION_KEY
84 if src is None:
85 src = self.host.memory.params.dom.documentElement
86 for type_node in src.childNodes:
87 # when src comes self.host.memory.params.dom, we have here
88 # some "individual" or "general" elements, when it comes
89 # from Memory.get_params we have here a "params" elements
90 if type_node.nodeName not in ("individual", "general", "params"):
91 continue
92 for cat_node in type_node.childNodes:
93 if (
94 cat_node.nodeName != "category"
95 or cat_node.getAttribute("name") != category
96 ):
97 continue
98 for param in cat_node.childNodes:
99 if param.nodeName == "param" and param.getAttribute("name") == name:
100 return True
101 return False
102
103 def assert_param_generic(self, param="1", src=None, exists=True, deferred=False):
104 """
105 @param param (str): a character in "12"
106 @param src (DOM element): the top-level element to look in
107 @param exists (boolean): True to assert the param exists, False to assert it doesn't
108 @param deferred (boolean): True if this method is called from a Deferred callback
109 """
110 msg = (
111 "Expected parameter not found!\n"
112 if exists
113 else "Unexpected parameter found!\n"
114 )
115 if deferred:
116 # in this stack we can see the line where the error came from,
117 # if limit=5, 6 is not enough you can increase the value
118 msg += "\n".join(traceback.format_stack(limit=5 if exists else 6))
119 assertion = self._param_exists(param, src)
120 getattr(self, "assert%s" % exists)(assertion, msg)
121
122 def assert_param_exists(self, param="1", src=None):
123 self.assert_param_generic(param, src, True)
124
125 def assert_param_not_exists(self, param="1", src=None):
126 self.assert_param_generic(param, src, False)
127
128 def assert_param_exists_async(self, src, param="1"):
129 """@param src: a deferred result from Memory.get_params"""
130 self.assert_param_generic(
131 param, minidom.parseString(src.encode("utf-8")), True, True
132 )
133
134 def assert_param_not_exists_async(self, src, param="1"):
135 """@param src: a deferred result from Memory.get_params"""
136 self.assert_param_generic(
137 param, minidom.parseString(src.encode("utf-8")), False, True
138 )
139
140 def _get_params(self, security_limit, app="", profile_key="@NONE@"):
141 """Get the parameters accessible with the given security limit and application name.
142
143 @param security_limit (int): the security limit
144 @param app (str): empty string or "libervia"
145 @param profile_key
146 """
147 if profile_key == "@NONE@":
148 profile_key = "@DEFAULT@"
149 return self.host.memory.params.get_params(security_limit, app, profile_key)
150
151 def test_update_params(self):
152 self.host.memory.reinit()
153 # check if the update works
154 self.host.memory.update_params(self._get_param_xml())
155 self.assert_param_exists()
156 previous = self.host.memory.params.dom.cloneNode(True)
157 # now check if it is really updated and not duplicated
158 self.host.memory.update_params(self._get_param_xml())
159 self.assertEqual(
160 previous.toxml().encode("utf-8"),
161 self.host.memory.params.dom.toxml().encode("utf-8"),
162 )
163
164 self.host.memory.reinit()
165 # check successive updates (without intersection)
166 self.host.memory.update_params(self._get_param_xml("1"))
167 self.assert_param_exists("1")
168 self.assert_param_not_exists("2")
169 self.host.memory.update_params(self._get_param_xml("2"))
170 self.assert_param_exists("1")
171 self.assert_param_exists("2")
172
173 previous = self.host.memory.params.dom.cloneNode(True) # save for later
174
175 self.host.memory.reinit()
176 # check successive updates (with intersection)
177 self.host.memory.update_params(self._get_param_xml("1"))
178 self.assert_param_exists("1")
179 self.assert_param_not_exists("2")
180 self.host.memory.update_params(self._get_param_xml("12"))
181 self.assert_param_exists("1")
182 self.assert_param_exists("2")
183
184 # successive updates with or without intersection should have the same result
185 self.assertEqual(
186 previous.toxml().encode("utf-8"),
187 self.host.memory.params.dom.toxml().encode("utf-8"),
188 )
189
190 self.host.memory.reinit()
191 # one update with two params in a new category
192 self.host.memory.update_params(self._get_param_xml("12"))
193 self.assert_param_exists("1")
194 self.assert_param_exists("2")
195
196 def test_get_params(self):
197 # tests with no security level on the parameter (most secure)
198 params = self._get_param_xml()
199 self.host.memory.reinit()
200 self.host.memory.update_params(params)
201 self._get_params(Const.NO_SECURITY_LIMIT).addCallback(self.assert_param_exists_async)
202 self._get_params(0).addCallback(self.assert_param_not_exists_async)
203 self._get_params(1).addCallback(self.assert_param_not_exists_async)
204 # tests with security level 0 on the parameter (not secure)
205 params = self._get_param_xml(security_level=0)
206 self.host.memory.reinit()
207 self.host.memory.update_params(params)
208 self._get_params(Const.NO_SECURITY_LIMIT).addCallback(self.assert_param_exists_async)
209 self._get_params(0).addCallback(self.assert_param_exists_async)
210 self._get_params(1).addCallback(self.assert_param_exists_async)
211 # tests with security level 1 on the parameter (more secure)
212 params = self._get_param_xml(security_level=1)
213 self.host.memory.reinit()
214 self.host.memory.update_params(params)
215 self._get_params(Const.NO_SECURITY_LIMIT).addCallback(self.assert_param_exists_async)
216 self._get_params(0).addCallback(self.assert_param_not_exists_async)
217 return self._get_params(1).addCallback(self.assert_param_exists_async)
218
219 def test_params_register_app(self):
220 def register(xml, security_limit, app):
221 """
222 @param xml: XML definition of the parameters to be added
223 @param security_limit: -1 means no security, 0 is the maximum security then the higher the less secure
224 @param app: name of the frontend registering the parameters
225 """
226 helpers.mute_logging()
227 self.host.memory.params_register_app(xml, security_limit, app)
228 helpers.unmute_logging()
229
230 # tests with no security level on the parameter (most secure)
231 params = self._get_param_xml()
232 self.host.memory.reinit()
233 register(params, Const.NO_SECURITY_LIMIT, Const.APP_NAME)
234 self.assert_param_exists()
235 self.host.memory.reinit()
236 register(params, 0, Const.APP_NAME)
237 self.assert_param_not_exists()
238 self.host.memory.reinit()
239 register(params, 1, Const.APP_NAME)
240 self.assert_param_not_exists()
241
242 # tests with security level 0 on the parameter (not secure)
243 params = self._get_param_xml(security_level=0)
244 self.host.memory.reinit()
245 register(params, Const.NO_SECURITY_LIMIT, Const.APP_NAME)
246 self.assert_param_exists()
247 self.host.memory.reinit()
248 register(params, 0, Const.APP_NAME)
249 self.assert_param_exists()
250 self.host.memory.reinit()
251 register(params, 1, Const.APP_NAME)
252 self.assert_param_exists()
253
254 # tests with security level 1 on the parameter (more secure)
255 params = self._get_param_xml(security_level=1)
256 self.host.memory.reinit()
257 register(params, Const.NO_SECURITY_LIMIT, Const.APP_NAME)
258 self.assert_param_exists()
259 self.host.memory.reinit()
260 register(params, 0, Const.APP_NAME)
261 self.assert_param_not_exists()
262 self.host.memory.reinit()
263 register(params, 1, Const.APP_NAME)
264 self.assert_param_exists()
265
266 # tests with security level 1 and several parameters being registered
267 params = self._get_param_xml("12", security_level=1)
268 self.host.memory.reinit()
269 register(params, Const.NO_SECURITY_LIMIT, Const.APP_NAME)
270 self.assert_param_exists()
271 self.assert_param_exists("2")
272 self.host.memory.reinit()
273 register(params, 0, Const.APP_NAME)
274 self.assert_param_not_exists()
275 self.assert_param_not_exists("2")
276 self.host.memory.reinit()
277 register(params, 1, Const.APP_NAME)
278 self.assert_param_exists()
279 self.assert_param_exists("2")
280
281 # tests with several parameters being registered in an existing category
282 self.host.memory.reinit()
283 self.host.memory.update_params(self._get_param_xml("3"))
284 register(self._get_param_xml("12"), Const.NO_SECURITY_LIMIT, Const.APP_NAME)
285 self.assert_param_exists()
286 self.assert_param_exists("2")
287 self.host.memory.reinit()
288
289 def test_params_register_app_get_params(self):
290 # test retrieving the parameter for a specific frontend
291 self.host.memory.reinit()
292 params = self._get_param_xml(security_level=1)
293 self.host.memory.params_register_app(params, 1, Const.APP_NAME)
294 self._get_params(1, "").addCallback(self.assert_param_exists_async)
295 self._get_params(1, Const.APP_NAME).addCallback(self.assert_param_exists_async)
296 self._get_params(1, "another_dummy_frontend").addCallback(
297 self.assert_param_not_exists_async
298 )
299
300 # the same with several parameters registered at the same time
301 self.host.memory.reinit()
302 params = self._get_param_xml("12", security_level=0)
303 self.host.memory.params_register_app(params, 5, Const.APP_NAME)
304 self._get_params(5, "").addCallback(self.assert_param_exists_async)
305 self._get_params(5, "").addCallback(self.assert_param_exists_async, "2")
306 self._get_params(5, Const.APP_NAME).addCallback(self.assert_param_exists_async)
307 self._get_params(5, Const.APP_NAME).addCallback(self.assert_param_exists_async, "2")
308 self._get_params(5, "another_dummy_frontend").addCallback(
309 self.assert_param_not_exists_async
310 )
311 return self._get_params(5, "another_dummy_frontend").addCallback(
312 self.assert_param_not_exists_async, "2"
313 )