comparison tests/e2e/libervia-cli/test_libervia-cli.py @ 4305:4cd4922de876

tests: reformat tests using black.
author Goffi <goffi@goffi.org>
date Thu, 26 Sep 2024 16:11:56 +0200
parents f59e9421a650
children
comparison
equal deleted inserted replaced
4304:92a886f31581 4305:4cd4922de876
32 32
33 33
34 if os.getenv("LIBERVIA_TEST_ENV_E2E") is None: 34 if os.getenv("LIBERVIA_TEST_ENV_E2E") is None:
35 pytest.skip( 35 pytest.skip(
36 "skipping end-to-end tests, we are not in a test environment", 36 "skipping end-to-end tests, we are not in a test environment",
37 allow_module_level=True 37 allow_module_level=True,
38 ) 38 )
39 39
40 40
41 pytestmark = pytest.mark.usefixtures("test_profiles") 41 pytestmark = pytest.mark.usefixtures("test_profiles")
42 42
50 class TestLiberviaCliAccount: 50 class TestLiberviaCliAccount:
51 51
52 def test_create_and_delete(self, li_json): 52 def test_create_and_delete(self, li_json):
53 """Create an account in-band, connect it, then delete it and its profile""" 53 """Create an account in-band, connect it, then delete it and its profile"""
54 li.account.create( 54 li.account.create(
55 "test_create@server1.test", 55 "test_create@server1.test", "test", profile="test_create", host="server1.test"
56 "test",
57 profile="test_create",
58 host="server1.test"
59 ) 56 )
60 profiles = li_json.profile.list() 57 profiles = li_json.profile.list()
61 assert "test_create" in profiles 58 assert "test_create" in profiles
62 li.profile.connect(connect=True, profile="test_create") 59 li.profile.connect(connect=True, profile="test_create")
63 li.account.delete(profile="test_create", force=True) 60 li.account.delete(profile="test_create", force=True)
94 item1_id = li.pubsub.set(node="test", quiet=True, _in=payload) 91 item1_id = li.pubsub.set(node="test", quiet=True, _in=payload)
95 item2_id = li.pubsub.set(node="test", quiet=True, _in=payload) 92 item2_id = li.pubsub.set(node="test", quiet=True, _in=payload)
96 item3_id = li.pubsub.set(node="test", quiet=True, _in=payload) 93 item3_id = li.pubsub.set(node="test", quiet=True, _in=payload)
97 parsed_elt = li_elt.pubsub.get(node="test", item=item1_id) 94 parsed_elt = li_elt.pubsub.get(node="test", item=item1_id)
98 payload = parsed_elt.firstChildElement() 95 payload = parsed_elt.firstChildElement()
99 assert payload.name == 'test' 96 assert payload.name == "test"
100 assert str(payload) == content 97 assert str(payload) == content
101 parsed_elt = li_elt.pubsub.get(node="test", item=item2_id) 98 parsed_elt = li_elt.pubsub.get(node="test", item=item2_id)
102 payload = parsed_elt.firstChildElement() 99 payload = parsed_elt.firstChildElement()
103 assert payload.name == 'test' 100 assert payload.name == "test"
104 assert str(payload) == content 101 assert str(payload) == content
105 parsed_elt = li_elt.pubsub.get(node="test", item=item3_id) 102 parsed_elt = li_elt.pubsub.get(node="test", item=item3_id)
106 payload = parsed_elt.firstChildElement() 103 payload = parsed_elt.firstChildElement()
107 assert payload.name == 'test' 104 assert payload.name == "test"
108 assert str(payload) == content 105 assert str(payload) == content
109 106
110 # deleting first item should work 107 # deleting first item should work
111 li.pubsub.delete(node="test", item=item1_id, force=True) 108 li.pubsub.delete(node="test", item=item1_id, force=True)
112 with pytest.raises(sh.ErrorReturnCode_16): 109 with pytest.raises(sh.ErrorReturnCode_16):
129 li.pubsub.edit(node="test", item=item_id, _env=editor.env) 126 li.pubsub.edit(node="test", item=item_id, _env=editor.env)
130 assert "original item" in editor.original_content 127 assert "original item" in editor.original_content
131 parsed_elt = li_elt.pubsub.get(node="test", item=item_id) 128 parsed_elt = li_elt.pubsub.get(node="test", item=item_id)
132 edited_payload = parsed_elt.firstChildElement() 129 edited_payload = parsed_elt.firstChildElement()
133 expected_edited_content = content.replace("original", "edited") 130 expected_edited_content = content.replace("original", "edited")
134 assert edited_payload.name == 'test' 131 assert edited_payload.name == "test"
135 assert str(edited_payload) == expected_edited_content 132 assert str(edited_payload) == expected_edited_content
136 133
137 def test_affiliations(self, li_json): 134 def test_affiliations(self, li_json):
138 affiliations = li_json.pubsub.affiliations() 135 affiliations = li_json.pubsub.affiliations()
139 assert affiliations["test"] == "owner" 136 assert affiliations["test"] == "owner"
140 137
141 def test_uri(self): 138 def test_uri(self):
142 built_uri = li.pubsub.uri( 139 built_uri = li.pubsub.uri(service="pubsub.example.net", node="some_node").strip()
143 service="pubsub.example.net", node="some_node"
144 ).strip()
145 assert built_uri == "xmpp:pubsub.example.net?;node=some_node" 140 assert built_uri == "xmpp:pubsub.example.net?;node=some_node"
146 built_uri = li.pubsub.uri( 141 built_uri = li.pubsub.uri(
147 service="pubsub.example.net", node="some_node", item="some_item" 142 service="pubsub.example.net", node="some_node", item="some_item"
148 ).strip() 143 ).strip()
149 assert built_uri == "xmpp:pubsub.example.net?;node=some_node;item=some_item" 144 assert built_uri == "xmpp:pubsub.example.net?;node=some_node;item=some_item"
152 """A Full-Text Search query can be done""" 147 """A Full-Text Search query can be done"""
153 sk_txt = "this is a blog post about Slovakia" 148 sk_txt = "this is a blog post about Slovakia"
154 fr_txt = "this is a blog post about France" 149 fr_txt = "this is a blog post about France"
155 nc_txt = "this is a blog post about New Caledonia" 150 nc_txt = "this is a blog post about New Caledonia"
156 au_txt = "this is a blog post about Australia" 151 au_txt = "this is a blog post about Australia"
152 li.blog.set("-t", "travel", "-t", "europe", _in=sk_txt, syntax="markdown")
153 li.blog.set("-t", "travel", "-t", "europe", _in=fr_txt, syntax="markdown")
154 li.blog.set("-t", "travel", "-t", "south pacific", _in=nc_txt, syntax="markdown")
157 li.blog.set( 155 li.blog.set(
158 "-t", "travel", "-t", "europe", 156 "-t",
159 _in=sk_txt, 157 "travel",
160 syntax="markdown" 158 "-t",
161 ) 159 "south pacific",
162 li.blog.set(
163 "-t", "travel", "-t", "europe",
164 _in=fr_txt,
165 syntax="markdown"
166 )
167 li.blog.set(
168 "-t", "travel", "-t", "south pacific",
169 _in=nc_txt,
170 syntax="markdown"
171 )
172 li.blog.set(
173 "-t", "travel", "-t", "south pacific",
174 _in="this is a blog post about Australia", 160 _in="this is a blog post about Australia",
175 title=au_txt, 161 title=au_txt,
176 syntax="markdown" 162 syntax="markdown",
177 ) 163 )
178 # we get the blog to activate the cache for it 164 # we get the blog to activate the cache for it
179 li.blog.get(max_items=1) 165 li.blog.get(max_items=1)
180 # FTS 166 # FTS
181 found = [] 167 found = []
213 def test_set_get(self, li_json): 199 def test_set_get(self, li_json):
214 li.blog.set(_in="markdown **bold** [link](https://example.net)") 200 li.blog.set(_in="markdown **bold** [link](https://example.net)")
215 item_data = li_json.blog.get(max=1, before="") 201 item_data = li_json.blog.get(max=1, before="")
216 item = item_data[0][0] 202 item = item_data[0][0]
217 metadata = item_data[1] 203 metadata = item_data[1]
218 assert metadata['service'] == "account1@server1.test" 204 assert metadata["service"] == "account1@server1.test"
219 assert metadata['node'] == self.MICROBLOG_NS 205 assert metadata["node"] == self.MICROBLOG_NS
220 assert metadata['rsm'].keys() <= {"first", "last", "index", "count"} 206 assert metadata["rsm"].keys() <= {"first", "last", "index", "count"}
221 item_id = item['id'] 207 item_id = item["id"]
222 expected_uri = uri.build_xmpp_uri( 208 expected_uri = uri.build_xmpp_uri(
223 'pubsub', subtype="microblog", path="account1@server1.test", 209 "pubsub",
224 node=self.MICROBLOG_NS, item=item_id 210 subtype="microblog",
225 ) 211 path="account1@server1.test",
226 assert item['uri'] == expected_uri 212 node=self.MICROBLOG_NS,
227 assert item['content_xhtml'] == ( 213 item=item_id,
228 '<div><p>markdown <strong>bold</strong> ' 214 )
215 assert item["uri"] == expected_uri
216 assert item["content_xhtml"] == (
217 "<div><p>markdown <strong>bold</strong> "
229 '<a href="https://example.net">link</a></p></div>' 218 '<a href="https://example.net">link</a></p></div>'
230 ) 219 )
231 assert isinstance(item['published'], int) 220 assert isinstance(item["published"], int)
232 assert isinstance(item['updated'], int) 221 assert isinstance(item["updated"], int)
233 assert isinstance(item['comments'], list) 222 assert isinstance(item["comments"], list)
234 assert isinstance(item['tags'], list) 223 assert isinstance(item["tags"], list)
235 assert item['author'] == 'account1' 224 assert item["author"] == "account1"
236 assert item['author_jid'] == 'account1@server1.test' 225 assert item["author_jid"] == "account1@server1.test"
237 226
238 def test_edit(self, editor, li_json): 227 def test_edit(self, editor, li_json):
239 payload_md = "content in **markdown**" 228 payload_md = "content in **markdown**"
240 editor.set_filter(repr(payload_md)) 229 editor.set_filter(repr(payload_md))
241 li.blog.edit(_env=editor.env) 230 li.blog.edit(_env=editor.env)
242 assert len(editor.original_content) == 0 231 assert len(editor.original_content) == 0
243 assert editor.new_content == payload_md 232 assert editor.new_content == payload_md
244 items_data = li_json.blog.get(max_items=1) 233 items_data = li_json.blog.get(max_items=1)
245 last_item = items_data[0][0] 234 last_item = items_data[0][0]
246 last_item_id = last_item['id'] 235 last_item_id = last_item["id"]
247 assert last_item['content'] == "content in markdown" 236 assert last_item["content"] == "content in markdown"
248 assert last_item['content_xhtml'] == ( 237 assert last_item["content_xhtml"] == (
249 "<div><p>content in <strong>markdown</strong></p></div>" 238 "<div><p>content in <strong>markdown</strong></p></div>"
250 ) 239 )
251 editor.set_filter('f"{content} extended"') 240 editor.set_filter('f"{content} extended"')
252 li.blog.edit("--last-item", _env=editor.env) 241 li.blog.edit("--last-item", _env=editor.env)
253 assert editor.original_content == payload_md 242 assert editor.original_content == payload_md
254 assert editor.new_content == f"{payload_md} extended" 243 assert editor.new_content == f"{payload_md} extended"
255 items_data = li_json.blog.get(max_items=1) 244 items_data = li_json.blog.get(max_items=1)
256 last_item = items_data[0][0] 245 last_item = items_data[0][0]
257 # we check that the id hasn't been modified 246 # we check that the id hasn't been modified
258 assert last_item['id'] == last_item_id 247 assert last_item["id"] == last_item_id
259 assert last_item['content'] == "content in markdown extended" 248 assert last_item["content"] == "content in markdown extended"
260 assert last_item['content_xhtml'] == ( 249 assert last_item["content_xhtml"] == (
261 "<div><p>content in <strong>markdown</strong> extended</p></div>" 250 "<div><p>content in <strong>markdown</strong> extended</p></div>"
262 ) 251 )
263 252
264 253
265 class TestLiberviaCliFile: 254 class TestLiberviaCliFile:
284 send_cmd = li.file.send(source_file, "account1@server2.test", _bg=True) 273 send_cmd = li.file.send(source_file, "account1@server2.test", _bg=True)
285 dest_path = fake_file.dest_files / "test_send_receive" 274 dest_path = fake_file.dest_files / "test_send_receive"
286 dest_path.mkdir() 275 dest_path.mkdir()
287 try: 276 try:
288 li.file.receive( 277 li.file.receive(
289 "account1@server1.test", profile="account1_s2", path=dest_path) 278 "account1@server1.test", profile="account1_s2", path=dest_path
279 )
290 dest_file = dest_path / source_file.name 280 dest_file = dest_path / source_file.name
291 dest_file_hash = fake_file.get_dest_hash(dest_file) 281 dest_file_hash = fake_file.get_dest_hash(dest_file)
292 finally: 282 finally:
293 shutil.rmtree(dest_path) 283 shutil.rmtree(dest_path)
294 send_cmd.wait() 284 send_cmd.wait()
303 ) 293 )
304 dest_path = fake_file.dest_files / "test_send_receive" 294 dest_path = fake_file.dest_files / "test_send_receive"
305 dest_path.mkdir() 295 dest_path.mkdir()
306 try: 296 try:
307 li.file.receive( 297 li.file.receive(
308 "account1@server1.test", profile="account1_s2", path=dest_path) 298 "account1@server1.test", profile="account1_s2", path=dest_path
299 )
309 dest_file = dest_path / source_file.name 300 dest_file = dest_path / source_file.name
310 dest_file_hash = fake_file.get_dest_hash(dest_file) 301 dest_file_hash = fake_file.get_dest_hash(dest_file)
311 finally: 302 finally:
312 shutil.rmtree(dest_path) 303 shutil.rmtree(dest_path)
313 send_cmd.wait() 304 send_cmd.wait()
314 305
315 assert source_file_hash == dest_file_hash 306 assert source_file_hash == dest_file_hash
316 307
317 308
318
319 class TestE2EEncryption: 309 class TestE2EEncryption:
320 310
321 def test_pubsub_encryption_oxps(self, li_elt): 311 def test_pubsub_encryption_oxps(self, li_elt):
322 secret_blog = "this is a secret blog post" 312 secret_blog = "this is a secret blog post"
323 node = "e2ee_blog" 313 node = "e2ee_blog"
324 li.blog.set(_in=secret_blog, node="e2ee_blog", item="test_e2ee", encrypt=True) 314 li.blog.set(_in=secret_blog, node="e2ee_blog", item="test_e2ee", encrypt=True)
325 315
326 # the item should be transparently decrypted 316 # the item should be transparently decrypted
327 parsed_decrypted = li_elt.pubsub.get( 317 parsed_decrypted = li_elt.pubsub.get(node=node, item="test_e2ee", no_cache=True)
328 node=node, item="test_e2ee", no_cache=True
329 )
330 entry_elt = parsed_decrypted.firstChildElement() 318 entry_elt = parsed_decrypted.firstChildElement()
331 assert entry_elt.name == "entry" 319 assert entry_elt.name == "entry"
332 assert entry_elt.uri == NS_ATOM 320 assert entry_elt.uri == NS_ATOM
333 assert secret_blog in parsed_decrypted.toXml() 321 assert secret_blog in parsed_decrypted.toXml()
334 322
342 # the body must not be readable in plain text 330 # the body must not be readable in plain text
343 assert secret_blog not in parsed_ori_item.toXml() 331 assert secret_blog not in parsed_ori_item.toXml()
344 332
345 def test_pubsub_secrets_sharing_oxps(self, li_elt): 333 def test_pubsub_secrets_sharing_oxps(self, li_elt):
346 secret_blog = "this is a secret blog post" 334 secret_blog = "this is a secret blog post"
347 node="secret_sharing" 335 node = "secret_sharing"
348 336
349 li.blog.set(_in=secret_blog, node=node, item="test_e2ee", encrypt=True) 337 li.blog.set(_in=secret_blog, node=node, item="test_e2ee", encrypt=True)
350 338
351 # the item must not be decrypted for account1_s2 (secret is not known) 339 # the item must not be decrypted for account1_s2 (secret is not known)
352 parsed_item = li_elt.pubsub.get( 340 parsed_item = li_elt.pubsub.get(
353 service="account1@server1.test", node=node, item="test_e2ee", no_cache=True, 341 service="account1@server1.test",
354 profile="account1_s2" 342 node=node,
343 item="test_e2ee",
344 no_cache=True,
345 profile="account1_s2",
355 ) 346 )
356 encrypted_elt = parsed_item.firstChildElement() 347 encrypted_elt = parsed_item.firstChildElement()
357 assert encrypted_elt.name == "encrypted" 348 assert encrypted_elt.name == "encrypted"
358 assert encrypted_elt.uri == NS_OXPS 349 assert encrypted_elt.uri == NS_OXPS
359 # the body must not be readable in plain text 350 # the body must not be readable in plain text
360 assert secret_blog not in parsed_item.toXml() 351 assert secret_blog not in parsed_item.toXml()
361 352
362 # we share the secrets 353 # we share the secrets
363 li.pubsub.secret.share("account1@server2.test", service="account1@server1.test", node=node) 354 li.pubsub.secret.share(
355 "account1@server2.test", service="account1@server1.test", node=node
356 )
364 357
365 # and get the item again 358 # and get the item again
366 parsed_item = li_elt.pubsub.get( 359 parsed_item = li_elt.pubsub.get(
367 service="account1@server1.test", node=node, item="test_e2ee", no_cache=True, 360 service="account1@server1.test",
368 profile="account1_s2" 361 node=node,
362 item="test_e2ee",
363 no_cache=True,
364 profile="account1_s2",
369 ) 365 )
370 # now it should be decrypted 366 # now it should be decrypted
371 entry_elt = parsed_item.firstChildElement() 367 entry_elt = parsed_item.firstChildElement()
372 assert entry_elt.name == "entry" 368 assert entry_elt.name == "entry"
373 assert entry_elt.uri == NS_ATOM 369 assert entry_elt.uri == NS_ATOM
374 assert secret_blog in parsed_item.toXml() 370 assert secret_blog in parsed_item.toXml()
375 371
376 def test_pubsub_signature(self, li_json): 372 def test_pubsub_signature(self, li_json):
377 """A pubsub item can be signed, and the signature can be verified""" 373 """A pubsub item can be signed, and the signature can be verified"""
378 body = "this message is signed" 374 body = "this message is signed"
379 service="account1@server1.test" 375 service = "account1@server1.test"
380 node ="blog_signing" 376 node = "blog_signing"
381 item="signed_item" 377 item = "signed_item"
382 li.blog.set(_in=body, service=service, node=node, item=item, sign=True) 378 li.blog.set(_in=body, service=service, node=node, item=item, sign=True)
383 attachments = li_json.pubsub.attachments.get( 379 attachments = li_json.pubsub.attachments.get(
384 service=service, node=node, item=item 380 service=service, node=node, item=item
385 ) 381 )
386 assert len(attachments) == 1 382 assert len(attachments) == 1
387 attachment = attachments[0] 383 attachment = attachments[0]
388 assert attachment["from"] == "account1@server1.test" 384 assert attachment["from"] == "account1@server1.test"
389 signature_json = attachment["signature"] 385 signature_json = attachment["signature"]
390 sign_data = li_json.pubsub.signature.check( 386 sign_data = li_json.pubsub.signature.check(
391 json.dumps(signature_json), service=service, node=node, item=item, 387 json.dumps(signature_json),
388 service=service,
389 node=node,
390 item=item,
392 ) 391 )
393 assert sign_data["signer"] == "account1@server1.test" 392 assert sign_data["signer"] == "account1@server1.test"
394 assert sign_data["validated"] == True 393 assert sign_data["validated"] == True
395 assert all(t == "undecided" for t in sign_data["trusts"].values()) 394 assert all(t == "undecided" for t in sign_data["trusts"].values())
396 395
400 # Maybe we can use tcpdump? 399 # Maybe we can use tcpdump?
401 li.encryption.start("account1@server2.test", name="omemo_legacy") 400 li.encryption.start("account1@server2.test", name="omemo_legacy")
402 source_file = fake_file.size(10240) 401 source_file = fake_file.size(10240)
403 source_file_hash = fake_file.get_source_hash(source_file) 402 source_file_hash = fake_file.get_source_hash(source_file)
404 send_cmd = li.file.send( 403 send_cmd = li.file.send(
405 source_file, "account1@server2.test", encrypt=True, _bg=True, 404 source_file,
405 "account1@server2.test",
406 encrypt=True,
407 _bg=True,
406 ) 408 )
407 dest_path = fake_file.dest_files / "test_send_receive" 409 dest_path = fake_file.dest_files / "test_send_receive"
408 dest_path.mkdir() 410 dest_path.mkdir()
409 try: 411 try:
410 li.file.receive( 412 li.file.receive(
411 "account1@server1.test", profile="account1_s2", path=dest_path, 413 "account1@server1.test",
414 profile="account1_s2",
415 path=dest_path,
412 ) 416 )
413 dest_file = dest_path / source_file.name 417 dest_file = dest_path / source_file.name
414 dest_file_hash = fake_file.get_dest_hash(dest_file) 418 dest_file_hash = fake_file.get_dest_hash(dest_file)
415 finally: 419 finally:
416 shutil.rmtree(dest_path) 420 shutil.rmtree(dest_path)
423 """An item is encrypted for specific recipients""" 427 """An item is encrypted for specific recipients"""
424 secret_blog = "this is a secret blog post" 428 secret_blog = "this is a secret blog post"
425 node = "e2ee_blog" 429 node = "e2ee_blog"
426 item = "test_pte" 430 item = "test_pte"
427 li.encryption.start("account1@server2.test", name="omemo") 431 li.encryption.start("account1@server2.test", name="omemo")
428 li.encryption.start( 432 li.encryption.start("account1@server1.test", name="omemo", profile="account1_s2")
429 "account1@server1.test", name="omemo", profile="account1_s2"
430 )
431 li.blog.set( 433 li.blog.set(
432 _in=secret_blog, node="e2ee_blog", item=item, 434 _in=secret_blog,
433 encrypt_for="account1@server2.test" 435 node="e2ee_blog",
436 item=item,
437 encrypt_for="account1@server2.test",
434 ) 438 )
435 439
436 # the item should be transparently decrypted 440 # the item should be transparently decrypted
437 parsed_decrypted = li_elt.pubsub.get( 441 parsed_decrypted = li_elt.pubsub.get(
438 service="account1@server1.test", node=node, item=item, no_cache=True, 442 service="account1@server1.test",
439 profile="account1_s2" 443 node=node,
444 item=item,
445 no_cache=True,
446 profile="account1_s2",
440 ) 447 )
441 entry_elt = parsed_decrypted.firstChildElement() 448 entry_elt = parsed_decrypted.firstChildElement()
442 assert entry_elt.name == "entry" 449 assert entry_elt.name == "entry"
443 assert entry_elt.uri == NS_ATOM 450 assert entry_elt.uri == NS_ATOM
444 assert secret_blog in parsed_decrypted.toXml() 451 assert secret_blog in parsed_decrypted.toXml()