comparison sat/plugins/plugin_xep_0373.py @ 3933:cecf45416403

plugin XEP-0373 and XEP-0374: Implementation of OX and OXIM: GPGME is used as the GPG provider. rel 374
author Syndace <me@syndace.dev>
date Tue, 20 Sep 2022 16:22:18 +0200
parents
children a92eef737703
comparison
equal deleted inserted replaced
3932:7af29260ecb8 3933:cecf45416403
1 #!/usr/bin/env python3
2
3 # Libervia plugin for OpenPGP for XMPP
4 # Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from abc import ABC, abstractmethod
20 import base64
21 from datetime import datetime, timezone
22 import enum
23 import secrets
24 import string
25 from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast
26 from xml.sax.saxutils import quoteattr
27
28 from typing_extensions import Final, NamedTuple, Never, assert_never
29 from wokkel import muc, pubsub
30 from wokkel.disco import DiscoFeature, DiscoInfo
31 import xmlschema
32
33 from sat.core import exceptions
34 from sat.core.constants import Const as C
35 from sat.core.core_types import SatXMPPEntity
36 from sat.core.i18n import _, D_
37 from sat.core.log import getLogger, Logger
38 from sat.core.sat_main import SAT
39 from sat.core.xmpp import SatXMPPClient
40 from sat.memory import persistent
41 from sat.plugins.plugin_xep_0045 import XEP_0045
42 from sat.plugins.plugin_xep_0060 import XEP_0060
43 from sat.plugins.plugin_xep_0163 import XEP_0163
44 from sat.tools.xmpp_datetime import format_datetime, parse_datetime
45 from sat.tools import xml_tools
46 from twisted.internet import defer
47 from twisted.words.protocols.jabber import jid
48 from twisted.words.xish import domish
49
50 try:
51 import gpg
52 except ImportError as import_error:
53 raise exceptions.MissingModule(
54 "You are missing the 'gpg' package required by the OX plugin. The recommended"
55 " installation method is via your operating system's package manager, since the"
56 " version of the library has to match the version of your GnuPG installation. See"
57 " https://wiki.python.org/moin/GnuPrivacyGuard#Accessing_GnuPG_via_gpgme"
58 ) from import_error
59
60
# Public API of this module.
# NOTE(review): "gpg_provider" is listed below, but the factory visible in this
# file is named `get_gpg_provider` — confirm that a `gpg_provider` name really
# is defined somewhere in the module, otherwise `from ... import *` will fail.
__all__ = [  # pylint: disable=unused-variable
    "PLUGIN_INFO",
    "NS_OX",
    "XEP_0373",
    "VerificationError",
    "XMPPInteractionFailed",
    "InvalidPacket",
    "DecryptionFailed",
    "VerificationFailed",
    "UnknownKey",
    "GPGProviderError",
    "GPGPublicKey",
    "GPGSecretKey",
    "GPGProvider",
    "PublicKeyMetadata",
    "gpg_provider",
    "TrustLevel"
]


# Module-level logger.
log = cast(Logger, getLogger(__name__))  # type: ignore[no-untyped-call]


# Plugin registration metadata.
PLUGIN_INFO = {
    C.PI_NAME: "XEP-0373",
    C.PI_IMPORT_NAME: "XEP-0373",
    C.PI_TYPE: "SEC",
    C.PI_PROTOCOLS: [ "XEP-0373" ],
    C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0373",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"),
}


# XML namespace of OpenPGP for XMPP.
NS_OX: Final = "urn:xmpp:openpgp:0"


# Category and name of the parameter holding the trust policy.
PARAM_CATEGORY = "Security"
PARAM_NAME = "ox_policy"
102
103
class VerificationError(Exception):
    """
    Signals a semantical verification error detected by one of the verifying methods
    of :class:`XEP_0373`.
    """
108
109
class XMPPInteractionFailed(Exception):
    """
    Signals that an XMPP interaction performed by a method of :class:`XEP_0373` failed.

    Most exceptions raised by XMPP interactions are not properly documented, which is
    why all of them are caught and wrapped in instances of this class.
    """
117
118
class InvalidPacket(ValueError):
    """
    Signals that a method of :class:`GPGProvider` encountered an invalid packet.
    """
123
124
class DecryptionFailed(Exception):
    """
    Signals a decryption failure in a method of :class:`GPGProvider`.
    """
129
130
class VerificationFailed(Exception):
    """
    Signals a verification failure in a method of :class:`GPGProvider`.
    """
135
136
class UnknownKey(ValueError):
    """
    Signals that a method of :class:`GPGProvider` was asked about a key it does not
    know.
    """
141
142
class GPGProviderError(Exception):
    """
    Signals an internal error in a method of :class:`GPGProvider`.
    """
147
148
class GPGPublicKey(ABC):
    """
    Abstract interface of a GPG public key.
    """

    @property
    @abstractmethod
    def fingerprint(self) -> str:
        """
        @return: This public key's OpenPGP v4 fingerprint, as a string.
        """
160
161
class GPGSecretKey(ABC):
    """
    Abstract interface of a GPG secret key.
    """

    @property
    @abstractmethod
    def public_key(self) -> GPGPublicKey:
        """
        @return: The public counterpart of this secret key.
        """
173
174
class GPGProvider(ABC):
    """
    Abstract interface of a GPG provider, i.e. of a library or framework that offers
    GPG encryption, signing and key management.

    On top of the exception types listed for each method, every method may raise
    :class:`GPGProviderError`.

    # TODO: Check keys for revoked, disabled and expired everywhere and exclude those (?)
    """

    @abstractmethod
    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
        """Export a public key as a key material packet (RFC 4880 §5.5).

        OpenPGP's ASCII Armor must not be used.

        @param public_key: The public key to export.
        @return: The packet holding the exported public key.
        @raise UnknownKey: if the public key is not available.
        """

    @abstractmethod
    def import_public_key(self, packet: bytes) -> GPGPublicKey:
        """Import a public key from a key material packet (RFC 4880 §5.5).

        The packet does not use OpenPGP's ASCII Armor.

        @param packet: A packet holding an exported public key.
        @return: The public key that was imported from the packet.
        @raise InvalidPacket: if the packet is deemed invalid, either syntactically or
            semantically.

        @warning: Packets below version 4 MUST be rejected; only packets of version 4
            or higher may be accepted.
        """

    @abstractmethod
    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
        """Export a secret key for transfer (RFC 4880 §11.1).

        The secret data must not be encrypted, i.e. the octet indicating
        string-to-key usage conventions in the corresponding secret-key packet
        (RFC 4880 §5.5.3) has to be zero. OpenPGP's ASCII Armor must not be used.

        @param secret_key: The secret key to export.
        @return: The binary blob holding the exported secret key.
        @raise UnknownKey: if the secret key is not available.
        """

    @abstractmethod
    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
        """Restore secret keys that were exported for transfer (RFC 4880 §11.1).

        The secret data is not encrypted, i.e. the octet indicating string-to-key
        usage conventions in the corresponding secret-key packets (RFC 4880 §5.5.3)
        is zero. The data does not use OpenPGP's ASCII Armor.

        @param data: One or more secret keys exported for transfer, concatenated.
        @return: The secret keys that were imported from the data.
        @raise InvalidPacket: if the data, or one of the packets it contains, is
            deemed invalid, either syntactically or semantically.

        @warning: Packets below version 4 MUST be rejected; only packets of version 4
            or higher may be accepted.
        """

    @abstractmethod
    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
        """Encrypt data symmetrically (RFC 4880 §5.3).

        A Symmetric-Key Encrypted Session Key packet is built from the password. It
        precedes the Symmetrically Encrypted Data packet holding the encrypted data.

        @param plaintext: The data to encrypt.
        @param password: The password to encrypt the data with.
        @return: The encrypted data.
        """

    @abstractmethod
    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
        """Decrypt data symmetrically (RFC 4880 §5.3).

        The ciphertext is a Symmetric-Key Encrypted Session Key packet using the
        password, followed by a Symmetrically Encrypted Data packet holding the
        encrypted data.

        @param ciphertext: The ciphertext.
        @param password: The password to decrypt the data with.
        @return: The plaintext.
        @raise DecryptionFailed: on decryption failure.
        """

    @abstractmethod
    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
        """Sign data.

        OpenPGP's ASCII Armor must not be used.

        @param data: The data to sign.
        @param secret_keys: The secret keys to create signatures with.
        @return: The OpenPGP message that carries the signed data.
        """

    @abstractmethod
    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
        """Sign data, producing a signature that is detached from the data.

        OpenPGP's ASCII Armor must not be used.

        @param data: The data to sign.
        @param secret_keys: The secret keys to create signatures with.
        @return: The OpenPGP message that carries the detached signature.
        """

    @abstractmethod
    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
        """Verify signed data.

        The signed data does not use OpenPGP's ASCII Armor.

        @param signed_data: The signed data, as an OpenPGP message.
        @param public_keys: The public keys to verify the signature with.
        @return: The data, verified and unpacked.
        @raise VerificationFailed: if the data could not be verified.

        @warning: For implementors: the presence of a valid signature by one of the
            public keys has to be confirmed.
        """

    @abstractmethod
    def verify_detached(
        self,
        data: bytes,
        signature: bytes,
        public_keys: Set[GPGPublicKey]
    ) -> None:
        """Verify data against a signature that was created detached from the data.

        The signature does not use OpenPGP's ASCII Armor.

        @param data: The data.
        @param signature: The signature, as an OpenPGP message.
        @param public_keys: The public keys to verify the signature with.
        @raise VerificationFailed: if the data could not be verified.

        @warning: For implementors: the presence of a valid signature by one of the
            public keys has to be confirmed.
        """

    @abstractmethod
    def encrypt(
        self,
        plaintext: bytes,
        public_keys: Set[GPGPublicKey],
        signing_keys: Optional[Set[GPGSecretKey]] = None
    ) -> bytes:
        """Encrypt data and optionally sign it.

        OpenPGP's ASCII Armor must not be used.

        @param plaintext: The data to encrypt and optionally sign.
        @param public_keys: The public keys of the recipients of the data.
        @param signing_keys: The secret keys to sign the data with.
        @return: The OpenPGP message that carries the encrypted and optionally signed
            data.
        """

    @abstractmethod
    def decrypt(
        self,
        ciphertext: bytes,
        secret_keys: Set[GPGSecretKey],
        public_keys: Optional[Set[GPGPublicKey]] = None
    ) -> bytes:
        """Decrypt data and optionally verify it.

        The ciphertext does not use OpenPGP's ASCII Armor.

        @param ciphertext: The encrypted and optionally signed data, as an OpenPGP
            message.
        @param secret_keys: The secret keys to attempt the decryption with.
        @param public_keys: The public keys to verify the optional signature with.
        @return: The data, decrypted, optionally verified and unpacked.
        @raise DecryptionFailed: on decryption failure.
        @raise VerificationFailed: if the data could not be verified.

        @warning: For implementors: it has to be confirmed that one of the secret keys
            was used to decrypt the data and, in case the data is signed, that a valid
            signature by one of the public keys is present.
        """

    @abstractmethod
    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
        """List the public keys of a user id.

        @param user_id: The user id.
        @return: All public keys that are available for this user id.
        """

    @abstractmethod
    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
        """List the secret keys of a user id.

        @param user_id: The user id.
        @return: All secret keys that are available for this user id.
        """

    @abstractmethod
    def can_sign(self, public_key: GPGPublicKey) -> bool:
        """
        @return: Whether the key pair this public key belongs to is capable of signing.
        """

    @abstractmethod
    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
        """
        @return: Whether the key pair this public key belongs to is capable of
            encryption.
        """

    @abstractmethod
    def create_key(self, user_id: str) -> GPGSecretKey:
        """Create a new GPG key that is capable of signing and encryption.

        The generated key is neither password-protected nor does it expire. A new key
        is created even if a key with the same user id exists already.

        @param user_id: The user id to assign to the new key.
        @return: The new key.
        """
403
404
class GPGME_GPGPublicKey(GPGPublicKey):
    """
    :class:`GPGPublicKey` implementation wrapping a GnuPG Made Easy (GPGME) key object.
    """

    def __init__(self, key_obj: Any) -> None:
        """
        @param key_obj: The GPGME key object to wrap.
        """

        self.__key_obj = key_obj

    @property
    def key_obj(self) -> Any:
        """
        @return: The wrapped GPGME key object.
        """

        return self.__key_obj

    @property
    def fingerprint(self) -> str:
        # The fingerprint is read from the ``fpr`` attribute of the GPGME key object.
        return self.key_obj.fpr
424
425
class GPGME_GPGSecretKey(GPGSecretKey):
    """
    :class:`GPGSecretKey` implementation for GnuPG Made Easy (GPGME). The secret key is
    represented by a reference to its public key.
    """

    def __init__(self, public_key: GPGME_GPGPublicKey) -> None:
        """
        @param public_key: The public counterpart of this secret key.
        """

        self.__public_key = public_key

    @property
    def public_key(self) -> GPGME_GPGPublicKey:
        # Return the public key passed to the constructor.
        return self.__public_key
441
442
class GPGME_GPGProvider(GPGProvider):
    """
    GPG provider implementation based on GnuPG Made Easy (GPGME).

    Every operation opens a fresh ``gpg.Context`` on the configured home directory.
    Errors signaled by GPGME are wrapped in :class:`GPGProviderError` unless a more
    specific exception type of the :class:`GPGProvider` interface applies.
    """

    def __init__(self, home_dir: Optional[str] = None) -> None:
        """
        @param home_dir: Optional GPG home directory path to use for all operations.
        """

        self.__home_dir = home_dir

    @staticmethod
    def _signer_key_objs(secret_keys: Iterable[GPGSecretKey]) -> List[Any]:
        """Collect the GPGME key objects to pass as signers to a context.

        @param secret_keys: The secret keys to sign with.
        @return: The GPGME key objects referenced by the secret keys.
        """

        signers = []
        for secret_key in secret_keys:
            assert isinstance(secret_key, GPGME_GPGSecretKey)

            signers.append(secret_key.public_key.key_obj)

        return signers

    @staticmethod
    def _require_signature_by(
        verify_result: Any,
        public_keys: Iterable[GPGPublicKey]
    ) -> None:
        """Confirm that one of the signatures was made by one of the public keys.

        @param verify_result: The GPGME verification result listing the signatures.
        @param public_keys: The public keys of which at least one must have signed.
        @raise VerificationFailed: if none of the signatures was created by a
            signing-capable subkey of any of the public keys.
        """

        signature_fprs = { sig.fpr for sig in verify_result.signatures }

        for public_key in public_keys:
            assert isinstance(public_key, GPGME_GPGPublicKey)

            for subkey in public_key.key_obj.subkeys:
                if subkey.can_sign and subkey.fpr in signature_fprs:
                    return

        raise VerificationFailed(
            "Data not signed by one of the expected public keys"
        )

    def export_public_key(self, public_key: GPGPublicKey) -> bytes:
        """Export the public key in minimal form, looked up by its fingerprint.

        @raise UnknownKey: if GPGME does not return any data for the fingerprint.
        """

        assert isinstance(public_key, GPGME_GPGPublicKey)

        pattern = public_key.fingerprint

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.key_export_minimal(pattern)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            if result is None:
                raise UnknownKey(f"Public key {pattern} not found.")

            return result

    def import_public_key(self, packet: bytes) -> GPGPublicKey:
        """Import exactly one public key from the packet.

        @raise InvalidPacket: if the data was not considered for import or does not
            contain exactly one public key.
        """

        # TODO
        # - Reject packets older than version 4
        # - Check whether it's actually a public key (through packet inspection?)

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.key_import(packet)
            except gpg.errors.GPGMEError as e:
                # From looking at the code, `key_import` never raises. The documentation
                # says it does though, so this is included for future-proofness.
                raise GPGProviderError("Internal GPGME error") from e

            if not hasattr(result, "considered"):
                raise InvalidPacket(
                    f"Data not considered for public key import: {result}"
                )

            if len(result.imports) != 1:
                raise InvalidPacket(
                    "Public key packet does not contain exactly one public key (not"
                    " counting subkeys)."
                )

            try:
                key_obj = c.get_key(result.imports[0].fpr, secret=False)
            except KeyError as e:
                # Must be handled before GPGMEError: the "key not found" error of
                # GPGME is a KeyError that may also derive from GPGMEError.
                raise GPGProviderError("Newly imported public key not found") from e
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            return GPGME_GPGPublicKey(key_obj)

    def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
        """Export the secret key, looked up by the fingerprint of its public key.

        @raise UnknownKey: if GPGME does not return any data for the fingerprint.
        """

        assert isinstance(secret_key, GPGME_GPGSecretKey)
        # TODO
        # - Handle password protection/pinentry
        # - Make sure the key is exported unencrypted

        pattern = secret_key.public_key.fingerprint

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.key_export_secret(pattern)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            if result is None:
                raise UnknownKey(f"Secret key {pattern} not found.")

            return result

    def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
        """Import one or more secret keys from the data.

        @raise InvalidPacket: if the data was not considered for import or does not
            contain any key.
        """

        # TODO
        # - Reject packets older than version 4
        # - Check whether it's actually secret keys (through packet inspection?)

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.key_import(data)
            except gpg.errors.GPGMEError as e:
                # From looking at the code, `key_import` never raises. The documentation
                # says it does though, so this is included for future-proofness.
                raise GPGProviderError("Internal GPGME error") from e

            if not hasattr(result, "considered"):
                raise InvalidPacket(
                    f"Data not considered for secret key import: {result}"
                )

            if len(result.imports) == 0:
                raise InvalidPacket("Secret key packet does not contain a secret key.")

            secret_keys: Set[GPGSecretKey] = set()
            for import_status in result.imports:
                try:
                    key_obj = c.get_key(import_status.fpr, secret=True)
                except KeyError as e:
                    # See import_public_key for why KeyError is handled first.
                    raise GPGProviderError(
                        "Newly imported secret key not found"
                    ) from e
                except gpg.errors.GPGMEError as e:
                    raise GPGProviderError("Internal GPGME error") from e

                secret_keys.add(GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj)))

            return secret_keys

    def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
        """Encrypt the plaintext with the password, without signing."""

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                ciphertext, __, __ = c.encrypt(plaintext, passphrase=password, sign=False)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            return ciphertext

    def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
        """Decrypt the ciphertext with the password, without verifying signatures.

        @raise DecryptionFailed: if an unsupported algorithm is reported.
        """

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                plaintext, __, __ = c.decrypt(
                    ciphertext,
                    passphrase=password,
                    verify=False
                )
            except gpg.errors.GPGMEError as e:
                # TODO: Find out what kind of error is raised if the password is wrong and
                # re-raise it as DecryptionFailed instead.
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.UnsupportedAlgorithm as e:
                raise DecryptionFailed("Unsupported algorithm") from e

            return plaintext

    def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
        """Sign the data with all of the secret keys."""

        signers = self._signer_key_objs(secret_keys)

        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
            try:
                signed_data, __ = c.sign(data)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.InvalidSigners as e:
                raise GPGProviderError(
                    "At least one of the secret keys is invalid for signing"
                ) from e

            return signed_data

    def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
        """Create a detached signature of the data with all of the secret keys."""

        signers = self._signer_key_objs(secret_keys)

        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
            try:
                signature, __ = c.sign(data, mode=gpg.constants.sig.mode.DETACH)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.InvalidSigners as e:
                raise GPGProviderError(
                    "At least one of the secret keys is invalid for signing"
                ) from e

            return signature

    def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
        """Verify the signed data and return the data it carries.

        @raise VerificationFailed: if GPGME reports bad signatures or if none of the
            signatures was created by one of the public keys.
        """

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                data, result = c.verify(signed_data)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.BadSignatures as e:
                raise VerificationFailed("Bad signatures on signed data") from e

            self._require_signature_by(result, public_keys)

            return data

    def verify_detached(
        self,
        data: bytes,
        signature: bytes,
        public_keys: Set[GPGPublicKey]
    ) -> None:
        """Verify the data against the detached signature.

        @raise VerificationFailed: if GPGME reports bad signatures or if none of the
            signatures was created by one of the public keys.
        """

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                __, result = c.verify(data, signature=signature)
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.BadSignatures as e:
                raise VerificationFailed("Bad signatures on signed data") from e

            self._require_signature_by(result, public_keys)

    def encrypt(
        self,
        plaintext: bytes,
        public_keys: Set[GPGPublicKey],
        signing_keys: Optional[Set[GPGSecretKey]] = None
    ) -> bytes:
        """Encrypt the plaintext for the public keys, signing it if signing keys are
        passed. Recipient keys are used regardless of their trust (``always_trust``).
        """

        recipients = []
        for public_key in public_keys:
            assert isinstance(public_key, GPGME_GPGPublicKey)

            recipients.append(public_key.key_obj)

        # Signing is requested whenever a set of signing keys is passed.
        sign = signing_keys is not None
        signers = [] if signing_keys is None else self._signer_key_objs(signing_keys)

        with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
            try:
                ciphertext, __, __ = c.encrypt(
                    plaintext,
                    recipients=recipients,
                    sign=sign,
                    always_trust=True,
                    add_encrypt_to=True
                )
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.InvalidRecipients as e:
                raise GPGProviderError(
                    "At least one of the public keys is invalid for encryption"
                ) from e
            except gpg.errors.InvalidSigners as e:
                raise GPGProviderError(
                    "At least one of the signing keys is invalid for signing"
                ) from e

            return ciphertext

    def decrypt(
        self,
        ciphertext: bytes,
        secret_keys: Set[GPGSecretKey],
        public_keys: Optional[Set[GPGPublicKey]] = None
    ) -> bytes:
        """Decrypt the ciphertext and, if public keys are passed, verify its signature.

        @raise DecryptionFailed: if an unsupported algorithm is reported.
        @raise VerificationFailed: if verification is requested and GPGME reports bad
            signatures, or none of the signatures was created by one of the public
            keys.
        """

        verify = public_keys is not None

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                plaintext, __, verify_result = c.decrypt(
                    ciphertext,
                    verify=verify
                )
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e
            except gpg.errors.UnsupportedAlgorithm as e:
                raise DecryptionFailed("Unsupported algorithm") from e
            except gpg.errors.BadSignatures as e:
                # Report signature problems through the exception type documented by
                # the GPGProvider interface instead of leaking the GPGME exception.
                raise VerificationFailed("Bad signatures on decrypted data") from e

            # TODO: Check whether the data was decrypted using one of the expected secret
            # keys

            if public_keys is not None:
                self._require_signature_by(verify_result, public_keys)

            return plaintext

    def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
        """List the public keys GPGME finds for the user id pattern."""

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                return {
                    GPGME_GPGPublicKey(key)
                    for key
                    in c.keylist(pattern=user_id, secret=False)
                }
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

    def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
        """List the secret keys GPGME finds for the user id pattern."""

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                return {
                    GPGME_GPGSecretKey(GPGME_GPGPublicKey(key))
                    for key
                    in c.keylist(pattern=user_id, secret=True)
                }
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

    def can_sign(self, public_key: GPGPublicKey) -> bool:
        """
        @return: Whether any subkey of the key is flagged as capable of signing.
        """

        assert isinstance(public_key, GPGME_GPGPublicKey)

        return any(subkey.can_sign for subkey in public_key.key_obj.subkeys)

    def can_encrypt(self, public_key: GPGPublicKey) -> bool:
        """
        @return: Whether any subkey of the key is flagged as capable of encryption.
        """

        assert isinstance(public_key, GPGME_GPGPublicKey)

        return any(subkey.can_encrypt for subkey in public_key.key_obj.subkeys)

    def create_key(self, user_id: str) -> GPGSecretKey:
        """Create a non-expiring key for signing and encryption and look it up again
        by the fingerprint GPGME reports for it.
        """

        with gpg.Context(home_dir=self.__home_dir) as c:
            try:
                result = c.create_key(
                    user_id,
                    expires=False,
                    sign=True,
                    encrypt=True,
                    certify=False,
                    authenticate=False,
                    force=True
                )

                key_obj = c.get_key(result.fpr, secret=True)
            except KeyError as e:
                # See import_public_key for why KeyError is handled first.
                raise GPGProviderError("Newly created key not found") from e
            except gpg.errors.GPGMEError as e:
                raise GPGProviderError("Internal GPGME error") from e

            return GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj))
808
809
class PublicKeyMetadata(NamedTuple):
    """
    The metadata of a published public key: its fingerprint and a timestamp.
    """

    # Fingerprint of the public key.
    fingerprint: str
    # Timestamp associated with the public key.
    timestamp: datetime
817
818
@enum.unique
class TrustLevel(enum.Enum):
    """
    Trust levels as needed for manual trust and BTBV. The value of each member equals
    its name.
    """

    TRUSTED: str = "TRUSTED"
    BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED"
    UNDECIDED: str = "UNDECIDED"
    DISTRUSTED: str = "DISTRUSTED"
829
830
# Schema of the <openpgp/> element: base64-encoded binary content.
OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:openpgp:0"
    xmlns="urn:xmpp:openpgp:0">

    <xs:element name="openpgp" type="xs:base64Binary"/>
</xs:schema>
""")
839
840
# Schema of the content elements <signcrypt/>, <sign/> and <crypt/>.
# The following schema needs version 1.1 of XML Schema, which is not supported by lxml.
# Luckily, xmlschema exists, which is a clean, well maintained, cross-platform
# implementation of XML Schema, including version 1.1.
CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:openpgp:0"
    xmlns="urn:xmpp:openpgp:0">

    <xs:element name="signcrypt">
        <xs:complexType>
            <xs:all>
                <xs:element ref="to" maxOccurs="unbounded"/>
                <xs:element ref="time"/>
                <xs:element ref="rpad" minOccurs="0"/>
                <xs:element ref="payload"/>
            </xs:all>
        </xs:complexType>
    </xs:element>

    <xs:element name="sign">
        <xs:complexType>
            <xs:all>
                <xs:element ref="to" maxOccurs="unbounded"/>
                <xs:element ref="time"/>
                <xs:element ref="rpad" minOccurs="0"/>
                <xs:element ref="payload"/>
            </xs:all>
        </xs:complexType>
    </xs:element>

    <xs:element name="crypt">
        <xs:complexType>
            <xs:all>
                <xs:element ref="to" minOccurs="0" maxOccurs="unbounded"/>
                <xs:element ref="time"/>
                <xs:element ref="rpad" minOccurs="0"/>
                <xs:element ref="payload"/>
            </xs:all>
        </xs:complexType>
    </xs:element>

    <xs:element name="to">
        <xs:complexType>
            <xs:attribute name="jid" type="xs:string"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="time">
        <xs:complexType>
            <xs:attribute name="stamp" type="xs:dateTime"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="rpad" type="xs:string"/>

    <xs:element name="payload">
        <xs:complexType>
            <xs:sequence>
                <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
            </xs:sequence>
        </xs:complexType>
    </xs:element>
</xs:schema>
""")
905
906
# Name of the node carrying the public keys list, and the schema of the
# <public-keys-list/> element with its <pubkey-metadata/> children.
PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys"
PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:openpgp:0"
    xmlns="urn:xmpp:openpgp:0">

    <xs:element name="public-keys-list">
        <xs:complexType>
            <xs:sequence>
                <xs:element ref="pubkey-metadata" minOccurs="0" maxOccurs="unbounded"/>
            </xs:sequence>
        </xs:complexType>
    </xs:element>

    <xs:element name="pubkey-metadata">
        <xs:complexType>
            <xs:attribute name="v4-fingerprint" type="xs:string"/>
            <xs:attribute name="date" type="xs:dateTime"/>
        </xs:complexType>
    </xs:element>
</xs:schema>
""")
929
930
# Schema of the <pubkey/> element: a single base64-encoded <data/> child, arbitrary
# attributes allowed.
PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:openpgp:0"
    xmlns="urn:xmpp:openpgp:0">

    <xs:element name="pubkey">
        <xs:complexType>
            <xs:all>
                <xs:element ref="data"/>
            </xs:all>
            <xs:anyAttribute processContents="skip"/>
        </xs:complexType>
    </xs:element>

    <xs:element name="data" type="xs:base64Binary"/>
</xs:schema>
""")
948
949
# Schema of the <secretkey/> element: base64-encoded binary content.
SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    targetNamespace="urn:xmpp:openpgp:0"
    xmlns="urn:xmpp:openpgp:0">

    <xs:element name="secretkey" type="xs:base64Binary"/>
</xs:schema>
""")
958
959
# Parameter definition letting each profile choose the default trust policy used by
# this plugin: manual trust or Blind Trust Before Verification (the preselected option).
# The label names OX (this plugin) rather than OMEMO, so that the setting is not
# mistaken for the one of the OMEMO plugin.
DEFAULT_TRUST_MODEL_PARAM = f"""
<params>
<individual>
<category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}>
    <param name="{PARAM_NAME}"
        label={quoteattr(D_('OX default trust policy'))}
        type="list" security="3">
        <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} />
        <option value="btbv"
            label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))}
            selected="true" />
    </param>
</category>
</individual>
</params>
"""
976
977
def get_gpg_provider(sat: SAT, client: SatXMPPClient) -> GPGProvider:
    """Build the GPG provider of a client.

    The provider is GPGME-based and uses the local path "gnupg-home" of the client as
    its GPG home directory.

    @param sat: The SAT instance.
    @param client: The client.
    @return: The GPG provider specifically for that client.
    """

    home_dir = str(sat.get_local_path(client, "gnupg-home"))

    return GPGME_GPGProvider(home_dir)
987
988
def generate_passphrase() -> str:
    """Generate a secure passphrase for symmetric encryption.

    The passphrase consists of six dash-separated groups of four characters each,
    drawn using the ``secrets`` module from the digits 1-9 and the uppercase letters
    except "O".

    @return: The passphrase.
    """

    alphabet = "123456789ABCDEFGHIJKLMNPQRSTUVWXYZ"

    groups = []
    for __ in range(6):
        groups.append("".join(secrets.choice(alphabet) for __ in range(4)))

    return "-".join(groups)
998
999
1000 # TODO: Handle the user id mess
1001 class XEP_0373:
1002 """
1003 Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``.
1004 """
1005
def __init__(self, sat: SAT) -> None:
    """
    Register the trust policy parameter, look up the plugins this one builds upon and
    subscribe to public keys list updates.

    @param sat: The SAT instance.
    """

    def on_public_keys_list_event(items_event, profile):  # type: ignore[no-untyped-def]
        # The update handler is a coroutine function, thus its result is wrapped
        # using ensureDeferred.
        return defer.ensureDeferred(
            self.__on_public_keys_list_update(items_event, profile)
        )

    self.__sat = sat

    # Offer a configuration option that selects the trust model, which is either
    # manual trust or BTBV
    sat.memory.updateParams(DEFAULT_TRUST_MODEL_PARAM)

    # XEP-0045 is looked up with `get`, i.e. it may be absent
    self.__xep_0045 = cast(Optional[XEP_0045], sat.plugins.get("XEP-0045"))
    self.__xep_0060 = cast(XEP_0060, sat.plugins["XEP-0060"])

    # Persistent storage objects, keyed by profile
    self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {}

    xep_0163 = cast(XEP_0163, sat.plugins["XEP-0163"])
    xep_0163.addPEPEvent(
        "OX_PUBLIC_KEYS_LIST",
        PUBLIC_KEYS_LIST_NODE,
        on_public_keys_list_event
    )
1030
async def profileConnected(  # pylint: disable=invalid-name
    self,
    client: SatXMPPClient
) -> None:
    """Prepare the per-profile state of a client.

    The persistent storage of the profile is created unless it exists already, and a
    first GPG key is generated if no secret key is available for the client.

    @param client: The client.
    """

    profile = cast(str, client.profile)

    if profile not in self.__storage:
        storage = persistent.LazyPersistentBinaryDict("XEP-0373", client.profile)
        self.__storage[profile] = storage

    if len(self.list_secret_keys(client)) != 0:
        # At least one secret key exists, nothing left to do
        return

    log.debug(f"Generating first GPG key for {client.jid.userhost()}.")
    await self.create_key(client)
1048
async def __on_public_keys_list_update(
    self,
    items_event: pubsub.ItemsEvent,
    profile: str
) -> None:
    """Handle public keys list updates fired by PEP.

    The single item of the update is validated against the public keys list schema and
    compared with the metadata stored locally for the sender. Keys with new or changed
    metadata that have been imported before are imported again. If the update was sent
    by the own account and doesn't list all of the own secret keys, the missing keys
    are added and the list is published again. Finally, the resulting metadata replaces
    the locally stored metadata of the sender.

    Updates containing more than one item, no item, or a malformed item are ignored.

    @param items_event: The event.
    @param profile: The profile this event belongs to.
    """

    client = self.__sat.getClient(profile)

    sender = cast(jid.JID, items_event.sender)
    items = cast(List[domish.Element], items_event.items)

    if len(items) > 1:
        log.warning("Ignoring public keys list update with more than one element.")
        return

    item_elt = next(iter(items), None)
    if item_elt is None:
        log.debug("Ignoring empty public keys list update.")
        return

    public_keys_list_elt = cast(
        Optional[domish.Element],
        next(item_elt.elements(NS_OX, "public-keys-list"), None)
    )

    # Stays None if the element is missing or doesn't pass schema validation
    pubkey_metadata_elts: Optional[List[domish.Element]] = None

    if public_keys_list_elt is not None:
        try:
            PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
        except xmlschema.XMLSchemaValidationError:
            # Handled below together with the missing element case
            pass
        else:
            pubkey_metadata_elts = \
                list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata"))

    if pubkey_metadata_elts is None:
        log.warning(f"Malformed public keys list update item: {item_elt.toXml()}")
        return

    new_public_keys_metadata = { PublicKeyMetadata(
        fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]),
        timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"]))
    ) for pubkey_metadata_elt in pubkey_metadata_elts }

    storage_key = f"/public-keys-metadata/{sender.userhost()}"

    local_public_keys_metadata = cast(
        Set[PublicKeyMetadata],
        await self.__storage[profile].get(storage_key, set())
    )

    # Metadata entries that are not also present in the local metadata count as changed or new
    unchanged_keys = new_public_keys_metadata & local_public_keys_metadata
    changed_or_new_keys = new_public_keys_metadata - unchanged_keys
    available_keys = self.list_public_keys(client, sender)

    # changed_or_new_keys is a separate set, so removing entries from
    # new_public_keys_metadata inside the loop doesn't affect the iteration
    for key_metadata in changed_or_new_keys:
        # Check whether the changed or new key has been imported before
        if any(key.fingerprint == key_metadata.fingerprint for key in available_keys):
            try:
                # If it has been imported before, try to update it
                await self.import_public_key(client, sender, key_metadata.fingerprint)
            except Exception as e:
                log.warning(f"Public key import failed: {e}")

                # If the update fails, remove the key from the local metadata list
                # such that the update is attempted again next time
                new_public_keys_metadata.remove(key_metadata)

    # Check whether this update was for our account and make sure all of our keys are
    # included in the update
    if sender.userhost() == client.jid.userhost():
        secret_keys = self.list_secret_keys(client)
        # Secret keys whose fingerprint doesn't appear in the received list
        missing_keys = set(filter(lambda secret_key: all(
            key_metadata.fingerprint != secret_key.public_key.fingerprint
            for key_metadata
            in new_public_keys_metadata
        ), secret_keys))

        if len(missing_keys) > 0:
            log.warning(
                "Public keys list update did not contain at least one of our keys."
                f" {new_public_keys_metadata}"
            )

            for missing_key in missing_keys:
                log.warning(missing_key.public_key.fingerprint)
                new_public_keys_metadata.add(PublicKeyMetadata(
                    fingerprint=missing_key.public_key.fingerprint,
                    timestamp=datetime.now(timezone.utc)
                ))

            # Publish the list again, now including the missing keys
            await self.publish_public_keys_list(client, new_public_keys_metadata)

    # Overwrite the locally stored metadata of the sender with the processed list
    await self.__storage[profile].force(storage_key, new_public_keys_metadata)
1149
def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]:
    """List the GPG public keys that are available for a JID.

    The keys are looked up in the client's GPG provider under the user id
    ``xmpp:<userhost of the JID>``.

    @param client: The client to perform this operation with.
    @param jid: The JID. Can be a bare JID.
    @return: The set of public keys available for this JID.
    """

    provider = get_gpg_provider(self.__sat, client)
    user_id = f"xmpp:{jid.userhost()}"

    return provider.list_public_keys(user_id)
1161
def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]:
    """List the GPG secret keys that are available for the JID of a client.

    The keys are looked up in the client's GPG provider under the user id
    ``xmpp:<userhost of the client's JID>``.

    @param client: The client to perform this operation with.
    @return: The set of secret keys available for the JID of the client.
    """

    provider = get_gpg_provider(self.__sat, client)
    user_id = f"xmpp:{client.jid.userhost()}"

    return provider.list_secret_keys(user_id)
1172
async def create_key(self, client: SatXMPPClient) -> GPGSecretKey:
    """Create a new GPG key and announce it.

    The key is created by the GPG provider for the user id
    ``xmpp:<userhost of the client's JID>``. Its public part is published, then the
    key is added to the locally stored public keys list, which is published and stored.

    @param client: The client to perform this operation with.
    @return: The new key.
    """

    provider = get_gpg_provider(self.__sat, client)

    new_key = provider.create_key(f"xmpp:{client.jid.userhost()}")

    # The public key has to be available before it is referenced by the list
    await self.publish_public_key(client, new_key.public_key)

    storage_key = f"/public-keys-metadata/{client.jid.userhost()}"

    own_keys_metadata = cast(
        Set[PublicKeyMetadata],
        await self.__storage[client.profile].get(storage_key, set())
    )

    new_key_metadata = PublicKeyMetadata(
        fingerprint=new_key.public_key.fingerprint,
        timestamp=datetime.now(timezone.utc)
    )
    own_keys_metadata.add(new_key_metadata)

    await self.publish_public_keys_list(client, own_keys_metadata)

    await self.__storage[client.profile].force(storage_key, own_keys_metadata)

    return new_key
1205
@staticmethod
def __build_content_element(
    element_name: Literal["signcrypt", "sign", "crypt"],
    recipient_jids: Iterable[jid.JID],
    include_rpad: bool
) -> Tuple[domish.Element, domish.Element]:
    """Assemble a content element with its affix elements and an empty payload.

    The element receives one ``<to/>`` child per recipient, a ``<time/>`` child with
    the current timestamp, optionally an ``<rpad/>`` child, and a ``<payload/>`` child.

    @param element_name: The name of the content element.
    @param recipient_jids: The intended recipients of this content element. Can be
        bare JIDs.
    @param include_rpad: Whether to include random-length random-content padding.
    @return: The content element and the ``<payload/>`` element to add the stanza
        extension elements to.
    """

    content_elt = domish.Element((NS_OX, element_name))

    for recipient_jid in recipient_jids:
        bare_jid = recipient_jid.userhost()
        content_elt.addElement("to")["jid"] = bare_jid

    stamp = format_datetime()
    content_elt.addElement("time")["stamp"] = stamp

    if include_rpad:
        # XEP-0373 doesn't specify bounds for the length of the random padding. This
        # uses the bounds specified in XEP-0420 for the closely related rpad affix.
        alphabet = string.digits + string.ascii_letters + string.punctuation
        rpad_length = secrets.randbelow(201)
        rpad_chars = [ secrets.choice(alphabet) for __ in range(rpad_length) ]
        content_elt.addElement("rpad", content="".join(rpad_chars))

    return content_elt, content_elt.addElement("payload")
1243
1244 @staticmethod
1245 def build_signcrypt_element(
1246 recipient_jids: Iterable[jid.JID]
1247 ) -> Tuple[domish.Element, domish.Element]:
1248 """Build a ``<signcrypt/>`` content element.
1249
1250 @param recipient_jids: The intended recipients of this content element. Can be
1251 bare JIDs.
1252 @return: The ``<signcrypt/>`` element and the ``<payload/>`` element to add the
1253 stanza extension elements to.
1254 """
1255
1256 if len(recipient_jids) == 0:
1257 raise ValueError("Recipient JIDs must be provided.")
1258
1259 return XEP_0373.__build_content_element("signcrypt", recipient_jids, True)
1260
1261 @staticmethod
1262 def build_sign_element(
1263 recipient_jids: Iterable[jid.JID],
1264 include_rpad: bool
1265 ) -> Tuple[domish.Element, domish.Element]:
1266 """Build a ``<sign/>`` content element.
1267
1268 @param recipient_jids: The intended recipients of this content element. Can be
1269 bare JIDs.
1270 @param include_rpad: Whether to include random-length random-content padding,
1271 which is OPTIONAL for the ``<sign/>`` content element.
1272 @return: The ``<sign/>`` element and the ``<payload/>`` element to add the stanza
1273 extension elements to.
1274 """
1275
1276 if len(recipient_jids) == 0:
1277 raise ValueError("Recipient JIDs must be provided.")
1278
1279 return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad)
1280
@staticmethod
def build_crypt_element(
    recipient_jids: Iterable[jid.JID]
) -> Tuple[domish.Element, domish.Element]:
    """Build a ``<crypt/>`` content element.

    Random padding is always included in this content element. Unlike for the other
    content elements, an empty collection of recipients is accepted here.

    @param recipient_jids: The intended recipients of this content element. Specifying
        the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can
        be bare JIDs.
    @return: The ``<crypt/>`` element and the ``<payload/>`` element to add the stanza
        extension elements to.
    """

    return XEP_0373.__build_content_element(
        element_name="crypt",
        recipient_jids=recipient_jids,
        include_rpad=True
    )
1295
async def build_openpgp_element(
    self,
    client: SatXMPPClient,
    content_elt: domish.Element,
    recipient_jids: Set[jid.JID]
) -> domish.Element:
    """Build an ``<openpgp/>`` element.

    Depending on the name of the content element, its serialized form is signed and
    encrypted, only signed, or only encrypted. The result is base64-encoded and placed
    into the ``<openpgp/>`` element.

    @param client: The client to perform this operation with.
    @param content_elt: The content element to contain in the ``<openpgp/>`` element.
    @param recipient_jids: The recipient's JIDs. Can be bare JIDs.
    @return: The ``<openpgp/>`` element.
    @raise ValueError: if the name of the content element is not one of ``signcrypt``,
        ``sign`` and ``crypt``.
    """

    gpg_provider = get_gpg_provider(self.__sat, client)

    # TODO: I'm not sure whether we want to sign with all keys by default or choose
    # just one key/a subset of keys to sign with.
    signing_keys = {
        secret_key
        for secret_key
        in self.list_secret_keys(client)
        if gpg_provider.can_sign(secret_key.public_key)
    }

    encryption_keys: Set[GPGPublicKey] = set()

    for recipient_jid in recipient_jids:
        # Import all keys of the recipient and keep those that can encrypt
        recipient_keys = await self.import_all_public_keys(client, recipient_jid)
        encryption_keys.update({
            public_key
            for public_key
            in recipient_keys
            if gpg_provider.can_encrypt(public_key)
        })

    # TODO: Handle trust

    content = content_elt.toXml().encode("utf-8")
    content_name = content_elt.name

    data: bytes
    if content_name == "signcrypt":
        data = gpg_provider.encrypt(content, encryption_keys, signing_keys)
    elif content_name == "sign":
        data = gpg_provider.sign(content, signing_keys)
    elif content_name == "crypt":
        data = gpg_provider.encrypt(content, encryption_keys)
    else:
        raise ValueError(f"Unknown content element <{content_elt.name}/>")

    encoded_data = base64.b64encode(data).decode("ASCII")

    openpgp_elt = domish.Element((NS_OX, "openpgp"))
    openpgp_elt.addContent(encoded_data)
    return openpgp_elt
1345
async def unpack_openpgp_element(
    self,
    client: SatXMPPClient,
    openpgp_elt: domish.Element,
    element_name: Literal["signcrypt", "sign", "crypt"],
    sender_jid: jid.JID
) -> Tuple[domish.Element, datetime]:
    """Verify, decrypt and unpack an ``<openpgp/>`` element.

    @param client: The client to perform this operation with.
    @param openpgp_elt: The ``<openpgp/>`` element.
    @param element_name: The name of the content element.
    @param sender_jid: The sender's JID. Can be a bare JID.
    @return: The ``<payload/>`` element containing the decrypted/verified stanza
        extension elements carried by this ``<openpgp/>`` element, and the timestamp
        contained in the content element.
    @raise exceptions.ParsingError: on syntactical verification errors.
    @raise VerificationError: on semantical verification errors according to XEP-0373.
    @raise DecryptionFailed: on decryption failure.
    @raise VerificationFailed: if the data could not be verified.

    @warning: The timestamp is not verified for plausibility; this SHOULD be done by
        the calling code.
    """

    gpg_provider = get_gpg_provider(self.__sat, client)

    # Own secret keys whose public part is capable of encryption
    decryption_keys = set(filter(
        lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key),
        self.list_secret_keys(client)
    ))

    # Import all keys of the sender
    all_public_keys = await self.import_all_public_keys(client, sender_jid)

    # Filter for keys that can sign
    verification_keys = set(filter(gpg_provider.can_sign, all_public_keys))

    # TODO: Handle trust

    try:
        OPENPGP_SCHEMA.validate(openpgp_elt.toXml())
    except xmlschema.XMLSchemaValidationError as e:
        raise exceptions.ParsingError(
            "<openpgp/> element doesn't pass schema validation."
        ) from e

    # The text content of the element is the base64-encoded OpenPGP message
    openpgp_message = base64.b64decode(str(openpgp_elt))
    content: bytes

    # The expected content element decides which GPG operation is applied
    if element_name == "signcrypt":
        content = gpg_provider.decrypt(
            openpgp_message,
            decryption_keys,
            public_keys=verification_keys
        )
    elif element_name == "sign":
        content = gpg_provider.verify(openpgp_message, verification_keys)
    elif element_name == "crypt":
        content = gpg_provider.decrypt(openpgp_message, decryption_keys)
    else:
        assert_never(element_name)

    try:
        content_elt = cast(
            domish.Element,
            xml_tools.ElementParser()(content.decode("utf-8"))
        )
    except UnicodeDecodeError as e:
        raise exceptions.ParsingError("UTF-8 decoding error") from e

    try:
        CONTENT_SCHEMA.validate(content_elt.toXml())
    except xmlschema.XMLSchemaValidationError as e:
        raise exceptions.ParsingError(
            f"<{element_name}/> element doesn't pass schema validation."
        ) from e

    # The content element found in the message must be the one the caller expects
    if content_elt.name != element_name:
        raise exceptions.ParsingError(f"Not a <{element_name}/> element.")

    recipient_jids = \
        { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") }

    # Our bare JID must be listed as a recipient, except for <crypt/> elements, which
    # are exempt from this check
    if (
        client.jid.userhostJID() not in { jid.userhostJID() for jid in recipient_jids }
        and element_name != "crypt"
    ):
        raise VerificationError(
            f"Recipient list in <{element_name}/> element does not list our (bare)"
            f" JID."
        )

    time_elt = next(content_elt.elements(NS_OX, "time"))

    timestamp = parse_datetime(time_elt["stamp"])

    payload_elt = next(content_elt.elements(NS_OX, "payload"))

    return payload_elt, timestamp
1446
async def publish_public_key(
    self,
    client: SatXMPPClient,
    public_key: GPGPublicKey
) -> None:
    """Publish a public key.

    The key is exported by the GPG provider, base64-encoded and sent as a
    ``<pubkey/>`` element to the node ``urn:xmpp:openpgp:0:public-keys:<fingerprint>``
    of the own bare JID, using the current formatted datetime as the item id.

    @param client: The client.
    @param public_key: The public key to publish.
    @raise XMPPInteractionFailed: if any interaction via XMPP failed.
    """

    gpg_provider = get_gpg_provider(self.__sat, client)

    encoded_packet = \
        base64.b64encode(gpg_provider.export_public_key(public_key)).decode("ASCII")

    node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}"

    pubkey_elt = domish.Element((NS_OX, "pubkey"))
    pubkey_elt.addElement("data", content=encoded_packet)

    try:
        publish_options = {
            XEP_0060.OPT_PERSIST_ITEMS: "true",
            XEP_0060.OPT_ACCESS_MODEL: "open",
            XEP_0060.OPT_MAX_ITEMS: 1
        }
        extra = {
            XEP_0060.EXTRA_PUBLISH_OPTIONS: publish_options,
            # TODO: Do we really want publish_without_options here?
            XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
        }

        await self.__xep_0060.sendItem(
            client,
            client.jid.userhostJID(),
            node,
            pubkey_elt,
            format_datetime(),
            extra=extra
        )
    except Exception as e:
        raise XMPPInteractionFailed("Publishing the public key failed.") from e
1488
async def import_all_public_keys(
    self,
    client: SatXMPPClient,
    jid: jid.JID
) -> Set[GPGPublicKey]:
    """Import all public keys of a JID that have not been imported before.

    The locally stored public keys metadata of the JID is compared with the keys that
    are available already; every key that is listed but not available is imported.

    @param client: The client.
    @param jid: The JID. Can be a bare JID.
    @return: The public keys.
    @note: Failure to import a key simply results in the key not being included in the
        result.
    """

    available_public_keys = self.list_public_keys(client, jid)

    storage_key = f"/public-keys-metadata/{jid.userhost()}"

    public_keys_metadata = cast(
        Set[PublicKeyMetadata],
        await self.__storage[client.profile].get(storage_key, set())
    )

    # Keys listed in the metadata whose fingerprint matches none of the available keys
    available_fingerprints = {
        public_key.fingerprint for public_key in available_public_keys
    }
    missing_keys = {
        public_key_metadata
        for public_key_metadata
        in public_keys_metadata
        if public_key_metadata.fingerprint not in available_fingerprints
    }

    for missing_key in missing_keys:
        try:
            imported_key = \
                await self.import_public_key(client, jid, missing_key.fingerprint)
            available_public_keys.add(imported_key)
        except Exception as e:
            log.warning(
                f"Import of public key {missing_key.fingerprint} owned by"
                f" {jid.userhost()} failed, ignoring: {e}"
            )

    return available_public_keys
1530
async def import_public_key(
    self,
    client: SatXMPPClient,
    jid: jid.JID,
    fingerprint: str
) -> GPGPublicKey:
    """Import a public key.

    The key is fetched from the node ``urn:xmpp:openpgp:0:public-keys:<fingerprint>``
    of the bare JID, validated against the pubkey schema and handed to the GPG provider
    for import.

    @param client: The client.
    @param jid: The JID owning the public key. Can be a bare JID.
    @param fingerprint: The fingerprint of the public key.
    @return: The public key.
    @raise exceptions.NotFound: if the public key was not found.
    @raise exceptions.ParsingError: on XML-level parsing errors.
    @raise InvalidPacket: if the packet is either syntactically or semantically deemed
        invalid.
    @raise XMPPInteractionFailed: if any interaction via XMPP failed.
    """

    gpg_provider = get_gpg_provider(self.__sat, client)

    node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}"

    try:
        items, __ = await self.__xep_0060.getItems(
            client,
            jid.userhostJID(),
            node,
            max_items=1
        )
    except exceptions.NotFound as e:
        raise exceptions.NotFound(
            f"No public key with fingerprint {fingerprint} published by JID"
            f" {jid.userhost()}."
        ) from e
    except Exception as e:
        # This fetches a single public key, not the public keys list
        raise XMPPInteractionFailed(
            f"Fetching the public key with fingerprint {fingerprint} failed."
        ) from e

    # An existing node without any item is treated like a missing key
    try:
        item_elt = cast(domish.Element, items[0])
    except IndexError as e:
        raise exceptions.NotFound(
            f"No public key with fingerprint {fingerprint} published by JID"
            f" {jid.userhost()}."
        ) from e

    pubkey_elt = cast(
        Optional[domish.Element],
        next(item_elt.elements(NS_OX, "pubkey"), None)
    )

    if pubkey_elt is None:
        raise exceptions.ParsingError(
            f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey"
            f" element."
        )

    try:
        PUBKEY_SCHEMA.validate(pubkey_elt.toXml())
    except xmlschema.XMLSchemaValidationError as e:
        raise exceptions.ParsingError(
            f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey"
            f" schema validation."
        ) from e

    # The text content of the <data/> element is the base64-encoded key packet
    public_key = gpg_provider.import_public_key(base64.b64decode(str(
        next(pubkey_elt.elements(NS_OX, "data"))
    )))

    return public_key
1601
1602 async def publish_public_keys_list(
1603 self,
1604 client: SatXMPPClient,
1605 public_keys_list: Iterable[PublicKeyMetadata]
1606 ) -> None:
1607 """Publish/update the own public keys list.
1608
1609 @param client: The client.
1610 @param public_keys_list: The public keys list.
1611 @raise XMPPInteractionFailed: if any interaction via XMPP failed.
1612
1613 @warning: All public keys referenced in the public keys list MUST be published
1614 beforehand.
1615 """
1616
1617 if len({ pkm.fingerprint for pkm in public_keys_list }) != len(public_keys_list):
1618 raise ValueError("Public keys list contains duplicate fingerprints.")
1619
1620 node = "urn:xmpp:openpgp:0:public-keys"
1621
1622 public_keys_list_elt = domish.Element((NS_OX, "public-keys-list"))
1623
1624 for public_key_metadata in public_keys_list:
1625 pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata")
1626 pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint
1627 pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp)
1628
1629 try:
1630 await self.__xep_0060.sendItem(
1631 client,
1632 client.jid.userhostJID(),
1633 node,
1634 public_keys_list_elt,
1635 item_id=XEP_0060.ID_SINGLETON,
1636 extra={
1637 XEP_0060.EXTRA_PUBLISH_OPTIONS: {
1638 XEP_0060.OPT_PERSIST_ITEMS: "true",
1639 XEP_0060.OPT_ACCESS_MODEL: "open",
1640 XEP_0060.OPT_MAX_ITEMS: 1
1641 },
1642 # TODO: Do we really want publish_without_options here?
1643 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
1644 }
1645 )
1646 except Exception as e:
1647 raise XMPPInteractionFailed("Publishing the public keys list failed.") from e
1648
async def download_public_keys_list(
    self,
    client: SatXMPPClient,
    jid: jid.JID
) -> Optional[Set[PublicKeyMetadata]]:
    """Download the public keys list of a JID.

    The list is fetched from the node ``urn:xmpp:openpgp:0:public-keys`` of the bare
    JID and validated against the public keys list schema.

    @param client: The client.
    @param jid: The JID. Can be a bare JID.
    @return: The public keys list or ``None`` if the JID hasn't published a public
        keys list. An empty list means the JID has published an empty list.
    @raise exceptions.ParsingError: on XML-level parsing errors.
    @raise XMPPInteractionFailed: if any interaction via XMPP failed.
    """

    try:
        items, __ = await self.__xep_0060.getItems(
            client,
            jid.userhostJID(),
            "urn:xmpp:openpgp:0:public-keys",
            max_items=1
        )
    except exceptions.NotFound:
        return None
    except Exception as e:
        raise XMPPInteractionFailed() from e

    # A node without any item is handled like a missing node
    if len(items) == 0:
        return None

    item_elt = cast(domish.Element, items[0])

    public_keys_list_elt = cast(
        Optional[domish.Element],
        next(item_elt.elements(NS_OX, "public-keys-list"), None)
    )

    if public_keys_list_elt is None:
        return None

    try:
        PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
    except xmlschema.XMLSchemaValidationError as e:
        raise exceptions.ParsingError(
            f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass public keys"
            f" list schema validation."
        ) from e

    public_keys_metadata = set()

    for pubkey_metadata_elt in public_keys_list_elt.elements(NS_OX, "pubkey-metadata"):
        public_keys_metadata.add(PublicKeyMetadata(
            fingerprint=pubkey_metadata_elt["v4-fingerprint"],
            timestamp=parse_datetime(pubkey_metadata_elt["date"])
        ))

    return public_keys_metadata
1707
async def __prepare_secret_key_synchronization(
    self,
    client: SatXMPPClient
) -> Optional[domish.Element]:
    """Prepare for secret key synchronization.

    Makes sure the relative protocols and protocol extensions are supported by the
    server and makes sure that the PEP node for secret synchronization exists. The node
    is created with the whitelist access model if necessary.

    @param client: The client.
    @return: As part of the preparations, the secret key synchronization PEP node is
        fetched. The first item of that fetch is returned here, or ``None`` if the node
        holds no item or had to be created.
    @raise exceptions.FeatureNotFound: if the server lacks support for the required
        protocols or protocol extensions.
    @raise XMPPInteractionFailed: if any interaction via XMPP failed.
    """

    try:
        infos = cast(DiscoInfo, await self.__sat.memory.disco.getInfos(
            client,
            client.jid.userhostJID()
        ))
    except Exception as e:
        raise XMPPInteractionFailed(
            "Error performing service discovery on the own bare JID."
        ) from e

    identities = cast(Dict[Tuple[str, str], str], infos.identities)
    features = cast(Set[DiscoFeature], infos.features)

    if ("pubsub", "pep") not in identities:
        raise exceptions.FeatureNotFound("Server doesn't support PEP.")

    if "http://jabber.org/protocol/pubsub#access-whitelist" not in features:
        raise exceptions.FeatureNotFound(
            "Server doesn't support the whitelist access model."
        )

    # TODO: persistent-items (http://jabber.org/protocol/pubsub#persistent-items) is a
    # SHOULD, how do we handle the feature missing?

    node = "urn:xmpp:openpgp:0:secret-key"

    try:
        items, __ = await self.__xep_0060.getItems(
            client,
            client.jid.userhostJID(),
            node,
            max_items=1
        )
    except exceptions.NotFound:
        try:
            await self.__xep_0060.createNode(
                client,
                client.jid.userhostJID(),
                node,
                {
                    XEP_0060.OPT_PERSIST_ITEMS: "true",
                    XEP_0060.OPT_ACCESS_MODEL: "whitelist",
                    XEP_0060.OPT_MAX_ITEMS: "1"
                }
            )
        except Exception as e:
            raise XMPPInteractionFailed(
                "Error creating the secret key synchronization node."
            ) from e

        # The fetch failed, so there are no items to return. Returning here avoids
        # accessing the unbound ``items`` variable below.
        return None
    except Exception as e:
        raise XMPPInteractionFailed(
            "Error fetching the secret key synchronization node."
        ) from e

    try:
        return cast(domish.Element, items[0])
    except IndexError:
        return None
1786
async def export_secret_keys(
    self,
    client: SatXMPPClient,
    secret_keys: Iterable[GPGSecretKey]
) -> str:
    """Export secret keys to synchronize them with other devices.

    The backups of all secret keys are concatenated, symmetrically encrypted with a
    freshly generated backup code, and published base64-encoded as a ``<secretkey/>``
    element to the node ``urn:xmpp:openpgp:0:secret-key`` of the own bare JID.

    @param client: The client.
    @param secret_keys: The secret keys to export.
    @return: The backup code needed to decrypt the exported secret keys.
    @raise exceptions.FeatureNotFound: if the server lacks support for the required
        protocols or protocol extensions.
    @raise XMPPInteractionFailed: if any interaction via XMPP failed.
    """

    gpg_provider = get_gpg_provider(self.__sat, client)

    # Checks server support and makes sure the node exists; the fetched item is unused
    await self.__prepare_secret_key_synchronization(client)

    backup_code = generate_passphrase()

    backups = [
        gpg_provider.backup_secret_key(secret_key) for secret_key in secret_keys
    ]
    ciphertext = gpg_provider.encrypt_symmetrically(b"".join(backups), backup_code)

    secretkey_elt = domish.Element((NS_OX, "secretkey"))
    secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII"))

    try:
        await self.__xep_0060.sendItem(
            client,
            client.jid.userhostJID(),
            "urn:xmpp:openpgp:0:secret-key",
            secretkey_elt
        )
    except Exception as e:
        raise XMPPInteractionFailed("Publishing the secret keys failed.") from e

    return backup_code
1830
async def download_secret_keys(self, client: SatXMPPClient) -> Optional[bytes]:
    """Download previously exported secret keys to import them in a second step.

    Downloading and importing are two separate steps: the import requires a backup
    code, and trying several backup codes should not require downloading the data
    again each time. The import itself is done by :meth:`import_secret_keys`.

    @param client: The client.
    @return: The encrypted secret keys previously exported, if any.
    @raise exceptions.FeatureNotFound: if the server lacks support for the required
        protocols or protocol extensions.
    @raise exceptions.ParsingError: on XML-level parsing errors.
    @raise XMPPInteractionFailed: if any interaction via XMPP failed.
    """

    item_elt = await self.__prepare_secret_key_synchronization(client)

    secretkey_elt: Optional[domish.Element] = None
    if item_elt is not None:
        secretkey_elt = cast(
            Optional[domish.Element],
            next(item_elt.elements(NS_OX, "secretkey"), None)
        )

    # Neither an item nor a <secretkey/> element inside of it: nothing was exported
    if secretkey_elt is None:
        return None

    try:
        SECRETKEY_SCHEMA.validate(secretkey_elt.toXml())
    except xmlschema.XMLSchemaValidationError as e:
        raise exceptions.ParsingError(
            "Publish-Subscribe item doesn't pass secretkey schema validation."
        ) from e

    encoded_ciphertext = str(secretkey_elt)
    return base64.b64decode(encoded_ciphertext)
1867
def import_secret_keys(
    self,
    client: SatXMPPClient,
    ciphertext: bytes,
    backup_code: str
) -> Set[GPGSecretKey]:
    """Import previously downloaded secret keys.

    Downloading and importing are two separate steps: the import requires a backup
    code, and trying several backup codes should not require downloading the data
    again each time. The download is done by :meth:`download_secret_keys`.

    @param client: The client to perform this operation with.
    @param ciphertext: The ciphertext, i.e. the data returned by
        :meth:`download_secret_keys`.
    @param backup_code: The backup code needed to decrypt the data.
    @return: The secret keys restored by the GPG provider from the decrypted data.
    @raise InvalidPacket: if one of the GPG packets building the secret key data is
        either syntactically or semantically deemed invalid.
    @raise DecryptionFailed: on decryption failure.
    """

    gpg_provider = get_gpg_provider(self.__sat, client)

    plaintext = gpg_provider.decrypt_symmetrically(ciphertext, backup_code)

    return gpg_provider.restore_secret_keys(plaintext)
1896
@staticmethod
def __get_joined_muc_users(
    client: SatXMPPClient,
    xep_0045: XEP_0045,
    room_jid: jid.JID
) -> Set[jid.JID]:
    """Collect the bare JIDs of everybody listed in the roster of a joined MUC.

    @param client: The client.
    @param xep_0045: A MUC plugin instance.
    @param room_jid: The room JID.
    @return: A set containing the bare JIDs of the MUC participants.
    @raise InternalError: if the MUC is not joined or the entity information of a
        participant isn't available.
    """
    # TODO: This should probably be a global helper somewhere

    try:
        room = cast(muc.Room, xep_0045.getRoom(client, room_jid))
    except exceptions.NotFound as e:
        raise exceptions.InternalError(
            "Participant list of unjoined MUC requested."
        ) from e

    roster = cast(Dict[str, muc.User], room.roster)

    participants: Set[jid.JID] = set()

    for user in roster.values():
        entity = cast(Optional[SatXMPPEntity], user.entity)

        # Without entity information, the bare JID of the participant is unknown
        if entity is None:
            raise exceptions.InternalError(
                f"Participant list of MUC requested, but the entity information of"
                f" the participant {user} is not available."
            )

        participants.add(entity.jid.userhostJID())

    return participants
1933
async def get_trust(
    self,
    client: SatXMPPClient,
    public_key: GPGPublicKey,
    owner: jid.JID
) -> TrustLevel:
    """Query the trust level of a public key.

    The trust level is read from the storage of the client's profile under the key
    ``/trust/<userhost of the owner>/<fingerprint>``.

    @param client: The client to perform this operation under.
    @param public_key: The public key.
    @param owner: The owner of the public key. Can be a bare JID.
    @return: The trust level, which is ``TrustLevel.UNDECIDED`` if the lookup raises a
        ``KeyError``.
    """

    storage_key = f"/trust/{owner.userhost()}/{public_key.fingerprint}"

    try:
        stored_level = await self.__storage[client.profile][storage_key]
        return TrustLevel(stored_level)
    except KeyError:
        return TrustLevel.UNDECIDED
1954
async def set_trust(
    self,
    client: SatXMPPClient,
    public_key: GPGPublicKey,
    owner: jid.JID,
    trust_level: TrustLevel
) -> None:
    """Store the trust level of a public key.

    The name of the trust level is written to the profile's storage through its
    ``force`` method, under a key built from the owner's bare JID and the
    fingerprint of the public key.

    @param client: The client to perform this operation under.
    @param public_key: The public key.
    @param owner: The owner of the public key. Can be a bare JID.
    @param trust_level: The trust level.
    """

    storage_key = "/trust/{}/{}".format(owner.userhost(), public_key.fingerprint)
    profile_storage = self.__storage[client.profile]

    await profile_storage.force(storage_key, trust_level.name)
1973
async def getTrustUI(  # pylint: disable=invalid-name
    self,
    client: SatXMPPClient,
    entity: jid.JID
) -> xml_tools.XMLUI:
    """Build the form used to review and change the trust level of public keys.

    @param client: The client.
    @param entity: The entity whose public key trust levels to manage. Must be a
        bare JID. If the MUC plugin is available and the JID is a joined room, the
        public keys of all room participants are listed instead.
    @return: An XMLUI instance which opens a form to manage the trust level of all
        public keys belonging to the entity (or to the participants of the MUC).
    @raise ValueError: if ``entity`` has a resource.
    """

    if entity.resource:
        raise ValueError("A bare JID is expected.")

    bare_jids: Set[jid.JID]
    if self.__xep_0045 is not None and self.__xep_0045.isJoinedRoom(client, entity):
        bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
    else:
        bare_jids = { entity.userhostJID() }

    # Freeze owners and their keys into lists: the form field names only carry the
    # two indices, and the submit callback resolves them against this snapshot.
    all_public_keys = list({
        bare_jid: list(self.list_public_keys(client, bare_jid))
        for bare_jid
        in bare_jids
    }.items())

    async def callback(
        data: Any,
        profile: str  # pylint: disable=unused-argument
    ) -> Dict[Never, Never]:
        """
        @param data: The XMLUI result produced by the trust UI form.
        @param profile: The profile.
        @return: An empty dictionary. The type of the return value was chosen
            conservatively since the exact options are neither known nor needed
            here.
        """

        if C.bool(data.get("cancelled", "false")):
            return {}

        data_form_result = cast(
            Dict[str, str],
            xml_tools.XMLUIResult2DataFormResult(data)
        )
        for key, value in data_form_result.items():
            if not key.startswith("trust_"):
                continue

            # Field names have the shape "trust_<outer_index>_<inner_index>".
            outer_index, inner_index = key.split("_")[1:]

            owner, public_keys = all_public_keys[int(outer_index)]
            public_key = public_keys[int(inner_index)]
            trust = TrustLevel(value)

            if (await self.get_trust(client, public_key, owner)) is not trust:
                # Pass the parsed TrustLevel, not the raw form string: set_trust
                # reads ``trust_level.name``, which a plain str does not have.
                await self.set_trust(client, public_key, owner, trust)

        return {}

    submit_id = self.__sat.registerCallback(callback, with_data=True, one_shot=True)

    result = xml_tools.XMLUI(
        panel_type=C.XMLUI_FORM,
        title=D_("OX trust management"),
        submit_id=submit_id
    )
    # Casting this to Any, otherwise all calls on the variable cause type errors
    # pylint: disable=no-member
    trust_ui = cast(Any, result)
    trust_ui.addText(D_(
        "This is OX trusting system. You'll see below the GPG keys of your "
        "contacts, and a list selection to trust them or not. A trusted key "
        "can read your messages in plain text, so be sure to only validate "
        "keys that you are sure are belonging to your contact. It's better "
        "to do this when you are next to your contact, so "
        "you can check the \"fingerprint\" of the key "
        "yourself. Do *not* validate a key if the fingerprint is wrong!"
    ))

    own_secret_keys = self.list_secret_keys(client)

    trust_ui.changeContainer("label")
    for index, secret_key in enumerate(own_secret_keys):
        # NOTE(review): an f-string inside D_() cannot be picked up by translation
        # string extraction; left unchanged to keep the emitted text identical.
        trust_ui.addLabel(D_(f"Own secret key {index} fingerprint"))
        trust_ui.addText(secret_key.public_key.fingerprint)
        trust_ui.addEmpty()
        trust_ui.addEmpty()

    for outer_index, [ owner, public_keys ] in enumerate(all_public_keys):
        for inner_index, public_key in enumerate(public_keys):
            trust_ui.addLabel(D_("Contact"))
            # NOTE(review): owner already is a jid.JID here; confirm that
            # jid.JID() accepts a JID instance or pass owner directly.
            trust_ui.addJid(jid.JID(owner))
            trust_ui.addLabel(D_("Fingerprint"))
            trust_ui.addText(public_key.fingerprint)
            trust_ui.addLabel(D_("Trust this device?"))

            current_trust_level = await self.get_trust(client, public_key, owner)

            # Offer the two levels a user can actively pick, plus the current level
            # if it is a different one. A list (instead of a set) keeps the order of
            # the options stable between runs.
            available_trust_levels = [ TrustLevel.DISTRUSTED, TrustLevel.TRUSTED ]
            if current_trust_level not in available_trust_levels:
                available_trust_levels.append(current_trust_level)

            trust_ui.addList(
                f"trust_{outer_index}_{inner_index}",
                options=[ trust_level.name for trust_level in available_trust_levels ],
                selected=current_trust_level.name,
                styles=[ "inline" ]
            )

            trust_ui.addEmpty()
            trust_ui.addEmpty()

    return result