Mercurial > libervia-backend
view tests/e2e/conftest.py @ 3422:c97476c41f0f
docker (backend_e2e): set `xmpp_domain` in conf.
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 16 Nov 2020 14:51:13 +0100 |
parents | 814e118d9ef3 |
children | d4558f3cbf13 |
line wrap: on
line source
#!/usr/bin/env python3 # SàT: an XMPP client # Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import sys import os import tempfile import string import hashlib import random from pathlib import Path from textwrap import dedent import json import pytest from sh import jp class JpJson: """jp like commands parsing result as JSON""" def __init__(self): self.subcommands = [] def __call__(self, *args, **kwargs): args = self.subcommands + list(args) self.subcommands.clear() kwargs['output'] = 'json_raw' kwargs['_tty_out'] = False cmd = jp(*args, **kwargs) return json.loads(cmd.stdout) def __getattr__(self, name): if name.startswith('_'): # no jp subcommand starts with a "_", # and pytest uses some attributes with this name scheme return super().__getattr__(name) self.subcommands.append(name) return self class JpElt(JpJson): """jp like commands parsing result as domishElement""" def __init__(self): super().__init__() from sat.tools.xml_tools import ElementParser self.parser = ElementParser() def __call__(self, *args, **kwargs): args = self.subcommands + list(args) self.subcommands.clear() kwargs['output'] = 'xml_raw' kwargs['_tty_out'] = False cmd = jp(*args, **kwargs) return self.parser(cmd.stdout.decode().strip()) class Editor: def __init__(self): # temporary directory will be deleted Automatically when this object will be # destroyed self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="sat_e2e_test_editor_") self.tmp_dir_path = Path(self.tmp_dir_obj.name) if not sys.executable: raise Exception("Can't find python executable") self.editor_set = False self.editor_path = self.tmp_dir_path / "editor.py" self.ori_content_path = self.tmp_dir_path / "original_content" self.new_content_path = self.tmp_dir_path / "new_content" self.base_script = dedent(f"""\ #!{sys.executable} import sys def content_filter(content): return {{content_filter}} with open(sys.argv[1], 'r+') as f: original_content = f.read() f.seek(0) new_content = content_filter(original_content) f.write(new_content) f.truncate() with open("{self.ori_content_path}", "w") as f: f.write(original_content) with open("{self.new_content_path}", "w") as f: f.write(new_content) """ ) self._env = os.environ.copy() self._env["EDITOR"] = str(self.editor_path) def set_filter(self, content_filter: str = "content"): """Python code to modify original content The code will be applied to content received by editor. The original content received by editor is in the "content" variable. If filter_ is not specified, original content is written unmodified. Code must be on a single line. """ if '\n' in content_filter: raise ValueError("new lines can't be used in filter_") with self.editor_path.open('w') as f: f.write(self.base_script.format(content_filter=content_filter)) self.editor_path.chmod(0o700) self.editor_set = True @property def env(self): """Get environment variable with the editor set""" if not self.editor_set: self.set_filter() return self._env @property def original_content(self): """Last content received by editor, before any modification returns None if editor has not yet been called """ try: with self.ori_content_path.open() as f: return f.read() except FileNotFoundError: return None @property def new_content(self): """Last content writen by editor This is the final content, after filter has been applied to original content returns None if editor has not yet been called """ try: with self.new_content_path.open() as f: return f.read() except FileNotFoundError: return None class FakeFile: ALPHABET = f"{string.ascii_letters}{string.digits}_" BUF_SIZE = 65535 def __init__(self): self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="sat_e2e_test_files_") self.tmp_dir_path = Path(self.tmp_dir_obj.name) self.source_files = self.tmp_dir_path / "source" self.source_files.mkdir() self.dest_files = self.tmp_dir_path / "dest" self.dest_files.mkdir() self.hashes = {} @property def dest_path(self): """Path of a directory where files can be received The directory will be deleted at the end of session. Files from other test can be present, be sure to create a unique subdirectory or to use a unique destination file name """ return self.dest_files def new_dest_file(self) -> Path: """Path to a randomly named destination file The file will be in self.dest_path. The file should be deleted after use. If not, it will be deleted at the end of session with the whole temporary test files directory. """ name = ''.join(random.choices(self.ALPHABET, k=8)) return self.dest_files / name def size(self, size: int, use_cache: bool = True): """Create a file of requested size, and returns its path @param use_cache: if True and a file of this size already exists, it is re-used """ dest_path = self.source_files / str(size) if not use_cache or not dest_path.exists(): hash_ = hashlib.sha256() remaining = size with dest_path.open('wb') as f: while remaining: if remaining > self.BUF_SIZE: to_get = self.BUF_SIZE else: to_get = remaining buf = os.urandom(to_get) f.write(buf) hash_.update(buf) remaining -= to_get self.hashes[dest_path] = hash_.hexdigest() return dest_path def get_source_hash(self, source_file: Path) -> str: """Retrieve hash calculated for a generated source file""" return self.hashes[source_file] def get_dest_hash(self, dest_file: Path) -> str: """Calculate hash of file at given path""" hash_ = hashlib.sha256() with dest_file.open('rb') as f: while True: buf = f.read(self.BUF_SIZE) if not buf: break hash_.update(buf) return hash_.hexdigest() @pytest.fixture(scope="session") def jp_json(): """Run jp with "json_raw" output, and returns the parsed value""" return JpJson() @pytest.fixture(scope="session") def jp_elt(): """Run jp with "xml_raw" output, and returns the parsed value""" return JpElt() @pytest.fixture(scope="session") def test_profiles(): """Test accounts created using in-band registration They will be removed at the end of session. The number of account per servers is set in the "accounts_by_servers" dict. Jids are in the form "account[x]@server[y].test". The profiles used are in the form "account[x]" for server1.test, and "account[x]_s[y]" for other servers. Password is "test" for all profiles and XMPP accounts. "account1" is connected and set as default profile Profiles created are returned as a tuple """ profiles = [] nb_servers = 3 accounts_by_servers = { 1: 1, 2: 1, 3: 0, } for server_idx in range(1, nb_servers+1): account_stop = accounts_by_servers[server_idx] + 1 for account_idx in range(1, account_stop): profile_suff = f"_s{server_idx}" if server_idx>1 else "" profile = f"account{account_idx}{profile_suff}" profiles.append(profile) jp.account.create( f"account{account_idx}@server{server_idx}.test", "test", profile=profile, host=f"server{server_idx}.test" ) jp.profile.modify(profile="account1", default=True, connect=True) jp.profile.connect(profile="account1_s2", connect=True) yield tuple(profiles) for profile in profiles: jp.account.delete(profile=profile, connect=True, force=True) jp.profile.delete(profile, force=True) @pytest.fixture(scope="class") def pubsub_nodes(test_profiles): """Create 2 testing nodes Both nodes will be created with "account1" profile, named "test" and have and "open" access model. One node will account1's PEP, the other one on pubsub.server1.test. """ jp.pubsub.node.create( "-f", "access_model", "open", node="test", profile="account1", connect=True ) jp.pubsub.node.create( "-f", "access_model", "open", service="pubsub.server1.test", node="test", profile="account1" ) yield jp.pubsub.node.delete( node="test", profile="account1", connect=True, force=True ) jp.pubsub.node.delete( service="pubsub.server1.test", node="test", profile="account1", force=True ) @pytest.fixture() def editor(): """Create a fake editor to automatise edition from CLI""" return Editor() @pytest.fixture(scope="session") def fake_file(): """Manage dummy files creation and destination path""" return FakeFile()