comparison libervia/backend/plugins/plugin_xep_0373.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0373.py@524856bd7b19
children 040095a5dc7f
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Libervia plugin for OpenPGP for XMPP
4 # Copyright (C) 2022-2022 Tim Henkes (me@syndace.dev)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from abc import ABC, abstractmethod
20 import base64
21 from datetime import datetime, timezone
22 import enum
23 import secrets
24 import string
25 from typing import Any, Dict, Iterable, List, Literal, Optional, Set, Tuple, cast
26 from xml.sax.saxutils import quoteattr
27
28 from typing_extensions import Final, NamedTuple, Never, assert_never
29 from wokkel import muc, pubsub
30 from wokkel.disco import DiscoFeature, DiscoInfo
31 import xmlschema
32
33 from libervia.backend.core import exceptions
34 from libervia.backend.core.constants import Const as C
35 from libervia.backend.core.core_types import SatXMPPEntity
36 from libervia.backend.core.i18n import _, D_
37 from libervia.backend.core.log import getLogger, Logger
38 from libervia.backend.core.sat_main import SAT
39 from libervia.backend.core.xmpp import SatXMPPClient
40 from libervia.backend.memory import persistent
41 from libervia.backend.plugins.plugin_xep_0045 import XEP_0045
42 from libervia.backend.plugins.plugin_xep_0060 import XEP_0060
43 from libervia.backend.plugins.plugin_xep_0163 import XEP_0163
44 from libervia.backend.tools.xmpp_datetime import format_datetime, parse_datetime
45 from libervia.backend.tools import xml_tools
46 from twisted.internet import defer
47 from twisted.words.protocols.jabber import jid
48 from twisted.words.xish import domish
49
50 try:
51 import gpg
52 except ImportError as import_error:
53 raise exceptions.MissingModule(
54 "You are missing the 'gpg' package required by the OX plugin. The recommended"
55 " installation method is via your operating system's package manager, since the"
56 " version of the library has to match the version of your GnuPG installation. See"
57 " https://wiki.python.org/moin/GnuPrivacyGuard#Accessing_GnuPG_via_gpgme"
58 ) from import_error
59
60
61 __all__ = [ # pylint: disable=unused-variable
62 "PLUGIN_INFO",
63 "NS_OX",
64 "XEP_0373",
65 "VerificationError",
66 "XMPPInteractionFailed",
67 "InvalidPacket",
68 "DecryptionFailed",
69 "VerificationFailed",
70 "UnknownKey",
71 "GPGProviderError",
72 "GPGPublicKey",
73 "GPGSecretKey",
74 "GPGProvider",
75 "PublicKeyMetadata",
76 "gpg_provider",
77 "TrustLevel"
78 ]
79
80
81 log = cast(Logger, getLogger(__name__)) # type: ignore[no-untyped-call]
82
83
84 PLUGIN_INFO = {
85 C.PI_NAME: "XEP-0373",
86 C.PI_IMPORT_NAME: "XEP-0373",
87 C.PI_TYPE: "SEC",
88 C.PI_PROTOCOLS: [ "XEP-0373" ],
89 C.PI_DEPENDENCIES: [ "XEP-0060", "XEP-0163" ],
90 C.PI_RECOMMENDATIONS: [],
91 C.PI_MAIN: "XEP_0373",
92 C.PI_HANDLER: "no",
93 C.PI_DESCRIPTION: D_("Implementation of OpenPGP for XMPP"),
94 }
95
96
97 NS_OX: Final = "urn:xmpp:openpgp:0"
98
99
100 PARAM_CATEGORY = "Security"
101 PARAM_NAME = "ox_policy"
102 STR_KEY_PUBLIC_KEYS_METADATA = "/public-keys-metadata/{}"
103
104
105 class VerificationError(Exception):
106 """
107 Raised by verifying methods of :class:`XEP_0373` on semantical verification errors.
108 """
109
110
111 class XMPPInteractionFailed(Exception):
112 """
113 Raised by methods of :class:`XEP_0373` on XMPP interaction failure. The reason this
114 exception exists is that the exceptions raised by XMPP interactions are not properly
115 documented for the most part, thus all exceptions are caught and wrapped in instances
116 of this class.
117 """
118
119
120 class InvalidPacket(ValueError):
121 """
122 Raised by methods of :class:`GPGProvider` when an invalid packet is encountered.
123 """
124
125
126 class DecryptionFailed(Exception):
127 """
128 Raised by methods of :class:`GPGProvider` on decryption failures.
129 """
130
131
132 class VerificationFailed(Exception):
133 """
134 Raised by methods of :class:`GPGProvider` on verification failures.
135 """
136
137
138 class UnknownKey(ValueError):
139 """
140 Raised by methods of :class:`GPGProvider` when an unknown key is referenced.
141 """
142
143
144 class GPGProviderError(Exception):
145 """
146 Raised by methods of :class:`GPGProvider` on internal errors.
147 """
148
149
150 class GPGPublicKey(ABC):
151 """
152 Interface describing a GPG public key.
153 """
154
155 @property
156 @abstractmethod
157 def fingerprint(self) -> str:
158 """
159 @return: The OpenPGP v4 fingerprint string of this public key.
160 """
161
162
163 class GPGSecretKey(ABC):
164 """
165 Interface descibing a GPG secret key.
166 """
167
168 @property
169 @abstractmethod
170 def public_key(self) -> GPGPublicKey:
171 """
172 @return: The public key corresponding to this secret key.
173 """
174
175
176 class GPGProvider(ABC):
177 """
178 Interface describing a GPG provider, i.e. a library or framework providing GPG
179 encryption, signing and key management.
180
181 All methods may raise :class:`GPGProviderError` in addition to those exception types
182 listed explicitly.
183
184 # TODO: Check keys for revoked, disabled and expired everywhere and exclude those (?)
185 """
186
187 @abstractmethod
188 def export_public_key(self, public_key: GPGPublicKey) -> bytes:
189 """Export a public key in a key material packet according to RFC 4880 §5.5.
190
191 Do not use OpenPGP's ASCII Armor.
192
193 @param public_key: The public key to export.
194 @return: The packet containing the exported public key.
195 @raise UnknownKey: if the public key is not available.
196 """
197
198 @abstractmethod
199 def import_public_key(self, packet: bytes) -> GPGPublicKey:
200 """import a public key from a key material packet according to RFC 4880 §5.5.
201
202 OpenPGP's ASCII Armor is not used.
203
204 @param packet: A packet containing an exported public key.
205 @return: The public key imported from the packet.
206 @raise InvalidPacket: if the packet is either syntactically or semantically deemed
207 invalid.
208
209 @warning: Only packets of version 4 or higher may be accepted, packets below
210 version 4 MUST be rejected.
211 """
212
213 @abstractmethod
214 def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
215 """Export a secret key for transfer according to RFC 4880 §11.1.
216
217 Do not encrypt the secret data, i.e. set the octet indicating string-to-key usage
218 conventions to zero in the corresponding secret-key packet according to RFC 4880
219 §5.5.3. Do not use OpenPGP's ASCII Armor.
220
221 @param secret_key: The secret key to export.
222 @return: The binary blob containing the exported secret key.
223 @raise UnknownKey: if the secret key is not available.
224 """
225
226 @abstractmethod
227 def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
228 """Restore secret keys exported for transfer according to RFC 4880 §11.1.
229
230 The secret data is not encrypted, i.e. the octet indicating string-to-key usage
231 conventions in the corresponding secret-key packets according to RFC 4880 §5.5.3
232 are set to zero. OpenPGP's ASCII Armor is not used.
233
234 @param data: Concatenation of one or more secret keys exported for transfer.
235 @return: The secret keys imported from the data.
236 @raise InvalidPacket: if the data or one of the packets included in the data is
237 either syntactically or semantically deemed invalid.
238
239 @warning: Only packets of version 4 or higher may be accepted, packets below
240 version 4 MUST be rejected.
241 """
242
243 @abstractmethod
244 def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
245 """Encrypt data symmetrically according to RFC 4880 §5.3.
246
247 The password is used to build a Symmetric-Key Encrypted Session Key packet which
248 precedes the Symmetrically Encrypted Data packet that holds the encrypted data.
249
250 @param plaintext: The data to encrypt.
251 @param password: The password to encrypt the data with.
252 @return: The encrypted data.
253 """
254
255 @abstractmethod
256 def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
257 """Decrypt data symmetrically according to RFC 4880 §5.3.
258
259 The ciphertext consists of a Symmetrically Encrypted Data packet that holds the
260 encrypted data, preceded by a Symmetric-Key Encrypted Session Key packet using the
261 password.
262
263 @param ciphertext: The ciphertext.
264 @param password: The password to decrypt the data with.
265 @return: The plaintext.
266 @raise DecryptionFailed: on decryption failure.
267 """
268
269 @abstractmethod
270 def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
271 """Sign some data.
272
273 OpenPGP's ASCII Armor is not used.
274
275 @param data: The data to sign.
276 @param secret_keys: The secret keys to sign the data with.
277 @return: The OpenPGP message carrying the signed data.
278 """
279
280 @abstractmethod
281 def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
282 """Sign some data. Create the signature detached from the data.
283
284 OpenPGP's ASCII Armor is not used.
285
286 @param data: The data to sign.
287 @param secret_keys: The secret keys to sign the data with.
288 @return: The OpenPGP message carrying the detached signature.
289 """
290
291 @abstractmethod
292 def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
293 """Verify signed data.
294
295 OpenPGP's ASCII Armor is not used.
296
297 @param signed_data: The signed data as an OpenPGP message.
298 @param public_keys: The public keys to verify the signature with.
299 @return: The verified and unpacked data.
300 @raise VerificationFailed: if the data could not be verified.
301
302 @warning: For implementors: it has to be confirmed that a valid signature by one
303 of the public keys is available.
304 """
305
306 @abstractmethod
307 def verify_detached(
308 self,
309 data: bytes,
310 signature: bytes,
311 public_keys: Set[GPGPublicKey]
312 ) -> None:
313 """Verify signed data, where the signature was created detached from the data.
314
315 OpenPGP's ASCII Armor is not used.
316
317 @param data: The data.
318 @param signature: The signature as an OpenPGP message.
319 @param public_keys: The public keys to verify the signature with.
320 @raise VerificationFailed: if the data could not be verified.
321
322 @warning: For implementors: it has to be confirmed that a valid signature by one
323 of the public keys is available.
324 """
325
326 @abstractmethod
327 def encrypt(
328 self,
329 plaintext: bytes,
330 public_keys: Set[GPGPublicKey],
331 signing_keys: Optional[Set[GPGSecretKey]] = None
332 ) -> bytes:
333 """Encrypt and optionally sign some data.
334
335 OpenPGP's ASCII Armor is not used.
336
337 @param plaintext: The data to encrypt and optionally sign.
338 @param public_keys: The public keys to encrypt the data for.
339 @param signing_keys: The secret keys to sign the data with.
340 @return: The OpenPGP message carrying the encrypted and optionally signed data.
341 """
342
343 @abstractmethod
344 def decrypt(
345 self,
346 ciphertext: bytes,
347 secret_keys: Set[GPGSecretKey],
348 public_keys: Optional[Set[GPGPublicKey]] = None
349 ) -> bytes:
350 """Decrypt and optionally verify some data.
351
352 OpenPGP's ASCII Armor is not used.
353
354 @param ciphertext: The encrypted and optionally signed data as an OpenPGP message.
355 @param secret_keys: The secret keys to attempt decryption with.
356 @param public_keys: The public keys to verify the optional signature with.
357 @return: The decrypted, optionally verified and unpacked data.
358 @raise DecryptionFailed: on decryption failure.
359 @raise VerificationFailed: if the data could not be verified.
360
361 @warning: For implementors: it has to be confirmed that the data was decrypted
362 using one of the secret keys and that a valid signature by one of the public
363 keys is available in case the data is signed.
364 """
365
366 @abstractmethod
367 def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
368 """List public keys.
369
370 @param user_id: The user id.
371 @return: The set of public keys available for this user id.
372 """
373
374 @abstractmethod
375 def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
376 """List secret keys.
377
378 @param user_id: The user id.
379 @return: The set of secret keys available for this user id.
380 """
381
382 @abstractmethod
383 def can_sign(self, public_key: GPGPublicKey) -> bool:
384 """
385 @return: Whether the public key belongs to a key pair capable of signing.
386 """
387
388 @abstractmethod
389 def can_encrypt(self, public_key: GPGPublicKey) -> bool:
390 """
391 @return: Whether the public key belongs to a key pair capable of encryption.
392 """
393
394 @abstractmethod
395 def create_key(self, user_id: str) -> GPGSecretKey:
396 """Create a new GPG key, capable of signing and encryption.
397
398 The key is generated without password protection and without expiration. If a key
399 with the same user id already exists, a new key is created anyway.
400
401 @param user_id: The user id to assign to the new key.
402 @return: The new key.
403 """
404
405
406 class GPGME_GPGPublicKey(GPGPublicKey):
407 """
408 GPG public key implementation based on GnuPG Made Easy (GPGME).
409 """
410
411 def __init__(self, key_obj: Any) -> None:
412 """
413 @param key_obj: The GPGME key object.
414 """
415
416 self.__key_obj = key_obj
417
418 @property
419 def fingerprint(self) -> str:
420 return self.__key_obj.fpr
421
422 @property
423 def key_obj(self) -> Any:
424 return self.__key_obj
425
426
427 class GPGME_GPGSecretKey(GPGSecretKey):
428 """
429 GPG secret key implementation based on GnuPG Made Easy (GPGME).
430 """
431
432 def __init__(self, public_key: GPGME_GPGPublicKey) -> None:
433 """
434 @param public_key: The public key corresponding to this secret key.
435 """
436
437 self.__public_key = public_key
438
439 @property
440 def public_key(self) -> GPGME_GPGPublicKey:
441 return self.__public_key
442
443
444 class GPGME_GPGProvider(GPGProvider):
445 """
446 GPG provider implementation based on GnuPG Made Easy (GPGME).
447 """
448
449 def __init__(self, home_dir: Optional[str] = None) -> None:
450 """
451 @param home_dir: Optional GPG home directory path to use for all operations.
452 """
453
454 self.__home_dir = home_dir
455
456 def export_public_key(self, public_key: GPGPublicKey) -> bytes:
457 assert isinstance(public_key, GPGME_GPGPublicKey)
458
459 pattern = public_key.fingerprint
460
461 with gpg.Context(home_dir=self.__home_dir) as c:
462 try:
463 result = c.key_export_minimal(pattern)
464 except gpg.errors.GPGMEError as e:
465 raise GPGProviderError("Internal GPGME error") from e
466
467 if result is None:
468 raise UnknownKey(f"Public key {pattern} not found.")
469
470 return result
471
472 def import_public_key(self, packet: bytes) -> GPGPublicKey:
473 # TODO
474 # - Reject packets older than version 4
475 # - Check whether it's actually a public key (through packet inspection?)
476
477 with gpg.Context(home_dir=self.__home_dir) as c:
478 try:
479 result = c.key_import(packet)
480 except gpg.errors.GPGMEError as e:
481 # From looking at the code, `key_import` never raises. The documentation
482 # says it does though, so this is included for future-proofness.
483 raise GPGProviderError("Internal GPGME error") from e
484
485 if not hasattr(result, "considered"):
486 raise InvalidPacket(
487 f"Data not considered for public key import: {result}"
488 )
489
490 if len(result.imports) != 1:
491 raise InvalidPacket(
492 "Public key packet does not contain exactly one public key (not"
493 " counting subkeys)."
494 )
495
496 try:
497 key_obj = c.get_key(result.imports[0].fpr, secret=False)
498 except gpg.errors.GPGMEError as e:
499 raise GPGProviderError("Internal GPGME error") from e
500 except gpg.errors.KeyError as e:
501 raise GPGProviderError("Newly imported public key not found") from e
502
503 return GPGME_GPGPublicKey(key_obj)
504
505 def backup_secret_key(self, secret_key: GPGSecretKey) -> bytes:
506 assert isinstance(secret_key, GPGME_GPGSecretKey)
507 # TODO
508 # - Handle password protection/pinentry
509 # - Make sure the key is exported unencrypted
510
511 pattern = secret_key.public_key.fingerprint
512
513 with gpg.Context(home_dir=self.__home_dir) as c:
514 try:
515 result = c.key_export_secret(pattern)
516 except gpg.errors.GPGMEError as e:
517 raise GPGProviderError("Internal GPGME error") from e
518
519 if result is None:
520 raise UnknownKey(f"Secret key {pattern} not found.")
521
522 return result
523
524 def restore_secret_keys(self, data: bytes) -> Set[GPGSecretKey]:
525 # TODO
526 # - Reject packets older than version 4
527 # - Check whether it's actually secret keys (through packet inspection?)
528
529 with gpg.Context(home_dir=self.__home_dir) as c:
530 try:
531 result = c.key_import(data)
532 except gpg.errors.GPGMEError as e:
533 # From looking at the code, `key_import` never raises. The documentation
534 # says it does though, so this is included for future-proofness.
535 raise GPGProviderError("Internal GPGME error") from e
536
537 if not hasattr(result, "considered"):
538 raise InvalidPacket(
539 f"Data not considered for secret key import: {result}"
540 )
541
542 if len(result.imports) == 0:
543 raise InvalidPacket("Secret key packet does not contain a secret key.")
544
545 secret_keys = set()
546 for import_status in result.imports:
547 try:
548 key_obj = c.get_key(import_status.fpr, secret=True)
549 except gpg.errors.GPGMEError as e:
550 raise GPGProviderError("Internal GPGME error") from e
551 except gpg.errors.KeyError as e:
552 raise GPGProviderError("Newly imported secret key not found") from e
553
554 secret_keys.add(GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj)))
555
556 return secret_keys
557
558 def encrypt_symmetrically(self, plaintext: bytes, password: str) -> bytes:
559 with gpg.Context(home_dir=self.__home_dir) as c:
560 try:
561 ciphertext, __, __ = c.encrypt(plaintext, passphrase=password, sign=False)
562 except gpg.errors.GPGMEError as e:
563 raise GPGProviderError("Internal GPGME error") from e
564
565 return ciphertext
566
567 def decrypt_symmetrically(self, ciphertext: bytes, password: str) -> bytes:
568 with gpg.Context(home_dir=self.__home_dir) as c:
569 try:
570 plaintext, __, __ = c.decrypt(
571 ciphertext,
572 passphrase=password,
573 verify=False
574 )
575 except gpg.errors.GPGMEError as e:
576 # TODO: Find out what kind of error is raised if the password is wrong and
577 # re-raise it as DecryptionFailed instead.
578 raise GPGProviderError("Internal GPGME error") from e
579 except gpg.UnsupportedAlgorithm as e:
580 raise DecryptionFailed("Unsupported algorithm") from e
581
582 return plaintext
583
584 def sign(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
585 signers = []
586 for secret_key in secret_keys:
587 assert isinstance(secret_key, GPGME_GPGSecretKey)
588
589 signers.append(secret_key.public_key.key_obj)
590
591 with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
592 try:
593 signed_data, __ = c.sign(data)
594 except gpg.errors.GPGMEError as e:
595 raise GPGProviderError("Internal GPGME error") from e
596 except gpg.errors.InvalidSigners as e:
597 raise GPGProviderError(
598 "At least one of the secret keys is invalid for signing"
599 ) from e
600
601 return signed_data
602
603 def sign_detached(self, data: bytes, secret_keys: Set[GPGSecretKey]) -> bytes:
604 signers = []
605 for secret_key in secret_keys:
606 assert isinstance(secret_key, GPGME_GPGSecretKey)
607
608 signers.append(secret_key.public_key.key_obj)
609
610 with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
611 try:
612 signature, __ = c.sign(data, mode=gpg.constants.sig.mode.DETACH)
613 except gpg.errors.GPGMEError as e:
614 raise GPGProviderError("Internal GPGME error") from e
615 except gpg.errors.InvalidSigners as e:
616 raise GPGProviderError(
617 "At least one of the secret keys is invalid for signing"
618 ) from e
619
620 return signature
621
622 def verify(self, signed_data: bytes, public_keys: Set[GPGPublicKey]) -> bytes:
623 with gpg.Context(home_dir=self.__home_dir) as c:
624 try:
625 data, result = c.verify(signed_data)
626 except gpg.errors.GPGMEError as e:
627 raise GPGProviderError("Internal GPGME error") from e
628 except gpg.errors.BadSignatures as e:
629 raise VerificationFailed("Bad signatures on signed data") from e
630
631 valid_signature_found = False
632 for public_key in public_keys:
633 assert isinstance(public_key, GPGME_GPGPublicKey)
634
635 for subkey in public_key.key_obj.subkeys:
636 for sig in result.signatures:
637 if subkey.can_sign and subkey.fpr == sig.fpr:
638 valid_signature_found = True
639
640 if not valid_signature_found:
641 raise VerificationFailed(
642 "Data not signed by one of the expected public keys"
643 )
644
645 return data
646
647 def verify_detached(
648 self,
649 data: bytes,
650 signature: bytes,
651 public_keys: Set[GPGPublicKey]
652 ) -> None:
653 with gpg.Context(home_dir=self.__home_dir) as c:
654 try:
655 __, result = c.verify(data, signature=signature)
656 except gpg.errors.GPGMEError as e:
657 raise GPGProviderError("Internal GPGME error") from e
658 except gpg.errors.BadSignatures as e:
659 raise VerificationFailed("Bad signatures on signed data") from e
660
661 valid_signature_found = False
662 for public_key in public_keys:
663 assert isinstance(public_key, GPGME_GPGPublicKey)
664
665 for subkey in public_key.key_obj.subkeys:
666 for sig in result.signatures:
667 if subkey.can_sign and subkey.fpr == sig.fpr:
668 valid_signature_found = True
669
670 if not valid_signature_found:
671 raise VerificationFailed(
672 "Data not signed by one of the expected public keys"
673 )
674
675 def encrypt(
676 self,
677 plaintext: bytes,
678 public_keys: Set[GPGPublicKey],
679 signing_keys: Optional[Set[GPGSecretKey]] = None
680 ) -> bytes:
681 recipients = []
682 for public_key in public_keys:
683 assert isinstance(public_key, GPGME_GPGPublicKey)
684
685 recipients.append(public_key.key_obj)
686
687 signers = []
688 if signing_keys is not None:
689 for secret_key in signing_keys:
690 assert isinstance(secret_key, GPGME_GPGSecretKey)
691
692 signers.append(secret_key.public_key.key_obj)
693
694 sign = signing_keys is not None
695
696 with gpg.Context(home_dir=self.__home_dir, signers=signers) as c:
697 try:
698 ciphertext, __, __ = c.encrypt(
699 plaintext,
700 recipients=recipients,
701 sign=sign,
702 always_trust=True,
703 add_encrypt_to=True
704 )
705 except gpg.errors.GPGMEError as e:
706 raise GPGProviderError("Internal GPGME error") from e
707 except gpg.errors.InvalidRecipients as e:
708 raise GPGProviderError(
709 "At least one of the public keys is invalid for encryption"
710 ) from e
711 except gpg.errors.InvalidSigners as e:
712 raise GPGProviderError(
713 "At least one of the signing keys is invalid for signing"
714 ) from e
715
716 return ciphertext
717
718 def decrypt(
719 self,
720 ciphertext: bytes,
721 secret_keys: Set[GPGSecretKey],
722 public_keys: Optional[Set[GPGPublicKey]] = None
723 ) -> bytes:
724 verify = public_keys is not None
725
726 with gpg.Context(home_dir=self.__home_dir) as c:
727 try:
728 plaintext, result, verify_result = c.decrypt(
729 ciphertext,
730 verify=verify
731 )
732 except gpg.errors.GPGMEError as e:
733 raise GPGProviderError("Internal GPGME error") from e
734 except gpg.UnsupportedAlgorithm as e:
735 raise DecryptionFailed("Unsupported algorithm") from e
736
737 # TODO: Check whether the data was decrypted using one of the expected secret
738 # keys
739
740 if public_keys is not None:
741 valid_signature_found = False
742 for public_key in public_keys:
743 assert isinstance(public_key, GPGME_GPGPublicKey)
744
745 for subkey in public_key.key_obj.subkeys:
746 for sig in verify_result.signatures:
747 if subkey.can_sign and subkey.fpr == sig.fpr:
748 valid_signature_found = True
749
750 if not valid_signature_found:
751 raise VerificationFailed(
752 "Data not signed by one of the expected public keys"
753 )
754
755 return plaintext
756
757 def list_public_keys(self, user_id: str) -> Set[GPGPublicKey]:
758 with gpg.Context(home_dir=self.__home_dir) as c:
759 try:
760 return {
761 GPGME_GPGPublicKey(key)
762 for key
763 in c.keylist(pattern=user_id, secret=False)
764 }
765 except gpg.errors.GPGMEError as e:
766 raise GPGProviderError("Internal GPGME error") from e
767
768 def list_secret_keys(self, user_id: str) -> Set[GPGSecretKey]:
769 with gpg.Context(home_dir=self.__home_dir) as c:
770 try:
771 return {
772 GPGME_GPGSecretKey(GPGME_GPGPublicKey(key))
773 for key
774 in c.keylist(pattern=user_id, secret=True)
775 }
776 except gpg.errors.GPGMEError as e:
777 raise GPGProviderError("Internal GPGME error") from e
778
779 def can_sign(self, public_key: GPGPublicKey) -> bool:
780 assert isinstance(public_key, GPGME_GPGPublicKey)
781
782 return any(subkey.can_sign for subkey in public_key.key_obj.subkeys)
783
784 def can_encrypt(self, public_key: GPGPublicKey) -> bool:
785 assert isinstance(public_key, GPGME_GPGPublicKey)
786
787 return any(subkey.can_encrypt for subkey in public_key.key_obj.subkeys)
788
789 def create_key(self, user_id: str) -> GPGSecretKey:
790 with gpg.Context(home_dir=self.__home_dir) as c:
791 try:
792 result = c.create_key(
793 user_id,
794 expires=False,
795 sign=True,
796 encrypt=True,
797 certify=False,
798 authenticate=False,
799 force=True
800 )
801
802 key_obj = c.get_key(result.fpr, secret=True)
803 except gpg.errors.GPGMEError as e:
804 raise GPGProviderError("Internal GPGME error") from e
805 except gpg.errors.KeyError as e:
806 raise GPGProviderError("Newly created key not found") from e
807
808 return GPGME_GPGSecretKey(GPGME_GPGPublicKey(key_obj))
809
810
811 class PublicKeyMetadata(NamedTuple):
812 """
813 Metadata about a published public key.
814 """
815
816 fingerprint: str
817 timestamp: datetime
818
819
820 @enum.unique
821 class TrustLevel(enum.Enum):
822 """
823 The trust levels required for BTBV and manual trust.
824 """
825
826 TRUSTED: str = "TRUSTED"
827 BLINDLY_TRUSTED: str = "BLINDLY_TRUSTED"
828 UNDECIDED: str = "UNDECIDED"
829 DISTRUSTED: str = "DISTRUSTED"
830
831
832 OPENPGP_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
833 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
834 targetNamespace="urn:xmpp:openpgp:0"
835 xmlns="urn:xmpp:openpgp:0">
836
837 <xs:element name="openpgp" type="xs:base64Binary"/>
838 </xs:schema>
839 """)
840
841
842 # The following schema needs verion 1.1 of XML Schema, which is not supported by lxml.
843 # Luckily, xmlschema exists, which is a clean, well maintained, cross-platform
844 # implementation of XML Schema, including version 1.1.
845 CONTENT_SCHEMA = xmlschema.XMLSchema11("""<?xml version="1.1" encoding="utf8"?>
846 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
847 targetNamespace="urn:xmpp:openpgp:0"
848 xmlns="urn:xmpp:openpgp:0">
849
850 <xs:element name="signcrypt">
851 <xs:complexType>
852 <xs:all>
853 <xs:element ref="to" maxOccurs="unbounded"/>
854 <xs:element ref="time"/>
855 <xs:element ref="rpad" minOccurs="0"/>
856 <xs:element ref="payload"/>
857 </xs:all>
858 </xs:complexType>
859 </xs:element>
860
861 <xs:element name="sign">
862 <xs:complexType>
863 <xs:all>
864 <xs:element ref="to" maxOccurs="unbounded"/>
865 <xs:element ref="time"/>
866 <xs:element ref="rpad" minOccurs="0"/>
867 <xs:element ref="payload"/>
868 </xs:all>
869 </xs:complexType>
870 </xs:element>
871
872 <xs:element name="crypt">
873 <xs:complexType>
874 <xs:all>
875 <xs:element ref="to" minOccurs="0" maxOccurs="unbounded"/>
876 <xs:element ref="time"/>
877 <xs:element ref="rpad" minOccurs="0"/>
878 <xs:element ref="payload"/>
879 </xs:all>
880 </xs:complexType>
881 </xs:element>
882
883 <xs:element name="to">
884 <xs:complexType>
885 <xs:attribute name="jid" type="xs:string"/>
886 </xs:complexType>
887 </xs:element>
888
889 <xs:element name="time">
890 <xs:complexType>
891 <xs:attribute name="stamp" type="xs:dateTime"/>
892 </xs:complexType>
893 </xs:element>
894
895 <xs:element name="rpad" type="xs:string"/>
896
897 <xs:element name="payload">
898 <xs:complexType>
899 <xs:sequence>
900 <xs:any minOccurs="0" maxOccurs="unbounded" processContents="skip"/>
901 </xs:sequence>
902 </xs:complexType>
903 </xs:element>
904 </xs:schema>
905 """)
906
907
908 PUBLIC_KEYS_LIST_NODE = "urn:xmpp:openpgp:0:public-keys"
909 PUBLIC_KEYS_LIST_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
910 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
911 targetNamespace="urn:xmpp:openpgp:0"
912 xmlns="urn:xmpp:openpgp:0">
913
914 <xs:element name="public-keys-list">
915 <xs:complexType>
916 <xs:sequence>
917 <xs:element ref="pubkey-metadata" minOccurs="0" maxOccurs="unbounded"/>
918 </xs:sequence>
919 </xs:complexType>
920 </xs:element>
921
922 <xs:element name="pubkey-metadata">
923 <xs:complexType>
924 <xs:attribute name="v4-fingerprint" type="xs:string"/>
925 <xs:attribute name="date" type="xs:dateTime"/>
926 </xs:complexType>
927 </xs:element>
928 </xs:schema>
929 """)
930
931
932 PUBKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
933 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
934 targetNamespace="urn:xmpp:openpgp:0"
935 xmlns="urn:xmpp:openpgp:0">
936
937 <xs:element name="pubkey">
938 <xs:complexType>
939 <xs:all>
940 <xs:element ref="data"/>
941 </xs:all>
942 <xs:anyAttribute processContents="skip"/>
943 </xs:complexType>
944 </xs:element>
945
946 <xs:element name="data" type="xs:base64Binary"/>
947 </xs:schema>
948 """)
949
950
951 SECRETKEY_SCHEMA = xmlschema.XMLSchema("""<?xml version="1.0" encoding="utf8"?>
952 <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
953 targetNamespace="urn:xmpp:openpgp:0"
954 xmlns="urn:xmpp:openpgp:0">
955
956 <xs:element name="secretkey" type="xs:base64Binary"/>
957 </xs:schema>
958 """)
959
960
961 DEFAULT_TRUST_MODEL_PARAM = f"""
962 <params>
963 <individual>
964 <category name="{PARAM_CATEGORY}" label={quoteattr(D_('Security'))}>
965 <param name="{PARAM_NAME}"
966 label={quoteattr(D_('OMEMO default trust policy'))}
967 type="list" security="3">
968 <option value="manual" label={quoteattr(D_('Manual trust (more secure)'))} />
969 <option value="btbv"
970 label={quoteattr(D_('Blind Trust Before Verification (more user friendly)'))}
971 selected="true" />
972 </param>
973 </category>
974 </individual>
975 </params>
976 """
977
978
979 def get_gpg_provider(sat: SAT, client: SatXMPPClient) -> GPGProvider:
980 """Get the GPG provider for a client.
981
982 @param sat: The SAT instance.
983 @param client: The client.
984 @return: The GPG provider specifically for that client.
985 """
986
987 return GPGME_GPGProvider(str(sat.get_local_path(client, "gnupg-home")))
988
989
990 def generate_passphrase() -> str:
991 """Generate a secure passphrase for symmetric encryption.
992
993 @return: The passphrase.
994 """
995
996 return "-".join("".join(
997 secrets.choice("123456789ABCDEFGHIJKLMNPQRSTUVWXYZ") for __ in range(4)
998 ) for __ in range(6))
999
1000
1001 # TODO: Handle the user id mess
1002 class XEP_0373:
1003 """
1004 Implementation of XEP-0373: OpenPGP for XMPP under namespace ``urn:xmpp:openpgp:0``.
1005 """
1006
1007 def __init__(self, host: SAT) -> None:
1008 """
1009 @param sat: The SAT instance.
1010 """
1011
1012 self.host = host
1013
1014 # Add configuration option to choose between manual trust and BTBV as the trust
1015 # model
1016 host.memory.update_params(DEFAULT_TRUST_MODEL_PARAM)
1017
1018 self.__xep_0045 = cast(Optional[XEP_0045], host.plugins.get("XEP-0045"))
1019 self.__xep_0060 = cast(XEP_0060, host.plugins["XEP-0060"])
1020
1021 self.__storage: Dict[str, persistent.LazyPersistentBinaryDict] = {}
1022
1023 xep_0163 = cast(XEP_0163, host.plugins["XEP-0163"])
1024 xep_0163.add_pep_event(
1025 "OX_PUBLIC_KEYS_LIST",
1026 PUBLIC_KEYS_LIST_NODE,
1027 lambda items_event, profile: defer.ensureDeferred(
1028 self.__on_public_keys_list_update(items_event, profile)
1029 )
1030 )
1031
1032 async def profile_connecting(self, client):
1033 client.gpg_provider = get_gpg_provider(self.host, client)
1034
1035 async def profile_connected( # pylint: disable=invalid-name
1036 self,
1037 client: SatXMPPClient
1038 ) -> None:
1039 """
1040 @param client: The client.
1041 """
1042
1043 profile = cast(str, client.profile)
1044
1045 if not profile in self.__storage:
1046 self.__storage[profile] = \
1047 persistent.LazyPersistentBinaryDict("XEP-0373", client.profile)
1048
1049 if len(self.list_secret_keys(client)) == 0:
1050 log.debug(f"Generating first GPG key for {client.jid.userhost()}.")
1051 await self.create_key(client)
1052
1053 async def __on_public_keys_list_update(
1054 self,
1055 items_event: pubsub.ItemsEvent,
1056 profile: str
1057 ) -> None:
1058 """Handle public keys list updates fired by PEP.
1059
1060 @param items_event: The event.
1061 @param profile: The profile this event belongs to.
1062 """
1063
1064 client = self.host.get_client(profile)
1065
1066 sender = cast(jid.JID, items_event.sender)
1067 items = cast(List[domish.Element], items_event.items)
1068
1069 if len(items) > 1:
1070 log.warning("Ignoring public keys list update with more than one element.")
1071 return
1072
1073 item_elt = next(iter(items), None)
1074 if item_elt is None:
1075 log.debug("Ignoring empty public keys list update.")
1076 return
1077
1078 public_keys_list_elt = cast(
1079 Optional[domish.Element],
1080 next(item_elt.elements(NS_OX, "public-keys-list"), None)
1081 )
1082
1083 pubkey_metadata_elts: Optional[List[domish.Element]] = None
1084
1085 if public_keys_list_elt is not None:
1086 try:
1087 PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
1088 except xmlschema.XMLSchemaValidationError:
1089 pass
1090 else:
1091 pubkey_metadata_elts = \
1092 list(public_keys_list_elt.elements(NS_OX, "pubkey-metadata"))
1093
1094 if pubkey_metadata_elts is None:
1095 log.warning(f"Malformed public keys list update item: {item_elt.toXml()}")
1096 return
1097
1098 new_public_keys_metadata = { PublicKeyMetadata(
1099 fingerprint=cast(str, pubkey_metadata_elt["v4-fingerprint"]),
1100 timestamp=parse_datetime(cast(str, pubkey_metadata_elt["date"]))
1101 ) for pubkey_metadata_elt in pubkey_metadata_elts }
1102
1103 storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(sender.userhost())
1104
1105 local_public_keys_metadata = cast(
1106 Set[PublicKeyMetadata],
1107 await self.__storage[profile].get(storage_key, set())
1108 )
1109
1110 unchanged_keys = new_public_keys_metadata & local_public_keys_metadata
1111 changed_or_new_keys = new_public_keys_metadata - unchanged_keys
1112 available_keys = self.list_public_keys(client, sender)
1113
1114 for key_metadata in changed_or_new_keys:
1115 # Check whether the changed or new key has been imported before
1116 if any(key.fingerprint == key_metadata.fingerprint for key in available_keys):
1117 try:
1118 # If it has been imported before, try to update it
1119 await self.import_public_key(client, sender, key_metadata.fingerprint)
1120 except Exception as e:
1121 log.warning(f"Public key import failed: {e}")
1122
1123 # If the update fails, remove the key from the local metadata list
1124 # such that the update is attempted again next time
1125 new_public_keys_metadata.remove(key_metadata)
1126
1127 # Check whether this update was for our account and make sure all of our keys are
1128 # included in the update
1129 if sender.userhost() == client.jid.userhost():
1130 secret_keys = self.list_secret_keys(client)
1131 missing_keys = set(filter(lambda secret_key: all(
1132 key_metadata.fingerprint != secret_key.public_key.fingerprint
1133 for key_metadata
1134 in new_public_keys_metadata
1135 ), secret_keys))
1136
1137 if len(missing_keys) > 0:
1138 log.warning(
1139 "Public keys list update did not contain at least one of our keys."
1140 f" {new_public_keys_metadata}"
1141 )
1142
1143 for missing_key in missing_keys:
1144 log.warning(missing_key.public_key.fingerprint)
1145 new_public_keys_metadata.add(PublicKeyMetadata(
1146 fingerprint=missing_key.public_key.fingerprint,
1147 timestamp=datetime.now(timezone.utc)
1148 ))
1149
1150 await self.publish_public_keys_list(client, new_public_keys_metadata)
1151
1152 await self.__storage[profile].force(storage_key, new_public_keys_metadata)
1153
1154 def list_public_keys(self, client: SatXMPPClient, jid: jid.JID) -> Set[GPGPublicKey]:
1155 """List GPG public keys available for a JID.
1156
1157 @param client: The client to perform this operation with.
1158 @param jid: The JID. Can be a bare JID.
1159 @return: The set of public keys available for this JID.
1160 """
1161
1162 gpg_provider = get_gpg_provider(self.host, client)
1163
1164 return gpg_provider.list_public_keys(f"xmpp:{jid.userhost()}")
1165
1166 def list_secret_keys(self, client: SatXMPPClient) -> Set[GPGSecretKey]:
1167 """List GPG secret keys available for a JID.
1168
1169 @param client: The client to perform this operation with.
1170 @return: The set of secret keys available for this JID.
1171 """
1172
1173 gpg_provider = get_gpg_provider(self.host, client)
1174
1175 return gpg_provider.list_secret_keys(f"xmpp:{client.jid.userhost()}")
1176
1177 async def create_key(self, client: SatXMPPClient) -> GPGSecretKey:
1178 """Create a new GPG key, capable of signing and encryption.
1179
1180 The key is generated without password protection and without expiration.
1181
1182 @param client: The client to perform this operation with.
1183 @return: The new key.
1184 """
1185
1186 gpg_provider = get_gpg_provider(self.host, client)
1187
1188 secret_key = gpg_provider.create_key(f"xmpp:{client.jid.userhost()}")
1189
1190 await self.publish_public_key(client, secret_key.public_key)
1191
1192 storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(client.jid.userhost())
1193
1194 public_keys_list = cast(
1195 Set[PublicKeyMetadata],
1196 await self.__storage[client.profile].get(storage_key, set())
1197 )
1198
1199 public_keys_list.add(PublicKeyMetadata(
1200 fingerprint=secret_key.public_key.fingerprint,
1201 timestamp=datetime.now(timezone.utc)
1202 ))
1203
1204 await self.publish_public_keys_list(client, public_keys_list)
1205
1206 await self.__storage[client.profile].force(storage_key, public_keys_list)
1207
1208 return secret_key
1209
1210 @staticmethod
1211 def __build_content_element(
1212 element_name: Literal["signcrypt", "sign", "crypt"],
1213 recipient_jids: Iterable[jid.JID],
1214 include_rpad: bool
1215 ) -> Tuple[domish.Element, domish.Element]:
1216 """Build a content element.
1217
1218 @param element_name: The name of the content element.
1219 @param recipient_jids: The intended recipients of this content element. Can be
1220 bare JIDs.
1221 @param include_rpad: Whether to include random-length random-content padding.
1222 @return: The content element and the ``<payload/>`` element to add the stanza
1223 extension elements to.
1224 """
1225
1226 content_elt = domish.Element((NS_OX, element_name))
1227
1228 for recipient_jid in recipient_jids:
1229 content_elt.addElement("to")["jid"] = recipient_jid.userhost()
1230
1231 content_elt.addElement("time")["stamp"] = format_datetime()
1232
1233 if include_rpad:
1234 # XEP-0373 doesn't specify bounds for the length of the random padding. This
1235 # uses the bounds specified in XEP-0420 for the closely related rpad affix.
1236 rpad_length = secrets.randbelow(201)
1237 rpad_content = "".join(
1238 secrets.choice(string.digits + string.ascii_letters + string.punctuation)
1239 for __
1240 in range(rpad_length)
1241 )
1242 content_elt.addElement("rpad", content=rpad_content)
1243
1244 payload_elt = content_elt.addElement("payload")
1245
1246 return content_elt, payload_elt
1247
1248 @staticmethod
1249 def build_signcrypt_element(
1250 recipient_jids: Iterable[jid.JID]
1251 ) -> Tuple[domish.Element, domish.Element]:
1252 """Build a ``<signcrypt/>`` content element.
1253
1254 @param recipient_jids: The intended recipients of this content element. Can be
1255 bare JIDs.
1256 @return: The ``<signcrypt/>`` element and the ``<payload/>`` element to add the
1257 stanza extension elements to.
1258 """
1259
1260 if len(recipient_jids) == 0:
1261 raise ValueError("Recipient JIDs must be provided.")
1262
1263 return XEP_0373.__build_content_element("signcrypt", recipient_jids, True)
1264
1265 @staticmethod
1266 def build_sign_element(
1267 recipient_jids: Iterable[jid.JID],
1268 include_rpad: bool
1269 ) -> Tuple[domish.Element, domish.Element]:
1270 """Build a ``<sign/>`` content element.
1271
1272 @param recipient_jids: The intended recipients of this content element. Can be
1273 bare JIDs.
1274 @param include_rpad: Whether to include random-length random-content padding,
1275 which is OPTIONAL for the ``<sign/>`` content element.
1276 @return: The ``<sign/>`` element and the ``<payload/>`` element to add the stanza
1277 extension elements to.
1278 """
1279
1280 if len(recipient_jids) == 0:
1281 raise ValueError("Recipient JIDs must be provided.")
1282
1283 return XEP_0373.__build_content_element("sign", recipient_jids, include_rpad)
1284
1285 @staticmethod
1286 def build_crypt_element(
1287 recipient_jids: Iterable[jid.JID]
1288 ) -> Tuple[domish.Element, domish.Element]:
1289 """Build a ``<crypt/>`` content element.
1290
1291 @param recipient_jids: The intended recipients of this content element. Specifying
1292 the intended recipients is OPTIONAL for the ``<crypt/>`` content element. Can
1293 be bare JIDs.
1294 @return: The ``<crypt/>`` element and the ``<payload/>`` element to add the stanza
1295 extension elements to.
1296 """
1297
1298 return XEP_0373.__build_content_element("crypt", recipient_jids, True)
1299
1300 async def build_openpgp_element(
1301 self,
1302 client: SatXMPPClient,
1303 content_elt: domish.Element,
1304 recipient_jids: Set[jid.JID]
1305 ) -> domish.Element:
1306 """Build an ``<openpgp/>`` element.
1307
1308 @param client: The client to perform this operation with.
1309 @param content_elt: The content element to contain in the ``<openpgp/>`` element.
1310 @param recipient_jids: The recipient's JIDs. Can be bare JIDs.
1311 @return: The ``<openpgp/>`` element.
1312 """
1313
1314 gpg_provider = get_gpg_provider(self.host, client)
1315
1316 # TODO: I'm not sure whether we want to sign with all keys by default or choose
1317 # just one key/a subset of keys to sign with.
1318 signing_keys = set(filter(
1319 lambda secret_key: gpg_provider.can_sign(secret_key.public_key),
1320 self.list_secret_keys(client)
1321 ))
1322
1323 encryption_keys: Set[GPGPublicKey] = set()
1324
1325 for recipient_jid in recipient_jids:
1326 # import all keys of the recipient
1327 all_public_keys = await self.import_all_public_keys(client, recipient_jid)
1328
1329 # Filter for keys that can encrypt
1330 encryption_keys |= set(filter(gpg_provider.can_encrypt, all_public_keys))
1331
1332 # TODO: Handle trust
1333
1334 content = content_elt.toXml().encode("utf-8")
1335 data: bytes
1336
1337 if content_elt.name == "signcrypt":
1338 data = gpg_provider.encrypt(content, encryption_keys, signing_keys)
1339 elif content_elt.name == "sign":
1340 data = gpg_provider.sign(content, signing_keys)
1341 elif content_elt.name == "crypt":
1342 data = gpg_provider.encrypt(content, encryption_keys)
1343 else:
1344 raise ValueError(f"Unknown content element <{content_elt.name}/>")
1345
1346 openpgp_elt = domish.Element((NS_OX, "openpgp"))
1347 openpgp_elt.addContent(base64.b64encode(data).decode("ASCII"))
1348 return openpgp_elt
1349
1350 async def unpack_openpgp_element(
1351 self,
1352 client: SatXMPPClient,
1353 openpgp_elt: domish.Element,
1354 element_name: Literal["signcrypt", "sign", "crypt"],
1355 sender_jid: jid.JID
1356 ) -> Tuple[domish.Element, datetime]:
1357 """Verify, decrypt and unpack an ``<openpgp/>`` element.
1358
1359 @param client: The client to perform this operation with.
1360 @param openpgp_elt: The ``<openpgp/>`` element.
1361 @param element_name: The name of the content element.
1362 @param sender_jid: The sender's JID. Can be a bare JID.
1363 @return: The ``<payload/>`` element containing the decrypted/verified stanza
1364 extension elements carried by this ``<openpgp/>`` element, and the timestamp
1365 contained in the content element.
1366 @raise exceptions.ParsingError: on syntactical verification errors.
1367 @raise VerificationError: on semantical verification errors accoding to XEP-0373.
1368 @raise DecryptionFailed: on decryption failure.
1369 @raise VerificationFailed: if the data could not be verified.
1370
1371 @warning: The timestamp is not verified for plausibility; this SHOULD be done by
1372 the calling code.
1373 """
1374
1375 gpg_provider = get_gpg_provider(self.host, client)
1376
1377 decryption_keys = set(filter(
1378 lambda secret_key: gpg_provider.can_encrypt(secret_key.public_key),
1379 self.list_secret_keys(client)
1380 ))
1381
1382 # import all keys of the sender
1383 all_public_keys = await self.import_all_public_keys(client, sender_jid)
1384
1385 # Filter for keys that can sign
1386 verification_keys = set(filter(gpg_provider.can_sign, all_public_keys))
1387
1388 # TODO: Handle trust
1389
1390 try:
1391 OPENPGP_SCHEMA.validate(openpgp_elt.toXml())
1392 except xmlschema.XMLSchemaValidationError as e:
1393 raise exceptions.ParsingError(
1394 "<openpgp/> element doesn't pass schema validation."
1395 ) from e
1396
1397 openpgp_message = base64.b64decode(str(openpgp_elt))
1398 content: bytes
1399
1400 if element_name == "signcrypt":
1401 content = gpg_provider.decrypt(
1402 openpgp_message,
1403 decryption_keys,
1404 public_keys=verification_keys
1405 )
1406 elif element_name == "sign":
1407 content = gpg_provider.verify(openpgp_message, verification_keys)
1408 elif element_name == "crypt":
1409 content = gpg_provider.decrypt(openpgp_message, decryption_keys)
1410 else:
1411 assert_never(element_name)
1412
1413 try:
1414 content_elt = cast(
1415 domish.Element,
1416 xml_tools.ElementParser()(content.decode("utf-8"))
1417 )
1418 except UnicodeDecodeError as e:
1419 raise exceptions.ParsingError("UTF-8 decoding error") from e
1420
1421 try:
1422 CONTENT_SCHEMA.validate(content_elt.toXml())
1423 except xmlschema.XMLSchemaValidationError as e:
1424 raise exceptions.ParsingError(
1425 f"<{element_name}/> element doesn't pass schema validation."
1426 ) from e
1427
1428 if content_elt.name != element_name:
1429 raise exceptions.ParsingError(f"Not a <{element_name}/> element.")
1430
1431 recipient_jids = \
1432 { jid.JID(to_elt["jid"]) for to_elt in content_elt.elements(NS_OX, "to") }
1433
1434 if (
1435 client.jid.userhostJID() not in { jid.userhostJID() for jid in recipient_jids }
1436 and element_name != "crypt"
1437 ):
1438 raise VerificationError(
1439 f"Recipient list in <{element_name}/> element does not list our (bare)"
1440 f" JID."
1441 )
1442
1443 time_elt = next(content_elt.elements(NS_OX, "time"))
1444
1445 timestamp = parse_datetime(time_elt["stamp"])
1446
1447 payload_elt = next(content_elt.elements(NS_OX, "payload"))
1448
1449 return payload_elt, timestamp
1450
1451 async def publish_public_key(
1452 self,
1453 client: SatXMPPClient,
1454 public_key: GPGPublicKey
1455 ) -> None:
1456 """Publish a public key.
1457
1458 @param client: The client.
1459 @param public_key: The public key to publish.
1460 @raise XMPPInteractionFailed: if any interaction via XMPP failed.
1461 """
1462
1463 gpg_provider = get_gpg_provider(self.host, client)
1464
1465 packet = gpg_provider.export_public_key(public_key)
1466
1467 node = f"urn:xmpp:openpgp:0:public-keys:{public_key.fingerprint}"
1468
1469 pubkey_elt = domish.Element((NS_OX, "pubkey"))
1470
1471 pubkey_elt.addElement("data", content=base64.b64encode(packet).decode("ASCII"))
1472
1473 try:
1474 await self.__xep_0060.send_item(
1475 client,
1476 client.jid.userhostJID(),
1477 node,
1478 pubkey_elt,
1479 format_datetime(),
1480 extra={
1481 XEP_0060.EXTRA_PUBLISH_OPTIONS: {
1482 XEP_0060.OPT_PERSIST_ITEMS: "true",
1483 XEP_0060.OPT_ACCESS_MODEL: "open",
1484 XEP_0060.OPT_MAX_ITEMS: 1
1485 },
1486 # TODO: Do we really want publish_without_options here?
1487 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
1488 }
1489 )
1490 except Exception as e:
1491 raise XMPPInteractionFailed("Publishing the public key failed.") from e
1492
1493 async def import_all_public_keys(
1494 self,
1495 client: SatXMPPClient,
1496 entity_jid: jid.JID
1497 ) -> Set[GPGPublicKey]:
1498 """import all public keys of a JID that have not been imported before.
1499
1500 @param client: The client.
1501 @param jid: The JID. Can be a bare JID.
1502 @return: The public keys.
1503 @note: Failure to import a key simply results in the key not being included in the
1504 result.
1505 """
1506
1507 available_public_keys = self.list_public_keys(client, entity_jid)
1508
1509 storage_key = STR_KEY_PUBLIC_KEYS_METADATA.format(entity_jid.userhost())
1510
1511 public_keys_metadata = cast(
1512 Set[PublicKeyMetadata],
1513 await self.__storage[client.profile].get(storage_key, set())
1514 )
1515 if not public_keys_metadata:
1516 public_keys_metadata = await self.download_public_keys_list(
1517 client, entity_jid
1518 )
1519 if not public_keys_metadata:
1520 raise exceptions.NotFound(
1521 f"Can't find public keys for {entity_jid}"
1522 )
1523 else:
1524 await self.__storage[client.profile].aset(
1525 storage_key, public_keys_metadata
1526 )
1527
1528
1529 missing_keys = set(filter(lambda public_key_metadata: all(
1530 public_key_metadata.fingerprint != public_key.fingerprint
1531 for public_key
1532 in available_public_keys
1533 ), public_keys_metadata))
1534
1535 for missing_key in missing_keys:
1536 try:
1537 available_public_keys.add(
1538 await self.import_public_key(client, entity_jid, missing_key.fingerprint)
1539 )
1540 except Exception as e:
1541 log.warning(
1542 f"import of public key {missing_key.fingerprint} owned by"
1543 f" {entity_jid.userhost()} failed, ignoring: {e}"
1544 )
1545
1546 return available_public_keys
1547
1548 async def import_public_key(
1549 self,
1550 client: SatXMPPClient,
1551 jid: jid.JID,
1552 fingerprint: str
1553 ) -> GPGPublicKey:
1554 """import a public key.
1555
1556 @param client: The client.
1557 @param jid: The JID owning the public key. Can be a bare JID.
1558 @param fingerprint: The fingerprint of the public key.
1559 @return: The public key.
1560 @raise exceptions.NotFound: if the public key was not found.
1561 @raise exceptions.ParsingError: on XML-level parsing errors.
1562 @raise InvalidPacket: if the packet is either syntactically or semantically deemed
1563 invalid.
1564 @raise XMPPInteractionFailed: if any interaction via XMPP failed.
1565 """
1566
1567 gpg_provider = get_gpg_provider(self.host, client)
1568
1569 node = f"urn:xmpp:openpgp:0:public-keys:{fingerprint}"
1570
1571 try:
1572 items, __ = await self.__xep_0060.get_items(
1573 client,
1574 jid.userhostJID(),
1575 node,
1576 max_items=1
1577 )
1578 except exceptions.NotFound as e:
1579 raise exceptions.NotFound(
1580 f"No public key with fingerprint {fingerprint} published by JID"
1581 f" {jid.userhost()}."
1582 ) from e
1583 except Exception as e:
1584 raise XMPPInteractionFailed("Fetching the public keys list failed.") from e
1585
1586 try:
1587 item_elt = cast(domish.Element, items[0])
1588 except IndexError as e:
1589 raise exceptions.NotFound(
1590 f"No public key with fingerprint {fingerprint} published by JID"
1591 f" {jid.userhost()}."
1592 ) from e
1593
1594 pubkey_elt = cast(
1595 Optional[domish.Element],
1596 next(item_elt.elements(NS_OX, "pubkey"), None)
1597 )
1598
1599 if pubkey_elt is None:
1600 raise exceptions.ParsingError(
1601 f"Publish-Subscribe item of JID {jid.userhost()} doesn't contain pubkey"
1602 f" element."
1603 )
1604
1605 try:
1606 PUBKEY_SCHEMA.validate(pubkey_elt.toXml())
1607 except xmlschema.XMLSchemaValidationError as e:
1608 raise exceptions.ParsingError(
1609 f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass pubkey"
1610 f" schema validation."
1611 ) from e
1612
1613 public_key = gpg_provider.import_public_key(base64.b64decode(str(
1614 next(pubkey_elt.elements(NS_OX, "data"))
1615 )))
1616
1617 return public_key
1618
1619 async def publish_public_keys_list(
1620 self,
1621 client: SatXMPPClient,
1622 public_keys_list: Iterable[PublicKeyMetadata]
1623 ) -> None:
1624 """Publish/update the own public keys list.
1625
1626 @param client: The client.
1627 @param public_keys_list: The public keys list.
1628 @raise XMPPInteractionFailed: if any interaction via XMPP failed.
1629
1630 @warning: All public keys referenced in the public keys list MUST be published
1631 beforehand.
1632 """
1633
1634 if len({ pkm.fingerprint for pkm in public_keys_list }) != len(public_keys_list):
1635 raise ValueError("Public keys list contains duplicate fingerprints.")
1636
1637 node = "urn:xmpp:openpgp:0:public-keys"
1638
1639 public_keys_list_elt = domish.Element((NS_OX, "public-keys-list"))
1640
1641 for public_key_metadata in public_keys_list:
1642 pubkey_metadata_elt = public_keys_list_elt.addElement("pubkey-metadata")
1643 pubkey_metadata_elt["v4-fingerprint"] = public_key_metadata.fingerprint
1644 pubkey_metadata_elt["date"] = format_datetime(public_key_metadata.timestamp)
1645
1646 try:
1647 await self.__xep_0060.send_item(
1648 client,
1649 client.jid.userhostJID(),
1650 node,
1651 public_keys_list_elt,
1652 item_id=XEP_0060.ID_SINGLETON,
1653 extra={
1654 XEP_0060.EXTRA_PUBLISH_OPTIONS: {
1655 XEP_0060.OPT_PERSIST_ITEMS: "true",
1656 XEP_0060.OPT_ACCESS_MODEL: "open",
1657 XEP_0060.OPT_MAX_ITEMS: 1
1658 },
1659 # TODO: Do we really want publish_without_options here?
1660 XEP_0060.EXTRA_ON_PRECOND_NOT_MET: "publish_without_options"
1661 }
1662 )
1663 except Exception as e:
1664 raise XMPPInteractionFailed("Publishing the public keys list failed.") from e
1665
1666 async def download_public_keys_list(
1667 self,
1668 client: SatXMPPClient,
1669 jid: jid.JID
1670 ) -> Optional[Set[PublicKeyMetadata]]:
1671 """Download the public keys list of a JID.
1672
1673 @param client: The client.
1674 @param jid: The JID. Can be a bare JID.
1675 @return: The public keys list or ``None`` if the JID hasn't published a public
1676 keys list. An empty list means the JID has published an empty list.
1677 @raise exceptions.ParsingError: on XML-level parsing errors.
1678 @raise XMPPInteractionFailed: if any interaction via XMPP failed.
1679 """
1680
1681 node = "urn:xmpp:openpgp:0:public-keys"
1682
1683 try:
1684 items, __ = await self.__xep_0060.get_items(
1685 client,
1686 jid.userhostJID(),
1687 node,
1688 max_items=1
1689 )
1690 except exceptions.NotFound:
1691 return None
1692 except Exception as e:
1693 raise XMPPInteractionFailed() from e
1694
1695 try:
1696 item_elt = cast(domish.Element, items[0])
1697 except IndexError:
1698 return None
1699
1700 public_keys_list_elt = cast(
1701 Optional[domish.Element],
1702 next(item_elt.elements(NS_OX, "public-keys-list"), None)
1703 )
1704
1705 if public_keys_list_elt is None:
1706 return None
1707
1708 try:
1709 PUBLIC_KEYS_LIST_SCHEMA.validate(public_keys_list_elt.toXml())
1710 except xmlschema.XMLSchemaValidationError as e:
1711 raise exceptions.ParsingError(
1712 f"Publish-Subscribe item of JID {jid.userhost()} doesn't pass public keys"
1713 f" list schema validation."
1714 ) from e
1715
1716 return {
1717 PublicKeyMetadata(
1718 fingerprint=pubkey_metadata_elt["v4-fingerprint"],
1719 timestamp=parse_datetime(pubkey_metadata_elt["date"])
1720 )
1721 for pubkey_metadata_elt
1722 in public_keys_list_elt.elements(NS_OX, "pubkey-metadata")
1723 }
1724
1725 async def __prepare_secret_key_synchronization(
1726 self,
1727 client: SatXMPPClient
1728 ) -> Optional[domish.Element]:
1729 """Prepare for secret key synchronization.
1730
1731 Makes sure the relative protocols and protocol extensions are supported by the
1732 server and makes sure that the PEP node for secret synchronization exists and is
1733 configured correctly. The node is created if necessary.
1734
1735 @param client: The client.
1736 @return: As part of the preparations, the secret key synchronization PEP node is
1737 fetched. The result of that fetch is returned here.
1738 @raise exceptions.FeatureNotFound: if the server lacks support for the required
1739 protocols or protocol extensions.
1740 @raise XMPPInteractionFailed: if any interaction via XMPP failed.
1741 """
1742
1743 try:
1744 infos = cast(DiscoInfo, await self.host.memory.disco.get_infos(
1745 client,
1746 client.jid.userhostJID()
1747 ))
1748 except Exception as e:
1749 raise XMPPInteractionFailed(
1750 "Error performing service discovery on the own bare JID."
1751 ) from e
1752
1753 identities = cast(Dict[Tuple[str, str], str], infos.identities)
1754 features = cast(Set[DiscoFeature], infos.features)
1755
1756 if ("pubsub", "pep") not in identities:
1757 raise exceptions.FeatureNotFound("Server doesn't support PEP.")
1758
1759 if "http://jabber.org/protocol/pubsub#access-whitelist" not in features:
1760 raise exceptions.FeatureNotFound(
1761 "Server doesn't support the whitelist access model."
1762 )
1763
1764 persistent_items_supported = \
1765 "http://jabber.org/protocol/pubsub#persistent-items" in features
1766
1767 # TODO: persistent-items is a SHOULD, how do we handle the feature missing?
1768
1769 node = "urn:xmpp:openpgp:0:secret-key"
1770
1771 try:
1772 items, __ = await self.__xep_0060.get_items(
1773 client,
1774 client.jid.userhostJID(),
1775 node,
1776 max_items=1
1777 )
1778 except exceptions.NotFound:
1779 try:
1780 await self.__xep_0060.createNode(
1781 client,
1782 client.jid.userhostJID(),
1783 node,
1784 {
1785 XEP_0060.OPT_PERSIST_ITEMS: "true",
1786 XEP_0060.OPT_ACCESS_MODEL: "whitelist",
1787 XEP_0060.OPT_MAX_ITEMS: "1"
1788 }
1789 )
1790 except Exception as e:
1791 raise XMPPInteractionFailed(
1792 "Error creating the secret key synchronization node."
1793 ) from e
1794 except Exception as e:
1795 raise XMPPInteractionFailed(
1796 "Error fetching the secret key synchronization node."
1797 ) from e
1798
1799 try:
1800 return cast(domish.Element, items[0])
1801 except IndexError:
1802 return None
1803
1804 async def export_secret_keys(
1805 self,
1806 client: SatXMPPClient,
1807 secret_keys: Iterable[GPGSecretKey]
1808 ) -> str:
1809 """Export secret keys to synchronize them with other devices.
1810
1811 @param client: The client.
1812 @param secret_keys: The secret keys to export.
1813 @return: The backup code needed to decrypt the exported secret keys.
1814 @raise exceptions.FeatureNotFound: if the server lacks support for the required
1815 protocols or protocol extensions.
1816 @raise XMPPInteractionFailed: if any interaction via XMPP failed.
1817 """
1818
1819 gpg_provider = get_gpg_provider(self.host, client)
1820
1821 await self.__prepare_secret_key_synchronization(client)
1822
1823 backup_code = generate_passphrase()
1824
1825 plaintext = b"".join(
1826 gpg_provider.backup_secret_key(secret_key) for secret_key in secret_keys
1827 )
1828
1829 ciphertext = gpg_provider.encrypt_symmetrically(plaintext, backup_code)
1830
1831 node = "urn:xmpp:openpgp:0:secret-key"
1832
1833 secretkey_elt = domish.Element((NS_OX, "secretkey"))
1834 secretkey_elt.addContent(base64.b64encode(ciphertext).decode("ASCII"))
1835
1836 try:
1837 await self.__xep_0060.send_item(
1838 client,
1839 client.jid.userhostJID(),
1840 node,
1841 secretkey_elt
1842 )
1843 except Exception as e:
1844 raise XMPPInteractionFailed("Publishing the secret keys failed.") from e
1845
1846 return backup_code
1847
1848 async def download_secret_keys(self, client: SatXMPPClient) -> Optional[bytes]:
1849 """Download previously exported secret keys to import them in a second step.
1850
1851 The downloading and importing steps are separate since a backup code is required
1852 for the import and it should be possible to try multiple backup codes without
1853 redownloading the data every time. The second half of the import procedure is
1854 provided by :meth:`import_secret_keys`.
1855
1856 @param client: The client.
1857 @return: The encrypted secret keys previously exported, if any.
1858 @raise exceptions.FeatureNotFound: if the server lacks support for the required
1859 protocols or protocol extensions.
1860 @raise exceptions.ParsingError: on XML-level parsing errors.
1861 @raise XMPPInteractionFailed: if any interaction via XMPP failed.
1862 """
1863
1864 item_elt = await self.__prepare_secret_key_synchronization(client)
1865 if item_elt is None:
1866 return None
1867
1868 secretkey_elt = cast(
1869 Optional[domish.Element],
1870 next(item_elt.elements(NS_OX, "secretkey"), None)
1871 )
1872
1873 if secretkey_elt is None:
1874 return None
1875
1876 try:
1877 SECRETKEY_SCHEMA.validate(secretkey_elt.toXml())
1878 except xmlschema.XMLSchemaValidationError as e:
1879 raise exceptions.ParsingError(
1880 "Publish-Subscribe item doesn't pass secretkey schema validation."
1881 ) from e
1882
1883 return base64.b64decode(str(secretkey_elt))
1884
1885 def import_secret_keys(
1886 self,
1887 client: SatXMPPClient,
1888 ciphertext: bytes,
1889 backup_code: str
1890 ) -> Set[GPGSecretKey]:
1891 """import previously downloaded secret keys.
1892
1893 The downloading and importing steps are separate since a backup code is required
1894 for the import and it should be possible to try multiple backup codes without
1895 redownloading the data every time. The first half of the import procedure is
1896 provided by :meth:`download_secret_keys`.
1897
1898 @param client: The client to perform this operation with.
1899 @param ciphertext: The ciphertext, i.e. the data returned by
1900 :meth:`download_secret_keys`.
1901 @param backup_code: The backup code needed to decrypt the data.
1902 @raise InvalidPacket: if one of the GPG packets building the secret key data is
1903 either syntactically or semantically deemed invalid.
1904 @raise DecryptionFailed: on decryption failure.
1905 """
1906
1907 gpg_provider = get_gpg_provider(self.host, client)
1908
1909 return gpg_provider.restore_secret_keys(gpg_provider.decrypt_symmetrically(
1910 ciphertext,
1911 backup_code
1912 ))
1913
1914 @staticmethod
1915 def __get_joined_muc_users(
1916 client: SatXMPPClient,
1917 xep_0045: XEP_0045,
1918 room_jid: jid.JID
1919 ) -> Set[jid.JID]:
1920 """
1921 @param client: The client.
1922 @param xep_0045: A MUC plugin instance.
1923 @param room_jid: The room JID.
1924 @return: A set containing the bare JIDs of the MUC participants.
1925 @raise InternalError: if the MUC is not joined or the entity information of a
1926 participant isn't available.
1927 """
1928 # TODO: This should probably be a global helper somewhere
1929
1930 bare_jids: Set[jid.JID] = set()
1931
1932 try:
1933 room = cast(muc.Room, xep_0045.get_room(client, room_jid))
1934 except exceptions.NotFound as e:
1935 raise exceptions.InternalError(
1936 "Participant list of unjoined MUC requested."
1937 ) from e
1938
1939 for user in cast(Dict[str, muc.User], room.roster).values():
1940 entity = cast(Optional[SatXMPPEntity], user.entity)
1941 if entity is None:
1942 raise exceptions.InternalError(
1943 f"Participant list of MUC requested, but the entity information of"
1944 f" the participant {user} is not available."
1945 )
1946
1947 bare_jids.add(entity.jid.userhostJID())
1948
1949 return bare_jids
1950
1951 async def get_trust(
1952 self,
1953 client: SatXMPPClient,
1954 public_key: GPGPublicKey,
1955 owner: jid.JID
1956 ) -> TrustLevel:
1957 """Query the trust level of a public key.
1958
1959 @param client: The client to perform this operation under.
1960 @param public_key: The public key.
1961 @param owner: The owner of the public key. Can be a bare JID.
1962 @return: The trust level.
1963 """
1964
1965 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}"
1966
1967 try:
1968 return TrustLevel(await self.__storage[client.profile][key])
1969 except KeyError:
1970 return TrustLevel.UNDECIDED
1971
1972 async def set_trust(
1973 self,
1974 client: SatXMPPClient,
1975 public_key: GPGPublicKey,
1976 owner: jid.JID,
1977 trust_level: TrustLevel
1978 ) -> None:
1979 """Set the trust level of a public key.
1980
1981 @param client: The client to perform this operation under.
1982 @param public_key: The public key.
1983 @param owner: The owner of the public key. Can be a bare JID.
1984 @param trust_leve: The trust level.
1985 """
1986
1987 key = f"/trust/{owner.userhost()}/{public_key.fingerprint}"
1988
1989 await self.__storage[client.profile].force(key, trust_level.name)
1990
1991 async def get_trust_ui( # pylint: disable=invalid-name
1992 self,
1993 client: SatXMPPClient,
1994 entity: jid.JID
1995 ) -> xml_tools.XMLUI:
1996 """
1997 @param client: The client.
1998 @param entity: The entity whose device trust levels to manage.
1999 @return: An XMLUI instance which opens a form to manage the trust level of all
2000 devices belonging to the entity.
2001 """
2002
2003 if entity.resource:
2004 raise ValueError("A bare JID is expected.")
2005
2006 bare_jids: Set[jid.JID]
2007 if self.__xep_0045 is not None and self.__xep_0045.is_joined_room(client, entity):
2008 bare_jids = self.__get_joined_muc_users(client, self.__xep_0045, entity)
2009 else:
2010 bare_jids = { entity.userhostJID() }
2011
2012 all_public_keys = list({
2013 bare_jid: list(self.list_public_keys(client, bare_jid))
2014 for bare_jid
2015 in bare_jids
2016 }.items())
2017
2018 async def callback(
2019 data: Any,
2020 profile: str # pylint: disable=unused-argument
2021 ) -> Dict[Never, Never]:
2022 """
2023 @param data: The XMLUI result produces by the trust UI form.
2024 @param profile: The profile.
2025 @return: An empty dictionary. The type of the return value was chosen
2026 conservatively since the exact options are neither known not needed here.
2027 """
2028
2029 if C.bool(data.get("cancelled", "false")):
2030 return {}
2031
2032 data_form_result = cast(
2033 Dict[str, str],
2034 xml_tools.xmlui_result_2_data_form_result(data)
2035 )
2036 for key, value in data_form_result.items():
2037 if not key.startswith("trust_"):
2038 continue
2039
2040 outer_index, inner_index = key.split("_")[1:]
2041
2042 owner, public_keys = all_public_keys[int(outer_index)]
2043 public_key = public_keys[int(inner_index)]
2044 trust = TrustLevel(value)
2045
2046 if (await self.get_trust(client, public_key, owner)) is not trust:
2047 await self.set_trust(client, public_key, owner, value)
2048
2049 return {}
2050
2051 submit_id = self.host.register_callback(callback, with_data=True, one_shot=True)
2052
2053 result = xml_tools.XMLUI(
2054 panel_type=C.XMLUI_FORM,
2055 title=D_("OX trust management"),
2056 submit_id=submit_id
2057 )
2058 # Casting this to Any, otherwise all calls on the variable cause type errors
2059 # pylint: disable=no-member
2060 trust_ui = cast(Any, result)
2061 trust_ui.addText(D_(
2062 "This is OX trusting system. You'll see below the GPG keys of your "
2063 "contacts, and a list selection to trust them or not. A trusted key "
2064 "can read your messages in plain text, so be sure to only validate "
2065 "keys that you are sure are belonging to your contact. It's better "
2066 "to do this when you are next to your contact, so "
2067 "you can check the \"fingerprint\" of the key "
2068 "yourself. Do *not* validate a key if the fingerprint is wrong!"
2069 ))
2070
2071 own_secret_keys = self.list_secret_keys(client)
2072
2073 trust_ui.change_container("label")
2074 for index, secret_key in enumerate(own_secret_keys):
2075 trust_ui.addLabel(D_(f"Own secret key {index} fingerprint"))
2076 trust_ui.addText(secret_key.public_key.fingerprint)
2077 trust_ui.addEmpty()
2078 trust_ui.addEmpty()
2079
2080 for outer_index, [ owner, public_keys ] in enumerate(all_public_keys):
2081 for inner_index, public_key in enumerate(public_keys):
2082 trust_ui.addLabel(D_("Contact"))
2083 trust_ui.addJid(jid.JID(owner))
2084 trust_ui.addLabel(D_("Fingerprint"))
2085 trust_ui.addText(public_key.fingerprint)
2086 trust_ui.addLabel(D_("Trust this device?"))
2087
2088 current_trust_level = await self.get_trust(client, public_key, owner)
2089 avaiable_trust_levels = \
2090 { TrustLevel.DISTRUSTED, TrustLevel.TRUSTED, current_trust_level }
2091
2092 trust_ui.addList(
2093 f"trust_{outer_index}_{inner_index}",
2094 options=[ trust_level.name for trust_level in avaiable_trust_levels ],
2095 selected=current_trust_level.name,
2096 styles=[ "inline" ]
2097 )
2098
2099 trust_ui.addEmpty()
2100 trust_ui.addEmpty()
2101
2102 return result