3415
|
1 #!/usr/bin/env python3 |
|
2 |
|
3 # SàT: an XMPP client |
|
4 # Copyright (C) 2009-2020 Jérôme Poisson (goffi@goffi.org) |
|
5 |
|
6 # This program is free software: you can redistribute it and/or modify |
|
7 # it under the terms of the GNU Affero General Public License as published by |
|
8 # the Free Software Foundation, either version 3 of the License, or |
|
9 # (at your option) any later version. |
|
10 |
|
11 # This program is distributed in the hope that it will be useful, |
|
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
14 # GNU Affero General Public License for more details. |
|
15 |
|
16 # You should have received a copy of the GNU Affero General Public License |
|
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
18 |
|
19 import sys |
|
20 import os |
|
21 import tempfile |
|
22 from pathlib import Path |
|
23 from textwrap import dedent |
|
24 import json |
|
25 import pytest |
|
26 from sh import jp |
|
27 |
|
28 |
|
29 class JpJson: |
|
30 """jp like commands parsing result as JSON""" |
|
31 |
|
32 def __init__(self): |
|
33 self.subcommands = [] |
|
34 |
|
35 def __call__(self, *args, **kwargs): |
|
36 args = self.subcommands + list(args) |
|
37 self.subcommands.clear() |
|
38 kwargs['output'] = 'json_raw' |
|
39 kwargs['_tty_out'] = False |
|
40 cmd = jp(*args, **kwargs) |
|
41 return json.loads(cmd.stdout) |
|
42 |
|
43 def __getattr__(self, name): |
|
44 if name.startswith('_'): |
|
45 # no jp subcommand starts with a "_", |
|
46 # and pytest uses some attributes with this name scheme |
|
47 return super().__getattr__(name) |
|
48 self.subcommands.append(name) |
|
49 return self |
|
50 |
|
51 |
|
52 class JpElt(JpJson): |
|
53 """jp like commands parsing result as domishElement""" |
|
54 |
|
55 def __init__(self): |
|
56 super().__init__() |
|
57 from sat.tools.xml_tools import ElementParser |
|
58 self.parser = ElementParser() |
|
59 |
|
60 def __call__(self, *args, **kwargs): |
|
61 args = self.subcommands + list(args) |
|
62 self.subcommands.clear() |
|
63 kwargs['output'] = 'xml_raw' |
|
64 kwargs['_tty_out'] = False |
|
65 cmd = jp(*args, **kwargs) |
|
66 return self.parser(cmd.stdout.decode().strip()) |
|
67 |
|
68 |
|
69 class Editor: |
|
70 |
|
71 def __init__(self): |
|
72 # temporary directory will be deleted Automatically when this object will be |
|
73 # destroyed |
|
74 self.tmp_dir_obj = tempfile.TemporaryDirectory(prefix="sat_e2e_test_editor_") |
|
75 self.tmp_dir_path = Path(self.tmp_dir_obj.name) |
|
76 if not sys.executable: |
|
77 raise Exception("Can't find python executable") |
|
78 self.editor_set = False |
|
79 self.editor_path = self.tmp_dir_path / "editor.py" |
|
80 self.ori_content_path = self.tmp_dir_path / "original_content" |
|
81 self.new_content_path = self.tmp_dir_path / "new_content" |
|
82 self.base_script = dedent(f"""\ |
|
83 #!{sys.executable} |
|
84 import sys |
|
85 |
|
86 def content_filter(content): |
|
87 return {{content_filter}} |
|
88 |
|
89 with open(sys.argv[1], 'r+') as f: |
|
90 original_content = f.read() |
|
91 f.seek(0) |
|
92 new_content = content_filter(original_content) |
|
93 f.write(new_content) |
|
94 f.truncate() |
|
95 |
|
96 with open("{self.ori_content_path}", "w") as f: |
|
97 f.write(original_content) |
|
98 |
|
99 with open("{self.new_content_path}", "w") as f: |
|
100 f.write(new_content) |
|
101 """ |
|
102 ) |
|
103 self._env = os.environ.copy() |
|
104 self._env["EDITOR"] = str(self.editor_path) |
|
105 |
|
106 def set_filter(self, content_filter: str = "content"): |
|
107 """Python code to modify original content |
|
108 |
|
109 The code will be applied to content received by editor. |
|
110 The original content received by editor is in the "content" variable. |
|
111 If filter_ is not specified, original content is written unmodified. |
|
112 Code must be on a single line. |
|
113 """ |
|
114 if '\n' in content_filter: |
|
115 raise ValueError("new lines can't be used in filter_") |
|
116 with self.editor_path.open('w') as f: |
|
117 f.write(self.base_script.format(content_filter=content_filter)) |
|
118 self.editor_path.chmod(0o700) |
|
119 self.editor_set = True |
|
120 |
|
121 @property |
|
122 def env(self): |
|
123 """Get environment variable with the editor set""" |
|
124 if not self.editor_set: |
|
125 self.set_filter() |
|
126 return self._env |
|
127 |
|
128 @property |
|
129 def original_content(self): |
|
130 """Last content received by editor, before any modification |
|
131 |
|
132 returns None if editor has not yet been called |
|
133 """ |
|
134 try: |
|
135 with self.ori_content_path.open() as f: |
|
136 return f.read() |
|
137 except FileNotFoundError: |
|
138 return None |
|
139 |
|
140 @property |
|
141 def new_content(self): |
|
142 """Last content writen by editor |
|
143 |
|
144 This is the final content, after filter has been applied to original content |
|
145 returns None if editor has not yet been called |
|
146 """ |
|
147 try: |
|
148 with self.new_content_path.open() as f: |
|
149 return f.read() |
|
150 except FileNotFoundError: |
|
151 return None |
|
152 |
|
153 |
|
154 @pytest.fixture(scope="session") |
|
155 def jp_json(): |
|
156 """Run jp with "json_raw" output, and returns the parsed value""" |
|
157 return JpJson() |
|
158 |
|
159 |
|
160 @pytest.fixture(scope="session") |
|
161 def jp_elt(): |
|
162 """Run jp with "xml_raw" output, and returns the parsed value""" |
|
163 return JpElt() |
|
164 |
|
165 |
|
166 @pytest.fixture() |
|
167 def editor(): |
|
168 """Create a fake editor to automatise edition from CLI""" |
|
169 return Editor() |