Mercurial > libervia-backend
comparison tests/e2e/conftest.py @ 3415:814e118d9ef3
tests: end-2-end tests first draft:
- e2e tests are launched inside the new docker e2e test environment
- `run_e2e.py` launch the docker container, mount the current code base in it, launch the
e2e tests and print report in real time
- `conftest.py` are pytest fixtures managing many things such as account creation, fake files
management, JSON or Domish.Element parsing, fake editor, etc.
- `test_jp.py` are end-to-end test done with `jp`. `sh` library is used to make tests
writting as user-friendly as possible. The `SAT_TEST_ENV_E2E` environment variable is
checked, and tests will be skipped if it's not set.
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 12 Nov 2020 14:53:16 +0100 |
parents | |
children | d4558f3cbf13 |
comparison
equal
deleted
inserted
replaced
3414:ffe7a6d6018a | 3415:814e118d9ef3 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # SàT: an XMPP client | |
4 # Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 import sys | |
20 import os | |
21 import tempfile | |
22 import string | |
23 import hashlib | |
24 import random | |
25 from pathlib import Path | |
26 from textwrap import dedent | |
27 import json | |
28 import pytest | |
29 from sh import jp | |
30 | |
31 | |
32 class JpJson: | |
33 """jp like commands parsing result as JSON""" | |
34 | |
35 def __init__(self): | |
36 self.subcommands = [] | |
37 | |
38 def __call__(self, *args, **kwargs): | |
39 args = self.subcommands + list(args) | |
40 self.subcommands.clear() | |
41 kwargs['output'] = 'json_raw' | |
42 kwargs['_tty_out'] = False | |
43 cmd = jp(*args, **kwargs) | |
44 return json.loads(cmd.stdout) | |
45 | |
46 def __getattr__(self, name): | |
47 if name.startswith('_'): | |
48 # no jp subcommand starts with a "_", | |
49 # and pytest uses some attributes with this name scheme | |
50 return super().__getattr__(name) | |
51 self.subcommands.append(name) | |
52 return self | |
53 | |
54 | |
55 class JpElt(JpJson): | |
56 """jp like commands parsing result as domishElement""" | |
57 | |
58 def __init__(self): | |
59 super().__init__() | |
60 from sat.tools.xml_tools import ElementParser | |
61 self.parser = ElementParser() | |
62 | |
63 def __call__(self, *args, **kwargs): | |
64 args = self.subcommands + list(args) | |
65 self.subcommands.clear() | |
66 kwargs['output'] = 'xml_raw' | |
67 kwargs['_tty_out'] = False | |
68 cmd = jp(*args, **kwargs) | |
69 return self.parser(cmd.stdout.decode().strip()) | |
70 | |
71 | |
72 class Editor: | |
73 | |
74 def __init__(self): | |
75 # temporary directory will be deleted Automatically when this object will be | |
76 # destroyed | |
77 self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="sat_e2e_test_editor_") | |
78 self.tmp_dir_path = Path(self.tmp_dir_obj.name) | |
79 if not sys.executable: | |
80 raise Exception("Can't find python executable") | |
81 self.editor_set = False | |
82 self.editor_path = self.tmp_dir_path / "editor.py" | |
83 self.ori_content_path = self.tmp_dir_path / "original_content" | |
84 self.new_content_path = self.tmp_dir_path / "new_content" | |
85 self.base_script = dedent(f"""\ | |
86 #!{sys.executable} | |
87 import sys | |
88 | |
89 def content_filter(content): | |
90 return {{content_filter}} | |
91 | |
92 with open(sys.argv[1], 'r+') as f: | |
93 original_content = f.read() | |
94 f.seek(0) | |
95 new_content = content_filter(original_content) | |
96 f.write(new_content) | |
97 f.truncate() | |
98 | |
99 with open("{self.ori_content_path}", "w") as f: | |
100 f.write(original_content) | |
101 | |
102 with open("{self.new_content_path}", "w") as f: | |
103 f.write(new_content) | |
104 """ | |
105 ) | |
106 self._env = os.environ.copy() | |
107 self._env["EDITOR"] = str(self.editor_path) | |
108 | |
109 def set_filter(self, content_filter: str = "content"): | |
110 """Python code to modify original content | |
111 | |
112 The code will be applied to content received by editor. | |
113 The original content received by editor is in the "content" variable. | |
114 If filter_ is not specified, original content is written unmodified. | |
115 Code must be on a single line. | |
116 """ | |
117 if '\n' in content_filter: | |
118 raise ValueError("new lines can't be used in filter_") | |
119 with self.editor_path.open('w') as f: | |
120 f.write(self.base_script.format(content_filter=content_filter)) | |
121 self.editor_path.chmod(0o700) | |
122 self.editor_set = True | |
123 | |
124 @property | |
125 def env(self): | |
126 """Get environment variable with the editor set""" | |
127 if not self.editor_set: | |
128 self.set_filter() | |
129 return self._env | |
130 | |
131 @property | |
132 def original_content(self): | |
133 """Last content received by editor, before any modification | |
134 | |
135 returns None if editor has not yet been called | |
136 """ | |
137 try: | |
138 with self.ori_content_path.open() as f: | |
139 return f.read() | |
140 except FileNotFoundError: | |
141 return None | |
142 | |
143 @property | |
144 def new_content(self): | |
145 """Last content writen by editor | |
146 | |
147 This is the final content, after filter has been applied to original content | |
148 returns None if editor has not yet been called | |
149 """ | |
150 try: | |
151 with self.new_content_path.open() as f: | |
152 return f.read() | |
153 except FileNotFoundError: | |
154 return None | |
155 | |
156 | |
157 class FakeFile: | |
158 ALPHABET = f"{string.ascii_letters}{string.digits}_" | |
159 BUF_SIZE = 65535 | |
160 | |
161 def __init__(self): | |
162 self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="sat_e2e_test_files_") | |
163 self.tmp_dir_path = Path(self.tmp_dir_obj.name) | |
164 self.source_files = self.tmp_dir_path / "source" | |
165 self.source_files.mkdir() | |
166 self.dest_files = self.tmp_dir_path / "dest" | |
167 self.dest_files.mkdir() | |
168 self.hashes = {} | |
169 | |
170 @property | |
171 def dest_path(self): | |
172 """Path of a directory where files can be received | |
173 | |
174 The directory will be deleted at the end of session. | |
175 Files from other test can be present, be sure to create a unique subdirectory or | |
176 to use a unique destination file name | |
177 """ | |
178 return self.dest_files | |
179 | |
180 def new_dest_file(self) -> Path: | |
181 """Path to a randomly named destination file | |
182 | |
183 The file will be in self.dest_path. | |
184 The file should be deleted after use. If not, it will be deleted at the end of | |
185 session with the whole temporary test files directory. | |
186 """ | |
187 name = ''.join(random.choices(self.ALPHABET, k=8)) | |
188 return self.dest_files / name | |
189 | |
190 def size(self, size: int, use_cache: bool = True): | |
191 """Create a file of requested size, and returns its path | |
192 | |
193 @param use_cache: if True and a file of this size already exists, it is re-used | |
194 """ | |
195 dest_path = self.source_files / str(size) | |
196 if not use_cache or not dest_path.exists(): | |
197 hash_ = hashlib.sha256() | |
198 remaining = size | |
199 with dest_path.open('wb') as f: | |
200 while remaining: | |
201 if remaining > self.BUF_SIZE: | |
202 to_get = self.BUF_SIZE | |
203 else: | |
204 to_get = remaining | |
205 buf = os.urandom(to_get) | |
206 f.write(buf) | |
207 hash_.update(buf) | |
208 remaining -= to_get | |
209 self.hashes[dest_path] = hash_.hexdigest() | |
210 return dest_path | |
211 | |
212 def get_source_hash(self, source_file: Path) -> str: | |
213 """Retrieve hash calculated for a generated source file""" | |
214 return self.hashes[source_file] | |
215 | |
216 def get_dest_hash(self, dest_file: Path) -> str: | |
217 """Calculate hash of file at given path""" | |
218 hash_ = hashlib.sha256() | |
219 with dest_file.open('rb') as f: | |
220 while True: | |
221 buf = f.read(self.BUF_SIZE) | |
222 if not buf: | |
223 break | |
224 hash_.update(buf) | |
225 return hash_.hexdigest() | |
226 | |
227 | |
228 @pytest.fixture(scope="session") | |
229 def jp_json(): | |
230 """Run jp with "json_raw" output, and returns the parsed value""" | |
231 return JpJson() | |
232 | |
233 | |
234 @pytest.fixture(scope="session") | |
235 def jp_elt(): | |
236 """Run jp with "xml_raw" output, and returns the parsed value""" | |
237 return JpElt() | |
238 | |
239 | |
240 @pytest.fixture(scope="session") | |
241 def test_profiles(): | |
242 """Test accounts created using in-band registration | |
243 | |
244 They will be removed at the end of session. | |
245 The number of account per servers is set in the "accounts_by_servers" dict. | |
246 Jids are in the form "account[x]@server[y].test". | |
247 The profiles used are in the form "account[x]" for server1.test, and | |
248 "account[x]_s[y]" for other servers. | |
249 Password is "test" for all profiles and XMPP accounts. | |
250 "account1" is connected and set as default profile | |
251 Profiles created are returned as a tuple | |
252 """ | |
253 profiles = [] | |
254 nb_servers = 3 | |
255 accounts_by_servers = { | |
256 1: 1, | |
257 2: 1, | |
258 3: 0, | |
259 } | |
260 for server_idx in range(1, nb_servers+1): | |
261 account_stop = accounts_by_servers[server_idx] + 1 | |
262 for account_idx in range(1, account_stop): | |
263 profile_suff = f"_s{server_idx}" if server_idx>1 else "" | |
264 profile = f"account{account_idx}{profile_suff}" | |
265 profiles.append(profile) | |
266 jp.account.create( | |
267 f"account{account_idx}@server{server_idx}.test", | |
268 "test", | |
269 profile=profile, | |
270 host=f"server{server_idx}.test" | |
271 ) | |
272 jp.profile.modify(profile="account1", default=True, connect=True) | |
273 jp.profile.connect(profile="account1_s2", connect=True) | |
274 yield tuple(profiles) | |
275 for profile in profiles: | |
276 jp.account.delete(profile=profile, connect=True, force=True) | |
277 jp.profile.delete(profile, force=True) | |
278 | |
279 | |
280 @pytest.fixture(scope="class") | |
281 def pubsub_nodes(test_profiles): | |
282 """Create 2 testing nodes | |
283 | |
284 Both nodes will be created with "account1" profile, named "test" and have and "open" | |
285 access model. | |
286 One node will account1's PEP, the other one on pubsub.server1.test. | |
287 """ | |
288 jp.pubsub.node.create( | |
289 "-f", "access_model", "open", | |
290 node="test", | |
291 profile="account1", connect=True | |
292 ) | |
293 jp.pubsub.node.create( | |
294 "-f", "access_model", "open", | |
295 service="pubsub.server1.test", node="test", | |
296 profile="account1" | |
297 ) | |
298 yield | |
299 jp.pubsub.node.delete( | |
300 node="test", | |
301 profile="account1", connect=True, | |
302 force=True | |
303 ) | |
304 jp.pubsub.node.delete( | |
305 service="pubsub.server1.test", node="test", | |
306 profile="account1", | |
307 force=True | |
308 ) | |
309 | |
310 | |
311 @pytest.fixture() | |
312 def editor(): | |
313 """Create a fake editor to automatise edition from CLI""" | |
314 return Editor() | |
315 | |
316 | |
317 @pytest.fixture(scope="session") | |
318 def fake_file(): | |
319 """Manage dummy files creation and destination path""" | |
320 return FakeFile() |