Mercurial > libervia-backend
comparison libervia/backend/test/test_memory.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/test/test_memory.py@524856bd7b19 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SAT: a jabber client | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 | |
7 # This program is free software: you can redistribute it and/or modify | |
8 # it under the terms of the GNU Affero General Public License as published by | |
9 # the Free Software Foundation, either version 3 of the License, or | |
10 # (at your option) any later version. | |
11 | |
12 # This program is distributed in the hope that it will be useful, | |
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
15 # GNU Affero General Public License for more details. | |
16 | |
17 # You should have received a copy of the GNU Affero General Public License | |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
19 | |
20 from libervia.backend.core.i18n import _ | |
21 from libervia.backend.test import helpers | |
22 from twisted.trial import unittest | |
23 import traceback | |
24 from .constants import Const | |
25 from xml.dom import minidom | |
26 | |
27 | |
28 class MemoryTest(unittest.TestCase): | |
def setUp(self):
    """Instantiate helpers.FakeSAT and keep it as self.host for the test."""
    fake_host = helpers.FakeSAT()
    self.host = fake_host
31 | |
def _get_param_xml(self, param="1", security_level=None):
    """Build the XML definition of one category holding the test parameters.

    @param param (str): a subset of "123", each digit selecting one parameter:
        "1" is Const.ENABLE_UNIBOX_PARAM, "2" is Const.PARAM_IN_QUOTES and
        "3" is the literal "Dummy param"
    @param security_level: security level of the parameters, the "security"
        attribute is left out when it is None
    @return (str): a <params> document with one <individual> category named
        Const.COMPOSITION_KEY
    """
    param_template = """
    <param name="%(param_name)s" label="%(param_label)s" value="true" type="bool" %(security)s/>
    """
    document_template = """
    <params>
    <individual>
    <category name="%(category_name)s" label="%(category_label)s">
        %(params)s
    </category>
    </individual>
    </params>
    """

    def render(name):
        # the security attribute is only formatted when a parameter is
        # actually rendered
        if security_level is None:
            security_attr = ""
        else:
            security_attr = 'security="%d"' % security_level
        return param_template % dict(
            param_name=name,
            param_label=_(name),
            security=security_attr,
        )

    fragments = []
    if "1" in param:
        fragments.append(render(Const.ENABLE_UNIBOX_PARAM))
    if "2" in param:
        fragments.append(render(Const.PARAM_IN_QUOTES))
    if "3" in param:
        fragments.append(render("Dummy param"))

    return document_template % dict(
        category_name=Const.COMPOSITION_KEY,
        category_label=_(Const.COMPOSITION_KEY),
        params="".join(fragments),
    )
71 | |
def _param_exists(self, param="1", src=None):
    """Tell whether one of the test parameters is present in a DOM tree.

    @param param (str): a character in "12"; "1" looks for
        Const.ENABLE_UNIBOX_PARAM, any other value for Const.PARAM_IN_QUOTES
    @param src (DOM element): the top-level element to look in, when None the
        document element of self.host.memory.params.dom is used
    @return: True if the param exists
    """
    wanted_name = (
        Const.ENABLE_UNIBOX_PARAM if param == "1" else Const.PARAM_IN_QUOTES
    )
    wanted_category = Const.COMPOSITION_KEY
    root = self.host.memory.params.dom.documentElement if src is None else src
    # when the tree comes from self.host.memory.params.dom its children are
    # "individual" or "general" elements, when it comes from
    # Memory.get_params the child is a "params" element
    containers = ("individual", "general", "params")
    for type_node in root.childNodes:
        if type_node.nodeName in containers:
            for cat_node in type_node.childNodes:
                # nodeName is tested first so that getAttribute is only
                # called on "category" nodes
                in_category = (
                    cat_node.nodeName == "category"
                    and cat_node.getAttribute("name") == wanted_category
                )
                if in_category and any(
                    node.nodeName == "param"
                    and node.getAttribute("name") == wanted_name
                    for node in cat_node.childNodes
                ):
                    return True
    return False
102 | |
def assert_param_generic(self, param="1", src=None, exists=True, deferred=False):
    """Assert that a test parameter is present in, or absent from, a DOM tree.

    @param param (str): a character in "12"
    @param src (DOM element): the top-level element to look in
    @param exists (boolean): True to assert the param exists, False to assert it doesn't
    @param deferred (boolean): True if this method is called from a Deferred callback
    """
    # the assertion is selected explicitly rather than by building its name
    # from str(exists), so any truthy/falsy value is accepted
    if exists:
        msg = "Expected parameter not found!\n"
        check = self.assertTrue
    else:
        msg = "Unexpected parameter found!\n"
        check = self.assertFalse
    if deferred:
        # in this stack we can see the line where the error came from,
        # if limit=5, 6 is not enough you can increase the value
        msg += "\n".join(traceback.format_stack(limit=5 if exists else 6))
    check(self._param_exists(param, src), msg)
121 | |
def assert_param_exists(self, param="1", src=None):
    """Assert that the parameter selected by param is found in src.

    @param param (str): a character in "12"
    @param src (DOM element): the top-level element to look in
    """
    self.assert_param_generic(param=param, src=src, exists=True)
124 | |
def assert_param_not_exists(self, param="1", src=None):
    """Assert that the parameter selected by param is not found in src.

    @param param (str): a character in "12"
    @param src (DOM element): the top-level element to look in
    """
    self.assert_param_generic(param=param, src=src, exists=False)
127 | |
def assert_param_exists_async(self, src, param="1"):
    """Deferred callback asserting that the parameter is in the result.

    @param src: a deferred result from Memory.get_params
    @param param (str): a character in "12"
    """
    parsed = minidom.parseString(src.encode("utf-8"))
    self.assert_param_generic(param, parsed, exists=True, deferred=True)
133 | |
def assert_param_not_exists_async(self, src, param="1"):
    """Deferred callback asserting that the parameter is not in the result.

    @param src: a deferred result from Memory.get_params
    @param param (str): a character in "12"
    """
    parsed = minidom.parseString(src.encode("utf-8"))
    self.assert_param_generic(param, parsed, exists=False, deferred=True)
139 | |
def _get_params(self, security_limit, app="", profile_key="@NONE@"):
    """Get the parameters accessible with the given security limit and application name.

    @param security_limit (int): the security limit
    @param app (str): empty string or "libervia"
    @param profile_key: forwarded as is, except "@NONE@" which is replaced
        by "@DEFAULT@"
    @return: whatever self.host.memory.params.get_params returns
    """
    effective_profile = "@DEFAULT@" if profile_key == "@NONE@" else profile_key
    params = self.host.memory.params
    return params.get_params(security_limit, app, effective_profile)
150 | |
def test_update_params(self):
    """Check that Memory.update_params adds parameters without duplicating them."""

    def load(selection="1"):
        self.host.memory.update_params(self._get_param_xml(selection))

    def serialized(dom):
        return dom.toxml().encode("utf-8")

    # check if the update works
    self.host.memory.reinit()
    load()
    self.assert_param_exists()
    previous = self.host.memory.params.dom.cloneNode(True)
    # now check if it is really updated and not duplicated
    load()
    self.assertEqual(
        serialized(previous), serialized(self.host.memory.params.dom)
    )

    # check successive updates: "1" then "2" (without intersection), then
    # "1" then "12" (with intersection)
    for follow_up in ("2", "12"):
        self.host.memory.reinit()
        load("1")
        self.assert_param_exists("1")
        self.assert_param_not_exists("2")
        load(follow_up)
        self.assert_param_exists("1")
        self.assert_param_exists("2")
        if follow_up == "2":
            # save the result of the run without intersection for later
            previous = self.host.memory.params.dom.cloneNode(True)

    # successive updates with or without intersection should have the same result
    self.assertEqual(
        serialized(previous), serialized(self.host.memory.params.dom)
    )

    # one update with two params in a new category
    self.host.memory.reinit()
    load("12")
    self.assert_param_exists("1")
    self.assert_param_exists("2")
195 | |
def test_get_params(self):
    """Check which parameters get_params returns for each security limit.

    For each security level set on the parameter, the parameters are
    requested with Const.NO_SECURITY_LIMIT, 0 and 1 and the presence of the
    parameter in the result is checked.

    @return (Deferred): gathers every get_params request made by the test
    """
    # twisted is already a dependency of this module (twisted.trial)
    from twisted.internet import defer

    found = self.assert_param_exists_async
    missing = self.assert_param_not_exists_async
    limits = (Const.NO_SECURITY_LIMIT, 0, 1)
    scenarios = (
        # tests with no security level on the parameter (most secure)
        ({}, (found, missing, missing)),
        # tests with security level 0 on the parameter (not secure)
        ({"security_level": 0}, (found, found, found)),
        # tests with security level 1 on the parameter (more secure)
        ({"security_level": 1}, (found, missing, found)),
    )

    pending = []
    for xml_kwargs, expectations in scenarios:
        params = self._get_param_xml(**xml_kwargs)
        self.host.memory.reinit()
        self.host.memory.update_params(params)
        for security_limit, callback in zip(limits, expectations):
            pending.append(self._get_params(security_limit).addCallback(callback))

    # every Deferred is handed back to trial so that a failing assertion in
    # any of the callbacks fails this test, not only one in the last request
    return defer.gatherResults(pending, consumeErrors=True)
218 | |
def test_params_register_app(self):
    """Check which parameters params_register_app keeps for each security limit."""

    def register(xml, security_limit, app):
        """
        @param xml: XML definition of the parameters to be added
        @param security_limit: -1 means no security, 0 is the maximum security then the higher the less secure
        @param app: name of the frontend registering the parameters
        """
        helpers.mute_logging()
        try:
            self.host.memory.params_register_app(xml, security_limit, app)
        finally:
            # logging is restored even when the registration raises, so a
            # failure here does not leave logging muted afterwards
            helpers.unmute_logging()

    no_limit = Const.NO_SECURITY_LIMIT
    # (selected params, security level on the params,
    #  ((security limit used to register, params expected to exist), ...))
    scenarios = (
        # tests with no security level on the parameter (most secure)
        ("1", None, ((no_limit, True), (0, False), (1, False))),
        # tests with security level 0 on the parameter (not secure)
        ("1", 0, ((no_limit, True), (0, True), (1, True))),
        # tests with security level 1 on the parameter (more secure)
        ("1", 1, ((no_limit, True), (0, False), (1, True))),
        # tests with security level 1 and several parameters being registered
        ("12", 1, ((no_limit, True), (0, False), (1, True))),
    )
    for selection, security_level, outcomes in scenarios:
        params = self._get_param_xml(selection, security_level=security_level)
        for security_limit, expected in outcomes:
            self.host.memory.reinit()
            register(params, security_limit, Const.APP_NAME)
            if expected:
                check = self.assert_param_exists
            else:
                check = self.assert_param_not_exists
            for digit in selection:
                check(digit)

    # tests with several parameters being registered in an existing category
    self.host.memory.reinit()
    self.host.memory.update_params(self._get_param_xml("3"))
    register(self._get_param_xml("12"), Const.NO_SECURITY_LIMIT, Const.APP_NAME)
    self.assert_param_exists()
    self.assert_param_exists("2")
    self.host.memory.reinit()
288 | |
def test_params_register_app_get_params(self):
    """Check that parameters registered for a frontend are returned for it.

    The parameters are registered for Const.APP_NAME, then requested with an
    empty application name, with Const.APP_NAME and with another frontend
    name.

    @return (Deferred): gathers every get_params request made by the test
    """
    # twisted is already a dependency of this module (twisted.trial)
    from twisted.internet import defer

    found = self.assert_param_exists_async
    missing = self.assert_param_not_exists_async
    pending = []

    def expect(security_limit, app, callback, *callback_args):
        request = self._get_params(security_limit, app)
        pending.append(request.addCallback(callback, *callback_args))

    # test retrieving the parameter for a specific frontend
    self.host.memory.reinit()
    params = self._get_param_xml(security_level=1)
    self.host.memory.params_register_app(params, 1, Const.APP_NAME)
    expect(1, "", found)
    expect(1, Const.APP_NAME, found)
    expect(1, "another_dummy_frontend", missing)

    # the same with several parameters registered at the same time
    self.host.memory.reinit()
    params = self._get_param_xml("12", security_level=0)
    self.host.memory.params_register_app(params, 5, Const.APP_NAME)
    expect(5, "", found)
    expect(5, "", found, "2")
    expect(5, Const.APP_NAME, found)
    expect(5, Const.APP_NAME, found, "2")
    expect(5, "another_dummy_frontend", missing)
    expect(5, "another_dummy_frontend", missing, "2")

    # every Deferred is handed back to trial so that a failing assertion in
    # any of the callbacks fails this test, not only one in the last request
    return defer.gatherResults(pending, consumeErrors=True)