3415
|
1 #!/usr/bin/env python3 |
|
2 |
|
3 # SàT: an XMPP client |
|
4 # Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) |
|
5 |
|
6 # This program is free software: you can redistribute it and/or modify |
|
7 # it under the terms of the GNU Affero General Public License as published by |
|
8 # the Free Software Foundation, either version 3 of the License, or |
|
9 # (at your option) any later version. |
|
10 |
|
11 # This program is distributed in the hope that it will be useful, |
|
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
14 # GNU Affero General Public License for more details. |
|
15 |
|
16 # You should have received a copy of the GNU Affero General Public License |
|
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
18 |
|
19 import sys |
|
20 import os |
|
21 import tempfile |
|
22 import string |
|
23 import hashlib |
|
24 import random |
|
25 from pathlib import Path |
|
26 from textwrap import dedent |
|
27 import json |
|
28 import pytest |
|
29 from sh import jp |
|
30 |
|
31 |
|
32 class JpJson: |
|
33 """jp like commands parsing result as JSON""" |
|
34 |
|
35 def __init__(self): |
|
36 self.subcommands = [] |
|
37 |
|
38 def __call__(self, *args, **kwargs): |
|
39 args = self.subcommands + list(args) |
|
40 self.subcommands.clear() |
|
41 kwargs['output'] = 'json_raw' |
|
42 kwargs['_tty_out'] = False |
|
43 cmd = jp(*args, **kwargs) |
|
44 return json.loads(cmd.stdout) |
|
45 |
|
46 def __getattr__(self, name): |
|
47 if name.startswith('_'): |
|
48 # no jp subcommand starts with a "_", |
|
49 # and pytest uses some attributes with this name scheme |
|
50 return super().__getattr__(name) |
|
51 self.subcommands.append(name) |
|
52 return self |
|
53 |
|
54 |
|
55 class JpElt(JpJson): |
|
56 """jp like commands parsing result as domishElement""" |
|
57 |
|
58 def __init__(self): |
|
59 super().__init__() |
|
60 from sat.tools.xml_tools import ElementParser |
|
61 self.parser = ElementParser() |
|
62 |
|
63 def __call__(self, *args, **kwargs): |
|
64 args = self.subcommands + list(args) |
|
65 self.subcommands.clear() |
|
66 kwargs['output'] = 'xml_raw' |
|
67 kwargs['_tty_out'] = False |
|
68 cmd = jp(*args, **kwargs) |
|
69 return self.parser(cmd.stdout.decode().strip()) |
|
70 |
|
71 |
|
72 class Editor: |
|
73 |
|
74 def __init__(self): |
|
75 # temporary directory will be deleted Automatically when this object will be |
|
76 # destroyed |
|
77 self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="sat_e2e_test_editor_") |
|
78 self.tmp_dir_path = Path(self.tmp_dir_obj.name) |
|
79 if not sys.executable: |
|
80 raise Exception("Can't find python executable") |
|
81 self.editor_set = False |
|
82 self.editor_path = self.tmp_dir_path / "editor.py" |
|
83 self.ori_content_path = self.tmp_dir_path / "original_content" |
|
84 self.new_content_path = self.tmp_dir_path / "new_content" |
|
85 self.base_script = dedent(f"""\ |
|
86 #!{sys.executable} |
|
87 import sys |
|
88 |
|
89 def content_filter(content): |
|
90 return {{content_filter}} |
|
91 |
|
92 with open(sys.argv[1], 'r+') as f: |
|
93 original_content = f.read() |
|
94 f.seek(0) |
|
95 new_content = content_filter(original_content) |
|
96 f.write(new_content) |
|
97 f.truncate() |
|
98 |
|
99 with open("{self.ori_content_path}", "w") as f: |
|
100 f.write(original_content) |
|
101 |
|
102 with open("{self.new_content_path}", "w") as f: |
|
103 f.write(new_content) |
|
104 """ |
|
105 ) |
|
106 self._env = os.environ.copy() |
|
107 self._env["EDITOR"] = str(self.editor_path) |
|
108 |
|
109 def set_filter(self, content_filter: str = "content"): |
|
110 """Python code to modify original content |
|
111 |
|
112 The code will be applied to content received by editor. |
|
113 The original content received by editor is in the "content" variable. |
|
114 If filter_ is not specified, original content is written unmodified. |
|
115 Code must be on a single line. |
|
116 """ |
|
117 if '\n' in content_filter: |
|
118 raise ValueError("new lines can't be used in filter_") |
|
119 with self.editor_path.open('w') as f: |
|
120 f.write(self.base_script.format(content_filter=content_filter)) |
|
121 self.editor_path.chmod(0o700) |
|
122 self.editor_set = True |
|
123 |
|
124 @property |
|
125 def env(self): |
|
126 """Get environment variable with the editor set""" |
|
127 if not self.editor_set: |
|
128 self.set_filter() |
|
129 return self._env |
|
130 |
|
131 @property |
|
132 def original_content(self): |
|
133 """Last content received by editor, before any modification |
|
134 |
|
135 returns None if editor has not yet been called |
|
136 """ |
|
137 try: |
|
138 with self.ori_content_path.open() as f: |
|
139 return f.read() |
|
140 except FileNotFoundError: |
|
141 return None |
|
142 |
|
143 @property |
|
144 def new_content(self): |
|
145 """Last content writen by editor |
|
146 |
|
147 This is the final content, after filter has been applied to original content |
|
148 returns None if editor has not yet been called |
|
149 """ |
|
150 try: |
|
151 with self.new_content_path.open() as f: |
|
152 return f.read() |
|
153 except FileNotFoundError: |
|
154 return None |
|
155 |
|
156 |
|
157 class FakeFile: |
|
158 ALPHABET = f"{string.ascii_letters}{string.digits}_" |
|
159 BUF_SIZE = 65535 |
|
160 |
|
161 def __init__(self): |
|
162 self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="sat_e2e_test_files_") |
|
163 self.tmp_dir_path = Path(self.tmp_dir_obj.name) |
|
164 self.source_files = self.tmp_dir_path / "source" |
|
165 self.source_files.mkdir() |
|
166 self.dest_files = self.tmp_dir_path / "dest" |
|
167 self.dest_files.mkdir() |
|
168 self.hashes = {} |
|
169 |
|
170 @property |
|
171 def dest_path(self): |
|
172 """Path of a directory where files can be received |
|
173 |
|
174 The directory will be deleted at the end of session. |
|
175 Files from other test can be present, be sure to create a unique subdirectory or |
|
176 to use a unique destination file name |
|
177 """ |
|
178 return self.dest_files |
|
179 |
|
180 def new_dest_file(self) -> Path: |
|
181 """Path to a randomly named destination file |
|
182 |
|
183 The file will be in self.dest_path. |
|
184 The file should be deleted after use. If not, it will be deleted at the end of |
|
185 session with the whole temporary test files directory. |
|
186 """ |
|
187 name = ''.join(random.choices(self.ALPHABET, k=8)) |
|
188 return self.dest_files / name |
|
189 |
|
190 def size(self, size: int, use_cache: bool = True): |
|
191 """Create a file of requested size, and returns its path |
|
192 |
|
193 @param use_cache: if True and a file of this size already exists, it is re-used |
|
194 """ |
|
195 dest_path = self.source_files / str(size) |
|
196 if not use_cache or not dest_path.exists(): |
|
197 hash_ = hashlib.sha256() |
|
198 remaining = size |
|
199 with dest_path.open('wb') as f: |
|
200 while remaining: |
|
201 if remaining > self.BUF_SIZE: |
|
202 to_get = self.BUF_SIZE |
|
203 else: |
|
204 to_get = remaining |
|
205 buf = os.urandom(to_get) |
|
206 f.write(buf) |
|
207 hash_.update(buf) |
|
208 remaining -= to_get |
|
209 self.hashes[dest_path] = hash_.hexdigest() |
|
210 return dest_path |
|
211 |
|
212 def get_source_hash(self, source_file: Path) -> str: |
|
213 """Retrieve hash calculated for a generated source file""" |
|
214 return self.hashes[source_file] |
|
215 |
|
216 def get_dest_hash(self, dest_file: Path) -> str: |
|
217 """Calculate hash of file at given path""" |
|
218 hash_ = hashlib.sha256() |
|
219 with dest_file.open('rb') as f: |
|
220 while True: |
|
221 buf = f.read(self.BUF_SIZE) |
|
222 if not buf: |
|
223 break |
|
224 hash_.update(buf) |
|
225 return hash_.hexdigest() |
|
226 |
|
227 |
|
228 @pytest.fixture(scope="session") |
|
229 def jp_json(): |
|
230 """Run jp with "json_raw" output, and returns the parsed value""" |
|
231 return JpJson() |
|
232 |
|
233 |
|
234 @pytest.fixture(scope="session") |
|
235 def jp_elt(): |
|
236 """Run jp with "xml_raw" output, and returns the parsed value""" |
|
237 return JpElt() |
|
238 |
|
239 |
|
240 @pytest.fixture(scope="session") |
|
241 def test_profiles(): |
|
242 """Test accounts created using in-band registration |
|
243 |
|
244 They will be removed at the end of session. |
|
245 The number of account per servers is set in the "accounts_by_servers" dict. |
|
246 Jids are in the form "account[x]@server[y].test". |
|
247 The profiles used are in the form "account[x]" for server1.test, and |
|
248 "account[x]_s[y]" for other servers. |
|
249 Password is "test" for all profiles and XMPP accounts. |
|
250 "account1" is connected and set as default profile |
|
251 Profiles created are returned as a tuple |
|
252 """ |
|
253 profiles = [] |
|
254 nb_servers = 3 |
|
255 accounts_by_servers = { |
|
256 1: 1, |
|
257 2: 1, |
|
258 3: 0, |
|
259 } |
|
260 for server_idx in range(1, nb_servers+1): |
|
261 account_stop = accounts_by_servers[server_idx] + 1 |
|
262 for account_idx in range(1, account_stop): |
|
263 profile_suff = f"_s{server_idx}" if server_idx>1 else "" |
|
264 profile = f"account{account_idx}{profile_suff}" |
|
265 profiles.append(profile) |
|
266 jp.account.create( |
|
267 f"account{account_idx}@server{server_idx}.test", |
|
268 "test", |
|
269 profile=profile, |
|
270 host=f"server{server_idx}.test" |
|
271 ) |
|
272 jp.profile.modify(profile="account1", default=True, connect=True) |
|
273 jp.profile.connect(profile="account1_s2", connect=True) |
|
274 yield tuple(profiles) |
|
275 for profile in profiles: |
|
276 jp.account.delete(profile=profile, connect=True, force=True) |
|
277 jp.profile.delete(profile, force=True) |
|
278 |
|
279 |
|
280 @pytest.fixture(scope="class") |
|
281 def pubsub_nodes(test_profiles): |
|
282 """Create 2 testing nodes |
|
283 |
|
284 Both nodes will be created with "account1" profile, named "test" and have and "open" |
|
285 access model. |
|
286 One node will account1's PEP, the other one on pubsub.server1.test. |
|
287 """ |
|
288 jp.pubsub.node.create( |
|
289 "-f", "access_model", "open", |
|
290 node="test", |
|
291 profile="account1", connect=True |
|
292 ) |
|
293 jp.pubsub.node.create( |
|
294 "-f", "access_model", "open", |
|
295 service="pubsub.server1.test", node="test", |
|
296 profile="account1" |
|
297 ) |
|
298 yield |
|
299 jp.pubsub.node.delete( |
|
300 node="test", |
|
301 profile="account1", connect=True, |
|
302 force=True |
|
303 ) |
|
304 jp.pubsub.node.delete( |
|
305 service="pubsub.server1.test", node="test", |
|
306 profile="account1", |
|
307 force=True |
|
308 ) |
|
309 |
|
310 |
|
311 @pytest.fixture() |
|
312 def editor(): |
|
313 """Create a fake editor to automatise edition from CLI""" |
|
314 return Editor() |
|
315 |
|
316 |
|
317 @pytest.fixture(scope="session") |
|
318 def fake_file(): |
|
319 """Manage dummy files creation and destination path""" |
|
320 return FakeFile() |